""" 网格交易管理器 - 维护单个币对的完整网格持仓生命周期 """ from dataclasses import dataclass, field from typing import Optional, List, Dict, Any from enum import Enum import sys import json from datetime import datetime from urllib.parse import urlparse try: import redis REDIS_AVAILABLE = True except ImportError: REDIS_AVAILABLE = False class AdjustmentType(Enum): """加减仓类型""" ENTRY = "entry" # 初始建仓 ADD = "add_position" # 加仓 REDUCE = "reduce_position" # 减仓 EXIT = "exit" # 全部平仓 @dataclass class Grid: """ 单个网格对象 - 代表网格中的一个位置 可以是已成交的买单或未成交的卖单 """ hash_id: str # 随机唯一标识 grid_index: int # 网格位置索引 (0-49) direction: str # "buy" 或 "sell" # 对于已成交买单 entry_price: Optional[float] = None # 被填充时的入场价格 entry_time: Optional[int] = None # 被填充的时间(蜡烛线索引) quantity: float = 0.0 # 持仓数量 # 对于未成交卖单 bid_price: Optional[float] = None # 挂单价格 bid_time: Optional[int] = None # 挂单时间 # 兑现信息 filled_price: Optional[float] = None # 兑现价格 filled_time: Optional[int] = None # 兑现时间 profit: Optional[float] = None # 单个网格的盈亏 @dataclass class GridLevel: """单个网格点的状态""" price: float # 网格价格 status: str # "filled" (已买入) 或 "empty" (未买/已卖出) quantity: float # 该网格的持仓数量 entry_price: float # 该网格的实际买入价格 entry_time: int # 建仓蜡烛线索引 @dataclass class PositionRecord: """单次加减仓记录""" level_index: int # 网格级别序号 price: float # 操作价格 quantity: float # 操作份数 type: AdjustmentType # 操作类型 timestamp: int # 操作蜡烛线索引 @dataclass class OrderFill: """订单成交信息""" order_id: str # 订单 ID pair: str # 币对 side: str # "buy" 或 "sell" filled_price: float # 成交价格 filled_amount: float # 成交数量 fee: float # 手续费(USDT) timestamp: int # 成交蜡烛线索引 reason: str = "" # 成交原因(entry/add/exit 等) class GridMonitor: """ GridManager 自监测机制 - 判断是否需要止盈或止损 简单策略: - 止盈:持仓利润达到 target_profit_pct(如 2%) - 止损:持仓亏损达到 stop_loss_pct(如 -5%) """ def __init__(self, target_profit_pct: float = 0.02, # 止盈目标 2% stop_loss_pct: float = -0.05): # 止损阈值 -5% self.target_profit_pct = target_profit_pct self.stop_loss_pct = stop_loss_pct def should_take_profit(self, current_price: float, avg_entry_price: float) -> bool: """ 判断是否应该止盈 Args: current_price: 当前价格 avg_entry_price: 平均建仓价 Returns: True 表示应该止盈 """ if avg_entry_price <= 0: return False profit_pct = (current_price - avg_entry_price) / avg_entry_price return profit_pct >= self.target_profit_pct def should_stop_loss(self, current_price: float, avg_entry_price: float) -> bool: """ 判断是否应该止损 Args: current_price: 当前价格 avg_entry_price: 平均建仓价 Returns: True 表示应该止损 """ if avg_entry_price <= 0: return False loss_pct = (current_price - avg_entry_price) / avg_entry_price return loss_pct <= self.stop_loss_pct def get_exit_reason(self, current_price: float, avg_entry_price: float) -> Optional[str]: """ 判断退出原因(止盈或止损) Returns: "take_profit", "stop_loss" 或 None """ if self.should_take_profit(current_price, avg_entry_price): return "take_profit" elif self.should_stop_loss(current_price, avg_entry_price): return "stop_loss" return None class GridManager: """ 网格交易管理器 - 维护完整的持仓生命周期 架构: - currentGridList: 长度固定=50的网格集合(已成交买单+未成交卖单) - historyGridList: 所有已兑现的Grid信息(长度只增不减) - pointer_index: 浮标指针(虚拟),分界线位置 规则: - 价格下跌 → pointer_index左移 → 右侧卖单被兑现 - 价格上涨 → pointer_index右移 → 左侧买单被兑现 - 兑现后的Grid自动从currentGridList移除,新Grid补充 """ # 常量定义:所有都可以修改 TOTAL_GRIDS = 50 # 总网格数 FILL_RATIO = 0.5 # 初始填充比例 PRICE_AMPLITUDE_BTC = 0.20 # BTC波幅 20% PRICE_AMPLITUDE_DEFAULT = 0.33 # 其他币种波幅 33% def __init__(self, pair: str, lower_price: float, upper_price: float, step: float, stake_per_grid: float): """ 初始化网格管理器 Args: pair: 币对名称,如 "ETH/USDT" lower_price: 网格下限价格 upper_price: 网格上限价格 step: 网格间距 stake_per_grid: 每个网格的投资额 """ self.pair = pair self.lower_price = lower_price self.upper_price = upper_price self.step = step self.stake_per_grid = stake_per_grid # 创建时间和随机hash_id self.created_time = int(datetime.now().timestamp()) import uuid self.hash_id = str(uuid.uuid4())[:8] # 8位随机值 # ✅ 新架构:浮标指针 self.pointer_index = int(self.TOTAL_GRIDS * self.FILL_RATIO) # 初始位置 = 25 # currentGridList 和 historyGridList 将由 Redis 存储 # 这里只保存引用key self.current_grid_list_key = f"grids:current:{pair}:{self.hash_id}" self.history_grid_list_key = f"grids:history:{pair}:{self.hash_id}" # GridManager 本身的 key(存储元数据) self.gridmanager_key = f"gm:{pair}:{self.created_time}:{self.hash_id}" # ... existing code ... self.total_grid_levels = int((upper_price - lower_price) / step) + 1 # 生成所有网格点价格 self.grid_prices = [lower_price + i * step for i in range(self.total_grid_levels)] # 核心数据结构:网格状态数组 # grid_states[i] = GridLevel 对象,记录该网格的状态(filled/empty) self.grid_states: Dict[float, GridLevel] = {} for price in self.grid_prices: self.grid_states[price] = GridLevel( price=price, status="empty", # 初始状态:全部为 empty(未持有) quantity=0.0, # 当前持仓数量 entry_price=0.0, # 买入价格 entry_time=0 ) # 持仓统计 self.position_history: List[PositionRecord] = [] # 历史加减仓记录 self.order_fills: List[OrderFill] = [] # 所有订单成交记录 self.pending_orders: Dict[str, PositionRecord] = {} # 待成交的订单映射 # 当前状态 self.current_price: Optional[float] = None self.total_quantity: float = 0.0 # 总持仓数量 self.total_invested: float = 0.0 # 总投入金额 self.avg_entry_price: float = 0.0 # 平均建仓价 self.highest_price: float = 0.0 # 持仓期间最高价 self.lowest_price: float = float('inf') # 持仓期间最低价 # 调试 self.candle_index = 0 # 当前蜡烛线索引 # Redis 连接 self.redis_client: Optional[redis.Redis] = None self.redis_key = f"grid:{pair}" # Redis 存储键 # 报告 self.last_report_candle = 0 # 上次报告的蜡烛线 self.report_interval_candles = 600 # 每 600 个蜡烛线报告一次(约 10h,1h K线) # 控制标志:是否已从 Trade 对象初始化过持仓 self._synced_from_trade_once = False # 控制标志:是否已经从 Trade 对象一次性初始化过网格状态(第一次有持仓时) self._grid_initialized_from_trade = False # ✅ 自监测机制 - 止盈/止损判断 self.monitor = GridMonitor( target_profit_pct=0.02, # 止盈目标: 2% stop_loss_pct=-0.05 # 止损阈值: -5% ) self.is_completed = False # GridManager 是否完结 self.completion_reason: Optional[str] = None # 完结原因 (take_profit / stop_loss) # ❌ 删除min_grids_for_exit - 止盈与加仓次数无关,只由ROI信号决定 # ✅ GridManager 不变量:沉求下来就不改了(除非网格范围更新) # 便于 Redis 序列化保存 def update_state(self, current_price: float, candle_index: int) -> None: """ 更新网格对象的当前状态 这个方法应该在每个蜡烛线调用一次,用最新的市场价格更新网格对象 Args: current_price: 当前市场价格 candle_index: 当前蜡烛线索引(用于时间戳) """ self.current_price = current_price self.candle_index = candle_index # 更新最高/最低价 if self.total_quantity > 0: if current_price > self.highest_price: self.highest_price = current_price if current_price < self.lowest_price: self.lowest_price = current_price # ✅ 自动更新 pointer_index(根据价格变化) move_steps = self.update_pointer_by_price(current_price) if move_steps != 0: print(f"[GridManager] {self.pair} 指针自动移动 {move_steps} 步", file=sys.stderr, flush=True) # ... existing code ... filled_count = sum(1 for gs in self.grid_states.values() if gs.status == "filled") empty_count = len(self.grid_states) - filled_count print(f"[GridManager] {self.pair} 状态更新 - 价格: {current_price:.2f}, " f"持仓: {self.total_quantity:.6f}, 平均价: {self.avg_entry_price:.2f}, " f"网格状态: {filled_count} FILLED / {empty_count} EMPTY", file=sys.stderr, flush=True) # ✅ 自监测:检查是否需要止盈/止损 # 只要有持仓,就检查是否触发止盈/止损条件 if not self.is_completed and self.total_quantity > 0: exit_reason = self.monitor.get_exit_reason(current_price, self.avg_entry_price) if exit_reason: self.is_completed = True self.completion_reason = exit_reason profit_pct = (current_price - self.avg_entry_price) / self.avg_entry_price * 100 filled_count = sum(1 for gs in self.grid_states.values() if gs.status == "filled") print(f"[GridManager] {self.pair} 自监测触发✅ - 原因: {exit_reason}, " f"利润: {profit_pct:.2f}%, 当前价: {current_price:.2f}, 网格数: {filled_count}", file=sys.stderr, flush=True) def apply_adjustment(self, adjustment: PositionRecord) -> None: """ 应用一次加减仓操作,并更新网格状态 通过更新 grid_states 中相关网格的状态(filled/empty)来报告内部一致性 Args: adjustment: PositionRecord 对象,包含加减仓的所有信息 """ price = adjustment.price # ✅ 现在 price 已经是网格价格,不需要舍入 quantity = adjustment.quantity adj_type = adjustment.type # ✅ 直接使用 price 作为网格价格 grid_price = price grid_state = self.grid_states.get(grid_price) if adj_type == AdjustmentType.ENTRY or adj_type == AdjustmentType.ADD: # 建仓或加仓 → 将该网格标记为 FILLED if grid_state: grid_state.status = "filled" grid_state.quantity += quantity # ✅ 关键:增加该网格的持仓数量 grid_state.entry_price = price # 记录实际买入价 grid_state.entry_time = self.candle_index # 更新持仓统计 self.total_invested += quantity * price self.total_quantity += quantity if self.total_quantity > 0: self.avg_entry_price = self.total_invested / self.total_quantity print(f"[GridManager] {self.pair} 加仓 - 网格 {grid_price:.2f} 标记为 FILLED, " f"数量: {quantity:.6f}, 总持仓: {self.total_quantity:.6f}", file=sys.stderr, flush=True) elif adj_type == AdjustmentType.REDUCE: # 减仓 → 其他网格的故事,不常用(网格交易通常只有 ENTRY/ADD/EXIT) if grid_state: grid_state.quantity -= quantity if grid_state.quantity <= 0: grid_state.quantity = 0 self.total_quantity -= quantity self.total_invested -= quantity * grid_state.entry_price if grid_state else 0 if self.total_quantity > 0: self.avg_entry_price = self.total_invested / self.total_quantity print(f"[GridManager] {self.pair} 减仓 - 网格 {grid_price:.2f}, " f"减少: {quantity:.6f}, 剩余持仓: {self.total_quantity:.6f}", file=sys.stderr, flush=True) elif adj_type == AdjustmentType.EXIT: # 平仓 → 将所有 FILLED 的网格一次排空(或不排空对象网格) # 此处只是记录平仓决策,实际的排空是由 decide_adjustment 程序控制 print(f"[GridManager] {self.pair} 平仓 - 总持仓: {self.total_quantity:.6f}, " f"平均价: {self.avg_entry_price:.2f}, 当前价: {price:.2f}", file=sys.stderr, flush=True) # 清空所有 FILLED 网格,下次套需要绍转建仓时重新填满 for gs in self.grid_states.values(): if gs.status == "filled": gs.status = "empty" gs.quantity = 0 gs.entry_price = 0.0 # 清空污了持仓计数 self.total_quantity = 0.0 self.total_invested = 0.0 self.avg_entry_price = 0.0 self.highest_price = 0.0 self.lowest_price = float('inf') # 记录到历史 self.position_history.append(adjustment) # ✅ 立即同步元数据到 Redis self.save_to_redis() print(f"[GridManager] {self.pair} 网格状态已同步到 Redis", file=sys.stderr, flush=True) def decide_adjustment(self) -> Optional[PositionRecord]: """ 判定是否需要加减仓,基于网格填充/排空逻辑 核心规则: 1. 下沿到当前价格 → 所有网格必须被 FILLED(已买入) 如果有 EMPTY 的网格 → 加仓填满它 2. 当前价格到上沿 → 所有网格必须被 EMPTY(不持有) 如果有 FILLED 的网格 → 平仓排空它 Returns: PositionRecord 如果需要操作,否则 None """ if self.current_price is None: return None # ✅ 自监测止盈/止损:如果已经变为完结,返回 EXIT 信号 if self.is_completed: if self.total_quantity > 0: # 全部平仓 return PositionRecord( level_index=0, price=self.current_price, quantity=self.total_quantity, type=AdjustmentType.EXIT, timestamp=self.candle_index ) return None # 找到当前价格所在的网格点 current_grid_price = self._round_to_grid(self.current_price) # 规则 2: 检查上方是否有需要平仓的网格 # 当前价格到上沿之间的网格都应该是 EMPTY for grid_price in self.grid_prices: if grid_price > current_grid_price: grid_state = self.grid_states.get(grid_price) if grid_state and grid_state.status == "filled" and grid_state.quantity > 0: # 上方有持仓,需要平仓 print(f"[GridManager] {self.pair} 平仓信号 - 价格已涨到 {self.current_price:.2f}, " f"上方网格 {grid_price:.2f} 有持仓需排空", file=sys.stderr, flush=True) # 平仓该网格 return PositionRecord( level_index=self._price_to_level_index(grid_price), price=self.current_price, quantity=grid_state.quantity, type=AdjustmentType.EXIT, timestamp=self.candle_index ) # 规则 1: 检查下方是否有需要加仓的网格 # 下沿到当前价格之间的网格都应该是 FILLED for grid_price in self.grid_prices: if grid_price <= current_grid_price: grid_state = self.grid_states.get(grid_price) if grid_state and grid_state.status == "empty" and grid_state.quantity == 0: # 下方有空白网格,需要加仓 print(f"[GridManager] {self.pair} 加仓信号 - 价格已跌到 {self.current_price:.2f}, " f"下方网格 {grid_price:.2f} 为空需填满", file=sys.stderr, flush=True) # ✅ 加仓该网格 - 关键修复:price 应该是网格价格,而不是当前价格 return PositionRecord( level_index=self._price_to_level_index(grid_price), price=grid_price, # ✅ 修复:使用网格价格而不是当前价格 quantity=1.0, type=AdjustmentType.ADD if self.total_quantity > 0 else AdjustmentType.ENTRY, timestamp=self.candle_index ) # 没有操作建议(所有网格状态都符合规则) return None def _price_to_level_index(self, price: float) -> int: """将价格转换为网格级别序号""" index = int((price - self.lower_price) / self.step) return max(0, min(index, self.total_grid_levels - 1)) def _round_to_grid(self, price: float) -> float: """ 将价格舍入到最接近的网格点(向下舍入) 基于 lower_price 的偏移量进行计算 """ # 计算相对于下沿的偏移量 offset = price - self.lower_price # 舍入到最接近的网格间距 grid_count = int(offset / self.step) # 返回对应的网格价格 return self.lower_price + grid_count * self.step def get_summary(self) -> Dict[str, Any]: """获取当前持仓的完整摘要""" filled_grids = sum(1 for gs in self.grid_states.values() if gs.status == "filled") return { "pair": self.pair, "current_price": self.current_price, "total_quantity": self.total_quantity, "total_invested": self.total_invested, "avg_entry_price": self.avg_entry_price, "unrealized_profit": (self.current_price - self.avg_entry_price) * self.total_quantity if self.total_quantity > 0 else 0, "filled_grids": filled_grids, # FILLED 的网格数 "empty_grids": self.total_grid_levels - filled_grids, # EMPTY 的网格数 "total_orders": len(self.order_fills), "highest_price": self.highest_price if self.total_quantity > 0 else None, "lowest_price": self.lowest_price if self.total_quantity > 0 else None, } def record_order_fill(self, order_fill: OrderFill) -> None: """ 记录一个订单成交事件 这个方法应该在订单成交时立即调用,将真实的成交数据反映到网格对象 Args: order_fill: OrderFill 对象,包含成交的所有信息 """ self.order_fills.append(order_fill) if order_fill.side == "buy": # 买入订单成交 actual_cost = order_fill.filled_amount * order_fill.filled_price + order_fill.fee self.total_invested += actual_cost self.total_quantity += order_fill.filled_amount # 更新平均价(考虑手续费) if self.total_quantity > 0: self.avg_entry_price = self.total_invested / self.total_quantity print(f"[GridManager] {self.pair} 买入订单成交 - " f"价格: {order_fill.filled_price:.2f}, " f"数量: {order_fill.filled_amount:.6f}, " f"手续费: {order_fill.fee:.4f} USDT, " f"总持仓: {self.total_quantity:.6f}", file=sys.stderr, flush=True) # 记录到 grid_levels if order_fill.filled_price not in self.grid_levels: self.grid_levels[order_fill.filled_price] = GridLevel( price=order_fill.filled_price, quantity=order_fill.filled_amount, entry_time=order_fill.timestamp, status="open" ) else: self.grid_levels[order_fill.filled_price].quantity += order_fill.filled_amount elif order_fill.side == "sell": # 卖出订单成交 sell_proceeds = order_fill.filled_amount * order_fill.filled_price - order_fill.fee self.total_invested -= order_fill.filled_amount * self.avg_entry_price self.total_quantity -= order_fill.filled_amount # 更新平均价 if self.total_quantity > 0: self.avg_entry_price = self.total_invested / self.total_quantity else: self.avg_entry_price = 0.0 # 计算本次卖出的利润 sell_profit = sell_proceeds - (order_fill.filled_amount * self.avg_entry_price if self.avg_entry_price > 0 else 0) profit_pct = (order_fill.filled_price - self.avg_entry_price) / self.avg_entry_price * 100 if self.avg_entry_price > 0 else 0 print(f"[GridManager] {self.pair} 卖出订单成交 - " f"价格: {order_fill.filled_price:.2f}, " f"数量: {order_fill.filled_amount:.6f}, " f"手续费: {order_fill.fee:.4f} USDT, " f"利润: {sell_profit:.2f} USDT ({profit_pct:.2f}%), " f"剩余持仓: {self.total_quantity:.6f}", file=sys.stderr, flush=True) def sync_from_trade_object(self, trade: Any, candle_index: int) -> None: """ 从 Freqtrade 的 Trade 对象同步状态 只同步持仓数量、成本、平均价等数值信息 网格 FILLED/EMPTY 状态由 apply_adjustment() 和 position_history 维护 Args: trade: Freqtrade Trade 对象 candle_index: 当前蜡烛线索引 """ # 从 trade 对象提取关键信息 self.total_quantity = trade.amount self.total_invested = trade.stake_amount self.avg_entry_price = trade.open_rate # ✅ 只同步数值信息,不推导网格状态 # 网格状态由真实的加仓操作(apply_adjustment)维护 # 不再根据平均价假设所有低于平均价的网格都被加仓了 print(f"[GridManager] {self.pair} 从 Trade 同步 - 持仓: {self.total_quantity:.6f}, " f"平均价: {self.avg_entry_price:.2f}, 成本: {self.total_invested:.2f}", file=sys.stderr, flush=True) def record_pending_order(self, order_id: str, adjustment: PositionRecord) -> None: """ 记录一个待成交的订单 当策略决定建仓/加仓后,订单被提交给 Freqtrade 这个方法记录订单 ID 与策略决策的映射关系 Args: order_id: Freqtrade 返回的订单 ID adjustment: 对应的 PositionRecord 对象 """ self.pending_orders[order_id] = adjustment print(f"[GridManager] {self.pair} 记录待成交订单 - " f"订单ID: {order_id}, " f"类型: {adjustment.type.value}, " f"价格: {adjustment.price:.2f}", file=sys.stderr, flush=True) def remove_pending_order(self, order_id: str) -> Optional[PositionRecord]: """ 移除待成交订单(当订单成交或取消时调用) Args: order_id: 订单 ID Returns: 对应的 PositionRecord,如果存在 """ return self.pending_orders.pop(order_id, None) def init_redis(self, redis_url: str) -> bool: """ 初始化 Redis 连接 Args: redis_url: Redis URL,如 "redis://192.168.1.215:6379/0" Returns: 是否连接成功 Raises: ConnectionError: 如果 Redis 连接失败 """ if not REDIS_AVAILABLE: raise RuntimeError(f"[GridManager] {self.pair} Redis 包未安装,无法继续") try: # 解析 Redis URL parsed = urlparse(redis_url) self.redis_client = redis.Redis( host=parsed.hostname or 'localhost', port=parsed.port or 6379, db=int(parsed.path.strip('/') or 0), decode_responses=True, socket_connect_timeout=5, socket_keepalive=True ) # 测试连接 self.redis_client.ping() print(f"[GridManager] {self.pair} Redis 连接成功 - Key: {self.redis_key}", file=sys.stderr, flush=True) return True except Exception as e: error_msg = f"[GridManager] {self.pair} Redis 连接失败: {str(e)}" print(error_msg, file=sys.stderr, flush=True) self.redis_client = None raise ConnectionError(error_msg) def sync_grid_state_to_redis(self) -> None: """ 将当前的网格状态同步到 Redis 包我每个网格的 FILLED/EMPTY 状态 """ if not self.redis_client: return try: # 构建网格状态字典 grid_states_data = {} filled_count = 0 for price, grid_level in self.grid_states.items(): # 使用未旤的串串化,避免浮点数精度问题 price_key = f"{price:.8f}".rstrip('0').rstrip('.') grid_states_data[price_key] = { "price": price, "status": grid_level.status, "quantity": grid_level.quantity, "entry_price": grid_level.entry_price, "entry_time": grid_level.entry_time } if grid_level.status == "filled": filled_count += 1 # 存存到 Redis redis_key = f"grid_state:{self.pair}" self.redis_client.set( redis_key, json.dumps(grid_states_data), ex=86400 # 24 小时过期 ) print(f"[GridManager] {self.pair} 网格状态同步到 Redis - " f"{filled_count} FILLED, {len(grid_states_data) - filled_count} EMPTY", file=sys.stderr, flush=True) except Exception as e: print(f"[GridManager] {self.pair} 网格状态同步失败: {str(e)}", file=sys.stderr, flush=True) def recover_grid_state_from_redis(self) -> Optional[Dict[str, Any]]: """ 从 Redis 恢复网格状态 """ if not self.redis_client: return None try: redis_key = f"grid_state:{self.pair}" data = self.redis_client.get(redis_key) if not data: return None return json.loads(data) except Exception as e: print(f"[GridManager] {self.pair} 从 Redis 恢复网格状态失败: {str(e)}", file=sys.stderr, flush=True) return None def restore_grid_state(self, grid_states_data: Dict[str, Any]) -> None: """ 恢复网格状态 """ try: restored_count = 0 for price_key, state_data in grid_states_data.items(): # 能儫不同格式: 情况 1:新格式(同 state_data 中有 price 字段) if "price" in state_data: price = state_data["price"] else: # 情况 2:旧格式(直接从键名转换) price = float(price_key) # 找到最接近的存在网格点(处理浮点数精度问题) closest_price = None min_diff = float('inf') for grid_price in self.grid_states.keys(): diff = abs(grid_price - price) if diff < min_diff: min_diff = diff closest_price = grid_price if closest_price is not None and min_diff < 1e-6: # 中仂茇平 grid_level = self.grid_states[closest_price] grid_level.status = state_data.get("status", "empty") grid_level.quantity = state_data.get("quantity", 0.0) grid_level.entry_price = state_data.get("entry_price", 0.0) grid_level.entry_time = state_data.get("entry_time", 0) restored_count += 1 print(f"[GridManager] {self.pair} 网格状态恢复完成 - 恢复 {restored_count}/{len(grid_states_data)} 个网格", file=sys.stderr, flush=True) except Exception as e: print(f"[GridManager] {self.pair} 恢复网格状态失败: {str(e)}", file=sys.stderr, flush=True) def report_to_redis(self) -> None: """ 将当前状态同步到 Redis """ if not self.redis_client: return try: # 构建状态报告 report = { "timestamp": datetime.now().isoformat(), "candle_index": self.candle_index, "current_price": self.current_price, "total_quantity": self.total_quantity, "total_invested": self.total_invested, "avg_entry_price": self.avg_entry_price, "unrealized_profit": (self.current_price - self.avg_entry_price) * self.total_quantity if self.total_quantity > 0 else 0, "active_grid_levels": len(self.grid_levels), "max_positions_ever": self.max_positions, "total_orders": len(self.order_fills), "highest_price": self.highest_price if self.total_quantity > 0 else None, "lowest_price": self.lowest_price if self.total_quantity > 0 else None, } # 存储到 Redis self.redis_client.set( self.redis_key, json.dumps(report), ex=3600 # 1 小时过期 ) except Exception as e: print(f"[GridManager] {self.pair} Redis 写入失败: {str(e)}", file=sys.stderr, flush=True) def generate_report(self) -> str: """ 生成人类可读的状态报告 Returns: 格式化的状态报告字符串 """ unrealized = (self.current_price - self.avg_entry_price) * self.total_quantity if self.total_quantity > 0 else 0 unrealized_pct = (self.current_price - self.avg_entry_price) / self.avg_entry_price * 100 if self.avg_entry_price > 0 else 0 report = f""" {'='*80} [{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 网格交易报告 - {self.pair} {'='*80} 【持仓状态】 当前价格: {self.current_price:.4f} USDT 总持仓数量: {self.total_quantity:.8f} 平均入场价: {self.avg_entry_price:.4f} USDT 总成本投入: {self.total_invested:.2f} USDT 【盈亏状态】 未实现盈亏: {unrealized:.2f} USDT 未实现盈亏率: {unrealized_pct:.2f}% 日期收益状态: {'亏损' if unrealized < 0 else '盈利'} 【网格样态】 活跃网格点数: {len(self.grid_levels)} 历史最大同时持仓: {self.max_positions} 持仓期最高价: {self.highest_price:.4f if self.highest_price > 0 else 'N/A'} USDT 持仓期最低价: {self.lowest_price:.4f if self.lowest_price != float('inf') else 'N/A'} USDT 最低点亏损率: {(self.lowest_price - self.avg_entry_price) / self.avg_entry_price * 100 if self.lowest_price != float('inf') and self.avg_entry_price > 0 else 0:.2f}% 【交易统计】 已成交订单数: {len(self.order_fills)} 待成交订单数: {len(self.pending_orders)} 历史加减仓次数: {len(self.position_history)} 当前蜡烛线索引: {self.candle_index} {'='*80} """ return report def check_and_report(self) -> None: """ 每隔指定间隔执行一次报告,并同步到 Redis 应该在 populate_indicators() 中调用 """ if self.candle_index - self.last_report_candle < self.report_interval_candles: return # 打印报告 report_text = self.generate_report() print(report_text, file=sys.stderr, flush=True) # 同步到 Redis self.report_to_redis() # 更新本次报告时间 self.last_report_candle = self.candle_index print(f"[GridManager] {self.pair} 报告已同步到 Redis", file=sys.stderr, flush=True) @classmethod def recover_from_redis(cls, pair: str, redis_url: str) -> Optional[Dict[str, Any]]: """ 从 Redis 恢复一个 GridManager 对象 当 Docker 容器重启后,通过 Redis 中的持仓状态恢复 GridManager Args: pair: 币对名称,如 "BTC/USDT" redis_url: Redis URL Returns: 恢复后的 GridManager 对象,如果 Redis 中无数据返回 None """ if not REDIS_AVAILABLE: print(f"[GridManager] {pair} Redis 未安装,无法恢复", file=sys.stderr, flush=True) return None try: # 连接 Redis parsed = urlparse(redis_url) redis_client = redis.Redis( host=parsed.hostname or 'localhost', port=parsed.port or 6379, db=int(parsed.path.strip('/') or 0), decode_responses=True, socket_connect_timeout=5 ) # 测试连接 redis_client.ping() # 尝试获取持仓数据 redis_key = f"grid:{pair}" data = redis_client.get(redis_key) if not data: print(f"[GridManager] {pair} Redis 中无持仓数据,无法恢复", file=sys.stderr, flush=True) return None # 解析 JSON state = json.loads(data) # 从 Redis 数据反向构建 GridManager # 由于 Redis 中只有最新的快照,我们无法完全恢复所有历史 # 但可以恢复当前的持仓状态 print(f"[GridManager] {pair} 从 Redis 恢复持仓数据:", file=sys.stderr, flush=True) print(f" 持仓数量: {state.get('total_quantity')}", file=sys.stderr, flush=True) print(f" 平均价: {state.get('avg_entry_price')}", file=sys.stderr, flush=True) print(f" 总投入: {state.get('total_invested')}", file=sys.stderr, flush=True) return state # 返回原始数据,让调用者处理恢复 except Exception as e: print(f"[GridManager] {pair} 从 Redis 恢复失败: {str(e)}", file=sys.stderr, flush=True) return None def restore_from_redis_state(self, redis_state: Dict[str, Any]) -> None: """ 从 Redis 恢复的状态数据中还原对象内部状态 Args: redis_state: 从 recover_from_redis() 获取的状态字典 """ try: self.current_price = redis_state.get('current_price', 0) self.total_quantity = redis_state.get('total_quantity', 0) self.total_invested = redis_state.get('total_invested', 0) self.avg_entry_price = redis_state.get('avg_entry_price', 0) self.highest_price = redis_state.get('highest_price', 0) or 0 self.lowest_price = redis_state.get('lowest_price', float('inf')) or float('inf') self.max_positions = redis_state.get('max_positions_ever', 0) self.candle_index = redis_state.get('candle_index', 0) # 重新初始化 grid_levels(从 avg_entry_price 推断) if self.total_quantity > 0 and self.avg_entry_price > 0: # 创建一个占位符网格点,表示当前持仓 grid_price = round(self.avg_entry_price / self.step) * self.step self.grid_levels[grid_price] = GridLevel( price=grid_price, quantity=self.total_quantity, entry_time=self.candle_index, status="open" ) print(f"[GridManager] {self.pair} 状态已从 Redis 恢复", file=sys.stderr, flush=True) except Exception as e: print(f"[GridManager] {self.pair} 状态恢复失败: {str(e)}", file=sys.stderr, flush=True) def destroy(self) -> None: """ 销毁网格管理器,清理 Redis 中存储的网格状态 通常在平仓后调用,表示该网格管理器不再有价值 """ try: if self.redis_client: # 删除 Redis 中该币对的网格状态 self.redis_client.delete(self.redis_key) print(f"[GridManager] {self.pair} 已从 Redis 删除网格状态", file=sys.stderr, flush=True) except Exception as e: print(f"[GridManager] {self.pair} 删除 Redis 状态失败: {str(e)}", file=sys.stderr, flush=True) # 清空内部状态 self.grid_states.clear() self.position_history.clear() self.order_fills.clear() self.total_quantity = 0.0 self.total_invested = 0.0 self.avg_entry_price = 0.0 print(f"[GridManager] {self.pair} 网格管理器已销毁,可以释放该对象", file=sys.stderr, flush=True) def serialize_to_dict(self) -> Dict[str, Any]: """ 将 GridManager 的元数据序列化为字典 用于存储到 Redis Returns: 包含所有元数据的字典 """ return { 'pair': self.pair, 'hash_id': self.hash_id, 'created_time': self.created_time, 'lower_price': self.lower_price, 'upper_price': self.upper_price, 'step': self.step, 'stake_per_grid': self.stake_per_grid, 'pointer_index': self.pointer_index, 'total_grid_levels': self.total_grid_levels, 'is_completed': self.is_completed, 'completion_reason': self.completion_reason, 'current_price': self.current_price, 'total_quantity': self.total_quantity, 'total_invested': self.total_invested, 'avg_entry_price': self.avg_entry_price, 'highest_price': self.highest_price, 'lowest_price': self.lowest_price if self.lowest_price != float('inf') else -1, 'candle_index': self.candle_index, 'current_grid_list_key': self.current_grid_list_key, 'history_grid_list_key': self.history_grid_list_key, } @staticmethod def deserialize_from_dict(data: Dict[str, Any]) -> 'GridManager': """ 从字典恢复 GridManager 对象 Args: data: 包含元数据的字典 Returns: 恢复后的 GridManager 对象 """ # 创建新的 GridManager gm = GridManager( pair=data['pair'], lower_price=data['lower_price'], upper_price=data['upper_price'], step=data['step'], stake_per_grid=data['stake_per_grid'] ) # 恢复元数据 gm.hash_id = data['hash_id'] gm.created_time = data['created_time'] gm.pointer_index = data['pointer_index'] gm.is_completed = data['is_completed'] gm.completion_reason = data['completion_reason'] gm.current_price = data['current_price'] gm.total_quantity = data['total_quantity'] gm.total_invested = data['total_invested'] gm.avg_entry_price = data['avg_entry_price'] gm.highest_price = data['highest_price'] lowest = data['lowest_price'] gm.lowest_price = float('inf') if lowest == -1 else lowest gm.candle_index = data['candle_index'] gm.current_grid_list_key = data['current_grid_list_key'] gm.history_grid_list_key = data['history_grid_list_key'] gm.total_grid_levels = data['total_grid_levels'] print(f"[GridManager] {data['pair']} 从字典恢复成功 - hash_id: {gm.hash_id}", file=sys.stderr, flush=True) return gm def save_to_redis(self) -> bool: """ 将 GridManager 的元数据保存到 Redis Returns: 是否保存成功 """ if not self.redis_client: print(f"[GridManager] {self.pair} Redis 客户端未初始化", file=sys.stderr, flush=True) return False try: data = self.serialize_to_dict() self.redis_client.set( self.gridmanager_key, json.dumps(data) ) print(f"[GridManager] {self.pair} 元数据已保存到 Redis - key: {self.gridmanager_key}", file=sys.stderr, flush=True) return True except Exception as e: print(f"[GridManager] {self.pair} 保存到 Redis 失败: {str(e)}", file=sys.stderr, flush=True) return False @staticmethod def load_from_redis(pair: str, redis_url: str, hash_id: str = None) -> Optional['GridManager']: """ 从 Redis 加载 GridManager 如果 hash_id 为空,则加载最新的(按时间戳) Args: pair: 币对 redis_url: Redis 连接URL hash_id: 可选,指定特定的 GridManager Returns: 恢复的 GridManager 或 None """ try: parsed_url = urlparse(redis_url) redis_client = redis.Redis( host=parsed_url.hostname or 'localhost', port=parsed_url.port or 6379, db=int(parsed_url.path.strip('/') or 0), decode_responses=True ) if hash_id: # 加载指定的 GridManager # 需要遍历查找(因为 key 包含 timestamp) # 这里简化为直接用 hash_id 查找最新的 pattern = f"gm:{pair}:*:{hash_id}" keys = redis_client.keys(pattern) if keys: data = redis_client.get(keys[0]) if data: return GridManager.deserialize_from_dict(json.loads(data)) else: # 加载最新的 GridManager(按 key 中的时间戳排序) pattern = f"gm:{pair}:*" keys = redis_client.keys(pattern) if keys: # 按时间戳排序,取最新的 keys_sorted = sorted(keys, reverse=True) data = redis_client.get(keys_sorted[0]) if data: return GridManager.deserialize_from_dict(json.loads(data)) print(f"[GridManager] {pair} Redis 中未找到 GridManager", file=sys.stderr, flush=True) return None except Exception as e: print(f"[GridManager] {pair} 从 Redis 加载失败: {str(e)}", file=sys.stderr, flush=True) return None # ===== Redis List 操作接口 ===== def add_to_current_grid_list(self, grid: 'Grid') -> bool: """ 添加一个 Grid 到 currentGridList(从右侧入栈) 后进先出,最新添加的在右侧 Args: grid: Grid 对象 Returns: 是否添加成功 """ if not self.redis_client: return False try: grid_data = { 'hash_id': grid.hash_id, 'grid_index': grid.grid_index, 'direction': grid.direction, 'entry_price': grid.entry_price, 'entry_time': grid.entry_time, 'quantity': grid.quantity, 'bid_price': grid.bid_price, 'bid_time': grid.bid_time, } self.redis_client.rpush( self.current_grid_list_key, json.dumps(grid_data) ) return True except Exception as e: print(f"[GridManager] {self.pair} 添加到 currentGridList 失败: {str(e)}", file=sys.stderr, flush=True) return False def remove_from_current_grid_list(self, hash_id: str) -> bool: """ 从 currentGridList 移除指定的 Grid (按 hash_id 查找并删除) Args: hash_id: 要移除的 Grid 的 hash_id Returns: 是否移除成功 """ if not self.redis_client: return False try: # 获取列表中的所有元素 items = self.redis_client.lrange(self.current_grid_list_key, 0, -1) # 重新构建列表(跳过目标元素) self.redis_client.delete(self.current_grid_list_key) for item in items: data = json.loads(item) if data['hash_id'] != hash_id: self.redis_client.rpush(self.current_grid_list_key, item) return True except Exception as e: print(f"[GridManager] {self.pair} 从 currentGridList 移除失败: {str(e)}", file=sys.stderr, flush=True) return False def get_current_grid_list(self) -> list: """ 获取整个 currentGridList Returns: Grid 对象列表 """ if not self.redis_client: return [] try: items = self.redis_client.lrange(self.current_grid_list_key, 0, -1) grids = [] for item in items: data = json.loads(item) grid = Grid( hash_id=data['hash_id'], grid_index=data['grid_index'], direction=data['direction'], entry_price=data.get('entry_price'), entry_time=data.get('entry_time'), quantity=data.get('quantity', 0.0), bid_price=data.get('bid_price'), bid_time=data.get('bid_time'), ) grids.append(grid) return grids except Exception as e: print(f"[GridManager] {self.pair} 获取 currentGridList 失败: {str(e)}", file=sys.stderr, flush=True) return [] def add_to_history_grid_list(self, grid: 'Grid', filled_price: float, filled_time: int, profit: float) -> bool: """ 添加一个已兑现的 Grid 到 historyGridList(仅追加) Args: grid: Grid 对象 filled_price: 兑现价格 filled_time: 兑现时间(蜡烛线索引) profit: 单个网格的盈亏 Returns: 是否添加成功 """ if not self.redis_client: return False try: history_data = { 'hash_id': grid.hash_id, 'grid_index': grid.grid_index, 'direction': grid.direction, 'entry_price': grid.entry_price, 'entry_time': grid.entry_time, 'quantity': grid.quantity, 'bid_price': grid.bid_price, 'filled_price': filled_price, 'filled_time': filled_time, 'profit': profit, } self.redis_client.rpush( self.history_grid_list_key, json.dumps(history_data) ) return True except Exception as e: print(f"[GridManager] {self.pair} 添加到 historyGridList 失败: {str(e)}", file=sys.stderr, flush=True) return False def get_history_grid_list(self) -> list: """ 获取整个 historyGridList Returns: 已兑现的 Grid 对象列表 """ if not self.redis_client: return [] try: items = self.redis_client.lrange(self.history_grid_list_key, 0, -1) grids = [] for item in items: data = json.loads(item) grid = Grid( hash_id=data['hash_id'], grid_index=data['grid_index'], direction=data['direction'], entry_price=data.get('entry_price'), entry_time=data.get('entry_time'), quantity=data.get('quantity', 0.0), bid_price=data.get('bid_price'), filled_price=data.get('filled_price'), filled_time=data.get('filled_time'), profit=data.get('profit'), ) grids.append(grid) return grids except Exception as e: print(f"[GridManager] {self.pair} 获取 historyGridList 失败: {str(e)}", file=sys.stderr, flush=True) return [] def calculate_pnl(self) -> Dict[str, float]: """ 通过对比 currentGridList 和 historyGridList 计算盈亏 Returns: 包含以下字段的字典: - total_entry_cost: 总建仓成本 - total_filled_revenue: 总兑现收益 - realized_pnl: 已实现盈亏 - unrealized_pnl: 未实现盈亏 - total_pnl: 总盈亏 """ try: history = self.get_history_grid_list() current = self.get_current_grid_list() # 计算已兑现的盈亏 realized_pnl = sum(grid.profit or 0.0 for grid in history) # 计算未实现的盈亏(当前持仓) unrealized_pnl = 0.0 if self.current_price and self.avg_entry_price > 0: unrealized_pnl = (self.current_price - self.avg_entry_price) * self.total_quantity # 总成本和收益 total_entry_cost = sum( (grid.entry_price or 0) * grid.quantity for grid in history if grid.direction == 'buy' ) total_filled_revenue = sum( (grid.filled_price or 0) * grid.quantity for grid in history if grid.direction == 'sell' ) return { 'total_entry_cost': total_entry_cost, 'total_filled_revenue': total_filled_revenue, 'realized_pnl': realized_pnl, 'unrealized_pnl': unrealized_pnl, 'total_pnl': realized_pnl + unrealized_pnl, } except Exception as e: print(f"[GridManager] {self.pair} 计算盈亏失败: {str(e)}", file=sys.stderr, flush=True) return {} # ===== Pointer Index 操作接口 ===== def move_pointer_left(self) -> bool: """ 指针左移(价格下跌) - 右侧的卖单被兑现,从 currentGridList 移除 - 新的买单补充到左侧 Returns: 是否移动成功 """ if self.pointer_index <= 0: print(f"[GridManager] {self.pair} 指针已在最左侧,无法继续左移", file=sys.stderr, flush=True) return False try: current_grids = self.get_current_grid_list() # 找到右侧的卖单(index = pointer_index 之后的第一个卖单) sell_grid = None for grid in reversed(current_grids): if grid.direction == 'sell' and grid.grid_index >= self.pointer_index: sell_grid = grid break if sell_grid: # 将卖单兑现,移到 historyGridList if self.current_price and sell_grid.bid_price: profit = (sell_grid.bid_price - sell_grid.entry_price) * sell_grid.quantity if sell_grid.entry_price else 0 self.add_to_history_grid_list( sell_grid, filled_price=self.current_price, filled_time=self.candle_index, profit=profit ) # 从 currentGridList 移除 self.remove_from_current_grid_list(sell_grid.hash_id) # 左移指针 self.pointer_index -= 1 self.save_to_redis() print(f"[GridManager] {self.pair} 指针左移 {self.pointer_index + 1} -> {self.pointer_index}, " f"卖单已兑现: {sell_grid.hash_id}", file=sys.stderr, flush=True) return True return False except Exception as e: print(f"[GridManager] {self.pair} 指针左移失败: {str(e)}", file=sys.stderr, flush=True) return False def move_pointer_right(self) -> bool: """ 指针右移(价格上涨) - 左侧的买单被兑现,从 currentGridList 移除 - 新的卖单补充到右侧 Returns: 是否移动成功 """ if self.pointer_index >= self.TOTAL_GRIDS - 1: print(f"[GridManager] {self.pair} 指针已在最右侧,无法继续右移", file=sys.stderr, flush=True) return False try: current_grids = self.get_current_grid_list() # 找到左侧的买单(index = pointer_index 之前的最后一个买单) buy_grid = None for grid in current_grids: if grid.direction == 'buy' and grid.grid_index < self.pointer_index: buy_grid = grid if buy_grid: # 将买单兑现,移到 historyGridList if self.current_price and buy_grid.entry_price: profit = (self.current_price - buy_grid.entry_price) * buy_grid.quantity self.add_to_history_grid_list( buy_grid, filled_price=self.current_price, filled_time=self.candle_index, profit=profit ) # 从 currentGridList 移除 self.remove_from_current_grid_list(buy_grid.hash_id) # 右移指针 self.pointer_index += 1 self.save_to_redis() print(f"[GridManager] {self.pair} 指针右移 {self.pointer_index - 1} -> {self.pointer_index}, " f"买单已兑现: {buy_grid.hash_id}", file=sys.stderr, flush=True) return True return False except Exception as e: print(f"[GridManager] {self.pair} 指针右移失败: {str(e)}", file=sys.stderr, flush=True) return False def update_pointer_by_price(self, new_price: float) -> int: """ 根据当前价格自动更新指针位置 计算当前价格应该对应的网格索引 Args: new_price: 新的价格 Returns: 指针移动的步数(负数表示左移,正数表示右移,0表示不动) """ try: # 计算当前价格对应的网格索引 if self.step <= 0: return 0 price_offset = (new_price - self.lower_price) / self.step new_pointer = int(price_offset) # 限制在有效范围内 new_pointer = max(0, min(new_pointer, self.TOTAL_GRIDS - 1)) # 计算移动步数 move_steps = new_pointer - self.pointer_index if move_steps < 0: # 价格下跌,指针左移 for _ in range(abs(move_steps)): self.move_pointer_left() elif move_steps > 0: # 价格上涨,指针右移 for _ in range(move_steps): self.move_pointer_right() return move_steps except Exception as e: print(f"[GridManager] {self.pair} 更新指针失败: {str(e)}", file=sys.stderr, flush=True) return 0 def get_pointer_info(self) -> Dict[str, Any]: """ 获取指针的详细信息 Returns: 包含指针状态的字典 """ current_grids = self.get_current_grid_list() history_grids = self.get_history_grid_list() buy_count = sum(1 for g in current_grids if g.direction == 'buy') sell_count = sum(1 for g in current_grids if g.direction == 'sell') return { 'pointer_index': self.pointer_index, 'pointer_price': self.lower_price + self.pointer_index * self.step, 'current_price': self.current_price, 'current_grid_count': len(current_grids), 'current_buy_count': buy_count, 'current_sell_count': sell_count, 'history_grid_count': len(history_grids), 'total_pnl': self.calculate_pnl().get('total_pnl', 0), }