for siaga

This commit is contained in:
zhangkun9038@dingtalk.com 2024-12-15 17:40:14 +08:00
parent 47bd02f138
commit 0599d895ca
7 changed files with 563 additions and 7 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
submodules/
vendor/

View File

@ -477,3 +477,67 @@ func (cl *Candle) SetToKey(core *Core) ([]interface{}, error) {
core.SaveUniKey(cl.Period, keyName, extt, tsi)
return cl.Data, err
}
// 冒泡排序
func (cdl *CandleList) RecursiveBubbleS(length int, ctype string) error {
if length == 0 {
return nil
}
for idx, _ := range cdl.List {
if idx >= length-1 {
break
}
temp := Candle{}
pre := ToInt64(cdl.List[idx].Data[0])
nex := ToInt64(cdl.List[idx+1].Data[0])
daoxu := pre < nex
if ctype == "asc" {
daoxu = !daoxu
}
if daoxu { //改变成>,换成从小到大排序
temp = *cdl.List[idx]
cdl.List[idx] = cdl.List[idx+1]
cdl.List[idx+1] = &temp
}
}
length--
cdl.RecursiveBubbleS(length, ctype)
return nil
}
// TODO 返回的Sample是被弹出队列的元素如果没有就是nil
func (cdl *CandleList) RPush(sp *Candle) (Sample, error) {
last := Candle{}
tsi := ToInt64(sp.Data[0])
matched := false
// bj, _ := json.Marshal(*sp)
cdl.RecursiveBubbleS(len(cdl.List), "asc")
for k, v := range cdl.List {
if ToInt64(v.Data[0]) == tsi {
matched = true
cdl.List[k] = sp
bj, err := json.Marshal(sp)
if err != nil {
logrus.Warning("err of convert cdl item:", err)
}
logrus.Debug("candleList RPush replace: ", string(bj), "v.Data[0]: ", v.Data[0], "tsi:", tsi)
}
}
if matched {
return nil, nil
}
if len(cdl.List) >= cdl.Count {
last = *cdl.List[0]
cdl.List = cdl.List[1:]
cdl.List = append(cdl.List, sp)
bj, err := json.Marshal(sp)
logrus.Debug("candleList RPush popup: ", string(bj), "len(cdl.List): ", len(cdl.List), "cdl.Count:", cdl.Count)
return &last, err
} else {
cdl.List = append(cdl.List, sp)
bj, err := json.Marshal(sp)
logrus.Debug("candleList RPush insert: ", string(bj), "len(cdl.List): ", len(cdl.List), "cdl.Count:", cdl.Count)
return nil, err
}
}

176
coaster.go Normal file
View File

