From 9c9f6e40b5476cd8404e085dcf09c4c1618e1cf8 Mon Sep 17 00:00:00 2001 From: zhangkun <zhangkun> Date: Wed, 4 Dec 2024 17:14:56 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=9D=E8=AF=95=E7=BB=99=E4=B8=8A=E6=8A=A5?= =?UTF-8?q?=E5=88=B0elasticsearch=E7=9A=84=E6=96=87=E6=A1=A3=E5=8A=A0=5Fid?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/candle.go | 221 +------------------------------------------------ 1 file changed, 4 insertions(+), 217 deletions(-) diff --git a/core/candle.go b/core/candle.go index 6c42548..d0624ea 100644 --- a/core/candle.go +++ b/core/candle.go @@ -225,46 +225,14 @@ func (cl *Candle) ToStruct(core *Core) error { fmt.Println("Error parsing string to float64:", err) return err } - if cl.Data[6].(string) == 1 { - cl.Confirm = true - }else { - cl.Confirm = false + if cl.Data[6].(string) == "1" { + cl.Confirm = true + } else { + cl.Confirm = false } cl.Data = nil return nil } -func (cl *Candle) SetToKey(core *Core) ([]interface{}, error) { - data := cl.Data - tsi, err := strconv.ParseInt(data[0].(string), 10, 64) - tss := strconv.FormatInt(tsi, 10) - keyName := "candle" + cl.Period + "|" + cl.InstId + "|ts:" + tss - //过期时间:根号(当前candle的周期/1分钟)*10000 - - dt, err := json.Marshal(cl.Data) - exp := core.PeriodToMinutes(cl.Period) - // expf := float64(exp) * 60 - expf := utils.Sqrt(float64(exp)) * 100 - extt := time.Duration(expf) * time.Minute - curVolstr, _ := data[5].(string) - curVol, err := strconv.ParseFloat(curVolstr, 64) - if err != nil { - fmt.Println("err of convert ts:", err) - } - curVolCcystr, _ := data[6].(string) - curVolCcy, err := strconv.ParseFloat(curVolCcystr, 64) - curPrice := curVolCcy / curVol - if curPrice <= 0 { - fmt.Println("price有问题", curPrice, "dt: ", string(dt), "from:", cl.From) - err = errors.New("price有问题") - return cl.Data, err - } - redisCli := core.RedisCli - // tm := time.UnixMilli(tsi).Format("2006-01-02 15:04") - fmt.Println("setToKey:", keyName, "ts: ", "price: ", curPrice, "from:", cl.From) - redisCli.Set(keyName, dt, extt).Result() - core.SaveUniKey(cl.Period, keyName, extt, tsi, cl) - return cl.Data, err -} func (mx *MaX) SetToKey() ([]interface{}, error) { cstr := strconv.Itoa(mx.Count) @@ -443,10 +411,7 @@ func (core *Core) GetRangeKeyList(pattern string, from time.Time) ([]*simple.Jso return res, nil } - cl.Data = nil - return nil -} func (cl *Candle) SetToKey(core *Core) ([]interface{}, error) { data := cl.Data tsi, err := strconv.ParseInt(data[0].(string), 10, 64) @@ -479,181 +444,3 @@ func (cl *Candle) SetToKey(core *Core) ([]interface{}, error) { core.SaveUniKey(cl.Period, keyName, extt, tsi, cl) return cl.Data, err } - -func (mx *MaX) SetToKey() ([]interface{}, error) { - cstr := strconv.Itoa(mx.Count) - tss := strconv.FormatInt(mx.Ts, 10) - keyName := "ma" + cstr + "|candle" + mx.Period + "|" + mx.InstId + "|ts:" + tss - //过期时间:根号(当前candle的周期/1分钟)*10000 - dt := []interface{}{} - dt = append(dt, mx.Ts) - dt = append(dt, mx.Value) - dj, _ := json.Marshal(dt) - exp := mx.Core.PeriodToMinutes(mx.Period) - expf := utils.Sqrt(float64(exp)) * 100 - extt := time.Duration(expf) * time.Minute - // loc, _ := time.LoadLocation("Asia/Shanghai") - // tm := time.UnixMilli(mx.Ts).In(loc).Format("2006-01-02 15:04") - // fmt.Println("setToKey:", keyName, "ts:", tm, string(dj), "from: ", mx.From) - _, err := mx.Core.RedisCli.GetSet(keyName, dj).Result() - mx.Core.RedisCli.Expire(keyName, extt) - return dt, err -} - -// 保证同一个 period, keyName ,在一个周期里,SaveToSortSet只会被执行一次 -func (core *Core) SaveUniKey(period string, keyName string, extt time.Duration, tsi int64, cl *Candle) { - did := cl.InstId + cl.Period + cl.Data[0].(string) - cl.Id = HashString(did) - cl.ToStruct(core) - cd, _ := json.Marshal(cl) - wg := WriteLog{ - Content: cd, - Tag: "sardine.log.candle." + cl.Period, - Id: cl.Id, - } - core.WriteLogChan <- &wg - - refName := keyName + "|refer" - refRes, _ := core.RedisCli.GetSet(refName, 1).Result() - core.RedisCli.Expire(refName, extt) - // 为保证唯一性机制,防止SaveToSortSet 被重复执行 - if len(refRes) != 0 { - fmt.Println("refName exist: ", refName) - return - } - - core.SaveToSortSet(period, keyName, extt, tsi) -} - -// tsi: 上报时间timeStamp millinSecond -func (core *Core) SaveToSortSet(period string, keyName string, extt time.Duration, tsi int64) { - ary := strings.Split(keyName, "ts:") - setName := ary[0] + "sortedSet" - z := redis.Z{ - Score: float64(tsi), - Member: keyName, - } - rs, err := core.RedisCli.ZAdd(setName, z).Result() - if err != nil { - fmt.Println("err of ma7|ma30 add to redis:", err) - } else { - fmt.Println("sortedSet added to redis:", rs, keyName) - } -} - -func (cr *Core) PeriodToMinutes(period string) int64 { - ary := strings.Split(period, "") - beiStr := "1" - danwei := "" - if len(ary) == 3 { - beiStr = ary[0] + ary[1] - danwei = ary[2] - } else { - beiStr = ary[0] - danwei = ary[1] - } - cheng := 1 - bei, _ := strconv.Atoi(beiStr) - switch danwei { - case "m": - { - cheng = bei - break - } - case "H": - { - cheng = bei * 60 - break - } - case "D": - { - cheng = bei * 60 * 24 - break - } - case "W": - { - cheng = bei * 60 * 24 * 7 - break - } - case "M": - { - cheng = bei * 60 * 24 * 30 - break - } - case "Y": - { - cheng = bei * 60 * 24 * 365 - break - } - default: - { - fmt.Println("notmatch:", danwei) - } - } - return int64(cheng) -} - -// type ScanCmd struct { -// baseCmd -// -// page []string -// cursor uint64 -// -// process func(cmd Cmder) error -// } -func (core *Core) GetRangeKeyList(pattern string, from time.Time) ([]*simple.Json, error) { - // 比如,用来计算ma30或ma7,倒推多少时间范围, - redisCli := core.RedisCli - cursor := uint64(0) - n := 0 - allTs := []int64{} - var keys []string - for { - var err error - keys, cursor, _ = redisCli.Scan(cursor, pattern+"*", 2000).Result() - if err != nil { - panic(err) - } - n += len(keys) - if n == 0 { - break - } - } - - // keys, _ := redisCli.Keys(pattern + "*").Result() - for _, key := range keys { - keyAry := strings.Split(key, ":") - key = keyAry[1] - keyi64, _ := strconv.ParseInt(key, 10, 64) - allTs = append(allTs, keyi64) - } - nary := utils.RecursiveBubble(allTs, len(allTs)) - tt := from.UnixMilli() - ff := tt - tt%60000 - fi := int64(ff) - mary := []int64{} - for _, v := range nary { - if v < fi { - break - } - mary = append(mary, v) - } - res := []*simple.Json{} - for _, v := range mary { - // if k > 1 { - // break - // } - nv := pattern + strconv.FormatInt(v, 10) - str, err := redisCli.Get(nv).Result() - if err != nil { - fmt.Println("err of redis get key:", nv, err) - } - cur, err := simple.NewJson([]byte(str)) - if err != nil { - fmt.Println("err of create newJson:", str, err) - } - res = append(res, cur) - } - - return res, nil -}