From 3b357f0912120b3cda97d2580a5571490455f254 Mon Sep 17 00:00:00 2001 From: "zhangkun9038@dingtalk.com" Date: Fri, 20 Dec 2024 23:45:39 +0800 Subject: [PATCH] add rsi --- core.go | 785 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 785 insertions(+) diff --git a/core.go b/core.go index 36a4a8a..cb92889 100644 --- a/core.go +++ b/core.go @@ -36,6 +36,791 @@ type Core struct { Waity *sync.WaitGroup CandlesProcessChan chan *Candle MaXProcessChan chan *MaX + RsiProcessChan chan *Rsi + TickerInforocessChan chan *TickerInfo + CoasterChan chan *CoasterInfo + // SeriesChan chan *SeriesInfo + // SegmentItemChan chan *SegmentItem + MakeMaXsChan chan *Candle + // ShearForceGrpChan chan *ShearForceGrp + InvokeRestQueueChan chan *RestQueue + RedisLocal2Cli *redis.Client + RestQueueChan chan *RestQueue + RestQueue + WriteLogChan chan *WriteLog +} + +type RestQueue struct { + InstId string + Bar string + After int64 + Before int64 + Limit string + Duration time.Duration + WithWs bool +} + +type CandleData struct { + Code string `json:"code"` + Msg string `json:"msg"` + Data [][]interface{} `json:"data"` // 用二维数组来接收 candles 数据 +} +type SubAction struct { + ActionName string + ForAll bool + MetaInfo map[string]interface{} +} + +func (rst *RestQueue) Show(cr *Core) { + logrus.Info("restQueue:", rst.InstId, rst.Bar, rst.Limit) +} + +func (rst *RestQueue) Save(cr *Core) { + afterSec := "" + if rst.After > 0 { + afterSec = fmt.Sprint("&after=", rst.After) + } + beforeSec := "" + if rst.Before > 0 { + beforeSec = fmt.Sprint("&before=", rst.Before) + } + limitSec := "" + if len(rst.Limit) > 0 { + limitSec = fmt.Sprint("&limit=", rst.Limit) + } + link := "/api/v5/market/candles?instId=" + rst.InstId + "&bar=" + rst.Bar + limitSec + afterSec + beforeSec + + logrus.Info("restLink: ", link) + rsp, err := cr.v5PublicInvoke(link) + if err != nil { + logrus.Info("cr.v5PublicInvoke err:", err) + } else { + logrus.Info("cr.v5PublicInvoke result count:", len(rsp.Data)) + } + cr.SaveCandle(rst.InstId, rst.Bar, rsp, rst.Duration, rst.WithWs) +} + +func WriteLogProcess(cr *Core) { + for { + wg := <-cr.WriteLogChan + go func(wg *WriteLog) { + logrus.Info("start writelog: " + wg.Tag + " " + wg.Id) + wg.Process(cr) + }(wg) + time.Sleep(50 * time.Millisecond) + } +} + +// func (cr *Core) ShowSysTime() { +// rsp, _ := cr.RestInvoke("/api/v5/public/time", rest.GET) +// fmt.Println("serverSystem time:", rsp) +// } + +func (core *Core) Init() { + core.Env = os.Getenv("GO_ENV") + gitBranch := os.Getenv("gitBranchName") + commitID := os.Getenv("gitCommitID") + + logrus.Info("当前环境: ", core.Env) + logrus.Info("gitBranch: ", gitBranch) + logrus.Info("gitCommitID: ", commitID) + cfg := MyConfig{} + cfg, _ = cfg.Init() + core.Cfg = &cfg + cli, err := core.GetRedisLocalCli() + core.RedisLocalCli = cli + core.RestQueueChan = make(chan *RestQueue) + core.WriteLogChan = make(chan *WriteLog) + // 跟订单有关的都关掉 + // core.OrderChan = make(chan *private.Order) + if err != nil { + logrus.Error("init redis client err: ", err) + } +} + +func (core *Core) GetRedisCliFromConf(conf RedisConfig) (*redis.Client, error) { + client := redis.NewClient(&redis.Options{ + Addr: conf.Url, + Password: conf.Password, //默认空密码 + DB: conf.Index, //使用默认数据库 + }) + pong, err := client.Ping().Result() + if pong == "PONG" && err == nil { + return client, err + } else { + logrus.Error("redis状态不可用:", conf.Url, conf.Password, conf.Index, err) + } + + return client, nil +} + +func (core *Core) GetRemoteRedisLocalCli() (*redis.Client, error) { + ru := core.Cfg.RedisConf.Url + rp := core.Cfg.RedisConf.Password + ri := core.Cfg.RedisConf.Index + re := os.Getenv("REDIS_URL") + if len(re) > 0 { + ru = re + } + client := redis.NewClient(&redis.Options{ + Addr: ru, + Password: rp, //默认空密码 + DB: ri, //使用默认数据库 + }) + pong, err := client.Ping().Result() + if pong == "PONG" && err == nil { + return client, err + } else { + logrus.Error("redis状态不可用:", ru, rp, ri, err) + } + + return client, nil +} +func (core *Core) GetRedisLocalCli() (*redis.Client, error) { + ru := core.Cfg.RedisConf.Url + rp := core.Cfg.RedisConf.Password + ri := core.Cfg.RedisConf.Index + re := os.Getenv("REDIS_URL") + if len(re) > 0 { + ru = re + } + client := redis.NewClient(&redis.Options{ + Addr: ru, + Password: rp, //默认空密码 + DB: ri, //使用默认数据库 + }) + pong, err := client.Ping().Result() + if pong == "PONG" && err == nil { + return client, err + } else { + logrus.Error("redis状态不可用:", ru, rp, ri, err) + } + + return client, nil +} + +// 这些应该是放到 texus 里实现的 +func (core *Core) GetAllTickerInfo() (*rest.RESTAPIResult, error) { + // GET / 获取所有产品行情信息 + rsp, err := core.RestInvoke("/api/v5/market/tickers?instType=SPOT", rest.GET) + return rsp, err +} + +// 这些跟 订单有关,都关掉 +// +// func (core *Core) GetBalances() (*rest.RESTAPIResult, error) { +// // TODO 临时用了两个实现,restInvoke,复用原来的会有bug,不知道是谁的bug +// rsp, err := core.RestInvoke2("/api/v5/account/balance", rest.GET, nil) +// return rsp, err +// } +// +// func (core *Core) GetLivingOrderList() ([]*private.Order, error) { +// // TODO 临时用了两个实现,restInvoke,复用原来的会有bug,不知道是谁的bug +// params := make(map[string]interface{}) +// data, err := core.RestInvoke2("/api/v5/trade/orders-pending", rest.GET, ¶ms) +// odrsp := private.OrderResp{} +// err = json.Unmarshal([]byte(data.Body), &odrsp) +// str, _ := json.Marshal(odrsp) +// fmt.Println("convert: err:", err, " body: ", data.Body, odrsp, " string:", string(str)) +// list, err := odrsp.Convert() +// fmt.Println("loopLivingOrder response data:", str) +// fmt.Println(utils.GetFuncName(), " 当前数据是 ", data.V5Response.Code, " list len:", len(list)) +// return list, err +// } +// func (core *Core) LoopInstrumentList() error { +// for { +// time.Sleep(3 * time.Second) +// ctype := ws.SPOT +// +// redisCli := core.RedisLocalCli +// counts, err := redisCli.HLen("instruments|" + ctype + "|hash").Result() +// if err != nil { +// fmt.Println("err of hset to redis:", err) +// } +// if counts == 0 { +// continue +// } +// break +// } +// return nil +// } +// func (core *Core) SubscribeTicker(op string) error { +// mp := make(map[string]string) +// +// redisCli := core.RedisLocalCli +// ctype := ws.SPOT +// mp, err := redisCli.HGetAll("instruments|" + ctype + "|hash").Result() +// b, err := json.Marshal(mp) +// js, err := simple.NewJson(b) +// if err != nil { +// fmt.Println("err of unMarshalJson3:", js) +// } +// // fmt.Println("ticker js: ", js) +// instAry := js.MustMap() +// for k, v := range instAry { +// b = []byte(v.(string)) +// _, err := simple.NewJson(b) +// if err != nil { +// fmt.Println("err of unMarshalJson4:", js) +// } +// time.Sleep(5 * time.Second) +// go func(instId string, op string) { +// +// redisCli := core.RedisLocalCli +// _, err = redisCli.SAdd("tickers|"+op+"|set", instId).Result() +// if err != nil { +// fmt.Println("err of unMarshalJson5:", js) +// } +// }(k, op) +// } +// return nil +// } + +// 通过接口获取一个币种名下的某个时间范围内的Candle对象集合 +// 按说这个应该放到 texus里实现 +func (core *Core) v5PublicInvoke(subUrl string) (*CandleData, error) { + restUrl, _ := core.Cfg.Config.Get("connect").Get("restBaseUrl").String() + url := restUrl + subUrl + resp, err := http.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + var result CandleData + if err := json.Unmarshal(body, &result); err != nil { + return nil, err + } + + return &result, nil +} + +func (core *Core) RestInvoke(subUrl string, method string) (*rest.RESTAPIResult, error) { + restUrl, _ := core.Cfg.Config.Get("connect").Get("restBaseUrl").String() + //ep, method, uri string, param *map[string]interface{} + rest := rest.NewRESTAPI(restUrl, method, subUrl, nil) + key, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String() + secure, _ := core.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String() + pass, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String() + isDemo := false + if core.Env == "demoEnv" { + isDemo = true + } + rest.SetSimulate(isDemo).SetAPIKey(key, secure, pass) + response, err := rest.Run(context.Background()) + if err != nil { + logrus.Error("restInvoke1 err:", subUrl, err) + } + return response, err +} + +// func (core *Core) RestInvoke2(subUrl string, method string, param *map[string]interface{}) (*rest.RESTAPIResult, error) { +// key, err1 := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String() +// secret, err2 := core.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String() +// pass, err3 := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String() +// userId, err4 := core.Cfg.Config.Get("connect").Get("userId").String() +// restUrl, err5 := core.Cfg.Config.Get("connect").Get("restBaseUrl").String() +// if err1 != nil || err2 != nil || err3 != nil || err4 != nil || err5 != nil { +// fmt.Println(err1, err2, err3, err4, err5) +// } else { +// fmt.Println("key:", key, secret, pass, "userId:", userId, "restUrl: ", restUrl) +// } +// // reqParam := make(map[string]interface{}) +// // if param != nil { +// // reqParam = *param +// // } +// // rs := rest.NewRESTAPI(restUrl, method, subUrl, &reqParam) +// isDemo := false +// if core.Env == "demoEnv" { +// isDemo = true +// } +// // rs.SetSimulate(isDemo).SetAPIKey(key, secret, pass).SetUserId(userId) +// // response, err := rs.Run(context.Background()) +// // if err != nil { +// // fmt.Println("restInvoke2 err:", subUrl, err) +// // } +// apikey := rest.APIKeyInfo{ +// ApiKey: key, +// SecKey: secret, +// PassPhrase: pass, +// } +// cli := rest.NewRESTClient(restUrl, &apikey, isDemo) +// rsp, err := cli.Get(context.Background(), subUrl, param) +// if err != nil { +// return rsp, err +// } +// fmt.Println("response:", rsp, err) +// return rsp, err +// } + +// 跟下单有关的都关掉,以后再说 +// func (core *Core) RestPost(subUrl string, param *map[string]interface{}) (*rest.RESTAPIResult, error) { +// key, err1 := core.Cfg.Config.Get("credentialMutable").Get("okAccessKey").String() +// secret, err2 := core.Cfg.Config.Get("credentialMutable").Get("secretKey").String() +// pass, err3 := core.Cfg.Config.Get("credentialMutable").Get("okAccessPassphrase").String() +// userId, err4 := core.Cfg.Config.Get("connect").Get("userId").String() +// restUrl, err5 := core.Cfg.Config.Get("connect").Get("restBaseUrl").String() +// if err1 != nil || err2 != nil || err3 != nil || err4 != nil || err5 != nil { +// fmt.Println(err1, err2, err3, err4, err5) +// } else { +// fmt.Println("key:", key, secret, pass, "userId:", userId, "restUrl: ", restUrl) +// } +// // 请求的另一种方式 +// apikey := rest.APIKeyInfo{ +// ApiKey: key, +// SecKey: secret, +// PassPhrase: pass, +// } +// isDemo := false +// if core.Env == "demoEnv" { +// isDemo = true +// } +// cli := rest.NewRESTClient(restUrl, &apikey, isDemo) +// rsp, err := cli.Post(context.Background(), subUrl, param) +// if err != nil { +// return rsp, err +// } +// return rsp, err +// } + +// 我当前持有的币,每分钟刷新 +func (core *Core) GetMyFavorList() []string { + redisCli := core.RedisLocalCli + opt := redis.ZRangeBy{ + Min: "10", + Max: "100000000000", + } + cary, _ := redisCli.ZRevRangeByScore("private|positions|sortedSet", opt).Result() + cl := []string{} + for _, v := range cary { + cry := strings.Split(v, "|")[0] + "-USDT" + cl = append(cl, cry) + } + return cary +} + +// 得到交易量排行榜,订阅其中前N名的各维度k线,并merge进来我已经购买的币列表,这个列表是动态更新的 +// 改了,不需要交易排行榜,我手动指定一个排行即可, tickersVol|sortedSet 改成 tickersList|sortedSet +func (core *Core) GetScoreList(count int) []string { + + // redisCli := core.RedisLocalCli + myFocusList := core.Cfg.Config.Get("focusList").MustArray() + logrus.Debug("curList: ", myFocusList) + lst := []string{} + for _, v := range myFocusList { + lst = append(lst, v.(string)) + } + return lst +} + +// 跟订单有关,关掉 +// func LoopBalances(cr *Core, mdura time.Duration) { +// //协程:动态维护topScore +// ticker := time.NewTicker(mdura) +// for { +// select { +// case <-ticker.C: +// //协程:循环执行rest请求candle +// fmt.Println("loopBalance: receive ccyChannel start") +// RestBalances(cr) +// } +// } +// } + +// func LoopLivingOrders(cr *Core, mdura time.Duration) { +// //协程:动态维护topScore +// ticker := time.NewTicker(mdura) +// for { +// select { +// case <-ticker.C: +// //协程:循环执行rest请求candle +// fmt.Println("loopLivingOrder: receive ccyChannel start") +// RestLivingOrder(cr) +// } +// } +// } + +// 跟订单有关,关掉 +// func RestBalances(cr *Core) ([]*private.Ccy, error) { +// // fmt.Println("restBalance loopBalance loop start") +// ccyList := []*private.Ccy{} +// rsp, err := cr.GetBalances() +// if err != nil { +// fmt.Println("loopBalance err00: ", err) +// } +// fmt.Println("loopBalance balance rsp: ", rsp) +// if err != nil { +// fmt.Println("loopBalance err01: ", err) +// return ccyList, err +// } +// if len(rsp.Body) == 0 { +// fmt.Println("loopBalance err03: rsp body is null") +// return ccyList, err +// } +// js1, err := simple.NewJson([]byte(rsp.Body)) +// if err != nil { +// fmt.Println("loopBalance err1: ", err) +// } +// itemList := js1.Get("data").GetIndex(0).Get("details").MustArray() +// // maxTickers是重点关注的topScore的coins的数量 +// cli := cr.RedisLocalCli +// for _, v := range itemList { +// js, _ := json.Marshal(v) +// ccyResp := private.CcyResp{} +// err := json.Unmarshal(js, &ccyResp) +// if err != nil { +// fmt.Println("loopBalance err2: ", err) +// } +// ccy, err := ccyResp.Convert() +// ccyList = append(ccyList, ccy) +// if err != nil { +// fmt.Println("loopBalance err2: ", err) +// } +// z := redis.Z{ +// Score: ccy.EqUsd, +// Member: ccy.Ccy + "|position|key", +// } +// res, err := cli.ZAdd("private|positions|sortedSet", z).Result() +// if err != nil { +// fmt.Println("loopBalance err3: ", res, err) +// } +// res1, err := cli.Set(ccy.Ccy+"|position|key", js, 0).Result() +// if err != nil { +// fmt.Println("loopBalance err4: ", res1, err) +// } +// bjs, _ := json.Marshal(ccy) +// tsi := time.Now().Unix() +// tsii := tsi - tsi%600 +// tss := strconv.FormatInt(tsii, 10) +// cli.Set(CCYPOSISTIONS_PUBLISH+"|ts:"+tss, 1, 10*time.Minute).Result() +// fmt.Println("ccy published: ", string(bjs)) +// //TODO FIXME 50毫秒,每分钟上限是1200个订单,超过就无法遍历完成 +// time.Sleep(50 * time.Millisecond) +// suffix := "" +// if cr.Env == "demoEnv" { +// suffix = "-demoEnv" +// } +// // TODO FIXME cli2 +// cli.Publish(CCYPOSISTIONS_PUBLISH+suffix, string(bjs)).Result() +// } +// return ccyList, nil +// } + +// func RestLivingOrder(cr *Core) ([]*private.Order, error) { +// // fmt.Println("restOrder loopOrder loop start") +// orderList := []*private.Order{} +// list, err := cr.GetLivingOrderList() +// if err != nil { +// fmt.Println("loopLivingOrder err00: ", err) +// } +// fmt.Println("loopLivingOrder response:", list) +// go func() { +// for _, v := range list { +// fmt.Println("order orderV:", v) +// time.Sleep(30 * time.Millisecond) +// cr.OrderChan <- v +// } +// }() +// return orderList, nil +// } + +func (cr *Core) ProcessOrder(od *private.Order) error { + // publish + go func() { + suffix := "" + if cr.Env == "demoEnv" { + suffix = "-demoEnv" + } + cn := ORDER_PUBLISH + suffix + bj, _ := json.Marshal(od) + + // TODO FIXME cli2 + res, _ := cr.RedisLocalCli.Publish(cn, string(bj)).Result() + logrus.Info("order publish res: ", res, " content: ", string(bj)) + rsch := ORDER_RESP_PUBLISH + suffix + bj1, _ := json.Marshal(res) + + // TODO FIXME cli2 + res, _ = cr.RedisLocalCli.Publish(rsch, string(bj1)).Result() + }() + return nil +} + +// 跟订单有关,关掉 +// func (cr *Core) DispatchSubAction(action *SubAction) error { +// go func() { +// suffix := "" +// if cr.Env == "demoEnv" { +// suffix = "-demoEnv" +// } +// fmt.Println("action: ", action.ActionName, action.MetaInfo) +// res, err := cr.RestPostWrapper("/api/v5/trade/"+action.ActionName, action.MetaInfo) +// if err != nil { +// fmt.Println(utils.GetFuncName(), " actionRes 1:", err) +// } +// rsch := ORDER_RESP_PUBLISH + suffix +// bj1, _ := json.Marshal(res) +// +// // TODO FIXME cli2 +// rs, _ := cr.RedisLocalCli.Publish(rsch, string(bj1)).Result() +// fmt.Println("action rs: ", rs) +// }() +// return nil +// } + +// func (cr *Core) RestPostWrapper(url string, param map[string]interface{}) (rest.Okexv5APIResponse, error) { +// suffix := "" +// if cr.Env == "demoEnv" { +// suffix = "-demoEnv" +// } +// res, err := cr.RestPost(url, ¶m) +// fmt.Println("actionRes 2:", res.V5Response.Msg, res.V5Response.Data, err) +// bj, _ := json.Marshal(res) +// +// // TODO FIXME cli2 +// cr.RedisLocalCli.Publish(ORDER_RESP_PUBLISH+suffix, string(bj)) +// return res.V5Response, nil +// } + +func (cr *Core) AddToGeneralCandleChnl(candle *Candle, channels []string) { + redisCli := cr.RedisLocalCli + ab, err := json.Marshal(candle) + if err != nil { + logrus.Error("candle marshal err: ", err) + } + logrus.Debug("ab: ", string(ab)) + for _, v := range channels { + suffix := "" + env := os.Getenv("GO_ENV") + if env == "demoEnv" { + suffix = "-demoEnv" + } + vd := v + suffix + _, err := redisCli.Publish(vd, string(ab)).Result() + if err != nil { + logrus.Error("err of ma7|ma30 add to redis2:", err, candle.From) + } + } +} + +// setName := "candle" + period + "|" + instId + "|sortedSet" +// count: 倒推多少个周期开始拿数据 +// from: 倒推的起始时间点 +// ctype: candle或者maX +func (core *Core) GetRangeMaXSortedSet(setName string, count int, from time.Time) (*MaXList, error) { + mxl := MaXList{} + ary1 := strings.Split(setName, "|") + ary2 := []string{} + period := "" + ary2 = strings.Split(ary1[1], "candle") + period = ary2[1] + + dui, err := core.PeriodToMinutes(period) + if err != nil { + return &mxl, err + } + fromt := from.UnixMilli() + froms := strconv.FormatInt(fromt, 10) + sti := fromt - dui*int64(count)*60*1000 + sts := strconv.FormatInt(sti, 10) + opt := redis.ZRangeBy{ + Min: sts, + Max: froms, + Count: int64(count), + } + ary := []string{} + logrus.Debug("ZRevRangeByScore ", setName, froms, sts) + dura, err := core.GetExpiration(period) + if err != nil { + return &mxl, err + } + // fmt.Println("GetExpiration dura: ", dura, " period: ", period) + ot := time.Now().Add(dura * -1) + oti := ot.UnixMilli() + // fmt.Println(fmt.Sprint("GetExpiration zRemRangeByScore ", setName, " ", 0, " ", strconv.FormatInt(oti, 10))) + cli := core.RedisLocalCli + cli.LTrim(setName, 0, oti) + cunt, _ := cli.ZRemRangeByScore(setName, "0", strconv.FormatInt(oti, 10)).Result() + if cunt > 0 { + logrus.Warning("移出过期的引用数量:", "setName:", setName, " count:", count) + } + ary, err = cli.ZRevRangeByScore(setName, opt).Result() + if err != nil { + logrus.Warning("GetRangeMaXSortedSet ZRevRangeByScore err: ", " setName: ", setName, ", opt:", opt, ", res:", ary, ", err:", err) + return &mxl, err + } + keyAry, err := cli.MGet(ary...).Result() + if err != nil { + logrus.Warning("GetRangeMaXSortedSet mget err: ", "setName:", setName, ", max:", opt.Max, ", min:", opt.Min, ", res:", keyAry, ", err:", err) + return &mxl, err + } + for _, str := range keyAry { + if str == nil { + continue + } + mx := MaX{} + json.Unmarshal([]byte(str.(string)), &mx) + curData := mx.Data + if len(curData) == 0 { + logrus.Warning("curData is null", str) + continue + } + + ts := curData[0].(float64) + tm := time.UnixMilli(int64(ts)) + if tm.Sub(from) > 0 { + break + } + mxl.List = append(mxl.List, &mx) + } + mxl.Count = count + return &mxl, nil +} + +// 根据周期的文本内容,返回这代表多少个分钟 + +func (cr *Core) GetExpiration(per string) (time.Duration, error) { + if len(per) == 0 { + erstr := fmt.Sprint("period没有设置") + logrus.Warn(erstr) + err := errors.New(erstr) + return 0, err + } + exp, err := cr.PeriodToMinutes(per) + dur := time.Duration(exp*49) * time.Minute + return dur, err +} + +func (cr *Core) PeriodToLastTime(period string, from time.Time) (time.Time, error) { + candles := cr.Cfg.Config.Get("softCandleSegmentList").MustArray() + minutes := int64(0) + tms := "" + + for _, v := range candles { + cs := CandleSegment{} + sv, _ := json.Marshal(v) + json.Unmarshal(sv, &cs) + if cs.Seg == period { + mns, err := cr.PeriodToMinutes(period) + if err != nil { + continue + } + minutes = mns + tms = cs.StartTime + } else { + } + + } + + tm, err := time.ParseInLocation("2006-01-02 15:04.000", tms, time.Local) + if err != nil { + return from, err + } + // from.Unix() % minutes + frmi := from.UnixMilli() + frmi = frmi - (frmi)%(60*1000) + tmi := tm.UnixMilli() + tmi = tmi - (tmi)%(60*1000) + oms := int64(0) + for i := int64(0); true; i++ { + if tmi+i*minutes*60*1000 > frmi { + break + } else { + oms = tmi + i*minutes*60*1000 + continue + } + } + // return from, nil + // tm := + om := time.UnixMilli(oms) + // fmt.Println("PeriodToLastTime: period: ", period, " lastTime:", om.Format("2006-01-02 15:04:05.000")) + return om, nil +} + +// setName := "candle" + period + "|" + instId + "|sortedSet" +// count: 倒推多少个周期开始拿数据 +// from: 倒推的起始时间点 +// ctype: candle或者maX +func (core *Core) GetRangeCandleSortedSet(setName string, count int, from time.Time) (*CandleList, error) { + cdl := CandleList{} + ary1 := strings.Split(setName, "|") + ary2 := []string{} + period := "" + ary2 = strings.Split(ary1[0], "candle") + period = ary2[1] + + dui, err := core.PeriodToMinutes(period) + if err != nil { + return nil, err + } + fromt := from.UnixMilli() + nw := time.Now().UnixMilli() + if fromt > nw*2 { + err := errors.New("时间错了需要debug") + logrus.Warning(err.Error()) + return nil, err + } + froms := strconv.FormatInt(fromt, 10) + sti := fromt - dui*int64(count)*60*1000 + sts := strconv.FormatInt(sti, 10) + opt := redis.ZRangeBy{ + Min: sts, + Max: froms, + Count: int64(count), + } + ary := []string{} + extt, err := core.GetExpiration(period) + ot := time.Now().Add(extt * -1) + oti := ot.UnixMilli() + cli := core.RedisLocalCli + cli.LTrim(setName, 0, oti) + cunt, _ := cli.ZRemRangeByScore(setName, "0", strconv.FormatInt(oti, 10)).Result() + if cunt > 0 { + logrus.Warning("移出过期的引用数量:", setName, count, "ZRemRangeByScore ", setName, 0, strconv.FormatInt(oti, 10)) + } + logrus.Info("ZRevRangeByScore ", setName, opt) + ary, err = cli.ZRevRangeByScore(setName, opt).Result() + if err != nil { + return &cdl, err + } + keyAry, err := cli.MGet(ary...).Result() + if err != nil || len(keyAry) == 0 { + logrus.Warning("no record with cmd: ZRevRangeByScore ", setName, froms, sts, " ", err.Error()) + logrus.Warning("zrev lens of ary: lens: ", len(ary), "GetRangeSortedSet ZRevRangeByScore:", "setName:", setName, "opt.Max:", opt.Max, "opt.Min:", opt.Min) + return &cdl, err + } + for _, str := range keyAry { + if str == nil { + continue + } + cd := Candle{} + err := json.Unmarshal([]byte(str.(string)), &cd) + if err != nil { + logrus.Warn(GetFuncName(), err, str.(string)) + } + tmi := ToInt64(cd.Data[0]) + tm := time.UnixMilli(tmi) + if tm.Sub(from) > 0 { + break + } + cdl.List = append(cdl.List, &cd) + } + cdl.Count = count + return &cdl, nil +} + +func (cr *Core) GetCoasterFromPlate(instID string, period string) (Coaster, error) { + // cr.Mu.Lock() + // defer cr.Mu.Unlock() + pl, ok := cr.PlateMap[instID] + if !ok { + err := errors.New("instID period not found : " + instID + " " + period) + return *new(Coaster), err + } + co := pl.CoasterMap["period"+period] + + return co, nil +} TickerInforocessChan chan *TickerInfo CoasterChan chan *CoasterInfo // SeriesChan chan *SeriesInfo