#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 增强版加仓分析脚本 根据用户定义的规则分析加仓操作,并提供详细的时间间隔分析 """ import json from pathlib import Path from datetime import datetime def timestamp_to_datetime(timestamp): """将毫秒级时间戳转换为可读的日期时间""" if isinstance(timestamp, (int, float)) and timestamp > 1000000000000: # 毫秒级时间戳 return datetime.fromtimestamp(timestamp/1000).strftime('%Y-%m-%d %H:%M:%S') elif isinstance(timestamp, str): return timestamp return str(timestamp) def analyze_user_defined_positions(trades): """ 根据用户定义的规则分析加仓操作 规则: 1. 无明确出场记录则视为彻底出场,后续入场为初次进场 2. 初次进场后的所有进场视为加仓 3. 出现一次出场后,后续入场为初次入场 4. 无任何入场出场记录时,第一次入场为初次入场 """ print("=== 用户定义加仓规则分析 ===") print("规则说明:") print(" 1. 无明确出场记录则视为彻底出场,后续入场为初次进场") print(" 2. 初次进场后的所有进场视为加仓") print(" 3. 出现一次出场后,后续入场为初次入场") print(" 4. 无任何入场出场记录时,第一次入场为初次入场") print(f"\n总交易数: {len(trades)}") # 统计信息 multi_entry_trades = 0 # 有多笔入场订单的交易数 total_add_positions = 0 # 总加仓次数 detailed_analysis = [] # 详细分析结果 time_intervals = [] # 时间间隔列表 for i, trade in enumerate(trades): entries = trade.get('entries', []) pair = trade.get('pair', f'Trade_{i+1}') # 如果只有一笔入场订单,则没有加仓 if len(entries) <= 1: continue # 有多笔入场订单的交易 multi_entry_trades += 1 add_positions_in_trade = len(entries) - 1 # 第一笔是初次入场,其余都是加仓 total_add_positions += add_positions_in_trade # 分析时间间隔 entry_timestamps = [] valid_entries = [] for entry in entries: timestamp = entry.get('timestamp') if timestamp is not None and timestamp != 'N/A': entry_timestamps.append(timestamp) valid_entries.append(entry) # 计算时间间隔 intervals = [] for j in range(1, len(entry_timestamps)): if isinstance(entry_timestamps[j], (int, float)) and isinstance(entry_timestamps[j-1], (int, float)): interval = abs(entry_timestamps[j] - entry_timestamps[j-1]) / 1000 # 转换为秒 intervals.append(interval) time_intervals.append(interval) # 记录详细信息 trade_info = { 'pair': pair, 'total_entries': len(entries), 'valid_entries': len(valid_entries), 'add_positions': add_positions_in_trade, 'entries': entries, 'intervals': intervals, 'entry_timestamps': entry_timestamps } detailed_analysis.append(trade_info) print(f"\n有多笔入场订单的交易数: {multi_entry_trades}") print(f"总的加仓次数: {total_add_positions}") # 详细分析有多笔入场订单的交易 if detailed_analysis: print("\n=== 详细加仓分析 ===") for trade_info in detailed_analysis: print(f"\n交易对: {trade_info['pair']}") print(f" 总入场订单数: {trade_info['total_entries']}") print(f" 有效时间戳订单数: {trade_info['valid_entries']}") print(f" 加仓次数: {trade_info['add_positions']}") # 分析每笔订单 entries = trade_info['entries'] entry_timestamps = trade_info['entry_timestamps'] for j, entry in enumerate(entries): order_type = entry.get('order_type', 'unknown') timestamp = entry.get('timestamp', 'N/A') dt_str = timestamp_to_datetime(timestamp) if j == 0: position_type = "初次入场" else: position_type = "加仓" print(f" 订单 {j+1} ({order_type}): {dt_str} ({timestamp}) - {position_type}") # 显示时间间隔 intervals = trade_info['intervals'] if intervals: print(f" 时间间隔分析:") for k, interval in enumerate(intervals): print(f" 订单 {k+2} 与订单 {k+1} 的间隔: {interval:.0f} 秒 ({interval/60:.1f} 分钟)") # 时间间隔统计 if time_intervals: print(f"\n=== 时间间隔统计 ===") print(f"平均时间间隔: {sum(time_intervals)/len(time_intervals):.0f} 秒 ({sum(time_intervals)/len(time_intervals)/60:.1f} 分钟)") print(f"最短时间间隔: {min(time_intervals):.0f} 秒 ({min(time_intervals)/60:.1f} 分钟)") print(f"最长时间间隔: {max(time_intervals):.0f} 秒 ({max(time_intervals)/60:.1f} 分钟)") # 找到最短和最长间隔的交易 if detailed_analysis: # 找到包含最短时间间隔的交易 min_interval_trade = None min_interval_index = -1 for trade_info in detailed_analysis: intervals = trade_info['intervals'] if intervals and min(intervals) == min(time_intervals): min_interval_trade = trade_info min_interval_index = intervals.index(min(intervals)) break if min_interval_trade: print(f"\n⏱️ 最短时间间隔详情:") print(f" 交易对: {min_interval_trade['pair']}") print(f" 时间间隔: {min(time_intervals):.0f} 秒 ({min(time_intervals)/60:.1f} 分钟)") if not detailed_analysis: print("\n🔍 分析结果:") print(" 当前数据中没有发现加仓操作") print(" 所有交易都只有初始入场订单") print(" 所有交易的开仓和平仓时间相同") def analyze_backtest_trades(): """分析回测交易数据""" # 定义结果目录 result_dir = Path('../result') # 按优先级查找数据文件 data_files = [ ('测试数据', result_dir / 'test_trades_with_entries.json'), ('回测数据', result_dir / 'backtest_trades.json') ] data_source = "" data_file = None for source, file_path in data_files: if file_path.exists(): data_source = source data_file = file_path break if not data_file: print(f"❌ 未找到数据文件") # 尝试查找其他可能的文件 for file_path in result_dir.glob('*.json'): if 'trade' in file_path.name.lower(): data_file = file_path data_source = "未知数据" break if not data_file or not data_file.exists(): print(f"❌ 文件不存在") return # 读取JSON文件 try: with open(data_file, 'r', encoding='utf-8') as f: data = json.load(f) except Exception as e: print(f"❌ 读取文件失败: {e}") return print(f"=== 数据源: {data_source} ({data_file.name}) ===") analyze_user_defined_positions(data) def main(): """主函数""" analyze_backtest_trades() if __name__ == "__main__": main()