myTestFreqAI/tools/test_okx_candles.py
2026-02-26 10:47:21 +08:00

73 lines
2.0 KiB
Python

#!/usr/bin/env python3
"""
直接测试 OKX API 返回的蜡烛数据
验证最后一根蜡烛是否是最新的
"""
import ccxt
import sys
from datetime import datetime, timezone
def main():
# 初始化 OKX 交易所
exchange = ccxt.okx({
'enableRateLimit': True,
})
symbol = 'BTC/USDT'
timeframe = '15m'
limit = 10
print(f"测试 OKX API: {symbol} {timeframe}")
print("=" * 60)
try:
# 获取最近的蜡烛数据
ohlcv = exchange.fetch_ohlcv(symbol, timeframe=timeframe, limit=limit)
print(f"获取到 {len(ohlcv)} 根蜡烛")
print()
# 显示所有蜡烛的时间
print("所有蜡烛时间:")
print("-" * 40)
for i, candle in enumerate(ohlcv):
timestamp = candle[0]
dt = datetime.fromtimestamp(timestamp / 1000, tz=timezone.utc)
print(f" {i+1:2d}. {dt.strftime('%Y-%m-%d %H:%M:%S UTC')}")
print()
print("=" * 60)
# 最新蜡烛
latest_candle = ohlcv[-1]
latest_timestamp = latest_candle[0]
latest_dt = datetime.fromtimestamp(latest_timestamp / 1000, tz=timezone.utc)
# 当前时间
now = datetime.now(timezone.utc)
print(f"最新蜡烛时间: {latest_dt.strftime('%Y-%m-%d %H:%M:%S UTC')}")
print(f"当前时间: {now.strftime('%Y-%m-%d %H:%M:%S UTC')}")
# 计算时间差
time_diff = (now - latest_dt).total_seconds() / 60
print(f"时间差: {time_diff:.1f} 分钟")
print()
print("最新蜡烛数据:")
print(f" Open: {latest_candle[1]}")
print(f" High: {latest_candle[2]}")
print(f" Low: {latest_candle[3]}")
print(f" Close: {latest_candle[4]}")
print(f" Vol: {latest_candle[5]}")
except Exception as e:
print(f"错误: {e}", file=sys.stderr)
return 1
return 0
if __name__ == "__main__":
sys.exit(main())