myTestFreqAI/freqtrade/templates/freqaiprimer.py
zhangkun9038@dingtalk.com db95f4bb58 add logs
2025-06-23 16:36:21 +08:00

720 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import numpy as np
import datetime
import os
import json
import glob
from functools import reduce
from freqtrade.persistence import Trade
import talib.abstract as ta
from pandas import DataFrame
from typing import Dict
from freqtrade.strategy import (DecimalParameter, IStrategy, IntParameter)
logger = logging.getLogger(__name__)
class FreqaiPrimer(IStrategy):
    """
    FreqAI-based strategy that trades on dynamic price/value-divergence thresholds.

    The regression target is the relative distance between the close price and
    its 200-period EMA.  Entry and exit thresholds are derived from the label
    mean/std (read from the model metadata file, or recomputed from recent
    candles) and scaled according to a BTC-based market-trend estimate.  The
    strategy also adds to and partially reduces positions through
    ``adjust_trade_position``.
    """

    # --- Hyperopt parameters ---
    ROI_T0 = DecimalParameter(0.01, 0.05, default=0.02, space='roi', optimize=True)
    ROI_T1 = DecimalParameter(0.005, 0.02, default=0.01, space='roi', optimize=True)
    ROI_T2 = DecimalParameter(0.0, 0.01, default=0.0, space='roi', optimize=True)
    TRAILING_STOP_START = DecimalParameter(0.01, 0.05, default=0.03, space='sell', optimize=True)
    TRAILING_STOP_DISTANCE = DecimalParameter(0.005, 0.02, default=0.01, space='sell', optimize=True)
    BUY_THRESHOLD_MIN = DecimalParameter(-0.1, -0.01, default=-0.05, space='buy', optimize=True)
    BUY_THRESHOLD_MAX = DecimalParameter(-0.02, -0.001, default=-0.005, space='buy', optimize=True)
    SELL_THRESHOLD_MIN = DecimalParameter(0.001, 0.02, default=0.005, space='sell', optimize=True)
    SELL_THRESHOLD_MAX = DecimalParameter(0.02, 0.1, default=0.05, space='sell', optimize=True)
    # Position add-on / partial-exit parameters
    ADD_POSITION_THRESHOLD = DecimalParameter(-0.05, -0.01, default=-0.02, space='buy', optimize=True)
    EXIT_POSITION_RATIO = DecimalParameter(0.2, 0.7, default=0.5, space='sell', optimize=True)
    COOLDOWN_PERIOD_MINUTES = IntParameter(1, 10, default=5, space='buy', optimize=True)
    MAX_ENTRY_POSITION_ADJUSTMENT = IntParameter(1, 3, default=2, space='buy', optimize=True)

    # --- Fixed configuration ---
    minimal_roi = {
        "0": ROI_T0.value,
        "30": ROI_T1.value,
        "60": ROI_T2.value
    }
    stoploss = -0.015
    timeframe = "3m"
    use_custom_stoploss = True
    position_adjustment_enable = True  # enable dynamic position adjustment
    plot_config = {
        "main_plot": {
            "ema200": {"color": "blue"},
            "bb_upperband": {"color": "gray"},
            "bb_lowerband": {"color": "gray"},
            "bb_middleband": {"color": "gray"}
        },
        "subplots": {
            "Signals": {
                "enter_long": {"color": "green"},
                "exit_long": {"color": "red"}
            },
            "Price-Value Divergence": {
                "&-price_value_divergence": {"color": "purple"}
            },
            "Volume Z-Score": {
                "volume_z_score": {"color": "orange"}
            },
            "RSI": {
                "rsi": {"color": "cyan"}
            }
        }
    }
    freqai_info = {
        "identifier": "test58",
        "model": "LightGBMRegressor",
        "feature_parameters": {
            "include_timeframes": ["3m", "15m", "1h"],
            "label_period_candles": 12,
            "include_shifted_candles": 3,
        },
        "data_split_parameters": {
            "test_size": 0.2,
            "shuffle": False,
        },
        "model_training_parameters": {
            "n_estimators": 200,
            "learning_rate": 0.05,
            "num_leaves": 31,
            "verbose": -1,
        },
        "fit_live_predictions_candles": 100,
        "live_retrain_candles": 100,
    }

    def __init__(self, config: dict, *args, **kwargs):
        """
        Initialise per-instance state and the dedicated trade logger.

        Args:
            config: Strategy configuration, forwarded to ``IStrategy.__init__``.
        """
        super().__init__(config, *args, **kwargs)
        logger.setLevel(logging.DEBUG)
        logger.debug("✅ 策略已初始化,日志级别设置为 DEBUG")
        # NOTE(review): this flag is shared by every pair/trade handled by the
        # instance; per-trade trailing state would need a keyed structure.
        self.trailing_stop_enabled = False
        self.pair_stats = {}
        self.stats_logged = False
        self.fit_live_predictions_candles = self.freqai_info.get("fit_live_predictions_candles", 100)
        self.last_entry_time = {}  # last confirmed entry time per pair
        self.indicator_cache = {}  # latest indicator snapshot per pair, used in trade logs
        # Placeholders so the attributes exist before populate_indicators
        # computes the real thresholds.
        self.buy_threshold = self.BUY_THRESHOLD_MAX.value
        self.sell_threshold = self.SELL_THRESHOLD_MIN.value

        # Dedicated trade log.  "trade_logger" is a process-wide logger, so the
        # file handler is attached only once; otherwise every new strategy
        # instance would duplicate each trade-log line.
        self.trade_logger = logging.getLogger("trade_logger")
        self.trade_logger.setLevel(logging.INFO)
        trade_log_path = os.path.abspath("trade_log.jsonl")
        handler_attached = any(
            isinstance(handler, logging.FileHandler) and handler.baseFilename == trade_log_path
            for handler in self.trade_logger.handlers
        )
        if not handler_attached:
            trade_log_handler = logging.FileHandler("trade_log.jsonl")
            trade_log_handler.setFormatter(logging.Formatter("%(asctime)s - %(message)s"))
            self.trade_logger.addHandler(trade_log_handler)

    def feature_engineering_expand_all(self, dataframe: DataFrame, period: int, metadata: dict, **kwargs) -> DataFrame:
        """
        Add the period-dependent feature columns for FreqAI.

        Columns prefixed with ``%-`` are written alongside helper columns
        (Bollinger bands, ``ema200``).  Infinite values in the listed columns
        are replaced by 0, then forward-filled, and remaining NaNs set to 0.

        Args:
            dataframe: OHLCV frame to extend.
            period: Indicator period used for this expansion.
            metadata: Pair metadata; only ``pair`` is read (for logging).

        Returns:
            The same dataframe with the added columns.
        """
        dataframe["%-rsi-period"] = ta.RSI(dataframe, timeperiod=period)
        dataframe["%-sma-period"] = ta.SMA(dataframe, timeperiod=period)
        dataframe["%-ema-period"] = ta.EMA(dataframe, timeperiod=period)
        real = ta.TYPPRICE(dataframe)
        upperband, middleband, lowerband = ta.BBANDS(real, timeperiod=period, nbdevup=2.0, nbdevdn=2.0)
        dataframe["bb_lowerband-period"] = lowerband
        dataframe["bb_upperband-period"] = upperband
        dataframe["bb_middleband-period"] = middleband
        dataframe["%-bb_width-period"] = (
            (dataframe["bb_upperband-period"] - dataframe["bb_lowerband-period"])
            / dataframe["bb_middleband-period"]
        )
        dataframe["%-mfi-period"] = ta.MFI(dataframe, timeperiod=period)
        dataframe["%-adx-period"] = ta.ADX(dataframe, timeperiod=period)
        dataframe["%-relative_volume-period"] = dataframe["volume"] / dataframe["volume"].rolling(period).mean()
        dataframe["ema200"] = ta.EMA(dataframe, timeperiod=200)
        dataframe["%-price_value_divergence"] = (dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"]
        columns_to_clean = [
            "%-rsi-period", "%-mfi-period", "%-sma-period", "%-ema-period", "%-adx-period",
            "bb_lowerband-period", "bb_middleband-period", "bb_upperband-period",
            "%-bb_width-period", "%-relative_volume-period", "%-price_value_divergence"
        ]
        for col in columns_to_clean:
            dataframe[col] = dataframe[col].replace([np.inf, -np.inf], 0).ffill().fillna(0)
        pair = metadata.get('pair', 'Unknown')
        logger.debug(f"[{pair}] 特征工程完成,列:{list(dataframe.columns)}")
        return dataframe

    def set_freqai_targets(self, dataframe: DataFrame, metadata: dict, **kwargs) -> DataFrame:
        """
        Create the regression target ``&-price_value_divergence``.

        The target is ``(close - ema200) / ema200``.  A 20-candle volume
        z-score column is added as well.  When fewer than 200 candles are
        available a warning is logged and the frame is returned unchanged.

        Args:
            dataframe: OHLCV frame to extend.
            metadata: Pair metadata; only ``pair`` is read (for logging).

        Returns:
            The dataframe, with target columns added when enough data exists.
        """
        pair = metadata.get('pair', 'Unknown')
        if len(dataframe) < 200:
            logger.warning(f"[{pair}] 数据量不足({len(dataframe)}根K线需要至少200根K线进行训练")
            return dataframe
        dataframe["ema200"] = ta.EMA(dataframe, timeperiod=200)
        dataframe["&-price_value_divergence"] = (dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"]
        dataframe["volume_mean_20"] = dataframe["volume"].rolling(20).mean()
        dataframe["volume_std_20"] = dataframe["volume"].rolling(20).std()
        dataframe["volume_z_score"] = (dataframe["volume"] - dataframe["volume_mean_20"]) / dataframe["volume_std_20"]
        for col in ("&-price_value_divergence", "volume_z_score"):
            dataframe[col] = dataframe[col].replace([np.inf, -np.inf], 0).ffill().fillna(0)
        return dataframe

    def _load_label_stats(self, pair: str) -> tuple[float | None, float | None]:
        """
        Read ``labels_mean`` / ``labels_std`` from the newest model sub-directory.

        Looks under ``<user_data_dir>/models/<identifier>/sub-train-<BASE>_*``
        and picks the directory with the largest numeric suffix.  Any failure
        is logged and reported as ``(None, None)`` so the caller can fall back
        to recomputing the statistics.
        """
        try:
            model_base_dir = os.path.join(self.config["user_data_dir"], "models", self.freqai_info["identifier"])
            pair_base = pair.split('/')[0] if '/' in pair else pair
            sub_dirs = glob.glob(os.path.join(model_base_dir, f"sub-train-{pair_base}_*"))
            if not sub_dirs:
                logger.warning(f"[{pair}] 未找到任何子目录:{model_base_dir}/sub-train-{pair_base}_*")
                return None, None
            latest_sub_dir = max(sub_dirs, key=lambda path: int(path.split('_')[-1]))
            timestamp = latest_sub_dir.split('_')[-1]
            metadata_file = os.path.join(latest_sub_dir, f"cb_{pair_base.lower()}_{timestamp}_metadata.json")
            if not os.path.exists(metadata_file):
                logger.warning(f"[{pair}] 最新的 metadata.json 文件 {metadata_file} 不存在")
                return None, None
            with open(metadata_file, "r", encoding="utf-8") as f:
                # Kept in its own name: the strategy's pair ``metadata`` dict
                # must not be shadowed by the model metadata.
                model_metadata = json.load(f)
            labels_mean = model_metadata["labels_mean"]["&-price_value_divergence"]
            labels_std = model_metadata["labels_std"]["&-price_value_divergence"]
            logger.info(f"[{pair}] 从最新子目录 {latest_sub_dir} 读取 labels_mean{labels_mean}, labels_std{labels_std}")
            return labels_mean, labels_std
        except Exception as e:
            # Best effort: the caller recomputes the statistics.
            logger.warning(f"[{pair}] 无法从子目录读取 labels_mean 和 labels_std{e},重新计算")
            return None, None

    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """
        Compute indicators, run FreqAI, and derive the dynamic thresholds.

        Side effects: sets ``self.buy_threshold`` / ``self.sell_threshold``
        and records the label statistics for the pair in ``self.pair_stats``.

        Args:
            dataframe: OHLCV frame for the pair.
            metadata: Pair metadata; ``pair`` is read and the dict is passed
                on to ``self.freqai.start``.

        Returns:
            The dataframe with indicator and prediction columns.
        """
        pair = metadata.get('pair', 'Unknown')
        logger.info(f"[{pair}] 当前可用列调用FreqAI前{list(dataframe.columns)}")
        # 200-period EMA and the rule-based price/value divergence
        dataframe["ema200"] = ta.EMA(dataframe, timeperiod=200)
        dataframe["price_value_divergence"] = (dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"]
        # Ask FreqAI for the predicted divergence
        if not hasattr(self, 'freqai') or self.freqai is None:
            logger.error(f"[{pair}] FreqAI 未初始化,请确保回测命令中启用了 --freqai")
            dataframe["&-price_value_divergence"] = dataframe["price_value_divergence"]
        else:
            logger.debug(f"self.freqai 类型:{type(self.freqai)}")
            dataframe = self.freqai.start(dataframe, metadata, self)
        if "&-price_value_divergence" not in dataframe.columns:
            logger.warning(f"[{pair}] 回归模型未生成 &-price_value_divergence回退到规则计算")
            dataframe["&-price_value_divergence"] = dataframe["price_value_divergence"]
        # Remaining indicators
        upperband, middleband, lowerband = ta.BBANDS(dataframe["close"], timeperiod=20, nbdevup=2.0, nbdevdn=2.0)
        dataframe["bb_upperband"] = upperband
        dataframe["bb_middleband"] = middleband
        dataframe["bb_lowerband"] = lowerband
        dataframe["rsi"] = ta.RSI(dataframe, timeperiod=14)
        dataframe["volume_mean_20"] = dataframe["volume"].rolling(20).mean()
        dataframe["volume_std_20"] = dataframe["volume"].rolling(20).std()
        dataframe["volume_z_score"] = (dataframe["volume"] - dataframe["volume_mean_20"]) / dataframe["volume_std_20"]
        # Data cleaning
        for col in ["ema200", "bb_upperband", "bb_middleband", "bb_lowerband", "rsi", "volume_z_score",
                    "&-price_value_divergence", "price_value_divergence"]:
            dataframe[col] = dataframe[col].replace([np.inf, -np.inf], 0).ffill().fillna(0)
        # Debug snapshot of the latest candle
        logger.debug(f"[{pair}] 最新数据 - close{dataframe['close'].iloc[-1]:.6f}, "
                     f"rsi{dataframe['rsi'].iloc[-1]:.2f}, "
                     f"&-price_value_divergence{dataframe['&-price_value_divergence'].iloc[-1]:.6f}, "
                     f"volume_z_score{dataframe['volume_z_score'].iloc[-1]:.2f}, "
                     f"bb_lowerband{dataframe['bb_lowerband'].iloc[-1]:.6f}")

        # Label statistics: from the model metadata if possible, else recomputed
        labels_mean, labels_std = self._load_label_stats(pair)
        if labels_mean is None or labels_std is None:
            logger.warning(f"[{pair}] 无法获取 labels_mean 和 labels_std重新计算")
            dataframe["&-price_value_divergence_actual"] = (
                (dataframe["close"] - dataframe["ema200"]) / dataframe["ema200"]
            )
            dataframe["&-price_value_divergence_actual"] = (
                dataframe["&-price_value_divergence_actual"].replace([np.inf, -np.inf], 0).ffill().fillna(0)
            )
            recent_data = dataframe["&-price_value_divergence_actual"].tail(self.fit_live_predictions_candles)
            labels_mean = recent_data.mean()
            labels_std = recent_data.std()
        # A zero/NaN std collapses both thresholds onto the mean, so it is
        # replaced regardless of where the value came from.
        if np.isnan(labels_std) or labels_std == 0:
            labels_std = 0.01
            logger.warning(f"[{pair}] labels_std 计算异常,使用默认值 0.01")
        # Recorded so the one-off summary below has data to report.
        self.pair_stats[pair] = {"labels_mean": labels_mean, "labels_std": labels_std}

        # Scale the thresholds by the current market trend
        market_trend = self.get_market_trend()
        if market_trend == 'bull':
            k_buy = 0.8   # looser buy threshold
            k_sell = 1.0  # tighter sell threshold
        elif market_trend == 'bear':
            k_buy = 1.2   # tighter buy threshold
            k_sell = 1.5  # looser sell threshold
        else:
            k_buy = 1.0
            k_sell = 1.2
        if labels_mean > 0.015:
            k_sell += 0.5
        self.buy_threshold = labels_mean - k_buy * labels_std
        self.sell_threshold = labels_mean + k_sell * labels_std
        # Clamp the thresholds into the hyperopt-defined ranges
        self.buy_threshold = max(self.buy_threshold, self.BUY_THRESHOLD_MIN.value)
        self.buy_threshold = min(self.buy_threshold, self.BUY_THRESHOLD_MAX.value)
        self.sell_threshold = min(self.sell_threshold, self.SELL_THRESHOLD_MAX.value)
        self.sell_threshold = max(self.sell_threshold, self.SELL_THRESHOLD_MIN.value)
        logger.info(f"[{pair}] 市场趋势:{market_trend}, labels_mean{labels_mean:.4f}, labels_std{labels_std:.4f}")
        logger.info(f"[{pair}] k_buy{k_buy:.2f}, k_sell{k_sell:.2f}")
        logger.info(f"[{pair}] 动态买入阈值:{self.buy_threshold:.4f}, 卖出阈值:{self.sell_threshold:.4f}")
        if not self.stats_logged:
            logger.info("===== 所有币对的 labels_mean 和 labels_std 汇总 =====")
            for p, stats in self.pair_stats.items():
                logger.info(f"[{p}] labels_mean{stats['labels_mean']:.4f}, labels_std{stats['labels_std']:.4f}")
            logger.info("==============================================")
            self.stats_logged = True
        return dataframe

    def generate_roi_table(self, params: Dict) -> Dict[int, float]:
        """
        Build the ROI table from hyperopt parameters.

        Keys are trade durations in minutes as integers, matching the declared
        return type (the previous string keys contradicted it).

        Args:
            params: Mapping that may contain ``roi_t0`` / ``roi_t1`` / ``roi_t2``;
                missing entries fall back to the class-level parameters.

        Returns:
            ``{0: ..., 30: ..., 60: ...}``.
        """
        roi_table = {
            0: params.get("roi_t0", self.ROI_T0.value),
            30: params.get("roi_t1", self.ROI_T1.value),
            60: params.get("roi_t2", self.ROI_T2.value),
        }
        return roi_table

    def roi_space(self):
        """Return the search dimensions for the three ROI steps."""
        return [
            DecimalParameter(0.01, 0.05, name="roi_t0"),
            DecimalParameter(0.005, 0.02, name="roi_t1"),
            DecimalParameter(0.0, 0.01, name="roi_t2")
        ]

    def trailing_space(self):
        """Return the search dimensions for the trailing-stop start and distance."""
        return [
            DecimalParameter(0.01, 0.05, name="trailing_stop_start"),
            DecimalParameter(0.005, 0.02, name="trailing_stop_distance")
        ]

    def leverage_space(self):
        """Return the search dimensions for the position-adjustment parameters."""
        return [
            DecimalParameter(-0.05, -0.01, name="add_position_threshold", default=-0.02),
            DecimalParameter(0.2, 0.7, name="exit_position_ratio", default=0.5),
            IntParameter(1, 10, name="cooldown_period_minutes", default=5),
            IntParameter(1, 3, name="max_entry_position_adjustment", default=2)
        ]

    def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """
        Flag long entries.

        A row is flagged when the predicted divergence is below
        ``self.buy_threshold`` AND volume z-score > 1.5 AND RSI < 40 AND the
        close is at or below the lower Bollinger band.

        Args:
            dataframe: Frame produced by ``populate_indicators``.
            metadata: Pair metadata; only ``pair`` is read.

        Returns:
            The dataframe, with ``enter_long`` set to 1 on matching rows.
        """
        pair = metadata.get('pair', 'Unknown')
        conditions = []
        if "&-price_value_divergence" in dataframe.columns:
            cond1 = (dataframe["&-price_value_divergence"] < self.buy_threshold)
            cond2 = (dataframe["volume_z_score"] > 1.5)
            cond3 = (dataframe["rsi"] < 40)
            cond4 = (dataframe["close"] <= dataframe["bb_lowerband"])
            buy_condition = cond1 & cond2 & cond3 & cond4
            conditions.append(buy_condition)
            logger.debug(f"[{pair}] 买入条件检查 - "
                         f"&-price_value_divergence < {self.buy_threshold:.6f}: {cond1.iloc[-1]}, "
                         f"volume_z_score > 1.5: {cond2.iloc[-1]}, "
                         f"rsi < 40: {cond3.iloc[-1]}, "
                         f"close <= bb_lowerband: {cond4.iloc[-1]}")
        else:
            logger.warning(f"[{pair}] ⚠️ &-price_value_divergence 列缺失,跳过该条件")
        if len(conditions) > 0:
            combined_condition = reduce(lambda x, y: x & y, conditions)
            if combined_condition.any():
                dataframe.loc[combined_condition, 'enter_long'] = 1
                # Snapshot for the trade log.  Merged rather than replaced so
                # keys written by populate_exit_trend are not lost.
                self.indicator_cache.setdefault(pair, {}).update({
                    "rsi": dataframe["rsi"].iloc[-1],
                    "volume_z_score": dataframe["volume_z_score"].iloc[-1],
                    "&-price_value_divergence": dataframe["&-price_value_divergence"].iloc[-1],
                    "bb_lowerband": dataframe["bb_lowerband"].iloc[-1]
                })
                logger.debug(f"[{pair}] 入场信号触发,条件满足")
            else:
                logger.debug(f"[{pair}] 买入条件均不满足,未触发入场信号")
        else:
            logger.debug(f"[{pair}] 无有效买入条件")
        return dataframe

    def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """
        Flag long exits.

        A row is flagged when the predicted divergence is above
        ``self.sell_threshold`` OR RSI > 75.

        Args:
            dataframe: Frame produced by ``populate_indicators``.
            metadata: Pair metadata; only ``pair`` is read.

        Returns:
            The dataframe, with ``exit_long`` set to 1 on matching rows.
        """
        pair = metadata.get('pair', 'Unknown')
        conditions = []
        if "&-price_value_divergence" in dataframe.columns:
            cond1 = (dataframe["&-price_value_divergence"] > self.sell_threshold)
            cond2 = (dataframe["rsi"] > 75)
            sell_condition = cond1 | cond2
            conditions.append(sell_condition)
            logger.debug(f"[{pair}] 卖出条件检查 - "
                         f"&-price_value_divergence > {self.sell_threshold:.6f}: {cond1.iloc[-1]}, "
                         f"rsi > 75: {cond2.iloc[-1]}")
        else:
            logger.warning(f"[{pair}] ⚠️ &-price_value_divergence 列缺失,跳过该条件")
        if len(conditions) > 0:
            combined_condition = reduce(lambda x, y: x & y, conditions)
            if combined_condition.any():
                dataframe.loc[combined_condition, 'exit_long'] = 1
                # Snapshot for the trade log.  Merged rather than replaced: an
                # overwrite here would drop "bb_lowerband", which the entry
                # log reads from the same cache entry.
                self.indicator_cache.setdefault(pair, {}).update({
                    "rsi": dataframe["rsi"].iloc[-1],
                    "volume_z_score": dataframe["volume_z_score"].iloc[-1],
                    "&-price_value_divergence": dataframe["&-price_value_divergence"].iloc[-1],
                    "bb_upperband": dataframe["bb_upperband"].iloc[-1]
                })
                logger.debug(f"[{pair}] 出场信号触发,条件满足")
            else:
                logger.debug(f"[{pair}] 卖出条件均不满足,未触发出场信号")
        else:
            logger.debug(f"[{pair}] 无有效卖出条件")
        return dataframe

    def buy_space(self):
        """Return the search dimensions for the buy-side parameters."""
        return [
            DecimalParameter(-0.1, -0.01, name="buy_threshold_min"),
            DecimalParameter(-0.02, -0.001, name="buy_threshold_max"),
            DecimalParameter(-0.05, -0.01, name="add_position_threshold", default=-0.02),
            IntParameter(1, 10, name="cooldown_period_minutes", default=5),
            IntParameter(1, 3, name="max_entry_position_adjustment", default=2)
        ]

    def sell_space(self):
        """Return the search dimensions for the sell-side parameters."""
        return [
            DecimalParameter(0.001, 0.02, name="sell_threshold_min"),
            DecimalParameter(0.02, 0.1, name="sell_threshold_max"),
            DecimalParameter(0.2, 0.7, name="exit_position_ratio", default=0.5)
        ]

    def adjust_trade_position(self, trade: Trade, current_time: datetime.datetime,
                              current_rate: float, current_profit: float,
                              min_stake: float | None, max_stake: float,
                              current_entry_rate: float, current_exit_rate: float,
                              current_entry_profit: float, current_exit_profit: float,
                              **kwargs) -> float | None | tuple[float | None, str | None]:
        """
        Adjust an open position: add on drawdown, scale out on profit.

        Checks run in this order and the first match returns:
        add-on entry, partial exit (5% tier, then 3% tier), trailing stop,
        maximum holding time.

        Args:
            trade: The open trade.
            current_time: Current time.
            current_rate: Current market price.
            current_profit: Current unrealised profit ratio (unused; the ratio
                is recomputed from ``trade.open_rate``).
            min_stake: Minimum order size, may be ``None``.
            max_stake: Maximum stake available.
            current_entry_rate: Current entry price (unused).
            current_exit_rate: Current exit price (unused).
            current_entry_profit: Profit ratio at entry price (unused).
            current_exit_profit: Profit ratio at exit price (unused).

        Returns:
            - positive float: stake amount to add (quote currency),
            - negative float: stake amount to sell,
            - ``None``: no adjustment,
            - ``(amount_or_None, reason)``: adjustment with a reason tag.
        """
        pair = trade.pair
        hold_time = (current_time - trade.open_date_utc).total_seconds() / 60
        market_trend = self.get_market_trend()
        profit_ratio = (current_rate - trade.open_rate) / trade.open_rate

        # --- Add-on entries ---
        max_entry_adjustments = self.MAX_ENTRY_POSITION_ADJUSTMENT.value
        # nr_of_successful_entries already counts the initial entry, so an
        # add-on is allowed while the count is <= the adjustment limit.  The
        # previous "<= limit + 1" permitted one add-on more than configured.
        if trade.nr_of_successful_entries <= max_entry_adjustments:
            add_position_threshold = self.ADD_POSITION_THRESHOLD.value
            if profit_ratio <= add_position_threshold and hold_time > 5:
                if market_trend in ['bear', 'sideways']:  # add only in bear/sideways markets
                    add_amount = 0.5 * trade.stake_amount  # 50% of the trade's stake
                    if min_stake is not None and add_amount < min_stake:
                        logger.warning(f"[{pair}] 加仓金额 {add_amount:.2f} 低于最小下单金额 {min_stake},取消加仓")
                        return (None, "Add amount below min_stake")
                    if add_amount > max_stake:
                        logger.warning(f"[{pair}] 加仓金额 {add_amount:.2f} 超出最大可用金额 {max_stake},取消加仓")
                        return (None, "Add amount exceeds max_stake")
                    logger.info(f"[{pair}] 价格下跌 {profit_ratio*100:.2f}%,触发加仓 {add_amount:.2f}")
                    return (add_amount, f"Price dropped {profit_ratio*100:.2f}%")
                else:
                    logger.debug(f"[{pair}] 价格下跌但市场趋势为 {market_trend},不加仓")
                    return None

        # --- Partial exits ---
        exit_position_ratio = self.EXIT_POSITION_RATIO.value
        # The 5% tier must be tested before the 3% tier; in the opposite order
        # it can never be reached.
        if profit_ratio >= 0.05:
            reduce_amount = -exit_position_ratio * 1.4 * trade.stake_amount  # sell more
            logger.info(f"[{pair}] 利润 {profit_ratio*100:.2f}%,减仓 {abs(reduce_amount):.2f}")
            return (reduce_amount, f"Profit {profit_ratio*100:.2f}%")
        if profit_ratio >= 0.03:
            if market_trend == 'bull':
                reduce_amount = -exit_position_ratio * 0.6 * trade.stake_amount  # sell less in a bull market
                logger.info(f"[{pair}] 牛市,利润 {profit_ratio*100:.2f}%,减仓 {abs(reduce_amount):.2f}")
                return (reduce_amount, f"Bull market, profit {profit_ratio*100:.2f}%")
            reduce_amount = -exit_position_ratio * trade.stake_amount  # sell more otherwise
            logger.info(f"[{pair}] 利润 {profit_ratio*100:.2f}%,减仓 {abs(reduce_amount):.2f}")
            return (reduce_amount, f"Profit {profit_ratio*100:.2f}%")

        # --- Trailing stop ---
        trailing_stop_start = self.TRAILING_STOP_START.value
        trailing_stop_distance = self.TRAILING_STOP_DISTANCE.value
        if market_trend == 'bull':
            trailing_stop_distance *= 1.5  # looser trailing stop in a bull market
            trailing_stop_start *= 1.2
        elif market_trend == 'bear':
            trailing_stop_distance *= 0.7  # tighter trailing stop in a bear market
            trailing_stop_start *= 0.8
        if profit_ratio >= trailing_stop_start and not self.trailing_stop_enabled:
            self.trailing_stop_enabled = True
            trade.adjust_min_max_rates(current_rate, current_rate)  # seed max/min price
            logger.info(f"[{pair}] 价格上涨超过 {trailing_stop_start*100:.1f}%,启动 Trailing Stop")
            return None
        if self.trailing_stop_enabled:
            max_rate = trade.max_rate
            trailing_stop_price = max_rate * (1 - trailing_stop_distance)
            if current_rate < trailing_stop_price:
                logger.info(f"[{pair}] 价格回落至 Trailing Stop 点 {trailing_stop_price:.6f},触发全部卖出")
                return (-trade.stake_amount, f"Trailing stop at {trailing_stop_price:.6f}")
            trade.adjust_min_max_rates(current_rate, trade.min_rate)  # update the max price
            return None

        # --- Maximum holding time ---
        if hold_time > 30:
            logger.info(f"[{pair}] 持仓时间超过30分钟强制清仓")
            return (-trade.stake_amount, "Max hold time exceeded")
        return None

    def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float,
                            time_in_force: str, current_time: datetime.datetime, **kwargs) -> bool:
        """
        Enforce the per-pair cooldown and log the confirmed entry.

        The cooldown is ``COOLDOWN_PERIOD_MINUTES`` in a bull market and half
        of it (integer division) otherwise.  On confirmation the entry is
        written to both loggers, the pair's last entry time is updated and
        the trailing-stop flag is reset.

        Returns:
            ``False`` when the pair is still inside its cooldown, else ``True``.
        """
        market_trend = self.get_market_trend()
        if market_trend == 'bull':
            cooldown_period_minutes = self.COOLDOWN_PERIOD_MINUTES.value
        else:
            cooldown_period_minutes = self.COOLDOWN_PERIOD_MINUTES.value // 2
        # Cooldown check
        if pair in self.last_entry_time:
            last_time = self.last_entry_time[pair]
            if (current_time - last_time).total_seconds() < cooldown_period_minutes * 60:
                logger.info(f"[{pair}] 冷却期内({cooldown_period_minutes} 分钟),跳过本次入场")
                return False
        # Detailed entry record
        entry_log = {
            "pair": pair,
            "order_type": order_type,
            "amount": amount,
            "rate": rate,
            "total_value": amount * rate,
            "time_in_force": time_in_force,
            "timestamp": current_time.isoformat(),
            "market_trend": market_trend,
            "cooldown_period_minutes": cooldown_period_minutes,
            "buy_threshold": self.buy_threshold,
            "indicators": self.indicator_cache.get(pair, {
                "rsi": "N/A",
                "volume_z_score": "N/A",
                "&-price_value_divergence": "N/A",
                "bb_lowerband": "N/A"
            })
        }
        logger.info(f"[{pair}] 交易进入确认: {json.dumps(entry_log, indent=2)}")
        self.trade_logger.info(json.dumps(entry_log))
        # Remember the entry time and reset the trailing stop
        self.last_entry_time[pair] = current_time
        self.trailing_stop_enabled = False
        return True

    def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float,
                           rate: float, time_in_force: str, exit_reason: str,
                           current_time: datetime.datetime, **kwargs) -> bool:
        """
        Log the details of an exit; the exit is always confirmed.

        Returns:
            Always ``True``.
        """
        adjusted_rate = rate * (1 + 0.0025)  # logged only; same markup as custom_exit_price
        profit_ratio = trade.calc_profit_ratio(rate)
        profit_amount = trade.calc_profit(rate)
        hold_time_minutes = (current_time - trade.open_date_utc).total_seconds() / 60
        # Detailed exit record
        exit_log = {
            "pair": pair,
            "order_type": order_type,
            "amount": amount,
            "rate": rate,
            "adjusted_rate": adjusted_rate,
            "total_value": amount * rate,
            "time_in_force": time_in_force,
            "timestamp": current_time.isoformat(),
            "exit_reason": exit_reason,
            "profit_ratio": profit_ratio,
            "profit_amount": profit_amount,
            "hold_time_minutes": hold_time_minutes,
            "market_trend": self.get_market_trend(),
            "sell_threshold": self.sell_threshold,
            "indicators": self.indicator_cache.get(pair, {
                "rsi": "N/A",
                "volume_z_score": "N/A",
                "&-price_value_divergence": "N/A",
                "bb_upperband": "N/A"
            }),
            "trade_details": {
                "open_rate": trade.open_rate,
                "open_date": trade.open_date_utc.isoformat(),
                "stake_amount": trade.stake_amount,
                "nr_of_successful_entries": trade.nr_of_successful_entries
            }
        }
        logger.info(f"[{pair}] 交易退出确认: {json.dumps(exit_log, indent=2)}")
        self.trade_logger.info(json.dumps(exit_log))
        return True

    def custom_entry_price(self, pair: str, trade: Trade | None, current_time: datetime.datetime,
                           proposed_rate: float, entry_tag: str | None, side: str, **kwargs) -> float:
        """Return the proposed entry rate lowered by 0.5%."""
        adjusted_rate = proposed_rate * (1 - 0.005)
        logger.debug(f"[{pair}] 自定义买入价:{adjusted_rate:.6f}(原价:{proposed_rate:.6f}")
        return adjusted_rate

    def custom_exit_price(self, pair: str, trade: Trade,
                          current_time: datetime.datetime, proposed_rate: float,
                          current_profit: float, exit_tag: str | None, **kwargs) -> float:
        """Return the proposed exit rate raised by 0.25%."""
        adjusted_rate = proposed_rate * (1 + 0.0025)
        logger.debug(f"[{pair}] 自定义卖出价:{adjusted_rate:.6f}(原价:{proposed_rate:.6f}")
        return adjusted_rate

    @staticmethod
    def _map_raw_trend_score(raw_score: float) -> int:
        """
        Map a raw score in [-50, 50] onto an integer in [0, 100].

        The mapping is log-shaped and monotonic: -50 -> 0, 0 -> 50, 50 -> 100.
        """
        scaled = np.log1p(abs(raw_score) / 50) / np.log1p(1)
        if raw_score >= 0:
            mapped_score = 50 + 50 * scaled
        else:
            # Mirror of the positive branch.  Without the "50 -" a raw score
            # of -50 mapped to 50 (neutral) and -1 to about 1 (extreme bear).
            mapped_score = 50 - 50 * scaled
        return max(0, min(100, int(round(mapped_score))))

    def get_market_trend(self, dataframe: DataFrame | None = None, metadata: dict | None = None) -> str:
        """
        Classify the market as ``'bull'``, ``'bear'`` or ``'sideways'``.

        For each of the 3m / 15m / 1h timeframes a score in [0, 100] is built
        from EMA position and slope, engulfing candles, StochRSI, volume
        z-score and ADX, computed on BTC/USDT data from the data provider
        (or on ``dataframe`` for 3m when one is passed).  The weighted sum is
        bull at >= 70 and bear at <= 30.  Any exception is logged and reported
        as ``'sideways'``.

        Args:
            dataframe: Optional 3m frame to use instead of fetching BTC/USDT.
            metadata: Optional pair metadata; only ``pair`` is read (logging).

        Returns:
            ``'bull'``, ``'bear'`` or ``'sideways'``.
        """
        # Bound before the try block so the except handler can always use it.
        pair = metadata.get('pair', 'Unknown') if metadata else 'Unknown'
        try:
            timeframes = ["3m", "15m", "1h"]
            weights = {"3m": 0.5, "15m": 0.3, "1h": 0.2}
            min_candles_by_tf = {"3m": 200, "15m": 100, "1h": 50}
            ema_short_period = {"3m": 50, "15m": 20, "1h": 12}
            ema_long_period = {"3m": 200, "15m": 80, "1h": 50}
            trend_scores = {}
            logger.debug(f"[{pair}] 正在计算多时间框架市场趋势")
            for tf in timeframes:
                if tf == "3m" and dataframe is not None:
                    # Work on a copy so the scratch columns added below do not
                    # leak into (or overwrite columns of) the caller's frame.
                    btc_df = dataframe.copy()
                else:
                    btc_df = self.dp.get_pair_dataframe("BTC/USDT", tf)
                if len(btc_df) < min_candles_by_tf[tf]:
                    logger.warning(f"[{pair}] BTC 数据不足({tf}{len(btc_df)} 根K线默认震荡市")
                    trend_scores[tf] = 50
                    continue
                # EMAs
                btc_df["ema_short"] = ta.EMA(btc_df, timeperiod=ema_short_period[tf])
                btc_df["ema_long"] = ta.EMA(btc_df, timeperiod=ema_long_period[tf])
                btc_df["ema_short_slope"] = (
                    (btc_df["ema_short"] - btc_df["ema_short"].shift(10)) / btc_df["ema_short"].shift(10)
                )
                # Price-trend score
                price_score = 0
                if btc_df["close"].iloc[-1] > btc_df["ema_long"].iloc[-1]:
                    price_score += 20
                if btc_df["ema_short"].iloc[-1] > btc_df["ema_long"].iloc[-1]:
                    price_score += 20
                if btc_df["ema_short_slope"].iloc[-1] > 0.005:
                    price_score += 15
                elif btc_df["ema_short_slope"].iloc[-1] < -0.005:
                    price_score -= 15
                # Candle-pattern score
                btc_df["bullish_engulfing"] = (
                    (btc_df["close"].shift(1) < btc_df["open"].shift(1)) &
                    (btc_df["close"] > btc_df["open"]) &
                    (btc_df["close"] > btc_df["open"].shift(1)) &
                    (btc_df["open"] < btc_df["close"].shift(1))
                )
                btc_df["bearish_engulfing"] = (
                    (btc_df["close"].shift(1) > btc_df["open"].shift(1)) &
                    (btc_df["close"] < btc_df["open"]) &
                    (btc_df["close"] < btc_df["open"].shift(1)) &
                    (btc_df["open"] > btc_df["close"].shift(1))
                )
                kline_score = 0
                if btc_df["bullish_engulfing"].iloc[-1]:
                    kline_score += 15
                elif btc_df["bearish_engulfing"].iloc[-1]:
                    kline_score -= 15
                volatility = btc_df["close"].pct_change(10).std() * 100
                if volatility > 0.5:
                    kline_score += 10 if price_score > 0 else -10
                # StochRSI score
                stochrsi = ta.STOCHRSI(btc_df, timeperiod=14, fastk_period=3, fastd_period=3)
                btc_df["stochrsi_k"] = stochrsi["fastk"]
                btc_df["stochrsi_d"] = stochrsi["fastd"]
                stoch_k = btc_df["stochrsi_k"].iloc[-1]
                stoch_d = btc_df["stochrsi_d"].iloc[-1]
                stochrsi_score = 0
                if stoch_k > 80 and stoch_k < stoch_d:
                    stochrsi_score -= 15
                elif stoch_k < 20 and stoch_k > stoch_d:
                    stochrsi_score += 15
                elif stoch_k > 50:
                    stochrsi_score += 5
                elif stoch_k < 50:
                    stochrsi_score -= 5
                # Volume/price score
                btc_df["volume_mean_20"] = btc_df["volume"].rolling(20).mean()
                btc_df["volume_std_20"] = btc_df["volume"].rolling(20).std()
                btc_df["volume_z_score"] = (btc_df["volume"] - btc_df["volume_mean_20"]) / btc_df["volume_std_20"]
                btc_df["adx"] = ta.ADX(btc_df, timeperiod=14)
                volume_score = 0
                if btc_df["volume_z_score"].iloc[-1] > 1.5:
                    volume_score += 10 if price_score > 0 else -10
                if btc_df["adx"].iloc[-1] > 25:
                    volume_score += 10 if price_score > 0 else -10
                # Combined score, clamped to [-50, 50] then mapped to [0, 100]
                raw_score = price_score + kline_score + stochrsi_score + volume_score
                raw_score = max(min(raw_score, 50), -50)
                trend_scores[tf] = self._map_raw_trend_score(raw_score)
                logger.debug(f"[{pair}] {tf} 趋势得分:{trend_scores[tf]}, "
                             f"原始得分:{raw_score}, 价格得分:{price_score}, "
                             f"K线得分{kline_score}, StochRSI得分{stochrsi_score}, "
                             f"量价得分:{volume_score}")
            # Weighted fusion across timeframes
            final_score = sum(trend_scores[tf] * weights[tf] for tf in timeframes)
            final_score = max(0, min(100, int(round(final_score))))
            logger.info(f"[{pair}] 最终趋势得分:{final_score}, "
                        f"3m得分{trend_scores.get('3m', 50)}, "
                        f"15m得分{trend_scores.get('15m', 50)}, "
                        f"1h得分{trend_scores.get('1h', 50)}")
            # Map to a trend category
            if final_score >= 70:
                return 'bull'
            elif final_score <= 30:
                return 'bear'
            else:
                return 'sideways'
        except Exception as e:
            logger.error(f"[{pair}] 获取市场趋势失败:{e}", exc_info=True)
            return 'sideways'