package okx

import (
	"bytes"
	"encoding/csv"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strconv"
	"strings"
	"time"

	cfg "gitea.zjmud.xyz/phyer/tanya/config"
)

// candleListTimeLayout is the human-readable layout used for every formatted
// candle timestamp in this file.
const candleListTimeLayout = "2006-01-02 15:04:05"

// candleListLocation is the time zone (Asia/Shanghai, UTC+8) in which candle
// timestamps are rendered. It is resolved once at package initialization
// instead of once per candle.
var candleListLocation = loadCandleListLocation()

// loadCandleListLocation resolves Asia/Shanghai from the tz database. When the
// database is unavailable (for example in a minimal container image) it falls
// back to a fixed UTC+8 zone rather than leaving a nil location, because
// Time.In panics on a nil *time.Location.
func loadCandleListLocation() *time.Location {
	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		return time.FixedZone("CST", 8*60*60)
	}
	return loc
}

// formatTimestamp converts a millisecond Unix timestamp given as a decimal
// string into "YYYY-MM-DD hh:mm:ss" in the Asia/Shanghai time zone.
// It returns the strconv error unchanged when ts is not a valid integer.
func formatTimestamp(ts string) (string, error) {
	millis, err := strconv.ParseInt(ts, 10, 64)
	if err != nil {
		return "", err
	}
	return time.UnixMilli(millis).In(candleListLocation).Format(candleListTimeLayout), nil
}

// CandleList wraps a slice of candles and provides serialization helpers.
type CandleList struct {
	Candles  []*Candle
	CoinPair string // trading pair name, e.g. BTC-USDT
	Period   string // bar period name, e.g. 15M, 1D
}

// MakeCandleList fetches candles for instId/period page by page and returns
// them in a CandleList.
//
// The first request uses endTime as the After cursor; after every page the
// cursor becomes the timestamp of the page's last candle plus one millisecond.
// Fetching stops when the cursor falls before startTime, when a page is empty,
// or when a page fails to move the cursor backwards. blockSize is passed to the
// service as the per-request Limit.
//
// An error is returned when a request fails or when the last candle of a page
// carries a timestamp that is not a valid integer.
func MakeCandleList(instId string, period string, startTime time.Time, endTime time.Time, blockSize int) (*CandleList, error) {
	cl := &CandleList{
		CoinPair: instId,
		Period:   period,
	}
	service := NewOkxPublicDataService()

	var allCandles []*Candle
	cursor := endTime
	for !cursor.Before(startTime) {
		req := CandlesRequest{
			InstID: instId,
			Bar:    period,
			After:  strconv.FormatInt(cursor.UnixMilli(), 10),
			Limit:  strconv.Itoa(blockSize),
		}
		candles, err := service.GetCandles(req)
		if err != nil {
			return nil, err
		}
		if len(candles) == 0 {
			break
		}
		allCandles = append(allCandles, candles...)

		// Move the cursor to the timestamp of the last candle in this page.
		// NOTE(review): if GetCandles treats After as strictly "earlier than",
		// the extra millisecond makes the next page start with this same candle
		// again — confirm against the service and drop the offset if so.
		last := candles[len(candles)-1]
		lastMillis, err := strconv.ParseInt(last.Timestamp, 10, 64)
		if err != nil {
			// Previously ignored: a zero cursor silently truncated the history.
			return nil, fmt.Errorf("parsing candle timestamp %q: %w", last.Timestamp, err)
		}
		next := time.UnixMilli(lastMillis).Add(time.Millisecond)
		if !next.Before(cursor) {
			// No progress: repeating the identical request would loop forever.
			break
		}
		cursor = next
	}

	fmt.Println("lens of allCandles: ", len(allCandles))
	cl.Candles = allCandles
	return cl, nil
}

