import logging
from functools import reduce

import numpy as np
import talib.abstract as ta
from pandas import DataFrame
from technical import qtpylib

from freqtrade.strategy import IStrategy, IntParameter, DecimalParameter

logger = logging.getLogger(__name__)


class FreqaiExampleStrategy(IStrategy):
    """Long-only FreqAI strategy combining RSI, Bollinger Bands, volume and MACD.

    FreqAI is used to predict the ``&-buy_rsi`` label; the entry/exit
    thresholds (``buy_rsi_pred`` / ``sell_rsi_pred``) are derived from a
    rolling RSI mean inside :meth:`populate_indicators`.
    """

    # ROI / stoploss are not hard-coded here; populate_indicators assigns them.
    minimal_roi = {}
    stoploss = 0.0
    trailing_stop = True
    process_only_new_candles = True
    use_exit_signal = True
    startup_candle_count: int = 40
    can_short = False

    # buy_rsi / sell_rsi are overwritten at runtime in populate_indicators,
    # so Hyperopt optimisation is disabled for them.
    buy_rsi = IntParameter(low=10, high=50, default=27, space="buy", optimize=False, load=True)
    sell_rsi = IntParameter(low=50, high=90, default=59, space="sell", optimize=False, load=True)

    # ROI and stoploss parameters exposed to Hyperopt.
    roi_0 = DecimalParameter(low=0.01, high=0.2, default=0.038, space="roi", optimize=True, load=True)
    roi_15 = DecimalParameter(low=0.005, high=0.1, default=0.027, space="roi", optimize=True, load=True)
    roi_30 = DecimalParameter(low=0.001, high=0.05, default=0.009, space="roi", optimize=True, load=True)
    # NOTE(review): stoploss_param is declared but never read in this class;
    # populate_indicators sets a fixed stoploss instead.
    stoploss_param = DecimalParameter(
        low=-0.35, high=-0.1, default=-0.182, space="stoploss", optimize=True, load=True
    )

    # FreqAI configuration (intended to mirror the bot config).
    # NOTE(review): the model is a classifier while "objective"/"eval_metric"
    # look like regression settings - confirm against the real config.
    freqai_info = {
        "model": "CatboostClassifier",
        "feature_parameters": {
            "include_timeframes": ["3m", "15m", "1h"],
            "include_corr_pairlist": ["BTC/USDT", "SOL/USDT"],  # correlated pairs
            "label_period_candles": 20,
            "include_shifted_candles": 2,
        },
        "data_split_parameters": {
            "test_size": 0.2,
            "shuffle": True,
        },
        "model_training_parameters": {
            "n_estimators": 100,  # fewer trees
            "learning_rate": 0.1,  # higher learning rate
            "max_depth": 6,  # limit tree depth
            "subsample": 0.8,  # row subsampling
            "colsample_bytree": 0.8,  # feature subsampling
            "objective": "reg:squarederror",
            "eval_metric": "rmse",
            "early_stopping_rounds": 20,
            "verbose": 0,
        },
        "data_kitchen": {
            "feature_parameters": {
                "DI_threshold": 1.5,  # lowered outlier-filter threshold
                "use_DBSCAN_to_remove_outliers": False,  # DBSCAN disabled
            }
        },
    }

    plot_config = {
        "main_plot": {},
        "subplots": {
            "&-buy_rsi": {"&-buy_rsi": {"color": "green"}},
            "&-sell_rsi": {"&-sell_rsi": {"color": "red"}},
            "&-stoploss": {"&-stoploss": {"color": "purple"}},
            "&-roi_0": {"&-roi_0": {"color": "orange"}},
            "do_predict": {"do_predict": {"color": "brown"}},
        },
    }

    @staticmethod
    def _clean_numeric_columns(dataframe: DataFrame, inf_replacement: float = np.nan) -> DataFrame:
        """Replace +/-inf, forward-fill and zero-fill every float64/int64 column.

        :param dataframe: frame to clean; modified column by column.
        :param inf_replacement: value substituted for +/-inf before filling.
        :return: the same dataframe object.
        """
        for col in dataframe.select_dtypes(include=["float64", "int64"]).columns:
            series = dataframe[col].replace([np.inf, -np.inf], inf_replacement)
            dataframe[col] = series.ffill().fillna(0)
        return dataframe

    def feature_engineering_expand_all(
        self, dataframe: DataFrame, period: int, metadata: dict, **kwargs
    ) -> DataFrame:
        """Create the period-dependent features (RSI, Bollinger Bands, volume MA).

        Columns are prefixed with ``%-`` and computed from ``period`` so that
        each configured indicator period yields a distinct feature.

        NOTE: this hook was previously misspelled and its columns had no ``%``
        prefix, so none of them were used as features. Fixing it changes the
        feature set; previously trained models need to be retrained.

        :param dataframe: OHLCV dataframe for one pair/timeframe.
        :param period: indicator period supplied by the caller.
        :param metadata: pair metadata (unused here).
        :return: dataframe with the added feature columns.
        """
        dataframe["%-rsi-period"] = ta.RSI(dataframe, timeperiod=period)

        bollinger = qtpylib.bollinger_bands(
            qtpylib.typical_price(dataframe), window=period, stds=2
        )
        dataframe["%-bb_lowerband-period"] = bollinger["lower"]
        dataframe["%-bb_middleband-period"] = bollinger["mid"]
        dataframe["%-bb_upperband-period"] = bollinger["upper"]

        dataframe["%-volume_ma-period"] = dataframe["volume"].rolling(window=period).mean()

        # inf -> NaN -> forward fill -> 0, as in the original cleaning step.
        self._clean_numeric_columns(dataframe, inf_replacement=np.nan)

        logger.info(f"特征工程完成,特征数量:{len(dataframe.columns)}")
        return dataframe

    # Backward-compatible alias for the original (misspelled) method name.
    featcaure_engineering_expand_all = feature_engineering_expand_all

    def feature_engineering_expand_basic(
        self, dataframe: DataFrame, metadata: dict, **kwargs
    ) -> DataFrame:
        """Create the features that do not depend on an indicator period.

        :param dataframe: OHLCV dataframe for one pair/timeframe.
        :param metadata: pair metadata (unused here).
        :return: dataframe with the added feature columns.
        """
        dataframe["%-pct-change"] = dataframe["close"].pct_change()
        dataframe["%-raw_volume"] = dataframe["volume"]
        dataframe["%-raw_price"] = dataframe["close"]

        # MACD uses fixed 12/26/9 parameters, so it lives here rather than in
        # the per-period hook (where it would only produce identical copies).
        macd = ta.MACD(dataframe, fastperiod=12, slowperiod=26, signalperiod=9)
        dataframe["%-macd"] = macd["macd"]
        dataframe["%-macdsignal"] = macd["macdsignal"]

        # inf -> 0, then forward fill, then 0 for anything left.
        self._clean_numeric_columns(dataframe, inf_replacement=0)
        return dataframe

    def feature_engineering_standard(
        self, dataframe: DataFrame, metadata: dict, **kwargs
    ) -> DataFrame:
        """Add calendar features derived from the ``date`` column.

        :param dataframe: dataframe containing a datetime ``date`` column.
        :param metadata: pair metadata (unused here).
        :return: dataframe with day-of-week / hour-of-day features, cleaned.
        """
        dataframe["%-day_of_week"] = dataframe["date"].dt.dayofweek
        dataframe["%-hour_of_day"] = dataframe["date"].dt.hour

        dataframe.replace([np.inf, -np.inf], 0, inplace=True)
        dataframe.ffill(inplace=True)
        dataframe.fillna(0, inplace=True)
        return dataframe

    def set_freqai_targets(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame:
        """Create the ``&-buy_rsi`` target plus helper columns.

        ``up_or_down`` is the relative close change ``label_period_candles``
        ahead; it carries no ``&`` prefix. ``%-volatility`` is the 20-candle
        rolling std of close returns.

        :param dataframe: dataframe containing at least a ``close`` column.
        :param metadata: pair metadata; ``metadata['pair']`` is logged.
        :return: dataframe with target columns added.
        :raises ValueError: if the ``close`` column is missing.
        """
        logger.info(f"设置 FreqAI 目标,交易对:{metadata['pair']}")

        if "close" not in dataframe.columns:
            logger.error("数据框缺少必要的 'close' 列")
            raise ValueError("数据框缺少必要的 'close' 列")

        try:
            label_period = self.freqai_info["feature_parameters"]["label_period_candles"]

            # Future relative price change (continuous value).
            future_close = dataframe["close"].shift(-label_period)
            up_or_down = (future_close - dataframe["close"]) / dataframe["close"]
            dataframe["up_or_down"] = (
                up_or_down.replace([np.inf, -np.inf], np.nan).ffill().fillna(0)
            )

            dataframe["%-volatility"] = dataframe["close"].pct_change().rolling(20).std()
            dataframe["&-buy_rsi"] = ta.RSI(dataframe, timeperiod=14)

            for col in ["&-buy_rsi", "up_or_down", "%-volatility"]:
                series = dataframe[col].replace([np.inf, -np.inf], np.nan).ffill()
                # Leading NaNs (indicator warm-up) are filled with the column mean.
                series = series.fillna(series.mean())
                if series.isna().any():
                    # Only reachable when the whole column is NaN (mean is NaN);
                    # actually apply the default the message announces.
                    logger.warning(f"目标列 {col} 仍包含 NaN,填充为默认值")
                    series = series.fillna(0)
                dataframe[col] = series
        except Exception as e:
            logger.error(f"创建 FreqAI 目标失败:{str(e)}")
            raise

        # Debug output for the generated targets.
        logger.info(f"目标列形状:{dataframe['up_or_down'].shape}")
        logger.info(
            f"目标列预览:\n{dataframe[['up_or_down', '&-buy_rsi']].head().to_string()}"
        )
        return dataframe

    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Run FreqAI, compute the classic indicators and derive dynamic thresholds.

        Side effects: overwrites ``self.buy_rsi.value``, ``self.sell_rsi.value``,
        ``self.stoploss``, ``self.minimal_roi``, ``self.trailing_stop_positive``
        and ``self.trailing_stop_positive_offset``.

        :param dataframe: OHLCV dataframe of the traded pair.
        :param metadata: pair metadata; ``metadata['pair']`` is logged.
        :return: dataframe with indicator and threshold columns.
        """
        logger.info(f"处理交易对:{metadata['pair']}")

        dataframe = self.freqai.start(dataframe, metadata, self)

        # Classic indicators used by the entry/exit rules.
        dataframe["rsi"] = ta.RSI(dataframe, timeperiod=14)
        bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
        dataframe["bb_lowerband"] = bollinger["lower"]
        dataframe["bb_middleband"] = bollinger["mid"]
        dataframe["bb_upperband"] = bollinger["upper"]
        dataframe["tema"] = ta.TEMA(dataframe, timeperiod=9)
        macd = ta.MACD(dataframe, fastperiod=12, slowperiod=26, signalperiod=9)
        dataframe["macd"] = macd["macd"]
        dataframe["macdsignal"] = macd["macdsignal"]

        label_period = self.freqai_info["feature_parameters"]["label_period_candles"]

        # Direction of the FUTURE price move. This looks ahead in time; it is
        # only logged below and must not be used in entry/exit conditions.
        dataframe["up_or_down"] = np.where(
            dataframe["close"].shift(-label_period) > dataframe["close"], 1, 0
        )

        # Derived columns. Only &-sell_rsi depends on the FreqAI output; the
        # rest is always computed so the entry/exit methods never hit a
        # missing column.
        if "&-buy_rsi" in dataframe.columns:
            dataframe["&-sell_rsi"] = dataframe["&-buy_rsi"] + 30

        volatility = dataframe["close"].pct_change().rolling(20).std()
        dataframe["%-volatility"] = volatility
        volatility_penalty = (volatility * 10).clip(0, 0.25)

        dataframe["&-stoploss"] = (-0.1 - volatility_penalty).fillna(-0.1)
        past_return = dataframe["close"] / dataframe["close"].shift(label_period) - 1
        dataframe["&-roi_0"] = past_return.clip(0, 0.2).fillna(0)

        # Dynamic thresholds; NaNs are filled with the column median/mean.
        buy_rsi_pred = dataframe["rsi"].rolling(window=10).mean().clip(30, 50)
        dataframe["buy_rsi_pred"] = buy_rsi_pred.fillna(buy_rsi_pred.median())

        sell_rsi_pred = dataframe["buy_rsi_pred"] + 20
        dataframe["sell_rsi_pred"] = sell_rsi_pred.fillna(sell_rsi_pred.median())

        stoploss_pred = -0.1 - volatility_penalty
        dataframe["stoploss_pred"] = stoploss_pred.fillna(stoploss_pred.mean())

        roi_0_pred = dataframe["&-roi_0"].clip(0.01, 0.2)
        dataframe["roi_0_pred"] = roi_0_pred.fillna(roi_0_pred.mean())

        # Final NaN sweep over the derived columns that exist.
        checked_columns = [
            "buy_rsi_pred", "sell_rsi_pred", "stoploss_pred", "roi_0_pred",
            "&-sell_rsi", "&-stoploss", "&-roi_0",
        ]
        for col in checked_columns:
            if col in dataframe.columns and dataframe[col].isna().any():
                logger.warning(f"列 {col} 包含 NaN,填充为默认值")
                dataframe[col] = dataframe[col].fillna(dataframe[col].mean())

        # Conservative trailing-stop columns derived from the ROI estimate.
        dataframe["trailing_stop_positive"] = (dataframe["roi_0_pred"] * 0.3).clip(0.01, 0.2)
        dataframe["trailing_stop_positive_offset"] = (dataframe["roi_0_pred"] * 0.5).clip(0.01, 0.3)

        # Strategy-level parameters taken from the most recent candle.
        self.buy_rsi.value = float(dataframe["buy_rsi_pred"].iloc[-1])
        self.sell_rsi.value = float(dataframe["sell_rsi_pred"].iloc[-1])

        self.stoploss = -0.15  # fixed 15% stoploss
        self.minimal_roi = {
            0: float(self.roi_0.value),
            15: float(self.roi_15.value),
            30: float(self.roi_30.value),
            60: 0,
        }

        self.trailing_stop_positive = 0.05  # trailing stop distance once active
        self.trailing_stop_positive_offset = 0.1  # profit offset that activates it

        dataframe.ffill(inplace=True)
        dataframe.fillna(0, inplace=True)

        logger.info(f"up_or_down 值统计:\n{dataframe['up_or_down'].value_counts().to_string()}")
        logger.info(f"do_predict 值统计:\n{dataframe['do_predict'].value_counts().to_string()}")

        return dataframe

    def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Flag ``exit_long`` where all exit conditions hold.

        :param dataframe: dataframe produced by :meth:`populate_indicators`.
        :param metadata: pair metadata (unused here).
        :return: dataframe with ``exit_long`` set to 1 on matching rows.
        """
        volume = dataframe["volume"]
        close = dataframe["close"]

        exit_long_conditions = [
            # RSI above the sell threshold.
            (dataframe["rsi"] > dataframe["sell_rsi_pred"]),
            (
                # Volume above its 10-candle mean ...
                (volume > volume.rolling(window=10).mean())
                # ... or below 90% of the previous candle's volume.
                | (volume < volume.shift(1) * 0.9)
            ),
            (
                # Close below the middle Bollinger band ...
                (close < dataframe["bb_middleband"])
                # ... or below the previous candle's lower band.
                | (close < dataframe["bb_lowerband"].shift(1))
            ),
        ]

        dataframe.loc[reduce(lambda x, y: x & y, exit_long_conditions), "exit_long"] = 1
        return dataframe

    def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Flag ``enter_long`` / ``enter_tag`` where all entry conditions hold.

        :param dataframe: dataframe produced by :meth:`populate_indicators`.
        :param metadata: pair metadata (unused here).
        :return: dataframe with ``enter_long`` = 1 and ``enter_tag`` = "long"
            on matching rows.
        :raises ValueError: if the MACD columns are missing and cannot be
            recomputed.
        """
        # Defensive fallback: normally populate_indicators has added MACD.
        if "macd" not in dataframe.columns or "macdsignal" not in dataframe.columns:
            logger.error("MACD 或 MACD 信号列缺失,无法生成买入信号。尝试重新计算 MACD 列。")
            try:
                macd = ta.MACD(dataframe, fastperiod=12, slowperiod=26, signalperiod=9)
                dataframe["macd"] = macd["macd"]
                dataframe["macdsignal"] = macd["macdsignal"]
                logger.info("MACD 列已成功重新计算。")
            except Exception as e:
                logger.error(f"重新计算 MACD 列时出错:{str(e)}")
                raise ValueError("dataframe 缺少必要的 MACD 列且无法重新计算。") from e

        volume = dataframe["volume"]
        close = dataframe["close"]

        enter_long_conditions = [
            # RSI below the buy threshold.
            (dataframe["rsi"] < dataframe["buy_rsi_pred"]),
            (
                # Volume more than 20% above its 10-candle mean ...
                (volume > volume.rolling(window=10).mean() * 1.2)
                # ... or more than 10% above the previous candle's volume.
                | (volume > volume.shift(1) * 1.1)
            ),
            (
                # Close above the middle Bollinger band ...
                (close > dataframe["bb_middleband"])
                # ... or above the previous candle's upper band.
                | (close > dataframe["bb_upperband"].shift(1))
            ),
            # MACD line above its signal line.
            (dataframe["macd"] > dataframe["macdsignal"]),
            # Only act on rows where do_predict equals 1.
            (dataframe["do_predict"] == 1),
        ]

        dataframe.loc[
            reduce(lambda x, y: x & y, enter_long_conditions), ["enter_long", "enter_tag"]
        ] = (1, "long")
        return dataframe

    def confirm_trade_entry(
        self, pair: str, order_type: str, amount: float, rate: float,
        time_in_force: str, current_time, entry_tag, side: str, **kwargs
    ) -> bool:
        """Reject long entries priced more than 0.25% above the last close.

        :param pair: pair about to be traded.
        :param rate: proposed entry rate.
        :param side: trade direction; only "long" is checked.
        :return: False when the long entry rate is too far above the last
            analyzed candle's close, True otherwise.
        """
        df, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
        last_candle = df.iloc[-1].squeeze()

        if side == "long" and rate > (last_candle["close"] * (1 + 0.0025)):
            return False
        return True