import datetime
import logging
from functools import reduce
from typing import Dict, Optional

import numpy as np
import talib.abstract as ta
from pandas import DataFrame

from freqtrade.persistence import Trade
from freqtrade.strategy import IStrategy

logger = logging.getLogger(__name__)


class FreqaiPrimer(IStrategy):
    """
    Mean-reversion strategy driven by a FreqAI regression of "price/value divergence".

    - The only model target is ``&-price_value_divergence``: the relative distance of
      the close from its 200-period EMA.
    - Buy/sell thresholds are derived per pair from the mean and standard deviation of
      the rule-based divergence in the supplied dataframe.
    - Volume z-score, RSI and Bollinger Bands act as auxiliary filters.
    """

    # --- Strategy parameters -------------------------------------------------
    minimal_roi = {
        "0": 0.02,   # take profit at 2%
        "30": 0.01,
        "60": 0,
    }
    stoploss = -0.015            # fixed stop loss of -1.5%
    timeframe = "3m"             # 3-minute candles
    use_custom_stoploss = False  # fixed stop loss only, no dynamic stop loss

    # Threshold defaults. Used when dynamic thresholds cannot be computed, and so that
    # the entry/exit methods never hit a missing attribute.
    buy_threshold: float = -0.015
    sell_threshold: float = 0.015

    # --- Plot configuration --------------------------------------------------
    plot_config = {
        "main_plot": {
            "ema200": {"color": "blue"},
            "bb_upperband": {"color": "gray"},
            "bb_lowerband": {"color": "gray"},
            "bb_middleband": {"color": "gray"},
        },
        "subplots": {
            "Signals": {
                "enter_long": {"color": "green"},
                "exit_long": {"color": "red"},
            },
            "Price-Value Divergence": {
                "&-price_value_divergence": {"color": "purple"},
            },
            "Volume Z-Score": {
                "volume_z_score": {"color": "orange"},
            },
            "RSI": {
                "rsi": {"color": "cyan"},
            },
        },
    }

    # --- FreqAI configuration: regression model for the divergence ------------
    # NOTE(review): FreqAI normally reads its settings from the "freqai" section of the
    # bot configuration; confirm that this class attribute is actually consumed.
    freqai_info = {
        "identifier": "divergence_model",
        "model": "LightGBMRegressor",
        "feature_parameters": {
            "include_timeframes": ["3m", "15m", "1h"],
            "label_period_candles": 12,
            "include_shifted_candles": 3,
        },
        "data_split_parameters": {
            "test_size": 0.2,
            "shuffle": False,
        },
        "model_training_parameters": {
            "n_estimators": 200,
            "learning_rate": 0.05,
            "num_leaves": 31,
            "verbose": -1,
        },
        "fit_live_predictions_candles": 100,
    }

    def __init__(self, config: dict, *args, **kwargs):
        """Initialise the strategy and raise this module's logger to DEBUG."""
        super().__init__(config, *args, **kwargs)
        logger.setLevel(logging.DEBUG)
        logger.debug("✅ 策略已初始化,日志级别设置为 DEBUG")

    @staticmethod
    def _sanitize(dataframe: DataFrame, columns) -> DataFrame:
        """Replace +/-inf with 0, forward-fill, then zero-fill NaN in ``columns`` (in place)."""
        for column in columns:
            dataframe[column] = (
                dataframe[column].replace([np.inf, -np.inf], 0).ffill().fillna(0)
            )
        return dataframe

    def feature_engineering_expand_all(self, dataframe: DataFrame, period: int,
                                       metadata: dict, **kwargs) -> DataFrame:
        """
        Compute the technical indicators used as FreqAI input features.

        :param dataframe: OHLCV dataframe to extend.
        :param period: indicator look-back period, in candles.
        :param metadata: strategy metadata; ``metadata["pair"]`` is used for logging.
        :return: the dataframe with the feature columns added and cleaned.
        """
        dataframe["%-rsi-period"] = ta.RSI(dataframe, timeperiod=period)
        dataframe["%-sma-period"] = ta.SMA(dataframe, timeperiod=period)
        dataframe["%-ema-period"] = ta.EMA(dataframe, timeperiod=period)

        # Bollinger Bands on the typical price.
        real = ta.TYPPRICE(dataframe)
        upperband, middleband, lowerband = ta.BBANDS(
            real, timeperiod=period, nbdevup=2.0, nbdevdn=2.0
        )
        dataframe["bb_lowerband-period"] = lowerband
        dataframe["bb_upperband-period"] = upperband
        dataframe["bb_middleband-period"] = middleband
        dataframe["%-bb_width-period"] = (
            dataframe["bb_upperband-period"] - dataframe["bb_lowerband-period"]
        ) / dataframe["bb_middleband-period"]

        dataframe["%-mfi-period"] = ta.MFI(dataframe, timeperiod=period)
        dataframe["%-adx-period"] = ta.ADX(dataframe, timeperiod=period)
        dataframe["%-relative_volume-period"] = (
            dataframe["volume"] / dataframe["volume"].rolling(period).mean()
        )

        # Price/value divergence as a feature (does not depend on ``period``).
        dataframe["ema200"] = ta.EMA(dataframe, timeperiod=200)
        dataframe["%-price_value_divergence"] = (
            dataframe["close"] - dataframe["ema200"]
        ) / dataframe["ema200"]

        columns_to_clean = [
            "%-rsi-period", "%-mfi-period", "%-sma-period", "%-ema-period",
            "%-adx-period", "bb_lowerband-period", "bb_middleband-period",
            "bb_upperband-period", "%-bb_width-period", "%-relative_volume-period",
            "%-price_value_divergence",
        ]
        self._sanitize(dataframe, columns_to_clean)

        logger.debug(f"[{metadata['pair']}] 特征工程完成,列:{list(dataframe.columns)}")
        return dataframe

    def set_freqai_targets(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame:
        """
        Define the FreqAI training target: the price/value divergence only.

        Returns the dataframe untouched (with a warning) when fewer than 200 candles
        are available, because the 200-period EMA cannot be computed.

        :param dataframe: OHLCV dataframe to extend.
        :param metadata: strategy metadata; ``metadata["pair"]`` is used for logging.
        :return: the dataframe with the target and volume z-score columns added.
        """
        if len(dataframe) < 200:
            logger.warning(f"[{metadata['pair']}] 数据量不足({len(dataframe)}根K线),需要至少200根K线进行训练")
            return dataframe

        # The 200-period EMA serves as the "fair value".
        dataframe["ema200"] = ta.EMA(dataframe, timeperiod=200)

        # Regression target.
        # NOTE(review): this label is the divergence of the *current* candle, i.e. the
        # same quantity as the "%-price_value_divergence" feature. If a forward-looking
        # prediction is intended the label probably needs a negative shift - confirm.
        dataframe["&-price_value_divergence"] = (
            dataframe["close"] - dataframe["ema200"]
        ) / dataframe["ema200"]

        # Volume z-score over a 20-candle window.
        dataframe["volume_mean_20"] = dataframe["volume"].rolling(20).mean()
        dataframe["volume_std_20"] = dataframe["volume"].rolling(20).std()
        dataframe["volume_z_score"] = (
            dataframe["volume"] - dataframe["volume_mean_20"]
        ) / dataframe["volume_std_20"]

        # Data cleaning.
        self._sanitize(dataframe, ["&-price_value_divergence", "volume_z_score"])

        logger.debug(f"[{metadata['pair']}] 目标列生成完成,列:{list(dataframe.columns)}")
        return dataframe

    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """
        Compute all indicators, run FreqAI, and derive the dynamic thresholds.

        The thresholds are written to the ``buy_threshold`` / ``sell_threshold``
        columns so that every pair keeps its own values; they are also mirrored on
        ``self`` for code that still reads the attributes.

        :param dataframe: OHLCV dataframe for one pair.
        :param metadata: strategy metadata; ``metadata["pair"]`` is used for logging.
        :return: the dataframe with indicator, prediction and threshold columns.
        """
        pair = metadata["pair"]
        logger.info(f"[{pair}] 当前可用列(调用FreqAI前):{list(dataframe.columns)}")

        # 200-period EMA and the rule-based (historical) divergence.
        dataframe["ema200"] = ta.EMA(dataframe, timeperiod=200)
        dataframe["price_value_divergence"] = (
            dataframe["close"] - dataframe["ema200"]
        ) / dataframe["ema200"]

        # Dynamic thresholds from the mean and standard deviation of the divergence.
        # NOTE(review): the statistics use every row of the dataframe, so in a backtest
        # they include candles later than the one being evaluated.
        divergence_mean = dataframe["price_value_divergence"].mean()
        divergence_std = dataframe["price_value_divergence"].std()
        if np.isfinite(divergence_mean) and np.isfinite(divergence_std):
            k = 1.5  # standard-deviation multiplier, tunable
            # Bound the thresholds to +/-5% ...
            buy_threshold = max(divergence_mean - k * divergence_std, -0.05)
            sell_threshold = min(divergence_mean + k * divergence_std, 0.05)
            # ... and keep them at least 0.5% away from zero.
            buy_threshold = min(buy_threshold, -0.005)
            sell_threshold = max(sell_threshold, 0.005)
            logger.info(f"[{pair}] price_value_divergence 历史均值:{divergence_mean:.4f}")
            logger.info(f"[{pair}] price_value_divergence 历史标准差:{divergence_std:.4f}")
            logger.info(f"[{pair}] 动态买入阈值:{buy_threshold:.4f}")
            logger.info(f"[{pair}] 动态卖出阈值:{sell_threshold:.4f}")
        else:
            # Mean/std are NaN or inf (e.g. too few candles for the EMA): fall back.
            buy_threshold = -0.015
            sell_threshold = 0.015
            logger.warning(f"[{pair}] 无法计算动态阈值,使用默认阈值 ±0.015")
        self.buy_threshold = buy_threshold
        self.sell_threshold = sell_threshold

        # Ask FreqAI for the predicted divergence.
        if not hasattr(self, 'freqai') or self.freqai is None:
            logger.error(f"[{pair}] FreqAI 未初始化,请确保回测命令中启用了 --freqai")
            # Fall back to the rule-based value.
            dataframe["&-price_value_divergence"] = dataframe["price_value_divergence"]
        else:
            dataframe = self.freqai.start(dataframe, metadata, self)

        # Verify that a prediction column came back.
        if "&-price_value_divergence" not in dataframe.columns:
            logger.warning(f"[{pair}] 回归模型未生成 &-price_value_divergence,回退到规则计算")
            dataframe["&-price_value_divergence"] = dataframe["price_value_divergence"]

        # Per-pair thresholds travel with the dataframe, so they cannot be overwritten
        # by another pair processed between this call and signal generation.
        dataframe["buy_threshold"] = buy_threshold
        dataframe["sell_threshold"] = sell_threshold

        # Remaining indicators.
        upperband, middleband, lowerband = ta.BBANDS(
            dataframe["close"], timeperiod=20, nbdevup=2.0, nbdevdn=2.0
        )
        dataframe["bb_upperband"] = upperband
        dataframe["bb_middleband"] = middleband
        dataframe["bb_lowerband"] = lowerband
        dataframe["rsi"] = ta.RSI(dataframe, timeperiod=14)
        dataframe["volume_mean_20"] = dataframe["volume"].rolling(20).mean()
        dataframe["volume_std_20"] = dataframe["volume"].rolling(20).std()
        dataframe["volume_z_score"] = (
            dataframe["volume"] - dataframe["volume_mean_20"]
        ) / dataframe["volume_std_20"]

        # Data cleaning.
        self._sanitize(dataframe, [
            "ema200", "bb_upperband", "bb_middleband", "bb_lowerband", "rsi",
            "volume_z_score", "&-price_value_divergence", "price_value_divergence",
        ])

        logger.info(f"[{pair}] 指标计算完成,列:{list(dataframe.columns)}")
        return dataframe

    def populate_entry_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
        """
        Entry logic: undervalued price on elevated volume, with RSI and Bollinger filters.

        :param df: dataframe produced by ``populate_indicators``.
        :param metadata: strategy metadata; ``metadata["pair"]`` is used for logging.
        :return: the dataframe with ``enter_long`` set to 1 on matching rows.
        """
        conditions = []

        if "&-price_value_divergence" in df.columns:
            # Prefer the per-pair column; fall back to the attribute if it is absent.
            threshold = df["buy_threshold"] if "buy_threshold" in df.columns else self.buy_threshold

            # Buy condition: price undervalued and volume elevated.
            buy_condition = (df["&-price_value_divergence"] < threshold)
            buy_condition &= (df["volume_z_score"] > 1.5)
            # RSI oversold filter (relaxed).
            buy_condition &= (df["rsi"] < 40)
            # Price at or below the lower Bollinger band.
            buy_condition &= (df["close"] <= df["bb_lowerband"])
            conditions.append(buy_condition)
        else:
            logger.warning("⚠️ &-price_value_divergence 列缺失,跳过该条件")

        if conditions:
            mask = reduce(lambda x, y: x & y, conditions)
            # Always assign so the column exists even when no row matches.
            df.loc[mask, 'enter_long'] = 1
            if mask.any():
                logger.info(f"[{metadata['pair']}] 入场信号触发,条件满足")

        return df

    def populate_exit_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
        """
        Exit logic: overvalued price on elevated volume, or RSI above 60.

        :param df: dataframe produced by ``populate_indicators``.
        :param metadata: strategy metadata; ``metadata["pair"]`` is used for logging.
        :return: the dataframe with ``exit_long`` set to 1 on matching rows.
        """
        conditions = []

        if "&-price_value_divergence" in df.columns:
            # Prefer the per-pair column; fall back to the attribute if it is absent.
            threshold = df["sell_threshold"] if "sell_threshold" in df.columns else self.sell_threshold

            # Sell condition: price overvalued and volume elevated.
            sell_condition = (df["&-price_value_divergence"] > threshold)
            sell_condition &= (df["volume_z_score"] > 1.5)
            # NOTE(review): this is an OR, so RSI > 60 alone triggers an exit. If RSI
            # was meant as an additional filter this should be "&=" - confirm intent.
            sell_condition |= (df["rsi"] > 60)
            conditions.append(sell_condition)
        else:
            logger.warning("⚠️ &-price_value_divergence 列缺失,跳过该条件")

        if conditions:
            mask = reduce(lambda x, y: x & y, conditions)
            # Always assign so the column exists even when no row matches.
            df.loc[mask, 'exit_long'] = 1
            if mask.any():
                logger.info(f"[{metadata['pair']}] 出场信号触发,条件满足")

        return df

    def adjust_trade_position(self, trade: Trade, current_time: datetime.datetime,
                              current_rate: float, current_profit: float,
                              min_roi: Optional[Dict[float, float]] = None,
                              max_profit: Optional[float] = None,
                              **kwargs) -> Optional[float]:
        """
        Position adjustment: close the whole position once it is older than 30 minutes.

        ``min_roi`` and ``max_profit`` are unused and kept only for backward
        compatibility; ``**kwargs`` absorbs any additional keyword arguments the
        caller supplies.

        NOTE(review): freqtrade is expected to invoke this callback only when
        ``position_adjustment_enable`` is True, which this class does not set - confirm.

        :return: ``-trade.stake_amount`` (exit the full stake) after 30 minutes,
                 otherwise ``None`` (no adjustment).
        """
        hold_time = (current_time - trade.open_date_utc).total_seconds() / 60
        if hold_time > 30:
            logger.info(f"[{trade.pair}] 持仓时间超过30分钟,强制平仓")
            # A negative stake reduces the position; the full stake closes it.
            return -trade.stake_amount
        return None

    def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float,
                            time_in_force: str, current_time: datetime.datetime,
                            **kwargs) -> bool:
        """
        Entry confirmation: reject an entry if the pair closed a trade in the last 5 minutes.

        :return: ``False`` to veto the entry, ``True`` to allow it.
        """
        # get_trades_proxy accepts these keyword filters and returns a plain list.
        recent_trades = Trade.get_trades_proxy(
            pair=pair,
            is_open=False,
            close_date=current_time - datetime.timedelta(minutes=5),
        )
        if recent_trades:
            logger.info(f"[{pair}] 5分钟内有近期交易,跳过本次入场")
            return False
        return True

    def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float,
                           rate: float, time_in_force: str, exit_reason: str,
                           current_time: datetime.datetime, **kwargs) -> bool:
        """
        Exit confirmation: log the exit reason and profit ratio, and always allow the exit.

        :return: always ``True``.
        """
        logger.info(f"[{pair}] 退出交易,原因:{exit_reason}, 利润:{trade.calc_profit_ratio(rate):.2%}")
        return True