// ToJson serializes the candles as a JSON array whose timestamps are formatted
// in Asia/Shanghai time. An empty list serializes as "[]" rather than "null".
// It fails when a candle timestamp cannot be parsed or marshalling fails.
func (cl *CandleList) ToJson() (string, error) {
	// FormattedCandle mirrors a candle with a human-readable timestamp.
	type FormattedCandle struct {
		Timestamp string `json:"timestamp"`
		Open      string `json:"open"`
		High      string `json:"high"`
		Low       string `json:"low"`
		Close     string `json:"close"`
		Volume    string `json:"volume"`
		VolumeCcy string `json:"volumeCcy"`
	}

	// Non-nil and pre-sized so that zero candles encode as [] instead of null.
	formattedCandles := make([]FormattedCandle, 0, len(cl.Candles))
	for _, candle := range cl.Candles {
		formattedTs, err := formatTimestamp(candle.Timestamp)
		if err != nil {
			return "", err
		}
		formattedCandles = append(formattedCandles, FormattedCandle{
			Timestamp: formattedTs,
			Open:      candle.Open,
			High:      candle.High,
			Low:       candle.Low,
			Close:     candle.Close,
			Volume:    candle.Volume,
			VolumeCcy: candle.VolumeCcy,
		})
	}

	jsonData, err := json.Marshal(formattedCandles)
	if err != nil {
		return "", err
	}
	return string(jsonData), nil
}

// ToCsv serializes the candles as CSV text: a header row followed by one row
// per candle with the timestamp formatted in Asia/Shanghai time.
// It fails when a candle timestamp cannot be parsed or the CSV writer reports
// an error.
func (cl *CandleList) ToCsv() (string, error) {
	var sb strings.Builder
	writer := csv.NewWriter(&sb)

	// Header row.
	header := []string{"Timestamp", "Open", "High", "Low", "Close", "Volume", "VolumeCcy"}
	if err := writer.Write(header); err != nil {
		return "", err
	}

	// Debug output.
	fmt.Printf("Number of candles: %d\n", len(cl.Candles))

	for _, candle := range cl.Candles {
		formattedTs, err := formatTimestamp(candle.Timestamp)
		if err != nil {
			return "", err
		}
		record := []string{
			formattedTs,
			candle.Open,
			candle.High,
			candle.Low,
			candle.Close,
			candle.Volume,
			candle.VolumeCcy,
		}
		if err := writer.Write(record); err != nil {
			return "", err
		}
	}

	// Flush buffers the rows into sb; write errors only surface via Error.
	writer.Flush()
	if err := writer.Error(); err != nil {
		return "", err
	}
	return sb.String(), nil
}

// ToFluentd posts the candles to the Fluentd HTTP endpoint named by the
// FluentdURL config value, in batches of at most 50 records.
//
// The tag is "tanya.candle.<pair>.<year>.<period>", where <year> is the current
// wall-clock year (not the year of the candles). Candles whose timestamp cannot
// be parsed are logged and skipped; a candle that is not aligned with the
// period aborts the whole call. The config is loaded lazily, once, right before
// the first batch is sent, so an empty list never touches the config files.
//
// It returns an error for an unsupported period, a misaligned candle, a config
// that cannot be loaded, a transport failure, or a non-200 response.
func (cl *CandleList) ToFluentd() error {
	currentYear := time.Now().Year()
	// Tag format: tanya.candle.<pair>.<year>.<period> (period kept verbatim).
	tag := fmt.Sprintf("tanya.candle.%s.%d.%s", cl.CoinPair, currentYear, cl.Period)

	duration, err := parsePeriod(cl.Period)
	if err != nil {
		return fmt.Errorf("invalid period: %w", err)
	}

	// One client for all batches so connections can be reused.
	client := &http.Client{
		Timeout: 30 * time.Second,
	}
	var fullURL string

	const batchSize = 50
	for i := 0; i < len(cl.Candles); i += batchSize {
		end := i + batchSize
		if end > len(cl.Candles) {
			end = len(cl.Candles)
		}

		records, err := buildFluentdRecords(cl.Candles[i:end], cl.Period, duration)
		if err != nil {
			return err
		}
		if len(records) == 0 {
			// Every candle in this batch was skipped; nothing to send.
			continue
		}

		payload := map[string]interface{}{
			"tag":    tag,
			"record": records,
		}
		jsonData, err := json.Marshal(payload)
		if err != nil {
			return fmt.Errorf("failed to marshal data: %w", err)
		}

		if fullURL == "" {
			config, err := loadFluentdConfig()
			if err != nil {
				return err
			}
			// Full URL with the json query parameter appended.
			fullURL = fmt.Sprintf("%s/%s?json", strings.TrimRight(config.FluentdURL, "/"), tag)
		}

		if err := postFluentdBatch(client, fullURL, jsonData); err != nil {
			return err
		}
		fmt.Printf("Successfully sent %d records to Fluentd\n", len(records))
	}
	return nil
}

