first add

This commit is contained in:
zhangkun 2024-12-14 19:09:06 +08:00
commit d9714d59b9
39 changed files with 5251 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
vendor

21
LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2021 wang
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

32
config/conf.go Normal file
View File

@ -0,0 +1,32 @@
package config
import "fmt"
type Env struct {
RestEndpoint string `yaml:"RestEndpoint"`
WsEndpoint string `yaml:"WsEndpoint"`
IsSimulation bool `yaml:"IsSimulation"`
}
type ApiInfo struct {
ApiKey string `yaml:"ApiKey"`
SecretKey string `yaml:"SecretKey"`
Passphrase string `yaml:"Passphrase"`
}
type MetaData struct {
Description string `yaml:"Description"`
}
type Config struct {
MetaData `yaml:"MetaData"`
Env `yaml:"Env"`
ApiInfo `yaml:"ApiInfo"`
}
func (s *ApiInfo) String() string {
res := "ApiInfo{"
res += fmt.Sprintf("ApiKey:%v,SecretKey:%v,Passphrase:%v", s.ApiKey, s.SecretKey, s.Passphrase)
res += "}"
return res
}

3
config/go.mod Normal file
View File

@ -0,0 +1,3 @@
module v5sdk_go/config
go 1.14

8
go.mod Normal file
View File

@ -0,0 +1,8 @@
module v5sdk_go
go 1.15
require (
github.com/gorilla/websocket v1.4.2
github.com/stretchr/testify v1.7.0
)

12
go.sum Normal file
View File

@ -0,0 +1,12 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

237
main.go Normal file
View File

@ -0,0 +1,237 @@
package main
import (
"context"
"fmt"
"log"
"time"
. "v5sdk_go/rest"
. "v5sdk_go/ws"
)
/*
rest API请求
更多示例请查看 rest/rest_test.go
*/
func REST() {
// 设置您的APIKey
apikey := APIKeyInfo{
ApiKey: "eca767d4-fda5-4a1b-bb28-49ae18093307",
SecKey: "8CA3628A556ED137977DB298D37BC7F3",
PassPhrase: "Op3Druaron",
}
// 第三个参数代表是否为模拟环境,更多信息查看接口说明
cli := NewRESTClient("https://www.okex.win", &apikey, false)
rsp, err := cli.Get(context.Background(), "/api/v5/account/balance", nil)
if err != nil {
return
}
fmt.Println("Response:")
fmt.Println("\thttp code: ", rsp.Code)
fmt.Println("\t总耗时: ", rsp.TotalUsedTime)
fmt.Println("\t请求耗时: ", rsp.ReqUsedTime)
fmt.Println("\t返回消息: ", rsp.Body)
fmt.Println("\terrCode: ", rsp.V5Response.Code)
fmt.Println("\terrMsg: ", rsp.V5Response.Msg)
fmt.Println("\tdata: ", rsp.V5Response.Data)
}
// 订阅私有频道
func wsPriv() {
ep := "wss://ws.okex.com:8443/ws/v5/private?brokerId=9999"
// 填写您自己的APIKey信息
apikey := "xxxx"
secretKey := "xxxxx"
passphrase := "xxxxx"
// 创建ws客户端
r, err := NewWsClient(ep)
if err != nil {
log.Println(err)
return
}
// 设置连接超时
r.SetDailTimeout(time.Second * 2)
err = r.Start()
if err != nil {
log.Println(err)
return
}
defer r.Stop()
var res bool
res, _, err = r.Login(apikey, secretKey, passphrase)
if res {
fmt.Println("登录成功!")
} else {
fmt.Println("登录失败!", err)
return
}
// 订阅账户频道
var args []map[string]string
arg := make(map[string]string)
arg["ccy"] = "BTC"
args = append(args, arg)
start := time.Now()
res, _, err = r.PrivAccout(OP_SUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!耗时:", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
}
time.Sleep(100 * time.Second)
start = time.Now()
res, _, err = r.PrivAccout(OP_UNSUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
}
}
// 订阅公共频道
func wsPub() {
ep := "wss://wsaws.okex.com:8443/ws/v5/public?brokerId=9999"
// 创建ws客户端
r, err := NewWsClient(ep)
if err != nil {
log.Println(err)
return
}
// 设置连接超时
r.SetDailTimeout(time.Second * 2)
err = r.Start()
if err != nil {
log.Println(err)
return
}
defer r.Stop()
// 订阅产品频道
// 在这里初始化instrument列表
var args []map[string]string
arg := make(map[string]string)
arg["instType"] = FUTURES
//arg["instType"] = OPTION
args = append(args, arg)
start := time.Now()
//订阅
res, _, err := r.PubInstruemnts(OP_SUBSCRIBE, args)
fmt.Println("args:", args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
}
// 在这里 args1 初始化tickerList的列表
var args1 []map[string]string
arg1 := make(map[string]string)
arg1["instId"] = "ETH-USDT"
//arg["instType"] = OPTION
args1 = append(args1, arg1)
//------------------------------------------------------
start1 := time.Now()
res, _, err = r.PubTickers(OP_SUBSCRIBE, args1)
fmt.Println("args:", args)
if res {
usedTime := time.Since(start1)
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
}
time.Sleep(300 * time.Second)
//
// start = time.Now()
// res, _, err = r.PubInstruemnts(OP_UNSUBSCRIBE, args)
// if res {
// usedTime := time.Since(start)
// fmt.Println("取消订阅成功!", usedTime.String())
// } else {
// fmt.Println("取消订阅失败!", err)
// }
}
// websocket交易
func wsJrpc() {
ep := "wss://ws.okex.com:8443/ws/v5/private?brokerId=9999"
// 填写您自己的APIKey信息
apikey := "xxxx"
secretKey := "xxxxx"
passphrase := "xxxxx"
var res bool
var req_id string
// 创建ws客户端
r, err := NewWsClient(ep)
if err != nil {
log.Println(err)
return
}
// 设置连接超时
r.SetDailTimeout(time.Second * 2)
err = r.Start()
if err != nil {
log.Println(err)
return
}
defer r.Stop()
res, _, err = r.Login(apikey, secretKey, passphrase)
if res {
fmt.Println("登录成功!")
} else {
fmt.Println("登录失败!", err)
return
}
start := time.Now()
param := map[string]interface{}{}
param["instId"] = "BTC-USDT"
param["tdMode"] = "cash"
param["side"] = "buy"
param["ordType"] = "market"
param["sz"] = "200"
req_id = "00001"
res, _, err = r.PlaceOrder(req_id, param)
if res {
usedTime := time.Since(start)
fmt.Println("下单成功!", usedTime.String())
} else {
usedTime := time.Since(start)
fmt.Println("下单失败!", usedTime.String(), err)
}
}
func main() {
// 公共订阅
wsPub()
// 私有订阅
// wsPriv()
// websocket交易
// wsJrpc()
// rest请求
// REST()
}

283
readme.md Normal file
View File

@ -0,0 +1,283 @@
# 简介
OKEX go版本的v5sdk仅供学习交流使用。
(文档持续完善中)
# 项目说明
## REST调用
``` go
// 设置您的APIKey
apikey := APIKeyInfo{
ApiKey: "xxxx",
SecKey: "xxxx",
PassPhrase: "xxxx",
}
// 第三个参数代表是否为模拟环境,更多信息查看接口说明
cli := NewRESTClient("https://www.okex.win", &apikey, true)
rsp, err := cli.Get(context.Background(), "/api/v5/account/balance", nil)
if err != nil {
return
}
fmt.Println("Response:")
fmt.Println("\thttp code: ", rsp.Code)
fmt.Println("\t总耗时: ", rsp.TotalUsedTime)
fmt.Println("\t请求耗时: ", rsp.ReqUsedTime)
fmt.Println("\t返回消息: ", rsp.Body)
fmt.Println("\terrCode: ", rsp.V5Response.Code)
fmt.Println("\terrMsg: ", rsp.V5Response.Msg)
fmt.Println("\tdata: ", rsp.V5Response.Data)
```
更多示例请查看rest/rest_test.go
## websocket订阅
### 私有频道
```go
ep := "wss://ws.okex.com:8443/ws/v5/private?brokerId=9999"
// 填写您自己的APIKey信息
apikey := "xxxx"
secretKey := "xxxxx"
passphrase := "xxxxx"
// 创建ws客户端
r, err := NewWsClient(ep)
if err != nil {
log.Println(err)
return
}
// 设置连接超时
r.SetDailTimeout(time.Second * 2)
err = r.Start()
if err != nil {
log.Println(err)
return
}
defer r.Stop()
var res bool
// 私有频道需要登录
res, _, err = r.Login(apikey, secretKey, passphrase)
if res {
fmt.Println("登录成功!")
} else {
fmt.Println("登录失败!", err)
return
}
var args []map[string]string
arg := make(map[string]string)
arg["ccy"] = "BTC"
args = append(args, arg)
start := time.Now()
// 订阅账户频道
res, _, err = r.PrivAccout(OP_SUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!耗时:", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
}
time.Sleep(100 * time.Second)
start = time.Now()
// 取消订阅账户频道
res, _, err = r.PrivAccout(OP_UNSUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
}
```
更多示例请查看ws/ws_priv_channel_test.go
### 公有频道
```go
ep := "wss://ws.okex.com:8443/ws/v5/public?brokerId=9999"
// 创建ws客户端
r, err := NewWsClient(ep)
if err != nil {
log.Println(err)
return
}
// 设置连接超时
r.SetDailTimeout(time.Second * 2)
err = r.Start()
if err != nil {
log.Println(err)
return
}
defer r.Stop()
var args []map[string]string
arg := make(map[string]string)
arg["instType"] = FUTURES
//arg["instType"] = OPTION
args = append(args, arg)
start := time.Now()
// 订阅产品频道
res, _, err := r.PubInstruemnts(OP_SUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
}
time.Sleep(30 * time.Second)
start = time.Now()
// 取消订阅产品频道
res, _, err = r.PubInstruemnts(OP_UNSUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
}
```
更多示例请查看ws/ws_pub_channel_test.go
## websocket交易
```go
ep := "wss://ws.okex.com:8443/ws/v5/private?brokerId=9999"
// 填写您自己的APIKey信息
apikey := "xxxx"
secretKey := "xxxxx"
passphrase := "xxxxx"
var res bool
var req_id string
// 创建ws客户端
r, err := NewWsClient(ep)
if err != nil {
log.Println(err)
return
}
// 设置连接超时
r.SetDailTimeout(time.Second * 2)
err = r.Start()
if err != nil {
log.Println(err)
return
}
defer r.Stop()
res, _, err = r.Login(apikey, secretKey, passphrase)
if res {
fmt.Println("登录成功!")
} else {
fmt.Println("登录失败!", err)
return
}
start := time.Now()
param := map[string]interface{}{}
param["instId"] = "BTC-USDT"
param["tdMode"] = "cash"
param["side"] = "buy"
param["ordType"] = "market"
param["sz"] = "200"
req_id = "00001"
// 单个下单
res, _, err = r.PlaceOrder(req_id, param)
if res {
usedTime := time.Since(start)
fmt.Println("下单成功!", usedTime.String())
} else {
usedTime := time.Since(start)
fmt.Println("下单失败!", usedTime.String(), err)
}
```
更多示例请查看ws/ws_jrpc_test.go
## wesocket推送
websocket推送数据分为两种类型数据:`普通推送数据``深度类型数据`
```go
ws/wImpl/BookData.go
// 普通推送
type MsgData struct {
Arg map[string]string `json:"arg"`
Data []interface{} `json:"data"`
}
// 深度数据
type DepthData struct {
Arg map[string]string `json:"arg"`
Action string `json:"action"`
Data []DepthDetail `json:"data"`
}
```
如果需要对推送数据做处理用户可以自定义回调函数:
1. 全局消息处理的回调函数
该回调函数会处理所有从服务端接受到的数据。
```go
/*
添加全局消息处理的回调函数
*/
func (a *WsClient) AddMessageHook(fn ReceivedDataCallback) error {
a.onMessageHook = fn
return nil
}
```
使用方法参见 ws/ws_test.go中测试用例TestAddMessageHook。
2. 订阅消息处理回调函数
可以处理所有非深度类型的数据,包括 订阅/取消订阅,普通推送数据。
```go
/*
添加订阅消息处理的回调函数
*/
func (a *WsClient) AddBookMsgHook(fn ReceivedMsgDataCallback) error {
a.onBookMsgHook = fn
return nil
}
```
使用方法参见 ws/ws_test.go中测试用例TestAddBookedDataHook。
3. 深度消息处理的回调函数
这里需要说明的是Wsclient提供了深度数据管理和自动checksum的功能用户如果需要关闭此功能只需要调用EnableAutoDepthMgr方法。
```go
/*
添加深度消息处理的回调函数
*/
func (a *WsClient) AddDepthHook(fn ReceivedDepthDataCallback) error {
a.onDepthHook = fn
return nil
}
```
使用方法参见 ws/ws_pub_channel_test.go中测试用例TestOrderBooks。
4. 错误消息类型回调函数
```go
func (a *WsClient) AddErrMsgHook(fn ReceivedDataCallback) error {
a.OnErrorHook = fn
return nil
}
```
# 联系方式
邮箱:caron_co@163.com
微信:caron_co

35
rest/contants.go Normal file
View File

@ -0,0 +1,35 @@
package rest
const (
/*
http headers
*/
OK_ACCESS_KEY = "OK-ACCESS-KEY"
OK_ACCESS_SIGN = "OK-ACCESS-SIGN"
OK_ACCESS_TIMESTAMP = "OK-ACCESS-TIMESTAMP"
OK_ACCESS_PASSPHRASE = "OK-ACCESS-PASSPHRASE"
X_SIMULATE_TRADING = "x-simulated-trading"
CONTENT_TYPE = "Content-Type"
ACCEPT = "Accept"
COOKIE = "Cookie"
LOCALE = "locale="
APPLICATION_JSON = "application/json"
APPLICATION_JSON_UTF8 = "application/json; charset=UTF-8"
/*
i18n: internationalization
*/
ENGLISH = "en_US"
SIMPLIFIED_CHINESE = "zh_CN"
//zh_TW || zh_HK
TRADITIONAL_CHINESE = "zh_HK"
GET = "GET"
POST = "POST"
)

0
rest/go.mod Normal file
View File

331
rest/rest.go Normal file
View File

