myTestFreqAI/freqtrade/templates/freqaiprimer.py
2025-08-14 19:50:56 +08:00

1113 lines
59 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import numpy as np
import datetime
import datetime
import os
import json
import glob
from functools import reduce
from freqtrade.persistence import Trade
import talib.abstract as ta
from pandas import DataFrame
import pandas as pd
from typing import Dict
from freqtrade.strategy import (DecimalParameter, IStrategy, IntParameter)
logger = logging.getLogger(__name__)
class FreqaiPrimer(IStrategy):
"""
Dynamic-threshold trading strategy built on FreqAI.

Combines dynamic position adds, position reductions and custom ROI logic,
and is intended to be compatible with recent Freqtrade versions.
"""
# --- 🧪 Fixed parameters (taken from hyperopt results) ---
TRAILING_STOP_START = 0.016
TRAILING_STOP_DISTANCE = 0.015
BUY_THRESHOLD_MIN = -0.035
BUY_THRESHOLD_MAX = -0.001
SELL_THRESHOLD_MIN = 0.002
SELL_THRESHOLD_MAX = 0.065
# Position add / reduce parameters
ADD_POSITION_THRESHOLD = -0.021
EXIT_POSITION_RATIO = 0.472
COOLDOWN_PERIOD_MINUTES = 9
MAX_ENTRY_POSITION_ADJUSTMENT = 3
# Trend classification thresholds
TREND_BULLISH_THRESHOLD = 85
TREND_BEARISH_THRESHOLD = 60
# Weighted trend thresholds (same values as the hyperopt defaults below)
TREND_FINAL_BULLISH_THRESHOLD = 55 # final threshold for a bullish trend
TREND_FINAL_BEARISH_THRESHOLD = 13 # final threshold for a bearish trend
# Hyperopt-optimizable parameters
trend_final_bullish_threshold = IntParameter(40, 90, default=55, space="buy", optimize=True, load=True)
trend_final_bearish_threshold = IntParameter(10, 40, default=13, space="buy", optimize=True, load=True)
# --- 🛠️ Fixed strategy settings ---
stoploss = -0.15
timeframe = "3m"
use_custom_stoploss = True
position_adjustment_enable = True # enable dynamic position adjustment
trailing_stop = True
trailing_stop_positive = 0.046
trailing_stop_positive_offset = 0.146
trailing_only_offset_is_reached = False
# ROI table: key = minutes since the trade opened, value = profit ratio that triggers an exit.
minimal_roi = {
"0": 0.06, # from minute 0: exit at 6% profit
"30": 0.04, # from minute 30: exit at 4% profit
"90": 0.025, # from minute 90: exit at 2.5% profit
"270": 0.002 # from minute 270: exit at 0.2% profit
}
# Columns drawn on the chart: overlays on the price plot plus three subplots.
plot_config = {
"main_plot": {
"ema200": {"color": "blue"},
"bb_upperband": {"color": "gray"},
"bb_lowerband": {"color": "gray"},
},
"subplots": {
"Signals": {
"enter_long": {"color": "green", "type": "scatter"},
"exit_long": {"color": "red", "type": "scatter"}
},
"Indicators": {
"&-price_value_divergence": {"color": "purple"},
"volume_z_score": {"color": "orange"},
"rsi": {"color": "cyan"},
"stochrsi_k": {"color": "magenta"},
},
"Bearish Signals": {
"bearish_signal": {"type": "bar", "color": "red"},
"stochrsi_overbought": {"type": "bar", "color": "orange"},
}
}
}
# FreqAI settings kept on the class; "identifier" and "fit_live_predictions_candles"
# are read back by the strategy code (model directory lookup and label statistics).
freqai_info = {
"identifier": "test58",
"model": "LightGBMRegressor",
"feature_parameters": {
"include_timeframes": ["3m", "15m", "1h"],
"label_period_candles": 12,
"include_shifted_candles": 3,
},
"data_split_parameters": {
"test_size": 0.2,
"shuffle": False,
},
"model_training_parameters": {
"n_estimators": 200,
"learning_rate": 0.05,
"num_leaves": 31,
"verbose": -1,
},
"fit_live_predictions_candles": 100,
"live_retrain_candles": 100,
}
@staticmethod
def linear_map(value, from_min, from_max, to_min, to_max):
return (value - from_min) / (from_max - from_min) * (to_max - to_min) + to_min
def __init__(self, config: dict, *args, **kwargs):
    """Initialise the strategy and its per-instance runtime state.

    Args:
        config: Freqtrade configuration dictionary. The optional
            ``redis.url`` entry is read from it.
        *args: Forwarded unchanged to the parent initialiser.
        **kwargs: Forwarded unchanged to the parent initialiser.
    """
    super().__init__(config, *args, **kwargs)

    # Read the Redis settings.
    # NOTE(review): the URL is logged verbatim; if it can embed credentials,
    # consider masking it before logging.
    self.redis_url = config.get('redis', {}).get('url', None)
    if self.redis_url:
        logger.info(f"✅ 成功读取 Redis 配置: {self.redis_url}")
    else:
        logger.warning("⚠️ 未找到 Redis 配置")

    # Configure a timestamped log format. `level` must be the integer constant
    # logging.INFO; the previous value `logging.info` is a function and makes
    # basicConfig raise TypeError whenever the root logger has no handlers yet.
    # The previous datefmt expression sliced the format string itself and
    # evaluated to exactly the literal below, so the effective format is
    # unchanged (strftime has no sub-second directive here anyway).
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    logger.info("✅ 策略已初始化,日志级别设置为 INFO")

    # NOTE(review): this flag is shared by every trade handled by this
    # strategy instance, not tracked per trade.
    self.trailing_stop_enabled = False
    self.pair_stats = {}
    self.stats_logged = False
    self.fit_live_predictions_candles = self.freqai_info.get("fit_live_predictions_candles", 100)
    self.last_entry_time = {}  # last confirmed entry time, keyed by pair
def feature_engineering_expand_all(self, dataframe: DataFrame, period: int, metadata: dict, **kwargs) -> DataFrame:
    """Add the period-dependent indicator columns to ``dataframe``.

    Creates RSI, SMA, EMA, Bollinger bands (on the typical price) with their
    relative width, MFI, ADX and relative volume for ``period``, plus the
    EMA200 and the close/EMA200 divergence. Infinite values in the generated
    columns are replaced by 0, then gaps are forward-filled and remaining
    NaNs set to 0.

    Returns:
        The same dataframe object, with the new columns added.
    """
    # Simple single-output indicators that share the same call shape.
    leading_indicators = {
        "%-rsi-period": ta.RSI,
        "%-sma-period": ta.SMA,
        "%-ema-period": ta.EMA,
    }
    for column, indicator in leading_indicators.items():
        dataframe[column] = indicator(dataframe, timeperiod=period)

    # Bollinger bands computed on the typical price.
    typical_price = ta.TYPPRICE(dataframe)
    bands = ta.BBANDS(typical_price, timeperiod=period, nbdevup=2.0, nbdevdn=2.0)
    band_upper, band_middle, band_lower = bands
    dataframe["bb_lowerband-period"] = band_lower
    dataframe["bb_upperband-period"] = band_upper
    dataframe["bb_middleband-period"] = band_middle
    band_span = dataframe["bb_upperband-period"] - dataframe["bb_lowerband-period"]
    dataframe["%-bb_width-period"] = band_span / dataframe["bb_middleband-period"]

    trailing_indicators = {
        "%-mfi-period": ta.MFI,
        "%-adx-period": ta.ADX,
    }
    for column, indicator in trailing_indicators.items():
        dataframe[column] = indicator(dataframe, timeperiod=period)

    average_volume = dataframe["volume"].rolling(period).mean()
    dataframe["%-relative_volume-period"] = dataframe["volume"] / average_volume

    dataframe["ema200"] = ta.EMA(dataframe, timeperiod=200)
    distance_to_ema = dataframe["close"] - dataframe["ema200"]
    dataframe["%-price_value_divergence"] = distance_to_ema / dataframe["ema200"]

    # Sanitise every generated column: inf -> 0, forward-fill, then NaN -> 0.
    generated_columns = (
        "%-rsi-period", "%-mfi-period", "%-sma-period", "%-ema-period", "%-adx-period",
        "bb_lowerband-period", "bb_middleband-period", "bb_upperband-period",
        "%-bb_width-period", "%-relative_volume-period", "%-price_value_divergence",
    )
    for column in generated_columns:
        finite = dataframe[column].replace([np.inf, -np.inf], 0)
        dataframe[column] = finite.ffill().fillna(0)

    pair = metadata.get('pair', 'Unknown')
    logger.info(f"[{pair}] 特征工程完成,列:{list(dataframe.columns)}")
    return dataframe
