985 lines
40 KiB
Python
985 lines
40 KiB
Python
"""
|
||
网格交易管理器 - 维护单个币对的完整网格持仓生命周期
|
||
"""
|
||
from dataclasses import dataclass, field
|
||
from typing import Optional, List, Dict, Any
|
||
from enum import Enum
|
||
import sys
|
||
import json
|
||
from datetime import datetime
|
||
from urllib.parse import urlparse
|
||
|
||
try:
|
||
import redis
|
||
REDIS_AVAILABLE = True
|
||
except ImportError:
|
||
REDIS_AVAILABLE = False
|
||
|
||
|
||
class AdjustmentType(Enum):
|
||
"""加减仓类型"""
|
||
ENTRY = "entry" # 初始建仓
|
||
ADD = "add_position" # 加仓
|
||
REDUCE = "reduce_position" # 减仓
|
||
EXIT = "exit" # 全部平仓
|
||
|
||
|
||
@dataclass
|
||
class Grid:
|
||
"""
|
||
单个网格对象 - 代表网格中的一个位置
|
||
|
||
可以是已成交的买单或未成交的卖单
|
||
"""
|
||
hash_id: str # 随机唯一标识
|
||
grid_index: int # 网格位置索引 (0-49)
|
||
direction: str # "buy" 或 "sell"
|
||
|
||
# 对于已成交买单
|
||
entry_price: Optional[float] = None # 被填充时的入场价格
|
||
entry_time: Optional[int] = None # 被填充的时间(蜡烛线索引)
|
||
quantity: float = 0.0 # 持仓数量
|
||
|
||
# 对于未成交卖单
|
||
bid_price: Optional[float] = None # 挂单价格
|
||
bid_time: Optional[int] = None # 挂单时间
|
||
|
||
# 兑现信息
|
||
filled_price: Optional[float] = None # 兑现价格
|
||
filled_time: Optional[int] = None # 兑现时间
|
||
profit: Optional[float] = None # 单个网格的盈亏
|
||
|
||
|
||
@dataclass
|
||
class GridLevel:
|
||
"""单个网格点的状态"""
|
||
price: float # 网格价格
|
||
status: str # "filled" (已买入) 或 "empty" (未买/已卖出)
|
||
quantity: float # 该网格的持仓数量
|
||
entry_price: float # 该网格的实际买入价格
|
||
entry_time: int # 建仓蜡烛线索引
|
||
|
||
|
||
@dataclass
|
||
class PositionRecord:
|
||
"""单次加减仓记录"""
|
||
level_index: int # 网格级别序号
|
||
price: float # 操作价格
|
||
quantity: float # 操作份数
|
||
type: AdjustmentType # 操作类型
|
||
timestamp: int # 操作蜡烛线索引
|
||
|
||
|
||
@dataclass
|
||
class OrderFill:
|
||
"""订单成交信息"""
|
||
order_id: str # 订单 ID
|
||
pair: str # 币对
|
||
side: str # "buy" 或 "sell"
|
||
filled_price: float # 成交价格
|
||
filled_amount: float # 成交数量
|
||
fee: float # 手续费(USDT)
|
||
timestamp: int # 成交蜡烛线索引
|
||
reason: str = "" # 成交原因(entry/add/exit 等)
|
||
|
||
|
||
class GridMonitor:
|
||
"""
|
||
GridManager 自监测机制 - 判断是否需要止盈或止损
|
||
|
||
简单策略:
|
||
- 止盈:持仓利润达到 target_profit_pct(如 2%)
|
||
- 止损:持仓亏损达到 stop_loss_pct(如 -5%)
|
||
"""
|
||
|
||
def __init__(self,
|
||
target_profit_pct: float = 0.02, # 止盈目标 2%
|
||
stop_loss_pct: float = -0.05): # 止损阈值 -5%
|
||
self.target_profit_pct = target_profit_pct
|
||
self.stop_loss_pct = stop_loss_pct
|
||
|
||
def should_take_profit(self, current_price: float, avg_entry_price: float) -> bool:
|
||
"""
|
||
判断是否应该止盈
|
||
|
||
Args:
|
||
current_price: 当前价格
|
||
avg_entry_price: 平均建仓价
|
||
|
||
Returns:
|
||
True 表示应该止盈
|
||
"""
|
||
if avg_entry_price <= 0:
|
||
return False
|
||
|
||
profit_pct = (current_price - avg_entry_price) / avg_entry_price
|
||
return profit_pct >= self.target_profit_pct
|
||
|
||
def should_stop_loss(self, current_price: float, avg_entry_price: float) -> bool:
|
||
"""
|
||
判断是否应该止损
|
||
|
||
Args:
|
||
current_price: 当前价格
|
||
avg_entry_price: 平均建仓价
|
||
|
||
Returns:
|
||
True 表示应该止损
|
||
"""
|
||
if avg_entry_price <= 0:
|
||
return False
|
||
|
||
loss_pct = (current_price - avg_entry_price) / avg_entry_price
|
||
return loss_pct <= self.stop_loss_pct
|
||
|
||
def get_exit_reason(self, current_price: float, avg_entry_price: float) -> Optional[str]:
|
||
"""
|
||
判断退出原因(止盈或止损)
|
||
|
||
Returns:
|
||
"take_profit", "stop_loss" 或 None
|
||
"""
|
||
if self.should_take_profit(current_price, avg_entry_price):
|
||
return "take_profit"
|
||
elif self.should_stop_loss(current_price, avg_entry_price):
|
||
return "stop_loss"
|
||
return None
|
||
|
||
|
||
class GridManager:
|
||
"""
|
||
网格交易管理器 - 维护完整的持仓生命周期
|
||
|
||
架构:
|
||
- currentGridList: 长度固定=50的网格集合(已成交买单+未成交卖单)
|
||
- historyGridList: 所有已兑现的Grid信息(长度只增不减)
|
||
- pointer_index: 浮标指针(虚拟),分界线位置
|
||
|
||
规则:
|
||
- 价格下跌 → pointer_index左移 → 右侧卖单被兑现
|
||
- 价格上涨 → pointer_index右移 → 左侧买单被兑现
|
||
- 兑现后的Grid自动从currentGridList移除,新Grid补充
|
||
"""
|
||
|
||
# 常量定义:所有都可以修改
|
||
TOTAL_GRIDS = 50 # 总网格数
|
||
FILL_RATIO = 0.5 # 初始填充比例
|
||
PRICE_AMPLITUDE_BTC = 0.20 # BTC波幅 20%
|
||
PRICE_AMPLITUDE_DEFAULT = 0.33 # 其他币种波幅 33%
|
||
|
||
def __init__(self,
|
||
pair: str,
|
||
lower_price: float,
|
||
upper_price: float,
|
||
step: float,
|
||
stake_per_grid: float):
|
||
"""
|
||
初始化网格管理器
|
||
|
||
Args:
|
||
pair: 币对名称,如 "ETH/USDT"
|
||
lower_price: 网格下限价格
|
||
upper_price: 网格上限价格
|
||
step: 网格间距
|
||
stake_per_grid: 每个网格的投资额
|
||
"""
|
||
self.pair = pair
|
||
self.lower_price = lower_price
|
||
self.upper_price = upper_price
|
||
self.step = step
|
||
self.stake_per_grid = stake_per_grid
|
||
|
||
# 创建时间和随机hash_id
|
||
self.created_time = int(datetime.now().timestamp())
|
||
import uuid
|
||
self.hash_id = str(uuid.uuid4())[:8] # 8位随机值
|
||
|
||
# ✅ 新架构:浮标指针
|
||
self.pointer_index = int(self.TOTAL_GRIDS * self.FILL_RATIO) # 初始位置 = 25
|
||
|
||
# currentGridList 和 historyGridList 将由 Redis 存储
|
||
# 这里只保存引用key
|
||
self.current_grid_list_key = f"grids:current:{pair}:{self.hash_id}"
|
||
self.history_grid_list_key = f"grids:history:{pair}:{self.hash_id}"
|
||
|
||
# GridManager 本身的 key(存储元数据)
|
||
self.gridmanager_key = f"gm:{pair}:{self.created_time}:{self.hash_id}"
|
||
|
||
# ... existing code ...
|
||
self.total_grid_levels = int((upper_price - lower_price) / step) + 1
|
||
|
||
# 生成所有网格点价格
|
||
self.grid_prices = [lower_price + i * step for i in range(self.total_grid_levels)]
|
||
|
||
# 核心数据结构:网格状态数组
|
||
# grid_states[i] = GridLevel 对象,记录该网格的状态(filled/empty)
|
||
self.grid_states: Dict[float, GridLevel] = {}
|
||
for price in self.grid_prices:
|
||
self.grid_states[price] = GridLevel(
|
||
price=price,
|
||
status="empty", # 初始状态:全部为 empty(未持有)
|
||
quantity=0.0, # 当前持仓数量
|
||
entry_price=0.0, # 买入价格
|
||
entry_time=0
|
||
)
|
||
|
||
# 持仓统计
|
||
self.position_history: List[PositionRecord] = [] # 历史加减仓记录
|
||
self.order_fills: List[OrderFill] = [] # 所有订单成交记录
|
||
self.pending_orders: Dict[str, PositionRecord] = {} # 待成交的订单映射
|
||
|
||
# 当前状态
|
||
self.current_price: Optional[float] = None
|
||
self.total_quantity: float = 0.0 # 总持仓数量
|
||
self.total_invested: float = 0.0 # 总投入金额
|
||
self.avg_entry_price: float = 0.0 # 平均建仓价
|
||
self.highest_price: float = 0.0 # 持仓期间最高价
|
||
self.lowest_price: float = float('inf') # 持仓期间最低价
|
||
|
||
# 调试
|
||
self.candle_index = 0 # 当前蜡烛线索引
|
||
|
||
# Redis 连接
|
||
self.redis_client: Optional[redis.Redis] = None
|
||
self.redis_key = f"grid:{pair}" # Redis 存储键
|
||
|
||
# 报告
|
||
self.last_report_candle = 0 # 上次报告的蜡烛线
|
||
self.report_interval_candles = 600 # 每 600 个蜡烛线报告一次(约 10h,1h K线)
|
||
|
||
# 控制标志:是否已从 Trade 对象初始化过持仓
|
||
self._synced_from_trade_once = False
|
||
|
||
# 控制标志:是否已经从 Trade 对象一次性初始化过网格状态(第一次有持仓时)
|
||
self._grid_initialized_from_trade = False
|
||
|
||
# ✅ 自监测机制 - 止盈/止损判断
|
||
self.monitor = GridMonitor(
|
||
target_profit_pct=0.02, # 止盈目标: 2%
|
||
stop_loss_pct=-0.05 # 止损阈值: -5%
|
||
)
|
||
self.is_completed = False # GridManager 是否完结
|
||
self.completion_reason: Optional[str] = None # 完结原因 (take_profit / stop_loss)
|
||
self.min_grids_for_exit = 5 # 至少加仓到5个网格才能触发止盈/止损
|
||
|
||
# ✅ GridManager 不变量:沉求下来就不改了(除非网格范围更新)
|
||
# 便于 Redis 序列化保存
|
||
|
||
def update_state(self, current_price: float, candle_index: int) -> None:
|
||
"""
|
||
更新网格对象的当前状态
|
||
|
||
这个方法应该在每个蜡烛线调用一次,用最新的市场价格更新网格对象
|
||
|
||
Args:
|
||
current_price: 当前市场价格
|
||
candle_index: 当前蜡烛线索引(用于时间戳)
|
||
"""
|
||
self.current_price = current_price
|
||
self.candle_index = candle_index
|
||
|
||
# 更新最高/最低价
|
||
if self.total_quantity > 0:
|
||
if current_price > self.highest_price:
|
||
self.highest_price = current_price
|
||
if current_price < self.lowest_price:
|
||
self.lowest_price = current_price
|
||
|
||
# 统计网格状态
|
||
filled_count = sum(1 for gs in self.grid_states.values() if gs.status == "filled")
|
||
empty_count = len(self.grid_states) - filled_count
|
||
|
||
print(f"[GridManager] {self.pair} 状态更新 - 价格: {current_price:.2f}, "
|
||
f"持仓: {self.total_quantity:.6f}, 平均价: {self.avg_entry_price:.2f}, "
|
||
f"网格状态: {filled_count} FILLED / {empty_count} EMPTY",
|
||
file=sys.stderr, flush=True)
|
||
|
||
# ✅ 自监测:检查是否需要止盈/止损
|
||
# 仅在加仓超过最低网格数时才触发,保证有足够的持仓
|
||
if not self.is_completed and self.total_quantity > 0:
|
||
filled_count = sum(1 for gs in self.grid_states.values() if gs.status == "filled")
|
||
if filled_count >= self.min_grids_for_exit:
|
||
exit_reason = self.monitor.get_exit_reason(current_price, self.avg_entry_price)
|
||
if exit_reason:
|
||
self.is_completed = True
|
||
self.completion_reason = exit_reason
|
||
profit_pct = (current_price - self.avg_entry_price) / self.avg_entry_price * 100
|
||
print(f"[GridManager] {self.pair} 自监测触发✅ - 原因: {exit_reason}, "
|
||
f"利润: {profit_pct:.2f}%, 当前价: {current_price:.2f}, 网格数: {filled_count}",
|
||
file=sys.stderr, flush=True)
|
||
|
||
def apply_adjustment(self, adjustment: PositionRecord) -> None:
|
||
"""
|
||
应用一次加减仓操作,并更新网格状态
|
||
|
||
通过更新 grid_states 中相关网格的状态(filled/empty)来报告内部一致性
|
||
|
||
Args:
|
||
adjustment: PositionRecord 对象,包含加减仓的所有信息
|
||
"""
|
||
price = adjustment.price # ✅ 现在 price 已经是网格价格,不需要舍入
|
||
quantity = adjustment.quantity
|
||
adj_type = adjustment.type
|
||
|
||
# ✅ 直接使用 price 作为网格价格
|
||
grid_price = price
|
||
grid_state = self.grid_states.get(grid_price)
|
||
|
||
if adj_type == AdjustmentType.ENTRY or adj_type == AdjustmentType.ADD:
|
||
# 建仓或加仓 → 将该网格标记为 FILLED
|
||
if grid_state:
|
||
grid_state.status = "filled"
|
||
grid_state.quantity += quantity # ✅ 关键:增加该网格的持仓数量
|
||
grid_state.entry_price = price # 记录实际买入价
|
||
grid_state.entry_time = self.candle_index
|
||
|
||
# 更新持仓统计
|
||
self.total_invested += quantity * price
|
||
self.total_quantity += quantity
|
||
|
||
if self.total_quantity > 0:
|
||
self.avg_entry_price = self.total_invested / self.total_quantity
|
||
|
||
print(f"[GridManager] {self.pair} 加仓 - 网格 {grid_price:.2f} 标记为 FILLED, "
|
||
f"数量: {quantity:.6f}, 总持仓: {self.total_quantity:.6f}",
|
||
file=sys.stderr, flush=True)
|
||
|
||
elif adj_type == AdjustmentType.REDUCE:
|
||
# 减仓 → 其他网格的故事,不常用(网格交易通常只有 ENTRY/ADD/EXIT)
|
||
if grid_state:
|
||
grid_state.quantity -= quantity
|
||
if grid_state.quantity <= 0:
|
||
grid_state.quantity = 0
|
||
|
||
self.total_quantity -= quantity
|
||
self.total_invested -= quantity * grid_state.entry_price if grid_state else 0
|
||
|
||
if self.total_quantity > 0:
|
||
self.avg_entry_price = self.total_invested / self.total_quantity
|
||
|
||
print(f"[GridManager] {self.pair} 减仓 - 网格 {grid_price:.2f}, "
|
||
f"减少: {quantity:.6f}, 剩余持仓: {self.total_quantity:.6f}",
|
||
file=sys.stderr, flush=True)
|
||
|
||
elif adj_type == AdjustmentType.EXIT:
|
||
# 平仓 → 将所有 FILLED 的网格一次排空(或不排空对象网格)
|
||
# 此处只是记录平仓决策,实际的排空是由 decide_adjustment 程序控制
|
||
print(f"[GridManager] {self.pair} 平仓 - 总持仓: {self.total_quantity:.6f}, "
|
||
f"平均价: {self.avg_entry_price:.2f}, 当前价: {price:.2f}",
|
||
file=sys.stderr, flush=True)
|
||
|
||
# 清空所有 FILLED 网格,下次套需要绍转建仓时重新填满
|
||
for gs in self.grid_states.values():
|
||
if gs.status == "filled":
|
||
gs.status = "empty"
|
||
gs.quantity = 0
|
||
gs.entry_price = 0.0
|
||
|
||
# 清空污了持仓计数
|
||
self.total_quantity = 0.0
|
||
self.total_invested = 0.0
|
||
self.avg_entry_price = 0.0
|
||
self.highest_price = 0.0
|
||
self.lowest_price = float('inf')
|
||
|
||
# 记录到历史
|
||
self.position_history.append(adjustment)
|
||
|
||
# ✅ 立即同步上次改变上 Redis
|
||
self.sync_grid_state_to_redis()
|
||
|
||
print(f"[GridManager] {self.pair} 网格状态已同步到 Redis",
|
||
file=sys.stderr, flush=True)
|
||
|
||
def decide_adjustment(self) -> Optional[PositionRecord]:
|
||
"""
|
||
判定是否需要加减仓,基于网格填充/排空逻辑
|
||
|
||
核心规则:
|
||
1. 下沿到当前价格 → 所有网格必须被 FILLED(已买入)
|
||
如果有 EMPTY 的网格 → 加仓填满它
|
||
2. 当前价格到上沿 → 所有网格必须被 EMPTY(不持有)
|
||
如果有 FILLED 的网格 → 平仓排空它
|
||
|
||
Returns:
|
||
PositionRecord 如果需要操作,否则 None
|
||
"""
|
||
if self.current_price is None:
|
||
return None
|
||
|
||
# ✅ 自监测止盈/止损:如果已经变为完结,返回 EXIT 信号
|
||
if self.is_completed:
|
||
if self.total_quantity > 0:
|
||
# 全部平仓
|
||
return PositionRecord(
|
||
level_index=0,
|
||
price=self.current_price,
|
||
quantity=self.total_quantity,
|
||
type=AdjustmentType.EXIT,
|
||
timestamp=self.candle_index
|
||
)
|
||
return None
|
||
|
||
# 找到当前价格所在的网格点
|
||
current_grid_price = self._round_to_grid(self.current_price)
|
||
|
||
# 规则 2: 检查上方是否有需要平仓的网格
|
||
# 当前价格到上沿之间的网格都应该是 EMPTY
|
||
for grid_price in self.grid_prices:
|
||
if grid_price > current_grid_price:
|
||
grid_state = self.grid_states.get(grid_price)
|
||
if grid_state and grid_state.status == "filled" and grid_state.quantity > 0:
|
||
# 上方有持仓,需要平仓
|
||
print(f"[GridManager] {self.pair} 平仓信号 - 价格已涨到 {self.current_price:.2f}, "
|
||
f"上方网格 {grid_price:.2f} 有持仓需排空",
|
||
file=sys.stderr, flush=True)
|
||
|
||
# 平仓该网格
|
||
return PositionRecord(
|
||
level_index=self._price_to_level_index(grid_price),
|
||
price=self.current_price,
|
||
quantity=grid_state.quantity,
|
||
type=AdjustmentType.EXIT,
|
||
timestamp=self.candle_index
|
||
)
|
||
|
||
# 规则 1: 检查下方是否有需要加仓的网格
|
||
# 下沿到当前价格之间的网格都应该是 FILLED
|
||
for grid_price in self.grid_prices:
|
||
if grid_price <= current_grid_price:
|
||
grid_state = self.grid_states.get(grid_price)
|
||
if grid_state and grid_state.status == "empty" and grid_state.quantity == 0:
|
||
# 下方有空白网格,需要加仓
|
||
print(f"[GridManager] {self.pair} 加仓信号 - 价格已跌到 {self.current_price:.2f}, "
|
||
f"下方网格 {grid_price:.2f} 为空需填满",
|
||
file=sys.stderr, flush=True)
|
||
|
||
# ✅ 加仓该网格 - 关键修复:price 应该是网格价格,而不是当前价格
|
||
return PositionRecord(
|
||
level_index=self._price_to_level_index(grid_price),
|
||
price=grid_price, # ✅ 修复:使用网格价格而不是当前价格
|
||
quantity=1.0,
|
||
type=AdjustmentType.ADD if self.total_quantity > 0 else AdjustmentType.ENTRY,
|
||
timestamp=self.candle_index
|
||
)
|
||
|
||
# 没有操作建议(所有网格状态都符合规则)
|
||
return None
|
||
|
||
def _price_to_level_index(self, price: float) -> int:
|
||
"""将价格转换为网格级别序号"""
|
||
index = int((price - self.lower_price) / self.step)
|
||
return max(0, min(index, self.total_grid_levels - 1))
|
||
|
||
def _round_to_grid(self, price: float) -> float:
|
||
"""
|
||
将价格舍入到最接近的网格点(向下舍入)
|
||
基于 lower_price 的偏移量进行计算
|
||
"""
|
||
# 计算相对于下沿的偏移量
|
||
offset = price - self.lower_price
|
||
# 舍入到最接近的网格间距
|
||
grid_count = int(offset / self.step)
|
||
# 返回对应的网格价格
|
||
return self.lower_price + grid_count * self.step
|
||
|
||
def get_summary(self) -> Dict[str, Any]:
|
||
"""获取当前持仓的完整摘要"""
|
||
filled_grids = sum(1 for gs in self.grid_states.values() if gs.status == "filled")
|
||
return {
|
||
"pair": self.pair,
|
||
"current_price": self.current_price,
|
||
"total_quantity": self.total_quantity,
|
||
"total_invested": self.total_invested,
|
||
"avg_entry_price": self.avg_entry_price,
|
||
"unrealized_profit": (self.current_price - self.avg_entry_price) * self.total_quantity if self.total_quantity > 0 else 0,
|
||
"filled_grids": filled_grids, # FILLED 的网格数
|
||
"empty_grids": self.total_grid_levels - filled_grids, # EMPTY 的网格数
|
||
"total_orders": len(self.order_fills),
|
||
"highest_price": self.highest_price if self.total_quantity > 0 else None,
|
||
"lowest_price": self.lowest_price if self.total_quantity > 0 else None,
|
||
}
|
||
|
||
def record_order_fill(self, order_fill: OrderFill) -> None:
|
||
"""
|
||
记录一个订单成交事件
|
||
|
||
这个方法应该在订单成交时立即调用,将真实的成交数据反映到网格对象
|
||
|
||
Args:
|
||
order_fill: OrderFill 对象,包含成交的所有信息
|
||
"""
|
||
self.order_fills.append(order_fill)
|
||
|
||
if order_fill.side == "buy":
|
||
# 买入订单成交
|
||
actual_cost = order_fill.filled_amount * order_fill.filled_price + order_fill.fee
|
||
|
||
self.total_invested += actual_cost
|
||
self.total_quantity += order_fill.filled_amount
|
||
|
||
# 更新平均价(考虑手续费)
|
||
if self.total_quantity > 0:
|
||
self.avg_entry_price = self.total_invested / self.total_quantity
|
||
|
||
print(f"[GridManager] {self.pair} 买入订单成交 - "
|
||
f"价格: {order_fill.filled_price:.2f}, "
|
||
f"数量: {order_fill.filled_amount:.6f}, "
|
||
f"手续费: {order_fill.fee:.4f} USDT, "
|
||
f"总持仓: {self.total_quantity:.6f}",
|
||
file=sys.stderr, flush=True)
|
||
|
||
# 记录到 grid_levels
|
||
if order_fill.filled_price not in self.grid_levels:
|
||
self.grid_levels[order_fill.filled_price] = GridLevel(
|
||
price=order_fill.filled_price,
|
||
quantity=order_fill.filled_amount,
|
||
entry_time=order_fill.timestamp,
|
||
status="open"
|
||
)
|
||
else:
|
||
self.grid_levels[order_fill.filled_price].quantity += order_fill.filled_amount
|
||
|
||
elif order_fill.side == "sell":
|
||
# 卖出订单成交
|
||
sell_proceeds = order_fill.filled_amount * order_fill.filled_price - order_fill.fee
|
||
|
||
self.total_invested -= order_fill.filled_amount * self.avg_entry_price
|
||
self.total_quantity -= order_fill.filled_amount
|
||
|
||
# 更新平均价
|
||
if self.total_quantity > 0:
|
||
self.avg_entry_price = self.total_invested / self.total_quantity
|
||
else:
|
||
self.avg_entry_price = 0.0
|
||
|
||
# 计算本次卖出的利润
|
||
sell_profit = sell_proceeds - (order_fill.filled_amount * self.avg_entry_price if self.avg_entry_price > 0 else 0)
|
||
profit_pct = (order_fill.filled_price - self.avg_entry_price) / self.avg_entry_price * 100 if self.avg_entry_price > 0 else 0
|
||
|
||
print(f"[GridManager] {self.pair} 卖出订单成交 - "
|
||
f"价格: {order_fill.filled_price:.2f}, "
|
||
f"数量: {order_fill.filled_amount:.6f}, "
|
||
f"手续费: {order_fill.fee:.4f} USDT, "
|
||
f"利润: {sell_profit:.2f} USDT ({profit_pct:.2f}%), "
|
||
f"剩余持仓: {self.total_quantity:.6f}",
|
||
file=sys.stderr, flush=True)
|
||
|
||
def sync_from_trade_object(self, trade: Any, candle_index: int) -> None:
|
||
"""
|
||
从 Freqtrade 的 Trade 对象同步状态
|
||
|
||
只同步持仓数量、成本、平均价等数值信息
|
||
网格 FILLED/EMPTY 状态由 apply_adjustment() 和 position_history 维护
|
||
|
||
Args:
|
||
trade: Freqtrade Trade 对象
|
||
candle_index: 当前蜡烛线索引
|
||
"""
|
||
# 从 trade 对象提取关键信息
|
||
self.total_quantity = trade.amount
|
||
self.total_invested = trade.stake_amount
|
||
self.avg_entry_price = trade.open_rate
|
||
|
||
# ✅ 只同步数值信息,不推导网格状态
|
||
# 网格状态由真实的加仓操作(apply_adjustment)维护
|
||
# 不再根据平均价假设所有低于平均价的网格都被加仓了
|
||
|
||
print(f"[GridManager] {self.pair} 从 Trade 同步 - 持仓: {self.total_quantity:.6f}, "
|
||
f"平均价: {self.avg_entry_price:.2f}, 成本: {self.total_invested:.2f}",
|
||
file=sys.stderr, flush=True)
|
||
|
||
def record_pending_order(self, order_id: str, adjustment: PositionRecord) -> None:
|
||
"""
|
||
记录一个待成交的订单
|
||
|
||
当策略决定建仓/加仓后,订单被提交给 Freqtrade
|
||
这个方法记录订单 ID 与策略决策的映射关系
|
||
|
||
Args:
|
||
order_id: Freqtrade 返回的订单 ID
|
||
adjustment: 对应的 PositionRecord 对象
|
||
"""
|
||
self.pending_orders[order_id] = adjustment
|
||
|
||
print(f"[GridManager] {self.pair} 记录待成交订单 - "
|
||
f"订单ID: {order_id}, "
|
||
f"类型: {adjustment.type.value}, "
|
||
f"价格: {adjustment.price:.2f}",
|
||
file=sys.stderr, flush=True)
|
||
|
||
def remove_pending_order(self, order_id: str) -> Optional[PositionRecord]:
|
||
"""
|
||
移除待成交订单(当订单成交或取消时调用)
|
||
|
||
Args:
|
||
order_id: 订单 ID
|
||
|
||
Returns:
|
||
对应的 PositionRecord,如果存在
|
||
"""
|
||
return self.pending_orders.pop(order_id, None)
|
||
|
||
def init_redis(self, redis_url: str) -> bool:
|
||
"""
|
||
初始化 Redis 连接
|
||
|
||
Args:
|
||
redis_url: Redis URL,如 "redis://192.168.1.215:6379/0"
|
||
|
||
Returns:
|
||
是否连接成功
|
||
|
||
Raises:
|
||
ConnectionError: 如果 Redis 连接失败
|
||
"""
|
||
if not REDIS_AVAILABLE:
|
||
raise RuntimeError(f"[GridManager] {self.pair} Redis 包未安装,无法继续")
|
||
|
||
try:
|
||
# 解析 Redis URL
|
||
parsed = urlparse(redis_url)
|
||
self.redis_client = redis.Redis(
|
||
host=parsed.hostname or 'localhost',
|
||
port=parsed.port or 6379,
|
||
db=int(parsed.path.strip('/') or 0),
|
||
decode_responses=True,
|
||
socket_connect_timeout=5,
|
||
socket_keepalive=True
|
||
)
|
||
|
||
# 测试连接
|
||
self.redis_client.ping()
|
||
print(f"[GridManager] {self.pair} Redis 连接成功 - Key: {self.redis_key}",
|
||
file=sys.stderr, flush=True)
|
||
return True
|
||
|
||
except Exception as e:
|
||
error_msg = f"[GridManager] {self.pair} Redis 连接失败: {str(e)}"
|
||
print(error_msg, file=sys.stderr, flush=True)
|
||
self.redis_client = None
|
||
raise ConnectionError(error_msg)
|
||
|
||
def sync_grid_state_to_redis(self) -> None:
|
||
"""
|
||
将当前的网格状态同步到 Redis
|
||
包我每个网格的 FILLED/EMPTY 状态
|
||
"""
|
||
if not self.redis_client:
|
||
return
|
||
|
||
try:
|
||
# 构建网格状态字典
|
||
grid_states_data = {}
|
||
filled_count = 0
|
||
for price, grid_level in self.grid_states.items():
|
||
# 使用未旤的串串化,避免浮点数精度问题
|
||
price_key = f"{price:.8f}".rstrip('0').rstrip('.')
|
||
grid_states_data[price_key] = {
|
||
"price": price,
|
||
"status": grid_level.status,
|
||
"quantity": grid_level.quantity,
|
||
"entry_price": grid_level.entry_price,
|
||
"entry_time": grid_level.entry_time
|
||
}
|
||
if grid_level.status == "filled":
|
||
filled_count += 1
|
||
|
||
# 存存到 Redis
|
||
redis_key = f"grid_state:{self.pair}"
|
||
self.redis_client.set(
|
||
redis_key,
|
||
json.dumps(grid_states_data),
|
||
ex=86400 # 24 小时过期
|
||
)
|
||
|
||
print(f"[GridManager] {self.pair} 网格状态同步到 Redis - "
|
||
f"{filled_count} FILLED, {len(grid_states_data) - filled_count} EMPTY",
|
||
file=sys.stderr, flush=True)
|
||
|
||
except Exception as e:
|
||
print(f"[GridManager] {self.pair} 网格状态同步失败: {str(e)}",
|
||
file=sys.stderr, flush=True)
|
||
|
||
def recover_grid_state_from_redis(self) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
从 Redis 恢复网格状态
|
||
"""
|
||
if not self.redis_client:
|
||
return None
|
||
|
||
try:
|
||
redis_key = f"grid_state:{self.pair}"
|
||
data = self.redis_client.get(redis_key)
|
||
|
||
if not data:
|
||
return None
|
||
|
||
return json.loads(data)
|
||
|
||
except Exception as e:
|
||
print(f"[GridManager] {self.pair} 从 Redis 恢复网格状态失败: {str(e)}",
|
||
file=sys.stderr, flush=True)
|
||
return None
|
||
|
||
def restore_grid_state(self, grid_states_data: Dict[str, Any]) -> None:
|
||
"""
|
||
恢复网格状态
|
||
"""
|
||
try:
|
||
restored_count = 0
|
||
for price_key, state_data in grid_states_data.items():
|
||
# 能儫不同格式: 情况 1:新格式(同 state_data 中有 price 字段)
|
||
if "price" in state_data:
|
||
price = state_data["price"]
|
||
else:
|
||
# 情况 2:旧格式(直接从键名转换)
|
||
price = float(price_key)
|
||
|
||
# 找到最接近的存在网格点(处理浮点数精度问题)
|
||
closest_price = None
|
||
min_diff = float('inf')
|
||
for grid_price in self.grid_states.keys():
|
||
diff = abs(grid_price - price)
|
||
if diff < min_diff:
|
||
min_diff = diff
|
||
closest_price = grid_price
|
||
|
||
if closest_price is not None and min_diff < 1e-6: # 中仂茇平
|
||
grid_level = self.grid_states[closest_price]
|
||
grid_level.status = state_data.get("status", "empty")
|
||
grid_level.quantity = state_data.get("quantity", 0.0)
|
||
grid_level.entry_price = state_data.get("entry_price", 0.0)
|
||
grid_level.entry_time = state_data.get("entry_time", 0)
|
||
restored_count += 1
|
||
|
||
print(f"[GridManager] {self.pair} 网格状态恢复完成 - 恢复 {restored_count}/{len(grid_states_data)} 个网格",
|
||
file=sys.stderr, flush=True)
|
||
|
||
except Exception as e:
|
||
print(f"[GridManager] {self.pair} 恢复网格状态失败: {str(e)}",
|
||
file=sys.stderr, flush=True)
|
||
|
||
def report_to_redis(self) -> None:
|
||
"""
|
||
将当前状态同步到 Redis
|
||
"""
|
||
if not self.redis_client:
|
||
return
|
||
|
||
try:
|
||
# 构建状态报告
|
||
report = {
|
||
"timestamp": datetime.now().isoformat(),
|
||
"candle_index": self.candle_index,
|
||
"current_price": self.current_price,
|
||
"total_quantity": self.total_quantity,
|
||
"total_invested": self.total_invested,
|
||
"avg_entry_price": self.avg_entry_price,
|
||
"unrealized_profit": (self.current_price - self.avg_entry_price) * self.total_quantity if self.total_quantity > 0 else 0,
|
||
"active_grid_levels": len(self.grid_levels),
|
||
"max_positions_ever": self.max_positions,
|
||
"total_orders": len(self.order_fills),
|
||
"highest_price": self.highest_price if self.total_quantity > 0 else None,
|
||
"lowest_price": self.lowest_price if self.total_quantity > 0 else None,
|
||
}
|
||
|
||
# 存储到 Redis
|
||
self.redis_client.set(
|
||
self.redis_key,
|
||
json.dumps(report),
|
||
ex=3600 # 1 小时过期
|
||
)
|
||
|
||
except Exception as e:
|
||
print(f"[GridManager] {self.pair} Redis 写入失败: {str(e)}",
|
||
file=sys.stderr, flush=True)
|
||
|
||
def generate_report(self) -> str:
|
||
"""
|
||
生成人类可读的状态报告
|
||
|
||
Returns:
|
||
格式化的状态报告字符串
|
||
"""
|
||
unrealized = (self.current_price - self.avg_entry_price) * self.total_quantity if self.total_quantity > 0 else 0
|
||
unrealized_pct = (self.current_price - self.avg_entry_price) / self.avg_entry_price * 100 if self.avg_entry_price > 0 else 0
|
||
|
||
report = f"""
|
||
{'='*80}
|
||
[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 网格交易报告 - {self.pair}
|
||
{'='*80}
|
||
【持仓状态】
|
||
当前价格: {self.current_price:.4f} USDT
|
||
总持仓数量: {self.total_quantity:.8f}
|
||
平均入场价: {self.avg_entry_price:.4f} USDT
|
||
总成本投入: {self.total_invested:.2f} USDT
|
||
|
||
【盈亏状态】
|
||
未实现盈亏: {unrealized:.2f} USDT
|
||
未实现盈亏率: {unrealized_pct:.2f}%
|
||
日期收益状态: {'亏损' if unrealized < 0 else '盈利'}
|
||
|
||
【网格样态】
|
||
活跃网格点数: {len(self.grid_levels)}
|
||
历史最大同时持仓: {self.max_positions}
|
||
持仓期最高价: {self.highest_price:.4f if self.highest_price > 0 else 'N/A'} USDT
|
||
持仓期最低价: {self.lowest_price:.4f if self.lowest_price != float('inf') else 'N/A'} USDT
|
||
最低点亏损率: {(self.lowest_price - self.avg_entry_price) / self.avg_entry_price * 100 if self.lowest_price != float('inf') and self.avg_entry_price > 0 else 0:.2f}%
|
||
|
||
【交易统计】
|
||
已成交订单数: {len(self.order_fills)}
|
||
待成交订单数: {len(self.pending_orders)}
|
||
历史加减仓次数: {len(self.position_history)}
|
||
当前蜡烛线索引: {self.candle_index}
|
||
{'='*80}
|
||
"""
|
||
return report
|
||
|
||
def check_and_report(self) -> None:
|
||
"""
|
||
每隔指定间隔执行一次报告,并同步到 Redis
|
||
|
||
应该在 populate_indicators() 中调用
|
||
"""
|
||
if self.candle_index - self.last_report_candle < self.report_interval_candles:
|
||
return
|
||
|
||
# 打印报告
|
||
report_text = self.generate_report()
|
||
print(report_text, file=sys.stderr, flush=True)
|
||
|
||
# 同步到 Redis
|
||
self.report_to_redis()
|
||
|
||
# 更新本次报告时间
|
||
self.last_report_candle = self.candle_index
|
||
|
||
print(f"[GridManager] {self.pair} 报告已同步到 Redis",
|
||
file=sys.stderr, flush=True)
|
||
|
||
@classmethod
|
||
def recover_from_redis(cls, pair: str, redis_url: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
从 Redis 恢复一个 GridManager 对象
|
||
|
||
当 Docker 容器重启后,通过 Redis 中的持仓状态恢复 GridManager
|
||
|
||
Args:
|
||
pair: 币对名称,如 "BTC/USDT"
|
||
redis_url: Redis URL
|
||
|
||
Returns:
|
||
恢复后的 GridManager 对象,如果 Redis 中无数据返回 None
|
||
"""
|
||
if not REDIS_AVAILABLE:
|
||
print(f"[GridManager] {pair} Redis 未安装,无法恢复",
|
||
file=sys.stderr, flush=True)
|
||
return None
|
||
|
||
try:
|
||
# 连接 Redis
|
||
parsed = urlparse(redis_url)
|
||
redis_client = redis.Redis(
|
||
host=parsed.hostname or 'localhost',
|
||
port=parsed.port or 6379,
|
||
db=int(parsed.path.strip('/') or 0),
|
||
decode_responses=True,
|
||
socket_connect_timeout=5
|
||
)
|
||
|
||
# 测试连接
|
||
redis_client.ping()
|
||
|
||
# 尝试获取持仓数据
|
||
redis_key = f"grid:{pair}"
|
||
data = redis_client.get(redis_key)
|
||
|
||
if not data:
|
||
print(f"[GridManager] {pair} Redis 中无持仓数据,无法恢复",
|
||
file=sys.stderr, flush=True)
|
||
return None
|
||
|
||
# 解析 JSON
|
||
state = json.loads(data)
|
||
|
||
# 从 Redis 数据反向构建 GridManager
|
||
# 由于 Redis 中只有最新的快照,我们无法完全恢复所有历史
|
||
# 但可以恢复当前的持仓状态
|
||
print(f"[GridManager] {pair} 从 Redis 恢复持仓数据:",
|
||
file=sys.stderr, flush=True)
|
||
print(f" 持仓数量: {state.get('total_quantity')}",
|
||
file=sys.stderr, flush=True)
|
||
print(f" 平均价: {state.get('avg_entry_price')}",
|
||
file=sys.stderr, flush=True)
|
||
print(f" 总投入: {state.get('total_invested')}",
|
||
file=sys.stderr, flush=True)
|
||
|
||
return state # 返回原始数据,让调用者处理恢复
|
||
|
||
except Exception as e:
|
||
print(f"[GridManager] {pair} 从 Redis 恢复失败: {str(e)}",
|
||
file=sys.stderr, flush=True)
|
||
return None
|
||
|
||
def restore_from_redis_state(self, redis_state: Dict[str, Any]) -> None:
|
||
"""
|
||
从 Redis 恢复的状态数据中还原对象内部状态
|
||
|
||
Args:
|
||
redis_state: 从 recover_from_redis() 获取的状态字典
|
||
"""
|
||
try:
|
||
self.current_price = redis_state.get('current_price', 0)
|
||
self.total_quantity = redis_state.get('total_quantity', 0)
|
||
self.total_invested = redis_state.get('total_invested', 0)
|
||
self.avg_entry_price = redis_state.get('avg_entry_price', 0)
|
||
self.highest_price = redis_state.get('highest_price', 0) or 0
|
||
self.lowest_price = redis_state.get('lowest_price', float('inf')) or float('inf')
|
||
self.max_positions = redis_state.get('max_positions_ever', 0)
|
||
self.candle_index = redis_state.get('candle_index', 0)
|
||
|
||
# 重新初始化 grid_levels(从 avg_entry_price 推断)
|
||
if self.total_quantity > 0 and self.avg_entry_price > 0:
|
||
# 创建一个占位符网格点,表示当前持仓
|
||
grid_price = round(self.avg_entry_price / self.step) * self.step
|
||
self.grid_levels[grid_price] = GridLevel(
|
||
price=grid_price,
|
||
quantity=self.total_quantity,
|
||
entry_time=self.candle_index,
|
||
status="open"
|
||
)
|
||
|
||
print(f"[GridManager] {self.pair} 状态已从 Redis 恢复",
|
||
file=sys.stderr, flush=True)
|
||
|
||
except Exception as e:
|
||
print(f"[GridManager] {self.pair} 状态恢复失败: {str(e)}",
|
||
file=sys.stderr, flush=True)
|
||
|
||
def destroy(self) -> None:
|
||
"""
|
||
销毁网格管理器,清理 Redis 中存储的网格状态
|
||
通常在平仓后调用,表示该网格管理器不再有价值
|
||
"""
|
||
try:
|
||
if self.redis_client:
|
||
# 删除 Redis 中该币对的网格状态
|
||
self.redis_client.delete(self.redis_key)
|
||
print(f"[GridManager] {self.pair} 已从 Redis 删除网格状态",
|
||
file=sys.stderr, flush=True)
|
||
except Exception as e:
|
||
print(f"[GridManager] {self.pair} 删除 Redis 状态失败: {str(e)}",
|
||
file=sys.stderr, flush=True)
|
||
|
||
# 清空内部状态
|
||
self.grid_states.clear()
|
||
self.position_history.clear()
|
||
self.order_fills.clear()
|
||
self.total_quantity = 0.0
|
||
self.total_invested = 0.0
|
||
self.avg_entry_price = 0.0
|
||
|
||
print(f"[GridManager] {self.pair} 网格管理器已销毁,可以释放该对象",
|
||
file=sys.stderr, flush=True)
|