myTestFreqAI/freqtrade/templates/freqaiprimer.py
2026-02-26 02:02:42 +08:00

1763 lines
93 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import warnings
warnings.filterwarnings("ignore", category=UserWarning, module="pandas_ta")
import logging
import pandas as pd
pd.set_option('future.no_silent_downcasting', True)
from freqtrade.strategy import IStrategy, IntParameter, DecimalParameter
from pandas import DataFrame
import pandas_ta as ta
from freqtrade.persistence import Trade
import numpy as np
from datetime import datetime, timezone, timedelta
import math
logger = logging.getLogger(__name__)
# ========== 时区统一管理 ==========
# FreqAI dataframe 的 date 列是 UTC 时间
# 所有虚机/容器统一为 UTC+8
UTC_PLUS_8 = timezone(timedelta(hours=8))
class FreqaiPrimer(IStrategy):
# 策略参数 - 使用custom_roi替代minimal_roi字典
loglevel = "warning"
minimal_roi = {}
add_position_callback = True
# 启用自定义ROI回调函数
use_custom_roi = True
# FreqAI 要求
process_only_new_candles = True
stoploss = -0.15 # 固定止损 -15% (大幅放宽止损以承受更大波动)
trailing_stop = True
trailing_stop_positive_offset = 0.005 # 追踪止损偏移量 0.5% (更容易触发跟踪止盈)
# 用于跟踪市场状态的数据框缓存
_dataframe_cache = None
def __init__(self, config=None):
"""初始化策略参数调用父类初始化方法并接受config参数"""
super().__init__(config) # 调用父类的初始化方法并传递config
# 存储从配置文件加载的默认值
self._trailing_stop_positive_default = 0.004 # 降低默认值以更容易触发跟踪止盈
# 波动系数缓存(简化版:直接计算,无需历史序列)
self._volatility_timestamp = {} # {pair: timestamp}
self._volatility_cache = {} # {pair: volatility_coef}
self._volatility_update_interval = 180 # 波动系数更新间隔3分钟
# 入场间隔控制:记录每个交易对最近一次入场的时间
# 格式: {pair: datetime}
self._last_entry_time = {}
# 冷启动保护:记录策略启动时间
self._strategy_start_time = datetime.now()
# ========== 数据新鲜度监控统计 ==========
# 用于量化和诊断数据延迟问题的严重性
self._freshness_stats = {
'total_checks': 0, # 总检查次数
'fresh_count': 0, # 新鲜数据次数 (< 10分钟)
'warning_count': 0, # 警告次数 (10-30分钟)
'stale_count': 0, # 过期次数 (> 30分钟)
'max_delay_minutes': 0.0, # 最大延迟时间
'total_delay_minutes': 0.0, # 累计延迟时间(用于计算平均值)
'pair_delays': {}, # 每个币对的延迟统计 {pair: {'count': N, 'avg_delay': M, 'max_delay': X}}
'last_report_time': None, # 上次报告时间
}
self._freshness_report_interval = 3600 # 每小时输出一次统计报告
def strategy_log(self, message: str, level: str = "info") -> None:
"""根据 config 的 enable_strategy_log 决定是否输出日志"""
enable_log = self.config.get('enable_strategy_log', False)
if not enable_log:
return
if level.lower() == "debug":
logger.debug(message)
elif level.lower() == "warning":
logger.warning(message)
elif level.lower() == "error":
logger.error(message)
else:
logger.info(message)
def calculate_data_freshness(self, data_timestamp: datetime, pair: str, dataframe: DataFrame) -> float:
"""
计算数据新鲜度与当前时间比较因为dataframe本身可能已过时
:param data_timestamp: 数据时间戳来自dataframe的最新时间
:param pair: 交易对
:param dataframe: 数据框
:return: 数据延迟时间(分钟)
"""
try:
# 转换为UTC+8时间
if data_timestamp.tzinfo is None:
data_timestamp = data_timestamp.replace(tzinfo=timezone.utc)
data_timestamp_utc8 = data_timestamp.astimezone(UTC_PLUS_8).replace(tzinfo=None)
# 使用当前系统时间作为基准因为dataframe本身可能已过时
# 这样可以检测到Freqtrade内部缓存机制导致的数据延迟
current_time = datetime.now(UTC_PLUS_8).replace(tzinfo=None)
freshness_minutes = (current_time - data_timestamp_utc8).total_seconds() / 60
return max(0.0, freshness_minutes) # 确保不会返回负数
except Exception as e:
self.strategy_log(f"[{pair}] 数据新鲜度计算异常: {e}", "warning")
return 3.0 # 异常时返回保守估计
def update_freshness_stats(self, pair: str, data_age_minutes: float) -> None:
"""
更新数据新鲜度统计
:param pair: 交易对
:param data_age_minutes: 数据延迟时间(分钟)
"""
stats = self._freshness_stats
stats['total_checks'] += 1
stats['total_delay_minutes'] += data_age_minutes
stats['max_delay_minutes'] = max(stats['max_delay_minutes'], data_age_minutes)
# 分类统计
if data_age_minutes <= 10:
stats['fresh_count'] += 1
elif data_age_minutes <= 30:
stats['warning_count'] += 1
else:
stats['stale_count'] += 1
# 每个币对的统计
if pair not in stats['pair_delays']:
stats['pair_delays'][pair] = {
'count': 0,
'total_delay': 0.0,
'max_delay': 0.0,
'fresh': 0,
'warning': 0,
'stale': 0
}
pair_stat = stats['pair_delays'][pair]
pair_stat['count'] += 1
pair_stat['total_delay'] += data_age_minutes
pair_stat['max_delay'] = max(pair_stat['max_delay'], data_age_minutes)
if data_age_minutes <= 10:
pair_stat['fresh'] += 1
elif data_age_minutes <= 30:
pair_stat['warning'] += 1
else:
pair_stat['stale'] += 1
def report_freshness_stats(self, force: bool = False) -> None:
"""
输出数据新鲜度统计报告
:param force: 强制输出(忽略时间间隔)
"""
current_time = datetime.now()
stats = self._freshness_stats
# 检查是否需要报告
if not force and stats['last_report_time'] is not None:
elapsed = (current_time - stats['last_report_time']).total_seconds()
if elapsed < self._freshness_report_interval:
return
# 如果没有数据,不输出
if stats['total_checks'] == 0:
return
# 计算统计指标
avg_delay = stats['total_delay_minutes'] / stats['total_checks']
fresh_rate = stats['fresh_count'] / stats['total_checks'] * 100
warning_rate = stats['warning_count'] / stats['total_checks'] * 100
stale_rate = stats['stale_count'] / stats['total_checks'] * 100
# 输出总体报告
self.strategy_log(
f"\n"
f"========================================\n"
f" 📊 数据新鲜度统计报告\n"
f"========================================\n"
f"总检查次数: {stats['total_checks']}\n"
f"新鲜数据 (<10min): {stats['fresh_count']} ({fresh_rate:.1f}%)\n"
f"警告数据 (10-30min): {stats['warning_count']} ({warning_rate:.1f}%)\n"
f"过期数据 (>30min): {stats['stale_count']} ({stale_rate:.1f}%)\n"
f"平均延迟: {avg_delay:.2f} 分钟\n"
f"最大延迟: {stats['max_delay_minutes']:.2f} 分钟\n"
f"========================================",
level="info"
)
# 输出每个币对的详细统计(按过期率排序)
if stats['pair_delays']:
pair_stats_list = []
for pair, pair_stat in stats['pair_delays'].items():
avg_pair_delay = pair_stat['total_delay'] / pair_stat['count']
stale_rate_pair = pair_stat['stale'] / pair_stat['count'] * 100
pair_stats_list.append({
'pair': pair,
'avg_delay': avg_pair_delay,
'max_delay': pair_stat['max_delay'],
'stale_rate': stale_rate_pair,
'count': pair_stat['count']
})
# 按过期率排序(从高到低)
pair_stats_list.sort(key=lambda x: x['stale_rate'], reverse=True)
report_lines = ["📈 各币对数据新鲜度排名(按过期率降序):"]
for i, ps in enumerate(pair_stats_list[:10], 1): # 只显示前10个最差的
report_lines.append(
f" {i}. {ps['pair']:15s} | "
f"平均: {ps['avg_delay']:5.1f}min | "
f"最大: {ps['max_delay']:5.1f}min | "
f"过期率: {ps['stale_rate']:5.1f}% | "
f"检查: {ps['count']}"
)
self.strategy_log("\n".join(report_lines), level="info")
# 更新报告时间
stats['last_report_time'] = current_time
# 只用于adjust_trade_position方法的波动系数获取
def get_volatility_coefficient(self, pair: str) -> float:
"""
获取币对的波动系数(简化版:直接计算,无需历史序列)
- USDT/USDT 波动系数设置为0
- BTC/USDT 波动系数设置为1
- 其他币对:
计算当前波动系数 = 该币对波动率 / BTC/USDT波动率
基于最近200根1h K线足够稳定无需额外平滑
波动系数表示某币对与BTC/USDT相比的波动幅度倍数
- 山寨币的波动系数可能大于3
- 稳定性较高的币对如DOT/USDT波动系数可能小于1
添加了缓存机制每3分钟更新一次避免频繁计算
"""
# 检查特殊币对
if pair == 'USDT/USDT':
return 0.0
elif pair == 'BTC/USDT':
return 1.0
try:
# 获取当前时间戳
current_time = datetime.now().timestamp()
# 检查缓存:如果距离上次计算时间小于更新间隔,则直接返回缓存值
if (pair in self._volatility_cache and
pair in self._volatility_timestamp and
current_time - self._volatility_timestamp[pair] < self._volatility_update_interval):
return self._volatility_cache[pair]
# 直接计算当前波动系数基于最近200根1h K线
current_volatility_coef = self._calculate_current_volatility_coef(pair)
# 更新缓存和时间戳
self._volatility_cache[pair] = current_volatility_coef
self._volatility_timestamp[pair] = current_time
self.strategy_log(f"波动系数计算完成 {pair}: 系数={current_volatility_coef:.4f} (基于最近200根1h K线)")
return current_volatility_coef
except Exception as e:
logger.warning(f"计算波动系数时出错 {pair}: {str(e)}")
# 如果出错尝试返回缓存值否则返回默认值1.0
return self._volatility_cache.get(pair, 1.0)
def _calculate_current_volatility_coef(self, pair: str) -> float:
"""
计算当前的波动系数(该币对波动率 / BTC/USDT波动率
"""
try:
# 获取当前币对的1小时k线数据
current_pair_df, _ = self.dp.get_analyzed_dataframe(pair, '1h')
# 获取BTC/USDT的1小时k线数据
btc_df, _ = self.dp.get_analyzed_dataframe('BTC/USDT', '1h')
# 确保有足够的数据点
if len(current_pair_df) < 2 or len(btc_df) < 2:
return 1.0 # 如果没有足够数据返回默认值1.0
# 对于数据点少于200个的情况使用所有可用数据
# 对于数据点多于200个的情况使用最近200个数据点
current_data = current_pair_df.iloc[-min(200, len(current_pair_df)):]
btc_data = btc_df.iloc[-min(200, len(btc_df)):]
# 计算当前币对的对数收益率和波动率
current_data['returns'] = current_data['close'].pct_change()
current_volatility = current_data['returns'].std() * 100 # 转换为百分比
# 计算BTC/USDT的对数收益率和波动率
btc_data['returns'] = btc_data['close'].pct_change()
btc_volatility = btc_data['returns'].std() * 100 # 转换为百分比
# 避免除以零的情况
if btc_volatility == 0:
return 1.0
# 计算波动系数:当前币对波动率 / BTC/USDT波动率
volatility_coef = current_volatility / btc_volatility
# 设置合理的上下限,避免极端值影响策略
# 上限设置为5.0(非常高波动的币对)
# 下限设置为0.1(非常稳定的币对)
return max(0.1, min(5.0, volatility_coef))
except Exception as e:
logger.warning(f"计算当前波动系数时出错 {pair}: {str(e)}")
return 1.0 # 出错时返回默认值1.0
# 其他辅助方法可以在这里添加
@property
def protections(self):
"""
保护机制配置
基于最新Freqtrade规范保护机制应定义在策略文件中而非配置文件
"""
return [
{
"method": "StoplossGuard",
"lookback_period_candles": 60, # 3小时回看期60根3分钟K线
"trade_limit": 2, # 最多2笔止损交易
"stop_duration_candles": 60, # 暂停180分钟60根3分钟K线
"only_per_pair": False # 仅针对单个币对
},
{
"method": "CooldownPeriod",
"stop_duration_candles": 15 # 45分钟冷却期15根3分钟K线
},
{
"method": "MaxDrawdown",
"lookback_period_candles": 48, # 2.4小时回看期
"trade_limit": 4, # 4笔交易限制
"stop_duration_candles": 24, # 72分钟暂停24根3分钟K线
"max_allowed_drawdown": 0.20 # 20%最大回撤容忍度
}
]
@property
def trailing_stop_positive(self):
"""根据市场状态动态调整跟踪止盈参数"""
# 获取当前市场状态
if self._dataframe_cache is not None and len(self._dataframe_cache) > 0:
current_state = self._dataframe_cache['market_state'].iloc[-1]
if current_state == 'strong_bull':
return 0.007 # 强劲牛市中降低跟踪止盈,让利润奔跑
elif current_state == 'weak_bull':
return 0.005 # 弱势牛市中保持较低的跟踪止盈
return self._trailing_stop_positive_default # 返回默认值
@trailing_stop_positive.setter
def trailing_stop_positive(self, value):
"""设置trailing_stop_positive的默认值"""
self._trailing_stop_positive_default = value
timeframe = "15m" # 主时间框架为 15 分钟
can_short = False # 禁用做空
# 自定义指标参数 - 使用Hyperopt可优化参数
bb_length = IntParameter(10, 30, default=20, optimize=True, load=True, space='buy')
bb_std = DecimalParameter(1.5, 3.0, decimals=1, default=2.0, optimize=True, load=True, space='buy')
rsi_length = IntParameter(7, 21, default=14, optimize=True, load=True, space='buy')
rsi_oversold = IntParameter(30, 50, default=42, optimize=True, load=True, space='buy')
# 入场条件阈值参数
bb_lower_deviation = DecimalParameter(1.01, 1.05, decimals=2, default=1.03, optimize=True, load=True, space='buy')
rsi_bull_threshold = IntParameter(45, 55, default=50, optimize=True, load=True, space='buy')
stochrsi_bull_threshold = IntParameter(30, 40, default=35, optimize=True, load=True, space='buy')
stochrsi_neutral_threshold = IntParameter(20, 30, default=25, optimize=True, load=True, space='buy')
volume_multiplier = DecimalParameter(1.2, 2.0, decimals=1, default=1.5, optimize=True, load=True, space='buy')
bb_width_threshold = DecimalParameter(0.01, 0.03, decimals=3, default=0.02, optimize=True, load=True, space='buy')
min_condition_count = IntParameter(2, 4, default=3, optimize=True, load=True, space='buy')
# 剧烈拉升检测参数 - 使用Hyperopt可优化参数
h1_max_candles = IntParameter(100, 300, default=200, optimize=True, load=True, space='buy')
h1_rapid_rise_threshold = DecimalParameter(0.05, 0.15, decimals=3, default=0.11, optimize=True, load=True, space='buy')
h1_max_consecutive_candles = IntParameter(1, 4, default=2, optimize=True, load=True, space='buy')
# 入场间隔控制参数(分钟)
entry_interval_minutes = IntParameter(20, 200, default=42, optimize=True, load=True, space='buy')
# EMA20斜率阈值参数用于趋势强度过滤
ema20_slope_threshold = DecimalParameter(0.0005, 0.005, decimals=4, default=0.001, optimize=True, load=True, space='buy')
# ML 审核官entry_signal 拒绝入场的阈值(越高越宽松,越低越严格)
ml_entry_signal_threshold = DecimalParameter(0.05, 0.85, decimals=2, default=0.37, optimize=True, load=True, space='buy')
# ML 审核官exit_signal 拒绝出场的阈值(越高越宽松,越低越严格)
ml_exit_signal_threshold = DecimalParameter(0.05, 0.85, decimals=2, default=0.68, optimize=True, load=True, space='buy')
# FreqAI 标签定义entry_signal 的洛底上涨幅度(%
freqai_entry_up_percent = DecimalParameter(0.3, 2.0, decimals=2, default=0.5, optimize=True, load=True, space='buy')
# FreqAI 标签定义exit_signal 的洛底下跌幅度(%
freqai_exit_down_percent = DecimalParameter(0.3, 2.0, decimals=2, default=0.5, optimize=True, load=True, space='buy')
# 定义可优化参数
# 初始入场金额: 75.00
# 加仓次数 相对降幅间隔 加仓金额
# ------- ------------ --------
# 0 N/A 75
# 1 0.045000 36.29
# 2 0.051750 163.31
# 3 0.059513 734.88
# 4 0.068439 3306.96
#
# 累计投入金额: 4316.43
max_entry_adjustments = IntParameter(2, 5, default=4, optimize=False, load=True, space='buy') # 最大加仓次数
add_position_callback = DecimalParameter(0.02, 0.06, decimals=3, default=0.047, optimize=False, load=True, space='buy') # 加仓回调百分比
add_position_growth = DecimalParameter(1.5, 5.0, decimals=2, default=4.5, optimize=False, load=True, space='buy') # 加仓金额增长因子保留2位小数用于hyperopt优化
add_position_multiplier = DecimalParameter(0.2, 2, decimals=2, default=1.35, optimize=False, load=True, space='buy') # 加仓间隔系数保留2位小数用于hyperopt优化
stake_divisor = DecimalParameter(2.0, 12.0, decimals=2, default=9.3, optimize=False, load=True, space='buy') # 加仓金额分母小数类型保留2位小数
# 线性ROI参数 - 用于线性函数: y = (a * (x + k)) + t
roi_param_a = DecimalParameter(-0.0002, -0.00005, decimals=5, default=-0.0001, optimize=True, load=True, space='sell') # 系数a
roi_param_k = IntParameter(20, 150, default=50, optimize=True, load=True, space='sell') # 偏移量k
roi_param_t = DecimalParameter(0.02, 0.18, decimals=3, default=0.06, optimize=True, load=True, space='sell') # 常数项t
# 出场条件阈值参数
exit_bb_upper_deviation = DecimalParameter(0.98, 1.02, decimals=2, default=1.0, optimize=True, load=True, space='sell')
exit_volume_multiplier = DecimalParameter(1.5, 3.0, decimals=1, default=2.0, optimize=True, load=True, space='sell')
rsi_overbought = IntParameter(50, 70, default=58, optimize=True, load=True, space='sell')
def informative_pairs(self):
pairs = self.dp.current_whitelist()
return [(pair, '15m') for pair in pairs] + [(pair, '1h') for pair in pairs]
def _validate_dataframe_columns(self, dataframe: DataFrame, required_columns: list, metadata: dict):
"""
验证数据框中是否包含所有需要的列。
如果缺少列,则记录警告日志。
"""
missing_columns = [col for col in required_columns if col not in dataframe.columns]
if missing_columns:
logger.warning(f"[{metadata['pair']}] 数据框中缺少以下列: {missing_columns}")
# ========================= FreqAI 特征与标签定义 =========================
def feature_engineering_expand_all(self, dataframe: DataFrame, period: int, metadata: dict, **kwargs) -> DataFrame:
"""FreqAI 全量特征:这里先用简单技术指标,后续可逐步扩展。"""
# 使用 rolling 计算 RSI减少看前偏差
delta = dataframe["close"].diff()
gain = delta.where(delta > 0, 0).rolling(window=period).mean()
loss = -delta.where(delta < 0, 0).rolling(window=period).mean()
rs = gain / loss
dataframe[f"%-rsi-{period}"] = 100 - (100 / (1 + rs))
dataframe[f"%-mfi-{period}"] = ta.mfi(dataframe["high"], dataframe["low"], dataframe["close"], dataframe["volume"], length=period)
adx_df = ta.adx(dataframe["high"], dataframe["low"], dataframe["close"], length=period)
adx_col = f"ADX_{period}"
if adx_col in adx_df.columns:
dataframe[f"%-adx-{period}"] = adx_df[adx_col]
return dataframe
def feature_engineering_expand_basic(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame:
"""FreqAI 基础特征。"""
dataframe["%-pct_change"] = dataframe["close"].pct_change().fillna(0)
dataframe["%-raw_volume"] = dataframe["volume"].fillna(0)
dataframe["%-raw_price"] = dataframe["close"].ffill() # 使用 ffill() 替代 fillna(method="ffill")
return dataframe
def feature_engineering_standard(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame:
"""FreqAI 标准时间类特征。"""
if "date" in dataframe.columns:
dataframe["%-day_of_week"] = dataframe["date"].dt.dayofweek
dataframe["%-hour_of_day"] = dataframe["date"].dt.hour
return dataframe
def set_freqai_targets(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame:
"""定义 FreqAI 训练标签:简单二分类版本 + 持仓时长预测。"""
# 从配置中读取预测窗口参数(禁止硬编码)
label_horizon = self.freqai_info.get('feature_parameters', {}).get('label_period_candles', 24)
# 动态计算上涨/下跌阈值
entry_up_percent = self.freqai_entry_up_percent.value / 100.0 # 转换为小数(如 0.01 表示 1%
exit_down_percent = self.freqai_exit_down_percent.value / 100.0
entry_up_threshold = 1.0 + entry_up_percent # 例如 1.01 表示 +1%
exit_down_threshold = 1.0 - exit_down_percent # 例如 0.99 表示 -1%
# 入场标签:未来窗口内的最高价是否超过 +1%
future_max = dataframe["close"].rolling(window=label_horizon, min_periods=1).max().shift(-label_horizon + 1)
dataframe["&s-entry_signal"] = np.where(
future_max > dataframe["close"] * entry_up_threshold,
1,
0,
)
# 出场标签:未来窗口内的最低价是否跌破 -1%
future_min = dataframe["close"].rolling(window=label_horizon, min_periods=1).min().shift(-label_horizon + 1)
dataframe["&s-exit_signal"] = np.where(
future_min < dataframe["close"] * exit_down_threshold,
1,
0,
)
# 新增:未来波动率预测标签(极端化方案)
# 计算当前波动率过10根K线的收盘价波动
current_volatility = dataframe["close"].pct_change().rolling(window=10, min_periods=5).std()
# 计算未来10根K线的波动率向未来移动
future_pct_change = dataframe["close"].pct_change().shift(-1) # 未来的收盘价变化
future_volatility = future_pct_change.rolling(window=10, min_periods=5).std().shift(-9) # 未来10根K线的波动率
# 标签:未来波动率 > 当前波动率 * 1.5 则标记为高波动(趋势启动)
volatility_ratio = future_volatility / (current_volatility + 1e-8) # 避免除以0
dataframe["&s-future_volatility"] = np.where(
volatility_ratio > 1.5,
1, # 未来高波动(趋势启动),继续持有
0 # 未来低波动(震荡市),快速止盈
)
return dataframe
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# ========== 新增ML目标变量数据新鲜度监控 ==========
# 检查FreqAI模型预测数据的新鲜度确保使用最新的预测结果
# 注意ML预测数据通常比普通量化数据滞后一个timeframe约3分钟
# 这是FreqAI的正常工作机制不影响策略有效性
try:
# 获取FreqAI分析过的数据帧
analyzed_df, _ = self.dp.get_analyzed_dataframe(metadata['pair'], self.timeframe)
if len(analyzed_df) > 0:
latest_analyzed_row = analyzed_df.iloc[-1]
latest_date = latest_analyzed_row.get('date')
if latest_date is not None:
# 确保日期是datetime对象
if not isinstance(latest_date, datetime):
try:
from pandas import Timestamp
if isinstance(latest_date, Timestamp):
latest_date = latest_date.to_pydatetime()
except:
pass
# 计算ML数据新鲜度使用统一函数
if isinstance(latest_date, datetime):
ml_age_minutes = self.calculate_data_freshness(latest_date, metadata['pair'], dataframe)
# 检查目标变量的类型和新鲜度
target_vars = []
for col in analyzed_df.columns:
if col.startswith('target_') or col.startswith('pred_'):
target_vars.append(col)
# 如果有多个目标变量,分别检查
if target_vars:
# 检查所有目标变量的列是否存在
available_targets = []
for var in target_vars:
if var in analyzed_df.columns:
available_targets.append(var)
if available_targets:
# 输出每个目标变量的检查结果
target_info = []
for target in available_targets[:3]: # 只显示前3个目标变量
# 由于所有目标变量共享同一时间戳,我们可以统一显示
if ml_age_minutes <= 10:
icon = ""
elif ml_age_minutes <= 30:
icon = "⚠️"
else:
icon = ""
target_info.append(f"{target}({icon}{ml_age_minutes:.1f}min)")
# 如果有更多目标变量,显示总数
if len(available_targets) > 3:
target_info.append(f"...共{len(available_targets)}个目标变量")
self.strategy_log(f"[{metadata['pair']}] ML目标变量数据新鲜度: {', '.join(target_info)}")
else:
# 如果没有目标变量列,显示通用信息
if ml_age_minutes <= 10:
ml_freshness_icon = ""
elif ml_age_minutes <= 30:
ml_freshness_icon = "⚠️"
else:
ml_freshness_icon = ""
self.strategy_log(f"[{metadata['pair']}] ML目标变量数据新鲜度: {ml_freshness_icon} {ml_age_minutes:.1f}min")
else:
# 如果没有检测到目标变量,显示通用信息
if ml_age_minutes <= 10:
ml_freshness_icon = ""
elif ml_age_minutes <= 30:
ml_freshness_icon = "⚠️"
else:
ml_freshness_icon = ""
self.strategy_log(f"[{metadata['pair']}] ML目标变量数据新鲜度: {ml_freshness_icon} {ml_age_minutes:.1f}min")
except Exception as e:
self.strategy_log(f"[{metadata['pair']}] ML数据新鲜度检查失败: {e}")
# 计算 3m 周期的指标
bb_length_value = self.bb_length.value
bb_std_value = self.bb_std.value
rsi_length_value = self.rsi_length.value
# 使用 rolling 计算布林带(减少看前偏差)
bb_ma_3m = dataframe['close'].rolling(window=bb_length_value).mean()
bb_std_3m = dataframe['close'].rolling(window=bb_length_value).std()
dataframe['bb_lower_3m'] = bb_ma_3m - (bb_std_value * bb_std_3m)
dataframe['bb_upper_3m'] = bb_ma_3m + (bb_std_value * bb_std_3m)
# 使用 rolling 计算 RSI减少看前偏差
delta_3m = dataframe['close'].diff()
gain_3m = delta_3m.where(delta_3m > 0, 0).rolling(window=rsi_length_value).mean()
loss_3m = -delta_3m.where(delta_3m < 0, 0).rolling(window=rsi_length_value).mean()
rs_3m = gain_3m / loss_3m
dataframe['rsi_3m'] = 100 - (100 / (1 + rs_3m))
# 新增 StochRSI 指标
stochrsi_3m = ta.stochrsi(dataframe['close'], length=rsi_length_value, rsi_length=rsi_length_value)
dataframe['stochrsi_k_3m'] = stochrsi_3m[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3']
dataframe['stochrsi_d_3m'] = stochrsi_3m[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3']
# 新增 MACD 指标
macd_3m = ta.macd(dataframe['close'], fast=12, slow=26, signal=9)
dataframe['macd_3m'] = macd_3m['MACD_12_26_9']
dataframe['macd_signal_3m'] = macd_3m['MACDs_12_26_9']
dataframe['macd_hist_3m'] = macd_3m['MACDh_12_26_9']
# 使用 ewm 计算 EMA减少看前偏差adjust=False 确保实时计算)
dataframe['ema_50_3m'] = dataframe['close'].ewm(span=50, adjust=False).mean()
dataframe['ema_200_3m'] = dataframe['close'].ewm(span=200, adjust=False).mean()
# 成交量过滤
dataframe['volume_ma'] = dataframe['volume'].rolling(20).mean()
# 计算 ATR 用于动态止损和退出
dataframe['atr'] = ta.atr(dataframe['high'], dataframe['low'], dataframe['close'], length=14)
# 获取 15m 数据
df_15m = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='15m')
# 使用 rolling 计算 RSI减少看前偏差
delta_15m = df_15m['close'].diff()
gain_15m = delta_15m.where(delta_15m > 0, 0).rolling(window=rsi_length_value).mean()
loss_15m = -delta_15m.where(delta_15m < 0, 0).rolling(window=rsi_length_value).mean()
rs_15m = gain_15m / loss_15m
df_15m['rsi_15m'] = 100 - (100 / (1 + rs_15m))
# 使用 ewm 计算 EMA减少看前偏差
df_15m['ema_20_15m'] = df_15m['close'].ewm(span=20, adjust=False).mean()
df_15m['ema_50_15m'] = df_15m['close'].ewm(span=50, adjust=False).mean()
df_15m['ema_200_15m'] = df_15m['close'].ewm(span=200, adjust=False).mean()
# 新增 StochRSI 指标
stochrsi_15m = ta.stochrsi(df_15m['close'], length=rsi_length_value, rsi_length=rsi_length_value)
df_15m['stochrsi_k_15m'] = stochrsi_15m[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3']
df_15m['stochrsi_d_15m'] = stochrsi_15m[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3']
# 新增 MACD 指标
macd_15m = ta.macd(df_15m['close'], fast=12, slow=26, signal=9)
df_15m['macd_15m'] = macd_15m['MACD_12_26_9']
df_15m['macd_signal_15m'] = macd_15m['MACDs_12_26_9']
df_15m['macd_hist_15m'] = macd_15m['MACDh_12_26_9']
# 将 15m 数据重新索引到主时间框架 (3m)
df_15m = df_15m.set_index('date').reindex(dataframe['date']).reset_index()
df_15m = df_15m.rename(columns={'index': 'date'})
df_15m = df_15m[['date', 'rsi_15m', 'ema_20_15m', 'ema_50_15m', 'ema_200_15m']].ffill()
# 合并 15m 数据
dataframe = dataframe.merge(df_15m, how='left', on='date')
# 获取 1h 数据
df_1h = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1h')
# 使用 rolling 计算布林带(减少看前偏差)
bb_ma_1h = df_1h['close'].rolling(window=bb_length_value).mean()
bb_std_1h = df_1h['close'].rolling(window=bb_length_value).std()
df_1h['bb_lower_1h'] = bb_ma_1h - (bb_std_value * bb_std_1h)
df_1h['bb_upper_1h'] = bb_ma_1h + (bb_std_value * bb_std_1h)
# 添加 EMA5 和 EMA20 用于趋势过滤方案2宽松条件
df_1h['ema_5_1h'] = df_1h['close'].ewm(span=5, adjust=False).mean()
df_1h['ema_20_1h'] = df_1h['close'].ewm(span=20, adjust=False).mean()
# 计算 EMA20 斜率(用于趋势强度过滤)
df_1h['ema20_slope'] = df_1h['ema_20_1h'].diff()
# 归一化斜率,使其相对于价格水平标准化
df_1h['ema20_slope_normalized'] = df_1h['ema20_slope'] / df_1h['ema_20_1h']
# 检测 EMA5 向上穿越 EMA20添加安全检查
if len(df_1h) >= 2:
df_1h['ema5_cross_above_ema20'] = (
(df_1h['ema_5_1h'] > df_1h['ema_20_1h']) &
(df_1h['ema_5_1h'].shift(1) <= df_1h['ema_20_1h'].shift(1))
)
else:
# 数据不足时默认为False
df_1h['ema5_cross_above_ema20'] = False
# 使用 rolling 计算 RSI减少看前偏差
delta_1h = df_1h['close'].diff()
gain_1h = delta_1h.where(delta_1h > 0, 0).rolling(window=rsi_length_value).mean()
loss_1h = -delta_1h.where(delta_1h < 0, 0).rolling(window=rsi_length_value).mean()
rs_1h = gain_1h / loss_1h
df_1h['rsi_1h'] = 100 - (100 / (1 + rs_1h))
# 使用 ewm 计算 EMA减少看前偏差
df_1h['ema_50_1h'] = df_1h['close'].ewm(span=50, adjust=False).mean()
df_1h['ema_200_1h'] = df_1h['close'].ewm(span=200, adjust=False).mean()
df_1h['trend_1h'] = df_1h['close'] > df_1h['ema_50_1h'] # 1h上涨趋势
# 新增 StochRSI 指标
stochrsi_1h = ta.stochrsi(df_1h['close'], length=rsi_length_value, rsi_length=rsi_length_value)
df_1h['stochrsi_k_1h'] = stochrsi_1h[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3']
df_1h['stochrsi_d_1h'] = stochrsi_1h[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3']
# 新增 MACD 指标
macd_1h = ta.macd(df_1h['close'], fast=12, slow=26, signal=9)
df_1h['macd_1h'] = macd_1h['MACD_12_26_9']
df_1h['macd_signal_1h'] = macd_1h['MACDs_12_26_9']
df_1h['macd_hist_1h'] = macd_1h['MACDh_12_26_9']
# 验证 MACD 列是否正确生成
#self.strategy_log(f"[{metadata['pair']}] 1小时 MACD 列: {list(macd_1h.columns)}")
# 确保 StochRSI 指标已正确计算
# 将 1h 数据重新索引到主时间框架 (3m),并填充缺失值
# 注意:只使用 ffill()(向前填充),不要用 bfill()(避免数据前瞻偏差)
df_1h = df_1h.set_index('date').reindex(dataframe['date']).ffill().reset_index()
df_1h = df_1h.rename(columns={'index': 'date'})
# Include macd_1h, macd_signal_1h, ema_5_1h, ema_20_1h, ema5_cross_above_ema20 in the column selection
df_1h = df_1h[['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h', 'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h', 'macd_1h', 'macd_signal_1h', 'macd_hist_1h', 'ema_5_1h', 'ema_20_1h', 'ema20_slope_normalized', 'ema5_cross_above_ema20']].ffill()
# Validate that all required columns are present
required_columns = ['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h',
'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h',
'macd_1h', 'macd_signal_1h', 'macd_hist_1h', 'ema_5_1h', 'ema_20_1h', 'ema20_slope_normalized', 'ema5_cross_above_ema20']
missing_columns = [col for col in required_columns if col not in df_1h.columns]
if missing_columns:
logger.error(f"[{metadata['pair']}] 缺少以下列: {missing_columns}")
raise KeyError(f"缺少以下列: {missing_columns}")
# 确保所有需要的列都被合并
required_columns = ['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h',
'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h',
'macd_1h', 'macd_signal_1h', 'macd_hist_1h', 'ema_5_1h', 'ema_20_1h', 'ema20_slope_normalized', 'ema5_cross_above_ema20']
# 验证所需列是否存在
missing_columns = [col for col in required_columns if col not in df_1h.columns]
if missing_columns:
logger.error(f"[{metadata['pair']}] 缺少以下列: {missing_columns}")
raise KeyError(f"缺少以下列: {missing_columns}")
# 确保所有需要的列都被合并
required_columns = ['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h',
'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h',
'macd_1h', 'macd_signal_1h', 'macd_hist_1h', 'ema_5_1h', 'ema_20_1h', 'ema20_slope_normalized', 'ema5_cross_above_ema20']
df_1h = df_1h[required_columns] # 确保包含所有必需的列包括EMA过滤相关列
# 合并 1h 数据
dataframe = dataframe.merge(df_1h, how='left', on='date').ffill()
# 验证合并后的列
#self.strategy_log(f"[{metadata['pair']}] 合并后的数据框列名: {list(dataframe.columns)}")
# K线形态看涨吞没
dataframe['bullish_engulfing'] = (
(dataframe['close'].shift(1) < dataframe['open'].shift(1)) &
(dataframe['close'] > dataframe['open']) &
(dataframe['close'] > dataframe['open'].shift(1)) &
(dataframe['open'] < dataframe['close'].shift(1))
)
# 计算各时间框架的趋势状态(牛/熊)
# 3m时间框架ema50下穿ema200为熊上穿为牛
dataframe['trend_3m'] = np.where(dataframe['ema_50_3m'] > dataframe['ema_200_3m'], 1, 0)
# 15m时间框架ema50下穿ema200为熊上穿为牛
dataframe['trend_15m'] = np.where(dataframe['ema_50_15m'] > dataframe['ema_200_15m'], 1, 0)
# 1h时间框架ema50下穿ema200为熊上穿为牛
dataframe['trend_1h_ema'] = np.where(dataframe['ema_50_1h'] > dataframe['ema_200_1h'], 1, 0)
# 计算熊牛得分0-100
# 权重3m熊牛权重1015m熊牛权重351h熊牛权重65
# 计算加权得分
dataframe['market_score'] = (
dataframe['trend_3m'] * 10 +
dataframe['trend_15m'] * 35 +
dataframe['trend_1h_ema'] * 65
)
# 确保得分在0-100范围内
dataframe['market_score'] = dataframe['market_score'].clip(lower=0, upper=100)
# 根据得分分类市场状态
dataframe['market_state'] = 'neutral'
dataframe.loc[dataframe['market_score'] > 70, 'market_state'] = 'strong_bull'
dataframe.loc[(dataframe['market_score'] > 50) & (dataframe['market_score'] <= 70), 'market_state'] = 'weak_bull'
dataframe.loc[(dataframe['market_score'] >= 30) & (dataframe['market_score'] <= 50), 'market_state'] = 'neutral'
dataframe.loc[(dataframe['market_score'] > 10) & (dataframe['market_score'] < 30), 'market_state'] = 'weak_bear'
dataframe.loc[dataframe['market_score'] <= 10, 'market_state'] = 'strong_bear'
# 创建一个使用前一行市场状态的列避免在populate_entry_trend中使用iloc[-1]
dataframe['prev_market_state'] = dataframe['market_state'].shift(1)
# 为第一行设置默认值
dataframe['prev_market_state'] = dataframe['prev_market_state'].fillna('neutral')
# 记录当前的市场状态
if len(dataframe) > 0:
current_score = dataframe['market_score'].iloc[-1]
current_state = dataframe['market_state'].iloc[-1]
#self.strategy_log(f"[{metadata['pair']}] 熊牛得分: {current_score:.1f}, 市场状态: {current_state}")
# 计算并显示基于未来波动率的最终阈值
if len(dataframe) > 0:
# 检查是否有未来波动率列
future_volatility = None
if '&s-future_volatility' in dataframe.columns:
future_volatility = dataframe['&s-future_volatility'].iloc[-1]
elif '&-future_volatility' in dataframe.columns:
future_volatility = dataframe['&-future_volatility'].iloc[-1]
if future_volatility is not None:
# 确保 future_volatility 在 [0, 1] 范围内
future_volatility = max(0.0, min(1.0, future_volatility))
# 使用与 confirm_trade_entry 相同的线性函数计算阈值
base_signal_ratio_threshold = 1.33
x_min = 0.3
x_max = 0.9
y_min = 1.6
y_max = 1.1
if future_volatility >= x_max:
signal_ratio_threshold = y_max
elif future_volatility <= x_min:
signal_ratio_threshold = y_min
else:
signal_ratio_threshold = y_max + (y_min - y_max) * (future_volatility - x_max) / (x_min - x_max)
# 限制阈值范围
signal_ratio_threshold = max(1.1, min(1.6, signal_ratio_threshold))
self.strategy_log(f"[{metadata['pair']}] 基于未来波动率的最终阈值: {signal_ratio_threshold:.2f} (波动率: {future_volatility:.2f})")
#self.strategy_log(f"[{metadata['pair']}] 各时间框架趋势: 3m={'牛' if dataframe['trend_3m'].iloc[-1] == 1 else '熊'}, \
# 15m={'牛' if dataframe['trend_15m'].iloc[-1] == 1 else '熊'}, \
# 1h={'牛' if dataframe['trend_1h_ema'].iloc[-1] == 1 else '熊'}")
# 调试:打印指标值(最后 5 行),验证时间对齐
#print(f"Pair: {metadata['pair']}, Last 5 rows after reindexing:")
#print(dataframe[['date', 'close', 'bb_lower_3m', 'rsi_3m', 'rsi_15m', 'rsi_1h', 'trend_1h',
# 'trend_3m', 'trend_15m', 'trend_1h_ema', 'market_score', 'market_state',
# 'bullish_engulfing', 'volume', 'volume_ma']].tail(5))
# 打印最终数据框的列名以验证
#self.strategy_log(f"[{metadata['pair']}] 最终数据框列名: {list(dataframe.columns)}")
# 检查FreqAI是否启用
freqai_enabled = False
try:
# 更可靠的检查方法检查config中是否启用了FreqAI
freqai_enabled = self.config.get('freqai', {}).get('enabled', False)
# 同时检查freqai属性是否存在且有效
if freqai_enabled and hasattr(self, 'freqai') and self.freqai is not None:
freqai_enabled = True
else:
freqai_enabled = False
except Exception as e:
self.strategy_log(f"[{metadata['pair']}] 检查FreqAI启用状态时出错: {str(e)}")
freqai_enabled = False
# 只有在FreqAI启用时才调用
if freqai_enabled:
self.strategy_log(f"[{metadata['pair']}] FreqAI已启用启动FreqAI处理")
# 启用 FreqAI在所有指标计算完成后调用
dataframe = self.freqai.start(dataframe, metadata, self)
# ========== 新增记录FreqAI多目标预测结果 ==========
# 检查并记录所有可用的预测列确认XGBoostRegressorMultiTarget是否为所有目标变量生成预测
try:
pred_columns = []
for col in dataframe.columns:
if col.startswith('&-') or col.startswith('&s-'): # FreqAI预测列通常以&-或&s-开头
pred_columns.append(col)
if pred_columns:
# 获取最后一行的预测值
last_row = dataframe.iloc[-1]
pred_values = {}
for col in pred_columns:
if col in last_row:
pred_values[col] = round(float(last_row[col]), 4)
if pred_values:
pred_info = ", ".join([f"{k}:{v}" for k, v in pred_values.items()])
self.strategy_log(f"[{metadata['pair']}] FreqAI多目标预测: {pred_info}")
# 特别记录三个主要目标变量
entry_val = pred_values.get('&s-entry_signal', 'N/A')
exit_val = pred_values.get('&s-exit_signal', 'N/A')
vol_val = pred_values.get('&s-future_volatility', 'N/A')
self.strategy_log(f"[{metadata['pair']}] FreqAI三目标预测 - 入场:{entry_val}, 出场:{exit_val}, 波动率:{vol_val}")
# ========== 新增发送HTTP请求 ==========
# 从配置中读取API URL
api_url = self.config.get('freqai', {}).get('ml_prediction_api_url', '')
if api_url:
import json
import urllib.request
import urllib.error
import os
# 从环境变量获取virtualHostName
virtual_host_name = os.environ.get('VIRTUAL_HOST_NAME')
# 只有正确获取到virtualHostName时才发送请求
if virtual_host_name and virtual_host_name != 'default-vhost':
# 准备请求数据
request_data = {
"hostName": "freqtrade-live-c6b6f357-20260215212649",
"virtualHostName": virtual_host_name,
"pairName": metadata['pair'],
"timestamp": int(last_row['date'].timestamp()),
"values": {
"&s-entry_signal": pred_values.get('&s-entry_signal', 0),
"&s-exit_signal": pred_values.get('&s-exit_signal', 0),
"&s-future_volatility": pred_values.get('&s-future_volatility', 0)
}
}
# 转换为JSON
json_data = json.dumps(request_data).encode('utf-8')
# 发送请求
try:
req = urllib.request.Request(api_url, data=json_data, method='POST')
req.add_header('Content-Type', 'application/json')
with urllib.request.urlopen(req, timeout=5) as response:
status_code = response.getcode()
if status_code == 200:
self.strategy_log(f"[{metadata['pair']}] 成功发送ML预测数据到API (virtualHostName: {virtual_host_name})")
else:
self.strategy_log(f"[{metadata['pair']}] 发送ML预测数据失败状态码: {status_code}")
except urllib.error.URLError as e:
self.strategy_log(f"[{metadata['pair']}] 发送ML预测数据网络错误: {str(e)}")
except Exception as e:
self.strategy_log(f"[{metadata['pair']}] 发送ML预测数据其他错误: {str(e)}")
else:
self.strategy_log(f"[{metadata['pair']}] 未正确获取到virtualHostName跳过发送ML预测数据")
else:
self.strategy_log(f"[{metadata['pair']}] ML预测API URL未配置跳过发送")
except Exception as e:
self.strategy_log(f"[{metadata['pair']}] 记录FreqAI预测结果时出错: {str(e)}")
else:
self.strategy_log(f"[{metadata['pair']}] FreqAI未启用跳过FreqAI处理")
return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 出场信号基于趋势和量价关系
# 条件1: 价格突破布林带上轨(使用可优化的偏差参数)
breakout_condition = dataframe['close'] >= dataframe['bb_upper_1h'] * self.exit_bb_upper_deviation.value
# 条件2: 成交量显著放大(使用可优化的成交量乘数)
volume_spike = dataframe['volume'] > dataframe['volume_ma'] * self.exit_volume_multiplier.value
# 条件3: MACD 下降趋势
macd_downward = dataframe['macd_1h'] < dataframe['macd_signal_1h']
# 条件4: RSI 进入超买区域(使用可优化的超买阈值)
rsi_overbought = dataframe['rsi_1h'] > self.rsi_overbought.value
# 合并所有条件
final_condition = breakout_condition | volume_spike | macd_downward | rsi_overbought
# 设置出场信号
dataframe.loc[final_condition, 'exit_long'] = 1
# 设置出场价格上浮1.25%(使用乘法避免除零风险)
# Freqtrade 会优先使用 exit_price 列作为限价单价格
final_exit_condition = dataframe['exit_long'] == 1
#dataframe.loc[final_exit_condition, 'exit_price'] = dataframe.loc[final_exit_condition, 'close'] * 1.0125
# 增强调试信息
#self.strategy_log(f"[{metadata['pair']}] 出场条件检查:")
#self.strategy_log(f" - 价格突破布林带上轨: {breakout_condition.sum()} 次")
#self.strategy_log(f" - 成交量显著放大: {volume_spike.sum()} 次")
#self.strategy_log(f" - MACD 下降趋势: {macd_downward.sum()} 次")
#self.strategy_log(f" - RSI 超买: {rsi_overbought.sum()} 次")
#self.strategy_log(f" - 最终条件: {final_condition.sum()} 次")
#self.strategy_log(f" - 使用参数: exit_bb_upper_deviation={self.exit_bb_upper_deviation.value}, exit_volume_multiplier={self.exit_volume_multiplier.value}, rsi_overbought={self.rsi_overbought.value}")
# 日志记录
#if dataframe['exit_long'].sum() > 0:
# self.strategy_log(f"[{metadata['pair']}] 触发出场信号数量: {dataframe['exit_long'].sum()}")
return dataframe
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 确保prev_market_state列存在
if 'prev_market_state' not in dataframe.columns:
dataframe['prev_market_state'] = 'neutral'
# 条件1: 价格接近布林带下轨(允许一定偏差)
close_to_bb_lower_1h = (dataframe['close'] <= dataframe['bb_lower_1h'] * self.bb_lower_deviation.value) # 可优化偏差
# 条件2: RSI 不高于阈值(根据市场状态动态调整)
# 为每一行创建动态阈值
rsi_condition_1h = dataframe.apply(lambda row:
row['rsi_1h'] < self.rsi_bull_threshold.value if row['prev_market_state'] in ['strong_bull', 'weak_bull'] else row['rsi_1h'] < self.rsi_oversold.value,
axis=1)
# 条件3: StochRSI 处于超卖区域(根据市场状态动态调整)
stochrsi_condition_1h = dataframe.apply(lambda row:
(row['stochrsi_k_1h'] < self.stochrsi_bull_threshold.value and row['stochrsi_d_1h'] < self.stochrsi_bull_threshold.value) if row['prev_market_state'] in ['strong_bull', 'weak_bull']
else (row['stochrsi_k_1h'] < self.stochrsi_neutral_threshold.value and row['stochrsi_d_1h'] < self.stochrsi_neutral_threshold.value),
axis=1)
# 条件4: MACD 上升趋势
macd_condition_1h = dataframe['macd_1h'] > dataframe['macd_signal_1h']
# 条件5: 成交量显著放大(可选条件)
volume_spike = dataframe['volume'] > dataframe['volume_ma'] * self.volume_multiplier.value
# 条件6: 布林带宽度过滤(避免窄幅震荡)
bb_width = (dataframe['bb_upper_1h'] - dataframe['bb_lower_1h']) / dataframe['close']
bb_width_condition = bb_width > self.bb_width_threshold.value # 可优化的布林带宽度阈值
# 辅助条件: 3m 和 15m 趋势确认(允许部分时间框架不一致)
trend_confirmation = (dataframe['trend_3m'] == 1) | (dataframe['trend_15m'] == 1)
# 新增EMA趋势过滤条件方案2宽松版本
# 条件1EMA5保持在EMA20之上 或 条件2最近20根1h K线内发生过向上穿越
# 这样既能捕捉趋势启动,又能在趋势延续时继续入场
if 'ema_5_1h' in dataframe.columns and 'ema_20_1h' in dataframe.columns:
# 条件1EMA5保持在EMA20之上
ema5_above_ema20 = dataframe['ema_5_1h'] > dataframe['ema_20_1h']
# 条件2最近20根1h K线内发生过向上穿越
if 'ema5_cross_above_ema20' in dataframe.columns:
# 使用rolling.max检查最近20根K线内是否有True值
recent_cross = dataframe['ema5_cross_above_ema20'].rolling(window=20, min_periods=1).max() == 1
# 两个条件满足其一即可
ema5_20_condition = ema5_above_ema20 | recent_cross
else:
# 如果没有交叉列,只用保持在上方的条件
ema5_20_condition = ema5_above_ema20
# 新增条件EMA20斜率阈值检查
if 'ema20_slope_normalized' in dataframe.columns:
# EMA20斜率为正上升趋势且超过阈值
ema20_positive_slope = dataframe['ema20_slope_normalized'] > self.ema20_slope_threshold.value
# 组合条件EMA5/EMA20条件 AND EMA20斜率条件
ema_trend_filter = ema5_20_condition & ema20_positive_slope
else:
# 如果没有斜率列只用EMA5/EMA20条件
ema_trend_filter = ema5_20_condition
else:
# 如果列不存在创建一个全False的Series不允许入场
self.strategy_log(f"[{metadata['pair']}] 警告ema_5_1h或ema_20_1h列不存在过滤条件设为False")
ema_trend_filter = pd.Series(False, index=dataframe.index)
# 合并所有条件(减少强制性条件)
# 至少满足多个条件中的一定数量并且必须满足EMA趋势过滤
condition_count = (
close_to_bb_lower_1h.astype(int) +
rsi_condition_1h.astype(int) +
stochrsi_condition_1h.astype(int) +
macd_condition_1h.astype(int) +
(volume_spike | bb_width_condition).astype(int) + # 成交量或布林带宽度满足其一即可
trend_confirmation.astype(int)
)
# 条件1超卖反弹入场原逻辑
# 要求:基本条件满足 + EMA趋势过滤 + 价格低于15分钟EMA20
price_below_ema20_15m = dataframe['close'] < dataframe['ema_20_15m']
basic_condition = condition_count >= self.min_condition_count.value
pullback_entry = basic_condition & ema_trend_filter & price_below_ema20_15m
# 条件2趋势跟踪入场新增逻辑
# 不要求价格低于EMA20而是要求
# - EMA趋势过滤满足
# - MACD上升趋势
# - 趋势确认3m或15m趋势向上
# - 价格高于15分钟EMA20表明处于上涨趋势
price_above_ema20_15m = dataframe['close'] >= dataframe['ema_20_15m']
# 趋势跟踪需要的条件数量较少,因为已经在明显上涨趋势中
trend_condition_count = (
macd_condition_1h.astype(int) +
trend_confirmation.astype(int) +
(volume_spike | bb_width_condition).astype(int)
)
# 趋势跟踪只需要满足2个条件即可降低门槛
trend_basic_condition = trend_condition_count >= 2
trend_following_entry = trend_basic_condition & ema_trend_filter & price_above_ema20_15m
# 最终条件:超卖反弹入场 或 趋势跟踪入场
final_condition = pullback_entry | trend_following_entry
# 设置入场信号
dataframe.loc[final_condition, 'enter_long'] = 1
# ========== 新增:入场诊断统计(回测可用) ==========
# 先输出最新数据时间dataframe最后一行用于判断数据新鲜度
if 'date' in dataframe.columns and len(dataframe) > 0:
latest_data_date = dataframe['date'].iloc[-1]
try:
if isinstance(latest_data_date, datetime):
display_latest = latest_data_date
if display_latest.tzinfo is None:
display_latest = display_latest.replace(tzinfo=timezone.utc)
display_latest = display_latest.astimezone(UTC_PLUS_8)
latest_time_str = display_latest.strftime('%H:%M:%S')
# 计算数据新鲜度(使用统一函数)
data_freshness = self.calculate_data_freshness(latest_data_date, metadata['pair'], dataframe)
# 数据新鲜度判断
if data_freshness <= 3: # 3分钟内为新鲜数据
fresh_icon = ""
elif data_freshness <= 10: # 3-10分钟为可接受延迟
fresh_icon = "⚠️"
else: # 超过10分钟为过期数据
fresh_icon = ""
self.strategy_log(
f"[{metadata['pair']}] 数据新鲜度: {fresh_icon} {data_freshness:.1f}min | "
f"最新K线: {latest_time_str} | 总行数: {len(dataframe)}"
)
else:
from pandas import Timestamp
if isinstance(latest_data_date, Timestamp):
display_latest = latest_data_date.to_pydatetime()
if display_latest.tzinfo is None:
display_latest = display_latest.replace(tzinfo=timezone.utc)
display_latest = display_latest.astimezone(UTC_PLUS_8)
latest_time_str = display_latest.strftime('%H:%M:%S')
latest_naive = display_latest.replace(tzinfo=None)
# 计算数据新鲜度(使用统一函数)
data_freshness = self.calculate_data_freshness(latest_data_date, metadata['pair'], dataframe)
# 数据新鲜度判断
if data_freshness <= 3: # 3分钟内为新鲜数据
fresh_icon = ""
elif data_freshness <= 10: # 3-10分钟为可接受延迟
fresh_icon = "⚠️"
else: # 超过10分钟为过期数据
fresh_icon = ""
self.strategy_log(
f"[{metadata['pair']}] 数据新鲜度: {fresh_icon} {data_freshness:.1f}min | "
f"最新K线: {latest_time_str} | 总行数: {len(dataframe)}"
)
except Exception as e:
self.strategy_log(f"[{metadata['pair']}] 数据新鲜度计算异常: {e}")
# ========== 诊断统计结束 ==========
# 设置入场价格下调1.67%(使用乘法避免除零风险)
final_condition_updated = dataframe['enter_long'] == 1
#dataframe.loc[final_condition_updated, 'enter_price'] = dataframe.loc[final_condition_updated, 'close'] * 0.9833
# 增强调试信息
# 确保ema_trend_filter是Series类型才能调用sum()
if isinstance(ema_trend_filter, pd.Series):
ema_trend_count = ema_trend_filter.sum()
else:
ema_trend_count = 0
basic_condition_count = basic_condition.sum()
final_condition_count = final_condition.sum()
self.strategy_log(f"[{metadata['pair']}] 入场条件检查:")
self.strategy_log(f" - 价格接近布林带下轨: {close_to_bb_lower_1h.sum()}")
self.strategy_log(f" - RSI 超卖: {rsi_condition_1h.sum()}")
self.strategy_log(f" - StochRSI 超卖: {stochrsi_condition_1h.sum()}")
self.strategy_log(f" - MACD 上升趋势: {macd_condition_1h.sum()}")
self.strategy_log(f" - 成交量或布林带宽度: {(volume_spike | bb_width_condition).sum()}")
self.strategy_log(f" - 价格低于15分钟EMA20: {price_below_ema20_15m.sum()}")
self.strategy_log(f" - 价格高于15分钟EMA20: {price_above_ema20_15m.sum()}")
self.strategy_log(f" - 趋势确认: {trend_confirmation.sum()}")
self.strategy_log(f" - EMA趋势过滤(在上方或20根K线内穿越): {ema_trend_count}")
self.strategy_log(f" - 超卖反弹入场信号: {pullback_entry.sum()}")
self.strategy_log(f" - 趋势跟踪入场信号: {trend_following_entry.sum()}")
self.strategy_log(f" - 最终入场信号(总计): {final_condition.sum()}")
# ========== 新增最近5个时间点的条件满足状况可视化 ==========
# 显示最近5个时间点的条件满足情况更直观
if len(dataframe) >= 5:
recent_indices = dataframe.index[-5:]
self.strategy_log(f"[{metadata['pair']}] 最近5个时间点条件满足状况:")
# 辅助函数根据数值距离阈值的程度返回颜色emoji
def get_color_emoji(value, threshold, condition_type='less_than'):
"""
根据数值距离阈值的程度返回颜色emoji
condition_type: 'less_than' 表示值小于阈值为满足条件
'greater_than' 表示值大于阈值为满足条件
"""
if condition_type == 'less_than':
# value越小越满足条件所以距离阈值越远(值越大)越红
distance = value - threshold # 距离阈值的距离,正值表示未满足,负值表示已满足
else: # greater_than
# value越大越满足条件所以距离阈值越远(值越小)越红
distance = threshold - value # 距离阈值的距离,正值表示未满足,负值表示已满足
# 根据距离阈值的程度返回不同颜色
if condition_type == 'less_than':
is_satisfied = value < threshold
else: # greater_than
is_satisfied = value > threshold
if is_satisfied:
# 条件已满足,使用绿色系
if distance < -threshold * 0.5: # 远超阈值
return '🟢' # 深绿
elif distance < -threshold * 0.2: # 明显超越阈值
return '🟢' # 绿色
else: # 接近阈值但仍满足
return '🟡' # 黄绿色
else:
# 条件未满足,使用红色系
if distance > abs(threshold) * 0.5: # 远离阈值
return '🔴' # 深红
elif distance > abs(threshold) * 0.2: # 接近阈值
return '🟠' # 橙色
else: # 非常接近阈值
return '🟡' # 黄色(接近满足)
# RSI 超卖条件可视化RSI越小越好
rsi_values = dataframe['rsi_1h'].iloc[-5:]
rsi_threshold = self.rsi_bull_threshold.value # 使用牛市阈值作为参考
rsi_visual = ''.join([get_color_emoji(val, rsi_threshold, 'less_than') for val in rsi_values])
self.strategy_log(f" - [{rsi_visual}]#RSI 超卖(越红越大)")
# StochRSI 超卖条件可视化StochRSI越小越好
stochrsi_values = dataframe['stochrsi_k_1h'].iloc[-5:]
stochrsi_threshold = 0.2 # 固定阈值
stochrsi_visual = ''.join([get_color_emoji(val, stochrsi_threshold, 'less_than') for val in stochrsi_values])
self.strategy_log(f" - [{stochrsi_visual}]#StochRSI超卖(越红越大)")
# MACD 上升趋势条件可视化MACD直方图越大越好
macd_values = dataframe['macd_hist_1h'].iloc[-5:]
macd_threshold = 0 # 直方图大于0为上升趋势
macd_visual = ''.join([get_color_emoji(val, macd_threshold, 'greater_than') for val in macd_values])
self.strategy_log(f" - [{macd_visual}]#MACD上升趋势(越绿越大)")
# 趋势确认条件可视化(布尔值,使用原来的✅❌)
trend_recent = trend_confirmation.iloc[-5:].tolist()
trend_visual = ''.join(['' if x else '' for x in trend_recent])
self.strategy_log(f" - [{trend_visual}]#趋势确认")
# 价格接近布林带下轨条件可视化(价格相对于布林带下轨的比例,越小越好)
price_bb_ratios = (dataframe['close'] / dataframe['bb_lower_1h']).iloc[-5:]
bb_threshold = self.bb_lower_deviation.value # 价格/布林带下轨应该小于这个值
bb_lower_visual = ''.join([get_color_emoji(val, bb_threshold, 'less_than') for val in price_bb_ratios])
self.strategy_log(f" - [{bb_lower_visual}]#价格接近布林带下轨(越绿越近)")
# 如果EMA条件满足但最终条件未满足输出详细信息
if ema_trend_count > 0 and final_condition_count == 0:
self.strategy_log(f"[{metadata['pair']}] 注意:检测到 {ema_trend_count} 次EMA趋势过滤满足但由于其他条件不足未能生成入场信号")
# 在populate_entry_trend方法末尾添加
# 计算条件间的相关性
conditions = DataFrame({
'close_to_bb': close_to_bb_lower_1h,
'rsi': rsi_condition_1h,
'stochrsi': stochrsi_condition_1h,
'macd': macd_condition_1h,
'vol_bb': (volume_spike | bb_width_condition),
'trend': trend_confirmation,
'ema_trend': ema_trend_filter
})
correlation = conditions.corr().mean().mean()
#self.strategy_log(f"[{metadata['pair']}] 条件平均相关性: {correlation:.2f}")
# 日志记录
#if dataframe['enter_long'].sum() > 0:
# self.strategy_log(f"[{metadata['pair']}] 发现入场信号数量: {dataframe['enter_long'].sum()}")
return dataframe
def detect_h1_rapid_rise(self, pair: str) -> bool:
"""
检测1小时K线图上的剧烈拉升情况轻量级版本用于confirm_trade_entry
参数:
- pair: 交易对
返回:
- bool: 是否处于不稳固区域
"""
try:
# 获取1小时K线数据
df_1h = self.dp.get_pair_dataframe(pair=pair, timeframe='1h')
# 获取当前优化参数值
max_candles = self.h1_max_candles.value
rapid_rise_threshold = self.h1_rapid_rise_threshold.value
max_consecutive_candles = self.h1_max_consecutive_candles.value
# 确保有足够的K线数据
if len(df_1h) < max_candles:
logger.warning(f"[{pair}] 1h K线数据不足 {max_candles} 根,当前只有 {len(df_1h)} 根,无法完整检测剧烈拉升")
return False
# 获取最近的K线
recent_data = df_1h.iloc[-max_candles:].copy()
# 检查连续最多几根K线内的最大涨幅
rapid_rise_detected = False
max_rise = 0
for i in range(len(recent_data) - max_consecutive_candles + 1):
window_data = recent_data.iloc[i:i + max_consecutive_candles]
window_low = window_data['low'].min()
window_high = window_data['high'].max()
# 计算区间内的最大涨幅
if window_low > 0:
rise_percentage = (window_high - window_low) / window_low
if rise_percentage > max_rise:
max_rise = rise_percentage
# 检查是否超过阈值
if rise_percentage >= rapid_rise_threshold:
rapid_rise_detected = True
#self.strategy_log(f"[{pair}] 检测到剧烈拉升: 从 {window_low:.2f} 到 {window_high:.2f} ({rise_percentage:.2%}) 在 {max_consecutive_candles} 根K线内")
break
current_price = recent_data['close'].iloc[-1]
#self.strategy_log(f"[{pair}] 剧烈拉升检测结果: {'不稳固' if rapid_rise_detected else '稳固'}")
#self.strategy_log(f"[{pair}] 最近最大涨幅: {max_rise:.2%}")
return rapid_rise_detected
except Exception as e:
logger.error(f"[{pair}] 剧烈拉升检测过程中发生错误: {str(e)}")
return False
def confirm_trade_entry(
self,
pair: str,
order_type: str,
amount: float,
rate: float,
time_in_force: str,
current_time: datetime,
entry_tag: str | None,
side: str,
**kwargs,
) -> bool:
"""
交易买入前的确认函数,用于最终决定是否执行交易
此处实现从API获取数据、高波动禁止入场、时间戳和RMSE检查逻辑
"""
self.strategy_log(f"[{pair}] confirm_trade_entry 被调用 - 价格: {rate:.8f}, 时间: {current_time}")
# 仅对多头交易进行检查
if side != 'long':
self.strategy_log(f"[{pair}] 非多头交易,跳过入场检查")
return False
# 记录所有拒绝条件的列表
rejected_conditions = []
# 冷启动保护程序启动后前20分钟内不允许入场
cold_start_rejected = False
if hasattr(self, "_strategy_start_time"):
# 在回测模式下不启用冷启动保护
if not self.config.get("dry_run", False):
warmup_minutes = 20
# current_time 来自框架,是 offset-aware (UTC)
# _strategy_start_time 是 offset-naive (本地时间)
# 需要统一处理:把 current_time 转换为本地时间并移除时区信息
if current_time.tzinfo is not None:
local_current_time = current_time.astimezone(UTC_PLUS_8).replace(tzinfo=None)
else:
local_current_time = current_time
elapsed_minutes = (local_current_time - self._strategy_start_time).total_seconds() / 60.0
if elapsed_minutes < warmup_minutes:
rejected_conditions.append(f"冷启动保护: 策略启动未满 {warmup_minutes} 分钟(已运行 {elapsed_minutes:.1f} 分钟)")
self.strategy_log(f"[{pair}] {rejected_conditions[-1]}")
cold_start_rejected = True
# 检查1入场间隔控制使用hyperopt参数
interval_rejected = False
if pair in self._last_entry_time and not cold_start_rejected:
last_entry = self._last_entry_time[pair]
time_diff = (current_time - last_entry).total_seconds() * 0.0166666667 # 转换为分钟(使用乘法避免除法)
if time_diff < self.entry_interval_minutes.value:
rejected_conditions.append(f"入场间隔不足: 距离上次入场 {time_diff:.1f}分钟 < {self.entry_interval_minutes.value}分钟")
self.strategy_log(f"[{pair}] {rejected_conditions[-1]}")
interval_rejected = True
# 检查2检查是否处于剧烈拉升的不稳固区域
unstable_rejected = False
if not cold_start_rejected and not interval_rejected:
is_unstable_region = self.detect_h1_rapid_rise(pair)
if is_unstable_region:
rejected_conditions.append("剧烈拉升检测: 检测到1小时图剧烈拉升")
self.strategy_log(f"[{pair}] {rejected_conditions[-1]}")
unstable_rejected = True
# 如果有任何拒绝条件,直接返回 False
if rejected_conditions:
self.strategy_log(f"[{pair}] 入场被拒绝,原因: {', '.join(rejected_conditions)}")
return False
# 所有条件通过,允许入场并记录入场时间
self.strategy_log(f"[{pair}] 所有条件通过,允许入场")
self._last_entry_time[pair] = current_time
self.strategy_log(f"[{pair}] 更新入场时间为: {current_time}")
return True
def custom_stoploss(self, pair: str, trade: 'Trade', current_time, current_rate: float,
current_profit: float, **kwargs) -> float:
# 动态止损基于ATR
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1]
atr = last_candle['atr']
# 获取当前市场状态
current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else 'unknown'
# 渐进式止损策略
if current_profit > 0.05: # 利润超过5%时
return -3.0 * atr / current_rate
elif current_profit > 0.03: # 利润超过3%时
return -2.5 * atr / current_rate
elif current_profit > 0.01: # 利润超过1%时
return -2.0 * atr / current_rate
# 在强劲牛市中,即使小亏损也可以容忍更大回调
if current_state == 'strong_bull' and current_profit > -0.01:
return -1.8 * atr / current_rate
if atr > 0:
return -1.2 * atr / current_rate
return self.stoploss
def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float,
rate: float, time_in_force: str, exit_reason: str,
current_time: datetime, **kwargs) -> bool:
"""
智能出场确认结合ML预测信号来决定是否出场
"""
# 计算持仓时长(分钟)
trade_age_minutes = (current_time - trade.open_date_utc).total_seconds() / 60
# 设置最小持仓时间保护,避免交易刚开仓就立刻被平仓
min_hold_minutes = 3 # 至少持有3分钟
if trade_age_minutes < min_hold_minutes:
self.strategy_log(f"[{pair}] 交易开仓时间仅{trade_age_minutes:.1f}分钟,小于最小持仓时间{min_hold_minutes}分钟,阻止出场")
return False
# 检查FreqAI是否启用
freqai_enabled = False
try:
# 更可靠的检查方法检查config中是否启用了FreqAI
freqai_enabled = self.config.get('freqai', {}).get('enabled', False)
# 同时检查freqai属性是否存在且有效
if freqai_enabled and hasattr(self, 'freqai') and self.freqai is not None:
freqai_enabled = True
else:
freqai_enabled = False
except Exception as e:
self.strategy_log(f"[{pair}] 检查FreqAI启用状态时出错: {str(e)}")
freqai_enabled = False
try:
if freqai_enabled:
self.strategy_log(f"[{pair}] FreqAI已启用使用ML信号进行出场决策")
# 获取当前的ML预测数据
df, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
if len(df) > 0:
last_row = df.iloc[-1]
# 获取当前入场信号概率
current_entry_prob = None
if '&s-entry_signal' in df.columns:
current_entry_prob = float(last_row['&s-entry_signal'])
elif '&-entry_signal_prob' in df.columns:
current_entry_prob = float(last_row['&-entry_signal_prob'])
elif '&-s-entry_signal_prob' in df.columns:
current_entry_prob = float(last_row['&-s-entry_signal_prob'])
# 获取当前出场信号概率
current_exit_prob = None
if '&s-exit_signal' in df.columns:
current_exit_prob = float(last_row['&s-exit_signal'])
elif '&-exit_signal_prob' in df.columns:
current_exit_prob = float(last_row['&-exit_signal_prob'])
elif '&-s-exit_signal_prob' in df.columns:
current_exit_prob = float(last_row['&-s-exit_signal_prob'])
# 获取波动率预测
predicted_volatility = None
if '&-target_volatility' in df.columns:
predicted_volatility = float(last_row['&-target_volatility'])
# 获取未来波动率信号
future_volatility = None
if '&s-future_volatility' in df.columns:
future_volatility = float(last_row['&s-future_volatility'])
elif '&-future_volatility' in df.columns:
future_volatility = float(last_row['&-future_volatility'])
# 获取趋势预测
trend_prediction = None
if '&-target_trend' in df.columns:
trend_prediction = float(last_row['&-target_trend'])
# 应用新规则当exit_signal > entry_signal × 阈值时退出
# 基础信号比率阈值
base_signal_ratio_threshold = 1.33
signal_ratio_threshold = base_signal_ratio_threshold
# 根据未来波动率调整阈值(线性函数)
if future_volatility is not None:
# 确保 future_volatility 在 [0, 1] 范围内
future_volatility = max(0.0, min(1.0, future_volatility))
# 使用线性函数x范围 0.90.3y范围 1.11.6
# 当波动率为0.9时阈值为1.1当波动率为0.3时阈值为1.6
x_min = 0.3
x_max = 0.9
y_min = 1.6
y_max = 1.1
# 线性插值
if future_volatility >= x_max:
# 高于最高波动率阈值,使用最小阈值
signal_ratio_threshold = y_max
elif future_volatility <= x_min:
# 低于最低波动率阈值,使用最大阈值
signal_ratio_threshold = y_min
else:
# 中间值线性插值
signal_ratio_threshold = y_max + (y_min - y_max) * (future_volatility - x_max) / (x_min - x_max)
# 限制阈值范围
signal_ratio_threshold = max(1.1, min(1.6, signal_ratio_threshold))
if current_entry_prob is not None and current_exit_prob is not None:
# 确保概率在 [0, 1] 范围内
current_entry_prob = max(0.0, min(1.0, current_entry_prob))
current_exit_prob = max(0.0, min(1.0, current_exit_prob))
if current_exit_prob > current_entry_prob * signal_ratio_threshold:
volatility_info = f",未来波动率: {future_volatility:.2f},调整后阈值: {signal_ratio_threshold:.2f}" if future_volatility is not None else ""
self.strategy_log(f"[{pair}] exit_signal ({current_exit_prob:.2f}) 超过 entry_signal ({current_entry_prob:.2f}) 的 {signal_ratio_threshold:.2f} 倍,支持出场{volatility_info}")
return True
# 原有ML预测逻辑作为补充
if current_entry_prob is not None and predicted_volatility is not None:
# 如果当前入场信号概率很高,说明市场条件仍然良好,可以继续持有
if current_entry_prob > 0.6:
self.strategy_log(f"[{pair}] 当前入场信号概率高({current_entry_prob:.2f}),建议继续持有")
return False # 阻止出场
# 如果当前入场信号概率很低,支持出场
if current_entry_prob < 0.25:
self.strategy_log(f"[{pair}] 当前入场信号概率低({current_entry_prob:.2f}),支持出场")
return True # 支持出场
# 如果波动率预测很低但入场信号概率较高,可能不应该立即出场
if predicted_volatility < 0.35 and current_entry_prob > 0.55:
self.strategy_log(f"[{pair}] 低波动+高入场信号({current_entry_prob:.2f}),建议继续持有等待机会")
return False # 阻止因低波动而强制出场
# 如果波动率预测很低且入场信号概率也低,支持出场
if predicted_volatility < 0.35 and current_entry_prob < 0.3:
self.strategy_log(f"[{pair}] 低波动+低入场信号({current_entry_prob:.2f}),支持出场")
return True # 支持出场
# 如果有趋势预测且预测为下跌趋势,支持出场
if trend_prediction is not None and trend_prediction < 0.3:
self.strategy_log(f"[{pair}] 趋势预测为下跌({trend_prediction:.2f}),支持出场")
return True
else:
self.strategy_log(f"[{pair}] FreqAI未启用使用默认出场逻辑")
# 如果无法获取ML数据或FreqAI未启用使用默认逻辑
# 只有当exit_reason是exit_signal时才允许出场
return exit_reason == 'exit_signal'
except Exception as e:
self.strategy_log(f"[{pair}] ML出场确认检查失败使用默认逻辑: {e}")
# 发生错误时,使用默认逻辑
return exit_reason == 'exit_signal'
def custom_exit(self, pair: str, trade: Trade, current_time: datetime, current_rate: float,
current_profit: float, **kwargs) -> float:
if trade.is_short:
return 0.0
trade_age_minutes = (current_time - trade.open_date_utc).total_seconds() / 60
if trade_age_minutes < 0:
trade_age_minutes = 0
# 使用可优化的线性函数: y = (a * (x + k)) + t
a = self.roi_param_a.value # 系数a (可优化参数)
k = self.roi_param_k.value # 偏移量k (可优化参数)
t = self.roi_param_t.value # 常数项t (可优化参数)
dynamic_roi_threshold = (a * (trade_age_minutes + k)) + t
# 确保ROI阈值不小于0
if dynamic_roi_threshold < 0:
dynamic_roi_threshold = 0.0
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
# 获取市场状态,使用多种回退方案
if 'market_state' in dataframe.columns:
current_state = dataframe['market_state'].iloc[-1]
elif 'sma' in dataframe.columns and len(dataframe) > 1:
# 使用 SMA 斜率判断市场状态
sma_diff = dataframe['sma'].diff().iloc[-1]
if sma_diff > 0.01:
current_state = 'strong_bull'
elif sma_diff > 0:
current_state = 'weak_bull'
else:
current_state = 'neutral'
else:
# 默认为中性
current_state = 'neutral'
entry_tag = trade.enter_tag if hasattr(trade, 'enter_tag') else None
profit_ratio = current_profit / dynamic_roi_threshold if dynamic_roi_threshold > 0 else 0
exit_ratio = 0.0
if profit_ratio >= 1.0:
if current_state == 'strong_bull':
exit_ratio = 0.5 if profit_ratio < 1.5 else 0.8
elif current_state == 'weak_bull':
exit_ratio = 0.6 if profit_ratio < 1.2 else 0.9
else:
exit_ratio = 1.0
if entry_tag == 'strong_trend':
exit_ratio *= 0.8
if dynamic_roi_threshold < 0:
exit_ratio = 1.0
#self.strategy_log(f"[{pair}] 动态止盈: 持仓时间={trade_age_minutes:.1f}分钟, 当前利润={current_profit:.2%}, "
# f"动态ROI阈值={dynamic_roi_threshold:.4f}, 利润比值={profit_ratio:.2f}, "
# f"市场状态={current_state}, entry_tag={entry_tag}, 退出比例={exit_ratio:.0%}")
# 当决定退出时,输出出场价格信息(但不调整价格)
if exit_ratio > 0:
self.strategy_log(f"[{pair}] 准备出场 - 市场价: {current_rate:.8f}, 退出比例: {exit_ratio:.0%}")
return exit_ratio
def adjust_trade_position(self, trade: 'Trade', current_time, current_rate: float,
current_profit: float, min_stake: float, max_stake: float, **kwargs) -> float:
"""
根据用户要求实现加仓逻辑
- 加仓间隔设置为可优化参数 add_position_callback
- 加仓额度为: (stake_amount / stake_divisor) ^ (加仓次数 - 1)
"""
# 获取当前交易对
pair = trade.pair
# 获取当前交易的加仓次数
entry_count = len(trade.orders) # 获取所有入场订单数量
# 如果已经达到最大加仓次数,则不再加仓
if entry_count - 1 >= self.max_entry_adjustments.value:
return 0.0
# 获取初始入场价格和当前价格的差值百分比
initial_price = trade.open_rate
if initial_price == 0:
return 0.0
price_diff_pct = (current_rate - initial_price) / initial_price
# 计算加仓次数从1开始计数
adjustment_count = entry_count - 1 # 已加仓次数
# 检查价格回调是否达到加仓间隔
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else 'neutral'
# 计算当前所需的加仓间隔百分比 = 基础间隔 * (系数 ^ 已加仓次数)
# 获取当前币对的波动系数,用于动态调整回调百分比
volatility_coef = self.get_volatility_coefficient(pair)
# 回调百分比 = 基础回调 * (系数 ^ 已加仓次数) * 波动系数
current_callback = self.add_position_callback.value * (self.add_position_multiplier.value ** adjustment_count) * volatility_coef
if price_diff_pct <= -current_callback:
# 计算初始入场金额
initial_stake = trade.orders[0].cost # 第一笔订单的成本
# 计算加仓金额: (initial_stake / stake_divisor) ^ (adjustment_count + 1)
additional_stake = (initial_stake / self.stake_divisor.value) * (self.add_position_growth.value ** (adjustment_count + 1))
# 确保加仓金额在允许的范围内
additional_stake = max(min_stake, min(additional_stake, max_stake - trade.stake_amount))
#self.strategy_log(f"[{pair}] 触发加仓: 第{adjustment_count + 1}次加仓, 初始金额{initial_stake:.2f}, \
# 加仓金额{additional_stake:.2f}, 价格差{price_diff_pct:.2%}, 当前利润{current_profit:.2%}")
return additional_stake
# 不符合加仓条件返回0
return 0.0
def custom_stake_amount(self, pair: str, current_time: datetime, **kwargs) -> float:
"""
定义初始仓位大小
"""
# 获取默认的基础仓位大小
default_stake = self.stake_amount
# 从kwargs获取最小和最大仓位限制
min_stake = kwargs.get('min_stake', 0.0)
max_stake = kwargs.get('max_stake', default_stake)
# 确保仓位在允许的范围内
adjusted_stake = max(min_stake, min(default_stake, max_stake))
return adjusted_stake
def custom_exit_price(self, pair: str, trade: Trade, current_time: datetime, proposed_rate: float,
current_profit: float, exit_tag: str | None, **kwargs) -> float:
"""
自定义出场价格
根据文档,价格调整逻辑应该放在这里,而不是在 custom_exit 中
"""
# 计算出场价格上浮比例0.25%
price_markup_percent = 0.25
adjusted_exit_price = proposed_rate * (1 + price_markup_percent / 100)
self.strategy_log(f"[{pair}] 自定义出场价格: 原价格 {proposed_rate:.8f} -> 调整后价格 {adjusted_exit_price:.8f}, 上浮: {price_markup_percent}%")
return adjusted_exit_price
def custom_entry_price(self, pair: str, trade: Trade | None, current_time: datetime,
proposed_rate: float, entry_tag: str | None, side: str, **kwargs) -> float:
"""
自定义入场价格
将入场价格调低0.25%,以提高成交概率
"""
# 计算入场价格下调比例0.25%
price_discount_percent = 0.25
adjusted_entry_price = proposed_rate * (1 - price_discount_percent / 100)
self.strategy_log(f"[{pair}] 自定义入场价格: 原价格 {proposed_rate:.8f} -> 调整后价格 {adjusted_entry_price:.8f}, 下调: {price_discount_percent}%")
return adjusted_entry_price