ToElastic支持动态ilm
This commit is contained in:
parent
e0700e77a9
commit
e73c36725c
@ -7,6 +7,7 @@ import (
|
||||
"fmt"
|
||||
cfg "gitea.zjmud.xyz/phyer/tanya/config"
|
||||
"io"
|
||||
"math"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
@ -422,38 +423,47 @@ func parsePeriod(period string) (time.Duration, error) {
|
||||
}
|
||||
|
||||
// ToElastic 将Candle数据发送到Elasticsearch
|
||||
|
||||
// ToElastic 将 Candle 数据发送到 Elasticsearch,并动态配置 ILM 策略
|
||||
func (cl *CandleList) ToElastic() error {
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
|
||||
// 创建带超时的HTTP客户端
|
||||
client := &http.Client{
|
||||
Timeout: 30 * time.Second,
|
||||
}
|
||||
|
||||
// 获取周期的 duration
|
||||
duration, err := parsePeriod(cl.Period)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse period: %v", err)
|
||||
}
|
||||
|
||||
var config *cfg.Config
|
||||
configPaths := []string{"config/config.json", "../config/config.json", "../../config/config.json"}
|
||||
for _, path := range configPaths {
|
||||
config, err = cfg.LoadConfig(path)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to load config: %v", err)
|
||||
}
|
||||
|
||||
for _, candle := range cl.Candles {
|
||||
// 解析时间戳
|
||||
ts, err := strconv.ParseInt(candle.Timestamp, 10, 64)
|
||||
if err != nil {
|
||||
fmt.Println("invalid timestamp:", ts, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// 转换为东八区时间
|
||||
loc, _ := time.LoadLocation("Asia/Shanghai")
|
||||
cstTime := time.UnixMilli(ts).In(loc)
|
||||
|
||||
// 根据当前记录的时间确定年份
|
||||
currentYear := cstTime.Year()
|
||||
// 新索引名称:logstash-<period>.candle.<coinpair>.<year>-<month>
|
||||
index := fmt.Sprintf("logstash-%s.candle.%s.%d-%02d",
|
||||
strings.ToLower(cl.Period),
|
||||
strings.ToLower(cl.CoinPair),
|
||||
cstTime.Year(),
|
||||
int(cstTime.Month()),
|
||||
)
|
||||
|
||||
// 动态构造索引名称
|
||||
index := fmt.Sprintf("logstash-candle.%s.%d.%s", strings.ToLower(cl.CoinPair), currentYear, strings.ToLower(cl.Period))
|
||||
|
||||
// 验证时间戳是否为周期的整数倍
|
||||
// 验证时间戳对齐
|
||||
if cl.Period == "1D" {
|
||||
if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 {
|
||||
return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period)
|
||||
@ -464,24 +474,15 @@ func (cl *CandleList) ToElastic() error {
|
||||
}
|
||||
}
|
||||
|
||||
// 添加调试信息
|
||||
fmt.Printf("Timestamp: %d, CST Time: %s, Period: %s\n", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period)
|
||||
|
||||
// 对于日线数据,检查是否为当天的 00:00:00
|
||||
if cl.Period == "1D" {
|
||||
if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 {
|
||||
return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period)
|
||||
}
|
||||
} else {
|
||||
// 对于其他周期,使用原来的对齐检查
|
||||
if cstTime.UnixMilli()%duration.Milliseconds() != 0 {
|
||||
return fmt.Errorf("timestamp %d is not aligned with period %s", ts, cl.Period)
|
||||
}
|
||||
// 动态配置 ILM
|
||||
err = configureILM(client, config, index, cstTime, cl.Period)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to configure ILM: %v", err)
|
||||
}
|
||||
|
||||
// 构造记录
|
||||
record := map[string]interface{}{
|
||||
"dataTime": cstTime.Format("2006-01-02T15:04:05") + "+08:00",
|
||||
"dataTime": cstTime.Format("2006-01-02 15:04:05"),
|
||||
"open": candle.Open,
|
||||
"high": candle.High,
|
||||
"low": candle.Low,
|
||||
@ -490,64 +491,35 @@ func (cl *CandleList) ToElastic() error {
|
||||
"volumeCcy": candle.VolumeCcy,
|
||||
}
|
||||
|
||||
// 构造请求体
|
||||
jsonData, err := json.Marshal(record)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal data: %v", err)
|
||||
}
|
||||
var config *cfg.Config
|
||||
configPaths := []string{
|
||||
"config/config.json", // 当前目录下的config目录
|
||||
"../config/config.json", // 上一级目录下的config目录
|
||||
"../../config/config.json", // 上两级目录下的config目录
|
||||
}
|
||||
|
||||
var loadErr error
|
||||
for _, path := range configPaths {
|
||||
config, loadErr = cfg.LoadConfig(path)
|
||||
if loadErr == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if config == nil {
|
||||
return fmt.Errorf("failed to load configuration: %v", loadErr)
|
||||
}
|
||||
|
||||
// 构造完整URL
|
||||
fullURL := fmt.Sprintf("%s/%s/_doc/%d", strings.TrimRight(config.Elasticsearch.URL, "/"), index, ts)
|
||||
fmt.Println("fullURL: ", fullURL)
|
||||
|
||||
// 创建请求
|
||||
req, err := http.NewRequest("POST", fullURL, bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create request: %v", err)
|
||||
}
|
||||
|
||||
// 设置请求头
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Accept", "application/json")
|
||||
// 尝试从不同层级加载配置
|
||||
|
||||
// 设置基本认证
|
||||
req.SetBasicAuth(config.Elasticsearch.Auth.Username, config.Elasticsearch.Auth.Password)
|
||||
|
||||
// 发送HTTP请求
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send data to Elasticsearch: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// 读取响应体
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read response body: %v", err)
|
||||
}
|
||||
|
||||
// 输出完整的响应信息
|
||||
fmt.Printf("HTTP Response Status: %s\n", resp.Status)
|
||||
fmt.Printf("HTTP Response Headers: %v\n", resp.Header)
|
||||
fmt.Printf("HTTP Response Body: %s\n", string(body))
|
||||
|
||||
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
|
||||
@ -559,3 +531,247 @@ func (cl *CandleList) ToElastic() error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// configureILM 动态配置 ILM 策略
|
||||
func configureILM(client *http.Client, config *cfg.Config, index string, indexTime time.Time, period string) error {
|
||||
now := time.Now()
|
||||
daysDiff := int(now.Sub(indexTime).Hours() / 24) // 距今天数
|
||||
|
||||
// 时间框架转换为分钟数
|
||||
minutes := periodToMinutes(period)
|
||||
|
||||
// 计算保留时间(天)
|
||||
maxRetention := getMaxRetention(period)
|
||||
retentionDays := calculateRetention(daysDiff, minutes, maxRetention)
|
||||
|
||||
// 决定 ILM 策略
|
||||
var ilmPolicy string
|
||||
if daysDiff > 730 || retentionDays < 180 { // 超过 2 年或保留时间短,直<EFBC8C><E79BB4><EFBFBD> cold
|
||||
ilmPolicy = fmt.Sprintf("logstash_candle_%s_%d_cold", period, indexTime.Year())
|
||||
} else {
|
||||
ilmPolicy = fmt.Sprintf("logstash_candle_%s_%d_normal", period, indexTime.Year())
|
||||
}
|
||||
|
||||
// 创建 ILM 策略
|
||||
if err := ensureILMPolicy(client, config, ilmPolicy, daysDiff, retentionDays); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 配置索引模板
|
||||
templateName := fmt.Sprintf("logstash-candle-%s-%d-%02d", period, indexTime.Year(), int(indexTime.Month()))
|
||||
template := map[string]interface{}{
|
||||
"index_patterns": []string{fmt.Sprintf("logstash-%s.candle.*.%d-%02d", period, indexTime.Year(), int(indexTime.Month()))},
|
||||
"template": map[string]interface{}{
|
||||
"settings": map[string]interface{}{
|
||||
"number_of_shards": 2,
|
||||
"number_of_replicas": 1,
|
||||
"index.lifecycle.name": ilmPolicy,
|
||||
},
|
||||
"mappings": map[string]interface{}{
|
||||
"properties": map[string]interface{}{
|
||||
"dataTime": map[string]string{"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
|
||||
"open": map[string]string{"type": "float"},
|
||||
"high": map[string]string{"type": "float"},
|
||||
"low": map[string]string{"type": "float"},
|
||||
"close": map[string]string{"type": "float"},
|
||||
"volume": map[string]string{"type": "float"},
|
||||
"volumeCcy": map[string]string{"type": "float"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
templateURL := fmt.Sprintf("%s/_index_template/%s", strings.TrimRight(config.Elasticsearch.URL, "/"), templateName)
|
||||
templateData, _ := json.Marshal(template)
|
||||
req, err := http.NewRequest("PUT", templateURL, bytes.NewBuffer(templateData))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create template request: %v", err)
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.SetBasicAuth(config.Elasticsearch.Auth.Username, config.Elasticsearch.Auth.Password)
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create index template: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return fmt.Errorf("failed to create template, status: %d, response: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
fmt.Printf("Successfully configured ILM template: %s\n", templateName)
|
||||
return nil
|
||||
}
|
||||
|
||||
// periodToMinutes 将时间框架转换为分钟数
|
||||
func periodToMinutes(period string) int {
|
||||
switch period {
|
||||
case "1m":
|
||||
return 1
|
||||
case "3m":
|
||||
return 3
|
||||
case "5m":
|
||||
return 5
|
||||
case "15m":
|
||||
return 15
|
||||
case "30m":
|
||||
return 30
|
||||
case "1h":
|
||||
return 60
|
||||
case "2h":
|
||||
return 120
|
||||
case "4h":
|
||||
return 240
|
||||
case "6h":
|
||||
return 360
|
||||
case "12h":
|
||||
return 720
|
||||
case "1d", "1D":
|
||||
return 1440
|
||||
case "2d":
|
||||
return 2880
|
||||
case "5d":
|
||||
return 7200
|
||||
case "1W":
|
||||
return 10080
|
||||
case "1M":
|
||||
return 43200 // 假设 30 天
|
||||
default:
|
||||
return 1 // 默认 1 分钟
|
||||
}
|
||||
}
|
||||
|
||||
// getMaxRetention 获取时间框架的最大保留时间(天)
|
||||
func getMaxRetention(period string) int {
|
||||
switch period {
|
||||
case "1m", "3m":
|
||||
return 365 // 1 年
|
||||
case "5m", "15m", "30m":
|
||||
return 730 // 2 年
|
||||
case "1h", "2h":
|
||||
return 1095 // 3 年
|
||||
case "4h", "6h", "12h":
|
||||
return 1825 // 5 年
|
||||
case "1d", "1D", "2d", "5d":
|
||||
return 2555 // 7 年
|
||||
case "1W", "1M":
|
||||
return 3650 // 10 年
|
||||
default:
|
||||
return 365
|
||||
}
|
||||
}
|
||||
|
||||
// calculateRetention 计算保留时间(天)
|
||||
func calculateRetention(daysDiff, minutes, maxRetention int) int {
|
||||
timeFactor := 1 - math.Sqrt(float64(daysDiff))/50 // 距今时间非线性衰减
|
||||
if timeFactor < 0 {
|
||||
timeFactor = 0
|
||||
}
|
||||
periodFactor := math.Sqrt(float64(minutes)) / math.Sqrt(43200) // 时间框架非线性增长
|
||||
retention := float64(maxRetention) * timeFactor * periodFactor
|
||||
if retention < 30 {
|
||||
return 30 // 最小保留 30 天
|
||||
}
|
||||
return int(retention)
|
||||
}
|
||||
|
||||
// ensureILMPolicy 确保 ILM 策略存在
|
||||
func ensureILMPolicy(client *http.Client, config *cfg.Config, policyName string, daysDiff, retentionDays int) error {
|
||||
policyURL := fmt.Sprintf("%s/_ilm/policy/%s", strings.TrimRight(config.Elasticsearch.URL, "/"), policyName)
|
||||
resp, err := client.Get(policyURL)
|
||||
if err == nil && resp.StatusCode == http.StatusOK {
|
||||
resp.Body.Close()
|
||||
return nil
|
||||
}
|
||||
resp.Body.Close()
|
||||
|
||||
var policy map[string]interface{}
|
||||
if daysDiff > 730 || retentionDays < 180 { // 超过 2 年或保留时间短,直接 cold
|
||||
policy = map[string]interface{}{
|
||||
"policy": map[string]interface{}{
|
||||
"phases": map[string]interface{}{
|
||||
"cold": map[string]interface{}{
|
||||
"min_age": "0d",
|
||||
"actions": map[string]interface{}{
|
||||
"allocate": map[string]interface{}{
|
||||
"include": map[string]string{"_tier_preference": "data_cold"},
|
||||
},
|
||||
"shrink": map[string]interface{}{
|
||||
"number_of_shards": 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
"delete": map[string]interface{}{
|
||||
"min_age": fmt.Sprintf("%dd", retentionDays),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
} else {
|
||||
policy = map[string]interface{}{
|
||||
"policy": map[string]interface{}{
|
||||
"phases": map[string]interface{}{
|
||||
"hot": map[string]interface{}{
|
||||
"actions": map[string]interface{}{
|
||||
"rollover": map[string]interface{}{
|
||||
"max_size": "30GB",
|
||||
"max_age": "14d",
|
||||
},
|
||||
"allocate": map[string]interface{}{
|
||||
"include": map[string]string{"_tier_preference": "data_hot"},
|
||||
},
|
||||
},
|
||||
},
|
||||
"warm": map[string]interface{}{
|
||||
"min_age": "60d",
|
||||
"actions": map[string]interface{}{
|
||||
"allocate": map[string]interface{}{
|
||||
"include": map[string]string{"_tier_preference": "data_warm"},
|
||||
},
|
||||
},
|
||||
},
|
||||
"cold": map[string]interface{}{
|
||||
"min_age": "180d",
|
||||
"actions": map[string]interface{}{
|
||||
"allocate": map[string]interface{}{
|
||||
"include": map[string]string{"_tier_preference": "data_cold"},
|
||||
},
|
||||
"shrink": map[string]interface{}{
|
||||
"number_of_shards": 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
"delete": map[string]interface{}{
|
||||
"min_age": fmt.Sprintf("%dd", retentionDays),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
policyData, _ := json.Marshal(policy)
|
||||
req, err := http.NewRequest("PUT", policyURL, bytes.NewBuffer(policyData))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create ILM policy request: %v", err)
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.SetBasicAuth(config.Elasticsearch.Auth.Username, config.Elasticsearch.Auth.Password)
|
||||
|
||||
resp, err = client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create ILM policy: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return fmt.Errorf("failed to create ILM policy, status: %d, response: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
fmt.Printf("Successfully created ILM policy: %s\n", policyName)
|
||||
return nil
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user