This commit is contained in:
zhangkun 2024-12-15 00:17:59 +08:00
parent 7a6893be16
commit 47bd02f138
5 changed files with 856 additions and 380 deletions

View File

@ -15,6 +15,7 @@ import (
simple "github.com/bitly/go-simplejson" simple "github.com/bitly/go-simplejson"
"github.com/go-redis/redis" "github.com/go-redis/redis"
"github.com/phyer/texus/utils" "github.com/phyer/texus/utils"
logrus "github.com/sirupsen/logrus"
) )
type Candle struct { type Candle struct {
@ -32,32 +33,28 @@ type Candle struct {
VolCcy float64 VolCcy float64
Confirm bool Confirm bool
} }
type Sample interface {
type MaX struct { SetToKey(cr *Core) ([]interface{}, error)
InstID string `json:"instID"`
Period string `json:"period"`
KeyName string `json:"keyName"`
Data []interface{} `json:"data"`
Count int `json:"count,number"`
Ts int64 `json:"ts,number"`
AvgVal float64 `json:"avgVal,number"`
From string `json:"from,string"`
} }
type SampleList interface {
// 从左边插入一个元素,把超过长度的元素顶出去
RPush(sp Sample) (Sample, error)
// 得到一个切片end一般是0代表末尾元素start是负值-3代表倒数第三个
// start-10, end: -3 代表从倒数第10个到倒数第三个之间的元素组成的切片。
GetSectionOf(start int, end int) ([]*Sample, error)
}
type CandleList struct { type CandleList struct {
Count int `json:"count,number"` Count int `json:"count,number"`
LastUpdateTime int64 `json:"lastUpdateTime"` LastUpdateTime int64 `json:"lastUpdateTime"`
UpdateNickName string `json:"updateNickName"` UpdateNickName string `json:"updateNickName"`
List []*Candle `json:"list"` List []*Candle `json:"list"`
} }
type CandleSegment struct {
type MaXList struct { StartTime string `json:"startTime"`
Count int `json:"count"` Enabled bool `json:"enabled,bool"`
LastUpdateTime int64 `json:"lastUpdateTime"` Seg string `json:"seg"`
UpdateNickName string `json:"updateNickName"`
List []*MaX `json:"list"`
} }
type MatchCheck struct { type MatchCheck struct {
Minutes int64 Minutes int64
Matched bool Matched bool
@ -136,7 +133,7 @@ func (core *Core) GetCandlesWithRest(instId string, kidx int, dura time.Duration
WithWs: true, WithWs: true,
} }
js, _ := json.Marshal(restQ) js, _ := json.Marshal(restQ)
core.RedisCli.LPush("restQueue", js) core.RedisLocalCli.LPush("restQueue", js)
}(i) }(i)
i++ i++
} }
@ -291,32 +288,12 @@ func (cl *Candle) ToStruct(core *Core) (*Candle, error) {
return &ncd, nil return &ncd, nil
} }
func (mx *MaX) SetToKey() ([]interface{}, error) {
cstr := strconv.Itoa(mx.Count)
tss := strconv.FormatInt(mx.Ts, 10)
keyName := "ma" + cstr + "|candle" + mx.Period + "|" + mx.InstId + "|ts:" + tss
//过期时间:根号(当前candle的周期/1分钟)*10000
dt := []interface{}{}
dt = append(dt, mx.Ts)
dt = append(dt, mx.Value)
dj, _ := json.Marshal(dt)
exp := mx.Core.PeriodToMinutes(mx.Period)
expf := utils.Sqrt(float64(exp)) * 100
extt := time.Duration(expf) * time.Minute
// loc, _ := time.LoadLocation("Asia/Shanghai")
// tm := time.UnixMilli(mx.Ts).In(loc).Format("2006-01-02 15:04")
// fmt.Println("setToKey:", keyName, "ts:", tm, string(dj), "from: ", mx.From)
_, err := mx.Core.RedisCli.GetSet(keyName, dj).Result()
mx.Core.RedisCli.Expire(keyName, extt)
return dt, err
}
// 保证同一个 period, keyName 在一个周期里SaveToSortSet只会被执行一次 // 保证同一个 period, keyName 在一个周期里SaveToSortSet只会被执行一次
func (core *Core) SaveUniKey(period string, keyName string, extt time.Duration, tsi int64, cl *Candle) { func (core *Core) SaveUniKey(period string, keyName string, extt time.Duration, tsi int64) {
refName := keyName + "|refer" refName := keyName + "|refer"
refRes, _ := core.RedisCli.GetSet(refName, 1).Result() refRes, _ := core.RedisLocalCli.GetSet(refName, 1).Result()
core.RedisCli.Expire(refName, extt) core.RedisLocalCli.Expire(refName, extt)
// 为保证唯一性机制防止SaveToSortSet 被重复执行 // 为保证唯一性机制防止SaveToSortSet 被重复执行
if len(refRes) != 0 { if len(refRes) != 0 {
fmt.Println("refName exist: ", refName) fmt.Println("refName exist: ", refName)
@ -334,7 +311,7 @@ func (core *Core) SaveToSortSet(period string, keyName string, extt time.Duratio
Score: float64(tsi), Score: float64(tsi),
Member: keyName, Member: keyName,
} }
rs, err := core.RedisCli.ZAdd(setName, z).Result() rs, err := core.RedisLocalCli.ZAdd(setName, z).Result()
if err != nil { if err != nil {
fmt.Println("err of ma7|ma30 add to redis:", err) fmt.Println("err of ma7|ma30 add to redis:", err)
} else { } else {
@ -342,10 +319,15 @@ func (core *Core) SaveToSortSet(period string, keyName string, extt time.Duratio
} }
} }
func (cr *Core) PeriodToMinutes(period string) int64 { // 根据周期的文本内容,返回这代表多少个分钟
func (cr *Core) PeriodToMinutes(period string) (int64, error) {
ary := strings.Split(period, "") ary := strings.Split(period, "")
beiStr := "1" beiStr := "1"
danwei := "" danwei := ""
if len(ary) == 0 {
err := errors.New(utils.GetFuncName() + " period is block")
return 0, err
}
if len(ary) == 3 { if len(ary) == 3 {
beiStr = ary[0] + ary[1] beiStr = ary[0] + ary[1]
danwei = ary[2] danwei = ary[2]
@ -388,10 +370,11 @@ func (cr *Core) PeriodToMinutes(period string) int64 {
} }
default: default:
{ {
fmt.Println("notmatch:", danwei) logrus.Warning("notmatch:", danwei, period)
panic("notmatch:" + period)
} }
} }
return int64(cheng) return int64(cheng), nil
} }
// type ScanCmd struct { // type ScanCmd struct {
@ -404,7 +387,7 @@ func (cr *Core) PeriodToMinutes(period string) int64 {
// } // }
func (core *Core) GetRangeKeyList(pattern string, from time.Time) ([]*simple.Json, error) { func (core *Core) GetRangeKeyList(pattern string, from time.Time) ([]*simple.Json, error) {
// 比如用来计算ma30或ma7倒推多少时间范围 // 比如用来计算ma30或ma7倒推多少时间范围
redisCli := core.RedisCli redisCli := core.RedisLocalCli
cursor := uint64(0) cursor := uint64(0)
n := 0 n := 0
allTs := []int64{} allTs := []int64{}
@ -467,7 +450,10 @@ func (cl *Candle) SetToKey(core *Core) ([]interface{}, error) {
//过期时间:根号(当前candle的周期/1分钟)*10000 //过期时间:根号(当前candle的周期/1分钟)*10000
dt, err := json.Marshal(cl.Data) dt, err := json.Marshal(cl.Data)
exp := core.PeriodToMinutes(cl.Period) exp, err := core.PeriodToMinutes(cl.Period)
if err != nil {
fmt.Println("err of PeriodToMinutes:", err)
}
// expf := float64(exp) * 60 // expf := float64(exp) * 60
expf := utils.Sqrt(float64(exp)) * 100 expf := utils.Sqrt(float64(exp)) * 100
extt := time.Duration(expf) * time.Minute extt := time.Duration(expf) * time.Minute
@ -484,10 +470,10 @@ func (cl *Candle) SetToKey(core *Core) ([]interface{}, error) {
err = errors.New("price有问题") err = errors.New("price有问题")
return cl.Data, err return cl.Data, err
} }
redisCli := core.RedisCli redisCli := core.RedisLocalCli
// tm := time.UnixMilli(tsi).Format("2006-01-02 15:04") // tm := time.UnixMilli(tsi).Format("2006-01-02 15:04")
fmt.Println("setToKey:", keyName, "ts: ", "price: ", curPrice, "from:", cl.From) fmt.Println("setToKey:", keyName, "ts: ", "price: ", curPrice, "from:", cl.From)
redisCli.Set(keyName, dt, extt).Result() redisCli.Set(keyName, dt, extt).Result()
core.SaveUniKey(cl.Period, keyName, extt, tsi, cl) core.SaveUniKey(cl.Period, keyName, extt, tsi)
return cl.Data, err return cl.Data, err
} }

View File

@ -11,42 +11,42 @@ import (
) )
type MyConfig struct { type MyConfig struct {
Env string `json:"env", string` Env string `json:"env"`
Config *simple.Json Config *simple.Json
CandleDimentions []string `json:"candleDimentions"` CandleDimentions []string `json:"candleDimentions"`
RedisConf *RedisConfig `json:"redis"` RedisConf *RedisConfig `json:"redis"`
CredentialReadOnlyConf *CredentialConfig `json:"credential"` CredentialReadOnlyConf *CredentialConfig `json:"credential"`
CredentialMutableConf *CredentialConfig `json:"credential"` CredentialMutableConf *CredentialConfig `json:"credentialMutable"`
ConnectConf *ConnectConfig `json:"connect"` ConnectConf *ConnectConfig `json:"connect"`
// ThreadsConf *ThreadsConfig `json:"threads"` // ThreadsConf *ThreadsConfig `json:"threads"`
} }
type RedisConfig struct { type RedisConfig struct {
Url string `json:"url", string` Url string `json:"url"`
Password string `json:"password", string` Password string `json:"password"`
Index int `json:"index", string` Index int `json:"index"`
} }
type CredentialConfig struct { type CredentialConfig struct {
SecretKey string `json:"secretKey", string` SecretKey string `json:"secretKey"`
BaseUrl string `json:"baseUrl", string` BaseUrl string `json:"baseUrl"`
OkAccessKey string `json:"okAccessKey", string` OkAccessKey string `json:"okAccessKey"`
OkAccessPassphrase string `json:"okAccessPassphrase", string` OkAccessPassphrase string `json:"okAccessPassphrase"`
} }
type ConnectConfig struct { type ConnectConfig struct {
LoginSubUrl string `json:"loginSubUrl", string` LoginSubUrl string `json:"loginSubUrl"`
WsPrivateBaseUrl string `json:"wsPrivateBaseUrl", string` WsPrivateBaseUrl string `json:"wsPrivateBaseUrl"`
WsPublicBaseUrl string `json:"wsPublicBaseUrl", string` WsPublicBaseUrl string `json:"wsPublicBaseUrl"`
RestBaseUrl string `json:"restBaseUrl", string` RestBaseUrl string `json:"restBaseUrl"`
} }
type ThreadsConfig struct { type ThreadsConfig struct {
MaxLenTickerStream int `json:"maxLenTickerStream", int` MaxLenTickerStream int `json:"maxLenTickerStream"`
MaxCandles int `json:"maxCandles", string` MaxCandles int `json:"maxCandles"`
AsyncChannels int `json:"asyncChannels", int` AsyncChannels int `json:"asyncChannels"`
MaxTickers int `json:"maxTickers", int` MaxTickers int `json:"maxTickers"`
RestPeriod int `json:"restPeriod", int` RestPeriod int `json:"restPeriod"`
WaitWs int `json:"waitWs", int` WaitWs int `json:"waitWs"`
} }
func (cfg MyConfig) Init() (MyConfig, error) { func (cfg MyConfig) Init() (MyConfig, error) {

773
core.go
View File

@ -1,9 +1,9 @@
package core package core
import ( import (
"context" // "context"
"encoding/json" "encoding/json"
// "errors" "errors"
"fmt" "fmt"
// "math/rand" // "math/rand"
"io/ioutil" "io/ioutil"
@ -14,18 +14,18 @@ import (
"sync" "sync"
"time" "time"
simple "github.com/bitly/go-simplejson" // simple "github.com/bitly/go-simplejson"
// "v5sdk_go/ws/wImpl" // "v5sdk_go/ws/wImpl"
"github.com/go-redis/redis" "github.com/go-redis/redis"
"github.com/phyer/texus/private" "github.com/phyer/texus/private"
"github.com/phyer/texus/utils" // "github.com/phyer/texus/utils"
logrus "github.com/sirupsen/logrus"
) )
type Core struct { type Core struct {
Env string Env string
Cfg *MyConfig Cfg *MyConfig
RedisLocalCli *redis.Client RedisLocalCli *redis.Client
RedisLocal2Cli *redis.Client
RedisRemoteCli *redis.Client RedisRemoteCli *redis.Client
FluentBitUrl string FluentBitUrl string
// PlateMap map[string]*Plate // PlateMap map[string]*Plate
@ -43,6 +43,8 @@ type Core struct {
MakeMaXsChan chan *Candle MakeMaXsChan chan *Candle
// ShearForceGrpChan chan *ShearForceGrp // ShearForceGrpChan chan *ShearForceGrp
InvokeRestQueueChan chan *RestQueue InvokeRestQueueChan chan *RestQueue
RedisLocal2Cli *redis.Client
RestQueueChan chan *RestQueue
RestQueue RestQueue
WriteLogChan chan *WriteLog WriteLogChan chan *WriteLog
} }
@ -56,6 +58,7 @@ type RestQueue struct {
Duration time.Duration Duration time.Duration
WithWs bool WithWs bool
} }
type CandleData struct { type CandleData struct {
Code string `json:"code"` Code string `json:"code"`
Msg string `json:"msg"` Msg string `json:"msg"`
@ -107,10 +110,10 @@ func WriteLogProcess(cr *Core) {
} }
} }
func (cr *Core) ShowSysTime() { // func (cr *Core) ShowSysTime() {
rsp, _ := cr.RestInvoke("/api/v5/public/time", rest.GET) // rsp, _ := cr.RestInvoke("/api/v5/public/time", rest.GET)
fmt.Println("serverSystem time:", rsp) // fmt.Println("serverSystem time:", rsp)
} // }
func (core *Core) Init() { func (core *Core) Init() {
core.Env = os.Getenv("GO_ENV") core.Env = os.Getenv("GO_ENV")
@ -123,16 +126,17 @@ func (core *Core) Init() {
cfg := MyConfig{} cfg := MyConfig{}
cfg, _ = cfg.Init() cfg, _ = cfg.Init()
core.Cfg = &cfg core.Cfg = &cfg
cli, err := core.GetRedisCli() cli, err := core.GetRedisLocalCli()
core.RedisCli = cli core.RedisLocalCli = cli
core.RestQueueChan = make(chan *RestQueue) core.RestQueueChan = make(chan *RestQueue)
core.WriteLogChan = make(chan *WriteLog) core.WriteLogChan = make(chan *WriteLog)
core.OrderChan = make(chan *private.Order) // 跟订单有关的都关掉
// core.OrderChan = make(chan *private.Order)
if err != nil { if err != nil {
fmt.Println("init redis client err: ", err) fmt.Println("init redis client err: ", err)
} }
} }
func (core *Core) GetRemoteRedisCli() (*redis.Client, error) { func (core *Core) GetRemoteRedisLocalCli() (*redis.Client, error) {
ru := core.Cfg.RedisConf.Url ru := core.Cfg.RedisConf.Url
rp := core.Cfg.RedisConf.Password rp := core.Cfg.RedisConf.Password
ri := core.Cfg.RedisConf.Index ri := core.Cfg.RedisConf.Index
@ -154,7 +158,7 @@ func (core *Core) GetRemoteRedisCli() (*redis.Client, error) {
return client, nil return client, nil
} }
func (core *Core) GetRedisCli() (*redis.Client, error) { func (core *Core) GetRedisLocalCli() (*redis.Client, error) {
ru := core.Cfg.RedisConf.Url ru := core.Cfg.RedisConf.Url
rp := core.Cfg.RedisConf.Password rp := core.Cfg.RedisConf.Password
ri := core.Cfg.RedisConf.Index ri := core.Cfg.RedisConf.Index
@ -177,80 +181,85 @@ func (core *Core) GetRedisCli() (*redis.Client, error) {
return client, nil return client, nil
} }
func (core *Core) GetAllTickerInfo() (*rest.RESTAPIResult, error) { // 这些应该是放到 texus 里实现的
// GET / 获取所有产品行情信息 // func (core *Core) GetAllTickerInfo() (*rest.RESTAPIResult, error) {
rsp, err := core.RestInvoke("/api/v5/market/tickers?instType=SPOT", rest.GET) // // GET / 获取所有产品行情信息
return rsp, err // rsp, err := core.RestInvoke("/api/v5/market/tickers?instType=SPOT", rest.GET)
} // return rsp, err
// }
func (core *Core) GetBalances() (*rest.RESTAPIResult, error) { // 这些跟 订单有关,都关掉
// TODO 临时用了两个实现restInvoke复用原来的会有bug不知道是谁的bug //
rsp, err := core.RestInvoke2("/api/v5/account/balance", rest.GET, nil) // func (core *Core) GetBalances() (*rest.RESTAPIResult, error) {
return rsp, err // // TODO 临时用了两个实现restInvoke复用原来的会有bug不知道是谁的bug
} // rsp, err := core.RestInvoke2("/api/v5/account/balance", rest.GET, nil)
func (core *Core) GetLivingOrderList() ([]*private.Order, error) { // return rsp, err
// TODO 临时用了两个实现restInvoke复用原来的会有bug不知道是谁的bug // }
params := make(map[string]interface{}) //
data, err := core.RestInvoke2("/api/v5/trade/orders-pending", rest.GET, &params) // func (core *Core) GetLivingOrderList() ([]*private.Order, error) {
odrsp := private.OrderResp{} // // TODO 临时用了两个实现restInvoke复用原来的会有bug不知道是谁的bug
err = json.Unmarshal([]byte(data.Body), &odrsp) // params := make(map[string]interface{})
str, _ := json.Marshal(odrsp) // data, err := core.RestInvoke2("/api/v5/trade/orders-pending", rest.GET, &params)
fmt.Println("convert: err:", err, " body: ", data.Body, odrsp, " string:", string(str)) // odrsp := private.OrderResp{}
list, err := odrsp.Convert() // err = json.Unmarshal([]byte(data.Body), &odrsp)
fmt.Println("loopLivingOrder response data:", str) // str, _ := json.Marshal(odrsp)
fmt.Println(utils.GetFuncName(), " 当前数据是 ", data.V5Response.Code, " list len:", len(list)) // fmt.Println("convert: err:", err, " body: ", data.Body, odrsp, " string:", string(str))
return list, err // list, err := odrsp.Convert()
} // fmt.Println("loopLivingOrder response data:", str)
func (core *Core) LoopInstrumentList() error { // fmt.Println(utils.GetFuncName(), " 当前数据是 ", data.V5Response.Code, " list len:", len(list))
for { // return list, err
time.Sleep(3 * time.Second) // }
ctype := ws.SPOT // func (core *Core) LoopInstrumentList() error {
// for {
redisCli := core.RedisCli // time.Sleep(3 * time.Second)
counts, err := redisCli.HLen("instruments|" + ctype + "|hash").Result() // ctype := ws.SPOT
if err != nil { //
fmt.Println("err of hset to redis:", err) // redisCli := core.RedisLocalCli
} // counts, err := redisCli.HLen("instruments|" + ctype + "|hash").Result()
if counts == 0 { // if err != nil {
continue // fmt.Println("err of hset to redis:", err)
} // }
break // if counts == 0 {
} // continue
return nil // }
} // break
func (core *Core) SubscribeTicker(op string) error { // }
mp := make(map[string]string) // return nil
// }
redisCli := core.RedisCli // func (core *Core) SubscribeTicker(op string) error {
ctype := ws.SPOT // mp := make(map[string]string)
mp, err := redisCli.HGetAll("instruments|" + ctype + "|hash").Result() //
b, err := json.Marshal(mp) // redisCli := core.RedisLocalCli
js, err := simple.NewJson(b) // ctype := ws.SPOT
if err != nil { // mp, err := redisCli.HGetAll("instruments|" + ctype + "|hash").Result()
fmt.Println("err of unMarshalJson3:", js) // b, err := json.Marshal(mp)
} // js, err := simple.NewJson(b)
// fmt.Println("ticker js: ", js) // if err != nil {
instAry := js.MustMap() // fmt.Println("err of unMarshalJson3:", js)
for k, v := range instAry { // }
b = []byte(v.(string)) // // fmt.Println("ticker js: ", js)
_, err := simple.NewJson(b) // instAry := js.MustMap()
if err != nil { // for k, v := range instAry {
fmt.Println("err of unMarshalJson4:", js) // b = []byte(v.(string))
} // _, err := simple.NewJson(b)
time.Sleep(5 * time.Second) // if err != nil {
go func(instId string, op string) { // fmt.Println("err of unMarshalJson4:", js)
// }
redisCli := core.RedisCli // time.Sleep(5 * time.Second)
_, err = redisCli.SAdd("tickers|"+op+"|set", instId).Result() // go func(instId string, op string) {
if err != nil { //
fmt.Println("err of unMarshalJson5:", js) // redisCli := core.RedisLocalCli
} // _, err = redisCli.SAdd("tickers|"+op+"|set", instId).Result()
}(k, op) // if err != nil {
} // fmt.Println("err of unMarshalJson5:", js)
return nil // }
} // }(k, op)
// }
// return nil
// }
// 通过接口获取一个币种名下的某个时间范围内的Candle对象集合 // 通过接口获取一个币种名下的某个时间范围内的Candle对象集合
// 按说这个应该放到 texus里实现
func (core *Core) v5PublicInvoke(subUrl string) (*CandleData, error) { func (core *Core) v5PublicInvoke(subUrl string) (*CandleData, error) {
restUrl, _ := core.Cfg.Config.Get("connect").Get("restBaseUrl").String() restUrl, _ := core.Cfg.Config.Get("connect").Get("restBaseUrl").String()
url := restUrl + subUrl url := restUrl + subUrl
@ -267,96 +276,98 @@ func (core *Core) v5PublicInvoke(subUrl string) (*CandleData, error) {
return &result, nil return &result, nil
} }
func (core *Core) RestInvoke(subUrl string, method string) (*rest.RESTAPIResult, error) {
restUrl, _ := core.Cfg.Config.Get("connect").Get("restBaseUrl").String()
//ep, method, uri string, param *map[string]interface{}
rest := rest.NewRESTAPI(restUrl, method, subUrl, nil)
key, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String()
secure, _ := core.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String()
pass, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String()
isDemo := false
if core.Env == "demoEnv" {
isDemo = true
}
rest.SetSimulate(isDemo).SetAPIKey(key, secure, pass)
response, err := rest.Run(context.Background())
if err != nil {
fmt.Println("restInvoke1 err:", subUrl, err)
}
return response, err
}
func (core *Core) RestInvoke2(subUrl string, method string, param *map[string]interface{}) (*rest.RESTAPIResult, error) { // func (core *Core) RestInvoke(subUrl string, method string) (*rest.RESTAPIResult, error) {
key, err1 := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String() // restUrl, _ := core.Cfg.Config.Get("connect").Get("restBaseUrl").String()
secret, err2 := core.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String() // //ep, method, uri string, param *map[string]interface{}
pass, err3 := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String() // rest := rest.NewRESTAPI(restUrl, method, subUrl, nil)
userId, err4 := core.Cfg.Config.Get("connect").Get("userId").String() // key, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String()
restUrl, err5 := core.Cfg.Config.Get("connect").Get("restBaseUrl").String() // secure, _ := core.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String()
if err1 != nil || err2 != nil || err3 != nil || err4 != nil || err5 != nil { // pass, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String()
fmt.Println(err1, err2, err3, err4, err5) // isDemo := false
} else { // if core.Env == "demoEnv" {
fmt.Println("key:", key, secret, pass, "userId:", userId, "restUrl: ", restUrl) // isDemo = true
} // }
// reqParam := make(map[string]interface{}) // rest.SetSimulate(isDemo).SetAPIKey(key, secure, pass)
// if param != nil { // response, err := rest.Run(context.Background())
// reqParam = *param // if err != nil {
// } // fmt.Println("restInvoke1 err:", subUrl, err)
// rs := rest.NewRESTAPI(restUrl, method, subUrl, &reqParam) // }
isDemo := false // return response, err
if core.Env == "demoEnv" { // }
isDemo = true
}
// rs.SetSimulate(isDemo).SetAPIKey(key, secret, pass).SetUserId(userId)
// response, err := rs.Run(context.Background())
// if err != nil {
// fmt.Println("restInvoke2 err:", subUrl, err)
// }
apikey := rest.APIKeyInfo{
ApiKey: key,
SecKey: secret,
PassPhrase: pass,
}
cli := rest.NewRESTClient(restUrl, &apikey, isDemo)
rsp, err := cli.Get(context.Background(), subUrl, param)
if err != nil {
return rsp, err
}
fmt.Println("response:", rsp, err)
return rsp, err
}
func (core *Core) RestPost(subUrl string, param *map[string]interface{}) (*rest.RESTAPIResult, error) { // func (core *Core) RestInvoke2(subUrl string, method string, param *map[string]interface{}) (*rest.RESTAPIResult, error) {
key, err1 := core.Cfg.Config.Get("credentialMutable").Get("okAccessKey").String() // key, err1 := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String()
secret, err2 := core.Cfg.Config.Get("credentialMutable").Get("secretKey").String() // secret, err2 := core.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String()
pass, err3 := core.Cfg.Config.Get("credentialMutable").Get("okAccessPassphrase").String() // pass, err3 := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String()
userId, err4 := core.Cfg.Config.Get("connect").Get("userId").String() // userId, err4 := core.Cfg.Config.Get("connect").Get("userId").String()
restUrl, err5 := core.Cfg.Config.Get("connect").Get("restBaseUrl").String() // restUrl, err5 := core.Cfg.Config.Get("connect").Get("restBaseUrl").String()
if err1 != nil || err2 != nil || err3 != nil || err4 != nil || err5 != nil { // if err1 != nil || err2 != nil || err3 != nil || err4 != nil || err5 != nil {
fmt.Println(err1, err2, err3, err4, err5) // fmt.Println(err1, err2, err3, err4, err5)
} else { // } else {
fmt.Println("key:", key, secret, pass, "userId:", userId, "restUrl: ", restUrl) // fmt.Println("key:", key, secret, pass, "userId:", userId, "restUrl: ", restUrl)
} // }
// 请求的另一种方式 // // reqParam := make(map[string]interface{})
apikey := rest.APIKeyInfo{ // // if param != nil {
ApiKey: key, // // reqParam = *param
SecKey: secret, // // }
PassPhrase: pass, // // rs := rest.NewRESTAPI(restUrl, method, subUrl, &reqParam)
} // isDemo := false
isDemo := false // if core.Env == "demoEnv" {
if core.Env == "demoEnv" { // isDemo = true
isDemo = true // }
} // // rs.SetSimulate(isDemo).SetAPIKey(key, secret, pass).SetUserId(userId)
cli := rest.NewRESTClient(restUrl, &apikey, isDemo) // // response, err := rs.Run(context.Background())
rsp, err := cli.Post(context.Background(), subUrl, param) // // if err != nil {
if err != nil { // // fmt.Println("restInvoke2 err:", subUrl, err)
return rsp, err // // }
} // apikey := rest.APIKeyInfo{
return rsp, err // ApiKey: key,
} // SecKey: secret,
// PassPhrase: pass,
// }
// cli := rest.NewRESTClient(restUrl, &apikey, isDemo)
// rsp, err := cli.Get(context.Background(), subUrl, param)
// if err != nil {
// return rsp, err
// }
// fmt.Println("response:", rsp, err)
// return rsp, err
// }
// 跟下单有关的都关掉,以后再说
// func (core *Core) RestPost(subUrl string, param *map[string]interface{}) (*rest.RESTAPIResult, error) {
// key, err1 := core.Cfg.Config.Get("credentialMutable").Get("okAccessKey").String()
// secret, err2 := core.Cfg.Config.Get("credentialMutable").Get("secretKey").String()
// pass, err3 := core.Cfg.Config.Get("credentialMutable").Get("okAccessPassphrase").String()
// userId, err4 := core.Cfg.Config.Get("connect").Get("userId").String()
// restUrl, err5 := core.Cfg.Config.Get("connect").Get("restBaseUrl").String()
// if err1 != nil || err2 != nil || err3 != nil || err4 != nil || err5 != nil {
// fmt.Println(err1, err2, err3, err4, err5)
// } else {
// fmt.Println("key:", key, secret, pass, "userId:", userId, "restUrl: ", restUrl)
// }
// // 请求的另一种方式
// apikey := rest.APIKeyInfo{
// ApiKey: key,
// SecKey: secret,
// PassPhrase: pass,
// }
// isDemo := false
// if core.Env == "demoEnv" {
// isDemo = true
// }
// cli := rest.NewRESTClient(restUrl, &apikey, isDemo)
// rsp, err := cli.Post(context.Background(), subUrl, param)
// if err != nil {
// return rsp, err
// }
// return rsp, err
// }
// 我当前持有的币,每分钟刷新 // 我当前持有的币,每分钟刷新
func (core *Core) GetMyFavorList() []string { func (core *Core) GetMyFavorList() []string {
redisCli := core.RedisCli redisCli := core.RedisLocalCli
opt := redis.ZRangeBy{ opt := redis.ZRangeBy{
Min: "10", Min: "10",
Max: "100000000000", Max: "100000000000",
@ -374,7 +385,7 @@ func (core *Core) GetMyFavorList() []string {
// 改了,不需要交易排行榜,我手动指定一个排行即可, tickersVol|sortedSet 改成 tickersList|sortedSet // 改了,不需要交易排行榜,我手动指定一个排行即可, tickersVol|sortedSet 改成 tickersList|sortedSet
func (core *Core) GetScoreList(count int) []string { func (core *Core) GetScoreList(count int) []string {
redisCli := core.RedisCli redisCli := core.RedisLocalCli
curList, err := redisCli.ZRange("tickersList|sortedSet", 0, int64(count-1)).Result() curList, err := redisCli.ZRange("tickersList|sortedSet", 0, int64(count-1)).Result()
if err != nil { if err != nil {
@ -384,113 +395,116 @@ func (core *Core) GetScoreList(count int) []string {
return curList return curList
} }
func LoopBalances(cr *Core, mdura time.Duration) { // 跟订单有关,关掉
//协程动态维护topScore // func LoopBalances(cr *Core, mdura time.Duration) {
ticker := time.NewTicker(mdura) // //协程动态维护topScore
for { // ticker := time.NewTicker(mdura)
select { // for {
case <-ticker.C: // select {
//协程循环执行rest请求candle // case <-ticker.C:
fmt.Println("loopBalance: receive ccyChannel start") // //协程循环执行rest请求candle
RestBalances(cr) // fmt.Println("loopBalance: receive ccyChannel start")
} // RestBalances(cr)
} // }
} // }
func LoopLivingOrders(cr *Core, mdura time.Duration) { // }
//协程动态维护topScore
ticker := time.NewTicker(mdura)
for {
select {
case <-ticker.C:
//协程循环执行rest请求candle
fmt.Println("loopLivingOrder: receive ccyChannel start")
RestLivingOrder(cr)
}
}
}
func RestBalances(cr *Core) ([]*private.Ccy, error) { // func LoopLivingOrders(cr *Core, mdura time.Duration) {
// fmt.Println("restBalance loopBalance loop start") // //协程动态维护topScore
ccyList := []*private.Ccy{} // ticker := time.NewTicker(mdura)
rsp, err := cr.GetBalances() // for {
if err != nil { // select {
fmt.Println("loopBalance err00: ", err) // case <-ticker.C:
} // //协程循环执行rest请求candle
fmt.Println("loopBalance balance rsp: ", rsp) // fmt.Println("loopLivingOrder: receive ccyChannel start")
if err != nil { // RestLivingOrder(cr)
fmt.Println("loopBalance err01: ", err) // }
return ccyList, err // }
} // }
if len(rsp.Body) == 0 {
fmt.Println("loopBalance err03: rsp body is null")
return ccyList, err
}
js1, err := simple.NewJson([]byte(rsp.Body))
if err != nil {
fmt.Println("loopBalance err1: ", err)
}
itemList := js1.Get("data").GetIndex(0).Get("details").MustArray()
// maxTickers是重点关注的topScore的coins的数量
cli := cr.RedisCli
for _, v := range itemList {
js, _ := json.Marshal(v)
ccyResp := private.CcyResp{}
err := json.Unmarshal(js, &ccyResp)
if err != nil {
fmt.Println("loopBalance err2: ", err)
}
ccy, err := ccyResp.Convert()
ccyList = append(ccyList, ccy)
if err != nil {
fmt.Println("loopBalance err2: ", err)
}
z := redis.Z{
Score: ccy.EqUsd,
Member: ccy.Ccy + "|position|key",
}
res, err := cli.ZAdd("private|positions|sortedSet", z).Result()
if err != nil {
fmt.Println("loopBalance err3: ", res, err)
}
res1, err := cli.Set(ccy.Ccy+"|position|key", js, 0).Result()
if err != nil {
fmt.Println("loopBalance err4: ", res1, err)
}
bjs, _ := json.Marshal(ccy)
tsi := time.Now().Unix()
tsii := tsi - tsi%600
tss := strconv.FormatInt(tsii, 10)
cli.Set(CCYPOSISTIONS_PUBLISH+"|ts:"+tss, 1, 10*time.Minute).Result()
fmt.Println("ccy published: ", string(bjs))
//TODO FIXME 50毫秒每分钟上限是1200个订单超过就无法遍历完成
time.Sleep(50 * time.Millisecond)
suffix := ""
if cr.Env == "demoEnv" {
suffix = "-demoEnv"
}
// TODO FIXME cli2
cli.Publish(CCYPOSISTIONS_PUBLISH+suffix, string(bjs)).Result()
}
return ccyList, nil
}
func RestLivingOrder(cr *Core) ([]*private.Order, error) { // 跟订单有关,关掉
// fmt.Println("restOrder loopOrder loop start") // func RestBalances(cr *Core) ([]*private.Ccy, error) {
orderList := []*private.Order{} // // fmt.Println("restBalance loopBalance loop start")
list, err := cr.GetLivingOrderList() // ccyList := []*private.Ccy{}
if err != nil { // rsp, err := cr.GetBalances()
fmt.Println("loopLivingOrder err00: ", err) // if err != nil {
} // fmt.Println("loopBalance err00: ", err)
fmt.Println("loopLivingOrder response:", list) // }
go func() { // fmt.Println("loopBalance balance rsp: ", rsp)
for _, v := range list { // if err != nil {
fmt.Println("order orderV:", v) // fmt.Println("loopBalance err01: ", err)
time.Sleep(30 * time.Millisecond) // return ccyList, err
cr.OrderChan <- v // }
} // if len(rsp.Body) == 0 {
}() // fmt.Println("loopBalance err03: rsp body is null")
return orderList, nil // return ccyList, err
} // }
// js1, err := simple.NewJson([]byte(rsp.Body))
// if err != nil {
// fmt.Println("loopBalance err1: ", err)
// }
// itemList := js1.Get("data").GetIndex(0).Get("details").MustArray()
// // maxTickers是重点关注的topScore的coins的数量
// cli := cr.RedisLocalCli
// for _, v := range itemList {
// js, _ := json.Marshal(v)
// ccyResp := private.CcyResp{}
// err := json.Unmarshal(js, &ccyResp)
// if err != nil {
// fmt.Println("loopBalance err2: ", err)
// }
// ccy, err := ccyResp.Convert()
// ccyList = append(ccyList, ccy)
// if err != nil {
// fmt.Println("loopBalance err2: ", err)
// }
// z := redis.Z{
// Score: ccy.EqUsd,
// Member: ccy.Ccy + "|position|key",
// }
// res, err := cli.ZAdd("private|positions|sortedSet", z).Result()
// if err != nil {
// fmt.Println("loopBalance err3: ", res, err)
// }
// res1, err := cli.Set(ccy.Ccy+"|position|key", js, 0).Result()
// if err != nil {
// fmt.Println("loopBalance err4: ", res1, err)
// }
// bjs, _ := json.Marshal(ccy)
// tsi := time.Now().Unix()
// tsii := tsi - tsi%600
// tss := strconv.FormatInt(tsii, 10)
// cli.Set(CCYPOSISTIONS_PUBLISH+"|ts:"+tss, 1, 10*time.Minute).Result()
// fmt.Println("ccy published: ", string(bjs))
// //TODO FIXME 50毫秒每分钟上限是1200个订单超过就无法遍历完成
// time.Sleep(50 * time.Millisecond)
// suffix := ""
// if cr.Env == "demoEnv" {
// suffix = "-demoEnv"
// }
// // TODO FIXME cli2
// cli.Publish(CCYPOSISTIONS_PUBLISH+suffix, string(bjs)).Result()
// }
// return ccyList, nil
// }
// func RestLivingOrder(cr *Core) ([]*private.Order, error) {
// // fmt.Println("restOrder loopOrder loop start")
// orderList := []*private.Order{}
// list, err := cr.GetLivingOrderList()
// if err != nil {
// fmt.Println("loopLivingOrder err00: ", err)
// }
// fmt.Println("loopLivingOrder response:", list)
// go func() {
// for _, v := range list {
// fmt.Println("order orderV:", v)
// time.Sleep(30 * time.Millisecond)
// cr.OrderChan <- v
// }
// }()
// return orderList, nil
// }
func (cr *Core) ProcessOrder(od *private.Order) error { func (cr *Core) ProcessOrder(od *private.Order) error {
// publish // publish
@ -503,54 +517,55 @@ func (cr *Core) ProcessOrder(od *private.Order) error {
bj, _ := json.Marshal(od) bj, _ := json.Marshal(od)
// TODO FIXME cli2 // TODO FIXME cli2
res, _ := cr.RedisCli.Publish(cn, string(bj)).Result() res, _ := cr.RedisLocalCli.Publish(cn, string(bj)).Result()
fmt.Println("order publish res: ", res, " content: ", string(bj)) fmt.Println("order publish res: ", res, " content: ", string(bj))
rsch := ORDER_RESP_PUBLISH + suffix rsch := ORDER_RESP_PUBLISH + suffix
bj1, _ := json.Marshal(res) bj1, _ := json.Marshal(res)
// TODO FIXME cli2 // TODO FIXME cli2
res, _ = cr.RedisCli.Publish(rsch, string(bj1)).Result() res, _ = cr.RedisLocalCli.Publish(rsch, string(bj1)).Result()
}() }()
return nil return nil
} }
func (cr *Core) DispatchSubAction(action *SubAction) error { // 跟订单有关,关掉
go func() { // func (cr *Core) DispatchSubAction(action *SubAction) error {
suffix := "" // go func() {
if cr.Env == "demoEnv" { // suffix := ""
suffix = "-demoEnv" // if cr.Env == "demoEnv" {
} // suffix = "-demoEnv"
fmt.Println("action: ", action.ActionName, action.MetaInfo) // }
res, err := cr.RestPostWrapper("/api/v5/trade/"+action.ActionName, action.MetaInfo) // fmt.Println("action: ", action.ActionName, action.MetaInfo)
if err != nil { // res, err := cr.RestPostWrapper("/api/v5/trade/"+action.ActionName, action.MetaInfo)
fmt.Println(utils.GetFuncName(), " actionRes 1:", err) // if err != nil {
} // fmt.Println(utils.GetFuncName(), " actionRes 1:", err)
rsch := ORDER_RESP_PUBLISH + suffix // }
bj1, _ := json.Marshal(res) // rsch := ORDER_RESP_PUBLISH + suffix
// bj1, _ := json.Marshal(res)
//
// // TODO FIXME cli2
// rs, _ := cr.RedisLocalCli.Publish(rsch, string(bj1)).Result()
// fmt.Println("action rs: ", rs)
// }()
// return nil
// }
// TODO FIXME cli2 // func (cr *Core) RestPostWrapper(url string, param map[string]interface{}) (rest.Okexv5APIResponse, error) {
rs, _ := cr.RedisCli.Publish(rsch, string(bj1)).Result() // suffix := ""
fmt.Println("action rs: ", rs) // if cr.Env == "demoEnv" {
}() // suffix = "-demoEnv"
return nil // }
} // res, err := cr.RestPost(url, &param)
// fmt.Println("actionRes 2:", res.V5Response.Msg, res.V5Response.Data, err)
func (cr *Core) RestPostWrapper(url string, param map[string]interface{}) (rest.Okexv5APIResponse, error) { // bj, _ := json.Marshal(res)
suffix := "" //
if cr.Env == "demoEnv" { // // TODO FIXME cli2
suffix = "-demoEnv" // cr.RedisLocalCli.Publish(ORDER_RESP_PUBLISH+suffix, string(bj))
} // return res.V5Response, nil
res, err := cr.RestPost(url, &param) // }
fmt.Println("actionRes 2:", res.V5Response.Msg, res.V5Response.Data, err)
bj, _ := json.Marshal(res)
// TODO FIXME cli2
cr.RedisCli.Publish(ORDER_RESP_PUBLISH+suffix, string(bj))
return res.V5Response, nil
}
func (cr *Core) AddToGeneralCandleChnl(candle *Candle, channels []string) { func (cr *Core) AddToGeneralCandleChnl(candle *Candle, channels []string) {
redisCli := cr.RedisCli redisCli := cr.RedisLocalCli
ab, _ := json.Marshal(candle) ab, _ := json.Marshal(candle)
for _, v := range channels { for _, v := range channels {
suffix := "" suffix := ""
@ -565,3 +580,137 @@ func (cr *Core) AddToGeneralCandleChnl(candle *Candle, channels []string) {
} }
} }
} }
// setName := "candle" + period + "|" + instId + "|sortedSet"
// count: 倒推多少个周期开始拿数据
// from: 倒推的起始时间点
// ctype: candle或者maX
func (core *Core) GetRangeMaXSortedSet(setName string, count int, from time.Time) (*MaXList, error) {
mxl := MaXList{}
ary1 := strings.Split(setName, "|")
ary2 := []string{}
period := ""
ary2 = strings.Split(ary1[1], "candle")
period = ary2[1]
dui, err := core.PeriodToMinutes(period)
if err != nil {
return &mxl, err
}
fromt := from.UnixMilli()
froms := strconv.FormatInt(fromt, 10)
sti := fromt - dui*int64(count)*60*1000
sts := strconv.FormatInt(sti, 10)
opt := redis.ZRangeBy{
Min: sts,
Max: froms,
Count: int64(count),
}
ary := []string{}
logrus.Debug("ZRevRangeByScore ", setName, froms, sts)
dura, err := core.GetExpiration(period)
if err != nil {
return &mxl, err
}
// fmt.Println("GetExpiration dura: ", dura, " period: ", period)
ot := time.Now().Add(dura * -1)
oti := ot.UnixMilli()
// fmt.Println(fmt.Sprint("GetExpiration zRemRangeByScore ", setName, " ", 0, " ", strconv.FormatInt(oti, 10)))
cli := core.RedisLocalCli
cli.LTrim(setName, 0, oti)
cunt, _ := cli.ZRemRangeByScore(setName, "0", strconv.FormatInt(oti, 10)).Result()
if cunt > 0 {
logrus.Warning("移出过期的引用数量:", "setName:", setName, " count:", count)
}
ary, err = cli.ZRevRangeByScore(setName, opt).Result()
if err != nil {
logrus.Warning("GetRangeMaXSortedSet ZRevRangeByScore err: ", " setName: ", setName, ", opt:", opt, ", res:", ary, ", err:", err)
return &mxl, err
}
keyAry, err := cli.MGet(ary...).Result()
if err != nil {
logrus.Warning("GetRangeMaXSortedSet mget err: ", "setName:", setName, ", max:", opt.Max, ", min:", opt.Min, ", res:", keyAry, ", err:", err)
return &mxl, err
}
for _, str := range keyAry {
if str == nil {
continue
}
mx := MaX{}
json.Unmarshal([]byte(str.(string)), &mx)
curData := mx.Data
if len(curData) == 0 {
logrus.Warning("curData is null", str)
continue
}
ts := curData[0].(float64)
tm := time.UnixMilli(int64(ts))
if tm.Sub(from) > 0 {
break
}
mxl.List = append(mxl.List, &mx)
}
mxl.Count = count
return &mxl, nil
}
// 根据周期的文本内容,返回这代表多少个分钟
func (cr *Core) GetExpiration(per string) (time.Duration, error) {
if len(per) == 0 {
erstr := fmt.Sprint("period没有设置")
logrus.Warn(erstr)
err := errors.New(erstr)
return 0, err
}
exp, err := cr.PeriodToMinutes(per)
dur := time.Duration(exp*49) * time.Minute
return dur, err
}
func (cr *Core) PeriodToLastTime(period string, from time.Time) (time.Time, error) {
candles := cr.Cfg.Config.Get("softCandleSegmentList").MustArray()
minutes := int64(0)
tms := ""
for _, v := range candles {
cs := CandleSegment{}
sv, _ := json.Marshal(v)
json.Unmarshal(sv, &cs)
if cs.Seg == period {
mns, err := cr.PeriodToMinutes(period)
if err != nil {
continue
}
minutes = mns
tms = cs.StartTime
} else {
}
}
tm, err := time.ParseInLocation("2006-01-02 15:04.000", tms, time.Local)
if err != nil {
return from, err
}
// from.Unix() % minutes
frmi := from.UnixMilli()
frmi = frmi - (frmi)%(60*1000)
tmi := tm.UnixMilli()
tmi = tmi - (tmi)%(60*1000)
oms := int64(0)
for i := int64(0); true; i++ {
if tmi+i*minutes*60*1000 > frmi {
break
} else {
oms = tmi + i*minutes*60*1000
continue
}
}
// return from, nil
// tm :=
om := time.UnixMilli(oms)
// fmt.Println("PeriodToLastTime: period: ", period, " lastTime:", om.Format("2006-01-02 15:04:05.000"))
return om, nil
}

133
maX.go Normal file
View File

@ -0,0 +1,133 @@
package core
import (
"encoding/json"
"errors"
"fmt"
logrus "github.com/sirupsen/logrus"
// "os"
"strconv"
"time"
)
type MaXList struct {
Count int `json:"count"`
LastUpdateTime int64 `json:"lastUpdateTime"`
UpdateNickName string `json:"updateNickName"`
List []*MaX `json:"list"`
}
type MaX struct {
InstID string `json:"instID"`
Period string `json:"period"`
KeyName string `json:"keyName"`
Data []interface{} `json:"data"`
Count int `json:"count,number"`
Ts int64 `json:"ts,number"`
AvgVal float64 `json:"avgVal,number"`
From string `json:"from,string"`
}
type WillMX struct {
KeyName string
Count int
}
func (mx MaX) SetToKey(cr *Core) ([]interface{}, error) {
// fmt.Println(utils.GetFuncName(), " step1 ", mx.InstID, " ", mx.Period)
cstr := strconv.Itoa(mx.Count)
tss := strconv.FormatInt(mx.Ts, 10)
//校验时间戳是否合法
ntm, err := cr.PeriodToLastTime(mx.Period, time.UnixMilli(mx.Ts))
if ntm.UnixMilli() != mx.Ts {
logrus.Warn(fmt.Sprint(GetFuncName(), " candles时间戳有问题 ", " 应该: ", ntm, "实际:", mx.Ts))
mx.Ts = ntm.UnixMilli()
}
keyName := "ma" + cstr + "|candle" + mx.Period + "|" + mx.InstID + "|ts:" + tss
//过期时间:根号(当前candle的周期/1分钟)*10000
dj, _ := json.Marshal(mx)
extt, err := cr.GetExpiration(mx.Period)
if err != nil {
fmt.Println("max SetToKey err: ", err)
return mx.Data, err
}
// fmt.Println(utils.GetFuncName(), " step2 ", mx.InstID, " ", mx.Period)
// tm := time.UnixMilli(mx.Ts).Format("01-02 15:04")
cli := cr.RedisLocalCli
if len(string(dj)) == 0 {
fmt.Println("mx data is block data: ", mx, string(dj))
err := errors.New("data is block")
return mx.Data, err
}
// fmt.Println(utils.GetFuncName(), " step3 ", mx.InstID, " ", mx.Period)
_, err = cli.Set(keyName, dj, extt).Result()
if err != nil {
fmt.Println(GetFuncName(), " maXSetToKey err:", err)
return mx.Data, err
}
// fmt.Println(utils.GetFuncName(), " step4 ", mx.InstID, " ", mx.Period)
// fmt.Println("max setToKey: ", keyName, "res:", res, "data:", string(dj), "from: ", mx.From)
cr.SaveUniKey(mx.Period, keyName, extt, mx.Ts)
return mx.Data, err
}
// TODO
// 返回:
// Sample被顶出队列的元素
func (mxl *MaXList) RPush(sm *MaX) (Sample, error) {
last := MaX{}
bj, _ := json.Marshal(*sm)
json.Unmarshal(bj, &sm)
tsi := sm.Data[0]
matched := false
for k, v := range mxl.List {
if v.Data[0] == tsi {
matched = true
mxl.List[k] = sm
}
}
if matched {
return nil, nil
}
if len(mxl.List) >= mxl.Count && len(mxl.List) > 1 {
last = *mxl.List[0]
mxl.List = mxl.List[1:]
mxl.List = append(mxl.List, sm)
return last, nil
} else {
mxl.List = append(mxl.List, sm)
return nil, nil
}
return nil, nil
}
// 冒泡排序
func (mxl *MaXList) RecursiveBubbleS(length int, ctype string) error {
if length == 0 {
return nil
}
realLength := len(mxl.List)
//FIXME在对这个List进行排序时List中途长度变了就会报错
// Jan 17 02:40:39 miracle ubuntu[25239]: panic: runtime error: index out of range [23] with length 23
for idx, _ := range mxl.List {
if idx >= length-1 || idx > realLength-1 {
break
}
temp := MaX{}
pre, _ := mxl.List[idx].Data[0].(float64)
nex, _ := mxl.List[idx+1].Data[0].(float64)
daoxu := pre < nex
if ctype == "asc" {
daoxu = !daoxu
}
if daoxu { //改变成>,换成从小到大排序
temp = *mxl.List[idx]
mxl.List[idx] = mxl.List[idx+1]
mxl.List[idx+1] = &temp
}
}
length--
mxl.RecursiveBubbleS(length, ctype)
return nil
}

208
util.go Normal file
View File

@ -0,0 +1,208 @@
package core
import (
"crypto/md5"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"math"
"math/rand"
"runtime"
"strconv"
"time"
)
func NextPeriod(curPeriod string, idx int) string {
list := []string{
"1m",
"3m",
"5m",
"15m",
"30m",
"1H",
"2H",
"4H",
"6H",
"12H",
"1D",
"2D",
"5D",
}
nextPer := ""
for i := 0; i < len(list)-1; i++ {
if list[i] == curPeriod {
nextPer = list[i+idx]
}
}
return nextPer
}
func OtherPeriod(curPeriod string, cha int) string {
list := []string{
"1m",
"3m",
"5m",
"15m",
"30m",
"1H",
"2H",
"4H",
"6H",
"12H",
"1D",
"2D",
"5D",
}
nextPer := ""
for i := 0; i < len(list)-1; i++ {
if list[i] == curPeriod {
if i+cha > 0 && i+cha < len(list)-1 {
nextPer = list[i+cha]
}
}
}
return nextPer
}
func ShaiziInt(n int) int {
rand.Seed(time.Now().UnixNano())
b := rand.Intn(n)
return b
}
func Difference(slice1 []string, slice2 []string) []string {
var diff []string
// Loop two times, first to find slice1 strings not in slice2,
// second loop to find slice2 strings not in slice1
for i := 0; i < 2; i++ {
for _, s1 := range slice1 {
found := false
for _, s2 := range slice2 {
if s1 == s2 {
found = true
break
}
}
// String not found. We add it to return slice
if !found {
diff = append(diff, s1)
}
}
// Swap the slices, only if it was the first loop
if i == 0 {
slice1, slice2 = slice2, slice1
}
}
return diff
}
func JsonToMap(str string) (map[string]interface{}, error) {
var tempMap map[string]interface{}
err := json.Unmarshal([]byte(str), &tempMap)
if err != nil {
fmt.Println("Unmarshal err: ", err, str)
}
return tempMap, err
}
func Sqrt(x float64) float64 {
z := 1.0
for math.Abs(z*z-x) > 0.000001 {
z -= (z*z - x) / (2 * z)
}
return z
}
func RecursiveBubble(ary []int64, length int) []int64 {
if length == 0 {
return ary
}
for idx, _ := range ary {
if idx >= length-1 {
break
}
temp := int64(0)
if ary[idx] < ary[idx+1] { //改变成>,换成从小到大排序
temp = ary[idx]
ary[idx] = ary[idx+1]
ary[idx+1] = temp
}
}
length--
RecursiveBubble(ary, length)
return ary
}
func Decimal(value float64) string {
va := fmt.Sprintf("%.5f", value)
v1, _ := strconv.ParseFloat(va, 32)
v1 = v1 * 100
s1 := fmt.Sprintf("%.3f", v1)
return s1 + "%"
}
func ShortNum(value float64) string {
va := fmt.Sprintf("%.5f", value)
v1, _ := strconv.ParseFloat(va, 32)
v1 = v1 * 100
s1 := fmt.Sprintf("%.3f", v1)
return s1
}
func GetRandomString(l int) string {
str := "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
bytes := []byte(str)
result := []byte{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < l; i++ {
result = append(result, bytes[r.Intn(len(bytes))])
}
return string(result)
}
func Shaizi(n int) bool {
rand.Seed(time.Now().UnixNano())
b := rand.Intn(n)
if b == 1 {
return true
}
return false
}
func Sishewuru(x float64, digit int) (number float64, err error) {
baseDigit := 10
if digit < 0 {
return x, errors.New("错误的精度不能小于0")
} else {
baseDigit = pow(baseDigit, digit+1)
}
betweenNumber := float64(5)
return (math.Floor((x*float64(baseDigit) + betweenNumber) / 10)) / float64(baseDigit/10), err
}
func pow(x, n int) int {
ret := 1 // 结果初始为0次方的值整数0次方为1。如果是矩阵则为单元矩阵。
for n != 0 {
if n%2 != 0 {
ret = ret * x
}
n /= 2
x = x * x
}
return ret
}
// 获取当前函数名字
func GetFuncName() string {
pc := make([]uintptr, 1)
runtime.Callers(2, pc)
f := runtime.FuncForPC(pc[0])
return f.Name()
}
func Md5V(str string) string {
h := md5.New()
h.Write([]byte(str))
return hex.EncodeToString(h.Sum(nil))
}