// Package elasticilm configures Elasticsearch index lifecycle management:
// it creates index templates, ILM policies, backing indices and write aliases
// for the logstash-* indices described by the project configuration.
package elasticilm

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strconv"
	"strings"
	"time"

	cfg "gitea.zjmud.xyz/phyer/tanya/config" // project configuration package
)

// RetentionCalculator is the interface for computing a retention time (in
// days) from the age of the data and its time-frame period.
type RetentionCalculator interface {
	Calculate(daysDiff int, period string) int
}

// DefaultRetentionCalculator is the default RetentionCalculator.
//
// NOTE(review): MaxRetention and MinRetention are populated by ConfigureILM
// but are not consulted by Calculate in this file — confirm whether clamping
// against them was intended.
type DefaultRetentionCalculator struct {
	MaxRetention map[string]int
	MinRetention int
}

// periodBaseConfig maps a time-frame period to its cooling-model parameters.
// Only Base is read in this file; Interval is currently unused here.
var periodBaseConfig = map[string]struct {
	Base     int     // base value (days)
	Interval float64 // time-interval coefficient
}{
	"1m":  {30, 1.0},
	"3m":  {45, 1.5},
	"5m":  {60, 2.0},
	"15m": {90, 3.0},
	"30m": {120, 4.0},
	"1H":  {150, 5.0},
	"2H":  {180, 6.0},
	"4H":  {240, 8.0},
	"6H":  {300, 10.0},
	"12H": {360, 12.0},
	"1D":  {480, 16.0},
	"2D":  {600, 20.0},
	"5D":  {720, 24.0},
	"1W":  {840, 28.0},
}

// NonLinearCoolingModel computes the warm, cold and delete phase ages (in
// days) for data that is daysDiff days old and belongs to the given period.
//
// config may supply "timeDecayFactor" (default 0.5) and "warmPhaseMultiplier"
// (default 1.0); a nil map is fine. Unknown periods fall back to the "1m"
// entry of periodBaseConfig. The warm age never drops below the period's base
// value; cold is always 2x warm and delete is always 3x warm (other keys in
// config, such as "coldPhaseMultiplier", are not read here).
//
// A negative daysDiff (index time in the future) is treated as age 0, which
// avoids a division by zero at daysDiff == -365 and negative age factors
// below that.
func NonLinearCoolingModel(daysDiff int, period string, config map[string]float64) (int, int, int) {
	base, ok := periodBaseConfig[period]
	if !ok {
		base = periodBaseConfig["1m"] // default configuration
	}

	timeDecayFactor := 0.5
	if v, ok := config["timeDecayFactor"]; ok {
		timeDecayFactor = v
	}
	warmPhaseMultiplier := 1.0
	if v, ok := config["warmPhaseMultiplier"]; ok {
		warmPhaseMultiplier = v
	}

	age := daysDiff
	if age < 0 {
		age = 0
	}

	// Inverse-proportional decay 1/(x+1): newer data (small age) gets a
	// larger factor and therefore a longer retention.
	ageFactor := timeDecayFactor / (float64(age)/365.0 + 1.0)

	warm := int(float64(base.Base) * warmPhaseMultiplier * (1.0 + ageFactor))
	if warm < base.Base {
		warm = base.Base
	}
	cold := warm * 2
	deleteAge := warm * 3

	fmt.Printf("[Cooling Model] Period: %s, DaysDiff: %d => warm=%dd, cold=%dd, delete=%dd\n", period, daysDiff, warm, cold, deleteAge)
	return warm, cold, deleteAge
}

// Calculate returns the delete-phase age (in days) produced by
// NonLinearCoolingModel with the built-in default configuration.
func (c DefaultRetentionCalculator) Calculate(daysDiff int, period string) int {
	config := map[string]float64{
		"timeDecayFactor":         0.5,
		"periodGranularityFactor": 0.5,
		"warmPhaseMultiplier":     1.0,
		"coldPhaseMultiplier":     2.0,
		"deletePhaseMultiplier":   3.0,
	}
	_, _, deleteDays := NonLinearCoolingModel(daysDiff, period, config)
	return deleteDays
}

