myTestFreqAI/tools/advanced_position_analysis.py
2025-10-17 21:36:27 +08:00

201 lines
7.6 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
增强版加仓分析脚本
根据用户定义的规则分析加仓操作,并提供详细的时间间隔分析
"""
import json
from pathlib import Path
from datetime import datetime
def timestamp_to_datetime(timestamp):
"""将毫秒级时间戳转换为可读的日期时间"""
if isinstance(timestamp, (int, float)) and timestamp > 1000000000000:
# 毫秒级时间戳
return datetime.fromtimestamp(timestamp/1000).strftime('%Y-%m-%d %H:%M:%S')
elif isinstance(timestamp, str):
return timestamp
return str(timestamp)
def analyze_user_defined_positions(trades):
"""
根据用户定义的规则分析加仓操作
规则:
1. 无明确出场记录则视为彻底出场,后续入场为初次进场
2. 初次进场后的所有进场视为加仓
3. 出现一次出场后,后续入场为初次入场
4. 无任何入场出场记录时,第一次入场为初次入场
"""
print("=== 用户定义加仓规则分析 ===")
print("规则说明:")
print(" 1. 无明确出场记录则视为彻底出场,后续入场为初次进场")
print(" 2. 初次进场后的所有进场视为加仓")
print(" 3. 出现一次出场后,后续入场为初次入场")
print(" 4. 无任何入场出场记录时,第一次入场为初次入场")
print(f"\n总交易数: {len(trades)}")
# 统计信息
multi_entry_trades = 0 # 有多笔入场订单的交易数
total_add_positions = 0 # 总加仓次数
detailed_analysis = [] # 详细分析结果
time_intervals = [] # 时间间隔列表
for i, trade in enumerate(trades):
entries = trade.get('entries', [])
pair = trade.get('pair', f'Trade_{i+1}')
# 如果只有一笔入场订单,则没有加仓
if len(entries) <= 1:
continue
# 有多笔入场订单的交易
multi_entry_trades += 1
add_positions_in_trade = len(entries) - 1 # 第一笔是初次入场,其余都是加仓
total_add_positions += add_positions_in_trade
# 分析时间间隔
entry_timestamps = []
valid_entries = []
for entry in entries:
timestamp = entry.get('timestamp')
if timestamp is not None and timestamp != 'N/A':
entry_timestamps.append(timestamp)
valid_entries.append(entry)
# 计算时间间隔
intervals = []
for j in range(1, len(entry_timestamps)):
if isinstance(entry_timestamps[j], (int, float)) and isinstance(entry_timestamps[j-1], (int, float)):
interval = abs(entry_timestamps[j] - entry_timestamps[j-1]) / 1000 # 转换为秒
intervals.append(interval)
time_intervals.append(interval)
# 记录详细信息
trade_info = {
'pair': pair,
'total_entries': len(entries),
'valid_entries': len(valid_entries),
'add_positions': add_positions_in_trade,
'entries': entries,
'intervals': intervals,
'entry_timestamps': entry_timestamps
}
detailed_analysis.append(trade_info)
print(f"\n有多笔入场订单的交易数: {multi_entry_trades}")
print(f"总的加仓次数: {total_add_positions}")
# 详细分析有多笔入场订单的交易
if detailed_analysis:
print("\n=== 详细加仓分析 ===")
for trade_info in detailed_analysis:
print(f"\n交易对: {trade_info['pair']}")
print(f" 总入场订单数: {trade_info['total_entries']}")
print(f" 有效时间戳订单数: {trade_info['valid_entries']}")
print(f" 加仓次数: {trade_info['add_positions']}")
# 分析每笔订单
entries = trade_info['entries']
entry_timestamps = trade_info['entry_timestamps']
for j, entry in enumerate(entries):
order_type = entry.get('order_type', 'unknown')
timestamp = entry.get('timestamp', 'N/A')
dt_str = timestamp_to_datetime(timestamp)
if j == 0:
position_type = "初次入场"
else:
position_type = "加仓"
print(f" 订单 {j+1} ({order_type}): {dt_str} ({timestamp}) - {position_type}")
# 显示时间间隔
intervals = trade_info['intervals']
if intervals:
print(f" 时间间隔分析:")
for k, interval in enumerate(intervals):
print(f" 订单 {k+2} 与订单 {k+1} 的间隔: {interval:.0f} 秒 ({interval/60:.1f} 分钟)")
# 时间间隔统计
if time_intervals:
print(f"\n=== 时间间隔统计 ===")
print(f"平均时间间隔: {sum(time_intervals)/len(time_intervals):.0f} 秒 ({sum(time_intervals)/len(time_intervals)/60:.1f} 分钟)")
print(f"最短时间间隔: {min(time_intervals):.0f} 秒 ({min(time_intervals)/60:.1f} 分钟)")
print(f"最长时间间隔: {max(time_intervals):.0f} 秒 ({max(time_intervals)/60:.1f} 分钟)")
# 找到最短和最长间隔的交易
if detailed_analysis:
# 找到包含最短时间间隔的交易
min_interval_trade = None
min_interval_index = -1
for trade_info in detailed_analysis:
intervals = trade_info['intervals']
if intervals and min(intervals) == min(time_intervals):
min_interval_trade = trade_info
min_interval_index = intervals.index(min(intervals))
break
if min_interval_trade:
print(f"\n⏱️ 最短时间间隔详情:")
print(f" 交易对: {min_interval_trade['pair']}")
print(f" 时间间隔: {min(time_intervals):.0f} 秒 ({min(time_intervals)/60:.1f} 分钟)")
if not detailed_analysis:
print("\n🔍 分析结果:")
print(" 当前数据中没有发现加仓操作")
print(" 所有交易都只有初始入场订单")
print(" 所有交易的开仓和平仓时间相同")
def analyze_backtest_trades():
"""分析回测交易数据"""
# 定义结果目录
result_dir = Path('../result')
# 按优先级查找数据文件
data_files = [
('测试数据', result_dir / 'test_trades_with_entries.json'),
('回测数据', result_dir / 'backtest_trades.json')
]
data_source = ""
data_file = None
for source, file_path in data_files:
if file_path.exists():
data_source = source
data_file = file_path
break
if not data_file:
print(f"❌ 未找到数据文件")
# 尝试查找其他可能的文件
for file_path in result_dir.glob('*.json'):
if 'trade' in file_path.name.lower():
data_file = file_path
data_source = "未知数据"
break
if not data_file or not data_file.exists():
print(f"❌ 文件不存在")
return
# 读取JSON文件
try:
with open(data_file, 'r', encoding='utf-8') as f:
data = json.load(f)
except Exception as e:
print(f"❌ 读取文件失败: {e}")
return
print(f"=== 数据源: {data_source} ({data_file.name}) ===")
analyze_user_defined_positions(data)
def main():
"""主函数"""
analyze_backtest_trades()
if __name__ == "__main__":
main()