candleList.ToElastic() 实现了, 支持自定义的index, 和_id, 绕过了fluentd
This commit is contained in:
		
							parent
							
								
									f0814c40bc
								
							
						
					
					
						commit
						50feefda56
					
				| @ -6,7 +6,10 @@ import ( | |||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| type Config struct { | type Config struct { | ||||||
| 	FluentdURL string `json:"fluentdURL_url"` | 	FluentdURL            string `json:"fluentdURL"` | ||||||
|  | 	ElasticsearchURL      string `json:"ElasticsearchURL"` | ||||||
|  | 	ElasticsearchUser     string `json:"ElasticsearchUser"` | ||||||
|  | 	ElasticsearchPassword string `json:"ElasticsearchPassword"` | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // LoadConfig 从指定路径加载配置文件 | // LoadConfig 从指定路径加载配置文件 | ||||||
|  | |||||||
| @ -1,5 +1,8 @@ | |||||||
| { | { | ||||||
|     "fluentdURL_url": "http://fluentd.k8s.xunlang.home" |     "fluentdURL": "http://fluentd.k8s.xunlang.home", | ||||||
|  |     "ElasticsearchURL": "http://elastic.k8s.xunlang.home", | ||||||
|  |     "ElasticsearchUser": "fluentd_user", | ||||||
|  |     "ElasticsearchPassword": "fluentd_password" | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -326,3 +326,130 @@ func parsePeriod(period string) (time.Duration, error) { | |||||||
| 		return 0, fmt.Errorf("unsupported period: %s", period) | 		return 0, fmt.Errorf("unsupported period: %s", period) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | // ToElastic 将Candle数据发送到Elasticsearch | ||||||
|  | func (cl *CandleList) ToElastic() error { | ||||||
|  | 	// 获取当前年份 | ||||||
|  | 	currentYear := time.Now().Year() | ||||||
|  | 
 | ||||||
|  | 	// 构造索引名称,格式为:logstash-candle.交易对.年份.周期(保持周期原样) | ||||||
|  | 	index := fmt.Sprintf("logstash-candle.%s.%d.%s", strings.ToLower(cl.CoinPair), currentYear, strings.ToLower(cl.Period)) | ||||||
|  | 
 | ||||||
|  | 	// 创建带超时的HTTP客户端 | ||||||
|  | 	client := &http.Client{ | ||||||
|  | 		Timeout: 30 * time.Second, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// 获取周期的 duration | ||||||
|  | 	duration, err := parsePeriod(cl.Period) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return fmt.Errorf("failed to parse period: %v", err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for _, candle := range cl.Candles { | ||||||
|  | 		// 验证时间戳是否为周期的整数倍 | ||||||
|  | 		ts, err := strconv.ParseInt(candle.Timestamp, 10, 64) | ||||||
|  | 		if err != nil { | ||||||
|  | 			fmt.Println("invalid timestamp:", ts, err) | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		// 转换为东八区时间 | ||||||
|  | 		loc, _ := time.LoadLocation("Asia/Shanghai") | ||||||
|  | 		cstTime := time.UnixMilli(ts).In(loc) | ||||||
|  | 
 | ||||||
|  | 		// 添加调试信息 | ||||||
|  | 		fmt.Printf("Timestamp: %d, CST Time: %s, Period: %s\n", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) | ||||||
|  | 
 | ||||||
|  | 		// 对于日线数据,检查是否为当天的 00:00:00 | ||||||
|  | 		if cl.Period == "1D" { | ||||||
|  | 			if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 { | ||||||
|  | 				return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) | ||||||
|  | 			} | ||||||
|  | 		} else { | ||||||
|  | 			// 对于其他周期,使用原来的对齐检查 | ||||||
|  | 			if cstTime.UnixMilli()%duration.Milliseconds() != 0 { | ||||||
|  | 				return fmt.Errorf("timestamp %d is not aligned with period %s", ts, cl.Period) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		// 构造记录 | ||||||
|  | 		record := map[string]interface{}{ | ||||||
|  | 			"dataTime":  cstTime.Format("2006-01-02 15:04:05"), | ||||||
|  | 			"open":      candle.Open, | ||||||
|  | 			"high":      candle.High, | ||||||
|  | 			"low":       candle.Low, | ||||||
|  | 			"close":     candle.Close, | ||||||
|  | 			"volume":    candle.Volume, | ||||||
|  | 			"volumeCcy": candle.VolumeCcy, | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		// 构造请求体 | ||||||
|  | 		jsonData, err := json.Marshal(record) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return fmt.Errorf("failed to marshal data: %v", err) | ||||||
|  | 		} | ||||||
|  | 		var config *cfg.Config | ||||||
|  | 		configPaths := []string{ | ||||||
|  | 			"config/config.json",       // 当前目录下的config目录 | ||||||
|  | 			"../config/config.json",    // 上一级目录下的config目录 | ||||||
|  | 			"../../config/config.json", // 上两级目录下的config目录 | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		var loadErr error | ||||||
|  | 		for _, path := range configPaths { | ||||||
|  | 			config, loadErr = cfg.LoadConfig(path) | ||||||
|  | 			if loadErr == nil { | ||||||
|  | 				break | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if config == nil { | ||||||
|  | 			return fmt.Errorf("failed to load configuration: %v", loadErr) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		// 构造完整URL | ||||||
|  | 		fullURL := fmt.Sprintf("%s/%s/_doc/%d", strings.TrimRight(config.ElasticsearchURL, "/"), index, ts) | ||||||
|  | 		fmt.Println("fullURL: ", fullURL) | ||||||
|  | 
 | ||||||
|  | 		// 创建请求 | ||||||
|  | 		req, err := http.NewRequest("POST", fullURL, bytes.NewBuffer(jsonData)) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return fmt.Errorf("failed to create request: %v", err) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		// 设置请求头 | ||||||
|  | 		req.Header.Set("Content-Type", "application/json") | ||||||
|  | 		req.Header.Set("Accept", "application/json") | ||||||
|  | 		// 尝试从不同层级加载配置 | ||||||
|  | 
 | ||||||
|  | 		// 设置基本认证 | ||||||
|  | 		req.SetBasicAuth(config.ElasticsearchUser, config.ElasticsearchPassword) | ||||||
|  | 
 | ||||||
|  | 		// 发送HTTP请求 | ||||||
|  | 		resp, err := client.Do(req) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return fmt.Errorf("failed to send data to Elasticsearch: %v", err) | ||||||
|  | 		} | ||||||
|  | 		defer resp.Body.Close() | ||||||
|  | 
 | ||||||
|  | 		// 读取响应体 | ||||||
|  | 		body, err := io.ReadAll(resp.Body) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return fmt.Errorf("failed to read response body: %v", err) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		// 输出完整的响应信息 | ||||||
|  | 		fmt.Printf("HTTP Response Status: %s\n", resp.Status) | ||||||
|  | 		fmt.Printf("HTTP Response Headers: %v\n", resp.Header) | ||||||
|  | 		fmt.Printf("HTTP Response Body: %s\n", string(body)) | ||||||
|  | 
 | ||||||
|  | 		if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { | ||||||
|  | 			return fmt.Errorf("unexpected status code: %d, response: %s", resp.StatusCode, string(body)) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		fmt.Printf("Successfully sent record to Elasticsearch: %s\n", fullURL) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | |||||||
| @ -2,63 +2,75 @@ package okx | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	. "gitea.zjmud.xyz/phyer/tanya/okx" | 	. "gitea.zjmud.xyz/phyer/tanya/okx" | ||||||
| 	"os" | 	// "os" | ||||||
| 	"testing" | 	"testing" | ||||||
| 	"time" | 	"time" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func TestCandleList_ToJson(t *testing.T) { | // func TestCandleList_ToJson(t *testing.T) { | ||||||
| 	startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC) | // 	startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC) | ||||||
| 	endTime := time.Date(2023, 12, 31, 0, 0, 0, 0, time.UTC) | // 	endTime := time.Date(2023, 12, 31, 0, 0, 0, 0, time.UTC) | ||||||
| 	cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50) | // 	cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50) | ||||||
| 	if err != nil { | // 	if err != nil { | ||||||
| 		t.Fatalf("ToJson failed: %v", err) | // 		t.Fatalf("ToJson failed: %v", err) | ||||||
| 	} | // 	} | ||||||
| 	jsonStr, err := cl.ToJson() | // 	jsonStr, err := cl.ToJson() | ||||||
| 	if err != nil { | // 	if err != nil { | ||||||
| 		t.Fatalf("ToJson failed: %v", err) | // 		t.Fatalf("ToJson failed: %v", err) | ||||||
| 	} | // 	} | ||||||
|  | // | ||||||
|  | // 	// Write to temp file | ||||||
|  | // 	tmpFile := "/tmp/candle_test.json" | ||||||
|  | // 	err = os.WriteFile(tmpFile, []byte(jsonStr), 0644) | ||||||
|  | // 	if err != nil { | ||||||
|  | // 		t.Fatalf("Failed to write json file: %v", err) | ||||||
|  | // 	} | ||||||
|  | // | ||||||
|  | // 	t.Logf("JSON output written to %s", tmpFile) | ||||||
|  | // } | ||||||
|  | // | ||||||
|  | // func TestCandleList_ToCsv(t *testing.T) { | ||||||
|  | // 	startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC) | ||||||
|  | // 	endTime := time.Date(2023, 12, 31, 0, 0, 0, 0, time.UTC) | ||||||
|  | // 	cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50) | ||||||
|  | // 	if err != nil { | ||||||
|  | // 		t.Fatalf("ToJson failed: %v", err) | ||||||
|  | // 	} | ||||||
|  | // 	csvStr, err := cl.ToCsv() | ||||||
|  | // 	if err != nil { | ||||||
|  | // 		t.Fatalf("ToCsv failed: %v", err) | ||||||
|  | // 	} | ||||||
|  | // | ||||||
|  | // 	// Write to temp file | ||||||
|  | // 	tmpFile := "/tmp/candle_test.csv" | ||||||
|  | // 	err = os.WriteFile(tmpFile, []byte(csvStr), 0644) | ||||||
|  | // 	if err != nil { | ||||||
|  | // 		t.Fatalf("Failed to write csv file: %v", err) | ||||||
|  | // 	} | ||||||
|  | // | ||||||
|  | // 	t.Logf("CSV output written to %s", tmpFile) | ||||||
|  | // } | ||||||
| 
 | 
 | ||||||
| 	// Write to temp file | //	func TestCandleList_ToFluentd(t *testing.T) { | ||||||
| 	tmpFile := "/tmp/candle_test.json" | //		startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC) | ||||||
| 	err = os.WriteFile(tmpFile, []byte(jsonStr), 0644) | //		endTime := time.Date(2023, 12, 31, 0, 0, 0, 0, time.UTC) | ||||||
| 	if err != nil { | //		cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50) | ||||||
| 		t.Fatalf("Failed to write json file: %v", err) | //		if err != nil { | ||||||
| 	} | //			t.Fatalf("ToEs failed: %v", err) | ||||||
| 
 | //		} | ||||||
| 	t.Logf("JSON output written to %s", tmpFile) | //		err = cl.ToFluentd() | ||||||
| } | //		if err != nil { | ||||||
| 
 | //			t.Fatalf("ToEs failed: %v", err) | ||||||
| func TestCandleList_ToCsv(t *testing.T) { | //		} | ||||||
| 	startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC) | //	} | ||||||
| 	endTime := time.Date(2023, 12, 31, 0, 0, 0, 0, time.UTC) | func TestCandleList_ToElastic(t *testing.T) { | ||||||
| 	cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50) |  | ||||||
| 	if err != nil { |  | ||||||
| 		t.Fatalf("ToJson failed: %v", err) |  | ||||||
| 	} |  | ||||||
| 	csvStr, err := cl.ToCsv() |  | ||||||
| 	if err != nil { |  | ||||||
| 		t.Fatalf("ToCsv failed: %v", err) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	// Write to temp file |  | ||||||
| 	tmpFile := "/tmp/candle_test.csv" |  | ||||||
| 	err = os.WriteFile(tmpFile, []byte(csvStr), 0644) |  | ||||||
| 	if err != nil { |  | ||||||
| 		t.Fatalf("Failed to write csv file: %v", err) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	t.Logf("CSV output written to %s", tmpFile) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func TestCandleList_ToEs(t *testing.T) { |  | ||||||
| 	startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC) | 	startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC) | ||||||
| 	endTime := time.Date(2023, 12, 31, 0, 0, 0, 0, time.UTC) | 	endTime := time.Date(2023, 12, 31, 0, 0, 0, 0, time.UTC) | ||||||
| 	cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50) | 	cl, err := MakeCandleList("BTC-USDT", "1D", startTime, endTime, 50) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("ToEs failed: %v", err) | 		t.Fatalf("ToEs failed: %v", err) | ||||||
| 	} | 	} | ||||||
| 	err = cl.ToFluentd() | 	err = cl.ToElastic() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("ToEs failed: %v", err) | 		t.Fatalf("ToEs failed: %v", err) | ||||||
| 	} | 	} | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user
	 zhangkun9038@dingtalk.com
						zhangkun9038@dingtalk.com