core/main.go

372 lines
9.1 KiB
Go
Raw Normal View History

2024-12-02 14:03:35 +08:00
package main
import (
2024-12-16 13:11:23 +08:00
"context"
2024-12-02 14:03:35 +08:00
"encoding/json"
"fmt"
2024-12-25 16:43:59 +08:00
// "fmt"
2024-12-02 14:03:35 +08:00
"math/rand"
"os"
2024-12-25 16:23:27 +08:00
"strconv"
2024-12-02 14:03:35 +08:00
"strings"
"time"
2024-12-25 16:43:59 +08:00
"github.com/phyer/v5sdkgo/rest"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
2024-12-25 16:43:59 +08:00
2024-12-02 22:28:41 +08:00
// "v5sdk_go/ws"
// "v5sdk_go/ws/wImpl"
2024-12-02 14:03:35 +08:00
simple "github.com/bitly/go-simplejson"
2024-12-02 22:28:41 +08:00
// "github.com/go-redis/redis"
"github.com/phyer/core"
2024-12-07 21:22:18 +08:00
// "github.com/phyer/texus/private"
"github.com/phyer/texus/utils"
2024-12-02 14:03:35 +08:00
)
func init() {
}
// 通过rest接口获取所有ticker信息存入redis的stream和成交量排行榜
func RestTicker(cr *core.Core, dura time.Duration) {
rsp := rest.RESTAPIResult{}
js := simple.Json{}
itemList := []interface{}{}
2024-12-25 16:43:59 +08:00
// logrus.Info("getAllTickerInfo err: ")
2024-12-16 13:11:23 +08:00
rsp1, err := GetAllTickerInfo(cr)
2024-12-02 14:03:35 +08:00
rsp = *rsp1
js1, err := simple.NewJson([]byte(rsp.Body))
js = *js1
if err != nil {
2024-12-25 16:43:59 +08:00
logrus.Error("restTicker err: ", err)
2024-12-02 14:03:35 +08:00
return
}
if len(rsp.Body) == 0 {
2024-12-25 16:43:59 +08:00
logrus.Error("rsp body is null")
2024-12-02 14:03:35 +08:00
return
}
itemList = js.Get("data").MustArray()
2024-12-25 16:43:59 +08:00
// logrus.Info("itemList length:", len(itemList))
2024-12-02 14:03:35 +08:00
// 关注多少个币,在这里设置, 只需要5个币
allTicker := cr.GetScoreList(-1)
2024-12-16 13:11:23 +08:00
redisCli := cr.RedisLocalCli
2024-12-02 14:03:35 +08:00
// 全部币种列表跟特定币种列表进行比对匹配后push到redis
for _, v := range itemList {
tir := core.TickerInfoResp{}
bs, err := json.Marshal(v)
if err != nil {
2024-12-25 16:43:59 +08:00
logrus.Error("restTicker marshal err: ", err)
2024-12-02 14:03:35 +08:00
return
}
err = json.Unmarshal(bs, &tir)
if err != nil {
2024-12-25 16:43:59 +08:00
logrus.Error("restTicker unmarshal err: ", err)
2024-12-02 14:03:35 +08:00
return
}
ti := tir.Convert()
2024-12-23 14:40:32 +08:00
isUsdt := strings.Contains(ti.InstID, "-USDT")
2024-12-02 14:03:35 +08:00
if !isUsdt {
continue
}
if ti.InstType != "SPOT" {
continue
}
ab, _ := json.Marshal(ti)
suffix := ""
env := os.Getenv("GO_ENV")
if env == "demoEnv" {
suffix = "-demoEnv"
}
for _, v := range allTicker {
2024-12-23 14:40:32 +08:00
if v == ti.InstID {
2024-12-04 15:38:12 +08:00
wg := core.WriteLog{
Content: ab,
2024-12-23 14:40:32 +08:00
Tag: "sardine.log.ticker." + tir.InstID,
2024-12-07 21:26:35 +08:00
Id: ti.Id,
2024-12-04 15:38:12 +08:00
}
cr.WriteLogChan <- &wg
2024-12-02 14:03:35 +08:00
redisCli.Publish(core.TICKERINFO_PUBLISH+suffix, string(ab)).Result()
}
}
}
}
2024-12-10 23:05:44 +08:00
func LoopRestTicker(cr *core.Core) {
per1 := 1 * time.Minute
RestTicker(cr, per1)
limiter := time.Tick(per1)
for {
<-limiter
go func() {
RestTicker(cr, per1)
}()
}
}
2024-12-02 14:03:35 +08:00
// 统一受理发起rest请求的请求
func LoopSaveCandle(cr *core.Core) {
for {
2024-12-16 13:11:23 +08:00
ary, err := cr.RedisLocalCli.BRPop(0, "restQueue").Result()
2024-12-02 14:03:35 +08:00
if err != nil {
2024-12-25 16:43:59 +08:00
logrus.Error("brpop err:", err)
2024-12-02 14:03:35 +08:00
continue
}
restQ := core.RestQueue{}
json.Unmarshal([]byte(ary[1]), &restQ)
2024-12-25 16:43:59 +08:00
// logrus.Info("before: ", restQ.InstId)
2024-12-02 14:03:35 +08:00
// before: USDT|position|key
ary1 := strings.Split(restQ.InstId, "|")
if ary1[0] == "USDT" {
// "USDT-USDT" 这个没有意义,忽略
continue
}
if len(ary1) > 1 && ary1[1] == "position" {
restQ.InstId = ary1[0] + "-USDT"
}
2024-12-25 16:43:59 +08:00
// logrus.Info("after: ", restQ.InstId)
2024-12-02 14:03:35 +08:00
// after: restQueue-USDT
go func() {
restQ.Show(cr)
2024-12-03 13:00:50 +08:00
restQ.Save(cr)
2024-12-02 14:03:35 +08:00
}()
}
}
2024-12-16 13:11:23 +08:00
func GetAllTickerInfo(cr *core.Core) (*rest.RESTAPIResult, error) {
rsp, err := RestInvoke(cr, "/api/v5/market/tickers?instType=SPOT", rest.GET)
return rsp, err
}
func RestInvoke(cr *core.Core, subUrl string, method string) (*rest.RESTAPIResult, error) {
restUrl, _ := cr.Cfg.Config.Get("connect").Get("restBaseUrl").String()
//ep, method, uri string, param *map[string]interface{}
rest := rest.NewRESTAPI(restUrl, method, subUrl, nil)
key, _ := cr.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String()
secure, _ := cr.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String()
pass, _ := cr.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String()
isDemo := false
if cr.Env == "demoEnv" {
isDemo = true
}
rest.SetSimulate(isDemo).SetAPIKey(key, secure, pass)
response, err := rest.Run(context.Background())
if err != nil {
2024-12-25 16:43:59 +08:00
logrus.Error("restInvoke1 err:", subUrl, err)
2024-12-16 13:11:23 +08:00
}
return response, err
}
func ShowSysTime(cr *core.Core) {
rsp, _ := RestInvoke(cr, "/api/v5/public/time", rest.GET)
2024-12-25 16:43:59 +08:00
logrus.Info("serverSystem time:", rsp)
2024-12-16 13:11:23 +08:00
}
2024-12-02 14:03:35 +08:00
// period: 每个循环开始的时间点,单位:秒
// delay延时多少秒后去取此值, 单位:秒
2024-12-25 14:55:08 +08:00
// mdura多少个秒之内遍历完获取到的goins列表, 单位:秒
2024-12-03 13:39:48 +08:00
// barPeriod: 周期名字
2024-12-02 14:03:35 +08:00
// onceCount每次获取这个coin几个当前周期的candle数据
// range: 随机的范围从0开始到range个周期作为查询的after值也就是随机n个周期去取之前的记录,对于2D5D等数据可以用来补全数据, range值越大随机散点的范围越大, 越失焦
func LoopAllCoinsList(period int64, delay int64, mdura int, barPeriod string, onceCount int, rge int) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
2024-12-02 14:03:35 +08:00
cr := core.Core{}
cr.Init()
// Increase buffer size to accommodate multiple updates
// Buffer size should be large enough to handle typical batch size
// but not so large that it consumes too much memory
const channelBufferSize = 100
allScoreChan := make(chan []string, channelBufferSize)
// Add channel overflow protection
sendWithTimeout := func(scores []string) {
select {
case allScoreChan <- scores:
// Successfully sent
case <-time.After(5 * time.Second):
logrus.Warn("Failed to send scores to channel - buffer full")
case <-ctx.Done():
return
}
}
mainTicker := time.NewTicker(time.Duration(period) * time.Second)
defer mainTicker.Stop()
var eg errgroup.Group
// Producer goroutine
eg.Go(func() error {
2024-12-02 14:03:35 +08:00
for {
select {
case <-ctx.Done():
return nil
case <-mainTicker.C:
if time.Now().Unix()%period != delay {
continue
}
_, cancel := context.WithTimeout(ctx, 5*time.Second)
list := cr.GetScoreList(-1)
cancel()
// Use the new send function with timeout
if len(list) > 0 {
sendWithTimeout(list)
}
2024-12-02 14:03:35 +08:00
}
}
})
2024-12-02 14:03:35 +08:00
// Consumer goroutine
eg.Go(func() error {
for {
select {
case <-ctx.Done():
return nil
case allScore := <-allScoreChan:
if len(allScore) == 0 {
continue
}
// Process in batches to avoid overwhelming the system
if err := processScores(&cr, allScore, mdura, barPeriod, onceCount, rge); err != nil {
logrus.WithError(err).Error("Failed to process scores")
// Consider implementing exponential backoff retry here
}
2024-12-02 14:03:35 +08:00
}
}
})
if err := eg.Wait(); err != nil {
logrus.WithError(err).Error("LoopAllCoinsList failed")
2024-12-02 14:03:35 +08:00
}
}
func processScores(cr *core.Core, allScore []string, mdura int, barPeriod string, onceCount int, rge int) error {
utils.TickerWrapper(time.Duration(mdura)*time.Second, allScore, func(i int, ary []string) error {
nw := time.Now()
// 使用更安全的随机数生成
randSource := rand.NewSource(nw.UnixNano())
randGen := rand.New(randSource)
ct := randGen.Intn(rge)
minutes, err := cr.PeriodToMinutes(barPeriod)
if err != nil {
return fmt.Errorf("failed to get period minutes: %w", err)
}
tmi := nw.UnixMilli()
tmi = tmi - tmi%60000
tmi = tmi - (int64(ct) * minutes * 60000)
limit := onceCount
if limit == 0 {
limit = 100
}
restQ := core.RestQueue{
InstId: ary[i],
Bar: barPeriod,
WithWs: false,
Limit: strconv.Itoa(limit),
After: tmi,
}
js, err := json.Marshal(restQ)
if err != nil {
return fmt.Errorf("failed to marshal RestQueue: %w", err)
}
logrus.WithFields(logrus.Fields{
"instId": ary[i],
"bar": barPeriod,
"limit": limit,
}).Debug("Pushing to restQueue")
return cr.RedisLocalCli.LPush("restQueue", js).Err()
})
return nil
}
2024-12-02 14:03:35 +08:00
func main() {
cr := core.Core{}
2024-12-25 17:03:47 +08:00
// level := os.Getenv("TEXUS_LOGLEVEL")
2024-12-26 14:30:18 +08:00
logrus.SetLevel(logrus.DebugLevel)
2024-12-02 14:03:35 +08:00
cr.Init()
2024-12-16 13:11:23 +08:00
ShowSysTime(&cr)
2024-12-02 14:03:35 +08:00
// 从rest接口获取的ticker记录种的交量计入排行榜指定周期刷新一次
go func() {
2024-12-25 16:43:59 +08:00
logrus.Info("LoopRestTicker")
2024-12-10 23:05:44 +08:00
LoopRestTicker(&cr)
2024-12-02 14:03:35 +08:00
}()
2024-12-22 12:11:09 +08:00
// 全员5m
go func() {
2024-12-25 16:43:59 +08:00
logrus.Info("LoopAllCoinsList1")
2024-12-27 23:24:39 +08:00
LoopAllCoinsList(6, 0, 60, "5m", 6, 113)
2024-12-22 12:11:09 +08:00
}()
2024-12-03 13:39:48 +08:00
// 全员15m candle
go func() {
2024-12-25 16:43:59 +08:00
logrus.Info("LoopAllCoinsList2")
2024-12-27 23:24:39 +08:00
LoopAllCoinsList(19, 90, 190, "15m", 10, 123)
2024-12-03 13:39:48 +08:00
}()
// 全员30m candle
go func() {
2024-12-25 16:43:59 +08:00
logrus.Info("LoopAllCoinsList2")
2024-12-27 23:24:39 +08:00
LoopAllCoinsList(25, 0, 250, "30m", 15, 117)
2024-12-03 13:39:48 +08:00
}()
2024-12-02 14:03:35 +08:00
// 全员1H candle
2024-12-02 22:28:41 +08:00
go func() {
2024-12-25 16:43:59 +08:00
logrus.Info("LoopAllCoinsList2")
2024-12-27 23:24:39 +08:00
LoopAllCoinsList(38, 0, 380, "1H", 15, 131)
2024-12-02 22:28:41 +08:00
}()
2024-12-02 14:03:35 +08:00
// 全员2H candle
go func() {
2024-12-25 16:43:59 +08:00
logrus.Info("LoopAllCoinsList2")
2024-12-27 23:24:39 +08:00
LoopAllCoinsList(41, 0, 410, "2H", 20, 183)
2024-12-02 14:03:35 +08:00
}()
// 全员4小时candle
2024-12-02 22:28:41 +08:00
go func() {
2024-12-25 16:43:59 +08:00
logrus.Info("LoopAllCoinsList1")
2024-12-27 23:24:39 +08:00
LoopAllCoinsList(69, 0, 690, "4H", 20, 191)
2024-12-02 22:28:41 +08:00
}()
2024-12-02 14:03:35 +08:00
// 全员6小时candle
2024-12-02 22:28:41 +08:00
go func() {
2024-12-25 16:43:59 +08:00
logrus.Info("LoopAllCoinsList1")
2024-12-27 23:24:39 +08:00
LoopAllCoinsList(72, 0, 720, "6H", 20, 203)
2024-12-02 22:28:41 +08:00
}()
2024-12-02 14:03:35 +08:00
// 全员12小时candle
go func() {
2024-12-25 16:43:59 +08:00
logrus.Info("LoopAllCoinsList1")
2024-12-27 23:24:39 +08:00
LoopAllCoinsList(89, 0, 880, "12H", 25, 217)
2024-12-02 14:03:35 +08:00
}()
// 全员1Day candle & maX
2024-12-02 22:28:41 +08:00
go func() {
2024-12-25 16:43:59 +08:00
logrus.Info("LoopAllCoinsList1")
2024-12-27 23:24:39 +08:00
LoopAllCoinsList(94, 4, 940, "1D", 25, 221)
2024-12-02 22:28:41 +08:00
}()
2024-12-02 14:03:35 +08:00
// 全员2Day candle & maX
go func() {
2024-12-25 16:43:59 +08:00
logrus.Info("LoopAllCoinsList1")
2024-12-27 23:24:39 +08:00
LoopAllCoinsList(192, 4, 1920, "2D", 25, 243)
2024-12-02 14:03:35 +08:00
}()
// 全员5Day candle & maX
go func() {
2024-12-25 16:43:59 +08:00
logrus.Info("LoopAllCoinsList1")
2024-12-27 23:24:39 +08:00
LoopAllCoinsList(320, 4, 3200, "5D", 30, 279)
2024-12-02 14:03:35 +08:00
}()
go func() {
LoopSaveCandle(&cr)
}()
go func() {
core.WriteLogProcess(&cr)
}()
2024-12-02 14:03:35 +08:00
// 永久阻塞
select {}
}