From fb0d242307b8949f22dd3cf77361e2f3128f7f53 Mon Sep 17 00:00:00 2001 From: "zhangkun9038@dingtalk.com" Date: Sat, 21 Dec 2024 23:04:36 +0800 Subject: [PATCH] stock rsi push to elasticsearch --- main.go | 3 +++ modules/extent.go | 47 ++++++++++++++++++++++++++++++++++++----------- modules/rsi.go | 11 +++++++++++ 3 files changed, 50 insertions(+), 11 deletions(-) diff --git a/main.go b/main.go index bcbf223..509fb93 100644 --- a/main.go +++ b/main.go @@ -79,6 +79,9 @@ func main() { go func() { md.RsisProcess(&cr) }() + go func() { + md.StockRsisProcess(&cr) + }() // 这些暂时不运行, 以后要不要运行再说 // go func() { diff --git a/modules/extent.go b/modules/extent.go index 20e267b..5941d04 100644 --- a/modules/extent.go +++ b/modules/extent.go @@ -298,30 +298,43 @@ func MakeRsi(cr *core.Core, cl *core.Candle, count int) (error, int) { fmt.Println("Error calculating RSI:", err) return err, 0 } - percentK, percentD, err := CalculateStochRSI(rsiList, count, 3, 3) - if err != nil { - fmt.Println("Error calculating StochRSI:", err) - return err, 0 - } - rsi := core.StockRsi{ + rsi := core.Rsi{ InstID: cl.InstID, Period: cl.Period, Timestamp: cl.Timestamp, Ts: tsi, Count: count, LastUpdate: time.Now(), - KVol: percentK[0], - DVol: percentD[0], + RsiVol: rsiList[len(rsiList)-1], Confirm: false, } periodMins, err := cr.PeriodToMinutes(cl.Period) - duration := rsi.LastUpdate.Sub(cl.Timestamp) // 获取时间差 //最后更新时间差不多大于一个周期,判定为已完成 if duration > time.Duration(periodMins-1)*time.Minute { rsi.Confirm = true } - cr.StockRsiProcessChan <- &rsi + cr.RsiProcessChan <- &rsi + + percentK, percentD, err := CalculateStochRSI(rsiList, count, 3, 3) + if err != nil { + fmt.Println("Error calculating StochRSI:", err) + return err, 0 + } + + srsi := core.StockRsi{ + InstID: cl.InstID, + Period: cl.Period, + Timestamp: cl.Timestamp, + Ts: tsi, + Count: count, + LastUpdate: time.Now(), + KVol: percentK[len(percentK)-1], + DVol: percentD[len(percentD)-1], + Confirm: true, + } + + cr.StockRsiProcessChan <- &srsi return nil, 0 } func MakeMaX(cr *core.Core, cl *core.Candle, count int) (error, int) { @@ -506,6 +519,19 @@ func MaXsProcess(cr *core.Core) { } } func RsisProcess(cr *core.Core) { + for { + rsi := <-cr.RsiProcessChan + // logrus.Debug("mx: ", mx) + go func(rsi *core.Rsi) { + mrs := MyRsi{ + Rsi: *rsi, + } + mrs.Process(cr) + }(rsi) + } +} + +func StockRsisProcess(cr *core.Core) { for { srsi := <-cr.StockRsiProcessChan // logrus.Debug("mx: ", mx) @@ -517,7 +543,6 @@ func RsisProcess(cr *core.Core) { }(srsi) } } - func TickerInfoProcess(cr *core.Core) { for { ti := <-cr.TickerInforocessChan diff --git a/modules/rsi.go b/modules/rsi.go index ffbf506..623b67c 100644 --- a/modules/rsi.go +++ b/modules/rsi.go @@ -17,6 +17,17 @@ import ( // logrus "github.com/sirupsen/logrus" ) +type MyRsi struct { + core.Rsi +} + +func (mrsi *MyRsi) Process(cr *core.Core) { + rsi := mrsi.Rsi + go func() { + rsi.PushToWriteLogChan(cr) + }() +} + type MyStockRsi struct { core.StockRsi }