From f259bcd4ad32fa261225a64bd4fd7051d4938843 Mon Sep 17 00:00:00 2001 From: zhangkun Date: Mon, 2 Dec 2024 22:28:41 +0800 Subject: [PATCH] up --- .README.md.swp | Bin 12288 -> 0 bytes configs/basicConfig.json | 2 +- core/candle.go | 98 ++------- core/core.go | 272 ++--------------------- core/writeLog.go | 31 +++ go.mod | 21 +- go.sum | 38 +--- main.go | 455 +++------------------------------------ 8 files changed, 113 insertions(+), 804 deletions(-) delete mode 100644 .README.md.swp create mode 100644 core/writeLog.go diff --git a/.README.md.swp b/.README.md.swp deleted file mode 100644 index db352f98444058d3e477da02937eef830db6169e..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 12288 zcmeI&&1=*^7zf~~HxE_OiwA|#y}FrNDMRJB{ceC!PH$92Iv#tnxsPtHT13xB{d1sP5w~)yVWhXN9pfyb3 z*ha|vlNoR6nomxIn~BcpKxs3u_7Mxe#G|!q%;@>w3o_SMHuEG4^Hm;MjxU@7Q@o;M zHaR|aG&61$-G48J6)1oL>l7Fy2Zr|#nMtX=d+3hsQ|p|M01BW03ZMWApuh$cu;Ewxk3mAJ zTj1g0|No!w|DRh4`40I8`3iXrc?PLLZbGt<_bmE|)D?s926HGFLirx-`Ph zC9elH(qvT8m$LUa&|17BQls4 zo^>{sL9_ni%f(BCc#=jspha5*mM6P+J0{EY%xO9~b*ejDlrAmtz?G3HHa#WWD)EG( zvCctX`<&_!cD!?%PEX6oF>S;h9mS$#&P4#V=2B5A-Z#;VbuWe5_le_D%VK>cw(HWI mEnGdyA}JM%cqn7Dxn34{B|9<}3Tn>G 0.90 { - // fmt.Println("drop ", from) - // return - // } - // fmt.Println("not drop ", from) - // TODO map - redisCli := cr.RedisCli - - ab, _ := json.Marshal(candle) - //特定的币只发送给特定的下游接收者 - // 这么设计不太好,业务逻辑和程序架构紧耦合了, - prename := cr.DispatchDownstreamNodes(candle.InstId) - - for _, v := range channels { - suffix := "" - env := os.Getenv("GO_ENV") - if env == "demoEnv" { - suffix = "-demoEnv" - } - vd := v + suffix - - // TODO FIXME cli2 - _, err := redisCli.Publish(vd, string(ab)).Result() - if len(prename) == 0 { - continue - } - nodeAdd := prename + "|" + v + suffix - // fmt.Println("nodeAdd: ", nodeAdd) - - // TODO FIXME cli2 - _, err = redisCli.Publish(nodeAdd, string(ab)).Result() - // fmt.Println("publish, channel,res,err:", nodeAdd, res, err, "candle:", string(ab)) - - if err != nil { - fmt.Println("err of ma7|ma30 add to redis2:", err, candle.From) - } - } - // TODO 下面这个先屏蔽,不订阅ws信息,否则系统压力会太大,等有更灵活的机制再说 - // redisCli.Publish("allCandlesInner|publish"+suffix, string(ab)).Result() -} diff --git a/core/core.go b/core/core.go index 6424d93..1e65eef 100644 --- a/core/core.go +++ b/core/core.go @@ -3,9 +3,9 @@ package core import ( "context" "encoding/json" - "errors" + // "errors" "fmt" - "math/rand" + // "math/rand" "os" "strconv" "strings" @@ -13,7 +13,7 @@ import ( "time" "v5sdk_go/rest" "v5sdk_go/ws" - "v5sdk_go/ws/wImpl" + // "v5sdk_go/ws/wImpl" simple "github.com/bitly/go-simplejson" "github.com/go-redis/redis" @@ -26,9 +26,11 @@ type Core struct { Cfg *MyConfig RedisCli *redis.Client RedisCli2 *redis.Client + FluentBitUrl string Wg sync.WaitGroup RestQueueChan chan *RestQueue OrderChan chan *private.Order + WriteLogChan chan *WriteLog } type RestQueue struct { InstId string @@ -70,6 +72,15 @@ func (rst *RestQueue) Save(cr *Core) { cr.SaveCandle(rst.InstId, rst.Bar, rsp, rst.Duration, rst.WithWs) } +func WriteLogProcess(cr *Core) { + for { + wg := <-cr.WriteLogChan + go func(wg *WriteLog) { + wg.Process(cr) + }(wg) + } +} + func (cr *Core) ShowSysTime() { rsp, _ := cr.RestInvoke("/api/v5/public/time", rest.GET) fmt.Println("serverSystem time:", rsp) @@ -95,107 +106,6 @@ func (core *Core) Init() { } } -func (core *Core) GetWsCli() (*ws.WsClient, error) { - url, err := core.Cfg.Config.Get("connect").Get("wsPublicBaseUrl").String() - if err != nil { - fmt.Println("err of json decode: ", err) - } - pubCli, err := ws.NewWsClient("wss://" + url) - pubCli.AddBookMsgHook(core.PubMsgDispatcher) - - if err != nil { - fmt.Println("err of create ublic ws cli:", err) - } - return pubCli, err -} - -func (core *Core) DispatchDownstreamNodes(originName string) string { - nodesStr := os.Getenv("TUNAS_DOWNSTREAM_NODES") - if len(nodesStr) == 0 { - return "" - } - nodes := strings.Split(nodesStr, "|") - count := len(nodes) - idx := utils.HashDispatch(originName, uint8(count)) - return nodes[idx] -} - -func (core *Core) Dispatch(channel string, ctype string, instId string, data interface{}) error { - // fmt.Println("start to SaveToRedis:", channel, ctype, instId, data) - b, err := json.Marshal(data) - js, err := simple.NewJson(b) - if err != nil { - fmt.Println("err of unMarshalJson1:", js) - } - - isUsdt := strings.Contains(instId, "-USDT") - instType, err := js.Get("instType").String() - if !isUsdt { - return err - } - - // fmt.Println("instId: ", instId) - redisCli := core.RedisCli - channelType := "" - if channel == "instruments" { - channelType = "instruments" - } else if strings.Contains(channel, "candle") { - channelType = "candle" - } - switch channelType { - case "instruments": - { - // fmt.Println("isInstrument:", instId) - if instType != "SPOT" { - return errors.New("instType is not SPOT") - } - _, err = redisCli.HSet("instruments|"+ctype+"|hash", instId, b).Result() - if err != nil { - fmt.Println("err of hset to redis:", err) - } - break - } - case "candle": - { - data := data.([]interface{}) - ary := strings.Split(channel, "candle") - // fmt.Println("dispatch candle:", ary[1], instId) - candle := Candle{ - InstId: instId, - Period: ary[1], - Data: data, - From: "ws", - } - core.WsSubscribe(&candle) - saveCandle := os.Getenv("TUNAS_SAVECANDLE") - if saveCandle == "true" { - candle.SetToKey(core) - } - // TODO mxLen要放到core.Cfg里 - arys := []string{ALLCANDLES_PUBLISH} - core.AddToGeneralCandleChnl(&candle, arys) - break - } - - default: - { - // data := data.([]interface{}) - // bj, _ := json.Marshal(data) - // fmt.Println("private data:", string(bj)) - return errors.New("channel type not catched") - } - } - return nil -} - -func (core *Core) PubMsgDispatcher(ts time.Time, data wImpl.MsgData) error { - instList := data.Data - for _, v := range instList { - core.Dispatch(data.Arg["channel"], data.Arg["instType"], data.Arg["instId"], v) - } - return nil -} - func (core *Core) GetRedisCli() (*redis.Client, error) { ru := core.Cfg.RedisConf.Url rp := core.Cfg.RedisConf.Password @@ -292,78 +202,6 @@ func (core *Core) SubscribeTicker(op string) error { return nil } -func (core *Core) InnerSubscribeTicker(name string, op string, retry bool) error { - // 在这里 args1 初始化tickerList的列表 - var args []map[string]string - arg := make(map[string]string) - arg["instId"] = name - arg["instType"] = ws.SPOT - args = append(args, arg) - - if retry { - go func(op string, args []map[string]string) { - core.retrySubscribe(op, args) - }(op, args) - } else { - go func(op string, args []map[string]string) { - core.OnceSubscribe(op, args) - }(op, args) - } - return nil -} - -func (core *Core) OnceSubscribe(op string, args []map[string]string) error { - wsCli, _ := core.GetWsCli() - res, _, err := wsCli.PubTickers(op, args) - // defer wsCli.Stop() - start := time.Now() - if err != nil { - fmt.Println("pubTickers err:", err) - } - if res { - usedTime := time.Since(start) - fmt.Println("订阅成功!", usedTime.String()) - } else { - fmt.Println("订阅失败!", err) - } - return err -} - -func (core *Core) retrySubscribe(op string, args []map[string]string) error { - wsCli, _ := core.GetWsCli() - res, _, err := wsCli.PubTickers(op, args) - start := time.Now() - if err != nil { - r := rand.New(rand.NewSource(time.Now().UnixNano())) - val := r.Int() / 20000000000000000 - fmt.Println("pubTickers err:", err, val, "秒后重试") - time.Sleep(time.Duration(val) * time.Second) - - return core.retrySubscribe(op, args) - } - if res { - usedTime := time.Since(start) - fmt.Println("订阅成功!", usedTime.String()) - } else { - fmt.Println("订阅失败!", err) - } - return err -} - -func (core *Core) TickerScore(score float64, name string) error { - - redisCli := core.RedisCli - mb := redis.Z{ - Score: score, - Member: name, - } - _, err := redisCli.ZAdd("tradingRank", mb).Result() - if err != nil { - fmt.Println("zadd err:", err) - } - return err -} - func (core *Core) RestInvoke(subUrl string, method string) (*rest.RESTAPIResult, error) { restUrl, _ := core.Cfg.Config.Get("connect").Get("restBaseUrl").String() //ep, method, uri string, param *map[string]interface{} @@ -473,92 +311,14 @@ func (core *Core) GetScoreList(count int) []string { redisCli := core.RedisCli - curList, err := redisCli.ZRevRange("tickersList|sortedSet", 0, int64(count-1)).Result() + curList, err := redisCli.ZRange("tickersList|sortedSet", 0, int64(count-1)).Result() if err != nil { fmt.Println("zrevrange err:", err) } + fmt.Println("curList: ", curList) return curList } -func (core *Core) SubscribeCandleWithDimention(op string, instIdList []string, dimension string) { - wsCli, err := core.GetWsCli() - if err != nil { - fmt.Println("ws client err", err) - } - err = wsCli.Start() - // 创建ws客户端 - if err != nil { - fmt.Println("ws client start err", err) - } - - var args []map[string]string - for _, vs := range instIdList { - arg := make(map[string]string) - arg["instId"] = vs - args = append(args, arg) - } - _, _, err = wsCli.PubKLine(op, wImpl.Period(dimension), args) - if err != nil { - fmt.Println("pubTickers err:", err) - } - -} - -// 订阅某币,先要和配置比对,是否允许订阅此币此周期, -func (core *Core) WsSubscribe(candle *Candle) error { - wsPeriods := []string{} - wsary := core.Cfg.Config.Get("wsDimentions").MustArray() - for _, v := range wsary { - wsPeriods = append(wsPeriods, v.(string)) - } - redisCli := core.RedisCli - period := candle.Period - instId := candle.InstId - from := candle.From - sname := instId + "|" + period + "ts|Subscribed|key" - - exists, _ := redisCli.Exists(sname).Result() - ttl, _ := redisCli.TTL(sname).Result() - inAry, _ := utils.In_Array(period, wsPeriods) - if !inAry { - estr := "subscribe 在配置中此period未被订阅: " + "," + period - // fmt.Println(estr) - err := errors.New(estr) - return err - } else { - fmt.Println("subscribe 已经订阅: ", period) - } - waitWs, _ := core.Cfg.Config.Get("threads").Get("waitWs").Int64() - willSub := false - if exists > 0 { - if ttl > 0 { - if from == "ws" { - redisCli.Expire(sname, time.Duration(waitWs)*time.Second).Result() - } - } else { - willSub = true - } - } else { - willSub = true - } - - if willSub { - // 需要订阅 - - instIdList := []string{} - instIdList = append(instIdList, instId) - - core.SubscribeCandleWithDimention(ws.OP_SUBSCRIBE, instIdList, period) - // 如果距离上次检查此candle此维度订阅状态已经过去超过2分钟还没有发现有ws消息上报,执行订阅 - dr := 1 * time.Duration(waitWs) * time.Second - redisCli.Set(sname, 1, dr).Result() - } else { - // fmt.Println("拒绝订阅candles:", keyName, "tm: ", tm, "otsi:", otsi) - } - - return nil -} - func LoopBalances(cr *Core, mdura time.Duration) { //协程:动态维护topScore ticker := time.NewTicker(mdura) diff --git a/core/writeLog.go b/core/writeLog.go new file mode 100644 index 0000000..563ecc7 --- /dev/null +++ b/core/writeLog.go @@ -0,0 +1,31 @@ +package core + +import ( + "bytes" + "fmt" + "net/http" + "os" + + logrus "github.com/sirupsen/logrus" +) + +type WriteLog struct { + Content []byte + Tag string +} + +func (wg *WriteLog) Process(cr *Core) error { + go func() { + reqBody := bytes.NewBuffer(wg.Content) + cr.Env = os.Getenv("GO_ENV") + cr.FluentBitUrl = os.Getenv("TEXUS_FluentBitUrl") + fullUrl := "http://" + cr.FluentBitUrl + "/" + wg.Tag + res, err := http.Post(fullUrl, "application/json", reqBody) + + fmt.Println("requested, response:", fullUrl, string(wg.Content), res) + if err != nil { + logrus.Error(err) + } + }() + return nil +} diff --git a/go.mod b/go.mod index e924f38..52d3e6d 100644 --- a/go.mod +++ b/go.mod @@ -7,17 +7,24 @@ replace ( v5sdk_go/ws => ./submodules/okex/ws ) -go 1.14 +go 1.21 require ( github.com/bitly/go-simplejson v0.5.0 - github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect github.com/go-redis/redis v6.15.9+incompatible - github.com/gorilla/websocket v1.5.1 // indirect - github.com/kr/pretty v0.3.0 // indirect - github.com/onsi/gomega v1.16.0 // indirect - v5sdk_go/config v0.0.0-00010101000000-000000000000 // indirect + github.com/sirupsen/logrus v1.9.3 v5sdk_go/rest v0.0.0-00010101000000-000000000000 - v5sdk_go/utils v0.0.0-00010101000000-000000000000 // indirect v5sdk_go/ws v0.0.0-00010101000000-000000000000 ) + +require ( + github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect + github.com/gorilla/websocket v1.5.1 // indirect + github.com/kr/pretty v0.3.0 // indirect + github.com/onsi/ginkgo v1.16.4 // indirect + github.com/onsi/gomega v1.16.0 // indirect + golang.org/x/net v0.17.0 // indirect + golang.org/x/sys v0.13.0 // indirect + v5sdk_go/config v0.0.0-00010101000000-000000000000 // indirect + v5sdk_go/utils v0.0.0-00010101000000-000000000000 // indirect +) diff --git a/go.sum b/go.sum index 4c76978..74c559e 100644 --- a/go.sum +++ b/go.sum @@ -20,12 +20,10 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= @@ -52,36 +50,28 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= -golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -93,35 +83,21 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= -golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= @@ -130,10 +106,8 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= @@ -144,3 +118,5 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go index 43cc815..06e476b 100644 --- a/main.go +++ b/main.go @@ -3,24 +3,22 @@ package main import ( "encoding/json" "fmt" - "log" "math/rand" "os" "strings" "time" "v5sdk_go/rest" - "v5sdk_go/ws" - "v5sdk_go/ws/wImpl" + // "v5sdk_go/ws" + // "v5sdk_go/ws/wImpl" simple "github.com/bitly/go-simplejson" - "github.com/go-redis/redis" + // "github.com/go-redis/redis" "phyer.click/tunas/core" - "phyer.click/tunas/private" + // "phyer.click/tunas/private" "phyer.click/tunas/utils" ) func init() { - // //fmt.Println("inited") } // 通过rest接口,获取所有ticker信息,存入redis的stream和成交量排行榜 @@ -43,12 +41,7 @@ func RestTicker(cr *core.Core, dura time.Duration) { return } itemList = js.Get("data").MustArray() - // maxTickers是重点关注的topScore的coins的数量 - length, _ := cr.Cfg.Config.Get("threads").Get("maxTickers").Int() fmt.Println("itemList length:", len(itemList)) - if len(itemList) < length { - return - } // 关注多少个币,在这里设置, 只需要5个币 allTicker := cr.GetScoreList(5) redisCli := cr.RedisCli @@ -73,14 +66,6 @@ func RestTicker(cr *core.Core, dura time.Duration) { if ti.InstType != "SPOT" { continue } - // 把单个ticker信息小时交易量存到交易量排行榜 - // fmt.Println("ticker item: ", item) - // 不需要排行榜了 - // _, err = redisCli.ZAdd("tickersVol|sortedSet", redis.Z{ti.VolCcy24h, ti.InstId}).Result() - // if err != nil { - // fmt.Println("restTicker redis err: ", err) - // } - ab, _ := json.Marshal(ti) suffix := "" env := os.Getenv("GO_ENV") @@ -95,76 +80,6 @@ func RestTicker(cr *core.Core, dura time.Duration) { } } -// 私有订阅: 订阅订单频道 -func wsPriv(core *core.Core) error { - cfg := core.Cfg - url, _ := cfg.Config.Get("connect").Get("wsPrivateBaseUrl").String() - priCli, err := ws.NewWsClient("wss://" + url) - err = priCli.Start() - if err != nil { - log.Println(err) - return err - } - priCli.SetDailTimeout(time.Second * 10) - defer func() { - priCli.Stop() - }() - var res bool - key, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String() - secure, _ := core.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String() - pass, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String() - // TODO 这里订阅收听订单频道结果,但是ws请求不可靠,这个所谓冗余机制存在,有比没有强,restPost函数 是保底方案 - priCli.AddBookMsgHook(func(ts time.Time, data wImpl.MsgData) error { - // 添加你的方法 - fmt.Println("这是自定义AddBookMsgHook") - bj, _ := json.Marshal(data) - fmt.Println("当前数据是", string(bj)) - resp := private.OrderResp{} - // TODO FIXME 这里默认所有的数据都是OrderResp类型,但是ws请求有其他类型的,这个地方将来肯定得改 - err := json.Unmarshal(bj, &resp) - if err != nil { - fmt.Println("Canvert MsgData to OrderResp err:", err) - return nil - } - fmt.Println("order resp: ", resp) - list, _ := resp.Convert() - fmt.Println("order list: ", list) - go func() { - for _, v := range list { - fmt.Println("order orderV:", v) - core.OrderChan <- v - } - }() - return nil - }) - - fmt.Println("key, secure, pass:", key, secure, pass) - res, _, err = priCli.Login(key, secure, pass) - - if res { - fmt.Println("私有订阅登录成功!") - } else { - fmt.Println("私有订阅登录失败!", err) - return err - } - - var args []map[string]string - arg := make(map[string]string) - arg["instType"] = ws.ANY - args = append(args, arg) - // 订阅订单频道动作执行 - res, msg, err := priCli.PrivBookOrder("subscribe", args) - bj, _ := json.Marshal(msg) - fmt.Println("PrivBookOrder res:", res, " msg:", string(bj), " err:", err) - for { - fmt.Println(utils.GetFuncName(), url+" is in subscribing") - time.Sleep(10 * time.Minute) - priCli.Stop() - wsPriv(core) - } - return err -} - // 统一受理发起rest请求的请求 func LoopSaveCandle(cr *core.Core) { for { @@ -175,7 +90,7 @@ func LoopSaveCandle(cr *core.Core) { } restQ := core.RestQueue{} json.Unmarshal([]byte(ary[1]), &restQ) - // fmt.Println("before: ", restQ.InstId) + fmt.Println("before: ", restQ.InstId) // before: USDT|position|key ary1 := strings.Split(restQ.InstId, "|") if ary1[0] == "USDT" { @@ -185,7 +100,7 @@ func LoopSaveCandle(cr *core.Core) { if len(ary1) > 1 && ary1[1] == "position" { restQ.InstId = ary1[0] + "-USDT" } - // fmt.Println("after: ", restQ.InstId) + fmt.Println("after: ", restQ.InstId) // after: restQueue-USDT go func() { restQ.Show(cr) @@ -194,107 +109,6 @@ func LoopSaveCandle(cr *core.Core) { } } -// 这个函数被废弃了, 根据现有逻辑,redisCli.Scan(cursor, "*"+pattern+"*", 2000) 得不到任何内容 -func LoopRestQ(cr *core.Core) { - redisCli := cr.RedisCli - cursor := uint64(0) - n := 0 - // allTs := []int64{} - rstr := []string{} - pattern := "restQueue" - fmt.Println("LoopRestQ start") - for { - var err error - rstr, cursor, err = redisCli.Scan(cursor, "*"+pattern+"*", 2000).Result() - if err != nil { - panic(err) - } - n += len(rstr) - if n == 0 { - break - } - if n > 200000 { - break - } - if n > 0 { - fmt.Println("LoopRestQ rstr: ", rstr) - } - } - // fmt.Println("LoopRestQ rstr: ", rstr) - fmt.Println("LoopRestQ rstr len: ", len(rstr)) - for _, keyN := range rstr { - val, err := redisCli.Get(keyN).Result() - fmt.Println("LoopRestQ val:", val) - restQ := core.RestQueue{} - if err != nil || len(val) == 0 { - continue - } - err = json.Unmarshal([]byte(val), &restQ) - if err != nil { - fmt.Println("RestQueue Unmarshal err: ", err) - } - // res, err := redisCli.LPush("restQueue", val).Result() - fmt.Println("restQueue will LPush: ", val) - if err != nil { - redisCli.Del(keyN) - } - } - time.Sleep(10 * time.Second) - LoopRestQ(cr) -} - -func LoopSubscribe(cr *core.Core) { - redisCli := cr.RedisCli - suffix := "" - if cr.Env == "demoEnv" { - suffix = "-demoEnv" - } - pubsub := redisCli.Subscribe(core.ALLCANDLES_INNER_PUBLISH + suffix) - _, err := pubsub.Receive() - if err != nil { - log.Fatal(err) - } - // 用管道来接收消息 - ch := pubsub.Channel() - // 处理消息 - - for msg := range ch { - cd := core.Candle{} - json.Unmarshal([]byte(msg.Payload), &cd) - fmt.Println("msg.PayLoad:", msg.Payload, "candle:", cd) - go func() { - cr.WsSubscribe(&cd) - }() - } -} - -// 订阅并执行sardine端传来的订单相关的动作 -func LoopSubscribeSubAction(cr *core.Core) { - redisCli := cr.RedisCli - suffix := "" - if cr.Env == "demoEnv" { - suffix = "-demoEnv" - } - // TODO FIXME cli2 - prisub := redisCli.Subscribe(core.SUBACTIONS_PUBLISH + suffix) - _, err := prisub.Receive() - if err != nil { - log.Fatal(err) - } - // 用管道来接收消息 - ch := prisub.Channel() - // 处理消息 - - for msg := range ch { - action := core.SubAction{} - json.Unmarshal([]byte(msg.Payload), &action) - fmt.Println("actionMsg.PayLoad:", msg.Payload, "action:", action) - go func() { - cr.DispatchSubAction(&action) - }() - } -} - // period: 每个循环开始的时间点,单位:秒 // delay:延时多少秒后去取此值, 单位:秒 // mdura:多少个分钟之内,遍历完获取到的goins列表, 单位:秒 @@ -305,13 +119,13 @@ func LoopAllCoinsList(period int64, delay int64, mdura int, barPeriod string, on cr := core.Core{} cr.Init() allScoreChan := make(chan []string) - // fmt.Println("allCoins1") + fmt.Println("allCoins1") per1 := 1 * time.Minute ticker := time.NewTicker(per1) go func() { for { tsi := time.Now().Unix() - // fmt.Println("tsi, period, delay, tsi%(period): ", tsi, period, delay, tsi%(period)) + fmt.Println("tsi, period, delay, tsi%(period): ", tsi, period, delay, tsi%(period)) if tsi%(period) != delay { time.Sleep(1 * time.Second) continue @@ -321,7 +135,7 @@ func LoopAllCoinsList(period int64, delay int64, mdura int, barPeriod string, on go func() { // -1 是获取全部coin列表 list := cr.GetScoreList(5) - // fmt.Println("allCoins3", list) + fmt.Println("allCoins3", list) allScoreChan <- list }() } @@ -329,7 +143,7 @@ func LoopAllCoinsList(period int64, delay int64, mdura int, barPeriod string, on }() for { allScore, _ := <-allScoreChan - // fmt.Println("allCoins allScore", allScore) + fmt.Println("allCoins allScore", allScore) if len(allScore) == 0 { continue } @@ -351,153 +165,13 @@ func LoopAllCoinsList(period int64, delay int64, mdura int, barPeriod string, on After: tmi, } js, err := json.Marshal(restQ) - // fmt.Println("allCoins lpush js:", string(js)) + fmt.Println("allCoins lpush js:", string(js)) cr.RedisCli.LPush("restQueue", js) return err }) } } -func LoopCheckMyFavorList(core *core.Core) { - maxTickers, _ := core.Cfg.Config.Get("threads").Get("maxTickers").Int() - myFavorChan := make(chan []string) - //协程:动态维护topScore - go func(mx int) { - myFavor := []string{} - for { - myFavor = core.GetMyFavorList() - if len(myFavor) < mx { - fmt.Println("topScore 长度不符合预期") - break - } else { - fmt.Println("topScore 长度符合预期") - } - myFavorChan <- myFavor - time.Sleep(12 * time.Minute) - } - }(maxTickers) - //协程:循环执行rest请求candle - - for { - myFavor, _ := <-myFavorChan - go func() { - loop2(core, myFavor, maxTickers) - }() - } -} - -func loop2(core *core.Core, topScore []string, maxTickers int) { - restPeriod, _ := core.Cfg.Config.Get("threads").Get("restPeriod").Int() - maxCandles, _ := core.Cfg.Config.Get("threads").Get("maxCandles").Int() - if len(topScore) < maxTickers { - return - } - // fmt.Println("loop1 ", 12*time.Minute, " topScore2: ", topScore) - // fmt.Println("topScore: ", topScore, len(topScore), mx, ok) - //每隔Period1,重新发起一次rest请求的大循环。里面还有60秒一次的小循环. 作为ws请求的备份(保底)机制,实效性差了一点,但是稳定性高于ws - dura := time.Duration(restPeriod) * time.Second - - mdura := dura/time.Duration(len(topScore)) - 20*time.Millisecond - ticker := time.NewTicker(mdura) - done := make(chan bool) - idx := 0 - go func(i int) { - for { - select { - case <-ticker.C: - if i >= 4 { //: 12分钟 / 3分钟 = 4 - done <- true - break - } - for { - //内层循环3分钟一圈,3分钟内遍历完topScore限定的candle列表, 12分钟能够跑4圈 - if i >= 4 { //: 12分钟 / 3分钟 = 4 - done <- true - break - } - go func() { - // fmt.Println("loop2 :", "dura:", dura, "i:", i) - // core.InVokeCandle(topScore, per1, maxCandles) - mdura := dura / time.Duration(len(topScore)+1) - for k, v := range topScore { - go func(k int, v string) { - time.Sleep(mdura*time.Duration(k) - 10*time.Millisecond) - core.GetCandlesWithRest(v, k, mdura, maxCandles) - }(k, v) - } - i++ - }() - time.Sleep(dura) - continue - } - } - } - }(idx) - time.Sleep(dura - 100*time.Millisecond) - done <- true - ticker.Stop() -} - -// 订阅公共频道 -func wsPub(core *core.Core) { - wsCli, _ := core.GetWsCli() - err := wsCli.Start() - // 创建ws客户端 - if err != nil { - //fmt.Println("ws client err", err) - return - } - - // 设置连接超时 - - wsCli.SetDailTimeout(time.Second * 20) - - defer wsCli.Stop() - // 订阅产品频道 - // 在这里初始化instrument列表 - var args []map[string]string - arg := make(map[string]string) - arg["instType"] = ws.SPOT - //arg["instType"] = OPTION - args = append(args, arg) - - // start := time.Now() - //订阅 - - //设置订阅事件的event handler - wsCli.AddBookMsgHook(core.PubMsgDispatcher) - res, _, err := wsCli.PubInstruemnts(ws.OP_SUBSCRIBE, args) - //fmt.Println("args:", args) - if res { - // usedTime := time.Since(start) - //fmt.Println("订阅成功!", usedTime.String()) - } else { - //fmt.Println("订阅失败!", err) - } - core.LoopInstrumentList() - - // start = time.Now() - // res, _, err = r.PubInstruemnts(OP_UNSUBSCRIBE, args) - // if res { - // usedTime := time.Since(start) - // //fmt.Println("取消订阅成功!", usedTime.String()) - // } else { - // //fmt.Println("取消订阅失败!", err) - // } -} - -// 处理 ws 订单请求相关订阅回调 -func subscribeOrder(cr *core.Core) { - fmt.Println("subscribeOrder order:") - for { - order := <-cr.OrderChan - go func() { - bj, _ := json.Marshal(order) - fmt.Println("process order:", string(bj)) - cr.ProcessOrder(order) - }() - } -} func main() { cr := core.Core{} cr.Init() @@ -514,57 +188,36 @@ func main() { }() } }() - //全员1分钟candle & maX - // period: 每个循环开始的时间点,单位:秒 - // delay:延时多少秒后去取此值 - // mdura:多少秒之内,遍历完获取到的goins列表 - // onceCount:每次获取这个coin几个当前周期的candle数据 - // 全员3m - // go func() { - // fmt.Println("LoopAllCoinsList1") - // LoopAllCoinsList(180, 0, 180, "3m", 5, 6) - // }() - - // 全员15m candle - // go func() { - // fmt.Println("LoopAllCoinsList2") - // LoopAllCoinsList(380, 90, 380, "15m", 4, 7) - // }() - // 全员30m candle - // go func() { - // fmt.Println("LoopAllCoinsList2") - // LoopAllCoinsList(510, 90, 500, "30m", 5, 8) - // }() // 全员1H candle - // go func() { - // fmt.Println("LoopAllCoinsList2") - // LoopAllCoinsList(770, 0, 760, "1H", 9, 12) - // }() + go func() { + fmt.Println("LoopAllCoinsList2") + LoopAllCoinsList(770, 0, 760, "1H", 9, 12) + }() // 全员2H candle go func() { fmt.Println("LoopAllCoinsList2") LoopAllCoinsList(820, 0, 820, "2H", 12, 15) }() // 全员4小时candle - // go func() { - // fmt.Println("LoopAllCoinsList1") - // LoopAllCoinsList(1280, 150, 1280, "4H", 15, 19) - // }() + go func() { + fmt.Println("LoopAllCoinsList1") + LoopAllCoinsList(1280, 150, 1280, "4H", 15, 19) + }() // 全员6小时candle - //go func() { - // fmt.Println("LoopAllCoinsList1") - // LoopAllCoinsList(1440, 180, 1440, "6H", 17, 21) - //}() + go func() { + fmt.Println("LoopAllCoinsList1") + LoopAllCoinsList(1440, 180, 1440, "6H", 17, 21) + }() // 全员12小时candle go func() { fmt.Println("LoopAllCoinsList1") LoopAllCoinsList(1680, 180, 1680, "12H", 19, 23) }() // 全员1Day candle & maX - //go func() { - // fmt.Println("LoopAllCoinsList1") - // LoopAllCoinsList(1920, 4, 1920, "1D", 25, 30) - // }() + go func() { + fmt.Println("LoopAllCoinsList1") + LoopAllCoinsList(1920, 4, 1920, "1D", 25, 30) + }() // 全员2Day candle & maX go func() { fmt.Println("LoopAllCoinsList1") @@ -575,65 +228,13 @@ func main() { fmt.Println("LoopAllCoinsList1") LoopAllCoinsList(6400, 4, 6400, "5D", 28, 35) }() - // 循环检查tickersVol|sortedSet,并执行订阅candles - go func() { - LoopCheckMyFavorList(&cr) - }() - go func() { LoopSaveCandle(&cr) }() - go func() { - LoopSubscribe(&cr) - }() - // 订阅下游sardine发过来的要执行的动作 - go func() { - LoopSubscribeSubAction(&cr) - }() - // 废弃 - // go func() { - // LoopRestQ(&cr) - // }() - - //----------- - //私有部分 - go func() { - core.LoopLivingOrders(&cr, 1*time.Minute) - }() - go func() { - core.LoopBalances(&cr, 1*time.Minute) - }() - - // 公共订阅 - // wsPub(&cr) - - // 停止私有订阅 - - // go func() { - // //正常情况下 wsPrive不会主动退出,如果不慎退出了,自动重新运行 - // for { - // wsPriv(&cr) - // } - //}() go func() { - subscribeOrder(&cr) + core.WriteLogProcess(&cr) }() - // gcl := map[string]models.GlobalCoin{} - // msgr := cr.Messager{} - // msgr.Init() - // msgr.Login() - // msgr.Alive() - // msgr.GlobalSubscribe() // 订阅全局该订阅的公共和私有内容 - // msgr.Dispatcher(gcl) - // msgr.Pop() - - // //fmt.Println("listenning ... ") - // 这个地方为了让main不退出, 将来可以改成一个http的listener - // ip := "0.0.0.0:6066" - // if err := http.ListenAndServe(ip, nil); err != nil { - // fmt.Printf("start pprof failed on %s\n", ip) - // } // 永久阻塞 select {} }