myTestFreqAI/freqtrade/templates/freqaiprimer.py
2026-02-04 22:03:24 +08:00

1962 lines
106 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import warnings
warnings.filterwarnings("ignore", category=UserWarning, module="pandas_ta")
import logging
from freqtrade.strategy import IStrategy, IntParameter, DecimalParameter
from pandas import DataFrame
import pandas as pd
import pandas_ta as ta
from freqtrade.persistence import Trade
import numpy as np
import math
from typing import Tuple, Dict
from datetime import datetime
logger = logging.getLogger(__name__)
class FreqaiPrimer(IStrategy):
# 策略参数 - 使用custom_roi替代minimal_roi字典
loglevel = "warning"
minimal_roi = {}
# 启用自定义ROI回调函数
use_custom_roi = True
# FreqAI 要求
process_only_new_candles = True
stoploss = -0.15 # 固定止损 -15% (大幅放宽止损以承受更大波动)
trailing_stop = True
trailing_stop_positive_offset = 0.005 # 追踪止损偏移量 0.5% (更容易触发跟踪止盈)
# 用于跟踪市场状态的数据框缓存
_dataframe_cache = None
def __init__(self, config=None):
"""初始化策略参数调用父类初始化方法并接受config参数"""
super().__init__(config) # 调用父类的初始化方法并传递config
# 存储从配置文件加载的默认值
self._trailing_stop_positive_default = 0.004 # 降低默认值以更容易触发跟踪止盈
# 波动系数缓存(简化版:直接计算,无需历史序列)
self._volatility_timestamp = {} # {pair: timestamp}
self._volatility_cache = {} # {pair: volatility_coef}
self._volatility_update_interval = 180 # 波动系数更新间隔3分钟
# 入场间隔控制:记录每个交易对最近一次入场的时间
# 格式: {pair: datetime}
self._last_entry_time = {}
def strategy_log(self, message: str, level: str = "info") -> None:
"""根据 config 的 enable_strategy_log 决定是否输出日志"""
enable_log = self.config.get('enable_strategy_log', False)
if not enable_log:
return
if level.lower() == "debug":
logger.debug(message)
elif level.lower() == "warning":
logger.warning(message)
elif level.lower() == "error":
logger.error(message)
else:
logger.info(message)
def get_advanced_market_state(self, dataframe: DataFrame, metadata: dict) -> dict:
"""
获取高级市场状态信息,包括趋势强度、方向和可信度
返回: {
'state': 'strong_bull/weak_bull/neutral/weak_bear/strong_bear',
'strength': float, # 趋势强度 (-1.0 to 1.0)
'confidence': float, # 趋势可信度 (0.0 to 1.0)
'momentum': float, # 价格动量
'consistency': float # 趋势一致性
}
"""
if len(dataframe) < 10:
return {
'state': 'neutral',
'strength': 0.0,
'confidence': 0.5,
'momentum': 0.0,
'consistency': 0.5
}
# 获取当前数据
current_data = dataframe.iloc[-1]
# 基础市场状态
basic_state = current_data.get('market_state', 'neutral')
# 计算趋势强度(综合多个指标)
trend_strength = 0.0
weight_sum = 0
# 1. EMA趋势强度
if 'ema_50_1h' in dataframe.columns and 'ema_200_1h' in dataframe.columns:
ema_ratio = current_data['ema_50_1h'] / current_data['ema_200_1h']
ema_strength = (ema_ratio - 1.0) * 10 # 放大差异
trend_strength += ema_strength * 0.3
weight_sum += 0.3
# 2. MACD趋势强度
if 'macd_trend_strength' in dataframe.columns:
macd_strength = current_data['macd_trend_strength']
trend_strength += macd_strength * 0.25
weight_sum += 0.25
# 3. RSI趋势强度
if 'rsi_trend_strength' in dataframe.columns:
rsi_strength = current_data['rsi_trend_strength']
trend_strength += rsi_strength * 0.25
weight_sum += 0.25
# 4. 价格动量
if 'close' in dataframe.columns and len(dataframe) >= 10:
recent_prices = dataframe['close'].tail(10)
price_momentum = (recent_prices.iloc[-1] - recent_prices.iloc[0]) / recent_prices.iloc[0]
trend_strength += price_momentum * 0.2
weight_sum += 0.2
# 标准化趋势强度
if weight_sum > 0:
trend_strength = max(-1.0, min(1.0, trend_strength / weight_sum))
else:
trend_strength = 0.0
# 计算趋势一致性(短期趋势与长期趋势的一致性)
consistency = 0.5
if ('trend_3m' in dataframe.columns and
'trend_15m' in dataframe.columns and
'trend_1h_ema' in dataframe.columns):
trends = [
current_data['trend_3m'],
current_data['trend_15m'],
current_data['trend_1h_ema']
]
# 计算趋势一致性(同向趋势的比例)
bull_count = sum(1 for t in trends if t == 1)
bear_count = len(trends) - bull_count
consistency = max(bull_count, bear_count) / len(trends)
# 计算可信度(基于趋势强度和一致性)
confidence = (abs(trend_strength) + consistency) / 2
return {
'state': basic_state,
'strength': trend_strength,
'confidence': confidence,
'momentum': price_momentum if 'price_momentum' in locals() else 0.0,
'consistency': consistency
}
def identify_entry_type(self, dataframe: DataFrame, idx: int) -> dict:
"""识别入场类型,返回包含类型、建议持仓时间、风险等级等信息的字典"""
row = dataframe.iloc[idx]
# 计算各种特征
bb_width = (row['bb_upper_3m'] - row['bb_lower_3m']) / row['close'] if 'bb_upper_3m' in dataframe.columns and 'bb_lower_3m' in dataframe.columns and row['close'] != 0 else 0.02
atr_ratio = row['atr'] / row['close'] if 'atr' in dataframe.columns and 'close' in dataframe.columns and row['close'] != 0 else 0.01
rsi = row['rsi_3m'] if 'rsi_3m' in dataframe.columns else 50
volume_ratio = row['volume'] / row['volume_ma'] if 'volume' in dataframe.columns and 'volume_ma' in dataframe.columns and row['volume_ma'] != 0 else 1.0
price_change = (row['close'] - row['close'].shift(1)) / row['close'].shift(1) if len(dataframe) > 1 and row['close'].shift(1) != 0 else 0
ema_trend_strength = abs(row['ema_50_3m'] / row['ema_200_3m'] - 1) if 'ema_50_3m' in dataframe.columns and 'ema_200_3m' in dataframe.columns and row['ema_200_3m'] != 0 else 0.01
# 动量背离检测
bullish_divergence = False
bearish_divergence = False
if (len(dataframe) >= 20 and
'close' in dataframe.columns and
'rsi_3m' in dataframe.columns):
recent_close = dataframe['close'].tail(20)
recent_rsi = dataframe['rsi_3m'].tail(20)
price_new_high = (recent_close.iloc[-1] == recent_close.max())
rsi_new_high = (recent_rsi.iloc[-1] == recent_rsi.max())
price_new_low = (recent_close.iloc[-1] == recent_close.min())
rsi_new_low = (recent_rsi.iloc[-1] == recent_rsi.min())
bearish_divergence = price_new_high and not rsi_new_high
bullish_divergence = price_new_low and not rsi_new_low
# 均线排列
ema_bullish = row['ema_50_3m'] > row['ema_200_3m'] if 'ema_50_3m' in dataframe.columns and 'ema_200_3m' in dataframe.columns else False
ema_bearish = row['ema_50_3m'] < row['ema_200_3m'] if 'ema_50_3m' in dataframe.columns and 'ema_200_3m' in dataframe.columns else False
# 价格在布林带中的位置
bb_position = 0.5
if ('bb_upper_3m' in dataframe.columns and
'bb_lower_3m' in dataframe.columns and
(row['bb_upper_3m'] - row['bb_lower_3m']) != 0):
bb_position = (row['close'] - row['bb_lower_3m']) / (row['bb_upper_3m'] - row['bb_lower_3m'])
# 特征组合判断
features = {
'high_volatility': bb_width > 0.05 and atr_ratio > 0.015,
'extreme_rsi': rsi > 80 or rsi < 20,
'momentum_divergence': bullish_divergence or bearish_divergence,
'volume_spike': volume_ratio > 2.0,
'strong_trend': ema_trend_strength > 0.02,
'low_volatility': bb_width < 0.02 and atr_ratio < 0.01,
'breakout': bb_position > 0.95 or bb_position < 0.05,
'mean_reversion_zone': bb_position > 0.9 or bb_position < 0.1,
'momentum_strong': abs(price_change) > 0.02,
'trend_bullish': ema_bullish,
'trend_bearish': ema_bearish,
'range_bound': 0.3 <= bb_position <= 0.7 and bb_width < 0.01
}
# 类型判断逻辑
if features['high_volatility'] and features['extreme_rsi'] and features['momentum_divergence']:
return {'type': 0, 'duration': 45, 'risk': 2, 'confidence': 0.7, 'name': '快进快出'}
elif features['volume_spike'] and features['momentum_strong']:
return {'type': 5, 'duration': 360, 'risk': 1, 'confidence': 0.6, 'name': '突破追涨'}
elif features['strong_trend'] and features['low_volatility'] and features['trend_bullish']:
return {'type': 3, 'duration': 1440, 'risk': 0, 'confidence': 0.8, 'name': '长期持有'}
elif features['mean_reversion_zone'] and features['low_volatility']:
return {'type': 4, 'duration': 240, 'risk': 1, 'confidence': 0.6, 'name': '套利机会'}
elif features['range_bound']:
return {'type': 6, 'duration': 360, 'risk': 1, 'confidence': 0.5, 'name': '震荡套利'}
elif 0.02 <= bb_width <= 0.05 and 40 <= rsi <= 60:
return {'type': 2, 'duration': 720, 'risk': 1, 'confidence': 0.7, 'name': '中期趋势'}
elif 0.02 <= bb_width <= 0.05 and (30 <= rsi <= 40 or 60 <= rsi <= 70):
return {'type': 1, 'duration': 240, 'risk': 1, 'confidence': 0.6, 'name': '短期波段'}
else:
# 默认为中期趋势
return {'type': 2, 'duration': 360, 'risk': 1, 'confidence': 0.5, 'name': '默认中期'}
def get_comprehensive_market_context(self, dataframe: DataFrame, pair: str) -> dict:
"""
获取全面的市场环境信息,整合市场形态标签和市场趋势
"""
# 获取市场状态
current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else 'neutral'
# 获取高级市场状态信息
advanced_market_info = self.get_advanced_market_state(dataframe, {'pair': pair})
# 获取当前行数据
current_row = dataframe.iloc[-1]
# 整合所有市场信息
market_context = {
'market_state': current_state,
'advanced_market_info': advanced_market_info,
'trend_strength': advanced_market_info['strength'],
'trend_confidence': advanced_market_info['confidence'],
'volatility': current_row.get('bb_width', 0.02), # 假设存在bb_width
'momentum': current_row.get('rsi_1h', 50),
'volume_activity': current_row.get('volume_ratio', 1.0),
'price_position': current_row.get('bb_position', 0.5),
'atr_ratio': current_row.get('atr', 0.01) / current_row.get('close', 1.0) if current_row.get('close', 1.0) != 0 else 0.01
}
return market_context
# 只用于adjust_trade_position方法的波动系数获取
def get_volatility_coefficient(self, pair: str) -> float:
"""
获取币对的波动系数(简化版:直接计算,无需历史序列)
- USDT/USDT 波动系数设置为0
- BTC/USDT 波动系数设置为1
- 其他币对:
计算当前波动系数 = 该币对波动率 / BTC/USDT波动率
基于最近200根1h K线足够稳定无需额外平滑
波动系数表示某币对与BTC/USDT相比的波动幅度倍数
- 山寨币的波动系数可能大于3
- 稳定性较高的币对如DOT/USDT波动系数可能小于1
添加了缓存机制每3分钟更新一次避免频繁计算
"""
# 检查特殊币对
if pair == 'USDT/USDT':
return 0.0
elif pair == 'BTC/USDT':
return 1.0
try:
# 获取当前时间戳
current_time = datetime.now().timestamp()
# 检查缓存:如果距离上次计算时间小于更新间隔,则直接返回缓存值
if (pair in self._volatility_cache and
pair in self._volatility_timestamp and
current_time - self._volatility_timestamp[pair] < self._volatility_update_interval):
return self._volatility_cache[pair]
# 直接计算当前波动系数基于最近200根1h K线
current_volatility_coef = self._calculate_current_volatility_coef(pair)
# 更新缓存和时间戳
self._volatility_cache[pair] = current_volatility_coef
self._volatility_timestamp[pair] = current_time
self.strategy_log(f"波动系数计算完成 {pair}: 系数={current_volatility_coef:.4f} (基于最近200根1h K线)")
return current_volatility_coef
except Exception as e:
logger.warning(f"计算波动系数时出错 {pair}: {str(e)}")
# 如果出错尝试返回缓存值否则返回默认值1.0
return self._volatility_cache.get(pair, 1.0)
def _calculate_current_volatility_coef(self, pair: str) -> float:
"""
计算当前的波动系数(该币对波动率 / BTC/USDT波动率
"""
try:
# 获取当前币对的1小时k线数据
current_pair_df, _ = self.dp.get_analyzed_dataframe(pair, '1h')
# 获取BTC/USDT的1小时k线数据
btc_df, _ = self.dp.get_analyzed_dataframe('BTC/USDT', '1h')
# 确保有足够的数据点
if len(current_pair_df) < 2 or len(btc_df) < 2:
return 1.0 # 如果没有足够数据返回默认值1.0
# 对于数据点少于200个的情况使用所有可用数据
# 对于数据点多于200个的情况使用最近200个数据点
current_data = current_pair_df.iloc[-min(200, len(current_pair_df)):]
btc_data = btc_df.iloc[-min(200, len(btc_df)):]
# 计算当前币对的对数收益率和波动率
current_data['returns'] = current_data['close'].pct_change()
current_volatility = current_data['returns'].std() * 100 # 转换为百分比
# 计算BTC/USDT的对数收益率和波动率
btc_data['returns'] = btc_data['close'].pct_change()
btc_volatility = btc_data['returns'].std() * 100 # 转换为百分比
# 避免除以零的情况
if btc_volatility == 0:
return 1.0
# 计算波动系数:当前币对波动率 / BTC/USDT波动率
volatility_coef = current_volatility / btc_volatility
# 设置合理的上下限,避免极端值影响策略
# 上限设置为5.0(非常高波动的币对)
# 下限设置为0.1(非常稳定的币对)
return max(0.1, min(5.0, volatility_coef))
except Exception as e:
logger.warning(f"计算当前波动系数时出错 {pair}: {str(e)}")
return 1.0 # 出错时返回默认值1.0
# 其他辅助方法可以在这里添加
@property
def protections(self):
"""
保护机制配置
基于最新Freqtrade规范保护机制应定义在策略文件中而非配置文件
"""
return [
{
"method": "StoplossGuard",
"lookback_period_candles": 60, # 3小时回看期60根3分钟K线
"trade_limit": 2, # 最多2笔止损交易
"stop_duration_candles": 60, # 暂停180分钟60根3分钟K线
"only_per_pair": False # 仅针对单个币对
},
{
"method": "CooldownPeriod",
"stop_duration_candles": 2 # 6分钟冷却期2根3分钟K线
},
{
"method": "MaxDrawdown",
"lookback_period_candles": 48, # 2.4小时回看期
"trade_limit": 4, # 4笔交易限制
"stop_duration_candles": 24, # 72分钟暂停24根3分钟K线
"max_allowed_drawdown": 0.20 # 20%最大回撤容忍度
}
]
@property
def trailing_stop_positive(self):
"""根据市场状态动态调整跟踪止盈参数"""
# 获取当前市场状态
if self._dataframe_cache is not None and len(self._dataframe_cache) > 0:
current_state = self._dataframe_cache['market_state'].iloc[-1]
if current_state == 'strong_bull':
return 0.007 # 强劲牛市中降低跟踪止盈,让利润奔跑
elif current_state == 'weak_bull':
return 0.005 # 弱势牛市中保持较低的跟踪止盈
return self._trailing_stop_positive_default # 返回默认值
@trailing_stop_positive.setter
def trailing_stop_positive(self, value):
"""设置trailing_stop_positive的默认值"""
self._trailing_stop_positive_default = value
timeframe = "3m" # 主时间框架为 3 分钟
can_short = False # 禁用做空
# 自定义指标参数 - 使用Hyperopt可优化参数
bb_length = IntParameter(10, 30, default=20, optimize=True, load=True, space='buy')
bb_std = DecimalParameter(1.5, 3.0, decimals=1, default=2.0, optimize=True, load=True, space='buy')
rsi_length = IntParameter(7, 21, default=14, optimize=True, load=True, space='buy')
rsi_oversold = IntParameter(30, 50, default=42, optimize=True, load=True, space='buy')
# 入场条件阈值参数
bb_lower_deviation = DecimalParameter(1.01, 1.05, decimals=2, default=1.03, optimize=True, load=True, space='buy')
rsi_bull_threshold = IntParameter(45, 55, default=50, optimize=True, load=True, space='buy')
stochrsi_bull_threshold = IntParameter(30, 40, default=35, optimize=True, load=True, space='buy')
stochrsi_neutral_threshold = IntParameter(20, 30, default=25, optimize=True, load=True, space='buy')
volume_multiplier = DecimalParameter(1.2, 2.0, decimals=1, default=1.5, optimize=True, load=True, space='buy')
bb_width_threshold = DecimalParameter(0.01, 0.03, decimals=3, default=0.02, optimize=True, load=True, space='buy')
min_condition_count = IntParameter(2, 4, default=3, optimize=True, load=True, space='buy')
# 剧烈拉升检测参数 - 使用Hyperopt可优化参数
h1_max_candles = IntParameter(100, 300, default=200, optimize=True, load=True, space='buy')
h1_rapid_rise_threshold = DecimalParameter(0.05, 0.15, decimals=3, default=0.11, optimize=True, load=True, space='buy')
h1_max_consecutive_candles = IntParameter(1, 4, default=2, optimize=True, load=True, space='buy')
# 入场间隔控制参数(分钟)
entry_interval_minutes = IntParameter(20, 200, default=42, optimize=True, load=True, space='buy')
# ML 审核官entry_signal 拒绝入场的阈值(越高越宽松,越低越严格)
ml_entry_signal_threshold = DecimalParameter(0.05, 0.85, decimals=2, default=0.37, optimize=True, load=True, space='buy')
# ML 审核官exit_signal 拒绝出场的阈值(越高越宽松,越低越严格)
ml_exit_signal_threshold = DecimalParameter(0.05, 0.85, decimals=2, default=0.68, optimize=True, load=True, space='buy')
# FreqAI 标签定义entry_signal 的洛底上涨幅度(%
freqai_entry_up_percent = DecimalParameter(0.3, 2.0, decimals=2, default=0.5, optimize=True, load=True, space='buy')
# FreqAI 标签定义exit_signal 的洛底下跌幅度(%
freqai_exit_down_percent = DecimalParameter(0.3, 2.0, decimals=2, default=0.5, optimize=True, load=True, space='buy')
# 定义可优化参数
# 初始入场金额: 75.00
# 加仓次数 相对降幅间隔 加仓金额
# ------- ------------ --------
# 0 N/A 75
# 1 0.045000 36.29
# 2 0.051750 163.31
# 3 0.059513 734.88
# 4 0.068439 3306.96
#
# 累计投入金额: 4316.43
max_entry_adjustments = IntParameter(2, 5, default=4, optimize=False, load=True, space='buy') # 最大加仓次数
add_position_callback = DecimalParameter(0.02, 0.06, decimals=3, default=0.047, optimize=False, load=True, space='buy') # 加仓回调百分比
add_position_growth = DecimalParameter(1.5, 5.0, decimals=2, default=4.5, optimize=False, load=True, space='buy') # 加仓金额增长因子保留2位小数用于hyperopt优化
add_position_multiplier = DecimalParameter(0.2, 2, decimals=2, default=1.35, optimize=False, load=True, space='buy') # 加仓间隔系数保留2位小数用于hyperopt优化
stake_divisor = DecimalParameter(2.0, 12.0, decimals=2, default=9.3, optimize=False, load=True, space='buy') # 加仓金额分母小数类型保留2位小数
# 线性ROI参数 - 用于线性函数: y = (a * (x + k)) + t
roi_param_a = DecimalParameter(-0.0002, -0.00005, decimals=5, default=-0.0001, optimize=True, load=True, space='sell') # 系数a
roi_param_k = IntParameter(20, 150, default=50, optimize=True, load=True, space='sell') # 偏移量k
roi_param_t = DecimalParameter(0.02, 0.18, decimals=3, default=0.06, optimize=True, load=True, space='sell') # 常数项t
# 出场条件阈值参数
exit_bb_upper_deviation = DecimalParameter(0.98, 1.02, decimals=2, default=1.0, optimize=True, load=True, space='sell')
exit_volume_multiplier = DecimalParameter(1.5, 3.0, decimals=1, default=2.0, optimize=True, load=True, space='sell')
rsi_overbought = IntParameter(50, 70, default=58, optimize=True, load=True, space='sell')
def informative_pairs(self):
pairs = self.dp.current_whitelist()
return [(pair, '15m') for pair in pairs] + [(pair, '1h') for pair in pairs]
def _validate_dataframe_columns(self, dataframe: DataFrame, required_columns: list, metadata: dict):
"""
验证数据框中是否包含所有需要的列。
如果缺少列,则记录警告日志。
"""
missing_columns = [col for col in required_columns if col not in dataframe.columns]
if missing_columns:
logger.warning(f"[{metadata['pair']}] 数据框中缺少以下列: {missing_columns}")
# ========================= FreqAI 特征与标签定义 =========================
def feature_engineering_expand_all(self, dataframe: DataFrame, period: int, metadata: dict, **kwargs) -> DataFrame:
"""FreqAI 全量特征:这里先用简单技术指标,后续可逐步扩展。"""
# 使用 rolling 计算 RSI减少看前偏差
delta = dataframe["close"].diff()
gain = delta.where(delta > 0, 0).rolling(window=period).mean()
loss = -delta.where(delta < 0, 0).rolling(window=period).mean()
rs = gain / loss
dataframe[f"%-rsi-{period}"] = 100 - (100 / (1 + rs))
dataframe[f"%-mfi-{period}"] = ta.mfi(dataframe["high"], dataframe["low"], dataframe["close"], dataframe["volume"], length=period)
adx_df = ta.adx(dataframe["high"], dataframe["low"], dataframe["close"], length=period)
adx_col = f"ADX_{period}"
if adx_col in adx_df.columns:
dataframe[f"%-adx-{period}"] = adx_df[adx_col]
return dataframe
def feature_engineering_expand_basic(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame:
"""FreqAI 基础特征。"""
dataframe["%-pct_change"] = dataframe["close"].pct_change().fillna(0)
dataframe["%-raw_volume"] = dataframe["volume"].fillna(0)
dataframe["%-raw_price"] = dataframe["close"].ffill() # 使用 ffill() 替代 fillna(method="ffill")
return dataframe
def feature_engineering_standard(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame:
"""FreqAI 标准时间类特征。"""
if "date" in dataframe.columns:
dataframe["%-day_of_week"] = dataframe["date"].dt.dayofweek
dataframe["%-hour_of_day"] = dataframe["date"].dt.hour
return dataframe
def set_freqai_targets(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame:
"""定义 FreqAI 训练标签:简单二分类版本 + 持仓时长预测。"""
# 从配置中读取预测窗口参数(禁止硬编码)
label_horizon = self.freqai_info.get('feature_parameters', {}).get('label_period_candles', 24)
# 动态计算上涨/下跌阈值
entry_up_percent = self.freqai_entry_up_percent.value / 100.0 # 转换为小数(如 0.01 表示 1%
exit_down_percent = self.freqai_exit_down_percent.value / 100.0
entry_up_threshold = 1.0 + entry_up_percent # 例如 1.01 表示 +1%
exit_down_threshold = 1.0 - exit_down_percent # 例如 0.99 表示 -1%
# 入场标签:未来窗口内的最高价是否超过 +1%
future_max = dataframe["close"].rolling(window=label_horizon, min_periods=1).max().shift(-label_horizon + 1)
dataframe["&s-entry_signal"] = np.where(
future_max > dataframe["close"] * entry_up_threshold,
1,
0,
)
# 出场标签:未来窗口内的最低价是否跌破 -1%
future_min = dataframe["close"].rolling(window=label_horizon, min_periods=1).min().shift(-label_horizon + 1)
dataframe["&s-exit_signal"] = np.where(
future_min < dataframe["close"] * exit_down_threshold,
1,
0,
)
# 新增:未来波动率预测标签(极端化方案)
# 计算当前波动率过10根K线的收盘价波动
current_volatility = dataframe["close"].pct_change().rolling(window=10, min_periods=5).std()
# 计算未来10根K线的波动率向未来移动
future_pct_change = dataframe["close"].pct_change().shift(-1) # 未来的收盘价变化
future_volatility = future_pct_change.rolling(window=10, min_periods=5).std().shift(-9) # 未来10根K线的波动率
# 标签:未来波动率 > 当前波动率 * 1.5 则标记为高波动(趋势启动)
volatility_ratio = future_volatility / (current_volatility + 1e-8) # 避免除以0
dataframe["&s-future_volatility"] = np.where(
volatility_ratio > 1.5,
1, # 未来高波动(趋势启动),继续持有
0 # 未来低波动(震荡市),快速止盈
)
return dataframe
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 计算 3m 周期的指标
bb_length_value = self.bb_length.value
bb_std_value = self.bb_std.value
rsi_length_value = self.rsi_length.value
# 使用 rolling 计算布林带(减少看前偏差)
bb_ma_3m = dataframe['close'].rolling(window=bb_length_value).mean()
bb_std_3m = dataframe['close'].rolling(window=bb_length_value).std()
dataframe['bb_lower_3m'] = bb_ma_3m - (bb_std_value * bb_std_3m)
dataframe['bb_upper_3m'] = bb_ma_3m + (bb_std_value * bb_std_3m)
# 使用 rolling 计算 RSI减少看前偏差
delta_3m = dataframe['close'].diff()
gain_3m = delta_3m.where(delta_3m > 0, 0).rolling(window=rsi_length_value).mean()
loss_3m = -delta_3m.where(delta_3m < 0, 0).rolling(window=rsi_length_value).mean()
rs_3m = gain_3m / loss_3m
dataframe['rsi_3m'] = 100 - (100 / (1 + rs_3m))
# 新增 StochRSI 指标
stochrsi_3m = ta.stochrsi(dataframe['close'], length=rsi_length_value, rsi_length=rsi_length_value)
dataframe['stochrsi_k_3m'] = stochrsi_3m[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3']
dataframe['stochrsi_d_3m'] = stochrsi_3m[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3']
# 新增 KDJ 指标3m
kdj_3m = ta.stoch(high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], k=9, d=3, smooth_k=3)
dataframe['kdj_k_3m'] = kdj_3m['STOCHk_9_3_3']
dataframe['kdj_d_3m'] = kdj_3m['STOCHd_9_3_3']
dataframe['kdj_j_3m'] = 3 * dataframe['kdj_k_3m'] - 2 * dataframe['kdj_d_3m']
# 新增 MACD 指标
macd_3m = ta.macd(dataframe['close'], fast=12, slow=26, signal=9)
dataframe['macd_3m'] = macd_3m['MACD_12_26_9']
dataframe['macd_signal_3m'] = macd_3m['MACDs_12_26_9']
dataframe['macd_hist_3m'] = macd_3m['MACDh_12_26_9']
# 使用 ewm 计算 EMA减少看前偏差adjust=False 确保实时计算)
dataframe['ema_50_3m'] = dataframe['close'].ewm(span=50, adjust=False).mean()
dataframe['ema_200_3m'] = dataframe['close'].ewm(span=200, adjust=False).mean()
# 成交量过滤
dataframe['volume_ma'] = dataframe['volume'].rolling(20).mean()
# 计算 ATR 用于动态止损和退出
dataframe['atr'] = ta.atr(dataframe['high'], dataframe['low'], dataframe['close'], length=14)
# 获取 15m 数据
df_15m = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='15m')
# 使用 rolling 计算 RSI减少看前偏差
delta_15m = df_15m['close'].diff()
gain_15m = delta_15m.where(delta_15m > 0, 0).rolling(window=rsi_length_value).mean()
loss_15m = -delta_15m.where(delta_15m < 0, 0).rolling(window=rsi_length_value).mean()
rs_15m = gain_15m / loss_15m
df_15m['rsi_15m'] = 100 - (100 / (1 + rs_15m))
# 使用 ewm 计算 EMA减少看前偏差
df_15m['ema_50_15m'] = df_15m['close'].ewm(span=50, adjust=False).mean()
df_15m['ema_200_15m'] = df_15m['close'].ewm(span=200, adjust=False).mean()
# 新增 StochRSI 指标
stochrsi_15m = ta.stochrsi(df_15m['close'], length=rsi_length_value, rsi_length=rsi_length_value)
df_15m['stochrsi_k_15m'] = stochrsi_15m[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3']
df_15m['stochrsi_d_15m'] = stochrsi_15m[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3']
# 新增 MACD 指标
macd_15m = ta.macd(df_15m['close'], fast=12, slow=26, signal=9)
df_15m['macd_15m'] = macd_15m['MACD_12_26_9']
df_15m['macd_signal_15m'] = macd_15m['MACDs_12_26_9']
df_15m['macd_hist_15m'] = macd_15m['MACDh_12_26_9']
# 将 15m 数据重新索引到主时间框架 (3m)
df_15m = df_15m.set_index('date').reindex(dataframe['date']).reset_index()
df_15m = df_15m.rename(columns={'index': 'date'})
df_15m = df_15m[['date', 'rsi_15m', 'ema_50_15m', 'ema_200_15m']].ffill()
# 合并 15m 数据
dataframe = dataframe.merge(df_15m, how='left', on='date')
# 获取 1h 数据
df_1h = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1h')
# 使用 rolling 计算布林带(减少看前偏差)
bb_ma_1h = df_1h['close'].rolling(window=bb_length_value).mean()
bb_std_1h = df_1h['close'].rolling(window=bb_length_value).std()
df_1h['bb_lower_1h'] = bb_ma_1h - (bb_std_value * bb_std_1h)
df_1h['bb_upper_1h'] = bb_ma_1h + (bb_std_value * bb_std_1h)
# 添加 EMA5 和 EMA20 用于趋势过滤方案2宽松条件
df_1h['ema_5_1h'] = df_1h['close'].ewm(span=5, adjust=False).mean()
df_1h['ema_20_1h'] = df_1h['close'].ewm(span=20, adjust=False).mean()
# 检测 EMA5 向上穿越 EMA20添加安全检查
if len(df_1h) >= 2:
df_1h['ema5_cross_above_ema20'] = (
(df_1h['ema_5_1h'] > df_1h['ema_20_1h']) &
(df_1h['ema_5_1h'].shift(1) <= df_1h['ema_20_1h'].shift(1))
)
else:
# 数据不足时默认为False
df_1h['ema5_cross_above_ema20'] = False
# 使用 rolling 计算 RSI减少看前偏差
delta_1h = df_1h['close'].diff()
gain_1h = delta_1h.where(delta_1h > 0, 0).rolling(window=rsi_length_value).mean()
loss_1h = -delta_1h.where(delta_1h < 0, 0).rolling(window=rsi_length_value).mean()
rs_1h = gain_1h / loss_1h
df_1h['rsi_1h'] = 100 - (100 / (1 + rs_1h))
# 使用 ewm 计算 EMA减少看前偏差
df_1h['ema_50_1h'] = df_1h['close'].ewm(span=50, adjust=False).mean()
df_1h['ema_200_1h'] = df_1h['close'].ewm(span=200, adjust=False).mean()
df_1h['trend_1h'] = df_1h['close'] > df_1h['ema_50_1h'] # 1h上涨趋势
# 新增 StochRSI 指标
stochrsi_1h = ta.stochrsi(df_1h['close'], length=rsi_length_value, rsi_length=rsi_length_value)
df_1h['stochrsi_k_1h'] = stochrsi_1h[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3']
df_1h['stochrsi_d_1h'] = stochrsi_1h[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3']
# 新增 KDJ 指标1h
kdj_1h = ta.stoch(high=df_1h['high'], low=df_1h['low'], close=df_1h['close'], k=9, d=3, smooth_k=3)
df_1h['kdj_k_1h'] = kdj_1h['STOCHk_9_3_3']
df_1h['kdj_d_1h'] = kdj_1h['STOCHd_9_3_3']
df_1h['kdj_j_1h'] = 3 * df_1h['kdj_k_1h'] - 2 * df_1h['kdj_d_1h']
# 新增 MACD 指标
macd_1h = ta.macd(df_1h['close'], fast=12, slow=26, signal=9)
df_1h['macd_1h'] = macd_1h['MACD_12_26_9']
df_1h['macd_signal_1h'] = macd_1h['MACDs_12_26_9']
df_1h['macd_hist_1h'] = macd_1h['MACDh_12_26_9']
# 验证 MACD 列是否正确生成
#self.strategy_log(f"[{metadata['pair']}] 1小时 MACD 列: {list(macd_1h.columns)}")
# 确保 StochRSI 指标已正确计算
# 将 1h 数据重新索引到主时间框架 (3m),并填充缺失值
df_1h = df_1h.set_index('date').reindex(dataframe['date']).ffill().bfill().reset_index()
df_1h = df_1h.rename(columns={'index': 'date'})
# Include macd_1h, macd_signal_1h, ema_5_1h, ema_20_1h, ema5_cross_above_ema20 in the column selection
df_1h = df_1h[['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h', 'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h', 'kdj_k_1h', 'kdj_d_1h', 'kdj_j_1h', 'macd_1h', 'macd_signal_1h', 'ema_5_1h', 'ema_20_1h', 'ema5_cross_above_ema20']].ffill()
required_columns = ['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h',
'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h',
'kdj_k_1h', 'kdj_d_1h', 'kdj_j_1h',
'macd_1h', 'macd_signal_1h', 'ema_5_1h', 'ema_20_1h', 'ema5_cross_above_ema20']
missing_columns = [col for col in required_columns if col not in df_1h.columns]
if missing_columns:
logger.error(f"[{metadata['pair']}] 缺少以下列: {missing_columns}")
raise KeyError(f"缺少以下列: {missing_columns}")
# 确保所有需要的列都被合并
required_columns = ['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h',
'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h',
'kdj_k_1h', 'kdj_d_1h', 'kdj_j_1h',
'macd_1h', 'macd_signal_1h', 'ema_5_1h', 'ema_20_1h', 'ema5_cross_above_ema20']
# 验证所需列是否存在
missing_columns = [col for col in required_columns if col not in df_1h.columns]
if missing_columns:
logger.error(f"[{metadata['pair']}] 缺少以下列: {missing_columns}")
raise KeyError(f"缺少以下列: {missing_columns}")
df_1h = df_1h[required_columns] # 确保包含所有必需的列包括EMA过滤相关列
# 合并 1h 数据
dataframe = dataframe.merge(df_1h, how='left', on='date').ffill()
# 验证合并后的列
#self.strategy_log(f"[{metadata['pair']}] 合并后的数据框列名: {list(dataframe.columns)}")
# K线形态看涨吞没
dataframe['bullish_engulfing'] = (
(dataframe['close'].shift(1) < dataframe['open'].shift(1)) &
(dataframe['close'] > dataframe['open']) &
(dataframe['close'] > dataframe['open'].shift(1)) &
(dataframe['open'] < dataframe['close'].shift(1))
)
# 计算各时间框架的趋势状态(牛/熊)
# 3m时间框架综合EMA趋势、价格动量和趋势强度
# EMA趋势
ema_trend_3m = np.where(dataframe['ema_50_3m'] > dataframe['ema_200_3m'], 1, 0)
# 价格动量(短期价格变化趋势)
price_momentum_3m = np.where(dataframe['close'] > dataframe['close'].shift(5), 1, 0)
# 趋势强度EMA斜率
ema_slope_3m = (dataframe['ema_50_3m'] - dataframe['ema_50_3m'].shift(5)) / dataframe['ema_50_3m'].shift(5)
slope_trend_3m = np.where(ema_slope_3m > 0.001, 1, 0) # 超过0.1%的增长率
# 综合趋势(三者投票)
dataframe['trend_3m'] = np.where((ema_trend_3m + price_momentum_3m + slope_trend_3m) >= 2, 1, 0)
# 15m时间框架综合EMA趋势、价格动量和趋势强度
ema_trend_15m = np.where(dataframe['ema_50_15m'] > dataframe['ema_200_15m'], 1, 0)
price_momentum_15m = np.where(dataframe['close'] > dataframe['close'].shift(5), 1, 0)
ema_slope_15m = (dataframe['ema_50_15m'] - dataframe['ema_50_15m'].shift(5)) / dataframe['ema_50_15m'].shift(5)
slope_trend_15m = np.where(ema_slope_15m > 0.002, 1, 0) # 超过0.2%的增长率
dataframe['trend_15m'] = np.where((ema_trend_15m + price_momentum_15m + slope_trend_15m) >= 2, 1, 0)
# 1h时间框架综合EMA趋势、价格动量和趋势强度
ema_trend_1h = np.where(dataframe['ema_50_1h'] > dataframe['ema_200_1h'], 1, 0)
price_momentum_1h = np.where(dataframe['close'] > dataframe['close'].shift(5), 1, 0)
ema_slope_1h = (dataframe['ema_50_1h'] - dataframe['ema_50_1h'].shift(5)) / dataframe['ema_50_1h'].shift(5)
slope_trend_1h = np.where(ema_slope_1h > 0.003, 1, 0) # 超过0.3%的增长率
dataframe['trend_1h_ema'] = np.where((ema_trend_1h + price_momentum_1h + slope_trend_1h) >= 2, 1, 0)
# 计算趋势强度(考虑趋势的一致性和持续性)
# 计算趋势一致性得分
trend_consistency_3m = dataframe['trend_3m'] * (1 + abs(ema_slope_3m.fillna(0)) * 10) # 斜率越大,趋势越强
trend_consistency_15m = dataframe['trend_15m'] * (1 + abs(ema_slope_15m.fillna(0)) * 10)
trend_consistency_1h = dataframe['trend_1h_ema'] * (1 + abs(ema_slope_1h.fillna(0)) * 10)
# 计算熊牛得分0-100
# 权重3m熊牛权重1015m熊牛权重351h熊牛权重65
# 加入趋势强度因子
dataframe['market_score'] = (
trend_consistency_3m * 10 +
trend_consistency_15m * 35 +
trend_consistency_1h * 65
)
# 确保得分在0-100范围内
dataframe['market_score'] = dataframe['market_score'].clip(lower=0, upper=100)
# 根据得分分类市场状态
dataframe['market_state'] = 'neutral'
dataframe.loc[dataframe['market_score'] > 70, 'market_state'] = 'strong_bull'
dataframe.loc[(dataframe['market_score'] > 50) & (dataframe['market_score'] <= 70), 'market_state'] = 'weak_bull'
dataframe.loc[(dataframe['market_score'] >= 30) & (dataframe['market_score'] <= 50), 'market_state'] = 'neutral'
dataframe.loc[(dataframe['market_score'] > 10) & (dataframe['market_score'] < 30), 'market_state'] = 'weak_bear'
dataframe.loc[dataframe['market_score'] <= 10, 'market_state'] = 'strong_bear'
# 添加趋势强度指标
# 计算MACD趋势强度
if 'macd_1h' in dataframe.columns and 'macd_signal_1h' in dataframe.columns:
dataframe['macd_trend_strength'] = np.where(
(dataframe['macd_1h'] > dataframe['macd_signal_1h']) & (dataframe['macd_1h'] > 0),
np.abs(dataframe['macd_1h'] - dataframe['macd_signal_1h']),
-np.abs(dataframe['macd_1h'] - dataframe['macd_signal_1h'])
)
else:
dataframe['macd_trend_strength'] = 0
# 计算RSI趋势强度
if 'rsi_1h' in dataframe.columns:
# RSI偏离中性区域的程度
rsi_deviation = np.where(
dataframe['rsi_1h'] > 50,
(dataframe['rsi_1h'] - 50) / 50, # 牛市强度
(dataframe['rsi_1h'] - 50) / 50 # 熊市强度
)
dataframe['rsi_trend_strength'] = rsi_deviation
else:
dataframe['rsi_trend_strength'] = 0
# 创建一个使用前一行市场状态的列避免在populate_entry_trend中使用iloc[-1]
dataframe['prev_market_state'] = dataframe['market_state'].shift(1)
# 为第一行设置默认值
dataframe['prev_market_state'] = dataframe['prev_market_state'].fillna('neutral')
# ========== 新增:成交量趋势指标 ==========
# 成交量变化率
dataframe['volume_change'] = dataframe['volume'].pct_change()
# 成交量移动平均比值
dataframe['volume_ma_ratio'] = dataframe['volume'] / dataframe['volume_ma']
# 成交量标准差(衡量成交量波动性)
dataframe['volume_std'] = dataframe['volume'].rolling(window=20).std()
dataframe['volume_cv'] = dataframe['volume_std'] / dataframe['volume_ma'] # 变异系数
# 成交量Z分数检测异常成交量
dataframe['volume_zscore'] = (dataframe['volume'] - dataframe['volume_ma']) / dataframe['volume_std']
# 成交量趋势(成交量是否在上升通道)
dataframe['volume_trend'] = np.where(dataframe['volume'] > dataframe['volume_ma'], 1, 0)
# 成交量EMA趋势
dataframe['volume_ema_fast'] = dataframe['volume'].ewm(span=5, adjust=False).mean()
dataframe['volume_ema_slow'] = dataframe['volume'].ewm(span=20, adjust=False).mean()
dataframe['volume_ema_trend'] = np.where(dataframe['volume_ema_fast'] > dataframe['volume_ema_slow'], 1, 0)
# ========== 新增:量价关系指标 ==========
# 价格变化率
dataframe['price_change'] = dataframe['close'].pct_change()
# 量价确认度(价格和成交量同向变动)
dataframe['volume_price_confirmation'] = np.sign(dataframe['price_change']) * np.sign(dataframe['volume_change'])
# 量价比率(成交量/价格)
dataframe['volume_price_ratio'] = dataframe['volume'] / dataframe['close']
# 量价背离检测
# 价格创新高但成交量未创新高(顶背离)
dataframe['price_new_high'] = (dataframe['close'] == dataframe['close'].rolling(20).max()).astype(int)
dataframe['volume_new_high'] = (dataframe['volume'] == dataframe['volume'].rolling(20).max()).astype(int)
dataframe['bearish_divergence'] = (dataframe['price_new_high'] == 1) & (dataframe['volume_new_high'] == 0)
# 价格创新低但成交量未创新低(底背离)
dataframe['price_new_low'] = (dataframe['close'] == dataframe['close'].rolling(20).min()).astype(int)
dataframe['volume_new_low'] = (dataframe['volume'] == dataframe['volume'].rolling(20).min()).astype(int)
dataframe['bullish_divergence'] = (dataframe['price_new_low'] == 1) & (dataframe['volume_new_low'] == 0)
# OBV (On Balance Volume) - 平衡成交量
dataframe['obv'] = dataframe['volume'] * np.sign(dataframe['close'].diff())
dataframe['obv'] = dataframe['obv'].cumsum()
# 量价相关系数(短期窗口)
dataframe['volume_price_corr_short'] = dataframe['close'].pct_change().rolling(window=10).corr(dataframe['volume'].pct_change())
# 量价相关系数(中期窗口)
dataframe['volume_price_corr_medium'] = dataframe['close'].pct_change().rolling(window=20).corr(dataframe['volume'].pct_change())
# 记录当前的市场状态
if len(dataframe) > 0:
current_score = dataframe['market_score'].iloc[-1]
current_state = dataframe['market_state'].iloc[-1]
#self.strategy_log(f"[{metadata['pair']}] 熊牛得分: {current_score:.1f}, 市场状态: {current_state}")
#self.strategy_log(f"[{metadata['pair']}] 各时间框架趋势: 3m={'牛' if dataframe['trend_3m'].iloc[-1] == 1 else '熊'}, \
# 15m={'牛' if dataframe['trend_15m'].iloc[-1] == 1 else '熊'}, \
# 1h={'牛' if dataframe['trend_1h_ema'].iloc[-1] == 1 else '熊'}")
# 调试:打印指标值(最后 5 行),验证时间对齐
#print(f"Pair: {metadata['pair']}, Last 5 rows after reindexing:")
#print(dataframe[['date', 'close', 'bb_lower_3m', 'rsi_3m', 'rsi_15m', 'rsi_1h', 'trend_1h',
# 'trend_3m', 'trend_15m', 'trend_1h_ema', 'market_score', 'market_state',
# 'bullish_engulfing', 'volume', 'volume_ma']].tail(5))
# 打印最终数据框的列名以验证
#self.strategy_log(f"[{metadata['pair']}] 最终数据框列名: {list(dataframe.columns)}")
# 启用 FreqAI在所有指标计算完成后调用
dataframe = self.freqai.start(dataframe, metadata, self)
return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 出场信号基于趋势和量价关系
# 条件1: 价格突破布林带上轨(使用可优化的偏差参数)
breakout_condition = dataframe['close'] >= dataframe['bb_upper_1h'] * self.exit_bb_upper_deviation.value
# 条件2: 成交量显著放大(使用可优化的成交量乘数)
volume_spike = dataframe['volume'] > dataframe['volume_ma'] * self.exit_volume_multiplier.value
# 条件3: MACD 下降趋势
macd_downward = dataframe['macd_1h'] < dataframe['macd_signal_1h']
# 条件4: RSI 进入超买区域(使用可优化的超买阈值)
rsi_overbought = dataframe['rsi_1h'] > self.rsi_overbought.value
# ========== KDJ 出场辅助判断 ==========
# 3m KDJ 高位死叉K > 80 且 K 下穿 D部分止盈信号
kdj_3m_exit = False
if 'kdj_k_3m' in dataframe.columns and 'kdj_d_3m' in dataframe.columns:
kdj_3m_death_cross = (dataframe['kdj_k_3m'] > 80) & (dataframe['kdj_k_3m'] < dataframe['kdj_d_3m']) & (dataframe['kdj_k_3m'].shift(1) >= dataframe['kdj_d_3m'].shift(1))
kdj_3m_exit = kdj_3m_death_cross
# 合并所有条件KDJ 作为可选信号,不强制)
final_condition = breakout_condition | volume_spike | macd_downward | rsi_overbought | kdj_3m_exit
# 设置出场信号
dataframe.loc[final_condition, 'exit_long'] = 1
# 设置出场价格上浮1.25%(使用乘法避免除零风险)
# Freqtrade 会优先使用 exit_price 列作为限价单价格
final_exit_condition = dataframe['exit_long'] == 1
#dataframe.loc[final_exit_condition, 'exit_price'] = dataframe.loc[final_exit_condition, 'close'] * 1.0125
# 增强调试信息
#self.strategy_log(f"[{metadata['pair']}] 出场条件检查:")
#self.strategy_log(f" - 价格突破布林带上轨: {breakout_condition.sum()} 次")
#self.strategy_log(f" - 成交量显著放大: {volume_spike.sum()} 次")
#self.strategy_log(f" - MACD 下降趋势: {macd_downward.sum()} 次")
#self.strategy_log(f" - RSI 超买: {rsi_overbought.sum()} 次")
#self.strategy_log(f" - 最终条件: {final_condition.sum()} 次")
#self.strategy_log(f" - 使用参数: exit_bb_upper_deviation={self.exit_bb_upper_deviation.value}, exit_volume_multiplier={self.exit_volume_multiplier.value}, rsi_overbought={self.rsi_overbought.value}")
# 日志记录
#if dataframe['exit_long'].sum() > 0:
# self.strategy_log(f"[{metadata['pair']}] 触发出场信号数量: {dataframe['exit_long'].sum()}")
return dataframe
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 确保prev_market_state列存在
if 'prev_market_state' not in dataframe.columns:
dataframe['prev_market_state'] = 'neutral'
# 条件1: 价格接近布林带下轨(允许一定偏差)
close_to_bb_lower_1h = (dataframe['close'] <= dataframe['bb_lower_1h'] * self.bb_lower_deviation.value) # 可优化偏差
# 条件2: RSI 不高于阈值(根据市场状态动态调整)
# 为每一行创建动态阈值
rsi_condition_1h = dataframe.apply(lambda row:
row['rsi_1h'] < self.rsi_bull_threshold.value if row['prev_market_state'] in ['strong_bull', 'weak_bull'] else row['rsi_1h'] < self.rsi_oversold.value,
axis=1)
# 条件3: StochRSI 处于超卖区域(根据市场状态动态调整)
stochrsi_condition_1h = dataframe.apply(lambda row:
(row['stochrsi_k_1h'] < self.stochrsi_bull_threshold.value and row['stochrsi_d_1h'] < self.stochrsi_bull_threshold.value) if row['prev_market_state'] in ['strong_bull', 'weak_bull']
else (row['stochrsi_k_1h'] < self.stochrsi_neutral_threshold.value and row['stochrsi_d_1h'] < self.stochrsi_neutral_threshold.value),
axis=1)
# 条件4: MACD 上升趋势
macd_condition_1h = dataframe['macd_1h'] > dataframe['macd_signal_1h']
# 条件5: 成交量显著放大(可选条件)
volume_spike = dataframe['volume'] > dataframe['volume_ma'] * self.volume_multiplier.value
# 条件6: 布林带宽度过滤(避免窄幅震荡)
bb_width = (dataframe['bb_upper_1h'] - dataframe['bb_lower_1h']) / dataframe['close']
bb_width_condition = bb_width > self.bb_width_threshold.value # 可优化的布林带宽度阈值
# 辅助条件: 3m 和 15m 趋势确认(允许部分时间框架不一致)
trend_confirmation = (dataframe['trend_3m'] == 1) | (dataframe['trend_15m'] == 1)
# 新增EMA趋势过滤条件方案2宽松版本
# 条件1EMA5保持在EMA20之上 或 条件2最近20根1h K线内发生过向上穿越
# 这样既能捕捉趋势启动,又能在趋势延续时继续入场
# ========== KDJ 过滤逻辑1h + 3m 组合)==========
# KDJ 作为辅助信号,不作为强制条件
# 1h KDJ大级别底部确认/方向过滤(只禁止极端高位)
kdj_1h_block = False
if 'kdj_k_1h' in dataframe.columns and 'kdj_j_1h' in dataframe.columns:
# 只禁止极端高位K > 85 且 J 下拐
kdj_1h_block = (dataframe['kdj_k_1h'] > 85) & (dataframe['kdj_j_1h'] < dataframe['kdj_j_1h'].shift(1))
# 3m KDJ作为额外加分条件满足则增加信号质量
kdj_3m_bonus = 0
if 'kdj_k_3m' in dataframe.columns and 'kdj_d_3m' in dataframe.columns:
# 3m 低位金叉K < 40 且 K 上穿 D放宽到 40
kdj_3m_golden_cross = (dataframe['kdj_k_3m'] < 40) & (dataframe['kdj_k_3m'] > dataframe['kdj_d_3m']) & (dataframe['kdj_k_3m'].shift(1) <= dataframe['kdj_d_3m'].shift(1))
# 或 J 从负值区回正
kdj_3m_j_recovery = (dataframe['kdj_j_3m'] > 0) & (dataframe['kdj_j_3m'].shift(1) <= 0)
# 作为加分项(+1 分)
kdj_3m_bonus = (kdj_3m_golden_cross | kdj_3m_j_recovery).astype(int)
# ========== 原有 EMA 趋势过滤 ==========
if 'ema_5_1h' in dataframe.columns and 'ema_20_1h' in dataframe.columns:
# 条件1EMA5保持在EMA20之上
ema5_above_ema20 = dataframe['ema_5_1h'] > dataframe['ema_20_1h']
# 条件2最近20根1h K线内发生过向上穿越
if 'ema5_cross_above_ema20' in dataframe.columns:
# 使用rolling.max检查最近20根K线内是否有True值
recent_cross = dataframe['ema5_cross_above_ema20'].rolling(window=20, min_periods=1).max() == 1
# 两个条件满足其一即可
ema_trend_filter = ema5_above_ema20 | recent_cross
else:
# 如果没有交叉列,只用保持在上方的条件
ema_trend_filter = ema5_above_ema20
else:
# 如果列不存在创建一个全False的Series不允许入场
self.strategy_log(f"[{metadata['pair']}] 警告ema_5_1h或ema_20_1h列不存在过滤条件设为False")
ema_trend_filter = pd.Series(False, index=dataframe.index)
# 合并所有条件(减少强制性条件)
# 至少满足多个条件中的一定数量并且必须满足EMA趋势过滤
condition_count = (
close_to_bb_lower_1h.astype(int) +
rsi_condition_1h.astype(int) +
stochrsi_condition_1h.astype(int) +
macd_condition_1h.astype(int) +
(volume_spike | bb_width_condition).astype(int) + # 成交量或布林带宽度满足其一即可
trend_confirmation.astype(int) +
kdj_3m_bonus # KDJ 3m 作为加分项
)
# 最终条件:基本条件 + EMA趋势过滤 - KDJ高位禁止
# KDJ 不再是必需条件,只在极端高位时禁止入场
basic_condition = condition_count >= self.min_condition_count.value
final_condition = basic_condition & ema_trend_filter & ~kdj_1h_block
# 设置入场信号
dataframe.loc[final_condition, 'enter_long'] = 1
# ========== 新增:入场诊断统计(回测可用) ==========
# 对每个入场信号输出详细诊断信息
entry_signals = dataframe[dataframe['enter_long'] == 1]
if len(entry_signals) > 0:
for idx in entry_signals.index[-5:]: # 只输出最近 5 个信号,避免日志过多
row = dataframe.loc[idx]
current_close = float(row['close'])
# 1. 价格与短期高点的关系
recent_high_5 = float(dataframe.loc[max(0, idx-4):idx+1, 'high'].max()) if idx >= 4 else current_close
price_vs_recent_high = (current_close - recent_high_5) / recent_high_5 if recent_high_5 > 0 else 0
# 2. 价格与 EMA5 的关系
ema5_1h = float(row.get('ema_5_1h', current_close))
price_vs_ema5 = (current_close - ema5_1h) / ema5_1h if ema5_1h > 0 else 0
# 3. 价格与布林带的位置
bb_upper = float(row.get('bb_upper_1h', current_close))
bb_lower = float(row.get('bb_lower_1h', current_close))
bb_position = (current_close - bb_lower) / (bb_upper - bb_lower) if (bb_upper - bb_lower) > 0 else 0.5
# 4. RSI 状态
rsi_1h = float(row.get('rsi_1h', 50))
# 5. MACD 状态
macd_1h = float(row.get('macd_1h', 0))
macd_signal_1h = float(row.get('macd_signal_1h', 0))
macd_cross = 'up' if macd_1h > macd_signal_1h else 'down'
# 6. 市场状态
market_state = str(row.get('market_state', 'unknown'))
# 7. ML 入场概率(如果有)
entry_prob = None
if '&s-entry_signal' in dataframe.columns:
val = row.get('&s-entry_signal', 0)
if val is not None and str(val).strip(): # 检查非空且非空白
try:
entry_prob = float(val)
except (ValueError, TypeError):
pass
elif '&-entry_signal' in dataframe.columns:
val = row.get('&-entry_signal', 0)
if val is not None and str(val).strip():
try:
entry_prob = float(val)
except (ValueError, TypeError):
pass
# 输出诊断日志
ml_prob_str = f"{entry_prob:.2f}" if entry_prob is not None else "N/A"
self.strategy_log(
f"[入场诊断] {metadata['pair']} | "
f"价格: {current_close:.6f} | "
f"vs 5K高点: {price_vs_recent_high:+.2%} | "
f"vs EMA5: {price_vs_ema5:+.2%} | "
f"布林位置: {bb_position:.2f} | "
f"RSI: {rsi_1h:.1f} | "
f"MACD: {macd_cross} | "
f"市场: {market_state} | "
f"ML概率: {ml_prob_str}"
)
# ========== 诊断统计结束 ==========
# 设置入场价格下调1.67%(使用乘法避免除零风险)
final_condition_updated = dataframe['enter_long'] == 1
#dataframe.loc[final_condition_updated, 'enter_price'] = dataframe.loc[final_condition_updated, 'close'] * 0.9833
# 增强调试信息
# 确保ema_trend_filter是Series类型才能调用sum()
if isinstance(ema_trend_filter, pd.Series):
ema_trend_count = ema_trend_filter.sum()
else:
ema_trend_count = 0
basic_condition_count = basic_condition.sum()
final_condition_count = final_condition.sum()
self.strategy_log(f"[{metadata['pair']}] 入场条件检查:")
self.strategy_log(f" - 价格接近布林带下轨: {close_to_bb_lower_1h.sum()}")
self.strategy_log(f" - RSI 超卖: {rsi_condition_1h.sum()}")
self.strategy_log(f" - StochRSI 超卖: {stochrsi_condition_1h.sum()}")
self.strategy_log(f" - MACD 上升趋势: {macd_condition_1h.sum()}")
self.strategy_log(f" - 成交量或布林带宽度: {(volume_spike | bb_width_condition).sum()}")
self.strategy_log(f" - 趋势确认: {trend_confirmation.sum()}")
self.strategy_log(f" - EMA趋势过滤(在上方或20根K线内穿越): {ema_trend_count}")
self.strategy_log(f" - 基本条件满足: {basic_condition_count}")
self.strategy_log(f" - 最终条件(基本+EMA过滤): {final_condition_count}")
# 如果EMA条件满足但最终条件未满足输出详细信息
if ema_trend_count > 0 and final_condition_count == 0:
self.strategy_log(f"[{metadata['pair']}] 注意:检测到 {ema_trend_count} 次EMA趋势过滤满足但由于其他条件不足未能生成入场信号")
# 在populate_entry_trend方法末尾添加
# 计算条件间的相关性
conditions = DataFrame({
'close_to_bb': close_to_bb_lower_1h,
'rsi': rsi_condition_1h,
'stochrsi': stochrsi_condition_1h,
'macd': macd_condition_1h,
'vol_bb': (volume_spike | bb_width_condition),
'trend': trend_confirmation,
'ema_trend': ema_trend_filter
})
correlation = conditions.corr().mean().mean()
#self.strategy_log(f"[{metadata['pair']}] 条件平均相关性: {correlation:.2f}")
# 日志记录
#if dataframe['enter_long'].sum() > 0:
# self.strategy_log(f"[{metadata['pair']}] 发现入场信号数量: {dataframe['enter_long'].sum()}")
return dataframe
def detect_h1_rapid_rise(self, pair: str) -> bool:
"""
检测1小时K线图上的剧烈拉升情况轻量级版本用于confirm_trade_entry
参数:
- pair: 交易对
返回:
- bool: 是否处于不稳固区域
"""
try:
# 获取1小时K线数据
df_1h = self.dp.get_pair_dataframe(pair=pair, timeframe='1h')
# 获取当前优化参数值
max_candles = self.h1_max_candles.value
rapid_rise_threshold = self.h1_rapid_rise_threshold.value
max_consecutive_candles = self.h1_max_consecutive_candles.value
# 确保有足够的K线数据
if len(df_1h) < max_candles:
logger.warning(f"[{pair}] 1h K线数据不足 {max_candles} 根,当前只有 {len(df_1h)} 根,无法完整检测剧烈拉升")
return False
# 获取最近的K线
recent_data = df_1h.iloc[-max_candles:].copy()
# 检查连续最多几根K线内的最大涨幅
rapid_rise_detected = False
max_rise = 0
for i in range(len(recent_data) - max_consecutive_candles + 1):
window_data = recent_data.iloc[i:i + max_consecutive_candles]
window_low = window_data['low'].min()
window_high = window_data['high'].max()
# 计算区间内的最大涨幅
if window_low > 0:
rise_percentage = (window_high - window_low) / window_low
if rise_percentage > max_rise:
max_rise = rise_percentage
# 检查是否超过阈值
if rise_percentage >= rapid_rise_threshold:
rapid_rise_detected = True
#self.strategy_log(f"[{pair}] 检测到剧烈拉升: 从 {window_low:.2f} 到 {window_high:.2f} ({rise_percentage:.2%}) 在 {max_consecutive_candles} 根K线内")
break
current_price = recent_data['close'].iloc[-1]
#self.strategy_log(f"[{pair}] 剧烈拉升检测结果: {'不稳固' if rapid_rise_detected else '稳固'}")
#self.strategy_log(f"[{pair}] 最近最大涨幅: {max_rise:.2%}")
return rapid_rise_detected
except Exception as e:
logger.error(f"[{pair}] 剧烈拉升检测过程中发生错误: {str(e)}")
return False
def confirm_trade_entry(
self,
pair: str,
order_type: str,
amount: float,
rate: float,
time_in_force: str,
current_time: datetime,
entry_tag: str | None,
side: str,
**kwargs,
) -> bool:
"""
交易买入前的确认函数,用于最终决定是否执行交易
此处实现剧烈拉升检查和入场间隔控制逻辑
"""
self.strategy_log(f"[{pair}] confirm_trade_entry 被调用 - 价格: {rate:.8f}, 时间: {current_time}")
# 默认允许交易
allow_trade = True
# 仅对多头交易进行检查
if side == 'long':
# 检查1入场间隔控制使用hyperopt参数
if pair in self._last_entry_time:
last_entry = self._last_entry_time[pair]
time_diff = (current_time - last_entry).total_seconds() * 0.0166666667 # 转换为分钟(使用乘法避免除法)
if time_diff < self.entry_interval_minutes.value:
self.strategy_log(f"[{pair}] 入场间隔不足: 距离上次入场 {time_diff:.1f}分钟 < {self.entry_interval_minutes.value}分钟,取消本次入场")
allow_trade = False
# 检查2检查是否处于剧烈拉升的不稳固区域
if allow_trade:
is_unstable_region = self.detect_h1_rapid_rise(pair)
if is_unstable_region:
#self.strategy_log(f"[{pair}] 由于检测到剧烈拉升,取消入场交易")
allow_trade = False
# 检查3ML 审核官FreqAI 过滤低质量入场)+ 入场诊断统计
# 逻辑:用 entry_signal 概率来判断——若"容易上涨概率"低,则拒绝入场
if allow_trade:
try:
df, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
if len(df) > 0:
last_row = df.iloc[-1]
entry_prob = None
# 优先使用 FreqAI 的 entry_signal 预测列
if '&s-entry_signal' in df.columns:
entry_prob = float(last_row['&s-entry_signal'])
elif '&-entry_signal_prob' in df.columns:
entry_prob = float(last_row['&-entry_signal_prob'])
elif '&-s-entry_signal_prob' in df.columns:
entry_prob = float(last_row['&-s-entry_signal_prob'])
elif '&-entry_signal' in df.columns:
val = last_row['&-entry_signal']
if isinstance(val, (int, float)):
entry_prob = float(val)
else:
# 文本标签时,简单映射为 0/1
entry_prob = 1.0 if str(val).lower() in ['entry', 'buy', '1'] else 0.0
# ========== 新增:入场诊断统计 ==========
# 统计当前入场点的关键指标,用于分析"买在高位"问题
current_close = float(last_row['close'])
# 1. 价格与短期高点的关系
recent_high_5 = float(df['high'].iloc[-5:].max()) if len(df) >= 5 else current_close
price_vs_recent_high = (current_close - recent_high_5) / recent_high_5 if recent_high_5 > 0 else 0
# 2. 价格与 EMA5 的关系
ema5_1h = float(last_row.get('ema_5_1h', current_close))
price_vs_ema5 = (current_close - ema5_1h) / ema5_1h if ema5_1h > 0 else 0
# 3. 价格与布林带的位置
bb_upper = float(last_row.get('bb_upper_1h', current_close))
bb_lower = float(last_row.get('bb_lower_1h', current_close))
bb_position = (current_close - bb_lower) / (bb_upper - bb_lower) if (bb_upper - bb_lower) > 0 else 0.5
# 4. RSI 状态
rsi_1h = float(last_row.get('rsi_1h', 50))
# 5. MACD 状态
macd_1h = float(last_row.get('macd_1h', 0))
macd_signal_1h = float(last_row.get('macd_signal_1h', 0))
macd_cross = 'up' if macd_1h > macd_signal_1h else 'down'
# 6. 市场状态
market_state = str(last_row.get('market_state', 'unknown'))
# 输出诊断日志
self.strategy_log(
f"[入场诊断] {pair} | "
f"价格: {current_close:.6f} | "
f"vs 5K高点: {price_vs_recent_high:+.2%} | "
f"vs EMA5: {price_vs_ema5:+.2%} | "
f"布林位置: {bb_position:.2f} | "
f"RSI: {rsi_1h:.1f} | "
f"MACD: {macd_cross} | "
f"市场: {market_state} | "
f"ML入场概率: {entry_prob:.2f if entry_prob is not None else 'N/A'}"
)
# ========== 诊断统计结束 ==========
if entry_prob is not None:
# 确保概率在 [0, 1] 范围内(分类器输出可能有浮点误差)
entry_prob = max(0.0, min(1.0, entry_prob))
entry_threshold = self.ml_entry_signal_threshold.value
# 新增:读取 AI 预测的未来波动率信号(入场决策版本)
future_vol_signal = None
if '&s-future_volatility' in df.columns:
future_vol_signal = float(last_row['&s-future_volatility'])
elif '&-future_volatility' in df.columns:
future_vol_signal = float(last_row['&-future_volatility'])
# 波动率AI入场决策逻辑
if future_vol_signal is not None:
# 情况AAI 预测强趋势(高波动 > 0.65且ML入场信号强 → 更倾向于入场
if future_vol_signal > 0.65 and entry_prob >= entry_threshold:
self.strategy_log(
f"[波动率 AI] [{pair}] AI 预测强趋势(高波动 {future_vol_signal:.2f}),增强入场信心 | "
f"ML概率: {entry_prob:.2f}, 阈值: {entry_threshold:.2f}"
)
# 可以考虑稍微降低入场阈值或增加入场权重
# 情况BAI 预测震荡市(低波动 < 0.35)→ 谨慎入场,可能错过趋势机会但降低风险
elif future_vol_signal < 0.35:
self.strategy_log(
f"[波动率 AI] [{pair}] AI 预测震荡市(低波动 {future_vol_signal:.2f}),谨慎入场 | "
f"ML概率: {entry_prob:.2f}, 阈值: {entry_threshold:.2f}"
)
# 在震荡市中,可以适当提高入场阈值,要求更强的信号才入场
adjusted_threshold = entry_threshold * 1.1 # 提高10%的要求
if entry_prob < adjusted_threshold:
self.strategy_log(
f"[波动率 AI] [{pair}] 震荡市中入场信号不足: {entry_prob:.2f} < 调整后阈值 {adjusted_threshold:.2f},拒绝入场"
)
allow_trade = False
else:
self.strategy_log(
f"[波动率 AI] [{pair}] 震荡市中入场信号充足: {entry_prob:.2f} >= 调整后阈值 {adjusted_threshold:.2f},允许入场"
)
# 介于 0.35-0.65 之间中性区间按原ML审核官逻辑处理
# 获取高级市场状态信息
advanced_market_info = self.get_advanced_market_state(df, {'pair': pair})
current_state = advanced_market_info['state']
trend_strength = advanced_market_info['strength']
trend_confidence = advanced_market_info['confidence']
# 根据市场状态和趋势强度动态调整入场阈值
# 在下跌趋势中,提高入场门槛以确保更好的入场点
adjusted_threshold = entry_threshold
if current_state in ['strong_bear', 'weak_bear']:
# 在下跌趋势中,根据趋势强度和可信度调整入场门槛
strength_factor = min(1.2, 1.0 + abs(trend_strength) * 0.3) # 趋势越强,门槛越高
confidence_factor = min(1.15, 1.0 + trend_confidence * 0.15) # 可信度越高,门槛越高
adjusted_threshold = min(entry_threshold * strength_factor * confidence_factor, 0.95)
elif current_state in ['strong_bull', 'weak_bull']:
# 在上升趋势中,根据趋势强度和可信度调整入场门槛
strength_factor = max(0.85, 1.0 - abs(trend_strength) * 0.15) # 趋势越强,门槛越低
confidence_factor = max(0.9, 1.0 - trend_confidence * 0.1) # 可信度越高,门槛越低
adjusted_threshold = max(entry_threshold * strength_factor * confidence_factor, 0.05)
# 记录entry_signal值用于调试
self.strategy_log(f"[{pair}] ML 审核官检查: entry_signal={entry_prob:.2f}, 原始阈值={entry_threshold:.2f}, 调整后阈值={adjusted_threshold:.2f}, 市场状态={market_state}, 波动率AI: {future_vol_signal if future_vol_signal is not None else 'N/A'}")
if entry_prob < adjusted_threshold:
self.strategy_log(f"[{pair}] ML 审核官拒绝入场: entry_signal 概率 {entry_prob:.2f} < 调整后阈值 {adjusted_threshold:.2f}(上涨概率低,不宜入场)")
allow_trade = False
else:
self.strategy_log(f"[{pair}] ML 审核官允许入场: entry_signal 概率 {entry_prob:.2f} >= 调整后阈值 {adjusted_threshold:.2f}")
# ========== 新增:强熊市价格保护机制 ==========
# 检查是否在强熊市中,且当前价格高于上次卖出价格
if current_state == 'strong_bear' and allow_trade:
last_exit_price = self._last_exit_prices.get(pair)
if last_exit_price is not None:
# 根据趋势强度和可信度动态调整价格保护阈值
# 趋势越强、可信度越高,价格保护越严格
base_discount = 0.015 # 基础1.5%折扣
strength_factor = min(2.0, 1.0 + abs(trend_strength) * 1.0) # 趋势强度因子
confidence_factor = min(1.5, 1.0 + trend_confidence * 0.5) # 可信度因子
required_discount = base_discount * strength_factor * confidence_factor
required_price = last_exit_price * (1 - required_discount)
if current_close > required_price:
self.strategy_log(
f"[价格保护] [{pair}] 强熊市中价格保护触发 - "
f"当前价格 {current_close:.6f} > 要求价格 {required_price:.6f} "
f"(上次卖出价 {last_exit_price:.6f}{required_discount*100:.1f}% 折扣,趋势强度: {trend_strength:.2f},可信度: {trend_confidence:.2f}),拒绝入场"
)
allow_trade = False
else:
self.strategy_log(
f"[价格保护] [{pair}] 价格满足保护条件 - "
f"当前价格 {current_close:.6f} <= 要求价格 {required_price:.6f} "
f"(上次卖出价 {last_exit_price:.6f}{required_discount*100:.1f}% 折扣,趋势强度: {trend_strength:.2f},可信度: {trend_confidence:.2f}),允许入场"
)
# ========== 价格保护结束 ==========
except Exception as e:
logger.warning(f"[{pair}] ML 审核官检查失败,忽略 ML 过滤: {e}")
# 如果允许入场,更新最后入场时间并生成入场类型标签
if allow_trade:
self._last_entry_time[pair] = current_time
# 识别入场类型并生成标签
df, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
if len(df) > 0:
entry_info = self.identify_entry_type(df, -1)
# 获取全面的市场环境信息
market_context = self.get_comprehensive_market_context(df, pair)
# 生成标签字符串,包含入场类型信息和市场环境信息
tag_string = f"type:{entry_info['type']},duration:{entry_info['duration']},risk:{entry_info['risk']},confidence:{entry_info['confidence']},name:{entry_info['name']},market_state:{market_context['market_state']},trend_strength:{market_context['trend_strength']:.2f}"
self.strategy_log(
f"[{pair}] 入场类型识别: {entry_info['name']} (类型{entry_info['type']}), "
f"建议持仓: {entry_info['duration']}分钟, 风险等级: {entry_info['risk']}, "
f"市场状态: {market_context['market_state']}, 趋势强度: {market_context['trend_strength']:.2f}"
)
# 返回带有标签的字典Freqtrade会自动处理
return {'enter_tag': tag_string}
return allow_trade
def confirm_trade_exit(
self,
pair: str,
trade: 'Trade',
order_type: str,
amount: float,
rate: float,
time_in_force: str,
exit_reason: str,
current_time: datetime,
**kwargs,
) -> bool:
"""
交易卖出前的确认函数,用于最终决定是否执行出场
此处使用 ML 审核官exit_signal 置信度)过滤出场
"""
self.strategy_log(f"[{pair}] confirm_trade_exit 被调用 - 价格: {rate:.8f}, 出场原因: {exit_reason}, 时间: {current_time}")
# 风险控制类退出原因:不经过 ML 审核官,直接允许出场
if exit_reason in ['stop_loss', 'trailing_stop_loss', 'emergency_exit', 'force_exit']:
self.strategy_log(f"[{pair}] 风险控制退出,不走 ML 审核官: exit_reason={exit_reason}")
return True
# 默认允许出场
allow_exit = True
try:
df, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
if len(df) > 0:
last_row = df.iloc[-1]
exit_prob = None
# 优先使用 FreqAI 的 exit_signal 预测列
if '&s-exit_signal' in df.columns:
exit_prob = float(last_row['&s-exit_signal'])
elif '&-exit_signal_prob' in df.columns:
exit_prob = float(last_row['&-exit_signal_prob'])
elif '&-s-exit_signal_prob' in df.columns:
exit_prob = float(last_row['&-s-exit_signal_prob'])
elif '&-exit_signal' in df.columns:
val = last_row['&-exit_signal']
if isinstance(val, (int, float)):
exit_prob = float(val)
else:
# 文本标签时,简单映射为 0/1
exit_prob = 1.0 if str(val).lower() in ['exit', 'sell', '1'] else 0.0
if exit_prob is not None:
# 确保概率在 [0, 1] 范围内(分类器输出可能有浮点误差)
exit_prob = max(0.0, min(1.0, exit_prob))
# 从 kwargs 获取当前利润freqtrade 会传入 current_profit
current_profit = float(kwargs.get('current_profit', 0.0))
# 获取出场一字基础阈值
base_threshold = self.ml_exit_signal_threshold.value
# 计算持仓时长(分钟)
try:
trade_age_minutes = max(0.0, (current_time - trade.open_date_utc).total_seconds() / 60.0)
except Exception:
trade_age_minutes = 0.0
# 基于持仓时长的阈值衰减:持仓越久,阈值越低,越容易出场
age_factor = min(trade_age_minutes / (24 * 60.0), 1.0) # 0~1对应 0~24 小时+
dynamic_threshold = base_threshold * (1.0 - 0.3 * age_factor)
# 小利润单(<=2%)再额外放宽 20%
if current_profit <= 0.02:
dynamic_threshold *= 0.8
# 获取高级市场状态信息
advanced_market_info = self.get_advanced_market_state(df, {'pair': pair})
current_state = advanced_market_info['state']
trend_strength = advanced_market_info['strength']
trend_confidence = advanced_market_info['confidence']
# 根据市场趋势动态调整出场策略
# 在下跌趋势中,更积极地保护利润
if current_state in ['strong_bear', 'weak_bear']:
# 在下跌趋势中,降低出场阈值以更快保护利润
# 根据趋势强度调整阈值:趋势越强,阈值越低
strength_factor = max(0.8, 1.0 - abs(trend_strength) * 0.3) # 强度越大,因子越小
dynamic_threshold *= strength_factor
# 同时缩短持仓时间的容忍度
max_hold_time_bear = 24 * 60 # 下跌趋势中最大持仓时间(分钟)
hold_time_minutes = (current_time - trade.open_date_utc).total_seconds() / 60.0
if hold_time_minutes > max_hold_time_bear:
self.strategy_log(
f"[趋势保护] [{pair}] 在 {current_state} 市场中持仓时间过长 ({hold_time_minutes:.1f}min > {max_hold_time_bear}min),强制出场"
)
return True
# 新增:读取 AI 预测的未来波动率信号(极端化方案)
future_vol_signal = None
if '&s-future_volatility' in df.columns:
future_vol_signal = float(last_row['&s-future_volatility'])
elif '&-future_volatility' in df.columns:
future_vol_signal = float(last_row['&-future_volatility'])
# 极端化逻辑:根据 AI 预测的未来波动率直接接管部分出场决策
if future_vol_signal is not None and exit_reason == 'exit_signal':
# 情况AAI 预测强趋势(高波动),且当前不亏损 → 忽略本次 exit_signal继续持有
if future_vol_signal > 0.65 and current_profit >= 0:
self.strategy_log(
f"[波动率 AI] [{pair}] AI 预测强趋势(高波动 {future_vol_signal:.2f}),忽略本次 exit_signal继续持有 | "
f"持仓: {trade_age_minutes:.1f}min, 利润: {current_profit:.4f}"
)
allow_exit = False
return allow_exit
# 情况BAI 预测震荡市(低波动) → 强制接受 exit_signal立即出场
elif future_vol_signal < 0.35:
self.strategy_log(
f"[波动率 AI] [{pair}] AI 预测震荡市(低波动 {future_vol_signal:.2f}),强制接受 exit_signal 出场 | "
f"持仓: {trade_age_minutes:.1f}min, 利润: {current_profit:.4f}"
)
return True
# 介于 0.35-0.65 之间:中性区间,不做强制处理,继续走原有 ML 审核官逻辑
# 在下跌趋势中,实施更积极的止盈策略
if current_state in ['strong_bear', 'weak_bear'] and current_profit > 0.01:
# 在下跌趋势中,根据趋势强度和可信度调整策略
# 趋势越强、可信度越高,越应该快速出场
if trade_age_minutes > 60: # 持仓超过1小时
self.strategy_log(
f"[趋势保护] [{pair}] 在 {current_state} 市场中持仓超过1小时小幅盈利 {current_profit:.2%},趋势强度: {trend_strength:.2f},可信度: {trend_confidence:.2f},考虑提前出场"
)
# 根据趋势强度和可信度调整出场阈值
strength_factor = max(0.7, 1.0 - abs(trend_strength) * 0.2) # 趋势越强,因子越小
confidence_factor = max(0.8, 1.0 - trend_confidence * 0.2) # 可信度越高,因子越小
dynamic_threshold *= strength_factor * confidence_factor
# 设定下限,避免阈值过低
dynamic_threshold = max(0.05, dynamic_threshold)
if exit_prob < dynamic_threshold:
self.strategy_log(
f"[{pair}] ML 审核官拒绝出场: exit_signal 概率 {exit_prob:.2f} < 动态阈值 {dynamic_threshold:.2f}"
f" | 原应出场原因: {exit_reason} | 持仓: {trade_age_minutes:.1f}min, 利润: {current_profit:.4f}"
f" | 波动率AI: {future_vol_signal if future_vol_signal is not None else 'N/A'}"
)
allow_exit = False
else:
self.strategy_log(
f"[{pair}] ML 审核官允许出场: exit_signal 概率 {exit_prob:.2f} >= 动态阈值 {dynamic_threshold:.2f}"
f" | 出场原因: {exit_reason} | 持仓: {trade_age_minutes:.1f}min, 利润: {current_profit:.4f}"
f" | 波动率AI: {future_vol_signal if future_vol_signal is not None else 'N/A'}"
)
except Exception as e:
logger.warning(f"[{pair}] ML 审核官出场检查失败,允许出场: {e}")
return allow_exit
def custom_stoploss(self, pair: str, trade: 'Trade', current_time, current_rate: float,
current_profit: float, **kwargs) -> float:
# 动态止损基于ATR
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1]
atr = last_candle['atr']
# 获取全面的市场环境信息
market_context = self.get_comprehensive_market_context(dataframe, pair)
current_state = market_context['market_state']
trend_strength = market_context['trend_strength']
trend_confidence = market_context['trend_confidence']
# 获取入场标签信息
entry_tag = getattr(trade, 'enter_tag', '')
# 解析入场类型信息
entry_type = 2 # 默认为中期持有
risk_level = 1 # 默认中等风险
if entry_tag:
try:
tag_parts = entry_tag.split(',')
for part in tag_parts:
if part.startswith('type:'):
entry_type = int(part.split(':')[1])
elif part.startswith('risk:'):
risk_level = int(part.split(':')[1])
except:
pass # 如果解析失败,使用默认值
# 根据入场类型、市场状态和趋势强度调整止损策略
# 在下跌趋势中,更积极地保护利润
if current_state in ['strong_bear', 'weak_bear']:
# 在下跌趋势中,根据趋势强度调整止损策略
# 趋势强度越大,止损越积极
strength_multiplier = max(0.5, 1.0 - abs(trend_strength) * 0.5) # 趋势越强,乘数越小
if current_profit > 0.03: # 利润超过3%时
return -1.5 * strength_multiplier * atr / current_rate # 更快地锁定利润
elif current_profit > 0.01: # 利润超过1%时
return -1.2 * strength_multiplier * atr / current_rate # 更快地锁定利润
elif current_profit > -0.01: # 小幅亏损时
return -0.8 * strength_multiplier * atr / current_rate # 更积极地止损
else:
return -1.0 * strength_multiplier * atr / current_rate # 一般止损
else:
# 渐进式止损策略(非下跌趋势)
if current_profit > 0.05: # 利润超过5%时
return -3.0 * atr / current_rate
elif current_profit > 0.03: # 利润超过3%时
return -2.5 * atr / current_rate
elif current_profit > 0.01: # 利润超过1%时
return -2.0 * atr / current_rate
# 根据入场类型调整止损策略
if entry_type in [0, 1, 6]: # 快进快出、短期波段、震荡套利 - 更保守的止损
if current_profit > -0.015: # 小幅亏损时
return -1.0 * atr / current_rate # 更严格的止损
else:
return -1.2 * atr / current_rate
# 在强劲牛市中,即使小亏损也可以容忍更大回调
if current_state == 'strong_bull' and current_profit > -0.01:
return -1.8 * atr / current_rate
if atr > 0:
return -1.2 * atr / current_rate
return self.stoploss
def custom_exit(self, pair: str, trade: Trade, current_time: datetime, current_rate: float,
current_profit: float, **kwargs) -> float:
if trade.is_short:
return 0.0
trade_age_minutes = (current_time - trade.open_date_utc).total_seconds() / 60
if trade_age_minutes < 0:
trade_age_minutes = 0
# 获取入场标签信息
entry_tag = getattr(trade, 'enter_tag', '')
# 解析入场类型和市场环境信息
entry_type = 2 # 默认为中期持有
suggested_duration = 360 # 默认4小时
risk_level = 1 # 默认中等风险
market_state = 'neutral' # 默认市场状态
trend_strength = 0.0 # 默认趋势强度
if entry_tag:
try:
tag_parts = entry_tag.split(',')
for part in tag_parts:
if part.startswith('type:'):
entry_type = int(part.split(':')[1])
elif part.startswith('duration:'):
suggested_duration = int(part.split(':')[1])
elif part.startswith('risk:'):
risk_level = int(part.split(':')[1])
elif part.startswith('market_state:'):
market_state = part.split(':')[1]
elif part.startswith('trend_strength:'):
trend_strength = float(part.split(':')[1])
except:
pass # 如果解析失败,使用默认值
# 获取实时市场环境信息
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
current_market_context = self.get_comprehensive_market_context(dataframe, pair)
# 如果标签中没有市场状态信息,使用实时信息
if market_state == 'neutral':
market_state = current_market_context['market_state']
# 根据入场类型和实时市场环境调整退出逻辑
exit_ratio = 0.0
if entry_type == 0: # 快进快出
# 快进快出类型的订单,如果持有时间超过建议时间,强制退出
if trade_age_minutes > suggested_duration * 1.5: # 超过建议时间的1.5倍
if current_profit > 0: # 有盈利时退出
exit_ratio = 1.0
elif current_profit < -0.02: # 亏损超过2%时也退出
exit_ratio = 1.0
elif trade_age_minutes > 120 and current_profit > 0.02: # 超过2小时且有2%以上盈利
exit_ratio = 0.8 # 部分退出
# 在强趋势市场中,根据趋势方向调整退出策略
elif abs(trend_strength) > 0.5: # 强趋势
if trend_strength < 0: # 强下降趋势
if current_profit > 0: # 有盈利时立即退出
exit_ratio = 1.0
elif current_profit < -0.01: # 亏损超过1%时退出
exit_ratio = 1.0
elif entry_type == 3: # 长期持有
# 长期持有类型的订单,更耐心,但也要注意风险
if current_profit > 0.15: # 15%利润时考虑部分止盈
exit_ratio = 0.5 # 退出一半仓位
elif current_profit < -0.08: # 亏损超过8%时考虑退出
exit_ratio = 0.7 # 退出较大比例
elif current_state == 'strong_bear' and current_profit < 0.05: # 强熊市且利润不高
exit_ratio = 0.6 # 考虑退出
# 在强上升趋势中,可以更耐心持有
elif trend_strength > 0.5 and current_profit > 0.05: # 强上升趋势且有5%以上利润
exit_ratio = 0.3 # 只退出一小部分,继续持有大部分仓位
elif entry_type == 4: # 套利机会
# 套利机会,达到预期收益就退出
if current_profit > 0.03: # 达到3%收益
exit_ratio = 1.0 # 全部退出
elif trade_age_minutes > suggested_duration * 2: # 超过建议时间2倍仍未盈利
exit_ratio = 1.0 # 强制退出
# 在不利市场条件下提前退出
elif current_state in ['strong_bear', 'weak_bear'] and current_profit < 0.01:
exit_ratio = 1.0 # 不利市场条件下未达到预期收益即退出
elif entry_type == 6: # 震荡套利
# 震荡套利,快速止盈止损
if current_profit > 0.025: # 达到2.5%收益
exit_ratio = 1.0 # 全部退出
elif current_profit < -0.02: # 亏损超过2%
exit_ratio = 1.0 # 全部退出
# 在趋势市场中不适合震荡套利,应提前退出
elif abs(trend_strength) > 0.4: # 明显趋势
exit_ratio = 1.0 # 趋势市场不适合震荡套利,立即退出
else: # 其他类型(短期波段、中期趋势等)
# 使用原有的动态ROI逻辑作为基础
# 使用可优化的线性函数: y = (a * (x + k)) + t
a = self.roi_param_a.value # 系数a (可优化参数)
k = self.roi_param_k.value # 偏移量k (可优化参数)
t = self.roi_param_t.value # 常数项t (可优化参数)
dynamic_roi_threshold = (a * (trade_age_minutes + k)) + t
# 确保ROI阈值不小于0
if dynamic_roi_threshold < 0:
dynamic_roi_threshold = 0.0
profit_ratio = current_profit / dynamic_roi_threshold if dynamic_roi_threshold > 0 else 0
if profit_ratio >= 1.0:
if current_state == 'strong_bull':
exit_ratio = 0.5 if profit_ratio < 1.5 else 0.8
elif current_state == 'weak_bull':
exit_ratio = 0.6 if profit_ratio < 1.2 else 0.9
else:
exit_ratio = 1.0
#self.strategy_log(f"[{pair}] 动态止盈: 持仓时间={trade_age_minutes:.1f}分钟, 当前利润={current_profit:.2%}, "
# f"入场类型={entry_type}, 建议时长={suggested_duration}min, "
# f"市场状态={current_state}, 趋势强度={trend_strength:.2f}, 退出比例={exit_ratio:.0%}")
# 当决定退出时,输出出场价格信息
if exit_ratio > 0:
# 计算出场价格上浮比例1.25%
price_markup_percent = 1.25
adjusted_exit_price = current_rate * 1.0125
self.strategy_log(f"[{pair}] 准备出场 - 市场价: {current_rate:.8f}, 调整后出场价: {adjusted_exit_price:.8f}, 上浮: {price_markup_percent}%, 退出比例: {exit_ratio:.0%}")
return exit_ratio
def custom_roi(self, pair: str, trade: 'Trade', current_time: datetime, current_rate: float,
current_profit: float, **kwargs) -> Tuple[float, float]:
"""
自定义ROI函数根据入场类型和市场环境动态调整ROI
返回 (时间, ROI阈值) 的元组表示在指定时间后达到指定ROI则退出
"""
# 获取入场类型信息从entry_tag解析
entry_tag = getattr(trade, 'enter_tag', '')
# 解析入场类型和市场环境信息
entry_type = 2 # 默认为中期持有
suggested_duration = 360 # 默认4小时
risk_level = 1 # 默认中等风险
market_state = 'neutral' # 默认市场状态
trend_strength = 0.0 # 默认趋势强度
if entry_tag:
try:
# 解析标签信息
tag_parts = entry_tag.split(',')
for part in tag_parts:
if part.startswith('type:'):
entry_type = int(part.split(':')[1])
elif part.startswith('duration:'):
suggested_duration = int(part.split(':')[1])
elif part.startswith('risk:'):
risk_level = int(part.split(':')[1])
elif part.startswith('market_state:'):
market_state = part.split(':')[1]
elif part.startswith('trend_strength:'):
trend_strength = float(part.split(':')[1])
except:
pass # 如果解析失败,使用默认值
# 获取实时市场环境信息
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
current_market_context = self.get_comprehensive_market_context(dataframe, pair)
# 如果标签中没有市场状态信息,使用实时信息
if market_state == 'neutral':
market_state = current_market_context['market_state']
# 根据入场类型和市场环境计算动态ROI
# 基础ROI阈值根据入场类型调整
roi_multipliers = {
0: 0.02, # 快进快出 - 2%止盈
1: 0.04, # 短期波段 - 4%止盈
2: 0.06, # 中期趋势 - 6%止盈
3: 0.10, # 长期持有 - 10%止盈
4: 0.03, # 套利机会 - 3%止盈
5: 0.08, # 突破追涨 - 8%止盈
6: 0.025 # 震荡套利 - 2.5%止盈
}
base_roi = roi_multipliers.get(entry_type, 0.06) # 默认6%
# 根据市场状态调整ROI
market_state_multipliers = {
'strong_bull': 1.2, # 牛市提高止盈目标
'weak_bull': 1.1, # 弱牛市稍微提高止盈目标
'neutral': 1.0, # 盘整保持正常止盈目标
'weak_bear': 0.9, # 弱熊市降低止盈目标
'strong_bear': 0.8 # 熊市大幅降低止盈目标
}
market_multiplier = market_state_multipliers.get(market_state, 1.0)
# 根据趋势强度进一步调整ROI趋势越强调整幅度越大
trend_adjustment = 1.0 + (abs(trend_strength) * 0.1) # 趋势强度每增加0.1调整幅度增加10%
if trend_strength > 0: # 上升趋势,略微提高止盈目标
trend_roi_multiplier = trend_adjustment
else: # 下降趋势,略微降低止盈目标
trend_roi_multiplier = 2.0 - trend_adjustment # 确保下降趋势时调整值小于1
adjusted_roi = base_roi * market_multiplier * trend_roi_multiplier
# 根据风险等级调整时间
time_multipliers = {
0: 0.5, # 低风险 - 延长持仓时间
1: 1.0, # 中等风险 - 正常持仓时间
2: 0.7 # 高风险 - 缩短持仓时间
}
time_multiplier = time_multipliers.get(risk_level, 1.0)
# 根据市场趋势强度调整持仓时间(趋势越强,持仓时间越长或越短,取决于趋势方向)
if abs(trend_strength) > 0.3: # 强趋势
if trend_strength > 0 and entry_type in [2, 3]: # 上升趋势且为中期或长期持有
trend_time_multiplier = 1.2 # 延长持仓时间
elif trend_strength < 0 and entry_type in [0, 1, 6]: # 下降趋势且为短期操作
trend_time_multiplier = 0.8 # 缩短持仓时间
else:
trend_time_multiplier = 1.0
else:
trend_time_multiplier = 1.0
adjusted_time = suggested_duration * time_multiplier * trend_time_multiplier
# 确保时间不小于最小值
min_time = 15 # 最少15分钟
adjusted_time = max(min_time, adjusted_time)
self.strategy_log(
f"[{pair}] 动态ROI: 入场类型={entry_type}, 市场状态={market_state}, "
f"趋势强度={trend_strength:.2f}, 基础ROI={base_roi:.2%}, "
f"市场调整={market_multiplier:.2f}, 趋势调整={trend_roi_multiplier:.2f}, "
f"调整后ROI={adjusted_roi:.2%}, 建议时长={suggested_duration}min, "
f"趋势时间调整={trend_time_multiplier:.2f}, 调整后时长={adjusted_time}min"
)
return adjusted_time, adjusted_roi
def adjust_trade_position(self, trade: 'Trade', current_time, current_rate: float,
current_profit: float, min_stake: float, max_stake: float, **kwargs) -> float:
"""
根据用户要求实现加仓逻辑
- 加仓间隔设置为可优化参数 add_position_callback
- 加仓额度为: (stake_amount / stake_divisor) ^ (加仓次数 - 1)
"""
# 获取当前交易对
pair = trade.pair
# 获取当前交易的加仓次数
entry_count = len(trade.orders) # 获取所有入场订单数量
# 如果已经达到最大加仓次数,则不再加仓
if entry_count - 1 >= self.max_entry_adjustments.value:
return 0.0
# 获取初始入场价格和当前价格的差值百分比
initial_price = trade.open_rate
if initial_price == 0:
return 0.0
price_diff_pct = (current_rate - initial_price) / initial_price
# 计算加仓次数从1开始计数
adjustment_count = entry_count - 1 # 已加仓次数
# 检查价格回调是否达到加仓间隔
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else 'neutral'
# 计算当前所需的加仓间隔百分比 = 基础间隔 * (系数 ^ 已加仓次数)
# 获取当前币对的波动系数,用于动态调整回调百分比
volatility_coef = self.get_volatility_coefficient(pair)
# 回调百分比 = 基础回调 * (系数 ^ 已加仓次数) * 波动系数
current_callback = self.add_position_callback.value * (self.add_position_multiplier.value ** adjustment_count) * volatility_coef
if price_diff_pct <= -current_callback:
# 计算初始入场金额
initial_stake = trade.orders[0].cost # 第一笔订单的成本
# 计算加仓金额: (initial_stake / stake_divisor) ^ (adjustment_count + 1)
additional_stake = (initial_stake / self.stake_divisor.value) * (self.add_position_growth.value ** (adjustment_count + 1))
# 确保加仓金额在允许的范围内
additional_stake = max(min_stake, min(additional_stake, max_stake - trade.stake_amount))
#self.strategy_log(f"[{pair}] 触发加仓: 第{adjustment_count + 1}次加仓, 初始金额{initial_stake:.2f}, \
# 加仓金额{additional_stake:.2f}, 价格差{price_diff_pct:.2%}, 当前利润{current_profit:.2%}")
return additional_stake
# 不符合加仓条件返回0
return 0.0
def custom_stake_amount(self, pair: str, current_time: datetime, **kwargs) -> float:
"""
定义初始仓位大小
"""
# 获取默认的基础仓位大小
default_stake = self.stake_amount
# 从kwargs获取最小和最大仓位限制
min_stake = kwargs.get('min_stake', 0.0)
max_stake = kwargs.get('max_stake', default_stake)
# 确保仓位在允许的范围内
adjusted_stake = max(min_stake, min(default_stake, max_stake))
return adjusted_stake