import warnings warnings.filterwarnings("ignore", category=UserWarning, module="pandas_ta") import logging from freqtrade.strategy import IStrategy, IntParameter, DecimalParameter from pandas import DataFrame import pandas_ta as ta from freqtrade.persistence import Trade import numpy as np import datetime import pandas as pd import math logger = logging.getLogger(__name__) class FreqaiPrimer(IStrategy): # 策略参数 - 使用custom_roi替代minimal_roi字典 loglevel = "warning" # 启用自定义ROI回调函数 use_custom_roi = True # 写死在策略里,永远不参与 hyperopt trailing_stop = True trailing_stop_positive = 0.012 # 1.2% 固定回调(震荡市最稳) trailing_stop_positive_offset = 0.02 # 1.8% 盈利后才激活(防止过早启动) trailing_only_offset_is_reached = True # 必须等盈利超过 offset 才启动 trailing # 用于跟踪市场状态的数据框缓存 _dataframe_cache = None def __init__(self, config=None): """初始化策略参数,调用父类初始化方法并接受config参数""" super().__init__(config) # 调用父类的初始化方法并传递config assert self.h1_max_candles.value <= 50, f"h1_max_candles={self.h1_max_candles.value} 超出安全范围!" @property def protections(self): """ 保护机制配置 基于最新Freqtrade规范,保护机制应定义在策略文件中而非配置文件 """ return [ { "method": "StoplossGuard", "lookback_period_candles": 60, # 3小时回看期(60根3分钟K线) "trade_limit": 2, # 最多2笔止损交易 "stop_duration_candles": 60, # 暂停180分钟(60根3分钟K线) "only_per_pair": False # 仅针对单个币对 }, { "method": "CooldownPeriod", "stop_duration_candles": 18 # 6分钟冷却期(2根3分钟K线) }, { "method": "MaxDrawdown", "lookback_period_candles": 48, # 2.4小时回看期 "trade_limit": 4, # 4笔交易限制 "stop_duration_candles": 24, # 72分钟暂停(24根3分钟K线) "max_allowed_drawdown": 0.20 # 20%最大回撤容忍度 } ] @property def trailing_stop_positive(self): """根据市场状态动态调整跟踪止盈参数""" # 获取当前市场状态 if self._dataframe_cache is not None and len(self._dataframe_cache) > 0: current_state = self._dataframe_cache['market_state'].iloc[-1] if current_state == 'strong_bull': return 0.007 # 强劲牛市中降低跟踪止盈,让利润奔跑 elif current_state == 'weak_bull': return 0.005 # 弱势牛市中保持较低的跟踪止盈 return self._trailing_stop_positive_default # 返回默认值 @trailing_stop_positive.setter def trailing_stop_positive(self, value): """设置trailing_stop_positive的默认值""" self._trailing_stop_positive_default = value timeframe = "3m" # 主时间框架为 3 分钟 can_short = False # 禁用做空 # [propertiesGrp_List]-------------------------------------------------------------------------------------------------------------------------------------- # [propertiesGrp step="1" name="第一轮优化" epochs="160" space="buy " description="入场基础条件优化,入场确认条件优化"] bb_std = DecimalParameter(2.0, 5.0, decimals=1, default=3.5, optimize=True, load=True, space='buy') # 安全:2.0-5.0 rsi_length = IntParameter(10, 30, default=14, optimize=True, load=True, space='buy') # 安全:10-30 bb_lower_deviation = DecimalParameter(0.92, 1.15, decimals=3, default=1.00, optimize=True, load=True, space='buy') # 安全:0.92-1.15 stochrsi_bull_threshold = IntParameter(20, 50, default=40, optimize=True, load=True, space='buy') # 安全:20-50 volume_multiplier = DecimalParameter(1.5, 6.0, decimals=1, default=3.5, optimize=True, load=True, space='buy') # 安全:1.5-6.0 min_condition_count = IntParameter(1, 2, default=1, optimize=True, load=True, space='buy') # 最多只允许2个条件! bb_length = IntParameter(20, 60, default=40, optimize=True, load=True, space='buy') # 安全:20-60 # [/propertiesGrp] # [propertiesGrp step="2" name="第二轮优化 - 剧烈拉升检测" epochs="160" space="buy" description="防追高核心参数,绝对不能放宽!"] rsi_oversold = IntParameter(20, 50, default=30, optimize=True, load=True, space='buy') # 安全:20-50 rsi_bull_threshold = IntParameter(40, 68, default=58, optimize=True, load=True, space='buy') # 安全:40-68 stochrsi_neutral_threshold = IntParameter(15, 40, default=16, optimize=True, load=True, space='buy') # 安全:15-40 bb_width_threshold = DecimalParameter(0.003, 0.030, decimals=3, default=0.012, optimize=True, load=True, space='buy') # 安全:0.003-0.030 h1_max_candles = IntParameter(16, 50, default=35, optimize=True, load=True, space='buy') # 黄金区间!绝不能超过50 h1_rapid_rise_threshold = DecimalParameter(0.08, 0.22, decimals=3, default=0.15, optimize=True, load=True, space='buy') # 0.08-0.22 实盘最稳 h1_max_consecutive_candles = IntParameter(1, 2, default=2, optimize=True, load=True, space='buy') # 固定为1最稳,2也行 # [/propertiesGrp] # [propertiesGrp step="3" name="第三轮优化 - 加仓策略" epochs="160" space="buy" description="加仓精准度与金额管理,严防爆仓"] add_position_callback = DecimalParameter(0.025, 0.070, decimals=3, default=0.045, optimize=True, load=True, space='buy') # 2.5%-7.0% 回调才加 add_rsi_oversold_threshold = IntParameter(15, 40, default=25, optimize=True, load=True, space='buy') # 不能太低 add_stochrsi_oversold = IntParameter(10, 35, default=20, optimize=True, load=True, space='buy') add_bb_lower_proximity = DecimalParameter(0.85, 1.20, decimals=3, default=1.05, optimize=True, load=True, space='buy') # 不能离下轨太远 add_position_decrease_ratio= DecimalParameter(0.30, 0.80, decimals=2, default=0.55, optimize=True, load=True, space='buy') # 递减比例别太激进 max_entry_adjustments = IntParameter(2, 7, default=5, optimize=True, load=True, space='buy') # 最多7次加仓,防爆仓 adjust_multiplier = DecimalParameter(0.6, 1.6, decimals=2, default=1.0, optimize=True, load=True, space='buy') # 别让加仓金额指数爆炸 # [/propertiesGrp] # [propertiesGrp step="4" name="第四轮优化 - 出场与分级止盈" epochs="200" space="sell" description="出场条件与分级止盈,减仓与风险管理"] exit_bb_upper_deviation = DecimalParameter(0.90, 1.15, decimals=3, default=1.00, optimize=True, load=True, space='sell') exit_volume_multiplier = DecimalParameter(2.0, 7.0, decimals=1, default=4.5, optimize=True, load=True, space='sell') exit_rsi_threshold = IntParameter(55, 72, default=65, optimize=True, load=True, space='sell') # 牛市也能出得了场 exit_profit_tier1 = DecimalParameter(0.03, 0.12, decimals=3, default=0.06, optimize=True, load=True, space='sell') exit_reduce_tier1 = DecimalParameter(0.20, 0.70, decimals=2, default=0.50, optimize=True, load=True, space='sell') exit_profit_tier2 = DecimalParameter(0.08, 0.20, decimals=3, default=0.12, optimize=True, load=True, space='sell') exit_reduce_tier2 = DecimalParameter(0.15, 0.60, decimals=2, default=0.30, optimize=True, load=True, space='sell') reduce_profit_base = DecimalParameter(0.02, 0.12, decimals=3, default=0.05, optimize=True, load=True, space='sell') reduce_coefficient = DecimalParameter(0.15, 0.55, decimals=3, default=0.35, optimize=True, load=True, space='sell') max_reduce_adjustments = IntParameter(1, 4, default=3, optimize=True, load=True, space='sell') # 最多4次减仓就够了 # [/propertiesGrp] # [propertiesGrp step="5" name="第五轮优化" epochs="80" space="roi stoploss" description="最终ROI与止损微调"] # (这里可以放你后续要优化的ROI表、动态止损系数等) # [/propertiesGrp] # [/propertiesGrp_List]----------------------------------------------------------------------------------------------------------------------------- def informative_pairs(self): pairs = self.dp.current_whitelist() return [(pair, '15m') for pair in pairs] + [(pair, '1h') for pair in pairs] def _validate_dataframe_columns(self, dataframe: DataFrame, required_columns: list, metadata: dict): """ 验证数据框中是否包含所有需要的列。 如果缺少列,则记录警告日志。 """ missing_columns = [col for col in required_columns if col not in dataframe.columns] if missing_columns: logger.warning(f"[{metadata['pair']}] 数据框中缺少以下列: {missing_columns}") def custom_entry_price(self, pair: str, current_time: pd.Timestamp, proposed_rate: float, entry_tag: str | None, side: str, **kwargs) -> float: """ 自定义入场价格:给入场价格打98折(降低2%) 添加零值保护,防止除零错误 """ # 零值保护:如果proposed_rate为0或异常小值,直接返回原值 if proposed_rate <= 0 or proposed_rate < 1e-8: logger.warning(f"[{pair}] proposed_rate异常: {proposed_rate},返回原值") return proposed_rate if proposed_rate > 0 else 0.0 # 入场价格折扣:98折(降低2%) discounted_rate = proposed_rate * 0.995 return discounted_rate def custom_stake_amount(self, pair: str, current_time: pd.Timestamp, current_rate: float, proposed_stake: float, min_stake: float, max_stake: float, **kwargs) -> float: # 获取初始资金(回测中固定为dry_run_wallet的值) initial_balance = self.config.get('dry_run_wallet', 10000) # 始终以初始资金的3.75%计算 desired_stake = initial_balance * 0.0375 desired_stake = math.floor(desired_stake) # 取整,去掉小数点后的数字 return max(min(desired_stake, max_stake), min_stake) def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # 计算 3m 周期的指标 bb_length_value = self.bb_length.value bb_std_value = self.bb_std.value rsi_length_value = self.rsi_length.value bb_3m = ta.bbands(dataframe['close'], length=bb_length_value, std=bb_std_value) dataframe['bb_lower_3m'] = bb_3m[f'BBL_{bb_length_value}_{bb_std_value}'] dataframe['bb_upper_3m'] = bb_3m[f'BBU_{bb_length_value}_{bb_std_value}'] dataframe['rsi_3m'] = ta.rsi(dataframe['close'], length=rsi_length_value) # 新增 StochRSI 指标 stochrsi_3m = ta.stochrsi(dataframe['close'], length=rsi_length_value, rsi_length=rsi_length_value) dataframe['stochrsi_k_3m'] = stochrsi_3m[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3'] dataframe['stochrsi_d_3m'] = stochrsi_3m[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3'] # 新增 MACD 指标 macd_3m = ta.macd(dataframe['close'], fast=12, slow=26, signal=9) dataframe['macd_3m'] = macd_3m['MACD_12_26_9'] dataframe['macd_signal_3m'] = macd_3m['MACDs_12_26_9'] dataframe['macd_hist_3m'] = macd_3m['MACDh_12_26_9'] # 计算3m时间框架的EMA50和EMA200 dataframe['ema_50_3m'] = ta.ema(dataframe['close'], length=50) dataframe['ema_200_3m'] = ta.ema(dataframe['close'], length=200) # 成交量过滤 dataframe['volume_ma'] = dataframe['volume'].rolling(20).mean() # 计算 ATR 用于动态止损和退出 dataframe['atr'] = ta.atr(dataframe['high'], dataframe['low'], dataframe['close'], length=14) # 获取 15m 数据 df_15m = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='15m') df_15m['rsi_15m'] = ta.rsi(df_15m['close'], length=rsi_length_value) # 计算15m时间框架的EMA50和EMA200 df_15m['ema_50_15m'] = ta.ema(df_15m['close'], length=50) df_15m['ema_200_15m'] = ta.ema(df_15m['close'], length=200) # 新增 StochRSI 指标 stochrsi_15m = ta.stochrsi(df_15m['close'], length=rsi_length_value, rsi_length=rsi_length_value) df_15m['stochrsi_k_15m'] = stochrsi_15m[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3'] df_15m['stochrsi_d_15m'] = stochrsi_15m[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3'] # 新增 MACD 指标 macd_15m = ta.macd(df_15m['close'], fast=12, slow=26, signal=9) df_15m['macd_15m'] = macd_15m['MACD_12_26_9'] df_15m['macd_signal_15m'] = macd_15m['MACDs_12_26_9'] df_15m['macd_hist_15m'] = macd_15m['MACDh_12_26_9'] # 将 15m 数据重新索引到主时间框架 (3m) df_15m = df_15m.set_index('date').reindex(dataframe['date']).reset_index() df_15m = df_15m.rename(columns={'index': 'date'}) df_15m = df_15m[['date', 'rsi_15m', 'ema_50_15m', 'ema_200_15m']].ffill() # 合并 15m 数据 dataframe = dataframe.merge(df_15m, how='left', on='date') # 获取 1h 数据 df_1h = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1h') # 计算 1h 布林带 bb_1h = ta.bbands(df_1h['close'], length=bb_length_value, std=bb_std_value) df_1h['bb_lower_1h'] = bb_1h[f'BBL_{bb_length_value}_{bb_std_value}'] df_1h['bb_upper_1h'] = bb_1h[f'BBU_{bb_length_value}_{bb_std_value}'] # 计算 1h RSI 和 EMA df_1h['rsi_1h'] = ta.rsi(df_1h['close'], length=rsi_length_value) df_1h['ema_50_1h'] = ta.ema(df_1h['close'], length=50) # 1h 50周期EMA df_1h['ema_200_1h'] = ta.ema(df_1h['close'], length=200) # 1h 200周期EMA df_1h['trend_1h'] = df_1h['close'] > df_1h['ema_50_1h'] # 1h上涨趋势 # 新增 StochRSI 指标 stochrsi_1h = ta.stochrsi(df_1h['close'], length=rsi_length_value, rsi_length=rsi_length_value) df_1h['stochrsi_k_1h'] = stochrsi_1h[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3'] df_1h['stochrsi_d_1h'] = stochrsi_1h[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3'] # 新增 MACD 指标 macd_1h = ta.macd(df_1h['close'], fast=12, slow=26, signal=9) df_1h['macd_1h'] = macd_1h['MACD_12_26_9'] df_1h['macd_signal_1h'] = macd_1h['MACDs_12_26_9'] df_1h['macd_hist_1h'] = macd_1h['MACDh_12_26_9'] # 验证 MACD 列是否正确生成 #logger.info(f"[{metadata['pair']}] 1小时 MACD 列: {list(macd_1h.columns)}") # 确保 StochRSI 指标已正确计算 # 将 1h 数据重新索引到主时间框架 (3m),并填充缺失值 df_1h = df_1h.set_index('date').reindex(dataframe['date']).ffill().bfill().reset_index() df_1h = df_1h.rename(columns={'index': 'date'}) # Include macd_1h and macd_signal_1h in the column selection df_1h = df_1h[['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h', 'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h', 'macd_1h', 'macd_signal_1h']].ffill() # Validate that all required columns are present required_columns = ['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h', 'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h', 'macd_1h', 'macd_signal_1h'] missing_columns = [col for col in required_columns if col not in df_1h.columns] if missing_columns: logger.error(f"[{metadata['pair']}] 缺少以下列: {missing_columns}") raise KeyError(f"缺少以下列: {missing_columns}") # 确保所有需要的列都被合并 required_columns = ['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h', 'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h', 'macd_1h', 'macd_signal_1h'] # 验证所需列是否存在 missing_columns = [col for col in required_columns if col not in df_1h.columns] if missing_columns: logger.error(f"[{metadata['pair']}] 缺少以下列: {missing_columns}") raise KeyError(f"缺少以下列: {missing_columns}") df_1h = df_1h[required_columns] # 确保包含 macd_1h 和 macd_signal_1h # 合并 1h 数据 dataframe = dataframe.merge(df_1h, how='left', on='date').ffill() # 验证合并后的列 #logger.info(f"[{metadata['pair']}] 合并后的数据框列名: {list(dataframe.columns)}") # K线形态:看涨吞没 dataframe['bullish_engulfing'] = ( (dataframe['close'].shift(1) < dataframe['open'].shift(1)) & (dataframe['close'] > dataframe['open']) & (dataframe['close'] > dataframe['open'].shift(1)) & (dataframe['open'] < dataframe['close'].shift(1)) ) # 计算各时间框架的趋势状态(牛/熊) # 3m时间框架:ema50下穿ema200为熊,上穿为牛 dataframe['trend_3m'] = np.where(dataframe['ema_50_3m'] > dataframe['ema_200_3m'], 1, 0) # 15m时间框架:ema50下穿ema200为熊,上穿为牛 dataframe['trend_15m'] = np.where(dataframe['ema_50_15m'] > dataframe['ema_200_15m'], 1, 0) # 1h时间框架:ema50下穿ema200为熊,上穿为牛 dataframe['trend_1h_ema'] = np.where(dataframe['ema_50_1h'] > dataframe['ema_200_1h'], 1, 0) # 计算熊牛得分(0-100) # 权重:3m熊牛权重10,15m熊牛权重35,1h熊牛权重65 # 计算加权得分 dataframe['market_score'] = ( dataframe['trend_3m'] * 10 + dataframe['trend_15m'] * 35 + dataframe['trend_1h_ema'] * 65 ) # 确保得分在0-100范围内 dataframe['market_score'] = dataframe['market_score'].clip(lower=0, upper=100) # 根据得分分类市场状态 dataframe['market_state'] = 'neutral' dataframe.loc[dataframe['market_score'] > 70, 'market_state'] = 'strong_bull' dataframe.loc[(dataframe['market_score'] > 50) & (dataframe['market_score'] <= 70), 'market_state'] = 'weak_bull' dataframe.loc[(dataframe['market_score'] >= 30) & (dataframe['market_score'] <= 50), 'market_state'] = 'neutral' dataframe.loc[(dataframe['market_score'] > 10) & (dataframe['market_score'] < 30), 'market_state'] = 'weak_bear' dataframe.loc[dataframe['market_score'] <= 10, 'market_state'] = 'strong_bear' # 创建一个使用前一行市场状态的列,避免在populate_entry_trend中使用iloc[-1] dataframe['prev_market_state'] = dataframe['market_state'].shift(1) # 为第一行设置默认值 dataframe['prev_market_state'] = dataframe['prev_market_state'].fillna('neutral') # 记录当前的市场状态 # if len(dataframe) > 0: # current_score = dataframe['market_score'].iloc[-1] # current_state = dataframe['market_state'].iloc[-1] #logger.info(f"[{metadata['pair']}] 熊牛得分: {current_score:.1f}, 市场状态: {current_state}") #logger.info(f"[{metadata['pair']}] 各时间框架趋势: 3m={'牛' if dataframe['trend_3m'].iloc[-1] == 1 else '熊'}, \ # 15m={'牛' if dataframe['trend_15m'].iloc[-1] == 1 else '熊'}, \ # 1h={'牛' if dataframe['trend_1h_ema'].iloc[-1] == 1 else '熊'}") # 调试:打印指标值(最后 5 行),验证时间对齐 #print(f"Pair: {metadata['pair']}, Last 5 rows after reindexing:") #print(dataframe[['date', 'close', 'bb_lower_3m', 'rsi_3m', 'rsi_15m', 'rsi_1h', 'trend_1h', # 'trend_3m', 'trend_15m', 'trend_1h_ema', 'market_score', 'market_state', # 'bullish_engulfing', 'volume', 'volume_ma']].tail(5)) # 打印最终数据框的列名以验证 #logger.info(f"[{metadata['pair']}] 最终数据框列名: {list(dataframe.columns)}") return dataframe def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # 出场信号基于趋势和量价关系 # 确保market_state列存在 if 'market_state' not in dataframe.columns: dataframe['market_state'] = 'neutral' # ======================== 出场条件多维度评分 ======================== # 条件1: 价格突破布林带上轨(使用可优化的偏差参数) breakout_condition = dataframe['close'] >= dataframe['bb_upper_1h'] * self.exit_bb_upper_deviation.value # 条件2: 成交量显著放大(使用可优化的成交量乘数) volume_spike = dataframe['volume'] > dataframe['volume_ma'] * self.exit_volume_multiplier.value # 条件3: MACD 下降趋势 macd_downward = dataframe['macd_1h'] < dataframe['macd_signal_1h'] # 条件4: RSI 进入超买区域(市场自适应) # 根据市场状态调整RSI阈值 def get_exit_rsi_threshold(row): market_state = row.get('market_state', 'neutral') if market_state == 'strong_bull': return self.exit_rsi_threshold.value + 5 # 强牛市提高阈值,让利润奔跑 elif market_state == 'weak_bull': return self.exit_rsi_threshold.value else: return self.exit_rsi_threshold.value - 5 # 弱市降低阈值,及时止盈 rsi_thresholds = dataframe.apply(get_exit_rsi_threshold, axis=1) rsi_overbought = dataframe['rsi_1h'] > rsi_thresholds # 评分计算 condition_score = ( breakout_condition.astype(int) + volume_spike.astype(int) + macd_downward.astype(int) + rsi_overbought.astype(int) ) # 触发条件:至少1个条件满足(从≥2降低到≥1,极致放宽) final_condition = condition_score >= 1 # 设置出场信号 dataframe.loc[final_condition, 'exit_long'] = 1 # 增强调试信息 #logger.info(f"[{metadata['pair']}] 出场条件检查:") #logger.info(f" - 价格突破布林带上轨: {breakout_condition.sum()} 次") #logger.info(f" - 成交量显著放大: {volume_spike.sum()} 次") #logger.info(f" - MACD 下降趋势: {macd_downward.sum()} 次") #logger.info(f" - RSI 超买: {rsi_overbought.sum()} 次") #logger.info(f" - 最终条件: {final_condition.sum()} 次") #logger.info(f" - 使用参数: exit_bb_upper_deviation={self.exit_bb_upper_deviation.value}, exit_volume_multiplier={self.exit_volume_multiplier.value}, rsi_overbought={self.rsi_overbought.value}") return dataframe def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # 确保prev_market_state列存在 if 'prev_market_state' not in dataframe.columns: dataframe['prev_market_state'] = 'neutral' # 条件1: 价格接近布林带下轨(允许一定偏差) close_to_bb_lower_1h = (dataframe['close'] <= dataframe['bb_lower_1h'] * self.bb_lower_deviation.value) # 可优化偏差 # 条件2: RSI 不高于阈值(根据市场状态动态调整) # 为每一行创建动态阈值 rsi_condition_1h = dataframe.apply(lambda row: row['rsi_1h'] < self.rsi_bull_threshold.value if row['prev_market_state'] in ['strong_bull', 'weak_bull'] else row['rsi_1h'] < self.rsi_oversold.value, axis=1) # 条件3: StochRSI 处于超卖区域(根据市场状态动态调整) stochrsi_condition_1h = dataframe.apply(lambda row: (row['stochrsi_k_1h'] < self.stochrsi_bull_threshold.value and row['stochrsi_d_1h'] < self.stochrsi_bull_threshold.value) if row['prev_market_state'] in ['strong_bull', 'weak_bull'] else (row['stochrsi_k_1h'] < self.stochrsi_neutral_threshold.value and row['stochrsi_d_1h'] < self.stochrsi_neutral_threshold.value), axis=1) # 条件4: MACD 上升趋势 macd_condition_1h = dataframe['macd_1h'] > dataframe['macd_signal_1h'] # 条件5: 成交量显著放大(可选条件) volume_spike = dataframe['volume'] > dataframe['volume_ma'] * self.volume_multiplier.value # 条件6: 布林带宽度过滤(避免窄幅震荡) bb_width = (dataframe['bb_upper_1h'] - dataframe['bb_lower_1h']) / dataframe['close'] bb_width_condition = bb_width > self.bb_width_threshold.value / 1000 # 可优化的布林带宽度阈值 # 辅助条件: 3m 和 15m 趋势确认(允许部分时间框架不一致) trend_confirmation = (dataframe['trend_3m'] == 1) | (dataframe['trend_15m'] == 1) # 合并所有条件(减少强制性条件) # 至少满足多个条件中的一定数量 condition_count = ( close_to_bb_lower_1h.astype(int) + rsi_condition_1h.astype(int) + stochrsi_condition_1h.astype(int) + macd_condition_1h.astype(int) + (volume_spike | bb_width_condition).astype(int) + # 成交量或布林带宽度满足其一即可 trend_confirmation.astype(int) ) final_condition = condition_count >= self.min_condition_count.value # 设置入场信号 dataframe.loc[final_condition, 'enter_long'] = 1 # 增强调试信息 #logger.info(f"[{metadata['pair']}] 入场条件检查:") #logger.info(f" - 价格接近布林带下轨: {close_to_bb_lower_1h.sum()} 次") #logger.info(f" - RSI 超卖: {rsi_condition_1h.sum()} 次") #logger.info(f" - StochRSI 超卖: {stochrsi_condition_1h.sum()} 次") #logger.info(f" - MACD 上升趋势: {macd_condition_1h.sum()} 次") #logger.info(f" - 成交量或布林带宽度: {(volume_spike | bb_width_condition).sum()} 次") #logger.info(f" - 趋势确认: {trend_confirmation.sum()} 次") #logger.info(f" - 最终条件: {final_condition.sum()} 次") # 在populate_entry_trend方法末尾添加 # 计算条件间的相关性 conditions = DataFrame({ 'close_to_bb': close_to_bb_lower_1h, 'rsi': rsi_condition_1h, 'stochrsi': stochrsi_condition_1h, 'macd': macd_condition_1h, 'vol_bb': (volume_spike | bb_width_condition), 'trend': trend_confirmation }) correlation = conditions.corr().mean().mean() #logger.info(f"[{metadata['pair']}] 条件平均相关性: {correlation:.2f}") # 日志记录 # if dataframe['enter_long'].sum() > 0: # logger.info(f"[{metadata['pair']}] 发现入场信号数量: {dataframe['enter_long'].sum()}") return dataframe def detect_h1_rapid_rise(self, pair: str) -> bool: """ 检测1小时K线图上的剧烈拉升情况(轻量级版本,用于confirm_trade_entry) 参数: - pair: 交易对 返回: - bool: 是否处于不稳固区域 """ try: # 获取1小时K线数据 df_1h = self.dp.get_pair_dataframe(pair=pair, timeframe='1h') # 获取当前优化参数值 max_candles = self.h1_max_candles.value rapid_rise_threshold = self.h1_rapid_rise_threshold.value max_consecutive_candles = self.h1_max_consecutive_candles.value # 确保有足够的K线数据 if len(df_1h) < max_candles: logger.warning(f"[{pair}] 1h K线数据不足 {max_candles} 根,当前只有 {len(df_1h)} 根,无法完整检测剧烈拉升") return False # 获取最近的K线 recent_data = df_1h.iloc[-max_candles:].copy() # 检查连续最多几根K线内的最大涨幅 rapid_rise_detected = False max_rise = 0 for i in range(len(recent_data) - max_consecutive_candles + 1): window_data = recent_data.iloc[i:i + max_consecutive_candles] window_low = window_data['low'].min() window_high = window_data['high'].max() # 计算区间内的最大涨幅 if window_low > 0: rise_percentage = (window_high - window_low) / window_low if rise_percentage > max_rise: max_rise = rise_percentage # 检查是否超过阈值 if rise_percentage >= rapid_rise_threshold: rapid_rise_detected = True #logger.info(f"[{pair}] 检测到剧烈拉升: 从 {window_low:.2f} 到 {window_high:.2f} ({rise_percentage:.2%}) 在 {max_consecutive_candles} 根K线内") break current_price = recent_data['close'].iloc[-1] #logger.info(f"[{pair}] 剧烈拉升检测结果: {'不稳固' if rapid_rise_detected else '稳固'}") #logger.info(f"[{pair}] 最近最大涨幅: {max_rise:.2%}") return rapid_rise_detected except Exception as e: logger.error(f"[{pair}] 剧烈拉升检测过程中发生错误: {str(e)}") return False def confirm_trade_entry( self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str, current_time: datetime, entry_tag: str | None, side: str, **kwargs, ) -> bool: """ 交易买入前的确认函数,用于最终决定是否执行交易 此处实现剧烈拉升检查逻辑 """ # 默认允许交易 allow_trade = True # 仅对多头交易进行检查 if side == 'long': # 检查是否处于剧烈拉升的不稳固区域 is_unstable_region = self.detect_h1_rapid_rise(pair) if is_unstable_region: #logger.info(f"[{pair}] 由于检测到剧烈拉升,取消入场交易") allow_trade = False # 如果没有阻止因素,允许交易 return allow_trade def _check_add_position_conditions(self, pair: str, current_rate: float, current_profit: float, entry_count: int, initial_price: float, dataframe) -> dict: """ 检查加仓条件的多维度评分系统 返回: {'should_add': bool, 'score': float, 'reasons': list} """ try: if dataframe is None or len(dataframe) < 30: return {'should_add': False, 'score': 0, 'reasons': ['数据不足']} last_candle = dataframe.iloc[-1] reasons = [] score = 0.0 max_score = 6.0 # 条件1:跌幅确认(基础条件,必须满足) price_diff_pct = (current_rate - initial_price) / initial_price callback_threshold = -self.add_position_callback.value if price_diff_pct <= callback_threshold: score += 1.0 reasons.append(f"✓ 跌幅{price_diff_pct:.2%}≤{callback_threshold:.2%}") else: return {'should_add': False, 'score': 0, 'reasons': [f'✗ 跌幅不足: {price_diff_pct:.2%} > {callback_threshold:.2%}']} # 条件2:RSI超卖确认 rsi_1h = last_candle.get('rsi_1h', 50) if rsi_1h < self.add_rsi_oversold_threshold.value: score += 1.0 reasons.append(f"✓ RSI超卖: {rsi_1h:.1f} < {self.add_rsi_oversold_threshold.value}") else: reasons.append(f"✗ RSI不超卖: {rsi_1h:.1f} ≥ {self.add_rsi_oversold_threshold.value}") # 条件3:StochRSI双线低位确认 stochrsi_k = last_candle.get('stochrsi_k_1h', 50) stochrsi_d = last_candle.get('stochrsi_d_1h', 50) if stochrsi_k < self.add_stochrsi_oversold.value and stochrsi_d < self.add_stochrsi_oversold.value: score += 1.0 reasons.append(f"✓ StochRSI双超卖: K={stochrsi_k:.1f}, D={stochrsi_d:.1f}") else: reasons.append(f"✗ StochRSI未双超卖: K={stochrsi_k:.1f}, D={stochrsi_d:.1f}") # 条件4:MACD上升确认(底部反转信号) macd_1h = last_candle.get('macd_1h', 0) macd_signal_1h = last_candle.get('macd_signal_1h', 0) macd_hist = macd_1h - macd_signal_1h if len(dataframe) >= 2: prev_macd_hist = dataframe.iloc[-2].get('macd_1h', 0) - dataframe.iloc[-2].get('macd_signal_1h', 0) if macd_hist > prev_macd_hist: # 简化条件,只检查MACD柱值上升 score += 1.0 reasons.append(f"✓ MACD底部上升: 柱值={macd_hist:.6f}") else: reasons.append(f"✗ MACD未确认: 柱值={macd_hist:.6f}") # 条件5:布林带下轨支撑确认 bb_lower = last_candle.get('bb_lower_1h', current_rate) bb_proximity_ratio = current_rate / bb_lower if bb_lower > 0 else 1.0 if bb_proximity_ratio <= self.add_bb_lower_proximity.value: score += 1.0 reasons.append(f"✓ 接近BB下轨: 比例={bb_proximity_ratio:.4f}") else: reasons.append(f"✗ 离BB下轨太远: 比例={bb_proximity_ratio:.4f}") # 条件6:成交量放大确认(简化条件) volume = last_candle.get('volume', 0) volume_ma = last_candle.get('volume_ma', 1) if volume > volume_ma * 1.2: # 固定1.2倍成交量确认 score += 1.0 reasons.append(f"✓ 成交量放大: {volume:.0f} > {volume_ma * 1.2:.0f}") else: reasons.append(f"✗ 成交量不足: {volume:.0f} ≤ {volume_ma * 1.2:.0f}") # 条件7:市场状态过滤(强熊市禁止加仓) market_state = last_candle.get('market_state', 'neutral') if market_state != 'strong_bear': score += 0.5 reasons.append(f"✓ 市场状态良好: {market_state}") else: reasons.append(f"✗ 强熊市,避免加仓: {market_state}") return {'should_add': False, 'score': score/max_score, 'reasons': reasons} # 综合判断(极致放宽条件) condition_met = sum(1 for r in reasons if r.startswith('✓')) >= 2 # 从≥3降低到≥2(极致放宽) score_ratio = score / max_score should_add = condition_met and score_ratio >= 0.35 # 从0.5降低到0.35(极致放宽) return { 'should_add': should_add, 'score': score_ratio, 'reasons': reasons, 'condition_met': condition_met } except Exception as e: logger.error(f"[{pair}] 加仓条件检查出错: {str(e)}") return {'should_add': False, 'score': 0, 'reasons': [f'错误: {str(e)}']} def _calculate_add_position_amount(self, trade: 'Trade', entry_count: int, min_stake: float, max_stake: float) -> float: """ 智能计算加仓金额(支持递减策略) - 早期加仓金额较大,后期逐步减小 - 防止后期加仓金额过大导致爆仓 """ try: initial_stake = float(trade.orders[0].cost) # 基础公式:(adjust_multiplier × initial_stake) ^ entry_count base_amount = (self.adjust_multiplier.value * initial_stake) ** entry_count # 应用递减系数(后续加仓金额逐步缩小) # 第1次加仓: 100% × 基础金额 # 第2次加仓: 75% × 基础金额 # 第3次加仓: 56% × 基础金额 decrease_ratio = self.add_position_decrease_ratio.value ** (entry_count - 1) adjusted_amount = base_amount * decrease_ratio # 安全校验 current_stake = float(trade.stake_amount) remaining_capacity = max_stake - current_stake # 加仓金额不能超过剩余容量的80%(留余量) adjusted_amount = min(adjusted_amount, remaining_capacity * 0.8) adjusted_amount = max(min_stake, min(adjusted_amount, max_stake - current_stake)) return adjusted_amount except Exception as e: logger.error(f"[{trade.pair}] 加仓金额计算出错: {str(e)}") return 0.0 def adjust_trade_position(self, trade: 'Trade', current_time, current_rate: float, current_profit: float, min_stake: float, max_stake: float, **kwargs) -> float: """ 增强版持仓调整逻辑:加仓精准度 + 递减策略 + 减仓优化 """ pair = trade.pair # ========================== 分级止盈减仓逻辑(增强版) ========================== if current_profit > 0: reduce_count = len(trade.select_filled_orders(trade.exit_side)) if reduce_count >= self.max_reduce_adjustments.value: return 0.0 dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) current_kline_time = dataframe.iloc[-1]['date'].strftime('%Y-%m-%d %H:%M:%S') last_reduce_kline = trade.get_custom_data("last_reduce_kline") if last_reduce_kline == current_kline_time: return 0.0 initial_stake = float(trade.orders[0].cost) current_stake = float(trade.stake_amount) # 分级止盈逻辑(3级) # 第1级:达到exit_profit_tier1时,减仓exit_reduce_tier1比例 if current_profit >= self.exit_profit_tier1.value: if reduce_count < 1: reduce_amount = current_stake * self.exit_reduce_tier1.value reduce_amount = -min(reduce_amount, current_stake * 0.5) # 单次最多减仓50% #logger.info(f"[{pair}] 分级止盈第1级: 盈利{current_profit:.2%}, " # f"减仓比例{self.exit_reduce_tier1.value:.1%}, 金额{abs(reduce_amount):.2f}") trade.set_custom_data("last_reduce_kline", current_kline_time) return max(-current_stake, reduce_amount) # 第2级:达到exit_profit_tier2时,减仓exit_reduce_tier2比例 if current_profit >= self.exit_profit_tier2.value: if reduce_count < 2: reduce_amount = current_stake * self.exit_reduce_tier2.value reduce_amount = -min(reduce_amount, current_stake * 0.3) # 单次最多减仓30% #logger.info(f"[{pair}] 分级止盈第2级: 盈利{current_profit:.2%}, " # f"减仓比例{self.exit_reduce_tier2.value:.1%}, 金额{abs(reduce_amount):.2f}") trade.set_custom_data("last_reduce_kline", current_kline_time) return max(-current_stake, reduce_amount) # 基础止盈(保持原有逻辑) if current_profit >= self.reduce_profit_base.value: reduce_amount = (float(self.reduce_coefficient.value) * initial_stake) ** (reduce_count + 1) reduce_amount = min(reduce_amount, current_stake * 0.2) # 单次最多减仓20% reduce_amount = -reduce_amount reduce_amount = max(-current_stake, min(reduce_amount, -float(min_stake))) #logger.info(f"[{pair}] 基础止盈: 盈利{current_profit:.2%}, 第{reduce_count+1}次, " # f"金额{abs(reduce_amount):.2f}") trade.set_custom_data("last_reduce_kline", current_kline_time) return reduce_amount return 0.0 # ========================== 增强版加仓逻辑 ========================== entry_count = len(trade.orders) if entry_count > self.max_entry_adjustments.value: return 0.0 initial_price = trade.open_rate if initial_price == 0: return 0.0 # 获取数据框 dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) if dataframe is None or len(dataframe) < 30: return 0.0 # 检查加仓条件(多维度评分) condition_check = self._check_add_position_conditions(pair, current_rate, current_profit, entry_count, initial_price, dataframe) if not condition_check['should_add']: return 0.0 # 周期限制:每个timeframe仅加仓一次 current_kline_time = dataframe.iloc[-1]['date'].strftime('%Y-%m-%d %H:%M:%S') last_add_kline = trade.get_custom_data("last_add_kline") if last_add_kline == current_kline_time: return 0.0 # 计算加仓金额 additional_stake = self._calculate_add_position_amount(trade, entry_count, min_stake, max_stake) if additional_stake > 0: #logger.info(f"[{pair}] 加仓触发: 第{entry_count+1}次, 金额{additional_stake:.2f}, 评分{condition_check['score']:.2f}") trade.set_custom_data("last_add_kline", current_kline_time) return additional_stake return 0.0 def custom_stoploss(self, pair: str, trade: 'Trade', current_time, current_rate: float, current_profit: float, **kwargs) -> float: # 动态止损基于ATR dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1] atr = last_candle['atr'] # 获取当前市场状态 current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else 'unknown' # 渐进式止损策略(盈利越高,止损范围越大) if current_profit > 0.05: # 利润超过5%时 return -3.0 * atr / current_rate # 大幅扩大止损范围,让利润奔跑 elif current_profit > 0.03: # 利润超过3%时 return -2.5 * atr / current_rate # 中等扩大止损范围 elif current_profit > 0.01: # 利润超过1%时 return -2.0 * atr / current_rate # 轻微扩大止损范围 # 在强劲牛市中,即使小亏损也可以容忍更大回调 if current_state == 'strong_bull' and current_profit > -0.01: return -1.5 * atr / current_rate # 基础止损 if atr > 0: return -1.2 * atr / current_rate # 基础1.2倍ATR止损 return self.stoploss