def set_freqai_targets(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame:
    """Create the ``&-price_value_divergence`` training target.

    The target is the relative distance of the close from its EMA200. A
    20-candle volume z-score is added alongside it. When fewer than 200
    candles are available, a warning is logged and the dataframe is returned
    without any new column.
    """
    pair = metadata.get('pair', 'Unknown')
    candle_count = len(dataframe)
    if candle_count < 200:
        logger.warning(f"[{pair}] 数据量不足({candle_count}根K线需要至少200根K线进行训练")
        return dataframe

    dataframe["ema200"] = ta.EMA(dataframe, timeperiod=200)
    distance_to_ema = dataframe["close"] - dataframe["ema200"]
    dataframe["&-price_value_divergence"] = distance_to_ema / dataframe["ema200"]

    # 20-candle volume statistics and the resulting z-score.
    volume_window = dataframe["volume"].rolling(20)
    dataframe["volume_mean_20"] = volume_window.mean()
    dataframe["volume_std_20"] = volume_window.std()
    volume_deviation = dataframe["volume"] - dataframe["volume_mean_20"]
    dataframe["volume_z_score"] = volume_deviation / dataframe["volume_std_20"]

    # inf -> 0, forward-fill, then NaN -> 0 for both derived columns.
    for column in ("volume_z_score", "&-price_value_divergence"):
        finite = dataframe[column].replace([np.inf, -np.inf], 0)
        dataframe[column] = finite.ffill().fillna(0)
    return dataframe
def is_stochrsi_overbought(self, dataframe: DataFrame, period=10, threshold=85) -> pd.Series:
    """Flag candles whose rolling-mean ``stochrsi_k`` is above ``threshold``.

    Args:
        dataframe: Candle data; the ``stochrsi_k`` column is used when present.
        period: Rolling window length, in candles.
        threshold: Level the rolling mean must strictly exceed.

    Returns:
        A boolean Series aligned with ``dataframe.index``. It is all ``False``
        when ``stochrsi_k`` is missing; rows without a full window are also
        ``False`` because their rolling mean is NaN.
    """
    if 'stochrsi_k' not in dataframe.columns:
        # Scalar broadcast keeps the dtype boolean even for an empty frame,
        # so callers can always negate the result with `~`.
        return pd.Series(False, index=dataframe.index, dtype=bool)
    rolling_mean = dataframe['stochrsi_k'].rolling(window=period).mean()
    return rolling_mean > threshold
def is_bearish_market(self, dataframe: DataFrame, metadata: dict, timeframe: str = "1h") -> pd.Series:
    """Build a per-candle bearish-market flag from the ``*_1h`` columns.

    Four conditions are evaluated:
      a. the 1h close is more than 1% below the 1h Bollinger middle band;
      b. the 1h StochRSI %K is above 90;
      c. a bearish candle pattern (engulfing, dark cloud cover or shooting
         star) is present, or ``is_stochrsi_overbought`` is true;
      d. the 1h volume z-score is below -1 (only if that column has data).
    The flag is true for the pairs (a, b), (a, c), (b, c) and (a, d).

    The ``timeframe`` argument is accepted but not used by this method.

    Returns:
        Boolean Series aligned with ``dataframe.index``; all ``False`` when a
        required column is missing.
    """
    pair = metadata.get('pair', 'Unknown')
    logger.info(f"[{pair}] 开始计算 1h 熊市信号")

    def all_false() -> pd.Series:
        return pd.Series([False] * len(dataframe), index=dataframe.index)

    # Defensive check: every input column must be present.
    needed = ('close_1h', 'high_1h', 'low_1h', 'open_1h', 'stochrsi_k_1h', 'stochrsi_d_1h', 'bb_middle_1h')
    missing = [name for name in needed if name not in dataframe.columns]
    if missing:
        logger.error(f"[{pair}] 缺少必要列:{missing},返回全 False")
        return all_false()

    # The volume z-score is optional: usable only if present and not entirely NaN.
    has_volume_z_score = (
        'volume_z_score_1h' in dataframe.columns
        and not dataframe['volume_z_score_1h'].isna().all()
    )

    candle_open = dataframe['open_1h']
    candle_close = dataframe['close_1h']
    candle_high = dataframe['high_1h']
    candle_low = dataframe['low_1h']
    previous_open = candle_open.shift(1)
    previous_close = candle_close.shift(1)

    # Condition a: close more than 1% below the Bollinger middle band.
    cond_a = candle_close < dataframe['bb_middle_1h'] * 0.99
    # Condition b: StochRSI %K above 90.
    cond_b = dataframe['stochrsi_k_1h'] > 90

    # Condition c: bearish candle patterns.
    previous_was_up = previous_close > previous_open
    closes_down = candle_close < candle_open
    cond_engulfing = (
        previous_was_up
        & closes_down
        & (candle_close < previous_open)
        & (candle_open > previous_close)
    )
    previous_midpoint = (previous_open + previous_close) / 2
    cond_dark_cloud_cover = (
        previous_was_up
        & (candle_open > previous_close)
        & (candle_close < previous_midpoint)
        & closes_down
    )
    body = (candle_open - candle_close).abs()
    upper_wick = candle_high - np.maximum(candle_open, candle_close)
    lower_wick = np.minimum(candle_open, candle_close) - candle_low
    cond_shooting_star = (upper_wick > 2 * body) & (lower_wick < body) & closes_down
    cond_stochrsi_high = self.is_stochrsi_overbought(dataframe, period=10, threshold=85)
    cond_c = cond_engulfing | cond_dark_cloud_cover | cond_shooting_star | cond_stochrsi_high

    # Condition d: markedly lower volume.
    if has_volume_z_score:
        cond_d = dataframe['volume_z_score_1h'] < -1.0
    else:
        cond_d = all_false()

    # Combined signal from the four condition pairs listed in the docstring.
    bearish_signal = (cond_a & cond_b) | (cond_a & cond_c) | (cond_b & cond_c) | (cond_a & cond_d)

    # Report the latest candle's state for each condition; 25 points apiece.
    labelled_conditions = (
        ("条件a (价格低于布林中轨)", cond_a),
        ("条件b (STOCHRSI>90)", cond_b),
        ("条件c (看跌形态)", cond_c),
        ("条件d (成交量下降)", cond_d),
    )
    latest_flags = []
    for label, condition in labelled_conditions:
        latest = condition.iloc[-1]
        logger.info(f"[{pair}] 熊市信号 - {label}: {latest}")
        latest_flags.append(latest)
    bearish_score = 25 * sum(1 for flag in latest_flags if flag)
    logger.info(f"[{pair}] 熊市信号总得分: {bearish_score}/100")
    return bearish_signal
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Compute indicators, run FreqAI and derive the dynamic buy/sell thresholds.

    Steps:
      1. Base-timeframe indicators: EMA200, price/EMA divergence, Bollinger
         bands, RSI, volume z-score and StochRSI.
      2. 1h indicators, copied onto the base dataframe as ``*_1h`` columns
         (NaN-initialised when fewer than 50 1h candles are available).
      3. inf/NaN clean-up of the derived columns.
      4. FreqAI prediction, falling back to the rule-based divergence when
         FreqAI is unavailable or did not produce the target column.
      5. ``labels_mean`` / ``labels_std`` are read from the newest model
         metadata file, or recomputed from recent candles.
      6. ``self.buy_threshold`` / ``self.sell_threshold`` are derived from
         those statistics and the market trend score, then clamped to the
         class-level limits.

    Returns:
        The dataframe returned by FreqAI (or the input dataframe on the
        fallback path) with the indicator columns added.
    """
    pair = metadata.get('pair', 'Unknown')
    logger.info(f"[{pair}] 当前可用列调用FreqAI前{list(dataframe.columns)}")

    # --- Base-timeframe indicators ---
    dataframe["ema200"] = ta.EMA(dataframe, timeperiod=200)
    dataframe["price_value_divergence"] = (dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"]
    upperband, middleband, lowerband = ta.BBANDS(dataframe["close"], timeperiod=20, nbdevup=2.0, nbdevdn=2.0)
    dataframe["bb_upperband"] = upperband
    dataframe["bb_middleband"] = middleband
    dataframe["bb_lowerband"] = lowerband
    dataframe["rsi"] = ta.RSI(dataframe, timeperiod=14)
    dataframe["volume_mean_20"] = dataframe["volume"].rolling(20).mean()
    dataframe["volume_std_20"] = dataframe["volume"].rolling(20).std()
    dataframe["volume_z_score"] = (dataframe["volume"] - dataframe["volume_mean_20"]) / dataframe["volume_std_20"]
    stochrsi = ta.STOCHRSI(dataframe, timeperiod=14, fastk_period=3, fastd_period=3)
    dataframe["stochrsi_k"] = stochrsi["fastk"]
    dataframe["stochrsi_d"] = stochrsi["fastd"]

    # --- 1h indicators ---
    dataframe_1h = self.dp.get_pair_dataframe(pair=pair, timeframe="1h")
    if dataframe_1h.empty or len(dataframe_1h) < 50:
        logger.warning(f"[{pair}] 1h 数据为空或不足({len(dataframe_1h)} 根K线初始化空列")
        for col in ['open_1h', 'high_1h', 'low_1h', 'close_1h', 'stochrsi_k_1h', 'stochrsi_d_1h',
                    'bb_upper_1h', 'bb_middle_1h', 'bb_lower_1h', 'volume_z_score_1h']:
            dataframe[col] = np.nan
    else:
        # At least 50 candles are guaranteed here, so the 20-candle rolling
        # windows below always have enough data (the former `len >= 20`
        # check and its fallback branch could never run).
        stochrsi_1h = ta.STOCHRSI(dataframe_1h, timeperiod=14, fastk_period=3, fastd_period=3)
        dataframe_1h['stochrsi_k'] = stochrsi_1h['fastk']
        dataframe_1h['stochrsi_d'] = stochrsi_1h['fastd']
        typical_price_1h = ta.TYPPRICE(dataframe_1h)
        upper_1h, middle_1h, lower_1h = ta.BBANDS(typical_price_1h, timeperiod=20, nbdevup=2.0, nbdevdn=2.0)
        dataframe_1h['bb_upper_1h'] = upper_1h
        dataframe_1h['bb_middle_1h'] = middle_1h
        dataframe_1h['bb_lower_1h'] = lower_1h
        dataframe_1h['volume_mean_20_1h'] = dataframe_1h["volume"].rolling(20).mean()
        dataframe_1h['volume_std_20_1h'] = dataframe_1h["volume"].rolling(20).std()
        dataframe_1h['volume_z_score_1h'] = (
            (dataframe_1h["volume"] - dataframe_1h['volume_mean_20_1h']) / dataframe_1h['volume_std_20_1h']
        )
        dataframe_1h['volume_z_score_1h'] = (
            dataframe_1h['volume_z_score_1h'].replace([np.inf, -np.inf], 0).ffill().fillna(0)
        )

        # Copy the 1h columns onto the base dataframe.
        # NOTE(review): reindex() aligns on the dataframe *index*. If both
        # frames carry a positional index (with the timestamp in a `date`
        # column), row i of the 1h data lands on row i of the base data
        # rather than on the matching time — confirm, and consider a
        # date-based merge instead.
        for col in ['open', 'high', 'low', 'close', 'stochrsi_k', 'stochrsi_d',
                    'bb_upper_1h', 'bb_middle_1h', 'bb_lower_1h', 'volume_z_score_1h']:
            target_col = col if col.endswith('_1h') else f"{col}_1h"
            if col in dataframe_1h.columns:
                dataframe[target_col] = dataframe_1h[col].reindex(dataframe.index, method='ffill').bfill()
            else:
                logger.warning(f"[{pair}] 1h 数据缺少列 {col},初始化为空")
                dataframe[target_col] = np.nan

    # --- Clean-up: inf -> 0, forward-fill, then NaN -> 0 ---
    for col in ["ema200", "bb_upperband", "bb_middleband", "bb_lowerband", "rsi", "volume_z_score",
                "&-price_value_divergence", "price_value_divergence", "open_1h", "high_1h", "low_1h",
                "close_1h", "stochrsi_k_1h", "stochrsi_d_1h", "bb_upper_1h", "bb_middle_1h", "bb_lower_1h",
                "volume_z_score_1h"]:
        if col in dataframe.columns:
            dataframe[col] = dataframe[col].replace([np.inf, -np.inf], 0).ffill().fillna(0)

    # --- FreqAI prediction ---
    if not hasattr(self, 'freqai') or self.freqai is None:
        logger.error(f"[{pair}] FreqAI 未初始化,回退到规则计算")
        dataframe["&-price_value_divergence"] = dataframe["price_value_divergence"]
    else:
        logger.info(f"[{pair}] 调用 FreqAI 预测,类型:{type(self.freqai)}")
        dataframe = self.freqai.start(dataframe, metadata, self)
        if "&-price_value_divergence" not in dataframe.columns:
            logger.warning(f"[{pair}] FreqAI 未生成 &-price_value_divergence回退到规则计算")
            dataframe["&-price_value_divergence"] = dataframe["price_value_divergence"]

    # --- Label statistics (labels_mean / labels_std) ---
    labels_mean = None
    labels_std = None
    try:
        model_base_dir = os.path.join(self.config["user_data_dir"], "models", self.freqai_info["identifier"])
        pair_base = pair.split('/')[0] if '/' in pair else pair
        sub_dirs = glob.glob(os.path.join(model_base_dir, f"sub-train-{pair_base}_*"))
        if not sub_dirs:
            logger.warning(f"[{pair}] 未找到任何子目录:{model_base_dir}/sub-train-{pair_base}_*")
        else:
            # The trailing "_<number>" suffix orders the sub-directories.
            latest_sub_dir = max(sub_dirs, key=lambda path: int(path.split('_')[-1]))
            pair_base_lower = pair_base.lower()
            timestamp = latest_sub_dir.split('_')[-1]
            metadata_file = os.path.join(latest_sub_dir, f"cb_{pair_base_lower}_{timestamp}_metadata.json")
            if os.path.exists(metadata_file):
                # Kept under its own name so the `metadata` argument of this
                # method is no longer overwritten by the file contents.
                with open(metadata_file, "r", encoding="utf-8") as f:
                    model_metadata = json.load(f)
                labels_mean = model_metadata["labels_mean"]["&-price_value_divergence"]
                labels_std = model_metadata["labels_std"]["&-price_value_divergence"]
                logger.info(f"[{pair}] 从最新子目录 {latest_sub_dir} 读取 labels_mean{labels_mean}, labels_std{labels_std}")
            else:
                logger.warning(f"[{pair}] 最新的 metadata.json 文件 {metadata_file} 不存在")
    except Exception as e:
        # Deliberate best-effort: any failure falls through to the recomputation below.
        logger.warning(f"[{pair}] 无法从子目录读取 labels_mean 和 labels_std{e},重新计算")

    if labels_mean is None or labels_std is None:
        logger.warning(f"[{pair}] 无法获取 labels_mean 和 labels_std重新计算")
        dataframe["&-price_value_divergence_actual"] = (dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"]
        dataframe["&-price_value_divergence_actual"] = (
            dataframe["&-price_value_divergence_actual"].replace([np.inf, -np.inf], 0).ffill().fillna(0)
        )
        recent_data = dataframe["&-price_value_divergence_actual"].tail(self.fit_live_predictions_candles)
        labels_mean = recent_data.mean()
        labels_std = recent_data.std()
        if np.isnan(labels_std) or labels_std == 0:
            labels_std = 0.01
            logger.warning(f"[{pair}] labels_std 计算异常,使用默认值 0.01")

    # --- Dynamic thresholds scaled by the market trend score ---
    market_trend_score = self.get_market_trend(dataframe=dataframe, metadata={'pair': pair})
    k_buy = self.linear_map(market_trend_score, 0, 100, 1.2, 0.8)
    k_sell = self.linear_map(market_trend_score, 0, 100, 1.5, 1.0)
    # NOTE(review): the thresholds are stored on the strategy instance, so
    # they hold the values of whichever pair was processed last.
    self.buy_threshold = labels_mean - k_buy * labels_std
    self.sell_threshold = labels_mean + k_sell * labels_std
    # Clamp to the fixed class-level limits.
    self.buy_threshold = max(self.buy_threshold, self.BUY_THRESHOLD_MIN)
    self.buy_threshold = min(self.buy_threshold, self.BUY_THRESHOLD_MAX)
    self.sell_threshold = min(self.sell_threshold, self.SELL_THRESHOLD_MAX)
    self.sell_threshold = max(self.sell_threshold, self.SELL_THRESHOLD_MIN)

    # --- Debug logging ---
    logger.info(f"[{pair}] 市场趋势得分:{market_trend_score}, labels_mean{labels_mean:.4f}, labels_std{labels_std:.4f}")
    logger.info(f"[{pair}] k_buy{k_buy:.2f}, k_sell{k_sell:.2f}")
    logger.info(f"[{pair}] 动态买入阈值:{self.buy_threshold:.4f}, 卖出阈值:{self.sell_threshold:.4f}")
    logger.info(f"[{pair}] 最新数据 - close: {dataframe['close'].iloc[-1]:.6f}, "
                f"rsi: {dataframe['rsi'].iloc[-1]:.2f}, "
                f"&-price_value_divergence: {dataframe['&-price_value_divergence'].iloc[-1]:.6f}, "
                f"volume_z_score: {dataframe['volume_z_score'].iloc[-1]:.2f}, "
                f"bb_lowerband: {dataframe['bb_lowerband'].iloc[-1]:.6f}, "
                f"close_1h: {dataframe['close_1h'].iloc[-1] if 'close_1h' in dataframe else 'N/A'}, "
                f"stochrsi_k_1h: {dataframe['stochrsi_k_1h'].iloc[-1] if 'stochrsi_k_1h' in dataframe else 'N/A'}, "
                f"volume_z_score_1h: {dataframe['volume_z_score_1h'].iloc[-1] if 'volume_z_score_1h' in dataframe else 'N/A'}")

    # One-off summary of the per-pair statistics.
    # NOTE(review): nothing in this method writes to self.pair_stats; unless
    # it is filled elsewhere, the loop below prints no rows.
    if not self.stats_logged:
        logger.info("===== 所有币对的 labels_mean 和 labels_std 汇总 =====")
        for p, stats in self.pair_stats.items():
            logger.info(f"[{p}] labels_mean{stats['labels_mean']:.4f}, labels_std{stats['labels_std']:.4f}")
        logger.info("==============================================")
        self.stats_logged = True
    return dataframe
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Set ``enter_long`` on candles that satisfy the trend-dependent buy rules.

    Seven conditions are combined with AND: predicted divergence below the
    buy threshold, volume z-score above a floor, RSI and StochRSI below caps,
    close at or below a Bollinger band, no bearish-market flag and no
    sustained StochRSI overbought flag. The numeric limits are scaled by the
    trend status (``bullish`` / ``bearish`` / anything else = ranging), and
    the two filter conditions are disabled in a bullish trend.

    When ``&-price_value_divergence`` is missing, no signal is generated and
    the dataframe is returned without an ``enter_long`` assignment.
    """
    pair = metadata.get('pair', 'Unknown')
    logger.info(f"[{pair}] populate_entry_trend 被调用,数据行数:{len(dataframe)},时间:{dataframe.index[-1]}")

    # Market trend score drives the base volume / RSI / StochRSI limits:
    # the higher the score, the lower each limit.
    trend_score = self.get_market_trend(dataframe=dataframe, metadata=metadata)
    volume_z_score_min = 0.5
    volume_z_score_max = 1.5
    volume_z_score_threshold = self.linear_map(trend_score, 0, 100, volume_z_score_max, volume_z_score_min)
    rsi_min = 35
    rsi_max = 55
    rsi_threshold = self.linear_map(trend_score, 0, 100, rsi_max, rsi_min)
    stochrsi_min = 25
    stochrsi_max = 45
    stochrsi_threshold = self.linear_map(trend_score, 0, 100, stochrsi_max, stochrsi_min)

    # Bearish-market and StochRSI-overbought filters, aligned to the dataframe index.
    bearish_signal = self.is_bearish_market(dataframe, metadata, timeframe="1h")
    bearish_signal_aligned = bearish_signal.reindex(dataframe.index, method='ffill').fillna(False)
    stochrsi_overbought = self.is_stochrsi_overbought(dataframe, period=10, threshold=85)
    stochrsi_overbought_aligned = stochrsi_overbought.reindex(dataframe.index, method='ffill').fillna(True)

    trend_status = self.detect_trend_status(dataframe, metadata)

    if "&-price_value_divergence" not in dataframe.columns:
        # Nothing can be evaluated without the prediction column. Returning
        # here also avoids referencing condition variables that were never
        # defined (previously a NameError in the trigger-rate log).
        logger.warning(f"[{pair}] &-price_value_divergence 列缺失,跳过买入信号生成")
        logger.info(f"[{pair}] 无有效买入条件")
        logger.info(f"[{pair}] enter_long 信号总数:{dataframe['enter_long'].sum() if 'enter_long' in dataframe.columns else 0}")
        return dataframe

    # Effective limits for the current trend status. They are kept in named
    # variables so the condition report below shows the limits that were
    # actually applied.
    always_true = pd.Series(True, index=dataframe.index)
    if trend_status == "bullish":
        # NOTE(review): self.buy_threshold is clamped to negative values in
        # populate_indicators, so multiplying by 1.5 lowers the limit and
        # makes this condition harder to meet, although the branch is
        # described as "relaxed" — confirm the intended direction.
        divergence_limit = self.buy_threshold * 1.5
        volume_floor = volume_z_score_threshold * 0.7
        rsi_cap = rsi_threshold * 1.2
        band_label = "bb_upperband"
        band_cap = dataframe["bb_upperband"]
        stochrsi_cap = stochrsi_threshold * 1.2
        cond6 = always_true  # bearish filter disabled
        cond7 = always_true  # overbought filter disabled
        logger.info(f"[{pair}] 🚀 上涨趋势策略:放宽入场条件")
    elif trend_status == "bearish":
        # NOTE(review): same sign caveat — 0.7x of a negative threshold is
        # easier to meet, although the branch is described as "strict".
        divergence_limit = self.buy_threshold * 0.7
        volume_floor = volume_z_score_threshold * 1.3
        rsi_cap = rsi_threshold * 0.8
        band_label = "bb_lowerband * 0.95"
        band_cap = dataframe["bb_lowerband"] * 0.95
        stochrsi_cap = stochrsi_threshold * 0.8
        cond6 = ~bearish_signal_aligned
        cond7 = ~stochrsi_overbought_aligned
        logger.info(f"[{pair}] 📉 下跌趋势策略:严格入场条件")
    else:  # ranging
        divergence_limit = self.buy_threshold
        volume_floor = volume_z_score_threshold
        rsi_cap = rsi_threshold
        band_label = "bb_lowerband"
        band_cap = dataframe["bb_lowerband"]
        stochrsi_cap = stochrsi_threshold
        cond6 = ~bearish_signal_aligned
        cond7 = ~stochrsi_overbought_aligned
        logger.info(f"[{pair}] ⚖️ 震荡趋势策略:标准入场条件")

    cond1 = dataframe["&-price_value_divergence"] < divergence_limit
    cond2 = dataframe["volume_z_score"] > volume_floor
    cond3 = dataframe["rsi"] < rsi_cap
    cond4 = dataframe["close"] <= band_cap
    cond5 = dataframe["stochrsi_k"] < stochrsi_cap
    buy_condition = cond1 & cond2 & cond3 & cond4 & cond5 & cond6 & cond7
    dataframe.loc[buy_condition, 'enter_long'] = 1

    # Report the state of each condition on the latest candle (logged once).
    conditions_summary = [
        ("&-price_value_divergence", dataframe["&-price_value_divergence"].iloc[-1], "<", divergence_limit, cond1.iloc[-1]),
        ("volume_z_score", dataframe["volume_z_score"].iloc[-1], ">", volume_floor, cond2.iloc[-1]),
        ("rsi", dataframe["rsi"].iloc[-1], "<", rsi_cap, cond3.iloc[-1]),
        (f"close <= {band_label}", dataframe["close"].iloc[-1], "<=", band_cap.iloc[-1], cond4.iloc[-1]),
        ("stochrsi_k", dataframe["stochrsi_k"].iloc[-1], "<", stochrsi_cap, cond5.iloc[-1]),
        ("非熊市", None, None, None, cond6.iloc[-1]),
        ("STOCHRSI未持续超买", None, None, None, cond7.iloc[-1]),
    ]
    logger.info(f"[{pair}] === 买入条件检查 ===")
    satisfied_conditions = []
    for name, value, operator, threshold, result in conditions_summary:
        status = "✅" if result else "❌"
        if value is not None and threshold is not None:
            logger.info(f"[{pair}] {status} {name}: {value:.6f} {operator} {threshold:.6f}")
        else:
            logger.info(f"[{pair}] {status} {name}")
        if result:
            satisfied_conditions.append(name)

    if buy_condition.any():
        logger.info(f"[{pair}] ✅ 买入信号触发,满足条件: {', '.join(satisfied_conditions)},趋势得分:{trend_score:.2f}")
    else:
        logger.info(f"[{pair}] ❌ 买入条件未满足")

    # Share of candles on which each condition holds.
    logger.info(f"[{pair}] 各条件触发率 - "
                f"cond1: {cond1.mean():.2%}, "
                f"cond2: {cond2.mean():.2%}, "
                f"cond3: {cond3.mean():.2%}, "
                f"cond4: {cond4.mean():.2%}, "
                f"cond5: {cond5.mean():.2%}, "
                f"buy_condition: {buy_condition.mean():.2%}")
    logger.info(f"[{pair}] enter_long 信号总数:{dataframe['enter_long'].sum() if 'enter_long' in dataframe.columns else 0}")
    return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Set ``exit_long`` on candles that satisfy the trend-dependent sell rules.

    Three baseline conditions are built (divergence above the sell threshold
    with ADX confirmation, RSI/StochRSI overbought, rapid short-term rise).
    How they are combined depends on the trend status:

    * ``bullish``: stricter variants, any two must hold; no exit signal at
      all when the trend score is above 95.
    * ``bearish``: looser variants, any one suffices.
    * otherwise (ranging): the baseline conditions; any two when the trend
      score is above 85, any one otherwise; no exit signal above 98.

    When ``&-price_value_divergence`` is missing, no signal is generated.
    """
    pair = metadata.get('pair', 'Unknown')
    if "&-price_value_divergence" not in dataframe.columns:
        logger.warning(f"[{pair}] ⚠️ &-price_value_divergence 列缺失,跳过该条件")
        logger.info(f"[{pair}] 无有效卖出条件")
        return dataframe

    # Extra indicators: StochRSI, ADX and the 5-candle return in percent.
    stochrsi = ta.STOCHRSI(dataframe, timeperiod=14, fastk_period=3, fastd_period=3)
    dataframe["stochrsi_k"] = stochrsi["fastk"]
    dataframe["adx"] = ta.ADX(dataframe, timeperiod=14)
    dataframe["short_term_return"] = dataframe["close"].pct_change(5) * 100

    trend_score = self.get_market_trend(dataframe=dataframe, metadata={'pair': pair})

    # Baseline condition 1: high predicted divergence with trend-strength filter.
    cond1 = (
        (dataframe["&-price_value_divergence"] > self.sell_threshold * 1.06)
        & (dataframe["adx"] > 20)
    )
    # Baseline condition 2: overbought with trend-strength filter.
    cond2 = (
        (dataframe["rsi"] > 65)
        & (dataframe["stochrsi_k"] > 70)
        & (dataframe["adx"] > 25)
    )
    # Baseline condition 3: rapid rise over the last 5 candles. The required
    # rise runs from 3.5% (trend score 0) down to 2% (trend score 100).
    # `min_profit` is checked against the same 5-candle price change, not
    # against the profit of any open trade.
    min_profit = 0.02
    rapid_rise_threshold = self.linear_map(trend_score, 0, 100, 3.5, 2)
    cond3 = (
        (dataframe["short_term_return"] > rapid_rise_threshold)
        & (dataframe["close"] / dataframe["close"].shift(5) - 1 > min_profit)
        & (dataframe["stochrsi_k"] > 70)
        & (dataframe["volume_z_score"] > 1.0)
    )

    trend_status = self.detect_trend_status(dataframe, metadata)
    if trend_status == "bullish":
        # Bullish: let profits run — refuse to sell on a very strong trend
        # and otherwise require two of the stricter conditions.
        if trend_score > 95:
            logger.info(f"[{pair}] 🚀 强劲上涨趋势,拒绝卖出")
            return dataframe
        cond1_bullish = dataframe["&-price_value_divergence"] > self.sell_threshold * 1.15
        cond2_bullish = (dataframe["rsi"] > 75) & (dataframe["stochrsi_k"] > 85) & (dataframe["adx"] > 30)
        cond3_bullish = (dataframe["short_term_return"] > 5.0) & (dataframe["stochrsi_k"] > 85)
        sell_condition = (
            (cond1_bullish & cond2_bullish)
            | (cond1_bullish & cond3_bullish)
            | (cond2_bullish & cond3_bullish)
        )
        logger.info(f"[{pair}] 🚀 上涨趋势策略:严格出场条件")
    elif trend_status == "bearish":
        # Bearish: exit quickly — any one of the looser conditions suffices.
        cond1_bearish = dataframe["&-price_value_divergence"] > self.sell_threshold * 0.9
        cond2_bearish = (dataframe["rsi"] > 60) & (dataframe["stochrsi_k"] > 70) & (dataframe["adx"] > 20)
        cond3_bearish = (dataframe["short_term_return"] > 1.5) & (dataframe["stochrsi_k"] > 75)
        sell_condition = cond1_bearish | cond2_bearish | cond3_bearish
        logger.info(f"[{pair}] 📉 下跌趋势策略:宽松出场条件")
    else:  # ranging
        if trend_score > 98:
            logger.info(f"[{pair}] ⚖️ 震荡趋势但得分较高,拒绝卖出")
            return dataframe
        trend_sell_threshold = 85
        if trend_score > trend_sell_threshold:
            # Higher score: at least two baseline conditions.
            sell_condition = (cond1 & cond2) | (cond1 & cond3) | (cond2 & cond3)
            logger.info(f"[{pair}] ⚖️ 震荡趋势策略:标准出场条件")
        else:
            # Lower score: any baseline condition.
            sell_condition = cond1 | cond2 | cond3
            logger.info(f"[{pair}] ⚖️ 弱势震荡,任一条件满足")

    # Debug log of the *baseline* conditions on the latest candle. The
    # thresholds printed for cond2 now match the ones used above (65 / 70).
    divergence_value = dataframe["&-price_value_divergence"].iloc[-1] if not dataframe["&-price_value_divergence"].isna().all() else np.nan
    rsi_value = dataframe["rsi"].iloc[-1] if not dataframe["rsi"].isna().all() else np.nan
    stochrsi_value = dataframe["stochrsi_k"].iloc[-1] if not dataframe["stochrsi_k"].isna().all() else np.nan
    adx_value = dataframe["adx"].iloc[-1] if not dataframe["adx"].isna().all() else np.nan
    short_term_return = dataframe["short_term_return"].iloc[-1] if not dataframe["short_term_return"].isna().all() else np.nan
    logger.info(f"[{pair}] 卖出条件检查 - "
                f"&-price_value_divergence={divergence_value:.6f} > {self.sell_threshold * 1.06:.6f}: {cond1.iloc[-1]}, "
                f"rsi={rsi_value:.2f} > 65 & stochrsi_k={stochrsi_value:.2f} > 70: {cond2.iloc[-1]}, "
                f"short_term_return={short_term_return:.2f}% > {rapid_rise_threshold:.2f}% & profit > {min_profit*100:.2f}%: {cond3.iloc[-1]}, "
                f"adx={adx_value:.2f}, trend_score={trend_score:.2f}")

    dataframe.loc[sell_condition, 'exit_long'] = 1
    # Only announce a triggered exit when at least one candle matched.
    if sell_condition.any():
        logger.info(f"[{pair}] 出场信号触发,条件满足,趋势得分:{trend_score:.2f}")
    return dataframe
def buy_space(self):
    """Return the parameter definitions describing the buy-side search space.

    Every parameter carries an explicit ``default``: the parameter classes
    take it as a required keyword argument, so the two threshold entries
    that lacked one could not be constructed. Their defaults are the
    class-level ``BUY_THRESHOLD_MIN`` / ``BUY_THRESHOLD_MAX`` values, which
    lie inside the declared ranges.
    """
    return [
        DecimalParameter(-0.1, -0.01, default=self.BUY_THRESHOLD_MIN, name="buy_threshold_min"),
        DecimalParameter(-0.02, -0.001, default=self.BUY_THRESHOLD_MAX, name="buy_threshold_max"),
        DecimalParameter(-0.05, -0.01, name="add_position_threshold", default=-0.02),
        IntParameter(1, 10, name="cooldown_period_minutes", default=5),
        IntParameter(1, 3, name="max_entry_position_adjustment", default=2),
    ]
def sell_space(self):
    """Return the parameter definitions describing the sell-side search space.

    Every parameter carries an explicit ``default``: the parameter classes
    take it as a required keyword argument, so the two threshold entries
    that lacked one could not be constructed. Their defaults are the
    class-level ``SELL_THRESHOLD_MIN`` / ``SELL_THRESHOLD_MAX`` values, which
    lie inside the declared ranges.
    """
    return [
        DecimalParameter(0.001, 0.02, default=self.SELL_THRESHOLD_MIN, name="sell_threshold_min"),
        DecimalParameter(0.02, 0.1, default=self.SELL_THRESHOLD_MAX, name="sell_threshold_max"),
        DecimalParameter(0.2, 0.7, name="exit_position_ratio", default=0.5),
    ]
def adjust_trade_position(self, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float,
min_stake: float | None, max_stake: float,
current_entry_rate: float, current_exit_rate: float,
current_entry_profit: float, current_exit_profit: float,
**kwargs) -> float | None | tuple[float | None, str | None]:
"""
Decide whether to add to, reduce, or leave unchanged an open position.

Handles, in this order: adding to a losing position, reducing a position
once a profit target is reached, and a hand-rolled trailing stop. All
tuning values are scaled by the detected trend status
("bullish" / "bearish" / anything else = ranging).

Parameters:
- trade: the open trade
- current_time: current time
- current_rate: current price
- current_profit: current overall profit (not used; the profit ratio is
  recomputed from current_rate and trade.open_rate)
- min_stake: minimum stake amount, or None
- max_stake: maximum stake amount
- current_entry_rate: current entry price (not used here)
- current_exit_rate: current exit price (not used here)
- current_entry_profit: current entry profit (not used here)
- current_exit_profit: current exit profit (not used here)

Returns:
- None when nothing should change, otherwise a (amount, reason) tuple in
  which a positive amount adds to the position, a negative amount reduces
  it, and a None amount only reports why an add was skipped.

NOTE(review): `current_time` is annotated with the name `datetime`, which the
imports visible at the top of the file bind to the module, not the class.
"""
pair = trade.pair
dataframe = self.dp.get_pair_dataframe(pair, self.timeframe)
trend_score = self.get_market_trend(dataframe=dataframe, metadata={'pair': pair})
# Minutes the trade has been open.
hold_time = (current_time - trade.open_date_utc).total_seconds() / 60
# Price change relative to the trade's open rate.
profit_ratio = (current_rate - trade.open_rate) / trade.open_rate
# NOTE(review): one third of the *current* stake is treated as the "initial"
# entry size — confirm this matches how stakes are actually sized.
initial_stake_amount = trade.stake_amount / 3
logger.info(f"{pair} 首次入场金额: {initial_stake_amount:.2f}, 当前持仓金额: {trade.stake_amount:.2f}, "
f"加仓次数: {trade.nr_of_successful_entries - 1}, 趋势得分: {trend_score:.2f}")
# Detect the trend status
trend_status = self.detect_trend_status(dataframe, {'pair': pair})
logger.info(f"{pair} 当前趋势状态: {trend_status}")
# Scale the position-management parameters by trend status
if trend_status == "bullish":
# Bullish: parameters scaled up relative to the class-level defaults
max_entry_adjustments = min(self.MAX_ENTRY_POSITION_ADJUSTMENT + 1, 5) # one extra add, capped at 5
# NOTE(review): ADD_POSITION_THRESHOLD is negative and is compared with `profit_ratio <=` below,
# so 1.3x demands a deeper drawdown before adding — confirm this is the intended direction.
add_position_threshold = self.ADD_POSITION_THRESHOLD * 1.3
exit_position_ratio = self.EXIT_POSITION_RATIO * 1.4 # higher profit target before reducing
trailing_stop_start = self.TRAILING_STOP_START * 1.2 # higher activation level
trailing_stop_distance = self.TRAILING_STOP_DISTANCE * 1.5 # tolerate a larger pullback
logger.info(f"{pair} 🚀 上涨趋势仓位管理参数: max_entries={max_entry_adjustments}, add_thresh={add_position_threshold:.4f}, exit_ratio={exit_position_ratio:.2%}")
elif trend_status == "bearish":
# Bearish: parameters scaled down relative to the class-level defaults
max_entry_adjustments = max(self.MAX_ENTRY_POSITION_ADJUSTMENT - 1, 1) # one add fewer, at least 1
# NOTE(review): 0.7x of a negative threshold is reached after a smaller drawdown — confirm intent.
add_position_threshold = self.ADD_POSITION_THRESHOLD * 0.7
exit_position_ratio = self.EXIT_POSITION_RATIO * 0.8 # lower profit target before reducing
trailing_stop_start = self.TRAILING_STOP_START * 0.8 # lower activation level
trailing_stop_distance = self.TRAILING_STOP_DISTANCE * 0.7 # tighter trailing distance
logger.info(f"{pair} 📉 下跌趋势仓位管理参数: max_entries={max_entry_adjustments}, add_thresh={add_position_threshold:.4f}, exit_ratio={exit_position_ratio:.2%}")
else: # ranging
# Ranging: use the class-level defaults unchanged
max_entry_adjustments = self.MAX_ENTRY_POSITION_ADJUSTMENT
add_position_threshold = self.ADD_POSITION_THRESHOLD
exit_position_ratio = self.EXIT_POSITION_RATIO
trailing_stop_start = self.TRAILING_STOP_START
trailing_stop_distance = self.TRAILING_STOP_DISTANCE
logger.info(f"{pair} ⚖️ 震荡趋势仓位管理参数: max_entries={max_entry_adjustments}, add_thresh={add_position_threshold:.4f}, exit_ratio={exit_position_ratio:.2%}")
# --- Adding to the position ---
if trade.nr_of_successful_entries <= max_entry_adjustments + 1:
# Trend-score ceiling for adding; it falls as the trend score rises
if trend_status == "bullish":
add_threshold = 90 - 20 * (trend_score / 100) # ceiling between 90 and 70
elif trend_status == "bearish":
add_threshold = 70 - 30 * (trend_score / 100) # ceiling between 70 and 40
else:
add_threshold = 80 - 30 * (trend_score / 100) # ceiling between 80 and 50
# Add only on a sufficient drawdown, after more than 5 minutes, and with a trend score under the ceiling.
if profit_ratio <= add_position_threshold and hold_time > 5 and trend_score <= add_threshold:
add_count = trade.nr_of_successful_entries - 1
# Stake multiplier for the 1st / 2nd / 3rd add, by trend status.
# NOTE(review): the earlier comments called the bullish set "more conservative" and the bearish
# set "more aggressive", yet the bearish multipliers are the smallest — confirm which is intended.
if trend_status == "bullish":
multipliers = [1.5, 3, 6]
elif trend_status == "bearish":
multipliers = [1, 2, 4]
else:
multipliers = [2, 4, 8]
if add_count < len(multipliers):
multiplier = multipliers[add_count]
add_amount = initial_stake_amount * multiplier
# Skip the add (and report why) when it would be below the minimum stake.
if min_stake is not None and add_amount < min_stake:
logger.warning(f"{pair} 加仓金额 {add_amount:.2f} 低于最小下注金额 {min_stake:.2f}")
return (None, f"Add amount {add_amount:.2f} below min_stake {min_stake:.2f}")
# Cap the add at the maximum stake.
if add_amount > max_stake:
add_amount = max_stake
logger.info(f"{pair} 趋势状态: {trend_status}, 价格下跌 {profit_ratio*100:.2f}%,触发第 {add_count + 1} 次加仓 {add_amount:.2f}")
return (add_amount, f"Trend: {trend_status}, Price dropped {profit_ratio*100:.2f}%, add {add_amount:.2f}")
# --- Reducing the position ---
if profit_ratio >= exit_position_ratio:
# Fraction of the stake to sell, by trend status
if trend_status == "bullish":
reduce_factor = 0.5 # bullish: sell 50%
elif trend_status == "bearish":
reduce_factor = 1.0 # bearish: sell everything
else:
reduce_factor = 0.8 # ranging: sell 80%
reduce_amount = -trade.stake_amount * reduce_factor
logger.info(f"{pair} 趋势状态: {trend_status}, 利润 {profit_ratio*100:.2f}%,减仓 {abs(reduce_amount):.2f} ({reduce_factor*100:.0f}%)")
return (reduce_amount, f"Trend: {trend_status}, Profit {profit_ratio*100:.2f}%, reduce {abs(reduce_amount):.2f}")
# --- Trailing stop ---
# NOTE(review): trailing_stop_enabled lives on the strategy instance, so it is shared by all trades and pairs.
if profit_ratio >= trailing_stop_start and not self.trailing_stop_enabled:
self.trailing_stop_enabled = True
trade.adjust_min_max_rates(current_rate, current_rate)
logger.info(f"{pair} 趋势状态: {trend_status}, 价格上涨超过 {trailing_stop_start*100:.1f}%,启动追踪止损")
return None
if self.trailing_stop_enabled:
# Stop price trails the recorded maximum rate by trailing_stop_distance.
max_rate = trade.max_rate or current_rate
trailing_stop_price = max_rate * (1 - trailing_stop_distance)
if current_rate < trailing_stop_price:
logger.info(f"{pair} 趋势状态: {trend_status}, 价格回落至 {trailing_stop_price:.6f},触发全部卖出")
return (-trade.stake_amount, f"Trend: {trend_status}, Trailing stop at {trailing_stop_price:.6f}")
# Still above the stop: refresh the trade's recorded min/max rates.
trade.adjust_min_max_rates(current_rate, trade.min_rate)
return None
return None
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float,
                        time_in_force: str, current_time: datetime, **kwargs) -> bool:
    """
    Decide whether a pending entry order for ``pair`` may be placed.

    The entry is rejected when ``rate`` is not an ``int``/``float`` or when
    the previously confirmed entry for the same pair is younger than the
    cooldown. The cooldown equals ``COOLDOWN_PERIOD_MINUTES`` when the market
    trend score is above 50 and half of it (integer division) otherwise.

    Side effects when the entry is accepted: ``current_time`` is stored in
    ``self.last_entry_time[pair]`` and ``self.trailing_stop_enabled`` is reset
    to ``False``.

    :param pair: Pair the order is for.
    :param order_type: Order type, only used for logging here.
    :param amount: Order amount, only used for logging here.
    :param rate: Proposed entry rate.
    :param time_in_force: Time in force, only used for logging here.
    :param current_time: Time of the decision; compared with the last entry time.
    :return: ``True`` to allow the entry, ``False`` to skip it.
    """
    # Debug log: record the input arguments.
    logger.info(f"[{pair}] confirm_trade_entry called with rate={rate}, type(rate)={type(rate)}, "
                f"amount={amount}, order_type={order_type}, time_in_force={time_in_force}")

    # Reject anything that is not a plain number (None fails this check too).
    if not isinstance(rate, (float, int)):
        logger.error(f"[{pair}] Invalid rate value: {rate} (type: {type(rate)}). Skipping trade entry.")
        return False

    # Score the trend on the analyzed candles of this pair. The dataframe
    # instance must be passed here, not the DataFrame class.
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    market_trend_score = self.get_market_trend(dataframe=dataframe, metadata={'pair': pair})

    if market_trend_score > 50:
        cooldown_period_minutes = self.COOLDOWN_PERIOD_MINUTES
    else:
        cooldown_period_minutes = self.COOLDOWN_PERIOD_MINUTES // 2

    if pair in self.last_entry_time:
        last_time = self.last_entry_time[pair]
        if (current_time - last_time).total_seconds() < cooldown_period_minutes * 60:
            logger.info(f"[{pair}] 冷却期内({cooldown_period_minutes} 分钟),跳过本次入场")
            return False

    self.last_entry_time[pair] = current_time
    self.trailing_stop_enabled = False
    # rate was validated as int/float above, so the float formatting is safe.
    logger.info(f"[{pair}] 确认入场,价格:{float(rate):.6f}")
    return True
def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float,
                       rate: float, time_in_force: str, exit_reason: str,
                       current_time: datetime, **kwargs) -> bool:
    """
    Log the exit that is about to happen and always allow it.

    The log line reports the exit reason, the profit ratio of ``trade`` at
    ``rate`` and ``rate`` marked up by 0.25%. The marked-up value is only
    logged; nothing in this method changes the order.

    :return: Always ``True``.
    """
    markup_factor = 1 + 0.0025
    marked_up_rate = rate * markup_factor
    raw_profit_ratio = trade.calc_profit_ratio(rate)
    message = (
        f"[{pair}] 退出交易,原因:{exit_reason}, 原始利润:{raw_profit_ratio:.2%},"
        f"调整后卖出价:{marked_up_rate:.6f}"
    )
    logger.info(message)
    return True
def custom_roi(self, trade: Trade, current_profit: float, current_time: datetime, trade_dur: int,
               current_rate: float = None, min_stake: float | None = None, max_stake: float | None = None) -> dict:
    """
    Build a dynamic ROI table from the FreqAI ``&-price_value_divergence``
    prediction, the RSI and the time the trade has been open.

    Three factors are obtained through ``self.linear_map`` and multiplied:
    - divergence: mapped from [-0.1, 0.1] to [1.2, 0.8]
    - RSI: mapped from [30, 70] to [1.2, 0.8]
    - trade duration in minutes: mapped from [0, 240] to [1.0, 0.7]

    Every entry of the base ROI table is scaled by the combined factor and
    clamped to [0.0, 0.2].

    When the candle data is empty, a column is missing, or the latest value
    is NaN, the neutral defaults (divergence 0, RSI 50) are used so that the
    table never contains NaN.

    :param trade: Trade whose pair is evaluated.
    :param current_profit: Current profit ratio, only logged.
    :param trade_dur: Trade duration in minutes.
    :return: Dict mapping minutes to the adjusted ROI value.
    """
    pair = trade.pair
    logger.info(f"[{pair}] 计算自定义 ROI当前盈利: {current_profit:.2%}, 持仓时间: {trade_dur} 分钟")

    # Fetch the latest candles and compute the indicators on them.
    dataframe = self.dp.get_pair_dataframe(pair=pair, timeframe=self.timeframe)
    dataframe = self.populate_indicators(dataframe, {'pair': pair})

    def _latest(column: str, neutral):
        """Return the last value of ``column`` or ``neutral`` when unavailable."""
        if dataframe is None or column not in dataframe or dataframe.empty:
            return neutral
        value = dataframe[column].iloc[-1]
        return neutral if pd.isna(value) else value

    # Latest FreqAI prediction and RSI.
    divergence = _latest("&-price_value_divergence", 0)
    rsi = _latest("rsi", 50)

    # NOTE(review): linear_map is defined elsewhere on the class; it is
    # presumably a linear interpolation from the input to the output range.
    # 1. Divergence: negative values raise the ROI, positive values lower it.
    divergence_factor = self.linear_map(divergence, -0.1, 0.1, 1.2, 0.8)
    # 2. RSI: a low RSI raises the ROI, a high RSI lowers it.
    rsi_factor = self.linear_map(rsi, 30, 70, 1.2, 0.8)
    # 3. Time: the longer the trade is open, the lower the ROI target.
    time_factor = self.linear_map(trade_dur, 0, 240, 1.0, 0.7)

    # Combined adjustment factor.
    roi_factor = divergence_factor * rsi_factor * time_factor

    # Default ROI table (minutes -> ROI).
    base_roi = {
        0: 0.06,
        30: 0.04,
        90: 0.025,
        270: 0.002,
    }

    # Scale every entry and keep it between 0% and 20%.
    dynamic_roi = {minutes: min(max(roi * roi_factor, 0.0), 0.2) for minutes, roi in base_roi.items()}

    logger.info(f"[{pair}] Divergence: {divergence:.4f}, RSI: {rsi:.2f}, 持仓时间: {trade_dur} 分钟, "
                f"调整系数: divergence={divergence_factor:.2f}, rsi={rsi_factor:.2f}, time={time_factor:.2f}, "
                f"总系数={roi_factor:.2f}, 动态 ROI 表格: {dynamic_roi}")
    return dynamic_roi
def custom_entry_price(self, pair: str, trade: Trade | None, current_time: datetime, proposed_rate: float,
                       entry_tag: str | None, side: str, **kwargs) -> float:
    """
    Return the entry price: the proposed rate lowered by 0.5%.

    :param pair: Pair being entered, used in the log line.
    :param proposed_rate: Rate proposed by the caller.
    :return: ``proposed_rate * (1 - 0.005)``.
    """
    discount_factor = 1 - 0.005
    entry_rate = proposed_rate * discount_factor
    logger.info(f"[{pair}] 自定义买入价:{entry_rate:.6f}(原价:{proposed_rate:.6f}")
    return entry_rate
def custom_exit_price(self, pair: str, trade: Trade,
                      current_time: datetime, proposed_rate: float,
                      current_profit: float, exit_tag: str | None, **kwargs) -> float:
    """
    Return the exit price: the proposed rate raised by 0.25%.

    :param pair: Pair being exited, used in the log line.
    :param proposed_rate: Rate proposed by the caller.
    :return: ``proposed_rate * (1 + 0.0025)``.
    """
    markup_factor = 1 + 0.0025
    exit_rate = proposed_rate * markup_factor
    logger.info(f"[{pair}] 自定义卖出价:{exit_rate:.6f}(原价:{proposed_rate:.6f}")
    return exit_rate
def get_market_trend(self, dataframe: DataFrame = None, metadata: dict = None) -> int:
    """
    Score the market trend of a pair on a 0-100 scale from the 3m, 15m and 1h
    timeframes.

    For every timeframe a raw score is summed from four parts (EMA price
    trend, engulfing candle / volatility, StochRSI, volume z-score / ADX),
    clamped to [-50, 50] and mapped logarithmically onto [0, 100] with 50 as
    the neutral value (raw +50 -> 100, raw -50 -> 0). The per-timeframe scores
    are combined with fixed weights; the 3m weight is reduced when the 1h or
    the 15m score exceeds the 3m score by more than 20.

    Note: indicator columns (``ema_short``, ``ema_long``, ``adx``, ...) are
    written into the dataframe that is scored, including a supplied one.

    :param dataframe: Optional candles for ``self.timeframe``, used instead of
        loading that timeframe from the data provider. Values that are not a
        DataFrame instance are ignored.
    :param metadata: Dict holding the ``'pair'`` key.
    :return: Integer score in [0, 100]. 50 is returned when the pair is
        missing or when any error occurs; a timeframe without enough candles
        contributes 50.
    """
    # Bound before the try block so the except handler can always log it.
    pair = 'Unknown'
    try:
        timeframes = ["3m", "15m", "1h"]
        weights = {"3m": 0.3, "15m": 0.35, "1h": 0.35}
        # Per timeframe: (minimum candles, short EMA period, long EMA period).
        tf_settings = {"3m": (200, 50, 200), "15m": (100, 20, 80), "1h": (50, 12, 50)}
        log_two = np.log1p(1)
        trend_scores = {}
        pair = metadata.get('pair', 'Unknown') if metadata else 'Unknown'

        # Check that a pair was supplied.
        if pair == 'Unknown':
            logger.error(f"[{pair}] Invalid pair in metadata: {metadata}. Returning default score 50")
            return 50

        logger.info(f"[{pair}] 正在计算多时间框架市场趋势得分")

        for tf in timeframes:
            # Prefer the supplied dataframe for the main timeframe; otherwise
            # load the candles of the pair for this timeframe.
            use_supplied = tf == self.timeframe and isinstance(dataframe, DataFrame)
            pair_df = dataframe if use_supplied else self.dp.get_pair_dataframe(pair=pair, timeframe=tf)

            min_candles, ema_short_period, ema_long_period = tf_settings[tf]
            if pair_df.empty or len(pair_df) < min_candles:
                logger.warning(f"[{pair}] 数据不足({tf}使用默认得分50")
                trend_scores[tf] = 50
                continue

            # --- Price trend ---
            pair_df["ema_short"] = ta.EMA(pair_df, timeperiod=ema_short_period)
            pair_df["ema_long"] = ta.EMA(pair_df, timeperiod=ema_long_period)
            # Relative change of the short EMA over the last 10 candles.
            pair_df["ema_short_slope"] = (
                (pair_df["ema_short"] - pair_df["ema_short"].shift(10)) / pair_df["ema_short"].shift(10)
            )
            price_above_ema = pair_df["close"].iloc[-1] > pair_df["ema_long"].iloc[-1]
            ema_short_above_ema_long = pair_df["ema_short"].iloc[-1] > pair_df["ema_long"].iloc[-1]
            ema_short_slope = pair_df["ema_short_slope"].iloc[-1]

            price_score = 0
            if price_above_ema:
                price_score += 20
            if ema_short_above_ema_long:
                price_score += 20
            if ema_short_slope > 0.005:
                price_score += 15
            elif ema_short_slope < -0.005:
                price_score -= 15

            # --- Candlestick pattern ---
            pair_df["bullish_engulfing"] = (
                (pair_df["close"].shift(1) < pair_df["open"].shift(1)) &
                (pair_df["close"] > pair_df["open"]) &
                (pair_df["close"] > pair_df["open"].shift(1)) &
                (pair_df["open"] < pair_df["close"].shift(1))
            ).fillna(False)
            pair_df["bearish_engulfing"] = (
                (pair_df["close"].shift(1) > pair_df["open"].shift(1)) &
                (pair_df["close"] < pair_df["open"]) &
                (pair_df["close"] < pair_df["open"].shift(1)) &
                (pair_df["open"] > pair_df["close"].shift(1))
            ).fillna(False)

            kline_score = 0
            if pair_df["bullish_engulfing"].iloc[-1]:
                kline_score += 15
            elif pair_df["bearish_engulfing"].iloc[-1]:
                kline_score -= 15
            # High volatility amplifies the direction given by the price score.
            volatility = pair_df["close"].pct_change(10).std() * 100
            if volatility > 0.5:
                kline_score += 10 if price_score > 0 else -10

            # --- StochRSI ---
            stochrsi = ta.STOCHRSI(pair_df, timeperiod=14, fastk_period=3, fastd_period=3)
            pair_df["stochrsi_k"] = stochrsi["fastk"]
            pair_df["stochrsi_d"] = stochrsi["fastd"]
            stochrsi_score = 0
            stochrsi_k = pair_df["stochrsi_k"].iloc[-1]
            stochrsi_d = pair_df["stochrsi_d"].iloc[-1]
            if stochrsi_k > 80 and stochrsi_k < stochrsi_d:
                stochrsi_score -= 15
            elif stochrsi_k < 20 and stochrsi_k > stochrsi_d:
                stochrsi_score += 15
            elif stochrsi_k > 50:
                stochrsi_score += 5
            elif stochrsi_k < 50:
                stochrsi_score -= 5

            # --- Volume / price relation ---
            pair_df["volume_mean_20"] = pair_df["volume"].rolling(20).mean()
            pair_df["volume_std_20"] = pair_df["volume"].rolling(20).std()
            pair_df["volume_z_score"] = (
                (pair_df["volume"] - pair_df["volume_mean_20"]) / pair_df["volume_std_20"]
            )
            pair_df["adx"] = ta.ADX(pair_df, timeperiod=14)
            volume_score = 0
            if pair_df["volume_z_score"].iloc[-1] > 1.5:
                volume_score += 10 if price_score > 0 else -10
            if pair_df["adx"].iloc[-1] > 25:
                volume_score += 10 if price_score > 0 else -10

            # --- Combined raw score, clamped to [-50, 50] ---
            raw_score = price_score + kline_score + stochrsi_score + volume_score
            raw_score = max(min(raw_score, 50), -50)

            # Logarithmic mapping onto [0, 100], symmetric around the neutral
            # 50. Negative raw scores must move below 50 (raw -50 -> 0), so
            # the magnitude is subtracted for them.
            magnitude = 50 * (np.log1p(abs(raw_score) / 50) / log_two)
            mapped_score = 50 + magnitude if raw_score >= 0 else 50 - magnitude
            trend_scores[tf] = max(0, min(100, int(round(mapped_score))))

            logger.info(f"[{pair}] {tf} 趋势得分:{trend_scores[tf]}, 原始得分:{raw_score}, "
                        f"价格得分:{price_score}, K线得分{kline_score}, "
                        f"StochRSI得分{stochrsi_score}, 量价得分:{volume_score}")

        # Shift weight to the higher timeframes when they are clearly
        # stronger than the 3m score.
        score_3m = trend_scores.get("3m", 50)
        if trend_scores.get("1h", 50) - score_3m > 20 or trend_scores.get("15m", 50) - score_3m > 20:
            weights = {"3m": 0.2, "15m": 0.35, "1h": 0.45}
            logger.info(f"[{pair}] 1h 趋势得分({trend_scores.get('1h', 50)})显著高于 3m{trend_scores.get('3m', 50)}),调整权重为 {weights}")

        # Weighted fusion of the per-timeframe scores.
        final_score = sum(trend_scores[tf] * weights[tf] for tf in timeframes)
        final_score = int(round(final_score))
        final_score = max(0, min(100, final_score))

        logger.info(f"[{pair}] 最终趋势得分:{final_score}, "
                    f"3m得分{trend_scores.get('3m', 50)}, 15m得分{trend_scores.get('15m', 50)}, "
                    f"1h得分{trend_scores.get('1h', 50)}")
        return final_score
    except Exception as e:
        # Best effort: any failure falls back to the neutral score.
        logger.error(f"[{pair}] 获取市场趋势失败:{e}", exc_info=True)
        return 50
def detect_trend_status(self, dataframe: DataFrame, metadata: dict) -> str:
    """
    Classify the trend as "bullish", "bearish" or "ranging" from a weighted,
    segmented history of trend scores.

    Steps:
    - compute ``get_market_trend`` for each of the last 20 candle positions
    - split the 20 scores into three segments: newest 3 (weight 10),
      next 7 (weight 7), oldest 10 (weight 3)
    - combine the weighted segment averages into one score clamped to [0, 100]
    - compare that score with the two hyperopt thresholds

    :param dataframe: Candle data; fewer than 20 rows yields "ranging".
    :param metadata: Dict holding the ``'pair'`` key.
    :return: "bullish", "bearish" or "ranging" ("ranging" also on any error).
    """
    pair = metadata.get('pair', 'Unknown')
    try:
        # At least 20 candles are required for the score history.
        if len(dataframe) < 20:
            logger.warning(f"[{pair}] 数据不足20个周期返回震荡趋势")
            return "ranging"

        # Trend score for each of the last 20 positions, oldest first.
        recent_scores = []
        for offset in range(-20, 0):
            # Candles up to and including the position `offset`.
            window = dataframe if offset == -1 else dataframe.iloc[:offset + 1]
            if len(window) > 0:
                recent_scores.append(self.get_market_trend(window, metadata))
            else:
                recent_scores.append(50)  # neutral default

        # Segments: newest 3, the 7 before them, and the oldest 10.
        newest = recent_scores[-3:]
        middle = recent_scores[-10:-3]
        oldest = recent_scores[-20:-10]

        weighted_parts = [
            sum(score * weight for score in segment) / len(segment)
            for segment, weight in ((newest, 10), (middle, 7), (oldest, 3))
        ]

        # Weighted average of the segment means, kept within [0, 100].
        combined = (weighted_parts[0] + weighted_parts[1] + weighted_parts[2]) / (10 + 7 + 3)
        final_score = max(0, min(100, combined))

        # Thresholds optimised by hyperopt.
        bullish_threshold = self.trend_final_bullish_threshold.value
        bearish_threshold = self.trend_final_bearish_threshold.value

        if final_score >= bullish_threshold:
            trend_status = "bullish"
            logger.info(f"[{pair}] 🚀 检测到上涨趋势: 最终加权得分={final_score:.2f}, 阈值≥{bullish_threshold}")
        elif final_score <= bearish_threshold:
            trend_status = "bearish"
            logger.info(f"[{pair}] 📉 检测到下跌趋势: 最终加权得分={final_score:.2f}, 阈值≤{bearish_threshold}")
        else:
            trend_status = "ranging"
            logger.info(f"[{pair}] ⚖️ 检测到震荡趋势: 最终加权得分={final_score:.2f}, 阈值范围({bearish_threshold}, {bullish_threshold})")

        # Per-segment details for debugging.
        newest_text = [f'{s:.1f}' for s in newest]
        middle_text = [f'{s:.1f}' for s in middle]
        oldest_text = [f'{s:.1f}' for s in oldest]
        logger.debug(f"[{pair}] 趋势分析详情 - "
                     f"第一段(1-3,权重10): {newest_text}, "
                     f"第二段(4-10,权重7): {middle_text}, "
                     f"第三段(11-20,权重3): {oldest_text}")
        return trend_status
    except Exception as e:
        logger.error(f"[{pair}] 趋势状态检测失败: {e}")
        return "ranging"