up
This commit is contained in:
parent
d7f5dc9bfe
commit
bae89b0ad2
15
go.mod
15
go.mod
@ -1,30 +1,19 @@
|
|||||||
module github.com/phyer/siaga
|
module github.com/phyer/siaga
|
||||||
|
|
||||||
replace (
|
replace github.com/phyer/siaga/modules => ./modules
|
||||||
v5sdk_go/config => ../core/submodules/okex/config
|
|
||||||
v5sdk_go/rest => ../core/submodules/okex/rest
|
|
||||||
v5sdk_go/utils => ../core/submodules/okex/utils
|
|
||||||
v5sdk_go/ws => ../core/submodules/okex/ws
|
|
||||||
)
|
|
||||||
|
|
||||||
go 1.21
|
go 1.21
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/bitly/go-simplejson v0.5.0
|
github.com/bitly/go-simplejson v0.5.0
|
||||||
github.com/go-redis/redis v6.15.9+incompatible
|
github.com/go-redis/redis v6.15.9+incompatible
|
||||||
github.com/phyer/core v0.1.1
|
github.com/phyer/core v0.1.12
|
||||||
github.com/sirupsen/logrus v1.9.3
|
github.com/sirupsen/logrus v1.9.3
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/gorilla/websocket v1.5.1 // indirect
|
|
||||||
github.com/onsi/ginkgo v1.16.5 // indirect
|
github.com/onsi/ginkgo v1.16.5 // indirect
|
||||||
github.com/onsi/gomega v1.18.1 // indirect
|
github.com/onsi/gomega v1.18.1 // indirect
|
||||||
github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196 // indirect
|
github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196 // indirect
|
||||||
golang.org/x/net v0.17.0 // indirect
|
|
||||||
golang.org/x/sys v0.13.0 // indirect
|
golang.org/x/sys v0.13.0 // indirect
|
||||||
v5sdk_go/config v0.0.0-00010101000000-000000000000 // indirect
|
|
||||||
v5sdk_go/rest v0.0.0-00010101000000-000000000000 // indirect
|
|
||||||
v5sdk_go/utils v0.0.0-00010101000000-000000000000 // indirect
|
|
||||||
v5sdk_go/ws v0.0.0-00010101000000-000000000000 // indirect
|
|
||||||
)
|
)
|
||||||
|
6
go.sum
6
go.sum
@ -28,8 +28,6 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
|
|||||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
|
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
|
||||||
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
|
|
||||||
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
|
|
||||||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
||||||
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
|
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
|
||||||
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
|
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
|
||||||
@ -50,8 +48,8 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y
|
|||||||
github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
|
github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
|
||||||
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
|
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
|
||||||
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
|
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
|
||||||
github.com/phyer/core v0.1.1 h1:mFlh+oV/K1pa7vbvo4wGDf5d+88wnRdrYnDcRaxAsRU=
|
github.com/phyer/core v0.1.12 h1:fOvE0T3obnzPUqT8F78i/rIwl/opa7ZhB3xx96GIFPc=
|
||||||
github.com/phyer/core v0.1.1/go.mod h1:LyfJrdqSlm2MTOx0M3pnDntpwa64XD5nf0xYxvZ4El4=
|
github.com/phyer/core v0.1.12/go.mod h1:oVP5mvnnJvI2Qxlnh4jYGj92DbH7XyY2xeRagQ3hdo8=
|
||||||
github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196 h1:P1sxgCsS0VIL38ufZzgUuZLLyY/B+po6kSY7ziNZT7E=
|
github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196 h1:P1sxgCsS0VIL38ufZzgUuZLLyY/B+po6kSY7ziNZT7E=
|
||||||
github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196/go.mod h1:iZexs5agdApNlp8HW/FqKgma4Ij1x8/o+ZLcMvY3f80=
|
github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196/go.mod h1:iZexs5agdApNlp8HW/FqKgma4Ij1x8/o+ZLcMvY3f80=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
|
75
main.go
75
main.go
@ -1,24 +1,23 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
// "fmt"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/phyer/core"
|
"github.com/phyer/core"
|
||||||
md "github.com/phyer/siaga/module"
|
md "github.com/phyer/siaga/modules"
|
||||||
"github.com/sirupsen/logrus"
|
// "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
cr := core.Core{}
|
cr := core.Core{}
|
||||||
cr.Init()
|
cr.Init()
|
||||||
|
|
||||||
cli, err := cr.GetRedisCli()
|
cli, _ := cr.GetRedisLocalCli()
|
||||||
cr.RedisRemoteCli = cli
|
cr.RedisRemoteCli = cli
|
||||||
|
|
||||||
allCandleAdd := core.ALLCANDLES_PUBLISH
|
rdsLs, _ := md.GetRemoteRedisConfigList()
|
||||||
allMaXAdd := core.ALLMAX_PUBLISH
|
|
||||||
// 目前只有phyer里部署的tunas会发布tickerInfo信息
|
// 目前只有phyer里部署的tunas会发布tickerInfo信息
|
||||||
go func(vv *core.RedisConfig) {
|
go func(vv *core.RedisConfig) {
|
||||||
allowed := os.Getenv("SIAGA_ACCEPTTICKER") == "true"
|
allowed := os.Getenv("SIAGA_ACCEPTTICKER") == "true"
|
||||||
@ -26,57 +25,61 @@ func main() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
md.LoopSubscribe(&cr, core.TICKERINFO_PUBLISH, vv)
|
md.LoopSubscribe(&cr, core.TICKERINFO_PUBLISH, vv)
|
||||||
}(v)
|
}(rdsLs[0])
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(5 * time.Second)
|
||||||
go func(vv *core.RedisConfig) {
|
go func(vv *core.RedisConfig) {
|
||||||
allowed := os.Getenv("SIAGA_ACCEPTCANDLE") == "true"
|
allowed := os.Getenv("SIAGA_ACCEPTCANDLE") == "true"
|
||||||
if !allowed {
|
if !allowed {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// core.LoopSubscribe(&cr, allCandleAdd, vv)
|
md.LoopSubscribe(&cr, core.ALLCANDLES_PUBLISH, vv)
|
||||||
}(v)
|
}(rdsLs[0])
|
||||||
go func(vv *core.RedisConfig) {
|
go func(vv *core.RedisConfig) {
|
||||||
allowed := os.Getenv("SIAGA_ACCEPTMAX") == "true"
|
allowed := os.Getenv("SIAGA_ACCEPTMAX") == "true"
|
||||||
if !allowed {
|
if !allowed {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
core.LoopSubscribe(&cr, allMaXAdd, vv)
|
md.LoopSubscribe(&cr, core.ALLMAXES_PUBLISH, vv)
|
||||||
}(v)
|
}(rdsLs[0])
|
||||||
|
// 下面这个暂时不运行, 在环境变量里把它关掉
|
||||||
go func(vv *core.RedisConfig) {
|
go func(vv *core.RedisConfig) {
|
||||||
allowed := os.Getenv("SIAGA_ACCEPTSERIES") == "true"
|
allowed := os.Getenv("SIAGA_ACCEPTSERIES") == "true"
|
||||||
if !allowed {
|
if !allowed {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
core.LoopSubscribe(&cr, core.ALLSERIESINFO_PUBLISH, vv)
|
md.LoopSubscribe(&cr, core.ALLSERIESINFO_PUBLISH, vv)
|
||||||
}(v)
|
}(rdsLs[0])
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
core.LoopMakeMaX(&cr)
|
md.LoopMakeMaX(&cr)
|
||||||
}()
|
}()
|
||||||
|
// 这些临时关掉,很快打开
|
||||||
|
// go func() {
|
||||||
|
// core.LoopCheckRemoteRedis(&cr)
|
||||||
|
// }()
|
||||||
go func() {
|
go func() {
|
||||||
core.LoopCheckRemoteRedis(&cr)
|
md.CandlesProcess(&cr)
|
||||||
}()
|
|
||||||
go func() {
|
|
||||||
core.CandlesProcess(&cr)
|
|
||||||
}()
|
|
||||||
go func() {
|
|
||||||
core.MaXsProcess(&cr)
|
|
||||||
}()
|
|
||||||
go func() {
|
|
||||||
core.TickerInfoProcess(&cr)
|
|
||||||
}()
|
|
||||||
go func() {
|
|
||||||
core.CoasterProcess(&cr)
|
|
||||||
}()
|
|
||||||
go func() {
|
|
||||||
core.SeriesProcess(&cr)
|
|
||||||
}()
|
|
||||||
go func() {
|
|
||||||
core.SegmentItemProcess(&cr)
|
|
||||||
}()
|
|
||||||
go func() {
|
|
||||||
core.ShearForceProcess(&cr)
|
|
||||||
}()
|
}()
|
||||||
|
// go func() {
|
||||||
|
// core.MaXsProcess(&cr)
|
||||||
|
// }()
|
||||||
|
// go func() {
|
||||||
|
// core.TickerInfoProcess(&cr)
|
||||||
|
// }()
|
||||||
|
|
||||||
|
// 这些暂时不运行, 以后要不要运行再说
|
||||||
|
// go func() {
|
||||||
|
// core.CoasterProcess(&cr)
|
||||||
|
// }()
|
||||||
|
// go func() {
|
||||||
|
// core.SeriesProcess(&cr)
|
||||||
|
// }()
|
||||||
|
// go func() {
|
||||||
|
// core.SegmentItemProcess(&cr)
|
||||||
|
// }()
|
||||||
|
// go func() {
|
||||||
|
// core.ShearForceProcess(&cr)
|
||||||
|
// }()
|
||||||
go func() {
|
go func() {
|
||||||
core.WriteLogProcess(&cr)
|
core.WriteLogProcess(&cr)
|
||||||
}()
|
}()
|
||||||
|
105
modules/candle.go
Normal file
105
modules/candle.go
Normal file
@ -0,0 +1,105 @@
|
|||||||
|
package module
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"github.com/phyer/core"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
// "sync"
|
||||||
|
"time"
|
||||||
|
//
|
||||||
|
// simple "github.com/bitly/go-simplejson"
|
||||||
|
"github.com/go-redis/redis"
|
||||||
|
// "github.com/phyer/core/utils"
|
||||||
|
logrus "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MyCandle struct {
|
||||||
|
core.Candle
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cd *MyCandle) Process(cr *core.Core) {
|
||||||
|
tmi := ToInt64(cd.Data[0])
|
||||||
|
time.UnixMilli(tmi).Format("01-02 15:04")
|
||||||
|
// 如果sardine是运行在远端,不能让candle存盘, 因为这是tunnas该干的事情,不能跟它抢
|
||||||
|
founded := cd.Filter(cr)
|
||||||
|
if !founded {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
saveCandle := os.Getenv("SARDINE_SAVECANDLE")
|
||||||
|
if saveCandle == "true" {
|
||||||
|
_, err := cd.SetToKey(cr)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Warning("SetToKey err: ", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
// TODO update plate and coaster
|
||||||
|
go func() {
|
||||||
|
makeSeries := os.Getenv("SARDINE_MAKESERIES")
|
||||||
|
if makeSeries == "true" {
|
||||||
|
_, err := cd.InsertIntoPlate(cr)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Warning("SetToKey err: ", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// 由于max可以从远端直接拿,不需要sardine自己做,所以可以sardine做也可以不做
|
||||||
|
go func(cad *MyCandle) {
|
||||||
|
makeMaX := os.Getenv("SARDINE_MAKEMAX")
|
||||||
|
if makeMaX == "true" {
|
||||||
|
// 等一会儿防止candle还没有加进CoinMap
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
cr.MakeMaXsChan <- &cad.Candle
|
||||||
|
}
|
||||||
|
}(cd)
|
||||||
|
|
||||||
|
go func(cad *MyCandle) {
|
||||||
|
// 触发制作插值candle
|
||||||
|
makeSoft := false
|
||||||
|
// makeVolSoft := true
|
||||||
|
// makeVolSoft := false
|
||||||
|
if os.Getenv("SARDINE_MAKESOFTCANDLE") == "true" {
|
||||||
|
makeSoft = true
|
||||||
|
}
|
||||||
|
// 根据低维度candle,模拟出"软"的高纬度candle
|
||||||
|
if cad.Period == "1m" && makeSoft {
|
||||||
|
fmt.Println("makeSoft:", cad.Period, cad.InstId)
|
||||||
|
MakeSoftCandles(cr, &cad.Candle)
|
||||||
|
}
|
||||||
|
}(cd)
|
||||||
|
go func(cad *MyCandle) {
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
cr.AddToGeneralCandleChnl(cad)
|
||||||
|
}(cd)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cd *MyCandle) InsertIntoPlate(cr *core.Core) (*core.Sample, error) {
|
||||||
|
cr.Mu.Lock()
|
||||||
|
defer cr.Mu.Unlock()
|
||||||
|
// pl, plateFounded := cr.PlateMap[cd.InstID]
|
||||||
|
// if !plateFounded || pl == nil {
|
||||||
|
pl, _ := LoadPlate(cr, cd.InstID)
|
||||||
|
cr.PlateMap[cd.InstID] = pl
|
||||||
|
// }
|
||||||
|
po, coasterFounded := pl.CoasterMap["period"+cd.Period]
|
||||||
|
err := errors.New("")
|
||||||
|
if !coasterFounded {
|
||||||
|
pl.MakeCoaster(cr, cd.Period)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(po.InstID) == 0 {
|
||||||
|
// logrus.Debug("candle coaster: ", cd.Period, pl.CoasterMap["period"+cd.Period], pl.CoasterMap)
|
||||||
|
//创建失败的原因是原始数据不够,一般发生在服务中断了,缺少部分数据的情况下, 后续需要数据补全措施
|
||||||
|
erstr := fmt.Sprintln("coaster创建失败 candle instID: "+cd.InstID+"; period: "+cd.Period, "coasterFounded: ", coasterFounded, " ", err)
|
||||||
|
err := errors.New(erstr)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
sm, err := po.RPushSample(cr, *cd, "candle")
|
||||||
|
return sm, err
|
||||||
|
}
|
@ -8,15 +8,49 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
// "sync"
|
||||||
"time"
|
"time"
|
||||||
|
//
|
||||||
simple "github.com/bitly/go-simplejson"
|
// simple "github.com/bitly/go-simplejson"
|
||||||
"github.com/go-redis/redis"
|
"github.com/go-redis/redis"
|
||||||
// "github.com/phyer/core/utils"
|
// "github.com/phyer/core/utils"
|
||||||
logrus "github.com/sirupsen/logrus"
|
logrus "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func GetRemoteRedisConfigList() ([]*core.RedisConfig, error) {
|
||||||
|
list := []*core.RedisConfig{}
|
||||||
|
envListStr := os.Getenv("SARDINE_REMOTE_REDIS_LIST")
|
||||||
|
envList := strings.Split(envListStr, "|")
|
||||||
|
for _, v := range envList {
|
||||||
|
if len(v) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
urlstr := core.REMOTE_REDIS_PRE_NAME + v + "_URL"
|
||||||
|
indexstr := core.REMOTE_REDIS_PRE_NAME + v + "_INDEX"
|
||||||
|
password := os.Getenv(core.REMOTE_REDIS_PRE_NAME + v + "_PASSWORD")
|
||||||
|
// channelstr := core.REMOTE_REDIS_PRE_NAME + v + "_CHANNEL_PRENAME"
|
||||||
|
// channelPreName := os.Getenv(channelstr)
|
||||||
|
url := os.Getenv(urlstr)
|
||||||
|
index := os.Getenv(indexstr)
|
||||||
|
if len(url) == 0 || len(index) == 0 {
|
||||||
|
err := errors.New("remote redis config err:" + urlstr + "," + url + "," + indexstr + "," + index)
|
||||||
|
return list, err
|
||||||
|
}
|
||||||
|
idx, err := strconv.Atoi(index)
|
||||||
|
if err != nil {
|
||||||
|
return list, err
|
||||||
|
}
|
||||||
|
curConf := core.RedisConfig{
|
||||||
|
Url: url,
|
||||||
|
Password: password,
|
||||||
|
Index: idx,
|
||||||
|
// ChannelPreName: channelPreName,
|
||||||
|
}
|
||||||
|
list = append(list, &curConf)
|
||||||
|
}
|
||||||
|
return list, nil
|
||||||
|
}
|
||||||
|
|
||||||
func LoopSubscribe(cr *core.Core, channelName string, redisConf *core.RedisConfig) {
|
func LoopSubscribe(cr *core.Core, channelName string, redisConf *core.RedisConfig) {
|
||||||
redisRemoteCli := cr.RedisRemoteCli
|
redisRemoteCli := cr.RedisRemoteCli
|
||||||
suffix := ""
|
suffix := ""
|
||||||
@ -72,7 +106,7 @@ func LoopSubscribe(cr *core.Core, channelName string, redisConf *core.RedisConfi
|
|||||||
}
|
}
|
||||||
case "maX":
|
case "maX":
|
||||||
{
|
{
|
||||||
mx := MaX{}
|
mx := core.MaX{}
|
||||||
if msg.Payload == "" {
|
if msg.Payload == "" {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -87,7 +121,7 @@ func LoopSubscribe(cr *core.Core, channelName string, redisConf *core.RedisConfi
|
|||||||
case "tickerInfo":
|
case "tickerInfo":
|
||||||
{
|
{
|
||||||
//tickerInfo: map[askPx:2.2164 askSz:17.109531 bidPx:2.2136 bidSz:73 high24h:2.497 instId:STX-USDT instType:SPOT last:2.2136 lastSz:0 low24h:2.0508 open24h:2.42 sodUtc0:2.4266 sodUtc8:2.4224 ts:1637077323552 vol24h:5355479.488179 :12247398.975501]
|
//tickerInfo: map[askPx:2.2164 askSz:17.109531 bidPx:2.2136 bidSz:73 high24h:2.497 instId:STX-USDT instType:SPOT last:2.2136 lastSz:0 low24h:2.0508 open24h:2.42 sodUtc0:2.4266 sodUtc8:2.4224 ts:1637077323552 vol24h:5355479.488179 :12247398.975501]
|
||||||
ti := TickerInfo{}
|
ti := core.TickerInfo{}
|
||||||
err := json.Unmarshal([]byte(msg.Payload), &ti)
|
err := json.Unmarshal([]byte(msg.Payload), &ti)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Warning("tickerInfo payload unmarshal err: ", err, msg.Payload)
|
logrus.Warning("tickerInfo payload unmarshal err: ", err, msg.Payload)
|
||||||
@ -95,17 +129,274 @@ func LoopSubscribe(cr *core.Core, channelName string, redisConf *core.RedisConfi
|
|||||||
cr.TickerInforocessChan <- &ti
|
cr.TickerInforocessChan <- &ti
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
case "seriesInfo":
|
// case "seriesInfo":
|
||||||
{
|
// {
|
||||||
//tickerInfo: map[askPx:2.2164 askSz:17.109531 bidPx:2.2136 bidSz:73 high24h:2.497 instId:STX-USDT instType:SPOT last:2.2136 lastSz:0 low24h:2.0508 open24h:2.42 sodUtc0:2.4266 sodUtc8:2.4224 ts:1637077323552 vol24h:5355479.488179 :12247398.975501]
|
// //tickerInfo: map[askPx:2.2164 askSz:17.109531 bidPx:2.2136 bidSz:73 high24h:2.497 instId:STX-USDT instType:SPOT last:2.2136 lastSz:0 low24h:2.0508 open24h:2.42 sodUtc0:2.4266 sodUtc8:2.4224 ts:1637077323552 vol24h:5355479.488179 :12247398.975501]
|
||||||
sei := SeriesInfo{}
|
// sei := SeriesInfo{}
|
||||||
err := json.Unmarshal([]byte(msg.Payload), &sei)
|
// err := json.Unmarshal([]byte(msg.Payload), &sei)
|
||||||
if err != nil {
|
// if err != nil {
|
||||||
logrus.Warning("seriesInfo payload unmarshal err: ", err, msg.Payload)
|
// logrus.Warning("seriesInfo payload unmarshal err: ", err, msg.Payload)
|
||||||
}
|
// }
|
||||||
cr.SeriesChan <- &sei
|
// cr.SeriesChan <- &sei
|
||||||
break
|
// break
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func LoopMakeMaX(cr *core.Core) {
|
||||||
|
for {
|
||||||
|
cd := <-cr.MakeMaXsChan
|
||||||
|
go func(cad *core.Candle) {
|
||||||
|
//当一个candle的多个时间点的数据几乎同时到达时,顺序无法保证,制作maX会因为中间缺失数据而计算错,因此,等待一秒钟等数据都全了再算
|
||||||
|
// sz := utils.ShaiziInt(1500) + 500
|
||||||
|
time.Sleep(time.Duration(30) * time.Millisecond)
|
||||||
|
err, ct := MakeMaX(cr, cad, 7)
|
||||||
|
logrus.Warn(GetFuncName(), " ma7 err:", err, " ct:", ct, " cd.InstID:", cd.InstId, " cd.Period:", cd.Period)
|
||||||
|
//TODO 这个思路不错,单行不通,远程redis禁不住这么频繁的请求
|
||||||
|
// cd.InvokeRestQFromRemote(cr, ct)
|
||||||
|
}(cd)
|
||||||
|
go func(cad *core.Candle) {
|
||||||
|
//当一个candle的多个时间点的数据几乎同时到达时,顺序无法保证,制作maX会因为中间缺失数据而计算错,因此,等待一秒钟等数据都全了再算
|
||||||
|
// sz := utils.ShaiziInt(2000) + 500
|
||||||
|
time.Sleep(time.Duration(30) * time.Millisecond)
|
||||||
|
err, ct := MakeMaX(cr, cad, 30)
|
||||||
|
logrus.Warn(GetFuncName(), " ma30 err:", err, " ct:", ct, " cd.InstID:", cd.InstId, " cd.Period:", cd.Period)
|
||||||
|
// cd.InvokeRestQFromRemote(cr, ct)
|
||||||
|
}(cd)
|
||||||
|
// TODO TODO 这地方不能加延时,否则makeMax处理不过来,多的就丢弃了,造成maX的sortedSet比candle的短很多。后面所有依赖的逻辑都受影响.
|
||||||
|
// time.Sleep(300 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// setName := "candle" + period + "|" + instId + "|sortedSet"
|
||||||
|
// count: 倒推多少个周期开始拿数据
|
||||||
|
// from: 倒推的起始时间点
|
||||||
|
// ctype: candle或者maX
|
||||||
|
func GetRangeCandleSortedSet(cr *core.Core, setName string, count int, from time.Time) (*core.CandleList, error) {
|
||||||
|
cdl := core.CandleList{}
|
||||||
|
ary1 := strings.Split(setName, "|")
|
||||||
|
ary2 := []string{}
|
||||||
|
period := ""
|
||||||
|
ary2 = strings.Split(ary1[0], "candle")
|
||||||
|
period = ary2[1]
|
||||||
|
|
||||||
|
dui, err := cr.PeriodToMinutes(period)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
fromt := from.UnixMilli()
|
||||||
|
nw := time.Now().UnixMilli()
|
||||||
|
if fromt > nw*2 {
|
||||||
|
err := errors.New("时间错了需要debug")
|
||||||
|
logrus.Warning(err.Error())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
froms := strconv.FormatInt(fromt, 10)
|
||||||
|
sti := fromt - dui*int64(count)*60*1000
|
||||||
|
sts := strconv.FormatInt(sti, 10)
|
||||||
|
opt := redis.ZRangeBy{
|
||||||
|
Min: sts,
|
||||||
|
Max: froms,
|
||||||
|
Count: int64(count),
|
||||||
|
}
|
||||||
|
ary := []string{}
|
||||||
|
extt, err := GetExpiration(cr, period)
|
||||||
|
ot := time.Now().Add(extt * -1)
|
||||||
|
oti := ot.UnixMilli()
|
||||||
|
cli := cr.RedisLocalCli
|
||||||
|
cli.LTrim(setName, 0, oti)
|
||||||
|
cunt, _ := cli.ZRemRangeByScore(setName, "0", strconv.FormatInt(oti, 10)).Result()
|
||||||
|
if cunt > 0 {
|
||||||
|
logrus.Warning("移出过期的引用数量:", setName, count, "ZRemRangeByScore ", setName, 0, strconv.FormatInt(oti, 10))
|
||||||
|
}
|
||||||
|
logrus.Info("ZRevRangeByScore ", setName, opt)
|
||||||
|
ary, err = cli.ZRevRangeByScore(setName, opt).Result()
|
||||||
|
if err != nil {
|
||||||
|
return &cdl, err
|
||||||
|
}
|
||||||
|
keyAry, err := cli.MGet(ary...).Result()
|
||||||
|
if err != nil || len(keyAry) == 0 {
|
||||||
|
logrus.Warning("no record with cmd: ZRevRangeByScore ", setName, froms, sts, " ", err.Error())
|
||||||
|
logrus.Warning("zrev lens of ary: lens: ", len(ary), "GetRangeSortedSet ZRevRangeByScore:", "setName:", setName, "opt.Max:", opt.Max, "opt.Min:", opt.Min)
|
||||||
|
return &cdl, err
|
||||||
|
}
|
||||||
|
for _, str := range keyAry {
|
||||||
|
if str == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
cd := core.Candle{}
|
||||||
|
err := json.Unmarshal([]byte(str.(string)), &cd)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Warn(GetFuncName(), err, str.(string))
|
||||||
|
}
|
||||||
|
tmi := ToInt64(cd.Data[0])
|
||||||
|
tm := time.UnixMilli(tmi)
|
||||||
|
if tm.Sub(from) > 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
cdl.List = append(cdl.List, &cd)
|
||||||
|
}
|
||||||
|
cdl.Count = count
|
||||||
|
return &cdl, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetExpiration(cr *core.Core, per string) (time.Duration, error) {
|
||||||
|
if len(per) == 0 {
|
||||||
|
erstr := fmt.Sprint("period没有设置")
|
||||||
|
logrus.Warn(erstr)
|
||||||
|
err := errors.New(erstr)
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
exp, err := cr.PeriodToMinutes(per)
|
||||||
|
dur := time.Duration(exp*49) * time.Minute
|
||||||
|
return dur, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func MakeMaX(cr *core.Core, cl *core.Candle, count int) (error, int) {
|
||||||
|
data := cl.Data
|
||||||
|
js, _ := json.Marshal(data)
|
||||||
|
// cjs, _ := json.Marshal(cl)
|
||||||
|
if len(data) == 0 {
|
||||||
|
err := errors.New("data is block: " + string(js))
|
||||||
|
return err, 0
|
||||||
|
}
|
||||||
|
|
||||||
|
tsi := ToInt64(data[0])
|
||||||
|
// tsa := time.UnixMilli(tsi).Format("01-02 15:03:04")
|
||||||
|
// fmt.Println("MakeMaX candle: ", cl.InstID, cl.Period, tsa, cl.From)
|
||||||
|
tss := strconv.FormatInt(tsi, 10)
|
||||||
|
keyName := "candle" + cl.Period + "|" + cl.InstId + "|ts:" + tss
|
||||||
|
//过期时间:根号(当前candle的周期/1分钟)*10000
|
||||||
|
|
||||||
|
lastTime := time.UnixMilli(tsi)
|
||||||
|
// lasts := lastTime.Format("2006-01-02 15:04")
|
||||||
|
// 以当前candle的时间戳为起点倒推count个周期,取得所需candle用于计算maX
|
||||||
|
setName := "candle" + cl.Period + "|" + cl.InstId + "|sortedSet"
|
||||||
|
// cdl, err := cr.GetLastCandleListOfCoin(cl.InstID, cl.Period, count, lastTime)
|
||||||
|
cdl, err := GetRangeCandleSortedSet(cr, setName, count, lastTime)
|
||||||
|
if err != nil {
|
||||||
|
return err, 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// fmt.Println("makeMaX: list: ", "instId: ", cl.InstID, "cl.Period: ", cl.Period, " lastTime:", lastTime, " count: ", count)
|
||||||
|
amountLast := float64(0)
|
||||||
|
ct := float64(0)
|
||||||
|
// fmt.Println("makeMax len of GetLastCandleListOfCoin list: ", len(cdl.List), "makeMax err of GetLastCandleListOfCoin: ", err)
|
||||||
|
if len(cdl.List) == 0 {
|
||||||
|
return err, 0
|
||||||
|
}
|
||||||
|
// ljs, _ := json.Marshal(cdl.List)
|
||||||
|
// fmt.Println("makeMax: ljs: ", string(ljs))
|
||||||
|
for _, v := range cdl.List {
|
||||||
|
curLast, err := strconv.ParseFloat(v.Data[4].(string), 64)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if curLast > 0 {
|
||||||
|
ct++
|
||||||
|
}
|
||||||
|
amountLast += curLast
|
||||||
|
//----------------------------------------------
|
||||||
|
}
|
||||||
|
avgLast := amountLast / ct
|
||||||
|
if float64(ct) < float64(count) {
|
||||||
|
err := errors.New("no enough source to calculate maX ")
|
||||||
|
return err, int(float64(count) - ct)
|
||||||
|
// fmt.Println("makeMax err: 没有足够的数据进行计算ma", "candle:", cl, "counts:", count, "ct:", ct, "avgLast: ")
|
||||||
|
} else {
|
||||||
|
// fmt.Println("makeMax keyName: ma", count, keyName, " avgLast: ", avgLast, "ts: ", tsi, "ct: ", ct, "ots: ", ots, "candle: ", string(cjs))
|
||||||
|
|
||||||
|
}
|
||||||
|
mx := core.MaX{
|
||||||
|
KeyName: keyName,
|
||||||
|
InstID: cl.InstId,
|
||||||
|
Period: cl.Period,
|
||||||
|
From: cl.From,
|
||||||
|
Count: count,
|
||||||
|
Ts: tsi,
|
||||||
|
AvgVal: avgLast,
|
||||||
|
}
|
||||||
|
dt := []interface{}{}
|
||||||
|
dt = append(dt, mx.Ts)
|
||||||
|
dt = append(dt, mx.AvgVal)
|
||||||
|
dt = append(dt, ct)
|
||||||
|
mx.Data = dt
|
||||||
|
|
||||||
|
// key存到redis
|
||||||
|
|
||||||
|
cr.MaXProcessChan <- &mx
|
||||||
|
return nil, 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func CandlesProcess(cr *core.Core) {
|
||||||
|
for {
|
||||||
|
cd := <-cr.CandlesProcessChan
|
||||||
|
logrus.Debug("cd: ", cd)
|
||||||
|
go func(cad *core.Candle) {
|
||||||
|
mcd := MyCandle{
|
||||||
|
Candle: *cad,
|
||||||
|
}
|
||||||
|
mcd.Process(cr)
|
||||||
|
}(cd)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 使用当前某个原始维度的candle对象,生成其他目标维度的candle对象,比如用3分钟的candle可以生成15分钟及以上的candle
|
||||||
|
// {
|
||||||
|
// "startTime": "2021-12-04 20:00",
|
||||||
|
// "seg": "m",
|
||||||
|
// "count": 1
|
||||||
|
// },
|
||||||
|
// 从startTime开始,经历整数个(count * seg)之后,还能不大于分钟粒度的当前时间的话,那个时间点就是最近的当前段起始时间点
|
||||||
|
func MakeSoftCandles(cr *core.Core, cd *core.Candle) {
|
||||||
|
segments := cr.Cfg.Config.Get("softCandleSegmentList").MustArray()
|
||||||
|
for k, v := range segments {
|
||||||
|
cs := core.CandleSegment{}
|
||||||
|
sv, _ := json.Marshal(v)
|
||||||
|
json.Unmarshal(sv, &cs)
|
||||||
|
// if k > 2 {
|
||||||
|
// continue
|
||||||
|
// }
|
||||||
|
if !cs.Enabled {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// TODO: 通过序列化和反序列化,对原始的candle进行克隆,因为是对引用进行操作,所以每个seg里对candle进行操作都会改变原始对象,这和预期不符
|
||||||
|
bt, _ := json.Marshal(cd)
|
||||||
|
cd0 := core.Candle{}
|
||||||
|
json.Unmarshal(bt, &cd0)
|
||||||
|
|
||||||
|
tmi := ToInt64(cd0.Data[0])
|
||||||
|
tm := time.UnixMilli(tmi)
|
||||||
|
if tm.Unix() > 10*time.Now().Unix() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// 下面这几种目标维度的,不生成softCandle
|
||||||
|
if cs.Seg == "1m" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
otm, err := cr.PeriodToLastTime(cs.Seg, tm)
|
||||||
|
logrus.Warn("MakeSoftCandles cs.Seg: ", cs.Seg, ", otm:", otm)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
logrus.Warning("MakeSoftCandles err: ", err)
|
||||||
|
}
|
||||||
|
otmi := otm.UnixMilli()
|
||||||
|
cd1 := core.Candle{
|
||||||
|
InstId: cd0.InstId, // string `json:"instId", string`
|
||||||
|
Period: cs.Seg, // `json:"period", string`
|
||||||
|
Data: cd0.Data, // `json:"data"`
|
||||||
|
From: "soft|" + os.Getenv("HOSTNAME"), // string `json:"from"`
|
||||||
|
}
|
||||||
|
// cd0是从tickerInfo创建的1m Candle克隆来的, Data里只有Data[4]被赋值,是last,其他都是"-1"
|
||||||
|
// TODO 填充其余几个未赋值的字段,除了成交量和成交美元数以外,并存入redis待用
|
||||||
|
// strconv.FormatInt(otmi, 10)
|
||||||
|
cd1.Data = cd0.GetSetCandleInfo(cr, cs.Seg, otmi)
|
||||||
|
// 生成软交易量和交易数对,用于代替last生成max
|
||||||
|
go func(k int) {
|
||||||
|
time.Sleep(time.Duration(100*k) * time.Millisecond)
|
||||||
|
cr.CandlesProcessChan <- &cd1
|
||||||
|
}(k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
package module
|
package module
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/md5"
|
||||||
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -10,6 +12,7 @@ import (
|
|||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
|
"reflect"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@ -69,3 +72,36 @@ func Md5V(str string) string {
|
|||||||
h.Write([]byte(str))
|
h.Write([]byte(str))
|
||||||
return hex.EncodeToString(h.Sum(nil))
|
return hex.EncodeToString(h.Sum(nil))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ToString(val interface{}) string {
|
||||||
|
valstr := ""
|
||||||
|
if reflect.TypeOf(val).Name() == "string" {
|
||||||
|
valstr = val.(string)
|
||||||
|
} else if reflect.TypeOf(val).Name() == "float64" {
|
||||||
|
valstr = strconv.FormatFloat(val.(float64), 'f', 1, 64)
|
||||||
|
} else if reflect.TypeOf(val).Name() == "int64" {
|
||||||
|
valstr = strconv.FormatInt(val.(int64), 16)
|
||||||
|
}
|
||||||
|
return valstr
|
||||||
|
}
|
||||||
|
|
||||||
|
func ToInt64(val interface{}) int64 {
|
||||||
|
vali := int64(0)
|
||||||
|
if reflect.TypeOf(val).Name() == "string" {
|
||||||
|
vali, _ = strconv.ParseInt(val.(string), 10, 64)
|
||||||
|
} else if reflect.TypeOf(val).Name() == "float64" {
|
||||||
|
vali = int64(val.(float64))
|
||||||
|
}
|
||||||
|
return vali
|
||||||
|
}
|
||||||
|
func ToFloat64(val interface{}) float64 {
|
||||||
|
valf := float64(0)
|
||||||
|
if reflect.TypeOf(val).Name() == "string" {
|
||||||
|
valf, _ = strconv.ParseFloat(val.(string), 64)
|
||||||
|
} else if reflect.TypeOf(val).Name() == "float64" {
|
||||||
|
valf = val.(float64)
|
||||||
|
} else if reflect.TypeOf(val).Name() == "int64" {
|
||||||
|
valf = float64(val.(int64))
|
||||||
|
}
|
||||||
|
return valf
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user