@ -0,0 +1,331 @@
package rest
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"
. "v5sdk_go/utils"
)
type RESTAPI struct {
EndPoint string `json:"endPoint"`
// GET/POST
Method string `json:"method"`
Uri string `json:"uri"`
Param map[string]interface{} `json:"param"`
Timeout time.Duration
ApiKeyInfo *APIKeyInfo
isSimulate bool
}
type APIKeyInfo struct {
ApiKey string
PassPhrase string
SecKey string
UserId string
}
type RESTAPIResult struct {
Url string `json:"url"`
Param string `json:"param"`
Header string `json:"header"`
Code int `json:"code"`
// 原始返回信息
Body string `json:"body"`
// okexV5返回的数据
V5Response Okexv5APIResponse `json:"v5Response"`
ReqUsedTime time.Duration `json:"reqUsedTime"`
TotalUsedTime time.Duration `json:"totalUsedTime"`
}
type Okexv5APIResponse struct {
Code string `json:"code"`
Msg string `json:"msg"`
Data []map[string]interface{} `json:"data"`
}
/*
endPoint:请求地址
apiKey
isSimulate: 是否为模拟环境
*/
func NewRESTClient(endPoint string, apiKey *APIKeyInfo, isSimulate bool) *RESTAPI {
res := &RESTAPI{
EndPoint: endPoint,
ApiKeyInfo: apiKey,
isSimulate: isSimulate,
Timeout: 5 * time.Second,
}
return res
}
func NewRESTAPI(ep, method, uri string, param *map[string]interface{}) *RESTAPI {
//TODO:参数校验
reqParam := make(map[string]interface{})
if param != nil {
reqParam = *param
}
res := &RESTAPI{
EndPoint: ep,
Method: method,
Uri: uri,
Param: reqParam,
Timeout: 150 * time.Second,
}
return res
}
func (this *RESTAPI) SetSimulate(b bool) *RESTAPI {
this.isSimulate = b
return this
}
func (this *RESTAPI) SetAPIKey(apiKey, secKey, passPhrase string) *RESTAPI {
if this.ApiKeyInfo == nil {
this.ApiKeyInfo = &APIKeyInfo{
ApiKey: apiKey,
PassPhrase: passPhrase,
SecKey: secKey,
}
} else {
this.ApiKeyInfo.ApiKey = apiKey
this.ApiKeyInfo.PassPhrase = passPhrase
this.ApiKeyInfo.SecKey = secKey
}
return this
}
func (this *RESTAPI) SetUserId(userId string) *RESTAPI {
if this.ApiKeyInfo == nil {
fmt.Println("ApiKey为空")
return this
}
this.ApiKeyInfo.UserId = userId
return this
}
func (this *RESTAPI) SetTimeOut(timeout time.Duration) *RESTAPI {
this.Timeout = timeout
return this
}
// GET请求
func (this *RESTAPI) Get(ctx context.Context, uri string, param *map[string]interface{}) (res *RESTAPIResult, err error) {
this.Method = GET
this.Uri = uri
reqParam := make(map[string]interface{})
if param != nil {
reqParam = *param
}
this.Param = reqParam
return this.Run(ctx)
}
// POST请求
func (this *RESTAPI) Post(ctx context.Context, uri string, param *map[string]interface{}) (res *RESTAPIResult, err error) {
this.Method = POST
this.Uri = uri
reqParam := make(map[string]interface{})
if param != nil {
reqParam = *param
}
this.Param = reqParam
return this.Run(ctx)
}
func (this *RESTAPI) Run(ctx context.Context) (res *RESTAPIResult, err error) {
if this.ApiKeyInfo == nil {
err = errors.New("APIKey不可为空")
return
}
procStart := time.Now()
defer func() {
if res != nil {
res.TotalUsedTime = time.Since(procStart)
}
}()
client := &http.Client{
Timeout: this.Timeout,
}
uri, body, err := this.GenReqInfo()
if err != nil {
return
}
url := this.EndPoint + uri
bodyBuf := new(bytes.Buffer)
bodyBuf.ReadFrom(strings.NewReader(body))
req, err := http.NewRequest(this.Method, url, bodyBuf)
if err != nil {
return
}
res = &RESTAPIResult{
Url: url,
Param: body,
}
// Sign and set request headers
timestamp := IsoTime()
preHash := PreHashString(timestamp, this.Method, uri, body)
//log.Println("preHash:", preHash)
sign, err := HmacSha256Base64Signer(preHash, this.ApiKeyInfo.SecKey)
if err != nil {
return
}
//log.Println("sign:", sign)
headStr := this.SetHeaders(req, timestamp, sign)
res.Header = headStr
this.PrintRequest(req, body, preHash)
resp, err := client.Do(req)
if err != nil {
fmt.Println("请求失败!", err)
return
}
defer resp.Body.Close()
res.ReqUsedTime = time.Since(procStart)
resBuff, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Println("获取请求结果失败!", err)
return
}
res.Body = string(resBuff)
res.Code = resp.StatusCode
// 解析结果
var v5rsp Okexv5APIResponse
err = json.Unmarshal(resBuff, &v5rsp)
if err != nil {
fmt.Println("解析v5返回失败", err)
return
}
res.V5Response = v5rsp
return
}
/*
生成请求对应的参数
*/
func (this *RESTAPI) GenReqInfo() (uri string, body string, err error) {
uri = this.Uri
switch this.Method {
case GET:
getParam := []string{}
if len(this.Param) == 0 {
return
}
for k, v := range this.Param {
getParam = append(getParam, fmt.Sprintf("%v=%v", k, v))
}
uri = uri + "?" + strings.Join(getParam, "&")
case POST:
var rawBody []byte
rawBody, err = json.Marshal(this.Param)
if err != nil {
return
}
body = string(rawBody)
default:
err = errors.New("request type unknown!")
return
}
return
}
/*
Set http request headers:
Accept: application/json
Content-Type: application/json; charset=UTF-8 (default)
Cookie: locale=en_US (English)
OK-ACCESS-KEY: (Your setting)
OK-ACCESS-SIGN: (Use your setting, auto sign and add)
OK-ACCESS-TIMESTAMP: (Auto add)
OK-ACCESS-PASSPHRASE: Your setting
*/
func (this *RESTAPI) SetHeaders(request *http.Request, timestamp string, sign string) (header string) {
request.Header.Add(ACCEPT, APPLICATION_JSON)
header += ACCEPT + ":" + APPLICATION_JSON + "\n"
request.Header.Add(CONTENT_TYPE, APPLICATION_JSON_UTF8)
header += CONTENT_TYPE + ":" + APPLICATION_JSON_UTF8 + "\n"
request.Header.Add(COOKIE, LOCALE+ENGLISH)
header += COOKIE + ":" + LOCALE + ENGLISH + "\n"
request.Header.Add(OK_ACCESS_KEY, this.ApiKeyInfo.ApiKey)
header += OK_ACCESS_KEY + ":" + this.ApiKeyInfo.ApiKey + "\n"
request.Header.Add(OK_ACCESS_SIGN, sign)
header += OK_ACCESS_SIGN + ":" + sign + "\n"
request.Header.Add(OK_ACCESS_TIMESTAMP, timestamp)
header += OK_ACCESS_TIMESTAMP + ":" + timestamp + "\n"
request.Header.Add(OK_ACCESS_PASSPHRASE, this.ApiKeyInfo.PassPhrase)
header += OK_ACCESS_PASSPHRASE + ":" + this.ApiKeyInfo.PassPhrase + "\n"
//模拟盘交易标记
if this.isSimulate {
request.Header.Add(X_SIMULATE_TRADING, "1")
header += X_SIMULATE_TRADING + ":1" + "\n"
}
return
}
/*
打印header信息
*/
func (this *RESTAPI) PrintRequest(request *http.Request, body string, preHash string) {
if this.ApiKeyInfo.SecKey != "" {
fmt.Println(" Secret-Key: " + this.ApiKeyInfo.SecKey)
}
fmt.Println(" Request(" + IsoTime() + "):")
fmt.Println("\tUrl: " + request.URL.String())
fmt.Println("\tMethod: " + strings.ToUpper(request.Method))
if len(request.Header) > 0 {
fmt.Println("\tHeaders: ")
for k, v := range request.Header {
if strings.Contains(k, "Ok-") {
k = strings.ToUpper(k)
}
fmt.Println("\t\t" + k + ": " + v[0])
}
}
fmt.Println("\tBody: " + body)
if preHash != "" {
fmt.Println(" PreHash: " + preHash)
}
}

100
rest/rest_test.go Normal file
View File

@ -0,0 +1,100 @@
package rest
import (
"context"
"fmt"
"testing"
)
/*
GET请求
*/
func TestRESTAPIGet(t *testing.T) {
rest := NewRESTAPI("https://www.okex.win", GET, "/api/v5/account/balance", nil)
rest.SetSimulate(true).SetAPIKey("xxxx", "xxxx", "xxxx")
rest.SetUserId("xxxxx")
response, err := rest.Run(context.Background())
if err != nil {
fmt.Println(err)
return
}
fmt.Println("Response:")
fmt.Println("\thttp code: ", response.Code)
fmt.Println("\t总耗时: ", response.TotalUsedTime)
fmt.Println("\t请求耗时: ", response.ReqUsedTime)
fmt.Println("\t返回消息: ", response.Body)
fmt.Println("\terrCode: ", response.V5Response.Code)
fmt.Println("\terrMsg: ", response.V5Response.Msg)
fmt.Println("\tdata: ", response.V5Response.Data)
// 请求的另一种方式
apikey := APIKeyInfo{
ApiKey: "xxxxx",
SecKey: "xxxxx",
PassPhrase: "xxx",
}
cli := NewRESTClient("https://www.okex.win", &apikey, true)
rsp, err := cli.Get(context.Background(), "/api/v5/account/balance", nil)
if err != nil {
return
}
fmt.Println("Response:")
fmt.Println("\thttp code: ", rsp.Code)
fmt.Println("\t总耗时: ", rsp.TotalUsedTime)
fmt.Println("\t请求耗时: ", rsp.ReqUsedTime)
fmt.Println("\t返回消息: ", rsp.Body)
fmt.Println("\terrCode: ", rsp.V5Response.Code)
fmt.Println("\terrMsg: ", rsp.V5Response.Msg)
fmt.Println("\tdata: ", rsp.V5Response.Data)
}
/*
POST请求
*/
func TestRESTAPIPost(t *testing.T) {
param := make(map[string]interface{})
param["greeksType"] = "PA"
rest := NewRESTAPI("https://www.okex.win", POST, "/api/v5/account/set-greeks", &param)
rest.SetSimulate(true).SetAPIKey("xxxx", "xxxx", "xxxx")
response, err := rest.Run(context.Background())
if err != nil {
fmt.Println(err)
return
}
fmt.Println("Response:")
fmt.Println("\thttp code: ", response.Code)
fmt.Println("\t总耗时: ", response.TotalUsedTime)
fmt.Println("\t请求耗时: ", response.ReqUsedTime)
fmt.Println("\t返回消息: ", response.Body)
fmt.Println("\terrCode: ", response.V5Response.Code)
fmt.Println("\terrMsg: ", response.V5Response.Msg)
fmt.Println("\tdata: ", response.V5Response.Data)
// 请求的另一种方式
apikey := APIKeyInfo{
ApiKey: "xxxx",
SecKey: "xxxxx",
PassPhrase: "xxxx",
}
cli := NewRESTClient("https://www.okex.win", &apikey, true)
rsp, err := cli.Post(context.Background(), "/api/v5/account/set-greeks", &param)
if err != nil {
return
}
fmt.Println("Response:")
fmt.Println("\thttp code: ", rsp.Code)
fmt.Println("\t总耗时: ", rsp.TotalUsedTime)
fmt.Println("\t请求耗时: ", rsp.ReqUsedTime)
fmt.Println("\t返回消息: ", rsp.Body)
fmt.Println("\terrCode: ", rsp.V5Response.Code)
fmt.Println("\terrMsg: ", rsp.V5Response.Msg)
fmt.Println("\tdata: ", rsp.V5Response.Data)
}

0
utils/go.mod Normal file
View File

102
utils/utils.go Normal file
View File

@ -0,0 +1,102 @@
package utils
import (
"bytes"
"compress/flate"
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"io/ioutil"
"log"
"strconv"
"strings"
"time"
//"net/http"
)
/*
Get a epoch time
eg: 1521221737
*/
func EpochTime() string {
millisecond := time.Now().UnixNano() / 1000000
epoch := strconv.Itoa(int(millisecond))
epochBytes := []byte(epoch)
epoch = string(epochBytes[:10])
return epoch
}
/*
signing a message
using: hmac sha256 + base64
eg:
message = Pre_hash function comment
secretKey = E65791902180E9EF4510DB6A77F6EBAE
return signed string = TO6uwdqz+31SIPkd4I+9NiZGmVH74dXi+Fd5X0EzzSQ=
*/
func HmacSha256Base64Signer(message string, secretKey string) (string, error) {
mac := hmac.New(sha256.New, []byte(secretKey))
_, err := mac.Write([]byte(message))
if err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(mac.Sum(nil)), nil
}
/*
the pre hash string
eg:
timestamp = 2018-03-08T10:59:25.789Z
method = POST
request_path = /orders?before=2&limit=30
body = {"product_id":"BTC-USD-0309","order_id":"377454671037440"}
return pre hash string = 2018-03-08T10:59:25.789ZPOST/orders?before=2&limit=30{"product_id":"BTC-USD-0309","order_id":"377454671037440"}
*/
func PreHashString(timestamp string, method string, requestPath string, body string) string {
return timestamp + strings.ToUpper(method) + requestPath + body
}
/*
struct convert json string
*/
func Struct2JsonString(raw interface{}) (jsonString string, err error) {
//fmt.Println("转化json,", raw)
data, err := json.Marshal(raw)
if err != nil {
log.Println("convert json failed!", err)
return "", err
}
//log.Println(string(data))
return string(data), nil
}
// 解压缩消息
func GzipDecode(in []byte) ([]byte, error) {
reader := flate.NewReader(bytes.NewReader(in))
defer reader.Close()
return ioutil.ReadAll(reader)
}
/*
Get a iso time
eg: 2018-03-16T18:02:48.284Z
*/
func IsoTime() string {
utcTime := time.Now().UTC()
iso := utcTime.String()
isoBytes := []byte(iso)
iso = string(isoBytes[:10]) + "T" + string(isoBytes[11:23]) + "Z"
return iso
}

17
utils/utils_test.go Normal file
View File

@ -0,0 +1,17 @@
package utils
import (
"fmt"
"testing"
)
func TestHmacSha256Base64Signer(t *testing.T) {
raw := `2021-04-06T03:33:21.681ZPOST/api/v5/trade/order{"instId":"ETH-USDT-SWAP","ordType":"limit","px":"2300","side":"sell","sz":"1","tdMode":"cross"}`
key := "1A9E86759F2D2AA16E389FD3B7F8273E"
res, err := HmacSha256Base64Signer(raw, key)
if err != nil {
t.Fatal(err)
}
fmt.Println(res)
t.Log(res)
}

0
ws/go.mod Normal file
View File

75
ws/utils.go Normal file
View File

@ -0,0 +1,75 @@
package ws
import (
"errors"
"log"
"runtime/debug"
. "v5sdk_go/ws/wImpl"
. "v5sdk_go/ws/wInterface"
)
// 判断返回结果成功失败
func checkResult(wsReq WSReqData, wsRsps []*Msg) (res bool, err error) {
defer func() {
a := recover()
if a != nil {
log.Printf("Receive End. Recover msg: %+v", a)
debug.PrintStack()
}
return
}()
res = false
if len(wsRsps) == 0 {
return
}
for _, v := range wsRsps {
switch v.Info.(type) {
case ErrData:
return
}
if wsReq.GetType() != v.Info.(WSRspData).MsgType() {
err = errors.New("消息类型不一致")
return
}
}
//检查所有频道是否都更新成功
if wsReq.GetType() == MSG_NORMAL {
req, ok := wsReq.(ReqData)
if !ok {
log.Println("类型转化失败", req)
err = errors.New("类型转化失败")
return
}
for idx, _ := range req.Args {
ok := false
i_req := req.Args[idx]
//fmt.Println("检查",i_req)
for i, _ := range wsRsps {
info, _ := wsRsps[i].Info.(RspData)
//fmt.Println("<<",info)
if info.Event == req.Op && info.Arg["channel"] == i_req["channel"] && info.Arg["instType"] == i_req["instType"] {
ok = true
continue
}
}
if !ok {
err = errors.New("未得到所有的期望的返回结果")
return
}
}
} else {
for i, _ := range wsRsps {
info, _ := wsRsps[i].Info.(JRPCRsp)
if info.Code != "0" {
return
}
}
}
res = true
return
}

