myTestFreqAI/freqtrade/templates/freqaiprimer.py
2025-08-23 00:02:55 +08:00

2133 lines
116 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import numpy as np
import datetime
import datetime
import os
import json
import glob
import redis
from functools import reduce
from freqtrade.persistence import Trade
import talib.abstract as ta
from pandas import DataFrame
import pandas as pd
from typing import Dict
from freqtrade.strategy import (DecimalParameter, IStrategy, IntParameter)
from datetime import datetime
logger = logging.getLogger(__name__)
class FreqaiPrimer(IStrategy):
"""
基于 FreqAI 的动态阈值交易策略,集成动态加仓、减仓和自定义 ROI 逻辑,兼容最新 Freqtrade 版本
"""
INTERFACE_VERSION = 3 # 使用最新策略接口版本
# --- 🧪 固定配置参数从hyperopt优化结果获取---
TRAILING_STOP_START = 0.045
TRAILING_STOP_DISTANCE = 0.0125
BUY_THRESHOLD_MIN = -0.035
BUY_THRESHOLD_MAX = -0.001
SELL_THRESHOLD_MIN = 0.002
SELL_THRESHOLD_MAX = 0.065
# 新增:加仓和减仓参数
ADD_POSITION_THRESHOLD = -0.021
EXIT_POSITION_RATIO = 0.472
COOLDOWN_PERIOD_MINUTES = 9
MAX_ENTRY_POSITION_ADJUSTMENT = 3
# 🟢 绿色通道开关 - 直接控制是否启用绿色通道功能
GREEN_CHANNEL_ENABLED = False # 设置为False关闭绿色通道True开启绿色通道
# 趋势判定阈值参数
TREND_BULLISH_THRESHOLD = 85
TREND_BEARISH_THRESHOLD = 60
# 新的加权趋势判定阈值用于hyperopt优化
TREND_FINAL_BULLISH_THRESHOLD = 55 # 上涨趋势最终阈值
TREND_FINAL_BEARISH_THRESHOLD = 13 # 下跌趋势最终阈值
# 🎯 绿色通道折扣常量不走Hyperopt单独维护
GREEN_CHANNEL_DISCOUNT = 0.025 # 2.5% 固定折扣
# Hyperopt 可优化参数 - 基于初步结果调整范围
trend_final_bullish_threshold = IntParameter(20, 85, default=71, space="buy", optimize=True, load=True) # 降低上限,避免过于保守
trend_final_bearish_threshold = IntParameter(5, 45, default=21, space="buy", optimize=True, load=True) # 扩大下限,捕获更多熊市机会
# 趋势确认参数 - 增加交易频率
trend_confirmation_period = IntParameter(3, 15, default=8, space="buy", optimize=True, load=True) # 趋势确认周期
# 出场策略相关参数
exit_divergence_multiplier = DecimalParameter(1.0, 1.3, default=1.06, space="sell", optimize=True, load=True)
exit_adx_threshold = IntParameter(15, 35, default=20, space="sell", optimize=True, load=True)
exit_rsi_threshold = IntParameter(55, 75, default=65, space="sell", optimize=True, load=True)
exit_stochrsi_threshold = IntParameter(60, 80, default=70, space="sell", optimize=True, load=True)
exit_adx_threshold_overbought = IntParameter(20, 35, default=25, space="sell", optimize=True, load=True)
exit_min_profit_threshold = DecimalParameter(0.01, 0.08, default=0.039, space="sell", optimize=True, load=True) # 提高最小盈利要求
exit_rapid_rise_bull = DecimalParameter(1.5, 4.0, default=2.0, space="sell", optimize=True, load=True)
exit_rapid_rise_bear = DecimalParameter(2.5, 5.0, default=3.5, space="sell", optimize=True, load=True)
exit_stochrsi_rapid = IntParameter(65, 85, default=70, space="sell", optimize=True, load=True)
exit_volume_threshold = DecimalParameter(0.5, 2.0, default=1.0, space="sell", optimize=True, load=True)
# 趋势状态相关出场参数
exit_bullish_trend_score_max = IntParameter(90, 98, default=95, space="sell", optimize=True, load=True)
exit_bullish_divergence_mult = DecimalParameter(1.1, 1.3, default=1.15, space="sell", optimize=True, load=True)
exit_bullish_rsi = IntParameter(70, 85, default=75, space="sell", optimize=True, load=True)
exit_bullish_stochrsi = IntParameter(80, 90, default=85, space="sell", optimize=True, load=True)
exit_bullish_adx = IntParameter(25, 40, default=30, space="sell", optimize=True, load=True)
exit_bullish_return = DecimalParameter(3.0, 7.0, default=5.0, space="sell", optimize=True, load=True)
exit_bullish_stochrsi_rapid = IntParameter(80, 90, default=85, space="sell", optimize=True, load=True)
exit_bearish_divergence_mult = DecimalParameter(0.8, 1.0, default=0.9, space="sell", optimize=True, load=True)
exit_bearish_rsi = IntParameter(55, 65, default=60, space="sell", optimize=True, load=True)
exit_bearish_stochrsi = IntParameter(65, 75, default=70, space="sell", optimize=True, load=True)
exit_bearish_adx = IntParameter(15, 25, default=20, space="sell", optimize=True, load=True)
exit_bearish_return = DecimalParameter(1.0, 3.0, default=1.5, space="sell", optimize=True, load=True)
exit_bearish_stochrsi_rapid = IntParameter(70, 85, default=75, space="sell", optimize=True, load=True)
exit_ranging_trend_score_max = IntParameter(95, 99, default=98, space="sell", optimize=True, load=True)
exit_ranging_trend_score_threshold = IntParameter(80, 90, default=85, space="sell", optimize=True, load=True)
# 🎯 入场折扣率参数可通过hyperopt优化
entry_discount_bull_normal = DecimalParameter(0.005, 0.03, default=0.010, decimals=3, space="buy", optimize=True, load=True) # 牛市正常通道折扣 (默认1%)
entry_discount_ranging = DecimalParameter(0.001, 0.02, default=0.0075, decimals=3, space="buy", optimize=True, load=True) # 震荡市折扣 (默认0.75%)
entry_discount_bearish = DecimalParameter(0.001, 0.015, default=0.005, decimals=3, space="buy", optimize=True, load=True) # 熊市折扣 (默认0.5%)
# 🎯 加仓策略参数(统一标准,移除趋势判断)
ADD_PROGRESSION_FACTOR = 1.09 # 递进系数每次加仓阈值递增1.09倍
# --- 🛠️ 固定配置参数 ---
stoploss = -0.15
timeframe = "3m"
use_custom_stoploss = True
position_adjustment_enable = True # 启用动态仓位调整
trailing_stop = True
trailing_stop_positive = 0.0125 # 🎯 降低锁定利润到2.5%
trailing_stop_positive_offset = 0.04 # 🎯 降低启动阈值到8%
trailing_only_offset_is_reached = False
minimal_roi = {
"0": 0.06, # 30分钟0-30分钟8% 盈利退出
"30": 0.04, # 2小时30-120分钟4% 盈利退出
"90": 0.025, # 4小时120-240分钟2% 盈利退出
"270": 0.002 # 8小时240-480分钟0% 盈利退出
}
plot_config = {
"main_plot": {
"ema200": {"color": "blue"},
"bb_upperband": {"color": "gray"},
"bb_lowerband": {"color": "gray"},
},
"subplots": {
"Signals": {
"enter_long": {"color": "green", "type": "scatter"},
"exit_long": {"color": "red", "type": "scatter"}
},
"Indicators": {
"&-price_value_divergence": {"color": "purple"},
"volume_z_score": {"color": "orange"},
"rsi": {"color": "cyan"},
"stochrsi_k": {"color": "magenta"},
},
"Bearish Signals": {
"bearish_signal": {"type": "bar", "color": "red"},
"stochrsi_overbought": {"type": "bar", "color": "orange"},
}
}
}
freqai_info = {
"identifier": "freqai_primer_mixed_v7", # 更新标识符以强制重新训练
"feature_parameters": {
"include_timeframes": ["3m", "15m", "1h"],
"label_period_candles": 12,
"include_shifted_candles": 3,
"include_corr_pairlist": [],
"indicator_periods_candles": [10, 20, 50],
},
"data_split_parameters": {
"test_size": 0.2,
"random_state": 42,
"shuffle": False,
},
"model_training_parameters": {
"price_value_divergence": {
"model": "LightGBMRegressor",
"model_params": {
"n_estimators": 200,
"learning_rate": 0.08,
"num_leaves": 25,
"max_depth": 7,
"min_child_samples": 8,
"subsample": 0.85,
"colsample_bytree": 0.85,
"reg_alpha": 0.08,
"reg_lambda": 0.08,
"verbose": -1,
}
},
"optimal_first_length": {
"model": "LightGBMClassifier",
"model_params": {
"n_estimators": 200,
"learning_rate": 0.08,
"num_leaves": 25,
"max_depth": 7,
"min_child_samples": 8,
"subsample": 0.85,
"colsample_bytree": 0.85,
"reg_alpha": 0.08,
"reg_lambda": 0.08,
"class_weight": "balanced",
"verbose": -1,
}
},
"market_regime": {
"model": "LightGBMClassifier",
"model_params": {
"n_estimators": 200,
"learning_rate": 0.08,
"num_leaves": 25,
"max_depth": 7,
"min_child_samples": 8,
"subsample": 0.85,
"colsample_bytree": 0.85,
"reg_alpha": 0.08,
"reg_lambda": 0.08,
"class_weight": "balanced",
"verbose": -1,
}
}
},
"fit_live_predictions_candles": 100,
"live_retrain_candles": 100,
}
@staticmethod
def linear_map(value, from_min, from_max, to_min, to_max):
return (value - from_min) / (from_max - from_min) * (to_max - to_min) + to_min
def __init__(self, config: dict, *args, **kwargs):
super().__init__(config, *args, **kwargs)
# 读取 Redis 配置(不存储客户端实例,避免序列化问题)
self.redis_url = config.get('redis', {}).get('url', None)
self.stats_logged = False
# 验证Redis配置但不创建持久化客户端
if self.redis_url:
logger.info(f"Redis 配置已启用: {self.redis_url}")
else:
logger.info(" 未找到 Redis 配置,使用本地缓存模式")
self.trailing_stop_enabled = False
self.pair_stats = {}
self.fit_live_predictions_candles = self.freqai_info.get("fit_live_predictions_candles", 100)
self.last_entry_time = {} # 记录每个币种的最后入场时间
# 绿色通道开关直接使用全局变量
self.green_channel_enabled = self.GREEN_CHANNEL_ENABLED
logger.info(f"🟢 绿色通道开关状态: {'开启' if self.green_channel_enabled else '关闭'}(通过全局变量控制)")
def _get_redis_client(self):
"""延迟初始化Redis客户端避免序列化问题"""
if not self.redis_url:
return None
try:
import redis
client = redis.from_url(self.redis_url)
# 快速测试连接
client.ping()
return client
except Exception:
return None
def feature_engineering_expand_all(self, dataframe: DataFrame, period: int, metadata: dict, **kwargs) -> DataFrame:
dataframe["%-rsi-period"] = ta.RSI(dataframe, timeperiod=period)
dataframe["%-sma-period"] = ta.SMA(dataframe, timeperiod=period)
dataframe["%-ema-period"] = ta.EMA(dataframe, timeperiod=period)
real = ta.TYPPRICE(dataframe)
upperband, middleband, lowerband = ta.BBANDS(real, timeperiod=period, nbdevup=2.0, nbdevdn=2.0)
dataframe["bb_lowerband-period"] = lowerband
dataframe["bb_upperband-period"] = upperband
dataframe["bb_middleband-period"] = middleband
dataframe["%-bb_width-period"] = (dataframe["bb_upperband-period"] - dataframe["bb_lowerband-period"]) / dataframe["bb_middleband-period"]
dataframe["%-mfi-period"] = ta.MFI(dataframe, timeperiod=period)
dataframe["%-adx-period"] = ta.ADX(dataframe, timeperiod=period)
dataframe["%-relative_volume-period"] = dataframe["volume"] / dataframe["volume"].rolling(period).mean()
dataframe["ema200"] = ta.EMA(dataframe, timeperiod=200)
dataframe["%-price_value_divergence"] = (dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"]
# 新增:市场状态/波动率制度相关特征
dataframe["%-atr-period"] = ta.ATR(dataframe, timeperiod=period)
dataframe["%-bb_width"] = upperband - lowerband
dataframe["%-bb_width_ratio"] = dataframe["%-bb_width"] / middleband * 100 # Bollinger Band宽度百分比
dataframe["%-volatility_regime"] = dataframe["%-bb_width_ratio"].rolling(20).mean() # 波动率制度
dataframe["%-regime_stability"] = dataframe["%-volatility_regime"].rolling(10).std() # 制度稳定性
dataframe["%-price_efficiency"] = abs(dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"] # 价格效率指标
columns_to_clean = [
"%-rsi-period", "%-mfi-period", "%-sma-period", "%-ema-period", "%-adx-period",
"bb_lowerband-period", "bb_middleband-period", "bb_upperband-period",
"%-bb_width-period", "%-relative_volume-period", "%-price_value_divergence",
"%-atr-period", "%-bb_width", "%-bb_width_ratio", "%-volatility_regime",
"%-regime_stability", "%-price_efficiency"
]
for col in columns_to_clean:
dataframe[col] = dataframe[col].replace([np.inf, -np.inf], 0).ffill().fillna(0)
pair = metadata.get('pair', 'Unknown')
logger.info(f"[{pair}] 特征工程完成,列:{list(dataframe.columns)}")
return dataframe
def set_freqai_targets(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame:
pair = metadata.get('pair', 'Unknown')
if len(dataframe) < 200:
logger.warning(f"[{pair}] 数据量不足({len(dataframe)}根K线需要至少200根K线进行训练")
return dataframe
dataframe["ema200"] = ta.EMA(dataframe, timeperiod=200)
dataframe["&-price_value_divergence"] = (dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"]
dataframe["volume_mean_20"] = dataframe["volume"].rolling(20).mean()
dataframe["volume_std_20"] = dataframe["volume"].rolling(20).std()
dataframe["volume_z_score"] = (dataframe["volume"] - dataframe["volume_mean_20"]) / dataframe["volume_std_20"]
dataframe["volume_z_score"] = dataframe["volume_z_score"].replace([np.inf, -np.inf], 0).ffill().fillna(0)
# 严格清理price_value_divergence确保数据类型正确
price_divergence = dataframe["&-price_value_divergence"].replace([np.inf, -np.inf], 0).ffill().fillna(0)
dataframe["&-price_value_divergence"] = price_divergence.astype(np.float64)
# 验证数据类型和范围
if not np.isfinite(dataframe["&-price_value_divergence"]).all():
logger.warning(f"[{pair}] price_value_divergence包含非有限值进行清理")
dataframe["&-price_value_divergence"] = dataframe["&-price_value_divergence"].replace([np.inf, -np.inf, np.nan], 0.0)
# 新增:市场状态/波动率制度分类目标
# 基于Bollinger Band宽度、波动率持续性、价格效率等多维度评估市场状态
# 0: 低波动震荡 1: 正常趋势 2: 高波动趋势 3: 极端波动 4: 黑天鹅状态
# 计算Bollinger Band宽度作为波动率制度指标
bb_upper, bb_middle, bb_lower = ta.BBANDS(dataframe['close'], timeperiod=20, nbdevup=2.0, nbdevdn=2.0)
bb_width_ratio = pd.Series((bb_upper - bb_lower) / bb_middle * 100, index=dataframe.index)
bb_width_ratio = bb_width_ratio.replace([np.inf, -np.inf], 0)
# 波动率持续性指标(制度稳定性)
volatility_regime = bb_width_ratio.rolling(20).mean()
regime_stability = volatility_regime.rolling(10).std()
# 价格效率指标(偏离长期均线的程度)
price_efficiency = abs(dataframe['close'] - dataframe['ema200']) / dataframe['ema200']
# 综合市场状态评分
market_state_score = (
volatility_regime * 0.5 + # 波动率制度权重50%
regime_stability * 0.3 + # 制度稳定性权重30%
price_efficiency * 0.2 # 价格效率权重20%
)
# 市场状态分类(基于波动率制度)
state_conditions = [
market_state_score < 2.0, # 低波动震荡
market_state_score < 4.0, # 正常趋势
market_state_score < 6.0, # 高波动趋势
market_state_score < 8.0, # 极端波动
]
state_choices = [0, 1, 2, 3]
dataframe["&*-market_regime"] = np.select(state_conditions, state_choices, default=4).astype(np.int32)
# 清理市场状态值
if not np.isfinite(dataframe["&*-market_regime"]).all():
logger.warning(f"[{pair}] market_regime包含非有限值进行清理")
dataframe["&*-market_regime"] = dataframe["&*-market_regime"].replace([np.inf, -np.inf, np.nan], 2).astype(np.int32)
logger.info(f"[{pair}] 市场状态分布: {dict(pd.Series(dataframe['&*-market_regime']).value_counts())}")
# 新增:分类模型优化 first_length
# 基于市场状态预测最优的 first_length 分类2, 4, 6, 8, 10
# 使用技术指标和市场波动率作为特征
atr = ta.ATR(dataframe, timeperiod=14)
volatility_ratio = atr / dataframe['close']
# 定义市场状态特征
trend_strength = abs(dataframe['close'] - dataframe['ema200']) / dataframe['ema200']
volume_anomaly = abs(dataframe['volume_z_score']) > 2
# 基于市场状态确定最优 first_length 类别
# 0: 激进(2) 1: 中性(4) 2: 稳健(6) 3: 保守(8) 4: 极保守(10)
conditions = [
(volatility_ratio > 0.02) & (trend_strength > 0.05), # 高波动+强趋势 -> 激进
(volatility_ratio > 0.015) & (trend_strength > 0.03), # 中高波动+中强趋势 -> 中性
(volatility_ratio > 0.01) | (volume_anomaly), # 中等波动或成交量异常 -> 稳健
(volatility_ratio < 0.008) & (trend_strength < 0.02), # 低波动+弱趋势 -> 保守
]
choices = [0, 1, 2, 3]
# 确保分类目标列是整数类型,添加严格验证
try:
optimal_length_class = np.select(conditions, choices, default=4)
# 确保所有值都是有效的整数
optimal_length_class = np.clip(optimal_length_class, 0, 4) # 限制在0-4范围内
dataframe["&*-optimal_first_length"] = optimal_length_class.astype(np.int32)
# 验证分类值的有效性
unique_values = np.unique(optimal_length_class)
if not all(0 <= val <= 4 for val in unique_values):
logger.warning(f"[{pair}] 发现无效的first_length分类值: {unique_values}")
dataframe["&*-optimal_first_length"] = np.clip(optimal_length_class, 0, 4).astype(np.int32)
except Exception as e:
logger.error(f"[{pair}] 计算分类目标时出错: {e}")
dataframe["&*-optimal_first_length"] = np.full(len(dataframe), 2, dtype=np.int32) # 默认值为2
# 添加调试信息
optimal_length_series = pd.Series(optimal_length_class)
logger.info(f"[{pair}] 📈 分类目标统计: {dict(optimal_length_series.value_counts())}")
logger.info(f"[{pair}] 🎯 最新分类值: {optimal_length_series.iloc[-5:]}")
return dataframe
def is_stochrsi_overbought(self, dataframe: DataFrame, period=10, threshold=85) -> bool:
"""
判断当前 STOCHRSI 是否在过去 N 根 K 线中平均高于阈值(如 85
"""
if 'stochrsi_k' not in dataframe.columns:
# 如果列不存在,返回全 False 的 Series
return pd.Series([False] * len(dataframe), index=dataframe.index)
# 计算滚动平均值并判断是否超过阈值
avg_stochrsi = dataframe['stochrsi_k'].rolling(window=period).mean()
return avg_stochrsi > threshold
def is_bearish_market(self, dataframe: DataFrame, metadata: dict, timeframe: str = "1h") -> pd.Series:
pair = metadata.get('pair', 'Unknown')
logger.info(f"[{pair}] 开始计算 1h 熊市信号")
# 防御性检查
required_columns = ['close_1h', 'high_1h', 'low_1h', 'open_1h', 'stochrsi_k_1h', 'stochrsi_d_1h', 'bb_middle_1h']
missing = [col for col in required_columns if col not in dataframe.columns]
if missing:
logger.error(f"[{pair}] 缺少必要列:{missing},返回全 False")
return pd.Series([False] * len(dataframe), index=dataframe.index)
# 检查 volume_z_score_1h 是否存在
has_volume_z_score = 'volume_z_score_1h' in dataframe.columns and not dataframe['volume_z_score_1h'].isna().all()
# 条件 a价格远低于布林带中轨低于中轨 1% 以上)
cond_a = dataframe['close_1h'] < dataframe['bb_middle_1h'] * 0.99
# 条件 bSTOCHRSI 超过90
cond_b = dataframe['stochrsi_k_1h'] > 90
# 条件 c看跌蜡烛图形态
open_1h = dataframe['open_1h']
close_1h = dataframe['close_1h']
high_1h = dataframe['high_1h']
low_1h = dataframe['low_1h']
prev_open = open_1h.shift(1)
prev_close = close_1h.shift(1)
cond_engulfing = (
(prev_close > prev_open) &
(close_1h < open_1h) &
(close_1h < prev_open) &
(open_1h > prev_close)
)
cond_dark_cloud_cover = (
(prev_close > prev_open) &
(open_1h > prev_close) &
(close_1h < (prev_open + prev_close) / 2) &
(close_1h < open_1h)
)
body = abs(open_1h - close_1h)
upper_wick = high_1h - np.maximum(open_1h, close_1h)
lower_wick = np.minimum(open_1h, close_1h) - low_1h
cond_shooting_star = (upper_wick > 2 * body) & (lower_wick < body) & (close_1h < open_1h)
cond_stochrsi_high = self.is_stochrsi_overbought(dataframe, period=10, threshold=85)
cond_c = cond_engulfing | cond_dark_cloud_cover | cond_shooting_star | cond_stochrsi_high
# 条件 d成交量显著下降
cond_d = dataframe['volume_z_score_1h'] < -1.0 if has_volume_z_score else pd.Series([False] * len(dataframe), index=dataframe.index)
# 综合熊市信号(至少满足两个条件)
bearish_signal = (cond_a & cond_b) | (cond_a & cond_c) | (cond_b & cond_c) | (cond_a & cond_d)
# 记录每个条件的触发情况
logger.info(f"[{pair}] 熊市信号 - 条件a (价格低于布林中轨): {cond_a.iloc[-1]}")
logger.info(f"[{pair}] 熊市信号 - 条件b (STOCHRSI>90): {cond_b.iloc[-1]}")
logger.info(f"[{pair}] 熊市信号 - 条件c (看跌形态): {cond_c.iloc[-1]}")
logger.info(f"[{pair}] 熊市信号 - 条件d (成交量下降): {cond_d.iloc[-1]}")
# 使用向量化操作计算得分避免使用iloc[-1]进行业务逻辑判断
bearish_score_vector = (cond_a.astype(int) + cond_b.astype(int) + cond_c.astype(int) + cond_d.astype(int)) * 25
# 仅在日志中使用最后一行的值(这是允许的用途)
if len(dataframe) > 0:
bearish_score = bearish_score_vector.iloc[-1]
logger.info(f"[{pair}] 熊市信号总得分: {bearish_score}/100")
else:
bearish_score = 0
return bearish_signal
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
计算主时间框架3m和 1h 时间框架的指标,并映射到主 dataframe。
包含 FreqAI 预测、布林带、RSI、成交量 Z 分数等,并确保 1h 数据列完整性。
"""
pair = metadata.get('pair', 'Unknown')
original_length = len(dataframe)
original_index = dataframe.index
logger.info(f"[{pair}] populate_indicators 开始处理,原始数据长度:{original_length}")
logger.info(f"[{pair}] 当前可用列调用FreqAI前{list(dataframe.columns)}")
# 计算主时间框架3m指标
dataframe["ema200"] = ta.EMA(dataframe, timeperiod=200)
dataframe["price_value_divergence"] = (dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"]
upperband, middleband, lowerband = ta.BBANDS(dataframe["close"], timeperiod=20, nbdevup=2.0, nbdevdn=2.0)
dataframe["bb_upperband"] = upperband
dataframe["bb_middleband"] = middleband
dataframe["bb_lowerband"] = lowerband
dataframe["rsi"] = ta.RSI(dataframe, timeperiod=14)
dataframe["volume_mean_20"] = dataframe["volume"].rolling(20).mean()
dataframe["volume_std_20"] = dataframe["volume"].rolling(20).std()
dataframe["volume_z_score"] = (dataframe["volume"] - dataframe["volume_mean_20"]) / dataframe["volume_std_20"]
# 计算主时间框架的 STOCHRSI
stochrsi = ta.STOCHRSI(dataframe, timeperiod=14, fastk_period=3, fastd_period=3)
dataframe["stochrsi_k"] = stochrsi["fastk"]
dataframe["stochrsi_d"] = stochrsi["fastd"]
# 获取 1h 时间框架数据
dataframe_1h = self.dp.get_pair_dataframe(pair=pair, timeframe="1h")
if dataframe_1h.empty or len(dataframe_1h) < 50:
logger.warning(f"[{pair}] 1h 数据为空或不足({len(dataframe_1h)} 根K线初始化空列")
for col in ['open_1h', 'high_1h', 'low_1h', 'close_1h', 'stochrsi_k_1h', 'stochrsi_d_1h',
'bb_upper_1h', 'bb_middle_1h', 'bb_lower_1h', 'volume_z_score_1h']:
dataframe[col] = 0.0 # 使用0.0代替np.nan避免NaN值
else:
# 计算 1h 指标
if len(dataframe_1h) >= 20: # 确保有足够数据计算 rolling(20)
stochrsi_1h = ta.STOCHRSI(dataframe_1h, timeperiod=14, fastk_period=3, fastd_period=3)
dataframe_1h['stochrsi_k'] = stochrsi_1h['fastk']
dataframe_1h['stochrsi_d'] = stochrsi_1h['fastd']
real = ta.TYPPRICE(dataframe_1h)
upperband, middleband, lowerband = ta.BBANDS(real, timeperiod=20, nbdevup=2.0, nbdevdn=2.0)
dataframe_1h['bb_upper_1h'] = upperband
dataframe_1h['bb_middle_1h'] = middleband
dataframe_1h['bb_lower_1h'] = lowerband
dataframe_1h['volume_mean_20_1h'] = dataframe_1h["volume"].rolling(20).mean()
dataframe_1h['volume_std_20_1h'] = dataframe_1h["volume"].rolling(20).std()
dataframe_1h['volume_z_score_1h'] = (dataframe_1h["volume"] - dataframe_1h['volume_mean_20_1h']) / dataframe_1h['volume_std_20_1h']
# 清理 NaN 和无穷值
dataframe_1h['volume_z_score_1h'] = dataframe_1h['volume_z_score_1h'].replace([np.inf, -np.inf], 0).ffill().fillna(0)
else:
logger.warning(f"[{pair}] 1h 数据不足以计算 volume_z_score_1h{len(dataframe_1h)} 根K线需至少20根")
dataframe_1h['volume_z_score_1h'] = 0.0
# 映射 1h 数据到主时间框架
for col in ['open', 'high', 'low', 'close', 'stochrsi_k', 'stochrsi_d', 'bb_upper_1h', 'bb_middle_1h', 'bb_lower_1h', 'volume_z_score_1h']:
if col in dataframe_1h.columns:
dataframe[col if col.endswith('_1h') else f"{col}_1h"] = dataframe_1h[col].reindex(dataframe.index, method='ffill').bfill()
else:
logger.warning(f"[{pair}] 1h 数据缺少列 {col},初始化为空")
dataframe[col if col.endswith('_1h') else f"{col}_1h"] = 0.0
# 数据清理:处理 NaN 和无穷值,确保数据完整性
critical_columns = ["ema200", "bb_upperband", "bb_middleband", "bb_lowerband", "rsi", "volume_z_score",
"&-price_value_divergence", "price_value_divergence", "open_1h", "high_1h", "low_1h",
"close_1h", "stochrsi_k_1h", "stochrsi_d_1h", "bb_upper_1h", "bb_middle_1h", "bb_lower_1h",
"volume_z_score_1h", "stochrsi_k", "stochrsi_d", "close", "volume"]
for col in critical_columns:
if col in dataframe.columns:
# 处理无穷值和NaN值
dataframe[col] = dataframe[col].replace([np.inf, -np.inf], 0)
# 前向填充然后用0填充剩余NaN
dataframe[col] = dataframe[col].ffill().fillna(0)
# 确保数据类型正确
dataframe[col] = pd.to_numeric(dataframe[col], errors='coerce').fillna(0)
# 验证并修复DataFrame长度一致性
final_length = len(dataframe)
if final_length != original_length:
logger.warning(f"[{pair}] DataFrame长度不匹配: 原始{original_length} vs 当前{final_length}")
# 重新索引确保长度一致
dataframe = dataframe.reindex(original_index)
# 填充任何NaN值
dataframe = dataframe.fillna(0)
logger.info(f"[{pair}] 已修复DataFrame长度当前长度: {len(dataframe)}")
# 最终验证
if len(dataframe) != original_length:
logger.error(f"[{pair}] 严重错误无法修复DataFrame长度不匹配问题")
# 强制截断或填充到原始长度
if len(dataframe) > original_length:
dataframe = dataframe.iloc[:original_length]
else:
# 填充缺失的行
missing_rows = original_length - len(dataframe)
logger.warning(f"[{pair}] 需要填充{missing_rows}行缺失数据")
# 使用最后一个有效值填充
last_row = dataframe.iloc[-1:] if len(dataframe) > 0 else dataframe.iloc[0:1]
for _ in range(missing_rows):
dataframe = pd.concat([dataframe, last_row])
dataframe = dataframe.iloc[:original_length]
dataframe.index = original_index
# 调用 FreqAI 预测 - 使用单一回归模型
if not hasattr(self, 'freqai') or self.freqai is None:
logger.error(f"[{pair}] FreqAI 未初始化,回退到规则计算")
dataframe["&-price_value_divergence"] = dataframe["price_value_divergence"]
else:
logger.info(f"[{pair}] 调用 FreqAI 预测,类型:{type(self.freqai)}")
try:
dataframe = self.freqai.start(dataframe, metadata, self)
# 检查 FreqAI 生成的列
freqai_columns = [col for col in dataframe.columns if col.startswith('&')]
logger.info(f"[{pair}] FreqAI 生成的列: {freqai_columns}")
if "&-price_value_divergence" not in dataframe.columns:
logger.warning(f"[{pair}] FreqAI 未生成 &-price_value_divergence回退到规则计算")
dataframe["&-price_value_divergence"] = dataframe["price_value_divergence"]
except Exception as e:
logger.error(f"[{pair}] FreqAI 预测失败: {e}")
dataframe["&-price_value_divergence"] = dataframe["price_value_divergence"]
# 计算 labels_mean 和 labels_std
labels_mean = None
labels_std = None
try:
model_base_dir = os.path.join(self.config["user_data_dir"], "models", self.freqai_info["identifier"])
pair_base = pair.split('/')[0] if '/' in pair else pair
sub_dirs = glob.glob(os.path.join(model_base_dir, f"sub-train-{pair_base}_*"))
if not sub_dirs:
logger.warning(f"[{pair}] 未找到任何子目录:{model_base_dir}/sub-train-{pair_base}_*")
else:
latest_sub_dir = max(sub_dirs, key=lambda x: int(x.split('_')[-1]))
pair_base_lower = pair_base.lower()
timestamp = latest_sub_dir.split('_')[-1]
metadata_file = os.path.join(latest_sub_dir, f"cb_{pair_base_lower}_{timestamp}_metadata.json")
if os.path.exists(metadata_file):
with open(metadata_file, "r") as f:
metadata = json.load(f)
labels_mean = metadata["labels_mean"]["&-price_value_divergence"]
labels_std = metadata["labels_std"]["&-price_value_divergence"]
logger.info(f"[{pair}] 从最新子目录 {latest_sub_dir} 读取 labels_mean{labels_mean}, labels_std{labels_std}")
else:
logger.warning(f"[{pair}] 最新的 metadata.json 文件 {metadata_file} 不存在")
except Exception as e:
logger.warning(f"[{pair}] 无法从子目录读取 labels_mean 和 labels_std{e},重新计算")
if labels_mean is None or labels_std is None:
logger.warning(f"[{pair}] 无法获取 labels_mean 和 labels_std重新计算")
dataframe["&-price_value_divergence_actual"] = (dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"]
dataframe["&-price_value_divergence_actual"] = dataframe["&-price_value_divergence_actual"].replace([np.inf, -np.inf], 0).ffill().fillna(0)
recent_data = dataframe["&-price_value_divergence_actual"].tail(self.fit_live_predictions_candles)
labels_mean = recent_data.mean()
labels_std = recent_data.std()
if np.isnan(labels_std) or labels_std == 0:
labels_std = 0.01
logger.warning(f"[{pair}] labels_std 计算异常,使用默认值 0.01")
# 根据市场趋势得分动态调整买卖阈值
market_trend_score = self.get_market_trend(dataframe=dataframe, metadata={'pair': pair})
k_buy = self.linear_map(market_trend_score, 0, 100, 1.2, 0.8)
k_sell = self.linear_map(market_trend_score, 0, 100, 1.5, 1.0)
# 处理分类模型的预测值
if "&*-optimal_first_length" in dataframe.columns:
dataframe["optimal_first_length_pred"] = dataframe["&*-optimal_first_length"]
logger.info(f"[{pair}] ✅ 找到并复制分类模型预测列: &*-optimal_first_length -> optimal_first_length_pred")
logger.info(f"[{pair}] 分类预测值统计: {dataframe['&*-optimal_first_length'].value_counts().to_dict()}")
# 确保列存在
if "optimal_first_length_pred" in dataframe.columns:
logger.info(f"[{pair}] ✅ optimal_first_length_pred 列已创建最后5个值: {dataframe['optimal_first_length_pred'].tail().tolist()}")
else:
logger.error(f"[{pair}] ❌ optimal_first_length_pred 列创建失败")
else:
logger.warning(f"[{pair}] 未找到分类模型预测列 &*-optimal_first_length")
# 创建默认列
dataframe["optimal_first_length_pred"] = 2.0
logger.info(f"[{pair}] 创建默认 optimal_first_length_pred 列,值=2.0")
self.buy_threshold = labels_mean - k_buy * labels_std
self.sell_threshold = labels_mean + k_sell * labels_std
# 使用固定参数限制阈值
self.buy_threshold = max(self.buy_threshold, self.BUY_THRESHOLD_MIN)
self.buy_threshold = min(self.buy_threshold, self.BUY_THRESHOLD_MAX)
self.sell_threshold = min(self.sell_threshold, self.SELL_THRESHOLD_MAX)
self.sell_threshold = max(self.sell_threshold, self.SELL_THRESHOLD_MIN)
# 调试日志
# 处理市场状态预测
if "&*-market_regime" in dataframe.columns:
market_regime = dataframe["&*-market_regime"].iloc[-1] if len(dataframe) > 0 else 2
# 基于市场状态调整仓位和止损
regime_multipliers = {
0: {"position": 1.5, "stoploss": 0.8, "label": "🟢低波动震荡"}, # 低波动震荡:加大仓位,收紧止损
1: {"position": 1.2, "stoploss": 0.9, "label": "🔵正常趋势"}, # 正常趋势:适中仓位
2: {"position": 1.0, "stoploss": 1.0, "label": "🟡高波动趋势"}, # 高波动趋势:标准参数
3: {"position": 0.7, "stoploss": 1.2, "label": "🟠极端波动"}, # 极端波动:减小仓位,放宽止损
4: {"position": 0.5, "stoploss": 1.5, "label": "🔴黑天鹅状态"}, # 黑天鹅状态:最小仓位,最宽止损
}
regime_config = regime_multipliers.get(market_regime, regime_multipliers[2])
# 应用到策略参数
self.risk_position_multiplier = regime_config["position"]
self.risk_stoploss_multiplier = regime_config["stoploss"]
logger.info(f"[{pair}] {regime_config['label']} - 仓位倍数: {self.risk_position_multiplier}, "
f"止损倍数: {self.risk_stoploss_multiplier}")
else:
# 默认值
self.risk_position_multiplier = 1.0
self.risk_stoploss_multiplier = 1.0
logger.info(f"[{pair}] 市场趋势得分:{market_trend_score}, labels_mean{labels_mean:.4f}, labels_std{labels_std:.4f}")
logger.info(f"[{pair}] k_buy{k_buy:.2f}, k_sell{k_sell:.2f}")
logger.info(f"[{pair}] 动态买入阈值:{self.buy_threshold:.4f}, 卖出阈值:{self.sell_threshold:.4f}")
logger.info(f"[{pair}] 最新数据 - close: {dataframe['close'].iloc[-1]:.6f}, "
f"rsi: {dataframe['rsi'].iloc[-1]:.2f}, "
f"&-price_value_divergence: {dataframe['&-price_value_divergence'].iloc[-1]:.6f}, "
f"volume_z_score: {dataframe['volume_z_score'].iloc[-1]:.2f}, "
f"bb_lowerband: {dataframe['bb_lowerband'].iloc[-1]:.6f}, "
f"market_regime: {dataframe['&*-market_regime'].iloc[-1] if '&*-market_regime' in dataframe else 'N/A'}")
if not self.stats_logged:
logger.info("===== 所有币对的 labels_mean 和 labels_std 汇总 =====")
for p, stats in self.pair_stats.items():
logger.info(f"[{p}] labels_mean{stats['labels_mean']:.4f}, labels_std{stats['labels_std']:.4f}")
logger.info("==============================================")
self.stats_logged = True
return dataframe
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
pair = metadata.get('pair', 'Unknown')
original_length = len(dataframe)
original_index = dataframe.index
conditions = []
logger.info(f"[{pair}] populate_entry_trend 被调用,原始数据长度:{original_length},时间:{dataframe.index[-1]}")
# 获取市场趋势得分
trend_score = self.get_market_trend(dataframe=dataframe, metadata=metadata)
# 动态调整成交量和 RSI 阈值
volume_z_score_min = 0.5
volume_z_score_max = 1.5
volume_z_score_threshold = self.linear_map(trend_score, 0, 100, volume_z_score_max, volume_z_score_min)
rsi_min = 35
rsi_max = 55
rsi_threshold = self.linear_map(trend_score, 0, 100, rsi_max, rsi_min)
stochrsi_min = 25
stochrsi_max = 45
stochrsi_threshold = self.linear_map(trend_score, 0, 100, stochrsi_max, stochrsi_min)
# 计算熊市信号和 STOCHRSI 超买信号
bearish_signal = self.is_bearish_market(dataframe, metadata, timeframe="1h")
bearish_signal_aligned = bearish_signal.reindex(dataframe.index, method='ffill').fillna(False)
stochrsi_overbought = self.is_stochrsi_overbought(dataframe, period=10, threshold=85)
stochrsi_overbought_aligned = stochrsi_overbought.reindex(dataframe.index, method='ffill').fillna(True)
# 检测趋势状态
trend_status = self.detect_trend_status(dataframe, metadata)
# 根据趋势状态调整入场策略
if "&-price_value_divergence" in dataframe.columns:
# 检测当前持仓订单数量(用于绿色通道判断)
open_trades = len(self.active_trades) if hasattr(self, 'active_trades') else 0
is_green_channel = (trend_status == "bullish" and open_trades <= 2 and self.GREEN_CHANNEL_ENABLED)
# 获取市场状态并调整入场条件
market_regime = dataframe["&*-market_regime"].iloc[-1] if "&*-market_regime" in dataframe.columns else 2
# 市场状态影响入场严格程度
regime_adjustments = {
0: {"threshold_mult": 1.5, "strict_mult": 0.7}, # 低波动震荡:更宽松入场
1: {"threshold_mult": 1.2, "strict_mult": 0.8}, # 正常趋势:较宽松
2: {"threshold_mult": 1.0, "strict_mult": 1.0}, # 高波动趋势:标准
3: {"threshold_mult": 0.8, "strict_mult": 1.2}, # 极端波动:更严格
4: {"threshold_mult": 0.6, "strict_mult": 1.5}, # 黑天鹅状态:最严格
}
regime_adj = regime_adjustments.get(market_regime, regime_adjustments[2])
logger.info(f"[{pair}] 市场状态: {market_regime}, 阈值调整: {regime_adj['threshold_mult']}, 严格度调整: {regime_adj['strict_mult']}")
# 为每个入场信号定义详细的标签
entry_tag = ""
if is_green_channel:
# 🟢 牛市绿色通道持仓≤2个25USDT入场5条件需要满足4个
cond1 = (dataframe["&-price_value_divergence"] < self.buy_threshold * 1.8) # 超宽松偏离度
cond2 = (dataframe["volume_z_score"] > volume_z_score_threshold * 0.4) # 超低成交量要求
cond3 = (dataframe["rsi"] < rsi_threshold * 1.4) # 超宽松RSI
cond4 = (dataframe["close"] <= dataframe["bb_upperband"] * 1.05) # 允许上轨附近
cond5 = (dataframe["stochrsi_k"] < stochrsi_threshold * 1.4) # 超宽松STOCHRSI
core_conditions = [cond1, cond2, cond3, cond4, cond5]
# 使用向量化操作计算满足条件的数量
satisfied_count_vector = cond1.astype(int) + cond2.astype(int) + cond3.astype(int) + cond4.astype(int) + cond5.astype(int)
buy_condition = satisfied_count_vector >= 4
entry_tag = "bull_green_channel"
# 仅在日志中使用最后一行的值
if len(dataframe) > 0:
satisfied_count = satisfied_count_vector.iloc[-1]
logger.info(f"[{pair}] 🟢 牛市绿色通道:持仓{open_trades}≤2个25USDT入场5条件需要满足{satisfied_count}/4个")
elif trend_status == "bullish":
# 牛市正常通道:持仓>2个75USDT入场必须满足全部7个条件
cond1 = (dataframe["&-price_value_divergence"] < self.buy_threshold * 1.5) # 放宽到1.5倍
cond2 = (dataframe["volume_z_score"] > volume_z_score_threshold * 0.7) # 降低成交量要求
cond3 = (dataframe["rsi"] < rsi_threshold * 1.2) # 放宽RSI要求
cond4 = (dataframe["close"] <= dataframe["bb_upperband"]) # 可以在上轨附近入场
cond5 = (dataframe["stochrsi_k"] < stochrsi_threshold * 1.2) # 放宽STOCHRSI要求
cond6 = pd.Series([True] * len(dataframe), index=dataframe.index) # 取消熊市过滤
cond7 = pd.Series([True] * len(dataframe), index=dataframe.index) # 取消超买过滤
buy_condition = cond1 & cond2 & cond3 & cond4 & cond5 & cond6 & cond7
entry_tag = "bull_normal"
logger.info(f"[{pair}] 🚀 牛市正常通道:持仓{open_trades}>2个75USDT入场必须满足全部7个条件")
elif trend_status == "bearish":
# 下跌趋势:严格入场条件,只抄底
cond1 = (dataframe["&-price_value_divergence"] < self.buy_threshold * 0.7) # 严格到0.7倍
cond2 = (dataframe["volume_z_score"] > volume_z_score_threshold * 1.3) # 提高成交量要求
cond3 = (dataframe["rsi"] < rsi_threshold * 0.8) # 严格RSI要求
cond4 = (dataframe["close"] <= dataframe["bb_lowerband"] * 0.95) # 必须跌破下轨
cond5 = (dataframe["stochrsi_k"] < stochrsi_threshold * 0.8) # 严格STOCHRSI要求
cond6 = ~bearish_signal_aligned # 保持熊市过滤
cond7 = ~stochrsi_overbought_aligned # 保持超买过滤
buy_condition = cond1 & cond2 & cond3 & cond4 & cond5 & cond6 & cond7
entry_tag = "bearish"
logger.info(f"[{pair}] 📉 下跌趋势策略:严格入场条件")
else: # ranging
# 震荡趋势:使用原策略
cond1 = (dataframe["&-price_value_divergence"] < self.buy_threshold)
cond2 = (dataframe["volume_z_score"] > volume_z_score_threshold)
cond3 = (dataframe["rsi"] < rsi_threshold)
cond4 = (dataframe["close"] <= dataframe["bb_lowerband"])
cond5 = (dataframe["stochrsi_k"] < stochrsi_threshold)
cond6 = ~bearish_signal_aligned
cond7 = ~stochrsi_overbought_aligned
buy_condition = cond1 & cond2 & cond3 & cond4 & cond5 & cond6 & cond7
entry_tag = "ranging"
logger.info(f"[{pair}] ⚖️ 震荡趋势策略:标准入场条件")
# 绿色通道和趋势状态的条件已经设置好buy_condition
conditions.append(buy_condition)
# 🎯 计算信号强度评分 (0-100分) 并作为tag传递
signal_strength = 0.0
if len(dataframe) > 0:
# 基础评分:趋势得分影响
base_score = trend_score * 0.4 # 趋势得分占40%
# 条件满足评分
if is_green_channel:
# 绿色通道基础分60 + 条件满足加分
satisfied_count = int(cond1.iloc[-1]) + int(cond2.iloc[-1]) + int(cond3.iloc[-1]) + int(cond4.iloc[-1]) + int(cond5.iloc[-1])
signal_strength = 60 + (satisfied_count * 8) + base_score
elif trend_status == "bullish":
# 牛市正常基础分50 + 条件满足加分
satisfied_count = int(cond1.iloc[-1]) + int(cond2.iloc[-1]) + int(cond3.iloc[-1]) + int(cond4.iloc[-1]) + int(cond5.iloc[-1])
signal_strength = 50 + (satisfied_count * 7) + base_score * 1.2
elif trend_status == "bearish":
# 熊市基础分40 + 超跌加分
satisfied_count = int(cond1.iloc[-1]) + int(cond2.iloc[-1]) + int(cond3.iloc[-1]) + int(cond4.iloc[-1]) + int(cond5.iloc[-1])
oversold_bonus = max(0, (100 - trend_score) * 0.3)
signal_strength = 40 + (satisfied_count * 8) + oversold_bonus
else: # ranging
# 震荡市基础分45 + 条件满足加分
satisfied_count = int(cond1.iloc[-1]) + int(cond2.iloc[-1]) + int(cond3.iloc[-1]) + int(cond4.iloc[-1]) + int(cond5.iloc[-1])
signal_strength = 45 + (satisfied_count * 8) + base_score
# 限制在0-100分范围
signal_strength = max(0, min(100, signal_strength))
# 存储信号强度到dataframe
dataframe.loc[buy_condition, 'signal_strength'] = signal_strength
dataframe.loc[buy_condition, 'immediate_entry'] = signal_strength >= 75 # 75分以上立即入场
# 调试日志 - 仅在日志中使用最后一行的值
if len(dataframe) > 0:
divergence_value = dataframe["&-price_value_divergence"].iloc[-1]
volume_z_score_value = dataframe["volume_z_score"].iloc[-1]
rsi_value = dataframe["rsi"].iloc[-1]
stochrsi_value = dataframe["stochrsi_k"].iloc[-1]
bb_close_value = dataframe["close"].iloc[-1]
bb_lower_value = dataframe["bb_lowerband"].iloc[-1]
bb_upper_value = dataframe["bb_upperband"].iloc[-1]
# 输出关键指标以便调试
logger.info(f"[{pair}] 📊 关键指标 - 偏离度: {divergence_value:.6f}, RSI: {rsi_value:.2f}, STOCHRSI: {stochrsi_value:.2f}, 价格: {bb_close_value:.4f}, 下轨: {bb_lower_value:.4f}, 上轨: {bb_upper_value:.4f}")
logger.info(f"[{pair}] 🔍 买入阈值 - 偏离度: {self.buy_threshold:.6f}, RSI: {rsi_threshold:.2f}, STOCHRSI: {stochrsi_threshold:.2f}")
# 获取条件结果的最后一行值(仅用于日志)
cond1_last = cond1.iloc[-1]
cond2_last = cond2.iloc[-1]
cond3_last = cond3.iloc[-1]
cond4_last = cond4.iloc[-1]
cond5_last = cond5.iloc[-1]
cond6_last = cond6.iloc[-1] if trend_status != "bullish" or open_trades > 2 else False
cond7_last = cond7.iloc[-1] if trend_status != "bullish" or open_trades > 2 else False
# 定义条件名称和状态(仅用于日志)
conditions_summary = [
("&-price_value_divergence", divergence_value, "<", self.buy_threshold, cond1_last),
("volume_z_score", volume_z_score_value, ">", volume_z_score_threshold, cond2_last),
("rsi", rsi_value, "<", rsi_threshold, cond3_last),
("close <= bb_lowerband", bb_close_value, "<=", bb_lower_value, cond4_last),
("stochrsi_k", stochrsi_value, "<", stochrsi_threshold, cond5_last),
]
# 根据趋势状态添加对应的条件6和7
if trend_status != "bullish" or open_trades > 2:
conditions_summary.extend([
("非熊市", None, None, None, cond6_last),
("STOCHRSI未持续超买", None, None, None, cond7_last),
])
# 输出每个条件的状态
logger.info(f"[{pair}] === 买入条件检查 ===")
failed_conditions = []
for name, value, operator, threshold, result in conditions_summary:
status = "" if result else ""
if value is not None and threshold is not None:
logger.info(f"[{pair}] {status} {name}: {value:.6f} {operator} {threshold:.6f}")
else:
logger.info(f"[{pair}] {status} {name}")
if not result:
failed_conditions.append(name)
else:
logger.warning(f"[{pair}] &-price_value_divergence 列缺失,跳过买入信号生成")
if conditions:
combined_condition = reduce(lambda x, y: x & y, conditions)
dataframe.loc[combined_condition, 'enter_long'] = 1
dataframe.loc[combined_condition, 'entry_tag'] = entry_tag
# 🎯 记录实际入场信号到Redis只在enter_long=1时
enter_long_indices = dataframe[dataframe['enter_long'] == 1].index
if len(enter_long_indices) > 0:
# 获取最后一根K线的信号数据避免重复记录
last_idx = enter_long_indices[-1]
try:
# 获取主机名
hostname = socket.gethostname()
# 将币对名从BTC/USDT格式转换为BTC-USDT格式
pair_redis = pair.replace('/', '-')
# 获取当前时间戳(毫秒)
timestamp_ms = int(time.time() * 1000)
# 构建Redis key: ${主机名}_${币对名}_entry_${时间戳}
redis_key = f"{hostname}_{pair_redis}_entry_{timestamp_ms}"
# 获取Redis客户端
redis_client = self._get_redis_client()
if redis_client:
# 准备要存储的信号数据
signal_data = {
'signal_strength': float(dataframe.loc[last_idx, 'signal_strength']) if 'signal_strength' in dataframe.columns else 0.0,
'entry_tag': str(dataframe.loc[last_idx, 'entry_tag']) if 'entry_tag' in dataframe.columns else str(entry_tag),
'trend_score': float(trend_score),
'market_regime': str(market_regime),
'timestamp': timestamp_ms,
'pair': str(pair),
'hostname': str(hostname),
'immediate_entry': bool(dataframe.loc[last_idx, 'immediate_entry']) if 'immediate_entry' in dataframe.columns else False
}
# 将信号数据序列化为JSON字符串
signal_json = json.dumps(signal_data)
# 存储到Redis设置24小时过期时间
redis_client.setex(redis_key, 86400, signal_json)
logger.info(f"[Redis] ✅ 实际入场信号已记录: {redis_key}, 信号强度: {signal_data['signal_strength']:.2f}, 趋势: {signal_data['entry_tag']}")
else:
logger.debug(f"[Redis] ⚠️ Redis客户端不可用跳过记录: {pair}")
except Exception as e:
logger.error(f"[Redis] ❌ 记录入场信号失败: {e}")
# 输出每个条件的状态
logger.info(f"[{pair}] === 买入条件检查 ===")
satisfied_conditions = []
for name, value, operator, threshold, result in conditions_summary:
status = "" if result else ""
if value is not None and threshold is not None:
logger.info(f"[{pair}] {status} {name}: {value:.6f} {operator} {threshold:.6f}")
else:
logger.info(f"[{pair}] {status} {name}")
if result:
satisfied_conditions.append(name)
# 总结满足的条件
if combined_condition.any():
logger.info(f"[{pair}] ✅ 买入信号触发,满足条件: {', '.join(satisfied_conditions)},趋势得分:{trend_score:.2f}")
else:
logger.info(f"[{pair}] ❌ 买入条件未满足")
else:
logger.info(f"[{pair}] 无有效买入条件")
# 记录各条件触发率
logger.info(f"[{pair}] 各条件触发率 - "
f"cond1: {cond1.mean():.2%}, "
f"cond2: {cond2.mean():.2%}, "
f"cond3: {cond3.mean():.2%}, "
f"cond4: {cond4.mean():.2%}, "
f"cond5: {cond5.mean():.2%}, "
f"buy_condition: {buy_condition.mean():.2%}")
# 记录 enter_long 信号统计
enter_long_count = dataframe['enter_long'].sum() if 'enter_long' in dataframe.columns else 0
logger.info(f"[{pair}] enter_long 信号总数:{enter_long_count}")
# 数据完整性检查和长度验证修复
if len(dataframe) > 0:
# 验证数据长度一致性
current_length = len(dataframe)
if current_length != original_length:
logger.warning(f"[{pair}] ⚠️ DataFrame长度不匹配原始长度: {original_length}, 当前长度: {current_length}")
# 修复数据长度
if current_length < original_length:
# 数据行数不足,需要填充
missing_rows = original_length - current_length
logger.info(f"[{pair}] 填充缺失的 {missing_rows} 行数据...")
# 创建缺失行的索引
missing_index = original_index.difference(dataframe.index)
if len(missing_index) > 0:
# 用最后一行的数据填充缺失行
last_row = dataframe.iloc[-1:].copy()
filled_rows = pd.DataFrame(index=missing_index)
for col in dataframe.columns:
filled_rows[col] = last_row[col].iloc[0] if len(last_row) > 0 else 0
# 合并数据
dataframe = pd.concat([dataframe, filled_rows])
dataframe = dataframe.reindex(original_index)
logger.info(f"[{pair}] 数据填充完成,新长度: {len(dataframe)}")
elif current_length > original_length:
# 数据行数过多,截断到原始长度
excess_rows = current_length - original_length
logger.info(f"[{pair}] 截断多余的 {excess_rows} 行数据...")
dataframe = dataframe.iloc[:original_length].copy()
dataframe = dataframe.reindex(original_index)
logger.info(f"[{pair}] 数据截断完成,新长度: {len(dataframe)}")
else:
# 长度一致但索引可能不同,重新对齐索引
dataframe = dataframe.reindex(original_index)
logger.info(f"[{pair}] 索引重新对齐完成,长度: {len(dataframe)}")
# 处理NaN值
nan_columns = [col for col in dataframe.columns if dataframe[col].isna().any()]
if nan_columns:
logger.warning(f"[{pair}] 发现NaN值的列: {nan_columns}")
for col in nan_columns:
nan_count = dataframe[col].isna().sum()
if nan_count > 0:
logger.warning(f"[{pair}] 列 {col}{nan_count} 个NaN值正在清理...")
# 使用前向填充,然后零填充
dataframe[col] = dataframe[col].ffill().fillna(0)
# 最终验证
final_length = len(dataframe)
if final_length != original_length:
logger.error(f"[{pair}] ❌ 数据长度修复失败!期望: {original_length}, 实际: {final_length}")
# 强制截断或填充到原始长度
if final_length > original_length:
dataframe = dataframe.iloc[:original_length]
elif final_length < original_length:
# 复制最后一行填充
last_row = dataframe.iloc[-1:].copy()
while len(dataframe) < original_length:
dataframe = pd.concat([dataframe, last_row])
dataframe = dataframe.iloc[:original_length]
logger.info(f"[{pair}] 强制修复完成,最终长度: {len(dataframe)}")
else:
logger.info(f"[{pair}] ✅ 数据长度验证通过,最终长度: {final_length}")
return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
pair = metadata.get('pair', 'Unknown')
# 记录原始数据长度和索引
original_length = len(dataframe)
original_index = dataframe.index
logger.info(f"[{pair}] populate_exit_trend 开始处理,原始数据长度:{original_length}")
conditions = []
if "&-price_value_divergence" in dataframe.columns:
# 计算额外指标StochRSI、ADX 和短期价格变化
stochrsi = ta.STOCHRSI(dataframe, timeperiod=14, fastk_period=3, fastd_period=3)
dataframe["stochrsi_k"] = stochrsi["fastk"]
dataframe["adx"] = ta.ADX(dataframe, timeperiod=14)
# 计算短期价格涨幅(最近 5 根 K 线,约 15 分钟)
dataframe["short_term_return"] = dataframe["close"].pct_change(5, fill_method=None) * 100 # 百分比回报
# 清理新计算列的NaN值
for col in ["stochrsi_k", "adx", "short_term_return"]:
if col in dataframe.columns:
dataframe[col] = dataframe[col].replace([np.inf, -np.inf], 0).ffill().fillna(0)
# 获取市场趋势得分
trend_score = self.get_market_trend(dataframe=dataframe, metadata={'pair': pair})
# 条件 1高阈值 &-price_value_divergence
cond1 = (
(dataframe["&-price_value_divergence"] > self.sell_threshold * float(self.exit_divergence_multiplier.value)) &
(dataframe["adx"] > float(self.exit_adx_threshold.value)) # 趋势强度过滤
)
# 条件 2超买信号
cond2 = (
(dataframe["rsi"] > float(self.exit_rsi_threshold.value)) &
(dataframe["stochrsi_k"] > float(self.exit_stochrsi_threshold.value)) & # StochRSI 超买
(dataframe["adx"] > float(self.exit_adx_threshold_overbought.value)) # 趋势强度
)
# 条件 3快速拉升退出
# 检测最近 5 根 K 线(约 15 分钟)涨幅超过阈值,且有最低利润要求
min_profit = float(self.exit_min_profit_threshold.value) # 最低利润要求
rapid_rise_threshold = self.linear_map(trend_score, 0, 100, float(self.exit_rapid_rise_bear.value), float(self.exit_rapid_rise_bull.value))
cond3 = (
(dataframe["short_term_return"] > rapid_rise_threshold) & # 短期快速拉升
(dataframe["close"] / dataframe["close"].shift(5) - 1 > min_profit) & # 确保最低利润
(dataframe["stochrsi_k"] > float(self.exit_stochrsi_rapid.value)) & # 超买确认
(dataframe["volume_z_score"] > float(self.exit_volume_threshold.value))
)
# 检测趋势状态
trend_status = self.detect_trend_status(dataframe, metadata)
# 根据趋势状态调整出场策略
if trend_status == "bullish":
# 上涨趋势:严格出场条件,让利润奔跑
if trend_score > float(self.exit_bullish_trend_score_max.value):
logger.info(f"[{pair}] 🚀 强劲上涨趋势,拒绝卖出")
return dataframe
# 上涨趋势下需要更强的卖出信号
cond1_bullish = (dataframe["&-price_value_divergence"] > self.sell_threshold * float(self.exit_bullish_divergence_mult.value))
cond2_bullish = (dataframe["rsi"] > float(self.exit_bullish_rsi.value)) & (dataframe["stochrsi_k"] > float(self.exit_bullish_stochrsi.value)) & (dataframe["adx"] > float(self.exit_bullish_adx.value))
cond3_bullish = (dataframe["short_term_return"] > float(self.exit_bullish_return.value)) & (dataframe["stochrsi_k"] > float(self.exit_bullish_stochrsi_rapid.value))
sell_condition = (cond1_bullish & cond2_bullish) | (cond1_bullish & cond3_bullish) | (cond2_bullish & cond3_bullish)
logger.info(f"[{pair}] 🚀 上涨趋势策略:严格出场条件")
elif trend_status == "bearish":
# 下跌趋势:宽松出场条件,快速止盈止损
cond1_bearish = (dataframe["&-price_value_divergence"] > self.sell_threshold * float(self.exit_bearish_divergence_mult.value))
cond2_bearish = (dataframe["rsi"] > float(self.exit_bearish_rsi.value)) & (dataframe["stochrsi_k"] > float(self.exit_bearish_stochrsi.value)) & (dataframe["adx"] > float(self.exit_bearish_adx.value))
cond3_bearish = (dataframe["short_term_return"] > float(self.exit_bearish_return.value)) & (dataframe["stochrsi_k"] > float(self.exit_bearish_stochrsi_rapid.value))
sell_condition = cond1_bearish | cond2_bearish | cond3_bearish # 任一条件即可卖出
logger.info(f"[{pair}] 📉 下跌趋势策略:宽松出场条件")
else: # ranging
# 震荡趋势:使用原策略
if trend_score > float(self.exit_ranging_trend_score_max.value):
logger.info(f"[{pair}] ⚖️ 震荡趋势但得分较高,拒绝卖出")
return dataframe
if trend_score > float(self.exit_ranging_trend_score_threshold.value):
sell_condition = (cond1 & cond2) | (cond1 & cond3) | (cond2 & cond3) # 中等趋势,至少两个条件满足
logger.info(f"[{pair}] ⚖️ 震荡趋势策略:标准出场条件")
else:
sell_condition = cond1 | cond2 | cond3 # 弱势趋势,任一条件满足
logger.info(f"[{pair}] ⚖️ 弱势震荡,任一条件满足")
conditions.append(sell_condition)
# 调试日志 - 仅在日志中使用最后一行的值(这是允许的用途)
if len(dataframe) > 0:
divergence_value = dataframe["&-price_value_divergence"].iloc[-1] if not dataframe["&-price_value_divergence"].isna().iloc[-1] else 0.0
rsi_value = dataframe["rsi"].iloc[-1] if not dataframe["rsi"].isna().iloc[-1] else 0.0
stochrsi_value = dataframe["stochrsi_k"].iloc[-1] if not dataframe["stochrsi_k"].isna().iloc[-1] else 0.0
adx_value = dataframe["adx"].iloc[-1] if not dataframe["adx"].isna().iloc[-1] else 0.0
short_term_return = dataframe["short_term_return"].iloc[-1] if not dataframe["short_term_return"].isna().iloc[-1] else 0.0
# 获取条件结果的最后一行值(仅用于日志)
cond1_last = cond1.iloc[-1]
cond2_last = cond2.iloc[-1]
cond3_last = cond3.iloc[-1]
logger.info(f"[{pair}] 卖出条件检查 - "
f"&-price_value_divergence={divergence_value:.6f} > {self.sell_threshold * float(self.exit_divergence_multiplier.value):.6f}: {cond1_last}, "
f"rsi={rsi_value:.2f} > {float(self.exit_rsi_threshold.value)} & stochrsi_k={stochrsi_value:.2f} > {float(self.exit_stochrsi_threshold.value)}: {cond2_last}, "
f"short_term_return={short_term_return:.2f}% > {rapid_rise_threshold:.2f}% & profit > {min_profit*100:.2f}%: {cond3_last}, "
f"adx={adx_value:.2f}, trend_score={trend_score:.2f}")
else:
logger.warning(f"[{pair}] ⚠️ &-price_value_divergence 列缺失,跳过该条件")
if len(conditions) > 0:
dataframe.loc[reduce(lambda x, y: x & y, conditions), 'exit_long'] = 1
logger.info(f"[{pair}] 出场信号触发,条件满足,趋势得分:{trend_score:.2f}")
else:
logger.info(f"[{pair}] 无有效卖出条件")
# 数据完整性检查和长度验证修复
if len(dataframe) > 0:
# 验证数据长度一致性
current_length = len(dataframe)
if current_length != original_length:
logger.warning(f"[{pair}] ⚠️ 卖出DataFrame长度不匹配原始长度: {original_length}, 当前长度: {current_length}")
# 修复数据长度
if current_length < original_length:
# 数据行数不足,需要填充
missing_rows = original_length - current_length
logger.info(f"[{pair}] 卖出填充缺失的 {missing_rows} 行数据...")
# 创建缺失行的索引
missing_index = original_index.difference(dataframe.index)
if len(missing_index) > 0:
# 用最后一行的数据填充缺失行
last_row = dataframe.iloc[-1:].copy()
filled_rows = pd.DataFrame(index=missing_index)
for col in dataframe.columns:
filled_rows[col] = last_row[col].iloc[0] if len(last_row) > 0 else 0
# 合并数据
dataframe = pd.concat([dataframe, filled_rows])
dataframe = dataframe.reindex(original_index)
logger.info(f"[{pair}] 卖出数据填充完成,新长度: {len(dataframe)}")
elif current_length > original_length:
# 数据行数过多,截断到原始长度
excess_rows = current_length - original_length
logger.info(f"[{pair}] 卖出截断多余的 {excess_rows} 行数据...")
dataframe = dataframe.iloc[:original_length].copy()
dataframe = dataframe.reindex(original_index)
logger.info(f"[{pair}] 卖出数据截断完成,新长度: {len(dataframe)}")
else:
# 长度一致但索引可能不同,重新对齐索引
dataframe = dataframe.reindex(original_index)
logger.info(f"[{pair}] 卖出索引重新对齐完成,长度: {len(dataframe)}")
# 处理NaN值
nan_columns = [col for col in dataframe.columns if dataframe[col].isna().any()]
if nan_columns:
logger.warning(f"[{pair}] 卖出检查 - 发现NaN值的列: {nan_columns}")
for col in nan_columns:
nan_count = dataframe[col].isna().sum()
if nan_count > 0:
logger.warning(f"[{pair}] 卖出检查 - 列 {col}{nan_count} 个NaN值正在清理...")
# 使用前向填充,然后零填充
dataframe[col] = dataframe[col].ffill().fillna(0)
# 最终验证
final_length = len(dataframe)
if final_length != original_length:
logger.error(f"[{pair}] ❌ 卖出数据长度修复失败!期望: {original_length}, 实际: {final_length}")
# 强制截断或填充到原始长度
if final_length > original_length:
dataframe = dataframe.iloc[:original_length]
elif final_length < original_length:
# 复制最后一行填充
last_row = dataframe.iloc[-1:].copy()
while len(dataframe) < original_length:
dataframe = pd.concat([dataframe, last_row])
dataframe = dataframe.iloc[:original_length]
logger.info(f"[{pair}] 卖出强制修复完成,最终长度: {len(dataframe)}")
else:
logger.info(f"[{pair}] ✅ 卖出数据长度验证通过,最终长度: {final_length}")
# 记录exit_long信号
exit_long_count = dataframe['exit_long'].sum() if 'exit_long' in dataframe.columns else 0
logger.info(f"[{pair}] exit_long 信号总数:{exit_long_count}")
return dataframe
def buy_space(self):
return [
DecimalParameter(-0.1, -0.01, name="buy_threshold_min"),
DecimalParameter(-0.02, -0.001, name="buy_threshold_max"),
DecimalParameter(-0.05, -0.01, name="add_position_threshold", default=-0.02),
IntParameter(1, 10, name="cooldown_period_minutes", default=5),
IntParameter(1, 3, name="max_entry_position_adjustment", default=2)
]
def sell_space(self):
return [
DecimalParameter(0.001, 0.02, name="sell_threshold_min"),
DecimalParameter(0.02, 0.1, name="sell_threshold_max"),
DecimalParameter(0.2, 0.7, name="exit_position_ratio", default=0.5),
# 出场策略参数
DecimalParameter(1.0, 1.3, name="exit_divergence_multiplier"),
IntParameter(15, 35, name="exit_adx_threshold"),
IntParameter(55, 75, name="exit_rsi_threshold"),
IntParameter(60, 80, name="exit_stochrsi_threshold"),
IntParameter(20, 35, name="exit_adx_threshold_overbought"),
DecimalParameter(0.005, 0.05, name="exit_min_profit_threshold"),
DecimalParameter(1.5, 4.0, name="exit_rapid_rise_bull"),
DecimalParameter(2.5, 5.0, name="exit_rapid_rise_bear"),
IntParameter(65, 85, name="exit_stochrsi_rapid"),
DecimalParameter(0.5, 2.0, name="exit_volume_threshold"),
# 趋势状态相关出场参数
IntParameter(90, 98, name="exit_bullish_trend_score_max"),
DecimalParameter(1.1, 1.3, name="exit_bullish_divergence_mult"),
IntParameter(70, 85, name="exit_bullish_rsi"),
IntParameter(80, 90, name="exit_bullish_stochrsi"),
IntParameter(25, 40, name="exit_bullish_adx"),
DecimalParameter(3.0, 7.0, name="exit_bullish_return"),
IntParameter(80, 90, name="exit_bullish_stochrsi_rapid"),
IntParameter(95, 99, name="exit_ranging_trend_score_max"),
IntParameter(80, 90, name="exit_ranging_trend_score_threshold"),
DecimalParameter(0.8, 1.0, name="exit_bearish_divergence_mult"),
IntParameter(55, 65, name="exit_bearish_rsi"),
IntParameter(65, 75, name="exit_bearish_stochrsi"),
IntParameter(15, 25, name="exit_bearish_adx"),
DecimalParameter(1.0, 3.0, name="exit_bearish_return"),
IntParameter(70, 85, name="exit_bearish_stochrsi_rapid")
]
def adjust_trade_position(self, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float,
min_stake: float | None, max_stake: float,
current_entry_rate: float, current_exit_rate: float,
current_entry_profit: float, current_exit_profit: float,
**kwargs) -> float | None | tuple[float | None, str | None]:
"""
统一标准的仓位调整策略:
- 移除所有趋势状态判断和趋势得分依赖
- 基于固定阈值和递进系数的加仓逻辑
- 每次加仓阈值递增:初次阈值 × (递进系数^加仓次数)
"""
pair = trade.pair
hold_time = (current_time - trade.open_date_utc).total_seconds() / 60
profit_ratio = (current_rate - trade.open_rate) / trade.open_rate
# 统一参数设置(移除趋势状态判断)
max_entry_adjustments = self.MAX_ENTRY_POSITION_ADJUSTMENT # 最大加仓次数
exit_position_ratio = self.EXIT_POSITION_RATIO # 减仓阈值
trailing_stop_start = self.TRAILING_STOP_START # 追踪止损启动阈值
trailing_stop_distance = self.TRAILING_STOP_DISTANCE # 追踪止损距离
# 统一入场金额(移除绿色通道判断)
initial_stake_amount = 75.0 # 固定75USDT入场
logger.info(f"{pair} 统一仓位管理: max_entries={max_entry_adjustments}, initial_stake={initial_stake_amount:.2f}")
# 🔢 计算递进加仓阈值
add_count = trade.nr_of_successful_entries - 1 # 已加仓次数
# 计算当前加仓阈值config中的ADD_POSITION_THRESHOLD × (递进系数^加仓次数)
current_add_threshold = self.ADD_POSITION_THRESHOLD * (self.ADD_PROGRESSION_FACTOR ** add_count)
# 计算从上一次加仓以来的跌幅(使用正确的基准)
if add_count > 0:
# 有加仓记录,使用最后一次加仓价作为基准
last_entry_price = trade.orders[-1].safe_price if trade.orders else trade.open_rate
drop_since_last_entry = (current_rate - last_entry_price) / last_entry_price
drop_baseline = drop_since_last_entry
logger.info(f"{pair}{add_count}次加仓后跌幅: {drop_since_last_entry*100:.2f}% (基于上次加仓价)")
else:
# 首次加仓,使用初始入场价
drop_baseline = profit_ratio
logger.info(f"{pair} 首次加仓跌幅: {profit_ratio*100:.2f}% (基于初始入场价)")
# 统一加仓逻辑(增加冷却期检查)
if trade.nr_of_successful_entries <= max_entry_adjustments + 1:
# 检查加仓冷却期至少10分钟
if trade.orders:
last_order_time = trade.orders[-1].order_date_utc
time_since_last_add = (current_time - last_order_time).total_seconds() / 60
if time_since_last_add < 10: # 10分钟冷却期
logger.info(f"{pair} 加仓冷却期中,{10 - time_since_last_add:.1f}分钟后可再次加仓")
return None
if drop_baseline <= current_add_threshold and hold_time > 5: # 使用正确跌幅基准
# 新的加仓逻辑:基于标准入场金额一半(37.5)的递进倍数
base_add_amount = initial_stake_amount / 2 # 37.5 USDT
multipliers = [1, 4, 8, 16] # 递进倍数序列 [首次, 第二次, 第三次, 第四次]
if add_count < len(multipliers):
multiplier = multipliers[add_count]
add_amount = base_add_amount * multiplier
if min_stake is not None and add_amount < min_stake:
logger.warning(f"{pair} 加仓金额 {add_amount:.2f} 低于最小下注金额 {min_stake:.2f}")
return (None, f"Add amount {add_amount:.2f} below min_stake {min_stake:.2f}")
if add_amount > max_stake:
add_amount = max_stake
logger.info(f"{pair}{add_count + 1} 次加仓触发: "
f"跌幅 {profit_ratio*100:.2f}% <= 阈值 {current_add_threshold*100:.2f}%, "
f"加仓金额 {add_amount:.2f} (基于37.5×{multiplier})")
return (add_amount, f"Add {add_count + 1}: Drop {profit_ratio*100:.2f}% <= {current_add_threshold*100:.2f}%, add {add_amount:.2f} (37.5×{multiplier})")
# 统一减仓逻辑(移除趋势判断)
if profit_ratio >= exit_position_ratio:
reduce_factor = 1.0 # 统一全部减仓
reduce_amount = -trade.stake_amount * reduce_factor
logger.info(f"{pair} 利润 {profit_ratio*100:.2f}% 达到减仓阈值 {exit_position_ratio*100:.2f}%,减仓 {abs(reduce_amount):.2f}")
return (reduce_amount, f"Profit {profit_ratio*100:.2f}% >= {exit_position_ratio*100:.2f}%, reduce {abs(reduce_amount):.2f}")
# 统一追踪止损逻辑(移除趋势判断)
if profit_ratio >= trailing_stop_start and not self.trailing_stop_enabled:
self.trailing_stop_enabled = True
trade.adjust_min_max_rates(current_rate, current_rate)
logger.info(f"{pair} 价格上涨 {profit_ratio*100:.2f}% 超过 {trailing_stop_start*100:.1f}%,启动追踪止损")
return None
if self.trailing_stop_enabled:
max_rate = trade.max_rate or current_rate
trailing_stop_price = max_rate * (1 - trailing_stop_distance)
if current_rate < trailing_stop_price:
logger.info(f"{pair} 价格回落至 {trailing_stop_price:.6f},触发全部卖出")
return (-trade.stake_amount, f"Trailing stop at {trailing_stop_price:.6f}")
trade.adjust_min_max_rates(current_rate, trade.min_rate)
return None
return None
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float,
time_in_force: str, current_time: datetime, entry_tag: str | None = None, **kwargs) -> bool:
# 调试日志:记录输入参数
logger.info(f"[{pair}] confirm_trade_entry called with rate={rate}, type(rate)={type(rate)}, "
f"amount={amount}, order_type={order_type}, time_in_force={time_in_force}")
# 🔒 策略启动保护运行未满10分钟禁止交易
if not hasattr(self, 'strategy_start_time'):
self.strategy_start_time = current_time
strategy_run_time = (current_time - self.strategy_start_time).total_seconds() / 60
if strategy_run_time < 10: # 10分钟启动保护
logger.info(f"[{pair}] 策略启动保护中,运行{strategy_run_time:.1f}分钟 < 10分钟禁止交易")
return False
# 检查 rate 是否有效
if not isinstance(rate, (float, int)) or rate is None:
logger.error(f"[{pair}] Invalid rate value: {rate} (type: {type(rate)}). Skipping trade entry.")
return False
# 获取当前数据
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1]
market_trend_score = self.get_market_trend(dataframe=DataFrame, metadata={'pair': pair})
# 修正逻辑:趋势得分越低(熊市),冷却期越长;得分越高(牛市),冷却期越短
cooldown_period_minutes = self.COOLDOWN_PERIOD_MINUTES if market_trend_score < 50 else self.COOLDOWN_PERIOD_MINUTES // 2
if pair in self.last_entry_time:
last_time = self.last_entry_time[pair]
if (current_time - last_time).total_seconds() < cooldown_period_minutes * 60:
logger.info(f"[{pair}] 冷却期内({cooldown_period_minutes} 分钟),跳过本次入场")
return False
self.last_entry_time[pair] = current_time
self.trailing_stop_enabled = False
try:
logger.info(f"[{pair}] 确认入场,价格:{float(rate):.6f}")
except (ValueError, TypeError) as e:
logger.error(f"[{pair}] Failed to format rate: {rate} (type: {type(rate)}), error: {e}")
return False
return True
def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float,
rate: float, time_in_force: str, exit_reason: str,
current_time: datetime, **kwargs) -> bool:
adjusted_rate = rate * (1 + 0.0025)
# 增强exit tag显示包含更多信息
trade_profit = trade.calc_profit_ratio(rate)
hold_duration = (current_time - trade.open_date_utc).total_seconds() / 3600 # 小时
# 为不同exit原因添加详细tag
detailed_exit_reason = f"{exit_reason}_profit{trade_profit:.1%}_hold{hold_duration:.1f}h"
logger.info(f"[{pair}] 退出交易,原因:{detailed_exit_reason}, 原始利润:{trade_profit:.2%}, 调整后卖出价:{adjusted_rate:.6f}")
# 将详细信息添加到trade对象便于UI显示
if hasattr(trade, 'exit_reason_detailed'):
trade.exit_reason_detailed = detailed_exit_reason
return True
def custom_roi(self, trade: Trade, current_profit: float, current_time: datetime, trade_dur: int,
current_rate: float = None, min_stake: float | None = None, max_stake: float | None = None) -> dict:
"""
动态调整 ROI 表格,考虑加仓状态和盈亏状况。
- 加仓未完成时完全禁用ROI机制返回极高值
- 亏损状态下使用固定保守ROI
- 盈利状态下才启用动态调整
"""
pair = trade.pair
logger.info(f"[{pair}] 计算自定义 ROI当前盈利: {current_profit:.2%}, 持仓时间: {trade_dur} 分钟")
# 检查加仓次数是否用完
filled_entries = trade.select_filled_orders(trade.entry_side)
add_count = len(filled_entries) - 1 # 减去首笔入场
if add_count < MAX_ENTRY_POSITION_ADJUSTMENT:
# 加仓未完成完全禁用ROI机制
disabled_roi = {
0: 1.0, # 极高值,确保不会触发
999999: 1.0
}
logger.info(f"[{pair}] 加仓次数 {add_count}/{MAX_ENTRY_POSITION_ADJUSTMENT} 未完成禁用ROI机制")
return disabled_roi
# 加仓完成后才考虑ROI
if current_profit < 0:
# 亏损状态下使用保守ROI
conservative_roi = {
0: 0.08,
60: 0.05,
180: 0.03,
360: 0.01
}
logger.info(f"[{pair}] 加仓完成但亏损 {current_profit:.2%}使用保守ROI: {conservative_roi}")
return conservative_roi
# 盈利状态下启用动态调整
dataframe = self.dp.get_pair_dataframe(pair=pair, timeframe=self.timeframe)
dataframe = self.populate_indicators(dataframe, {'pair': pair})
divergence = dataframe["&-price_value_divergence"].iloc[-1] if "&-price_value_divergence" in dataframe else 0
rsi = dataframe["rsi"].iloc[-1] if "rsi" in dataframe else 50
divergence_factor = self.linear_map(divergence, -0.1, 0.1, 1.2, 0.8)
rsi_factor = self.linear_map(rsi, 30, 70, 1.2, 0.8)
time_factor = self.linear_map(trade_dur, 0, 240, 1.0, 0.7)
roi_factor = divergence_factor * rsi_factor * time_factor
base_roi = {
0: 0.06,
30: 0.04,
90: 0.025,
270: 0.002
}
dynamic_roi = {time: min(max(roi * roi_factor, 0.0), 0.2) for time, roi in base_roi.items()}
logger.info(f"[{pair}] 加仓完成且盈利动态ROI: {dynamic_roi}")
return dynamic_roi
def custom_entry_price(self, pair: str, trade: Trade | None, current_time: datetime, proposed_rate: float,
entry_tag: str | None, side: str, **kwargs) -> float:
"""根据入场标签动态调整入场价格
基于入场信号的判定条件,为不同的市场状态提供不同的价格折扣:
- bull_green_channel: 牛市绿色通道固定折扣不走Hyperopt
- bull_normal: 牛市正常通道Hyperopt优化
- ranging: 震荡市Hyperopt优化
- bearish: 熊市Hyperopt优化
Args:
pair: 交易对
trade: 交易对象
current_time: 当前时间
proposed_rate: 建议价格
entry_tag: 入场信号标签
side: 交易方向
**kwargs: 其他参数
Returns:
调整后的入场价格
"""
# 根据入场标签获取对应的折扣率
if entry_tag == "bull_green_channel":
discount = self.GREEN_CHANNEL_DISCOUNT # 牛市绿色通道固定折扣
logger.info(f"[{pair}] 🟢 牛市绿色通道入场,固定折扣: {discount*100:.2f}%")
elif entry_tag == "bull_normal":
discount = float(self.entry_discount_bull_normal.value) # 牛市正常通道折扣Hyperopt优化
logger.info(f"[{pair}] 🚀 牛市正常通道入场,折扣率: {discount*100:.2f}%")
elif entry_tag == "ranging":
discount = float(self.entry_discount_ranging.value) # 震荡市折扣Hyperopt优化
logger.info(f"[{pair}] ⚖️ 震荡市入场,折扣率: {discount*100:.2f}%")
elif entry_tag == "bearish":
discount = float(self.entry_discount_bearish.value) # 熊市折扣Hyperopt优化
logger.info(f"[{pair}] 📉 熊市入场,折扣率: {discount*100:.2f}%")
else:
discount = 0.0 # 无折扣
logger.info(f"[{pair}] 无入场标签,使用原价 {proposed_rate:.6f}")
adjusted_rate = proposed_rate * (1 - discount)
if discount > 0:
logger.info(f"[{pair}] 入场标签: {entry_tag}, 原价: {proposed_rate:.6f}, 调整后: {adjusted_rate:.6f}, 折扣: {discount*100:.2f}%")
return adjusted_rate
def custom_exit_price(self, pair: str, trade: Trade,
current_time: datetime, proposed_rate: float,
current_profit: float, exit_tag: str | None, **kwargs) -> float:
if trade.entry_side_count >= 2:
# 当加仓次数 >= 2 时,只允许止损退出
# 止损逻辑由Freqtrade自动处理无需显式检查
return None # 禁用其他退出信号
adjusted_rate = proposed_rate * (1 + 0.0025)
logger.info(f"[{pair}] 自定义卖出价:{adjusted_rate:.6f}(原价:{proposed_rate:.6f}")
return adjusted_rate
def get_market_trend(self, dataframe: DataFrame = None, metadata: dict = None) -> int:
try:
timeframes = ["3m", "15m", "1h"]
weights = {"3m": 0.3, "15m": 0.35, "1h": 0.35}
trend_scores = {}
pair = metadata.get('pair', 'Unknown') if metadata else 'Unknown'
# 检查 pair 是否有效
if pair == 'Unknown':
logger.error(f"[{pair}] Invalid pair in metadata: {metadata}. Returning default score 50")
return 50
logger.info(f"[{pair}] 正在计算多时间框架市场趋势得分")
for tf in timeframes:
# 优先使用传入的 dataframe如果匹配主时间框架否则加载目标币对数据
pair_df = dataframe if tf == self.timeframe and dataframe is not None else self.dp.get_pair_dataframe(pair=pair, timeframe=tf)
min_candles = 200 if tf == "3m" else 100 if tf == "15m" else 50
if pair_df.empty or len(pair_df) < min_candles:
logger.warning(f"[{pair}] 数据不足({tf}使用默认得分50")
trend_scores[tf] = 50
continue
# 价格趋势
ema_short_period = 50 if tf == "3m" else 20 if tf == "15m" else 12
ema_long_period = 200 if tf == "3m" else 80 if tf == "15m" else 50
pair_df["ema_short"] = ta.EMA(pair_df, timeperiod=ema_short_period)
pair_df["ema_long"] = ta.EMA(pair_df, timeperiod=ema_long_period)
pair_df["ema_short_slope"] = (pair_df["ema_short"] - pair_df["ema_short"].shift(10)) / pair_df["ema_short"].shift(10)
price_above_ema = pair_df["close"].iloc[-1] > pair_df["ema_long"].iloc[-1]
ema_short_above_ema_long = pair_df["ema_short"].iloc[-1] > pair_df["ema_long"].iloc[-1]
ema_short_slope = pair_df["ema_short_slope"].iloc[-1]
price_score = 0
if price_above_ema:
price_score += 20
if ema_short_above_ema_long:
price_score += 20
if ema_short_slope > 0.005:
price_score += 15
elif ema_short_slope < -0.005:
price_score -= 15
# K线形态
pair_df["bullish_engulfing"] = (
(pair_df["close"].shift(1) < pair_df["open"].shift(1)) &
(pair_df["close"] > pair_df["open"]) &
(pair_df["close"] > pair_df["open"].shift(1)) &
(pair_df["open"] < pair_df["close"].shift(1))
).fillna(False)
pair_df["bearish_engulfing"] = (
(pair_df["close"].shift(1) > pair_df["open"].shift(1)) &
(pair_df["close"] < pair_df["open"]) &
(pair_df["close"] < pair_df["open"].shift(1)) &
(pair_df["open"] > pair_df["close"].shift(1))
).fillna(False)
kline_score = 0
if pair_df["bullish_engulfing"].iloc[-1]:
kline_score += 15
elif pair_df["bearish_engulfing"].iloc[-1]:
kline_score -= 15
volatility = pair_df["close"].pct_change(10, fill_method=None).std() * 100
if volatility > 0.5:
kline_score += 10 if price_score > 0 else -10
# StochRSI
stochrsi = ta.STOCHRSI(pair_df, timeperiod=14, fastk_period=3, fastd_period=3)
pair_df["stochrsi_k"] = stochrsi["fastk"]
pair_df["stochrsi_d"] = stochrsi["fastd"]
stochrsi_score = 0
stochrsi_k = pair_df["stochrsi_k"].iloc[-1]
stochrsi_d = pair_df["stochrsi_d"].iloc[-1]
if stochrsi_k > 80 and stochrsi_k < stochrsi_d:
stochrsi_score -= 15
elif stochrsi_k < 20 and stochrsi_k > stochrsi_d:
stochrsi_score += 15
elif stochrsi_k > 50:
stochrsi_score += 5
elif stochrsi_k < 50:
stochrsi_score -= 5
# 量价关系
pair_df["volume_mean_20"] = pair_df["volume"].rolling(20).mean()
pair_df["volume_std_20"] = pair_df["volume"].rolling(20).std()
pair_df["volume_z_score"] = (pair_df["volume"] - pair_df["volume_mean_20"]) / pair_df["volume_std_20"]
pair_df["adx"] = ta.ADX(pair_df, timeperiod=14)
volume_score = 0
if pair_df["volume_z_score"].iloc[-1] > 1.5:
volume_score += 10 if price_score > 0 else -10
if pair_df["adx"].iloc[-1] > 25:
volume_score += 10 if price_score > 0 else -10
# 综合得分
raw_score = price_score + kline_score + stochrsi_score + volume_score
raw_score = max(min(raw_score, 50), -50)
# 对数映射到 [0, 100]
if raw_score >= 0:
mapped_score = 50 + 50 * (np.log1p(raw_score / 50) / np.log1p(1))
else:
mapped_score = 50 * (np.log1p(-raw_score / 50) / np.log1p(1))
trend_scores[tf] = max(0, min(100, int(round(mapped_score))))
logger.info(f"[{pair}] {tf} 趋势得分:{trend_scores[tf]}, 原始得分:{raw_score}, "
f"价格得分:{price_score}, K线得分{kline_score}, "
f"StochRSI得分{stochrsi_score}, 量价得分:{volume_score}")
# 动态调整权重
if trend_scores.get("1h", 50) - trend_scores.get("3m", 50) > 20 or trend_scores.get("15m", 50) - trend_scores.get("3m", 50) > 20:
weights = {"3m": 0.2, "15m": 0.35, "1h": 0.45}
logger.info(f"[{pair}] 1h 趋势得分({trend_scores.get('1h', 50)})显著高于 3m{trend_scores.get('3m', 50)}),调整权重为 {weights}")
# 加权融合
final_score = sum(trend_scores[tf] * weights[tf] for tf in timeframes)
final_score = int(round(final_score))
final_score = max(0, min(100, final_score))
logger.info(f"[{pair}] 最终趋势得分:{final_score}, "
f"3m得分{trend_scores.get('3m', 50)}, 15m得分{trend_scores.get('15m', 50)}, "
f"1h得分{trend_scores.get('1h', 50)}")
return final_score
except Exception as e:
logger.error(f"[{pair}] 获取市场趋势失败:{e}", exc_info=True)
return 50
def get_trend_score_with_cache(self, pair: str, timeframe: str, timestamp: int, dataframe: DataFrame, metadata: dict) -> float:
"""
三级缓存架构获取趋势得分:
1. 本地内存缓存(一级缓存)- 最快,无网络开销
2. Redis缓存二级缓存- 局域网共享,减少重复计算
3. 实时计算(三级缓存)- 最慢,但保证准确性
"""
# 初始化本地缓存(如果尚未初始化)
if not hasattr(self, '_local_cache'):
self._local_cache = {}
self._local_cache_stats = {'hits': 0, 'misses': 0, 'redis_hits': 0, 'computes': 0}
logger.info("🚀 初始化本地内存缓存")
# 创建 Redis 客户端(如果尚未创建)
if not hasattr(self, 'redis_client') and self.redis_url:
try:
self.redis_client = redis.from_url(self.redis_url)
logger.info("✅ Redis 客户端已成功初始化")
except Exception as e:
logger.error(f"❌ 初始化 Redis 客户端失败: {e}")
# Redis 失败时继续运行,降级为本地缓存
self.redis_client = None
# 生成统一的缓存键
strategy_name = "freqaiprimer"
timeframes_str = "3m-15m-1h"
# 将时间戳精度从秒改为分钟,去掉最后两位秒数
minute_timestamp = int(timestamp // 60) * 60 # 向下取整到分钟
cache_key = f"{strategy_name}|trend_score|{pair.replace('/', '-')}|{timeframes_str}|{minute_timestamp}"
logger.debug(f"[{pair}] 生成缓存键:{cache_key} (时间戳: {minute_timestamp})")
# 🎯 一级缓存:本地内存检查
if cache_key in self._local_cache:
cached_score = self._local_cache[cache_key]
self._local_cache_stats['hits'] += 1
# 只在调试模式下显示单个缓存命中,正常模式下只统计
logger.debug(f"[{pair}] 🟢 本地缓存命中key={cache_key}, value={cached_score:.2f}")
return cached_score
self._local_cache_stats['misses'] += 1
logger.debug(f"[{pair}] 本地缓存未命中key={cache_key}")
# 🎯 二级缓存Redis检查
redis_client = self._get_redis_client()
if redis_client:
logger.debug(f"[{pair}] 尝试从 Redis 查询趋势得分key={cache_key}")
try:
cached_score = redis_client.get(cache_key)
if cached_score is not None:
score = float(cached_score)
# 同时写入本地缓存,加速后续访问
self._local_cache[cache_key] = score
self._local_cache_stats['redis_hits'] += 1
logger.debug(f"[{pair}] 🟡 Redis 缓存命中key={cache_key}, value={score:.2f} (已同步到本地缓存)")
return score
else:
logger.debug(f"[{pair}] 🔴 Redis 缓存未命中key={cache_key}")
except Exception as e:
logger.error(f"[{pair}] Redis 查询失败key={cache_key}, 错误: {e}")
# Redis 失败时继续运行,降级为本地缓存
# 🎯 三级缓存:实时计算
logger.info(f"[{pair}] 开始计算趋势得分 (时间戳: {timestamp})")
trend_score = self.get_market_trend(dataframe=dataframe, metadata=metadata)
self._local_cache_stats['computes'] += 1
logger.info(f"[{pair}] ✅ 成功计算趋势得分: {trend_score:.2f}")
# 将结果写入两级缓存
self._local_cache[cache_key] = trend_score
redis_client = self._get_redis_client()
if redis_client:
logger.debug(f"[{pair}] 尝试将趋势得分写入 Rediskey={cache_key}, value={trend_score:.2f}")
try:
redis_client.setex(cache_key, 86400, trend_score)
logger.info(f"[{pair}] ✅ 成功将趋势得分存储到 Rediskey={cache_key}, value={trend_score:.2f}")
except Exception as e:
logger.error(f"[{pair}] Redis 写入失败key={cache_key}, 错误: {e}")
# Redis 失败时继续运行
# 定期打印缓存统计信息(更频繁地显示本轮命中统计)
total_requests = sum(self._local_cache_stats.values())
if total_requests % 50 == 0 and total_requests > 0: # 每50次显示一次统计
local_hits = self._local_cache_stats['hits']
redis_hits = self._local_cache_stats['redis_hits']
hit_rate = (local_hits + redis_hits) / total_requests * 100
logger.info(f"📊 本轮缓存统计 - 本地命中: {local_hits}次, "
f"Redis命中: {redis_hits}次, "
f"计算: {self._local_cache_stats['computes']}次, "
f"总命中率: {hit_rate:.1f}%")
return trend_score
def cleanup_local_cache(self):
"""
清理本地缓存,防止内存泄漏
"""
if hasattr(self, '_local_cache'):
cache_size = len(self._local_cache)
self._local_cache.clear()
logger.info(f"🧹 已清理本地缓存,清除条目数: {cache_size}")
def get_current_round_cache_hits(self) -> int:
"""
获取当前轮次的缓存命中总数(本地+Redis
"""
if hasattr(self, '_local_cache_stats'):
return self._local_cache_stats['hits'] + self._local_cache_stats['redis_hits']
return 0
if hasattr(self, '_local_cache_stats'):
total_requests = sum(self._local_cache_stats.values())
if total_requests > 0:
local_hits = self._local_cache_stats['hits']
redis_hits = self._local_cache_stats['redis_hits']
return {
'local_hits': local_hits,
'redis_hits': redis_hits,
'misses': self._local_cache_stats['misses'],
'computes': self._local_cache_stats['computes'],
'total_requests': total_requests,
'local_hit_rate': local_hits / total_requests * 100,
'redis_hit_rate': redis_hits / total_requests * 100,
'overall_hit_rate': (local_hits + redis_hits) / total_requests * 100,
'current_round_hits': local_hits + redis_hits # 当前轮次总命中数
}
return {'local_hits': 0, 'redis_hits': 0, 'misses': 0, 'computes': 0, 'total_requests': 0, 'current_round_hits': 0}
def bot_loop_start(self, current_time: datetime, **kwargs) -> None:
"""
每个交易循环开始时调用,用于自动清理本地缓存
"""
if hasattr(self, '_local_cache') and self._local_cache:
cache_size = len(self._local_cache)
# 每1000次循环清理一次或当缓存超过500条时清理
if not hasattr(self, '_loop_counter'):
self._loop_counter = 0
self._loop_counter += 1
if self._loop_counter % 1000 == 0 or cache_size > 500:
logger.info(f"🧹 自动清理本地缓存 - 循环次数: {self._loop_counter}, 缓存大小: {cache_size}")
self.cleanup_local_cache()
# 打印缓存统计
stats = self._local_cache_stats
total_requests = stats['hits'] + stats['misses']
if total_requests > 0:
local_hit_rate = (stats['hits'] / total_requests) * 100
logger.info(f"📊 缓存性能统计: 本地命中率 {local_hit_rate:.1f}%, "
f"计算次数: {stats['computes']}")
def bot_start(self, **kwargs) -> None:
"""
机器人启动时调用,初始化缓存相关设置
"""
logger.info("🚀 策略启动 - 初始化多级缓存系统")
# 初始化本地缓存
if not hasattr(self, '_local_cache'):
self._local_cache = {}
self._local_cache_stats = {'hits': 0, 'misses': 0, 'redis_hits': 0, 'computes': 0}
logger.info("✅ 本地内存缓存已初始化")
# 设置最大本地缓存大小
self._max_local_cache_size = 500
# 初始化循环计数器
self._loop_counter = 0
# 🎯 测试Redis连接
redis_client = self._get_redis_client()
if redis_client:
try:
redis_client.ping()
logger.info("✅ Redis连接测试成功 - 入场信号记录已启用")
except Exception as e:
logger.error(f"❌ Redis连接测试失败: {e}")
else:
logger.warning("⚠️ Redis客户端初始化失败入场信号将不会记录到Redis")
def bot_stop(self, **kwargs) -> None:
"""
机器人停止时调用,清理所有缓存
"""
logger.info("🛑 策略停止 - 清理所有缓存")
# 清理本地缓存
if hasattr(self, '_local_cache'):
cache_size = len(self._local_cache)
self.cleanup_local_cache()
# 打印最终统计
stats = self._local_cache_stats
total_requests = stats['hits'] + stats['misses']
if total_requests > 0:
hit_rate = (stats['hits'] / total_requests) * 100
logger.info(f"📊 最终缓存统计 - 总请求: {total_requests}, "
f"本地命中: {stats['hits']}, "
f"计算次数: {stats['computes']}, "
f"总命中率: {hit_rate:.1f}%")
# 注意由于使用延迟初始化无需关闭持久化的Redis连接
@property
def protections(self):
"""
保护机制配置
基于最新Freqtrade规范保护机制应定义在策略文件中而非配置文件
"""
return [
{
"method": "StoplossGuard",
"lookback_period_candles": 60, # 3小时回看期60根3分钟K线
"trade_limit": 2, # 最多2笔止损交易
"stop_duration_candles": 60, # 暂停180分钟60根3分钟K线
"only_per_pair": True # 仅针对单个币对
},
{
"method": "CooldownPeriod",
"stop_duration_candles": 2 # 6分钟冷却期2根3分钟K线
},
{
"method": "MaxDrawdown",
"lookback_period_candles": 48, # 2.4小时回看期
"trade_limit": 4, # 4笔交易限制
"stop_duration_candles": 24, # 72分钟暂停24根3分钟K线
"max_allowed_drawdown": 0.20 # 20%最大回撤容忍度
}
]
def detect_trend_status(self, dataframe: DataFrame, metadata: dict) -> str:
"""
基于分类模型优化 first_length 的趋势检测
规则:
- 使用 LightGBMClassifier 预测最优 first_length 类别
- 类别映射0:激进(2), 1:中性(4), 2:稳健(6), 3:保守(8), 4:极保守(10)
- 根据预测的类别动态调整段长而非固定使用2
"""
pair = metadata.get('pair', 'Unknown')
# 检查数据完整性
if len(dataframe) == 0:
logger.warning(f"[{pair}] ⚠️ 数据为空,返回震荡趋势")
return "ranging"
try:
# 使用分类模型预测最优 first_length
# 检查所有可能的分类列(精确匹配实际列名格式)
classification_cols = [col for col in dataframe.columns
if col in ["&*-optimal_first_length", "optimal_first_length_pred"]]
if classification_cols:
logger.info(f"[{pair}] 📊 检测到分类相关列: {classification_cols}")
else:
logger.debug(f"[{pair}] 📊 未检测到分类相关列")
# 调试:显示所有包含"optimal"的列名
all_optimal_cols = [col for col in dataframe.columns if "optimal" in str(col).lower()]
if all_optimal_cols:
logger.debug(f"[{pair}] 所有包含'optimal'的列: {all_optimal_cols}")
else:
logger.debug(f"[{pair}] 未找到任何包含'optimal'的列")
if "optimal_first_length_pred" in dataframe.columns:
predicted_value = dataframe["optimal_first_length_pred"].iloc[-1]
if pd.notna(predicted_value):
# 处理浮点预测值,四舍五入到最近的整数类别
optimal_length_class = int(round(float(predicted_value)))
length_mapping = {0: 2, 1: 4, 2: 6, 3: 8, 4: 10}
first_length = length_mapping.get(optimal_length_class, 2)
logger.info(f"[{pair}] ✅ 使用 optimal_first_length_pred: first_length={first_length} (原始值: {predicted_value}, 类别: {optimal_length_class})")
else:
first_length = 2
logger.warning(f"[{pair}] ⚠️ optimal_first_length_pred 为NaN使用默认值: {first_length}")
elif "&*-optimal_first_length" in dataframe.columns:
# 直接检查原始列
predicted_value = dataframe["&*-optimal_first_length"].iloc[-1]
if pd.notna(predicted_value):
optimal_length_class = int(round(float(predicted_value)))
length_mapping = {0: 2, 1: 4, 2: 6, 3: 8, 4: 10}
first_length = length_mapping.get(optimal_length_class, 2)
logger.info(f"[{pair}] ✅ 使用 &*-optimal_first_length: first_length={first_length} (原始值: {predicted_value}, 类别: {optimal_length_class})")
else:
first_length = 2
logger.warning(f"[{pair}] ⚠️ &*-optimal_first_length 为NaN使用默认值: {first_length}")
else:
first_length = 2
# 只在首次或特定条件下显示警告
if not hasattr(self, '_classification_warning_shown'):
logger.warning(f"[{pair}] ⚠️ 未找到分类模型列,使用默认值: {first_length}")
self._classification_warning_shown = True
else:
logger.debug(f"[{pair}] 未找到分类模型列,使用默认值: {first_length}")
# 根据 first_length 动态调整其他段长
second_length = first_length * 3
third_length = first_length * 5
# 计算总长度
total_length_needed = first_length + second_length + third_length
# 检查数据是否充足
if len(dataframe) < total_length_needed:
logger.warning(f"[{pair}] 数据不足{total_length_needed}个周期,返回震荡趋势")
return "ranging"
# 获取所需长度的trend_score历史
trend_scores = []
actual_total_length = len(dataframe)
for i in range(-total_length_needed, 0):
# 确保索引在有效范围内
if abs(i) > actual_total_length:
logger.warning(f"[{pair}] 索引 {i} 超出数据范围,使用默认趋势得分 50")
trend_scores.append(50)
continue
# 获取历史数据片段
end_idx = i + 1 if i != -1 else None
hist_df = dataframe.iloc[:end_idx]
if hist_df.empty:
logger.warning(f"[{pair}] 历史数据片段为空,使用默认趋势得分 50")
trend_scores.append(50)
continue
# 获取时间戳 - 统一使用整数时间戳
try:
last_idx = hist_df.index[-1]
if isinstance(last_idx, pd.Timestamp):
# 确保时间戳是无时区的整数
ts = last_idx.tz_localize(None) if last_idx.tz else last_idx
timestamp = int(ts.timestamp())
elif isinstance(last_idx, (int, np.integer)):
# 如果索引已经是整数,直接使用
timestamp = int(last_idx)
elif hasattr(last_idx, 'timestamp'):
timestamp = int(last_idx.timestamp())
else:
# 使用当前时间的整数时间戳
timestamp = int(pd.Timestamp.utcnow().timestamp())
except Exception as e:
# 使用当前时间的整数时间戳作为fallback
timestamp = int(pd.Timestamp.utcnow().timestamp())
# 获取趋势得分
score = self.get_trend_score_with_cache(
pair=pair,
timeframe=self.timeframe,
timestamp=timestamp,
dataframe=hist_df,
metadata=metadata
)
trend_scores.append(score)
# 验证结果数量
if len(trend_scores) < total_length_needed:
logger.warning(f"[{pair}] 只获取到 {len(trend_scores)} 个趋势得分,需要{total_length_needed}")
while len(trend_scores) < total_length_needed:
trend_scores.append(50)
# 分段计算加权得分
# 第一段最近first_length个周期
segment1 = trend_scores[-first_length:]
weighted_score1 = sum(score * 10 for score in segment1) / len(segment1)
# 第二段接下来的second_length个周期
segment2 = trend_scores[-(first_length + second_length):-first_length]
weighted_score2 = sum(score * 7 for score in segment2) / len(segment2)
# 第三段最后的third_length个周期
segment3 = trend_scores[-total_length_needed:-(first_length + second_length)]
weighted_score3 = sum(score * 3 for score in segment3) / len(segment3)
# 计算最终加权得分
final_weighted_score = (weighted_score1 + weighted_score2 + weighted_score3) / (10 + 7 + 3)
# 将得分映射到0-100区间
final_score = max(0, min(100, final_weighted_score))
# 使用hyperopt优化的阈值判断趋势状态
bullish_threshold = self.trend_final_bullish_threshold.value
bearish_threshold = self.trend_final_bearish_threshold.value
# 判定趋势状态
if final_score >= bullish_threshold:
trend_status = "bullish"
logger.info(f"[{pair}] 🚀 检测到上涨趋势: 最终加权得分={final_score:.2f}, 阈值≥{bullish_threshold}, 段长={first_length},{second_length},{third_length}")
elif final_score <= bearish_threshold:
trend_status = "bearish"
logger.info(f"[{pair}] 📉 检测到下跌趋势: 最终加权得分={final_score:.2f}, 阈值≤{bearish_threshold}, 段长={first_length},{second_length},{third_length}")
else:
trend_status = "ranging"
logger.info(f"[{pair}] ⚖️ 检测到震荡趋势: 最终加权得分={final_score:.2f}, 阈值范围({bearish_threshold}, {bullish_threshold}), 段长={first_length},{second_length},{third_length}")
# 输出分段详细信息用于调试
logger.debug(f"[{pair}] 趋势分析详情 - "
f"第一段({first_length}个,权重10): {[f'{s:.1f}' for s in segment1]}, "
f"第二段({second_length}个,权重7): {[f'{s:.1f}' for s in segment2]}, "
f"第三段({third_length}个,权重3): {[f'{s:.1f}' for s in segment3]}")
return trend_status
except Exception as e:
logger.error(f"[{pair}] 趋势状态检测失败: {e}")
return "ranging"