tanya/okx/candleList.go
zhangkun9038@dingtalk.com 78fd73aec0 索引 ilm
2025-03-29 19:53:31 +08:00

607 lines
17 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package okx
import (
"bytes"
"encoding/csv"
"encoding/json"
"fmt"
cfg "gitea.zjmud.xyz/phyer/tanya/config"
"gitea.zjmud.xyz/phyer/tanya/elasticilm"
"io"
"net/http"
"strconv"
"strings"
"time"
)
// formatTimestamp converts a Unix timestamp in milliseconds, given as a
// base-10 string, into a readable Asia/Shanghai (UTC+8) time string such as
// "2023-11-15T06:13:20+08:00".
//
// It returns the strconv.ParseInt error unchanged when ts is not a valid
// 64-bit integer.
func formatTimestamp(ts string) (string, error) {
	millis, err := strconv.ParseInt(ts, 10, 64)
	if err != nil {
		return "", err
	}
	// LoadLocation fails when no time zone database is available (for example
	// in minimal containers). Time.In panics on a nil *Location, so fall back
	// to a fixed UTC+8 zone instead of discarding the error.
	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		loc = time.FixedZone("CST", 8*60*60)
	}
	// The "+08:00" suffix is literal text in this layout, not a zone verb.
	return time.UnixMilli(millis).In(loc).Format("2006-01-02T15:04:05+08:00"), nil
}
// CandleList wraps a slice of candles together with the trading pair and bar
// period they belong to. Its methods serialize the candles (JSON, CSV) and
// ship them to Fluentd or Elasticsearch.
type CandleList struct {
Candles []*Candle
CoinPair string // Trading pair name, e.g. BTC-USDT
Period string // Bar period name, e.g. 15m or 1D (parsePeriod lists the accepted spellings)
}
// MakeCandleList fetches candles for instId at the given bar period and
// returns them wrapped in a CandleList.
//
// Fetching starts with the cursor at endTime and repeats while the cursor is
// not before startTime. Each request passes the cursor (Unix milliseconds) as
// After and blockSize as Limit; the next cursor is the timestamp of the last
// candle in the page plus one millisecond.
//
// The loop stops when the service returns an empty page or a page that
// contains no candle that has not been seen before, so a service that keeps
// answering with the same data cannot make it spin forever. Candles are
// de-duplicated by timestamp.
//
// NOTE(review): presumably GetCandles returns candles newest-first and treats
// After as "older than"; in that case the +1ms cursor makes consecutive pages
// overlap by one candle, which the de-duplication absorbs. Confirm against
// the service implementation.
//
// It returns any error from GetCandles, or an error when the cursor candle's
// timestamp is not a valid integer.
func MakeCandleList(instId string, period string, startTime time.Time, endTime time.Time, blockSize int) (*CandleList, error) {
	cl := &CandleList{
		CoinPair: instId,
		Period:   period,
	}
	service := NewOkxPublicDataService()

	var allCandles []*Candle
	seen := make(map[string]struct{})
	currentTime := endTime
	for !currentTime.Before(startTime) {
		req := CandlesRequest{
			InstID: instId,
			Bar:    period,
			After:  strconv.FormatInt(currentTime.UnixMilli(), 10),
			Limit:  strconv.Itoa(blockSize),
		}
		candles, err := service.GetCandles(req)
		if err != nil {
			return nil, err
		}
		if len(candles) == 0 {
			break
		}

		added := 0
		for _, candle := range candles {
			if candle == nil {
				continue
			}
			if _, dup := seen[candle.Timestamp]; dup {
				continue
			}
			seen[candle.Timestamp] = struct{}{}
			allCandles = append(allCandles, candle)
			added++
		}
		if added == 0 {
			// The page brought nothing new: the cursor is no longer making
			// progress, so another request would only repeat this one.
			break
		}

		// Move the cursor to the timestamp of the last candle in the page.
		last := candles[len(candles)-1]
		lastCandleTime, err := strconv.ParseInt(last.Timestamp, 10, 64)
		if err != nil {
			return nil, fmt.Errorf("invalid candle timestamp %q: %w", last.Timestamp, err)
		}
		currentTime = time.UnixMilli(lastCandleTime).Add(time.Millisecond)
	}
	fmt.Println("lens of allCandles: ", len(allCandles))
	cl.Candles = allCandles
	return cl, nil
}
// ToJson serializes the candles as a JSON array. Every element is an object
// with the keys timestamp, open, high, low, close, volume and volumeCcy; the
// timestamp is the candle's millisecond timestamp rendered by
// formatTimestamp, all other values are copied as strings.
//
// An empty candle list serializes as "[]" (not "null"). The first timestamp
// that cannot be formatted aborts the call and its error is returned.
func (cl *CandleList) ToJson() (string, error) {
	// FormattedCandle is the wire shape: a candle with a readable timestamp.
	type FormattedCandle struct {
		Timestamp string `json:"timestamp"`
		Open      string `json:"open"`
		High      string `json:"high"`
		Low       string `json:"low"`
		Close     string `json:"close"`
		Volume    string `json:"volume"`
		VolumeCcy string `json:"volumeCcy"`
	}

	// A non-nil slice makes json.Marshal emit "[]" for an empty list; the
	// capacity is known up front, so size it once.
	formattedCandles := make([]FormattedCandle, 0, len(cl.Candles))
	for _, candle := range cl.Candles {
		formattedTs, err := formatTimestamp(candle.Timestamp)
		if err != nil {
			return "", err
		}
		formattedCandles = append(formattedCandles, FormattedCandle{
			Timestamp: formattedTs,
			Open:      candle.Open,
			High:      candle.High,
			Low:       candle.Low,
			Close:     candle.Close,
			Volume:    candle.Volume,
			VolumeCcy: candle.VolumeCcy,
		})
	}

	jsonData, err := json.Marshal(formattedCandles)
	if err != nil {
		return "", err
	}
	return string(jsonData), nil
}
// ToCsv renders the candles as CSV text. The first row is the header
// Timestamp, Open, High, Low, Close, Volume, VolumeCcy; each following row is
// one candle with its timestamp rendered by formatTimestamp.
//
// The number of candles is printed to standard output as debug information.
// The first timestamp that cannot be formatted, or any CSV writer error,
// aborts the call and is returned with an empty string.
func (cl *CandleList) ToCsv() (string, error) {
	var out strings.Builder
	w := csv.NewWriter(&out)

	// Header row.
	if err := w.Write([]string{"Timestamp", "Open", "High", "Low", "Close", "Volume", "VolumeCcy"}); err != nil {
		return "", err
	}

	// Debug output.
	fmt.Printf("Number of candles: %d\n", len(cl.Candles))

	// Build every data row first, then hand them to the writer in one call;
	// WriteAll writes the rows in order and flushes.
	rows := make([][]string, 0, len(cl.Candles))
	for i := range cl.Candles {
		c := cl.Candles[i]
		when, err := formatTimestamp(c.Timestamp)
		if err != nil {
			return "", err
		}
		rows = append(rows, []string{when, c.Open, c.High, c.Low, c.Close, c.Volume, c.VolumeCcy})
	}
	if err := w.WriteAll(rows); err != nil {
		return "", err
	}
	return out.String(), nil
}
// ToFluentd posts every candle to the Fluentd HTTP endpoint named in the
// project configuration, one request per candle.
//
// The tag is "tanya.candle.<CoinPair>.<current year>.<Period>" (the period is
// used verbatim). Each request is a POST to "<Fluentd.URL>/<tag>?json" whose
// JSON body is {"tag": ..., "record": {...}}; the record carries the raw
// millisecond timestamp as "_id", the Asia/Shanghai wall-clock time as
// "dataTime", and the candle's price and volume fields.
//
// The configuration is looked up in config/config.json relative to the
// current directory, its parent and its grandparent, in that order.
//
// Candles whose timestamp cannot be parsed are logged and skipped. The call
// stops and returns an error when the period is unsupported, the
// configuration cannot be loaded, a timestamp is not aligned with the period,
// a request fails, or Fluentd answers with a status other than 200 OK.
// Candles sent before the failure are not rolled back.
func (cl *CandleList) ToFluentd() error {
	currentYear := time.Now().Year()
	tag := fmt.Sprintf("tanya.candle.%s.%d.%s", cl.CoinPair, currentYear, cl.Period)

	duration, err := parsePeriod(cl.Period)
	if err != nil {
		return fmt.Errorf("invalid period: %v", err)
	}

	sent := 0
	if len(cl.Candles) == 0 {
		// Nothing to send, so the configuration is not needed either.
		fmt.Printf("Successfully sent record to Fluentd count: %d\n", sent)
		return nil
	}

	// Everything below is identical for all candles, so set it up once
	// instead of once per record.
	var config *cfg.Config
	configPaths := []string{
		"config/config.json",       // config directory under the current directory
		"../config/config.json",    // config directory one level up
		"../../config/config.json", // config directory two levels up
	}
	for _, path := range configPaths {
		config, err = cfg.LoadConfig(path)
		if err == nil {
			break
		}
	}
	if err != nil {
		return fmt.Errorf("failed to load config after trying paths: %v. Tried paths: %v", err, configPaths)
	}
	fullURL := fmt.Sprintf("%s/%s?json", strings.TrimRight(config.Fluentd.URL, "/"), tag)
	client := &http.Client{Timeout: 30 * time.Second}

	// Time.In panics on a nil location, so fall back to a fixed UTC+8 zone
	// when the tz database is unavailable.
	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		loc = time.FixedZone("CST", 8*60*60)
	}

	// post sends one payload. It is a function of its own so that the
	// deferred Close runs after every request, not at the end of ToFluentd.
	post := func(jsonData []byte) error {
		fmt.Printf("Sending request to URL: %s\n", fullURL)
		fmt.Printf("Request Body: %s\n", string(jsonData))

		req, err := http.NewRequest("POST", fullURL, bytes.NewBuffer(jsonData))
		if err != nil {
			return fmt.Errorf("failed to create request: %w", err)
		}
		req.Header.Set("Content-Type", "application/json")
		req.Header.Set("Accept", "application/json")

		resp, err := client.Do(req)
		if err != nil {
			return fmt.Errorf("failed to send data to Fluentd: %w", err)
		}
		defer resp.Body.Close()

		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return fmt.Errorf("failed to read response body: %w", err)
		}
		fmt.Printf("HTTP Response Status: %s\n", resp.Status)
		fmt.Printf("HTTP Response Headers: %v\n", resp.Header)
		if resp.StatusCode != http.StatusOK {
			return fmt.Errorf("unexpected status code: %d, response: %s", resp.StatusCode, string(body))
		}
		return nil
	}

	for _, candle := range cl.Candles {
		ts, err := strconv.ParseInt(candle.Timestamp, 10, 64)
		if err != nil {
			// Log the offending string; ts is always zero on a parse error.
			fmt.Println("invalid timestamp:", candle.Timestamp, err)
			continue
		}
		cstTime := time.UnixMilli(ts).In(loc)

		if cl.Period == "1D" {
			// Daily candles must start at 00:00:00 Asia/Shanghai time.
			if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 {
				return fmt.Errorf("timestamp %d (%s) is not aligned with period :%s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period)
			}
		} else if ts%duration.Milliseconds() != 0 {
			// Every other period must divide the epoch timestamp evenly.
			return fmt.Errorf("timestamp %d is not aligned with period %s", ts, cl.Period)
		}

		record := map[string]interface{}{
			"_id":       ts, // the timestamp doubles as the document id
			"dataTime":  cstTime.Format("2006-01-02 15:04:05"),
			"open":      candle.Open,
			"high":      candle.High,
			"low":       candle.Low,
			"close":     candle.Close,
			"volume":    candle.Volume,
			"volumeCcy": candle.VolumeCcy,
		}
		payload := map[string]interface{}{
			"tag":    tag,
			"record": record,
		}
		jsonData, err := json.Marshal(payload)
		if err != nil {
			return fmt.Errorf("failed to marshal data: %w", err)
		}

		if err := post(jsonData); err != nil {
			return err
		}
		sent++
	}

	fmt.Printf("Successfully sent record to Fluentd count: %d\n", sent)
	return nil
}
// CalculateCrossPair derives the cross pair of two candle lists that share a
// quote currency, e.g. BTC-USDT and ETH-USDT yield BTC-ETH.
//
// Both lists must have the same Period and both pair names must have the form
// BASE-QUOTE with non-empty parts and an identical QUOTE. For every timestamp
// present in both lists a candle is produced whose open, high, low and close
// are cl's value divided by other's value (see calculateCrossPrice); Volume
// and VolumeCcy are set to "0" because volumes are not converted. The result
// follows the order of other.Candles. When cl contains the same timestamp
// more than once, the last occurrence is used.
//
// It returns an error when other is nil, the periods differ, the names do not
// form a valid cross pair, a price cannot be divided, or the lists have no
// timestamp in common.
func (cl *CandleList) CalculateCrossPair(other *CandleList) (*CandleList, error) {
	if other == nil {
		return nil, fmt.Errorf("other candle list is nil")
	}
	if cl.Period != other.Period {
		return nil, fmt.Errorf("period mismatch: %s vs %s", cl.Period, other.Period)
	}

	base1, quote1 := parsePair(cl.CoinPair)
	base2, quote2 := parsePair(other.CoinPair)
	// parsePair reports malformed names as two empty strings. Without the
	// emptiness checks two malformed names would compare equal on their
	// (empty) quote and produce a pair named "-".
	if base1 == "" || base2 == "" || quote1 == "" || quote1 != quote2 {
		return nil, fmt.Errorf("not a valid cross pair: %s and %s", cl.CoinPair, other.CoinPair)
	}

	result := &CandleList{
		CoinPair: fmt.Sprintf("%s-%s", base1, base2),
		Period:   cl.Period,
	}
	fmt.Println("result.CoinPair, result.Period: ", result.CoinPair, result.Period)

	// Index cl's candles by timestamp for constant-time matching.
	clMap := make(map[string]*Candle, len(cl.Candles))
	for _, candle := range cl.Candles {
		if candle == nil {
			continue
		}
		clMap[candle.Timestamp] = candle
	}

	for _, otherCandle := range other.Candles {
		if otherCandle == nil {
			continue
		}
		clCandle, exists := clMap[otherCandle.Timestamp]
		if !exists {
			continue
		}

		open, err := calculateCrossPrice(clCandle.Open, otherCandle.Open)
		if err != nil {
			return nil, err
		}
		high, err := calculateCrossPrice(clCandle.High, otherCandle.High)
		if err != nil {
			return nil, err
		}
		low, err := calculateCrossPrice(clCandle.Low, otherCandle.Low)
		if err != nil {
			return nil, err
		}
		// Named closePrice so the builtin close is not shadowed.
		closePrice, err := calculateCrossPrice(clCandle.Close, otherCandle.Close)
		if err != nil {
			return nil, err
		}

		result.Candles = append(result.Candles, &Candle{
			Timestamp: otherCandle.Timestamp,
			Open:      open,
			High:      high,
			Low:       low,
			Close:     closePrice,
			Volume:    "0", // volume is not converted
			VolumeCcy: "0", // volume is not converted
		})
	}

	if len(result.Candles) == 0 {
		return nil, fmt.Errorf("no overlapping timestamps found")
	}
	return result, nil
}
// parsePair splits a pair name of the form BASE-QUOTE at its hyphen and
// returns the two parts. A name that does not contain exactly one hyphen
// yields two empty strings.
func parsePair(pair string) (string, string) {
	if strings.Count(pair, "-") != 1 {
		return "", ""
	}
	sep := strings.Index(pair, "-")
	base, quote := pair[:sep], pair[sep+1:]
	return base, quote
}
// calculateCrossPrice divides price1 by price2 and returns the quotient
// formatted with eight decimal places.
//
// Both arguments are parsed as 64-bit floats. An error is returned when
// either one cannot be parsed or when price2 is zero.
func calculateCrossPrice(price1, price2 string) (string, error) {
	p1, err := strconv.ParseFloat(price1, 64)
	if err != nil {
		return "", fmt.Errorf("invalid price1: %w", err)
	}
	p2, err := strconv.ParseFloat(price2, 64)
	if err != nil {
		return "", fmt.Errorf("invalid price2: %w", err)
	}
	if p2 == 0 {
		return "", fmt.Errorf("division by zero")
	}
	return fmt.Sprintf("%.8f", p1/p2), nil
}
// ToElastic writes every candle to Elasticsearch, one document per candle.
//
// Candles are grouped by the calendar month of their Asia/Shanghai time into
// indices named "logstash-<period>.candle.<pair>.<YYYY>-<MM>" (period and
// pair lower-cased, the hyphen of the pair kept). For each index ILM is
// configured through elasticilm.ConfigureILM using the time of the first
// candle seen for that month, then each candle is POSTed to
// "<Elasticsearch.URL>/<index>/_doc/<timestamp>" with basic authentication.
// Indices are processed in the order in which they first appear in
// cl.Candles.
//
// The configuration is looked up in config/config.json relative to the
// current directory, its parent and its grandparent, in that order.
//
// Candles whose timestamp cannot be parsed are logged and skipped. The call
// stops and returns an error when the period is unsupported, the
// configuration cannot be loaded, ILM configuration fails, a timestamp is not
// aligned with the period, a request fails, or Elasticsearch answers with a
// status other than 200 or 201. Documents written before the failure are not
// rolled back. An empty candle list returns nil after the configuration has
// been loaded.
//
// NOTE(review): the index name lower-cases the period, so "1m" (one minute)
// and "1M" (720 hours in parsePeriod) map to the same index prefix. Confirm
// whether both periods are ever stored.
func (cl *CandleList) ToElastic() error {
	client := &http.Client{Timeout: 30 * time.Second}

	// The period is the same for every candle; resolve it once.
	duration, err := parsePeriod(cl.Period)
	if err != nil {
		return fmt.Errorf("failed to parse period: %v", err)
	}

	var config *cfg.Config
	configPaths := []string{"config/config.json", "../config/config.json", "../../config/config.json"}
	for _, path := range configPaths {
		config, err = cfg.LoadConfig(path)
		if err == nil {
			break
		}
	}
	if err != nil {
		return fmt.Errorf("failed to load config: %v", err)
	}
	if len(cl.Candles) == 0 {
		return nil
	}

	// Time.In panics on a nil location, so fall back to a fixed UTC+8 zone
	// when the tz database is unavailable.
	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		loc = time.FixedZone("CST", 8*60*60)
	}

	// datedCandle keeps the parsed timestamp next to its candle so it is
	// parsed only once.
	type datedCandle struct {
		candle  *Candle
		ts      int64
		cstTime time.Time
	}
	// monthlyCandles collects the candles that belong to one monthly index.
	type monthlyCandles struct {
		Index     string
		Candles   []datedCandle
		IndexTime time.Time
	}

	// Group by month. indexOrder remembers first-seen order so the indices
	// are processed deterministically instead of in map iteration order.
	indexMap := make(map[string]*monthlyCandles)
	var indexOrder []string
	for _, candle := range cl.Candles {
		ts, err := strconv.ParseInt(candle.Timestamp, 10, 64)
		if err != nil {
			fmt.Printf("invalid timestamp: %s, skipping: %v\n", candle.Timestamp, err)
			continue
		}
		cstTime := time.UnixMilli(ts).In(loc)
		index := fmt.Sprintf("logstash-%s.candle.%s.%d-%02d",
			strings.ToLower(cl.Period),
			strings.ToLower(cl.CoinPair), // keep the hyphen in the pair name
			cstTime.Year(),
			int(cstTime.Month()),
		)
		group, exists := indexMap[index]
		if !exists {
			group = &monthlyCandles{Index: index, IndexTime: cstTime}
			indexMap[index] = group
			indexOrder = append(indexOrder, index)
		}
		group.Candles = append(group.Candles, datedCandle{candle: candle, ts: ts, cstTime: cstTime})
	}

	baseURL := strings.TrimRight(config.Elasticsearch.URL, "/")

	// send writes one document. It is a function of its own so that the
	// deferred Close runs after every request, not at the end of ToElastic.
	send := func(index string, ts int64, jsonData []byte) error {
		fullURL := fmt.Sprintf("%s/%s/_doc/%d", baseURL, index, ts)
		req, err := http.NewRequest("POST", fullURL, bytes.NewBuffer(jsonData))
		if err != nil {
			return fmt.Errorf("failed to create request: %w", err)
		}
		req.Header.Set("Content-Type", "application/json")
		req.SetBasicAuth(config.Elasticsearch.Auth.Username, config.Elasticsearch.Auth.Password)

		resp, err := client.Do(req)
		if err != nil {
			return fmt.Errorf("failed to send data to Elasticsearch: %w", err)
		}
		defer resp.Body.Close()

		if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
			// The body is only used for the error text; a read failure must
			// not hide the status code, so its error is deliberately ignored.
			body, _ := io.ReadAll(resp.Body)
			return fmt.Errorf("unexpected status code: %d, response: %s", resp.StatusCode, string(body))
		}
		// Drain the body so the transport can reuse the connection.
		_, _ = io.Copy(io.Discard, resp.Body)
		return nil
	}

	for _, index := range indexOrder {
		group := indexMap[index]

		// Configure ILM with the time of the first candle seen for the month.
		if err := elasticilm.ConfigureILM(client, config, "candle", strings.ToLower(cl.Period), cl.CoinPair, group.IndexTime); err != nil {
			return fmt.Errorf("failed to configure ILM for index %s: %v", group.Index, err)
		}

		for _, item := range group.Candles {
			if err := checkCandleAlignment(cl.Period, duration, item.ts, item.cstTime); err != nil {
				return err
			}

			record := map[string]interface{}{
				"dataTime":  item.cstTime.Format("2006-01-02 15:04:05"),
				"coinPair":  cl.CoinPair,
				"period":    cl.Period,
				"open":      item.candle.Open,
				"high":      item.candle.High,
				"low":       item.candle.Low,
				"close":     item.candle.Close,
				"volume":    item.candle.Volume,
				"volumeCcy": item.candle.VolumeCcy,
			}
			jsonData, err := json.Marshal(record)
			if err != nil {
				return fmt.Errorf("failed to marshal data: %w", err)
			}
			if err := send(group.Index, item.ts, jsonData); err != nil {
				return err
			}
		}
	}

	fmt.Printf("Successfully sent record to Elasticsearch\n")
	return nil
}

