# myTestFreqAI/tools/tradestocsv.py
#
# Converts a freqtrade backtest result export into CSV and JSON trade files.
# Picks the largest "backtest-result-*" JSON in ../result, preferring files
# that contain position-adjustment (add-on entry) trades.
import json
import csv
import os
from pathlib import Path
from datetime import datetime
# Backtest exports are expected one directory up, in ../result.
result_dir = Path('../result')
if not result_dir.exists():
    raise FileNotFoundError(f"Directory {result_dir} does not exist")
# Collect every JSON export whose filename marks it as a backtest result.
json_files = [p for p in result_dir.glob('*.json') if 'backtest-result-' in p.name]
if not json_files:
    raise FileNotFoundError("No JSON files with 'backtest-result-' in name found in ../result")
# First pass: find result files that contain position-adjustment trades,
# i.e. at least one trade with more than one entry order.
files_with_adjustments = []
for json_file in json_files:
    # Narrowed from a bare `except:` — skip files that are unreadable,
    # malformed JSON, or not shaped like a backtest result, without
    # swallowing KeyboardInterrupt/SystemExit.
    try:
        with open(json_file, encoding='utf-8') as f:
            data = json.load(f)
        has_adjustments = False
        for strategy_data in data.get('strategy', {}).values():
            for trade in strategy_data.get('trades', []):
                entry_orders = [o for o in trade.get('orders', []) if o.get('ft_is_entry')]
                if len(entry_orders) > 1:
                    has_adjustments = True
                    break
            if has_adjustments:
                break
    except (OSError, ValueError, AttributeError, TypeError):
        continue
    if has_adjustments:
        files_with_adjustments.append(json_file)
# Prefer the largest file among those with adjustments; otherwise fall back
# to the largest result file overall.
if files_with_adjustments:
    largest_file = max(files_with_adjustments, key=lambda x: x.stat().st_size)
    print(f"Found {len(files_with_adjustments)} files with position adjustments, using the largest: {largest_file.name}")
else:
    largest_file = max(json_files, key=lambda x: x.stat().st_size)
    print(f"No files with position adjustments found, using the largest file: {largest_file.name}")
# Load the selected backtest export (explicit UTF-8: freqtrade writes UTF-8
# JSON, and the platform default encoding may differ).
with open(largest_file, encoding='utf-8') as f:
    data = json.load(f)
# Strategy name comes from the environment; defaults to FreqaiPrimer.
strategy_name = os.environ.get('STRATEGY_NAME', 'FreqaiPrimer')
# Validate the expected nesting: strategy -> <name> -> trades.
if 'strategy' not in data or strategy_name not in data['strategy'] or 'trades' not in data['strategy'][strategy_name]:
    raise ValueError(f"Could not find trades data for strategy {strategy_name}")
