commit d9714d59b999d4596beee9e463b3ab890b0d4a9b Author: zhangkun Date: Sat Dec 14 19:09:06 2024 +0800 first add diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..22d0d82 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +vendor diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..35781f8 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 wang + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/config/conf.go b/config/conf.go new file mode 100644 index 0000000..c1c20b5 --- /dev/null +++ b/config/conf.go @@ -0,0 +1,32 @@ +package config + +import "fmt" + +type Env struct { + RestEndpoint string `yaml:"RestEndpoint"` + WsEndpoint string `yaml:"WsEndpoint"` + IsSimulation bool `yaml:"IsSimulation"` +} + +type ApiInfo struct { + ApiKey string `yaml:"ApiKey"` + SecretKey string `yaml:"SecretKey"` + Passphrase string `yaml:"Passphrase"` +} + +type MetaData struct { + Description string `yaml:"Description"` +} + +type Config struct { + MetaData `yaml:"MetaData"` + Env `yaml:"Env"` + ApiInfo `yaml:"ApiInfo"` +} + +func (s *ApiInfo) String() string { + res := "ApiInfo{" + res += fmt.Sprintf("ApiKey:%v,SecretKey:%v,Passphrase:%v", s.ApiKey, s.SecretKey, s.Passphrase) + res += "}" + return res +} diff --git a/config/go.mod b/config/go.mod new file mode 100644 index 0000000..bf70c70 --- /dev/null +++ b/config/go.mod @@ -0,0 +1,3 @@ +module v5sdk_go/config + +go 1.14 diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..cd83b72 --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module v5sdk_go + +go 1.15 + +require ( + github.com/gorilla/websocket v1.4.2 + github.com/stretchr/testify v1.7.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..8eb6d1d --- /dev/null +++ b/go.sum @@ -0,0 +1,12 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..5781aa8 --- /dev/null +++ b/main.go @@ -0,0 +1,237 @@ +package main + +import ( + "context" + "fmt" + "log" + "time" + . "v5sdk_go/rest" + . "v5sdk_go/ws" +) + +/* + rest API请求 + 更多示例请查看 rest/rest_test.go +*/ +func REST() { + // 设置您的APIKey + apikey := APIKeyInfo{ + ApiKey: "eca767d4-fda5-4a1b-bb28-49ae18093307", + SecKey: "8CA3628A556ED137977DB298D37BC7F3", + PassPhrase: "Op3Druaron", + } + + // 第三个参数代表是否为模拟环境,更多信息查看接口说明 + cli := NewRESTClient("https://www.okex.win", &apikey, false) + rsp, err := cli.Get(context.Background(), "/api/v5/account/balance", nil) + if err != nil { + return + } + + fmt.Println("Response:") + fmt.Println("\thttp code: ", rsp.Code) + fmt.Println("\t总耗时: ", rsp.TotalUsedTime) + fmt.Println("\t请求耗时: ", rsp.ReqUsedTime) + fmt.Println("\t返回消息: ", rsp.Body) + fmt.Println("\terrCode: ", rsp.V5Response.Code) + fmt.Println("\terrMsg: ", rsp.V5Response.Msg) + fmt.Println("\tdata: ", rsp.V5Response.Data) + +} + +// 订阅私有频道 +func wsPriv() { + ep := "wss://ws.okex.com:8443/ws/v5/private?brokerId=9999" + + // 填写您自己的APIKey信息 + apikey := "xxxx" + secretKey := "xxxxx" + passphrase := "xxxxx" + + // 创建ws客户端 + r, err := NewWsClient(ep) + if err != nil { + log.Println(err) + return + } + + // 设置连接超时 + r.SetDailTimeout(time.Second * 2) + err = r.Start() + if err != nil { + log.Println(err) + return + } + defer r.Stop() + var res bool + + res, _, err = r.Login(apikey, secretKey, passphrase) + if res { + fmt.Println("登录成功!") + } else { + fmt.Println("登录失败!", err) + return + } + + // 订阅账户频道 + var args []map[string]string + arg := make(map[string]string) + arg["ccy"] = "BTC" + args = append(args, arg) + + start := time.Now() + res, _, err = r.PrivAccout(OP_SUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!耗时:", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + } + + time.Sleep(100 * time.Second) + start = time.Now() + res, _, err = r.PrivAccout(OP_UNSUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + } + +} + +// 订阅公共频道 +func wsPub() { + ep := "wss://wsaws.okex.com:8443/ws/v5/public?brokerId=9999" + + // 创建ws客户端 + r, err := NewWsClient(ep) + if err != nil { + log.Println(err) + return + } + + // 设置连接超时 + r.SetDailTimeout(time.Second * 2) + err = r.Start() + if err != nil { + log.Println(err) + return + } + defer r.Stop() + // 订阅产品频道 + // 在这里初始化instrument列表 + var args []map[string]string + arg := make(map[string]string) + arg["instType"] = FUTURES + //arg["instType"] = OPTION + args = append(args, arg) + + start := time.Now() + //订阅 + res, _, err := r.PubInstruemnts(OP_SUBSCRIBE, args) + fmt.Println("args:", args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + } + + // 在这里 args1 初始化tickerList的列表 + var args1 []map[string]string + arg1 := make(map[string]string) + arg1["instId"] = "ETH-USDT" + //arg["instType"] = OPTION + args1 = append(args1, arg1) + //------------------------------------------------------ + start1 := time.Now() + res, _, err = r.PubTickers(OP_SUBSCRIBE, args1) + fmt.Println("args:", args) + if res { + usedTime := time.Since(start1) + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + } + time.Sleep(300 * time.Second) + // + // start = time.Now() + // res, _, err = r.PubInstruemnts(OP_UNSUBSCRIBE, args) + // if res { + // usedTime := time.Since(start) + // fmt.Println("取消订阅成功!", usedTime.String()) + // } else { + // fmt.Println("取消订阅失败!", err) + // } +} + +// websocket交易 +func wsJrpc() { + ep := "wss://ws.okex.com:8443/ws/v5/private?brokerId=9999" + + // 填写您自己的APIKey信息 + apikey := "xxxx" + secretKey := "xxxxx" + passphrase := "xxxxx" + + var res bool + var req_id string + + // 创建ws客户端 + r, err := NewWsClient(ep) + if err != nil { + log.Println(err) + return + } + + // 设置连接超时 + r.SetDailTimeout(time.Second * 2) + err = r.Start() + if err != nil { + log.Println(err) + return + } + + defer r.Stop() + + res, _, err = r.Login(apikey, secretKey, passphrase) + if res { + fmt.Println("登录成功!") + } else { + fmt.Println("登录失败!", err) + return + } + + start := time.Now() + param := map[string]interface{}{} + param["instId"] = "BTC-USDT" + param["tdMode"] = "cash" + param["side"] = "buy" + param["ordType"] = "market" + param["sz"] = "200" + req_id = "00001" + + res, _, err = r.PlaceOrder(req_id, param) + if res { + usedTime := time.Since(start) + fmt.Println("下单成功!", usedTime.String()) + } else { + usedTime := time.Since(start) + fmt.Println("下单失败!", usedTime.String(), err) + } +} + +func main() { + // 公共订阅 + wsPub() + + // 私有订阅 + // wsPriv() + + // websocket交易 + // wsJrpc() + + // rest请求 + // REST() +} diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..9ff3e39 --- /dev/null +++ b/readme.md @@ -0,0 +1,283 @@ +# 简介 +OKEX go版本的v5sdk,仅供学习交流使用。 +(文档持续完善中) +# 项目说明 + +## REST调用 +``` go + // 设置您的APIKey + apikey := APIKeyInfo{ + ApiKey: "xxxx", + SecKey: "xxxx", + PassPhrase: "xxxx", + } + + // 第三个参数代表是否为模拟环境,更多信息查看接口说明 + cli := NewRESTClient("https://www.okex.win", &apikey, true) + rsp, err := cli.Get(context.Background(), "/api/v5/account/balance", nil) + if err != nil { + return + } + + fmt.Println("Response:") + fmt.Println("\thttp code: ", rsp.Code) + fmt.Println("\t总耗时: ", rsp.TotalUsedTime) + fmt.Println("\t请求耗时: ", rsp.ReqUsedTime) + fmt.Println("\t返回消息: ", rsp.Body) + fmt.Println("\terrCode: ", rsp.V5Response.Code) + fmt.Println("\terrMsg: ", rsp.V5Response.Msg) + fmt.Println("\tdata: ", rsp.V5Response.Data) + ``` +更多示例请查看rest/rest_test.go + +## websocket订阅 + +### 私有频道 +```go + ep := "wss://ws.okex.com:8443/ws/v5/private?brokerId=9999" + + // 填写您自己的APIKey信息 + apikey := "xxxx" + secretKey := "xxxxx" + passphrase := "xxxxx" + + // 创建ws客户端 + r, err := NewWsClient(ep) + if err != nil { + log.Println(err) + return + } + + // 设置连接超时 + r.SetDailTimeout(time.Second * 2) + err = r.Start() + if err != nil { + log.Println(err) + return + } + defer r.Stop() + + var res bool + // 私有频道需要登录 + res, _, err = r.Login(apikey, secretKey, passphrase) + if res { + fmt.Println("登录成功!") + } else { + fmt.Println("登录失败!", err) + return + } + + + var args []map[string]string + arg := make(map[string]string) + arg["ccy"] = "BTC" + args = append(args, arg) + + start := time.Now() + // 订阅账户频道 + res, _, err = r.PrivAccout(OP_SUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!耗时:", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + } + + time.Sleep(100 * time.Second) + start = time.Now() + // 取消订阅账户频道 + res, _, err = r.PrivAccout(OP_UNSUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + } +``` +更多示例请查看ws/ws_priv_channel_test.go + +### 公有频道 +```go + ep := "wss://ws.okex.com:8443/ws/v5/public?brokerId=9999" + + // 创建ws客户端 + r, err := NewWsClient(ep) + if err != nil { + log.Println(err) + return + } + + + // 设置连接超时 + r.SetDailTimeout(time.Second * 2) + err = r.Start() + if err != nil { + log.Println(err) + return + } + + defer r.Stop() + + + var args []map[string]string + arg := make(map[string]string) + arg["instType"] = FUTURES + //arg["instType"] = OPTION + args = append(args, arg) + + start := time.Now() + + // 订阅产品频道 + res, _, err := r.PubInstruemnts(OP_SUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + } + + time.Sleep(30 * time.Second) + + start = time.Now() + + // 取消订阅产品频道 + res, _, err = r.PubInstruemnts(OP_UNSUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + } +``` +更多示例请查看ws/ws_pub_channel_test.go + +## websocket交易 +```go + ep := "wss://ws.okex.com:8443/ws/v5/private?brokerId=9999" + + // 填写您自己的APIKey信息 + apikey := "xxxx" + secretKey := "xxxxx" + passphrase := "xxxxx" + + var res bool + var req_id string + + // 创建ws客户端 + r, err := NewWsClient(ep) + if err != nil { + log.Println(err) + return + } + + // 设置连接超时 + r.SetDailTimeout(time.Second * 2) + err = r.Start() + if err != nil { + log.Println(err) + return + } + + defer r.Stop() + + res, _, err = r.Login(apikey, secretKey, passphrase) + if res { + fmt.Println("登录成功!") + } else { + fmt.Println("登录失败!", err) + return + } + + start := time.Now() + param := map[string]interface{}{} + param["instId"] = "BTC-USDT" + param["tdMode"] = "cash" + param["side"] = "buy" + param["ordType"] = "market" + param["sz"] = "200" + req_id = "00001" + + // 单个下单 + res, _, err = r.PlaceOrder(req_id, param) + if res { + usedTime := time.Since(start) + fmt.Println("下单成功!", usedTime.String()) + } else { + usedTime := time.Since(start) + fmt.Println("下单失败!", usedTime.String(), err) + } + +``` +更多示例请查看ws/ws_jrpc_test.go + +## wesocket推送 +websocket推送数据分为两种类型数据:`普通推送数据`和`深度类型数据`。 + +```go +ws/wImpl/BookData.go + +// 普通推送 +type MsgData struct { + Arg map[string]string `json:"arg"` + Data []interface{} `json:"data"` +} + +// 深度数据 +type DepthData struct { + Arg map[string]string `json:"arg"` + Action string `json:"action"` + Data []DepthDetail `json:"data"` +} +``` +如果需要对推送数据做处理用户可以自定义回调函数: +1. 全局消息处理的回调函数 +该回调函数会处理所有从服务端接受到的数据。 +```go +/* + 添加全局消息处理的回调函数 +*/ +func (a *WsClient) AddMessageHook(fn ReceivedDataCallback) error { + a.onMessageHook = fn + return nil +} +``` +使用方法参见 ws/ws_test.go中测试用例TestAddMessageHook。 + +2. 订阅消息处理回调函数 +可以处理所有非深度类型的数据,包括 订阅/取消订阅,普通推送数据。 +```go +/* + 添加订阅消息处理的回调函数 +*/ +func (a *WsClient) AddBookMsgHook(fn ReceivedMsgDataCallback) error { + a.onBookMsgHook = fn + return nil +} +``` +使用方法参见 ws/ws_test.go中测试用例TestAddBookedDataHook。 + + +3. 深度消息处理的回调函数 +这里需要说明的是,Wsclient提供了深度数据管理和自动checksum的功能,用户如果需要关闭此功能,只需要调用EnableAutoDepthMgr方法。 +```go +/* + 添加深度消息处理的回调函数 +*/ +func (a *WsClient) AddDepthHook(fn ReceivedDepthDataCallback) error { + a.onDepthHook = fn + return nil +} +``` +使用方法参见 ws/ws_pub_channel_test.go中测试用例TestOrderBooks。 + +4. 错误消息类型回调函数 +```go +func (a *WsClient) AddErrMsgHook(fn ReceivedDataCallback) error { + a.OnErrorHook = fn + return nil +} +``` + +# 联系方式 +邮箱:caron_co@163.com +微信:caron_co diff --git a/rest/contants.go b/rest/contants.go new file mode 100644 index 0000000..4497aea --- /dev/null +++ b/rest/contants.go @@ -0,0 +1,35 @@ +package rest + +const ( + + /* + http headers + */ + OK_ACCESS_KEY = "OK-ACCESS-KEY" + OK_ACCESS_SIGN = "OK-ACCESS-SIGN" + OK_ACCESS_TIMESTAMP = "OK-ACCESS-TIMESTAMP" + OK_ACCESS_PASSPHRASE = "OK-ACCESS-PASSPHRASE" + X_SIMULATE_TRADING = "x-simulated-trading" + + CONTENT_TYPE = "Content-Type" + ACCEPT = "Accept" + COOKIE = "Cookie" + LOCALE = "locale=" + + APPLICATION_JSON = "application/json" + APPLICATION_JSON_UTF8 = "application/json; charset=UTF-8" + + /* + i18n: internationalization + */ + ENGLISH = "en_US" + SIMPLIFIED_CHINESE = "zh_CN" + //zh_TW || zh_HK + TRADITIONAL_CHINESE = "zh_HK" + + GET = "GET" + POST = "POST" +) + + + diff --git a/rest/go.mod b/rest/go.mod new file mode 100644 index 0000000..e69de29 diff --git a/rest/rest.go b/rest/rest.go new file mode 100644 index 0000000..d764125 --- /dev/null +++ b/rest/rest.go @@ -0,0 +1,331 @@ +package rest + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "strings" + "time" + . "v5sdk_go/utils" +) + +type RESTAPI struct { + EndPoint string `json:"endPoint"` + // GET/POST + Method string `json:"method"` + Uri string `json:"uri"` + Param map[string]interface{} `json:"param"` + Timeout time.Duration + ApiKeyInfo *APIKeyInfo + isSimulate bool +} + +type APIKeyInfo struct { + ApiKey string + PassPhrase string + SecKey string + UserId string +} + +type RESTAPIResult struct { + Url string `json:"url"` + Param string `json:"param"` + Header string `json:"header"` + Code int `json:"code"` + // 原始返回信息 + Body string `json:"body"` + // okexV5返回的数据 + V5Response Okexv5APIResponse `json:"v5Response"` + ReqUsedTime time.Duration `json:"reqUsedTime"` + TotalUsedTime time.Duration `json:"totalUsedTime"` +} + +type Okexv5APIResponse struct { + Code string `json:"code"` + Msg string `json:"msg"` + Data []map[string]interface{} `json:"data"` +} + +/* + endPoint:请求地址 + apiKey + isSimulate: 是否为模拟环境 +*/ +func NewRESTClient(endPoint string, apiKey *APIKeyInfo, isSimulate bool) *RESTAPI { + + res := &RESTAPI{ + EndPoint: endPoint, + ApiKeyInfo: apiKey, + isSimulate: isSimulate, + Timeout: 5 * time.Second, + } + return res +} + +func NewRESTAPI(ep, method, uri string, param *map[string]interface{}) *RESTAPI { + //TODO:参数校验 + reqParam := make(map[string]interface{}) + + if param != nil { + reqParam = *param + } + res := &RESTAPI{ + EndPoint: ep, + Method: method, + Uri: uri, + Param: reqParam, + Timeout: 150 * time.Second, + } + return res +} + +func (this *RESTAPI) SetSimulate(b bool) *RESTAPI { + this.isSimulate = b + return this +} + +func (this *RESTAPI) SetAPIKey(apiKey, secKey, passPhrase string) *RESTAPI { + if this.ApiKeyInfo == nil { + this.ApiKeyInfo = &APIKeyInfo{ + ApiKey: apiKey, + PassPhrase: passPhrase, + SecKey: secKey, + } + } else { + this.ApiKeyInfo.ApiKey = apiKey + this.ApiKeyInfo.PassPhrase = passPhrase + this.ApiKeyInfo.SecKey = secKey + } + return this +} + +func (this *RESTAPI) SetUserId(userId string) *RESTAPI { + if this.ApiKeyInfo == nil { + fmt.Println("ApiKey为空") + return this + } + + this.ApiKeyInfo.UserId = userId + return this +} + +func (this *RESTAPI) SetTimeOut(timeout time.Duration) *RESTAPI { + this.Timeout = timeout + return this +} + +// GET请求 +func (this *RESTAPI) Get(ctx context.Context, uri string, param *map[string]interface{}) (res *RESTAPIResult, err error) { + this.Method = GET + this.Uri = uri + + reqParam := make(map[string]interface{}) + + if param != nil { + reqParam = *param + } + this.Param = reqParam + return this.Run(ctx) +} + +// POST请求 +func (this *RESTAPI) Post(ctx context.Context, uri string, param *map[string]interface{}) (res *RESTAPIResult, err error) { + this.Method = POST + this.Uri = uri + + reqParam := make(map[string]interface{}) + + if param != nil { + reqParam = *param + } + this.Param = reqParam + + return this.Run(ctx) +} + +func (this *RESTAPI) Run(ctx context.Context) (res *RESTAPIResult, err error) { + + if this.ApiKeyInfo == nil { + err = errors.New("APIKey不可为空") + return + } + + procStart := time.Now() + + defer func() { + if res != nil { + res.TotalUsedTime = time.Since(procStart) + } + }() + + client := &http.Client{ + Timeout: this.Timeout, + } + + uri, body, err := this.GenReqInfo() + if err != nil { + return + } + + url := this.EndPoint + uri + bodyBuf := new(bytes.Buffer) + bodyBuf.ReadFrom(strings.NewReader(body)) + + req, err := http.NewRequest(this.Method, url, bodyBuf) + if err != nil { + return + } + + res = &RESTAPIResult{ + Url: url, + Param: body, + } + + // Sign and set request headers + timestamp := IsoTime() + preHash := PreHashString(timestamp, this.Method, uri, body) + //log.Println("preHash:", preHash) + sign, err := HmacSha256Base64Signer(preHash, this.ApiKeyInfo.SecKey) + if err != nil { + return + } + //log.Println("sign:", sign) + headStr := this.SetHeaders(req, timestamp, sign) + res.Header = headStr + + this.PrintRequest(req, body, preHash) + resp, err := client.Do(req) + if err != nil { + fmt.Println("请求失败!", err) + return + } + defer resp.Body.Close() + + res.ReqUsedTime = time.Since(procStart) + + resBuff, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Println("获取请求结果失败!", err) + return + } + + res.Body = string(resBuff) + res.Code = resp.StatusCode + + // 解析结果 + var v5rsp Okexv5APIResponse + err = json.Unmarshal(resBuff, &v5rsp) + if err != nil { + fmt.Println("解析v5返回失败!", err) + return + } + + res.V5Response = v5rsp + + return +} + +/* + 生成请求对应的参数 +*/ +func (this *RESTAPI) GenReqInfo() (uri string, body string, err error) { + uri = this.Uri + + switch this.Method { + case GET: + getParam := []string{} + + if len(this.Param) == 0 { + return + } + + for k, v := range this.Param { + getParam = append(getParam, fmt.Sprintf("%v=%v", k, v)) + } + uri = uri + "?" + strings.Join(getParam, "&") + + case POST: + + var rawBody []byte + rawBody, err = json.Marshal(this.Param) + if err != nil { + return + } + body = string(rawBody) + default: + err = errors.New("request type unknown!") + return + } + + return +} + +/* + Set http request headers: + Accept: application/json + Content-Type: application/json; charset=UTF-8 (default) + Cookie: locale=en_US (English) + OK-ACCESS-KEY: (Your setting) + OK-ACCESS-SIGN: (Use your setting, auto sign and add) + OK-ACCESS-TIMESTAMP: (Auto add) + OK-ACCESS-PASSPHRASE: Your setting +*/ +func (this *RESTAPI) SetHeaders(request *http.Request, timestamp string, sign string) (header string) { + + request.Header.Add(ACCEPT, APPLICATION_JSON) + header += ACCEPT + ":" + APPLICATION_JSON + "\n" + + request.Header.Add(CONTENT_TYPE, APPLICATION_JSON_UTF8) + header += CONTENT_TYPE + ":" + APPLICATION_JSON_UTF8 + "\n" + + request.Header.Add(COOKIE, LOCALE+ENGLISH) + header += COOKIE + ":" + LOCALE + ENGLISH + "\n" + + request.Header.Add(OK_ACCESS_KEY, this.ApiKeyInfo.ApiKey) + header += OK_ACCESS_KEY + ":" + this.ApiKeyInfo.ApiKey + "\n" + + request.Header.Add(OK_ACCESS_SIGN, sign) + header += OK_ACCESS_SIGN + ":" + sign + "\n" + + request.Header.Add(OK_ACCESS_TIMESTAMP, timestamp) + header += OK_ACCESS_TIMESTAMP + ":" + timestamp + "\n" + + request.Header.Add(OK_ACCESS_PASSPHRASE, this.ApiKeyInfo.PassPhrase) + header += OK_ACCESS_PASSPHRASE + ":" + this.ApiKeyInfo.PassPhrase + "\n" + + //模拟盘交易标记 + if this.isSimulate { + request.Header.Add(X_SIMULATE_TRADING, "1") + header += X_SIMULATE_TRADING + ":1" + "\n" + } + return +} + +/* + 打印header信息 +*/ +func (this *RESTAPI) PrintRequest(request *http.Request, body string, preHash string) { + if this.ApiKeyInfo.SecKey != "" { + fmt.Println(" Secret-Key: " + this.ApiKeyInfo.SecKey) + } + fmt.Println(" Request(" + IsoTime() + "):") + fmt.Println("\tUrl: " + request.URL.String()) + fmt.Println("\tMethod: " + strings.ToUpper(request.Method)) + if len(request.Header) > 0 { + fmt.Println("\tHeaders: ") + for k, v := range request.Header { + if strings.Contains(k, "Ok-") { + k = strings.ToUpper(k) + } + fmt.Println("\t\t" + k + ": " + v[0]) + } + } + fmt.Println("\tBody: " + body) + if preHash != "" { + fmt.Println(" PreHash: " + preHash) + } +} diff --git a/rest/rest_test.go b/rest/rest_test.go new file mode 100644 index 0000000..cc6c753 --- /dev/null +++ b/rest/rest_test.go @@ -0,0 +1,100 @@ +package rest + +import ( + "context" + "fmt" + "testing" +) + +/* + GET请求 +*/ +func TestRESTAPIGet(t *testing.T) { + + rest := NewRESTAPI("https://www.okex.win", GET, "/api/v5/account/balance", nil) + rest.SetSimulate(true).SetAPIKey("xxxx", "xxxx", "xxxx") + rest.SetUserId("xxxxx") + response, err := rest.Run(context.Background()) + if err != nil { + fmt.Println(err) + return + } + + fmt.Println("Response:") + fmt.Println("\thttp code: ", response.Code) + fmt.Println("\t总耗时: ", response.TotalUsedTime) + fmt.Println("\t请求耗时: ", response.ReqUsedTime) + fmt.Println("\t返回消息: ", response.Body) + fmt.Println("\terrCode: ", response.V5Response.Code) + fmt.Println("\terrMsg: ", response.V5Response.Msg) + fmt.Println("\tdata: ", response.V5Response.Data) + + // 请求的另一种方式 + apikey := APIKeyInfo{ + ApiKey: "xxxxx", + SecKey: "xxxxx", + PassPhrase: "xxx", + } + + cli := NewRESTClient("https://www.okex.win", &apikey, true) + rsp, err := cli.Get(context.Background(), "/api/v5/account/balance", nil) + if err != nil { + return + } + + fmt.Println("Response:") + fmt.Println("\thttp code: ", rsp.Code) + fmt.Println("\t总耗时: ", rsp.TotalUsedTime) + fmt.Println("\t请求耗时: ", rsp.ReqUsedTime) + fmt.Println("\t返回消息: ", rsp.Body) + fmt.Println("\terrCode: ", rsp.V5Response.Code) + fmt.Println("\terrMsg: ", rsp.V5Response.Msg) + fmt.Println("\tdata: ", rsp.V5Response.Data) +} + +/* + POST请求 +*/ +func TestRESTAPIPost(t *testing.T) { + param := make(map[string]interface{}) + param["greeksType"] = "PA" + + rest := NewRESTAPI("https://www.okex.win", POST, "/api/v5/account/set-greeks", ¶m) + rest.SetSimulate(true).SetAPIKey("xxxx", "xxxx", "xxxx") + response, err := rest.Run(context.Background()) + if err != nil { + fmt.Println(err) + return + } + + fmt.Println("Response:") + fmt.Println("\thttp code: ", response.Code) + fmt.Println("\t总耗时: ", response.TotalUsedTime) + fmt.Println("\t请求耗时: ", response.ReqUsedTime) + fmt.Println("\t返回消息: ", response.Body) + fmt.Println("\terrCode: ", response.V5Response.Code) + fmt.Println("\terrMsg: ", response.V5Response.Msg) + fmt.Println("\tdata: ", response.V5Response.Data) + + // 请求的另一种方式 + apikey := APIKeyInfo{ + ApiKey: "xxxx", + SecKey: "xxxxx", + PassPhrase: "xxxx", + } + + cli := NewRESTClient("https://www.okex.win", &apikey, true) + rsp, err := cli.Post(context.Background(), "/api/v5/account/set-greeks", ¶m) + if err != nil { + return + } + + fmt.Println("Response:") + fmt.Println("\thttp code: ", rsp.Code) + fmt.Println("\t总耗时: ", rsp.TotalUsedTime) + fmt.Println("\t请求耗时: ", rsp.ReqUsedTime) + fmt.Println("\t返回消息: ", rsp.Body) + fmt.Println("\terrCode: ", rsp.V5Response.Code) + fmt.Println("\terrMsg: ", rsp.V5Response.Msg) + fmt.Println("\tdata: ", rsp.V5Response.Data) +} diff --git a/utils/go.mod b/utils/go.mod new file mode 100644 index 0000000..e69de29 diff --git a/utils/utils.go b/utils/utils.go new file mode 100644 index 0000000..830186d --- /dev/null +++ b/utils/utils.go @@ -0,0 +1,102 @@ +package utils + +import ( + "bytes" + "compress/flate" + "crypto/hmac" + "crypto/sha256" + "encoding/base64" + "encoding/json" + "io/ioutil" + "log" + "strconv" + "strings" + "time" + //"net/http" +) + +/* + Get a epoch time + eg: 1521221737 +*/ +func EpochTime() string { + millisecond := time.Now().UnixNano() / 1000000 + epoch := strconv.Itoa(int(millisecond)) + epochBytes := []byte(epoch) + epoch = string(epochBytes[:10]) + return epoch +} + +/* + signing a message + using: hmac sha256 + base64 + eg: + message = Pre_hash function comment + secretKey = E65791902180E9EF4510DB6A77F6EBAE + + return signed string = TO6uwdqz+31SIPkd4I+9NiZGmVH74dXi+Fd5X0EzzSQ= +*/ +func HmacSha256Base64Signer(message string, secretKey string) (string, error) { + mac := hmac.New(sha256.New, []byte(secretKey)) + _, err := mac.Write([]byte(message)) + if err != nil { + return "", err + } + return base64.StdEncoding.EncodeToString(mac.Sum(nil)), nil +} + +/* + the pre hash string + eg: + timestamp = 2018-03-08T10:59:25.789Z + method = POST + request_path = /orders?before=2&limit=30 + body = {"product_id":"BTC-USD-0309","order_id":"377454671037440"} + + return pre hash string = 2018-03-08T10:59:25.789ZPOST/orders?before=2&limit=30{"product_id":"BTC-USD-0309","order_id":"377454671037440"} +*/ +func PreHashString(timestamp string, method string, requestPath string, body string) string { + return timestamp + strings.ToUpper(method) + requestPath + body +} + +/* + struct convert json string +*/ +func Struct2JsonString(raw interface{}) (jsonString string, err error) { + //fmt.Println("转化json,", raw) + data, err := json.Marshal(raw) + if err != nil { + log.Println("convert json failed!", err) + return "", err + } + //log.Println(string(data)) + return string(data), nil +} + +// 解压缩消息 +func GzipDecode(in []byte) ([]byte, error) { + reader := flate.NewReader(bytes.NewReader(in)) + defer reader.Close() + + return ioutil.ReadAll(reader) +} + + +/* + Get a iso time + eg: 2018-03-16T18:02:48.284Z +*/ +func IsoTime() string { + utcTime := time.Now().UTC() + iso := utcTime.String() + isoBytes := []byte(iso) + iso = string(isoBytes[:10]) + "T" + string(isoBytes[11:23]) + "Z" + return iso +} + + + + + + + diff --git a/utils/utils_test.go b/utils/utils_test.go new file mode 100644 index 0000000..09bcf5b --- /dev/null +++ b/utils/utils_test.go @@ -0,0 +1,17 @@ +package utils + +import ( + "fmt" + "testing" +) + +func TestHmacSha256Base64Signer(t *testing.T) { + raw := `2021-04-06T03:33:21.681ZPOST/api/v5/trade/order{"instId":"ETH-USDT-SWAP","ordType":"limit","px":"2300","side":"sell","sz":"1","tdMode":"cross"}` + key := "1A9E86759F2D2AA16E389FD3B7F8273E" + res, err := HmacSha256Base64Signer(raw, key) + if err != nil { + t.Fatal(err) + } + fmt.Println(res) + t.Log(res) +} diff --git a/ws/go.mod b/ws/go.mod new file mode 100644 index 0000000..e69de29 diff --git a/ws/utils.go b/ws/utils.go new file mode 100644 index 0000000..3c75066 --- /dev/null +++ b/ws/utils.go @@ -0,0 +1,75 @@ +package ws + +import ( + "errors" + "log" + "runtime/debug" + . "v5sdk_go/ws/wImpl" + . "v5sdk_go/ws/wInterface" +) + +// 判断返回结果成功失败 +func checkResult(wsReq WSReqData, wsRsps []*Msg) (res bool, err error) { + defer func() { + a := recover() + if a != nil { + log.Printf("Receive End. Recover msg: %+v", a) + debug.PrintStack() + } + return + }() + + res = false + if len(wsRsps) == 0 { + return + } + + for _, v := range wsRsps { + switch v.Info.(type) { + case ErrData: + return + } + if wsReq.GetType() != v.Info.(WSRspData).MsgType() { + err = errors.New("消息类型不一致") + return + } + } + + //检查所有频道是否都更新成功 + if wsReq.GetType() == MSG_NORMAL { + req, ok := wsReq.(ReqData) + if !ok { + log.Println("类型转化失败", req) + err = errors.New("类型转化失败") + return + } + + for idx, _ := range req.Args { + ok := false + i_req := req.Args[idx] + //fmt.Println("检查",i_req) + for i, _ := range wsRsps { + info, _ := wsRsps[i].Info.(RspData) + //fmt.Println("<<",info) + if info.Event == req.Op && info.Arg["channel"] == i_req["channel"] && info.Arg["instType"] == i_req["instType"] { + ok = true + continue + } + } + if !ok { + err = errors.New("未得到所有的期望的返回结果") + return + } + } + } else { + for i, _ := range wsRsps { + info, _ := wsRsps[i].Info.(JRPCRsp) + if info.Code != "0" { + return + } + } + } + + res = true + return +} diff --git a/ws/wImpl/BookData.go b/ws/wImpl/BookData.go new file mode 100644 index 0000000..8867c8d --- /dev/null +++ b/ws/wImpl/BookData.go @@ -0,0 +1,226 @@ +/* + 订阅频道后收到的推送数据 +*/ + +package wImpl + +import ( + "bytes" + "errors" + "fmt" + "hash/crc32" + "log" + "strconv" + "strings" +) + +// 普通推送 +type MsgData struct { + Arg map[string]string `json:"arg"` + Data []interface{} `json:"data"` +} + +// 深度数据 +type DepthData struct { + Arg map[string]string `json:"arg"` + Action string `json:"action"` + Data []DepthDetail `json:"data"` +} + +type DepthDetail struct { + Asks [][]string `json:"asks"` + Bids [][]string `json:"bids"` + Ts string `json:"ts"` + Checksum int32 `json:"checksum"` +} + +/* + 深度数据校验 +*/ +func (this *DepthData) CheckSum(snap *DepthDetail) (pDepData *DepthDetail, err error) { + + if len(this.Data) != 1 { + err = errors.New("深度数据错误!") + return + } + + if this.Action == DEPTH_SNAPSHOT { + _, cs := CalCrc32(this.Data[0].Asks, this.Data[0].Bids) + + if cs != this.Data[0].Checksum { + err = errors.New("校验失败!") + return + } + pDepData = &this.Data[0] + log.Println("snapshot校验成功", this.Data[0].Checksum) + + } + + if this.Action == DEPTH_UPDATE { + if snap == nil { + err = errors.New("深度快照数据不可为空!") + return + } + + pDepData, err = MergDepthData(*snap, this.Data[0], this.Data[0].Checksum) + if err != nil { + return + } + log.Println("update校验成功", this.Data[0].Checksum) + } + + return +} + +func CalCrc32(askDepths [][]string, bidDepths [][]string) (bytes.Buffer, int32) { + + crc32BaseBuffer := bytes.Buffer{} + crcAskDepth, crcBidDepth := 25, 25 + + if len(askDepths) < 25 { + crcAskDepth = len(askDepths) + } + if len(bidDepths) < 25 { + crcBidDepth = len(bidDepths) + } + if crcAskDepth == crcBidDepth { + for i := 0; i < crcAskDepth; i++ { + if crc32BaseBuffer.Len() > 0 { + crc32BaseBuffer.WriteString(":") + } + crc32BaseBuffer.WriteString( + fmt.Sprintf("%v:%v:%v:%v", + (bidDepths)[i][0], (bidDepths)[i][1], + (askDepths)[i][0], (askDepths)[i][1])) + } + } else { + + var crcArr []string + for i, j := 0, 0; i < crcBidDepth || j < crcAskDepth; { + + if i < crcBidDepth { + crcArr = append(crcArr, fmt.Sprintf("%v:%v", (bidDepths)[i][0], (bidDepths)[i][1])) + i++ + } + + if j < crcAskDepth { + crcArr = append(crcArr, fmt.Sprintf("%v:%v", (askDepths)[j][0], (askDepths)[j][1])) + j++ + } + } + + crc32BaseBuffer.WriteString(strings.Join(crcArr, ":")) + } + + expectCrc32 := int32(crc32.ChecksumIEEE(crc32BaseBuffer.Bytes())) + + return crc32BaseBuffer, expectCrc32 +} + +/* + 深度合并的内部方法 + 返回结果: + res:合并后的深度 + index: 最新的 ask1/bids1 的索引 +*/ +func mergeDepth(oldDepths [][]string, newDepths [][]string, method string) (res [][]string, err error) { + + oldIdx, newIdx := 0, 0 + + for oldIdx < len(oldDepths) && newIdx < len(newDepths) { + + oldItem := oldDepths[oldIdx] + newItem := newDepths[newIdx] + var oldPrice, newPrice float64 + oldPrice, err = strconv.ParseFloat(oldItem[0], 10) + if err != nil { + return + } + newPrice, err = strconv.ParseFloat(newItem[0], 10) + if err != nil { + return + } + + if oldPrice == newPrice { + if newItem[1] != "0" { + res = append(res, newItem) + } + + oldIdx++ + newIdx++ + } else { + switch method { + // 降序 + case "bids": + if oldPrice < newPrice { + res = append(res, newItem) + newIdx++ + } else { + + res = append(res, oldItem) + oldIdx++ + } + // 升序 + case "asks": + if oldPrice > newPrice { + res = append(res, newItem) + newIdx++ + } else { + + res = append(res, oldItem) + oldIdx++ + } + } + } + + } + + if oldIdx < len(oldDepths) { + res = append(res, oldDepths[oldIdx:]...) + } + + if newIdx < len(newDepths) { + res = append(res, newDepths[newIdx:]...) + } + + return +} + +/* + 深度合并,并校验 +*/ +func MergDepthData(snap DepthDetail, update DepthDetail, expChecksum int32) (res *DepthDetail, err error) { + + newAskDepths, err1 := mergeDepth(snap.Asks, update.Asks, "asks") + if err1 != nil { + return + } + + // log.Println("old Ask - ", snap.Asks) + // log.Println("update Ask - ", update.Asks) + // log.Println("new Ask - ", newAskDepths) + newBidDepths, err2 := mergeDepth(snap.Bids, update.Bids, "bids") + if err2 != nil { + return + } + // log.Println("old Bids - ", snap.Bids) + // log.Println("update Bids - ", update.Bids) + // log.Println("new Bids - ", newBidDepths) + + cBuf, checksum := CalCrc32(newAskDepths, newBidDepths) + if checksum != expChecksum { + err = errors.New("校验失败!") + log.Println("buffer:", cBuf.String()) + log.Fatal(checksum, expChecksum) + return + } + + res = &DepthDetail{ + Asks: newAskDepths, + Bids: newBidDepths, + Ts: update.Ts, + Checksum: update.Checksum, + } + + return +} diff --git a/ws/wImpl/ErrData.go b/ws/wImpl/ErrData.go new file mode 100644 index 0000000..020164b --- /dev/null +++ b/ws/wImpl/ErrData.go @@ -0,0 +1,13 @@ +/* + 错误数据 +*/ +package wImpl + +// 服务端请求错误返回消息格式 +type ErrData struct { + Event string `json:"event"` + Code string `json:"code"` + Msg string `json:"msg"` +} + + diff --git a/ws/wImpl/JRPCData.go b/ws/wImpl/JRPCData.go new file mode 100644 index 0000000..cfd3223 --- /dev/null +++ b/ws/wImpl/JRPCData.go @@ -0,0 +1,50 @@ +/* + JRPC请求/响应数据 +*/ +package wImpl + +import ( + "encoding/json" + . "v5sdk_go/utils" +) + +// jrpc请求结构体 +type JRPCReq struct { + Id string `json:"id"` + Op string `json:"op"` + Args []map[string]interface{} `json:"args"` +} + +func (r JRPCReq) GetType() int { + return MSG_JRPC +} + +func (r JRPCReq) ToString() string { + data, err := Struct2JsonString(r) + if err != nil { + return "" + } + return data +} + +func (r JRPCReq) Len() int { + return 1 +} + +// jrpc响应结构体 +type JRPCRsp struct { + Id string `json:"id"` + Op string `json:"op"` + Data []map[string]interface{} `json:"data"` + Code string `json:"code"` + Msg string `json:"msg"` +} + +func (r JRPCRsp) MsgType() int { + return MSG_JRPC +} + +func (r JRPCRsp) String() string { + raw, _ := json.Marshal(r) + return string(raw) +} diff --git a/ws/wImpl/ReqData.go b/ws/wImpl/ReqData.go new file mode 100644 index 0000000..df7afa1 --- /dev/null +++ b/ws/wImpl/ReqData.go @@ -0,0 +1,47 @@ +/* + 普通订阅请求和响应的数据格式 +*/ + +package wImpl + +import ( + "encoding/json" + . "v5sdk_go/utils" +) + +// 客户端请求消息格式 +type ReqData struct { + Op string `json:"op"` + Args []map[string]string `json:"args"` +} + +func (r ReqData) GetType() int { + return MSG_NORMAL +} + +func (r ReqData) ToString() string { + data, err := Struct2JsonString(r) + if err != nil { + return "" + } + return data +} + +func (r ReqData) Len() int { + return len(r.Args) +} + +// 服务端请求响应消息格式 +type RspData struct { + Event string `json:"event"` + Arg map[string]string `json:"arg"` +} + +func (r RspData) MsgType() int { + return MSG_NORMAL +} + +func (r RspData) String() string { + raw, _ := json.Marshal(r) + return string(raw) +} diff --git a/ws/wImpl/contants.go b/ws/wImpl/contants.go new file mode 100644 index 0000000..c095fae --- /dev/null +++ b/ws/wImpl/contants.go @@ -0,0 +1,241 @@ +package wImpl + +import ( + "regexp" +) + +/* + + */ + +const ( + MSG_NORMAL = iota + MSG_JRPC +) + +//事件 +type Event int + +/* + EventID +*/ +const ( + EVENT_UNKNOWN Event = iota + EVENT_ERROR + EVENT_PING + EVENT_LOGIN + + //订阅公共频道 + EVENT_BOOK_INSTRUMENTS + EVENT_STATUS + EVENT_BOOK_TICKERS + EVENT_BOOK_OPEN_INTEREST + EVENT_BOOK_KLINE + EVENT_BOOK_TRADE + EVENT_BOOK_ESTIMATE_PRICE + EVENT_BOOK_MARK_PRICE + EVENT_BOOK_MARK_PRICE_CANDLE_CHART + EVENT_BOOK_LIMIT_PRICE + EVENT_BOOK_ORDER_BOOK + EVENT_BOOK_ORDER_BOOK5 + EVENT_BOOK_ORDER_BOOK_TBT + EVENT_BOOK_ORDER_BOOK50_TBT + EVENT_BOOK_OPTION_SUMMARY + EVENT_BOOK_FUND_RATE + EVENT_BOOK_KLINE_INDEX + EVENT_BOOK_INDEX_TICKERS + + //订阅私有频道 + EVENT_BOOK_ACCOUNT + EVENT_BOOK_POSTION + EVENT_BOOK_ORDER + EVENT_BOOK_ALG_ORDER + EVENT_BOOK_B_AND_P + + // JRPC + EVENT_PLACE_ORDER + EVENT_PLACE_BATCH_ORDERS + EVENT_CANCEL_ORDER + EVENT_CANCEL_BATCH_ORDERS + EVENT_AMEND_ORDER + EVENT_AMEND_BATCH_ORDERS + + //订阅返回数据 + EVENT_BOOKED_DATA + EVENT_DEPTH_DATA +) + +/* + EventID,事件名称,channel + 注: 带有周期参数的频道 如 行情频道 ,需要将channel写为 正则表达模式方便 类型匹配,如 "^candle*" +*/ +var EVENT_TABLE = [][]interface{}{ + // 未知的消息 + {EVENT_UNKNOWN, "未知", ""}, + // 错误的消息 + {EVENT_ERROR, "错误", ""}, + // Ping + {EVENT_PING, "ping", ""}, + // 登陆 + {EVENT_LOGIN, "登录", ""}, + + /* + 订阅公共频道 + */ + + {EVENT_BOOK_INSTRUMENTS, "产品", "instruments"}, + {EVENT_STATUS, "status", "status"}, + {EVENT_BOOK_TICKERS, "行情", "tickers"}, + {EVENT_BOOK_OPEN_INTEREST, "持仓总量", "open-interest"}, + {EVENT_BOOK_KLINE, "K线", "candle"}, + {EVENT_BOOK_TRADE, "交易", "trades"}, + {EVENT_BOOK_ESTIMATE_PRICE, "预估交割/行权价格", "estimated-price"}, + {EVENT_BOOK_MARK_PRICE, "标记价格", "mark-price"}, + {EVENT_BOOK_MARK_PRICE_CANDLE_CHART, "标记价格K线", "mark-price-candle"}, + {EVENT_BOOK_LIMIT_PRICE, "限价", "price-limit"}, + {EVENT_BOOK_ORDER_BOOK, "400档深度", "books"}, + {EVENT_BOOK_ORDER_BOOK5, "5档深度", "books5"}, + {EVENT_BOOK_ORDER_BOOK_TBT, "tbt深度", "books-l2-tbt"}, + {EVENT_BOOK_ORDER_BOOK50_TBT, "tbt50深度", "books50-l2-tbt"}, + {EVENT_BOOK_OPTION_SUMMARY, "期权定价", "opt-summary"}, + {EVENT_BOOK_FUND_RATE, "资金费率", "funding-rate"}, + {EVENT_BOOK_KLINE_INDEX, "指数K线", "index-candle"}, + {EVENT_BOOK_INDEX_TICKERS, "指数行情", "index-tickers"}, + + /* + 订阅私有频道 + */ + {EVENT_BOOK_ACCOUNT, "账户", "account"}, + {EVENT_BOOK_POSTION, "持仓", "positions"}, + {EVENT_BOOK_ORDER, "订单", "orders"}, + {EVENT_BOOK_ALG_ORDER, "策略委托订单", "orders-algo"}, + {EVENT_BOOK_B_AND_P, "账户余额和持仓", "balance_and_position"}, + + /* + JRPC + */ + {EVENT_PLACE_ORDER, "下单", "order"}, + {EVENT_PLACE_BATCH_ORDERS, "批量下单", "batch-orders"}, + {EVENT_CANCEL_ORDER, "撤单", "cancel-order"}, + {EVENT_CANCEL_BATCH_ORDERS, "批量撤单", "batch-cancel-orders"}, + {EVENT_AMEND_ORDER, "改单", "amend-order"}, + {EVENT_AMEND_BATCH_ORDERS, "批量改单", "batch-amend-orders"}, + + /* + 订阅返回数据 + 注意:推送数据channle统一为"" + */ + {EVENT_BOOKED_DATA, "普通推送", ""}, + {EVENT_DEPTH_DATA, "深度推送", ""}, +} + +/* + 获取事件名称 +*/ +func (e Event) String() string { + for _, v := range EVENT_TABLE { + eventId := v[0].(Event) + if e == eventId { + return v[1].(string) + } + } + + return "" +} + +/* + 通过事件获取对应的channel信息 + 对于频道名称有时间周期的 通过参数 pd 传入,拼接后返回完整channel信息 +*/ +func (e Event) GetChannel(pd Period) string { + + channel := "" + + for _, v := range EVENT_TABLE { + eventId := v[0].(Event) + if e == eventId { + channel = v[2].(string) + break + } + } + + if channel == "" { + return "" + } + + return channel + string(pd) +} + +/* + 通过channel信息匹配获取事件类型 +*/ +func GetEventId(raw string) Event { + evt := EVENT_UNKNOWN + + for _, v := range EVENT_TABLE { + channel := v[2].(string) + if raw == channel { + evt = v[0].(Event) + break + } + + regexp := regexp.MustCompile(`^(.*)([1-9][0-9]?[\w])$`) + //regexp := regexp.MustCompile(`^http://www.flysnow.org/([\d]{4})/([\d]{2})/([\d]{2})/([\w-]+).html$`) + + substr := regexp.FindStringSubmatch(raw) + //fmt.Println(substr) + if len(substr) >= 2 { + if substr[1] == channel { + evt = v[0].(Event) + break + } + } + } + + return evt +} + +// 时间维度 +type Period string + +const ( + // 年 + PERIOD_1YEAR Period = "1Y" + + // 月 + PERIOD_6Mon Period = "6M" + PERIOD_3Mon Period = "3M" + PERIOD_1Mon Period = "1M" + + // 周 + PERIOD_1WEEK Period = "1W" + + // 天 + PERIOD_5DAY Period = "5D" + PERIOD_3DAY Period = "3D" + PERIOD_2DAY Period = "2D" + PERIOD_1DAY Period = "1D" + + // 小时 + PERIOD_12HOUR Period = "12H" + PERIOD_6HOUR Period = "6H" + PERIOD_4HOUR Period = "4H" + PERIOD_2HOUR Period = "2H" + PERIOD_1HOUR Period = "1H" + + // 分钟 + PERIOD_30MIN Period = "30m" + PERIOD_15MIN Period = "15m" + PERIOD_5MIN Period = "5m" + PERIOD_3MIN Period = "3m" + PERIOD_1MIN Period = "1m" + + // 缺省 + PERIOD_NONE Period = "" +) + +// 深度枚举 +const ( + DEPTH_SNAPSHOT = "snapshot" + DEPTH_UPDATE = "update" +) diff --git a/ws/wImpl/contants_test.go b/ws/wImpl/contants_test.go new file mode 100644 index 0000000..b4186ba --- /dev/null +++ b/ws/wImpl/contants_test.go @@ -0,0 +1,28 @@ +package wImpl + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetEventId(t *testing.T) { + + id1 := GetEventId("index-candle30m") + + assert.True(t, id1 == EVENT_BOOK_KLINE_INDEX) + + id2 := GetEventId("candle1Y") + + assert.True(t, id2 == EVENT_BOOK_KLINE) + + id3 := GetEventId("orders-algo") + assert.True(t, id3 == EVENT_BOOK_ALG_ORDER) + + id4 := GetEventId("balance_and_position") + assert.True(t, id4 == EVENT_BOOK_B_AND_P) + + id5 := GetEventId("index-candle1m") + assert.True(t, id5 == EVENT_BOOK_KLINE_INDEX) + +} diff --git a/ws/wInterface/IParam.go b/ws/wInterface/IParam.go new file mode 100644 index 0000000..72f10e1 --- /dev/null +++ b/ws/wInterface/IParam.go @@ -0,0 +1,9 @@ +package wInterface + +import . "v5sdk_go/ws/wImpl" + +// 请求数据 +type WSParam interface { + EventType() Event + ToMap() *map[string]string +} diff --git a/ws/wInterface/IReqData.go b/ws/wInterface/IReqData.go new file mode 100644 index 0000000..3bd299e --- /dev/null +++ b/ws/wInterface/IReqData.go @@ -0,0 +1,8 @@ +package wInterface + +// 请求数据 +type WSReqData interface { + GetType() int + Len() int + ToString() string +} diff --git a/ws/wInterface/IRspData.go b/ws/wInterface/IRspData.go new file mode 100644 index 0000000..5282d71 --- /dev/null +++ b/ws/wInterface/IRspData.go @@ -0,0 +1,6 @@ +package wInterface + +// 返回数据 +type WSRspData interface { + MsgType() int +} diff --git a/ws/ws_AddBookedDataHook_test.go b/ws/ws_AddBookedDataHook_test.go new file mode 100644 index 0000000..70ff328 --- /dev/null +++ b/ws/ws_AddBookedDataHook_test.go @@ -0,0 +1,147 @@ +package ws + +// HOW TO RUN +// go test ws_cli.go ws_op.go ws_contants.go utils.go ws_pub_channel.go ws_pub_channel_test.go ws_priv_channel.go ws_priv_channel_test.go ws_jrpc.go ws_jrpc_test.go ws_test_AddBookedDataHook.go -v +import ( + "fmt" + "log" + "testing" + "time" + . "v5sdk_go/ws/wImpl" +) + +const ( + TRADE_ACCOUNT = iota + ISOLATE_ACCOUNT + CROSS_ACCOUNT + CROSS_ACCOUNT_B +) + +func init() { + log.SetFlags(log.LstdFlags | log.Llongfile) + +} + +func prework() *WsClient { + ep := "wss://wsaws.okex.com:8443/ws/v5/private" + + r, err := NewWsClient(ep) + if err != nil { + log.Fatal(err) + } + + err = r.Start() + if err != nil { + log.Fatal(err, ep) + } + return r +} +func prework_pri(t int) *WsClient { + // 模拟环境 + ep := "wss://wsaws.okex.com:8443/ws/v5/private" + var apikey, passphrase, secretKey string + // 把账号密码写这里 + switch t { + case TRADE_ACCOUNT: + apikey = "fe468418-5e40-433f-8d04-04951286d417" + passphrase = "M4pw71Id" + secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D" + case ISOLATE_ACCOUNT: + apikey = "fe468418-5e40-433f-8d04-04951286d417" + passphrase = "M4pw71Id" + secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D" + case CROSS_ACCOUNT: + apikey = "fe468418-5e40-433f-8d04-04951286d417" + passphrase = "M4pw71Id" + secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D" + case CROSS_ACCOUNT_B: + apikey = "fe468418-5e40-433f-8d04-04951286d417" + passphrase = "M4pw71Id" + secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D" + } + + r, err := NewWsClient(ep) + if err != nil { + log.Fatal(err) + } + + err = r.Start() + if err != nil { + log.Fatal(err) + } + + var res bool + start := time.Now() + res, _, err = r.Login(apikey, secretKey, passphrase) + if res { + usedTime := time.Since(start) + fmt.Println("登录成功!", usedTime.String()) + } else { + log.Fatal("登录失败!", err) + } + fmt.Println(apikey, secretKey, passphrase) + return r +} + +func TestAddBookedDataHook(t *testing.T) { + var r *WsClient + + /*订阅私有频道*/ + { + r = prework_pri(CROSS_ACCOUNT) + var res bool + var err error + + r.AddBookMsgHook(func(ts time.Time, data MsgData) error { + // 添加你的方法 + fmt.Println("这是自定义AddBookMsgHook") + fmt.Println("当前数据是", data) + return nil + }) + + param := map[string]string{} + param["channel"] = "account" + param["ccy"] = "BTC" + + res, _, err = r.Subscribe(param) + if res { + fmt.Println("订阅成功!") + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + time.Sleep(100 * time.Second) + } + + //订阅公共频道 + { + r = prework() + var res bool + var err error + + // r.AddBookMsgHook(func(ts time.Time, data MsgData) error { + // 添加你的方法 + // fmt.Println("这是公共自定义AddBookMsgHook") + // fmt.Println("当前数据是", data) + // return nil + // }) + + param := map[string]string{} + param["channel"] = "instruments" + param["instType"] = "FUTURES" + + res, _, err = r.Subscribe(param) + if res { + fmt.Println("订阅成功!") + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + select {} + } + +} diff --git a/ws/ws_cli.go b/ws/ws_cli.go new file mode 100644 index 0000000..0ed1553 --- /dev/null +++ b/ws/ws_cli.go @@ -0,0 +1,725 @@ +package ws + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log" + "regexp" + "runtime/debug" + "sync" + "time" + . "v5sdk_go/config" + . "v5sdk_go/utils" + . "v5sdk_go/ws/wImpl" + + "github.com/gorilla/websocket" +) + +// 全局回调函数 +type ReceivedDataCallback func(*Msg) error + +// 普通订阅推送数据回调函数 +type ReceivedMsgDataCallback func(time.Time, MsgData) error + +// 深度订阅推送数据回调函数 +type ReceivedDepthDataCallback func(time.Time, DepthData) error + +// websocket +type WsClient struct { + WsEndPoint string + WsApi *ApiInfo + conn *websocket.Conn + sendCh chan string //发消息队列 + resCh chan *Msg //收消息队列 + + errCh chan *Msg + regCh map[Event]chan *Msg //请求响应队列 + + quitCh chan struct{} + lock sync.RWMutex + + onMessageHook ReceivedDataCallback //全局消息回调函数 + onBookMsgHook ReceivedMsgDataCallback //普通订阅消息回调函数 + onDepthHook ReceivedDepthDataCallback //深度订阅消息回调函数 + OnErrorHook ReceivedDataCallback //错误处理回调函数 + + // 记录深度信息 + DepthDataList map[string]DepthDetail + autoDepthMgr bool // 深度数据管理(checksum等) + DepthDataLock sync.RWMutex + + isStarted bool //防止重复启动和关闭 + dailTimeout time.Duration +} + +/* + 服务端响应详细信息 + Timestamp: 接受到消息的时间 + Info: 接受到的消息字符串 +*/ +type Msg struct { + Timestamp time.Time `json:"timestamp"` + Info interface{} `json:"info"` +} + +func (this *Msg) Print() { + fmt.Println("【消息时间】", this.Timestamp.Format("2006-01-02 15:04:05.000")) + str, _ := json.Marshal(this.Info) + fmt.Println("【消息内容】", string(str)) +} + +/* + 订阅结果封装后的消息结构体 +*/ +type ProcessDetail struct { + EndPoint string `json:"endPoint"` + ReqInfo string `json:"ReqInfo"` //订阅请求 + SendTime time.Time `json:"sendTime"` //发送订阅请求的时间 + RecvTime time.Time `json:"recvTime"` //接受到订阅结果的时间 + UsedTime time.Duration `json:"UsedTime"` //耗时 + Data []*Msg `json:"data"` //订阅结果数据 +} + +func (p *ProcessDetail) String() string { + data, _ := json.Marshal(p) + return string(data) +} + +// 创建ws对象 +func NewWsClient(ep string) (r *WsClient, err error) { + if ep == "" { + err = errors.New("websocket endpoint cannot be null") + return + } + + r = &WsClient{ + WsEndPoint: ep, + sendCh: make(chan string), + resCh: make(chan *Msg), + errCh: make(chan *Msg), + regCh: make(map[Event]chan *Msg), + //cbs: make(map[Event]ReceivedDataCallback), + quitCh: make(chan struct{}), + DepthDataList: make(map[string]DepthDetail), + dailTimeout: time.Second * 10, + // 自动深度校验默认开启 + autoDepthMgr: true, + } + + return +} + +/* + 新增记录深度信息 +*/ +func (a *WsClient) addDepthDataList(key string, dd DepthDetail) error { + a.DepthDataLock.Lock() + defer a.DepthDataLock.Unlock() + a.DepthDataList[key] = dd + return nil +} + +/* + 更新记录深度信息(如果没有记录不会更新成功) +*/ +func (a *WsClient) updateDepthDataList(key string, dd DepthDetail) error { + a.DepthDataLock.Lock() + defer a.DepthDataLock.Unlock() + if _, ok := a.DepthDataList[key]; !ok { + return errors.New("更新失败!未发现记录" + key) + } + + a.DepthDataList[key] = dd + return nil +} + +/* + 删除记录深度信息 +*/ +func (a *WsClient) deleteDepthDataList(key string) error { + a.DepthDataLock.Lock() + defer a.DepthDataLock.Unlock() + delete(a.DepthDataList, key) + return nil +} + +/* + 设置是否自动深度管理,开启 true,关闭 false +*/ +func (a *WsClient) EnableAutoDepthMgr(b bool) error { + a.DepthDataLock.Lock() + defer a.DepthDataLock.Unlock() + + if len(a.DepthDataList) != 0 { + err := errors.New("当前有深度数据处于订阅中") + return err + } + + a.autoDepthMgr = b + return nil +} + +/* + 获取当前的深度快照信息(合并后的) +*/ +func (a *WsClient) GetSnapshotByChannel(data DepthData) (snapshot *DepthDetail, err error) { + key, err := json.Marshal(data.Arg) + if err != nil { + return + } + a.DepthDataLock.Lock() + defer a.DepthDataLock.Unlock() + val, ok := a.DepthDataList[string(key)] + if !ok { + return + } + snapshot = new(DepthDetail) + raw, err := json.Marshal(val) + if err != nil { + return + } + err = json.Unmarshal(raw, &snapshot) + if err != nil { + return + } + return +} + +// 设置dial超时时间 +func (a *WsClient) SetDailTimeout(tm time.Duration) { + a.dailTimeout = tm +} + +// 非阻塞启动 +func (a *WsClient) Start() error { + a.lock.RLock() + if a.isStarted { + a.lock.RUnlock() + fmt.Println("ws已经启动") + return nil + } else { + a.lock.RUnlock() + a.lock.Lock() + defer a.lock.Unlock() + // 增加超时处理 + done := make(chan struct{}) + ctx, cancel := context.WithTimeout(context.Background(), a.dailTimeout) + defer cancel() + go func() { + defer func() { + close(done) + }() + var c *websocket.Conn + fmt.Println("a.WsEndPoint: ", a.WsEndPoint) + c, _, err := websocket.DefaultDialer.Dial(a.WsEndPoint, nil) + if err != nil { + err = errors.New("dial error:" + err.Error()) + return + } + a.conn = c + + }() + select { + case <-ctx.Done(): + err := errors.New("连接超时退出!") + return err + case <-done: + + } + //TODO 自定义的推送消息回调回来试试放在这里 + go a.receive() + go a.work() + a.isStarted = true + log.Println("客户端已启动!", a.WsEndPoint) + return nil + } +} + +// 客户端退出消息channel +func (a *WsClient) IsQuit() <-chan struct{} { + return a.quitCh +} + +func (a *WsClient) work() { + defer func() { + a.Stop() + err := recover() + if err != nil { + log.Printf("work End. Recover msg: %+v", a) + debug.PrintStack() + } + + }() + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: // 保持心跳 + // go a.Ping(1000) + go func() { + _, _, err := a.Ping(1000) + if err != nil { + fmt.Println("心跳检测失败!", err) + a.Stop() + return + } + + }() + + case <-a.quitCh: // 保持心跳 + return + case data, ok := <-a.resCh: //接收到服务端发来的消息 + if !ok { + return + } + //log.Println("接收到来自resCh的消息:", data) + if a.onMessageHook != nil { + err := a.onMessageHook(data) + if err != nil { + log.Println("执行onMessageHook函数错误!", err) + } + } + case errMsg, ok := <-a.errCh: //错误处理 + if !ok { + return + } + if a.OnErrorHook != nil { + err := a.OnErrorHook(errMsg) + if err != nil { + log.Println("执行OnErrorHook函数错误!", err) + } + } + case req, ok := <-a.sendCh: //从发送队列中取出数据发送到服务端 + if !ok { + return + } + //log.Println("接收到来自req的消息:", req) + err := a.conn.WriteMessage(websocket.TextMessage, []byte(req)) + if err != nil { + log.Printf("发送请求失败: %s\n", err) + return + } + log.Printf("[发送请求] %v\n", req) + } + } + +} + +/* + 处理接受到的消息 +*/ +func (a *WsClient) receive() { + defer func() { + a.Stop() + err := recover() + if err != nil { + log.Printf("Receive End. Recover msg: %+v", a) + debug.PrintStack() + } + + }() + + for { + messageType, message, err := a.conn.ReadMessage() + if err != nil { + if a.isStarted { + log.Println("receive message error!" + err.Error()) + } + + break + } + + txtMsg := message + switch messageType { + case websocket.TextMessage: + case websocket.BinaryMessage: + txtMsg, err = GzipDecode(message) + if err != nil { + log.Println("解压失败!") + continue + } + } + + log.Println("[收到消息]", string(txtMsg)) + + //发送结果到默认消息处理通道 + + timestamp := time.Now() + msg := &Msg{Timestamp: timestamp, Info: string(txtMsg)} + + a.resCh <- msg + + evt, data, err := a.parseMessage(txtMsg) + if err != nil { + log.Println("解析消息失败!", err) + continue + } + + //log.Println("解析消息成功!消息类型 =", evt) + + a.lock.RLock() + ch, ok := a.regCh[evt] + a.lock.RUnlock() + if !ok { + //只有推送消息才会主动创建通道和消费队列 + if evt == EVENT_BOOKED_DATA || evt == EVENT_DEPTH_DATA { + //log.Println("channel不存在!event:", evt) + //a.lock.RUnlock() + a.lock.Lock() + a.regCh[evt] = make(chan *Msg) + ch = a.regCh[evt] + a.lock.Unlock() + + //log.Println("创建", evt, "通道") + + // 创建消费队列 + go func(evt Event) { + //log.Println("创建goroutine evt:", evt) + + for msg := range a.regCh[evt] { + //log.Println(msg) + // msg.Print() + switch evt { + // 处理普通推送数据 + case EVENT_BOOKED_DATA: + fn := a.onBookMsgHook + if fn != nil { + err = fn(msg.Timestamp, msg.Info.(MsgData)) + if err != nil { + log.Println("订阅数据回调函数执行失败!", err) + } + //log.Println("函数执行成功!", err) + } + // 处理深度推送数据 + case EVENT_DEPTH_DATA: + fn := a.onDepthHook + + depData := msg.Info.(DepthData) + + // 开启深度数据管理功能的,会合并深度数据 + if a.autoDepthMgr { + a.MergeDepth(depData) + } + + // 运行用户定义回调函数 + if fn != nil { + err = fn(msg.Timestamp, msg.Info.(DepthData)) + if err != nil { + log.Println("深度回调函数执行失败!", err) + } + + } + } + + } + //log.Println("退出goroutine evt:", evt) + }(evt) + + //continue + } else { + //log.Println("程序异常!通道已关闭", evt) + continue + } + + } + + //log.Println(evt,"事件已注册",ch) + + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, time.Millisecond*1000) + select { + /* + 丢弃消息容易引发数据处理处理错误 + */ + // case <-ctx.Done(): + // log.Println("等待超时,消息丢弃 - ", data) + case ch <- &Msg{Timestamp: timestamp, Info: data}: + } + cancel() + } +} + +/* + 开启了深度数据管理功能后,系统会自动合并深度信息 +*/ +func (a *WsClient) MergeDepth(depData DepthData) (err error) { + if !a.autoDepthMgr { + return + } + + key, err := json.Marshal(depData.Arg) + if err != nil { + err = errors.New("数据错误") + return + } + + // books5 不需要做checksum + if depData.Arg["channel"] == "books5" { + a.addDepthDataList(string(key), depData.Data[0]) + return + } + + if depData.Action == "snapshot" { + + _, err = depData.CheckSum(nil) + if err != nil { + log.Println("校验失败", err) + return + } + + a.addDepthDataList(string(key), depData.Data[0]) + + } else { + + var newSnapshot *DepthDetail + a.DepthDataLock.RLock() + oldSnapshot, ok := a.DepthDataList[string(key)] + if !ok { + log.Println("深度数据错误,全量数据未发现!") + err = errors.New("数据错误") + return + } + a.DepthDataLock.RUnlock() + newSnapshot, err = depData.CheckSum(&oldSnapshot) + if err != nil { + log.Println("深度校验失败", err) + err = errors.New("校验失败") + return + } + + a.updateDepthDataList(string(key), *newSnapshot) + } + return +} + +/* + 通过ErrorCode判断事件类型 +*/ +func GetInfoFromErrCode(data ErrData) Event { + switch data.Code { + case "60001": + return EVENT_LOGIN + case "60002": + return EVENT_LOGIN + case "60003": + return EVENT_LOGIN + case "60004": + return EVENT_LOGIN + case "60005": + return EVENT_LOGIN + case "60006": + return EVENT_LOGIN + case "60007": + return EVENT_LOGIN + case "60008": + return EVENT_LOGIN + case "60009": + return EVENT_LOGIN + case "60010": + return EVENT_LOGIN + case "60011": + return EVENT_LOGIN + } + + return EVENT_UNKNOWN +} + +/* + 从error返回中解析出对应的channel + error信息样例 + {"event":"error","msg":"channel:index-tickers,instId:BTC-USDT1 doesn't exist","code":"60018"} +*/ +func GetInfoFromErrMsg(raw string) (channel string) { + reg := regexp.MustCompile(`channel:(.*?),`) + if reg == nil { + fmt.Println("MustCompile err") + return + } + //提取关键信息 + result := reg.FindAllStringSubmatch(raw, -1) + for _, text := range result { + channel = text[1] + } + return +} + +/* + 解析消息类型 +*/ +func (a *WsClient) parseMessage(raw []byte) (evt Event, data interface{}, err error) { + evt = EVENT_UNKNOWN + //log.Println("解析消息") + //log.Println("消息内容:", string(raw)) + if string(raw) == "pong" { + evt = EVENT_PING + data = raw + return + } + //log.Println(0, evt) + var rspData = RspData{} + err = json.Unmarshal(raw, &rspData) + if err == nil { + op := rspData.Event + if op == OP_SUBSCRIBE || op == OP_UNSUBSCRIBE { + channel := rspData.Arg["channel"] + evt = GetEventId(channel) + data = rspData + return + } + } + + //log.Println("ErrData") + var errData = ErrData{} + err = json.Unmarshal(raw, &errData) + if err == nil { + op := errData.Event + switch op { + case OP_LOGIN: + evt = EVENT_LOGIN + data = errData + //log.Println(3, evt) + return + case OP_ERROR: + data = errData + // TODO:细化报错对应的事件判断 + + //尝试从msg字段中解析对应的事件类型 + evt = GetInfoFromErrCode(errData) + if evt != EVENT_UNKNOWN { + return + } + evt = GetEventId(GetInfoFromErrMsg(errData.Msg)) + if evt == EVENT_UNKNOWN { + evt = EVENT_ERROR + return + } + return + } + //log.Println(5, evt) + } + + //log.Println("JRPCRsp") + var jRPCRsp = JRPCRsp{} + err = json.Unmarshal(raw, &jRPCRsp) + if err == nil { + data = jRPCRsp + evt = GetEventId(jRPCRsp.Op) + if evt != EVENT_UNKNOWN { + return + } + } + + var depthData = DepthData{} + err = json.Unmarshal(raw, &depthData) + if err == nil { + evt = EVENT_DEPTH_DATA + data = depthData + //log.Println("-->>EVENT_DEPTH_DATA", evt) + //log.Println(evt, data) + //log.Println(6) + switch depthData.Arg["channel"] { + case "books": + return + case "books-l2-tbt": + return + case "books50-l2-tbt": + return + case "books5": + return + default: + + } + } + + //log.Println("MsgData") + var msgData = MsgData{} + err = json.Unmarshal(raw, &msgData) + if err == nil { + evt = EVENT_BOOKED_DATA + data = msgData + //log.Println("-->>EVENT_BOOK_DATA", evt) + //log.Println(evt, data) + //log.Println(6) + return + } + + evt = EVENT_UNKNOWN + err = errors.New("message unknown") + return +} + +func (a *WsClient) Stop() error { + + a.lock.Lock() + defer a.lock.Unlock() + if !a.isStarted { + return nil + } + + a.isStarted = false + + if a.conn != nil { + a.conn.Close() + } + close(a.errCh) + close(a.sendCh) + close(a.resCh) + close(a.quitCh) + + for _, ch := range a.regCh { + close(ch) + } + + log.Println("ws客户端退出!") + return nil +} + +/* + 添加全局消息处理的回调函数 +*/ +func (a *WsClient) AddMessageHook(fn ReceivedDataCallback) error { + a.onMessageHook = fn + return nil +} + +/* + 添加订阅消息处理的回调函数 +*/ +func (a *WsClient) AddBookMsgHook(fn ReceivedMsgDataCallback) error { + a.onBookMsgHook = fn + return nil +} + +/* + 添加深度消息处理的回调函数 + 例如: + cli.AddDepthHook(func(ts time.Time, data DepthData) error { return nil }) +*/ +func (a *WsClient) AddDepthHook(fn ReceivedDepthDataCallback) error { + a.onDepthHook = fn + return nil +} + +/* + 添加错误类型消息处理的回调函数 +*/ +func (a *WsClient) AddErrMsgHook(fn ReceivedDataCallback) error { + a.OnErrorHook = fn + return nil +} + +/* + 判断连接是否存活 +*/ +func (a *WsClient) IsAlive() bool { + res := false + if a.conn == nil { + return res + } + res, _, _ = a.Ping(500) + return res +} diff --git a/ws/ws_contants.go b/ws/ws_contants.go new file mode 100644 index 0000000..69a5dfe --- /dev/null +++ b/ws/ws_contants.go @@ -0,0 +1,18 @@ +package ws + +//操作符 +const ( + OP_LOGIN = "login" + OP_ERROR = "error" + OP_SUBSCRIBE = "subscribe" + OP_UNSUBSCRIBE = "unsubscribe" +) + +// instrument Type +const ( + SPOT = "SPOT" + SWAP = "SWAP" + FUTURES = "FUTURES" + OPTION = "OPTION" + ANY = "ANY" +) diff --git a/ws/ws_jrpc.go b/ws/ws_jrpc.go new file mode 100644 index 0000000..b27480e --- /dev/null +++ b/ws/ws_jrpc.go @@ -0,0 +1,157 @@ +package ws + +import ( + "context" + "log" + "time" + . "v5sdk_go/ws/wImpl" +) + +/* + websocket交易 通用请求 + 参数说明: + evtId:封装的事件类型 + id: 请求ID + op: 请求参数op + params: 请求参数 + timeOut: 超时时间 +*/ +func (a *WsClient) jrpcReq(evtId Event, op string, id string, params []map[string]interface{}, timeOut ...int) (res bool, detail *ProcessDetail, err error) { + res = true + tm := 5000 + if len(timeOut) != 0 { + tm = timeOut[0] + } + + req := &JRPCReq{ + Id: id, + Op: op, + Args: params, + } + + detail = &ProcessDetail{ + EndPoint: a.WsEndPoint, + } + + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, time.Duration(tm)*time.Millisecond) + defer cancel() + ctx = context.WithValue(ctx, "detail", detail) + + msg, err := a.process(ctx, evtId, req) + if err != nil { + res = false + log.Println("处理请求失败!", req, err) + return + } + detail.Data = msg + + res, err = checkResult(req, msg) + if err != nil { + res = false + return + } + + return +} + +/* + 单个下单 + 参数说明: + id: 请求ID + params: 请求参数 + timeOut: 超时时间 +*/ +func (a *WsClient) PlaceOrder(id string, param map[string]interface{}, timeOut ...int) (res bool, detail *ProcessDetail, err error) { + op := "order" + evtId := EVENT_PLACE_ORDER + + var args []map[string]interface{} + args = append(args, param) + + return a.jrpcReq(evtId, op, id, args, timeOut...) + +} + +/* + 批量下单 + 参数说明: + id: 请求ID + params: 请求参数 + timeOut: 超时时间 +*/ +func (a *WsClient) BatchPlaceOrders(id string, params []map[string]interface{}, timeOut ...int) (res bool, detail *ProcessDetail, err error) { + + op := "batch-orders" + evtId := EVENT_PLACE_BATCH_ORDERS + return a.jrpcReq(evtId, op, id, params, timeOut...) + +} + +/* + 单个撤单 + 参数说明: + id: 请求ID + params: 请求参数 + timeOut: 超时时间 +*/ +func (a *WsClient) CancelOrder(id string, param map[string]interface{}, timeOut ...int) (res bool, detail *ProcessDetail, err error) { + + op := "cancel-order" + evtId := EVENT_CANCEL_ORDER + + var args []map[string]interface{} + args = append(args, param) + + return a.jrpcReq(evtId, op, id, args, timeOut...) + +} + +/* + 批量撤单 + 参数说明: + id: 请求ID + params: 请求参数 + timeOut: 超时时间 +*/ +func (a *WsClient) BatchCancelOrders(id string, params []map[string]interface{}, timeOut ...int) (res bool, detail *ProcessDetail, err error) { + + op := "batch-cancel-orders" + evtId := EVENT_CANCEL_BATCH_ORDERS + return a.jrpcReq(evtId, op, id, params, timeOut...) + +} + +/* + 单个改单 + 参数说明: + id: 请求ID + params: 请求参数 + timeOut: 超时时间 +*/ +func (a *WsClient) AmendOrder(id string, param map[string]interface{}, timeOut ...int) (res bool, detail *ProcessDetail, err error) { + + op := "amend-order" + evtId := EVENT_AMEND_ORDER + + var args []map[string]interface{} + args = append(args, param) + + return a.jrpcReq(evtId, op, id, args, timeOut...) + +} + +/* + 批量改单 + 参数说明: + id: 请求ID + params: 请求参数 + timeOut: 超时时间 +*/ +func (a *WsClient) BatchAmendOrders(id string, params []map[string]interface{}, timeOut ...int) (res bool, detail *ProcessDetail, err error) { + + op := "batch-amend-orders" + evtId := EVENT_AMEND_BATCH_ORDERS + return a.jrpcReq(evtId, op, id, params, timeOut...) + +} diff --git a/ws/ws_jrpc_test.go b/ws/ws_jrpc_test.go new file mode 100644 index 0000000..502e7e1 --- /dev/null +++ b/ws/ws_jrpc_test.go @@ -0,0 +1,186 @@ +package ws + +import ( + "fmt" + "testing" + "time" + . "v5sdk_go/ws/wImpl" +) + +func PrintDetail(d *ProcessDetail) { + fmt.Println("[详细信息]") + fmt.Println("请求地址:", d.EndPoint) + fmt.Println("请求内容:", d.ReqInfo) + fmt.Println("发送时间:", d.SendTime.Format("2006-01-02 15:04:05.000")) + fmt.Println("响应时间:", d.RecvTime.Format("2006-01-02 15:04:05.000")) + fmt.Println("耗时:", d.UsedTime.String()) + fmt.Printf("接受到 %v 条消息:\n", len(d.Data)) + for _, v := range d.Data { + fmt.Printf("[%v] %v\n", v.Timestamp.Format("2006-01-02 15:04:05.000"), v.Info) + } +} + +func (r *WsClient) makeOrder(instId string, tdMode string, side string, ordType string, px string, sz string) (orderId string, err error) { + + var res bool + var data *ProcessDetail + + param := map[string]interface{}{} + param["instId"] = instId + param["tdMode"] = tdMode + param["side"] = side + param["ordType"] = ordType + if px != "" { + param["px"] = px + } + param["sz"] = sz + + res, data, err = r.PlaceOrder("0011", param) + if err != nil { + return + } + if res && len(data.Data) == 1 { + rsp := data.Data[0].Info.(JRPCRsp) + if len(rsp.Data) == 1 { + val, ok := rsp.Data[0]["ordId"] + if !ok { + return + } + orderId = val.(string) + return + } + } + + return +} + +/* + 单个下单 +*/ +// func TestPlaceOrder(t *testing.T) { +// r := prework_pri(CROSS_ACCOUNT) +// r := prework_pri(TRADE_ACCOUNT) +// var res bool +// var err error +// var data *ProcessDetail +// +// start := time.Now() +// param := map[string]interface{}{} +// param["instId"] = "BTC-USDT" +// param["tdMode"] = "cash" +// param["side"] = "buy" +// param["ordType"] = "market" +// param["px"] = "1" +// param["sz"] = "200" +// +// res, data, err = r.PlaceOrder("0011", param) +// if res { +// usedTime := time.Since(start) +// fmt.Println("下单成功!", usedTime.String()) +// PrintDetail(data) +// } else { +// usedTime := time.Since(start) +// fmt.Println("下单失败!", usedTime.String(), err) +// } +// +// } + +/* + 批量下单 +*/ +// func TestPlaceBatchOrder(t *testing.T) { +// r := prework_pri(CROSS_ACCOUNT) +// var res bool +// var err error +// var data *ProcessDetail +// +// start := time.Now() +// var params []map[string]interface{} +// param := map[string]interface{}{} +// param["instId"] = "BTC-USDT" +// param["tdMode"] = "cash" +// param["side"] = "sell" +// param["ordType"] = "market" +// param["sz"] = "0.001" +// params = append(params, param) +// param = map[string]interface{}{} +// param["instId"] = "BTC-USDT" +// param["tdMode"] = "cash" +// param["side"] = "buy" +// param["ordType"] = "market" +// param["sz"] = "100" +// params = append(params, param) +// res, data, err = r.BatchPlaceOrders("001", params) +// usedTime := time.Since(start) +// if err != nil { +// fmt.Println("下单失败!", err, usedTime.String()) +// t.Fail() +// } +// if res { +// fmt.Println("下单成功!", usedTime.String()) +// PrintDetail(data) +// } else { +// +// fmt.Println("下单失败!", usedTime.String()) +// t.Fail() +// } +// +// } + +/* + 撤销订单 +*/ +// func TestCancelOrder(t *testing.T) { +// r := prework_pri(CROSS_ACCOUNT) +// +// 用户自定义limit限价价格 +// ordId, _ := r.makeOrder("BTC-USDT", "cash", "sell", "limit", "57000", "0.01") +// if ordId == "" { +// t.Fatal() +// } +// +// t.Log("生成挂单:orderId=", ordId) +// +// param := map[string]interface{}{} +// param["instId"] = "BTC-USDT" +// param["ordId"] = ordId +// start := time.Now() +// res, _, _ := r.CancelOrder("1", param) +// if res { +// usedTime := time.Since(start) +// fmt.Println("撤单成功!", usedTime.String()) +// } else { +// t.Fatal("撤单失败!") +// } +// } + +/* + 修改订单 +*/ +func TestAmendlOrder(t *testing.T) { + r := prework_pri(CROSS_ACCOUNT) + + // 用户自定义limit限价价格 + ordId, _ := r.makeOrder("BTC-USDT", "cash", "sell", "limit", "57000", "0.01") + if ordId == "" { + t.Fatal() + } + + t.Log("生成挂单:orderId=", ordId) + + param := map[string]interface{}{} + param["instId"] = "BTC-USDT" + param["ordId"] = ordId + // 调整修改订单的参数 + //param["newSz"] = "0.02" + param["newPx"] = "57001" + + start := time.Now() + res, _, _ := r.AmendOrder("1", param) + if res { + usedTime := time.Since(start) + fmt.Println("修改订单成功!", usedTime.String()) + } else { + t.Fatal("修改订单失败!") + } +} diff --git a/ws/ws_middleware.go b/ws/ws_middleware.go new file mode 100644 index 0000000..800ea02 --- /dev/null +++ b/ws/ws_middleware.go @@ -0,0 +1,19 @@ +package ws + +import "fmt" + +type ReqFunc func(...interface{}) (res bool, msg *Msg, err error) +type Decorator func(ReqFunc) ReqFunc + +func handler(h ReqFunc, decors ...Decorator) ReqFunc { + for i := range decors { + d := decors[len(decors)-1-i] + h = d(h) + } + return h +} + +func preprocess() (res bool, msg *Msg, err error) { + fmt.Println("preprocess") + return +} diff --git a/ws/ws_op.go b/ws/ws_op.go new file mode 100644 index 0000000..f3522f4 --- /dev/null +++ b/ws/ws_op.go @@ -0,0 +1,532 @@ +package ws + +import ( + "context" + "errors" + "log" + "sync" + "time" + . "v5sdk_go/config" + "v5sdk_go/rest" + . "v5sdk_go/utils" + . "v5sdk_go/ws/wImpl" + . "v5sdk_go/ws/wInterface" +) + +/* + Ping服务端保持心跳。 + timeOut:超时时间(毫秒),如果不填默认为5000ms +*/ +func (a *WsClient) Ping(timeOut ...int) (res bool, detail *ProcessDetail, err error) { + tm := 5000 + if len(timeOut) != 0 { + tm = timeOut[0] + } + res = true + + detail = &ProcessDetail{ + EndPoint: a.WsEndPoint, + } + + ctx := context.Background() + ctx, _ = context.WithTimeout(ctx, time.Duration(tm)*time.Millisecond) + ctx = context.WithValue(ctx, "detail", detail) + msg, err := a.process(ctx, EVENT_PING, nil) + if err != nil { + res = false + log.Println("处理请求失败!", err) + return + } + detail.Data = msg + + if len(msg) == 0 { + res = false + return + } + + str := string(msg[0].Info.([]byte)) + if str != "pong" { + res = false + return + } + + return +} + +/* + 登录私有频道 +*/ +func (a *WsClient) Login(apiKey, secKey, passPhrase string, timeOut ...int) (res bool, detail *ProcessDetail, err error) { + + if apiKey == "" { + err = errors.New("ApiKey cannot be null") + return + } + + if secKey == "" { + err = errors.New("SecretKey cannot be null") + return + } + + if passPhrase == "" { + err = errors.New("Passphrase cannot be null") + return + } + + a.WsApi = &ApiInfo{ + ApiKey: apiKey, + SecretKey: secKey, + Passphrase: passPhrase, + } + + tm := 5000 + if len(timeOut) != 0 { + tm = timeOut[0] + } + res = true + + timestamp := EpochTime() + + preHash := PreHashString(timestamp, rest.GET, "/users/self/verify", "") + //fmt.Println("preHash:", preHash) + var sign string + if sign, err = HmacSha256Base64Signer(preHash, secKey); err != nil { + log.Println("处理签名失败!", err) + return + } + + args := map[string]string{} + args["apiKey"] = apiKey + args["passphrase"] = passPhrase + args["timestamp"] = timestamp + args["sign"] = sign + req := &ReqData{ + Op: OP_LOGIN, + Args: []map[string]string{args}, + } + + detail = &ProcessDetail{ + EndPoint: a.WsEndPoint, + } + + ctx := context.Background() + ctx, _ = context.WithTimeout(ctx, time.Duration(tm)*time.Millisecond) + ctx = context.WithValue(ctx, "detail", detail) + + msg, err := a.process(ctx, EVENT_LOGIN, req) + if err != nil { + res = false + log.Println("处理请求失败!", req, err) + return + } + detail.Data = msg + + if len(msg) == 0 { + res = false + return + } + + info, _ := msg[0].Info.(ErrData) + + if info.Code == "0" && info.Event == OP_LOGIN { + log.Println("登录成功!") + } else { + log.Println("登录失败!") + res = false + return + } + + return +} + +/* + 等待结果响应 +*/ +func (a *WsClient) waitForResult(e Event, timeOut int) (data interface{}, err error) { + + if _, ok := a.regCh[e]; !ok { + a.lock.Lock() + a.regCh[e] = make(chan *Msg) + a.lock.Unlock() + //log.Println("注册", e, "事件成功") + } + + a.lock.RLock() + defer a.lock.RUnlock() + ch := a.regCh[e] + //log.Println(e, "等待响应!") + select { + case <-time.After(time.Duration(timeOut) * time.Millisecond): + log.Println(e, "超时未响应!") + err = errors.New(e.String() + "超时未响应!") + return + case data = <-ch: + //log.Println(data) + } + + return +} + +/* + 发送消息到服务端 +*/ +func (a *WsClient) Send(ctx context.Context, op WSReqData) (err error) { + select { + case <-ctx.Done(): + log.Println("发生失败退出!") + err = errors.New("发送超时退出!") + case a.sendCh <- op.ToString(): + } + + return +} + +func (a *WsClient) process(ctx context.Context, e Event, op WSReqData) (data []*Msg, err error) { + defer func() { + _ = recover() + }() + + var detail *ProcessDetail + if val := ctx.Value("detail"); val != nil { + detail = val.(*ProcessDetail) + } else { + detail = &ProcessDetail{ + EndPoint: a.WsEndPoint, + } + } + defer func() { + //fmt.Println("处理完成,", e.String()) + detail.UsedTime = detail.RecvTime.Sub(detail.SendTime) + }() + + //查看事件是否被注册 + if _, ok := a.regCh[e]; !ok { + a.lock.Lock() + a.regCh[e] = make(chan *Msg) + a.lock.Unlock() + //log.Println("注册", e, "事件成功") + } else { + //log.Println("事件", e, "已注册!") + err = errors.New("事件" + e.String() + "尚未处理完毕") + return + } + + //预期请求响应的条数 + expectCnt := 1 + if op != nil { + expectCnt = op.Len() + } + recvCnt := 0 + + //等待完成通知 + wg := sync.WaitGroup{} + wg.Add(1) + // 这里要先定义go routine func(){} 是为了在里面订阅channel的内容, 我们知道一个队列要想往里塞东西,必先给他安排一个订阅它的协程 + go func(ctx context.Context) { + defer func() { + a.lock.Lock() + delete(a.regCh, e) + //log.Println("事件已注销!",e) + a.lock.Unlock() + wg.Done() + }() + + a.lock.RLock() + ch := a.regCh[e] //请求响应队列 + a.lock.RUnlock() + + //log.Println(e, "等待响应!") + done := false + ok := true + for { + var item *Msg + select { + case <-ctx.Done(): + log.Println(e, "超时未响应!") + err = errors.New(e.String() + "超时未响应!") + return + case item, ok = <-ch: + if !ok { + return + } + detail.RecvTime = time.Now() + //log.Println(e, "接受到数据", item) + // 这里只是把推送的数据显示出来,并没有做更近一步的处理,后续可以二次开发,在这个位置上进行处理 + data = append(data, item) + recvCnt++ + //log.Println(data) + if recvCnt == expectCnt { + done = true + break + } + } + if done { + break + } + } + if ok { + close(ch) + } + + }(ctx) + + // + switch e { + case EVENT_PING: + msg := "ping" + detail.ReqInfo = msg + a.sendCh <- msg + detail.SendTime = time.Now() + default: + detail.ReqInfo = op.ToString() + //这个时候ctx中已经提供了meta信息,用于发送ws请求 + err = a.Send(ctx, op) + if err != nil { + log.Println("发送[", e, "]消息失败!", err) + return + } + detail.SendTime = time.Now() + } + + wg.Wait() + return +} + +/* + 根据args请求参数判断请求类型 + 如:{"channel": "account","ccy": "BTC"} 类型为 EVENT_BOOK_ACCOUNT +*/ +func GetEventByParam(param map[string]string) (evtId Event) { + evtId = EVENT_UNKNOWN + channel, ok := param["channel"] + if !ok { + return + } + + evtId = GetEventId(channel) + return +} + +/* + 订阅频道。 + req:请求json字符串 +*/ +func (a *WsClient) Subscribe(param map[string]string, timeOut ...int) (res bool, detail *ProcessDetail, err error) { + res = true + tm := 5000 + if len(timeOut) != 0 { + tm = timeOut[0] + } + + evtid := GetEventByParam(param) + if evtid == EVENT_UNKNOWN { + err = errors.New("非法的请求参数!") + return + } + + var args []map[string]string + args = append(args, param) + + req := ReqData{ + Op: OP_SUBSCRIBE, + Args: args, + } + + detail = &ProcessDetail{ + EndPoint: a.WsEndPoint, + } + + ctx := context.Background() + ctx, _ = context.WithTimeout(ctx, time.Duration(tm)*time.Millisecond) + ctx = context.WithValue(ctx, "detail", detail) + + msg, err := a.process(ctx, evtid, req) + if err != nil { + res = false + log.Println("处理请求失败!", req, err) + return + } + detail.Data = msg + + //检查所有频道是否都更新成功 + res, err = checkResult(req, msg) + if err != nil { + res = false + return + } + + return +} + +/* + 取消订阅频道。 + req:请求json字符串 +*/ +func (a *WsClient) UnSubscribe(param map[string]string, timeOut ...int) (res bool, detail *ProcessDetail, err error) { + res = true + tm := 5000 + if len(timeOut) != 0 { + tm = timeOut[0] + } + + evtid := GetEventByParam(param) + if evtid == EVENT_UNKNOWN { + err = errors.New("非法的请求参数!") + return + } + + var args []map[string]string + args = append(args, param) + + req := ReqData{ + Op: OP_UNSUBSCRIBE, + Args: args, + } + + detail = &ProcessDetail{ + EndPoint: a.WsEndPoint, + } + + ctx := context.Background() + ctx, _ = context.WithTimeout(ctx, time.Duration(tm)*time.Millisecond) + ctx = context.WithValue(ctx, "detail", detail) + msg, err := a.process(ctx, evtid, req) + if err != nil { + res = false + log.Println("处理请求失败!", req, err) + return + } + detail.Data = msg + //检查所有频道是否都更新成功 + res, err = checkResult(req, msg) + if err != nil { + res = false + return + } + + return +} + +/* + jrpc请求 +*/ +func (a *WsClient) Jrpc(id, op string, params []map[string]interface{}, timeOut ...int) (res bool, detail *ProcessDetail, err error) { + res = true + tm := 5000 + if len(timeOut) != 0 { + tm = timeOut[0] + } + + evtid := GetEventId(op) + if evtid == EVENT_UNKNOWN { + err = errors.New("非法的请求参数!") + return + } + + req := JRPCReq{ + Id: id, + Op: op, + Args: params, + } + detail = &ProcessDetail{ + EndPoint: a.WsEndPoint, + } + + ctx := context.Background() + ctx, _ = context.WithTimeout(ctx, time.Duration(tm)*time.Millisecond) + ctx = context.WithValue(ctx, "detail", detail) + msg, err := a.process(ctx, evtid, req) + if err != nil { + res = false + log.Println("处理请求失败!", req, err) + return + } + detail.Data = msg + + //检查所有频道是否都更新成功 + res, err = checkResult(req, msg) + if err != nil { + res = false + return + } + + return +} + +func (a *WsClient) PubChannel(evtId Event, op string, params []map[string]string, pd Period, timeOut ...int) (res bool, msg []*Msg, err error) { + + // 参数校验 + pa, err := checkParams(evtId, params, pd) + if err != nil { + return + } + + res = true + tm := 5000 + if len(timeOut) != 0 { + tm = timeOut[0] + } + + req := ReqData{ + Op: op, + Args: pa, + } + + ctx := context.Background() + ctx, _ = context.WithTimeout(ctx, time.Duration(tm)*time.Millisecond) + msg, err = a.process(ctx, evtId, req) + if err != nil { + res = false + log.Println("处理请求失败!", req, err) + return + } + + //检查所有频道是否都更新成功 + + res, err = checkResult(req, msg) + if err != nil { + res = false + return + } + + return +} + +// 参数校验 +func checkParams(evtId Event, params []map[string]string, pd Period) (res []map[string]string, err error) { + + channel := evtId.GetChannel(pd) + if channel == "" { + err = errors.New("参数校验失败!未知的类型:" + evtId.String()) + return + } + log.Println(channel) + if params == nil { + tmp := make(map[string]string) + tmp["channel"] = channel + res = append(res, tmp) + } else { + //log.Println(params) + for _, param := range params { + + tmp := make(map[string]string) + for k, v := range param { + tmp[k] = v + } + + val, ok := tmp["channel"] + if !ok { + tmp["channel"] = channel + } else { + if val != channel { + err = errors.New("参数校验失败!channel应为" + channel + val) + return + } + } + + res = append(res, tmp) + } + } + + return +} diff --git a/ws/ws_priv_channel.go b/ws/ws_priv_channel.go new file mode 100644 index 0000000..650d5ec --- /dev/null +++ b/ws/ws_priv_channel.go @@ -0,0 +1,40 @@ +package ws + +import ( + . "v5sdk_go/ws/wImpl" +) + +/* + 订阅账户频道 +*/ +func (a *WsClient) PrivAccout(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) { + return a.PubChannel(EVENT_BOOK_ACCOUNT, op, params, PERIOD_NONE, timeOut...) +} + +/* + 订阅持仓频道 +*/ +func (a *WsClient) PrivPostion(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) { + return a.PubChannel(EVENT_BOOK_POSTION, op, params, PERIOD_NONE, timeOut...) +} + +/* + 订阅订单频道 +*/ +func (a *WsClient) PrivBookOrder(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) { + return a.PubChannel(EVENT_BOOK_ORDER, op, params, PERIOD_NONE, timeOut...) +} + +/* + 订阅策略委托订单频道 +*/ +func (a *WsClient) PrivBookAlgoOrder(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) { + return a.PubChannel(EVENT_BOOK_ALG_ORDER, op, params, PERIOD_NONE, timeOut...) +} + +/* + 订阅账户余额和持仓频道 +*/ +func (a *WsClient) PrivBalAndPos(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) { + return a.PubChannel(EVENT_BOOK_B_AND_P, op, params, PERIOD_NONE, timeOut...) +} diff --git a/ws/ws_priv_channel_Accout_test.go b/ws/ws_priv_channel_Accout_test.go new file mode 100644 index 0000000..5cac15b --- /dev/null +++ b/ws/ws_priv_channel_Accout_test.go @@ -0,0 +1,99 @@ +package ws + +// HOW TO RUN +// go test ws_cli.go ws_op.go ws_contants.go utils.go ws_priv_channel.go ws_priv_channel_Accout_test.go -v + +import ( + "fmt" + "log" + "testing" + "time" +) + +const ( + TRADE_ACCOUNT = iota + ISOLATE_ACCOUNT + CROSS_ACCOUNT + CROSS_ACCOUNT_B +) + +func prework_pri(t int) *WsClient { + // 模拟环境 + ep := "wss://wsaws.okex.com:8443/ws/v5/private" + var apikey, passphrase, secretKey string + // 把账号密码写这里 + switch t { + case TRADE_ACCOUNT: + apikey = "fe468418-5e40-433f-8d04-04951286d417" + passphrase = "M4pw71Id" + secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D" + case ISOLATE_ACCOUNT: + apikey = "fe468418-5e40-433f-8d04-04951286d417" + passphrase = "M4pw71Id" + secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D" + case CROSS_ACCOUNT: + apikey = "fe468418-5e40-433f-8d04-04951286d417" + passphrase = "M4pw71Id" + secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D" + case CROSS_ACCOUNT_B: + apikey = "fe468418-5e40-433f-8d04-04951286d417" + passphrase = "M4pw71Id" + secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D" + } + + r, err := NewWsClient(ep) + if err != nil { + log.Fatal(err) + } + + err = r.Start() + if err != nil { + log.Fatal(err) + } + + var res bool + start := time.Now() + res, _, err = r.Login(apikey, secretKey, passphrase) + if res { + usedTime := time.Since(start) + fmt.Println("登录成功!", usedTime.String()) + } else { + log.Fatal("登录失败!", err) + } + fmt.Println(apikey, secretKey, passphrase) + return r +} + +// 账户频道 测试 +func TestAccout(t *testing.T) { + r := prework_pri(CROSS_ACCOUNT) + var res bool + var err error + + var args []map[string]string + arg := make(map[string]string) + arg["ccy"] = "BTC" + args = append(args, arg) + fmt.Println("args: ", args) + start := time.Now() + res, _, err = r.PrivAccout(OP_SUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅所有成功!", usedTime.String()) + } else { + fmt.Println("订阅所有成功!", err) + t.Fatal("订阅所有成功!", err) + } + + time.Sleep(100 * time.Second) + start = time.Now() + // res, _, err = r.PrivAccout(OP_UNSUBSCRIBE, args) + // if res { + // usedTime := time.Since(start) + // fmt.Println("取消订阅所有成功!", usedTime.String()) + // } else { + // fmt.Println("取消订阅所有失败!", err) + // t.Fatal("取消订阅所有失败!", err) + // } + +} diff --git a/ws/ws_priv_channel_test.go b/ws/ws_priv_channel_test.go new file mode 100644 index 0000000..99cd382 --- /dev/null +++ b/ws/ws_priv_channel_test.go @@ -0,0 +1,247 @@ +package ws + +import ( + "fmt" + "log" + "testing" + "time" +) + +const ( + TRADE_ACCOUNT = iota + ISOLATE_ACCOUNT + CROSS_ACCOUNT + CROSS_ACCOUNT_B +) + +func prework_pri(t int) *WsClient { + ep := "wss://wsaws.okex.com:8443/ws/v5/private" + var apikey, passphrase, secretKey string + // 把账号密码写这里 + switch t { + case TRADE_ACCOUNT: + apikey = "fe468418-5e40-433f-8d04-04951286d417" + passphrase = "M4pw71Id" + secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D" + case ISOLATE_ACCOUNT: + apikey = "fe468418-5e40-433f-8d04-04951286d417" + passphrase = "M4pw71Id" + secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D" + case CROSS_ACCOUNT: + apikey = "fe468418-5e40-433f-8d04-04951286d417" + passphrase = "M4pw71Id" + secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D" + case CROSS_ACCOUNT_B: + apikey = "fe468418-5e40-433f-8d04-04951286d417" + passphrase = "M4pw71Id" + secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D" + } + + r, err := NewWsClient(ep) + if err != nil { + log.Fatal(err) + } + + err = r.Start() + if err != nil { + log.Fatal(err) + } + + var res bool + //start := time.Now() + res, _, err = r.Login(apikey, secretKey, passphrase) + if res { + //usedTime := time.Since(start) + //fmt.Println("登录成功!",usedTime.String()) + } else { + log.Fatal("登录失败!", err) + } + fmt.Println(apikey, secretKey, passphrase) + return r +} + +// 账户频道 测试 +func TestAccout(t *testing.T) { + r := prework_pri(CROSS_ACCOUNT) + var res bool + var err error + + var args []map[string]string + arg := make(map[string]string) + //arg["ccy"] = "BTC" + args = append(args, arg) + + start := time.Now() + res, _, err = r.PrivAccout(OP_SUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅所有成功!", usedTime.String()) + } else { + fmt.Println("订阅所有成功!", err) + t.Fatal("订阅所有成功!", err) + } + + time.Sleep(100 * time.Second) + start = time.Now() + res, _, err = r.PrivAccout(OP_UNSUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅所有成功!", usedTime.String()) + } else { + fmt.Println("取消订阅所有失败!", err) + t.Fatal("取消订阅所有失败!", err) + } + +} + +// 持仓频道 测试 +func TestPositon(t *testing.T) { + r := prework_pri(CROSS_ACCOUNT) + var err error + var res bool + + var args []map[string]string + arg := make(map[string]string) + arg["instType"] = FUTURES + arg["uly"] = "BTC-USD" + //arg["instId"] = "BTC-USD-210319" + args = append(args, arg) + + start := time.Now() + res, _, err = r.PrivPostion(OP_SUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + time.Sleep(60000 * time.Second) + //等待推送 + + start = time.Now() + res, _, err = r.PrivPostion(OP_UNSUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + + } else { + fmt.Println("取消订阅失败!", err) + t.Fatal("取消订阅失败!", err) + } + +} + +// 订单频道 测试 +func TestBookOrder(t *testing.T) { + r := prework_pri(CROSS_ACCOUNT) + var err error + var res bool + + var args []map[string]string + arg := make(map[string]string) + arg["instId"] = "BTC-USDT" + arg["instType"] = "ANY" + //arg["instType"] = "SWAP" + args = append(args, arg) + + start := time.Now() + res, _, err = r.PrivBookOrder(OP_SUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + time.Sleep(6000 * time.Second) + //等待推送 + + start = time.Now() + res, _, err = r.PrivBookOrder(OP_UNSUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + t.Fatal("取消订阅失败!", err) + } + +} + +// 策略委托订单频道 测试 +func TestAlgoOrder(t *testing.T) { + r := prework_pri(CROSS_ACCOUNT) + var err error + var res bool + + var args []map[string]string + arg := make(map[string]string) + arg["instType"] = "SPOT" + args = append(args, arg) + + start := time.Now() + res, _, err = r.PrivBookAlgoOrder(OP_SUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + time.Sleep(60 * time.Second) + //等待推送 + + start = time.Now() + res, _, err = r.PrivBookAlgoOrder(OP_UNSUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + t.Fatal("取消订阅失败!", err) + } + +} + +// 账户余额和持仓频道 测试 +func TestPrivBalAndPos(t *testing.T) { + r := prework_pri(CROSS_ACCOUNT) + var err error + var res bool + + var args []map[string]string + arg := make(map[string]string) + args = append(args, arg) + + start := time.Now() + res, _, err = r.PrivBalAndPos(OP_SUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + time.Sleep(600 * time.Second) + //等待推送 + + start = time.Now() + res, _, err = r.PrivBalAndPos(OP_UNSUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + t.Fatal("取消订阅失败!", err) + } + +} diff --git a/ws/ws_pub_channel.go b/ws/ws_pub_channel.go new file mode 100644 index 0000000..bad023f --- /dev/null +++ b/ws/ws_pub_channel.go @@ -0,0 +1,141 @@ +package ws + +import ( + "errors" + . "v5sdk_go/ws/wImpl" +) + +/* + 产品频道 +*/ +func (a *WsClient) PubInstruemnts(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) { + + return a.PubChannel(EVENT_BOOK_INSTRUMENTS, op, params, PERIOD_NONE, timeOut...) +} + +// 获取系统维护的状态,当系统维护状态改变时推送 +func (a *WsClient) PubStatus(op string, timeOut ...int) (res bool, msg []*Msg, err error) { + return a.PubChannel(EVENT_STATUS, op, nil, PERIOD_NONE, timeOut...) +} + +/* + 行情频道 +*/ +func (a *WsClient) PubTickers(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) { + + return a.PubChannel(EVENT_BOOK_TICKERS, op, params, PERIOD_NONE, timeOut...) +} + +/* + 持仓总量频道 +*/ +func (a *WsClient) PubOpenInsterest(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) { + return a.PubChannel(EVENT_BOOK_OPEN_INTEREST, op, params, PERIOD_NONE, timeOut...) +} + +/* + K线频道 +*/ +func (a *WsClient) PubKLine(op string, period Period, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) { + + return a.PubChannel(EVENT_BOOK_KLINE, op, params, period, timeOut...) +} + +/* + 交易频道 +*/ +func (a *WsClient) PubTrade(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) { + + return a.PubChannel(EVENT_BOOK_TRADE, op, params, PERIOD_NONE, timeOut...) +} + +/* + 预估交割/行权价格频道 +*/ +func (a *WsClient) PubEstDePrice(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) { + + return a.PubChannel(EVENT_BOOK_ESTIMATE_PRICE, op, params, PERIOD_NONE, timeOut...) + +} + +/* + 标记价格频道 +*/ +func (a *WsClient) PubMarkPrice(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) { + + return a.PubChannel(EVENT_BOOK_MARK_PRICE, op, params, PERIOD_NONE, timeOut...) +} + +/* + 标记价格K线频道 +*/ +func (a *WsClient) PubMarkPriceCandle(op string, pd Period, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) { + + return a.PubChannel(EVENT_BOOK_MARK_PRICE_CANDLE_CHART, op, params, pd, timeOut...) +} + +/* + 限价频道 +*/ +func (a *WsClient) PubLimitPrice(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) { + + return a.PubChannel(EVENT_BOOK_LIMIT_PRICE, op, params, PERIOD_NONE, timeOut...) +} + +/* + 深度频道 +*/ +func (a *WsClient) PubOrderBooks(op string, channel string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) { + + switch channel { + // 400档快照 + case "books": + return a.PubChannel(EVENT_BOOK_ORDER_BOOK, op, params, PERIOD_NONE, timeOut...) + // 5档快照 + case "books5": + return a.PubChannel(EVENT_BOOK_ORDER_BOOK5, op, params, PERIOD_NONE, timeOut...) + // 400 tbt + case "books-l2-tbt": + return a.PubChannel(EVENT_BOOK_ORDER_BOOK_TBT, op, params, PERIOD_NONE, timeOut...) + // 50 tbt + case "books50-l2-tbt": + return a.PubChannel(EVENT_BOOK_ORDER_BOOK50_TBT, op, params, PERIOD_NONE, timeOut...) + + default: + err = errors.New("未知的channel") + return + } + +} + +/* + 期权定价频道 +*/ +func (a *WsClient) PubOptionSummary(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) { + + return a.PubChannel(EVENT_BOOK_OPTION_SUMMARY, op, params, PERIOD_NONE, timeOut...) +} + +/* + 资金费率频道 +*/ +func (a *WsClient) PubFundRate(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) { + + return a.PubChannel(EVENT_BOOK_FUND_RATE, op, params, PERIOD_NONE, timeOut...) +} + +/* + 指数K线频道 +*/ +func (a *WsClient) PubKLineIndex(op string, pd Period, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) { + + return a.PubChannel(EVENT_BOOK_KLINE_INDEX, op, params, pd, timeOut...) +} + +/* + 指数行情频道 +*/ +func (a *WsClient) PubIndexTickers(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) { + + return a.PubChannel(EVENT_BOOK_INDEX_TICKERS, op, params, PERIOD_NONE, timeOut...) +} diff --git a/ws/ws_pub_channel_test.go b/ws/ws_pub_channel_test.go new file mode 100644 index 0000000..ac96eef --- /dev/null +++ b/ws/ws_pub_channel_test.go @@ -0,0 +1,669 @@ +package ws + +import ( + "encoding/json" + "fmt" + "log" + "strings" + "testing" + "time" + . "v5sdk_go/ws/wImpl" +) + +func prework() *WsClient { + ep := "wss://wsaws.okex.com:8443/ws/v5/private" + + r, err := NewWsClient(ep) + if err != nil { + log.Fatal(err) + } + + err = r.Start() + if err != nil { + log.Fatal(err, ep) + } + return r +} + +// 产品频道测试 +func TestInstruemnts(t *testing.T) { + r := prework() + var err error + var res bool + + var args []map[string]string + arg := make(map[string]string) + arg["instType"] = SPOT + //arg["instType"] = OPTION + args = append(args, arg) + + start := time.Now() + res, _, err = r.PubInstruemnts(OP_SUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + time.Sleep(3 * time.Second) + //等待推送 + + start = time.Now() + res, _, err = r.PubInstruemnts(OP_UNSUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + t.Fatal("取消订阅失败!", err) + } + +} + +// status频道测试 +func TestStatus(t *testing.T) { + r := prework() + var err error + var res bool + + start := time.Now() + res, _, err = r.PubStatus(OP_SUBSCRIBE) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + time.Sleep(10000 * time.Second) + //等待推送 + + start = time.Now() + res, _, err = r.PubStatus(OP_UNSUBSCRIBE) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + t.Fatal("取消订阅失败!", err) + } + +} + +// 行情频道测试 +func TestTickers(t *testing.T) { + r := prework() + var err error + var res bool + + var args []map[string]string + arg := make(map[string]string) + arg["instId"] = "BTC-USDT" + + args = append(args, arg) + + start := time.Now() + res, _, err = r.PubTickers(OP_SUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + time.Sleep(600 * time.Second) + //等待推送 + + start = time.Now() + res, _, err = r.PubTickers(OP_UNSUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + t.Fatal("取消订阅失败!", err) + } + +} + +// 持仓总量频道 测试 +func TestOpenInsterest(t *testing.T) { + r := prework() + var err error + var res bool + + var args []map[string]string + arg := make(map[string]string) + arg["instId"] = "LTC-USD-SWAP" + + args = append(args, arg) + + start := time.Now() + res, _, err = r.PubOpenInsterest(OP_SUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + time.Sleep(60 * time.Second) + //等待推送 + + start = time.Now() + res, _, err = r.PubOpenInsterest(OP_UNSUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + t.Fatal("取消订阅失败!", err) + } + +} + +// K线频道测试 +func TestKLine(t *testing.T) { + r := prework() + var err error + var res bool + + var args []map[string]string + arg := make(map[string]string) + arg["instId"] = "BTC-USDT" + args = append(args, arg) + + // 1分钟K + period := PERIOD_1MIN + + start := time.Now() + res, _, err = r.PubKLine(OP_SUBSCRIBE, period, args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + } + + time.Sleep(60 * time.Second) + //等待推送 + + start = time.Now() + res, _, err = r.PubKLine(OP_UNSUBSCRIBE, period, args) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + t.Fatal("取消订阅失败!", err) + } + +} + +// 交易频道测试 +func TestTrade(t *testing.T) { + r := prework() + var err error + var res bool + + var args []map[string]string + arg := make(map[string]string) + arg["instId"] = "BTC-USDT" + args = append(args, arg) + + start := time.Now() + res, _, err = r.PubTrade(OP_SUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + time.Sleep(60 * time.Second) + //等待推送 + + start = time.Now() + res, _, err = r.PubTrade(OP_UNSUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + t.Fatal("取消订阅失败!", err) + } + +} + +// 预估交割/行权价格频道 测试 +func TestEstDePrice(t *testing.T) { + r := prework() + var err error + var res bool + + var args []map[string]string + arg := make(map[string]string) + arg["instType"] = FUTURES + arg["uly"] = "BTC-USD" + args = append(args, arg) + + start := time.Now() + res, _, err = r.PubEstDePrice(OP_SUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + time.Sleep(60 * time.Second) + //等待推送 + + start = time.Now() + res, _, err = r.PubEstDePrice(OP_UNSUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + t.Fatal("取消订阅失败!", err) + } + +} + +// 标记价格频道 测试 +func TestMarkPrice(t *testing.T) { + r := prework() + var err error + var res bool + + var args []map[string]string + arg := make(map[string]string) + arg["instId"] = "BTC-USDT" + + args = append(args, arg) + + start := time.Now() + res, _, err = r.PubMarkPrice(OP_SUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + time.Sleep(60 * time.Second) + //等待推送 + + start = time.Now() + res, _, err = r.PubMarkPrice(OP_UNSUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + t.Fatal("取消订阅失败!", err) + } + +} + +// 标记价格K线频道 测试s +func TestMarkPriceCandle(t *testing.T) { + r := prework() + var err error + var res bool + + var args []map[string]string + arg := make(map[string]string) + arg["instId"] = "BTC-USDT" + args = append(args, arg) + + period := PERIOD_1MIN + + start := time.Now() + res, _, err = r.PubMarkPriceCandle(OP_SUBSCRIBE, period, args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + time.Sleep(60 * time.Second) + //等待推送 + + start = time.Now() + res, _, err = r.PubMarkPriceCandle(OP_UNSUBSCRIBE, period, args) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + t.Fatal("取消订阅失败!", err) + } + +} + +// 限价频道 测试 +func TestLimitPrice(t *testing.T) { + r := prework() + var err error + var res bool + + var args []map[string]string + arg := make(map[string]string) + arg["instId"] = "BTC-USDT" + args = append(args, arg) + + start := time.Now() + res, _, err = r.PubLimitPrice(OP_SUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + time.Sleep(60 * time.Second) + //等待推送 + + start = time.Now() + res, _, err = r.PubLimitPrice(OP_UNSUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + t.Fatal("取消订阅失败!", err) + } + +} + +// 深度频道 测试 +func TestOrderBooks(t *testing.T) { + r := prework() + var err error + var res bool + + /* + 设置关闭深度数据管理 + */ + // err = r.EnableAutoDepthMgr(false) + // if err != nil { + // fmt.Println("关闭自动校验失败!") + // } + + end := make(chan struct{}) + + r.AddDepthHook(func(ts time.Time, data DepthData) error { + // 对于深度类型数据处理的用户可以自定义 + + // 检测深度数据是否正常 + key, _ := json.Marshal(data.Arg) + fmt.Println("个数:", len(data.Data[0].Asks)) + checksum := data.Data[0].Checksum + fmt.Println("[自定义方法] ", string(key), ", checksum = ", checksum) + + for _, ask := range data.Data[0].Asks { + + arr := strings.Split(ask[0], ".") + //fmt.Println(arr) + if len(arr) > 1 && len(arr[1]) > 2 { + fmt.Println("ask数据异常,", checksum, "ask:", ask) + t.Fatal() + end <- struct{}{} + return nil + } else { + fmt.Println("bid数据正常,", checksum, "ask:", ask) + } + + } + + for _, bid := range data.Data[0].Bids { + + arr := strings.Split(bid[0], ".") + //fmt.Println(arr) + if len(arr) > 1 && len(arr[1]) > 2 { + fmt.Println("bid数据异常,", checksum, "bid:", bid) + t.Fatal() + end <- struct{}{} + return nil + } else { + fmt.Println("ask数据正常,", checksum, "bid:", bid) + } + + } + + // // 查看当前合并后的全量深度数据 + // snapshot, err := r.GetSnapshotByChannel(data) + // if err != nil { + // t.Fatal("深度数据不存在!") + // } + // // 展示ask/bid 前5档数据 + // fmt.Println(" Ask 5 档数据 >> ") + // for _, v := range snapshot.Asks[:5] { + // fmt.Println(" price:", v[0], " amount:", v[1]) + // } + // fmt.Println(" Bid 5 档数据 >> ") + // for _, v := range snapshot.Bids[:5] { + // fmt.Println(" price:", v[0], " amount:", v[1]) + // } + return nil + }) + + // 可选类型:books books5 books-l2-tbt + channel := "books50-l2-tbt" + + instIds := []string{"BTC-USDT"} + for _, instId := range instIds { + var args []map[string]string + arg := make(map[string]string) + arg["instId"] = instId + args = append(args, arg) + + start := time.Now() + res, _, err = r.PubOrderBooks(OP_SUBSCRIBE, channel, args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + } + } + + select { + case <-end: + + } + //等待推送 + for _, instId := range instIds { + var args []map[string]string + arg := make(map[string]string) + arg["instId"] = instId + args = append(args, arg) + + start := time.Now() + res, _, err = r.PubOrderBooks(OP_UNSUBSCRIBE, channel, args) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + t.Fatal("取消订阅失败!", err) + } + } + +} + +// 期权定价频道 测试 +func TestOptionSummary(t *testing.T) { + r := prework() + var err error + var res bool + + var args []map[string]string + arg := make(map[string]string) + arg["uly"] = "BTC-USD" + args = append(args, arg) + + start := time.Now() + res, _, err = r.PubOptionSummary(OP_SUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + time.Sleep(60 * time.Second) + //等待推送 + + start = time.Now() + res, _, err = r.PubOptionSummary(OP_UNSUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + t.Fatal("取消订阅失败!", err) + } + +} + +// 资金费率 测试 +func TestFundRate(t *testing.T) { + r := prework() + var err error + var res bool + + var args []map[string]string + arg := make(map[string]string) + arg["instId"] = "BTC-USD-SWAP" + args = append(args, arg) + + start := time.Now() + res, _, err = r.PubFundRate(OP_SUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + time.Sleep(600 * time.Second) + //等待推送 + + start = time.Now() + res, _, err = r.PubFundRate(OP_UNSUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + t.Fatal("取消订阅失败!", err) + } + +} + +// 指数K线频道 测试 +func TestKLineIndex(t *testing.T) { + r := prework() + var err error + var res bool + + var args []map[string]string + arg := make(map[string]string) + + arg["instId"] = "BTC-USDT" + args = append(args, arg) + period := PERIOD_1MIN + + start := time.Now() + res, _, err = r.PubKLineIndex(OP_SUBSCRIBE, period, args) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + time.Sleep(60 * time.Second) + //等待推送 + + start = time.Now() + res, _, err = r.PubKLineIndex(OP_UNSUBSCRIBE, period, args) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + t.Fatal("取消订阅失败!", err) + } + +} + +// 指数行情频道 测试 +func TestIndexMarket(t *testing.T) { + r := prework() + var err error + var res bool + + var args []map[string]string + arg := make(map[string]string) + arg["instId"] = "BTC-USDT" + args = append(args, arg) + + start := time.Now() + res, _, err = r.PubIndexTickers(OP_SUBSCRIBE, args) + if err != nil { + fmt.Println("订阅失败!", err) + } + usedTime := time.Since(start) + if res { + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", usedTime.String()) + //return + } + + time.Sleep(600 * time.Second) + //等待推送 + + start = time.Now() + res, _, err = r.PubIndexTickers(OP_UNSUBSCRIBE, args) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + t.Fatal("取消订阅失败!", err) + } + +} diff --git a/ws/ws_test.go b/ws/ws_test.go new file mode 100644 index 0000000..8ac7e7e --- /dev/null +++ b/ws/ws_test.go @@ -0,0 +1,386 @@ +package ws + +import ( + "fmt" + "log" + "testing" + "time" + . "v5sdk_go/ws/wImpl" + + "github.com/stretchr/testify/assert" +) + +func init() { + log.SetFlags(log.LstdFlags | log.Llongfile) + +} + +func TestPing(t *testing.T) { + r := prework_pri(CROSS_ACCOUNT) + + res, _, _ := r.Ping() + assert.True(t, res, true) +} + +func TestWsClient_SubscribeAndUnSubscribe(t *testing.T) { + r := prework() + var err error + var res bool + + param := map[string]string{} + param["channel"] = "opt-summary" + param["uly"] = "BTC-USD" + + start := time.Now() + res, _, err = r.Subscribe(param) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!", usedTime.String()) + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + time.Sleep(60 * time.Second) + //等待推送 + + start = time.Now() + res, _, err = r.UnSubscribe(param) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + t.Fatal("取消订阅失败!", err) + } + +} + +func TestWsClient_SubscribeAndUnSubscribe_priv(t *testing.T) { + r := prework_pri(ISOLATE_ACCOUNT) + var err error + var res bool + + var params []map[string]string + params = append(params, map[string]string{"channel": "orders", "instType": SPOT, "instId": "BTC-USDT"}) + //一个失败的订阅用例 + params = append(params, map[string]string{"channel": "positions", "instType": "any"}) + + for _, v := range params { + start := time.Now() + var data *ProcessDetail + res, data, err = r.Subscribe(v) + if res { + usedTime := time.Since(start) + fmt.Println("订阅成功!", usedTime.String()) + PrintDetail(data) + } else { + fmt.Println("订阅失败!", err) + //return + } + time.Sleep(60 * time.Second) + //等待推送 + + start = time.Now() + res, _, err = r.UnSubscribe(v) + if res { + usedTime := time.Since(start) + fmt.Println("取消订阅成功!", usedTime.String()) + } else { + fmt.Println("取消订阅失败!", err) + } + + } + +} + +func TestWsClient_Jrpc(t *testing.T) { + //r := prework_pri(ISOLATE_ACCOUNT) + r := prework_pri(CROSS_ACCOUNT) + var res bool + var err error + var data *ProcessDetail + + start := time.Now() + var args []map[string]interface{} + + param := map[string]interface{}{} + param["instId"] = "BTC-USDT" + param["clOrdId"] = "SIM0dcopy16069997808063455" + param["tdMode"] = "cross" + param["side"] = "sell" + param["ordType"] = "limit" + param["px"] = "19333.3" + param["sz"] = "0.18605445" + + param1 := map[string]interface{}{} + param1["instId"] = "BTC-USDT" + param1["clOrdId"] = "SIM0dcopy16069997808063456" + param1["tdMode"] = "cross" + param1["side"] = "sell" + param1["ordType"] = "limit" + param1["px"] = "19334.2" + param1["sz"] = "0.03508913" + + param2 := map[string]interface{}{} + param2["instId"] = "BTC-USDT" + param2["clOrdId"] = "SIM0dcopy16069997808063457" + param2["tdMode"] = "cross" + param2["side"] = "sell" + param2["ordType"] = "limit" + param2["px"] = "19334.8" + param2["sz"] = "0.03658186" + + param3 := map[string]interface{}{} + param3["instId"] = "BTC-USDT" + param3["clOrdId"] = "SIM0dcopy16069997808063458" + param3["tdMode"] = "cross" + param3["side"] = "sell" + param3["ordType"] = "limit" + param3["px"] = "19334.9" + param3["sz"] = "0.5" + + param4 := map[string]interface{}{} + param4["instId"] = "BTC-USDT" + param4["clOrdId"] = "SIM0dcopy16069997808063459" + param4["tdMode"] = "cross" + param4["side"] = "sell" + param4["ordType"] = "limit" + param4["px"] = "19335.2" + param4["sz"] = "0.3" + + param5 := map[string]interface{}{} + param5["instId"] = "BTC-USDT" + param5["clOrdId"] = "SIM0dcopy16069997808063460" + param5["tdMode"] = "cross" + param5["side"] = "sell" + param5["ordType"] = "limit" + param5["px"] = "19335.9" + param5["sz"] = "0.051" + + param6 := map[string]interface{}{} + param6["instId"] = "BTC-USDT" + param6["clOrdId"] = "SIM0dcopy16069997808063461" + param6["tdMode"] = "cross" + param6["side"] = "sell" + param6["ordType"] = "limit" + param6["px"] = "19336.4" + param6["sz"] = "1" + + param7 := map[string]interface{}{} + param7["instId"] = "BTC-USDT" + param7["clOrdId"] = "SIM0dcopy16069997808063462" + param7["tdMode"] = "cross" + param7["side"] = "sell" + param7["ordType"] = "limit" + param7["px"] = "19336.8" + param7["sz"] = "0.475" + + param8 := map[string]interface{}{} + param8["instId"] = "BTC-USDT" + param8["clOrdId"] = "SIM0dcopy16069997808063463" + param8["tdMode"] = "cross" + param8["side"] = "sell" + param8["ordType"] = "limit" + param8["px"] = "19337.3" + param8["sz"] = "0.21299357" + + param9 := map[string]interface{}{} + param9["instId"] = "BTC-USDT" + param9["clOrdId"] = "SIM0dcopy16069997808063464" + param9["tdMode"] = "cross" + param9["side"] = "sell" + param9["ordType"] = "limit" + param9["px"] = "19337.5" + param9["sz"] = "0.5" + + args = append(args, param) + args = append(args, param1) + args = append(args, param2) + args = append(args, param3) + args = append(args, param4) + args = append(args, param5) + args = append(args, param6) + args = append(args, param7) + args = append(args, param8) + args = append(args, param9) + + res, data, err = r.Jrpc("okexv5wsapi001", "order", args) + if res { + usedTime := time.Since(start) + fmt.Println("下单成功!", usedTime.String()) + PrintDetail(data) + } else { + usedTime := time.Since(start) + fmt.Println("下单失败!", usedTime.String(), err) + } +} + +/* + 测试 添加全局消息回调函数 +*/ +func TestAddMessageHook(t *testing.T) { + + r := prework_pri(CROSS_ACCOUNT) + + r.AddMessageHook(func(msg *Msg) error { + // 添加你的方法 + fmt.Println("这是全局消息自定义MessageHook") + fmt.Println("当前数据是", msg) + return nil + }) + + select {} +} + +/* + 普通推送数据回调函数 +*/ +func TestAddBookedDataHook(t *testing.T) { + var r *WsClient + + /*订阅私有频道*/ + { + r = prework_pri(CROSS_ACCOUNT) + var res bool + var err error + + r.AddBookMsgHook(func(ts time.Time, data MsgData) error { + // 添加你的方法 + fmt.Println("这是私有自定义AddBookMsgHook") + fmt.Println("当前数据是", data) + return nil + }) + + param := map[string]string{} + param["channel"] = "account" + param["ccy"] = "BTC" + + res, _, err = r.Subscribe(param) + if res { + fmt.Println("订阅成功!") + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + time.Sleep(100 * time.Second) + } + + //订阅公共频道 + { + r = prework() + var res bool + var err error + + r.AddBookMsgHook(func(ts time.Time, data MsgData) error { + // 添加你的方法 + fmt.Println("这是公共自定义AddBookMsgHook") + fmt.Println("当前数据是", data) + return nil + }) + + param := map[string]string{} + param["channel"] = "instruments" + param["instType"] = "FUTURES" + + res, _, err = r.Subscribe(param) + if res { + fmt.Println("订阅成功!") + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + select {} + } + +} + +func TestGetInfoFromErrMsg(t *testing.T) { + a := assert.New(t) + buf := ` +"channel:index-tickers,instId:BTC-USDT1 doesn't exist" + ` + ch := GetInfoFromErrMsg(buf) + //t.Log(ch) + a.Equal("index-tickers", ch) + + //assert.True(t,ch == "index-tickers") +} + +/* + + */ +func TestParseMessage(t *testing.T) { + r := prework() + var evt Event + msg := `{"event":"error","msg":"Contract does not exist.","code":"51001"}` + + evt, _, _ = r.parseMessage([]byte(msg)) + assert.True(t, EVENT_ERROR == evt) + + msg = `{"event":"error","msg":"channel:positions,ccy:BTC doesn't exist","code":"60018"}` + evt, _, _ = r.parseMessage([]byte(msg)) + assert.True(t, EVENT_BOOK_POSTION == evt) +} + +/* + 原始方式 深度订阅 测试 +*/ +func TestSubscribeTBT(t *testing.T) { + r := prework() + var res bool + var err error + + // 添加你的方法 + r.AddDepthHook(func(ts time.Time, data DepthData) error { + //fmt.Println("这是自定义AddBookMsgHook") + fmt.Println("当前数据是:", data) + return nil + }) + + param := map[string]string{} + param["channel"] = "books-l2-tbt" + //param["channel"] = "books" + param["instId"] = "BTC-USD-SWAP" + + res, _, err = r.Subscribe(param) + if res { + fmt.Println("订阅成功!") + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + time.Sleep(60 * time.Second) +} + +/* + + */ +func TestSubscribeBalAndPos(t *testing.T) { + r := prework_pri(CROSS_ACCOUNT) + var res bool + var err error + + param := map[string]string{} + + // 产品信息 + param["channel"] = "balance_and_position" + + res, _, err = r.Subscribe(param) + if res { + fmt.Println("订阅成功!") + } else { + fmt.Println("订阅失败!", err) + t.Fatal("订阅失败!", err) + //return + } + + time.Sleep(60 * time.Second) +}