import logging from freqtrade.strategy import IStrategy, IntParameter, DecimalParameter from pandas import DataFrame import pandas_ta as ta from freqtrade.persistence import Trade import numpy as np import datetime import pandas as pd import math # 设置pandas选项以解决FutureWarning警告 pd.set_option('future.no_silent_downcasting', True) logger = logging.getLogger(__name__) class MartinGale(IStrategy): # 策略参数 loglevel = "warning" minimal_roi = {} # 启用自定义ROI回调函数 use_custom_roi = True # 固定止损设置为与OKX App一致 stoploss = -0.12 # 对应OKX App中的12.00%止损 trailing_stop = True trailing_stop_positive_offset = 0.005 # 跟踪止损偏移量 0.5% # 用于跟踪市场状态的数据框缓存 _dataframe_cache = None def __init__(self, config=None): """初始化策略参数""" super().__init__(config) self._trailing_stop_positive_default = 0.004 # 降低默认值以更容易触发跟踪止盈 @property def protections(self): """ 保护机制配置 基于最新Freqtrade规范,保护机制应定义在策略文件中而非配置文件 """ return [ { "method": "StoplossGuard", "lookback_period_candles": 60, # 3小时回看期(60根3分钟K线) "trade_limit": 2, # 最多2笔止损交易 "stop_duration_candles": 60, # 暂停180分钟(60根3分钟K线) "only_per_pair": False # 仅针对单个币对 }, { "method": "CooldownPeriod", "stop_duration_candles": 2 # 6分钟冷却期(2根3分钟K线) }, { "method": "MaxDrawdown", "lookback_period_candles": 48, # 2.4小时回看期 "trade_limit": 4, # 4笔交易限制 "stop_duration_candles": 24, # 72分钟暂停(24根3分钟K线) "max_allowed_drawdown": 0.20 # 20%最大回撤容忍度 } ] @property def trailing_stop_positive(self): """根据市场状态动态调整跟踪止盈参数""" # 获取当前市场状态 if self._dataframe_cache is not None and len(self._dataframe_cache) > 0: current_state = self._dataframe_cache['market_state'].iloc[-1] if current_state == 'strong_bull': return 0.007 # 强劲牛市中降低跟踪止盈,让利润奔跑 elif current_state == 'weak_bull': return 0.005 # 弱势牛市中保持较低的跟踪止盈 return self._trailing_stop_positive_default # 返回默认值 @trailing_stop_positive.setter def trailing_stop_positive(self, value): """设置trailing_stop_positive的默认值""" self._trailing_stop_positive_default = value # 时间框架设置 - 主时间框架为3分钟,与OKX App一致 timeframe = "3m" # 主时间框架为 3 分钟 can_short = False # 禁用做空 # 自定义指标参数 - 设置为可通过Hyperopt优化 # 从OKX App中获取的参数 rsi_length = IntParameter(7, 21, default=14, optimize=True, load=True, space='buy') # RSI周期为14,可优化范围7-21 rsi_oversold = IntParameter(20, 40, default=30, optimize=True, load=True, space='buy') # RSI触发阈值为30,可优化范围20-40 # 马丁格尔策略参数 - 基于OKX App界面设置 max_entry_adjustments = IntParameter(5, 15, default=10, optimize=True, load=True, space='buy') # 最大加仓次数10次,可优化范围5-15 add_position_callback = DecimalParameter(0.005, 0.015, decimals=3, default=0.0066, optimize=True, load=True, space='buy') # 跌幅加仓阈值0.66%,可优化范围0.5%-1.5% # 马丁格尔加仓比例参数 step_coefficient = DecimalParameter(1.0, 1.1, decimals=2, default=1.05, optimize=True, load=True, space='buy') # 加仓比例1.05倍,可优化范围1.0-1.1 stake_divisor = DecimalParameter(0.9, 1.1, decimals=2, default=1.0, optimize=True, load=True, space='buy') # 初始比例,可优化范围0.9-1.1 # 止盈目标参数 - 对应OKX App中的单周期止盈目标2.50% take_profit_target = DecimalParameter(0.02, 0.03, decimals=3, default=0.025, optimize=True, load=True, space='sell') # 止盈目标2.5%,可优化范围2.0%-3.0% def informative_pairs(self): """定义辅助时间框架""" pairs = self.dp.current_whitelist() # 添加15m和1h作为辅助时间框架 return [(pair, '15m') for pair in pairs] + [(pair, '1h') for pair in pairs] def custom_stake_amount(self, pair: str, current_time: pd.Timestamp, current_rate: float, proposed_stake: float, min_stake: float, max_stake: float, **kwargs) -> float: """自定义下单金额""" # 初始资金设置 initial_balance = self.config.get('dry_run_wallet', 10000) # 固定使用初始资金的5%作为初次下单金额(在OKX App范围内) desired_stake = initial_balance * 0.05 desired_stake = math.floor(desired_stake) # 取整 # 确保在OKX App设定的范围内(2-50,000 USDT) return max(min(desired_stake, 50000, max_stake), 2, min_stake) def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: """计算各种技术指标""" # 计算3m周期的指标 rsi_length_value = self.rsi_length.value # 计算3m RSI指标 dataframe['rsi_3m'] = ta.rsi(dataframe['close'], length=rsi_length_value) # 计算3m时间框架的EMA50和EMA200 dataframe['ema_50_3m'] = ta.ema(dataframe['close'], length=50) dataframe['ema_200_3m'] = ta.ema(dataframe['close'], length=200) # 成交量过滤 dataframe['volume_ma'] = dataframe['volume'].rolling(20).mean() # 计算ATR用于动态止损 dataframe['atr'] = ta.atr(dataframe['high'], dataframe['low'], dataframe['close'], length=14) # 获取15m数据 df_15m = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='15m') df_15m['rsi_15m'] = ta.rsi(df_15m['close'], length=rsi_length_value) # 计算15m时间框架的EMA50和EMA200 df_15m['ema_50_15m'] = ta.ema(df_15m['close'], length=50) df_15m['ema_200_15m'] = ta.ema(df_15m['close'], length=200) # 将15m数据重新索引到主时间框架(3m) df_15m = df_15m.set_index('date').reindex(dataframe['date']).reset_index() df_15m = df_15m.rename(columns={'index': 'date'}) df_15m = df_15m[['date', 'rsi_15m', 'ema_50_15m', 'ema_200_15m']].ffill() # 合并15m数据 dataframe = dataframe.merge(df_15m, how='left', on='date') # 获取1h数据 df_1h = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1h') # 计算1h RSI和EMA df_1h['rsi_1h'] = ta.rsi(df_1h['close'], length=rsi_length_value) df_1h['ema_50_1h'] = ta.ema(df_1h['close'], length=50) # 1h 50周期EMA df_1h['ema_200_1h'] = ta.ema(df_1h['close'], length=200) # 1h 200周期EMA # 将1h数据重新索引到主时间框架(3m),并填充缺失值 df_1h = df_1h.set_index('date').reindex(dataframe['date']).ffill().bfill().reset_index() # 计算1h上涨趋势,确保不包含None值 df_1h['trend_1h'] = df_1h['close'] > df_1h['ema_50_1h'] # 1h上涨趋势 df_1h = df_1h.rename(columns={'index': 'date'}) df_1h = df_1h[['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h']].ffill() # 合并1h数据 dataframe = dataframe.merge(df_1h, how='left', on='date').ffill() # 计算各时间框架的趋势状态(牛/熊) # 3m时间框架:ema50下穿ema200为熊,上穿为牛 dataframe['trend_3m'] = np.where(dataframe['ema_50_3m'] > dataframe['ema_200_3m'], 1, 0) # 15m时间框架:ema50下穿ema200为熊,上穿为牛 dataframe['trend_15m'] = np.where(dataframe['ema_50_15m'] > dataframe['ema_200_15m'], 1, 0) # 1h时间框架:ema50下穿ema200为熊,上穿为牛 dataframe['trend_1h_ema'] = np.where(dataframe['ema_50_1h'] > dataframe['ema_200_1h'], 1, 0) # 计算熊牛得分(0-100) # 权重:3m熊牛权重10,15m熊牛权重35,1h熊牛权重65 dataframe['market_score'] = ( dataframe['trend_3m'] * 10 + dataframe['trend_15m'] * 35 + dataframe['trend_1h_ema'] * 65 ) # 确保得分在0-100范围内 dataframe['market_score'] = dataframe['market_score'].clip(lower=0, upper=100) # 根据得分分类市场状态 dataframe['market_state'] = 'neutral' dataframe.loc[dataframe['market_score'] > 70, 'market_state'] = 'strong_bull' dataframe.loc[(dataframe['market_score'] > 50) & (dataframe['market_score'] <= 70), 'market_state'] = 'weak_bull' dataframe.loc[(dataframe['market_score'] >= 30) & (dataframe['market_score'] <= 50), 'market_state'] = 'neutral' dataframe.loc[(dataframe['market_score'] > 10) & (dataframe['market_score'] < 30), 'market_state'] = 'weak_bear' dataframe.loc[dataframe['market_score'] <= 10, 'market_state'] = 'strong_bear' # 创建一个使用前一行市场状态的列 dataframe['prev_market_state'] = dataframe['market_state'].shift(1) # 为第一行设置默认值 dataframe['prev_market_state'] = dataframe['prev_market_state'].fillna('neutral') # 保存数据框缓存用于trailing_stop_positive计算 self._dataframe_cache = dataframe return dataframe def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: """设置出场条件""" # 基于固定止盈目标(2.50%) # 注意:实际止盈将在custom_exit中实现 # 当RSI进入超买区域时考虑出场 rsi_overbought = dataframe['rsi_1h'] > 70 # 设置出场信号 dataframe.loc[rsi_overbought, 'exit_long'] = 1 return dataframe def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: """设置入场条件""" # 确保prev_market_state列存在 if 'prev_market_state' not in dataframe.columns: dataframe['prev_market_state'] = 'neutral' # 基于OKX App的RSI条件:RSI-14向下穿过30阈值 # 条件1: RSI处于超卖区域(根据市场状态动态调整) rsi_condition = dataframe.apply(lambda row: row['rsi_3m'] < self.rsi_oversold.value if row['prev_market_state'] in ['strong_bull', 'weak_bull'] else row['rsi_3m'] < self.rsi_oversold.value, axis=1) # 条件2: RSI向下穿过阈值(交叉检测) rsi_cross_down = (dataframe['rsi_3m'] < self.rsi_oversold.value) & (dataframe['rsi_3m'].shift(1) >= self.rsi_oversold.value) # 条件3: 成交量放大确认信号 volume_spike = dataframe['volume'] > dataframe['volume_ma'] * 1.2 # 条件4: 至少有一个时间框架的趋势确认 trend_confirmation = (dataframe['trend_3m'] == 1) | (dataframe['trend_15m'] == 1) | (dataframe['trend_1h_ema'] == 1) # 合并所有条件 final_condition = rsi_cross_down & volume_spike & trend_confirmation # 设置入场信号 dataframe.loc[final_condition, 'enter_long'] = 1 # 日志记录 # if dataframe['enter_long'].sum() > 0: # logger.info(f"[{metadata['pair']}] 发现入场信号数量: {dataframe['enter_long'].sum()}") return dataframe def custom_stoploss(self, pair: str, trade: 'Trade', current_time, current_rate: float, current_profit: float, **kwargs) -> float: """自定义动态止损""" # 动态止损基于ATR dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1] atr = last_candle['atr'] # 获取当前市场状态 current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else 'unknown' # 更激进的渐进式止损策略 if current_profit > 0.05: # 利润超过5%时 return -3.0 * atr / current_rate # 更大幅扩大止损范围,让利润奔跑 elif current_profit > 0.03: # 利润超过3%时 return -2.5 * atr / current_rate # 更中等扩大止损范围 elif current_profit > 0.01: # 利润超过1%时 return -2.0 * atr / current_rate # 更轻微扩大止损范围 # 在强劲牛市中,即使小亏损也可以容忍更大回调 if current_state == 'strong_bull' and current_profit > -0.01: return -1.8 * atr / current_rate if atr > 0: return -1.2 * atr / current_rate # 基础1.2倍ATR止损 return self.stoploss def custom_exit(self, pair: str, trade: Trade, current_time: datetime, current_rate: float, current_profit: float, **kwargs) -> float: """自定义出场逻辑,实现固定止盈目标""" if trade.is_short: return 0.0 # 基于OKX App的单周期止盈目标2.50% if current_profit >= self.take_profit_target.value: return 1.0 # 全额出场 # 未达到止盈目标,不出场 return 0.0 def adjust_trade_position(self, trade: 'Trade', current_time, current_rate: float, current_profit: float, min_stake: float, max_stake: float, **kwargs) -> float: """ 实现马丁格尔加仓逻辑 - 基于OKX App的参数:跌幅0.66%加仓,最大10次加仓,1.05倍加仓比例 """ # 获取当前交易对 pair = trade.pair # 获取当前交易的加仓次数 entry_count = len(trade.orders) # 获取所有入场订单数量 # 如果已经达到最大加仓次数,则不再加仓 if entry_count - 1 >= self.max_entry_adjustments.value: logger.info(f"[{pair}] 已达到最大加仓次数 {self.max_entry_adjustments.value},停止加仓") return 0.0 # 获取初始入场价格和当前价格的差值百分比 initial_price = trade.open_rate if initial_price == 0: return 0.0 price_diff_pct = (current_rate - initial_price) / initial_price # 检查价格回调是否达到加仓间隔(OKX App中的0.66%跌幅) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else 'neutral' if price_diff_pct <= -self.add_position_callback.value and current_state not in ['bear', 'weak_bear']: # 计算初始入场金额 initial_stake = trade.orders[0].cost # 第一笔订单的成本 # 计算加仓次数(从1开始计数) adjustment_count = entry_count - 1 # 已加仓次数 # 计算加仓金额: (initial_stake * step_coefficient) ^ (adjustment_count + 1) / stake_divisor # 实现OKX App中的1.05倍加仓比例 additional_stake = (self.step_coefficient.value * initial_stake / self.stake_divisor.value) ** (adjustment_count + 1) # 确保加仓金额在允许的范围内(≥2 USDT,与OKX App一致) additional_stake = max(min_stake, min(additional_stake, max_stake - trade.stake_amount)) logger.info(f"[{pair}] 触发加仓: 第{adjustment_count + 1}次加仓, 初始金额{initial_stake:.2f}, \ 加仓金额{additional_stake:.2f}, 价格差{price_diff_pct:.2%}, 当前利润{current_profit:.2%}") return additional_stake # 不符合加仓条件,返回0 return 0.0