diff --git a/freqtrade/templates/freqaiprimer.py b/freqtrade/templates/freqaiprimer.py index 8f4d2998..2fca6ab9 100644 --- a/freqtrade/templates/freqaiprimer.py +++ b/freqtrade/templates/freqaiprimer.py @@ -1,914 +1,3 @@ -import warnings -warnings.filterwarnings("ignore", category=UserWarning, module="pandas_ta") - -import logging -from freqtrade.strategy import IStrategy, IntParameter, DecimalParameter -from pandas import DataFrame -import pandas as pd -import pandas_ta as ta -from freqtrade.persistence import Trade -import numpy as np -from datetime import datetime, timezone, timedelta -import math - -logger = logging.getLogger(__name__) - -# ========== 时区统一管理 ========== -# FreqAI dataframe 的 date 列是 UTC 时间 -# 所有虚机/容器统一为 UTC+8 -UTC_PLUS_8 = timezone(timedelta(hours=8)) - -class FreqaiPrimer(IStrategy): - # 策略参数 - 使用custom_roi替代minimal_roi字典 - loglevel = "warning" - minimal_roi = {} - - # 启用自定义ROI回调函数 - use_custom_roi = True - - # FreqAI 要求 - process_only_new_candles = True - - stoploss = -0.15 # 固定止损 -15% (大幅放宽止损以承受更大波动) - trailing_stop = True - trailing_stop_positive_offset = 0.005 # 追踪止损偏移量 0.5% (更容易触发跟踪止盈) - - # 用于跟踪市场状态的数据框缓存 - _dataframe_cache = None - - def __init__(self, config=None): - """初始化策略参数,调用父类初始化方法并接受config参数""" - super().__init__(config) # 调用父类的初始化方法并传递config - # 存储从配置文件加载的默认值 - self._trailing_stop_positive_default = 0.004 # 降低默认值以更容易触发跟踪止盈 - - # 波动系数缓存(简化版:直接计算,无需历史序列) - self._volatility_timestamp = {} # {pair: timestamp} - self._volatility_cache = {} # {pair: volatility_coef} - self._volatility_update_interval = 180 # 波动系数更新间隔(秒),3分钟 - - # 入场间隔控制:记录每个交易对最近一次入场的时间 - # 格式: {pair: datetime} - self._last_entry_time = {} - - # 冷启动保护:记录策略启动时间 - self._strategy_start_time = datetime.now() - - # ========== 数据新鲜度监控统计 ========== - # 用于量化和诊断数据延迟问题的严重性 - self._freshness_stats = { - 'total_checks': 0, # 总检查次数 - 'fresh_count': 0, # 新鲜数据次数 (< 10分钟) - 'warning_count': 0, # 警告次数 (10-30分钟) - 'stale_count': 0, # 过期次数 (> 30分钟) - 'max_delay_minutes': 0.0, # 最大延迟时间 - 'total_delay_minutes': 0.0, # 累计延迟时间(用于计算平均值) - 'pair_delays': {}, # 每个币对的延迟统计 {pair: {'count': N, 'avg_delay': M, 'max_delay': X}} - 'last_report_time': None, # 上次报告时间 - } - self._freshness_report_interval = 3600 # 每小时输出一次统计报告 - - - def strategy_log(self, message: str, level: str = "info") -> None: - """根据 config 的 enable_strategy_log 决定是否输出日志""" - enable_log = self.config.get('enable_strategy_log', False) - if not enable_log: - return - if level.lower() == "debug": - logger.debug(message) - elif level.lower() == "warning": - logger.warning(message) - elif level.lower() == "error": - logger.error(message) - else: - logger.info(message) - - def update_freshness_stats(self, pair: str, data_age_minutes: float) -> None: - """ - 更新数据新鲜度统计 - :param pair: 交易对 - :param data_age_minutes: 数据延迟时间(分钟) - """ - stats = self._freshness_stats - stats['total_checks'] += 1 - stats['total_delay_minutes'] += data_age_minutes - stats['max_delay_minutes'] = max(stats['max_delay_minutes'], data_age_minutes) - - # 分类统计 - if data_age_minutes <= 10: - stats['fresh_count'] += 1 - elif data_age_minutes <= 30: - stats['warning_count'] += 1 - else: - stats['stale_count'] += 1 - - # 每个币对的统计 - if pair not in stats['pair_delays']: - stats['pair_delays'][pair] = { - 'count': 0, - 'total_delay': 0.0, - 'max_delay': 0.0, - 'fresh': 0, - 'warning': 0, - 'stale': 0 - } - - pair_stat = stats['pair_delays'][pair] - pair_stat['count'] += 1 - pair_stat['total_delay'] += data_age_minutes - pair_stat['max_delay'] = max(pair_stat['max_delay'], data_age_minutes) - - if data_age_minutes <= 10: - pair_stat['fresh'] += 1 - elif data_age_minutes <= 30: - pair_stat['warning'] += 1 - else: - pair_stat['stale'] += 1 - - def report_freshness_stats(self, force: bool = False) -> None: - """ - 输出数据新鲜度统计报告 - :param force: 强制输出(忽略时间间隔) - """ - current_time = datetime.now() - stats = self._freshness_stats - - # 检查是否需要报告 - if not force and stats['last_report_time'] is not None: - elapsed = (current_time - stats['last_report_time']).total_seconds() - if elapsed < self._freshness_report_interval: - return - - # 如果没有数据,不输出 - if stats['total_checks'] == 0: - return - - # 计算统计指标 - avg_delay = stats['total_delay_minutes'] / stats['total_checks'] - fresh_rate = stats['fresh_count'] / stats['total_checks'] * 100 - warning_rate = stats['warning_count'] / stats['total_checks'] * 100 - stale_rate = stats['stale_count'] / stats['total_checks'] * 100 - - # 输出总体报告 - self.strategy_log( - f"\n" - f"========================================\n" - f" 📊 数据新鲜度统计报告\n" - f"========================================\n" - f"总检查次数: {stats['total_checks']}\n" - f"新鲜数据 (<10min): {stats['fresh_count']} ({fresh_rate:.1f}%)\n" - f"警告数据 (10-30min): {stats['warning_count']} ({warning_rate:.1f}%)\n" - f"过期数据 (>30min): {stats['stale_count']} ({stale_rate:.1f}%)\n" - f"平均延迟: {avg_delay:.2f} 分钟\n" - f"最大延迟: {stats['max_delay_minutes']:.2f} 分钟\n" - f"========================================", - level="info" - ) - - # 输出每个币对的详细统计(按过期率排序) - if stats['pair_delays']: - pair_stats_list = [] - for pair, pair_stat in stats['pair_delays'].items(): - avg_pair_delay = pair_stat['total_delay'] / pair_stat['count'] - stale_rate_pair = pair_stat['stale'] / pair_stat['count'] * 100 - pair_stats_list.append({ - 'pair': pair, - 'avg_delay': avg_pair_delay, - 'max_delay': pair_stat['max_delay'], - 'stale_rate': stale_rate_pair, - 'count': pair_stat['count'] - }) - - # 按过期率排序(从高到低) - pair_stats_list.sort(key=lambda x: x['stale_rate'], reverse=True) - - report_lines = ["📈 各币对数据新鲜度排名(按过期率降序):"] - for i, ps in enumerate(pair_stats_list[:10], 1): # 只显示前10个最差的 - report_lines.append( - f" {i}. {ps['pair']:15s} | " - f"平均: {ps['avg_delay']:5.1f}min | " - f"最大: {ps['max_delay']:5.1f}min | " - f"过期率: {ps['stale_rate']:5.1f}% | " - f"检查: {ps['count']}次" - ) - - self.strategy_log("\n".join(report_lines), level="info") - - # 更新报告时间 - stats['last_report_time'] = current_time - - - # 只用于adjust_trade_position方法的波动系数获取 - def get_volatility_coefficient(self, pair: str) -> float: - """ - 获取币对的波动系数(简化版:直接计算,无需历史序列) - - USDT/USDT 波动系数设置为0 - - BTC/USDT 波动系数设置为1 - - 其他币对: - 计算当前波动系数 = 该币对波动率 / BTC/USDT波动率 - (基于最近200根1h K线,足够稳定,无需额外平滑) - - 波动系数表示某币对与BTC/USDT相比的波动幅度倍数 - - 山寨币的波动系数可能大于3 - - 稳定性较高的币对(如DOT/USDT)波动系数可能小于1 - - 添加了缓存机制,每3分钟更新一次,避免频繁计算 - """ - # 检查特殊币对 - if pair == 'USDT/USDT': - return 0.0 - elif pair == 'BTC/USDT': - return 1.0 - - try: - # 获取当前时间戳 - current_time = datetime.now().timestamp() - - # 检查缓存:如果距离上次计算时间小于更新间隔,则直接返回缓存值 - if (pair in self._volatility_cache and - pair in self._volatility_timestamp and - current_time - self._volatility_timestamp[pair] < self._volatility_update_interval): - return self._volatility_cache[pair] - - # 直接计算当前波动系数(基于最近200根1h K线) - current_volatility_coef = self._calculate_current_volatility_coef(pair) - - # 更新缓存和时间戳 - self._volatility_cache[pair] = current_volatility_coef - self._volatility_timestamp[pair] = current_time - - self.strategy_log(f"波动系数计算完成 {pair}: 系数={current_volatility_coef:.4f} (基于最近200根1h K线)") - - return current_volatility_coef - - except Exception as e: - logger.warning(f"计算波动系数时出错 {pair}: {str(e)}") - # 如果出错,尝试返回缓存值,否则返回默认值1.0 - return self._volatility_cache.get(pair, 1.0) - - - def _calculate_current_volatility_coef(self, pair: str) -> float: - """ - 计算当前的波动系数(该币对波动率 / BTC/USDT波动率) - """ - try: - # 获取当前币对的1小时k线数据 - current_pair_df, _ = self.dp.get_analyzed_dataframe(pair, '1h') - # 获取BTC/USDT的1小时k线数据 - btc_df, _ = self.dp.get_analyzed_dataframe('BTC/USDT', '1h') - - # 确保有足够的数据点 - if len(current_pair_df) < 2 or len(btc_df) < 2: - return 1.0 # 如果没有足够数据,返回默认值1.0 - - # 对于数据点少于200个的情况,使用所有可用数据 - # 对于数据点多于200个的情况,使用最近200个数据点 - current_data = current_pair_df.iloc[-min(200, len(current_pair_df)):] - btc_data = btc_df.iloc[-min(200, len(btc_df)):] - - # 计算当前币对的对数收益率和波动率 - current_data['returns'] = current_data['close'].pct_change() - current_volatility = current_data['returns'].std() * 100 # 转换为百分比 - - # 计算BTC/USDT的对数收益率和波动率 - btc_data['returns'] = btc_data['close'].pct_change() - btc_volatility = btc_data['returns'].std() * 100 # 转换为百分比 - - # 避免除以零的情况 - if btc_volatility == 0: - return 1.0 - - # 计算波动系数:当前币对波动率 / BTC/USDT波动率 - volatility_coef = current_volatility / btc_volatility - - # 设置合理的上下限,避免极端值影响策略 - # 上限设置为5.0(非常高波动的币对) - # 下限设置为0.1(非常稳定的币对) - return max(0.1, min(5.0, volatility_coef)) - except Exception as e: - logger.warning(f"计算当前波动系数时出错 {pair}: {str(e)}") - return 1.0 # 出错时返回默认值1.0 - - # 其他辅助方法可以在这里添加 - - @property - def protections(self): - """ - 保护机制配置 - 基于最新Freqtrade规范,保护机制应定义在策略文件中而非配置文件 - """ - return [ - { - "method": "StoplossGuard", - "lookback_period_candles": 60, # 3小时回看期(60根3分钟K线) - "trade_limit": 2, # 最多2笔止损交易 - "stop_duration_candles": 60, # 暂停180分钟(60根3分钟K线) - "only_per_pair": False # 仅针对单个币对 - }, - { - "method": "CooldownPeriod", - "stop_duration_candles": 2 # 6分钟冷却期(2根3分钟K线) - }, - { - "method": "MaxDrawdown", - "lookback_period_candles": 48, # 2.4小时回看期 - "trade_limit": 4, # 4笔交易限制 - "stop_duration_candles": 24, # 72分钟暂停(24根3分钟K线) - "max_allowed_drawdown": 0.20 # 20%最大回撤容忍度 - } - ] - - @property - def trailing_stop_positive(self): - """根据市场状态动态调整跟踪止盈参数""" - # 获取当前市场状态 - if self._dataframe_cache is not None and len(self._dataframe_cache) > 0: - current_state = self._dataframe_cache['market_state'].iloc[-1] - if current_state == 'strong_bull': - return 0.007 # 强劲牛市中降低跟踪止盈,让利润奔跑 - elif current_state == 'weak_bull': - return 0.005 # 弱势牛市中保持较低的跟踪止盈 - return self._trailing_stop_positive_default # 返回默认值 - - @trailing_stop_positive.setter - def trailing_stop_positive(self, value): - """设置trailing_stop_positive的默认值""" - self._trailing_stop_positive_default = value - - timeframe = "3m" # 主时间框架为 3 分钟 - can_short = False # 禁用做空 - - # 自定义指标参数 - 使用Hyperopt可优化参数 - bb_length = IntParameter(10, 30, default=20, optimize=True, load=True, space='buy') - bb_std = DecimalParameter(1.5, 3.0, decimals=1, default=2.0, optimize=True, load=True, space='buy') - rsi_length = IntParameter(7, 21, default=14, optimize=True, load=True, space='buy') - rsi_oversold = IntParameter(30, 50, default=42, optimize=True, load=True, space='buy') - - - - # 入场条件阈值参数 - bb_lower_deviation = DecimalParameter(1.01, 1.05, decimals=2, default=1.03, optimize=True, load=True, space='buy') - rsi_bull_threshold = IntParameter(45, 55, default=50, optimize=True, load=True, space='buy') - stochrsi_bull_threshold = IntParameter(30, 40, default=35, optimize=True, load=True, space='buy') - stochrsi_neutral_threshold = IntParameter(20, 30, default=25, optimize=True, load=True, space='buy') - volume_multiplier = DecimalParameter(1.2, 2.0, decimals=1, default=1.5, optimize=True, load=True, space='buy') - bb_width_threshold = DecimalParameter(0.01, 0.03, decimals=3, default=0.02, optimize=True, load=True, space='buy') - min_condition_count = IntParameter(2, 4, default=3, optimize=True, load=True, space='buy') - - # 剧烈拉升检测参数 - 使用Hyperopt可优化参数 - h1_max_candles = IntParameter(100, 300, default=200, optimize=True, load=True, space='buy') - h1_rapid_rise_threshold = DecimalParameter(0.05, 0.15, decimals=3, default=0.11, optimize=True, load=True, space='buy') - h1_max_consecutive_candles = IntParameter(1, 4, default=2, optimize=True, load=True, space='buy') - - # 入场间隔控制参数(分钟) - entry_interval_minutes = IntParameter(20, 200, default=42, optimize=True, load=True, space='buy') - - # EMA20斜率阈值参数(用于趋势强度过滤) - ema20_slope_threshold = DecimalParameter(0.0005, 0.005, decimals=4, default=0.001, optimize=True, load=True, space='buy') - - # ML 审核官:entry_signal 拒绝入场的阈值(越高越宽松,越低越严格) - ml_entry_signal_threshold = DecimalParameter(0.05, 0.85, decimals=2, default=0.37, optimize=True, load=True, space='buy') - - # ML 审核官:exit_signal 拒绝出场的阈值(越高越宽松,越低越严格) - ml_exit_signal_threshold = DecimalParameter(0.05, 0.85, decimals=2, default=0.68, optimize=True, load=True, space='buy') - - # FreqAI 标签定义:entry_signal 的洛底上涨幅度(%) - freqai_entry_up_percent = DecimalParameter(0.3, 2.0, decimals=2, default=0.5, optimize=True, load=True, space='buy') - - # FreqAI 标签定义:exit_signal 的洛底下跌幅度(%) - freqai_exit_down_percent = DecimalParameter(0.3, 2.0, decimals=2, default=0.5, optimize=True, load=True, space='buy') - - # 定义可优化参数 - # 初始入场金额: 75.00 - -# 加仓次数 相对降幅间隔 加仓金额 -# ------- ------------ -------- -# 0 N/A 75 -# 1 0.045000 36.29 -# 2 0.051750 163.31 -# 3 0.059513 734.88 -# 4 0.068439 3306.96 -# -# 累计投入金额: 4316.43 - max_entry_adjustments = IntParameter(2, 5, default=4, optimize=False, load=True, space='buy') # 最大加仓次数 - add_position_callback = DecimalParameter(0.02, 0.06, decimals=3, default=0.047, optimize=False, load=True, space='buy') # 加仓回调百分比 - add_position_growth = DecimalParameter(1.5, 5.0, decimals=2, default=4.5, optimize=False, load=True, space='buy') # 加仓金额增长因子,保留2位小数用于hyperopt优化 - add_position_multiplier = DecimalParameter(0.2, 2, decimals=2, default=1.35, optimize=False, load=True, space='buy') # 加仓间隔系数,保留2位小数用于hyperopt优化 - stake_divisor = DecimalParameter(2.0, 12.0, decimals=2, default=9.3, optimize=False, load=True, space='buy') # 加仓金额分母(小数类型,保留2位小数) - - # 线性ROI参数 - 用于线性函数: y = (a * (x + k)) + t - roi_param_a = DecimalParameter(-0.0002, -0.00005, decimals=5, default=-0.0001, optimize=True, load=True, space='sell') # 系数a - roi_param_k = IntParameter(20, 150, default=50, optimize=True, load=True, space='sell') # 偏移量k - roi_param_t = DecimalParameter(0.02, 0.18, decimals=3, default=0.06, optimize=True, load=True, space='sell') # 常数项t - # 出场条件阈值参数 - exit_bb_upper_deviation = DecimalParameter(0.98, 1.02, decimals=2, default=1.0, optimize=True, load=True, space='sell') - exit_volume_multiplier = DecimalParameter(1.5, 3.0, decimals=1, default=2.0, optimize=True, load=True, space='sell') - - rsi_overbought = IntParameter(50, 70, default=58, optimize=True, load=True, space='sell') - - - def informative_pairs(self): - pairs = self.dp.current_whitelist() - return [(pair, '15m') for pair in pairs] + [(pair, '1h') for pair in pairs] - - def _validate_dataframe_columns(self, dataframe: DataFrame, required_columns: list, metadata: dict): - """ - 验证数据框中是否包含所有需要的列。 - 如果缺少列,则记录警告日志。 - """ - missing_columns = [col for col in required_columns if col not in dataframe.columns] - if missing_columns: - logger.warning(f"[{metadata['pair']}] 数据框中缺少以下列: {missing_columns}") - - # ========================= FreqAI 特征与标签定义 ========================= - def feature_engineering_expand_all(self, dataframe: DataFrame, period: int, metadata: dict, **kwargs) -> DataFrame: - """FreqAI 全量特征:这里先用简单技术指标,后续可逐步扩展。""" - # 使用 rolling 计算 RSI(减少看前偏差) - delta = dataframe["close"].diff() - gain = delta.where(delta > 0, 0).rolling(window=period).mean() - loss = -delta.where(delta < 0, 0).rolling(window=period).mean() - rs = gain / loss - dataframe[f"%-rsi-{period}"] = 100 - (100 / (1 + rs)) - - dataframe[f"%-mfi-{period}"] = ta.mfi(dataframe["high"], dataframe["low"], dataframe["close"], dataframe["volume"], length=period) - adx_df = ta.adx(dataframe["high"], dataframe["low"], dataframe["close"], length=period) - adx_col = f"ADX_{period}" - if adx_col in adx_df.columns: - dataframe[f"%-adx-{period}"] = adx_df[adx_col] - return dataframe - - def feature_engineering_expand_basic(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame: - """FreqAI 基础特征。""" - dataframe["%-pct_change"] = dataframe["close"].pct_change().fillna(0) - dataframe["%-raw_volume"] = dataframe["volume"].fillna(0) - dataframe["%-raw_price"] = dataframe["close"].ffill() # 使用 ffill() 替代 fillna(method="ffill") - return dataframe - - def feature_engineering_standard(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame: - """FreqAI 标准时间类特征。""" - if "date" in dataframe.columns: - dataframe["%-day_of_week"] = dataframe["date"].dt.dayofweek - dataframe["%-hour_of_day"] = dataframe["date"].dt.hour - return dataframe - - def set_freqai_targets(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame: - """定义 FreqAI 训练标签:简单二分类版本 + 持仓时长预测。""" - # 从配置中读取预测窗口参数(禁止硬编码) - label_horizon = self.freqai_info.get('feature_parameters', {}).get('label_period_candles', 24) - - # 动态计算上涨/下跌阈值 - entry_up_percent = self.freqai_entry_up_percent.value / 100.0 # 转换为小数(如 0.01 表示 1%) - exit_down_percent = self.freqai_exit_down_percent.value / 100.0 - - entry_up_threshold = 1.0 + entry_up_percent # 例如 1.01 表示 +1% - exit_down_threshold = 1.0 - exit_down_percent # 例如 0.99 表示 -1% - - # 入场标签:未来窗口内的最高价是否超过 +1% - future_max = dataframe["close"].rolling(window=label_horizon, min_periods=1).max().shift(-label_horizon + 1) - dataframe["&s-entry_signal"] = np.where( - future_max > dataframe["close"] * entry_up_threshold, - 1, - 0, - ) - - # 出场标签:未来窗口内的最低价是否跌破 -1% - future_min = dataframe["close"].rolling(window=label_horizon, min_periods=1).min().shift(-label_horizon + 1) - dataframe["&s-exit_signal"] = np.where( - future_min < dataframe["close"] * exit_down_threshold, - 1, - 0, - ) - - # 新增:未来波动率预测标签(极端化方案) - # 计算当前波动率(过10根K线的收盘价波动) - current_volatility = dataframe["close"].pct_change().rolling(window=10, min_periods=5).std() - - # 计算未来10根K线的波动率(向未来移动) - future_pct_change = dataframe["close"].pct_change().shift(-1) # 未来的收盘价变化 - future_volatility = future_pct_change.rolling(window=10, min_periods=5).std().shift(-9) # 未来10根K线的波动率 - - # 标签:未来波动率 > 当前波动率 * 1.5 则标记为高波动(趋势启动) - volatility_ratio = future_volatility / (current_volatility + 1e-8) # 避免除以0 - dataframe["&s-future_volatility"] = np.where( - volatility_ratio > 1.5, - 1, # 未来高波动(趋势启动),继续持有 - 0 # 未来低波动(震荡市),快速止盈 - ) - - return dataframe - - def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: - # 计算 3m 周期的指标 - bb_length_value = self.bb_length.value - bb_std_value = self.bb_std.value - rsi_length_value = self.rsi_length.value - - # 使用 rolling 计算布林带(减少看前偏差) - bb_ma_3m = dataframe['close'].rolling(window=bb_length_value).mean() - bb_std_3m = dataframe['close'].rolling(window=bb_length_value).std() - dataframe['bb_lower_3m'] = bb_ma_3m - (bb_std_value * bb_std_3m) - dataframe['bb_upper_3m'] = bb_ma_3m + (bb_std_value * bb_std_3m) - - # 使用 rolling 计算 RSI(减少看前偏差) - delta_3m = dataframe['close'].diff() - gain_3m = delta_3m.where(delta_3m > 0, 0).rolling(window=rsi_length_value).mean() - loss_3m = -delta_3m.where(delta_3m < 0, 0).rolling(window=rsi_length_value).mean() - rs_3m = gain_3m / loss_3m - dataframe['rsi_3m'] = 100 - (100 / (1 + rs_3m)) - - # 新增 StochRSI 指标 - stochrsi_3m = ta.stochrsi(dataframe['close'], length=rsi_length_value, rsi_length=rsi_length_value) - dataframe['stochrsi_k_3m'] = stochrsi_3m[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3'] - dataframe['stochrsi_d_3m'] = stochrsi_3m[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3'] - - # 新增 MACD 指标 - macd_3m = ta.macd(dataframe['close'], fast=12, slow=26, signal=9) - dataframe['macd_3m'] = macd_3m['MACD_12_26_9'] - dataframe['macd_signal_3m'] = macd_3m['MACDs_12_26_9'] - dataframe['macd_hist_3m'] = macd_3m['MACDh_12_26_9'] - - # 使用 ewm 计算 EMA(减少看前偏差,adjust=False 确保实时计算) - dataframe['ema_50_3m'] = dataframe['close'].ewm(span=50, adjust=False).mean() - dataframe['ema_200_3m'] = dataframe['close'].ewm(span=200, adjust=False).mean() - - # 成交量过滤 - dataframe['volume_ma'] = dataframe['volume'].rolling(20).mean() - - # 计算 ATR 用于动态止损和退出 - dataframe['atr'] = ta.atr(dataframe['high'], dataframe['low'], dataframe['close'], length=14) - - # 获取 15m 数据 - df_15m = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='15m') - - # 使用 rolling 计算 RSI(减少看前偏差) - delta_15m = df_15m['close'].diff() - gain_15m = delta_15m.where(delta_15m > 0, 0).rolling(window=rsi_length_value).mean() - loss_15m = -delta_15m.where(delta_15m < 0, 0).rolling(window=rsi_length_value).mean() - rs_15m = gain_15m / loss_15m - df_15m['rsi_15m'] = 100 - (100 / (1 + rs_15m)) - - # 使用 ewm 计算 EMA(减少看前偏差) - df_15m['ema_50_15m'] = df_15m['close'].ewm(span=50, adjust=False).mean() - df_15m['ema_200_15m'] = df_15m['close'].ewm(span=200, adjust=False).mean() - - # 新增 StochRSI 指标 - stochrsi_15m = ta.stochrsi(df_15m['close'], length=rsi_length_value, rsi_length=rsi_length_value) - df_15m['stochrsi_k_15m'] = stochrsi_15m[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3'] - df_15m['stochrsi_d_15m'] = stochrsi_15m[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3'] - - # 新增 MACD 指标 - macd_15m = ta.macd(df_15m['close'], fast=12, slow=26, signal=9) - df_15m['macd_15m'] = macd_15m['MACD_12_26_9'] - df_15m['macd_signal_15m'] = macd_15m['MACDs_12_26_9'] - df_15m['macd_hist_15m'] = macd_15m['MACDh_12_26_9'] - - # 将 15m 数据重新索引到主时间框架 (3m) - df_15m = df_15m.set_index('date').reindex(dataframe['date']).reset_index() - df_15m = df_15m.rename(columns={'index': 'date'}) - df_15m = df_15m[['date', 'rsi_15m', 'ema_50_15m', 'ema_200_15m']].ffill() - - # 合并 15m 数据 - dataframe = dataframe.merge(df_15m, how='left', on='date') - - # 获取 1h 数据 - df_1h = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1h') - - # 使用 rolling 计算布林带(减少看前偏差) - bb_ma_1h = df_1h['close'].rolling(window=bb_length_value).mean() - bb_std_1h = df_1h['close'].rolling(window=bb_length_value).std() - df_1h['bb_lower_1h'] = bb_ma_1h - (bb_std_value * bb_std_1h) - df_1h['bb_upper_1h'] = bb_ma_1h + (bb_std_value * bb_std_1h) - - # 添加 EMA5 和 EMA20 用于趋势过滤(方案2:宽松条件) - df_1h['ema_5_1h'] = df_1h['close'].ewm(span=5, adjust=False).mean() - df_1h['ema_20_1h'] = df_1h['close'].ewm(span=20, adjust=False).mean() - - # 计算 EMA20 斜率(用于趋势强度过滤) - df_1h['ema20_slope'] = df_1h['ema_20_1h'].diff() - # 归一化斜率,使其相对于价格水平标准化 - df_1h['ema20_slope_normalized'] = df_1h['ema20_slope'] / df_1h['ema_20_1h'] - - # 检测 EMA5 向上穿越 EMA20(添加安全检查) - if len(df_1h) >= 2: - df_1h['ema5_cross_above_ema20'] = ( - (df_1h['ema_5_1h'] > df_1h['ema_20_1h']) & - (df_1h['ema_5_1h'].shift(1) <= df_1h['ema_20_1h'].shift(1)) - ) - else: - # 数据不足时,默认为False - df_1h['ema5_cross_above_ema20'] = False - - # 使用 rolling 计算 RSI(减少看前偏差) - delta_1h = df_1h['close'].diff() - gain_1h = delta_1h.where(delta_1h > 0, 0).rolling(window=rsi_length_value).mean() - loss_1h = -delta_1h.where(delta_1h < 0, 0).rolling(window=rsi_length_value).mean() - rs_1h = gain_1h / loss_1h - df_1h['rsi_1h'] = 100 - (100 / (1 + rs_1h)) - - # 使用 ewm 计算 EMA(减少看前偏差) - df_1h['ema_50_1h'] = df_1h['close'].ewm(span=50, adjust=False).mean() - df_1h['ema_200_1h'] = df_1h['close'].ewm(span=200, adjust=False).mean() - df_1h['trend_1h'] = df_1h['close'] > df_1h['ema_50_1h'] # 1h上涨趋势 - - # 新增 StochRSI 指标 - stochrsi_1h = ta.stochrsi(df_1h['close'], length=rsi_length_value, rsi_length=rsi_length_value) - df_1h['stochrsi_k_1h'] = stochrsi_1h[f'STOCHRSIk_{rsi_length_value}_{rsi_length_value}_3_3'] - df_1h['stochrsi_d_1h'] = stochrsi_1h[f'STOCHRSId_{rsi_length_value}_{rsi_length_value}_3_3'] - - # 新增 MACD 指标 - macd_1h = ta.macd(df_1h['close'], fast=12, slow=26, signal=9) - df_1h['macd_1h'] = macd_1h['MACD_12_26_9'] - df_1h['macd_signal_1h'] = macd_1h['MACDs_12_26_9'] - df_1h['macd_hist_1h'] = macd_1h['MACDh_12_26_9'] - - # 验证 MACD 列是否正确生成 - #self.strategy_log(f"[{metadata['pair']}] 1小时 MACD 列: {list(macd_1h.columns)}") - - # 确保 StochRSI 指标已正确计算 - # 将 1h 数据重新索引到主时间框架 (3m),并填充缺失值 - df_1h = df_1h.set_index('date').reindex(dataframe['date']).ffill().bfill().reset_index() - df_1h = df_1h.rename(columns={'index': 'date'}) -# Include macd_1h, macd_signal_1h, ema_5_1h, ema_20_1h, ema5_cross_above_ema20 in the column selection - df_1h = df_1h[['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h', 'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h', 'macd_1h', 'macd_signal_1h', 'ema_5_1h', 'ema_20_1h', 'ema20_slope_normalized', 'ema5_cross_above_ema20']].ffill() - -# Validate that all required columns are present - required_columns = ['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h', - 'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h', - 'macd_1h', 'macd_signal_1h', 'ema_5_1h', 'ema_20_1h', 'ema20_slope_normalized', 'ema5_cross_above_ema20'] - missing_columns = [col for col in required_columns if col not in df_1h.columns] - if missing_columns: - logger.error(f"[{metadata['pair']}] 缺少以下列: {missing_columns}") - raise KeyError(f"缺少以下列: {missing_columns}") - - # 确保所有需要的列都被合并 - required_columns = ['date', 'rsi_1h', 'trend_1h', 'ema_50_1h', 'ema_200_1h', - 'bb_lower_1h', 'bb_upper_1h', 'stochrsi_k_1h', 'stochrsi_d_1h', - 'macd_1h', 'macd_signal_1h', 'ema_5_1h', 'ema_20_1h', 'ema20_slope_normalized', 'ema5_cross_above_ema20'] - - # 验证所需列是否存在 - missing_columns = [col for col in required_columns if col not in df_1h.columns] - if missing_columns: - logger.error(f"[{metadata['pair']}] 缺少以下列: {missing_columns}") - raise KeyError(f"缺少以下列: {missing_columns}") - - df_1h = df_1h[required_columns] # 确保包含所有必需的列(包括EMA过滤相关列) - - # 合并 1h 数据 - dataframe = dataframe.merge(df_1h, how='left', on='date').ffill() - - # 验证合并后的列 - #self.strategy_log(f"[{metadata['pair']}] 合并后的数据框列名: {list(dataframe.columns)}") - - # K线形态:看涨吞没 - dataframe['bullish_engulfing'] = ( - (dataframe['close'].shift(1) < dataframe['open'].shift(1)) & - (dataframe['close'] > dataframe['open']) & - (dataframe['close'] > dataframe['open'].shift(1)) & - (dataframe['open'] < dataframe['close'].shift(1)) - ) - - # 计算各时间框架的趋势状态(牛/熊) - # 3m时间框架:ema50下穿ema200为熊,上穿为牛 - dataframe['trend_3m'] = np.where(dataframe['ema_50_3m'] > dataframe['ema_200_3m'], 1, 0) - - # 15m时间框架:ema50下穿ema200为熊,上穿为牛 - dataframe['trend_15m'] = np.where(dataframe['ema_50_15m'] > dataframe['ema_200_15m'], 1, 0) - - # 1h时间框架:ema50下穿ema200为熊,上穿为牛 - dataframe['trend_1h_ema'] = np.where(dataframe['ema_50_1h'] > dataframe['ema_200_1h'], 1, 0) - - # 计算熊牛得分(0-100) - # 权重:3m熊牛权重10,15m熊牛权重35,1h熊牛权重65 - # 计算加权得分 - dataframe['market_score'] = ( - dataframe['trend_3m'] * 10 + - dataframe['trend_15m'] * 35 + - dataframe['trend_1h_ema'] * 65 - ) - - # 确保得分在0-100范围内 - dataframe['market_score'] = dataframe['market_score'].clip(lower=0, upper=100) - - # 根据得分分类市场状态 - dataframe['market_state'] = 'neutral' - dataframe.loc[dataframe['market_score'] > 70, 'market_state'] = 'strong_bull' - dataframe.loc[(dataframe['market_score'] > 50) & (dataframe['market_score'] <= 70), 'market_state'] = 'weak_bull' - dataframe.loc[(dataframe['market_score'] >= 30) & (dataframe['market_score'] <= 50), 'market_state'] = 'neutral' - dataframe.loc[(dataframe['market_score'] > 10) & (dataframe['market_score'] < 30), 'market_state'] = 'weak_bear' - dataframe.loc[dataframe['market_score'] <= 10, 'market_state'] = 'strong_bear' - - # 创建一个使用前一行市场状态的列,避免在populate_entry_trend中使用iloc[-1] - dataframe['prev_market_state'] = dataframe['market_state'].shift(1) - # 为第一行设置默认值 - dataframe['prev_market_state'] = dataframe['prev_market_state'].fillna('neutral') - - # 记录当前的市场状态 - if len(dataframe) > 0: - current_score = dataframe['market_score'].iloc[-1] - current_state = dataframe['market_state'].iloc[-1] - #self.strategy_log(f"[{metadata['pair']}] 熊牛得分: {current_score:.1f}, 市场状态: {current_state}") - #self.strategy_log(f"[{metadata['pair']}] 各时间框架趋势: 3m={'牛' if dataframe['trend_3m'].iloc[-1] == 1 else '熊'}, \ - # 15m={'牛' if dataframe['trend_15m'].iloc[-1] == 1 else '熊'}, \ - # 1h={'牛' if dataframe['trend_1h_ema'].iloc[-1] == 1 else '熊'}") - - # 调试:打印指标值(最后 5 行),验证时间对齐 - #print(f"Pair: {metadata['pair']}, Last 5 rows after reindexing:") - #print(dataframe[['date', 'close', 'bb_lower_3m', 'rsi_3m', 'rsi_15m', 'rsi_1h', 'trend_1h', - # 'trend_3m', 'trend_15m', 'trend_1h_ema', 'market_score', 'market_state', - # 'bullish_engulfing', 'volume', 'volume_ma']].tail(5)) - - # 打印最终数据框的列名以验证 - #self.strategy_log(f"[{metadata['pair']}] 最终数据框列名: {list(dataframe.columns)}") - - # 启用 FreqAI:在所有指标计算完成后调用 - dataframe = self.freqai.start(dataframe, metadata, self) - - return dataframe - - def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: - # 出场信号基于趋势和量价关系 - # 条件1: 价格突破布林带上轨(使用可优化的偏差参数) - breakout_condition = dataframe['close'] >= dataframe['bb_upper_1h'] * self.exit_bb_upper_deviation.value - - # 条件2: 成交量显著放大(使用可优化的成交量乘数) - volume_spike = dataframe['volume'] > dataframe['volume_ma'] * self.exit_volume_multiplier.value - - # 条件3: MACD 下降趋势 - macd_downward = dataframe['macd_1h'] < dataframe['macd_signal_1h'] - - # 条件4: RSI 进入超买区域(使用可优化的超买阈值) - rsi_overbought = dataframe['rsi_1h'] > self.rsi_overbought.value - - # 合并所有条件 - final_condition = breakout_condition | volume_spike | macd_downward | rsi_overbought - - # 设置出场信号 - dataframe.loc[final_condition, 'exit_long'] = 1 - - # 设置出场价格:上浮1.25%(使用乘法避免除零风险) - # Freqtrade 会优先使用 exit_price 列作为限价单价格 - final_exit_condition = dataframe['exit_long'] == 1 - #dataframe.loc[final_exit_condition, 'exit_price'] = dataframe.loc[final_exit_condition, 'close'] * 1.0125 - - # 增强调试信息 - #self.strategy_log(f"[{metadata['pair']}] 出场条件检查:") - #self.strategy_log(f" - 价格突破布林带上轨: {breakout_condition.sum()} 次") - #self.strategy_log(f" - 成交量显著放大: {volume_spike.sum()} 次") - #self.strategy_log(f" - MACD 下降趋势: {macd_downward.sum()} 次") - #self.strategy_log(f" - RSI 超买: {rsi_overbought.sum()} 次") - #self.strategy_log(f" - 最终条件: {final_condition.sum()} 次") - #self.strategy_log(f" - 使用参数: exit_bb_upper_deviation={self.exit_bb_upper_deviation.value}, exit_volume_multiplier={self.exit_volume_multiplier.value}, rsi_overbought={self.rsi_overbought.value}") - - # 日志记录 - #if dataframe['exit_long'].sum() > 0: - # self.strategy_log(f"[{metadata['pair']}] 触发出场信号数量: {dataframe['exit_long'].sum()}") - - return dataframe - - def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: - # 确保prev_market_state列存在 - if 'prev_market_state' not in dataframe.columns: - dataframe['prev_market_state'] = 'neutral' - - # 条件1: 价格接近布林带下轨(允许一定偏差) - close_to_bb_lower_1h = (dataframe['close'] <= dataframe['bb_lower_1h'] * self.bb_lower_deviation.value) # 可优化偏差 - - # 条件2: RSI 不高于阈值(根据市场状态动态调整) - # 为每一行创建动态阈值 - rsi_condition_1h = dataframe.apply(lambda row: - row['rsi_1h'] < self.rsi_bull_threshold.value if row['prev_market_state'] in ['strong_bull', 'weak_bull'] else row['rsi_1h'] < self.rsi_oversold.value, - axis=1) - - # 条件3: StochRSI 处于超卖区域(根据市场状态动态调整) - stochrsi_condition_1h = dataframe.apply(lambda row: - (row['stochrsi_k_1h'] < self.stochrsi_bull_threshold.value and row['stochrsi_d_1h'] < self.stochrsi_bull_threshold.value) if row['prev_market_state'] in ['strong_bull', 'weak_bull'] - else (row['stochrsi_k_1h'] < self.stochrsi_neutral_threshold.value and row['stochrsi_d_1h'] < self.stochrsi_neutral_threshold.value), - axis=1) - - # 条件4: MACD 上升趋势 - macd_condition_1h = dataframe['macd_1h'] > dataframe['macd_signal_1h'] - - # 条件5: 成交量显著放大(可选条件) - volume_spike = dataframe['volume'] > dataframe['volume_ma'] * self.volume_multiplier.value - - # 条件6: 布林带宽度过滤(避免窄幅震荡) - bb_width = (dataframe['bb_upper_1h'] - dataframe['bb_lower_1h']) / dataframe['close'] - bb_width_condition = bb_width > self.bb_width_threshold.value # 可优化的布林带宽度阈值 - - # 辅助条件: 3m 和 15m 趋势确认(允许部分时间框架不一致) - trend_confirmation = (dataframe['trend_3m'] == 1) | (dataframe['trend_15m'] == 1) - - # 新增:EMA趋势过滤条件(方案2:宽松版本) - # 条件1:EMA5保持在EMA20之上 或 条件2:最近20根1h K线内发生过向上穿越 - # 这样既能捕捉趋势启动,又能在趋势延续时继续入场 - if 'ema_5_1h' in dataframe.columns and 'ema_20_1h' in dataframe.columns: - # 条件1:EMA5保持在EMA20之上 - ema5_above_ema20 = dataframe['ema_5_1h'] > dataframe['ema_20_1h'] - - # 条件2:最近20根1h K线内发生过向上穿越 - if 'ema5_cross_above_ema20' in dataframe.columns: - # 使用rolling.max检查最近20根K线内是否有True值 - recent_cross = dataframe['ema5_cross_above_ema20'].rolling(window=20, min_periods=1).max() == 1 - # 两个条件满足其一即可 - ema5_20_condition = ema5_above_ema20 | recent_cross - else: - # 如果没有交叉列,只用保持在上方的条件 - ema5_20_condition = ema5_above_ema20 - - # 新增条件:EMA20斜率阈值检查 - if 'ema20_slope_normalized' in dataframe.columns: - # EMA20斜率为正(上升趋势)且超过阈值 - ema20_positive_slope = dataframe['ema20_slope_normalized'] > self.ema20_slope_threshold.value - # 组合条件:EMA5/EMA20条件 AND EMA20斜率条件 - ema_trend_filter = ema5_20_condition & ema20_positive_slope - else: - # 如果没有斜率列,只用EMA5/EMA20条件 - ema_trend_filter = ema5_20_condition - else: - # 如果列不存在,创建一个全False的Series(不允许入场) - self.strategy_log(f"[{metadata['pair']}] 警告:ema_5_1h或ema_20_1h列不存在,过滤条件设为False") - ema_trend_filter = pd.Series(False, index=dataframe.index) - - # 合并所有条件(减少强制性条件) - # 至少满足多个条件中的一定数量,并且必须满足EMA趋势过滤 - condition_count = ( - close_to_bb_lower_1h.astype(int) + - rsi_condition_1h.astype(int) + - stochrsi_condition_1h.astype(int) + - macd_condition_1h.astype(int) + - (volume_spike | bb_width_condition).astype(int) + # 成交量或布林带宽度满足其一即可 - trend_confirmation.astype(int) - ) - # 最终条件:基本条件 + EMA趋势过滤(方案2:宽松版) - basic_condition = condition_count >= self.min_condition_count.value - final_condition = basic_condition & ema_trend_filter - - # 设置入场信号 - dataframe.loc[final_condition, 'enter_long'] = 1 - - # ========== 新增:入场诊断统计(回测可用) ========== - # 先输出最新数据时间(dataframe最后一行),用于判断数据新鲜度 - if 'date' in dataframe.columns and len(dataframe) > 0: - latest_data_date = dataframe['date'].iloc[-1] - try: - if isinstance(latest_data_date, datetime): - display_latest = latest_data_date - if display_latest.tzinfo is None: - display_latest = display_latest.replace(tzinfo=timezone.utc) - display_latest = display_latest.astimezone(UTC_PLUS_8) - latest_time_str = display_latest.strftime('%H:%M:%S') - # 计算数据新鲜度 - latest_naive = display_latest.replace(tzinfo=None) - data_freshness = (datetime.now() - latest_naive).total_seconds() / 60 - if data_freshness <= 10: - fresh_icon = "✅" - elif data_freshness <= 30: - fresh_icon = "⚠️" - else: - fresh_icon = "❌" - self.strategy_log( - f"[{metadata['pair']}] 数据新鲜度: {fresh_icon} {data_freshness:.1f}min | " - f"最新K线: {latest_time_str} | 总行数: {len(dataframe)}" - ) - else: - from pandas import Timestamp - if isinstance(latest_data_date, Timestamp): - display_latest = latest_data_date.to_pydatetime() - if display_latest.tzinfo is None: - display_latest = display_latest.replace(tzinfo=timezone.utc) - display_latest = display_latest.astimezone(UTC_PLUS_8) - latest_time_str = display_latest.strftime('%H:%M:%S') - latest_naive = display_latest.replace(tzinfo=None) - data_freshness = (datetime.now() - latest_naive).total_seconds() / 60 - if data_freshness <= 10: - fresh_icon = "✅" - elif data_freshness <= 30: - fresh_icon = "⚠️" - else: - fresh_icon = "❌" - self.strategy_log( - f"[{metadata['pair']}] 数据新鲜度: {fresh_icon} {data_freshness:.1f}min | " - f"最新K线: {latest_time_str} | 总行数: {len(dataframe)}" - ) - except Exception as e: - self.strategy_log(f"[{metadata['pair']}] 数据新鲜度计算异常: {e}") - - # ========== 诊断统计结束 ========== - - # 设置入场价格:下调1.67%(使用乘法避免除零风险) - final_condition_updated = dataframe['enter_long'] == 1 - #dataframe.loc[final_condition_updated, 'enter_price'] = dataframe.loc[final_condition_updated, 'close'] * 0.9833 - - # 增强调试信息 - # 确保ema_trend_filter是Series类型才能调用sum() - if isinstance(ema_trend_filter, pd.Series): - ema_trend_count = ema_trend_filter.sum() - else: - ema_trend_count = 0 - - basic_condition_count = basic_condition.sum() - final_condition_count = final_condition.sum() - self.strategy_log(f"[{metadata['pair']}] 入场条件检查:") self.strategy_log(f" - 价格接近布林带下轨: {close_to_bb_lower_1h.sum()} 次") self.strategy_log(f" - RSI 超卖: {rsi_condition_1h.sum()} 次") @@ -920,728 +9,30 @@ class FreqaiPrimer(IStrategy): self.strategy_log(f" - 基本条件满足: {basic_condition_count} 次") self.strategy_log(f" - 最终条件(基本+EMA过滤): {final_condition_count} 次") - # 如果EMA条件满足但最终条件未满足,输出详细信息 - if ema_trend_count > 0 and final_condition_count == 0: - self.strategy_log(f"[{metadata['pair']}] 注意:检测到 {ema_trend_count} 次EMA趋势过滤满足,但由于其他条件不足未能生成入场信号") - # 在populate_entry_trend方法末尾添加 - # 计算条件间的相关性 - conditions = DataFrame({ - 'close_to_bb': close_to_bb_lower_1h, - 'rsi': rsi_condition_1h, - 'stochrsi': stochrsi_condition_1h, - 'macd': macd_condition_1h, - 'vol_bb': (volume_spike | bb_width_condition), - 'trend': trend_confirmation, - 'ema_trend': ema_trend_filter - }) - correlation = conditions.corr().mean().mean() - #self.strategy_log(f"[{metadata['pair']}] 条件平均相关性: {correlation:.2f}") - # 日志记录 - #if dataframe['enter_long'].sum() > 0: - # self.strategy_log(f"[{metadata['pair']}] 发现入场信号数量: {dataframe['enter_long'].sum()}") - - return dataframe - - def detect_h1_rapid_rise(self, pair: str) -> bool: - """ - 检测1小时K线图上的剧烈拉升情况(轻量级版本,用于confirm_trade_entry) - 参数: - - pair: 交易对 - 返回: - - bool: 是否处于不稳固区域 - """ - try: - # 获取1小时K线数据 - df_1h = self.dp.get_pair_dataframe(pair=pair, timeframe='1h') + # ========== 新增:最近5个时间点的条件满足状况可视化 ========== + # 显示最近5个时间点的条件满足情况(更直观) + if len(dataframe) >= 5: + recent_indices = dataframe.index[-5:] + self.strategy_log(f"[{metadata['pair']}] 最近5个时间点条件满足状况:") - # 获取当前优化参数值 - max_candles = self.h1_max_candles.value - rapid_rise_threshold = self.h1_rapid_rise_threshold.value - max_consecutive_candles = self.h1_max_consecutive_candles.value + # RSI 超卖条件可视化 + rsi_recent = rsi_condition_1h.iloc[-5:].tolist() + rsi_visual = ''.join(['✅' if x else '❌' for x in rsi_recent]) + self.strategy_log(f" - RSI 超卖: [{rsi_visual}]") - # 确保有足够的K线数据 - if len(df_1h) < max_candles: - logger.warning(f"[{pair}] 1h K线数据不足 {max_candles} 根,当前只有 {len(df_1h)} 根,无法完整检测剧烈拉升") - return False + # StochRSI 超卖条件可视化 + stochrsi_recent = stochrsi_condition_1h.iloc[-5:].tolist() + stochrsi_visual = ''.join(['✅' if x else '❌' for x in stochrsi_recent]) + self.strategy_log(f" - StochRSI 超卖: [{stochrsi_visual}]") - # 获取最近的K线 - recent_data = df_1h.iloc[-max_candles:].copy() + # MACD 上升趋势条件可视化 + macd_recent = macd_condition_1h.iloc[-5:].tolist() + macd_visual = ''.join(['✅' if x else '❌' for x in macd_recent]) + self.strategy_log(f" - MACD 上升趋势: [{macd_visual}]") - # 检查连续最多几根K线内的最大涨幅 - rapid_rise_detected = False - max_rise = 0 - - for i in range(len(recent_data) - max_consecutive_candles + 1): - window_data = recent_data.iloc[i:i + max_consecutive_candles] - window_low = window_data['low'].min() - window_high = window_data['high'].max() - - # 计算区间内的最大涨幅 - if window_low > 0: - rise_percentage = (window_high - window_low) / window_low - if rise_percentage > max_rise: - max_rise = rise_percentage - - # 检查是否超过阈值 - if rise_percentage >= rapid_rise_threshold: - rapid_rise_detected = True - #self.strategy_log(f"[{pair}] 检测到剧烈拉升: 从 {window_low:.2f} 到 {window_high:.2f} ({rise_percentage:.2%}) 在 {max_consecutive_candles} 根K线内") - break - - current_price = recent_data['close'].iloc[-1] - #self.strategy_log(f"[{pair}] 剧烈拉升检测结果: {'不稳固' if rapid_rise_detected else '稳固'}") - #self.strategy_log(f"[{pair}] 最近最大涨幅: {max_rise:.2%}") - - return rapid_rise_detected - - except Exception as e: - logger.error(f"[{pair}] 剧烈拉升检测过程中发生错误: {str(e)}") - return False - - def confirm_trade_entry( - self, - pair: str, - order_type: str, - amount: float, - rate: float, - time_in_force: str, - current_time: datetime, - entry_tag: str | None, - side: str, - **kwargs, - ) -> bool: - """ - 交易买入前的确认函数,用于最终决定是否执行交易 - 此处实现剧烈拉升检查和入场间隔控制逻辑 - """ - self.strategy_log(f"[{pair}] confirm_trade_entry 被调用 - 价格: {rate:.8f}, 时间: {current_time}") + # 趋势确认条件可视化 + trend_recent = trend_confirmation.iloc[-5:].tolist() + trend_visual = ''.join(['✅' if x else '❌' for x in trend_recent]) + self.strategy_log(f" - 趋势确认: [{trend_visual}]") - # 记录所有拒绝条件的列表 - rejected_conditions = [] - - # 仅对多头交易进行检查 - if side == 'long': - # 冷启动保护:程序启动后前20分钟内不允许入场 - cold_start_rejected = False - if hasattr(self, "_strategy_start_time"): - # 在非实盘环境(回测/模拟)下禁用冷启动保护 - if not self.config.get("dry_run", False): - warmup_minutes = 20 - # current_time 来自框架,是 offset-aware (UTC) - # _strategy_start_time 是 offset-naive (本地时间) - # 需要统一处理:把 current_time 转换为本地时间并移除时区信息 - if current_time.tzinfo is not None: - local_current_time = current_time.astimezone(UTC_PLUS_8).replace(tzinfo=None) - else: - local_current_time = current_time - elapsed_minutes = (local_current_time - self._strategy_start_time).total_seconds() / 60.0 - if elapsed_minutes < warmup_minutes: - rejected_conditions.append(f"冷启动保护: 策略启动未满 {warmup_minutes} 分钟(已运行 {elapsed_minutes:.1f} 分钟)") - self.strategy_log(f"[{pair}] {rejected_conditions[-1]}") - cold_start_rejected = True - else: - # 在实盘模式下,记录冷启动保护已禁用 - self.strategy_log(f"[{pair}] 实盘模式,冷启动保护已禁用") - - # 检查1:入场间隔控制(使用hyperopt参数) - interval_rejected = False - if pair in self._last_entry_time and not cold_start_rejected: - last_entry = self._last_entry_time[pair] - time_diff = (current_time - last_entry).total_seconds() * 0.0166666667 # 转换为分钟(使用乘法避免除法) - if time_diff < self.entry_interval_minutes.value: - rejected_conditions.append(f"入场间隔不足: 距离上次入场 {time_diff:.1f}分钟 < {self.entry_interval_minutes.value}分钟") - self.strategy_log(f"[{pair}] {rejected_conditions[-1]}") - interval_rejected = True - - # 检查2:检查是否处于剧烈拉升的不稳固区域 - unstable_rejected = False - if not cold_start_rejected and not interval_rejected: - is_unstable_region = self.detect_h1_rapid_rise(pair) - if is_unstable_region: - rejected_conditions.append("剧烈拉升检测: 检测到1小时图剧烈拉升") - self.strategy_log(f"[{pair}] {rejected_conditions[-1]}") - unstable_rejected = True - - # 检查3:ML 审核官(FreqAI 过滤低质量入场)+ 入场诊断统计 - # 逻辑:用 entry_signal 概率来判断——若"容易上涨概率"低,则拒绝入场 - ml_rejected = False - try: - if not cold_start_rejected and not interval_rejected and not unstable_rejected: - df, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) - if len(df) > 0: - last_row = df.iloc[-1] - - # ========== 关键修复:数据新鲜度检查 ========== - # 检查 ML 预测数据是否过期,避免使用过时的预测结果 - # 注意:只在 Live/Dryrun 模式下检查,回测模式不需要(数据总是同步的) - freshness_rejected = False - if self.dp.runmode.value in ('live', 'dry_run'): - data_timestamp = last_row.get('date') - if data_timestamp is not None: - # 确保 data_timestamp 是 datetime 对象 - if not isinstance(data_timestamp, datetime): - try: - from pandas import Timestamp - if isinstance(data_timestamp, Timestamp): - data_timestamp = data_timestamp.to_pydatetime() - except: - pass - - # 计算数据延迟(分钟) - if isinstance(data_timestamp, datetime): - current_time = datetime.now() - - # FreqAI dataframe 的 date 列是 UTC 时间 - if data_timestamp.tzinfo is None: - data_timestamp = data_timestamp.replace(tzinfo=timezone.utc) - # 转换为 UTC+8 并移除时区信息 - data_timestamp = data_timestamp.astimezone(UTC_PLUS_8).replace(tzinfo=None) - - data_age_minutes = (current_time - data_timestamp).total_seconds() / 60 - - # 📊 更新统计数据 - self.update_freshness_stats(pair, data_age_minutes) - - # 📈 定期输出统计报告(每小时) - self.report_freshness_stats() - - # 如果数据超过 30 分钟,拒绝入场 - if data_age_minutes > 30: - rejected_conditions.append( - f"数据过期: 最新数据时间 {data_timestamp}, " - f"距离现在 {data_age_minutes:.1f} 分钟,ML 预测已失效" - ) - self.strategy_log( - f"[数据新鲜度] [{pair}] ❌ {rejected_conditions[-1]}" - ) - freshness_rejected = True - elif data_age_minutes > 10: - # 10-30分钟:警告但允许入场 - self.strategy_log( - f"[数据新鲜度] [{pair}] ⚠️ 数据较旧: 最新数据时间 {data_timestamp}, " - f"距离现在 {data_age_minutes:.1f} 分钟,警告但允许入场" - ) - # ========== 数据新鲜度检查结束 ========== - - if freshness_rejected: - # 数据过期,直接拒绝,不再进行后续检查 - pass - else: - entry_prob = None - - # 优先使用 FreqAI 的 entry_signal 预测列 - if '&s-entry_signal' in df.columns: - entry_prob = float(last_row['&s-entry_signal']) - elif '&-entry_signal_prob' in df.columns: - entry_prob = float(last_row['&-entry_signal_prob']) - elif '&-s-entry_signal_prob' in df.columns: - entry_prob = float(last_row['&-s-entry_signal_prob']) - elif '&-entry_signal' in df.columns: - val = last_row['&-entry_signal'] - if isinstance(val, (int, float)): - entry_prob = float(val) - else: - # 文本标签时,简单映射为 0/1 - entry_prob = 1.0 if str(val).lower() in ['entry', 'buy', '1'] else 0.0 - - # ========== 新增:入场诊断统计 ========== - # 统计当前入场点的关键指标,用于分析"买在高位"问题 - current_close = float(last_row['close']) - - # 1. 价格与短期高点的关系 - recent_high_5 = float(df['high'].iloc[-5:].max()) if len(df) >= 5 else current_close - price_vs_recent_high = (current_close - recent_high_5) / recent_high_5 if recent_high_5 > 0 else 0 - - # 2. 价格与 EMA5 的关系 - ema5_1h = float(last_row.get('ema_5_1h', current_close)) - price_vs_ema5 = (current_close - ema5_1h) / ema5_1h if ema5_1h > 0 else 0 - - # 3. 价格与布林带的位置 - bb_upper = float(last_row.get('bb_upper_1h', current_close)) - bb_lower = float(last_row.get('bb_lower_1h', current_close)) - bb_position = (current_close - bb_lower) / (bb_upper - bb_lower) if (bb_upper - bb_lower) > 0 else 0.5 - - # 4. RSI 状态 - rsi_1h = float(last_row.get('rsi_1h', 50)) - - # 5. MACD 状态 - macd_1h = float(last_row.get('macd_1h', 0)) - macd_signal_1h = float(last_row.get('macd_signal_1h', 0)) - macd_cross = 'up' if macd_1h > macd_signal_1h else 'down' - - # 6. 市场状态 - market_state = str(last_row.get('market_state', 'unknown')) - - # 7. 数据时间戳和延迟信息 - data_timestamp = last_row.get('date') - data_age_str = 'N/A' - if data_timestamp is not None: - # 确保 data_timestamp 是 datetime 对象 - if not isinstance(data_timestamp, datetime): - try: - from pandas import Timestamp - if isinstance(data_timestamp, Timestamp): - data_timestamp = data_timestamp.to_pydatetime() - except: - pass - - # 计算数据延迟 - if isinstance(data_timestamp, datetime): - current_time = datetime.now() - - # FreqAI dataframe 的 date 列是 UTC 时间 - if data_timestamp.tzinfo is None: - data_timestamp = data_timestamp.replace(tzinfo=timezone.utc) - # 转换为 UTC+8 并移除时区信息 - data_timestamp = data_timestamp.astimezone(UTC_PLUS_8).replace(tzinfo=None) - - data_age_minutes = (current_time - data_timestamp).total_seconds() / 60 - data_age_str = f"{data_age_minutes:.1f}min" - - # 根据延迟添加状态标识 - if data_age_minutes <= 10: - freshness_icon = "✅" - elif data_age_minutes <= 30: - freshness_icon = "⚠️" - else: - freshness_icon = "❌" - data_age_str = f"{freshness_icon} {data_age_str}" - - # 输出诊断日志 - if entry_prob is not None: - ml_prob_str = f"{entry_prob:.2f}" - else: - ml_prob_str = "N/A" - - # 格式化数字,限制有效位数不超过5位 - def format_number(value, max_digits=5): - """格式化数字,限制有效位数""" - if value == 0: - return "0" - # 转换为字符串并去掉末尾的0 - s = f"{value:.10f}".rstrip('0').rstrip('.') - # 如果小数点前的数字超过max_digits,使用科学计数法 - if len(s.split('.')[0]) > max_digits: - return f"{value:.2e}" - # 如果总长度超过max_digits,截断 - if len(s) > max_digits: - # 保留小数点前的数字 - if '.' in s: - integer_part, decimal_part = s.split('.') - if len(integer_part) > max_digits: - return f"{value:.2e}" - else: - # 截断小数部分 - max_decimal = max_digits - len(integer_part) - 1 - if max_decimal > 0: - s = integer_part + '.' + decimal_part[:max_decimal] - else: - s = integer_part - else: - # 整数部分超过限制 - if len(s) > max_digits: - return f"{value:.2e}" - return s - - # 处理时间戳时区转换,确保显示正确的时间 - display_time = "N/A" - if isinstance(data_timestamp, datetime): - # 如果时间戳有时区信息,转换为本地时间 - if data_timestamp.tzinfo is not None: - # 转换为UTC+8时间显示 - local_time = data_timestamp.astimezone(UTC_PLUS_8) - display_time = local_time.strftime('%H:%M:%S') - else: - # 如果没有时区信息,假设是本地时间 - display_time = data_timestamp.strftime('%H:%M:%S') - - self.strategy_log( - f"[入场诊断] {pair} | " - f"信号时间: {display_time} | " - f"延迟: {data_age_str} | " - f"价格: {format_number(current_close)} | " - f"vs 5K高点: {price_vs_recent_high:+.2%} | " - f"vs EMA5: {price_vs_ema5:+.2%} | " - f"布林位置: {bb_position:.2f} | " - f"RSI: {rsi_1h:.1f} | " - f"MACD: {macd_cross} | " - f"市场: {market_state} | " - f"ML入场概率: {ml_prob_str}" - ) - # ========== 诊断统计结束 ========== - - if entry_prob is not None: - # 确保概率在 [0, 1] 范围内(分类器输出可能有浮点误差) - entry_prob = max(0.0, min(1.0, entry_prob)) - entry_threshold = self.ml_entry_signal_threshold.value - if entry_prob < entry_threshold: - rejected_conditions.append(f"ML审核官: entry_signal 概率 {entry_prob:.2f} < 阈值 {entry_threshold:.2f}") - self.strategy_log(f"[{pair}] {rejected_conditions[-1]}") - ml_rejected = True - else: - self.strategy_log(f"[{pair}] ML 审核官允许入场: entry_signal 概率 {entry_prob:.2f} >= 阈值 {entry_threshold:.2f}") - else: - self.strategy_log(f"[{pair}] 无数据分析数据,跳过ML审核") - else: - self.strategy_log(f"[{pair}] 因前置条件拒绝,跳过ML审核") - except Exception as e: - logger.warning(f"[{pair}] ML 审核官检查失败,忽略 ML 过滤: {e}") - rejected_conditions.append(f"ML审核官异常: {str(e)}") - self.strategy_log(f"[{pair}] {rejected_conditions[-1]}") - else: - # 如果不是多头交易,也记录下来 - self.strategy_log(f"[{pair}] 非多头交易,跳过入场检查") - - # 如果没有任何拒绝条件,表示允许入场 - if not rejected_conditions: - self.strategy_log(f"[{pair}] 所有条件通过,允许入场") - return True - else: - # 显示所有被拒绝的条件 - self.strategy_log(f"[{pair}] 入场被拒绝,原因列表: {', '.join(rejected_conditions)}") - return False - - def confirm_trade_exit( - self, - pair: str, - trade: 'Trade', - order_type: str, - amount: float, - rate: float, - time_in_force: str, - exit_reason: str, - current_time: datetime, - **kwargs, - ) -> bool: - """ - 交易卖出前的确认函数,用于最终决定是否执行出场 - 此处使用 ML 审核官(exit_signal 置信度)过滤出场 - """ - self.strategy_log(f"[{pair}] confirm_trade_exit 被调用 - 价格: {rate:.8f}, 出场原因: {exit_reason}, 时间: {current_time}") - # 风险控制类退出原因:不经过 ML 审核官,直接允许出场 - if exit_reason in ['stop_loss', 'trailing_stop_loss', 'emergency_exit', 'force_exit']: - self.strategy_log(f"[{pair}] 风险控制退出,不走 ML 审核官: exit_reason={exit_reason}") - return True - # 默认允许出场 - allow_exit = True - - try: - df, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) - if len(df) > 0: - last_row = df.iloc[-1] - exit_prob = None - - # 优先使用 FreqAI 的 exit_signal 预测列 - if '&s-exit_signal' in df.columns: - exit_prob = float(last_row['&s-exit_signal']) - elif '&-exit_signal_prob' in df.columns: - exit_prob = float(last_row['&-exit_signal_prob']) - elif '&-s-exit_signal_prob' in df.columns: - exit_prob = float(last_row['&-s-exit_signal_prob']) - elif '&-exit_signal' in df.columns: - val = last_row['&-exit_signal'] - if isinstance(val, (int, float)): - exit_prob = float(val) - else: - # 文本标签时,简单映射为 0/1 - exit_prob = 1.0 if str(val).lower() in ['exit', 'sell', '1'] else 0.0 - - if exit_prob is not None: - # 确保概率在 [0, 1] 范围内(分类器输出可能有浮点误差) - exit_prob = max(0.0, min(1.0, exit_prob)) - - # 从 kwargs 获取当前利润,freqtrade 会传入 current_profit - current_profit = float(kwargs.get('current_profit', 0.0)) - - # 获取出场一字基础阈值 - base_threshold = self.ml_exit_signal_threshold.value - - # 计算持仓时长(分钟) - try: - trade_age_minutes = max(0.0, (current_time - trade.open_date_utc).total_seconds() / 60.0) - except Exception: - trade_age_minutes = 0.0 - - # 基于持仓时长的阈值衰减:持仓越久,阈值越低,越容易出场 - age_factor = min(trade_age_minutes / (24 * 60.0), 1.0) # 0~1,对应 0~24 小时+ - dynamic_threshold = base_threshold * (1.0 - 0.3 * age_factor) - - # 小利润单(<=2%)再额外放宽 20% - if current_profit <= 0.02: - dynamic_threshold *= 0.8 - - # 新增:读取 AI 预测的未来波动率信号(极端化方案) - future_vol_signal = None - if '&s-future_volatility' in df.columns: - future_vol_signal = float(last_row['&s-future_volatility']) - elif '&-future_volatility' in df.columns: - future_vol_signal = float(last_row['&-future_volatility']) - - # 极端化逻辑:根据 AI 预测的未来波动率直接接管部分出场决策 - if future_vol_signal is not None and exit_reason == 'exit_signal': - # 情况A:AI 预测强趋势(高波动),且当前不亏损 → 忽略本次 exit_signal,继续持有 - if future_vol_signal > 0.65 and current_profit >= 0: - self.strategy_log( - f"[波动率 AI] [{pair}] AI 预测强趋势(高波动 {future_vol_signal:.2f}),忽略本次 exit_signal,继续持有 | " - f"持仓: {trade_age_minutes:.1f}min, 利润: {current_profit:.4f}" - ) - allow_exit = False - return allow_exit - # 情况B:AI 预测震荡市(低波动) → 强制接受 exit_signal,立即出场 - elif future_vol_signal < 0.35: - self.strategy_log( - f"[波动率 AI] [{pair}] AI 预测震荡市(低波动 {future_vol_signal:.2f}),强制接受 exit_signal 出场 | " - f"持仓: {trade_age_minutes:.1f}min, 利润: {current_profit:.4f}" - ) - return True - # 介于 0.35-0.65 之间:中性区间,不做强制处理,继续走原有 ML 审核官逻辑 - - # 设定下限,避免阈值过低 - dynamic_threshold = max(0.05, dynamic_threshold) - - if exit_prob < dynamic_threshold: - self.strategy_log( - f"[{pair}] ML 审核官拒绝出场: exit_signal 概率 {exit_prob:.2f} < 动态阈值 {dynamic_threshold:.2f}" - f" | 原应出场原因: {exit_reason} | 持仓: {trade_age_minutes:.1f}min, 利润: {current_profit:.4f}" - f" | 波动率AI: {future_vol_signal if future_vol_signal is not None else 'N/A'}" - ) - allow_exit = False - else: - if future_vol_signal is not None: - future_vol_str = f"{future_vol_signal:.2f}" - else: - future_vol_str = "N/A" - - # 格式化数字,限制有效位数不超过5位 - def format_number(value, max_digits=5): - """格式化数字,限制有效位数""" - if value == 0: - return "0" - # 转换为字符串并去掉末尾的0 - s = f"{value:.10f}".rstrip('0').rstrip('.') - # 如果小数点前的数字超过max_digits,使用科学计数法 - if len(s.split('.')[0]) > max_digits: - return f"{value:.2e}" - # 如果总长度超过max_digits,截断 - if len(s) > max_digits: - # 保留小数点前的数字 - if '.' in s: - integer_part, decimal_part = s.split('.') - if len(integer_part) > max_digits: - return f"{value:.2e}" - else: - # 截断小数部分 - max_decimal = max_digits - len(integer_part) - 1 - if max_decimal > 0: - s = integer_part + '.' + decimal_part[:max_decimal] - else: - s = integer_part - else: - # 整数部分超过限制 - if len(s) > max_digits: - return f"{value:.2e}" - return s - - # 格式化价格,限制有效位数不超过5位 - def format_number(value, max_digits=5): - """格式化数字,限制有效位数""" - if value == 0: - return "0" - # 转换为字符串并去掉末尾的0 - s = f"{value:.10f}".rstrip('0').rstrip('.') - # 如果小数点前的数字超过max_digits,使用科学计数法 - if len(s.split('.')[0]) > max_digits: - return f"{value:.2e}" - # 如果总长度超过max_digits,截断 - if len(s) > max_digits: - # 保留小数点前的数字 - if '.' in s: - integer_part, decimal_part = s.split('.') - if len(integer_part) > max_digits: - return f"{value:.2e}" - else: - # 截断小数部分 - max_decimal = max_digits - len(integer_part) - 1 - if max_decimal > 0: - s = integer_part + '.' + decimal_part[:max_decimal] - else: - s = integer_part - else: - # 整数部分超过限制 - if len(s) > max_digits: - return f"{value:.2e}" - return s - - self.strategy_log( - f"[{pair}] ML 审核官允许出场: exit_signal 概率 {exit_prob:.2f} >= 动态阈值 {dynamic_threshold:.2f}" - f" | 出场原因: {exit_reason} | 持仓: {format_number(trade_age_minutes)}min, 利润: {format_number(current_profit)}" - f" | 波动率AI: {future_vol_str}" - ) - except Exception as e: - logger.warning(f"[{pair}] ML 审核官出场检查失败,允许出场: {e}") - return allow_exit - - def custom_stoploss(self, pair: str, trade: 'Trade', current_time, current_rate: float, - current_profit: float, **kwargs) -> float: - # 动态止损基于ATR - dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) - last_candle = dataframe.iloc[-1] - atr = last_candle['atr'] - - # 获取当前市场状态 - current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else 'unknown' - - # 渐进式止损策略 - if current_profit > 0.05: # 利润超过5%时 - return -3.0 * atr / current_rate - elif current_profit > 0.03: # 利润超过3%时 - return -2.5 * atr / current_rate - elif current_profit > 0.01: # 利润超过1%时 - return -2.0 * atr / current_rate - - # 在强劲牛市中,即使小亏损也可以容忍更大回调 - if current_state == 'strong_bull' and current_profit > -0.01: - return -1.8 * atr / current_rate - - if atr > 0: - return -1.2 * atr / current_rate - return self.stoploss - - def custom_exit(self, pair: str, trade: Trade, current_time: datetime, current_rate: float, - current_profit: float, **kwargs) -> float: - if trade.is_short: - return 0.0 - - trade_age_minutes = (current_time - trade.open_date_utc).total_seconds() / 60 - if trade_age_minutes < 0: - trade_age_minutes = 0 - - # 使用可优化的线性函数: y = (a * (x + k)) + t - a = self.roi_param_a.value # 系数a (可优化参数) - k = self.roi_param_k.value # 偏移量k (可优化参数) - t = self.roi_param_t.value # 常数项t (可优化参数) - - dynamic_roi_threshold = (a * (trade_age_minutes + k)) + t - # 确保ROI阈值不小于0 - if dynamic_roi_threshold < 0: - dynamic_roi_threshold = 0.0 - - dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) - - # 获取市场状态,使用多种回退方案 - if 'market_state' in dataframe.columns: - current_state = dataframe['market_state'].iloc[-1] - elif 'sma' in dataframe.columns and len(dataframe) > 1: - # 使用 SMA 斜率判断市场状态 - sma_diff = dataframe['sma'].diff().iloc[-1] - if sma_diff > 0.01: - current_state = 'strong_bull' - elif sma_diff > 0: - current_state = 'weak_bull' - else: - current_state = 'neutral' - else: - # 默认为中性 - current_state = 'neutral' - - entry_tag = trade.enter_tag if hasattr(trade, 'enter_tag') else None - profit_ratio = current_profit / dynamic_roi_threshold if dynamic_roi_threshold > 0 else 0 - - exit_ratio = 0.0 - if profit_ratio >= 1.0: - if current_state == 'strong_bull': - exit_ratio = 0.5 if profit_ratio < 1.5 else 0.8 - elif current_state == 'weak_bull': - exit_ratio = 0.6 if profit_ratio < 1.2 else 0.9 - else: - exit_ratio = 1.0 - if entry_tag == 'strong_trend': - exit_ratio *= 0.8 - - if dynamic_roi_threshold < 0: - exit_ratio = 1.0 - - #self.strategy_log(f"[{pair}] 动态止盈: 持仓时间={trade_age_minutes:.1f}分钟, 当前利润={current_profit:.2%}, " - # f"动态ROI阈值={dynamic_roi_threshold:.4f}, 利润比值={profit_ratio:.2f}, " - # f"市场状态={current_state}, entry_tag={entry_tag}, 退出比例={exit_ratio:.0%}") - - # 当决定退出时,输出出场价格信息 - if exit_ratio > 0: - # 计算出场价格上浮比例(1.25%) - price_markup_percent = 1.25 - adjusted_exit_price = current_rate * 1.0125 - self.strategy_log(f"[{pair}] 准备出场 - 市场价: {current_rate:.8f}, 调整后出场价: {adjusted_exit_price:.8f}, 上浮: {price_markup_percent}%, 退出比例: {exit_ratio:.0%}") - - return exit_ratio - - - def adjust_trade_position(self, trade: 'Trade', current_time, current_rate: float, - current_profit: float, min_stake: float, max_stake: float, **kwargs) -> float: - """ - 根据用户要求实现加仓逻辑 - - 加仓间隔设置为可优化参数 add_position_callback - - 加仓额度为: (stake_amount / stake_divisor) ^ (加仓次数 - 1) - """ - # 获取当前交易对 - pair = trade.pair - - # 获取当前交易的加仓次数 - entry_count = len(trade.orders) # 获取所有入场订单数量 - - # 如果已经达到最大加仓次数,则不再加仓 - if entry_count - 1 >= self.max_entry_adjustments.value: - return 0.0 - - # 获取初始入场价格和当前价格的差值百分比 - initial_price = trade.open_rate - if initial_price == 0: - return 0.0 - price_diff_pct = (current_rate - initial_price) / initial_price - - # 计算加仓次数(从1开始计数) - adjustment_count = entry_count - 1 # 已加仓次数 - - # 检查价格回调是否达到加仓间隔 - dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) - current_state = dataframe['market_state'].iloc[-1] if 'market_state' in dataframe.columns else 'neutral' - - # 计算当前所需的加仓间隔百分比 = 基础间隔 * (系数 ^ 已加仓次数) - # 获取当前币对的波动系数,用于动态调整回调百分比 - volatility_coef = self.get_volatility_coefficient(pair) - # 回调百分比 = 基础回调 * (系数 ^ 已加仓次数) * 波动系数 - current_callback = self.add_position_callback.value * (self.add_position_multiplier.value ** adjustment_count) * volatility_coef - - if price_diff_pct <= -current_callback: - # 计算初始入场金额 - initial_stake = trade.orders[0].cost # 第一笔订单的成本 - - # 计算加仓金额: (initial_stake / stake_divisor) ^ (adjustment_count + 1) - additional_stake = (initial_stake / self.stake_divisor.value) * (self.add_position_growth.value ** (adjustment_count + 1)) - - # 确保加仓金额在允许的范围内 - additional_stake = max(min_stake, min(additional_stake, max_stake - trade.stake_amount)) - - #self.strategy_log(f"[{pair}] 触发加仓: 第{adjustment_count + 1}次加仓, 初始金额{initial_stake:.2f}, \ - # 加仓金额{additional_stake:.2f}, 价格差{price_diff_pct:.2%}, 当前利润{current_profit:.2%}") - - return additional_stake - - # 不符合加仓条件,返回0 - return 0.0 - - def custom_stake_amount(self, pair: str, current_time: datetime, **kwargs) -> float: - """ - 定义初始仓位大小 - """ - # 获取默认的基础仓位大小 - default_stake = self.stake_amount - - # 从kwargs获取最小和最大仓位限制 - min_stake = kwargs.get('min_stake', 0.0) - max_stake = kwargs.get('max_stake', default_stake) - - # 确保仓位在允许的范围内 - adjusted_stake = max(min_stake, min(default_stake, max_stake)) - - return adjusted_stake - + # 如果EMA条件满足但最终条件未满足,输出详细信息 \ No newline at end of file