在 threshold_adjustment 索引中新增字段 referenced_training_timestamp,记录本次调整是基于哪一次具体的训练结果。

This commit is contained in:
zhangkun9038@dingtalk.com 2026-01-19 20:58:59 +08:00
parent 8ba9607f0e
commit b5093d3972

View File

@ -444,61 +444,61 @@ class FreqaiPrimer(IStrategy):
"""
自适应阈值调整根据最新训练的标签分布动态调整当前阈值
逻辑
- 如果上次正样本 > 65%提高阈值让标签更严格
- 如果上次正样本 < 35%降低阈值让标签更宽松
- 如果在 40%-60% 之间保持不变理想状态
Logic:
- If last positive > 65%, increase threshold
- If last positive < 35%, decrease threshold
- If between 40%-60%, keep unchanged
优先级
1. Elasticsearch 读取最新最权威 10 分钟更新
2. Redis 读取如果 ES 不可用
3. 从本地文件读取如果 Redis 也不可用
4. 使用默认值首次训练
Memory Mechanism:
- Only adjust once per training result. If the latest training result has already been used for adjustment,
directly return the result of that adjustment.
Args:
pair: 交易对名称 "BTC/USDT"
default_threshold: 默认阈值百分比 1.5 表示 1.5%
label_name: 标签名称 '&s-entry_signal' '&s-exit_signal'
Returns:
调整后的阈值百分比
Priority:
1. Read from Elasticsearch
2. Read from Redis (if ES unavailable)
3. Read from local file (if Redis unavailable)
4. Use default value (first training)
"""
import json
import os
import glob
try:
# 方案1优先从 Elasticsearch 读取(最新数据)
entry_signal_mean = self._get_label_distribution_from_es(pair, label_name)
# Step 1: Get the latest training distribution and its timestamp
entry_signal_mean, train_ts = self._get_label_distribution_from_es(pair, label_name)
# 方案2如果 ES 没有,从 Redis 读取
# Fallback to Redis
if entry_signal_mean is None:
entry_signal_mean = self._get_label_distribution_from_redis(pair, label_name)
train_ts = "" # Redis currently doesn't store TS
# 方案3如果 Redis 也没有,从本地文件读取
# Fallback to file
if entry_signal_mean is None:
entry_signal_mean = self._get_label_distribution_from_file(pair, label_name)
entry_signal_mean, train_ts = self._get_label_distribution_from_file(pair, label_name)
# 如果都没有,返回默认值(首次训练)
# If still None, return default
if entry_signal_mean is None:
return default_threshold
# === 确定本轮调整的基准阈值 ===
# 优先使用上一轮实际使用的阈值(有记忆),否则退回到配置中的默认阈值
# Step 2: Get the latest adjustment and its referenced training timestamp
base_threshold = default_threshold
last_threshold = self._get_last_threshold_from_es(pair, label_name)
last_threshold, ref_ts = self._get_last_threshold_from_es(pair, label_name)
# Step 3: Check if we have already adjusted for this training result
if train_ts and ref_ts == train_ts:
# Already adjusted for this training, return the previous result directly
if last_threshold is not None:
# self.strategy_log(f"[{pair}] {label_name} 已针对训练 {train_ts} 调整过,直接复用: {last_threshold:.4f}")
return last_threshold
# Step 4: Determine the base for adjustment
if last_threshold is not None:
base_threshold = last_threshold
# 自适应调整逻辑(以 base_threshold 为基准,形成有记忆的收敛轨迹)
return self._calculate_adjusted_threshold(pair, entry_signal_mean, base_threshold, label_name)
# Step 5: Calculate and log new adjustment
return self._calculate_adjusted_threshold(pair, entry_signal_mean, base_threshold, label_name, training_timestamp=train_ts)
except Exception as e:
# 出错时使用默认值
logger.warning(f"[{pair}] 自适应阈值调整失败: {e},使用默认值 {default_threshold}")
return default_threshold
def _get_label_distribution_from_es(self, pair: str, label_name: str) -> float | None:
def _get_label_distribution_from_es(self, pair: str, label_name: str) -> tuple[float | None, str]:
"""
Elasticsearch 读取该币对最新的标签分布
@ -507,13 +507,13 @@ class FreqaiPrimer(IStrategy):
label_name: 标签名称 "&s-entry_signal"
Returns:
标签均值如果读取失败则返回 None
(标签均值, 记录时间戳)如果读取失败则返回 (None, "")
"""
try:
es_config = get_es_config(self.config)
if not es_config['enabled']:
logger.debug(f"[{pair}] ES 未启用,跳过从 ES 读取标签分布")
return None
return None, ""
# 获取当前年月(用于确定索引)
now = datetime.datetime.now(datetime.timezone.utc)
@ -540,17 +540,19 @@ class FreqaiPrimer(IStrategy):
if response.status_code == 200:
result = response.json()
if result['hits']['total']['value'] > 0:
labels_mean = result['hits']['hits'][0]['_source'].get('labels_mean', {})
source = result['hits']['hits'][0]['_source']
labels_mean = source.get('labels_mean', {})
timestamp = source.get('@timestamp', '')
value = labels_mean.get(label_name)
if value is not None:
logger.info(f"✅ [{pair}] 从 ES 成功读取标签分布: {label_name}={value:.3f}")
return value
logger.info(f"✅ [{pair}] 从 ES 成功读取标签分布: {label_name}={value:.3f} (TS: {timestamp})")
return value, timestamp
else:
logger.warning(f"⚠️ [{pair}] ES 记录中缺少标签 {label_name}")
return None
return None, timestamp
else:
logger.warning(f"⚠️ [{pair}] ES 中未找到训练记录(索引: {index_name}")
return None
return None, ""
else:
logger.error(
f"❌ [{pair}] ES 查询失败 | "
@ -558,17 +560,17 @@ class FreqaiPrimer(IStrategy):
f"URL: {es_url} | "
f"响应: {response.text[:200]}"
)
return None
return None, ""
except requests.exceptions.Timeout:
logger.error(f"❌ [{pair}] ES 查询超时(>3秒| URL: {es_config.get('url', 'N/A')}")
return None
return None, ""
except requests.exceptions.ConnectionError as e:
logger.error(f"❌ [{pair}] ES 连接失败 | URL: {es_config.get('url', 'N/A')} | 错误: {e}")
return None
return None, ""
except Exception as e:
logger.error(f"❌ [{pair}] ES 读取异常 | 错误类型: {type(e).__name__} | 详情: {e}")
return None
return None, ""
def _log_threshold_to_es(
self,
@ -578,7 +580,8 @@ class FreqaiPrimer(IStrategy):
new_threshold: float,
positive_ratio_current: float,
adjustment_type: str,
capped: bool
capped: bool,
training_timestamp: str = ""
) -> None:
"""
记录阈值调整到 Elasticsearch用于历史追踪与曲线分析
@ -591,6 +594,7 @@ class FreqaiPrimer(IStrategy):
positive_ratio_current: 当前正样本比例
adjustment_type: 调整类型 "轻微偏低"
capped: 是否被限幅
training_timestamp: 所依据的训练记录时间戳
"""
try:
es_config = get_es_config(self.config)
@ -617,6 +621,7 @@ class FreqaiPrimer(IStrategy):
"container_name": get_container_name(self.config),
"pair": pair,
"label_name": label_name,
"referenced_training_timestamp": training_timestamp,
# 阈值变化
"threshold": {
@ -663,19 +668,19 @@ class FreqaiPrimer(IStrategy):
# 静默失败,不影响策略执行
logger.debug(f"[{pair}] 阈值调整记录写入 ES 异常: {e}")
def _get_last_threshold_from_es(self, pair: str, label_name: str) -> float | None:
def _get_last_threshold_from_es(self, pair: str, label_name: str) -> tuple[float | None, str]:
"""
Elasticsearch 读取该币对该标签最近一次实际使用的阈值有记忆的基准
数据来源freqai.livelog.threshold_adjustment-YYYY.MM 索引
读取逻辑
- @timestamp 倒序查询 1
- 返回 threshold.new 作为下一轮调整的 old 基准
Data source: freqai.livelog.threshold_adjustment-YYYY.MM index
Logic:
- Query 1 record sorted by @timestamp desc
- Return (threshold.new, referenced_training_timestamp)
"""
try:
es_config = get_es_config(self.config)
if not es_config['enabled']:
return None
return None, ""
now = datetime.datetime.now(datetime.timezone.utc)
year_month = now.strftime('%Y.%m')
@ -704,21 +709,24 @@ class FreqaiPrimer(IStrategy):
)
if response.status_code != 200:
return None
return None, ""
result = response.json()
if result['hits']['total']['value'] == 0:
return None
return None, ""
source = result['hits']['hits'][0]['_source']
threshold_info = source.get("threshold", {})
last_value = threshold_info.get("new")
referenced_ts = source.get("referenced_training_timestamp", "")
if last_value is None:
return None
logger.info(f"📖 [{pair}] 从 ES 读取历史阈值: {label_name} 上一轮={last_value:.4f}")
return float(last_value)
return None, ""
logger.info(f"📖 [{pair}] 从 ES 读取历史阈值: {label_name} 上一轮={last_value:.4f} (Ref TS: {referenced_ts})")
return float(last_value), referenced_ts
except Exception as e:
# 出错时直接忽略,使用默认阈值
# Error ignored, use default
logger.debug(f"[{pair}] 从 ES 读取历史阈值失败: {e}")
return None
return None, ""
def _get_label_distribution_from_redis(self, pair: str, label_name: str) -> float | None:
"""从 Redis 读取标签分布"""
@ -738,11 +746,12 @@ class FreqaiPrimer(IStrategy):
logger.debug(f"[{pair}] 无法从 Redis 读取: {e}")
return None
def _get_label_distribution_from_file(self, pair: str, label_name: str) -> float | None:
def _get_label_distribution_from_file(self, pair: str, label_name: str) -> tuple[float | None, str]:
"""从本地文件读取标签分布"""
import json
import os
import glob
from datetime import datetime
try:
# 获取 FreqAI 模型目录
@ -755,15 +764,17 @@ class FreqaiPrimer(IStrategy):
subdirs = glob.glob(pattern)
if not subdirs:
return None
return None, ""
# 获取最新的训练目录(按时间戳排序)
latest_dir = max(subdirs, key=os.path.getmtime)
dir_mtime = os.path.getmtime(latest_dir)
timestamp_str = datetime.fromtimestamp(dir_mtime).strftime('%Y-%m-%dT%H:%M:%S')
# 查找 metadata.json 文件
metadata_files = glob.glob(os.path.join(latest_dir, "*_metadata.json"))
if not metadata_files:
return None
return None, timestamp_str
metadata_file = metadata_files[0]
@ -771,13 +782,14 @@ class FreqaiPrimer(IStrategy):
with open(metadata_file, 'r') as f:
metadata = json.load(f)
return metadata.get('labels_mean', {}).get(label_name, None)
value = metadata.get('labels_mean', {}).get(label_name, None)
return value, timestamp_str
except Exception as e:
logger.debug(f"[{pair}] 无法从文件读取: {e}")
return None
return None, ""
def _calculate_adjusted_threshold(self, pair: str, entry_signal_mean: float, default_threshold: float, label_name: str = '&s-entry_signal') -> float:
def _calculate_adjusted_threshold(self, pair: str, entry_signal_mean: float, default_threshold: float, label_name: str = '&s-entry_signal', training_timestamp: str = "") -> float:
"""
计算调整后的阈值带防震荡机制
@ -849,7 +861,8 @@ class FreqaiPrimer(IStrategy):
new_threshold=new_threshold,
positive_ratio_current=entry_signal_mean,
adjustment_type=adjustment_type,
capped=bool(capped)
capped=bool(capped),
training_timestamp=training_timestamp
)
return new_threshold