874 lines
25 KiB
Go
874 lines
25 KiB
Go
package core
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
|
||
// "math/rand"
|
||
"io/ioutil"
|
||
"net/http"
|
||
"os"
|
||
"strconv"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
// simple "github.com/bitly/go-simplejson"
|
||
// "v5sdk_go/ws/wImpl"
|
||
"github.com/go-redis/redis"
|
||
"github.com/phyer/texus/private"
|
||
"github.com/phyer/v5sdkgo/rest"
|
||
logrus "github.com/sirupsen/logrus"
|
||
)
|
||
|
||
type Core struct {
|
||
Env string
|
||
Cfg *MyConfig
|
||
RedisLocalCli *redis.Client
|
||
RedisRemoteCli *redis.Client
|
||
FluentBitUrl string
|
||
PlateMap map[string]*Plate
|
||
TrayMap map[string]*Tray
|
||
CoasterMd5SyncMap sync.Map
|
||
Mu *sync.Mutex
|
||
Mu1 *sync.Mutex
|
||
Waity *sync.WaitGroup
|
||
CandlesProcessChan chan *Candle
|
||
MaXProcessChan chan *MaX
|
||
RsiProcessChan chan *Rsi
|
||
StockRsiProcessChan chan *StockRsi
|
||
TickerInforocessChan chan *TickerInfo
|
||
CoasterChan chan *CoasterInfo
|
||
// SeriesChan chan *SeriesInfo
|
||
// SegmentItemChan chan *SegmentItem
|
||
MakeMaXsChan chan *Candle
|
||
// ShearForceGrpChan chan *ShearForceGrp
|
||
InvokeRestQueueChan chan *RestQueue
|
||
RedisLocal2Cli *redis.Client
|
||
RestQueueChan chan *RestQueue
|
||
RestQueue
|
||
WriteLogChan chan *WriteLog
|
||
}
|
||
|
||
type RestQueue struct {
|
||
InstId string
|
||
Bar string
|
||
After int64
|
||
Before int64
|
||
Limit string
|
||
Duration time.Duration
|
||
WithWs bool
|
||
}
|
||
|
||
type CandleData struct {
|
||
Code string `json:"code"`
|
||
Msg string `json:"msg"`
|
||
Data [][]interface{} `json:"data"` // 用二维数组来接收 candles 数据
|
||
}
|
||
type SubAction struct {
|
||
ActionName string
|
||
ForAll bool
|
||
MetaInfo map[string]interface{}
|
||
}
|
||
|
||
func (rst *RestQueue) Show(cr *Core) {
|
||
logrus.Info("restQueue:", rst.InstId, rst.Bar, rst.Limit)
|
||
}
|
||
|
||
func (rst *RestQueue) Save(cr *Core) {
|
||
afterSec := ""
|
||
if rst.After > 0 {
|
||
afterSec = fmt.Sprint("&after=", rst.After)
|
||
}
|
||
beforeSec := ""
|
||
if rst.Before > 0 {
|
||
beforeSec = fmt.Sprint("&before=", rst.Before)
|
||
}
|
||
limitSec := ""
|
||
if len(rst.Limit) > 0 {
|
||
limitSec = fmt.Sprint("&limit=", rst.Limit)
|
||
}
|
||
isHistory := false
|
||
ct, _ := cr.PeriodToMinutes(rst.Bar)
|
||
if rst.After/ct > 10 {
|
||
isHistory = true
|
||
}
|
||
prfix := ""
|
||
if isHistory {
|
||
prfix = "history-"
|
||
}
|
||
link := "/api/v5/market/" + prfix + "candles?instId=" + rst.InstId + "&bar=" + rst.Bar + limitSec + afterSec + beforeSec
|
||
|
||
logrus.Info("restLink: ", link)
|
||
rsp, err := cr.v5PublicInvoke(link)
|
||
if err != nil {
|
||
logrus.Warn("cr.v5PublicInvoke err:", err, " count: ", len(rsp.Data), " instId: ", rst.InstId, " period: ", rst.Bar, " after:", afterSec)
|
||
} else {
|
||
logrus.Warn("cr.v5PublicInvoke result count:", len(rsp.Data), " instId: ", rst.InstId, " period: ", rst.Bar, " after:", afterSec, " rsp: ", rsp)
|
||
}
|
||
cr.SaveCandle(rst.InstId, rst.Bar, rsp, rst.Duration, rst.WithWs)
|
||
}
|
||
|
||
func WriteLogProcess(cr *Core) {
|
||
for {
|
||
wg := <-cr.WriteLogChan
|
||
go func(wg *WriteLog) {
|
||
logrus.Info("start writelog: " + wg.Tag + " " + wg.Id)
|
||
wg.Process(cr)
|
||
}(wg)
|
||
time.Sleep(5 * time.Millisecond)
|
||
}
|
||
}
|
||
|
||
// func (cr *Core) ShowSysTime() {
|
||
// rsp, _ := cr.RestInvoke("/api/v5/public/time", rest.GET)
|
||
// fmt.Println("serverSystem time:", rsp)
|
||
// }
|
||
|
||
func (core *Core) Init() {
|
||
core.Env = os.Getenv("GO_ENV")
|
||
gitBranch := os.Getenv("gitBranchName")
|
||
commitID := os.Getenv("gitCommitID")
|
||
|
||
logrus.Info("当前环境: ", core.Env)
|
||
logrus.Info("gitBranch: ", gitBranch)
|
||
logrus.Info("gitCommitID: ", commitID)
|
||
cfg := MyConfig{}
|
||
cfg, _ = cfg.Init()
|
||
core.Cfg = &cfg
|
||
cli, err := core.GetRedisLocalCli()
|
||
core.RedisLocalCli = cli
|
||
core.RestQueueChan = make(chan *RestQueue)
|
||
core.WriteLogChan = make(chan *WriteLog)
|
||
// 跟订单有关的都关掉
|
||
// core.OrderChan = make(chan *private.Order)
|
||
if err != nil {
|
||
logrus.Error("init redis client err: ", err)
|
||
}
|
||
}
|
||
|
||
func (core *Core) GetRedisCliFromConf(conf RedisConfig) (*redis.Client, error) {
|
||
client := redis.NewClient(&redis.Options{
|
||
Addr: conf.Url,
|
||
Password: conf.Password, //默认空密码
|
||
DB: conf.Index, //使用默认数据库
|
||
})
|
||
pong, err := client.Ping().Result()
|
||
if pong == "PONG" && err == nil {
|
||
return client, err
|
||
} else {
|
||
logrus.Error("redis状态不可用:", conf.Url, conf.Password, conf.Index, err)
|
||
}
|
||
|
||
return client, nil
|
||
}
|
||
|
||
func (core *Core) GetRemoteRedisLocalCli() (*redis.Client, error) {
|
||
ru := core.Cfg.RedisConf.Url
|
||
rp := core.Cfg.RedisConf.Password
|
||
ri := core.Cfg.RedisConf.Index
|
||
re := os.Getenv("REDIS_URL")
|
||
if len(re) > 0 {
|
||
ru = re
|
||
}
|
||
client := redis.NewClient(&redis.Options{
|
||
Addr: ru,
|
||
Password: rp, //默认空密码
|
||
DB: ri, //使用默认数据库
|
||
})
|
||
pong, err := client.Ping().Result()
|
||
if pong == "PONG" && err == nil {
|
||
return client, err
|
||
} else {
|
||
logrus.Error("redis状态不可用:", ru, rp, ri, err)
|
||
}
|
||
|
||
return client, nil
|
||
}
|
||
func (core *Core) GetRedisLocalCli() (*redis.Client, error) {
|
||
ru := core.Cfg.RedisConf.Url
|
||
rp := core.Cfg.RedisConf.Password
|
||
ri := core.Cfg.RedisConf.Index
|
||
re := os.Getenv("REDIS_URL")
|
||
if len(re) > 0 {
|
||
ru = re
|
||
}
|
||
client := redis.NewClient(&redis.Options{
|
||
Addr: ru,
|
||
Password: rp, //默认空密码
|
||
DB: ri, //使用默认数据库
|
||
})
|
||
pong, err := client.Ping().Result()
|
||
if pong == "PONG" && err == nil {
|
||
return client, err
|
||
} else {
|
||
logrus.Error("redis状态不可用:", ru, rp, ri, err)
|
||
}
|
||
|
||
return client, nil
|
||
}
|
||
|
||
// 这些应该是放到 texus 里实现的
|
||
func (core *Core) GetAllTickerInfo() (*rest.RESTAPIResult, error) {
|
||
// GET / 获取所有产品行情信息
|
||
rsp, err := core.RestInvoke("/api/v5/market/tickers?instType=SPOT", rest.GET)
|
||
return rsp, err
|
||
}
|
||
|
||
// 这些跟 订单有关,都关掉
|
||
//
|
||
// func (core *Core) GetBalances() (*rest.RESTAPIResult, error) {
|
||
// // TODO 临时用了两个实现,restInvoke,复用原来的会有bug,不知道是谁的bug
|
||
// rsp, err := core.RestInvoke2("/api/v5/account/balance", rest.GET, nil)
|
||
// return rsp, err
|
||
// }
|
||
//
|
||
// func (core *Core) GetLivingOrderList() ([]*private.Order, error) {
|
||
// // TODO 临时用了两个实现,restInvoke,复用原来的会有bug,不知道是谁的bug
|
||
// params := make(map[string]interface{})
|
||
// data, err := core.RestInvoke2("/api/v5/trade/orders-pending", rest.GET, ¶ms)
|
||
// odrsp := private.OrderResp{}
|
||
// err = json.Unmarshal([]byte(data.Body), &odrsp)
|
||
// str, _ := json.Marshal(odrsp)
|
||
// fmt.Println("convert: err:", err, " body: ", data.Body, odrsp, " string:", string(str))
|
||
// list, err := odrsp.Convert()
|
||
// fmt.Println("loopLivingOrder response data:", str)
|
||
// fmt.Println(utils.GetFuncName(), " 当前数据是 ", data.V5Response.Code, " list len:", len(list))
|
||
// return list, err
|
||
// }
|
||
// func (core *Core) LoopInstrumentList() error {
|
||
// for {
|
||
// time.Sleep(3 * time.Second)
|
||
// ctype := ws.SPOT
|
||
//
|
||
// redisCli := core.RedisLocalCli
|
||
// counts, err := redisCli.HLen("instruments|" + ctype + "|hash").Result()
|
||
// if err != nil {
|
||
// fmt.Println("err of hset to redis:", err)
|
||
// }
|
||
// if counts == 0 {
|
||
// continue
|
||
// }
|
||
// break
|
||
// }
|
||
// return nil
|
||
// }
|
||
// func (core *Core) SubscribeTicker(op string) error {
|
||
// mp := make(map[string]string)
|
||
//
|
||
// redisCli := core.RedisLocalCli
|
||
// ctype := ws.SPOT
|
||
// mp, err := redisCli.HGetAll("instruments|" + ctype + "|hash").Result()
|
||
// b, err := json.Marshal(mp)
|
||
// js, err := simple.NewJson(b)
|
||
// if err != nil {
|
||
// fmt.Println("err of unMarshalJson3:", js)
|
||
// }
|
||
// // fmt.Println("ticker js: ", js)
|
||
// instAry := js.MustMap()
|
||
// for k, v := range instAry {
|
||
// b = []byte(v.(string))
|
||
// _, err := simple.NewJson(b)
|
||
// if err != nil {
|
||
// fmt.Println("err of unMarshalJson4:", js)
|
||
// }
|
||
// time.Sleep(5 * time.Second)
|
||
// go func(instId string, op string) {
|
||
//
|
||
// redisCli := core.RedisLocalCli
|
||
// _, err = redisCli.SAdd("tickers|"+op+"|set", instId).Result()
|
||
// if err != nil {
|
||
// fmt.Println("err of unMarshalJson5:", js)
|
||
// }
|
||
// }(k, op)
|
||
// }
|
||
// return nil
|
||
// }
|
||
|
||
// 通过接口获取一个币种名下的某个时间范围内的Candle对象集合
|
||
// 按说这个应该放到 texus里实现
|
||
func (core *Core) v5PublicInvoke(subUrl string) (*CandleData, error) {
|
||
restUrl, _ := core.Cfg.Config.Get("connect").Get("restBaseUrl").String()
|
||
url := restUrl + subUrl
|
||
resp, err := http.Get(url)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer resp.Body.Close()
|
||
body, err := ioutil.ReadAll(resp.Body)
|
||
var result CandleData
|
||
if err := json.Unmarshal(body, &result); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return &result, nil
|
||
}
|
||
|
||
func (core *Core) RestInvoke(subUrl string, method string) (*rest.RESTAPIResult, error) {
|
||
restUrl, _ := core.Cfg.Config.Get("connect").Get("restBaseUrl").String()
|
||
//ep, method, uri string, param *map[string]interface{}
|
||
rest := rest.NewRESTAPI(restUrl, method, subUrl, nil)
|
||
key, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String()
|
||
secure, _ := core.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String()
|
||
pass, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String()
|
||
isDemo := false
|
||
if core.Env == "demoEnv" {
|
||
isDemo = true
|
||
}
|
||
rest.SetSimulate(isDemo).SetAPIKey(key, secure, pass)
|
||
response, err := rest.Run(context.Background())
|
||
if err != nil {
|
||
logrus.Error("restInvoke1 err:", subUrl, err)
|
||
}
|
||
return response, err
|
||
}
|
||
|
||
// func (core *Core) RestInvoke2(subUrl string, method string, param *map[string]interface{}) (*rest.RESTAPIResult, error) {
|
||
// key, err1 := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String()
|
||
// secret, err2 := core.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String()
|
||
// pass, err3 := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String()
|
||
// userId, err4 := core.Cfg.Config.Get("connect").Get("userId").String()
|
||
// restUrl, err5 := core.Cfg.Config.Get("connect").Get("restBaseUrl").String()
|
||
// if err1 != nil || err2 != nil || err3 != nil || err4 != nil || err5 != nil {
|
||
// fmt.Println(err1, err2, err3, err4, err5)
|
||
// } else {
|
||
// fmt.Println("key:", key, secret, pass, "userId:", userId, "restUrl: ", restUrl)
|
||
// }
|
||
// // reqParam := make(map[string]interface{})
|
||
// // if param != nil {
|
||
// // reqParam = *param
|
||
// // }
|
||
// // rs := rest.NewRESTAPI(restUrl, method, subUrl, &reqParam)
|
||
// isDemo := false
|
||
// if core.Env == "demoEnv" {
|
||
// isDemo = true
|
||
// }
|
||
// // rs.SetSimulate(isDemo).SetAPIKey(key, secret, pass).SetUserId(userId)
|
||
// // response, err := rs.Run(context.Background())
|
||
// // if err != nil {
|
||
// // fmt.Println("restInvoke2 err:", subUrl, err)
|
||
// // }
|
||
// apikey := rest.APIKeyInfo{
|
||
// ApiKey: key,
|
||
// SecKey: secret,
|
||
// PassPhrase: pass,
|
||
// }
|
||
// cli := rest.NewRESTClient(restUrl, &apikey, isDemo)
|
||
// rsp, err := cli.Get(context.Background(), subUrl, param)
|
||
// if err != nil {
|
||
// return rsp, err
|
||
// }
|
||
// fmt.Println("response:", rsp, err)
|
||
// return rsp, err
|
||
// }
|
||
|
||
// 跟下单有关的都关掉,以后再说
|
||
// func (core *Core) RestPost(subUrl string, param *map[string]interface{}) (*rest.RESTAPIResult, error) {
|
||
// key, err1 := core.Cfg.Config.Get("credentialMutable").Get("okAccessKey").String()
|
||
// secret, err2 := core.Cfg.Config.Get("credentialMutable").Get("secretKey").String()
|
||
// pass, err3 := core.Cfg.Config.Get("credentialMutable").Get("okAccessPassphrase").String()
|
||
// userId, err4 := core.Cfg.Config.Get("connect").Get("userId").String()
|
||
// restUrl, err5 := core.Cfg.Config.Get("connect").Get("restBaseUrl").String()
|
||
// if err1 != nil || err2 != nil || err3 != nil || err4 != nil || err5 != nil {
|
||
// fmt.Println(err1, err2, err3, err4, err5)
|
||
// } else {
|
||
// fmt.Println("key:", key, secret, pass, "userId:", userId, "restUrl: ", restUrl)
|
||
// }
|
||
// // 请求的另一种方式
|
||
// apikey := rest.APIKeyInfo{
|
||
// ApiKey: key,
|
||
// SecKey: secret,
|
||
// PassPhrase: pass,
|
||
// }
|
||
// isDemo := false
|
||
// if core.Env == "demoEnv" {
|
||
// isDemo = true
|
||
// }
|
||
// cli := rest.NewRESTClient(restUrl, &apikey, isDemo)
|
||
// rsp, err := cli.Post(context.Background(), subUrl, param)
|
||
// if err != nil {
|
||
// return rsp, err
|
||
// }
|
||
// return rsp, err
|
||
// }
|
||
|
||
// 我当前持有的币,每分钟刷新
|
||
func (core *Core) GetMyFavorList() []string {
|
||
redisCli := core.RedisLocalCli
|
||
opt := redis.ZRangeBy{
|
||
Min: "10",
|
||
Max: "100000000000",
|
||
}
|
||
cary, _ := redisCli.ZRevRangeByScore("private|positions|sortedSet", opt).Result()
|
||
cl := []string{}
|
||
for _, v := range cary {
|
||
cry := strings.Split(v, "|")[0] + "-USDT"
|
||
cl = append(cl, cry)
|
||
}
|
||
return cary
|
||
}
|
||
|
||
// 得到交易量排行榜,订阅其中前N名的各维度k线,并merge进来我已经购买的币列表,这个列表是动态更新的
|
||
// 改了,不需要交易排行榜,我手动指定一个排行即可, tickersVol|sortedSet 改成 tickersList|sortedSet
|
||
func (core *Core) GetScoreList(count int) []string {
|
||
|
||
// redisCli := core.RedisLocalCli
|
||
myFocusList := core.Cfg.Config.Get("focusList").MustArray()
|
||
logrus.Debug("curList: ", myFocusList)
|
||
lst := []string{}
|
||
for _, v := range myFocusList {
|
||
lst = append(lst, v.(string))
|
||
}
|
||
return lst
|
||
}
|
||
|
||
// 跟订单有关,关掉
|
||
// func LoopBalances(cr *Core, mdura time.Duration) {
|
||
// //协程:动态维护topScore
|
||
// ticker := time.NewTicker(mdura)
|
||
// for {
|
||
// select {
|
||
// case <-ticker.C:
|
||
// //协程:循环执行rest请求candle
|
||
// fmt.Println("loopBalance: receive ccyChannel start")
|
||
// RestBalances(cr)
|
||
// }
|
||
// }
|
||
// }
|
||
|
||
// func LoopLivingOrders(cr *Core, mdura time.Duration) {
|
||
// //协程:动态维护topScore
|
||
// ticker := time.NewTicker(mdura)
|
||
// for {
|
||
// select {
|
||
// case <-ticker.C:
|
||
// //协程:循环执行rest请求candle
|
||
// fmt.Println("loopLivingOrder: receive ccyChannel start")
|
||
// RestLivingOrder(cr)
|
||
// }
|
||
// }
|
||
// }
|
||
|
||
// 跟订单有关,关掉
|
||
// func RestBalances(cr *Core) ([]*private.Ccy, error) {
|
||
// // fmt.Println("restBalance loopBalance loop start")
|
||
// ccyList := []*private.Ccy{}
|
||
// rsp, err := cr.GetBalances()
|
||
// if err != nil {
|
||
// fmt.Println("loopBalance err00: ", err)
|
||
// }
|
||
// fmt.Println("loopBalance balance rsp: ", rsp)
|
||
// if err != nil {
|
||
// fmt.Println("loopBalance err01: ", err)
|
||
// return ccyList, err
|
||
// }
|
||
// if len(rsp.Body) == 0 {
|
||
// fmt.Println("loopBalance err03: rsp body is null")
|
||
// return ccyList, err
|
||
// }
|
||
// js1, err := simple.NewJson([]byte(rsp.Body))
|
||
// if err != nil {
|
||
// fmt.Println("loopBalance err1: ", err)
|
||
// }
|
||
// itemList := js1.Get("data").GetIndex(0).Get("details").MustArray()
|
||
// // maxTickers是重点关注的topScore的coins的数量
|
||
// cli := cr.RedisLocalCli
|
||
// for _, v := range itemList {
|
||
// js, _ := json.Marshal(v)
|
||
// ccyResp := private.CcyResp{}
|
||
// err := json.Unmarshal(js, &ccyResp)
|
||
// if err != nil {
|
||
// fmt.Println("loopBalance err2: ", err)
|
||
// }
|
||
// ccy, err := ccyResp.Convert()
|
||
// ccyList = append(ccyList, ccy)
|
||
// if err != nil {
|
||
// fmt.Println("loopBalance err2: ", err)
|
||
// }
|
||
// z := redis.Z{
|
||
// Score: ccy.EqUsd,
|
||
// Member: ccy.Ccy + "|position|key",
|
||
// }
|
||
// res, err := cli.ZAdd("private|positions|sortedSet", z).Result()
|
||
// if err != nil {
|
||
// fmt.Println("loopBalance err3: ", res, err)
|
||
// }
|
||
// res1, err := cli.Set(ccy.Ccy+"|position|key", js, 0).Result()
|
||
// if err != nil {
|
||
// fmt.Println("loopBalance err4: ", res1, err)
|
||
// }
|
||
// bjs, _ := json.Marshal(ccy)
|
||
// tsi := time.Now().Unix()
|
||
// tsii := tsi - tsi%600
|
||
// tss := strconv.FormatInt(tsii, 10)
|
||
// cli.Set(CCYPOSISTIONS_PUBLISH+"|ts:"+tss, 1, 10*time.Minute).Result()
|
||
// fmt.Println("ccy published: ", string(bjs))
|
||
// //TODO FIXME 50毫秒,每分钟上限是1200个订单,超过就无法遍历完成
|
||
// time.Sleep(50 * time.Millisecond)
|
||
// suffix := ""
|
||
// if cr.Env == "demoEnv" {
|
||
// suffix = "-demoEnv"
|
||
// }
|
||
// // TODO FIXME cli2
|
||
// cli.Publish(CCYPOSISTIONS_PUBLISH+suffix, string(bjs)).Result()
|
||
// }
|
||
// return ccyList, nil
|
||
// }
|
||
|
||
// func RestLivingOrder(cr *Core) ([]*private.Order, error) {
|
||
// // fmt.Println("restOrder loopOrder loop start")
|
||
// orderList := []*private.Order{}
|
||
// list, err := cr.GetLivingOrderList()
|
||
// if err != nil {
|
||
// fmt.Println("loopLivingOrder err00: ", err)
|
||
// }
|
||
// fmt.Println("loopLivingOrder response:", list)
|
||
// go func() {
|
||
// for _, v := range list {
|
||
// fmt.Println("order orderV:", v)
|
||
// time.Sleep(30 * time.Millisecond)
|
||
// cr.OrderChan <- v
|
||
// }
|
||
// }()
|
||
// return orderList, nil
|
||
// }
|
||
|
||
func (cr *Core) ProcessOrder(od *private.Order) error {
|
||
// publish
|
||
go func() {
|
||
suffix := ""
|
||
if cr.Env == "demoEnv" {
|
||
suffix = "-demoEnv"
|
||
}
|
||
cn := ORDER_PUBLISH + suffix
|
||
bj, _ := json.Marshal(od)
|
||
|
||
// TODO FIXME cli2
|
||
res, _ := cr.RedisLocalCli.Publish(cn, string(bj)).Result()
|
||
logrus.Info("order publish res: ", res, " content: ", string(bj))
|
||
rsch := ORDER_RESP_PUBLISH + suffix
|
||
bj1, _ := json.Marshal(res)
|
||
|
||
// TODO FIXME cli2
|
||
res, _ = cr.RedisLocalCli.Publish(rsch, string(bj1)).Result()
|
||
}()
|
||
return nil
|
||
}
|
||
|
||
// 跟订单有关,关掉
|
||
// func (cr *Core) DispatchSubAction(action *SubAction) error {
|
||
// go func() {
|
||
// suffix := ""
|
||
// if cr.Env == "demoEnv" {
|
||
// suffix = "-demoEnv"
|
||
// }
|
||
// fmt.Println("action: ", action.ActionName, action.MetaInfo)
|
||
// res, err := cr.RestPostWrapper("/api/v5/trade/"+action.ActionName, action.MetaInfo)
|
||
// if err != nil {
|
||
// fmt.Println(utils.GetFuncName(), " actionRes 1:", err)
|
||
// }
|
||
// rsch := ORDER_RESP_PUBLISH + suffix
|
||
// bj1, _ := json.Marshal(res)
|
||
//
|
||
// // TODO FIXME cli2
|
||
// rs, _ := cr.RedisLocalCli.Publish(rsch, string(bj1)).Result()
|
||
// fmt.Println("action rs: ", rs)
|
||
// }()
|
||
// return nil
|
||
// }
|
||
|
||
// func (cr *Core) RestPostWrapper(url string, param map[string]interface{}) (rest.Okexv5APIResponse, error) {
|
||
// suffix := ""
|
||
// if cr.Env == "demoEnv" {
|
||
// suffix = "-demoEnv"
|
||
// }
|
||
// res, err := cr.RestPost(url, ¶m)
|
||
// fmt.Println("actionRes 2:", res.V5Response.Msg, res.V5Response.Data, err)
|
||
// bj, _ := json.Marshal(res)
|
||
//
|
||
// // TODO FIXME cli2
|
||
// cr.RedisLocalCli.Publish(ORDER_RESP_PUBLISH+suffix, string(bj))
|
||
// return res.V5Response, nil
|
||
// }
|
||
|
||
func (cr *Core) AddToGeneralCandleChnl(candle *Candle, channels []string) {
|
||
redisCli := cr.RedisLocalCli
|
||
ab, err := json.Marshal(candle)
|
||
if err != nil {
|
||
logrus.Error("candle marshal err: ", err)
|
||
}
|
||
logrus.Debug("AddToGeneralCandleChnl: ", string(ab), " channels: ", channels)
|
||
for _, v := range channels {
|
||
suffix := ""
|
||
env := os.Getenv("GO_ENV")
|
||
if env == "demoEnv" {
|
||
suffix = "-demoEnv"
|
||
}
|
||
vd := v + suffix
|
||
_, err := redisCli.Publish(vd, string(ab)).Result()
|
||
if err != nil {
|
||
logrus.Error("err of ma7|ma30 add to redis2:", err, candle.From)
|
||
}
|
||
}
|
||
}
|
||
|
||
// setName := "candle" + period + "|" + instId + "|sortedSet"
|
||
// count: 倒推多少个周期开始拿数据
|
||
// from: 倒推的起始时间点
|
||
// ctype: candle或者maX
|
||
func (core *Core) GetRangeMaXSortedSet(setName string, count int, from time.Time) (*MaXList, error) {
|
||
mxl := &MaXList{Count: count}
|
||
ary1 := strings.Split(setName, "|")
|
||
ary2 := []string{}
|
||
period := ""
|
||
ary2 = strings.Split(ary1[1], "candle")
|
||
period = ary2[1]
|
||
|
||
dui, err := core.PeriodToMinutes(period)
|
||
if err != nil {
|
||
return mxl, err
|
||
}
|
||
fromt := from.UnixMilli()
|
||
froms := strconv.FormatInt(fromt, 10)
|
||
sti := fromt - dui*int64(count)*60*1000
|
||
sts := strconv.FormatInt(sti, 10)
|
||
opt := redis.ZRangeBy{
|
||
Min: sts,
|
||
Max: froms,
|
||
Count: int64(count),
|
||
}
|
||
ary := []string{}
|
||
logrus.Debug("ZRevRangeByScore ", " setName:", setName, " froms:", froms, " sts:", sts)
|
||
dura, err := core.GetExpiration(period)
|
||
if err != nil {
|
||
return mxl, err
|
||
}
|
||
// fmt.Println("GetExpiration dura: ", dura, " period: ", period)
|
||
ot := time.Now().Add(dura * -1)
|
||
oti := ot.UnixMilli()
|
||
// fmt.Println(fmt.Sprint("GetExpiration zRemRangeByScore ", setName, " ", 0, " ", strconv.FormatInt(oti, 10)))
|
||
cli := core.RedisLocalCli
|
||
cli.LTrim(setName, 0, oti)
|
||
cunt, _ := cli.ZRemRangeByScore(setName, "0", strconv.FormatInt(oti, 10)).Result()
|
||
if cunt > 0 {
|
||
logrus.Warning("移出过期的引用数量:", "setName:", setName, " cunt:", cunt)
|
||
}
|
||
ary, err = cli.ZRevRangeByScore(setName, opt).Result()
|
||
if err != nil {
|
||
logrus.Warning("GetRangeMaXSortedSet ZRevRangeByScore err: ", " setName: ", setName, ", opt:", opt, ", res:", ary, ", err:", err)
|
||
return mxl, err
|
||
}
|
||
keyAry, err := cli.MGet(ary...).Result()
|
||
if err != nil {
|
||
logrus.Warning("GetRangeMaXSortedSet mget err: ", "setName:", setName, ", max:", opt.Max, ", min:", opt.Min, ", res:", keyAry, ", err:", err)
|
||
return mxl, err
|
||
}
|
||
for _, str := range keyAry {
|
||
if str == nil {
|
||
continue
|
||
}
|
||
mx := MaX{}
|
||
json.Unmarshal([]byte(str.(string)), &mx)
|
||
curData := mx.Data
|
||
if len(curData) == 0 {
|
||
logrus.Warning("curData is null", str)
|
||
continue
|
||
}
|
||
|
||
ts := curData[0].(float64)
|
||
tm := time.UnixMilli(int64(ts))
|
||
if tm.Sub(from) > 0 {
|
||
break
|
||
}
|
||
mxl.List = append(mxl.List, &mx)
|
||
}
|
||
mxl.Count = count
|
||
return mxl, nil
|
||
}
|
||
|
||
// 根据周期的文本内容,返回这代表多少个分钟
|
||
// 设置sortedSet的过期时间
|
||
|
||
func (cr *Core) GetExpiration(per string) (time.Duration, error) {
|
||
if len(per) == 0 {
|
||
erstr := fmt.Sprint("period没有设置")
|
||
logrus.Warn(erstr)
|
||
err := errors.New(erstr)
|
||
return 0, err
|
||
}
|
||
exp, err := cr.PeriodToMinutes(per)
|
||
length, _ := cr.Cfg.Config.Get("sortedSet").Get("length").Int64()
|
||
logrus.Info("length of sortedSet: ", length)
|
||
dur := time.Duration(exp*length) * time.Minute
|
||
return dur, err
|
||
}
|
||
|
||
func (cr *Core) PeriodToLastTime(period string, from time.Time) (time.Time, error) {
|
||
candles := cr.Cfg.Config.Get("softCandleSegmentList").MustArray()
|
||
minutes := int64(0)
|
||
tms := ""
|
||
|
||
for _, v := range candles {
|
||
cs := CandleSegment{}
|
||
sv, _ := json.Marshal(v)
|
||
json.Unmarshal(sv, &cs)
|
||
if cs.Seg == period {
|
||
mns, err := cr.PeriodToMinutes(period)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
minutes = mns
|
||
tms = cs.StartTime
|
||
} else {
|
||
}
|
||
|
||
}
|
||
|
||
tm, err := time.ParseInLocation("2006-01-02 15:04.000", tms, time.Local)
|
||
if err != nil {
|
||
return from, err
|
||
}
|
||
// from.Unix() % minutes
|
||
frmi := from.UnixMilli()
|
||
frmi = frmi - (frmi)%(60*1000)
|
||
tmi := tm.UnixMilli()
|
||
tmi = tmi - (tmi)%(60*1000)
|
||
oms := int64(0)
|
||
for i := int64(0); true; i++ {
|
||
if tmi+i*minutes*60*1000 > frmi {
|
||
break
|
||
} else {
|
||
oms = tmi + i*minutes*60*1000
|
||
continue
|
||
}
|
||
}
|
||
// return from, nil
|
||
// tm :=
|
||
om := time.UnixMilli(oms)
|
||
// fmt.Println("PeriodToLastTime: period: ", period, " lastTime:", om.Format("2006-01-02 15:04:05.000"))
|
||
return om, nil
|
||
}
|
||
|
||
// setName := "candle" + period + "|" + instId + "|sortedSet"
|
||
// count: 倒推多少个周期开始拿数据
|
||
// from: 倒推的起始时间点
|
||
// ctype: candle或者maX
|
||
func (core *Core) GetRangeCandleSortedSet(setName string, count int, from time.Time) (*CandleList, error) {
|
||
// 初始化返回结果
|
||
cdl := &CandleList{Count: count}
|
||
|
||
// 解析period
|
||
ary := strings.Split(setName, "|")
|
||
if len(ary) < 1 {
|
||
return nil, fmt.Errorf("invalid setName format: %s", setName)
|
||
}
|
||
period := strings.TrimPrefix(ary[0], "candle")
|
||
|
||
// 获取period对应的分钟数
|
||
durationMinutes, err := core.PeriodToMinutes(period)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to get period minutes: %w", err)
|
||
}
|
||
|
||
// 计算时间范围
|
||
fromTs := from.UnixMilli()
|
||
nowTs := time.Now().UnixMilli()
|
||
if fromTs > nowTs*2 {
|
||
return nil, fmt.Errorf("invalid from time: %v", from)
|
||
}
|
||
|
||
// 计算查询的起始时间
|
||
startTs := fromTs - durationMinutes*int64(count)*60*1000
|
||
|
||
// 清理过期数据
|
||
if err := core.cleanExpiredData(setName, period); err != nil {
|
||
logrus.Warnf("Failed to clean expired data: %v", err)
|
||
}
|
||
|
||
// 从Redis获取数据
|
||
cli := core.RedisLocalCli
|
||
opt := redis.ZRangeBy{
|
||
Min: strconv.FormatInt(startTs, 10),
|
||
Max: strconv.FormatInt(fromTs, 10),
|
||
Count: int64(count),
|
||
}
|
||
|
||
logrus.Debugf("Querying Redis: set=%s, start=%v, end=%v", setName, time.UnixMilli(startTs), from)
|
||
|
||
// 获取键列表
|
||
keys, err := cli.ZRevRangeByScore(setName, opt).Result()
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to query Redis: %w", err)
|
||
}
|
||
|
||
if len(keys) == 0 {
|
||
logrus.Warnf("No data found for set: %s between %v and %v", setName, time.UnixMilli(startTs), from)
|
||
return cdl, nil
|
||
}
|
||
|
||
// 批量获取数据
|
||
values, err := cli.MGet(keys...).Result()
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to get values from Redis: %w", err)
|
||
}
|
||
|
||
// 解析数据
|
||
for _, val := range values {
|
||
if val == nil {
|
||
continue
|
||
}
|
||
|
||
var candle Candle
|
||
if err := json.Unmarshal([]byte(val.(string)), &candle); err != nil {
|
||
logrus.Warnf("Failed to unmarshal candle data: %v", err)
|
||
continue
|
||
}
|
||
|
||
// 检查时间是否在范围内
|
||
candleTime := time.UnixMilli(ToInt64(candle.Data[0]))
|
||
if candleTime.After(from) {
|
||
break
|
||
}
|
||
|
||
cdl.List = append(cdl.List, &candle)
|
||
}
|
||
|
||
return cdl, nil
|
||
}
|
||
|
||
// cleanExpiredData 清理过期的数据
|
||
func (core *Core) cleanExpiredData(setName, period string) error {
|
||
expiration, err := core.GetExpiration(period)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
cli := core.RedisLocalCli
|
||
expirationTime := time.Now().Add(-expiration)
|
||
expirationTs := strconv.FormatInt(expirationTime.UnixMilli(), 10)
|
||
|
||
// 清理过期数据
|
||
if count, err := cli.ZRemRangeByScore(setName, "0", expirationTs).Result(); err != nil {
|
||
return err
|
||
} else if count > 0 {
|
||
logrus.Infof("Cleaned %d expired records from %s", count, setName)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func (cr *Core) GetCoasterFromPlate(instID string, period string) (Coaster, error) {
|
||
// cr.Mu.Lock()
|
||
// defer cr.Mu.Unlock()
|
||
pl, ok := cr.PlateMap[instID]
|
||
if !ok {
|
||
err := errors.New("instID period not found : " + instID + " " + period)
|
||
return *new(Coaster), err
|
||
}
|
||
co := pl.CoasterMap["period"+period]
|
||
|
||
return co, nil
|
||
}
|