myTestFreqAI/freqtrade/templates/freqaiprimer.py
zhangkun9038@dingtalk.com 9209b48799 up
2025-06-29 06:57:31 +00:00

651 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import numpy as np
import datetime
import os
import json
import glob
from functools import reduce
from freqtrade.persistence import Trade
import talib.abstract as ta
from pandas import DataFrame
from typing import Dict
from freqtrade.strategy import (DecimalParameter, IStrategy, IntParameter)
logger = logging.getLogger(__name__)
class FreqaiPrimer(IStrategy):
    """
    FreqAI-based dynamic-threshold trading strategy.

    Integrates dynamic position add/reduce logic and targets recent
    Freqtrade versions.
    """
    # --- Hyperopt parameters ---
    # Trailing-stop activation level and trail distance; both are read (and
    # rescaled by the trend score) in adjust_trade_position.
    TRAILING_STOP_START = DecimalParameter(0.01, 0.05, default=0.03, space='sell', optimize=True)
    TRAILING_STOP_DISTANCE = DecimalParameter(0.005, 0.02, default=0.01, space='sell', optimize=True)
    # Clamp bounds applied to the dynamic buy/sell thresholds in populate_indicators.
    BUY_THRESHOLD_MIN = DecimalParameter(-0.1, -0.01, default=-0.05, space='buy', optimize=True)
    BUY_THRESHOLD_MAX = DecimalParameter(-0.02, -0.001, default=-0.005, space='buy', optimize=True)
    SELL_THRESHOLD_MIN = DecimalParameter(0.001, 0.02, default=0.005, space='sell', optimize=True)
    SELL_THRESHOLD_MAX = DecimalParameter(0.02, 0.1, default=0.05, space='sell', optimize=True)
    # Added: position add/reduce parameters
    ADD_POSITION_THRESHOLD = DecimalParameter(-0.05, -0.01, default=-0.02, space='buy', optimize=True)
    EXIT_POSITION_RATIO = DecimalParameter(0.2, 0.7, default=0.5, space='sell', optimize=True)
    COOLDOWN_PERIOD_MINUTES = IntParameter(1, 10, default=5, space='buy', optimize=True)
    MAX_ENTRY_POSITION_ADJUSTMENT = IntParameter(1, 3, default=2, space='buy', optimize=True)
    # --- Fixed configuration ---
    stoploss = -0.015
    timeframe = "3m"
    use_custom_stoploss = True
    position_adjustment_enable = True  # enable dynamic position adjustment
    # Columns to draw; every name below is a column written by populate_indicators
    # or by the entry/exit trend methods.
    plot_config = {
        "main_plot": {
            "ema200": {"color": "blue"},
            "bb_upperband": {"color": "gray"},
            "bb_lowerband": {"color": "gray"},
            "bb_middleband": {"color": "gray"}
        },
        "subplots": {
            "Signals": {
                "enter_long": {"color": "green"},
                "exit_long": {"color": "red"}
            },
            "Price-Value Divergence": {
                "&-price_value_divergence": {"color": "purple"}
            },
            "Volume Z-Score": {
                "volume_z_score": {"color": "orange"}
            },
            "RSI": {
                "rsi": {"color": "cyan"}
            }
        }
    }
    # Strategy-level FreqAI settings. Within this file only "identifier" (model
    # directory lookup) and "fit_live_predictions_candles" are read back.
    # NOTE(review): confirm whether the framework consumes the remaining keys from
    # this attribute or only from the bot configuration.
    freqai_info = {
        "identifier": "test58",
        "model": "LightGBMRegressor",
        "feature_parameters": {
            "include_timeframes": ["3m", "15m", "1h"],
            "label_period_candles": 12,
            "include_shifted_candles": 3,
        },
        "data_split_parameters": {
            "test_size": 0.2,
            "shuffle": False,
        },
        "model_training_parameters": {
            "n_estimators": 200,
            "learning_rate": 0.05,
            "num_leaves": 31,
            "verbose": -1,
        },
        "fit_live_predictions_candles": 100,
        "live_retrain_candles": 100,
    }
@staticmethod
def linear_map(value, from_min, from_max, to_min, to_max):
return (value - from_min) / (from_max - from_min) * (to_max - to_min) + to_min
def __init__(self, config: dict, *args, **kwargs):
    """
    Initialise the strategy and its per-run bookkeeping.

    Forces the module logger to DEBUG and sets up the state used by the
    other callbacks (trailing-stop flag, label statistics, entry cooldowns).
    """
    super().__init__(config, *args, **kwargs)

    logger.setLevel(logging.DEBUG)
    logger.debug("✅ 策略已初始化,日志级别设置为 DEBUG")

    # Window length used when label statistics are recomputed from recent candles.
    self.fit_live_predictions_candles = self.freqai_info.get("fit_live_predictions_candles", 100)

    # Label statistics summary state.
    self.pair_stats = {}
    self.stats_logged = False

    # Trade-management state: trailing-stop flag and last entry time per pair.
    self.trailing_stop_enabled = False
    self.last_entry_time = {}