226
ws/wImpl/BookData.go Normal file
View File

@ -0,0 +1,226 @@
/*
订阅频道后收到的推送数据
*/
package wImpl
import (
"bytes"
"errors"
"fmt"
"hash/crc32"
"log"
"strconv"
"strings"
)
// 普通推送
type MsgData struct {
Arg map[string]string `json:"arg"`
Data []interface{} `json:"data"`
}
// 深度数据
type DepthData struct {
Arg map[string]string `json:"arg"`
Action string `json:"action"`
Data []DepthDetail `json:"data"`
}
type DepthDetail struct {
Asks [][]string `json:"asks"`
Bids [][]string `json:"bids"`
Ts string `json:"ts"`
Checksum int32 `json:"checksum"`
}
/*
深度数据校验
*/
func (this *DepthData) CheckSum(snap *DepthDetail) (pDepData *DepthDetail, err error) {
if len(this.Data) != 1 {
err = errors.New("深度数据错误!")
return
}
if this.Action == DEPTH_SNAPSHOT {
_, cs := CalCrc32(this.Data[0].Asks, this.Data[0].Bids)
if cs != this.Data[0].Checksum {
err = errors.New("校验失败!")
return
}
pDepData = &this.Data[0]
log.Println("snapshot校验成功", this.Data[0].Checksum)
}
if this.Action == DEPTH_UPDATE {
if snap == nil {
err = errors.New("深度快照数据不可为空!")
return
}
pDepData, err = MergDepthData(*snap, this.Data[0], this.Data[0].Checksum)
if err != nil {
return
}
log.Println("update校验成功", this.Data[0].Checksum)
}
return
}
func CalCrc32(askDepths [][]string, bidDepths [][]string) (bytes.Buffer, int32) {
crc32BaseBuffer := bytes.Buffer{}
crcAskDepth, crcBidDepth := 25, 25
if len(askDepths) < 25 {
crcAskDepth = len(askDepths)
}
if len(bidDepths) < 25 {
crcBidDepth = len(bidDepths)
}
if crcAskDepth == crcBidDepth {
for i := 0; i < crcAskDepth; i++ {
if crc32BaseBuffer.Len() > 0 {
crc32BaseBuffer.WriteString(":")
}
crc32BaseBuffer.WriteString(
fmt.Sprintf("%v:%v:%v:%v",
(bidDepths)[i][0], (bidDepths)[i][1],
(askDepths)[i][0], (askDepths)[i][1]))
}
} else {
var crcArr []string
for i, j := 0, 0; i < crcBidDepth || j < crcAskDepth; {
if i < crcBidDepth {
crcArr = append(crcArr, fmt.Sprintf("%v:%v", (bidDepths)[i][0], (bidDepths)[i][1]))
i++
}
if j < crcAskDepth {
crcArr = append(crcArr, fmt.Sprintf("%v:%v", (askDepths)[j][0], (askDepths)[j][1]))
j++
}
}
crc32BaseBuffer.WriteString(strings.Join(crcArr, ":"))
}
expectCrc32 := int32(crc32.ChecksumIEEE(crc32BaseBuffer.Bytes()))
return crc32BaseBuffer, expectCrc32
}
/*
深度合并的内部方法
返回结果
res合并后的深度
index: 最新的 ask1/bids1 的索引
*/
func mergeDepth(oldDepths [][]string, newDepths [][]string, method string) (res [][]string, err error) {
oldIdx, newIdx := 0, 0
for oldIdx < len(oldDepths) && newIdx < len(newDepths) {
oldItem := oldDepths[oldIdx]
newItem := newDepths[newIdx]
var oldPrice, newPrice float64
oldPrice, err = strconv.ParseFloat(oldItem[0], 10)
if err != nil {
return
}
newPrice, err = strconv.ParseFloat(newItem[0], 10)
if err != nil {
return
}
if oldPrice == newPrice {
if newItem[1] != "0" {
res = append(res, newItem)
}
oldIdx++
newIdx++
} else {
switch method {
// 降序
case "bids":
if oldPrice < newPrice {
res = append(res, newItem)
newIdx++
} else {
res = append(res, oldItem)
oldIdx++
}
// 升序
case "asks":
if oldPrice > newPrice {
res = append(res, newItem)
newIdx++
} else {
res = append(res, oldItem)
oldIdx++
}
}
}
}
if oldIdx < len(oldDepths) {
res = append(res, oldDepths[oldIdx:]...)
}
if newIdx < len(newDepths) {
res = append(res, newDepths[newIdx:]...)
}
return
}
/*
深度合并并校验
*/
func MergDepthData(snap DepthDetail, update DepthDetail, expChecksum int32) (res *DepthDetail, err error) {
newAskDepths, err1 := mergeDepth(snap.Asks, update.Asks, "asks")
if err1 != nil {
return
}
// log.Println("old Ask - ", snap.Asks)
// log.Println("update Ask - ", update.Asks)
// log.Println("new Ask - ", newAskDepths)
newBidDepths, err2 := mergeDepth(snap.Bids, update.Bids, "bids")
if err2 != nil {
return
}
// log.Println("old Bids - ", snap.Bids)
// log.Println("update Bids - ", update.Bids)
// log.Println("new Bids - ", newBidDepths)
cBuf, checksum := CalCrc32(newAskDepths, newBidDepths)
if checksum != expChecksum {
err = errors.New("校验失败!")
log.Println("buffer:", cBuf.String())
log.Fatal(checksum, expChecksum)
return
}
res = &DepthDetail{
Asks: newAskDepths,
Bids: newBidDepths,
Ts: update.Ts,
Checksum: update.Checksum,
}
return
}

13
ws/wImpl/ErrData.go Normal file
View File

@ -0,0 +1,13 @@
/*
错误数据
*/
package wImpl
// 服务端请求错误返回消息格式
type ErrData struct {
Event string `json:"event"`
Code string `json:"code"`
Msg string `json:"msg"`
}

50
ws/wImpl/JRPCData.go Normal file
View File

@ -0,0 +1,50 @@
/*
JRPC请求/响应数据
*/
package wImpl
import (
"encoding/json"
. "v5sdk_go/utils"
)
// jrpc请求结构体
type JRPCReq struct {
Id string `json:"id"`
Op string `json:"op"`
Args []map[string]interface{} `json:"args"`
}
func (r JRPCReq) GetType() int {
return MSG_JRPC
}
func (r JRPCReq) ToString() string {
data, err := Struct2JsonString(r)
if err != nil {
return ""
}
return data
}
func (r JRPCReq) Len() int {
return 1
}
// jrpc响应结构体
type JRPCRsp struct {
Id string `json:"id"`
Op string `json:"op"`
Data []map[string]interface{} `json:"data"`
Code string `json:"code"`
Msg string `json:"msg"`
}
func (r JRPCRsp) MsgType() int {
return MSG_JRPC
}
func (r JRPCRsp) String() string {
raw, _ := json.Marshal(r)
return string(raw)
}

47
ws/wImpl/ReqData.go Normal file
View File

@ -0,0 +1,47 @@
/*
普通订阅请求和响应的数据格式
*/
package wImpl
import (
"encoding/json"
. "v5sdk_go/utils"
)
// 客户端请求消息格式
type ReqData struct {
Op string `json:"op"`
Args []map[string]string `json:"args"`
}
func (r ReqData) GetType() int {
return MSG_NORMAL
}
func (r ReqData) ToString() string {
data, err := Struct2JsonString(r)
if err != nil {
return ""
}
return data
}
func (r ReqData) Len() int {
return len(r.Args)
}
// 服务端请求响应消息格式
type RspData struct {
Event string `json:"event"`
Arg map[string]string `json:"arg"`
}
func (r RspData) MsgType() int {
return MSG_NORMAL
}
func (r RspData) String() string {
raw, _ := json.Marshal(r)
return string(raw)
}

241
ws/wImpl/contants.go Normal file
View File

@ -0,0 +1,241 @@
package wImpl
import (
"regexp"
)
/*
*/
const (
MSG_NORMAL = iota
MSG_JRPC
)
//事件
type Event int
/*
EventID
*/
const (
EVENT_UNKNOWN Event = iota
EVENT_ERROR
EVENT_PING
EVENT_LOGIN
//订阅公共频道
EVENT_BOOK_INSTRUMENTS
EVENT_STATUS
EVENT_BOOK_TICKERS
EVENT_BOOK_OPEN_INTEREST
EVENT_BOOK_KLINE
EVENT_BOOK_TRADE
EVENT_BOOK_ESTIMATE_PRICE
EVENT_BOOK_MARK_PRICE
EVENT_BOOK_MARK_PRICE_CANDLE_CHART
EVENT_BOOK_LIMIT_PRICE
EVENT_BOOK_ORDER_BOOK
EVENT_BOOK_ORDER_BOOK5
EVENT_BOOK_ORDER_BOOK_TBT
EVENT_BOOK_ORDER_BOOK50_TBT
EVENT_BOOK_OPTION_SUMMARY
EVENT_BOOK_FUND_RATE
EVENT_BOOK_KLINE_INDEX
EVENT_BOOK_INDEX_TICKERS
//订阅私有频道
EVENT_BOOK_ACCOUNT
EVENT_BOOK_POSTION
EVENT_BOOK_ORDER
EVENT_BOOK_ALG_ORDER
EVENT_BOOK_B_AND_P
// JRPC
EVENT_PLACE_ORDER
EVENT_PLACE_BATCH_ORDERS
EVENT_CANCEL_ORDER
EVENT_CANCEL_BATCH_ORDERS
EVENT_AMEND_ORDER
EVENT_AMEND_BATCH_ORDERS
//订阅返回数据
EVENT_BOOKED_DATA
EVENT_DEPTH_DATA
)
/*
EventID事件名称channel
带有周期参数的频道 行情频道 需要将channel写为 正则表达模式方便 类型匹配 "^candle*"
*/
var EVENT_TABLE = [][]interface{}{
// 未知的消息
{EVENT_UNKNOWN, "未知", ""},
// 错误的消息
{EVENT_ERROR, "错误", ""},
// Ping
{EVENT_PING, "ping", ""},
// 登陆
{EVENT_LOGIN, "登录", ""},
/*
订阅公共频道
*/
{EVENT_BOOK_INSTRUMENTS, "产品", "instruments"},
{EVENT_STATUS, "status", "status"},
{EVENT_BOOK_TICKERS, "行情", "tickers"},
{EVENT_BOOK_OPEN_INTEREST, "持仓总量", "open-interest"},
{EVENT_BOOK_KLINE, "K线", "candle"},
{EVENT_BOOK_TRADE, "交易", "trades"},
{EVENT_BOOK_ESTIMATE_PRICE, "预估交割/行权价格", "estimated-price"},
{EVENT_BOOK_MARK_PRICE, "标记价格", "mark-price"},
{EVENT_BOOK_MARK_PRICE_CANDLE_CHART, "标记价格K线", "mark-price-candle"},
{EVENT_BOOK_LIMIT_PRICE, "限价", "price-limit"},
{EVENT_BOOK_ORDER_BOOK, "400档深度", "books"},
{EVENT_BOOK_ORDER_BOOK5, "5档深度", "books5"},
{EVENT_BOOK_ORDER_BOOK_TBT, "tbt深度", "books-l2-tbt"},
{EVENT_BOOK_ORDER_BOOK50_TBT, "tbt50深度", "books50-l2-tbt"},
{EVENT_BOOK_OPTION_SUMMARY, "期权定价", "opt-summary"},
{EVENT_BOOK_FUND_RATE, "资金费率", "funding-rate"},
{EVENT_BOOK_KLINE_INDEX, "指数K线", "index-candle"},
{EVENT_BOOK_INDEX_TICKERS, "指数行情", "index-tickers"},
/*
订阅私有频道
*/
{EVENT_BOOK_ACCOUNT, "账户", "account"},
{EVENT_BOOK_POSTION, "持仓", "positions"},
{EVENT_BOOK_ORDER, "订单", "orders"},
{EVENT_BOOK_ALG_ORDER, "策略委托订单", "orders-algo"},
{EVENT_BOOK_B_AND_P, "账户余额和持仓", "balance_and_position"},
/*
JRPC
*/
{EVENT_PLACE_ORDER, "下单", "order"},
{EVENT_PLACE_BATCH_ORDERS, "批量下单", "batch-orders"},
{EVENT_CANCEL_ORDER, "撤单", "cancel-order"},
{EVENT_CANCEL_BATCH_ORDERS, "批量撤单", "batch-cancel-orders"},
{EVENT_AMEND_ORDER, "改单", "amend-order"},
{EVENT_AMEND_BATCH_ORDERS, "批量改单", "batch-amend-orders"},
/*
订阅返回数据
注意推送数据channle统一为""
*/
{EVENT_BOOKED_DATA, "普通推送", ""},
{EVENT_DEPTH_DATA, "深度推送", ""},
}
/*
获取事件名称
*/
func (e Event) String() string {
for _, v := range EVENT_TABLE {
eventId := v[0].(Event)
if e == eventId {
return v[1].(string)
}
}
return ""
}
/*
通过事件获取对应的channel信息
对于频道名称有时间周期的 通过参数 pd 传入拼接后返回完整channel信息
*/
func (e Event) GetChannel(pd Period) string {
channel := ""
for _, v := range EVENT_TABLE {
eventId := v[0].(Event)
if e == eventId {
channel = v[2].(string)
break
}
}
if channel == "" {
return ""
}
return channel + string(pd)
}
/*
通过channel信息匹配获取事件类型
*/
func GetEventId(raw string) Event {
evt := EVENT_UNKNOWN
for _, v := range EVENT_TABLE {
channel := v[2].(string)
if raw == channel {
evt = v[0].(Event)
break
}
regexp := regexp.MustCompile(`^(.*)([1-9][0-9]?[\w])$`)
//regexp := regexp.MustCompile(`^http://www.flysnow.org/([\d]{4})/([\d]{2})/([\d]{2})/([\w-]+).html$`)
substr := regexp.FindStringSubmatch(raw)
//fmt.Println(substr)
if len(substr) >= 2 {
if substr[1] == channel {
evt = v[0].(Event)
break
}
}
}
return evt
}
// 时间维度
type Period string
const (
// 年
PERIOD_1YEAR Period = "1Y"
// 月
PERIOD_6Mon Period = "6M"
PERIOD_3Mon Period = "3M"
PERIOD_1Mon Period = "1M"
// 周
PERIOD_1WEEK Period = "1W"
// 天
PERIOD_5DAY Period = "5D"
PERIOD_3DAY Period = "3D"
PERIOD_2DAY Period = "2D"
PERIOD_1DAY Period = "1D"
// 小时
PERIOD_12HOUR Period = "12H"
PERIOD_6HOUR Period = "6H"
PERIOD_4HOUR Period = "4H"
PERIOD_2HOUR Period = "2H"
PERIOD_1HOUR Period = "1H"
// 分钟
PERIOD_30MIN Period = "30m"
PERIOD_15MIN Period = "15m"
PERIOD_5MIN Period = "5m"
PERIOD_3MIN Period = "3m"
PERIOD_1MIN Period = "1m"
// 缺省
PERIOD_NONE Period = ""
)
// 深度枚举
const (
DEPTH_SNAPSHOT = "snapshot"
DEPTH_UPDATE = "update"
)

