201 lines
7.6 KiB
Python
201 lines
7.6 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
增强版加仓分析脚本
|
|
根据用户定义的规则分析加仓操作,并提供详细的时间间隔分析
|
|
"""
|
|
|
|
import json
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
def timestamp_to_datetime(timestamp):
|
|
"""将毫秒级时间戳转换为可读的日期时间"""
|
|
if isinstance(timestamp, (int, float)) and timestamp > 1000000000000:
|
|
# 毫秒级时间戳
|
|
return datetime.fromtimestamp(timestamp/1000).strftime('%Y-%m-%d %H:%M:%S')
|
|
elif isinstance(timestamp, str):
|
|
return timestamp
|
|
return str(timestamp)
|
|
|
|
def analyze_user_defined_positions(trades):
|
|
"""
|
|
根据用户定义的规则分析加仓操作
|
|
规则:
|
|
1. 无明确出场记录则视为彻底出场,后续入场为初次进场
|
|
2. 初次进场后的所有进场视为加仓
|
|
3. 出现一次出场后,后续入场为初次入场
|
|
4. 无任何入场出场记录时,第一次入场为初次入场
|
|
"""
|
|
print("=== 用户定义加仓规则分析 ===")
|
|
print("规则说明:")
|
|
print(" 1. 无明确出场记录则视为彻底出场,后续入场为初次进场")
|
|
print(" 2. 初次进场后的所有进场视为加仓")
|
|
print(" 3. 出现一次出场后,后续入场为初次入场")
|
|
print(" 4. 无任何入场出场记录时,第一次入场为初次入场")
|
|
|
|
print(f"\n总交易数: {len(trades)}")
|
|
|
|
# 统计信息
|
|
multi_entry_trades = 0 # 有多笔入场订单的交易数
|
|
total_add_positions = 0 # 总加仓次数
|
|
detailed_analysis = [] # 详细分析结果
|
|
time_intervals = [] # 时间间隔列表
|
|
|
|
for i, trade in enumerate(trades):
|
|
entries = trade.get('entries', [])
|
|
pair = trade.get('pair', f'Trade_{i+1}')
|
|
|
|
# 如果只有一笔入场订单,则没有加仓
|
|
if len(entries) <= 1:
|
|
continue
|
|
|
|
# 有多笔入场订单的交易
|
|
multi_entry_trades += 1
|
|
add_positions_in_trade = len(entries) - 1 # 第一笔是初次入场,其余都是加仓
|
|
total_add_positions += add_positions_in_trade
|
|
|
|
# 分析时间间隔
|
|
entry_timestamps = []
|
|
valid_entries = []
|
|
for entry in entries:
|
|
timestamp = entry.get('timestamp')
|
|
if timestamp is not None and timestamp != 'N/A':
|
|
entry_timestamps.append(timestamp)
|
|
valid_entries.append(entry)
|
|
|
|
# 计算时间间隔
|
|
intervals = []
|
|
for j in range(1, len(entry_timestamps)):
|
|
if isinstance(entry_timestamps[j], (int, float)) and isinstance(entry_timestamps[j-1], (int, float)):
|
|
interval = abs(entry_timestamps[j] - entry_timestamps[j-1]) / 1000 # 转换为秒
|
|
intervals.append(interval)
|
|
time_intervals.append(interval)
|
|
|
|
# 记录详细信息
|
|
trade_info = {
|
|
'pair': pair,
|
|
'total_entries': len(entries),
|
|
'valid_entries': len(valid_entries),
|
|
'add_positions': add_positions_in_trade,
|
|
'entries': entries,
|
|
'intervals': intervals,
|
|
'entry_timestamps': entry_timestamps
|
|
}
|
|
detailed_analysis.append(trade_info)
|
|
|
|
print(f"\n有多笔入场订单的交易数: {multi_entry_trades}")
|
|
print(f"总的加仓次数: {total_add_positions}")
|
|
|
|
# 详细分析有多笔入场订单的交易
|
|
if detailed_analysis:
|
|
print("\n=== 详细加仓分析 ===")
|
|
for trade_info in detailed_analysis:
|
|
print(f"\n交易对: {trade_info['pair']}")
|
|
print(f" 总入场订单数: {trade_info['total_entries']}")
|
|
print(f" 有效时间戳订单数: {trade_info['valid_entries']}")
|
|
print(f" 加仓次数: {trade_info['add_positions']}")
|
|
|
|
# 分析每笔订单
|
|
entries = trade_info['entries']
|
|
entry_timestamps = trade_info['entry_timestamps']
|
|
|
|
for j, entry in enumerate(entries):
|
|
order_type = entry.get('order_type', 'unknown')
|
|
timestamp = entry.get('timestamp', 'N/A')
|
|
dt_str = timestamp_to_datetime(timestamp)
|
|
|
|
if j == 0:
|
|
position_type = "初次入场"
|
|
else:
|
|
position_type = "加仓"
|
|
|
|
print(f" 订单 {j+1} ({order_type}): {dt_str} ({timestamp}) - {position_type}")
|
|
|
|
# 显示时间间隔
|
|
intervals = trade_info['intervals']
|
|
if intervals:
|
|
print(f" 时间间隔分析:")
|
|
for k, interval in enumerate(intervals):
|
|
print(f" 订单 {k+2} 与订单 {k+1} 的间隔: {interval:.0f} 秒 ({interval/60:.1f} 分钟)")
|
|
|
|
# 时间间隔统计
|
|
if time_intervals:
|
|
print(f"\n=== 时间间隔统计 ===")
|
|
print(f"平均时间间隔: {sum(time_intervals)/len(time_intervals):.0f} 秒 ({sum(time_intervals)/len(time_intervals)/60:.1f} 分钟)")
|
|
print(f"最短时间间隔: {min(time_intervals):.0f} 秒 ({min(time_intervals)/60:.1f} 分钟)")
|
|
print(f"最长时间间隔: {max(time_intervals):.0f} 秒 ({max(time_intervals)/60:.1f} 分钟)")
|
|
|
|
# 找到最短和最长间隔的交易
|
|
if detailed_analysis:
|
|
# 找到包含最短时间间隔的交易
|
|
min_interval_trade = None
|
|
min_interval_index = -1
|
|
for trade_info in detailed_analysis:
|
|
intervals = trade_info['intervals']
|
|
if intervals and min(intervals) == min(time_intervals):
|
|
min_interval_trade = trade_info
|
|
min_interval_index = intervals.index(min(intervals))
|
|
break
|
|
|
|
if min_interval_trade:
|
|
print(f"\n⏱️ 最短时间间隔详情:")
|
|
print(f" 交易对: {min_interval_trade['pair']}")
|
|
print(f" 时间间隔: {min(time_intervals):.0f} 秒 ({min(time_intervals)/60:.1f} 分钟)")
|
|
|
|
if not detailed_analysis:
|
|
print("\n🔍 分析结果:")
|
|
print(" 当前数据中没有发现加仓操作")
|
|
print(" 所有交易都只有初始入场订单")
|
|
print(" 所有交易的开仓和平仓时间相同")
|
|
|
|
def analyze_backtest_trades():
|
|
"""分析回测交易数据"""
|
|
# 定义结果目录
|
|
result_dir = Path('../result')
|
|
|
|
# 按优先级查找数据文件
|
|
data_files = [
|
|
('测试数据', result_dir / 'test_trades_with_entries.json'),
|
|
('回测数据', result_dir / 'backtest_trades.json')
|
|
]
|
|
|
|
data_source = ""
|
|
data_file = None
|
|
|
|
for source, file_path in data_files:
|
|
if file_path.exists():
|
|
data_source = source
|
|
data_file = file_path
|
|
break
|
|
|
|
if not data_file:
|
|
print(f"❌ 未找到数据文件")
|
|
# 尝试查找其他可能的文件
|
|
for file_path in result_dir.glob('*.json'):
|
|
if 'trade' in file_path.name.lower():
|
|
data_file = file_path
|
|
data_source = "未知数据"
|
|
break
|
|
|
|
if not data_file or not data_file.exists():
|
|
print(f"❌ 文件不存在")
|
|
return
|
|
|
|
# 读取JSON文件
|
|
try:
|
|
with open(data_file, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
except Exception as e:
|
|
print(f"❌ 读取文件失败: {e}")
|
|
return
|
|
|
|
print(f"=== 数据源: {data_source} ({data_file.name}) ===")
|
|
analyze_user_defined_positions(data)
|
|
|
|
def main():
|
|
"""主函数"""
|
|
analyze_backtest_trades()
|
|
|
|
if __name__ == "__main__":
|
|
main() |