// periodToMinutes converts a time-frame period to a number of minutes.
//
// Hour and day periods are accepted in both the lowercase spelling and the
// uppercase spelling used by periodBaseConfig ("1H", "2D", ...). Minute and
// month stay case-sensitive because "1m" and "1M" are different periods.
// Unknown periods return 1.
func periodToMinutes(period string) int {
	switch period {
	case "1m":
		return 1
	case "3m":
		return 3
	case "5m":
		return 5
	case "15m":
		return 15
	case "30m":
		return 30
	case "1h", "1H":
		return 60
	case "2h", "2H":
		return 120
	case "4h", "4H":
		return 240
	case "6h", "6H":
		return 360
	case "12h", "12H":
		return 720
	case "1d", "1D":
		return 1440
	case "2d", "2D":
		return 2880
	case "5d", "5D":
		return 7200
	case "1W":
		return 10080
	case "1M":
		return 43200 // assumes a 30-day month
	default:
		return 1
	}
}

// getPhase picks the initial ILM phase from the data age and retention time:
// "cold" when the data is older than 730 days or the retention is below 180
// days, "hot" when the data is at most 7 days old, and "warm" otherwise. The
// cold rule is checked first, so a short retention wins over a young age.
func getPhase(daysDiff, retentionDays int) string {
	switch {
	case daysDiff > 730 || retentionDays < 180:
		fmt.Printf("[ILM Phase] 数据已超过2年或保留时间少于180天,判定为cold阶段。daysDiff: %d, retentionDays: %d\n", daysDiff, retentionDays)
		return "cold"
	case daysDiff <= 7:
		fmt.Printf("[ILM Phase] 数据在7天内,判定为hot阶段。daysDiff: %d\n", daysDiff)
		return "hot"
	default:
		fmt.Printf("[ILM Phase] 数据在7天到2年之间,判定为warm阶段。daysDiff: %d\n", daysDiff)
		return "warm"
	}
}

// newESRequest builds an HTTP request for Elasticsearch carrying the
// configured basic-auth credentials. When payload is non-nil it is sent as
// the request body with a JSON content type.
func newESRequest(esConfig cfg.ElasticsearchConfig, method, url string, payload []byte) (*http.Request, error) {
	var body io.Reader
	if payload != nil {
		body = bytes.NewReader(payload)
	}
	req, err := http.NewRequest(method, url, body)
	if err != nil {
		return nil, err
	}
	if payload != nil {
		req.Header.Set("Content-Type", "application/json")
	}
	req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
	return req, nil
}

// createIndexIfMissing issues a HEAD request for indexName and, when the
// answer is 404, creates the index with a PUT request.
//
// NOTE(review): the shard/replica counts are hard-coded (2/1) exactly as in
// the original code; explicit settings in the create request may take
// precedence over the index template built from dataConfig — confirm.
func createIndexIfMissing(client *http.Client, esConfig cfg.ElasticsearchConfig, indexName string) error {
	indexURL := fmt.Sprintf("%s/%s", esConfig.URL, indexName)

	headReq, err := newESRequest(esConfig, "HEAD", indexURL, nil)
	if err != nil {
		return fmt.Errorf("failed to create index check request: %w", err)
	}
	headResp, err := client.Do(headReq)
	if err != nil {
		return fmt.Errorf("failed to check index existence: %w", err)
	}
	headResp.Body.Close()
	if headResp.StatusCode != http.StatusNotFound {
		return nil
	}

	fmt.Printf("Index %s does not exist, creating it...\n", indexName)
	payload, err := json.Marshal(map[string]interface{}{
		"settings": map[string]interface{}{
			"number_of_shards":   2,
			"number_of_replicas": 1,
		},
	})
	if err != nil {
		return fmt.Errorf("failed to marshal index creation data: %w", err)
	}
	putReq, err := newESRequest(esConfig, "PUT", indexURL, payload)
	if err != nil {
		return fmt.Errorf("failed to create index creation request: %w", err)
	}
	putResp, err := client.Do(putReq)
	if err != nil {
		return fmt.Errorf("failed to send index creation request: %w", err)
	}
	defer putResp.Body.Close()
	if putResp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(putResp.Body) // best effort: body is only used for the error text
		return fmt.Errorf("failed to create index %s, status: %d, response: %s", indexName, putResp.StatusCode, string(body))
	}
	fmt.Printf("Successfully created index: %s\n", indexName)
	return nil
}

