This commit is contained in:
zhangkun9038@dingtalk.com 2025-03-10 11:00:46 +08:00
parent 64fe4a18b0
commit 7202a98998
8 changed files with 468 additions and 11 deletions

26
config/config.go Normal file
View File

@ -0,0 +1,26 @@
package config
import (
"encoding/json"
"os"
)
type Config struct {
ElasticsearchURL string `json:"elasticsearch_url"`
}
// LoadConfig 从指定路径加载配置文件
func LoadConfig(path string) (*Config, error) {
file, err := os.ReadFile(path)
if err != nil {
return nil, err
}
var config Config
err = json.Unmarshal(file, &config)
if err != nil {
return nil, err
}
return &config, nil
}

5
config/config.json Normal file
View File

@ -0,0 +1,5 @@
{
"elasticsearch_url": "http://fluentd.k8s.xunlang.home"
}

78
config/fluentd.yaml Normal file
View File

@ -0,0 +1,78 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: fluentd-config
namespace: efk
data:
fluent.conf: |
<source>
@type http
@id input_http
port 8888
tag sardine.log # 初始 tag客户端需指定完整 tag 如 sardine.log.candle.BTC-USDT.15M
@label @main
<parse>
@type json
</parse>
</source>
<label @main>
<filter sardine.log.candle.**>
@type record_transformer
enable_ruby true
<record>
coin_pair ${tag_parts[3] || "UNKNOWN"} # 从 tag 第 4 部分提取,如 BTC-USDT
timeframe ${tag_parts[4] || "UNKNOWN"} # 从 tag 第 5 部分提取,如 15M
year ${time.strftime("%Y")} # 从 timestamp 提取年份
# 生成唯一 _id例如 BTC-USDT_15M_2024-03-06-12:00:00
unique_id "${tag_parts[3]}_${tag_parts[4]}_#{record['timestamp'].gsub(/[: ]/, '-')}"
# 修改索引名称,加入 candle
index_name "logstash_candle_${tag_parts[3]}_${time.strftime('%Y')}_${tag_parts[4]}"
</record>
</filter>
<match sardine.log.candle.**>
@type copy
<store>
@type elasticsearch
@id output_elasticsearch_custom
host elasticsearch
port 9200
scheme http
user fluentd_user
password fluentd_password
logstash_format false
index_name ${record["index_name"]} # 动态索引,如 logstash_candle_BTC-USDT_2024_15M
id_key unique_id # 使用 unique_id 作为 _id 去重
flush_interval 5s
@log_level debug
remove_keys _id, coin_pair, timeframe, year, unique_id, index_name
</store>
# 保留原有按日期切分的输出(不加 candle
<store>
@type elasticsearch
@id output_elasticsearch
host elasticsearch
port 9200
scheme http
user fluentd_user
password fluentd_password
logstash_format true
logstash_prefix logstash
logstash_dateformat %Y.%m.%d
id_key unique_id # 同样使用 unique_id 去重
flush_interval 5s
@log_level debug
remove_keys _id, unique_id
</store>
<store>
@type stdout
@id output_stdout
</store>
</match>
</label>
<match **>
@type stdout
@id output_stdout_all
</match>

61
fluentd.conf Normal file
View File

@ -0,0 +1,61 @@
<source>
@type http
@id input_http
port 8888
@label @main
</source>
<label @main>
<match sardine.log.**>
@type copy
<store>
@type elasticsearch
@id output_elasticsearch
host elasticsearch
port 9200
scheme http
user fluentd_user
password fluentd_password
logstash_format true
logstash_prefix logstash
logstash_dateformat %Y.%m.%d
flush_interval 5s
@log_level debug
id_key _id
remove_keys _id
</store>
<store>
@type stdout
@id output_stdout
</store>
</match>
<match tanya.**>
@type copy
<store>
@type elasticsearch
@id output_elasticsearch_tanya
host elasticsearch
port 9200
scheme http
user fluentd_user
password fluentd_password
index_name candle_${tag_parts[1]}_${tag_parts[2]}_${tag_parts[3]}_${tag_parts[4]}
flush_interval 5s
@log_level debug
id_key _id
remove_keys _id
include_tag_key true
tag_key @log_name
</store>
<store>
@type stdout
@id output_stdout_tanya
</store>
</match>
</label>
<match **>
@type stdout
@id output_stdout_all
</match>

View File

