zhangkun9038@dingtalk.com 5103976971 update ignore
2025-11-27 08:02:32 +08:00

757 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import warnings
warnings.filterwarnings("ignore", category=UserWarning, module="pandas_ta")
import logging
from freqtrade.strategy import IStrategy, IntParameter, DecimalParameter
from pandas import DataFrame
import pandas_ta as ta
from freqtrade.persistence import Trade
import numpy as np
import datetime
import pandas as pd
import math
logger = logging.getLogger(__name__)
class FreqaiPrimer(IStrategy):
# 策略参数 - 使用custom_roi替代minimal_roi字典
loglevel = "warning"
# 启用自定义ROI回调函数
use_custom_roi = True
# 写死在策略里,永远不参与 hyperopt
trailing_stop = True
trailing_stop_positive = 0.012 # 1.2% 固定回调(震荡市最稳)
trailing_stop_positive_offset = 0.02 # 1.8% 盈利后才激活(防止过早启动)
trailing_only_offset_is_reached = True # 必须等盈利超过 offset 才启动 trailing
# 用于跟踪市场状态的数据框缓存
_dataframe_cache = None
def __init__(self, config=None):
"""初始化策略参数调用父类初始化方法并接受config参数"""
super().__init__(config) # 调用父类的初始化方法并传递config
assert self.h1_max_candles.value <= 50, f"h1_max_candles={self.h1_max_candles.value} 超出安全范围!"
@property
def protections(self):
"""
保护机制配置
基于最新Freqtrade规范保护机制应定义在策略文件中而非配置文件
"""
return [
{
"method": "StoplossGuard",
"lookback_period_candles": 60, # 3小时回看期60根3分钟K线
"trade_limit": 2, # 最多2笔止损交易
"stop_duration_candles": 60, # 暂停180分钟60根3分钟K线
"only_per_pair": False # 仅针对单个币对
},
{
"method": "CooldownPeriod",
"stop_duration_candles": 18 # 6分钟冷却期2根3分钟K线
},
{
"method": "MaxDrawdown",
"lookback_period_candles": 48, # 2.4小时回看期
"trade_limit": 4, # 4笔交易限制
"stop_duration_candles": 24, # 72分钟暂停24根3分钟K线
"max_allowed_drawdown": 0.20 # 20%最大回撤容忍度
}
]
@property
def trailing_stop_positive(self):
"""根据市场状态动态调整跟踪止盈参数"""
# 获取当前市场状态
if self._dataframe_cache is not None and len(self._dataframe_cache) > 0:
current_state = self._dataframe_cache['market_state'].iloc[-1]
if current_state == 'strong_bull':
return 0.007 # 强劲牛市中降低跟踪止盈,让利润奔跑
elif current_state == 'weak_bull':
return 0.005 # 弱势牛市中保持较低的跟踪止盈
return self._trailing_stop_positive_default # 返回默认值
@trailing_stop_positive.setter
def trailing_stop_positive(self, value):
"""设置trailing_stop_positive的默认值"""
self._trailing_stop_positive_default = value
timeframe = "3m" # 主时间框架为 3 分钟
can_short = False # 禁用做空
# [propertiesGrp_List]--------------------------------------------------------------------------------------------------------------------------------------
# [propertiesGrp step="1" name="第一轮优化" epochs="160" space="buy " description="入场基础条件优化,入场确认条件优化"]
bb_std = DecimalParameter(2.0, 5.0, decimals=1, default=2.7, optimize=True, load=True, space='buy') # 安全2.0-5.0
rsi_length = IntParameter(10, 30, default=14, optimize=True, load=True, space='buy') # 安全10-30
bb_lower_deviation = DecimalParameter(0.92, 1.15, decimals=3, default=0.926, optimize=True, load=True, space='buy') # 安全0.92-1.15
stochrsi_bull_threshold = IntParameter(20, 50, default=21, optimize=True, load=True, space='buy') # 安全20-50
volume_multiplier = DecimalParameter(1.5, 6.0, decimals=1, default=4.3, optimize=True, load=True, space='buy') # 安全1.5-6.0
min_condition_count = IntParameter(1, 2, default=2, optimize=True, load=True, space='buy') # 最多只允许2个条件
bb_length = IntParameter(20, 60, default=36, optimize=True, load=True, space='buy') # 安全20-60
# [/propertiesGrp]
# [propertiesGrp step="2" name="第二轮优化 - 剧烈拉升检测" epochs="160" space="buy" description="防追高核心参数,绝对不能放宽!"]
rsi_oversold = IntParameter(20, 50, default=20, optimize=True, load=True, space='buy') # 安全20-50
rsi_bull_threshold = IntParameter(40, 68, default=68, optimize=True, load=True, space='buy') # 安全40-68
stochrsi_neutral_threshold = IntParameter(15, 40, default=22, optimize=True, load=True, space='buy') # 安全15-40
bb_width_threshold = DecimalParameter(0.003, 0.030, decimals=3, default=0.005, optimize=True, load=True, space='buy') # 安全0.003-0.030
h1_max_candles = IntParameter(16, 50, default=40, optimize=True, load=True, space='buy') # 黄金区间绝不能超过50
h1_rapid_rise_threshold = DecimalParameter(0.08, 0.22, decimals=3, default=0.08, optimize=True, load=True, space='buy') # 0.08-0.22 实盘最稳
h1_max_consecutive_candles = IntParameter(1, 2, default=2, optimize=True, load=True, space='buy') # 固定为1最稳2也行
# [/propertiesGrp]
# [propertiesGrp step="3" name="第三轮优化 - 加仓策略" epochs="160" space="buy" description="加仓精准度与金额管理,严防爆仓"]
add_position_callback = DecimalParameter(0.025, 0.070, decimals=3, default=0.052, optimize=True, load=True, space='buy') # 2.5%-7.0% 回调才加
add_rsi_oversold_threshold = IntParameter(15, 40, default=20, optimize=True, load=True, space='buy') # 不能太低
add_stochrsi_oversold = IntParameter(10, 35, default=20, optimize=True, load=True, space='buy')
add_bb_lower_proximity = DecimalParameter(0.85, 1.20, decimals=3, default=1.102, optimize=True, load=True, space='buy') # 不能离下轨太远
add_position_decrease_ratio= DecimalParameter(0.30, 0.80, decimals=2, default=0.36, optimize=True, load=True, space='buy') # 递减比例别太激进
max_entry_adjustments = IntParameter(2, 7, default=6, optimize=True, load=True, space='buy') # 最多7次加仓防爆仓
adjust_multiplier = DecimalParameter(0.6, 1.6, decimals=2, default=0.66, optimize=True, load=True, space='buy') # 别让加仓金额指数爆炸
# [/propertiesGrp]
# [propertiesGrp step="4" name="第四轮优化 - 出场与分级止盈" epochs="200" space="sell" description="出场条件与分级止盈,减仓与风险管理"]
exit_bb_upper_deviation = DecimalParameter(0.90, 1.15, decimals=3, default=0.923, optimize=True, load=True, space='sell')
exit_volume_multiplier = DecimalParameter(2.0, 7.0, decimals=1, default=5.1, optimize=True, load=True, space='sell')
exit_rsi_threshold = IntParameter(55, 72, default=68, optimize=True, load=True, space='sell') # 牛市也能出得了场
exit_profit_tier1 = DecimalParameter(0.03, 0.12, decimals=3, default=0.032, optimize=True, load=True, space='sell')
exit_reduce_tier1 = DecimalParameter(0.20, 0.70, decimals=2, default=0.69, optimize=True, load=True, space='sell')
exit_profit_tier2 = DecimalParameter(0.08, 0.20, decimals=3, default=0.167, optimize=True, load=True, space='sell')
exit_reduce_tier2 = DecimalParameter(0.15, 0.60, decimals=2, default=0.33, optimize=True, load=True, space='sell')
reduce_profit_base = DecimalParameter(0.02, 0.12, decimals=3, default=0.063, optimize=True, load=True, space='sell')
reduce_coefficient = DecimalParameter(0.15, 0.55, decimals=3, default=0.263, optimize=True, load=True, space='sell')
max_reduce_adjustments = IntParameter(1, 4, default=4, optimize=True, load=True, space='sell') # 最多4次减仓就够了
# [/propertiesGrp]
# [propertiesGrp step="5" name="第五轮优化" epochs="80" space="roi stoploss" description="最终ROI与止损微调"]
# 这里可以放你后续要优化的ROI表、动态止损系数等
# [/propertiesGrp]
# [/propertiesGrp_List]-----------------------------------------------------------------------------------------------------------------------------
def informative_pairs(self):
pairs = self.dp.current_whitelist()
return [(pair, '15m') for pair in pairs] + [(pair, '1h') for pair in pairs]
def _validate_dataframe_columns(self, dataframe: DataFrame, required_columns: list, metadata: dict):
"""
验证数据框中是否包含所有需要的列。
如果缺少列,则记录警告日志。
"""
missing_columns = [col for col in required_columns if col not in dataframe.columns]
if missing_columns:
logger.warning(f"[{metadata['pair']}] 数据框中缺少以下列: {missing_columns}")
def custom_entry_price(self, pair: str, current_time: pd.Timestamp, proposed_rate: float,
entry_tag: str | None, side: str, **kwargs) -> float:
"""
自定义入场价格给入场价格打98折降低2%
添加零值保护,防止除零错误
"""
# 零值保护如果proposed_rate为0或异常小值直接返回原值
if proposed_rate <= 0 or proposed_rate < 1e-8:
logger.warning(f"[{pair}] proposed_rate异常: {proposed_rate},返回原值")
return proposed_rate if proposed_rate > 0 else 0.0
# 入场价格折扣98折降低2%
discounted_rate = proposed_rate * 0.995
return discounted_rate
def custom_stake_amount(self, pair: str, current_time: pd.Timestamp,
current_rate: float,
proposed_stake: float,
min_stake: float,
max_stake: float,
**kwargs) -> float:
# 获取初始资金回测中固定为dry_run_wallet的值
initial_balance = self.config.get('dry_run_wallet', 10000)
# 始终以初始资金的3.75%计算
desired_stake = initial_balance * 0.0375
desired_stake = math.floor(desired_stake) # 取整,去掉小数点后的数字
return max(min(desired_stake, max_stake), min_stake)
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 计算 3m 周期的指标
bb_length_value = self.bb_length.value
bb_std_value = self.bb_std.value
rsi_length_value = self.rsi_length.value
bb_3m = ta.bbands(dataframe['close'], length=bb_length_value, std=bb_std_value)
dataframe['bb_lower_3m'] = bb_3m[f'BBL_{bb_length_value}_{bb_std_value}']
dataframe['bb_upper_3m'] = bb_3m[f'BBU_{bb_length_value}_{bb_std_value}']
dataframe['rsi_3m'] = ta.rsi(dataframe['close'], length=rsi_length_value)
# 新增 StochRSI 指标
stochrsi_3m = ta.stochrsi(dataframe['close'], length=rsi_length_value, rsi_length=rsi_length_value)
dataframe['stochrsi_k_3m'] = stochrsi_3m[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3']
dataframe['stochrsi_d_3m'] = stochrsi_3m[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3']
# 新增 MACD 指标
macd_3m = ta.macd(dataframe['close'], fast=12, slow=26, signal=9)
dataframe['macd_3m'] = macd_3m['MACD_12_26_9']
dataframe['macd_signal_3m'] = macd_3m['MACDs_12_26_9']
dataframe['macd_hist_3m'] = macd_3m['MACDh_12_26_9']
# 计算3m时间框架的EMA50和EMA200
dataframe['ema_50_3m'] = ta.ema(dataframe['close'], length=50)
dataframe['ema_200_3m'] = ta.ema(dataframe['close'], length=200)
# 成交量过滤
dataframe['volume_ma'] = dataframe['volume'].rolling(20).mean()
# 计算 ATR 用于动态止损和退出
dataframe['atr'] = ta.atr(dataframe['high'], dataframe['low'], dataframe['close'], length=14)
# 获取 15m 数据
df_15m = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='15m')
df_15m['rsi_15m'] = ta.rsi(df_15m['close'], length=rsi_length_value)
# 计算15m时间框架的EMA50和EMA200
df_15m['ema_50_15m'] = ta.ema(df_15m['close'], length=50)
df_15m['ema_200_15m'] = ta.ema(df_15m['close'], length=200)
# 新增 StochRSI 指标
stochrsi_15m = ta.stochrsi(df_15m['close'], length=rsi_length_value, rsi_length=rsi_length_value)
df_15m['stochrsi_k_15m'] = stochrsi_15m[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3']
df_15m['stochrsi_d_15m'] = stochrsi_15m[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3']
# 新增 MACD 指标
macd_15m = ta.macd(df_15m['close'], fast=12, slow=26, signal=9)
df_15m['macd_15m'] = macd_15m['MACD_12_26_9']
df_15m['macd_signal_15m'] = macd_15m['MACDs_12_26_9']
df_15m['macd_hist_15m'] = macd_15m['MACDh_12_26_9']
# 将 15m 数据重新索引到主时间框架 (3m)
df_15m = df_15m.set_index('date').reindex(dataframe['date']).reset_index()
df_15m = df_15m.rename(columns={'index': 'date'})
df_15m = df_15m[['date', 'rsi_15m', 'ema_50_15m', 'ema_200_15m']].ffill()
# 合并 15m 数据
dataframe = dataframe.merge(df_15m, how='left', on='date')
# 获取 1h 数据
df_1h = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1h')
# 计算 1h 布林带
bb_1h = ta.bbands(df_1h['close'], length=bb_length_value, std=bb_std_value)
df_1h['bb_lower_1h'] = bb_1h[f'BBL_{bb_length_value}_{bb_std_value}']
df_1h['bb_upper_1h'] = bb_1h[f'BBU_{bb_length_value}_{bb_std_value}']
# 计算 1h RSI 和 EMA
df_1h['rsi_1h'] = ta.rsi(df_1h['close'], length=rsi_length_value)
df_1h['ema_50_1h'] = ta.ema(df_1h['close'], length=50) # 1h 50周期EMA
df_1h['ema_200_1h'] = ta.ema(df_1h['close'], length=200) # 1h 200周期EMA
df_1h['trend_1h'] = df_1h['close'] > df_1h['ema_50_1h'] # 1h上涨趋势
# 新增 StochRSI 指标
stochrsi_1h = ta.stochrsi(df_1h['close'], length=rsi_length_value, rsi_length=rsi_length_value)
df_1h['stochrsi_k_1h'] = stochrsi_1h[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3']
df_1h['stochrsi_d_1h'] = stochrsi_1h[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3']
# 新增 MACD 指标
macd_1h = ta.macd(df_1h['close'], fast=12, slow=26, signal=9)
df_1h['macd_1h'] = macd_1h['MACD_12_26_9']
df_1h['macd_signal_1h'] = macd_1h['MACDs_12_26_9']
df_1h['macd_hist_1h'] = macd_1h['MACDh_12_26_9']
# 验证 MACD 列是否正确生成
#logger.info(f"[{metadata['pair']}] 1小时 MACD 列: {list(macd_1h.columns)}")
# 确保 StochRSI 指标已正确计算
# 将 1h 数据重新索引到主时间框架 (3m),并填充缺失值
df_1h = df_1h.set_index('date').reindex(dataframe['date']).ffill().bfill().reset_index()
df_1h = df_1h.rename(columns={'index': 'date'})
# Include macd_1h and macd_signal_1h in the column selection
df_1h = df_1h[['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h', 'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h', 'macd_1h', 'macd_signal_1h']].ffill()
# Validate that all required columns are present
required_columns = ['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h',
'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h',
'macd_1h', 'macd_signal_1h']
missing_columns = [col for col in required_columns if col not in df_1h.columns]
if missing_columns:
logger.error(f"[{metadata['pair']}] 缺少以下列: {missing_columns}")
raise KeyError(f"缺少以下列: {missing_columns}")
# 确保所有需要的列都被合并
required_columns = ['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h',
'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h',
'macd_1h', 'macd_signal_1h']
# 验证所需列是否存在
missing_columns = [col for col in required_columns if col not in df_1h.columns]
if missing_columns:
logger.error(f"[{metadata['pair']}] 缺少以下列: {missing_columns}")
raise KeyError(f"缺少以下列: {missing_columns}")
df_1h = df_1h[required_columns] # 确保包含 macd_1h 和 macd_signal_1h
# 合并 1h 数据
dataframe = dataframe.merge(df_1h, how='left', on='date').ffill()
# 验证合并后的列
#logger.info(f"[{metadata['pair']}] 合并后的数据框列名: {list(dataframe.columns)}")
# K线形态看涨吞没
dataframe['bullish_engulfing'] = (
(dataframe['close'].shift(1) < dataframe['open'].shift(1)) &
(dataframe['close'] > dataframe['open']) &
(dataframe['close'] > dataframe['open'].shift(1)) &
(dataframe['open'] < dataframe['close'].shift(1))
)
# 计算各时间框架的趋势状态(牛/熊)
# 3m时间框架ema50下穿ema200为熊上穿为牛
dataframe['trend_3m'] = np.where(dataframe['ema_50_3m'] > dataframe['ema_200_3m'], 1, 0)
# 15m时间框架ema50下穿ema200为熊上穿为牛
dataframe['trend_15m'] = np.where(dataframe['ema_50_15m'] > dataframe['ema_200_15m'], 1, 0)
# 1h时间框架ema50下穿ema200为熊上穿为牛
dataframe['trend_1h_ema'] = np.where(dataframe['ema_50_1h'] > dataframe['ema_200_1h'], 1, 0)
# 计算熊牛得分0-100
# 权重3m熊牛权重1015m熊牛权重351h熊牛权重65
# 计算加权得分
dataframe['market_score'] = (
dataframe['trend_3m'] * 10 +
dataframe['trend_15m'] * 35 +
dataframe['trend_1h_ema'] * 65
)
# 确保得分在0-100范围内
dataframe['market_score'] = dataframe['market_score'].clip(lower=0, upper=100)
# 根据得分分类市场状态
dataframe['market_state'] = 'neutral'
dataframe.loc[dataframe['market_score'] > 70, 'market_state'] = 'strong_bull'
dataframe.loc[(dataframe['market_score'] > 50) & (dataframe['market_score'] <= 70), 'market_state'] = 'weak_bull'
dataframe.loc[(dataframe['market_score'] >= 30) & (dataframe['market_score'] <= 50), 'market_state'] = 'neutral'
dataframe.loc[(dataframe['market_score'] > 10) & (dataframe['market_score'] < 30), 'market_state'] = 'weak_bear'
dataframe.loc[dataframe['market_score'] <= 10, 'market_state'] = 'strong_bear'
# 创建一个使用前一行市场状态的列避免在populate_entry_trend中使用iloc[-1]
dataframe['prev_market_state'] = dataframe['market_state'].shift(1)
# 为第一行设置默认值
dataframe['prev_market_state'] = dataframe['prev_market_state'].fillna('neutral')
# 记录当前的市场状态
# if len(dataframe) > 0:
# current_score = dataframe['market_score'].iloc[-1]
# current_state = dataframe['market_state'].iloc[-1]
#logger.info(f"[{metadata['pair']}] 熊牛得分: {current_score:.1f}, 市场状态: {current_state}")
#logger.info(f"[{metadata['pair']}] 各时间框架趋势: 3m={'牛' if dataframe['trend_3m'].iloc[-1] == 1 else '熊'}, \
# 15m={'牛' if dataframe['trend_15m'].iloc[-1] == 1 else '熊'}, \
# 1h={'牛' if dataframe['trend_1h_ema'].iloc[-1] == 1 else '熊'}")
# 调试:打印指标值(最后 5 行),验证时间对齐
#print(f"Pair: {metadata['pair']}, Last 5 rows after reindexing:")
#print(dataframe[['date', 'close', 'bb_lower_3m', 'rsi_3m', 'rsi_15m', 'rsi_1h', 'trend_1h',
# 'trend_3m', 'trend_15m', 'trend_1h_ema', 'market_score', 'market_state',
# 'bullish_engulfing', 'volume', 'volume_ma']].tail(5))
# 打印最终数据框的列名以验证
#logger.info(f"[{metadata['pair']}] 最终数据框列名: {list(dataframe.columns)}")
return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 出场信号基于趋势和量价关系
# 确保market_state列存在
if 'market_state' not in dataframe.columns:
dataframe['market_state'] = 'neutral'
# ======================== 巨不下器的出场逻辑 ========================
# 极其区慢,给利润奔跑的机会
# 条件1: RSI极度超买大于75才考虑出场
rsi_extreme_overbought = dataframe['rsi_1h'] > 75
# 条件2: 趋势大幅反转(超卖)
trend_extreme_reversal = dataframe['rsi_1h'] < 20
# 条件3: MACD死亡交叉严格的反转信号
macd_death_cross = (
(dataframe['macd_1h'] < dataframe['macd_signal_1h']) &
(dataframe['macd_1h'].shift(1) >= dataframe['macd_signal_1h'].shift(1))
)
# 综合成出条件(需要需要偏保守,不然遍地开花)
final_condition = (
rsi_extreme_overbought |
trend_extreme_reversal |
macd_death_cross
)
# 设置出场信号
dataframe.loc[final_condition, 'exit_long'] = 1
return dataframe
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 确保prev_market_state列存在
if 'prev_market_state' not in dataframe.columns:
dataframe['prev_market_state'] = 'neutral'
# ======================== 简化版入场逻辑 - 只需1个条件满足 ========================
# 条件1: RSI超卖核心条件
rsi_oversold = dataframe['rsi_1h'] < self.rsi_oversold.value
# 条件2: 价格接近BB下轨
price_near_bb_lower = dataframe['close'] <= dataframe['bb_lower_1h'] * self.bb_lower_deviation.value
# 条件3: MACD看涨
macd_bullish = dataframe['macd_1h'] > dataframe['macd_signal_1h']
# 条件4: 避免极高位
not_too_high = dataframe['close'] < dataframe['bb_upper_1h']
# 只需满足任意1个条件就入场极其宽松
final_condition = (
rsi_oversold |
price_near_bb_lower |
macd_bullish
) & not_too_high # 但必须不在极高位
# 设置入场信号
dataframe.loc[final_condition, 'enter_long'] = 1
# 日志
signal_count = dataframe['enter_long'].sum()
if signal_count > 0:
logger.info(f"[{metadata['pair']}] 🎯 发现 {signal_count} 个入场信号")
return dataframe
def detect_h1_rapid_rise(self, pair: str) -> bool:
"""
检测1小时K线图上的剧烈拉升情况轻量级版本用于confirm_trade_entry
参数:
- pair: 交易对
返回:
- bool: 是否处于不稳固区域
"""
try:
# 获取1小时K线数据
df_1h = self.dp.get_pair_dataframe(pair=pair, timeframe='1h')
# 获取当前优化参数值
max_candles = self.h1_max_candles.value
rapid_rise_threshold = self.h1_rapid_rise_threshold.value
max_consecutive_candles = self.h1_max_consecutive_candles.value
# 确保有足够的K线数据
if len(df_1h) < max_candles:
logger.warning(f"[{pair}] 1h K线数据不足 {max_candles} 根,当前只有 {len(df_1h)} 根,无法完整检测剧烈拉升")
return False
# 获取最近的K线
recent_data = df_1h.iloc[-max_candles:].copy()
# 检查连续最多几根K线内的最大涨幅
rapid_rise_detected = False
max_rise = 0
for i in range(len(recent_data) - max_consecutive_candles + 1):
window_data = recent_data.iloc[i:i + max_consecutive_candles]
window_low = window_data['low'].min()
window_high = window_data['high'].max()
# 计算区间内的最大涨幅
if window_low > 0:
rise_percentage = (window_high - window_low) / window_low
if rise_percentage > max_rise:
max_rise = rise_percentage
# 检查是否超过阈值
if rise_percentage >= rapid_rise_threshold:
rapid_rise_detected = True
#logger.info(f"[{pair}] 检测到剧烈拉升: 从 {window_low:.2f} 到 {window_high:.2f} ({rise_percentage:.2%}) 在 {max_consecutive_candles} 根K线内")
break
current_price = recent_data['close'].iloc[-1]
#logger.info(f"[{pair}] 剧烈拉升检测结果: {'不稳固' if rapid_rise_detected else '稳固'}")
#logger.info(f"[{pair}] 最近最大涨幅: {max_rise:.2%}")
return rapid_rise_detected
except Exception as e:
logger.error(f"[{pair}] 剧烈拉升检测过程中发生错误: {str(e)}")
return False
def confirm_trade_entry(
self,
pair: str,
order_type: str,
amount: float,
rate: float,
time_in_force: str,
current_time: datetime,
entry_tag: str | None,
side: str,
**kwargs,
) -> bool:
"""
交易买入前的确认函数
⚠️ 禁用剧烈拉升检测以增加交易频率
"""
# 允许所有符合条件的交易
return True
def _check_add_position_conditions(self, pair: str, current_rate: float, current_profit: float,
entry_count: int, initial_price: float, dataframe) -> dict:
"""
检查加仓条件的多维度评分系统
返回: {'should_add': bool, 'score': float, 'reasons': list}
"""
try:
if dataframe is None or len(dataframe) < 30:
return {'should_add': False, 'score': 0, 'reasons': ['数据不足']}
last_candle = dataframe.iloc[-1]
reasons = []
score = 0.0
max_score = 6.0
# 条件1跌幅确认基础条件必须满足
price_diff_pct = (current_rate - initial_price) / initial_price
callback_threshold = -self.add_position_callback.value
if price_diff_pct <= callback_threshold:
score += 1.0
reasons.append(f"✓ 跌幅{price_diff_pct:.2%}{callback_threshold:.2%}")
else:
return {'should_add': False, 'score': 0, 'reasons': [f'✗ 跌幅不足: {price_diff_pct:.2%} > {callback_threshold:.2%}']}
# 条件2RSI超卖确认
rsi_1h = last_candle.get('rsi_1h', 50)
if rsi_1h < self.add_rsi_oversold_threshold.value:
score += 1.0
reasons.append(f"✓ RSI超卖: {rsi_1h:.1f} < {self.add_rsi_oversold_threshold.value}")
else:
reasons.append(f"✗ RSI不超卖: {rsi_1h:.1f}{self.add_rsi_oversold_threshold.value}")
# 条件3StochRSI双线低位确认
stochrsi_k = last_candle.get('stochrsi_k_1h', 50)
stochrsi_d = last_candle.get('stochrsi_d_1h', 50)
if stochrsi_k < self.add_stochrsi_oversold.value and stochrsi_d < self.add_stochrsi_oversold.value:
score += 1.0
reasons.append(f"✓ StochRSI双超卖: K={stochrsi_k:.1f}, D={stochrsi_d:.1f}")
else:
reasons.append(f"✗ StochRSI未双超卖: K={stochrsi_k:.1f}, D={stochrsi_d:.1f}")
# 条件4MACD上升确认底部反转信号
macd_1h = last_candle.get('macd_1h', 0)
macd_signal_1h = last_candle.get('macd_signal_1h', 0)
macd_hist = macd_1h - macd_signal_1h
if len(dataframe) >= 2:
prev_macd_hist = dataframe.iloc[-2].get('macd_1h', 0) - dataframe.iloc[-2].get('macd_signal_1h', 0)
if macd_hist > prev_macd_hist: # 简化条件只检查MACD柱值上升
score += 1.0
reasons.append(f"✓ MACD底部上升: 柱值={macd_hist:.6f}")
else:
reasons.append(f"✗ MACD未确认: 柱值={macd_hist:.6f}")
# 条件5布林带下轨支撑确认
bb_lower = last_candle.get('bb_lower_1h', current_rate)
bb_proximity_ratio = current_rate / bb_lower if bb_lower > 0 else 1.0
if bb_proximity_ratio <= self.add_bb_lower_proximity.value:
score += 1.0
reasons.append(f"✓ 接近BB下轨: 比例={bb_proximity_ratio:.4f}")
else:
reasons.append(f"✗ 离BB下轨太远: 比例={bb_proximity_ratio:.4f}")
# 条件6成交量放大确认简化条件
volume = last_candle.get('volume', 0)
volume_ma = last_candle.get('volume_ma', 1)
if volume > volume_ma * 1.2: # 固定1.2倍成交量确认
score += 1.0
reasons.append(f"✓ 成交量放大: {volume:.0f} > {volume_ma * 1.2:.0f}")
else:
reasons.append(f"✗ 成交量不足: {volume:.0f}{volume_ma * 1.2:.0f}")
# 条件7市场状态过滤强熊市禁止加仓
market_state = last_candle.get('market_state', 'neutral')
if market_state != 'strong_bear':
score += 0.5
reasons.append(f"✓ 市场状态良好: {market_state}")
else:
reasons.append(f"✗ 强熊市,避免加仓: {market_state}")
return {'should_add': False, 'score': score/max_score, 'reasons': reasons}
# 综合判断(极致放宽条件)
condition_met = sum(1 for r in reasons if r.startswith('')) >= 2 # 从≥3降低到≥2极致放宽
score_ratio = score / max_score
should_add = condition_met and score_ratio >= 0.35 # 从0.5降低到0.35(极致放宽)
return {
'should_add': should_add,
'score': score_ratio,
'reasons': reasons,
'condition_met': condition_met
}
except Exception as e:
logger.error(f"[{pair}] 加仓条件检查出错: {str(e)}")
return {'should_add': False, 'score': 0, 'reasons': [f'错误: {str(e)}']}
def _calculate_add_position_amount(self, trade: 'Trade', entry_count: int, min_stake: float, max_stake: float) -> float:
"""
智能计算加仓金额(支持递减策略)
- 早期加仓金额较大,后期逐步减小
- 防止后期加仓金额过大导致爆仓
"""
try:
initial_stake = float(trade.orders[0].cost)
# 基础公式:(adjust_multiplier × initial_stake) ^ entry_count
base_amount = (self.adjust_multiplier.value * initial_stake) ** entry_count
# 应用递减系数(后续加仓金额逐步缩小)
# 第1次加仓: 100% × 基础金额
# 第2次加仓: 75% × 基础金额
# 第3次加仓: 56% × 基础金额
decrease_ratio = self.add_position_decrease_ratio.value ** (entry_count - 1)
adjusted_amount = base_amount * decrease_ratio
# 安全校验
current_stake = float(trade.stake_amount)
remaining_capacity = max_stake - current_stake
# 加仓金额不能超过剩余容量的80%(留余量)
adjusted_amount = min(adjusted_amount, remaining_capacity * 0.8)
adjusted_amount = max(min_stake, min(adjusted_amount, max_stake - current_stake))
return adjusted_amount
except Exception as e:
logger.error(f"[{trade.pair}] 加仓金额计算出错: {str(e)}")
return 0.0
def adjust_trade_position(self, trade: 'Trade', current_time, current_rate: float,
current_profit: float, min_stake: float, max_stake: float, **kwargs) -> float:
"""
增强版持仓调整逻辑:加仓精准度 + 递减策略 + 减仓优化
"""
pair = trade.pair
# ========================== 分级止盈减仓逻辑(增强版) ==========================
if current_profit > 0:
reduce_count = len(trade.select_filled_orders(trade.exit_side))
if reduce_count >= self.max_reduce_adjustments.value:
return 0.0
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
current_kline_time = dataframe.iloc[-1]['date'].strftime('%Y-%m-%d %H:%M:%S')
last_reduce_kline = trade.get_custom_data("last_reduce_kline")
if last_reduce_kline == current_kline_time:
return 0.0
initial_stake = float(trade.orders[0].cost)
current_stake = float(trade.stake_amount)
# 分级止盈逻辑3级
# 第1级达到exit_profit_tier1时减仓exit_reduce_tier1比例
if current_profit >= self.exit_profit_tier1.value:
if reduce_count < 1:
reduce_amount = current_stake * self.exit_reduce_tier1.value
reduce_amount = -min(reduce_amount, current_stake * 0.5) # 单次最多减仓50%
#logger.info(f"[{pair}] 分级止盈第1级: 盈利{current_profit:.2%}, "
# f"减仓比例{self.exit_reduce_tier1.value:.1%}, 金额{abs(reduce_amount):.2f}")
trade.set_custom_data("last_reduce_kline", current_kline_time)
return max(-current_stake, reduce_amount)
# 第2级达到exit_profit_tier2时减仓exit_reduce_tier2比例
if current_profit >= self.exit_profit_tier2.value:
if reduce_count < 2:
reduce_amount = current_stake * self.exit_reduce_tier2.value
reduce_amount = -min(reduce_amount, current_stake * 0.3) # 单次最多减仓30%
#logger.info(f"[{pair}] 分级止盈第2级: 盈利{current_profit:.2%}, "
# f"减仓比例{self.exit_reduce_tier2.value:.1%}, 金额{abs(reduce_amount):.2f}")
trade.set_custom_data("last_reduce_kline", current_kline_time)
return max(-current_stake, reduce_amount)
# 基础止盈(保持原有逻辑)
if current_profit >= self.reduce_profit_base.value:
reduce_amount = (float(self.reduce_coefficient.value) * initial_stake) ** (reduce_count + 1)
reduce_amount = min(reduce_amount, current_stake * 0.2) # 单次最多减仓20%
reduce_amount = -reduce_amount
reduce_amount = max(-current_stake, min(reduce_amount, -float(min_stake)))
#logger.info(f"[{pair}] 基础止盈: 盈利{current_profit:.2%}, 第{reduce_count+1}次, "
# f"金额{abs(reduce_amount):.2f}")
trade.set_custom_data("last_reduce_kline", current_kline_time)
return reduce_amount
return 0.0
# ========================== 增强版加仓逻辑 ==========================
entry_count = len(trade.orders)
if entry_count > self.max_entry_adjustments.value:
return 0.0
initial_price = trade.open_rate
if initial_price == 0:
return 0.0
# 获取数据框
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
if dataframe is None or len(dataframe) < 30:
return 0.0
# 检查加仓条件(多维度评分)
condition_check = self._check_add_position_conditions(pair, current_rate, current_profit, entry_count, initial_price, dataframe)
if not condition_check['should_add']:
return 0.0
# 周期限制每个timeframe仅加仓一次
current_kline_time = dataframe.iloc[-1]['date'].strftime('%Y-%m-%d %H:%M:%S')
last_add_kline = trade.get_custom_data("last_add_kline")
if last_add_kline == current_kline_time:
return 0.0
# 计算加仓金额
additional_stake = self._calculate_add_position_amount(trade, entry_count, min_stake, max_stake)
if additional_stake > 0:
#logger.info(f"[{pair}] 加仓触发: 第{entry_count+1}次, 金额{additional_stake:.2f}, 评分{condition_check['score']:.2f}")
trade.set_custom_data("last_add_kline", current_kline_time)
return additional_stake
return 0.0
def custom_stoploss(self, pair: str, trade: 'Trade', current_time, current_rate: float,
current_profit: float, **kwargs) -> float:
# 动态止损基于ATR
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1]
atr = last_candle['atr']
# 获取当前市场状态
current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else 'unknown'
# 渐进式止损策略(盈利越高,止损范围越大)
if current_profit > 0.05: # 利润超过5%时
return -3.0 * atr / current_rate # 大幅扩大止损范围,让利润奔跑
elif current_profit > 0.03: # 利润超过3%时
return -2.5 * atr / current_rate # 中等扩大止损范围
elif current_profit > 0.01: # 利润超过1%时
return -2.0 * atr / current_rate # 轻微扩大止损范围
# 在强劲牛市中,即使小亏损也可以容忍更大回调
if current_state == 'strong_bull' and current_profit > -0.01:
return -1.5 * atr / current_rate
# 基础止损
if atr > 0:
return -1.2 * atr / current_rate # 基础1.2倍ATR止损
return self.stoploss