candleList.ToElastic() 实现了, 支持自定义的index, 和_id, 绕过了fluentd
This commit is contained in:
		
							parent
							
								
									50feefda56
								
							
						
					
					
						commit
						d554c4b549
					
				| @ -329,11 +329,6 @@ func parsePeriod(period string) (time.Duration, error) { | |||||||
| 
 | 
 | ||||||
| // ToElastic 将Candle数据发送到Elasticsearch | // ToElastic 将Candle数据发送到Elasticsearch | ||||||
| func (cl *CandleList) ToElastic() error { | func (cl *CandleList) ToElastic() error { | ||||||
| 	// 获取当前年份 |  | ||||||
| 	currentYear := time.Now().Year() |  | ||||||
| 
 |  | ||||||
| 	// 构造索引名称,格式为:logstash-candle.交易对.年份.周期(保持周期原样) |  | ||||||
| 	index := fmt.Sprintf("logstash-candle.%s.%d.%s", strings.ToLower(cl.CoinPair), currentYear, strings.ToLower(cl.Period)) |  | ||||||
| 
 | 
 | ||||||
| 	// 创建带超时的HTTP客户端 | 	// 创建带超时的HTTP客户端 | ||||||
| 	client := &http.Client{ | 	client := &http.Client{ | ||||||
| @ -347,7 +342,7 @@ func (cl *CandleList) ToElastic() error { | |||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	for _, candle := range cl.Candles { | 	for _, candle := range cl.Candles { | ||||||
| 		// 验证时间戳是否为周期的整数倍 | 		// 解析时间戳 | ||||||
| 		ts, err := strconv.ParseInt(candle.Timestamp, 10, 64) | 		ts, err := strconv.ParseInt(candle.Timestamp, 10, 64) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			fmt.Println("invalid timestamp:", ts, err) | 			fmt.Println("invalid timestamp:", ts, err) | ||||||
| @ -358,6 +353,23 @@ func (cl *CandleList) ToElastic() error { | |||||||
| 		loc, _ := time.LoadLocation("Asia/Shanghai") | 		loc, _ := time.LoadLocation("Asia/Shanghai") | ||||||
| 		cstTime := time.UnixMilli(ts).In(loc) | 		cstTime := time.UnixMilli(ts).In(loc) | ||||||
| 
 | 
 | ||||||
|  | 		// 根据当前记录的时间确定年份 | ||||||
|  | 		currentYear := cstTime.Year() | ||||||
|  | 
 | ||||||
|  | 		// 动态构造索引名称 | ||||||
|  | 		index := fmt.Sprintf("logstash-candle.%s.%d.%s", strings.ToLower(cl.CoinPair), currentYear, strings.ToLower(cl.Period)) | ||||||
|  | 
 | ||||||
|  | 		// 验证时间戳是否为周期的整数倍 | ||||||
|  | 		if cl.Period == "1D" { | ||||||
|  | 			if cstTime.Hour() != 0 || cstTime.Minute() != 0 || cstTime.Second() != 0 { | ||||||
|  | 				return fmt.Errorf("timestamp %d (%s) is not aligned with period %s", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) | ||||||
|  | 			} | ||||||
|  | 		} else { | ||||||
|  | 			if cstTime.UnixMilli()%duration.Milliseconds() != 0 { | ||||||
|  | 				return fmt.Errorf("timestamp %d is not aligned with period %s", ts, cl.Period) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
| 		// 添加调试信息 | 		// 添加调试信息 | ||||||
| 		fmt.Printf("Timestamp: %d, CST Time: %s, Period: %s\n", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) | 		fmt.Printf("Timestamp: %d, CST Time: %s, Period: %s\n", ts, cstTime.Format("2006-01-02 15:04:05"), cl.Period) | ||||||
| 
 | 
 | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user
	 zhangkun9038@dingtalk.com
						zhangkun9038@dingtalk.com