From b952de1a8acaa0cd6ea69a657169b29ee51761c6 Mon Sep 17 00:00:00 2001 From: "zhangkun9038@dingtalk.com" Date: Sat, 21 Dec 2024 00:22:11 +0800 Subject: [PATCH] =?UTF-8?q?makeRsi=2016=20=E5=91=A8=E6=9C=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 2 +- go.sum | 4 +- main.go | 4 ++ modules/extent.go | 106 +++++++++++++++++++++++++++++++++++++++++++++- modules/rsi.go | 29 +++++++++++++ 5 files changed, 141 insertions(+), 4 deletions(-) create mode 100644 modules/rsi.go diff --git a/go.mod b/go.mod index 42fc8a1..5c3ffb5 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ go 1.21 require ( github.com/go-redis/redis v6.15.9+incompatible - github.com/phyer/core v0.1.55 + github.com/phyer/core v0.1.59 github.com/sirupsen/logrus v1.9.3 ) diff --git a/go.sum b/go.sum index c0e12c8..e815c03 100644 --- a/go.sum +++ b/go.sum @@ -49,8 +49,8 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= -github.com/phyer/core v0.1.55 h1:JSKvtB+taxdz+r/kIwEsMNIbwu06JXnIraqwL3Gzn6Q= -github.com/phyer/core v0.1.55/go.mod h1:XZdniJiiZPzOU8+QHPFRQWdvJa6m5Ilj5VClWWI0OQg= +github.com/phyer/core v0.1.59 h1:3evFXcX5B3BqykjONe1u1fSk+7ZQZfthKEVPwXegns8= +github.com/phyer/core v0.1.59/go.mod h1:XZdniJiiZPzOU8+QHPFRQWdvJa6m5Ilj5VClWWI0OQg= github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196 h1:P1sxgCsS0VIL38ufZzgUuZLLyY/B+po6kSY7ziNZT7E= github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196/go.mod h1:iZexs5agdApNlp8HW/FqKgma4Ij1x8/o+ZLcMvY3f80= github.com/phyer/v5sdkgo v0.1.4 h1:mAxxjPJVTYGuGDarqOcFGkzj5AgqbbzJGsnYmmsbapU= diff --git a/main.go b/main.go index f09dde8..cc98875 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,7 @@ func main() { cr.TickerInforocessChan = make(chan *core.TickerInfo) cr.CandlesProcessChan = make(chan *core.Candle) cr.MaXProcessChan = make(chan *core.MaX) + cr.RsiProcessChan = make(chan *core.Rsi) cr.MakeMaXsChan = make(chan *core.Candle) cli, _ := cr.GetRedisLocalCli() cr.RedisRemoteCli = cli @@ -75,6 +76,9 @@ func main() { go func() { md.MaXsProcess(&cr) }() + go func() { + md.RsisProcess(&cr) + }() // 这些暂时不运行, 以后要不要运行再说 // go func() { diff --git a/modules/extent.go b/modules/extent.go index 0d3a33b..ebebe07 100644 --- a/modules/extent.go +++ b/modules/extent.go @@ -168,6 +168,13 @@ func LoopMakeMaX(cr *core.Core) { logrus.Warn(GetFuncName(), " ma30 err:", err, " ct:", ct, " cd.InstID:", cd.InstID, " cd.Period:", cd.Period) // cd.InvokeRestQFromRemote(cr, ct) }(cd) + go func(cad *core.Candle) { + time.Sleep(time.Duration(300) * time.Millisecond) + err, ct := MakeRsi(cr, cad, 16) + logrus.Warn(GetFuncName(), " ma30 err:", err, " ct:", ct, " cd.InstID:", cd.InstID, " cd.Period:", cd.Period) + // cd.InvokeRestQFromRemote(cr, ct) + }(cd) + // TODO TODO 这地方不能加延时,否则makeMax处理不过来,多的就丢弃了,造成maX的sortedSet比candle的短很多。后面所有依赖的逻辑都受影响. // time.Sleep(300 * time.Millisecond) } @@ -257,7 +264,54 @@ func GetExpiration(cr *core.Core, per string) (time.Duration, error) { dur := time.Duration(exp*49) * time.Minute return dur, err } - +func MakeRsi(cr *core.Core, cl *core.Candle, count int) (error, int) { + data := cl.Data + js, _ := json.Marshal(data) + if len(data) == 0 { + err := errors.New("data is block: " + string(js)) + return err, 0 + } + tsi := ToInt64(data[0]) + // tss := strconv.FormatInt(tsi, 10) + // keyName := "candle" + cl.Period + "|" + cl.InstID + "|ts:" + tss + lastTime := time.UnixMilli(tsi) + setName := "candle" + cl.Period + "|" + cl.InstID + "|sortedSet" + cdl, err := GetRangeCandleSortedSet(cr, setName, count, lastTime) + if err != nil { + return err, 0 + } + // amountLast := float64(0) + // ct := float64(0) + if len(cdl.List) == 0 { + return err, 0 + } + closeList := []float64{} + for _, v := range cdl.List { + closeList = append(closeList, ToFloat64(v.Data[4])) + } + rv, err := CalculateRSI(closeList, count) + if err != nil { + logrus.Error("CalculateRSI err: ", err) + } + rsi := core.Rsi{ + InstID: cl.InstID, + Period: cl.Period, + Timestamp: cl.Timestamp, + Ts: tsi, + LastUpdate: time.Now(), + RsiVol: rv, + Confirm: false, + } + duration := time.Now().Sub(cl.Timestamp) // 获取时间差 + if duration < 0 { + duration = -duration // 将时间差取绝对值 + } + if duration < 3*time.Minute { + rsi.Confirm = true + } + cr.RsiProcessChan <- &rsi + return nil, 0 +} func MakeMaX(cr *core.Core, cl *core.Candle, count int) (error, int) { data := cl.Data js, _ := json.Marshal(data) @@ -439,6 +493,18 @@ func MaXsProcess(cr *core.Core) { }(mx) } } +func RsisProcess(cr *core.Core) { + for { + rsi := <-cr.RsiProcessChan + // logrus.Debug("mx: ", mx) + go func(rsi *core.Rsi) { + mrs := MyRsi{ + Rsi: *rsi, + } + mrs.Process(cr) + }(rsi) + } +} func TickerInfoProcess(cr *core.Core) { for { @@ -452,3 +518,41 @@ func TickerInfoProcess(cr *core.Core) { }(ti) } } + +// 计算 RSI 的函数 + +// CalculateRSI calculates the RSI value for a given period and price data. +// prices: input price data, must be equal to the period length. +// period: the period for calculating RSI (e.g., 14). +// Returns the RSI value (float64) or an error if input validation fails. +func CalculateRSI(prices []float64, count int) (float64, error) { + if len(prices) != count { + return 0, errors.New("input prices length must be equal to the period length") + } + + // Calculate gains and losses + var gainSum, lossSum float64 + for i := 1; i < len(prices); i++ { + change := prices[i] - prices[i-1] + if change > 0 { + gainSum += change + } else { + lossSum -= change + } + } + + // Average gains and losses + avgGain := gainSum / float64(count) + avgLoss := lossSum / float64(count) + + // Prevent division by zero + if avgLoss == 0 { + return 100, nil // RSI is 100 if there's no loss + } + + // Calculate RS and RSI + rs := avgGain / avgLoss + rsi := 100 - (100 / (1 + rs)) + + return rsi, nil +} diff --git a/modules/rsi.go b/modules/rsi.go new file mode 100644 index 0000000..534ca8a --- /dev/null +++ b/modules/rsi.go @@ -0,0 +1,29 @@ +package module + +import ( + // "encoding/json" + // "errors" + // "fmt" + "github.com/phyer/core" + // "os" + // "strconv" + // "strings" + // "sync" + // "time" + // + // simple "github.com/bitly/go-simplejson" + // "github.com/go-redis/redis" + // "github.com/phyer/core/utils" + // logrus "github.com/sirupsen/logrus" +) + +type MyRsi struct { + core.Rsi +} + +func (mrsi *MyRsi) Process(cr *core.Core) { + rsi := mrsi.Rsi + go func() { + rsi.PushToWriteLogChan(cr) + }() +}