import logging
from functools import reduce

import numpy as np
import pandas as pd
import talib.abstract as ta
from freqtrade.strategy import DecimalParameter, IntParameter, IStrategy
from pandas import DataFrame
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from technical import qtpylib

logger = logging.getLogger(__name__)


class MLBasedSentimentStrategy(IStrategy):
    """FreqAI strategy combining RSI, MACD, Bollinger Bands, volume and sentiment.

    Entry and exit signals compare the current RSI against rolling, per-candle
    thresholds (``buy_rsi_pred`` / ``sell_rsi_pred``) and additionally require
    volume, Bollinger mid-band and sentiment confirmation.

    NOTE(review): ``sentiment_score`` is a constant placeholder (0.5) and both
    signal methods compare it to ``sentiment_weight`` with strict inequalities.
    With the default ``sentiment_weight`` of 0.5 neither the entry nor the exit
    sentiment condition can ever be true until a real sentiment feed replaces
    the placeholder.
    """

    # RSI thresholds. They are overwritten at runtime in ``populate_indicators``,
    # so Hyperopt optimisation is disabled for them.
    buy_rsi = IntParameter(low=10, high=50, default=27, space="buy", optimize=False, load=True)
    sell_rsi = IntParameter(low=50, high=90, default=59, space="sell", optimize=False, load=True)

    # Threshold the sentiment score is compared against.
    sentiment_weight = DecimalParameter(
        low=0.1, high=0.9, default=0.5, space="buy", optimize=True, load=True
    )

    # ROI table entries (minutes 0 / 15 / 30).
    roi_0 = DecimalParameter(low=0.01, high=0.2, default=0.05, space="buy", optimize=True, load=True)
    roi_15 = DecimalParameter(low=0.01, high=0.15, default=0.03, space="buy", optimize=True, load=True)
    roi_30 = DecimalParameter(low=0.01, high=0.1, default=0.02, space="buy", optimize=True, load=True)

    def feature_engineering_expand_all(
        self, dataframe: DataFrame, period: int, metadata: dict, **kwargs
    ) -> DataFrame:
        """Add the indicator columns used as features and by the signal logic.

        Adds ``rsi``, ``macd``, ``macdsignal``, the three Bollinger band
        columns, ``volume_ma`` and ``sentiment_score``, then sanitises every
        ``float64`` / ``int64`` column (inf -> NaN, forward fill, remaining
        NaN -> 0).

        Args:
            dataframe: Candle data; must contain the OHLCV columns the
                indicators read (``volume`` is accessed directly).
            period: Unused; all indicator periods are fixed.
            metadata: Unused.
            **kwargs: Ignored.

        Returns:
            The same dataframe with the columns above added.
        """
        dataframe["rsi"] = ta.RSI(dataframe, timeperiod=14)

        # Best effort: a failing MACD computation must not abort feature
        # generation, so the columns fall back to NaN (zero-filled below).
        try:
            macd = ta.MACD(dataframe, fastperiod=12, slowperiod=26, signalperiod=9)
            dataframe["macd"] = macd["macd"]
            dataframe["macdsignal"] = macd["macdsignal"]
        except Exception as e:
            logger.error("计算 MACD 列时出错:%s", e)
            dataframe["macd"] = np.nan
            dataframe["macdsignal"] = np.nan

        # Bollinger Bands on the typical price.
        bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
        dataframe["bb_lowerband"] = bollinger["lower"]
        dataframe["bb_middleband"] = bollinger["mid"]
        dataframe["bb_upperband"] = bollinger["upper"]

        # Volume feature.
        dataframe["volume_ma"] = dataframe["volume"].rolling(window=20).mean()

        # Market sentiment feature. Placeholder value: a real deployment has
        # to replace this with scores from an external data source.
        dataframe["sentiment_score"] = 0.5

        # Data clean-up for all plain numeric columns.
        for col in dataframe.columns:
            if dataframe[col].dtype in ["float64", "int64"]:
                cleaned = dataframe[col].replace([np.inf, -np.inf], np.nan)
                dataframe[col] = cleaned.ffill().fillna(0)

        logger.info("特征工程完成,特征数量:%s", len(dataframe.columns))
        return dataframe

    def set_freqai_targets(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame:
        """Create the target-related columns.

        Adds ``up_or_down`` (relative close change ``label_period_candles``
        ahead), ``%-volatility`` (20-candle rolling std of close returns) and
        ``&-buy_rsi`` (14-period RSI), each sanitised of inf/NaN values.

        Args:
            dataframe: Candle data; must contain a ``close`` column.
            metadata: Must contain ``pair`` (used for logging).
            **kwargs: Ignored.

        Returns:
            The same dataframe with the columns above added.

        Raises:
            ValueError: If ``close`` is missing.
            Exception: Any error raised while building the columns is logged
                and re-raised unchanged.
        """
        logger.info("设置 FreqAI 目标,交易对:%s", metadata["pair"])

        if "close" not in dataframe.columns:
            logger.error("数据框缺少必要的 'close' 列")
            raise ValueError("数据框缺少必要的 'close' 列")

        try:
            label_period = self.freqai_info["feature_parameters"]["label_period_candles"]
            close = dataframe["close"]

            # Future relative price change (continuous value).
            # NOTE(review): the last ``label_period`` rows have no future close;
            # forward-filling them repeats the last known value instead of
            # leaving them undefined - confirm this is intended.
            future_return = (close.shift(-label_period) - close) / close
            dataframe["up_or_down"] = (
                future_return.replace([np.inf, -np.inf], np.nan).ffill().fillna(0)
            )

            dataframe["%-volatility"] = close.pct_change().rolling(20).std()
            dataframe["&-buy_rsi"] = ta.RSI(dataframe, timeperiod=14)

            # Clean-up: inf -> NaN, forward fill, leading NaN -> column mean.
            # Plain assignment is used to avoid chained-assignment issues.
            for col in ["&-buy_rsi", "up_or_down", "%-volatility"]:
                cleaned = dataframe[col].replace([np.inf, -np.inf], np.nan).ffill()
                dataframe[col] = cleaned.fillna(cleaned.mean())
                if dataframe[col].isna().any():
                    # Only reachable when the whole column is NaN (mean is NaN).
                    # NOTE(review): the message announces a default fill, but
                    # no further fill is applied here.
                    logger.warning("目标列 %s 仍包含 NaN,填充为默认值", col)
        except Exception as e:
            logger.error("创建 FreqAI 目标失败:%s", e)
            raise

        # Log shape and a preview of the targets for debugging.
        logger.info("目标列形状:%s", dataframe["up_or_down"].shape)
        logger.info(
            "目标列预览:\n%s",
            dataframe[["up_or_down", "&-buy_rsi"]].head().to_string(),
        )
        return dataframe

    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Run FreqAI, compute indicators and derive the dynamic thresholds.

        Besides the classic indicators this adds the per-candle columns
        ``buy_rsi_pred``, ``sell_rsi_pred``, ``stoploss_pred``, ``roi_0_pred``,
        ``trailing_stop_positive`` and ``trailing_stop_positive_offset``, and
        overwrites several strategy-level attributes.

        Args:
            dataframe: Candle data for one pair.
            metadata: Must contain ``pair`` (used for logging).

        Returns:
            The dataframe returned by ``self.freqai.start`` with the additional
            columns; inf values are replaced by 0 and NaN values are
            forward-filled, then zero-filled.
        """
        logger.info("处理交易对:%s", metadata["pair"])
        dataframe = self.freqai.start(dataframe, metadata, self)

        # Classic indicators.
        dataframe["rsi"] = ta.RSI(dataframe, timeperiod=14)
        bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
        dataframe["bb_lowerband"] = bollinger["lower"]
        dataframe["bb_middleband"] = bollinger["mid"]
        dataframe["bb_upperband"] = bollinger["upper"]
        dataframe["tema"] = ta.TEMA(dataframe, timeperiod=9)

        label_period = self.freqai_info["feature_parameters"]["label_period_candles"]
        close = dataframe["close"]

        # Direction of the future price move (not a FreqAI target).
        # NOTE(review): this column looks ``label_period`` candles into the
        # future. It is only logged below and must never be used in entry or
        # exit conditions.
        dataframe["up_or_down"] = np.where(close.shift(-label_period) > close, 1, 0)

        # Derived columns. Everything that does not depend on "&-buy_rsi" is
        # computed unconditionally because the *_pred columns below read it.
        volatility = close.pct_change().rolling(20).std()
        dataframe["%-volatility"] = volatility
        dataframe["&-stoploss"] = (-0.1 - (volatility * 10).clip(0, 0.25)).fillna(-0.1)
        dataframe["&-roi_0"] = (close / close.shift(label_period) - 1).clip(0, 0.2).fillna(0)
        if "&-buy_rsi" in dataframe.columns:
            dataframe["&-sell_rsi"] = dataframe["&-buy_rsi"] + 30

        # Relaxed RSI thresholds: 10-candle mean RSI limited to [30, 50].
        buy_rsi_pred = dataframe["rsi"].rolling(window=10).mean().clip(30, 50)
        dataframe["buy_rsi_pred"] = buy_rsi_pred.fillna(buy_rsi_pred.median())

        sell_rsi_pred = dataframe["buy_rsi_pred"] + 20
        dataframe["sell_rsi_pred"] = sell_rsi_pred.fillna(sell_rsi_pred.median())

        stoploss_pred = -0.1 - (volatility * 10).clip(0, 0.25)
        dataframe["stoploss_pred"] = stoploss_pred.fillna(stoploss_pred.mean())

        roi_0_pred = dataframe["&-roi_0"].clip(0.01, 0.2)
        dataframe["roi_0_pred"] = roi_0_pred.fillna(roi_0_pred.mean())

        # Final NaN check on the derived columns that are present.
        derived_columns = [
            "buy_rsi_pred",
            "sell_rsi_pred",
            "stoploss_pred",
            "roi_0_pred",
            "&-sell_rsi",
            "&-stoploss",
            "&-roi_0",
        ]
        for col in derived_columns:
            if col in dataframe.columns and dataframe[col].isna().any():
                logger.warning("列 %s 包含 NaN,填充为默认值", col)
                dataframe[col] = dataframe[col].fillna(dataframe[col].mean())

        # Conservative per-candle trailing-stop columns.
        dataframe["trailing_stop_positive"] = (dataframe["roi_0_pred"] * 0.3).clip(0.01, 0.2)
        dataframe["trailing_stop_positive_offset"] = (dataframe["roi_0_pred"] * 0.5).clip(0.01, 0.3)

        # Strategy-level parameters.
        # NOTE(review): these attributes live on the strategy instance, so the
        # values written here come from whichever pair was processed last -
        # confirm that this is acceptable and that the framework re-reads them.
        self.buy_rsi.value = float(dataframe["buy_rsi_pred"].iloc[-1])
        self.sell_rsi.value = float(dataframe["sell_rsi_pred"].iloc[-1])

        # Fixed 15% stop loss.
        self.stoploss = -0.15
        self.minimal_roi = {
            0: float(self.roi_0.value),
            15: float(self.roi_15.value),
            30: float(self.roi_30.value),
            60: 0,
        }

        # Trailing stop: trigger level and offset.
        self.trailing_stop_positive = 0.05
        self.trailing_stop_positive_offset = 0.1

        logger.info(
            "动态参数:buy_rsi=%s, sell_rsi=%s, stoploss=%s, trailing_stop_positive=%s",
            self.buy_rsi.value,
            self.sell_rsi.value,
            self.stoploss,
            self.trailing_stop_positive,
        )

        dataframe.replace([np.inf, -np.inf], 0, inplace=True)
        dataframe.ffill(inplace=True)
        dataframe.fillna(0, inplace=True)

        logger.info("up_or_down 值统计:\n%s", dataframe["up_or_down"].value_counts().to_string())
        logger.info("do_predict 值统计:\n%s", dataframe["do_predict"].value_counts().to_string())
        return dataframe

    def populate_entry_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
        """Mark long entries where every entry condition holds.

        Conditions: RSI below ``buy_rsi_pred``, volume more than 20% above its
        10-candle mean, close above the Bollinger mid band, sentiment score
        above ``sentiment_weight``, MACD above its signal line and
        ``do_predict == 1``.

        Args:
            df: Dataframe produced by ``populate_indicators``.
            metadata: Unused.

        Returns:
            ``df`` with ``enter_long`` = 1 and ``enter_tag`` = "long" set on
            matching rows.

        Raises:
            ValueError: If the MACD columns are missing and cannot be
                recomputed.
        """
        # The MACD columns may be absent from this dataframe; recompute them
        # rather than failing outright.
        if "macd" not in df.columns or "macdsignal" not in df.columns:
            logger.error("MACD 或 MACD 信号列缺失,无法生成买入信号。尝试重新计算 MACD 列。")
            try:
                macd = ta.MACD(df, fastperiod=12, slowperiod=26, signalperiod=9)
                df["macd"] = macd["macd"]
                df["macdsignal"] = macd["macdsignal"]
                logger.info("MACD 列已成功重新计算。")
            except Exception as e:
                logger.error("重新计算 MACD 列时出错:%s", e)
                raise ValueError("DataFrame 缺少必要的 MACD 列且无法重新计算。") from e

        # NOTE(review): "sentiment_score" is read without a fallback; confirm
        # it is present in the dataframe that reaches this method.
        enter_long_conditions = [
            df["rsi"] < df["buy_rsi_pred"],  # RSI below the buy threshold
            df["volume"] > df["volume"].rolling(window=10).mean() * 1.2,  # volume 20% above recent mean
            df["close"] > df["bb_middleband"],  # price above the Bollinger mid band
            df["sentiment_score"] > self.sentiment_weight.value,  # positive sentiment
            df["macd"] > df["macdsignal"],  # MACD above its signal line
            df["do_predict"] == 1,  # model prediction is flagged usable
        ]

        df.loc[
            reduce(lambda x, y: x & y, enter_long_conditions),
            ["enter_long", "enter_tag"],
        ] = (1, "long")

        # The caller needs the dataframe back; the original fell off the end
        # of the function and returned None.
        return df

    def populate_exit_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
        """Mark long exits where every exit condition holds.

        Conditions: RSI above ``sell_rsi_pred``, volume above its 10-candle
        mean, close below the Bollinger mid band and sentiment score below
        ``sentiment_weight``.

        Args:
            df: Dataframe produced by ``populate_indicators``.
            metadata: Unused.

        Returns:
            ``df`` with ``exit_long`` = 1 set on matching rows.
        """
        exit_long_conditions = [
            df["rsi"] > df["sell_rsi_pred"],  # RSI above the sell threshold
            df["volume"] > df["volume"].rolling(window=10).mean(),  # volume above recent mean
            df["close"] < df["bb_middleband"],  # price below the Bollinger mid band
            df["sentiment_score"] < self.sentiment_weight.value,  # negative sentiment
        ]

        df.loc[reduce(lambda x, y: x & y, exit_long_conditions), "exit_long"] = 1
        return df