package model

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"errors"
	// "reflect"
	// "fmt"
	"math/rand"
	"os"
	"strconv"
	"strings"
	"time"

	simple "github.com/bitly/go-simplejson"
	"github.com/go-redis/redis"
	"github.com/phyer/texus/utils"
	logrus "github.com/sirupsen/logrus"
)

// CoreService is the interface of the core service as seen by this package.
type CoreService interface {
	GetCfg() *simple.Json
	GetRedisLocalCli() *redis.Client
	AddToGeneralCandleChnl(candle *Candle, channels []string)
	PeriodToMinutes(period string) (int64, error)
	// Other methods can be added here as needed.
}

// Candle is one candlestick. Data holds the raw positional fields; the typed
// fields (Timestamp, Open, High, ...) are filled from Data by ToStruct.
//
// NOTE(review): Core and Data carry no json tag, so encoding/json emits them
// under their Go field names whenever a Candle is marshalled.
type Candle struct {
	Id string `json:"_id"`
	// Core is typed as the CoreService interface rather than a concrete type.
	Core       CoreService
	InstID     string `json:"instID"`
	Period     string `json:"period"`
	Data       []interface{}
	From       string    `json:"from"`
	Timestamp  time.Time `json:"timeStamp"`
	LastUpdate time.Time `json:"lastUpdate"`
	Open       float64   `json:"open"`
	High       float64   `json:"high"`
	Low        float64   `json:"low"`
	Close      float64   `json:"close"`
	VolCcy     float64   `json:"volCcy"`
	Confirm    bool      `json:"confirm"`
}

// Sample is anything that can persist itself under a key.
//
// NOTE(review): Candle.SetToKey in this file takes *Core, not *CoreService,
// so *Candle does not satisfy this interface as written — confirm which
// signature is the intended one.
type Sample interface {
	SetToKey(cr *CoreService) ([]interface{}, error)
}

// SampleList is a bounded, ordered collection of samples.
type SampleList interface {
	// RPush inserts one element and pushes out whatever exceeds the
	// configured length.
	RPush(sp Sample) (Sample, error)
	// GetSectionOf returns a sub-slice. end is usually 0, meaning the last
	// element; start is negative, e.g. -3 means the third element from the
	// end. start: -10, end: -3 selects the elements between the tenth and
	// the third from the end.
	GetSectionOf(start int, end int) ([]*Sample, error)
}

// CandleList is a bounded list of candles; Count is the capacity that RPush
// compares the list length against.
type CandleList struct {
	Count          int       `json:"count,number"`
	LastUpdateTime int64     `json:"lastUpdateTime"`
	UpdateNickName string    `json:"updateNickName"`
	List           []*Candle `json:"list"`
}

// CandleSegment describes one named segment and whether it is enabled.
type CandleSegment struct {
	StartTime string `json:"startTime"`
	Enabled   bool   `json:"enabled,bool"`
	Seg       string `json:"seg"`
}

// MatchCheck pairs a number of minutes with a matched flag.
type MatchCheck struct {
	Minutes int64
	Matched bool
}

// Filter reports whether the candle's InstID appears in the "focusList"
// array of the configuration returned by the core service.
//
// The original body read cr.Cfg.Config through a pointer to the interface,
// which cannot compile; the accessor declared on CoreService is used instead.
// NOTE(review): this assumes GetCfg returns the configuration root that
// holds "focusList" — confirm against the CoreService implementation.
func (cl *Candle) Filter(cr *CoreService) bool {
	if cr == nil || *cr == nil {
		return false
	}
	cfg := (*cr).GetCfg()
	if cfg == nil {
		return false
	}
	for _, v := range cfg.Get("focusList").MustArray() {
		// Entries that are not strings can never equal InstID; skip them
		// instead of panicking on the type assertion.
		if name, ok := v.(string); ok && name == cl.InstID {
			return true
		}
	}
	return false
}

// SetMatched stores value in the Matched flag.
func (mc *MatchCheck) SetMatched(value bool) {
	mc.Matched = value
}

