From db66ceaf7ba3e2396a7cb94dbff09a85c9fc18a1 Mon Sep 17 00:00:00 2001 From: "zhangkun9038@dingtalk.com" <zhangkun9038@dingtalk.com> Date: Sun, 12 Jan 2025 23:42:17 +0800 Subject: [PATCH] optimize GetRangeCandleSortedSet function --- core.go | 138 ++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 88 insertions(+), 50 deletions(-) diff --git a/core.go b/core.go index a9c787d..6ed04a3 100644 --- a/core.go +++ b/core.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + // "math/rand" "io/ioutil" "net/http" @@ -688,6 +689,7 @@ func (core *Core) GetRangeMaXSortedSet(setName string, count int, from time.Time } // 根据周期的文本内容,返回这代表多少个分钟 +// 设置sortedSet的过期时间 func (cr *Core) GetExpiration(per string) (time.Duration, error) { if len(per) == 0 { @@ -697,7 +699,7 @@ func (cr *Core) GetExpiration(per string) (time.Duration, error) { return 0, err } exp, err := cr.PeriodToMinutes(per) - dur := time.Duration(exp*49) * time.Minute + dur := time.Duration(exp*300) * time.Minute return dur, err } @@ -752,71 +754,107 @@ func (cr *Core) PeriodToLastTime(period string, from time.Time) (time.Time, erro // from: 倒推的起始时间点 // ctype: candle或者maX func (core *Core) GetRangeCandleSortedSet(setName string, count int, from time.Time) (*CandleList, error) { - cdl := CandleList{} - ary1 := strings.Split(setName, "|") - ary2 := []string{} - period := "" - ary2 = strings.Split(ary1[0], "candle") - period = ary2[1] + // 初始化返回结果 + cdl := &CandleList{Count: count} - dui, err := core.PeriodToMinutes(period) + // 解析period + ary := strings.Split(setName, "|") + if len(ary) < 1 { + return nil, fmt.Errorf("invalid setName format: %s", setName) + } + period := strings.TrimPrefix(ary[0], "candle") + + // 获取period对应的分钟数 + durationMinutes, err := core.PeriodToMinutes(period) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get period minutes: %w", err) } - fromt := from.UnixMilli() - nw := time.Now().UnixMilli() - if fromt > nw*2 { - err := errors.New("时间错了需要debug") - logrus.Warning(err.Error()) - return nil, err + + // 计算时间范围 + fromTs := from.UnixMilli() + nowTs := time.Now().UnixMilli() + if fromTs > nowTs*2 { + return nil, fmt.Errorf("invalid from time: %v", from) } - froms := strconv.FormatInt(fromt, 10) - sti := fromt - dui*int64(count)*60*1000 - sts := strconv.FormatInt(sti, 10) + + // 计算查询的起始时间 + startTs := fromTs - durationMinutes*int64(count)*60*1000 + + // 清理过期数据 + if err := core.cleanExpiredData(setName, period); err != nil { + logrus.Warnf("Failed to clean expired data: %v", err) + } + + // 从Redis获取数据 + cli := core.RedisLocalCli opt := redis.ZRangeBy{ - Min: sts, - Max: froms, + Min: strconv.FormatInt(startTs, 10), + Max: strconv.FormatInt(fromTs, 10), Count: int64(count), } - ary := []string{} - extt, err := core.GetExpiration(period) - ot := time.Now().Add(extt * -1) - oti := ot.UnixMilli() - cli := core.RedisLocalCli - cli.LTrim(setName, 0, oti) - cunt, _ := cli.ZRemRangeByScore(setName, "0", strconv.FormatInt(oti, 10)).Result() - if cunt > 0 { - logrus.Warning("移出过期的引用数量:", setName, count, "ZRemRangeByScore ", setName, 0, strconv.FormatInt(oti, 10)) - } - logrus.Info("ZRevRangeByScore ", setName, opt) - ary, err = cli.ZRevRangeByScore(setName, opt).Result() + + logrus.Debugf("Querying Redis: set=%s, start=%v, end=%v", setName, time.UnixMilli(startTs), from) + + // 获取键列表 + keys, err := cli.ZRevRangeByScore(setName, opt).Result() if err != nil { - return &cdl, err + return nil, fmt.Errorf("failed to query Redis: %w", err) } - keyAry, err := cli.MGet(ary...).Result() - if err != nil || len(keyAry) == 0 { - logrus.Warning("no record with cmd: ZRevRangeByScore ", setName, froms, sts, " ", err.Error()) - logrus.Warning("zrev lens of ary: lens: ", len(ary), "GetRangeSortedSet ZRevRangeByScore:", "setName:", setName, "opt.Max:", opt.Max, "opt.Min:", opt.Min) - return &cdl, err + + if len(keys) == 0 { + logrus.Warnf("No data found for set: %s between %v and %v", setName, time.UnixMilli(startTs), from) + return cdl, nil } - for _, str := range keyAry { - if str == nil { + + // 批量获取数据 + values, err := cli.MGet(keys...).Result() + if err != nil { + return nil, fmt.Errorf("failed to get values from Redis: %w", err) + } + + // 解析数据 + for _, val := range values { + if val == nil { continue } - cd := Candle{} - err := json.Unmarshal([]byte(str.(string)), &cd) - if err != nil { - logrus.Warn(GetFuncName(), err, str.(string)) + + var candle Candle + if err := json.Unmarshal([]byte(val.(string)), &candle); err != nil { + logrus.Warnf("Failed to unmarshal candle data: %v", err) + continue } - tmi := ToInt64(cd.Data[0]) - tm := time.UnixMilli(tmi) - if tm.Sub(from) > 0 { + + // 检查时间是否在范围内 + candleTime := time.UnixMilli(ToInt64(candle.Data[0])) + if candleTime.After(from) { break } - cdl.List = append(cdl.List, &cd) + + cdl.List = append(cdl.List, &candle) } - cdl.Count = count - return &cdl, nil + + return cdl, nil +} + +// cleanExpiredData 清理过期的数据 +func (core *Core) cleanExpiredData(setName, period string) error { + expiration, err := core.GetExpiration(period) + if err != nil { + return err + } + + cli := core.RedisLocalCli + expirationTime := time.Now().Add(-expiration) + expirationTs := strconv.FormatInt(expirationTime.UnixMilli(), 10) + + // 清理过期数据 + if count, err := cli.ZRemRangeByScore(setName, "0", expirationTs).Result(); err != nil { + return err + } else if count > 0 { + logrus.Infof("Cleaned %d expired records from %s", count, setName) + } + + return nil } func (cr *Core) GetCoasterFromPlate(instID string, period string) (Coaster, error) {