diff --git a/core/candle.go b/core/candle.go index bb1a2c0..6c42548 100644 --- a/core/candle.go +++ b/core/candle.go @@ -30,6 +30,8 @@ type Candle struct { High float64 Low float64 Close float64 + VolCcy float64 + Confirm bool } type MaX struct { @@ -218,6 +220,229 @@ func (cl *Candle) ToStruct(core *Core) error { return err } cl.Close = clse + cl.VolCcy, err = strconv.ParseFloat(cl.Data[6].(string), 64) + if err != nil { + fmt.Println("Error parsing string to float64:", err) + return err + } + if cl.Data[6].(string) == 1 { + cl.Confirm = true + }else { + cl.Confirm = false + } + cl.Data = nil + return nil +} +func (cl *Candle) SetToKey(core *Core) ([]interface{}, error) { + data := cl.Data + tsi, err := strconv.ParseInt(data[0].(string), 10, 64) + tss := strconv.FormatInt(tsi, 10) + keyName := "candle" + cl.Period + "|" + cl.InstId + "|ts:" + tss + //过期时间:根号(当前candle的周期/1分钟)*10000 + + dt, err := json.Marshal(cl.Data) + exp := core.PeriodToMinutes(cl.Period) + // expf := float64(exp) * 60 + expf := utils.Sqrt(float64(exp)) * 100 + extt := time.Duration(expf) * time.Minute + curVolstr, _ := data[5].(string) + curVol, err := strconv.ParseFloat(curVolstr, 64) + if err != nil { + fmt.Println("err of convert ts:", err) + } + curVolCcystr, _ := data[6].(string) + curVolCcy, err := strconv.ParseFloat(curVolCcystr, 64) + curPrice := curVolCcy / curVol + if curPrice <= 0 { + fmt.Println("price有问题", curPrice, "dt: ", string(dt), "from:", cl.From) + err = errors.New("price有问题") + return cl.Data, err + } + redisCli := core.RedisCli + // tm := time.UnixMilli(tsi).Format("2006-01-02 15:04") + fmt.Println("setToKey:", keyName, "ts: ", "price: ", curPrice, "from:", cl.From) + redisCli.Set(keyName, dt, extt).Result() + core.SaveUniKey(cl.Period, keyName, extt, tsi, cl) + return cl.Data, err +} + +func (mx *MaX) SetToKey() ([]interface{}, error) { + cstr := strconv.Itoa(mx.Count) + tss := strconv.FormatInt(mx.Ts, 10) + keyName := "ma" + cstr + "|candle" + mx.Period + "|" + mx.InstId + "|ts:" + tss + //过期时间:根号(当前candle的周期/1分钟)*10000 + dt := []interface{}{} + dt = append(dt, mx.Ts) + dt = append(dt, mx.Value) + dj, _ := json.Marshal(dt) + exp := mx.Core.PeriodToMinutes(mx.Period) + expf := utils.Sqrt(float64(exp)) * 100 + extt := time.Duration(expf) * time.Minute + // loc, _ := time.LoadLocation("Asia/Shanghai") + // tm := time.UnixMilli(mx.Ts).In(loc).Format("2006-01-02 15:04") + // fmt.Println("setToKey:", keyName, "ts:", tm, string(dj), "from: ", mx.From) + _, err := mx.Core.RedisCli.GetSet(keyName, dj).Result() + mx.Core.RedisCli.Expire(keyName, extt) + return dt, err +} + +// 保证同一个 period, keyName ,在一个周期里,SaveToSortSet只会被执行一次 +func (core *Core) SaveUniKey(period string, keyName string, extt time.Duration, tsi int64, cl *Candle) { + did := cl.InstId + cl.Period + cl.Data[0].(string) + cl.Id = HashString(did) + cl.ToStruct(core) + cd, _ := json.Marshal(cl) + wg := WriteLog{ + Content: cd, + Tag: "sardine.log.candle." + cl.Period, + Id: cl.Id, + } + core.WriteLogChan <- &wg + + refName := keyName + "|refer" + refRes, _ := core.RedisCli.GetSet(refName, 1).Result() + core.RedisCli.Expire(refName, extt) + // 为保证唯一性机制,防止SaveToSortSet 被重复执行 + if len(refRes) != 0 { + fmt.Println("refName exist: ", refName) + return + } + + core.SaveToSortSet(period, keyName, extt, tsi) +} + +// tsi: 上报时间timeStamp millinSecond +func (core *Core) SaveToSortSet(period string, keyName string, extt time.Duration, tsi int64) { + ary := strings.Split(keyName, "ts:") + setName := ary[0] + "sortedSet" + z := redis.Z{ + Score: float64(tsi), + Member: keyName, + } + rs, err := core.RedisCli.ZAdd(setName, z).Result() + if err != nil { + fmt.Println("err of ma7|ma30 add to redis:", err) + } else { + fmt.Println("sortedSet added to redis:", rs, keyName) + } +} + +func (cr *Core) PeriodToMinutes(period string) int64 { + ary := strings.Split(period, "") + beiStr := "1" + danwei := "" + if len(ary) == 3 { + beiStr = ary[0] + ary[1] + danwei = ary[2] + } else { + beiStr = ary[0] + danwei = ary[1] + } + cheng := 1 + bei, _ := strconv.Atoi(beiStr) + switch danwei { + case "m": + { + cheng = bei + break + } + case "H": + { + cheng = bei * 60 + break + } + case "D": + { + cheng = bei * 60 * 24 + break + } + case "W": + { + cheng = bei * 60 * 24 * 7 + break + } + case "M": + { + cheng = bei * 60 * 24 * 30 + break + } + case "Y": + { + cheng = bei * 60 * 24 * 365 + break + } + default: + { + fmt.Println("notmatch:", danwei) + } + } + return int64(cheng) +} + +// type ScanCmd struct { +// baseCmd +// +// page []string +// cursor uint64 +// +// process func(cmd Cmder) error +// } +func (core *Core) GetRangeKeyList(pattern string, from time.Time) ([]*simple.Json, error) { + // 比如,用来计算ma30或ma7,倒推多少时间范围, + redisCli := core.RedisCli + cursor := uint64(0) + n := 0 + allTs := []int64{} + var keys []string + for { + var err error + keys, cursor, _ = redisCli.Scan(cursor, pattern+"*", 2000).Result() + if err != nil { + panic(err) + } + n += len(keys) + if n == 0 { + break + } + } + + // keys, _ := redisCli.Keys(pattern + "*").Result() + for _, key := range keys { + keyAry := strings.Split(key, ":") + key = keyAry[1] + keyi64, _ := strconv.ParseInt(key, 10, 64) + allTs = append(allTs, keyi64) + } + nary := utils.RecursiveBubble(allTs, len(allTs)) + tt := from.UnixMilli() + ff := tt - tt%60000 + fi := int64(ff) + mary := []int64{} + for _, v := range nary { + if v < fi { + break + } + mary = append(mary, v) + } + res := []*simple.Json{} + for _, v := range mary { + // if k > 1 { + // break + // } + nv := pattern + strconv.FormatInt(v, 10) + str, err := redisCli.Get(nv).Result() + if err != nil { + fmt.Println("err of redis get key:", nv, err) + } + cur, err := simple.NewJson([]byte(str)) + if err != nil { + fmt.Println("err of create newJson:", str, err) + } + res = append(res, cur) + } + + return res, nil +} cl.Data = nil return nil diff --git a/tunas b/tunas index 167bbc9..aae3c01 100755 Binary files a/tunas and b/tunas differ