""" 网格交易管理器 - 维护单个币对的完整网格持仓生命周期 """ from dataclasses import dataclass, field from typing import Optional, List, Dict, Any from enum import Enum import sys import json from datetime import datetime from urllib.parse import urlparse try: import redis REDIS_AVAILABLE = True except ImportError: REDIS_AVAILABLE = False class AdjustmentType(Enum): """加减仓类型""" ENTRY = "entry" # 初始建仓 ADD = "add_position" # 加仓 REDUCE = "reduce_position" # 减仓 EXIT = "exit" # 全部平仓 @dataclass class GridLevel: """单个网格点的状态""" price: float # 网格价格 status: str # "filled" (已买入) 或 "empty" (未买/已卖出) quantity: float # 该网格的持仓数量 entry_price: float # 该网格的实际买入价格 entry_time: int # 建仓蜡烛线索引 @dataclass class PositionRecord: """单次加减仓记录""" level_index: int # 网格级别序号 price: float # 操作价格 quantity: float # 操作份数 type: AdjustmentType # 操作类型 timestamp: int # 操作蜡烛线索引 @dataclass class OrderFill: """订单成交信息""" order_id: str # 订单 ID pair: str # 币对 side: str # "buy" 或 "sell" filled_price: float # 成交价格 filled_amount: float # 成交数量 fee: float # 手续费(USDT) timestamp: int # 成交蜡烛线索引 reason: str = "" # 成交原因(entry/add/exit 等) class GridManager: """ 网格交易管理器 - 核心思想:维护网格的填充/排空状态 网格逻辑: - 下沿到当前价格 → 所有网格必须被填满(已买入持仓) - 当前价格到上沿 → 所有网格必须被排空(不持有 或 已卖出) 决策规则: 1. 如果下方有空白网格 → 加仓(填满它们) 2. 如果上方有持仓网格 → 平仓(排空它们) 3. 维护一个网格状态数组,每个位置记录 "filled" 或 "empty" """ def __init__(self, pair: str, lower_price: float, upper_price: float, step: float, stake_per_grid: float): """ 初始化网格管理器 Args: pair: 币对名称,如 "ETH/USDT" lower_price: 网格下限价格 upper_price: 网格上限价格 step: 网格间距 stake_per_grid: 每个网格的投资额 """ self.pair = pair self.lower_price = lower_price self.upper_price = upper_price self.step = step self.stake_per_grid = stake_per_grid # 计算总网格数 self.total_grid_levels = int((upper_price - lower_price) / step) + 1 # 生成所有网格点价格 self.grid_prices = [lower_price + i * step for i in range(self.total_grid_levels)] # 核心数据结构:网格状态数组 # grid_states[i] = GridLevel 对象,记录该网格的状态(filled/empty) self.grid_states: Dict[float, GridLevel] = {} for price in self.grid_prices: self.grid_states[price] = GridLevel( price=price, status="empty", # 初始状态:全部为 empty(未持有) quantity=0.0, # 当前持仓数量 entry_price=0.0, # 买入价格 entry_time=0 ) # 持仓统计 self.position_history: List[PositionRecord] = [] # 历史加减仓记录 self.order_fills: List[OrderFill] = [] # 所有订单成交记录 self.pending_orders: Dict[str, PositionRecord] = {} # 待成交的订单映射 # 当前状态 self.current_price: Optional[float] = None self.total_quantity: float = 0.0 # 总持仓数量 self.total_invested: float = 0.0 # 总投入金额 self.avg_entry_price: float = 0.0 # 平均建仓价 self.highest_price: float = 0.0 # 持仓期间最高价 self.lowest_price: float = float('inf') # 持仓期间最低价 # 调试 self.candle_index = 0 # 当前蜡烛线索引 # Redis 连接 self.redis_client: Optional[redis.Redis] = None self.redis_key = f"grid:{pair}" # Redis 存储键 # 报告 self.last_report_candle = 0 # 上次报告的蜡烛线 self.report_interval_candles = 600 # 每 600 个蜡烛线报告一次(约 10h,1h K线) def update_state(self, current_price: float, candle_index: int) -> None: """ 更新网格对象的当前状态 这个方法应该在每个蜡烛线调用一次,用最新的市场价格更新网格对象 Args: current_price: 当前市场价格 candle_index: 当前蜡烛线索引(用于时间戳) """ self.current_price = current_price self.candle_index = candle_index # 更新最高/最低价 if self.total_quantity > 0: if current_price > self.highest_price: self.highest_price = current_price if current_price < self.lowest_price: self.lowest_price = current_price print(f"[GridManager] {self.pair} 状态更新 - 价格: {current_price:.2f}, " f"持仓: {self.total_quantity:.6f}, 平均价: {self.avg_entry_price:.2f}", file=sys.stderr, flush=True) def apply_adjustment(self, adjustment: PositionRecord) -> None: """ 应用一次加减仓操作,并更新网格状态 通过更新 grid_states 中相关网格的状态(filled/empty)来报告内部一致性 Args: adjustment: PositionRecord 对象,包含加减仓的所有信息 """ price = adjustment.price quantity = adjustment.quantity adj_type = adjustment.type # 找到该操作所属的网格点 grid_price = self._round_to_grid(price) grid_state = self.grid_states.get(grid_price) if adj_type == AdjustmentType.ENTRY or adj_type == AdjustmentType.ADD: # 建仓或加仓 → 将该网格标记为 FILLED if grid_state: grid_state.status = "filled" grid_state.quantity += quantity grid_state.entry_price = price # 记录实际买入价 grid_state.entry_time = self.candle_index # 更新持仓统计 self.total_invested += quantity * price self.total_quantity += quantity if self.total_quantity > 0: self.avg_entry_price = self.total_invested / self.total_quantity print(f"[GridManager] {self.pair} 加仓 - 网格 {grid_price:.2f} 标记为 FILLED, " f"数量: {quantity:.6f}, 总持仓: {self.total_quantity:.6f}", file=sys.stderr, flush=True) elif adj_type == AdjustmentType.REDUCE: # 减仓 → 其他网格的故事,不常用(网格交易通常只有 ENTRY/ADD/EXIT) if grid_state: grid_state.quantity -= quantity if grid_state.quantity <= 0: grid_state.quantity = 0 self.total_quantity -= quantity self.total_invested -= quantity * grid_state.entry_price if grid_state else 0 if self.total_quantity > 0: self.avg_entry_price = self.total_invested / self.total_quantity print(f"[GridManager] {self.pair} 减仓 - 网格 {grid_price:.2f}, " f"减少: {quantity:.6f}, 剩余持仓: {self.total_quantity:.6f}", file=sys.stderr, flush=True) elif adj_type == AdjustmentType.EXIT: # 平仓 → 将所有 FILLED 的网格一次排空(或不排空对象网格) # 此处只是记录平仓决策,实际的排空是由 decide_adjustment 程序控制 print(f"[GridManager] {self.pair} 平仓 - 总持仓: {self.total_quantity:.6f}, " f"平均价: {self.avg_entry_price:.2f}, 当前价: {price:.2f}", file=sys.stderr, flush=True) # 清空所有 FILLED 网格,下次套需要绍转建仓时重新填满 for gs in self.grid_states.values(): if gs.status == "filled": gs.status = "empty" gs.quantity = 0 gs.entry_price = 0.0 # 清空污了持仓计数 self.total_quantity = 0.0 self.total_invested = 0.0 self.avg_entry_price = 0.0 self.highest_price = 0.0 self.lowest_price = float('inf') # 记录到历史 self.position_history.append(adjustment) def decide_adjustment(self) -> Optional[PositionRecord]: """ 判定是否需要加减仓,基于网格填充/排空逻辑 核心规则: 1. 下沿到当前价格 → 所有网格必须被 FILLED(已买入) 如果有 EMPTY 的网格 → 加仓填满它 2. 当前价格到上沿 → 所有网格必须被 EMPTY(不持有) 如果有 FILLED 的网格 → 平仓排空它 Returns: PositionRecord 如果需要操作,否则 None """ if self.current_price is None: return None # 找到当前价格所在的网格点 current_grid_price = self._round_to_grid(self.current_price) # 规则 2: 检查上方是否有需要平仓的网格 # 当前价格到上沿之间的网格都应该是 EMPTY for grid_price in self.grid_prices: if grid_price > current_grid_price: grid_state = self.grid_states.get(grid_price) if grid_state and grid_state.status == "filled" and grid_state.quantity > 0: # 上方有持仓,需要平仓 print(f"[GridManager] {self.pair} 平仓信号 - 价格已涨到 {self.current_price:.2f}, " f"上方网格 {grid_price:.2f} 有持仓需排空", file=sys.stderr, flush=True) # 平仓该网格 return PositionRecord( level_index=self._price_to_level_index(grid_price), price=self.current_price, quantity=grid_state.quantity, type=AdjustmentType.EXIT, timestamp=self.candle_index ) # 规则 1: 检查下方是否有需要加仓的网格 # 下沿到当前价格之间的网格都应该是 FILLED for grid_price in self.grid_prices: if grid_price <= current_grid_price: grid_state = self.grid_states.get(grid_price) if grid_state and grid_state.status == "empty" and grid_state.quantity == 0: # 下方有空白网格,需要加仓 print(f"[GridManager] {self.pair} 加仓信号 - 价格已跌到 {self.current_price:.2f}, " f"下方网格 {grid_price:.2f} 为空需填满", file=sys.stderr, flush=True) # 加仓该网格 return PositionRecord( level_index=self._price_to_level_index(grid_price), price=self.current_price, quantity=1.0, type=AdjustmentType.ADD if self.total_quantity > 0 else AdjustmentType.ENTRY, timestamp=self.candle_index ) # 没有操作建议(所有网格状态都符合规则) return None def _price_to_level_index(self, price: float) -> int: """将价格转换为网格级别序号""" index = int((price - self.lower_price) / self.step) return max(0, min(index, self.total_grid_levels - 1)) def _round_to_grid(self, price: float) -> float: """ 将价格舍入到最接近的网格点(向下舍入) 基于 lower_price 的偏移量进行计算 """ # 计算相对于下沿的偏移量 offset = price - self.lower_price # 舍入到最接近的网格间距 grid_count = int(offset / self.step) # 返回对应的网格价格 return self.lower_price + grid_count * self.step def get_summary(self) -> Dict[str, Any]: """获取当前持仓的完整摘要""" filled_grids = sum(1 for gs in self.grid_states.values() if gs.status == "filled") return { "pair": self.pair, "current_price": self.current_price, "total_quantity": self.total_quantity, "total_invested": self.total_invested, "avg_entry_price": self.avg_entry_price, "unrealized_profit": (self.current_price - self.avg_entry_price) * self.total_quantity if self.total_quantity > 0 else 0, "filled_grids": filled_grids, # FILLED 的网格数 "empty_grids": self.total_grid_levels - filled_grids, # EMPTY 的网格数 "total_orders": len(self.order_fills), "highest_price": self.highest_price if self.total_quantity > 0 else None, "lowest_price": self.lowest_price if self.total_quantity > 0 else None, } def record_order_fill(self, order_fill: OrderFill) -> None: """ 记录一个订单成交事件 这个方法应该在订单成交时立即调用,将真实的成交数据反映到网格对象 Args: order_fill: OrderFill 对象,包含成交的所有信息 """ self.order_fills.append(order_fill) if order_fill.side == "buy": # 买入订单成交 actual_cost = order_fill.filled_amount * order_fill.filled_price + order_fill.fee self.total_invested += actual_cost self.total_quantity += order_fill.filled_amount # 更新平均价(考虑手续费) if self.total_quantity > 0: self.avg_entry_price = self.total_invested / self.total_quantity print(f"[GridManager] {self.pair} 买入订单成交 - " f"价格: {order_fill.filled_price:.2f}, " f"数量: {order_fill.filled_amount:.6f}, " f"手续费: {order_fill.fee:.4f} USDT, " f"总持仓: {self.total_quantity:.6f}", file=sys.stderr, flush=True) # 记录到 grid_levels if order_fill.filled_price not in self.grid_levels: self.grid_levels[order_fill.filled_price] = GridLevel( price=order_fill.filled_price, quantity=order_fill.filled_amount, entry_time=order_fill.timestamp, status="open" ) else: self.grid_levels[order_fill.filled_price].quantity += order_fill.filled_amount elif order_fill.side == "sell": # 卖出订单成交 sell_proceeds = order_fill.filled_amount * order_fill.filled_price - order_fill.fee self.total_invested -= order_fill.filled_amount * self.avg_entry_price self.total_quantity -= order_fill.filled_amount # 更新平均价 if self.total_quantity > 0: self.avg_entry_price = self.total_invested / self.total_quantity else: self.avg_entry_price = 0.0 # 计算本次卖出的利润 sell_profit = sell_proceeds - (order_fill.filled_amount * self.avg_entry_price if self.avg_entry_price > 0 else 0) profit_pct = (order_fill.filled_price - self.avg_entry_price) / self.avg_entry_price * 100 if self.avg_entry_price > 0 else 0 print(f"[GridManager] {self.pair} 卖出订单成交 - " f"价格: {order_fill.filled_price:.2f}, " f"数量: {order_fill.filled_amount:.6f}, " f"手续费: {order_fill.fee:.4f} USDT, " f"利润: {sell_profit:.2f} USDT ({profit_pct:.2f}%), " f"剩余持仓: {self.total_quantity:.6f}", file=sys.stderr, flush=True) def sync_from_trade_object(self, trade: Any, candle_index: int) -> None: """ 从 Freqtrade 的 Trade 对象同步状态 由于 Trade 对象提供了实际持仓信息,使用其更新网格状态: - 根据平均价找到应该填满的网格 - 全部低于平均价的网格为 FILLED、高于平均价的为 EMPTY Args: trade: Freqtrade Trade 对象 candle_index: 当前蜡烛线索引 """ # 从 trade 对象提取关键信息 self.total_quantity = trade.amount self.total_invested = trade.stake_amount self.avg_entry_price = trade.open_rate # 根据 Trade 对象的平均价,更新所有网格的状态 # 下沿 → 平均价:全部 FILLED # 平均价 → 上沿:全部 EMPTY avg_grid_price = self._round_to_grid(self.avg_entry_price) for price, grid_state in self.grid_states.items(): if price < avg_grid_price: # 下沿:应该是 FILLED if self.total_quantity > 0: grid_state.status = "filled" grid_state.entry_price = self.avg_entry_price grid_state.entry_time = candle_index # 简化:把所有 filled 网格的持仓信息污盖到了均价 # 实际应用中会根据 trade.trades 进行细上漂歪 else: grid_state.status = "empty" grid_state.quantity = 0 elif price > avg_grid_price: # 上沿:应该是 EMPTY grid_state.status = "empty" grid_state.quantity = 0 else: # 不太可能抹挡,但为了严谨标记为 FILLED grid_state.status = "filled" grid_state.entry_price = self.avg_entry_price grid_state.entry_time = candle_index print(f"[GridManager] {self.pair} 从 Trade 对象同步 - " f"持仓: {self.total_quantity:.6f}, " f"平均价: {self.avg_entry_price:.2f}, " f"下沿到平均价的网格标记为 FILLED", file=sys.stderr, flush=True) def record_pending_order(self, order_id: str, adjustment: PositionRecord) -> None: """ 记录一个待成交的订单 当策略决定建仓/加仓后,订单被提交给 Freqtrade 这个方法记录订单 ID 与策略决策的映射关系 Args: order_id: Freqtrade 返回的订单 ID adjustment: 对应的 PositionRecord 对象 """ self.pending_orders[order_id] = adjustment print(f"[GridManager] {self.pair} 记录待成交订单 - " f"订单ID: {order_id}, " f"类型: {adjustment.type.value}, " f"价格: {adjustment.price:.2f}", file=sys.stderr, flush=True) def remove_pending_order(self, order_id: str) -> Optional[PositionRecord]: """ 移除待成交订单(当订单成交或取消时调用) Args: order_id: 订单 ID Returns: 对应的 PositionRecord,如果存在 """ return self.pending_orders.pop(order_id, None) def init_redis(self, redis_url: str) -> bool: """ 初始化 Redis 连接 Args: redis_url: Redis URL,如 "redis://192.168.1.215:6379/0" Returns: 是否连接成功 """ if not REDIS_AVAILABLE: print(f"[GridManager] {self.pair} Redis 未安装,跳过 Redis 连接", file=sys.stderr, flush=True) return False try: # 解析 Redis URL parsed = urlparse(redis_url) self.redis_client = redis.Redis( host=parsed.hostname or 'localhost', port=parsed.port or 6379, db=int(parsed.path.strip('/') or 0), decode_responses=True, socket_connect_timeout=5, socket_keepalive=True ) # 测试连接 self.redis_client.ping() print(f"[GridManager] {self.pair} Redis 连接成功 - Key: {self.redis_key}", file=sys.stderr, flush=True) return True except Exception as e: print(f"[GridManager] {self.pair} Redis 连接失败: {str(e)}", file=sys.stderr, flush=True) self.redis_client = None return False def report_to_redis(self) -> None: """ 将当前状态同步到 Redis """ if not self.redis_client: return try: # 构成状态报告 report = { "timestamp": datetime.now().isoformat(), "candle_index": self.candle_index, "current_price": self.current_price, "total_quantity": self.total_quantity, "total_invested": self.total_invested, "avg_entry_price": self.avg_entry_price, "unrealized_profit": (self.current_price - self.avg_entry_price) * self.total_quantity if self.total_quantity > 0 else 0, "active_grid_levels": len(self.grid_levels), "max_positions_ever": self.max_positions, "total_orders": len(self.order_fills), "highest_price": self.highest_price if self.total_quantity > 0 else None, "lowest_price": self.lowest_price if self.total_quantity > 0 else None, } # 存储到 Redis self.redis_client.set( self.redis_key, json.dumps(report), ex=3600 # 1 小时过期 ) except Exception as e: print(f"[GridManager] {self.pair} Redis 写入失败: {str(e)}", file=sys.stderr, flush=True) def generate_report(self) -> str: """ 生成人类可读的状态报告 Returns: 格式化的状态报告字符串 """ unrealized = (self.current_price - self.avg_entry_price) * self.total_quantity if self.total_quantity > 0 else 0 unrealized_pct = (self.current_price - self.avg_entry_price) / self.avg_entry_price * 100 if self.avg_entry_price > 0 else 0 report = f""" {'='*80} [{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 网格交易报告 - {self.pair} {'='*80} 【持仓状态】 当前价格: {self.current_price:.4f} USDT 总持仓数量: {self.total_quantity:.8f} 平均入场价: {self.avg_entry_price:.4f} USDT 总成本投入: {self.total_invested:.2f} USDT 【盈亏状态】 未实现盈亏: {unrealized:.2f} USDT 未实现盈亏率: {unrealized_pct:.2f}% 日期收益状态: {'亏损' if unrealized < 0 else '盈利'} 【网格样态】 活跃网格点数: {len(self.grid_levels)} 历史最大同时持仓: {self.max_positions} 持仓期最高价: {self.highest_price:.4f if self.highest_price > 0 else 'N/A'} USDT 持仓期最低价: {self.lowest_price:.4f if self.lowest_price != float('inf') else 'N/A'} USDT 最低点亏损率: {(self.lowest_price - self.avg_entry_price) / self.avg_entry_price * 100 if self.lowest_price != float('inf') and self.avg_entry_price > 0 else 0:.2f}% 【交易统计】 已成交订单数: {len(self.order_fills)} 待成交订单数: {len(self.pending_orders)} 历史加减仓次数: {len(self.position_history)} 当前蜡烛线索引: {self.candle_index} {'='*80} """ return report def check_and_report(self) -> None: """ 每隔指定间隔执行一次报告,并同步到 Redis 应该在 populate_indicators() 中调用 """ if self.candle_index - self.last_report_candle < self.report_interval_candles: return # 打印报告 report_text = self.generate_report() print(report_text, file=sys.stderr, flush=True) # 同步到 Redis self.report_to_redis() # 更新本次报告时间 self.last_report_candle = self.candle_index print(f"[GridManager] {self.pair} 报告已同步到 Redis", file=sys.stderr, flush=True) @classmethod def recover_from_redis(cls, pair: str, redis_url: str) -> Optional[Dict[str, Any]]: """ 从 Redis 恢复一个 GridManager 对象 当 Docker 容器重启后,通过 Redis 中的持仓状态恢复 GridManager Args: pair: 币对名称,如 "BTC/USDT" redis_url: Redis URL Returns: 恢复后的 GridManager 对象,如果 Redis 中无数据返回 None """ if not REDIS_AVAILABLE: print(f"[GridManager] {pair} Redis 未安装,无法恢复", file=sys.stderr, flush=True) return None try: # 连接 Redis parsed = urlparse(redis_url) redis_client = redis.Redis( host=parsed.hostname or 'localhost', port=parsed.port or 6379, db=int(parsed.path.strip('/') or 0), decode_responses=True, socket_connect_timeout=5 ) # 测试连接 redis_client.ping() # 尝试获取持仓数据 redis_key = f"grid:{pair}" data = redis_client.get(redis_key) if not data: print(f"[GridManager] {pair} Redis 中无持仓数据,无法恢复", file=sys.stderr, flush=True) return None # 解析 JSON state = json.loads(data) # 从 Redis 数据反向构建 GridManager # 由于 Redis 中只有最新的快照,我们无法完全恢复所有历史 # 但可以恢复当前的持仓状态 print(f"[GridManager] {pair} 从 Redis 恢复持仓数据:", file=sys.stderr, flush=True) print(f" 持仓数量: {state.get('total_quantity')}", file=sys.stderr, flush=True) print(f" 平均价: {state.get('avg_entry_price')}", file=sys.stderr, flush=True) print(f" 总投入: {state.get('total_invested')}", file=sys.stderr, flush=True) return state # 返回原始数据,让调用者处理恢复 except Exception as e: print(f"[GridManager] {pair} 从 Redis 恢复失败: {str(e)}", file=sys.stderr, flush=True) return None def restore_from_redis_state(self, redis_state: Dict[str, Any]) -> None: """ 从 Redis 恢复的状态数据中还原对象内部状态 Args: redis_state: 从 recover_from_redis() 获取的状态字典 """ try: self.current_price = redis_state.get('current_price', 0) self.total_quantity = redis_state.get('total_quantity', 0) self.total_invested = redis_state.get('total_invested', 0) self.avg_entry_price = redis_state.get('avg_entry_price', 0) self.highest_price = redis_state.get('highest_price', 0) or 0 self.lowest_price = redis_state.get('lowest_price', float('inf')) or float('inf') self.max_positions = redis_state.get('max_positions_ever', 0) self.candle_index = redis_state.get('candle_index', 0) # 重新初始化 grid_levels(从 avg_entry_price 推断) if self.total_quantity > 0 and self.avg_entry_price > 0: # 创建一个占位符网格点,表示当前持仓 grid_price = round(self.avg_entry_price / self.step) * self.step self.grid_levels[grid_price] = GridLevel( price=grid_price, quantity=self.total_quantity, entry_time=self.candle_index, status="open" ) print(f"[GridManager] {self.pair} 状态已从 Redis 恢复", file=sys.stderr, flush=True) except Exception as e: print(f"[GridManager] {self.pair} 状态恢复失败: {str(e)}", file=sys.stderr, flush=True)