candle的volCcy和confirm 上报到elasticsearch
This commit is contained in:
		
							parent
							
								
									e50d818d9b
								
							
						
					
					
						commit
						6b018dd813
					
				
							
								
								
									
										225
									
								
								core/candle.go
									
									
									
									
									
								
							
							
						
						
									
										225
									
								
								core/candle.go
									
									
									
									
									
								
							@ -30,6 +30,8 @@ type Candle struct {
 | 
				
			|||||||
	High      float64
 | 
						High      float64
 | 
				
			||||||
	Low       float64
 | 
						Low       float64
 | 
				
			||||||
	Close     float64
 | 
						Close     float64
 | 
				
			||||||
 | 
						VolCcy    float64
 | 
				
			||||||
 | 
						Confirm   bool
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type MaX struct {
 | 
					type MaX struct {
 | 
				
			||||||
@ -218,6 +220,229 @@ func (cl *Candle) ToStruct(core *Core) error {
 | 
				
			|||||||
		return err
 | 
							return err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	cl.Close = clse
 | 
						cl.Close = clse
 | 
				
			||||||
 | 
						cl.VolCcy, err = strconv.ParseFloat(cl.Data[6].(string), 64)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							fmt.Println("Error parsing string to float64:", err)
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if cl.Data[6].(string) == 1 {
 | 
				
			||||||
 | 
							cl.Confirm = true 
 | 
				
			||||||
 | 
						}else {
 | 
				
			||||||
 | 
							cl.Confirm = false 
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						cl.Data = nil
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					func (cl *Candle) SetToKey(core *Core) ([]interface{}, error) {
 | 
				
			||||||
 | 
						data := cl.Data
 | 
				
			||||||
 | 
						tsi, err := strconv.ParseInt(data[0].(string), 10, 64)
 | 
				
			||||||
 | 
						tss := strconv.FormatInt(tsi, 10)
 | 
				
			||||||
 | 
						keyName := "candle" + cl.Period + "|" + cl.InstId + "|ts:" + tss
 | 
				
			||||||
 | 
						//过期时间:根号(当前candle的周期/1分钟)*10000
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						dt, err := json.Marshal(cl.Data)
 | 
				
			||||||
 | 
						exp := core.PeriodToMinutes(cl.Period)
 | 
				
			||||||
 | 
						// expf := float64(exp) * 60
 | 
				
			||||||
 | 
						expf := utils.Sqrt(float64(exp)) * 100
 | 
				
			||||||
 | 
						extt := time.Duration(expf) * time.Minute
 | 
				
			||||||
 | 
						curVolstr, _ := data[5].(string)
 | 
				
			||||||
 | 
						curVol, err := strconv.ParseFloat(curVolstr, 64)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							fmt.Println("err of convert ts:", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						curVolCcystr, _ := data[6].(string)
 | 
				
			||||||
 | 
						curVolCcy, err := strconv.ParseFloat(curVolCcystr, 64)
 | 
				
			||||||
 | 
						curPrice := curVolCcy / curVol
 | 
				
			||||||
 | 
						if curPrice <= 0 {
 | 
				
			||||||
 | 
							fmt.Println("price有问题", curPrice, "dt: ", string(dt), "from:", cl.From)
 | 
				
			||||||
 | 
							err = errors.New("price有问题")
 | 
				
			||||||
 | 
							return cl.Data, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						redisCli := core.RedisCli
 | 
				
			||||||
 | 
						// tm := time.UnixMilli(tsi).Format("2006-01-02 15:04")
 | 
				
			||||||
 | 
						fmt.Println("setToKey:", keyName, "ts: ", "price: ", curPrice, "from:", cl.From)
 | 
				
			||||||
 | 
						redisCli.Set(keyName, dt, extt).Result()
 | 
				
			||||||
 | 
						core.SaveUniKey(cl.Period, keyName, extt, tsi, cl)
 | 
				
			||||||
 | 
						return cl.Data, err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (mx *MaX) SetToKey() ([]interface{}, error) {
 | 
				
			||||||
 | 
						cstr := strconv.Itoa(mx.Count)
 | 
				
			||||||
 | 
						tss := strconv.FormatInt(mx.Ts, 10)
 | 
				
			||||||
 | 
						keyName := "ma" + cstr + "|candle" + mx.Period + "|" + mx.InstId + "|ts:" + tss
 | 
				
			||||||
 | 
						//过期时间:根号(当前candle的周期/1分钟)*10000
 | 
				
			||||||
 | 
						dt := []interface{}{}
 | 
				
			||||||
 | 
						dt = append(dt, mx.Ts)
 | 
				
			||||||
 | 
						dt = append(dt, mx.Value)
 | 
				
			||||||
 | 
						dj, _ := json.Marshal(dt)
 | 
				
			||||||
 | 
						exp := mx.Core.PeriodToMinutes(mx.Period)
 | 
				
			||||||
 | 
						expf := utils.Sqrt(float64(exp)) * 100
 | 
				
			||||||
 | 
						extt := time.Duration(expf) * time.Minute
 | 
				
			||||||
 | 
						// loc, _ := time.LoadLocation("Asia/Shanghai")
 | 
				
			||||||
 | 
						// tm := time.UnixMilli(mx.Ts).In(loc).Format("2006-01-02 15:04")
 | 
				
			||||||
 | 
						// fmt.Println("setToKey:", keyName, "ts:", tm, string(dj), "from: ", mx.From)
 | 
				
			||||||
 | 
						_, err := mx.Core.RedisCli.GetSet(keyName, dj).Result()
 | 
				
			||||||
 | 
						mx.Core.RedisCli.Expire(keyName, extt)
 | 
				
			||||||
 | 
						return dt, err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// 保证同一个 period, keyName ,在一个周期里,SaveToSortSet只会被执行一次
 | 
				
			||||||
 | 
					func (core *Core) SaveUniKey(period string, keyName string, extt time.Duration, tsi int64, cl *Candle) {
 | 
				
			||||||
 | 
						did := cl.InstId + cl.Period + cl.Data[0].(string)
 | 
				
			||||||
 | 
						cl.Id = HashString(did)
 | 
				
			||||||
 | 
						cl.ToStruct(core)
 | 
				
			||||||
 | 
						cd, _ := json.Marshal(cl)
 | 
				
			||||||
 | 
						wg := WriteLog{
 | 
				
			||||||
 | 
							Content: cd,
 | 
				
			||||||
 | 
							Tag:     "sardine.log.candle." + cl.Period,
 | 
				
			||||||
 | 
							Id:      cl.Id,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						core.WriteLogChan <- &wg
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						refName := keyName + "|refer"
 | 
				
			||||||
 | 
						refRes, _ := core.RedisCli.GetSet(refName, 1).Result()
 | 
				
			||||||
 | 
						core.RedisCli.Expire(refName, extt)
 | 
				
			||||||
 | 
						// 为保证唯一性机制,防止SaveToSortSet 被重复执行
 | 
				
			||||||
 | 
						if len(refRes) != 0 {
 | 
				
			||||||
 | 
							fmt.Println("refName exist: ", refName)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						core.SaveToSortSet(period, keyName, extt, tsi)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// tsi: 上报时间timeStamp millinSecond
 | 
				
			||||||
 | 
					func (core *Core) SaveToSortSet(period string, keyName string, extt time.Duration, tsi int64) {
 | 
				
			||||||
 | 
						ary := strings.Split(keyName, "ts:")
 | 
				
			||||||
 | 
						setName := ary[0] + "sortedSet"
 | 
				
			||||||
 | 
						z := redis.Z{
 | 
				
			||||||
 | 
							Score:  float64(tsi),
 | 
				
			||||||
 | 
							Member: keyName,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						rs, err := core.RedisCli.ZAdd(setName, z).Result()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							fmt.Println("err of ma7|ma30 add to redis:", err)
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							fmt.Println("sortedSet added to redis:", rs, keyName)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (cr *Core) PeriodToMinutes(period string) int64 {
 | 
				
			||||||
 | 
						ary := strings.Split(period, "")
 | 
				
			||||||
 | 
						beiStr := "1"
 | 
				
			||||||
 | 
						danwei := ""
 | 
				
			||||||
 | 
						if len(ary) == 3 {
 | 
				
			||||||
 | 
							beiStr = ary[0] + ary[1]
 | 
				
			||||||
 | 
							danwei = ary[2]
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							beiStr = ary[0]
 | 
				
			||||||
 | 
							danwei = ary[1]
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						cheng := 1
 | 
				
			||||||
 | 
						bei, _ := strconv.Atoi(beiStr)
 | 
				
			||||||
 | 
						switch danwei {
 | 
				
			||||||
 | 
						case "m":
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								cheng = bei
 | 
				
			||||||
 | 
								break
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						case "H":
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								cheng = bei * 60
 | 
				
			||||||
 | 
								break
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						case "D":
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								cheng = bei * 60 * 24
 | 
				
			||||||
 | 
								break
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						case "W":
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								cheng = bei * 60 * 24 * 7
 | 
				
			||||||
 | 
								break
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						case "M":
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								cheng = bei * 60 * 24 * 30
 | 
				
			||||||
 | 
								break
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						case "Y":
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								cheng = bei * 60 * 24 * 365
 | 
				
			||||||
 | 
								break
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						default:
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								fmt.Println("notmatch:", danwei)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return int64(cheng)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// type ScanCmd struct {
 | 
				
			||||||
 | 
					// baseCmd
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// page   []string
 | 
				
			||||||
 | 
					// cursor uint64
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// process func(cmd Cmder) error
 | 
				
			||||||
 | 
					// }
 | 
				
			||||||
 | 
					func (core *Core) GetRangeKeyList(pattern string, from time.Time) ([]*simple.Json, error) {
 | 
				
			||||||
 | 
						// 比如,用来计算ma30或ma7,倒推多少时间范围,
 | 
				
			||||||
 | 
						redisCli := core.RedisCli
 | 
				
			||||||
 | 
						cursor := uint64(0)
 | 
				
			||||||
 | 
						n := 0
 | 
				
			||||||
 | 
						allTs := []int64{}
 | 
				
			||||||
 | 
						var keys []string
 | 
				
			||||||
 | 
						for {
 | 
				
			||||||
 | 
							var err error
 | 
				
			||||||
 | 
							keys, cursor, _ = redisCli.Scan(cursor, pattern+"*", 2000).Result()
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								panic(err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							n += len(keys)
 | 
				
			||||||
 | 
							if n == 0 {
 | 
				
			||||||
 | 
								break
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// keys, _ := redisCli.Keys(pattern + "*").Result()
 | 
				
			||||||
 | 
						for _, key := range keys {
 | 
				
			||||||
 | 
							keyAry := strings.Split(key, ":")
 | 
				
			||||||
 | 
							key = keyAry[1]
 | 
				
			||||||
 | 
							keyi64, _ := strconv.ParseInt(key, 10, 64)
 | 
				
			||||||
 | 
							allTs = append(allTs, keyi64)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						nary := utils.RecursiveBubble(allTs, len(allTs))
 | 
				
			||||||
 | 
						tt := from.UnixMilli()
 | 
				
			||||||
 | 
						ff := tt - tt%60000
 | 
				
			||||||
 | 
						fi := int64(ff)
 | 
				
			||||||
 | 
						mary := []int64{}
 | 
				
			||||||
 | 
						for _, v := range nary {
 | 
				
			||||||
 | 
							if v < fi {
 | 
				
			||||||
 | 
								break
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							mary = append(mary, v)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						res := []*simple.Json{}
 | 
				
			||||||
 | 
						for _, v := range mary {
 | 
				
			||||||
 | 
							// if k > 1 {
 | 
				
			||||||
 | 
							// break
 | 
				
			||||||
 | 
							// }
 | 
				
			||||||
 | 
							nv := pattern + strconv.FormatInt(v, 10)
 | 
				
			||||||
 | 
							str, err := redisCli.Get(nv).Result()
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								fmt.Println("err of redis get key:", nv, err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							cur, err := simple.NewJson([]byte(str))
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								fmt.Println("err of create newJson:", str, err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							res = append(res, cur)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return res, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
	cl.Data = nil
 | 
						cl.Data = nil
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user