// dataString returns Data[idx] as a string, or an error when the slot is
// missing or does not hold a string.
func (cl *Candle) dataString(idx int) (string, error) {
	if idx < 0 || idx >= len(cl.Data) {
		return "", errors.New("candle data has no field at index " + strconv.Itoa(idx))
	}
	s, ok := cl.Data[idx].(string)
	if !ok {
		return "", errors.New("candle data field " + strconv.Itoa(idx) + " is not a string")
	}
	return s, nil
}

// dataFloat parses Data[idx] as a float64.
func (cl *Candle) dataFloat(idx int) (float64, error) {
	s, err := cl.dataString(idx)
	if err != nil {
		return 0, err
	}
	return strconv.ParseFloat(s, 64)
}

// PushToWriteLogChan derives the candle's Id from InstID, Period and Data[0],
// fills the typed fields via ToStruct, marshals the candle to JSON and sends
// it on cr.WriteLogChan tagged "sardine.log.candle.<Period>".
//
// It returns an error, and sends nothing, when Data[0] is missing or not a
// string, when ToStruct fails, or when the candle cannot be marshalled.
func (cl *Candle) PushToWriteLogChan(cr *Core) error {
	tsStr, err := cl.dataString(0)
	if err != nil {
		return err
	}
	cl.Id = HashString(cl.InstID + cl.Period + tsStr)

	parsed, err := cl.ToStruct(cr)
	if err != nil {
		// ToStruct returns a nil candle on failure; carrying on would
		// dereference it below.
		return err
	}
	logrus.Debug("cl: ", parsed)

	content, err := json.Marshal(parsed)
	if err != nil {
		logrus.Error("PushToWriteLog json marshal candle err: ", err)
		return err
	}
	wg := WriteLog{
		Content: content,
		Tag:     "sardine.log.candle." + parsed.Period,
		Id:      parsed.Id,
	}
	cr.WriteLogChan <- &wg
	return nil
}

// Daoxu reverses arr in place.
func Daoxu(arr []interface{}) {
	for i, j := 0, len(arr)-1; i < j; i, j = i+1, j-1 {
		arr[i], arr[j] = arr[j], arr[i]
	}
}

// HashString returns the first 23 hexadecimal characters of the SHA-256
// digest of input.
func HashString(input string) string {
	sum := sha256.Sum256([]byte(input))
	// The hex form is always 64 characters long, so slicing to 23 is safe.
	return hex.EncodeToString(sum[:])[:23]
}

// ToStruct fills the typed fields of the candle from the raw Data slice:
// Data[0] is a millisecond timestamp, Data[1..4] are open/high/low/close and
// Data[6] is VolCcy. All values are expected to be strings.
//
// The receiver is only modified once every field has parsed; on failure it is
// left untouched and (nil, err) is returned. On success the receiver itself
// is returned. The core argument is not used.
func (cl *Candle) ToStruct(core *Core) (*Candle, error) {
	tsStr, err := cl.dataString(0)
	if err != nil {
		logrus.Error("Error parsing timestamp:", err)
		return nil, err
	}
	ts, err := strconv.ParseInt(tsStr, 10, 64)
	if err != nil {
		logrus.Error("Error parsing timestamp:", err)
		return nil, err
	}

	// Positions of open, high, low, close and volCcy inside Data.
	fieldIdx := [...]int{1, 2, 3, 4, 6}
	var vals [len(fieldIdx)]float64
	for i, idx := range fieldIdx {
		v, err := cl.dataFloat(idx)
		if err != nil {
			logrus.Error("Error parsing string to float64:", err)
			return nil, err
		}
		vals[i] = v
	}

	// The original read the confirm flag from Data[6], the same slot as
	// VolCcy, so Confirm was true only when the volume string was "1".
	// NOTE(review): index 8 assumes the OKX candle layout
	// [ts, o, h, l, c, vol, volCcy, volCcyQuote, confirm] — confirm against
	// the data source. Shorter rows leave Confirm false.
	confirm := false
	if len(cl.Data) > 8 {
		if flag, ok := cl.Data[8].(string); ok {
			confirm = flag == "1"
		}
	}

	cl.Timestamp = time.UnixMilli(ts)
	cl.Open, cl.High, cl.Low, cl.Close, cl.VolCcy = vals[0], vals[1], vals[2], vals[3], vals[4]
	cl.Confirm = confirm
	return cl, nil
}