// checkCandleAlignment reports an error when a candle timestamp does not lie
// on a boundary of its period. ts is the candle's Unix timestamp in
// milliseconds, cstTime the same instant in the zone used for wall-clock
// checks, and duration the positive length of period.
//
// The rules are:
//   - "1D": the wall-clock time must be 00:00:00.
//   - any other period of 24 hours or more: the whole hours since the Unix
//     epoch must be a multiple of the period's hours, and minutes and seconds
//     must be zero.
//   - "1H": minutes and seconds must be zero.
//   - "2H", "4H", "6H", "12H": the wall-clock hour must be a multiple of the
//     period's hours, and minutes and seconds must be zero.
//   - everything else: ts must be a multiple of the period in milliseconds.
func checkCandleAlignment(period string, duration time.Duration, ts int64, cstTime time.Time) error {
	misaligned := func() error {
		return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), period)
	}
	nonZeroMinSec := func() error {
		return fmt.Errorf("timestamp %d (%s) has non-zero minutes/seconds for period %s", ts, cstTime.Format("2006-01-02 15:04:05"), period)
	}
	onTheHour := cstTime.Minute() == 0 && cstTime.Second() == 0

	switch {
	case duration >= 24*time.Hour:
		if period == "1D" {
			if cstTime.Hour() != 0 || !onTheHour {
				return misaligned()
			}
			return nil
		}
		if (cstTime.Unix()/3600)%int64(duration.Hours()) != 0 {
			return misaligned()
		}
		if !onTheHour {
			return nonZeroMinSec()
		}
		return nil
	case duration >= time.Hour:
		switch period {
		case "1H":
			if !onTheHour {
				return nonZeroMinSec()
			}
			return nil
		case "2H", "4H", "6H", "12H":
			if cstTime.Hour()%int(duration.Hours()) != 0 || !onTheHour {
				return misaligned()
			}
			return nil
		}
	}

	// Minute-level periods and any hour-level period not listed above.
	if ts%duration.Milliseconds() != 0 {
		return fmt.Errorf("timestamp %d is not aligned with period %s", ts, period)
	}
	return nil
}
// parsePeriod maps a bar period name to its length as a time.Duration.
//
// Accepted names are 1m, 3m, 5m, 15m, 30m, 1H, 2H, 4H, 6H, 12H, 1D (also
// spelled 1d), 2D, 5D, 1W and 1M. Names are case-sensitive: "1m" is one
// minute while "1M" is one month, approximated as 30 days. Any other name
// yields a zero duration and an "unsupported period" error.
func parsePeriod(period string) (time.Duration, error) {
	const day = 24 * time.Hour

	var length time.Duration
	switch period {
	// Minute bars.
	case "1m":
		length = 1 * time.Minute
	case "3m":
		length = 3 * time.Minute
	case "5m":
		length = 5 * time.Minute
	case "15m":
		length = 15 * time.Minute
	case "30m":
		length = 30 * time.Minute
	// Hour bars.
	case "1H":
		length = 1 * time.Hour
	case "2H":
		length = 2 * time.Hour
	case "4H":
		length = 4 * time.Hour
	case "6H":
		length = 6 * time.Hour
	case "12H":
		length = 12 * time.Hour
	// Day bars and longer.
	case "1D", "1d":
		length = 1 * day
	case "2D":
		length = 2 * day
	case "5D":
		length = 5 * day
	case "1W":
		length = 7 * day
	case "1M":
		length = 30 * day // a month is assumed to be 30 days
	default:
		return 0, fmt.Errorf("unsupported period: %s", period)
	}
	return length, nil
}