From 7202a98998dbec0f27022a773e322a9e80c302fd Mon Sep 17 00:00:00 2001 From: "zhangkun9038@dingtalk.com" Date: Mon, 10 Mar 2025 11:00:46 +0800 Subject: [PATCH] up --- config/config.go | 26 ++++ config/config.json | 5 + config/fluentd.yaml | 78 ++++++++++++ fluentd.conf | 61 ++++++++++ okx/candleList.go | 203 +++++++++++++++++++++++++++++++- okx/publicApiService.go | 72 ++++++++++- test/okxAli/candleList_test.go | 21 +++- test/okxAli/get_candles_test.go | 13 ++ 8 files changed, 468 insertions(+), 11 deletions(-) create mode 100644 config/config.go create mode 100644 config/config.json create mode 100644 config/fluentd.yaml create mode 100644 fluentd.conf diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..41adb8f --- /dev/null +++ b/config/config.go @@ -0,0 +1,26 @@ +package config + +import ( + "encoding/json" + "os" +) + +type Config struct { + ElasticsearchURL string `json:"elasticsearch_url"` +} + +// LoadConfig 从指定路径加载配置文件 +func LoadConfig(path string) (*Config, error) { + file, err := os.ReadFile(path) + if err != nil { + return nil, err + } + + var config Config + err = json.Unmarshal(file, &config) + if err != nil { + return nil, err + } + + return &config, nil +} diff --git a/config/config.json b/config/config.json new file mode 100644 index 0000000..5f8eb60 --- /dev/null +++ b/config/config.json @@ -0,0 +1,5 @@ +{ + "elasticsearch_url": "http://fluentd.k8s.xunlang.home" +} + + diff --git a/config/fluentd.yaml b/config/fluentd.yaml new file mode 100644 index 0000000..d785d51 --- /dev/null +++ b/config/fluentd.yaml @@ -0,0 +1,78 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: fluentd-config + namespace: efk +data: + fluent.conf: | + + @type http + @id input_http + port 8888 + tag sardine.log # 初始 tag,客户端需指定完整 tag 如 sardine.log.candle.BTC-USDT.15M + @label @main + + @type json + + + + + + + @type stdout + @id output_stdout_all + diff --git a/fluentd.conf b/fluentd.conf new file mode 100644 index 0000000..290ecea --- /dev/null +++ b/fluentd.conf @@ -0,0 +1,61 @@ + + @type http + @id input_http + port 8888 + @label @main + + + + + + @type stdout + @id output_stdout_all + diff --git a/okx/candleList.go b/okx/candleList.go index 5b0024d..b42e275 100644 --- a/okx/candleList.go +++ b/okx/candleList.go @@ -1,9 +1,14 @@ package okx import ( + "bytes" "encoding/csv" "encoding/json" "fmt" + "io" + + cfg "gitea.zjmud.xyz/phyer/tanya/config" // 请将your_project_path替换为实际的项目路径 + "net/http" "strconv" "strings" "time" @@ -15,15 +20,22 @@ func formatTimestamp(ts string) (string, error) { if err != nil { return "", err } - return time.UnixMilli(millis).Format("2006-01-02 15:04:05"), nil + loc, _ := time.LoadLocation("Asia/Shanghai") + return time.UnixMilli(millis).In(loc).Format("2006-01-02 15:04:05"), nil } // CandleList 封装Candle数组并提供序列化方法 type CandleList struct { - Candles []*Candle + Candles []*Candle + CoinPair string // 交易对名称,如 BTC-USDT + Period string // 周期名称,如 15M, 1D } func MakeCandleList(instId string, period string, startTime time.Time, endTime time.Time, blockSize int) (*CandleList, error) { + cl := &CandleList{ + CoinPair: instId, + Period: period, + } service := NewOkxPublicDataService() var allCandles []*Candle currentTime := endTime @@ -49,7 +61,7 @@ func MakeCandleList(instId string, period string, startTime time.Time, endTime t currentTime = newCurrentTime } fmt.Println("lens of allCandles: ", len(allCandles)) - cl := &CandleList{Candles: allCandles} + cl.Candles = allCandles return cl, nil } @@ -130,3 +142,188 @@ func (cl *CandleList) ToCsv() (string, error) { return sb.String(), nil } + +// ToEs 将Candle数据发送到Elasticsearch +func (cl *CandleList) ToEs() error { + // 获取当前年份 + currentYear := time.Now().Year() + + // 构造tag,格式为:tanya.candle.交易对.年份.周期(保持周期原样) + tag := fmt.Sprintf("tanya.candle.%s.%d.%s", cl.CoinPair, currentYear, cl.Period) + + // 解析周期 + duration, err := parsePeriod(cl.Period) + if err != nil { + return fmt.Errorf("invalid period: %v", err) + } + + // 分批发送,每批最多50条 + batchSize := 50 + for i := 0; i < len(cl.Candles); i += batchSize { + end := i + batchSize + if end > len(cl.Candles) { + end = len(cl.Candles) + } + + // 准备批量数据 + var records []map[string]interface{} + for _, candle := range cl.Candles[i:end] { + // 验证时间戳是否为周期的整数倍 + ts, err := strconv.ParseInt(candle.Timestamp, 10, 64) + if err != nil { + fmt.Println("invalid timestamp:", ts, err) + continue + } + + // 转换为东八区时间 + loc, _ := time.LoadLocation("Asia/Shanghai") + cstTime := time.UnixMilli(ts).In(loc) + + // 对于日线数据,检查是否为当天的 00:00:00 + if cl.Period == "1D" { + if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 { + return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) + } + } else { + // 对于其他周期,使用原来的对齐检查 + if cstTime.UnixMilli()%duration.Milliseconds() != 0 { + return fmt.Errorf("timestamp %d is not aligned with period %s", ts, cl.Period) + } + } + + // 格式化时间 + formattedTime := cstTime.Format("2006-01-02 15:04:05") + + // 构造记录 + record := map[string]interface{}{ + "_id": ts, // 使用时间戳作为_id + "dataTime": formattedTime, + "open": candle.Open, + "high": candle.High, + "low": candle.Low, + "close": candle.Close, + "volume": candle.Volume, + "volumeCcy": candle.VolumeCcy, + } + + records = append(records, record) + } + + // 构造请求体 + payload := map[string]interface{}{ + "tag": tag, + "record": records, + } + + jsonData, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("failed to marshal data: %v", err) + } + + // 尝试从不同层级加载配置 + var config *cfg.Config + configPaths := []string{ + "config/config.json", // 当前目录下的config目录 + "../config/config.json", // 上一级目录下的config目录 + "../../config/config.json", // 上两级目录下的config目录 + } + + for _, path := range configPaths { + config, err = cfg.LoadConfig(path) + if err == nil { + break + } + } + + if err != nil { + return fmt.Errorf("failed to load config after trying paths: %v. Tried paths: %v", err, configPaths) + } + + // 构造完整URL,添加json参数 + fullURL := fmt.Sprintf("%s/%s?json", strings.TrimRight(config.ElasticsearchURL, "/"), tag) + + // 输出完整请求URL和请求体到日志 + fmt.Printf("Sending request to URL: %s\n", fullURL) + fmt.Printf("Request Body: %s\n", string(jsonData)) + + // 创建带超时的HTTP客户端 + client := &http.Client{ + Timeout: 30 * time.Second, + } + + // 创建请求 + req, err := http.NewRequest("POST", fullURL, bytes.NewBuffer(jsonData)) + if err != nil { + return fmt.Errorf("failed to create request: %v", err) + } + + // 设置请求头 + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + + // 发送HTTP请求 + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to send data to Fluentd: %v", err) + } + defer resp.Body.Close() + + // 读取响应体 + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %v", err) + } + + // 输出完整的响应信息 + fmt.Printf("HTTP Response Status: %s\n", resp.Status) + fmt.Printf("HTTP Response Headers: %v\n", resp.Header) + fmt.Printf("HTTP Response Body: %s\n", string(body)) + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected status code: %d, response: %s", resp.StatusCode, string(body)) + } + //回头把response列出来看看,是不是有报错 + + fmt.Printf("Successfully sent %d records to Fluentd\n", len(records)) + } + + return nil +} + +// parsePeriod 将周期字符串转换为time.Duration +func parsePeriod(period string) (time.Duration, error) { + switch period { + case "1m": + return time.Minute, nil + case "3m": + return 3 * time.Minute, nil + case "5m": + return 5 * time.Minute, nil + case "15m": + return 15 * time.Minute, nil + case "30m": + return 30 * time.Minute, nil + case "1H": + return time.Hour, nil + case "2H": + return 2 * time.Hour, nil + case "4H": + return 4 * time.Hour, nil + case "6H": + return 6 * time.Hour, nil + case "12H": + return 12 * time.Hour, nil + case "1D": + return 24 * time.Hour, nil + case "D": + return 24 * time.Hour, nil + case "2D": + return 48 * time.Hour, nil + case "5D": + return 120 * time.Hour, nil + case "7D": + return 168 * time.Hour, nil + default: + return 0, fmt.Errorf("unsupported period: %s", period) + } +} diff --git a/okx/publicApiService.go b/okx/publicApiService.go index 3f2945a..307bd8f 100644 --- a/okx/publicApiService.go +++ b/okx/publicApiService.go @@ -1,11 +1,14 @@ package okx import ( + "bytes" "encoding/json" "fmt" "github.com/google/go-querystring/query" // 用于将 struct 转为 URL 参数,需要安装:go get github.com/google/go-querystring + "io" "net/http" "net/url" + "time" ) type OkxPublicDataService struct { @@ -17,7 +20,9 @@ type OkxPublicDataService struct { func NewOkxPublicDataService() *OkxPublicDataService { return &OkxPublicDataService{ BaseURL: "https://aws.okx.com/api/v5", - client: &http.Client{}, + client: &http.Client{ + Timeout: 10 * time.Second, // 设置10秒超时 + }, } } @@ -48,7 +53,25 @@ func (s *OkxPublicDataService) GetInstruments(params InstrumentsRequest) ([]Inst } if apiResp.Code != "0" { - return nil, fmt.Errorf("API error: %s", apiResp.Msg) + return nil, fmt.Errorf("API error: code=%s, msg=%s, request_url=%s", apiResp.Code, apiResp.Msg, u.String()) + } + + // 进行类型断言并检查长度 + switch data := apiResp.Data.(type) { + case []Instrument: + if len(data) == 0 { + return nil, fmt.Errorf("no data returned from API") + } + case [][]string: + if len(data) == 0 { + return nil, fmt.Errorf("no data returned from API") + } + case []*Ticker: + if len(data) == 0 { + return nil, fmt.Errorf("no data returned from API") + } + default: + return nil, fmt.Errorf("unexpected data type from API: %T", apiResp.Data) } return *apiResp.Data.(*[]Instrument), nil @@ -68,12 +91,53 @@ func (s *OkxPublicDataService) GetCandles(params CandlesRequest) ([]*Candle, err } u.RawQuery = q.Encode() - resp, err := http.Get(u.String()) + // 添加调试日志 + fmt.Printf("Making request to: %s\n", u.String()) + + // 创建自定义HTTP请求 + req, err := http.NewRequest("GET", u.String(), nil) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create request: %v", err) } + + // 设置请求头 + req.Header.Set("Accept", "application/json") + req.Header.Set("Content-Type", "application/json") + + // 发送请求 + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("request failed: %v", err) + } + defer resp.Body.Close() + // 打印详细调试信息 + fmt.Printf("Request URL: %s\n", u.String()) + fmt.Printf("Response Status: %s\n", resp.Status) + fmt.Printf("Response Headers: %v\n", resp.Header) + + // 记录响应状态 + fmt.Printf("Response Status: %s\n", resp.Status) + + // 读取并记录响应体 + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %v", err) + } + + // 打印响应体长度和内容 + fmt.Printf("Response Body Length: %d\n", len(body)) + if len(body) > 0 { + fmt.Printf("Response Body: %s\n", string(body)) + } else { + fmt.Println("Response Body is empty") + } + fmt.Printf("Response Body: %s\n", string(body)) + + // 重新设置resp.Body以便后续解码 + resp.Body = io.NopCloser(bytes.NewBuffer(body)) + var apiResp struct { Code string `json:"code"` Msg string `json:"msg"` diff --git a/test/okxAli/candleList_test.go b/test/okxAli/candleList_test.go index b931544..4314226 100644 --- a/test/okxAli/candleList_test.go +++ b/test/okxAli/candleList_test.go @@ -8,8 +8,8 @@ import ( ) func TestCandleList_ToJson(t *testing.T) { - startTime := time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC) - endTime := time.Date(2022, 12, 31, 0, 0, 0, 0, time.UTC) + startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC) + endTime := time.Date(2023, 12, 31, 0, 0, 0, 0, time.UTC) cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50) if err != nil { t.Fatalf("ToJson failed: %v", err) @@ -30,8 +30,8 @@ func TestCandleList_ToJson(t *testing.T) { } func TestCandleList_ToCsv(t *testing.T) { - startTime := time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC) - endTime := time.Date(2022, 12, 31, 0, 0, 0, 0, time.UTC) + startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC) + endTime := time.Date(2023, 12, 31, 0, 0, 0, 0, time.UTC) cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50) if err != nil { t.Fatalf("ToJson failed: %v", err) @@ -50,3 +50,16 @@ func TestCandleList_ToCsv(t *testing.T) { t.Logf("CSV output written to %s", tmpFile) } + +func TestCandleList_ToEs(t *testing.T) { + startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC) + endTime := time.Date(2023, 12, 31, 0, 0, 0, 0, time.UTC) + cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50) + if err != nil { + t.Fatalf("ToEs failed: %v", err) + } + err = cl.ToEs() + if err != nil { + t.Fatalf("ToEs failed: %v", err) + } +} diff --git a/test/okxAli/get_candles_test.go b/test/okxAli/get_candles_test.go index 50bf1ca..172cfb9 100644 --- a/test/okxAli/get_candles_test.go +++ b/test/okxAli/get_candles_test.go @@ -21,10 +21,23 @@ func TestGetCandles(t *testing.T) { t.Fatalf("Expected no error, got %v", err) } + // 添加详细验证 if len(candles) == 0 { t.Fatal("Expected at least one candle, got none") } + // 检查第一个蜡烛数据 + firstCandle := candles[0] + if firstCandle.Timestamp == "" { + t.Error("Expected non-empty Timestamp") + } + if firstCandle.Open == "" || firstCandle.High == "" || firstCandle.Low == "" || firstCandle.Close == "" { + t.Error("Expected non-empty OHLC values") + } + if firstCandle.Volume == "" || firstCandle.VolumeCcy == "" { + t.Error("Expected non-empty Volume and VolumeCcy") + } + // 检查返回数据的结构 for _, candle := range candles { if candle.Timestamp == "" {