本地加入一级缓存,redis当作二级缓存

This commit is contained in:
zhangkun9038@dingtalk.com 2025-08-15 14:13:44 +08:00
parent 497ff38133
commit 68a34468fe

View File

@ -1125,55 +1125,190 @@ class FreqaiPrimer(IStrategy):
def get_trend_score_with_cache(self, pair: str, timeframe: str, timestamp: int, dataframe: DataFrame, metadata: dict) -> float:
    """
    Fetch the trend score through a three-tier cache hierarchy.

    1. Local in-memory dict (L1) - fastest, no network overhead.
    2. Redis (L2) - shared on the LAN, avoids recomputation across bots.
    3. Live computation (L3) - slowest, but always accurate.

    Parameters:
        pair: trading pair, e.g. "BTC/USDT" ('/' is replaced in the key).
        timeframe: candle timeframe. NOTE(review): not used in the cache
            key — the key uses a fixed multi-timeframe combination; kept
            for interface stability.
        timestamp: candle timestamp, part of the cache key.
        dataframe: candle data forwarded to get_market_trend on a miss.
        metadata: strategy metadata forwarded to get_market_trend.

    Returns:
        The trend score as a float.

    Redis failures are non-fatal: the client is dropped and the method
    degrades to local-cache-plus-compute operation.
    """
    # Lazily initialise the L1 cache and its hit/miss counters.
    if not hasattr(self, '_local_cache'):
        self._local_cache = {}
        self._local_cache_stats = {'hits': 0, 'misses': 0, 'redis_hits': 0, 'computes': 0}
        logger.info("🚀 初始化本地内存缓存")
    # Lazily create the Redis client. Always define the attribute, even
    # when redis_url is unset or connection fails — later reads of
    # self.redis_client must not raise AttributeError.
    if not hasattr(self, 'redis_client'):
        self.redis_client = None
        if self.redis_url:
            try:
                self.redis_client = redis.from_url(self.redis_url)
                logger.info("✅ Redis 客户端已成功初始化")
            except Exception as e:
                logger.error(f"❌ 初始化 Redis 客户端失败: {e}")
                # Degrade to local cache only.
                self.redis_client = None
    # Unified cache key, shared by every instance of this strategy.
    strategy_name = "freqaiprimer"
    timeframes_str = "3m-15m-1h"
    cache_key = f"{strategy_name}|trend_score|{pair.replace('/', '-')}|{timeframes_str}|{timestamp}"
    logger.debug(f"[{pair}] 生成缓存键:{cache_key} (时间戳: {timestamp})")
    # L1: local in-memory cache.
    if cache_key in self._local_cache:
        cached_score = self._local_cache[cache_key]
        self._local_cache_stats['hits'] += 1
        logger.info(f"[{pair}] 🟢 本地缓存命中key={cache_key}, value={cached_score:.2f}")
        return cached_score
    self._local_cache_stats['misses'] += 1
    logger.debug(f"[{pair}] 本地缓存未命中key={cache_key}")
    # L2: Redis.
    if self.redis_client:
        logger.debug(f"[{pair}] 尝试从 Redis 查询趋势得分key={cache_key}")
        try:
            cached_score = self.redis_client.get(cache_key)
            if cached_score is not None:
                score = float(cached_score)
                # Promote to L1 so subsequent lookups skip the network.
                self._local_cache[cache_key] = score
                self._local_cache_stats['redis_hits'] += 1
                logger.info(f"[{pair}] 🟡 Redis 缓存命中key={cache_key}, value={score:.2f} (已同步到本地缓存)")
                return score
            else:
                logger.info(f"[{pair}] 🔴 Redis 缓存未命中key={cache_key}")
        except redis.exceptions.RedisError as e:
            logger.error(f"[{pair}] Redis 查询失败key={cache_key}, 错误: {e}")
            # Drop the broken client; continue with local cache only.
            self.redis_client = None
    # L3: compute for real.
    logger.info(f"[{pair}] 开始计算趋势得分 (时间戳: {timestamp})")
    trend_score = self.get_market_trend(dataframe=dataframe, metadata=metadata)
    self._local_cache_stats['computes'] += 1
    logger.info(f"[{pair}] ✅ 成功计算趋势得分: {trend_score:.2f}")
    # Write-through to both cache tiers (Redis TTL: 1 day).
    self._local_cache[cache_key] = trend_score
    if self.redis_client:
        logger.debug(f"[{pair}] 尝试将趋势得分写入 Rediskey={cache_key}, value={trend_score:.2f}")
        try:
            self.redis_client.setex(cache_key, 86400, trend_score)
            logger.info(f"[{pair}] 成功将趋势得分存储到 Rediskey={cache_key}, value={trend_score:.2f}")
        except redis.exceptions.RedisError as e:
            # Non-fatal: the local cache already holds the value.
            logger.error(f"[{pair}] Redis 写入失败key={cache_key}, 错误: {e}")
    # Periodic effectiveness report. Each request increments exactly one
    # of hits/misses, so hits+misses is the true request count (a miss
    # ALSO bumps redis_hits or computes — summing all four double-counts).
    total_requests = self._local_cache_stats['hits'] + self._local_cache_stats['misses']
    if total_requests > 0 and total_requests % 100 == 0:
        hit_rate = (self._local_cache_stats['hits'] + self._local_cache_stats['redis_hits']) / total_requests * 100
        logger.info(f"📊 缓存统计 - 本地命中率: {self._local_cache_stats['hits']/total_requests*100:.1f}%, "
                    f"Redis命中率: {self._local_cache_stats['redis_hits']/total_requests*100:.1f}%, "
                    f"计算次数: {self._local_cache_stats['computes']}, "
                    f"总命中率: {hit_rate:.1f}%")
    return trend_score
