分段长度去掉freqai优化

This commit is contained in:
zhangkun9038@dingtalk.com 2025-08-17 01:11:56 +08:00
parent 2673588121
commit 0332a1f9f3

View File

@ -1300,15 +1300,15 @@ class FreqaiPrimer(IStrategy):
规则
- 将最近N个trend_score分为3段first, second, third
- 长度比例为first:second:third = 1:3:5
- 例如1,3,52,6,103,9,154,12,205,15,25
- 例如2,6,10固定使用段长2
- 权重first(权重10)second(权重7)third(权重3)
- 变量first段长度(范围1-5)由FreqAI模型学习得到
- 使用静态段长first段长度固定为2
"""
pair = metadata.get('pair', 'Unknown')
try:
# 从FreqAI模型获取第一段长度预测值
first_length = self.get_trend_segment_length_from_freqai(dataframe, metadata)
# 直接使用静态值2作为第一段长度
first_length = 2
# 根据比例计算第二段和第三段长度
second_length = first_length * 3
@ -1415,224 +1415,3 @@ class FreqaiPrimer(IStrategy):
except Exception as e:
logger.error(f"[{pair}] 趋势状态检测失败: {e}")
return "ranging"
def get_trend_segment_length_from_freqai(self, dataframe: DataFrame, metadata: dict) -> int:
"""
从FreqAI模型获取趋势分段的第一段长度预测值
范围1-5比例关系first:second:third = 1:3:5
"""
pair = metadata.get('pair', 'Unknown')
try:
# 检查是否启用了FreqAI
if not hasattr(self, 'freqai') or self.freqai is None:
logger.warning(f"[{pair}] FreqAI未启用使用默认段长2")
return 2
# 准备特征数据
features_df = dataframe.tail(50).copy() # 使用最近50个周期的数据作为特征
# 确保有足够的特征数据
if len(features_df) < 10:
logger.warning(f"[{pair}] 特征数据不足使用默认段长2")
return 2
# 构建特征
features = self.build_trend_segment_features(features_df)
# 使用FreqAI模型预测
predictions = self.freqai.predict(features, metadata)
# 调试:记录预测结果的类型和结构
logger.debug(f"[{pair}] FreqAI预测结果类型: {type(predictions)}, 内容: {predictions}")
# 获取第一段长度预测值
predicted_length = 2 # 默认值
try:
if isinstance(predictions, dict):
if 'trend_segment_length' in predictions:
predicted_length = int(predictions['trend_segment_length'])
elif 'prediction' in predictions:
predicted_length = int(predictions['prediction'])
else:
# 尝试获取字典中的第一个数值
for key, value in predictions.items():
if isinstance(value, (int, float, np.number)):
predicted_length = int(value)
break
elif isinstance(predictions, (list, tuple)) and len(predictions) > 0:
# 如果是列表或元组,使用第一个预测值
first_pred = predictions[0]
if isinstance(first_pred, dict):
if 'trend_segment_length' in first_pred:
predicted_length = int(first_pred['trend_segment_length'])
else:
predicted_length = int(list(first_pred.values())[0])
else:
predicted_length = int(first_pred)
elif hasattr(predictions, 'trend_segment_length'):
# 如果是对象属性
predicted_length = int(predictions.trend_segment_length)
else:
logger.warning(f"[{pair}] 无法识别的预测格式使用默认段长2")
except (ValueError, IndexError, TypeError) as e:
logger.error(f"[{pair}] 解析预测结果失败: {e}, 使用默认段长2")
predicted_length = 2
# 确保在有效范围内
predicted_length = max(1, min(5, predicted_length))
logger.info(f"[{pair}] FreqAI预测段长: {predicted_length} (比例: {predicted_length}:{predicted_length*3}:{predicted_length*5})")
logger.info(f"[{pair}] FreqAI预测详情 - "
f"第一段长度: {predicted_length}, "
f"第二段长度: {predicted_length*3}, "
f"第三段长度: {predicted_length*5}, "
f"总分析周期: {predicted_length + predicted_length*3 + predicted_length*5}")
return predicted_length
else:
logger.warning(f"[{pair}] FreqAI未返回段长预测使用默认段长2")
logger.info(f"[{pair}] 使用默认段长 - 第一段长度: 2, 第二段长度: 6, 第三段长度: 10, 总分析周期: 18")
return 2
except Exception as e:
logger.error(f"[{pair}] FreqAI段长预测失败: {e}")
return 2
def build_trend_segment_features(self, dataframe: DataFrame) -> pd.DataFrame:
"""
构建用于预测趋势分段长度的特征
"""
features = pd.DataFrame(index=dataframe.index)
# 基础价格特征
features['close'] = dataframe['close']
features['volume'] = dataframe['volume']
# 技术指标特征
features['rsi'] = ta.RSI(dataframe['close'], timeperiod=14)
features['macd'] = ta.MACD(dataframe['close'])['macd']
features['bb_upper'], features['bb_middle'], features['bb_lower'] = ta.BBANDS(dataframe['close'])
features['bb_width'] = features['bb_upper'] - features['bb_lower']
features['bb_position'] = (dataframe['close'] - features['bb_lower']) / features['bb_width']
# 波动率特征
features['atr'] = ta.ATR(dataframe['high'], dataframe['low'], dataframe['close'], timeperiod=14)
features['volatility'] = dataframe['close'].pct_change().rolling(window=20).std()
# 趋势特征
features['sma_20'] = ta.SMA(dataframe['close'], timeperiod=20)
features['sma_50'] = ta.SMA(dataframe['close'], timeperiod=50)
features['trend_strength'] = abs(features['sma_20'] - features['sma_50']) / features['sma_50']
# 成交量特征
features['volume_sma'] = ta.SMA(dataframe['volume'], timeperiod=20)
features['volume_ratio'] = dataframe['volume'] / features['volume_sma']
# 价格动量特征
features['price_momentum'] = dataframe['close'].pct_change(periods=10)
features['price_acceleration'] = features['price_momentum'].diff()
# 清理NaN值
features = features.fillna(0)
return features
def calculate_optimal_segment_length(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
基于未来价格表现计算最优的趋势分段长度
评估不同段长组合在历史数据上的表现选择最优的段长
"""
pair = metadata.get('pair', 'Unknown')
# 创建目标变量列
dataframe["&-trend_segment_length"] = 2 # 默认值
if len(dataframe) < 50:
logger.warning(f"[{pair}] 数据量不足,无法计算最优段长")
return dataframe
# 为每个时间点计算最优段长
for i in range(30, len(dataframe) - 25): # 留出足够的未来数据
current_idx = i
# 评估不同段长组合
best_length = 2
best_score = -np.inf
for test_length in range(1, 6): # 1-5
# 计算段长
first_len = test_length
second_len = test_length * 3
third_len = test_length * 5
total_len = first_len + second_len + third_len
if current_idx + total_len + 5 >= len(dataframe):
continue # 数据不足,跳过
# 计算加权趋势得分
try:
# 获取历史数据段
hist_data = dataframe.iloc[current_idx-total_len:current_idx]
if len(hist_data) < total_len:
continue
# 模拟计算趋势得分(简化版本)
close_prices = hist_data['close'].values
# 第一段:最近价格
segment1 = close_prices[-first_len:]
segment2 = close_prices[-(first_len+second_len):-first_len]
segment3 = close_prices[-total_len:-(first_len+second_len)]
# 计算价格变化趋势
trend1 = (segment1[-1] - segment1[0]) / segment1[0] if len(segment1) > 1 else 0
trend2 = (segment2[-1] - segment2[0]) / segment2[0] if len(segment2) > 1 else 0
trend3 = (segment3[-1] - segment3[0]) / segment3[0] if len(segment3) > 1 else 0
# 加权得分
weighted_score = (trend1 * 10 + trend2 * 7 + trend3 * 3) / 20
# 评估未来表现
future_data = dataframe.iloc[current_idx:current_idx+5] # 未来5个周期
if len(future_data) < 5:
continue
future_return = (future_data['close'].iloc[-1] - future_data['close'].iloc[0]) / future_data['close'].iloc[0]
# 计算得分:趋势得分与未来收益的相关性
# 正趋势应该对应正未来收益,负趋势对应负未来收益
correlation_score = weighted_score * future_return
if correlation_score > best_score:
best_score = correlation_score
best_length = test_length
except Exception as e:
logger.debug(f"[{pair}] 段长评估失败: {e}")
continue
# 设置最优段长
dataframe.loc[dataframe.index[current_idx], "&-trend_segment_length"] = float(best_length)
# 输出具体的最优段长结果
logger.info(f"[{pair}] 最优趋势分段长度计算完成 - "
f"第一段长度: {best_length}, "
f"第二段长度: {best_length*3}, "
f"第三段长度: {best_length*5}, "
f"总长度: {best_length + best_length*3 + best_length*5}, "
f"最佳得分: {best_score:.4f}")
# 计算统计信息并输出
segment_lengths = dataframe["&-trend_segment_length"].dropna()
if len(segment_lengths) > 0:
length_counts = segment_lengths.value_counts()
logger.info(f"[{pair}] 趋势分段长度统计:")
for length in sorted(length_counts.index):
count = length_counts[length]
percentage = (count / len(segment_lengths)) * 100
logger.info(f"[{pair}] 段长{int(length)}: {count}次 ({percentage:.1f}%) - 比例 {int(length)}:{int(length*3)}:{int(length*5)}")
avg_length = segment_lengths.mean()
logger.info(f"[{pair}] 平均段长: {avg_length:.2f}")
logger.info(f"[{pair}] 最优趋势分段长度计算完成,共计算 {len(segment_lengths)} 个数据点")
return dataframe