myTestFreqAI/freqtrade/templates/freqaiprimer.py
2025-10-20 13:03:59 +08:00

636 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import warnings
warnings.filterwarnings("ignore", category=UserWarning, module="pandas_ta")
import logging
from freqtrade.strategy import IStrategy, IntParameter, DecimalParameter
from pandas import DataFrame
import pandas_ta as ta
from freqtrade.persistence import Trade
import numpy as np
import datetime
import pandas as pd
import math
logger = logging.getLogger(__name__)
class FreqaiPrimer(IStrategy):
# 策略参数 - 使用custom_roi替代minimal_roi字典
loglevel = "warning"
minimal_roi = {}
# 启用自定义ROI回调函数
use_custom_roi = True
stoploss = -0.15 # 固定止损 -15% (大幅放宽止损以承受更大波动)
trailing_stop = True
trailing_stop_positive_offset = 0.005 # 追踪止损偏移量 0.5% (更容易触发跟踪止盈)
# 用于跟踪市场状态的数据框缓存
_dataframe_cache = None
def __init__(self, config=None):
"""初始化策略参数调用父类初始化方法并接受config参数"""
super().__init__(config) # 调用父类的初始化方法并传递config
# 存储从配置文件加载的默认值
self._trailing_stop_positive_default = 0.004 # 降低默认值以更容易触发跟踪止盈
@property
def protections(self):
"""
保护机制配置
基于最新Freqtrade规范保护机制应定义在策略文件中而非配置文件
"""
return [
{
"method": "StoplossGuard",
"lookback_period_candles": 60, # 3小时回看期60根3分钟K线
"trade_limit": 2, # 最多2笔止损交易
"stop_duration_candles": 60, # 暂停180分钟60根3分钟K线
"only_per_pair": False # 仅针对单个币对
},
{
"method": "CooldownPeriod",
"stop_duration_candles": 2 # 6分钟冷却期2根3分钟K线
},
{
"method": "MaxDrawdown",
"lookback_period_candles": 48, # 2.4小时回看期
"trade_limit": 4, # 4笔交易限制
"stop_duration_candles": 24, # 72分钟暂停24根3分钟K线
"max_allowed_drawdown": 0.20 # 20%最大回撤容忍度
}
]
@property
def trailing_stop_positive(self):
"""根据市场状态动态调整跟踪止盈参数"""
# 获取当前市场状态
if self._dataframe_cache is not None and len(self._dataframe_cache) > 0:
current_state = self._dataframe_cache['market_state'].iloc[-1]
if current_state == 'strong_bull':
return 0.007 # 强劲牛市中降低跟踪止盈,让利润奔跑
elif current_state == 'weak_bull':
return 0.005 # 弱势牛市中保持较低的跟踪止盈
return self._trailing_stop_positive_default # 返回默认值
@trailing_stop_positive.setter
def trailing_stop_positive(self, value):
"""设置trailing_stop_positive的默认值"""
self._trailing_stop_positive_default = value
timeframe = "3m" # 主时间框架为 3 分钟
can_short = False # 禁用做空
# 这两个就不优化了, 参数太多了
h1_max_consecutive_candles = IntParameter(1, 4, default=3, optimize=False, load=True, space='buy')
h1_max_candles = IntParameter(100, 300, default=200, optimize=False, load=True, space='buy')
# 第一步 buy 空间优化参数 7个:
# --- 趋势与信号生成 ---
bb_length = IntParameter(10, 30, default=14, optimize=False, load=True, space='buy')
bb_std = DecimalParameter(1.5, 3.0, decimals=1, default=3.0, optimize=False, load=True, space='buy')
rsi_length = IntParameter(7, 21, default=16, optimize=False, load=True, space='buy')
# 入场条件阈值
bb_lower_deviation = DecimalParameter(1.01, 1.05, decimals=2, default=1.05, optimize=False, load=True, space='buy')
rsi_oversold = IntParameter(30, 50, default=42, optimize=False, load=True, space='buy')
volume_multiplier = DecimalParameter(1.2, 2.0, decimals=1, default=1.6, optimize=False, load=True, space='buy')
bb_width_threshold = IntParameter(10, 30, default=12, optimize=False, load=True, space='buy')
# 剧烈拉升检测参数 - 使用Hyperopt可优化参数
h1_rapid_rise_threshold = DecimalParameter(0.05, 0.15, decimals=3, default=0.065, optimize=False, load=True, space='buy')
# 第二步 buy 空间优化参数 7个:
# --- 资金管理与加仓 ---
add_position_callback = IntParameter(30, 60, default=28, optimize=True, load=True, space='buy')
# 合并step_coefficient和stake_divisor为multiplier参数
adjust_multiplier = DecimalParameter(0.15, 0.6, decimals=2, default=0.52, optimize=True, load=True, space='buy')
max_entry_adjustments = IntParameter(2, 5, default=2, optimize=True, load=True, space='buy') # 可暂时关闭
# 市场状态相关(辅助决策)
min_condition_count = IntParameter(2, 4, default=2, optimize=False, load=True, space='buy')
stochrsi_neutral_threshold = IntParameter(20, 30, default=29, optimize=True, load=True, space='buy') # 固定为20
stochrsi_bull_threshold = IntParameter(30, 40, default=36, optimize=True, load=True, space='buy')
rsi_bull_threshold = IntParameter(45, 55, default=54, optimize=True, load=True, space='buy') # 固定为50
# 第三步, 优化sell空间
exit_bb_upper_deviation = DecimalParameter(0.98, 1.02, decimals=2, default=0.99, optimize=True, load=True, space='sell')
exit_volume_multiplier = DecimalParameter(1.5, 3.0, decimals=1, default=1.7, optimize=True, load=True, space='sell')
rsi_overbought = IntParameter(57, 59, default=58, optimize=True, load=True, space='sell')
# -------------------------- 简化后的减仓Hyperopt参数仅3个对齐加仓 --------------------------
reduce_profit_base = DecimalParameter(0.05, 0.12, default=0.075, space='sell', optimize=True) # 减仓基础盈利阈值触发门槛默认7.5%
reduce_coefficient = DecimalParameter(0.1, 0.6, default=0.25, space='sell', optimize=True) # 减仓金额系数默认0.25,控制初始金额)
max_reduce_adjustments = IntParameter(1, 3, default=1, space='sell', optimize=True) # 最大减仓次数默认1次避免过度减仓
def informative_pairs(self):
pairs = self.dp.current_whitelist()
return [(pair, '15m') for pair in pairs] + [(pair, '1h') for pair in pairs]
def _validate_dataframe_columns(self, dataframe: DataFrame, required_columns: list, metadata: dict):
"""
验证数据框中是否包含所有需要的列。
如果缺少列,则记录警告日志。
"""
missing_columns = [col for col in required_columns if col not in dataframe.columns]
if missing_columns:
logger.warning(f"[{metadata['pair']}] 数据框中缺少以下列: {missing_columns}")
def custom_stake_amount(self, pair: str, current_time: pd.Timestamp,
current_rate: float,
proposed_stake: float,
min_stake: float,
max_stake: float,
**kwargs) -> float:
# 获取初始资金回测中固定为dry_run_wallet的值
initial_balance = self.config.get('dry_run_wallet', 10000)
# 始终以初始资金的3.75%计算
desired_stake = initial_balance * 0.0375
desired_stake = math.floor(desired_stake) # 取整,去掉小数点后的数字
return max(min(desired_stake, max_stake), min_stake)
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 计算 3m 周期的指标
bb_length_value = self.bb_length.value
bb_std_value = self.bb_std.value
rsi_length_value = self.rsi_length.value
bb_3m = ta.bbands(dataframe['close'], length=bb_length_value, std=bb_std_value)
dataframe['bb_lower_3m'] = bb_3m[f'BBL_{bb_length_value}_{bb_std_value}']
dataframe['bb_upper_3m'] = bb_3m[f'BBU_{bb_length_value}_{bb_std_value}']
dataframe['rsi_3m'] = ta.rsi(dataframe['close'], length=rsi_length_value)
# 新增 StochRSI 指标
stochrsi_3m = ta.stochrsi(dataframe['close'], length=rsi_length_value, rsi_length=rsi_length_value)
dataframe['stochrsi_k_3m'] = stochrsi_3m[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3']
dataframe['stochrsi_d_3m'] = stochrsi_3m[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3']
# 新增 MACD 指标
macd_3m = ta.macd(dataframe['close'], fast=12, slow=26, signal=9)
dataframe['macd_3m'] = macd_3m['MACD_12_26_9']
dataframe['macd_signal_3m'] = macd_3m['MACDs_12_26_9']
dataframe['macd_hist_3m'] = macd_3m['MACDh_12_26_9']
# 计算3m时间框架的EMA50和EMA200
dataframe['ema_50_3m'] = ta.ema(dataframe['close'], length=50)
dataframe['ema_200_3m'] = ta.ema(dataframe['close'], length=200)
# 成交量过滤
dataframe['volume_ma'] = dataframe['volume'].rolling(20).mean()
# 计算 ATR 用于动态止损和退出
dataframe['atr'] = ta.atr(dataframe['high'], dataframe['low'], dataframe['close'], length=14)
# 获取 15m 数据
df_15m = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='15m')
df_15m['rsi_15m'] = ta.rsi(df_15m['close'], length=rsi_length_value)
# 计算15m时间框架的EMA50和EMA200
df_15m['ema_50_15m'] = ta.ema(df_15m['close'], length=50)
df_15m['ema_200_15m'] = ta.ema(df_15m['close'], length=200)
# 新增 StochRSI 指标
stochrsi_15m = ta.stochrsi(df_15m['close'], length=rsi_length_value, rsi_length=rsi_length_value)
df_15m['stochrsi_k_15m'] = stochrsi_15m[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3']
df_15m['stochrsi_d_15m'] = stochrsi_15m[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3']
# 新增 MACD 指标
macd_15m = ta.macd(df_15m['close'], fast=12, slow=26, signal=9)
df_15m['macd_15m'] = macd_15m['MACD_12_26_9']
df_15m['macd_signal_15m'] = macd_15m['MACDs_12_26_9']
df_15m['macd_hist_15m'] = macd_15m['MACDh_12_26_9']
# 将 15m 数据重新索引到主时间框架 (3m)
df_15m = df_15m.set_index('date').reindex(dataframe['date']).reset_index()
df_15m = df_15m.rename(columns={'index': 'date'})
df_15m = df_15m[['date', 'rsi_15m', 'ema_50_15m', 'ema_200_15m']].ffill()
# 合并 15m 数据
dataframe = dataframe.merge(df_15m, how='left', on='date')
# 获取 1h 数据
df_1h = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1h')
# 计算 1h 布林带
bb_1h = ta.bbands(df_1h['close'], length=bb_length_value, std=bb_std_value)
df_1h['bb_lower_1h'] = bb_1h[f'BBL_{bb_length_value}_{bb_std_value}']
df_1h['bb_upper_1h'] = bb_1h[f'BBU_{bb_length_value}_{bb_std_value}']
# 计算 1h RSI 和 EMA
df_1h['rsi_1h'] = ta.rsi(df_1h['close'], length=rsi_length_value)
df_1h['ema_50_1h'] = ta.ema(df_1h['close'], length=50) # 1h 50周期EMA
df_1h['ema_200_1h'] = ta.ema(df_1h['close'], length=200) # 1h 200周期EMA
df_1h['trend_1h'] = df_1h['close'] > df_1h['ema_50_1h'] # 1h上涨趋势
# 新增 StochRSI 指标
stochrsi_1h = ta.stochrsi(df_1h['close'], length=rsi_length_value, rsi_length=rsi_length_value)
df_1h['stochrsi_k_1h'] = stochrsi_1h[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3']
df_1h['stochrsi_d_1h'] = stochrsi_1h[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3']
# 新增 MACD 指标
macd_1h = ta.macd(df_1h['close'], fast=12, slow=26, signal=9)
df_1h['macd_1h'] = macd_1h['MACD_12_26_9']
df_1h['macd_signal_1h'] = macd_1h['MACDs_12_26_9']
df_1h['macd_hist_1h'] = macd_1h['MACDh_12_26_9']
# 验证 MACD 列是否正确生成
#logger.info(f"[{metadata['pair']}] 1小时 MACD 列: {list(macd_1h.columns)}")
# 确保 StochRSI 指标已正确计算
# 将 1h 数据重新索引到主时间框架 (3m),并填充缺失值
df_1h = df_1h.set_index('date').reindex(dataframe['date']).ffill().bfill().reset_index()
df_1h = df_1h.rename(columns={'index': 'date'})
# Include macd_1h and macd_signal_1h in the column selection
df_1h = df_1h[['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h', 'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h', 'macd_1h', 'macd_signal_1h']].ffill()
# Validate that all required columns are present
required_columns = ['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h',
'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h',
'macd_1h', 'macd_signal_1h']
missing_columns = [col for col in required_columns if col not in df_1h.columns]
if missing_columns:
logger.error(f"[{metadata['pair']}] 缺少以下列: {missing_columns}")
raise KeyError(f"缺少以下列: {missing_columns}")
# 确保所有需要的列都被合并
required_columns = ['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h',
'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h',
'macd_1h', 'macd_signal_1h']
# 验证所需列是否存在
missing_columns = [col for col in required_columns if col not in df_1h.columns]
if missing_columns:
logger.error(f"[{metadata['pair']}] 缺少以下列: {missing_columns}")
raise KeyError(f"缺少以下列: {missing_columns}")
df_1h = df_1h[required_columns] # 确保包含 macd_1h 和 macd_signal_1h
# 合并 1h 数据
dataframe = dataframe.merge(df_1h, how='left', on='date').ffill()
# 验证合并后的列
#logger.info(f"[{metadata['pair']}] 合并后的数据框列名: {list(dataframe.columns)}")
# K线形态看涨吞没
dataframe['bullish_engulfing'] = (
(dataframe['close'].shift(1) < dataframe['open'].shift(1)) &
(dataframe['close'] > dataframe['open']) &
(dataframe['close'] > dataframe['open'].shift(1)) &
(dataframe['open'] < dataframe['close'].shift(1))
)
# 计算各时间框架的趋势状态(牛/熊)
# 3m时间框架ema50下穿ema200为熊上穿为牛
dataframe['trend_3m'] = np.where(dataframe['ema_50_3m'] > dataframe['ema_200_3m'], 1, 0)
# 15m时间框架ema50下穿ema200为熊上穿为牛
dataframe['trend_15m'] = np.where(dataframe['ema_50_15m'] > dataframe['ema_200_15m'], 1, 0)
# 1h时间框架ema50下穿ema200为熊上穿为牛
dataframe['trend_1h_ema'] = np.where(dataframe['ema_50_1h'] > dataframe['ema_200_1h'], 1, 0)
# 计算熊牛得分0-100
# 权重3m熊牛权重1015m熊牛权重351h熊牛权重65
# 计算加权得分
dataframe['market_score'] = (
dataframe['trend_3m'] * 10 +
dataframe['trend_15m'] * 35 +
dataframe['trend_1h_ema'] * 65
)
# 确保得分在0-100范围内
dataframe['market_score'] = dataframe['market_score'].clip(lower=0, upper=100)
# 根据得分分类市场状态
dataframe['market_state'] = 'neutral'
dataframe.loc[dataframe['market_score'] > 70, 'market_state'] = 'strong_bull'
dataframe.loc[(dataframe['market_score'] > 50) & (dataframe['market_score'] <= 70), 'market_state'] = 'weak_bull'
dataframe.loc[(dataframe['market_score'] >= 30) & (dataframe['market_score'] <= 50), 'market_state'] = 'neutral'
dataframe.loc[(dataframe['market_score'] > 10) & (dataframe['market_score'] < 30), 'market_state'] = 'weak_bear'
dataframe.loc[dataframe['market_score'] <= 10, 'market_state'] = 'strong_bear'
# 创建一个使用前一行市场状态的列避免在populate_entry_trend中使用iloc[-1]
dataframe['prev_market_state'] = dataframe['market_state'].shift(1)
# 为第一行设置默认值
dataframe['prev_market_state'] = dataframe['prev_market_state'].fillna('neutral')
# 记录当前的市场状态
# if len(dataframe) > 0:
# current_score = dataframe['market_score'].iloc[-1]
# current_state = dataframe['market_state'].iloc[-1]
#logger.info(f"[{metadata['pair']}] 熊牛得分: {current_score:.1f}, 市场状态: {current_state}")
#logger.info(f"[{metadata['pair']}] 各时间框架趋势: 3m={'牛' if dataframe['trend_3m'].iloc[-1] == 1 else '熊'}, \
# 15m={'牛' if dataframe['trend_15m'].iloc[-1] == 1 else '熊'}, \
# 1h={'牛' if dataframe['trend_1h_ema'].iloc[-1] == 1 else '熊'}")
# 调试:打印指标值(最后 5 行),验证时间对齐
#print(f"Pair: {metadata['pair']}, Last 5 rows after reindexing:")
#print(dataframe[['date', 'close', 'bb_lower_3m', 'rsi_3m', 'rsi_15m', 'rsi_1h', 'trend_1h',
# 'trend_3m', 'trend_15m', 'trend_1h_ema', 'market_score', 'market_state',
# 'bullish_engulfing', 'volume', 'volume_ma']].tail(5))
# 打印最终数据框的列名以验证
#logger.info(f"[{metadata['pair']}] 最终数据框列名: {list(dataframe.columns)}")
return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 出场信号基于趋势和量价关系
# 条件1: 价格突破布林带上轨(使用可优化的偏差参数)
breakout_condition = dataframe['close'] >= dataframe['bb_upper_1h'] * self.exit_bb_upper_deviation.value
# 条件2: 成交量显著放大(使用可优化的成交量乘数)
volume_spike = dataframe['volume'] > dataframe['volume_ma'] * self.exit_volume_multiplier.value
# 条件3: MACD 下降趋势
macd_downward = dataframe['macd_1h'] < dataframe['macd_signal_1h']
# 条件4: RSI 进入超买区域(使用可优化的超买阈值)
rsi_overbought = dataframe['rsi_1h'] > self.rsi_overbought.value
# 合并所有条件
final_condition = breakout_condition | volume_spike | macd_downward | rsi_overbought
# 设置出场信号
dataframe.loc[final_condition, 'exit_long'] = 1
# 增强调试信息
#logger.info(f"[{metadata['pair']}] 出场条件检查:")
#logger.info(f" - 价格突破布林带上轨: {breakout_condition.sum()} 次")
#logger.info(f" - 成交量显著放大: {volume_spike.sum()} 次")
#logger.info(f" - MACD 下降趋势: {macd_downward.sum()} 次")
#logger.info(f" - RSI 超买: {rsi_overbought.sum()} 次")
#logger.info(f" - 最终条件: {final_condition.sum()} 次")
#logger.info(f" - 使用参数: exit_bb_upper_deviation={self.exit_bb_upper_deviation.value}, exit_volume_multiplier={self.exit_volume_multiplier.value}, rsi_overbought={self.rsi_overbought.value}")
# 日志记录
if dataframe['exit_long'].sum() > 0:
logger.info(f"[{metadata['pair']}] 触发出场信号数量: {dataframe['exit_long'].sum()}")
return dataframe
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 确保prev_market_state列存在
if 'prev_market_state' not in dataframe.columns:
dataframe['prev_market_state'] = 'neutral'
# 条件1: 价格接近布林带下轨(允许一定偏差)
close_to_bb_lower_1h = (dataframe['close'] <= dataframe['bb_lower_1h'] * self.bb_lower_deviation.value) # 可优化偏差
# 条件2: RSI 不高于阈值(根据市场状态动态调整)
# 为每一行创建动态阈值
rsi_condition_1h = dataframe.apply(lambda row:
row['rsi_1h'] < self.rsi_bull_threshold.value if row['prev_market_state'] in ['strong_bull', 'weak_bull'] else row['rsi_1h'] < self.rsi_oversold.value,
axis=1)
# 条件3: StochRSI 处于超卖区域(根据市场状态动态调整)
stochrsi_condition_1h = dataframe.apply(lambda row:
(row['stochrsi_k_1h'] < self.stochrsi_bull_threshold.value and row['stochrsi_d_1h'] < self.stochrsi_bull_threshold.value) if row['prev_market_state'] in ['strong_bull', 'weak_bull']
else (row['stochrsi_k_1h'] < self.stochrsi_neutral_threshold.value and row['stochrsi_d_1h'] < self.stochrsi_neutral_threshold.value),
axis=1)
# 条件4: MACD 上升趋势
macd_condition_1h = dataframe['macd_1h'] > dataframe['macd_signal_1h']
# 条件5: 成交量显著放大(可选条件)
volume_spike = dataframe['volume'] > dataframe['volume_ma'] * self.volume_multiplier.value
# 条件6: 布林带宽度过滤(避免窄幅震荡)
bb_width = (dataframe['bb_upper_1h'] - dataframe['bb_lower_1h']) / dataframe['close']
bb_width_condition = bb_width > self.bb_width_threshold.value / 1000 # 可优化的布林带宽度阈值
# 辅助条件: 3m 和 15m 趋势确认(允许部分时间框架不一致)
trend_confirmation = (dataframe['trend_3m'] == 1) | (dataframe['trend_15m'] == 1)
# 合并所有条件(减少强制性条件)
# 至少满足多个条件中的一定数量
condition_count = (
close_to_bb_lower_1h.astype(int) +
rsi_condition_1h.astype(int) +
stochrsi_condition_1h.astype(int) +
macd_condition_1h.astype(int) +
(volume_spike | bb_width_condition).astype(int) + # 成交量或布林带宽度满足其一即可
trend_confirmation.astype(int)
)
final_condition = condition_count >= self.min_condition_count.value
# 设置入场信号
dataframe.loc[final_condition, 'enter_long'] = 1
# 增强调试信息
#logger.info(f"[{metadata['pair']}] 入场条件检查:")
#logger.info(f" - 价格接近布林带下轨: {close_to_bb_lower_1h.sum()} 次")
#logger.info(f" - RSI 超卖: {rsi_condition_1h.sum()} 次")
#logger.info(f" - StochRSI 超卖: {stochrsi_condition_1h.sum()} 次")
#logger.info(f" - MACD 上升趋势: {macd_condition_1h.sum()} 次")
#logger.info(f" - 成交量或布林带宽度: {(volume_spike | bb_width_condition).sum()} 次")
#logger.info(f" - 趋势确认: {trend_confirmation.sum()} 次")
#logger.info(f" - 最终条件: {final_condition.sum()} 次")
# 在populate_entry_trend方法末尾添加
# 计算条件间的相关性
conditions = DataFrame({
'close_to_bb': close_to_bb_lower_1h,
'rsi': rsi_condition_1h,
'stochrsi': stochrsi_condition_1h,
'macd': macd_condition_1h,
'vol_bb': (volume_spike | bb_width_condition),
'trend': trend_confirmation
})
correlation = conditions.corr().mean().mean()
#logger.info(f"[{metadata['pair']}] 条件平均相关性: {correlation:.2f}")
# 日志记录
if dataframe['enter_long'].sum() > 0:
logger.info(f"[{metadata['pair']}] 发现入场信号数量: {dataframe['enter_long'].sum()}")
return dataframe
def detect_h1_rapid_rise(self, pair: str) -> bool:
"""
检测1小时K线图上的剧烈拉升情况轻量级版本用于confirm_trade_entry
参数:
- pair: 交易对
返回:
- bool: 是否处于不稳固区域
"""
try:
# 获取1小时K线数据
df_1h = self.dp.get_pair_dataframe(pair=pair, timeframe='1h')
# 获取当前优化参数值
max_candles = self.h1_max_candles.value
rapid_rise_threshold = self.h1_rapid_rise_threshold.value
max_consecutive_candles = self.h1_max_consecutive_candles.value
# 确保有足够的K线数据
if len(df_1h) < max_candles:
logger.warning(f"[{pair}] 1h K线数据不足 {max_candles} 根,当前只有 {len(df_1h)} 根,无法完整检测剧烈拉升")
return False
# 获取最近的K线
recent_data = df_1h.iloc[-max_candles:].copy()
# 检查连续最多几根K线内的最大涨幅
rapid_rise_detected = False
max_rise = 0
for i in range(len(recent_data) - max_consecutive_candles + 1):
window_data = recent_data.iloc[i:i + max_consecutive_candles]
window_low = window_data['low'].min()
window_high = window_data['high'].max()
# 计算区间内的最大涨幅
if window_low > 0:
rise_percentage = (window_high - window_low) / window_low
if rise_percentage > max_rise:
max_rise = rise_percentage
# 检查是否超过阈值
if rise_percentage >= rapid_rise_threshold:
rapid_rise_detected = True
#logger.info(f"[{pair}] 检测到剧烈拉升: 从 {window_low:.2f} 到 {window_high:.2f} ({rise_percentage:.2%}) 在 {max_consecutive_candles} 根K线内")
break
current_price = recent_data['close'].iloc[-1]
#logger.info(f"[{pair}] 剧烈拉升检测结果: {'不稳固' if rapid_rise_detected else '稳固'}")
#logger.info(f"[{pair}] 最近最大涨幅: {max_rise:.2%}")
return rapid_rise_detected
except Exception as e:
logger.error(f"[{pair}] 剧烈拉升检测过程中发生错误: {str(e)}")
return False
def confirm_trade_entry(
self,
pair: str,
order_type: str,
amount: float,
rate: float,
time_in_force: str,
current_time: datetime,
entry_tag: str | None,
side: str,
**kwargs,
) -> bool:
"""
交易买入前的确认函数,用于最终决定是否执行交易
此处实现剧烈拉升检查逻辑
"""
# 默认允许交易
allow_trade = True
# 仅对多头交易进行检查
if side == 'long':
# 检查是否处于剧烈拉升的不稳固区域
is_unstable_region = self.detect_h1_rapid_rise(pair)
if is_unstable_region:
#logger.info(f"[{pair}] 由于检测到剧烈拉升,取消入场交易")
allow_trade = False
# 如果没有阻止因素,允许交易
return allow_trade
def custom_stoploss(self, pair: str, trade: 'Trade', current_time, current_rate: float,
current_profit: float, **kwargs) -> float:
# 动态止损基于ATR
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1]
atr = last_candle['atr']
# 获取当前市场状态
current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else 'unknown'
# 更激进的渐进式止损策略
if current_profit > 0.05: # 利润超过5%时
return -3.0 * atr / current_rate # 更大幅扩大止损范围,让利润奔跑
elif current_profit > 0.03: # 利润超过3%时
return -2.5 * atr / current_rate # 更中等扩大止损范围
elif current_profit > 0.01: # 利润超过1%时
return -2.0 * atr / current_rate # 更轻微扩大止损范围
# 在强劲牛市中,即使小亏损也可以容忍更大回调
if current_state == 'strong_bull' and current_profit > -0.01:
return -1.5 * atr / current_rate
# 动态调整止损范围
if current_profit > 0.05: # 利润超过5%时
return -3.0 * atr / current_rate # 更大幅扩大止损范围,让利润奔跑
elif current_profit > 0.03: # 利润超过3%时
return -2.5 * atr / current_rate # 更中等扩大止损范围
elif current_profit > 0.01: # 利润超过1%时
return -2.0 * atr / current_rate # 更轻微扩大止损范围
# 在强劲牛市中,即使小亏损也可以容忍更大回调
if current_state == 'strong_bull' and current_profit > -0.01:
return -1.8 * atr / current_rate
if atr > 0:
return -1.2 * atr / current_rate # 基础1.2倍ATR止损
return self.stoploss
def adjust_trade_position(self, trade: 'Trade', current_time, current_rate: float,
current_profit: float, min_stake: float, max_stake: float, **kwargs) -> float:
"""
简化版:加仓(原有)+ 减仓1个阈值+公式计算,对齐加仓逻辑)
- 减仓盈利≥基础阈值触发用公式算阶梯金额每个timeframe+最大次数双重限制
"""
pair = trade.pair
# -------------------------- 简化减仓逻辑1个阈值+公式计算) --------------------------
if current_profit > 0:
# 1. 基础限制:未达最大减仓次数 + 盈利≥基础阈值(核心触发条件,对齐加仓的跌幅阈值)
reduce_count = len(trade.select_filled_orders(trade.exit_side)) # 已成功减仓次数初始0
if reduce_count >= self.max_reduce_adjustments.value:
logger.debug(f"[{pair}] 已达最大减仓次数({self.max_reduce_adjustments.value}次),停止减仓")
return 0.0
if current_profit < self.reduce_profit_base.value:
logger.debug(f"[{pair}] 盈利{current_profit:.2%}<减仓基础阈值{self.reduce_profit_base.value:.2%},不加仓")
return 0.0
# 2. 周期限制每个timeframe仅1次保留之前的简化逻辑
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
current_kline_time = dataframe.iloc[-1]['date'].strftime('%Y-%m-%d %H:%M:%S')
last_reduce_kline = trade.get_custom_data("last_reduce_kline")
if last_reduce_kline == current_kline_time:
logger.debug(f"[{pair}] 当前{self.timeframe}周期已减仓,本次拒绝")
return 0.0
# 3. 公式计算减仓金额(完全对齐加仓公式逻辑,阶梯递增)
# 加仓公式:(step_coefficient × 初始金额 / stake_divisor) ^ 加仓次数
# 减仓公式:(reduce_coefficient × 初始开仓金额) ^ (减仓次数 + 1) → 次数越多,金额越大
initial_stake = float(trade.orders[0].cost) # 初始开仓金额(与加仓用同一基准)
reduce_amount = (float(self.reduce_coefficient.value) * initial_stake) ** (reduce_count + 1)
# 4. 安全校验(避免减仓超当前持仓/低于最小下单量,与加仓逻辑一致)
current_stake = float(trade.stake_amount) # 当前剩余持仓金额(减仓后会更新)
reduce_amount = min(reduce_amount, current_stake * 0.6) # 额外限制单次减仓不超当前持仓60%(防极端)
reduce_amount = -reduce_amount # 负号表示减仓Freqtrade规则
reduce_amount = max(-current_stake, min(reduce_amount, -float(min_stake))) # 安全边界
# 5. 触发减仓,记录周期
logger.info(f"[{pair}] 触发减仓: 盈利{current_profit:.2%}{self.reduce_profit_base.value:.2%},第{reduce_count+1}次减仓,金额{abs(reduce_amount):.2f}")
trade.set_custom_data("last_reduce_kline", current_kline_time)
return reduce_amount
# -------------------------- 原有加仓逻辑(保持不变,确保对齐) --------------------------
entry_count = len(trade.orders)
if entry_count > self.max_entry_adjustments.value:
return 0.0
initial_price = trade.open_rate
if initial_price == 0:
return 0.0
if current_profit > - self.add_position_callback.value / 100 :
return 0.0
price_diff_pct = (current_rate - initial_price) / initial_price
if (price_diff_pct/(entry_count)) <= - self.add_position_callback.value / 100 :
initial_stake = trade.orders[0].cost
additional_stake = (self.adjust_multiplier.value * initial_stake) ** (entry_count)
additional_stake = max(min_stake, min(additional_stake, max_stake - trade.stake_amount))
current_time_str = current_time.strftime('%Y-%m-%d %H:%M:%S')
logger.info(f"[{pair}] 触发加仓: 时间={current_time_str}, 第{entry_count}次开仓,初始金额{initial_stake:.2f},金额{additional_stake:.2f},跌幅{price_diff_pct:.2%}")
return additional_stake
return 0.0