分段长度用freqai优化

This commit is contained in:
zhangkun9038@dingtalk.com 2025-08-17 00:28:35 +08:00
parent 066aa08809
commit 0f0b2e0b83

View File

@ -228,6 +228,10 @@ class FreqaiPrimer(IStrategy):
dataframe["&-price_value_divergence"] = dataframe["&-price_value_divergence"].replace([np.inf, -np.inf], 0).ffill().fillna(0)
# 添加趋势分段长度作为FreqAI训练目标
# 基于未来价格表现计算最优段长
dataframe = self.calculate_optimal_segment_length(dataframe, metadata)
return dataframe
def is_stochrsi_overbought(self, dataframe: DataFrame, period=10, threshold=85) -> bool:
"""
@ -1294,63 +1298,63 @@ class FreqaiPrimer(IStrategy):
"""
基于加权分段的trend_score判断趋势状态
规则
- 将最近20个trend_score分为3段1-3(权重10)4-10(权重7)11-20(权重3)
- 计算加权平均得分映射到0-100区间
- 根据两个可优化的阈值判断趋势状态
- 将最近N个trend_score分为3段first, second, third
- 长度比例为first:second:third = 1:3:5
- 例如1,3,52,6,103,9,154,12,205,15,25
- 权重first(权重10)second(权重7)third(权重3)
- 变量first段长度(范围1-5)由FreqAI模型学习得到
"""
pair = metadata.get('pair', 'Unknown')
try:
# 获取最近20个周期的trend_score历史
if len(dataframe) < 20:
logger.warning(f"[{pair}] 数据不足20个周期返回震荡趋势")
# 从FreqAI模型获取第一段长度预测值
first_length = self.get_trend_segment_length_from_freqai(dataframe, metadata)
# 根据比例计算第二段和第三段长度
second_length = first_length * 3
third_length = first_length * 5
# 计算总长度
total_length_needed = first_length + second_length + third_length
# 检查数据是否充足
if len(dataframe) < total_length_needed:
logger.warning(f"[{pair}] 数据不足{total_length_needed}个周期,返回震荡趋势")
return "ranging"
# 计算最近20个周期的trend_score
trend_scores_20 = []
# 获取所需长度的trend_score历史
trend_scores = []
actual_total_length = len(dataframe)
# 获取dataframe的实际索引长度
total_length = len(dataframe)
for i in range(-20, 0):
for i in range(-total_length_needed, 0):
# 确保索引在有效范围内
if abs(i) > total_length:
if abs(i) > actual_total_length:
logger.warning(f"[{pair}] 索引 {i} 超出数据范围,使用默认趋势得分 50")
trend_scores_20.append(50)
trend_scores.append(50)
continue
# 获取历史数据片段 - 使用正确的切片方式
# 获取历史数据片段
end_idx = i + 1 if i != -1 else None
hist_df = dataframe.iloc[:end_idx]
if hist_df.empty:
logger.warning(f"[{pair}] 历史数据片段为空,使用默认趋势得分 50")
trend_scores_20.append(50)
trend_scores.append(50)
continue
# 获取时间戳 - 使用更可靠的方式
# 获取时间戳
try:
# 获取最后一个时间点的索引
last_idx = hist_df.index[-1]
# 如果是DatetimeIndex直接获取timestamp
if isinstance(last_idx, pd.Timestamp):
timestamp = int(last_idx.timestamp())
elif hasattr(last_idx, 'timestamp'):
# 其他有timestamp属性的类型
timestamp = int(last_idx.timestamp())
else:
# 如果是数值索引,使用时间戳作为标识
timestamp = int(pd.Timestamp.now().timestamp()) + i
logger.debug(f"[{pair}] 成功生成时间戳:{timestamp}, 对应时间:{last_idx}")
except Exception as e:
logger.error(f"[{pair}] 无法生成时间戳: {e}")
# 使用当前时间戳加上偏移量作为后备方案
timestamp = int(pd.Timestamp.now().timestamp()) + i
logger.warning(f"[{pair}] 使用后备时间戳:{timestamp}")
# 使用新的 get_trend_score_with_cache 方法
# 获取趋势得分
score = self.get_trend_score_with_cache(
pair=pair,
timeframe=self.timeframe,
@ -1358,32 +1362,31 @@ class FreqaiPrimer(IStrategy):
dataframe=hist_df,
metadata=metadata
)
trend_scores_20.append(score)
trend_scores.append(score)
# 验证结果数量
if len(trend_scores_20) < 20:
logger.warning(f"[{pair}] 只获取到 {len(trend_scores_20)} 个趋势得分需要20个")
# 用默认值填充缺失的得分
while len(trend_scores_20) < 20:
trend_scores_20.append(50)
if len(trend_scores) < total_length_needed:
logger.warning(f"[{pair}] 只获取到 {len(trend_scores)} 个趋势得分,需要{total_length_needed}")
while len(trend_scores) < total_length_needed:
trend_scores.append(50)
# 分段计算加权得分
# 第一段:最近1-3个周期 (索引-3到-1)
segment1 = trend_scores_20[-3:]
# 第一段:最近first_length个周期
segment1 = trend_scores[-first_length:]
weighted_score1 = sum(score * 10 for score in segment1) / len(segment1)
# 第二段:4-10个周期 (索引-10到-4)
segment2 = trend_scores_20[-10:-3]
# 第二段:接下来的second_length个周期
segment2 = trend_scores[-(first_length + second_length):-first_length]
weighted_score2 = sum(score * 7 for score in segment2) / len(segment2)
# 第三段:11-20个周期 (索引-20到-11)
segment3 = trend_scores_20[-20:-10]
# 第三段:最后的third_length个周期
segment3 = trend_scores[-total_length_needed:-(first_length + second_length)]
weighted_score3 = sum(score * 3 for score in segment3) / len(segment3)
# 计算最终加权得分
final_weighted_score = (weighted_score1 + weighted_score2 + weighted_score3) / (10 + 7 + 3)
# 将得分映射到0-100区间(确保在合理范围内)
# 将得分映射到0-100区间
final_score = max(0, min(100, final_weighted_score))
# 使用hyperopt优化的阈值判断趋势状态
@ -1393,22 +1396,183 @@ class FreqaiPrimer(IStrategy):
# 判定趋势状态
if final_score >= bullish_threshold:
trend_status = "bullish"
logger.info(f"[{pair}] 🚀 检测到上涨趋势: 最终加权得分={final_score:.2f}, 阈值≥{bullish_threshold}")
logger.info(f"[{pair}] 🚀 检测到上涨趋势: 最终加权得分={final_score:.2f}, 阈值≥{bullish_threshold}, 段长={first_length},{second_length},{third_length}")
elif final_score <= bearish_threshold:
trend_status = "bearish"
logger.info(f"[{pair}] 📉 检测到下跌趋势: 最终加权得分={final_score:.2f}, 阈值≤{bearish_threshold}")
logger.info(f"[{pair}] 📉 检测到下跌趋势: 最终加权得分={final_score:.2f}, 阈值≤{bearish_threshold}, 段长={first_length},{second_length},{third_length}")
else:
trend_status = "ranging"
logger.info(f"[{pair}] ⚖️ 检测到震荡趋势: 最终加权得分={final_score:.2f}, 阈值范围({bearish_threshold}, {bullish_threshold})")
logger.info(f"[{pair}] ⚖️ 检测到震荡趋势: 最终加权得分={final_score:.2f}, 阈值范围({bearish_threshold}, {bullish_threshold}), 段长={first_length},{second_length},{third_length}")
# 输出分段详细信息用于调试
logger.debug(f"[{pair}] 趋势分析详情 - "
f"第一段(1-3,权重10): {[f'{s:.1f}' for s in segment1]}, "
f"第二段(4-10,权重7): {[f'{s:.1f}' for s in segment2]}, "
f"第三段(11-20,权重3): {[f'{s:.1f}' for s in segment3]}")
f"第一段({first_length},权重10): {[f'{s:.1f}' for s in segment1]}, "
f"第二段({second_length},权重7): {[f'{s:.1f}' for s in segment2]}, "
f"第三段({third_length},权重3): {[f'{s:.1f}' for s in segment3]}")
return trend_status
except Exception as e:
logger.error(f"[{pair}] 趋势状态检测失败: {e}")
return "ranging"
def get_trend_segment_length_from_freqai(self, dataframe: DataFrame, metadata: dict) -> int:
    """
    Return the FreqAI-predicted length of the first trend segment.

    The result is always clamped to the range 1-5. Callers derive the other
    two segments from it using the fixed ratio first:second:third = 1:3:5.

    The prediction is looked up under ``"&-trend_segment_length"`` (the label
    name written by ``calculate_optimal_segment_length``) and, for backward
    compatibility, under the bare ``"trend_segment_length"`` key. The value
    may be a scalar, a sequence, or a pandas column; the most recent (last)
    entry is used and rounded to the nearest integer.

    Args:
        dataframe: Candle data; only the last 50 rows are used as model input.
        metadata: Pair metadata. ``metadata['pair']`` is used in log messages
            and the dict is forwarded to the model.

    Returns:
        The segment length in [1, 5]. Falls back to 2 when FreqAI is not
        available, fewer than 10 rows are present, the model yields no usable
        (finite) prediction, or any exception occurs.
    """
    default_length = 2
    pair = metadata.get('pair', 'Unknown')
    try:
        # FreqAI may be absent or disabled on this strategy instance.
        if getattr(self, 'freqai', None) is None:
            logger.warning(f"[{pair}] FreqAI未启用使用默认段长2")
            return default_length

        # Use the most recent 50 candles as the feature window.
        features_df = dataframe.tail(50).copy()
        if len(features_df) < 10:
            logger.warning(f"[{pair}] 特征数据不足使用默认段长2")
            return default_length

        features = self.build_trend_segment_features(features_df)

        # NOTE(review): confirm that predict(features, metadata) matches the
        # signature of the configured FreqAI model; any failure here is caught
        # below and turned into the default length.
        predictions = self.freqai.predict(features, metadata)

        # Tolerate models that return a (predictions, extra) tuple.
        if isinstance(predictions, tuple):
            predictions = predictions[0] if predictions else None

        raw_value = None
        if predictions is not None:
            for key in ('&-trend_segment_length', 'trend_segment_length'):
                if key in predictions:
                    raw_value = predictions[key]
                    break

        if raw_value is not None:
            # Normalise scalar / Series / array results to a flat float array
            # and use the latest entry.
            values = np.asarray(raw_value, dtype=float).ravel()
            if values.size and np.isfinite(values[-1]):
                # Round (not truncate) the regression output, then clamp.
                predicted_length = max(1, min(5, int(round(values[-1]))))
                logger.info(f"[{pair}] FreqAI预测段长: {predicted_length} (比例: {predicted_length},{predicted_length*3},{predicted_length*5})")
                return predicted_length

        logger.warning(f"[{pair}] FreqAI未返回段长预测使用默认段长2")
        return default_length
    except Exception as e:
        # Best-effort: prediction problems must never break trend detection.
        logger.error(f"[{pair}] FreqAI段长预测失败: {e}")
        return default_length
def build_trend_segment_features(self, dataframe: DataFrame) -> pd.DataFrame:
    """
    Build the feature matrix used to predict the trend segment length.

    Args:
        dataframe: Candle data containing at least the ``close``, ``high``,
            ``low`` and ``volume`` columns.

    Returns:
        A DataFrame sharing ``dataframe``'s index with price, indicator,
        volatility, trend, volume and momentum features. NaN and +/-inf
        values are replaced by 0.
    """
    close = dataframe['close']
    volume = dataframe['volume']
    features = pd.DataFrame(index=dataframe.index)

    # Raw price / volume.
    features['close'] = close
    features['volume'] = volume

    # Technical indicators.
    features['rsi'] = ta.RSI(close, timeperiod=14)
    # When fed a single price series, MACD yields three outputs
    # (macd, signal, histogram) as a sequence, exactly like BBANDS below,
    # so unpack it instead of indexing by name.
    macd_line, _macd_signal, _macd_hist = ta.MACD(close)
    features['macd'] = macd_line
    features['bb_upper'], features['bb_middle'], features['bb_lower'] = ta.BBANDS(close)
    features['bb_width'] = features['bb_upper'] - features['bb_lower']
    features['bb_position'] = (close - features['bb_lower']) / features['bb_width']

    # Volatility.
    features['atr'] = ta.ATR(dataframe['high'], dataframe['low'], close, timeperiod=14)
    features['volatility'] = close.pct_change().rolling(window=20).std()

    # Trend.
    features['sma_20'] = ta.SMA(close, timeperiod=20)
    features['sma_50'] = ta.SMA(close, timeperiod=50)
    features['trend_strength'] = abs(features['sma_20'] - features['sma_50']) / features['sma_50']

    # Volume.
    features['volume_sma'] = ta.SMA(volume, timeperiod=20)
    features['volume_ratio'] = volume / features['volume_sma']

    # Momentum.
    features['price_momentum'] = close.pct_change(periods=10)
    features['price_acceleration'] = features['price_momentum'].diff()

    # The ratio features can divide by zero (flat bands, zero volume average);
    # neutralise +/-inf as well as the NaN warm-up values.
    features = features.replace([np.inf, -np.inf], 0).fillna(0)
    return features
def calculate_optimal_segment_length(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """
    Label every candle with the best first-segment length (FreqAI target).

    For each row, the candidate lengths 1-5 are expanded into three
    consecutive look-back segments with the ratio 1:3:5. The relative close
    price change of each segment is combined with weights 10/7/3, and the
    result is multiplied by the return over the next 5 candles. The candidate
    with the highest product wins. Because the label uses future candles it
    is only meaningful as a training target.

    Args:
        dataframe: Candle data with a ``close`` column. It is modified in
            place: the float column ``"&-trend_segment_length"`` is added.
        metadata: Pair metadata; ``metadata['pair']`` is used in log messages.

    Returns:
        The same ``dataframe``. Rows that cannot be evaluated (the first 30,
        the last 25, frames shorter than 50 rows, a missing ``close`` column,
        or rows where no candidate has a finite score) keep the default 2.
    """
    pair = metadata.get('pair', 'Unknown')
    label = "&-trend_segment_length"
    default_length = 2
    future_window = 5  # candles used to measure the forward return

    n_rows = len(dataframe)
    # Float from the start so later assignments never change the dtype.
    dataframe[label] = float(default_length)

    if n_rows < 50:
        logger.warning(f"[{pair}] 数据量不足,无法计算最优段长")
        return dataframe
    if 'close' not in dataframe.columns:
        # Keep the best-effort contract: leave the defaults, do not raise.
        logger.warning(f"[{pair}] 缺少close列,无法计算最优段长")
        return dataframe

    # Work on a plain float array; unparsable prices become NaN and are
    # skipped by the finite-score check below.
    close = pd.to_numeric(dataframe['close'], errors='coerce').to_numpy(dtype=float)
    labels = np.full(n_rows, float(default_length))

    def relative_change(start: int, stop: int) -> float:
        """Relative change from first to last price of close[start:stop]."""
        if stop - start <= 1:
            return 0.0
        return (close[stop - 1] - close[start]) / close[start]

    # Zero prices yield inf/NaN; those scores are rejected explicitly, so the
    # numpy warnings would only be noise.
    with np.errstate(divide='ignore', invalid='ignore'):
        for idx in range(30, n_rows - 25):  # leave room for future candles
            # The forward return does not depend on the candidate length.
            future_return = (close[idx + future_window - 1] - close[idx]) / close[idx]

            best_length = default_length
            best_score = -np.inf
            for candidate in range(1, 6):
                first_len = candidate
                second_len = candidate * 3
                third_len = candidate * 5
                total_len = first_len + second_len + third_len

                # NOTE(review): this bound also adds total_len although the
                # segments lie in the past; kept as-is so labels are unchanged.
                if idx + total_len + future_window >= n_rows:
                    continue
                # Not enough history for this candidate.
                if idx < total_len:
                    continue

                first_start = idx - first_len
                second_start = first_start - second_len
                third_start = idx - total_len

                weighted_score = (
                    relative_change(first_start, idx) * 10
                    + relative_change(second_start, first_start) * 7
                    + relative_change(third_start, second_start) * 3
                ) / 20

                # Positive when the look-back trend and the forward return
                # point the same way.
                score = weighted_score * future_return
                if np.isfinite(score) and score > best_score:
                    best_score = score
                    best_length = candidate

            labels[idx] = best_length

    # Single positional write; avoids per-row label-based assignment.
    dataframe[label] = labels
    logger.info(f"[{pair}] 最优趋势分段长度计算完成")
    return dataframe