尝试给上报到elasticsearch的文档加_id
This commit is contained in:
		
							parent
							
								
									6b018dd813
								
							
						
					
					
						commit
						9c9f6e40b5
					
				
							
								
								
									
										217
									
								
								core/candle.go
									
									
									
									
									
								
							
							
						
						
									
										217
									
								
								core/candle.go
									
									
									
									
									
								
							@ -225,46 +225,14 @@ func (cl *Candle) ToStruct(core *Core) error {
 | 
			
		||||
		fmt.Println("Error parsing string to float64:", err)
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	if cl.Data[6].(string) == 1 {
 | 
			
		||||
	if cl.Data[6].(string) == "1" {
 | 
			
		||||
		cl.Confirm = true
 | 
			
		||||
	}else {
 | 
			
		||||
	} else {
 | 
			
		||||
		cl.Confirm = false
 | 
			
		||||
	}
 | 
			
		||||
	cl.Data = nil
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
func (cl *Candle) SetToKey(core *Core) ([]interface{}, error) {
 | 
			
		||||
	data := cl.Data
 | 
			
		||||
	tsi, err := strconv.ParseInt(data[0].(string), 10, 64)
 | 
			
		||||
	tss := strconv.FormatInt(tsi, 10)
 | 
			
		||||
	keyName := "candle" + cl.Period + "|" + cl.InstId + "|ts:" + tss
 | 
			
		||||
	//过期时间:根号(当前candle的周期/1分钟)*10000
 | 
			
		||||
 | 
			
		||||
	dt, err := json.Marshal(cl.Data)
 | 
			
		||||
	exp := core.PeriodToMinutes(cl.Period)
 | 
			
		||||
	// expf := float64(exp) * 60
 | 
			
		||||
	expf := utils.Sqrt(float64(exp)) * 100
 | 
			
		||||
	extt := time.Duration(expf) * time.Minute
 | 
			
		||||
	curVolstr, _ := data[5].(string)
 | 
			
		||||
	curVol, err := strconv.ParseFloat(curVolstr, 64)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		fmt.Println("err of convert ts:", err)
 | 
			
		||||
	}
 | 
			
		||||
	curVolCcystr, _ := data[6].(string)
 | 
			
		||||
	curVolCcy, err := strconv.ParseFloat(curVolCcystr, 64)
 | 
			
		||||
	curPrice := curVolCcy / curVol
 | 
			
		||||
	if curPrice <= 0 {
 | 
			
		||||
		fmt.Println("price有问题", curPrice, "dt: ", string(dt), "from:", cl.From)
 | 
			
		||||
		err = errors.New("price有问题")
 | 
			
		||||
		return cl.Data, err
 | 
			
		||||
	}
 | 
			
		||||
	redisCli := core.RedisCli
 | 
			
		||||
	// tm := time.UnixMilli(tsi).Format("2006-01-02 15:04")
 | 
			
		||||
	fmt.Println("setToKey:", keyName, "ts: ", "price: ", curPrice, "from:", cl.From)
 | 
			
		||||
	redisCli.Set(keyName, dt, extt).Result()
 | 
			
		||||
	core.SaveUniKey(cl.Period, keyName, extt, tsi, cl)
 | 
			
		||||
	return cl.Data, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mx *MaX) SetToKey() ([]interface{}, error) {
 | 
			
		||||
	cstr := strconv.Itoa(mx.Count)
 | 
			
		||||
@ -443,10 +411,7 @@ func (core *Core) GetRangeKeyList(pattern string, from time.Time) ([]*simple.Jso
 | 
			
		||||
 | 
			
		||||
	return res, nil
 | 
			
		||||
}
 | 
			
		||||
	cl.Data = nil
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
func (cl *Candle) SetToKey(core *Core) ([]interface{}, error) {
 | 
			
		||||
	data := cl.Data
 | 
			
		||||
	tsi, err := strconv.ParseInt(data[0].(string), 10, 64)
 | 
			
		||||
@ -479,181 +444,3 @@ func (cl *Candle) SetToKey(core *Core) ([]interface{}, error) {
 | 
			
		||||
	core.SaveUniKey(cl.Period, keyName, extt, tsi, cl)
 | 
			
		||||
	return cl.Data, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mx *MaX) SetToKey() ([]interface{}, error) {
 | 
			
		||||
	cstr := strconv.Itoa(mx.Count)
 | 
			
		||||
	tss := strconv.FormatInt(mx.Ts, 10)
 | 
			
		||||
	keyName := "ma" + cstr + "|candle" + mx.Period + "|" + mx.InstId + "|ts:" + tss
 | 
			
		||||
	//过期时间:根号(当前candle的周期/1分钟)*10000
 | 
			
		||||
	dt := []interface{}{}
 | 
			
		||||
	dt = append(dt, mx.Ts)
 | 
			
		||||
	dt = append(dt, mx.Value)
 | 
			
		||||
	dj, _ := json.Marshal(dt)
 | 
			
		||||
	exp := mx.Core.PeriodToMinutes(mx.Period)
 | 
			
		||||
	expf := utils.Sqrt(float64(exp)) * 100
 | 
			
		||||
	extt := time.Duration(expf) * time.Minute
 | 
			
		||||
	// loc, _ := time.LoadLocation("Asia/Shanghai")
 | 
			
		||||
	// tm := time.UnixMilli(mx.Ts).In(loc).Format("2006-01-02 15:04")
 | 
			
		||||
	// fmt.Println("setToKey:", keyName, "ts:", tm, string(dj), "from: ", mx.From)
 | 
			
		||||
	_, err := mx.Core.RedisCli.GetSet(keyName, dj).Result()
 | 
			
		||||
	mx.Core.RedisCli.Expire(keyName, extt)
 | 
			
		||||
	return dt, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 保证同一个 period, keyName ,在一个周期里,SaveToSortSet只会被执行一次
 | 
			
		||||
func (core *Core) SaveUniKey(period string, keyName string, extt time.Duration, tsi int64, cl *Candle) {
 | 
			
		||||
	did := cl.InstId + cl.Period + cl.Data[0].(string)
 | 
			
		||||
	cl.Id = HashString(did)
 | 
			
		||||
	cl.ToStruct(core)
 | 
			
		||||
	cd, _ := json.Marshal(cl)
 | 
			
		||||
	wg := WriteLog{
 | 
			
		||||
		Content: cd,
 | 
			
		||||
		Tag:     "sardine.log.candle." + cl.Period,
 | 
			
		||||
		Id:      cl.Id,
 | 
			
		||||
	}
 | 
			
		||||
	core.WriteLogChan <- &wg
 | 
			
		||||
 | 
			
		||||
	refName := keyName + "|refer"
 | 
			
		||||
	refRes, _ := core.RedisCli.GetSet(refName, 1).Result()
 | 
			
		||||
	core.RedisCli.Expire(refName, extt)
 | 
			
		||||
	// 为保证唯一性机制,防止SaveToSortSet 被重复执行
 | 
			
		||||
	if len(refRes) != 0 {
 | 
			
		||||
		fmt.Println("refName exist: ", refName)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	core.SaveToSortSet(period, keyName, extt, tsi)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// tsi: 上报时间timeStamp millinSecond
 | 
			
		||||
func (core *Core) SaveToSortSet(period string, keyName string, extt time.Duration, tsi int64) {
 | 
			
		||||
	ary := strings.Split(keyName, "ts:")
 | 
			
		||||
	setName := ary[0] + "sortedSet"
 | 
			
		||||
	z := redis.Z{
 | 
			
		||||
		Score:  float64(tsi),
 | 
			
		||||
		Member: keyName,
 | 
			
		||||
	}
 | 
			
		||||
	rs, err := core.RedisCli.ZAdd(setName, z).Result()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		fmt.Println("err of ma7|ma30 add to redis:", err)
 | 
			
		||||
	} else {
 | 
			
		||||
		fmt.Println("sortedSet added to redis:", rs, keyName)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (cr *Core) PeriodToMinutes(period string) int64 {
 | 
			
		||||
	ary := strings.Split(period, "")
 | 
			
		||||
	beiStr := "1"
 | 
			
		||||
	danwei := ""
 | 
			
		||||
	if len(ary) == 3 {
 | 
			
		||||
		beiStr = ary[0] + ary[1]
 | 
			
		||||
		danwei = ary[2]
 | 
			
		||||
	} else {
 | 
			
		||||
		beiStr = ary[0]
 | 
			
		||||
		danwei = ary[1]
 | 
			
		||||
	}
 | 
			
		||||
	cheng := 1
 | 
			
		||||
	bei, _ := strconv.Atoi(beiStr)
 | 
			
		||||
	switch danwei {
 | 
			
		||||
	case "m":
 | 
			
		||||
		{
 | 
			
		||||
			cheng = bei
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	case "H":
 | 
			
		||||
		{
 | 
			
		||||
			cheng = bei * 60
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	case "D":
 | 
			
		||||
		{
 | 
			
		||||
			cheng = bei * 60 * 24
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	case "W":
 | 
			
		||||
		{
 | 
			
		||||
			cheng = bei * 60 * 24 * 7
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	case "M":
 | 
			
		||||
		{
 | 
			
		||||
			cheng = bei * 60 * 24 * 30
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	case "Y":
 | 
			
		||||
		{
 | 
			
		||||
			cheng = bei * 60 * 24 * 365
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	default:
 | 
			
		||||
		{
 | 
			
		||||
			fmt.Println("notmatch:", danwei)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return int64(cheng)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// type ScanCmd struct {
 | 
			
		||||
// baseCmd
 | 
			
		||||
//
 | 
			
		||||
// page   []string
 | 
			
		||||
// cursor uint64
 | 
			
		||||
//
 | 
			
		||||
// process func(cmd Cmder) error
 | 
			
		||||
// }
 | 
			
		||||
func (core *Core) GetRangeKeyList(pattern string, from time.Time) ([]*simple.Json, error) {
 | 
			
		||||
	// 比如,用来计算ma30或ma7,倒推多少时间范围,
 | 
			
		||||
	redisCli := core.RedisCli
 | 
			
		||||
	cursor := uint64(0)
 | 
			
		||||
	n := 0
 | 
			
		||||
	allTs := []int64{}
 | 
			
		||||
	var keys []string
 | 
			
		||||
	for {
 | 
			
		||||
		var err error
 | 
			
		||||
		keys, cursor, _ = redisCli.Scan(cursor, pattern+"*", 2000).Result()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			panic(err)
 | 
			
		||||
		}
 | 
			
		||||
		n += len(keys)
 | 
			
		||||
		if n == 0 {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// keys, _ := redisCli.Keys(pattern + "*").Result()
 | 
			
		||||
	for _, key := range keys {
 | 
			
		||||
		keyAry := strings.Split(key, ":")
 | 
			
		||||
		key = keyAry[1]
 | 
			
		||||
		keyi64, _ := strconv.ParseInt(key, 10, 64)
 | 
			
		||||
		allTs = append(allTs, keyi64)
 | 
			
		||||
	}
 | 
			
		||||
	nary := utils.RecursiveBubble(allTs, len(allTs))
 | 
			
		||||
	tt := from.UnixMilli()
 | 
			
		||||
	ff := tt - tt%60000
 | 
			
		||||
	fi := int64(ff)
 | 
			
		||||
	mary := []int64{}
 | 
			
		||||
	for _, v := range nary {
 | 
			
		||||
		if v < fi {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
		mary = append(mary, v)
 | 
			
		||||
	}
 | 
			
		||||
	res := []*simple.Json{}
 | 
			
		||||
	for _, v := range mary {
 | 
			
		||||
		// if k > 1 {
 | 
			
		||||
		// break
 | 
			
		||||
		// }
 | 
			
		||||
		nv := pattern + strconv.FormatInt(v, 10)
 | 
			
		||||
		str, err := redisCli.Get(nv).Result()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			fmt.Println("err of redis get key:", nv, err)
 | 
			
		||||
		}
 | 
			
		||||
		cur, err := simple.NewJson([]byte(str))
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			fmt.Println("err of create newJson:", str, err)
 | 
			
		||||
		}
 | 
			
		||||
		res = append(res, cur)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return res, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user