optimize GetRangeCandleSortedSet function

This commit is contained in:
zhangkun9038@dingtalk.com 2025-01-12 23:42:17 +08:00
parent 05a1c4529c
commit db66ceaf7b

138
core.go
View File

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
// "math/rand" // "math/rand"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
@ -688,6 +689,7 @@ func (core *Core) GetRangeMaXSortedSet(setName string, count int, from time.Time
} }
// 根据周期的文本内容,返回这代表多少个分钟 // 根据周期的文本内容,返回这代表多少个分钟
// 设置sortedSet的过期时间
func (cr *Core) GetExpiration(per string) (time.Duration, error) { func (cr *Core) GetExpiration(per string) (time.Duration, error) {
if len(per) == 0 { if len(per) == 0 {
@ -697,7 +699,7 @@ func (cr *Core) GetExpiration(per string) (time.Duration, error) {
return 0, err return 0, err
} }
exp, err := cr.PeriodToMinutes(per) exp, err := cr.PeriodToMinutes(per)
dur := time.Duration(exp*49) * time.Minute dur := time.Duration(exp*300) * time.Minute
return dur, err return dur, err
} }
@ -752,71 +754,107 @@ func (cr *Core) PeriodToLastTime(period string, from time.Time) (time.Time, erro
// from: 倒推的起始时间点 // from: 倒推的起始时间点
// ctype: candle或者maX // ctype: candle或者maX
func (core *Core) GetRangeCandleSortedSet(setName string, count int, from time.Time) (*CandleList, error) { func (core *Core) GetRangeCandleSortedSet(setName string, count int, from time.Time) (*CandleList, error) {
cdl := CandleList{} // 初始化返回结果
ary1 := strings.Split(setName, "|") cdl := &CandleList{Count: count}
ary2 := []string{}
period := ""
ary2 = strings.Split(ary1[0], "candle")
period = ary2[1]
dui, err := core.PeriodToMinutes(period) // 解析period
ary := strings.Split(setName, "|")
if len(ary) < 1 {
return nil, fmt.Errorf("invalid setName format: %s", setName)
}
period := strings.TrimPrefix(ary[0], "candle")
// 获取period对应的分钟数
durationMinutes, err := core.PeriodToMinutes(period)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("failed to get period minutes: %w", err)
} }
fromt := from.UnixMilli()
nw := time.Now().UnixMilli() // 计算时间范围
if fromt > nw*2 { fromTs := from.UnixMilli()
err := errors.New("时间错了需要debug") nowTs := time.Now().UnixMilli()
logrus.Warning(err.Error()) if fromTs > nowTs*2 {
return nil, err return nil, fmt.Errorf("invalid from time: %v", from)
} }
froms := strconv.FormatInt(fromt, 10)
sti := fromt - dui*int64(count)*60*1000 // 计算查询的起始时间
sts := strconv.FormatInt(sti, 10) startTs := fromTs - durationMinutes*int64(count)*60*1000
// 清理过期数据
if err := core.cleanExpiredData(setName, period); err != nil {
logrus.Warnf("Failed to clean expired data: %v", err)
}
// 从Redis获取数据
cli := core.RedisLocalCli
opt := redis.ZRangeBy{ opt := redis.ZRangeBy{
Min: sts, Min: strconv.FormatInt(startTs, 10),
Max: froms, Max: strconv.FormatInt(fromTs, 10),
Count: int64(count), Count: int64(count),
} }
ary := []string{}
extt, err := core.GetExpiration(period) logrus.Debugf("Querying Redis: set=%s, start=%v, end=%v", setName, time.UnixMilli(startTs), from)
ot := time.Now().Add(extt * -1)
oti := ot.UnixMilli() // 获取键列表
cli := core.RedisLocalCli keys, err := cli.ZRevRangeByScore(setName, opt).Result()
cli.LTrim(setName, 0, oti)
cunt, _ := cli.ZRemRangeByScore(setName, "0", strconv.FormatInt(oti, 10)).Result()
if cunt > 0 {
logrus.Warning("移出过期的引用数量:", setName, count, "ZRemRangeByScore ", setName, 0, strconv.FormatInt(oti, 10))
}
logrus.Info("ZRevRangeByScore ", setName, opt)
ary, err = cli.ZRevRangeByScore(setName, opt).Result()
if err != nil { if err != nil {
return &cdl, err return nil, fmt.Errorf("failed to query Redis: %w", err)
} }
keyAry, err := cli.MGet(ary...).Result()
if err != nil || len(keyAry) == 0 { if len(keys) == 0 {
logrus.Warning("no record with cmd: ZRevRangeByScore ", setName, froms, sts, " ", err.Error()) logrus.Warnf("No data found for set: %s between %v and %v", setName, time.UnixMilli(startTs), from)
logrus.Warning("zrev lens of ary: lens: ", len(ary), "GetRangeSortedSet ZRevRangeByScore:", "setName:", setName, "opt.Max:", opt.Max, "opt.Min:", opt.Min) return cdl, nil
return &cdl, err
} }
for _, str := range keyAry {
if str == nil { // 批量获取数据
values, err := cli.MGet(keys...).Result()
if err != nil {
return nil, fmt.Errorf("failed to get values from Redis: %w", err)
}
// 解析数据
for _, val := range values {
if val == nil {
continue continue
} }
cd := Candle{}
err := json.Unmarshal([]byte(str.(string)), &cd) var candle Candle
if err != nil { if err := json.Unmarshal([]byte(val.(string)), &candle); err != nil {
logrus.Warn(GetFuncName(), err, str.(string)) logrus.Warnf("Failed to unmarshal candle data: %v", err)
continue
} }
tmi := ToInt64(cd.Data[0])
tm := time.UnixMilli(tmi) // 检查时间是否在范围内
if tm.Sub(from) > 0 { candleTime := time.UnixMilli(ToInt64(candle.Data[0]))
if candleTime.After(from) {
break break
} }
cdl.List = append(cdl.List, &cd)
cdl.List = append(cdl.List, &candle)
} }
cdl.Count = count
return &cdl, nil return cdl, nil
}
// cleanExpiredData 清理过期的数据
func (core *Core) cleanExpiredData(setName, period string) error {
expiration, err := core.GetExpiration(period)
if err != nil {
return err
}
cli := core.RedisLocalCli
expirationTime := time.Now().Add(-expiration)
expirationTs := strconv.FormatInt(expirationTime.UnixMilli(), 10)
// 清理过期数据
if count, err := cli.ZRemRangeByScore(setName, "0", expirationTs).Result(); err != nil {
return err
} else if count > 0 {
logrus.Infof("Cleaned %d expired records from %s", count, setName)
}
return nil
} }
func (cr *Core) GetCoasterFromPlate(instID string, period string) (Coaster, error) { func (cr *Core) GetCoasterFromPlate(instID string, period string) (Coaster, error) {