// SaveUniKey refreshes the expiry of "<keyName>|refer" and then records
// keyName in the sorted set via SaveToSortSet.
//
// The original intent was to guarantee that SaveToSortSet runs only once per
// period and keyName; that guard is disabled below because adding the same
// member to the sorted set again is idempotent in redis.
// NOTE(review): nothing in this file creates the "|refer" key, so the Expire
// call looks like a no-op — confirm before removing it.
func (core *Core) SaveUniKey(period string, keyName string, extt time.Duration, tsi int64) {
	refName := keyName + "|refer"
	// refRes, _ := core.RedisLocalCli.GetSet(refName, 1).Result()
	core.RedisLocalCli.Expire(refName, extt)
	// founded, _ := core.findInSortSet(period, keyName, extt, tsi)
	// if len(refRes) != 0 {
	// 	logrus.Error("refName exist: ", refName)
	// 	return
	// }
	core.SaveToSortSet(period, keyName, extt, tsi)
}

// findInSortSet reports whether the sorted set derived from keyName contains
// a member whose score equals tsi. The set name is the part of keyName before
// "ts:" followed by "sortedSet".
//
// A redis failure is logged and returned; the original always returned a nil
// error.
func (core *Core) findInSortSet(period string, keyName string, extt time.Duration, tsi int64) (bool, error) {
	setName := strings.Split(keyName, "ts:")[0] + "sortedSet"
	score := ToString(tsi)
	opt := redis.ZRangeBy{
		Min: score,
		Max: score,
	}
	rs, err := core.RedisLocalCli.ZRangeByScore(setName, opt).Result()
	if err != nil {
		logrus.Error("err of ma7|ma30 add to redis:", err)
		return false, err
	}
	logrus.Info("sortedSet added to redis:", rs, keyName)
	return len(rs) > 0, nil
}

// SaveToSortSet adds keyName to the sorted set derived from it, scored by
// tsi, the reported timestamp in milliseconds. The outcome is only logged.
func (core *Core) SaveToSortSet(period string, keyName string, extt time.Duration, tsi int64) {
	setName := strings.Split(keyName, "ts:")[0] + "sortedSet"
	member := redis.Z{
		Score:  float64(tsi),
		Member: keyName,
	}
	rs, err := core.RedisLocalCli.ZAdd(setName, member).Result()
	if err != nil {
		logrus.Warn("err of ma7|ma30 add to redis:", err)
		return
	}
	logrus.Warn("sortedSet added to redis:", rs, keyName)
}

// PeriodToMinutes converts a period such as "1m", "15m", "4H" or "1D" into a
// number of minutes. The last character is the unit (m, H, D, W, M = 30 days,
// Y = 365 days) and everything before it is a positive integer multiplier.
//
// It returns an error for an empty period, an unknown unit, or a missing or
// non-positive multiplier. The original panicked on unknown units and on
// one-character periods, and silently treated a bad multiplier as zero.
func (core *Core) PeriodToMinutes(period string) (int64, error) {
	if period == "" {
		return 0, errors.New(utils.GetFuncName() + " period is block")
	}
	unit := period[len(period)-1:]
	countStr := period[:len(period)-1]

	var perUnit int64
	switch unit {
	case "m":
		perUnit = 1
	case "H":
		perUnit = 60
	case "D":
		perUnit = 60 * 24
	case "W":
		perUnit = 60 * 24 * 7
	case "M":
		perUnit = 60 * 24 * 30
	case "Y":
		perUnit = 60 * 24 * 365
	default:
		logrus.Warning("notmatch:", unit, period)
		return 0, errors.New("notmatch:" + period)
	}

	count, err := strconv.Atoi(countStr)
	if err != nil || count <= 0 {
		return 0, errors.New(utils.GetFuncName() + " invalid period multiplier: " + period)
	}
	return int64(count) * perUnit, nil
}

// type ScanCmd struct {
// 	baseCmd
//
// 	page   []string
// 	cursor uint64
//
// 	process func(cmd Cmder) error
// }

