From f9a5a8cae40ea9aa7b904b0ea78aa313a373714c Mon Sep 17 00:00:00 2001 From: "zhangkun9038@dingtalk.com" Date: Sat, 21 Dec 2024 21:20:46 +0800 Subject: [PATCH] stock rsi push to elasticsearch --- go.mod | 2 +- go.sum | 4 +- main.go | 2 +- modules/extent.go | 156 ++++++++++++++++++++++++++++++++++------------ modules/rsi.go | 8 +-- 5 files changed, 124 insertions(+), 48 deletions(-) diff --git a/go.mod b/go.mod index 721fc00..e5ad00a 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ go 1.21 require ( github.com/go-redis/redis v6.15.9+incompatible - github.com/phyer/core v0.1.62 + github.com/phyer/core v0.1.64 github.com/sirupsen/logrus v1.9.3 ) diff --git a/go.sum b/go.sum index 70e684d..2106b06 100644 --- a/go.sum +++ b/go.sum @@ -49,8 +49,8 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= -github.com/phyer/core v0.1.62 h1:WIrdjV+0SG9pnTll1LwphA9nd3EzrwbSyuIJYKorlpw= -github.com/phyer/core v0.1.62/go.mod h1:XZdniJiiZPzOU8+QHPFRQWdvJa6m5Ilj5VClWWI0OQg= +github.com/phyer/core v0.1.64 h1:/FJC51rjD4ATnnr6efrlb+RoZP9l5SUq329+T7QUvWM= +github.com/phyer/core v0.1.64/go.mod h1:XZdniJiiZPzOU8+QHPFRQWdvJa6m5Ilj5VClWWI0OQg= github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196 h1:P1sxgCsS0VIL38ufZzgUuZLLyY/B+po6kSY7ziNZT7E= github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196/go.mod h1:iZexs5agdApNlp8HW/FqKgma4Ij1x8/o+ZLcMvY3f80= github.com/phyer/v5sdkgo v0.1.4 h1:mAxxjPJVTYGuGDarqOcFGkzj5AgqbbzJGsnYmmsbapU= diff --git a/main.go b/main.go index cc98875..bcbf223 100644 --- a/main.go +++ b/main.go @@ -17,7 +17,7 @@ func main() { cr.TickerInforocessChan = make(chan *core.TickerInfo) cr.CandlesProcessChan = make(chan *core.Candle) cr.MaXProcessChan = make(chan *core.MaX) - cr.RsiProcessChan = make(chan *core.Rsi) + cr.StockRsiProcessChan = make(chan *core.StockRsi) cr.MakeMaXsChan = make(chan *core.Candle) cli, _ := cr.GetRedisLocalCli() cr.RedisRemoteCli = cli diff --git a/modules/extent.go b/modules/extent.go index 044b284..6e0e664 100644 --- a/modules/extent.go +++ b/modules/extent.go @@ -170,16 +170,10 @@ func LoopMakeMaX(cr *core.Core) { }(cd) go func(cad *core.Candle) { time.Sleep(time.Duration(300) * time.Millisecond) - err, ct := MakeRsi(cr, cad, 16) + err, ct := MakeRsi(cr, cad, 14) logrus.Warn(GetFuncName(), " rsi16 err:", err, " ct:", ct, " cd.InstID:", cd.InstID, " cd.Period:", cd.Period) // cd.InvokeRestQFromRemote(cr, ct) }(cd) - go func(cad *core.Candle) { - time.Sleep(time.Duration(300) * time.Millisecond) - err, ct := MakeRsi(cr, cad, 12) - logrus.Warn(GetFuncName(), " rsi12 err:", err, " ct:", ct, " cd.InstID:", cd.InstID, " cd.Period:", cd.Period) - // cd.InvokeRestQFromRemote(cr, ct) - }(cd) // TODO TODO 这地方不能加延时,否则makeMax处理不过来,多的就丢弃了,造成maX的sortedSet比candle的短很多。后面所有依赖的逻辑都受影响. // time.Sleep(300 * time.Millisecond) } @@ -281,7 +275,8 @@ func MakeRsi(cr *core.Core, cl *core.Candle, count int) (error, int) { // keyName := "candle" + cl.Period + "|" + cl.InstID + "|ts:" + tss lastTime := time.UnixMilli(tsi) setName := "candle" + cl.Period + "|" + cl.InstID + "|sortedSet" - cdl, err := GetRangeCandleSortedSet(cr, setName, count, lastTime) + dcount := count * 2 + cdl, err := GetRangeCandleSortedSet(cr, setName, dcount, lastTime) if err != nil { return err, 0 } @@ -294,18 +289,29 @@ func MakeRsi(cr *core.Core, cl *core.Candle, count int) (error, int) { for _, v := range cdl.List { closeList = append(closeList, ToFloat64(v.Data[4])) } - rv, err := CalculateRSI(closeList, count) + rsiList, err := CalculateRSI(closeList, count) + if err != nil { + fmt.Println("Error calculating RSI:", err) + return err, 0 + } + // rv, err := CalculateRSI(closeList, dcount) + percentK, percentD, err := CalculateStochRSI(rsiList, count, 3, 3) + if err != nil { + fmt.Println("Error calculating StochRSI:", err) + return err, 0 + } if err != nil { logrus.Error("CalculateRSI err: ", err) } - rsi := core.Rsi{ + rsi := core.StockRsi{ InstID: cl.InstID, Period: cl.Period, Timestamp: cl.Timestamp, Ts: tsi, Count: count, LastUpdate: time.Now(), - RsiVol: rv, + KVol: percentK[0], + DVol: percentD[0], Confirm: false, } periodMins, err := cr.PeriodToMinutes(cl.Period) @@ -315,7 +321,7 @@ func MakeRsi(cr *core.Core, cl *core.Candle, count int) (error, int) { if duration > time.Duration(periodMins-1)*time.Minute { rsi.Confirm = true } - cr.RsiProcessChan <- &rsi + cr.StockRsiProcessChan <- &rsi return nil, 0 } func MakeMaX(cr *core.Core, cl *core.Candle, count int) (error, int) { @@ -501,14 +507,14 @@ func MaXsProcess(cr *core.Core) { } func RsisProcess(cr *core.Core) { for { - rsi := <-cr.RsiProcessChan + srsi := <-cr.StockRsiProcessChan // logrus.Debug("mx: ", mx) - go func(rsi *core.Rsi) { - mrs := MyRsi{ - Rsi: *rsi, + go func(srsi *core.StockRsi) { + mrs := MyStockRsi{ + StockRsi: *srsi, } mrs.Process(cr) - }(rsi) + }(srsi) } } @@ -529,36 +535,106 @@ func TickerInfoProcess(cr *core.Core) { // CalculateRSI calculates the RSI value for a given period and price data. // prices: input price data, must be equal to the period length. -// period: the period for calculating RSI (e.g., 14). -// Returns the RSI value (float64) or an error if input validation fails. -func CalculateRSI(prices []float64, count int) (float64, error) { - if len(prices) != count { - return 0, errors.New("input prices length must be equal to the period length") + +// CalculateRSI calculates the Relative Strength Index (RSI) for a given period. +func CalculateRSI(prices []float64, period int) ([]float64, error) { + if len(prices) < period { + return nil, errors.New("not enough data to calculate RSI") } - // Calculate gains and losses - var gainSum, lossSum float64 - for i := 1; i < len(prices); i++ { + rsi := make([]float64, len(prices)-period+1) + var avgGain, avgLoss float64 + + // Initial average gain and loss + for i := 1; i <= period; i++ { change := prices[i] - prices[i-1] if change > 0 { - gainSum += change + avgGain += change } else { - lossSum -= change + avgLoss -= change + } + } + avgGain /= float64(period) + avgLoss /= float64(period) + + if avgLoss == 0 { + rsi[0] = 100 + } else { + rs := avgGain / avgLoss + rsi[0] = 100 - (100 / (1 + rs)) + } + + // Calculate RSI for the rest of the data + for i := period; i < len(prices); i++ { + change := prices[i] - prices[i-1] + if change > 0 { + avgGain = (avgGain*(float64(period)-1) + change) / float64(period) + avgLoss = (avgLoss * (float64(period) - 1)) / float64(period) + } else { + avgGain = (avgGain * (float64(period) - 1)) / float64(period) + avgLoss = (avgLoss*(float64(period)-1) - change) / float64(period) + } + + if avgLoss == 0 { + rsi[i-period+1] = 100 + } else { + rs := avgGain / avgLoss + rsi[i-period+1] = 100 - (100 / (1 + rs)) } } - // Average gains and losses - avgGain := gainSum / float64(count) - avgLoss := lossSum / float64(count) - - // Prevent division by zero - if avgLoss == 0 { - return 100, nil // RSI is 100 if there's no loss - } - - // Calculate RS and RSI - rs := avgGain / avgLoss - rsi := 100 - (100 / (1 + rs)) - return rsi, nil } + +// CalculateStochRSI calculates the Stochastic RSI. +func CalculateStochRSI(rsi []float64, period int, kSmoothing int, dSmoothing int) ([]float64, []float64, error) { + if len(rsi) < period { + return nil, nil, errors.New("not enough data to calculate StochRSI") + } + + stochRsi := make([]float64, len(rsi)-period+1) + for i := period; i <= len(rsi); i++ { + lowest := rsi[i-period] + highest := rsi[i-period] + for j := i - period + 1; j < i; j++ { + if rsi[j] < lowest { + lowest = rsi[j] + } + if rsi[j] > highest { + highest = rsi[j] + } + } + + if highest == lowest { + stochRsi[i-period] = 0 + } else { + stochRsi[i-period] = (rsi[i-1] - lowest) / (highest - lowest) + } + } + + // Smooth %K + percentK := smooth(stochRsi, kSmoothing) + + // Smooth %D (signal line) + percentD := smooth(percentK, dSmoothing) + + return percentK, percentD, nil +} + +// Smooth applies a simple moving average to smooth the data. +func smooth(data []float64, period int) []float64 { + if period <= 1 || len(data) < period { + return data + } + + smoothed := make([]float64, len(data)-period+1) + for i := period - 1; i < len(data); i++ { + sum := 0.0 + for j := i - period + 1; j <= i; j++ { + sum += data[j] + } + smoothed[i-period+1] = sum / float64(period) + } + + return smoothed +} diff --git a/modules/rsi.go b/modules/rsi.go index 534ca8a..ffbf506 100644 --- a/modules/rsi.go +++ b/modules/rsi.go @@ -17,12 +17,12 @@ import ( // logrus "github.com/sirupsen/logrus" ) -type MyRsi struct { - core.Rsi +type MyStockRsi struct { + core.StockRsi } -func (mrsi *MyRsi) Process(cr *core.Core) { - rsi := mrsi.Rsi +func (mrsi *MyStockRsi) Process(cr *core.Core) { + rsi := mrsi.StockRsi go func() { rsi.PushToWriteLogChan(cr) }()