From 7c451a49d860fe3eb1735b13b4054fa5ee85fbc2 Mon Sep 17 00:00:00 2001 From: zhangkun Date: Thu, 12 Dec 2024 09:21:23 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8A=8Acore=E7=9B=B8=E5=85=B3=E7=9A=84?= =?UTF-8?q?=E9=83=A8=E5=88=86=E5=8D=95=E7=8B=AC=E6=8A=BD=E7=A6=BB=E5=87=BA?= =?UTF-8?q?=E6=9D=A5=E5=81=9A=E6=88=90=E5=85=AC=E5=85=B1=E7=9A=84=E6=A8=A1?= =?UTF-8?q?=E5=9D=97w?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + .gitmodules | 3 + README.md | 13 ++ candle.go | 470 ++++++++++++++++++++++++++++++++++++++++++++++ config.go | 122 ++++++++++++ const.go | 12 ++ core.go | 532 ++++++++++++++++++++++++++++++++++++++++++++++++++++ go.mod | 31 +++ go.sum | 55 ++++++ ticker.go | 70 +++++++ writeLog.go | 32 ++++ 11 files changed, 1341 insertions(+) create mode 100644 .gitignore create mode 100644 .gitmodules create mode 100644 README.md create mode 100644 candle.go create mode 100644 config.go create mode 100644 const.go create mode 100644 core.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 ticker.go create mode 100644 writeLog.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..539b4cd --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +submodules/ diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..7a1bd20 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "submodule/okex"] + path = submodule/okex + url = baidu:/root/repos/go/okexV5Api diff --git a/README.md b/README.md new file mode 100644 index 0000000..b0434ac --- /dev/null +++ b/README.md @@ -0,0 +1,13 @@ + +## Init + +mkdir submodules && cd submodules +git submodule add baidu:/root/repos/go/okexV5Api okex +cd ../ +git pull +git submodule init +git submodule update --force --recursive --init --remote +go mod tidy +go mod vendor + + diff --git a/candle.go b/candle.go new file mode 100644 index 0000000..ec0270a --- /dev/null +++ b/candle.go @@ -0,0 +1,470 @@ +package core + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "math/rand" + "os" + "strconv" + "strings" + "time" + // "v5sdk_go/rest" + + simple "github.com/bitly/go-simplejson" + "github.com/go-redis/redis" + "github.com/phyer/texus/utils" +) + +type Candle struct { + Id string `json:"_id"` + core *Core + InstId string + Period string + Data []interface{} + From string + Timestamp time.Time + Open float64 + High float64 + Low float64 + Close float64 + VolCcy float64 + Confirm bool +} + +type MaX struct { + Core *Core + InstId string + Period string + KeyName string + Count int + Ts int64 + Value float64 + Data []interface{} + From string +} + +type MatchCheck struct { + Minutes int64 + Matched bool +} + +func (mc *MatchCheck) SetMatched(value bool) { + mc.Matched = value +} + +func (core *Core) GetCandlesWithRest(instId string, kidx int, dura time.Duration, maxCandles int) error { + ary := []string{} + + wsary := core.Cfg.CandleDimentions + for k, v := range wsary { + matched := false + // 这个算法的目的是:越靠后的candles维度,被命中的概率越低,第一个百分之百命中,后面开始越来越低, 每分钟都会发生这样的计算, + // 因为维度多了的话,照顾不过来 + rand.New(rand.NewSource(time.Now().UnixNano())) + rand.Seed(time.Now().UnixNano()) + n := (k*2 + 2) * 3 + if n < 1 { + n = 1 + } + b := rand.Intn(n) + if b < 8 { + matched = true + } + if matched { + ary = append(ary, v) + } + } + + mdura := dura/(time.Duration(len(ary)+1)) - 50*time.Millisecond + // fmt.Println("loop4 Ticker Start instId, dura: ", instId, dura, dura/10, mdura, len(ary), " idx: ", kidx) + // time.Duration(len(ary)+1) + ticker := time.NewTicker(mdura) + done := make(chan bool) + idx := 0 + go func(i int) { + for { + select { + case <-ticker.C: + if i >= (len(ary)) { + done <- true + break + } + rand.Seed(time.Now().UnixNano()) + b := rand.Intn(2) + maxCandles = maxCandles * (i + b) * 2 + + if maxCandles < 3 { + maxCandles = 3 + } + if maxCandles > 30 { + maxCandles = 30 + } + mx := strconv.Itoa(maxCandles) + // fmt.Println("loop4 getCandlesWithRest, instId, period,limit,dura, t: ", instId, ary[i], mx, mdura) + go func(ii int) { + restQ := RestQueue{ + InstId: instId, + Bar: ary[ii], + Limit: mx, + Duration: mdura, + WithWs: true, + } + js, _ := json.Marshal(restQ) + core.RedisCli.LPush("restQueue", js) + }(i) + i++ + } + } + }(idx) + time.Sleep(dura - 10*time.Millisecond) + ticker.Stop() + // fmt.Println("loop4 Ticker stopped instId, dura: ", instId, dura, mdura) + done <- true + return nil +} + +// 当前的时间毫秒数 对于某个时间段,比如3分钟,10分钟,是否可以被整除, +func IsModOf(curInt int64, duration time.Duration) bool { + vol := int64(0) + if duration < 24*time.Hour { + // 小于1天 + vol = (curInt + 28800000) + } else if duration >= 24*time.Hour && duration < 48*time.Hour { + // 1天 + vol = curInt - 1633881600000 + } else if duration >= 48*time.Hour && duration < 72*time.Hour { + // 2天 + vol = curInt - 1633795200000 + } else if duration >= 72*time.Hour && duration < 120*time.Hour { + // 3天 + vol = curInt - 1633708800000 + } else if duration >= 120*time.Hour { + // 5天 + vol = curInt - 1633795200000 + } else { + // fmt.Println("noMatched:", curInt) + } + + mody := vol % duration.Milliseconds() + if mody == 0 { + return true + } + return false +} + +func (core *Core) SaveCandle(instId string, period string, rsp *CandleData, dura time.Duration, withWs bool) { + leng := len(rsp.Data) + for _, v := range rsp.Data { + candle := Candle{ + InstId: instId, + Period: period, + Data: v, + From: "rest", + } + //存到elasticSearch + candle.PushToWriteLogChan(core) + //保存rest得到的candle + // 发布到allCandles|publish, 给外部订阅者用于setToKey + arys := []string{ALLCANDLES_PUBLISH} + if withWs { + arys = append(arys, ALLCANDLES_INNER_PUBLISH) + } + // 如果candle都不需要存到redis,那么AddToGeneralCandleChnl也没有意义 + saveCandle := os.Getenv("TEXUS_SAVECANDLE") + if saveCandle == "true" { + candle.SetToKey(core) + core.AddToGeneralCandleChnl(&candle, arys) + time.Sleep(dura / time.Duration(leng)) + } + } +} + +func (candle *Candle) PushToWriteLogChan(cr *Core) error { + did := candle.InstId + candle.Period + candle.Data[0].(string) + candle.Id = HashString(did) + ncd, _ := candle.ToStruct(cr) + fmt.Println("ncd: ", ncd) + cd, _ := json.Marshal(ncd) + wg := WriteLog{ + Content: cd, + Tag: "sardine.log.candle." + candle.Period, + Id: candle.Id, + } + cr.WriteLogChan <- &wg + return nil +} + +func Daoxu(arr []interface{}) { + var temp interface{} + length := len(arr) + for i := 0; i < length/2; i++ { + temp = arr[i] + arr[i] = arr[length-1-i] + arr[length-1-i] = temp + } +} +func HashString(input string) string { + // 计算SHA-256哈希值 + hash := sha256.Sum256([]byte(input)) + // 转换为十六进制字符串 + hashHex := hex.EncodeToString(hash[:]) + // 返回前20位 + return hashHex[:23] +} + +func (cl *Candle) ToStruct(core *Core) (*Candle, error) { + // cl.Timestamp + ncd := Candle{} + ncd.Id = cl.Id + ncd.Period = cl.Period + ncd.InstId = cl.InstId + ncd.From = cl.From + + // 将字符串转换为 int64 类型的时间戳 + ts, err := strconv.ParseInt(cl.Data[0].(string), 10, 64) + if err != nil { + fmt.Println("Error parsing timestamp:", err) + return nil, err + } + ncd.Timestamp = time.Unix(ts/1000, (ts%1000)*1000000) // 纳秒级别 + op, err := strconv.ParseFloat(cl.Data[1].(string), 64) + if err != nil { + fmt.Println("Error parsing string to float64:", err) + return nil, err + } + ncd.Open = op + hi, err := strconv.ParseFloat(cl.Data[2].(string), 64) + if err != nil { + fmt.Println("Error parsing string to float64:", err) + return nil, err + } + ncd.High = hi + lo, err := strconv.ParseFloat(cl.Data[3].(string), 64) + if err != nil { + fmt.Println("Error parsing string to float64:", err) + return nil, err + } + ncd.Low = lo + clse, err := strconv.ParseFloat(cl.Data[4].(string), 64) + + if err != nil { + fmt.Println("Error parsing string to float64:", err) + return nil, err + } + ncd.Close = clse + ncd.VolCcy, err = strconv.ParseFloat(cl.Data[6].(string), 64) + if err != nil { + fmt.Println("Error parsing string to float64:", err) + return nil, err + } + if cl.Data[6].(string) == "1" { + ncd.Confirm = true + } else { + ncd.Confirm = false + } + return &ncd, nil +} + +func (mx *MaX) SetToKey() ([]interface{}, error) { + cstr := strconv.Itoa(mx.Count) + tss := strconv.FormatInt(mx.Ts, 10) + keyName := "ma" + cstr + "|candle" + mx.Period + "|" + mx.InstId + "|ts:" + tss + //过期时间:根号(当前candle的周期/1分钟)*10000 + dt := []interface{}{} + dt = append(dt, mx.Ts) + dt = append(dt, mx.Value) + dj, _ := json.Marshal(dt) + exp := mx.Core.PeriodToMinutes(mx.Period) + expf := utils.Sqrt(float64(exp)) * 100 + extt := time.Duration(expf) * time.Minute + // loc, _ := time.LoadLocation("Asia/Shanghai") + // tm := time.UnixMilli(mx.Ts).In(loc).Format("2006-01-02 15:04") + // fmt.Println("setToKey:", keyName, "ts:", tm, string(dj), "from: ", mx.From) + _, err := mx.Core.RedisCli.GetSet(keyName, dj).Result() + mx.Core.RedisCli.Expire(keyName, extt) + return dt, err +} + +// 保证同一个 period, keyName ,在一个周期里,SaveToSortSet只会被执行一次 +func (core *Core) SaveUniKey(period string, keyName string, extt time.Duration, tsi int64, cl *Candle) { + + refName := keyName + "|refer" + refRes, _ := core.RedisCli.GetSet(refName, 1).Result() + core.RedisCli.Expire(refName, extt) + // 为保证唯一性机制,防止SaveToSortSet 被重复执行 + if len(refRes) != 0 { + fmt.Println("refName exist: ", refName) + return + } + + core.SaveToSortSet(period, keyName, extt, tsi) +} + +// tsi: 上报时间timeStamp millinSecond +func (core *Core) SaveToSortSet(period string, keyName string, extt time.Duration, tsi int64) { + ary := strings.Split(keyName, "ts:") + setName := ary[0] + "sortedSet" + z := redis.Z{ + Score: float64(tsi), + Member: keyName, + } + rs, err := core.RedisCli.ZAdd(setName, z).Result() + if err != nil { + fmt.Println("err of ma7|ma30 add to redis:", err) + } else { + fmt.Println("sortedSet added to redis:", rs, keyName) + } +} + +func (cr *Core) PeriodToMinutes(period string) int64 { + ary := strings.Split(period, "") + beiStr := "1" + danwei := "" + if len(ary) == 3 { + beiStr = ary[0] + ary[1] + danwei = ary[2] + } else { + beiStr = ary[0] + danwei = ary[1] + } + cheng := 1 + bei, _ := strconv.Atoi(beiStr) + switch danwei { + case "m": + { + cheng = bei + break + } + case "H": + { + cheng = bei * 60 + break + } + case "D": + { + cheng = bei * 60 * 24 + break + } + case "W": + { + cheng = bei * 60 * 24 * 7 + break + } + case "M": + { + cheng = bei * 60 * 24 * 30 + break + } + case "Y": + { + cheng = bei * 60 * 24 * 365 + break + } + default: + { + fmt.Println("notmatch:", danwei) + } + } + return int64(cheng) +} + +// type ScanCmd struct { +// baseCmd +// +// page []string +// cursor uint64 +// +// process func(cmd Cmder) error +// } +func (core *Core) GetRangeKeyList(pattern string, from time.Time) ([]*simple.Json, error) { + // 比如,用来计算ma30或ma7,倒推多少时间范围, + redisCli := core.RedisCli + cursor := uint64(0) + n := 0 + allTs := []int64{} + var keys []string + for { + var err error + keys, cursor, _ = redisCli.Scan(cursor, pattern+"*", 2000).Result() + if err != nil { + panic(err) + } + n += len(keys) + if n == 0 { + break + } + } + + // keys, _ := redisCli.Keys(pattern + "*").Result() + for _, key := range keys { + keyAry := strings.Split(key, ":") + key = keyAry[1] + keyi64, _ := strconv.ParseInt(key, 10, 64) + allTs = append(allTs, keyi64) + } + nary := utils.RecursiveBubble(allTs, len(allTs)) + tt := from.UnixMilli() + ff := tt - tt%60000 + fi := int64(ff) + mary := []int64{} + for _, v := range nary { + if v < fi { + break + } + mary = append(mary, v) + } + res := []*simple.Json{} + for _, v := range mary { + // if k > 1 { + // break + // } + nv := pattern + strconv.FormatInt(v, 10) + str, err := redisCli.Get(nv).Result() + if err != nil { + fmt.Println("err of redis get key:", nv, err) + } + cur, err := simple.NewJson([]byte(str)) + if err != nil { + fmt.Println("err of create newJson:", str, err) + } + res = append(res, cur) + } + + return res, nil +} + +func (cl *Candle) SetToKey(core *Core) ([]interface{}, error) { + data := cl.Data + tsi, err := strconv.ParseInt(data[0].(string), 10, 64) + tss := strconv.FormatInt(tsi, 10) + keyName := "candle" + cl.Period + "|" + cl.InstId + "|ts:" + tss + //过期时间:根号(当前candle的周期/1分钟)*10000 + + dt, err := json.Marshal(cl.Data) + exp := core.PeriodToMinutes(cl.Period) + // expf := float64(exp) * 60 + expf := utils.Sqrt(float64(exp)) * 100 + extt := time.Duration(expf) * time.Minute + curVolstr, _ := data[5].(string) + curVol, err := strconv.ParseFloat(curVolstr, 64) + if err != nil { + fmt.Println("err of convert ts:", err) + } + curVolCcystr, _ := data[6].(string) + curVolCcy, err := strconv.ParseFloat(curVolCcystr, 64) + curPrice := curVolCcy / curVol + if curPrice <= 0 { + fmt.Println("price有问题", curPrice, "dt: ", string(dt), "from:", cl.From) + err = errors.New("price有问题") + return cl.Data, err + } + redisCli := core.RedisCli + // tm := time.UnixMilli(tsi).Format("2006-01-02 15:04") + fmt.Println("setToKey:", keyName, "ts: ", "price: ", curPrice, "from:", cl.From) + redisCli.Set(keyName, dt, extt).Result() + core.SaveUniKey(cl.Period, keyName, extt, tsi, cl) + return cl.Data, err +} diff --git a/config.go b/config.go new file mode 100644 index 0000000..62a7d4b --- /dev/null +++ b/config.go @@ -0,0 +1,122 @@ +package core + +import ( + "fmt" + "io/ioutil" + "log" + "os" + "strings" + + simple "github.com/bitly/go-simplejson" +) + +type MyConfig struct { + Env string `json:"env", string` + Config *simple.Json + CandleDimentions []string `json:"candleDimentions"` + RedisConf *RedisConfig `json:"redis"` + CredentialReadOnlyConf *CredentialConfig `json:"credential"` + CredentialMutableConf *CredentialConfig `json:"credential"` + ConnectConf *ConnectConfig `json:"connect"` + // ThreadsConf *ThreadsConfig `json:"threads"` +} + +type RedisConfig struct { + Url string `json:"url", string` + Password string `json:"password", string` + Index int `json:"index", string` +} + +type CredentialConfig struct { + SecretKey string `json:"secretKey", string` + BaseUrl string `json:"baseUrl", string` + OkAccessKey string `json:"okAccessKey", string` + OkAccessPassphrase string `json:"okAccessPassphrase", string` +} + +type ConnectConfig struct { + LoginSubUrl string `json:"loginSubUrl", string` + WsPrivateBaseUrl string `json:"wsPrivateBaseUrl", string` + WsPublicBaseUrl string `json:"wsPublicBaseUrl", string` + RestBaseUrl string `json:"restBaseUrl", string` +} +type ThreadsConfig struct { + MaxLenTickerStream int `json:"maxLenTickerStream", int` + MaxCandles int `json:"maxCandles", string` + AsyncChannels int `json:"asyncChannels", int` + MaxTickers int `json:"maxTickers", int` + RestPeriod int `json:"restPeriod", int` + WaitWs int `json:"waitWs", int` +} + +func (cfg MyConfig) Init() (MyConfig, error) { + env := os.Getenv("GO_ENV") + arystr := os.Getenv("TUNAS_CANDLESDIMENTIONS") + ary := strings.Split(arystr, "|") + cfg.CandleDimentions = ary + jsonStr, err := ioutil.ReadFile("/go/json/basicConfig.json") + if err != nil { + jsonStr, err = ioutil.ReadFile("configs/basicConfig.json") + if err != nil { + fmt.Println("err2:", err.Error()) + return cfg, err + } + cfg.Config, err = simple.NewJson([]byte(jsonStr)) + if err != nil { + fmt.Println("err2:", err.Error()) + return cfg, err + } + cfg.Env = env + } + cfg.Config = cfg.Config.Get(env) + + ru, err := cfg.Config.Get("redis").Get("url").String() + rp, _ := cfg.Config.Get("redis").Get("password").String() + ri, _ := cfg.Config.Get("redis").Get("index").Int() + redisConf := RedisConfig{ + Url: ru, + Password: rp, + Index: ri, + } + // fmt.Println("cfg: ", cfg) + cfg.RedisConf = &redisConf + ls, _ := cfg.Config.Get("connect").Get("loginSubUrl").String() + wsPub, _ := cfg.Config.Get("connect").Get("wsPrivateBaseUrl").String() + wsPri, _ := cfg.Config.Get("connect").Get("wsPublicBaseUrl").String() + restBu, _ := cfg.Config.Get("connect").Get("restBaseUrl").String() + connectConfig := ConnectConfig{ + LoginSubUrl: ls, + WsPublicBaseUrl: wsPub, + WsPrivateBaseUrl: wsPri, + RestBaseUrl: restBu, + } + cfg.ConnectConf = &connectConfig + return cfg, nil +} + +func (cfg *MyConfig) GetConfigJson(arr []string) *simple.Json { + env := os.Getenv("GO_ENV") + fmt.Println("env: ", env) + cfg.Env = env + + json, err := ioutil.ReadFile("/go/json/basicConfig.json") + + if err != nil { + json, err = ioutil.ReadFile("configs/basicConfig.json") + if err != nil { + log.Panic("read config error: ", err.Error()) + } + } + if err != nil { + fmt.Println("read file err: ", err) + } + rjson, err := simple.NewJson(json) + if err != nil { + fmt.Println("newJson err: ", err) + } + for _, s := range arr { + rjson = rjson.Get(s) + // fmt.Println(s, ": ", rjson) + } + return rjson +} diff --git a/const.go b/const.go new file mode 100644 index 0000000..4612b45 --- /dev/null +++ b/const.go @@ -0,0 +1,12 @@ +package core + +const MAIN_ALLCOINS_PERIOD_MINUTES = 1 +const MAIN_ALLCOINS_ONCE_COUNTS = 3 +const MAIN_ALLCOINS_BAR_PERIOD = "3m" +const ALLCANDLES_PUBLISH = "allCandles|publish" +const ALLCANDLES_INNER_PUBLISH = "allCandlesInner|publish" +const ORDER_PUBLISH = "private|order|publish" +const TICKERINFO_PUBLISH = "tickerInfo|publish" +const CCYPOSISTIONS_PUBLISH = "ccyPositions|publish" +const SUBACTIONS_PUBLISH = "subActions|publish" +const ORDER_RESP_PUBLISH = "private|actionResp|publish" diff --git a/core.go b/core.go new file mode 100644 index 0000000..5c567bf --- /dev/null +++ b/core.go @@ -0,0 +1,532 @@ +package core + +import ( + "context" + "encoding/json" + // "errors" + "fmt" + // "math/rand" + "io/ioutil" + "net/http" + "os" + "strconv" + "strings" + "sync" + "time" + "v5sdk_go/rest" + "v5sdk_go/ws" + // "v5sdk_go/ws/wImpl" + + simple "github.com/bitly/go-simplejson" + "github.com/go-redis/redis" + "github.com/phyer/texus/private" + "github.com/phyer/texus/utils" +) + +type Core struct { + Env string + Cfg *MyConfig + RedisCli *redis.Client + RedisCli2 *redis.Client + FluentBitUrl string + Wg sync.WaitGroup + RestQueueChan chan *RestQueue + OrderChan chan *private.Order + WriteLogChan chan *WriteLog +} +type RestQueue struct { + InstId string + Bar string + After int64 + Before int64 + Limit string + Duration time.Duration + WithWs bool +} +type CandleData struct { + Code string `json:"code"` + Msg string `json:"msg"` + Data [][]interface{} `json:"data"` // 用二维数组来接收 candles 数据 +} +type SubAction struct { + ActionName string + ForAll bool + MetaInfo map[string]interface{} +} + +func (rst *RestQueue) Show(cr *Core) { + fmt.Println("restQueue:", rst.InstId, rst.Bar, rst.Limit) +} + +func (rst *RestQueue) Save(cr *Core) { + afterSec := "" + if rst.After > 0 { + afterSec = fmt.Sprint("&after=", rst.After) + } + beforeSec := "" + if rst.Before > 0 { + beforeSec = fmt.Sprint("&before=", rst.Before) + } + limitSec := "" + if len(rst.Limit) > 0 { + limitSec = fmt.Sprint("&limit=", rst.Limit) + } + link := "/api/v5/market/candles?instId=" + rst.InstId + "&bar=" + rst.Bar + limitSec + afterSec + beforeSec + + fmt.Println("restLink: ", link) + rsp, err := cr.v5PublicInvoke(link) + if err != nil { + fmt.Println("cr.v5PublicInvoke err:", err) + } else { + fmt.Println("cr.v5PublicInvoke result count:", len(rsp.Data)) + } + cr.SaveCandle(rst.InstId, rst.Bar, rsp, rst.Duration, rst.WithWs) +} + +func WriteLogProcess(cr *Core) { + for { + wg := <-cr.WriteLogChan + go func(wg *WriteLog) { + fmt.Println("start writelog: " + wg.Tag + " " + wg.Id) + wg.Process(cr) + }(wg) + time.Sleep(50 * time.Millisecond) + } +} + +func (cr *Core) ShowSysTime() { + rsp, _ := cr.RestInvoke("/api/v5/public/time", rest.GET) + fmt.Println("serverSystem time:", rsp) +} + +func (core *Core) Init() { + core.Env = os.Getenv("GO_ENV") + gitBranch := os.Getenv("gitBranchName") + commitID := os.Getenv("gitCommitID") + + fmt.Println("当前环境: ", core.Env) + fmt.Println("gitBranch: ", gitBranch) + fmt.Println("gitCommitID: ", commitID) + cfg := MyConfig{} + cfg, _ = cfg.Init() + core.Cfg = &cfg + cli, err := core.GetRedisCli() + core.RedisCli = cli + core.RestQueueChan = make(chan *RestQueue) + core.WriteLogChan = make(chan *WriteLog) + core.OrderChan = make(chan *private.Order) + if err != nil { + fmt.Println("init redis client err: ", err) + } +} + +func (core *Core) GetRedisCli() (*redis.Client, error) { + ru := core.Cfg.RedisConf.Url + rp := core.Cfg.RedisConf.Password + ri := core.Cfg.RedisConf.Index + re := os.Getenv("REDIS_URL") + if len(re) > 0 { + ru = re + } + client := redis.NewClient(&redis.Options{ + Addr: ru, + Password: rp, //默认空密码 + DB: ri, //使用默认数据库 + }) + pong, err := client.Ping().Result() + if pong == "PONG" && err == nil { + return client, err + } else { + fmt.Println("redis状态不可用:", ru, rp, ri, err) + } + + return client, nil +} + +func (core *Core) GetAllTickerInfo() (*rest.RESTAPIResult, error) { + // GET / 获取所有产品行情信息 + rsp, err := core.RestInvoke("/api/v5/market/tickers?instType=SPOT", rest.GET) + return rsp, err +} + +func (core *Core) GetBalances() (*rest.RESTAPIResult, error) { + // TODO 临时用了两个实现,restInvoke,复用原来的会有bug,不知道是谁的bug + rsp, err := core.RestInvoke2("/api/v5/account/balance", rest.GET, nil) + return rsp, err +} +func (core *Core) GetLivingOrderList() ([]*private.Order, error) { + // TODO 临时用了两个实现,restInvoke,复用原来的会有bug,不知道是谁的bug + params := make(map[string]interface{}) + data, err := core.RestInvoke2("/api/v5/trade/orders-pending", rest.GET, ¶ms) + odrsp := private.OrderResp{} + err = json.Unmarshal([]byte(data.Body), &odrsp) + str, _ := json.Marshal(odrsp) + fmt.Println("convert: err:", err, " body: ", data.Body, odrsp, " string:", string(str)) + list, err := odrsp.Convert() + fmt.Println("loopLivingOrder response data:", str) + fmt.Println(utils.GetFuncName(), " 当前数据是 ", data.V5Response.Code, " list len:", len(list)) + return list, err +} +func (core *Core) LoopInstrumentList() error { + for { + time.Sleep(3 * time.Second) + ctype := ws.SPOT + + redisCli := core.RedisCli + counts, err := redisCli.HLen("instruments|" + ctype + "|hash").Result() + if err != nil { + fmt.Println("err of hset to redis:", err) + } + if counts == 0 { + continue + } + break + } + return nil +} +func (core *Core) SubscribeTicker(op string) error { + mp := make(map[string]string) + + redisCli := core.RedisCli + ctype := ws.SPOT + mp, err := redisCli.HGetAll("instruments|" + ctype + "|hash").Result() + b, err := json.Marshal(mp) + js, err := simple.NewJson(b) + if err != nil { + fmt.Println("err of unMarshalJson3:", js) + } + // fmt.Println("ticker js: ", js) + instAry := js.MustMap() + for k, v := range instAry { + b = []byte(v.(string)) + _, err := simple.NewJson(b) + if err != nil { + fmt.Println("err of unMarshalJson4:", js) + } + time.Sleep(5 * time.Second) + go func(instId string, op string) { + + redisCli := core.RedisCli + _, err = redisCli.SAdd("tickers|"+op+"|set", instId).Result() + if err != nil { + fmt.Println("err of unMarshalJson5:", js) + } + }(k, op) + } + return nil +} + +func (core *Core) v5PublicInvoke(subUrl string) (*CandleData, error) { + restUrl, _ := core.Cfg.Config.Get("connect").Get("restBaseUrl").String() + url := restUrl + subUrl + resp, err := http.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + var result CandleData + if err := json.Unmarshal(body, &result); err != nil { + return nil, err + } + + return &result, nil +} +func (core *Core) RestInvoke(subUrl string, method string) (*rest.RESTAPIResult, error) { + restUrl, _ := core.Cfg.Config.Get("connect").Get("restBaseUrl").String() + //ep, method, uri string, param *map[string]interface{} + rest := rest.NewRESTAPI(restUrl, method, subUrl, nil) + key, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String() + secure, _ := core.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String() + pass, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String() + isDemo := false + if core.Env == "demoEnv" { + isDemo = true + } + rest.SetSimulate(isDemo).SetAPIKey(key, secure, pass) + response, err := rest.Run(context.Background()) + if err != nil { + fmt.Println("restInvoke1 err:", subUrl, err) + } + return response, err +} + +func (core *Core) RestInvoke2(subUrl string, method string, param *map[string]interface{}) (*rest.RESTAPIResult, error) { + key, err1 := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String() + secret, err2 := core.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String() + pass, err3 := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String() + userId, err4 := core.Cfg.Config.Get("connect").Get("userId").String() + restUrl, err5 := core.Cfg.Config.Get("connect").Get("restBaseUrl").String() + if err1 != nil || err2 != nil || err3 != nil || err4 != nil || err5 != nil { + fmt.Println(err1, err2, err3, err4, err5) + } else { + fmt.Println("key:", key, secret, pass, "userId:", userId, "restUrl: ", restUrl) + } + // reqParam := make(map[string]interface{}) + // if param != nil { + // reqParam = *param + // } + // rs := rest.NewRESTAPI(restUrl, method, subUrl, &reqParam) + isDemo := false + if core.Env == "demoEnv" { + isDemo = true + } + // rs.SetSimulate(isDemo).SetAPIKey(key, secret, pass).SetUserId(userId) + // response, err := rs.Run(context.Background()) + // if err != nil { + // fmt.Println("restInvoke2 err:", subUrl, err) + // } + apikey := rest.APIKeyInfo{ + ApiKey: key, + SecKey: secret, + PassPhrase: pass, + } + cli := rest.NewRESTClient(restUrl, &apikey, isDemo) + rsp, err := cli.Get(context.Background(), subUrl, param) + if err != nil { + return rsp, err + } + fmt.Println("response:", rsp, err) + return rsp, err +} + +func (core *Core) RestPost(subUrl string, param *map[string]interface{}) (*rest.RESTAPIResult, error) { + key, err1 := core.Cfg.Config.Get("credentialMutable").Get("okAccessKey").String() + secret, err2 := core.Cfg.Config.Get("credentialMutable").Get("secretKey").String() + pass, err3 := core.Cfg.Config.Get("credentialMutable").Get("okAccessPassphrase").String() + userId, err4 := core.Cfg.Config.Get("connect").Get("userId").String() + restUrl, err5 := core.Cfg.Config.Get("connect").Get("restBaseUrl").String() + if err1 != nil || err2 != nil || err3 != nil || err4 != nil || err5 != nil { + fmt.Println(err1, err2, err3, err4, err5) + } else { + fmt.Println("key:", key, secret, pass, "userId:", userId, "restUrl: ", restUrl) + } + // 请求的另一种方式 + apikey := rest.APIKeyInfo{ + ApiKey: key, + SecKey: secret, + PassPhrase: pass, + } + isDemo := false + if core.Env == "demoEnv" { + isDemo = true + } + cli := rest.NewRESTClient(restUrl, &apikey, isDemo) + rsp, err := cli.Post(context.Background(), subUrl, param) + if err != nil { + return rsp, err + } + return rsp, err +} + +// 我当前持有的币,每分钟刷新 +func (core *Core) GetMyFavorList() []string { + redisCli := core.RedisCli + opt := redis.ZRangeBy{ + Min: "10", + Max: "100000000000", + } + cary, _ := redisCli.ZRevRangeByScore("private|positions|sortedSet", opt).Result() + cl := []string{} + for _, v := range cary { + cry := strings.Split(v, "|")[0] + "-USDT" + cl = append(cl, cry) + } + return cary +} + +// 得到交易量排行榜,订阅其中前N名的各维度k线,并merge进来我已经购买的币列表,这个列表是动态更新的 +// 改了,不需要交易排行榜,我手动指定一个排行即可, tickersVol|sortedSet 改成 tickersList|sortedSet +func (core *Core) GetScoreList(count int) []string { + + redisCli := core.RedisCli + + curList, err := redisCli.ZRange("tickersList|sortedSet", 0, int64(count-1)).Result() + if err != nil { + fmt.Println("zrevrange err:", err) + } + fmt.Println("curList: ", curList) + return curList +} + +func LoopBalances(cr *Core, mdura time.Duration) { + //协程:动态维护topScore + ticker := time.NewTicker(mdura) + for { + select { + case <-ticker.C: + //协程:循环执行rest请求candle + fmt.Println("loopBalance: receive ccyChannel start") + RestBalances(cr) + } + } +} +func LoopLivingOrders(cr *Core, mdura time.Duration) { + //协程:动态维护topScore + ticker := time.NewTicker(mdura) + for { + select { + case <-ticker.C: + //协程:循环执行rest请求candle + fmt.Println("loopLivingOrder: receive ccyChannel start") + RestLivingOrder(cr) + } + } +} + +func RestBalances(cr *Core) ([]*private.Ccy, error) { + // fmt.Println("restBalance loopBalance loop start") + ccyList := []*private.Ccy{} + rsp, err := cr.GetBalances() + if err != nil { + fmt.Println("loopBalance err00: ", err) + } + fmt.Println("loopBalance balance rsp: ", rsp) + if err != nil { + fmt.Println("loopBalance err01: ", err) + return ccyList, err + } + if len(rsp.Body) == 0 { + fmt.Println("loopBalance err03: rsp body is null") + return ccyList, err + } + js1, err := simple.NewJson([]byte(rsp.Body)) + if err != nil { + fmt.Println("loopBalance err1: ", err) + } + itemList := js1.Get("data").GetIndex(0).Get("details").MustArray() + // maxTickers是重点关注的topScore的coins的数量 + cli := cr.RedisCli + for _, v := range itemList { + js, _ := json.Marshal(v) + ccyResp := private.CcyResp{} + err := json.Unmarshal(js, &ccyResp) + if err != nil { + fmt.Println("loopBalance err2: ", err) + } + ccy, err := ccyResp.Convert() + ccyList = append(ccyList, ccy) + if err != nil { + fmt.Println("loopBalance err2: ", err) + } + z := redis.Z{ + Score: ccy.EqUsd, + Member: ccy.Ccy + "|position|key", + } + res, err := cli.ZAdd("private|positions|sortedSet", z).Result() + if err != nil { + fmt.Println("loopBalance err3: ", res, err) + } + res1, err := cli.Set(ccy.Ccy+"|position|key", js, 0).Result() + if err != nil { + fmt.Println("loopBalance err4: ", res1, err) + } + bjs, _ := json.Marshal(ccy) + tsi := time.Now().Unix() + tsii := tsi - tsi%600 + tss := strconv.FormatInt(tsii, 10) + cli.Set(CCYPOSISTIONS_PUBLISH+"|ts:"+tss, 1, 10*time.Minute).Result() + fmt.Println("ccy published: ", string(bjs)) + //TODO FIXME 50毫秒,每分钟上限是1200个订单,超过就无法遍历完成 + time.Sleep(50 * time.Millisecond) + suffix := "" + if cr.Env == "demoEnv" { + suffix = "-demoEnv" + } + // TODO FIXME cli2 + cli.Publish(CCYPOSISTIONS_PUBLISH+suffix, string(bjs)).Result() + } + return ccyList, nil +} + +func RestLivingOrder(cr *Core) ([]*private.Order, error) { + // fmt.Println("restOrder loopOrder loop start") + orderList := []*private.Order{} + list, err := cr.GetLivingOrderList() + if err != nil { + fmt.Println("loopLivingOrder err00: ", err) + } + fmt.Println("loopLivingOrder response:", list) + go func() { + for _, v := range list { + fmt.Println("order orderV:", v) + time.Sleep(30 * time.Millisecond) + cr.OrderChan <- v + } + }() + return orderList, nil +} + +func (cr *Core) ProcessOrder(od *private.Order) error { + // publish + go func() { + suffix := "" + if cr.Env == "demoEnv" { + suffix = "-demoEnv" + } + cn := ORDER_PUBLISH + suffix + bj, _ := json.Marshal(od) + + // TODO FIXME cli2 + res, _ := cr.RedisCli.Publish(cn, string(bj)).Result() + fmt.Println("order publish res: ", res, " content: ", string(bj)) + rsch := ORDER_RESP_PUBLISH + suffix + bj1, _ := json.Marshal(res) + + // TODO FIXME cli2 + res, _ = cr.RedisCli.Publish(rsch, string(bj1)).Result() + }() + return nil +} + +func (cr *Core) DispatchSubAction(action *SubAction) error { + go func() { + suffix := "" + if cr.Env == "demoEnv" { + suffix = "-demoEnv" + } + fmt.Println("action: ", action.ActionName, action.MetaInfo) + res, err := cr.RestPostWrapper("/api/v5/trade/"+action.ActionName, action.MetaInfo) + if err != nil { + fmt.Println(utils.GetFuncName(), " actionRes 1:", err) + } + rsch := ORDER_RESP_PUBLISH + suffix + bj1, _ := json.Marshal(res) + + // TODO FIXME cli2 + rs, _ := cr.RedisCli.Publish(rsch, string(bj1)).Result() + fmt.Println("action rs: ", rs) + }() + return nil +} + +func (cr *Core) RestPostWrapper(url string, param map[string]interface{}) (rest.Okexv5APIResponse, error) { + suffix := "" + if cr.Env == "demoEnv" { + suffix = "-demoEnv" + } + res, err := cr.RestPost(url, ¶m) + fmt.Println("actionRes 2:", res.V5Response.Msg, res.V5Response.Data, err) + bj, _ := json.Marshal(res) + + // TODO FIXME cli2 + cr.RedisCli.Publish(ORDER_RESP_PUBLISH+suffix, string(bj)) + return res.V5Response, nil +} + +func (cr *Core) AddToGeneralCandleChnl(candle *Candle, channels []string) { + redisCli := cr.RedisCli + ab, _ := json.Marshal(candle) + for _, v := range channels { + suffix := "" + env := os.Getenv("GO_ENV") + if env == "demoEnv" { + suffix = "-demoEnv" + } + vd := v + suffix + _, err := redisCli.Publish(vd, string(ab)).Result() + if err != nil { + fmt.Println("err of ma7|ma30 add to redis2:", err, candle.From) + } + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..14ecff3 --- /dev/null +++ b/go.mod @@ -0,0 +1,31 @@ +module github.com/phyer/core + +replace ( + v5sdk_go/config => ./submodules/okex/config + v5sdk_go/rest => ./submodules/okex/rest + v5sdk_go/utils => ./submodules/okex/utils + v5sdk_go/ws => ./submodules/okex/ws +) + +go 1.21 + +require ( + github.com/bitly/go-simplejson v0.5.0 + github.com/go-redis/redis v6.15.9+incompatible + github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196 + github.com/sirupsen/logrus v1.9.3 + v5sdk_go/rest v0.0.0-00010101000000-000000000000 + v5sdk_go/ws v0.0.0-00010101000000-000000000000 +) + +require ( + github.com/gorilla/websocket v1.5.1 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/nxadm/tail v1.4.8 // indirect + github.com/rogpeppe/go-internal v1.6.1 // indirect + golang.org/x/net v0.17.0 // indirect + golang.org/x/sys v0.13.0 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect + v5sdk_go/config v0.0.0-00010101000000-000000000000 // indirect + v5sdk_go/utils v0.0.0-00010101000000-000000000000 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..a166792 --- /dev/null +++ b/go.sum @@ -0,0 +1,55 @@ +github.com/bitly/go-simplejson v0.5.0 h1:6IH+V8/tVMab511d5bn4M7EwGXZf9Hj6i2xSwkNEM+Y= +github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= +github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= +github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc= +github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= +github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c= +github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= +github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196 h1:P1sxgCsS0VIL38ufZzgUuZLLyY/B+po6kSY7ziNZT7E= +github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196/go.mod h1:iZexs5agdApNlp8HW/FqKgma4Ij1x8/o+ZLcMvY3f80= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/ticker.go b/ticker.go new file mode 100644 index 0000000..650b26d --- /dev/null +++ b/ticker.go @@ -0,0 +1,70 @@ +package core + +import ( + "fmt" + "reflect" + "strconv" +) + +type TickerInfo struct { + Id string `json:"_id"` + InstId string `json:"instId"` + Last float64 `json:"last"` + InstType string `json:"instType"` + VolCcy24h float64 `json:"volCcy24h"` + Ts int64 `json:"ts"` +} + +type TickerInfoResp struct { + InstId string `json:"instId"` + Last string `json:"last"` + InstType string `json:"instType"` + VolCcy24h string `json:"volCcy24h"` + Ts string `json:"ts"` +} + +func (tir *TickerInfoResp) Convert() TickerInfo { + ti := TickerInfo{ + Id: HashString(tir.InstId + tir.Ts), + InstId: tir.InstId, + InstType: tir.InstType, + Last: ToFloat64(tir.Last), + VolCcy24h: ToFloat64(tir.VolCcy24h), + Ts: ToInt64(tir.Ts), + } + return ti +} + +func ToString(val interface{}) string { + valstr := "" + if reflect.TypeOf(val).Name() == "string" { + valstr = val.(string) + } else if reflect.TypeOf(val).Name() == "float64" { + valstr = fmt.Sprintf("%f", val) + } else if reflect.TypeOf(val).Name() == "int64" { + valstr = strconv.FormatInt(val.(int64), 16) + } + return valstr +} + +func ToInt64(val interface{}) int64 { + vali := int64(0) + if reflect.TypeOf(val).Name() == "string" { + vali, _ = strconv.ParseInt(val.(string), 10, 64) + } else if reflect.TypeOf(val).Name() == "float64" { + vali = int64(val.(float64)) + } + return vali +} + +func ToFloat64(val interface{}) float64 { + valf := float64(0) + if reflect.TypeOf(val).Name() == "string" { + valf, _ = strconv.ParseFloat(val.(string), 64) + } else if reflect.TypeOf(val).Name() == "float64" { + valf = val.(float64) + } else if reflect.TypeOf(val).Name() == "int64" { + valf = float64(val.(int64)) + } + return valf +} diff --git a/writeLog.go b/writeLog.go new file mode 100644 index 0000000..e16719d --- /dev/null +++ b/writeLog.go @@ -0,0 +1,32 @@ +package core + +import ( + "bytes" + "fmt" + "net/http" + "os" + + logrus "github.com/sirupsen/logrus" +) + +type WriteLog struct { + Content []byte + Tag string + Id string +} + +func (wg *WriteLog) Process(cr *Core) error { + go func() { + reqBody := bytes.NewBuffer(wg.Content) + cr.Env = os.Getenv("GO_ENV") + cr.FluentBitUrl = os.Getenv("TEXUS_FluentBitUrl") + fullUrl := "http://" + cr.FluentBitUrl + "/" + wg.Tag + res, err := http.Post(fullUrl, "application/json", reqBody) + + fmt.Println("requested, response:", fullUrl, string(wg.Content), res) + if err != nil { + logrus.Error(err) + } + }() + return nil +}