diff --git a/config/default_policy.json b/config/default_policy.json index 4d0461d..84542c4 100644 --- a/config/default_policy.json +++ b/config/default_policy.json @@ -1,5 +1,3 @@ -<<<<<<< HEAD -======= { "policy": { "phases": { @@ -45,5 +43,4 @@ } } } ->>>>>>> Snippet diff --git a/config/fluentd.yaml b/config/fluentd.yaml index e0f853e..03bd8dc 100644 --- a/config/fluentd.yaml +++ b/config/fluentd.yaml @@ -1,9 +1,5 @@ - -<<<<<<< HEAD -======= host ${ENV['ELASTICSEARCH_HOST'] || "elasticsearch"} port ${ENV['ELASTICSEARCH_PORT'] || 9200} scheme ${ENV['ELASTICSEARCH_SCHEME'] || "http"} user ${ENV['ELASTICSEARCH_USER'] || "fluentd_user"} password ${ENV['ELASTICSEARCH_PASSWORD'] || "fluentd_password"} ->>>>>>> Snippet diff --git a/elasticilm/ilm.go b/elasticilm/ilm.go index 179169d..0d65811 100644 --- a/elasticilm/ilm.go +++ b/elasticilm/ilm.go @@ -4,14 +4,13 @@ import ( "bytes" "encoding/json" "fmt" + cfg "gitea.zjmud.xyz/phyer/tanya/config" // 导入你的 config 包 "io" "math" "net/http" "strconv" "strings" "time" - - cfg "gitea.zjmud.xyz/phyer/tanya/config" // 导入你的 config 包 ) // RetentionCalculator 计算保留时间的接口 @@ -97,8 +96,7 @@ func getPhase(daysDiff, retentionDays int) string { fmt.Printf("[ILM Phase] 数据在7天到2年之间,判定为warm阶段。daysDiff: %d\n", daysDiff) return "warm" } - -func ensureAlias(client *http.Client, esConfig cfg.ElasticsearchConfig, alias, period string) error { +func ensureAlias(client *http.Client, esConfig cfg.ElasticsearchConfig, alias, period, dataType, coinPair string, indexTime time.Time) error { // 获取当前别名关联的索引 getAliasURL := fmt.Sprintf("%s/_alias/%s", esConfig.URL, alias) req, err := http.NewRequest("GET", getAliasURL, nil) @@ -141,23 +139,8 @@ func ensureAlias(client *http.Client, esConfig cfg.ElasticsearchConfig, alias, p } // 确定最新的索引作为写入索引 - var latestIndex string - var latestTime time.Time - for indexName := range aliasInfo { - parts := strings.Split(indexName, ".") - if len(parts) < 3 { - continue - } - datePart := parts[len(parts)-1] // 例如 2025-03 - indexTime, err := time.Parse("2006-01", datePart) - if err != nil { - continue - } - if latestIndex == "" || indexTime.After(latestTime) { - latestIndex = indexName - latestTime = indexTime - } - } + year, month := indexTime.Year(), int(indexTime.Month()) + latestIndex := fmt.Sprintf("logstash-%s.%s.%s.%d-%02d", strings.ToLower(period), strings.ToLower(dataType), strings.ToLower(coinPair), year, month) // 如果没有找到合适的索引,创建一个新的索引模式 if latestIndex == "" { @@ -267,7 +250,7 @@ func ensureAlias(client *http.Client, esConfig cfg.ElasticsearchConfig, alias, p return nil } -func ConfigureILM(client *http.Client, config *cfg.Config, dataType, period string, indexTime time.Time) error { +func ConfigureILM(client *http.Client, config *cfg.Config, dataType, period, coinPair string, indexTime time.Time) error { esConfig := config.Elasticsearch dataConfig, ok := esConfig.ILM.DataTypes[dataType] if !ok { @@ -291,10 +274,10 @@ func ConfigureILM(client *http.Client, config *cfg.Config, dataType, period stri // 格式化策略名称、模板名称、索引模式和别名 year, month := indexTime.Year(), int(indexTime.Month()) - policyName := fmt.Sprintf("logstash_%s_%s_%d_%02d", dataType, period, year, month) - templateName := fmt.Sprintf("log_stash_%s_%s_%d_%02d", dataType, period, year, month) - indexPattern := fmt.Sprintf(dataConfig.IndexPattern, period, year, month) - aliasName := fmt.Sprintf("%s_alias", policyName) + policyName := fmt.Sprintf("logstash_%s_%s_%d_%02d", strings.ToLower(dataType), strings.ToLower(period), year, month) + templateName := fmt.Sprintf("logstash_%s_%s_%d_%02d", dataType, period, year, month) + indexPattern := fmt.Sprintf("logstash-%s.%s.*.%d-%02d", strings.ToLower(period), strings.ToLower(dataType), year, month) + aliasName := fmt.Sprintf("logstash-%s.%s.%s.%d-%02d_alias", strings.ToLower(period), strings.ToLower(dataType), strings.ToLower(coinPair), year, month) fmt.Printf("[ConfigureILM] Configuring ILM with policyName: %s, templateName: %s, aliasName: %s\n", policyName, templateName, aliasName) @@ -307,11 +290,15 @@ func ConfigureILM(client *http.Client, config *cfg.Config, dataType, period stri "number_of_shards": dataConfig.Shards, "number_of_replicas": dataConfig.Replicas, "index.lifecycle.name": policyName, - "index.lifecycle.rollover_alias": aliasName, // 确保与 aliasName 一致 + "index.lifecycle.rollover_alias": aliasName, }, "mappings": map[string]interface{}{ "properties": map[string]interface{}{ - "dataTime": map[string]string{"type": "date", "format": "yyyy-MM-dd HH:mm:ss"}, + "dataTime": map[string]string{"type": "date", "format": "yyyy-MM-dd HH:mm:ss"}, + // 根据不同的dataType保留通用字段 + "coinPair": map[string]string{"type": "keyword"}, + "period": map[string]string{"type": "keyword"}, + // 以下字段仅适用于candle类型,其他数据类型可能需要不同字段 "open": map[string]string{"type": "float"}, "high": map[string]string{"type": "float"}, "low": map[string]string{"type": "float"}, @@ -348,12 +335,13 @@ func ConfigureILM(client *http.Client, config *cfg.Config, dataType, period stri fmt.Printf("[ConfigureILM] Successfully configured ILM template: %s\n", templateName) // 创建或更新 ILM 策略 - if err := ensureILMPolicy(client, esConfig, policyName, dataType, period, daysDiff, retentionDays, dataConfig); err != nil { + if err := ensureILMPolicy(client, esConfig, policyName, aliasName, period, dataType, daysDiff, retentionDays, dataConfig, indexTime, coinPair); err != nil { return fmt.Errorf("failed to ensure ILM policy: %v", err) } - // 设置别名 - if err := ensureAlias(client, esConfig, aliasName, period); err != nil { + // 设置别名,确保每次调用时传入正确的 aliasName 和 indexTime + // + if err := ensureAlias(client, esConfig, aliasName, period, dataType, coinPair, indexTime); err != nil { return fmt.Errorf("failed to ensure alias: %v", err) } @@ -361,7 +349,7 @@ func ConfigureILM(client *http.Client, config *cfg.Config, dataType, period stri return nil } -func ensureILMPolicy(client *http.Client, esConfig cfg.ElasticsearchConfig, policyName, dataType, period string, daysDiff, retentionDays int, dataConfig cfg.DataTypeConfig) error { +func ensureILMPolicy(client *http.Client, esConfig cfg.ElasticsearchConfig, policyName, aliasName, period, dataType string, daysDiff, retentionDays int, dataConfig cfg.DataTypeConfig, indexTime time.Time, coinPair string) error { // 检查 default_policy 是否存在 defaultPolicyURL := fmt.Sprintf("%s/_ilm/policy/default_policy", esConfig.URL) resp, err := client.Get(defaultPolicyURL) @@ -426,17 +414,68 @@ func ensureILMPolicy(client *http.Client, esConfig cfg.ElasticsearchConfig, poli if resp.StatusCode == http.StatusOK { fmt.Printf("[ILM Policy] Policy %s already exists, updating index settings...\n", policyName) + // 提取年份和月份 + year, month := indexTime.Year(), int(indexTime.Month()) + indexName := fmt.Sprintf("logstash-%s.candle.%s.%d-%02d", strings.ToLower(period), strings.ToLower(coinPair), year, month) + + // 首先检查索引是否存在 + indexURL := fmt.Sprintf("%s/%s", esConfig.URL, indexName) + req, err := http.NewRequest("HEAD", indexURL, nil) + if err != nil { + return fmt.Errorf("failed to create index check request: %v", err) + } + req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to check index existence: %v", err) + } + defer resp.Body.Close() + + // 如果索引不存在,创建它 + if resp.StatusCode == http.StatusNotFound { + createIndexURL := fmt.Sprintf("%s/%s", esConfig.URL, indexName) + indexData := map[string]interface{}{ + "settings": map[string]interface{}{ + "number_of_shards": 2, + "number_of_replicas": 1, + }, + } + data, err := json.Marshal(indexData) + if err != nil { + return fmt.Errorf("failed to marshal index creation data: %v", err) + } + + req, err = http.NewRequest("PUT", createIndexURL, bytes.NewBuffer(data)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) + + resp, err = client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("failed to create index %s, status: %d, response: %s", indexName, resp.StatusCode, string(body)) + } + fmt.Printf("Successfully created index: %s\n", indexName) + } + // 更新索引设置 - updateIndexURL := fmt.Sprintf("%s/logstash-%s.%s.*/_settings", esConfig.URL, period, dataType) + updateIndexURL := fmt.Sprintf("%s/%s/_settings", esConfig.URL, indexName) updateData := map[string]interface{}{ - "index.lifecycle.name": "default_policy", - "index.lifecycle.rollover_alias": fmt.Sprintf("logs-%s-%s-candle", dataType, period), + "index.lifecycle.name": policyName, + "index.lifecycle.rollover_alias": fmt.Sprintf("logstash-%s.candle.%s.%d-%02d_alias", period, coinPair, year, month), } updateDataBytes, err := json.Marshal(updateData) if err != nil { return fmt.Errorf("failed to marshal update index data: %v", err) } - req, err := http.NewRequest("PUT", updateIndexURL, bytes.NewBuffer(updateDataBytes)) + req, err = http.NewRequest("PUT", updateIndexURL, bytes.NewBuffer(updateDataBytes)) if err != nil { return fmt.Errorf("failed to create update index request: %v", err) } @@ -453,29 +492,16 @@ func ensureILMPolicy(client *http.Client, esConfig cfg.ElasticsearchConfig, poli } fmt.Printf("[ILM Policy] Successfully updated index settings for policy: %s\n", policyName) - // 删除旧策略 - req, err = http.NewRequest("DELETE", policyURL, nil) - if err != nil { - return fmt.Errorf("failed to create delete policy request: %v", err) - } - req.Header.Set("Content-Type", "application/json") - req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) - deleteResp, err := client.Do(req) - if err != nil { - return fmt.Errorf("failed to delete old policy: %v", err) - } - defer deleteResp.Body.Close() - if deleteResp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(deleteResp.Body) - return fmt.Errorf("failed to delete old policy, status: %d, response: %s", deleteResp.StatusCode, string(body)) - } - fmt.Printf("[ILM Policy] Successfully deleted old policy: %s\n", policyName) + fmt.Printf("[ILM Policy] Policy %s already exists, attempting to update...\n", policyName) } else { fmt.Printf("[ILM Policy] Policy %s does not exist, creating a new one...\n", policyName) } - // 创建新的 ILM 策略 + // 创建/更新 ILM 策略 initialPhase := getPhase(daysDiff, retentionDays) + + // 当策略已存在时,Elasticsearch 允许通过 PUT 请求直接更新策略 + // 注意:某些策略修改可能需要满足条件(如不能缩短 min_age 时间) fmt.Printf("[ILM Policy] Initial phase for policy %s is: %s\n", policyName, initialPhase) rolloverActions := map[string]interface{}{ @@ -573,8 +599,17 @@ func ensureILMPolicy(client *http.Client, esConfig cfg.ElasticsearchConfig, poli defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("failed to create ILM policy, status: %d, response: %s", resp.StatusCode, string(body)) + return fmt.Errorf("failed to create/update ILM policy, status: %d, response: %s", resp.StatusCode, string(body)) } - fmt.Printf("[ILM Policy] Successfully created or updated ILM policy: %s\n", policyName) + + // 记录策略更新时间 + action := "updated" + if resp.StatusCode == http.StatusCreated { + action = "created" + } + fmt.Printf("[ILM Policy] Successfully %s ILM policy: %s (modified: %v)\n", + action, + policyName, + time.Now().Format("2006-01-02 15:04:05")) return nil } diff --git a/okx/candleList.go b/okx/candleList.go index 6e3268e..24f2529 100644 --- a/okx/candleList.go +++ b/okx/candleList.go @@ -265,15 +265,14 @@ func (cl *CandleList) ToFluentd() error { // 输出完整的响应信息 fmt.Printf("HTTP Response Status: %s\n", resp.Status) fmt.Printf("HTTP Response Headers: %v\n", resp.Header) - fmt.Printf("HTTP Response Body: %s\n", string(body)) + // fmt.Printf("HTTP Response Body: %s\n", string(body)) if resp.StatusCode != http.StatusOK { return fmt.Errorf("unexpected status code: %d, response: %s", resp.StatusCode, string(body)) } - fmt.Printf("Successfully sent record to Fluentd: %s\n", fullURL) } - + fmt.Printf("Successfully sent record to Fluentd count:", len(cl.Candles)) return nil } @@ -427,7 +426,7 @@ func (cl *CandleList) ToElastic() error { cstTime := time.UnixMilli(ts).In(loc) index := fmt.Sprintf("logstash-%s.candle.%s.%d-%02d", strings.ToLower(cl.Period), - strings.ToLower(cl.CoinPair), + strings.ToLower(cl.CoinPair), // Keep the hyphen in coin pair name cstTime.Year(), int(cstTime.Month()), ) @@ -447,7 +446,7 @@ func (cl *CandleList) ToElastic() error { // 为每个月的索引配置 ILM 并插入数据 for _, group := range indexMap { // 配置 ILM,使用该月第一条数据的时间 - err = elasticilm.ConfigureILM(client, config, "candle", strings.ToLower(cl.Period), group.IndexTime) + err = elasticilm.ConfigureILM(client, config, "candle", strings.ToLower(cl.Period), cl.CoinPair, group.IndexTime) if err != nil { return fmt.Errorf("failed to configure ILM for index %s: %v", group.Index, err) } @@ -525,6 +524,8 @@ func (cl *CandleList) ToElastic() error { record := map[string]interface{}{ "dataTime": cstTime.Format("2006-01-02 15:04:05"), + "coinPair": cl.CoinPair, + "period": cl.Period, "open": candle.Open, "high": candle.High, "low": candle.Low, @@ -558,10 +559,11 @@ func (cl *CandleList) ToElastic() error { return fmt.Errorf("unexpected status code: %d, response: %s", resp.StatusCode, string(body)) } - fmt.Printf("Successfully sent record to Elasticsearch: %s\n", fullURL) } } + fmt.Printf("Successfully sent record to Elasticsearch") + return nil }