索引 ilm

This commit is contained in:
zhangkun9038@dingtalk.com 2025-03-29 19:53:31 +08:00
parent f93f42f1d7
commit 78fd73aec0
4 changed files with 99 additions and 69 deletions

View File

@ -1,5 +1,3 @@
<<<<<<< HEAD
=======
{ {
"policy": { "policy": {
"phases": { "phases": {
@ -45,5 +43,4 @@
} }
} }
} }
>>>>>>> Snippet

View File

@ -1,9 +1,5 @@
<<<<<<< HEAD
=======
host ${ENV['ELASTICSEARCH_HOST'] || "elasticsearch"} host ${ENV['ELASTICSEARCH_HOST'] || "elasticsearch"}
port ${ENV['ELASTICSEARCH_PORT'] || 9200} port ${ENV['ELASTICSEARCH_PORT'] || 9200}
scheme ${ENV['ELASTICSEARCH_SCHEME'] || "http"} scheme ${ENV['ELASTICSEARCH_SCHEME'] || "http"}
user ${ENV['ELASTICSEARCH_USER'] || "fluentd_user"} user ${ENV['ELASTICSEARCH_USER'] || "fluentd_user"}
password ${ENV['ELASTICSEARCH_PASSWORD'] || "fluentd_password"} password ${ENV['ELASTICSEARCH_PASSWORD'] || "fluentd_password"}
>>>>>>> Snippet

View File

