跟SaveUikey关系不大,应该是max没有区分7/30的hash key造成写入es被统一对待了

This commit is contained in:
zhangkun9038@dingtalk.com 2025-01-14 23:10:02 +08:00
parent 65e8b76080
commit 9b5771b301

133
main.go
View File

@ -3,7 +3,6 @@ package main
import (
"context"
"encoding/json"
"strconv"
// "fmt"
"math/rand"
@ -11,7 +10,6 @@ import (
"strings"
"time"
"github.com/go-redis/redis"
"github.com/phyer/v5sdkgo/rest"
"github.com/sirupsen/logrus"
@ -105,113 +103,6 @@ func LoopRestTicker(cr *core.Core) {
}
}
// CheckSortedSet 函数需要相应修改
func CheckSortedSet(cr *core.Core) {
keys, err := cr.RedisLocalCli.Keys("candle*|sortedSet").Result()
if err != nil {
logrus.Error("获取Redis键失败:", err)
return
}
for _, key := range keys {
period, instId := extractInfo(key)
if period == "" || instId == "" {
logrus.Warnf("无法从键 %s 中提取周期信息或instId", key)
continue
}
members, err := cr.RedisLocalCli.ZRangeWithScores(key, 0, -1).Result()
if err != nil {
logrus.Errorf("获取SortedSet %s 的成员失败: %v", key, err)
continue
}
missingTimes := findMissingPeriods(members, period)
if len(missingTimes) > 0 {
restQueue := &core.RestQueue{
InstId: instId,
Bar: period,
After: missingTimes[0].UnixMilli(),
Limit: strconv.Itoa(len(missingTimes)),
WithWs: false,
}
jsonData, err := json.Marshal(restQueue)
if err != nil {
logrus.Error("序列化RestQueue失败:", err)
continue
}
err = cr.RedisLocalCli.RPush("restQueue", jsonData).Err()
if err != nil {
logrus.Error("推送到restQueue失败:", err)
}
}
}
}
// extractInfo 从键名中提取周期信息和instId
func extractInfo(key string) (period string, instId string) {
parts := strings.Split(key, "|")
if len(parts) != 3 {
return "", ""
}
periodPart := strings.TrimPrefix(parts[0], "candle")
return periodPart, parts[1]
}
// findMissingPeriods 找出缺失的时间段,返回连续缺失的时间戳
func findMissingPeriods(members []redis.Z, period string) []time.Time {
if len(members) == 0 {
return nil
}
duration := parseDuration(period)
var missingTimes []time.Time
// 检查是否有中间缺失的部分
for i := 1; i < len(members); i++ {
prev := int64(members[i-1].Score)
curr := int64(members[i].Score)
expected := prev + duration.Milliseconds()
if expected < curr {
for t := expected; t < curr; t += duration.Milliseconds() {
missingTimes = append(missingTimes, time.UnixMilli(t))
}
return missingTimes // 返回第一个缺失段
}
}
// 如果没有中间缺失但记录数少于300从最后一条记录开始向后补充
if len(members) < 300 {
lastTimestamp := int64(members[len(members)-1].Score)
for i := len(members); i < 300; i++ {
lastTimestamp += duration.Milliseconds()
missingTimes = append(missingTimes, time.UnixMilli(lastTimestamp))
}
}
return missingTimes
}
// parseDuration 解析周期字符串为time.Duration
func parseDuration(period string) time.Duration {
unit := period[len(period)-1:]
value, _ := strconv.Atoi(period[:len(period)-1])
switch unit {
case "m":
return time.Duration(value) * time.Minute
case "H":
return time.Duration(value) * time.Hour
case "D":
return time.Duration(value) * 24 * time.Hour
default:
return 0
}
}
// 统一受理发起rest请求的请求
func LoopSaveCandle(cr *core.Core) {
for {
@ -356,7 +247,7 @@ func LoopAllCoinsList(mdura int, barPeriod string, rge int) {
func main() {
cr := core.Core{}
// level := os.Getenv("TEXUS_LOGLEVEL")
logrus.SetLevel(logrus.WarnLevel)
logrus.SetLevel(logrus.DebugLevel)
cr.Init()
ShowSysTime(&cr)
// 从rest接口获取的ticker记录种的交量计入排行榜指定周期刷新一次
@ -367,57 +258,57 @@ func main() {
// 全员5m
go func() {
logrus.Info("LoopAllCoinsList - 5m")
LoopAllCoinsList(180, "5m", 50)
LoopAllCoinsList(360, "5m", 50)
}()
// 全员15m candle
go func() {
logrus.Info("LoopAllCoinsList - 15m")
LoopAllCoinsList(360, "15m", 100)
LoopAllCoinsList(720, "15m", 100)
}()
// 全员30m candle
go func() {
logrus.Info("LoopAllCoinsList - 30m")
LoopAllCoinsList(600, "30m", 150)
LoopAllCoinsList(1200, "30m", 150)
}()
// 全员1H candle
go func() {
logrus.Info("LoopAllCoinsList - 1H")
LoopAllCoinsList(900, "1H", 200)
LoopAllCoinsList(1800, "1H", 200)
}()
// 全员2H candle
go func() {
logrus.Info("LoopAllCoinsList - 2H")
LoopAllCoinsList(1200, "2H", 250)
LoopAllCoinsList(2400, "2H", 250)
}()
// 全员4小时candle
go func() {
logrus.Info("LoopAllCoinsList - 4H")
LoopAllCoinsList(1500, "4H", 300)
LoopAllCoinsList(3000, "4H", 300)
}()
// 全员6小时candle
go func() {
logrus.Info("LoopAllCoinsList - 6H")
LoopAllCoinsList(1800, "6H", 350)
LoopAllCoinsList(3600, "6H", 350)
}()
// 全员12小时candle
go func() {
logrus.Info("LoopAllCoinsList - 12H")
LoopAllCoinsList(2100, "12H", 400)
LoopAllCoinsList(4200, "12H", 400)
}()
// 全员1Day candle & maX
go func() {
logrus.Info("LoopAllCoinsList - 1D")
LoopAllCoinsList(2400, "1D", 500)
LoopAllCoinsList(4800, "1D", 500)
}()
// 全员2Day candle & maX
go func() {
logrus.Info("LoopAllCoinsList - 2D")
LoopAllCoinsList(3000, "2D", 600)
LoopAllCoinsList(6000, "2D", 600)
}()
// 全员5Day candle & maX
go func() {
logrus.Info("LoopAllCoinsList - 5D")
LoopAllCoinsList(3600, "5D", 700)
LoopAllCoinsList(7200, "5D", 700)
}()
go func() {
LoopSaveCandle(&cr)