diff --git a/config/config.go b/config/config.go
new file mode 100644
index 0000000..41adb8f
--- /dev/null
+++ b/config/config.go
@@ -0,0 +1,26 @@
+package config
+
+import (
+ "encoding/json"
+ "os"
+)
+
+type Config struct {
+ ElasticsearchURL string `json:"elasticsearch_url"`
+}
+
+// LoadConfig 从指定路径加载配置文件
+func LoadConfig(path string) (*Config, error) {
+ file, err := os.ReadFile(path)
+ if err != nil {
+ return nil, err
+ }
+
+ var config Config
+ err = json.Unmarshal(file, &config)
+ if err != nil {
+ return nil, err
+ }
+
+ return &config, nil
+}
diff --git a/config/config.json b/config/config.json
new file mode 100644
index 0000000..5f8eb60
--- /dev/null
+++ b/config/config.json
@@ -0,0 +1,5 @@
+{
+ "elasticsearch_url": "http://fluentd.k8s.xunlang.home"
+}
+
+
diff --git a/config/fluentd.yaml b/config/fluentd.yaml
new file mode 100644
index 0000000..d785d51
--- /dev/null
+++ b/config/fluentd.yaml
@@ -0,0 +1,78 @@
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: fluentd-config
+ namespace: efk
+data:
+ fluent.conf: |
+
+ @type http
+ @id input_http
+ port 8888
+ tag sardine.log # 初始 tag,客户端需指定完整 tag 如 sardine.log.candle.BTC-USDT.15M
+ @label @main
+
+ @type json
+
+
+
+
+
+
+ @type stdout
+ @id output_stdout_all
+
diff --git a/fluentd.conf b/fluentd.conf
new file mode 100644
index 0000000..290ecea
--- /dev/null
+++ b/fluentd.conf
@@ -0,0 +1,61 @@
+
+ @type http
+ @id input_http
+ port 8888
+ @label @main
+
+
+
+
+
+ @type stdout
+ @id output_stdout_all
+
diff --git a/okx/candleList.go b/okx/candleList.go
index 5b0024d..b42e275 100644
--- a/okx/candleList.go
+++ b/okx/candleList.go
@@ -1,9 +1,14 @@
package okx
import (
+ "bytes"
"encoding/csv"
"encoding/json"
"fmt"
+ "io"
+
+ cfg "gitea.zjmud.xyz/phyer/tanya/config" // 请将your_project_path替换为实际的项目路径
+ "net/http"
"strconv"
"strings"
"time"
@@ -15,15 +20,22 @@ func formatTimestamp(ts string) (string, error) {
if err != nil {
return "", err
}
- return time.UnixMilli(millis).Format("2006-01-02 15:04:05"), nil
+ loc, _ := time.LoadLocation("Asia/Shanghai")
+ return time.UnixMilli(millis).In(loc).Format("2006-01-02 15:04:05"), nil
}
// CandleList 封装Candle数组并提供序列化方法
type CandleList struct {
- Candles []*Candle
+ Candles []*Candle
+ CoinPair string // 交易对名称,如 BTC-USDT
+ Period string // 周期名称,如 15M, 1D
}
func MakeCandleList(instId string, period string, startTime time.Time, endTime time.Time, blockSize int) (*CandleList, error) {
+ cl := &CandleList{
+ CoinPair: instId,
+ Period: period,
+ }
service := NewOkxPublicDataService()
var allCandles []*Candle
currentTime := endTime
@@ -49,7 +61,7 @@ func MakeCandleList(instId string, period string, startTime time.Time, endTime t
currentTime = newCurrentTime
}
fmt.Println("lens of allCandles: ", len(allCandles))
- cl := &CandleList{Candles: allCandles}
+ cl.Candles = allCandles
return cl, nil
}
@@ -130,3 +142,188 @@ func (cl *CandleList) ToCsv() (string, error) {
return sb.String(), nil
}
+
+// ToEs 将Candle数据发送到Elasticsearch
+func (cl *CandleList) ToEs() error {
+ // 获取当前年份
+ currentYear := time.Now().Year()
+
+ // 构造tag,格式为:tanya.candle.交易对.年份.周期(保持周期原样)
+ tag := fmt.Sprintf("tanya.candle.%s.%d.%s", cl.CoinPair, currentYear, cl.Period)
+
+ // 解析周期
+ duration, err := parsePeriod(cl.Period)
+ if err != nil {
+ return fmt.Errorf("invalid period: %v", err)
+ }
+
+ // 分批发送,每批最多50条
+ batchSize := 50
+ for i := 0; i < len(cl.Candles); i += batchSize {
+ end := i + batchSize
+ if end > len(cl.Candles) {
+ end = len(cl.Candles)
+ }
+
+ // 准备批量数据
+ var records []map[string]interface{}
+ for _, candle := range cl.Candles[i:end] {
+ // 验证时间戳是否为周期的整数倍
+ ts, err := strconv.ParseInt(candle.Timestamp, 10, 64)
+ if err != nil {
+ fmt.Println("invalid timestamp:", ts, err)
+ continue
+ }
+
+ // 转换为东八区时间
+ loc, _ := time.LoadLocation("Asia/Shanghai")
+ cstTime := time.UnixMilli(ts).In(loc)
+
+ // 对于日线数据,检查是否为当天的 00:00:00
+ if cl.Period == "1D" {
+ if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 {
+ return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period)
+ }
+ } else {
+ // 对于其他周期,使用原来的对齐检查
+ if cstTime.UnixMilli()%duration.Milliseconds() != 0 {
+ return fmt.Errorf("timestamp %d is not aligned with period %s", ts, cl.Period)
+ }
+ }
+
+ // 格式化时间
+ formattedTime := cstTime.Format("2006-01-02 15:04:05")
+
+ // 构造记录
+ record := map[string]interface{}{
+ "_id": ts, // 使用时间戳作为_id
+ "dataTime": formattedTime,
+ "open": candle.Open,
+ "high": candle.High,
+ "low": candle.Low,
+ "close": candle.Close,
+ "volume": candle.Volume,
+ "volumeCcy": candle.VolumeCcy,
+ }
+
+ records = append(records, record)
+ }
+
+ // 构造请求体
+ payload := map[string]interface{}{
+ "tag": tag,
+ "record": records,
+ }
+
+ jsonData, err := json.Marshal(payload)
+ if err != nil {
+ return fmt.Errorf("failed to marshal data: %v", err)
+ }
+
+ // 尝试从不同层级加载配置
+ var config *cfg.Config
+ configPaths := []string{
+ "config/config.json", // 当前目录下的config目录
+ "../config/config.json", // 上一级目录下的config目录
+ "../../config/config.json", // 上两级目录下的config目录
+ }
+
+ for _, path := range configPaths {
+ config, err = cfg.LoadConfig(path)
+ if err == nil {
+ break
+ }
+ }
+
+ if err != nil {
+ return fmt.Errorf("failed to load config after trying paths: %v. Tried paths: %v", err, configPaths)
+ }
+
+ // 构造完整URL,添加json参数
+ fullURL := fmt.Sprintf("%s/%s?json", strings.TrimRight(config.ElasticsearchURL, "/"), tag)
+
+ // 输出完整请求URL和请求体到日志
+ fmt.Printf("Sending request to URL: %s\n", fullURL)
+ fmt.Printf("Request Body: %s\n", string(jsonData))
+
+ // 创建带超时的HTTP客户端
+ client := &http.Client{
+ Timeout: 30 * time.Second,
+ }
+
+ // 创建请求
+ req, err := http.NewRequest("POST", fullURL, bytes.NewBuffer(jsonData))
+ if err != nil {
+ return fmt.Errorf("failed to create request: %v", err)
+ }
+
+ // 设置请求头
+ req.Header.Set("Content-Type", "application/json")
+ req.Header.Set("Accept", "application/json")
+
+ // 发送HTTP请求
+ resp, err := client.Do(req)
+ if err != nil {
+ return fmt.Errorf("failed to send data to Fluentd: %v", err)
+ }
+ defer resp.Body.Close()
+
+ // 读取响应体
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return fmt.Errorf("failed to read response body: %v", err)
+ }
+
+ // 输出完整的响应信息
+ fmt.Printf("HTTP Response Status: %s\n", resp.Status)
+ fmt.Printf("HTTP Response Headers: %v\n", resp.Header)
+ fmt.Printf("HTTP Response Body: %s\n", string(body))
+
+ if resp.StatusCode != http.StatusOK {
+ return fmt.Errorf("unexpected status code: %d, response: %s", resp.StatusCode, string(body))
+ }
+ //回头把response列出来看看,是不是有报错
+
+ fmt.Printf("Successfully sent %d records to Fluentd\n", len(records))
+ }
+
+ return nil
+}
+
+// parsePeriod 将周期字符串转换为time.Duration
+func parsePeriod(period string) (time.Duration, error) {
+ switch period {
+ case "1m":
+ return time.Minute, nil
+ case "3m":
+ return 3 * time.Minute, nil
+ case "5m":
+ return 5 * time.Minute, nil
+ case "15m":
+ return 15 * time.Minute, nil
+ case "30m":
+ return 30 * time.Minute, nil
+ case "1H":
+ return time.Hour, nil
+ case "2H":
+ return 2 * time.Hour, nil
+ case "4H":
+ return 4 * time.Hour, nil
+ case "6H":
+ return 6 * time.Hour, nil
+ case "12H":
+ return 12 * time.Hour, nil
+ case "1D":
+ return 24 * time.Hour, nil
+ case "D":
+ return 24 * time.Hour, nil
+ case "2D":
+ return 48 * time.Hour, nil
+ case "5D":
+ return 120 * time.Hour, nil
+ case "7D":
+ return 168 * time.Hour, nil
+ default:
+ return 0, fmt.Errorf("unsupported period: %s", period)
+ }
+}
diff --git a/okx/publicApiService.go b/okx/publicApiService.go
index 3f2945a..307bd8f 100644
--- a/okx/publicApiService.go
+++ b/okx/publicApiService.go
@@ -1,11 +1,14 @@
package okx
import (
+ "bytes"
"encoding/json"
"fmt"
"github.com/google/go-querystring/query" // 用于将 struct 转为 URL 参数,需要安装:go get github.com/google/go-querystring
+ "io"
"net/http"
"net/url"
+ "time"
)
type OkxPublicDataService struct {
@@ -17,7 +20,9 @@ type OkxPublicDataService struct {
func NewOkxPublicDataService() *OkxPublicDataService {
return &OkxPublicDataService{
BaseURL: "https://aws.okx.com/api/v5",
- client: &http.Client{},
+ client: &http.Client{
+ Timeout: 10 * time.Second, // 设置10秒超时
+ },
}
}
@@ -48,7 +53,25 @@ func (s *OkxPublicDataService) GetInstruments(params InstrumentsRequest) ([]Inst
}
if apiResp.Code != "0" {
- return nil, fmt.Errorf("API error: %s", apiResp.Msg)
+ return nil, fmt.Errorf("API error: code=%s, msg=%s, request_url=%s", apiResp.Code, apiResp.Msg, u.String())
+ }
+
+ // 进行类型断言并检查长度
+ switch data := apiResp.Data.(type) {
+ case []Instrument:
+ if len(data) == 0 {
+ return nil, fmt.Errorf("no data returned from API")
+ }
+ case [][]string:
+ if len(data) == 0 {
+ return nil, fmt.Errorf("no data returned from API")
+ }
+ case []*Ticker:
+ if len(data) == 0 {
+ return nil, fmt.Errorf("no data returned from API")
+ }
+ default:
+ return nil, fmt.Errorf("unexpected data type from API: %T", apiResp.Data)
}
return *apiResp.Data.(*[]Instrument), nil
@@ -68,12 +91,53 @@ func (s *OkxPublicDataService) GetCandles(params CandlesRequest) ([]*Candle, err
}
u.RawQuery = q.Encode()
- resp, err := http.Get(u.String())
+ // 添加调试日志
+ fmt.Printf("Making request to: %s\n", u.String())
+
+ // 创建自定义HTTP请求
+ req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("failed to create request: %v", err)
}
+
+ // 设置请求头
+ req.Header.Set("Accept", "application/json")
+ req.Header.Set("Content-Type", "application/json")
+
+ // 发送请求
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return nil, fmt.Errorf("request failed: %v", err)
+ }
+
defer resp.Body.Close()
+ // 打印详细调试信息
+ fmt.Printf("Request URL: %s\n", u.String())
+ fmt.Printf("Response Status: %s\n", resp.Status)
+ fmt.Printf("Response Headers: %v\n", resp.Header)
+
+ // 记录响应状态
+ fmt.Printf("Response Status: %s\n", resp.Status)
+
+ // 读取并记录响应体
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read response body: %v", err)
+ }
+
+ // 打印响应体长度和内容
+ fmt.Printf("Response Body Length: %d\n", len(body))
+ if len(body) > 0 {
+ fmt.Printf("Response Body: %s\n", string(body))
+ } else {
+ fmt.Println("Response Body is empty")
+ }
+ fmt.Printf("Response Body: %s\n", string(body))
+
+ // 重新设置resp.Body以便后续解码
+ resp.Body = io.NopCloser(bytes.NewBuffer(body))
+
var apiResp struct {
Code string `json:"code"`
Msg string `json:"msg"`
diff --git a/test/okxAli/candleList_test.go b/test/okxAli/candleList_test.go
index b931544..4314226 100644
--- a/test/okxAli/candleList_test.go
+++ b/test/okxAli/candleList_test.go
@@ -8,8 +8,8 @@ import (
)
func TestCandleList_ToJson(t *testing.T) {
- startTime := time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)
- endTime := time.Date(2022, 12, 31, 0, 0, 0, 0, time.UTC)
+ startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
+ endTime := time.Date(2023, 12, 31, 0, 0, 0, 0, time.UTC)
cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50)
if err != nil {
t.Fatalf("ToJson failed: %v", err)
@@ -30,8 +30,8 @@ func TestCandleList_ToJson(t *testing.T) {
}
func TestCandleList_ToCsv(t *testing.T) {
- startTime := time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)
- endTime := time.Date(2022, 12, 31, 0, 0, 0, 0, time.UTC)
+ startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
+ endTime := time.Date(2023, 12, 31, 0, 0, 0, 0, time.UTC)
cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50)
if err != nil {
t.Fatalf("ToJson failed: %v", err)
@@ -50,3 +50,16 @@ func TestCandleList_ToCsv(t *testing.T) {
t.Logf("CSV output written to %s", tmpFile)
}
+
+func TestCandleList_ToEs(t *testing.T) {
+ startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
+ endTime := time.Date(2023, 12, 31, 0, 0, 0, 0, time.UTC)
+ cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50)
+ if err != nil {
+ t.Fatalf("ToEs failed: %v", err)
+ }
+ err = cl.ToEs()
+ if err != nil {
+ t.Fatalf("ToEs failed: %v", err)
+ }
+}
diff --git a/test/okxAli/get_candles_test.go b/test/okxAli/get_candles_test.go
index 50bf1ca..172cfb9 100644
--- a/test/okxAli/get_candles_test.go
+++ b/test/okxAli/get_candles_test.go
@@ -21,10 +21,23 @@ func TestGetCandles(t *testing.T) {
t.Fatalf("Expected no error, got %v", err)
}
+ // 添加详细验证
if len(candles) == 0 {
t.Fatal("Expected at least one candle, got none")
}
+ // 检查第一个蜡烛数据
+ firstCandle := candles[0]
+ if firstCandle.Timestamp == "" {
+ t.Error("Expected non-empty Timestamp")
+ }
+ if firstCandle.Open == "" || firstCandle.High == "" || firstCandle.Low == "" || firstCandle.Close == "" {
+ t.Error("Expected non-empty OHLC values")
+ }
+ if firstCandle.Volume == "" || firstCandle.VolumeCcy == "" {
+ t.Error("Expected non-empty Volume and VolumeCcy")
+ }
+
// 检查返回数据的结构
for _, candle := range candles {
if candle.Timestamp == "" {