import logging import numpy as np from functools import reduce import talib.abstract as ta from pandas import DataFrame from technical import qtpylib from freqtrade.strategy import IStrategy, IntParameter, DecimalParameter logger = logging.getLogger(__name__) class FreqaiExampleStrategy(IStrategy): # 移除硬编码的 minimal_roi 和 stoploss,改为动态适配 minimal_roi = {} # 将在 populate_indicators 中动态生成 stoploss = 0.0 # 将在 populate_indicators 中动态设置 trailing_stop = True process_only_new_candles = True use_exit_signal = True startup_candle_count: int = 40 can_short = False # 参数定义:FreqAI 动态适配 buy_rsi 和 sell_rsi,禁用 Hyperopt 优化 buy_rsi = IntParameter(low=10, high=50, default=27, space="buy", optimize=False, load=True) sell_rsi = IntParameter(low=50, high=90, default=59, space="sell", optimize=False, load=True) # 为 Hyperopt 优化添加 ROI 和 stoploss 参数 roi_0 = DecimalParameter(low=0.01, high=0.2, default=0.038, space="roi", optimize=True, load=True) roi_15 = DecimalParameter(low=0.005, high=0.1, default=0.027, space="roi", optimize=True, load=True) roi_30 = DecimalParameter(low=0.001, high=0.05, default=0.009, space="roi", optimize=True, load=True) stoploss_param = DecimalParameter(low=-0.35, high=-0.1, default=-0.182, space="stoploss", optimize=True, load=True) # FreqAI 配置 freqai_info = { "model": "XGBoostRegressor", # 与config保持一致 "feature_parameters": { "include_timeframes": ["3m", "15m", "1h"], # 与config一致 "include_corr_pairlist": ["BTC/USDT", "SOL/USDT"], # 添加相关交易对 "label_period_candles": 20, # 与config一致 "include_shifted_candles": 2, # 与config一致 }, "data_split_parameters": { "test_size": 0.2, "shuffle": True, # 启用shuffle }, "model_training_parameters": { "n_estimators": 100, # 减少树的数量 "learning_rate": 0.1, # 提高学习率 "max_depth": 6, # 限制树深度 "subsample": 0.8, # 添加子采样 "colsample_bytree": 0.8, # 添加特征采样 "objective": "reg:squarederror", "eval_metric": "rmse", "early_stopping_rounds": 20, "verbose": 0, }, "data_kitchen": { "feature_parameters": { "DI_threshold": 1.5, # 降低异常值过滤阈值 "use_DBSCAN_to_remove_outliers": False # 禁用DBSCAN } } } plot_config = { "main_plot": {}, "subplots": { "&-buy_rsi": {"&-buy_rsi": {"color": "green"}}, "&-sell_rsi": {"&-sell_rsi": {"color": "red"}}, "&-stoploss": {"&-stoploss": {"color": "purple"}}, "&-roi_0": {"&-roi_0": {"color": "orange"}}, "do_predict": {"do_predict": {"color": "brown"}}, }, } def featcaure_engineering_expand_all(self, dataframe: DataFrame, period: int, metadata: dict, **kwargs) -> DataFrame: # 计算关键指标 dataframe["rsi"] = ta.RSI(dataframe, timeperiod=14) # 计算短期和长期SMA dataframe["sma_short"] = ta.SMA(dataframe, timeperiod=12) dataframe["sma_long"] = ta.SMA(dataframe, timeperiod=26) # 计算SMA交叉信号 dataframe["sma_cross"] = np.where( dataframe["sma_short"] > dataframe["sma_long"], 1, -1 ) # 计算布林带 bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2) dataframe["bb_lowerband"] = bollinger["lower"] dataframe["bb_middleband"] = bollinger["mid"] dataframe["bb_upperband"] = bollinger["upper"] # 计算价格与布林带的关系 dataframe["bb_pct"] = (dataframe["close"] - dataframe["bb_lowerband"]) / ( dataframe["bb_upperband"] - dataframe["bb_lowerband"] ) # 成交量相关特征 dataframe["volume_ma"] = dataframe["volume"].rolling(window=20).mean() dataframe["volume_pct"] = dataframe["volume"] / dataframe["volume_ma"] # 价格变化特征 dataframe["pct_change"] = dataframe["close"].pct_change() dataframe["pct_change_5"] = dataframe["close"].pct_change(5) dataframe["pct_change_10"] = dataframe["close"].pct_change(10) # 数据清理 for col in dataframe.columns: if dataframe[col].dtype in ["float64", "int64"]: dataframe[col] = dataframe[col].replace([np.inf, -np.inf], np.nan) dataframe[col] = dataframe[col].ffill().fillna(0) logger.info(f"特征工程完成,特征数量:{len(dataframe.columns)}") return dataframe def feature_engineering_expand_basic(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame: dataframe["%-pct-change"] = dataframe["close"].pct_change() dataframe["%-raw_volume"] = dataframe["volume"] dataframe["%-raw_price"] = dataframe["close"] # 数据清理逻辑 for col in dataframe.columns: if dataframe[col].dtype in ["float64", "int64"]: dataframe[col] = dataframe[col].replace([np.inf, -np.inf], 0) dataframe[col] = dataframe[col].ffill() dataframe[col] = dataframe[col].fillna(0) # 检查是否仍有无效值 if dataframe[col].isna().any() or np.isinf(dataframe[col]).any(): logger.warning(f"列 {col} 仍包含无效值,已填充为默认值") dataframe[col] = dataframe[col].fillna(0) return dataframe def feature_engineering_standard(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame: dataframe["%-day_of_week"] = dataframe["date"].dt.dayofweek dataframe["%-hour_of_day"] = dataframe["date"].dt.hour dataframe.replace([np.inf, -np.inf], 0, inplace=True) dataframe.ffill(inplace=True) dataframe.fillna(0, inplace=True) return dataframe def set_freqai_targets(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame: logger.info(f"设置 FreqAI 目标,交易对:{metadata['pair']}") if "close" not in dataframe.columns: logger.error("数据框缺少必要的 'close' 列") raise ValueError("数据框缺少必要的 'close' 列") try: label_period = self.freqai_info["feature_parameters"]["label_period_candles"] # 定义目标变量为未来价格变化百分比(连续值) dataframe["target"] = ( dataframe["close"].shift(-label_period) - dataframe["close"] ) / dataframe["close"] # 添加辅助目标变量 dataframe["target_5"] = ( dataframe["close"].shift(-5) - dataframe["close"] ) / dataframe["close"] dataframe["target_10"] = ( dataframe["close"].shift(-10) - dataframe["close"] ) / dataframe["close"] # 数据清理:处理 NaN 和 Inf 值 for col in ["target", "target_5", "target_10"]: dataframe[col] = dataframe[col].replace([np.inf, -np.inf], np.nan) dataframe[col] = dataframe[col].ffill().fillna(0) # 确保目标变量是二维数组 if dataframe["target"].ndim == 1: dataframe["target"] = dataframe["target"].values.reshape(-1, 1) # 生成波动率特征 dataframe["%-volatility"] = dataframe["close"].pct_change().rolling(20).std() # 数据清理 for col in ["target", "target_5", "target_10", "%-volatility"]: dataframe[col] = dataframe[col].replace([np.inf, -np.inf], np.nan) dataframe[col] = dataframe[col].ffill() dataframe[col] = dataframe[col].fillna(dataframe[col].mean()) if dataframe[col].isna().any(): logger.warning(f"目标列 {col} 仍包含 NaN,填充为默认值") except Exception as e: logger.error(f"创建 FreqAI 目标失败:{str(e)}") raise # Log the shape of the target variable for debugging logger.info(f"目标列形状:{dataframe['target'].shape}") logger.info(f"目标列预览:\n{dataframe[['target', 'target_5', 'target_10']].head().to_string()}") return dataframe def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: logger.info(f"处理交易对:{metadata['pair']}") dataframe = self.freqai.start(dataframe, metadata, self) # 计算传统指标 dataframe["rsi"] = ta.RSI(dataframe, timeperiod=14) bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2) dataframe["bb_lowerband"] = bollinger["lower"] dataframe["bb_middleband"] = bollinger["mid"] dataframe["bb_upperband"] = bollinger["upper"] dataframe["tema"] = ta.TEMA(dataframe, timeperiod=9) # 生成 up_or_down 信号(非 FreqAI 目标) label_period = self.freqai_info["feature_parameters"]["label_period_candles"] # 使用未来价格变化方向生成 up_or_down 信号 label_period = self.freqai_info["feature_parameters"]["label_period_candles"] dataframe["up_or_down"] = np.where( dataframe["close"].shift(-label_period) > dataframe["close"], 1, 0 ) # 动态设置参数 if "&-buy_rsi" in dataframe.columns: # 派生其他目标 dataframe["&-sell_rsi"] = dataframe["&-buy_rsi"] + 30 dataframe["%-volatility"] = dataframe["close"].pct_change().rolling(20).std() # Ensure proper calculation and handle potential NaN values dataframe["&-stoploss"] = (-0.1 - (dataframe["%-volatility"] * 10).clip(0, 0.25)).fillna(-0.1) dataframe["&-roi_0"] = ((dataframe["close"] / dataframe["close"].shift(label_period) - 1).clip(0, 0.2)).fillna(0) # Additional check to ensure no NaN values remain for col in ["&-stoploss", "&-roi_0"]: if dataframe[col].isna().any(): logger.warning(f"列 {col} 仍包含 NaN,填充为默认值") dataframe[col] = dataframe[col].fillna(-0.1 if col == "&-stoploss" else 0) # 简化动态参数生成逻辑 # 放松 buy_rsi 和 sell_rsi 的生成逻辑 # 计算 buy_rsi_pred 并清理 NaN 值 dataframe["buy_rsi_pred"] = dataframe["rsi"].rolling(window=10).mean().clip(30, 50) dataframe["buy_rsi_pred"] = dataframe["buy_rsi_pred"].fillna(dataframe["buy_rsi_pred"].median()) # 计算 sell_rsi_pred 并清理 NaN 值 dataframe["sell_rsi_pred"] = dataframe["buy_rsi_pred"] + 20 dataframe["sell_rsi_pred"] = dataframe["sell_rsi_pred"].fillna(dataframe["sell_rsi_pred"].median()) # 计算 stoploss_pred 并清理 NaN 值 dataframe["stoploss_pred"] = -0.1 - (dataframe["%-volatility"] * 10).clip(0, 0.25) dataframe["stoploss_pred"] = dataframe["stoploss_pred"].fillna(dataframe["stoploss_pred"].mean()) # 计算 roi_0_pred 并清理 NaN 值 dataframe["roi_0_pred"] = dataframe["&-roi_0"].clip(0.01, 0.2) dataframe["roi_0_pred"] = dataframe["roi_0_pred"].fillna(dataframe["roi_0_pred"].mean()) # 检查预测值 for col in ["buy_rsi_pred", "sell_rsi_pred", "stoploss_pred", "roi_0_pred", "&-sell_rsi", "&-stoploss", "&-roi_0"]: if dataframe[col].isna().any(): logger.warning(f"列 {col} 包含 NaN,填充为默认值") dataframe[col] = dataframe[col].fillna(dataframe[col].mean()) # 更保守的止损和止盈设置 dataframe["trailing_stop_positive"] = (dataframe["roi_0_pred"] * 0.3).clip(0.01, 0.2) dataframe["trailing_stop_positive_offset"] = (dataframe["roi_0_pred"] * 0.5).clip(0.01, 0.3) # 设置策略级参数 self.buy_rsi.value = float(dataframe["buy_rsi_pred"].iloc[-1]) self.sell_rsi.value = float(dataframe["sell_rsi_pred"].iloc[-1]) # 更保守的止损设置 self.stoploss = -0.15 # 固定止损 15% self.minimal_roi = { 0: float(self.roi_0.value), 15: float(self.roi_15.value), 30: float(self.roi_30.value), 60: 0 } # 更保守的追踪止损设置 self.trailing_stop_positive = 0.05 # 追踪止损触发点 self.trailing_stop_positive_offset = 0.1 # 追踪止损偏移量 logger.info(f"动态参数:buy_rsi={self.buy_rsi.value}, sell_rsi={self.sell_rsi.value}, " f"stoploss={self.stoploss}, trailing_stop_positive={self.trailing_stop_positive}") dataframe.replace([np.inf, -np.inf], 0, inplace=True) dataframe.ffill(inplace=True) dataframe.fillna(0, inplace=True) logger.info(f"up_or_down 值统计:\n{dataframe['up_or_down'].value_counts().to_string()}") logger.info(f"do_predict 值统计:\n{dataframe['do_predict'].value_counts().to_string()}") return dataframe def populate_exit_trend(self, df: DataFrame, metadata: dict) -> DataFrame: # 改进卖出信号条件 exit_long_conditions = [ (df["rsi"] > df["sell_rsi_pred"]), # RSI 高于卖出阈值 (df["volume"] > df["volume"].rolling(window=10).mean()), # 成交量高于近期均值 (df["close"] < df["bb_middleband"]), # 价格低于布林带中轨 (df["sma_short"] < df["sma_long"]) # SMA 死叉(短期 SMA 下穿长期 SMA) ] if exit_long_conditions: df.loc[ reduce(lambda x, y: x & y, exit_long_conditions), "exit_long" ] = 1 return df def populate_entry_trend(self, df: DataFrame, metadata: dict) -> DataFrame: # 改进买入信号条件 # 计算短期和长期 SMA df["sma_short"] = ta.SMA(df, timeperiod=12) df["sma_long"] = ta.SMA(df, timeperiod=26) enter_long_conditions = [ (df["rsi"] < df["buy_rsi_pred"]), # RSI 低于买入阈值 (df["volume"] > df["volume"].rolling(window=10).mean() * 1.2), # 成交量高于近期均值20% (df["close"] > df["bb_middleband"]) # 价格高于布林带中轨 ] # 添加 SMA 金叉条件(短期 SMA 上穿长期 SMA) enter_long_conditions.append((df["sma_short"] > df["sma_long"])) # 确保模型预测为买入 enter_long_conditions.append((df["do_predict"] == 1)) if enter_long_conditions: df.loc[ reduce(lambda x, y: x & y, enter_long_conditions), ["enter_long", "enter_tag"] ] = (1, "long") return df def confirm_trade_entry( self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str, current_time, entry_tag, side: str, **kwargs ) -> bool: df, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = df.iloc[-1].squeeze() if side == "long": if rate > (last_candle["close"] * (1 + 0.0025)): return False return True