@ -0,0 +1,176 @@
package core
import (
"encoding/json"
"errors"
"fmt"
logrus "github.com/sirupsen/logrus"
"os"
"strconv"
"time"
)
// TODO 目前没有实现tickerInfo用一分钟维度的candle代替, 后续如果订阅ws的话ticker就不用了也还是直接用candle1m就够了
type Coaster struct {
InstID string `json:"instID"`
Period string `json:"period"`
Count int `json:"count"`
Scale float64 `json:"scale"`
LastUpdateTime int64 `json:"lastUpdateTime"`
UpdateNickName string `json:"updateNickName"`
CandleList CandleList `json:"candleList"`
Ma7List MaXList `json:"ma7List"`
Ma30List MaXList `json:"ma30List"`
}
type CoasterInfo struct {
InstID string
Period string
InsertedNew bool
}
func (co Coaster) RPushSample(cr *Core, sp Sample, ctype string) (*Sample, error) {
cd := Candle{}
spjs, _ := json.Marshal(sp)
logrus.Debug("RPushSample spjs: ", string(spjs))
if ctype == "candle" {
json.Unmarshal(spjs, &cd)
cd.Data[0] = cd.Data[0]
cd.Data[1], _ = strconv.ParseFloat(cd.Data[1].(string), 64)
cd.Data[2], _ = strconv.ParseFloat(cd.Data[2].(string), 64)
cd.Data[3], _ = strconv.ParseFloat(cd.Data[3].(string), 64)
cd.Data[4], _ = strconv.ParseFloat(cd.Data[4].(string), 64)
cd.Data[5], _ = strconv.ParseFloat(cd.Data[5].(string), 64)
cd.Data[6], _ = strconv.ParseFloat(cd.Data[6].(string), 64)
sm, err := co.CandleList.RPush(&cd)
if err == nil {
now := time.Now().UnixMilli()
co.LastUpdateTime = now
co.CandleList.LastUpdateTime = now
co.UpdateNickName = utils.GetRandomString(12)
co.CandleList.UpdateNickName = utils.GetRandomString(12)
}
return &sm, err
}
mx := MaX{}
if ctype == "ma7" {
json.Unmarshal(spjs, &mx)
sm, err := co.Ma7List.RPush(&mx)
if err == nil {
now := time.Now().UnixMilli()
co.LastUpdateTime = now
co.Ma7List.UpdateNickName = utils.GetRandomString(12)
co.Ma7List.LastUpdateTime = now
}
return &sm, err
}
if ctype == "ma30" {
json.Unmarshal(spjs, &mx)
sm, err := co.Ma30List.RPush(&mx)
// bj, _ := json.Marshal(co)
if err == nil {
now := time.Now().UnixMilli()
co.LastUpdateTime = now
co.Ma30List.UpdateNickName = utils.GetRandomString(12)
co.Ma30List.LastUpdateTime = now
}
return &sm, err
}
return nil, nil
}
func (co *Coaster) SetToKey(cr *Core) (string, error) {
co.CandleList.RecursiveBubbleS(len(co.CandleList.List), "asc")
co.Ma7List.RecursiveBubbleS(len(co.Ma7List.List), "asc")
co.Ma30List.RecursiveBubbleS(len(co.Ma30List.List), "asc")
js, _ := json.Marshal(*co)
coasterName := co.InstID + "|" + co.Period + "|coaster"
res, err := cr.RedisLocalCli.Set(coasterName, string(js), 0).Result()
return res, err
}
func (coi *CoasterInfo) Process(cr *Core) {
curCo, _ := cr.GetCoasterFromPlate(coi.InstID, coi.Period)
go func(co Coaster) {
//这里执行创建一个tray对象,用现有的co的数据计算和填充其listMap
// TODO 发到一个channel里来执行下面的任务
allow := os.Getenv("SARDINE_MAKESERIES") == "true"
if !allow {
return
}
srs, err := co.UpdateTray(cr)
if err != nil || srs == nil {
logrus.Warn("tray err: ", err)
return
}
_, err = srs.SetToKey(cr)
if err != nil {
logrus.Warn("srs SetToKey err: ", err)
return
}
//实例化完一个tray之后拿着这个tray去执行Analytics方法
//
srsinfo := SeriesInfo{
InstID: curCo.InstID,
Period: curCo.Period,
}
cr.SeriesChan <- &srsinfo
}(curCo)
go func(co Coaster) {
// 每3次会有一次触发缓存落盘
// run := utils.Shaizi(3)
// if run {
_, err := co.SetToKey(cr)
if err != nil {
logrus.Warn("coaster process err: ", err)
fmt.Println("coaster SetToKey err: ", err)
}
// }
}(curCo)
}
// TODO 类似于InsertIntoPlate函数照猫画虎就行了
func (co *Coaster) UpdateTray(cr *Core) (*Series, error) {
cr.Mu1.Lock()
defer cr.Mu1.Unlock()
//尝试从内存读取tray对象
tr, trayFounded := cr.TrayMap[co.InstID]
if !trayFounded {
tr1, err := co.LoadTray(cr)
if err != nil {
return nil, err
}
cr.TrayMap[co.InstID] = tr1
tr = tr1
}
srs, seriesFounded := tr.SeriesMap["period"+co.Period]
err := errors.New("")
if !seriesFounded {
srs1, err := tr.NewSeries(cr, co.Period)
if err != nil {
return nil, err
}
tr.SeriesMap["period"+co.Period] = srs1
} else {
err = srs.Refresh(cr)
}
// if err == nil {
// bj, _ := json.Marshal(srs)
// logrus.Debug("series:,string"(bj))
// }
return srs, err
}
// TODO
func (co *Coaster) LoadTray(cr *Core) (*Tray, error) {
tray := Tray{}
tray.Init(co.InstID)
prs := cr.Cfg.Config.Get("candleDimentions").MustArray()
for _, v := range prs {
tray.NewSeries(cr, v.(string))
}
return &tray, nil
}

