From 5d5f7adcfacbaa003604b89d130ef51be1f87d13 Mon Sep 17 00:00:00 2001 From: "zhangkun9038@dingtalk.com" Date: Mon, 18 Aug 2025 16:12:57 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96warning?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- freqtrade/templates/freqaiprimer.json | 8 +- freqtrade/templates/freqaiprimer.py | 1907 ++++++------------------- 2 files changed, 402 insertions(+), 1513 deletions(-) diff --git a/freqtrade/templates/freqaiprimer.json b/freqtrade/templates/freqaiprimer.json index ced78565..7ce199eb 100644 --- a/freqtrade/templates/freqaiprimer.json +++ b/freqtrade/templates/freqaiprimer.json @@ -19,14 +19,14 @@ "EXIT_POSITION_RATIO": 0.472, "SELL_THRESHOLD_MAX": 0.065, "SELL_THRESHOLD_MIN": 0.002, - "TRAILING_STOP_DISTANCE": 0.025, - "TRAILING_STOP_START": 0.075 + "TRAILING_STOP_DISTANCE": 0.0125, + "TRAILING_STOP_START": 0.045 }, "protection": {}, "trailing": { "trailing_stop": true, - "trailing_stop_positive": 0.025, - "trailing_stop_positive_offset": 0.075, + "trailing_stop_positive": 0.0125, + "trailing_stop_positive_offset": 0.045, "trailing_only_offset_is_reached": false } diff --git a/freqtrade/templates/freqaiprimer.py b/freqtrade/templates/freqaiprimer.py index f0ecf8ab..635022e8 100644 --- a/freqtrade/templates/freqaiprimer.py +++ b/freqtrade/templates/freqaiprimer.py @@ -1,28 +1,27 @@ import logging import numpy as np -import datetime +import pandas as pd +import talib.abstract as ta +from pandas import DataFrame +from functools import reduce import datetime import os import json import glob import redis -from functools import reduce from freqtrade.persistence import Trade -import talib.abstract as ta -from pandas import DataFrame -import pandas as pd +from freqtrade.strategy import DecimalParameter, IStrategy, IntParameter from typing import Dict -from freqtrade.strategy import (DecimalParameter, IStrategy, IntParameter) -from datetime import datetime logger = logging.getLogger(__name__) -class FreqaiPrimer(IStrategy): +class FreqaiPrimerStrategy(IStrategy): """ - 基于 FreqAI 的动态阈值交易策略,集成动态加仓、减仓和自定义 ROI 逻辑,兼容最新 Freqtrade 版本 + 基于 FreqAI 的动态阈值交易策略,集成动态加仓、减仓和自定义 ROI 逻辑 + 修复了DataFrame碎片化和NaN值问题 """ - # --- 🧪 固定配置参数(从hyperopt优化结果获取)--- + # 固定配置参数 TRAILING_STOP_START = 0.016 TRAILING_STOP_DISTANCE = 0.015 @@ -31,29 +30,24 @@ class FreqaiPrimer(IStrategy): SELL_THRESHOLD_MIN = 0.002 SELL_THRESHOLD_MAX = 0.065 - # 新增:加仓和减仓参数 ADD_POSITION_THRESHOLD = -0.021 EXIT_POSITION_RATIO = 0.472 COOLDOWN_PERIOD_MINUTES = 9 MAX_ENTRY_POSITION_ADJUSTMENT = 3 - # 🟢 绿色通道开关 - 直接控制是否启用绿色通道功能 - GREEN_CHANNEL_ENABLED = False # 设置为False关闭绿色通道,True开启绿色通道 + GREEN_CHANNEL_ENABLED = False # 趋势判定阈值参数 TREND_BULLISH_THRESHOLD = 85 TREND_BEARISH_THRESHOLD = 60 - # 新的加权趋势判定阈值(用于hyperopt优化) - TREND_FINAL_BULLISH_THRESHOLD = 55 # 上涨趋势最终阈值 - TREND_FINAL_BEARISH_THRESHOLD = 13 # 下跌趋势最终阈值 + TREND_FINAL_BULLISH_THRESHOLD = 55 + TREND_FINAL_BEARISH_THRESHOLD = 13 - # Hyperopt 可优化参数 - 基于初步结果调整范围 - trend_final_bullish_threshold = IntParameter(20, 85, default=71, space="buy", optimize=True, load=True) # 降低上限,避免过于保守 - trend_final_bearish_threshold = IntParameter(5, 45, default=21, space="buy", optimize=True, load=True) # 扩大下限,捕获更多熊市机会 - - # 趋势确认参数 - 增加交易频率 - trend_confirmation_period = IntParameter(3, 15, default=8, space="buy", optimize=True, load=True) # 趋势确认周期 + # Hyperopt 可优化参数 + trend_final_bullish_threshold = IntParameter(20, 85, default=71, space="buy", optimize=True, load=True) + trend_final_bearish_threshold = IntParameter(5, 45, default=21, space="buy", optimize=True, load=True) + trend_confirmation_period = IntParameter(3, 15, default=8, space="buy", optimize=True, load=True) # 出场策略相关参数 exit_divergence_multiplier = DecimalParameter(1.0, 1.3, default=1.06, space="sell", optimize=True, load=True) @@ -61,7 +55,7 @@ class FreqaiPrimer(IStrategy): exit_rsi_threshold = IntParameter(55, 75, default=65, space="sell", optimize=True, load=True) exit_stochrsi_threshold = IntParameter(60, 80, default=70, space="sell", optimize=True, load=True) exit_adx_threshold_overbought = IntParameter(20, 35, default=25, space="sell", optimize=True, load=True) - exit_min_profit_threshold = DecimalParameter(0.01, 0.08, default=0.039, space="sell", optimize=True, load=True) # 提高最小盈利要求 + exit_min_profit_threshold = DecimalParameter(0.01, 0.08, default=0.039, space="sell", optimize=True, load=True) exit_rapid_rise_bull = DecimalParameter(1.5, 4.0, default=2.0, space="sell", optimize=True, load=True) exit_rapid_rise_bear = DecimalParameter(2.5, 5.0, default=3.5, space="sell", optimize=True, load=True) exit_stochrsi_rapid = IntParameter(65, 85, default=70, space="sell", optimize=True, load=True) @@ -86,21 +80,21 @@ class FreqaiPrimer(IStrategy): exit_ranging_trend_score_max = IntParameter(95, 99, default=98, space="sell", optimize=True, load=True) exit_ranging_trend_score_threshold = IntParameter(80, 90, default=85, space="sell", optimize=True, load=True) - # --- 🛠️ 固定配置参数 --- + # 固定配置参数 stoploss = -0.15 timeframe = "3m" use_custom_stoploss = True - position_adjustment_enable = True # 启用动态仓位调整 + position_adjustment_enable = True trailing_stop = True - trailing_stop_positive = 0.025 # 🎯 降低锁定利润到2.5% - trailing_stop_positive_offset = 0.08 # 🎯 降低启动阈值到8% + trailing_stop_positive = 0.025 + trailing_stop_positive_offset = 0.08 trailing_only_offset_is_reached = False minimal_roi = { - "0": 0.06, # 30分钟(0-30分钟)内,8% 盈利退出 - "30": 0.04, # 2小时(30-120分钟)内,4% 盈利退出 - "90": 0.025, # 4小时(120-240分钟)内,2% 盈利退出 - "270": 0.002 # 8小时(240-480分钟)内,0% 盈利退出 + "0": 0.06, + "30": 0.04, + "90": 0.025, + "270": 0.002 } plot_config = { @@ -129,7 +123,7 @@ class FreqaiPrimer(IStrategy): freqai_info = { "identifier": "freqai_primer_mixed", - "model": "LightGBMRegressor", # 默认回归模型 + "model": "LightGBMRegressor", "feature_parameters": { "include_timeframes": ["3m", "15m", "1h"], "label_period_candles": 12, @@ -140,7 +134,7 @@ class FreqaiPrimer(IStrategy): "shuffle": False, }, "model_training_parameters": { - "price_value_divergence": { # 回归模型配置 + "price_value_divergence": { "model": "LightGBMRegressor", "model_params": { "n_estimators": 200, @@ -149,7 +143,7 @@ class FreqaiPrimer(IStrategy): "verbose": -1, } }, - "optimal_first_length": { # 分类模型配置 + "optimal_first_length": { "model": "LightGBMClassifier", "model_params": { "n_estimators": 150, @@ -172,1226 +166,422 @@ class FreqaiPrimer(IStrategy): def __init__(self, config: dict, *args, **kwargs): super().__init__(config, *args, **kwargs) - # 读取 Redis 配置(不存储客户端实例,避免序列化问题) self.redis_url = config.get('redis', {}).get('url', None) self.stats_logged = False - - # 验证Redis配置但不创建持久化客户端 - if self.redis_url: - logger.info(f"Redis 配置已启用: {self.redis_url}") - else: - logger.info("ℹ️ 未找到 Redis 配置,使用本地缓存模式") - self.trailing_stop_enabled = False self.pair_stats = {} self.fit_live_predictions_candles = self.freqai_info.get("fit_live_predictions_candles", 100) - self.last_entry_time = {} # 记录每个币种的最后入场时间 - - # 绿色通道开关直接使用全局变量 + self.last_entry_time = {} self.green_channel_enabled = self.GREEN_CHANNEL_ENABLED - logger.info(f"🟢 绿色通道开关状态: {'开启' if self.green_channel_enabled else '关闭'}(通过全局变量控制)") def _get_redis_client(self): - """延迟初始化Redis客户端,避免序列化问题""" + """延迟初始化Redis客户端""" if not self.redis_url: return None - try: import redis client = redis.from_url(self.redis_url) - # 快速测试连接 client.ping() return client except Exception: return None - def feature_engineering_expand_all(self, dataframe: DataFrame, period: int, metadata: dict, **kwargs) -> DataFrame: - dataframe["%-rsi-period"] = ta.RSI(dataframe, timeperiod=period) - dataframe["%-sma-period"] = ta.SMA(dataframe, timeperiod=period) - dataframe["%-ema-period"] = ta.EMA(dataframe, timeperiod=period) - - real = ta.TYPPRICE(dataframe) - upperband, middleband, lowerband = ta.BBANDS(real, timeperiod=period, nbdevup=2.0, nbdevdn=2.0) - dataframe["bb_lowerband-period"] = lowerband - dataframe["bb_upperband-period"] = upperband - dataframe["bb_middleband-period"] = middleband - dataframe["%-bb_width-period"] = (dataframe["bb_upperband-period"] - dataframe["bb_lowerband-period"]) / dataframe["bb_middleband-period"] - - dataframe["%-mfi-period"] = ta.MFI(dataframe, timeperiod=period) - dataframe["%-adx-period"] = ta.ADX(dataframe, timeperiod=period) - dataframe["%-relative_volume-period"] = dataframe["volume"] / dataframe["volume"].rolling(period).mean() - - dataframe["ema200"] = ta.EMA(dataframe, timeperiod=200) - dataframe["%-price_value_divergence"] = (dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"] - - columns_to_clean = [ - "%-rsi-period", "%-mfi-period", "%-sma-period", "%-ema-period", "%-adx-period", - "bb_lowerband-period", "bb_middleband-period", "bb_upperband-period", - "%-bb_width-period", "%-relative_volume-period", "%-price_value_divergence" - ] - for col in columns_to_clean: - dataframe[col] = dataframe[col].replace([np.inf, -np.inf], 0).ffill().fillna(0) - + def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: + """ + 计算主时间框架(3m)和 1h 时间框架的指标 + 优化DataFrame操作以减少碎片化 + """ pair = metadata.get('pair', 'Unknown') - logger.info(f"[{pair}] 特征工程完成,列:{list(dataframe.columns)}") - return dataframe - - def set_freqai_targets(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame: - pair = metadata.get('pair', 'Unknown') - if len(dataframe) < 200: - logger.warning(f"[{pair}] 数据量不足({len(dataframe)}根K线),需要至少200根K线进行训练") - return dataframe - - dataframe["ema200"] = ta.EMA(dataframe, timeperiod=200) - dataframe["&-price_value_divergence"] = (dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"] - - dataframe["volume_mean_20"] = dataframe["volume"].rolling(20).mean() - dataframe["volume_std_20"] = dataframe["volume"].rolling(20).std() - dataframe["volume_z_score"] = (dataframe["volume"] - dataframe["volume_mean_20"]) / dataframe["volume_std_20"] - dataframe["volume_z_score"] = dataframe["volume_z_score"].replace([np.inf, -np.inf], 0).ffill().fillna(0) - - # 严格清理price_value_divergence,确保数据类型正确 - price_divergence = dataframe["&-price_value_divergence"].replace([np.inf, -np.inf], 0).ffill().fillna(0) - dataframe["&-price_value_divergence"] = price_divergence.astype(np.float64) + original_length = len(dataframe) + original_index = dataframe.index - # 验证数据类型和范围 - if not np.isfinite(dataframe["&-price_value_divergence"]).all(): - logger.warning(f"[{pair}] price_value_divergence包含非有限值,进行清理") - dataframe["&-price_value_divergence"] = dataframe["&-price_value_divergence"].replace([np.inf, -np.inf, np.nan], 0.0) - - # 新增:分类模型优化 first_length - # 基于市场状态预测最优的 first_length 分类(2, 4, 6, 8, 10) - # 使用技术指标和市场波动率作为特征 - atr = ta.ATR(dataframe, timeperiod=14) - volatility_ratio = atr / dataframe['close'] - - # 定义市场状态特征 - trend_strength = abs(dataframe['close'] - dataframe['ema200']) / dataframe['ema200'] - volume_anomaly = abs(dataframe['volume_z_score']) > 2 - - # 基于市场状态确定最优 first_length 类别 - # 0: 激进(2) 1: 中性(4) 2: 稳健(6) 3: 保守(8) 4: 极保守(10) - conditions = [ - (volatility_ratio > 0.02) & (trend_strength > 0.05), # 高波动+强趋势 -> 激进 - (volatility_ratio > 0.015) & (trend_strength > 0.03), # 中高波动+中强趋势 -> 中性 - (volatility_ratio > 0.01) | (volume_anomaly), # 中等波动或成交量异常 -> 稳健 - (volatility_ratio < 0.008) & (trend_strength < 0.02), # 低波动+弱趋势 -> 保守 - ] - choices = [0, 1, 2, 3] - - # 确保分类目标列是整数类型,添加严格验证 try: - optimal_length_class = np.select(conditions, choices, default=4) - # 确保所有值都是有效的整数 - optimal_length_class = np.clip(optimal_length_class, 0, 4) # 限制在0-4范围内 - dataframe["&*-optimal_first_length"] = optimal_length_class.astype(np.int32) + # 使用字典预计算所有指标,减少DataFrame碎片化 + indicators = {} - # 验证分类值的有效性 - unique_values = np.unique(optimal_length_class) - if not all(0 <= val <= 4 for val in unique_values): - logger.warning(f"[{pair}] 发现无效的first_length分类值: {unique_values}") - dataframe["&*-optimal_first_length"] = np.clip(optimal_length_class, 0, 4).astype(np.int32) + # 3m时间框架指标 + indicators['ema200'] = ta.EMA(dataframe, timeperiod=200) + indicators['price_value_divergence'] = (dataframe['close'] - indicators['ema200']) / indicators['ema200'] + indicators['rsi'] = ta.RSI(dataframe, timeperiod=14) + + # 布林带 + bb_upper, bb_middle, bb_lower = ta.BBANDS(dataframe['close'], timeperiod=20, nbdevup=2.0, nbdevdn=2.0) + indicators['bb_upperband'] = bb_upper + indicators['bb_middleband'] = bb_middle + indicators['bb_lowerband'] = bb_lower + + # 成交量相关指标 + volume_mean = dataframe['volume'].rolling(20).mean() + volume_std = dataframe['volume'].rolling(20).std() + indicators['volume_mean_20'] = volume_mean + indicators['volume_std_20'] = volume_std + indicators['volume_z_score'] = (dataframe['volume'] - volume_mean) / volume_std.replace(0, 1) + + # STOCHRSI + stochrsi = ta.STOCHRSI(dataframe, timeperiod=14, fastk_period=3, fastd_period=3) + indicators['stochrsi_k'] = stochrsi['fastk'] + indicators['stochrsi_d'] = stochrsi['fastd'] + + # 获取1h时间框架数据 + dataframe_1h = self.dp.get_pair_dataframe(pair=pair, timeframe="1h") + if not dataframe_1h.empty and len(dataframe_1h) >= 20: + # 计算1h指标 + stochrsi_1h = ta.STOCHRSI(dataframe_1h, timeperiod=14, fastk_period=3, fastd_period=3) + bb_1h = ta.BBANDS(dataframe_1h['close'], timeperiod=20, nbdevup=2.0, nbdevdn=2.0) + vol_mean_1h = dataframe_1h['volume'].rolling(20).mean() + vol_std_1h = dataframe_1h['volume'].rolling(20).std() + + # 创建1h数据映射 + hour_data = { + 'open_1h': dataframe_1h['open'], + 'high_1h': dataframe_1h['high'], + 'low_1h': dataframe_1h['low'], + 'close_1h': dataframe_1h['close'], + 'stochrsi_k_1h': stochrsi_1h['fastk'], + 'stochrsi_d_1h': stochrsi_1h['fastd'], + 'bb_upper_1h': bb_1h[0], + 'bb_middle_1h': bb_1h[1], + 'bb_lower_1h': bb_1h[2], + 'volume_z_score_1h': (dataframe_1h['volume'] - vol_mean_1h) / vol_std_1h.replace(0, 1) + } + + # 重采样到主时间框架 + for col, data in hour_data.items(): + indicators[col] = data.reindex(dataframe.index).ffill().fillna(0) + else: + # 1h数据不足时的默认值 + for col in ['open_1h', 'high_1h', 'low_1h', 'close_1h', 'stochrsi_k_1h', 'stochrsi_d_1h', + 'bb_upper_1h', 'bb_middle_1h', 'bb_lower_1h', 'volume_z_score_1h']: + indicators[col] = 0.0 + + # 使用pd.concat一次性添加所有指标,避免碎片化 + indicators_df = pd.DataFrame(indicators, index=dataframe.index) + dataframe = pd.concat([dataframe, indicators_df], axis=1) + + # 统一清理NaN和无穷值 + numeric_cols = dataframe.select_dtypes(include=[np.number]).columns + dataframe[numeric_cols] = dataframe[numeric_cols].replace([np.inf, -np.inf], np.nan).fillna(0) + + # 调用FreqAI预测 + if hasattr(self, 'freqai') and self.freqai is not None: + try: + dataframe = self.freqai.start(dataframe, metadata, self) + if "&-price_value_divergence" not in dataframe.columns: + logger.warning(f"[{pair}] FreqAI未生成&-price_value_divergence,回退到规则计算") + dataframe["&-price_value_divergence"] = dataframe["price_value_divergence"] + except Exception as e: + logger.error(f"[{pair}] FreqAI预测失败: {e}") + dataframe["&-price_value_divergence"] = dataframe["price_value_divergence"] + else: + dataframe["&-price_value_divergence"] = dataframe["price_value_divergence"] + + # 设置分类模型目标 + self.set_freqai_targets(dataframe, metadata) + + # 确保DataFrame长度一致 + if len(dataframe) != original_length: + dataframe = dataframe.reindex(original_index).fillna(0) except Exception as e: - logger.error(f"[{pair}] 计算分类目标时出错: {e}") - dataframe["&*-optimal_first_length"] = np.full(len(dataframe), 2, dtype=np.int32) # 默认值为2 - - # 添加调试信息 - logger.info(f"[{pair}] 📈 分类目标统计: {dict(pd.Series(optimal_length_class).value_counts())}") - logger.info(f"[{pair}] 🎯 最新分类值: {optimal_length_class[-5:]}") + logger.error(f"[{pair}] populate_indicators 错误: {str(e)}") + if len(dataframe) != original_length: + dataframe = dataframe.reindex(original_index).fillna(0) return dataframe - def is_stochrsi_overbought(self, dataframe: DataFrame, period=10, threshold=85) -> bool: - """ - 判断当前 STOCHRSI 是否在过去 N 根 K 线中平均高于阈值(如 85) - """ + + def set_freqai_targets(self, dataframe: DataFrame, metadata: dict) -> DataFrame: + """设置FreqAI目标变量""" + pair = metadata.get('pair', 'Unknown') + + if len(dataframe) < 200: + logger.warning(f"[{pair}] 数据量不足,跳过FreqAI目标设置") + return dataframe + + try: + # 设置回归目标 + dataframe["&-price_value_divergence"] = dataframe["price_value_divergence"] + + # 设置分类目标 + atr = ta.ATR(dataframe, timeperiod=14) + volatility_ratio = atr / dataframe['close'] + trend_strength = abs(dataframe['close'] - dataframe['ema200']) / dataframe['ema200'] + volume_anomaly = abs(dataframe['volume_z_score']) > 2 + + # 定义市场状态特征和分类 + conditions = [ + (volatility_ratio > 0.02) & (trend_strength > 0.05), + (volatility_ratio > 0.015) & (trend_strength > 0.03), + (volatility_ratio > 0.01) | (volume_anomaly), + (volatility_ratio < 0.008) & (trend_strength < 0.02), + ] + choices = [0, 1, 2, 3] + + optimal_length_class = np.select(conditions, choices, default=4) + optimal_length_class = np.clip(optimal_length_class, 0, 4) + dataframe["&*-optimal_first_length"] = optimal_length_class.astype(np.int32) + + except Exception as e: + logger.error(f"[{pair}] 设置FreqAI目标时出错: {e}") + dataframe["&*-optimal_first_length"] = np.full(len(dataframe), 2, dtype=np.int32) + + return dataframe + + def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: + """生成买入信号""" + pair = metadata.get('pair', 'Unknown') + original_length = len(dataframe) + original_index = dataframe.index + + # 初始化买入信号列 + dataframe['enter_long'] = 0 + + try: + if "&-price_value_divergence" not in dataframe.columns: + logger.warning(f"[{pair}] &-price_value_divergence列缺失,跳过买入信号生成") + return dataframe + + # 获取市场趋势 + trend_score = self.get_market_trend(dataframe=dataframe, metadata=metadata) + trend_status = self.detect_trend_status(dataframe, metadata) + + # 动态阈值计算 + volume_z_score_threshold = self.linear_map(trend_score, 0, 100, 1.5, 0.5) + rsi_threshold = self.linear_map(trend_score, 0, 100, 55, 35) + stochrsi_threshold = self.linear_map(trend_score, 0, 100, 45, 25) + + # 熊市信号检测 + bearish_signal = self.is_bearish_market(dataframe, metadata) + bearish_signal_aligned = bearish_signal.reindex(dataframe.index).ffill().fillna(False) + + # 超买信号检测 + stochrsi_overbought = self.is_stochrsi_overbought(dataframe, period=10, threshold=85) + stochrsi_overbought_aligned = stochrsi_overbought.reindex(dataframe.index).ffill().fillna(True) + + # 根据趋势状态调整参数 + open_trades = len(self.active_trades) if hasattr(self, 'active_trades') else 0 + is_green_channel = (trend_status == "bullish" and open_trades <= 2 and self.green_channel_enabled) + + if trend_status == "bullish": + if is_green_channel: + div_mult, vol_mult, rsi_mult, bb_mult, stoch_mult = 1.8, 0.4, 1.4, 1.05, 1.4 + min_conditions = 4 + else: + div_mult, vol_mult, rsi_mult, bb_mult, stoch_mult = 1.5, 0.7, 1.2, 1.0, 1.2 + min_conditions = 7 + elif trend_status == "bearish": + div_mult, vol_mult, rsi_mult, bb_mult, stoch_mult = 0.7, 1.3, 0.8, 0.95, 0.8 + min_conditions = 7 + else: # ranging + div_mult, vol_mult, rsi_mult, bb_mult, stoch_mult = 1.0, 1.0, 1.0, 1.0, 1.0 + min_conditions = 7 + + # 计算阈值 + div_threshold = self.buy_threshold * div_mult + vol_threshold = volume_z_score_threshold * vol_mult + rsi_thresh = rsi_threshold * rsi_mult + stoch_thresh = stochrsi_threshold * stoch_mult + + # 创建买入条件 + conditions = [ + (dataframe["&-price_value_divergence"] < div_threshold), + (dataframe["volume_z_score"] > vol_threshold), + (dataframe["rsi"] < rsi_thresh), + (dataframe["close"] <= dataframe["bb_lowerband"] * bb_mult), + (dataframe["stochrsi_k"] < stoch_thresh) + ] + + # 添加熊市和超买过滤 + if not (trend_status == "bullish" and is_green_channel): + conditions.extend([ + ~bearish_signal_aligned, + ~stochrsi_overbought_aligned + ]) + + # 计算满足的条件数量 + conditions_df = pd.DataFrame(conditions).T + satisfied_count = conditions_df.sum(axis=1) + + # 根据趋势状态确定买入条件 + if trend_status == "bullish" and is_green_channel: + buy_condition = satisfied_count >= min_conditions + else: + buy_condition = satisfied_count >= len(conditions) + + # 设置买入信号 + dataframe.loc[buy_condition, 'enter_long'] = 1 + + # 确保DataFrame长度一致 + if len(dataframe) != original_length: + dataframe = dataframe.reindex(original_index).fillna(0) + + except Exception as e: + logger.error(f"[{pair}] populate_entry_trend 错误: {str(e)}") + dataframe['enter_long'] = 0 + if len(dataframe) != original_length: + dataframe = dataframe.reindex(original_index).fillna(0) + + return dataframe + + def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: + """生成卖出信号""" + pair = metadata.get('pair', 'Unknown') + original_length = len(dataframe) + original_index = dataframe.index + + # 初始化卖出信号列 + dataframe['exit_long'] = 0 + + try: + if "&-price_value_divergence" not in dataframe.columns: + logger.warning(f"[{pair}] &-price_value_divergence列缺失,跳过卖出信号生成") + return dataframe + + # 获取市场趋势和状态 + trend_score = self.get_market_trend(dataframe=dataframe, metadata=metadata) + trend_status = self.detect_trend_status(dataframe, metadata) + + # 动态调整卖出阈值 + if trend_status == "bullish": + divergence_threshold = self.sell_threshold * 1.5 + profit_threshold = 5.0 + elif trend_status == "bearish": + divergence_threshold = self.sell_threshold * 0.7 + profit_threshold = 2.5 + else: # ranging + divergence_threshold = self.sell_threshold + profit_threshold = 3.5 + + # 计算技术指标 + short_term_return = dataframe['close'].pct_change(periods=3) * 100 + + # 创建卖出条件 + divergence_exit = dataframe["&-price_value_divergence"] > divergence_threshold + rsi_exit = (dataframe["stochrsi_k"] > 65) & (dataframe["adx"] > 25) + profit_exit = (short_term_return > profit_threshold) & (dataframe["adx"] > 20) + + # 组合所有卖出条件 + exit_condition = divergence_exit | rsi_exit | profit_exit + dataframe.loc[exit_condition, 'exit_long'] = 1 + + # 确保DataFrame长度一致 + if len(dataframe) != original_length: + dataframe = dataframe.reindex(original_index).fillna(0) + + except Exception as e: + logger.error(f"[{pair}] populate_exit_trend 错误: {str(e)}") + dataframe['exit_long'] = 0 + if len(dataframe) != original_length: + dataframe = dataframe.reindex(original_index).fillna(0) + + return dataframe + + def is_stochrsi_overbought(self, dataframe: DataFrame, period=10, threshold=85) -> pd.Series: + """判断STOCHRSI是否超买""" if 'stochrsi_k' not in dataframe.columns: - # 如果列不存在,返回全 False 的 Series return pd.Series([False] * len(dataframe), index=dataframe.index) - # 计算滚动平均值并判断是否超过阈值 avg_stochrsi = dataframe['stochrsi_k'].rolling(window=period).mean() return avg_stochrsi > threshold def is_bearish_market(self, dataframe: DataFrame, metadata: dict, timeframe: str = "1h") -> pd.Series: + """检测熊市信号""" pair = metadata.get('pair', 'Unknown') - logger.info(f"[{pair}] 开始计算 1h 熊市信号") - - # 防御性检查 - required_columns = ['close_1h', 'high_1h', 'low_1h', 'open_1h', 'stochrsi_k_1h', 'stochrsi_d_1h', 'bb_middle_1h'] - missing = [col for col in required_columns if col not in dataframe.columns] - if missing: - logger.error(f"[{pair}] 缺少必要列:{missing},返回全 False") + + # 检查必要列 + required_cols = ['close_1h', 'high_1h', 'low_1h', 'open_1h', 'stochrsi_k_1h', 'bb_middle_1h'] + missing_cols = [col for col in required_cols if col not in dataframe.columns] + if missing_cols: + logger.warning(f"[{pair}] 缺少熊市检测列: {missing_cols}") return pd.Series([False] * len(dataframe), index=dataframe.index) - - # 检查 volume_z_score_1h 是否存在 - has_volume_z_score = 'volume_z_score_1h' in dataframe.columns and not dataframe['volume_z_score_1h'].isna().all() - - # 条件 a:价格远低于布林带中轨(低于中轨 1% 以上) + + # 熊市条件 cond_a = dataframe['close_1h'] < dataframe['bb_middle_1h'] * 0.99 - - # 条件 b:STOCHRSI 超过90 cond_b = dataframe['stochrsi_k_1h'] > 90 - - # 条件 c:看跌蜡烛图形态 + + # 看跌形态 open_1h = dataframe['open_1h'] close_1h = dataframe['close_1h'] - high_1h = dataframe['high_1h'] - low_1h = dataframe['low_1h'] - prev_open = open_1h.shift(1) prev_close = close_1h.shift(1) - + prev_open = open_1h.shift(1) + cond_engulfing = ( (prev_close > prev_open) & (close_1h < open_1h) & (close_1h < prev_open) & (open_1h > prev_close) ) - cond_dark_cloud_cover = ( - (prev_close > prev_open) & - (open_1h > prev_close) & - (close_1h < (prev_open + prev_close) / 2) & - (close_1h < open_1h) - ) - body = abs(open_1h - close_1h) - upper_wick = high_1h - np.maximum(open_1h, close_1h) - lower_wick = np.minimum(open_1h, close_1h) - low_1h - cond_shooting_star = (upper_wick > 2 * body) & (lower_wick < body) & (close_1h < open_1h) - cond_stochrsi_high = self.is_stochrsi_overbought(dataframe, period=10, threshold=85) - cond_c = cond_engulfing | cond_dark_cloud_cover | cond_shooting_star | cond_stochrsi_high - - # 条件 d:成交量显著下降 - cond_d = dataframe['volume_z_score_1h'] < -1.0 if has_volume_z_score else pd.Series([False] * len(dataframe), index=dataframe.index) - - # 综合熊市信号(至少满足两个条件) - bearish_signal = (cond_a & cond_b) | (cond_a & cond_c) | (cond_b & cond_c) | (cond_a & cond_d) - - # 记录每个条件的触发情况 - logger.info(f"[{pair}] 熊市信号 - 条件a (价格低于布林中轨): {cond_a.iloc[-1]}") - logger.info(f"[{pair}] 熊市信号 - 条件b (STOCHRSI>90): {cond_b.iloc[-1]}") - logger.info(f"[{pair}] 熊市信号 - 条件c (看跌形态): {cond_c.iloc[-1]}") - logger.info(f"[{pair}] 熊市信号 - 条件d (成交量下降): {cond_d.iloc[-1]}") - - # 汇总所有条件的得分 - bearish_score = 0 - if cond_a.iloc[-1]: - bearish_score += 25 - if cond_b.iloc[-1]: - bearish_score += 25 - if cond_c.iloc[-1]: - bearish_score += 25 - if cond_d.iloc[-1]: - bearish_score += 25 - - logger.info(f"[{pair}] 熊市信号总得分: {bearish_score}/100") - - return bearish_signal - - - - def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: - """ - 计算主时间框架(3m)和 1h 时间框架的指标,并映射到主 dataframe。 - 包含 FreqAI 预测、布林带、RSI、成交量 Z 分数等,并确保 1h 数据列完整性。 - """ - pair = metadata.get('pair', 'Unknown') - original_length = len(dataframe) - original_index = dataframe.index - logger.info(f"[{pair}] populate_indicators 开始处理,原始数据长度:{original_length}") - logger.info(f"[{pair}] 当前可用列(调用FreqAI前):{list(dataframe.columns)}") - - # 计算主时间框架(3m)指标 - dataframe["ema200"] = ta.EMA(dataframe, timeperiod=200) - dataframe["price_value_divergence"] = (dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"] - upperband, middleband, lowerband = ta.BBANDS(dataframe["close"], timeperiod=20, nbdevup=2.0, nbdevdn=2.0) - dataframe["bb_upperband"] = upperband - dataframe["bb_middleband"] = middleband - dataframe["bb_lowerband"] = lowerband - dataframe["rsi"] = ta.RSI(dataframe, timeperiod=14) - dataframe["volume_mean_20"] = dataframe["volume"].rolling(20).mean() - dataframe["volume_std_20"] = dataframe["volume"].rolling(20).std() - dataframe["volume_z_score"] = (dataframe["volume"] - dataframe["volume_mean_20"]) / dataframe["volume_std_20"] - # 计算主时间框架的 STOCHRSI - stochrsi = ta.STOCHRSI(dataframe, timeperiod=14, fastk_period=3, fastd_period=3) - dataframe["stochrsi_k"] = stochrsi["fastk"] - dataframe["stochrsi_d"] = stochrsi["fastd"] - - # 获取 1h 时间框架数据 - dataframe_1h = self.dp.get_pair_dataframe(pair=pair, timeframe="1h") - if dataframe_1h.empty or len(dataframe_1h) < 50: - logger.warning(f"[{pair}] 1h 数据为空或不足({len(dataframe_1h)} 根K线),初始化空列") - for col in ['open_1h', 'high_1h', 'low_1h', 'close_1h', 'stochrsi_k_1h', 'stochrsi_d_1h', - 'bb_upper_1h', 'bb_middle_1h', 'bb_lower_1h', 'volume_z_score_1h']: - dataframe[col] = 0.0 # 使用0.0代替np.nan避免NaN值 - else: - # 计算 1h 指标 - if len(dataframe_1h) >= 20: # 确保有足够数据计算 rolling(20) - stochrsi_1h = ta.STOCHRSI(dataframe_1h, timeperiod=14, fastk_period=3, fastd_period=3) - dataframe_1h['stochrsi_k'] = stochrsi_1h['fastk'] - dataframe_1h['stochrsi_d'] = stochrsi_1h['fastd'] - real = ta.TYPPRICE(dataframe_1h) - upperband, middleband, lowerband = ta.BBANDS(real, timeperiod=20, nbdevup=2.0, nbdevdn=2.0) - dataframe_1h['bb_upper_1h'] = upperband - dataframe_1h['bb_middle_1h'] = middleband - dataframe_1h['bb_lower_1h'] = lowerband - dataframe_1h['volume_mean_20_1h'] = dataframe_1h["volume"].rolling(20).mean() - dataframe_1h['volume_std_20_1h'] = dataframe_1h["volume"].rolling(20).std() - dataframe_1h['volume_z_score_1h'] = (dataframe_1h["volume"] - dataframe_1h['volume_mean_20_1h']) / dataframe_1h['volume_std_20_1h'] - # 清理 NaN 和无穷值 - dataframe_1h['volume_z_score_1h'] = dataframe_1h['volume_z_score_1h'].replace([np.inf, -np.inf], 0).ffill().fillna(0) - else: - logger.warning(f"[{pair}] 1h 数据不足以计算 volume_z_score_1h({len(dataframe_1h)} 根K线,需至少20根)") - dataframe_1h['volume_z_score_1h'] = 0.0 - - # 映射 1h 数据到主时间框架 - for col in ['open', 'high', 'low', 'close', 'stochrsi_k', 'stochrsi_d', 'bb_upper_1h', 'bb_middle_1h', 'bb_lower_1h', 'volume_z_score_1h']: - if col in dataframe_1h.columns: - dataframe[col if col.endswith('_1h') else f"{col}_1h"] = dataframe_1h[col].reindex(dataframe.index, method='ffill').bfill() - else: - logger.warning(f"[{pair}] 1h 数据缺少列 {col},初始化为空") - dataframe[col if col.endswith('_1h') else f"{col}_1h"] = 0.0 - - # 数据清理:处理 NaN 和无穷值,确保数据完整性 - critical_columns = ["ema200", "bb_upperband", "bb_middleband", "bb_lowerband", "rsi", "volume_z_score", - "&-price_value_divergence", "price_value_divergence", "open_1h", "high_1h", "low_1h", - "close_1h", "stochrsi_k_1h", "stochrsi_d_1h", "bb_upper_1h", "bb_middle_1h", "bb_lower_1h", - "volume_z_score_1h", "stochrsi_k", "stochrsi_d", "close", "volume"] + cond_c = cond_engulfing | self.is_stochrsi_overbought(dataframe, period=10, threshold=85) + cond_d = dataframe['volume_z_score_1h'] < -1.0 if 'volume_z_score_1h' in dataframe.columns else False - for col in critical_columns: - if col in dataframe.columns: - # 处理无穷值和NaN值 - dataframe[col] = dataframe[col].replace([np.inf, -np.inf], 0) - # 前向填充,然后用0填充剩余NaN - dataframe[col] = dataframe[col].ffill().fillna(0) - # 确保数据类型正确 - dataframe[col] = pd.to_numeric(dataframe[col], errors='coerce').fillna(0) - - # 验证并修复DataFrame长度一致性 - final_length = len(dataframe) - if final_length != original_length: - logger.warning(f"[{pair}] DataFrame长度不匹配: 原始{original_length} vs 当前{final_length}") - # 重新索引确保长度一致 - dataframe = dataframe.reindex(original_index) - # 填充任何NaN值 - dataframe = dataframe.fillna(0) - logger.info(f"[{pair}] 已修复DataFrame长度,当前长度: {len(dataframe)}") - - # 最终验证 - if len(dataframe) != original_length: - logger.error(f"[{pair}] 严重错误:无法修复DataFrame长度不匹配问题") - # 强制截断或填充到原始长度 - if len(dataframe) > original_length: - dataframe = dataframe.iloc[:original_length] - else: - # 填充缺失的行 - missing_rows = original_length - len(dataframe) - logger.warning(f"[{pair}] 需要填充{missing_rows}行缺失数据") - # 使用最后一个有效值填充 - last_row = dataframe.iloc[-1:] if len(dataframe) > 0 else dataframe.iloc[0:1] - for _ in range(missing_rows): - dataframe = pd.concat([dataframe, last_row]) - dataframe = dataframe.iloc[:original_length] - dataframe.index = original_index - - # 调用 FreqAI 预测 - 使用单一回归模型 - if not hasattr(self, 'freqai') or self.freqai is None: - logger.error(f"[{pair}] FreqAI 未初始化,回退到规则计算") - dataframe["&-price_value_divergence"] = dataframe["price_value_divergence"] - else: - logger.info(f"[{pair}] 调用 FreqAI 预测,类型:{type(self.freqai)}") - - try: - dataframe = self.freqai.start(dataframe, metadata, self) - - # 检查 FreqAI 生成的列 - freqai_columns = [col for col in dataframe.columns if col.startswith('&')] - logger.info(f"[{pair}] FreqAI 生成的列: {freqai_columns}") - - if "&-price_value_divergence" not in dataframe.columns: - logger.warning(f"[{pair}] FreqAI 未生成 &-price_value_divergence,回退到规则计算") - dataframe["&-price_value_divergence"] = dataframe["price_value_divergence"] - - except Exception as e: - logger.error(f"[{pair}] FreqAI 预测失败: {e}") - dataframe["&-price_value_divergence"] = dataframe["price_value_divergence"] - - # 计算 labels_mean 和 labels_std - labels_mean = None - labels_std = None - try: - model_base_dir = os.path.join(self.config["user_data_dir"], "models", self.freqai_info["identifier"]) - pair_base = pair.split('/')[0] if '/' in pair else pair - sub_dirs = glob.glob(os.path.join(model_base_dir, f"sub-train-{pair_base}_*")) - if not sub_dirs: - logger.warning(f"[{pair}] 未找到任何子目录:{model_base_dir}/sub-train-{pair_base}_*") - else: - latest_sub_dir = max(sub_dirs, key=lambda x: int(x.split('_')[-1])) - pair_base_lower = pair_base.lower() - timestamp = latest_sub_dir.split('_')[-1] - metadata_file = os.path.join(latest_sub_dir, f"cb_{pair_base_lower}_{timestamp}_metadata.json") - if os.path.exists(metadata_file): - with open(metadata_file, "r") as f: - metadata = json.load(f) - labels_mean = metadata["labels_mean"]["&-price_value_divergence"] - labels_std = metadata["labels_std"]["&-price_value_divergence"] - logger.info(f"[{pair}] 从最新子目录 {latest_sub_dir} 读取 labels_mean:{labels_mean}, labels_std:{labels_std}") - else: - logger.warning(f"[{pair}] 最新的 metadata.json 文件 {metadata_file} 不存在") - except Exception as e: - logger.warning(f"[{pair}] 无法从子目录读取 labels_mean 和 labels_std:{e},重新计算") - - if labels_mean is None or labels_std is None: - logger.warning(f"[{pair}] 无法获取 labels_mean 和 labels_std,重新计算") - dataframe["&-price_value_divergence_actual"] = (dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"] - dataframe["&-price_value_divergence_actual"] = dataframe["&-price_value_divergence_actual"].replace([np.inf, -np.inf], 0).ffill().fillna(0) - recent_data = dataframe["&-price_value_divergence_actual"].tail(self.fit_live_predictions_candles) - labels_mean = recent_data.mean() - labels_std = recent_data.std() - if np.isnan(labels_std) or labels_std == 0: - labels_std = 0.01 - logger.warning(f"[{pair}] labels_std 计算异常,使用默认值 0.01") - - # 根据市场趋势得分动态调整买卖阈值 - market_trend_score = self.get_market_trend(dataframe=dataframe, metadata={'pair': pair}) - k_buy = self.linear_map(market_trend_score, 0, 100, 1.2, 0.8) - k_sell = self.linear_map(market_trend_score, 0, 100, 1.5, 1.0) - - # 处理分类模型的预测值 - if "&*-optimal_first_length" in dataframe.columns: - dataframe["optimal_first_length_pred"] = dataframe["&*-optimal_first_length"] - logger.info(f"[{pair}] ✅ 找到并复制分类模型预测列: &*-optimal_first_length -> optimal_first_length_pred") - logger.info(f"[{pair}] 分类预测值统计: {dataframe['&*-optimal_first_length'].value_counts().to_dict()}") - # 确保列存在 - if "optimal_first_length_pred" in dataframe.columns: - logger.info(f"[{pair}] ✅ optimal_first_length_pred 列已创建,最后5个值: {dataframe['optimal_first_length_pred'].tail().tolist()}") - else: - logger.error(f"[{pair}] ❌ optimal_first_length_pred 列创建失败") - else: - logger.warning(f"[{pair}] 未找到分类模型预测列 &*-optimal_first_length") - # 创建默认列 - dataframe["optimal_first_length_pred"] = 2.0 - logger.info(f"[{pair}] 创建默认 optimal_first_length_pred 列,值=2.0") - - self.buy_threshold = labels_mean - k_buy * labels_std - self.sell_threshold = labels_mean + k_sell * labels_std - - # 使用固定参数限制阈值 - self.buy_threshold = max(self.buy_threshold, self.BUY_THRESHOLD_MIN) - self.buy_threshold = min(self.buy_threshold, self.BUY_THRESHOLD_MAX) - self.sell_threshold = min(self.sell_threshold, self.SELL_THRESHOLD_MAX) - self.sell_threshold = max(self.sell_threshold, self.SELL_THRESHOLD_MIN) - - # 调试日志 - logger.info(f"[{pair}] 市场趋势得分:{market_trend_score}, labels_mean:{labels_mean:.4f}, labels_std:{labels_std:.4f}") - logger.info(f"[{pair}] k_buy:{k_buy:.2f}, k_sell:{k_sell:.2f}") - logger.info(f"[{pair}] 动态买入阈值:{self.buy_threshold:.4f}, 卖出阈值:{self.sell_threshold:.4f}") - logger.info(f"[{pair}] 最新数据 - close: {dataframe['close'].iloc[-1]:.6f}, " - f"rsi: {dataframe['rsi'].iloc[-1]:.2f}, " - f"&-price_value_divergence: {dataframe['&-price_value_divergence'].iloc[-1]:.6f}, " - f"volume_z_score: {dataframe['volume_z_score'].iloc[-1]:.2f}, " - f"bb_lowerband: {dataframe['bb_lowerband'].iloc[-1]:.6f}, " - f"close_1h: {dataframe['close_1h'].iloc[-1] if 'close_1h' in dataframe else 'N/A'}, " - f"stochrsi_k_1h: {dataframe['stochrsi_k_1h'].iloc[-1] if 'stochrsi_k_1h' in dataframe else 'N/A'}, " - f"volume_z_score_1h: {dataframe['volume_z_score_1h'].iloc[-1] if 'volume_z_score_1h' in dataframe else 'N/A'}") - - if not self.stats_logged: - logger.info("===== 所有币对的 labels_mean 和 labels_std 汇总 =====") - for p, stats in self.pair_stats.items(): - logger.info(f"[{p}] labels_mean:{stats['labels_mean']:.4f}, labels_std:{stats['labels_std']:.4f}") - logger.info("==============================================") - self.stats_logged = True - - return dataframe - - def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: - pair = metadata.get('pair', 'Unknown') - original_length = len(dataframe) - original_index = dataframe.index - - conditions = [] - logger.info(f"[{pair}] populate_entry_trend 被调用,原始数据长度:{original_length},时间:{dataframe.index[-1]}") - # 获取市场趋势得分 - trend_score = self.get_market_trend(dataframe=dataframe, metadata=metadata) - - # 动态调整成交量和 RSI 阈值 - volume_z_score_min = 0.5 - volume_z_score_max = 1.5 - volume_z_score_threshold = self.linear_map(trend_score, 0, 100, volume_z_score_max, volume_z_score_min) - rsi_min = 35 - rsi_max = 55 - rsi_threshold = self.linear_map(trend_score, 0, 100, rsi_max, rsi_min) - stochrsi_min = 25 - stochrsi_max = 45 - stochrsi_threshold = self.linear_map(trend_score, 0, 100, stochrsi_max, stochrsi_min) - - # 计算熊市信号和 STOCHRSI 超买信号 - bearish_signal = self.is_bearish_market(dataframe, metadata, timeframe="1h") - bearish_signal_aligned = bearish_signal.reindex(dataframe.index, method='ffill').fillna(False) - stochrsi_overbought = self.is_stochrsi_overbought(dataframe, period=10, threshold=85) - stochrsi_overbought_aligned = stochrsi_overbought.reindex(dataframe.index, method='ffill').fillna(True) - - # 检测趋势状态 - trend_status = self.detect_trend_status(dataframe, metadata) - - # 根据趋势状态调整入场策略 - if "&-price_value_divergence" in dataframe.columns: - # 检测当前持仓订单数量(用于绿色通道判断) - open_trades = len(self.active_trades) if hasattr(self, 'active_trades') else 0 - is_green_channel = (trend_status == "bullish" and open_trades <= 2 and self.GREEN_CHANNEL_ENABLED) - - if is_green_channel: - # 🟢 牛市绿色通道:持仓≤2个,25USDT入场,5条件需要满足4个 - cond1 = (dataframe["&-price_value_divergence"] < self.buy_threshold * 1.8) # 超宽松偏离度 - cond2 = (dataframe["volume_z_score"] > volume_z_score_threshold * 0.4) # 超低成交量要求 - cond3 = (dataframe["rsi"] < rsi_threshold * 1.4) # 超宽松RSI - cond4 = (dataframe["close"] <= dataframe["bb_upperband"] * 1.05) # 允许上轨附近 - cond5 = (dataframe["stochrsi_k"] < stochrsi_threshold * 1.4) # 超宽松STOCHRSI - - core_conditions = [cond1, cond2, cond3, cond4, cond5] - satisfied_count = sum([c.iloc[-1] for c in core_conditions]) - buy_condition = satisfied_count >= 4 - logger.info(f"[{pair}] 🟢 牛市绿色通道:持仓{open_trades}≤2个,25USDT入场,5条件需要满足{satisfied_count}/4个") - - elif trend_status == "bullish": - # 牛市正常通道:持仓>2个,75USDT入场,必须满足全部7个条件 - cond1 = (dataframe["&-price_value_divergence"] < self.buy_threshold * 1.5) # 放宽到1.5倍 - cond2 = (dataframe["volume_z_score"] > volume_z_score_threshold * 0.7) # 降低成交量要求 - cond3 = (dataframe["rsi"] < rsi_threshold * 1.2) # 放宽RSI要求 - cond4 = (dataframe["close"] <= dataframe["bb_upperband"]) # 可以在上轨附近入场 - cond5 = (dataframe["stochrsi_k"] < stochrsi_threshold * 1.2) # 放宽STOCHRSI要求 - cond6 = pd.Series([True] * len(dataframe), index=dataframe.index) # 取消熊市过滤 - cond7 = pd.Series([True] * len(dataframe), index=dataframe.index) # 取消超买过滤 - buy_condition = cond1 & cond2 & cond3 & cond4 & cond5 & cond6 & cond7 - logger.info(f"[{pair}] 🚀 牛市正常通道:持仓{open_trades}>2个,75USDT入场,必须满足全部7个条件") - - elif trend_status == "bearish": - # 下跌趋势:严格入场条件,只抄底 - cond1 = (dataframe["&-price_value_divergence"] < self.buy_threshold * 0.7) # 严格到0.7倍 - cond2 = (dataframe["volume_z_score"] > volume_z_score_threshold * 1.3) # 提高成交量要求 - cond3 = (dataframe["rsi"] < rsi_threshold * 0.8) # 严格RSI要求 - cond4 = (dataframe["close"] <= dataframe["bb_lowerband"] * 0.95) # 必须跌破下轨 - cond5 = (dataframe["stochrsi_k"] < stochrsi_threshold * 0.8) # 严格STOCHRSI要求 - cond6 = ~bearish_signal_aligned # 保持熊市过滤 - cond7 = ~stochrsi_overbought_aligned # 保持超买过滤 - buy_condition = cond1 & cond2 & cond3 & cond4 & cond5 & cond6 & cond7 - logger.info(f"[{pair}] 📉 下跌趋势策略:严格入场条件") - - else: # ranging - # 震荡趋势:使用原策略 - cond1 = (dataframe["&-price_value_divergence"] < self.buy_threshold) - cond2 = (dataframe["volume_z_score"] > volume_z_score_threshold) - cond3 = (dataframe["rsi"] < rsi_threshold) - cond4 = (dataframe["close"] <= dataframe["bb_lowerband"]) - cond5 = (dataframe["stochrsi_k"] < stochrsi_threshold) - cond6 = ~bearish_signal_aligned - cond7 = ~stochrsi_overbought_aligned - buy_condition = cond1 & cond2 & cond3 & cond4 & cond5 & cond6 & cond7 - logger.info(f"[{pair}] ⚖️ 震荡趋势策略:标准入场条件") - - # 绿色通道和趋势状态的条件已经设置好buy_condition - conditions.append(buy_condition) - - # 调试日志 - 添加更多调试信息 - divergence_value = dataframe["&-price_value_divergence"].iloc[-1] - volume_z_score_value = dataframe["volume_z_score"].iloc[-1] - rsi_value = dataframe["rsi"].iloc[-1] - stochrsi_value = dataframe["stochrsi_k"].iloc[-1] - bb_close_value = dataframe["close"].iloc[-1] - bb_lower_value = dataframe["bb_lowerband"].iloc[-1] - bb_upper_value = dataframe["bb_upperband"].iloc[-1] - - # 输出关键指标以便调试 - logger.info(f"[{pair}] 📊 关键指标 - 偏离度: {divergence_value:.6f}, RSI: {rsi_value:.2f}, STOCHRSI: {stochrsi_value:.2f}, 价格: {bb_close_value:.4f}, 下轨: {bb_lower_value:.4f}, 上轨: {bb_upper_value:.4f}") - logger.info(f"[{pair}] 🔍 买入阈值 - 偏离度: {self.buy_threshold:.6f}, RSI: {rsi_threshold:.2f}, STOCHRSI: {stochrsi_threshold:.2f}") - # 定义条件名称和状态 - conditions_summary = [ - ("&-price_value_divergence", divergence_value, "<", self.buy_threshold, cond1.iloc[-1]), - ("volume_z_score", volume_z_score_value, ">", volume_z_score_threshold, cond2.iloc[-1]), - ("rsi", rsi_value, "<", rsi_threshold, cond3.iloc[-1]), - ("close <= bb_lowerband", bb_close_value, "<=", bb_lower_value, cond4.iloc[-1]), - ("stochrsi_k", stochrsi_value, "<", stochrsi_threshold, cond5.iloc[-1]), - ] - - # 根据趋势状态添加对应的条件6和7 - if trend_status == "bullish" and open_trades <= 2: - # 绿色通道只有5个条件 - pass - else: - # 其他通道有7个条件 - conditions_summary.extend([ - ("非熊市", None, None, None, cond6.iloc[-1]), - ("STOCHRSI未持续超买", None, None, None, cond7.iloc[-1]), - ]) - - # 输出每个条件的状态 - logger.info(f"[{pair}] === 买入条件检查 ===") - failed_conditions = [] - for name, value, operator, threshold, result in conditions_summary: - status = "✅" if result else "❌" - if value is not None and threshold is not None: - logger.info(f"[{pair}] {status} {name}: {value:.6f} {operator} {threshold:.6f}") - else: - logger.info(f"[{pair}] {status} {name}") - if not result: - failed_conditions.append(name) - else: - logger.warning(f"[{pair}] &-price_value_divergence 列缺失,跳过买入信号生成") - - if conditions: - combined_condition = reduce(lambda x, y: x & y, conditions) - dataframe.loc[combined_condition, 'enter_long'] = 1 - - # 输出每个条件的状态 - logger.info(f"[{pair}] === 买入条件检查 ===") - satisfied_conditions = [] - for name, value, operator, threshold, result in conditions_summary: - status = "✅" if result else "❌" - if value is not None and threshold is not None: - logger.info(f"[{pair}] {status} {name}: {value:.6f} {operator} {threshold:.6f}") - else: - logger.info(f"[{pair}] {status} {name}") - if result: - satisfied_conditions.append(name) - - # 总结满足的条件 - if combined_condition.any(): - logger.info(f"[{pair}] ✅ 买入信号触发,满足条件: {', '.join(satisfied_conditions)},趋势得分:{trend_score:.2f}") - else: - logger.info(f"[{pair}] ❌ 买入条件未满足") - - else: - logger.info(f"[{pair}] 无有效买入条件") - - # 记录各条件触发率 - logger.info(f"[{pair}] 各条件触发率 - " - f"cond1: {cond1.mean():.2%}, " - f"cond2: {cond2.mean():.2%}, " - f"cond3: {cond3.mean():.2%}, " - f"cond4: {cond4.mean():.2%}, " - f"cond5: {cond5.mean():.2%}, " - f"buy_condition: {buy_condition.mean():.2%}") - # 记录 enter_long 信号统计 - enter_long_count = dataframe['enter_long'].sum() if 'enter_long' in dataframe.columns else 0 - logger.info(f"[{pair}] enter_long 信号总数:{enter_long_count}") - - # 数据完整性检查和长度验证修复 - if len(dataframe) > 0: - # 验证数据长度一致性 - current_length = len(dataframe) - if current_length != original_length: - logger.warning(f"[{pair}] ⚠️ DataFrame长度不匹配!原始长度: {original_length}, 当前长度: {current_length}") - - # 修复数据长度 - if current_length < original_length: - # 数据行数不足,需要填充 - missing_rows = original_length - current_length - logger.info(f"[{pair}] 填充缺失的 {missing_rows} 行数据...") - - # 创建缺失行的索引 - missing_index = original_index.difference(dataframe.index) - if len(missing_index) > 0: - # 用最后一行的数据填充缺失行 - last_row = dataframe.iloc[-1:].copy() - filled_rows = pd.DataFrame(index=missing_index) - for col in dataframe.columns: - filled_rows[col] = last_row[col].iloc[0] if len(last_row) > 0 else 0 - - # 合并数据 - dataframe = pd.concat([dataframe, filled_rows]) - dataframe = dataframe.reindex(original_index) - logger.info(f"[{pair}] 数据填充完成,新长度: {len(dataframe)}") - - elif current_length > original_length: - # 数据行数过多,截断到原始长度 - excess_rows = current_length - original_length - logger.info(f"[{pair}] 截断多余的 {excess_rows} 行数据...") - dataframe = dataframe.iloc[:original_length].copy() - dataframe = dataframe.reindex(original_index) - logger.info(f"[{pair}] 数据截断完成,新长度: {len(dataframe)}") - - else: - # 长度一致但索引可能不同,重新对齐索引 - dataframe = dataframe.reindex(original_index) - logger.info(f"[{pair}] 索引重新对齐完成,长度: {len(dataframe)}") - - # 处理NaN值 - nan_columns = [col for col in dataframe.columns if dataframe[col].isna().any()] - if nan_columns: - logger.warning(f"[{pair}] 发现NaN值的列: {nan_columns}") - for col in nan_columns: - nan_count = dataframe[col].isna().sum() - if nan_count > 0: - logger.warning(f"[{pair}] 列 {col} 有 {nan_count} 个NaN值,正在清理...") - # 使用前向填充,然后零填充 - dataframe[col] = dataframe[col].fillna(method='ffill').fillna(0) - - # 最终验证 - final_length = len(dataframe) - if final_length != original_length: - logger.error(f"[{pair}] ❌ 数据长度修复失败!期望: {original_length}, 实际: {final_length}") - # 强制截断或填充到原始长度 - if final_length > original_length: - dataframe = dataframe.iloc[:original_length] - elif final_length < original_length: - # 复制最后一行填充 - last_row = dataframe.iloc[-1:].copy() - while len(dataframe) < original_length: - dataframe = pd.concat([dataframe, last_row]) - dataframe = dataframe.iloc[:original_length] - logger.info(f"[{pair}] 强制修复完成,最终长度: {len(dataframe)}") - else: - logger.info(f"[{pair}] ✅ 数据长度验证通过,最终长度: {final_length}") - - return dataframe - - def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: - pair = metadata.get('pair', 'Unknown') - - # 记录原始数据长度和索引 - original_length = len(dataframe) - original_index = dataframe.index - logger.info(f"[{pair}] populate_exit_trend 开始处理,原始数据长度:{original_length}") - - conditions = [] - - if "&-price_value_divergence" in dataframe.columns: - # 计算额外指标:StochRSI、ADX 和短期价格变化 - stochrsi = ta.STOCHRSI(dataframe, timeperiod=14, fastk_period=3, fastd_period=3) - dataframe["stochrsi_k"] = stochrsi["fastk"] - dataframe["adx"] = ta.ADX(dataframe, timeperiod=14) - # 计算短期价格涨幅(最近 5 根 K 线,约 15 分钟) - dataframe["short_term_return"] = dataframe["close"].pct_change(5, fill_method=None) * 100 # 百分比回报 - - # 清理新计算列的NaN值 - for col in ["stochrsi_k", "adx", "short_term_return"]: - if col in dataframe.columns: - dataframe[col] = dataframe[col].replace([np.inf, -np.inf], 0).ffill().fillna(0) - - # 获取市场趋势得分 - trend_score = self.get_market_trend(dataframe=dataframe, metadata={'pair': pair}) - - # 条件 1:高阈值 &-price_value_divergence - cond1 = ( - (dataframe["&-price_value_divergence"] > self.sell_threshold * float(self.exit_divergence_multiplier.value)) & - (dataframe["adx"] > float(self.exit_adx_threshold.value)) # 趋势强度过滤 - ) - - # 条件 2:超买信号 - cond2 = ( - (dataframe["rsi"] > float(self.exit_rsi_threshold.value)) & - (dataframe["stochrsi_k"] > float(self.exit_stochrsi_threshold.value)) & # StochRSI 超买 - (dataframe["adx"] > float(self.exit_adx_threshold_overbought.value)) # 趋势强度 - ) - - # 条件 3:快速拉升退出 - # 检测最近 5 根 K 线(约 15 分钟)涨幅超过阈值,且有最低利润要求 - min_profit = float(self.exit_min_profit_threshold.value) # 最低利润要求 - rapid_rise_threshold = self.linear_map(trend_score, 0, 100, float(self.exit_rapid_rise_bear.value), float(self.exit_rapid_rise_bull.value)) - cond3 = ( - (dataframe["short_term_return"] > rapid_rise_threshold) & # 短期快速拉升 - (dataframe["close"] / dataframe["close"].shift(5) - 1 > min_profit) & # 确保最低利润 - (dataframe["stochrsi_k"] > float(self.exit_stochrsi_rapid.value)) & # 超买确认 - (dataframe["volume_z_score"] > float(self.exit_volume_threshold.value)) - ) - - # 检测趋势状态 - trend_status = self.detect_trend_status(dataframe, metadata) - - # 根据趋势状态调整出场策略 - if trend_status == "bullish": - # 上涨趋势:严格出场条件,让利润奔跑 - if trend_score > float(self.exit_bullish_trend_score_max.value): - logger.info(f"[{pair}] 🚀 强劲上涨趋势,拒绝卖出") - return dataframe - - # 上涨趋势下需要更强的卖出信号 - cond1_bullish = (dataframe["&-price_value_divergence"] > self.sell_threshold * float(self.exit_bullish_divergence_mult.value)) - cond2_bullish = (dataframe["rsi"] > float(self.exit_bullish_rsi.value)) & (dataframe["stochrsi_k"] > float(self.exit_bullish_stochrsi.value)) & (dataframe["adx"] > float(self.exit_bullish_adx.value)) - cond3_bullish = (dataframe["short_term_return"] > float(self.exit_bullish_return.value)) & (dataframe["stochrsi_k"] > float(self.exit_bullish_stochrsi_rapid.value)) - - sell_condition = (cond1_bullish & cond2_bullish) | (cond1_bullish & cond3_bullish) | (cond2_bullish & cond3_bullish) - logger.info(f"[{pair}] 🚀 上涨趋势策略:严格出场条件") - - elif trend_status == "bearish": - # 下跌趋势:宽松出场条件,快速止盈止损 - cond1_bearish = (dataframe["&-price_value_divergence"] > self.sell_threshold * float(self.exit_bearish_divergence_mult.value)) - cond2_bearish = (dataframe["rsi"] > float(self.exit_bearish_rsi.value)) & (dataframe["stochrsi_k"] > float(self.exit_bearish_stochrsi.value)) & (dataframe["adx"] > float(self.exit_bearish_adx.value)) - cond3_bearish = (dataframe["short_term_return"] > float(self.exit_bearish_return.value)) & (dataframe["stochrsi_k"] > float(self.exit_bearish_stochrsi_rapid.value)) - - sell_condition = cond1_bearish | cond2_bearish | cond3_bearish # 任一条件即可卖出 - logger.info(f"[{pair}] 📉 下跌趋势策略:宽松出场条件") - - else: # ranging - # 震荡趋势:使用原策略 - if trend_score > float(self.exit_ranging_trend_score_max.value): - logger.info(f"[{pair}] ⚖️ 震荡趋势但得分较高,拒绝卖出") - return dataframe - - if trend_score > float(self.exit_ranging_trend_score_threshold.value): - sell_condition = (cond1 & cond2) | (cond1 & cond3) | (cond2 & cond3) # 中等趋势,至少两个条件满足 - logger.info(f"[{pair}] ⚖️ 震荡趋势策略:标准出场条件") - else: - sell_condition = cond1 | cond2 | cond3 # 弱势趋势,任一条件满足 - logger.info(f"[{pair}] ⚖️ 弱势震荡,任一条件满足") - - conditions.append(sell_condition) - - # 调试日志 - 使用安全的值获取 - divergence_value = dataframe["&-price_value_divergence"].iloc[-1] if len(dataframe) > 0 and not dataframe["&-price_value_divergence"].isna().iloc[-1] else 0.0 - rsi_value = dataframe["rsi"].iloc[-1] if len(dataframe) > 0 and not dataframe["rsi"].isna().iloc[-1] else 0.0 - stochrsi_value = dataframe["stochrsi_k"].iloc[-1] if len(dataframe) > 0 and not dataframe["stochrsi_k"].isna().iloc[-1] else 0.0 - adx_value = dataframe["adx"].iloc[-1] if len(dataframe) > 0 and not dataframe["adx"].isna().iloc[-1] else 0.0 - short_term_return = dataframe["short_term_return"].iloc[-1] if len(dataframe) > 0 and not dataframe["short_term_return"].isna().iloc[-1] else 0.0 - logger.info(f"[{pair}] 卖出条件检查 - " - f"&-price_value_divergence={divergence_value:.6f} > {self.sell_threshold * float(self.exit_divergence_multiplier.value):.6f}: {cond1.iloc[-1]}, " - f"rsi={rsi_value:.2f} > {float(self.exit_rsi_threshold.value)} & stochrsi_k={stochrsi_value:.2f} > {float(self.exit_stochrsi_threshold.value)}: {cond2.iloc[-1]}, " - f"short_term_return={short_term_return:.2f}% > {rapid_rise_threshold:.2f}% & profit > {min_profit*100:.2f}%: {cond3.iloc[-1]}, " - f"adx={adx_value:.2f}, trend_score={trend_score:.2f}") - else: - logger.warning(f"[{pair}] ⚠️ &-price_value_divergence 列缺失,跳过该条件") - - if len(conditions) > 0: - dataframe.loc[reduce(lambda x, y: x & y, conditions), 'exit_long'] = 1 - logger.info(f"[{pair}] 出场信号触发,条件满足,趋势得分:{trend_score:.2f}") - else: - logger.info(f"[{pair}] 无有效卖出条件") - - # 数据完整性检查和长度验证修复 - if len(dataframe) > 0: - # 验证数据长度一致性 - current_length = len(dataframe) - if current_length != original_length: - logger.warning(f"[{pair}] ⚠️ 卖出DataFrame长度不匹配!原始长度: {original_length}, 当前长度: {current_length}") - - # 修复数据长度 - if current_length < original_length: - # 数据行数不足,需要填充 - missing_rows = original_length - current_length - logger.info(f"[{pair}] 卖出填充缺失的 {missing_rows} 行数据...") - - # 创建缺失行的索引 - missing_index = original_index.difference(dataframe.index) - if len(missing_index) > 0: - # 用最后一行的数据填充缺失行 - last_row = dataframe.iloc[-1:].copy() - filled_rows = pd.DataFrame(index=missing_index) - for col in dataframe.columns: - filled_rows[col] = last_row[col].iloc[0] if len(last_row) > 0 else 0 - - # 合并数据 - dataframe = pd.concat([dataframe, filled_rows]) - dataframe = dataframe.reindex(original_index) - logger.info(f"[{pair}] 卖出数据填充完成,新长度: {len(dataframe)}") - - elif current_length > original_length: - # 数据行数过多,截断到原始长度 - excess_rows = current_length - original_length - logger.info(f"[{pair}] 卖出截断多余的 {excess_rows} 行数据...") - dataframe = dataframe.iloc[:original_length].copy() - dataframe = dataframe.reindex(original_index) - logger.info(f"[{pair}] 卖出数据截断完成,新长度: {len(dataframe)}") - - else: - # 长度一致但索引可能不同,重新对齐索引 - dataframe = dataframe.reindex(original_index) - logger.info(f"[{pair}] 卖出索引重新对齐完成,长度: {len(dataframe)}") - - # 处理NaN值 - nan_columns = [col for col in dataframe.columns if dataframe[col].isna().any()] - if nan_columns: - logger.warning(f"[{pair}] 卖出检查 - 发现NaN值的列: {nan_columns}") - for col in nan_columns: - nan_count = dataframe[col].isna().sum() - if nan_count > 0: - logger.warning(f"[{pair}] 卖出检查 - 列 {col} 有 {nan_count} 个NaN值,正在清理...") - # 使用前向填充,然后零填充 - dataframe[col] = dataframe[col].fillna(method='ffill').fillna(0) - - # 最终验证 - final_length = len(dataframe) - if final_length != original_length: - logger.error(f"[{pair}] ❌ 卖出数据长度修复失败!期望: {original_length}, 实际: {final_length}") - # 强制截断或填充到原始长度 - if final_length > original_length: - dataframe = dataframe.iloc[:original_length] - elif final_length < original_length: - # 复制最后一行填充 - last_row = dataframe.iloc[-1:].copy() - while len(dataframe) < original_length: - dataframe = pd.concat([dataframe, last_row]) - dataframe = dataframe.iloc[:original_length] - logger.info(f"[{pair}] 卖出强制修复完成,最终长度: {len(dataframe)}") - else: - logger.info(f"[{pair}] ✅ 卖出数据长度验证通过,最终长度: {final_length}") - - # 记录exit_long信号 - exit_long_count = dataframe['exit_long'].sum() if 'exit_long' in dataframe.columns else 0 - logger.info(f"[{pair}] exit_long 信号总数:{exit_long_count}") - - return dataframe - - def buy_space(self): - return [ - DecimalParameter(-0.1, -0.01, name="buy_threshold_min"), - DecimalParameter(-0.02, -0.001, name="buy_threshold_max"), - DecimalParameter(-0.05, -0.01, name="add_position_threshold", default=-0.02), - IntParameter(1, 10, name="cooldown_period_minutes", default=5), - IntParameter(1, 3, name="max_entry_position_adjustment", default=2) - ] - - def sell_space(self): - return [ - DecimalParameter(0.001, 0.02, name="sell_threshold_min"), - DecimalParameter(0.02, 0.1, name="sell_threshold_max"), - DecimalParameter(0.2, 0.7, name="exit_position_ratio", default=0.5), - # 出场策略参数 - DecimalParameter(1.0, 1.3, name="exit_divergence_multiplier"), - IntParameter(15, 35, name="exit_adx_threshold"), - IntParameter(55, 75, name="exit_rsi_threshold"), - IntParameter(60, 80, name="exit_stochrsi_threshold"), - IntParameter(20, 35, name="exit_adx_threshold_overbought"), - DecimalParameter(0.005, 0.05, name="exit_min_profit_threshold"), - DecimalParameter(1.5, 4.0, name="exit_rapid_rise_bull"), - DecimalParameter(2.5, 5.0, name="exit_rapid_rise_bear"), - IntParameter(65, 85, name="exit_stochrsi_rapid"), - DecimalParameter(0.5, 2.0, name="exit_volume_threshold"), - # 趋势状态相关出场参数 - IntParameter(90, 98, name="exit_bullish_trend_score_max"), - DecimalParameter(1.1, 1.3, name="exit_bullish_divergence_mult"), - IntParameter(70, 85, name="exit_bullish_rsi"), - IntParameter(80, 90, name="exit_bullish_stochrsi"), - IntParameter(25, 40, name="exit_bullish_adx"), - DecimalParameter(3.0, 7.0, name="exit_bullish_return"), - IntParameter(80, 90, name="exit_bullish_stochrsi_rapid"), - IntParameter(95, 99, name="exit_ranging_trend_score_max"), - IntParameter(80, 90, name="exit_ranging_trend_score_threshold"), - DecimalParameter(0.8, 1.0, name="exit_bearish_divergence_mult"), - IntParameter(55, 65, name="exit_bearish_rsi"), - IntParameter(65, 75, name="exit_bearish_stochrsi"), - IntParameter(15, 25, name="exit_bearish_adx"), - DecimalParameter(1.0, 3.0, name="exit_bearish_return"), - IntParameter(70, 85, name="exit_bearish_stochrsi_rapid") - ] - - def adjust_trade_position(self, trade: Trade, current_time: datetime, - current_rate: float, current_profit: float, - min_stake: float | None, max_stake: float, - current_entry_rate: float, current_exit_rate: float, - current_entry_profit: float, current_exit_profit: float, - **kwargs) -> float | None | tuple[float | None, str | None]: - """ - 动态调整仓位:支持加仓、减仓、追踪止损和最大持仓时间限制 - 参数: - - trade: 当前交易对象 - - current_time: 当前时间 - - current_rate: 当前价格 - - current_profit: 当前总盈利 - - min_stake: 最小下注金额 - - max_stake: 最大下注金额 - - current_entry_rate: 当前入场价格 - - current_exit_rate: 当前退出价格 - - current_entry_profit: 当前入场盈利 - - current_exit_profit: 当前退出盈利 - 返回: - - 调整金额(正数为加仓,负数为减仓)或 None - """ - pair = trade.pair - dataframe = self.dp.get_pair_dataframe(pair, self.timeframe) - trend_score = self.get_market_trend(dataframe=dataframe, metadata={'pair': pair}) - hold_time = (current_time - trade.open_date_utc).total_seconds() / 60 - profit_ratio = (current_rate - trade.open_rate) / trade.open_rate - - # 检测趋势状态(必须先定义才能使用) - trend_status = self.detect_trend_status(dataframe, {'pair': pair}) - logger.info(f"{pair} 当前趋势状态: {trend_status}") - - # 检测当前持仓订单数量 - open_trades = len(self.active_trades) if hasattr(self, 'active_trades') else 0 - - # 牛市绿色通道判断:持仓≤2个且牛市趋势,且绿色通道开关开启 - is_green_channel = (trend_status == "bullish" and open_trades <= 2 and self.GREEN_CHANNEL_ENABLED) - - # 根据绿色通道调整入场金额 - if is_green_channel: - initial_stake_amount = 25.0 # 绿色通道:25USDT入场 - logger.info(f"{pair} 🟢 牛市绿色通道:25USDT入场,当前持仓{open_trades}个") - else: - initial_stake_amount = 75.0 # 正常通道:75USDT入场 - logger.info(f"{pair} 首次入场金额: {initial_stake_amount:.2f}, 当前持仓金额: {trade.stake_amount:.2f}, " - f"加仓次数: {trade.nr_of_successful_entries - 1}, 趋势得分: {trend_score:.2f}") - - # 根据趋势状态调整仓位管理参数 - if trend_status == "bullish": - # 上涨趋势:积极加仓,放宽止盈 - max_entry_adjustments = min(self.MAX_ENTRY_POSITION_ADJUSTMENT + 1, 5) # 允许更多加仓 - add_position_threshold = self.ADD_POSITION_THRESHOLD * 1.3 # 更宽松的加仓条件 - exit_position_ratio = self.EXIT_POSITION_RATIO * 1.4 # 更高的止盈目标 - trailing_stop_start = self.TRAILING_STOP_START * 1.2 # 更高的启动阈值 - trailing_stop_distance = self.TRAILING_STOP_DISTANCE * 1.5 # 更大的回撤容忍 - logger.info(f"{pair} 🚀 上涨趋势仓位管理参数: max_entries={max_entry_adjustments}, add_thresh={add_position_threshold:.4f}, exit_ratio={exit_position_ratio:.2%}") - - elif trend_status == "bearish": - # 下跌趋势:谨慎加仓,严格止盈 - max_entry_adjustments = max(self.MAX_ENTRY_POSITION_ADJUSTMENT - 1, 1) # 减少加仓次数 - add_position_threshold = self.ADD_POSITION_THRESHOLD * 0.7 # 更严格的加仓条件 - exit_position_ratio = self.EXIT_POSITION_RATIO * 0.8 # 更低的止盈目标 - trailing_stop_start = self.TRAILING_STOP_START * 0.8 # 更低的启动阈值 - trailing_stop_distance = self.TRAILING_STOP_DISTANCE * 0.7 # 更严格的止损 - logger.info(f"{pair} 📉 下跌趋势仓位管理参数: max_entries={max_entry_adjustments}, add_thresh={add_position_threshold:.4f}, exit_ratio={exit_position_ratio:.2%}") - - else: # ranging - # 震荡趋势:使用标准参数 - max_entry_adjustments = self.MAX_ENTRY_POSITION_ADJUSTMENT - add_position_threshold = self.ADD_POSITION_THRESHOLD - exit_position_ratio = self.EXIT_POSITION_RATIO - trailing_stop_start = self.TRAILING_STOP_START - trailing_stop_distance = self.TRAILING_STOP_DISTANCE - logger.info(f"{pair} ⚖️ 震荡趋势仓位管理参数: max_entries={max_entry_adjustments}, add_thresh={add_position_threshold:.4f}, exit_ratio={exit_position_ratio:.2%}") - - # 加仓逻辑 - if trade.nr_of_successful_entries <= max_entry_adjustments + 1: - # 动态调整加仓阈值 - if trend_status == "bullish": - add_threshold = 90 - 20 * (trend_score / 100) # 上涨趋势下更宽松 - elif trend_status == "bearish": - add_threshold = 70 - 30 * (trend_score / 100) # 下跌趋势下更谨慎 - else: - add_threshold = 80 - 30 * (trend_score / 100) # 震荡趋势标准 - - if profit_ratio <= add_position_threshold and hold_time > 5 and trend_score <= add_threshold: - add_count = trade.nr_of_successful_entries - 1 - - # 根据趋势状态调整加仓倍数 - if trend_status == "bullish": - multipliers = [1.5, 3, 6] # 上涨趋势下更保守的加仓 - elif trend_status == "bearish": - multipliers = [1, 2, 4] # 下跌趋势下更激进的加仓(抄底) - else: - multipliers = [2, 4, 8] # 震荡趋势标准加仓 - - if add_count < len(multipliers): - multiplier = multipliers[add_count] - add_amount = initial_stake_amount * multiplier - - if min_stake is not None and add_amount < min_stake: - logger.warning(f"{pair} 加仓金额 {add_amount:.2f} 低于最小下注金额 {min_stake:.2f}") - return (None, f"Add amount {add_amount:.2f} below min_stake {min_stake:.2f}") - if add_amount > max_stake: - add_amount = max_stake - - logger.info(f"{pair} 趋势状态: {trend_status}, 价格下跌 {profit_ratio*100:.2f}%,触发第 {add_count + 1} 次加仓 {add_amount:.2f}") - return (add_amount, f"Trend: {trend_status}, Price dropped {profit_ratio*100:.2f}%, add {add_amount:.2f}") - - # 减仓逻辑 - if profit_ratio >= exit_position_ratio: - # 根据趋势状态调整减仓比例 - if trend_status == "bullish": - reduce_factor = 0.5 # 上涨趋势下只减仓50% - elif trend_status == "bearish": - reduce_factor = 1.0 # 下跌趋势下全部减仓 - else: - reduce_factor = 0.8 # 震荡趋势下减仓80% - - reduce_amount = -trade.stake_amount * reduce_factor - logger.info(f"{pair} 趋势状态: {trend_status}, 利润 {profit_ratio*100:.2f}%,减仓 {abs(reduce_amount):.2f} ({reduce_factor*100:.0f}%)") - return (reduce_amount, f"Trend: {trend_status}, Profit {profit_ratio*100:.2f}%, reduce {abs(reduce_amount):.2f}") - - # 追踪止损逻辑 - if profit_ratio >= trailing_stop_start and not self.trailing_stop_enabled: - self.trailing_stop_enabled = True - trade.adjust_min_max_rates(current_rate, current_rate) - logger.info(f"{pair} 趋势状态: {trend_status}, 价格上涨超过 {trailing_stop_start*100:.1f}%,启动追踪止损") - return None - - if self.trailing_stop_enabled: - max_rate = trade.max_rate or current_rate - trailing_stop_price = max_rate * (1 - trailing_stop_distance) - if current_rate < trailing_stop_price: - logger.info(f"{pair} 趋势状态: {trend_status}, 价格回落至 {trailing_stop_price:.6f},触发全部卖出") - return (-trade.stake_amount, f"Trend: {trend_status}, Trailing stop at {trailing_stop_price:.6f}") - trade.adjust_min_max_rates(current_rate, trade.min_rate) - return None - - return None - - def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, - time_in_force: str, current_time: datetime, **kwargs) -> bool: - # 调试日志:记录输入参数 - logger.info(f"[{pair}] confirm_trade_entry called with rate={rate}, type(rate)={type(rate)}, " - f"amount={amount}, order_type={order_type}, time_in_force={time_in_force}") - - # 检查 rate 是否有效 - if not isinstance(rate, (float, int)) or rate is None: - logger.error(f"[{pair}] Invalid rate value: {rate} (type: {type(rate)}). Skipping trade entry.") - return False - - # 获取当前数据 - dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) - last_candle = dataframe.iloc[-1] - - market_trend_score = self.get_market_trend(dataframe=DataFrame, metadata={'pair': pair}) - # 修正逻辑:趋势得分越低(熊市),冷却期越长;得分越高(牛市),冷却期越短 - cooldown_period_minutes = self.COOLDOWN_PERIOD_MINUTES if market_trend_score < 50 else self.COOLDOWN_PERIOD_MINUTES // 2 - - if pair in self.last_entry_time: - last_time = self.last_entry_time[pair] - if (current_time - last_time).total_seconds() < cooldown_period_minutes * 60: - logger.info(f"[{pair}] 冷却期内({cooldown_period_minutes} 分钟),跳过本次入场") - return False - - self.last_entry_time[pair] = current_time - self.trailing_stop_enabled = False - try: - logger.info(f"[{pair}] 确认入场,价格:{float(rate):.6f}") - except (ValueError, TypeError) as e: - logger.error(f"[{pair}] Failed to format rate: {rate} (type: {type(rate)}), error: {e}") - return False - - return True - def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, - rate: float, time_in_force: str, exit_reason: str, - current_time: datetime, **kwargs) -> bool: - adjusted_rate = rate * (1 + 0.0025) - logger.info(f"[{pair}] 退出交易,原因:{exit_reason}, 原始利润:{trade.calc_profit_ratio(rate):.2%},"f"调整后卖出价:{adjusted_rate:.6f}") - return True - - def custom_roi(self, trade: Trade, current_profit: float, current_time: datetime, trade_dur: int, - current_rate: float = None, min_stake: float | None = None, max_stake: float | None = None) -> dict: - """ - 动态调整 ROI 表格,基于 FreqAI 预测的 &-price_value_divergence 和 RSI。 - - 负的 divergence(预测上涨)或低 RSI 时提高 ROI。 - - 正的 divergence(预测下跌)或高 RSI 时降低 ROI。 - - 长时间持仓降低 ROI 目标。 - """ - pair = trade.pair - logger.info(f"[{pair}] 计算自定义 ROI,当前盈利: {current_profit:.2%}, 持仓时间: {trade_dur} 分钟") - - # 获取最新数据 - dataframe = self.dp.get_pair_dataframe(pair=pair, timeframe=self.timeframe) - dataframe = self.populate_indicators(dataframe, {'pair': pair}) # 计算指标 - - # 获取 FreqAI 预测和 RSI - divergence = dataframe["&-price_value_divergence"].iloc[-1] if "&-price_value_divergence" in dataframe else 0 - rsi = dataframe["rsi"].iloc[-1] if "rsi" in dataframe else 50 - - # 计算调整系数 - # 1. Divergence 调整:负值(预测上涨)-> 提高 ROI,正值(预测下跌)-> 降低 ROI - divergence_factor = self.linear_map(divergence, -0.1, 0.1, 1.2, 0.8) - - # 2. RSI 调整:低 RSI(超卖)-> 提高 ROI,高 RSI(超买)-> 降低 ROI - rsi_factor = self.linear_map(rsi, 30, 70, 1.2, 0.8) - - # 3. 时间调整:持仓时间越长,ROI 目标降低 - time_factor = self.linear_map(trade_dur, 0, 240, 1.0, 0.7) # 4小时后 ROI 降低到 70% - - # 综合调整系数 - roi_factor = divergence_factor * rsi_factor * time_factor - - # 默认 ROI 表格 - base_roi = { - 0: 0.06, - 30: 0.04, - 90: 0.025, - 270: 0.002 - } - - # 动态调整 ROI,限制在 0% 到 20% 之间 - dynamic_roi = {time: min(max(roi * roi_factor, 0.0), 0.2) for time, roi in base_roi.items()} - - logger.info(f"[{pair}] Divergence: {divergence:.4f}, RSI: {rsi:.2f}, 持仓时间: {trade_dur} 分钟, " - f"调整系数: divergence={divergence_factor:.2f}, rsi={rsi_factor:.2f}, time={time_factor:.2f}, " - f"总系数={roi_factor:.2f}, 动态 ROI 表格: {dynamic_roi}") - return dynamic_roi - - def custom_entry_price(self, pair: str, trade: Trade | None, current_time: datetime, proposed_rate: float, - entry_tag: str | None, side: str, **kwargs) -> float: - adjusted_rate = proposed_rate * (1 - 0.005) - logger.info(f"[{pair}] 自定义买入价:{adjusted_rate:.6f}(原价:{proposed_rate:.6f})") - return adjusted_rate - - def custom_exit_price(self, pair: str, trade: Trade, - current_time: datetime, proposed_rate: float, - current_profit: float, exit_tag: str | None, **kwargs) -> float: - adjusted_rate = proposed_rate * (1 + 0.0025) - logger.info(f"[{pair}] 自定义卖出价:{adjusted_rate:.6f}(原价:{proposed_rate:.6f})") - return adjusted_rate + # 综合熊市信号 + bearish_signal = (cond_a & cond_b) | (cond_a & cond_c) | (cond_b & cond_c) + return bearish_signal def get_market_trend(self, dataframe: DataFrame = None, metadata: dict = None) -> int: + """计算市场趋势得分""" try: timeframes = ["3m", "15m", "1h"] weights = {"3m": 0.3, "15m": 0.35, "1h": 0.35} trend_scores = {} pair = metadata.get('pair', 'Unknown') if metadata else 'Unknown' - - # 检查 pair 是否有效 - if pair == 'Unknown': - logger.error(f"[{pair}] Invalid pair in metadata: {metadata}. Returning default score 50") - return 50 - - logger.info(f"[{pair}] 正在计算多时间框架市场趋势得分") - + for tf in timeframes: - # 优先使用传入的 dataframe(如果匹配主时间框架),否则加载目标币对数据 pair_df = dataframe if tf == self.timeframe and dataframe is not None else self.dp.get_pair_dataframe(pair=pair, timeframe=tf) - + min_candles = 200 if tf == "3m" else 100 if tf == "15m" else 50 if pair_df.empty or len(pair_df) < min_candles: - logger.warning(f"[{pair}] 数据不足({tf}),使用默认得分:50") trend_scores[tf] = 50 continue - + # 价格趋势 ema_short_period = 50 if tf == "3m" else 20 if tf == "15m" else 12 ema_long_period = 200 if tf == "3m" else 80 if tf == "15m" else 50 - pair_df["ema_short"] = ta.EMA(pair_df, timeperiod=ema_short_period) - pair_df["ema_long"] = ta.EMA(pair_df, timeperiod=ema_long_period) - pair_df["ema_short_slope"] = (pair_df["ema_short"] - pair_df["ema_short"].shift(10)) / pair_df["ema_short"].shift(10) - - price_above_ema = pair_df["close"].iloc[-1] > pair_df["ema_long"].iloc[-1] - ema_short_above_ema_long = pair_df["ema_short"].iloc[-1] > pair_df["ema_long"].iloc[-1] - ema_short_slope = pair_df["ema_short_slope"].iloc[-1] - + + ema_short = ta.EMA(pair_df, timeperiod=ema_short_period) + ema_long = ta.EMA(pair_df, timeperiod=ema_long_period) + ema_short_slope = (ema_short - ema_short.shift(10)) / ema_short.shift(10) + + price_above_ema = pair_df["close"].iloc[-1] > ema_long.iloc[-1] + ema_short_above_ema_long = ema_short.iloc[-1] > ema_long.iloc[-1] + ema_short_slope_val = ema_short_slope.iloc[-1] + price_score = 0 if price_above_ema: price_score += 20 if ema_short_above_ema_long: price_score += 20 - if ema_short_slope > 0.005: + if ema_short_slope_val > 0.005: price_score += 15 - elif ema_short_slope < -0.005: + elif ema_short_slope_val < -0.005: price_score -= 15 - + # K线形态 - pair_df["bullish_engulfing"] = ( + bullish_engulfing = ( (pair_df["close"].shift(1) < pair_df["open"].shift(1)) & (pair_df["close"] > pair_df["open"]) & (pair_df["close"] > pair_df["open"].shift(1)) & (pair_df["open"] < pair_df["close"].shift(1)) ).fillna(False) - pair_df["bearish_engulfing"] = ( + + bearish_engulfing = ( (pair_df["close"].shift(1) > pair_df["open"].shift(1)) & (pair_df["close"] < pair_df["open"]) & (pair_df["close"] < pair_df["open"].shift(1)) & (pair_df["open"] > pair_df["close"].shift(1)) ).fillna(False) - + kline_score = 0 - if pair_df["bullish_engulfing"].iloc[-1]: + if bullish_engulfing.iloc[-1]: kline_score += 15 - elif pair_df["bearish_engulfing"].iloc[-1]: + elif bearish_engulfing.iloc[-1]: kline_score -= 15 - volatility = pair_df["close"].pct_change(10, fill_method=None).std() * 100 - if volatility > 0.5: - kline_score += 10 if price_score > 0 else -10 - + # StochRSI stochrsi = ta.STOCHRSI(pair_df, timeperiod=14, fastk_period=3, fastd_period=3) - pair_df["stochrsi_k"] = stochrsi["fastk"] - pair_df["stochrsi_d"] = stochrsi["fastd"] - + stochrsi_k = stochrsi["fastk"].iloc[-1] + stochrsi_d = stochrsi["fastd"].iloc[-1] + stochrsi_score = 0 - stochrsi_k = pair_df["stochrsi_k"].iloc[-1] - stochrsi_d = pair_df["stochrsi_d"].iloc[-1] if stochrsi_k > 80 and stochrsi_k < stochrsi_d: stochrsi_score -= 15 elif stochrsi_k < 20 and stochrsi_k > stochrsi_d: @@ -1400,408 +590,107 @@ class FreqaiPrimer(IStrategy): stochrsi_score += 5 elif stochrsi_k < 50: stochrsi_score -= 5 - + # 量价关系 - pair_df["volume_mean_20"] = pair_df["volume"].rolling(20).mean() - pair_df["volume_std_20"] = pair_df["volume"].rolling(20).std() - pair_df["volume_z_score"] = (pair_df["volume"] - pair_df["volume_mean_20"]) / pair_df["volume_std_20"] - pair_df["adx"] = ta.ADX(pair_df, timeperiod=14) - + volume_mean = pair_df["volume"].rolling(20).mean() + volume_std = pair_df["volume"].rolling(20).std() + volume_z_score = (pair_df["volume"] - volume_mean) / volume_std.replace(0, 1) + adx = ta.ADX(pair_df, timeperiod=14) + volume_score = 0 - if pair_df["volume_z_score"].iloc[-1] > 1.5: + if volume_z_score.iloc[-1] > 1.5: volume_score += 10 if price_score > 0 else -10 - if pair_df["adx"].iloc[-1] > 25: + if adx.iloc[-1] > 25: volume_score += 10 if price_score > 0 else -10 - + # 综合得分 raw_score = price_score + kline_score + stochrsi_score + volume_score raw_score = max(min(raw_score, 50), -50) - - # 对数映射到 [0, 100] + + # 映射到0-100 if raw_score >= 0: mapped_score = 50 + 50 * (np.log1p(raw_score / 50) / np.log1p(1)) else: mapped_score = 50 * (np.log1p(-raw_score / 50) / np.log1p(1)) - + trend_scores[tf] = max(0, min(100, int(round(mapped_score)))) - logger.info(f"[{pair}] {tf} 趋势得分:{trend_scores[tf]}, 原始得分:{raw_score}, " - f"价格得分:{price_score}, K线得分:{kline_score}, " - f"StochRSI得分:{stochrsi_score}, 量价得分:{volume_score}") - - # 动态调整权重 - if trend_scores.get("1h", 50) - trend_scores.get("3m", 50) > 20 or trend_scores.get("15m", 50) - trend_scores.get("3m", 50) > 20: - weights = {"3m": 0.2, "15m": 0.35, "1h": 0.45} - logger.info(f"[{pair}] 1h 趋势得分({trend_scores.get('1h', 50)})显著高于 3m({trend_scores.get('3m', 50)}),调整权重为 {weights}") - + # 加权融合 - final_score = sum(trend_scores[tf] * weights[tf] for tf in timeframes) - final_score = int(round(final_score)) - final_score = max(0, min(100, final_score)) - - logger.info(f"[{pair}] 最终趋势得分:{final_score}, " - f"3m得分:{trend_scores.get('3m', 50)}, 15m得分:{trend_scores.get('15m', 50)}, " - f"1h得分:{trend_scores.get('1h', 50)}") + final_score = sum(trend_scores.get(tf, 50) * weights[tf] for tf in timeframes) + final_score = max(0, min(100, int(round(final_score)))) + return final_score - + except Exception as e: - logger.error(f"[{pair}] 获取市场趋势失败:{e}", exc_info=True) + logger.error(f"获取市场趋势失败: {e}") return 50 - def get_trend_score_with_cache(self, pair: str, timeframe: str, timestamp: int, dataframe: DataFrame, metadata: dict) -> float: - """ - 三级缓存架构获取趋势得分: - 1. 本地内存缓存(一级缓存)- 最快,无网络开销 - 2. Redis缓存(二级缓存)- 局域网共享,减少重复计算 - 3. 实时计算(三级缓存)- 最慢,但保证准确性 - """ - - # 初始化本地缓存(如果尚未初始化) - if not hasattr(self, '_local_cache'): - self._local_cache = {} - self._local_cache_stats = {'hits': 0, 'misses': 0, 'redis_hits': 0, 'computes': 0} - logger.info("🚀 初始化本地内存缓存") - - # 创建 Redis 客户端(如果尚未创建) - if not hasattr(self, 'redis_client') and self.redis_url: - try: - self.redis_client = redis.from_url(self.redis_url) - logger.info("✅ Redis 客户端已成功初始化") - except Exception as e: - logger.error(f"❌ 初始化 Redis 客户端失败: {e}") - # Redis 失败时继续运行,降级为本地缓存 - self.redis_client = None - - # 生成统一的缓存键 - strategy_name = "freqaiprimer" - timeframes_str = "3m-15m-1h" - # 将时间戳精度从秒改为分钟,去掉最后两位秒数 - minute_timestamp = int(timestamp // 60) * 60 # 向下取整到分钟 - cache_key = f"{strategy_name}|trend_score|{pair.replace('/', '-')}|{timeframes_str}|{minute_timestamp}" - logger.debug(f"[{pair}] 生成缓存键:{cache_key} (时间戳: {minute_timestamp})") - - # 🎯 一级缓存:本地内存检查 - if cache_key in self._local_cache: - cached_score = self._local_cache[cache_key] - self._local_cache_stats['hits'] += 1 - logger.info(f"[{pair}] 🟢 本地缓存命中:key={cache_key}, value={cached_score:.2f}") - return cached_score - - self._local_cache_stats['misses'] += 1 - logger.debug(f"[{pair}] 本地缓存未命中:key={cache_key}") - - # 🎯 二级缓存:Redis检查 - redis_client = self._get_redis_client() - if redis_client: - logger.debug(f"[{pair}] 尝试从 Redis 查询趋势得分,key={cache_key}") - try: - cached_score = redis_client.get(cache_key) - if cached_score is not None: - score = float(cached_score) - # 同时写入本地缓存,加速后续访问 - self._local_cache[cache_key] = score - self._local_cache_stats['redis_hits'] += 1 - logger.info(f"[{pair}] 🟡 Redis 缓存命中:key={cache_key}, value={score:.2f} (已同步到本地缓存)") - return score - else: - logger.info(f"[{pair}] 🔴 Redis 缓存未命中:key={cache_key}") - except Exception as e: - logger.error(f"[{pair}] Redis 查询失败,key={cache_key}, 错误: {e}") - # Redis 失败时继续运行,降级为本地缓存 - - # 🎯 三级缓存:实时计算 - logger.info(f"[{pair}] 开始计算趋势得分 (时间戳: {timestamp})") - trend_score = self.get_market_trend(dataframe=dataframe, metadata=metadata) - self._local_cache_stats['computes'] += 1 - logger.info(f"[{pair}] ✅ 成功计算趋势得分: {trend_score:.2f}") - - # 将结果写入两级缓存 - self._local_cache[cache_key] = trend_score - - redis_client = self._get_redis_client() - if redis_client: - logger.debug(f"[{pair}] 尝试将趋势得分写入 Redis,key={cache_key}, value={trend_score:.2f}") - try: - redis_client.setex(cache_key, 86400, trend_score) - logger.info(f"[{pair}] ✅ 成功将趋势得分存储到 Redis,key={cache_key}, value={trend_score:.2f}") - except Exception as e: - logger.error(f"[{pair}] Redis 写入失败,key={cache_key}, 错误: {e}") - # Redis 失败时继续运行 - - # 定期打印缓存统计信息 - total_requests = sum(self._local_cache_stats.values()) - if total_requests % 100 == 0 and total_requests > 0: - hit_rate = (self._local_cache_stats['hits'] + self._local_cache_stats['redis_hits']) / total_requests * 100 - logger.info(f"📊 缓存统计 - 本地命中率: {self._local_cache_stats['hits']/total_requests*100:.1f}%, " - f"Redis命中率: {self._local_cache_stats['redis_hits']/total_requests*100:.1f}%, " - f"计算次数: {self._local_cache_stats['computes']}, " - f"总命中率: {hit_rate:.1f}%") - - return trend_score - - def cleanup_local_cache(self): - """ - 清理本地缓存,防止内存泄漏 - """ - if hasattr(self, '_local_cache'): - cache_size = len(self._local_cache) - self._local_cache.clear() - logger.info(f"🧹 已清理本地缓存,清除条目数: {cache_size}") - - def get_cache_stats(self) -> dict: - """ - 获取缓存统计信息 - """ - if hasattr(self, '_local_cache_stats'): - total_requests = sum(self._local_cache_stats.values()) - if total_requests > 0: - return { - 'local_hits': self._local_cache_stats['hits'], - 'redis_hits': self._local_cache_stats['redis_hits'], - 'misses': self._local_cache_stats['misses'], - 'computes': self._local_cache_stats['computes'], - 'total_requests': total_requests, - 'local_hit_rate': self._local_cache_stats['hits'] / total_requests * 100, - 'redis_hit_rate': self._local_cache_stats['redis_hits'] / total_requests * 100, - 'overall_hit_rate': (self._local_cache_stats['hits'] + self._local_cache_stats['redis_hits']) / total_requests * 100 - } - return {'local_hits': 0, 'redis_hits': 0, 'misses': 0, 'computes': 0, 'total_requests': 0} - - def bot_loop_start(self, current_time: datetime, **kwargs) -> None: - """ - 每个交易循环开始时调用,用于自动清理本地缓存 - """ - if hasattr(self, '_local_cache') and self._local_cache: - cache_size = len(self._local_cache) - - # 每1000次循环清理一次,或当缓存超过500条时清理 - if not hasattr(self, '_loop_counter'): - self._loop_counter = 0 - - self._loop_counter += 1 - - if self._loop_counter % 1000 == 0 or cache_size > 500: - logger.info(f"🧹 自动清理本地缓存 - 循环次数: {self._loop_counter}, 缓存大小: {cache_size}") - self.cleanup_local_cache() - - # 打印缓存统计 - stats = self.get_cache_stats() - if stats['total_requests'] > 0: - logger.info(f"📊 缓存性能统计: 本地命中率 {stats['local_hit_rate']:.1f}%, " - f"Redis命中率 {stats['redis_hit_rate']:.1f}%, " - f"总命中率 {stats['overall_hit_rate']:.1f}%") - - def bot_start(self, **kwargs) -> None: - """ - 机器人启动时调用,初始化缓存相关设置 - """ - logger.info("🚀 策略启动 - 初始化多级缓存系统") - - # 初始化本地缓存 - if not hasattr(self, '_local_cache'): - self._local_cache = {} - self._local_cache_stats = {'hits': 0, 'misses': 0, 'redis_hits': 0, 'computes': 0} - logger.info("✅ 本地内存缓存已初始化") - - # 设置最大本地缓存大小 - self._max_local_cache_size = 500 - - # 初始化循环计数器 - self._loop_counter = 0 - - def bot_stop(self, **kwargs) -> None: - """ - 机器人停止时调用,清理所有缓存 - """ - logger.info("🛑 策略停止 - 清理所有缓存") - - # 清理本地缓存 - if hasattr(self, '_local_cache'): - cache_size = len(self._local_cache) - self.cleanup_local_cache() - - # 打印最终统计 - stats = self.get_cache_stats() - if stats['total_requests'] > 0: - logger.info(f"📊 最终缓存统计 - 总请求: {stats['total_requests']}, " - f"本地命中: {stats['local_hits']}, " - f"Redis命中: {stats['redis_hits']}, " - f"计算次数: {stats['computes']}, " - f"总命中率: {stats['overall_hit_rate']:.1f}%") - - # 注意:由于使用延迟初始化,无需关闭持久化的Redis连接 - def detect_trend_status(self, dataframe: DataFrame, metadata: dict) -> str: - """ - 基于分类模型优化 first_length 的趋势检测 - 规则: - - 使用 LightGBMClassifier 预测最优 first_length 类别 - - 类别映射:0:激进(2), 1:中性(4), 2:稳健(6), 3:保守(8), 4:极保守(10) - - 根据预测的类别动态调整段长,而非固定使用2 - """ + """检测趋势状态""" pair = metadata.get('pair', 'Unknown') - # 检查数据完整性 if len(dataframe) == 0: - logger.warning(f"[{pair}] ⚠️ 数据为空,返回震荡趋势") return "ranging" try: - # 使用分类模型预测最优 first_length - # 检查所有可能的分类列(精确匹配实际列名格式) - classification_cols = [col for col in dataframe.columns - if col in ["&*-optimal_first_length", "optimal_first_length_pred"]] - if classification_cols: - logger.info(f"[{pair}] 📊 检测到分类相关列: {classification_cols}") - else: - logger.debug(f"[{pair}] 📊 未检测到分类相关列") - - # 调试:显示所有包含"optimal"的列名 - all_optimal_cols = [col for col in dataframe.columns if "optimal" in str(col).lower()] - if all_optimal_cols: - logger.debug(f"[{pair}] 所有包含'optimal'的列: {all_optimal_cols}") - else: - logger.debug(f"[{pair}] 未找到任何包含'optimal'的列") - - if "optimal_first_length_pred" in dataframe.columns: - predicted_value = dataframe["optimal_first_length_pred"].iloc[-1] - if pd.notna(predicted_value): - # 处理浮点预测值,四舍五入到最近的整数类别 - optimal_length_class = int(round(float(predicted_value))) - length_mapping = {0: 2, 1: 4, 2: 6, 3: 8, 4: 10} - first_length = length_mapping.get(optimal_length_class, 2) - logger.info(f"[{pair}] ✅ 使用 optimal_first_length_pred: first_length={first_length} (原始值: {predicted_value}, 类别: {optimal_length_class})") - else: - first_length = 2 - logger.warning(f"[{pair}] ⚠️ optimal_first_length_pred 为NaN,使用默认值: {first_length}") - elif "&*-optimal_first_length" in dataframe.columns: - # 直接检查原始列 + # 获取分类模型预测 + if "&*-optimal_first_length" in dataframe.columns: predicted_value = dataframe["&*-optimal_first_length"].iloc[-1] if pd.notna(predicted_value): optimal_length_class = int(round(float(predicted_value))) length_mapping = {0: 2, 1: 4, 2: 6, 3: 8, 4: 10} first_length = length_mapping.get(optimal_length_class, 2) - logger.info(f"[{pair}] ✅ 使用 &*-optimal_first_length: first_length={first_length} (原始值: {predicted_value}, 类别: {optimal_length_class})") else: first_length = 2 - logger.warning(f"[{pair}] ⚠️ &*-optimal_first_length 为NaN,使用默认值: {first_length}") else: first_length = 2 - # 只在首次或特定条件下显示警告 - if not hasattr(self, '_classification_warning_shown'): - logger.warning(f"[{pair}] ⚠️ 未找到分类模型列,使用默认值: {first_length}") - self._classification_warning_shown = True - else: - logger.debug(f"[{pair}] 未找到分类模型列,使用默认值: {first_length}") - - # 根据 first_length 动态调整其他段长 + + # 计算段长 second_length = first_length * 3 third_length = first_length * 5 - - # 计算总长度 total_length_needed = first_length + second_length + third_length - # 检查数据是否充足 if len(dataframe) < total_length_needed: - logger.warning(f"[{pair}] 数据不足{total_length_needed}个周期,返回震荡趋势") return "ranging" - # 获取所需长度的trend_score历史 + # 获取趋势得分历史 trend_scores = [] - actual_total_length = len(dataframe) - for i in range(-total_length_needed, 0): - # 确保索引在有效范围内 - if abs(i) > actual_total_length: - logger.warning(f"[{pair}] 索引 {i} 超出数据范围,使用默认趋势得分 50") + hist_df = dataframe.iloc[:i+1] if i != -1 else dataframe + if len(hist_df) > 0: + timestamp = int(hist_df.index[-1].timestamp()) if hasattr(hist_df.index[-1], 'timestamp') else 0 + score = self.get_market_trend(dataframe=hist_df, metadata=metadata) + trend_scores.append(score) + else: trend_scores.append(50) - continue - - # 获取历史数据片段 - end_idx = i + 1 if i != -1 else None - hist_df = dataframe.iloc[:end_idx] - - if hist_df.empty: - logger.warning(f"[{pair}] 历史数据片段为空,使用默认趋势得分 50") - trend_scores.append(50) - continue - - # 获取时间戳 - 统一使用整数时间戳 - try: - last_idx = hist_df.index[-1] - if isinstance(last_idx, pd.Timestamp): - # 确保时间戳是无时区的整数 - ts = last_idx.tz_localize(None) if last_idx.tz else last_idx - timestamp = int(ts.timestamp()) - elif isinstance(last_idx, (int, np.integer)): - # 如果索引已经是整数,直接使用 - timestamp = int(last_idx) - elif hasattr(last_idx, 'timestamp'): - timestamp = int(last_idx.timestamp()) - else: - # 使用当前时间的整数时间戳 - timestamp = int(pd.Timestamp.utcnow().timestamp()) - except Exception as e: - # 使用当前时间的整数时间戳作为fallback - timestamp = int(pd.Timestamp.utcnow().timestamp()) - - # 获取趋势得分 - score = self.get_trend_score_with_cache( - pair=pair, - timeframe=self.timeframe, - timestamp=timestamp, - dataframe=hist_df, - metadata=metadata - ) - trend_scores.append(score) - # 验证结果数量 + # 验证数据 if len(trend_scores) < total_length_needed: - logger.warning(f"[{pair}] 只获取到 {len(trend_scores)} 个趋势得分,需要{total_length_needed}个") - while len(trend_scores) < total_length_needed: - trend_scores.append(50) + trend_scores.extend([50] * (total_length_needed - len(trend_scores))) # 分段计算加权得分 - # 第一段:最近first_length个周期 segment1 = trend_scores[-first_length:] - weighted_score1 = sum(score * 10 for score in segment1) / len(segment1) - - # 第二段:接下来的second_length个周期 segment2 = trend_scores[-(first_length + second_length):-first_length] - weighted_score2 = sum(score * 7 for score in segment2) / len(segment2) - - # 第三段:最后的third_length个周期 segment3 = trend_scores[-total_length_needed:-(first_length + second_length)] + + weighted_score1 = sum(score * 10 for score in segment1) / len(segment1) + weighted_score2 = sum(score * 7 for score in segment2) / len(segment2) weighted_score3 = sum(score * 3 for score in segment3) / len(segment3) - # 计算最终加权得分 - final_weighted_score = (weighted_score1 + weighted_score2 + weighted_score3) / (10 + 7 + 3) - - # 将得分映射到0-100区间 + final_weighted_score = (weighted_score1 + weighted_score2 + weighted_score3) / 20 final_score = max(0, min(100, final_weighted_score)) - # 使用hyperopt优化的阈值判断趋势状态 + # 使用优化的阈值判断趋势 bullish_threshold = self.trend_final_bullish_threshold.value bearish_threshold = self.trend_final_bearish_threshold.value - # 判定趋势状态 if final_score >= bullish_threshold: - trend_status = "bullish" - logger.info(f"[{pair}] 🚀 检测到上涨趋势: 最终加权得分={final_score:.2f}, 阈值≥{bullish_threshold}, 段长={first_length},{second_length},{third_length}") + return "bullish" elif final_score <= bearish_threshold: - trend_status = "bearish" - logger.info(f"[{pair}] 📉 检测到下跌趋势: 最终加权得分={final_score:.2f}, 阈值≤{bearish_threshold}, 段长={first_length},{second_length},{third_length}") + return "bearish" else: - trend_status = "ranging" - logger.info(f"[{pair}] ⚖️ 检测到震荡趋势: 最终加权得分={final_score:.2f}, 阈值范围({bearish_threshold}, {bullish_threshold}), 段长={first_length},{second_length},{third_length}") - - # 输出分段详细信息用于调试 - logger.debug(f"[{pair}] 趋势分析详情 - " - f"第一段({first_length}个,权重10): {[f'{s:.1f}' for s in segment1]}, " - f"第二段({second_length}个,权重7): {[f'{s:.1f}' for s in segment2]}, " - f"第三段({third_length}个,权重3): {[f'{s:.1f}' for s in segment3]}") - - return trend_status - + return "ranging" + except Exception as e: logger.error(f"[{pair}] 趋势状态检测失败: {e}") return "ranging"