# user_data/strategies/StaticGrid.py from freqtrade.strategy import IStrategy from pandas import DataFrame from typing import Optional, Dict, Any import logging import sys from grid_manager import GridManager, OrderFill, AdjustmentType, GridLevel logger = logging.getLogger(__name__) class StaticGrid(IStrategy): INTERFACE_VERSION = 3 timeframe = '1h' can_short = False minimal_roi = {"0": 0.001} # 极小收益就卖 stoploss = -0.99 use_exit_signal = True position_adjustment_enable = False # 关闭加仓 cooldown_candles = 0 # Grid parameters - 基于价格的百分比 # BTC/USDT: 上下 20% # ETH/USDT, SOL/USDT: 上下 33% GRID_CONFIG = { "BTC/USDT": {"percent": 0.20}, # 上下 20% "ETH/USDT": {"percent": 0.33}, # 上下 33% "SOL/USDT": {"percent": 0.33}, # 上下 33% "TRX/USDT": {"percent": 0.33}, # 上下 33% } STAKE = 40.0 # 每个持仓周期的总投资额(会分散到25个网格) GRIDS_PER_CYCLE = 25 # 每个持仓周期的网格数 def __init__(self, config: dict) -> None: super().__init__(config) self.grid_managers: Dict[str, GridManager] = {} self.redis_available = 'redis' in config and 'url' in config.get('redis', {}) self.redis_url = config.get('redis', {}).get('url', '') print("[StaticGrid] Strategy initialized!", file=sys.stderr, flush=True) print(f"[StaticGrid] Redis available: {self.redis_available}", file=sys.stderr, flush=True) def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # 在每个蜡烛线更新网格管理器状态 pair = metadata['pair'] if pair not in self.grid_managers: self._init_grid_manager(pair) grid_manager = self.grid_managers[pair] current_price = dataframe['close'].iloc[-1] candle_index = len(dataframe) - 1 print(f"[StaticGrid] {pair} populate_indicators - 价格: {current_price:.2f}, candle: {candle_index}", file=sys.stderr, flush=True) print(f"[StaticGrid] {pair} GridManager 范围: {grid_manager.lower_price:.2f}-{grid_manager.upper_price:.2f}", file=sys.stderr, flush=True) # 动态更新网格范围(仅在第一次初始化时) # 检查是否需要从占位符更新为实际价格 # ✅ 修复:只有在范围恰好是占位符(1000-5000)时才更新 has_placeholder = (abs(grid_manager.lower_price - 1000) < 0.1 and abs(grid_manager.upper_price - 5000) < 0.1) if has_placeholder: # 第一次初始化,从占位符更新为实际价格 config = self.GRID_CONFIG.get(pair, {"percent": 0.20}) percent = config.get("percent", 0.20) new_lower = current_price * (1 - percent) new_upper = current_price * (1 + percent) # ✅ 处理极端情况:如果上下沿舍入后相同,需要调整 # 这可能发生在低价币上(如 0.01 * 1.33 = 0.01330... 舍入到 0.01) range_size = new_upper - new_lower if range_size <= 0 or abs(range_size) < 1e-8: # 范围太小,动态扩大:使用固定的小数位精度 # 计算 current_price 的小数位数 price_str = f"{current_price:.10f}".rstrip('0') decimal_places = len(price_str.split('.')[1]) if '.' in price_str else 0 # 基于小数位数动态设置步长 min_step = 10 ** (-decimal_places - 1) # 比最小单位少一位 # 设置范围:上下各 25 步(总共 50 个网格) new_lower = max(min_step, current_price - min_step * 25) # 下方 25 步(不低于最小单位) new_upper = current_price + min_step * 25 # 上方 25 步 step = min_step print(f"[StaticGrid] {pair} 检测到价格范围过小,动态调整 - 小数位: {decimal_places}, 步长: {step:.10f}, 范围: {new_lower:.10f}-{new_upper:.10f}", file=sys.stderr, flush=True) else: # 动态计算步长:范围 / 50 step = range_size / 50 # 更新 GridManager 的范围 grid_manager.lower_price = new_lower grid_manager.upper_price = new_upper # 重新计算总网格数 grid_manager.total_grid_levels = int((new_upper - new_lower) / step) + 1 grid_manager.grid_prices = [new_lower + i * step for i in range(grid_manager.total_grid_levels)] grid_manager.step = step # 更新步长 # 重新初始化 grid_states(仅在第一次,之后保留已有的 FILLED 状态) grid_manager.grid_states.clear() for price in grid_manager.grid_prices: grid_manager.grid_states[price] = GridLevel( price=price, status="empty", quantity=0.0, entry_price=0.0, entry_time=0 ) # 重置同步标志,以便网格范围更新后的第一次 sync 能重新初始化 grid_manager._synced_from_trade_once = False # ✅ 当网格范围更新时,清理 Redis 中的旧数据,重新开始 # 并不恢复旧数据,因为网格范围已经改变,网格位置名不对应 if self.redis_available: try: if grid_manager.redis_client: grid_manager.redis_client.delete(grid_manager.redis_key) print(f"[StaticGrid] {pair} 清理 Redis 中的旧数据", file=sys.stderr, flush=True) except: pass print(f"[StaticGrid] {pair} 更新网格范围 - 从 1000-5000 更新为 {new_lower:.2f}-{new_upper:.2f}", file=sys.stderr, flush=True) print(f"[StaticGrid] {pair} 新网格数: {grid_manager.total_grid_levels}, 步长: {step:.4f}", file=sys.stderr, flush=True) # 更新网格管理器状态 grid_manager.update_state(current_price, candle_index) # ✅ 定期报告 - 每次加仓时或每5分钟输出盈亏统计面板 # 只要有持仓就定期输出(便于调试) if grid_manager.total_quantity > 0 or candle_index % 5 == 0: self._print_pnl_panel(pair, grid_manager) return dataframe def _init_grid_manager(self, pair: str) -> None: """初始化或恢复网格管理器""" # 获取币对配置(如果不存在则使用默认值) config = self.GRID_CONFIG.get(pair, {"percent": 0.20}) percent = config.get("percent", 0.20) print(f"[StaticGrid] {pair} 网格配置 - 百分比: {percent*100:.0f}%", file=sys.stderr, flush=True) # 尝试从 Redis 恢复 recovered_state = None if self.redis_available: recovered_state = GridManager.recover_from_redis(pair, self.redis_url) # 如果从 Redis 恢复了状态,使用恢复的价格作为基准 # 否则等待第一次 populate_indicators 更新价格后再初始化 if recovered_state: current_price = recovered_state.get('current_price', 0) print(f"[StaticGrid] {pair} 从 Redis 恢复价格: {current_price}", file=sys.stderr, flush=True) else: # 这里暂时用占位符,会在 populate_indicators 中更新 current_price = 0 print(f"[StaticGrid] {pair} 使用占位符价格,会在第一次 populate_indicators 中更新", file=sys.stderr, flush=True) # 计算动态的上下沿 if current_price > 0: lower_price = current_price * (1 - percent) upper_price = current_price * (1 + percent) print(f"[StaticGrid] {pair} 计算动态上下沿 - 价格: {current_price}, 范围: {lower_price:.2f}-{upper_price:.2f}", file=sys.stderr, flush=True) else: # 如果没有价格信息,使用默认值(会在第一次更新时重新计算) lower_price = 1000 upper_price = 5000 print(f"[StaticGrid] {pair} 使用默认上下沿: {lower_price}-{upper_price}", file=sys.stderr, flush=True) # 动态计算步长:范围 / 50 range_size = upper_price - lower_price step = range_size / 50 print(f"[StaticGrid] {pair} 动态网格范围 - 下沿: {lower_price:.2f}, 上沿: {upper_price:.2f}, 步长: {step:.4f}", file=sys.stderr, flush=True) # 创建新的 GridManager # ✅ stake_per_grid = 总投资额 / 网格数,保证每个网格投入一致 stake_per_grid = self.STAKE / self.GRIDS_PER_CYCLE grid_manager = GridManager( pair=pair, lower_price=lower_price, upper_price=upper_price, step=step, stake_per_grid=stake_per_grid ) # 如果 Redis 中有持仓数据,进行恢复 if recovered_state: grid_manager.restore_from_redis_state(recovered_state) filled_count = sum(1 for gs in grid_manager.grid_states.values() if gs.status == "filled") print(f"[StaticGrid] {pair} 从 Redis 恢复 - {filled_count} FILLED 网格", file=sys.stderr, flush=True) else: # ✅ 新建持仓时,初始化时直接填充一半的网格(25个) # 第1个=初次入场,剩余24个=加仓 filled_count = 0 for i in range(25): # 填充前25个网格(总共50个) grid_price = grid_manager.lower_price + i * step if grid_price in grid_manager.grid_states: grid_manager.grid_states[grid_price].status = "filled" filled_count += 1 print(f"[StaticGrid] {pair} 新建持仓 - 初始化填充 {filled_count} 个网格(1个初次入场+{filled_count-1}个加仓)", file=sys.stderr, flush=True) # 初始化 Redis 连接用于后续同步 if self.redis_available: try: grid_manager.init_redis(self.redis_url) except (RuntimeError, ConnectionError) as e: print(f"[StaticGrid] {pair} Redis 连接失败: {str(e)}", file=sys.stderr, flush=True) raise # 重新投掷,导致策略退出 # 保存网格配置,用于价格更新时重新计算 self.grid_managers[pair] = grid_manager self._pair_configs = getattr(self, '_pair_configs', {}) self._pair_configs[pair] = {'percent': percent} def _print_pnl_panel(self, pair: str, grid_manager: GridManager) -> None: """ 打印盈亏统计面板(每5分钟一次) """ # 计算盈亏指标 filled_count = sum(1 for gs in grid_manager.grid_states.values() if gs.status == "filled") empty_count = len(grid_manager.grid_states) - filled_count unrealized = (grid_manager.current_price - grid_manager.avg_entry_price) * grid_manager.total_quantity if grid_manager.total_quantity > 0 else 0 unrealized_pct = (grid_manager.current_price - grid_manager.avg_entry_price) / grid_manager.avg_entry_price * 100 if grid_manager.avg_entry_price > 0 else 0 lowest_price_str = f"{grid_manager.lowest_price:.4f}" if grid_manager.lowest_price != float('inf') else 'N/A' status_str = '亏损💥' if unrealized < 0 else '盈利✅' # 一旧扯住盈亏面板 panel = f""" ┌{'='*100}┐ │ [盈亏统计面板] {pair} | candle#{grid_manager.candle_index} | 价格: {grid_manager.current_price:.4f}│ ├{'='*100}┤ │ 【持仓状态】 │ │ • 数量: {grid_manager.total_quantity:.8f} | 分散世旧: {empty_count}/{len(grid_manager.grid_states)} | 成本: {grid_manager.total_invested:.2f} USDT │ │ • 平均价: {grid_manager.avg_entry_price:.4f} USDT | 位置: {filled_count}/{len(grid_manager.grid_states)} FILLED │ │ │ │ 【盈亏情况】 │ │ • 未实珰: {unrealized:+.2f} USDT ({unrealized_pct:+.2f}%) │ │ • 最高价: {grid_manager.highest_price:.4f} USDT | 最低价: {lowest_price_str}│ │ • 状态: {status_str} │ │ │ │ 【捷站显示】 │ │ • gridMgr_id: {grid_manager.hash_id} | 创建时间: {grid_manager.created_time} │ │ • 接次: {filled_count} | 下次需废数: {50 - filled_count} │ └{'='*100}┘""" print(panel, file=sys.stderr, flush=True) def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: """ 网格入场逻辑 """ dataframe['enter_long'] = False dataframe['enter_tag'] = '' if len(dataframe) == 0: print(f"[StaticGrid] {metadata['pair']} dataframe 为空,跳过", file=sys.stderr, flush=True) return dataframe pair = metadata['pair'] if pair not in self.grid_managers: self._init_grid_manager(pair) grid_manager = self.grid_managers[pair] current_price = dataframe['close'].iloc[-1] print(f"[StaticGrid] {pair} populate_entry_trend 执行 - 价格: {current_price:.2f}", file=sys.stderr, flush=True) # 询问网格管理器是否应该加仓 adjustment = grid_manager.decide_adjustment() print(f"[StaticGrid] {pair} decide_adjustment 返回: {adjustment}", file=sys.stderr, flush=True) if adjustment: if adjustment.type == AdjustmentType.ENTRY: # 初始建仓 grid_manager.apply_adjustment(adjustment) # ✅ 更新网格状态 dataframe.loc[dataframe.index[-1], 'enter_long'] = True dataframe.loc[dataframe.index[-1], 'enter_tag'] = f"grid_entry_{current_price:.0f}" print(f"[StaticGrid] {pair} 初始建仓信号 @ {current_price:.2f} (ENTRY 设置为 True)", file=sys.stderr, flush=True) elif adjustment.type == AdjustmentType.ADD: # 加仓 grid_manager.apply_adjustment(adjustment) # ✅ 更新网格状态 dataframe.loc[dataframe.index[-1], 'enter_long'] = True dataframe.loc[dataframe.index[-1], 'enter_tag'] = f"grid_add_{current_price:.0f}" print(f"[StaticGrid] {pair} 加仓信号 @ {current_price:.2f} (ADD 设置为 True)", file=sys.stderr, flush=True) else: print(f"[StaticGrid] {pair} 无操作建议", file=sys.stderr, flush=True) # 打印最终的 enter_long 状态 enter_long_value = dataframe.loc[dataframe.index[-1], 'enter_long'] enter_tag_value = dataframe.loc[dataframe.index[-1], 'enter_tag'] print(f"[StaticGrid] {pair} 最终状态 - enter_long: {enter_long_value}, enter_tag: {enter_tag_value}", file=sys.stderr, flush=True) return dataframe def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: """ 网格出场逻辑 """ dataframe['exit_long'] = False if len(dataframe) == 0: print(f"[StaticGrid] {metadata['pair']} dataframe 为空,跳过出场逻辑", file=sys.stderr, flush=True) return dataframe pair = metadata['pair'] if pair not in self.grid_managers: print(f"[StaticGrid] {pair} GridManager 不存在,跳过出场逻辑", file=sys.stderr, flush=True) return dataframe grid_manager = self.grid_managers[pair] print(f"[StaticGrid] {pair} populate_exit_trend 执行", file=sys.stderr, flush=True) # 询问网格管理器是否应该平仓 adjustment = grid_manager.decide_adjustment() print(f"[StaticGrid] {pair} exit 决策结果: {adjustment}", file=sys.stderr, flush=True) if adjustment and adjustment.type == AdjustmentType.EXIT: # 平仓 dataframe.loc[dataframe.index[-1], 'exit_long'] = True print(f"[StaticGrid] {pair} 平仓信号 @ {grid_manager.current_price:.2f} (EXIT 设置为 True)", file=sys.stderr, flush=True) # ✅ 平仓后,销毁该网格管理器,释放该对象 grid_manager.destroy() del self.grid_managers[pair] print(f"[StaticGrid] {pair} GridManager 已销毁,下一次会创建新的", file=sys.stderr, flush=True) # 打印最终的 exit_long 状态 exit_long_value = dataframe.loc[dataframe.index[-1], 'exit_long'] print(f"[StaticGrid] {pair} 最终 exit_long: {exit_long_value}", file=sys.stderr, flush=True) return dataframe def adjust_trade_position(self, trade, current_rate: float, current_profit: float, min_stake: float, max_stake: float, **kwargs) -> Optional[float]: """ 加仓逻辑:根据 GridManager 的决策进行加仓 """ pair = trade.pair print(f"[StaticGrid] {pair} adjust_trade_position 执行 - 当前价: {current_rate:.2f}, 利润: {current_profit:.4f}", file=sys.stderr, flush=True) print(f"[StaticGrid] {pair} Trade 信息 - 持仓: {trade.amount:.6f}, 平均价: {trade.open_rate:.2f}, stake: {trade.stake_amount:.2f}", file=sys.stderr, flush=True) if pair not in self.grid_managers: print(f"[StaticGrid] {pair} GridManager 不存在,创建新的", file=sys.stderr, flush=True) self._init_grid_manager(pair) grid_manager = self.grid_managers[pair] # 同步 Trade 对象的信息到 GridManager(用于加仓决策) # 使用 trade.open_date_utc 的时间戳作为 candle_index try: candle_index = int(trade.open_date_utc.timestamp() / 3600) if trade.open_date_utc else 0 except: candle_index = 0 grid_manager.sync_from_trade_object(trade, candle_index) # 询问网格管理器是否应该加仓 adjustment = grid_manager.decide_adjustment() print(f"[StaticGrid] {pair} decide_adjustment 返回: {adjustment}", file=sys.stderr, flush=True) if adjustment and adjustment.type == AdjustmentType.ADD: # 订单可执行,先更新网格状态(谨慎起见) grid_manager.apply_adjustment(adjustment) # 计算加仓金额 stake_amount = adjustment.quantity * self.STAKE print(f"[StaticGrid] {pair} 加仓决策 - 数量: {adjustment.quantity}, 金额: {stake_amount} USDT", file=sys.stderr, flush=True) print(f"[StaticGrid] {pair} 可用金额范围 - min: {min_stake:.2f}, max: {max_stake:.2f}", file=sys.stderr, flush=True) # 检查是否超过最大可用金额 if stake_amount <= max_stake: # 计算还能加仓多少次 filled_grids = sum(1 for gs in grid_manager.grid_states.values() if gs.status == "filled") remaining_times = grid_manager.total_grid_levels - filled_grids print(f"[StaticGrid] {pair} 加仓批准, 最多还可以加仓{remaining_times}次 - 返回金额: {stake_amount:.2f}", file=sys.stderr, flush=True) return stake_amount else: print(f"[StaticGrid] {pair} ❌ 加仓金额超过最大值 ({stake_amount:.2f} > {max_stake:.2f}),跳过", file=sys.stderr, flush=True) return None else: print(f"[StaticGrid] {pair} 无加仓建议,返回 None", file=sys.stderr, flush=True) # ✅ 检查自监测:如果已完结,返回负数来强制平仓所有持仓 if grid_manager.is_completed and trade.amount > 0: print(f"[StaticGrid] {pair} GridManager 已完结,强制平仓 - 理由: {grid_manager.completion_reason}", file=sys.stderr, flush=True) # 返回负数来平仓所有持仓 return -trade.amount return None def custom_stake_amount(self, pair: str, current_rate: float, proposed_stake: float, min_stake: float, max_stake: float, **kwargs) -> float: """ 自定义初始建仓金额,使每个网格投入金额一致 对于网格交易,初次建仓也应该只投入一个网格的金额,保持一致性 """ if pair not in self.grid_managers: # 如果还没有 GridManager,创建一个 self._init_grid_manager(pair) grid_manager = self.grid_managers[pair] # 每个网格的投入金额 grid_stake = grid_manager.stake_per_grid print(f"[StaticGrid] {pair} custom_stake_amount - proposed: {proposed_stake:.2f}, " f"grid_stake: {grid_stake:.2f}, min: {min_stake:.2f}, max: {max_stake:.2f}", file=sys.stderr, flush=True) # 返回每个网格的投入金额(确保在 min_stake 和 max_stake 之间) actual_stake = max(min_stake, min(grid_stake, max_stake)) print(f"[StaticGrid] {pair} custom_stake_amount 返回: {actual_stake:.2f}", file=sys.stderr, flush=True) return actual_stake