去掉Coaster UploadTray,以后再说

This commit is contained in:
zhangkun9038@dingtalk.com 2024-12-15 20:49:18 +08:00
parent 91388f85e8
commit 534186cf8f
2 changed files with 90 additions and 89 deletions

@ -21,7 +21,7 @@ import (
type Candle struct {
Id string `json:"_id"`
core *Core
InstId string
InstID string
Period string
Data []interface{}
From string
@ -64,7 +64,7 @@ func (cd *Candle) Filter(cr *Core) bool {
myFocusList := cr.Cfg.Config.Get("focusList").MustArray()
founded := false
for _, v := range myFocusList {
if v.(string) == cd.InstId {
if v.(string) == cd.InstID {
founded = true
break
}
@ -179,7 +179,7 @@ func (core *Core) SaveCandle(instId string, period string, rsp *CandleData, dura
leng := len(rsp.Data)
for _, v := range rsp.Data {
candle := Candle{
InstId: instId,
InstID: instId,
Period: period,
Data: v,
From: "rest",
@ -203,7 +203,7 @@ func (core *Core) SaveCandle(instId string, period string, rsp *CandleData, dura
}
func (candle *Candle) PushToWriteLogChan(cr *Core) error {
did := candle.InstId + candle.Period + candle.Data[0].(string)
did := candle.InstID + candle.Period + candle.Data[0].(string)
candle.Id = HashString(did)
ncd, _ := candle.ToStruct(cr)
fmt.Println("ncd: ", ncd)
@ -240,7 +240,7 @@ func (cl *Candle) ToStruct(core *Core) (*Candle, error) {
ncd := Candle{}
ncd.Id = cl.Id
ncd.Period = cl.Period
ncd.InstId = cl.InstId
ncd.InstID = cl.InstID
ncd.From = cl.From
// 将字符串转换为 int64 类型的时间戳
@ -446,7 +446,7 @@ func (cl *Candle) SetToKey(core *Core) ([]interface{}, error) {
data := cl.Data
tsi, err := strconv.ParseInt(data[0].(string), 10, 64)
tss := strconv.FormatInt(tsi, 10)
keyName := "candle" + cl.Period + "|" + cl.InstId + "|ts:" + tss
keyName := "candle" + cl.Period + "|" + cl.InstID + "|ts:" + tss
//过期时间:根号(当前candle的周期/1分钟)*10000
dt, err := json.Marshal(cl.Data)

@ -89,88 +89,89 @@ func (co *Coaster) SetToKey(cr *Core) (string, error) {
return res, err
}
func (coi *CoasterInfo) Process(cr *Core) {
curCo, _ := cr.GetCoasterFromPlate(coi.InstID, coi.Period)
go func(co Coaster) {
//这里执行创建一个tray对象,用现有的co的数据计算和填充其listMap
// TODO 发到一个channel里来执行下面的任务
allow := os.Getenv("SARDINE_MAKESERIES") == "true"
if !allow {
return
}
srs, err := co.UpdateTray(cr)
if err != nil || srs == nil {
logrus.Warn("tray err: ", err)
return
}
_, err = srs.SetToKey(cr)
if err != nil {
logrus.Warn("srs SetToKey err: ", err)
return
}
//实例化完一个tray之后拿着这个tray去执行Analytics方法
//
// srsinfo := SeriesInfo{
// InstID: curCo.InstID,
// Period: curCo.Period,
// }
//
// cr.SeriesChan <- &srsinfo
}(curCo)
go func(co Coaster) {
// 每3次会有一次触发缓存落盘
// run := utils.Shaizi(3)
// if run {
_, err := co.SetToKey(cr)
if err != nil {
logrus.Warn("coaster process err: ", err)
fmt.Println("coaster SetToKey err: ", err)
}
// }
}(curCo)
}
// func (coi *CoasterInfo) Process(cr *Core) {
// curCo, _ := cr.GetCoasterFromPlate(coi.InstID, coi.Period)
// go func(co Coaster) {
// //这里执行创建一个tray对象,用现有的co的数据计算和填充其listMap
// // TODO 发到一个channel里来执行下面的任务
// allow := os.Getenv("SARDINE_MAKESERIES") == "true"
// if !allow {
// return
// }
// srs, err := co.UpdateTray(cr)
// if err != nil || srs == nil {
// logrus.Warn("tray err: ", err)
// return
// }
// _, err = srs.SetToKey(cr)
// if err != nil {
// logrus.Warn("srs SetToKey err: ", err)
// return
// }
// //实例化完一个tray之后拿着这个tray去执行Analytics方法
// //
// // srsinfo := SeriesInfo{
// // InstID: curCo.InstID,
// // Period: curCo.Period,
// // }
// //
// // cr.SeriesChan <- &srsinfo
// }(curCo)
//
// go func(co Coaster) {
// // 每3次会有一次触发缓存落盘
// // run := utils.Shaizi(3)
// // if run {
// _, err := co.SetToKey(cr)
// if err != nil {
// logrus.Warn("coaster process err: ", err)
// fmt.Println("coaster SetToKey err: ", err)
// }
// // }
//
// }(curCo)
// }
//
// TODO 类似于InsertIntoPlate函数照猫画虎就行了
func (co *Coaster) UpdateTray(cr *Core) (*Series, error) {
cr.Mu1.Lock()
defer cr.Mu1.Unlock()
//尝试从内存读取tray对象
tr, trayFounded := cr.TrayMap[co.InstID]
if !trayFounded {
tr1, err := co.LoadTray(cr)
if err != nil {
return nil, err
}
cr.TrayMap[co.InstID] = tr1
tr = tr1
}
srs, seriesFounded := tr.SeriesMap["period"+co.Period]
err := errors.New("")
if !seriesFounded {
srs1, err := tr.NewSeries(cr, co.Period)
if err != nil {
return nil, err
}
tr.SeriesMap["period"+co.Period] = srs1
} else {
err = srs.Refresh(cr)
}
// if err == nil {
// bj, _ := json.Marshal(srs)
// logrus.Debug("series:,string"(bj))
// }
return srs, err
}
//
// func (co *Coaster) UpdateTray(cr *Core) (*Series, error) {
// cr.Mu1.Lock()
// defer cr.Mu1.Unlock()
// //尝试从内存读取tray对象
// tr, trayFounded := cr.TrayMap[co.InstID]
// if !trayFounded {
// tr1, err := co.LoadTray(cr)
// if err != nil {
// return nil, err
// }
// cr.TrayMap[co.InstID] = tr1
// tr = tr1
// }
// srs, seriesFounded := tr.SeriesMap["period"+co.Period]
// err := errors.New("")
// if !seriesFounded {
// srs1, err := tr.NewSeries(cr, co.Period)
// if err != nil {
// return nil, err
// }
// tr.SeriesMap["period"+co.Period] = srs1
// } else {
// err = srs.Refresh(cr)
// }
// // if err == nil {
// // bj, _ := json.Marshal(srs)
// // logrus.Debug("series:,string"(bj))
// // }
// return srs, err
// }
//
// TODO
func (co *Coaster) LoadTray(cr *Core) (*Tray, error) {
tray := Tray{}
tray.Init(co.InstID)
prs := cr.Cfg.Config.Get("candleDimentions").MustArray()
for _, v := range prs {
tray.NewSeries(cr, v.(string))
}
return &tray, nil
}
// func (co *Coaster) LoadTray(cr *Core) (*Tray, error) {
// tray := Tray{}
// tray.Init(co.InstID)
// prs := cr.Cfg.Config.Get("candleDimentions").MustArray()
// for _, v := range prs {
// tray.NewSeries(cr, v.(string))
// }
// return &tray, nil
// }