diff --git a/freqtrade/templates/freqaiprimer.py b/freqtrade/templates/freqaiprimer.py index b38a5b9a..f50f7759 100644 --- a/freqtrade/templates/freqaiprimer.py +++ b/freqtrade/templates/freqaiprimer.py @@ -444,61 +444,61 @@ class FreqaiPrimer(IStrategy): """ 自适应阈值调整:根据最新训练的标签分布,动态调整当前阈值 - 逻辑: - - 如果上次正样本 > 65%,提高阈值(让标签更严格) - - 如果上次正样本 < 35%,降低阈值(让标签更宽松) - - 如果在 40%-60% 之间,保持不变(理想状态) + Logic: + - If last positive > 65%, increase threshold + - If last positive < 35%, decrease threshold + - If between 40%-60%, keep unchanged - 优先级: - 1. 从 Elasticsearch 读取(最新、最权威,每 10 分钟更新) - 2. 从 Redis 读取(如果 ES 不可用) - 3. 从本地文件读取(如果 Redis 也不可用) - 4. 使用默认值(首次训练) + Memory Mechanism: + - Only adjust once per training result. If the latest training result has already been used for adjustment, + directly return the result of that adjustment. - Args: - pair: 交易对名称(如 "BTC/USDT") - default_threshold: 默认阈值(百分比,如 1.5 表示 1.5%) - label_name: 标签名称(如 '&s-entry_signal' 或 '&s-exit_signal') - - Returns: - 调整后的阈值(百分比) + Priority: + 1. Read from Elasticsearch + 2. Read from Redis (if ES unavailable) + 3. Read from local file (if Redis unavailable) + 4. Use default value (first training) """ - import json - import os - import glob - try: - # 方案1:优先从 Elasticsearch 读取(最新数据) - entry_signal_mean = self._get_label_distribution_from_es(pair, label_name) + # Step 1: Get the latest training distribution and its timestamp + entry_signal_mean, train_ts = self._get_label_distribution_from_es(pair, label_name) - # 方案2:如果 ES 没有,从 Redis 读取 + # Fallback to Redis if entry_signal_mean is None: entry_signal_mean = self._get_label_distribution_from_redis(pair, label_name) + train_ts = "" # Redis currently doesn't store TS - # 方案3:如果 Redis 也没有,从本地文件读取 + # Fallback to file if entry_signal_mean is None: - entry_signal_mean = self._get_label_distribution_from_file(pair, label_name) + entry_signal_mean, train_ts = self._get_label_distribution_from_file(pair, label_name) - # 如果都没有,返回默认值(首次训练) + # If still None, return default if entry_signal_mean is None: return default_threshold - # === 确定本轮调整的基准阈值 === - # 优先使用上一轮实际使用的阈值(有记忆),否则退回到配置中的默认阈值 + # Step 2: Get the latest adjustment and its referenced training timestamp base_threshold = default_threshold - last_threshold = self._get_last_threshold_from_es(pair, label_name) + last_threshold, ref_ts = self._get_last_threshold_from_es(pair, label_name) + + # Step 3: Check if we have already adjusted for this training result + if train_ts and ref_ts == train_ts: + # Already adjusted for this training, return the previous result directly + if last_threshold is not None: + # self.strategy_log(f"[{pair}] {label_name} 已针对训练 {train_ts} 调整过,直接复用: {last_threshold:.4f}") + return last_threshold + + # Step 4: Determine the base for adjustment if last_threshold is not None: base_threshold = last_threshold - # 自适应调整逻辑(以 base_threshold 为基准,形成有记忆的收敛轨迹) - return self._calculate_adjusted_threshold(pair, entry_signal_mean, base_threshold, label_name) + # Step 5: Calculate and log new adjustment + return self._calculate_adjusted_threshold(pair, entry_signal_mean, base_threshold, label_name, training_timestamp=train_ts) except Exception as e: - # 出错时使用默认值 logger.warning(f"[{pair}] 自适应阈值调整失败: {e},使用默认值 {default_threshold}") return default_threshold - def _get_label_distribution_from_es(self, pair: str, label_name: str) -> float | None: + def _get_label_distribution_from_es(self, pair: str, label_name: str) -> tuple[float | None, str]: """ 从 Elasticsearch 读取该币对最新的标签分布 @@ -507,13 +507,13 @@ class FreqaiPrimer(IStrategy): label_name: 标签名称(如 "&s-entry_signal") Returns: - 标签均值,如果读取失败则返回 None + (标签均值, 记录时间戳),如果读取失败则返回 (None, "") """ try: es_config = get_es_config(self.config) if not es_config['enabled']: logger.debug(f"[{pair}] ES 未启用,跳过从 ES 读取标签分布") - return None + return None, "" # 获取当前年月(用于确定索引) now = datetime.datetime.now(datetime.timezone.utc) @@ -540,17 +540,19 @@ class FreqaiPrimer(IStrategy): if response.status_code == 200: result = response.json() if result['hits']['total']['value'] > 0: - labels_mean = result['hits']['hits'][0]['_source'].get('labels_mean', {}) + source = result['hits']['hits'][0]['_source'] + labels_mean = source.get('labels_mean', {}) + timestamp = source.get('@timestamp', '') value = labels_mean.get(label_name) if value is not None: - logger.info(f"✅ [{pair}] 从 ES 成功读取标签分布: {label_name}={value:.3f}") - return value + logger.info(f"✅ [{pair}] 从 ES 成功读取标签分布: {label_name}={value:.3f} (TS: {timestamp})") + return value, timestamp else: logger.warning(f"⚠️ [{pair}] ES 记录中缺少标签 {label_name}") - return None + return None, timestamp else: logger.warning(f"⚠️ [{pair}] ES 中未找到训练记录(索引: {index_name})") - return None + return None, "" else: logger.error( f"❌ [{pair}] ES 查询失败 | " @@ -558,17 +560,17 @@ class FreqaiPrimer(IStrategy): f"URL: {es_url} | " f"响应: {response.text[:200]}" ) - return None + return None, "" except requests.exceptions.Timeout: logger.error(f"❌ [{pair}] ES 查询超时(>3秒)| URL: {es_config.get('url', 'N/A')}") - return None + return None, "" except requests.exceptions.ConnectionError as e: logger.error(f"❌ [{pair}] ES 连接失败 | URL: {es_config.get('url', 'N/A')} | 错误: {e}") - return None + return None, "" except Exception as e: logger.error(f"❌ [{pair}] ES 读取异常 | 错误类型: {type(e).__name__} | 详情: {e}") - return None + return None, "" def _log_threshold_to_es( self, @@ -578,7 +580,8 @@ class FreqaiPrimer(IStrategy): new_threshold: float, positive_ratio_current: float, adjustment_type: str, - capped: bool + capped: bool, + training_timestamp: str = "" ) -> None: """ 记录阈值调整到 Elasticsearch(用于历史追踪与曲线分析) @@ -591,6 +594,7 @@ class FreqaiPrimer(IStrategy): positive_ratio_current: 当前正样本比例 adjustment_type: 调整类型(如 "轻微偏低") capped: 是否被限幅 + training_timestamp: 所依据的训练记录时间戳 """ try: es_config = get_es_config(self.config) @@ -617,6 +621,7 @@ class FreqaiPrimer(IStrategy): "container_name": get_container_name(self.config), "pair": pair, "label_name": label_name, + "referenced_training_timestamp": training_timestamp, # 阈值变化 "threshold": { @@ -663,19 +668,19 @@ class FreqaiPrimer(IStrategy): # 静默失败,不影响策略执行 logger.debug(f"[{pair}] 阈值调整记录写入 ES 异常: {e}") - def _get_last_threshold_from_es(self, pair: str, label_name: str) -> float | None: + def _get_last_threshold_from_es(self, pair: str, label_name: str) -> tuple[float | None, str]: """ 从 Elasticsearch 读取该币对该标签最近一次实际使用的阈值(有记忆的基准) - 数据来源:freqai.livelog.threshold_adjustment-YYYY.MM 索引 - 读取逻辑: - - 按 @timestamp 倒序查询 1 条 - - 返回 threshold.new 作为下一轮调整的 old 基准 + Data source: freqai.livelog.threshold_adjustment-YYYY.MM index + Logic: + - Query 1 record sorted by @timestamp desc + - Return (threshold.new, referenced_training_timestamp) """ try: es_config = get_es_config(self.config) if not es_config['enabled']: - return None + return None, "" now = datetime.datetime.now(datetime.timezone.utc) year_month = now.strftime('%Y.%m') @@ -704,21 +709,24 @@ class FreqaiPrimer(IStrategy): ) if response.status_code != 200: - return None + return None, "" result = response.json() if result['hits']['total']['value'] == 0: - return None + return None, "" source = result['hits']['hits'][0]['_source'] threshold_info = source.get("threshold", {}) last_value = threshold_info.get("new") + referenced_ts = source.get("referenced_training_timestamp", "") + if last_value is None: - return None - logger.info(f"📖 [{pair}] 从 ES 读取历史阈值: {label_name} 上一轮={last_value:.4f}") - return float(last_value) + return None, "" + + logger.info(f"📖 [{pair}] 从 ES 读取历史阈值: {label_name} 上一轮={last_value:.4f} (Ref TS: {referenced_ts})") + return float(last_value), referenced_ts except Exception as e: - # 出错时直接忽略,使用默认阈值 + # Error ignored, use default logger.debug(f"[{pair}] 从 ES 读取历史阈值失败: {e}") - return None + return None, "" def _get_label_distribution_from_redis(self, pair: str, label_name: str) -> float | None: """从 Redis 读取标签分布""" @@ -738,11 +746,12 @@ class FreqaiPrimer(IStrategy): logger.debug(f"[{pair}] 无法从 Redis 读取: {e}") return None - def _get_label_distribution_from_file(self, pair: str, label_name: str) -> float | None: + def _get_label_distribution_from_file(self, pair: str, label_name: str) -> tuple[float | None, str]: """从本地文件读取标签分布""" import json import os import glob + from datetime import datetime try: # 获取 FreqAI 模型目录 @@ -755,15 +764,17 @@ class FreqaiPrimer(IStrategy): subdirs = glob.glob(pattern) if not subdirs: - return None + return None, "" # 获取最新的训练目录(按时间戳排序) latest_dir = max(subdirs, key=os.path.getmtime) + dir_mtime = os.path.getmtime(latest_dir) + timestamp_str = datetime.fromtimestamp(dir_mtime).strftime('%Y-%m-%dT%H:%M:%S') # 查找 metadata.json 文件 metadata_files = glob.glob(os.path.join(latest_dir, "*_metadata.json")) if not metadata_files: - return None + return None, timestamp_str metadata_file = metadata_files[0] @@ -771,13 +782,14 @@ class FreqaiPrimer(IStrategy): with open(metadata_file, 'r') as f: metadata = json.load(f) - return metadata.get('labels_mean', {}).get(label_name, None) + value = metadata.get('labels_mean', {}).get(label_name, None) + return value, timestamp_str except Exception as e: logger.debug(f"[{pair}] 无法从文件读取: {e}") - return None + return None, "" - def _calculate_adjusted_threshold(self, pair: str, entry_signal_mean: float, default_threshold: float, label_name: str = '&s-entry_signal') -> float: + def _calculate_adjusted_threshold(self, pair: str, entry_signal_mean: float, default_threshold: float, label_name: str = '&s-entry_signal', training_timestamp: str = "") -> float: """ 计算调整后的阈值(带防震荡机制) @@ -849,7 +861,8 @@ class FreqaiPrimer(IStrategy): new_threshold=new_threshold, positive_ratio_current=entry_signal_mean, adjustment_type=adjustment_type, - capped=bool(capped) + capped=bool(capped), + training_timestamp=training_timestamp ) return new_threshold