myTestFreqAI/freqtrade/templates/freqaiprimer.py
zhangkun9038@dingtalk.com 5ea63b154c fix issue
2025-08-30 13:39:16 +08:00

416 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import warnings
warnings.filterwarnings("ignore", category=UserWarning, module="pandas_ta")
import logging
from freqtrade.strategy import IStrategy
from pandas import DataFrame
import pandas_ta as ta
from freqtrade.persistence import Trade
import numpy as np
import datetime
logger = logging.getLogger(__name__)
class FreqaiPrimer(IStrategy):
# 策略参数
minimal_roi = {
"0": 0.03, # 3% ROI (10 分钟内)
"60": 0.015, # 1.5% ROI (1 小时)
"180": 0.005, # 0.5% ROI (3 小时)
"360": 0.0 # 0% ROI (6 小时)
}
stoploss = -0.01 # 固定止损 -1%
trailing_stop = True
trailing_stop_positive_offset = 0.008 # 追踪止损偏移量 0.8%
# 用于跟踪市场状态的数据框缓存
_dataframe_cache = None
def __init__(self, config=None):
"""初始化策略参数调用父类初始化方法并接受config参数"""
super().__init__(config) # 调用父类的初始化方法并传递config
# 存储从配置文件加载的默认值
self._trailing_stop_positive_default = 0.005
@property
def trailing_stop_positive(self):
"""根据市场状态动态调整跟踪止盈参数"""
# 获取当前市场状态
if self._dataframe_cache is not None and len(self._dataframe_cache) > 0:
current_state = self._dataframe_cache['market_state'].iloc[-1]
if current_state == 'strong_bull':
return 0.01 # 强劲牛市中放宽跟踪止盈
elif current_state == 'weak_bull':
return 0.007
return self._trailing_stop_positive_default # 返回默认值
@trailing_stop_positive.setter
def trailing_stop_positive(self, value):
"""设置trailing_stop_positive的默认值"""
self._trailing_stop_positive_default = value
timeframe = "3m" # 主时间框架为 3 分钟
can_short = False # 禁用做空
# 自定义指标参数
bb_length = 20
bb_std = 2.0
rsi_length = 14
rsi_overbought = 58 # 收紧超买阈值到 60
rsi_oversold = 39 # 放宽超卖阈值到 45
# 剧烈拉升检测参数
H1_MAX_CANDLES = 200 # 检查最近200根1h K线
H1_RAPID_RISE_THRESHOLD = 0.05 # 5%的抬升阈值
H1_MAX_CONSECUTIVE_CANDLES = 3 # 最多连续3根K线内检查
def informative_pairs(self):
pairs = self.dp.current_whitelist()
return [(pair, '15m') for pair in pairs] + [(pair, '1h') for pair in pairs]
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 计算 3m 周期的指标
bb_3m = ta.bbands(dataframe['close'], length=self.bb_length, std=self.bb_std)
dataframe['bb_lower_3m'] = bb_3m[f'BBL_{self.bb_length}_{self.bb_std}']
dataframe['bb_upper_3m'] = bb_3m[f'BBU_{self.bb_length}_{self.bb_std}']
dataframe['rsi_3m'] = ta.rsi(dataframe['close'], length=self.rsi_length)
# 计算3m时间框架的EMA50和EMA200
dataframe['ema_50_3m'] = ta.ema(dataframe['close'], length=50)
dataframe['ema_200_3m'] = ta.ema(dataframe['close'], length=200)
# 成交量过滤
dataframe['volume_ma'] = dataframe['volume'].rolling(20).mean()
# 计算 ATR 用于动态止损和退出
dataframe['atr'] = ta.atr(dataframe['high'], dataframe['low'], dataframe['close'], length=14)
# 获取 15m 数据
df_15m = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='15m')
df_15m['rsi_15m'] = ta.rsi(df_15m['close'], length=self.rsi_length)
# 计算15m时间框架的EMA50和EMA200
df_15m['ema_50_15m'] = ta.ema(df_15m['close'], length=50)
df_15m['ema_200_15m'] = ta.ema(df_15m['close'], length=200)
# 手动合并 15m 数据
df_15m = df_15m[['date', 'rsi_15m', 'ema_50_15m', 'ema_200_15m']]
df_15m = df_15m.rename(columns={'date': 'date_15m'})
dataframe = dataframe.merge(df_15m, how='left', left_on='date', right_on='date_15m')
dataframe = dataframe.fillna(method='ffill')
# 获取 1h 数据
df_1h = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1h')
df_1h['rsi_1h'] = ta.rsi(df_1h['close'], length=self.rsi_length)
df_1h['ema_50_1h'] = ta.ema(df_1h['close'], length=50) # 1h 50周期EMA
df_1h['ema_200_1h'] = ta.ema(df_1h['close'], length=200) # 1h 200周期EMA
df_1h['trend_1h'] = df_1h['close'] > df_1h['ema_50_1h'] # 1h上涨趋势
# 手动合并 1h 数据
df_1h = df_1h[['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h']]
df_1h = df_1h.rename(columns={'date': 'date_1h'})
dataframe = dataframe.merge(df_1h, how='left', left_on='date', right_on='date_1h')
dataframe = dataframe.fillna(method='ffill')
# K线形态看涨吞没
dataframe['bullish_engulfing'] = (
(dataframe['close'].shift(1) < dataframe['open'].shift(1)) &
(dataframe['close'] > dataframe['open']) &
(dataframe['close'] > dataframe['open'].shift(1)) &
(dataframe['open'] < dataframe['close'].shift(1))
)
# 计算各时间框架的趋势状态(牛/熊)
# 3m时间框架ema50下穿ema200为熊上穿为牛
dataframe['trend_3m'] = np.where(dataframe['ema_50_3m'] > dataframe['ema_200_3m'], 1, 0)
# 15m时间框架ema50下穿ema200为熊上穿为牛
dataframe['trend_15m'] = np.where(dataframe['ema_50_15m'] > dataframe['ema_200_15m'], 1, 0)
# 1h时间框架ema50下穿ema200为熊上穿为牛
dataframe['trend_1h_ema'] = np.where(dataframe['ema_50_1h'] > dataframe['ema_200_1h'], 1, 0)
# 计算熊牛得分0-100
# 权重3m熊牛权重1015m熊牛权重351h熊牛权重65
# 计算加权得分
dataframe['market_score'] = (
dataframe['trend_3m'] * 10 +
dataframe['trend_15m'] * 35 +
dataframe['trend_1h_ema'] * 65
)
# 确保得分在0-100范围内
dataframe['market_score'] = dataframe['market_score'].clip(lower=0, upper=100)
# 根据得分分类市场状态
dataframe['market_state'] = 'neutral'
dataframe.loc[dataframe['market_score'] > 70, 'market_state'] = 'strong_bull'
dataframe.loc[(dataframe['market_score'] > 50) & (dataframe['market_score'] <= 70), 'market_state'] = 'weak_bull'
dataframe.loc[(dataframe['market_score'] >= 30) & (dataframe['market_score'] <= 50), 'market_state'] = 'neutral'
dataframe.loc[(dataframe['market_score'] > 10) & (dataframe['market_score'] < 30), 'market_state'] = 'weak_bear'
dataframe.loc[dataframe['market_score'] <= 10, 'market_state'] = 'strong_bear'
# 记录当前的市场状态
if len(dataframe) > 0:
current_score = dataframe['market_score'].iloc[-1]
current_state = dataframe['market_state'].iloc[-1]
logger.info(f"[{metadata['pair']}] 熊牛得分: {current_score:.1f}, 市场状态: {current_state}")
logger.info(f"[{metadata['pair']}] 各时间框架趋势: 3m={'' if dataframe['trend_3m'].iloc[-1] == 1 else ''}, \
15m={'' if dataframe['trend_15m'].iloc[-1] == 1 else ''}, \
1h={'' if dataframe['trend_1h_ema'].iloc[-1] == 1 else ''}")
# 调试:打印指标值(最后 5 行)
print(f"Pair: {metadata['pair']}, Last 5 rows:")
print(dataframe[['date', 'close', 'bb_lower_3m', 'rsi_3m', 'rsi_15m', 'rsi_1h', 'trend_1h',
'trend_3m', 'trend_15m', 'trend_1h_ema', 'market_score', 'market_state',
'bullish_engulfing', 'volume', 'volume_ma']].tail(5))
return dataframe
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 做多条件放宽以达到每月150单
# 1. 价格接近3m布林带下轨3%偏差)
close_to_bb_lower = (dataframe['close'] <= dataframe['bb_lower_3m'] * 1.03)
# 2. 多时间框架RSI处于超卖区域
rsi_oversold = (dataframe['rsi_3m'] < self.rsi_oversold) & \
(dataframe['rsi_15m'] < self.rsi_oversold)
# 3. 1小时趋势向上
trend_1h = dataframe['trend_1h']
# 4. 成交量高于平均水平
volume_condition = dataframe['volume'] > dataframe['volume_ma']
# 5. 看涨吞没形态或极低RSI
special_condition = dataframe['bullish_engulfing'] | (dataframe['rsi_3m'] < self.rsi_oversold - 5)
# 检查剧烈拉升情况 - 如果检测到剧烈拉升,则不产生入场信号
pair = metadata['pair']
is_unstable_region, _ = self.detect_h1_rapid_rise(pair, dataframe, metadata)
# 熊牛得分检查当得分低于60时禁止入场
market_score_condition = dataframe['market_score'] >= 60
# 合并所有条件,并且确保不在不稳固区域且市场状态良好
final_condition = close_to_bb_lower & rsi_oversold & trend_1h & volume_condition & special_condition & (~is_unstable_region) & market_score_condition
# 所有原始条件(不包括剧烈拉升检查和熊牛得分检查)
original_conditions = close_to_bb_lower & rsi_oversold & trend_1h & volume_condition & special_condition & (~is_unstable_region)
dataframe.loc[final_condition, 'enter_long'] = 1
# 记录入场信号被阻止的情况
prevented_entries = dataframe[(~final_condition) & original_conditions]
# 统计因熊牛得分不足而被阻止的信号
low_score_prevented = dataframe[(~market_score_condition) & original_conditions]
if len(low_score_prevented) > 0:
logger.info(f"[{pair}] 由于熊牛得分低于60阻止了 {len(low_score_prevented)} 个潜在的入场信号")
if len(prevented_entries) > 0:
logger.info(f"[{pair}] 总共阻止了 {len(prevented_entries)} 个潜在的入场信号")
# 调试:检查每个条件的触发情况
print(f"Pair: {metadata['pair']}, Entry condition checks:")
print(f" - Close <= bb_lower_3m * 1.03: {close_to_bb_lower.sum()} candles")
print(f" - RSI_3m < {self.rsi_oversold}: {(dataframe['rsi_3m'] < self.rsi_oversold).sum()} candles")
print(f" - RSI_15m < {self.rsi_oversold}: {(dataframe['rsi_15m'] < self.rsi_oversold).sum()} candles")
print(f" - Trend_1h (Close > EMA_50_1h): {trend_1h.sum()} candles")
print(f" - Volume > Volume_MA: {volume_condition.sum()} candles")
print(f" - Bullish Engulfing or RSI_3m < {self.rsi_oversold - 5}: {special_condition.sum()} candles")
if dataframe['enter_long'].sum() > 0:
print(f"Entry signals found at:")
print(dataframe[dataframe['enter_long'] == 1][['date', 'close', 'rsi_3m', 'rsi_15m', 'trend_1h', 'bullish_engulfing']])
else:
print("No entry signals generated.")
return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 缓存数据框以便在trailing_stop_positive属性中使用
self._dataframe_cache = dataframe
# 基础退出条件
basic_exit = (
(dataframe['close'] >= dataframe['bb_upper_3m']) |
(dataframe['rsi_3m'] > self.rsi_overbought)
)
# 强劲趋势条件1h趋势向上 + 熊牛得分>70
strong_trend = (dataframe['trend_1h_ema'] == 1) & (dataframe['market_score'] > 70)
# 一般趋势条件熊牛得分50-70
normal_trend = (dataframe['market_score'] >= 50) & (dataframe['market_score'] <= 70)
# 获取15m数据进行趋势确认
df_15m = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='15m')
# 确保df_15m中包含rsi_15m列如果没有则计算
try:
import pandas_ta as ta
if 'rsi_15m' not in df_15m.columns:
# 使用pandas_ta计算14期RSI
rsi_series = ta.rsi(df_15m['close'], length=14)
df_15m['rsi_15m'] = rsi_series
except Exception as e:
# 如果计算失败,创建一个默认值列
logger.warning(f"[{metadata['pair']}] 计算15m RSI时出错: {e}")
df_15m['rsi_15m'] = 50 # 设置默认值为中性
# 重命名date列并保留必要的列
df_15m = df_15m[['date', 'rsi_15m']].rename(columns={'date': 'date_15m'})
# 合并数据
merged_data = dataframe.copy()
# 创建合并列
merged_data['rsi_15m'] = 50 # 设置默认值
# 尝试合并数据
try:
# 使用更简单的方式直接将df_15m的rsi_15m列映射到dataframe
# 创建一个日期到rsi的映射字典
date_rsi_map = df_15m.set_index('date_15m')['rsi_15m'].to_dict()
# 使用映射填充rsi_15m列
merged_data['rsi_15m'] = merged_data['date'].map(date_rsi_map).ffill().bfill()
except Exception as e:
logger.warning(f"[{metadata['pair']}] 合并15m数据时出错: {e}")
# 趋势反转信号15m RSI超买
trend_reversal = merged_data['rsi_15m'] > 75
# 动态调整退出条件
# 强劲趋势中只有获利达到3倍ATR才退出或出现明确的趋势反转信号
dataframe.loc[strong_trend & ((dataframe['close'] > dataframe['open'] + dataframe['atr'] * 3) | (basic_exit & trend_reversal)), 'exit_long'] = 1
# 一般趋势中保持原有2倍ATR退出
dataframe.loc[normal_trend & ((dataframe['close'] > dataframe['open'] + dataframe['atr'] * 2) | basic_exit), 'exit_long'] = 1
# 非趋势或弱势:使用基础条件+1.5倍ATR
dataframe.loc[~strong_trend & ~normal_trend & (basic_exit | (dataframe['close'] > dataframe['open'] + dataframe['atr'] * 1.5)), 'exit_long'] = 1
# 记录退出决策日志
if len(dataframe[dataframe['exit_long'] == 1]) > 0:
last_exit = dataframe[dataframe['exit_long'] == 1].iloc[-1]
current_state = dataframe['market_state'].iloc[-1]
logger.info(f"[{metadata['pair']}] 触发退出信号,市场状态: {current_state}, 价格: {last_exit['close']:.2f}")
return dataframe
def detect_h1_rapid_rise(self, pair: str, dataframe: DataFrame, metadata: dict) -> tuple[bool, float]:
"""
检测1小时K线图上的剧烈拉升情况
参数:
- pair: 交易对
- dataframe: 数据框
- metadata: 元数据
返回:
- tuple: (是否处于不稳固区域, 当前价格在最近价格范围中的百分比)
"""
try:
# 获取1小时K线数据
df_1h = self.dp.get_pair_dataframe(pair=pair, timeframe='1h')
# 确保有足够的K线数据
if len(df_1h) < self.H1_MAX_CANDLES:
logger.warning(f"[{pair}] 1h K线数据不足 {self.H1_MAX_CANDLES} 根,当前只有 {len(df_1h)} 根,无法完整检测剧烈拉升")
return False, 0.0
# 获取最近的K线
recent_data = df_1h.iloc[-self.H1_MAX_CANDLES:].copy()
current_price = recent_data['close'].iloc[-1]
# 检查连续最多3根K线内的最大涨幅
rapid_rise_detected = False
max_rise = 0
for i in range(len(recent_data) - self.H1_MAX_CONSECUTIVE_CANDLES + 1):
window_data = recent_data.iloc[i:i + self.H1_MAX_CONSECUTIVE_CANDLES]
window_low = window_data['low'].min()
window_high = window_data['high'].max()
# 计算区间内的最大涨幅
if window_low > 0:
rise_percentage = (window_high - window_low) / window_low
if rise_percentage > max_rise:
max_rise = rise_percentage
# 检查是否超过阈值
if rise_percentage >= self.H1_RAPID_RISE_THRESHOLD:
rapid_rise_detected = True
logger.info(f"[{pair}] 检测到剧烈拉升: 从 {window_low:.2f}{window_high:.2f} ({rise_percentage:.2%}) 在 {self.H1_MAX_CONSECUTIVE_CANDLES} 根K线内")
break
# 计算当前价格在最近价格范围中的位置百分比
recent_low = recent_data['low'].min()
recent_high = recent_data['high'].max()
if recent_high > recent_low:
price_position_pct = (current_price - recent_low) / (recent_high - recent_low) * 100
else:
price_position_pct = 50.0 # 默认中间位置
# 判断是否处于不稳固区域
is_unstable_region = rapid_rise_detected
logger.info(f"[{pair}] 剧烈拉升检测结果: {'不稳固' if is_unstable_region else '稳固'}")
logger.info(f"[{pair}] 当前价格位置: {current_price:.2f} ({price_position_pct:.1f}%), 最近最大涨幅: {max_rise:.2%}")
return is_unstable_region, price_position_pct
except Exception as e:
logger.error(f"[{pair}] 剧烈拉升检测过程中发生错误: {str(e)}")
return False, 0.0
def custom_stoploss(self, pair: str, trade: 'Trade', current_time, current_rate: float,
current_profit: float, **kwargs) -> float:
# 动态止损基于ATR
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1]
atr = last_candle['atr']
# 渐进式止损策略
if current_profit > 0.03: # 利润超过3%时
return -1.5 * atr / current_rate # 扩大止损范围,让利润奔跑
elif current_profit > 0.015: # 利润超过1.5%时
return -1.2 * atr / current_rate # 轻微扩大止损范围
if atr > 0:
return -1.0 * atr / current_rate # 基础1倍ATR止损
return self.stoploss
def custom_exit(self, pair: str, trade: 'Trade', current_time, current_rate: float,
current_profit: float, **kwargs) -> float:
"""渐进式止盈逻辑"""
# 获取当前市场状态
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else 'unknown'
# 定义渐进式止盈水平
profit_levels = {
# 状态: [(止盈触发利润, 止盈比例)]
'strong_bull': [(0.02, 0.3), (0.04, 0.5), (0.06, 0.7), (0.08, 0.9)], # 强劲牛市的渐进止盈
'weak_bull': [(0.015, 0.3), (0.03, 0.5), (0.05, 0.8)], # 弱牛市的渐进止盈
'neutral': [(0.01, 0.4), (0.02, 0.7), (0.03, 0.9)], # 中性市场的渐进止盈
'bear': [(0.008, 0.5), (0.015, 0.8), (0.025, 1.0)] # 熊市的渐进止盈(更保守)
}
# 默认使用中性市场的止盈设置
levels = profit_levels.get(current_state, profit_levels['neutral'])
# 确定当前应该止盈的比例
exit_ratio = 0.0
for profit_target, ratio in levels:
if current_profit >= profit_target:
exit_ratio = ratio
else:
break
# 记录渐进式止盈决策
if exit_ratio > 0:
logger.info(f"[{pair}] 渐进式止盈: 当前利润 {current_profit:.2%}, 市场状态 {current_state}, 止盈比例 {exit_ratio:.0%}")
# 返回应退出的比例0.0表示不退出1.0表示全部退出)
return exit_ratio