zhangkun9038@dingtalk.com 9aa25a1582 fix
2025-11-27 15:53:51 +08:00

689 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
网格交易管理器 - 维护单个币对的完整网格持仓生命周期
"""
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
from enum import Enum
import sys
import json
from datetime import datetime
from urllib.parse import urlparse
try:
import redis
REDIS_AVAILABLE = True
except ImportError:
REDIS_AVAILABLE = False
class AdjustmentType(Enum):
"""加减仓类型"""
ENTRY = "entry" # 初始建仓
ADD = "add_position" # 加仓
REDUCE = "reduce_position" # 减仓
EXIT = "exit" # 全部平仓
@dataclass
class GridLevel:
"""单个网格点记录"""
price: float # 网格价格
quantity: float # 该价格的持仓数量
entry_time: int # 建仓蜡烛线索引
status: str # "open" 或 "closed"
@dataclass
class PositionRecord:
"""单次加减仓记录"""
level_index: int # 网格级别序号
price: float # 操作价格
quantity: float # 操作份数
type: AdjustmentType # 操作类型
timestamp: int # 操作蜡烛线索引
@dataclass
class OrderFill:
"""订单成交信息"""
order_id: str # 订单 ID
pair: str # 币对
side: str # "buy" 或 "sell"
filled_price: float # 成交价格
filled_amount: float # 成交数量
fee: float # 手续费USDT
timestamp: int # 成交蜡烛线索引
reason: str = "" # 成交原因entry/add/exit 等)
class GridManager:
"""
网格交易管理器
维护单个币对(如 ETH/USDT的完整网格持仓生命周期
包含:
- 网格参数管理
- 持仓状态跟踪
- 加减仓决策
"""
def __init__(self,
pair: str,
lower_price: float,
upper_price: float,
step: float,
stake_per_grid: float):
"""
初始化网格管理器
Args:
pair: 币对名称,如 "ETH/USDT"
lower_price: 网格下限价格,如 1500
upper_price: 网格上限价格,如 4500
step: 网格间距,如 50
stake_per_grid: 每个网格的投资额,如 40 USDT
"""
self.pair = pair
self.lower_price = lower_price
self.upper_price = upper_price
self.step = step
self.stake_per_grid = stake_per_grid
# 计算总网格数
self.total_grid_levels = int((upper_price - lower_price) / step) + 1
# 生成所有网格点价格
self.grid_prices = [lower_price + i * step for i in range(self.total_grid_levels)]
# 持仓状态
self.grid_levels: Dict[float, GridLevel] = {} # 价格 -> GridLevel
self.position_history: List[PositionRecord] = [] # 历史加减仓记录
# 订单执行历史
self.order_fills: List[OrderFill] = [] # 所有订单成交记录
self.pending_orders: Dict[str, PositionRecord] = {} # 待成交的订单映射
# 当前状态
self.current_price: Optional[float] = None
self.total_quantity: float = 0.0 # 总持仓数量
self.total_invested: float = 0.0 # 总投入金额
self.avg_entry_price: float = 0.0 # 平均建仓价
self.highest_price: float = 0.0 # 持仓期间最高价
self.lowest_price: float = float('inf') # 持仓期间最低价
self.max_positions: int = 0 # 历史最大持仓数
# 调试
self.candle_index = 0 # 当前蜡烛线索引
# Redis 连接
self.redis_client: Optional[redis.Redis] = None
self.redis_key = f"grid:{pair}" # Redis 存储键
# 报告
self.last_report_candle = 0 # 上次报告的蜡烛线
self.report_interval_candles = 600 # 每 600 个蜡烛线报告一次(约 10h1h K线
def update_state(self, current_price: float, candle_index: int) -> None:
"""
更新网格对象的当前状态
这个方法应该在每个蜡烛线调用一次,用最新的市场价格更新网格对象
Args:
current_price: 当前市场价格
candle_index: 当前蜡烛线索引(用于时间戳)
"""
self.current_price = current_price
self.candle_index = candle_index
# 更新最高/最低价
if self.total_quantity > 0:
if current_price > self.highest_price:
self.highest_price = current_price
if current_price < self.lowest_price:
self.lowest_price = current_price
print(f"[GridManager] {self.pair} 状态更新 - 价格: {current_price:.2f}, "
f"持仓: {self.total_quantity:.6f}, 平均价: {self.avg_entry_price:.2f}",
file=sys.stderr, flush=True)
def apply_adjustment(self, adjustment: PositionRecord) -> None:
"""
应用一次加减仓操作(来自策略的决策)
Args:
adjustment: PositionRecord 对象,包含加减仓的所有信息
"""
price = adjustment.price
quantity = adjustment.quantity
adj_type = adjustment.type
if adj_type == AdjustmentType.ENTRY or adj_type == AdjustmentType.ADD:
# 建仓或加仓
if price not in self.grid_levels:
self.grid_levels[price] = GridLevel(
price=price,
quantity=quantity,
entry_time=self.candle_index,
status="open"
)
else:
self.grid_levels[price].quantity += quantity
# 更新总持仓
old_total = self.total_quantity
self.total_invested += quantity * price
self.total_quantity += quantity
# 更新平均价
if self.total_quantity > 0:
self.avg_entry_price = self.total_invested / self.total_quantity
# 更新最大持仓数
if len(self.grid_levels) > self.max_positions:
self.max_positions = len(self.grid_levels)
print(f"[GridManager] {self.pair} 加仓 - 价格: {price:.2f}, "
f"数量: {quantity:.6f}, 总持仓: {self.total_quantity:.6f}",
file=sys.stderr, flush=True)
elif adj_type == AdjustmentType.REDUCE:
# 减仓
if price in self.grid_levels:
self.grid_levels[price].quantity -= quantity
if self.grid_levels[price].quantity <= 0:
del self.grid_levels[price]
self.total_quantity -= quantity
self.total_invested -= quantity * price
if self.total_quantity > 0:
self.avg_entry_price = self.total_invested / self.total_quantity
print(f"[GridManager] {self.pair} 减仓 - 价格: {price:.2f}, "
f"数量: {quantity:.6f}, 剩余持仓: {self.total_quantity:.6f}",
file=sys.stderr, flush=True)
elif adj_type == AdjustmentType.EXIT:
# 全部平仓
print(f"[GridManager] {self.pair} 全部平仓 - 持仓: {self.total_quantity:.6f}, "
f"平均价: {self.avg_entry_price:.2f}, 当前价: {price:.2f}",
file=sys.stderr, flush=True)
self.grid_levels.clear()
self.total_quantity = 0.0
self.total_invested = 0.0
self.avg_entry_price = 0.0
self.highest_price = 0.0
self.lowest_price = float('inf')
# 记录到历史
self.position_history.append(adjustment)
def decide_adjustment(self) -> Optional[PositionRecord]:
"""
判定是否需要加减仓,并返回建议
核心逻辑:
1. 如果还没有建仓过,且价格在网格范围内 → 初始建仓
2. 如果已有头寸,价格跌入新的更低网格点 → 加仓
3. 如果已有头寸,价格涨超过平均价 → 全部平仓
4. 如果已有多个头寸,价格涨到最高点 → 可选:部分减仓
Returns:
PositionRecord 如果需要操作,否则 None
"""
if self.current_price is None:
return None
# 情况 1: 还没有持仓,且价格在范围内 → 初始建仓
if self.total_quantity == 0:
if self.lower_price <= self.current_price <= self.upper_price:
# 找到当前价格对应的网格点(向上舍入)
grid_price = (int(self.current_price / self.step) + 1) * self.step
if grid_price > self.upper_price:
grid_price = self.upper_price
print(f"[GridManager] {self.pair} 初始建仓建议 - 价格: {grid_price:.2f}",
file=sys.stderr, flush=True)
return PositionRecord(
level_index=self._price_to_level_index(grid_price),
price=self.current_price,
quantity=1.0, # 1个单位实际金额由策略乘以 stake_per_grid
type=AdjustmentType.ENTRY,
timestamp=self.candle_index
)
# 情况 2: 已有持仓,价格涨超过平均价 → 全部平仓
if self.total_quantity > 0 and self.current_price > self.avg_entry_price:
profit_pct = (self.current_price - self.avg_entry_price) / self.avg_entry_price * 100
print(f"[GridManager] {self.pair} 平仓建议 - 利润: {profit_pct:.2f}%",
file=sys.stderr, flush=True)
return PositionRecord(
level_index=0,
price=self.current_price,
quantity=self.total_quantity,
type=AdjustmentType.EXIT,
timestamp=self.candle_index
)
# 情况 3: 已有持仓,价格跌入新的更低网格点 → 加仓
if self.total_quantity > 0:
# 找到当前价格最接近的网格点(向下舍入)
current_grid_level = int(self.current_price / self.step) * self.step
# 检查这个价格是否已经加仓过
if current_grid_level not in self.grid_levels and current_grid_level >= self.lower_price:
# 检查是否还有加仓空间
if len(self.grid_levels) < self.total_grid_levels:
print(f"[GridManager] {self.pair} 加仓建议 - 价格: {current_grid_level:.2f}, "
f"已有网格数: {len(self.grid_levels)}",
file=sys.stderr, flush=True)
return PositionRecord(
level_index=self._price_to_level_index(current_grid_level),
price=self.current_price,
quantity=1.0,
type=AdjustmentType.ADD,
timestamp=self.candle_index
)
# 没有操作建议
return None
def _price_to_level_index(self, price: float) -> int:
"""将价格转换为网格级别序号"""
index = int((price - self.lower_price) / self.step)
return max(0, min(index, self.total_grid_levels - 1))
def get_summary(self) -> Dict[str, Any]:
"""获取当前持仓的完整摘要"""
return {
"pair": self.pair,
"current_price": self.current_price,
"total_quantity": self.total_quantity,
"total_invested": self.total_invested,
"avg_entry_price": self.avg_entry_price,
"unrealized_profit": (self.current_price - self.avg_entry_price) * self.total_quantity if self.total_quantity > 0 else 0,
"active_grid_levels": len(self.grid_levels),
"max_positions_ever": self.max_positions,
"total_adjustments": len(self.position_history),
"highest_price": self.highest_price if self.total_quantity > 0 else None,
"lowest_price": self.lowest_price if self.total_quantity > 0 else None,
}
def record_order_fill(self, order_fill: OrderFill) -> None:
"""
记录一个订单成交事件
这个方法应该在订单成交时立即调用,将真实的成交数据反映到网格对象
Args:
order_fill: OrderFill 对象,包含成交的所有信息
"""
self.order_fills.append(order_fill)
if order_fill.side == "buy":
# 买入订单成交
actual_cost = order_fill.filled_amount * order_fill.filled_price + order_fill.fee
self.total_invested += actual_cost
self.total_quantity += order_fill.filled_amount
# 更新平均价(考虑手续费)
if self.total_quantity > 0:
self.avg_entry_price = self.total_invested / self.total_quantity
print(f"[GridManager] {self.pair} 买入订单成交 - "
f"价格: {order_fill.filled_price:.2f}, "
f"数量: {order_fill.filled_amount:.6f}, "
f"手续费: {order_fill.fee:.4f} USDT, "
f"总持仓: {self.total_quantity:.6f}",
file=sys.stderr, flush=True)
# 记录到 grid_levels
if order_fill.filled_price not in self.grid_levels:
self.grid_levels[order_fill.filled_price] = GridLevel(
price=order_fill.filled_price,
quantity=order_fill.filled_amount,
entry_time=order_fill.timestamp,
status="open"
)
else:
self.grid_levels[order_fill.filled_price].quantity += order_fill.filled_amount
elif order_fill.side == "sell":
# 卖出订单成交
sell_proceeds = order_fill.filled_amount * order_fill.filled_price - order_fill.fee
self.total_invested -= order_fill.filled_amount * self.avg_entry_price
self.total_quantity -= order_fill.filled_amount
# 更新平均价
if self.total_quantity > 0:
self.avg_entry_price = self.total_invested / self.total_quantity
else:
self.avg_entry_price = 0.0
# 计算本次卖出的利润
sell_profit = sell_proceeds - (order_fill.filled_amount * self.avg_entry_price if self.avg_entry_price > 0 else 0)
profit_pct = (order_fill.filled_price - self.avg_entry_price) / self.avg_entry_price * 100 if self.avg_entry_price > 0 else 0
print(f"[GridManager] {self.pair} 卖出订单成交 - "
f"价格: {order_fill.filled_price:.2f}, "
f"数量: {order_fill.filled_amount:.6f}, "
f"手续费: {order_fill.fee:.4f} USDT, "
f"利润: {sell_profit:.2f} USDT ({profit_pct:.2f}%), "
f"剩余持仓: {self.total_quantity:.6f}",
file=sys.stderr, flush=True)
def sync_from_trade_object(self, trade: Any, candle_index: int) -> None:
"""
从 Freqtrade 的 Trade 对象同步状态
当无法直接捕获订单成交时,可以通过 Trade 对象反向同步状态
(备选方案,不如直接记录 OrderFill 准确)
Args:
trade: Freqtrade Trade 对象
candle_index: 当前蜡烛线索引
"""
# 从 trade 对象提取关键信息
self.total_quantity = trade.amount
self.total_invested = trade.stake_amount
self.avg_entry_price = trade.open_rate
# 重新初始化 grid_levels仅保留当前持仓
self.grid_levels.clear()
if self.total_quantity > 0 and self.avg_entry_price > 0:
# 创建一个体现当前持仓的网格点
# 依据平均价格找最接近的网格位置
grid_price = round(self.avg_entry_price / self.step) * self.step
self.grid_levels[grid_price] = GridLevel(
price=grid_price,
quantity=self.total_quantity,
entry_time=candle_index,
status="open"
)
# 如果 trade 有多个 entry加仓的情况
if hasattr(trade, 'trades') and trade.trades:
entries = [t for t in trade.trades if t.entry_side == 'entry']
self.max_positions = len(entries)
print(f"[GridManager] {self.pair} 从 Trade 对象同步状态 - "
f"持仓: {self.total_quantity:.6f}, "
f"投入: {self.total_invested:.2f} USDT, "
f"平均价: {self.avg_entry_price:.2f}, "
f"网格点数: {len(self.grid_levels)}",
file=sys.stderr, flush=True)
def record_pending_order(self, order_id: str, adjustment: PositionRecord) -> None:
"""
记录一个待成交的订单
当策略决定建仓/加仓后,订单被提交给 Freqtrade
这个方法记录订单 ID 与策略决策的映射关系
Args:
order_id: Freqtrade 返回的订单 ID
adjustment: 对应的 PositionRecord 对象
"""
self.pending_orders[order_id] = adjustment
print(f"[GridManager] {self.pair} 记录待成交订单 - "
f"订单ID: {order_id}, "
f"类型: {adjustment.type.value}, "
f"价格: {adjustment.price:.2f}",
file=sys.stderr, flush=True)
def remove_pending_order(self, order_id: str) -> Optional[PositionRecord]:
"""
移除待成交订单(当订单成交或取消时调用)
Args:
order_id: 订单 ID
Returns:
对应的 PositionRecord如果存在
"""
return self.pending_orders.pop(order_id, None)
def init_redis(self, redis_url: str) -> bool:
"""
初始化 Redis 连接
Args:
redis_url: Redis URL"redis://192.168.1.215:6379/0"
Returns:
是否连接成功
"""
if not REDIS_AVAILABLE:
print(f"[GridManager] {self.pair} Redis 未安装,跳过 Redis 连接",
file=sys.stderr, flush=True)
return False
try:
# 解析 Redis URL
parsed = urlparse(redis_url)
self.redis_client = redis.Redis(
host=parsed.hostname or 'localhost',
port=parsed.port or 6379,
db=int(parsed.path.strip('/') or 0),
decode_responses=True,
socket_connect_timeout=5,
socket_keepalive=True
)
# 测试连接
self.redis_client.ping()
print(f"[GridManager] {self.pair} Redis 连接成功 - Key: {self.redis_key}",
file=sys.stderr, flush=True)
return True
except Exception as e:
print(f"[GridManager] {self.pair} Redis 连接失败: {str(e)}",
file=sys.stderr, flush=True)
self.redis_client = None
return False
def report_to_redis(self) -> None:
"""
将当前状态同步到 Redis
"""
if not self.redis_client:
return
try:
# 构成状态报告
report = {
"timestamp": datetime.now().isoformat(),
"candle_index": self.candle_index,
"current_price": self.current_price,
"total_quantity": self.total_quantity,
"total_invested": self.total_invested,
"avg_entry_price": self.avg_entry_price,
"unrealized_profit": (self.current_price - self.avg_entry_price) * self.total_quantity if self.total_quantity > 0 else 0,
"active_grid_levels": len(self.grid_levels),
"max_positions_ever": self.max_positions,
"total_orders": len(self.order_fills),
"highest_price": self.highest_price if self.total_quantity > 0 else None,
"lowest_price": self.lowest_price if self.total_quantity > 0 else None,
}
# 存储到 Redis
self.redis_client.set(
self.redis_key,
json.dumps(report),
ex=3600 # 1 小时过期
)
except Exception as e:
print(f"[GridManager] {self.pair} Redis 写入失败: {str(e)}",
file=sys.stderr, flush=True)
def generate_report(self) -> str:
"""
生成人类可读的状态报告
Returns:
格式化的状态报告字符串
"""
unrealized = (self.current_price - self.avg_entry_price) * self.total_quantity if self.total_quantity > 0 else 0
unrealized_pct = (self.current_price - self.avg_entry_price) / self.avg_entry_price * 100 if self.avg_entry_price > 0 else 0
report = f"""
{'='*80}
[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 网格交易报告 - {self.pair}
{'='*80}
【持仓状态】
当前价格: {self.current_price:.4f} USDT
总持仓数量: {self.total_quantity:.8f}
平均入场价: {self.avg_entry_price:.4f} USDT
总成本投入: {self.total_invested:.2f} USDT
【盈亏状态】
未实现盈亏: {unrealized:.2f} USDT
未实现盈亏率: {unrealized_pct:.2f}%
日期收益状态: {'亏损' if unrealized < 0 else '盈利'}
【网格样态】
活跃网格点数: {len(self.grid_levels)}
历史最大同时持仓: {self.max_positions}
持仓期最高价: {self.highest_price:.4f if self.highest_price > 0 else 'N/A'} USDT
持仓期最低价: {self.lowest_price:.4f if self.lowest_price != float('inf') else 'N/A'} USDT
最低点亏损率: {(self.lowest_price - self.avg_entry_price) / self.avg_entry_price * 100 if self.lowest_price != float('inf') and self.avg_entry_price > 0 else 0:.2f}%
【交易统计】
已成交订单数: {len(self.order_fills)}
待成交订单数: {len(self.pending_orders)}
历史加减仓次数: {len(self.position_history)}
当前蜡烛线索引: {self.candle_index}
{'='*80}
"""
return report
def check_and_report(self) -> None:
"""
每隔指定间隔执行一次报告,并同步到 Redis
应该在 populate_indicators() 中调用
"""
if self.candle_index - self.last_report_candle < self.report_interval_candles:
return
# 打印报告
report_text = self.generate_report()
print(report_text, file=sys.stderr, flush=True)
# 同步到 Redis
self.report_to_redis()
# 更新本次报告时间
self.last_report_candle = self.candle_index
print(f"[GridManager] {self.pair} 报告已同步到 Redis",
file=sys.stderr, flush=True)
@classmethod
def recover_from_redis(cls, pair: str, redis_url: str) -> Optional[Dict[str, Any]]:
"""
从 Redis 恢复一个 GridManager 对象
当 Docker 容器重启后,通过 Redis 中的持仓状态恢复 GridManager
Args:
pair: 币对名称,如 "BTC/USDT"
redis_url: Redis URL
Returns:
恢复后的 GridManager 对象,如果 Redis 中无数据返回 None
"""
if not REDIS_AVAILABLE:
print(f"[GridManager] {pair} Redis 未安装,无法恢复",
file=sys.stderr, flush=True)
return None
try:
# 连接 Redis
parsed = urlparse(redis_url)
redis_client = redis.Redis(
host=parsed.hostname or 'localhost',
port=parsed.port or 6379,
db=int(parsed.path.strip('/') or 0),
decode_responses=True,
socket_connect_timeout=5
)
# 测试连接
redis_client.ping()
# 尝试获取持仓数据
redis_key = f"grid:{pair}"
data = redis_client.get(redis_key)
if not data:
print(f"[GridManager] {pair} Redis 中无持仓数据,无法恢复",
file=sys.stderr, flush=True)
return None
# 解析 JSON
state = json.loads(data)
# 从 Redis 数据反向构建 GridManager
# 由于 Redis 中只有最新的快照,我们无法完全恢复所有历史
# 但可以恢复当前的持仓状态
print(f"[GridManager] {pair} 从 Redis 恢复持仓数据:",
file=sys.stderr, flush=True)
print(f" 持仓数量: {state.get('total_quantity')}",
file=sys.stderr, flush=True)
print(f" 平均价: {state.get('avg_entry_price')}",
file=sys.stderr, flush=True)
print(f" 总投入: {state.get('total_invested')}",
file=sys.stderr, flush=True)
return state # 返回原始数据,让调用者处理恢复
except Exception as e:
print(f"[GridManager] {pair} 从 Redis 恢复失败: {str(e)}",
file=sys.stderr, flush=True)
return None
def restore_from_redis_state(self, redis_state: Dict[str, Any]) -> None:
"""
从 Redis 恢复的状态数据中还原对象内部状态
Args:
redis_state: 从 recover_from_redis() 获取的状态字典
"""
try:
self.current_price = redis_state.get('current_price', 0)
self.total_quantity = redis_state.get('total_quantity', 0)
self.total_invested = redis_state.get('total_invested', 0)
self.avg_entry_price = redis_state.get('avg_entry_price', 0)
self.highest_price = redis_state.get('highest_price', 0) or 0
self.lowest_price = redis_state.get('lowest_price', float('inf')) or float('inf')
self.max_positions = redis_state.get('max_positions_ever', 0)
self.candle_index = redis_state.get('candle_index', 0)
# 重新初始化 grid_levels从 avg_entry_price 推断)
if self.total_quantity > 0 and self.avg_entry_price > 0:
# 创建一个占位符网格点,表示当前持仓
grid_price = round(self.avg_entry_price / self.step) * self.step
self.grid_levels[grid_price] = GridLevel(
price=grid_price,
quantity=self.total_quantity,
entry_time=self.candle_index,
status="open"
)
print(f"[GridManager] {self.pair} 状态已从 Redis 恢复",
file=sys.stderr, flush=True)
except Exception as e:
print(f"[GridManager] {self.pair} 状态恢复失败: {str(e)}",
file=sys.stderr, flush=True)