zhangkun9038@dingtalk.com 5aab051fbf remove some log
2025-11-20 05:23:21 +08:00

346 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
from freqtrade.strategy import IStrategy, IntParameter, DecimalParameter
from pandas import DataFrame
import pandas_ta as ta
from freqtrade.persistence import Trade
import numpy as np
import datetime
import pandas as pd
import math
# 设置pandas选项以解决FutureWarning警告
pd.set_option('future.no_silent_downcasting', True)
logger = logging.getLogger(__name__)
class MartinGale(IStrategy):
# 策略参数
loglevel = "warning"
minimal_roi = {}
# 启用自定义ROI回调函数
use_custom_roi = True
# 固定止损设置为与OKX App一致
stoploss = -0.12 # 对应OKX App中的12.00%止损
trailing_stop = True
trailing_stop_positive_offset = 0.005 # 跟踪止损偏移量 0.5%
# 用于跟踪市场状态的数据框缓存
_dataframe_cache = None
def __init__(self, config=None):
"""初始化策略参数"""
super().__init__(config)
self._trailing_stop_positive_default = 0.004 # 降低默认值以更容易触发跟踪止盈
@property
def protections(self):
"""
保护机制配置
基于最新Freqtrade规范保护机制应定义在策略文件中而非配置文件
"""
return [
{
"method": "StoplossGuard",
"lookback_period_candles": 60, # 3小时回看期60根3分钟K线
"trade_limit": 2, # 最多2笔止损交易
"stop_duration_candles": 60, # 暂停180分钟60根3分钟K线
"only_per_pair": False # 仅针对单个币对
},
{
"method": "CooldownPeriod",
"stop_duration_candles": 2 # 6分钟冷却期2根3分钟K线
},
{
"method": "MaxDrawdown",
"lookback_period_candles": 48, # 2.4小时回看期
"trade_limit": 4, # 4笔交易限制
"stop_duration_candles": 24, # 72分钟暂停24根3分钟K线
"max_allowed_drawdown": 0.20 # 20%最大回撤容忍度
}
]
@property
def trailing_stop_positive(self):
"""根据市场状态动态调整跟踪止盈参数"""
# 获取当前市场状态
if self._dataframe_cache is not None and len(self._dataframe_cache) > 0:
current_state = self._dataframe_cache['market_state'].iloc[-1]
if current_state == 'strong_bull':
return 0.007 # 强劲牛市中降低跟踪止盈,让利润奔跑
elif current_state == 'weak_bull':
return 0.005 # 弱势牛市中保持较低的跟踪止盈
return self._trailing_stop_positive_default # 返回默认值
@trailing_stop_positive.setter
def trailing_stop_positive(self, value):
"""设置trailing_stop_positive的默认值"""
self._trailing_stop_positive_default = value
# 时间框架设置 - 主时间框架为3分钟与OKX App一致
timeframe = "3m" # 主时间框架为 3 分钟
can_short = False # 禁用做空
# 自定义指标参数 - 设置为可通过Hyperopt优化
# 从OKX App中获取的参数
rsi_length = IntParameter(7, 21, default=14, optimize=True, load=True, space='buy') # RSI周期为14可优化范围7-21
rsi_oversold = IntParameter(20, 40, default=30, optimize=True, load=True, space='buy') # RSI触发阈值为30可优化范围20-40
# 马丁格尔策略参数 - 基于OKX App界面设置
max_entry_adjustments = IntParameter(5, 15, default=10, optimize=True, load=True, space='buy') # 最大加仓次数10次可优化范围5-15
add_position_callback = DecimalParameter(0.005, 0.015, decimals=3, default=0.0066, optimize=True, load=True, space='buy') # 跌幅加仓阈值0.66%可优化范围0.5%-1.5%
# 马丁格尔加仓比例参数
step_coefficient = DecimalParameter(1.0, 1.1, decimals=2, default=1.05, optimize=True, load=True, space='buy') # 加仓比例1.05倍可优化范围1.0-1.1
stake_divisor = DecimalParameter(0.9, 1.1, decimals=2, default=1.0, optimize=True, load=True, space='buy') # 初始比例可优化范围0.9-1.1
# 止盈目标参数 - 对应OKX App中的单周期止盈目标2.50%
take_profit_target = DecimalParameter(0.02, 0.03, decimals=3, default=0.025, optimize=True, load=True, space='sell') # 止盈目标2.5%可优化范围2.0%-3.0%
def informative_pairs(self):
"""定义辅助时间框架"""
pairs = self.dp.current_whitelist()
# 添加15m和1h作为辅助时间框架
return [(pair, '15m') for pair in pairs] + [(pair, '1h') for pair in pairs]
def custom_stake_amount(self, pair: str, current_time: pd.Timestamp,
current_rate: float,
proposed_stake: float,
min_stake: float,
max_stake: float,
**kwargs) -> float:
"""自定义下单金额"""
# 初始资金设置
initial_balance = self.config.get('dry_run_wallet', 10000)
# 固定使用初始资金的5%作为初次下单金额在OKX App范围内
desired_stake = initial_balance * 0.05
desired_stake = math.floor(desired_stake) # 取整
# 确保在OKX App设定的范围内2-50,000 USDT
return max(min(desired_stake, 50000, max_stake), 2, min_stake)
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""计算各种技术指标"""
# 计算3m周期的指标
rsi_length_value = self.rsi_length.value
# 计算3m RSI指标
dataframe['rsi_3m'] = ta.rsi(dataframe['close'], length=rsi_length_value)
# 计算3m时间框架的EMA50和EMA200
dataframe['ema_50_3m'] = ta.ema(dataframe['close'], length=50)
dataframe['ema_200_3m'] = ta.ema(dataframe['close'], length=200)
# 成交量过滤
dataframe['volume_ma'] = dataframe['volume'].rolling(20).mean()
# 计算ATR用于动态止损
dataframe['atr'] = ta.atr(dataframe['high'], dataframe['low'], dataframe['close'], length=14)
# 获取15m数据
df_15m = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='15m')
df_15m['rsi_15m'] = ta.rsi(df_15m['close'], length=rsi_length_value)
# 计算15m时间框架的EMA50和EMA200
df_15m['ema_50_15m'] = ta.ema(df_15m['close'], length=50)
df_15m['ema_200_15m'] = ta.ema(df_15m['close'], length=200)
# 将15m数据重新索引到主时间框架(3m)
df_15m = df_15m.set_index('date').reindex(dataframe['date']).reset_index()
df_15m = df_15m.rename(columns={'index': 'date'})
df_15m = df_15m[['date', 'rsi_15m', 'ema_50_15m', 'ema_200_15m']].ffill()
# 合并15m数据
dataframe = dataframe.merge(df_15m, how='left', on='date')
# 获取1h数据
df_1h = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1h')
# 计算1h RSI和EMA
df_1h['rsi_1h'] = ta.rsi(df_1h['close'], length=rsi_length_value)
df_1h['ema_50_1h'] = ta.ema(df_1h['close'], length=50) # 1h 50周期EMA
df_1h['ema_200_1h'] = ta.ema(df_1h['close'], length=200) # 1h 200周期EMA
# 将1h数据重新索引到主时间框架(3m),并填充缺失值
df_1h = df_1h.set_index('date').reindex(dataframe['date']).ffill().bfill().reset_index()
# 计算1h上涨趋势确保不包含None值
df_1h['trend_1h'] = df_1h['close'] > df_1h['ema_50_1h'] # 1h上涨趋势
df_1h = df_1h.rename(columns={'index': 'date'})
df_1h = df_1h[['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h']].ffill()
# 合并1h数据
dataframe = dataframe.merge(df_1h, how='left', on='date').ffill()
# 计算各时间框架的趋势状态(牛/熊)
# 3m时间框架ema50下穿ema200为熊上穿为牛
dataframe['trend_3m'] = np.where(dataframe['ema_50_3m'] > dataframe['ema_200_3m'], 1, 0)
# 15m时间框架ema50下穿ema200为熊上穿为牛
dataframe['trend_15m'] = np.where(dataframe['ema_50_15m'] > dataframe['ema_200_15m'], 1, 0)
# 1h时间框架ema50下穿ema200为熊上穿为牛
dataframe['trend_1h_ema'] = np.where(dataframe['ema_50_1h'] > dataframe['ema_200_1h'], 1, 0)
# 计算熊牛得分0-100
# 权重3m熊牛权重1015m熊牛权重351h熊牛权重65
dataframe['market_score'] = (
dataframe['trend_3m'] * 10 +
dataframe['trend_15m'] * 35 +
dataframe['trend_1h_ema'] * 65
)
# 确保得分在0-100范围内
dataframe['market_score'] = dataframe['market_score'].clip(lower=0, upper=100)
# 根据得分分类市场状态
dataframe['market_state'] = 'neutral'
dataframe.loc[dataframe['market_score'] > 70, 'market_state'] = 'strong_bull'
dataframe.loc[(dataframe['market_score'] > 50) & (dataframe['market_score'] <= 70), 'market_state'] = 'weak_bull'
dataframe.loc[(dataframe['market_score'] >= 30) & (dataframe['market_score'] <= 50), 'market_state'] = 'neutral'
dataframe.loc[(dataframe['market_score'] > 10) & (dataframe['market_score'] < 30), 'market_state'] = 'weak_bear'
dataframe.loc[dataframe['market_score'] <= 10, 'market_state'] = 'strong_bear'
# 创建一个使用前一行市场状态的列
dataframe['prev_market_state'] = dataframe['market_state'].shift(1)
# 为第一行设置默认值
dataframe['prev_market_state'] = dataframe['prev_market_state'].fillna('neutral')
# 保存数据框缓存用于trailing_stop_positive计算
self._dataframe_cache = dataframe
return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""设置出场条件"""
# 基于固定止盈目标2.50%
# 注意实际止盈将在custom_exit中实现
# 当RSI进入超买区域时考虑出场
rsi_overbought = dataframe['rsi_1h'] > 70
# 设置出场信号
dataframe.loc[rsi_overbought, 'exit_long'] = 1
return dataframe
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""设置入场条件"""
# 确保prev_market_state列存在
if 'prev_market_state' not in dataframe.columns:
dataframe['prev_market_state'] = 'neutral'
# 基于OKX App的RSI条件RSI-14向下穿过30阈值
# 条件1: RSI处于超卖区域根据市场状态动态调整
rsi_condition = dataframe.apply(lambda row:
row['rsi_3m'] < self.rsi_oversold.value if row['prev_market_state'] in ['strong_bull', 'weak_bull'] else row['rsi_3m'] < self.rsi_oversold.value,
axis=1)
# 条件2: RSI向下穿过阈值交叉检测
rsi_cross_down = (dataframe['rsi_3m'] < self.rsi_oversold.value) & (dataframe['rsi_3m'].shift(1) >= self.rsi_oversold.value)
# 条件3: 成交量放大确认信号
volume_spike = dataframe['volume'] > dataframe['volume_ma'] * 1.2
# 条件4: 至少有一个时间框架的趋势确认
trend_confirmation = (dataframe['trend_3m'] == 1) | (dataframe['trend_15m'] == 1) | (dataframe['trend_1h_ema'] == 1)
# 合并所有条件
final_condition = rsi_cross_down & volume_spike & trend_confirmation
# 设置入场信号
dataframe.loc[final_condition, 'enter_long'] = 1
# 日志记录
# if dataframe['enter_long'].sum() > 0:
# logger.info(f"[{metadata['pair']}] 发现入场信号数量: {dataframe['enter_long'].sum()}")
return dataframe
def custom_stoploss(self, pair: str, trade: 'Trade', current_time, current_rate: float,
current_profit: float, **kwargs) -> float:
"""自定义动态止损"""
# 动态止损基于ATR
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1]
atr = last_candle['atr']
# 获取当前市场状态
current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else 'unknown'
# 更激进的渐进式止损策略
if current_profit > 0.05: # 利润超过5%时
return -3.0 * atr / current_rate # 更大幅扩大止损范围,让利润奔跑
elif current_profit > 0.03: # 利润超过3%时
return -2.5 * atr / current_rate # 更中等扩大止损范围
elif current_profit > 0.01: # 利润超过1%时
return -2.0 * atr / current_rate # 更轻微扩大止损范围
# 在强劲牛市中,即使小亏损也可以容忍更大回调
if current_state == 'strong_bull' and current_profit > -0.01:
return -1.8 * atr / current_rate
if atr > 0:
return -1.2 * atr / current_rate # 基础1.2倍ATR止损
return self.stoploss
def custom_exit(self, pair: str, trade: Trade, current_time: datetime, current_rate: float,
current_profit: float, **kwargs) -> float:
"""自定义出场逻辑,实现固定止盈目标"""
if trade.is_short:
return 0.0
# 基于OKX App的单周期止盈目标2.50%
if current_profit >= self.take_profit_target.value:
return 1.0 # 全额出场
# 未达到止盈目标,不出场
return 0.0
def adjust_trade_position(self, trade: 'Trade', current_time, current_rate: float,
current_profit: float, min_stake: float, max_stake: float, **kwargs) -> float:
"""
实现马丁格尔加仓逻辑
- 基于OKX App的参数跌幅0.66%加仓最大10次加仓1.05倍加仓比例
"""
# 获取当前交易对
pair = trade.pair
# 获取当前交易的加仓次数
entry_count = len(trade.orders) # 获取所有入场订单数量
# 如果已经达到最大加仓次数,则不再加仓
if entry_count - 1 >= self.max_entry_adjustments.value:
logger.info(f"[{pair}] 已达到最大加仓次数 {self.max_entry_adjustments.value},停止加仓")
return 0.0
# 获取初始入场价格和当前价格的差值百分比
initial_price = trade.open_rate
if initial_price == 0:
return 0.0
price_diff_pct = (current_rate - initial_price) / initial_price
# 检查价格回调是否达到加仓间隔OKX App中的0.66%跌幅)
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else 'neutral'
if price_diff_pct <= -self.add_position_callback.value and current_state not in ['bear', 'weak_bear']:
# 计算初始入场金额
initial_stake = trade.orders[0].cost # 第一笔订单的成本
# 计算加仓次数从1开始计数
adjustment_count = entry_count - 1 # 已加仓次数
# 计算加仓金额: (initial_stake * step_coefficient) ^ (adjustment_count + 1) / stake_divisor
# 实现OKX App中的1.05倍加仓比例
additional_stake = (self.step_coefficient.value * initial_stake / self.stake_divisor.value) ** (adjustment_count + 1)
# 确保加仓金额在允许的范围内≥2 USDT与OKX App一致
additional_stake = max(min_stake, min(additional_stake, max_stake - trade.stake_amount))
logger.info(f"[{pair}] 触发加仓: 第{adjustment_count + 1}次加仓, 初始金额{initial_stake:.2f}, \
加仓金额{additional_stake:.2f}, 价格差{price_diff_pct:.2%}, 当前利润{current_profit:.2%}")
return additional_stake
# 不符合加仓条件返回0
return 0.0