28
ws/wImpl/contants_test.go Normal file
View File

@ -0,0 +1,28 @@
package wImpl
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestGetEventId(t *testing.T) {
id1 := GetEventId("index-candle30m")
assert.True(t, id1 == EVENT_BOOK_KLINE_INDEX)
id2 := GetEventId("candle1Y")
assert.True(t, id2 == EVENT_BOOK_KLINE)
id3 := GetEventId("orders-algo")
assert.True(t, id3 == EVENT_BOOK_ALG_ORDER)
id4 := GetEventId("balance_and_position")
assert.True(t, id4 == EVENT_BOOK_B_AND_P)
id5 := GetEventId("index-candle1m")
assert.True(t, id5 == EVENT_BOOK_KLINE_INDEX)
}

9
ws/wInterface/IParam.go Normal file
View File

@ -0,0 +1,9 @@
package wInterface
import . "v5sdk_go/ws/wImpl"
// 请求数据
type WSParam interface {
EventType() Event
ToMap() *map[string]string
}

View File

@ -0,0 +1,8 @@
package wInterface
// 请求数据
type WSReqData interface {
GetType() int
Len() int
ToString() string
}

View File

@ -0,0 +1,6 @@
package wInterface
// 返回数据
type WSRspData interface {
MsgType() int
}

View File

@ -0,0 +1,147 @@
package ws
// HOW TO RUN
// go test ws_cli.go ws_op.go ws_contants.go utils.go ws_pub_channel.go ws_pub_channel_test.go ws_priv_channel.go ws_priv_channel_test.go ws_jrpc.go ws_jrpc_test.go ws_test_AddBookedDataHook.go -v
import (
"fmt"
"log"
"testing"
"time"
. "v5sdk_go/ws/wImpl"
)
const (
TRADE_ACCOUNT = iota
ISOLATE_ACCOUNT
CROSS_ACCOUNT
CROSS_ACCOUNT_B
)
func init() {
log.SetFlags(log.LstdFlags | log.Llongfile)
}
func prework() *WsClient {
ep := "wss://wsaws.okex.com:8443/ws/v5/private"
r, err := NewWsClient(ep)
if err != nil {
log.Fatal(err)
}
err = r.Start()
if err != nil {
log.Fatal(err, ep)
}
return r
}
func prework_pri(t int) *WsClient {
// 模拟环境
ep := "wss://wsaws.okex.com:8443/ws/v5/private"
var apikey, passphrase, secretKey string
// 把账号密码写这里
switch t {
case TRADE_ACCOUNT:
apikey = "fe468418-5e40-433f-8d04-04951286d417"
passphrase = "M4pw71Id"
secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D"
case ISOLATE_ACCOUNT:
apikey = "fe468418-5e40-433f-8d04-04951286d417"
passphrase = "M4pw71Id"
secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D"
case CROSS_ACCOUNT:
apikey = "fe468418-5e40-433f-8d04-04951286d417"
passphrase = "M4pw71Id"
secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D"
case CROSS_ACCOUNT_B:
apikey = "fe468418-5e40-433f-8d04-04951286d417"
passphrase = "M4pw71Id"
secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D"
}
r, err := NewWsClient(ep)
if err != nil {
log.Fatal(err)
}
err = r.Start()
if err != nil {
log.Fatal(err)
}
var res bool
start := time.Now()
res, _, err = r.Login(apikey, secretKey, passphrase)
if res {
usedTime := time.Since(start)
fmt.Println("登录成功!", usedTime.String())
} else {
log.Fatal("登录失败!", err)
}
fmt.Println(apikey, secretKey, passphrase)
return r
}
func TestAddBookedDataHook(t *testing.T) {
var r *WsClient
/*订阅私有频道*/
{
r = prework_pri(CROSS_ACCOUNT)
var res bool
var err error
r.AddBookMsgHook(func(ts time.Time, data MsgData) error {
// 添加你的方法
fmt.Println("这是自定义AddBookMsgHook")
fmt.Println("当前数据是", data)
return nil
})
param := map[string]string{}
param["channel"] = "account"
param["ccy"] = "BTC"
res, _, err = r.Subscribe(param)
if res {
fmt.Println("订阅成功!")
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
time.Sleep(100 * time.Second)
}
//订阅公共频道
{
r = prework()
var res bool
var err error
// r.AddBookMsgHook(func(ts time.Time, data MsgData) error {
// 添加你的方法
// fmt.Println("这是公共自定义AddBookMsgHook")
// fmt.Println("当前数据是", data)
// return nil
// })
param := map[string]string{}
param["channel"] = "instruments"
param["instType"] = "FUTURES"
res, _, err = r.Subscribe(param)
if res {
fmt.Println("订阅成功!")
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
select {}
}
}

725
ws/ws_cli.go Normal file
View File

@ -0,0 +1,725 @@
package ws
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"regexp"
"runtime/debug"
"sync"
"time"
. "v5sdk_go/config"
. "v5sdk_go/utils"
. "v5sdk_go/ws/wImpl"
"github.com/gorilla/websocket"
)
// 全局回调函数
type ReceivedDataCallback func(*Msg) error
// 普通订阅推送数据回调函数
type ReceivedMsgDataCallback func(time.Time, MsgData) error
// 深度订阅推送数据回调函数
type ReceivedDepthDataCallback func(time.Time, DepthData) error
// websocket
type WsClient struct {
WsEndPoint string
WsApi *ApiInfo
conn *websocket.Conn
sendCh chan string //发消息队列
resCh chan *Msg //收消息队列
errCh chan *Msg
regCh map[Event]chan *Msg //请求响应队列
quitCh chan struct{}
lock sync.RWMutex
onMessageHook ReceivedDataCallback //全局消息回调函数
onBookMsgHook ReceivedMsgDataCallback //普通订阅消息回调函数
onDepthHook ReceivedDepthDataCallback //深度订阅消息回调函数
OnErrorHook ReceivedDataCallback //错误处理回调函数
// 记录深度信息
DepthDataList map[string]DepthDetail
autoDepthMgr bool // 深度数据管理checksum等
DepthDataLock sync.RWMutex
isStarted bool //防止重复启动和关闭
dailTimeout time.Duration
}
/*
服务端响应详细信息
Timestamp: 接受到消息的时间
Info: 接受到的消息字符串
*/
type Msg struct {
Timestamp time.Time `json:"timestamp"`
Info interface{} `json:"info"`
}
func (this *Msg) Print() {
fmt.Println("【消息时间】", this.Timestamp.Format("2006-01-02 15:04:05.000"))
str, _ := json.Marshal(this.Info)
fmt.Println("【消息内容】", string(str))
}
/*
订阅结果封装后的消息结构体
*/
type ProcessDetail struct {
EndPoint string `json:"endPoint"`
ReqInfo string `json:"ReqInfo"` //订阅请求
SendTime time.Time `json:"sendTime"` //发送订阅请求的时间
RecvTime time.Time `json:"recvTime"` //接受到订阅结果的时间
UsedTime time.Duration `json:"UsedTime"` //耗时
Data []*Msg `json:"data"` //订阅结果数据
}
func (p *ProcessDetail) String() string {
data, _ := json.Marshal(p)
return string(data)
}
// 创建ws对象
func NewWsClient(ep string) (r *WsClient, err error) {
if ep == "" {
err = errors.New("websocket endpoint cannot be null")
return
}
r = &WsClient{
WsEndPoint: ep,
sendCh: make(chan string),
resCh: make(chan *Msg),
errCh: make(chan *Msg),
regCh: make(map[Event]chan *Msg),
//cbs: make(map[Event]ReceivedDataCallback),
quitCh: make(chan struct{}),
DepthDataList: make(map[string]DepthDetail),
dailTimeout: time.Second * 10,
// 自动深度校验默认开启
autoDepthMgr: true,
}
return
}
/*
新增记录深度信息
*/
func (a *WsClient) addDepthDataList(key string, dd DepthDetail) error {
a.DepthDataLock.Lock()
defer a.DepthDataLock.Unlock()
a.DepthDataList[key] = dd
return nil
}
/*
更新记录深度信息如果没有记录不会更新成功
*/
func (a *WsClient) updateDepthDataList(key string, dd DepthDetail) error {
a.DepthDataLock.Lock()
defer a.DepthDataLock.Unlock()
if _, ok := a.DepthDataList[key]; !ok {
return errors.New("更新失败!未发现记录" + key)
}
a.DepthDataList[key] = dd
return nil
}
/*
删除记录深度信息
*/
func (a *WsClient) deleteDepthDataList(key string) error {
a.DepthDataLock.Lock()
defer a.DepthDataLock.Unlock()
delete(a.DepthDataList, key)
return nil
}
/*
设置是否自动深度管理开启 true关闭 false
*/
func (a *WsClient) EnableAutoDepthMgr(b bool) error {
a.DepthDataLock.Lock()
defer a.DepthDataLock.Unlock()
if len(a.DepthDataList) != 0 {
err := errors.New("当前有深度数据处于订阅中")
return err
}
a.autoDepthMgr = b
return nil
}
/*
获取当前的深度快照信息(合并后的)
*/
func (a *WsClient) GetSnapshotByChannel(data DepthData) (snapshot *DepthDetail, err error) {
key, err := json.Marshal(data.Arg)
if err != nil {
return
}
a.DepthDataLock.Lock()
defer a.DepthDataLock.Unlock()
val, ok := a.DepthDataList[string(key)]
if !ok {
return
}
snapshot = new(DepthDetail)
raw, err := json.Marshal(val)
if err != nil {
return
}
err = json.Unmarshal(raw, &snapshot)
if err != nil {
return
}
return
}
// 设置dial超时时间
func (a *WsClient) SetDailTimeout(tm time.Duration) {
a.dailTimeout = tm
}
// 非阻塞启动
func (a *WsClient) Start() error {
a.lock.RLock()
if a.isStarted {
a.lock.RUnlock()
fmt.Println("ws已经启动")
return nil
} else {
a.lock.RUnlock()
a.lock.Lock()
defer a.lock.Unlock()
// 增加超时处理
done := make(chan struct{})
ctx, cancel := context.WithTimeout(context.Background(), a.dailTimeout)
defer cancel()
go func() {
defer func() {
close(done)
}()
var c *websocket.Conn
fmt.Println("a.WsEndPoint: ", a.WsEndPoint)
c, _, err := websocket.DefaultDialer.Dial(a.WsEndPoint, nil)
if err != nil {
err = errors.New("dial error:" + err.Error())
return
}
a.conn = c
}()
select {
case <-ctx.Done():
err := errors.New("连接超时退出!")
return err
case <-done:
}
//TODO 自定义的推送消息回调回来试试放在这里
go a.receive()
go a.work()
a.isStarted = true
log.Println("客户端已启动!", a.WsEndPoint)
return nil
}
}
// 客户端退出消息channel
func (a *WsClient) IsQuit() <-chan struct{} {
return a.quitCh
}
func (a *WsClient) work() {
defer func() {
a.Stop()
err := recover()
if err != nil {
log.Printf("work End. Recover msg: %+v", a)
debug.PrintStack()
}
}()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C: // 保持心跳
// go a.Ping(1000)
go func() {
_, _, err := a.Ping(1000)
if err != nil {
fmt.Println("心跳检测失败!", err)
a.Stop()
return
}
}()
case <-a.quitCh: // 保持心跳
return
case data, ok := <-a.resCh: //接收到服务端发来的消息
if !ok {
return
}
//log.Println("接收到来自resCh的消息:", data)
if a.onMessageHook != nil {
err := a.onMessageHook(data)
if err != nil {
log.Println("执行onMessageHook函数错误", err)
}
}
case errMsg, ok := <-a.errCh: //错误处理
if !ok {
return
}
if a.OnErrorHook != nil {
err := a.OnErrorHook(errMsg)
if err != nil {
log.Println("执行OnErrorHook函数错误", err)
}
}
case req, ok := <-a.sendCh: //从发送队列中取出数据发送到服务端
if !ok {
return
}
//log.Println("接收到来自req的消息:", req)
err := a.conn.WriteMessage(websocket.TextMessage, []byte(req))
if err != nil {
log.Printf("发送请求失败: %s\n", err)
return
}
log.Printf("[发送请求] %v\n", req)
}
}
}
/*
处理接受到的消息
*/
func (a *WsClient) receive() {
defer func() {
a.Stop()
err := recover()
if err != nil {
log.Printf("Receive End. Recover msg: %+v", a)
debug.PrintStack()
}
}()
for {
messageType, message, err := a.conn.ReadMessage()
if err != nil {
if a.isStarted {
log.Println("receive message error!" + err.Error())
}
break
}
txtMsg := message
switch messageType {
case websocket.TextMessage:
case websocket.BinaryMessage:
txtMsg, err = GzipDecode(message)
if err != nil {
log.Println("解压失败!")
continue
}
}
log.Println("[收到消息]", string(txtMsg))
//发送结果到默认消息处理通道
timestamp := time.Now()
msg := &Msg{Timestamp: timestamp, Info: string(txtMsg)}
a.resCh <- msg
evt, data, err := a.parseMessage(txtMsg)
if err != nil {
log.Println("解析消息失败!", err)
continue
}
//log.Println("解析消息成功!消息类型 =", evt)
a.lock.RLock()
ch, ok := a.regCh[evt]
a.lock.RUnlock()
if !ok {
//只有推送消息才会主动创建通道和消费队列
if evt == EVENT_BOOKED_DATA || evt == EVENT_DEPTH_DATA {
//log.Println("channel不存在event:", evt)
//a.lock.RUnlock()
a.lock.Lock()
a.regCh[evt] = make(chan *Msg)
ch = a.regCh[evt]
a.lock.Unlock()
//log.Println("创建", evt, "通道")
// 创建消费队列
go func(evt Event) {
//log.Println("创建goroutine evt:", evt)
for msg := range a.regCh[evt] {
//log.Println(msg)
// msg.Print()
switch evt {
// 处理普通推送数据
case EVENT_BOOKED_DATA:
fn := a.onBookMsgHook
if fn != nil {
err = fn(msg.Timestamp, msg.Info.(MsgData))
if err != nil {
log.Println("订阅数据回调函数执行失败!", err)
}
//log.Println("函数执行成功!", err)
}
// 处理深度推送数据
case EVENT_DEPTH_DATA:
fn := a.onDepthHook
depData := msg.Info.(DepthData)
// 开启深度数据管理功能的,会合并深度数据
if a.autoDepthMgr {
a.MergeDepth(depData)
}
// 运行用户定义回调函数
if fn != nil {
err = fn(msg.Timestamp, msg.Info.(DepthData))
if err != nil {
log.Println("深度回调函数执行失败!", err)
}
}
}
}
//log.Println("退出goroutine evt:", evt)
}(evt)
//continue
} else {
//log.Println("程序异常!通道已关闭", evt)
continue
}
}
//log.Println(evt,"事件已注册",ch)
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Millisecond*1000)
select {
/*
丢弃消息容易引发数据处理处理错误
*/
// case <-ctx.Done():
// log.Println("等待超时,消息丢弃 - ", data)
case ch <- &Msg{Timestamp: timestamp, Info: data}:
}
cancel()
}
}
/*
开启了深度数据管理功能后系统会自动合并深度信息
*/
func (a *WsClient) MergeDepth(depData DepthData) (err error) {
if !a.autoDepthMgr {
return
}
key, err := json.Marshal(depData.Arg)
if err != nil {
err = errors.New("数据错误")
return
}
// books5 不需要做checksum
if depData.Arg["channel"] == "books5" {
a.addDepthDataList(string(key), depData.Data[0])
return
}
if depData.Action == "snapshot" {
_, err = depData.CheckSum(nil)
if err != nil {
log.Println("校验失败", err)
return
}
a.addDepthDataList(string(key), depData.Data[0])
} else {
var newSnapshot *DepthDetail
a.DepthDataLock.RLock()
oldSnapshot, ok := a.DepthDataList[string(key)]
if !ok {
log.Println("深度数据错误,全量数据未发现!")
err = errors.New("数据错误")
return
}
a.DepthDataLock.RUnlock()
newSnapshot, err = depData.CheckSum(&oldSnapshot)
if err != nil {
log.Println("深度校验失败", err)
err = errors.New("校验失败")
return
}
a.updateDepthDataList(string(key), *newSnapshot)
}
return
}
/*
通过ErrorCode判断事件类型
*/
func GetInfoFromErrCode(data ErrData) Event {
switch data.Code {
case "60001":
return EVENT_LOGIN
case "60002":
return EVENT_LOGIN
case "60003":
return EVENT_LOGIN
case "60004":
return EVENT_LOGIN
case "60005":
return EVENT_LOGIN
case "60006":
return EVENT_LOGIN
case "60007":
return EVENT_LOGIN
case "60008":
return EVENT_LOGIN
case "60009":
return EVENT_LOGIN
case "60010":
return EVENT_LOGIN
case "60011":
return EVENT_LOGIN
}
return EVENT_UNKNOWN
}
/*
从error返回中解析出对应的channel
error信息样例
{"event":"error","msg":"channel:index-tickers,instId:BTC-USDT1 doesn't exist","code":"60018"}
*/
func GetInfoFromErrMsg(raw string) (channel string) {
reg := regexp.MustCompile(`channel:(.*?),`)
if reg == nil {
fmt.Println("MustCompile err")
return
}
//提取关键信息
result := reg.FindAllStringSubmatch(raw, -1)
for _, text := range result {
channel = text[1]
}
return
}
/*
解析消息类型
*/
func (a *WsClient) parseMessage(raw []byte) (evt Event, data interface{}, err error) {
evt = EVENT_UNKNOWN
//log.Println("解析消息")
//log.Println("消息内容:", string(raw))
if string(raw) == "pong" {
evt = EVENT_PING
data = raw
return
}
//log.Println(0, evt)
var rspData = RspData{}
err = json.Unmarshal(raw, &rspData)
if err == nil {
op := rspData.Event
if op == OP_SUBSCRIBE || op == OP_UNSUBSCRIBE {
channel := rspData.Arg["channel"]
evt = GetEventId(channel)
data = rspData
return
}
}
//log.Println("ErrData")
var errData = ErrData{}
err = json.Unmarshal(raw, &errData)
if err == nil {
op := errData.Event
switch op {
case OP_LOGIN:
evt = EVENT_LOGIN
data = errData
//log.Println(3, evt)
return
case OP_ERROR:
data = errData
// TODO:细化报错对应的事件判断
//尝试从msg字段中解析对应的事件类型
evt = GetInfoFromErrCode(errData)
if evt != EVENT_UNKNOWN {
return
}
evt = GetEventId(GetInfoFromErrMsg(errData.Msg))
if evt == EVENT_UNKNOWN {
evt = EVENT_ERROR
return
}
return
}
//log.Println(5, evt)
}
//log.Println("JRPCRsp")
var jRPCRsp = JRPCRsp{}
err = json.Unmarshal(raw, &jRPCRsp)
if err == nil {
data = jRPCRsp
evt = GetEventId(jRPCRsp.Op)
if evt != EVENT_UNKNOWN {
return
}
}
var depthData = DepthData{}
err = json.Unmarshal(raw, &depthData)
if err == nil {
evt = EVENT_DEPTH_DATA
data = depthData
//log.Println("-->>EVENT_DEPTH_DATA", evt)
//log.Println(evt, data)
//log.Println(6)
switch depthData.Arg["channel"] {
case "books":
return
case "books-l2-tbt":
return
case "books50-l2-tbt":
return
case "books5":
return
default:
}
}
//log.Println("MsgData")
var msgData = MsgData{}
err = json.Unmarshal(raw, &msgData)
if err == nil {
evt = EVENT_BOOKED_DATA
data = msgData
//log.Println("-->>EVENT_BOOK_DATA", evt)
//log.Println(evt, data)
//log.Println(6)
return
}
evt = EVENT_UNKNOWN
err = errors.New("message unknown")
return
}
func (a *WsClient) Stop() error {
a.lock.Lock()
defer a.lock.Unlock()
if !a.isStarted {
return nil
}
a.isStarted = false
if a.conn != nil {
a.conn.Close()
}
close(a.errCh)
close(a.sendCh)
close(a.resCh)
close(a.quitCh)
for _, ch := range a.regCh {
close(ch)
}
log.Println("ws客户端退出!")
return nil
}
/*
添加全局消息处理的回调函数
*/
func (a *WsClient) AddMessageHook(fn ReceivedDataCallback) error {
a.onMessageHook = fn
return nil
}
/*
添加订阅消息处理的回调函数
*/
func (a *WsClient) AddBookMsgHook(fn ReceivedMsgDataCallback) error {
a.onBookMsgHook = fn
return nil
}
/*
添加深度消息处理的回调函数
例如:
cli.AddDepthHook(func(ts time.Time, data DepthData) error { return nil })
*/
func (a *WsClient) AddDepthHook(fn ReceivedDepthDataCallback) error {
a.onDepthHook = fn
return nil
}
/*
添加错误类型消息处理的回调函数
*/
func (a *WsClient) AddErrMsgHook(fn ReceivedDataCallback) error {
a.OnErrorHook = fn
return nil
}
/*
判断连接是否存活
*/
func (a *WsClient) IsAlive() bool {
res := false
if a.conn == nil {
return res
}
res, _, _ = a.Ping(500)
return res
}