// ensureAlias points alias at the monthly index derived from period, dataType,
// coinPair and indexTime, creating that index if needed, and marks it as the
// alias's write index. Every other index currently behind the alias is kept
// on the alias with is_write_index set to false.
//
// dataConfig and daysDiff are only used to log the cooling-model phases.
func ensureAlias(client *http.Client, esConfig cfg.ElasticsearchConfig, alias, period, dataType, coinPair string, indexTime time.Time, dataConfig cfg.DataTypeConfig, daysDiff int) error {
	// Look up the indices currently associated with the alias.
	getReq, err := newESRequest(esConfig, "GET", fmt.Sprintf("%s/_alias/%s", esConfig.URL, alias), nil)
	if err != nil {
		return err
	}
	getResp, err := client.Do(getReq)
	if err != nil {
		return err
	}
	defer getResp.Body.Close()

	var actions []interface{}
	var aliasInfo map[string]map[string]interface{}

	// If the alias exists, drop it from every index that is currently flagged
	// as its write index; those indices are re-added below as non-write.
	if getResp.StatusCode == http.StatusOK {
		if err := json.NewDecoder(getResp.Body).Decode(&aliasInfo); err != nil {
			return err
		}
		for indexName, info := range aliasInfo {
			aliases, ok := info["aliases"].(map[string]interface{})
			if !ok {
				continue
			}
			for aliasName, aliasDetails := range aliases {
				if aliasName != alias {
					continue
				}
				// Checked assertion: an unexpected payload shape must not panic.
				details, ok := aliasDetails.(map[string]interface{})
				if !ok || details["is_write_index"] != true {
					continue
				}
				actions = append(actions, map[string]interface{}{
					"remove": map[string]interface{}{
						"index": indexName,
						"alias": alias,
					},
				})
			}
		}
	}

	// The monthly index for indexTime becomes the write index.
	year, month := indexTime.Year(), int(indexTime.Month())
	latestIndex := fmt.Sprintf("logstash-%s.%s.%s.%d-%02d", strings.ToLower(period), strings.ToLower(dataType), strings.ToLower(coinPair), year, month)

	if err := createIndexIfMissing(client, esConfig, latestIndex); err != nil {
		return err
	}

	actions = append(actions, map[string]interface{}{
		"add": map[string]interface{}{
			"index":          latestIndex,
			"alias":          alias,
			"is_write_index": true,
		},
	})

	// Make sure every other index has is_write_index set to false.
	for indexName := range aliasInfo {
		if indexName == latestIndex {
			continue
		}
		actions = append(actions, map[string]interface{}{
			"add": map[string]interface{}{
				"index":          indexName,
				"alias":          alias,
				"is_write_index": false,
			},
		})
	}

	// Apply all alias actions in a single request.
	payload, err := json.Marshal(map[string]interface{}{"actions": actions})
	if err != nil {
		return err
	}
	postReq, err := newESRequest(esConfig, "POST", fmt.Sprintf("%s/_aliases", esConfig.URL), payload)
	if err != nil {
		return err
	}
	postResp, err := client.Do(postReq)
	if err != nil {
		return err
	}
	defer postResp.Body.Close()

	// Log the phases the cooling model yields for this data (diagnostic only).
	warmDays, coldDays, deleteDays := NonLinearCoolingModel(daysDiff, period, dataConfig.CoolingModelConfig)
	fmt.Printf("[ILM Policy] Calculated phases: warm=%dd, cold=%dd, delete=%dd\n", warmDays, coldDays, deleteDays)

	if postResp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(postResp.Body) // best effort: body is only used for the error text
		return fmt.Errorf("failed to create alias, status: %d, response: %s", postResp.StatusCode, string(body))
	}
	return nil
}

