zhangkun9038@dingtalk.com 658db450ec fix
2025-11-27 15:48:55 +08:00

302 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# user_data/strategies/StaticGrid.py
from freqtrade.strategy import IStrategy
from pandas import DataFrame
from typing import Optional, Dict, Any
import logging
import sys
from grid_manager import GridManager, OrderFill, AdjustmentType
logger = logging.getLogger(__name__)
class StaticGrid(IStrategy):
INTERFACE_VERSION = 3
timeframe = '1h'
can_short = False
minimal_roi = {"0": 0.001} # 极小收益就卖
stoploss = -0.99
use_exit_signal = True
position_adjustment_enable = False # 关闭加仓
cooldown_candles = 0
# Grid parameters - 基于价格的百分比
# BTC/USDT: 上下 20%
# ETH/USDT, SOL/USDT: 上下 33%
GRID_CONFIG = {
"BTC/USDT": {"percent": 0.20, "step": 500}, # 上下 20%,步长 500
"ETH/USDT": {"percent": 0.33, "step": 100}, # 上下 33%,步长 100
"SOL/USDT": {"percent": 0.33, "step": 5}, # 上下 33%,步长 5
}
STAKE = 40.0 # 每个网格的投资额
def __init__(self, config: dict) -> None:
super().__init__(config)
self.grid_managers: Dict[str, GridManager] = {}
self.redis_available = 'redis' in config and 'url' in config.get('redis', {})
self.redis_url = config.get('redis', {}).get('url', '')
print("[StaticGrid] Strategy initialized!", file=sys.stderr, flush=True)
print(f"[StaticGrid] Redis available: {self.redis_available}", file=sys.stderr, flush=True)
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 在每个蜡烛线更新网格管理器状态
pair = metadata['pair']
if pair not in self.grid_managers:
self._init_grid_manager(pair)
grid_manager = self.grid_managers[pair]
current_price = dataframe['close'].iloc[-1]
candle_index = len(dataframe) - 1
print(f"[StaticGrid] {pair} populate_indicators - 价格: {current_price:.2f}, candle: {candle_index}",
file=sys.stderr, flush=True)
print(f"[StaticGrid] {pair} GridManager 范围: {grid_manager.lower_price:.2f}-{grid_manager.upper_price:.2f}",
file=sys.stderr, flush=True)
# 动态更新网格范围(如果初始化时用的是占位符价格)
if grid_manager.lower_price <= 1000 and grid_manager.upper_price <= 5000:
# 说明用的是默认占位符,需要更新为实际价格
config = self.GRID_CONFIG.get(pair, {"percent": 0.20, "step": 100})
percent = config.get("percent", 0.20)
step = config.get("step", 100)
new_lower = current_price * (1 - percent)
new_upper = current_price * (1 + percent)
# 更新 GridManager 的范围
grid_manager.lower_price = new_lower
grid_manager.upper_price = new_upper
# 重新计算总网格数
grid_manager.total_grid_levels = int((new_upper - new_lower) / step) + 1
grid_manager.grid_prices = [new_lower + i * step for i in range(grid_manager.total_grid_levels)]
print(f"[StaticGrid] {pair} 更新网格范围 - 从 1000-5000 更新为 {new_lower:.2f}-{new_upper:.2f}",
file=sys.stderr, flush=True)
print(f"[StaticGrid] {pair} 新网格数: {grid_manager.total_grid_levels}",
file=sys.stderr, flush=True)
# 更新网格管理器状态
grid_manager.update_state(current_price, candle_index)
# 定期报告
grid_manager.check_and_report()
return dataframe
def _init_grid_manager(self, pair: str) -> None:
"""初始化或恢复网格管理器"""
# 获取币对配置(如果不存在则使用默认值)
config = self.GRID_CONFIG.get(pair, {"percent": 0.20, "step": 100})
percent = config.get("percent", 0.20)
step = config.get("step", 100)
print(f"[StaticGrid] {pair} 网格配置 - 百分比: {percent*100:.0f}%, 步长: {step}",
file=sys.stderr, flush=True)
# 尝试从 Redis 恢复
recovered_state = None
if self.redis_available:
recovered_state = GridManager.recover_from_redis(pair, self.redis_url)
# 如果从 Redis 恢复了状态,使用恢复的价格作为基准
# 否则等待第一次 populate_indicators 更新价格后再初始化
if recovered_state:
current_price = recovered_state.get('current_price', 0)
print(f"[StaticGrid] {pair} 从 Redis 恢复价格: {current_price}",
file=sys.stderr, flush=True)
else:
# 这里暂时用占位符,会在 populate_indicators 中更新
current_price = 0
print(f"[StaticGrid] {pair} 使用占位符价格,会在第一次 populate_indicators 中更新",
file=sys.stderr, flush=True)
# 计算动态的上下沿
if current_price > 0:
lower_price = current_price * (1 - percent)
upper_price = current_price * (1 + percent)
print(f"[StaticGrid] {pair} 计算动态上下沿 - 价格: {current_price}, 范围: {lower_price:.2f}-{upper_price:.2f}",
file=sys.stderr, flush=True)
else:
# 如果没有价格信息,使用默认值(会在第一次更新时重新计算)
lower_price = 1000
upper_price = 5000
print(f"[StaticGrid] {pair} 使用默认上下沿: {lower_price}-{upper_price}",
file=sys.stderr, flush=True)
print(f"[StaticGrid] {pair} 动态网格范围 - 下沿: {lower_price:.2f}, 上沿: {upper_price:.2f}",
file=sys.stderr, flush=True)
# 创建新的 GridManager
grid_manager = GridManager(
pair=pair,
lower_price=lower_price,
upper_price=upper_price,
step=step,
stake_per_grid=self.STAKE
)
# 如果 Redis 中有持仓数据,进行恢复
if recovered_state:
grid_manager.restore_from_redis_state(recovered_state)
print(f"[StaticGrid] {pair} 已从 Redis 恢复", file=sys.stderr, flush=True)
else:
print(f"[StaticGrid] {pair} 新建持仓", file=sys.stderr, flush=True)
# 初始化 Redis 连接用于后续同步
if self.redis_available:
grid_manager.init_redis(self.redis_url)
# 保存网格配置,用于价格更新时重新计算
self.grid_managers[pair] = grid_manager
self._pair_configs = getattr(self, '_pair_configs', {})
self._pair_configs[pair] = {'percent': percent, 'step': step}
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
网格入场逻辑
"""
dataframe['enter_long'] = False
dataframe['enter_tag'] = ''
if len(dataframe) == 0:
print(f"[StaticGrid] {metadata['pair']} dataframe 为空,跳过", file=sys.stderr, flush=True)
return dataframe
pair = metadata['pair']
if pair not in self.grid_managers:
self._init_grid_manager(pair)
grid_manager = self.grid_managers[pair]
current_price = dataframe['close'].iloc[-1]
print(f"[StaticGrid] {pair} populate_entry_trend 执行 - 价格: {current_price:.2f}",
file=sys.stderr, flush=True)
# 询问网格管理器是否应该加仓
adjustment = grid_manager.decide_adjustment()
print(f"[StaticGrid] {pair} decide_adjustment 返回: {adjustment}",
file=sys.stderr, flush=True)
if adjustment:
if adjustment.type == AdjustmentType.ENTRY:
# 初始建仓
dataframe.loc[dataframe.index[-1], 'enter_long'] = True
dataframe.loc[dataframe.index[-1], 'enter_tag'] = f"grid_entry_{current_price:.0f}"
print(f"[StaticGrid] {pair} 初始建仓信号 @ {current_price:.2f} (ENTRY 设置为 True)",
file=sys.stderr, flush=True)
elif adjustment.type == AdjustmentType.ADD:
# 加仓
dataframe.loc[dataframe.index[-1], 'enter_long'] = True
dataframe.loc[dataframe.index[-1], 'enter_tag'] = f"grid_add_{current_price:.0f}"
print(f"[StaticGrid] {pair} 加仓信号 @ {current_price:.2f} (ADD 设置为 True)",
file=sys.stderr, flush=True)
else:
print(f"[StaticGrid] {pair} 无操作建议", file=sys.stderr, flush=True)
# 打印最终的 enter_long 状态
enter_long_value = dataframe.loc[dataframe.index[-1], 'enter_long']
enter_tag_value = dataframe.loc[dataframe.index[-1], 'enter_tag']
print(f"[StaticGrid] {pair} 最终状态 - enter_long: {enter_long_value}, enter_tag: {enter_tag_value}",
file=sys.stderr, flush=True)
return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
网格出场逻辑
"""
dataframe['exit_long'] = False
if len(dataframe) == 0:
print(f"[StaticGrid] {metadata['pair']} dataframe 为空,跳过出场逻辑",
file=sys.stderr, flush=True)
return dataframe
pair = metadata['pair']
if pair not in self.grid_managers:
print(f"[StaticGrid] {pair} GridManager 不存在,跳过出场逻辑",
file=sys.stderr, flush=True)
return dataframe
grid_manager = self.grid_managers[pair]
print(f"[StaticGrid] {pair} populate_exit_trend 执行",
file=sys.stderr, flush=True)
# 询问网格管理器是否应该平仓
adjustment = grid_manager.decide_adjustment()
print(f"[StaticGrid] {pair} exit 决策结果: {adjustment}",
file=sys.stderr, flush=True)
if adjustment and adjustment.type == AdjustmentType.EXIT:
# 平仓
dataframe.loc[dataframe.index[-1], 'exit_long'] = True
print(f"[StaticGrid] {pair} 平仓信号 @ {grid_manager.current_price:.2f} (EXIT 设置为 True)",
file=sys.stderr, flush=True)
# 打印最终的 exit_long 状态
exit_long_value = dataframe.loc[dataframe.index[-1], 'exit_long']
print(f"[StaticGrid] {pair} 最终 exit_long: {exit_long_value}",
file=sys.stderr, flush=True)
return dataframe
def adjust_trade_position(self, trade, current_rate: float,
current_profit: float, min_stake: float,
max_stake: float, **kwargs) -> Optional[float]:
"""
加仓逻辑:根据 GridManager 的决策进行加仓
"""
pair = trade.pair
print(f"[StaticGrid] {pair} adjust_trade_position 执行 - 当前价: {current_rate:.2f}, 利润: {current_profit:.4f}",
file=sys.stderr, flush=True)
print(f"[StaticGrid] {pair} Trade 信息 - 持仓: {trade.amount:.6f}, 平均价: {trade.open_rate:.2f}, stake: {trade.stake_amount:.2f}",
file=sys.stderr, flush=True)
if pair not in self.grid_managers:
print(f"[StaticGrid] {pair} GridManager 不存在,创建新的",
file=sys.stderr, flush=True)
self._init_grid_manager(pair)
grid_manager = self.grid_managers[pair]
# 同步 Trade 对象的信息到 GridManager用于加仓决策
# 使用 trade.open_date_utc 的时间戳作为 candle_index
try:
candle_index = int(trade.open_date_utc.timestamp() / 3600) if trade.open_date_utc else 0
except:
candle_index = 0
grid_manager.sync_from_trade_object(trade, candle_index)
# 询问网格管理器是否应该加仓
adjustment = grid_manager.decide_adjustment()
print(f"[StaticGrid] {pair} decide_adjustment 返回: {adjustment}",
file=sys.stderr, flush=True)
if adjustment and adjustment.type == AdjustmentType.ADD:
# 计算加仓金额
stake_amount = adjustment.quantity * self.STAKE
print(f"[StaticGrid] {pair} 加仓决策 - 数量: {adjustment.quantity}, 金额: {stake_amount} USDT",
file=sys.stderr, flush=True)
print(f"[StaticGrid] {pair} 可用金额范围 - min: {min_stake:.2f}, max: {max_stake:.2f}",
file=sys.stderr, flush=True)
# 检查是否超过最大可用金额
if stake_amount <= max_stake:
print(f"[StaticGrid] {pair} ✅ 加仓批准 - 返回金额: {stake_amount:.2f}",
file=sys.stderr, flush=True)
return stake_amount
else:
print(f"[StaticGrid] {pair} ❌ 加仓金额超过最大值 ({stake_amount:.2f} > {max_stake:.2f}),跳过",
file=sys.stderr, flush=True)
return None
else:
print(f"[StaticGrid] {pair} 无加仓建议,返回 None",
file=sys.stderr, flush=True)
return None