From c9600d93553fd2db41455c297113a6f1b16ac73b Mon Sep 17 00:00:00 2001 From: "zhangkun9038@dingtalk.com" Date: Mon, 13 Jan 2025 20:27:38 +0800 Subject: [PATCH] CheckSortedSet --- main.go | 124 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) diff --git a/main.go b/main.go index b14db9f..f598de9 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "context" "encoding/json" + "strconv" // "fmt" "math/rand" @@ -10,6 +11,7 @@ import ( "strings" "time" + "github.com/go-redis/redis" "github.com/phyer/v5sdkgo/rest" "github.com/sirupsen/logrus" @@ -103,6 +105,125 @@ func LoopRestTicker(cr *core.Core) { } } +// CheckSortedSet 遍历redis下所有以"candle"开头,"|sortedSet"结尾的key,检查其内容是否达到300条记录且连续, +// 如果不满足,判定出缺少的时间段,构造出一个core.RestQueue对象,序列化后,推到redis的restQueue队列中 +func CheckSortedSet(cr *core.Core) { + time.Sleep(30 * time.Second) + keys, err := cr.RedisLocalCli.Keys("candle*|sortedSet").Result() + if err != nil { + logrus.Errorf("获取Redis键失败: %v", err) + return + } + + for _, key := range keys { + time.Sleep(2 * time.Second) + period, instId := extractInfo(key) + if period == "" || instId == "" { + logrus.Warnf("无法从键 %s 中提取周期信息或instId", key) + continue + } + + // 使用 ZRangeWithScores 获取成员和分数 + members, err := cr.RedisLocalCli.ZRangeWithScores(key, 0, -1).Result() + if err != nil { + logrus.Errorf("获取SortedSet %s 的成员失败: %v", key, err) + continue + } + + if len(members) < 300 || !isConsecutive(members, period) { + missingTimes := findMissingPeriods(members, period) + if len(missingTimes) > 0 { + restQueue := &core.RestQueue{ + InstId: instId, + Bar: period, + After: missingTimes[0].UnixMilli(), + Limit: strconv.Itoa(len(missingTimes)), + WithWs: false, + } + + jsonData, err := json.Marshal(restQueue) + if err != nil { + logrus.Errorf("序列化RestQueue失败: %v", err) + continue + } + + err = cr.RedisLocalCli.RPush("restQueue", jsonData).Err() + if err != nil { + logrus.Errorf("推送到restQueue失败: %v", err) + } + } + } + } +} + +// extractInfo 从键名中提取周期信息和instId +func extractInfo(key string) (period string, instId string) { + parts := strings.Split(key, "|") + if len(parts) != 3 { + return "", "" + } + periodPart := strings.TrimPrefix(parts[0], "candle") + return periodPart, parts[1] +} + +// isConsecutive 检查记录是否连续 +func isConsecutive(members []redis.Z, period string) bool { + if len(members) < 2 { + return true + } + + duration := parseDuration(period) + for i := 1; i < len(members); i++ { + prev := int64(members[i-1].Score) + curr := int64(members[i].Score) + if curr-prev != int64(duration.Milliseconds()) { + return false + } + } + return true +} + +// findMissingPeriods 找出第一个缺失的时间段,返回连续缺失的时间戳 +func findMissingPeriods(members []redis.Z, period string) []time.Time { + if len(members) < 2 { + return nil + } + + duration := parseDuration(period) + var missingTimes []time.Time + + for i := 1; i < len(members); i++ { + prev := int64(members[i-1].Score) + curr := int64(members[i].Score) + expected := prev + duration.Milliseconds() + + if expected < curr { + for t := expected; t < curr; t += duration.Milliseconds() { + missingTimes = append(missingTimes, time.UnixMilli(t)) + } + break // 只处理第一个缺失段 + } + } + return missingTimes +} + +// parseDuration 解析周期字符串为time.Duration +func parseDuration(period string) time.Duration { + unit := period[len(period)-1:] + value, _ := strconv.Atoi(period[:len(period)-1]) + + switch unit { + case "m": + return time.Duration(value) * time.Minute + case "H": + return time.Duration(value) * time.Hour + case "D": + return time.Duration(value) * 24 * time.Hour + default: + return 0 + } +} + // 统一受理发起rest请求的请求 func LoopSaveCandle(cr *core.Core) { for { @@ -316,6 +437,9 @@ func main() { go func() { core.WriteLogProcess(&cr) }() + go func() { + CheckSortedSet(&cr) + }() // 永久阻塞 select {} }