动态计算阈值future_adx_threshold

This commit is contained in:
zhangkun9038@dingtalk.com 2025-06-02 01:14:48 +00:00
parent d37385ebc7
commit f0cd5e00bb

View File

@ -8,11 +8,10 @@ from datetime import datetime
import os
import json
import glob
from functools import reduce
logger = logging.getLogger(__name__)
class FreqaiPrimer(IStrategy):
class HybridStrategy(IStrategy):
minimal_roi = {
"0": 0.02,
"30": 0.01,
@ -42,7 +41,7 @@ class FreqaiPrimer(IStrategy):
"verbose": -1,
},
"fit_live_predictions_candles": 100,
"live_retrain_candles": 100,
"live_retrain_candles": 50,
}
def __init__(self, config: dict, *args, **kwargs):
@ -56,6 +55,105 @@ class FreqaiPrimer(IStrategy):
self.stats_logged = False
self.buy_threshold = -0.01
self.sell_threshold = 0.01
self.prev_future_adx_mean = None
self.prev_future_adx_std = None
self.prev_price_value_divergence_mean = None
self.prev_price_value_divergence_std = None
def load_metadata_stats(self, pair: str, identifier: str, dataframe: DataFrame) -> tuple[dict, dict]:
"""
metadata.json 文件读取 labels_mean labels_std并应用平滑逻辑
Args:
pair (str): 币对名称例如 "BTC/USDT"
identifier (str): FreqAI 模型标识例如 "test58"
dataframe (DataFrame): 用于回退计算的数据框架
Returns:
tuple: (平滑后的 labels_mean, 平滑后的 labels_std)
"""
labels_mean = None
labels_std = None
pair_base = pair.split('/')[0] if '/' in pair else pair
try:
model_base_dir = os.path.join(self.config["user_data_dir"], "models", identifier)
sub_dirs = glob.glob(os.path.join(model_base_dir, f"sub-train-{pair_base}_*"))
if not sub_dirs:
logger.warning(f"[{pair}] 未找到任何子目录:{model_base_dir}/sub-train-{pair_base}_*")
else:
latest_sub_dir = max(sub_dirs, key=lambda x: int(x.split('_')[-1]))
pair_base_lower = pair_base.lower()
timestamp = latest_sub_dir.split('_')[-1]
metadata_file = os.path.join(latest_sub_dir, f"cb_{pair_base_lower}_{timestamp}_metadata.json")
if os.path.exists(metadata_file):
with open(metadata_file, "r") as f:
metadata = json.load(f)
labels_mean = metadata["labels_mean"]
labels_std = metadata["labels_std"]
logger.info(f"[{pair}] 从最新子目录 {latest_sub_dir} 读取 labels_mean{labels_mean}, labels_std{labels_std}")
else:
logger.warning(f"[{pair}] 最新的 metadata.json 文件 {metadata_file} 不存在")
except Exception as e:
logger.warning(f"[{pair}] 无法从子目录读取 labels_mean 和 labels_std{e},重新计算")
# 回退计算
if labels_mean is None or labels_std is None:
logger.warning(f"[{pair}] 无法获取 labels_mean 和 labels_std重新计算")
dataframe["&-price_value_divergence_actual"] = (dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"]
dataframe["&-price_value_divergence_actual"] = dataframe["&-price_value_divergence_actual"].replace([np.inf, -np.inf], 0).ffill().fillna(0)
recent_data = dataframe["&-price_value_divergence_actual"].tail(self.fit_live_predictions_candles)
labels_mean = {
"&-future_adx": dataframe["adx"].tail(self.fit_live_predictions_candles).mean(),
"&-price_value_divergence": recent_data.mean()
}
labels_std = {
"&-future_adx": dataframe["adx"].tail(self.fit_live_predictions_candles).std(),
"&-price_value_divergence": recent_data.std()
}
if np.isnan(labels_std["&-future_adx"]) or labels_std["&-future_adx"] == 0:
labels_std["&-future_adx"] = 1.0
if np.isnan(labels_std["&-price_value_divergence"]) or labels_std["&-price_value_divergence"] == 0:
labels_std["&-price_value_divergence"] = 0.01
logger.warning(f"[{pair}] labels_std 计算异常,使用默认值")
# 平滑"&-future_adx"的均值和标准差
current_future_adx_mean = labels_mean["&-future_adx"]
current_future_adx_std = labels_std["&-future_adx"]
if self.prev_future_adx_mean is not None and self.prev_future_adx_std is not None:
smoothed_future_adx_mean = 0.7 * current_future_adx_mean + 0.3 * self.prev_future_adx_mean
smoothed_future_adx_std = 0.7 * current_future_adx_std + 0.3 * self.prev_future_adx_std
else:
smoothed_future_adx_mean = current_future_adx_mean
smoothed_future_adx_std = current_future_adx_std
self.prev_future_adx_mean = smoothed_future_adx_mean
self.prev_future_adx_std = smoothed_future_adx_std
# 平滑"&-price_value_divergence"的均值和标准差
current_price_value_divergence_mean = labels_mean["&-price_value_divergence"]
current_price_value_divergence_std = labels_std["&-price_value_divergence"]
if self.prev_price_value_divergence_mean is not None and self.prev_price_value_divergence_std is not None:
smoothed_price_value_divergence_mean = 0.7 * current_price_value_divergence_mean + 0.3 * self.prev_price_value_divergence_mean
smoothed_price_value_divergence_std = 0.7 * current_price_value_divergence_std + 0.3 * self.prev_price_value_divergence_std
else:
smoothed_price_value_divergence_mean = current_price_value_divergence_mean
smoothed_price_value_divergence_std = current_price_value_divergence_std
self.prev_price_value_divergence_mean = smoothed_price_value_divergence_mean
self.prev_price_value_divergence_std = smoothed_price_value_divergence_std
# 更新平滑后的值
smoothed_labels_mean = {
"&-future_adx": smoothed_future_adx_mean,
"&-price_value_divergence": smoothed_price_value_divergence_mean
}
smoothed_labels_std = {
"&-future_adx": smoothed_future_adx_std,
"&-price_value_divergence": smoothed_price_value_divergence_std
}
return smoothed_labels_mean, smoothed_labels_std
def feature_engineering_expand_all(self, dataframe: DataFrame, period: int, metadata: dict, **kwargs) -> DataFrame:
dataframe["%-rsi-period"] = ta.RSI(dataframe, timeperiod=period)
@ -148,83 +246,60 @@ class FreqaiPrimer(IStrategy):
for col in ["ema200", "ema200_slope", "ema20", "ema50", "adx", "atr", "atr_median", "bb_upperband", "bb_middleband", "bb_lowerband", "rsi", "volume_z_score", "&-future_adx", "&-price_value_divergence"]:
dataframe[col] = dataframe[col].replace([np.inf, -np.inf], 0).ffill().fillna(0)
# 动态阈值计算
labels_mean = None
labels_std = None
try:
model_base_dir = os.path.join(self.config["user_data_dir"], "models", self.freqai_info["identifier"])
pair_base = pair.split('/')[0] if '/' in pair else pair
sub_dirs = glob.glob(os.path.join(model_base_dir, f"sub-train-{pair_base}_*"))
if not sub_dirs:
logger.warning(f"[{pair}] 未找到任何子目录:{model_base_dir}/sub-train-{pair_base}_*")
else:
latest_sub_dir = max(sub_dirs, key=lambda x: int(x.split('_')[-1]))
pair_base_lower = pair_base.lower()
timestamp = latest_sub_dir.split('_')[-1]
metadata_file = os.path.join(latest_sub_dir, f"cb_{pair_base_lower}_{timestamp}_metadata.json")
if os.path.exists(metadata_file):
with open(metadata_file, "r") as f:
metadata = json.load(f)
labels_mean = metadata["labels_mean"]["&-price_value_divergence"]
labels_std = metadata["labels_std"]["&-price_value_divergence"]
logger.info(f"[{pair}] 从最新子目录 {latest_sub_dir} 读取 labels_mean{labels_mean}, labels_std{labels_std}")
else:
logger.warning(f"[{pair}] 最新的 metadata.json 文件 {metadata_file} 不存在")
except Exception as e:
logger.warning(f"[{pair}] 无法从子目录读取 labels_mean 和 labels_std{e},重新计算")
if labels_mean is None or labels_std is None:
logger.warning(f"[{pair}] 无法获取 labels_mean 和 labels_std重新计算")
dataframe["&-price_value_divergence_actual"] = (dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"]
dataframe["&-price_value_divergence_actual"] = dataframe["&-price_value_divergence_actual"].replace([np.inf, -np.inf], 0).ffill().fillna(0)
recent_data = dataframe["&-price_value_divergence_actual"].tail(self.fit_live_predictions_candles)
labels_mean = recent_data.mean()
labels_std = recent_data.std()
if np.isnan(labels_std) or labels_std == 0:
labels_std = 0.01
logger.warning(f"[{pair}] labels_std 计算异常,使用默认值 0.01")
# 使用抽离的函数读取并平滑统计数据
labels_mean, labels_std = self.load_metadata_stats(pair, self.freqai_info["identifier"], dataframe)
self.pair_stats[pair] = {"labels_mean": labels_mean, "labels_std": labels_std}
# 根据市场状态调整系数
market_state = "trending" if ((dataframe["adx"].iloc[-1] > 25) or (dataframe["&-future_adx"].iloc[-1] > 25)) and \
(dataframe["ema200_slope"].iloc[-1] > 0.0005) and \
# 动态调整"&-future_adx"阈值
smoothed_future_adx_mean = labels_mean["&-future_adx"]
smoothed_future_adx_std = labels_std["&-future_adx"]
if smoothed_future_adx_std > 9: # 高波动
k_adx = 0.3
elif smoothed_future_adx_std <= 5: # 低波动
k_adx = 0.7
else: # 中等波动
k_adx = 0.5
future_adx_threshold = smoothed_future_adx_mean + k_adx * smoothed_future_adx_std
future_adx_threshold = max(future_adx_threshold, 20)
future_adx_threshold = min(future_adx_threshold, 35)
# 市场状态切换
market_state = "trending" if ((dataframe["adx"].iloc[-1] > 25) or (dataframe["&-future_adx"].iloc[-1] > future_adx_threshold)) and \
(dataframe["ema200_slope"].iloc[-1] > 0) and \
(dataframe["atr"].iloc[-1] > dataframe["atr_median"].iloc[-1]) else "ranging"
if labels_std > 0.015:
k_buy = 1.2
k_sell = 1.5
elif labels_std < 0.010:
k_buy = 0.8
k_sell = 1.0
else:
k_buy = 1.0
k_sell = 1.2
# 动态调整"&-price_value_divergence"阈值
labels_std_pvd = labels_std["&-price_value_divergence"]
labels_mean_pvd = labels_mean["&-price_value_divergence"]
if labels_mean > 0.015:
if labels_std_pvd > 0.01: # 高波动
k_buy = 1.4 if market_state == "trending" else 1.2
k_sell = 1.6 if market_state == "trending" else 1.4
elif labels_std_pvd < 0.005: # 低波动
k_buy = 1.1 if market_state == "trending" else 0.9
k_sell = 1.3 if market_state == "trending" else 1.1
else: # 中等波动
k_buy = 1.2 if market_state == "trending" else 1.0
k_sell = 1.4 if market_state == "trending" else 1.2
if labels_mean_pvd > 0.015:
k_sell += 0.5
logger.info(f"[{pair}] labels_mean 较高({labels_mean:.4f}),增加 k_sell 到 {k_sell:.2f}")
logger.info(f"[{pair}] labels_mean_pvd 较高({labels_mean_pvd:.4f}),增加 k_sell 到 {k_sell:.2f}")
# 趋势市场放宽阈值,震荡市场更严格
if market_state == "trending":
k_buy *= 1.2 # 趋势市场允许更低的买入点
k_sell *= 0.9 # 趋势市场更早退出
else:
k_buy *= 0.9 # 震荡市场更谨慎买入
k_sell *= 1.1 # 震荡市场更晚退出
self.buy_threshold = labels_mean_pvd - k_buy * labels_std_pvd
self.sell_threshold = labels_mean_pvd + k_sell * labels_std_pvd
self.buy_threshold = labels_mean - k_buy * labels_std
self.sell_threshold = labels_mean + k_sell * labels_std
# 收紧阈值范围
self.buy_threshold = max(self.buy_threshold, -0.03)
self.buy_threshold = min(self.buy_threshold, -0.002)
self.sell_threshold = min(self.sell_threshold, 0.03)
self.sell_threshold = max(self.sell_threshold, 0.002)
# 设置阈值范围
self.buy_threshold = max(self.buy_threshold, -0.05)
self.buy_threshold = min(self.buy_threshold, -0.005)
self.sell_threshold = min(self.sell_threshold, 0.05)
self.sell_threshold = max(self.sell_threshold, 0.005)
logger.info(f"[{pair}] 市场状态:{market_state}, labels_mean{labels_mean:.4f}, labels_std{labels_std:.4f}")
# 日志记录
logger.info(f"[{pair}] 市场状态:{market_state}, labels_mean{labels_mean}, labels_std{labels_std}")
logger.info(f"[{pair}] k_adx{k_adx:.2f}, &-future_adx 动态阈值:{future_adx_threshold:.2f}")
logger.info(f"[{pair}] k_buy{k_buy:.2f}, k_sell{k_sell:.2f}")
logger.info(f"[{pair}] 动态买入阈值:{self.buy_threshold:.4f}")
logger.info(f"[{pair}] 动态卖出阈值:{self.sell_threshold:.4f}")
@ -232,7 +307,7 @@ class FreqaiPrimer(IStrategy):
if not self.stats_logged:
logger.info("===== 所有币对的 labels_mean 和 labels_std 汇总 =====")
for p, stats in self.pair_stats.items():
logger.info(f"[{p}] labels_mean{stats['labels_mean']:.4f}, labels_std{stats['labels_std']:.4f}")
logger.info(f"[{p}] labels_mean{stats['labels_mean']}, labels_std{stats['labels_std']}")
logger.info("==============================================")
self.stats_logged = True
@ -243,10 +318,18 @@ class FreqaiPrimer(IStrategy):
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
pair = metadata.get('pair', 'Unknown')
# 使用ADX、FreqAI预测的ADX、EMA 200斜率和ATR判断市场状态
# 动态阈值已在 populate_indicators 中计算,此处直接使用
labels_mean = self.pair_stats.get(pair, {}).get("labels_mean", {"&-future_adx": 25})
labels_std = self.pair_stats.get(pair, {}).get("labels_std", {"&-future_adx": 1})
k_adx = 0.5 if labels_std["&-future_adx"] <= 9 and labels_std["&-future_adx"] > 5 else (0.3 if labels_std["&-future_adx"] > 9 else 0.7)
future_adx_threshold = labels_mean["&-future_adx"] + k_adx * labels_std["&-future_adx"]
future_adx_threshold = max(future_adx_threshold, 20)
future_adx_threshold = min(future_adx_threshold, 35)
# 使用动态阈值判断市场状态
dataframe["market_state"] = np.where(
((dataframe["adx"] > 25) | (dataframe["&-future_adx"] > 25)) &
(dataframe["ema200_slope"] > 0.0005) &
((dataframe["adx"] > 25) | (dataframe["&-future_adx"] > future_adx_threshold)) &
(dataframe["ema200_slope"] > 0) &
(dataframe["atr"] > dataframe["atr_median"]),
"trending",
"ranging"
@ -317,8 +400,15 @@ class FreqaiPrimer(IStrategy):
hold_time = (current_time - trade.open_date_utc).total_seconds() / 60
# 确定市场状态
market_state = "trending" if ((trade.dataframe["adx"].iloc[-1] > 25) or (trade.dataframe["&-future_adx"].iloc[-1] > 25)) and \
(trade.dataframe["ema200_slope"].iloc[-1] > 0.0005) and \
labels_mean = self.pair_stats.get(trade.pair, {}).get("labels_mean", {"&-future_adx": 25})
labels_std = self.pair_stats.get(trade.pair, {}).get("labels_std", {"&-future_adx": 1})
k_adx = 0.5 if labels_std["&-future_adx"] <= 9 and labels_std["&-future_adx"] > 5 else (0.3 if labels_std["&-future_adx"] > 9 else 0.7)
future_adx_threshold = labels_mean["&-future_adx"] + k_adx * labels_std["&-future_adx"]
future_adx_threshold = max(future_adx_threshold, 20)
future_adx_threshold = min(future_adx_threshold, 35)
market_state = "trending" if ((trade.dataframe["adx"].iloc[-1] > 25) or (trade.dataframe["&-future_adx"].iloc[-1] > future_adx_threshold)) and \
(trade.dataframe["ema200_slope"].iloc[-1] > 0) and \
(trade.dataframe["atr"].iloc[-1] > trade.dataframe["atr_median"].iloc[-1]) else "ranging"
# 最小持仓时间
@ -351,8 +441,15 @@ class FreqaiPrimer(IStrategy):
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float,
time_in_force: str, current_time: datetime, **kwargs) -> bool:
# 确定市场状态
market_state = "trending" if ((self.dataframe["adx"].iloc[-1] > 25) or (self.dataframe["&-future_adx"].iloc[-1] > 25)) and \
(self.dataframe["ema200_slope"].iloc[-1] > 0.0005) and \
labels_mean = self.pair_stats.get(pair, {}).get("labels_mean", {"&-future_adx": 25})
labels_std = self.pair_stats.get(pair, {}).get("labels_std", {"&-future_adx": 1})
k_adx = 0.5 if labels_std["&-future_adx"] <= 9 and labels_std["&-future_adx"] > 5 else (0.3 if labels_std["&-future_adx"] > 9 else 0.7)
future_adx_threshold = labels_mean["&-future_adx"] + k_adx * labels_std["&-future_adx"]
future_adx_threshold = max(future_adx_threshold, 20)
future_adx_threshold = min(future_adx_threshold, 35)
market_state = "trending" if ((self.dataframe["adx"].iloc[-1] > 25) or (self.dataframe["&-future_adx"].iloc[-1] > future_adx_threshold)) and \
(self.dataframe["ema200_slope"].iloc[-1] > 0) and \
(self.dataframe["atr"].iloc[-1] > self.dataframe["atr_median"].iloc[-1]) else "ranging"
# 根据市场状态调整冷却期
@ -374,8 +471,15 @@ class FreqaiPrimer(IStrategy):
rate: float, time_in_force: str, exit_reason: str,
current_time: datetime, **kwargs) -> bool:
# 确定市场状态
market_state = "trending" if ((trade.dataframe["adx"].iloc[-1] > 25) or (trade.dataframe["&-future_adx"].iloc[-1] > 25)) and \
(trade.dataframe["ema200_slope"].iloc[-1] > 0.0005) and \
labels_mean = self.pair_stats.get(pair, {}).get("labels_mean", {"&-future_adx": 25})
labels_std = self.pair_stats.get(pair, {}).get("labels_std", {"&-future_adx": 1})
k_adx = 0.5 if labels_std["&-future_adx"] <= 9 and labels_std["&-future_adx"] > 5 else (0.3 if labels_std["&-future_adx"] > 9 else 0.7)
future_adx_threshold = labels_mean["&-future_adx"] + k_adx * labels_std["&-future_adx"]
future_adx_threshold = max(future_adx_threshold, 20)
future_adx_threshold = min(future_adx_threshold, 35)
market_state = "trending" if ((trade.dataframe["adx"].iloc[-1] > 25) or (trade.dataframe["&-future_adx"].iloc[-1] > future_adx_threshold)) and \
(trade.dataframe["ema200_slope"].iloc[-1] > 0) and \
(trade.dataframe["atr"].iloc[-1] > trade.dataframe["atr_median"].iloc[-1]) else "ranging"
# 调整卖出价格