由于fluentd.conf配置的原因,目前ToFluentd不可用,以后能配置出来支持自定义格式的index到es的fluentd.conf再说

This commit is contained in:
zhangkun9038@dingtalk.com 2025-03-13 17:40:28 +08:00
parent 20a5d67d34
commit 8f519aa103

View File

@ -156,62 +156,50 @@ func (cl *CandleList) ToFluentd() error {
return fmt.Errorf("invalid period: %v", err) return fmt.Errorf("invalid period: %v", err)
} }
// 分批发送每批最多50条 // 逐条发送记录
batchSize := 50 for _, candle := range cl.Candles {
for i := 0; i < len(cl.Candles); i += batchSize { // 验证时间戳是否为周期的整数倍
end := i + batchSize ts, err := strconv.ParseInt(candle.Timestamp, 10, 64)
if end > len(cl.Candles) { if err != nil {
end = len(cl.Candles) fmt.Println("invalid timestamp:", ts, err)
continue
} }
// 准备批量数据 // 转换为东八区时间
var records []map[string]interface{} loc, _ := time.LoadLocation("Asia/Shanghai")
for _, candle := range cl.Candles[i:end] { cstTime := time.UnixMilli(ts).In(loc)
// 验证时间戳是否为周期的整数倍
ts, err := strconv.ParseInt(candle.Timestamp, 10, 64) // 对于日线数据,检查是否为当天的 00:00:00
if err != nil { if cl.Period == "1D" {
fmt.Println("invalid timestamp:", ts, err) if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 {
continue return fmt.Errorf("timestamp %d (%s) is not aligned with period :%s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period)
} }
} else {
// 转换为东八区时间 // 对于其他周期,使用原来的对齐检查
loc, _ := time.LoadLocation("Asia/Shanghai") if cstTime.UnixMilli()%duration.Milliseconds() != 0 {
cstTime := time.UnixMilli(ts).In(loc) return fmt.Errorf("timestamp %d is not aligned with period %s", ts, cl.Period)
// 对于日线数据,检查是否为当天的 00:00:00
if cl.Period == "1D" {
if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 {
return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period)
}
} else {
// 对于其他周期,使用原来的对齐检查
if cstTime.UnixMilli()%duration.Milliseconds() != 0 {
return fmt.Errorf("timestamp %d is not aligned with period %s", ts, cl.Period)
}
} }
}
// 格式化时间 // 格式化时间
formattedTime := cstTime.Format("2006-01-02 15:04:05") formattedTime := cstTime.Format("2006-01-02 15:04:05")
// 构造记录 // 构造记录
record := map[string]interface{}{ record := map[string]interface{}{
"_id": ts, // 使用时间戳作为_id "_id": ts, // 使用时间戳作为_id
"dataTime": formattedTime, "dataTime": formattedTime,
"open": candle.Open, "open": candle.Open,
"high": candle.High, "high": candle.High,
"low": candle.Low, "low": candle.Low,
"close": candle.Close, "close": candle.Close,
"volume": candle.Volume, "volume": candle.Volume,
"volumeCcy": candle.VolumeCcy, "volumeCcy": candle.VolumeCcy,
}
records = append(records, record)
} }
// 构造请求体 // 构造请求体
payload := map[string]interface{}{ payload := map[string]interface{}{
"tag": tag, "tag": tag,
"record": records, "record": record,
} }
jsonData, err := json.Marshal(payload) jsonData, err := json.Marshal(payload)
@ -281,9 +269,8 @@ func (cl *CandleList) ToFluentd() error {
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %d, response: %s", resp.StatusCode, string(body)) return fmt.Errorf("unexpected status code: %d, response: %s", resp.StatusCode, string(body))
} }
//回头把response列出来看看,是不是有报错
fmt.Printf("Successfully sent %d records to Fluentd\n", len(records)) fmt.Printf("Successfully sent record to Fluentd: %s\n", fullURL)
} }
return nil return nil