package elasticilm import ( "bytes" "encoding/json" "fmt" "io" "math" "net/http" "strconv" "strings" "time" cfg "gitea.zjmud.xyz/phyer/tanya/config" // 导入你的 config 包 ) // RetentionCalculator 计算保留时间的接口 type RetentionCalculator interface { Calculate(daysDiff int, period string) int } // DefaultRetentionCalculator 默认保留时间计算器 type DefaultRetentionCalculator struct { MaxRetention map[string]int MinRetention int } // Calculate 计算保留时间(天) func (c DefaultRetentionCalculator) Calculate(daysDiff int, period string) int { minutes := periodToMinutes(period) maxRetention := c.MaxRetention[period] if maxRetention == 0 { maxRetention = c.MaxRetention["default"] } // 非线性衰减公式 timeFactor := 1 - math.Sqrt(float64(daysDiff))/50 // 距今时间因子 if timeFactor < 0 { timeFactor = 0 } periodFactor := math.Sqrt(float64(minutes)) / math.Sqrt(43200) // 时间框架因子 retention := float64(maxRetention) * timeFactor * periodFactor if retention < float64(c.MinRetention) { return c.MinRetention } return int(retention) } // periodToMinutes 将时间框架转换为分钟数 func periodToMinutes(period string) int { switch period { case "1m": return 1 case "3m": return 3 case "5m": return 5 case "15m": return 15 case "30m": return 30 case "1h": return 60 case "2h": return 120 case "4h": return 240 case "6h": return 360 case "12h": return 720 case "1d", "1D": return 1440 case "2d": return 2880 case "5d": return 7200 case "1W": return 10080 case "1M": return 43200 // 假设 30 天 default: return 1 } } // getPhase 根据时间差和保留时间决定初始阶段 func getPhase(daysDiff, retentionDays int) string { if daysDiff > 730 || retentionDays < 180 { // 超过 2 年或保留时间少于 180 天 fmt.Printf("[ILM Phase] 数据已超过2年或保留时间少于180天,判定为cold阶段。daysDiff: %d, retentionDays: %d\n", daysDiff, retentionDays) return "cold" } if daysDiff <= 7 { // 7天内的数据为hot阶段 fmt.Printf("[ILM Phase] 数据在7天内,判定为hot阶段。daysDiff: %d\n", daysDiff) return "hot" } fmt.Printf("[ILM Phase] 数据在7天到2年之间,判定为warm阶段。daysDiff: %d\n", daysDiff) return "warm" } func ensureAlias(client *http.Client, esConfig cfg.ElasticsearchConfig, alias, period string) error { // 获取当前别名关联的索引 getAliasURL := fmt.Sprintf("%s/_alias/%s", esConfig.URL, alias) req, err := http.NewRequest("GET", getAliasURL, nil) if err != nil { return err } req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) resp, err := client.Do(req) if err != nil { return err } defer resp.Body.Close() // 准备别名更新的操作 var actions []interface{} var aliasInfo map[string]map[string]interface{} // 如果别名存在,解析当前关联的索引 if resp.StatusCode == http.StatusOK { if err := json.NewDecoder(resp.Body).Decode(&aliasInfo); err != nil { return err } // 移除所有现有索引的写入索引标记 for indexName, info := range aliasInfo { if aliases, ok := info["aliases"].(map[string]interface{}); ok { for aliasName, aliasDetails := range aliases { if aliasName == alias && aliasDetails.(map[string]interface{})["is_write_index"] == true { actions = append(actions, map[string]interface{}{ "remove": map[string]interface{}{ "index": indexName, "alias": alias, }, }) } } } } } // 确定最新的索引作为写入索引 var latestIndex string var latestTime time.Time for indexName := range aliasInfo { parts := strings.Split(indexName, ".") if len(parts) < 3 { continue } datePart := parts[len(parts)-1] // 例如 2025-03 indexTime, err := time.Parse("2006-01", datePart) if err != nil { continue } if latestIndex == "" || indexTime.After(latestTime) { latestIndex = indexName latestTime = indexTime } } // 如果没有找到合适的索引,创建一个新的索引模式 if latestIndex == "" { latestIndex = fmt.Sprintf("logstash-%s.candle.okb-eth.%s", period, time.Now().Format("2006-01")) } // 检查索引是否存在,如果不存在则创建 indexURL := fmt.Sprintf("%s/%s", esConfig.URL, latestIndex) req, err = http.NewRequest("HEAD", indexURL, nil) if err != nil { return err } req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) resp, err = client.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode == http.StatusNotFound { // 索引不存在,创建索引 fmt.Printf("Index %s does not exist, creating it...\n", latestIndex) createIndexURL := fmt.Sprintf("%s/%s", esConfig.URL, latestIndex) // 简单的索引创建请求,可以根据需要添加更多设置 indexData := map[string]interface{}{ "settings": map[string]interface{}{ "number_of_shards": 2, // 根据你的配置调整 "number_of_replicas": 1, // 根据你的配置调整 }, } data, err := json.Marshal(indexData) if err != nil { return fmt.Errorf("failed to marshal index creation data: %v", err) } req, err = http.NewRequest("PUT", createIndexURL, bytes.NewBuffer(data)) if err != nil { return err } req.Header.Set("Content-Type", "application/json") req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) resp, err = client.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return fmt.Errorf("failed to create index %s, status: %d, response: %s", latestIndex, resp.StatusCode, string(body)) } fmt.Printf("Successfully created index: %s\n", latestIndex) } // 添加最新的索引作为写入索引 actions = append(actions, map[string]interface{}{ "add": map[string]interface{}{ "index": latestIndex, "alias": alias, "is_write_index": true, }, }) // 确保其他索引的 is_write_index 为 false for indexName := range aliasInfo { if indexName != latestIndex { actions = append(actions, map[string]interface{}{ "add": map[string]interface{}{ "index": indexName, "alias": alias, "is_write_index": false, }, }) } } // 执行别名更新操作 aliasURL := fmt.Sprintf("%s/_aliases", esConfig.URL) aliasData := map[string]interface{}{ "actions": actions, } data, err := json.Marshal(aliasData) if err != nil { return err } req, err = http.NewRequest("POST", aliasURL, bytes.NewBuffer(data)) if err != nil { return err } req.Header.Set("Content-Type", "application/json") req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) resp, err = client.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return fmt.Errorf("failed to create alias, status: %d, response: %s", resp.StatusCode, string(body)) } return nil } func ConfigureILM(client *http.Client, config *cfg.Config, dataType, period string, indexTime time.Time) error { esConfig := config.Elasticsearch dataConfig, ok := esConfig.ILM.DataTypes[dataType] if !ok { return fmt.Errorf("ILM configuration for data type '%s' not found in config", dataType) } // 计算保留时间和时间差 calc := DefaultRetentionCalculator{ MaxRetention: dataConfig.MaxRetention, MinRetention: dataConfig.MinRetention, } now := time.Now() daysDiff := int(now.Sub(indexTime).Hours() / 24) retentionDays := calc.Calculate(daysDiff, period) // 确保保留时间不低于冷阶段的最小值 minDeleteAge := dataConfig.NormalPhases["cold"] if retentionDays < minDeleteAge { retentionDays = minDeleteAge } // 格式化策略名称、模板名称、索引模式和别名 year, month := indexTime.Year(), int(indexTime.Month()) policyName := fmt.Sprintf("logstash_%s_%s_%d_%02d", dataType, period, year, month) templateName := fmt.Sprintf("log_stash_%s_%s_%d_%02d", dataType, period, year, month) indexPattern := fmt.Sprintf(dataConfig.IndexPattern, period, year, month) aliasName := fmt.Sprintf("%s_alias", policyName) fmt.Printf("[ConfigureILM] Configuring ILM with policyName: %s, templateName: %s, aliasName: %s\n", policyName, templateName, aliasName) // 创建索引模板 template := map[string]interface{}{ "index_patterns": []string{indexPattern}, "priority": 100, "template": map[string]interface{}{ "settings": map[string]interface{}{ "number_of_shards": dataConfig.Shards, "number_of_replicas": dataConfig.Replicas, "index.lifecycle.name": policyName, "index.lifecycle.rollover_alias": aliasName, // 确保与 aliasName 一致 }, "mappings": map[string]interface{}{ "properties": map[string]interface{}{ "dataTime": map[string]string{"type": "date", "format": "yyyy-MM-dd HH:mm:ss"}, "open": map[string]string{"type": "float"}, "high": map[string]string{"type": "float"}, "low": map[string]string{"type": "float"}, "close": map[string]string{"type": "float"}, "volume": map[string]string{"type": "float"}, "volumeCcy": map[string]string{"type": "float"}, }, }, }, } templateURL := fmt.Sprintf("%s/_index_template/%s", esConfig.URL, templateName) templateData, err := json.Marshal(template) if err != nil { return fmt.Errorf("failed to marshal index template data: %v", err) } req, err := http.NewRequest("PUT", templateURL, bytes.NewBuffer(templateData)) if err != nil { return fmt.Errorf("failed to create index template request: %v", err) } req.Header.Set("Content-Type", "application/json") req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) resp, err := client.Do(req) if err != nil { return fmt.Errorf("failed to send index template request: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return fmt.Errorf("failed to create index template, status: %d, response: %s", resp.StatusCode, string(body)) } fmt.Printf("[ConfigureILM] Successfully configured ILM template: %s\n", templateName) // 创建或更新 ILM 策略 if err := ensureILMPolicy(client, esConfig, policyName, dataType, period, daysDiff, retentionDays, dataConfig); err != nil { return fmt.Errorf("failed to ensure ILM policy: %v", err) } // 设置别名 if err := ensureAlias(client, esConfig, aliasName, period); err != nil { return fmt.Errorf("failed to ensure alias: %v", err) } fmt.Printf("[ConfigureILM] Successfully completed ILM configuration for policy: %s\n", policyName) return nil } func ensureILMPolicy(client *http.Client, esConfig cfg.ElasticsearchConfig, policyName, dataType, period string, daysDiff, retentionDays int, dataConfig cfg.DataTypeConfig) error { // 检查 default_policy 是否存在 defaultPolicyURL := fmt.Sprintf("%s/_ilm/policy/default_policy", esConfig.URL) resp, err := client.Get(defaultPolicyURL) if err != nil { return fmt.Errorf("failed to check default_policy: %v", err) } defer resp.Body.Close() // 如果 default_policy 不存在,则创建 if resp.StatusCode != http.StatusOK { fmt.Println("[ILM Policy] Default policy does not exist, creating it...") defaultPolicy := map[string]interface{}{ "policy": map[string]interface{}{ "phases": map[string]interface{}{ "hot": map[string]interface{}{ "actions": map[string]interface{}{ "set_priority": map[string]interface{}{ "priority": 100, }, }, }, "delete": map[string]interface{}{ "min_age": "365d", "actions": map[string]interface{}{ "delete": map[string]interface{}{}, }, }, }, }, } defaultPolicyData, err := json.Marshal(defaultPolicy) if err != nil { return fmt.Errorf("failed to marshal default_policy: %v", err) } req, err := http.NewRequest("PUT", defaultPolicyURL, bytes.NewBuffer(defaultPolicyData)) if err != nil { return fmt.Errorf("failed to create default_policy request: %v", err) } req.Header.Set("Content-Type", "application/json") req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) resp, err = client.Do(req) if err != nil { return fmt.Errorf("failed to create default_policy: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return fmt.Errorf("failed to create default_policy, status: %d, response: %s", resp.StatusCode, string(body)) } fmt.Println("[ILM Policy] Successfully created default_policy") } // 检查目标策略是否存在 policyURL := fmt.Sprintf("%s/_ilm/policy/%s", esConfig.URL, policyName) resp, err = client.Get(policyURL) if err != nil { return fmt.Errorf("failed to check ILM policy: %v", err) } defer resp.Body.Close() // 如果策略已存在,更新索引设置并删除旧策略 if resp.StatusCode == http.StatusOK { fmt.Printf("[ILM Policy] Policy %s already exists, updating index settings...\n", policyName) // 更新索引设置 updateIndexURL := fmt.Sprintf("%s/logstash-%s.%s.*/_settings", esConfig.URL, period, dataType) updateData := map[string]interface{}{ "index.lifecycle.name": "default_policy", "index.lifecycle.rollover_alias": fmt.Sprintf("logs-%s-%s-candle", dataType, period), } updateDataBytes, err := json.Marshal(updateData) if err != nil { return fmt.Errorf("failed to marshal update index data: %v", err) } req, err := http.NewRequest("PUT", updateIndexURL, bytes.NewBuffer(updateDataBytes)) if err != nil { return fmt.Errorf("failed to create update index request: %v", err) } req.Header.Set("Content-Type", "application/json") req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) updateResp, err := client.Do(req) if err != nil { return fmt.Errorf("failed to update index settings: %v", err) } defer updateResp.Body.Close() if updateResp.StatusCode != http.StatusOK { body, _ := io.ReadAll(updateResp.Body) return fmt.Errorf("failed to update index settings, status: %d, response: %s", updateResp.StatusCode, string(body)) } fmt.Printf("[ILM Policy] Successfully updated index settings for policy: %s\n", policyName) // 删除旧策略 req, err = http.NewRequest("DELETE", policyURL, nil) if err != nil { return fmt.Errorf("failed to create delete policy request: %v", err) } req.Header.Set("Content-Type", "application/json") req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) deleteResp, err := client.Do(req) if err != nil { return fmt.Errorf("failed to delete old policy: %v", err) } defer deleteResp.Body.Close() if deleteResp.StatusCode != http.StatusOK { body, _ := io.ReadAll(deleteResp.Body) return fmt.Errorf("failed to delete old policy, status: %d, response: %s", deleteResp.StatusCode, string(body)) } fmt.Printf("[ILM Policy] Successfully deleted old policy: %s\n", policyName) } else { fmt.Printf("[ILM Policy] Policy %s does not exist, creating a new one...\n", policyName) } // 创建新的 ILM 策略 initialPhase := getPhase(daysDiff, retentionDays) fmt.Printf("[ILM Policy] Initial phase for policy %s is: %s\n", policyName, initialPhase) rolloverActions := map[string]interface{}{ "max_age": dataConfig.NormalRollover["max_age"], "max_size": dataConfig.NormalRollover["max_size"], } if maxDocsStr, ok := dataConfig.NormalRollover["max_docs"]; ok && maxDocsStr != "" { maxDocs, err := strconv.Atoi(maxDocsStr) if err != nil || maxDocs <= 0 { fmt.Printf("[ILM Policy] Invalid max_docs value: %s, skipping max_docs in rollover\n", maxDocsStr) } else { rolloverActions["max_docs"] = maxDocs } } else { fmt.Println("[ILM Policy] max_docs not set in NormalRollover, skipping max_docs in rollover") } policy := map[string]interface{}{ "policy": map[string]interface{}{ "phases": map[string]interface{}{ initialPhase: map[string]interface{}{ "actions": map[string]interface{}{ "rollover": rolloverActions, "set_priority": map[string]interface{}{ "priority": 100, }, }, }, "warm": map[string]interface{}{ "min_age": fmt.Sprintf("%dd", dataConfig.NormalPhases["warm"]), "actions": map[string]interface{}{ "allocate": map[string]interface{}{ "include": map[string]string{"_tier_preference": "data_warm"}, }, }, }, "cold": map[string]interface{}{ "min_age": fmt.Sprintf("%dd", dataConfig.NormalPhases["cold"]), "actions": map[string]interface{}{ "allocate": map[string]interface{}{ "include": map[string]string{"_tier_preference": "data_cold"}, }, }, }, "delete": map[string]interface{}{ "min_age": fmt.Sprintf("%dd", retentionDays), "actions": map[string]interface{}{ "delete": map[string]interface{}{}, }, }, }, }, } // 如果数据超过 730 天(2 年),直接进入冷阶段 if daysDiff > 730 { phases, ok := policy["policy"].(map[string]interface{})["phases"].(map[string]interface{}) if !ok { return fmt.Errorf("failed to get phases from policy") } delete(phases, initialPhase) if _, exists := phases["cold"]; !exists { phases["cold"] = map[string]interface{}{ "min_age": "0d", "actions": map[string]interface{}{ "allocate": map[string]interface{}{ "include": map[string]string{"_tier_preference": "data_cold"}, }, }, } } else { coldPhase, ok := phases["cold"].(map[string]interface{}) if !ok { return fmt.Errorf("failed to get cold phase from phases") } coldPhase["min_age"] = "0d" } } // 创建或更新策略 policyData, err := json.Marshal(policy) if err != nil { return fmt.Errorf("failed to marshal ILM policy: %v", err) } req, err := http.NewRequest("PUT", policyURL, bytes.NewBuffer(policyData)) if err != nil { return fmt.Errorf("failed to create ILM policy request: %v", err) } req.Header.Set("Content-Type", "application/json") req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) resp, err = client.Do(req) if err != nil { return fmt.Errorf("failed to send ILM policy request: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return fmt.Errorf("failed to create ILM policy, status: %d, response: %s", resp.StatusCode, string(body)) } fmt.Printf("[ILM Policy] Successfully created or updated ILM policy: %s\n", policyName) return nil }