// GetRangeKeyList returns the JSON documents stored under the keys matching
// pattern+"*" whose timestamp is not older than from truncated to the minute.
// It is used, for example, to look back over the range needed for ma7/ma30.
//
// The timestamp is taken from the text after the first ':' of each key. Keys
// without a numeric timestamp, and values that cannot be fetched or decoded,
// are skipped. The result follows the order produced by
// utils.RecursiveBubble.
//
// The original SCAN loop kept only the last page, never looked at the scan
// error, and never terminated once any key had been seen. This version walks
// the cursor until redis returns 0 and propagates scan errors.
func (core *Core) GetRangeKeyList(pattern string, from time.Time) ([]*simple.Json, error) {
	redisCli := core.RedisLocalCli
	res := []*simple.Json{}

	allTs := []int64{}
	// SCAN may return the same key more than once; keep each timestamp once.
	seen := make(map[int64]struct{})
	var cursor uint64
	for {
		keys, next, err := redisCli.Scan(cursor, pattern+"*", 2000).Result()
		if err != nil {
			return nil, err
		}
		for _, key := range keys {
			parts := strings.Split(key, ":")
			if len(parts) < 2 {
				continue
			}
			ts, err := strconv.ParseInt(parts[1], 10, 64)
			if err != nil {
				continue
			}
			if _, dup := seen[ts]; dup {
				continue
			}
			seen[ts] = struct{}{}
			allTs = append(allTs, ts)
		}
		cursor = next
		if cursor == 0 {
			break
		}
	}
	// keys, _ := redisCli.Keys(pattern + "*").Result()
	if len(allTs) == 0 {
		return res, nil
	}

	sorted := utils.RecursiveBubble(allTs, len(allTs))
	fromMs := from.UnixMilli()
	floor := fromMs - fromMs%60000 // start of the minute that contains from

	for _, ts := range sorted {
		// Filter rather than stop at the first older entry, so the result
		// does not depend on the direction in which the helper sorts.
		if ts < floor {
			continue
		}
		name := pattern + strconv.FormatInt(ts, 10)
		str, err := redisCli.Get(name).Result()
		if err != nil {
			logrus.Error("err of redis get key:", name, err)
			continue
		}
		cur, err := simple.NewJson([]byte(str))
		if err != nil {
			logrus.Error("err of create newJson:", str, err)
			continue
		}
		res = append(res, cur)
	}
	return res, nil
}

// SetToKey stores the candle as JSON in redis under
// "candle<Period>|<InstID>|ts:<Data[0]>" and registers that key in the
// matching sorted set through SaveUniKey. The key's lifetime is
// PeriodToMinutes(Period) multiplied by the "sortedSet.length" config value,
// in minutes.
//
// Data[0] is the millisecond timestamp, Data[5] the volume and Data[6] the
// volume in currency, all as strings. LastUpdate and Timestamp are refreshed
// before the candle is marshalled.
//
// cl.Data is always returned. The error is non-nil, and nothing is written,
// when a field is missing or unparsable, the period is invalid, the candle
// cannot be marshalled, volCcy/vol is not positive, or the redis write fails.
// A zero volume gives a NaN or infinite price, which is not rejected, exactly
// as in the original.
func (cl *Candle) SetToKey(core *Core) ([]interface{}, error) {
	tsStr, err := cl.dataString(0)
	if err != nil {
		return cl.Data, err
	}
	tsi, err := strconv.ParseInt(tsStr, 10, 64)
	if err != nil {
		return cl.Data, err
	}
	curVol, err := cl.dataFloat(5)
	if err != nil {
		logrus.Error("err of convert ts:", err)
		return cl.Data, err
	}
	curVolCcy, err := cl.dataFloat(6)
	if err != nil {
		return cl.Data, err
	}
	exp, err := core.PeriodToMinutes(cl.Period)
	if err != nil {
		logrus.Error("err of PeriodToMinutes:", err)
		return cl.Data, err
	}

	// The second result of Int64ToTime is discarded, as in the original.
	tm, _ := Int64ToTime(tsi)
	keyName := "candle" + cl.Period + "|" + cl.InstID + "|ts:" + strconv.FormatInt(tsi, 10)

	cl.LastUpdate = time.Now()
	cl.Timestamp = tm
	dt, err := json.Marshal(cl)
	if err != nil {
		logrus.Error("candle Save to String err:", err)
		return cl.Data, err
	}
	logrus.Info("candle Save to String: ", string(dt))

	// Expiry: period in minutes times the configured sorted-set length.
	// expf := float64(exp) * 60
	length, err := core.Cfg.Config.Get("sortedSet").Get("length").String()
	if err != nil {
		logrus.Warn("err of reading sortedSet.length from config:", err)
	}
	expf := float64(exp) * ToFloat64(length)
	extt := time.Duration(expf) * time.Minute

	curPrice := curVolCcy / curVol
	if curPrice <= 0 {
		logrus.Error("price有问题", curPrice, "dt: ", string(dt), "from:", cl.From)
		return cl.Data, errors.New("price有问题")
	}

	// tm := time.UnixMilli(tsi).Format("2006-01-02 15:04")
	logrus.Info("setToKey:", keyName, "ts: ", "price: ", curPrice, "from:", cl.From)
	if _, err := core.RedisLocalCli.Set(keyName, dt, extt).Result(); err != nil {
		return cl.Data, err
	}
	core.SaveUniKey(cl.Period, keyName, extt, tsi)
	return cl.Data, nil
}

