From cb21bde3270472d9661a89286d597edd00a0c582 Mon Sep 17 00:00:00 2001 From: "zhangkun9038@dingtalk.com" Date: Sun, 8 Feb 2026 03:17:57 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E5=A4=A7=E5=8F=91=E7=8E=B0=EF=BC=9AML?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E6=96=B0=E9=B2=9C=E5=BA=A6=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- freqtrade/templates/freqaiprimer.py | 219 ++++++++++++++++++++++++++-- 1 file changed, 204 insertions(+), 15 deletions(-) diff --git a/freqtrade/templates/freqaiprimer.py b/freqtrade/templates/freqaiprimer.py index deec5dd5..89605464 100644 --- a/freqtrade/templates/freqaiprimer.py +++ b/freqtrade/templates/freqaiprimer.py @@ -48,6 +48,20 @@ class FreqaiPrimer(IStrategy): # 冷启动保护:记录策略启动时间 self._strategy_start_time = datetime.datetime.now(datetime.timezone.utc) + + # ========== 数据新鲜度监控统计 ========== + # 用于量化和诊断数据延迟问题的严重性 + self._freshness_stats = { + 'total_checks': 0, # 总检查次数 + 'fresh_count': 0, # 新鲜数据次数 (< 10分钟) + 'warning_count': 0, # 警告次数 (10-30分钟) + 'stale_count': 0, # 过期次数 (> 30分钟) + 'max_delay_minutes': 0.0, # 最大延迟时间 + 'total_delay_minutes': 0.0, # 累计延迟时间(用于计算平均值) + 'pair_delays': {}, # 每个币对的延迟统计 {pair: {'count': N, 'avg_delay': M, 'max_delay': X}} + 'last_report_time': None, # 上次报告时间 + } + self._freshness_report_interval = 3600 # 每小时输出一次统计报告 def strategy_log(self, message: str, level: str = "info") -> None: @@ -63,6 +77,122 @@ class FreqaiPrimer(IStrategy): logger.error(message) else: logger.info(message) + + def update_freshness_stats(self, pair: str, data_age_minutes: float) -> None: + """ + 更新数据新鲜度统计 + :param pair: 交易对 + :param data_age_minutes: 数据延迟时间(分钟) + """ + stats = self._freshness_stats + stats['total_checks'] += 1 + stats['total_delay_minutes'] += data_age_minutes + stats['max_delay_minutes'] = max(stats['max_delay_minutes'], data_age_minutes) + + # 分类统计 + if data_age_minutes <= 10: + stats['fresh_count'] += 1 + elif data_age_minutes <= 30: + stats['warning_count'] += 1 + else: + stats['stale_count'] += 1 + + # 每个币对的统计 + if pair not in stats['pair_delays']: + stats['pair_delays'][pair] = { + 'count': 0, + 'total_delay': 0.0, + 'max_delay': 0.0, + 'fresh': 0, + 'warning': 0, + 'stale': 0 + } + + pair_stat = stats['pair_delays'][pair] + pair_stat['count'] += 1 + pair_stat['total_delay'] += data_age_minutes + pair_stat['max_delay'] = max(pair_stat['max_delay'], data_age_minutes) + + if data_age_minutes <= 10: + pair_stat['fresh'] += 1 + elif data_age_minutes <= 30: + pair_stat['warning'] += 1 + else: + pair_stat['stale'] += 1 + + def report_freshness_stats(self, force: bool = False) -> None: + """ + 输出数据新鲜度统计报告 + :param force: 强制输出(忽略时间间隔) + """ + from datetime import datetime, timezone + + current_time = datetime.now(timezone.utc) + stats = self._freshness_stats + + # 检查是否需要报告 + if not force and stats['last_report_time'] is not None: + elapsed = (current_time - stats['last_report_time']).total_seconds() + if elapsed < self._freshness_report_interval: + return + + # 如果没有数据,不输出 + if stats['total_checks'] == 0: + return + + # 计算统计指标 + avg_delay = stats['total_delay_minutes'] / stats['total_checks'] + fresh_rate = stats['fresh_count'] / stats['total_checks'] * 100 + warning_rate = stats['warning_count'] / stats['total_checks'] * 100 + stale_rate = stats['stale_count'] / stats['total_checks'] * 100 + + # 输出总体报告 + self.strategy_log( + f"\n" + f"========================================\n" + f" 📊 数据新鲜度统计报告\n" + f"========================================\n" + f"总检查次数: {stats['total_checks']}\n" + f"新鲜数据 (<10min): {stats['fresh_count']} ({fresh_rate:.1f}%)\n" + f"警告数据 (10-30min): {stats['warning_count']} ({warning_rate:.1f}%)\n" + f"过期数据 (>30min): {stats['stale_count']} ({stale_rate:.1f}%)\n" + f"平均延迟: {avg_delay:.2f} 分钟\n" + f"最大延迟: {stats['max_delay_minutes']:.2f} 分钟\n" + f"========================================", + level="info" + ) + + # 输出每个币对的详细统计(按过期率排序) + if stats['pair_delays']: + pair_stats_list = [] + for pair, pair_stat in stats['pair_delays'].items(): + avg_pair_delay = pair_stat['total_delay'] / pair_stat['count'] + stale_rate_pair = pair_stat['stale'] / pair_stat['count'] * 100 + pair_stats_list.append({ + 'pair': pair, + 'avg_delay': avg_pair_delay, + 'max_delay': pair_stat['max_delay'], + 'stale_rate': stale_rate_pair, + 'count': pair_stat['count'] + }) + + # 按过期率排序(从高到低) + pair_stats_list.sort(key=lambda x: x['stale_rate'], reverse=True) + + report_lines = ["📈 各币对数据新鲜度排名(按过期率降序):"] + for i, ps in enumerate(pair_stats_list[:10], 1): # 只显示前10个最差的 + report_lines.append( + f" {i}. {ps['pair']:15s} | " + f"平均: {ps['avg_delay']:5.1f}min | " + f"最大: {ps['max_delay']:5.1f}min | " + f"过期率: {ps['stale_rate']:5.1f}% | " + f"检查: {ps['count']}次" + ) + + self.strategy_log("\n".join(report_lines), level="info") + + # 更新报告时间 + stats['last_report_time'] = current_time # 只用于adjust_trade_position方法的波动系数获取 @@ -948,22 +1078,81 @@ class FreqaiPrimer(IStrategy): df, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) if len(df) > 0: last_row = df.iloc[-1] - entry_prob = None - # 优先使用 FreqAI 的 entry_signal 预测列 - if '&s-entry_signal' in df.columns: - entry_prob = float(last_row['&s-entry_signal']) - elif '&-entry_signal_prob' in df.columns: - entry_prob = float(last_row['&-entry_signal_prob']) - elif '&-s-entry_signal_prob' in df.columns: - entry_prob = float(last_row['&-s-entry_signal_prob']) - elif '&-entry_signal' in df.columns: - val = last_row['&-entry_signal'] - if isinstance(val, (int, float)): - entry_prob = float(val) - else: - # 文本标签时,简单映射为 0/1 - entry_prob = 1.0 if str(val).lower() in ['entry', 'buy', '1'] else 0.0 + # ========== 关键修复:数据新鲜度检查 ========== + # 检查 ML 预测数据是否过期,避免使用过时的预测结果 + # 注意:只在 Live/Dryrun 模式下检查,回测模式不需要(数据总是同步的) + freshness_rejected = False + if self.dp.runmode.value in ('live', 'dry_run'): + data_timestamp = last_row.get('date') + if data_timestamp is not None: + # 确保 data_timestamp 是 datetime 对象 + if not isinstance(data_timestamp, datetime): + try: + from pandas import Timestamp + if isinstance(data_timestamp, Timestamp): + data_timestamp = data_timestamp.to_pydatetime() + except: + pass + + # 计算数据延迟(分钟) + if isinstance(data_timestamp, datetime): + # 处理时区一致性 + if data_timestamp.tzinfo is None and current_time.tzinfo is not None: + # 如果 data_timestamp 没有时区,假设为 UTC + import pytz + data_timestamp = pytz.utc.localize(data_timestamp) + elif data_timestamp.tzinfo is not None and current_time.tzinfo is None: + # 如果 current_time 没有时区,假设为 UTC + import pytz + current_time = pytz.utc.localize(current_time) + + data_age_minutes = (current_time - data_timestamp).total_seconds() / 60 + + # 📊 更新统计数据 + self.update_freshness_stats(pair, data_age_minutes) + + # 📈 定期输出统计报告(每小时) + self.report_freshness_stats() + + # 如果数据超过 30 分钟,拒绝入场 + if data_age_minutes > 30: + rejected_conditions.append( + f"数据过期: 最新数据时间 {data_timestamp}, " + f"距离现在 {data_age_minutes:.1f} 分钟,ML 预测已失效" + ) + self.strategy_log( + f"[数据新鲜度] [{pair}] ❌ {rejected_conditions[-1]}" + ) + freshness_rejected = True + elif data_age_minutes > 10: + # 10-30分钟:警告但允许入场 + self.strategy_log( + f"[数据新鲜度] [{pair}] ⚠️ 数据较旧: 最新数据时间 {data_timestamp}, " + f"距离现在 {data_age_minutes:.1f} 分钟,警告但允许入场" + ) + # ========== 数据新鲜度检查结束 ========== + + if freshness_rejected: + # 数据过期,直接拒绝,不再进行后续检查 + pass + else: + entry_prob = None + + # 优先使用 FreqAI 的 entry_signal 预测列 + if '&s-entry_signal' in df.columns: + entry_prob = float(last_row['&s-entry_signal']) + elif '&-entry_signal_prob' in df.columns: + entry_prob = float(last_row['&-entry_signal_prob']) + elif '&-s-entry_signal_prob' in df.columns: + entry_prob = float(last_row['&-s-entry_signal_prob']) + elif '&-entry_signal' in df.columns: + val = last_row['&-entry_signal'] + if isinstance(val, (int, float)): + entry_prob = float(val) + else: + # 文本标签时,简单映射为 0/1 + entry_prob = 1.0 if str(val).lower() in ['entry', 'buy', '1'] else 0.0 # ========== 新增:入场诊断统计 ========== # 统计当前入场点的关键指标,用于分析"买在高位"问题