texus/main.go
2025-01-23 13:02:04 +08:00

322 lines
8.1 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"encoding/json"
// "fmt"
"math/rand"
"os"
"strings"
"time"
"github.com/phyer/v5sdkgo/rest"
"github.com/sirupsen/logrus"
// "v5sdk_go/ws"
// "v5sdk_go/ws/wImpl"
simple "github.com/bitly/go-simplejson"
// "github.com/go-redis/redis"
"github.com/phyer/core"
// "github.com/phyer/texus/private"
"github.com/phyer/texus/utils"
)
func init() {
}
// 通过rest接口获取所有ticker信息存入redis的stream和成交量排行榜
func RestTicker(cr *core.Core, dura time.Duration) {
rsp := rest.RESTAPIResult{}
js := simple.Json{}
itemList := []interface{}{}
// logrus.Info("getAllTickerInfo err: ")
rsp1, err := GetAllTickerInfo(cr)
rsp = *rsp1
js1, err := simple.NewJson([]byte(rsp.Body))
js = *js1
if err != nil {
logrus.Error("restTicker err: ", err)
return
}
if len(rsp.Body) == 0 {
logrus.Error("rsp body is null")
return
}
itemList = js.Get("data").MustArray()
// logrus.Info("itemList length:", len(itemList))
// 关注多少个币,在这里设置, 只需要5个币
allTicker := cr.GetScoreList(-1)
redisCli := cr.RedisLocalCli
// 全部币种列表跟特定币种列表进行比对匹配后push到redis
for _, v := range itemList {
tir := core.TickerInfoResp{}
bs, err := json.Marshal(v)
if err != nil {
logrus.Error("restTicker marshal err: ", err)
return
}
err = json.Unmarshal(bs, &tir)
if err != nil {
logrus.Error("restTicker unmarshal err: ", err)
return
}
ti := tir.Convert()
isUsdt := strings.Contains(ti.InstID, "-USDT")
if !isUsdt {
continue
}
if ti.InstType != "SPOT" {
continue
}
ab, _ := json.Marshal(ti)
suffix := ""
env := os.Getenv("GO_ENV")
if env == "demoEnv" {
suffix = "-demoEnv"
}
for _, v := range allTicker {
if v == ti.InstID {
wg := core.WriteLog{
Content: ab,
Tag: "sardine.log.ticker." + tir.InstID,
Id: ti.Id,
}
cr.WriteLogChan <- &wg
redisCli.Publish(core.TICKERINFO_PUBLISH+suffix, string(ab)).Result()
}
}
}
}
func LoopRestTicker(cr *core.Core) {
per1 := 1 * time.Minute
RestTicker(cr, per1)
limiter := time.Tick(per1)
for {
<-limiter
go func() {
RestTicker(cr, per1)
}()
}
}
// 统一受理发起rest请求的请求
func LoopSaveCandle(cr *core.Core) {
for {
ary, err := cr.RedisLocalCli.BRPop(0, "restQueue").Result()
if err != nil {
logrus.Error("brpop err:", err)
continue
}
restQ := core.RestQueue{}
json.Unmarshal([]byte(ary[1]), &restQ)
// logrus.Info("before: ", restQ.InstId)
// before: USDT|position|key
ary1 := strings.Split(restQ.InstId, "|")
if ary1[0] == "USDT" {
// "USDT-USDT" 这个没有意义,忽略
continue
}
if len(ary1) > 1 && ary1[1] == "position" {
restQ.InstId = ary1[0] + "-USDT"
}
// logrus.Info("after: ", restQ.InstId)
// after: restQueue-USDT
go func() {
restQ.Show(cr)
restQ.Save(cr)
}()
}
}
func GetAllTickerInfo(cr *core.Core) (*rest.RESTAPIResult, error) {
rsp, err := RestInvoke(cr, "/api/v5/market/tickers?instType=SPOT", rest.GET)
return rsp, err
}
func RestInvoke(cr *core.Core, subUrl string, method string) (*rest.RESTAPIResult, error) {
restUrl, _ := cr.Cfg.Config.Get("connect").Get("restBaseUrl").String()
//ep, method, uri string, param *map[string]interface{}
rest := rest.NewRESTAPI(restUrl, method, subUrl, nil)
key, _ := cr.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String()
secure, _ := cr.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String()
pass, _ := cr.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String()
isDemo := false
if cr.Env == "demoEnv" {
isDemo = true
}
rest.SetSimulate(isDemo).SetAPIKey(key, secure, pass)
response, err := rest.Run(context.Background())
if err != nil {
logrus.Error("restInvoke1 err:", subUrl, err)
}
return response, err
}
func ShowSysTime(cr *core.Core) {
rsp, _ := RestInvoke(cr, "/api/v5/public/time", rest.GET)
logrus.Info("serverSystem time:", rsp)
}
// period: 每个循环开始的时间点,单位:秒
// delay延时多少秒后去取此值, 单位:秒
// mdura多少个秒之内遍历完获取到的goins列表, 单位:秒
// barPeriod: 周期名字
// onceCount每次获取这个coin几个当前周期的candle数据
// range: 随机的范围从0开始到range个周期作为查询的after值也就是随机n个周期去取之前的记录,对于2D5D等数据可以用来补全数据, range值越大随机散点的范围越大, 越失焦
func LoopAllCoinsList(mdura int, barPeriod string, rge int) {
cr := core.Core{}
cr.Init()
allScoreChan := make(chan []string)
// logrus.Info("start LoopAllCoinsList: period: ", period, " delay: ", delay, " mdura:", mdura, " barPeriod: ", barPeriod, " onceCount: ", onceCount, " rge:", rge)
per1 := 1 * time.Minute
ticker := time.NewTicker(per1)
go func() {
for {
tsi := time.Now().Unix()
if tsi%int64(mdura) != 0 {
time.Sleep(1 * time.Second)
continue
}
select {
case <-ticker.C:
go func() {
// -1 是获取全部coin列表
list := cr.GetScoreList(-1)
// logrus.Info("allCoins3", list)
allScoreChan <- list
}()
}
}
}()
for {
allScore := <-allScoreChan
logrus.Debug("allCoins allScore", allScore)
if len(allScore) == 0 {
continue
}
utils.TickerWrapper(time.Duration(mdura)*time.Second, allScore, func(i int, ary []string) error {
nw := time.Now()
rand.Seed(nw.UnixNano())
// 修改随机逻辑
// 将随机范围分成两部分80%的概率获取最近30%的数据20%的概率获取剩余历史数据
var ct int
randVal := rand.Float64()
switch {
case randVal < 0.3:
// 30%的概率获取最近15%的数据
ct = rand.Intn(rge * 15 / 100)
case randVal < 0.5:
// 20%的概率获取最近15%~55%的数据
ct = rand.Intn(rge*40/100) + (rge * 15 / 100)
default:
// 50%的概率获取最近55%~100%的数据
ct = rand.Intn(rge*45/100) + (rge * 55 / 100)
}
minutes, _ := cr.PeriodToMinutes(barPeriod)
tmi := nw.UnixMilli()
tmi = tmi - tmi%60000
tmi = tmi - (int64(ct) * minutes * 60000)
lm := "20"
// logrus.Info("instId: ", ary[i], " limit: ", lm, " onceCount:", onceCount)
if lm == "0" {
lm = "20"
}
restQ := core.RestQueue{
InstId: ary[i],
Bar: barPeriod,
WithWs: false,
Limit: lm,
After: tmi,
}
js, err := json.Marshal(restQ)
logrus.Debug("allCoins lpush js:", string(js))
cr.RedisLocalCli.LPush("restQueue", js)
return err
})
}
}
func main() {
cr := core.Core{}
// level := os.Getenv("TEXUS_LOGLEVEL")
logrus.SetLevel(logrus.DebugLevel)
cr.Init()
ShowSysTime(&cr)
// 从rest接口获取的ticker记录种的交量计入排行榜指定周期刷新一次
go func() {
logrus.Info("LoopRestTicker")
LoopRestTicker(&cr)
}()
// 全员5m
go func() {
logrus.Info("LoopAllCoinsList - 5m")
LoopAllCoinsList(300, "5m", 10220)
}()
// 全员15m candle
go func() {
logrus.Info("LoopAllCoinsList - 15m")
LoopAllCoinsList(300, "15m", 9430)
}()
// 全员30m candle
go func() {
logrus.Info("LoopAllCoinsList - 30m")
LoopAllCoinsList(300, "30m", 7350)
}()
// 全员1H candle
go func() {
logrus.Info("LoopAllCoinsList - 1H")
LoopAllCoinsList(300, "1H", 4300)
}()
// 全员2H candle
go func() {
logrus.Info("LoopAllCoinsList - 2H")
LoopAllCoinsList(300, "2H", 4080)
}()
// 全员4小时candle
go func() {
logrus.Info("LoopAllCoinsList - 4H")
LoopAllCoinsList(300, "4H", 4100)
}()
// 全员6小时candle
go func() {
logrus.Info("LoopAllCoinsList - 6H")
LoopAllCoinsList(360, "6H", 3120)
}()
// 全员12小时candle
go func() {
logrus.Info("LoopAllCoinsList - 12H")
LoopAllCoinsList(360, "12H", 3160)
}()
// 全员1Day candle & maX
go func() {
logrus.Info("LoopAllCoinsList - 1D")
LoopAllCoinsList(400, "1D", 2180)
}()
// 全员2Day candle & maX
go func() {
logrus.Info("LoopAllCoinsList - 2D")
LoopAllCoinsList(400, "2D", 1220)
}()
// 全员5Day candle & maX
go func() {
logrus.Info("LoopAllCoinsList - 5D")
LoopAllCoinsList(420, "5D", 1240)
}()
go func() {
LoopSaveCandle(&cr)
}()
go func() {
core.WriteLogProcess(&cr)
}()
// 永久阻塞
select {}
}