@ -1,9 +1,14 @@
package okx package okx
import ( import (
"bytes"
"encoding/csv" "encoding/csv"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
cfg "gitea.zjmud.xyz/phyer/tanya/config" // 请将your_project_path替换为实际的项目路径
"net/http"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -15,15 +20,22 @@ func formatTimestamp(ts string) (string, error) {
if err != nil { if err != nil {
return "", err return "", err
} }
return time.UnixMilli(millis).Format("2006-01-02 15:04:05"), nil loc, _ := time.LoadLocation("Asia/Shanghai")
return time.UnixMilli(millis).In(loc).Format("2006-01-02 15:04:05"), nil
} }
// CandleList 封装Candle数组并提供序列化方法 // CandleList 封装Candle数组并提供序列化方法
type CandleList struct { type CandleList struct {
Candles []*Candle Candles []*Candle
CoinPair string // 交易对名称,如 BTC-USDT
Period string // 周期名称,如 15M, 1D
} }
func MakeCandleList(instId string, period string, startTime time.Time, endTime time.Time, blockSize int) (*CandleList, error) { func MakeCandleList(instId string, period string, startTime time.Time, endTime time.Time, blockSize int) (*CandleList, error) {
cl := &CandleList{
CoinPair: instId,
Period: period,
}
service := NewOkxPublicDataService() service := NewOkxPublicDataService()
var allCandles []*Candle var allCandles []*Candle
currentTime := endTime currentTime := endTime
@ -49,7 +61,7 @@ func MakeCandleList(instId string, period string, startTime time.Time, endTime t
currentTime = newCurrentTime currentTime = newCurrentTime
} }
fmt.Println("lens of allCandles: ", len(allCandles)) fmt.Println("lens of allCandles: ", len(allCandles))
cl := &CandleList{Candles: allCandles} cl.Candles = allCandles
return cl, nil return cl, nil
} }
@ -130,3 +142,188 @@ func (cl *CandleList) ToCsv() (string, error) {
return sb.String(), nil return sb.String(), nil
} }
// ToEs 将Candle数据发送到Elasticsearch
func (cl *CandleList) ToEs() error {
// 获取当前年份
currentYear := time.Now().Year()
// 构造tag格式为tanya.candle.交易对.年份.周期(保持周期原样)
tag := fmt.Sprintf("tanya.candle.%s.%d.%s", cl.CoinPair, currentYear, cl.Period)
// 解析周期
duration, err := parsePeriod(cl.Period)
if err != nil {
return fmt.Errorf("invalid period: %v", err)
}
// 分批发送每批最多50条
batchSize := 50
for i := 0; i < len(cl.Candles); i += batchSize {
end := i + batchSize
if end > len(cl.Candles) {
end = len(cl.Candles)
}
// 准备批量数据
var records []map[string]interface{}
for _, candle := range cl.Candles[i:end] {
// 验证时间戳是否为周期的整数倍
ts, err := strconv.ParseInt(candle.Timestamp, 10, 64)
if err != nil {
fmt.Println("invalid timestamp:", ts, err)
continue
}
// 转换为东八区时间
loc, _ := time.LoadLocation("Asia/Shanghai")
cstTime := time.UnixMilli(ts).In(loc)
// 对于日线数据,检查是否为当天的 00:00:00
if cl.Period == "1D" {
if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 {
return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period)
}
} else {
// 对于其他周期,使用原来的对齐检查
if cstTime.UnixMilli()%duration.Milliseconds() != 0 {
return fmt.Errorf("timestamp %d is not aligned with period %s", ts, cl.Period)
}
}
// 格式化时间
formattedTime := cstTime.Format("2006-01-02 15:04:05")
// 构造记录
record := map[string]interface{}{
"_id": ts, // 使用时间戳作为_id
"dataTime": formattedTime,
"open": candle.Open,
"high": candle.High,
"low": candle.Low,
"close": candle.Close,
"volume": candle.Volume,
"volumeCcy": candle.VolumeCcy,
}
records = append(records, record)
}
// 构造请求体
payload := map[string]interface{}{
"tag": tag,
"record": records,
}
jsonData, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("failed to marshal data: %v", err)
}
// 尝试从不同层级加载配置
var config *cfg.Config
configPaths := []string{
"config/config.json", // 当前目录下的config目录
"../config/config.json", // 上一级目录下的config目录
"../../config/config.json", // 上两级目录下的config目录
}
for _, path := range configPaths {
config, err = cfg.LoadConfig(path)
if err == nil {
break
}
}
if err != nil {
return fmt.Errorf("failed to load config after trying paths: %v. Tried paths: %v", err, configPaths)
}
// 构造完整URL添加json参数
fullURL := fmt.Sprintf("%s/%s?json", strings.TrimRight(config.ElasticsearchURL, "/"), tag)
// 输出完整请求URL和请求体到日志
fmt.Printf("Sending request to URL: %s\n", fullURL)
fmt.Printf("Request Body: %s\n", string(jsonData))
// 创建带超时的HTTP客户端
client := &http.Client{
Timeout: 30 * time.Second,
}
// 创建请求
req, err := http.NewRequest("POST", fullURL, bytes.NewBuffer(jsonData))
if err != nil {
return fmt.Errorf("failed to create request: %v", err)
}
// 设置请求头
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
// 发送HTTP请求
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to send data to Fluentd: %v", err)
}
defer resp.Body.Close()
// 读取响应体
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response body: %v", err)
}
// 输出完整的响应信息
fmt.Printf("HTTP Response Status: %s\n", resp.Status)
fmt.Printf("HTTP Response Headers: %v\n", resp.Header)
fmt.Printf("HTTP Response Body: %s\n", string(body))
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %d, response: %s", resp.StatusCode, string(body))
}
//回头把response列出来看看,是不是有报错
fmt.Printf("Successfully sent %d records to Fluentd\n", len(records))
}
return nil
}
// parsePeriod 将周期字符串转换为time.Duration
func parsePeriod(period string) (time.Duration, error) {
switch period {
case "1m":
return time.Minute, nil
case "3m":
return 3 * time.Minute, nil
case "5m":
return 5 * time.Minute, nil
case "15m":
return 15 * time.Minute, nil
case "30m":
return 30 * time.Minute, nil
case "1H":
return time.Hour, nil
case "2H":
return 2 * time.Hour, nil
case "4H":
return 4 * time.Hour, nil
case "6H":
return 6 * time.Hour, nil
case "12H":
return 12 * time.Hour, nil
case "1D":
return 24 * time.Hour, nil
case "D":
return 24 * time.Hour, nil
case "2D":
return 48 * time.Hour, nil
case "5D":
return 120 * time.Hour, nil
case "7D":
return 168 * time.Hour, nil
default:
return 0, fmt.Errorf("unsupported period: %s", period)
}
}

