diff --git a/.gitignore b/.gitignore index f218d12..8fe9567 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ vendor/ tunas +texus diff --git a/core/candle.go b/core/candle.go deleted file mode 100644 index ec0270a..0000000 --- a/core/candle.go +++ /dev/null @@ -1,470 +0,0 @@ -package core - -import ( - "crypto/sha256" - "encoding/hex" - "encoding/json" - "errors" - "fmt" - "math/rand" - "os" - "strconv" - "strings" - "time" - // "v5sdk_go/rest" - - simple "github.com/bitly/go-simplejson" - "github.com/go-redis/redis" - "github.com/phyer/texus/utils" -) - -type Candle struct { - Id string `json:"_id"` - core *Core - InstId string - Period string - Data []interface{} - From string - Timestamp time.Time - Open float64 - High float64 - Low float64 - Close float64 - VolCcy float64 - Confirm bool -} - -type MaX struct { - Core *Core - InstId string - Period string - KeyName string - Count int - Ts int64 - Value float64 - Data []interface{} - From string -} - -type MatchCheck struct { - Minutes int64 - Matched bool -} - -func (mc *MatchCheck) SetMatched(value bool) { - mc.Matched = value -} - -func (core *Core) GetCandlesWithRest(instId string, kidx int, dura time.Duration, maxCandles int) error { - ary := []string{} - - wsary := core.Cfg.CandleDimentions - for k, v := range wsary { - matched := false - // 这个算法的目的是:越靠后的candles维度,被命中的概率越低,第一个百分之百命中,后面开始越来越低, 每分钟都会发生这样的计算, - // 因为维度多了的话,照顾不过来 - rand.New(rand.NewSource(time.Now().UnixNano())) - rand.Seed(time.Now().UnixNano()) - n := (k*2 + 2) * 3 - if n < 1 { - n = 1 - } - b := rand.Intn(n) - if b < 8 { - matched = true - } - if matched { - ary = append(ary, v) - } - } - - mdura := dura/(time.Duration(len(ary)+1)) - 50*time.Millisecond - // fmt.Println("loop4 Ticker Start instId, dura: ", instId, dura, dura/10, mdura, len(ary), " idx: ", kidx) - // time.Duration(len(ary)+1) - ticker := time.NewTicker(mdura) - done := make(chan bool) - idx := 0 - go func(i int) { - for { - select { - case <-ticker.C: - if i >= (len(ary)) { - done <- true - break - } - rand.Seed(time.Now().UnixNano()) - b := rand.Intn(2) - maxCandles = maxCandles * (i + b) * 2 - - if maxCandles < 3 { - maxCandles = 3 - } - if maxCandles > 30 { - maxCandles = 30 - } - mx := strconv.Itoa(maxCandles) - // fmt.Println("loop4 getCandlesWithRest, instId, period,limit,dura, t: ", instId, ary[i], mx, mdura) - go func(ii int) { - restQ := RestQueue{ - InstId: instId, - Bar: ary[ii], - Limit: mx, - Duration: mdura, - WithWs: true, - } - js, _ := json.Marshal(restQ) - core.RedisCli.LPush("restQueue", js) - }(i) - i++ - } - } - }(idx) - time.Sleep(dura - 10*time.Millisecond) - ticker.Stop() - // fmt.Println("loop4 Ticker stopped instId, dura: ", instId, dura, mdura) - done <- true - return nil -} - -// 当前的时间毫秒数 对于某个时间段,比如3分钟,10分钟,是否可以被整除, -func IsModOf(curInt int64, duration time.Duration) bool { - vol := int64(0) - if duration < 24*time.Hour { - // 小于1天 - vol = (curInt + 28800000) - } else if duration >= 24*time.Hour && duration < 48*time.Hour { - // 1天 - vol = curInt - 1633881600000 - } else if duration >= 48*time.Hour && duration < 72*time.Hour { - // 2天 - vol = curInt - 1633795200000 - } else if duration >= 72*time.Hour && duration < 120*time.Hour { - // 3天 - vol = curInt - 1633708800000 - } else if duration >= 120*time.Hour { - // 5天 - vol = curInt - 1633795200000 - } else { - // fmt.Println("noMatched:", curInt) - } - - mody := vol % duration.Milliseconds() - if mody == 0 { - return true - } - return false -} - -func (core *Core) SaveCandle(instId string, period string, rsp *CandleData, dura time.Duration, withWs bool) { - leng := len(rsp.Data) - for _, v := range rsp.Data { - candle := Candle{ - InstId: instId, - Period: period, - Data: v, - From: "rest", - } - //存到elasticSearch - candle.PushToWriteLogChan(core) - //保存rest得到的candle - // 发布到allCandles|publish, 给外部订阅者用于setToKey - arys := []string{ALLCANDLES_PUBLISH} - if withWs { - arys = append(arys, ALLCANDLES_INNER_PUBLISH) - } - // 如果candle都不需要存到redis,那么AddToGeneralCandleChnl也没有意义 - saveCandle := os.Getenv("TEXUS_SAVECANDLE") - if saveCandle == "true" { - candle.SetToKey(core) - core.AddToGeneralCandleChnl(&candle, arys) - time.Sleep(dura / time.Duration(leng)) - } - } -} - -func (candle *Candle) PushToWriteLogChan(cr *Core) error { - did := candle.InstId + candle.Period + candle.Data[0].(string) - candle.Id = HashString(did) - ncd, _ := candle.ToStruct(cr) - fmt.Println("ncd: ", ncd) - cd, _ := json.Marshal(ncd) - wg := WriteLog{ - Content: cd, - Tag: "sardine.log.candle." + candle.Period, - Id: candle.Id, - } - cr.WriteLogChan <- &wg - return nil -} - -func Daoxu(arr []interface{}) { - var temp interface{} - length := len(arr) - for i := 0; i < length/2; i++ { - temp = arr[i] - arr[i] = arr[length-1-i] - arr[length-1-i] = temp - } -} -func HashString(input string) string { - // 计算SHA-256哈希值 - hash := sha256.Sum256([]byte(input)) - // 转换为十六进制字符串 - hashHex := hex.EncodeToString(hash[:]) - // 返回前20位 - return hashHex[:23] -} - -func (cl *Candle) ToStruct(core *Core) (*Candle, error) { - // cl.Timestamp - ncd := Candle{} - ncd.Id = cl.Id - ncd.Period = cl.Period - ncd.InstId = cl.InstId - ncd.From = cl.From - - // 将字符串转换为 int64 类型的时间戳 - ts, err := strconv.ParseInt(cl.Data[0].(string), 10, 64) - if err != nil { - fmt.Println("Error parsing timestamp:", err) - return nil, err - } - ncd.Timestamp = time.Unix(ts/1000, (ts%1000)*1000000) // 纳秒级别 - op, err := strconv.ParseFloat(cl.Data[1].(string), 64) - if err != nil { - fmt.Println("Error parsing string to float64:", err) - return nil, err - } - ncd.Open = op - hi, err := strconv.ParseFloat(cl.Data[2].(string), 64) - if err != nil { - fmt.Println("Error parsing string to float64:", err) - return nil, err - } - ncd.High = hi - lo, err := strconv.ParseFloat(cl.Data[3].(string), 64) - if err != nil { - fmt.Println("Error parsing string to float64:", err) - return nil, err - } - ncd.Low = lo - clse, err := strconv.ParseFloat(cl.Data[4].(string), 64) - - if err != nil { - fmt.Println("Error parsing string to float64:", err) - return nil, err - } - ncd.Close = clse - ncd.VolCcy, err = strconv.ParseFloat(cl.Data[6].(string), 64) - if err != nil { - fmt.Println("Error parsing string to float64:", err) - return nil, err - } - if cl.Data[6].(string) == "1" { - ncd.Confirm = true - } else { - ncd.Confirm = false - } - return &ncd, nil -} - -func (mx *MaX) SetToKey() ([]interface{}, error) { - cstr := strconv.Itoa(mx.Count) - tss := strconv.FormatInt(mx.Ts, 10) - keyName := "ma" + cstr + "|candle" + mx.Period + "|" + mx.InstId + "|ts:" + tss - //过期时间:根号(当前candle的周期/1分钟)*10000 - dt := []interface{}{} - dt = append(dt, mx.Ts) - dt = append(dt, mx.Value) - dj, _ := json.Marshal(dt) - exp := mx.Core.PeriodToMinutes(mx.Period) - expf := utils.Sqrt(float64(exp)) * 100 - extt := time.Duration(expf) * time.Minute - // loc, _ := time.LoadLocation("Asia/Shanghai") - // tm := time.UnixMilli(mx.Ts).In(loc).Format("2006-01-02 15:04") - // fmt.Println("setToKey:", keyName, "ts:", tm, string(dj), "from: ", mx.From) - _, err := mx.Core.RedisCli.GetSet(keyName, dj).Result() - mx.Core.RedisCli.Expire(keyName, extt) - return dt, err -} - -// 保证同一个 period, keyName ,在一个周期里,SaveToSortSet只会被执行一次 -func (core *Core) SaveUniKey(period string, keyName string, extt time.Duration, tsi int64, cl *Candle) { - - refName := keyName + "|refer" - refRes, _ := core.RedisCli.GetSet(refName, 1).Result() - core.RedisCli.Expire(refName, extt) - // 为保证唯一性机制,防止SaveToSortSet 被重复执行 - if len(refRes) != 0 { - fmt.Println("refName exist: ", refName) - return - } - - core.SaveToSortSet(period, keyName, extt, tsi) -} - -// tsi: 上报时间timeStamp millinSecond -func (core *Core) SaveToSortSet(period string, keyName string, extt time.Duration, tsi int64) { - ary := strings.Split(keyName, "ts:") - setName := ary[0] + "sortedSet" - z := redis.Z{ - Score: float64(tsi), - Member: keyName, - } - rs, err := core.RedisCli.ZAdd(setName, z).Result() - if err != nil { - fmt.Println("err of ma7|ma30 add to redis:", err) - } else { - fmt.Println("sortedSet added to redis:", rs, keyName) - } -} - -func (cr *Core) PeriodToMinutes(period string) int64 { - ary := strings.Split(period, "") - beiStr := "1" - danwei := "" - if len(ary) == 3 { - beiStr = ary[0] + ary[1] - danwei = ary[2] - } else { - beiStr = ary[0] - danwei = ary[1] - } - cheng := 1 - bei, _ := strconv.Atoi(beiStr) - switch danwei { - case "m": - { - cheng = bei - break - } - case "H": - { - cheng = bei * 60 - break - } - case "D": - { - cheng = bei * 60 * 24 - break - } - case "W": - { - cheng = bei * 60 * 24 * 7 - break - } - case "M": - { - cheng = bei * 60 * 24 * 30 - break - } - case "Y": - { - cheng = bei * 60 * 24 * 365 - break - } - default: - { - fmt.Println("notmatch:", danwei) - } - } - return int64(cheng) -} - -// type ScanCmd struct { -// baseCmd -// -// page []string -// cursor uint64 -// -// process func(cmd Cmder) error -// } -func (core *Core) GetRangeKeyList(pattern string, from time.Time) ([]*simple.Json, error) { - // 比如,用来计算ma30或ma7,倒推多少时间范围, - redisCli := core.RedisCli - cursor := uint64(0) - n := 0 - allTs := []int64{} - var keys []string - for { - var err error - keys, cursor, _ = redisCli.Scan(cursor, pattern+"*", 2000).Result() - if err != nil { - panic(err) - } - n += len(keys) - if n == 0 { - break - } - } - - // keys, _ := redisCli.Keys(pattern + "*").Result() - for _, key := range keys { - keyAry := strings.Split(key, ":") - key = keyAry[1] - keyi64, _ := strconv.ParseInt(key, 10, 64) - allTs = append(allTs, keyi64) - } - nary := utils.RecursiveBubble(allTs, len(allTs)) - tt := from.UnixMilli() - ff := tt - tt%60000 - fi := int64(ff) - mary := []int64{} - for _, v := range nary { - if v < fi { - break - } - mary = append(mary, v) - } - res := []*simple.Json{} - for _, v := range mary { - // if k > 1 { - // break - // } - nv := pattern + strconv.FormatInt(v, 10) - str, err := redisCli.Get(nv).Result() - if err != nil { - fmt.Println("err of redis get key:", nv, err) - } - cur, err := simple.NewJson([]byte(str)) - if err != nil { - fmt.Println("err of create newJson:", str, err) - } - res = append(res, cur) - } - - return res, nil -} - -func (cl *Candle) SetToKey(core *Core) ([]interface{}, error) { - data := cl.Data - tsi, err := strconv.ParseInt(data[0].(string), 10, 64) - tss := strconv.FormatInt(tsi, 10) - keyName := "candle" + cl.Period + "|" + cl.InstId + "|ts:" + tss - //过期时间:根号(当前candle的周期/1分钟)*10000 - - dt, err := json.Marshal(cl.Data) - exp := core.PeriodToMinutes(cl.Period) - // expf := float64(exp) * 60 - expf := utils.Sqrt(float64(exp)) * 100 - extt := time.Duration(expf) * time.Minute - curVolstr, _ := data[5].(string) - curVol, err := strconv.ParseFloat(curVolstr, 64) - if err != nil { - fmt.Println("err of convert ts:", err) - } - curVolCcystr, _ := data[6].(string) - curVolCcy, err := strconv.ParseFloat(curVolCcystr, 64) - curPrice := curVolCcy / curVol - if curPrice <= 0 { - fmt.Println("price有问题", curPrice, "dt: ", string(dt), "from:", cl.From) - err = errors.New("price有问题") - return cl.Data, err - } - redisCli := core.RedisCli - // tm := time.UnixMilli(tsi).Format("2006-01-02 15:04") - fmt.Println("setToKey:", keyName, "ts: ", "price: ", curPrice, "from:", cl.From) - redisCli.Set(keyName, dt, extt).Result() - core.SaveUniKey(cl.Period, keyName, extt, tsi, cl) - return cl.Data, err -} diff --git a/core/config.go b/core/config.go deleted file mode 100644 index 62a7d4b..0000000 --- a/core/config.go +++ /dev/null @@ -1,122 +0,0 @@ -package core - -import ( - "fmt" - "io/ioutil" - "log" - "os" - "strings" - - simple "github.com/bitly/go-simplejson" -) - -type MyConfig struct { - Env string `json:"env", string` - Config *simple.Json - CandleDimentions []string `json:"candleDimentions"` - RedisConf *RedisConfig `json:"redis"` - CredentialReadOnlyConf *CredentialConfig `json:"credential"` - CredentialMutableConf *CredentialConfig `json:"credential"` - ConnectConf *ConnectConfig `json:"connect"` - // ThreadsConf *ThreadsConfig `json:"threads"` -} - -type RedisConfig struct { - Url string `json:"url", string` - Password string `json:"password", string` - Index int `json:"index", string` -} - -type CredentialConfig struct { - SecretKey string `json:"secretKey", string` - BaseUrl string `json:"baseUrl", string` - OkAccessKey string `json:"okAccessKey", string` - OkAccessPassphrase string `json:"okAccessPassphrase", string` -} - -type ConnectConfig struct { - LoginSubUrl string `json:"loginSubUrl", string` - WsPrivateBaseUrl string `json:"wsPrivateBaseUrl", string` - WsPublicBaseUrl string `json:"wsPublicBaseUrl", string` - RestBaseUrl string `json:"restBaseUrl", string` -} -type ThreadsConfig struct { - MaxLenTickerStream int `json:"maxLenTickerStream", int` - MaxCandles int `json:"maxCandles", string` - AsyncChannels int `json:"asyncChannels", int` - MaxTickers int `json:"maxTickers", int` - RestPeriod int `json:"restPeriod", int` - WaitWs int `json:"waitWs", int` -} - -func (cfg MyConfig) Init() (MyConfig, error) { - env := os.Getenv("GO_ENV") - arystr := os.Getenv("TUNAS_CANDLESDIMENTIONS") - ary := strings.Split(arystr, "|") - cfg.CandleDimentions = ary - jsonStr, err := ioutil.ReadFile("/go/json/basicConfig.json") - if err != nil { - jsonStr, err = ioutil.ReadFile("configs/basicConfig.json") - if err != nil { - fmt.Println("err2:", err.Error()) - return cfg, err - } - cfg.Config, err = simple.NewJson([]byte(jsonStr)) - if err != nil { - fmt.Println("err2:", err.Error()) - return cfg, err - } - cfg.Env = env - } - cfg.Config = cfg.Config.Get(env) - - ru, err := cfg.Config.Get("redis").Get("url").String() - rp, _ := cfg.Config.Get("redis").Get("password").String() - ri, _ := cfg.Config.Get("redis").Get("index").Int() - redisConf := RedisConfig{ - Url: ru, - Password: rp, - Index: ri, - } - // fmt.Println("cfg: ", cfg) - cfg.RedisConf = &redisConf - ls, _ := cfg.Config.Get("connect").Get("loginSubUrl").String() - wsPub, _ := cfg.Config.Get("connect").Get("wsPrivateBaseUrl").String() - wsPri, _ := cfg.Config.Get("connect").Get("wsPublicBaseUrl").String() - restBu, _ := cfg.Config.Get("connect").Get("restBaseUrl").String() - connectConfig := ConnectConfig{ - LoginSubUrl: ls, - WsPublicBaseUrl: wsPub, - WsPrivateBaseUrl: wsPri, - RestBaseUrl: restBu, - } - cfg.ConnectConf = &connectConfig - return cfg, nil -} - -func (cfg *MyConfig) GetConfigJson(arr []string) *simple.Json { - env := os.Getenv("GO_ENV") - fmt.Println("env: ", env) - cfg.Env = env - - json, err := ioutil.ReadFile("/go/json/basicConfig.json") - - if err != nil { - json, err = ioutil.ReadFile("configs/basicConfig.json") - if err != nil { - log.Panic("read config error: ", err.Error()) - } - } - if err != nil { - fmt.Println("read file err: ", err) - } - rjson, err := simple.NewJson(json) - if err != nil { - fmt.Println("newJson err: ", err) - } - for _, s := range arr { - rjson = rjson.Get(s) - // fmt.Println(s, ": ", rjson) - } - return rjson -} diff --git a/core/const.go b/core/const.go deleted file mode 100644 index 4612b45..0000000 --- a/core/const.go +++ /dev/null @@ -1,12 +0,0 @@ -package core - -const MAIN_ALLCOINS_PERIOD_MINUTES = 1 -const MAIN_ALLCOINS_ONCE_COUNTS = 3 -const MAIN_ALLCOINS_BAR_PERIOD = "3m" -const ALLCANDLES_PUBLISH = "allCandles|publish" -const ALLCANDLES_INNER_PUBLISH = "allCandlesInner|publish" -const ORDER_PUBLISH = "private|order|publish" -const TICKERINFO_PUBLISH = "tickerInfo|publish" -const CCYPOSISTIONS_PUBLISH = "ccyPositions|publish" -const SUBACTIONS_PUBLISH = "subActions|publish" -const ORDER_RESP_PUBLISH = "private|actionResp|publish" diff --git a/core/core.go b/core/core.go deleted file mode 100644 index 5c567bf..0000000 --- a/core/core.go +++ /dev/null @@ -1,532 +0,0 @@ -package core - -import ( - "context" - "encoding/json" - // "errors" - "fmt" - // "math/rand" - "io/ioutil" - "net/http" - "os" - "strconv" - "strings" - "sync" - "time" - "v5sdk_go/rest" - "v5sdk_go/ws" - // "v5sdk_go/ws/wImpl" - - simple "github.com/bitly/go-simplejson" - "github.com/go-redis/redis" - "github.com/phyer/texus/private" - "github.com/phyer/texus/utils" -) - -type Core struct { - Env string - Cfg *MyConfig - RedisCli *redis.Client - RedisCli2 *redis.Client - FluentBitUrl string - Wg sync.WaitGroup - RestQueueChan chan *RestQueue - OrderChan chan *private.Order - WriteLogChan chan *WriteLog -} -type RestQueue struct { - InstId string - Bar string - After int64 - Before int64 - Limit string - Duration time.Duration - WithWs bool -} -type CandleData struct { - Code string `json:"code"` - Msg string `json:"msg"` - Data [][]interface{} `json:"data"` // 用二维数组来接收 candles 数据 -} -type SubAction struct { - ActionName string - ForAll bool - MetaInfo map[string]interface{} -} - -func (rst *RestQueue) Show(cr *Core) { - fmt.Println("restQueue:", rst.InstId, rst.Bar, rst.Limit) -} - -func (rst *RestQueue) Save(cr *Core) { - afterSec := "" - if rst.After > 0 { - afterSec = fmt.Sprint("&after=", rst.After) - } - beforeSec := "" - if rst.Before > 0 { - beforeSec = fmt.Sprint("&before=", rst.Before) - } - limitSec := "" - if len(rst.Limit) > 0 { - limitSec = fmt.Sprint("&limit=", rst.Limit) - } - link := "/api/v5/market/candles?instId=" + rst.InstId + "&bar=" + rst.Bar + limitSec + afterSec + beforeSec - - fmt.Println("restLink: ", link) - rsp, err := cr.v5PublicInvoke(link) - if err != nil { - fmt.Println("cr.v5PublicInvoke err:", err) - } else { - fmt.Println("cr.v5PublicInvoke result count:", len(rsp.Data)) - } - cr.SaveCandle(rst.InstId, rst.Bar, rsp, rst.Duration, rst.WithWs) -} - -func WriteLogProcess(cr *Core) { - for { - wg := <-cr.WriteLogChan - go func(wg *WriteLog) { - fmt.Println("start writelog: " + wg.Tag + " " + wg.Id) - wg.Process(cr) - }(wg) - time.Sleep(50 * time.Millisecond) - } -} - -func (cr *Core) ShowSysTime() { - rsp, _ := cr.RestInvoke("/api/v5/public/time", rest.GET) - fmt.Println("serverSystem time:", rsp) -} - -func (core *Core) Init() { - core.Env = os.Getenv("GO_ENV") - gitBranch := os.Getenv("gitBranchName") - commitID := os.Getenv("gitCommitID") - - fmt.Println("当前环境: ", core.Env) - fmt.Println("gitBranch: ", gitBranch) - fmt.Println("gitCommitID: ", commitID) - cfg := MyConfig{} - cfg, _ = cfg.Init() - core.Cfg = &cfg - cli, err := core.GetRedisCli() - core.RedisCli = cli - core.RestQueueChan = make(chan *RestQueue) - core.WriteLogChan = make(chan *WriteLog) - core.OrderChan = make(chan *private.Order) - if err != nil { - fmt.Println("init redis client err: ", err) - } -} - -func (core *Core) GetRedisCli() (*redis.Client, error) { - ru := core.Cfg.RedisConf.Url - rp := core.Cfg.RedisConf.Password - ri := core.Cfg.RedisConf.Index - re := os.Getenv("REDIS_URL") - if len(re) > 0 { - ru = re - } - client := redis.NewClient(&redis.Options{ - Addr: ru, - Password: rp, //默认空密码 - DB: ri, //使用默认数据库 - }) - pong, err := client.Ping().Result() - if pong == "PONG" && err == nil { - return client, err - } else { - fmt.Println("redis状态不可用:", ru, rp, ri, err) - } - - return client, nil -} - -func (core *Core) GetAllTickerInfo() (*rest.RESTAPIResult, error) { - // GET / 获取所有产品行情信息 - rsp, err := core.RestInvoke("/api/v5/market/tickers?instType=SPOT", rest.GET) - return rsp, err -} - -func (core *Core) GetBalances() (*rest.RESTAPIResult, error) { - // TODO 临时用了两个实现,restInvoke,复用原来的会有bug,不知道是谁的bug - rsp, err := core.RestInvoke2("/api/v5/account/balance", rest.GET, nil) - return rsp, err -} -func (core *Core) GetLivingOrderList() ([]*private.Order, error) { - // TODO 临时用了两个实现,restInvoke,复用原来的会有bug,不知道是谁的bug - params := make(map[string]interface{}) - data, err := core.RestInvoke2("/api/v5/trade/orders-pending", rest.GET, ¶ms) - odrsp := private.OrderResp{} - err = json.Unmarshal([]byte(data.Body), &odrsp) - str, _ := json.Marshal(odrsp) - fmt.Println("convert: err:", err, " body: ", data.Body, odrsp, " string:", string(str)) - list, err := odrsp.Convert() - fmt.Println("loopLivingOrder response data:", str) - fmt.Println(utils.GetFuncName(), " 当前数据是 ", data.V5Response.Code, " list len:", len(list)) - return list, err -} -func (core *Core) LoopInstrumentList() error { - for { - time.Sleep(3 * time.Second) - ctype := ws.SPOT - - redisCli := core.RedisCli - counts, err := redisCli.HLen("instruments|" + ctype + "|hash").Result() - if err != nil { - fmt.Println("err of hset to redis:", err) - } - if counts == 0 { - continue - } - break - } - return nil -} -func (core *Core) SubscribeTicker(op string) error { - mp := make(map[string]string) - - redisCli := core.RedisCli - ctype := ws.SPOT - mp, err := redisCli.HGetAll("instruments|" + ctype + "|hash").Result() - b, err := json.Marshal(mp) - js, err := simple.NewJson(b) - if err != nil { - fmt.Println("err of unMarshalJson3:", js) - } - // fmt.Println("ticker js: ", js) - instAry := js.MustMap() - for k, v := range instAry { - b = []byte(v.(string)) - _, err := simple.NewJson(b) - if err != nil { - fmt.Println("err of unMarshalJson4:", js) - } - time.Sleep(5 * time.Second) - go func(instId string, op string) { - - redisCli := core.RedisCli - _, err = redisCli.SAdd("tickers|"+op+"|set", instId).Result() - if err != nil { - fmt.Println("err of unMarshalJson5:", js) - } - }(k, op) - } - return nil -} - -func (core *Core) v5PublicInvoke(subUrl string) (*CandleData, error) { - restUrl, _ := core.Cfg.Config.Get("connect").Get("restBaseUrl").String() - url := restUrl + subUrl - resp, err := http.Get(url) - if err != nil { - return nil, err - } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - var result CandleData - if err := json.Unmarshal(body, &result); err != nil { - return nil, err - } - - return &result, nil -} -func (core *Core) RestInvoke(subUrl string, method string) (*rest.RESTAPIResult, error) { - restUrl, _ := core.Cfg.Config.Get("connect").Get("restBaseUrl").String() - //ep, method, uri string, param *map[string]interface{} - rest := rest.NewRESTAPI(restUrl, method, subUrl, nil) - key, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String() - secure, _ := core.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String() - pass, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String() - isDemo := false - if core.Env == "demoEnv" { - isDemo = true - } - rest.SetSimulate(isDemo).SetAPIKey(key, secure, pass) - response, err := rest.Run(context.Background()) - if err != nil { - fmt.Println("restInvoke1 err:", subUrl, err) - } - return response, err -} - -func (core *Core) RestInvoke2(subUrl string, method string, param *map[string]interface{}) (*rest.RESTAPIResult, error) { - key, err1 := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String() - secret, err2 := core.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String() - pass, err3 := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String() - userId, err4 := core.Cfg.Config.Get("connect").Get("userId").String() - restUrl, err5 := core.Cfg.Config.Get("connect").Get("restBaseUrl").String() - if err1 != nil || err2 != nil || err3 != nil || err4 != nil || err5 != nil { - fmt.Println(err1, err2, err3, err4, err5) - } else { - fmt.Println("key:", key, secret, pass, "userId:", userId, "restUrl: ", restUrl) - } - // reqParam := make(map[string]interface{}) - // if param != nil { - // reqParam = *param - // } - // rs := rest.NewRESTAPI(restUrl, method, subUrl, &reqParam) - isDemo := false - if core.Env == "demoEnv" { - isDemo = true - } - // rs.SetSimulate(isDemo).SetAPIKey(key, secret, pass).SetUserId(userId) - // response, err := rs.Run(context.Background()) - // if err != nil { - // fmt.Println("restInvoke2 err:", subUrl, err) - // } - apikey := rest.APIKeyInfo{ - ApiKey: key, - SecKey: secret, - PassPhrase: pass, - } - cli := rest.NewRESTClient(restUrl, &apikey, isDemo) - rsp, err := cli.Get(context.Background(), subUrl, param) - if err != nil { - return rsp, err - } - fmt.Println("response:", rsp, err) - return rsp, err -} - -func (core *Core) RestPost(subUrl string, param *map[string]interface{}) (*rest.RESTAPIResult, error) { - key, err1 := core.Cfg.Config.Get("credentialMutable").Get("okAccessKey").String() - secret, err2 := core.Cfg.Config.Get("credentialMutable").Get("secretKey").String() - pass, err3 := core.Cfg.Config.Get("credentialMutable").Get("okAccessPassphrase").String() - userId, err4 := core.Cfg.Config.Get("connect").Get("userId").String() - restUrl, err5 := core.Cfg.Config.Get("connect").Get("restBaseUrl").String() - if err1 != nil || err2 != nil || err3 != nil || err4 != nil || err5 != nil { - fmt.Println(err1, err2, err3, err4, err5) - } else { - fmt.Println("key:", key, secret, pass, "userId:", userId, "restUrl: ", restUrl) - } - // 请求的另一种方式 - apikey := rest.APIKeyInfo{ - ApiKey: key, - SecKey: secret, - PassPhrase: pass, - } - isDemo := false - if core.Env == "demoEnv" { - isDemo = true - } - cli := rest.NewRESTClient(restUrl, &apikey, isDemo) - rsp, err := cli.Post(context.Background(), subUrl, param) - if err != nil { - return rsp, err - } - return rsp, err -} - -// 我当前持有的币,每分钟刷新 -func (core *Core) GetMyFavorList() []string { - redisCli := core.RedisCli - opt := redis.ZRangeBy{ - Min: "10", - Max: "100000000000", - } - cary, _ := redisCli.ZRevRangeByScore("private|positions|sortedSet", opt).Result() - cl := []string{} - for _, v := range cary { - cry := strings.Split(v, "|")[0] + "-USDT" - cl = append(cl, cry) - } - return cary -} - -// 得到交易量排行榜,订阅其中前N名的各维度k线,并merge进来我已经购买的币列表,这个列表是动态更新的 -// 改了,不需要交易排行榜,我手动指定一个排行即可, tickersVol|sortedSet 改成 tickersList|sortedSet -func (core *Core) GetScoreList(count int) []string { - - redisCli := core.RedisCli - - curList, err := redisCli.ZRange("tickersList|sortedSet", 0, int64(count-1)).Result() - if err != nil { - fmt.Println("zrevrange err:", err) - } - fmt.Println("curList: ", curList) - return curList -} - -func LoopBalances(cr *Core, mdura time.Duration) { - //协程:动态维护topScore - ticker := time.NewTicker(mdura) - for { - select { - case <-ticker.C: - //协程:循环执行rest请求candle - fmt.Println("loopBalance: receive ccyChannel start") - RestBalances(cr) - } - } -} -func LoopLivingOrders(cr *Core, mdura time.Duration) { - //协程:动态维护topScore - ticker := time.NewTicker(mdura) - for { - select { - case <-ticker.C: - //协程:循环执行rest请求candle - fmt.Println("loopLivingOrder: receive ccyChannel start") - RestLivingOrder(cr) - } - } -} - -func RestBalances(cr *Core) ([]*private.Ccy, error) { - // fmt.Println("restBalance loopBalance loop start") - ccyList := []*private.Ccy{} - rsp, err := cr.GetBalances() - if err != nil { - fmt.Println("loopBalance err00: ", err) - } - fmt.Println("loopBalance balance rsp: ", rsp) - if err != nil { - fmt.Println("loopBalance err01: ", err) - return ccyList, err - } - if len(rsp.Body) == 0 { - fmt.Println("loopBalance err03: rsp body is null") - return ccyList, err - } - js1, err := simple.NewJson([]byte(rsp.Body)) - if err != nil { - fmt.Println("loopBalance err1: ", err) - } - itemList := js1.Get("data").GetIndex(0).Get("details").MustArray() - // maxTickers是重点关注的topScore的coins的数量 - cli := cr.RedisCli - for _, v := range itemList { - js, _ := json.Marshal(v) - ccyResp := private.CcyResp{} - err := json.Unmarshal(js, &ccyResp) - if err != nil { - fmt.Println("loopBalance err2: ", err) - } - ccy, err := ccyResp.Convert() - ccyList = append(ccyList, ccy) - if err != nil { - fmt.Println("loopBalance err2: ", err) - } - z := redis.Z{ - Score: ccy.EqUsd, - Member: ccy.Ccy + "|position|key", - } - res, err := cli.ZAdd("private|positions|sortedSet", z).Result() - if err != nil { - fmt.Println("loopBalance err3: ", res, err) - } - res1, err := cli.Set(ccy.Ccy+"|position|key", js, 0).Result() - if err != nil { - fmt.Println("loopBalance err4: ", res1, err) - } - bjs, _ := json.Marshal(ccy) - tsi := time.Now().Unix() - tsii := tsi - tsi%600 - tss := strconv.FormatInt(tsii, 10) - cli.Set(CCYPOSISTIONS_PUBLISH+"|ts:"+tss, 1, 10*time.Minute).Result() - fmt.Println("ccy published: ", string(bjs)) - //TODO FIXME 50毫秒,每分钟上限是1200个订单,超过就无法遍历完成 - time.Sleep(50 * time.Millisecond) - suffix := "" - if cr.Env == "demoEnv" { - suffix = "-demoEnv" - } - // TODO FIXME cli2 - cli.Publish(CCYPOSISTIONS_PUBLISH+suffix, string(bjs)).Result() - } - return ccyList, nil -} - -func RestLivingOrder(cr *Core) ([]*private.Order, error) { - // fmt.Println("restOrder loopOrder loop start") - orderList := []*private.Order{} - list, err := cr.GetLivingOrderList() - if err != nil { - fmt.Println("loopLivingOrder err00: ", err) - } - fmt.Println("loopLivingOrder response:", list) - go func() { - for _, v := range list { - fmt.Println("order orderV:", v) - time.Sleep(30 * time.Millisecond) - cr.OrderChan <- v - } - }() - return orderList, nil -} - -func (cr *Core) ProcessOrder(od *private.Order) error { - // publish - go func() { - suffix := "" - if cr.Env == "demoEnv" { - suffix = "-demoEnv" - } - cn := ORDER_PUBLISH + suffix - bj, _ := json.Marshal(od) - - // TODO FIXME cli2 - res, _ := cr.RedisCli.Publish(cn, string(bj)).Result() - fmt.Println("order publish res: ", res, " content: ", string(bj)) - rsch := ORDER_RESP_PUBLISH + suffix - bj1, _ := json.Marshal(res) - - // TODO FIXME cli2 - res, _ = cr.RedisCli.Publish(rsch, string(bj1)).Result() - }() - return nil -} - -func (cr *Core) DispatchSubAction(action *SubAction) error { - go func() { - suffix := "" - if cr.Env == "demoEnv" { - suffix = "-demoEnv" - } - fmt.Println("action: ", action.ActionName, action.MetaInfo) - res, err := cr.RestPostWrapper("/api/v5/trade/"+action.ActionName, action.MetaInfo) - if err != nil { - fmt.Println(utils.GetFuncName(), " actionRes 1:", err) - } - rsch := ORDER_RESP_PUBLISH + suffix - bj1, _ := json.Marshal(res) - - // TODO FIXME cli2 - rs, _ := cr.RedisCli.Publish(rsch, string(bj1)).Result() - fmt.Println("action rs: ", rs) - }() - return nil -} - -func (cr *Core) RestPostWrapper(url string, param map[string]interface{}) (rest.Okexv5APIResponse, error) { - suffix := "" - if cr.Env == "demoEnv" { - suffix = "-demoEnv" - } - res, err := cr.RestPost(url, ¶m) - fmt.Println("actionRes 2:", res.V5Response.Msg, res.V5Response.Data, err) - bj, _ := json.Marshal(res) - - // TODO FIXME cli2 - cr.RedisCli.Publish(ORDER_RESP_PUBLISH+suffix, string(bj)) - return res.V5Response, nil -} - -func (cr *Core) AddToGeneralCandleChnl(candle *Candle, channels []string) { - redisCli := cr.RedisCli - ab, _ := json.Marshal(candle) - for _, v := range channels { - suffix := "" - env := os.Getenv("GO_ENV") - if env == "demoEnv" { - suffix = "-demoEnv" - } - vd := v + suffix - _, err := redisCli.Publish(vd, string(ab)).Result() - if err != nil { - fmt.Println("err of ma7|ma30 add to redis2:", err, candle.From) - } - } -} diff --git a/core/ticker.go b/core/ticker.go deleted file mode 100644 index 650b26d..0000000 --- a/core/ticker.go +++ /dev/null @@ -1,70 +0,0 @@ -package core - -import ( - "fmt" - "reflect" - "strconv" -) - -type TickerInfo struct { - Id string `json:"_id"` - InstId string `json:"instId"` - Last float64 `json:"last"` - InstType string `json:"instType"` - VolCcy24h float64 `json:"volCcy24h"` - Ts int64 `json:"ts"` -} - -type TickerInfoResp struct { - InstId string `json:"instId"` - Last string `json:"last"` - InstType string `json:"instType"` - VolCcy24h string `json:"volCcy24h"` - Ts string `json:"ts"` -} - -func (tir *TickerInfoResp) Convert() TickerInfo { - ti := TickerInfo{ - Id: HashString(tir.InstId + tir.Ts), - InstId: tir.InstId, - InstType: tir.InstType, - Last: ToFloat64(tir.Last), - VolCcy24h: ToFloat64(tir.VolCcy24h), - Ts: ToInt64(tir.Ts), - } - return ti -} - -func ToString(val interface{}) string { - valstr := "" - if reflect.TypeOf(val).Name() == "string" { - valstr = val.(string) - } else if reflect.TypeOf(val).Name() == "float64" { - valstr = fmt.Sprintf("%f", val) - } else if reflect.TypeOf(val).Name() == "int64" { - valstr = strconv.FormatInt(val.(int64), 16) - } - return valstr -} - -func ToInt64(val interface{}) int64 { - vali := int64(0) - if reflect.TypeOf(val).Name() == "string" { - vali, _ = strconv.ParseInt(val.(string), 10, 64) - } else if reflect.TypeOf(val).Name() == "float64" { - vali = int64(val.(float64)) - } - return vali -} - -func ToFloat64(val interface{}) float64 { - valf := float64(0) - if reflect.TypeOf(val).Name() == "string" { - valf, _ = strconv.ParseFloat(val.(string), 64) - } else if reflect.TypeOf(val).Name() == "float64" { - valf = val.(float64) - } else if reflect.TypeOf(val).Name() == "int64" { - valf = float64(val.(int64)) - } - return valf -} diff --git a/core/writeLog.go b/core/writeLog.go deleted file mode 100644 index e16719d..0000000 --- a/core/writeLog.go +++ /dev/null @@ -1,32 +0,0 @@ -package core - -import ( - "bytes" - "fmt" - "net/http" - "os" - - logrus "github.com/sirupsen/logrus" -) - -type WriteLog struct { - Content []byte - Tag string - Id string -} - -func (wg *WriteLog) Process(cr *Core) error { - go func() { - reqBody := bytes.NewBuffer(wg.Content) - cr.Env = os.Getenv("GO_ENV") - cr.FluentBitUrl = os.Getenv("TEXUS_FluentBitUrl") - fullUrl := "http://" + cr.FluentBitUrl + "/" + wg.Tag - res, err := http.Post(fullUrl, "application/json", reqBody) - - fmt.Println("requested, response:", fullUrl, string(wg.Content), res) - if err != nil { - logrus.Error(err) - } - }() - return nil -} diff --git a/go.mod b/go.mod index 36435fa..15d573f 100644 --- a/go.mod +++ b/go.mod @@ -11,20 +11,21 @@ go 1.21 require ( github.com/bitly/go-simplejson v0.5.0 - github.com/go-redis/redis v6.15.9+incompatible - github.com/sirupsen/logrus v1.9.3 + github.com/phyer/core v0.0.0-20241212012123-7c451a49d860 v5sdk_go/rest v0.0.0-00010101000000-000000000000 - v5sdk_go/ws v0.0.0-00010101000000-000000000000 ) require ( github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect + github.com/go-redis/redis v6.15.9+incompatible // indirect github.com/gorilla/websocket v1.5.1 // indirect github.com/kr/pretty v0.3.0 // indirect github.com/onsi/ginkgo v1.16.4 // indirect github.com/onsi/gomega v1.16.0 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/sys v0.13.0 // indirect v5sdk_go/config v0.0.0-00010101000000-000000000000 // indirect v5sdk_go/utils v0.0.0-00010101000000-000000000000 // indirect + v5sdk_go/ws v0.0.0-00010101000000-000000000000 // indirect ) diff --git a/go.sum b/go.sum index 74c559e..91e8549 100644 --- a/go.sum +++ b/go.sum @@ -46,6 +46,8 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c= github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= +github.com/phyer/core v0.0.0-20241212012123-7c451a49d860 h1:BZvZ7g/dPudXwCB9QNq9vPZb2xDbfTPWHVqyDGXSNBk= +github.com/phyer/core v0.0.0-20241212012123-7c451a49d860/go.mod h1:LyfJrdqSlm2MTOx0M3pnDntpwa64XD5nf0xYxvZ4El4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= diff --git a/main.go b/main.go index 4b779f3..b9b4a96 100644 --- a/main.go +++ b/main.go @@ -13,7 +13,7 @@ import ( simple "github.com/bitly/go-simplejson" // "github.com/go-redis/redis" - "github.com/phyer/texus/core" + "github.com/phyer/core" // "github.com/phyer/texus/private" "github.com/phyer/texus/utils" ) diff --git a/texus b/texus deleted file mode 100755 index f684b96..0000000 Binary files a/texus and /dev/null differ