241 lines
13 KiB
Python
241 lines
13 KiB
Python
import logging
|
||
import numpy as np
|
||
from functools import reduce
|
||
import talib.abstract as ta
|
||
from pandas import DataFrame
|
||
from technical import qtpylib
|
||
from freqtrade.strategy import IStrategy, IntParameter, DecimalParameter
|
||
import pandas as pd
|
||
from sklearn.ensemble import RandomForestClassifier
|
||
from sklearn.preprocessing import StandardScaler
|
||
from sklearn.model_selection import train_test_split
|
||
|
||
|
||
class MLBasedSentimentStrategy(IStrategy):
|
||
# 参数定义:MLBasedSentimentStrategy 动态适配 buy_rsi 和 sell_rsi,禁用 Hyperopt 优化
|
||
buy_rsi = IntParameter(low=10, high=50, default=27, space="buy", optimize=False, load=True)
|
||
sell_rsi = IntParameter(low=50, high=90, default=59, space="sell", optimize=False, load=True)
|
||
# 市场情绪参数
|
||
sentiment_weight = DecimalParameter(low=0.1, high=0.9, default=0.5, space="buy", optimize=True, load=True)
|
||
|
||
def feature_engineering_expand_all(self, dataframe: DataFrame, period: int, metadata: dict, **kwargs) -> DataFrame:
|
||
# 保留关键的技术指标
|
||
dataframe["rsi"] = ta.RSI(dataframe, timeperiod=14)
|
||
|
||
# 确保 MACD 列被正确计算并保留
|
||
try:
|
||
macd = ta.MACD(dataframe, fastperiod=12, slowperiod=26, signalperiod=9)
|
||
dataframe["macd"] = macd["macd"]
|
||
dataframe["macdsignal"] = macd["macdsignal"]
|
||
except Exception as e:
|
||
logger.error(f"计算 MACD 列时出错:{str(e)}")
|
||
dataframe["macd"] = np.nan
|
||
dataframe["macdsignal"] = np.nan
|
||
|
||
# 检查 MACD 列是否存在
|
||
if "macd" not in dataframe.columns or "macdsignal" not in dataframe.columns:
|
||
logger.error("MACD 或 MACD 信号列缺失,无法生成买入信号")
|
||
raise ValueError("DataFrame 缺少必要的 MACD 列")
|
||
|
||
# 确保 MACD 列存在
|
||
if "macd" not in dataframe.columns or "macdsignal" not in dataframe.columns:
|
||
logger.error("MACD 或 MACD 信号列缺失,无法生成买入信号")
|
||
raise ValueError("DataFrame 缺少必要的 MACD 列")
|
||
|
||
# 保留布林带相关特征
|
||
bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
|
||
dataframe["bb_lowerband"] = bollinger["lower"]
|
||
dataframe["bb_middleband"] = bollinger["mid"]
|
||
dataframe["bb_upperband"] = bollinger["upper"]
|
||
|
||
# 保留成交量相关特征
|
||
dataframe["volume_ma"] = dataframe["volume"].rolling(window=20).mean()
|
||
|
||
# 添加市场情绪特征
|
||
# 假设我们有一个外部数据源提供市场情绪分数
|
||
# 这里我们使用一个示例值,实际应用中需要从外部数据源获取
|
||
dataframe["sentiment_score"] = 0.5 # 示例值,实际应替换为真实数据
|
||
|
||
# 数据清理
|
||
for col in dataframe.columns:
|
||
if dataframe[col].dtype in ["float64", "int64"]:
|
||
dataframe[col] = dataframe[col].replace([np.inf, -np.inf], np.nan)
|
||
dataframe[col] = dataframe[col].ffill().fillna(0)
|
||
logger.info(f"特征工程完成,特征数量:{len(dataframe.columns)}")
|
||
return dataframe
|
||
|
||
def set_freqai_targets(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame:
|
||
logger.info(f"设置 FreqAI 目标,交易对:{metadata['pair']}")
|
||
if "close" not in dataframe.columns:
|
||
logger.error("数据框缺少必要的 'close' 列")
|
||
raise ValueError("数据框缺少必要的 'close' 列")
|
||
|
||
try:
|
||
label_period = self.freqai_info["feature_parameters"]["label_period_candles"]
|
||
|
||
# 定义目标变量为未来价格变化百分比(连续值)
|
||
dataframe["up_or_down"] = (
|
||
dataframe["close"].shift(-label_period) - dataframe["close"]
|
||
) / dataframe["close"]
|
||
|
||
# 数据清理:处理 NaN 和 Inf 值
|
||
dataframe["up_or_down"] = dataframe["up_or_down"].replace([np.inf, -np.inf], np.nan)
|
||
dataframe["up_or_down"] = dataframe["up_or_down"].ffill().fillna(0)
|
||
|
||
# 确保目标变量是二维数组
|
||
if dataframe["up_or_down"].ndim == 1:
|
||
dataframe["up_or_down"] = dataframe["up_or_down"].values.reshape(-1, 1)
|
||
|
||
# 检查并处理 NaN 或无限值
|
||
dataframe["up_or_down"] = dataframe["up_or_down"].replace([np.inf, -np.inf], np.nan)
|
||
dataframe["up_or_down"] = dataframe["up_or_down"].ffill().fillna(0)
|
||
|
||
# 生成 %-volatility 特征
|
||
dataframe["%-volatility"] = dataframe["close"].pct_change().rolling(20).std()
|
||
|
||
# 确保 &-buy_rsi 列的值计算正确
|
||
dataframe["&-buy_rsi"] = ta.RSI(dataframe, timeperiod=14)
|
||
|
||
# 数据清理
|
||
for col in ["&-buy_rsi", "up_or_down", "%-volatility"]:
|
||
# 使用直接操作避免链式赋值
|
||
dataframe[col] = dataframe[col].replace([np.inf, -np.inf], np.nan)
|
||
dataframe[col] = dataframe[col].ffill() # 替代 fillna(method='ffill')
|
||
dataframe[col] = dataframe[col].fillna(dataframe[col].mean()) # 使用均值填充 NaN 值
|
||
if dataframe[col].isna().any():
|
||
logger.warning(f"目标列 {col} 仍包含 NaN,填充为默认值")
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建 FreqAI 目标失败:{str(e)}")
|
||
raise
|
||
|
||
# Log the shape of the target variable for debugging
|
||
logger.info(f"目标列形状:{dataframe['up_or_down'].shape}")
|
||
logger.info(f"目标列预览:\n{dataframe[['up_or_down', '&-buy_rsi']].head().to_string()}")
|
||
return dataframe
|
||
|
||
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
|
||
logger.info(f"处理交易对:{metadata['pair']}")
|
||
dataframe = self.freqai.start(dataframe, metadata, self)
|
||
# 计算传统指标
|
||
dataframe["rsi"] = ta.RSI(dataframe, timeperiod=14)
|
||
bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
|
||
dataframe["bb_lowerband"] = bollinger["lower"]
|
||
dataframe["bb_middleband"] = bollinger["mid"]
|
||
dataframe["bb_upperband"] = bollinger["upper"]
|
||
dataframe["tema"] = ta.TEMA(dataframe, timeperiod=9)
|
||
# 生成 up_or_down 信号(非 FreqAI 目标)
|
||
label_period = self.freqai_info["feature_parameters"]["label_period_candles"]
|
||
# 使用未来价格变化方向生成 up_or_down 信号
|
||
label_period = self.freqai_info["feature_parameters"]["label_period_candles"]
|
||
dataframe["up_or_down"] = np.where(
|
||
dataframe["close"].shift(-label_period) > dataframe["close"], 1, 0
|
||
)
|
||
# 动态设置参数
|
||
if "&-buy_rsi" in dataframe.columns:
|
||
# 派生其他目标
|
||
dataframe["&-sell_rsi"] = dataframe["&-buy_rsi"] + 30
|
||
dataframe["%-volatility"] = dataframe["close"].pct_change().rolling(20).std()
|
||
# Ensure proper calculation and handle potential NaN values
|
||
dataframe["&-stoploss"] = (-0.1 - (dataframe["%-volatility"] * 10).clip(0, 0.25)).fillna(-0.1)
|
||
dataframe["&-roi_0"] = ((dataframe["close"] / dataframe["close"].shift(label_period) - 1).clip(0, 0.2)).fillna(0)
|
||
|
||
# Additional check to ensure no NaN values remain
|
||
for col in ["&-stoploss", "&-roi_0"]:
|
||
if dataframe[col].isna().any():
|
||
logger.warning(f"列 {col} 仍包含 NaN,填充为默认值")
|
||
dataframe[col] = dataframe[col].fillna(-0.1 if col == "&-stoploss" else 0)
|
||
# 简化动态参数生成逻辑
|
||
# 放松 buy_rsi 和 sell_rsi 的生成逻辑
|
||
# 计算 buy_rsi_pred 并清理 NaN 值
|
||
dataframe["buy_rsi_pred"] = dataframe["rsi"].rolling(window=10).mean().clip(30, 50)
|
||
dataframe["buy_rsi_pred"] = dataframe["buy_rsi_pred"].fillna(dataframe["buy_rsi_pred"].median())
|
||
|
||
# 计算 sell_rsi_pred 并清理 NaN 值
|
||
dataframe["sell_rsi_pred"] = dataframe["buy_rsi_pred"] + 20
|
||
dataframe["sell_rsi_pred"] = dataframe["sell_rsi_pred"].fillna(dataframe["sell_rsi_pred"].median())
|
||
|
||
# 计算 stoploss_pred 并清理 NaN 值
|
||
dataframe["stoploss_pred"] = -0.1 - (dataframe["%-volatility"] * 10).clip(0, 0.25)
|
||
dataframe["stoploss_pred"] = dataframe["stoploss_pred"].fillna(dataframe["stoploss_pred"].mean())
|
||
|
||
# 计算 roi_0_pred 并清理 NaN 值
|
||
dataframe["roi_0_pred"] = dataframe["&-roi_0"].clip(0.01, 0.2)
|
||
dataframe["roi_0_pred"] = dataframe["roi_0_pred"].fillna(dataframe["roi_0_pred"].mean())
|
||
# 检查预测值
|
||
for col in ["buy_rsi_pred", "sell_rsi_pred", "stoploss_pred", "roi_0_pred", "&-sell_rsi", "&-stoploss", "&-roi_0"]:
|
||
if dataframe[col].isna().any():
|
||
logger.warning(f"列 {col} 包含 NaN,填充为默认值")
|
||
dataframe[col] = dataframe[col].fillna(dataframe[col].mean())
|
||
# 更保守的止损和止盈设置
|
||
dataframe["trailing_stop_positive"] = (dataframe["roi_0_pred"] * 0.3).clip(0.01, 0.2)
|
||
dataframe["trailing_stop_positive_offset"] = (dataframe["roi_0_pred"] * 0.5).clip(0.01, 0.3)
|
||
# 设置策略级参数
|
||
self.buy_rsi.value = float(dataframe["buy_rsi_pred"].iloc[-1])
|
||
self.sell_rsi.value = float(dataframe["sell_rsi_pred"].iloc[-1])
|
||
# 更保守的止损设置
|
||
self.stoploss = -0.15 # 固定止损 15%
|
||
self.minimal_roi = {
|
||
0: float(self.roi_0.value),
|
||
15: float(self.roi_15.value),
|
||
30: float(self.roi_30.value),
|
||
60: 0
|
||
}
|
||
# 更保守的追踪止损设置
|
||
self.trailing_stop_positive = 0.05 # 追踪止损触发点
|
||
self.trailing_stop_positive_offset = 0.1 # 追踪止损偏移量
|
||
logger.info(f"动态参数:buy_rsi={self.buy_rsi.value}, sell_rsi={self.sell_rsi.value}, "
|
||
f"stoploss={self.stoploss}, trailing_stop_positive={self.trailing_stop_positive}")
|
||
dataframe.replace([np.inf, -np.inf], 0, inplace=True)
|
||
dataframe.ffill(inplace=True)
|
||
dataframe.fillna(0, inplace=True)
|
||
logger.info(f"up_or_down 值统计:\n{dataframe['up_or_down'].value_counts().to_string()}")
|
||
logger.info(f"do_predict 值统计:\n{dataframe['do_predict'].value_counts().to_string()}")
|
||
return dataframe
|
||
def populate_entry_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
|
||
# 改进买入信号条件
|
||
# 检查 MACD 列是否存在
|
||
if "macd" not in df.columns or "macdsignal" not in df.columns:
|
||
logger.error("MACD 或 MACD 信号列缺失,无法生成买入信号。尝试重新计算 MACD 列。")
|
||
try:
|
||
macd = ta.MACD(df, fastperiod=12, slowperiod=26, signalperiod=9)
|
||
df["macd"] = macd["macd"]
|
||
df["macdsignal"] = macd["macdsignal"]
|
||
logger.info("MACD 列已成功重新计算。")
|
||
except Exception as e:
|
||
logger.error(f"重新计算 MACD 列时出错:{str(e)}")
|
||
raise ValueError("DataFrame 缺少必要的 MACD 列且无法重新计算。")
|
||
|
||
enter_long_conditions = [
|
||
(df["rsi"] < df["buy_rsi_pred"]), # RSI 低于买入阈值
|
||
(df["volume"] > df["volume"].rolling(window=10).mean() * 1.2), # 成交量高于近期均值20%
|
||
(df["close"] > df["bb_middleband"]), # 价格高于布林带中轨
|
||
(df["sentiment_score"] > self.sentiment_weight.value) # 市场情绪积极
|
||
]
|
||
|
||
# 如果 MACD 列存在,则添加 MACD 金叉条件
|
||
if "macd" in df.columns and "macdsignal" in df.columns:
|
||
enter_long_conditions.append((df["macd"] > df["macdsignal"]))
|
||
# 确保模型预测为买入
|
||
enter_long_conditions.append((df["do_predict"] == 1))
|
||
if enter_long_conditions:
|
||
df.loc[
|
||
reduce(lambda x, y: x & y, enter_long_conditions),
|
||
["enter_long", "enter_tag"]
|
||
] = (1, "long")
|
||
def populate_exit_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
|
||
# 改进卖出信号条件
|
||
exit_long_conditions = [
|
||
(df["rsi"] > df["sell_rsi_pred"]), # RSI 高于卖出阈值
|
||
(df["volume"] > df["volume"].rolling(window=10).mean()), # 成交量高于近期均值
|
||
(df["close"] < df["bb_middleband"]), # 价格低于布林带中轨
|
||
(df["sentiment_score"] < self.sentiment_weight.value) # 市场情绪消极
|
||
]
|
||
if exit_long_conditions:
|
||
df.loc[
|
||
reduce(lambda x, y: x & y, exit_long_conditions),
|
||
"exit_long"
|
||
] = 1
|
||
return df
|
||
|