View File

@ -1,11 +1,14 @@
package okx package okx
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/google/go-querystring/query" // 用于将 struct 转为 URL 参数需要安装go get github.com/google/go-querystring "github.com/google/go-querystring/query" // 用于将 struct 转为 URL 参数需要安装go get github.com/google/go-querystring
"io"
"net/http" "net/http"
"net/url" "net/url"
"time"
) )
type OkxPublicDataService struct { type OkxPublicDataService struct {
@ -17,7 +20,9 @@ type OkxPublicDataService struct {
func NewOkxPublicDataService() *OkxPublicDataService { func NewOkxPublicDataService() *OkxPublicDataService {
return &OkxPublicDataService{ return &OkxPublicDataService{
BaseURL: "https://aws.okx.com/api/v5", BaseURL: "https://aws.okx.com/api/v5",
client: &http.Client{}, client: &http.Client{
Timeout: 10 * time.Second, // 设置10秒超时
},
} }
} }
@ -48,7 +53,25 @@ func (s *OkxPublicDataService) GetInstruments(params InstrumentsRequest) ([]Inst
} }
if apiResp.Code != "0" { if apiResp.Code != "0" {
return nil, fmt.Errorf("API error: %s", apiResp.Msg) return nil, fmt.Errorf("API error: code=%s, msg=%s, request_url=%s", apiResp.Code, apiResp.Msg, u.String())
}
// 进行类型断言并检查长度
switch data := apiResp.Data.(type) {
case []Instrument:
if len(data) == 0 {
return nil, fmt.Errorf("no data returned from API")
}
case [][]string:
if len(data) == 0 {
return nil, fmt.Errorf("no data returned from API")
}
case []*Ticker:
if len(data) == 0 {
return nil, fmt.Errorf("no data returned from API")
}
default:
return nil, fmt.Errorf("unexpected data type from API: %T", apiResp.Data)
} }
return *apiResp.Data.(*[]Instrument), nil return *apiResp.Data.(*[]Instrument), nil
@ -68,12 +91,53 @@ func (s *OkxPublicDataService) GetCandles(params CandlesRequest) ([]*Candle, err
} }
u.RawQuery = q.Encode() u.RawQuery = q.Encode()
resp, err := http.Get(u.String()) // 添加调试日志
fmt.Printf("Making request to: %s\n", u.String())
// 创建自定义HTTP请求
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("failed to create request: %v", err)
} }
// 设置请求头
req.Header.Set("Accept", "application/json")
req.Header.Set("Content-Type", "application/json")
// 发送请求
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("request failed: %v", err)
}
defer resp.Body.Close() defer resp.Body.Close()
// 打印详细调试信息
fmt.Printf("Request URL: %s\n", u.String())
fmt.Printf("Response Status: %s\n", resp.Status)
fmt.Printf("Response Headers: %v\n", resp.Header)
// 记录响应状态
fmt.Printf("Response Status: %s\n", resp.Status)
// 读取并记录响应体
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %v", err)
}
// 打印响应体长度和内容
fmt.Printf("Response Body Length: %d\n", len(body))
if len(body) > 0 {
fmt.Printf("Response Body: %s\n", string(body))
} else {
fmt.Println("Response Body is empty")
}
fmt.Printf("Response Body: %s\n", string(body))
// 重新设置resp.Body以便后续解码
resp.Body = io.NopCloser(bytes.NewBuffer(body))
var apiResp struct { var apiResp struct {
Code string `json:"code"` Code string `json:"code"`
Msg string `json:"msg"` Msg string `json:"msg"`

View File

@ -8,8 +8,8 @@ import (
) )
func TestCandleList_ToJson(t *testing.T) { func TestCandleList_ToJson(t *testing.T) {
startTime := time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC) startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
endTime := time.Date(2022, 12, 31, 0, 0, 0, 0, time.UTC) endTime := time.Date(2023, 12, 31, 0, 0, 0, 0, time.UTC)
cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50) cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50)
if err != nil { if err != nil {
t.Fatalf("ToJson failed: %v", err) t.Fatalf("ToJson failed: %v", err)
@ -30,8 +30,8 @@ func TestCandleList_ToJson(t *testing.T) {
} }
func TestCandleList_ToCsv(t *testing.T) { func TestCandleList_ToCsv(t *testing.T) {
startTime := time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC) startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
endTime := time.Date(2022, 12, 31, 0, 0, 0, 0, time.UTC) endTime := time.Date(2023, 12, 31, 0, 0, 0, 0, time.UTC)
cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50) cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50)
if err != nil { if err != nil {
t.Fatalf("ToJson failed: %v", err) t.Fatalf("ToJson failed: %v", err)
@ -50,3 +50,16 @@ func TestCandleList_ToCsv(t *testing.T) {
t.Logf("CSV output written to %s", tmpFile) t.Logf("CSV output written to %s", tmpFile)
} }
func TestCandleList_ToEs(t *testing.T) {
startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
endTime := time.Date(2023, 12, 31, 0, 0, 0, 0, time.UTC)
cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50)
if err != nil {
t.Fatalf("ToEs failed: %v", err)
}
err = cl.ToEs()
if err != nil {
t.Fatalf("ToEs failed: %v", err)
}
}

View File

@ -21,10 +21,23 @@ func TestGetCandles(t *testing.T) {
t.Fatalf("Expected no error, got %v", err) t.Fatalf("Expected no error, got %v", err)
} }
// 添加详细验证
if len(candles) == 0 { if len(candles) == 0 {
t.Fatal("Expected at least one candle, got none") t.Fatal("Expected at least one candle, got none")
} }
// 检查第一个蜡烛数据
firstCandle := candles[0]
if firstCandle.Timestamp == "" {
t.Error("Expected non-empty Timestamp")
}
if firstCandle.Open == "" || firstCandle.High == "" || firstCandle.Low == "" || firstCandle.Close == "" {
t.Error("Expected non-empty OHLC values")
}
if firstCandle.Volume == "" || firstCandle.VolumeCcy == "" {
t.Error("Expected non-empty Volume and VolumeCcy")
}
// 检查返回数据的结构 // 检查返回数据的结构
for _, candle := range candles { for _, candle := range candles {
if candle.Timestamp == "" { if candle.Timestamp == "" {