// ConfigureILM sets up lifecycle management for one dataType/period/coinPair
// and the month of indexTime: it writes the index template, ensures the ILM
// policy, and ensures the write alias, in that order. It returns an error if
// dataType has no ILM configuration or if any Elasticsearch call fails.
func ConfigureILM(client *http.Client, config *cfg.Config, dataType, period, coinPair string, indexTime time.Time) error {
	esConfig := config.Elasticsearch
	dataConfig, ok := esConfig.ILM.DataTypes[dataType]
	if !ok {
		return fmt.Errorf("ILM configuration for data type '%s' not found in config", dataType)
	}

	// Compute the data age and the retention time.
	calc := DefaultRetentionCalculator{
		MaxRetention: dataConfig.MaxRetention,
		MinRetention: dataConfig.MinRetention,
	}
	daysDiff := int(time.Since(indexTime).Hours() / 24)
	retentionDays := calc.Calculate(daysDiff, period)

	// The retention must not be lower than the configured cold-phase value.
	if minDeleteAge := dataConfig.NormalPhases["cold"]; retentionDays < minDeleteAge {
		retentionDays = minDeleteAge
	}

	// Build the policy name, template name, index pattern and alias.
	year, month := indexTime.Year(), int(indexTime.Month())
	lowerPeriod, lowerType := strings.ToLower(period), strings.ToLower(dataType)
	policyName := fmt.Sprintf("logstash_%s_%s_%d_%02d", lowerType, lowerPeriod, year, month)
	templateName := fmt.Sprintf("logstash_%s_%s_%d_%02d", dataType, period, year, month)
	indexPattern := fmt.Sprintf("logstash-%s.%s.*.%d-%02d", lowerPeriod, lowerType, year, month)
	aliasName := fmt.Sprintf("logstash-%s.%s.%s.%d-%02d_alias", lowerPeriod, lowerType, strings.ToLower(coinPair), year, month)

	fmt.Printf("[ConfigureILM] Configuring ILM with policyName: %s, templateName: %s, aliasName: %s\n", policyName, templateName, aliasName)

	// Index template.
	template := map[string]interface{}{
		"index_patterns": []string{indexPattern},
		"priority":       100,
		"template": map[string]interface{}{
			"settings": map[string]interface{}{
				"number_of_shards":               dataConfig.Shards,
				"number_of_replicas":             dataConfig.Replicas,
				"index.lifecycle.name":           policyName,
				"index.lifecycle.rollover_alias": aliasName,
			},
			"mappings": map[string]interface{}{
				"properties": map[string]interface{}{
					"dataTime": map[string]string{"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
					// Fields common to every data type.
					"coinPair": map[string]string{"type": "keyword"},
					"period":   map[string]string{"type": "keyword"},
					// The fields below only apply to the candle type; other
					// data types may need different fields.
					"open":      map[string]string{"type": "float"},
					"high":      map[string]string{"type": "float"},
					"low":       map[string]string{"type": "float"},
					"close":     map[string]string{"type": "float"},
					"volume":    map[string]string{"type": "float"},
					"volumeCcy": map[string]string{"type": "float"},
				},
			},
		},
	}

	templateURL := fmt.Sprintf("%s/_index_template/%s", esConfig.URL, templateName)
	templateData, err := json.Marshal(template)
	if err != nil {
		return fmt.Errorf("failed to marshal index template data: %w", err)
	}
	req, err := newESRequest(esConfig, "PUT", templateURL, templateData)
	if err != nil {
		return fmt.Errorf("failed to create index template request: %w", err)
	}
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to send index template request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body) // best effort: body is only used for the error text
		return fmt.Errorf("failed to create index template, status: %d, response: %s", resp.StatusCode, string(body))
	}
	fmt.Printf("[ConfigureILM] Successfully configured ILM template: %s\n", templateName)

	// Create or update the ILM policy.
	if err := ensureILMPolicy(client, esConfig, policyName, aliasName, period, dataType, daysDiff, retentionDays, dataConfig, indexTime, coinPair); err != nil {
		return fmt.Errorf("failed to ensure ILM policy: %w", err)
	}

	// Set up the alias with the aliasName and indexTime of this call.
	if err := ensureAlias(client, esConfig, aliasName, period, dataType, coinPair, indexTime, dataConfig, daysDiff); err != nil {
		return fmt.Errorf("failed to ensure alias: %w", err)
	}

	fmt.Printf("[ConfigureILM] Successfully completed ILM configuration for policy: %s\n", policyName)
	return nil
}

