myTestFreqAI/freqtrade/templates/freqaiprimer.py
2025-08-31 19:30:50 +08:00

719 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import warnings
warnings.filterwarnings("ignore", category=UserWarning, module="pandas_ta")
import logging
from freqtrade.strategy import IStrategy
from pandas import DataFrame
import pandas_ta as ta
from freqtrade.persistence import Trade
import numpy as np
import datetime
logger = logging.getLogger(__name__)
class FreqaiPrimer(IStrategy):
# 策略参数 - 调整以提高单笔盈利潜力
minimal_roi = {
"0": 0.05, # 5% ROI (10 分钟内)
"60": 0.03, # 3% ROI (1 小时)
"180": 0.01, # 1% ROI (3 小时)
"360": 0.005 # 0.5% ROI (6 小时)
}
stoploss = -0.15 # 固定止损 -15% (大幅放宽止损以承受更大波动)
trailing_stop = True
trailing_stop_positive_offset = 0.005 # 追踪止损偏移量 0.5% (更容易触发跟踪止盈)
# 用于跟踪市场状态的数据框缓存
_dataframe_cache = None
def __init__(self, config=None):
"""初始化策略参数调用父类初始化方法并接受config参数"""
super().__init__(config) # 调用父类的初始化方法并传递config
# 存储从配置文件加载的默认值
self._trailing_stop_positive_default = 0.004 # 降低默认值以更容易触发跟踪止盈
@property
def protections(self):
"""
保护机制配置
基于最新Freqtrade规范保护机制应定义在策略文件中而非配置文件
"""
return [
{
"method": "StoplossGuard",
"lookback_period_candles": 60, # 3小时回看期60根3分钟K线
"trade_limit": 2, # 最多2笔止损交易
"stop_duration_candles": 60, # 暂停180分钟60根3分钟K线
"only_per_pair": False # 仅针对单个币对
},
{
"method": "CooldownPeriod",
"stop_duration_candles": 2 # 6分钟冷却期2根3分钟K线
},
{
"method": "MaxDrawdown",
"lookback_period_candles": 48, # 2.4小时回看期
"trade_limit": 4, # 4笔交易限制
"stop_duration_candles": 24, # 72分钟暂停24根3分钟K线
"max_allowed_drawdown": 0.20 # 20%最大回撤容忍度
}
]
@property
def trailing_stop_positive(self):
"""根据市场状态动态调整跟踪止盈参数"""
# 获取当前市场状态
if self._dataframe_cache is not None and len(self._dataframe_cache) > 0:
current_state = self._dataframe_cache['market_state'].iloc[-1]
if current_state == 'strong_bull':
return 0.007 # 强劲牛市中降低跟踪止盈,让利润奔跑
elif current_state == 'weak_bull':
return 0.005 # 弱势牛市中保持较低的跟踪止盈
return self._trailing_stop_positive_default # 返回默认值
@trailing_stop_positive.setter
def trailing_stop_positive(self, value):
"""设置trailing_stop_positive的默认值"""
self._trailing_stop_positive_default = value
timeframe = "3m" # 主时间框架为 3 分钟
can_short = False # 禁用做空
# 自定义指标参数
bb_length = 20
bb_std = 2.0
rsi_length = 14
rsi_overbought = 58 # 超买阈值
rsi_oversold = 42 # 放宽超卖阈值,增加入场信号
# 剧烈拉升检测参数
H1_MAX_CANDLES = 200 # 检查最近200根1h K线
H1_RAPID_RISE_THRESHOLD = 0.05 # 5%的抬升阈值
H1_MAX_CONSECUTIVE_CANDLES = 3 # 最多连续3根K线内检查
def informative_pairs(self):
pairs = self.dp.current_whitelist()
return [(pair, '15m') for pair in pairs] + [(pair, '1h') for pair in pairs]
def _validate_dataframe_columns(self, dataframe: DataFrame, required_columns: list, metadata: dict):
"""
验证数据框中是否包含所有需要的列。
如果缺少列,则记录警告日志。
"""
missing_columns = [col for col in required_columns if col not in dataframe.columns]
if missing_columns:
logger.warning(f"[{metadata['pair']}] 数据框中缺少以下列: {missing_columns}")
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 计算 3m 周期的指标
bb_3m = ta.bbands(dataframe['close'], length=self.bb_length, std=self.bb_std)
dataframe['bb_lower_3m'] = bb_3m[f'BBL_{self.bb_length}_{self.bb_std}']
dataframe['bb_upper_3m'] = bb_3m[f'BBU_{self.bb_length}_{self.bb_std}']
dataframe['rsi_3m'] = ta.rsi(dataframe['close'], length=self.rsi_length)
# 新增 StochRSI 指标
stochrsi_3m = ta.stochrsi(dataframe['close'], length=14, rsi_length=14)
dataframe['stochrsi_k_3m'] = stochrsi_3m['STOCHRSIk_14_14_3_3']
dataframe['stochrsi_d_3m'] = stochrsi_3m['STOCHRSId_14_14_3_3']
# 新增 MACD 指标
macd_3m = ta.macd(dataframe['close'], fast=12, slow=26, signal=9)
dataframe['macd_3m'] = macd_3m['MACD_12_26_9']
dataframe['macd_signal_3m'] = macd_3m['MACDs_12_26_9']
dataframe['macd_hist_3m'] = macd_3m['MACDh_12_26_9']
# 计算3m时间框架的EMA50和EMA200
dataframe['ema_50_3m'] = ta.ema(dataframe['close'], length=50)
dataframe['ema_200_3m'] = ta.ema(dataframe['close'], length=200)
# 成交量过滤
dataframe['volume_ma'] = dataframe['volume'].rolling(20).mean()
# 计算 ATR 用于动态止损和退出
dataframe['atr'] = ta.atr(dataframe['high'], dataframe['low'], dataframe['close'], length=14)
# 获取 15m 数据
df_15m = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='15m')
df_15m['rsi_15m'] = ta.rsi(df_15m['close'], length=self.rsi_length)
# 计算15m时间框架的EMA50和EMA200
df_15m['ema_50_15m'] = ta.ema(df_15m['close'], length=50)
df_15m['ema_200_15m'] = ta.ema(df_15m['close'], length=200)
# 新增 StochRSI 指标
stochrsi_15m = ta.stochrsi(df_15m['close'], length=14, rsi_length=14)
df_15m['stochrsi_k_15m'] = stochrsi_15m['STOCHRSIk_14_14_3_3']
df_15m['stochrsi_d_15m'] = stochrsi_15m['STOCHRSId_14_14_3_3']
# 新增 MACD 指标
macd_15m = ta.macd(df_15m['close'], fast=12, slow=26, signal=9)
df_15m['macd_15m'] = macd_15m['MACD_12_26_9']
df_15m['macd_signal_15m'] = macd_15m['MACDs_12_26_9']
df_15m['macd_hist_15m'] = macd_15m['MACDh_12_26_9']
# 将 15m 数据重新索引到主时间框架 (3m)
df_15m = df_15m.set_index('date').reindex(dataframe['date']).reset_index()
df_15m = df_15m.rename(columns={'index': 'date'})
df_15m = df_15m[['date', 'rsi_15m', 'ema_50_15m', 'ema_200_15m']].fillna(method='ffill')
# 合并 15m 数据
dataframe = dataframe.merge(df_15m, how='left', on='date')
# 获取 1h 数据
df_1h = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1h')
# 计算 1h 布林带
bb_1h = ta.bbands(df_1h['close'], length=self.bb_length, std=self.bb_std)
df_1h['bb_lower_1h'] = bb_1h[f'BBL_{self.bb_length}_{self.bb_std}']
df_1h['bb_upper_1h'] = bb_1h[f'BBU_{self.bb_length}_{self.bb_std}']
# 计算 1h RSI 和 EMA
df_1h['rsi_1h'] = ta.rsi(df_1h['close'], length=self.rsi_length)
df_1h['ema_50_1h'] = ta.ema(df_1h['close'], length=50) # 1h 50周期EMA
df_1h['ema_200_1h'] = ta.ema(df_1h['close'], length=200) # 1h 200周期EMA
df_1h['trend_1h'] = df_1h['close'] > df_1h['ema_50_1h'] # 1h上涨趋势
# 新增 StochRSI 指标
stochrsi_1h = ta.stochrsi(df_1h['close'], length=14, rsi_length=14)
df_1h['stochrsi_k_1h'] = stochrsi_1h['STOCHRSIk_14_14_3_3']
df_1h['stochrsi_d_1h'] = stochrsi_1h['STOCHRSId_14_14_3_3']
# 新增 MACD 指标
macd_1h = ta.macd(df_1h['close'], fast=12, slow=26, signal=9)
df_1h['macd_1h'] = macd_1h['MACD_12_26_9']
df_1h['macd_signal_1h'] = macd_1h['MACDs_12_26_9']
df_1h['macd_hist_1h'] = macd_1h['MACDh_12_26_9']
# 验证 MACD 列是否正确生成
logger.info(f"[{metadata['pair']}] 1小时 MACD 列: {list(macd_1h.columns)}")
# 新增 StochRSI 指标
stochrsi_1h = ta.stochrsi(df_1h['close'], length=14, rsi_length=14)
df_1h['stochrsi_k_1h'] = stochrsi_1h['STOCHRSIk_14_14_3_3']
df_1h['stochrsi_d_1h'] = stochrsi_1h['STOCHRSId_14_14_3_3']
# 将 1h 数据重新索引到主时间框架 (3m)
df_1h = df_1h.set_index('date').reindex(dataframe['date']).reset_index()
df_1h = df_1h.rename(columns={'index': 'date'})
# Include macd_1h and macd_signal_1h in the column selection
df_1h = df_1h[['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h', 'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h', 'macd_1h', 'macd_signal_1h']].ffill()
# Validate that all required columns are present
required_columns = ['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h',
'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h',
'macd_1h', 'macd_signal_1h']
missing_columns = [col for col in required_columns if col not in df_1h.columns]
if missing_columns:
logger.error(f"[{metadata['pair']}] 缺少以下列: {missing_columns}")
raise KeyError(f"缺少以下列: {missing_columns}")
# 确保所有需要的列都被合并
required_columns = ['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h',
'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h',
'macd_1h', 'macd_signal_1h']
# 验证所需列是否存在
missing_columns = [col for col in required_columns if col not in df_1h.columns]
if missing_columns:
logger.error(f"[{metadata['pair']}] 缺少以下列: {missing_columns}")
raise KeyError(f"缺少以下列: {missing_columns}")
df_1h = df_1h[required_columns] # 确保包含 macd_1h 和 macd_signal_1h
# 合并 1h 数据
dataframe = dataframe.merge(df_1h, how='left', on='date').fillna(method='ffill')
# 验证合并后的列
logger.info(f"[{metadata['pair']}] 合并后的数据框列名: {list(dataframe.columns)}")
# K线形态看涨吞没
dataframe['bullish_engulfing'] = (
(dataframe['close'].shift(1) < dataframe['open'].shift(1)) &
(dataframe['close'] > dataframe['open']) &
(dataframe['close'] > dataframe['open'].shift(1)) &
(dataframe['open'] < dataframe['close'].shift(1))
)
# 计算各时间框架的趋势状态(牛/熊)
# 3m时间框架ema50下穿ema200为熊上穿为牛
dataframe['trend_3m'] = np.where(dataframe['ema_50_3m'] > dataframe['ema_200_3m'], 1, 0)
# 15m时间框架ema50下穿ema200为熊上穿为牛
dataframe['trend_15m'] = np.where(dataframe['ema_50_15m'] > dataframe['ema_200_15m'], 1, 0)
# 1h时间框架ema50下穿ema200为熊上穿为牛
dataframe['trend_1h_ema'] = np.where(dataframe['ema_50_1h'] > dataframe['ema_200_1h'], 1, 0)
# 计算熊牛得分0-100
# 权重3m熊牛权重1015m熊牛权重351h熊牛权重65
# 计算加权得分
dataframe['market_score'] = (
dataframe['trend_3m'] * 10 +
dataframe['trend_15m'] * 35 +
dataframe['trend_1h_ema'] * 65
)
# 确保得分在0-100范围内
dataframe['market_score'] = dataframe['market_score'].clip(lower=0, upper=100)
# 根据得分分类市场状态
dataframe['market_state'] = 'neutral'
dataframe.loc[dataframe['market_score'] > 70, 'market_state'] = 'strong_bull'
dataframe.loc[(dataframe['market_score'] > 50) & (dataframe['market_score'] <= 70), 'market_state'] = 'weak_bull'
dataframe.loc[(dataframe['market_score'] >= 30) & (dataframe['market_score'] <= 50), 'market_state'] = 'neutral'
dataframe.loc[(dataframe['market_score'] > 10) & (dataframe['market_score'] < 30), 'market_state'] = 'weak_bear'
dataframe.loc[dataframe['market_score'] <= 10, 'market_state'] = 'strong_bear'
# 记录当前的市场状态
if len(dataframe) > 0:
current_score = dataframe['market_score'].iloc[-1]
current_state = dataframe['market_state'].iloc[-1]
logger.info(f"[{metadata['pair']}] 熊牛得分: {current_score:.1f}, 市场状态: {current_state}")
logger.info(f"[{metadata['pair']}] 各时间框架趋势: 3m={'' if dataframe['trend_3m'].iloc[-1] == 1 else ''}, \
15m={'' if dataframe['trend_15m'].iloc[-1] == 1 else ''}, \
1h={'' if dataframe['trend_1h_ema'].iloc[-1] == 1 else ''}")
# 调试:打印指标值(最后 5 行),验证时间对齐
print(f"Pair: {metadata['pair']}, Last 5 rows after reindexing:")
print(dataframe[['date', 'close', 'bb_lower_3m', 'rsi_3m', 'rsi_15m', 'rsi_1h', 'trend_1h',
'trend_3m', 'trend_15m', 'trend_1h_ema', 'market_score', 'market_state',
'bullish_engulfing', 'volume', 'volume_ma']].tail(5))
# 打印最终数据框的列名以验证
logger.info(f"[{metadata['pair']}] 最终数据框列名: {list(dataframe.columns)}")
return dataframe
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 入场信号主要依据1小时周期
# 条件1: 价格接近布林带下轨
close_to_bb_lower_1h = (dataframe['close'] <= dataframe['bb_lower_1h'] * 1.02)
# 条件2: RSI 不高于阈值
rsi_condition_1h = dataframe['rsi_1h'] < self.rsi_oversold
# 条件3: StochRSI 处于超卖区域
stochrsi_condition_1h = (dataframe['stochrsi_k_1h'] < 20) & (dataframe['stochrsi_d_1h'] < 20)
# 条件4: MACD 上升趋势
macd_condition_1h = dataframe['macd_1h'] > dataframe['macd_signal_1h']
# 辅助条件: 3m 和 15m 趋势确认
trend_confirmation = (dataframe['trend_3m'] == 1) & (dataframe['trend_15m'] == 1)
# 合并所有条件
final_condition = close_to_bb_lower_1h & rsi_condition_1h & stochrsi_condition_1h & macd_condition_1h & trend_confirmation
# 设置入场信号
dataframe.loc[final_condition, 'enter_long'] = 1
# 日志记录
if dataframe['enter_long'].sum() > 0:
logger.info(f"[{metadata['pair']}] 发现入场信号数量: {dataframe['enter_long'].sum()}")
return dataframe
# 做多条件(放宽以增加入场信号)
# 1. 价格接近3m布林带下轨放宽到5%偏差)
close_to_bb_lower = (dataframe['close'] <= dataframe['bb_lower_3m'] * 1.05)
# 2. 至少一个时间框架RSI处于超卖区域
rsi_oversold = (dataframe['rsi_3m'] < self.rsi_oversold) | \
(dataframe['rsi_15m'] < self.rsi_oversold)
# 3. 1小时趋势向上或横盘
trend_1h = dataframe['trend_1h'] | (dataframe['market_state'] == 'neutral')
# 4. 成交量至少达到平均水平的80%
volume_condition = dataframe['volume'] > dataframe['volume_ma'] * 0.8
# 5. 看涨吞没形态或RSI接近超卖
special_condition = dataframe['bullish_engulfing'] | (dataframe['rsi_3m'] < self.rsi_oversold + 3)
# 检查剧烈拉升情况 - 如果检测到剧烈拉升,则不产生入场信号
pair = metadata['pair']
is_unstable_region, _ = self.detect_h1_rapid_rise(pair, dataframe, metadata)
# 熊牛得分检查当得分低于55时禁止入场放宽门槛
market_score_condition = dataframe['market_score'] >= 55
# 合并所有条件,并且确保不在不稳固区域且市场状态良好
final_condition = close_to_bb_lower & rsi_oversold & trend_1h & volume_condition & special_condition & (~is_unstable_region) & market_score_condition
# 所有原始条件(不包括剧烈拉升检查和熊牛得分检查)
original_conditions = close_to_bb_lower & rsi_oversold & trend_1h & volume_condition & special_condition & (~is_unstable_region)
dataframe.loc[final_condition, 'enter_long'] = 1
# 记录入场信号被阻止的情况
prevented_entries = dataframe[(~final_condition) & original_conditions]
# 统计因熊牛得分不足而被阻止的信号
low_score_prevented = dataframe[(~market_score_condition) & original_conditions]
if len(low_score_prevented) > 0:
logger.info(f"[{pair}] 由于熊牛得分低于60阻止了 {len(low_score_prevented)} 个潜在的入场信号")
if len(prevented_entries) > 0:
logger.info(f"[{pair}] 总共阻止了 {len(prevented_entries)} 个潜在的入场信号")
# 调试:检查每个条件的触发情况(简化输出)
if dataframe['enter_long'].sum() > 0:
logger.info(f"[{pair}] 发现入场信号数量: {dataframe['enter_long'].sum()}")
return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 出场信号基于趋势和量价关系
# 条件1: 价格突破布林带上轨
breakout_condition = dataframe['close'] >= dataframe['bb_upper_1h']
# 条件2: 成交量显著放大
volume_spike = dataframe['volume'] > dataframe['volume_ma'] * 2
# 条件3: MACD 下降趋势
macd_downward = dataframe['macd_1h'] < dataframe['macd_signal_1h']
# 条件4: RSI 进入超买区域
rsi_overbought = dataframe['rsi_1h'] > self.rsi_overbought
# 合并所有条件
final_condition = breakout_condition | volume_spike | macd_downward | rsi_overbought
# 设置出场信号
dataframe.loc[final_condition, 'exit_long'] = 1
# 日志记录
if dataframe['exit_long'].sum() > 0:
logger.info(f"[{metadata['pair']}] 触发出场信号数量: {dataframe['exit_long'].sum()}")
return dataframe
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 入场信号主要依据1小时周期
# 条件1: 价格接近布林带下轨
close_to_bb_lower_1h = (dataframe['close'] <= dataframe['bb_lower_1h'] * 1.02)
# 条件2: RSI 不高于阈值(放宽阈值)
rsi_condition_1h = dataframe['rsi_1h'] < 50 # 放宽 RSI 阈值
# 条件3: StochRSI 处于超卖区域(放宽阈值)
stochrsi_condition_1h = (dataframe['stochrsi_k_1h'] < 30) & (dataframe['stochrsi_d_1h'] < 30) # 放宽 StochRSI 阈值
# 条件4: MACD 上升趋势
macd_condition_1h = dataframe['macd_1h'] > dataframe['macd_signal_1h']
# 条件5: 成交量显著放大
volume_spike = dataframe['volume'] > dataframe['volume_ma'] * 1.5
# 条件6: 布林带宽度过滤(避免窄幅震荡)
bb_width = (dataframe['bb_upper_1h'] - dataframe['bb_lower_1h']) / dataframe['close']
bb_width_condition = bb_width > 0.02 # 布林带宽度大于2%
# 辅助条件: 3m 和 15m 趋势确认
trend_confirmation = (dataframe['trend_3m'] == 1) & (dataframe['trend_15m'] == 1)
# 合并所有条件(增加成交量和布林带宽度过滤)
final_condition = (
close_to_bb_lower_1h &
(rsi_condition_1h | stochrsi_condition_1h) & # 允许 RSI 或 StochRSI 满足其一
macd_condition_1h &
volume_spike & # 成交量显著放大
bb_width_condition & # 布林带宽度过滤
trend_confirmation
)
# 设置入场信号
dataframe.loc[final_condition, 'enter_long'] = 1
# 增强调试信息
logger.info(f"[{metadata['pair']}] 入场条件检查:")
logger.info(f" - 价格接近布林带下轨: {close_to_bb_lower_1h.sum()}")
logger.info(f" - RSI 超卖: {rsi_condition_1h.sum()}")
logger.info(f" - StochRSI 超卖: {stochrsi_condition_1h.sum()}")
logger.info(f" - MACD 上升趋势: {macd_condition_1h.sum()}")
logger.info(f" - 趋势确认: {trend_confirmation.sum()}")
logger.info(f" - 最终条件: {final_condition.sum()}")
# 日志记录
if dataframe['enter_long'].sum() > 0:
logger.info(f"[{metadata['pair']}] 发现入场信号数量: {dataframe['enter_long'].sum()}")
return dataframe
# 缓存数据框以便在trailing_stop_positive属性中使用
self._dataframe_cache = dataframe
# 基础退出条件
basic_exit = (
(dataframe['close'] >= dataframe['bb_upper_3m']) |
(dataframe['rsi_3m'] > self.rsi_overbought)
)
# 强劲趋势条件1h趋势向上 + 熊牛得分>70
strong_trend = (dataframe['trend_1h_ema'] == 1) & (dataframe['market_score'] > 70)
# 一般趋势条件熊牛得分50-70
normal_trend = (dataframe['market_score'] >= 50) & (dataframe['market_score'] <= 70)
# 极端强劲趋势条件:熊牛得分>95
extreme_strong_trend = (dataframe['market_score'] > 95)
# 获取15m数据进行趋势确认
df_15m = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='15m')
# 确保df_15m中包含rsi_15m列如果没有则计算
try:
import pandas_ta as ta
if 'rsi_15m' not in df_15m.columns:
# 使用pandas_ta计算14期RSI
rsi_series = ta.rsi(df_15m['close'], length=14)
df_15m['rsi_15m'] = rsi_series
except Exception as e:
# 如果计算失败,创建一个默认值列
logger.warning(f"[{metadata['pair']}] 计算15m RSI时出错: {e}")
df_15m['rsi_15m'] = 50 # 设置默认值为中性
# 重命名date列并保留必要的列
df_15m = df_15m[['date', 'rsi_15m']].rename(columns={'date': 'date_15m'})
# 合并数据
merged_data = dataframe.copy()
# 创建合并列
merged_data['rsi_15m'] = 50 # 设置默认值
# 尝试合并数据
try:
# 使用更简单的方式直接将df_15m的rsi_15m列映射到dataframe
# 创建一个日期到rsi的映射字典
date_rsi_map = df_15m.set_index('date_15m')['rsi_15m'].to_dict()
# 使用映射填充rsi_15m列
merged_data['rsi_15m'] = merged_data['date'].map(date_rsi_map).ffill().bfill()
except Exception as e:
logger.warning(f"[{metadata['pair']}] 合并15m数据时出错: {e}")
# 趋势反转信号15m RSI超买
trend_reversal = merged_data['rsi_15m'] > 75
# 动态调整退出条件 - 提高ATR倍数以增加单笔盈利潜力
# 强劲趋势中大幅提高ATR倍数到5.5倍,充分利用趋势利润
# 但在极端强劲趋势(market_score>95)下不给出退出信号
dataframe.loc[strong_trend & ~extreme_strong_trend & ((dataframe['close'] > dataframe['open'] + dataframe['atr'] * 5.5) | (basic_exit & trend_reversal)), 'exit_long'] = 1
# 一般趋势中提高到4倍ATR退出
# 但在极端强劲趋势(market_score>95)下不给出退出信号
dataframe.loc[normal_trend & ~extreme_strong_trend & ((dataframe['close'] > dataframe['open'] + dataframe['atr'] * 4) | basic_exit), 'exit_long'] = 1
# 非趋势或弱势保持2倍ATR退出
# 但在极端强劲趋势(market_score>95)下不给出退出信号
# 记录退出决策日志
if len(dataframe[dataframe['exit_long'] == 1]) > 0:
last_exit = dataframe[dataframe['exit_long'] == 1].iloc[-1]
current_state = dataframe['market_state'].iloc[-1]
logger.info(f"[{metadata['pair']}] 触发退出信号,市场状态: {current_state}, 价格: {last_exit['close']:.2f}")
# 记录因极端强劲趋势阻止的退出信号
extreme_strong_prevented = dataframe[extreme_strong_trend & ((strong_trend & ((dataframe['close'] > dataframe['open'] + dataframe['atr'] * 5.5) | (basic_exit & trend_reversal))) | (normal_trend & ((dataframe['close'] > dataframe['open'] + dataframe['atr'] * 4) | basic_exit)))]
if len(extreme_strong_prevented) > 0:
logger.info(f"[{metadata['pair']}] 由于极端强劲趋势(market_score>95),阻止了 {len(extreme_strong_prevented)} 个潜在的退出信号")
return dataframe
def detect_h1_rapid_rise(self, pair: str, dataframe: DataFrame, metadata: dict) -> tuple[bool, float]:
"""
检测1小时K线图上的剧烈拉升情况
参数:
- pair: 交易对
- dataframe: 数据框
- metadata: 元数据
返回:
- tuple: (是否处于不稳固区域, 当前价格在最近价格范围中的百分比)
"""
try:
# 获取1小时K线数据
df_1h = self.dp.get_pair_dataframe(pair=pair, timeframe='1h')
# 确保有足够的K线数据
if len(df_1h) < self.H1_MAX_CANDLES:
logger.warning(f"[{pair}] 1h K线数据不足 {self.H1_MAX_CANDLES} 根,当前只有 {len(df_1h)} 根,无法完整检测剧烈拉升")
return False, 0.0
# 获取最近的K线
recent_data = df_1h.iloc[-self.H1_MAX_CANDLES:].copy()
current_price = recent_data['close'].iloc[-1]
# 检查连续最多3根K线内的最大涨幅
rapid_rise_detected = False
max_rise = 0
for i in range(len(recent_data) - self.H1_MAX_CONSECUTIVE_CANDLES + 1):
window_data = recent_data.iloc[i:i + self.H1_MAX_CONSECUTIVE_CANDLES]
window_low = window_data['low'].min()
window_high = window_data['high'].max()
# 计算区间内的最大涨幅
if window_low > 0:
rise_percentage = (window_high - window_low) / window_low
if rise_percentage > max_rise:
max_rise = rise_percentage
# 检查是否超过阈值
if rise_percentage >= self.H1_RAPID_RISE_THRESHOLD:
rapid_rise_detected = True
logger.info(f"[{pair}] 检测到剧烈拉升: 从 {window_low:.2f}{window_high:.2f} ({rise_percentage:.2%}) 在 {self.H1_MAX_CONSECUTIVE_CANDLES} 根K线内")
break
# 计算当前价格在最近价格范围中的位置百分比
recent_low = recent_data['low'].min()
recent_high = recent_data['high'].max()
if recent_high > recent_low:
price_position_pct = (current_price - recent_low) / (recent_high - recent_low) * 100
else:
price_position_pct = 50.0 # 默认中间位置
# 判断是否处于不稳固区域
is_unstable_region = rapid_rise_detected
logger.info(f"[{pair}] 剧烈拉升检测结果: {'不稳固' if is_unstable_region else '稳固'}")
logger.info(f"[{pair}] 当前价格位置: {current_price:.2f} ({price_position_pct:.1f}%), 最近最大涨幅: {max_rise:.2%}")
return is_unstable_region, price_position_pct
except Exception as e:
logger.error(f"[{pair}] 剧烈拉升检测过程中发生错误: {str(e)}")
return False, 0.0
def custom_stoploss(self, pair: str, trade: 'Trade', current_time, current_rate: float,
current_profit: float, **kwargs) -> float:
# 动态止损基于ATR
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1]
atr = last_candle['atr']
# 获取当前市场状态
current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else 'unknown'
# 更激进的渐进式止损策略
if current_profit > 0.04: # 利润超过4%时
return -2.0 * atr / current_rate # 大幅扩大止损范围,让利润奔跑
elif current_profit > 0.025: # 利润超过2.5%时
return -1.7 * atr / current_rate # 中等扩大止损范围
elif current_profit > 0.01: # 利润超过1%时
return -1.3 * atr / current_rate # 轻微扩大止损范围
# 在强劲牛市中,即使小亏损也可以容忍更大回调
if current_state == 'strong_bull' and current_profit > -0.01:
return -1.5 * atr / current_rate
# 动态调整止损范围
if current_profit > 0.05: # 利润超过5%时
return -2.5 * atr / current_rate # 大幅扩大止损范围,让利润奔跑
elif current_profit > 0.03: # 利润超过3%时
return -2.0 * atr / current_rate # 中等扩大止损范围
elif current_profit > 0.01: # 利润超过1%时
return -1.5 * atr / current_rate # 轻微扩大止损范围
# 在强劲牛市中,即使小亏损也可以容忍更大回调
if current_state == 'strong_bull' and current_profit > -0.01:
return -1.8 * atr / current_rate
if atr > 0:
return -1.2 * atr / current_rate # 基础1.2倍ATR止损
return self.stoploss
def custom_exit(self, pair: str, trade: 'Trade', current_time, current_rate: float,
current_profit: float, **kwargs) -> float:
"""渐进式止盈逻辑"""
# 获取当前市场状态
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else 'unknown'
# 定义更激进的渐进式止盈水平,提高收益上限
profit_levels = {
# 状态: [(止盈触发利润, 止盈比例)]
'strong_bull': [(0.03, 0.2), (0.06, 0.35), (0.09, 0.5), (0.12, 0.65), (0.15, 0.8)], # 强劲牛市的渐进止盈,提高目标
'weak_bull': [(0.02, 0.25), (0.04, 0.45), (0.07, 0.7), (0.10, 0.9)], # 弱牛市的渐进止盈
'neutral': [(0.015, 0.35), (0.03, 0.6), (0.05, 0.85)], # 中性市场的渐进止盈
'bear': [(0.01, 0.6), (0.02, 0.9), (0.03, 1.0)] # 熊市的渐进止盈(更保守)
}
# 默认使用中性市场的止盈设置
levels = profit_levels.get(current_state, profit_levels['neutral'])
# 在强劲牛市中,进一步放宽止盈目标
if current_state == 'strong_bull':
levels = [(p + 0.01, r) for p, r in levels] # 将止盈触发利润提高1%
# 确定当前应该止盈的比例
exit_ratio = 0.0
for profit_target, ratio in levels:
if current_profit >= profit_target:
exit_ratio = ratio
else:
break
# 记录渐进式止盈决策
if exit_ratio > 0:
logger.info(f"[{pair}] 渐进式止盈: 当前利润 {current_profit:.2%}, 市场状态 {current_state}, 止盈比例 {exit_ratio:.0%}")
# 返回应退出的比例0.0表示不退出1.0表示全部退出)
return exit_ratio
def adjust_trade_position(self, trade: 'Trade', current_time, current_rate: float,
current_profit: float, min_stake: float, max_stake: float, **kwargs) -> float:
"""
根据用户要求实现加仓逻辑
- 加仓间隔设置为0.0474.7%回调)
- 加仓额度为: (stake_amount / 2) ^ (加仓次数 - 1)
"""
# 检查是否已启用加仓
if not hasattr(self, 'max_entry_adjustments'):
self.max_entry_adjustments = 3 # 设置最大加仓次数
# 获取当前交易对
pair = trade.pair
# 获取当前交易的加仓次数
# 初始交易算第1次加仓次数=total_entry_position - 1
entry_count = len(trade.orders) # 获取所有入场订单数量
# 如果已经达到最大加仓次数,则不再加仓
if entry_count - 1 >= self.max_entry_adjustments:
return 0.0
# 获取初始入场价格和当前价格的差值百分比
initial_price = trade.open_rate
price_diff_pct = (current_rate - initial_price) / initial_price
# 检查价格回调是否达到加仓间隔0.047
# 价格回调表示价格比初始价格低4.7%或更多
# 同时考虑市场状态,只在市场不是弱势时加仓
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else 'neutral'
if price_diff_pct <= -0.047 and current_state not in ['bear', 'weak_bear']:
# 计算初始入场金额
initial_stake = trade.orders[0].cost # 第一笔订单的成本
# 计算加仓次数从1开始计数
adjustment_count = entry_count - 1 # 已加仓次数
# 计算加仓额度: (stake_amount / 2) ^ (加仓次数)
# 对于第一次加仓,公式为 (initial_stake / 2) ^ 1 = initial_stake / 2
# 第二次加仓,公式为 (initial_stake / 2) ^ 2
# 第三次加仓,公式为 (initial_stake / 2) ^ 3
# 计算加仓金额
additional_stake = (initial_stake / 2) ** (adjustment_count + 1)
# 确保加仓金额在允许的范围内
additional_stake = max(min_stake, min(additional_stake, max_stake - trade.stake_amount))
logger.info(f"[{pair}] 触发加仓: 第{adjustment_count + 1}次加仓, 初始金额{initial_stake:.2f}, \
加仓金额{additional_stake:.2f}, 价格差{price_diff_pct:.2%}, 当前利润{current_profit:.2%}")
return additional_stake
# 不符合加仓条件返回0
return 0.0