从metadata.json里读取labels_mean和labels_std

This commit is contained in:
zhangkun9038@dingtalk.com 2025-06-01 01:50:26 +00:00
parent 9c64956c6b
commit 7984a75440

View File

@ -1,6 +1,9 @@
import logging
import numpy as np
import datetime
import os
import json
import glob
from functools import reduce
from freqtrade.persistence import Trade
import talib.abstract as ta
@ -10,7 +13,7 @@ from freqtrade.strategy import IStrategy
logger = logging.getLogger(__name__)
class FreqaiPrimer(IStrategy):
class ValueDivergencePrimerRegression(IStrategy):
minimal_roi = {
"0": 0.02,
"30": 0.01,
@ -48,7 +51,7 @@ class FreqaiPrimer(IStrategy):
}
freqai_info = {
"identifier": "divergence_model",
"identifier": "divergence_model", # 注意:实际目录是 test58可能需要调整
"model": "LightGBMRegressor",
"feature_parameters": {
"include_timeframes": ["3m", "15m", "1h"],
@ -138,6 +141,7 @@ class FreqaiPrimer(IStrategy):
logger.error(f"[{metadata['pair']}] FreqAI 未初始化,请确保回测命令中启用了 --freqai")
dataframe["&-price_value_divergence"] = dataframe["price_value_divergence"]
else:
logger.debug(f"self.freqai 类型:{type(self.freqai)}")
dataframe = self.freqai.start(dataframe, metadata, self)
if "&-price_value_divergence" not in dataframe.columns:
@ -158,27 +162,47 @@ class FreqaiPrimer(IStrategy):
for col in ["ema200", "bb_upperband", "bb_middleband", "bb_lowerband", "rsi", "volume_z_score", "&-price_value_divergence", "price_value_divergence"]:
dataframe[col] = dataframe[col].replace([np.inf, -np.inf], 0).ffill().fillna(0)
# 直接从 FreqAI 获取 labels_mean 和 labels_std
# 获取 labels_mean 和 labels_std
pair = metadata["pair"]
if hasattr(self, 'freqai') and self.freqai is not None and pair in self.freqai.data:
# 从 self.freqai.data 中获取 labels_mean 和 labels_std
if "labels_mean" in self.freqai.data and "&-price_value_divergence" in self.freqai.data["labels_mean"]:
labels_mean = self.freqai.data["labels_mean"]["&-price_value_divergence"]
labels_std = self.freqai.data["labels_std"]["&-price_value_divergence"]
labels_mean = None
labels_std = None
# 调试:打印 freqai_info 和 user_data_dir
logger.debug(f"freqai_info identifier{self.freqai_info['identifier']}")
logger.debug(f"user_data_dir{self.config['user_data_dir']}")
# 从最新的子目录读取 metadata.json
try:
# 优先使用实际目录名 test58硬编码临时解决方案
# model_base_dir = "/freqtrade/user_data/models/test58"
# 如果需要动态获取(推荐),取消注释以下代码:
model_base_dir = os.path.join(self.config["user_data_dir"], "models", self.freqai_info["identifier"])
pair_clean = pair.replace('/', '_') # 将 "KAITO/USDT" 转换为 "KAITO_USDT"
sub_dirs = glob.glob(os.path.join(model_base_dir, f"sub-train-{pair_clean}_*"))
if not sub_dirs:
logger.warning(f"[{pair}] 未找到任何子目录:{model_base_dir}/sub-train-{pair_clean}_*")
else:
logger.warning(f"[{pair}] 无法从 FreqAI 获取 labels_mean 和 labels_std重新计算")
# 回退:重新计算真实目标值
dataframe["&-price_value_divergence_actual"] = (dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"]
dataframe["&-price_value_divergence_actual"] = dataframe["&-price_value_divergence_actual"].replace([np.inf, -np.inf], 0).ffill().fillna(0)
recent_data = dataframe["&-price_value_divergence_actual"].tail(self.freqai_info["fit_live_predictions_candles"])
labels_mean = recent_data.mean()
labels_std = recent_data.std()
if np.isnan(labels_std) or labels_std == 0:
labels_std = 0.01
logger.warning(f"[{pair}] labels_std 计算异常,使用默认值 0.01")
else:
logger.warning(f"[{pair}] FreqAI 未初始化或数据不可用,重新计算")
# 回退:重新计算真实目标值
# 按时间戳排序,获取最新的子目录
latest_sub_dir = max(sub_dirs, key=lambda x: int(x.split('_')[-1]))
metadata_file = os.path.join(latest_sub_dir, f"cb_{pair_clean.lower()}_{latest_sub_dir.split('_')[-1]}_metadata.json")
if os.path.exists(metadata_file):
with open(metadata_file, "r") as f:
metadata = json.load(f)
labels_mean = metadata["labels_mean"]["&-price_value_divergence"]
labels_std = metadata["labels_std"]["&-price_value_divergence"]
logger.info(f"[{pair}] 从最新子目录 {latest_sub_dir} 读取 labels_mean{labels_mean}, labels_std{labels_std}")
else:
logger.warning(f"[{pair}] 最新的 metadata.json 文件 {metadata_file} 不存在")
except Exception as e:
logger.warning(f"[{pair}] 无法从子目录读取 labels_mean 和 labels_std{e},重新计算")
# 回退:如果无法从文件读取,则重新计算
if labels_mean is None or labels_std is None:
logger.warning(f"[{pair}] 无法获取 labels_mean 和 labels_std重新计算")
dataframe["&-price_value_divergence_actual"] = (dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"]
dataframe["&-price_value_divergence_actual"] = dataframe["&-price_value_divergence_actual"].replace([np.inf, -np.inf], 0).ffill().fillna(0)
recent_data = dataframe["&-price_value_divergence_actual"].tail(self.freqai_info["fit_live_predictions_candles"])
@ -220,92 +244,6 @@ class FreqaiPrimer(IStrategy):
logger.info(f"[{pair}] 动态卖出阈值:{self.sell_threshold:.4f}")
if not self.stats_logged:
logger.info("===== 所有币对的 labels_mean 和 labels_std 汇总 =====")
for p, stats in self.pair_stats.items():
logger.info(f"[{p}] labels_mean{stats['labels_mean']:.4f}, labels_std{stats['labels_std']:.4f}")
logger.info("==============================================")
self.stats_logged = True
logger.info("===== 所有币对的 labels_mean 和 labels_std 汇总...
logger.info(f"[{metadata['pair']}] 指标计算完成,列:{list(dataframe.columns)}")
return dataframe
def populate_entry_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
conditions = []
if "&-price_value_divergence" in df.columns:
buy_condition = (df["&-price_value_divergence"] < self.buy_threshold)
buy_condition &= (df["volume_z_score"] > 1.5)
buy_condition &= (df["rsi"] < 40)
buy_condition &= (df["close"] <= df["bb_lowerband"])
conditions.append(buy_condition)
else:
logger.warning("⚠️ &-price_value_divergence 列缺失,跳过该条件")
if len(conditions) > 0:
df.loc[reduce(lambda x, y: x & y, conditions), 'enter_long'] = 1
logger.info(f"[{metadata['pair']}] 入场信号触发,条件满足")
return df
def populate_exit_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
conditions = []
if "&-price_value_divergence" in df.columns:
sell_condition = (df["&-price_value_divergence"] > self.sell_threshold)
sell_condition |= (df["rsi"] > 75)
conditions.append(sell_condition)
else:
logger.warning("⚠️ &-price_value_divergence 列缺失,跳过该条件")
if len(conditions) > 0:
df.loc[reduce(lambda x, y: x & y, conditions), 'exit_long'] = 1
logger.info(f"[{metadata['pair']}] 出场信号触发,条件满足")
return df
def adjust_trade_position(self, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float,
min_roi: Dict[float, float], max_profit: float):
hold_time = (current_time - trade.open_date_utc).total_seconds() / 60
if hold_time < 15:
logger.info(f"[{trade.pair}] 持仓时间 {hold_time:.1f} 分钟,未达到最小持仓时间 15 分钟,暂不退出")
return None
profit_ratio = (current_rate - trade.open_rate) / trade.open_rate
if profit_ratio >= self.trailing_stop_start and not self.trailing_stop_enabled:
self.trailing_stop_enabled = True
trade.adjust_max_rate(current_rate)
logger.info(f"[{trade.pair}] 价格上涨超过 {self.trailing_stop_start*100:.1f}%,启动 Trailing Stop")
if self.trailing_stop_enabled:
max_rate = trade.max_rate
trailing_stop_price = max_rate * (1 - self.trailing_stop_distance)
if current_rate < trailing_stop_price:
logger.info(f"[{trade.pair}] 价格回落至 Trailing Stop 点 {trailing_stop_price:.6f},触发卖出")
return -1
trade.adjust_max_rate(current_rate)
if hold_time > 30:
logger.info(f"[{trade.pair}] 持仓时间超过30分钟强制平仓")
return -1
return None
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float,
time_in_force: str, current_time: datetime, **kwargs) -> bool:
recent_trades = Trade.get_trades(
pair=pair,
is_open=False,
close_date=current_time - datetime.timedelta(minutes=5)
).all()
if len(recent_trades) > 0:
logger.info(f"[{pair}] 5分钟内有近期交易跳过本次入场")
return False
self.trailing_stop_enabled = False
return True
def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float,
rate: float, time_in_force: str, exit_reason: str,
current_time: datetime, **kwargs) -> bool:
logger.info(f"[{pair}] 退出交易,原因:{exit_reason}, 利润:{trade.calc_profit_ratio(rate):.2%}")
return True
出错了请刷新以重新连接或重试