diff --git a/config/config.go b/config/config.go index 88d7fb7..110de88 100644 --- a/config/config.go +++ b/config/config.go @@ -6,7 +6,10 @@ import ( ) type Config struct { - FluentdURL string `json:"fluentdURL_url"` + FluentdURL string `json:"fluentdURL"` + ElasticsearchURL string `json:"ElasticsearchURL"` + ElasticsearchUser string `json:"ElasticsearchUser"` + ElasticsearchPassword string `json:"ElasticsearchPassword"` } // LoadConfig 从指定路径加载配置文件 diff --git a/config/config.json b/config/config.json index 568691f..ab5f889 100644 --- a/config/config.json +++ b/config/config.json @@ -1,5 +1,8 @@ { - "fluentdURL_url": "http://fluentd.k8s.xunlang.home" + "fluentdURL": "http://fluentd.k8s.xunlang.home", + "ElasticsearchURL": "http://elastic.k8s.xunlang.home", + "ElasticsearchUser": "fluentd_user", + "ElasticsearchPassword": "fluentd_password" } diff --git a/okx/candleList.go b/okx/candleList.go index 313c6c9..b1e500b 100644 --- a/okx/candleList.go +++ b/okx/candleList.go @@ -326,3 +326,130 @@ func parsePeriod(period string) (time.Duration, error) { return 0, fmt.Errorf("unsupported period: %s", period) } } + +// ToElastic 将Candle数据发送到Elasticsearch +func (cl *CandleList) ToElastic() error { + // 获取当前年份 + currentYear := time.Now().Year() + + // 构造索引名称,格式为:logstash-candle.交易对.年份.周期(保持周期原样) + index := fmt.Sprintf("logstash-candle.%s.%d.%s", strings.ToLower(cl.CoinPair), currentYear, strings.ToLower(cl.Period)) + + // 创建带超时的HTTP客户端 + client := &http.Client{ + Timeout: 30 * time.Second, + } + + // 获取周期的 duration + duration, err := parsePeriod(cl.Period) + if err != nil { + return fmt.Errorf("failed to parse period: %v", err) + } + + for _, candle := range cl.Candles { + // 验证时间戳是否为周期的整数倍 + ts, err := strconv.ParseInt(candle.Timestamp, 10, 64) + if err != nil { + fmt.Println("invalid timestamp:", ts, err) + continue + } + + // 转换为东八区时间 + loc, _ := time.LoadLocation("Asia/Shanghai") + cstTime := time.UnixMilli(ts).In(loc) + + // 添加调试信息 + fmt.Printf("Timestamp: %d, CST Time: %s, Period: %s\n", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) + + // 对于日线数据,检查是否为当天的 00:00:00 + if cl.Period == "1D" { + if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 { + return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) + } + } else { + // 对于其他周期,使用原来的对齐检查 + if cstTime.UnixMilli()%duration.Milliseconds() != 0 { + return fmt.Errorf("timestamp %d is not aligned with period %s", ts, cl.Period) + } + } + + // 构造记录 + record := map[string]interface{}{ + "dataTime": cstTime.Format("2006-01-02 15:04:05"), + "open": candle.Open, + "high": candle.High, + "low": candle.Low, + "close": candle.Close, + "volume": candle.Volume, + "volumeCcy": candle.VolumeCcy, + } + + // 构造请求体 + jsonData, err := json.Marshal(record) + if err != nil { + return fmt.Errorf("failed to marshal data: %v", err) + } + var config *cfg.Config + configPaths := []string{ + "config/config.json", // 当前目录下的config目录 + "../config/config.json", // 上一级目录下的config目录 + "../../config/config.json", // 上两级目录下的config目录 + } + + var loadErr error + for _, path := range configPaths { + config, loadErr = cfg.LoadConfig(path) + if loadErr == nil { + break + } + } + + if config == nil { + return fmt.Errorf("failed to load configuration: %v", loadErr) + } + + // 构造完整URL + fullURL := fmt.Sprintf("%s/%s/_doc/%d", strings.TrimRight(config.ElasticsearchURL, "/"), index, ts) + fmt.Println("fullURL: ", fullURL) + + // 创建请求 + req, err := http.NewRequest("POST", fullURL, bytes.NewBuffer(jsonData)) + if err != nil { + return fmt.Errorf("failed to create request: %v", err) + } + + // 设置请求头 + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + // 尝试从不同层级加载配置 + + // 设置基本认证 + req.SetBasicAuth(config.ElasticsearchUser, config.ElasticsearchPassword) + + // 发送HTTP请求 + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to send data to Elasticsearch: %v", err) + } + defer resp.Body.Close() + + // 读取响应体 + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %v", err) + } + + // 输出完整的响应信息 + fmt.Printf("HTTP Response Status: %s\n", resp.Status) + fmt.Printf("HTTP Response Headers: %v\n", resp.Header) + fmt.Printf("HTTP Response Body: %s\n", string(body)) + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + return fmt.Errorf("unexpected status code: %d, response: %s", resp.StatusCode, string(body)) + } + + fmt.Printf("Successfully sent record to Elasticsearch: %s\n", fullURL) + } + + return nil +} diff --git a/test/okxAli/candleList_test.go b/test/okxAli/candleList_test.go index 918a372..8452ad6 100644 --- a/test/okxAli/candleList_test.go +++ b/test/okxAli/candleList_test.go @@ -2,63 +2,75 @@ package okx import ( . "gitea.zjmud.xyz/phyer/tanya/okx" - "os" + // "os" "testing" "time" ) -func TestCandleList_ToJson(t *testing.T) { - startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC) - endTime := time.Date(2023, 12, 31, 0, 0, 0, 0, time.UTC) - cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50) - if err != nil { - t.Fatalf("ToJson failed: %v", err) - } - jsonStr, err := cl.ToJson() - if err != nil { - t.Fatalf("ToJson failed: %v", err) - } +// func TestCandleList_ToJson(t *testing.T) { +// startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC) +// endTime := time.Date(2023, 12, 31, 0, 0, 0, 0, time.UTC) +// cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50) +// if err != nil { +// t.Fatalf("ToJson failed: %v", err) +// } +// jsonStr, err := cl.ToJson() +// if err != nil { +// t.Fatalf("ToJson failed: %v", err) +// } +// +// // Write to temp file +// tmpFile := "/tmp/candle_test.json" +// err = os.WriteFile(tmpFile, []byte(jsonStr), 0644) +// if err != nil { +// t.Fatalf("Failed to write json file: %v", err) +// } +// +// t.Logf("JSON output written to %s", tmpFile) +// } +// +// func TestCandleList_ToCsv(t *testing.T) { +// startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC) +// endTime := time.Date(2023, 12, 31, 0, 0, 0, 0, time.UTC) +// cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50) +// if err != nil { +// t.Fatalf("ToJson failed: %v", err) +// } +// csvStr, err := cl.ToCsv() +// if err != nil { +// t.Fatalf("ToCsv failed: %v", err) +// } +// +// // Write to temp file +// tmpFile := "/tmp/candle_test.csv" +// err = os.WriteFile(tmpFile, []byte(csvStr), 0644) +// if err != nil { +// t.Fatalf("Failed to write csv file: %v", err) +// } +// +// t.Logf("CSV output written to %s", tmpFile) +// } - // Write to temp file - tmpFile := "/tmp/candle_test.json" - err = os.WriteFile(tmpFile, []byte(jsonStr), 0644) - if err != nil { - t.Fatalf("Failed to write json file: %v", err) - } - - t.Logf("JSON output written to %s", tmpFile) -} - -func TestCandleList_ToCsv(t *testing.T) { - startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC) - endTime := time.Date(2023, 12, 31, 0, 0, 0, 0, time.UTC) - cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50) - if err != nil { - t.Fatalf("ToJson failed: %v", err) - } - csvStr, err := cl.ToCsv() - if err != nil { - t.Fatalf("ToCsv failed: %v", err) - } - - // Write to temp file - tmpFile := "/tmp/candle_test.csv" - err = os.WriteFile(tmpFile, []byte(csvStr), 0644) - if err != nil { - t.Fatalf("Failed to write csv file: %v", err) - } - - t.Logf("CSV output written to %s", tmpFile) -} - -func TestCandleList_ToEs(t *testing.T) { +// func TestCandleList_ToFluentd(t *testing.T) { +// startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC) +// endTime := time.Date(2023, 12, 31, 0, 0, 0, 0, time.UTC) +// cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50) +// if err != nil { +// t.Fatalf("ToEs failed: %v", err) +// } +// err = cl.ToFluentd() +// if err != nil { +// t.Fatalf("ToEs failed: %v", err) +// } +// } +func TestCandleList_ToElastic(t *testing.T) { startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC) endTime := time.Date(2023, 12, 31, 0, 0, 0, 0, time.UTC) cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50) if err != nil { t.Fatalf("ToEs failed: %v", err) } - err = cl.ToFluentd() + err = cl.ToElastic() if err != nil { t.Fatalf("ToEs failed: %v", err) }