siaga/modules/extent.go
2024-12-14 17:37:19 +08:00

112 lines
3.0 KiB
Go

package module
import (
"encoding/json"
"errors"
"fmt"
"github.com/phyer/core"
"os"
"strconv"
"strings"
"sync"
"time"
simple "github.com/bitly/go-simplejson"
"github.com/go-redis/redis"
// "github.com/phyer/core/utils"
logrus "github.com/sirupsen/logrus"
)
func LoopSubscribe(cr *core.Core, channelName string, redisConf *core.RedisConfig) {
redisRemoteCli := cr.RedisRemoteCli
suffix := ""
env := os.Getenv("GO_ENV")
if strings.Contains(env, "demoEnv") {
suffix = "-demoEnv"
}
fmt.Println("loopSubscribe: ", channelName+suffix)
pubsub := redisRemoteCli.Subscribe(channelName + suffix)
_, err := pubsub.Receive()
if err != nil {
// cr.ErrorToRobot(utils.GetFuncName(), err)
fmt.Println(GetFuncName(), " ", err)
panic(err)
}
// 用管道来接收消息
ch := pubsub.Channel()
// 处理消息
for msg := range ch {
if msg.Payload == "" {
continue
}
ctype := ""
if strings.Contains(channelName, "allCandle") {
ctype = "candle"
} else if strings.Contains(channelName, "allMaXs") {
ctype = "maX"
} else if strings.Contains(channelName, "ticker") {
ctype = "tickerInfo"
} else if strings.Contains(channelName, "ccyPositions") {
ctype = "ccyPositions"
} else if strings.Contains(channelName, "allseriesinfo") {
ctype = "seriesinfo"
} else if strings.Contains(channelName, "private|order") {
ctype = "private|order"
} else {
logrus.Warning("channelname not match", channelName)
}
// logrus.Warning("msg.Payload: ", msg.Payload)
// fmt.Println("channelName: ", channelName, " msg.Payload: ", msg.Payload)
switch ctype {
case "candle":
{
cd := core.Candle{}
json.Unmarshal([]byte(msg.Payload), &cd)
founded := cd.Filter(cr)
if !founded {
break
}
cr.CandlesProcessChan <- &cd
break
}
case "maX":
{
mx := MaX{}
if msg.Payload == "" {
continue
}
json.Unmarshal([]byte(msg.Payload), &mx)
dt := []interface{}{}
dt = append(dt, mx.Ts)
dt = append(dt, mx.AvgVal)
mx.Data = dt
cr.MaXProcessChan <- &mx
break
}
case "tickerInfo":
{
//tickerInfo: map[askPx:2.2164 askSz:17.109531 bidPx:2.2136 bidSz:73 high24h:2.497 instId:STX-USDT instType:SPOT last:2.2136 lastSz:0 low24h:2.0508 open24h:2.42 sodUtc0:2.4266 sodUtc8:2.4224 ts:1637077323552 vol24h:5355479.488179 :12247398.975501]
ti := TickerInfo{}
err := json.Unmarshal([]byte(msg.Payload), &ti)
if err != nil {
logrus.Warning("tickerInfo payload unmarshal err: ", err, msg.Payload)
}
cr.TickerInforocessChan <- &ti
break
}
case "seriesInfo":
{
//tickerInfo: map[askPx:2.2164 askSz:17.109531 bidPx:2.2136 bidSz:73 high24h:2.497 instId:STX-USDT instType:SPOT last:2.2136 lastSz:0 low24h:2.0508 open24h:2.42 sodUtc0:2.4266 sodUtc8:2.4224 ts:1637077323552 vol24h:5355479.488179 :12247398.975501]
sei := SeriesInfo{}
err := json.Unmarshal([]byte(msg.Payload), &sei)
if err != nil {
logrus.Warning("seriesInfo payload unmarshal err: ", err, msg.Payload)
}
cr.SeriesChan <- &sei
break
}
}
}
}