diff --git a/okx/candleList.go b/okx/candleList.go index 30e1de4..1bb9fcf 100644 --- a/okx/candleList.go +++ b/okx/candleList.go @@ -156,62 +156,50 @@ func (cl *CandleList) ToFluentd() error { return fmt.Errorf("invalid period: %v", err) } - // 分批发送,每批最多50条 - batchSize := 50 - for i := 0; i < len(cl.Candles); i += batchSize { - end := i + batchSize - if end > len(cl.Candles) { - end = len(cl.Candles) + // 逐条发送记录 + for _, candle := range cl.Candles { + // 验证时间戳是否为周期的整数倍 + ts, err := strconv.ParseInt(candle.Timestamp, 10, 64) + if err != nil { + fmt.Println("invalid timestamp:", ts, err) + continue } - // 准备批量数据 - var records []map[string]interface{} - for _, candle := range cl.Candles[i:end] { - // 验证时间戳是否为周期的整数倍 - ts, err := strconv.ParseInt(candle.Timestamp, 10, 64) - if err != nil { - fmt.Println("invalid timestamp:", ts, err) - continue + // 转换为东八区时间 + loc, _ := time.LoadLocation("Asia/Shanghai") + cstTime := time.UnixMilli(ts).In(loc) + + // 对于日线数据,检查是否为当天的 00:00:00 + if cl.Period == "1D" { + if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 { + return fmt.Errorf("timestamp %d (%s) is not aligned with period :%s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) } - - // 转换为东八区时间 - loc, _ := time.LoadLocation("Asia/Shanghai") - cstTime := time.UnixMilli(ts).In(loc) - - // 对于日线数据,检查是否为当天的 00:00:00 - if cl.Period == "1D" { - if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 { - return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) - } - } else { - // 对于其他周期,使用原来的对齐检查 - if cstTime.UnixMilli()%duration.Milliseconds() != 0 { - return fmt.Errorf("timestamp %d is not aligned with period %s", ts, cl.Period) - } + } else { + // 对于其他周期,使用原来的对齐检查 + if cstTime.UnixMilli()%duration.Milliseconds() != 0 { + return fmt.Errorf("timestamp %d is not aligned with period %s", ts, cl.Period) } + } - // 格式化时间 - formattedTime := cstTime.Format("2006-01-02 15:04:05") + // 格式化时间 + formattedTime := cstTime.Format("2006-01-02 15:04:05") - // 构造记录 - record := map[string]interface{}{ - "_id": ts, // 使用时间戳作为_id - "dataTime": formattedTime, - "open": candle.Open, - "high": candle.High, - "low": candle.Low, - "close": candle.Close, - "volume": candle.Volume, - "volumeCcy": candle.VolumeCcy, - } - - records = append(records, record) + // 构造记录 + record := map[string]interface{}{ + "_id": ts, // 使用时间戳作为_id + "dataTime": formattedTime, + "open": candle.Open, + "high": candle.High, + "low": candle.Low, + "close": candle.Close, + "volume": candle.Volume, + "volumeCcy": candle.VolumeCcy, } // 构造请求体 payload := map[string]interface{}{ "tag": tag, - "record": records, + "record": record, } jsonData, err := json.Marshal(payload) @@ -281,9 +269,8 @@ func (cl *CandleList) ToFluentd() error { if resp.StatusCode != http.StatusOK { return fmt.Errorf("unexpected status code: %d, response: %s", resp.StatusCode, string(body)) } - //回头把response列出来看看,是不是有报错 - fmt.Printf("Successfully sent %d records to Fluentd\n", len(records)) + fmt.Printf("Successfully sent record to Fluentd: %s\n", fullURL) } return nil