package elasticilm import ( "bytes" "encoding/json" "fmt" "io" "math" "net/http" "time" cfg "gitea.zjmud.xyz/phyer/tanya/config" // 导入你的 config 包 ) // RetentionCalculator 计算保留时间的接口 type RetentionCalculator interface { Calculate(daysDiff int, period string) int } // DefaultRetentionCalculator 默认保留时间计算器 type DefaultRetentionCalculator struct { MaxRetention map[string]int MinRetention int } // Calculate 计算保留时间(天) func (c DefaultRetentionCalculator) Calculate(daysDiff int, period string) int { minutes := periodToMinutes(period) maxRetention := c.MaxRetention[period] if maxRetention == 0 { maxRetention = c.MaxRetention["default"] } // 非线性衰减公式 timeFactor := 1 - math.Sqrt(float64(daysDiff))/50 // 距今时间因子 if timeFactor < 0 { timeFactor = 0 } periodFactor := math.Sqrt(float64(minutes)) / math.Sqrt(43200) // 时间框架因子 retention := float64(maxRetention) * timeFactor * periodFactor if retention < float64(c.MinRetention) { return c.MinRetention } return int(retention) } // periodToMinutes 将时间框架转换为分钟数 func periodToMinutes(period string) int { switch period { case "1m": return 1 case "3m": return 3 case "5m": return 5 case "15m": return 15 case "30m": return 30 case "1h": return 60 case "2h": return 120 case "4h": return 240 case "6h": return 360 case "12h": return 720 case "1d", "1D": return 1440 case "2d": return 2880 case "5d": return 7200 case "1W": return 10080 case "1M": return 43200 // 假设 30 天 default: return 1 } } // ConfigureILM 动态配置 ILM 策略和索引模板 func ConfigureILM(client *http.Client, config *cfg.Config, dataType, period string, indexTime time.Time) error { // 从配置文件中获取 Elasticsearch 和 ILM 配置 esConfig := config.Elasticsearch dataConfig, ok := esConfig.ILM.DataTypes[dataType] if !ok { return fmt.Errorf("ILM configuration for data type '%s' not found in config", dataType) } // 创建保留时间计算器 calc := DefaultRetentionCalculator{ MaxRetention: dataConfig.MaxRetention, MinRetention: dataConfig.MinRetention, } // 计算距今时间差和保留时间 now := time.Now() daysDiff := int(now.Sub(indexTime).Hours() / 24) retentionDays := calc.Calculate(daysDiff, period) // 构造 ILM 策略名称 year, month := indexTime.Year(), int(indexTime.Month()) policyName := fmt.Sprintf("logstash_%s_%s_%d_%02d_%s", dataType, period, year, month, getPhase(daysDiff, retentionDays)) // 创建或更新 ILM 策略 if err := ensureILMPolicy(client, esConfig, policyName, daysDiff, retentionDays, dataConfig); err != nil { return fmt.Errorf("failed to ensure ILM policy: %v", err) } // 配置索引模板 templateName := fmt.Sprintf("logstash-%s-%s-%d-%02d", dataType, period, year, month) indexPattern := fmt.Sprintf(dataConfig.IndexPattern, period, year, month) // 根据数据类型设置映射 var mappings map[string]interface{} switch dataType { case "candle": mappings = map[string]interface{}{ "properties": map[string]interface{}{ "dataTime": map[string]string{"type": "date", "format": "yyyy-MM-dd HH:mm:ss"}, "open": map[string]string{"type": "float"}, "high": map[string]string{"type": "float"}, "low": map[string]string{"type": "float"}, "close": map[string]string{"type": "float"}, "volume": map[string]string{"type": "float"}, "volumeCcy": map[string]string{"type": "float"}, }, } case "ma": mappings = map[string]interface{}{ "properties": map[string]interface{}{ "dataTime": map[string]string{"type": "date", "format": "yyyy-MM-dd HH:mm:ss"}, "ma_value": map[string]string{"type": "float"}, // MA 值 }, } default: return fmt.Errorf("unsupported data type: %s", dataType) } template := map[string]interface{}{ "index_patterns": []string{indexPattern}, "template": map[string]interface{}{ "settings": map[string]interface{}{ "number_of_shards": dataConfig.Shards, "number_of_replicas": dataConfig.Replicas, "index.lifecycle.name": policyName, }, "mappings": mappings, }, } templateURL := fmt.Sprintf("%s/_index_template/%s", esConfig.URL, templateName) templateData, _ := json.Marshal(template) req, err := http.NewRequest("PUT", templateURL, bytes.NewBuffer(templateData)) if err != nil { return fmt.Errorf("failed to create template request: %v", err) } req.Header.Set("Content-Type", "application/json") req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) resp, err := client.Do(req) if err != nil { return fmt.Errorf("failed to send template request: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return fmt.Errorf("failed to create template, status: %d, response: %s", resp.StatusCode, string(body)) } fmt.Printf("Successfully configured ILM template: %s\n", templateName) return nil } // getPhase 根据时间差和保留时间决定初始阶段 func getPhase(daysDiff, retentionDays int) string { if daysDiff > 730 || retentionDays < 180 { // 超过 2 年或保留时间少于 180 天 return "cold" } return "normal" } // ensureILMPolicy 创建或更新 ILM 策略 func ensureILMPolicy(client *http.Client, esConfig cfg.ElasticsearchConfig, policyName string, daysDiff, retentionDays int, dataConfig cfg.DataTypeConfig) error { policyURL := fmt.Sprintf("%s/_ilm/policy/%s", esConfig.URL, policyName) resp, err := client.Get(policyURL) if err == nil && resp.StatusCode == http.StatusOK { resp.Body.Close() return nil // 策略已存在 } resp.Body.Close() var policy map[string]interface{} if daysDiff > 730 || retentionDays < 180 { policy = map[string]interface{}{ "policy": map[string]interface{}{ "phases": map[string]interface{}{ "cold": map[string]interface{}{ "min_age": "0d", "actions": map[string]interface{}{ "allocate": map[string]interface{}{ "include": map[string]string{"_tier_preference": "data_cold"}, }, "shrink": map[string]interface{}{ "number_of_shards": 1, }, }, }, "delete": map[string]interface{}{ "min_age": fmt.Sprintf("%dd", retentionDays), }, }, }, } } else { policy = map[string]interface{}{ "policy": map[string]interface{}{ "phases": map[string]interface{}{ "hot": map[string]interface{}{ "actions": map[string]interface{}{ "rollover": dataConfig.NormalRollover, "allocate": map[string]interface{}{ "include": map[string]string{"_tier_preference": "data_hot"}, }, }, }, "warm": map[string]interface{}{ "min_age": fmt.Sprintf("%dd", dataConfig.NormalPhases["warm"]), "actions": map[string]interface{}{ "allocate": map[string]interface{}{ "include": map[string]string{"_tier_preference": "data_warm"}, }, }, }, "cold": map[string]interface{}{ "min_age": fmt.Sprintf("%dd", dataConfig.NormalPhases["cold"]), "actions": map[string]interface{}{ "allocate": map[string]interface{}{ "include": map[string]string{"_tier_preference": "data_cold"}, }, "shrink": map[string]interface{}{ "number_of_shards": 1, }, }, }, "delete": map[string]interface{}{ "min_age": fmt.Sprintf("%dd", retentionDays), }, }, }, } } policyData, _ := json.Marshal(policy) req, err := http.NewRequest("PUT", policyURL, bytes.NewBuffer(policyData)) if err != nil { return fmt.Errorf("failed to create ILM policy request: %v", err) } req.Header.Set("Content-Type", "application/json") req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) resp, err = client.Do(req) if err != nil { return fmt.Errorf("failed to send ILM policy request: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return fmt.Errorf("failed to create ILM policy, status: %d, response: %s", resp.StatusCode, string(body)) } fmt.Printf("Successfully created ILM policy: %s\n", policyName) return nil }