def feature_engineering_expand_all(self, dataframe: DataFrame, period: int, metadata: dict, **kwargs) -> DataFrame:
    """
    Add the period-dependent indicator columns used as model features.

    Writes RSI/SMA/EMA, Bollinger bands on the typical price (plus band
    width), MFI, ADX, relative volume and the EMA200 price-value divergence,
    then replaces +/-inf with 0, forward-fills and zero-fills the remaining
    gaps in the feature columns.

    NOTE(review): "ema200" and "%-price_value_divergence" do not depend on
    ``period`` and are recomputed on every call.
    """
    # Single-output indicators computed straight from the OHLCV frame.
    for short_name, indicator in (("rsi", ta.RSI), ("sma", ta.SMA), ("ema", ta.EMA)):
        dataframe[f"%-{short_name}-period"] = indicator(dataframe, timeperiod=period)

    # Bollinger bands on the typical price.
    typical_price = ta.TYPPRICE(dataframe)
    bb_upper, bb_middle, bb_lower = ta.BBANDS(typical_price, timeperiod=period, nbdevup=2.0, nbdevdn=2.0)
    dataframe["bb_lowerband-period"] = bb_lower
    dataframe["bb_upperband-period"] = bb_upper
    dataframe["bb_middleband-period"] = bb_middle
    band_spread = dataframe["bb_upperband-period"] - dataframe["bb_lowerband-period"]
    dataframe["%-bb_width-period"] = band_spread / dataframe["bb_middleband-period"]

    for short_name, indicator in (("mfi", ta.MFI), ("adx", ta.ADX)):
        dataframe[f"%-{short_name}-period"] = indicator(dataframe, timeperiod=period)

    average_volume = dataframe["volume"].rolling(period).mean()
    dataframe["%-relative_volume-period"] = dataframe["volume"] / average_volume

    # Relative distance of the close from the 200-period EMA.
    dataframe["ema200"] = ta.EMA(dataframe, timeperiod=200)
    dataframe["%-price_value_divergence"] = (dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"]

    # Sanitise: infinities become 0, gaps are forward-filled, leading NaNs become 0.
    for column in (
        "%-rsi-period", "%-mfi-period", "%-sma-period", "%-ema-period", "%-adx-period",
        "bb_lowerband-period", "bb_middleband-period", "bb_upperband-period",
        "%-bb_width-period", "%-relative_volume-period", "%-price_value_divergence",
    ):
        without_inf = dataframe[column].replace([np.inf, -np.inf], 0)
        dataframe[column] = without_inf.ffill().fillna(0)

    pair = metadata.get('pair', 'Unknown')
    logger.debug(f"[{pair}] 特征工程完成,列:{list(dataframe.columns)}")
    return dataframe
def set_freqai_targets(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame:
    """
    Create the regression label "&-price_value_divergence".

    The label is the relative distance of the close from the 200-period EMA
    on the same candle. A 20-candle volume z-score is added alongside it.
    Frames shorter than 200 rows are returned untouched (no label column).

    NOTE(review): the label is not shifted forward in time and equals the
    "%-price_value_divergence" feature - confirm this is intended.
    """
    pair = metadata.get('pair', 'Unknown')
    row_count = len(dataframe)
    if row_count < 200:
        logger.warning(f"[{pair}] 数据量不足({len(dataframe)}根K线需要至少200根K线进行训练")
        return dataframe

    dataframe["ema200"] = ta.EMA(dataframe, timeperiod=200)
    dataframe["&-price_value_divergence"] = (dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"]

    rolling_volume = dataframe["volume"].rolling(20)
    dataframe["volume_mean_20"] = rolling_volume.mean()
    dataframe["volume_std_20"] = rolling_volume.std()
    dataframe["volume_z_score"] = (dataframe["volume"] - dataframe["volume_mean_20"]) / dataframe["volume_std_20"]

    # Infinities become 0, gaps are forward-filled, leading NaNs become 0.
    for column in ("&-price_value_divergence", "volume_z_score"):
        without_inf = dataframe[column].replace([np.inf, -np.inf], 0)
        dataframe[column] = without_inf.ffill().fillna(0)
    return dataframe
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """
    Compute indicators, run FreqAI and derive the dynamic buy/sell thresholds.

    Steps:
      1. EMA200 and the rule-based price-value divergence.
      2. FreqAI prediction of "&-price_value_divergence" (falls back to the
         rule-based value when FreqAI is unavailable or yields no column).
      3. Bollinger bands, RSI and a 20-candle volume z-score.
      4. Label mean/std from the newest model metadata file, or recomputed
         from the most recent candles when that file cannot be read.
      5. ``self.buy_threshold`` / ``self.sell_threshold`` = mean -/+ k * std,
         with k driven by the market trend score, clamped to the hyperopt
         bounds.

    :param dataframe: OHLCV candles for the pair.
    :param metadata: Strategy metadata; only the 'pair' key is read.
    :return: The dataframe with indicator columns added.
    """
    pair = metadata.get('pair', 'Unknown')
    logger.info(f"[{pair}] 当前可用列调用FreqAI前{list(dataframe.columns)}")

    # 200-period EMA and the rule-based price-value divergence.
    dataframe["ema200"] = ta.EMA(dataframe, timeperiod=200)
    dataframe["price_value_divergence"] = (dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"]

    # Ask FreqAI for the predicted divergence.
    if getattr(self, 'freqai', None) is None:
        logger.error(f"[{pair}] FreqAI 未初始化,请确保回测命令中启用了 --freqai")
        dataframe["&-price_value_divergence"] = dataframe["price_value_divergence"]
    else:
        logger.debug(f"self.freqai 类型:{type(self.freqai)}")
        dataframe = self.freqai.start(dataframe, metadata, self)
        if "&-price_value_divergence" not in dataframe.columns:
            logger.warning(f"[{pair}] 回归模型未生成 &-price_value_divergence回退到规则计算")
            dataframe["&-price_value_divergence"] = dataframe["price_value_divergence"]

    # Remaining indicators.
    upperband, middleband, lowerband = ta.BBANDS(dataframe["close"], timeperiod=20, nbdevup=2.0, nbdevdn=2.0)
    dataframe["bb_upperband"] = upperband
    dataframe["bb_middleband"] = middleband
    dataframe["bb_lowerband"] = lowerband
    dataframe["rsi"] = ta.RSI(dataframe, timeperiod=14)
    dataframe["volume_mean_20"] = dataframe["volume"].rolling(20).mean()
    dataframe["volume_std_20"] = dataframe["volume"].rolling(20).std()
    dataframe["volume_z_score"] = (dataframe["volume"] - dataframe["volume_mean_20"]) / dataframe["volume_std_20"]

    # Data cleaning: infinities -> 0, forward-fill, leading NaNs -> 0.
    for col in ["ema200", "bb_upperband", "bb_middleband", "bb_lowerband", "rsi", "volume_z_score",
                "&-price_value_divergence", "price_value_divergence"]:
        dataframe[col] = dataframe[col].replace([np.inf, -np.inf], 0).ffill().fillna(0)

    logger.debug(f"[{pair}] 最新数据 - close{dataframe['close'].iloc[-1]:.6f}, "
                 f"rsi{dataframe['rsi'].iloc[-1]:.2f}, "
                 f"&-price_value_divergence{dataframe['&-price_value_divergence'].iloc[-1]:.6f}, "
                 f"volume_z_score{dataframe['volume_z_score'].iloc[-1]:.2f}, "
                 f"bb_lowerband{dataframe['bb_lowerband'].iloc[-1]:.6f}")

    # Label statistics: prefer the values stored with the trained model.
    labels_mean, labels_std = self._load_label_stats(pair)
    if labels_mean is None or labels_std is None:
        logger.warning(f"[{pair}] 无法获取 labels_mean 和 labels_std重新计算")
        dataframe["&-price_value_divergence_actual"] = (dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"]
        dataframe["&-price_value_divergence_actual"] = (
            dataframe["&-price_value_divergence_actual"].replace([np.inf, -np.inf], 0).ffill().fillna(0)
        )
        recent_data = dataframe["&-price_value_divergence_actual"].tail(self.fit_live_predictions_candles)
        labels_mean = recent_data.mean()
        labels_std = recent_data.std()
        if np.isnan(labels_std) or labels_std == 0:
            labels_std = 0.01
            logger.warning(f"[{pair}] labels_std 计算异常,使用默认值 0.01")

    # Scale the band around the label mean by the market trend score:
    # a higher score gives smaller multipliers, i.e. tighter thresholds.
    market_trend_score = self.get_market_trend(metadata=metadata)
    k_buy = FreqaiPrimer.linear_map(market_trend_score, 0, 100, 1.2, 0.8)
    k_sell = FreqaiPrimer.linear_map(market_trend_score, 0, 100, 1.5, 1.0)
    raw_buy_threshold = labels_mean - k_buy * labels_std
    raw_sell_threshold = labels_mean + k_sell * labels_std

    # Clamp to the hyperopt bounds.
    self.buy_threshold = min(max(raw_buy_threshold, self.BUY_THRESHOLD_MIN.value), self.BUY_THRESHOLD_MAX.value)
    self.sell_threshold = max(min(raw_sell_threshold, self.SELL_THRESHOLD_MAX.value), self.SELL_THRESHOLD_MIN.value)

    logger.info(f"[{pair}] 市场趋势得分:{market_trend_score}, labels_mean{labels_mean:.4f}, labels_std{labels_std:.4f}")
    logger.info(f"[{pair}] k_buy{k_buy:.2f}, k_sell{k_sell:.2f}")
    logger.info(f"[{pair}] 动态买入阈值:{self.buy_threshold:.4f}, 卖出阈值:{self.sell_threshold:.4f}")

    # Record this pair's statistics; previously nothing wrote to pair_stats,
    # so the summary below never printed any entry.
    self.pair_stats[pair] = {"labels_mean": labels_mean, "labels_std": labels_std}
    if not self.stats_logged:
        logger.info("===== 所有币对的 labels_mean 和 labels_std 汇总 =====")
        for p, stats in self.pair_stats.items():
            logger.info(f"[{p}] labels_mean{stats['labels_mean']:.4f}, labels_std{stats['labels_std']:.4f}")
        logger.info("==============================================")
        self.stats_logged = True
    return dataframe

def _load_label_stats(self, pair: str):
    """
    Read the label mean/std for ``pair`` from the newest model metadata file.

    Looks under <user_data_dir>/models/<identifier>/sub-train-<BASE>_<ts>/ for
    cb_<base>_<ts>_metadata.json and picks the directory with the largest
    numeric suffix.

    :return: ``(labels_mean, labels_std)``, or ``(None, None)`` when no
        directory or file exists or anything goes wrong while reading it.
    """
    label_key = "&-price_value_divergence"
    try:
        model_base_dir = os.path.join(self.config["user_data_dir"], "models", self.freqai_info["identifier"])
        pair_base = pair.split('/')[0]
        sub_dirs = glob.glob(os.path.join(model_base_dir, f"sub-train-{pair_base}_*"))
        if not sub_dirs:
            logger.warning(f"[{pair}] 未找到任何子目录:{model_base_dir}/sub-train-{pair_base}_*")
            return None, None

        latest_sub_dir = max(sub_dirs, key=lambda x: int(x.split('_')[-1]))
        timestamp = latest_sub_dir.split('_')[-1]
        metadata_file = os.path.join(latest_sub_dir, f"cb_{pair_base.lower()}_{timestamp}_metadata.json")
        if not os.path.exists(metadata_file):
            logger.warning(f"[{pair}] 最新的 metadata.json 文件 {metadata_file} 不存在")
            return None, None

        # Use a dedicated name: the original rebound the strategy's
        # ``metadata`` argument to the file contents here.
        with open(metadata_file, "r", encoding="utf-8") as f:
            model_metadata = json.load(f)
        labels_mean = model_metadata["labels_mean"][label_key]
        labels_std = model_metadata["labels_std"][label_key]
        logger.info(f"[{pair}] 从最新子目录 {latest_sub_dir} 读取 labels_mean{labels_mean}, labels_std{labels_std}")
        return labels_mean, labels_std
    except Exception as e:
        # Best effort: any failure makes the caller recompute from candles.
        logger.warning(f"[{pair}] 无法从子目录读取 labels_mean 和 labels_std{e},重新计算")
        return None, None
def trailing_space(self):
    """
    Return the search ranges for the trailing-stop start and distance.

    NOTE(review): nothing in this file calls this method, and the parameters
    are built with an explicit ``name=`` and no ``default`` - confirm the
    framework consumes it and that DecimalParameter accepts these arguments.
    """
    start_range = DecimalParameter(0.01, 0.05, name="trailing_stop_start")
    distance_range = DecimalParameter(0.005, 0.02, name="trailing_stop_distance")
    return [start_range, distance_range]
def leverage_space(self):
    """
    Return the search ranges for the position add/reduce parameters.

    NOTE(review): nothing in this file calls this method, and the parameters
    are built with an explicit ``name=`` - confirm the framework consumes it
    and that the parameter classes accept that argument.
    """
    add_threshold_range = DecimalParameter(-0.05, -0.01, name="add_position_threshold", default=-0.02)
    exit_ratio_range = DecimalParameter(0.2, 0.7, name="exit_position_ratio", default=0.5)
    cooldown_range = IntParameter(1, 10, name="cooldown_period_minutes", default=5)
    max_adjustment_range = IntParameter(1, 3, name="max_entry_position_adjustment", default=2)
    return [add_threshold_range, exit_ratio_range, cooldown_range, max_adjustment_range]
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """
    Flag long entries where all five trend-adjusted conditions hold.

    A row gets ``enter_long = 1`` when the predicted divergence is below
    ``self.buy_threshold``, the volume z-score and RSI/StochRSI pass their
    thresholds and the close is at or below the lower Bollinger band.
    The volume, RSI and StochRSI thresholds are interpolated from the
    market trend score (score 0 -> strictest end, score 100 -> loosest end).
    """
    pair = metadata.get('pair', 'Unknown')

    # Market trend score for this frame.
    trend_score = self.get_market_trend(dataframe=dataframe, metadata=metadata)

    # Thresholds interpolated over score 0..100:
    #   volume z-score 1.5 -> 0.5, RSI 60 -> 40, StochRSI 50 -> 30.
    volume_z_score_threshold = self.linear_map(trend_score, 0, 100, 1.5, 0.5)
    rsi_threshold = self.linear_map(trend_score, 0, 100, 60, 40)
    stochrsi_threshold = self.linear_map(trend_score, 0, 100, 50, 30)

    if "&-price_value_divergence" not in dataframe.columns:
        logger.warning(f"[{pair}] ⚠️ &-price_value_divergence 列缺失,跳过买入信号生成")
        logger.debug(f"[{pair}] 无有效买入条件")
        return dataframe

    # StochRSI fast %K.
    stochrsi_frame = ta.STOCHRSI(dataframe, timeperiod=14, fastk_period=3, fastd_period=3)
    dataframe["stochrsi_k"] = stochrsi_frame["fastk"]

    divergence_low = dataframe["&-price_value_divergence"] < self.buy_threshold
    volume_high = dataframe["volume_z_score"] > volume_z_score_threshold
    rsi_low = dataframe["rsi"] < rsi_threshold
    at_lower_band = dataframe["close"] <= dataframe["bb_lowerband"]
    stochrsi_low = dataframe["stochrsi_k"] < stochrsi_threshold
    entry_mask = divergence_low & volume_high & rsi_low & at_lower_band & stochrsi_low

    def latest_or_nan(column):
        # Last value of the column, or NaN when the column holds no data at all.
        series = dataframe[column]
        return np.nan if series.isna().all() else series.iloc[-1]

    divergence_value = latest_or_nan('&-price_value_divergence')
    volume_z_score_value = latest_or_nan('volume_z_score')
    rsi_value = latest_or_nan('rsi')
    stochrsi_value = latest_or_nan('stochrsi_k')
    logger.debug(f"[{pair}] 买入条件检查 - "
                 f"&-price_value_divergence={divergence_value:.6f} < {self.buy_threshold:.6f}: {divergence_low.iloc[-1]}, "
                 f"volume_z_score={volume_z_score_value:.2f} > {volume_z_score_threshold:.2f}: {volume_high.iloc[-1]}, "
                 f"rsi={rsi_value:.2f} < {rsi_threshold:.2f}: {rsi_low.iloc[-1]}, "
                 f"close={dataframe['close'].iloc[-1]:.6f} <= bb_lowerband={dataframe['bb_lowerband'].iloc[-1]:.6f}: {at_lower_band.iloc[-1]}, "
                 f"stochrsi_k={stochrsi_value:.2f} < {stochrsi_threshold:.2f}: {stochrsi_low.iloc[-1]}")

    if entry_mask.any():
        dataframe.loc[entry_mask, 'enter_long'] = 1
        logger.info(f"[{pair}] 买入信号触发,条件满足,趋势得分:{trend_score:.2f}")
    else:
        logger.debug(f"[{pair}] 买入条件未满足,无买入信号")
    return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """
    Flag long exits where the divergence or the RSI signals overextension.

    A row gets ``exit_long = 1`` when the predicted divergence exceeds
    ``self.sell_threshold`` OR the RSI is above 75.

    :param dataframe: Candles with indicator columns from populate_indicators.
    :param metadata: Strategy metadata; only the 'pair' key is read.
    :return: The dataframe, with ``exit_long`` set on matching rows.
    """
    pair = metadata.get('pair', 'Unknown')

    if "&-price_value_divergence" not in dataframe.columns:
        logger.warning(f"[{pair}] ⚠️ &-price_value_divergence 列缺失,跳过该条件")
        logger.debug(f"[{pair}] 无有效卖出条件")
        return dataframe

    cond1 = (dataframe["&-price_value_divergence"] > self.sell_threshold)
    cond2 = (dataframe["rsi"] > 75)
    sell_condition = cond1 | cond2
    logger.debug(f"[{pair}] 卖出条件检查 - "
                 f"&-price_value_divergence > {self.sell_threshold:.6f}: {cond1.iloc[-1]}, "
                 f"rsi > 75: {cond2.iloc[-1]}")

    # Always assign so the exit_long column exists exactly as before.
    dataframe.loc[sell_condition, 'exit_long'] = 1

    # Only report a triggered signal when at least one row actually matched;
    # the message used to be logged unconditionally.
    if sell_condition.any():
        logger.debug(f"[{pair}] 出场信号触发,条件满足")
    else:
        logger.debug(f"[{pair}] 卖出条件未满足,无出场信号")
    return dataframe
def buy_space(self):
    """
    Return the search ranges for the buy-side parameters.

    NOTE(review): nothing in this file calls this method, and the parameters
    are built with an explicit ``name=`` (the first two without ``default``)
    - confirm the framework consumes it and accepts these arguments.
    """
    threshold_min_range = DecimalParameter(-0.1, -0.01, name="buy_threshold_min")
    threshold_max_range = DecimalParameter(-0.02, -0.001, name="buy_threshold_max")
    add_threshold_range = DecimalParameter(-0.05, -0.01, name="add_position_threshold", default=-0.02)
    cooldown_range = IntParameter(1, 10, name="cooldown_period_minutes", default=5)
    max_adjustment_range = IntParameter(1, 3, name="max_entry_position_adjustment", default=2)
    return [
        threshold_min_range,
        threshold_max_range,
        add_threshold_range,
        cooldown_range,
        max_adjustment_range,
    ]
def sell_space(self):
    """
    Return the search ranges for the sell-side parameters.

    NOTE(review): nothing in this file calls this method, and the parameters
    are built with an explicit ``name=`` (the first two without ``default``)
    - confirm the framework consumes it and accepts these arguments.
    """
    threshold_min_range = DecimalParameter(0.001, 0.02, name="sell_threshold_min")
    threshold_max_range = DecimalParameter(0.02, 0.1, name="sell_threshold_max")
    exit_ratio_range = DecimalParameter(0.2, 0.7, name="exit_position_ratio", default=0.5)
    return [threshold_min_range, threshold_max_range, exit_ratio_range]
def adjust_trade_position(self, trade: Trade, current_time: datetime.datetime,
                          current_rate: float, current_profit: float,
                          min_stake: float | None, max_stake: float,
                          current_entry_rate: float, current_exit_rate: float,
                          current_entry_profit: float, current_exit_profit: float,
                          **kwargs) -> float | None | tuple[float | None, str | None]:
    """
    Dynamically adjust the position: add, reduce, or trail-stop out.

    Evaluated in this order:
      1. Add to the position when price has dropped past
         ADD_POSITION_THRESHOLD, the trade is older than 5 minutes, the trend
         score is low enough and fewer than MAX_ENTRY_POSITION_ADJUSTMENT
         additions have been made.
      2. Reduce the position at >= 5% or >= 3% price gain (the higher tier
         uses the larger reduce factor).
      3. Arm a trailing stop once price gain passes the trend-scaled start
         level, then sell everything when price falls back by the
         trend-scaled distance from the maximum rate.

    :param trade: Current trade object.
    :param current_time: Current time.
    :param current_rate: Current price.
    :param current_profit: Current total profit (unused; the price move
        relative to ``trade.open_rate`` is used instead).
    :param min_stake: Minimum stake amount, or None.
    :param max_stake: Maximum stake amount available.
    :param current_entry_rate: Current entry price (unused).
    :param current_exit_rate: Current exit price (unused).
    :param current_entry_profit: Current entry profit (unused).
    :param current_exit_profit: Current exit profit (unused).
    :return: ``(amount, reason)`` with a positive amount to add or a negative
        amount to reduce, ``(None, reason)`` when an add was rejected, or
        None for no action.
    """
    pair = trade.pair
    dataframe = self.dp.get_pair_dataframe(pair, self.timeframe)
    trend_score = self.get_market_trend(dataframe=dataframe, metadata={'pair': pair})
    hold_time = (current_time - trade.open_date_utc).total_seconds() / 60  # minutes
    profit_ratio = (current_rate - trade.open_rate) / trade.open_rate
    # NOTE(review): treats one third of the current stake as the "initial"
    # stake, which stops being true once positions have been added or
    # reduced - confirm against the intended sizing scheme.
    initial_stake_amount = trade.stake_amount / 3
    add_count = trade.nr_of_successful_entries - 1  # additions made so far
    logger.debug(f"{pair} 首次入场金额: {initial_stake_amount:.2f}, 当前持仓金额: {trade.stake_amount:.2f}, "
                 f"加仓次数: {trade.nr_of_successful_entries - 1}")

    # --- Add-position logic ---
    # Strictly fewer additions than the configured maximum; the previous
    # `entries <= max + 1` check permitted one extra addition.
    max_entry_adjustments = self.MAX_ENTRY_POSITION_ADJUSTMENT.value
    if add_count < max_entry_adjustments:
        add_position_threshold = self.ADD_POSITION_THRESHOLD.value
        # The higher the trend score, the stricter the gate: score 100 -> 50, 0 -> 80.
        add_threshold = 80 - 30 * (trend_score / 100)
        if profit_ratio <= add_position_threshold and hold_time > 5 and trend_score <= add_threshold:
            logger.debug(f"{pair} 初始下注金额: {initial_stake_amount:.2f}, trend_score: {trend_score:.2f}, add_threshold: {add_threshold} ")
            # Stake multiplier for the 1st, 2nd and 3rd addition.
            multipliers = [2, 4, 8]
            if add_count < len(multipliers):
                multiplier = multipliers[add_count]
                add_amount = initial_stake_amount * multiplier
                logger.debug(f"{pair} 加仓计算: 第 {add_count + 1} 次加仓,倍数={multiplier}, "
                             f"金额 = {initial_stake_amount:.2f} * {multiplier} = {add_amount:.2f}")
                if min_stake is not None and add_amount < min_stake:
                    logger.warning(f"{pair} 加仓金额 {add_amount:.2f} 低于最小下注金额 {min_stake:.2f},取消加仓")
                    return (None, f"Add amount {add_amount:.2f} below min_stake {min_stake:.2f}")
                if add_amount > max_stake:
                    logger.warning(f"{pair} 加仓金额 {add_amount:.2f} 超出最大可用金额 {max_stake:.2f},调整为 {max_stake:.2f}")
                    add_amount = max_stake
                logger.info(f"{pair} 价格下跌 {profit_ratio*100:.2f}%,触发第 {add_count + 1} 次加仓 {add_amount:.2f}")
                return (add_amount, f"Price dropped {profit_ratio*100:.2f}%, add {add_amount:.2f}")

    # --- Reduce-position logic ---
    # The higher tier must be tested first; with 3% checked first the 5%
    # branch could never run.
    exit_position_ratio = self.EXIT_POSITION_RATIO.value
    reduce_factor = None
    if profit_ratio >= 0.05:
        reduce_factor = 1.4 - 0.4 * (trend_score / 100)  # score 100 -> 1.0, score 0 -> 1.4
    elif profit_ratio >= 0.03:
        # The higher the trend score, the smaller the reduction.
        reduce_factor = 0.6 + 0.4 * (1 - trend_score / 100)  # score 100 -> 0.6, score 0 -> 1.0
    if reduce_factor is not None:
        reduce_amount = -exit_position_ratio * reduce_factor * trade.stake_amount
        logger.info(f"{pair} 趋势值 {trend_score:.2f},利润 {profit_ratio*100:.2f}%,减仓 {abs(reduce_amount):.2f}")
        return (reduce_amount, f"Profit {profit_ratio*100:.2f}%")

    # --- Trailing-stop logic ---
    trailing_stop_start = self.TRAILING_STOP_START.value
    trailing_stop_distance = self.TRAILING_STOP_DISTANCE.value
    # Sigmoid of the trend score (centred on 50) scales both parameters.
    sigmoid = 1 / (1 + np.exp(-0.1 * (trend_score - 50)))
    trailing_factor = 0.8 + (1.2 - 0.8) * sigmoid  # approaches 1.2 at score 100, 0.8 at score 0
    distance_factor = 0.7 + (1.5 - 0.7) * sigmoid  # approaches 1.5 at score 100, 0.7 at score 0
    trailing_stop_start *= trailing_factor
    trailing_stop_distance *= distance_factor

    # NOTE(review): trailing_stop_enabled is a single flag on the strategy
    # instance, so it is shared by every open trade - confirm this is intended.
    if profit_ratio >= trailing_stop_start and not self.trailing_stop_enabled:
        self.trailing_stop_enabled = True
        trade.adjust_min_max_rates(current_rate, current_rate)
        logger.info(f"{pair} 价格上涨超过 {trailing_stop_start*100:.1f}%,启动追踪止损")
        return None

    if self.trailing_stop_enabled:
        max_rate = trade.max_rate or current_rate
        trailing_stop_price = max_rate * (1 - trailing_stop_distance)
        if current_rate < trailing_stop_price:
            logger.info(f"{pair} 价格回落至 {trailing_stop_price:.6f},触发全部卖出")
            return (-trade.stake_amount, f"Trailing stop at {trailing_stop_price:.6f}")
        trade.adjust_min_max_rates(current_rate, trade.min_rate)
    return None
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float,
                        time_in_force: str, current_time: datetime, **kwargs) -> bool:
    """
    Veto entries with an invalid rate or inside the per-pair cooldown.

    The cooldown is COOLDOWN_PERIOD_MINUTES when the market trend score is
    above 50 and half of it (integer division) otherwise. On acceptance the
    entry time is recorded for the pair and the trailing-stop flag is reset.

    :return: True to allow the entry, False to reject it.
    """
    # Debug trace of the incoming arguments.
    logger.debug(f"[{pair}] confirm_trade_entry called with rate={rate}, type(rate)={type(rate)}, "
                 f"amount={amount}, order_type={order_type}, time_in_force={time_in_force}")

    # Reject anything that is not a plain number.
    rate_is_usable = isinstance(rate, (float, int)) and rate is not None
    if not rate_is_usable:
        logger.error(f"[{pair}] Invalid rate value: {rate} (type: {type(rate)}). Skipping trade entry.")
        return False

    market_trend_score = self.get_market_trend()
    if market_trend_score > 50:
        cooldown_period_minutes = self.COOLDOWN_PERIOD_MINUTES.value
    else:
        cooldown_period_minutes = self.COOLDOWN_PERIOD_MINUTES.value // 2

    previous_entry = self.last_entry_time.get(pair)
    if previous_entry is not None:
        elapsed_seconds = (current_time - previous_entry).total_seconds()
        if elapsed_seconds < cooldown_period_minutes * 60:
            logger.info(f"[{pair}] 冷却期内({cooldown_period_minutes} 分钟),跳过本次入场")
            return False

    # Accepted: remember the entry time and start with the trailing stop disarmed.
    self.last_entry_time[pair] = current_time
    self.trailing_stop_enabled = False

    try:
        logger.info(f"[{pair}] 确认入场,价格:{float(rate):.6f}")
    except (ValueError, TypeError) as e:
        logger.error(f"[{pair}] Failed to format rate: {rate} (type: {type(rate)}), error: {e}")
        return False
    return True
def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float,
                       rate: float, time_in_force: str, exit_reason: str,
                       current_time: datetime, **kwargs) -> bool:
    """
    Log the exit and always allow it.

    The "adjusted" price (rate + 0.25%) is computed for the log message only;
    it does not affect the order.

    :return: Always True.
    """
    markup = 0.0025
    adjusted_rate = rate * (1 + markup)
    raw_profit_ratio = trade.calc_profit_ratio(rate)
    logger.info(f"[{pair}] 退出交易,原因:{exit_reason}, 原始利润:{raw_profit_ratio:.2%},"
                f"调整后卖出价:{adjusted_rate:.6f}")
    return True
def custom_entry_price(self, pair: str, trade: Trade | None, current_time: datetime, proposed_rate: float,
                       entry_tag: str | None, side: str, **kwargs) -> float:
    """
    Return an entry price 0.5% below the proposed rate.

    :return: ``proposed_rate * (1 - 0.005)``.
    """
    discount = 0.005
    entry_rate = proposed_rate * (1 - discount)
    logger.debug(f"[{pair}] 自定义买入价:{entry_rate:.6f}(原价:{proposed_rate:.6f}")
    return entry_rate
def custom_exit_price(self, pair: str, trade: Trade,
                      current_time: datetime, proposed_rate: float,
                      current_profit: float, exit_tag: str | None, **kwargs) -> float:
    """
    Return an exit price 0.25% above the proposed rate.

    :return: ``proposed_rate * (1 + 0.0025)``.
    """
    markup = 0.0025
    exit_rate = proposed_rate * (1 + markup)
    logger.debug(f"[{pair}] 自定义卖出价:{exit_rate:.6f}(原价:{proposed_rate:.6f}")
    return exit_rate
def get_market_trend(self, dataframe: DataFrame = None, metadata: dict = None) -> int:
    """
    Compute a multi-timeframe market trend score in [0, 100].

    For each of the 3m / 15m / 1h timeframes a raw score in [-50, 50] is
    built from price vs. EMAs, engulfing candles and volatility, StochRSI,
    and volume/ADX, then mapped logarithmically onto [0, 100] (50 = neutral).
    The per-timeframe scores are combined with weights 0.5 / 0.3 / 0.2, or
    0.3 / 0.3 / 0.4 when the 1h score exceeds the 3m score by more than 20.

    :param dataframe: Optional candles used in place of the BTC/USDT 3m data.
        When given, the 3m score is computed from this frame; a copy is used
        so the caller's columns are never modified.
    :param metadata: Optional strategy metadata; only 'pair' is read, and
        only for log messages.
    :return: Trend score 0-100; 50 for a timeframe with too little data and
        50 overall when anything fails.
    """
    # Resolved before the try block so the except handler can always use it.
    pair = metadata.get('pair', 'Unknown') if metadata else 'Unknown'
    try:
        timeframes = ["3m", "15m", "1h"]
        weights = {"3m": 0.5, "15m": 0.3, "1h": 0.2}  # default weights
        # Per timeframe: (minimum candles, short EMA period, long EMA period).
        tf_settings = {"3m": (200, 50, 200), "15m": (100, 20, 80), "1h": (50, 12, 50)}
        trend_scores = {}
        logger.debug(f"[{pair}] 正在计算多时间框架市场趋势得分")

        for tf in timeframes:
            if tf == "3m" and dataframe is not None:
                # Work on a copy: the scratch columns written below
                # (volume_z_score, stochrsi_k, ...) previously overwrote the
                # caller's already-cleaned indicator columns.
                btc_df = dataframe.copy()
            else:
                btc_df = self.dp.get_pair_dataframe("BTC/USDT", tf)

            min_candles, ema_short_period, ema_long_period = tf_settings[tf]
            if len(btc_df) < min_candles:
                logger.warning(f"BTC 数据不足({tf}{len(btc_df)} 根K线使用默认得分50")
                trend_scores[tf] = 50
                continue

            # --- Price trend ---
            btc_df["ema_short"] = ta.EMA(btc_df, timeperiod=ema_short_period)
            btc_df["ema_long"] = ta.EMA(btc_df, timeperiod=ema_long_period)
            btc_df["ema_short_slope"] = (
                (btc_df["ema_short"] - btc_df["ema_short"].shift(10)) / btc_df["ema_short"].shift(10)
            )
            price_above_ema = btc_df["close"].iloc[-1] > btc_df["ema_long"].iloc[-1]
            ema_short_above_ema_long = btc_df["ema_short"].iloc[-1] > btc_df["ema_long"].iloc[-1]
            ema_short_slope = btc_df["ema_short_slope"].iloc[-1]
            price_score = 0
            if price_above_ema:
                price_score += 20
            if ema_short_above_ema_long:
                price_score += 20
            if ema_short_slope > 0.005:
                price_score += 15
            elif ema_short_slope < -0.005:
                price_score -= 15

            # --- Candlestick pattern ---
            prev_open = btc_df["open"].shift(1)
            prev_close = btc_df["close"].shift(1)
            btc_df["bullish_engulfing"] = (
                (prev_close < prev_open) &
                (btc_df["close"] > btc_df["open"]) &
                (btc_df["close"] > prev_open) &
                (btc_df["open"] < prev_close)
            )
            btc_df["bearish_engulfing"] = (
                (prev_close > prev_open) &
                (btc_df["close"] < btc_df["open"]) &
                (btc_df["close"] < prev_open) &
                (btc_df["open"] > prev_close)
            )
            kline_score = 0
            if btc_df["bullish_engulfing"].iloc[-1]:
                kline_score += 15
            # Test the latest candle only. Evaluating the whole Series here
            # raised ValueError, which the except below turned into a
            # constant score of 50.
            elif btc_df["bearish_engulfing"].iloc[-1]:
                kline_score -= 15
            volatility = btc_df["close"].pct_change(10).std() * 100
            if volatility > 0.5:
                kline_score += 10 if price_score > 0 else -10

            # --- StochRSI ---
            stochrsi = ta.STOCHRSI(btc_df, timeperiod=14, fastk_period=3, fastd_period=3)
            btc_df["stochrsi_k"] = stochrsi["fastk"]
            btc_df["stochrsi_d"] = stochrsi["fastd"]
            stochrsi_score = 0
            stochrsi_k = btc_df["stochrsi_k"].iloc[-1]
            stochrsi_d = btc_df["stochrsi_d"].iloc[-1]
            if stochrsi_k > 80 and stochrsi_k < stochrsi_d:
                stochrsi_score -= 15
            elif stochrsi_k < 20 and stochrsi_k > stochrsi_d:
                stochrsi_score += 15
            elif stochrsi_k > 50:
                stochrsi_score += 5
            elif stochrsi_k < 50:
                stochrsi_score -= 5

            # --- Volume / trend strength ---
            btc_df["volume_mean_20"] = btc_df["volume"].rolling(20).mean()
            btc_df["volume_std_20"] = btc_df["volume"].rolling(20).std()
            btc_df["volume_z_score"] = (btc_df["volume"] - btc_df["volume_mean_20"]) / btc_df["volume_std_20"]
            btc_df["adx"] = ta.ADX(btc_df, timeperiod=14)
            volume_score = 0
            if btc_df["volume_z_score"].iloc[-1] > 1.5:
                volume_score += 10 if price_score > 0 else -10
            if btc_df["adx"].iloc[-1] > 25:
                volume_score += 10 if price_score > 0 else -10

            # --- Combined score, clamped to [-50, 50] ---
            raw_score = price_score + kline_score + stochrsi_score + volume_score
            raw_score = max(min(raw_score, 50), -50)

            # Logarithmic mapping onto [0, 100], symmetric around 50.
            # The negative branch must subtract from 50: the previous formula
            # mapped raw -50 to 50 and raw values just below 0 to about 0.
            if raw_score >= 0:
                mapped_score = 50 + 50 * (np.log1p(raw_score / 50) / np.log1p(1))
            else:
                mapped_score = 50 - 50 * (np.log1p(-raw_score / 50) / np.log1p(1))
            trend_scores[tf] = max(0, min(100, int(round(mapped_score))))
            logger.debug(f"[{pair}] {tf} 趋势得分:{trend_scores[tf]}, 原始得分:{raw_score}, "
                         f"价格得分:{price_score}, K线得分{kline_score}, "
                         f"StochRSI得分{stochrsi_score}, 量价得分:{volume_score}")

        # Shift weight towards 1h when it is clearly stronger than 3m (gap > 20).
        if trend_scores.get("1h", 50) - trend_scores.get("3m", 50) > 20:
            weights = {"3m": 0.3, "15m": 0.3, "1h": 0.4}
            logger.debug(f"[{pair}] 1h 趋势得分({trend_scores.get('1h', 50)})显著高于 3m{trend_scores.get('3m', 50)}),调整权重为 {weights}")

        # Weighted blend, rounded and clamped to [0, 100].
        final_score = sum(trend_scores[tf] * weights[tf] for tf in timeframes)
        final_score = int(round(final_score))
        final_score = max(0, min(100, final_score))
        logger.info(f"[{pair}] 最终趋势得分:{final_score}, "
                    f"3m得分{trend_scores.get('3m', 50)}, 15m得分{trend_scores.get('15m', 50)}, "
                    f"1h得分{trend_scores.get('1h', 50)}")
        return final_score
    except Exception as e:
        # Best effort: callers always receive a usable, neutral score.
        logger.error(f"[{pair}] 获取市场趋势失败:{e}", exc_info=True)
        return 50