diff --git a/go.mod b/go.mod index 77f185b..8e481d9 100644 --- a/go.mod +++ b/go.mod @@ -5,13 +5,13 @@ replace github.com/phyer/siaga/modules => ./modules go 1.21 require ( - github.com/bitly/go-simplejson v0.5.0 github.com/go-redis/redis v6.15.9+incompatible - github.com/phyer/core v0.1.12 + github.com/phyer/core v0.1.18 github.com/sirupsen/logrus v1.9.3 ) require ( + github.com/bitly/go-simplejson v0.5.0 // indirect github.com/onsi/ginkgo v1.16.5 // indirect github.com/onsi/gomega v1.18.1 // indirect github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196 // indirect diff --git a/go.sum b/go.sum index a75f7d1..28d7f79 100644 --- a/go.sum +++ b/go.sum @@ -48,8 +48,8 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= -github.com/phyer/core v0.1.12 h1:fOvE0T3obnzPUqT8F78i/rIwl/opa7ZhB3xx96GIFPc= -github.com/phyer/core v0.1.12/go.mod h1:oVP5mvnnJvI2Qxlnh4jYGj92DbH7XyY2xeRagQ3hdo8= +github.com/phyer/core v0.1.18 h1:pXQ2QDvkbCVtqcmaQl2nCa7LjYYeJkYVQdb26HcTvgc= +github.com/phyer/core v0.1.18/go.mod h1:oVP5mvnnJvI2Qxlnh4jYGj92DbH7XyY2xeRagQ3hdo8= github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196 h1:P1sxgCsS0VIL38ufZzgUuZLLyY/B+po6kSY7ziNZT7E= github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196/go.mod h1:iZexs5agdApNlp8HW/FqKgma4Ij1x8/o+ZLcMvY3f80= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/main.go b/main.go index a10e93c..f544445 100644 --- a/main.go +++ b/main.go @@ -60,12 +60,12 @@ func main() { go func() { md.CandlesProcess(&cr) }() - // go func() { - // core.MaXsProcess(&cr) - // }() - // go func() { - // core.TickerInfoProcess(&cr) - // }() + go func() { + md.MaXsProcess(&cr) + }() + go func() { + md.TickerInfoProcess(&cr) + }() // 这些暂时不运行, 以后要不要运行再说 // go func() { diff --git a/modules/candle.go b/modules/candle.go index 0e62be4..d31e0bb 100644 --- a/modules/candle.go +++ b/modules/candle.go @@ -12,7 +12,7 @@ import ( "time" // // simple "github.com/bitly/go-simplejson" - "github.com/go-redis/redis" + // "github.com/go-redis/redis" // "github.com/phyer/core/utils" logrus "github.com/sirupsen/logrus" ) @@ -29,8 +29,9 @@ func (cd *MyCandle) Process(cr *core.Core) { if !founded { return } + // 把candle存下来 go func() { - saveCandle := os.Getenv("SARDINE_SAVECANDLE") + saveCandle := os.Getenv("SIAGA_SAVECANDLE") if saveCandle == "true" { _, err := cd.SetToKey(cr) if err != nil { @@ -40,7 +41,7 @@ func (cd *MyCandle) Process(cr *core.Core) { }() // TODO update plate and coaster go func() { - makeSeries := os.Getenv("SARDINE_MAKESERIES") + makeSeries := os.Getenv("SIAGA_MAKESERIES") if makeSeries == "true" { _, err := cd.InsertIntoPlate(cr) if err != nil { @@ -51,7 +52,7 @@ func (cd *MyCandle) Process(cr *core.Core) { // 由于max可以从远端直接拿,不需要sardine自己做,所以可以sardine做也可以不做 go func(cad *MyCandle) { - makeMaX := os.Getenv("SARDINE_MAKEMAX") + makeMaX := os.Getenv("SIAGA_MAKEMAX") if makeMaX == "true" { // 等一会儿防止candle还没有加进CoinMap time.Sleep(200 * time.Millisecond) @@ -64,19 +65,20 @@ func (cd *MyCandle) Process(cr *core.Core) { makeSoft := false // makeVolSoft := true // makeVolSoft := false - if os.Getenv("SARDINE_MAKESOFTCANDLE") == "true" { + if os.Getenv("SIAGA_MAKESOFTCANDLE") == "true" { makeSoft = true } // 根据低维度candle,模拟出"软"的高纬度candle if cad.Period == "1m" && makeSoft { - fmt.Println("makeSoft:", cad.Period, cad.InstId) - MakeSoftCandles(cr, &cad.Candle) + fmt.Println("makeSoft:", cad.Period, cad.InstID) + MakeSoftCandles(cr, cad) } }(cd) - go func(cad *MyCandle) { - time.Sleep(100 * time.Millisecond) - cr.AddToGeneralCandleChnl(cad) - }(cd) + // 再次发布到redis channel, 給可能的收听者,我觉得没有这个必要了吧 + // go func(cad *MyCandle) { + // time.Sleep(100 * time.Millisecond) + // cad.AddToGeneralCandleChnl(cr) + // }(cd) } func (cd *MyCandle) InsertIntoPlate(cr *core.Core) (*core.Sample, error) { @@ -100,6 +102,87 @@ func (cd *MyCandle) InsertIntoPlate(cr *core.Core) (*core.Sample, error) { err := errors.New(erstr) return nil, err } - sm, err := po.RPushSample(cr, *cd, "candle") + sm, err := po.RPushSample(cr, &cd.Candle, "candle") return sm, err } +func (cad *MyCandle) AddToGeneralCandleChnl(cr *core.Core) { + suffix := "" + env := os.Getenv("GO_ENV") + if strings.Contains(env, "demoEnv") { + suffix = "-demoEnv" + } + redisCli := cr.RedisLocalCli + ab, _ := json.Marshal(cad.Candle) + _, err := redisCli.Publish(core.ALLCANDLES_PUBLISH+suffix, string(ab)).Result() + // logrus.Debug("publish, res,err:", res, err, "candle:", string(ab)) + if err != nil { + logrus.Debug("err of ma7|ma30 add to redis2:", err, cad.Candle.From) + } +} + +// TODO 用key名 BTC-USDT|3M|CandleInfo 维护唯一的一份。 +// 当这个key不存在的时候,创建这个key。周期的起始时间,目前MakeSoftCandle函数已经实现了这个逻辑。 +// 当存在时,检查现有key里的时间信息,和刚创建的周期其实时间一致不一致, +// 如果一致,更新这个CandleInfo的last值,如果需要的话,更新high和low值。 +// 当周期起始时间和已经保存的周期起始时间不一致的时候,说明到了跨周期的时间点了。这时候更新现有的key里保存的open,high,low,close信息为当前tickerInfo的last值。 +func (mcd MyCandle) GetSetCandleInfo(cr *core.Core, newPeriod string, ts int64) []interface{} { + tss := strconv.FormatInt(ts, 10) + cd := mcd.Candle + keyName := cd.InstID + "|" + newPeriod + "|candleData" + str, _ := cr.RedisLocalCli.Get(keyName).Result() + odata := []interface{}{} + founded := false + + if len(str) > 0 { + err := json.Unmarshal([]byte(str), &odata) + if err != nil { + logrus.Panic(GetFuncName(), " str:", str, " err:", err) + } else { + founded = true + } + } + if !founded { + cd.Data[0] = tss // 时间,数据都是新的 + cd.Data[1] = cd.Data[4] //开盘价 + cd.Data[2] = cd.Data[4] //最高价 + cd.Data[3] = cd.Data[4] //最低价 + bj, _ := json.Marshal(cd.Data) + cr.RedisLocalCli.Set(keyName, string(bj), 0) + return cd.Data + } else { + // 发现原有的candleData,且还没过期 + + if (odata[0]).(string) == tss { + oHigh := ToFloat64(odata[2]) + oLow := ToFloat64(odata[3]) + cd.Data[0] = tss + cd.Data[1] = odata[1] + cd.Data[5] = odata[5] + cd.Data[6] = odata[6] + if oHigh <= ToFloat64(cd.Data[4]) { + cd.Data[2] = cd.Data[4] + } else { + cd.Data[2] = odata[2] + } + if oLow >= ToFloat64(cd.Data[4]) { + cd.Data[3] = cd.Data[4] + } else { + cd.Data[3] = odata[3] + } + bj, _ := json.Marshal(cd.Data) + cr.RedisLocalCli.Set(keyName, string(bj), 0) + return cd.Data + } else { + // 发现原有的candleData,但是已经过期了 + cd.Data[0] = tss // 新开始一个cd周期,时间,数据都是新的 + cd.Data[1] = cd.Data[4] //开盘价 + cd.Data[2] = cd.Data[4] //最高价 + cd.Data[3] = cd.Data[4] //最低价 + cd.Data[5] = odata[5] + cd.Data[6] = odata[6] + bj, _ := json.Marshal(cd.Data) + cr.RedisLocalCli.Set(keyName, string(bj), 0) + return cd.Data + } + } +} diff --git a/modules/extent.go b/modules/extent.go index 87fccd3..e0c7ba0 100644 --- a/modules/extent.go +++ b/modules/extent.go @@ -152,7 +152,7 @@ func LoopMakeMaX(cr *core.Core) { // sz := utils.ShaiziInt(1500) + 500 time.Sleep(time.Duration(30) * time.Millisecond) err, ct := MakeMaX(cr, cad, 7) - logrus.Warn(GetFuncName(), " ma7 err:", err, " ct:", ct, " cd.InstID:", cd.InstId, " cd.Period:", cd.Period) + logrus.Warn(GetFuncName(), " ma7 err:", err, " ct:", ct, " cd.InstID:", cd.InstID, " cd.Period:", cd.Period) //TODO 这个思路不错,单行不通,远程redis禁不住这么频繁的请求 // cd.InvokeRestQFromRemote(cr, ct) }(cd) @@ -161,7 +161,7 @@ func LoopMakeMaX(cr *core.Core) { // sz := utils.ShaiziInt(2000) + 500 time.Sleep(time.Duration(30) * time.Millisecond) err, ct := MakeMaX(cr, cad, 30) - logrus.Warn(GetFuncName(), " ma30 err:", err, " ct:", ct, " cd.InstID:", cd.InstId, " cd.Period:", cd.Period) + logrus.Warn(GetFuncName(), " ma30 err:", err, " ct:", ct, " cd.InstID:", cd.InstID, " cd.Period:", cd.Period) // cd.InvokeRestQFromRemote(cr, ct) }(cd) // TODO TODO 这地方不能加延时,否则makeMax处理不过来,多的就丢弃了,造成maX的sortedSet比candle的短很多。后面所有依赖的逻辑都受影响. @@ -266,13 +266,13 @@ func MakeMaX(cr *core.Core, cl *core.Candle, count int) (error, int) { // tsa := time.UnixMilli(tsi).Format("01-02 15:03:04") // fmt.Println("MakeMaX candle: ", cl.InstID, cl.Period, tsa, cl.From) tss := strconv.FormatInt(tsi, 10) - keyName := "candle" + cl.Period + "|" + cl.InstId + "|ts:" + tss + keyName := "candle" + cl.Period + "|" + cl.InstID + "|ts:" + tss //过期时间:根号(当前candle的周期/1分钟)*10000 lastTime := time.UnixMilli(tsi) // lasts := lastTime.Format("2006-01-02 15:04") // 以当前candle的时间戳为起点倒推count个周期,取得所需candle用于计算maX - setName := "candle" + cl.Period + "|" + cl.InstId + "|sortedSet" + setName := "candle" + cl.Period + "|" + cl.InstID + "|sortedSet" // cdl, err := cr.GetLastCandleListOfCoin(cl.InstID, cl.Period, count, lastTime) cdl, err := GetRangeCandleSortedSet(cr, setName, count, lastTime) if err != nil { @@ -310,7 +310,7 @@ func MakeMaX(cr *core.Core, cl *core.Candle, count int) (error, int) { } mx := core.MaX{ KeyName: keyName, - InstID: cl.InstId, + InstID: cl.InstID, Period: cl.Period, From: cl.From, Count: count, @@ -349,7 +349,7 @@ func CandlesProcess(cr *core.Core) { // "count": 1 // }, // 从startTime开始,经历整数个(count * seg)之后,还能不大于分钟粒度的当前时间的话,那个时间点就是最近的当前段起始时间点 -func MakeSoftCandles(cr *core.Core, cd *core.Candle) { +func MakeSoftCandles(cr *core.Core, mcd *MyCandle) { segments := cr.Cfg.Config.Get("softCandleSegmentList").MustArray() for k, v := range segments { cs := core.CandleSegment{} @@ -362,7 +362,7 @@ func MakeSoftCandles(cr *core.Core, cd *core.Candle) { continue } // TODO: 通过序列化和反序列化,对原始的candle进行克隆,因为是对引用进行操作,所以每个seg里对candle进行操作都会改变原始对象,这和预期不符 - bt, _ := json.Marshal(cd) + bt, _ := json.Marshal(mcd.Candle) cd0 := core.Candle{} json.Unmarshal(bt, &cd0) @@ -384,7 +384,7 @@ func MakeSoftCandles(cr *core.Core, cd *core.Candle) { } otmi := otm.UnixMilli() cd1 := core.Candle{ - InstId: cd0.InstId, // string `json:"instId", string` + InstID: cd0.InstID, // string `json:"instId", string` Period: cs.Seg, // `json:"period", string` Data: cd0.Data, // `json:"data"` From: "soft|" + os.Getenv("HOSTNAME"), // string `json:"from"` @@ -392,7 +392,10 @@ func MakeSoftCandles(cr *core.Core, cd *core.Candle) { // cd0是从tickerInfo创建的1m Candle克隆来的, Data里只有Data[4]被赋值,是last,其他都是"-1" // TODO 填充其余几个未赋值的字段,除了成交量和成交美元数以外,并存入redis待用 // strconv.FormatInt(otmi, 10) - cd1.Data = cd0.GetSetCandleInfo(cr, cs.Seg, otmi) + mcd := MyCandle{ + Candle: cd0, + } + cd1.Data = mcd.GetSetCandleInfo(cr, cs.Seg, otmi) // 生成软交易量和交易数对,用于代替last生成max go func(k int) { time.Sleep(time.Duration(100*k) * time.Millisecond) @@ -400,3 +403,29 @@ func MakeSoftCandles(cr *core.Core, cd *core.Candle) { }(k) } } + +func MaXsProcess(cr *core.Core) { + for { + mx := <-cr.MaXProcessChan + logrus.Debug("mx: ", mx) + go func(maX *core.MaX) { + mmx := MyMaX{ + MaX: *mx, + } + mmx.Process(cr) + }(mx) + } +} + +func TickerInfoProcess(cr *core.Core) { + for { + ti := <-cr.TickerInforocessChan + logrus.Debug("ti: ", ti) + go func(ti *core.TickerInfo) { + mti := MyTickerInfo{ + TickerInfo: *ti, + } + mti.Process(cr) + }(ti) + } +} diff --git a/modules/maX.go b/modules/maX.go new file mode 100644 index 0000000..160cb7f --- /dev/null +++ b/modules/maX.go @@ -0,0 +1,85 @@ +package module + +import ( + // "encoding/json" + // "errors" + "fmt" + "github.com/phyer/core" + "os" + "strconv" + //"strings" + // "sync" + //"time" + // + // simple "github.com/bitly/go-simplejson" + // "github.com/go-redis/redis" + // "github.com/phyer/core/utils" + //logrus "github.com/sirupsen/logrus" +) + +type MyMaX struct { + core.MaX +} + +func (mmx *MyMaX) Process(cr *core.Core) { + mx := mmx.MaX + _, err := mx.SetToKey(cr) + if err != nil { + fmt.Println("max SetToKey err: ", err) + return + } + // TODO + go func() { + torqueSorted := os.Getenv("SARDINE_MAKESERIES") == "true" + if !torqueSorted { + return + } + sm, err := mmx.InsertIntoPlate(cr) + if err != nil { + // fmt.Println("InsertIntoPlate err: ", err) + } else { + // 只有在ma30计算完成,触发coasterChan相关动作 + if mx.Count == 30 { + ci := core.CoasterInfo{ + InstID: mx.InstID, + Period: mx.Period, + InsertedNew: sm != nil, + } + cr.CoasterChan <- &ci + } else { + // fmt.Println("maX: candle记录尚不足30条,无需触发coaster计算:", mx) + // TODO + // bj, _ := json.Marshal(mx) + // fmt.Println("mx:", string(bj)) + // 这个地方可以加个逻辑,给tunas端发消息,缺啥补啥 + } + } + }() + + // 发送给下级消息队列订阅者, 关掉了,暂时用不上,2024-12-16 + // cr.AddToGeneralMaXChnl(mx) +} + +func (mmx *MyMaX) InsertIntoPlate(cr *core.Core) (*core.Sample, error) { + mx := mmx.MaX + cr.Mu.Lock() + defer cr.Mu.Unlock() + pl, ok := cr.PlateMap[mx.InstID] + // 尝试放弃一级缓存 + // if !ok { + pl, _ = LoadPlate(cr, mx.InstID) + cr.PlateMap["period"+mx.Period] = pl + // } + _, ok = pl.CoasterMap["period"+mx.Period] + if !ok { + pl.MakeCoaster(cr, mx.Period) + } + // if pl.CoasterMap["period"+mx.Period] == nil { + // fmt.Println("candle coaster: ", mx.Period, pl.CoasterMap["period"+mx.Period], pl.CoasterMap) + // 创建失败的原因是原始数据不够,一般发生在服务中断了,缺少部分数据的情况下, 后续需要数据补全措施 + // err := errors.New("coaster创建失败 maX instId: " + mx.InstID + "; period: " + mx.Period) + // return nil, err + // } + sm, err := pl.CoasterMap["period"+mx.Period].RPushSample(cr, mx, "ma"+strconv.Itoa(mx.Count)) + return sm, err +} diff --git a/modules/plate.go b/modules/plate.go new file mode 100644 index 0000000..237859c --- /dev/null +++ b/modules/plate.go @@ -0,0 +1,36 @@ +package module + +import ( + "encoding/json" + // "errors" + // "fmt" + "github.com/phyer/core" + // "os" + // "strconv" + // "strings" + // // "sync" + // "time" + // // + // // simple "github.com/bitly/go-simplejson" + // "github.com/go-redis/redis" + // // "github.com/phyer/core/utils" + // logrus "github.com/sirupsen/logrus" +) + +// TODO 从redis里读出来已经存储的plate,如果不存在就创建一个新的 +func LoadPlate(cr *core.Core, instId string) (*core.Plate, error) { + pl := core.Plate{} + plateName := instId + "|plate" + _, err := cr.RedisLocalCli.Exists().Result() + if err == nil { + str, _ := cr.RedisLocalCli.Get(plateName).Result() + json.Unmarshal([]byte(str), &pl) + } else { + pl.Init(instId) + prs := cr.Cfg.Config.Get("candleDimentions").MustArray() + for _, v := range prs { + pl.MakeCoaster(cr, v.(string)) + } + } + return &pl, nil +} diff --git a/modules/tickerInfo.go b/modules/tickerInfo.go new file mode 100644 index 0000000..675010f --- /dev/null +++ b/modules/tickerInfo.go @@ -0,0 +1,81 @@ +package module + +import ( + // "errors" + "fmt" + "github.com/phyer/core" + // "math" + "os" + "strconv" +) + +type MyTickerInfo struct { + core.TickerInfo +} + +func (mti *MyTickerInfo) Process(cr *core.Core) { + ti := mti.TickerInfo + go func() { + // 无需发布出去, 下面内容注释 2024-12-15 + // tickerPush := os.Getenv("SIAGA_TICKERPUSH") == "true" + // if !tickerPush { + // return + // } + // err := AddToGeneralTickerChnl(ti) + // if err != nil { + // logrus.Warn("tickerPush err:", err, ti) + // } + }() + go func() { + torqueSorted := os.Getenv("SIAGA_TORQUESORTED") == "true" + if !torqueSorted { + return + } + // 这个功能貌似很有意思,以后可以考虑打开 + // ti.MakePriceSorted(cr, "2m") + // ti.MakePriceSorted(cr, "4m") + // ti.MakePriceSorted(cr, "8m") + //用这个方法保证事件在每个周期只发生一次 + }() + go func() { + tickerToCandle := os.Getenv("SIAGA_TICKERTOCANDLE") == "true" + if tickerToCandle { + fmt.Println("tickerToCandle: ", ti) + cd := mti.ConvertToCandle(cr, "1m") + cr.CandlesProcessChan <- cd + } + }() + + go func() { + ti.SetToKey(cr) + // 流出时间让下面的函数进行比对 + // time.Sleep(50 * time.Millisecond) + // snapshot 先关掉 2024-12-15 + // ti.MakeSnapShot(cr) + }() +} + +// TODO 当前这个版本的实现里,开盘价,最高价,最低价都不重要,主要是收盘价,用来算ma7,ma30,这个就够了,以后需要的话,再细化 +func (mti *MyTickerInfo) ConvertToCandle(cr *core.Core, period string) *core.Candle { + ti := mti.TickerInfo + hn := os.Getenv("HOSTNAME") + lst := strconv.FormatFloat(ti.Last, 'f', -1, 64) + tmi := ti.Ts + tmi = tmi - tmi%60000 + cd := core.Candle{ + InstID: ti.InstId, + Period: period, + Data: []interface{}{ + strconv.FormatInt(tmi, 10), //开始时间,Unix时间戳的毫秒数格式,如 1597026383085 + "-1", //o String 开盘价格 + "-1", //h String 最高价格 + "-1", //l String 最低价格 + lst, //c String 收盘价格 + "-1", //c String 成交量 + "-1", //c String 成交美元数 + }, + From: "tickerInfo|" + hn, + } + + return &cd +} diff --git a/modules/util.go b/modules/util.go index 2121893..b825f93 100644 --- a/modules/util.go +++ b/modules/util.go @@ -3,23 +3,23 @@ package module import ( "crypto/md5" "encoding/hex" - "encoding/json" - "errors" - "fmt" - simple "github.com/bitly/go-simplejson" - "github.com/go-redis/redis" - "github.com/phyer/core" - "math" - "math/rand" - "os" + // "encoding/json" + // "errors" + // "fmt" + // simple "github.com/bitly/go-simplejson" + // "github.com/go-redis/redis" + // "github.com/phyer/core" + // "math" + // "math/rand" + // "os" "reflect" "runtime" "strconv" - "strings" - "sync" - "time" - // "github.com/phyer/core/utils" - logrus "github.com/sirupsen/logrus" + // "strings" + // "sync" + // "time" + // // "github.com/phyer/core/utils" + // logrus "github.com/sirupsen/logrus" ) func NextPeriod(curPeriod string, idx int) string { diff --git a/siaga b/siaga new file mode 100755 index 0000000..141806f Binary files /dev/null and b/siaga differ