diff --git a/okx/candleList.go b/okx/candleList.go index e675c88..101692e 100644 --- a/okx/candleList.go +++ b/okx/candleList.go @@ -405,34 +405,25 @@ func (cl *CandleList) ToElastic() error { return fmt.Errorf("failed to load config: %v", err) } - // 如果没有数据,直接返回 if len(cl.Candles) == 0 { return nil } - // 使用第一条数据的时间戳配置 ILM - firstCandle := cl.Candles[0] - ts, err := strconv.ParseInt(firstCandle.Timestamp, 10, 64) - if err != nil { - return fmt.Errorf("invalid timestamp in first candle: %v", err) + // 按月分组数据 + type monthlyCandles struct { + Index string + Candles []*Candle + IndexTime time.Time } + indexMap := make(map[string]monthlyCandles) loc, _ := time.LoadLocation("Asia/Shanghai") - cstTime := time.UnixMilli(ts).In(loc) - // 配置 ILM(只执行一次) - err = elasticilm.ConfigureILM(client, config, "candle", strings.ToLower(cl.Period), cstTime) - if err != nil { - return fmt.Errorf("failed to configure ILM: %v", err) - } - - // 批量插入数据 for _, candle := range cl.Candles { ts, err := strconv.ParseInt(candle.Timestamp, 10, 64) if err != nil { - fmt.Println("invalid timestamp:", candle.Timestamp, err) + fmt.Printf("invalid timestamp: %s, skipping: %v\n", candle.Timestamp, err) continue } - cstTime := time.UnixMilli(ts).In(loc) index := fmt.Sprintf("logstash-%s.candle.%s.%d-%02d", strings.ToLower(cl.Period), @@ -441,63 +432,83 @@ func (cl *CandleList) ToElastic() error { int(cstTime.Month()), ) - // 检查时间戳对齐 - if cl.Period == "1D" { - if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 { - return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) - } + if group, exists := indexMap[index]; exists { + group.Candles = append(group.Candles, candle) + indexMap[index] = group } else { - if cstTime.UnixMilli()%duration.Milliseconds() != 0 { - return fmt.Errorf("timestamp %d is not aligned with period %s", ts, cl.Period) + indexMap[index] = monthlyCandles{ + Index: index, + Candles: []*Candle{candle}, + IndexTime: cstTime, } } + } - record := map[string]interface{}{ - "dataTime": cstTime.Format("2006-01-02 15:04:05"), - "open": candle.Open, - "high": candle.High, - "low": candle.Low, - "close": candle.Close, - "volume": candle.Volume, - "volumeCcy": candle.VolumeCcy, - } - - jsonData, err := json.Marshal(record) + // 为每个月的索引配置 ILM 并插入数据 + for _, group := range indexMap { + // 配置 ILM,使用该月第一条数据的时间 + err = elasticilm.ConfigureILM(client, config, "candle", strings.ToLower(cl.Period), group.IndexTime) if err != nil { - return fmt.Errorf("failed to marshal data: %v", err) + return fmt.Errorf("failed to configure ILM for index %s: %v", group.Index, err) } - fullURL := fmt.Sprintf("%s/%s/_doc/%d", strings.TrimRight(config.Elasticsearch.URL, "/"), index, ts) - fmt.Println("fullURL: ", fullURL) + // 插入该月的所有数据 + for _, candle := range group.Candles { + ts, err := strconv.ParseInt(candle.Timestamp, 10, 64) + if err != nil { + fmt.Printf("invalid timestamp: %s, skipping: %v\n", candle.Timestamp, err) + continue + } + cstTime := time.UnixMilli(ts).In(loc) - req, err := http.NewRequest("POST", fullURL, bytes.NewBuffer(jsonData)) - if err != nil { - return fmt.Errorf("failed to create request: %v", err) + // 时间戳对齐检查 + if cl.Period == "1D" { + if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 { + return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) + } + } else { + if cstTime.UnixMilli()%duration.Milliseconds() != 0 { + return fmt.Errorf("timestamp %d is not aligned with period %s", ts, cl.Period) + } + } + + record := map[string]interface{}{ + "dataTime": cstTime.Format("2006-01-02 15:04:05"), + "open": candle.Open, + "high": candle.High, + "low": candle.Low, + "close": candle.Close, + "volume": candle.Volume, + "volumeCcy": candle.VolumeCcy, + } + + jsonData, err := json.Marshal(record) + if err != nil { + return fmt.Errorf("failed to marshal data: %v", err) + } + + fullURL := fmt.Sprintf("%s/%s/_doc/%d", strings.TrimRight(config.Elasticsearch.URL, "/"), group.Index, ts) + req, err := http.NewRequest("POST", fullURL, bytes.NewBuffer(jsonData)) + if err != nil { + return fmt.Errorf("failed to create request: %v", err) + } + + req.Header.Set("Content-Type", "application/json") + req.SetBasicAuth(config.Elasticsearch.Auth.Username, config.Elasticsearch.Auth.Password) + + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to send data to Elasticsearch: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("unexpected status code: %d, response: %s", resp.StatusCode, string(body)) + } + + fmt.Printf("Successfully sent record to Elasticsearch: %s\n", fullURL) } - - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Accept", "application/json") - req.SetBasicAuth(config.Elasticsearch.Auth.Username, config.Elasticsearch.Auth.Password) - - resp, err := client.Do(req) - if err != nil { - return fmt.Errorf("failed to send data to Elasticsearch: %v", err) - } - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("failed to read response body: %v", err) - } - - fmt.Printf("HTTP Response Status: %s\n", resp.Status) - fmt.Printf("HTTP Response Body: %s\n", string(body)) - - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { - return fmt.Errorf("unexpected status code: %d, response: %s", resp.StatusCode, string(body)) - } - - fmt.Printf("Successfully sent record to Elasticsearch: %s\n", fullURL) } return nil