2025-11-27 20:01:42 +08:00

874 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
网格交易管理器 - 维护单个币对的完整网格持仓生命周期
"""
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
from enum import Enum
import sys
import json
from datetime import datetime
from urllib.parse import urlparse
try:
import redis
REDIS_AVAILABLE = True
except ImportError:
REDIS_AVAILABLE = False
class AdjustmentType(Enum):
"""加减仓类型"""
ENTRY = "entry" # 初始建仓
ADD = "add_position" # 加仓
REDUCE = "reduce_position" # 减仓
EXIT = "exit" # 全部平仓
@dataclass
class GridLevel:
"""单个网格点的状态"""
price: float # 网格价格
status: str # "filled" (已买入) 或 "empty" (未买/已卖出)
quantity: float # 该网格的持仓数量
entry_price: float # 该网格的实际买入价格
entry_time: int # 建仓蜡烛线索引
@dataclass
class PositionRecord:
"""单次加减仓记录"""
level_index: int # 网格级别序号
price: float # 操作价格
quantity: float # 操作份数
type: AdjustmentType # 操作类型
timestamp: int # 操作蜡烛线索引
@dataclass
class OrderFill:
"""订单成交信息"""
order_id: str # 订单 ID
pair: str # 币对
side: str # "buy" 或 "sell"
filled_price: float # 成交价格
filled_amount: float # 成交数量
fee: float # 手续费USDT
timestamp: int # 成交蜡烛线索引
reason: str = "" # 成交原因entry/add/exit 等)
class GridManager:
"""
网格交易管理器 - 核心思想:维护网格的填充/排空状态
网格逻辑:
- 下沿到当前价格 → 所有网格必须被填满(已买入持仓)
- 当前价格到上沿 → 所有网格必须被排空(不持有 或 已卖出)
决策规则:
1. 如果下方有空白网格 → 加仓(填满它们)
2. 如果上方有持仓网格 → 平仓(排空它们)
3. 维护一个网格状态数组,每个位置记录 "filled""empty"
"""
def __init__(self,
pair: str,
lower_price: float,
upper_price: float,
step: float,
stake_per_grid: float):
"""
初始化网格管理器
Args:
pair: 币对名称,如 "ETH/USDT"
lower_price: 网格下限价格
upper_price: 网格上限价格
step: 网格间距
stake_per_grid: 每个网格的投资额
"""
self.pair = pair
self.lower_price = lower_price
self.upper_price = upper_price
self.step = step
self.stake_per_grid = stake_per_grid
# 计算总网格数
self.total_grid_levels = int((upper_price - lower_price) / step) + 1
# 生成所有网格点价格
self.grid_prices = [lower_price + i * step for i in range(self.total_grid_levels)]
# 核心数据结构:网格状态数组
# grid_states[i] = GridLevel 对象记录该网格的状态filled/empty
self.grid_states: Dict[float, GridLevel] = {}
for price in self.grid_prices:
self.grid_states[price] = GridLevel(
price=price,
status="empty", # 初始状态:全部为 empty未持有
quantity=0.0, # 当前持仓数量
entry_price=0.0, # 买入价格
entry_time=0
)
# 持仓统计
self.position_history: List[PositionRecord] = [] # 历史加减仓记录
self.order_fills: List[OrderFill] = [] # 所有订单成交记录
self.pending_orders: Dict[str, PositionRecord] = {} # 待成交的订单映射
# 当前状态
self.current_price: Optional[float] = None
self.total_quantity: float = 0.0 # 总持仓数量
self.total_invested: float = 0.0 # 总投入金额
self.avg_entry_price: float = 0.0 # 平均建仓价
self.highest_price: float = 0.0 # 持仓期间最高价
self.lowest_price: float = float('inf') # 持仓期间最低价
# 调试
self.candle_index = 0 # 当前蜡烛线索引
# Redis 连接
self.redis_client: Optional[redis.Redis] = None
self.redis_key = f"grid:{pair}" # Redis 存储键
# 报告
self.last_report_candle = 0 # 上次报告的蜡烛线
self.report_interval_candles = 600 # 每 600 个蜡烛线报告一次(约 10h1h K线
# 控制标志:是否已从 Trade 对象初始化过持仓
self._synced_from_trade_once = False
# 控制标志:是否已经从 Trade 对象一次性初始化过网格状态(第一次有持仓时)
self._grid_initialized_from_trade = False
def update_state(self, current_price: float, candle_index: int) -> None:
"""
更新网格对象的当前状态
这个方法应该在每个蜡烛线调用一次,用最新的市场价格更新网格对象
Args:
current_price: 当前市场价格
candle_index: 当前蜡烛线索引(用于时间戳)
"""
self.current_price = current_price
self.candle_index = candle_index
# 更新最高/最低价
if self.total_quantity > 0:
if current_price > self.highest_price:
self.highest_price = current_price
if current_price < self.lowest_price:
self.lowest_price = current_price
# 统计网格状态
filled_count = sum(1 for gs in self.grid_states.values() if gs.status == "filled")
empty_count = len(self.grid_states) - filled_count
print(f"[GridManager] {self.pair} 状态更新 - 价格: {current_price:.2f}, "
f"持仓: {self.total_quantity:.6f}, 平均价: {self.avg_entry_price:.2f}, "
f"网格状态: {filled_count} FILLED / {empty_count} EMPTY",
file=sys.stderr, flush=True)
def apply_adjustment(self, adjustment: PositionRecord) -> None:
"""
应用一次加减仓操作,并更新网格状态
通过更新 grid_states 中相关网格的状态filled/empty来报告内部一致性
Args:
adjustment: PositionRecord 对象,包含加减仓的所有信息
"""
price = adjustment.price # ✅ 现在 price 已经是网格价格,不需要舍入
quantity = adjustment.quantity
adj_type = adjustment.type
# ✅ 直接使用 price 作为网格价格
grid_price = price
grid_state = self.grid_states.get(grid_price)
if adj_type == AdjustmentType.ENTRY or adj_type == AdjustmentType.ADD:
# 建仓或加仓 → 将该网格标记为 FILLED
if grid_state:
grid_state.status = "filled"
grid_state.quantity += quantity # ✅ 关键:增加该网格的持仓数量
grid_state.entry_price = price # 记录实际买入价
grid_state.entry_time = self.candle_index
# 更新持仓统计
self.total_invested += quantity * price
self.total_quantity += quantity
if self.total_quantity > 0:
self.avg_entry_price = self.total_invested / self.total_quantity
print(f"[GridManager] {self.pair} 加仓 - 网格 {grid_price:.2f} 标记为 FILLED, "
f"数量: {quantity:.6f}, 总持仓: {self.total_quantity:.6f}",
file=sys.stderr, flush=True)
elif adj_type == AdjustmentType.REDUCE:
# 减仓 → 其他网格的故事,不常用(网格交易通常只有 ENTRY/ADD/EXIT
if grid_state:
grid_state.quantity -= quantity
if grid_state.quantity <= 0:
grid_state.quantity = 0
self.total_quantity -= quantity
self.total_invested -= quantity * grid_state.entry_price if grid_state else 0
if self.total_quantity > 0:
self.avg_entry_price = self.total_invested / self.total_quantity
print(f"[GridManager] {self.pair} 减仓 - 网格 {grid_price:.2f}, "
f"减少: {quantity:.6f}, 剩余持仓: {self.total_quantity:.6f}",
file=sys.stderr, flush=True)
elif adj_type == AdjustmentType.EXIT:
# 平仓 → 将所有 FILLED 的网格一次排空(或不排空对象网格)
# 此处只是记录平仓决策,实际的排空是由 decide_adjustment 程序控制
print(f"[GridManager] {self.pair} 平仓 - 总持仓: {self.total_quantity:.6f}, "
f"平均价: {self.avg_entry_price:.2f}, 当前价: {price:.2f}",
file=sys.stderr, flush=True)
# 清空所有 FILLED 网格,下次套需要绍转建仓时重新填满
for gs in self.grid_states.values():
if gs.status == "filled":
gs.status = "empty"
gs.quantity = 0
gs.entry_price = 0.0
# 清空污了持仓计数
self.total_quantity = 0.0
self.total_invested = 0.0
self.avg_entry_price = 0.0
self.highest_price = 0.0
self.lowest_price = float('inf')
# 记录到历史
self.position_history.append(adjustment)
# ✅ 立即同步上次改变上 Redis
self.sync_grid_state_to_redis()
print(f"[GridManager] {self.pair} 网格状态已同步到 Redis",
file=sys.stderr, flush=True)
def decide_adjustment(self) -> Optional[PositionRecord]:
"""
判定是否需要加减仓,基于网格填充/排空逻辑
核心规则:
1. 下沿到当前价格 → 所有网格必须被 FILLED已买入
如果有 EMPTY 的网格 → 加仓填满它
2. 当前价格到上沿 → 所有网格必须被 EMPTY不持有
如果有 FILLED 的网格 → 平仓排空它
Returns:
PositionRecord 如果需要操作,否则 None
"""
if self.current_price is None:
return None
# 找到当前价格所在的网格点
current_grid_price = self._round_to_grid(self.current_price)
# 规则 2: 检查上方是否有需要平仓的网格
# 当前价格到上沿之间的网格都应该是 EMPTY
for grid_price in self.grid_prices:
if grid_price > current_grid_price:
grid_state = self.grid_states.get(grid_price)
if grid_state and grid_state.status == "filled" and grid_state.quantity > 0:
# 上方有持仓,需要平仓
print(f"[GridManager] {self.pair} 平仓信号 - 价格已涨到 {self.current_price:.2f}, "
f"上方网格 {grid_price:.2f} 有持仓需排空",
file=sys.stderr, flush=True)
# 平仓该网格
return PositionRecord(
level_index=self._price_to_level_index(grid_price),
price=self.current_price,
quantity=grid_state.quantity,
type=AdjustmentType.EXIT,
timestamp=self.candle_index
)
# 规则 1: 检查下方是否有需要加仓的网格
# 下沿到当前价格之间的网格都应该是 FILLED
for grid_price in self.grid_prices:
if grid_price <= current_grid_price:
grid_state = self.grid_states.get(grid_price)
if grid_state and grid_state.status == "empty" and grid_state.quantity == 0:
# 下方有空白网格,需要加仓
print(f"[GridManager] {self.pair} 加仓信号 - 价格已跌到 {self.current_price:.2f}, "
f"下方网格 {grid_price:.2f} 为空需填满",
file=sys.stderr, flush=True)
# ✅ 加仓该网格 - 关键修复price 应该是网格价格,而不是当前价格
return PositionRecord(
level_index=self._price_to_level_index(grid_price),
price=grid_price, # ✅ 修复:使用网格价格而不是当前价格
quantity=1.0,
type=AdjustmentType.ADD if self.total_quantity > 0 else AdjustmentType.ENTRY,
timestamp=self.candle_index
)
# 没有操作建议(所有网格状态都符合规则)
return None
def _price_to_level_index(self, price: float) -> int:
"""将价格转换为网格级别序号"""
index = int((price - self.lower_price) / self.step)
return max(0, min(index, self.total_grid_levels - 1))
def _round_to_grid(self, price: float) -> float:
"""
将价格舍入到最接近的网格点(向下舍入)
基于 lower_price 的偏移量进行计算
"""
# 计算相对于下沿的偏移量
offset = price - self.lower_price
# 舍入到最接近的网格间距
grid_count = int(offset / self.step)
# 返回对应的网格价格
return self.lower_price + grid_count * self.step
def get_summary(self) -> Dict[str, Any]:
"""获取当前持仓的完整摘要"""
filled_grids = sum(1 for gs in self.grid_states.values() if gs.status == "filled")
return {
"pair": self.pair,
"current_price": self.current_price,
"total_quantity": self.total_quantity,
"total_invested": self.total_invested,
"avg_entry_price": self.avg_entry_price,
"unrealized_profit": (self.current_price - self.avg_entry_price) * self.total_quantity if self.total_quantity > 0 else 0,
"filled_grids": filled_grids, # FILLED 的网格数
"empty_grids": self.total_grid_levels - filled_grids, # EMPTY 的网格数
"total_orders": len(self.order_fills),
"highest_price": self.highest_price if self.total_quantity > 0 else None,
"lowest_price": self.lowest_price if self.total_quantity > 0 else None,
}
def record_order_fill(self, order_fill: OrderFill) -> None:
"""
记录一个订单成交事件
这个方法应该在订单成交时立即调用,将真实的成交数据反映到网格对象
Args:
order_fill: OrderFill 对象,包含成交的所有信息
"""
self.order_fills.append(order_fill)
if order_fill.side == "buy":
# 买入订单成交
actual_cost = order_fill.filled_amount * order_fill.filled_price + order_fill.fee
self.total_invested += actual_cost
self.total_quantity += order_fill.filled_amount
# 更新平均价(考虑手续费)
if self.total_quantity > 0:
self.avg_entry_price = self.total_invested / self.total_quantity
print(f"[GridManager] {self.pair} 买入订单成交 - "
f"价格: {order_fill.filled_price:.2f}, "
f"数量: {order_fill.filled_amount:.6f}, "
f"手续费: {order_fill.fee:.4f} USDT, "
f"总持仓: {self.total_quantity:.6f}",
file=sys.stderr, flush=True)
# 记录到 grid_levels
if order_fill.filled_price not in self.grid_levels:
self.grid_levels[order_fill.filled_price] = GridLevel(
price=order_fill.filled_price,
quantity=order_fill.filled_amount,
entry_time=order_fill.timestamp,
status="open"
)
else:
self.grid_levels[order_fill.filled_price].quantity += order_fill.filled_amount
elif order_fill.side == "sell":
# 卖出订单成交
sell_proceeds = order_fill.filled_amount * order_fill.filled_price - order_fill.fee
self.total_invested -= order_fill.filled_amount * self.avg_entry_price
self.total_quantity -= order_fill.filled_amount
# 更新平均价
if self.total_quantity > 0:
self.avg_entry_price = self.total_invested / self.total_quantity
else:
self.avg_entry_price = 0.0
# 计算本次卖出的利润
sell_profit = sell_proceeds - (order_fill.filled_amount * self.avg_entry_price if self.avg_entry_price > 0 else 0)
profit_pct = (order_fill.filled_price - self.avg_entry_price) / self.avg_entry_price * 100 if self.avg_entry_price > 0 else 0
print(f"[GridManager] {self.pair} 卖出订单成交 - "
f"价格: {order_fill.filled_price:.2f}, "
f"数量: {order_fill.filled_amount:.6f}, "
f"手续费: {order_fill.fee:.4f} USDT, "
f"利润: {sell_profit:.2f} USDT ({profit_pct:.2f}%), "
f"剩余持仓: {self.total_quantity:.6f}",
file=sys.stderr, flush=True)
def sync_from_trade_object(self, trade: Any, candle_index: int) -> None:
"""
从 Freqtrade 的 Trade 对象同步状态
仅在第一次同步时更新网格状态。其他时候仅更新持仓股数,
不修改 FILLED/EMPTY 网格状态(由 apply_adjustment 专管管理)。
Args:
trade: Freqtrade Trade 对象
candle_index: 当前蜡烛线索引
"""
# 从 trade 对象提取关键信息
self.total_quantity = trade.amount
self.total_invested = trade.stake_amount
self.avg_entry_price = trade.open_rate
# 仅在第一次同步且没有任何 FILLED 网格时,根据均价更新所有网格的状态
# 注意:使用 _grid_initialized_from_trade 而不是 _synced_from_trade_once
# 因为后者可能被重置但我们不希望此时重新初始化网格
filled_count = sum(1 for gs in self.grid_states.values() if gs.status == "filled")
# ✅ 只有在没有任何 FILLED 网格时才从 Trade 初始化
if not self._grid_initialized_from_trade and self.total_quantity > 0 and filled_count == 0:
self._grid_initialized_from_trade = True
# 根据 Trade 对象的平均价,更新所有网格的状态
# 下沿 → 平均价:全部 FILLED
# 平均价 → 上沿:全部 EMPTY
avg_grid_price = self._round_to_grid(self.avg_entry_price)
print(f"[GridManager] {self.pair} 从 Trade 初始化 - 平均价: {self.avg_entry_price:.2f}, "
f"舍入后网格价: {avg_grid_price:.2f}",
file=sys.stderr, flush=True)
for price, grid_state in self.grid_states.items():
if price < avg_grid_price:
# 下沿:应该是 FILLED
if self.total_quantity > 0:
grid_state.status = "filled"
grid_state.entry_price = self.avg_entry_price
grid_state.entry_time = candle_index
elif price > avg_grid_price:
# 上沿:应该是 EMPTY
grid_state.status = "empty"
grid_state.quantity = 0
else:
# 平均价所在的网格
if self.total_quantity > 0:
grid_state.status = "filled"
grid_state.entry_price = self.avg_entry_price
grid_state.entry_time = candle_index
filled_count = sum(1 for gs in self.grid_states.values() if gs.status == "filled")
print(f"[GridManager] {self.pair} 从 Trade 对象初始化 - "
f"持仓: {self.total_quantity:.6f}, "
f"平均价: {self.avg_entry_price:.2f}, "
f"下沿到平均价的网格标记为 FILLED (共 {filled_count} 个)",
file=sys.stderr, flush=True)
else:
# 之后的同步:仅更新持仓信息,不修改网格状态
if not self._synced_from_trade_once and self.total_quantity == 0:
print(f"[GridManager] {self.pair} Trade 对象尚无持仓,跳过初始化",
file=sys.stderr, flush=True)
def record_pending_order(self, order_id: str, adjustment: PositionRecord) -> None:
"""
记录一个待成交的订单
当策略决定建仓/加仓后,订单被提交给 Freqtrade
这个方法记录订单 ID 与策略决策的映射关系
Args:
order_id: Freqtrade 返回的订单 ID
adjustment: 对应的 PositionRecord 对象
"""
self.pending_orders[order_id] = adjustment
print(f"[GridManager] {self.pair} 记录待成交订单 - "
f"订单ID: {order_id}, "
f"类型: {adjustment.type.value}, "
f"价格: {adjustment.price:.2f}",
file=sys.stderr, flush=True)
def remove_pending_order(self, order_id: str) -> Optional[PositionRecord]:
"""
移除待成交订单(当订单成交或取消时调用)
Args:
order_id: 订单 ID
Returns:
对应的 PositionRecord如果存在
"""
return self.pending_orders.pop(order_id, None)
def init_redis(self, redis_url: str) -> bool:
"""
初始化 Redis 连接
Args:
redis_url: Redis URL"redis://192.168.1.215:6379/0"
Returns:
是否连接成功
Raises:
ConnectionError: 如果 Redis 连接失败
"""
if not REDIS_AVAILABLE:
raise RuntimeError(f"[GridManager] {self.pair} Redis 包未安装,无法继续")
try:
# 解析 Redis URL
parsed = urlparse(redis_url)
self.redis_client = redis.Redis(
host=parsed.hostname or 'localhost',
port=parsed.port or 6379,
db=int(parsed.path.strip('/') or 0),
decode_responses=True,
socket_connect_timeout=5,
socket_keepalive=True
)
# 测试连接
self.redis_client.ping()
print(f"[GridManager] {self.pair} Redis 连接成功 - Key: {self.redis_key}",
file=sys.stderr, flush=True)
return True
except Exception as e:
error_msg = f"[GridManager] {self.pair} Redis 连接失败: {str(e)}"
print(error_msg, file=sys.stderr, flush=True)
self.redis_client = None
raise ConnectionError(error_msg)
def sync_grid_state_to_redis(self) -> None:
"""
将当前的网格状态同步到 Redis
包我每个网格的 FILLED/EMPTY 状态
"""
if not self.redis_client:
return
try:
# 构建网格状态字典
grid_states_data = {}
filled_count = 0
for price, grid_level in self.grid_states.items():
# 使用未旤的串串化,避免浮点数精度问题
price_key = f"{price:.8f}".rstrip('0').rstrip('.')
grid_states_data[price_key] = {
"price": price,
"status": grid_level.status,
"quantity": grid_level.quantity,
"entry_price": grid_level.entry_price,
"entry_time": grid_level.entry_time
}
if grid_level.status == "filled":
filled_count += 1
# 存存到 Redis
redis_key = f"grid_state:{self.pair}"
self.redis_client.set(
redis_key,
json.dumps(grid_states_data),
ex=86400 # 24 小时过期
)
print(f"[GridManager] {self.pair} 网格状态同步到 Redis - "
f"{filled_count} FILLED, {len(grid_states_data) - filled_count} EMPTY",
file=sys.stderr, flush=True)
except Exception as e:
print(f"[GridManager] {self.pair} 网格状态同步失败: {str(e)}",
file=sys.stderr, flush=True)
def recover_grid_state_from_redis(self) -> Optional[Dict[str, Any]]:
"""
从 Redis 恢复网格状态
"""
if not self.redis_client:
return None
try:
redis_key = f"grid_state:{self.pair}"
data = self.redis_client.get(redis_key)
if not data:
return None
return json.loads(data)
except Exception as e:
print(f"[GridManager] {self.pair} 从 Redis 恢复网格状态失败: {str(e)}",
file=sys.stderr, flush=True)
return None
def restore_grid_state(self, grid_states_data: Dict[str, Any]) -> None:
"""
恢复网格状态
"""
try:
restored_count = 0
for price_key, state_data in grid_states_data.items():
# 能儫不同格式: 情况 1新格式同 state_data 中有 price 字段)
if "price" in state_data:
price = state_data["price"]
else:
# 情况 2旧格式直接从键名转换
price = float(price_key)
# 找到最接近的存在网格点(处理浮点数精度问题)
closest_price = None
min_diff = float('inf')
for grid_price in self.grid_states.keys():
diff = abs(grid_price - price)
if diff < min_diff:
min_diff = diff
closest_price = grid_price
if closest_price is not None and min_diff < 1e-6: # 中仂茇平
grid_level = self.grid_states[closest_price]
grid_level.status = state_data.get("status", "empty")
grid_level.quantity = state_data.get("quantity", 0.0)
grid_level.entry_price = state_data.get("entry_price", 0.0)
grid_level.entry_time = state_data.get("entry_time", 0)
restored_count += 1
print(f"[GridManager] {self.pair} 网格状态恢复完成 - 恢复 {restored_count}/{len(grid_states_data)} 个网格",
file=sys.stderr, flush=True)
except Exception as e:
print(f"[GridManager] {self.pair} 恢复网格状态失败: {str(e)}",
file=sys.stderr, flush=True)
def report_to_redis(self) -> None:
"""
将当前状态同步到 Redis
"""
if not self.redis_client:
return
try:
# 构建状态报告
report = {
"timestamp": datetime.now().isoformat(),
"candle_index": self.candle_index,
"current_price": self.current_price,
"total_quantity": self.total_quantity,
"total_invested": self.total_invested,
"avg_entry_price": self.avg_entry_price,
"unrealized_profit": (self.current_price - self.avg_entry_price) * self.total_quantity if self.total_quantity > 0 else 0,
"active_grid_levels": len(self.grid_levels),
"max_positions_ever": self.max_positions,
"total_orders": len(self.order_fills),
"highest_price": self.highest_price if self.total_quantity > 0 else None,
"lowest_price": self.lowest_price if self.total_quantity > 0 else None,
}
# 存储到 Redis
self.redis_client.set(
self.redis_key,
json.dumps(report),
ex=3600 # 1 小时过期
)
except Exception as e:
print(f"[GridManager] {self.pair} Redis 写入失败: {str(e)}",
file=sys.stderr, flush=True)
def generate_report(self) -> str:
"""
生成人类可读的状态报告
Returns:
格式化的状态报告字符串
"""
unrealized = (self.current_price - self.avg_entry_price) * self.total_quantity if self.total_quantity > 0 else 0
unrealized_pct = (self.current_price - self.avg_entry_price) / self.avg_entry_price * 100 if self.avg_entry_price > 0 else 0
report = f"""
{'='*80}
[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 网格交易报告 - {self.pair}
{'='*80}
【持仓状态】
当前价格: {self.current_price:.4f} USDT
总持仓数量: {self.total_quantity:.8f}
平均入场价: {self.avg_entry_price:.4f} USDT
总成本投入: {self.total_invested:.2f} USDT
【盈亏状态】
未实现盈亏: {unrealized:.2f} USDT
未实现盈亏率: {unrealized_pct:.2f}%
日期收益状态: {'亏损' if unrealized < 0 else '盈利'}
【网格样态】
活跃网格点数: {len(self.grid_levels)}
历史最大同时持仓: {self.max_positions}
持仓期最高价: {self.highest_price:.4f if self.highest_price > 0 else 'N/A'} USDT
持仓期最低价: {self.lowest_price:.4f if self.lowest_price != float('inf') else 'N/A'} USDT
最低点亏损率: {(self.lowest_price - self.avg_entry_price) / self.avg_entry_price * 100 if self.lowest_price != float('inf') and self.avg_entry_price > 0 else 0:.2f}%
【交易统计】
已成交订单数: {len(self.order_fills)}
待成交订单数: {len(self.pending_orders)}
历史加减仓次数: {len(self.position_history)}
当前蜡烛线索引: {self.candle_index}
{'='*80}
"""
return report
def check_and_report(self) -> None:
"""
每隔指定间隔执行一次报告,并同步到 Redis
应该在 populate_indicators() 中调用
"""
if self.candle_index - self.last_report_candle < self.report_interval_candles:
return
# 打印报告
report_text = self.generate_report()
print(report_text, file=sys.stderr, flush=True)
# 同步到 Redis
self.report_to_redis()
# 更新本次报告时间
self.last_report_candle = self.candle_index
print(f"[GridManager] {self.pair} 报告已同步到 Redis",
file=sys.stderr, flush=True)
@classmethod
def recover_from_redis(cls, pair: str, redis_url: str) -> Optional[Dict[str, Any]]:
"""
从 Redis 恢复一个 GridManager 对象
当 Docker 容器重启后,通过 Redis 中的持仓状态恢复 GridManager
Args:
pair: 币对名称,如 "BTC/USDT"
redis_url: Redis URL
Returns:
恢复后的 GridManager 对象,如果 Redis 中无数据返回 None
"""
if not REDIS_AVAILABLE:
print(f"[GridManager] {pair} Redis 未安装,无法恢复",
file=sys.stderr, flush=True)
return None
try:
# 连接 Redis
parsed = urlparse(redis_url)
redis_client = redis.Redis(
host=parsed.hostname or 'localhost',
port=parsed.port or 6379,
db=int(parsed.path.strip('/') or 0),
decode_responses=True,
socket_connect_timeout=5
)
# 测试连接
redis_client.ping()
# 尝试获取持仓数据
redis_key = f"grid:{pair}"
data = redis_client.get(redis_key)
if not data:
print(f"[GridManager] {pair} Redis 中无持仓数据,无法恢复",
file=sys.stderr, flush=True)
return None
# 解析 JSON
state = json.loads(data)
# 从 Redis 数据反向构建 GridManager
# 由于 Redis 中只有最新的快照,我们无法完全恢复所有历史
# 但可以恢复当前的持仓状态
print(f"[GridManager] {pair} 从 Redis 恢复持仓数据:",
file=sys.stderr, flush=True)
print(f" 持仓数量: {state.get('total_quantity')}",
file=sys.stderr, flush=True)
print(f" 平均价: {state.get('avg_entry_price')}",
file=sys.stderr, flush=True)
print(f" 总投入: {state.get('total_invested')}",
file=sys.stderr, flush=True)
return state # 返回原始数据,让调用者处理恢复
except Exception as e:
print(f"[GridManager] {pair} 从 Redis 恢复失败: {str(e)}",
file=sys.stderr, flush=True)
return None
def restore_from_redis_state(self, redis_state: Dict[str, Any]) -> None:
"""
从 Redis 恢复的状态数据中还原对象内部状态
Args:
redis_state: 从 recover_from_redis() 获取的状态字典
"""
try:
self.current_price = redis_state.get('current_price', 0)
self.total_quantity = redis_state.get('total_quantity', 0)
self.total_invested = redis_state.get('total_invested', 0)
self.avg_entry_price = redis_state.get('avg_entry_price', 0)
self.highest_price = redis_state.get('highest_price', 0) or 0
self.lowest_price = redis_state.get('lowest_price', float('inf')) or float('inf')
self.max_positions = redis_state.get('max_positions_ever', 0)
self.candle_index = redis_state.get('candle_index', 0)
# 重新初始化 grid_levels从 avg_entry_price 推断)
if self.total_quantity > 0 and self.avg_entry_price > 0:
# 创建一个占位符网格点,表示当前持仓
grid_price = round(self.avg_entry_price / self.step) * self.step
self.grid_levels[grid_price] = GridLevel(
price=grid_price,
quantity=self.total_quantity,
entry_time=self.candle_index,
status="open"
)
print(f"[GridManager] {self.pair} 状态已从 Redis 恢复",
file=sys.stderr, flush=True)
except Exception as e:
print(f"[GridManager] {self.pair} 状态恢复失败: {str(e)}",
file=sys.stderr, flush=True)
def destroy(self) -> None:
"""
销毁网格管理器,清理 Redis 中存储的网格状态
通常在平仓后调用,表示该网格管理器不再有价值
"""
try:
if self.redis_client:
# 删除 Redis 中该币对的网格状态
self.redis_client.delete(self.redis_key)
print(f"[GridManager] {self.pair} 已从 Redis 删除网格状态",
file=sys.stderr, flush=True)
except Exception as e:
print(f"[GridManager] {self.pair} 删除 Redis 状态失败: {str(e)}",
file=sys.stderr, flush=True)
# 清空内部状态
self.grid_states.clear()
self.position_history.clear()
self.order_fills.clear()
self.total_quantity = 0.0
self.total_invested = 0.0
self.avg_entry_price = 0.0
print(f"[GridManager] {self.pair} 网格管理器已销毁,可以释放该对象",
file=sys.stderr, flush=True)