// loadFluentdConfig loads the config from the first readable candidate path,
// trying the working directory and then one and two levels above it.
func loadFluentdConfig() (*cfg.Config, error) {
	configPaths := []string{
		"config/config.json",       // config directory under the working directory
		"../config/config.json",    // config directory one level up
		"../../config/config.json", // config directory two levels up
	}

	var lastErr error
	for _, path := range configPaths {
		config, err := cfg.LoadConfig(path)
		if err == nil {
			return config, nil
		}
		lastErr = err
	}
	return nil, fmt.Errorf("failed to load config after trying paths: %w. Tried paths: %v", lastErr, configPaths)
}

// buildFluentdRecords converts one batch of candles into Fluentd records.
// Candles with an unparsable timestamp are logged and skipped; a timestamp that
// is not aligned with the period yields an error.
func buildFluentdRecords(candles []*Candle, period string, duration time.Duration) ([]map[string]interface{}, error) {
	records := make([]map[string]interface{}, 0, len(candles))
	for _, candle := range candles {
		ts, err := strconv.ParseInt(candle.Timestamp, 10, 64)
		if err != nil {
			// Log the offending string; ts is always zero on a parse failure.
			fmt.Println("invalid timestamp:", candle.Timestamp, err)
			continue
		}

		cstTime := time.UnixMilli(ts).In(candleListLocation)
		formattedTime := cstTime.Format(candleListTimeLayout)

		if period == "1D" {
			// Daily bars must open at 00:00:00 Asia/Shanghai time.
			if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 {
				return nil, fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, formattedTime, period)
			}
		} else if ts%duration.Milliseconds() != 0 {
			// Other periods are checked against the Unix epoch (UTC).
			// NOTE(review): for 6H, 12H, D and the multi-day periods this differs
			// from an Asia/Shanghai wall-clock alignment — confirm which one the
			// data source uses.
			return nil, fmt.Errorf("timestamp %d is not aligned with period %s", ts, period)
		}

		records = append(records, map[string]interface{}{
			"_id":       ts, // the millisecond timestamp doubles as the record id
			"dataTime":  formattedTime,
			"open":      candle.Open,
			"high":      candle.High,
			"low":       candle.Low,
			"close":     candle.Close,
			"volume":    candle.Volume,
			"volumeCcy": candle.VolumeCcy,
		})
	}
	return records, nil
}

// postFluentdBatch POSTs one JSON payload to fullURL, logs the request and the
// response, and returns an error unless the response status is 200 OK. Living
// in its own function lets the deferred Close run after every batch instead of
// piling up until the caller returns.
func postFluentdBatch(client *http.Client, fullURL string, jsonData []byte) error {
	// Log the full request URL and body.
	fmt.Printf("Sending request to URL: %s\n", fullURL)
	fmt.Printf("Request Body: %s\n", string(jsonData))

	req, err := http.NewRequest(http.MethodPost, fullURL, bytes.NewReader(jsonData))
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Accept", "application/json")

	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to send data to Fluentd: %w", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("failed to read response body: %w", err)
	}

	// Log the complete response.
	fmt.Printf("HTTP Response Status: %s\n", resp.Status)
	fmt.Printf("HTTP Response Headers: %v\n", resp.Header)
	fmt.Printf("HTTP Response Body: %s\n", string(body))

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status code: %d, response: %s", resp.StatusCode, string(body))
	}
	return nil
}

// parsePeriod converts a bar period name into its time.Duration.
// Supported names are 1m, 3m, 5m, 15m, 30m, 1H, 2H, 4H, 6H, 12H, 1D, D, 2D, 5D
// and 7D; matching is case-sensitive. Any other name yields an error.
func parsePeriod(period string) (time.Duration, error) {
	switch period {
	case "1m":
		return time.Minute, nil
	case "3m":
		return 3 * time.Minute, nil
	case "5m":
		return 5 * time.Minute, nil
	case "15m":
		return 15 * time.Minute, nil
	case "30m":
		return 30 * time.Minute, nil
	case "1H":
		return time.Hour, nil
	case "2H":
		return 2 * time.Hour, nil
	case "4H":
		return 4 * time.Hour, nil
	case "6H":
		return 6 * time.Hour, nil
	case "12H":
		return 12 * time.Hour, nil
	case "1D", "D":
		return 24 * time.Hour, nil
	case "2D":
		return 48 * time.Hour, nil
	case "5D":
		return 120 * time.Hour, nil
	case "7D":
		return 168 * time.Hour, nil
	default:
		return 0, fmt.Errorf("unsupported period: %s", period)
	}
}