18
ws/ws_contants.go Normal file
View File

@ -0,0 +1,18 @@
package ws
//操作符
const (
OP_LOGIN = "login"
OP_ERROR = "error"
OP_SUBSCRIBE = "subscribe"
OP_UNSUBSCRIBE = "unsubscribe"
)
// instrument Type
const (
SPOT = "SPOT"
SWAP = "SWAP"
FUTURES = "FUTURES"
OPTION = "OPTION"
ANY = "ANY"
)

157
ws/ws_jrpc.go Normal file
View File

@ -0,0 +1,157 @@
package ws
import (
"context"
"log"
"time"
. "v5sdk_go/ws/wImpl"
)
/*
websocket交易 通用请求
参数说明
evtId封装的事件类型
id: 请求ID
op: 请求参数op
params: 请求参数
timeOut: 超时时间
*/
func (a *WsClient) jrpcReq(evtId Event, op string, id string, params []map[string]interface{}, timeOut ...int) (res bool, detail *ProcessDetail, err error) {
res = true
tm := 5000
if len(timeOut) != 0 {
tm = timeOut[0]
}
req := &JRPCReq{
Id: id,
Op: op,
Args: params,
}
detail = &ProcessDetail{
EndPoint: a.WsEndPoint,
}
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Duration(tm)*time.Millisecond)
defer cancel()
ctx = context.WithValue(ctx, "detail", detail)
msg, err := a.process(ctx, evtId, req)
if err != nil {
res = false
log.Println("处理请求失败!", req, err)
return
}
detail.Data = msg
res, err = checkResult(req, msg)
if err != nil {
res = false
return
}
return
}
/*
单个下单
参数说明
id: 请求ID
params: 请求参数
timeOut: 超时时间
*/
func (a *WsClient) PlaceOrder(id string, param map[string]interface{}, timeOut ...int) (res bool, detail *ProcessDetail, err error) {
op := "order"
evtId := EVENT_PLACE_ORDER
var args []map[string]interface{}
args = append(args, param)
return a.jrpcReq(evtId, op, id, args, timeOut...)
}
/*
批量下单
参数说明
id: 请求ID
params: 请求参数
timeOut: 超时时间
*/
func (a *WsClient) BatchPlaceOrders(id string, params []map[string]interface{}, timeOut ...int) (res bool, detail *ProcessDetail, err error) {
op := "batch-orders"
evtId := EVENT_PLACE_BATCH_ORDERS
return a.jrpcReq(evtId, op, id, params, timeOut...)
}
/*
单个撤单
参数说明
id: 请求ID
params: 请求参数
timeOut: 超时时间
*/
func (a *WsClient) CancelOrder(id string, param map[string]interface{}, timeOut ...int) (res bool, detail *ProcessDetail, err error) {
op := "cancel-order"
evtId := EVENT_CANCEL_ORDER
var args []map[string]interface{}
args = append(args, param)
return a.jrpcReq(evtId, op, id, args, timeOut...)
}
/*
批量撤单
参数说明
id: 请求ID
params: 请求参数
timeOut: 超时时间
*/
func (a *WsClient) BatchCancelOrders(id string, params []map[string]interface{}, timeOut ...int) (res bool, detail *ProcessDetail, err error) {
op := "batch-cancel-orders"
evtId := EVENT_CANCEL_BATCH_ORDERS
return a.jrpcReq(evtId, op, id, params, timeOut...)
}
/*
单个改单
参数说明
id: 请求ID
params: 请求参数
timeOut: 超时时间
*/
func (a *WsClient) AmendOrder(id string, param map[string]interface{}, timeOut ...int) (res bool, detail *ProcessDetail, err error) {
op := "amend-order"
evtId := EVENT_AMEND_ORDER
var args []map[string]interface{}
args = append(args, param)
return a.jrpcReq(evtId, op, id, args, timeOut...)
}
/*
批量改单
参数说明
id: 请求ID
params: 请求参数
timeOut: 超时时间
*/
func (a *WsClient) BatchAmendOrders(id string, params []map[string]interface{}, timeOut ...int) (res bool, detail *ProcessDetail, err error) {
op := "batch-amend-orders"
evtId := EVENT_AMEND_BATCH_ORDERS
return a.jrpcReq(evtId, op, id, params, timeOut...)
}

186
ws/ws_jrpc_test.go Normal file
View File

@ -0,0 +1,186 @@
package ws
import (
"fmt"
"testing"
"time"
. "v5sdk_go/ws/wImpl"
)
func PrintDetail(d *ProcessDetail) {
fmt.Println("[详细信息]")
fmt.Println("请求地址:", d.EndPoint)
fmt.Println("请求内容:", d.ReqInfo)
fmt.Println("发送时间:", d.SendTime.Format("2006-01-02 15:04:05.000"))
fmt.Println("响应时间:", d.RecvTime.Format("2006-01-02 15:04:05.000"))
fmt.Println("耗时:", d.UsedTime.String())
fmt.Printf("接受到 %v 条消息:\n", len(d.Data))
for _, v := range d.Data {
fmt.Printf("[%v] %v\n", v.Timestamp.Format("2006-01-02 15:04:05.000"), v.Info)
}
}
func (r *WsClient) makeOrder(instId string, tdMode string, side string, ordType string, px string, sz string) (orderId string, err error) {
var res bool
var data *ProcessDetail
param := map[string]interface{}{}
param["instId"] = instId
param["tdMode"] = tdMode
param["side"] = side
param["ordType"] = ordType
if px != "" {
param["px"] = px
}
param["sz"] = sz
res, data, err = r.PlaceOrder("0011", param)
if err != nil {
return
}
if res && len(data.Data) == 1 {
rsp := data.Data[0].Info.(JRPCRsp)
if len(rsp.Data) == 1 {
val, ok := rsp.Data[0]["ordId"]
if !ok {
return
}
orderId = val.(string)
return
}
}
return
}
/*
单个下单
*/
// func TestPlaceOrder(t *testing.T) {
// r := prework_pri(CROSS_ACCOUNT)
// r := prework_pri(TRADE_ACCOUNT)
// var res bool
// var err error
// var data *ProcessDetail
//
// start := time.Now()
// param := map[string]interface{}{}
// param["instId"] = "BTC-USDT"
// param["tdMode"] = "cash"
// param["side"] = "buy"
// param["ordType"] = "market"
// param["px"] = "1"
// param["sz"] = "200"
//
// res, data, err = r.PlaceOrder("0011", param)
// if res {
// usedTime := time.Since(start)
// fmt.Println("下单成功!", usedTime.String())
// PrintDetail(data)
// } else {
// usedTime := time.Since(start)
// fmt.Println("下单失败!", usedTime.String(), err)
// }
//
// }
/*
批量下单
*/
// func TestPlaceBatchOrder(t *testing.T) {
// r := prework_pri(CROSS_ACCOUNT)
// var res bool
// var err error
// var data *ProcessDetail
//
// start := time.Now()
// var params []map[string]interface{}
// param := map[string]interface{}{}
// param["instId"] = "BTC-USDT"
// param["tdMode"] = "cash"
// param["side"] = "sell"
// param["ordType"] = "market"
// param["sz"] = "0.001"
// params = append(params, param)
// param = map[string]interface{}{}
// param["instId"] = "BTC-USDT"
// param["tdMode"] = "cash"
// param["side"] = "buy"
// param["ordType"] = "market"
// param["sz"] = "100"
// params = append(params, param)
// res, data, err = r.BatchPlaceOrders("001", params)
// usedTime := time.Since(start)
// if err != nil {
// fmt.Println("下单失败!", err, usedTime.String())
// t.Fail()
// }
// if res {
// fmt.Println("下单成功!", usedTime.String())
// PrintDetail(data)
// } else {
//
// fmt.Println("下单失败!", usedTime.String())
// t.Fail()
// }
//
// }
/*
撤销订单
*/
// func TestCancelOrder(t *testing.T) {
// r := prework_pri(CROSS_ACCOUNT)
//
// 用户自定义limit限价价格
// ordId, _ := r.makeOrder("BTC-USDT", "cash", "sell", "limit", "57000", "0.01")
// if ordId == "" {
// t.Fatal()
// }
//
// t.Log("生成挂单orderId=", ordId)
//
// param := map[string]interface{}{}
// param["instId"] = "BTC-USDT"
// param["ordId"] = ordId
// start := time.Now()
// res, _, _ := r.CancelOrder("1", param)
// if res {
// usedTime := time.Since(start)
// fmt.Println("撤单成功!", usedTime.String())
// } else {
// t.Fatal("撤单失败!")
// }
// }
/*
修改订单
*/
func TestAmendlOrder(t *testing.T) {
r := prework_pri(CROSS_ACCOUNT)
// 用户自定义limit限价价格
ordId, _ := r.makeOrder("BTC-USDT", "cash", "sell", "limit", "57000", "0.01")
if ordId == "" {
t.Fatal()
}
t.Log("生成挂单orderId=", ordId)
param := map[string]interface{}{}
param["instId"] = "BTC-USDT"
param["ordId"] = ordId
// 调整修改订单的参数
//param["newSz"] = "0.02"
param["newPx"] = "57001"
start := time.Now()
res, _, _ := r.AmendOrder("1", param)
if res {
usedTime := time.Since(start)
fmt.Println("修改订单成功!", usedTime.String())
} else {
t.Fatal("修改订单失败!")
}
}

