ToElastic支持跨越插入数据时 只设置一次template的问题

This commit is contained in:
zhangkun9038@dingtalk.com 2025-03-28 00:16:09 +08:00
parent b0eb71283c
commit f1bd74c484

View File

@ -405,34 +405,25 @@ func (cl *CandleList) ToElastic() error {
return fmt.Errorf("failed to load config: %v", err)
}
// 如果没有数据,直接返回
if len(cl.Candles) == 0 {
return nil
}
// 使用第一条数据的时间戳配置 ILM
firstCandle := cl.Candles[0]
ts, err := strconv.ParseInt(firstCandle.Timestamp, 10, 64)
if err != nil {
return fmt.Errorf("invalid timestamp in first candle: %v", err)
// 按月分组数据
type monthlyCandles struct {
Index string
Candles []*Candle
IndexTime time.Time
}
indexMap := make(map[string]monthlyCandles)
loc, _ := time.LoadLocation("Asia/Shanghai")
cstTime := time.UnixMilli(ts).In(loc)
// 配置 ILM只执行一次
err = elasticilm.ConfigureILM(client, config, "candle", strings.ToLower(cl.Period), cstTime)
if err != nil {
return fmt.Errorf("failed to configure ILM: %v", err)
}
// 批量插入数据
for _, candle := range cl.Candles {
ts, err := strconv.ParseInt(candle.Timestamp, 10, 64)
if err != nil {
fmt.Println("invalid timestamp:", candle.Timestamp, err)
fmt.Printf("invalid timestamp: %s, skipping: %v\n", candle.Timestamp, err)
continue
}
cstTime := time.UnixMilli(ts).In(loc)
index := fmt.Sprintf("logstash-%s.candle.%s.%d-%02d",
strings.ToLower(cl.Period),
@ -441,63 +432,83 @@ func (cl *CandleList) ToElastic() error {
int(cstTime.Month()),
)
// 检查时间戳对齐
if cl.Period == "1D" {
if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 {
return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period)
}
if group, exists := indexMap[index]; exists {
group.Candles = append(group.Candles, candle)
indexMap[index] = group
} else {
if cstTime.UnixMilli()%duration.Milliseconds() != 0 {
return fmt.Errorf("timestamp %d is not aligned with period %s", ts, cl.Period)
indexMap[index] = monthlyCandles{
Index: index,
Candles: []*Candle{candle},
IndexTime: cstTime,
}
}
}
record := map[string]interface{}{
"dataTime": cstTime.Format("2006-01-02 15:04:05"),
"open": candle.Open,
"high": candle.High,
"low": candle.Low,
"close": candle.Close,
"volume": candle.Volume,
"volumeCcy": candle.VolumeCcy,
}
jsonData, err := json.Marshal(record)
// 为每个月的索引配置 ILM 并插入数据
for _, group := range indexMap {
// 配置 ILM使用该月第一条数据的时间
err = elasticilm.ConfigureILM(client, config, "candle", strings.ToLower(cl.Period), group.IndexTime)
if err != nil {
return fmt.Errorf("failed to marshal data: %v", err)
return fmt.Errorf("failed to configure ILM for index %s: %v", group.Index, err)
}
fullURL := fmt.Sprintf("%s/%s/_doc/%d", strings.TrimRight(config.Elasticsearch.URL, "/"), index, ts)
fmt.Println("fullURL: ", fullURL)
// 插入该月的所有数据
for _, candle := range group.Candles {
ts, err := strconv.ParseInt(candle.Timestamp, 10, 64)
if err != nil {
fmt.Printf("invalid timestamp: %s, skipping: %v\n", candle.Timestamp, err)
continue
}
cstTime := time.UnixMilli(ts).In(loc)
req, err := http.NewRequest("POST", fullURL, bytes.NewBuffer(jsonData))
if err != nil {
return fmt.Errorf("failed to create request: %v", err)
// 时间戳对齐检查
if cl.Period == "1D" {
if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 {
return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period)
}
} else {
if cstTime.UnixMilli()%duration.Milliseconds() != 0 {
return fmt.Errorf("timestamp %d is not aligned with period %s", ts, cl.Period)
}
}
record := map[string]interface{}{
"dataTime": cstTime.Format("2006-01-02 15:04:05"),
"open": candle.Open,
"high": candle.High,
"low": candle.Low,
"close": candle.Close,
"volume": candle.Volume,
"volumeCcy": candle.VolumeCcy,
}
jsonData, err := json.Marshal(record)
if err != nil {
return fmt.Errorf("failed to marshal data: %v", err)
}
fullURL := fmt.Sprintf("%s/%s/_doc/%d", strings.TrimRight(config.Elasticsearch.URL, "/"), group.Index, ts)
req, err := http.NewRequest("POST", fullURL, bytes.NewBuffer(jsonData))
if err != nil {
return fmt.Errorf("failed to create request: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.SetBasicAuth(config.Elasticsearch.Auth.Username, config.Elasticsearch.Auth.Password)
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to send data to Elasticsearch: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("unexpected status code: %d, response: %s", resp.StatusCode, string(body))
}
fmt.Printf("Successfully sent record to Elasticsearch: %s\n", fullURL)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
req.SetBasicAuth(config.Elasticsearch.Auth.Username, config.Elasticsearch.Auth.Password)
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to send data to Elasticsearch: %v", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response body: %v", err)
}
fmt.Printf("HTTP Response Status: %s\n", resp.Status)
fmt.Printf("HTTP Response Body: %s\n", string(body))
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
return fmt.Errorf("unexpected status code: %d, response: %s", resp.StatusCode, string(body))
}
fmt.Printf("Successfully sent record to Elasticsearch: %s\n", fullURL)
}
return nil