问题依旧

This commit is contained in:
zhangkun9038@dingtalk.com 2025-10-12 16:43:54 +00:00
parent 09751fd19c
commit 4acca7500f
2 changed files with 275 additions and 153 deletions

View File

@ -17,50 +17,24 @@ class FreqaiPrimer(IStrategy):
# 策略参数 - 使用custom_roi替代minimal_roi字典
loglevel = "warning"
minimal_roi = {}
# 启用自定义ROI回调函数
use_custom_roi = True
stoploss = -0.15 # 固定止损 -15% (大幅放宽止损以承受更大波动)
trailing_stop = True
trailing_stop_positive_offset = 0.005 # 追踪止损偏移量 0.5% (更容易触发跟踪止盈)
# 用于跟踪市场状态的数据框缓存
_dataframe_cache = None
def __init__(self, config=None):
"""初始化策略参数调用父类初始化方法并接受config参数"""
super().__init__(config) # 调用父类的初始化方法并传递config
# 存储从配置文件加载的默认值
self._trailing_stop_positive_default = 0.004 # 降低默认值以更容易触发跟踪止盈
@property
def protections(self):
"""
保护机制配置
基于最新Freqtrade规范保护机制应定义在策略文件中而非配置文件
"""
return [
{
"method": "StoplossGuard",
"lookback_period_candles": 60, # 3小时回看期60根3分钟K线
"trade_limit": 2, # 最多2笔止损交易
"stop_duration_candles": 60, # 暂停180分钟60根3分钟K线
"only_per_pair": False # 仅针对单个币对
},
{
"method": "CooldownPeriod",
"stop_duration_candles": 2 # 6分钟冷却期2根3分钟K线
},
{
"method": "MaxDrawdown",
"lookback_period_candles": 48, # 2.4小时回看期
"trade_limit": 4, # 4笔交易限制
"stop_duration_candles": 24, # 72分钟暂停24根3分钟K线
"max_allowed_drawdown": 0.20 # 20%最大回撤容忍度
}
]
@property
def trailing_stop_positive(self):
"""根据市场状态动态调整跟踪止盈参数"""
@ -72,12 +46,12 @@ class FreqaiPrimer(IStrategy):
elif current_state == 'weak_bull':
return 0.005 # 弱势牛市中保持较低的跟踪止盈
return self._trailing_stop_positive_default # 返回默认值
@trailing_stop_positive.setter
def trailing_stop_positive(self, value):
"""设置trailing_stop_positive的默认值"""
self._trailing_stop_positive_default = value
timeframe = "3m" # 主时间框架为 3 分钟
can_short = False # 禁用做空
@ -86,9 +60,9 @@ class FreqaiPrimer(IStrategy):
bb_std = DecimalParameter(1.5, 3.0, decimals=1, default=2.0, optimize=False, load=True, space='buy')
rsi_length = IntParameter(7, 21, default=14, optimize=False, load=True, space='buy')
rsi_oversold = IntParameter(30, 50, default=42, optimize=True, load=True, space='buy')
# 入场条件阈值参数
bb_lower_deviation = DecimalParameter(1.01, 1.05, decimals=2, default=1.03, optimize=True, load=True, space='buy')
rsi_bull_threshold = IntParameter(45, 55, default=50, optimize=False, load=True, space='buy')
@ -97,7 +71,7 @@ class FreqaiPrimer(IStrategy):
volume_multiplier = DecimalParameter(1.2, 2.0, decimals=1, default=1.5, optimize=False, load=True, space='buy')
bb_width_threshold = DecimalParameter(0.01, 0.03, decimals=3, default=0.02, optimize=False, load=True, space='buy')
min_condition_count = IntParameter(2, 4, default=3, optimize=True, load=True, space='buy')
# 剧烈拉升检测参数 - 使用Hyperopt可优化参数
h1_max_candles = IntParameter(100, 300, default=200, optimize=False, load=True, space='buy')
h1_rapid_rise_threshold = DecimalParameter(0.05, 0.15, decimals=3, default=0.11, optimize=True, load=True, space='buy')
@ -107,7 +81,7 @@ class FreqaiPrimer(IStrategy):
add_position_callback = DecimalParameter(0.03, 0.06, decimals=3, default=0.053, optimize=True, load=True, space='buy') # 加仓回调百分比
stake_divisor = DecimalParameter(2, 4, decimals=3, default=2.867, optimize=True, load=True, space='buy') # 加仓金额分母
step_coefficient = DecimalParameter(0.5, 1.5, decimals=2, default=0.92, optimize=True, load=True, space='buy') # 加仓金额分母
# 线性ROI参数 - 用于线性函数: y = (a * (x + k)) + t
roi_param_a = DecimalParameter(-0.0002, -0.00005, decimals=5, default=-0.0001, optimize=True, load=True, space='sell') # 系数a
roi_param_k = IntParameter(20, 150, default=50, optimize=True, load=True, space='sell') # 偏移量k
@ -131,7 +105,7 @@ class FreqaiPrimer(IStrategy):
missing_columns = [col for col in required_columns if col not in dataframe.columns]
if missing_columns:
logger.warning(f"[{metadata['pair']}] 数据框中缺少以下列: {missing_columns}")
def custom_stake_amount(self, pair: str, current_time: pd.Timestamp,
current_rate: float,
proposed_stake: float,
@ -145,35 +119,99 @@ class FreqaiPrimer(IStrategy):
desired_stake = math.floor(desired_stake) # 取整,去掉小数点后的数字
return max(min(desired_stake, max_stake), min_stake)
def generate_dip_signal(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
生成闪崩信号
参数:
- dataframe: 包含OHLCV数据的DataFrame
- metadata: 包含交易对信息的字典
返回:
- 处理后的DataFrame添加了闪崩信号列
"""
# 初始化闪崩信号列为0
dataframe['enter_long_dip_test'] = 0
# 安全检查:确保必要的列存在
required_columns = ['high', 'close', 'open', 'rsi_3m']
if not all(col in dataframe.columns for col in required_columns):
missing_cols = [col for col in required_columns if col not in dataframe.columns]
logger.warning(f"[{metadata.get('pair', 'Unknown')}] 缺少必要的列来生成闪崩信号: {missing_cols}")
return dataframe
# 只有当有足够的K线数据时才尝试生成信号
if len(dataframe) < 6: # 至少需要6根3m K线来检测闪崩
logger.debug(f"[{metadata.get('pair', 'Unknown')}] K线数量不足({len(dataframe)}根K线),无法生成闪崩信号")
return dataframe
# 计算6根K线内的最大跌幅从高点到当前收盘价
# 使用rolling窗口计算每个K线位置的最大涨幅
dataframe['rolling_high'] = dataframe['high'].rolling(window=6).max()
dataframe['dip_percentage'] = (dataframe['close'] - dataframe['rolling_high']) / dataframe['rolling_high'] * 100
# 定义闪崩信号条件6根K线内价格下跌超过3%、RSI<30、收盘价高于开盘价阳线
flash_crash_condition = (
(dataframe['dip_percentage'] < -3) & # 跌幅超过3%
(dataframe['rsi_3m'] < 30) #& # RSI超卖
#(dataframe['close'] > dataframe['open']) # 收阳线,有反弹迹象
)
# 设置闪崩信号
dataframe.loc[flash_crash_condition, 'enter_long_dip_test'] = 1
# 记录闪崩信号的生成
if dataframe['enter_long_dip_test'].sum() > 0:
# 找出最近一次闪崩信号的时间
last_crash_idx = dataframe.index[dataframe['enter_long_dip_test'] == 1][-1]
last_crash_time = dataframe.loc[last_crash_idx, 'date']
logger.info(f"[{metadata.get('pair', 'Unknown')}] 闪崩信号生成: 共 {dataframe['enter_long_dip_test'].sum()} 个信号,最近一次在 {last_crash_time}")
# 记录每个闪崩信号的详细信息
flash_crash_indices = dataframe.index[dataframe['enter_long_dip_test'] == 1]
for idx in flash_crash_indices:
date_str = dataframe.loc[idx, 'date']
price = dataframe.loc[idx, 'close']
rsi = dataframe.loc[idx, 'rsi_3m']
dip_percent = dataframe.loc[idx, 'dip_percentage']
logger.debug(f"[{metadata.get('pair', 'Unknown')}] 闪崩详情: 时间={date_str}, 价格={price:.4f}, RSI={rsi:.1f}, 跌幅={dip_percent:.2f}%")
else:
logger.debug(f"[{metadata.get('pair', 'Unknown')}] 未检测到闪崩信号")
logger.debug(f"[BTC/USDT] 当前K线: 时间={dataframe['date'].iloc[-1]}, "
f"收盘={dataframe['close'].iloc[-1]:.2f}, "
f"跌幅={dataframe['dip_percentage'].iloc[-1]:.2f}%, "
f"RSI={dataframe['rsi_3m'].iloc[-1]:.1f}, "
f"是否阳线={dataframe['close'].iloc[-1] > dataframe['open'].iloc[-1]}")
return dataframe
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 计算 3m 周期的指标
bb_length_value = self.bb_length.value
bb_std_value = self.bb_std.value
rsi_length_value = self.rsi_length.value
bb_3m = ta.bbands(dataframe['close'], length=bb_length_value, std=bb_std_value)
dataframe['bb_lower_3m'] = bb_3m[f'BBL_{bb_length_value}_{bb_std_value}']
dataframe['bb_upper_3m'] = bb_3m[f'BBU_{bb_length_value}_{bb_std_value}']
dataframe['rsi_3m'] = ta.rsi(dataframe['close'], length=rsi_length_value)
# 新增 StochRSI 指标
stochrsi_3m = ta.stochrsi(dataframe['close'], length=rsi_length_value, rsi_length=rsi_length_value)
dataframe['stochrsi_k_3m'] = stochrsi_3m[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3']
dataframe['stochrsi_d_3m'] = stochrsi_3m[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3']
# 新增 MACD 指标
macd_3m = ta.macd(dataframe['close'], fast=12, slow=26, signal=9)
dataframe['macd_3m'] = macd_3m['MACD_12_26_9']
dataframe['macd_signal_3m'] = macd_3m['MACDs_12_26_9']
dataframe['macd_hist_3m'] = macd_3m['MACDh_12_26_9']
# 计算3m时间框架的EMA50和EMA200
dataframe['ema_50_3m'] = ta.ema(dataframe['close'], length=50)
dataframe['ema_200_3m'] = ta.ema(dataframe['close'], length=200)
# 成交量过滤
dataframe['volume_ma'] = dataframe['volume'].rolling(20).mean()
# 计算 ATR 用于动态止损和退出
dataframe['atr'] = ta.atr(dataframe['high'], dataframe['low'], dataframe['close'], length=14)
@ -183,45 +221,45 @@ class FreqaiPrimer(IStrategy):
# 计算15m时间框架的EMA50和EMA200
df_15m['ema_50_15m'] = ta.ema(df_15m['close'], length=50)
df_15m['ema_200_15m'] = ta.ema(df_15m['close'], length=200)
# 新增 StochRSI 指标
stochrsi_15m = ta.stochrsi(df_15m['close'], length=rsi_length_value, rsi_length=rsi_length_value)
df_15m['stochrsi_k_15m'] = stochrsi_15m[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3']
df_15m['stochrsi_d_15m'] = stochrsi_15m[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3']
# 新增 MACD 指标
macd_15m = ta.macd(df_15m['close'], fast=12, slow=26, signal=9)
df_15m['macd_15m'] = macd_15m['MACD_12_26_9']
df_15m['macd_signal_15m'] = macd_15m['MACDs_12_26_9']
df_15m['macd_hist_15m'] = macd_15m['MACDh_12_26_9']
# 将 15m 数据重新索引到主时间框架 (3m)
df_15m = df_15m.set_index('date').reindex(dataframe['date']).reset_index()
df_15m = df_15m.rename(columns={'index': 'date'})
df_15m = df_15m[['date', 'rsi_15m', 'ema_50_15m', 'ema_200_15m']].ffill()
# 合并 15m 数据
dataframe = dataframe.merge(df_15m, how='left', on='date')
# 获取 1h 数据
df_1h = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1h')
# 计算 1h 布林带
bb_1h = ta.bbands(df_1h['close'], length=bb_length_value, std=bb_std_value)
df_1h['bb_lower_1h'] = bb_1h[f'BBL_{bb_length_value}_{bb_std_value}']
df_1h['bb_upper_1h'] = bb_1h[f'BBU_{bb_length_value}_{bb_std_value}']
# 计算 1h RSI 和 EMA
df_1h['rsi_1h'] = ta.rsi(df_1h['close'], length=rsi_length_value)
df_1h['ema_50_1h'] = ta.ema(df_1h['close'], length=50) # 1h 50周期EMA
df_1h['ema_200_1h'] = ta.ema(df_1h['close'], length=200) # 1h 200周期EMA
df_1h['trend_1h'] = df_1h['close'] > df_1h['ema_50_1h'] # 1h上涨趋势
# 新增 StochRSI 指标
stochrsi_1h = ta.stochrsi(df_1h['close'], length=rsi_length_value, rsi_length=rsi_length_value)
df_1h['stochrsi_k_1h'] = stochrsi_1h[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3']
df_1h['stochrsi_d_1h'] = stochrsi_1h[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3']
# 新增 MACD 指标
macd_1h = ta.macd(df_1h['close'], fast=12, slow=26, signal=9)
df_1h['macd_1h'] = macd_1h['MACD_12_26_9']
@ -239,30 +277,30 @@ class FreqaiPrimer(IStrategy):
df_1h = df_1h[['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h', 'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h', 'macd_1h', 'macd_signal_1h']].ffill()
# Validate that all required columns are present
required_columns = ['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h',
required_columns = ['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h',
'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h',
'macd_1h', 'macd_signal_1h']
missing_columns = [col for col in required_columns if col not in df_1h.columns]
if missing_columns:
logger.error(f"[{metadata['pair']}] 缺少以下列: {missing_columns}")
raise KeyError(f"缺少以下列: {missing_columns}")
# 确保所有需要的列都被合并
required_columns = ['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h',
required_columns = ['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h',
'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h',
'macd_1h', 'macd_signal_1h']
# 验证所需列是否存在
missing_columns = [col for col in required_columns if col not in df_1h.columns]
if missing_columns:
logger.error(f"[{metadata['pair']}] 缺少以下列: {missing_columns}")
raise KeyError(f"缺少以下列: {missing_columns}")
df_1h = df_1h[required_columns] # 确保包含 macd_1h 和 macd_signal_1h
# 合并 1h 数据
dataframe = dataframe.merge(df_1h, how='left', on='date').ffill()
# 验证合并后的列
#logger.info(f"[{metadata['pair']}] 合并后的数据框列名: {list(dataframe.columns)}")
@ -277,13 +315,13 @@ class FreqaiPrimer(IStrategy):
# 计算各时间框架的趋势状态(牛/熊)
# 3m时间框架ema50下穿ema200为熊上穿为牛
dataframe['trend_3m'] = np.where(dataframe['ema_50_3m'] > dataframe['ema_200_3m'], 1, 0)
# 15m时间框架ema50下穿ema200为熊上穿为牛
dataframe['trend_15m'] = np.where(dataframe['ema_50_15m'] > dataframe['ema_200_15m'], 1, 0)
# 1h时间框架ema50下穿ema200为熊上穿为牛
dataframe['trend_1h_ema'] = np.where(dataframe['ema_50_1h'] > dataframe['ema_200_1h'], 1, 0)
# 计算熊牛得分0-100
# 权重3m熊牛权重1015m熊牛权重351h熊牛权重65
# 计算加权得分
@ -292,10 +330,10 @@ class FreqaiPrimer(IStrategy):
dataframe['trend_15m'] * 35 +
dataframe['trend_1h_ema'] * 65
)
# 确保得分在0-100范围内
dataframe['market_score'] = dataframe['market_score'].clip(lower=0, upper=100)
# 根据得分分类市场状态
dataframe['market_state'] = 'neutral'
dataframe.loc[dataframe['market_score'] > 70, 'market_state'] = 'strong_bull'
@ -308,7 +346,7 @@ class FreqaiPrimer(IStrategy):
dataframe['prev_market_state'] = dataframe['market_state'].shift(1)
# 为第一行设置默认值
dataframe['prev_market_state'] = dataframe['prev_market_state'].fillna('neutral')
# 记录当前的市场状态
# if len(dataframe) > 0:
# current_score = dataframe['market_score'].iloc[-1]
@ -317,38 +355,52 @@ class FreqaiPrimer(IStrategy):
#logger.info(f"[{metadata['pair']}] 各时间框架趋势: 3m={'牛' if dataframe['trend_3m'].iloc[-1] == 1 else '熊'}, \
# 15m={'牛' if dataframe['trend_15m'].iloc[-1] == 1 else '熊'}, \
# 1h={'牛' if dataframe['trend_1h_ema'].iloc[-1] == 1 else '熊'}")
# 调试:打印指标值(最后 5 行),验证时间对齐
#print(f"Pair: {metadata['pair']}, Last 5 rows after reindexing:")
#print(dataframe[['date', 'close', 'bb_lower_3m', 'rsi_3m', 'rsi_15m', 'rsi_1h', 'trend_1h',
#print(dataframe[['date', 'close', 'bb_lower_3m', 'rsi_3m', 'rsi_15m', 'rsi_1h', 'trend_1h',
# 'trend_3m', 'trend_15m', 'trend_1h_ema', 'market_score', 'market_state',
# 'bullish_engulfing', 'volume', 'volume_ma']].tail(5))
# 打印最终数据框的列名以验证
#logger.info(f"[{metadata['pair']}] 最终数据框列名: {list(dataframe.columns)}")
# 为BTC生成闪崩信号可选但推荐
if metadata['pair'] == 'BTC/USDT':
# 确保rsi_3m列存在
if 'rsi_3m' not in dataframe.columns:
logger.debug(f"[{metadata['pair']}] rsi_3m列丢失重新计算")
rsi_length_value = self.rsi_length.value
dataframe['rsi_3m'] = ta.rsi(dataframe['close'], length=rsi_length_value)
# 必须接收返回值,否则列不会被添加
dataframe = self.generate_dip_signal(dataframe, metadata)
if len(dataframe) > 0 and 'enter_long_dip_test' in dataframe.columns:
if dataframe['enter_long_dip_test'].iloc[-1]:
logger.info(f"🚨 [BTC/USDT] 检测到闪崩信号!")
return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 出场信号基于趋势和量价关系
# 条件1: 价格突破布林带上轨(使用可优化的偏差参数)
breakout_condition = dataframe['close'] >= dataframe['bb_upper_1h'] * self.exit_bb_upper_deviation.value
# 条件2: 成交量显著放大(使用可优化的成交量乘数)
volume_spike = dataframe['volume'] > dataframe['volume_ma'] * self.exit_volume_multiplier.value
# 条件3: MACD 下降趋势
macd_downward = dataframe['macd_1h'] < dataframe['macd_signal_1h']
# 条件4: RSI 进入超买区域(使用可优化的超买阈值)
rsi_overbought = dataframe['rsi_1h'] > self.rsi_overbought.value
# 合并所有条件
final_condition = breakout_condition | volume_spike | macd_downward | rsi_overbought
# 设置出场信号
dataframe.loc[final_condition, 'exit_long'] = 1
# 增强调试信息
#logger.info(f"[{metadata['pair']}] 出场条件检查:")
#logger.info(f" - 价格突破布林带上轨: {breakout_condition.sum()} 次")
@ -357,46 +409,125 @@ class FreqaiPrimer(IStrategy):
#logger.info(f" - RSI 超买: {rsi_overbought.sum()} 次")
#logger.info(f" - 最终条件: {final_condition.sum()} 次")
#logger.info(f" - 使用参数: exit_bb_upper_deviation={self.exit_bb_upper_deviation.value}, exit_volume_multiplier={self.exit_volume_multiplier.value}, rsi_overbought={self.rsi_overbought.value}")
# 日志记录
if dataframe['exit_long'].sum() > 0:
logger.info(f"[{metadata['pair']}] 触发出场信号数量: {dataframe['exit_long'].sum()}")
return dataframe
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 确保prev_market_state列存在
if 'prev_market_state' not in dataframe.columns:
dataframe['prev_market_state'] = 'neutral'
pair = metadata['pair']
# 初始化enter_long列为0确保它存在
if 'enter_long' not in dataframe.columns:
dataframe['enter_long'] = 0
# ================== 紧急通道1如果是 BTC 自身,直接使用闪崩信号 ==================
if pair == 'BTC/USDT':
# 确保闪崩信号列被正确生成
if 'enter_long_dip_test' not in dataframe.columns:
dataframe = self.generate_dip_signal(dataframe, metadata)
if 'enter_long_dip_test' in dataframe.columns:
# 1. 找出所有闪崩信号位置(用于回测)
flash_crash_indices = dataframe.index[dataframe['enter_long_dip_test'] == 1]
# 2. 为每个闪崩信号设置入场标记和标签
if len(flash_crash_indices) > 0:
dataframe.loc[flash_crash_indices, 'enter_long'] = 1
dataframe.loc[flash_crash_indices, 'enter_tag'] = 'flash_crash'
# 👉 输出每个信号的详细信息
log_msg = f"🚨 [{pair}] 设置闪崩抄底信号 (共 {len(flash_crash_indices)} 个):\n"
for idx in flash_crash_indices:
date_str = dataframe.loc[idx, 'date'].strftime('%Y-%m-%d %H:%M:%S')
price = dataframe.loc[idx, 'close']
rsi = dataframe.loc[idx, 'rsi_3m']
dip_percent = dataframe.loc[idx, 'dip_percentage']
log_msg += f" → 时间={date_str}, 价格={price:.4f}, RSI={rsi:.1f}, 跌幅={dip_percent:.2f}%\n"
logger.info(log_msg.strip())
# 3. 实盘逻辑检查最新K线的闪崩信号
if len(dataframe) > 0 and dataframe['enter_long_dip_test'].iloc[-1]:
dataframe.loc[dataframe.index[-1], 'enter_long'] = 1
dataframe.loc[dataframe.index[-1], 'enter_tag'] = 'flash_crash' # 添加闪崩抄底标签
date_str = dataframe.loc[dataframe.index[-1], 'date']
price = dataframe.loc[dataframe.index[-1], 'close']
rsi = dataframe.loc[dataframe.index[-1], 'rsi_3m']
dip_percent = dataframe.loc[dataframe.index[-1], 'dip_percentage'] if 'dip_percentage' in dataframe.columns else 0
logger.info(f"🚨 [{pair}] BTC最新闪崩立即入场时间: {date_str}, 价格: {price:.4f}, RSI: {rsi:.1f}, 跌幅: {dip_percent:.2f}%")
# ================== 紧急通道2如果是 SOL/SUI检查 BTC 是否刚闪崩 ==================
elif pair in ['SOL/USDT', 'SUI/USDT']:
try:
# 获取完整的 BTC 数据集
full_btc_df = self.dp.get_pair_dataframe(pair='BTC/USDT', timeframe=self.timeframe)
if len(full_btc_df) == 0:
logger.warning(f"[{pair}] 无法获取BTC数据")
return dataframe
# 为BTC生成闪崩信号无收阳限制
full_btc_df = self.generate_dip_signal(full_btc_df.copy(), {'pair': 'BTC/USDT'})
current_time = dataframe['date'].iloc[-1]
# 查找最近一次闪崩信号
dip_signals = full_btc_df[full_btc_df['enter_long_dip_test'] == 1]
if len(dip_signals) > 0:
last_btc_signal_time = dip_signals['date'].iloc[-1]
time_since_crash = current_time - last_btc_signal_time
# 👉 放宽响应窗口至 18 分钟6根K线
if pd.Timedelta(minutes=3) <= time_since_crash <= pd.Timedelta(minutes=18):
# 放宽条件RSI < 40 或 StochRSI_K < 25
rsi_ok = dataframe['rsi_3m'].iloc[-1] < 40
stoch_ok = dataframe['stochrsi_k_3m'].iloc[-1] < 25
price_drop_recent = (dataframe['close'].iloc[-1] / dataframe['close'].iloc[-2]) < 0.99 # 最近一根下跌
if (rsi_ok or stoch_ok) and price_drop_recent:
dataframe.loc[dataframe.index[-1], 'enter_long'] = 1
dataframe.loc[dataframe.index[-1], 'enter_tag'] = 'btc_flash_crash_response'
logger.info(f"🎯🎯 [{pair}] 【联动抄底】响应BTC闪崩"
f" 时间={current_time}, "
f"间隔={int(time_since_crash.total_seconds()//60)}分钟, "
f"RSI={dataframe['rsi_3m'].iloc[-1]:.1f}, "
f"StochRSI_K={dataframe['stochrsi_k_3m'].iloc[-1]:.1f}")
except Exception as e:
logger.error(f"[{pair}] BTC联动检测异常: {repr(e)}", exc_info=True)
# 条件1: 价格接近布林带下轨(允许一定偏差)
close_to_bb_lower_1h = (dataframe['close'] <= dataframe['bb_lower_1h'] * self.bb_lower_deviation.value) # 可优化偏差
# 条件2: RSI 不高于阈值(根据市场状态动态调整)
# 为每一行创建动态阈值
rsi_condition_1h = dataframe.apply(lambda row:
row['rsi_1h'] < self.rsi_bull_threshold.value if row['prev_market_state'] in ['strong_bull', 'weak_bull'] else row['rsi_1h'] < self.rsi_oversold.value,
rsi_condition_1h = dataframe.apply(lambda row:
row['rsi_1h'] < self.rsi_bull_threshold.value if row['prev_market_state'] in ['strong_bull', 'weak_bull'] else row['rsi_1h'] < self.rsi_oversold.value,
axis=1)
# 条件3: StochRSI 处于超卖区域(根据市场状态动态调整)
stochrsi_condition_1h = dataframe.apply(lambda row:
(row['stochrsi_k_1h'] < self.stochrsi_bull_threshold.value and row['stochrsi_d_1h'] < self.stochrsi_bull_threshold.value) if row['prev_market_state'] in ['strong_bull', 'weak_bull']
else (row['stochrsi_k_1h'] < self.stochrsi_neutral_threshold.value and row['stochrsi_d_1h'] < self.stochrsi_neutral_threshold.value),
stochrsi_condition_1h = dataframe.apply(lambda row:
(row['stochrsi_k_1h'] < self.stochrsi_bull_threshold.value and row['stochrsi_d_1h'] < self.stochrsi_bull_threshold.value) if row['prev_market_state'] in ['strong_bull', 'weak_bull']
else (row['stochrsi_k_1h'] < self.stochrsi_neutral_threshold.value and row['stochrsi_d_1h'] < self.stochrsi_neutral_threshold.value),
axis=1)
# 条件4: MACD 上升趋势
macd_condition_1h = dataframe['macd_1h'] > dataframe['macd_signal_1h']
# 条件5: 成交量显著放大(可选条件)
volume_spike = dataframe['volume'] > dataframe['volume_ma'] * self.volume_multiplier.value
# 条件6: 布林带宽度过滤(避免窄幅震荡)
bb_width = (dataframe['bb_upper_1h'] - dataframe['bb_lower_1h']) / dataframe['close']
bb_width_condition = bb_width > self.bb_width_threshold.value # 可优化的布林带宽度阈值
# 辅助条件: 3m 和 15m 趋势确认(允许部分时间框架不一致)
trend_confirmation = (dataframe['trend_3m'] == 1) | (dataframe['trend_15m'] == 1)
# 合并所有条件(减少强制性条件)
# 至少满足多个条件中的一定数量
condition_count = (
@ -408,10 +539,10 @@ class FreqaiPrimer(IStrategy):
trend_confirmation.astype(int)
)
final_condition = condition_count >= self.min_condition_count.value
# 设置入场信号
dataframe.loc[final_condition, 'enter_long'] = 1
# 设置入场信号,但保留已经设置的闪崩信号
dataframe.loc[final_condition & (dataframe['enter_long'] == 0), 'enter_long'] = 1
# 增强调试信息
#logger.info(f"[{metadata['pair']}] 入场条件检查:")
#logger.info(f" - 价格接近布林带下轨: {close_to_bb_lower_1h.sum()} 次")
@ -432,11 +563,11 @@ class FreqaiPrimer(IStrategy):
'trend': trend_confirmation
})
correlation = conditions.corr().mean().mean()
#logger.info(f"[{metadata['pair']}] 条件平均相关性: {correlation:.2f}")
#logger.info(f"[{metadata['pair']}] 条件平均相关性: {correlation:.2f}")
# 日志记录
if dataframe['enter_long'].sum() > 0:
logger.info(f"[{metadata['pair']}] 发现入场信号数量: {dataframe['enter_long'].sum()}")
return dataframe
def detect_h1_rapid_rise(self, pair: str) -> bool:
@ -450,51 +581,51 @@ class FreqaiPrimer(IStrategy):
try:
# 获取1小时K线数据
df_1h = self.dp.get_pair_dataframe(pair=pair, timeframe='1h')
# 获取当前优化参数值
max_candles = self.h1_max_candles.value
rapid_rise_threshold = self.h1_rapid_rise_threshold.value
max_consecutive_candles = self.h1_max_consecutive_candles.value
# 确保有足够的K线数据
if len(df_1h) < max_candles:
logger.warning(f"[{pair}] 1h K线数据不足 {max_candles} 根,当前只有 {len(df_1h)} 根,无法完整检测剧烈拉升")
return False
# 获取最近的K线
recent_data = df_1h.iloc[-max_candles:].copy()
# 检查连续最多几根K线内的最大涨幅
rapid_rise_detected = False
max_rise = 0
for i in range(len(recent_data) - max_consecutive_candles + 1):
window_data = recent_data.iloc[i:i + max_consecutive_candles]
window_low = window_data['low'].min()
window_high = window_data['high'].max()
# 计算区间内的最大涨幅
if window_low > 0:
rise_percentage = (window_high - window_low) / window_low
if rise_percentage > max_rise:
max_rise = rise_percentage
# 检查是否超过阈值
if rise_percentage >= rapid_rise_threshold:
rapid_rise_detected = True
#logger.info(f"[{pair}] 检测到剧烈拉升: 从 {window_low:.2f} 到 {window_high:.2f} ({rise_percentage:.2%}) 在 {max_consecutive_candles} 根K线内")
break
current_price = recent_data['close'].iloc[-1]
#logger.info(f"[{pair}] 剧烈拉升检测结果: {'不稳固' if rapid_rise_detected else '稳固'}")
#logger.info(f"[{pair}] 最近最大涨幅: {max_rise:.2%}")
return rapid_rise_detected
except Exception as e:
logger.error(f"[{pair}] 剧烈拉升检测过程中发生错误: {str(e)}")
return False
def confirm_trade_entry(
self,
pair: str,
@ -509,60 +640,54 @@ class FreqaiPrimer(IStrategy):
) -> bool:
"""
交易买入前的确认函数用于最终决定是否执行交易
此处实现剧烈拉升检查逻辑
此处实现剧烈拉升检查逻辑但对闪崩抄底交易特殊处理
"""
# 默认允许交易
allow_trade = True
# 仅对多头交易进行检查
if side == 'long':
# 检查是否处于剧烈拉升的不稳固区域
is_unstable_region = self.detect_h1_rapid_rise(pair)
if is_unstable_region:
#logger.info(f"[{pair}] 由于检测到剧烈拉升,取消入场交易")
allow_trade = False
# 检查是否有特殊入场标签(例如闪崩抄底)
if entry_tag != 'flash_crash':
# 非闪崩抄底交易才进行剧烈拉升检查
is_unstable_region = self.detect_h1_rapid_rise(pair)
if is_unstable_region:
#logger.info(f"[{pair}] 由于检测到剧烈拉升,取消入场交易")
allow_trade = False
else:
# 闪崩抄底交易总是允许,不进行剧烈拉升检查
logger.info(f"[{pair}] 闪崩抄底交易,绕过剧烈拉升检查")
allow_trade = True
# 如果没有阻止因素,允许交易
return allow_trade
def custom_stoploss(self, pair: str, trade: 'Trade', current_time, current_rate: float,
def custom_stoploss(self, pair: str, trade: 'Trade', current_time, current_rate: float,
current_profit: float, **kwargs) -> float:
# 动态止损基于ATR
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1]
atr = last_candle['atr']
# 获取当前市场状态
current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else 'unknown'
# 激进的渐进式止损策略
# 激进的渐进式止损策略
if current_profit > 0.05: # 利润超过5%时
return -3.0 * atr / current_rate # 更大幅扩大止损范围,让利润奔跑
elif current_profit > 0.03: # 利润超过3%时
return -2.5 * atr / current_rate # 更中等扩大止损范围
elif current_profit > 0.01: # 利润超过1%时
return -2.0 * atr / current_rate # 更轻微扩大止损范围
# 在强劲牛市中,即使小亏损也可以容忍更大回调
if current_state == 'strong_bull' and current_profit > -0.01:
return -1.5 * atr / current_rate
# 动态调整止损范围
if current_profit > 0.05: # 利润超过5%时
return -3.0 * atr / current_rate # 更大幅扩大止损范围,让利润奔跑
elif current_profit > 0.03: # 利润超过3%时
return -2.5 * atr / current_rate # 更中等扩大止损范围
elif current_profit > 0.01: # 利润超过1%时
return -2.0 * atr / current_rate # 更轻微扩大止损范围
# 在强劲牛市中,即使小亏损也可以容忍更大回调
if current_state == 'strong_bull' and current_profit > -0.01:
return -1.8 * atr / current_rate
return -1.8 * atr / current_rate # 更强的牛市容忍度
if atr > 0:
return -1.2 * atr / current_rate # 基础1.2倍ATR止损
return self.stoploss
def custom_exit(self, pair: str, trade: Trade, current_time: datetime, current_rate: float,
current_profit: float, **kwargs) -> float:
if trade.is_short:
@ -576,16 +701,14 @@ class FreqaiPrimer(IStrategy):
a = self.roi_param_a.value # 系数a (可优化参数)
k = self.roi_param_k.value # 偏移量k (可优化参数)
t = self.roi_param_t.value # 常数项t (可优化参数)
dynamic_roi_threshold = (a * (trade_age_minutes + k)) + t
# 确保ROI阈值不小于0
if dynamic_roi_threshold < 0:
dynamic_roi_threshold = 0.0
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else (
'strong_bull' if dataframe['sma'].diff().iloc[-1] > 0.01 else 'weak_bull' if dataframe['sma'].diff().iloc[-1] > 0 else 'neutral'
)
current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else 'neutral'
entry_tag = trade.enter_tag if hasattr(trade, 'enter_tag') else None
profit_ratio = current_profit / dynamic_roi_threshold if dynamic_roi_threshold > 0 else 0
@ -611,7 +734,7 @@ class FreqaiPrimer(IStrategy):
return exit_ratio
def adjust_trade_position(self, trade: 'Trade', current_time, current_rate: float,
def adjust_trade_position(self, trade: 'Trade', current_time, current_rate: float,
current_profit: float, min_stake: float, max_stake: float, **kwargs) -> float:
"""
根据用户要求实现加仓逻辑
@ -620,41 +743,41 @@ class FreqaiPrimer(IStrategy):
"""
# 获取当前交易对
pair = trade.pair
# 获取当前交易的加仓次数
entry_count = len(trade.orders) # 获取所有入场订单数量
# 如果已经达到最大加仓次数,则不再加仓
if entry_count - 1 >= self.max_entry_adjustments.value:
return 0.0
# 获取初始入场价格和当前价格的差值百分比
initial_price = trade.open_rate
if initial_price == 0:
return 0.0
price_diff_pct = (current_rate - initial_price) / initial_price
# 检查价格回调是否达到加仓间隔
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else 'neutral'
if price_diff_pct <= -self.add_position_callback.value and current_state not in ['bear', 'weak_bear']:
# 计算初始入场金额
initial_stake = trade.orders[0].cost # 第一笔订单的成本
# 计算加仓次数从1开始计数
adjustment_count = entry_count - 1 # 已加仓次数
# 计算加仓金额: (initial_stake / stake_divisor) ^ (adjustment_count + 1)
additional_stake = (self.step_coefficient.value * initial_stake / self.stake_divisor.value) ** (adjustment_count + 1)
# 确保加仓金额在允许的范围内
additional_stake = max(min_stake, min(additional_stake, max_stake - trade.stake_amount))
#logger.info(f"[{pair}] 触发加仓: 第{adjustment_count + 1}次加仓, 初始金额{initial_stake:.2f}, \
# 加仓金额{additional_stake:.2f}, 价格差{price_diff_pct:.2%}, 当前利润{current_profit:.2%}")
return additional_stake
# 不符合加仓条件返回0
return 0.0

View File

@ -203,7 +203,6 @@ docker-compose run --rm freqtrade backtesting $PAIRS_FLAG \
--config /freqtrade/config_examples/$CONFIG_FILE \
--config /freqtrade/templates/freqaiprimer.json \
--strategy-path /freqtrade/templates \
--enable-protections \
--strategy $STRATEGY_NAME \
--timerange $START_DATE-$END_DATE \
--fee 0.0008 \