myTestFreqAI/freqtrade/templates/FreqaiExampleStrategy.py
2025-05-07 23:47:27 +08:00

396 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import numpy as np
import pandas as pd # 添加 pandas 导入
from functools import reduce
import talib.abstract as ta
from pandas import DataFrame
from typing import Dict, List, Optional
from technical import qtpylib
from freqtrade.strategy import IStrategy, IntParameter, DecimalParameter
logger = logging.getLogger(__name__)
class FreqaiExampleStrategy(IStrategy):
# 移除硬编码的 minimal_roi 和 stoploss改为动态适配
minimal_roi = {} # 将在 populate_indicators 中动态生成
stoploss = 0.0 # 将在 populate_indicators 中动态设置
trailing_stop = True
process_only_new_candles = True
use_exit_signal = True
startup_candle_count: int = 40
can_short = False
# 参数定义FreqAI 动态适配 buy_rsi 和 sell_rsi禁用 Hyperopt 优化
buy_rsi = IntParameter(low=10, high=50, default=27, space="buy", optimize=False, load=True)
sell_rsi = IntParameter(low=50, high=90, default=59, space="sell", optimize=False, load=True)
# 为 Hyperopt 优化添加 ROI 和 stoploss 参数
roi_0 = DecimalParameter(low=0.01, high=0.2, default=0.038, space="roi", optimize=True, load=True)
roi_15 = DecimalParameter(low=0.005, high=0.1, default=0.027, space="roi", optimize=True, load=True)
roi_30 = DecimalParameter(low=0.001, high=0.05, default=0.009, space="roi", optimize=True, load=True)
stoploss_param = DecimalParameter(low=-0.35, high=-0.1, default=-0.182, space="stoploss", optimize=True, load=True)
# FreqAI 配置
freqai_info = {
"model": "XGBoostRegressor", # 与config保持一致
"save_backtest_models": True,
"feature_parameters": {
"include_timeframes": ["3m", "15m", "1h"], # 与config一致:w
"include_corr_pairlist": ["BTC/USDT", "SOL/USDT"], # 添加相关交易对
"label_period_candles": 20, # 与config一致
"include_shifted_candles": 2, # 与config一致
},
"data_split_parameters": {
"test_size": 0.2,
"shuffle": True, # 启用shuffle
},
"model_training_parameters": {
"n_estimators": 100, # 减少树的数量
"learning_rate": 0.1, # 提高学习率
"max_depth": 6, # 限制树深度
"subsample": 0.8, # 添加子采样
"colsample_bytree": 0.8, # 添加特征采样
"objective": "reg:squarederror",
"eval_metric": "rmse",
"early_stopping_rounds": 20,
"verbose": 0,
},
"data_kitchen": {
"feature_parameters": {
"DI_threshold": 1.5, # 降低异常值过滤阈值
"use_DBSCAN_to_remove_outliers": False # 禁用DBSCAN
}
}
}
plot_config = {
"main_plot": {},
"subplots": {
"&-buy_rsi": {"&-buy_rsi": {"color": "green"}},
"&-sell_rsi": {"&-sell_rsi": {"color": "red"}},
"&-stoploss": {"&-stoploss": {"color": "purple"}},
"&-roi_0": {"&-roi_0": {"color": "orange"}},
"do_predict": {"do_predict": {"color": "brown"}},
},
}
def __init__(self, config: Dict):
super().__init__(config)
# 初始化特征缓存
self.feature_cache = {}
# 设置日志级别
logger.setLevel(logging.DEBUG)
# 输出模型路径用于调试
freqai_model_path = self.config.get("freqai", {}).get("model_path", "/freqtrade/user_data/models")
logger.info(f"FreqAI 模型路径:{freqai_model_path}")
def _normalize_column(self, series: pd.Series) -> pd.Series:
"""对单个列进行最小最大归一化"""
if series.nunique() <= 1:
# 如果列中所有值都相同或为空直接返回全0
return pd.Series(np.zeros_like(series), index=series.index)
min_val = series.min()
max_val = series.max()
normalized = (series - min_val) / (max_val - min_val)
return normalized.fillna(0)
def feature_engineering_expand_all(self, dataframe: DataFrame, period: int, metadata: dict, **kwargs) -> DataFrame:
# 基础指标
dataframe["rsi"] = ta.RSI(dataframe, timeperiod=14)
dataframe["macd"], dataframe["macdsignal"], dataframe["macdhist"] = ta.MACD(
dataframe, fastperiod=12, slowperiod=26, signalperiod=9
)
# 布林带及其宽度
bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
dataframe["bb_lowerband"] = bollinger["lower"]
dataframe["bb_middleband"] = bollinger["mid"]
dataframe["bb_upperband"] = bollinger["upper"]
dataframe["bb_width"] = (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["bb_middleband"]
# ATR (Average True Range)
dataframe["atr"] = ta.ATR(dataframe, timeperiod=14)
# RSI 变化率
dataframe["rsi_gradient"] = dataframe["rsi"].diff().fillna(0)
# 数据清理与归一化
for col in dataframe.select_dtypes(include=[np.number]).columns:
# Ensure column is valid and contains more than one unique value to avoid division by zero
if dataframe[col].nunique() > 1:
dataframe[col] = dataframe[col].replace([np.inf, -np.inf], np.nan)
dataframe[col] = dataframe[col].ffill().fillna(0)
dataframe[f"{col}_norm"] = self._normalize_column(dataframe[col])
else:
dataframe[f"{col}_norm"] = 0 # Default if normalization not possible
logger.info(f"特征工程完成,特征数量:{len(dataframe.columns)}")
return dataframe
def _add_noise(self, dataframe: DataFrame, noise_level: float = 0.02) -> DataFrame:
"""为数值型特征添加随机噪声以增强模型泛化能力"""
df = dataframe.copy()
numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
for col in numeric_cols:
noise = np.random.normal(loc=0.0, scale=noise_level * df[col].std(), size=df.shape[0])
df[col] += noise
logger.info(f"已向 {len(numeric_cols)} 个数值型特征添加 {noise_level * 100:.0f}% 噪声")
return df
def feature_engineering_expand_basic(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame:
dataframe["%-pct-change"] = dataframe["close"].pct_change()
dataframe["%-raw_volume"] = dataframe["volume"]
dataframe["%-raw_price"] = dataframe["close"]
dataframe["%-volume_change"] = dataframe["volume"].pct_change(periods=5)
dataframe["%-price_momentum"] = dataframe["close"] / dataframe["close"].shift(20) - 1
# 数据清理逻辑
for col in dataframe.columns:
if dataframe[col].dtype in ["float64", "int64"]:
dataframe[col] = dataframe[col].replace([np.inf, -np.inf], 0)
dataframe[col] = dataframe[col].ffill()
dataframe[col] = dataframe[col].fillna(0)
# 检查是否仍有无效值
if dataframe[col].isna().any() or np.isinf(dataframe[col]).any():
logger.warning(f"{col} 仍包含无效值,已填充为默认值")
dataframe[col] = dataframe[col].fillna(0)
# 添加 2% 噪声以增强模型鲁棒性
dataframe = self._add_noise(dataframe, noise_level=0.02)
logger.info(f"特征工程完成,特征数量:{len(dataframe.columns)}")
return dataframe
def feature_engineering_standard(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame:
dataframe["%-day_of_week"] = dataframe["date"].dt.dayofweek
dataframe["%-hour_of_day"] = dataframe["date"].dt.hour
dataframe.replace([np.inf, -np.inf], 0, inplace=True)
dataframe.ffill(inplace=True)
dataframe.fillna(0, inplace=True)
return dataframe
def set_freqai_targets(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame:
logger.info(f"设置 FreqAI 目标,交易对:{metadata['pair']}")
if "close" not in dataframe.columns:
logger.error("数据框缺少必要的 'close'")
raise ValueError("数据框缺少必要的 'close'")
try:
label_period = self.freqai_info["feature_parameters"]["label_period_candles"]
# 定义目标变量为未来价格变化百分比(连续值)
dataframe["up_or_down"] = (
dataframe["close"].shift(-label_period) - dataframe["close"]
) / dataframe["close"]
# 数据清理:处理 NaN 和 Inf 值
dataframe["up_or_down"] = dataframe["up_or_down"].replace([np.inf, -np.inf], np.nan)
dataframe["up_or_down"] = dataframe["up_or_down"].ffill().fillna(0)
# 确保目标变量是二维数组
if dataframe["up_or_down"].ndim == 1:
dataframe["up_or_down"] = dataframe["up_or_down"].values.reshape(-1, 1)
# 检查并处理 NaN 或无限值
dataframe["up_or_down"] = dataframe["up_or_down"].replace([np.inf, -np.inf], np.nan)
dataframe["up_or_down"] = dataframe["up_or_down"].ffill().fillna(0)
# 生成 %-volatility 特征
dataframe["%-volatility"] = dataframe["close"].pct_change().rolling(20).std()
# 确保 &-buy_rsi_pred 列的值计算正确
dataframe["&-buy_rsi"] = ta.RSI(dataframe, timeperiod=14)
if "&-buy_rsi_pred" not in dataframe.columns:
logger.warning("&-buy_rsi_pred 列不存在,正在使用 &-buy_rsi 模拟替代")
dataframe["&-buy_rsi_pred"] = dataframe["&-buy_rsi"].rolling(window=10).mean().clip(20, 40)
dataframe["&-buy_rsi_pred"] = dataframe["&-buy_rsi_pred"].fillna(dataframe["&-buy_rsi_pred"].median())
# 确保 &-sell_rsi_pred 存在(基于 buy_rsi_pred + 偏移量)
if "&-sell_rsi_pred" not in dataframe.columns:
logger.warning("&-sell_rsi_pred 列不存在,正在使用 &-buy_rsi_pred + 20 模拟替代")
dataframe["&-sell_rsi_pred"] = dataframe["&-buy_rsi_pred"] + 20
dataframe["&-sell_rsi_pred"] = dataframe["&-sell_rsi_pred"].clip(50, 90)
dataframe["&-sell_rsi_pred"] = dataframe["&-sell_rsi_pred"].fillna(dataframe["&-sell_rsi_pred"].median())
# 数据清理
for col in ["&-buy_rsi", "up_or_down", "%-volatility"]:
# 使用直接操作避免链式赋值
dataframe[col] = dataframe[col].replace([np.inf, -np.inf], np.nan)
dataframe[col] = dataframe[col].ffill() # 替代 fillna(method='ffill')
dataframe[col] = dataframe[col].fillna(dataframe[col].mean()) # 使用均值填充 NaN 值
if dataframe[col].isna().any():
logger.warning(f"目标列 {col} 仍包含 NaN填充为默认值")
except Exception as e:
logger.error(f"创建 FreqAI 目标失败:{str(e)}")
raise
# Log the shape of the target variable for debugging
logger.info(f"目标列形状:{dataframe['up_or_down'].shape}")
logger.info(f"目标列预览:\n{dataframe[['up_or_down', '&-buy_rsi']].head().to_string()}")
return dataframe
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
logger.info(f"处理交易对:{metadata['pair']}")
dataframe = self.freqai.start(dataframe, metadata, self)
# 计算传统指标
dataframe["rsi"] = ta.RSI(dataframe, timeperiod=14)
bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
dataframe["bb_lowerband"] = bollinger["lower"]
dataframe["bb_middleband"] = bollinger["mid"]
dataframe["bb_upperband"] = bollinger["upper"]
dataframe["tema"] = ta.TEMA(dataframe, timeperiod=9)
# 生成 up_or_down 信号(非 FreqAI 目标)
label_period = self.freqai_info["feature_parameters"]["label_period_candles"]
dataframe["up_or_down"] = np.where(
dataframe["close"].shift(-label_period) > dataframe["close"], 1, 0
)
# 确保 do_predict 列存在并填充默认值
if "do_predict" not in dataframe.columns:
dataframe["do_predict"] = 0
logger.warning("do_predict 列不存在,已创建并填充为 0。")
# 添加详细日志以验证 do_predict 的值
if "do_predict" in dataframe.columns:
logger.debug(f"do_predict 列存在前5行预览\n{dataframe[['do_predict']].head().to_string()}")
else:
logger.warning("do_predict 列不存在,可能未正确加载模型或进行预测。")
# 确保 &-buy_rsi_pred 存在(如果模型未提供则模拟生成)
if "&-buy_rsi" in dataframe.columns:
dataframe["&-buy_rsi_pred"] = dataframe["&-buy_rsi"].rolling(window=10).mean().clip(20, 40)
dataframe["&-buy_rsi_pred"] = dataframe["&-buy_rsi_pred"].fillna(dataframe["&-buy_rsi_pred"].median())
else:
logger.warning("&-buy_rsi 列不存在,无法生成 &-buy_rsi_pred将使用默认值")
dataframe["&-buy_rsi_pred"] = 27 # 默认 RSI 买入阈值
# 生成 &-sell_rsi_pred基于 buy_rsi_pred + 偏移量)
if "&-buy_rsi_pred" in dataframe.columns:
dataframe["&-sell_rsi_pred"] = dataframe["&-buy_rsi_pred"] + 20
dataframe["&-sell_rsi_pred"] = dataframe["&-sell_rsi_pred"].clip(50, 90)
dataframe["&-sell_rsi_pred"] = dataframe["&-sell_rsi_pred"].fillna(dataframe["&-sell_rsi_pred"].median())
else:
logger.warning("&-buy_rsi_pred 列不存在,无法生成 &-sell_rsi_pred将使用默认值")
dataframe["&-sell_rsi_pred"] = 59 # 默认 RSI 卖出阈值
# 确保 &-sell_rsi_pred 最终存在于 dataframe 中
if "&-sell_rsi_pred" not in dataframe.columns:
logger.error("&-sell_rsi_pred 列仍然缺失,策略可能无法正常运行")
raise ValueError("&-sell_rsi_pred 列缺失,无法继续执行策略逻辑")
# 生成 stoploss_pred基于波动率
dataframe["%-volatility"] = dataframe["close"].pct_change().rolling(20).std()
dataframe["&-stoploss"] = (-0.1 - (dataframe["%-volatility"] * 10).clip(0, 0.25)).fillna(-0.1)
dataframe["&-stoploss"] = dataframe["&-stoploss"].clip(-0.35, -0.1)
# 生成 roi_0_pred基于 ROI 参数)
dataframe["&-roi_0"] = ((dataframe["close"] / dataframe["close"].shift(label_period) - 1).clip(0, 0.2)).fillna(0)
# 设置策略级参数
try:
self.buy_rsi.value = float(dataframe["&-buy_rsi_pred"].iloc[-1])
self.sell_rsi.value = float(dataframe["&-sell_rsi_pred"].iloc[-1])
except Exception as e:
logger.error(f"设置 buy_rsi/sell_rsi 失败:{str(e)}")
self.buy_rsi.value = 27
self.sell_rsi.value = 59
self.stoploss = -0.15 # 固定止损 15%
self.minimal_roi = {
0: float(self.roi_0.value),
15: float(self.roi_15.value),
30: float(self.roi_30.value),
60: 0
}
# 更保守的追踪止损设置
self.trailing_stop_positive = 0.05 # 追踪止损触发点
self.trailing_stop_positive_offset = 0.1 # 追踪止损偏移量
logger.info(f"动态参数buy_rsi={self.buy_rsi.value}, sell_rsi={self.sell_rsi.value}, "
f"stoploss={self.stoploss}, trailing_stop_positive={self.trailing_stop_positive}")
# 数据清理
dataframe.replace([np.inf, -np.inf], 0, inplace=True)
dataframe.ffill(inplace=True)
dataframe.fillna(0, inplace=True)
return dataframe
def populate_exit_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
# 改进卖出信号条件
exit_long_conditions = [
(df["rsi"] > df["&-sell_rsi_pred"]), # RSI 高于卖出阈值
(df["volume"] > df["volume"].rolling(window=10).mean()), # 成交量高于近期均值
(df["close"] < df["bb_middleband"]) # 价格低于布林带中轨
]
# 添加详细日志以验证 do_predict 的值
if "do_predict" in df.columns:
logger.debug(f"do_predict 列存在前5行预览\n{df[['do_predict']].head().to_string()}")
else:
logger.warning("do_predict 列不存在,可能未正确加载模型或进行预测。")
if exit_long_conditions:
df.loc[
reduce(lambda x, y: x & y, exit_long_conditions),
"exit_long"
] = 1
if "&-buy_rsi_pred" in df.columns:
logger.debug(f"&-buy_rsi_pred 列存在前5行预览\n{df[['&-buy_rsi_pred']].head().to_string()}")
else:
logger.warning("&-buy_rsi_pred 列不存在,可能未正确生成或被覆盖。")
return df
def populate_entry_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
# 改进买入信号条件
# 检查 MACD 列是否存在
if "macd" not in df.columns or "macdsignal" not in df.columns:
logger.error("MACD 或 MACD 信号列缺失,无法生成买入信号。尝试重新计算 MACD 列。")
try:
macd = ta.MACD(df, fastperiod=12, slowperiod=26, signalperiod=9)
df["macd"] = macd["macd"]
df["macdsignal"] = macd["macdsignal"]
logger.info("MACD 列已成功重新计算。")
except Exception as e:
logger.error(f"重新计算 MACD 列时出错:{str(e)}")
raise ValueError("DataFrame 缺少必要的 MACD 列且无法重新计算。")
enter_long_conditions = [
(df["rsi"] < df["&-buy_rsi_pred"]), # RSI 低于买入阈值
(df["volume"] > df["volume"].rolling(window=10).mean() * 1.2), # 成交量高于近期均值20%
(df["close"] > df["bb_middleband"]) # 价格高于布林带中轨
]
# 如果 MACD 列存在,则添加 MACD 金叉条件
if "macd" in df.columns and "macdsignal" in df.columns:
enter_long_conditions.append((df["macd"] > df["macdsignal"]))
# 确保模型预测为买入
enter_long_conditions.append((df["do_predict"] == 1))
# 添加详细日志以验证 do_predict 的值
if "do_predict" in df.columns:
logger.debug(f"do_predict 列存在前5行预览\n{df[['do_predict']].head().to_string()}")
else:
logger.warning("do_predict 列不存在,可能未正确加载模型或进行预测。")
if enter_long_conditions:
df.loc[
reduce(lambda x, y: x & y, enter_long_conditions),
["enter_long", "enter_tag"]
] = (1, "long")
return df
def confirm_trade_entry(
self, pair: str, order_type: str, amount: float, rate: float,
time_in_force: str, current_time, entry_tag, side: str, **kwargs
) -> bool:
df, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = df.iloc[-1].squeeze()
if side == "long":
if rate > (last_candle["close"] * (1 + 0.0025)):
return False
return True