stock rsi push to elasticsearch

This commit is contained in:
zhangkun9038@dingtalk.com 2024-12-21 21:20:46 +08:00
parent 957816e407
commit f9a5a8cae4
5 changed files with 124 additions and 48 deletions

2
go.mod
View File

@ -6,7 +6,7 @@ go 1.21
require (
github.com/go-redis/redis v6.15.9+incompatible
github.com/phyer/core v0.1.62
github.com/phyer/core v0.1.64
github.com/sirupsen/logrus v1.9.3
)

4
go.sum
View File

@ -49,8 +49,8 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y
github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
github.com/phyer/core v0.1.62 h1:WIrdjV+0SG9pnTll1LwphA9nd3EzrwbSyuIJYKorlpw=
github.com/phyer/core v0.1.62/go.mod h1:XZdniJiiZPzOU8+QHPFRQWdvJa6m5Ilj5VClWWI0OQg=
github.com/phyer/core v0.1.64 h1:/FJC51rjD4ATnnr6efrlb+RoZP9l5SUq329+T7QUvWM=
github.com/phyer/core v0.1.64/go.mod h1:XZdniJiiZPzOU8+QHPFRQWdvJa6m5Ilj5VClWWI0OQg=
github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196 h1:P1sxgCsS0VIL38ufZzgUuZLLyY/B+po6kSY7ziNZT7E=
github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196/go.mod h1:iZexs5agdApNlp8HW/FqKgma4Ij1x8/o+ZLcMvY3f80=
github.com/phyer/v5sdkgo v0.1.4 h1:mAxxjPJVTYGuGDarqOcFGkzj5AgqbbzJGsnYmmsbapU=

View File

