据说索引都要有alias,alias不能跟别的索引重复,加上了相关逻辑,有待后续确认

This commit is contained in:
zhangkun9038@dingtalk.com 2025-03-28 22:04:49 +08:00
parent e4f05bb543
commit dcb41e3266
11 changed files with 741 additions and 200 deletions

View File

@ -1,5 +0,0 @@
<<<<<<< HEAD
=======
RUN fluent-gem install fluent-plugin-elasticsearch fluent-plugin-rewrite-tag-filter
>>>>>>> Snippet

View File

@ -35,6 +35,13 @@ type ILMConfig struct {
}
// DataTypeConfig 每种数据类型的 ILM 配置
type TimeParameters struct {
TimestampFormat string `json:"timestamp_format"`
DefaultAfter string `json:"default_after"`
DefaultBefore string `json:"default_before"`
MaxRangeDays int `json:"max_range_days"`
}
type DataTypeConfig struct {
IndexPattern string `json:"index_pattern"`
MaxRetention map[string]int `json:"max_retention"`
@ -43,6 +50,7 @@ type DataTypeConfig struct {
Replicas int `json:"replicas"`
NormalRollover map[string]string `json:"normal_rollover"`
NormalPhases map[string]int `json:"normal_phases"`
TimeParameters TimeParameters `json:"time_parameters"`
}
// LoadConfig 加载配置文件

View File

@ -35,11 +35,13 @@
"replicas": 1,
"normal_rollover": {
"max_size": "30GB",
"max_age": "14d"
"max_age": "14d",
"max_docs": "1000000"
},
"normal_phases": {
"warm": 60,
"cold": 180
"cold": 180,
"delete": 365
}
},
"ma": {
@ -58,6 +60,12 @@
"normal_phases": {
"warm": 90,
"cold": 270
},
"time_parameters": {
"timestamp_format": "unix_millis",
"default_after": "now-7d",
"default_before": "now",
"max_range_days": 30
}
}
}

View File

@ -0,0 +1,49 @@
<<<<<<< HEAD
=======
{
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": {
"max_size": "50GB",
"max_age": "1d"
},
"allocate": {
"include": {
"_tier_preference": "data_hot"
}
}
}
},
"warm": {
"min_age": "1d",
"actions": {
"allocate": {
"include": {
"_tier_preference": "data_warm"
}
},
"shrink": {
"number_of_shards": 2
}
}
},
"cold": {
"min_age": "7d",
"actions": {
"allocate": {
"include": {
"_tier_preference": "data_cold"
}
},
"shrink": {
"number_of_shards": 1
}
}
}
}
}
}
>>>>>>> Snippet

9
config/fluentd.yaml Normal file
View File

@ -0,0 +1,9 @@
<<<<<<< HEAD
=======
host ${ENV['ELASTICSEARCH_HOST'] || "elasticsearch"}
port ${ENV['ELASTICSEARCH_PORT'] || 9200}
scheme ${ENV['ELASTICSEARCH_SCHEME'] || "http"}
user ${ENV['ELASTICSEARCH_USER'] || "fluentd_user"}
password ${ENV['ELASTICSEARCH_PASSWORD'] || "fluentd_password"}
>>>>>>> Snippet

View File

