diff --git a/configs/basicConfig.json b/configs/basicConfig.json index f3d7664..631f773 100644 --- a/configs/basicConfig.json +++ b/configs/basicConfig.json @@ -153,17 +153,11 @@ }, "redisRemotes": { "master": { - "url": "localhost:16379", + "url": "fluentd.k8s.xunlang.home:6379", "password": "", "index": 5, "description": "" }, - "slave": { - "url": "localhost:16379", - "password": "", - "index": 5, - "description": "" - } }, "threads": { "asyncChannels": 20, @@ -202,7 +196,6 @@ "OKB-USDT", "DOGE-USDT", "SOL-USDT", - "XRP-USDT", "DYDX-USDT" ], "softCandleSegmentList": [{ diff --git a/go.mod b/go.mod index 8e481d9..01f6571 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ go 1.21 require ( github.com/go-redis/redis v6.15.9+incompatible - github.com/phyer/core v0.1.18 + github.com/phyer/core v0.1.20 github.com/sirupsen/logrus v1.9.3 ) @@ -15,5 +15,6 @@ require ( github.com/onsi/ginkgo v1.16.5 // indirect github.com/onsi/gomega v1.18.1 // indirect github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196 // indirect + github.com/phyer/v5sdkgo v0.1.4 // indirect golang.org/x/sys v0.13.0 // indirect ) diff --git a/go.sum b/go.sum index 28d7f79..298ddda 100644 --- a/go.sum +++ b/go.sum @@ -28,6 +28,7 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= @@ -48,10 +49,12 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= -github.com/phyer/core v0.1.18 h1:pXQ2QDvkbCVtqcmaQl2nCa7LjYYeJkYVQdb26HcTvgc= -github.com/phyer/core v0.1.18/go.mod h1:oVP5mvnnJvI2Qxlnh4jYGj92DbH7XyY2xeRagQ3hdo8= +github.com/phyer/core v0.1.20 h1:KqDlXB0IFWXpbx8UViRxC5+y2CRZ1K+f8GzQBVhq3Vw= +github.com/phyer/core v0.1.20/go.mod h1:XZdniJiiZPzOU8+QHPFRQWdvJa6m5Ilj5VClWWI0OQg= github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196 h1:P1sxgCsS0VIL38ufZzgUuZLLyY/B+po6kSY7ziNZT7E= github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196/go.mod h1:iZexs5agdApNlp8HW/FqKgma4Ij1x8/o+ZLcMvY3f80= +github.com/phyer/v5sdkgo v0.1.4 h1:mAxxjPJVTYGuGDarqOcFGkzj5AgqbbzJGsnYmmsbapU= +github.com/phyer/v5sdkgo v0.1.4/go.mod h1:QCMnQFQNizOvFRPKytv50fOg/MoxS44IFcQicc4NxOg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= @@ -59,9 +62,16 @@ github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTE github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -124,5 +134,6 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go index f544445..e8f4eba 100644 --- a/main.go +++ b/main.go @@ -19,6 +19,8 @@ func main() { rdsLs, _ := md.GetRemoteRedisConfigList() // 目前只有phyer里部署的tunas会发布tickerInfo信息 + + // 订阅 redis TickerInfo go func(vv *core.RedisConfig) { allowed := os.Getenv("SIAGA_ACCEPTTICKER") == "true" if !allowed { @@ -26,7 +28,8 @@ func main() { } md.LoopSubscribe(&cr, core.TICKERINFO_PUBLISH, vv) }(rdsLs[0]) - time.Sleep(5 * time.Second) + + // 订阅 redis Candles go func(vv *core.RedisConfig) { allowed := os.Getenv("SIAGA_ACCEPTCANDLE") == "true" if !allowed { @@ -34,6 +37,8 @@ func main() { } md.LoopSubscribe(&cr, core.ALLCANDLES_PUBLISH, vv) }(rdsLs[0]) + + // 订阅 redis Max go func(vv *core.RedisConfig) { allowed := os.Getenv("SIAGA_ACCEPTMAX") == "true" if !allowed { @@ -41,6 +46,7 @@ func main() { } md.LoopSubscribe(&cr, core.ALLMAXES_PUBLISH, vv) }(rdsLs[0]) + // 下面这个暂时不运行, 在环境变量里把它关掉 go func(vv *core.RedisConfig) { allowed := os.Getenv("SIAGA_ACCEPTSERIES") == "true" @@ -51,20 +57,16 @@ func main() { }(rdsLs[0]) go func() { - md.LoopMakeMaX(&cr) + md.TickerInfoProcess(&cr) }() - // 这些临时关掉,很快打开 - // go func() { - // core.LoopCheckRemoteRedis(&cr) - // }() go func() { md.CandlesProcess(&cr) }() go func() { - md.MaXsProcess(&cr) + md.LoopMakeMaX(&cr) }() go func() { - md.TickerInfoProcess(&cr) + md.MaXsProcess(&cr) }() // 这些暂时不运行, 以后要不要运行再说 diff --git a/modules/candle.go b/modules/candle.go index d31e0bb..e0e1b1a 100644 --- a/modules/candle.go +++ b/modules/candle.go @@ -38,6 +38,10 @@ func (cd *MyCandle) Process(cr *core.Core) { logrus.Warning("SetToKey err: ", err) } } + // 对于软candle,推到elasticSearch + if strings.HasPrefix(cd.From, "soft") { + cd.PushToWriteLogChan(cr) + } }() // TODO update plate and coaster go func() { diff --git a/modules/extent.go b/modules/extent.go index 2183a6a..8073135 100644 --- a/modules/extent.go +++ b/modules/extent.go @@ -93,6 +93,7 @@ func LoopSubscribe(cr *core.Core, channelName string, redisConf *core.RedisConfi // logrus.Warning("msg.Payload: ", msg.Payload) // fmt.Println("channelName: ", channelName, " msg.Payload: ", msg.Payload) switch ctype { + // 接收到的candle扔到 candle 二次加工流水线 case "candle": { cd := core.Candle{} @@ -104,6 +105,8 @@ func LoopSubscribe(cr *core.Core, channelName string, redisConf *core.RedisConfi cr.CandlesProcessChan <- &cd break } + + // 接收到的maX扔到 maX 二次加工流水线 case "maX": { mx := core.MaX{} @@ -118,6 +121,8 @@ func LoopSubscribe(cr *core.Core, channelName string, redisConf *core.RedisConfi cr.MaXProcessChan <- &mx break } + + // 接收到的tinckerInfo扔到 tickerInfo 二次加工流水线 case "tickerInfo": { //tickerInfo: map[askPx:2.2164 askSz:17.109531 bidPx:2.2136 bidSz:73 high24h:2.497 instId:STX-USDT instType:SPOT last:2.2136 lastSz:0 low24h:2.0508 open24h:2.42 sodUtc0:2.4266 sodUtc8:2.4224 ts:1637077323552 vol24h:5355479.488179 :12247398.975501] @@ -317,6 +322,7 @@ func MakeMaX(cr *core.Core, cl *core.Candle, count int) (error, int) { Ts: tsi, AvgVal: avgLast, } + // MaX的Data里包含三个有效信息:时间戳,平均值,计算平均值所采用的数列长度 dt := []interface{}{} dt = append(dt, mx.Ts) dt = append(dt, mx.AvgVal) diff --git a/modules/maX.go b/modules/maX.go index 522eb25..8540e4c 100644 --- a/modules/maX.go +++ b/modules/maX.go @@ -28,6 +28,9 @@ func (mmx *MyMaX) Process(cr *core.Core) { fmt.Println("max SetToKey err: ", err) return } + go func() { + // mx.PushToWriteLogChan(cr) + }() // TODO go func() { torqueSorted := os.Getenv("SIAGA_MAKESERIES") == "true" diff --git a/siaga b/siaga index 141806f..ba8e827 100755 Binary files a/siaga and b/siaga differ diff --git a/siaga.env b/siaga.env index 800dee5..1765166 100644 --- a/siaga.env +++ b/siaga.env @@ -16,5 +16,5 @@ SIAGA_TICKERTOCANDLE=true TEXUS_FluentBitUrl=fluentd.k8s.xunlang.home SIAGA_UPSTREAM_REDIS_LIST=SILVER -SIAGA_UPSTREAM_REDIS_SILVER_URL=10.66.66.20:6379 +SIAGA_UPSTREAM_REDIS_SILVER_URL=fluentd.k8s.xunlang.home:6379 SIAGA_UPSTREAM_REDIS_SILVER_INDEX=4