redis保存状态

This commit is contained in:
zhangkun9038@dingtalk.com 2025-11-27 16:55:37 +08:00
parent e69d556737
commit ed0c8fcda0
2 changed files with 99 additions and 8 deletions

View File

@ -240,6 +240,9 @@ class GridManager:
# 记录到历史
self.position_history.append(adjustment)
# ✅ 立即同步上次改变上 Redis
self.sync_grid_state_to_redis()
def decide_adjustment(self) -> Optional[PositionRecord]:
"""
@ -494,11 +497,12 @@ class GridManager:
Returns:
是否连接成功
Raises:
ConnectionError: 如果 Redis 连接失败
"""
if not REDIS_AVAILABLE:
print(f"[GridManager] {self.pair} Redis 未安装,跳过 Redis 连接",
file=sys.stderr, flush=True)
return False
raise RuntimeError(f"[GridManager] {self.pair} Redis 包未安装,无法继续")
try:
# 解析 Redis URL
@ -519,10 +523,83 @@ class GridManager:
return True
except Exception as e:
print(f"[GridManager] {self.pair} Redis 连接失败: {str(e)}",
file=sys.stderr, flush=True)
error_msg = f"[GridManager] {self.pair} Redis 连接失败: {str(e)}"
print(error_msg, file=sys.stderr, flush=True)
self.redis_client = None
return False
raise ConnectionError(error_msg)
def sync_grid_state_to_redis(self) -> None:
"""
将当前的网格状态同步到 Redis
包我每个网格的 FILLED/EMPTY 状态
"""
if not self.redis_client:
return
try:
# 构建网格状态字典
grid_states_data = {}
for price, grid_level in self.grid_states.items():
grid_states_data[str(price)] = {
"status": grid_level.status,
"quantity": grid_level.quantity,
"entry_price": grid_level.entry_price,
"entry_time": grid_level.entry_time
}
# 存存到 Redis
redis_key = f"grid_state:{self.pair}"
self.redis_client.set(
redis_key,
json.dumps(grid_states_data),
ex=86400 # 24 小时过期
)
except Exception as e:
print(f"[GridManager] {self.pair} 网格状态同步失败: {str(e)}",
file=sys.stderr, flush=True)
def recover_grid_state_from_redis(self) -> Optional[Dict[str, Any]]:
"""
Redis 恢复网格状态
"""
if not self.redis_client:
return None
try:
redis_key = f"grid_state:{self.pair}"
data = self.redis_client.get(redis_key)
if not data:
return None
return json.loads(data)
except Exception as e:
print(f"[GridManager] {self.pair} 从 Redis 恢复网格状态失败: {str(e)}",
file=sys.stderr, flush=True)
return None
def restore_grid_state(self, grid_states_data: Dict[str, Any]) -> None:
"""
恢复网格状态
"""
try:
for price_str, state_data in grid_states_data.items():
price = float(price_str)
if price in self.grid_states:
grid_level = self.grid_states[price]
grid_level.status = state_data.get("status", "empty")
grid_level.quantity = state_data.get("quantity", 0.0)
grid_level.entry_price = state_data.get("entry_price", 0.0)
grid_level.entry_time = state_data.get("entry_time", 0)
print(f"[GridManager] {self.pair} 网格状态恢复完成",
file=sys.stderr, flush=True)
except Exception as e:
print(f"[GridManager] {self.pair} 恢复网格状态失败: {str(e)}",
file=sys.stderr, flush=True)
def report_to_redis(self) -> None:
"""
@ -532,7 +609,7 @@ class GridManager:
return
try:
# 构状态报告
# 构状态报告
report = {
"timestamp": datetime.now().isoformat(),
"candle_index": self.candle_index,

View File

@ -79,6 +79,7 @@ class StaticGrid(IStrategy):
grid_manager.step = step # 更新步长
# 重新初始化 grid_states仅在第一次之后保留已有的 FILLED 状态)
# 先从 Redis 恢复之前的网格状态
grid_manager.grid_states.clear()
for price in grid_manager.grid_prices:
grid_manager.grid_states[price] = GridLevel(
@ -89,6 +90,14 @@ class StaticGrid(IStrategy):
entry_time=0
)
# 从 Redis 恢复网格状态(如果存在)
if self.redis_available:
redis_grid_state = grid_manager.recover_grid_state_from_redis()
if redis_grid_state:
grid_manager.restore_grid_state(redis_grid_state)
print(f"[StaticGrid] {pair} 已从 Redis 恢复网格状态",
file=sys.stderr, flush=True)
print(f"[StaticGrid] {pair} 更新网格范围 - 从 1000-5000 更新为 {new_lower:.2f}-{new_upper:.2f}",
file=sys.stderr, flush=True)
print(f"[StaticGrid] {pair} 新网格数: {grid_manager.total_grid_levels}, 步长: {step:.4f}",
@ -166,7 +175,12 @@ class StaticGrid(IStrategy):
# 初始化 Redis 连接用于后续同步
if self.redis_available:
grid_manager.init_redis(self.redis_url)
try:
grid_manager.init_redis(self.redis_url)
except (RuntimeError, ConnectionError) as e:
print(f"[StaticGrid] {pair} Redis 连接失败: {str(e)}",
file=sys.stderr, flush=True)
raise # 重新投掷,导致策略退出
# 保存网格配置,用于价格更新时重新计算
self.grid_managers[pair] = grid_manager