up
This commit is contained in:
parent
bf3c65e488
commit
f259bcd4ad
BIN
.README.md.swp
BIN
.README.md.swp
Binary file not shown.
@ -28,7 +28,7 @@
|
||||
"maxTickers":3,
|
||||
"restPeriod": 180,
|
||||
"waitWs": 120
|
||||
},
|
||||
}
|
||||
},
|
||||
"stage": {
|
||||
"redis": {
|
||||
|
@ -5,7 +5,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"os"
|
||||
// "os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
@ -157,8 +157,6 @@ func (core *Core) SaveCandle(instId string, period string, rsp *rest.RESTAPIResu
|
||||
}
|
||||
itemList := js.Get("data").MustArray()
|
||||
Daoxu(itemList)
|
||||
|
||||
leng := len(itemList)
|
||||
for _, v := range itemList {
|
||||
candle := Candle{
|
||||
InstId: instId,
|
||||
@ -166,18 +164,7 @@ func (core *Core) SaveCandle(instId string, period string, rsp *rest.RESTAPIResu
|
||||
Data: v.([]interface{}),
|
||||
From: "rest",
|
||||
}
|
||||
//保存rest得到的candle
|
||||
saveCandle := os.Getenv("TUNAS_SAVECANDLE")
|
||||
if saveCandle == "true" {
|
||||
candle.SetToKey(core)
|
||||
}
|
||||
// 发布到allCandles|publish, 给外部订阅者用于setToKey,和给其他协程用于订阅ws
|
||||
arys := []string{ALLCANDLES_PUBLISH}
|
||||
if withWs {
|
||||
arys = append(arys, ALLCANDLES_INNER_PUBLISH)
|
||||
}
|
||||
core.AddToGeneralCandleChnl(&candle, arys)
|
||||
time.Sleep(dura / time.Duration(leng))
|
||||
candle.SetToKey(core)
|
||||
}
|
||||
}
|
||||
|
||||
@ -220,7 +207,7 @@ func (cl *Candle) SetToKey(core *Core) ([]interface{}, error) {
|
||||
// tm := time.UnixMilli(tsi).Format("2006-01-02 15:04")
|
||||
// fmt.Println("setToKey:", keyName, "ts: ", tm, "price: ", curPrice, "from:", cl.From)
|
||||
redisCli.Set(keyName, dt, extt).Result()
|
||||
core.SaveUniKey(cl.Period, keyName, extt, tsi)
|
||||
core.SaveUniKey(cl.Period, keyName, extt, tsi, cl)
|
||||
return cl.Data, err
|
||||
}
|
||||
|
||||
@ -244,13 +231,23 @@ func (mx *MaX) SetToKey() ([]interface{}, error) {
|
||||
return dt, err
|
||||
}
|
||||
|
||||
func (core *Core) SaveUniKey(period string, keyName string, extt time.Duration, tsi int64) {
|
||||
// 保证同一个 period, keyName ,在一个周期里,SaveToSortSet只会被执行一次
|
||||
func (core *Core) SaveUniKey(period string, keyName string, extt time.Duration, tsi int64, cl *Candle) {
|
||||
refName := keyName + "|refer"
|
||||
refRes, _ := core.RedisCli.GetSet(refName, 1).Result()
|
||||
core.RedisCli.Expire(refName, extt)
|
||||
if len(refRes) == 0 {
|
||||
core.SaveToSortSet(period, keyName, extt, tsi)
|
||||
if len(refRes) != 0 {
|
||||
return
|
||||
}
|
||||
cd, _ := json.Marshal(cl)
|
||||
wg := WriteLog{
|
||||
Content: cd,
|
||||
Tag: "sardine.log.candle." + cl.Period,
|
||||
}
|
||||
go func() {
|
||||
core.WriteLogChan <- &wg
|
||||
}()
|
||||
core.SaveToSortSet(period, keyName, extt, tsi)
|
||||
}
|
||||
|
||||
// tsi: 上报时间timeStamp millinSecond
|
||||
@ -385,66 +382,3 @@ func (core *Core) GetRangeKeyList(pattern string, from time.Time) ([]*simple.Jso
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (core *Core) InitStreamGroups() {
|
||||
redisCli := core.RedisCli
|
||||
streamNames := []string{
|
||||
"candles|stream",
|
||||
"maXs|stream",
|
||||
}
|
||||
grpNames := []string{
|
||||
"sardine1",
|
||||
"sardine2",
|
||||
"sardine3",
|
||||
}
|
||||
for _, v := range streamNames {
|
||||
for _, vv := range grpNames {
|
||||
redisCli.XGroupCreate(v, vv, "0-0").Result()
|
||||
}
|
||||
}
|
||||
}
|
||||
func (cr *Core) AddToGeneralCandleChnl(candle *Candle, channels []string) {
|
||||
//只让特定概率的事件发生
|
||||
// r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
// val := r.Float64()
|
||||
// rand.Seed(time.Now().UnixNano())
|
||||
// if from == "ws" && val > 0.90 {
|
||||
// fmt.Println("drop ", from)
|
||||
// return
|
||||
// }
|
||||
// fmt.Println("not drop ", from)
|
||||
// TODO map
|
||||
redisCli := cr.RedisCli
|
||||
|
||||
ab, _ := json.Marshal(candle)
|
||||
//特定的币只发送给特定的下游接收者
|
||||
// 这么设计不太好,业务逻辑和程序架构紧耦合了,
|
||||
prename := cr.DispatchDownstreamNodes(candle.InstId)
|
||||
|
||||
for _, v := range channels {
|
||||
suffix := ""
|
||||
env := os.Getenv("GO_ENV")
|
||||
if env == "demoEnv" {
|
||||
suffix = "-demoEnv"
|
||||
}
|
||||
vd := v + suffix
|
||||
|
||||
// TODO FIXME cli2
|
||||
_, err := redisCli.Publish(vd, string(ab)).Result()
|
||||
if len(prename) == 0 {
|
||||
continue
|
||||
}
|
||||
nodeAdd := prename + "|" + v + suffix
|
||||
// fmt.Println("nodeAdd: ", nodeAdd)
|
||||
|
||||
// TODO FIXME cli2
|
||||
_, err = redisCli.Publish(nodeAdd, string(ab)).Result()
|
||||
// fmt.Println("publish, channel,res,err:", nodeAdd, res, err, "candle:", string(ab))
|
||||
|
||||
if err != nil {
|
||||
fmt.Println("err of ma7|ma30 add to redis2:", err, candle.From)
|
||||
}
|
||||
}
|
||||
// TODO 下面这个先屏蔽,不订阅ws信息,否则系统压力会太大,等有更灵活的机制再说
|
||||
// redisCli.Publish("allCandlesInner|publish"+suffix, string(ab)).Result()
|
||||
}
|
||||
|
272
core/core.go
272
core/core.go
@ -3,9 +3,9 @@ package core
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
// "errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
// "math/rand"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
@ -13,7 +13,7 @@ import (
|
||||
"time"
|
||||
"v5sdk_go/rest"
|
||||
"v5sdk_go/ws"
|
||||
"v5sdk_go/ws/wImpl"
|
||||
// "v5sdk_go/ws/wImpl"
|
||||
|
||||
simple "github.com/bitly/go-simplejson"
|
||||
"github.com/go-redis/redis"
|
||||
@ -26,9 +26,11 @@ type Core struct {
|
||||
Cfg *MyConfig
|
||||
RedisCli *redis.Client
|
||||
RedisCli2 *redis.Client
|
||||
FluentBitUrl string
|
||||
Wg sync.WaitGroup
|
||||
RestQueueChan chan *RestQueue
|
||||
OrderChan chan *private.Order
|
||||
WriteLogChan chan *WriteLog
|
||||
}
|
||||
type RestQueue struct {
|
||||
InstId string
|
||||
@ -70,6 +72,15 @@ func (rst *RestQueue) Save(cr *Core) {
|
||||
cr.SaveCandle(rst.InstId, rst.Bar, rsp, rst.Duration, rst.WithWs)
|
||||
}
|
||||
|
||||
func WriteLogProcess(cr *Core) {
|
||||
for {
|
||||
wg := <-cr.WriteLogChan
|
||||
go func(wg *WriteLog) {
|
||||
wg.Process(cr)
|
||||
}(wg)
|
||||
}
|
||||
}
|
||||
|
||||
func (cr *Core) ShowSysTime() {
|
||||
rsp, _ := cr.RestInvoke("/api/v5/public/time", rest.GET)
|
||||
fmt.Println("serverSystem time:", rsp)
|
||||
@ -95,107 +106,6 @@ func (core *Core) Init() {
|
||||
}
|
||||
}
|
||||
|
||||
func (core *Core) GetWsCli() (*ws.WsClient, error) {
|
||||
url, err := core.Cfg.Config.Get("connect").Get("wsPublicBaseUrl").String()
|
||||
if err != nil {
|
||||
fmt.Println("err of json decode: ", err)
|
||||
}
|
||||
pubCli, err := ws.NewWsClient("wss://" + url)
|
||||
pubCli.AddBookMsgHook(core.PubMsgDispatcher)
|
||||
|
||||
if err != nil {
|
||||
fmt.Println("err of create ublic ws cli:", err)
|
||||
}
|
||||
return pubCli, err
|
||||
}
|
||||
|
||||
func (core *Core) DispatchDownstreamNodes(originName string) string {
|
||||
nodesStr := os.Getenv("TUNAS_DOWNSTREAM_NODES")
|
||||
if len(nodesStr) == 0 {
|
||||
return ""
|
||||
}
|
||||
nodes := strings.Split(nodesStr, "|")
|
||||
count := len(nodes)
|
||||
idx := utils.HashDispatch(originName, uint8(count))
|
||||
return nodes[idx]
|
||||
}
|
||||
|
||||
func (core *Core) Dispatch(channel string, ctype string, instId string, data interface{}) error {
|
||||
// fmt.Println("start to SaveToRedis:", channel, ctype, instId, data)
|
||||
b, err := json.Marshal(data)
|
||||
js, err := simple.NewJson(b)
|
||||
if err != nil {
|
||||
fmt.Println("err of unMarshalJson1:", js)
|
||||
}
|
||||
|
||||
isUsdt := strings.Contains(instId, "-USDT")
|
||||
instType, err := js.Get("instType").String()
|
||||
if !isUsdt {
|
||||
return err
|
||||
}
|
||||
|
||||
// fmt.Println("instId: ", instId)
|
||||
redisCli := core.RedisCli
|
||||
channelType := ""
|
||||
if channel == "instruments" {
|
||||
channelType = "instruments"
|
||||
} else if strings.Contains(channel, "candle") {
|
||||
channelType = "candle"
|
||||
}
|
||||
switch channelType {
|
||||
case "instruments":
|
||||
{
|
||||
// fmt.Println("isInstrument:", instId)
|
||||
if instType != "SPOT" {
|
||||
return errors.New("instType is not SPOT")
|
||||
}
|
||||
_, err = redisCli.HSet("instruments|"+ctype+"|hash", instId, b).Result()
|
||||
if err != nil {
|
||||
fmt.Println("err of hset to redis:", err)
|
||||
}
|
||||
break
|
||||
}
|
||||
case "candle":
|
||||
{
|
||||
data := data.([]interface{})
|
||||
ary := strings.Split(channel, "candle")
|
||||
// fmt.Println("dispatch candle:", ary[1], instId)
|
||||
candle := Candle{
|
||||
InstId: instId,
|
||||
Period: ary[1],
|
||||
Data: data,
|
||||
From: "ws",
|
||||
}
|
||||
core.WsSubscribe(&candle)
|
||||
saveCandle := os.Getenv("TUNAS_SAVECANDLE")
|
||||
if saveCandle == "true" {
|
||||
candle.SetToKey(core)
|
||||
}
|
||||
// TODO mxLen要放到core.Cfg里
|
||||
arys := []string{ALLCANDLES_PUBLISH}
|
||||
core.AddToGeneralCandleChnl(&candle, arys)
|
||||
break
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
// data := data.([]interface{})
|
||||
// bj, _ := json.Marshal(data)
|
||||
// fmt.Println("private data:", string(bj))
|
||||
return errors.New("channel type not catched")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (core *Core) PubMsgDispatcher(ts time.Time, data wImpl.MsgData) error {
|
||||
instList := data.Data
|
||||
for _, v := range instList {
|
||||
core.Dispatch(data.Arg["channel"], data.Arg["instType"], data.Arg["instId"], v)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (core *Core) GetRedisCli() (*redis.Client, error) {
|
||||
ru := core.Cfg.RedisConf.Url
|
||||
rp := core.Cfg.RedisConf.Password
|
||||
@ -292,78 +202,6 @@ func (core *Core) SubscribeTicker(op string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (core *Core) InnerSubscribeTicker(name string, op string, retry bool) error {
|
||||
// 在这里 args1 初始化tickerList的列表
|
||||
var args []map[string]string
|
||||
arg := make(map[string]string)
|
||||
arg["instId"] = name
|
||||
arg["instType"] = ws.SPOT
|
||||
args = append(args, arg)
|
||||
|
||||
if retry {
|
||||
go func(op string, args []map[string]string) {
|
||||
core.retrySubscribe(op, args)
|
||||
}(op, args)
|
||||
} else {
|
||||
go func(op string, args []map[string]string) {
|
||||
core.OnceSubscribe(op, args)
|
||||
}(op, args)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (core *Core) OnceSubscribe(op string, args []map[string]string) error {
|
||||
wsCli, _ := core.GetWsCli()
|
||||
res, _, err := wsCli.PubTickers(op, args)
|
||||
// defer wsCli.Stop()
|
||||
start := time.Now()
|
||||
if err != nil {
|
||||
fmt.Println("pubTickers err:", err)
|
||||
}
|
||||
if res {
|
||||
usedTime := time.Since(start)
|
||||
fmt.Println("订阅成功!", usedTime.String())
|
||||
} else {
|
||||
fmt.Println("订阅失败!", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (core *Core) retrySubscribe(op string, args []map[string]string) error {
|
||||
wsCli, _ := core.GetWsCli()
|
||||
res, _, err := wsCli.PubTickers(op, args)
|
||||
start := time.Now()
|
||||
if err != nil {
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
val := r.Int() / 20000000000000000
|
||||
fmt.Println("pubTickers err:", err, val, "秒后重试")
|
||||
time.Sleep(time.Duration(val) * time.Second)
|
||||
|
||||
return core.retrySubscribe(op, args)
|
||||
}
|
||||
if res {
|
||||
usedTime := time.Since(start)
|
||||
fmt.Println("订阅成功!", usedTime.String())
|
||||
} else {
|
||||
fmt.Println("订阅失败!", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (core *Core) TickerScore(score float64, name string) error {
|
||||
|
||||
redisCli := core.RedisCli
|
||||
mb := redis.Z{
|
||||
Score: score,
|
||||
Member: name,
|
||||
}
|
||||
_, err := redisCli.ZAdd("tradingRank", mb).Result()
|
||||
if err != nil {
|
||||
fmt.Println("zadd err:", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (core *Core) RestInvoke(subUrl string, method string) (*rest.RESTAPIResult, error) {
|
||||
restUrl, _ := core.Cfg.Config.Get("connect").Get("restBaseUrl").String()
|
||||
//ep, method, uri string, param *map[string]interface{}
|
||||
@ -473,92 +311,14 @@ func (core *Core) GetScoreList(count int) []string {
|
||||
|
||||
redisCli := core.RedisCli
|
||||
|
||||
curList, err := redisCli.ZRevRange("tickersList|sortedSet", 0, int64(count-1)).Result()
|
||||
curList, err := redisCli.ZRange("tickersList|sortedSet", 0, int64(count-1)).Result()
|
||||
if err != nil {
|
||||
fmt.Println("zrevrange err:", err)
|
||||
}
|
||||
fmt.Println("curList: ", curList)
|
||||
return curList
|
||||
}
|
||||
|
||||
func (core *Core) SubscribeCandleWithDimention(op string, instIdList []string, dimension string) {
|
||||
wsCli, err := core.GetWsCli()
|
||||
if err != nil {
|
||||
fmt.Println("ws client err", err)
|
||||
}
|
||||
err = wsCli.Start()
|
||||
// 创建ws客户端
|
||||
if err != nil {
|
||||
fmt.Println("ws client start err", err)
|
||||
}
|
||||
|
||||
var args []map[string]string
|
||||
for _, vs := range instIdList {
|
||||
arg := make(map[string]string)
|
||||
arg["instId"] = vs
|
||||
args = append(args, arg)
|
||||
}
|
||||
_, _, err = wsCli.PubKLine(op, wImpl.Period(dimension), args)
|
||||
if err != nil {
|
||||
fmt.Println("pubTickers err:", err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// 订阅某币,先要和配置比对,是否允许订阅此币此周期,
|
||||
func (core *Core) WsSubscribe(candle *Candle) error {
|
||||
wsPeriods := []string{}
|
||||
wsary := core.Cfg.Config.Get("wsDimentions").MustArray()
|
||||
for _, v := range wsary {
|
||||
wsPeriods = append(wsPeriods, v.(string))
|
||||
}
|
||||
redisCli := core.RedisCli
|
||||
period := candle.Period
|
||||
instId := candle.InstId
|
||||
from := candle.From
|
||||
sname := instId + "|" + period + "ts|Subscribed|key"
|
||||
|
||||
exists, _ := redisCli.Exists(sname).Result()
|
||||
ttl, _ := redisCli.TTL(sname).Result()
|
||||
inAry, _ := utils.In_Array(period, wsPeriods)
|
||||
if !inAry {
|
||||
estr := "subscribe 在配置中此period未被订阅: " + "," + period
|
||||
// fmt.Println(estr)
|
||||
err := errors.New(estr)
|
||||
return err
|
||||
} else {
|
||||
fmt.Println("subscribe 已经订阅: ", period)
|
||||
}
|
||||
waitWs, _ := core.Cfg.Config.Get("threads").Get("waitWs").Int64()
|
||||
willSub := false
|
||||
if exists > 0 {
|
||||
if ttl > 0 {
|
||||
if from == "ws" {
|
||||
redisCli.Expire(sname, time.Duration(waitWs)*time.Second).Result()
|
||||
}
|
||||
} else {
|
||||
willSub = true
|
||||
}
|
||||
} else {
|
||||
willSub = true
|
||||
}
|
||||
|
||||
if willSub {
|
||||
// 需要订阅
|
||||
|
||||
instIdList := []string{}
|
||||
instIdList = append(instIdList, instId)
|
||||
|
||||
core.SubscribeCandleWithDimention(ws.OP_SUBSCRIBE, instIdList, period)
|
||||
// 如果距离上次检查此candle此维度订阅状态已经过去超过2分钟还没有发现有ws消息上报,执行订阅
|
||||
dr := 1 * time.Duration(waitWs) * time.Second
|
||||
redisCli.Set(sname, 1, dr).Result()
|
||||
} else {
|
||||
// fmt.Println("拒绝订阅candles:", keyName, "tm: ", tm, "otsi:", otsi)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func LoopBalances(cr *Core, mdura time.Duration) {
|
||||
//协程:动态维护topScore
|
||||
ticker := time.NewTicker(mdura)
|
||||
|
31
core/writeLog.go
Normal file
31
core/writeLog.go
Normal file
@ -0,0 +1,31 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
logrus "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type WriteLog struct {
|
||||
Content []byte
|
||||
Tag string
|
||||
}
|
||||
|
||||
func (wg *WriteLog) Process(cr *Core) error {
|
||||
go func() {
|
||||
reqBody := bytes.NewBuffer(wg.Content)
|
||||
cr.Env = os.Getenv("GO_ENV")
|
||||
cr.FluentBitUrl = os.Getenv("TEXUS_FluentBitUrl")
|
||||
fullUrl := "http://" + cr.FluentBitUrl + "/" + wg.Tag
|
||||
res, err := http.Post(fullUrl, "application/json", reqBody)
|
||||
|
||||
fmt.Println("requested, response:", fullUrl, string(wg.Content), res)
|
||||
if err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
21
go.mod
21
go.mod
@ -7,17 +7,24 @@ replace (
|
||||
v5sdk_go/ws => ./submodules/okex/ws
|
||||
)
|
||||
|
||||
go 1.14
|
||||
go 1.21
|
||||
|
||||
require (
|
||||
github.com/bitly/go-simplejson v0.5.0
|
||||
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
|
||||
github.com/go-redis/redis v6.15.9+incompatible
|
||||
github.com/gorilla/websocket v1.5.1 // indirect
|
||||
github.com/kr/pretty v0.3.0 // indirect
|
||||
github.com/onsi/gomega v1.16.0 // indirect
|
||||
v5sdk_go/config v0.0.0-00010101000000-000000000000 // indirect
|
||||
github.com/sirupsen/logrus v1.9.3
|
||||
v5sdk_go/rest v0.0.0-00010101000000-000000000000
|
||||
v5sdk_go/utils v0.0.0-00010101000000-000000000000 // indirect
|
||||
v5sdk_go/ws v0.0.0-00010101000000-000000000000
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
|
||||
github.com/gorilla/websocket v1.5.1 // indirect
|
||||
github.com/kr/pretty v0.3.0 // indirect
|
||||
github.com/onsi/ginkgo v1.16.4 // indirect
|
||||
github.com/onsi/gomega v1.16.0 // indirect
|
||||
golang.org/x/net v0.17.0 // indirect
|
||||
golang.org/x/sys v0.13.0 // indirect
|
||||
v5sdk_go/config v0.0.0-00010101000000-000000000000 // indirect
|
||||
v5sdk_go/utils v0.0.0-00010101000000-000000000000 // indirect
|
||||
)
|
||||
|
38
go.sum
38
go.sum
@ -20,12 +20,10 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W
|
||||
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
|
||||
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
|
||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
|
||||
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
|
||||
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
|
||||
@ -52,36 +50,28 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
|
||||
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
|
||||
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
|
||||
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
|
||||
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
|
||||
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
|
||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
|
||||
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
|
||||
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
|
||||
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
|
||||
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
|
||||
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
|
||||
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
|
||||
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
@ -93,35 +83,21 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
|
||||
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
|
||||
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
|
||||
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
|
||||
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
|
||||
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
|
||||
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
|
||||
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||
@ -130,10 +106,8 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE
|
||||
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
|
||||
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
|
||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
|
||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
|
||||
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
|
||||
@ -144,3 +118,5 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
||||
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
455
main.go
455
main.go
@ -3,24 +3,22 @@ package main
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
"v5sdk_go/rest"
|
||||
"v5sdk_go/ws"
|
||||
"v5sdk_go/ws/wImpl"
|
||||
// "v5sdk_go/ws"
|
||||
// "v5sdk_go/ws/wImpl"
|
||||
|
||||
simple "github.com/bitly/go-simplejson"
|
||||
"github.com/go-redis/redis"
|
||||
// "github.com/go-redis/redis"
|
||||
"phyer.click/tunas/core"
|
||||
"phyer.click/tunas/private"
|
||||
// "phyer.click/tunas/private"
|
||||
"phyer.click/tunas/utils"
|
||||
)
|
||||
|
||||
func init() {
|
||||
// //fmt.Println("inited")
|
||||
}
|
||||
|
||||
// 通过rest接口,获取所有ticker信息,存入redis的stream和成交量排行榜
|
||||
@ -43,12 +41,7 @@ func RestTicker(cr *core.Core, dura time.Duration) {
|
||||
return
|
||||
}
|
||||
itemList = js.Get("data").MustArray()
|
||||
// maxTickers是重点关注的topScore的coins的数量
|
||||
length, _ := cr.Cfg.Config.Get("threads").Get("maxTickers").Int()
|
||||
fmt.Println("itemList length:", len(itemList))
|
||||
if len(itemList) < length {
|
||||
return
|
||||
}
|
||||
// 关注多少个币,在这里设置, 只需要5个币
|
||||
allTicker := cr.GetScoreList(5)
|
||||
redisCli := cr.RedisCli
|
||||
@ -73,14 +66,6 @@ func RestTicker(cr *core.Core, dura time.Duration) {
|
||||
if ti.InstType != "SPOT" {
|
||||
continue
|
||||
}
|
||||
// 把单个ticker信息小时交易量存到交易量排行榜
|
||||
// fmt.Println("ticker item: ", item)
|
||||
// 不需要排行榜了
|
||||
// _, err = redisCli.ZAdd("tickersVol|sortedSet", redis.Z{ti.VolCcy24h, ti.InstId}).Result()
|
||||
// if err != nil {
|
||||
// fmt.Println("restTicker redis err: ", err)
|
||||
// }
|
||||
|
||||
ab, _ := json.Marshal(ti)
|
||||
suffix := ""
|
||||
env := os.Getenv("GO_ENV")
|
||||
@ -95,76 +80,6 @@ func RestTicker(cr *core.Core, dura time.Duration) {
|
||||
}
|
||||
}
|
||||
|
||||
// 私有订阅: 订阅订单频道
|
||||
func wsPriv(core *core.Core) error {
|
||||
cfg := core.Cfg
|
||||
url, _ := cfg.Config.Get("connect").Get("wsPrivateBaseUrl").String()
|
||||
priCli, err := ws.NewWsClient("wss://" + url)
|
||||
err = priCli.Start()
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return err
|
||||
}
|
||||
priCli.SetDailTimeout(time.Second * 10)
|
||||
defer func() {
|
||||
priCli.Stop()
|
||||
}()
|
||||
var res bool
|
||||
key, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String()
|
||||
secure, _ := core.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String()
|
||||
pass, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String()
|
||||
// TODO 这里订阅收听订单频道结果,但是ws请求不可靠,这个所谓冗余机制存在,有比没有强,restPost函数 是保底方案
|
||||
priCli.AddBookMsgHook(func(ts time.Time, data wImpl.MsgData) error {
|
||||
// 添加你的方法
|
||||
fmt.Println("这是自定义AddBookMsgHook")
|
||||
bj, _ := json.Marshal(data)
|
||||
fmt.Println("当前数据是", string(bj))
|
||||
resp := private.OrderResp{}
|
||||
// TODO FIXME 这里默认所有的数据都是OrderResp类型,但是ws请求有其他类型的,这个地方将来肯定得改
|
||||
err := json.Unmarshal(bj, &resp)
|
||||
if err != nil {
|
||||
fmt.Println("Canvert MsgData to OrderResp err:", err)
|
||||
return nil
|
||||
}
|
||||
fmt.Println("order resp: ", resp)
|
||||
list, _ := resp.Convert()
|
||||
fmt.Println("order list: ", list)
|
||||
go func() {
|
||||
for _, v := range list {
|
||||
fmt.Println("order orderV:", v)
|
||||
core.OrderChan <- v
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
})
|
||||
|
||||
fmt.Println("key, secure, pass:", key, secure, pass)
|
||||
res, _, err = priCli.Login(key, secure, pass)
|
||||
|
||||
if res {
|
||||
fmt.Println("私有订阅登录成功!")
|
||||
} else {
|
||||
fmt.Println("私有订阅登录失败!", err)
|
||||
return err
|
||||
}
|
||||
|
||||
var args []map[string]string
|
||||
arg := make(map[string]string)
|
||||
arg["instType"] = ws.ANY
|
||||
args = append(args, arg)
|
||||
// 订阅订单频道动作执行
|
||||
res, msg, err := priCli.PrivBookOrder("subscribe", args)
|
||||
bj, _ := json.Marshal(msg)
|
||||
fmt.Println("PrivBookOrder res:", res, " msg:", string(bj), " err:", err)
|
||||
for {
|
||||
fmt.Println(utils.GetFuncName(), url+" is in subscribing")
|
||||
time.Sleep(10 * time.Minute)
|
||||
priCli.Stop()
|
||||
wsPriv(core)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// 统一受理发起rest请求的请求
|
||||
func LoopSaveCandle(cr *core.Core) {
|
||||
for {
|
||||
@ -175,7 +90,7 @@ func LoopSaveCandle(cr *core.Core) {
|
||||
}
|
||||
restQ := core.RestQueue{}
|
||||
json.Unmarshal([]byte(ary[1]), &restQ)
|
||||
// fmt.Println("before: ", restQ.InstId)
|
||||
fmt.Println("before: ", restQ.InstId)
|
||||
// before: USDT|position|key
|
||||
ary1 := strings.Split(restQ.InstId, "|")
|
||||
if ary1[0] == "USDT" {
|
||||
@ -185,7 +100,7 @@ func LoopSaveCandle(cr *core.Core) {
|
||||
if len(ary1) > 1 && ary1[1] == "position" {
|
||||
restQ.InstId = ary1[0] + "-USDT"
|
||||
}
|
||||
// fmt.Println("after: ", restQ.InstId)
|
||||
fmt.Println("after: ", restQ.InstId)
|
||||
// after: restQueue-USDT
|
||||
go func() {
|
||||
restQ.Show(cr)
|
||||
@ -194,107 +109,6 @@ func LoopSaveCandle(cr *core.Core) {
|
||||
}
|
||||
}
|
||||
|
||||
// 这个函数被废弃了, 根据现有逻辑,redisCli.Scan(cursor, "*"+pattern+"*", 2000) 得不到任何内容
|
||||
func LoopRestQ(cr *core.Core) {
|
||||
redisCli := cr.RedisCli
|
||||
cursor := uint64(0)
|
||||
n := 0
|
||||
// allTs := []int64{}
|
||||
rstr := []string{}
|
||||
pattern := "restQueue"
|
||||
fmt.Println("LoopRestQ start")
|
||||
for {
|
||||
var err error
|
||||
rstr, cursor, err = redisCli.Scan(cursor, "*"+pattern+"*", 2000).Result()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
n += len(rstr)
|
||||
if n == 0 {
|
||||
break
|
||||
}
|
||||
if n > 200000 {
|
||||
break
|
||||
}
|
||||
if n > 0 {
|
||||
fmt.Println("LoopRestQ rstr: ", rstr)
|
||||
}
|
||||
}
|
||||
// fmt.Println("LoopRestQ rstr: ", rstr)
|
||||
fmt.Println("LoopRestQ rstr len: ", len(rstr))
|
||||
for _, keyN := range rstr {
|
||||
val, err := redisCli.Get(keyN).Result()
|
||||
fmt.Println("LoopRestQ val:", val)
|
||||
restQ := core.RestQueue{}
|
||||
if err != nil || len(val) == 0 {
|
||||
continue
|
||||
}
|
||||
err = json.Unmarshal([]byte(val), &restQ)
|
||||
if err != nil {
|
||||
fmt.Println("RestQueue Unmarshal err: ", err)
|
||||
}
|
||||
// res, err := redisCli.LPush("restQueue", val).Result()
|
||||
fmt.Println("restQueue will LPush: ", val)
|
||||
if err != nil {
|
||||
redisCli.Del(keyN)
|
||||
}
|
||||
}
|
||||
time.Sleep(10 * time.Second)
|
||||
LoopRestQ(cr)
|
||||
}
|
||||
|
||||
func LoopSubscribe(cr *core.Core) {
|
||||
redisCli := cr.RedisCli
|
||||
suffix := ""
|
||||
if cr.Env == "demoEnv" {
|
||||
suffix = "-demoEnv"
|
||||
}
|
||||
pubsub := redisCli.Subscribe(core.ALLCANDLES_INNER_PUBLISH + suffix)
|
||||
_, err := pubsub.Receive()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
// 用管道来接收消息
|
||||
ch := pubsub.Channel()
|
||||
// 处理消息
|
||||
|
||||
for msg := range ch {
|
||||
cd := core.Candle{}
|
||||
json.Unmarshal([]byte(msg.Payload), &cd)
|
||||
fmt.Println("msg.PayLoad:", msg.Payload, "candle:", cd)
|
||||
go func() {
|
||||
cr.WsSubscribe(&cd)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// 订阅并执行sardine端传来的订单相关的动作
|
||||
func LoopSubscribeSubAction(cr *core.Core) {
|
||||
redisCli := cr.RedisCli
|
||||
suffix := ""
|
||||
if cr.Env == "demoEnv" {
|
||||
suffix = "-demoEnv"
|
||||
}
|
||||
// TODO FIXME cli2
|
||||
prisub := redisCli.Subscribe(core.SUBACTIONS_PUBLISH + suffix)
|
||||
_, err := prisub.Receive()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
// 用管道来接收消息
|
||||
ch := prisub.Channel()
|
||||
// 处理消息
|
||||
|
||||
for msg := range ch {
|
||||
action := core.SubAction{}
|
||||
json.Unmarshal([]byte(msg.Payload), &action)
|
||||
fmt.Println("actionMsg.PayLoad:", msg.Payload, "action:", action)
|
||||
go func() {
|
||||
cr.DispatchSubAction(&action)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// period: 每个循环开始的时间点,单位:秒
|
||||
// delay:延时多少秒后去取此值, 单位:秒
|
||||
// mdura:多少个分钟之内,遍历完获取到的goins列表, 单位:秒
|
||||
@ -305,13 +119,13 @@ func LoopAllCoinsList(period int64, delay int64, mdura int, barPeriod string, on
|
||||
cr := core.Core{}
|
||||
cr.Init()
|
||||
allScoreChan := make(chan []string)
|
||||
// fmt.Println("allCoins1")
|
||||
fmt.Println("allCoins1")
|
||||
per1 := 1 * time.Minute
|
||||
ticker := time.NewTicker(per1)
|
||||
go func() {
|
||||
for {
|
||||
tsi := time.Now().Unix()
|
||||
// fmt.Println("tsi, period, delay, tsi%(period): ", tsi, period, delay, tsi%(period))
|
||||
fmt.Println("tsi, period, delay, tsi%(period): ", tsi, period, delay, tsi%(period))
|
||||
if tsi%(period) != delay {
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
@ -321,7 +135,7 @@ func LoopAllCoinsList(period int64, delay int64, mdura int, barPeriod string, on
|
||||
go func() {
|
||||
// -1 是获取全部coin列表
|
||||
list := cr.GetScoreList(5)
|
||||
// fmt.Println("allCoins3", list)
|
||||
fmt.Println("allCoins3", list)
|
||||
allScoreChan <- list
|
||||
}()
|
||||
}
|
||||
@ -329,7 +143,7 @@ func LoopAllCoinsList(period int64, delay int64, mdura int, barPeriod string, on
|
||||
}()
|
||||
for {
|
||||
allScore, _ := <-allScoreChan
|
||||
// fmt.Println("allCoins allScore", allScore)
|
||||
fmt.Println("allCoins allScore", allScore)
|
||||
if len(allScore) == 0 {
|
||||
continue
|
||||
}
|
||||
@ -351,153 +165,13 @@ func LoopAllCoinsList(period int64, delay int64, mdura int, barPeriod string, on
|
||||
After: tmi,
|
||||
}
|
||||
js, err := json.Marshal(restQ)
|
||||
// fmt.Println("allCoins lpush js:", string(js))
|
||||
fmt.Println("allCoins lpush js:", string(js))
|
||||
cr.RedisCli.LPush("restQueue", js)
|
||||
return err
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func LoopCheckMyFavorList(core *core.Core) {
|
||||
maxTickers, _ := core.Cfg.Config.Get("threads").Get("maxTickers").Int()
|
||||
myFavorChan := make(chan []string)
|
||||
//协程:动态维护topScore
|
||||
go func(mx int) {
|
||||
myFavor := []string{}
|
||||
for {
|
||||
myFavor = core.GetMyFavorList()
|
||||
if len(myFavor) < mx {
|
||||
fmt.Println("topScore 长度不符合预期")
|
||||
break
|
||||
} else {
|
||||
fmt.Println("topScore 长度符合预期")
|
||||
}
|
||||
myFavorChan <- myFavor
|
||||
time.Sleep(12 * time.Minute)
|
||||
}
|
||||
}(maxTickers)
|
||||
//协程:循环执行rest请求candle
|
||||
|
||||
for {
|
||||
myFavor, _ := <-myFavorChan
|
||||
go func() {
|
||||
loop2(core, myFavor, maxTickers)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func loop2(core *core.Core, topScore []string, maxTickers int) {
|
||||
restPeriod, _ := core.Cfg.Config.Get("threads").Get("restPeriod").Int()
|
||||
maxCandles, _ := core.Cfg.Config.Get("threads").Get("maxCandles").Int()
|
||||
if len(topScore) < maxTickers {
|
||||
return
|
||||
}
|
||||
// fmt.Println("loop1 ", 12*time.Minute, " topScore2: ", topScore)
|
||||
// fmt.Println("topScore: ", topScore, len(topScore), mx, ok)
|
||||
//每隔Period1,重新发起一次rest请求的大循环。里面还有60秒一次的小循环. 作为ws请求的备份(保底)机制,实效性差了一点,但是稳定性高于ws
|
||||
dura := time.Duration(restPeriod) * time.Second
|
||||
|
||||
mdura := dura/time.Duration(len(topScore)) - 20*time.Millisecond
|
||||
ticker := time.NewTicker(mdura)
|
||||
done := make(chan bool)
|
||||
idx := 0
|
||||
go func(i int) {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if i >= 4 { //: 12分钟 / 3分钟 = 4
|
||||
done <- true
|
||||
break
|
||||
}
|
||||
for {
|
||||
//内层循环3分钟一圈,3分钟内遍历完topScore限定的candle列表, 12分钟能够跑4圈
|
||||
if i >= 4 { //: 12分钟 / 3分钟 = 4
|
||||
done <- true
|
||||
break
|
||||
}
|
||||
go func() {
|
||||
// fmt.Println("loop2 :", "dura:", dura, "i:", i)
|
||||
// core.InVokeCandle(topScore, per1, maxCandles)
|
||||
mdura := dura / time.Duration(len(topScore)+1)
|
||||
for k, v := range topScore {
|
||||
go func(k int, v string) {
|
||||
time.Sleep(mdura*time.Duration(k) - 10*time.Millisecond)
|
||||
core.GetCandlesWithRest(v, k, mdura, maxCandles)
|
||||
}(k, v)
|
||||
}
|
||||
i++
|
||||
}()
|
||||
time.Sleep(dura)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}(idx)
|
||||
time.Sleep(dura - 100*time.Millisecond)
|
||||
done <- true
|
||||
ticker.Stop()
|
||||
}
|
||||
|
||||
// 订阅公共频道
|
||||
func wsPub(core *core.Core) {
|
||||
wsCli, _ := core.GetWsCli()
|
||||
err := wsCli.Start()
|
||||
// 创建ws客户端
|
||||
if err != nil {
|
||||
//fmt.Println("ws client err", err)
|
||||
return
|
||||
}
|
||||
|
||||
// 设置连接超时
|
||||
|
||||
wsCli.SetDailTimeout(time.Second * 20)
|
||||
|
||||
defer wsCli.Stop()
|
||||
// 订阅产品频道
|
||||
// 在这里初始化instrument列表
|
||||
var args []map[string]string
|
||||
arg := make(map[string]string)
|
||||
arg["instType"] = ws.SPOT
|
||||
//arg["instType"] = OPTION
|
||||
args = append(args, arg)
|
||||
|
||||
// start := time.Now()
|
||||
//订阅
|
||||
|
||||
//设置订阅事件的event handler
|
||||
wsCli.AddBookMsgHook(core.PubMsgDispatcher)
|
||||
res, _, err := wsCli.PubInstruemnts(ws.OP_SUBSCRIBE, args)
|
||||
//fmt.Println("args:", args)
|
||||
if res {
|
||||
// usedTime := time.Since(start)
|
||||
//fmt.Println("订阅成功!", usedTime.String())
|
||||
} else {
|
||||
//fmt.Println("订阅失败!", err)
|
||||
}
|
||||
core.LoopInstrumentList()
|
||||
|
||||
// start = time.Now()
|
||||
// res, _, err = r.PubInstruemnts(OP_UNSUBSCRIBE, args)
|
||||
// if res {
|
||||
// usedTime := time.Since(start)
|
||||
// //fmt.Println("取消订阅成功!", usedTime.String())
|
||||
// } else {
|
||||
// //fmt.Println("取消订阅失败!", err)
|
||||
// }
|
||||
}
|
||||
|
||||
// 处理 ws 订单请求相关订阅回调
|
||||
func subscribeOrder(cr *core.Core) {
|
||||
fmt.Println("subscribeOrder order:")
|
||||
for {
|
||||
order := <-cr.OrderChan
|
||||
go func() {
|
||||
bj, _ := json.Marshal(order)
|
||||
fmt.Println("process order:", string(bj))
|
||||
cr.ProcessOrder(order)
|
||||
}()
|
||||
}
|
||||
}
|
||||
func main() {
|
||||
cr := core.Core{}
|
||||
cr.Init()
|
||||
@ -514,57 +188,36 @@ func main() {
|
||||
}()
|
||||
}
|
||||
}()
|
||||
//全员1分钟candle & maX
|
||||
// period: 每个循环开始的时间点,单位:秒
|
||||
// delay:延时多少秒后去取此值
|
||||
// mdura:多少秒之内,遍历完获取到的goins列表
|
||||
// onceCount:每次获取这个coin几个当前周期的candle数据
|
||||
// 全员3m
|
||||
// go func() {
|
||||
// fmt.Println("LoopAllCoinsList1")
|
||||
// LoopAllCoinsList(180, 0, 180, "3m", 5, 6)
|
||||
// }()
|
||||
|
||||
// 全员15m candle
|
||||
// go func() {
|
||||
// fmt.Println("LoopAllCoinsList2")
|
||||
// LoopAllCoinsList(380, 90, 380, "15m", 4, 7)
|
||||
// }()
|
||||
// 全员30m candle
|
||||
// go func() {
|
||||
// fmt.Println("LoopAllCoinsList2")
|
||||
// LoopAllCoinsList(510, 90, 500, "30m", 5, 8)
|
||||
// }()
|
||||
// 全员1H candle
|
||||
// go func() {
|
||||
// fmt.Println("LoopAllCoinsList2")
|
||||
// LoopAllCoinsList(770, 0, 760, "1H", 9, 12)
|
||||
// }()
|
||||
go func() {
|
||||
fmt.Println("LoopAllCoinsList2")
|
||||
LoopAllCoinsList(770, 0, 760, "1H", 9, 12)
|
||||
}()
|
||||
// 全员2H candle
|
||||
go func() {
|
||||
fmt.Println("LoopAllCoinsList2")
|
||||
LoopAllCoinsList(820, 0, 820, "2H", 12, 15)
|
||||
}()
|
||||
// 全员4小时candle
|
||||
// go func() {
|
||||
// fmt.Println("LoopAllCoinsList1")
|
||||
// LoopAllCoinsList(1280, 150, 1280, "4H", 15, 19)
|
||||
// }()
|
||||
go func() {
|
||||
fmt.Println("LoopAllCoinsList1")
|
||||
LoopAllCoinsList(1280, 150, 1280, "4H", 15, 19)
|
||||
}()
|
||||
// 全员6小时candle
|
||||
//go func() {
|
||||
// fmt.Println("LoopAllCoinsList1")
|
||||
// LoopAllCoinsList(1440, 180, 1440, "6H", 17, 21)
|
||||
//}()
|
||||
go func() {
|
||||
fmt.Println("LoopAllCoinsList1")
|
||||
LoopAllCoinsList(1440, 180, 1440, "6H", 17, 21)
|
||||
}()
|
||||
// 全员12小时candle
|
||||
go func() {
|
||||
fmt.Println("LoopAllCoinsList1")
|
||||
LoopAllCoinsList(1680, 180, 1680, "12H", 19, 23)
|
||||
}()
|
||||
// 全员1Day candle & maX
|
||||
//go func() {
|
||||
// fmt.Println("LoopAllCoinsList1")
|
||||
// LoopAllCoinsList(1920, 4, 1920, "1D", 25, 30)
|
||||
// }()
|
||||
go func() {
|
||||
fmt.Println("LoopAllCoinsList1")
|
||||
LoopAllCoinsList(1920, 4, 1920, "1D", 25, 30)
|
||||
}()
|
||||
// 全员2Day candle & maX
|
||||
go func() {
|
||||
fmt.Println("LoopAllCoinsList1")
|
||||
@ -575,65 +228,13 @@ func main() {
|
||||
fmt.Println("LoopAllCoinsList1")
|
||||
LoopAllCoinsList(6400, 4, 6400, "5D", 28, 35)
|
||||
}()
|
||||
// 循环检查tickersVol|sortedSet,并执行订阅candles
|
||||
go func() {
|
||||
LoopCheckMyFavorList(&cr)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
LoopSaveCandle(&cr)
|
||||
}()
|
||||
go func() {
|
||||
LoopSubscribe(&cr)
|
||||
}()
|
||||
// 订阅下游sardine发过来的要执行的动作
|
||||
go func() {
|
||||
LoopSubscribeSubAction(&cr)
|
||||
}()
|
||||
// 废弃
|
||||
// go func() {
|
||||
// LoopRestQ(&cr)
|
||||
// }()
|
||||
|
||||
//-----------
|
||||
//私有部分
|
||||
go func() {
|
||||
core.LoopLivingOrders(&cr, 1*time.Minute)
|
||||
}()
|
||||
go func() {
|
||||
core.LoopBalances(&cr, 1*time.Minute)
|
||||
}()
|
||||
|
||||
// 公共订阅
|
||||
// wsPub(&cr)
|
||||
|
||||
// 停止私有订阅
|
||||
|
||||
// go func() {
|
||||
// //正常情况下 wsPrive不会主动退出,如果不慎退出了,自动重新运行
|
||||
// for {
|
||||
// wsPriv(&cr)
|
||||
// }
|
||||
//}()
|
||||
|
||||
go func() {
|
||||
subscribeOrder(&cr)
|
||||
core.WriteLogProcess(&cr)
|
||||
}()
|
||||
// gcl := map[string]models.GlobalCoin{}
|
||||
// msgr := cr.Messager{}
|
||||
// msgr.Init()
|
||||
// msgr.Login()
|
||||
// msgr.Alive()
|
||||
// msgr.GlobalSubscribe() // 订阅全局该订阅的公共和私有内容
|
||||
// msgr.Dispatcher(gcl)
|
||||
// msgr.Pop()
|
||||
|
||||
// //fmt.Println("listenning ... ")
|
||||
// 这个地方为了让main不退出, 将来可以改成一个http的listener
|
||||
// ip := "0.0.0.0:6066"
|
||||
// if err := http.ListenAndServe(ip, nil); err != nil {
|
||||
// fmt.Printf("start pprof failed on %s\n", ip)
|
||||
// }
|
||||
// 永久阻塞
|
||||
select {}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user