19
ws/ws_middleware.go Normal file
View File

@ -0,0 +1,19 @@
package ws
import "fmt"
type ReqFunc func(...interface{}) (res bool, msg *Msg, err error)
type Decorator func(ReqFunc) ReqFunc
func handler(h ReqFunc, decors ...Decorator) ReqFunc {
for i := range decors {
d := decors[len(decors)-1-i]
h = d(h)
}
return h
}
func preprocess() (res bool, msg *Msg, err error) {
fmt.Println("preprocess")
return
}

532
ws/ws_op.go Normal file
View File

@ -0,0 +1,532 @@
package ws
import (
"context"
"errors"
"log"
"sync"
"time"
. "v5sdk_go/config"
"v5sdk_go/rest"
. "v5sdk_go/utils"
. "v5sdk_go/ws/wImpl"
. "v5sdk_go/ws/wInterface"
)
/*
Ping服务端保持心跳
timeOut:超时时间(毫秒)如果不填默认为5000ms
*/
func (a *WsClient) Ping(timeOut ...int) (res bool, detail *ProcessDetail, err error) {
tm := 5000
if len(timeOut) != 0 {
tm = timeOut[0]
}
res = true
detail = &ProcessDetail{
EndPoint: a.WsEndPoint,
}
ctx := context.Background()
ctx, _ = context.WithTimeout(ctx, time.Duration(tm)*time.Millisecond)
ctx = context.WithValue(ctx, "detail", detail)
msg, err := a.process(ctx, EVENT_PING, nil)
if err != nil {
res = false
log.Println("处理请求失败!", err)
return
}
detail.Data = msg
if len(msg) == 0 {
res = false
return
}
str := string(msg[0].Info.([]byte))
if str != "pong" {
res = false
return
}
return
}
/*
登录私有频道
*/
func (a *WsClient) Login(apiKey, secKey, passPhrase string, timeOut ...int) (res bool, detail *ProcessDetail, err error) {
if apiKey == "" {
err = errors.New("ApiKey cannot be null")
return
}
if secKey == "" {
err = errors.New("SecretKey cannot be null")
return
}
if passPhrase == "" {
err = errors.New("Passphrase cannot be null")
return
}
a.WsApi = &ApiInfo{
ApiKey: apiKey,
SecretKey: secKey,
Passphrase: passPhrase,
}
tm := 5000
if len(timeOut) != 0 {
tm = timeOut[0]
}
res = true
timestamp := EpochTime()
preHash := PreHashString(timestamp, rest.GET, "/users/self/verify", "")
//fmt.Println("preHash:", preHash)
var sign string
if sign, err = HmacSha256Base64Signer(preHash, secKey); err != nil {
log.Println("处理签名失败!", err)
return
}
args := map[string]string{}
args["apiKey"] = apiKey
args["passphrase"] = passPhrase
args["timestamp"] = timestamp
args["sign"] = sign
req := &ReqData{
Op: OP_LOGIN,
Args: []map[string]string{args},
}
detail = &ProcessDetail{
EndPoint: a.WsEndPoint,
}
ctx := context.Background()
ctx, _ = context.WithTimeout(ctx, time.Duration(tm)*time.Millisecond)
ctx = context.WithValue(ctx, "detail", detail)
msg, err := a.process(ctx, EVENT_LOGIN, req)
if err != nil {
res = false
log.Println("处理请求失败!", req, err)
return
}
detail.Data = msg
if len(msg) == 0 {
res = false
return
}
info, _ := msg[0].Info.(ErrData)
if info.Code == "0" && info.Event == OP_LOGIN {
log.Println("登录成功!")
} else {
log.Println("登录失败!")
res = false
return
}
return
}
/*
等待结果响应
*/
func (a *WsClient) waitForResult(e Event, timeOut int) (data interface{}, err error) {
if _, ok := a.regCh[e]; !ok {
a.lock.Lock()
a.regCh[e] = make(chan *Msg)
a.lock.Unlock()
//log.Println("注册", e, "事件成功")
}
a.lock.RLock()
defer a.lock.RUnlock()
ch := a.regCh[e]
//log.Println(e, "等待响应!")
select {
case <-time.After(time.Duration(timeOut) * time.Millisecond):
log.Println(e, "超时未响应!")
err = errors.New(e.String() + "超时未响应!")
return
case data = <-ch:
//log.Println(data)
}
return
}
/*
发送消息到服务端
*/
func (a *WsClient) Send(ctx context.Context, op WSReqData) (err error) {
select {
case <-ctx.Done():
log.Println("发生失败退出!")
err = errors.New("发送超时退出!")
case a.sendCh <- op.ToString():
}
return
}
func (a *WsClient) process(ctx context.Context, e Event, op WSReqData) (data []*Msg, err error) {
defer func() {
_ = recover()
}()
var detail *ProcessDetail
if val := ctx.Value("detail"); val != nil {
detail = val.(*ProcessDetail)
} else {
detail = &ProcessDetail{
EndPoint: a.WsEndPoint,
}
}
defer func() {
//fmt.Println("处理完成,", e.String())
detail.UsedTime = detail.RecvTime.Sub(detail.SendTime)
}()
//查看事件是否被注册
if _, ok := a.regCh[e]; !ok {
a.lock.Lock()
a.regCh[e] = make(chan *Msg)
a.lock.Unlock()
//log.Println("注册", e, "事件成功")
} else {
//log.Println("事件", e, "已注册!")
err = errors.New("事件" + e.String() + "尚未处理完毕")
return
}
//预期请求响应的条数
expectCnt := 1
if op != nil {
expectCnt = op.Len()
}
recvCnt := 0
//等待完成通知
wg := sync.WaitGroup{}
wg.Add(1)
// 这里要先定义go routine func(){} 是为了在里面订阅channel的内容 我们知道一个队列要想往里塞东西,必先给他安排一个订阅它的协程
go func(ctx context.Context) {
defer func() {
a.lock.Lock()
delete(a.regCh, e)
//log.Println("事件已注销!",e)
a.lock.Unlock()
wg.Done()
}()
a.lock.RLock()
ch := a.regCh[e] //请求响应队列
a.lock.RUnlock()
//log.Println(e, "等待响应!")
done := false
ok := true
for {
var item *Msg
select {
case <-ctx.Done():
log.Println(e, "超时未响应!")
err = errors.New(e.String() + "超时未响应!")
return
case item, ok = <-ch:
if !ok {
return
}
detail.RecvTime = time.Now()
//log.Println(e, "接受到数据", item)
// 这里只是把推送的数据显示出来,并没有做更近一步的处理,后续可以二次开发,在这个位置上进行处理
data = append(data, item)
recvCnt++
//log.Println(data)
if recvCnt == expectCnt {
done = true
break
}
}
if done {
break
}
}
if ok {
close(ch)
}
}(ctx)
//
switch e {
case EVENT_PING:
msg := "ping"
detail.ReqInfo = msg
a.sendCh <- msg
detail.SendTime = time.Now()
default:
detail.ReqInfo = op.ToString()
//这个时候ctx中已经提供了meta信息用于发送ws请求
err = a.Send(ctx, op)
if err != nil {
log.Println("发送[", e, "]消息失败!", err)
return
}
detail.SendTime = time.Now()
}
wg.Wait()
return
}
/*
根据args请求参数判断请求类型
{"channel": "account","ccy": "BTC"} 类型为 EVENT_BOOK_ACCOUNT
*/
func GetEventByParam(param map[string]string) (evtId Event) {
evtId = EVENT_UNKNOWN
channel, ok := param["channel"]
if !ok {
return
}
evtId = GetEventId(channel)
return
}
/*
订阅频道
req请求json字符串
*/
func (a *WsClient) Subscribe(param map[string]string, timeOut ...int) (res bool, detail *ProcessDetail, err error) {
res = true
tm := 5000
if len(timeOut) != 0 {
tm = timeOut[0]
}
evtid := GetEventByParam(param)
if evtid == EVENT_UNKNOWN {
err = errors.New("非法的请求参数!")
return
}
var args []map[string]string
args = append(args, param)
req := ReqData{
Op: OP_SUBSCRIBE,
Args: args,
}
detail = &ProcessDetail{
EndPoint: a.WsEndPoint,
}
ctx := context.Background()
ctx, _ = context.WithTimeout(ctx, time.Duration(tm)*time.Millisecond)
ctx = context.WithValue(ctx, "detail", detail)
msg, err := a.process(ctx, evtid, req)
if err != nil {
res = false
log.Println("处理请求失败!", req, err)
return
}
detail.Data = msg
//检查所有频道是否都更新成功
res, err = checkResult(req, msg)
if err != nil {
res = false
return
}
return
}
/*
取消订阅频道
req请求json字符串
*/
func (a *WsClient) UnSubscribe(param map[string]string, timeOut ...int) (res bool, detail *ProcessDetail, err error) {
res = true
tm := 5000
if len(timeOut) != 0 {
tm = timeOut[0]
}
evtid := GetEventByParam(param)
if evtid == EVENT_UNKNOWN {
err = errors.New("非法的请求参数!")
return
}
var args []map[string]string
args = append(args, param)
req := ReqData{
Op: OP_UNSUBSCRIBE,
Args: args,
}
detail = &ProcessDetail{
EndPoint: a.WsEndPoint,
}
ctx := context.Background()
ctx, _ = context.WithTimeout(ctx, time.Duration(tm)*time.Millisecond)
ctx = context.WithValue(ctx, "detail", detail)
msg, err := a.process(ctx, evtid, req)
if err != nil {
res = false
log.Println("处理请求失败!", req, err)
return
}
detail.Data = msg
//检查所有频道是否都更新成功
res, err = checkResult(req, msg)
if err != nil {
res = false
return
}
return
}
/*
jrpc请求
*/
func (a *WsClient) Jrpc(id, op string, params []map[string]interface{}, timeOut ...int) (res bool, detail *ProcessDetail, err error) {
res = true
tm := 5000
if len(timeOut) != 0 {
tm = timeOut[0]
}
evtid := GetEventId(op)
if evtid == EVENT_UNKNOWN {
err = errors.New("非法的请求参数!")
return
}
req := JRPCReq{
Id: id,
Op: op,
Args: params,
}
detail = &ProcessDetail{
EndPoint: a.WsEndPoint,
}
ctx := context.Background()
ctx, _ = context.WithTimeout(ctx, time.Duration(tm)*time.Millisecond)
ctx = context.WithValue(ctx, "detail", detail)
msg, err := a.process(ctx, evtid, req)
if err != nil {
res = false
log.Println("处理请求失败!", req, err)
return
}
detail.Data = msg
//检查所有频道是否都更新成功
res, err = checkResult(req, msg)
if err != nil {
res = false
return
}
return
}
func (a *WsClient) PubChannel(evtId Event, op string, params []map[string]string, pd Period, timeOut ...int) (res bool, msg []*Msg, err error) {
// 参数校验
pa, err := checkParams(evtId, params, pd)
if err != nil {
return
}
res = true
tm := 5000
if len(timeOut) != 0 {
tm = timeOut[0]
}
req := ReqData{
Op: op,
Args: pa,
}
ctx := context.Background()
ctx, _ = context.WithTimeout(ctx, time.Duration(tm)*time.Millisecond)
msg, err = a.process(ctx, evtId, req)
if err != nil {
res = false
log.Println("处理请求失败!", req, err)
return
}
//检查所有频道是否都更新成功
res, err = checkResult(req, msg)
if err != nil {
res = false
return
}
return
}
// 参数校验
func checkParams(evtId Event, params []map[string]string, pd Period) (res []map[string]string, err error) {
channel := evtId.GetChannel(pd)
if channel == "" {
err = errors.New("参数校验失败!未知的类型:" + evtId.String())
return
}
log.Println(channel)
if params == nil {
tmp := make(map[string]string)
tmp["channel"] = channel
res = append(res, tmp)
} else {
//log.Println(params)
for _, param := range params {
tmp := make(map[string]string)
for k, v := range param {
tmp[k] = v
}
val, ok := tmp["channel"]
if !ok {
tmp["channel"] = channel
} else {
if val != channel {
err = errors.New("参数校验失败!channel应为" + channel + val)
return
}
}
res = append(res, tmp)
}
}
return
}

40
ws/ws_priv_channel.go Normal file
View File

@ -0,0 +1,40 @@
package ws
import (
. "v5sdk_go/ws/wImpl"
)
/*
订阅账户频道
*/
func (a *WsClient) PrivAccout(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) {
return a.PubChannel(EVENT_BOOK_ACCOUNT, op, params, PERIOD_NONE, timeOut...)
}
/*
订阅持仓频道
*/
func (a *WsClient) PrivPostion(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) {
return a.PubChannel(EVENT_BOOK_POSTION, op, params, PERIOD_NONE, timeOut...)
}
/*
订阅订单频道
*/
func (a *WsClient) PrivBookOrder(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) {
return a.PubChannel(EVENT_BOOK_ORDER, op, params, PERIOD_NONE, timeOut...)
}
/*
订阅策略委托订单频道
*/
func (a *WsClient) PrivBookAlgoOrder(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) {
return a.PubChannel(EVENT_BOOK_ALG_ORDER, op, params, PERIOD_NONE, timeOut...)
}
/*
订阅账户余额和持仓频道
*/
func (a *WsClient) PrivBalAndPos(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) {
return a.PubChannel(EVENT_BOOK_B_AND_P, op, params, PERIOD_NONE, timeOut...)
}

View File