@ -4,14 +4,13 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
cfg "gitea.zjmud.xyz/phyer/tanya/config" // 导入你的 config 包
"io" "io"
"math" "math"
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
"time" "time"
cfg "gitea.zjmud.xyz/phyer/tanya/config" // 导入你的 config 包
) )
// RetentionCalculator 计算保留时间的接口 // RetentionCalculator 计算保留时间的接口
@ -97,8 +96,7 @@ func getPhase(daysDiff, retentionDays int) string {
fmt.Printf("[ILM Phase] 数据在7天到2年之间判定为warm阶段。daysDiff: %d\n", daysDiff) fmt.Printf("[ILM Phase] 数据在7天到2年之间判定为warm阶段。daysDiff: %d\n", daysDiff)
return "warm" return "warm"
} }
func ensureAlias(client *http.Client, esConfig cfg.ElasticsearchConfig, alias, period, dataType, coinPair string, indexTime time.Time) error {
func ensureAlias(client *http.Client, esConfig cfg.ElasticsearchConfig, alias, period string) error {
// 获取当前别名关联的索引 // 获取当前别名关联的索引
getAliasURL := fmt.Sprintf("%s/_alias/%s", esConfig.URL, alias) getAliasURL := fmt.Sprintf("%s/_alias/%s", esConfig.URL, alias)
req, err := http.NewRequest("GET", getAliasURL, nil) req, err := http.NewRequest("GET", getAliasURL, nil)
@ -141,23 +139,8 @@ func ensureAlias(client *http.Client, esConfig cfg.ElasticsearchConfig, alias, p
} }
// 确定最新的索引作为写入索引 // 确定最新的索引作为写入索引
var latestIndex string year, month := indexTime.Year(), int(indexTime.Month())
var latestTime time.Time latestIndex := fmt.Sprintf("logstash-%s.%s.%s.%d-%02d", strings.ToLower(period), strings.ToLower(dataType), strings.ToLower(coinPair), year, month)
for indexName := range aliasInfo {
parts := strings.Split(indexName, ".")
if len(parts) < 3 {
continue
}
datePart := parts[len(parts)-1] // 例如 2025-03
indexTime, err := time.Parse("2006-01", datePart)
if err != nil {
continue
}
if latestIndex == "" || indexTime.After(latestTime) {
latestIndex = indexName
latestTime = indexTime
}
}
// 如果没有找到合适的索引,创建一个新的索引模式 // 如果没有找到合适的索引,创建一个新的索引模式
if latestIndex == "" { if latestIndex == "" {
@ -267,7 +250,7 @@ func ensureAlias(client *http.Client, esConfig cfg.ElasticsearchConfig, alias, p
return nil return nil
} }
func ConfigureILM(client *http.Client, config *cfg.Config, dataType, period string, indexTime time.Time) error { func ConfigureILM(client *http.Client, config *cfg.Config, dataType, period, coinPair string, indexTime time.Time) error {
esConfig := config.Elasticsearch esConfig := config.Elasticsearch
dataConfig, ok := esConfig.ILM.DataTypes[dataType] dataConfig, ok := esConfig.ILM.DataTypes[dataType]
if !ok { if !ok {
@ -291,10 +274,10 @@ func ConfigureILM(client *http.Client, config *cfg.Config, dataType, period stri
// 格式化策略名称、模板名称、索引模式和别名 // 格式化策略名称、模板名称、索引模式和别名
year, month := indexTime.Year(), int(indexTime.Month()) year, month := indexTime.Year(), int(indexTime.Month())
policyName := fmt.Sprintf("logstash_%s_%s_%d_%02d", dataType, period, year, month) policyName := fmt.Sprintf("logstash_%s_%s_%d_%02d", strings.ToLower(dataType), strings.ToLower(period), year, month)
templateName := fmt.Sprintf("log_stash_%s_%s_%d_%02d", dataType, period, year, month) templateName := fmt.Sprintf("logstash_%s_%s_%d_%02d", dataType, period, year, month)
indexPattern := fmt.Sprintf(dataConfig.IndexPattern, period, year, month) indexPattern := fmt.Sprintf("logstash-%s.%s.*.%d-%02d", strings.ToLower(period), strings.ToLower(dataType), year, month)
aliasName := fmt.Sprintf("%s_alias", policyName) aliasName := fmt.Sprintf("logstash-%s.%s.%s.%d-%02d_alias", strings.ToLower(period), strings.ToLower(dataType), strings.ToLower(coinPair), year, month)
fmt.Printf("[ConfigureILM] Configuring ILM with policyName: %s, templateName: %s, aliasName: %s\n", policyName, templateName, aliasName) fmt.Printf("[ConfigureILM] Configuring ILM with policyName: %s, templateName: %s, aliasName: %s\n", policyName, templateName, aliasName)
@ -307,11 +290,15 @@ func ConfigureILM(client *http.Client, config *cfg.Config, dataType, period stri
"number_of_shards": dataConfig.Shards, "number_of_shards": dataConfig.Shards,
"number_of_replicas": dataConfig.Replicas, "number_of_replicas": dataConfig.Replicas,
"index.lifecycle.name": policyName, "index.lifecycle.name": policyName,
"index.lifecycle.rollover_alias": aliasName, // 确保与 aliasName 一致 "index.lifecycle.rollover_alias": aliasName,
}, },
"mappings": map[string]interface{}{ "mappings": map[string]interface{}{
"properties": map[string]interface{}{ "properties": map[string]interface{}{
"dataTime": map[string]string{"type": "date", "format": "yyyy-MM-dd HH:mm:ss"}, "dataTime": map[string]string{"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
// 根据不同的dataType保留通用字段
"coinPair": map[string]string{"type": "keyword"},
"period": map[string]string{"type": "keyword"},
// 以下字段仅适用于candle类型其他数据类型可能需要不同字段
"open": map[string]string{"type": "float"}, "open": map[string]string{"type": "float"},
"high": map[string]string{"type": "float"}, "high": map[string]string{"type": "float"},
"low": map[string]string{"type": "float"}, "low": map[string]string{"type": "float"},
@ -348,12 +335,13 @@ func ConfigureILM(client *http.Client, config *cfg.Config, dataType, period stri
fmt.Printf("[ConfigureILM] Successfully configured ILM template: %s\n", templateName) fmt.Printf("[ConfigureILM] Successfully configured ILM template: %s\n", templateName)
// 创建或更新 ILM 策略 // 创建或更新 ILM 策略
if err := ensureILMPolicy(client, esConfig, policyName, dataType, period, daysDiff, retentionDays, dataConfig); err != nil { if err := ensureILMPolicy(client, esConfig, policyName, aliasName, period, dataType, daysDiff, retentionDays, dataConfig, indexTime, coinPair); err != nil {
return fmt.Errorf("failed to ensure ILM policy: %v", err) return fmt.Errorf("failed to ensure ILM policy: %v", err)
} }
// 设置别名 // 设置别名,确保每次调用时传入正确的 aliasName 和 indexTime
if err := ensureAlias(client, esConfig, aliasName, period); err != nil { //
if err := ensureAlias(client, esConfig, aliasName, period, dataType, coinPair, indexTime); err != nil {
return fmt.Errorf("failed to ensure alias: %v", err) return fmt.Errorf("failed to ensure alias: %v", err)
} }
@ -361,7 +349,7 @@ func ConfigureILM(client *http.Client, config *cfg.Config, dataType, period stri
return nil return nil
} }
func ensureILMPolicy(client *http.Client, esConfig cfg.ElasticsearchConfig, policyName, dataType, period string, daysDiff, retentionDays int, dataConfig cfg.DataTypeConfig) error { func ensureILMPolicy(client *http.Client, esConfig cfg.ElasticsearchConfig, policyName, aliasName, period, dataType string, daysDiff, retentionDays int, dataConfig cfg.DataTypeConfig, indexTime time.Time, coinPair string) error {
// 检查 default_policy 是否存在 // 检查 default_policy 是否存在
defaultPolicyURL := fmt.Sprintf("%s/_ilm/policy/default_policy", esConfig.URL) defaultPolicyURL := fmt.Sprintf("%s/_ilm/policy/default_policy", esConfig.URL)
resp, err := client.Get(defaultPolicyURL) resp, err := client.Get(defaultPolicyURL)
@ -426,17 +414,68 @@ func ensureILMPolicy(client *http.Client, esConfig cfg.ElasticsearchConfig, poli
if resp.StatusCode == http.StatusOK { if resp.StatusCode == http.StatusOK {
fmt.Printf("[ILM Policy] Policy %s already exists, updating index settings...\n", policyName) fmt.Printf("[ILM Policy] Policy %s already exists, updating index settings...\n", policyName)
// 提取年份和月份
year, month := indexTime.Year(), int(indexTime.Month())
indexName := fmt.Sprintf("logstash-%s.candle.%s.%d-%02d", strings.ToLower(period), strings.ToLower(coinPair), year, month)
// 首先检查索引是否存在
indexURL := fmt.Sprintf("%s/%s", esConfig.URL, indexName)
req, err := http.NewRequest("HEAD", indexURL, nil)
if err != nil {
return fmt.Errorf("failed to create index check request: %v", err)
}
req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to check index existence: %v", err)
}
defer resp.Body.Close()
// 如果索引不存在,创建它
if resp.StatusCode == http.StatusNotFound {
createIndexURL := fmt.Sprintf("%s/%s", esConfig.URL, indexName)
indexData := map[string]interface{}{
"settings": map[string]interface{}{
"number_of_shards": 2,
"number_of_replicas": 1,
},
}
data, err := json.Marshal(indexData)
if err != nil {
return fmt.Errorf("failed to marshal index creation data: %v", err)
}
req, err = http.NewRequest("PUT", createIndexURL, bytes.NewBuffer(data))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
resp, err = client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("failed to create index %s, status: %d, response: %s", indexName, resp.StatusCode, string(body))
}
fmt.Printf("Successfully created index: %s\n", indexName)
}
// 更新索引设置 // 更新索引设置
updateIndexURL := fmt.Sprintf("%s/logstash-%s.%s.*/_settings", esConfig.URL, period, dataType) updateIndexURL := fmt.Sprintf("%s/%s/_settings", esConfig.URL, indexName)
updateData := map[string]interface{}{ updateData := map[string]interface{}{
"index.lifecycle.name": "default_policy", "index.lifecycle.name": policyName,
"index.lifecycle.rollover_alias": fmt.Sprintf("logs-%s-%s-candle", dataType, period), "index.lifecycle.rollover_alias": fmt.Sprintf("logstash-%s.candle.%s.%d-%02d_alias", period, coinPair, year, month),
} }
updateDataBytes, err := json.Marshal(updateData) updateDataBytes, err := json.Marshal(updateData)
if err != nil { if err != nil {
return fmt.Errorf("failed to marshal update index data: %v", err) return fmt.Errorf("failed to marshal update index data: %v", err)
} }
req, err := http.NewRequest("PUT", updateIndexURL, bytes.NewBuffer(updateDataBytes)) req, err = http.NewRequest("PUT", updateIndexURL, bytes.NewBuffer(updateDataBytes))
if err != nil { if err != nil {
return fmt.Errorf("failed to create update index request: %v", err) return fmt.Errorf("failed to create update index request: %v", err)
} }
@ -453,29 +492,16 @@ func ensureILMPolicy(client *http.Client, esConfig cfg.ElasticsearchConfig, poli
} }
fmt.Printf("[ILM Policy] Successfully updated index settings for policy: %s\n", policyName) fmt.Printf("[ILM Policy] Successfully updated index settings for policy: %s\n", policyName)
// 删除旧策略 fmt.Printf("[ILM Policy] Policy %s already exists, attempting to update...\n", policyName)
req, err = http.NewRequest("DELETE", policyURL, nil)
if err != nil {
return fmt.Errorf("failed to create delete policy request: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
deleteResp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to delete old policy: %v", err)
}
defer deleteResp.Body.Close()
if deleteResp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(deleteResp.Body)
return fmt.Errorf("failed to delete old policy, status: %d, response: %s", deleteResp.StatusCode, string(body))
}
fmt.Printf("[ILM Policy] Successfully deleted old policy: %s\n", policyName)
} else { } else {
fmt.Printf("[ILM Policy] Policy %s does not exist, creating a new one...\n", policyName) fmt.Printf("[ILM Policy] Policy %s does not exist, creating a new one...\n", policyName)
} }
// 创建 ILM 策略 // 创建/更新 ILM 策略
initialPhase := getPhase(daysDiff, retentionDays) initialPhase := getPhase(daysDiff, retentionDays)
// 当策略已存在时Elasticsearch 允许通过 PUT 请求直接更新策略
// 注意:某些策略修改可能需要满足条件(如不能缩短 min_age 时间)
fmt.Printf("[ILM Policy] Initial phase for policy %s is: %s\n", policyName, initialPhase) fmt.Printf("[ILM Policy] Initial phase for policy %s is: %s\n", policyName, initialPhase)
rolloverActions := map[string]interface{}{ rolloverActions := map[string]interface{}{
@ -573,8 +599,17 @@ func ensureILMPolicy(client *http.Client, esConfig cfg.ElasticsearchConfig, poli
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body) body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("failed to create ILM policy, status: %d, response: %s", resp.StatusCode, string(body)) return fmt.Errorf("failed to create/update ILM policy, status: %d, response: %s", resp.StatusCode, string(body))
} }
fmt.Printf("[ILM Policy] Successfully created or updated ILM policy: %s\n", policyName)
// 记录策略更新时间
action := "updated"
if resp.StatusCode == http.StatusCreated {
action = "created"
}
fmt.Printf("[ILM Policy] Successfully %s ILM policy: %s (modified: %v)\n",
action,
policyName,
time.Now().Format("2006-01-02 15:04:05"))
return nil return nil
} }

View File

@ -265,15 +265,14 @@ func (cl *CandleList) ToFluentd() error {
// 输出完整的响应信息 // 输出完整的响应信息
fmt.Printf("HTTP Response Status: %s\n", resp.Status) fmt.Printf("HTTP Response Status: %s\n", resp.Status)
fmt.Printf("HTTP Response Headers: %v\n", resp.Header) fmt.Printf("HTTP Response Headers: %v\n", resp.Header)
fmt.Printf("HTTP Response Body: %s\n", string(body)) // fmt.Printf("HTTP Response Body: %s\n", string(body))
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %d, response: %s", resp.StatusCode, string(body)) return fmt.Errorf("unexpected status code: %d, response: %s", resp.StatusCode, string(body))
} }
fmt.Printf("Successfully sent record to Fluentd: %s\n", fullURL)
} }
fmt.Printf("Successfully sent record to Fluentd count:", len(cl.Candles))
return nil return nil
} }
@ -427,7 +426,7 @@ func (cl *CandleList) ToElastic() error {
cstTime := time.UnixMilli(ts).In(loc) cstTime := time.UnixMilli(ts).In(loc)
index := fmt.Sprintf("logstash-%s.candle.%s.%d-%02d", index := fmt.Sprintf("logstash-%s.candle.%s.%d-%02d",
strings.ToLower(cl.Period), strings.ToLower(cl.Period),
strings.ToLower(cl.CoinPair), strings.ToLower(cl.CoinPair), // Keep the hyphen in coin pair name
cstTime.Year(), cstTime.Year(),
int(cstTime.Month()), int(cstTime.Month()),
) )
@ -447,7 +446,7 @@ func (cl *CandleList) ToElastic() error {
// 为每个月的索引配置 ILM 并插入数据 // 为每个月的索引配置 ILM 并插入数据
for _, group := range indexMap { for _, group := range indexMap {
// 配置 ILM使用该月第一条数据的时间 // 配置 ILM使用该月第一条数据的时间
err = elasticilm.ConfigureILM(client, config, "candle", strings.ToLower(cl.Period), group.IndexTime) err = elasticilm.ConfigureILM(client, config, "candle", strings.ToLower(cl.Period), cl.CoinPair, group.IndexTime)
if err != nil { if err != nil {
return fmt.Errorf("failed to configure ILM for index %s: %v", group.Index, err) return fmt.Errorf("failed to configure ILM for index %s: %v", group.Index, err)
} }
@ -525,6 +524,8 @@ func (cl *CandleList) ToElastic() error {
record := map[string]interface{}{ record := map[string]interface{}{
"dataTime": cstTime.Format("2006-01-02 15:04:05"), "dataTime": cstTime.Format("2006-01-02 15:04:05"),
"coinPair": cl.CoinPair,
"period": cl.Period,
"open": candle.Open, "open": candle.Open,
"high": candle.High, "high": candle.High,
"low": candle.Low, "low": candle.Low,
@ -558,10 +559,11 @@ func (cl *CandleList) ToElastic() error {
return fmt.Errorf("unexpected status code: %d, response: %s", resp.StatusCode, string(body)) return fmt.Errorf("unexpected status code: %d, response: %s", resp.StatusCode, string(body))
} }
fmt.Printf("Successfully sent record to Elasticsearch: %s\n", fullURL)
} }
} }
fmt.Printf("Successfully sent record to Elasticsearch")
return nil return nil
} }