CheckSortedSet
This commit is contained in:
parent
b0f78d3c15
commit
c9600d9355
124
main.go
124
main.go
@ -3,6 +3,7 @@ package main
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
// "fmt"
|
// "fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
@ -10,6 +11,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-redis/redis"
|
||||||
"github.com/phyer/v5sdkgo/rest"
|
"github.com/phyer/v5sdkgo/rest"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
@ -103,6 +105,125 @@ func LoopRestTicker(cr *core.Core) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CheckSortedSet 遍历redis下所有以"candle"开头,"|sortedSet"结尾的key,检查其内容是否达到300条记录且连续,
|
||||||
|
// 如果不满足,判定出缺少的时间段,构造出一个core.RestQueue对象,序列化后,推到redis的restQueue队列中
|
||||||
|
func CheckSortedSet(cr *core.Core) {
|
||||||
|
time.Sleep(30 * time.Second)
|
||||||
|
keys, err := cr.RedisLocalCli.Keys("candle*|sortedSet").Result()
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("获取Redis键失败: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, key := range keys {
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
period, instId := extractInfo(key)
|
||||||
|
if period == "" || instId == "" {
|
||||||
|
logrus.Warnf("无法从键 %s 中提取周期信息或instId", key)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// 使用 ZRangeWithScores 获取成员和分数
|
||||||
|
members, err := cr.RedisLocalCli.ZRangeWithScores(key, 0, -1).Result()
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("获取SortedSet %s 的成员失败: %v", key, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(members) < 300 || !isConsecutive(members, period) {
|
||||||
|
missingTimes := findMissingPeriods(members, period)
|
||||||
|
if len(missingTimes) > 0 {
|
||||||
|
restQueue := &core.RestQueue{
|
||||||
|
InstId: instId,
|
||||||
|
Bar: period,
|
||||||
|
After: missingTimes[0].UnixMilli(),
|
||||||
|
Limit: strconv.Itoa(len(missingTimes)),
|
||||||
|
WithWs: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
jsonData, err := json.Marshal(restQueue)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("序列化RestQueue失败: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err = cr.RedisLocalCli.RPush("restQueue", jsonData).Err()
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("推送到restQueue失败: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// extractInfo 从键名中提取周期信息和instId
|
||||||
|
func extractInfo(key string) (period string, instId string) {
|
||||||
|
parts := strings.Split(key, "|")
|
||||||
|
if len(parts) != 3 {
|
||||||
|
return "", ""
|
||||||
|
}
|
||||||
|
periodPart := strings.TrimPrefix(parts[0], "candle")
|
||||||
|
return periodPart, parts[1]
|
||||||
|
}
|
||||||
|
|
||||||
|
// isConsecutive 检查记录是否连续
|
||||||
|
func isConsecutive(members []redis.Z, period string) bool {
|
||||||
|
if len(members) < 2 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
duration := parseDuration(period)
|
||||||
|
for i := 1; i < len(members); i++ {
|
||||||
|
prev := int64(members[i-1].Score)
|
||||||
|
curr := int64(members[i].Score)
|
||||||
|
if curr-prev != int64(duration.Milliseconds()) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// findMissingPeriods 找出第一个缺失的时间段,返回连续缺失的时间戳
|
||||||
|
func findMissingPeriods(members []redis.Z, period string) []time.Time {
|
||||||
|
if len(members) < 2 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
duration := parseDuration(period)
|
||||||
|
var missingTimes []time.Time
|
||||||
|
|
||||||
|
for i := 1; i < len(members); i++ {
|
||||||
|
prev := int64(members[i-1].Score)
|
||||||
|
curr := int64(members[i].Score)
|
||||||
|
expected := prev + duration.Milliseconds()
|
||||||
|
|
||||||
|
if expected < curr {
|
||||||
|
for t := expected; t < curr; t += duration.Milliseconds() {
|
||||||
|
missingTimes = append(missingTimes, time.UnixMilli(t))
|
||||||
|
}
|
||||||
|
break // 只处理第一个缺失段
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return missingTimes
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseDuration 解析周期字符串为time.Duration
|
||||||
|
func parseDuration(period string) time.Duration {
|
||||||
|
unit := period[len(period)-1:]
|
||||||
|
value, _ := strconv.Atoi(period[:len(period)-1])
|
||||||
|
|
||||||
|
switch unit {
|
||||||
|
case "m":
|
||||||
|
return time.Duration(value) * time.Minute
|
||||||
|
case "H":
|
||||||
|
return time.Duration(value) * time.Hour
|
||||||
|
case "D":
|
||||||
|
return time.Duration(value) * 24 * time.Hour
|
||||||
|
default:
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 统一受理发起rest请求的请求
|
// 统一受理发起rest请求的请求
|
||||||
func LoopSaveCandle(cr *core.Core) {
|
func LoopSaveCandle(cr *core.Core) {
|
||||||
for {
|
for {
|
||||||
@ -316,6 +437,9 @@ func main() {
|
|||||||
go func() {
|
go func() {
|
||||||
core.WriteLogProcess(&cr)
|
core.WriteLogProcess(&cr)
|
||||||
}()
|
}()
|
||||||
|
go func() {
|
||||||
|
CheckSortedSet(&cr)
|
||||||
|
}()
|
||||||
// 永久阻塞
|
// 永久阻塞
|
||||||
select {}
|
select {}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user