@ -0,0 +1,99 @@
package ws
// HOW TO RUN
// go test ws_cli.go ws_op.go ws_contants.go utils.go ws_priv_channel.go ws_priv_channel_Accout_test.go -v
import (
"fmt"
"log"
"testing"
"time"
)
const (
TRADE_ACCOUNT = iota
ISOLATE_ACCOUNT
CROSS_ACCOUNT
CROSS_ACCOUNT_B
)
func prework_pri(t int) *WsClient {
// 模拟环境
ep := "wss://wsaws.okex.com:8443/ws/v5/private"
var apikey, passphrase, secretKey string
// 把账号密码写这里
switch t {
case TRADE_ACCOUNT:
apikey = "fe468418-5e40-433f-8d04-04951286d417"
passphrase = "M4pw71Id"
secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D"
case ISOLATE_ACCOUNT:
apikey = "fe468418-5e40-433f-8d04-04951286d417"
passphrase = "M4pw71Id"
secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D"
case CROSS_ACCOUNT:
apikey = "fe468418-5e40-433f-8d04-04951286d417"
passphrase = "M4pw71Id"
secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D"
case CROSS_ACCOUNT_B:
apikey = "fe468418-5e40-433f-8d04-04951286d417"
passphrase = "M4pw71Id"
secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D"
}
r, err := NewWsClient(ep)
if err != nil {
log.Fatal(err)
}
err = r.Start()
if err != nil {
log.Fatal(err)
}
var res bool
start := time.Now()
res, _, err = r.Login(apikey, secretKey, passphrase)
if res {
usedTime := time.Since(start)
fmt.Println("登录成功!", usedTime.String())
} else {
log.Fatal("登录失败!", err)
}
fmt.Println(apikey, secretKey, passphrase)
return r
}
// 账户频道 测试
func TestAccout(t *testing.T) {
r := prework_pri(CROSS_ACCOUNT)
var res bool
var err error
var args []map[string]string
arg := make(map[string]string)
arg["ccy"] = "BTC"
args = append(args, arg)
fmt.Println("args: ", args)
start := time.Now()
res, _, err = r.PrivAccout(OP_SUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅所有成功!", usedTime.String())
} else {
fmt.Println("订阅所有成功!", err)
t.Fatal("订阅所有成功!", err)
}
time.Sleep(100 * time.Second)
start = time.Now()
// res, _, err = r.PrivAccout(OP_UNSUBSCRIBE, args)
// if res {
// usedTime := time.Since(start)
// fmt.Println("取消订阅所有成功!", usedTime.String())
// } else {
// fmt.Println("取消订阅所有失败!", err)
// t.Fatal("取消订阅所有失败!", err)
// }
}

247
ws/ws_priv_channel_test.go Normal file
View File

