get_market_trend 改成线性函数

This commit is contained in:
zhangkun9038@dingtalk.com 2025-06-22 17:37:55 +08:00
parent 73a64adf98
commit 0b00430b9a

View File

@ -154,6 +154,11 @@ class FreqaiPrimer(IStrategy):
return dataframe
@staticmethod
def linear_map(value, from_min, from_max, to_min, to_max):
return (value - from_min) / (from_max - from_min) * (to_max - to_min) + to_min
# 在 populate_indicators 中修改市场趋势相关逻辑
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
pair = metadata.get('pair', 'Unknown')
logger.info(f"[{pair}] 当前可用列调用FreqAI前{list(dataframe.columns)}")
@ -190,10 +195,10 @@ class FreqaiPrimer(IStrategy):
# 添加调试日志
logger.debug(f"[{pair}] 最新数据 - close{dataframe['close'].iloc[-1]:.6f}, "
f"rsi{dataframe['rsi'].iloc[-1]:.2f}, "
f"&-price_value_divergence{dataframe['&-price_value_divergence'].iloc[-1]:.6f}, "
f"volume_z_score{dataframe['volume_z_score'].iloc[-1]:.2f}, "
f"bb_lowerband{dataframe['bb_lowerband'].iloc[-1]:.6f}")
f"rsi{dataframe['rsi'].iloc[-1]:.2f}, "
f"&-price_value_divergence{dataframe['&-price_value_divergence'].iloc[-1]:.6f}, "
f"volume_z_score{dataframe['volume_z_score'].iloc[-1]:.2f}, "
f"bb_lowerband{dataframe['bb_lowerband'].iloc[-1]:.6f}")
# 获取 labels_mean 和 labels_std
labels_mean = None
@ -234,22 +239,10 @@ class FreqaiPrimer(IStrategy):
labels_std = 0.01
logger.warning(f"[{pair}] labels_std 计算异常,使用默认值 0.01")
# 根据市场趋势动态调整买卖阈值
market_trend = self.get_market_trend()
k_buy = 1.0
k_sell = 1.2
if market_trend == 'bull':
k_buy = 0.8 # 放宽买入阈值
k_sell = 1.0 # 收紧卖出阈值
elif market_trend == 'bear':
k_buy = 1.2 # 收紧买入阈值
k_sell = 1.5 # 放宽卖出阈值
else:
k_buy = 1.0
k_sell = 1.2
if labels_mean > 0.015:
k_sell += 0.5
# 根据市场趋势得分动态调整买卖阈值
market_trend_score = self.get_market_trend()
k_buy = FreqaiPrimer.linear_map(market_trend_score, 0, 100, 1.2, 0.8)
k_sell = FreqaiPrimer.linear_map(market_trend_score, 0, 100, 1.5, 1.0)
self.buy_threshold = labels_mean - k_buy * labels_std
self.sell_threshold = labels_mean + k_sell * labels_std
@ -260,7 +253,7 @@ class FreqaiPrimer(IStrategy):
self.sell_threshold = min(self.sell_threshold, self.SELL_THRESHOLD_MAX.value)
self.sell_threshold = max(self.sell_threshold, self.SELL_THRESHOLD_MIN.value)
logger.info(f"[{pair}] 市场趋势{market_trend}, labels_mean{labels_mean:.4f}, labels_std{labels_std:.4f}")
logger.info(f"[{pair}] 市场趋势得分{market_trend_score}, labels_mean{labels_mean:.4f}, labels_std{labels_std:.4f}")
logger.info(f"[{pair}] k_buy{k_buy:.2f}, k_sell{k_sell:.2f}")
logger.info(f"[{pair}] 动态买入阈值:{self.buy_threshold:.4f}, 卖出阈值:{self.sell_threshold:.4f}")
@ -382,26 +375,10 @@ class FreqaiPrimer(IStrategy):
**kwargs) -> float | None | tuple[float | None, str | None]:
"""
动态调整仓位支持加仓和分批减仓
参数
trade: 当前交易对象
current_time: 当前时间
current_rate: 当前市场价格
current_profit: 当前未实现利润百分比
min_stake: 最小下单金额可能为 None
max_stake: 最大可用下单金额
current_entry_rate: 当前入场价格
current_exit_rate: 当前退出价格
current_entry_profit: 当前入场利润百分比
current_exit_profit: 当前退出利润百分比
返回值
- 正数float加仓金额报价货币例如 USDT
- 负数float减仓金额例如 -30.0 表示卖出 30 USDT
- None不调整仓位
- tuple[float | None, str | None](调整金额, 调整原因)
"""
pair = trade.pair
hold_time = (current_time - trade.open_date_utc).total_seconds() / 60
market_trend = self.get_market_trend()
market_trend_score = self.get_market_trend()
profit_ratio = (current_rate - trade.open_rate) / trade.open_rate
# --- 加仓逻辑 ---
@ -409,7 +386,7 @@ class FreqaiPrimer(IStrategy):
if trade.nr_of_successful_entries <= max_entry_adjustments + 1: # +1 包括初始入场
add_position_threshold = self.ADD_POSITION_THRESHOLD.value
if profit_ratio <= add_position_threshold and hold_time > 5:
if market_trend in ['bear', 'sideways']: # 熊市或震荡市更倾向加仓
if market_trend_score <= 60: # 熊市或中性更倾向加仓
add_amount = 0.5 * trade.stake_amount # 加仓 50% 的初始仓位
if min_stake is not None and add_amount < min_stake:
logger.warning(f"[{pair}] 加仓金额 {add_amount:.2f} 低于最小下单金额 {min_stake},取消加仓")
@ -420,16 +397,16 @@ class FreqaiPrimer(IStrategy):
logger.info(f"[{pair}] 价格下跌 {profit_ratio*100:.2f}%,触发加仓 {add_amount:.2f}")
return (add_amount, f"Price dropped {profit_ratio*100:.2f}%")
else:
logger.debug(f"[{pair}] 价格下跌但市场趋势{market_trend},不加仓")
logger.debug(f"[{pair}] 价格下跌但市场趋势得分 {market_trend_score},不加仓")
return None
# --- 减仓逻辑 ---
exit_position_ratio = self.EXIT_POSITION_RATIO.value
if profit_ratio >= 0.03: # 利润达到 3%
if market_trend == 'bull':
reduce_amount = -exit_position_ratio * 0.6 * trade.stake_amount # 牛市减仓较少
logger.info(f"[{pair}] 牛市,利润 {profit_ratio*100:.2f}%,减仓 {abs(reduce_amount):.2f}")
return (reduce_amount, f"Bull market, profit {profit_ratio*100:.2f}%")
if market_trend_score > 70: # 强牛市
reduce_amount = -exit_position_ratio * 0.6 * trade.stake_amount # 减仓较少
logger.info(f"[{pair}] 牛市(得分 {market_trend_score},利润 {profit_ratio*100:.2f}%,减仓 {abs(reduce_amount):.2f}")
return (reduce_amount, f"Strong bull market, profit {profit_ratio*100:.2f}%")
else:
reduce_amount = -exit_position_ratio * trade.stake_amount # 其他市场减仓较多
logger.info(f"[{pair}] 利润 {profit_ratio*100:.2f}%,减仓 {abs(reduce_amount):.2f}")
@ -442,16 +419,16 @@ class FreqaiPrimer(IStrategy):
# --- 追踪止损逻辑 ---
trailing_stop_start = self.TRAILING_STOP_START.value
trailing_stop_distance = self.TRAILING_STOP_DISTANCE.value
if market_trend == 'bull':
trailing_stop_distance *= 1.5 # 牛市放宽追踪止损
if market_trend_score > 70: # 强牛市
trailing_stop_distance *= 1.5
trailing_stop_start *= 1.2
elif market_trend == 'bear':
trailing_stop_distance *= 0.7 # 熊市收紧追踪止损
elif market_trend_score < 30: # 强熊市
trailing_stop_distance *= 0.7
trailing_stop_start *= 0.8
if profit_ratio >= trailing_stop_start and not self.trailing_stop_enabled:
self.trailing_stop_enabled = True
trade.adjust_min_max_rates(current_rate, current_rate) # 初始化最高和最低价格
trade.adjust_min_max_rates(current_rate, current_rate)
logger.info(f"[{pair}] 价格上涨超过 {trailing_stop_start*100:.1f}%,启动 Trailing Stop")
return None
@ -461,7 +438,7 @@ class FreqaiPrimer(IStrategy):
if current_rate < trailing_stop_price:
logger.info(f"[{pair}] 价格回落至 Trailing Stop 点 {trailing_stop_price:.6f},触发全部卖出")
return (-trade.stake_amount, f"Trailing stop at {trailing_stop_price:.6f}")
trade.adjust_min_max_rates(current_rate, trade.min_rate) # 更新最高价格
trade.adjust_min_max_rates(current_rate, trade.min_rate)
return None
# --- 最大持仓时间限制 ---
@ -470,10 +447,20 @@ class FreqaiPrimer(IStrategy):
return (-trade.stake_amount, "Max hold time exceeded")
return None
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float,
time_in_force: str, current_time: datetime, **kwargs) -> bool:
market_trend = self.get_market_trend()
cooldown_period_minutes = self.COOLDOWN_PERIOD_MINUTES.value if market_trend == 'bull' else self.COOLDOWN_PERIOD_MINUTES.value // 2
# 调试日志:记录输入参数
logger.debug(f"[{pair}] confirm_trade_entry called with rate={rate}, type(rate)={type(rate)}, "
f"amount={amount}, order_type={order_type}, time_in_force={time_in_force}")
# 检查 rate 是否有效
if not isinstance(rate, (float, int)) or rate is None:
logger.error(f"[{pair}] Invalid rate value: {rate} (type: {type(rate)}). Skipping trade entry.")
return False
market_trend_score = self.get_market_trend()
cooldown_period_minutes = self.COOLDOWN_PERIOD_MINUTES.value if market_trend_score > 50 else self.COOLDOWN_PERIOD_MINUTES.value // 2
if pair in self.last_entry_time:
last_time = self.last_entry_time[pair]
@ -483,7 +470,12 @@ class FreqaiPrimer(IStrategy):
self.last_entry_time[pair] = current_time
self.trailing_stop_enabled = False
logger.info(f"[{pair}] 确认入场,价格:{rate:.6f}")
try:
logger.info(f"[{pair}] 确认入场,价格:{float(rate):.6f}")
except (ValueError, TypeError) as e:
logger.error(f"[{pair}] Failed to format rate: {rate} (type: {type(rate)}), error: {e}")
return False
return True
def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float,
@ -507,28 +499,118 @@ class FreqaiPrimer(IStrategy):
logger.debug(f"[{pair}] 自定义卖出价:{adjusted_rate:.6f}(原价:{proposed_rate:.6f}")
return adjusted_rate
def get_market_trend(self, dataframe: DataFrame = None) -> str:
def get_market_trend(self, dataframe: DataFrame = None) -> int:
"""
计算市场趋势得分0 表示强熊100 表示强牛50 表示中性
使用对数函数映射非线性增强两端趋势
综合价格趋势K 线形态StochRSI 和量价关系
参数
dataframe: 可选外部传入的 BTC/USDT 数据默认为 None自动获取
返回值
int: 0-100 的趋势得分
"""
try:
btc_df = self.dp.get_pair_dataframe("BTC/USDT", self.timeframe)
# 获取 BTC/USDT 数据
btc_df = dataframe if dataframe is not None else self.dp.get_pair_dataframe("BTC/USDT", self.timeframe)
if len(btc_df) < 200:
logger.warning("BTC 数据不足返回默认趋势sideways")
return "sideways"
logger.warning("BTC 数据不足,返回默认趋势得分50")
return 50
# --- 价格趋势 ---
btc_df["ema50"] = ta.EMA(btc_df, timeperiod=50)
btc_df["ema200"] = ta.EMA(btc_df, timeperiod=200)
btc_df["adx"] = ta.ADX(btc_df, timeperiod=14)
trend_strength = btc_df["close"].pct_change(50).iloc[-1]
btc_df["ema50_slope"] = (btc_df["ema50"] - btc_df["ema50"].shift(10)) / btc_df["ema50"].shift(10)
price_above_ema = btc_df["close"].iloc[-1] > btc_df["ema200"].iloc[-1]
adx_value = btc_df["adx"].iloc[-1]
ema50_above_ema200 = btc_df["ema50"].iloc[-1] > btc_df["ema200"].iloc[-1]
ema50_slope = btc_df["ema50_slope"].iloc[-1]
price_score = 0
if price_above_ema:
price_score += 20
if ema50_above_ema200:
price_score += 20
if ema50_slope > 0.005:
price_score += 15
elif ema50_slope < -0.005:
price_score -= 15
if price_above_ema and trend_strength > 0.03 and adx_value > 25:
return "bull"
elif not price_above_ema and trend_strength < -0.03 and adx_value > 25:
return "bear"
# --- K 线形态 ---
btc_df["bullish_engulfing"] = (
(btc_df["close"].shift(1) < btc_df["open"].shift(1)) & # 前一天阴线
(btc_df["close"] > btc_df["open"]) & # 当天阳线
(btc_df["close"] > btc_df["open"].shift(1)) & # 吞没前一天开盘价
(btc_df["open"] < btc_df["close"].shift(1)) # 吞没前一天收盘价
)
btc_df["bearish_engulfing"] = (
(btc_df["close"].shift(1) > btc_df["open"].shift(1)) & # 前一天阳线
(btc_df["close"] < btc_df["open"]) & # 当天阴线
(btc_df["close"] < btc_df["open"].shift(1)) & # 吞没前一天开盘价
(btc_df["open"] > btc_df["close"].shift(1)) # 吞没前一天收盘价
)
kline_score = 0
if btc_df["bullish_engulfing"].iloc[-1]:
kline_score += 15
elif btc_df["bearish_engulfing"].iloc[-1]:
kline_score -= 15
# 近期波动性
volatility = btc_df["close"].pct_change(10).std() * 100
if volatility > 0.5:
kline_score += 10 if price_score > 0 else -10
# --- StochRSI ---
stochrsi = ta.STOCHRSI(btc_df, timeperiod=14, fastk_period=3, fastd_period=3)
btc_df["stochrsi_k"] = stochrsi["fastk"]
btc_df["stochrsi_d"] = stochrsi["fastd"]
stochrsi_score = 0
stochrsi_k = btc_df["stochrsi_k"].iloc[-1]
stochrsi_d = btc_df["stochrsi_d"].iloc[-1]
if stochrsi_k > 80 and stochrsi_k < stochrsi_d:
stochrsi_score -= 15 # 超买且 K 下穿 D
elif stochrsi_k < 20 and stochrsi_k > stochrsi_d:
stochrsi_score += 15 # 超卖且 K 上穿 D
elif stochrsi_k > 50:
stochrsi_score += 5
elif stochrsi_k < 50:
stochrsi_score -= 5
# --- 量价关系 ---
btc_df["volume_mean_20"] = btc_df["volume"].rolling(20).mean()
btc_df["volume_std_20"] = btc_df["volume"].rolling(20).std()
btc_df["volume_z_score"] = (btc_df["volume"] - btc_df["volume_mean_20"]) / btc_df["volume_std_20"]
btc_df["adx"] = ta.ADX(btc_df, timeperiod=14)
volume_score = 0
if btc_df["volume_z_score"].iloc[-1] > 1.5:
volume_score += 10 if price_score > 0 else -10
if btc_df["adx"].iloc[-1] > 25:
volume_score += 10 if price_score > 0 else -10
# --- 综合得分 ---
raw_score = price_score + kline_score + stochrsi_score + volume_score
# 归一化到 [-50, 50]
raw_score = max(min(raw_score, 50), -50)
# 对数映射到 [0, 100]50 为中性
if raw_score >= 0:
# 正向50 到 100log(x + 1) 确保非线性
mapped_score = 50 + 50 * (np.log1p(raw_score / 50) / np.log1p(1))
else:
return "sideways"
# 负向0 到 50log(1 - x)
mapped_score = 50 * (np.log1p(-raw_score / 50) / np.log1p(1))
final_score = int(round(mapped_score))
final_score = max(0, min(100, final_score))
logger.debug(f"BTC 趋势得分:{final_score}, 原始得分:{raw_score}, "
f"价格得分:{price_score}, K线得分{kline_score}, "
f"StochRSI得分{stochrsi_score}, 量价得分:{volume_score}")
return final_score
except Exception as e:
logger.error(f"获取市场趋势失败:{e}", exc_info=True)
return "sideways"
return 50