// ensureDefaultPolicy creates the "default_policy" ILM policy (hot phase with
// priority 100, delete after 365d) unless an authenticated GET for it
// returns 200.
func ensureDefaultPolicy(client *http.Client, esConfig cfg.ElasticsearchConfig) error {
	defaultPolicyURL := fmt.Sprintf("%s/_ilm/policy/default_policy", esConfig.URL)

	// The check carries basic auth like every other request; an
	// unauthenticated GET would report "missing" on a secured cluster.
	getReq, err := newESRequest(esConfig, "GET", defaultPolicyURL, nil)
	if err != nil {
		return fmt.Errorf("failed to check default_policy: %w", err)
	}
	getResp, err := client.Do(getReq)
	if err != nil {
		return fmt.Errorf("failed to check default_policy: %w", err)
	}
	getResp.Body.Close()
	if getResp.StatusCode == http.StatusOK {
		return nil
	}

	fmt.Println("[ILM Policy] Default policy does not exist, creating it...")
	defaultPolicy := map[string]interface{}{
		"policy": map[string]interface{}{
			"phases": map[string]interface{}{
				"hot": map[string]interface{}{
					"actions": map[string]interface{}{
						"set_priority": map[string]interface{}{
							"priority": 100,
						},
					},
				},
				"delete": map[string]interface{}{
					"min_age": "365d",
					"actions": map[string]interface{}{
						"delete": map[string]interface{}{},
					},
				},
			},
		},
	}
	payload, err := json.Marshal(defaultPolicy)
	if err != nil {
		return fmt.Errorf("failed to marshal default_policy: %w", err)
	}
	putReq, err := newESRequest(esConfig, "PUT", defaultPolicyURL, payload)
	if err != nil {
		return fmt.Errorf("failed to create default_policy request: %w", err)
	}
	putResp, err := client.Do(putReq)
	if err != nil {
		return fmt.Errorf("failed to create default_policy: %w", err)
	}
	defer putResp.Body.Close()
	if putResp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(putResp.Body) // best effort: body is only used for the error text
		return fmt.Errorf("failed to create default_policy, status: %d, response: %s", putResp.StatusCode, string(body))
	}
	fmt.Println("[ILM Policy] Successfully created default_policy")
	return nil
}

// tierAllocationPhase builds an ILM phase that starts at minAge and carries an
// allocate action whose include map sets _tier_preference to tier.
func tierAllocationPhase(minAge, tier string) map[string]interface{} {
	return map[string]interface{}{
		"min_age": minAge,
		"actions": map[string]interface{}{
			"allocate": map[string]interface{}{
				"include": map[string]string{"_tier_preference": tier},
			},
		},
	}
}

