From bae89b0ad2773b21b8da78e243742edbd6198d2d Mon Sep 17 00:00:00 2001 From: zhangkun Date: Sun, 15 Dec 2024 16:00:35 +0800 Subject: [PATCH] up --- go.mod | 15 +-- go.sum | 6 +- main.go | 75 +++++------ modules/candle.go | 105 +++++++++++++++ modules/extent.go | 323 +++++++++++++++++++++++++++++++++++++++++++--- modules/util.go | 36 ++++++ 6 files changed, 491 insertions(+), 69 deletions(-) create mode 100644 modules/candle.go diff --git a/go.mod b/go.mod index 131930c..77f185b 100644 --- a/go.mod +++ b/go.mod @@ -1,30 +1,19 @@ module github.com/phyer/siaga -replace ( - v5sdk_go/config => ../core/submodules/okex/config - v5sdk_go/rest => ../core/submodules/okex/rest - v5sdk_go/utils => ../core/submodules/okex/utils - v5sdk_go/ws => ../core/submodules/okex/ws -) +replace github.com/phyer/siaga/modules => ./modules go 1.21 require ( github.com/bitly/go-simplejson v0.5.0 github.com/go-redis/redis v6.15.9+incompatible - github.com/phyer/core v0.1.1 + github.com/phyer/core v0.1.12 github.com/sirupsen/logrus v1.9.3 ) require ( - github.com/gorilla/websocket v1.5.1 // indirect github.com/onsi/ginkgo v1.16.5 // indirect github.com/onsi/gomega v1.18.1 // indirect github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196 // indirect - golang.org/x/net v0.17.0 // indirect golang.org/x/sys v0.13.0 // indirect - v5sdk_go/config v0.0.0-00010101000000-000000000000 // indirect - v5sdk_go/rest v0.0.0-00010101000000-000000000000 // indirect - v5sdk_go/utils v0.0.0-00010101000000-000000000000 // indirect - v5sdk_go/ws v0.0.0-00010101000000-000000000000 // indirect ) diff --git a/go.sum b/go.sum index 26e8dca..a75f7d1 100644 --- a/go.sum +++ b/go.sum @@ -28,8 +28,6 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= -github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= -github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= @@ -50,8 +48,8 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= -github.com/phyer/core v0.1.1 h1:mFlh+oV/K1pa7vbvo4wGDf5d+88wnRdrYnDcRaxAsRU= -github.com/phyer/core v0.1.1/go.mod h1:LyfJrdqSlm2MTOx0M3pnDntpwa64XD5nf0xYxvZ4El4= +github.com/phyer/core v0.1.12 h1:fOvE0T3obnzPUqT8F78i/rIwl/opa7ZhB3xx96GIFPc= +github.com/phyer/core v0.1.12/go.mod h1:oVP5mvnnJvI2Qxlnh4jYGj92DbH7XyY2xeRagQ3hdo8= github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196 h1:P1sxgCsS0VIL38ufZzgUuZLLyY/B+po6kSY7ziNZT7E= github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196/go.mod h1:iZexs5agdApNlp8HW/FqKgma4Ij1x8/o+ZLcMvY3f80= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/main.go b/main.go index c65363e..a10e93c 100644 --- a/main.go +++ b/main.go @@ -1,24 +1,23 @@ package main import ( - "fmt" + // "fmt" "os" "time" "github.com/phyer/core" - md "github.com/phyer/siaga/module" - "github.com/sirupsen/logrus" + md "github.com/phyer/siaga/modules" + // "github.com/sirupsen/logrus" ) func main() { cr := core.Core{} cr.Init() - cli, err := cr.GetRedisCli() + cli, _ := cr.GetRedisLocalCli() cr.RedisRemoteCli = cli - allCandleAdd := core.ALLCANDLES_PUBLISH - allMaXAdd := core.ALLMAX_PUBLISH + rdsLs, _ := md.GetRemoteRedisConfigList() // 目前只有phyer里部署的tunas会发布tickerInfo信息 go func(vv *core.RedisConfig) { allowed := os.Getenv("SIAGA_ACCEPTTICKER") == "true" @@ -26,57 +25,61 @@ func main() { return } md.LoopSubscribe(&cr, core.TICKERINFO_PUBLISH, vv) - }(v) + }(rdsLs[0]) time.Sleep(5 * time.Second) go func(vv *core.RedisConfig) { allowed := os.Getenv("SIAGA_ACCEPTCANDLE") == "true" if !allowed { return } - // core.LoopSubscribe(&cr, allCandleAdd, vv) - }(v) + md.LoopSubscribe(&cr, core.ALLCANDLES_PUBLISH, vv) + }(rdsLs[0]) go func(vv *core.RedisConfig) { allowed := os.Getenv("SIAGA_ACCEPTMAX") == "true" if !allowed { return } - core.LoopSubscribe(&cr, allMaXAdd, vv) - }(v) + md.LoopSubscribe(&cr, core.ALLMAXES_PUBLISH, vv) + }(rdsLs[0]) + // 下面这个暂时不运行, 在环境变量里把它关掉 go func(vv *core.RedisConfig) { allowed := os.Getenv("SIAGA_ACCEPTSERIES") == "true" if !allowed { return } - core.LoopSubscribe(&cr, core.ALLSERIESINFO_PUBLISH, vv) - }(v) + md.LoopSubscribe(&cr, core.ALLSERIESINFO_PUBLISH, vv) + }(rdsLs[0]) go func() { - core.LoopMakeMaX(&cr) + md.LoopMakeMaX(&cr) }() + // 这些临时关掉,很快打开 + // go func() { + // core.LoopCheckRemoteRedis(&cr) + // }() go func() { - core.LoopCheckRemoteRedis(&cr) - }() - go func() { - core.CandlesProcess(&cr) - }() - go func() { - core.MaXsProcess(&cr) - }() - go func() { - core.TickerInfoProcess(&cr) - }() - go func() { - core.CoasterProcess(&cr) - }() - go func() { - core.SeriesProcess(&cr) - }() - go func() { - core.SegmentItemProcess(&cr) - }() - go func() { - core.ShearForceProcess(&cr) + md.CandlesProcess(&cr) }() + // go func() { + // core.MaXsProcess(&cr) + // }() + // go func() { + // core.TickerInfoProcess(&cr) + // }() + + // 这些暂时不运行, 以后要不要运行再说 + // go func() { + // core.CoasterProcess(&cr) + // }() + // go func() { + // core.SeriesProcess(&cr) + // }() + // go func() { + // core.SegmentItemProcess(&cr) + // }() + // go func() { + // core.ShearForceProcess(&cr) + // }() go func() { core.WriteLogProcess(&cr) }() diff --git a/modules/candle.go b/modules/candle.go new file mode 100644 index 0000000..0e62be4 --- /dev/null +++ b/modules/candle.go @@ -0,0 +1,105 @@ +package module + +import ( + "encoding/json" + "errors" + "fmt" + "github.com/phyer/core" + "os" + "strconv" + "strings" + // "sync" + "time" + // + // simple "github.com/bitly/go-simplejson" + "github.com/go-redis/redis" + // "github.com/phyer/core/utils" + logrus "github.com/sirupsen/logrus" +) + +type MyCandle struct { + core.Candle +} + +func (cd *MyCandle) Process(cr *core.Core) { + tmi := ToInt64(cd.Data[0]) + time.UnixMilli(tmi).Format("01-02 15:04") + // 如果sardine是运行在远端,不能让candle存盘, 因为这是tunnas该干的事情,不能跟它抢 + founded := cd.Filter(cr) + if !founded { + return + } + go func() { + saveCandle := os.Getenv("SARDINE_SAVECANDLE") + if saveCandle == "true" { + _, err := cd.SetToKey(cr) + if err != nil { + logrus.Warning("SetToKey err: ", err) + } + } + }() + // TODO update plate and coaster + go func() { + makeSeries := os.Getenv("SARDINE_MAKESERIES") + if makeSeries == "true" { + _, err := cd.InsertIntoPlate(cr) + if err != nil { + logrus.Warning("SetToKey err: ", err) + } + } + }() + + // 由于max可以从远端直接拿,不需要sardine自己做,所以可以sardine做也可以不做 + go func(cad *MyCandle) { + makeMaX := os.Getenv("SARDINE_MAKEMAX") + if makeMaX == "true" { + // 等一会儿防止candle还没有加进CoinMap + time.Sleep(200 * time.Millisecond) + cr.MakeMaXsChan <- &cad.Candle + } + }(cd) + + go func(cad *MyCandle) { + // 触发制作插值candle + makeSoft := false + // makeVolSoft := true + // makeVolSoft := false + if os.Getenv("SARDINE_MAKESOFTCANDLE") == "true" { + makeSoft = true + } + // 根据低维度candle,模拟出"软"的高纬度candle + if cad.Period == "1m" && makeSoft { + fmt.Println("makeSoft:", cad.Period, cad.InstId) + MakeSoftCandles(cr, &cad.Candle) + } + }(cd) + go func(cad *MyCandle) { + time.Sleep(100 * time.Millisecond) + cr.AddToGeneralCandleChnl(cad) + }(cd) +} + +func (cd *MyCandle) InsertIntoPlate(cr *core.Core) (*core.Sample, error) { + cr.Mu.Lock() + defer cr.Mu.Unlock() + // pl, plateFounded := cr.PlateMap[cd.InstID] + // if !plateFounded || pl == nil { + pl, _ := LoadPlate(cr, cd.InstID) + cr.PlateMap[cd.InstID] = pl + // } + po, coasterFounded := pl.CoasterMap["period"+cd.Period] + err := errors.New("") + if !coasterFounded { + pl.MakeCoaster(cr, cd.Period) + } + + if len(po.InstID) == 0 { + // logrus.Debug("candle coaster: ", cd.Period, pl.CoasterMap["period"+cd.Period], pl.CoasterMap) + //创建失败的原因是原始数据不够,一般发生在服务中断了,缺少部分数据的情况下, 后续需要数据补全措施 + erstr := fmt.Sprintln("coaster创建失败 candle instID: "+cd.InstID+"; period: "+cd.Period, "coasterFounded: ", coasterFounded, " ", err) + err := errors.New(erstr) + return nil, err + } + sm, err := po.RPushSample(cr, *cd, "candle") + return sm, err +} diff --git a/modules/extent.go b/modules/extent.go index 29b88e8..87fccd3 100644 --- a/modules/extent.go +++ b/modules/extent.go @@ -8,15 +8,49 @@ import ( "os" "strconv" "strings" - "sync" + // "sync" "time" - - simple "github.com/bitly/go-simplejson" + // + // simple "github.com/bitly/go-simplejson" "github.com/go-redis/redis" // "github.com/phyer/core/utils" logrus "github.com/sirupsen/logrus" ) +func GetRemoteRedisConfigList() ([]*core.RedisConfig, error) { + list := []*core.RedisConfig{} + envListStr := os.Getenv("SARDINE_REMOTE_REDIS_LIST") + envList := strings.Split(envListStr, "|") + for _, v := range envList { + if len(v) == 0 { + continue + } + urlstr := core.REMOTE_REDIS_PRE_NAME + v + "_URL" + indexstr := core.REMOTE_REDIS_PRE_NAME + v + "_INDEX" + password := os.Getenv(core.REMOTE_REDIS_PRE_NAME + v + "_PASSWORD") + // channelstr := core.REMOTE_REDIS_PRE_NAME + v + "_CHANNEL_PRENAME" + // channelPreName := os.Getenv(channelstr) + url := os.Getenv(urlstr) + index := os.Getenv(indexstr) + if len(url) == 0 || len(index) == 0 { + err := errors.New("remote redis config err:" + urlstr + "," + url + "," + indexstr + "," + index) + return list, err + } + idx, err := strconv.Atoi(index) + if err != nil { + return list, err + } + curConf := core.RedisConfig{ + Url: url, + Password: password, + Index: idx, + // ChannelPreName: channelPreName, + } + list = append(list, &curConf) + } + return list, nil +} + func LoopSubscribe(cr *core.Core, channelName string, redisConf *core.RedisConfig) { redisRemoteCli := cr.RedisRemoteCli suffix := "" @@ -72,7 +106,7 @@ func LoopSubscribe(cr *core.Core, channelName string, redisConf *core.RedisConfi } case "maX": { - mx := MaX{} + mx := core.MaX{} if msg.Payload == "" { continue } @@ -87,7 +121,7 @@ func LoopSubscribe(cr *core.Core, channelName string, redisConf *core.RedisConfi case "tickerInfo": { //tickerInfo: map[askPx:2.2164 askSz:17.109531 bidPx:2.2136 bidSz:73 high24h:2.497 instId:STX-USDT instType:SPOT last:2.2136 lastSz:0 low24h:2.0508 open24h:2.42 sodUtc0:2.4266 sodUtc8:2.4224 ts:1637077323552 vol24h:5355479.488179 :12247398.975501] - ti := TickerInfo{} + ti := core.TickerInfo{} err := json.Unmarshal([]byte(msg.Payload), &ti) if err != nil { logrus.Warning("tickerInfo payload unmarshal err: ", err, msg.Payload) @@ -95,17 +129,274 @@ func LoopSubscribe(cr *core.Core, channelName string, redisConf *core.RedisConfi cr.TickerInforocessChan <- &ti break } - case "seriesInfo": - { - //tickerInfo: map[askPx:2.2164 askSz:17.109531 bidPx:2.2136 bidSz:73 high24h:2.497 instId:STX-USDT instType:SPOT last:2.2136 lastSz:0 low24h:2.0508 open24h:2.42 sodUtc0:2.4266 sodUtc8:2.4224 ts:1637077323552 vol24h:5355479.488179 :12247398.975501] - sei := SeriesInfo{} - err := json.Unmarshal([]byte(msg.Payload), &sei) - if err != nil { - logrus.Warning("seriesInfo payload unmarshal err: ", err, msg.Payload) - } - cr.SeriesChan <- &sei - break - } + // case "seriesInfo": + // { + // //tickerInfo: map[askPx:2.2164 askSz:17.109531 bidPx:2.2136 bidSz:73 high24h:2.497 instId:STX-USDT instType:SPOT last:2.2136 lastSz:0 low24h:2.0508 open24h:2.42 sodUtc0:2.4266 sodUtc8:2.4224 ts:1637077323552 vol24h:5355479.488179 :12247398.975501] + // sei := SeriesInfo{} + // err := json.Unmarshal([]byte(msg.Payload), &sei) + // if err != nil { + // logrus.Warning("seriesInfo payload unmarshal err: ", err, msg.Payload) + // } + // cr.SeriesChan <- &sei + // break + // } } } } + +func LoopMakeMaX(cr *core.Core) { + for { + cd := <-cr.MakeMaXsChan + go func(cad *core.Candle) { + //当一个candle的多个时间点的数据几乎同时到达时,顺序无法保证,制作maX会因为中间缺失数据而计算错,因此,等待一秒钟等数据都全了再算 + // sz := utils.ShaiziInt(1500) + 500 + time.Sleep(time.Duration(30) * time.Millisecond) + err, ct := MakeMaX(cr, cad, 7) + logrus.Warn(GetFuncName(), " ma7 err:", err, " ct:", ct, " cd.InstID:", cd.InstId, " cd.Period:", cd.Period) + //TODO 这个思路不错,单行不通,远程redis禁不住这么频繁的请求 + // cd.InvokeRestQFromRemote(cr, ct) + }(cd) + go func(cad *core.Candle) { + //当一个candle的多个时间点的数据几乎同时到达时,顺序无法保证,制作maX会因为中间缺失数据而计算错,因此,等待一秒钟等数据都全了再算 + // sz := utils.ShaiziInt(2000) + 500 + time.Sleep(time.Duration(30) * time.Millisecond) + err, ct := MakeMaX(cr, cad, 30) + logrus.Warn(GetFuncName(), " ma30 err:", err, " ct:", ct, " cd.InstID:", cd.InstId, " cd.Period:", cd.Period) + // cd.InvokeRestQFromRemote(cr, ct) + }(cd) + // TODO TODO 这地方不能加延时,否则makeMax处理不过来,多的就丢弃了,造成maX的sortedSet比candle的短很多。后面所有依赖的逻辑都受影响. + // time.Sleep(300 * time.Millisecond) + } +} + +// setName := "candle" + period + "|" + instId + "|sortedSet" +// count: 倒推多少个周期开始拿数据 +// from: 倒推的起始时间点 +// ctype: candle或者maX +func GetRangeCandleSortedSet(cr *core.Core, setName string, count int, from time.Time) (*core.CandleList, error) { + cdl := core.CandleList{} + ary1 := strings.Split(setName, "|") + ary2 := []string{} + period := "" + ary2 = strings.Split(ary1[0], "candle") + period = ary2[1] + + dui, err := cr.PeriodToMinutes(period) + if err != nil { + return nil, err + } + fromt := from.UnixMilli() + nw := time.Now().UnixMilli() + if fromt > nw*2 { + err := errors.New("时间错了需要debug") + logrus.Warning(err.Error()) + return nil, err + } + froms := strconv.FormatInt(fromt, 10) + sti := fromt - dui*int64(count)*60*1000 + sts := strconv.FormatInt(sti, 10) + opt := redis.ZRangeBy{ + Min: sts, + Max: froms, + Count: int64(count), + } + ary := []string{} + extt, err := GetExpiration(cr, period) + ot := time.Now().Add(extt * -1) + oti := ot.UnixMilli() + cli := cr.RedisLocalCli + cli.LTrim(setName, 0, oti) + cunt, _ := cli.ZRemRangeByScore(setName, "0", strconv.FormatInt(oti, 10)).Result() + if cunt > 0 { + logrus.Warning("移出过期的引用数量:", setName, count, "ZRemRangeByScore ", setName, 0, strconv.FormatInt(oti, 10)) + } + logrus.Info("ZRevRangeByScore ", setName, opt) + ary, err = cli.ZRevRangeByScore(setName, opt).Result() + if err != nil { + return &cdl, err + } + keyAry, err := cli.MGet(ary...).Result() + if err != nil || len(keyAry) == 0 { + logrus.Warning("no record with cmd: ZRevRangeByScore ", setName, froms, sts, " ", err.Error()) + logrus.Warning("zrev lens of ary: lens: ", len(ary), "GetRangeSortedSet ZRevRangeByScore:", "setName:", setName, "opt.Max:", opt.Max, "opt.Min:", opt.Min) + return &cdl, err + } + for _, str := range keyAry { + if str == nil { + continue + } + cd := core.Candle{} + err := json.Unmarshal([]byte(str.(string)), &cd) + if err != nil { + logrus.Warn(GetFuncName(), err, str.(string)) + } + tmi := ToInt64(cd.Data[0]) + tm := time.UnixMilli(tmi) + if tm.Sub(from) > 0 { + break + } + cdl.List = append(cdl.List, &cd) + } + cdl.Count = count + return &cdl, nil +} + +func GetExpiration(cr *core.Core, per string) (time.Duration, error) { + if len(per) == 0 { + erstr := fmt.Sprint("period没有设置") + logrus.Warn(erstr) + err := errors.New(erstr) + return 0, err + } + exp, err := cr.PeriodToMinutes(per) + dur := time.Duration(exp*49) * time.Minute + return dur, err +} + +func MakeMaX(cr *core.Core, cl *core.Candle, count int) (error, int) { + data := cl.Data + js, _ := json.Marshal(data) + // cjs, _ := json.Marshal(cl) + if len(data) == 0 { + err := errors.New("data is block: " + string(js)) + return err, 0 + } + + tsi := ToInt64(data[0]) + // tsa := time.UnixMilli(tsi).Format("01-02 15:03:04") + // fmt.Println("MakeMaX candle: ", cl.InstID, cl.Period, tsa, cl.From) + tss := strconv.FormatInt(tsi, 10) + keyName := "candle" + cl.Period + "|" + cl.InstId + "|ts:" + tss + //过期时间:根号(当前candle的周期/1分钟)*10000 + + lastTime := time.UnixMilli(tsi) + // lasts := lastTime.Format("2006-01-02 15:04") + // 以当前candle的时间戳为起点倒推count个周期,取得所需candle用于计算maX + setName := "candle" + cl.Period + "|" + cl.InstId + "|sortedSet" + // cdl, err := cr.GetLastCandleListOfCoin(cl.InstID, cl.Period, count, lastTime) + cdl, err := GetRangeCandleSortedSet(cr, setName, count, lastTime) + if err != nil { + return err, 0 + } + + // fmt.Println("makeMaX: list: ", "instId: ", cl.InstID, "cl.Period: ", cl.Period, " lastTime:", lastTime, " count: ", count) + amountLast := float64(0) + ct := float64(0) + // fmt.Println("makeMax len of GetLastCandleListOfCoin list: ", len(cdl.List), "makeMax err of GetLastCandleListOfCoin: ", err) + if len(cdl.List) == 0 { + return err, 0 + } + // ljs, _ := json.Marshal(cdl.List) + // fmt.Println("makeMax: ljs: ", string(ljs)) + for _, v := range cdl.List { + curLast, err := strconv.ParseFloat(v.Data[4].(string), 64) + if err != nil { + continue + } + if curLast > 0 { + ct++ + } + amountLast += curLast + //---------------------------------------------- + } + avgLast := amountLast / ct + if float64(ct) < float64(count) { + err := errors.New("no enough source to calculate maX ") + return err, int(float64(count) - ct) + // fmt.Println("makeMax err: 没有足够的数据进行计算ma", "candle:", cl, "counts:", count, "ct:", ct, "avgLast: ") + } else { + // fmt.Println("makeMax keyName: ma", count, keyName, " avgLast: ", avgLast, "ts: ", tsi, "ct: ", ct, "ots: ", ots, "candle: ", string(cjs)) + + } + mx := core.MaX{ + KeyName: keyName, + InstID: cl.InstId, + Period: cl.Period, + From: cl.From, + Count: count, + Ts: tsi, + AvgVal: avgLast, + } + dt := []interface{}{} + dt = append(dt, mx.Ts) + dt = append(dt, mx.AvgVal) + dt = append(dt, ct) + mx.Data = dt + + // key存到redis + + cr.MaXProcessChan <- &mx + return nil, 0 +} + +func CandlesProcess(cr *core.Core) { + for { + cd := <-cr.CandlesProcessChan + logrus.Debug("cd: ", cd) + go func(cad *core.Candle) { + mcd := MyCandle{ + Candle: *cad, + } + mcd.Process(cr) + }(cd) + } +} + +// 使用当前某个原始维度的candle对象,生成其他目标维度的candle对象,比如用3分钟的candle可以生成15分钟及以上的candle +// { +// "startTime": "2021-12-04 20:00", +// "seg": "m", +// "count": 1 +// }, +// 从startTime开始,经历整数个(count * seg)之后,还能不大于分钟粒度的当前时间的话,那个时间点就是最近的当前段起始时间点 +func MakeSoftCandles(cr *core.Core, cd *core.Candle) { + segments := cr.Cfg.Config.Get("softCandleSegmentList").MustArray() + for k, v := range segments { + cs := core.CandleSegment{} + sv, _ := json.Marshal(v) + json.Unmarshal(sv, &cs) + // if k > 2 { + // continue + // } + if !cs.Enabled { + continue + } + // TODO: 通过序列化和反序列化,对原始的candle进行克隆,因为是对引用进行操作,所以每个seg里对candle进行操作都会改变原始对象,这和预期不符 + bt, _ := json.Marshal(cd) + cd0 := core.Candle{} + json.Unmarshal(bt, &cd0) + + tmi := ToInt64(cd0.Data[0]) + tm := time.UnixMilli(tmi) + if tm.Unix() > 10*time.Now().Unix() { + continue + } + // 下面这几种目标维度的,不生成softCandle + if cs.Seg == "1m" { + continue + } + + otm, err := cr.PeriodToLastTime(cs.Seg, tm) + logrus.Warn("MakeSoftCandles cs.Seg: ", cs.Seg, ", otm:", otm) + + if err != nil { + logrus.Warning("MakeSoftCandles err: ", err) + } + otmi := otm.UnixMilli() + cd1 := core.Candle{ + InstId: cd0.InstId, // string `json:"instId", string` + Period: cs.Seg, // `json:"period", string` + Data: cd0.Data, // `json:"data"` + From: "soft|" + os.Getenv("HOSTNAME"), // string `json:"from"` + } + // cd0是从tickerInfo创建的1m Candle克隆来的, Data里只有Data[4]被赋值,是last,其他都是"-1" + // TODO 填充其余几个未赋值的字段,除了成交量和成交美元数以外,并存入redis待用 + // strconv.FormatInt(otmi, 10) + cd1.Data = cd0.GetSetCandleInfo(cr, cs.Seg, otmi) + // 生成软交易量和交易数对,用于代替last生成max + go func(k int) { + time.Sleep(time.Duration(100*k) * time.Millisecond) + cr.CandlesProcessChan <- &cd1 + }(k) + } +} diff --git a/modules/util.go b/modules/util.go index fd21198..2121893 100644 --- a/modules/util.go +++ b/modules/util.go @@ -1,6 +1,8 @@ package module import ( + "crypto/md5" + "encoding/hex" "encoding/json" "errors" "fmt" @@ -10,6 +12,7 @@ import ( "math" "math/rand" "os" + "reflect" "runtime" "strconv" "strings" @@ -69,3 +72,36 @@ func Md5V(str string) string { h.Write([]byte(str)) return hex.EncodeToString(h.Sum(nil)) } + +func ToString(val interface{}) string { + valstr := "" + if reflect.TypeOf(val).Name() == "string" { + valstr = val.(string) + } else if reflect.TypeOf(val).Name() == "float64" { + valstr = strconv.FormatFloat(val.(float64), 'f', 1, 64) + } else if reflect.TypeOf(val).Name() == "int64" { + valstr = strconv.FormatInt(val.(int64), 16) + } + return valstr +} + +func ToInt64(val interface{}) int64 { + vali := int64(0) + if reflect.TypeOf(val).Name() == "string" { + vali, _ = strconv.ParseInt(val.(string), 10, 64) + } else if reflect.TypeOf(val).Name() == "float64" { + vali = int64(val.(float64)) + } + return vali +} +func ToFloat64(val interface{}) float64 { + valf := float64(0) + if reflect.TypeOf(val).Name() == "string" { + valf, _ = strconv.ParseFloat(val.(string), 64) + } else if reflect.TypeOf(val).Name() == "float64" { + valf = val.(float64) + } else if reflect.TypeOf(val).Name() == "int64" { + valf = float64(val.(int64)) + } + return valf +}