diff --git a/.gitignore b/.gitignore index 539b4cd..1f4160c 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ submodules/ +vendor/ diff --git a/candle.go b/candle.go index adbd044..40bb48b 100644 --- a/candle.go +++ b/candle.go @@ -477,3 +477,67 @@ func (cl *Candle) SetToKey(core *Core) ([]interface{}, error) { core.SaveUniKey(cl.Period, keyName, extt, tsi) return cl.Data, err } + +// 冒泡排序 +func (cdl *CandleList) RecursiveBubbleS(length int, ctype string) error { + if length == 0 { + return nil + } + + for idx, _ := range cdl.List { + if idx >= length-1 { + break + } + temp := Candle{} + pre := ToInt64(cdl.List[idx].Data[0]) + nex := ToInt64(cdl.List[idx+1].Data[0]) + daoxu := pre < nex + if ctype == "asc" { + daoxu = !daoxu + } + if daoxu { //改变成>,换成从小到大排序 + temp = *cdl.List[idx] + cdl.List[idx] = cdl.List[idx+1] + cdl.List[idx+1] = &temp + } + } + length-- + cdl.RecursiveBubbleS(length, ctype) + return nil +} + +// TODO 返回的Sample是被弹出队列的元素,如果没有就是nil +func (cdl *CandleList) RPush(sp *Candle) (Sample, error) { + last := Candle{} + tsi := ToInt64(sp.Data[0]) + matched := false + // bj, _ := json.Marshal(*sp) + cdl.RecursiveBubbleS(len(cdl.List), "asc") + for k, v := range cdl.List { + if ToInt64(v.Data[0]) == tsi { + matched = true + cdl.List[k] = sp + bj, err := json.Marshal(sp) + if err != nil { + logrus.Warning("err of convert cdl item:", err) + } + logrus.Debug("candleList RPush replace: ", string(bj), "v.Data[0]: ", v.Data[0], "tsi:", tsi) + } + } + if matched { + return nil, nil + } + if len(cdl.List) >= cdl.Count { + last = *cdl.List[0] + cdl.List = cdl.List[1:] + cdl.List = append(cdl.List, sp) + bj, err := json.Marshal(sp) + logrus.Debug("candleList RPush popup: ", string(bj), "len(cdl.List): ", len(cdl.List), "cdl.Count:", cdl.Count) + return &last, err + } else { + cdl.List = append(cdl.List, sp) + bj, err := json.Marshal(sp) + logrus.Debug("candleList RPush insert: ", string(bj), "len(cdl.List): ", len(cdl.List), "cdl.Count:", cdl.Count) + return nil, err + } +} diff --git a/coaster.go b/coaster.go new file mode 100644 index 0000000..62e9f2c --- /dev/null +++ b/coaster.go @@ -0,0 +1,176 @@ +package core + +import ( + "encoding/json" + "errors" + "fmt" + logrus "github.com/sirupsen/logrus" + "os" + "strconv" + "time" +) + +// TODO 目前没有实现tickerInfo,用一分钟维度的candle代替, 后续如果订阅ws的话,ticker就不用了,也还是直接用candle1m就够了 +type Coaster struct { + InstID string `json:"instID"` + Period string `json:"period"` + Count int `json:"count"` + Scale float64 `json:"scale"` + LastUpdateTime int64 `json:"lastUpdateTime"` + UpdateNickName string `json:"updateNickName"` + CandleList CandleList `json:"candleList"` + Ma7List MaXList `json:"ma7List"` + Ma30List MaXList `json:"ma30List"` +} + +type CoasterInfo struct { + InstID string + Period string + InsertedNew bool +} + +func (co Coaster) RPushSample(cr *Core, sp Sample, ctype string) (*Sample, error) { + cd := Candle{} + spjs, _ := json.Marshal(sp) + logrus.Debug("RPushSample spjs: ", string(spjs)) + if ctype == "candle" { + json.Unmarshal(spjs, &cd) + cd.Data[0] = cd.Data[0] + cd.Data[1], _ = strconv.ParseFloat(cd.Data[1].(string), 64) + cd.Data[2], _ = strconv.ParseFloat(cd.Data[2].(string), 64) + cd.Data[3], _ = strconv.ParseFloat(cd.Data[3].(string), 64) + cd.Data[4], _ = strconv.ParseFloat(cd.Data[4].(string), 64) + cd.Data[5], _ = strconv.ParseFloat(cd.Data[5].(string), 64) + cd.Data[6], _ = strconv.ParseFloat(cd.Data[6].(string), 64) + sm, err := co.CandleList.RPush(&cd) + if err == nil { + now := time.Now().UnixMilli() + co.LastUpdateTime = now + co.CandleList.LastUpdateTime = now + co.UpdateNickName = utils.GetRandomString(12) + co.CandleList.UpdateNickName = utils.GetRandomString(12) + } + return &sm, err + } + mx := MaX{} + if ctype == "ma7" { + json.Unmarshal(spjs, &mx) + sm, err := co.Ma7List.RPush(&mx) + if err == nil { + now := time.Now().UnixMilli() + co.LastUpdateTime = now + co.Ma7List.UpdateNickName = utils.GetRandomString(12) + co.Ma7List.LastUpdateTime = now + } + return &sm, err + } + if ctype == "ma30" { + json.Unmarshal(spjs, &mx) + sm, err := co.Ma30List.RPush(&mx) + // bj, _ := json.Marshal(co) + if err == nil { + now := time.Now().UnixMilli() + co.LastUpdateTime = now + co.Ma30List.UpdateNickName = utils.GetRandomString(12) + co.Ma30List.LastUpdateTime = now + } + return &sm, err + } + return nil, nil +} + +func (co *Coaster) SetToKey(cr *Core) (string, error) { + co.CandleList.RecursiveBubbleS(len(co.CandleList.List), "asc") + co.Ma7List.RecursiveBubbleS(len(co.Ma7List.List), "asc") + co.Ma30List.RecursiveBubbleS(len(co.Ma30List.List), "asc") + js, _ := json.Marshal(*co) + coasterName := co.InstID + "|" + co.Period + "|coaster" + res, err := cr.RedisLocalCli.Set(coasterName, string(js), 0).Result() + return res, err +} + +func (coi *CoasterInfo) Process(cr *Core) { + curCo, _ := cr.GetCoasterFromPlate(coi.InstID, coi.Period) + go func(co Coaster) { + //这里执行:创建一个tray对象,用现有的co的数据计算和填充其listMap + // TODO 发到一个channel里来执行下面的任务, + allow := os.Getenv("SARDINE_MAKESERIES") == "true" + if !allow { + return + } + srs, err := co.UpdateTray(cr) + if err != nil || srs == nil { + logrus.Warn("tray err: ", err) + return + } + _, err = srs.SetToKey(cr) + if err != nil { + logrus.Warn("srs SetToKey err: ", err) + return + } + //实例化完一个tray之后,拿着这个tray去执行Analytics方法 + // + srsinfo := SeriesInfo{ + InstID: curCo.InstID, + Period: curCo.Period, + } + + cr.SeriesChan <- &srsinfo + }(curCo) + + go func(co Coaster) { + // 每3次会有一次触发缓存落盘 + // run := utils.Shaizi(3) + // if run { + _, err := co.SetToKey(cr) + if err != nil { + logrus.Warn("coaster process err: ", err) + fmt.Println("coaster SetToKey err: ", err) + } + // } + + }(curCo) +} + +// TODO 类似于InsertIntoPlate函数,照猫画虎就行了 +func (co *Coaster) UpdateTray(cr *Core) (*Series, error) { + cr.Mu1.Lock() + defer cr.Mu1.Unlock() + //尝试从内存读取tray对象 + tr, trayFounded := cr.TrayMap[co.InstID] + if !trayFounded { + tr1, err := co.LoadTray(cr) + if err != nil { + return nil, err + } + cr.TrayMap[co.InstID] = tr1 + tr = tr1 + } + srs, seriesFounded := tr.SeriesMap["period"+co.Period] + err := errors.New("") + if !seriesFounded { + srs1, err := tr.NewSeries(cr, co.Period) + if err != nil { + return nil, err + } + tr.SeriesMap["period"+co.Period] = srs1 + } else { + err = srs.Refresh(cr) + } + // if err == nil { + // bj, _ := json.Marshal(srs) + // logrus.Debug("series:,string"(bj)) + // } + return srs, err +} + +// TODO +func (co *Coaster) LoadTray(cr *Core) (*Tray, error) { + tray := Tray{} + tray.Init(co.InstID) + prs := cr.Cfg.Config.Get("candleDimentions").MustArray() + for _, v := range prs { + tray.NewSeries(cr, v.(string)) + } + return &tray, nil +} diff --git a/core.go b/core.go index 831a66a..d4d87c4 100644 --- a/core.go +++ b/core.go @@ -23,13 +23,13 @@ import ( ) type Core struct { - Env string - Cfg *MyConfig - RedisLocalCli *redis.Client - RedisRemoteCli *redis.Client - FluentBitUrl string - // PlateMap map[string]*Plate - // TrayMap map[string]*Tray + Env string + Cfg *MyConfig + RedisLocalCli *redis.Client + RedisRemoteCli *redis.Client + FluentBitUrl string + PlateMap map[string]*Plate + TrayMap map[string]*Tray CoasterMd5SyncMap sync.Map Mu *sync.Mutex Mu1 *sync.Mutex @@ -714,3 +714,88 @@ func (cr *Core) PeriodToLastTime(period string, from time.Time) (time.Time, erro // fmt.Println("PeriodToLastTime: period: ", period, " lastTime:", om.Format("2006-01-02 15:04:05.000")) return om, nil } + +// setName := "candle" + period + "|" + instId + "|sortedSet" +// count: 倒推多少个周期开始拿数据 +// from: 倒推的起始时间点 +// ctype: candle或者maX +func (core *Core) GetRangeCandleSortedSet(setName string, count int, from time.Time) (*CandleList, error) { + cdl := CandleList{} + ary1 := strings.Split(setName, "|") + ary2 := []string{} + period := "" + ary2 = strings.Split(ary1[0], "candle") + period = ary2[1] + + dui, err := core.PeriodToMinutes(period) + if err != nil { + return nil, err + } + fromt := from.UnixMilli() + nw := time.Now().UnixMilli() + if fromt > nw*2 { + err := errors.New("时间错了需要debug") + logrus.Warning(err.Error()) + return nil, err + } + froms := strconv.FormatInt(fromt, 10) + sti := fromt - dui*int64(count)*60*1000 + sts := strconv.FormatInt(sti, 10) + opt := redis.ZRangeBy{ + Min: sts, + Max: froms, + Count: int64(count), + } + ary := []string{} + extt, err := core.GetExpiration(period) + ot := time.Now().Add(extt * -1) + oti := ot.UnixMilli() + cli := core.RedisLocalCli + cli.LTrim(setName, 0, oti) + cunt, _ := cli.ZRemRangeByScore(setName, "0", strconv.FormatInt(oti, 10)).Result() + if cunt > 0 { + logrus.Warning("移出过期的引用数量:", setName, count, "ZRemRangeByScore ", setName, 0, strconv.FormatInt(oti, 10)) + } + logrus.Info("ZRevRangeByScore ", setName, opt) + ary, err = cli.ZRevRangeByScore(setName, opt).Result() + if err != nil { + return &cdl, err + } + keyAry, err := cli.MGet(ary...).Result() + if err != nil || len(keyAry) == 0 { + logrus.Warning("no record with cmd: ZRevRangeByScore ", setName, froms, sts, " ", err.Error()) + logrus.Warning("zrev lens of ary: lens: ", len(ary), "GetRangeSortedSet ZRevRangeByScore:", "setName:", setName, "opt.Max:", opt.Max, "opt.Min:", opt.Min) + return &cdl, err + } + for _, str := range keyAry { + if str == nil { + continue + } + cd := Candle{} + err := json.Unmarshal([]byte(str.(string)), &cd) + if err != nil { + logrus.Warn(GetFuncName(), err, str.(string)) + } + tmi := ToInt64(cd.Data[0]) + tm := time.UnixMilli(tmi) + if tm.Sub(from) > 0 { + break + } + cdl.List = append(cdl.List, &cd) + } + cdl.Count = count + return &cdl, nil +} + +func (cr *Core) GetCoasterFromPlate(instID string, period string) (Coaster, error) { + // cr.Mu.Lock() + // defer cr.Mu.Unlock() + pl, ok := cr.PlateMap[instID] + if !ok { + err := errors.New("instID period not found : " + instID + " " + period) + return *new(Coaster), err + } + co := pl.CoasterMap["period"+period] + + return co, nil +} diff --git a/pixel.go b/pixel.go new file mode 100644 index 0000000..60c5508 --- /dev/null +++ b/pixel.go @@ -0,0 +1,92 @@ +package core + +type Pixel struct { + X float64 `json:"x"` + Y float64 `json:"y"` + YCandle YCandle `json:"yCandle,omitempty"` + Score float64 `json:"Score"` + TimeStamp int64 `json:"timeStamp"` + // ListMap map +} +type YCandle struct { + Open float64 // 开盘价格 + High float64 // 最高价格 + Low float64 // 最低价格 + Close float64 // 收盘价格 +} +type MyPixel struct { + InstID string `json:"instID"` + Period string `json:"period"` + Pixel *Pixel `json:"pixel"` +} +type PixelList struct { + Count int `json:"count"` + LastUpdateTime int64 `json:"lastUpdateTime"` + UpdateNickName string `json:"updateNickName"` + Ctype string `json:"ctype"` + List []*Pixel `json:"pixel"` +} + +func (pxl *PixelList) ReIndex() error { + for k, v := range pxl.List { + v.X = float64(k) + } + return nil +} + +// 冒泡排序 按时间排序 +func (pxl *PixelList) RecursiveBubbleS(length int, ctype string) error { + if length == 0 { + return nil + } + for idx, _ := range pxl.List { + if idx >= length-1 { + break + } + temp := Pixel{} + + pre := pxl.List[idx] + nex := pxl.List[idx+1] + daoxu := pre.TimeStamp < nex.TimeStamp + if ctype == "asc" { + daoxu = !daoxu + } + if daoxu { //改变成>,换成从小到大排序 + temp = *pxl.List[idx] + pxl.List[idx] = pxl.List[idx+1] + pxl.List[idx+1] = &temp + } + + } + length-- + pxl.RecursiveBubbleS(length, ctype) + return nil +} + +func (pxl *PixelList) RecursiveBubbleX(length int, ctype string) error { + if length == 0 { + return nil + } + for idx, _ := range pxl.List { + if idx >= length-1 { + break + } + temp := float64(0) + + pre := pxl.List[idx] + nex := pxl.List[idx+1] + daoxu := pre.X < nex.X + if ctype == "asc" { + daoxu = !daoxu + } + if daoxu { //改变成>,换成从小到大排序 + temp = pxl.List[idx].X + pxl.List[idx].X = pxl.List[idx+1].X + pxl.List[idx+1].X = temp + } + + } + length-- + pxl.RecursiveBubbleS(length, ctype) + return nil +} diff --git a/plate.go b/plate.go new file mode 100644 index 0000000..02b915a --- /dev/null +++ b/plate.go @@ -0,0 +1,78 @@ +package core + +import ( + "encoding/json" + "time" +) + +type Plate struct { + InstID string `json:"instId,string"` + Scale float64 `json:"scale,number"` + Count int `json:"count,number"` + CoasterMap map[string]Coaster `json:"coasterMap"` +} + +func (pl *Plate) Init(instId string) { + pl.InstID = instId + pl.Count = 24 + pl.Scale = float64(0.005) + pl.CoasterMap = make(map[string]Coaster) +} + +// TODO 从redis里读出来已经存储的plate,如果不存在就创建一个新的 +func LoadPlate(cr *Core, instId string) (*Plate, error) { + pl := Plate{} + plateName := instId + "|plate" + _, err := cr.RedisLocalCli.Exists().Result() + if err == nil { + str, _ := cr.RedisLocalCli.Get(plateName).Result() + json.Unmarshal([]byte(str), &pl) + } else { + pl.Init(instId) + prs := cr.Cfg.Config.Get("candleDimentions").MustArray() + for _, v := range prs { + pl.MakeCoaster(cr, v.(string)) + } + } + return &pl, nil +} + +func (pl *Plate) SetToKey(cr *Core) error { + js, _ := json.Marshal(*pl) + plateName := pl.InstID + "|plate" + _, err := cr.RedisLocalCli.Set(plateName, string(js), 0).Result() + return err +} + +func (pl *Plate) MakeCoaster(cr *Core, period string) (*Coaster, error) { + lastTime := time.Now() + setName := "candle" + period + "|" + pl.InstID + "|sortedSet" + cdl, err := cr.GetRangeCandleSortedSet(setName, pl.Count, lastTime) + if err != nil { + return nil, err + } + cdl.RecursiveBubbleS(len(cdl.List), "asc") + setName7 := "ma7|" + setName + setName30 := "ma30|" + setName + mxl7, err := cr.GetRangeMaXSortedSet(setName7, pl.Count, lastTime) + if err != nil { + return nil, err + } + mxl7.RecursiveBubbleS(len(mxl7.List), "asc") + mxl30, err := cr.GetRangeMaXSortedSet(setName30, pl.Count, lastTime) + if err != nil { + return nil, err + } + mxl30.RecursiveBubbleS(len(mxl30.List), "asc") + coaster := Coaster{ + InstID: pl.InstID, + Period: period, + Count: pl.Count, + Scale: pl.Scale, + CandleList: *cdl, + Ma7List: *mxl7, + Ma30List: *mxl30, + } + pl.CoasterMap["period"+period] = coaster + return &coaster, err +} diff --git a/tray.go b/tray.go new file mode 100644 index 0000000..923207c --- /dev/null +++ b/tray.go @@ -0,0 +1,60 @@ +package core + +import ( + "encoding/json" +) + +type Tray struct { + InstID string `json:"instId,string"` + Period string `json:"period,string"` + Count int `json:"count,number"` + Scale float64 `json:"scale,number"` + LastUpdateTime int64 `json:"lastUpdateTime,number"` + // SeriesMap map[string]*Series `json:"seriesMap"` +} + +type PixelSeries struct { + Count int64 `json:"count"` + Section int64 `json:"section"` + List []*Pixel `json:"list"` +} + +func (tr *Tray) Init(instId string) { + tr.InstID = instId + tr.Count = 24 + tr.Scale = float64(0.005) + // tr.SeriesMap = make(map[string]*Series) +} +func (tr *Tray) SetToKey(cr *Core) error { + js, _ := json.Marshal(tr) + keyName := tr.InstID + "|" + tr.Period + "|tray" + _, err := cr.RedisLocalCli.Set(keyName, string(js), 0).Result() + // fmt.Println(utils.GetFuncName(), "tray SetToKey:", string(js)) + return err +} + +// TODO 执行单维度分析,相对应的是跨维度的分析,那个还没想好 +// 单维度下的分析结果中包含以下信息: +// 1. +func (tr *Tray) Analytics(cr *Core) { + go func() { + + }() +} + +// TODO 实例化一个series +// func (tr *Tray) NewSeries(cr *Core, period string) (*Series, error) { +// sr := Series{ +// InstID: tr.InstID, +// Period: period, +// Count: tr.Count, +// Scale: tr.Scale, +// CandleSeries: &PixelList{}, +// Ma7Series: &PixelList{}, +// Ma30Series: &PixelList{}, +// } +// // 自我更新 +// err := sr.Refresh(cr) +// tr.SeriesMap["period"+period] = &sr +// return &sr, err +// }