From b0eb71283c51c04f2bcac0b8b9b39c0e64c97440 Mon Sep 17 00:00:00 2001 From: "zhangkun9038@dingtalk.com" Date: Thu, 27 Mar 2025 23:46:56 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81ToElastic=E7=9A=84=E6=97=B6?= =?UTF-8?q?=E5=80=99=E5=BD=93index=E7=AC=A6=E5=90=88=E7=89=B9=E5=AE=9A?= =?UTF-8?q?=E7=B1=BB=E5=9E=8B=E6=97=B6=EF=BC=8C=E5=8A=A8=E6=80=81=E8=AE=BE?= =?UTF-8?q?=E7=BD=AEilm=EF=BC=8C=20=E7=BB=93=E6=9E=9C=E6=9C=89=E5=BE=85?= =?UTF-8?q?=E9=AA=8C=E8=AF=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.go | 56 +++++--- config/config.json | 56 +++++++- elasticilm/ilm.go | 281 +++++++++++++++++++++++++++++++++++++++ okx/candleList.go | 323 ++++++--------------------------------------- 4 files changed, 419 insertions(+), 297 deletions(-) create mode 100644 elasticilm/ilm.go diff --git a/config/config.go b/config/config.go index 1dcbc91..db50655 100644 --- a/config/config.go +++ b/config/config.go @@ -5,31 +5,55 @@ import ( "os" ) +// Config 配置文件结构体 type Config struct { - Fluentd struct { - URL string `json:"url"` - } `json:"fluentd"` - Elasticsearch struct { - URL string `json:"url"` - Auth struct { - Username string `json:"username"` - Password string `json:"password"` - } `json:"auth"` - } `json:"elasticsearch"` + Fluentd FluentdConfig `json:"fluentd"` + Elasticsearch ElasticsearchConfig `json:"elasticsearch"` } -// LoadConfig 从指定路径加载配置文件 +// FluentdConfig Fluentd 配置 +type FluentdConfig struct { + URL string `json:"url"` +} + +// ElasticsearchConfig Elasticsearch 配置 +type ElasticsearchConfig struct { + URL string `json:"url"` + Auth AuthConfig `json:"auth"` + ILM ILMConfig `json:"ilm"` +} + +// AuthConfig 认证信息 +type AuthConfig struct { + Username string `json:"username"` + Password string `json:"password"` +} + +// ILMConfig ILM 配置 +type ILMConfig struct { + DataTypes map[string]DataTypeConfig `json:"data_types"` +} + +// DataTypeConfig 每种数据类型的 ILM 配置 +type DataTypeConfig struct { + IndexPattern string `json:"index_pattern"` + MaxRetention map[string]int `json:"max_retention"` + MinRetention int `json:"min_retention"` + Shards int `json:"shards"` + Replicas int `json:"replicas"` + NormalRollover map[string]string `json:"normal_rollover"` + NormalPhases map[string]int `json:"normal_phases"` +} + +// LoadConfig 加载配置文件 func LoadConfig(path string) (*Config, error) { - file, err := os.ReadFile(path) + data, err := os.ReadFile(path) if err != nil { return nil, err } - var config Config - err = json.Unmarshal(file, &config) - if err != nil { + if err := json.Unmarshal(data, &config); err != nil { return nil, err } - return &config, nil } diff --git a/config/config.json b/config/config.json index c16da24..7c5091b 100644 --- a/config/config.json +++ b/config/config.json @@ -7,8 +7,60 @@ "auth": { "username": "fluentd_user", "password": "fluentd_password" + }, + "ilm": { + "data_types": { + "candle": { + "index_pattern": "logstash-%s.candle.*.%d-%02d", + "max_retention": { + "1m": 365, + "3m": 365, + "5m": 730, + "15m": 730, + "30m": 730, + "1h": 1095, + "2h": 1095, + "4h": 1825, + "6h": 1825, + "12h": 1825, + "1d": 2555, + "2d": 2555, + "5d": 2555, + "1W": 3650, + "1M": 3650, + "default": 365 + }, + "min_retention": 30, + "shards": 2, + "replicas": 1, + "normal_rollover": { + "max_size": "30GB", + "max_age": "14d" + }, + "normal_phases": { + "warm": 60, + "cold": 180 + } + }, + "ma": { + "index_pattern": "logstash-%s.ma.*.%d-%02d", + "max_retention": { + "1d": 2555, + "default": 365 + }, + "min_retention": 30, + "shards": 2, + "replicas": 1, + "normal_rollover": { + "max_size": "50GB", + "max_age": "30d" + }, + "normal_phases": { + "warm": 90, + "cold": 270 + } + } + } } } } - - diff --git a/elasticilm/ilm.go b/elasticilm/ilm.go new file mode 100644 index 0000000..3fb263b --- /dev/null +++ b/elasticilm/ilm.go @@ -0,0 +1,281 @@ +package elasticilm + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "math" + "net/http" + "time" + + cfg "gitea.zjmud.xyz/phyer/tanya/config" // 导入你的 config 包 +) + +// RetentionCalculator 计算保留时间的接口 +type RetentionCalculator interface { + Calculate(daysDiff int, period string) int +} + +// DefaultRetentionCalculator 默认保留时间计算器 +type DefaultRetentionCalculator struct { + MaxRetention map[string]int + MinRetention int +} + +// Calculate 计算保留时间(天) +func (c DefaultRetentionCalculator) Calculate(daysDiff int, period string) int { + minutes := periodToMinutes(period) + maxRetention := c.MaxRetention[period] + if maxRetention == 0 { + maxRetention = c.MaxRetention["default"] + } + + // 非线性衰减公式 + timeFactor := 1 - math.Sqrt(float64(daysDiff))/50 // 距今时间因子 + if timeFactor < 0 { + timeFactor = 0 + } + periodFactor := math.Sqrt(float64(minutes)) / math.Sqrt(43200) // 时间框架因子 + retention := float64(maxRetention) * timeFactor * periodFactor + if retention < float64(c.MinRetention) { + return c.MinRetention + } + return int(retention) +} + +// periodToMinutes 将时间框架转换为分钟数 +func periodToMinutes(period string) int { + switch period { + case "1m": + return 1 + case "3m": + return 3 + case "5m": + return 5 + case "15m": + return 15 + case "30m": + return 30 + case "1h": + return 60 + case "2h": + return 120 + case "4h": + return 240 + case "6h": + return 360 + case "12h": + return 720 + case "1d", "1D": + return 1440 + case "2d": + return 2880 + case "5d": + return 7200 + case "1W": + return 10080 + case "1M": + return 43200 // 假设 30 天 + default: + return 1 + } +} + +// ConfigureILM 动态配置 ILM 策略和索引模板 +func ConfigureILM(client *http.Client, config *cfg.Config, dataType, period string, indexTime time.Time) error { + // 从配置文件中获取 Elasticsearch 和 ILM 配置 + esConfig := config.Elasticsearch + dataConfig, ok := esConfig.ILM.DataTypes[dataType] + if !ok { + return fmt.Errorf("ILM configuration for data type '%s' not found in config", dataType) + } + + // 创建保留时间计算器 + calc := DefaultRetentionCalculator{ + MaxRetention: dataConfig.MaxRetention, + MinRetention: dataConfig.MinRetention, + } + + // 计算距今时间差和保留时间 + now := time.Now() + daysDiff := int(now.Sub(indexTime).Hours() / 24) + retentionDays := calc.Calculate(daysDiff, period) + + // 构造 ILM 策略名称 + year, month := indexTime.Year(), int(indexTime.Month()) + policyName := fmt.Sprintf("logstash_%s_%s_%d_%02d_%s", dataType, period, year, month, getPhase(daysDiff, retentionDays)) + + // 创建或更新 ILM 策略 + if err := ensureILMPolicy(client, esConfig, policyName, daysDiff, retentionDays, dataConfig); err != nil { + return fmt.Errorf("failed to ensure ILM policy: %v", err) + } + + // 配置索引模板 + templateName := fmt.Sprintf("logstash-%s-%s-%d-%02d", dataType, period, year, month) + indexPattern := fmt.Sprintf(dataConfig.IndexPattern, period, year, month) + // 根据数据类型设置映射 + var mappings map[string]interface{} + switch dataType { + case "candle": + mappings = map[string]interface{}{ + "properties": map[string]interface{}{ + "dataTime": map[string]string{"type": "date", "format": "yyyy-MM-dd HH:mm:ss"}, + "open": map[string]string{"type": "float"}, + "high": map[string]string{"type": "float"}, + "low": map[string]string{"type": "float"}, + "close": map[string]string{"type": "float"}, + "volume": map[string]string{"type": "float"}, + "volumeCcy": map[string]string{"type": "float"}, + }, + } + case "ma": + mappings = map[string]interface{}{ + "properties": map[string]interface{}{ + "dataTime": map[string]string{"type": "date", "format": "yyyy-MM-dd HH:mm:ss"}, + "ma_value": map[string]string{"type": "float"}, // MA 值 + }, + } + default: + return fmt.Errorf("unsupported data type: %s", dataType) + } + + template := map[string]interface{}{ + "index_patterns": []string{indexPattern}, + "template": map[string]interface{}{ + "settings": map[string]interface{}{ + "number_of_shards": dataConfig.Shards, + "number_of_replicas": dataConfig.Replicas, + "index.lifecycle.name": policyName, + }, + "mappings": mappings, + }, + } + + templateURL := fmt.Sprintf("%s/_index_template/%s", esConfig.URL, templateName) + templateData, _ := json.Marshal(template) + req, err := http.NewRequest("PUT", templateURL, bytes.NewBuffer(templateData)) + if err != nil { + return fmt.Errorf("failed to create template request: %v", err) + } + + req.Header.Set("Content-Type", "application/json") + req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) + + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to send template request: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("failed to create template, status: %d, response: %s", resp.StatusCode, string(body)) + } + + fmt.Printf("Successfully configured ILM template: %s\n", templateName) + return nil +} + +// getPhase 根据时间差和保留时间决定初始阶段 +func getPhase(daysDiff, retentionDays int) string { + if daysDiff > 730 || retentionDays < 180 { // 超过 2 年或保留时间少于 180 天 + return "cold" + } + return "normal" +} + +// ensureILMPolicy 创建或更新 ILM 策略 +func ensureILMPolicy(client *http.Client, esConfig cfg.ElasticsearchConfig, policyName string, daysDiff, retentionDays int, dataConfig cfg.DataTypeConfig) error { + policyURL := fmt.Sprintf("%s/_ilm/policy/%s", esConfig.URL, policyName) + resp, err := client.Get(policyURL) + if err == nil && resp.StatusCode == http.StatusOK { + resp.Body.Close() + return nil // 策略已存在 + } + resp.Body.Close() + + var policy map[string]interface{} + if daysDiff > 730 || retentionDays < 180 { + policy = map[string]interface{}{ + "policy": map[string]interface{}{ + "phases": map[string]interface{}{ + "cold": map[string]interface{}{ + "min_age": "0d", + "actions": map[string]interface{}{ + "allocate": map[string]interface{}{ + "include": map[string]string{"_tier_preference": "data_cold"}, + }, + "shrink": map[string]interface{}{ + "number_of_shards": 1, + }, + }, + }, + "delete": map[string]interface{}{ + "min_age": fmt.Sprintf("%dd", retentionDays), + }, + }, + }, + } + } else { + policy = map[string]interface{}{ + "policy": map[string]interface{}{ + "phases": map[string]interface{}{ + "hot": map[string]interface{}{ + "actions": map[string]interface{}{ + "rollover": dataConfig.NormalRollover, + "allocate": map[string]interface{}{ + "include": map[string]string{"_tier_preference": "data_hot"}, + }, + }, + }, + "warm": map[string]interface{}{ + "min_age": fmt.Sprintf("%dd", dataConfig.NormalPhases["warm"]), + "actions": map[string]interface{}{ + "allocate": map[string]interface{}{ + "include": map[string]string{"_tier_preference": "data_warm"}, + }, + }, + }, + "cold": map[string]interface{}{ + "min_age": fmt.Sprintf("%dd", dataConfig.NormalPhases["cold"]), + "actions": map[string]interface{}{ + "allocate": map[string]interface{}{ + "include": map[string]string{"_tier_preference": "data_cold"}, + }, + "shrink": map[string]interface{}{ + "number_of_shards": 1, + }, + }, + }, + "delete": map[string]interface{}{ + "min_age": fmt.Sprintf("%dd", retentionDays), + }, + }, + }, + } + } + + policyData, _ := json.Marshal(policy) + req, err := http.NewRequest("PUT", policyURL, bytes.NewBuffer(policyData)) + if err != nil { + return fmt.Errorf("failed to create ILM policy request: %v", err) + } + + req.Header.Set("Content-Type", "application/json") + req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) + + resp, err = client.Do(req) + if err != nil { + return fmt.Errorf("failed to send ILM policy request: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("failed to create ILM policy, status: %d, response: %s", resp.StatusCode, string(body)) + } + + fmt.Printf("Successfully created ILM policy: %s\n", policyName) + return nil +} diff --git a/okx/candleList.go b/okx/candleList.go index 54fdb03..e675c88 100644 --- a/okx/candleList.go +++ b/okx/candleList.go @@ -6,8 +6,8 @@ import ( "encoding/json" "fmt" cfg "gitea.zjmud.xyz/phyer/tanya/config" + "gitea.zjmud.xyz/phyer/tanya/elasticilm" "io" - "math" "net/http" "strconv" "strings" @@ -384,47 +384,7 @@ func calculateCrossPrice(price1, price2 string) (string, error) { return fmt.Sprintf("%.8f", result), nil } -// parsePeriod 将周期字符串转换为time.Duration -func parsePeriod(period string) (time.Duration, error) { - switch period { - case "1m": - return time.Minute, nil - case "3m": - return 3 * time.Minute, nil - case "5m": - return 5 * time.Minute, nil - case "15m": - return 15 * time.Minute, nil - case "30m": - return 30 * time.Minute, nil - case "1H": - return time.Hour, nil - case "2H": - return 2 * time.Hour, nil - case "4H": - return 4 * time.Hour, nil - case "6H": - return 6 * time.Hour, nil - case "12H": - return 12 * time.Hour, nil - case "1D": - return 24 * time.Hour, nil - case "D": - return 24 * time.Hour, nil - case "2D": - return 48 * time.Hour, nil - case "5D": - return 120 * time.Hour, nil - case "7D": - return 168 * time.Hour, nil - default: - return 0, fmt.Errorf("unsupported period: %s", period) - } -} - -// ToElastic 将Candle数据发送到Elasticsearch - -// ToElastic 将 Candle 数据发送到 Elasticsearch,并动态配置 ILM 策略 +// ToElastic 将 Candle 数据发送到 Elasticsearch func (cl *CandleList) ToElastic() error { client := &http.Client{Timeout: 30 * time.Second} @@ -445,17 +405,35 @@ func (cl *CandleList) ToElastic() error { return fmt.Errorf("failed to load config: %v", err) } + // 如果没有数据,直接返回 + if len(cl.Candles) == 0 { + return nil + } + + // 使用第一条数据的时间戳配置 ILM + firstCandle := cl.Candles[0] + ts, err := strconv.ParseInt(firstCandle.Timestamp, 10, 64) + if err != nil { + return fmt.Errorf("invalid timestamp in first candle: %v", err) + } + loc, _ := time.LoadLocation("Asia/Shanghai") + cstTime := time.UnixMilli(ts).In(loc) + + // 配置 ILM(只执行一次) + err = elasticilm.ConfigureILM(client, config, "candle", strings.ToLower(cl.Period), cstTime) + if err != nil { + return fmt.Errorf("failed to configure ILM: %v", err) + } + + // 批量插入数据 for _, candle := range cl.Candles { ts, err := strconv.ParseInt(candle.Timestamp, 10, 64) if err != nil { - fmt.Println("invalid timestamp:", ts, err) + fmt.Println("invalid timestamp:", candle.Timestamp, err) continue } - loc, _ := time.LoadLocation("Asia/Shanghai") cstTime := time.UnixMilli(ts).In(loc) - - // 新索引名称:logstash-.candle..- index := fmt.Sprintf("logstash-%s.candle.%s.%d-%02d", strings.ToLower(cl.Period), strings.ToLower(cl.CoinPair), @@ -463,7 +441,7 @@ func (cl *CandleList) ToElastic() error { int(cstTime.Month()), ) - // 验证时间戳对齐 + // 检查时间戳对齐 if cl.Period == "1D" { if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 { return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) @@ -474,13 +452,6 @@ func (cl *CandleList) ToElastic() error { } } - // 动态配置 ILM - err = configureILM(client, config, index, cstTime, cl.Period) - if err != nil { - return fmt.Errorf("failed to configure ILM: %v", err) - } - - // 构造记录 record := map[string]interface{}{ "dataTime": cstTime.Format("2006-01-02 15:04:05"), "open": candle.Open, @@ -532,246 +503,40 @@ func (cl *CandleList) ToElastic() error { return nil } -// configureILM 动态配置 ILM 策略 -func configureILM(client *http.Client, config *cfg.Config, index string, indexTime time.Time, period string) error { - now := time.Now() - daysDiff := int(now.Sub(indexTime).Hours() / 24) // 距今天数 - - // 时间框架转换为分钟数 - minutes := periodToMinutes(period) - - // 计算保留时间(天) - maxRetention := getMaxRetention(period) - retentionDays := calculateRetention(daysDiff, minutes, maxRetention) - - // 决定 ILM 策略 - var ilmPolicy string - if daysDiff > 730 || retentionDays < 180 { // 超过 2 年或保留时间短,直��� cold - ilmPolicy = fmt.Sprintf("logstash_candle_%s_%d_cold", period, indexTime.Year()) - } else { - ilmPolicy = fmt.Sprintf("logstash_candle_%s_%d_normal", period, indexTime.Year()) - } - - // 创建 ILM 策略 - if err := ensureILMPolicy(client, config, ilmPolicy, daysDiff, retentionDays); err != nil { - return err - } - - // 配置索引模板 - templateName := fmt.Sprintf("logstash-candle-%s-%d-%02d", period, indexTime.Year(), int(indexTime.Month())) - template := map[string]interface{}{ - "index_patterns": []string{fmt.Sprintf("logstash-%s.candle.*.%d-%02d", period, indexTime.Year(), int(indexTime.Month()))}, - "template": map[string]interface{}{ - "settings": map[string]interface{}{ - "number_of_shards": 2, - "number_of_replicas": 1, - "index.lifecycle.name": ilmPolicy, - }, - "mappings": map[string]interface{}{ - "properties": map[string]interface{}{ - "dataTime": map[string]string{"type": "date", "format": "yyyy-MM-dd HH:mm:ss"}, - "open": map[string]string{"type": "float"}, - "high": map[string]string{"type": "float"}, - "low": map[string]string{"type": "float"}, - "close": map[string]string{"type": "float"}, - "volume": map[string]string{"type": "float"}, - "volumeCcy": map[string]string{"type": "float"}, - }, - }, - }, - } - - templateURL := fmt.Sprintf("%s/_index_template/%s", strings.TrimRight(config.Elasticsearch.URL, "/"), templateName) - templateData, _ := json.Marshal(template) - req, err := http.NewRequest("PUT", templateURL, bytes.NewBuffer(templateData)) - if err != nil { - return fmt.Errorf("failed to create template request: %v", err) - } - - req.Header.Set("Content-Type", "application/json") - req.SetBasicAuth(config.Elasticsearch.Auth.Username, config.Elasticsearch.Auth.Password) - - resp, err := client.Do(req) - if err != nil { - return fmt.Errorf("failed to create index template: %v", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("failed to create template, status: %d, response: %s", resp.StatusCode, string(body)) - } - - fmt.Printf("Successfully configured ILM template: %s\n", templateName) - return nil -} - -// periodToMinutes 将时间框架转换为分钟数 -func periodToMinutes(period string) int { +// parsePeriod 将时间框架字符串转换为 time.Duration +func parsePeriod(period string) (time.Duration, error) { switch period { case "1m": - return 1 + return time.Minute, nil case "3m": - return 3 + return 3 * time.Minute, nil case "5m": - return 5 + return 5 * time.Minute, nil case "15m": - return 15 + return 15 * time.Minute, nil case "30m": - return 30 + return 30 * time.Minute, nil case "1h": - return 60 + return time.Hour, nil case "2h": - return 120 + return 2 * time.Hour, nil case "4h": - return 240 + return 4 * time.Hour, nil case "6h": - return 360 + return 6 * time.Hour, nil case "12h": - return 720 - case "1d", "1D": - return 1440 + return 12 * time.Hour, nil + case "1D", "1d": + return 24 * time.Hour, nil case "2d": - return 2880 + return 48 * time.Hour, nil case "5d": - return 7200 + return 120 * time.Hour, nil case "1W": - return 10080 + return 168 * time.Hour, nil case "1M": - return 43200 // 假设 30 天 + return 720 * time.Hour, nil // 假设 30 天 default: - return 1 // 默认 1 分钟 + return 0, fmt.Errorf("unsupported period: %s", period) } } - -// getMaxRetention 获取时间框架的最大保留时间(天) -func getMaxRetention(period string) int { - switch period { - case "1m", "3m": - return 365 // 1 年 - case "5m", "15m", "30m": - return 730 // 2 年 - case "1h", "2h": - return 1095 // 3 年 - case "4h", "6h", "12h": - return 1825 // 5 年 - case "1d", "1D", "2d", "5d": - return 2555 // 7 年 - case "1W", "1M": - return 3650 // 10 年 - default: - return 365 - } -} - -// calculateRetention 计算保留时间(天) -func calculateRetention(daysDiff, minutes, maxRetention int) int { - timeFactor := 1 - math.Sqrt(float64(daysDiff))/50 // 距今时间非线性衰减 - if timeFactor < 0 { - timeFactor = 0 - } - periodFactor := math.Sqrt(float64(minutes)) / math.Sqrt(43200) // 时间框架非线性增长 - retention := float64(maxRetention) * timeFactor * periodFactor - if retention < 30 { - return 30 // 最小保留 30 天 - } - return int(retention) -} - -// ensureILMPolicy 确保 ILM 策略存在 -func ensureILMPolicy(client *http.Client, config *cfg.Config, policyName string, daysDiff, retentionDays int) error { - policyURL := fmt.Sprintf("%s/_ilm/policy/%s", strings.TrimRight(config.Elasticsearch.URL, "/"), policyName) - resp, err := client.Get(policyURL) - if err == nil && resp.StatusCode == http.StatusOK { - resp.Body.Close() - return nil - } - resp.Body.Close() - - var policy map[string]interface{} - if daysDiff > 730 || retentionDays < 180 { // 超过 2 年或保留时间短,直接 cold - policy = map[string]interface{}{ - "policy": map[string]interface{}{ - "phases": map[string]interface{}{ - "cold": map[string]interface{}{ - "min_age": "0d", - "actions": map[string]interface{}{ - "allocate": map[string]interface{}{ - "include": map[string]string{"_tier_preference": "data_cold"}, - }, - "shrink": map[string]interface{}{ - "number_of_shards": 1, - }, - }, - }, - "delete": map[string]interface{}{ - "min_age": fmt.Sprintf("%dd", retentionDays), - }, - }, - }, - } - } else { - policy = map[string]interface{}{ - "policy": map[string]interface{}{ - "phases": map[string]interface{}{ - "hot": map[string]interface{}{ - "actions": map[string]interface{}{ - "rollover": map[string]interface{}{ - "max_size": "30GB", - "max_age": "14d", - }, - "allocate": map[string]interface{}{ - "include": map[string]string{"_tier_preference": "data_hot"}, - }, - }, - }, - "warm": map[string]interface{}{ - "min_age": "60d", - "actions": map[string]interface{}{ - "allocate": map[string]interface{}{ - "include": map[string]string{"_tier_preference": "data_warm"}, - }, - }, - }, - "cold": map[string]interface{}{ - "min_age": "180d", - "actions": map[string]interface{}{ - "allocate": map[string]interface{}{ - "include": map[string]string{"_tier_preference": "data_cold"}, - }, - "shrink": map[string]interface{}{ - "number_of_shards": 1, - }, - }, - }, - "delete": map[string]interface{}{ - "min_age": fmt.Sprintf("%dd", retentionDays), - }, - }, - }, - } - } - - policyData, _ := json.Marshal(policy) - req, err := http.NewRequest("PUT", policyURL, bytes.NewBuffer(policyData)) - if err != nil { - return fmt.Errorf("failed to create ILM policy request: %v", err) - } - - req.Header.Set("Content-Type", "application/json") - req.SetBasicAuth(config.Elasticsearch.Auth.Username, config.Elasticsearch.Auth.Password) - - resp, err = client.Do(req) - if err != nil { - return fmt.Errorf("failed to create ILM policy: %v", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("failed to create ILM policy, status: %d, response: %s", resp.StatusCode, string(body)) - } - - fmt.Printf("Successfully created ILM policy: %s\n", policyName) - return nil -}