462 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# user_data/strategies/StaticGrid.py
from freqtrade.strategy import IStrategy
from pandas import DataFrame
from typing import Optional, Dict, Any
import logging
import sys
from grid_manager import GridManager, OrderFill, AdjustmentType, GridLevel
logger = logging.getLogger(__name__)
class StaticGrid(IStrategy):
INTERFACE_VERSION = 3
timeframe = '1h'
can_short = False
minimal_roi = {"0": 0.001} # 极小收益就卖
stoploss = -0.99
use_exit_signal = True
position_adjustment_enable = False # 关闭加仓
cooldown_candles = 0
# Grid parameters - 基于价格的百分比
# BTC/USDT: 上下 20%
# ETH/USDT, SOL/USDT: 上下 33%
GRID_CONFIG = {
"BTC/USDT": {"percent": 0.20}, # 上下 20%
"ETH/USDT": {"percent": 0.33}, # 上下 33%
"SOL/USDT": {"percent": 0.33}, # 上下 33%
"TRX/USDT": {"percent": 0.33}, # 上下 33%
}
STAKE = 40.0 # 每个持仓周期的总投资额会分散到25个网格
GRIDS_PER_CYCLE = 25 # 每个持仓周期的网格数
def __init__(self, config: dict) -> None:
super().__init__(config)
self.grid_managers: Dict[str, GridManager] = {}
self.redis_available = 'redis' in config and 'url' in config.get('redis', {})
self.redis_url = config.get('redis', {}).get('url', '')
print("[StaticGrid] Strategy initialized!", file=sys.stderr, flush=True)
print(f"[StaticGrid] Redis available: {self.redis_available}", file=sys.stderr, flush=True)
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 在每个蜡烛线更新网格管理器状态
pair = metadata['pair']
if pair not in self.grid_managers:
self._init_grid_manager(pair)
grid_manager = self.grid_managers[pair]
current_price = dataframe['close'].iloc[-1]
candle_index = len(dataframe) - 1
print(f"[StaticGrid] {pair} populate_indicators - 价格: {current_price:.2f}, candle: {candle_index}",
file=sys.stderr, flush=True)
print(f"[StaticGrid] {pair} GridManager 范围: {grid_manager.lower_price:.2f}-{grid_manager.upper_price:.2f}",
file=sys.stderr, flush=True)
# 动态更新网格范围(仅在第一次初始化时)
# 检查是否需要从占位符更新为实际价格
# ✅ 修复只有在范围恰好是占位符1000-5000时才更新
has_placeholder = (abs(grid_manager.lower_price - 1000) < 0.1 and
abs(grid_manager.upper_price - 5000) < 0.1)
if has_placeholder:
# 第一次初始化,从占位符更新为实际价格
config = self.GRID_CONFIG.get(pair, {"percent": 0.20})
percent = config.get("percent", 0.20)
new_lower = current_price * (1 - percent)
new_upper = current_price * (1 + percent)
# ✅ 处理极端情况:如果上下沿舍入后相同,需要调整
# 这可能发生在低价币上(如 0.01 * 1.33 = 0.01330... 舍入到 0.01
range_size = new_upper - new_lower
if range_size <= 0 or abs(range_size) < 1e-8:
# 范围太小,动态扩大:使用固定的小数位精度
# 计算 current_price 的小数位数
price_str = f"{current_price:.10f}".rstrip('0')
decimal_places = len(price_str.split('.')[1]) if '.' in price_str else 0
# 基于小数位数动态设置步长
min_step = 10 ** (-decimal_places - 1) # 比最小单位少一位
# 设置范围:上下各 25 步(总共 50 个网格)
new_lower = max(min_step, current_price - min_step * 25) # 下方 25 步(不低于最小单位)
new_upper = current_price + min_step * 25 # 上方 25 步
step = min_step
print(f"[StaticGrid] {pair} 检测到价格范围过小,动态调整 - 小数位: {decimal_places}, 步长: {step:.10f}, 范围: {new_lower:.10f}-{new_upper:.10f}",
file=sys.stderr, flush=True)
else:
# 动态计算步长:范围 / 50
step = range_size / 50
# 更新 GridManager 的范围
grid_manager.lower_price = new_lower
grid_manager.upper_price = new_upper
# 重新计算总网格数
grid_manager.total_grid_levels = int((new_upper - new_lower) / step) + 1
grid_manager.grid_prices = [new_lower + i * step for i in range(grid_manager.total_grid_levels)]
grid_manager.step = step # 更新步长
# 重新初始化 grid_states仅在第一次之后保留已有的 FILLED 状态)
grid_manager.grid_states.clear()
for price in grid_manager.grid_prices:
grid_manager.grid_states[price] = GridLevel(
price=price,
status="empty",
quantity=0.0,
entry_price=0.0,
entry_time=0
)
# 重置同步标志,以便网格范围更新后的第一次 sync 能重新初始化
grid_manager._synced_from_trade_once = False
# ✅ 当网格范围更新时,清理 Redis 中的旧数据,重新开始
# 并不恢复旧数据,因为网格范围已经改变,网格位置名不对应
if self.redis_available:
try:
if grid_manager.redis_client:
grid_manager.redis_client.delete(grid_manager.redis_key)
print(f"[StaticGrid] {pair} 清理 Redis 中的旧数据",
file=sys.stderr, flush=True)
except:
pass
print(f"[StaticGrid] {pair} 更新网格范围 - 从 1000-5000 更新为 {new_lower:.2f}-{new_upper:.2f}",
file=sys.stderr, flush=True)
print(f"[StaticGrid] {pair} 新网格数: {grid_manager.total_grid_levels}, 步长: {step:.4f}",
file=sys.stderr, flush=True)
# 更新网格管理器状态
grid_manager.update_state(current_price, candle_index)
# ✅ 定期报告 - 每次加仓时或每5分钟输出盈亏统计面板
# 只要有持仓就定期输出(便于调试)
if grid_manager.total_quantity > 0 or candle_index % 5 == 0:
self._print_pnl_panel(pair, grid_manager)
return dataframe
def _init_grid_manager(self, pair: str) -> None:
"""初始化或恢复网格管理器"""
# 获取币对配置(如果不存在则使用默认值)
config = self.GRID_CONFIG.get(pair, {"percent": 0.20})
percent = config.get("percent", 0.20)
print(f"[StaticGrid] {pair} 网格配置 - 百分比: {percent*100:.0f}%",
file=sys.stderr, flush=True)
# 尝试从 Redis 恢复
recovered_state = None
if self.redis_available:
recovered_state = GridManager.recover_from_redis(pair, self.redis_url)
# 如果从 Redis 恢复了状态,使用恢复的价格作为基准
# 否则等待第一次 populate_indicators 更新价格后再初始化
if recovered_state:
current_price = recovered_state.get('current_price', 0)
print(f"[StaticGrid] {pair} 从 Redis 恢复价格: {current_price}",
file=sys.stderr, flush=True)
else:
# 这里暂时用占位符,会在 populate_indicators 中更新
current_price = 0
print(f"[StaticGrid] {pair} 使用占位符价格,会在第一次 populate_indicators 中更新",
file=sys.stderr, flush=True)
# 计算动态的上下沿
if current_price > 0:
lower_price = current_price * (1 - percent)
upper_price = current_price * (1 + percent)
print(f"[StaticGrid] {pair} 计算动态上下沿 - 价格: {current_price}, 范围: {lower_price:.2f}-{upper_price:.2f}",
file=sys.stderr, flush=True)
else:
# 如果没有价格信息,使用默认值(会在第一次更新时重新计算)
lower_price = 1000
upper_price = 5000
print(f"[StaticGrid] {pair} 使用默认上下沿: {lower_price}-{upper_price}",
file=sys.stderr, flush=True)
# 动态计算步长:范围 / 50
range_size = upper_price - lower_price
step = range_size / 50
print(f"[StaticGrid] {pair} 动态网格范围 - 下沿: {lower_price:.2f}, 上沿: {upper_price:.2f}, 步长: {step:.4f}",
file=sys.stderr, flush=True)
# 创建新的 GridManager
# ✅ stake_per_grid = 总投资额 / 网格数,保证每个网格投入一致
stake_per_grid = self.STAKE / self.GRIDS_PER_CYCLE
grid_manager = GridManager(
pair=pair,
lower_price=lower_price,
upper_price=upper_price,
step=step,
stake_per_grid=stake_per_grid
)
# 如果 Redis 中有持仓数据,进行恢复
if recovered_state:
grid_manager.restore_from_redis_state(recovered_state)
filled_count = sum(1 for gs in grid_manager.grid_states.values() if gs.status == "filled")
print(f"[StaticGrid] {pair} 从 Redis 恢复 - {filled_count} FILLED 网格", file=sys.stderr, flush=True)
else:
# ❌ 不再在初始化时填充网格状态
# 所有网格状态应该由真实的 apply_adjustment() 操作维护
# 初始时所有网格都是 empty第一次加仓时才标记为 filled
print(f"[StaticGrid] {pair} 新建持仓 - 所有网格初始为空,由真实订单维护",
file=sys.stderr, flush=True)
# 初始化 Redis 连接用于后续同步
if self.redis_available:
try:
grid_manager.init_redis(self.redis_url)
except (RuntimeError, ConnectionError) as e:
print(f"[StaticGrid] {pair} Redis 连接失败: {str(e)}",
file=sys.stderr, flush=True)
raise # 重新投掷,导致策略退出
# 保存网格配置,用于价格更新时重新计算
self.grid_managers[pair] = grid_manager
self._pair_configs = getattr(self, '_pair_configs', {})
self._pair_configs[pair] = {'percent': percent}
def _print_pnl_panel(self, pair: str, grid_manager: GridManager) -> None:
"""
打印盈亏统计面板(每5分钟一次)
"""
# 计算盈亏情况
pnl_data = grid_manager.calculate_pnl()
filled_count = sum(1 for gs in grid_manager.grid_states.values() if gs.status == "filled")
empty_count = len(grid_manager.grid_states) - filled_count
unrealized_pnl = pnl_data["unrealized_pnl"]
unrealized_pnl_pct = pnl_data["unrealized_pnl_pct"]
realized_pnl = pnl_data["realized_pnl"]
total_pnl = pnl_data["total_pnl"]
lowest_price_str = f"{grid_manager.lowest_price:.4f}" if grid_manager.lowest_price != float('inf') else 'N/A'
status_str = '亏损💥' if unrealized_pnl < 0 else '盈利✅'
# 完整的盈亏面板
panel = f"""
{'='*100}
│ [盈亏统计面板] {pair} | candle#{grid_manager.candle_index} | 价格: {grid_manager.current_price:.4f}
{'='*100}
│ 【持仓状态】 │
│ · 数量: {grid_manager.total_quantity:.8f} | 空位: {empty_count}/{len(grid_manager.grid_states)} | 成本: {grid_manager.total_invested:.2f} USDT │
│ · 平均价: {grid_manager.avg_entry_price:.4f} USDT | 位置: {filled_count}/{len(grid_manager.grid_states)} FILLED │
│ │
│ 【盈亏情况】 │
│ · 未实现: {unrealized_pnl:+.2f} USDT ({unrealized_pnl_pct:+.2f}%) │
│ · 已实现: {realized_pnl:+.2f} USDT | 总计: {total_pnl:+.2f} USDT │
│ · 最高价: {grid_manager.highest_price:.4f} USDT | 最低价: {lowest_price_str}
│ · 状态: {status_str}
│ │
│ 【网格信息】 │
│ · gridMgr_id: {grid_manager.hash_id} | 创建时间: {grid_manager.created_time}
│ · 已装: {filled_count} | 余次: {50 - filled_count}
{'='*100}"""
print(panel, file=sys.stderr, flush=True)
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
网格入场逻辑
"""
dataframe['enter_long'] = False
dataframe['enter_tag'] = ''
if len(dataframe) == 0:
print(f"[StaticGrid] {metadata['pair']} dataframe 为空,跳过", file=sys.stderr, flush=True)
return dataframe
pair = metadata['pair']
if pair not in self.grid_managers:
self._init_grid_manager(pair)
grid_manager = self.grid_managers[pair]
current_price = dataframe['close'].iloc[-1]
print(f"[StaticGrid] {pair} populate_entry_trend 执行 - 价格: {current_price:.2f}",
file=sys.stderr, flush=True)
# 询问网格管理器是否应该加仓
adjustment = grid_manager.decide_adjustment()
print(f"[StaticGrid] {pair} decide_adjustment 返回: {adjustment}",
file=sys.stderr, flush=True)
if adjustment:
if adjustment.type == AdjustmentType.ENTRY:
# 初始建仓
grid_manager.apply_adjustment(adjustment) # ✅ 更新网格状态
dataframe.loc[dataframe.index[-1], 'enter_long'] = True
dataframe.loc[dataframe.index[-1], 'enter_tag'] = f"grid_entry_{current_price:.0f}"
print(f"[StaticGrid] {pair} 初始建仓信号 @ {current_price:.2f} (ENTRY 设置为 True)",
file=sys.stderr, flush=True)
elif adjustment.type == AdjustmentType.ADD:
# 加仓
grid_manager.apply_adjustment(adjustment) # ✅ 更新网格状态
dataframe.loc[dataframe.index[-1], 'enter_long'] = True
dataframe.loc[dataframe.index[-1], 'enter_tag'] = f"grid_add_{current_price:.0f}"
print(f"[StaticGrid] {pair} 加仓信号 @ {current_price:.2f} (ADD 设置为 True)",
file=sys.stderr, flush=True)
else:
print(f"[StaticGrid] {pair} 无操作建议", file=sys.stderr, flush=True)
# 打印最终的 enter_long 状态
enter_long_value = dataframe.loc[dataframe.index[-1], 'enter_long']
enter_tag_value = dataframe.loc[dataframe.index[-1], 'enter_tag']
print(f"[StaticGrid] {pair} 最终状态 - enter_long: {enter_long_value}, enter_tag: {enter_tag_value}",
file=sys.stderr, flush=True)
return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
网格出场逻辑
"""
dataframe['exit_long'] = False
if len(dataframe) == 0:
print(f"[StaticGrid] {metadata['pair']} dataframe 为空,跳过出场逻辑",
file=sys.stderr, flush=True)
return dataframe
pair = metadata['pair']
if pair not in self.grid_managers:
print(f"[StaticGrid] {pair} GridManager 不存在,跳过出场逻辑",
file=sys.stderr, flush=True)
return dataframe
grid_manager = self.grid_managers[pair]
print(f"[StaticGrid] {pair} populate_exit_trend 执行",
file=sys.stderr, flush=True)
# 询问网格管理器是否应该平仓
adjustment = grid_manager.decide_adjustment()
print(f"[StaticGrid] {pair} exit 决策结果: {adjustment}",
file=sys.stderr, flush=True)
if adjustment and adjustment.type == AdjustmentType.EXIT:
# 平仓
dataframe.loc[dataframe.index[-1], 'exit_long'] = True
print(f"[StaticGrid] {pair} 平仓信号 @ {grid_manager.current_price:.2f} (EXIT 设置为 True)",
file=sys.stderr, flush=True)
# ✅ 平仓后,销毁该网格管理器,释放该对象
grid_manager.destroy()
del self.grid_managers[pair]
print(f"[StaticGrid] {pair} GridManager 已销毁,下一次会创建新的",
file=sys.stderr, flush=True)
# 打印最终的 exit_long 状态
exit_long_value = dataframe.loc[dataframe.index[-1], 'exit_long']
print(f"[StaticGrid] {pair} 最终 exit_long: {exit_long_value}",
file=sys.stderr, flush=True)
return dataframe
def adjust_trade_position(self, trade, current_rate: float,
current_profit: float, min_stake: float,
max_stake: float, **kwargs) -> Optional[float]:
"""
加仓逻辑:根据 GridManager 的决策进行加仓
"""
pair = trade.pair
print(f"[StaticGrid] {pair} adjust_trade_position 执行 - 当前价: {current_rate:.2f}, 利润: {current_profit:.4f}",
file=sys.stderr, flush=True)
print(f"[StaticGrid] {pair} Trade 信息 - 持仓: {trade.amount:.6f}, 平均价: {trade.open_rate:.2f}, stake: {trade.stake_amount:.2f}",
file=sys.stderr, flush=True)
if pair not in self.grid_managers:
print(f"[StaticGrid] {pair} GridManager 不存在,创建新的",
file=sys.stderr, flush=True)
self._init_grid_manager(pair)
grid_manager = self.grid_managers[pair]
# 同步 Trade 对象的信息到 GridManager用于加仓决策
# 使用 trade.open_date_utc 的时间戳作为 candle_index
try:
candle_index = int(trade.open_date_utc.timestamp() / 3600) if trade.open_date_utc else 0
except:
candle_index = 0
grid_manager.sync_from_trade_object(trade, candle_index)
# 询问网格管理器是否应该加仓
adjustment = grid_manager.decide_adjustment()
print(f"[StaticGrid] {pair} decide_adjustment 返回: {adjustment}",
file=sys.stderr, flush=True)
if adjustment and adjustment.type == AdjustmentType.ADD:
# 订单可执行,先更新网格状态(谨慎起见)
grid_manager.apply_adjustment(adjustment)
# 计算加仓金额
stake_amount = adjustment.quantity * self.STAKE
print(f"[StaticGrid] {pair} 加仓决策 - 数量: {adjustment.quantity}, 金额: {stake_amount} USDT",
file=sys.stderr, flush=True)
print(f"[StaticGrid] {pair} 可用金额范围 - min: {min_stake:.2f}, max: {max_stake:.2f}",
file=sys.stderr, flush=True)
# 检查是否超过最大可用金额
if stake_amount <= max_stake:
# 计算还能加仓多少次
filled_grids = sum(1 for gs in grid_manager.grid_states.values() if gs.status == "filled")
remaining_times = grid_manager.total_grid_levels - filled_grids
print(f"[StaticGrid] {pair} 加仓批准, 最多还可以加仓{remaining_times}次 - 返回金额: {stake_amount:.2f}",
file=sys.stderr, flush=True)
return stake_amount
else:
print(f"[StaticGrid] {pair} ❌ 加仓金额超过最大值 ({stake_amount:.2f} > {max_stake:.2f}),跳过",
file=sys.stderr, flush=True)
return None
else:
print(f"[StaticGrid] {pair} 无加仓建议,返回 None",
file=sys.stderr, flush=True)
# ✅ 检查自监测:如果已完结,返回负数来强制平仓所有持仓
if grid_manager.is_completed and trade.amount > 0:
print(f"[StaticGrid] {pair} GridManager 已完结,强制平仓 - 理由: {grid_manager.completion_reason}",
file=sys.stderr, flush=True)
# 返回负数来平仓所有持仓
return -trade.amount
return None
def custom_stake_amount(self, pair: str, current_rate: float, proposed_stake: float,
min_stake: float, max_stake: float, **kwargs) -> float:
"""
自定义初始建仓金额,使每个网格投入金额一致
对于网格交易,初次建仓也应该只投入一个网格的金额,保持一致性
"""
if pair not in self.grid_managers:
# 如果还没有 GridManager创建一个
self._init_grid_manager(pair)
grid_manager = self.grid_managers[pair]
# 每个网格的投入金额
grid_stake = grid_manager.stake_per_grid
print(f"[StaticGrid] {pair} custom_stake_amount - proposed: {proposed_stake:.2f}, "
f"grid_stake: {grid_stake:.2f}, min: {min_stake:.2f}, max: {max_stake:.2f}",
file=sys.stderr, flush=True)
# 返回每个网格的投入金额(确保在 min_stake 和 max_stake 之间)
actual_stake = max(min_stake, min(grid_stake, max_stake))
print(f"[StaticGrid] {pair} custom_stake_amount 返回: {actual_stake:.2f}",
file=sys.stderr, flush=True)
return actual_stake