上个bug貌似已经解决

This commit is contained in:
zhangkun9038@dingtalk.com 2024-12-25 16:54:46 +08:00
parent 8e36b0b331
commit b56318d385
5 changed files with 31 additions and 30 deletions

10
main.go
View File

@ -1,7 +1,7 @@
package main package main
import ( import (
"fmt" // "fmt"
"os" "os"
"github.com/phyer/core" "github.com/phyer/core"
@ -25,7 +25,7 @@ func main() {
rdsLs, _ := md.GetRemoteRedisConfigList() rdsLs, _ := md.GetRemoteRedisConfigList()
// 目前只有phyer里部署的tunas会发布tickerInfo信息 // 目前只有phyer里部署的tunas会发布tickerInfo信息
fmt.Println("len of rdsLs: ", len(rdsLs)) // fmt.Println("len of rdsLs: ", len(rdsLs))
// 订阅 redis TickerInfo // 订阅 redis TickerInfo
go func(vv *core.RedisConfig) { go func(vv *core.RedisConfig) {
@ -33,7 +33,7 @@ func main() {
if !allowed { if !allowed {
return return
} }
fmt.Println("start subscribe core.TICKERINFO_PUBLISH") logrus.Info("start subscribe core.TICKERINFO_PUBLISH")
md.LoopSubscribe(&cr, core.TICKERINFO_PUBLISH, vv) md.LoopSubscribe(&cr, core.TICKERINFO_PUBLISH, vv)
}(rdsLs[0]) }(rdsLs[0])
@ -43,7 +43,7 @@ func main() {
if !allowed { if !allowed {
return return
} }
fmt.Println("start subscribe core.TICKERINFO_PUBLISH") logrus.Info("start subscribe core.TICKERINFO_PUBLISH")
md.LoopSubscribe(&cr, core.ALLCANDLES_PUBLISH, vv) md.LoopSubscribe(&cr, core.ALLCANDLES_PUBLISH, vv)
}(rdsLs[0]) }(rdsLs[0])
@ -106,6 +106,6 @@ func main() {
// } // }
// allMaxs: {1634413398759-0 map[ma7|candle5m|LUNA-USDT|key:{"ts":1634412300000,"value":36.906796182686605}]} // allMaxs: {1634413398759-0 map[ma7|candle5m|LUNA-USDT|key:{"ts":1634412300000,"value":36.906796182686605}]}
// allCandles: {1634413398859-0 map[candle2H|XRP-USDT|key:{"channel":"candle2H","data":"eyJjIjoxLjExNzk1LCJmcm9tIjoicmVzdCIsImgiOjEuMTIyNzksImwiOjEuMTA4ODUsIm8iOjEuMTE3MzUsInRzIjoxNjM0MjkyMDAwMDAwLCJ2b2wiOjUwMDc5OTEuNDM5MDg1LCJ2b2xDY3kiOjU1OTE2MjUuNzI4NDc2fQ==","instId":"XRP-USDT"}]} // allCandles: {1634413398859-0 map[candle2H|XRP-USDT|key:{"channel":"candle2H","data":"eyJjIjoxLjExNzk1LCJmcm9tIjoicmVzdCIsImgiOjEuMTIyNzksImwiOjEuMTA4ODUsIm8iOjEuMTE3MzUsInRzIjoxNjM0MjkyMDAwMDAwLCJ2b2wiOjUwMDc5OTEuNDM5MDg1LCJ2b2xDY3kiOjU1OTE2MjUuNzI4NDc2fQ==","instId":"XRP-USDT"}]}
fmt.Println("siaga started") logrus.Info("siaga started")
select {} select {}
} }

View File

@ -60,12 +60,12 @@ func LoopSubscribe(cr *core.Core, channelName string, redisConf *core.RedisConfi
if strings.Contains(env, "demoEnv") { if strings.Contains(env, "demoEnv") {
suffix = "-demoEnv" suffix = "-demoEnv"
} }
fmt.Println("loopSubscribe: ", channelName+suffix) logrus.Info("loopSubscribe: ", channelName+suffix)
pubsub := redisRemoteCli.Subscribe(channelName + suffix) pubsub := redisRemoteCli.Subscribe(channelName + suffix)
_, err := pubsub.Receive() _, err := pubsub.Receive()
if err != nil { if err != nil {
// cr.ErrorToRobot(utils.GetFuncName(), err) // cr.ErrorToRobot(utils.GetFuncName(), err)
fmt.Println(GetFuncName(), " ", err) logrus.Error(GetFuncName(), " ", err)
panic(err) panic(err)
} }
@ -92,8 +92,8 @@ func LoopSubscribe(cr *core.Core, channelName string, redisConf *core.RedisConfi
} else { } else {
logrus.Warning("channelname not match", channelName) logrus.Warning("channelname not match", channelName)
} }
logrus.Warning("msg.Payload: ", msg.Payload) logrus.Debug("msg.Payload: ", msg.Payload)
fmt.Println("channelName: ", channelName, " msg.Payload: ", msg.Payload) // fmt.Println("channelName: ", channelName, " msg.Payload: ", msg.Payload)
switch ctype { switch ctype {
// 接收到的candle扔到 candle 二次加工流水线 // 接收到的candle扔到 candle 二次加工流水线
case "candle": case "candle":
@ -235,7 +235,7 @@ func GetRangeCandleSortedSet(cr *core.Core, setName string, count int, from time
} }
logrus.Info("ZRevRangeByScore ", setName, opt) logrus.Info("ZRevRangeByScore ", setName, opt)
ary, err = cli.ZRevRangeByScore(setName, opt).Result() ary, err = cli.ZRevRangeByScore(setName, opt).Result()
fmt.Println("ary: ", ary, " setName: ", setName, " opt: ", opt) // fmt.Println("ary: ", ary, " setName: ", setName, " opt: ", opt)
if err != nil { if err != nil {
return &cdl, err return &cdl, err
} }
@ -301,15 +301,15 @@ func MakeRsi(cr *core.Core, cl *core.Candle, count int, makeStock bool) (error,
} }
cdl.RecursiveBubbleS(len(cdl.List), "asc") cdl.RecursiveBubbleS(len(cdl.List), "asc")
closeList := []float64{} closeList := []float64{}
ll := len(cdl.List) // ll := len(cdl.List)
fmt.Println("candleList len:", ll) // fmt.Println("candleList len:", ll)
for k, v := range cdl.List { for _, v := range cdl.List {
fmt.Println("candle in list", ll, k, v) // fmt.Println("candle in list", ll, k, v)
closeList = append(closeList, ToFloat64(v.Data[4])) closeList = append(closeList, ToFloat64(v.Data[4]))
} }
rsiList, err := CalculateRSI(closeList, count) rsiList, err := CalculateRSI(closeList, count)
if err != nil { if err != nil {
fmt.Println("Error calculating RSI:", err) logrus.Error("Error calculating RSI:", err)
return err, 0 return err, 0
} }
rsi := core.Rsi{ rsi := core.Rsi{
@ -329,9 +329,9 @@ func MakeRsi(cr *core.Core, cl *core.Candle, count int, makeStock bool) (error,
rsi.Confirm = true rsi.Confirm = true
} }
fmt.Println("will send rsi") // fmt.Println("will send rsi")
go func() { go func() {
fmt.Println("make a rsi") // fmt.Println("make a rsi")
cr.RsiProcessChan <- &rsi cr.RsiProcessChan <- &rsi
}() }()
if !makeStock { if !makeStock {
@ -341,7 +341,7 @@ func MakeRsi(cr *core.Core, cl *core.Candle, count int, makeStock bool) (error,
percentK, percentD, err := CalculateStochRSI(rsiList, count, 3, 3) percentK, percentD, err := CalculateStochRSI(rsiList, count, 3, 3)
if err != nil { if err != nil {
fmt.Println("Error calculating StochRSI:", err) logrus.Error("Error calculating StochRSI:", err)
return err, 0 return err, 0
} }
srsi := core.StockRsi{ srsi := core.StockRsi{
@ -356,9 +356,9 @@ func MakeRsi(cr *core.Core, cl *core.Candle, count int, makeStock bool) (error,
Confirm: true, Confirm: true,
} }
fmt.Println("will send stockrsi") // fmt.Println("will send stockrsi")
go func() { go func() {
fmt.Println("make a stockrsi") // fmt.Println("make a stockrsi")
cr.StockRsiProcessChan <- &srsi cr.StockRsiProcessChan <- &srsi
}() }()
@ -420,7 +420,7 @@ func MakeMaX(cr *core.Core, cl *core.Candle, count int) (error, int) {
} }
tm, _ := core.Int64ToTime(tsi) tm, _ := core.Int64ToTime(tsi)
fmt.Println("max tm:", tm) logrus.Debug("max tm:", tm)
mx := core.MaX{ mx := core.MaX{
KeyName: keyName, KeyName: keyName,
InstID: cl.InstID, InstID: cl.InstID,
@ -449,7 +449,7 @@ func CandlesProcess(cr *core.Core) {
cd := <-cr.CandlesProcessChan cd := <-cr.CandlesProcessChan
cd.LastUpdate = time.Now() cd.LastUpdate = time.Now()
// logrus.Debug("cd: ", cd) // logrus.Debug("cd: ", cd)
fmt.Println("candle in process: ", cd) logrus.Debug("candle in process: ", cd)
go func(cad *core.Candle) { go func(cad *core.Candle) {
mcd := MyCandle{ mcd := MyCandle{
Candle: *cad, Candle: *cad,
@ -509,7 +509,7 @@ func MakeSoftCandles(cr *core.Core, mcd *MyCandle) {
Timestamp: ts, Timestamp: ts,
} }
fmt.Println("makeSoftCandles for: ", cd1) // fmt.Println("makeSoftCandles for: ", cd1)
// cd0是从tickerInfo创建的1m Candle克隆来的, Data里只有Data[4]被赋值是last其他都是"-1" // cd0是从tickerInfo创建的1m Candle克隆来的, Data里只有Data[4]被赋值是last其他都是"-1"
// TODO 填充其余几个未赋值的字段除了成交量和成交美元数以外并存入redis待用 // TODO 填充其余几个未赋值的字段除了成交量和成交美元数以外并存入redis待用
// strconv.FormatInt(otmi, 10) // strconv.FormatInt(otmi, 10)
@ -551,7 +551,7 @@ func RsisProcess(cr *core.Core) {
for { for {
rsi := <-cr.RsiProcessChan rsi := <-cr.RsiProcessChan
// logrus.Debug("mx: ", mx) // logrus.Debug("mx: ", mx)
fmt.Println("rsi recieved:", rsi) logrus.Debug("rsi recieved:", rsi)
go func(rsi *core.Rsi) { go func(rsi *core.Rsi) {
mrs := MyRsi{ mrs := MyRsi{
Rsi: *rsi, Rsi: *rsi,
@ -565,7 +565,7 @@ func StockRsisProcess(cr *core.Core) {
for { for {
srsi := <-cr.StockRsiProcessChan srsi := <-cr.StockRsiProcessChan
// logrus.Debug("mx: ", mx) // logrus.Debug("mx: ", mx)
fmt.Println("stockrsi recieved:", srsi) logrus.Debug("stockrsi recieved:", srsi)
go func(srsi *core.StockRsi) { go func(srsi *core.StockRsi) {
mrs := MyStockRsi{ mrs := MyStockRsi{
StockRsi: *srsi, StockRsi: *srsi,

View File

@ -3,7 +3,7 @@ package module
import ( import (
// "encoding/json" // "encoding/json"
// "errors" // "errors"
"fmt" // "fmt"
"github.com/phyer/core" "github.com/phyer/core"
"os" "os"
"strconv" "strconv"
@ -14,7 +14,7 @@ import (
// simple "github.com/bitly/go-simplejson" // simple "github.com/bitly/go-simplejson"
// "github.com/go-redis/redis" // "github.com/go-redis/redis"
// "github.com/phyer/core/utils" // "github.com/phyer/core/utils"
//logrus "github.com/sirupsen/logrus" logrus "github.com/sirupsen/logrus"
) )
type MyMaX struct { type MyMaX struct {
@ -25,7 +25,7 @@ func (mmx *MyMaX) Process(cr *core.Core) {
mx := mmx.MaX mx := mmx.MaX
_, err := mx.SetToKey(cr) _, err := mx.SetToKey(cr)
if err != nil { if err != nil {
fmt.Println("max SetToKey err: ", err) logrus.Error("max SetToKey err: ", err)
return return
} }
go func() { go func() {

View File

@ -2,8 +2,9 @@ package module
import ( import (
// "errors" // "errors"
"fmt" // "fmt"
"github.com/phyer/core" "github.com/phyer/core"
logrus "github.com/sirupsen/logrus"
// "math" // "math"
"os" "os"
"strconv" "strconv"
@ -40,7 +41,7 @@ func (mti *MyTickerInfo) Process(cr *core.Core) {
go func() { go func() {
tickerToCandle := os.Getenv("SIAGA_TICKERTOCANDLE") == "true" tickerToCandle := os.Getenv("SIAGA_TICKERTOCANDLE") == "true"
if tickerToCandle { if tickerToCandle {
fmt.Println("tickerToCandle: ", ti) logrus.Debug("tickerToCandle: ", ti)
cd := mti.ConvertToCandle(cr, "1m") cd := mti.ConvertToCandle(cr, "1m")
cr.CandlesProcessChan <- cd cr.CandlesProcessChan <- cd
} }

BIN
siaga

Binary file not shown.