实现maX
This commit is contained in:
		
							parent
							
								
									bae89b0ad2
								
							
						
					
					
						commit
						9e7aa4fb94
					
				
							
								
								
									
										4
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.mod
									
									
									
									
									
								
							@ -5,13 +5,13 @@ replace github.com/phyer/siaga/modules => ./modules
 | 
			
		||||
go 1.21
 | 
			
		||||
 | 
			
		||||
require (
 | 
			
		||||
	github.com/bitly/go-simplejson v0.5.0
 | 
			
		||||
	github.com/go-redis/redis v6.15.9+incompatible
 | 
			
		||||
	github.com/phyer/core v0.1.12
 | 
			
		||||
	github.com/phyer/core v0.1.18
 | 
			
		||||
	github.com/sirupsen/logrus v1.9.3
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
require (
 | 
			
		||||
	github.com/bitly/go-simplejson v0.5.0 // indirect
 | 
			
		||||
	github.com/onsi/ginkgo v1.16.5 // indirect
 | 
			
		||||
	github.com/onsi/gomega v1.18.1 // indirect
 | 
			
		||||
	github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196 // indirect
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										4
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.sum
									
									
									
									
									
								
							@ -48,8 +48,8 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y
 | 
			
		||||
github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
 | 
			
		||||
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
 | 
			
		||||
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
 | 
			
		||||
github.com/phyer/core v0.1.12 h1:fOvE0T3obnzPUqT8F78i/rIwl/opa7ZhB3xx96GIFPc=
 | 
			
		||||
github.com/phyer/core v0.1.12/go.mod h1:oVP5mvnnJvI2Qxlnh4jYGj92DbH7XyY2xeRagQ3hdo8=
 | 
			
		||||
github.com/phyer/core v0.1.18 h1:pXQ2QDvkbCVtqcmaQl2nCa7LjYYeJkYVQdb26HcTvgc=
 | 
			
		||||
github.com/phyer/core v0.1.18/go.mod h1:oVP5mvnnJvI2Qxlnh4jYGj92DbH7XyY2xeRagQ3hdo8=
 | 
			
		||||
github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196 h1:P1sxgCsS0VIL38ufZzgUuZLLyY/B+po6kSY7ziNZT7E=
 | 
			
		||||
github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196/go.mod h1:iZexs5agdApNlp8HW/FqKgma4Ij1x8/o+ZLcMvY3f80=
 | 
			
		||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										12
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										12
									
								
								main.go
									
									
									
									
									
								
							@ -60,12 +60,12 @@ func main() {
 | 
			
		||||
	go func() {
 | 
			
		||||
		md.CandlesProcess(&cr)
 | 
			
		||||
	}()
 | 
			
		||||
	// go func() {
 | 
			
		||||
	// 	core.MaXsProcess(&cr)
 | 
			
		||||
	// }()
 | 
			
		||||
	// go func() {
 | 
			
		||||
	// 	core.TickerInfoProcess(&cr)
 | 
			
		||||
	// }()
 | 
			
		||||
	go func() {
 | 
			
		||||
		md.MaXsProcess(&cr)
 | 
			
		||||
	}()
 | 
			
		||||
	go func() {
 | 
			
		||||
		md.TickerInfoProcess(&cr)
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	// 这些暂时不运行, 以后要不要运行再说
 | 
			
		||||
	// go func() {
 | 
			
		||||
 | 
			
		||||
@ -12,7 +12,7 @@ import (
 | 
			
		||||
	"time"
 | 
			
		||||
	//
 | 
			
		||||
	// simple "github.com/bitly/go-simplejson"
 | 
			
		||||
	"github.com/go-redis/redis"
 | 
			
		||||
	// "github.com/go-redis/redis"
 | 
			
		||||
	// "github.com/phyer/core/utils"
 | 
			
		||||
	logrus "github.com/sirupsen/logrus"
 | 
			
		||||
)
 | 
			
		||||
@ -29,8 +29,9 @@ func (cd *MyCandle) Process(cr *core.Core) {
 | 
			
		||||
	if !founded {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	// 把candle存下来
 | 
			
		||||
	go func() {
 | 
			
		||||
		saveCandle := os.Getenv("SARDINE_SAVECANDLE")
 | 
			
		||||
		saveCandle := os.Getenv("SIAGA_SAVECANDLE")
 | 
			
		||||
		if saveCandle == "true" {
 | 
			
		||||
			_, err := cd.SetToKey(cr)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
@ -40,7 +41,7 @@ func (cd *MyCandle) Process(cr *core.Core) {
 | 
			
		||||
	}()
 | 
			
		||||
	// TODO update plate and coaster
 | 
			
		||||
	go func() {
 | 
			
		||||
		makeSeries := os.Getenv("SARDINE_MAKESERIES")
 | 
			
		||||
		makeSeries := os.Getenv("SIAGA_MAKESERIES")
 | 
			
		||||
		if makeSeries == "true" {
 | 
			
		||||
			_, err := cd.InsertIntoPlate(cr)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
@ -51,7 +52,7 @@ func (cd *MyCandle) Process(cr *core.Core) {
 | 
			
		||||
 | 
			
		||||
	// 由于max可以从远端直接拿,不需要sardine自己做,所以可以sardine做也可以不做
 | 
			
		||||
	go func(cad *MyCandle) {
 | 
			
		||||
		makeMaX := os.Getenv("SARDINE_MAKEMAX")
 | 
			
		||||
		makeMaX := os.Getenv("SIAGA_MAKEMAX")
 | 
			
		||||
		if makeMaX == "true" {
 | 
			
		||||
			// 等一会儿防止candle还没有加进CoinMap
 | 
			
		||||
			time.Sleep(200 * time.Millisecond)
 | 
			
		||||
@ -64,19 +65,20 @@ func (cd *MyCandle) Process(cr *core.Core) {
 | 
			
		||||
		makeSoft := false
 | 
			
		||||
		// makeVolSoft := true
 | 
			
		||||
		// makeVolSoft := false
 | 
			
		||||
		if os.Getenv("SARDINE_MAKESOFTCANDLE") == "true" {
 | 
			
		||||
		if os.Getenv("SIAGA_MAKESOFTCANDLE") == "true" {
 | 
			
		||||
			makeSoft = true
 | 
			
		||||
		}
 | 
			
		||||
		// 根据低维度candle,模拟出"软"的高纬度candle
 | 
			
		||||
		if cad.Period == "1m" && makeSoft {
 | 
			
		||||
			fmt.Println("makeSoft:", cad.Period, cad.InstId)
 | 
			
		||||
			MakeSoftCandles(cr, &cad.Candle)
 | 
			
		||||
			fmt.Println("makeSoft:", cad.Period, cad.InstID)
 | 
			
		||||
			MakeSoftCandles(cr, cad)
 | 
			
		||||
		}
 | 
			
		||||
	}(cd)
 | 
			
		||||
	go func(cad *MyCandle) {
 | 
			
		||||
		time.Sleep(100 * time.Millisecond)
 | 
			
		||||
		cr.AddToGeneralCandleChnl(cad)
 | 
			
		||||
	}(cd)
 | 
			
		||||
	// 再次发布到redis channel, 給可能的收听者,我觉得没有这个必要了吧
 | 
			
		||||
	// go func(cad *MyCandle) {
 | 
			
		||||
	// 	time.Sleep(100 * time.Millisecond)
 | 
			
		||||
	// 	cad.AddToGeneralCandleChnl(cr)
 | 
			
		||||
	// }(cd)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (cd *MyCandle) InsertIntoPlate(cr *core.Core) (*core.Sample, error) {
 | 
			
		||||
@ -100,6 +102,87 @@ func (cd *MyCandle) InsertIntoPlate(cr *core.Core) (*core.Sample, error) {
 | 
			
		||||
		err := errors.New(erstr)
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	sm, err := po.RPushSample(cr, *cd, "candle")
 | 
			
		||||
	sm, err := po.RPushSample(cr, &cd.Candle, "candle")
 | 
			
		||||
	return sm, err
 | 
			
		||||
}
 | 
			
		||||
func (cad *MyCandle) AddToGeneralCandleChnl(cr *core.Core) {
 | 
			
		||||
	suffix := ""
 | 
			
		||||
	env := os.Getenv("GO_ENV")
 | 
			
		||||
	if strings.Contains(env, "demoEnv") {
 | 
			
		||||
		suffix = "-demoEnv"
 | 
			
		||||
	}
 | 
			
		||||
	redisCli := cr.RedisLocalCli
 | 
			
		||||
	ab, _ := json.Marshal(cad.Candle)
 | 
			
		||||
	_, err := redisCli.Publish(core.ALLCANDLES_PUBLISH+suffix, string(ab)).Result()
 | 
			
		||||
	// logrus.Debug("publish, res,err:", res, err, "candle:", string(ab))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		logrus.Debug("err of ma7|ma30 add to redis2:", err, cad.Candle.From)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TODO  用key名 BTC-USDT|3M|CandleInfo 维护唯一的一份。
 | 
			
		||||
// 当这个key不存在的时候,创建这个key。周期的起始时间,目前MakeSoftCandle函数已经实现了这个逻辑。
 | 
			
		||||
// 当存在时,检查现有key里的时间信息,和刚创建的周期其实时间一致不一致,
 | 
			
		||||
// 如果一致,更新这个CandleInfo的last值,如果需要的话,更新high和low值。
 | 
			
		||||
// 当周期起始时间和已经保存的周期起始时间不一致的时候,说明到了跨周期的时间点了。这时候更新现有的key里保存的open,high,low,close信息为当前tickerInfo的last值。
 | 
			
		||||
func (mcd MyCandle) GetSetCandleInfo(cr *core.Core, newPeriod string, ts int64) []interface{} {
 | 
			
		||||
	tss := strconv.FormatInt(ts, 10)
 | 
			
		||||
	cd := mcd.Candle
 | 
			
		||||
	keyName := cd.InstID + "|" + newPeriod + "|candleData"
 | 
			
		||||
	str, _ := cr.RedisLocalCli.Get(keyName).Result()
 | 
			
		||||
	odata := []interface{}{}
 | 
			
		||||
	founded := false
 | 
			
		||||
 | 
			
		||||
	if len(str) > 0 {
 | 
			
		||||
		err := json.Unmarshal([]byte(str), &odata)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			logrus.Panic(GetFuncName(), " str:", str, " err:", err)
 | 
			
		||||
		} else {
 | 
			
		||||
			founded = true
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if !founded {
 | 
			
		||||
		cd.Data[0] = tss        // 时间,数据都是新的
 | 
			
		||||
		cd.Data[1] = cd.Data[4] //开盘价
 | 
			
		||||
		cd.Data[2] = cd.Data[4] //最高价
 | 
			
		||||
		cd.Data[3] = cd.Data[4] //最低价
 | 
			
		||||
		bj, _ := json.Marshal(cd.Data)
 | 
			
		||||
		cr.RedisLocalCli.Set(keyName, string(bj), 0)
 | 
			
		||||
		return cd.Data
 | 
			
		||||
	} else {
 | 
			
		||||
		// 发现原有的candleData,且还没过期
 | 
			
		||||
 | 
			
		||||
		if (odata[0]).(string) == tss {
 | 
			
		||||
			oHigh := ToFloat64(odata[2])
 | 
			
		||||
			oLow := ToFloat64(odata[3])
 | 
			
		||||
			cd.Data[0] = tss
 | 
			
		||||
			cd.Data[1] = odata[1]
 | 
			
		||||
			cd.Data[5] = odata[5]
 | 
			
		||||
			cd.Data[6] = odata[6]
 | 
			
		||||
			if oHigh <= ToFloat64(cd.Data[4]) {
 | 
			
		||||
				cd.Data[2] = cd.Data[4]
 | 
			
		||||
			} else {
 | 
			
		||||
				cd.Data[2] = odata[2]
 | 
			
		||||
			}
 | 
			
		||||
			if oLow >= ToFloat64(cd.Data[4]) {
 | 
			
		||||
				cd.Data[3] = cd.Data[4]
 | 
			
		||||
			} else {
 | 
			
		||||
				cd.Data[3] = odata[3]
 | 
			
		||||
			}
 | 
			
		||||
			bj, _ := json.Marshal(cd.Data)
 | 
			
		||||
			cr.RedisLocalCli.Set(keyName, string(bj), 0)
 | 
			
		||||
			return cd.Data
 | 
			
		||||
		} else {
 | 
			
		||||
			// 发现原有的candleData,但是已经过期了
 | 
			
		||||
			cd.Data[0] = tss        // 新开始一个cd周期,时间,数据都是新的
 | 
			
		||||
			cd.Data[1] = cd.Data[4] //开盘价
 | 
			
		||||
			cd.Data[2] = cd.Data[4] //最高价
 | 
			
		||||
			cd.Data[3] = cd.Data[4] //最低价
 | 
			
		||||
			cd.Data[5] = odata[5]
 | 
			
		||||
			cd.Data[6] = odata[6]
 | 
			
		||||
			bj, _ := json.Marshal(cd.Data)
 | 
			
		||||
			cr.RedisLocalCli.Set(keyName, string(bj), 0)
 | 
			
		||||
			return cd.Data
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -152,7 +152,7 @@ func LoopMakeMaX(cr *core.Core) {
 | 
			
		||||
			// sz := utils.ShaiziInt(1500) + 500
 | 
			
		||||
			time.Sleep(time.Duration(30) * time.Millisecond)
 | 
			
		||||
			err, ct := MakeMaX(cr, cad, 7)
 | 
			
		||||
			logrus.Warn(GetFuncName(), " ma7 err:", err, " ct:", ct, " cd.InstID:", cd.InstId, " cd.Period:", cd.Period)
 | 
			
		||||
			logrus.Warn(GetFuncName(), " ma7 err:", err, " ct:", ct, " cd.InstID:", cd.InstID, " cd.Period:", cd.Period)
 | 
			
		||||
			//TODO 这个思路不错,单行不通,远程redis禁不住这么频繁的请求
 | 
			
		||||
			// cd.InvokeRestQFromRemote(cr, ct)
 | 
			
		||||
		}(cd)
 | 
			
		||||
@ -161,7 +161,7 @@ func LoopMakeMaX(cr *core.Core) {
 | 
			
		||||
			// sz := utils.ShaiziInt(2000) + 500
 | 
			
		||||
			time.Sleep(time.Duration(30) * time.Millisecond)
 | 
			
		||||
			err, ct := MakeMaX(cr, cad, 30)
 | 
			
		||||
			logrus.Warn(GetFuncName(), " ma30 err:", err, " ct:", ct, " cd.InstID:", cd.InstId, " cd.Period:", cd.Period)
 | 
			
		||||
			logrus.Warn(GetFuncName(), " ma30 err:", err, " ct:", ct, " cd.InstID:", cd.InstID, " cd.Period:", cd.Period)
 | 
			
		||||
			// cd.InvokeRestQFromRemote(cr, ct)
 | 
			
		||||
		}(cd)
 | 
			
		||||
		// TODO TODO 这地方不能加延时,否则makeMax处理不过来,多的就丢弃了,造成maX的sortedSet比candle的短很多。后面所有依赖的逻辑都受影响.
 | 
			
		||||
@ -266,13 +266,13 @@ func MakeMaX(cr *core.Core, cl *core.Candle, count int) (error, int) {
 | 
			
		||||
	// tsa := time.UnixMilli(tsi).Format("01-02 15:03:04")
 | 
			
		||||
	// fmt.Println("MakeMaX candle: ", cl.InstID, cl.Period, tsa, cl.From)
 | 
			
		||||
	tss := strconv.FormatInt(tsi, 10)
 | 
			
		||||
	keyName := "candle" + cl.Period + "|" + cl.InstId + "|ts:" + tss
 | 
			
		||||
	keyName := "candle" + cl.Period + "|" + cl.InstID + "|ts:" + tss
 | 
			
		||||
	//过期时间:根号(当前candle的周期/1分钟)*10000
 | 
			
		||||
 | 
			
		||||
	lastTime := time.UnixMilli(tsi)
 | 
			
		||||
	// lasts := lastTime.Format("2006-01-02 15:04")
 | 
			
		||||
	// 以当前candle的时间戳为起点倒推count个周期,取得所需candle用于计算maX
 | 
			
		||||
	setName := "candle" + cl.Period + "|" + cl.InstId + "|sortedSet"
 | 
			
		||||
	setName := "candle" + cl.Period + "|" + cl.InstID + "|sortedSet"
 | 
			
		||||
	// cdl, err := cr.GetLastCandleListOfCoin(cl.InstID, cl.Period, count, lastTime)
 | 
			
		||||
	cdl, err := GetRangeCandleSortedSet(cr, setName, count, lastTime)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@ -310,7 +310,7 @@ func MakeMaX(cr *core.Core, cl *core.Candle, count int) (error, int) {
 | 
			
		||||
	}
 | 
			
		||||
	mx := core.MaX{
 | 
			
		||||
		KeyName: keyName,
 | 
			
		||||
		InstID:  cl.InstId,
 | 
			
		||||
		InstID:  cl.InstID,
 | 
			
		||||
		Period:  cl.Period,
 | 
			
		||||
		From:    cl.From,
 | 
			
		||||
		Count:   count,
 | 
			
		||||
@ -349,7 +349,7 @@ func CandlesProcess(cr *core.Core) {
 | 
			
		||||
// "count": 1
 | 
			
		||||
// },
 | 
			
		||||
// 从startTime开始,经历整数个(count * seg)之后,还能不大于分钟粒度的当前时间的话,那个时间点就是最近的当前段起始时间点
 | 
			
		||||
func MakeSoftCandles(cr *core.Core, cd *core.Candle) {
 | 
			
		||||
func MakeSoftCandles(cr *core.Core, mcd *MyCandle) {
 | 
			
		||||
	segments := cr.Cfg.Config.Get("softCandleSegmentList").MustArray()
 | 
			
		||||
	for k, v := range segments {
 | 
			
		||||
		cs := core.CandleSegment{}
 | 
			
		||||
@ -362,7 +362,7 @@ func MakeSoftCandles(cr *core.Core, cd *core.Candle) {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		// TODO: 通过序列化和反序列化,对原始的candle进行克隆,因为是对引用进行操作,所以每个seg里对candle进行操作都会改变原始对象,这和预期不符
 | 
			
		||||
		bt, _ := json.Marshal(cd)
 | 
			
		||||
		bt, _ := json.Marshal(mcd.Candle)
 | 
			
		||||
		cd0 := core.Candle{}
 | 
			
		||||
		json.Unmarshal(bt, &cd0)
 | 
			
		||||
 | 
			
		||||
@ -384,7 +384,7 @@ func MakeSoftCandles(cr *core.Core, cd *core.Candle) {
 | 
			
		||||
		}
 | 
			
		||||
		otmi := otm.UnixMilli()
 | 
			
		||||
		cd1 := core.Candle{
 | 
			
		||||
			InstId: cd0.InstId,                      // string        `json:"instId", string`
 | 
			
		||||
			InstID: cd0.InstID,                      // string        `json:"instId", string`
 | 
			
		||||
			Period: cs.Seg,                          //   `json:"period", string`
 | 
			
		||||
			Data:   cd0.Data,                        //  `json:"data"`
 | 
			
		||||
			From:   "soft|" + os.Getenv("HOSTNAME"), //  string        `json:"from"`
 | 
			
		||||
@ -392,7 +392,10 @@ func MakeSoftCandles(cr *core.Core, cd *core.Candle) {
 | 
			
		||||
		// cd0是从tickerInfo创建的1m Candle克隆来的, Data里只有Data[4]被赋值,是last,其他都是"-1"
 | 
			
		||||
		// TODO 填充其余几个未赋值的字段,除了成交量和成交美元数以外,并存入redis待用
 | 
			
		||||
		// strconv.FormatInt(otmi, 10)
 | 
			
		||||
		cd1.Data = cd0.GetSetCandleInfo(cr, cs.Seg, otmi)
 | 
			
		||||
		mcd := MyCandle{
 | 
			
		||||
			Candle: cd0,
 | 
			
		||||
		}
 | 
			
		||||
		cd1.Data = mcd.GetSetCandleInfo(cr, cs.Seg, otmi)
 | 
			
		||||
		// 生成软交易量和交易数对,用于代替last生成max
 | 
			
		||||
		go func(k int) {
 | 
			
		||||
			time.Sleep(time.Duration(100*k) * time.Millisecond)
 | 
			
		||||
@ -400,3 +403,29 @@ func MakeSoftCandles(cr *core.Core, cd *core.Candle) {
 | 
			
		||||
		}(k)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func MaXsProcess(cr *core.Core) {
 | 
			
		||||
	for {
 | 
			
		||||
		mx := <-cr.MaXProcessChan
 | 
			
		||||
		logrus.Debug("mx: ", mx)
 | 
			
		||||
		go func(maX *core.MaX) {
 | 
			
		||||
			mmx := MyMaX{
 | 
			
		||||
				MaX: *mx,
 | 
			
		||||
			}
 | 
			
		||||
			mmx.Process(cr)
 | 
			
		||||
		}(mx)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TickerInfoProcess(cr *core.Core) {
 | 
			
		||||
	for {
 | 
			
		||||
		ti := <-cr.TickerInforocessChan
 | 
			
		||||
		logrus.Debug("ti: ", ti)
 | 
			
		||||
		go func(ti *core.TickerInfo) {
 | 
			
		||||
			mti := MyTickerInfo{
 | 
			
		||||
				TickerInfo: *ti,
 | 
			
		||||
			}
 | 
			
		||||
			mti.Process(cr)
 | 
			
		||||
		}(ti)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										85
									
								
								modules/maX.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										85
									
								
								modules/maX.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,85 @@
 | 
			
		||||
package module
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	// "encoding/json"
 | 
			
		||||
	// "errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"github.com/phyer/core"
 | 
			
		||||
	"os"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	//"strings"
 | 
			
		||||
	// "sync"
 | 
			
		||||
	//"time"
 | 
			
		||||
	//
 | 
			
		||||
	// simple "github.com/bitly/go-simplejson"
 | 
			
		||||
	// "github.com/go-redis/redis"
 | 
			
		||||
	// "github.com/phyer/core/utils"
 | 
			
		||||
	//logrus "github.com/sirupsen/logrus"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type MyMaX struct {
 | 
			
		||||
	core.MaX
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mmx *MyMaX) Process(cr *core.Core) {
 | 
			
		||||
	mx := mmx.MaX
 | 
			
		||||
	_, err := mx.SetToKey(cr)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		fmt.Println("max SetToKey err: ", err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	// TODO
 | 
			
		||||
	go func() {
 | 
			
		||||
		torqueSorted := os.Getenv("SARDINE_MAKESERIES") == "true"
 | 
			
		||||
		if !torqueSorted {
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		sm, err := mmx.InsertIntoPlate(cr)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			// fmt.Println("InsertIntoPlate err: ", err)
 | 
			
		||||
		} else {
 | 
			
		||||
			// 只有在ma30计算完成,触发coasterChan相关动作
 | 
			
		||||
			if mx.Count == 30 {
 | 
			
		||||
				ci := core.CoasterInfo{
 | 
			
		||||
					InstID:      mx.InstID,
 | 
			
		||||
					Period:      mx.Period,
 | 
			
		||||
					InsertedNew: sm != nil,
 | 
			
		||||
				}
 | 
			
		||||
				cr.CoasterChan <- &ci
 | 
			
		||||
			} else {
 | 
			
		||||
				// fmt.Println("maX: candle记录尚不足30条,无需触发coaster计算:", mx)
 | 
			
		||||
				// TODO
 | 
			
		||||
				// bj, _ := json.Marshal(mx)
 | 
			
		||||
				// fmt.Println("mx:", string(bj))
 | 
			
		||||
				// 这个地方可以加个逻辑,给tunas端发消息,缺啥补啥
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	// 发送给下级消息队列订阅者, 关掉了,暂时用不上,2024-12-16
 | 
			
		||||
	// cr.AddToGeneralMaXChnl(mx)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mmx *MyMaX) InsertIntoPlate(cr *core.Core) (*core.Sample, error) {
 | 
			
		||||
	mx := mmx.MaX
 | 
			
		||||
	cr.Mu.Lock()
 | 
			
		||||
	defer cr.Mu.Unlock()
 | 
			
		||||
	pl, ok := cr.PlateMap[mx.InstID]
 | 
			
		||||
	// 尝试放弃一级缓存
 | 
			
		||||
	// if !ok {
 | 
			
		||||
	pl, _ = LoadPlate(cr, mx.InstID)
 | 
			
		||||
	cr.PlateMap["period"+mx.Period] = pl
 | 
			
		||||
	// }
 | 
			
		||||
	_, ok = pl.CoasterMap["period"+mx.Period]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		pl.MakeCoaster(cr, mx.Period)
 | 
			
		||||
	}
 | 
			
		||||
	// if pl.CoasterMap["period"+mx.Period] == nil {
 | 
			
		||||
	// fmt.Println("candle coaster: ", mx.Period, pl.CoasterMap["period"+mx.Period], pl.CoasterMap)
 | 
			
		||||
	// 创建失败的原因是原始数据不够,一般发生在服务中断了,缺少部分数据的情况下, 后续需要数据补全措施
 | 
			
		||||
	// err := errors.New("coaster创建失败 maX instId: " + mx.InstID + "; period: " + mx.Period)
 | 
			
		||||
	// return nil, err
 | 
			
		||||
	// }
 | 
			
		||||
	sm, err := pl.CoasterMap["period"+mx.Period].RPushSample(cr, mx, "ma"+strconv.Itoa(mx.Count))
 | 
			
		||||
	return sm, err
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										36
									
								
								modules/plate.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										36
									
								
								modules/plate.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,36 @@
 | 
			
		||||
package module
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	// "errors"
 | 
			
		||||
	// "fmt"
 | 
			
		||||
	"github.com/phyer/core"
 | 
			
		||||
	// "os"
 | 
			
		||||
	// "strconv"
 | 
			
		||||
	// "strings"
 | 
			
		||||
	// // "sync"
 | 
			
		||||
	// "time"
 | 
			
		||||
	// //
 | 
			
		||||
	// // simple "github.com/bitly/go-simplejson"
 | 
			
		||||
	// "github.com/go-redis/redis"
 | 
			
		||||
	// // "github.com/phyer/core/utils"
 | 
			
		||||
	// logrus "github.com/sirupsen/logrus"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// TODO 从redis里读出来已经存储的plate,如果不存在就创建一个新的
 | 
			
		||||
func LoadPlate(cr *core.Core, instId string) (*core.Plate, error) {
 | 
			
		||||
	pl := core.Plate{}
 | 
			
		||||
	plateName := instId + "|plate"
 | 
			
		||||
	_, err := cr.RedisLocalCli.Exists().Result()
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		str, _ := cr.RedisLocalCli.Get(plateName).Result()
 | 
			
		||||
		json.Unmarshal([]byte(str), &pl)
 | 
			
		||||
	} else {
 | 
			
		||||
		pl.Init(instId)
 | 
			
		||||
		prs := cr.Cfg.Config.Get("candleDimentions").MustArray()
 | 
			
		||||
		for _, v := range prs {
 | 
			
		||||
			pl.MakeCoaster(cr, v.(string))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return &pl, nil
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										81
									
								
								modules/tickerInfo.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										81
									
								
								modules/tickerInfo.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,81 @@
 | 
			
		||||
package module
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	// "errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"github.com/phyer/core"
 | 
			
		||||
	// "math"
 | 
			
		||||
	"os"
 | 
			
		||||
	"strconv"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type MyTickerInfo struct {
 | 
			
		||||
	core.TickerInfo
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mti *MyTickerInfo) Process(cr *core.Core) {
 | 
			
		||||
	ti := mti.TickerInfo
 | 
			
		||||
	go func() {
 | 
			
		||||
		// 无需发布出去, 下面内容注释 2024-12-15
 | 
			
		||||
		// tickerPush := os.Getenv("SIAGA_TICKERPUSH") == "true"
 | 
			
		||||
		// if !tickerPush {
 | 
			
		||||
		// 	return
 | 
			
		||||
		// }
 | 
			
		||||
		// err := AddToGeneralTickerChnl(ti)
 | 
			
		||||
		// if err != nil {
 | 
			
		||||
		// 	logrus.Warn("tickerPush err:", err, ti)
 | 
			
		||||
		// }
 | 
			
		||||
	}()
 | 
			
		||||
	go func() {
 | 
			
		||||
		torqueSorted := os.Getenv("SIAGA_TORQUESORTED") == "true"
 | 
			
		||||
		if !torqueSorted {
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		// 这个功能貌似很有意思,以后可以考虑打开
 | 
			
		||||
		// ti.MakePriceSorted(cr, "2m")
 | 
			
		||||
		// ti.MakePriceSorted(cr, "4m")
 | 
			
		||||
		// ti.MakePriceSorted(cr, "8m")
 | 
			
		||||
		//用这个方法保证事件在每个周期只发生一次
 | 
			
		||||
	}()
 | 
			
		||||
	go func() {
 | 
			
		||||
		tickerToCandle := os.Getenv("SIAGA_TICKERTOCANDLE") == "true"
 | 
			
		||||
		if tickerToCandle {
 | 
			
		||||
			fmt.Println("tickerToCandle: ", ti)
 | 
			
		||||
			cd := mti.ConvertToCandle(cr, "1m")
 | 
			
		||||
			cr.CandlesProcessChan <- cd
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		ti.SetToKey(cr)
 | 
			
		||||
		// 流出时间让下面的函数进行比对
 | 
			
		||||
		// time.Sleep(50 * time.Millisecond)
 | 
			
		||||
		// snapshot 先关掉 2024-12-15
 | 
			
		||||
		// ti.MakeSnapShot(cr)
 | 
			
		||||
	}()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TODO 当前这个版本的实现里,开盘价,最高价,最低价都不重要,主要是收盘价,用来算ma7,ma30,这个就够了,以后需要的话,再细化
 | 
			
		||||
func (mti *MyTickerInfo) ConvertToCandle(cr *core.Core, period string) *core.Candle {
 | 
			
		||||
	ti := mti.TickerInfo
 | 
			
		||||
	hn := os.Getenv("HOSTNAME")
 | 
			
		||||
	lst := strconv.FormatFloat(ti.Last, 'f', -1, 64)
 | 
			
		||||
	tmi := ti.Ts
 | 
			
		||||
	tmi = tmi - tmi%60000
 | 
			
		||||
	cd := core.Candle{
 | 
			
		||||
		InstID: ti.InstId,
 | 
			
		||||
		Period: period,
 | 
			
		||||
		Data: []interface{}{
 | 
			
		||||
			strconv.FormatInt(tmi, 10), //开始时间,Unix时间戳的毫秒数格式,如 1597026383085
 | 
			
		||||
			"-1",                       //o String  开盘价格
 | 
			
		||||
			"-1",                       //h String  最高价格
 | 
			
		||||
			"-1",                       //l String  最低价格
 | 
			
		||||
			lst,                        //c String  收盘价格
 | 
			
		||||
			"-1",                       //c String  成交量
 | 
			
		||||
			"-1",                       //c String  成交美元数
 | 
			
		||||
		},
 | 
			
		||||
		From: "tickerInfo|" + hn,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &cd
 | 
			
		||||
}
 | 
			
		||||
@ -3,23 +3,23 @@ package module
 | 
			
		||||
import (
 | 
			
		||||
	"crypto/md5"
 | 
			
		||||
	"encoding/hex"
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	simple "github.com/bitly/go-simplejson"
 | 
			
		||||
	"github.com/go-redis/redis"
 | 
			
		||||
	"github.com/phyer/core"
 | 
			
		||||
	"math"
 | 
			
		||||
	"math/rand"
 | 
			
		||||
	"os"
 | 
			
		||||
	// "encoding/json"
 | 
			
		||||
	// "errors"
 | 
			
		||||
	// "fmt"
 | 
			
		||||
	// simple "github.com/bitly/go-simplejson"
 | 
			
		||||
	// "github.com/go-redis/redis"
 | 
			
		||||
	// "github.com/phyer/core"
 | 
			
		||||
	// "math"
 | 
			
		||||
	// "math/rand"
 | 
			
		||||
	// "os"
 | 
			
		||||
	"reflect"
 | 
			
		||||
	"runtime"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
	// "github.com/phyer/core/utils"
 | 
			
		||||
	logrus "github.com/sirupsen/logrus"
 | 
			
		||||
	// "strings"
 | 
			
		||||
	// "sync"
 | 
			
		||||
	// "time"
 | 
			
		||||
	// // "github.com/phyer/core/utils"
 | 
			
		||||
	// logrus "github.com/sirupsen/logrus"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func NextPeriod(curPeriod string, idx int) string {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user