trades = data['strategy'][strategy_name]['trades']
# Output files are written next to the input results.
output_csv = result_dir / 'backtest_trades.csv'
output_json = result_dir / 'backtest_trades.json'
# Column order for the CSV export (the JSON export carries the same fields).
fieldnames = [
    'pair', 'open_date', 'close_date', 'open_rate', 'close_rate', 'amount',
    'profit_ratio', 'profit_abs', 'exit_reason', 'fee_open', 'fee_close',
    'trade_duration', 'min_rate', 'max_rate',
    'entry_orders_count', 'adjustments_count', 'avg_entry_price',
    'initial_entry_cost', 'total_adjustment_cost'
]
# Single pass over the trades, producing both the CSV and the JSON export.
json_data = []
with open(output_csv, 'w', newline='', encoding='utf-8') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()
    for trade in trades:
        # Base row: fields copied straight from the trade record.
        row = {
            'pair': trade.get('pair', ''),
            'open_date': trade.get('open_date', ''),
            'close_date': trade.get('close_date', ''),
            'open_rate': trade.get('open_rate', ''),
            'close_rate': trade.get('close_rate', ''),
            'amount': trade.get('amount', ''),
            'profit_ratio': trade.get('profit_ratio', ''),
            'profit_abs': trade.get('profit_abs', ''),
            'exit_reason': trade.get('exit_reason', ''),
            'fee_open': trade.get('fee_open', ''),
            'fee_close': trade.get('fee_close', ''),
            'trade_duration': trade.get('trade_duration', ''),
            'min_rate': trade.get('min_rate', ''),
            'max_rate': trade.get('max_rate', '')
        }
        orders = trade.get('orders', [])
        # Fall back to the millisecond timestamps when ISO dates are missing.
        if not row['open_date'] and trade.get('open_timestamp'):
            row['open_date'] = datetime.fromtimestamp(trade['open_timestamp'] / 1000).isoformat()
        if not row['close_date'] and trade.get('close_timestamp'):
            row['close_date'] = datetime.fromtimestamp(trade['close_timestamp'] / 1000).isoformat()
        # Trades without order details cannot be analysed for adjustments; skip them.
        if not orders:
            continue
        entry_orders = [order for order in orders if order.get('ft_is_entry')]
        row['entry_orders_count'] = len(entry_orders)
        # Number of add-on entries = entry orders beyond the initial one.
        row['adjustments_count'] = max(0, len(entry_orders) - 1)
        if entry_orders:
            # Cost of the very first (initial) entry order.
            row['initial_entry_cost'] = entry_orders[0].get('cost', 0)
            total_entry_cost = sum(order.get('cost', 0) for order in entry_orders)
            total_entry_amount = sum(order.get('amount', 0) for order in entry_orders)
            # Volume-weighted average entry price; guard against zero amount.
            row['avg_entry_price'] = total_entry_cost / total_entry_amount if total_entry_amount > 0 else 0
            # Total cost of all add-on (adjustment) entries after the first.
            row['total_adjustment_cost'] = sum(order.get('cost', 0) for order in entry_orders[1:])
            # Per-entry detail for the JSON export only (CSV stays flat).
            entries = []
            for i, order in enumerate(entry_orders):
                order_timestamp = order.get('order_filled_timestamp', '')
                timestamp = ''
                if order_timestamp:
                    # Narrowed from a bare `except:` — keep the raw value as a
                    # string if the timestamp cannot be converted.
                    try:
                        timestamp = datetime.fromtimestamp(order_timestamp / 1000).isoformat()
                    except (TypeError, ValueError, OSError, OverflowError):
                        timestamp = str(order_timestamp)
                entries.append({
                    'order_index': i,
                    'timestamp': timestamp,
                    'price': order.get('safe_price', 0),
                    'amount': order.get('amount', 0),
                    'cost': order.get('cost', 0),
                    'order_type': 'initial' if i == 0 else 'adjustment',
                    'raw_timestamp': order_timestamp  # keep the original millisecond value
                })
        else:
            # No entry orders at all (e.g. only exit orders present).
            row['initial_entry_cost'] = 0
            row['avg_entry_price'] = 0
            row['total_adjustment_cost'] = 0
            entries = []
        # BUGFIX: copy the row only after ALL derived fields are filled in.
        # The original copied before initial_entry_cost / avg_entry_price /
        # total_adjustment_cost were set, so the JSON output was missing
        # three columns that the CSV had.
        json_trade = row.copy()
        json_trade['entries'] = entries
        writer.writerow(row)
        json_data.append(json_trade)
# Write the JSON export (non-ASCII kept readable, pretty-printed).
with open(output_json, 'w', encoding='utf-8') as jsonfile:
    json.dump(json_data, jsonfile, ensure_ascii=False, indent=2)
print(f"Successfully converted {largest_file.name} to {output_csv}")
print(f"Successfully converted {largest_file.name} to {output_json}")
print(f"Added position adjustment information with correct timestamps to both CSV and JSON outputs.")