From 83790d53c024153b122a66c31abca032c97cb697 Mon Sep 17 00:00:00 2001 From: "zhangkun9038@dingtalk.com" Date: Mon, 13 Jan 2025 20:52:29 +0800 Subject: [PATCH] CheckSortedSet --- main.go | 78 ++++++++++++++++++++++++--------------------------------- 1 file changed, 33 insertions(+), 45 deletions(-) diff --git a/main.go b/main.go index 9b2eb32..ca12245 100644 --- a/main.go +++ b/main.go @@ -105,52 +105,46 @@ func LoopRestTicker(cr *core.Core) { } } -// CheckSortedSet 遍历redis下所有以"candle"开头,"|sortedSet"结尾的key,检查其内容是否达到300条记录且连续, -// 如果不满足,判定出缺少的时间段,构造出一个core.RestQueue对象,序列化后,推到redis的restQueue队列中 +// CheckSortedSet 函数需要相应修改 func CheckSortedSet(cr *core.Core) { - time.Sleep(30 * time.Second) keys, err := cr.RedisLocalCli.Keys("candle*|sortedSet").Result() if err != nil { - logrus.Errorf("获取Redis键失败: %v", err) + logrus.Error("获取Redis键失败:", err) return } for _, key := range keys { - time.Sleep(2 * time.Second) period, instId := extractInfo(key) if period == "" || instId == "" { logrus.Warnf("无法从键 %s 中提取周期信息或instId", key) continue } - // 使用 ZRangeWithScores 获取成员和分数 members, err := cr.RedisLocalCli.ZRangeWithScores(key, 0, -1).Result() if err != nil { logrus.Errorf("获取SortedSet %s 的成员失败: %v", key, err) continue } - if len(members) < 300 || !isConsecutive(members, period) { - missingTimes := findMissingPeriods(members, period) - if len(missingTimes) > 0 { - restQueue := &core.RestQueue{ - InstId: instId, - Bar: period, - After: missingTimes[0].UnixMilli(), - Limit: strconv.Itoa(len(missingTimes)), - WithWs: false, - } + missingTimes := findMissingPeriods(members, period) + if len(missingTimes) > 0 { + restQueue := &core.RestQueue{ + InstId: instId, + Bar: period, + After: missingTimes[0].UnixMilli(), + Limit: strconv.Itoa(len(missingTimes)), + WithWs: false, + } - jsonData, err := json.Marshal(restQueue) - if err != nil { - logrus.Errorf("序列化RestQueue失败: %v", err) - continue - } + jsonData, err := json.Marshal(restQueue) + if err != nil { + logrus.Error("序列化RestQueue失败:", err) + continue + } - err = cr.RedisLocalCli.RPush("restQueue", jsonData).Err() - if err != nil { - logrus.Errorf("推送到restQueue失败: %v", err) - } + err = cr.RedisLocalCli.RPush("restQueue", jsonData).Err() + if err != nil { + logrus.Error("推送到restQueue失败:", err) } } } @@ -166,32 +160,16 @@ func extractInfo(key string) (period string, instId string) { return periodPart, parts[1] } -// isConsecutive 检查记录是否连续 -func isConsecutive(members []redis.Z, period string) bool { - if len(members) < 2 { - return true - } - - duration := parseDuration(period) - for i := 1; i < len(members); i++ { - prev := int64(members[i-1].Score) - curr := int64(members[i].Score) - if curr-prev != int64(duration.Milliseconds()) { - return false - } - } - return true -} - -// findMissingPeriods 找出第一个缺失的时间段,返回连续缺失的时间戳 +// findMissingPeriods 找出缺失的时间段,返回连续缺失的时间戳 func findMissingPeriods(members []redis.Z, period string) []time.Time { - if len(members) < 2 { + if len(members) == 0 { return nil } duration := parseDuration(period) var missingTimes []time.Time + // 检查是否有中间缺失的部分 for i := 1; i < len(members); i++ { prev := int64(members[i-1].Score) curr := int64(members[i].Score) @@ -201,9 +179,19 @@ func findMissingPeriods(members []redis.Z, period string) []time.Time { for t := expected; t < curr; t += duration.Milliseconds() { missingTimes = append(missingTimes, time.UnixMilli(t)) } - break // 只处理第一个缺失段 + return missingTimes // 返回第一个缺失段 } } + + // 如果没有中间缺失,但记录数少于300,从最后一条记录开始向后补充 + if len(members) < 300 { + lastTimestamp := int64(members[len(members)-1].Score) + for i := len(members); i < 300; i++ { + lastTimestamp += duration.Milliseconds() + missingTimes = append(missingTimes, time.UnixMilli(lastTimestamp)) + } + } + return missingTimes }