CheckSortedSet

This commit is contained in:
zhangkun9038@dingtalk.com 2025-01-13 20:52:29 +08:00
parent b0a12688f9
commit 83790d53c0

78
main.go
View File

@ -105,52 +105,46 @@ func LoopRestTicker(cr *core.Core) {
} }
} }
// CheckSortedSet 遍历redis下所有以"candle"开头,"|sortedSet"结尾的key检查其内容是否达到300条记录且连续 // CheckSortedSet 函数需要相应修改
// 如果不满足判定出缺少的时间段构造出一个core.RestQueue对象序列化后推到redis的restQueue队列中
func CheckSortedSet(cr *core.Core) { func CheckSortedSet(cr *core.Core) {
time.Sleep(30 * time.Second)
keys, err := cr.RedisLocalCli.Keys("candle*|sortedSet").Result() keys, err := cr.RedisLocalCli.Keys("candle*|sortedSet").Result()
if err != nil { if err != nil {
logrus.Errorf("获取Redis键失败: %v", err) logrus.Error("获取Redis键失败:", err)
return return
} }
for _, key := range keys { for _, key := range keys {
time.Sleep(2 * time.Second)
period, instId := extractInfo(key) period, instId := extractInfo(key)
if period == "" || instId == "" { if period == "" || instId == "" {
logrus.Warnf("无法从键 %s 中提取周期信息或instId", key) logrus.Warnf("无法从键 %s 中提取周期信息或instId", key)
continue continue
} }
// 使用 ZRangeWithScores 获取成员和分数
members, err := cr.RedisLocalCli.ZRangeWithScores(key, 0, -1).Result() members, err := cr.RedisLocalCli.ZRangeWithScores(key, 0, -1).Result()
if err != nil { if err != nil {
logrus.Errorf("获取SortedSet %s 的成员失败: %v", key, err) logrus.Errorf("获取SortedSet %s 的成员失败: %v", key, err)
continue continue
} }
if len(members) < 300 || !isConsecutive(members, period) { missingTimes := findMissingPeriods(members, period)
missingTimes := findMissingPeriods(members, period) if len(missingTimes) > 0 {
if len(missingTimes) > 0 { restQueue := &core.RestQueue{
restQueue := &core.RestQueue{ InstId: instId,
InstId: instId, Bar: period,
Bar: period, After: missingTimes[0].UnixMilli(),
After: missingTimes[0].UnixMilli(), Limit: strconv.Itoa(len(missingTimes)),
Limit: strconv.Itoa(len(missingTimes)), WithWs: false,
WithWs: false, }
}
jsonData, err := json.Marshal(restQueue) jsonData, err := json.Marshal(restQueue)
if err != nil { if err != nil {
logrus.Errorf("序列化RestQueue失败: %v", err) logrus.Error("序列化RestQueue失败:", err)
continue continue
} }
err = cr.RedisLocalCli.RPush("restQueue", jsonData).Err() err = cr.RedisLocalCli.RPush("restQueue", jsonData).Err()
if err != nil { if err != nil {
logrus.Errorf("推送到restQueue失败: %v", err) logrus.Error("推送到restQueue失败:", err)
}
} }
} }
} }
@ -166,32 +160,16 @@ func extractInfo(key string) (period string, instId string) {
return periodPart, parts[1] return periodPart, parts[1]
} }
// isConsecutive 检查记录是否连续 // findMissingPeriods 找出缺失的时间段,返回连续缺失的时间戳
func isConsecutive(members []redis.Z, period string) bool {
if len(members) < 2 {
return true
}
duration := parseDuration(period)
for i := 1; i < len(members); i++ {
prev := int64(members[i-1].Score)
curr := int64(members[i].Score)
if curr-prev != int64(duration.Milliseconds()) {
return false
}
}
return true
}
// findMissingPeriods 找出第一个缺失的时间段,返回连续缺失的时间戳
func findMissingPeriods(members []redis.Z, period string) []time.Time { func findMissingPeriods(members []redis.Z, period string) []time.Time {
if len(members) < 2 { if len(members) == 0 {
return nil return nil
} }
duration := parseDuration(period) duration := parseDuration(period)
var missingTimes []time.Time var missingTimes []time.Time
// 检查是否有中间缺失的部分
for i := 1; i < len(members); i++ { for i := 1; i < len(members); i++ {
prev := int64(members[i-1].Score) prev := int64(members[i-1].Score)
curr := int64(members[i].Score) curr := int64(members[i].Score)
@ -201,9 +179,19 @@ func findMissingPeriods(members []redis.Z, period string) []time.Time {
for t := expected; t < curr; t += duration.Milliseconds() { for t := expected; t < curr; t += duration.Milliseconds() {
missingTimes = append(missingTimes, time.UnixMilli(t)) missingTimes = append(missingTimes, time.UnixMilli(t))
} }
break // 只处理第一个缺失段 return missingTimes // 返回第一个缺失段
} }
} }
// 如果没有中间缺失但记录数少于300从最后一条记录开始向后补充
if len(members) < 300 {
lastTimestamp := int64(members[len(members)-1].Score)
for i := len(members); i < 300; i++ {
lastTimestamp += duration.Milliseconds()
missingTimes = append(missingTimes, time.UnixMilli(lastTimestamp))
}
}
return missingTimes return missingTimes
} }