Remove unused files

zhangkun9038@dingtalk.com 2025-10-17 21:38:51 +08:00
parent 12aa006db0
commit e2ce8cef49
20 changed files with 0 additions and 2342 deletions

View File

@@ -1,48 +0,0 @@
[
{
"pair": "BTC/USDT",
"open_date": "2025-09-01 10:00:00+00:00",
"close_date": "2025-09-02 10:00:00+00:00",
"open_rate": 50000,
"close_rate": 52000,
"amount": 3,
"profit_ratio": 0.04,
"profit_abs": 600,
"exit_reason": "roi",
"fee_open": 0.001,
"fee_close": 0.001,
"trade_duration": 86400,
"min_rate": 49000,
"max_rate": 52500,
"orders": [
{
"cost": 50000,
"amount": 1,
"order_filled_timestamp": 1751565600000,
"safe_price": 50000,
"ft_is_entry": true
},
{
"cost": 50000,
"amount": 1,
"order_filled_timestamp": 1751569200000,
"safe_price": 49800,
"ft_is_entry": true
},
{
"cost": 50000,
"amount": 1,
"order_filled_timestamp": 1751572800000,
"safe_price": 49600,
"ft_is_entry": true
},
{
"cost": 156000,
"amount": 3,
"order_filled_timestamp": 1751652000000,
"safe_price": 52000,
"ft_is_entry": false
}
]
}
]

View File

@@ -1,123 +0,0 @@
#!/usr/bin/env python3
"""
Test script for verifying that the strategy does not leak future data (look-ahead bias).
"""
import pandas as pd
import numpy as np
import logging
from datetime import datetime, timedelta
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def create_test_dataframe():
"""创建测试数据"""
dates = pd.date_range(start='2024-01-01', periods=100, freq='3min')
np.random.seed(42)
close = 100 + np.cumsum(np.random.randn(100) * 0.5)
high = close + np.abs(np.random.randn(100) * 0.2)
low = close - np.abs(np.random.randn(100) * 0.2)
open_price = close + np.random.randn(100) * 0.1
volume = np.abs(np.random.randn(100) * 1000) + 1000
return pd.DataFrame({
'date': dates,
'open': open_price,
'high': high,
'low': low,
'close': close,
'volume': volume
})
def test_vectorized_operations():
"""Verify that vectorized operations avoid future data leakage"""
df = create_test_dataframe()
original_len = len(df)
logger.info("=== Testing vectorized operations ===")
# Test 1: computing indicators with TA-Lib is safe
import talib.abstract as ta
df['rsi'] = ta.RSI(df, timeperiod=14)
df['ema200'] = ta.EMA(df, timeperiod=200)
# Verify the length stays the same
assert len(df) == original_len, f"Length mismatch: {len(df)} vs {original_len}"
logger.info("✅ TA-Lib indicator calculation is safe")
# Test 2: rolling windows are safe
df['volume_ma'] = df['volume'].rolling(20).mean()
assert len(df) == original_len, f"Length mismatch: {len(df)} vs {original_len}"
logger.info("✅ Rolling-window calculation is safe")
# Test 3: using shift to access historical data is safe
df['price_change'] = df['close'] - df['close'].shift(1)
assert len(df) == original_len, f"Length mismatch: {len(df)} vs {original_len}"
logger.info("✅ Shift operation is safe")
# Test 4: check that iloc[-1] is not used in the trading logic
conditions = [
(df['rsi'] < 30),
(df['close'] < df['ema200'] * 0.95)
]
# Vectorized condition evaluation
buy_signal = conditions[0] & conditions[1]
df['buy_signal'] = buy_signal.astype(int)
# Verify that no iloc[-1]-based decisions were made
assert not df['buy_signal'].isna().any(), "NaN values present - future data may have been used"
logger.info("✅ Vectorized condition evaluation is safe")
return True
def test_dangerous_patterns():
"""Demonstrate dangerous patterns (for comparison)"""
df = create_test_dataframe()
logger.info("=== Testing dangerous patterns (for comparison) ===")
# Dangerous pattern 1: computing a mean over the full dataset
try:
mean_price = df['close'].mean()  # this uses the full dataset, i.e. future data
logger.warning("⚠️ Used a full-dataset mean - may cause future data leakage")
except Exception as e:
logger.error(f"Error: {e}")
# Dangerous pattern 2: using iloc[-1] inside trading logic
try:
if len(df) > 0:
last_price = df['close'].iloc[-1]  # fine for logging, but must not drive decisions
logger.info(f"Last price: {last_price} - used for logging only")
except Exception as e:
logger.error(f"Error: {e}")
return True
def main():
"""Main test entry point"""
logger.info("Starting look-ahead-bias prevention tests...")
# Test vectorized operations
test_vectorized_operations()
# Test dangerous patterns (for comparison)
test_dangerous_patterns()
logger.info("=== Test summary ===")
logger.info("✅ All vectorized operations avoid future data leakage")
logger.info("✅ TA-Lib, rolling and shift operations are safe")
logger.info("✅ iloc[-1] is not used in the trading logic")
# Usage recommendations
logger.info("\n=== Safe usage recommendations ===")
logger.info("1. Use TA-Lib to compute technical indicators")
logger.info("2. Use rolling windows for moving averages")
logger.info("3. Use shift(1) to access historical data")
logger.info("4. Avoid full-dataset calculations in trading logic")
logger.info("5. Use iloc[-1] only for logging, never for trade decisions")
if __name__ == "__main__":
main()

View File

@@ -1,29 +0,0 @@
#!/bin/bash
# Set the working directory
export WORK_DIR="/Users/zhangkun/myTestFreqAI"
cd "$WORK_DIR"
echo "🚀 启动混合模型策略测试..."
echo "📊 使用模型: freqai_primer_mixed"
echo "📁 配置文件: user_data/config.json"
# 清理旧模型和缓存
echo "🧹 清理旧模型和缓存..."
rm -rf user_data/models/test58/
rm -rf user_data/models/test_mixed_models/
rm -rf user_data/models/freqai_primer_mixed
rm -rf user_data/data/test_mixed_models
# Launch the strategy
python -m freqtrade trade \
--config user_data/config.json \
--strategy freqaiprimer \
--log-level INFO \
--freqai-enabled \
--freqaimodel LightGBMClassifier \
--strategy-path /Users/zhangkun/myTestFreqAI/freqtrade/templates \
--db-url sqlite:///user_data/tradesv3.sqlite \
--userdir user_data
echo "✅ 策略启动完成"

View File

@@ -1,102 +0,0 @@
{
"strategy": "freqaiprimer",
"max_open_trades": 3,
"stake_currency": "USDT",
"stake_amount": 25,
"tradable_balance_ratio": 0.99,
"fiat_display_currency": "USD",
"dry_run": true,
"cancel_open_orders_on_exit": false,
"trading_mode": "spot",
"exchange": {
"name": "binance",
"key": "",
"secret": "",
"ccxt_config": {"enableRateLimit": true},
"ccxt_async_config": {"enableRateLimit": true},
"pair_blacklist": [
"BNB/.*"
]
},
"pairlists": [
{
"method": "StaticPairList",
"pairs": ["SOL/USDT", "WCT/USDT"]
}
],
"freqai": {
"enabled": true,
"identifier": "freqai_primer_mixed",
"purge_old_models": 2,
"train_period_days": 15,
"backtest_period_days": 7,
"live_retrain_candles": 100,
"fit_live_predictions_candles": 100,
"feature_parameters": {
"include_timeframes": ["3m", "15m", "1h"],
"label_period_candles": 12,
"include_shifted_candles": 3,
"indicator_periods_candles": [10, 20],
"DI_threshold": 30,
"weight_factor": 0.9
},
"data_split_parameters": {
"test_size": 0.2,
"shuffle": false
},
"model_training_parameters": {
"price_value_divergence": {
"model": "LightGBMRegressor",
"model_params": {
"n_estimators": 200,
"learning_rate": 0.05,
"num_leaves": 31,
"verbose": -1
}
},
"optimal_first_length": {
"model": "LightGBMClassifier",
"model_params": {
"n_estimators": 150,
"learning_rate": 0.1,
"num_leaves": 15,
"max_depth": 8,
"min_child_samples": 10,
"class_weight": "balanced",
"verbose": -1
}
}
}
},
"timeframe": "3m",
"dry_run_wallet": 1000,
"unfilledtimeout": {
"entry": 10,
"exit": 30
},
"entry_pricing": {
"price_side": "same",
"use_order_book": true,
"order_book_top": 1,
"check_depth_of_market": {
"enabled": false,
"bids_to_ask_delta": 1
}
},
"exit_pricing": {
"price_side": "same",
"use_order_book": true,
"order_book_top": 1
},
"order_types": {
"entry": "limit",
"exit": "limit",
"emergency_exit": "market",
"force_exit": "market",
"stoploss": "market",
"stoploss_on_exchange": false
},
"edge": {
"enabled": false
}
}

View File

@@ -1,74 +0,0 @@
#!/usr/bin/env python3
"""
Check that the strategy file can be loaded.
"""
import sys
import os
# Add freqtrade to the Python path
sys.path.insert(0, '/Users/zhangkun/myTestFreqAI')
from freqtrade.resolvers.strategy_resolver import StrategyResolver
from freqtrade.configuration import Configuration
def test_strategy_loading():
"""测试策略加载"""
try:
# Build a minimal configuration
config = {
'strategy': 'freqaiprimer',
'strategy_path': '/Users/zhangkun/myTestFreqAI/freqtrade/templates',
'stake_currency': 'USDT',
'stake_amount': 10,
'exchange': {
'name': 'binance',
'key': '',
'secret': '',
'ccxt_config': {},
'ccxt_async_config': {}
},
'pairlists': [{'method': 'StaticPairList'}],
'datadir': '/Users/zhangkun/myTestFreqAI/user_data/data',
'timeframe': '3m',
'freqai': {
'enabled': True,
'identifier': 'test',
'feature_parameters': {
'include_timeframes': ['3m'],
'label_period_candles': 12,
'include_shifted_candles': 3,
}
}
}
# Try to load the strategy
strategy = StrategyResolver.load_strategy(config)
print("✅ Strategy loaded successfully!")
print(f"Strategy name: {strategy.__class__.__name__}")
print(f"Interface version: {getattr(strategy, 'INTERFACE_VERSION', 'not set')}")
# Check that the required methods exist
methods_to_check = [
'populate_indicators',
'populate_entry_trend',
'populate_exit_trend',
'feature_engineering_expand_all',
'set_freqai_targets'
]
for method in methods_to_check:
if hasattr(strategy, method):
print(f"✅ 找到方法: {method}")
else:
print(f"❌ 缺少方法: {method}")
return True
except Exception as e:
print(f"❌ 策略加载失败: {e}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
test_strategy_loading()

View File

@@ -1,37 +0,0 @@
#!/usr/bin/env python3
"""
Script for testing timestamp handling.
"""
import pandas as pd
import numpy as np
from datetime import datetime
def test_timestamp_handling():
"""测试时间戳处理逻辑"""
# 创建测试数据
dates = pd.date_range('2024-01-01', periods=100, freq='1min')
df = pd.DataFrame({
'close': np.random.randn(100),
'volume': np.random.randn(100)
}, index=dates)
print("测试数据创建成功")
print(f"数据类型: {type(df.index[0])}")
print(f"索引示例: {df.index[0]}")
# 测试时间戳转换
for i in range(-10, 0):
last_idx = df.index[i]
if isinstance(last_idx, pd.Timestamp):
ts = last_idx.tz_localize(None) if last_idx.tz else last_idx
timestamp = int(ts.timestamp())
print(f"索引 {i}: {last_idx} -> {timestamp}")
else:
timestamp = int(last_idx) if isinstance(last_idx, (int, np.integer)) else int(pd.Timestamp.utcnow().timestamp())
print(f"索引 {i}: {last_idx} -> {timestamp}")
print("时间戳处理测试完成")
if __name__ == "__main__":
test_timestamp_handling()

View File

@@ -1,58 +0,0 @@
[
{
"pair": "TON/USDT",
"open_date": "2025-08-28T12:12:00Z",
"close_date": "2025-08-29T12:12:00Z",
"orders": [
{
"ft_is_entry": true,
"amount": 23.3499,
"safe_price": 3.265,
"order_filled_timestamp": null
},
{
"ft_is_entry": true,
"amount": 12.8352,
"safe_price": 2.9235,
"order_filled_timestamp": null
},
{
"ft_is_entry": true,
"amount": 510.9295,
"safe_price": 2.582,
"order_filled_timestamp": null
}
]
},
{
"pair": "TRUMP/USDT",
"open_date": "2025-09-13T16:03:00Z",
"close_date": "2025-09-14T16:03:00Z",
"orders": [
{
"ft_is_entry": true,
"amount": 8.0792,
"safe_price": 9.313,
"order_filled_timestamp": null
},
{
"ft_is_entry": true,
"amount": 4.4412,
"safe_price": 8.48366667,
"order_filled_timestamp": null
},
{
"ft_is_entry": true,
"amount": 142.3201,
"safe_price": 7.65433333,
"order_filled_timestamp": null
},
{
"ft_is_entry": true,
"amount": 54.0201,
"safe_price": 6.825,
"order_filled_timestamp": null
}
]
}
]

View File

@@ -1,55 +0,0 @@
import json
import os
from pathlib import Path
# Create test data without the orders field
print("Creating test data without orders field...")
test_data = {
"strategy": {
"FreqaiPrimer": {
"trades": [
{
"pair": "BTC/USDT",
"open_date": "2024-01-05T10:00:00Z",
"close_date": "2024-01-06T10:00:00Z",
"open_rate": 42000.0,
"close_rate": 43000.0,
"amount": 0.02,
"profit_ratio": 0.0238,
"profit_abs": 200.0,
"exit_reason": "roi",
"fee_open": 4.2,
"fee_close": 4.3,
"trade_duration": 86400,
"min_rate": 41800.0,
"max_rate": 43200.0
# intentionally omit the orders field
}
]
}
}
}
# Save the test data
result_dir = Path('../result')
test_file = result_dir / "backtest-result-test-no-orders.json"
with open(test_file, 'w') as f:
json.dump(test_data, f, ensure_ascii=False, indent=2)
print(f"Test data created at {test_file}")
# Remove previous output files and the sample file first, so that only the test file is processed
output_csv = result_dir / 'backtest_trades.csv'
output_json = result_dir / 'backtest_trades.json'
sample_file = result_dir / 'backtest-result-sample.json'
if output_csv.exists():
os.remove(output_csv)
if output_json.exists():
os.remove(output_json)
if sample_file.exists():
os.remove(sample_file)
print("Previous output files and sample file removed")
print("Now run tradestocsv.py to test handling of data without orders field")

View File

@@ -1,93 +0,0 @@
{
"strategy": {
"FreqaiPrimer": {
"trades": [
{
"pair": "BTC/USDT",
"open_date": "2025-10-16T10:00:00+00:00",
"close_date": "2025-10-16T14:00:00+00:00",
"open_rate": 65000,
"close_rate": 66000,
"amount": 0.0015,
"profit_ratio": 0.015,
"profit_abs": 15,
"exit_reason": "take_profit",
"fee_open": 0.0008,
"fee_close": 0.0008,
"trade_duration": 14400,
"min_rate": 64800,
"max_rate": 66200,
"orders": [
{
"order_filled_timestamp": 1732620000000,
"safe_price": 65000,
"amount": 0.0005,
"cost": 32.5,
"ft_is_entry": true
},
{
"order_filled_timestamp": 1732627200000,
"safe_price": 65200,
"amount": 0.0005,
"cost": 32.6,
"ft_is_entry": true
},
{
"order_filled_timestamp": 1732634400000,
"safe_price": 65500,
"amount": 0.0005,
"cost": 32.75,
"ft_is_entry": true
},
{
"order_filled_timestamp": 1732636800000,
"safe_price": 66000,
"amount": 0.0015,
"cost": 99,
"ft_is_entry": false
}
]
},
{
"pair": "ETH/USDT",
"open_date": "2025-10-16T11:00:00+00:00",
"close_date": "2025-10-16T15:00:00+00:00",
"open_rate": 3200,
"close_rate": 3300,
"amount": 0.025,
"profit_ratio": 0.03125,
"profit_abs": 25,
"exit_reason": "take_profit",
"fee_open": 0.0008,
"fee_close": 0.0008,
"trade_duration": 14400,
"min_rate": 3180,
"max_rate": 3320,
"orders": [
{
"order_filled_timestamp": 1732623600000,
"safe_price": 3200,
"amount": 0.01,
"cost": 32,
"ft_is_entry": true
},
{
"order_filled_timestamp": 1732630800000,
"safe_price": 3230,
"amount": 0.015,
"cost": 48.45,
"ft_is_entry": true
},
{
"order_filled_timestamp": 1732640400000,
"safe_price": 3300,
"amount": 0.025,
"cost": 82.5,
"ft_is_entry": false
}
]
}
]
}
}
}

View File

@@ -1,94 +0,0 @@
#!/usr/bin/env python3
"""
Analyse the add-on (position-adjustment) time intervals after the fix.
"""
import json
from datetime import datetime
from pathlib import Path
# Read the fixed JSON file
result_file = Path('../result/backtest_trades_fixed.json')
with open(result_file) as f:
trades = json.load(f)
print("加仓时间间隔分析\n" + "="*50)
# 统计所有加仓间隔
intervals = []
pair_intervals = {}
for trade in trades:
pair = trade['pair']
entry_orders = trade.get('entries', [])
if len(entry_orders) > 1:
# 计算加仓间隔
pair_intervals[pair] = []
for i in range(1, len(entry_orders)):
prev_timestamp = entry_orders[i-1]['raw_timestamp']
curr_timestamp = entry_orders[i]['raw_timestamp']
# 计算间隔(秒)
interval_seconds = (curr_timestamp - prev_timestamp) / 1000
interval_minutes = interval_seconds / 60
interval_hours = interval_minutes / 60
intervals.append({
'pair': pair,
'from_order': i-1,
'to_order': i,
'interval_seconds': interval_seconds,
'interval_minutes': interval_minutes,
'interval_hours': interval_hours,
'from_time': entry_orders[i-1]['timestamp'],
'to_time': entry_orders[i]['timestamp']
})
pair_intervals[pair].append({
'from_order': i-1,
'to_order': i,
'interval_minutes': interval_minutes,
'interval_hours': interval_hours,
'from_time': entry_orders[i-1]['timestamp'],
'to_time': entry_orders[i]['timestamp']
})
# 按币对显示加仓间隔
for pair, intervals_list in pair_intervals.items():
print(f"\n{pair} 加仓间隔:")
for interval in intervals_list:
print(f" 订单 {interval['from_order']} -> {interval['to_order']}: "
f"{interval['interval_minutes']:.1f} 分钟 "
f"({interval['interval_hours']:.1f} 小时)")
print(f" 从: {interval['from_time']}")
print(f" 到: {interval['to_time']}")
# 统计总体信息
print(f"\n总体统计:")
print(f"总加仓次数: {len(intervals)}")
if intervals:
# 计算间隔分布
short_intervals = [i for i in intervals if i['interval_hours'] < 1]
medium_intervals = [i for i in intervals if 1 <= i['interval_hours'] < 24]
long_intervals = [i for i in intervals if i['interval_hours'] >= 24]
print(f"短期间隔 (<1小时): {len(short_intervals)} ({len(short_intervals)/len(intervals)*100:.1f}%)")
print(f"中期间隔 (1-24小时): {len(medium_intervals)} ({len(medium_intervals)/len(intervals)*100:.1f}%)")
print(f"长期间隔 (>=24小时): {len(long_intervals)} ({len(long_intervals)/len(intervals)*100:.1f}%)")
# 计算最短和最长间隔
min_interval = min(intervals, key=lambda x: x['interval_minutes'])
max_interval = max(intervals, key=lambda x: x['interval_minutes'])
avg_interval = sum(i['interval_minutes'] for i in intervals) / len(intervals)
print(f"\n最短间隔: {min_interval['interval_minutes']:.1f} 分钟 "
f"({min_interval['pair']} 订单 {min_interval['from_order']} -> {min_interval['to_order']})")
print(f"最长间隔: {max_interval['interval_minutes']:.1f} 分钟 "
f"({max_interval['pair']} 订单 {max_interval['from_order']} -> {max_interval['to_order']})")
print(f"平均间隔: {avg_interval:.1f} 分钟 ({avg_interval/60:.1f} 小时)")
print("\n分析完成!")

View File

@@ -1,242 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script for analysing the time intervals between entry orders.
Used to investigate the add-on (position-adjustment) timing issue.
"""
import json
import os
from pathlib import Path
from datetime import datetime
def timestamp_to_datetime(timestamp):
"""将毫秒级时间戳转换为可读的日期时间"""
if isinstance(timestamp, (int, float)) and timestamp > 1000000000000:
# millisecond timestamp
return datetime.fromtimestamp(timestamp/1000).strftime('%Y-%m-%d %H:%M:%S')
return str(timestamp)
def analyze_reentry_behavior(trades):
"""
Analyse re-entry behaviour according to the user-defined rules:
1. If there is no explicit exit record, the position is treated as fully closed, and later entries count as initial entries.
2. Every entry after the initial entry is treated as an add-on (position adjustment).
3. After an exit appears, the next entry is again an initial entry.
4. If there are no entry/exit records at all, the first entry is the initial entry.
"""
total_trades = len(trades)
multi_entry_trades = 0
total_add_positions = 0
for trade in trades:
entries = trade.get('entries', [])
if len(entries) > 1:
multi_entry_trades += 1
# By the rules, the first entry is the initial entry; the rest are add-ons
total_add_positions += len(entries) - 1
return total_trades, multi_entry_trades, total_add_positions
def analyze_multi_entry_trades(trades):
"""分析有多笔入场订单的交易详情"""
multi_entry_trades_list = []
total_adjustments = 0
for trade in trades:
entries = trade.get('entries', [])
if len(entries) > 1:
multi_entry_trades_list.append(trade)
total_adjustments += len(entries) - 1
return multi_entry_trades_list, total_adjustments
def analyze_time_intervals(trades):
"""分析时间间隔"""
time_intervals = []
detailed_intervals = []
for trade in trades:
entries = trade.get('entries', [])
pair = trade.get('pair', 'Unknown')
if len(entries) <= 1:
continue
entry_times = []
for entry in entries:
timestamp = entry.get('timestamp')
if timestamp is not None and timestamp != 'N/A':
entry_times.append(timestamp)
# 计算时间间隔
for i in range(1, len(entry_times)):
if isinstance(entry_times[i], (int, float)) and isinstance(entry_times[i-1], (int, float)):
interval = abs(entry_times[i] - entry_times[i-1]) / 1000 # 转换为秒
time_intervals.append(interval)
# 记录详细信息
detailed_info = {
'pair': pair,
'interval_seconds': interval,
'interval_minutes': interval / 60,
'first_timestamp': entry_times[i-1],
'second_timestamp': entry_times[i]
}
detailed_intervals.append(detailed_info)
return time_intervals, detailed_intervals
def analyze_entry_timing():
"""分析入场订单的时间间隔"""
# 定义结果目录
result_dir = Path('../result')
# 首先尝试读取测试数据文件(如果存在)
test_json_file = result_dir / 'test_trades_with_entries.json'
json_file = result_dir / 'backtest_trades.json'
data_source = ""
if test_json_file.exists():
data_file = test_json_file
data_source = "测试数据"
elif json_file.exists():
data_file = json_file
data_source = "回测数据"
else:
print(f"❌ 文件不存在: {json_file}{test_json_file}")
return
# 读取JSON文件
with open(data_file, 'r', encoding='utf-8') as f:
data = json.load(f)
print(f"=== 入场订单时间分析 ({data_source}) ===")
print(f"总交易数: {len(data)}")
# 使用用户定义的规则分析重新入场行为
total_trades, multi_entry_trades, total_add_positions = analyze_reentry_behavior(data)
print(f"有多笔入场订单的交易数: {multi_entry_trades}")
print(f"总的加仓次数: {total_add_positions}")
# 根据用户规则分析结果
print("\n🔍 加仓行为分析 (基于用户定义规则):")
if total_add_positions == 0:
print(" 当前数据中没有发现加仓操作")
print(" 规则说明:")
print(" 1. 无明确出场记录则视为彻底出场,后续入场为初次进场")
print(" 2. 初次进场后的所有进场视为加仓")
print(" 3. 出现一次出场后,后续入场为初次入场")
print(" 4. 无任何入场出场记录时,第一次入场为初次入场")
print("\n 当前所有交易都只有初始入场订单,没有加仓操作")
print(" 所有交易的开仓和平仓时间相同")
else:
print(f" 发现 {total_add_positions} 次加仓操作")
print(f"{multi_entry_trades} 笔交易中存在加仓行为")
# 获取有多笔入场订单的交易列表
multi_entry_trades_list, total_adjustments = analyze_multi_entry_trades(data)
if len(multi_entry_trades_list) == 0:
print("\n📊 详细分析:")
print(" 当前数据中没有发现加仓操作(每笔交易只有初始入场订单)")
print(" 所有交易的开仓和平仓时间相同")
return
# 分析时间间隔
print("\n=== 时间间隔分析 ===")
time_intervals, detailed_intervals = analyze_time_intervals(multi_entry_trades_list)
# 显示每笔交易的详细信息
for trade in multi_entry_trades_list:
entries = trade.get('entries', [])
pair = trade.get('pair', 'Unknown')
print(f"\n交易对: {pair}")
entry_times = []
for i, entry in enumerate(entries):
timestamp = entry.get('timestamp')
entry_times.append(timestamp)
dt_str = timestamp_to_datetime(timestamp)
order_type = entry.get('order_type', 'unknown')
print(f" 订单 {i+1} ({order_type}): {dt_str} ({timestamp})")
# 计算时间间隔
for i in range(1, len(entry_times)):
if isinstance(entry_times[i], (int, float)) and isinstance(entry_times[i-1], (int, float)):
interval = abs(entry_times[i] - entry_times[i-1]) / 1000 # 转换为秒
print(f" 与前一订单时间间隔: {interval:.0f} 秒 ({interval/60:.1f} 分钟)")
# 统计时间间隔
if time_intervals:
print(f"\n=== 时间间隔统计 ===")
print(f"平均时间间隔: {sum(time_intervals)/len(time_intervals):.0f} 秒 ({sum(time_intervals)/len(time_intervals)/60:.1f} 分钟)")
print(f"最短时间间隔: {min(time_intervals):.0f} 秒 ({min(time_intervals)/60:.1f} 分钟)")
print(f"最长时间间隔: {max(time_intervals):.0f} 秒 ({max(time_intervals)/60:.1f} 分钟)")
# 显示最长和最短时间间隔的详细信息
if detailed_intervals:
# 找到最短时间间隔
shortest = min(detailed_intervals, key=lambda x: x['interval_seconds'])
print(f"\n⏱️ 最短时间间隔详情:")
print(f" 交易对: {shortest['pair']}")
print(f" 时间间隔: {shortest['interval_seconds']:.0f} 秒 ({shortest['interval_minutes']:.1f} 分钟)")
print(f" 时间1: {timestamp_to_datetime(shortest['first_timestamp'])}")
print(f" 时间2: {timestamp_to_datetime(shortest['second_timestamp'])}")
# 找到最长时间间隔
longest = max(detailed_intervals, key=lambda x: x['interval_seconds'])
print(f"\n⏱️ 最长时间间隔详情:")
print(f" 交易对: {longest['pair']}")
print(f" 时间间隔: {longest['interval_seconds']:.0f} 秒 ({longest['interval_minutes']:.1f} 分钟)")
print(f" 时间1: {timestamp_to_datetime(longest['first_timestamp'])}")
print(f" 时间2: {timestamp_to_datetime(longest['second_timestamp'])}")
# 分析原始数据中的时间信息
print("\n=== 原始数据时间信息 ===")
original_file = None
for f in result_dir.glob('backtest-result-*.json'):
original_file = f
break
if original_file and original_file.exists():
with open(original_file, 'r', encoding='utf-8') as f:
original_data = json.load(f)
strategy_name = os.environ.get('STRATEGY_NAME', 'FreqaiPrimer')
if 'strategy' in original_data and strategy_name in original_data['strategy'] and 'trades' in original_data['strategy'][strategy_name]:
original_trades = original_data['strategy'][strategy_name]['trades']
print(f"原始交易数: {len(original_trades)}")
# 检查前几个交易的时间信息
for i in range(min(3, len(original_trades))):
trade = original_trades[i]
pair = trade.get('pair', 'Unknown')
open_date = trade.get('open_date', 'N/A')
close_date = trade.get('close_date', 'N/A')
print(f"\n原始交易 {i+1} ({pair}):")
print(f" 开仓时间: {open_date}")
print(f" 平仓时间: {close_date}")
# 检查订单信息
orders = trade.get('orders', [])
print(f" 订单数: {len(orders)}")
entry_orders = [order for order in orders if order.get('ft_is_entry', False)]
exit_orders = [order for order in orders if not order.get('ft_is_entry', True)]
print(f" 入场订单数: {len(entry_orders)}")
print(f" 出场订单数: {len(exit_orders)}")
for j, order in enumerate(orders):
order_type = "入场" if order.get('ft_is_entry') else "出场"
timestamp = order.get('order_filled_timestamp', 'N/A')
dt_str = timestamp_to_datetime(timestamp) if timestamp != 'N/A' else 'N/A'
print(f" 订单 {j+1} ({order_type}): {dt_str}")
if __name__ == "__main__":
analyze_entry_timing()

View File

@@ -1,122 +0,0 @@
import json
from datetime import datetime
# Read the fixed trade data
with open('../result/backtest_trades_fixed.json', 'r') as f:
trades = json.load(f)
def timestamp_to_datetime(ts):
"""将时间戳转换为datetime对象"""
if not ts:
return None
try:
return datetime.fromisoformat(ts)
except Exception:
try:
# If numeric, assume a millisecond timestamp
return datetime.fromtimestamp(int(ts)/1000)
except Exception:
return None
def analyze_position_timing(trades):
"""分析加仓时间间隔"""
print("=" * 60)
print("加仓时间间隔分析")
print("=" * 60)
total_trades = len(trades)
multi_entry_trades = [t for t in trades if t.get('entry_orders_count', 0) > 1]
print(f"总交易数: {total_trades}")
print(f"有多笔入场订单的交易数: {len(multi_entry_trades)}")
print(f"总加仓次数: {sum(t.get('adjustments_count', 0) for t in trades)}")
print()
# 详细分析每笔有多笔入场订单的交易
for i, trade in enumerate(multi_entry_trades):
pair = trade.get('pair', 'Unknown')
print(f"交易 #{i+1}: {pair}")
print(f" 入场订单数: {trade.get('entry_orders_count', 0)}")
print(f" 加仓次数: {trade.get('adjustments_count', 0)}")
entries = trade.get('entries', [])
if entries:
print(" 入场订单详情:")
for j, entry in enumerate(entries):
order_type = entry.get('order_type', 'unknown')
timestamp = entry.get('timestamp', '')
price = entry.get('price', 0)
amount = entry.get('amount', 0)
dt = timestamp_to_datetime(timestamp)
time_str = dt.strftime('%Y-%m-%d %H:%M:%S') if dt else 'N/A'
print(f" 订单 {j+1} ({order_type}): 时间={time_str}, 价格={price}, 数量={amount}")
# 计算加仓时间间隔
if len(entries) > 1:
print(" 加仓时间间隔:")
for j in range(1, len(entries)):
prev_dt = timestamp_to_datetime(entries[j-1].get('timestamp', ''))
curr_dt = timestamp_to_datetime(entries[j].get('timestamp', ''))
if prev_dt and curr_dt:
interval = curr_dt - prev_dt
interval_seconds = interval.total_seconds()
interval_minutes = interval_seconds / 60
interval_hours = interval_minutes / 60
print(f" 从订单 {j} 到订单 {j+1}: {interval_seconds}秒 ({interval_minutes:.1f}分钟, {interval_hours:.2f}小时)")
print()
# 执行分析
analyze_position_timing(trades)
# 统计加仓时间间隔分布
def analyze_interval_distribution(trades):
print("=" * 60)
print("加仓时间间隔分布统计")
print("=" * 60)
intervals = [] # 存储所有加仓间隔(秒)
for trade in trades:
entries = trade.get('entries', [])
if len(entries) > 1:
for j in range(1, len(entries)):
prev_dt = timestamp_to_datetime(entries[j-1].get('timestamp', ''))
curr_dt = timestamp_to_datetime(entries[j].get('timestamp', ''))
if prev_dt and curr_dt:
interval_seconds = (curr_dt - prev_dt).total_seconds()
intervals.append(interval_seconds)
if intervals:
print(f"总加仓间隔数: {len(intervals)}")
print(f"最短间隔: {min(intervals)}秒 ({min(intervals)/60:.1f}分钟)")
print(f"最长间隔: {max(intervals)}秒 ({max(intervals)/60:.1f}分钟)")
print(f"平均间隔: {sum(intervals)/len(intervals):.1f}秒 ({sum(intervals)/len(intervals)/60:.1f}分钟)")
# 按时间范围分类
ranges = [
(0, 300, "0-5分钟"),
(300, 1800, "5分钟-30分钟"),
(1800, 3600, "30分钟-1小时"),
(3600, 7200, "1-2小时"),
(7200, 14400, "2-4小时"),
(14400, 28800, "4-8小时"),
(28800, 86400, "8小时-1天"),
(86400, float('inf'), "超过1天")
]
print("\n时间间隔分布:")
for min_sec, max_sec, label in ranges:
count = sum(1 for i in intervals if min_sec <= i < max_sec)
percentage = count / len(intervals) * 100
print(f" {label}: {count}次 ({percentage:.1f}%)")
else:
print("没有找到加仓间隔数据")
# 执行间隔分布分析
analyze_interval_distribution(trades)

View File

@@ -1,39 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Analyse trades with multiple entry orders in the test data.
"""
import json
from pathlib import Path
def analyze_test_data():
"""分析测试数据"""
# 定义结果目录
result_dir = Path('../result')
json_file = result_dir / 'test_trades_with_entries.json'
if not json_file.exists():
print(f"❌ 文件不存在: {json_file}")
return
# 读取JSON文件
with open(json_file, 'r', encoding='utf-8') as f:
data = json.load(f)
print("=== 测试数据中的多笔入场订单交易 ===")
print(f"总交易数: {len(data)}")
for i, trade in enumerate(data):
entries = trade.get('entries', [])
pair = trade.get('pair', 'Unknown')
print(f"\n交易 {i+1}: {pair}")
print(f" 总入场订单数: {len(entries)}")
for j, entry in enumerate(entries):
order_type = entry.get('order_type', 'unknown')
timestamp = entry.get('timestamp', 'N/A')
print(f" 订单 {j+1} ({order_type}): {timestamp}")
if __name__ == "__main__":
analyze_test_data()

View File

@@ -1,121 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Detailed script for analysing add-on (position-adjustment) operations according to the user-defined rules:
1. If there is no explicit exit record, the position is treated as fully closed, and later entries count as initial entries.
2. Every entry after the initial entry is treated as an add-on.
3. After an exit appears, the next entry is again an initial entry.
4. If there are no entry/exit records at all, the first entry is the initial entry.
"""
import json
from pathlib import Path
from datetime import datetime
def timestamp_to_datetime(timestamp):
"""将毫秒级时间戳转换为可读的日期时间"""
if isinstance(timestamp, (int, float)) and timestamp > 1000000000000:
# millisecond timestamp
return datetime.fromtimestamp(timestamp/1000).strftime('%Y-%m-%d %H:%M:%S')
return str(timestamp)
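# Illustrative example (assumed value): 1704067200000 has 13 digits, so it is
# treated as milliseconds and corresponds to 2024-01-01 00:00:00 UTC; note that
# datetime.fromtimestamp() renders it in the local timezone.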
def analyze_user_defined_positions(trades):
"""
Analyse add-on operations according to the user-defined rules.
"""
print("=== 用户定义加仓规则分析 ===")
print("规则说明:")
print(" 1. 无明确出场记录则视为彻底出场,后续入场为初次进场")
print(" 2. 初次进场后的所有进场视为加仓")
print(" 3. 出现一次出场后,后续入场为初次入场")
print(" 4. 无任何入场出场记录时,第一次入场为初次入场")
print(f"\n总交易数: {len(trades)}")
# 统计信息
multi_entry_trades = 0 # 有多笔入场订单的交易数
total_add_positions = 0 # 总加仓次数
detailed_analysis = [] # 详细分析结果
for i, trade in enumerate(trades):
entries = trade.get('entries', [])
pair = trade.get('pair', f'Trade_{i+1}')
# 如果只有一笔入场订单,则没有加仓
if len(entries) <= 1:
continue
# 有多笔入场订单的交易
multi_entry_trades += 1
add_positions_in_trade = len(entries) - 1 # 第一笔是初次入场,其余都是加仓
total_add_positions += add_positions_in_trade
# 记录详细信息
trade_info = {
'pair': pair,
'total_entries': len(entries),
'add_positions': add_positions_in_trade,
'entries': entries
}
detailed_analysis.append(trade_info)
print(f"\n有多笔入场订单的交易数: {multi_entry_trades}")
print(f"总的加仓次数: {total_add_positions}")
# 详细分析有多笔入场订单的交易
if detailed_analysis:
print("\n=== 详细加仓分析 ===")
for trade_info in detailed_analysis:
print(f"\n交易对: {trade_info['pair']}")
print(f" 总入场订单数: {trade_info['total_entries']}")
print(f" 加仓次数: {trade_info['add_positions']}")
# 分析每笔订单
entries = trade_info['entries']
for j, entry in enumerate(entries):
order_type = entry.get('order_type', 'unknown')
timestamp = entry.get('timestamp', 'N/A')
dt_str = timestamp_to_datetime(timestamp)
if j == 0:
position_type = "初次入场"
else:
position_type = "加仓"
print(f" 订单 {j+1} ({order_type}): {dt_str} ({timestamp}) - {position_type}")
else:
print("\n🔍 分析结果:")
print(" 当前数据中没有发现加仓操作")
print(" 所有交易都只有初始入场订单")
print(" 所有交易的开仓和平仓时间相同")
def main():
"""主函数"""
# 定义结果目录
result_dir = Path('../result')
# 首先尝试读取测试数据文件(如果存在)
test_json_file = result_dir / 'test_trades_with_entries.json'
json_file = result_dir / 'backtest_trades.json'
data_source = ""
if test_json_file.exists():
data_file = test_json_file
data_source = "测试数据"
elif json_file.exists():
data_file = json_file
data_source = "回测数据"
else:
print(f"❌ 文件不存在: {json_file}{test_json_file}")
return
# 读取JSON文件
with open(data_file, 'r', encoding='utf-8') as f:
data = json.load(f)
print(f"=== 数据源: {data_source} ===")
analyze_user_defined_positions(data)
if __name__ == "__main__":
main()

View File

@@ -1,275 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Comprehensive add-on (position-adjustment) analysis script.
Automatically handles different data sources, including test data and backtest data.
"""
import json
from pathlib import Path
from datetime import datetime
def timestamp_to_datetime(timestamp):
"""将毫秒级时间戳转换为可读的日期时间"""
if isinstance(timestamp, (int, float)) and timestamp > 1000000000000:
# millisecond timestamp
return datetime.fromtimestamp(timestamp/1000).strftime('%Y-%m-%d %H:%M:%S')
elif isinstance(timestamp, str):
return timestamp
return str(timestamp)
def analyze_user_defined_positions(trades, data_source=""):
"""
Analyse add-on operations according to the user-defined rules:
1. If there is no explicit exit record, the position is treated as fully closed, and later entries count as initial entries.
2. Every entry after the initial entry is treated as an add-on.
3. After an exit appears, the next entry is again an initial entry.
4. If there are no entry/exit records at all, the first entry is the initial entry.
"""
print("=" * 50)
if data_source:
print(f"数据源: {data_source}")
print("用户定义加仓规则分析")
print("=" * 50)
print("规则说明:")
print(" 1. 无明确出场记录则视为彻底出场,后续入场为初次进场")
print(" 2. 初次进场后的所有进场视为加仓")
print(" 3. 出现一次出场后,后续入场为初次入场")
print(" 4. 无任何入场出场记录时,第一次入场为初次入场")
print(f"\n📊 总交易数: {len(trades)}")
# 统计信息
multi_entry_trades = 0 # 有多笔入场订单的交易数
total_add_positions = 0 # 总加仓次数
detailed_analysis = [] # 详细分析结果
time_intervals = [] # 时间间隔列表
for i, trade in enumerate(trades):
entries = trade.get('entries', [])
pair = trade.get('pair', f'Trade_{i+1}')
# 如果只有一笔入场订单,则没有加仓
if len(entries) <= 1:
continue
# 有多笔入场订单的交易
multi_entry_trades += 1
add_positions_in_trade = len(entries) - 1 # 第一笔是初次入场,其余都是加仓
total_add_positions += add_positions_in_trade
# 分析时间间隔
entry_timestamps = []
valid_entries = []
for entry in entries:
timestamp = entry.get('timestamp')
if timestamp is not None and timestamp != 'N/A':
entry_timestamps.append(timestamp)
valid_entries.append(entry)
# 计算时间间隔
intervals = []
for j in range(1, len(entry_timestamps)):
if isinstance(entry_timestamps[j], (int, float)) and isinstance(entry_timestamps[j-1], (int, float)):
interval = abs(entry_timestamps[j] - entry_timestamps[j-1]) / 1000 # 转换为秒
intervals.append(interval)
time_intervals.append(interval)
# 记录详细信息
trade_info = {
'pair': pair,
'total_entries': len(entries),
'valid_entries': len(valid_entries),
'add_positions': add_positions_in_trade,
'entries': entries,
'intervals': intervals,
'entry_timestamps': entry_timestamps
}
detailed_analysis.append(trade_info)
print(f"\n📈 有多笔入场订单的交易数: {multi_entry_trades}")
print(f"💰 总的加仓次数: {total_add_positions}")
# 详细分析有多笔入场订单的交易
if detailed_analysis:
print("\n🔍 详细加仓分析")
print("-" * 30)
for trade_info in detailed_analysis:
print(f"\n交易对: {trade_info['pair']}")
print(f" 总入场订单数: {trade_info['total_entries']}")
print(f" 有效时间戳订单数: {trade_info['valid_entries']}")
print(f" 加仓次数: {trade_info['add_positions']}")
# 分析每笔订单
entries = trade_info['entries']
entry_timestamps = trade_info['entry_timestamps']
for j, entry in enumerate(entries):
order_type = entry.get('order_type', 'unknown')
timestamp = entry.get('timestamp', 'N/A')
dt_str = timestamp_to_datetime(timestamp)
if j == 0:
position_type = "初次入场"
else:
position_type = "加仓"
print(f" 订单 {j+1} ({order_type}): {dt_str} ({timestamp}) - {position_type}")
# 显示时间间隔
intervals = trade_info['intervals']
if intervals:
print(f" ⏱️ 时间间隔分析:")
for k, interval in enumerate(intervals):
print(f" 订单 {k+2} 与订单 {k+1} 的间隔: {interval:.0f} 秒 ({interval/60:.1f} 分钟)")
# 时间间隔统计
if time_intervals:
print(f"\n⏱️ 时间间隔统计")
print("-" * 20)
avg_seconds = sum(time_intervals)/len(time_intervals)
print(f"平均时间间隔: {avg_seconds:.0f} 秒 ({avg_seconds/60:.1f} 分钟)")
min_seconds = min(time_intervals)
print(f"最短时间间隔: {min_seconds:.0f} 秒 ({min_seconds/60:.1f} 分钟)")
max_seconds = max(time_intervals)
print(f"最长时间间隔: {max_seconds:.0f} 秒 ({max_seconds/60:.1f} 分钟)")
# 找到最短和最长间隔的交易
if detailed_analysis:
# 找到包含最短时间间隔的交易
min_interval_trade = None
min_interval_index = -1
for trade_info in detailed_analysis:
intervals = trade_info['intervals']
if intervals and min(intervals) == min(time_intervals):
min_interval_trade = trade_info
min_interval_index = intervals.index(min(intervals))
break
if min_interval_trade:
print(f"\n🔥 最短时间间隔详情:")
print(f" 交易对: {min_interval_trade['pair']}")
print(f" 时间间隔: {min(time_intervals):.0f} 秒 ({min(time_intervals)/60:.1f} 分钟)")
else:
print(f"\n📝 时间戳信息:")
print("-" * 15)
if detailed_analysis:
print(" 发现交易包含多笔入场订单,但时间戳信息不可用")
print(" 无法进行时间间隔分析")
else:
print(" 当前数据中没有发现加仓操作")
print(" 所有交易都只有初始入场订单")
print(" 所有交易的开仓和平仓时间相同")
def load_trades_data(result_dir):
"""加载交易数据"""
# 按优先级查找数据文件
data_files = [
('测试数据', result_dir / 'test_trades_with_entries.json'),
('回测数据', result_dir / 'backtest_trades.json')
]
data_source = ""
data_file = None
for source, file_path in data_files:
if file_path.exists():
data_source = source
data_file = file_path
break
if not data_file:
print(f"❌ 未找到数据文件")
# 尝试查找其他可能的文件
for file_path in result_dir.glob('*.json'):
if 'trade' in file_path.name.lower():
data_file = file_path
data_source = "未知数据"
break
if not data_file or not data_file.exists():
print(f"❌ 文件不存在")
return None, ""
# 读取JSON文件
try:
with open(data_file, 'r', encoding='utf-8') as f:
data = json.load(f)
return data, f"{data_source} ({data_file.name})"
except Exception as e:
print(f"❌ 读取文件失败: {e}")
return None, ""
def analyze_sample_data():
"""分析示例数据"""
print("🧪 示例数据分析")
print("=" * 30)
# 创建示例数据
sample_trades = [
{
"pair": "BTC/USDT",
"entries": [
{
"order_index": 0,
"timestamp": 1704067200000, # 2024-01-01 00:00:00
"order_type": "initial"
},
{
"order_index": 1,
"timestamp": 1704067500000, # 2024-01-01 00:05:00
"order_type": "adjustment"
},
{
"order_index": 2,
"timestamp": 1704067800000, # 2024-01-01 00:10:00
"order_type": "adjustment"
}
]
},
{
"pair": "ETH/USDT",
"entries": [
{
"order_index": 0,
"timestamp": 1704153600000, # 2024-01-02 00:00:00
"order_type": "initial"
},
{
"order_index": 1,
"timestamp": 1704154200000, # 2024-01-02 00:10:00
"order_type": "adjustment"
}
]
}
]
analyze_user_defined_positions(sample_trades, "示例数据")
def main():
"""主函数"""
print("🚀 综合加仓分析工具")
print("=" * 50)
# 获取项目根目录
project_root = Path(__file__).parent.parent
result_dir = project_root / 'result'
# 首先分析示例数据
analyze_sample_data()
print("\n" + "=" * 50)
print("📂 实际数据分析")
print("=" * 50)
# 加载并分析实际数据
data, data_source = load_trades_data(result_dir)
if data is not None:
analyze_user_defined_positions(data, data_source)
else:
print("无法加载交易数据")
if __name__ == "__main__":
main()

View File

@@ -1,6 +0,0 @@
BTC/USDT
ETH/USDT
SOL/USDT
SUI/USDT
DOT/USDT
PEPE/USDT

View File

@@ -1,48 +0,0 @@
#!/bin/bash
# Run the tradestocsv.py script
echo "🔄 Converting backtest results..."
python tradestocsv.py
# Check the script's exit status
if [ $? -eq 0 ]; then
echo "✅ 转换完成"
# 显示生成的文件信息
echo -e "\n📄 生成的文件:"
ls -lh ../result/backtest_trades.*
# 显示CSV文件的前几行
echo -e "\n📋 CSV文件前5行:"
head -5 ../result/backtest_trades.csv
# 运行测试脚本
echo -e "\n🔍 正在验证文件格式..."
python test_tradestocsv.py
# 额外检查时间戳信息
echo -e "\n🕒 检查时间戳信息:"
python -c "
import json
with open('../result/backtest_trades.json', 'r') as f:
data = json.load(f)
print('Timestamp info for the first 3 trades:')
for i in range(min(3, len(data))):
trade = data[i]
pair = trade['pair']
entries = trade.get('entries', [])
if entries:
timestamp = entries[0]['timestamp']
print(f' {pair}: {timestamp} ({type(timestamp).__name__})')
else:
print(f' {pair}: no entry orders')
"
# Run the add-on timing analysis script
echo -e "\n⏰ Analysing entry timing..."
python analyze_entry_timing.py
else
echo "❌ 转换失败"
exit 1
fi

View File

@@ -1,453 +0,0 @@
#!/bin/bash
# Path to the result file
RESULT_FILE="../result/backtest_trades_fixed.json"
# 使用绝对路径以确保正确访问
# RESULT_FILE="/Users/zhangkun/myTestFreqAI/result/test_trades_with_entries.json"
# RESULT_FILE="/Users/zhangkun/myTestFreqAI/test_trades.json" # 绝对路径新测试文件
# Check that the file exists
if [ ! -f "$RESULT_FILE" ]; then
echo "Error: file $RESULT_FILE not found"
echo "Make sure a backtest has been run and the trade data file has been generated"
exit 1
fi
# Check that jq is installed
if ! command -v jq &> /dev/null; then
echo "Error: jq command not found. Please install jq to process JSON data"
echo "Ubuntu/Debian: sudo apt-get install jq"
echo "CentOS/RHEL: sudo yum install jq"
exit 1
fi
echo "# 交易持仓分析"
echo ""
echo "## 交易概览"
echo ""
echo "| 交易ID | 交易对 | 第1次加仓下调% | 第2次加仓下调% | 第3次加仓下调% |"
echo "|--------|----------|----------------|----------------|----------------|"
# Use jq to process the JSON data, extract each trade's entries array, and compute the add-on price drop percentages
trade_id=0
total_trades=0
# 处理每个交易
while IFS= read -r trade; do
trade_id=$((trade_id + 1))
pair=$(echo "$trade" | jq -r '.pair')
# 初始化变量
adj1="-"
adj2="-"
adj3="-"
entry_data=()
prices=()
amounts=()
# 从entries数组获取数据使用更简单的语法
entries_count=$(echo "$trade" | jq -r '.entries | length // 0')
# 尝试从open_date获取时间信息
open_date=$(echo "$trade" | jq -r '.open_date // "N/A"')
open_date=$(echo "$open_date" | tr -d '"')
# 处理entries数据
if [ "$entries_count" -gt 0 ]; then
entries_data=$(echo "$trade" | jq -r '.entries[] | [.order_index, .price, .amount, .order_type, .timestamp, .raw_timestamp] | @csv')
index=0
while IFS=, read -r idx price amount order_type timestamp raw_timestamp; do
# 移除可能的引号
idx=$(echo "$idx" | tr -d '"')
price=$(echo "$price" | tr -d '"')
amount=$(echo "$amount" | tr -d '"')
order_type=$(echo "$order_type" | tr -d '"')
timestamp=$(echo "$timestamp" | tr -d '"')
raw_timestamp=$(echo "$raw_timestamp" | tr -d '"')
# 处理时间信息:优先使用实际时间戳
datetime="N/A"
# 首先尝试使用raw_timestamp通常是毫秒级时间戳
if [ "$raw_timestamp" != "null" ] && [ -n "$raw_timestamp" ] && [[ "$raw_timestamp" =~ ^[0-9]+$ ]]; then
# 处理毫秒时间戳
if [ ${#raw_timestamp} -ge 13 ]; then
# 移除毫秒部分
seconds=${raw_timestamp:0:10}
datetime=$(date -u -r "$seconds" +"%Y-%m-%d %H:%M:%S" 2>/dev/null || date -u -d @"$seconds" +"%Y-%m-%d %H:%M:%S" 2>/dev/null || echo "时间戳转换失败")
else
datetime=$(date -u -r "$raw_timestamp" +"%Y-%m-%d %H:%M:%S" 2>/dev/null || date -u -d @"$raw_timestamp" +"%Y-%m-%d %H:%M:%S" 2>/dev/null || echo "时间戳转换失败")
fi
# 如果没有raw_timestamp尝试使用timestamp字段
elif [ "$timestamp" != "null" ] && [ -n "$timestamp" ]; then
if [[ "$timestamp" =~ ^[0-9]+$ ]]; then
# 处理时间戳
if [ ${#timestamp} -ge 13 ]; then
# 移除毫秒部分
seconds=${timestamp:0:10}
datetime=$(date -u -r "$seconds" +"%Y-%m-%d %H:%M:%S" 2>/dev/null || date -u -d @"$seconds" +"%Y-%m-%d %H:%M:%S" 2>/dev/null || echo "时间戳转换失败")
else
datetime=$(date -u -r "$timestamp" +"%Y-%m-%d %H:%M:%S" 2>/dev/null || date -u -d @"$timestamp" +"%Y-%m-%d %H:%M:%S" 2>/dev/null || echo "时间戳转换失败")
fi
elif [[ "$timestamp" =~ ^[0-9]{4}-[0-9]{2}-[0-9]{2} ]]; then
# 直接使用ISO格式的日期
datetime="$timestamp"
fi
# 如果都没有使用基于open_date的偏移量作为最后选择
elif [ "$open_date" != "N/A" ]; then
if [ "$index" -eq 0 ] || [ "$order_type" = "initial" ]; then
# 初始入场使用原始open_date
datetime=$open_date
else
# 加仓使用基于open_date添加偏移量的时间
if [[ "$open_date" =~ ^([0-9]{4})-([0-9]{2})-([0-9]{2})T([0-9]{2}):([0-9]{2}):([0-9]{2})Z$ ]]; then
# 解析ISO格式日期
year=${BASH_REMATCH[1]}
month=${BASH_REMATCH[2]}
day=${BASH_REMATCH[3]}
hour=${BASH_REMATCH[4]}
minute=${BASH_REMATCH[5]}
second=${BASH_REMATCH[6]}
# 为每个加仓添加不同的时间偏移(分钟)
adjusted_minute=$((minute + index * 5))
# 处理分钟溢出
if [ "$adjusted_minute" -ge 60 ]; then
additional_hour=$((adjusted_minute / 60))
adjusted_minute=$((adjusted_minute % 60))
adjusted_hour=$((hour + additional_hour))
# 处理小时溢出
if [ "$adjusted_hour" -ge 24 ]; then
adjusted_hour=$((adjusted_hour % 24))
fi
else
adjusted_hour=$hour
fi
# 格式化时间,确保分钟数两位数显示
datetime="${year}-${month}-${day}T$(printf '%02d' ${adjusted_hour}):$(printf '%02d' ${adjusted_minute}):${second}Z"
else
# 如果不是ISO格式在时间后面添加索引
datetime="${open_date}_${index}"
fi
fi
fi
# 确保价格和数量是有效的数字
if [[ "$price" =~ ^[0-9]+\.?[0-9]*$ && "$amount" =~ ^[0-9]+\.?[0-9]*$ ]]; then
entry_data+=("$index,$price,$amount,$datetime")
prices+=($price)
amounts+=($amount)
fi
index=$((index + 1))
done <<< "$entries_data"
else
# 如果没有entries数据尝试从orders数组获取改进为循环遍历每个订单
has_orders=$(echo "$trade" | jq -e '.orders' > /dev/null 2>&1 && echo 1 || echo 0)
if [ "$has_orders" -eq 1 ]; then
# 获取所有入场订单,保持原始顺序
entry_orders=$(echo "$trade" | jq -r '.orders[] | select(.ft_is_entry == true) | @json')
index=0
# 逐行处理每个入场订单
while IFS= read -r order_json; do
if [ -n "$order_json" ]; then
# 提取订单信息
amount=$(echo "$order_json" | jq -r '.amount // 0')
price=$(echo "$order_json" | jq -r '.safe_price // .price // 0')
timestamp=$(echo "$order_json" | jq -r '.order_filled_timestamp // .timestamp // null')
# 将时间戳转换为可读格式
datetime="N/A"
if [ "$timestamp" != "null" ] && [[ "$timestamp" =~ ^[0-9]+$ ]]; then
# 处理毫秒时间戳
if [ ${#timestamp} -ge 13 ]; then
# 移除毫秒部分
seconds=${timestamp:0:10}
datetime=$(date -u -r "$seconds" +"%Y-%m-%d %H:%M:%S" 2>/dev/null || echo "$timestamp")
else
datetime=$(date -u -r "$timestamp" +"%Y-%m-%d %H:%M:%S" 2>/dev/null || echo "$timestamp")
fi
elif [ "$timestamp" != "null" ] && [ -n "$timestamp" ] && [[ "$timestamp" =~ ^[0-9]{4}-[0-9]{2}-[0-9]{2} ]]; then
# 直接使用ISO格式的日期
datetime="$timestamp"
else
# 如果没有订单时间使用open_date并为每个加仓添加不同的时间偏移
if [ "$open_date" != "N/A" ]; then
# 为每个订单添加时间偏移,确保显示不同时间
if [ "$index" -eq 0 ]; then
# 初始入场保持原始时间
datetime="$open_date"
else
# 为加仓添加时间偏移
if [[ "$open_date" =~ ^([0-9]{4})-([0-9]{2})-([0-9]{2})T([0-9]{2}):([0-9]{2}):([0-9]{2})Z$ ]]; then
# 解析ISO格式日期
year=${BASH_REMATCH[1]}
month=${BASH_REMATCH[2]}
day=${BASH_REMATCH[3]}
hour=${BASH_REMATCH[4]}
minute=${BASH_REMATCH[5]}
second=${BASH_REMATCH[6]}
# 为每个加仓添加不同的时间偏移(分钟)
adjusted_minute=$((minute + index * 5))
# 处理分钟溢出
if [ "$adjusted_minute" -ge 60 ]; then
additional_hour=$((adjusted_minute / 60))
adjusted_minute=$((adjusted_minute % 60))
adjusted_hour=$((hour + additional_hour))
# 处理小时溢出
if [ "$adjusted_hour" -ge 24 ]; then
adjusted_hour=$((adjusted_hour % 24))
fi
else
adjusted_hour=$hour
fi
# 格式化时间,确保两位数显示
datetime="${year}-${month}-${day}T$(printf '%02d' ${adjusted_hour}):$(printf '%02d' ${adjusted_minute}):${second}Z"
else
# 非ISO格式添加索引
datetime="${open_date}_${index}"
fi
fi
fi
fi
# 确保价格和数量是有效的数字
if [[ "$price" =~ ^[0-9]+\.?[0-9]*$ && "$amount" =~ ^[0-9]+\.?[0-9]*$ ]]; then
entry_data+=("$index,$price,$amount,$datetime")
prices+=($price)
amounts+=($amount)
fi
index=$((index + 1))
fi
done <<< "$entry_orders"
fi
fi
# Compute the price drop percentages
# 1st add-on relative to the initial entry
if [ ${#prices[@]} -ge 2 ]; then
initial_price=${prices[0]}
adj1_price=${prices[1]}
# Use awk instead of bc to avoid syntax errors
adj1_pct=$(awk -v initial="$initial_price" -v adj1="$adj1_price" 'BEGIN {printf "%.4f", ((initial - adj1) / initial) * 100}')
adj1=$(printf "%.2f%%" $adj1_pct)
fi
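# Illustrative example (assumed prices, not from the data): initial_price=100 and
# adj1_price=95 give ((100 - 95) / 100) * 100 = 5.0000, formatted as "5.00%".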
# 2nd add-on relative to the 1st add-on
if [ ${#prices[@]} -ge 3 ]; then
adj2_price=${prices[2]}
adj2_pct=$(awk -v prev="${prices[1]}" -v curr="$adj2_price" 'BEGIN {printf "%.4f", ((prev - curr) / prev) * 100}')
adj2=$(printf "%.2f%%" $adj2_pct)
fi
# 3rd add-on relative to the 2nd add-on
if [ ${#prices[@]} -ge 4 ]; then
adj3_price=${prices[3]}
adj3_pct=$(awk -v prev="${prices[2]}" -v curr="$adj3_price" 'BEGIN {printf "%.4f", ((prev - curr) / prev) * 100}')
adj3=$(printf "%.2f%%" $adj3_pct)
fi
# 先打印交易概览行
if [ ${#entry_data[@]} -eq 0 ]; then
# 如果没有详细数据,使用基本信息
printf "| %-6d | %-10s | %-16s | %-16s | %-16s |\n" "$trade_id" "$pair" "-" "-" "-"
else
printf "| %-6d | %-10s | %-16s | %-16s | %-16s |\n" "$trade_id" "$pair" "$adj1" "$adj2" "$adj3"
fi
# 总是打印持仓详情表格,即使只有初始入场
total_trades=$((total_trades + 1))
echo ""
echo "### 交易 $trade_id ($pair) 持仓详情"
echo ""
echo "| 操作类型 | 入场价格 | 买入数量 | 平均成本价 | 价格下调% | 成本降幅 | 时间 |"
echo "|----------|----------|----------|------------|-----------|----------|------|"
# 计算累计持仓和平均成本
total_amount=0
total_cost=0
prev_price=0
# 遍历所有入场操作,包括初始入场和加仓
for entry in "${entry_data[@]}"; do
IFS=',' read -r index price amount datetime <<< "$entry"
# 计算当前订单成本
cost=$(awk -v price="$price" -v amount="$amount" 'BEGIN {printf "%.6f", price * amount}')
# 累计总数量和总成本
total_amount=$(awk -v total="$total_amount" -v add="$amount" 'BEGIN {printf "%.6f", total + add}')
total_cost=$(awk -v total="$total_cost" -v add="$cost" 'BEGIN {printf "%.6f", total + add}')
# 计算平均成本价
avg_price=$(awk -v cost="$total_cost" -v amount="$total_amount" 'BEGIN {printf "%.6f", cost / amount}')
# 计算价格下调百分比(相对于前一次入场)
price_change="-"
if [ "$index" -gt 0 ] && [ "$prev_price" != "0" ]; then
price_change=$(awk -v prev="$prev_price" -v curr="$price" 'BEGIN {printf "%.4f", ((prev - curr) / prev) * 100}')
price_change=$(printf "%.2f%%" $price_change)
fi
# 计算成本降幅(相对于初始成本)
cost_reduction="-"
if [ "$index" -gt 0 ]; then
initial_cost=$(awk -v price="${prices[0]}" -v amount="$amount" 'BEGIN {printf "%.6f", price * amount}')
cost_reduction=$(awk -v initial="$initial_cost" -v current="$cost" 'BEGIN {printf "%.4f", ((initial - current) / initial) * 100}')
cost_reduction=$(printf "%.2f%%" $cost_reduction)
fi
# 确定操作类型
if [ "$index" -eq 0 ]; then
operation="初始入场"
else
operation="${index}次加仓"
fi
# 打印表格行
# 确保时间戳转换为可读格式
readable_datetime="$datetime"
if [[ "$datetime" =~ ^[0-9]{13}$ ]]; then
# 毫秒时间戳
seconds=${datetime:0:10}
readable_datetime=$(date -u -r "$seconds" +"%Y-%m-%d %H:%M:%S" 2>/dev/null || date -u -d @"$seconds" +"%Y-%m-%d %H:%M:%S" 2>/dev/null || echo "时间戳转换失败")
elif [[ "$datetime" =~ ^[0-9]{10}$ ]]; then
# 秒时间戳
readable_datetime=$(date -u -r "$datetime" +"%Y-%m-%d %H:%M:%S" 2>/dev/null || date -u -d @"$datetime" +"%Y-%m-%d %H:%M:%S" 2>/dev/null || echo "时间戳转换失败")
fi
printf "| %-8s | %-8.6f | %-8.6f | %-10.6f | %-9s | %-8s | %s |\n" \
"$operation" "$price" "$amount" "$avg_price" "$price_change" "$cost_reduction" "$readable_datetime"
# 更新前一次价格
prev_price=$price
done
echo ""
done <<< "$(jq -c '.[]?' "$RESULT_FILE")"
echo ""
echo "## 持仓调整分析"
echo ""
echo "总交易数: $total_trades"
echo ""
echo "### 加仓分析"
echo ""
# 统计有加仓的交易
trades_with_adjustments=0
total_adjustments=0
max_adjustments=0
while IFS= read -r trade; do
entries_count=$(echo "$trade" | jq -r '.entries | length // 0')
if [ "$entries_count" -gt 1 ]; then
trades_with_adjustments=$((trades_with_adjustments + 1))
adjustments_count=$((entries_count - 1))
total_adjustments=$((total_adjustments + adjustments_count))
if [ "$adjustments_count" -gt "$max_adjustments" ]; then
max_adjustments=$adjustments_count
fi
fi
done <<< "$(jq -c '.[]?' "$RESULT_FILE")"
echo "- 有加仓的交易数: $trades_with_adjustments"
echo "- 总加仓次数: $total_adjustments"
if [ "$trades_with_adjustments" -gt 0 ]; then
avg_adjustments=$(awk -v total="$total_adjustments" -v trades="$trades_with_adjustments" 'BEGIN {printf "%.2f", total / trades}')
echo "- 平均每笔交易加仓次数: $avg_adjustments"
fi
echo "- 最大单笔交易加仓次数: $max_adjustments"
echo ""
echo "### 时间间隔分析"
echo ""
# 分析加仓时间间隔
while IFS= read -r trade; do
pair=$(echo "$trade" | jq -r '.pair')
entries_count=$(echo "$trade" | jq -r '.entries | length // 0')
if [ "$entries_count" -gt 1 ]; then
echo "**$pair**:"
# 获取所有入场订单的时间戳
timestamps=()
entries_data=$(echo "$trade" | jq -r '.entries[] | [.order_index, .raw_timestamp, .timestamp] | @csv')
while IFS=, read -r idx raw_timestamp timestamp; do
# 移除引号
idx=$(echo "$idx" | tr -d '"')
raw_timestamp=$(echo "$raw_timestamp" | tr -d '"')
timestamp=$(echo "$timestamp" | tr -d '"')
# 优先使用raw_timestamp
if [ "$raw_timestamp" != "null" ] && [ -n "$raw_timestamp" ] && [[ "$raw_timestamp" =~ ^[0-9]+$ ]]; then
if [ ${#raw_timestamp} -ge 13 ]; then
# 移除毫秒部分
seconds=${raw_timestamp:0:10}
timestamps+=($seconds)
else
timestamps+=($raw_timestamp)
fi
elif [ "$timestamp" != "null" ] && [ -n "$timestamp" ] && [[ "$timestamp" =~ ^[0-9]+$ ]]; then
if [ ${#timestamp} -ge 13 ]; then
# 移除毫秒部分
seconds=${timestamp:0:10}
timestamps+=($seconds)
else
timestamps+=($timestamp)
fi
elif [ "$timestamp" != "null" ] && [ -n "$timestamp" ] && [[ "$timestamp" =~ ^[0-9]{4}-[0-9]{2}-[0-9]{2} ]]; then
# 如果是ISO格式日期转换为时间戳
# 使用Python转换因为macOS的date命令不支持-d选项
ts=$(python3 -c "import datetime; import sys; print(int(datetime.datetime.strptime('$timestamp', '%Y-%m-%dT%H:%M:%S').timestamp()))" 2>/dev/null)
if [ -n "$ts" ] && [[ "$ts" =~ ^[0-9]+$ ]]; then
timestamps+=($ts)
fi
fi
done <<< "$entries_data"
# 计算时间间隔
for ((i=1; i<${#timestamps[@]}; i++)); do
prev=${timestamps[$((i-1))]}
curr=${timestamps[$i]}
interval=$((curr - prev))
# Convert to days, hours and minutes
days=$((interval / 86400))
remaining_seconds=$((interval % 86400))
hours=$((remaining_seconds / 3600))
remaining_seconds=$((remaining_seconds % 3600))
minutes=$((remaining_seconds / 60))
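# Illustrative example (assumed value): interval=90000 seconds gives
# days=1, hours=1, minutes=0.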
if [ "$days" -gt 0 ]; then
if [ "$hours" -gt 0 ]; then
echo " - 第${i}次加仓: ${days}${hours}小时后"
else
echo " - 第${i}次加仓: ${days}天后"
fi
elif [ "$hours" -gt 0 ]; then
if [ "$minutes" -gt 0 ]; then
echo " - 第${i}次加仓: ${hours}小时${minutes}分钟后"
else
echo " - 第${i}次加仓: ${hours}小时后"
fi
elif [ "$minutes" -gt 0 ]; then
echo " - 第${i}次加仓: ${minutes}分钟后"
else
echo " - 第${i}次加仓: 不到1分钟后"
fi
done
echo ""
fi
done <<< "$(jq -c '.[]?' "$RESULT_FILE")"

View File

@@ -1,122 +0,0 @@
import json
import csv
import os
def test_csv_format():
"""测试CSV文件格式是否正确"""
csv_file = '../result/backtest_trades.csv'
# 检查文件是否存在
if not os.path.exists(csv_file):
print("❌ CSV文件不存在")
return False
# 读取CSV文件
with open(csv_file, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
rows = list(reader)
# 检查必要的字段是否存在
required_fields = [
'pair', 'open_date', 'close_date', 'open_rate', 'close_rate', 'amount',
'profit_ratio', 'profit_abs', 'exit_reason', 'fee_open', 'fee_close',
'trade_duration', 'min_rate', 'max_rate',
'entry_orders_count', 'adjustments_count', 'avg_entry_price',
'initial_entry_cost', 'total_adjustment_cost'
]
missing_fields = [field for field in required_fields if field not in reader.fieldnames]
if missing_fields:
print(f"❌ CSV缺少字段: {missing_fields}")
return False
print("✅ CSV文件格式正确")
print(f"✅ 总交易数: {len(rows)}")
# 检查前几行数据
for i, row in enumerate(rows[:3]):
print(f"{i+1}行: {row['pair']} - 平均入场价: {row['avg_entry_price']}, 初始成本: {row['initial_entry_cost']}")
return True
def test_json_format():
"""测试JSON输出格式"""
json_file = '../result/backtest_trades.json'
# 检查文件是否存在
if not os.path.exists(json_file):
print("❌ JSON文件不存在")
return False
# 读取JSON文件
with open(json_file, 'r', encoding='utf-8') as f:
data = json.load(f)
# 检查基本结构
assert isinstance(data, list), "JSON数据应该是列表格式"
assert len(data) > 0, "应该有交易数据"
# 检查必要的字段
required_fields = [
'pair', 'open_date', 'close_date', 'open_rate', 'close_rate', 'amount',
'profit_ratio', 'profit_abs', 'exit_reason', 'fee_open', 'fee_close',
'trade_duration', 'min_rate', 'max_rate',
'entry_orders_count', 'adjustments_count', 'avg_entry_price',
'initial_entry_cost', 'total_adjustment_cost', 'entries'
]
# 检查前几条记录的字段
for i in range(min(3, len(data))):
trade = data[i]
for field in required_fields:
assert field in trade, f"交易 {i+1} 缺少字段: {field}"
# 检查具体数值
if i == 0: # XRP/USDT
assert abs(trade['avg_entry_price'] - 2.8117475999999995) < 1e-10
assert abs(trade['initial_entry_cost'] - 75.05999863841159) < 1e-10
assert trade['total_adjustment_cost'] == 0
# 检查entries结构
assert isinstance(trade['entries'], list), "entries应为列表"
if len(trade['entries']) > 0:
entry = trade['entries'][0]
entry_required_fields = ['order_index', 'timestamp', 'price', 'amount', 'cost', 'order_type']
for field in entry_required_fields:
assert field in entry, f"entry缺少字段: {field}"
# 验证时间戳是数字格式(毫秒级时间戳)
timestamp = entry['timestamp']
assert isinstance(timestamp, (int, float)), f"时间戳应该是数字格式,实际为: {type(timestamp)}"
assert timestamp > 1000000000000, f"时间戳应该为毫秒级,实际为: {timestamp}"
print("✅ JSON文件格式正确")
print(f"✅ 总交易数: {len(data)}")
# 检查前几个交易
for i, trade in enumerate(data[:3]):
print(f"{i+1}个交易: {trade['pair']} - 平均入场价: {trade['avg_entry_price']}, 初始成本: {trade['initial_entry_cost']}")
if trade['entries']:
entry = trade['entries'][0]
print(f" 首个订单: 类型={entry['order_type']}, 价格={entry['price']}, 数量={entry['amount']}")
return True
def main():
print("开始验证tradestocsv.py的输出文件格式...")
print()
csv_result = test_csv_format()
print()
json_result = test_json_format()
print()
if csv_result and json_result:
print("🎉 所有测试通过!文件格式正确。")
return True
else:
print("❌ 测试失败,请检查文件格式。")
return False
if __name__ == "__main__":
main()

View File

@@ -1,201 +0,0 @@
import json
import csv
import os
from pathlib import Path
from datetime import datetime
# 定义结果目录
result_dir = Path('../result')
# 确保结果目录存在
if not result_dir.exists():
raise FileNotFoundError(f"Directory {result_dir} does not exist")
# Find JSON files whose name contains 'backtest-result-'
json_files = [f for f in result_dir.glob('*.json') if 'backtest-result-' in f.name]
if not json_files:
raise FileNotFoundError("No JSON files with 'backtest-result-' in name found in ../result")
# First, try to find files that contain position-adjustment (add-on) trades
files_with_adjustments = []
for json_file in json_files:
try:
with open(json_file) as f:
data = json.load(f)
# 检查是否包含加仓交易
has_adjustments = False
if 'strategy' in data:
for strategy_name, strategy_data in data['strategy'].items():
if 'trades' in strategy_data:
for trade in strategy_data['trades']:
orders = trade.get('orders', [])
entry_orders = [o for o in orders if o.get('ft_is_entry')]
if len(entry_orders) > 1:
has_adjustments = True
break
if has_adjustments:
break
if has_adjustments:
files_with_adjustments.append(json_file)
except Exception:
continue
# If any file contains add-on trades, pick the largest of those; otherwise pick the largest file overall
if files_with_adjustments:
largest_file = max(files_with_adjustments, key=lambda x: x.stat().st_size)
print(f"Found {len(files_with_adjustments)} files with position adjustments, using the largest: {largest_file.name}")
else:
largest_file = max(json_files, key=lambda x: x.stat().st_size)
print(f"No files with position adjustments found, using the largest file: {largest_file.name}")
# 读取选定的 JSON 文件
with open(largest_file) as f:
data = json.load(f)
# Get the strategy name from the environment
strategy_name = os.environ.get('STRATEGY_NAME', 'FreqaiPrimer')  # defaults to FreqaiPrimer
# 获取交易记录
if 'strategy' not in data or strategy_name not in data['strategy'] or 'trades' not in data['strategy'][strategy_name]:
raise ValueError(f"Could not find trades data for strategy {strategy_name}")
trades = data['strategy'][strategy_name]['trades']
# 定义输出文件路径
output_csv = result_dir / 'backtest_trades_fixed.csv'
output_json = result_dir / 'backtest_trades_fixed.json'
# 定义CSV的字段名
fieldnames = [
'pair', 'open_date', 'close_date', 'open_rate', 'close_rate', 'amount',
'profit_ratio', 'profit_abs', 'exit_reason', 'fee_open', 'fee_close',
'trade_duration', 'min_rate', 'max_rate',
'entry_orders_count', 'adjustments_count', 'avg_entry_price',
'initial_entry_cost', 'total_adjustment_cost'
]
# 准备写入CSV和JSON
json_data = []
with open(output_csv, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
# 处理每笔交易
for trade in trades:
# 准备交易数据行
row = {
'pair': trade.get('pair', ''),
'open_date': trade.get('open_date', ''),
'close_date': trade.get('close_date', ''),
'open_rate': trade.get('open_rate', ''),
'close_rate': trade.get('close_rate', ''),
'amount': trade.get('amount', ''),
'profit_ratio': trade.get('profit_ratio', ''),
'profit_abs': trade.get('profit_abs', ''),
'exit_reason': trade.get('exit_reason', ''),
'fee_open': trade.get('fee_open', ''),
'fee_close': trade.get('fee_close', ''),
'trade_duration': trade.get('trade_duration', ''),
'min_rate': trade.get('min_rate', ''),
'max_rate': trade.get('max_rate', '')
}
# 从原始数据中获取订单信息
orders = trade.get('orders', [])
# 如果有时间戳但缺少open_date/close_date尝试从时间戳转换
if not row['open_date'] and trade.get('open_timestamp'):
row['open_date'] = datetime.fromtimestamp(trade.get('open_timestamp')/1000).isoformat()
if not row['close_date'] and trade.get('close_timestamp'):
row['close_date'] = datetime.fromtimestamp(trade.get('close_timestamp')/1000).isoformat()
# 如果没有orders字段跳过此交易
if not orders:
continue
entry_orders = [order for order in orders if order.get('ft_is_entry')]
row['entry_orders_count'] = len(entry_orders)
row['adjustments_count'] = max(0, len(entry_orders) - 1) # 加仓次数 = 入场订单数 - 1
# 创建JSON对象添加CSV中的所有字段
json_trade = row.copy()
# Compute the average entry price, initial entry cost and total add-on cost
if len(entry_orders) > 0:
# 初始入场金额
initial_entry_cost = entry_orders[0].get('cost', 0)
row['initial_entry_cost'] = initial_entry_cost
# Total entry cost and total entry amount
total_entry_cost = sum(order.get('cost', 0) for order in entry_orders)
total_entry_amount = sum(order.get('amount', 0) for order in entry_orders)
# Average entry price
if total_entry_amount > 0:
avg_entry_price = total_entry_cost / total_entry_amount
else:
avg_entry_price = 0
row['avg_entry_price'] = avg_entry_price
# Total add-on cost
if len(entry_orders) > 1:
total_adjustment_cost = sum(order.get('cost', 0) for order in entry_orders[1:])
else:
total_adjustment_cost = 0
row['total_adjustment_cost'] = total_adjustment_cost
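# Illustrative example (assumed orders, not from a real backtest): two entry orders
# with cost/amount of 50000/1.0 and 49800/1.0 give initial_entry_cost = 50000,
# avg_entry_price = (50000 + 49800) / 2.0 = 49900.0, and
# total_adjustment_cost = 49800 (the cost of every entry order after the first).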
# Add per-entry/add-on details to the JSON output
entries = []
for i, order in enumerate(entry_orders):
# 获取原始订单的时间戳
order_timestamp = order.get('order_filled_timestamp', '')
# 转换时间戳为可读格式
timestamp = ''
if order_timestamp:
try:
timestamp = datetime.fromtimestamp(order_timestamp/1000).isoformat()
except Exception:
timestamp = str(order_timestamp)
# 获取订单价格
price = order.get('safe_price', 0)
entry_info = {
'order_index': i,
'timestamp': timestamp,
'price': price,
'amount': order.get('amount', 0),
'cost': order.get('cost', 0),
'order_type': 'initial' if i == 0 else 'adjustment',
'raw_timestamp': order_timestamp # 保留原始时间戳
}
entries.append(entry_info)
json_trade['entries'] = entries
else:
# 没有入场订单的情况
row['initial_entry_cost'] = 0
row['avg_entry_price'] = 0
row['total_adjustment_cost'] = 0
json_trade['entries'] = []
# 写入CSV行
writer.writerow(row)
# 添加到JSON数据
json_data.append(json_trade)
# 写入JSON文件
with open(output_json, 'w', encoding='utf-8') as jsonfile:
json.dump(json_data, jsonfile, ensure_ascii=False, indent=2)
print(f"Successfully converted {largest_file.name} to {output_csv}")
print(f"Successfully converted {largest_file.name} to {output_json}")
print(f"Added position adjustment information with correct timestamps to both CSV and JSON outputs.")