texus/main.go

322 lines
8.1 KiB
Go
Raw Permalink Normal View History

2024-12-02 14:03:35 +08:00
package main
import (
2024-12-16 13:11:23 +08:00
"context"
2024-12-02 14:03:35 +08:00
"encoding/json"
2024-12-25 16:43:59 +08:00
// "fmt"
2024-12-02 14:03:35 +08:00
"math/rand"
"os"
"strings"
"time"
2024-12-25 16:43:59 +08:00
"github.com/phyer/v5sdkgo/rest"
"github.com/sirupsen/logrus"
2024-12-02 22:28:41 +08:00
// "v5sdk_go/ws"
// "v5sdk_go/ws/wImpl"
2024-12-02 14:03:35 +08:00
simple "github.com/bitly/go-simplejson"
2024-12-02 22:28:41 +08:00
// "github.com/go-redis/redis"
"github.com/phyer/core"
2024-12-07 21:22:18 +08:00
// "github.com/phyer/texus/private"
"github.com/phyer/texus/utils"
2024-12-02 14:03:35 +08:00
)
func init() {
}
// 通过rest接口获取所有ticker信息存入redis的stream和成交量排行榜
func RestTicker(cr *core.Core, dura time.Duration) {
rsp := rest.RESTAPIResult{}
js := simple.Json{}
itemList := []interface{}{}
2024-12-25 16:43:59 +08:00
// logrus.Info("getAllTickerInfo err: ")
2024-12-16 13:11:23 +08:00
rsp1, err := GetAllTickerInfo(cr)
2024-12-02 14:03:35 +08:00
rsp = *rsp1
js1, err := simple.NewJson([]byte(rsp.Body))
js = *js1
if err != nil {
2024-12-25 16:43:59 +08:00
logrus.Error("restTicker err: ", err)
2024-12-02 14:03:35 +08:00
return
}
if len(rsp.Body) == 0 {
2024-12-25 16:43:59 +08:00
logrus.Error("rsp body is null")
2024-12-02 14:03:35 +08:00
return
}
itemList = js.Get("data").MustArray()
2024-12-25 16:43:59 +08:00
// logrus.Info("itemList length:", len(itemList))
2024-12-02 14:03:35 +08:00
// 关注多少个币,在这里设置, 只需要5个币
allTicker := cr.GetScoreList(-1)
2024-12-16 13:11:23 +08:00
redisCli := cr.RedisLocalCli
2024-12-02 14:03:35 +08:00
// 全部币种列表跟特定币种列表进行比对匹配后push到redis
for _, v := range itemList {
tir := core.TickerInfoResp{}
bs, err := json.Marshal(v)
if err != nil {
2024-12-25 16:43:59 +08:00
logrus.Error("restTicker marshal err: ", err)
2024-12-02 14:03:35 +08:00
return
}
err = json.Unmarshal(bs, &tir)
if err != nil {
2024-12-25 16:43:59 +08:00
logrus.Error("restTicker unmarshal err: ", err)
2024-12-02 14:03:35 +08:00
return
}
ti := tir.Convert()
2024-12-23 14:40:32 +08:00
isUsdt := strings.Contains(ti.InstID, "-USDT")
2024-12-02 14:03:35 +08:00
if !isUsdt {
continue
}
if ti.InstType != "SPOT" {
continue
}
ab, _ := json.Marshal(ti)
suffix := ""
env := os.Getenv("GO_ENV")
if env == "demoEnv" {
suffix = "-demoEnv"
}
for _, v := range allTicker {
2024-12-23 14:40:32 +08:00
if v == ti.InstID {
2024-12-04 15:38:12 +08:00
wg := core.WriteLog{
Content: ab,
2024-12-23 14:40:32 +08:00
Tag: "sardine.log.ticker." + tir.InstID,
2024-12-07 21:26:35 +08:00
Id: ti.Id,
2024-12-04 15:38:12 +08:00
}
cr.WriteLogChan <- &wg
2024-12-02 14:03:35 +08:00
redisCli.Publish(core.TICKERINFO_PUBLISH+suffix, string(ab)).Result()
}
}
}
}
2024-12-10 23:05:44 +08:00
func LoopRestTicker(cr *core.Core) {
per1 := 1 * time.Minute
RestTicker(cr, per1)
limiter := time.Tick(per1)
for {
<-limiter
go func() {
RestTicker(cr, per1)
}()
}
}
2024-12-02 14:03:35 +08:00
// 统一受理发起rest请求的请求
func LoopSaveCandle(cr *core.Core) {
for {
2024-12-16 13:11:23 +08:00
ary, err := cr.RedisLocalCli.BRPop(0, "restQueue").Result()
2024-12-02 14:03:35 +08:00
if err != nil {
2024-12-25 16:43:59 +08:00
logrus.Error("brpop err:", err)
2024-12-02 14:03:35 +08:00
continue
}
restQ := core.RestQueue{}
json.Unmarshal([]byte(ary[1]), &restQ)
2024-12-25 16:43:59 +08:00
// logrus.Info("before: ", restQ.InstId)
2024-12-02 14:03:35 +08:00
// before: USDT|position|key
ary1 := strings.Split(restQ.InstId, "|")
if ary1[0] == "USDT" {
// "USDT-USDT" 这个没有意义,忽略
continue
}
if len(ary1) > 1 && ary1[1] == "position" {
restQ.InstId = ary1[0] + "-USDT"
}
2024-12-25 16:43:59 +08:00
// logrus.Info("after: ", restQ.InstId)
2024-12-02 14:03:35 +08:00
// after: restQueue-USDT
go func() {
restQ.Show(cr)
2024-12-03 13:00:50 +08:00
restQ.Save(cr)
2024-12-02 14:03:35 +08:00
}()
}
}
2024-12-16 13:11:23 +08:00
func GetAllTickerInfo(cr *core.Core) (*rest.RESTAPIResult, error) {
rsp, err := RestInvoke(cr, "/api/v5/market/tickers?instType=SPOT", rest.GET)
return rsp, err
}
func RestInvoke(cr *core.Core, subUrl string, method string) (*rest.RESTAPIResult, error) {
restUrl, _ := cr.Cfg.Config.Get("connect").Get("restBaseUrl").String()
//ep, method, uri string, param *map[string]interface{}
rest := rest.NewRESTAPI(restUrl, method, subUrl, nil)
key, _ := cr.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String()
secure, _ := cr.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String()
pass, _ := cr.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String()
isDemo := false
if cr.Env == "demoEnv" {
isDemo = true
}
rest.SetSimulate(isDemo).SetAPIKey(key, secure, pass)
response, err := rest.Run(context.Background())
if err != nil {
2024-12-25 16:43:59 +08:00
logrus.Error("restInvoke1 err:", subUrl, err)
2024-12-16 13:11:23 +08:00
}
return response, err
}
func ShowSysTime(cr *core.Core) {
rsp, _ := RestInvoke(cr, "/api/v5/public/time", rest.GET)
2024-12-25 16:43:59 +08:00
logrus.Info("serverSystem time:", rsp)
2024-12-16 13:11:23 +08:00
}
2024-12-02 14:03:35 +08:00
// period: 每个循环开始的时间点,单位:秒
// delay延时多少秒后去取此值, 单位:秒
2024-12-25 14:55:08 +08:00
// mdura多少个秒之内遍历完获取到的goins列表, 单位:秒
2024-12-03 13:39:48 +08:00
// barPeriod: 周期名字
2024-12-02 14:03:35 +08:00
// onceCount每次获取这个coin几个当前周期的candle数据
// range: 随机的范围从0开始到range个周期作为查询的after值也就是随机n个周期去取之前的记录,对于2D5D等数据可以用来补全数据, range值越大随机散点的范围越大, 越失焦
func LoopAllCoinsList(mdura int, barPeriod string, rge int) {
2024-12-02 14:03:35 +08:00
cr := core.Core{}
cr.Init()
allScoreChan := make(chan []string)
// logrus.Info("start LoopAllCoinsList: period: ", period, " delay: ", delay, " mdura:", mdura, " barPeriod: ", barPeriod, " onceCount: ", onceCount, " rge:", rge)
per1 := 1 * time.Minute
ticker := time.NewTicker(per1)
go func() {
2024-12-02 14:03:35 +08:00
for {
tsi := time.Now().Unix()
if tsi%int64(mdura) != 0 {
time.Sleep(1 * time.Second)
continue
2024-12-02 14:03:35 +08:00
}
select {
case <-ticker.C:
go func() {
// -1 是获取全部coin列表
list := cr.GetScoreList(-1)
// logrus.Info("allCoins3", list)
allScoreChan <- list
}()
2024-12-02 14:03:35 +08:00
}
}
}()
for {
allScore := <-allScoreChan
logrus.Debug("allCoins allScore", allScore)
if len(allScore) == 0 {
continue
}
utils.TickerWrapper(time.Duration(mdura)*time.Second, allScore, func(i int, ary []string) error {
nw := time.Now()
rand.Seed(nw.UnixNano())
// 修改随机逻辑
// 将随机范围分成两部分80%的概率获取最近30%的数据20%的概率获取剩余历史数据
var ct int
randVal := rand.Float64()
switch {
case randVal < 0.3:
// 30%的概率获取最近15%的数据
ct = rand.Intn(rge * 15 / 100)
case randVal < 0.5:
// 20%的概率获取最近15%~55%的数据
ct = rand.Intn(rge*40/100) + (rge * 15 / 100)
default:
// 50%的概率获取最近55%~100%的数据
ct = rand.Intn(rge*45/100) + (rge * 55 / 100)
}
minutes, _ := cr.PeriodToMinutes(barPeriod)
tmi := nw.UnixMilli()
tmi = tmi - tmi%60000
tmi = tmi - (int64(ct) * minutes * 60000)
2025-01-15 01:39:16 +08:00
lm := "20"
// logrus.Info("instId: ", ary[i], " limit: ", lm, " onceCount:", onceCount)
if lm == "0" {
2025-01-15 01:39:16 +08:00
lm = "20"
}
restQ := core.RestQueue{
InstId: ary[i],
Bar: barPeriod,
WithWs: false,
Limit: lm,
After: tmi,
}
js, err := json.Marshal(restQ)
logrus.Debug("allCoins lpush js:", string(js))
cr.RedisLocalCli.LPush("restQueue", js)
return err
})
}
}
2024-12-02 14:03:35 +08:00
func main() {
cr := core.Core{}
2024-12-25 17:03:47 +08:00
// level := os.Getenv("TEXUS_LOGLEVEL")
logrus.SetLevel(logrus.DebugLevel)
2024-12-02 14:03:35 +08:00
cr.Init()
2024-12-16 13:11:23 +08:00
ShowSysTime(&cr)
2024-12-02 14:03:35 +08:00
// 从rest接口获取的ticker记录种的交量计入排行榜指定周期刷新一次
go func() {
2024-12-25 16:43:59 +08:00
logrus.Info("LoopRestTicker")
2024-12-10 23:05:44 +08:00
LoopRestTicker(&cr)
2024-12-02 14:03:35 +08:00
}()
2024-12-22 12:11:09 +08:00
// 全员5m
go func() {
logrus.Info("LoopAllCoinsList - 5m")
LoopAllCoinsList(300, "5m", 10220)
2024-12-22 12:11:09 +08:00
}()
2024-12-03 13:39:48 +08:00
// 全员15m candle
go func() {
logrus.Info("LoopAllCoinsList - 15m")
LoopAllCoinsList(300, "15m", 9430)
2024-12-03 13:39:48 +08:00
}()
// 全员30m candle
go func() {
logrus.Info("LoopAllCoinsList - 30m")
LoopAllCoinsList(300, "30m", 7350)
2024-12-03 13:39:48 +08:00
}()
2024-12-02 14:03:35 +08:00
// 全员1H candle
2024-12-02 22:28:41 +08:00
go func() {
logrus.Info("LoopAllCoinsList - 1H")
LoopAllCoinsList(300, "1H", 4300)
2024-12-02 22:28:41 +08:00
}()
2024-12-02 14:03:35 +08:00
// 全员2H candle
go func() {
logrus.Info("LoopAllCoinsList - 2H")
LoopAllCoinsList(300, "2H", 4080)
2024-12-02 14:03:35 +08:00
}()
// 全员4小时candle
2024-12-02 22:28:41 +08:00
go func() {
logrus.Info("LoopAllCoinsList - 4H")
LoopAllCoinsList(300, "4H", 4100)
2024-12-02 22:28:41 +08:00
}()
2024-12-02 14:03:35 +08:00
// 全员6小时candle
2024-12-02 22:28:41 +08:00
go func() {
logrus.Info("LoopAllCoinsList - 6H")
LoopAllCoinsList(360, "6H", 3120)
2024-12-02 22:28:41 +08:00
}()
2024-12-02 14:03:35 +08:00
// 全员12小时candle
go func() {
logrus.Info("LoopAllCoinsList - 12H")
LoopAllCoinsList(360, "12H", 3160)
2024-12-02 14:03:35 +08:00
}()
// 全员1Day candle & maX
2024-12-02 22:28:41 +08:00
go func() {
logrus.Info("LoopAllCoinsList - 1D")
LoopAllCoinsList(400, "1D", 2180)
2024-12-02 22:28:41 +08:00
}()
2024-12-02 14:03:35 +08:00
// 全员2Day candle & maX
go func() {
logrus.Info("LoopAllCoinsList - 2D")
LoopAllCoinsList(400, "2D", 1220)
2024-12-02 14:03:35 +08:00
}()
// 全员5Day candle & maX
go func() {
logrus.Info("LoopAllCoinsList - 5D")
LoopAllCoinsList(420, "5D", 1240)
2024-12-02 14:03:35 +08:00
}()
go func() {
LoopSaveCandle(&cr)
}()
go func() {
core.WriteLogProcess(&cr)
}()
2024-12-02 14:03:35 +08:00
// 永久阻塞
select {}
}