core被单独抽离出来做独立模块了
This commit is contained in:
		
							parent
							
								
									0932c04a66
								
							
						
					
					
						commit
						7297621f3d
					
				
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							@ -1,2 +1,3 @@
 | 
				
			|||||||
vendor/
 | 
					vendor/
 | 
				
			||||||
tunas
 | 
					tunas
 | 
				
			||||||
 | 
					texus
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										470
									
								
								core/candle.go
									
									
									
									
									
								
							
							
						
						
									
										470
									
								
								core/candle.go
									
									
									
									
									
								
							@ -1,470 +0,0 @@
 | 
				
			|||||||
package core
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
import (
 | 
					 | 
				
			||||||
	"crypto/sha256"
 | 
					 | 
				
			||||||
	"encoding/hex"
 | 
					 | 
				
			||||||
	"encoding/json"
 | 
					 | 
				
			||||||
	"errors"
 | 
					 | 
				
			||||||
	"fmt"
 | 
					 | 
				
			||||||
	"math/rand"
 | 
					 | 
				
			||||||
	"os"
 | 
					 | 
				
			||||||
	"strconv"
 | 
					 | 
				
			||||||
	"strings"
 | 
					 | 
				
			||||||
	"time"
 | 
					 | 
				
			||||||
	// "v5sdk_go/rest"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	simple "github.com/bitly/go-simplejson"
 | 
					 | 
				
			||||||
	"github.com/go-redis/redis"
 | 
					 | 
				
			||||||
	"github.com/phyer/texus/utils"
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type Candle struct {
 | 
					 | 
				
			||||||
	Id        string `json:"_id"`
 | 
					 | 
				
			||||||
	core      *Core
 | 
					 | 
				
			||||||
	InstId    string
 | 
					 | 
				
			||||||
	Period    string
 | 
					 | 
				
			||||||
	Data      []interface{}
 | 
					 | 
				
			||||||
	From      string
 | 
					 | 
				
			||||||
	Timestamp time.Time
 | 
					 | 
				
			||||||
	Open      float64
 | 
					 | 
				
			||||||
	High      float64
 | 
					 | 
				
			||||||
	Low       float64
 | 
					 | 
				
			||||||
	Close     float64
 | 
					 | 
				
			||||||
	VolCcy    float64
 | 
					 | 
				
			||||||
	Confirm   bool
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type MaX struct {
 | 
					 | 
				
			||||||
	Core    *Core
 | 
					 | 
				
			||||||
	InstId  string
 | 
					 | 
				
			||||||
	Period  string
 | 
					 | 
				
			||||||
	KeyName string
 | 
					 | 
				
			||||||
	Count   int
 | 
					 | 
				
			||||||
	Ts      int64
 | 
					 | 
				
			||||||
	Value   float64
 | 
					 | 
				
			||||||
	Data    []interface{}
 | 
					 | 
				
			||||||
	From    string
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type MatchCheck struct {
 | 
					 | 
				
			||||||
	Minutes int64
 | 
					 | 
				
			||||||
	Matched bool
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (mc *MatchCheck) SetMatched(value bool) {
 | 
					 | 
				
			||||||
	mc.Matched = value
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (core *Core) GetCandlesWithRest(instId string, kidx int, dura time.Duration, maxCandles int) error {
 | 
					 | 
				
			||||||
	ary := []string{}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	wsary := core.Cfg.CandleDimentions
 | 
					 | 
				
			||||||
	for k, v := range wsary {
 | 
					 | 
				
			||||||
		matched := false
 | 
					 | 
				
			||||||
		// 这个算法的目的是:越靠后的candles维度,被命中的概率越低,第一个百分之百命中,后面开始越来越低, 每分钟都会发生这样的计算,
 | 
					 | 
				
			||||||
		// 因为维度多了的话,照顾不过来
 | 
					 | 
				
			||||||
		rand.New(rand.NewSource(time.Now().UnixNano()))
 | 
					 | 
				
			||||||
		rand.Seed(time.Now().UnixNano())
 | 
					 | 
				
			||||||
		n := (k*2 + 2) * 3
 | 
					 | 
				
			||||||
		if n < 1 {
 | 
					 | 
				
			||||||
			n = 1
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		b := rand.Intn(n)
 | 
					 | 
				
			||||||
		if b < 8 {
 | 
					 | 
				
			||||||
			matched = true
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		if matched {
 | 
					 | 
				
			||||||
			ary = append(ary, v)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	mdura := dura/(time.Duration(len(ary)+1)) - 50*time.Millisecond
 | 
					 | 
				
			||||||
	// fmt.Println("loop4 Ticker Start instId, dura: ", instId, dura, dura/10, mdura, len(ary), " idx: ", kidx)
 | 
					 | 
				
			||||||
	// time.Duration(len(ary)+1)
 | 
					 | 
				
			||||||
	ticker := time.NewTicker(mdura)
 | 
					 | 
				
			||||||
	done := make(chan bool)
 | 
					 | 
				
			||||||
	idx := 0
 | 
					 | 
				
			||||||
	go func(i int) {
 | 
					 | 
				
			||||||
		for {
 | 
					 | 
				
			||||||
			select {
 | 
					 | 
				
			||||||
			case <-ticker.C:
 | 
					 | 
				
			||||||
				if i >= (len(ary)) {
 | 
					 | 
				
			||||||
					done <- true
 | 
					 | 
				
			||||||
					break
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
				rand.Seed(time.Now().UnixNano())
 | 
					 | 
				
			||||||
				b := rand.Intn(2)
 | 
					 | 
				
			||||||
				maxCandles = maxCandles * (i + b) * 2
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				if maxCandles < 3 {
 | 
					 | 
				
			||||||
					maxCandles = 3
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
				if maxCandles > 30 {
 | 
					 | 
				
			||||||
					maxCandles = 30
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
				mx := strconv.Itoa(maxCandles)
 | 
					 | 
				
			||||||
				// fmt.Println("loop4 getCandlesWithRest, instId, period,limit,dura, t: ", instId, ary[i], mx, mdura)
 | 
					 | 
				
			||||||
				go func(ii int) {
 | 
					 | 
				
			||||||
					restQ := RestQueue{
 | 
					 | 
				
			||||||
						InstId:   instId,
 | 
					 | 
				
			||||||
						Bar:      ary[ii],
 | 
					 | 
				
			||||||
						Limit:    mx,
 | 
					 | 
				
			||||||
						Duration: mdura,
 | 
					 | 
				
			||||||
						WithWs:   true,
 | 
					 | 
				
			||||||
					}
 | 
					 | 
				
			||||||
					js, _ := json.Marshal(restQ)
 | 
					 | 
				
			||||||
					core.RedisCli.LPush("restQueue", js)
 | 
					 | 
				
			||||||
				}(i)
 | 
					 | 
				
			||||||
				i++
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}(idx)
 | 
					 | 
				
			||||||
	time.Sleep(dura - 10*time.Millisecond)
 | 
					 | 
				
			||||||
	ticker.Stop()
 | 
					 | 
				
			||||||
	// fmt.Println("loop4 Ticker stopped instId, dura: ", instId, dura, mdura)
 | 
					 | 
				
			||||||
	done <- true
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// 当前的时间毫秒数 对于某个时间段,比如3分钟,10分钟,是否可以被整除,
 | 
					 | 
				
			||||||
func IsModOf(curInt int64, duration time.Duration) bool {
 | 
					 | 
				
			||||||
	vol := int64(0)
 | 
					 | 
				
			||||||
	if duration < 24*time.Hour {
 | 
					 | 
				
			||||||
		// 小于1天
 | 
					 | 
				
			||||||
		vol = (curInt + 28800000)
 | 
					 | 
				
			||||||
	} else if duration >= 24*time.Hour && duration < 48*time.Hour {
 | 
					 | 
				
			||||||
		// 1天
 | 
					 | 
				
			||||||
		vol = curInt - 1633881600000
 | 
					 | 
				
			||||||
	} else if duration >= 48*time.Hour && duration < 72*time.Hour {
 | 
					 | 
				
			||||||
		// 2天
 | 
					 | 
				
			||||||
		vol = curInt - 1633795200000
 | 
					 | 
				
			||||||
	} else if duration >= 72*time.Hour && duration < 120*time.Hour {
 | 
					 | 
				
			||||||
		// 3天
 | 
					 | 
				
			||||||
		vol = curInt - 1633708800000
 | 
					 | 
				
			||||||
	} else if duration >= 120*time.Hour {
 | 
					 | 
				
			||||||
		// 5天
 | 
					 | 
				
			||||||
		vol = curInt - 1633795200000
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		// fmt.Println("noMatched:", curInt)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	mody := vol % duration.Milliseconds()
 | 
					 | 
				
			||||||
	if mody == 0 {
 | 
					 | 
				
			||||||
		return true
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return false
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (core *Core) SaveCandle(instId string, period string, rsp *CandleData, dura time.Duration, withWs bool) {
 | 
					 | 
				
			||||||
	leng := len(rsp.Data)
 | 
					 | 
				
			||||||
	for _, v := range rsp.Data {
 | 
					 | 
				
			||||||
		candle := Candle{
 | 
					 | 
				
			||||||
			InstId: instId,
 | 
					 | 
				
			||||||
			Period: period,
 | 
					 | 
				
			||||||
			Data:   v,
 | 
					 | 
				
			||||||
			From:   "rest",
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		//存到elasticSearch
 | 
					 | 
				
			||||||
		candle.PushToWriteLogChan(core)
 | 
					 | 
				
			||||||
		//保存rest得到的candle
 | 
					 | 
				
			||||||
		// 发布到allCandles|publish, 给外部订阅者用于setToKey
 | 
					 | 
				
			||||||
		arys := []string{ALLCANDLES_PUBLISH}
 | 
					 | 
				
			||||||
		if withWs {
 | 
					 | 
				
			||||||
			arys = append(arys, ALLCANDLES_INNER_PUBLISH)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		// 如果candle都不需要存到redis,那么AddToGeneralCandleChnl也没有意义
 | 
					 | 
				
			||||||
		saveCandle := os.Getenv("TEXUS_SAVECANDLE")
 | 
					 | 
				
			||||||
		if saveCandle == "true" {
 | 
					 | 
				
			||||||
			candle.SetToKey(core)
 | 
					 | 
				
			||||||
			core.AddToGeneralCandleChnl(&candle, arys)
 | 
					 | 
				
			||||||
			time.Sleep(dura / time.Duration(leng))
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (candle *Candle) PushToWriteLogChan(cr *Core) error {
 | 
					 | 
				
			||||||
	did := candle.InstId + candle.Period + candle.Data[0].(string)
 | 
					 | 
				
			||||||
	candle.Id = HashString(did)
 | 
					 | 
				
			||||||
	ncd, _ := candle.ToStruct(cr)
 | 
					 | 
				
			||||||
	fmt.Println("ncd: ", ncd)
 | 
					 | 
				
			||||||
	cd, _ := json.Marshal(ncd)
 | 
					 | 
				
			||||||
	wg := WriteLog{
 | 
					 | 
				
			||||||
		Content: cd,
 | 
					 | 
				
			||||||
		Tag:     "sardine.log.candle." + candle.Period,
 | 
					 | 
				
			||||||
		Id:      candle.Id,
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	cr.WriteLogChan <- &wg
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func Daoxu(arr []interface{}) {
 | 
					 | 
				
			||||||
	var temp interface{}
 | 
					 | 
				
			||||||
	length := len(arr)
 | 
					 | 
				
			||||||
	for i := 0; i < length/2; i++ {
 | 
					 | 
				
			||||||
		temp = arr[i]
 | 
					 | 
				
			||||||
		arr[i] = arr[length-1-i]
 | 
					 | 
				
			||||||
		arr[length-1-i] = temp
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
func HashString(input string) string {
 | 
					 | 
				
			||||||
	// 计算SHA-256哈希值
 | 
					 | 
				
			||||||
	hash := sha256.Sum256([]byte(input))
 | 
					 | 
				
			||||||
	// 转换为十六进制字符串
 | 
					 | 
				
			||||||
	hashHex := hex.EncodeToString(hash[:])
 | 
					 | 
				
			||||||
	// 返回前20位
 | 
					 | 
				
			||||||
	return hashHex[:23]
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (cl *Candle) ToStruct(core *Core) (*Candle, error) {
 | 
					 | 
				
			||||||
	// cl.Timestamp
 | 
					 | 
				
			||||||
	ncd := Candle{}
 | 
					 | 
				
			||||||
	ncd.Id = cl.Id
 | 
					 | 
				
			||||||
	ncd.Period = cl.Period
 | 
					 | 
				
			||||||
	ncd.InstId = cl.InstId
 | 
					 | 
				
			||||||
	ncd.From = cl.From
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// 将字符串转换为 int64 类型的时间戳
 | 
					 | 
				
			||||||
	ts, err := strconv.ParseInt(cl.Data[0].(string), 10, 64)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Println("Error parsing timestamp:", err)
 | 
					 | 
				
			||||||
		return nil, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	ncd.Timestamp = time.Unix(ts/1000, (ts%1000)*1000000) // 纳秒级别
 | 
					 | 
				
			||||||
	op, err := strconv.ParseFloat(cl.Data[1].(string), 64)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Println("Error parsing string to float64:", err)
 | 
					 | 
				
			||||||
		return nil, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	ncd.Open = op
 | 
					 | 
				
			||||||
	hi, err := strconv.ParseFloat(cl.Data[2].(string), 64)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Println("Error parsing string to float64:", err)
 | 
					 | 
				
			||||||
		return nil, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	ncd.High = hi
 | 
					 | 
				
			||||||
	lo, err := strconv.ParseFloat(cl.Data[3].(string), 64)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Println("Error parsing string to float64:", err)
 | 
					 | 
				
			||||||
		return nil, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	ncd.Low = lo
 | 
					 | 
				
			||||||
	clse, err := strconv.ParseFloat(cl.Data[4].(string), 64)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Println("Error parsing string to float64:", err)
 | 
					 | 
				
			||||||
		return nil, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	ncd.Close = clse
 | 
					 | 
				
			||||||
	ncd.VolCcy, err = strconv.ParseFloat(cl.Data[6].(string), 64)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Println("Error parsing string to float64:", err)
 | 
					 | 
				
			||||||
		return nil, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if cl.Data[6].(string) == "1" {
 | 
					 | 
				
			||||||
		ncd.Confirm = true
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		ncd.Confirm = false
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return &ncd, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (mx *MaX) SetToKey() ([]interface{}, error) {
 | 
					 | 
				
			||||||
	cstr := strconv.Itoa(mx.Count)
 | 
					 | 
				
			||||||
	tss := strconv.FormatInt(mx.Ts, 10)
 | 
					 | 
				
			||||||
	keyName := "ma" + cstr + "|candle" + mx.Period + "|" + mx.InstId + "|ts:" + tss
 | 
					 | 
				
			||||||
	//过期时间:根号(当前candle的周期/1分钟)*10000
 | 
					 | 
				
			||||||
	dt := []interface{}{}
 | 
					 | 
				
			||||||
	dt = append(dt, mx.Ts)
 | 
					 | 
				
			||||||
	dt = append(dt, mx.Value)
 | 
					 | 
				
			||||||
	dj, _ := json.Marshal(dt)
 | 
					 | 
				
			||||||
	exp := mx.Core.PeriodToMinutes(mx.Period)
 | 
					 | 
				
			||||||
	expf := utils.Sqrt(float64(exp)) * 100
 | 
					 | 
				
			||||||
	extt := time.Duration(expf) * time.Minute
 | 
					 | 
				
			||||||
	// loc, _ := time.LoadLocation("Asia/Shanghai")
 | 
					 | 
				
			||||||
	// tm := time.UnixMilli(mx.Ts).In(loc).Format("2006-01-02 15:04")
 | 
					 | 
				
			||||||
	// fmt.Println("setToKey:", keyName, "ts:", tm, string(dj), "from: ", mx.From)
 | 
					 | 
				
			||||||
	_, err := mx.Core.RedisCli.GetSet(keyName, dj).Result()
 | 
					 | 
				
			||||||
	mx.Core.RedisCli.Expire(keyName, extt)
 | 
					 | 
				
			||||||
	return dt, err
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// 保证同一个 period, keyName ,在一个周期里,SaveToSortSet只会被执行一次
 | 
					 | 
				
			||||||
func (core *Core) SaveUniKey(period string, keyName string, extt time.Duration, tsi int64, cl *Candle) {
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	refName := keyName + "|refer"
 | 
					 | 
				
			||||||
	refRes, _ := core.RedisCli.GetSet(refName, 1).Result()
 | 
					 | 
				
			||||||
	core.RedisCli.Expire(refName, extt)
 | 
					 | 
				
			||||||
	// 为保证唯一性机制,防止SaveToSortSet 被重复执行
 | 
					 | 
				
			||||||
	if len(refRes) != 0 {
 | 
					 | 
				
			||||||
		fmt.Println("refName exist: ", refName)
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	core.SaveToSortSet(period, keyName, extt, tsi)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// tsi: 上报时间timeStamp millinSecond
 | 
					 | 
				
			||||||
func (core *Core) SaveToSortSet(period string, keyName string, extt time.Duration, tsi int64) {
 | 
					 | 
				
			||||||
	ary := strings.Split(keyName, "ts:")
 | 
					 | 
				
			||||||
	setName := ary[0] + "sortedSet"
 | 
					 | 
				
			||||||
	z := redis.Z{
 | 
					 | 
				
			||||||
		Score:  float64(tsi),
 | 
					 | 
				
			||||||
		Member: keyName,
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	rs, err := core.RedisCli.ZAdd(setName, z).Result()
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Println("err of ma7|ma30 add to redis:", err)
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		fmt.Println("sortedSet added to redis:", rs, keyName)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (cr *Core) PeriodToMinutes(period string) int64 {
 | 
					 | 
				
			||||||
	ary := strings.Split(period, "")
 | 
					 | 
				
			||||||
	beiStr := "1"
 | 
					 | 
				
			||||||
	danwei := ""
 | 
					 | 
				
			||||||
	if len(ary) == 3 {
 | 
					 | 
				
			||||||
		beiStr = ary[0] + ary[1]
 | 
					 | 
				
			||||||
		danwei = ary[2]
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		beiStr = ary[0]
 | 
					 | 
				
			||||||
		danwei = ary[1]
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	cheng := 1
 | 
					 | 
				
			||||||
	bei, _ := strconv.Atoi(beiStr)
 | 
					 | 
				
			||||||
	switch danwei {
 | 
					 | 
				
			||||||
	case "m":
 | 
					 | 
				
			||||||
		{
 | 
					 | 
				
			||||||
			cheng = bei
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	case "H":
 | 
					 | 
				
			||||||
		{
 | 
					 | 
				
			||||||
			cheng = bei * 60
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	case "D":
 | 
					 | 
				
			||||||
		{
 | 
					 | 
				
			||||||
			cheng = bei * 60 * 24
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	case "W":
 | 
					 | 
				
			||||||
		{
 | 
					 | 
				
			||||||
			cheng = bei * 60 * 24 * 7
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	case "M":
 | 
					 | 
				
			||||||
		{
 | 
					 | 
				
			||||||
			cheng = bei * 60 * 24 * 30
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	case "Y":
 | 
					 | 
				
			||||||
		{
 | 
					 | 
				
			||||||
			cheng = bei * 60 * 24 * 365
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	default:
 | 
					 | 
				
			||||||
		{
 | 
					 | 
				
			||||||
			fmt.Println("notmatch:", danwei)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return int64(cheng)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// type ScanCmd struct {
 | 
					 | 
				
			||||||
// baseCmd
 | 
					 | 
				
			||||||
//
 | 
					 | 
				
			||||||
// page   []string
 | 
					 | 
				
			||||||
// cursor uint64
 | 
					 | 
				
			||||||
//
 | 
					 | 
				
			||||||
// process func(cmd Cmder) error
 | 
					 | 
				
			||||||
// }
 | 
					 | 
				
			||||||
func (core *Core) GetRangeKeyList(pattern string, from time.Time) ([]*simple.Json, error) {
 | 
					 | 
				
			||||||
	// 比如,用来计算ma30或ma7,倒推多少时间范围,
 | 
					 | 
				
			||||||
	redisCli := core.RedisCli
 | 
					 | 
				
			||||||
	cursor := uint64(0)
 | 
					 | 
				
			||||||
	n := 0
 | 
					 | 
				
			||||||
	allTs := []int64{}
 | 
					 | 
				
			||||||
	var keys []string
 | 
					 | 
				
			||||||
	for {
 | 
					 | 
				
			||||||
		var err error
 | 
					 | 
				
			||||||
		keys, cursor, _ = redisCli.Scan(cursor, pattern+"*", 2000).Result()
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			panic(err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		n += len(keys)
 | 
					 | 
				
			||||||
		if n == 0 {
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// keys, _ := redisCli.Keys(pattern + "*").Result()
 | 
					 | 
				
			||||||
	for _, key := range keys {
 | 
					 | 
				
			||||||
		keyAry := strings.Split(key, ":")
 | 
					 | 
				
			||||||
		key = keyAry[1]
 | 
					 | 
				
			||||||
		keyi64, _ := strconv.ParseInt(key, 10, 64)
 | 
					 | 
				
			||||||
		allTs = append(allTs, keyi64)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	nary := utils.RecursiveBubble(allTs, len(allTs))
 | 
					 | 
				
			||||||
	tt := from.UnixMilli()
 | 
					 | 
				
			||||||
	ff := tt - tt%60000
 | 
					 | 
				
			||||||
	fi := int64(ff)
 | 
					 | 
				
			||||||
	mary := []int64{}
 | 
					 | 
				
			||||||
	for _, v := range nary {
 | 
					 | 
				
			||||||
		if v < fi {
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		mary = append(mary, v)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	res := []*simple.Json{}
 | 
					 | 
				
			||||||
	for _, v := range mary {
 | 
					 | 
				
			||||||
		// if k > 1 {
 | 
					 | 
				
			||||||
		// break
 | 
					 | 
				
			||||||
		// }
 | 
					 | 
				
			||||||
		nv := pattern + strconv.FormatInt(v, 10)
 | 
					 | 
				
			||||||
		str, err := redisCli.Get(nv).Result()
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			fmt.Println("err of redis get key:", nv, err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		cur, err := simple.NewJson([]byte(str))
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			fmt.Println("err of create newJson:", str, err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		res = append(res, cur)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return res, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (cl *Candle) SetToKey(core *Core) ([]interface{}, error) {
 | 
					 | 
				
			||||||
	data := cl.Data
 | 
					 | 
				
			||||||
	tsi, err := strconv.ParseInt(data[0].(string), 10, 64)
 | 
					 | 
				
			||||||
	tss := strconv.FormatInt(tsi, 10)
 | 
					 | 
				
			||||||
	keyName := "candle" + cl.Period + "|" + cl.InstId + "|ts:" + tss
 | 
					 | 
				
			||||||
	//过期时间:根号(当前candle的周期/1分钟)*10000
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	dt, err := json.Marshal(cl.Data)
 | 
					 | 
				
			||||||
	exp := core.PeriodToMinutes(cl.Period)
 | 
					 | 
				
			||||||
	// expf := float64(exp) * 60
 | 
					 | 
				
			||||||
	expf := utils.Sqrt(float64(exp)) * 100
 | 
					 | 
				
			||||||
	extt := time.Duration(expf) * time.Minute
 | 
					 | 
				
			||||||
	curVolstr, _ := data[5].(string)
 | 
					 | 
				
			||||||
	curVol, err := strconv.ParseFloat(curVolstr, 64)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Println("err of convert ts:", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	curVolCcystr, _ := data[6].(string)
 | 
					 | 
				
			||||||
	curVolCcy, err := strconv.ParseFloat(curVolCcystr, 64)
 | 
					 | 
				
			||||||
	curPrice := curVolCcy / curVol
 | 
					 | 
				
			||||||
	if curPrice <= 0 {
 | 
					 | 
				
			||||||
		fmt.Println("price有问题", curPrice, "dt: ", string(dt), "from:", cl.From)
 | 
					 | 
				
			||||||
		err = errors.New("price有问题")
 | 
					 | 
				
			||||||
		return cl.Data, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	redisCli := core.RedisCli
 | 
					 | 
				
			||||||
	// tm := time.UnixMilli(tsi).Format("2006-01-02 15:04")
 | 
					 | 
				
			||||||
	fmt.Println("setToKey:", keyName, "ts: ", "price: ", curPrice, "from:", cl.From)
 | 
					 | 
				
			||||||
	redisCli.Set(keyName, dt, extt).Result()
 | 
					 | 
				
			||||||
	core.SaveUniKey(cl.Period, keyName, extt, tsi, cl)
 | 
					 | 
				
			||||||
	return cl.Data, err
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
							
								
								
									
										122
									
								
								core/config.go
									
									
									
									
									
								
							
							
						
						
									
										122
									
								
								core/config.go
									
									
									
									
									
								
							@ -1,122 +0,0 @@
 | 
				
			|||||||
package core
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
import (
 | 
					 | 
				
			||||||
	"fmt"
 | 
					 | 
				
			||||||
	"io/ioutil"
 | 
					 | 
				
			||||||
	"log"
 | 
					 | 
				
			||||||
	"os"
 | 
					 | 
				
			||||||
	"strings"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	simple "github.com/bitly/go-simplejson"
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type MyConfig struct {
 | 
					 | 
				
			||||||
	Env                    string `json:"env", string`
 | 
					 | 
				
			||||||
	Config                 *simple.Json
 | 
					 | 
				
			||||||
	CandleDimentions       []string          `json:"candleDimentions"`
 | 
					 | 
				
			||||||
	RedisConf              *RedisConfig      `json:"redis"`
 | 
					 | 
				
			||||||
	CredentialReadOnlyConf *CredentialConfig `json:"credential"`
 | 
					 | 
				
			||||||
	CredentialMutableConf  *CredentialConfig `json:"credential"`
 | 
					 | 
				
			||||||
	ConnectConf            *ConnectConfig    `json:"connect"`
 | 
					 | 
				
			||||||
	// ThreadsConf    *ThreadsConfig    `json:"threads"`
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type RedisConfig struct {
 | 
					 | 
				
			||||||
	Url      string `json:"url", string`
 | 
					 | 
				
			||||||
	Password string `json:"password", string`
 | 
					 | 
				
			||||||
	Index    int    `json:"index", string`
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type CredentialConfig struct {
 | 
					 | 
				
			||||||
	SecretKey          string `json:"secretKey", string`
 | 
					 | 
				
			||||||
	BaseUrl            string `json:"baseUrl", string`
 | 
					 | 
				
			||||||
	OkAccessKey        string `json:"okAccessKey", string`
 | 
					 | 
				
			||||||
	OkAccessPassphrase string `json:"okAccessPassphrase", string`
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type ConnectConfig struct {
 | 
					 | 
				
			||||||
	LoginSubUrl      string `json:"loginSubUrl", string`
 | 
					 | 
				
			||||||
	WsPrivateBaseUrl string `json:"wsPrivateBaseUrl", string`
 | 
					 | 
				
			||||||
	WsPublicBaseUrl  string `json:"wsPublicBaseUrl", string`
 | 
					 | 
				
			||||||
	RestBaseUrl      string `json:"restBaseUrl", string`
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
type ThreadsConfig struct {
 | 
					 | 
				
			||||||
	MaxLenTickerStream int `json:"maxLenTickerStream", int`
 | 
					 | 
				
			||||||
	MaxCandles         int `json:"maxCandles", string`
 | 
					 | 
				
			||||||
	AsyncChannels      int `json:"asyncChannels", int`
 | 
					 | 
				
			||||||
	MaxTickers         int `json:"maxTickers", int`
 | 
					 | 
				
			||||||
	RestPeriod         int `json:"restPeriod", int`
 | 
					 | 
				
			||||||
	WaitWs             int `json:"waitWs", int`
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (cfg MyConfig) Init() (MyConfig, error) {
 | 
					 | 
				
			||||||
	env := os.Getenv("GO_ENV")
 | 
					 | 
				
			||||||
	arystr := os.Getenv("TUNAS_CANDLESDIMENTIONS")
 | 
					 | 
				
			||||||
	ary := strings.Split(arystr, "|")
 | 
					 | 
				
			||||||
	cfg.CandleDimentions = ary
 | 
					 | 
				
			||||||
	jsonStr, err := ioutil.ReadFile("/go/json/basicConfig.json")
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		jsonStr, err = ioutil.ReadFile("configs/basicConfig.json")
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			fmt.Println("err2:", err.Error())
 | 
					 | 
				
			||||||
			return cfg, err
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		cfg.Config, err = simple.NewJson([]byte(jsonStr))
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			fmt.Println("err2:", err.Error())
 | 
					 | 
				
			||||||
			return cfg, err
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		cfg.Env = env
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	cfg.Config = cfg.Config.Get(env)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	ru, err := cfg.Config.Get("redis").Get("url").String()
 | 
					 | 
				
			||||||
	rp, _ := cfg.Config.Get("redis").Get("password").String()
 | 
					 | 
				
			||||||
	ri, _ := cfg.Config.Get("redis").Get("index").Int()
 | 
					 | 
				
			||||||
	redisConf := RedisConfig{
 | 
					 | 
				
			||||||
		Url:      ru,
 | 
					 | 
				
			||||||
		Password: rp,
 | 
					 | 
				
			||||||
		Index:    ri,
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	// fmt.Println("cfg: ", cfg)
 | 
					 | 
				
			||||||
	cfg.RedisConf = &redisConf
 | 
					 | 
				
			||||||
	ls, _ := cfg.Config.Get("connect").Get("loginSubUrl").String()
 | 
					 | 
				
			||||||
	wsPub, _ := cfg.Config.Get("connect").Get("wsPrivateBaseUrl").String()
 | 
					 | 
				
			||||||
	wsPri, _ := cfg.Config.Get("connect").Get("wsPublicBaseUrl").String()
 | 
					 | 
				
			||||||
	restBu, _ := cfg.Config.Get("connect").Get("restBaseUrl").String()
 | 
					 | 
				
			||||||
	connectConfig := ConnectConfig{
 | 
					 | 
				
			||||||
		LoginSubUrl:      ls,
 | 
					 | 
				
			||||||
		WsPublicBaseUrl:  wsPub,
 | 
					 | 
				
			||||||
		WsPrivateBaseUrl: wsPri,
 | 
					 | 
				
			||||||
		RestBaseUrl:      restBu,
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	cfg.ConnectConf = &connectConfig
 | 
					 | 
				
			||||||
	return cfg, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (cfg *MyConfig) GetConfigJson(arr []string) *simple.Json {
 | 
					 | 
				
			||||||
	env := os.Getenv("GO_ENV")
 | 
					 | 
				
			||||||
	fmt.Println("env: ", env)
 | 
					 | 
				
			||||||
	cfg.Env = env
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	json, err := ioutil.ReadFile("/go/json/basicConfig.json")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		json, err = ioutil.ReadFile("configs/basicConfig.json")
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			log.Panic("read config error: ", err.Error())
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Println("read file err: ", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	rjson, err := simple.NewJson(json)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Println("newJson err: ", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	for _, s := range arr {
 | 
					 | 
				
			||||||
		rjson = rjson.Get(s)
 | 
					 | 
				
			||||||
		// fmt.Println(s, ": ", rjson)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return rjson
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
@ -1,12 +0,0 @@
 | 
				
			|||||||
package core
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
const MAIN_ALLCOINS_PERIOD_MINUTES = 1
 | 
					 | 
				
			||||||
const MAIN_ALLCOINS_ONCE_COUNTS = 3
 | 
					 | 
				
			||||||
const MAIN_ALLCOINS_BAR_PERIOD = "3m"
 | 
					 | 
				
			||||||
const ALLCANDLES_PUBLISH = "allCandles|publish"
 | 
					 | 
				
			||||||
const ALLCANDLES_INNER_PUBLISH = "allCandlesInner|publish"
 | 
					 | 
				
			||||||
const ORDER_PUBLISH = "private|order|publish"
 | 
					 | 
				
			||||||
const TICKERINFO_PUBLISH = "tickerInfo|publish"
 | 
					 | 
				
			||||||
const CCYPOSISTIONS_PUBLISH = "ccyPositions|publish"
 | 
					 | 
				
			||||||
const SUBACTIONS_PUBLISH = "subActions|publish"
 | 
					 | 
				
			||||||
const ORDER_RESP_PUBLISH = "private|actionResp|publish"
 | 
					 | 
				
			||||||
							
								
								
									
										532
									
								
								core/core.go
									
									
									
									
									
								
							
							
						
						
									
										532
									
								
								core/core.go
									
									
									
									
									
								
							@ -1,532 +0,0 @@
 | 
				
			|||||||
package core
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
import (
 | 
					 | 
				
			||||||
	"context"
 | 
					 | 
				
			||||||
	"encoding/json"
 | 
					 | 
				
			||||||
	// "errors"
 | 
					 | 
				
			||||||
	"fmt"
 | 
					 | 
				
			||||||
	// "math/rand"
 | 
					 | 
				
			||||||
	"io/ioutil"
 | 
					 | 
				
			||||||
	"net/http"
 | 
					 | 
				
			||||||
	"os"
 | 
					 | 
				
			||||||
	"strconv"
 | 
					 | 
				
			||||||
	"strings"
 | 
					 | 
				
			||||||
	"sync"
 | 
					 | 
				
			||||||
	"time"
 | 
					 | 
				
			||||||
	"v5sdk_go/rest"
 | 
					 | 
				
			||||||
	"v5sdk_go/ws"
 | 
					 | 
				
			||||||
	// "v5sdk_go/ws/wImpl"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	simple "github.com/bitly/go-simplejson"
 | 
					 | 
				
			||||||
	"github.com/go-redis/redis"
 | 
					 | 
				
			||||||
	"github.com/phyer/texus/private"
 | 
					 | 
				
			||||||
	"github.com/phyer/texus/utils"
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type Core struct {
 | 
					 | 
				
			||||||
	Env           string
 | 
					 | 
				
			||||||
	Cfg           *MyConfig
 | 
					 | 
				
			||||||
	RedisCli      *redis.Client
 | 
					 | 
				
			||||||
	RedisCli2     *redis.Client
 | 
					 | 
				
			||||||
	FluentBitUrl  string
 | 
					 | 
				
			||||||
	Wg            sync.WaitGroup
 | 
					 | 
				
			||||||
	RestQueueChan chan *RestQueue
 | 
					 | 
				
			||||||
	OrderChan     chan *private.Order
 | 
					 | 
				
			||||||
	WriteLogChan  chan *WriteLog
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
type RestQueue struct {
 | 
					 | 
				
			||||||
	InstId   string
 | 
					 | 
				
			||||||
	Bar      string
 | 
					 | 
				
			||||||
	After    int64
 | 
					 | 
				
			||||||
	Before   int64
 | 
					 | 
				
			||||||
	Limit    string
 | 
					 | 
				
			||||||
	Duration time.Duration
 | 
					 | 
				
			||||||
	WithWs   bool
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
type CandleData struct {
 | 
					 | 
				
			||||||
	Code string          `json:"code"`
 | 
					 | 
				
			||||||
	Msg  string          `json:"msg"`
 | 
					 | 
				
			||||||
	Data [][]interface{} `json:"data"` // 用二维数组来接收 candles 数据
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
type SubAction struct {
 | 
					 | 
				
			||||||
	ActionName string
 | 
					 | 
				
			||||||
	ForAll     bool
 | 
					 | 
				
			||||||
	MetaInfo   map[string]interface{}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (rst *RestQueue) Show(cr *Core) {
 | 
					 | 
				
			||||||
	fmt.Println("restQueue:", rst.InstId, rst.Bar, rst.Limit)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (rst *RestQueue) Save(cr *Core) {
 | 
					 | 
				
			||||||
	afterSec := ""
 | 
					 | 
				
			||||||
	if rst.After > 0 {
 | 
					 | 
				
			||||||
		afterSec = fmt.Sprint("&after=", rst.After)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	beforeSec := ""
 | 
					 | 
				
			||||||
	if rst.Before > 0 {
 | 
					 | 
				
			||||||
		beforeSec = fmt.Sprint("&before=", rst.Before)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	limitSec := ""
 | 
					 | 
				
			||||||
	if len(rst.Limit) > 0 {
 | 
					 | 
				
			||||||
		limitSec = fmt.Sprint("&limit=", rst.Limit)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	link := "/api/v5/market/candles?instId=" + rst.InstId + "&bar=" + rst.Bar + limitSec + afterSec + beforeSec
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	fmt.Println("restLink: ", link)
 | 
					 | 
				
			||||||
	rsp, err := cr.v5PublicInvoke(link)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Println("cr.v5PublicInvoke err:", err)
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		fmt.Println("cr.v5PublicInvoke result count:", len(rsp.Data))
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	cr.SaveCandle(rst.InstId, rst.Bar, rsp, rst.Duration, rst.WithWs)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func WriteLogProcess(cr *Core) {
 | 
					 | 
				
			||||||
	for {
 | 
					 | 
				
			||||||
		wg := <-cr.WriteLogChan
 | 
					 | 
				
			||||||
		go func(wg *WriteLog) {
 | 
					 | 
				
			||||||
			fmt.Println("start writelog: " + wg.Tag + " " + wg.Id)
 | 
					 | 
				
			||||||
			wg.Process(cr)
 | 
					 | 
				
			||||||
		}(wg)
 | 
					 | 
				
			||||||
		time.Sleep(50 * time.Millisecond)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (cr *Core) ShowSysTime() {
 | 
					 | 
				
			||||||
	rsp, _ := cr.RestInvoke("/api/v5/public/time", rest.GET)
 | 
					 | 
				
			||||||
	fmt.Println("serverSystem time:", rsp)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (core *Core) Init() {
 | 
					 | 
				
			||||||
	core.Env = os.Getenv("GO_ENV")
 | 
					 | 
				
			||||||
	gitBranch := os.Getenv("gitBranchName")
 | 
					 | 
				
			||||||
	commitID := os.Getenv("gitCommitID")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	fmt.Println("当前环境: ", core.Env)
 | 
					 | 
				
			||||||
	fmt.Println("gitBranch: ", gitBranch)
 | 
					 | 
				
			||||||
	fmt.Println("gitCommitID: ", commitID)
 | 
					 | 
				
			||||||
	cfg := MyConfig{}
 | 
					 | 
				
			||||||
	cfg, _ = cfg.Init()
 | 
					 | 
				
			||||||
	core.Cfg = &cfg
 | 
					 | 
				
			||||||
	cli, err := core.GetRedisCli()
 | 
					 | 
				
			||||||
	core.RedisCli = cli
 | 
					 | 
				
			||||||
	core.RestQueueChan = make(chan *RestQueue)
 | 
					 | 
				
			||||||
	core.WriteLogChan = make(chan *WriteLog)
 | 
					 | 
				
			||||||
	core.OrderChan = make(chan *private.Order)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Println("init redis client err: ", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (core *Core) GetRedisCli() (*redis.Client, error) {
 | 
					 | 
				
			||||||
	ru := core.Cfg.RedisConf.Url
 | 
					 | 
				
			||||||
	rp := core.Cfg.RedisConf.Password
 | 
					 | 
				
			||||||
	ri := core.Cfg.RedisConf.Index
 | 
					 | 
				
			||||||
	re := os.Getenv("REDIS_URL")
 | 
					 | 
				
			||||||
	if len(re) > 0 {
 | 
					 | 
				
			||||||
		ru = re
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	client := redis.NewClient(&redis.Options{
 | 
					 | 
				
			||||||
		Addr:     ru,
 | 
					 | 
				
			||||||
		Password: rp, //默认空密码
 | 
					 | 
				
			||||||
		DB:       ri, //使用默认数据库
 | 
					 | 
				
			||||||
	})
 | 
					 | 
				
			||||||
	pong, err := client.Ping().Result()
 | 
					 | 
				
			||||||
	if pong == "PONG" && err == nil {
 | 
					 | 
				
			||||||
		return client, err
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		fmt.Println("redis状态不可用:", ru, rp, ri, err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return client, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (core *Core) GetAllTickerInfo() (*rest.RESTAPIResult, error) {
 | 
					 | 
				
			||||||
	// GET / 获取所有产品行情信息
 | 
					 | 
				
			||||||
	rsp, err := core.RestInvoke("/api/v5/market/tickers?instType=SPOT", rest.GET)
 | 
					 | 
				
			||||||
	return rsp, err
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (core *Core) GetBalances() (*rest.RESTAPIResult, error) {
 | 
					 | 
				
			||||||
	// TODO 临时用了两个实现,restInvoke,复用原来的会有bug,不知道是谁的bug
 | 
					 | 
				
			||||||
	rsp, err := core.RestInvoke2("/api/v5/account/balance", rest.GET, nil)
 | 
					 | 
				
			||||||
	return rsp, err
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
func (core *Core) GetLivingOrderList() ([]*private.Order, error) {
 | 
					 | 
				
			||||||
	// TODO 临时用了两个实现,restInvoke,复用原来的会有bug,不知道是谁的bug
 | 
					 | 
				
			||||||
	params := make(map[string]interface{})
 | 
					 | 
				
			||||||
	data, err := core.RestInvoke2("/api/v5/trade/orders-pending", rest.GET, ¶ms)
 | 
					 | 
				
			||||||
	odrsp := private.OrderResp{}
 | 
					 | 
				
			||||||
	err = json.Unmarshal([]byte(data.Body), &odrsp)
 | 
					 | 
				
			||||||
	str, _ := json.Marshal(odrsp)
 | 
					 | 
				
			||||||
	fmt.Println("convert: err:", err, " body: ", data.Body, odrsp, " string:", string(str))
 | 
					 | 
				
			||||||
	list, err := odrsp.Convert()
 | 
					 | 
				
			||||||
	fmt.Println("loopLivingOrder response data:", str)
 | 
					 | 
				
			||||||
	fmt.Println(utils.GetFuncName(), " 当前数据是 ", data.V5Response.Code, " list len:", len(list))
 | 
					 | 
				
			||||||
	return list, err
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
func (core *Core) LoopInstrumentList() error {
 | 
					 | 
				
			||||||
	for {
 | 
					 | 
				
			||||||
		time.Sleep(3 * time.Second)
 | 
					 | 
				
			||||||
		ctype := ws.SPOT
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		redisCli := core.RedisCli
 | 
					 | 
				
			||||||
		counts, err := redisCli.HLen("instruments|" + ctype + "|hash").Result()
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			fmt.Println("err of hset to redis:", err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		if counts == 0 {
 | 
					 | 
				
			||||||
			continue
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		break
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
func (core *Core) SubscribeTicker(op string) error {
 | 
					 | 
				
			||||||
	mp := make(map[string]string)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	redisCli := core.RedisCli
 | 
					 | 
				
			||||||
	ctype := ws.SPOT
 | 
					 | 
				
			||||||
	mp, err := redisCli.HGetAll("instruments|" + ctype + "|hash").Result()
 | 
					 | 
				
			||||||
	b, err := json.Marshal(mp)
 | 
					 | 
				
			||||||
	js, err := simple.NewJson(b)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Println("err of unMarshalJson3:", js)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	// fmt.Println("ticker js: ", js)
 | 
					 | 
				
			||||||
	instAry := js.MustMap()
 | 
					 | 
				
			||||||
	for k, v := range instAry {
 | 
					 | 
				
			||||||
		b = []byte(v.(string))
 | 
					 | 
				
			||||||
		_, err := simple.NewJson(b)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			fmt.Println("err of unMarshalJson4:", js)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		time.Sleep(5 * time.Second)
 | 
					 | 
				
			||||||
		go func(instId string, op string) {
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			redisCli := core.RedisCli
 | 
					 | 
				
			||||||
			_, err = redisCli.SAdd("tickers|"+op+"|set", instId).Result()
 | 
					 | 
				
			||||||
			if err != nil {
 | 
					 | 
				
			||||||
				fmt.Println("err of unMarshalJson5:", js)
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}(k, op)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (core *Core) v5PublicInvoke(subUrl string) (*CandleData, error) {
 | 
					 | 
				
			||||||
	restUrl, _ := core.Cfg.Config.Get("connect").Get("restBaseUrl").String()
 | 
					 | 
				
			||||||
	url := restUrl + subUrl
 | 
					 | 
				
			||||||
	resp, err := http.Get(url)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return nil, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	defer resp.Body.Close()
 | 
					 | 
				
			||||||
	body, err := ioutil.ReadAll(resp.Body)
 | 
					 | 
				
			||||||
	var result CandleData
 | 
					 | 
				
			||||||
	if err := json.Unmarshal(body, &result); err != nil {
 | 
					 | 
				
			||||||
		return nil, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return &result, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
func (core *Core) RestInvoke(subUrl string, method string) (*rest.RESTAPIResult, error) {
 | 
					 | 
				
			||||||
	restUrl, _ := core.Cfg.Config.Get("connect").Get("restBaseUrl").String()
 | 
					 | 
				
			||||||
	//ep, method, uri string, param *map[string]interface{}
 | 
					 | 
				
			||||||
	rest := rest.NewRESTAPI(restUrl, method, subUrl, nil)
 | 
					 | 
				
			||||||
	key, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String()
 | 
					 | 
				
			||||||
	secure, _ := core.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String()
 | 
					 | 
				
			||||||
	pass, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String()
 | 
					 | 
				
			||||||
	isDemo := false
 | 
					 | 
				
			||||||
	if core.Env == "demoEnv" {
 | 
					 | 
				
			||||||
		isDemo = true
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	rest.SetSimulate(isDemo).SetAPIKey(key, secure, pass)
 | 
					 | 
				
			||||||
	response, err := rest.Run(context.Background())
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Println("restInvoke1 err:", subUrl, err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return response, err
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (core *Core) RestInvoke2(subUrl string, method string, param *map[string]interface{}) (*rest.RESTAPIResult, error) {
 | 
					 | 
				
			||||||
	key, err1 := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String()
 | 
					 | 
				
			||||||
	secret, err2 := core.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String()
 | 
					 | 
				
			||||||
	pass, err3 := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String()
 | 
					 | 
				
			||||||
	userId, err4 := core.Cfg.Config.Get("connect").Get("userId").String()
 | 
					 | 
				
			||||||
	restUrl, err5 := core.Cfg.Config.Get("connect").Get("restBaseUrl").String()
 | 
					 | 
				
			||||||
	if err1 != nil || err2 != nil || err3 != nil || err4 != nil || err5 != nil {
 | 
					 | 
				
			||||||
		fmt.Println(err1, err2, err3, err4, err5)
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		fmt.Println("key:", key, secret, pass, "userId:", userId, "restUrl: ", restUrl)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	// reqParam := make(map[string]interface{})
 | 
					 | 
				
			||||||
	// if param != nil {
 | 
					 | 
				
			||||||
	// reqParam = *param
 | 
					 | 
				
			||||||
	// }
 | 
					 | 
				
			||||||
	// rs := rest.NewRESTAPI(restUrl, method, subUrl, &reqParam)
 | 
					 | 
				
			||||||
	isDemo := false
 | 
					 | 
				
			||||||
	if core.Env == "demoEnv" {
 | 
					 | 
				
			||||||
		isDemo = true
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	// rs.SetSimulate(isDemo).SetAPIKey(key, secret, pass).SetUserId(userId)
 | 
					 | 
				
			||||||
	// response, err := rs.Run(context.Background())
 | 
					 | 
				
			||||||
	// if err != nil {
 | 
					 | 
				
			||||||
	// fmt.Println("restInvoke2 err:", subUrl, err)
 | 
					 | 
				
			||||||
	// }
 | 
					 | 
				
			||||||
	apikey := rest.APIKeyInfo{
 | 
					 | 
				
			||||||
		ApiKey:     key,
 | 
					 | 
				
			||||||
		SecKey:     secret,
 | 
					 | 
				
			||||||
		PassPhrase: pass,
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	cli := rest.NewRESTClient(restUrl, &apikey, isDemo)
 | 
					 | 
				
			||||||
	rsp, err := cli.Get(context.Background(), subUrl, param)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return rsp, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	fmt.Println("response:", rsp, err)
 | 
					 | 
				
			||||||
	return rsp, err
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (core *Core) RestPost(subUrl string, param *map[string]interface{}) (*rest.RESTAPIResult, error) {
 | 
					 | 
				
			||||||
	key, err1 := core.Cfg.Config.Get("credentialMutable").Get("okAccessKey").String()
 | 
					 | 
				
			||||||
	secret, err2 := core.Cfg.Config.Get("credentialMutable").Get("secretKey").String()
 | 
					 | 
				
			||||||
	pass, err3 := core.Cfg.Config.Get("credentialMutable").Get("okAccessPassphrase").String()
 | 
					 | 
				
			||||||
	userId, err4 := core.Cfg.Config.Get("connect").Get("userId").String()
 | 
					 | 
				
			||||||
	restUrl, err5 := core.Cfg.Config.Get("connect").Get("restBaseUrl").String()
 | 
					 | 
				
			||||||
	if err1 != nil || err2 != nil || err3 != nil || err4 != nil || err5 != nil {
 | 
					 | 
				
			||||||
		fmt.Println(err1, err2, err3, err4, err5)
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		fmt.Println("key:", key, secret, pass, "userId:", userId, "restUrl: ", restUrl)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	// 请求的另一种方式
 | 
					 | 
				
			||||||
	apikey := rest.APIKeyInfo{
 | 
					 | 
				
			||||||
		ApiKey:     key,
 | 
					 | 
				
			||||||
		SecKey:     secret,
 | 
					 | 
				
			||||||
		PassPhrase: pass,
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	isDemo := false
 | 
					 | 
				
			||||||
	if core.Env == "demoEnv" {
 | 
					 | 
				
			||||||
		isDemo = true
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	cli := rest.NewRESTClient(restUrl, &apikey, isDemo)
 | 
					 | 
				
			||||||
	rsp, err := cli.Post(context.Background(), subUrl, param)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return rsp, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return rsp, err
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// 我当前持有的币,每分钟刷新
 | 
					 | 
				
			||||||
func (core *Core) GetMyFavorList() []string {
 | 
					 | 
				
			||||||
	redisCli := core.RedisCli
 | 
					 | 
				
			||||||
	opt := redis.ZRangeBy{
 | 
					 | 
				
			||||||
		Min: "10",
 | 
					 | 
				
			||||||
		Max: "100000000000",
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	cary, _ := redisCli.ZRevRangeByScore("private|positions|sortedSet", opt).Result()
 | 
					 | 
				
			||||||
	cl := []string{}
 | 
					 | 
				
			||||||
	for _, v := range cary {
 | 
					 | 
				
			||||||
		cry := strings.Split(v, "|")[0] + "-USDT"
 | 
					 | 
				
			||||||
		cl = append(cl, cry)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return cary
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// 得到交易量排行榜,订阅其中前N名的各维度k线,并merge进来我已经购买的币列表,这个列表是动态更新的
 | 
					 | 
				
			||||||
// 改了,不需要交易排行榜,我手动指定一个排行即可, tickersVol|sortedSet 改成 tickersList|sortedSet
 | 
					 | 
				
			||||||
func (core *Core) GetScoreList(count int) []string {
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	redisCli := core.RedisCli
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	curList, err := redisCli.ZRange("tickersList|sortedSet", 0, int64(count-1)).Result()
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Println("zrevrange err:", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	fmt.Println("curList: ", curList)
 | 
					 | 
				
			||||||
	return curList
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func LoopBalances(cr *Core, mdura time.Duration) {
 | 
					 | 
				
			||||||
	//协程:动态维护topScore
 | 
					 | 
				
			||||||
	ticker := time.NewTicker(mdura)
 | 
					 | 
				
			||||||
	for {
 | 
					 | 
				
			||||||
		select {
 | 
					 | 
				
			||||||
		case <-ticker.C:
 | 
					 | 
				
			||||||
			//协程:循环执行rest请求candle
 | 
					 | 
				
			||||||
			fmt.Println("loopBalance: receive ccyChannel start")
 | 
					 | 
				
			||||||
			RestBalances(cr)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
func LoopLivingOrders(cr *Core, mdura time.Duration) {
 | 
					 | 
				
			||||||
	//协程:动态维护topScore
 | 
					 | 
				
			||||||
	ticker := time.NewTicker(mdura)
 | 
					 | 
				
			||||||
	for {
 | 
					 | 
				
			||||||
		select {
 | 
					 | 
				
			||||||
		case <-ticker.C:
 | 
					 | 
				
			||||||
			//协程:循环执行rest请求candle
 | 
					 | 
				
			||||||
			fmt.Println("loopLivingOrder: receive ccyChannel start")
 | 
					 | 
				
			||||||
			RestLivingOrder(cr)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func RestBalances(cr *Core) ([]*private.Ccy, error) {
 | 
					 | 
				
			||||||
	// fmt.Println("restBalance loopBalance loop start")
 | 
					 | 
				
			||||||
	ccyList := []*private.Ccy{}
 | 
					 | 
				
			||||||
	rsp, err := cr.GetBalances()
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Println("loopBalance err00: ", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	fmt.Println("loopBalance balance rsp: ", rsp)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Println("loopBalance err01: ", err)
 | 
					 | 
				
			||||||
		return ccyList, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if len(rsp.Body) == 0 {
 | 
					 | 
				
			||||||
		fmt.Println("loopBalance err03: rsp body is null")
 | 
					 | 
				
			||||||
		return ccyList, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	js1, err := simple.NewJson([]byte(rsp.Body))
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Println("loopBalance err1: ", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	itemList := js1.Get("data").GetIndex(0).Get("details").MustArray()
 | 
					 | 
				
			||||||
	// maxTickers是重点关注的topScore的coins的数量
 | 
					 | 
				
			||||||
	cli := cr.RedisCli
 | 
					 | 
				
			||||||
	for _, v := range itemList {
 | 
					 | 
				
			||||||
		js, _ := json.Marshal(v)
 | 
					 | 
				
			||||||
		ccyResp := private.CcyResp{}
 | 
					 | 
				
			||||||
		err := json.Unmarshal(js, &ccyResp)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			fmt.Println("loopBalance err2: ", err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		ccy, err := ccyResp.Convert()
 | 
					 | 
				
			||||||
		ccyList = append(ccyList, ccy)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			fmt.Println("loopBalance err2: ", err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		z := redis.Z{
 | 
					 | 
				
			||||||
			Score:  ccy.EqUsd,
 | 
					 | 
				
			||||||
			Member: ccy.Ccy + "|position|key",
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		res, err := cli.ZAdd("private|positions|sortedSet", z).Result()
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			fmt.Println("loopBalance err3: ", res, err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		res1, err := cli.Set(ccy.Ccy+"|position|key", js, 0).Result()
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			fmt.Println("loopBalance err4: ", res1, err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		bjs, _ := json.Marshal(ccy)
 | 
					 | 
				
			||||||
		tsi := time.Now().Unix()
 | 
					 | 
				
			||||||
		tsii := tsi - tsi%600
 | 
					 | 
				
			||||||
		tss := strconv.FormatInt(tsii, 10)
 | 
					 | 
				
			||||||
		cli.Set(CCYPOSISTIONS_PUBLISH+"|ts:"+tss, 1, 10*time.Minute).Result()
 | 
					 | 
				
			||||||
		fmt.Println("ccy published: ", string(bjs))
 | 
					 | 
				
			||||||
		//TODO FIXME 50毫秒,每分钟上限是1200个订单,超过就无法遍历完成
 | 
					 | 
				
			||||||
		time.Sleep(50 * time.Millisecond)
 | 
					 | 
				
			||||||
		suffix := ""
 | 
					 | 
				
			||||||
		if cr.Env == "demoEnv" {
 | 
					 | 
				
			||||||
			suffix = "-demoEnv"
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		// TODO FIXME cli2
 | 
					 | 
				
			||||||
		cli.Publish(CCYPOSISTIONS_PUBLISH+suffix, string(bjs)).Result()
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return ccyList, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func RestLivingOrder(cr *Core) ([]*private.Order, error) {
 | 
					 | 
				
			||||||
	// fmt.Println("restOrder loopOrder loop start")
 | 
					 | 
				
			||||||
	orderList := []*private.Order{}
 | 
					 | 
				
			||||||
	list, err := cr.GetLivingOrderList()
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Println("loopLivingOrder err00: ", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	fmt.Println("loopLivingOrder response:", list)
 | 
					 | 
				
			||||||
	go func() {
 | 
					 | 
				
			||||||
		for _, v := range list {
 | 
					 | 
				
			||||||
			fmt.Println("order orderV:", v)
 | 
					 | 
				
			||||||
			time.Sleep(30 * time.Millisecond)
 | 
					 | 
				
			||||||
			cr.OrderChan <- v
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}()
 | 
					 | 
				
			||||||
	return orderList, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (cr *Core) ProcessOrder(od *private.Order) error {
 | 
					 | 
				
			||||||
	// publish
 | 
					 | 
				
			||||||
	go func() {
 | 
					 | 
				
			||||||
		suffix := ""
 | 
					 | 
				
			||||||
		if cr.Env == "demoEnv" {
 | 
					 | 
				
			||||||
			suffix = "-demoEnv"
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		cn := ORDER_PUBLISH + suffix
 | 
					 | 
				
			||||||
		bj, _ := json.Marshal(od)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		// TODO FIXME cli2
 | 
					 | 
				
			||||||
		res, _ := cr.RedisCli.Publish(cn, string(bj)).Result()
 | 
					 | 
				
			||||||
		fmt.Println("order publish res: ", res, " content: ", string(bj))
 | 
					 | 
				
			||||||
		rsch := ORDER_RESP_PUBLISH + suffix
 | 
					 | 
				
			||||||
		bj1, _ := json.Marshal(res)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		// TODO FIXME cli2
 | 
					 | 
				
			||||||
		res, _ = cr.RedisCli.Publish(rsch, string(bj1)).Result()
 | 
					 | 
				
			||||||
	}()
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (cr *Core) DispatchSubAction(action *SubAction) error {
 | 
					 | 
				
			||||||
	go func() {
 | 
					 | 
				
			||||||
		suffix := ""
 | 
					 | 
				
			||||||
		if cr.Env == "demoEnv" {
 | 
					 | 
				
			||||||
			suffix = "-demoEnv"
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		fmt.Println("action: ", action.ActionName, action.MetaInfo)
 | 
					 | 
				
			||||||
		res, err := cr.RestPostWrapper("/api/v5/trade/"+action.ActionName, action.MetaInfo)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			fmt.Println(utils.GetFuncName(), " actionRes 1:", err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		rsch := ORDER_RESP_PUBLISH + suffix
 | 
					 | 
				
			||||||
		bj1, _ := json.Marshal(res)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		// TODO FIXME cli2
 | 
					 | 
				
			||||||
		rs, _ := cr.RedisCli.Publish(rsch, string(bj1)).Result()
 | 
					 | 
				
			||||||
		fmt.Println("action rs: ", rs)
 | 
					 | 
				
			||||||
	}()
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (cr *Core) RestPostWrapper(url string, param map[string]interface{}) (rest.Okexv5APIResponse, error) {
 | 
					 | 
				
			||||||
	suffix := ""
 | 
					 | 
				
			||||||
	if cr.Env == "demoEnv" {
 | 
					 | 
				
			||||||
		suffix = "-demoEnv"
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	res, err := cr.RestPost(url, ¶m)
 | 
					 | 
				
			||||||
	fmt.Println("actionRes 2:", res.V5Response.Msg, res.V5Response.Data, err)
 | 
					 | 
				
			||||||
	bj, _ := json.Marshal(res)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// TODO FIXME cli2
 | 
					 | 
				
			||||||
	cr.RedisCli.Publish(ORDER_RESP_PUBLISH+suffix, string(bj))
 | 
					 | 
				
			||||||
	return res.V5Response, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (cr *Core) AddToGeneralCandleChnl(candle *Candle, channels []string) {
 | 
					 | 
				
			||||||
	redisCli := cr.RedisCli
 | 
					 | 
				
			||||||
	ab, _ := json.Marshal(candle)
 | 
					 | 
				
			||||||
	for _, v := range channels {
 | 
					 | 
				
			||||||
		suffix := ""
 | 
					 | 
				
			||||||
		env := os.Getenv("GO_ENV")
 | 
					 | 
				
			||||||
		if env == "demoEnv" {
 | 
					 | 
				
			||||||
			suffix = "-demoEnv"
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		vd := v + suffix
 | 
					 | 
				
			||||||
		_, err := redisCli.Publish(vd, string(ab)).Result()
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			fmt.Println("err of ma7|ma30 add to redis2:", err, candle.From)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
@ -1,70 +0,0 @@
 | 
				
			|||||||
package core
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
import (
 | 
					 | 
				
			||||||
	"fmt"
 | 
					 | 
				
			||||||
	"reflect"
 | 
					 | 
				
			||||||
	"strconv"
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type TickerInfo struct {
 | 
					 | 
				
			||||||
	Id        string  `json:"_id"`
 | 
					 | 
				
			||||||
	InstId    string  `json:"instId"`
 | 
					 | 
				
			||||||
	Last      float64 `json:"last"`
 | 
					 | 
				
			||||||
	InstType  string  `json:"instType"`
 | 
					 | 
				
			||||||
	VolCcy24h float64 `json:"volCcy24h"`
 | 
					 | 
				
			||||||
	Ts        int64   `json:"ts"`
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type TickerInfoResp struct {
 | 
					 | 
				
			||||||
	InstId    string `json:"instId"`
 | 
					 | 
				
			||||||
	Last      string `json:"last"`
 | 
					 | 
				
			||||||
	InstType  string `json:"instType"`
 | 
					 | 
				
			||||||
	VolCcy24h string `json:"volCcy24h"`
 | 
					 | 
				
			||||||
	Ts        string `json:"ts"`
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (tir *TickerInfoResp) Convert() TickerInfo {
 | 
					 | 
				
			||||||
	ti := TickerInfo{
 | 
					 | 
				
			||||||
		Id:        HashString(tir.InstId + tir.Ts),
 | 
					 | 
				
			||||||
		InstId:    tir.InstId,
 | 
					 | 
				
			||||||
		InstType:  tir.InstType,
 | 
					 | 
				
			||||||
		Last:      ToFloat64(tir.Last),
 | 
					 | 
				
			||||||
		VolCcy24h: ToFloat64(tir.VolCcy24h),
 | 
					 | 
				
			||||||
		Ts:        ToInt64(tir.Ts),
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return ti
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func ToString(val interface{}) string {
 | 
					 | 
				
			||||||
	valstr := ""
 | 
					 | 
				
			||||||
	if reflect.TypeOf(val).Name() == "string" {
 | 
					 | 
				
			||||||
		valstr = val.(string)
 | 
					 | 
				
			||||||
	} else if reflect.TypeOf(val).Name() == "float64" {
 | 
					 | 
				
			||||||
		valstr = fmt.Sprintf("%f", val)
 | 
					 | 
				
			||||||
	} else if reflect.TypeOf(val).Name() == "int64" {
 | 
					 | 
				
			||||||
		valstr = strconv.FormatInt(val.(int64), 16)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return valstr
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func ToInt64(val interface{}) int64 {
 | 
					 | 
				
			||||||
	vali := int64(0)
 | 
					 | 
				
			||||||
	if reflect.TypeOf(val).Name() == "string" {
 | 
					 | 
				
			||||||
		vali, _ = strconv.ParseInt(val.(string), 10, 64)
 | 
					 | 
				
			||||||
	} else if reflect.TypeOf(val).Name() == "float64" {
 | 
					 | 
				
			||||||
		vali = int64(val.(float64))
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return vali
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func ToFloat64(val interface{}) float64 {
 | 
					 | 
				
			||||||
	valf := float64(0)
 | 
					 | 
				
			||||||
	if reflect.TypeOf(val).Name() == "string" {
 | 
					 | 
				
			||||||
		valf, _ = strconv.ParseFloat(val.(string), 64)
 | 
					 | 
				
			||||||
	} else if reflect.TypeOf(val).Name() == "float64" {
 | 
					 | 
				
			||||||
		valf = val.(float64)
 | 
					 | 
				
			||||||
	} else if reflect.TypeOf(val).Name() == "int64" {
 | 
					 | 
				
			||||||
		valf = float64(val.(int64))
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return valf
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
@ -1,32 +0,0 @@
 | 
				
			|||||||
package core
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
import (
 | 
					 | 
				
			||||||
	"bytes"
 | 
					 | 
				
			||||||
	"fmt"
 | 
					 | 
				
			||||||
	"net/http"
 | 
					 | 
				
			||||||
	"os"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	logrus "github.com/sirupsen/logrus"
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type WriteLog struct {
 | 
					 | 
				
			||||||
	Content []byte
 | 
					 | 
				
			||||||
	Tag     string
 | 
					 | 
				
			||||||
	Id      string
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (wg *WriteLog) Process(cr *Core) error {
 | 
					 | 
				
			||||||
	go func() {
 | 
					 | 
				
			||||||
		reqBody := bytes.NewBuffer(wg.Content)
 | 
					 | 
				
			||||||
		cr.Env = os.Getenv("GO_ENV")
 | 
					 | 
				
			||||||
		cr.FluentBitUrl = os.Getenv("TEXUS_FluentBitUrl")
 | 
					 | 
				
			||||||
		fullUrl := "http://" + cr.FluentBitUrl + "/" + wg.Tag
 | 
					 | 
				
			||||||
		res, err := http.Post(fullUrl, "application/json", reqBody)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		fmt.Println("requested, response:", fullUrl, string(wg.Content), res)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			logrus.Error(err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}()
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
							
								
								
									
										7
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										7
									
								
								go.mod
									
									
									
									
									
								
							@ -11,20 +11,21 @@ go 1.21
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
require (
 | 
					require (
 | 
				
			||||||
	github.com/bitly/go-simplejson v0.5.0
 | 
						github.com/bitly/go-simplejson v0.5.0
 | 
				
			||||||
	github.com/go-redis/redis v6.15.9+incompatible
 | 
						github.com/phyer/core v0.0.0-20241212012123-7c451a49d860
 | 
				
			||||||
	github.com/sirupsen/logrus v1.9.3
 | 
					 | 
				
			||||||
	v5sdk_go/rest v0.0.0-00010101000000-000000000000
 | 
						v5sdk_go/rest v0.0.0-00010101000000-000000000000
 | 
				
			||||||
	v5sdk_go/ws v0.0.0-00010101000000-000000000000
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
require (
 | 
					require (
 | 
				
			||||||
	github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
 | 
						github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
 | 
				
			||||||
 | 
						github.com/go-redis/redis v6.15.9+incompatible // indirect
 | 
				
			||||||
	github.com/gorilla/websocket v1.5.1 // indirect
 | 
						github.com/gorilla/websocket v1.5.1 // indirect
 | 
				
			||||||
	github.com/kr/pretty v0.3.0 // indirect
 | 
						github.com/kr/pretty v0.3.0 // indirect
 | 
				
			||||||
	github.com/onsi/ginkgo v1.16.4 // indirect
 | 
						github.com/onsi/ginkgo v1.16.4 // indirect
 | 
				
			||||||
	github.com/onsi/gomega v1.16.0 // indirect
 | 
						github.com/onsi/gomega v1.16.0 // indirect
 | 
				
			||||||
 | 
						github.com/sirupsen/logrus v1.9.3 // indirect
 | 
				
			||||||
	golang.org/x/net v0.17.0 // indirect
 | 
						golang.org/x/net v0.17.0 // indirect
 | 
				
			||||||
	golang.org/x/sys v0.13.0 // indirect
 | 
						golang.org/x/sys v0.13.0 // indirect
 | 
				
			||||||
	v5sdk_go/config v0.0.0-00010101000000-000000000000 // indirect
 | 
						v5sdk_go/config v0.0.0-00010101000000-000000000000 // indirect
 | 
				
			||||||
	v5sdk_go/utils v0.0.0-00010101000000-000000000000 // indirect
 | 
						v5sdk_go/utils v0.0.0-00010101000000-000000000000 // indirect
 | 
				
			||||||
 | 
						v5sdk_go/ws v0.0.0-00010101000000-000000000000 // indirect
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										2
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.sum
									
									
									
									
									
								
							@ -46,6 +46,8 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J
 | 
				
			|||||||
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
 | 
					github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
 | 
				
			||||||
github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c=
 | 
					github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c=
 | 
				
			||||||
github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
 | 
					github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
 | 
				
			||||||
 | 
					github.com/phyer/core v0.0.0-20241212012123-7c451a49d860 h1:BZvZ7g/dPudXwCB9QNq9vPZb2xDbfTPWHVqyDGXSNBk=
 | 
				
			||||||
 | 
					github.com/phyer/core v0.0.0-20241212012123-7c451a49d860/go.mod h1:LyfJrdqSlm2MTOx0M3pnDntpwa64XD5nf0xYxvZ4El4=
 | 
				
			||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 | 
					github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 | 
				
			||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 | 
					github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 | 
				
			||||||
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
 | 
					github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										2
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								main.go
									
									
									
									
									
								
							@ -13,7 +13,7 @@ import (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	simple "github.com/bitly/go-simplejson"
 | 
						simple "github.com/bitly/go-simplejson"
 | 
				
			||||||
	// "github.com/go-redis/redis"
 | 
						// "github.com/go-redis/redis"
 | 
				
			||||||
	"github.com/phyer/texus/core"
 | 
						"github.com/phyer/core"
 | 
				
			||||||
	//	"github.com/phyer/texus/private"
 | 
						//	"github.com/phyer/texus/private"
 | 
				
			||||||
	"github.com/phyer/texus/utils"
 | 
						"github.com/phyer/texus/utils"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user