@ -7,6 +7,8 @@ import (
"io"
"math"
"net/http"
"strconv"
"strings"
"time"
cfg "gitea.zjmud.xyz/phyer/tanya/config" // 导入你的 config 包
@ -82,43 +84,232 @@ func periodToMinutes(period string) int {
}
}
// ConfigureILM 动态配置 ILM 策略和索引模板
// getPhase 根据时间差和保留时间决定初始阶段
func getPhase(daysDiff, retentionDays int) string {
if daysDiff > 730 || retentionDays < 180 { // 超过 2 年或保留时间少于 180 天
fmt.Printf("[ILM Phase] 数据已超过2年或保留时间少于180天判定为cold阶段。daysDiff: %d, retentionDays: %d\n", daysDiff, retentionDays)
return "cold"
}
if daysDiff <= 7 { // 7天内的数据为hot阶段
fmt.Printf("[ILM Phase] 数据在7天内判定为hot阶段。daysDiff: %d\n", daysDiff)
return "hot"
}
fmt.Printf("[ILM Phase] 数据在7天到2年之间判定为warm阶段。daysDiff: %d\n", daysDiff)
return "warm"
}
func ensureAlias(client *http.Client, esConfig cfg.ElasticsearchConfig, alias, period string) error {
// 获取当前别名关联的索引
getAliasURL := fmt.Sprintf("%s/_alias/%s", esConfig.URL, alias)
req, err := http.NewRequest("GET", getAliasURL, nil)
if err != nil {
return err
}
req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
// 准备别名更新的操作
var actions []interface{}
var aliasInfo map[string]map[string]interface{}
// 如果别名存在,解析当前关联的索引
if resp.StatusCode == http.StatusOK {
if err := json.NewDecoder(resp.Body).Decode(&aliasInfo); err != nil {
return err
}
// 移除所有现有索引的写入索引标记
for indexName, info := range aliasInfo {
if aliases, ok := info["aliases"].(map[string]interface{}); ok {
for aliasName, aliasDetails := range aliases {
if aliasName == alias && aliasDetails.(map[string]interface{})["is_write_index"] == true {
actions = append(actions, map[string]interface{}{
"remove": map[string]interface{}{
"index": indexName,
"alias": alias,
},
})
}
}
}
}
}
// 确定最新的索引作为写入索引
var latestIndex string
var latestTime time.Time
for indexName := range aliasInfo {
parts := strings.Split(indexName, ".")
if len(parts) < 3 {
continue
}
datePart := parts[len(parts)-1] // 例如 2025-03
indexTime, err := time.Parse("2006-01", datePart)
if err != nil {
continue
}
if latestIndex == "" || indexTime.After(latestTime) {
latestIndex = indexName
latestTime = indexTime
}
}
// 如果没有找到合适的索引,创建一个新的索引模式
if latestIndex == "" {
latestIndex = fmt.Sprintf("logstash-%s.candle.okb-eth.%s", period, time.Now().Format("2006-01"))
}
// 检查索引是否存在,如果不存在则创建
indexURL := fmt.Sprintf("%s/%s", esConfig.URL, latestIndex)
req, err = http.NewRequest("HEAD", indexURL, nil)
if err != nil {
return err
}
req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
resp, err = client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
// 索引不存在,创建索引
fmt.Printf("Index %s does not exist, creating it...\n", latestIndex)
createIndexURL := fmt.Sprintf("%s/%s", esConfig.URL, latestIndex)
// 简单的索引创建请求,可以根据需要添加更多设置
indexData := map[string]interface{}{
"settings": map[string]interface{}{
"number_of_shards": 2, // 根据你的配置调整
"number_of_replicas": 1, // 根据你的配置调整
},
}
data, err := json.Marshal(indexData)
if err != nil {
return fmt.Errorf("failed to marshal index creation data: %v", err)
}
req, err = http.NewRequest("PUT", createIndexURL, bytes.NewBuffer(data))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
resp, err = client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("failed to create index %s, status: %d, response: %s", latestIndex, resp.StatusCode, string(body))
}
fmt.Printf("Successfully created index: %s\n", latestIndex)
}
// 添加最新的索引作为写入索引
actions = append(actions, map[string]interface{}{
"add": map[string]interface{}{
"index": latestIndex,
"alias": alias,
"is_write_index": true,
},
})
// 确保其他索引的 is_write_index 为 false
for indexName := range aliasInfo {
if indexName != latestIndex {
actions = append(actions, map[string]interface{}{
"add": map[string]interface{}{
"index": indexName,
"alias": alias,
"is_write_index": false,
},
})
}
}
// 执行别名更新操作
aliasURL := fmt.Sprintf("%s/_aliases", esConfig.URL)
aliasData := map[string]interface{}{
"actions": actions,
}
data, err := json.Marshal(aliasData)
if err != nil {
return err
}
req, err = http.NewRequest("POST", aliasURL, bytes.NewBuffer(data))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
resp, err = client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("failed to create alias, status: %d, response: %s", resp.StatusCode, string(body))
}
return nil
}
func ConfigureILM(client *http.Client, config *cfg.Config, dataType, period string, indexTime time.Time) error {
// 从配置文件中获取 Elasticsearch 和 ILM 配置
esConfig := config.Elasticsearch
dataConfig, ok := esConfig.ILM.DataTypes[dataType]
if !ok {
return fmt.Errorf("ILM configuration for data type '%s' not found in config", dataType)
}
// 创建保留时间计算器
// 计算保留时间和时间差
calc := DefaultRetentionCalculator{
MaxRetention: dataConfig.MaxRetention,
MinRetention: dataConfig.MinRetention,
}
// 计算距今时间差和保留时间
now := time.Now()
daysDiff := int(now.Sub(indexTime).Hours() / 24)
retentionDays := calc.Calculate(daysDiff, period)
// 构造 ILM 策略名称
year, month := indexTime.Year(), int(indexTime.Month())
policyName := fmt.Sprintf("logstash_%s_%s_%d_%02d_%s", dataType, period, year, month, getPhase(daysDiff, retentionDays))
// 创建或更新 ILM 策略
if err := ensureILMPolicy(client, esConfig, policyName, daysDiff, retentionDays, dataConfig); err != nil {
return fmt.Errorf("failed to ensure ILM policy: %v", err)
// 确保保留时间不低于冷阶段的最小值
minDeleteAge := dataConfig.NormalPhases["cold"]
if retentionDays < minDeleteAge {
retentionDays = minDeleteAge
}
// 配置索引模板
templateName := fmt.Sprintf("logstash-%s-%s-%d-%02d", dataType, period, year, month)
// 格式化策略名称、模板名称、索引模式和别名
year, month := indexTime.Year(), int(indexTime.Month())
policyName := fmt.Sprintf("logstash_%s_%s_%d_%02d", dataType, period, year, month)
templateName := fmt.Sprintf("log_stash_%s_%s_%d_%02d", dataType, period, year, month)
indexPattern := fmt.Sprintf(dataConfig.IndexPattern, period, year, month)
// 根据数据类型设置映射
var mappings map[string]interface{}
switch dataType {
case "candle":
mappings = map[string]interface{}{
aliasName := fmt.Sprintf("%s_alias", policyName)
fmt.Printf("[ConfigureILM] Configuring ILM with policyName: %s, templateName: %s, aliasName: %s\n", policyName, templateName, aliasName)
// 创建索引模板
template := map[string]interface{}{
"index_patterns": []string{indexPattern},
"priority": 100,
"template": map[string]interface{}{
"settings": map[string]interface{}{
"number_of_shards": dataConfig.Shards,
"number_of_replicas": dataConfig.Replicas,
"index.lifecycle.name": policyName,
"index.lifecycle.rollover_alias": aliasName, // 确保与 aliasName 一致
},
"mappings": map[string]interface{}{
"properties": map[string]interface{}{
"dataTime": map[string]string{"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
"open": map[string]string{"type": "float"},
@ -128,107 +319,188 @@ func ConfigureILM(client *http.Client, config *cfg.Config, dataType, period stri
"volume": map[string]string{"type": "float"},
"volumeCcy": map[string]string{"type": "float"},
},
}
case "ma":
mappings = map[string]interface{}{
"properties": map[string]interface{}{
"dataTime": map[string]string{"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
"ma_value": map[string]string{"type": "float"}, // MA 值
},
}
default:
return fmt.Errorf("unsupported data type: %s", dataType)
}
template := map[string]interface{}{
"index_patterns": []string{indexPattern},
"template": map[string]interface{}{
"settings": map[string]interface{}{
"number_of_shards": dataConfig.Shards,
"number_of_replicas": dataConfig.Replicas,
"index.lifecycle.name": policyName,
},
"mappings": mappings,
},
}
templateURL := fmt.Sprintf("%s/_index_template/%s", esConfig.URL, templateName)
templateData, _ := json.Marshal(template)
req, err := http.NewRequest("PUT", templateURL, bytes.NewBuffer(templateData))
templateData, err := json.Marshal(template)
if err != nil {
return fmt.Errorf("failed to create template request: %v", err)
return fmt.Errorf("failed to marshal index template data: %v", err)
}
req, err := http.NewRequest("PUT", templateURL, bytes.NewBuffer(templateData))
if err != nil {
return fmt.Errorf("failed to create index template request: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to send template request: %v", err)
return fmt.Errorf("failed to send index template request: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("failed to create template, status: %d, response: %s", resp.StatusCode, string(body))
return fmt.Errorf("failed to create index template, status: %d, response: %s", resp.StatusCode, string(body))
}
fmt.Printf("[ConfigureILM] Successfully configured ILM template: %s\n", templateName)
// 创建或更新 ILM 策略
if err := ensureILMPolicy(client, esConfig, policyName, dataType, period, daysDiff, retentionDays, dataConfig); err != nil {
return fmt.Errorf("failed to ensure ILM policy: %v", err)
}
fmt.Printf("Successfully configured ILM template: %s\n", templateName)
// 设置别名
if err := ensureAlias(client, esConfig, aliasName, period); err != nil {
return fmt.Errorf("failed to ensure alias: %v", err)
}
fmt.Printf("[ConfigureILM] Successfully completed ILM configuration for policy: %s\n", policyName)
return nil
}
// getPhase 根据时间差和保留时间决定初始阶段
func getPhase(daysDiff, retentionDays int) string {
if daysDiff > 730 || retentionDays < 180 { // 超过 2 年或保留时间少于 180 天
return "cold"
}
return "normal"
func ensureILMPolicy(client *http.Client, esConfig cfg.ElasticsearchConfig, policyName, dataType, period string, daysDiff, retentionDays int, dataConfig cfg.DataTypeConfig) error {
// 检查 default_policy 是否存在
defaultPolicyURL := fmt.Sprintf("%s/_ilm/policy/default_policy", esConfig.URL)
resp, err := client.Get(defaultPolicyURL)
if err != nil {
return fmt.Errorf("failed to check default_policy: %v", err)
}
defer resp.Body.Close()
// ensureILMPolicy 创建或更新 ILM 策略
func ensureILMPolicy(client *http.Client, esConfig cfg.ElasticsearchConfig, policyName string, daysDiff, retentionDays int, dataConfig cfg.DataTypeConfig) error {
policyURL := fmt.Sprintf("%s/_ilm/policy/%s", esConfig.URL, policyName)
resp, err := client.Get(policyURL)
if err == nil && resp.StatusCode == http.StatusOK {
resp.Body.Close()
return nil // 策略已存在
}
resp.Body.Close()
var policy map[string]interface{}
if daysDiff > 730 || retentionDays < 180 {
policy = map[string]interface{}{
"policy": map[string]interface{}{
"phases": map[string]interface{}{
"cold": map[string]interface{}{
"min_age": "0d",
"actions": map[string]interface{}{
"allocate": map[string]interface{}{
"include": map[string]string{"_tier_preference": "data_cold"},
},
"shrink": map[string]interface{}{
"number_of_shards": 1,
},
},
},
"delete": map[string]interface{}{
"min_age": fmt.Sprintf("%dd", retentionDays),
"actions": map[string]interface{}{
"delete": map[string]interface{}{}, // 添加 delete 动作
},
},
},
},
}
} else {
policy = map[string]interface{}{
// 如果 default_policy 不存在,则创建
if resp.StatusCode != http.StatusOK {
fmt.Println("[ILM Policy] Default policy does not exist, creating it...")
defaultPolicy := map[string]interface{}{
"policy": map[string]interface{}{
"phases": map[string]interface{}{
"hot": map[string]interface{}{
"actions": map[string]interface{}{
"rollover": dataConfig.NormalRollover,
"allocate": map[string]interface{}{
"include": map[string]string{"_tier_preference": "data_hot"},
"set_priority": map[string]interface{}{
"priority": 100,
},
},
},
"delete": map[string]interface{}{
"min_age": "365d",
"actions": map[string]interface{}{
"delete": map[string]interface{}{},
},
},
},
},
}
defaultPolicyData, err := json.Marshal(defaultPolicy)
if err != nil {
return fmt.Errorf("failed to marshal default_policy: %v", err)
}
req, err := http.NewRequest("PUT", defaultPolicyURL, bytes.NewBuffer(defaultPolicyData))
if err != nil {
return fmt.Errorf("failed to create default_policy request: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
resp, err = client.Do(req)
if err != nil {
return fmt.Errorf("failed to create default_policy: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("failed to create default_policy, status: %d, response: %s", resp.StatusCode, string(body))
}
fmt.Println("[ILM Policy] Successfully created default_policy")
}
// 检查目标策略是否存在
policyURL := fmt.Sprintf("%s/_ilm/policy/%s", esConfig.URL, policyName)
resp, err = client.Get(policyURL)
if err != nil {
return fmt.Errorf("failed to check ILM policy: %v", err)
}
defer resp.Body.Close()
// 如果策略已存在,更新索引设置并删除旧策略
if resp.StatusCode == http.StatusOK {
fmt.Printf("[ILM Policy] Policy %s already exists, updating index settings...\n", policyName)
// 更新索引设置
updateIndexURL := fmt.Sprintf("%s/logstash-%s.%s.*/_settings", esConfig.URL, period, dataType)
updateData := map[string]interface{}{
"index.lifecycle.name": "default_policy",
"index.lifecycle.rollover_alias": fmt.Sprintf("logs-%s-%s-candle", dataType, period),
}
updateDataBytes, err := json.Marshal(updateData)
if err != nil {
return fmt.Errorf("failed to marshal update index data: %v", err)
}
req, err := http.NewRequest("PUT", updateIndexURL, bytes.NewBuffer(updateDataBytes))
if err != nil {
return fmt.Errorf("failed to create update index request: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
updateResp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to update index settings: %v", err)
}
defer updateResp.Body.Close()
if updateResp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(updateResp.Body)
return fmt.Errorf("failed to update index settings, status: %d, response: %s", updateResp.StatusCode, string(body))
}
fmt.Printf("[ILM Policy] Successfully updated index settings for policy: %s\n", policyName)
// 删除旧策略
req, err = http.NewRequest("DELETE", policyURL, nil)
if err != nil {
return fmt.Errorf("failed to create delete policy request: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
deleteResp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to delete old policy: %v", err)
}
defer deleteResp.Body.Close()
if deleteResp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(deleteResp.Body)
return fmt.Errorf("failed to delete old policy, status: %d, response: %s", deleteResp.StatusCode, string(body))
}
fmt.Printf("[ILM Policy] Successfully deleted old policy: %s\n", policyName)
} else {
fmt.Printf("[ILM Policy] Policy %s does not exist, creating a new one...\n", policyName)
}
// 创建新的 ILM 策略
initialPhase := getPhase(daysDiff, retentionDays)
fmt.Printf("[ILM Policy] Initial phase for policy %s is: %s\n", policyName, initialPhase)
rolloverActions := map[string]interface{}{
"max_age": dataConfig.NormalRollover["max_age"],
"max_size": dataConfig.NormalRollover["max_size"],
}
if maxDocsStr, ok := dataConfig.NormalRollover["max_docs"]; ok && maxDocsStr != "" {
maxDocs, err := strconv.Atoi(maxDocsStr)
if err != nil || maxDocs <= 0 {
fmt.Printf("[ILM Policy] Invalid max_docs value: %s, skipping max_docs in rollover\n", maxDocsStr)
} else {
rolloverActions["max_docs"] = maxDocs
}
} else {
fmt.Println("[ILM Policy] max_docs not set in NormalRollover, skipping max_docs in rollover")
}
policy := map[string]interface{}{
"policy": map[string]interface{}{
"phases": map[string]interface{}{
initialPhase: map[string]interface{}{
"actions": map[string]interface{}{
"rollover": rolloverActions,
"set_priority": map[string]interface{}{
"priority": 100,
},
},
},
@ -246,42 +518,63 @@ func ensureILMPolicy(client *http.Client, esConfig cfg.ElasticsearchConfig, poli
"allocate": map[string]interface{}{
"include": map[string]string{"_tier_preference": "data_cold"},
},
"shrink": map[string]interface{}{
"number_of_shards": 1,
},
},
},
"delete": map[string]interface{}{
"min_age": fmt.Sprintf("%dd", retentionDays),
"actions": map[string]interface{}{
"delete": map[string]interface{}{}, // 添加 delete 动作
"delete": map[string]interface{}{},
},
},
},
},
}
// 如果数据超过 730 天2 年),直接进入冷阶段
if daysDiff > 730 {
phases, ok := policy["policy"].(map[string]interface{})["phases"].(map[string]interface{})
if !ok {
return fmt.Errorf("failed to get phases from policy")
}
delete(phases, initialPhase)
if _, exists := phases["cold"]; !exists {
phases["cold"] = map[string]interface{}{
"min_age": "0d",
"actions": map[string]interface{}{
"allocate": map[string]interface{}{
"include": map[string]string{"_tier_preference": "data_cold"},
},
},
}
} else {
coldPhase, ok := phases["cold"].(map[string]interface{})
if !ok {
return fmt.Errorf("failed to get cold phase from phases")
}
coldPhase["min_age"] = "0d"
}
}
policyData, _ := json.Marshal(policy)
// 创建或更新策略
policyData, err := json.Marshal(policy)
if err != nil {
return fmt.Errorf("failed to marshal ILM policy: %v", err)
}
req, err := http.NewRequest("PUT", policyURL, bytes.NewBuffer(policyData))
if err != nil {
return fmt.Errorf("failed to create ILM policy request: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.SetBasicAuth(esConfig.Auth.Username, esConfig.Auth.Password)
resp, err = client.Do(req)
if err != nil {
return fmt.Errorf("failed to send ILM policy request: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("failed to create ILM policy, status: %d, response: %s", resp.StatusCode, string(body))
}
fmt.Printf("Successfully created ILM policy: %s\n", policyName)
fmt.Printf("[ILM Policy] Successfully created or updated ILM policy: %s\n", policyName)
return nil
}

11
fluentd-deployment.yaml Normal file
View File

@ -0,0 +1,11 @@
<<<<<<< HEAD
=======
initContainers:
- name: install-rewrite-tag-filter
image: your-fluentd-image:version
command: ["fluent-gem", "install", "fluent-plugin-rewrite-tag-filter"]
containers:
- name: fluentd
image: your-fluentd-image:version
>>>>>>> Snippet

29
fluentd.conf Normal file
View File

@ -0,0 +1,29 @@
<<<<<<< HEAD
=======
<match tanya.**>
@type copy
<store>
@type elasticsearch
@id output_elasticsearch_tanya
host elasticsearch
port 9200
scheme http
user fluentd_user
password fluentd_password
logstash_format true
logstash_prefix tanya
logstash_dateformat %Y.%m.%d
flush_interval 5s
@log_level debug
id_key _id
remove_keys _id
include_tag_key true
tag_key @log_name
</store>
<store>
@type stdout
@id output_stdout_tanya
</store>
</match>
>>>>>>> Snippet

View File

@ -388,7 +388,7 @@ func calculateCrossPrice(price1, price2 string) (string, error) {
func (cl *CandleList) ToElastic() error {
client := &http.Client{Timeout: 30 * time.Second}
duration, err := parsePeriod(cl.Period)
_, err := parsePeriod(cl.Period)
if err != nil {
return fmt.Errorf("failed to parse period: %v", err)
}
@ -462,11 +462,62 @@ func (cl *CandleList) ToElastic() error {
cstTime := time.UnixMilli(ts).In(loc)
// 时间戳对齐检查
duration, err := parsePeriod(cl.Period)
if err != nil {
return fmt.Errorf("failed to parse period: %v", err)
}
// 对于日线及以上周期,检查是否为周期的整数倍
if duration >= 24*time.Hour {
// 特殊处理1D周期
if cl.Period == "1D" {
// 检查是否为当天的00:00:00
if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 {
return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period)
}
} else {
// 对于其他日线及以上周期,使用原来的检查逻辑
totalHours := cstTime.Unix() / 3600
periodHours := int64(duration.Hours())
if totalHours%periodHours != 0 {
return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period)
}
if cstTime.Minute() != 0 || cstTime.Second() != 0 {
return fmt.Errorf("timestamp %d (%s) has non-zero minutes/seconds for period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period)
}
}
} else if duration >= time.Hour {
// 对于小时级周期,检查小时、分钟、秒是否对齐
switch cl.Period {
case "1H":
if cstTime.Minute() != 0 || cstTime.Second() != 0 {
return fmt.Errorf("timestamp %d (%s) has non-zero minutes/seconds for period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period)
}
case "2H":
if cstTime.Hour()%2 != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 {
return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period)
}
case "4H":
if cstTime.Hour()%4 != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 {
return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period)
}
case "6H":
if (cstTime.Hour()%6 != 0) || cstTime.Minute() != 0 || cstTime.Second() != 0 {
return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period)
}
case "12H":
if (cstTime.Hour() != 0 && cstTime.Hour() != 12) || cstTime.Minute() != 0 || cstTime.Second() != 0 {
return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period)
}
default:
if cstTime.UnixMilli()%duration.Milliseconds() != 0 {
return fmt.Errorf("timestamp %d is not aligned with period %s", ts, cl.Period)
}
}
} else {
// 对于分钟级周期,使用毫秒取模检查
if cstTime.UnixMilli()%duration.Milliseconds() != 0 {
return fmt.Errorf("timestamp %d is not aligned with period %s", ts, cl.Period)
}

View File

@ -8,9 +8,48 @@ import (
"io"
"net/http"
"net/url"
"strconv"
"time"
)
// getPeriodDuration 根据时间周期字符串返回对应的 duration
func getPeriodDuration(period string) (time.Duration, error) {
switch period {
case "1m":
return time.Minute, nil
case "3m":
return 3 * time.Minute, nil
case "5m":
return 5 * time.Minute, nil
case "15m":
return 15 * time.Minute, nil
case "30m":
return 30 * time.Minute, nil
case "1H":
return time.Hour, nil
case "2H":
return 2 * time.Hour, nil
case "4H":
return 4 * time.Hour, nil
case "6H":
return 6 * time.Hour, nil
case "12H":
return 12 * time.Hour, nil
case "1D":
return 24 * time.Hour, nil
case "2D":
return 2 * 24 * time.Hour, nil
case "3D":
return 3 * 24 * time.Hour, nil
case "5D":
return 5 * 24 * time.Hour, nil
case "1W":
return 7 * 24 * time.Hour, nil
default:
return 0, fmt.Errorf("unsupported bar period: %s", period)
}
}
type OkxPublicDataService struct {
BaseURL string
client *http.Client
@ -79,7 +118,46 @@ func (s *OkxPublicDataService) GetInstruments(params InstrumentsRequest) ([]Inst
// GetCandles 获取K线数据
func (s *OkxPublicDataService) GetCandles(params CandlesRequest) ([]*Candle, error) {
u, err := url.Parse(s.BaseURL + "/market/candles")
// 根据时间范围选择不同的API端点
endpoint := "/market/candles"
// 计算时间范围
now := time.Now()
var startTime time.Time
var err error
// 优先使用 After 参数,如果都提供了
if params.After != "" {
// 将毫秒时间戳转换为 time.Time
timestamp, err := strconv.ParseInt(params.After, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid After timestamp: %v", err)
}
startTime = time.Unix(0, timestamp*int64(time.Millisecond))
} else if params.Before != "" {
// 将毫秒时间戳转换为 time.Time
timestamp, err := strconv.ParseInt(params.Before, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid Before timestamp: %v", err)
}
startTime = time.Unix(0, timestamp*int64(time.Millisecond))
} else {
return nil, fmt.Errorf("either After or Before parameter is required")
}
// 根据时间维度计算20个周期
unitDuration, err := getPeriodDuration(params.Bar)
if err != nil {
return nil, err
}
periodDuration := 20 * unitDuration
// 如果数据超过20个周期使用历史数据接口
if now.Sub(startTime) > periodDuration {
endpoint = "/market/history-candles"
}
u, err := url.Parse(s.BaseURL + endpoint)
if err != nil {
return nil, err
}

View File

@ -75,29 +75,39 @@ import (
// if err != nil {
// t.Fatalf("ToEs failed: %v", err)
// }
// }
// }:q
func TestCandleListI_CalculateCrossPair(t *testing.T) {
startTime := time.Date(2025, 2, 1, 0, 0, 0, 0, time.UTC)
endTime := time.Date(2025, 3, 28, 0, 0, 0, 0, time.UTC)
okbUsdt, err := MakeCandleList("OKB-USDT", "4H", startTime, endTime, 50)
// 使用更早的时间范围来触发不同的phase
startTime := time.Date(2022, 2, 1, 0, 0, 0, 0, time.UTC)
endTime := time.Date(2022, 2, 28, 0, 0, 0, 0, time.UTC)
// 打印测试时间范围
t.Logf("Test time range: %s to %s", startTime, endTime)
okbUsdt, err := MakeCandleList("OKB-USDT", "30m", startTime, endTime, 50)
if err != nil {
t.Fatalf("ToEs failed: %v", err)
t.Fatalf("MakeCandleList failed: %v", err)
}
ethUsdt, err := MakeCandleList("ETH-USDT", "4H", startTime, endTime, 50)
ethUsdt, err := MakeCandleList("ETH-USDT", "30m", startTime, endTime, 50)
if err != nil {
t.Fatalf("ToEs failed: %v", err)
}
if err != nil {
t.Fatalf("ToEs failed: %v", err)
t.Fatalf("MakeCandleList failed: %v", err)
}
okbEth, err := okbUsdt.CalculateCrossPair(ethUsdt)
if err != nil {
t.Fatalf("ToEs failed: %v", err)
t.Fatalf("CalculateCrossPair failed: %v", err)
}
// 打印交叉对信息
t.Logf("Cross pair: %s, period: %s, candle count: %d",
okbEth.CoinPair, okbEth.Period, len(okbEth.Candles))
// 添加详细的日志输出
err = okbEth.ToElastic()
if err != nil {
t.Fatalf("ToEs failed: %v", err)
t.Fatalf("ToElastic failed: %v", err)
}
// 打印成功信息
t.Log("Test completed successfully")
}