diff --git a/okx/candleList.go b/okx/candleList.go index 61b75c1..54fdb03 100644 --- a/okx/candleList.go +++ b/okx/candleList.go @@ -7,6 +7,7 @@ import ( "fmt" cfg "gitea.zjmud.xyz/phyer/tanya/config" "io" + "math" "net/http" "strconv" "strings" @@ -422,38 +423,47 @@ func parsePeriod(period string) (time.Duration, error) { } // ToElastic 将Candle数据发送到Elasticsearch + +// ToElastic 将 Candle 数据发送到 Elasticsearch,并动态配置 ILM 策略 func (cl *CandleList) ToElastic() error { + client := &http.Client{Timeout: 30 * time.Second} - // 创建带超时的HTTP客户端 - client := &http.Client{ - Timeout: 30 * time.Second, - } - - // 获取周期的 duration duration, err := parsePeriod(cl.Period) if err != nil { return fmt.Errorf("failed to parse period: %v", err) } + var config *cfg.Config + configPaths := []string{"config/config.json", "../config/config.json", "../../config/config.json"} + for _, path := range configPaths { + config, err = cfg.LoadConfig(path) + if err == nil { + break + } + } + if err != nil { + return fmt.Errorf("failed to load config: %v", err) + } + for _, candle := range cl.Candles { - // 解析时间戳 ts, err := strconv.ParseInt(candle.Timestamp, 10, 64) if err != nil { fmt.Println("invalid timestamp:", ts, err) continue } - // 转换为东八区时间 loc, _ := time.LoadLocation("Asia/Shanghai") cstTime := time.UnixMilli(ts).In(loc) - // 根据当前记录的时间确定年份 - currentYear := cstTime.Year() + // 新索引名称:logstash-<period>.candle.<coinpair>.<year>-<month> + index := fmt.Sprintf("logstash-%s.candle.%s.%d-%02d", + strings.ToLower(cl.Period), + strings.ToLower(cl.CoinPair), + cstTime.Year(), + int(cstTime.Month()), + ) - // 动态构造索引名称 - index := fmt.Sprintf("logstash-candle.%s.%d.%s", strings.ToLower(cl.CoinPair), currentYear, strings.ToLower(cl.Period)) - - // 验证时间戳是否为周期的整数倍 + // 验证时间戳对齐 if cl.Period == "1D" { if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 { return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) @@ -464,24 +474,15 @@ func (cl *CandleList) ToElastic() error { } } - // 添加调试信息 - fmt.Printf("Timestamp: %d, CST Time: %s, Period: %s\n", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) - - // 对于日线数据,检查是否为当天的 00:00:00 - if cl.Period == "1D" { - if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 { - return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) - } - } else { - // 对于其他周期,使用原来的对齐检查 - if cstTime.UnixMilli()%duration.Milliseconds() != 0 { - return fmt.Errorf("timestamp %d is not aligned with period %s", ts, cl.Period) - } + // 动态配置 ILM + err = configureILM(client, config, index, cstTime, cl.Period) + if err != nil { + return fmt.Errorf("failed to configure ILM: %v", err) } // 构造记录 record := map[string]interface{}{ - "dataTime": cstTime.Format("2006-01-02T15:04:05") + "+08:00", + "dataTime": cstTime.Format("2006-01-02 15:04:05"), "open": candle.Open, "high": candle.High, "low": candle.Low, @@ -490,64 +491,35 @@ func (cl *CandleList) ToElastic() error { "volumeCcy": candle.VolumeCcy, } - // 构造请求体 jsonData, err := json.Marshal(record) if err != nil { return fmt.Errorf("failed to marshal data: %v", err) } - var config *cfg.Config - configPaths := []string{ - "config/config.json", // 当前目录下的config目录 - "../config/config.json", // 上一级目录下的config目录 - "../../config/config.json", // 上两级目录下的config目录 - } - var loadErr error - for _, path := range configPaths { - config, loadErr = cfg.LoadConfig(path) - if loadErr == nil { - break - } - } - - if config == nil { - return fmt.Errorf("failed to load configuration: %v", loadErr) - } - - // 构造完整URL fullURL := fmt.Sprintf("%s/%s/_doc/%d", strings.TrimRight(config.Elasticsearch.URL, "/"), index, ts) fmt.Println("fullURL: ", fullURL) - // 创建请求 req, err := http.NewRequest("POST", fullURL, bytes.NewBuffer(jsonData)) if err != nil { return fmt.Errorf("failed to create request: %v", err) } - // 设置请求头 req.Header.Set("Content-Type", "application/json") req.Header.Set("Accept", "application/json") - // 尝试从不同层级加载配置 - - // 设置基本认证 req.SetBasicAuth(config.Elasticsearch.Auth.Username, config.Elasticsearch.Auth.Password) - // 发送HTTP请求 resp, err := client.Do(req) if err != nil { return fmt.Errorf("failed to send data to Elasticsearch: %v", err) } defer resp.Body.Close() - // 读取响应体 body, err := io.ReadAll(resp.Body) if err != nil { return fmt.Errorf("failed to read response body: %v", err) } - // 输出完整的响应信息 fmt.Printf("HTTP Response Status: %s\n", resp.Status) - fmt.Printf("HTTP Response Headers: %v\n", resp.Header) fmt.Printf("HTTP Response Body: %s\n", string(body)) if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { @@ -559,3 +531,247 @@ func (cl *CandleList) ToElastic() error { return nil } + +// configureILM 动态配置 ILM 策略 +func configureILM(client *http.Client, config *cfg.Config, index string, indexTime time.Time, period string) error { + now := time.Now() + daysDiff := int(now.Sub(indexTime).Hours() / 24) // 距今天数 + + // 时间框架转换为分钟数 + minutes := periodToMinutes(period) + + // 计算保留时间(天) + maxRetention := getMaxRetention(period) + retentionDays := calculateRetention(daysDiff, minutes, maxRetention) + + // 决定 ILM 策略 + var ilmPolicy string + if daysDiff > 730 || retentionDays < 180 { // 超过 2 年或保留时间短,直��� cold + ilmPolicy = fmt.Sprintf("logstash_candle_%s_%d_cold", period, indexTime.Year()) + } else { + ilmPolicy = fmt.Sprintf("logstash_candle_%s_%d_normal", period, indexTime.Year()) + } + + // 创建 ILM 策略 + if err := ensureILMPolicy(client, config, ilmPolicy, daysDiff, retentionDays); err != nil { + return err + } + + // 配置索引模板 + templateName := fmt.Sprintf("logstash-candle-%s-%d-%02d", period, indexTime.Year(), int(indexTime.Month())) + template := map[string]interface{}{ + "index_patterns": []string{fmt.Sprintf("logstash-%s.candle.*.%d-%02d", period, indexTime.Year(), int(indexTime.Month()))}, + "template": map[string]interface{}{ + "settings": map[string]interface{}{ + "number_of_shards": 2, + "number_of_replicas": 1, + "index.lifecycle.name": ilmPolicy, + }, + "mappings": map[string]interface{}{ + "properties": map[string]interface{}{ + "dataTime": map[string]string{"type": "date", "format": "yyyy-MM-dd HH:mm:ss"}, + "open": map[string]string{"type": "float"}, + "high": map[string]string{"type": "float"}, + "low": map[string]string{"type": "float"}, + "close": map[string]string{"type": "float"}, + "volume": map[string]string{"type": "float"}, + "volumeCcy": map[string]string{"type": "float"}, + }, + }, + }, + } + + templateURL := fmt.Sprintf("%s/_index_template/%s", strings.TrimRight(config.Elasticsearch.URL, "/"), templateName) + templateData, _ := json.Marshal(template) + req, err := http.NewRequest("PUT", templateURL, bytes.NewBuffer(templateData)) + if err != nil { + return fmt.Errorf("failed to create template request: %v", err) + } + + req.Header.Set("Content-Type", "application/json") + req.SetBasicAuth(config.Elasticsearch.Auth.Username, config.Elasticsearch.Auth.Password) + + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to create index template: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("failed to create template, status: %d, response: %s", resp.StatusCode, string(body)) + } + + fmt.Printf("Successfully configured ILM template: %s\n", templateName) + return nil +} + +// periodToMinutes 将时间框架转换为分钟数 +func periodToMinutes(period string) int { + switch period { + case "1m": + return 1 + case "3m": + return 3 + case "5m": + return 5 + case "15m": + return 15 + case "30m": + return 30 + case "1h": + return 60 + case "2h": + return 120 + case "4h": + return 240 + case "6h": + return 360 + case "12h": + return 720 + case "1d", "1D": + return 1440 + case "2d": + return 2880 + case "5d": + return 7200 + case "1W": + return 10080 + case "1M": + return 43200 // 假设 30 天 + default: + return 1 // 默认 1 分钟 + } +} + +// getMaxRetention 获取时间框架的最大保留时间(天) +func getMaxRetention(period string) int { + switch period { + case "1m", "3m": + return 365 // 1 年 + case "5m", "15m", "30m": + return 730 // 2 年 + case "1h", "2h": + return 1095 // 3 年 + case "4h", "6h", "12h": + return 1825 // 5 年 + case "1d", "1D", "2d", "5d": + return 2555 // 7 年 + case "1W", "1M": + return 3650 // 10 年 + default: + return 365 + } +} + +// calculateRetention 计算保留时间(天) +func calculateRetention(daysDiff, minutes, maxRetention int) int { + timeFactor := 1 - math.Sqrt(float64(daysDiff))/50 // 距今时间非线性衰减 + if timeFactor < 0 { + timeFactor = 0 + } + periodFactor := math.Sqrt(float64(minutes)) / math.Sqrt(43200) // 时间框架非线性增长 + retention := float64(maxRetention) * timeFactor * periodFactor + if retention < 30 { + return 30 // 最小保留 30 天 + } + return int(retention) +} + +// ensureILMPolicy 确保 ILM 策略存在 +func ensureILMPolicy(client *http.Client, config *cfg.Config, policyName string, daysDiff, retentionDays int) error { + policyURL := fmt.Sprintf("%s/_ilm/policy/%s", strings.TrimRight(config.Elasticsearch.URL, "/"), policyName) + resp, err := client.Get(policyURL) + if err == nil && resp.StatusCode == http.StatusOK { + resp.Body.Close() + return nil + } + resp.Body.Close() + + var policy map[string]interface{} + if daysDiff > 730 || retentionDays < 180 { // 超过 2 年或保留时间短,直接 cold + policy = map[string]interface{}{ + "policy": map[string]interface{}{ + "phases": map[string]interface{}{ + "cold": map[string]interface{}{ + "min_age": "0d", + "actions": map[string]interface{}{ + "allocate": map[string]interface{}{ + "include": map[string]string{"_tier_preference": "data_cold"}, + }, + "shrink": map[string]interface{}{ + "number_of_shards": 1, + }, + }, + }, + "delete": map[string]interface{}{ + "min_age": fmt.Sprintf("%dd", retentionDays), + }, + }, + }, + } + } else { + policy = map[string]interface{}{ + "policy": map[string]interface{}{ + "phases": map[string]interface{}{ + "hot": map[string]interface{}{ + "actions": map[string]interface{}{ + "rollover": map[string]interface{}{ + "max_size": "30GB", + "max_age": "14d", + }, + "allocate": map[string]interface{}{ + "include": map[string]string{"_tier_preference": "data_hot"}, + }, + }, + }, + "warm": map[string]interface{}{ + "min_age": "60d", + "actions": map[string]interface{}{ + "allocate": map[string]interface{}{ + "include": map[string]string{"_tier_preference": "data_warm"}, + }, + }, + }, + "cold": map[string]interface{}{ + "min_age": "180d", + "actions": map[string]interface{}{ + "allocate": map[string]interface{}{ + "include": map[string]string{"_tier_preference": "data_cold"}, + }, + "shrink": map[string]interface{}{ + "number_of_shards": 1, + }, + }, + }, + "delete": map[string]interface{}{ + "min_age": fmt.Sprintf("%dd", retentionDays), + }, + }, + }, + } + } + + policyData, _ := json.Marshal(policy) + req, err := http.NewRequest("PUT", policyURL, bytes.NewBuffer(policyData)) + if err != nil { + return fmt.Errorf("failed to create ILM policy request: %v", err) + } + + req.Header.Set("Content-Type", "application/json") + req.SetBasicAuth(config.Elasticsearch.Auth.Username, config.Elasticsearch.Auth.Password) + + resp, err = client.Do(req) + if err != nil { + return fmt.Errorf("failed to create ILM policy: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("failed to create ILM policy, status: %d, response: %s", resp.StatusCode, string(body)) + } + + fmt.Printf("Successfully created ILM policy: %s\n", policyName) + return nil +}