From 9b29cc659794bff03bd177f30c95a2095beafda4 Mon Sep 17 00:00:00 2001 From: "zhangkun9038@dingtalk.com" Date: Wed, 18 Feb 2026 01:23:49 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B8=85=E7=90=86=E5=8E=9F=E6=9C=89=E5=88=86?= =?UTF-8?q?=E6=94=AF=E9=81=97=E7=95=99=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- freqtrade/templates/freqaiprimer.py | 1773 ++------------------------- 1 file changed, 104 insertions(+), 1669 deletions(-) diff --git a/freqtrade/templates/freqaiprimer.py b/freqtrade/templates/freqaiprimer.py index be0dc711..f1553553 100644 --- a/freqtrade/templates/freqaiprimer.py +++ b/freqtrade/templates/freqaiprimer.py @@ -2,14 +2,13 @@ import warnings warnings.filterwarnings("ignore", category=UserWarning, module="pandas_ta") import logging -from freqtrade.strategy import IStrategy, IntParameter, DecimalParameter +from freqtrade.strategy import IStrategy from pandas import DataFrame import pandas as pd import pandas_ta as ta from freqtrade.persistence import Trade import numpy as np from datetime import datetime, timezone, timedelta -import math logger = logging.getLogger(__name__) @@ -19,7 +18,7 @@ logger = logging.getLogger(__name__) UTC_PLUS_8 = timezone(timedelta(hours=8)) class FreqaiPrimer(IStrategy): - # 策略参数 - 使用custom_roi替代minimal_roi字典 + # 策略参数 loglevel = "warning" minimal_roi = {} add_position_callback = True @@ -30,46 +29,16 @@ class FreqaiPrimer(IStrategy): # FreqAI 要求 process_only_new_candles = True - stoploss = -0.15 # 固定止损 -15% (大幅放宽止损以承受更大波动) + stoploss = -0.15 # 固定止损 -15% trailing_stop = True - trailing_stop_positive_offset = 0.005 # 追踪止损偏移量 0.5% (更容易触发跟踪止盈) - - # 用于跟踪市场状态的数据框缓存 - _dataframe_cache = None + trailing_stop_positive_offset = 0.005 # 追踪止损偏移量 0.5% def __init__(self, config=None): """初始化策略参数,调用父类初始化方法并接受config参数""" super().__init__(config) # 调用父类的初始化方法并传递config - # 存储从配置文件加载的默认值 - self._trailing_stop_positive_default = 0.004 # 降低默认值以更容易触发跟踪止盈 - - # 波动系数缓存(简化版:直接计算,无需历史序列) - self._volatility_timestamp = {} # {pair: timestamp} - self._volatility_cache = {} # {pair: volatility_coef} - self._volatility_update_interval = 180 # 波动系数更新间隔(秒),3分钟 - - # 入场间隔控制:记录每个交易对最近一次入场的时间 - # 格式: {pair: datetime} - self._last_entry_time = {} - - # 冷启动保护:记录策略启动时间 + # 基础初始化 self._strategy_start_time = datetime.now() - - # ========== 数据新鲜度监控统计 ========== - # 用于量化和诊断数据延迟问题的严重性 - self._freshness_stats = { - 'total_checks': 0, # 总检查次数 - 'fresh_count': 0, # 新鲜数据次数 (< 10分钟) - 'warning_count': 0, # 警告次数 (10-30分钟) - 'stale_count': 0, # 过期次数 (> 30分钟) - 'max_delay_minutes': 0.0, # 最大延迟时间 - 'total_delay_minutes': 0.0, # 累计延迟时间(用于计算平均值) - 'pair_delays': {}, # 每个币对的延迟统计 {pair: {'count': N, 'avg_delay': M, 'max_delay': X}} - 'last_report_time': None, # 上次报告时间 - } - self._freshness_report_interval = 3600 # 每小时输出一次统计报告 - def strategy_log(self, message: str, level: str = "info") -> None: """根据 config 的 enable_strategy_log 决定是否输出日志""" enable_log = self.config.get('enable_strategy_log', False) @@ -83,1682 +52,148 @@ class FreqaiPrimer(IStrategy): logger.error(message) else: logger.info(message) - - def calculate_data_freshness(self, data_timestamp: datetime, pair: str, dataframe: DataFrame) -> float: - """ - 计算数据新鲜度(与交易所最新数据比较) - :param data_timestamp: 数据时间戳 - :param pair: 交易对 - :param dataframe: 数据框 - :return: 数据延迟时间(分钟) - """ - try: - # 转换为UTC+8时间 - if data_timestamp.tzinfo is None: - data_timestamp = data_timestamp.replace(tzinfo=timezone.utc) - data_timestamp_utc8 = data_timestamp.astimezone(UTC_PLUS_8).replace(tzinfo=None) - - # 获取交易所的最新数据时间 - exchange_latest_candle, exchange_latest_date = self.get_latest_candle(pair, self.timeframe, dataframe) - - if exchange_latest_date: - # 转换为UTC+8时间进行比较 - if exchange_latest_date.tzinfo is None: - exchange_latest_date = exchange_latest_date.replace(tzinfo=timezone.utc) - exchange_latest_utc8 = exchange_latest_date.astimezone(UTC_PLUS_8).replace(tzinfo=None) - freshness_minutes = (exchange_latest_utc8 - data_timestamp_utc8).total_seconds() / 60 - else: - # 如果无法获取交易所最新时间,使用保守估计(3分钟延迟) - freshness_minutes = 3.0 - - return max(0.0, freshness_minutes) # 确保不会返回负数 - - except Exception as e: - self.strategy_log(f"[{pair}] 数据新鲜度计算异常: {e}", "warning") - return 3.0 # 异常时返回保守估计 - - def update_freshness_stats(self, pair: str, data_age_minutes: float) -> None: - """ - 更新数据新鲜度统计 - :param pair: 交易对 - :param data_age_minutes: 数据延迟时间(分钟) - """ - stats = self._freshness_stats - stats['total_checks'] += 1 - stats['total_delay_minutes'] += data_age_minutes - stats['max_delay_minutes'] = max(stats['max_delay_minutes'], data_age_minutes) - # 分类统计 - if data_age_minutes <= 10: - stats['fresh_count'] += 1 - elif data_age_minutes <= 30: - stats['warning_count'] += 1 - else: - stats['stale_count'] += 1 - - # 每个币对的统计 - if pair not in stats['pair_delays']: - stats['pair_delays'][pair] = { - 'count': 0, - 'total_delay': 0.0, - 'max_delay': 0.0, - 'fresh': 0, - 'warning': 0, - 'stale': 0 - } - - pair_stat = stats['pair_delays'][pair] - pair_stat['count'] += 1 - pair_stat['total_delay'] += data_age_minutes - pair_stat['max_delay'] = max(pair_stat['max_delay'], data_age_minutes) - - if data_age_minutes <= 10: - pair_stat['fresh'] += 1 - elif data_age_minutes <= 30: - pair_stat['warning'] += 1 - else: - pair_stat['stale'] += 1 - - def report_freshness_stats(self, force: bool = False) -> None: - """ - 输出数据新鲜度统计报告 - :param force: 强制输出(忽略时间间隔) - """ - current_time = datetime.now() - stats = self._freshness_stats - - # 检查是否需要报告 - if not force and stats['last_report_time'] is not None: - elapsed = (current_time - stats['last_report_time']).total_seconds() - if elapsed < self._freshness_report_interval: - return - - # 如果没有数据,不输出 - if stats['total_checks'] == 0: - return - - # 计算统计指标 - avg_delay = stats['total_delay_minutes'] / stats['total_checks'] - fresh_rate = stats['fresh_count'] / stats['total_checks'] * 100 - warning_rate = stats['warning_count'] / stats['total_checks'] * 100 - stale_rate = stats['stale_count'] / stats['total_checks'] * 100 - - # 输出总体报告 - self.strategy_log( - f"\n" - f"========================================\n" - f" 📊 数据新鲜度统计报告\n" - f"========================================\n" - f"总检查次数: {stats['total_checks']}\n" - f"新鲜数据 (<10min): {stats['fresh_count']} ({fresh_rate:.1f}%)\n" - f"警告数据 (10-30min): {stats['warning_count']} ({warning_rate:.1f}%)\n" - f"过期数据 (>30min): {stats['stale_count']} ({stale_rate:.1f}%)\n" - f"平均延迟: {avg_delay:.2f} 分钟\n" - f"最大延迟: {stats['max_delay_minutes']:.2f} 分钟\n" - f"========================================", - level="info" - ) - - # 输出每个币对的详细统计(按过期率排序) - if stats['pair_delays']: - pair_stats_list = [] - for pair, pair_stat in stats['pair_delays'].items(): - avg_pair_delay = pair_stat['total_delay'] / pair_stat['count'] - stale_rate_pair = pair_stat['stale'] / pair_stat['count'] * 100 - pair_stats_list.append({ - 'pair': pair, - 'avg_delay': avg_pair_delay, - 'max_delay': pair_stat['max_delay'], - 'stale_rate': stale_rate_pair, - 'count': pair_stat['count'] - }) - - # 按过期率排序(从高到低) - pair_stats_list.sort(key=lambda x: x['stale_rate'], reverse=True) - - report_lines = ["📈 各币对数据新鲜度排名(按过期率降序):"] - for i, ps in enumerate(pair_stats_list[:10], 1): # 只显示前10个最差的 - report_lines.append( - f" {i}. {ps['pair']:15s} | " - f"平均: {ps['avg_delay']:5.1f}min | " - f"最大: {ps['max_delay']:5.1f}min | " - f"过期率: {ps['stale_rate']:5.1f}% | " - f"检查: {ps['count']}次" - ) - - self.strategy_log("\n".join(report_lines), level="info") - - # 更新报告时间 - stats['last_report_time'] = current_time - - - # 只用于adjust_trade_position方法的波动系数获取 - def get_volatility_coefficient(self, pair: str) -> float: - """ - 获取币对的波动系数(简化版:直接计算,无需历史序列) - - USDT/USDT 波动系数设置为0 - - BTC/USDT 波动系数设置为1 - - 其他币对: - 计算当前波动系数 = 该币对波动率 / BTC/USDT波动率 - (基于最近200根1h K线,足够稳定,无需额外平滑) - - 波动系数表示某币对与BTC/USDT相比的波动幅度倍数 - - 山寨币的波动系数可能大于3 - - 稳定性较高的币对(如DOT/USDT)波动系数可能小于1 - - 添加了缓存机制,每3分钟更新一次,避免频繁计算 - """ - # 检查特殊币对 - if pair == 'USDT/USDT': - return 0.0 - elif pair == 'BTC/USDT': - return 1.0 - - try: - # 获取当前时间戳 - current_time = datetime.now().timestamp() - - # 检查缓存:如果距离上次计算时间小于更新间隔,则直接返回缓存值 - if (pair in self._volatility_cache and - pair in self._volatility_timestamp and - current_time - self._volatility_timestamp[pair] < self._volatility_update_interval): - return self._volatility_cache[pair] - - # 直接计算当前波动系数(基于最近200根1h K线) - current_volatility_coef = self._calculate_current_volatility_coef(pair) - - # 更新缓存和时间戳 - self._volatility_cache[pair] = current_volatility_coef - self._volatility_timestamp[pair] = current_time - - self.strategy_log(f"波动系数计算完成 {pair}: 系数={current_volatility_coef:.4f} (基于最近200根1h K线)") - - return current_volatility_coef - - except Exception as e: - logger.warning(f"计算波动系数时出错 {pair}: {str(e)}") - # 如果出错,尝试返回缓存值,否则返回默认值1.0 - return self._volatility_cache.get(pair, 1.0) - - - def _calculate_current_volatility_coef(self, pair: str) -> float: - """ - 计算当前的波动系数(该币对波动率 / BTC/USDT波动率) - """ - try: - # 获取当前币对的1小时k线数据 - current_pair_df, _ = self.dp.get_analyzed_dataframe(pair, '1h') - # 获取BTC/USDT的1小时k线数据 - btc_df, _ = self.dp.get_analyzed_dataframe('BTC/USDT', '1h') - - # 确保有足够的数据点 - if len(current_pair_df) < 2 or len(btc_df) < 2: - return 1.0 # 如果没有足够数据,返回默认值1.0 - - # 对于数据点少于200个的情况,使用所有可用数据 - # 对于数据点多于200个的情况,使用最近200个数据点 - current_data = current_pair_df.iloc[-min(200, len(current_pair_df)):] - btc_data = btc_df.iloc[-min(200, len(btc_df)):] - - # 计算当前币对的对数收益率和波动率 - current_data['returns'] = current_data['close'].pct_change() - current_volatility = current_data['returns'].std() * 100 # 转换为百分比 - - # 计算BTC/USDT的对数收益率和波动率 - btc_data['returns'] = btc_data['close'].pct_change() - btc_volatility = btc_data['returns'].std() * 100 # 转换为百分比 - - # 避免除以零的情况 - if btc_volatility == 0: - return 1.0 - - # 计算波动系数:当前币对波动率 / BTC/USDT波动率 - volatility_coef = current_volatility / btc_volatility - - # 设置合理的上下限,避免极端值影响策略 - # 上限设置为5.0(非常高波动的币对) - # 下限设置为0.1(非常稳定的币对) - return max(0.1, min(5.0, volatility_coef)) - except Exception as e: - logger.warning(f"计算当前波动系数时出错 {pair}: {str(e)}") - return 1.0 # 出错时返回默认值1.0 - - # 其他辅助方法可以在这里添加 - - @property - def protections(self): - """ - 保护机制配置 - 基于最新Freqtrade规范,保护机制应定义在策略文件中而非配置文件 - """ - return [ - { - "method": "StoplossGuard", - "lookback_period_candles": 60, # 3小时回看期(60根3分钟K线) - "trade_limit": 2, # 最多2笔止损交易 - "stop_duration_candles": 60, # 暂停180分钟(60根3分钟K线) - "only_per_pair": False # 仅针对单个币对 - }, - { - "method": "CooldownPeriod", - "stop_duration_candles": 15 # 45分钟冷却期(15根3分钟K线) - }, - { - "method": "MaxDrawdown", - "lookback_period_candles": 48, # 2.4小时回看期 - "trade_limit": 4, # 4笔交易限制 - "stop_duration_candles": 24, # 72分钟暂停(24根3分钟K线) - "max_allowed_drawdown": 0.20 # 20%最大回撤容忍度 - } - ] - - @property - def trailing_stop_positive(self): - """根据市场状态动态调整跟踪止盈参数""" - # 获取当前市场状态 - if self._dataframe_cache is not None and len(self._dataframe_cache) > 0: - current_state = self._dataframe_cache['market_state'].iloc[-1] - if current_state == 'strong_bull': - return 0.007 # 强劲牛市中降低跟踪止盈,让利润奔跑 - elif current_state == 'weak_bull': - return 0.005 # 弱势牛市中保持较低的跟踪止盈 - return self._trailing_stop_positive_default # 返回默认值 - - @trailing_stop_positive.setter - def trailing_stop_positive(self, value): - """设置trailing_stop_positive的默认值""" - self._trailing_stop_positive_default = value - - timeframe = "3m" # 主时间框架为 3 分钟 - can_short = False # 禁用做空 - - # 自定义指标参数 - 使用Hyperopt可优化参数 - bb_length = IntParameter(10, 30, default=20, optimize=True, load=True, space='buy') - bb_std = DecimalParameter(1.5, 3.0, decimals=1, default=2.0, optimize=True, load=True, space='buy') - rsi_length = IntParameter(7, 21, default=14, optimize=True, load=True, space='buy') - rsi_oversold = IntParameter(30, 50, default=42, optimize=True, load=True, space='buy') - - - - # 入场条件阈值参数 - bb_lower_deviation = DecimalParameter(1.01, 1.05, decimals=2, default=1.03, optimize=True, load=True, space='buy') - rsi_bull_threshold = IntParameter(45, 55, default=50, optimize=True, load=True, space='buy') - stochrsi_bull_threshold = IntParameter(30, 40, default=35, optimize=True, load=True, space='buy') - stochrsi_neutral_threshold = IntParameter(20, 30, default=25, optimize=True, load=True, space='buy') - volume_multiplier = DecimalParameter(1.2, 2.0, decimals=1, default=1.5, optimize=True, load=True, space='buy') - bb_width_threshold = DecimalParameter(0.01, 0.03, decimals=3, default=0.02, optimize=True, load=True, space='buy') - min_condition_count = IntParameter(2, 4, default=3, optimize=True, load=True, space='buy') - - # 剧烈拉升检测参数 - 使用Hyperopt可优化参数 - h1_max_candles = IntParameter(100, 300, default=200, optimize=True, load=True, space='buy') - h1_rapid_rise_threshold = DecimalParameter(0.05, 0.15, decimals=3, default=0.11, optimize=True, load=True, space='buy') - h1_max_consecutive_candles = IntParameter(1, 4, default=2, optimize=True, load=True, space='buy') - - # 入场间隔控制参数(分钟) - entry_interval_minutes = IntParameter(20, 200, default=42, optimize=True, load=True, space='buy') - - # EMA20斜率阈值参数(用于趋势强度过滤) - ema20_slope_threshold = DecimalParameter(0.0005, 0.005, decimals=4, default=0.001, optimize=True, load=True, space='buy') - - # ML 审核官:entry_signal 拒绝入场的阈值(越高越宽松,越低越严格) - ml_entry_signal_threshold = DecimalParameter(0.05, 0.85, decimals=2, default=0.37, optimize=True, load=True, space='buy') - - # ML 审核官:exit_signal 拒绝出场的阈值(越高越宽松,越低越严格) - ml_exit_signal_threshold = DecimalParameter(0.05, 0.85, decimals=2, default=0.68, optimize=True, load=True, space='buy') - - # FreqAI 标签定义:entry_signal 的洛底上涨幅度(%) - freqai_entry_up_percent = DecimalParameter(0.3, 2.0, decimals=2, default=0.5, optimize=True, load=True, space='buy') - - # FreqAI 标签定义:exit_signal 的洛底下跌幅度(%) - freqai_exit_down_percent = DecimalParameter(0.3, 2.0, decimals=2, default=0.5, optimize=True, load=True, space='buy') - - # 定义可优化参数 - # 初始入场金额: 75.00 - -# 加仓次数 相对降幅间隔 加仓金额 -# ------- ------------ -------- -# 0 N/A 75 -# 1 0.045000 36.29 -# 2 0.051750 163.31 -# 3 0.059513 734.88 -# 4 0.068439 3306.96 -# -# 累计投入金额: 4316.43 - max_entry_adjustments = IntParameter(2, 5, default=4, optimize=False, load=True, space='buy') # 最大加仓次数 - add_position_callback = DecimalParameter(0.02, 0.06, decimals=3, default=0.047, optimize=False, load=True, space='buy') # 加仓回调百分比 - add_position_growth = DecimalParameter(1.5, 5.0, decimals=2, default=4.5, optimize=False, load=True, space='buy') # 加仓金额增长因子,保留2位小数用于hyperopt优化 - add_position_multiplier = DecimalParameter(0.2, 2, decimals=2, default=1.35, optimize=False, load=True, space='buy') # 加仓间隔系数,保留2位小数用于hyperopt优化 - stake_divisor = DecimalParameter(2.0, 12.0, decimals=2, default=9.3, optimize=False, load=True, space='buy') # 加仓金额分母(小数类型,保留2位小数) - - # 线性ROI参数 - 用于线性函数: y = (a * (x + k)) + t - roi_param_a = DecimalParameter(-0.0002, -0.00005, decimals=5, default=-0.0001, optimize=True, load=True, space='sell') # 系数a - roi_param_k = IntParameter(20, 150, default=50, optimize=True, load=True, space='sell') # 偏移量k - roi_param_t = DecimalParameter(0.02, 0.18, decimals=3, default=0.06, optimize=True, load=True, space='sell') # 常数项t - # 出场条件阈值参数 - exit_bb_upper_deviation = DecimalParameter(0.98, 1.02, decimals=2, default=1.0, optimize=True, load=True, space='sell') - exit_volume_multiplier = DecimalParameter(1.5, 3.0, decimals=1, default=2.0, optimize=True, load=True, space='sell') - - rsi_overbought = IntParameter(50, 70, default=58, optimize=True, load=True, space='sell') - - + # ========================= 强化学习相关方法 ========================= def informative_pairs(self): pairs = self.dp.current_whitelist() return [(pair, '15m') for pair in pairs] + [(pair, '1h') for pair in pairs] - - def _validate_dataframe_columns(self, dataframe: DataFrame, required_columns: list, metadata: dict): - """ - 验证数据框中是否包含所有需要的列。 - 如果缺少列,则记录警告日志。 - """ - missing_columns = [col for col in required_columns if col not in dataframe.columns] - if missing_columns: - logger.warning(f"[{metadata['pair']}] 数据框中缺少以下列: {missing_columns}") - - # ========================= FreqAI 特征与标签定义 ========================= - def feature_engineering_expand_all(self, dataframe: DataFrame, period: int, metadata: dict, **kwargs) -> DataFrame: - """FreqAI 全量特征:这里先用简单技术指标,后续可逐步扩展。""" - # 使用 rolling 计算 RSI(减少看前偏差) - delta = dataframe["close"].diff() - gain = delta.where(delta > 0, 0).rolling(window=period).mean() - loss = -delta.where(delta < 0, 0).rolling(window=period).mean() - rs = gain / loss - dataframe[f"%-rsi-{period}"] = 100 - (100 / (1 + rs)) - - dataframe[f"%-mfi-{period}"] = ta.mfi(dataframe["high"], dataframe["low"], dataframe["close"], dataframe["volume"], length=period) - adx_df = ta.adx(dataframe["high"], dataframe["low"], dataframe["close"], length=period) - adx_col = f"ADX_{period}" - if adx_col in adx_df.columns: - dataframe[f"%-adx-{period}"] = adx_df[adx_col] - return dataframe def feature_engineering_expand_basic(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame: - """FreqAI 基础特征。""" + """基础特征工程""" dataframe["%-pct_change"] = dataframe["close"].pct_change().fillna(0) dataframe["%-raw_volume"] = dataframe["volume"].fillna(0) - dataframe["%-raw_price"] = dataframe["close"].ffill() # 使用 ffill() 替代 fillna(method="ffill") + dataframe["%-raw_price"] = dataframe["close"].ffill() return dataframe def feature_engineering_standard(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame: - """FreqAI 标准时间类特征。""" + """标准时间类特征""" if "date" in dataframe.columns: dataframe["%-day_of_week"] = dataframe["date"].dt.dayofweek dataframe["%-hour_of_day"] = dataframe["date"].dt.hour return dataframe def set_freqai_targets(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame: - """定义 FreqAI 训练标签:简单二分类版本 + 持仓时长预测。""" - # 从配置中读取预测窗口参数(禁止硬编码) - label_horizon = self.freqai_info.get('feature_parameters', {}).get('label_period_candles', 24) - - # 动态计算上涨/下跌阈值 - entry_up_percent = self.freqai_entry_up_percent.value / 100.0 # 转换为小数(如 0.01 表示 1%) - exit_down_percent = self.freqai_exit_down_percent.value / 100.0 - - entry_up_threshold = 1.0 + entry_up_percent # 例如 1.01 表示 +1% - exit_down_threshold = 1.0 - exit_down_percent # 例如 0.99 表示 -1% - - # 入场标签:未来窗口内的最高价是否超过 +1% - future_max = dataframe["close"].rolling(window=label_horizon, min_periods=1).max().shift(-label_horizon + 1) - dataframe["&s-entry_signal"] = np.where( - future_max > dataframe["close"] * entry_up_threshold, - 1, - 0, - ) - - # 出场标签:未来窗口内的最低价是否跌破 -1% - future_min = dataframe["close"].rolling(window=label_horizon, min_periods=1).min().shift(-label_horizon + 1) - dataframe["&s-exit_signal"] = np.where( - future_min < dataframe["close"] * exit_down_threshold, - 1, - 0, - ) - - # 新增:未来波动率预测标签(极端化方案) - # 计算当前波动率(过10根K线的收盘价波动) - current_volatility = dataframe["close"].pct_change().rolling(window=10, min_periods=5).std() - - # 计算未来10根K线的波动率(向未来移动) - future_pct_change = dataframe["close"].pct_change().shift(-1) # 未来的收盘价变化 - future_volatility = future_pct_change.rolling(window=10, min_periods=5).std().shift(-9) # 未来10根K线的波动率 - - # 标签:未来波动率 > 当前波动率 * 1.5 则标记为高波动(趋势启动) - volatility_ratio = future_volatility / (current_volatility + 1e-8) # 避免除以0 - dataframe["&s-future_volatility"] = np.where( - volatility_ratio > 1.5, - 1, # 未来高波动(趋势启动),继续持有 - 0 # 未来低波动(震荡市),快速止盈 - ) - + """设置FreqAI目标变量(强化学习不需要传统标签,但需要保持兼容)""" + # 为了保持与FreqAI框架的兼容性,添加默认目标列 + dataframe["&s-entry_signal"] = 0 + dataframe["&s-exit_signal"] = 0 + dataframe["&s-future_volatility"] = 0 return dataframe def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: - # ========== 新增:ML目标变量数据新鲜度监控 ========== - # 检查FreqAI模型预测数据的新鲜度,确保使用最新的预测结果 - # 注意:ML预测数据通常比普通量化数据滞后一个timeframe(约3分钟) - # 这是FreqAI的正常工作机制,不影响策略有效性 - try: - # 获取FreqAI分析过的数据帧 - analyzed_df, _ = self.dp.get_analyzed_dataframe(metadata['pair'], self.timeframe) - if len(analyzed_df) > 0: - latest_analyzed_row = analyzed_df.iloc[-1] - latest_date = latest_analyzed_row.get('date') - - if latest_date is not None: - # 确保日期是datetime对象 - if not isinstance(latest_date, datetime): - try: - from pandas import Timestamp - if isinstance(latest_date, Timestamp): - latest_date = latest_date.to_pydatetime() - except: - pass - # 计算ML数据新鲜度(使用统一函数) - if isinstance(latest_date, datetime): - ml_age_minutes = self.calculate_data_freshness(latest_date, metadata['pair'], dataframe) - - # 检查目标变量的类型和新鲜度 - target_vars = [] - for col in analyzed_df.columns: - if col.startswith('target_') or col.startswith('pred_'): - target_vars.append(col) - - # 如果有多个目标变量,分别检查 - if target_vars: - # 检查所有目标变量的列是否存在 - available_targets = [] - for var in target_vars: - if var in analyzed_df.columns: - available_targets.append(var) - - if available_targets: - # 输出每个目标变量的检查结果 - target_info = [] - for target in available_targets[:3]: # 只显示前3个目标变量 - # 由于所有目标变量共享同一时间戳,我们可以统一显示 - if ml_age_minutes <= 10: - icon = "✅" - elif ml_age_minutes <= 30: - icon = "⚠️" - else: - icon = "❌" - target_info.append(f"{target}({icon}{ml_age_minutes:.1f}min)") - - # 如果有更多目标变量,显示总数 - if len(available_targets) > 3: - target_info.append(f"...共{len(available_targets)}个目标变量") - - self.strategy_log(f"[{metadata['pair']}] ML目标变量数据新鲜度: {', '.join(target_info)}") - else: - # 如果没有目标变量列,显示通用信息 - if ml_age_minutes <= 10: - ml_freshness_icon = "✅" - elif ml_age_minutes <= 30: - ml_freshness_icon = "⚠️" - else: - ml_freshness_icon = "❌" - - self.strategy_log(f"[{metadata['pair']}] ML目标变量数据新鲜度: {ml_freshness_icon} {ml_age_minutes:.1f}min") - else: - # 如果没有检测到目标变量,显示通用信息 - if ml_age_minutes <= 10: - ml_freshness_icon = "✅" - elif ml_age_minutes <= 30: - ml_freshness_icon = "⚠️" - else: - ml_freshness_icon = "❌" - - self.strategy_log(f"[{metadata['pair']}] ML目标变量数据新鲜度: {ml_freshness_icon} {ml_age_minutes:.1f}min") - except Exception as e: - self.strategy_log(f"[{metadata['pair']}] ML数据新鲜度检查失败: {e}") + """计算用于强化学习的特征""" + # 基础技术指标 + dataframe['rsi_1h'] = ta.rsi(dataframe['close'], length=14) - # 计算 3m 周期的指标 - bb_length_value = self.bb_length.value - bb_std_value = self.bb_std.value - rsi_length_value = self.rsi_length.value + # MACD指标 + macd = ta.macd(dataframe['close'], fast=12, slow=26, signal=9) + dataframe['macd_1h'] = macd['MACD_12_26_9'] + dataframe['macd_signal_1h'] = macd['MACDs_12_26_9'] - # 使用 rolling 计算布林带(减少看前偏差) - bb_ma_3m = dataframe['close'].rolling(window=bb_length_value).mean() - bb_std_3m = dataframe['close'].rolling(window=bb_length_value).std() - dataframe['bb_lower_3m'] = bb_ma_3m - (bb_std_value * bb_std_3m) - dataframe['bb_upper_3m'] = bb_ma_3m + (bb_std_value * bb_std_3m) + # 布林带 + bbands = ta.bbands(dataframe['close'], length=20, std=2) + dataframe['bb_upper_1h'] = bbands['BBU_20_2.0'] + dataframe['bb_lower_1h'] = bbands['BBL_20_2.0'] - # 使用 rolling 计算 RSI(减少看前偏差) - delta_3m = dataframe['close'].diff() - gain_3m = delta_3m.where(delta_3m > 0, 0).rolling(window=rsi_length_value).mean() - loss_3m = -delta_3m.where(delta_3m < 0, 0).rolling(window=rsi_length_value).mean() - rs_3m = gain_3m / loss_3m - dataframe['rsi_3m'] = 100 - (100 / (1 + rs_3m)) + # 移动平均线 + dataframe['ema_5_1h'] = ta.ema(dataframe['close'], length=5) + dataframe['ema_20_1h'] = ta.ema(dataframe['close'], length=20) - # 新增 StochRSI 指标 - stochrsi_3m = ta.stochrsi(dataframe['close'], length=rsi_length_value, rsi_length=rsi_length_value) - dataframe['stochrsi_k_3m'] = stochrsi_3m[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3'] - dataframe['stochrsi_d_3m'] = stochrsi_3m[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3'] - - # 新增 MACD 指标 - macd_3m = ta.macd(dataframe['close'], fast=12, slow=26, signal=9) - dataframe['macd_3m'] = macd_3m['MACD_12_26_9'] - dataframe['macd_signal_3m'] = macd_3m['MACDs_12_26_9'] - dataframe['macd_hist_3m'] = macd_3m['MACDh_12_26_9'] - - # 使用 ewm 计算 EMA(减少看前偏差,adjust=False 确保实时计算) - dataframe['ema_50_3m'] = dataframe['close'].ewm(span=50, adjust=False).mean() - dataframe['ema_200_3m'] = dataframe['close'].ewm(span=200, adjust=False).mean() - - # 成交量过滤 - dataframe['volume_ma'] = dataframe['volume'].rolling(20).mean() - - # 计算 ATR 用于动态止损和退出 - dataframe['atr'] = ta.atr(dataframe['high'], dataframe['low'], dataframe['close'], length=14) - - # 获取 15m 数据 - df_15m = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='15m') - - # 使用 rolling 计算 RSI(减少看前偏差) - delta_15m = df_15m['close'].diff() - gain_15m = delta_15m.where(delta_15m > 0, 0).rolling(window=rsi_length_value).mean() - loss_15m = -delta_15m.where(delta_15m < 0, 0).rolling(window=rsi_length_value).mean() - rs_15m = gain_15m / loss_15m - df_15m['rsi_15m'] = 100 - (100 / (1 + rs_15m)) - - # 使用 ewm 计算 EMA(减少看前偏差) - df_15m['ema_20_15m'] = df_15m['close'].ewm(span=20, adjust=False).mean() - df_15m['ema_50_15m'] = df_15m['close'].ewm(span=50, adjust=False).mean() - df_15m['ema_200_15m'] = df_15m['close'].ewm(span=200, adjust=False).mean() - - # 新增 StochRSI 指标 - stochrsi_15m = ta.stochrsi(df_15m['close'], length=rsi_length_value, rsi_length=rsi_length_value) - df_15m['stochrsi_k_15m'] = stochrsi_15m[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3'] - df_15m['stochrsi_d_15m'] = stochrsi_15m[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3'] - - # 新增 MACD 指标 - macd_15m = ta.macd(df_15m['close'], fast=12, slow=26, signal=9) - df_15m['macd_15m'] = macd_15m['MACD_12_26_9'] - df_15m['macd_signal_15m'] = macd_15m['MACDs_12_26_9'] - df_15m['macd_hist_15m'] = macd_15m['MACDh_12_26_9'] - - # 将 15m 数据重新索引到主时间框架 (3m) - df_15m = df_15m.set_index('date').reindex(dataframe['date']).reset_index() - df_15m = df_15m.rename(columns={'index': 'date'}) - df_15m = df_15m[['date', 'rsi_15m', 'ema_20_15m', 'ema_50_15m', 'ema_200_15m']].ffill() - - # 合并 15m 数据 - dataframe = dataframe.merge(df_15m, how='left', on='date') - - # 获取 1h 数据 - df_1h = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1h') - - # 使用 rolling 计算布林带(减少看前偏差) - bb_ma_1h = df_1h['close'].rolling(window=bb_length_value).mean() - bb_std_1h = df_1h['close'].rolling(window=bb_length_value).std() - df_1h['bb_lower_1h'] = bb_ma_1h - (bb_std_value * bb_std_1h) - df_1h['bb_upper_1h'] = bb_ma_1h + (bb_std_value * bb_std_1h) - - # 添加 EMA5 和 EMA20 用于趋势过滤(方案2:宽松条件) - df_1h['ema_5_1h'] = df_1h['close'].ewm(span=5, adjust=False).mean() - df_1h['ema_20_1h'] = df_1h['close'].ewm(span=20, adjust=False).mean() - - # 计算 EMA20 斜率(用于趋势强度过滤) - df_1h['ema20_slope'] = df_1h['ema_20_1h'].diff() - # 归一化斜率,使其相对于价格水平标准化 - df_1h['ema20_slope_normalized'] = df_1h['ema20_slope'] / df_1h['ema_20_1h'] - - # 检测 EMA5 向上穿越 EMA20(添加安全检查) - if len(df_1h) >= 2: - df_1h['ema5_cross_above_ema20'] = ( - (df_1h['ema_5_1h'] > df_1h['ema_20_1h']) & - (df_1h['ema_5_1h'].shift(1) <= df_1h['ema_20_1h'].shift(1)) - ) - else: - # 数据不足时,默认为False - df_1h['ema5_cross_above_ema20'] = False - - # 使用 rolling 计算 RSI(减少看前偏差) - delta_1h = df_1h['close'].diff() - gain_1h = delta_1h.where(delta_1h > 0, 0).rolling(window=rsi_length_value).mean() - loss_1h = -delta_1h.where(delta_1h < 0, 0).rolling(window=rsi_length_value).mean() - rs_1h = gain_1h / loss_1h - df_1h['rsi_1h'] = 100 - (100 / (1 + rs_1h)) - - # 使用 ewm 计算 EMA(减少看前偏差) - df_1h['ema_50_1h'] = df_1h['close'].ewm(span=50, adjust=False).mean() - df_1h['ema_200_1h'] = df_1h['close'].ewm(span=200, adjust=False).mean() - df_1h['trend_1h'] = df_1h['close'] > df_1h['ema_50_1h'] # 1h上涨趋势 - - # 新增 StochRSI 指标 - stochrsi_1h = ta.stochrsi(df_1h['close'], length=rsi_length_value, rsi_length=rsi_length_value) - df_1h['stochrsi_k_1h'] = stochrsi_1h[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3'] - df_1h['stochrsi_d_1h'] = stochrsi_1h[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3'] - - # 新增 MACD 指标 - macd_1h = ta.macd(df_1h['close'], fast=12, slow=26, signal=9) - df_1h['macd_1h'] = macd_1h['MACD_12_26_9'] - df_1h['macd_signal_1h'] = macd_1h['MACDs_12_26_9'] - df_1h['macd_hist_1h'] = macd_1h['MACDh_12_26_9'] - - # 验证 MACD 列是否正确生成 - #self.strategy_log(f"[{metadata['pair']}] 1小时 MACD 列: {list(macd_1h.columns)}") - - # 确保 StochRSI 指标已正确计算 - # 将 1h 数据重新索引到主时间框架 (3m),并填充缺失值 - df_1h = df_1h.set_index('date').reindex(dataframe['date']).ffill().bfill().reset_index() - df_1h = df_1h.rename(columns={'index': 'date'}) -# Include macd_1h, macd_signal_1h, ema_5_1h, ema_20_1h, ema5_cross_above_ema20 in the column selection - df_1h = df_1h[['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h', 'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h', 'macd_1h', 'macd_signal_1h', 'macd_hist_1h', 'ema_5_1h', 'ema_20_1h', 'ema20_slope_normalized', 'ema5_cross_above_ema20']].ffill() - -# Validate that all required columns are present - required_columns = ['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h', - 'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h', - 'macd_1h', 'macd_signal_1h', 'macd_hist_1h', 'ema_5_1h', 'ema_20_1h', 'ema20_slope_normalized', 'ema5_cross_above_ema20'] - missing_columns = [col for col in required_columns if col not in df_1h.columns] - if missing_columns: - logger.error(f"[{metadata['pair']}] 缺少以下列: {missing_columns}") - raise KeyError(f"缺少以下列: {missing_columns}") - - # 确保所有需要的列都被合并 - required_columns = ['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h', - 'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h', - 'macd_1h', 'macd_signal_1h', 'macd_hist_1h', 'ema_5_1h', 'ema_20_1h', 'ema20_slope_normalized', 'ema5_cross_above_ema20'] - - # 验证所需列是否存在 - missing_columns = [col for col in required_columns if col not in df_1h.columns] - if missing_columns: - logger.error(f"[{metadata['pair']}] 缺少以下列: {missing_columns}") - raise KeyError(f"缺少以下列: {missing_columns}") - - # 确保所有需要的列都被合并 - required_columns = ['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h', - 'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h', - 'macd_1h', 'macd_signal_1h', 'macd_hist_1h', 'ema_5_1h', 'ema_20_1h', 'ema20_slope_normalized', 'ema5_cross_above_ema20'] - - df_1h = df_1h[required_columns] # 确保包含所有必需的列(包括EMA过滤相关列) - - # 合并 1h 数据 - dataframe = dataframe.merge(df_1h, how='left', on='date').ffill() - - # 验证合并后的列 - #self.strategy_log(f"[{metadata['pair']}] 合并后的数据框列名: {list(dataframe.columns)}") - - # K线形态:看涨吞没 - dataframe['bullish_engulfing'] = ( - (dataframe['close'].shift(1) < dataframe['open'].shift(1)) & - (dataframe['close'] > dataframe['open']) & - (dataframe['close'] > dataframe['open'].shift(1)) & - (dataframe['open'] < dataframe['close'].shift(1)) - ) - - # 计算各时间框架的趋势状态(牛/熊) - # 3m时间框架:ema50下穿ema200为熊,上穿为牛 - dataframe['trend_3m'] = np.where(dataframe['ema_50_3m'] > dataframe['ema_200_3m'], 1, 0) - - # 15m时间框架:ema50下穿ema200为熊,上穿为牛 - dataframe['trend_15m'] = np.where(dataframe['ema_50_15m'] > dataframe['ema_200_15m'], 1, 0) - - # 1h时间框架:ema50下穿ema200为熊,上穿为牛 - dataframe['trend_1h_ema'] = np.where(dataframe['ema_50_1h'] > dataframe['ema_200_1h'], 1, 0) - - # 计算熊牛得分(0-100) - # 权重:3m熊牛权重10,15m熊牛权重35,1h熊牛权重65 - # 计算加权得分 - dataframe['market_score'] = ( - dataframe['trend_3m'] * 10 + - dataframe['trend_15m'] * 35 + - dataframe['trend_1h_ema'] * 65 - ) - - # 确保得分在0-100范围内 - dataframe['market_score'] = dataframe['market_score'].clip(lower=0, upper=100) - - # 根据得分分类市场状态 - dataframe['market_state'] = 'neutral' - dataframe.loc[dataframe['market_score'] > 70, 'market_state'] = 'strong_bull' - dataframe.loc[(dataframe['market_score'] > 50) & (dataframe['market_score'] <= 70), 'market_state'] = 'weak_bull' - dataframe.loc[(dataframe['market_score'] >= 30) & (dataframe['market_score'] <= 50), 'market_state'] = 'neutral' - dataframe.loc[(dataframe['market_score'] > 10) & (dataframe['market_score'] < 30), 'market_state'] = 'weak_bear' - dataframe.loc[dataframe['market_score'] <= 10, 'market_state'] = 'strong_bear' - - # 创建一个使用前一行市场状态的列,避免在populate_entry_trend中使用iloc[-1] - dataframe['prev_market_state'] = dataframe['market_state'].shift(1) - # 为第一行设置默认值 - dataframe['prev_market_state'] = dataframe['prev_market_state'].fillna('neutral') - - # 记录当前的市场状态 - if len(dataframe) > 0: - current_score = dataframe['market_score'].iloc[-1] - current_state = dataframe['market_state'].iloc[-1] - #self.strategy_log(f"[{metadata['pair']}] 熊牛得分: {current_score:.1f}, 市场状态: {current_state}") - - # 计算并显示基于未来波动率的最终阈值 - if len(dataframe) > 0: - # 检查是否有未来波动率列 - future_volatility = None - if '&s-future_volatility' in dataframe.columns: - future_volatility = dataframe['&s-future_volatility'].iloc[-1] - elif '&-future_volatility' in dataframe.columns: - future_volatility = dataframe['&-future_volatility'].iloc[-1] - - if future_volatility is not None: - # 确保 future_volatility 在 [0, 1] 范围内 - future_volatility = max(0.0, min(1.0, future_volatility)) - - # 使用与 confirm_trade_entry 相同的线性函数计算阈值 - base_signal_ratio_threshold = 1.33 - x_min = 0.3 - x_max = 0.9 - y_min = 1.6 - y_max = 1.1 - - if future_volatility >= x_max: - signal_ratio_threshold = y_max - elif future_volatility <= x_min: - signal_ratio_threshold = y_min - else: - signal_ratio_threshold = y_max + (y_min - y_max) * (future_volatility - x_max) / (x_min - x_max) - - # 限制阈值范围 - signal_ratio_threshold = max(1.1, min(1.6, signal_ratio_threshold)) - - self.strategy_log(f"[{metadata['pair']}] 基于未来波动率的最终阈值: {signal_ratio_threshold:.2f} (波动率: {future_volatility:.2f})") - #self.strategy_log(f"[{metadata['pair']}] 各时间框架趋势: 3m={'牛' if dataframe['trend_3m'].iloc[-1] == 1 else '熊'}, \ - # 15m={'牛' if dataframe['trend_15m'].iloc[-1] == 1 else '熊'}, \ - # 1h={'牛' if dataframe['trend_1h_ema'].iloc[-1] == 1 else '熊'}") - - # 调试:打印指标值(最后 5 行),验证时间对齐 - #print(f"Pair: {metadata['pair']}, Last 5 rows after reindexing:") - #print(dataframe[['date', 'close', 'bb_lower_3m', 'rsi_3m', 'rsi_15m', 'rsi_1h', 'trend_1h', - # 'trend_3m', 'trend_15m', 'trend_1h_ema', 'market_score', 'market_state', - # 'bullish_engulfing', 'volume', 'volume_ma']].tail(5)) - - # 打印最终数据框的列名以验证 - #self.strategy_log(f"[{metadata['pair']}] 最终数据框列名: {list(dataframe.columns)}") + # 成交量指标 + dataframe['volume_ma'] = dataframe['volume'].rolling(window=20).mean() + dataframe['volume_ratio'] = dataframe['volume'] / dataframe['volume_ma'] + dataframe['volume_spike'] = dataframe['volume_ratio'] > 2.0 # 检查FreqAI是否启用 - freqai_enabled = False - try: - # 更可靠的检查方法:检查config中是否启用了FreqAI - freqai_enabled = self.config.get('freqai', {}).get('enabled', False) - # 同时检查freqai属性是否存在且有效 - if freqai_enabled and hasattr(self, 'freqai') and self.freqai is not None: - freqai_enabled = True - else: - freqai_enabled = False - except Exception as e: - self.strategy_log(f"[{metadata['pair']}] 检查FreqAI启用状态时出错: {str(e)}") - freqai_enabled = False + freqai_enabled = self.config.get('freqai', {}).get('enabled', False) # 只有在FreqAI启用时才调用 - if freqai_enabled: + if freqai_enabled and hasattr(self, 'freqai') and self.freqai is not None: self.strategy_log(f"[{metadata['pair']}] FreqAI已启用,启动FreqAI处理") - # 启用 FreqAI:在所有指标计算完成后调用 dataframe = self.freqai.start(dataframe, metadata, self) - - # ========== 新增:记录FreqAI多目标预测结果 ========== - # 检查并记录所有可用的预测列,确认XGBoostRegressorMultiTarget是否为所有目标变量生成预测 - try: - pred_columns = [] - for col in dataframe.columns: - if col.startswith('&-') or col.startswith('&s-'): # FreqAI预测列通常以&-或&s-开头 - pred_columns.append(col) - - if pred_columns: - # 获取最后一行的预测值 - last_row = dataframe.iloc[-1] - pred_values = {} - for col in pred_columns: - if col in last_row: - pred_values[col] = round(float(last_row[col]), 4) - - if pred_values: - pred_info = ", ".join([f"{k}:{v}" for k, v in pred_values.items()]) - self.strategy_log(f"[{metadata['pair']}] FreqAI多目标预测: {pred_info}") - - # 特别记录三个主要目标变量 - entry_val = pred_values.get('&s-entry_signal', 'N/A') - exit_val = pred_values.get('&s-exit_signal', 'N/A') - vol_val = pred_values.get('&s-future_volatility', 'N/A') - - self.strategy_log(f"[{metadata['pair']}] FreqAI三目标预测 - 入场:{entry_val}, 出场:{exit_val}, 波动率:{vol_val}") - - # ========== 新增:发送HTTP请求 ========== - # 从配置中读取API URL - api_url = self.config.get('freqai', {}).get('ml_prediction_api_url', '') - if api_url: - import json - import urllib.request - import urllib.error - import os - - # 从环境变量获取virtualHostName - virtual_host_name = os.environ.get('VIRTUAL_HOST_NAME') - - # 只有正确获取到virtualHostName时才发送请求 - if virtual_host_name and virtual_host_name != 'default-vhost': - # 准备请求数据 - request_data = { - "hostName": "freqtrade-live-c6b6f357-20260215212649", - "virtualHostName": virtual_host_name, - "pairName": metadata['pair'], - "timestamp": int(last_row['date'].timestamp()), - "values": { - "&s-entry_signal": pred_values.get('&s-entry_signal', 0), - "&s-exit_signal": pred_values.get('&s-exit_signal', 0), - "&s-future_volatility": pred_values.get('&s-future_volatility', 0) - } - } - - # 转换为JSON - json_data = json.dumps(request_data).encode('utf-8') - - # 发送请求 - try: - req = urllib.request.Request(api_url, data=json_data, method='POST') - req.add_header('Content-Type', 'application/json') - with urllib.request.urlopen(req, timeout=5) as response: - status_code = response.getcode() - if status_code == 200: - self.strategy_log(f"[{metadata['pair']}] 成功发送ML预测数据到API (virtualHostName: {virtual_host_name})") - else: - self.strategy_log(f"[{metadata['pair']}] 发送ML预测数据失败,状态码: {status_code}") - except urllib.error.URLError as e: - self.strategy_log(f"[{metadata['pair']}] 发送ML预测数据网络错误: {str(e)}") - except Exception as e: - self.strategy_log(f"[{metadata['pair']}] 发送ML预测数据其他错误: {str(e)}") - else: - self.strategy_log(f"[{metadata['pair']}] 未正确获取到virtualHostName,跳过发送ML预测数据") - else: - self.strategy_log(f"[{metadata['pair']}] ML预测API URL未配置,跳过发送") - except Exception as e: - self.strategy_log(f"[{metadata['pair']}] 记录FreqAI预测结果时出错: {str(e)}") else: self.strategy_log(f"[{metadata['pair']}] FreqAI未启用,跳过FreqAI处理") return dataframe - - def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: - # 出场信号基于趋势和量价关系 - # 条件1: 价格突破布林带上轨(使用可优化的偏差参数) - breakout_condition = dataframe['close'] >= dataframe['bb_upper_1h'] * self.exit_bb_upper_deviation.value - - # 条件2: 成交量显著放大(使用可优化的成交量乘数) - volume_spike = dataframe['volume'] > dataframe['volume_ma'] * self.exit_volume_multiplier.value - - # 条件3: MACD 下降趋势 - macd_downward = dataframe['macd_1h'] < dataframe['macd_signal_1h'] - - # 条件4: RSI 进入超买区域(使用可优化的超买阈值) - rsi_overbought = dataframe['rsi_1h'] > self.rsi_overbought.value - - # 合并所有条件 - final_condition = breakout_condition | volume_spike | macd_downward | rsi_overbought - - # 设置出场信号 - dataframe.loc[final_condition, 'exit_long'] = 1 - - # 设置出场价格:上浮1.25%(使用乘法避免除零风险) - # Freqtrade 会优先使用 exit_price 列作为限价单价格 - final_exit_condition = dataframe['exit_long'] == 1 - #dataframe.loc[final_exit_condition, 'exit_price'] = dataframe.loc[final_exit_condition, 'close'] * 1.0125 - - # 增强调试信息 - #self.strategy_log(f"[{metadata['pair']}] 出场条件检查:") - #self.strategy_log(f" - 价格突破布林带上轨: {breakout_condition.sum()} 次") - #self.strategy_log(f" - 成交量显著放大: {volume_spike.sum()} 次") - #self.strategy_log(f" - MACD 下降趋势: {macd_downward.sum()} 次") - #self.strategy_log(f" - RSI 超买: {rsi_overbought.sum()} 次") - #self.strategy_log(f" - 最终条件: {final_condition.sum()} 次") - #self.strategy_log(f" - 使用参数: exit_bb_upper_deviation={self.exit_bb_upper_deviation.value}, exit_volume_multiplier={self.exit_volume_multiplier.value}, rsi_overbought={self.rsi_overbought.value}") - - # 日志记录 - #if dataframe['exit_long'].sum() > 0: - # self.strategy_log(f"[{metadata['pair']}] 触发出场信号数量: {dataframe['exit_long'].sum()}") - - return dataframe - - def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: - # 确保prev_market_state列存在 - if 'prev_market_state' not in dataframe.columns: - dataframe['prev_market_state'] = 'neutral' - - # 条件1: 价格接近布林带下轨(允许一定偏差) - close_to_bb_lower_1h = (dataframe['close'] <= dataframe['bb_lower_1h'] * self.bb_lower_deviation.value) # 可优化偏差 - - # 条件2: RSI 不高于阈值(根据市场状态动态调整) - # 为每一行创建动态阈值 - rsi_condition_1h = dataframe.apply(lambda row: - row['rsi_1h'] < self.rsi_bull_threshold.value if row['prev_market_state'] in ['strong_bull', 'weak_bull'] else row['rsi_1h'] < self.rsi_oversold.value, - axis=1) - - # 条件3: StochRSI 处于超卖区域(根据市场状态动态调整) - stochrsi_condition_1h = dataframe.apply(lambda row: - (row['stochrsi_k_1h'] < self.stochrsi_bull_threshold.value and row['stochrsi_d_1h'] < self.stochrsi_bull_threshold.value) if row['prev_market_state'] in ['strong_bull', 'weak_bull'] - else (row['stochrsi_k_1h'] < self.stochrsi_neutral_threshold.value and row['stochrsi_d_1h'] < self.stochrsi_neutral_threshold.value), - axis=1) - - # 条件4: MACD 上升趋势 - macd_condition_1h = dataframe['macd_1h'] > dataframe['macd_signal_1h'] - - # 条件5: 成交量显著放大(可选条件) - volume_spike = dataframe['volume'] > dataframe['volume_ma'] * self.volume_multiplier.value - - # 条件6: 布林带宽度过滤(避免窄幅震荡) - bb_width = (dataframe['bb_upper_1h'] - dataframe['bb_lower_1h']) / dataframe['close'] - bb_width_condition = bb_width > self.bb_width_threshold.value # 可优化的布林带宽度阈值 - - # 辅助条件: 3m 和 15m 趋势确认(允许部分时间框架不一致) - trend_confirmation = (dataframe['trend_3m'] == 1) | (dataframe['trend_15m'] == 1) - - # 新增:EMA趋势过滤条件(方案2:宽松版本) - # 条件1:EMA5保持在EMA20之上 或 条件2:最近20根1h K线内发生过向上穿越 - # 这样既能捕捉趋势启动,又能在趋势延续时继续入场 - if 'ema_5_1h' in dataframe.columns and 'ema_20_1h' in dataframe.columns: - # 条件1:EMA5保持在EMA20之上 - ema5_above_ema20 = dataframe['ema_5_1h'] > dataframe['ema_20_1h'] - - # 条件2:最近20根1h K线内发生过向上穿越 - if 'ema5_cross_above_ema20' in dataframe.columns: - # 使用rolling.max检查最近20根K线内是否有True值 - recent_cross = dataframe['ema5_cross_above_ema20'].rolling(window=20, min_periods=1).max() == 1 - # 两个条件满足其一即可 - ema5_20_condition = ema5_above_ema20 | recent_cross - else: - # 如果没有交叉列,只用保持在上方的条件 - ema5_20_condition = ema5_above_ema20 - - # 新增条件:EMA20斜率阈值检查 - if 'ema20_slope_normalized' in dataframe.columns: - # EMA20斜率为正(上升趋势)且超过阈值 - ema20_positive_slope = dataframe['ema20_slope_normalized'] > self.ema20_slope_threshold.value - # 组合条件:EMA5/EMA20条件 AND EMA20斜率条件 - ema_trend_filter = ema5_20_condition & ema20_positive_slope - else: - # 如果没有斜率列,只用EMA5/EMA20条件 - ema_trend_filter = ema5_20_condition - else: - # 如果列不存在,创建一个全False的Series(不允许入场) - self.strategy_log(f"[{metadata['pair']}] 警告:ema_5_1h或ema_20_1h列不存在,过滤条件设为False") - ema_trend_filter = pd.Series(False, index=dataframe.index) - - # 合并所有条件(减少强制性条件) - # 至少满足多个条件中的一定数量,并且必须满足EMA趋势过滤 - condition_count = ( - close_to_bb_lower_1h.astype(int) + - rsi_condition_1h.astype(int) + - stochrsi_condition_1h.astype(int) + - macd_condition_1h.astype(int) + - (volume_spike | bb_width_condition).astype(int) + # 成交量或布林带宽度满足其一即可 - trend_confirmation.astype(int) - ) - # 新增:价格必须低于15分钟EMA20的条件 - price_below_ema20_15m = dataframe['close'] < dataframe['ema_20_15m'] - - # 最终条件:基本条件 + EMA趋势过滤 + 价格低于15分钟EMA20(方案2:宽松版) - basic_condition = condition_count >= self.min_condition_count.value - final_condition = basic_condition & ema_trend_filter & price_below_ema20_15m - - # 设置入场信号 - dataframe.loc[final_condition, 'enter_long'] = 1 - - # ========== 新增:入场诊断统计(回测可用) ========== - # 先输出最新数据时间(dataframe最后一行),用于判断数据新鲜度 - if 'date' in dataframe.columns and len(dataframe) > 0: - latest_data_date = dataframe['date'].iloc[-1] - try: - if isinstance(latest_data_date, datetime): - display_latest = latest_data_date - if display_latest.tzinfo is None: - display_latest = display_latest.replace(tzinfo=timezone.utc) - display_latest = display_latest.astimezone(UTC_PLUS_8) - latest_time_str = display_latest.strftime('%H:%M:%S') - # 计算数据新鲜度(使用统一函数) - data_freshness = self.calculate_data_freshness(latest_data_date, metadata['pair'], dataframe) - - # 数据新鲜度判断 - if data_freshness <= 3: # 3分钟内为新鲜数据 - fresh_icon = "✅" - elif data_freshness <= 10: # 3-10分钟为可接受延迟 - fresh_icon = "⚠️" - else: # 超过10分钟为过期数据 - fresh_icon = "❌" - self.strategy_log( - f"[{metadata['pair']}] 数据新鲜度: {fresh_icon} {data_freshness:.1f}min | " - f"最新K线: {latest_time_str} | 总行数: {len(dataframe)}" - ) - else: - from pandas import Timestamp - if isinstance(latest_data_date, Timestamp): - display_latest = latest_data_date.to_pydatetime() - if display_latest.tzinfo is None: - display_latest = display_latest.replace(tzinfo=timezone.utc) - display_latest = display_latest.astimezone(UTC_PLUS_8) - latest_time_str = display_latest.strftime('%H:%M:%S') - latest_naive = display_latest.replace(tzinfo=None) - - # 计算数据新鲜度(使用统一函数) - data_freshness = self.calculate_data_freshness(latest_data_date, metadata['pair'], dataframe) - - # 数据新鲜度判断 - if data_freshness <= 3: # 3分钟内为新鲜数据 - fresh_icon = "✅" - elif data_freshness <= 10: # 3-10分钟为可接受延迟 - fresh_icon = "⚠️" - else: # 超过10分钟为过期数据 - fresh_icon = "❌" - self.strategy_log( - f"[{metadata['pair']}] 数据新鲜度: {fresh_icon} {data_freshness:.1f}min | " - f"最新K线: {latest_time_str} | 总行数: {len(dataframe)}" - ) - except Exception as e: - self.strategy_log(f"[{metadata['pair']}] 数据新鲜度计算异常: {e}") - - # ========== 诊断统计结束 ========== - - # 设置入场价格:下调1.67%(使用乘法避免除零风险) - final_condition_updated = dataframe['enter_long'] == 1 - #dataframe.loc[final_condition_updated, 'enter_price'] = dataframe.loc[final_condition_updated, 'close'] * 0.9833 - - # 增强调试信息 - # 确保ema_trend_filter是Series类型才能调用sum() - if isinstance(ema_trend_filter, pd.Series): - ema_trend_count = ema_trend_filter.sum() - else: - ema_trend_count = 0 - - basic_condition_count = basic_condition.sum() - final_condition_count = final_condition.sum() - - self.strategy_log(f"[{metadata['pair']}] 入场条件检查:") - self.strategy_log(f" - 价格接近布林带下轨: {close_to_bb_lower_1h.sum()} 次") - self.strategy_log(f" - RSI 超卖: {rsi_condition_1h.sum()} 次") - self.strategy_log(f" - StochRSI 超卖: {stochrsi_condition_1h.sum()} 次") - self.strategy_log(f" - MACD 上升趋势: {macd_condition_1h.sum()} 次") - self.strategy_log(f" - 成交量或布林带宽度: {(volume_spike | bb_width_condition).sum()} 次") - self.strategy_log(f" - 价格低于15分钟EMA20: {price_below_ema20_15m.sum()} 次") - self.strategy_log(f" - 趋势确认: {trend_confirmation.sum()} 次") - self.strategy_log(f" - EMA趋势过滤(在上方或20根K线内穿越): {ema_trend_count} 次") - self.strategy_log(f" - 基本条件满足: {basic_condition_count} 次") - self.strategy_log(f" - 最终条件(基本+EMA过滤): {final_condition_count} 次") - - # ========== 新增:最近5个时间点的条件满足状况可视化 ========== - # 显示最近5个时间点的条件满足情况(更直观) - if len(dataframe) >= 5: - recent_indices = dataframe.index[-5:] - self.strategy_log(f"[{metadata['pair']}] 最近5个时间点条件满足状况:") - - # 辅助函数:根据数值距离阈值的程度返回颜色emoji - def get_color_emoji(value, threshold, condition_type='less_than'): - """ - 根据数值距离阈值的程度返回颜色emoji - condition_type: 'less_than' 表示值小于阈值为满足条件 - 'greater_than' 表示值大于阈值为满足条件 - """ - if condition_type == 'less_than': - # value越小越满足条件,所以距离阈值越远(值越大)越红 - distance = value - threshold # 距离阈值的距离,正值表示未满足,负值表示已满足 - else: # greater_than - # value越大越满足条件,所以距离阈值越远(值越小)越红 - distance = threshold - value # 距离阈值的距离,正值表示未满足,负值表示已满足 - - # 根据距离阈值的程度返回不同颜色 - if condition_type == 'less_than': - is_satisfied = value < threshold - else: # greater_than - is_satisfied = value > threshold - - if is_satisfied: - # 条件已满足,使用绿色系 - if distance < -threshold * 0.5: # 远超阈值 - return '🟢' # 深绿 - elif distance < -threshold * 0.2: # 明显超越阈值 - return '🟢' # 绿色 - else: # 接近阈值但仍满足 - return '🟡' # 黄绿色 - else: - # 条件未满足,使用红色系 - if distance > abs(threshold) * 0.5: # 远离阈值 - return '🔴' # 深红 - elif distance > abs(threshold) * 0.2: # 接近阈值 - return '🟠' # 橙色 - else: # 非常接近阈值 - return '🟡' # 黄色(接近满足) - - # RSI 超卖条件可视化(RSI越小越好) - rsi_values = dataframe['rsi_1h'].iloc[-5:] - rsi_threshold = self.rsi_bull_threshold.value # 使用牛市阈值作为参考 - rsi_visual = ''.join([get_color_emoji(val, rsi_threshold, 'less_than') for val in rsi_values]) - self.strategy_log(f" - [{rsi_visual}]#RSI 超卖(越红越大)") - - # StochRSI 超卖条件可视化(StochRSI越小越好) - stochrsi_values = dataframe['stochrsi_k_1h'].iloc[-5:] - stochrsi_threshold = 0.2 # 固定阈值 - stochrsi_visual = ''.join([get_color_emoji(val, stochrsi_threshold, 'less_than') for val in stochrsi_values]) - self.strategy_log(f" - [{stochrsi_visual}]#StochRSI超卖(越红越大)") - - # MACD 上升趋势条件可视化(MACD直方图越大越好) - macd_values = dataframe['macd_hist_1h'].iloc[-5:] - macd_threshold = 0 # 直方图大于0为上升趋势 - macd_visual = ''.join([get_color_emoji(val, macd_threshold, 'greater_than') for val in macd_values]) - self.strategy_log(f" - [{macd_visual}]#MACD上升趋势(越绿越大)") - - # 趋势确认条件可视化(布尔值,使用原来的✅❌) - trend_recent = trend_confirmation.iloc[-5:].tolist() - trend_visual = ''.join(['✅' if x else '❌' for x in trend_recent]) - self.strategy_log(f" - [{trend_visual}]#趋势确认") - - # 价格接近布林带下轨条件可视化(价格相对于布林带下轨的比例,越小越好) - price_bb_ratios = (dataframe['close'] / dataframe['bb_lower_1h']).iloc[-5:] - bb_threshold = self.bb_lower_deviation.value # 价格/布林带下轨应该小于这个值 - bb_lower_visual = ''.join([get_color_emoji(val, bb_threshold, 'less_than') for val in price_bb_ratios]) - self.strategy_log(f" - [{bb_lower_visual}]#价格接近布林带下轨(越绿越近)") - - # 如果EMA条件满足但最终条件未满足,输出详细信息 - if ema_trend_count > 0 and final_condition_count == 0: - self.strategy_log(f"[{metadata['pair']}] 注意:检测到 {ema_trend_count} 次EMA趋势过滤满足,但由于其他条件不足未能生成入场信号") - # 在populate_entry_trend方法末尾添加 - # 计算条件间的相关性 - conditions = DataFrame({ - 'close_to_bb': close_to_bb_lower_1h, - 'rsi': rsi_condition_1h, - 'stochrsi': stochrsi_condition_1h, - 'macd': macd_condition_1h, - 'vol_bb': (volume_spike | bb_width_condition), - 'trend': trend_confirmation, - 'ema_trend': ema_trend_filter - }) - correlation = conditions.corr().mean().mean() - #self.strategy_log(f"[{metadata['pair']}] 条件平均相关性: {correlation:.2f}") - # 日志记录 - #if dataframe['enter_long'].sum() > 0: - # self.strategy_log(f"[{metadata['pair']}] 发现入场信号数量: {dataframe['enter_long'].sum()}") - - return dataframe - - def detect_h1_rapid_rise(self, pair: str) -> bool: - """ - 检测1小时K线图上的剧烈拉升情况(轻量级版本,用于confirm_trade_entry) - 参数: - - pair: 交易对 - 返回: - - bool: 是否处于不稳固区域 - """ - try: - # 获取1小时K线数据 - df_1h = self.dp.get_pair_dataframe(pair=pair, timeframe='1h') - - # 获取当前优化参数值 - max_candles = self.h1_max_candles.value - rapid_rise_threshold = self.h1_rapid_rise_threshold.value - max_consecutive_candles = self.h1_max_consecutive_candles.value - - # 确保有足够的K线数据 - if len(df_1h) < max_candles: - logger.warning(f"[{pair}] 1h K线数据不足 {max_candles} 根,当前只有 {len(df_1h)} 根,无法完整检测剧烈拉升") - return False - - # 获取最近的K线 - recent_data = df_1h.iloc[-max_candles:].copy() - - # 检查连续最多几根K线内的最大涨幅 - rapid_rise_detected = False - max_rise = 0 - - for i in range(len(recent_data) - max_consecutive_candles + 1): - window_data = recent_data.iloc[i:i + max_consecutive_candles] - window_low = window_data['low'].min() - window_high = window_data['high'].max() - - # 计算区间内的最大涨幅 - if window_low > 0: - rise_percentage = (window_high - window_low) / window_low - if rise_percentage > max_rise: - max_rise = rise_percentage - - # 检查是否超过阈值 - if rise_percentage >= rapid_rise_threshold: - rapid_rise_detected = True - #self.strategy_log(f"[{pair}] 检测到剧烈拉升: 从 {window_low:.2f} 到 {window_high:.2f} ({rise_percentage:.2%}) 在 {max_consecutive_candles} 根K线内") - break - - current_price = recent_data['close'].iloc[-1] - #self.strategy_log(f"[{pair}] 剧烈拉升检测结果: {'不稳固' if rapid_rise_detected else '稳固'}") - #self.strategy_log(f"[{pair}] 最近最大涨幅: {max_rise:.2%}") - - return rapid_rise_detected - - except Exception as e: - logger.error(f"[{pair}] 剧烈拉升检测过程中发生错误: {str(e)}") - return False - - def confirm_trade_entry( - self, - pair: str, - order_type: str, - amount: float, - rate: float, - time_in_force: str, - current_time: datetime, - entry_tag: str | None, - side: str, - **kwargs, - ) -> bool: - """ - 交易买入前的确认函数,用于最终决定是否执行交易 - 此处实现从API获取数据、高波动禁止入场、时间戳和RMSE检查逻辑 - """ - self.strategy_log(f"[{pair}] confirm_trade_entry 被调用 - 价格: {rate:.8f}, 时间: {current_time}") - - # 仅对多头交易进行检查 - if side != 'long': - self.strategy_log(f"[{pair}] 非多头交易,跳过入场检查") - return False - - # 获取虚机名字 - import os - virtual_host_name = os.environ.get('VIRTUAL_HOST_NAME') - self.strategy_log(f"[{pair}] 当前虚机名字: {virtual_host_name}") - - # 只有当虚机名字是kiko时,才执行从外部获取数据和RMSE限制的逻辑 - if virtual_host_name == 'kiko': - self.strategy_log(f"[{pair}] 虚机名字是kiko,执行外部数据获取和RMSE限制逻辑") - import requests - import json - - try: - # 构建API请求URL - api_url = f"http://pairlist.xl.home/api/mlpredictionList?pairName={pair}&sortBy=timestamp" - self.strategy_log(f"[{pair}] 请求API数据: {api_url}") - - # 发送API请求 - response = requests.get(api_url, timeout=10) - response.raise_for_status() # 检查请求是否成功 - - # 解析API响应 - data = response.json() - if not data.get('success'): - self.strategy_log(f"[{pair}] API请求失败: {data.get('message', '未知错误')}") - return False - - predictions = data.get('predictions', []) - if not predictions: - self.strategy_log(f"[{pair}] API返回空预测数据") - return False - - # 只取第一条数据 - first_prediction = predictions[0] - self.strategy_log(f"[{pair}] 从API获取到预测数据: 时间戳={first_prediction.get('timestamp')}") - - # 检查时间戳是否不超过现在时间5分钟 - prediction_timestamp = first_prediction.get('timestamp') - if not prediction_timestamp: - self.strategy_log(f"[{pair}] 预测数据缺少时间戳") - return False - - import time - current_timestamp = int(time.time()) - time_diff_seconds = current_timestamp - prediction_timestamp - time_diff_minutes = time_diff_seconds / 60 - - if time_diff_minutes > 5: - self.strategy_log(f"[{pair}] 预测数据过期: 时间戳 {prediction_timestamp} 距离现在 {time_diff_minutes:.1f} 分钟,超过5分钟限制") - return False - - # 检查RMSE是否小于0.6 - rmse = first_prediction.get('rmse') - if rmse is None: - self.strategy_log(f"[{pair}] 预测数据缺少RMSE值") - return False - - if rmse >= 0.6: - self.strategy_log(f"[{pair}] RMSE值过大: {rmse} >= 0.6,拒绝入场") - return False - - # 检查未来波动率,高波动禁止入场 - values = first_prediction.get('values', {}) - future_volatility = values.get('&s-future_volatility') - - if future_volatility is None: - self.strategy_log(f"[{pair}] 预测数据缺少未来波动率值") - return False - - # 定义高波动率阈值(可根据需要调整) - high_volatility_threshold = 0.7 - if future_volatility > high_volatility_threshold: - self.strategy_log(f"[{pair}] 高波动禁止入场: 未来波动率 {future_volatility} > 阈值 {high_volatility_threshold}") - return False - - # 所有条件通过,允许入场 - self.strategy_log(f"[{pair}] 所有条件通过,允许入场") - self.strategy_log(f"[{pair}] 预测数据详情: ") - self.strategy_log(f" - 时间戳: {prediction_timestamp}") - self.strategy_log(f" - RMSE: {rmse}") - self.strategy_log(f" - 未来波动率: {future_volatility}") - self.strategy_log(f" - 入场信号: {values.get('&s-entry_signal')}") - self.strategy_log(f" - 出场信号: {values.get('&s-exit_signal')}") - - return True - - except requests.RequestException as e: - self.strategy_log(f"[{pair}] API请求失败: {str(e)}") - return False - except Exception as e: - self.strategy_log(f"[{pair}] 检查过程中发生错误: {str(e)}") - return False - else: - # 虚机名字不是kiko,跳过外部数据获取和RMSE限制,直接允许入场 - self.strategy_log(f"[{pair}] 虚机名字不是kiko,跳过外部数据获取和RMSE限制,直接允许入场") - return True - - - - def custom_stoploss(self, pair: str, trade: 'Trade', current_time, current_rate: float, - current_profit: float, **kwargs) -> float: - # 动态止损基于ATR - dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) - last_candle = dataframe.iloc[-1] - atr = last_candle['atr'] - - # 获取当前市场状态 - current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else 'unknown' - - # 渐进式止损策略 - if current_profit > 0.05: # 利润超过5%时 - return -3.0 * atr / current_rate - elif current_profit > 0.03: # 利润超过3%时 - return -2.5 * atr / current_rate - elif current_profit > 0.01: # 利润超过1%时 - return -2.0 * atr / current_rate - - # 在强劲牛市中,即使小亏损也可以容忍更大回调 - if current_state == 'strong_bull' and current_profit > -0.01: - return -1.8 * atr / current_rate - - if atr > 0: - return -1.2 * atr / current_rate - return self.stoploss - def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, - rate: float, time_in_force: str, exit_reason: str, - current_time: datetime, **kwargs) -> bool: - """ - 智能出场确认:结合ML预测信号来决定是否出场 - """ - # 计算持仓时长(分钟) - trade_age_minutes = (current_time - trade.open_date_utc).total_seconds() / 60 + def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: + """生成入场信号""" + # 基础入场条件:价格接近布林带下轨且成交量放大 + entry_condition = ( + (dataframe['close'] <= dataframe['bb_lower_1h'] * 1.01) & + (dataframe['volume_spike'] == True) + ) - # 设置最小持仓时间保护,避免交易刚开仓就立刻被平仓 - min_hold_minutes = 3 # 至少持有3分钟 - if trade_age_minutes < min_hold_minutes: - self.strategy_log(f"[{pair}] 交易开仓时间仅{trade_age_minutes:.1f}分钟,小于最小持仓时间{min_hold_minutes}分钟,阻止出场") - return False + dataframe.loc[entry_condition, 'enter_long'] = 1 + return dataframe + + def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: + """生成出场信号""" + # 基础出场条件:价格接近布林带上轨 + exit_condition = ( + (dataframe['close'] >= dataframe['bb_upper_1h'] * 0.99) + ) + dataframe.loc[exit_condition, 'exit_long'] = 1 + return dataframe + + def confirm_trade_entry(self, pair, order_type, amount, rate, time_in_force, current_time, entry_tag, side, **kwargs): + """确认入场交易""" # 检查FreqAI是否启用 - freqai_enabled = False - try: - # 更可靠的检查方法:检查config中是否启用了FreqAI - freqai_enabled = self.config.get('freqai', {}).get('enabled', False) - # 同时检查freqai属性是否存在且有效 - if freqai_enabled and hasattr(self, 'freqai') and self.freqai is not None: - freqai_enabled = True - else: - freqai_enabled = False - except Exception as e: - self.strategy_log(f"[{pair}] 检查FreqAI启用状态时出错: {str(e)}") - freqai_enabled = False + freqai_enabled = self.config.get('freqai', {}).get('enabled', False) - try: - if freqai_enabled: - self.strategy_log(f"[{pair}] FreqAI已启用,使用ML信号进行出场决策") - # 获取当前的ML预测数据 - df, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) - if len(df) > 0: - last_row = df.iloc[-1] - - # 获取当前入场信号概率 - current_entry_prob = None - if '&s-entry_signal' in df.columns: - current_entry_prob = float(last_row['&s-entry_signal']) - elif '&-entry_signal_prob' in df.columns: - current_entry_prob = float(last_row['&-entry_signal_prob']) - elif '&-s-entry_signal_prob' in df.columns: - current_entry_prob = float(last_row['&-s-entry_signal_prob']) - - # 获取当前出场信号概率 - current_exit_prob = None - if '&s-exit_signal' in df.columns: - current_exit_prob = float(last_row['&s-exit_signal']) - elif '&-exit_signal_prob' in df.columns: - current_exit_prob = float(last_row['&-exit_signal_prob']) - elif '&-s-exit_signal_prob' in df.columns: - current_exit_prob = float(last_row['&-s-exit_signal_prob']) - - # 获取波动率预测 - predicted_volatility = None - if '&-target_volatility' in df.columns: - predicted_volatility = float(last_row['&-target_volatility']) - - # 获取未来波动率信号 - future_volatility = None - if '&s-future_volatility' in df.columns: - future_volatility = float(last_row['&s-future_volatility']) - elif '&-future_volatility' in df.columns: - future_volatility = float(last_row['&-future_volatility']) - - # 获取趋势预测 - trend_prediction = None - if '&-target_trend' in df.columns: - trend_prediction = float(last_row['&-target_trend']) - - # 应用新规则:当exit_signal > entry_signal × 阈值时退出 - # 基础信号比率阈值 - base_signal_ratio_threshold = 1.33 - signal_ratio_threshold = base_signal_ratio_threshold - - # 根据未来波动率调整阈值(线性函数) - if future_volatility is not None: - # 确保 future_volatility 在 [0, 1] 范围内 - future_volatility = max(0.0, min(1.0, future_volatility)) - - # 使用线性函数:x范围 0.9~0.3,y范围 1.1~1.6 - # 当波动率为0.9时,阈值为1.1;当波动率为0.3时,阈值为1.6 - x_min = 0.3 - x_max = 0.9 - y_min = 1.6 - y_max = 1.1 - - # 线性插值 - if future_volatility >= x_max: - # 高于最高波动率阈值,使用最小阈值 - signal_ratio_threshold = y_max - elif future_volatility <= x_min: - # 低于最低波动率阈值,使用最大阈值 - signal_ratio_threshold = y_min - else: - # 中间值线性插值 - signal_ratio_threshold = y_max + (y_min - y_max) * (future_volatility - x_max) / (x_min - x_max) - - # 限制阈值范围 - signal_ratio_threshold = max(1.1, min(1.6, signal_ratio_threshold)) - if current_entry_prob is not None and current_exit_prob is not None: - # 确保概率在 [0, 1] 范围内 - current_entry_prob = max(0.0, min(1.0, current_entry_prob)) - current_exit_prob = max(0.0, min(1.0, current_exit_prob)) - - if current_exit_prob > current_entry_prob * signal_ratio_threshold: - volatility_info = f",未来波动率: {future_volatility:.2f},调整后阈值: {signal_ratio_threshold:.2f}" if future_volatility is not None else "" - self.strategy_log(f"[{pair}] exit_signal ({current_exit_prob:.2f}) 超过 entry_signal ({current_entry_prob:.2f}) 的 {signal_ratio_threshold:.2f} 倍,支持出场{volatility_info}") - return True - - # 原有ML预测逻辑作为补充 - if current_entry_prob is not None and predicted_volatility is not None: - # 如果当前入场信号概率很高,说明市场条件仍然良好,可以继续持有 - if current_entry_prob > 0.6: - self.strategy_log(f"[{pair}] 当前入场信号概率高({current_entry_prob:.2f}),建议继续持有") - return False # 阻止出场 - - # 如果当前入场信号概率很低,支持出场 - if current_entry_prob < 0.25: - self.strategy_log(f"[{pair}] 当前入场信号概率低({current_entry_prob:.2f}),支持出场") - return True # 支持出场 - - # 如果波动率预测很低但入场信号概率较高,可能不应该立即出场 - if predicted_volatility < 0.35 and current_entry_prob > 0.55: - self.strategy_log(f"[{pair}] 低波动+高入场信号({current_entry_prob:.2f}),建议继续持有等待机会") - return False # 阻止因低波动而强制出场 - - # 如果波动率预测很低且入场信号概率也低,支持出场 - if predicted_volatility < 0.35 and current_entry_prob < 0.3: - self.strategy_log(f"[{pair}] 低波动+低入场信号({current_entry_prob:.2f}),支持出场") - return True # 支持出场 - - # 如果有趋势预测且预测为下跌趋势,支持出场 - if trend_prediction is not None and trend_prediction < 0.3: - self.strategy_log(f"[{pair}] 趋势预测为下跌({trend_prediction:.2f}),支持出场") - return True - else: - self.strategy_log(f"[{pair}] FreqAI未启用,使用默认出场逻辑") - - # 如果无法获取ML数据或FreqAI未启用,使用默认逻辑 - # 只有当exit_reason是exit_signal时才允许出场 - return exit_reason == 'exit_signal' - - except Exception as e: - self.strategy_log(f"[{pair}] ML出场确认检查失败,使用默认逻辑: {e}") - # 发生错误时,使用默认逻辑 - return exit_reason == 'exit_signal' - - def custom_exit(self, pair: str, trade: Trade, current_time: datetime, current_rate: float, - current_profit: float, **kwargs) -> float: - if trade.is_short: - return 0.0 - - trade_age_minutes = (current_time - trade.open_date_utc).total_seconds() / 60 - if trade_age_minutes < 0: - trade_age_minutes = 0 - - # 使用可优化的线性函数: y = (a * (x + k)) + t - a = self.roi_param_a.value # 系数a (可优化参数) - k = self.roi_param_k.value # 偏移量k (可优化参数) - t = self.roi_param_t.value # 常数项t (可优化参数) + if freqai_enabled: + # 强化学习模式下,入场决策主要由RL智能体通过环境交互决定 + # 这里可以添加一些基础的过滤条件 + df, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) + if len(df) > 0: + last_row = df.iloc[-1] + # 基础过滤:避免在RSI超买时入场 + if 'rsi_1h' in last_row and last_row['rsi_1h'] > 70: + return False - dynamic_roi_threshold = (a * (trade_age_minutes + k)) + t - # 确保ROI阈值不小于0 - if dynamic_roi_threshold < 0: - dynamic_roi_threshold = 0.0 - - dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) + return True + + def confirm_trade_exit(self, pair, order_type, amount, rate, time_in_force, current_time, exit_reason, side, **kwargs): + """确认出场交易""" + # 检查FreqAI是否启用 + freqai_enabled = self.config.get('freqai', {}).get('enabled', False) - # 获取市场状态,使用多种回退方案 - if 'market_state' in dataframe.columns: - current_state = dataframe['market_state'].iloc[-1] - elif 'sma' in dataframe.columns and len(dataframe) > 1: - # 使用 SMA 斜率判断市场状态 - sma_diff = dataframe['sma'].diff().iloc[-1] - if sma_diff > 0.01: - current_state = 'strong_bull' - elif sma_diff > 0: - current_state = 'weak_bull' - else: - current_state = 'neutral' - else: - # 默认为中性 - current_state = 'neutral' - - entry_tag = trade.enter_tag if hasattr(trade, 'enter_tag') else None - profit_ratio = current_profit / dynamic_roi_threshold if dynamic_roi_threshold > 0 else 0 - - exit_ratio = 0.0 - if profit_ratio >= 1.0: - if current_state == 'strong_bull': - exit_ratio = 0.5 if profit_ratio < 1.5 else 0.8 - elif current_state == 'weak_bull': - exit_ratio = 0.6 if profit_ratio < 1.2 else 0.9 - else: - exit_ratio = 1.0 - if entry_tag == 'strong_trend': - exit_ratio *= 0.8 - - if dynamic_roi_threshold < 0: - exit_ratio = 1.0 - - #self.strategy_log(f"[{pair}] 动态止盈: 持仓时间={trade_age_minutes:.1f}分钟, 当前利润={current_profit:.2%}, " - # f"动态ROI阈值={dynamic_roi_threshold:.4f}, 利润比值={profit_ratio:.2f}, " - # f"市场状态={current_state}, entry_tag={entry_tag}, 退出比例={exit_ratio:.0%}") + if freqai_enabled: + # 强化学习模式下,出场决策主要由RL智能体通过环境交互决定 + pass - # 当决定退出时,输出出场价格信息 - if exit_ratio > 0: - # 计算出场价格上浮比例(1.25%) - price_markup_percent = 1.25 - adjusted_exit_price = current_rate * 1.0125 - self.strategy_log(f"[{pair}] 准备出场 - 市场价: {current_rate:.8f}, 调整后出场价: {adjusted_exit_price:.8f}, 上浮: {price_markup_percent}%, 退出比例: {exit_ratio:.0%}") + # 基础出场逻辑 + return exit_reason == 'exit_signal' - return exit_ratio +# 强化学习模型定义 +from freqtrade.freqai.prediction_models.ReinforcementLearner import BaseReinforcementLearner - - def adjust_trade_position(self, trade: 'Trade', current_time, current_rate: float, - current_profit: float, min_stake: float, max_stake: float, **kwargs) -> float: - """ - 根据用户要求实现加仓逻辑 - - 加仓间隔设置为可优化参数 add_position_callback - - 加仓额度为: (stake_amount / stake_divisor) ^ (加仓次数 - 1) - """ - # 获取当前交易对 - pair = trade.pair - - # 获取当前交易的加仓次数 - entry_count = len(trade.orders) # 获取所有入场订单数量 - - # 如果已经达到最大加仓次数,则不再加仓 - if entry_count - 1 >= self.max_entry_adjustments.value: - return 0.0 - - # 获取初始入场价格和当前价格的差值百分比 - initial_price = trade.open_rate - if initial_price == 0: - return 0.0 - price_diff_pct = (current_rate - initial_price) / initial_price - - # 计算加仓次数(从1开始计数) - adjustment_count = entry_count - 1 # 已加仓次数 - - # 检查价格回调是否达到加仓间隔 - dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) - current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else 'neutral' - - # 计算当前所需的加仓间隔百分比 = 基础间隔 * (系数 ^ 已加仓次数) - # 获取当前币对的波动系数,用于动态调整回调百分比 - volatility_coef = self.get_volatility_coefficient(pair) - # 回调百分比 = 基础回调 * (系数 ^ 已加仓次数) * 波动系数 - current_callback = self.add_position_callback.value * (self.add_position_multiplier.value ** adjustment_count) * volatility_coef - - if price_diff_pct <= -current_callback: - # 计算初始入场金额 - initial_stake = trade.orders[0].cost # 第一笔订单的成本 - - # 计算加仓金额: (initial_stake / stake_divisor) ^ (adjustment_count + 1) - additional_stake = (initial_stake / self.stake_divisor.value) * (self.add_position_growth.value ** (adjustment_count + 1)) - - # 确保加仓金额在允许的范围内 - additional_stake = max(min_stake, min(additional_stake, max_stake - trade.stake_amount)) - - #self.strategy_log(f"[{pair}] 触发加仓: 第{adjustment_count + 1}次加仓, 初始金额{initial_stake:.2f}, \ - # 加仓金额{additional_stake:.2f}, 价格差{price_diff_pct:.2%}, 当前利润{current_profit:.2%}") - - return additional_stake - - # 不符合加仓条件,返回0 - return 0.0 - - def custom_stake_amount(self, pair: str, current_time: datetime, **kwargs) -> float: - """ - 定义初始仓位大小 - """ - # 获取默认的基础仓位大小 - default_stake = self.stake_amount - - # 从kwargs获取最小和最大仓位限制 - min_stake = kwargs.get('min_stake', 0.0) - max_stake = kwargs.get('max_stake', default_stake) +class MyReinforcementLearner(BaseReinforcementLearner): + def calculate_reward(self, trade: 'Trade', dataframe: DataFrame, pair: str, trade_dir: int) -> float: + """自定义奖励函数,重点识别和应对洗盘行为""" + # 基础收益奖励 + profit = trade.calc_profit_ratio() + reward = profit * 100 - # 确保仓位在允许的范围内 - adjusted_stake = max(min_stake, min(default_stake, max_stake)) + # 持仓时间奖励/惩罚:避免过早被洗盘出局 + hold_duration = (trade.close_date_utc - trade.open_date_utc).total_seconds() / 60 + if hold_duration < 15 and profit < 0: + reward -= 5 # 惩罚被洗盘的交易 + elif hold_duration > 120 and profit < 0.02: + reward -= 2 # 惩罚长时间低收益交易 - return adjusted_stake - + # 成交量异常奖励:识别洗盘后的反弹 + if 'volume_spike' in dataframe.columns: + entry_idx = dataframe[dataframe['date'] == trade.open_date_utc].index[0] + if entry_idx + 20 < len(dataframe): + post_entry_volume = dataframe.iloc[entry_idx:entry_idx+20]['volume_spike'].sum() + if post_entry_volume > 3 and profit > 0.03: + reward += 8 # 奖励识别并利用洗盘后反弹的交易 + + # 最大回撤惩罚:控制风险 + max_drawdown = trade.max_drawdown + if max_drawdown > 0.05: + reward -= max_drawdown * 200 + + return reward