@ -0,0 +1,247 @@
package ws
import (
"fmt"
"log"
"testing"
"time"
)
const (
TRADE_ACCOUNT = iota
ISOLATE_ACCOUNT
CROSS_ACCOUNT
CROSS_ACCOUNT_B
)
func prework_pri(t int) *WsClient {
ep := "wss://wsaws.okex.com:8443/ws/v5/private"
var apikey, passphrase, secretKey string
// 把账号密码写这里
switch t {
case TRADE_ACCOUNT:
apikey = "fe468418-5e40-433f-8d04-04951286d417"
passphrase = "M4pw71Id"
secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D"
case ISOLATE_ACCOUNT:
apikey = "fe468418-5e40-433f-8d04-04951286d417"
passphrase = "M4pw71Id"
secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D"
case CROSS_ACCOUNT:
apikey = "fe468418-5e40-433f-8d04-04951286d417"
passphrase = "M4pw71Id"
secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D"
case CROSS_ACCOUNT_B:
apikey = "fe468418-5e40-433f-8d04-04951286d417"
passphrase = "M4pw71Id"
secretKey = "D6D74DF9DD60A25BE2B27CA71D8F814D"
}
r, err := NewWsClient(ep)
if err != nil {
log.Fatal(err)
}
err = r.Start()
if err != nil {
log.Fatal(err)
}
var res bool
//start := time.Now()
res, _, err = r.Login(apikey, secretKey, passphrase)
if res {
//usedTime := time.Since(start)
//fmt.Println("登录成功!",usedTime.String())
} else {
log.Fatal("登录失败!", err)
}
fmt.Println(apikey, secretKey, passphrase)
return r
}
// 账户频道 测试
func TestAccout(t *testing.T) {
r := prework_pri(CROSS_ACCOUNT)
var res bool
var err error
var args []map[string]string
arg := make(map[string]string)
//arg["ccy"] = "BTC"
args = append(args, arg)
start := time.Now()
res, _, err = r.PrivAccout(OP_SUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅所有成功!", usedTime.String())
} else {
fmt.Println("订阅所有成功!", err)
t.Fatal("订阅所有成功!", err)
}
time.Sleep(100 * time.Second)
start = time.Now()
res, _, err = r.PrivAccout(OP_UNSUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅所有成功!", usedTime.String())
} else {
fmt.Println("取消订阅所有失败!", err)
t.Fatal("取消订阅所有失败!", err)
}
}
// 持仓频道 测试
func TestPositon(t *testing.T) {
r := prework_pri(CROSS_ACCOUNT)
var err error
var res bool
var args []map[string]string
arg := make(map[string]string)
arg["instType"] = FUTURES
arg["uly"] = "BTC-USD"
//arg["instId"] = "BTC-USD-210319"
args = append(args, arg)
start := time.Now()
res, _, err = r.PrivPostion(OP_SUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
time.Sleep(60000 * time.Second)
//等待推送
start = time.Now()
res, _, err = r.PrivPostion(OP_UNSUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
t.Fatal("取消订阅失败!", err)
}
}
// 订单频道 测试
func TestBookOrder(t *testing.T) {
r := prework_pri(CROSS_ACCOUNT)
var err error
var res bool
var args []map[string]string
arg := make(map[string]string)
arg["instId"] = "BTC-USDT"
arg["instType"] = "ANY"
//arg["instType"] = "SWAP"
args = append(args, arg)
start := time.Now()
res, _, err = r.PrivBookOrder(OP_SUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
time.Sleep(6000 * time.Second)
//等待推送
start = time.Now()
res, _, err = r.PrivBookOrder(OP_UNSUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
t.Fatal("取消订阅失败!", err)
}
}
// 策略委托订单频道 测试
func TestAlgoOrder(t *testing.T) {
r := prework_pri(CROSS_ACCOUNT)
var err error
var res bool
var args []map[string]string
arg := make(map[string]string)
arg["instType"] = "SPOT"
args = append(args, arg)
start := time.Now()
res, _, err = r.PrivBookAlgoOrder(OP_SUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
time.Sleep(60 * time.Second)
//等待推送
start = time.Now()
res, _, err = r.PrivBookAlgoOrder(OP_UNSUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
t.Fatal("取消订阅失败!", err)
}
}
// 账户余额和持仓频道 测试
func TestPrivBalAndPos(t *testing.T) {
r := prework_pri(CROSS_ACCOUNT)
var err error
var res bool
var args []map[string]string
arg := make(map[string]string)
args = append(args, arg)
start := time.Now()
res, _, err = r.PrivBalAndPos(OP_SUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
time.Sleep(600 * time.Second)
//等待推送
start = time.Now()
res, _, err = r.PrivBalAndPos(OP_UNSUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
t.Fatal("取消订阅失败!", err)
}
}

141
ws/ws_pub_channel.go Normal file
View File

@ -0,0 +1,141 @@
package ws
import (
"errors"
. "v5sdk_go/ws/wImpl"
)
/*
产品频道
*/
func (a *WsClient) PubInstruemnts(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) {
return a.PubChannel(EVENT_BOOK_INSTRUMENTS, op, params, PERIOD_NONE, timeOut...)
}
// 获取系统维护的状态,当系统维护状态改变时推送
func (a *WsClient) PubStatus(op string, timeOut ...int) (res bool, msg []*Msg, err error) {
return a.PubChannel(EVENT_STATUS, op, nil, PERIOD_NONE, timeOut...)
}
/*
行情频道
*/
func (a *WsClient) PubTickers(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) {
return a.PubChannel(EVENT_BOOK_TICKERS, op, params, PERIOD_NONE, timeOut...)
}
/*
持仓总量频道
*/
func (a *WsClient) PubOpenInsterest(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) {
return a.PubChannel(EVENT_BOOK_OPEN_INTEREST, op, params, PERIOD_NONE, timeOut...)
}
/*
K线频道
*/
func (a *WsClient) PubKLine(op string, period Period, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) {
return a.PubChannel(EVENT_BOOK_KLINE, op, params, period, timeOut...)
}
/*
交易频道
*/
func (a *WsClient) PubTrade(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) {
return a.PubChannel(EVENT_BOOK_TRADE, op, params, PERIOD_NONE, timeOut...)
}
/*
预估交割/行权价格频道
*/
func (a *WsClient) PubEstDePrice(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) {
return a.PubChannel(EVENT_BOOK_ESTIMATE_PRICE, op, params, PERIOD_NONE, timeOut...)
}
/*
标记价格频道
*/
func (a *WsClient) PubMarkPrice(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) {
return a.PubChannel(EVENT_BOOK_MARK_PRICE, op, params, PERIOD_NONE, timeOut...)
}
/*
标记价格K线频道
*/
func (a *WsClient) PubMarkPriceCandle(op string, pd Period, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) {
return a.PubChannel(EVENT_BOOK_MARK_PRICE_CANDLE_CHART, op, params, pd, timeOut...)
}
/*
限价频道
*/
func (a *WsClient) PubLimitPrice(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) {
return a.PubChannel(EVENT_BOOK_LIMIT_PRICE, op, params, PERIOD_NONE, timeOut...)
}
/*
深度频道
*/
func (a *WsClient) PubOrderBooks(op string, channel string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) {
switch channel {
// 400档快照
case "books":
return a.PubChannel(EVENT_BOOK_ORDER_BOOK, op, params, PERIOD_NONE, timeOut...)
// 5档快照
case "books5":
return a.PubChannel(EVENT_BOOK_ORDER_BOOK5, op, params, PERIOD_NONE, timeOut...)
// 400 tbt
case "books-l2-tbt":
return a.PubChannel(EVENT_BOOK_ORDER_BOOK_TBT, op, params, PERIOD_NONE, timeOut...)
// 50 tbt
case "books50-l2-tbt":
return a.PubChannel(EVENT_BOOK_ORDER_BOOK50_TBT, op, params, PERIOD_NONE, timeOut...)
default:
err = errors.New("未知的channel")
return
}
}
/*
期权定价频道
*/
func (a *WsClient) PubOptionSummary(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) {
return a.PubChannel(EVENT_BOOK_OPTION_SUMMARY, op, params, PERIOD_NONE, timeOut...)
}
/*
资金费率频道
*/
func (a *WsClient) PubFundRate(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) {
return a.PubChannel(EVENT_BOOK_FUND_RATE, op, params, PERIOD_NONE, timeOut...)
}
/*
指数K线频道
*/
func (a *WsClient) PubKLineIndex(op string, pd Period, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) {
return a.PubChannel(EVENT_BOOK_KLINE_INDEX, op, params, pd, timeOut...)
}
/*
指数行情频道
*/
func (a *WsClient) PubIndexTickers(op string, params []map[string]string, timeOut ...int) (res bool, msg []*Msg, err error) {
return a.PubChannel(EVENT_BOOK_INDEX_TICKERS, op, params, PERIOD_NONE, timeOut...)
}

669
ws/ws_pub_channel_test.go Normal file
View File

@ -0,0 +1,669 @@
package ws
import (
"encoding/json"
"fmt"
"log"
"strings"
"testing"
"time"
. "v5sdk_go/ws/wImpl"
)
func prework() *WsClient {
ep := "wss://wsaws.okex.com:8443/ws/v5/private"
r, err := NewWsClient(ep)
if err != nil {
log.Fatal(err)
}
err = r.Start()
if err != nil {
log.Fatal(err, ep)
}
return r
}
// 产品频道测试
func TestInstruemnts(t *testing.T) {
r := prework()
var err error
var res bool
var args []map[string]string
arg := make(map[string]string)
arg["instType"] = SPOT
//arg["instType"] = OPTION
args = append(args, arg)
start := time.Now()
res, _, err = r.PubInstruemnts(OP_SUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
time.Sleep(3 * time.Second)
//等待推送
start = time.Now()
res, _, err = r.PubInstruemnts(OP_UNSUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
t.Fatal("取消订阅失败!", err)
}
}
// status频道测试
func TestStatus(t *testing.T) {
r := prework()
var err error
var res bool
start := time.Now()
res, _, err = r.PubStatus(OP_SUBSCRIBE)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
time.Sleep(10000 * time.Second)
//等待推送
start = time.Now()
res, _, err = r.PubStatus(OP_UNSUBSCRIBE)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
t.Fatal("取消订阅失败!", err)
}
}
// 行情频道测试
func TestTickers(t *testing.T) {
r := prework()
var err error
var res bool
var args []map[string]string
arg := make(map[string]string)
arg["instId"] = "BTC-USDT"
args = append(args, arg)
start := time.Now()
res, _, err = r.PubTickers(OP_SUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
time.Sleep(600 * time.Second)
//等待推送
start = time.Now()
res, _, err = r.PubTickers(OP_UNSUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
t.Fatal("取消订阅失败!", err)
}
}
// 持仓总量频道 测试
func TestOpenInsterest(t *testing.T) {
r := prework()
var err error
var res bool
var args []map[string]string
arg := make(map[string]string)
arg["instId"] = "LTC-USD-SWAP"
args = append(args, arg)
start := time.Now()
res, _, err = r.PubOpenInsterest(OP_SUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
time.Sleep(60 * time.Second)
//等待推送
start = time.Now()
res, _, err = r.PubOpenInsterest(OP_UNSUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
t.Fatal("取消订阅失败!", err)
}
}
// K线频道测试
func TestKLine(t *testing.T) {
r := prework()
var err error
var res bool
var args []map[string]string
arg := make(map[string]string)
arg["instId"] = "BTC-USDT"
args = append(args, arg)
// 1分钟K
period := PERIOD_1MIN
start := time.Now()
res, _, err = r.PubKLine(OP_SUBSCRIBE, period, args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
}
time.Sleep(60 * time.Second)
//等待推送
start = time.Now()
res, _, err = r.PubKLine(OP_UNSUBSCRIBE, period, args)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
t.Fatal("取消订阅失败!", err)
}
}
// 交易频道测试
func TestTrade(t *testing.T) {
r := prework()
var err error
var res bool
var args []map[string]string
arg := make(map[string]string)
arg["instId"] = "BTC-USDT"
args = append(args, arg)
start := time.Now()
res, _, err = r.PubTrade(OP_SUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
time.Sleep(60 * time.Second)
//等待推送
start = time.Now()
res, _, err = r.PubTrade(OP_UNSUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
t.Fatal("取消订阅失败!", err)
}
}
// 预估交割/行权价格频道 测试
func TestEstDePrice(t *testing.T) {
r := prework()
var err error
var res bool
var args []map[string]string
arg := make(map[string]string)
arg["instType"] = FUTURES
arg["uly"] = "BTC-USD"
args = append(args, arg)
start := time.Now()
res, _, err = r.PubEstDePrice(OP_SUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
time.Sleep(60 * time.Second)
//等待推送
start = time.Now()
res, _, err = r.PubEstDePrice(OP_UNSUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
t.Fatal("取消订阅失败!", err)
}
}
// 标记价格频道 测试
func TestMarkPrice(t *testing.T) {
r := prework()
var err error
var res bool
var args []map[string]string
arg := make(map[string]string)
arg["instId"] = "BTC-USDT"
args = append(args, arg)
start := time.Now()
res, _, err = r.PubMarkPrice(OP_SUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
time.Sleep(60 * time.Second)
//等待推送
start = time.Now()
res, _, err = r.PubMarkPrice(OP_UNSUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
t.Fatal("取消订阅失败!", err)
}
}
// 标记价格K线频道 测试s
func TestMarkPriceCandle(t *testing.T) {
r := prework()
var err error
var res bool
var args []map[string]string
arg := make(map[string]string)
arg["instId"] = "BTC-USDT"
args = append(args, arg)
period := PERIOD_1MIN
start := time.Now()
res, _, err = r.PubMarkPriceCandle(OP_SUBSCRIBE, period, args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
time.Sleep(60 * time.Second)
//等待推送
start = time.Now()
res, _, err = r.PubMarkPriceCandle(OP_UNSUBSCRIBE, period, args)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
t.Fatal("取消订阅失败!", err)
}
}
// 限价频道 测试
func TestLimitPrice(t *testing.T) {
r := prework()
var err error
var res bool
var args []map[string]string
arg := make(map[string]string)
arg["instId"] = "BTC-USDT"
args = append(args, arg)
start := time.Now()
res, _, err = r.PubLimitPrice(OP_SUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
time.Sleep(60 * time.Second)
//等待推送
start = time.Now()
res, _, err = r.PubLimitPrice(OP_UNSUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
t.Fatal("取消订阅失败!", err)
}
}
// 深度频道 测试
func TestOrderBooks(t *testing.T) {
r := prework()
var err error
var res bool
/*
设置关闭深度数据管理
*/
// err = r.EnableAutoDepthMgr(false)
// if err != nil {
// fmt.Println("关闭自动校验失败!")
// }
end := make(chan struct{})
r.AddDepthHook(func(ts time.Time, data DepthData) error {
// 对于深度类型数据处理的用户可以自定义
// 检测深度数据是否正常
key, _ := json.Marshal(data.Arg)
fmt.Println("个数:", len(data.Data[0].Asks))
checksum := data.Data[0].Checksum
fmt.Println("[自定义方法] ", string(key), ", checksum = ", checksum)
for _, ask := range data.Data[0].Asks {
arr := strings.Split(ask[0], ".")
//fmt.Println(arr)
if len(arr) > 1 && len(arr[1]) > 2 {
fmt.Println("ask数据异常,", checksum, "ask:", ask)
t.Fatal()
end <- struct{}{}
return nil
} else {
fmt.Println("bid数据正常,", checksum, "ask:", ask)
}
}
for _, bid := range data.Data[0].Bids {
arr := strings.Split(bid[0], ".")
//fmt.Println(arr)
if len(arr) > 1 && len(arr[1]) > 2 {
fmt.Println("bid数据异常,", checksum, "bid:", bid)
t.Fatal()
end <- struct{}{}
return nil
} else {
fmt.Println("ask数据正常,", checksum, "bid:", bid)
}
}
// // 查看当前合并后的全量深度数据
// snapshot, err := r.GetSnapshotByChannel(data)
// if err != nil {
// t.Fatal("深度数据不存在!")
// }
// // 展示ask/bid 前5档数据
// fmt.Println(" Ask 5 档数据 >> ")
// for _, v := range snapshot.Asks[:5] {
// fmt.Println(" price:", v[0], " amount:", v[1])
// }
// fmt.Println(" Bid 5 档数据 >> ")
// for _, v := range snapshot.Bids[:5] {
// fmt.Println(" price:", v[0], " amount:", v[1])
// }
return nil
})
// 可选类型books books5 books-l2-tbt
channel := "books50-l2-tbt"
instIds := []string{"BTC-USDT"}
for _, instId := range instIds {
var args []map[string]string
arg := make(map[string]string)
arg["instId"] = instId
args = append(args, arg)
start := time.Now()
res, _, err = r.PubOrderBooks(OP_SUBSCRIBE, channel, args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
}
}
select {
case <-end:
}
//等待推送
for _, instId := range instIds {
var args []map[string]string
arg := make(map[string]string)
arg["instId"] = instId
args = append(args, arg)
start := time.Now()
res, _, err = r.PubOrderBooks(OP_UNSUBSCRIBE, channel, args)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
t.Fatal("取消订阅失败!", err)
}
}
}
// 期权定价频道 测试
func TestOptionSummary(t *testing.T) {
r := prework()
var err error
var res bool
var args []map[string]string
arg := make(map[string]string)
arg["uly"] = "BTC-USD"
args = append(args, arg)
start := time.Now()
res, _, err = r.PubOptionSummary(OP_SUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
time.Sleep(60 * time.Second)
//等待推送
start = time.Now()
res, _, err = r.PubOptionSummary(OP_UNSUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
t.Fatal("取消订阅失败!", err)
}
}
// 资金费率 测试
func TestFundRate(t *testing.T) {
r := prework()
var err error
var res bool
var args []map[string]string
arg := make(map[string]string)
arg["instId"] = "BTC-USD-SWAP"
args = append(args, arg)
start := time.Now()
res, _, err = r.PubFundRate(OP_SUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
time.Sleep(600 * time.Second)
//等待推送
start = time.Now()
res, _, err = r.PubFundRate(OP_UNSUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
t.Fatal("取消订阅失败!", err)
}
}
// 指数K线频道 测试
func TestKLineIndex(t *testing.T) {
r := prework()
var err error
var res bool
var args []map[string]string
arg := make(map[string]string)
arg["instId"] = "BTC-USDT"
args = append(args, arg)
period := PERIOD_1MIN
start := time.Now()
res, _, err = r.PubKLineIndex(OP_SUBSCRIBE, period, args)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
time.Sleep(60 * time.Second)
//等待推送
start = time.Now()
res, _, err = r.PubKLineIndex(OP_UNSUBSCRIBE, period, args)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
t.Fatal("取消订阅失败!", err)
}
}
// 指数行情频道 测试
func TestIndexMarket(t *testing.T) {
r := prework()
var err error
var res bool
var args []map[string]string
arg := make(map[string]string)
arg["instId"] = "BTC-USDT"
args = append(args, arg)
start := time.Now()
res, _, err = r.PubIndexTickers(OP_SUBSCRIBE, args)
if err != nil {
fmt.Println("订阅失败!", err)
}
usedTime := time.Since(start)
if res {
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", usedTime.String())
//return
}
time.Sleep(600 * time.Second)
//等待推送
start = time.Now()
res, _, err = r.PubIndexTickers(OP_UNSUBSCRIBE, args)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
t.Fatal("取消订阅失败!", err)
}
}

386
ws/ws_test.go Normal file
View File

@ -0,0 +1,386 @@
package ws
import (
"fmt"
"log"
"testing"
"time"
. "v5sdk_go/ws/wImpl"
"github.com/stretchr/testify/assert"
)
func init() {
log.SetFlags(log.LstdFlags | log.Llongfile)
}
func TestPing(t *testing.T) {
r := prework_pri(CROSS_ACCOUNT)
res, _, _ := r.Ping()
assert.True(t, res, true)
}
func TestWsClient_SubscribeAndUnSubscribe(t *testing.T) {
r := prework()
var err error
var res bool
param := map[string]string{}
param["channel"] = "opt-summary"
param["uly"] = "BTC-USD"
start := time.Now()
res, _, err = r.Subscribe(param)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!", usedTime.String())
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
time.Sleep(60 * time.Second)
//等待推送
start = time.Now()
res, _, err = r.UnSubscribe(param)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
t.Fatal("取消订阅失败!", err)
}
}
func TestWsClient_SubscribeAndUnSubscribe_priv(t *testing.T) {
r := prework_pri(ISOLATE_ACCOUNT)
var err error
var res bool
var params []map[string]string
params = append(params, map[string]string{"channel": "orders", "instType": SPOT, "instId": "BTC-USDT"})
//一个失败的订阅用例
params = append(params, map[string]string{"channel": "positions", "instType": "any"})
for _, v := range params {
start := time.Now()
var data *ProcessDetail
res, data, err = r.Subscribe(v)
if res {
usedTime := time.Since(start)
fmt.Println("订阅成功!", usedTime.String())
PrintDetail(data)
} else {
fmt.Println("订阅失败!", err)
//return
}
time.Sleep(60 * time.Second)
//等待推送
start = time.Now()
res, _, err = r.UnSubscribe(v)
if res {
usedTime := time.Since(start)
fmt.Println("取消订阅成功!", usedTime.String())
} else {
fmt.Println("取消订阅失败!", err)
}
}
}
func TestWsClient_Jrpc(t *testing.T) {
//r := prework_pri(ISOLATE_ACCOUNT)
r := prework_pri(CROSS_ACCOUNT)
var res bool
var err error
var data *ProcessDetail
start := time.Now()
var args []map[string]interface{}
param := map[string]interface{}{}
param["instId"] = "BTC-USDT"
param["clOrdId"] = "SIM0dcopy16069997808063455"
param["tdMode"] = "cross"
param["side"] = "sell"
param["ordType"] = "limit"
param["px"] = "19333.3"
param["sz"] = "0.18605445"
param1 := map[string]interface{}{}
param1["instId"] = "BTC-USDT"
param1["clOrdId"] = "SIM0dcopy16069997808063456"
param1["tdMode"] = "cross"
param1["side"] = "sell"
param1["ordType"] = "limit"
param1["px"] = "19334.2"
param1["sz"] = "0.03508913"
param2 := map[string]interface{}{}
param2["instId"] = "BTC-USDT"
param2["clOrdId"] = "SIM0dcopy16069997808063457"
param2["tdMode"] = "cross"
param2["side"] = "sell"
param2["ordType"] = "limit"
param2["px"] = "19334.8"
param2["sz"] = "0.03658186"
param3 := map[string]interface{}{}
param3["instId"] = "BTC-USDT"
param3["clOrdId"] = "SIM0dcopy16069997808063458"
param3["tdMode"] = "cross"
param3["side"] = "sell"
param3["ordType"] = "limit"
param3["px"] = "19334.9"
param3["sz"] = "0.5"
param4 := map[string]interface{}{}
param4["instId"] = "BTC-USDT"
param4["clOrdId"] = "SIM0dcopy16069997808063459"
param4["tdMode"] = "cross"
param4["side"] = "sell"
param4["ordType"] = "limit"
param4["px"] = "19335.2"
param4["sz"] = "0.3"
param5 := map[string]interface{}{}
param5["instId"] = "BTC-USDT"
param5["clOrdId"] = "SIM0dcopy16069997808063460"
param5["tdMode"] = "cross"
param5["side"] = "sell"
param5["ordType"] = "limit"
param5["px"] = "19335.9"
param5["sz"] = "0.051"
param6 := map[string]interface{}{}
param6["instId"] = "BTC-USDT"
param6["clOrdId"] = "SIM0dcopy16069997808063461"
param6["tdMode"] = "cross"
param6["side"] = "sell"
param6["ordType"] = "limit"
param6["px"] = "19336.4"
param6["sz"] = "1"
param7 := map[string]interface{}{}
param7["instId"] = "BTC-USDT"
param7["clOrdId"] = "SIM0dcopy16069997808063462"
param7["tdMode"] = "cross"
param7["side"] = "sell"
param7["ordType"] = "limit"
param7["px"] = "19336.8"
param7["sz"] = "0.475"
param8 := map[string]interface{}{}
param8["instId"] = "BTC-USDT"
param8["clOrdId"] = "SIM0dcopy16069997808063463"
param8["tdMode"] = "cross"
param8["side"] = "sell"
param8["ordType"] = "limit"
param8["px"] = "19337.3"
param8["sz"] = "0.21299357"
param9 := map[string]interface{}{}
param9["instId"] = "BTC-USDT"
param9["clOrdId"] = "SIM0dcopy16069997808063464"
param9["tdMode"] = "cross"
param9["side"] = "sell"
param9["ordType"] = "limit"
param9["px"] = "19337.5"
param9["sz"] = "0.5"
args = append(args, param)
args = append(args, param1)
args = append(args, param2)
args = append(args, param3)
args = append(args, param4)
args = append(args, param5)
args = append(args, param6)
args = append(args, param7)
args = append(args, param8)
args = append(args, param9)
res, data, err = r.Jrpc("okexv5wsapi001", "order", args)
if res {
usedTime := time.Since(start)
fmt.Println("下单成功!", usedTime.String())
PrintDetail(data)
} else {
usedTime := time.Since(start)
fmt.Println("下单失败!", usedTime.String(), err)
}
}
/*
测试 添加全局消息回调函数
*/
func TestAddMessageHook(t *testing.T) {
r := prework_pri(CROSS_ACCOUNT)
r.AddMessageHook(func(msg *Msg) error {
// 添加你的方法
fmt.Println("这是全局消息自定义MessageHook")
fmt.Println("当前数据是", msg)
return nil
})
select {}
}
/*
普通推送数据回调函数
*/
func TestAddBookedDataHook(t *testing.T) {
var r *WsClient
/*订阅私有频道*/
{
r = prework_pri(CROSS_ACCOUNT)
var res bool
var err error
r.AddBookMsgHook(func(ts time.Time, data MsgData) error {
// 添加你的方法
fmt.Println("这是私有自定义AddBookMsgHook")
fmt.Println("当前数据是", data)
return nil
})
param := map[string]string{}
param["channel"] = "account"
param["ccy"] = "BTC"
res, _, err = r.Subscribe(param)
if res {
fmt.Println("订阅成功!")
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
time.Sleep(100 * time.Second)
}
//订阅公共频道
{
r = prework()
var res bool
var err error
r.AddBookMsgHook(func(ts time.Time, data MsgData) error {
// 添加你的方法
fmt.Println("这是公共自定义AddBookMsgHook")
fmt.Println("当前数据是", data)
return nil
})
param := map[string]string{}
param["channel"] = "instruments"
param["instType"] = "FUTURES"
res, _, err = r.Subscribe(param)
if res {
fmt.Println("订阅成功!")
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
select {}
}
}
func TestGetInfoFromErrMsg(t *testing.T) {
a := assert.New(t)
buf := `
"channel:index-tickers,instId:BTC-USDT1 doesn't exist"
`
ch := GetInfoFromErrMsg(buf)
//t.Log(ch)
a.Equal("index-tickers", ch)
//assert.True(t,ch == "index-tickers")
}
/*
*/
func TestParseMessage(t *testing.T) {
r := prework()
var evt Event
msg := `{"event":"error","msg":"Contract does not exist.","code":"51001"}`
evt, _, _ = r.parseMessage([]byte(msg))
assert.True(t, EVENT_ERROR == evt)
msg = `{"event":"error","msg":"channel:positions,ccy:BTC doesn't exist","code":"60018"}`
evt, _, _ = r.parseMessage([]byte(msg))
assert.True(t, EVENT_BOOK_POSTION == evt)
}
/*
原始方式 深度订阅 测试
*/
func TestSubscribeTBT(t *testing.T) {
r := prework()
var res bool
var err error
// 添加你的方法
r.AddDepthHook(func(ts time.Time, data DepthData) error {
//fmt.Println("这是自定义AddBookMsgHook")
fmt.Println("当前数据是:", data)
return nil
})
param := map[string]string{}
param["channel"] = "books-l2-tbt"
//param["channel"] = "books"
param["instId"] = "BTC-USD-SWAP"
res, _, err = r.Subscribe(param)
if res {
fmt.Println("订阅成功!")
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
time.Sleep(60 * time.Second)
}
/*
*/
func TestSubscribeBalAndPos(t *testing.T) {
r := prework_pri(CROSS_ACCOUNT)
var res bool
var err error
param := map[string]string{}
// 产品信息
param["channel"] = "balance_and_position"
res, _, err = r.Subscribe(param)
if res {
fmt.Println("订阅成功!")
} else {
fmt.Println("订阅失败!", err)
t.Fatal("订阅失败!", err)
//return
}
time.Sleep(60 * time.Second)
}