99
core.go
View File

@ -23,13 +23,13 @@ import (
)
type Core struct {
Env string
Cfg *MyConfig
RedisLocalCli *redis.Client
RedisRemoteCli *redis.Client
FluentBitUrl string
// PlateMap map[string]*Plate
// TrayMap map[string]*Tray
Env string
Cfg *MyConfig
RedisLocalCli *redis.Client
RedisRemoteCli *redis.Client
FluentBitUrl string
PlateMap map[string]*Plate
TrayMap map[string]*Tray
CoasterMd5SyncMap sync.Map
Mu *sync.Mutex
Mu1 *sync.Mutex
@ -714,3 +714,88 @@ func (cr *Core) PeriodToLastTime(period string, from time.Time) (time.Time, erro
// fmt.Println("PeriodToLastTime: period: ", period, " lastTime:", om.Format("2006-01-02 15:04:05.000"))
return om, nil
}
// setName := "candle" + period + "|" + instId + "|sortedSet"
// count: 倒推多少个周期开始拿数据
// from: 倒推的起始时间点
// ctype: candle或者maX
func (core *Core) GetRangeCandleSortedSet(setName string, count int, from time.Time) (*CandleList, error) {
cdl := CandleList{}
ary1 := strings.Split(setName, "|")
ary2 := []string{}
period := ""
ary2 = strings.Split(ary1[0], "candle")
period = ary2[1]
dui, err := core.PeriodToMinutes(period)
if err != nil {
return nil, err
}
fromt := from.UnixMilli()
nw := time.Now().UnixMilli()
if fromt > nw*2 {
err := errors.New("时间错了需要debug")
logrus.Warning(err.Error())
return nil, err
}
froms := strconv.FormatInt(fromt, 10)
sti := fromt - dui*int64(count)*60*1000
sts := strconv.FormatInt(sti, 10)
opt := redis.ZRangeBy{
Min: sts,
Max: froms,
Count: int64(count),
}
ary := []string{}
extt, err := core.GetExpiration(period)
ot := time.Now().Add(extt * -1)
oti := ot.UnixMilli()
cli := core.RedisLocalCli
cli.LTrim(setName, 0, oti)
cunt, _ := cli.ZRemRangeByScore(setName, "0", strconv.FormatInt(oti, 10)).Result()
if cunt > 0 {
logrus.Warning("移出过期的引用数量:", setName, count, "ZRemRangeByScore ", setName, 0, strconv.FormatInt(oti, 10))
}
logrus.Info("ZRevRangeByScore ", setName, opt)
ary, err = cli.ZRevRangeByScore(setName, opt).Result()
if err != nil {
return &cdl, err
}
keyAry, err := cli.MGet(ary...).Result()
if err != nil || len(keyAry) == 0 {
logrus.Warning("no record with cmd: ZRevRangeByScore ", setName, froms, sts, " ", err.Error())
logrus.Warning("zrev lens of ary: lens: ", len(ary), "GetRangeSortedSet ZRevRangeByScore:", "setName:", setName, "opt.Max:", opt.Max, "opt.Min:", opt.Min)
return &cdl, err
}
for _, str := range keyAry {
if str == nil {
continue
}
cd := Candle{}
err := json.Unmarshal([]byte(str.(string)), &cd)
if err != nil {
logrus.Warn(GetFuncName(), err, str.(string))
}
tmi := ToInt64(cd.Data[0])
tm := time.UnixMilli(tmi)
if tm.Sub(from) > 0 {
break
}
cdl.List = append(cdl.List, &cd)
}
cdl.Count = count
return &cdl, nil
}
func (cr *Core) GetCoasterFromPlate(instID string, period string) (Coaster, error) {
// cr.Mu.Lock()
// defer cr.Mu.Unlock()
pl, ok := cr.PlateMap[instID]
if !ok {
err := errors.New("instID period not found : " + instID + " " + period)
return *new(Coaster), err
}
co := pl.CoasterMap["period"+period]
return co, nil
}

92
pixel.go Normal file
View File

@ -0,0 +1,92 @@
package core
type Pixel struct {
X float64 `json:"x"`
Y float64 `json:"y"`
YCandle YCandle `json:"yCandle,omitempty"`
Score float64 `json:"Score"`
TimeStamp int64 `json:"timeStamp"`
// ListMap map
}
type YCandle struct {
Open float64 // 开盘价格
High float64 // 最高价格
Low float64 // 最低价格
Close float64 // 收盘价格
}
type MyPixel struct {
InstID string `json:"instID"`
Period string `json:"period"`
Pixel *Pixel `json:"pixel"`
}
type PixelList struct {
Count int `json:"count"`
LastUpdateTime int64 `json:"lastUpdateTime"`
UpdateNickName string `json:"updateNickName"`
Ctype string `json:"ctype"`
List []*Pixel `json:"pixel"`
}
func (pxl *PixelList) ReIndex() error {
for k, v := range pxl.List {
v.X = float64(k)
}
return nil
}
// 冒泡排序 按时间排序
func (pxl *PixelList) RecursiveBubbleS(length int, ctype string) error {
if length == 0 {
return nil
}
for idx, _ := range pxl.List {
if idx >= length-1 {
break
}
temp := Pixel{}
pre := pxl.List[idx]
nex := pxl.List[idx+1]
daoxu := pre.TimeStamp < nex.TimeStamp
if ctype == "asc" {
daoxu = !daoxu
}
if daoxu { //改变成>,换成从小到大排序
temp = *pxl.List[idx]
pxl.List[idx] = pxl.List[idx+1]
pxl.List[idx+1] = &temp
}
}
length--
pxl.RecursiveBubbleS(length, ctype)
return nil
}
func (pxl *PixelList) RecursiveBubbleX(length int, ctype string) error {
if length == 0 {
return nil
}
for idx, _ := range pxl.List {
if idx >= length-1 {
break
}
temp := float64(0)
pre := pxl.List[idx]
nex := pxl.List[idx+1]
daoxu := pre.X < nex.X
if ctype == "asc" {
daoxu = !daoxu
}
if daoxu { //改变成>,换成从小到大排序
temp = pxl.List[idx].X
pxl.List[idx].X = pxl.List[idx+1].X
pxl.List[idx+1].X = temp
}
}
length--
pxl.RecursiveBubbleS(length, ctype)
return nil
}

78
plate.go Normal file
View File

@ -0,0 +1,78 @@
package core
import (
"encoding/json"
"time"
)
type Plate struct {
InstID string `json:"instId,string"`
Scale float64 `json:"scale,number"`
Count int `json:"count,number"`
CoasterMap map[string]Coaster `json:"coasterMap"`
}
func (pl *Plate) Init(instId string) {
pl.InstID = instId
pl.Count = 24
pl.Scale = float64(0.005)
pl.CoasterMap = make(map[string]Coaster)
}
// TODO 从redis里读出来已经存储的plate如果不存在就创建一个新的
func LoadPlate(cr *Core, instId string) (*Plate, error) {
pl := Plate{}
plateName := instId + "|plate"
_, err := cr.RedisLocalCli.Exists().Result()
if err == nil {
str, _ := cr.RedisLocalCli.Get(plateName).Result()
json.Unmarshal([]byte(str), &pl)
} else {
pl.Init(instId)
prs := cr.Cfg.Config.Get("candleDimentions").MustArray()
for _, v := range prs {
pl.MakeCoaster(cr, v.(string))
}
}
return &pl, nil
}
func (pl *Plate) SetToKey(cr *Core) error {
js, _ := json.Marshal(*pl)
plateName := pl.InstID + "|plate"
_, err := cr.RedisLocalCli.Set(plateName, string(js), 0).Result()
return err
}
func (pl *Plate) MakeCoaster(cr *Core, period string) (*Coaster, error) {
lastTime := time.Now()
setName := "candle" + period + "|" + pl.InstID + "|sortedSet"
cdl, err := cr.GetRangeCandleSortedSet(setName, pl.Count, lastTime)
if err != nil {
return nil, err
}
cdl.RecursiveBubbleS(len(cdl.List), "asc")
setName7 := "ma7|" + setName
setName30 := "ma30|" + setName
mxl7, err := cr.GetRangeMaXSortedSet(setName7, pl.Count, lastTime)
if err != nil {
return nil, err
}
mxl7.RecursiveBubbleS(len(mxl7.List), "asc")
mxl30, err := cr.GetRangeMaXSortedSet(setName30, pl.Count, lastTime)
if err != nil {
return nil, err
}
mxl30.RecursiveBubbleS(len(mxl30.List), "asc")
coaster := Coaster{
InstID: pl.InstID,
Period: period,
Count: pl.Count,
Scale: pl.Scale,
CandleList: *cdl,
Ma7List: *mxl7,
Ma30List: *mxl30,
}
pl.CoasterMap["period"+period] = coaster
return &coaster, err
}

60
tray.go Normal file
View File

@ -0,0 +1,60 @@
package core
import (
"encoding/json"
)
type Tray struct {
InstID string `json:"instId,string"`
Period string `json:"period,string"`
Count int `json:"count,number"`
Scale float64 `json:"scale,number"`
LastUpdateTime int64 `json:"lastUpdateTime,number"`
// SeriesMap map[string]*Series `json:"seriesMap"`
}
type PixelSeries struct {
Count int64 `json:"count"`
Section int64 `json:"section"`
List []*Pixel `json:"list"`
}
func (tr *Tray) Init(instId string) {
tr.InstID = instId
tr.Count = 24
tr.Scale = float64(0.005)
// tr.SeriesMap = make(map[string]*Series)
}
func (tr *Tray) SetToKey(cr *Core) error {
js, _ := json.Marshal(tr)
keyName := tr.InstID + "|" + tr.Period + "|tray"
_, err := cr.RedisLocalCli.Set(keyName, string(js), 0).Result()
// fmt.Println(utils.GetFuncName(), "tray SetToKey:", string(js))
return err
}
// TODO 执行单维度分析,相对应的是跨维度的分析,那个还没想好
// 单维度下的分析结果中包含以下信息:
// 1.
func (tr *Tray) Analytics(cr *Core) {
go func() {
}()
}
// TODO 实例化一个series
// func (tr *Tray) NewSeries(cr *Core, period string) (*Series, error) {
// sr := Series{
// InstID: tr.InstID,
// Period: period,
// Count: tr.Count,
// Scale: tr.Scale,
// CandleSeries: &PixelList{},
// Ma7Series: &PixelList{},
// Ma30Series: &PixelList{},
// }
// // 自我更新
// err := sr.Refresh(cr)
// tr.SeriesMap["period"+period] = &sr
// return &sr, err
// }