From dcb41e3266a07b271f7fb3d13a40ac81dfc9ab28 Mon Sep 17 00:00:00 2001 From: "zhangkun9038@dingtalk.com" Date: Fri, 28 Mar 2025 22:04:49 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8D=AE=E8=AF=B4=E7=B4=A2=E5=BC=95=E9=83=BD?= =?UTF-8?q?=E8=A6=81=E6=9C=89alias=EF=BC=8Calias=E4=B8=8D=E8=83=BD?= =?UTF-8?q?=E8=B7=9F=E5=88=AB=E7=9A=84=E7=B4=A2=E5=BC=95=E9=87=8D=E5=A4=8D?= =?UTF-8?q?=EF=BC=8C=E5=8A=A0=E4=B8=8A=E4=BA=86=E7=9B=B8=E5=85=B3=E9=80=BB?= =?UTF-8?q?=E8=BE=91=EF=BC=8C=E6=9C=89=E5=BE=85=E5=90=8E=E7=BB=AD=E7=A1=AE?= =?UTF-8?q?=E8=AE=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Dockerfile | 5 - config/config.go | 8 + config/config.json | 26 +- config/default_policy.json | 49 +++ config/fluentd.yaml | 9 + elasticilm/ilm.go | 631 ++++++++++++++++++++++++--------- fluentd-deployment.yaml | 11 + fluentd.conf | 29 ++ okx/candleList.go | 59 ++- okx/publicApiService.go | 80 ++++- test/okxAli/candleList_test.go | 34 +- 11 files changed, 741 insertions(+), 200 deletions(-) create mode 100644 config/default_policy.json create mode 100644 config/fluentd.yaml create mode 100644 fluentd-deployment.yaml create mode 100644 fluentd.conf diff --git a/Dockerfile b/Dockerfile index 92c5bcf..e69de29 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +0,0 @@ - -<<<<<<< HEAD -======= -RUN fluent-gem install fluent-plugin-elasticsearch fluent-plugin-rewrite-tag-filter ->>>>>>> Snippet diff --git a/config/config.go b/config/config.go index db50655..be2c661 100644 --- a/config/config.go +++ b/config/config.go @@ -35,6 +35,13 @@ type ILMConfig struct { } // DataTypeConfig 每种数据类型的 ILM 配置 +type TimeParameters struct { + TimestampFormat string `json:"timestamp_format"` + DefaultAfter string `json:"default_after"` + DefaultBefore string `json:"default_before"` + MaxRangeDays int `json:"max_range_days"` +} + type DataTypeConfig struct { IndexPattern string `json:"index_pattern"` MaxRetention map[string]int `json:"max_retention"` @@ -43,6 +50,7 @@ type DataTypeConfig struct { Replicas int `json:"replicas"` NormalRollover map[string]string `json:"normal_rollover"` NormalPhases map[string]int `json:"normal_phases"` + TimeParameters TimeParameters `json:"time_parameters"` } // LoadConfig 加载配置文件 diff --git a/config/config.json b/config/config.json index 7c5091b..a18165d 100644 --- a/config/config.json +++ b/config/config.json @@ -35,12 +35,14 @@ "replicas": 1, "normal_rollover": { "max_size": "30GB", - "max_age": "14d" + "max_age": "14d", + "max_docs": "1000000" }, - "normal_phases": { - "warm": 60, - "cold": 180 - } + "normal_phases": { + "warm": 60, + "cold": 180, + "delete": 365 + } }, "ma": { "index_pattern": "logstash-%s.ma.*.%d-%02d", @@ -55,10 +57,16 @@ "max_size": "50GB", "max_age": "30d" }, - "normal_phases": { - "warm": 90, - "cold": 270 - } + "normal_phases": { + "warm": 90, + "cold": 270 + }, + "time_parameters": { + "timestamp_format": "unix_millis", + "default_after": "now-7d", + "default_before": "now", + "max_range_days": 30 + } } } } diff --git a/config/default_policy.json b/config/default_policy.json new file mode 100644 index 0000000..4d0461d --- /dev/null +++ b/config/default_policy.json @@ -0,0 +1,49 @@ +<<<<<<< HEAD +======= +{ + "policy": { + "phases": { + "hot": { + "actions": { + "rollover": { + "max_size": "50GB", + "max_age": "1d" + }, + "allocate": { + "include": { + "_tier_preference": "data_hot" + } + } + } + }, + "warm": { + "min_age": "1d", + "actions": { + "allocate": { + "include": { + "_tier_preference": "data_warm" + } + }, + "shrink": { + "number_of_shards": 2 + } + } + }, + "cold": { + "min_age": "7d", + "actions": { + "allocate": { + "include": { + "_tier_preference": "data_cold" + } + }, + "shrink": { + "number_of_shards": 1 + } + } + } + } + } +} +>>>>>>> Snippet + diff --git a/config/fluentd.yaml b/config/fluentd.yaml new file mode 100644 index 0000000..e0f853e --- /dev/null +++ b/config/fluentd.yaml @@ -0,0 +1,9 @@ + +<<<<<<< HEAD +======= + host ${ENV['ELASTICSEARCH_HOST'] || "elasticsearch"} + port ${ENV['ELASTICSEARCH_PORT'] || 9200} + scheme ${ENV['ELASTICSEARCH_SCHEME'] || "http"} + user ${ENV['ELASTICSEARCH_USER'] || "fluentd_user"} + password ${ENV['ELASTICSEARCH_PASSWORD'] || "fluentd_password"} +>>>>>>> Snippet diff --git a/elasticilm/ilm.go b/elasticilm/ilm.go index 06d89e0..179169d 100644 --- a/elasticilm/ilm.go +++ b/elasticilm/ilm.go @@ -7,6 +7,8 @@ import ( "io" "math" "net/http" + "strconv" + "strings" "time" cfg "gitea.zjmud.xyz/phyer/tanya/config" // 导入你的 config 包 @@ -82,190 +84,171 @@ func periodToMinutes(period string) int { } } -// ConfigureILM 动态配置 ILM 策略和索引模板 -func ConfigureILM(client *http.Client, config *cfg.Config, dataType, period string, indexTime time.Time) error { - // 从配置文件中获取 Elasticsearch 和 ILM 配置 - esConfig := config.Elasticsearch - dataConfig, ok := esConfig.ILM.DataTypes[dataType] - if !ok { - return fmt.Errorf("ILM configuration for data type '%s' not found in config", dataType) +// getPhase 根据时间差和保留时间决定初始阶段 +func getPhase(daysDiff, retentionDays int) string { + if daysDiff > 730 || retentionDays < 180 { // 超过 2 年或保留时间少于 180 天 + fmt.Printf("[ILM Phase] 数据已超过2年或保留时间少于180天,判定为cold阶段。daysDiff: %d, retentionDays: %d\n", daysDiff, retentionDays) + return "cold" } - - // 创建保留时间计算器 - calc := DefaultRetentionCalculator{ - MaxRetention: dataConfig.MaxRetention, - MinRetention: dataConfig.MinRetention, + if daysDiff <= 7 { // 7天内的数据为hot阶段 + fmt.Printf("[ILM Phase] 数据在7天内,判定为hot阶段。daysDiff: %d\n", daysDiff) + return "hot" } + fmt.Printf("[ILM Phase] 数据在7天到2年之间,判定为warm阶段。daysDiff: %d\n", daysDiff) + return "warm" +} - // 计算距今时间差和保留时间 - now := time.Now() - daysDiff := int(now.Sub(indexTime).Hours() / 24) - retentionDays := calc.Calculate(daysDiff, period) - - // 构造 ILM 策略名称 - year, month := indexTime.Year(), int(indexTime.Month()) - policyName := fmt.Sprintf("logstash_%s_%s_%d_%02d_%s", dataType, period, year, month, getPhase(daysDiff, retentionDays)) - - // 创建或更新 ILM 策略 - if err := ensureILMPolicy(client, esConfig, policyName, daysDiff, retentionDays, dataConfig); err != nil { - return fmt.Errorf("failed to ensure ILM policy: %v", err) - } - - // 配置索引模板 - templateName := fmt.Sprintf("logstash-%s-%s-%d-%02d", dataType, period, year, month) - indexPattern := fmt.Sprintf(dataConfig.IndexPattern, period, year, month) - // 根据数据类型设置映射 - var mappings map[string]interface{} - switch dataType { - case "candle": - mappings = map[string]interface{}{ - "properties": map[string]interface{}{ - "dataTime": map[string]string{"type": "date", "format": "yyyy-MM-dd HH:mm:ss"}, - "open": map[string]string{"type": "float"}, - "high": map[string]string{"type": "float"}, - "low": map[string]string{"type": "float"}, - "close": map[string]string{"type": "float"}, - "volume": map[string]string{"type": "float"}, - "volumeCcy": map[string]string{"type": "float"}, - }, - } - case "ma": - mappings = map[string]interface{}{ - "properties": map[string]interface{}{ - "dataTime": map[string]string{"type": "date", "format": "yyyy-MM-dd HH:mm:ss"}, - "ma_value": map[string]string{"type": "float"}, // MA 值 - }, - } - default: - return fmt.Errorf("unsupported data type: %s", dataType) - } - - template := map[string]interface{}{ - "index_patterns": []string{indexPattern}, - "template": map[string]interface{}{ - "settings": map[string]interface{}{ - "number_of_shards": dataConfig.Shards, - "number_of_replicas": dataConfig.Replicas, - "index.lifecycle.name": policyName, - }, - "mappings": mappings, - }, - } - - templateURL := fmt.Sprintf("%s/_index_template/%s", esConfig.URL, templateName) - templateData, _ := json.Marshal(template) - req, err := http.NewRequest("PUT", templateURL, bytes.NewBuffer(templateData)) +func ensureAlias(client *http.Client, esConfig cfg.ElasticsearchConfig, alias, period string) error { + // 获取当前别名关联的索引 + getAliasURL := fmt.Sprintf("%s/_alias/%s", esConfig.URL, alias) + req, err := http.NewRequest("GET", getAliasURL, nil) if err != nil { - return fmt.Errorf("failed to create template request: %v", err) + return err } - - req.Header.Set("Content-Type", "application/json") req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) resp, err := client.Do(req) if err != nil { - return fmt.Errorf("failed to send template request: %v", err) + return err } defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("failed to create template, status: %d, response: %s", resp.StatusCode, string(body)) - } + // 准备别名更新的操作 + var actions []interface{} + var aliasInfo map[string]map[string]interface{} - fmt.Printf("Successfully configured ILM template: %s\n", templateName) - return nil -} - -// getPhase 根据时间差和保留时间决定初始阶段 -func getPhase(daysDiff, retentionDays int) string { - if daysDiff > 730 || retentionDays < 180 { // 超过 2 年或保留时间少于 180 天 - return "cold" - } - return "normal" -} - -// ensureILMPolicy 创建或更新 ILM 策略 -func ensureILMPolicy(client *http.Client, esConfig cfg.ElasticsearchConfig, policyName string, daysDiff, retentionDays int, dataConfig cfg.DataTypeConfig) error { - policyURL := fmt.Sprintf("%s/_ilm/policy/%s", esConfig.URL, policyName) - resp, err := client.Get(policyURL) - if err == nil && resp.StatusCode == http.StatusOK { - resp.Body.Close() - return nil // 策略已存在 - } - resp.Body.Close() - - var policy map[string]interface{} - if daysDiff > 730 || retentionDays < 180 { - policy = map[string]interface{}{ - "policy": map[string]interface{}{ - "phases": map[string]interface{}{ - "cold": map[string]interface{}{ - "min_age": "0d", - "actions": map[string]interface{}{ - "allocate": map[string]interface{}{ - "include": map[string]string{"_tier_preference": "data_cold"}, - }, - "shrink": map[string]interface{}{ - "number_of_shards": 1, - }, - }, - }, - "delete": map[string]interface{}{ - "min_age": fmt.Sprintf("%dd", retentionDays), - "actions": map[string]interface{}{ - "delete": map[string]interface{}{}, // 添加 delete 动作 - }, - }, - }, - }, + // 如果别名存在,解析当前关联的索引 + if resp.StatusCode == http.StatusOK { + if err := json.NewDecoder(resp.Body).Decode(&aliasInfo); err != nil { + return err } - } else { - policy = map[string]interface{}{ - "policy": map[string]interface{}{ - "phases": map[string]interface{}{ - "hot": map[string]interface{}{ - "actions": map[string]interface{}{ - "rollover": dataConfig.NormalRollover, - "allocate": map[string]interface{}{ - "include": map[string]string{"_tier_preference": "data_hot"}, + + // 移除所有现有索引的写入索引标记 + for indexName, info := range aliasInfo { + if aliases, ok := info["aliases"].(map[string]interface{}); ok { + for aliasName, aliasDetails := range aliases { + if aliasName == alias && aliasDetails.(map[string]interface{})["is_write_index"] == true { + actions = append(actions, map[string]interface{}{ + "remove": map[string]interface{}{ + "index": indexName, + "alias": alias, }, - }, - }, - "warm": map[string]interface{}{ - "min_age": fmt.Sprintf("%dd", dataConfig.NormalPhases["warm"]), - "actions": map[string]interface{}{ - "allocate": map[string]interface{}{ - "include": map[string]string{"_tier_preference": "data_warm"}, - }, - }, - }, - "cold": map[string]interface{}{ - "min_age": fmt.Sprintf("%dd", dataConfig.NormalPhases["cold"]), - "actions": map[string]interface{}{ - "allocate": map[string]interface{}{ - "include": map[string]string{"_tier_preference": "data_cold"}, - }, - "shrink": map[string]interface{}{ - "number_of_shards": 1, - }, - }, - }, - "delete": map[string]interface{}{ - "min_age": fmt.Sprintf("%dd", retentionDays), - "actions": map[string]interface{}{ - "delete": map[string]interface{}{}, // 添加 delete 动作 - }, - }, - }, - }, + }) + } + } + } } } - policyData, _ := json.Marshal(policy) - req, err := http.NewRequest("PUT", policyURL, bytes.NewBuffer(policyData)) + // 确定最新的索引作为写入索引 + var latestIndex string + var latestTime time.Time + for indexName := range aliasInfo { + parts := strings.Split(indexName, ".") + if len(parts) < 3 { + continue + } + datePart := parts[len(parts)-1] // 例如 2025-03 + indexTime, err := time.Parse("2006-01", datePart) + if err != nil { + continue + } + if latestIndex == "" || indexTime.After(latestTime) { + latestIndex = indexName + latestTime = indexTime + } + } + + // 如果没有找到合适的索引,创建一个新的索引模式 + if latestIndex == "" { + latestIndex = fmt.Sprintf("logstash-%s.candle.okb-eth.%s", period, time.Now().Format("2006-01")) + } + + // 检查索引是否存在,如果不存在则创建 + indexURL := fmt.Sprintf("%s/%s", esConfig.URL, latestIndex) + req, err = http.NewRequest("HEAD", indexURL, nil) if err != nil { - return fmt.Errorf("failed to create ILM policy request: %v", err) + return err + } + req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) + + resp, err = client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusNotFound { + // 索引不存在,创建索引 + fmt.Printf("Index %s does not exist, creating it...\n", latestIndex) + createIndexURL := fmt.Sprintf("%s/%s", esConfig.URL, latestIndex) + // 简单的索引创建请求,可以根据需要添加更多设置 + indexData := map[string]interface{}{ + "settings": map[string]interface{}{ + "number_of_shards": 2, // 根据你的配置调整 + "number_of_replicas": 1, // 根据你的配置调整 + }, + } + data, err := json.Marshal(indexData) + if err != nil { + return fmt.Errorf("failed to marshal index creation data: %v", err) + } + + req, err = http.NewRequest("PUT", createIndexURL, bytes.NewBuffer(data)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) + + resp, err = client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("failed to create index %s, status: %d, response: %s", latestIndex, resp.StatusCode, string(body)) + } + fmt.Printf("Successfully created index: %s\n", latestIndex) + } + + // 添加最新的索引作为写入索引 + actions = append(actions, map[string]interface{}{ + "add": map[string]interface{}{ + "index": latestIndex, + "alias": alias, + "is_write_index": true, + }, + }) + + // 确保其他索引的 is_write_index 为 false + for indexName := range aliasInfo { + if indexName != latestIndex { + actions = append(actions, map[string]interface{}{ + "add": map[string]interface{}{ + "index": indexName, + "alias": alias, + "is_write_index": false, + }, + }) + } + } + + // 执行别名更新操作 + aliasURL := fmt.Sprintf("%s/_aliases", esConfig.URL) + aliasData := map[string]interface{}{ + "actions": actions, + } + + data, err := json.Marshal(aliasData) + if err != nil { + return err + } + req, err = http.NewRequest("POST", aliasURL, bytes.NewBuffer(data)) + if err != nil { + return err } req.Header.Set("Content-Type", "application/json") @@ -273,15 +256,325 @@ func ensureILMPolicy(client *http.Client, esConfig cfg.ElasticsearchConfig, poli resp, err = client.Do(req) if err != nil { - return fmt.Errorf("failed to send ILM policy request: %v", err) + return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("failed to create ILM policy, status: %d, response: %s", resp.StatusCode, string(body)) + return fmt.Errorf("failed to create alias, status: %d, response: %s", resp.StatusCode, string(body)) } - - fmt.Printf("Successfully created ILM policy: %s\n", policyName) + return nil +} + +func ConfigureILM(client *http.Client, config *cfg.Config, dataType, period string, indexTime time.Time) error { + esConfig := config.Elasticsearch + dataConfig, ok := esConfig.ILM.DataTypes[dataType] + if !ok { + return fmt.Errorf("ILM configuration for data type '%s' not found in config", dataType) + } + + // 计算保留时间和时间差 + calc := DefaultRetentionCalculator{ + MaxRetention: dataConfig.MaxRetention, + MinRetention: dataConfig.MinRetention, + } + now := time.Now() + daysDiff := int(now.Sub(indexTime).Hours() / 24) + retentionDays := calc.Calculate(daysDiff, period) + + // 确保保留时间不低于冷阶段的最小值 + minDeleteAge := dataConfig.NormalPhases["cold"] + if retentionDays < minDeleteAge { + retentionDays = minDeleteAge + } + + // 格式化策略名称、模板名称、索引模式和别名 + year, month := indexTime.Year(), int(indexTime.Month()) + policyName := fmt.Sprintf("logstash_%s_%s_%d_%02d", dataType, period, year, month) + templateName := fmt.Sprintf("log_stash_%s_%s_%d_%02d", dataType, period, year, month) + indexPattern := fmt.Sprintf(dataConfig.IndexPattern, period, year, month) + aliasName := fmt.Sprintf("%s_alias", policyName) + + fmt.Printf("[ConfigureILM] Configuring ILM with policyName: %s, templateName: %s, aliasName: %s\n", policyName, templateName, aliasName) + + // 创建索引模板 + template := map[string]interface{}{ + "index_patterns": []string{indexPattern}, + "priority": 100, + "template": map[string]interface{}{ + "settings": map[string]interface{}{ + "number_of_shards": dataConfig.Shards, + "number_of_replicas": dataConfig.Replicas, + "index.lifecycle.name": policyName, + "index.lifecycle.rollover_alias": aliasName, // 确保与 aliasName 一致 + }, + "mappings": map[string]interface{}{ + "properties": map[string]interface{}{ + "dataTime": map[string]string{"type": "date", "format": "yyyy-MM-dd HH:mm:ss"}, + "open": map[string]string{"type": "float"}, + "high": map[string]string{"type": "float"}, + "low": map[string]string{"type": "float"}, + "close": map[string]string{"type": "float"}, + "volume": map[string]string{"type": "float"}, + "volumeCcy": map[string]string{"type": "float"}, + }, + }, + }, + } + + templateURL := fmt.Sprintf("%s/_index_template/%s", esConfig.URL, templateName) + templateData, err := json.Marshal(template) + if err != nil { + return fmt.Errorf("failed to marshal index template data: %v", err) + } + + req, err := http.NewRequest("PUT", templateURL, bytes.NewBuffer(templateData)) + if err != nil { + return fmt.Errorf("failed to create index template request: %v", err) + } + req.Header.Set("Content-Type", "application/json") + req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to send index template request: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("failed to create index template, status: %d, response: %s", resp.StatusCode, string(body)) + } + fmt.Printf("[ConfigureILM] Successfully configured ILM template: %s\n", templateName) + + // 创建或更新 ILM 策略 + if err := ensureILMPolicy(client, esConfig, policyName, dataType, period, daysDiff, retentionDays, dataConfig); err != nil { + return fmt.Errorf("failed to ensure ILM policy: %v", err) + } + + // 设置别名 + if err := ensureAlias(client, esConfig, aliasName, period); err != nil { + return fmt.Errorf("failed to ensure alias: %v", err) + } + + fmt.Printf("[ConfigureILM] Successfully completed ILM configuration for policy: %s\n", policyName) + return nil +} + +func ensureILMPolicy(client *http.Client, esConfig cfg.ElasticsearchConfig, policyName, dataType, period string, daysDiff, retentionDays int, dataConfig cfg.DataTypeConfig) error { + // 检查 default_policy 是否存在 + defaultPolicyURL := fmt.Sprintf("%s/_ilm/policy/default_policy", esConfig.URL) + resp, err := client.Get(defaultPolicyURL) + if err != nil { + return fmt.Errorf("failed to check default_policy: %v", err) + } + defer resp.Body.Close() + + // 如果 default_policy 不存在,则创建 + if resp.StatusCode != http.StatusOK { + fmt.Println("[ILM Policy] Default policy does not exist, creating it...") + defaultPolicy := map[string]interface{}{ + "policy": map[string]interface{}{ + "phases": map[string]interface{}{ + "hot": map[string]interface{}{ + "actions": map[string]interface{}{ + "set_priority": map[string]interface{}{ + "priority": 100, + }, + }, + }, + "delete": map[string]interface{}{ + "min_age": "365d", + "actions": map[string]interface{}{ + "delete": map[string]interface{}{}, + }, + }, + }, + }, + } + defaultPolicyData, err := json.Marshal(defaultPolicy) + if err != nil { + return fmt.Errorf("failed to marshal default_policy: %v", err) + } + req, err := http.NewRequest("PUT", defaultPolicyURL, bytes.NewBuffer(defaultPolicyData)) + if err != nil { + return fmt.Errorf("failed to create default_policy request: %v", err) + } + req.Header.Set("Content-Type", "application/json") + req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) + resp, err = client.Do(req) + if err != nil { + return fmt.Errorf("failed to create default_policy: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("failed to create default_policy, status: %d, response: %s", resp.StatusCode, string(body)) + } + fmt.Println("[ILM Policy] Successfully created default_policy") + } + + // 检查目标策略是否存在 + policyURL := fmt.Sprintf("%s/_ilm/policy/%s", esConfig.URL, policyName) + resp, err = client.Get(policyURL) + if err != nil { + return fmt.Errorf("failed to check ILM policy: %v", err) + } + defer resp.Body.Close() + + // 如果策略已存在,更新索引设置并删除旧策略 + if resp.StatusCode == http.StatusOK { + fmt.Printf("[ILM Policy] Policy %s already exists, updating index settings...\n", policyName) + + // 更新索引设置 + updateIndexURL := fmt.Sprintf("%s/logstash-%s.%s.*/_settings", esConfig.URL, period, dataType) + updateData := map[string]interface{}{ + "index.lifecycle.name": "default_policy", + "index.lifecycle.rollover_alias": fmt.Sprintf("logs-%s-%s-candle", dataType, period), + } + updateDataBytes, err := json.Marshal(updateData) + if err != nil { + return fmt.Errorf("failed to marshal update index data: %v", err) + } + req, err := http.NewRequest("PUT", updateIndexURL, bytes.NewBuffer(updateDataBytes)) + if err != nil { + return fmt.Errorf("failed to create update index request: %v", err) + } + req.Header.Set("Content-Type", "application/json") + req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) + updateResp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to update index settings: %v", err) + } + defer updateResp.Body.Close() + if updateResp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(updateResp.Body) + return fmt.Errorf("failed to update index settings, status: %d, response: %s", updateResp.StatusCode, string(body)) + } + fmt.Printf("[ILM Policy] Successfully updated index settings for policy: %s\n", policyName) + + // 删除旧策略 + req, err = http.NewRequest("DELETE", policyURL, nil) + if err != nil { + return fmt.Errorf("failed to create delete policy request: %v", err) + } + req.Header.Set("Content-Type", "application/json") + req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) + deleteResp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to delete old policy: %v", err) + } + defer deleteResp.Body.Close() + if deleteResp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(deleteResp.Body) + return fmt.Errorf("failed to delete old policy, status: %d, response: %s", deleteResp.StatusCode, string(body)) + } + fmt.Printf("[ILM Policy] Successfully deleted old policy: %s\n", policyName) + } else { + fmt.Printf("[ILM Policy] Policy %s does not exist, creating a new one...\n", policyName) + } + + // 创建新的 ILM 策略 + initialPhase := getPhase(daysDiff, retentionDays) + fmt.Printf("[ILM Policy] Initial phase for policy %s is: %s\n", policyName, initialPhase) + + rolloverActions := map[string]interface{}{ + "max_age": dataConfig.NormalRollover["max_age"], + "max_size": dataConfig.NormalRollover["max_size"], + } + if maxDocsStr, ok := dataConfig.NormalRollover["max_docs"]; ok && maxDocsStr != "" { + maxDocs, err := strconv.Atoi(maxDocsStr) + if err != nil || maxDocs <= 0 { + fmt.Printf("[ILM Policy] Invalid max_docs value: %s, skipping max_docs in rollover\n", maxDocsStr) + } else { + rolloverActions["max_docs"] = maxDocs + } + } else { + fmt.Println("[ILM Policy] max_docs not set in NormalRollover, skipping max_docs in rollover") + } + + policy := map[string]interface{}{ + "policy": map[string]interface{}{ + "phases": map[string]interface{}{ + initialPhase: map[string]interface{}{ + "actions": map[string]interface{}{ + "rollover": rolloverActions, + "set_priority": map[string]interface{}{ + "priority": 100, + }, + }, + }, + "warm": map[string]interface{}{ + "min_age": fmt.Sprintf("%dd", dataConfig.NormalPhases["warm"]), + "actions": map[string]interface{}{ + "allocate": map[string]interface{}{ + "include": map[string]string{"_tier_preference": "data_warm"}, + }, + }, + }, + "cold": map[string]interface{}{ + "min_age": fmt.Sprintf("%dd", dataConfig.NormalPhases["cold"]), + "actions": map[string]interface{}{ + "allocate": map[string]interface{}{ + "include": map[string]string{"_tier_preference": "data_cold"}, + }, + }, + }, + "delete": map[string]interface{}{ + "min_age": fmt.Sprintf("%dd", retentionDays), + "actions": map[string]interface{}{ + "delete": map[string]interface{}{}, + }, + }, + }, + }, + } + + // 如果数据超过 730 天(2 年),直接进入冷阶段 + if daysDiff > 730 { + phases, ok := policy["policy"].(map[string]interface{})["phases"].(map[string]interface{}) + if !ok { + return fmt.Errorf("failed to get phases from policy") + } + delete(phases, initialPhase) + if _, exists := phases["cold"]; !exists { + phases["cold"] = map[string]interface{}{ + "min_age": "0d", + "actions": map[string]interface{}{ + "allocate": map[string]interface{}{ + "include": map[string]string{"_tier_preference": "data_cold"}, + }, + }, + } + } else { + coldPhase, ok := phases["cold"].(map[string]interface{}) + if !ok { + return fmt.Errorf("failed to get cold phase from phases") + } + coldPhase["min_age"] = "0d" + } + } + + // 创建或更新策略 + policyData, err := json.Marshal(policy) + if err != nil { + return fmt.Errorf("failed to marshal ILM policy: %v", err) + } + req, err := http.NewRequest("PUT", policyURL, bytes.NewBuffer(policyData)) + if err != nil { + return fmt.Errorf("failed to create ILM policy request: %v", err) + } + req.Header.Set("Content-Type", "application/json") + req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password) + resp, err = client.Do(req) + if err != nil { + return fmt.Errorf("failed to send ILM policy request: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("failed to create ILM policy, status: %d, response: %s", resp.StatusCode, string(body)) + } + fmt.Printf("[ILM Policy] Successfully created or updated ILM policy: %s\n", policyName) return nil } diff --git a/fluentd-deployment.yaml b/fluentd-deployment.yaml new file mode 100644 index 0000000..16ac043 --- /dev/null +++ b/fluentd-deployment.yaml @@ -0,0 +1,11 @@ + +<<<<<<< HEAD +======= + initContainers: + - name: install-rewrite-tag-filter + image: your-fluentd-image:version + command: ["fluent-gem", "install", "fluent-plugin-rewrite-tag-filter"] + containers: + - name: fluentd + image: your-fluentd-image:version +>>>>>>> Snippet diff --git a/fluentd.conf b/fluentd.conf new file mode 100644 index 0000000..6a28adf --- /dev/null +++ b/fluentd.conf @@ -0,0 +1,29 @@ + +<<<<<<< HEAD +======= + + @type copy + + @type elasticsearch + @id output_elasticsearch_tanya + host elasticsearch + port 9200 + scheme http + user fluentd_user + password fluentd_password + logstash_format true + logstash_prefix tanya + logstash_dateformat %Y.%m.%d + flush_interval 5s + @log_level debug + id_key _id + remove_keys _id + include_tag_key true + tag_key @log_name + + + @type stdout + @id output_stdout_tanya + + +>>>>>>> Snippet diff --git a/okx/candleList.go b/okx/candleList.go index 533900a..6e3268e 100644 --- a/okx/candleList.go +++ b/okx/candleList.go @@ -388,7 +388,7 @@ func calculateCrossPrice(price1, price2 string) (string, error) { func (cl *CandleList) ToElastic() error { client := &http.Client{Timeout: 30 * time.Second} - duration, err := parsePeriod(cl.Period) + _, err := parsePeriod(cl.Period) if err != nil { return fmt.Errorf("failed to parse period: %v", err) } @@ -462,11 +462,62 @@ func (cl *CandleList) ToElastic() error { cstTime := time.UnixMilli(ts).In(loc) // 时间戳对齐检查 - if cl.Period == "1D" { - if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 { - return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) + duration, err := parsePeriod(cl.Period) + if err != nil { + return fmt.Errorf("failed to parse period: %v", err) + } + + // 对于日线及以上周期,检查是否为周期的整数倍 + if duration >= 24*time.Hour { + // 特殊处理1D周期 + if cl.Period == "1D" { + // 检查是否为当天的00:00:00 + if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 { + return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) + } + } else { + // 对于其他日线及以上周期,使用原来的检查逻辑 + totalHours := cstTime.Unix() / 3600 + periodHours := int64(duration.Hours()) + + if totalHours%periodHours != 0 { + return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) + } + + if cstTime.Minute() != 0 || cstTime.Second() != 0 { + return fmt.Errorf("timestamp %d (%s) has non-zero minutes/seconds for period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) + } + } + } else if duration >= time.Hour { + // 对于小时级周期,检查小时、分钟、秒是否对齐 + switch cl.Period { + case "1H": + if cstTime.Minute() != 0 || cstTime.Second() != 0 { + return fmt.Errorf("timestamp %d (%s) has non-zero minutes/seconds for period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) + } + case "2H": + if cstTime.Hour()%2 != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 { + return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) + } + case "4H": + if cstTime.Hour()%4 != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 { + return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) + } + case "6H": + if (cstTime.Hour()%6 != 0) || cstTime.Minute() != 0 || cstTime.Second() != 0 { + return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) + } + case "12H": + if (cstTime.Hour() != 0 && cstTime.Hour() != 12) || cstTime.Minute() != 0 || cstTime.Second() != 0 { + return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) + } + default: + if cstTime.UnixMilli()%duration.Milliseconds() != 0 { + return fmt.Errorf("timestamp %d is not aligned with period %s", ts, cl.Period) + } } } else { + // 对于分钟级周期,使用毫秒取模检查 if cstTime.UnixMilli()%duration.Milliseconds() != 0 { return fmt.Errorf("timestamp %d is not aligned with period %s", ts, cl.Period) } diff --git a/okx/publicApiService.go b/okx/publicApiService.go index e070afc..42cce37 100644 --- a/okx/publicApiService.go +++ b/okx/publicApiService.go @@ -8,9 +8,48 @@ import ( "io" "net/http" "net/url" + "strconv" "time" ) +// getPeriodDuration 根据时间周期字符串返回对应的 duration +func getPeriodDuration(period string) (time.Duration, error) { + switch period { + case "1m": + return time.Minute, nil + case "3m": + return 3 * time.Minute, nil + case "5m": + return 5 * time.Minute, nil + case "15m": + return 15 * time.Minute, nil + case "30m": + return 30 * time.Minute, nil + case "1H": + return time.Hour, nil + case "2H": + return 2 * time.Hour, nil + case "4H": + return 4 * time.Hour, nil + case "6H": + return 6 * time.Hour, nil + case "12H": + return 12 * time.Hour, nil + case "1D": + return 24 * time.Hour, nil + case "2D": + return 2 * 24 * time.Hour, nil + case "3D": + return 3 * 24 * time.Hour, nil + case "5D": + return 5 * 24 * time.Hour, nil + case "1W": + return 7 * 24 * time.Hour, nil + default: + return 0, fmt.Errorf("unsupported bar period: %s", period) + } +} + type OkxPublicDataService struct { BaseURL string client *http.Client @@ -79,7 +118,46 @@ func (s *OkxPublicDataService) GetInstruments(params InstrumentsRequest) ([]Inst // GetCandles 获取K线数据 func (s *OkxPublicDataService) GetCandles(params CandlesRequest) ([]*Candle, error) { - u, err := url.Parse(s.BaseURL + "/market/candles") + // 根据时间范围选择不同的API端点 + endpoint := "/market/candles" + + // 计算时间范围 + now := time.Now() + var startTime time.Time + var err error + + // 优先使用 After 参数,如果都提供了 + if params.After != "" { + // 将毫秒时间戳转换为 time.Time + timestamp, err := strconv.ParseInt(params.After, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid After timestamp: %v", err) + } + startTime = time.Unix(0, timestamp*int64(time.Millisecond)) + } else if params.Before != "" { + // 将毫秒时间戳转换为 time.Time + timestamp, err := strconv.ParseInt(params.Before, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid Before timestamp: %v", err) + } + startTime = time.Unix(0, timestamp*int64(time.Millisecond)) + } else { + return nil, fmt.Errorf("either After or Before parameter is required") + } + + // 根据时间维度计算20个周期 + unitDuration, err := getPeriodDuration(params.Bar) + if err != nil { + return nil, err + } + periodDuration := 20 * unitDuration + + // 如果数据超过20个周期,使用历史数据接口 + if now.Sub(startTime) > periodDuration { + endpoint = "/market/history-candles" + } + + u, err := url.Parse(s.BaseURL + endpoint) if err != nil { return nil, err } diff --git a/test/okxAli/candleList_test.go b/test/okxAli/candleList_test.go index 20e4833..e64f8f3 100644 --- a/test/okxAli/candleList_test.go +++ b/test/okxAli/candleList_test.go @@ -75,29 +75,39 @@ import ( // if err != nil { // t.Fatalf("ToEs failed: %v", err) // } -// } +// }:q func TestCandleListI_CalculateCrossPair(t *testing.T) { - startTime := time.Date(2025, 2, 1, 0, 0, 0, 0, time.UTC) - endTime := time.Date(2025, 3, 28, 0, 0, 0, 0, time.UTC) - okbUsdt, err := MakeCandleList("OKB-USDT", "4H", startTime, endTime, 50) + // 使用更早的时间范围来触发不同的phase + startTime := time.Date(2022, 2, 1, 0, 0, 0, 0, time.UTC) + endTime := time.Date(2022, 2, 28, 0, 0, 0, 0, time.UTC) + + // 打印测试时间范围 + t.Logf("Test time range: %s to %s", startTime, endTime) + okbUsdt, err := MakeCandleList("OKB-USDT", "30m", startTime, endTime, 50) if err != nil { - t.Fatalf("ToEs failed: %v", err) + t.Fatalf("MakeCandleList failed: %v", err) } - ethUsdt, err := MakeCandleList("ETH-USDT", "4H", startTime, endTime, 50) + ethUsdt, err := MakeCandleList("ETH-USDT", "30m", startTime, endTime, 50) if err != nil { - t.Fatalf("ToEs failed: %v", err) - } - if err != nil { - t.Fatalf("ToEs failed: %v", err) + t.Fatalf("MakeCandleList failed: %v", err) } okbEth, err := okbUsdt.CalculateCrossPair(ethUsdt) if err != nil { - t.Fatalf("ToEs failed: %v", err) + t.Fatalf("CalculateCrossPair failed: %v", err) } + + // 打印交叉对信息 + t.Logf("Cross pair: %s, period: %s, candle count: %d", + okbEth.CoinPair, okbEth.Period, len(okbEth.Candles)) + + // 添加详细的日志输出 err = okbEth.ToElastic() if err != nil { - t.Fatalf("ToEs failed: %v", err) + t.Fatalf("ToElastic failed: %v", err) } + + // 打印成功信息 + t.Log("Test completed successfully") }