@ -17,7 +17,7 @@ func main() {
cr.TickerInforocessChan = make(chan *core.TickerInfo)
cr.CandlesProcessChan = make(chan *core.Candle)
cr.MaXProcessChan = make(chan *core.MaX)
cr.RsiProcessChan = make(chan *core.Rsi)
cr.StockRsiProcessChan = make(chan *core.StockRsi)
cr.MakeMaXsChan = make(chan *core.Candle)
cli, _ := cr.GetRedisLocalCli()
cr.RedisRemoteCli = cli

View File

@ -170,16 +170,10 @@ func LoopMakeMaX(cr *core.Core) {
}(cd)
go func(cad *core.Candle) {
time.Sleep(time.Duration(300) * time.Millisecond)
err, ct := MakeRsi(cr, cad, 16)
err, ct := MakeRsi(cr, cad, 14)
logrus.Warn(GetFuncName(), " rsi16 err:", err, " ct:", ct, " cd.InstID:", cd.InstID, " cd.Period:", cd.Period)
// cd.InvokeRestQFromRemote(cr, ct)
}(cd)
go func(cad *core.Candle) {
time.Sleep(time.Duration(300) * time.Millisecond)
err, ct := MakeRsi(cr, cad, 12)
logrus.Warn(GetFuncName(), " rsi12 err:", err, " ct:", ct, " cd.InstID:", cd.InstID, " cd.Period:", cd.Period)
// cd.InvokeRestQFromRemote(cr, ct)
}(cd)
// TODO TODO 这地方不能加延时否则makeMax处理不过来多的就丢弃了造成maX的sortedSet比candle的短很多。后面所有依赖的逻辑都受影响.
// time.Sleep(300 * time.Millisecond)
}
@ -281,7 +275,8 @@ func MakeRsi(cr *core.Core, cl *core.Candle, count int) (error, int) {
// keyName := "candle" + cl.Period + "|" + cl.InstID + "|ts:" + tss
lastTime := time.UnixMilli(tsi)
setName := "candle" + cl.Period + "|" + cl.InstID + "|sortedSet"
cdl, err := GetRangeCandleSortedSet(cr, setName, count, lastTime)
dcount := count * 2
cdl, err := GetRangeCandleSortedSet(cr, setName, dcount, lastTime)
if err != nil {
return err, 0
}
@ -294,18 +289,29 @@ func MakeRsi(cr *core.Core, cl *core.Candle, count int) (error, int) {
for _, v := range cdl.List {
closeList = append(closeList, ToFloat64(v.Data[4]))
}
rv, err := CalculateRSI(closeList, count)
rsiList, err := CalculateRSI(closeList, count)
if err != nil {
fmt.Println("Error calculating RSI:", err)
return err, 0
}
// rv, err := CalculateRSI(closeList, dcount)
percentK, percentD, err := CalculateStochRSI(rsiList, count, 3, 3)
if err != nil {
fmt.Println("Error calculating StochRSI:", err)
return err, 0
}
if err != nil {
logrus.Error("CalculateRSI err: ", err)
}
rsi := core.Rsi{
rsi := core.StockRsi{
InstID: cl.InstID,
Period: cl.Period,
Timestamp: cl.Timestamp,
Ts: tsi,
Count: count,
LastUpdate: time.Now(),
RsiVol: rv,
KVol: percentK[0],
DVol: percentD[0],
Confirm: false,
}
periodMins, err := cr.PeriodToMinutes(cl.Period)
@ -315,7 +321,7 @@ func MakeRsi(cr *core.Core, cl *core.Candle, count int) (error, int) {
if duration > time.Duration(periodMins-1)*time.Minute {
rsi.Confirm = true
}
cr.RsiProcessChan <- &rsi
cr.StockRsiProcessChan <- &rsi
return nil, 0
}
func MakeMaX(cr *core.Core, cl *core.Candle, count int) (error, int) {
@ -501,14 +507,14 @@ func MaXsProcess(cr *core.Core) {
}
func RsisProcess(cr *core.Core) {
for {
rsi := <-cr.RsiProcessChan
srsi := <-cr.StockRsiProcessChan
// logrus.Debug("mx: ", mx)
go func(rsi *core.Rsi) {
mrs := MyRsi{
Rsi: *rsi,
go func(srsi *core.StockRsi) {
mrs := MyStockRsi{
StockRsi: *srsi,
}
mrs.Process(cr)
}(rsi)
}(srsi)
}
}
@ -529,36 +535,106 @@ func TickerInfoProcess(cr *core.Core) {
// CalculateRSI calculates the RSI value for a given period and price data.
// prices: input price data, must be equal to the period length.
// period: the period for calculating RSI (e.g., 14).
// Returns the RSI value (float64) or an error if input validation fails.
func CalculateRSI(prices []float64, count int) (float64, error) {
if len(prices) != count {
return 0, errors.New("input prices length must be equal to the period length")
// CalculateRSI calculates the Relative Strength Index (RSI) for a given period.
func CalculateRSI(prices []float64, period int) ([]float64, error) {
if len(prices) < period {
return nil, errors.New("not enough data to calculate RSI")
}
// Calculate gains and losses
var gainSum, lossSum float64
for i := 1; i < len(prices); i++ {
rsi := make([]float64, len(prices)-period+1)
var avgGain, avgLoss float64
// Initial average gain and loss
for i := 1; i <= period; i++ {
change := prices[i] - prices[i-1]
if change > 0 {
gainSum += change
avgGain += change
} else {
lossSum -= change
avgLoss -= change
}
}
avgGain /= float64(period)
avgLoss /= float64(period)
if avgLoss == 0 {
rsi[0] = 100
} else {
rs := avgGain / avgLoss
rsi[0] = 100 - (100 / (1 + rs))
}
// Calculate RSI for the rest of the data
for i := period; i < len(prices); i++ {
change := prices[i] - prices[i-1]
if change > 0 {
avgGain = (avgGain*(float64(period)-1) + change) / float64(period)
avgLoss = (avgLoss * (float64(period) - 1)) / float64(period)
} else {
avgGain = (avgGain * (float64(period) - 1)) / float64(period)
avgLoss = (avgLoss*(float64(period)-1) - change) / float64(period)
}
if avgLoss == 0 {
rsi[i-period+1] = 100
} else {
rs := avgGain / avgLoss
rsi[i-period+1] = 100 - (100 / (1 + rs))
}
}
// Average gains and losses
avgGain := gainSum / float64(count)
avgLoss := lossSum / float64(count)
// Prevent division by zero
if avgLoss == 0 {
return 100, nil // RSI is 100 if there's no loss
}
// Calculate RS and RSI
rs := avgGain / avgLoss
rsi := 100 - (100 / (1 + rs))
return rsi, nil
}
// CalculateStochRSI calculates the Stochastic RSI.
func CalculateStochRSI(rsi []float64, period int, kSmoothing int, dSmoothing int) ([]float64, []float64, error) {
if len(rsi) < period {
return nil, nil, errors.New("not enough data to calculate StochRSI")
}
stochRsi := make([]float64, len(rsi)-period+1)
for i := period; i <= len(rsi); i++ {
lowest := rsi[i-period]
highest := rsi[i-period]
for j := i - period + 1; j < i; j++ {
if rsi[j] < lowest {
lowest = rsi[j]
}
if rsi[j] > highest {
highest = rsi[j]
}
}
if highest == lowest {
stochRsi[i-period] = 0
} else {
stochRsi[i-period] = (rsi[i-1] - lowest) / (highest - lowest)
}
}
// Smooth %K
percentK := smooth(stochRsi, kSmoothing)
// Smooth %D (signal line)
percentD := smooth(percentK, dSmoothing)
return percentK, percentD, nil
}
// Smooth applies a simple moving average to smooth the data.
func smooth(data []float64, period int) []float64 {
if period <= 1 || len(data) < period {
return data
}
smoothed := make([]float64, len(data)-period+1)
for i := period - 1; i < len(data); i++ {
sum := 0.0
for j := i - period + 1; j <= i; j++ {
sum += data[j]
}
smoothed[i-period+1] = sum / float64(period)
}
return smoothed
}

View File

@ -17,12 +17,12 @@ import (
// logrus "github.com/sirupsen/logrus"
)
type MyRsi struct {
core.Rsi
type MyStockRsi struct {
core.StockRsi
}
func (mrsi *MyRsi) Process(cr *core.Core) {
rsi := mrsi.Rsi
func (mrsi *MyStockRsi) Process(cr *core.Core) {
rsi := mrsi.StockRsi
go func() {
rsi.PushToWriteLogChan(cr)
}()