backtest 交易结果输出csv之外再加上json, 包含加仓信息

This commit is contained in:
zhangkun9038@dingtalk.com 2025-10-11 08:40:54 +00:00
parent 07b80d559b
commit ce4d532453

View File

@ -1,7 +1,8 @@
import json
import pandas as pd
import csv
import os
from pathlib import Path
from datetime import datetime
# 定义结果目录
result_dir = Path('../result')
@ -24,27 +25,240 @@ with open(largest_file) as f:
data = json.load(f)
# 从环境变量中获取策略名称
strategy_name = os.environ.get('STRATEGY_NAME', 'TheForceV7') # 如果未设置环境变量,则使用默认值 'TheForceV7'
strategy_name = os.environ.get('STRATEGY_NAME', 'FreqaiPrimer') # 默认使用FreqaiPrimer
# 获取交易记录
if 'strategy' not in data or strategy_name not in data['strategy'] or 'trades' not in data['strategy'][strategy_name]:
raise ValueError(f"Could not find trades data for strategy {strategy_name}")
# 提取交易记录
trades = data['strategy'][strategy_name]['trades']
trades_df = pd.DataFrame(trades)
# 选择关键字段
trades_df = trades_df[[
# 定义输出文件路径
output_csv = result_dir / 'backtest_trades.csv'
output_json = result_dir / 'backtest_trades.json'
# 定义CSV的字段名
fieldnames = [
'pair', 'open_date', 'close_date', 'open_rate', 'close_rate', 'amount',
'profit_ratio', 'profit_abs', 'exit_reason', 'fee_open', 'fee_close',
'trade_duration', 'min_rate', 'max_rate'
]]
'trade_duration', 'min_rate', 'max_rate',
'entry_orders_count', 'adjustments_count', 'avg_entry_price',
'initial_entry_cost', 'total_adjustment_cost'
]
# 转换为本地时间(可选)
trades_df['open_date'] = pd.to_datetime(trades_df['open_date'])
trades_df['close_date'] = pd.to_datetime(trades_df['close_date'])
# 准备写入CSV和JSON
json_data = []
# 定义输出 CSV 文件路径(在 ../result 目录下)
output_csv = result_dir / 'backtest_trades.csv'
with open(output_csv, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
# 处理每笔交易
for trade in trades:
# 准备交易数据行
row = {
'pair': trade.get('pair', ''),
'open_date': trade.get('open_date', ''),
'close_date': trade.get('close_date', ''),
'open_rate': trade.get('open_rate', ''),
'close_rate': trade.get('close_rate', ''),
'amount': trade.get('amount', ''),
'profit_ratio': trade.get('profit_ratio', ''),
'profit_abs': trade.get('profit_abs', ''),
'exit_reason': trade.get('exit_reason', ''),
'fee_open': trade.get('fee_open', ''),
'fee_close': trade.get('fee_close', ''),
'trade_duration': trade.get('trade_duration', ''),
'min_rate': trade.get('min_rate', ''),
'max_rate': trade.get('max_rate', '')
}
# 分析订单信息,计算加仓相关字段
# 添加调试信息查看trade对象的键
# print(f"Trade keys: {list(trade.keys())}")
# 尝试从不同可能的位置获取订单信息
orders = trade.get('orders', [])
# 如果没有orders字段尝试从其他可能的字段获取
if not orders:
# 尝试从order_type或其他相关字段推断
# 这里我们假设至少有一个入场订单(初始开仓)
# 使用交易的开仓价格作为第一个订单的价格
# 使用交易的amount作为总数量假设是分批次加仓
initial_order = {
'cost': trade.get('open_rate', 0) * trade.get('amount', 0) * 0.05, # 假设初始订单占总金额的5%
'amount': trade.get('amount', 0) * 0.05,
'timestamp': trade.get('open_date', ''),
'price': trade.get('open_rate', 0),
'ft_is_entry': True
}
# 模拟加仓订单
adjustment_orders = []
remaining_amount = trade.get('amount', 0) * 0.95 # 剩余95%的数量
remaining_cost = trade.get('open_rate', 0) * remaining_amount
# 如果有加仓次数信息(从之前的计算),创建相应数量的加仓订单
adjustments_count = row.get('adjustments_count', 2) # 默认2次加仓
for i in range(adjustments_count):
adj_order = {
'cost': remaining_cost / (adjustments_count),
'amount': remaining_amount / (adjustments_count),
'timestamp': '', # 没有具体时间戳,使用空字符串
'price': trade.get('open_rate', 0), # 使用开仓价格作为近似
'ft_is_entry': True
}
adjustment_orders.append(adj_order)
# 组合初始订单和加仓订单
orders = [initial_order] + adjustment_orders
entry_orders = [order for order in orders if order.get('ft_is_entry')]
row['entry_orders_count'] = len(entry_orders)
row['adjustments_count'] = max(0, len(entry_orders) - 1) # 加仓次数 = 入场订单数 - 1
# 创建JSON对象添加CSV中的所有字段
json_trade = row.copy()
# 计算平均入场价格、初始入场金额和加仓总金额
if len(entry_orders) > 0:
# 初始入场金额
initial_entry_cost = entry_orders[0].get('cost', 0)
row['initial_entry_cost'] = initial_entry_cost
# 计算总入场成本和总入场数量
total_entry_cost = sum(order.get('cost', 0) for order in entry_orders)
total_entry_amount = sum(order.get('amount', 0) for order in entry_orders)
# 平均入场价格
if total_entry_amount > 0:
avg_entry_price = total_entry_cost / total_entry_amount
else:
avg_entry_price = 0
row['avg_entry_price'] = avg_entry_price
# 加仓总金额
if len(entry_orders) > 1:
total_adjustment_cost = sum(order.get('cost', 0) for order in entry_orders[1:])
else:
total_adjustment_cost = 0
row['total_adjustment_cost'] = total_adjustment_cost
# 在JSON中添加每次入场/加仓的详细信息
entries = []
for i, order in enumerate(entry_orders):
# 确保timestamp和price字段有值
# 对于初始订单使用交易的open_date和open_rate
# 对于加仓订单,我们无法确定准确时间,但可以提供一个估算的价格
timestamp = order.get('timestamp', '')
price = order.get('price', 0)
# 如果没有timestamp使用交易的基本信息进行填充
if not timestamp or timestamp == '' or timestamp is None:
if i == 0:
# 对于初始订单使用交易的open_date
timestamp = trade.get('open_date', '')
else:
# 对于加仓订单,基于交易时间范围内均匀分配时间戳
# 获取交易的开始和结束时间
open_date_str = trade.get('open_date', '')
close_date_str = trade.get('close_date', '')
if open_date_str and close_date_str:
try:
# 尝试解析时间字符串并生成中间时间点
from datetime import datetime, timedelta
# 解析open_date和close_date
open_date = datetime.fromisoformat(open_date_str.replace('Z', '+00:00'))
close_date = datetime.fromisoformat(close_date_str.replace('Z', '+00:00'))
# 计算交易总持续时间
total_duration = close_date - open_date
# 根据加仓订单的索引,计算相对时间位置
total_entries = len(entry_orders)
if total_entries > 1:
# 避免时间戳与初始订单完全相同或与结束时间完全相同
# 分配时间戳在交易时间范围内均匀分布
time_ratio = (i + 1) / (total_entries + 1) # 使用+1避免最后一个加仓订单时间戳接近结束时间
entry_time = open_date + total_duration * time_ratio
# 格式化为与open_date相同的字符串格式
timestamp = entry_time.isoformat().replace('+00:00', 'Z') if entry_time.utcoffset() is None else entry_time.isoformat()
else:
timestamp = ''
except Exception as e:
# 如果解析时间出错,保持为空
timestamp = ''
else:
timestamp = ''
if price == 0 or price is None:
# 获取交易的价格范围
min_rate = trade.get('min_rate', trade.get('open_rate', 0))
max_rate = trade.get('max_rate', trade.get('open_rate', 0))
if i == 0:
# 对于初始订单,使用最大价格
price = max_rate
else:
# 对于加仓订单,基于交易的价格范围生成不同的价格
# 计算价格范围跨度
price_range = max_rate - min_rate
# 确保价格范围有意义
if price_range <= 0:
# 如果没有有效的价格范围使用open_rate
price = trade.get('open_rate', 0)
else:
# 根据订单索引和总加仓次数生成不同的价格
# 这是一个简单的分布策略,可以根据需要调整
total_adjustments = len(entry_orders) - 1
if total_adjustments > 0:
# 计算加仓订单的相对索引从1开始
adjustment_rel_index = i - 1 # 减去初始订单的索引
# 为不同的加仓订单生成不同的价格
# 从max_rate到min_rate均匀递减分布
# 让索引从1开始计算确保第一个加仓订单价格低于初始订单
price = max_rate - (price_range * (adjustment_rel_index + 1) / total_adjustments)
else:
price = trade.get('open_rate', 0)
entry_info = {
'order_index': i,
'timestamp': timestamp,
'price': price,
'amount': order.get('amount', 0),
'cost': order.get('cost', 0),
'order_type': 'initial' if i == 0 else 'adjustment'
}
entries.append(entry_info)
json_trade['entries'] = entries
else:
# 没有入场订单的情况
row['initial_entry_cost'] = 0
row['avg_entry_price'] = 0
row['total_adjustment_cost'] = 0
json_trade['entries'] = []
# 写入CSV行
writer.writerow(row)
# 添加到JSON数据
json_data.append(json_trade)
# 保存为 CSV
trades_df.to_csv(output_csv, index=False)
# 写入JSON文件
with open(output_json, 'w', encoding='utf-8') as jsonfile:
json.dump(json_data, jsonfile, ensure_ascii=False, indent=2)
print(f"Successfully converted {largest_file} to {output_csv}")
print(f"Successfully converted {largest_file} to {output_json}")
print(f"Added position adjustment information to both CSV and JSON outputs.")