// ensureILMPolicy creates or updates the ILM policy policyName.
//
// It first makes sure "default_policy" exists. If policyName already exists,
// the monthly index for period/dataType/coinPair/indexTime is created when
// missing and its lifecycle settings are pointed at policyName and aliasName.
// The policy is then (re)written with warm/cold/delete ages taken from
// NonLinearCoolingModel; a hot phase with rollover is included only when
// getPhase selects "hot", and data older than 730 days gets a cold phase
// starting at 0d.
func ensureILMPolicy(client *http.Client, esConfig cfg.ElasticsearchConfig, policyName, aliasName, period, dataType string, daysDiff, retentionDays int, dataConfig cfg.DataTypeConfig, indexTime time.Time, coinPair string) error {
	if err := ensureDefaultPolicy(client, esConfig); err != nil {
		return err
	}

	// Check (with credentials) whether the target policy already exists.
	policyURL := fmt.Sprintf("%s/_ilm/policy/%s", esConfig.URL, policyName)
	checkReq, err := newESRequest(esConfig, "GET", policyURL, nil)
	if err != nil {
		return fmt.Errorf("failed to check ILM policy: %w", err)
	}
	checkResp, err := client.Do(checkReq)
	if err != nil {
		return fmt.Errorf("failed to check ILM policy: %w", err)
	}
	checkResp.Body.Close()
	policyExists := checkResp.StatusCode == http.StatusOK

	if policyExists {
		fmt.Printf("[ILM Policy] Policy %s already exists, updating index settings...\n", policyName)

		// Same naming scheme as the index that ensureAlias creates, so the
		// settings land on the index that actually backs the alias.
		year, month := indexTime.Year(), int(indexTime.Month())
		indexName := fmt.Sprintf("logstash-%s.%s.%s.%d-%02d", strings.ToLower(period), strings.ToLower(dataType), strings.ToLower(coinPair), year, month)

		if err := createIndexIfMissing(client, esConfig, indexName); err != nil {
			return err
		}

		// Point the index at this policy and at the alias passed in by the
		// caller, so the rollover alias matches the one that gets created.
		updateData, err := json.Marshal(map[string]interface{}{
			"index.lifecycle.name":           policyName,
			"index.lifecycle.rollover_alias": aliasName,
		})
		if err != nil {
			return fmt.Errorf("failed to marshal update index data: %w", err)
		}
		updateReq, err := newESRequest(esConfig, "PUT", fmt.Sprintf("%s/%s/_settings", esConfig.URL, indexName), updateData)
		if err != nil {
			return fmt.Errorf("failed to create update index request: %w", err)
		}
		updateResp, err := client.Do(updateReq)
		if err != nil {
			return fmt.Errorf("failed to update index settings: %w", err)
		}
		defer updateResp.Body.Close()
		if updateResp.StatusCode != http.StatusOK {
			body, _ := io.ReadAll(updateResp.Body) // best effort: body is only used for the error text
			return fmt.Errorf("failed to update index settings, status: %d, response: %s", updateResp.StatusCode, string(body))
		}
		fmt.Printf("[ILM Policy] Successfully updated index settings for policy: %s\n", policyName)
		fmt.Printf("[ILM Policy] Policy %s already exists, attempting to update...\n", policyName)
	} else {
		fmt.Printf("[ILM Policy] Policy %s does not exist, creating a new one...\n", policyName)
	}

	// An existing policy is updated in place with the same PUT request.
	initialPhase := getPhase(daysDiff, retentionDays)
	fmt.Printf("[ILM Policy] Initial phase for policy %s is: %s\n", policyName, initialPhase)

	rolloverActions := map[string]interface{}{
		"max_age":  dataConfig.NormalRollover["max_age"],
		"max_size": dataConfig.NormalRollover["max_size"],
	}
	if maxDocsStr, ok := dataConfig.NormalRollover["max_docs"]; ok && maxDocsStr != "" {
		maxDocs, err := strconv.Atoi(maxDocsStr)
		if err != nil || maxDocs <= 0 {
			fmt.Printf("[ILM Policy] Invalid max_docs value: %s, skipping max_docs in rollover\n", maxDocsStr)
		} else {
			rolloverActions["max_docs"] = maxDocs
		}
	} else {
		fmt.Println("[ILM Policy] max_docs not set in NormalRollover, skipping max_docs in rollover")
	}

	warmDays, coldDays, deleteDays := NonLinearCoolingModel(daysDiff, period, dataConfig.CoolingModelConfig)

	phases := map[string]interface{}{
		"warm": tierAllocationPhase(fmt.Sprintf("%dd", warmDays), "data_warm"),
		"cold": tierAllocationPhase(fmt.Sprintf("%dd", coldDays), "data_cold"),
		"delete": map[string]interface{}{
			"min_age": fmt.Sprintf("%dd", deleteDays),
			"actions": map[string]interface{}{
				"delete": map[string]interface{}{},
			},
		},
	}
	// The rollover phase is only emitted for "hot". For "warm"/"cold" the
	// original map literal's rollover entry was overwritten by the fixed
	// warm/cold entries, so the generated policy is identical.
	if initialPhase == "hot" {
		phases["hot"] = map[string]interface{}{
			"actions": map[string]interface{}{
				"rollover": rolloverActions,
				"set_priority": map[string]interface{}{
					"priority": 100,
				},
			},
		}
	}
	// Data older than 730 days (2 years) enters the cold phase immediately.
	if daysDiff > 730 {
		phases["cold"] = tierAllocationPhase("0d", "data_cold")
	}

	policy := map[string]interface{}{
		"policy": map[string]interface{}{
			"phases": phases,
		},
	}

	policyData, err := json.MarshalIndent(policy, "", " ")
	if err != nil {
		return fmt.Errorf("failed to marshal ILM policy: %w", err)
	}
	fmt.Printf("[ILM Policy] Printing ILM policy to console:\n%s\n", string(policyData))

	putReq, err := newESRequest(esConfig, "PUT", policyURL, policyData)
	if err != nil {
		return fmt.Errorf("failed to create ILM policy request: %w", err)
	}
	putResp, err := client.Do(putReq)
	if err != nil {
		return fmt.Errorf("failed to send ILM policy request: %w", err)
	}
	defer putResp.Body.Close()

	if putResp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(putResp.Body) // best effort: body is only used for the error text
		return fmt.Errorf("failed to create/update ILM policy, status: %d, response: %s", putResp.StatusCode, string(body))
	}

	// Only a 200 reaches this point, so the created/updated label comes from
	// the existence check made before the PUT.
	action := "created"
	if policyExists {
		action = "updated"
	}
	fmt.Printf("[ILM Policy] Successfully %s ILM policy: %s (modified: %v)\n", action, policyName, time.Now().Format("2006-01-02 15:04:05"))
	return nil
}