From a92106313feaa48f05aba83049b49c24f53a985b Mon Sep 17 00:00:00 2001 From: "zhangkun9038@dingtalk.com" Date: Sun, 29 Dec 2024 15:48:29 +0800 Subject: [PATCH] Revert "use cursor to fix [b[[loopSaveCandle function" This reverts commit b927864dd47f732fee6d48bfe111e1caac8aeffc. --- core/core.go | 35 ----------- go.mod | 1 - go.sum | 3 - main.go | 163 ++++++++++++++++----------------------------------- 4 files changed, 49 insertions(+), 153 deletions(-) delete mode 100644 core/core.go diff --git a/core/core.go b/core/core.go deleted file mode 100644 index 6e253ee..0000000 --- a/core/core.go +++ /dev/null @@ -1,35 +0,0 @@ -package core - -import ( - "context" -) - -type Core struct { - // Add any necessary fields here -} - -func (c *Core) GetScoreList(count int) []string { - // Implementation of GetScoreList -} - -func (c *Core) GetScoreListWithContext(ctx context.Context, count int) ([]string, error) { - done := make(chan []string, 1) - errChan := make(chan error, 1) - - go func() { - list := c.GetScoreList(count) - select { - case done <- list: - case <-ctx.Done(): - } - }() - - select { - case list := <-done: - return list, nil - case err := <-errChan: - return nil, err - case <-ctx.Done(): - return nil, ctx.Err() - } -} diff --git a/go.mod b/go.mod index 8cdda1f..f97cd10 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ require ( github.com/phyer/core v0.1.90 github.com/phyer/v5sdkgo v0.1.4 github.com/sirupsen/logrus v1.9.3 - golang.org/x/sync v0.10.0 ) require ( diff --git a/go.sum b/go.sum index 9c6e4e1..e6e9037 100644 --- a/go.sum +++ b/go.sum @@ -81,10 +81,7 @@ golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= -golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/main.go b/main.go index 85f5575..c75434e 100644 --- a/main.go +++ b/main.go @@ -3,8 +3,6 @@ package main import ( "context" "encoding/json" - "fmt" - // "fmt" "math/rand" "os" @@ -14,7 +12,6 @@ import ( "github.com/phyer/v5sdkgo/rest" "github.com/sirupsen/logrus" - "golang.org/x/sync/errgroup" // "v5sdk_go/ws" // "v5sdk_go/ws/wImpl" @@ -172,128 +169,66 @@ func ShowSysTime(cr *core.Core) { // range: 随机的范围,从0开始到range个周期,作为查询的after值,也就是随机n个周期,去取之前的记录,对于2D,5D等数据,可以用来补全数据, range值越大,随机散点的范围越大, 越失焦 func LoopAllCoinsList(period int64, delay int64, mdura int, barPeriod string, onceCount int, rge int) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cr := core.Core{} cr.Init() - - // Increase buffer size to accommodate multiple updates - // Buffer size should be large enough to handle typical batch size - // but not so large that it consumes too much memory - const channelBufferSize = 100 - allScoreChan := make(chan []string, channelBufferSize) - - // Add channel overflow protection - sendWithTimeout := func(scores []string) { - select { - case allScoreChan <- scores: - // Successfully sent - case <-time.After(5 * time.Second): - logrus.Warn("Failed to send scores to channel - buffer full") - case <-ctx.Done(): - return - } - } - - mainTicker := time.NewTicker(time.Duration(period) * time.Second) - defer mainTicker.Stop() - - var eg errgroup.Group - - // Producer goroutine - eg.Go(func() error { + allScoreChan := make(chan []string) + // logrus.Info("start LoopAllCoinsList: period: ", period, " delay: ", delay, " mdura:", mdura, " barPeriod: ", barPeriod, " onceCount: ", onceCount, " rge:", rge) + per1 := 1 * time.Minute + ticker := time.NewTicker(per1) + go func() { for { + tsi := time.Now().Unix() + //logrus.Info("tsi, period, delay, tsi%(period): ", tsi, period, delay, tsi%(period)) + if tsi%(period) != delay { + time.Sleep(1 * time.Second) + continue + } select { - case <-ctx.Done(): - return nil - case <-mainTicker.C: - if time.Now().Unix()%period != delay { - continue - } - - _, cancel := context.WithTimeout(ctx, 5*time.Second) - list := cr.GetScoreList(-1) - cancel() - - // Use the new send function with timeout - if len(list) > 0 { - sendWithTimeout(list) - } + case <-ticker.C: + go func() { + // -1 是获取全部coin列表 + list := cr.GetScoreList(-1) + // logrus.Info("allCoins3", list) + allScoreChan <- list + }() } } - }) + }() + for { + allScore, _ := <-allScoreChan + logrus.Debug("allCoins allScore", allScore) + if len(allScore) == 0 { + continue + } - // Consumer goroutine - eg.Go(func() error { - for { - select { - case <-ctx.Done(): - return nil - case allScore := <-allScoreChan: - if len(allScore) == 0 { - continue - } - - // Process in batches to avoid overwhelming the system - if err := processScores(&cr, allScore, mdura, barPeriod, onceCount, rge); err != nil { - logrus.WithError(err).Error("Failed to process scores") - // Consider implementing exponential backoff retry here - } + utils.TickerWrapper(time.Duration(mdura)*time.Second, allScore, func(i int, ary []string) error { + nw := time.Now() + rand.Seed(nw.UnixNano()) + ct := rand.Intn(rge) + minutes, _ := cr.PeriodToMinutes(barPeriod) + tmi := nw.UnixMilli() + tmi = tmi - tmi%60000 + tmi = tmi - (int64(ct) * minutes * 60000) + lm := strconv.Itoa(onceCount) + // logrus.Info("instId: ", ary[i], " limit: ", lm, " onceCount:", onceCount) + if lm == "0" { + lm = "100" } - } - }) - - if err := eg.Wait(); err != nil { - logrus.WithError(err).Error("LoopAllCoinsList failed") + restQ := core.RestQueue{ + InstId: ary[i], + Bar: barPeriod, + WithWs: false, + Limit: lm, + After: tmi, + } + js, err := json.Marshal(restQ) + logrus.Debug("allCoins lpush js:", string(js)) + cr.RedisLocalCli.LPush("restQueue", js) + return err + }) } } -func processScores(cr *core.Core, allScore []string, mdura int, barPeriod string, onceCount int, rge int) error { - utils.TickerWrapper(time.Duration(mdura)*time.Second, allScore, func(i int, ary []string) error { - nw := time.Now() - // 使用更安全的随机数生成 - randSource := rand.NewSource(nw.UnixNano()) - randGen := rand.New(randSource) - ct := randGen.Intn(rge) - - minutes, err := cr.PeriodToMinutes(barPeriod) - if err != nil { - return fmt.Errorf("failed to get period minutes: %w", err) - } - - tmi := nw.UnixMilli() - tmi = tmi - tmi%60000 - tmi = tmi - (int64(ct) * minutes * 60000) - - limit := onceCount - if limit == 0 { - limit = 100 - } - - restQ := core.RestQueue{ - InstId: ary[i], - Bar: barPeriod, - WithWs: false, - Limit: strconv.Itoa(limit), - After: tmi, - } - - js, err := json.Marshal(restQ) - if err != nil { - return fmt.Errorf("failed to marshal RestQueue: %w", err) - } - logrus.WithFields(logrus.Fields{ - "instId": ary[i], - "bar": barPeriod, - "limit": limit, - }).Debug("Pushing to restQueue") - - return cr.RedisLocalCli.LPush("restQueue", js).Err() - }) - return nil -} - func main() { cr := core.Core{} // level := os.Getenv("TEXUS_LOGLEVEL")