// RecursiveBubbleS bubble-sorts the first length candles of the list by the
// timestamp in Data[0]: ascending when ctype is "asc", descending otherwise.
// It always returns nil.
//
// The name is kept for callers, but the sort is now iterative. length is
// clamped to the list size, a non-positive length is a no-op, and elements
// are reordered by swapping their pointers, so pointers held elsewhere keep
// referring to the list's candles.
func (cdl *CandleList) RecursiveBubbleS(length int, ctype string) error {
	if length > len(cdl.List) {
		length = len(cdl.List)
	}
	ascending := ctype == "asc"
	for n := length; n > 1; n-- {
		swapped := false
		for i := 0; i < n-1; i++ {
			pre := ToInt64(cdl.List[i].Data[0])
			nex := ToInt64(cdl.List[i+1].Data[0])
			outOfOrder := pre < nex
			if ascending {
				outOfOrder = pre > nex
			}
			if outOfOrder {
				cdl.List[i], cdl.List[i+1] = cdl.List[i+1], cdl.List[i]
				swapped = true
			}
		}
		if !swapped {
			// A pass without swaps means the prefix is already ordered.
			break
		}
	}
	return nil
}

// RPush adds sp to the list. The list is first sorted ascending by timestamp;
// if an element with the same timestamp exists it is replaced by sp and
// (nil, nil) is returned. Otherwise sp is appended, and when the list already
// holds Count or more elements the first element is removed and a copy of it
// is returned.
//
// TODO: the returned Sample is the element pushed out of the list, or nil
// when nothing was pushed out.
//
// The returned error is the result of marshalling sp for the debug log, as in
// the original, or a validation error when sp is nil or has no data.
func (cdl *CandleList) RPush(sp *Candle) (Sample, error) {
	if sp == nil || len(sp.Data) == 0 {
		return nil, errors.New("RPush: candle has no data")
	}
	tsi := ToInt64(sp.Data[0])
	// bj, _ := json.Marshal(*sp)
	cdl.RecursiveBubbleS(len(cdl.List), "asc")

	matched := false
	for k, v := range cdl.List {
		if ToInt64(v.Data[0]) != tsi {
			continue
		}
		matched = true
		cdl.List[k] = sp
		bj, err := json.Marshal(sp)
		if err != nil {
			logrus.Warning("err of convert cdl item:", err)
		}
		logrus.Debug("candleList RPush replace: ", string(bj), "v.Data[0]: ", v.Data[0], "tsi:", tsi)
	}
	if matched {
		return nil, nil
	}

	// Only pop when there is something to pop; an empty list with
	// Count <= 0 used to index List[0] and panic.
	if len(cdl.List) > 0 && len(cdl.List) >= cdl.Count {
		last := *cdl.List[0]
		cdl.List = append(cdl.List[1:], sp)
		bj, err := json.Marshal(sp)
		logrus.Debug("candleList RPush popup: ", string(bj), "len(cdl.List): ", len(cdl.List), "cdl.Count:", cdl.Count)
		return &last, err
	}

	cdl.List = append(cdl.List, sp)
	bj, err := json.Marshal(sp)
	logrus.Debug("candleList RPush insert: ", string(bj), "len(cdl.List): ", len(cdl.List), "cdl.Count:", cdl.Count)
	return nil, err
}