def cleanup_local_cache(self):
    """Flush the in-memory (L1) cache so it cannot grow without bound."""
    if not hasattr(self, '_local_cache'):
        return
    cache_size = len(self._local_cache)
    self._local_cache.clear()
    logger.info(f"🧹 已清理本地缓存,清除条目数: {cache_size}")
def get_cache_stats(self) -> dict:
    """
    Return a snapshot of cache effectiveness counters.

    Returns:
        A dict with raw counters ('local_hits', 'redis_hits', 'misses',
        'computes', 'total_requests') and, when at least one request was
        served, percentage rates ('local_hit_rate', 'redis_hit_rate',
        'overall_hit_rate').

    Note: every request increments exactly one of 'hits'/'misses', and a
    miss additionally increments 'redis_hits' or 'computes'. Therefore
    total_requests = hits + misses; summing all four counters would
    double-count every miss and understate the hit rates.
    """
    stats = getattr(self, '_local_cache_stats', None)
    if stats:
        # hits + misses counts each request exactly once.
        total_requests = stats['hits'] + stats['misses']
        if total_requests > 0:
            served_from_cache = stats['hits'] + stats['redis_hits']
            return {
                'local_hits': stats['hits'],
                'redis_hits': stats['redis_hits'],
                'misses': stats['misses'],
                'computes': stats['computes'],
                'total_requests': total_requests,
                'local_hit_rate': stats['hits'] / total_requests * 100,
                'redis_hit_rate': stats['redis_hits'] / total_requests * 100,
                'overall_hit_rate': served_from_cache / total_requests * 100,
            }
    # Cache never touched (or stats not initialised yet).
    return {'local_hits': 0, 'redis_hits': 0, 'misses': 0, 'computes': 0, 'total_requests': 0}
def bot_loop_start(self, current_time: datetime, **kwargs) -> None:
    """
    Hook invoked at the start of every trading loop.

    Periodically flushes the local L1 cache (every 1000 loops, or as soon
    as it exceeds 500 entries) and logs cache effectiveness statistics.
    """
    # Nothing to manage while the L1 cache is absent or empty.
    if not (hasattr(self, '_local_cache') and self._local_cache):
        return
    cache_size = len(self._local_cache)
    # Lazily create the loop counter on first use.
    if not hasattr(self, '_loop_counter'):
        self._loop_counter = 0
    self._loop_counter += 1
    should_flush = self._loop_counter % 1000 == 0 or cache_size > 500
    if not should_flush:
        return
    logger.info(f"🧹 自动清理本地缓存 - 循环次数: {self._loop_counter}, 缓存大小: {cache_size}")
    self.cleanup_local_cache()
    # Report effectiveness alongside the flush.
    stats = self.get_cache_stats()
    if stats['total_requests'] > 0:
        logger.info(f"📊 缓存性能统计: 本地命中率 {stats['local_hit_rate']:.1f}%, "
                    f"Redis命中率 {stats['redis_hit_rate']:.1f}%, "
                    f"总命中率 {stats['overall_hit_rate']:.1f}%")
def bot_start(self, **kwargs) -> None:
    """
    Hook invoked once when the bot starts.

    Sets up the multi-level cache: the L1 in-memory dict with its hit/miss
    counters (unless another code path created them already), the L1 size
    cap consulted by bot_loop_start, and the loop counter.
    """
    logger.info("🚀 策略启动 - 初始化多级缓存系统")
    if not hasattr(self, '_local_cache'):
        self._local_cache = {}
        self._local_cache_stats = dict(hits=0, misses=0, redis_hits=0, computes=0)
        logger.info("✅ 本地内存缓存已初始化")
    # Cap on L1 entries before an automatic flush is triggered.
    self._max_local_cache_size = 500
    self._loop_counter = 0
def bot_stop(self, **kwargs) -> None:
    """
    Hook invoked once when the bot stops.

    Flushes the local cache, emits a final statistics report, and closes
    the Redis connection if one was opened.
    """
    logger.info("🛑 策略停止 - 清理所有缓存")
    if hasattr(self, '_local_cache'):
        self.cleanup_local_cache()
        # Final effectiveness report before the process exits.
        stats = self.get_cache_stats()
        if stats['total_requests'] > 0:
            logger.info(f"📊 最终缓存统计 - 总请求: {stats['total_requests']}, "
                        f"本地命中: {stats['local_hits']}, "
                        f"Redis命中: {stats['redis_hits']}, "
                        f"计算次数: {stats['computes']}, "
                        f"总命中率: {stats['overall_hit_rate']:.1f}%")
    # Close the Redis client, tolerating any failure during shutdown.
    client = getattr(self, 'redis_client', None)
    if client:
        try:
            client.close()
            logger.info("✅ Redis连接已关闭")
        except Exception as e:
            logger.error(f"❌ 关闭Redis连接失败: {e}")
def detect_trend_status(self, dataframe: DataFrame, metadata: dict) -> str:
"""
基于加权分段的trend_score判断趋势状态