重大发现:ML数据新鲜度问题

This commit is contained in:
zhangkun9038@dingtalk.com 2026-02-08 03:17:57 +08:00
parent dd596ef593
commit cb21bde327

View File

@ -48,6 +48,20 @@ class FreqaiPrimer(IStrategy):
# 冷启动保护:记录策略启动时间
self._strategy_start_time = datetime.datetime.now(datetime.timezone.utc)
# ========== 数据新鲜度监控统计 ==========
# 用于量化和诊断数据延迟问题的严重性
self._freshness_stats = {
'total_checks': 0, # 总检查次数
'fresh_count': 0, # 新鲜数据次数 (< 10分钟)
'warning_count': 0, # 警告次数 (10-30分钟)
'stale_count': 0, # 过期次数 (> 30分钟)
'max_delay_minutes': 0.0, # 最大延迟时间
'total_delay_minutes': 0.0, # 累计延迟时间(用于计算平均值)
'pair_delays': {}, # 每个币对的延迟统计 {pair: {'count': N, 'avg_delay': M, 'max_delay': X}}
'last_report_time': None, # 上次报告时间
}
self._freshness_report_interval = 3600 # 每小时输出一次统计报告
def strategy_log(self, message: str, level: str = "info") -> None:
@ -63,6 +77,122 @@ class FreqaiPrimer(IStrategy):
logger.error(message)
else:
logger.info(message)
def update_freshness_stats(self, pair: str, data_age_minutes: float) -> None:
"""
更新数据新鲜度统计
:param pair: 交易对
:param data_age_minutes: 数据延迟时间分钟
"""
stats = self._freshness_stats
stats['total_checks'] += 1
stats['total_delay_minutes'] += data_age_minutes
stats['max_delay_minutes'] = max(stats['max_delay_minutes'], data_age_minutes)
# 分类统计
if data_age_minutes <= 10:
stats['fresh_count'] += 1
elif data_age_minutes <= 30:
stats['warning_count'] += 1
else:
stats['stale_count'] += 1
# 每个币对的统计
if pair not in stats['pair_delays']:
stats['pair_delays'][pair] = {
'count': 0,
'total_delay': 0.0,
'max_delay': 0.0,
'fresh': 0,
'warning': 0,
'stale': 0
}
pair_stat = stats['pair_delays'][pair]
pair_stat['count'] += 1
pair_stat['total_delay'] += data_age_minutes
pair_stat['max_delay'] = max(pair_stat['max_delay'], data_age_minutes)
if data_age_minutes <= 10:
pair_stat['fresh'] += 1
elif data_age_minutes <= 30:
pair_stat['warning'] += 1
else:
pair_stat['stale'] += 1
def report_freshness_stats(self, force: bool = False) -> None:
"""
输出数据新鲜度统计报告
:param force: 强制输出忽略时间间隔
"""
from datetime import datetime, timezone
current_time = datetime.now(timezone.utc)
stats = self._freshness_stats
# 检查是否需要报告
if not force and stats['last_report_time'] is not None:
elapsed = (current_time - stats['last_report_time']).total_seconds()
if elapsed < self._freshness_report_interval:
return
# 如果没有数据,不输出
if stats['total_checks'] == 0:
return
# 计算统计指标
avg_delay = stats['total_delay_minutes'] / stats['total_checks']
fresh_rate = stats['fresh_count'] / stats['total_checks'] * 100
warning_rate = stats['warning_count'] / stats['total_checks'] * 100
stale_rate = stats['stale_count'] / stats['total_checks'] * 100
# 输出总体报告
self.strategy_log(
f"\n"
f"========================================\n"
f" 📊 数据新鲜度统计报告\n"
f"========================================\n"
f"总检查次数: {stats['total_checks']}\n"
f"新鲜数据 (<10min): {stats['fresh_count']} ({fresh_rate:.1f}%)\n"
f"警告数据 (10-30min): {stats['warning_count']} ({warning_rate:.1f}%)\n"
f"过期数据 (>30min): {stats['stale_count']} ({stale_rate:.1f}%)\n"
f"平均延迟: {avg_delay:.2f} 分钟\n"
f"最大延迟: {stats['max_delay_minutes']:.2f} 分钟\n"
f"========================================",
level="info"
)
# 输出每个币对的详细统计(按过期率排序)
if stats['pair_delays']:
pair_stats_list = []
for pair, pair_stat in stats['pair_delays'].items():
avg_pair_delay = pair_stat['total_delay'] / pair_stat['count']
stale_rate_pair = pair_stat['stale'] / pair_stat['count'] * 100
pair_stats_list.append({
'pair': pair,
'avg_delay': avg_pair_delay,
'max_delay': pair_stat['max_delay'],
'stale_rate': stale_rate_pair,
'count': pair_stat['count']
})
# 按过期率排序(从高到低)
pair_stats_list.sort(key=lambda x: x['stale_rate'], reverse=True)
report_lines = ["📈 各币对数据新鲜度排名(按过期率降序):"]
for i, ps in enumerate(pair_stats_list[:10], 1): # 只显示前10个最差的
report_lines.append(
f" {i}. {ps['pair']:15s} | "
f"平均: {ps['avg_delay']:5.1f}min | "
f"最大: {ps['max_delay']:5.1f}min | "
f"过期率: {ps['stale_rate']:5.1f}% | "
f"检查: {ps['count']}"
)
self.strategy_log("\n".join(report_lines), level="info")
# 更新报告时间
stats['last_report_time'] = current_time
# 只用于adjust_trade_position方法的波动系数获取
@ -948,22 +1078,81 @@ class FreqaiPrimer(IStrategy):
df, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
if len(df) > 0:
last_row = df.iloc[-1]
entry_prob = None
# 优先使用 FreqAI 的 entry_signal 预测列
if '&s-entry_signal' in df.columns:
entry_prob = float(last_row['&s-entry_signal'])
elif '&-entry_signal_prob' in df.columns:
entry_prob = float(last_row['&-entry_signal_prob'])
elif '&-s-entry_signal_prob' in df.columns:
entry_prob = float(last_row['&-s-entry_signal_prob'])
elif '&-entry_signal' in df.columns:
val = last_row['&-entry_signal']
if isinstance(val, (int, float)):
entry_prob = float(val)
else:
# 文本标签时,简单映射为 0/1
entry_prob = 1.0 if str(val).lower() in ['entry', 'buy', '1'] else 0.0
# ========== 关键修复:数据新鲜度检查 ==========
# 检查 ML 预测数据是否过期,避免使用过时的预测结果
# 注意:只在 Live/Dryrun 模式下检查,回测模式不需要(数据总是同步的)
freshness_rejected = False
if self.dp.runmode.value in ('live', 'dry_run'):
data_timestamp = last_row.get('date')
if data_timestamp is not None:
# 确保 data_timestamp 是 datetime 对象
if not isinstance(data_timestamp, datetime):
try:
from pandas import Timestamp
if isinstance(data_timestamp, Timestamp):
data_timestamp = data_timestamp.to_pydatetime()
except:
pass
# 计算数据延迟(分钟)
if isinstance(data_timestamp, datetime):
# 处理时区一致性
if data_timestamp.tzinfo is None and current_time.tzinfo is not None:
# 如果 data_timestamp 没有时区,假设为 UTC
import pytz
data_timestamp = pytz.utc.localize(data_timestamp)
elif data_timestamp.tzinfo is not None and current_time.tzinfo is None:
# 如果 current_time 没有时区,假设为 UTC
import pytz
current_time = pytz.utc.localize(current_time)
data_age_minutes = (current_time - data_timestamp).total_seconds() / 60
# 📊 更新统计数据
self.update_freshness_stats(pair, data_age_minutes)
# 📈 定期输出统计报告(每小时)
self.report_freshness_stats()
# 如果数据超过 30 分钟,拒绝入场
if data_age_minutes > 30:
rejected_conditions.append(
f"数据过期: 最新数据时间 {data_timestamp}, "
f"距离现在 {data_age_minutes:.1f} 分钟ML 预测已失效"
)
self.strategy_log(
f"[数据新鲜度] [{pair}] ❌ {rejected_conditions[-1]}"
)
freshness_rejected = True
elif data_age_minutes > 10:
# 10-30分钟警告但允许入场
self.strategy_log(
f"[数据新鲜度] [{pair}] ⚠️ 数据较旧: 最新数据时间 {data_timestamp}, "
f"距离现在 {data_age_minutes:.1f} 分钟,警告但允许入场"
)
# ========== 数据新鲜度检查结束 ==========
if freshness_rejected:
# 数据过期,直接拒绝,不再进行后续检查
pass
else:
entry_prob = None
# 优先使用 FreqAI 的 entry_signal 预测列
if '&s-entry_signal' in df.columns:
entry_prob = float(last_row['&s-entry_signal'])
elif '&-entry_signal_prob' in df.columns:
entry_prob = float(last_row['&-entry_signal_prob'])
elif '&-s-entry_signal_prob' in df.columns:
entry_prob = float(last_row['&-s-entry_signal_prob'])
elif '&-entry_signal' in df.columns:
val = last_row['&-entry_signal']
if isinstance(val, (int, float)):
entry_prob = float(val)
else:
# 文本标签时,简单映射为 0/1
entry_prob = 1.0 if str(val).lower() in ['entry', 'buy', '1'] else 0.0
# ========== 新增:入场诊断统计 ==========
# 统计当前入场点的关键指标,用于分析"买在高位"问题