tanya/elasticilm/ilm.go
2025-03-30 19:32:12 +08:00

683 lines
22 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package elasticilm
import (
"bytes"
"encoding/json"
"fmt"
cfg "gitea.zjmud.xyz/phyer/tanya/config" // 导入你的 config 包
"io"
"net/http"
"strconv"
"strings"
"time"
)
// RetentionCalculator is the interface for computing a retention period.
//
// Calculate receives the age of the data in days (daysDiff) and a period
// label, and returns an int. The implementation in this file,
// DefaultRetentionCalculator, returns a number of days.
type RetentionCalculator interface {
Calculate(daysDiff int, period string) int
}
// DefaultRetentionCalculator is the default retention-time calculator.
//
// NOTE(review): the Calculate method defined in this file does not read
// MaxRetention or MinRetention; confirm whether they are consumed elsewhere or
// were meant to bound the result.
type DefaultRetentionCalculator struct {
MaxRetention map[string]int
MinRetention int
}
// periodBaseConfig holds the per-period base parameters looked up by
// NonLinearCoolingModel. Keys are case-sensitive period labels; minute labels
// are lower-case ("1m") while hour, day and week labels are upper-case ("1H",
// "1D", "1W").
var periodBaseConfig = map[string]struct {
	Base     int     // base value
	Interval float64 // time-interval coefficient
}{
	// minutes
	"1m":  {Base: 30, Interval: 1.0},
	"3m":  {Base: 45, Interval: 1.5},
	"5m":  {Base: 60, Interval: 2.0},
	"15m": {Base: 90, Interval: 3.0},
	"30m": {Base: 120, Interval: 4.0},
	// hours
	"1H":  {Base: 150, Interval: 5.0},
	"2H":  {Base: 180, Interval: 6.0},
	"4H":  {Base: 240, Interval: 8.0},
	"6H":  {Base: 300, Interval: 10.0},
	"12H": {Base: 360, Interval: 12.0},
	// days and weeks
	"1D": {Base: 480, Interval: 16.0},
	"2D": {Base: 600, Interval: 20.0},
	"5D": {Base: 720, Interval: 24.0},
	"1W": {Base: 840, Interval: 28.0},
}
// NonLinearCoolingModel computes the warm, cold and delete ages, in days, for
// data of the given period that is daysDiff days old.
//
// The base value comes from periodBaseConfig[period]; unknown periods fall
// back to the "1m" entry. It is scaled by
//
//	warmPhaseMultiplier * (1 + timeDecayFactor/(daysDiff/365 + 1))
//
// so that newer data (small daysDiff) gets a longer retention. Both factors
// are read from config when present and default to 1.0 and 0.5 respectively;
// a nil config is accepted. The warm age is never lower than the period's base
// value, and the cold and delete ages are always two and three times the warm
// age.
//
// A negative daysDiff (data dated in the future) is treated as an age of 0.
// Without that, daysDiff == -365 divides by zero and yields +Inf, whose
// conversion to int is implementation-dependent, and other negative ages
// inflate or invert the age factor.
//
// The result is returned as (warm, cold, delete) and also printed to stdout
// together with the daysDiff that was passed in.
func NonLinearCoolingModel(daysDiff int, period string, config map[string]float64) (int, int, int) {
	// Named periodCfg/deleteDays rather than cfg/delete so that neither the
	// imported config package nor the delete builtin is shadowed.
	periodCfg, ok := periodBaseConfig[period]
	if !ok {
		periodCfg = periodBaseConfig["1m"] // default configuration
	}

	// Tunables, with defaults for anything the caller did not set.
	timeDecayFactor := 0.5
	if v, ok := config["timeDecayFactor"]; ok {
		timeDecayFactor = v
	}
	warmPhaseMultiplier := 1.0
	if v, ok := config["warmPhaseMultiplier"]; ok {
		warmPhaseMultiplier = v
	}

	// Clamp the age so the denominator below stays >= 1.
	age := daysDiff
	if age < 0 {
		age = 0
	}

	// Inverse-proportional decay 1/(x+1): the newer the data, the larger the
	// factor and therefore the longer the retention.
	ageFactor := timeDecayFactor / (float64(age)/365.0 + 1.0)

	warm := int(float64(periodCfg.Base) * warmPhaseMultiplier * (1.0 + ageFactor))
	if warm < periodCfg.Base {
		warm = periodCfg.Base // enforce the per-period minimum
	}
	cold := warm * 2
	deleteDays := warm * 3

	fmt.Printf("[Cooling Model] Period: %s, DaysDiff: %d => warm=%dd, cold=%dd, delete=%dd\n",
		period, daysDiff, warm, cold, deleteDays)
	return warm, cold, deleteDays
}
// Calculate returns the retention period in days for data that is daysDiff
// days old at the given period. The value is the delete age produced by
// NonLinearCoolingModel when run with the built-in default parameters below;
// the receiver's fields are not consulted.
func (c DefaultRetentionCalculator) Calculate(daysDiff int, period string) int {
	// Default model parameters.
	defaults := make(map[string]float64, 5)
	defaults["timeDecayFactor"] = 0.5
	defaults["periodGranularityFactor"] = 0.5
	defaults["warmPhaseMultiplier"] = 1.0
	defaults["coldPhaseMultiplier"] = 2.0
	defaults["deletePhaseMultiplier"] = 3.0

	_, _, retention := NonLinearCoolingModel(daysDiff, period, defaults)
	return retention
}
// periodToMinutes converts a period label into its length in minutes.
//
// Case matters where it carries meaning: "1m" is one minute while "1M" is one
// month (taken as 30 days). Hour and day labels are accepted in both lower and
// upper case ("1h"/"1H", "2d"/"2D"), so the upper-case spellings used as
// periodBaseConfig keys resolve to their real length instead of falling
// through to the default. Unrecognised labels return 1.
func periodToMinutes(period string) int {
	switch period {
	case "1m":
		return 1
	case "3m":
		return 3
	case "5m":
		return 5
	case "15m":
		return 15
	case "30m":
		return 30
	case "1h", "1H":
		return 60
	case "2h", "2H":
		return 120
	case "4h", "4H":
		return 240
	case "6h", "6H":
		return 360
	case "12h", "12H":
		return 720
	case "1d", "1D":
		return 1440
	case "2d", "2D":
		return 2880
	case "5d", "5D":
		return 7200
	case "1W":
		return 10080
	case "1M":
		return 43200 // assumes a 30-day month
	default:
		return 1
	}
}
// getPhase chooses the initial ILM phase from the data's age and its
// retention period, logging the decision to stdout:
//
//   - "cold" when the data is older than 730 days or the retention is below
//     180 days (this rule is checked first);
//   - "hot" when the data is at most 7 days old;
//   - "warm" otherwise.
func getPhase(daysDiff, retentionDays int) string {
	switch {
	case daysDiff > 730 || retentionDays < 180:
		fmt.Printf("[ILM Phase] 数据已超过2年或保留时间少于180天判定为cold阶段。daysDiff: %d, retentionDays: %d\n", daysDiff, retentionDays)
		return "cold"
	case daysDiff <= 7:
		fmt.Printf("[ILM Phase] 数据在7天内判定为hot阶段。daysDiff: %d\n", daysDiff)
		return "hot"
	default:
		fmt.Printf("[ILM Phase] 数据在7天到2年之间判定为warm阶段。daysDiff: %d\n", daysDiff)
		return "warm"
	}
}
// ensureAlias makes the monthly index for indexTime the write index of alias.
//
// Steps, all against esConfig.URL with basic auth:
//
//  1. GET _alias/<alias>; on 200 the response is decoded and a "remove" action
//     is queued for every index that currently has the alias flagged as its
//     write index. Any other status is treated as "no existing alias".
//  2. Derive the target index name
//     logstash-<period>.<dataType>.<coinPair>.<YYYY>-<MM> (all lower-cased) and
//     create it with 2 shards / 1 replica if a HEAD request returns 404.
//  3. Queue an "add" action with is_write_index=true for the target index and
//     is_write_index=false for every other index returned in step 1.
//  4. POST the queued actions to _aliases.
//
// dataConfig is only used to read CoolingModelConfig, which is passed to
// NonLinearCoolingModel together with daysDiff and period; the resulting phase
// ages are printed and not otherwise used.
//
// It returns an error if a request cannot be built or sent, if the alias
// response cannot be decoded, or if index creation or the alias update answers
// with a status other than 200.
func ensureAlias(client *http.Client, esConfig cfg.ElasticsearchConfig, alias, period, dataType, coinPair string, indexTime time.Time, dataConfig cfg.DataTypeConfig, daysDiff int) error {
	// Look up the indices currently attached to the alias.
	getAliasURL := fmt.Sprintf("%s/_alias/%s", esConfig.URL, alias)
	req, err := http.NewRequest("GET", getAliasURL, nil)
	if err != nil {
		return err
	}
	req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Actions for the _aliases update, built up over the following steps.
	var actions []interface{}
	var aliasInfo map[string]map[string]interface{}

	// If the alias exists, detach it from whichever index is the write index.
	if resp.StatusCode == http.StatusOK {
		if err := json.NewDecoder(resp.Body).Decode(&aliasInfo); err != nil {
			return err
		}
		for indexName, info := range aliasInfo {
			aliases, ok := info["aliases"].(map[string]interface{})
			if !ok {
				continue
			}
			// Checked assertion: an unexpected payload shape is skipped
			// instead of panicking.
			details, ok := aliases[alias].(map[string]interface{})
			if !ok {
				continue
			}
			if details["is_write_index"] == true {
				actions = append(actions, map[string]interface{}{
					"remove": map[string]interface{}{
						"index": indexName,
						"alias": alias,
					},
				})
			}
		}
	}

	// The index for indexTime's month becomes the write index.
	year, month := indexTime.Year(), int(indexTime.Month())
	latestIndex := fmt.Sprintf("logstash-%s.%s.%s.%d-%02d", strings.ToLower(period), strings.ToLower(dataType), strings.ToLower(coinPair), year, month)

	// Create the index if it does not exist yet.
	indexURL := fmt.Sprintf("%s/%s", esConfig.URL, latestIndex)
	req, err = http.NewRequest("HEAD", indexURL, nil)
	if err != nil {
		return err
	}
	req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
	resp, err = client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusNotFound {
		fmt.Printf("Index %s does not exist, creating it...\n", latestIndex)
		createIndexURL := fmt.Sprintf("%s/%s", esConfig.URL, latestIndex)
		// Minimal creation request; further settings can be added as needed.
		indexData := map[string]interface{}{
			"settings": map[string]interface{}{
				"number_of_shards":   2, // adjust to your configuration
				"number_of_replicas": 1, // adjust to your configuration
			},
		}
		data, err := json.Marshal(indexData)
		if err != nil {
			return fmt.Errorf("failed to marshal index creation data: %v", err)
		}
		req, err = http.NewRequest("PUT", createIndexURL, bytes.NewBuffer(data))
		if err != nil {
			return err
		}
		req.Header.Set("Content-Type", "application/json")
		req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
		resp, err = client.Do(req)
		if err != nil {
			return err
		}
		defer resp.Body.Close()
		if resp.StatusCode != http.StatusOK {
			body, _ := io.ReadAll(resp.Body)
			return fmt.Errorf("failed to create index %s, status: %d, response: %s", latestIndex, resp.StatusCode, string(body))
		}
		fmt.Printf("Successfully created index: %s\n", latestIndex)
	}

	// Attach the alias to the target index as its write index...
	actions = append(actions, map[string]interface{}{
		"add": map[string]interface{}{
			"index":          latestIndex,
			"alias":          alias,
			"is_write_index": true,
		},
	})
	// ...and make sure every other index carries is_write_index=false.
	for indexName := range aliasInfo {
		if indexName != latestIndex {
			actions = append(actions, map[string]interface{}{
				"add": map[string]interface{}{
					"index":          indexName,
					"alias":          alias,
					"is_write_index": false,
				},
			})
		}
	}

	// Submit the alias update.
	aliasURL := fmt.Sprintf("%s/_aliases", esConfig.URL)
	aliasData := map[string]interface{}{
		"actions": actions,
	}
	data, err := json.Marshal(aliasData)
	if err != nil {
		return err
	}
	req, err = http.NewRequest("POST", aliasURL, bytes.NewBuffer(data))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
	resp, err = client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Run the cooling model with the data type's configuration and log the
	// phase ages it yields.
	coolingConfig := dataConfig.CoolingModelConfig
	warmDays, coldDays, deleteDays := NonLinearCoolingModel(daysDiff, period, coolingConfig)
	fmt.Printf("[ILM Policy] Calculated phases: warm=%dd, cold=%dd, delete=%dd\n", warmDays, coldDays, deleteDays)

	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("failed to create alias, status: %d, response: %s", resp.StatusCode, string(body))
	}
	return nil
}
// ConfigureILM sets up index lifecycle management for one data type, period
// and coin pair, for the month that indexTime falls in.
//
// It looks up the data type's ILM settings in config, derives the retention
// period from the age of indexTime (never less than the configured "cold"
// phase age), and then:
//
//  1. PUTs an index template matching logstash-<period>.<dataType>.*.<YYYY>-<MM>
//     that wires the policy name and rollover alias into new indices;
//  2. calls ensureILMPolicy to create or update the policy;
//  3. calls ensureAlias to point the alias at the month's index.
//
// Progress is printed to stdout. An error is returned when the data type has
// no ILM configuration, when the template request fails or answers with a
// status other than 200, or when either helper fails.
func ConfigureILM(client *http.Client, config *cfg.Config, dataType, period, coinPair string, indexTime time.Time) error {
	es := config.Elasticsearch
	typeCfg, found := es.ILM.DataTypes[dataType]
	if !found {
		return fmt.Errorf("ILM configuration for data type '%s' not found in config", dataType)
	}

	// Age of the data in whole days, and the retention derived from it.
	ageDays := int(time.Now().Sub(indexTime).Hours() / 24)
	retention := DefaultRetentionCalculator{
		MaxRetention: typeCfg.MaxRetention,
		MinRetention: typeCfg.MinRetention,
	}.Calculate(ageDays, period)

	// The delete age must not undercut the cold phase's minimum age.
	if floor := typeCfg.NormalPhases["cold"]; retention < floor {
		retention = floor
	}

	// Names of the policy, template, index pattern and alias.
	var (
		yr          = indexTime.Year()
		mon         = int(indexTime.Month())
		lowerPeriod = strings.ToLower(period)
		lowerType   = strings.ToLower(dataType)
		lowerPair   = strings.ToLower(coinPair)
	)
	policyName := fmt.Sprintf("logstash_%s_%s_%d_%02d", lowerType, lowerPeriod, yr, mon)
	templateName := fmt.Sprintf("logstash_%s_%s_%d_%02d", dataType, period, yr, mon)
	indexPattern := fmt.Sprintf("logstash-%s.%s.*.%d-%02d", lowerPeriod, lowerType, yr, mon)
	aliasName := fmt.Sprintf("logstash-%s.%s.%s.%d-%02d_alias", lowerPeriod, lowerType, lowerPair, yr, mon)
	fmt.Printf("[ConfigureILM] Configuring ILM with policyName: %s, templateName: %s, aliasName: %s\n", policyName, templateName, aliasName)

	// Field mappings: a timestamp, the common keyword fields, and the numeric
	// fields that only apply to candle data (other data types may need
	// different ones).
	properties := map[string]interface{}{
		"dataTime": map[string]string{"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
	}
	for _, field := range []string{"coinPair", "period"} {
		properties[field] = map[string]string{"type": "keyword"}
	}
	for _, field := range []string{"open", "high", "low", "close", "volume", "volumeCcy"} {
		properties[field] = map[string]string{"type": "float"}
	}

	// Index template.
	settings := map[string]interface{}{
		"number_of_shards":               typeCfg.Shards,
		"number_of_replicas":             typeCfg.Replicas,
		"index.lifecycle.name":           policyName,
		"index.lifecycle.rollover_alias": aliasName,
	}
	template := map[string]interface{}{
		"index_patterns": []string{indexPattern},
		"priority":       100,
		"template": map[string]interface{}{
			"settings": settings,
			"mappings": map[string]interface{}{
				"properties": properties,
			},
		},
	}

	payload, err := json.Marshal(template)
	if err != nil {
		return fmt.Errorf("failed to marshal index template data: %v", err)
	}
	templateURL := fmt.Sprintf("%s/_index_template/%s", es.URL, templateName)
	req, err := http.NewRequest("PUT", templateURL, bytes.NewBuffer(payload))
	if err != nil {
		return fmt.Errorf("failed to create index template request: %v", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.SetBasicAuth(es.Auth.Username, es.Auth.Password)

	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to send index template request: %v", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("failed to create index template, status: %d, response: %s", resp.StatusCode, string(body))
	}
	fmt.Printf("[ConfigureILM] Successfully configured ILM template: %s\n", templateName)

	// Create or update the ILM policy.
	err = ensureILMPolicy(client, es, policyName, aliasName, period, dataType, ageDays, retention, typeCfg, indexTime, coinPair)
	if err != nil {
		return fmt.Errorf("failed to ensure ILM policy: %v", err)
	}

	// Point the alias at the index for indexTime.
	err = ensureAlias(client, es, aliasName, period, dataType, coinPair, indexTime, typeCfg, ageDays)
	if err != nil {
		return fmt.Errorf("failed to ensure alias: %v", err)
	}

	fmt.Printf("[ConfigureILM] Successfully completed ILM configuration for policy: %s\n", policyName)
	return nil
}
// ensureILMPolicy creates or updates the ILM policy policyName.
//
// Steps, all against esConfig.URL with basic auth:
//
//  1. Make sure a policy named default_policy exists; if the lookup does not
//     return 200 it is created with a hot phase and a 365d delete phase.
//  2. Look up policyName. If it already exists, make sure the monthly index
//     logstash-<period>.<dataType>.<coinPair>.<YYYY>-<MM> (lower-cased) exists,
//     creating it with 2 shards / 1 replica on 404, and set its
//     index.lifecycle.name to policyName and its
//     index.lifecycle.rollover_alias to aliasName.
//  3. Build the policy and PUT it:
//     - warm and cold phases use dataConfig.NormalPhases as min_age;
//     - the delete phase uses retentionDays;
//     - a hot phase carrying the rollover conditions from
//     dataConfig.NormalRollover is included only when
//     getPhase(daysDiff, retentionDays) returns "hot";
//     - when daysDiff exceeds 730 the cold phase gets min_age "0d".
//
// Progress is printed to stdout. An error is returned when a request cannot be
// built or sent, or when a create/update call answers with a status other
// than 200.
func ensureILMPolicy(client *http.Client, esConfig cfg.ElasticsearchConfig, policyName, aliasName, period, dataType string, daysDiff, retentionDays int, dataConfig cfg.DataTypeConfig, indexTime time.Time, coinPair string) error {
	// Check whether default_policy exists. The lookup carries credentials like
	// every other request here; without them a secured cluster answers 401 and
	// the policy would always look missing.
	defaultPolicyURL := fmt.Sprintf("%s/_ilm/policy/default_policy", esConfig.URL)
	req, err := http.NewRequest("GET", defaultPolicyURL, nil)
	if err != nil {
		return fmt.Errorf("failed to check default_policy: %v", err)
	}
	req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to check default_policy: %v", err)
	}
	defer resp.Body.Close()

	// Create default_policy when it is missing.
	if resp.StatusCode != http.StatusOK {
		fmt.Println("[ILM Policy] Default policy does not exist, creating it...")
		defaultPolicy := map[string]interface{}{
			"policy": map[string]interface{}{
				"phases": map[string]interface{}{
					"hot": map[string]interface{}{
						"actions": map[string]interface{}{
							"set_priority": map[string]interface{}{
								"priority": 100,
							},
						},
					},
					"delete": map[string]interface{}{
						"min_age": "365d",
						"actions": map[string]interface{}{
							"delete": map[string]interface{}{},
						},
					},
				},
			},
		}
		defaultPolicyData, err := json.Marshal(defaultPolicy)
		if err != nil {
			return fmt.Errorf("failed to marshal default_policy: %v", err)
		}
		req, err = http.NewRequest("PUT", defaultPolicyURL, bytes.NewBuffer(defaultPolicyData))
		if err != nil {
			return fmt.Errorf("failed to create default_policy request: %v", err)
		}
		req.Header.Set("Content-Type", "application/json")
		req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
		resp, err = client.Do(req)
		if err != nil {
			return fmt.Errorf("failed to create default_policy: %v", err)
		}
		defer resp.Body.Close()
		if resp.StatusCode != http.StatusOK {
			body, _ := io.ReadAll(resp.Body)
			return fmt.Errorf("failed to create default_policy, status: %d, response: %s", resp.StatusCode, string(body))
		}
		fmt.Println("[ILM Policy] Successfully created default_policy")
	}

	// Check whether the target policy exists (authenticated, see above).
	policyURL := fmt.Sprintf("%s/_ilm/policy/%s", esConfig.URL, policyName)
	req, err = http.NewRequest("GET", policyURL, nil)
	if err != nil {
		return fmt.Errorf("failed to check ILM policy: %v", err)
	}
	req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
	resp, err = client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to check ILM policy: %v", err)
	}
	defer resp.Body.Close()

	// Remembered for the final log line: the PUT below answers 200 whether the
	// policy was created or replaced.
	policyExists := resp.StatusCode == http.StatusOK

	if policyExists {
		fmt.Printf("[ILM Policy] Policy %s already exists, updating index settings...\n", policyName)

		// Same naming scheme as the index that ensureAlias manages, so the
		// settings land on the right index for every data type.
		year, month := indexTime.Year(), int(indexTime.Month())
		indexName := fmt.Sprintf("logstash-%s.%s.%s.%d-%02d", strings.ToLower(period), strings.ToLower(dataType), strings.ToLower(coinPair), year, month)

		// Check that the index exists first.
		indexURL := fmt.Sprintf("%s/%s", esConfig.URL, indexName)
		req, err := http.NewRequest("HEAD", indexURL, nil)
		if err != nil {
			return fmt.Errorf("failed to create index check request: %v", err)
		}
		req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
		indexResp, err := client.Do(req)
		if err != nil {
			return fmt.Errorf("failed to check index existence: %v", err)
		}
		defer indexResp.Body.Close()

		// Create the index if it is missing.
		if indexResp.StatusCode == http.StatusNotFound {
			createIndexURL := fmt.Sprintf("%s/%s", esConfig.URL, indexName)
			indexData := map[string]interface{}{
				"settings": map[string]interface{}{
					"number_of_shards":   2,
					"number_of_replicas": 1,
				},
			}
			data, err := json.Marshal(indexData)
			if err != nil {
				return fmt.Errorf("failed to marshal index creation data: %v", err)
			}
			req, err = http.NewRequest("PUT", createIndexURL, bytes.NewBuffer(data))
			if err != nil {
				return err
			}
			req.Header.Set("Content-Type", "application/json")
			req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
			createResp, err := client.Do(req)
			if err != nil {
				return err
			}
			defer createResp.Body.Close()
			if createResp.StatusCode != http.StatusOK {
				body, _ := io.ReadAll(createResp.Body)
				return fmt.Errorf("failed to create index %s, status: %d, response: %s", indexName, createResp.StatusCode, string(body))
			}
			fmt.Printf("Successfully created index: %s\n", indexName)
		}

		// Attach the policy and the caller-supplied rollover alias to the index.
		updateIndexURL := fmt.Sprintf("%s/%s/_settings", esConfig.URL, indexName)
		updateData := map[string]interface{}{
			"index.lifecycle.name":           policyName,
			"index.lifecycle.rollover_alias": aliasName,
		}
		updateDataBytes, err := json.Marshal(updateData)
		if err != nil {
			return fmt.Errorf("failed to marshal update index data: %v", err)
		}
		req, err = http.NewRequest("PUT", updateIndexURL, bytes.NewBuffer(updateDataBytes))
		if err != nil {
			return fmt.Errorf("failed to create update index request: %v", err)
		}
		req.Header.Set("Content-Type", "application/json")
		req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
		updateResp, err := client.Do(req)
		if err != nil {
			return fmt.Errorf("failed to update index settings: %v", err)
		}
		defer updateResp.Body.Close()
		if updateResp.StatusCode != http.StatusOK {
			body, _ := io.ReadAll(updateResp.Body)
			return fmt.Errorf("failed to update index settings, status: %d, response: %s", updateResp.StatusCode, string(body))
		}
		fmt.Printf("[ILM Policy] Successfully updated index settings for policy: %s\n", policyName)
		fmt.Printf("[ILM Policy] Policy %s already exists, attempting to update...\n", policyName)
	} else {
		fmt.Printf("[ILM Policy] Policy %s does not exist, creating a new one...\n", policyName)
	}

	// Build the policy. An existing policy can be replaced with a plain PUT,
	// though Elasticsearch may reject some modifications.
	initialPhase := getPhase(daysDiff, retentionDays)
	fmt.Printf("[ILM Policy] Initial phase for policy %s is: %s\n", policyName, initialPhase)

	rolloverActions := map[string]interface{}{
		"max_age":  dataConfig.NormalRollover["max_age"],
		"max_size": dataConfig.NormalRollover["max_size"],
	}
	if maxDocsStr, ok := dataConfig.NormalRollover["max_docs"]; ok && maxDocsStr != "" {
		maxDocs, err := strconv.Atoi(maxDocsStr)
		if err != nil || maxDocs <= 0 {
			fmt.Printf("[ILM Policy] Invalid max_docs value: %s, skipping max_docs in rollover\n", maxDocsStr)
		} else {
			rolloverActions["max_docs"] = maxDocs
		}
	} else {
		fmt.Println("[ILM Policy] max_docs not set in NormalRollover, skipping max_docs in rollover")
	}

	phases := map[string]interface{}{
		"warm": map[string]interface{}{
			"min_age": fmt.Sprintf("%dd", dataConfig.NormalPhases["warm"]),
			"actions": map[string]interface{}{
				"allocate": map[string]interface{}{
					"include": map[string]string{"_tier_preference": "data_warm"},
				},
			},
		},
		"cold": map[string]interface{}{
			"min_age": fmt.Sprintf("%dd", dataConfig.NormalPhases["cold"]),
			"actions": map[string]interface{}{
				"allocate": map[string]interface{}{
					"include": map[string]string{"_tier_preference": "data_cold"},
				},
			},
		},
		"delete": map[string]interface{}{
			"min_age": fmt.Sprintf("%dd", retentionDays),
			"actions": map[string]interface{}{
				"delete": map[string]interface{}{},
			},
		},
	}
	// The rollover phase is added only when its name does not collide with one
	// of the fixed phases above; in practice that means only for "hot". A
	// "warm" or "cold" initial phase keeps the fixed allocate-only definition.
	if _, taken := phases[initialPhase]; !taken {
		phases[initialPhase] = map[string]interface{}{
			"actions": map[string]interface{}{
				"rollover": rolloverActions,
				"set_priority": map[string]interface{}{
					"priority": 100,
				},
			},
		}
	}

	// Data older than 730 days (2 years) goes straight to the cold phase.
	if daysDiff > 730 {
		delete(phases, initialPhase)
		if coldPhase, ok := phases["cold"].(map[string]interface{}); ok {
			coldPhase["min_age"] = "0d"
		} else {
			phases["cold"] = map[string]interface{}{
				"min_age": "0d",
				"actions": map[string]interface{}{
					"allocate": map[string]interface{}{
						"include": map[string]string{"_tier_preference": "data_cold"},
					},
				},
			}
		}
	}

	policy := map[string]interface{}{
		"policy": map[string]interface{}{
			"phases": phases,
		},
	}

	// Create or update the policy.
	policyData, err := json.Marshal(policy)
	if err != nil {
		return fmt.Errorf("failed to marshal ILM policy: %v", err)
	}
	req, err = http.NewRequest("PUT", policyURL, bytes.NewBuffer(policyData))
	if err != nil {
		return fmt.Errorf("failed to create ILM policy request: %v", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
	resp, err = client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to send ILM policy request: %v", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("failed to create/update ILM policy, status: %d, response: %s", resp.StatusCode, string(body))
	}

	// Log what happened and when.
	action := "updated"
	if !policyExists {
		action = "created"
	}
	fmt.Printf("[ILM Policy] Successfully %s ILM policy: %s (modified: %v)\n",
		action,
		policyName,
		time.Now().Format("2006-01-02 15:04:05"))
	return nil
}