v5sdkgo/ws/ws_cli.go

727 lines
15 KiB
Go
Raw Normal View History

2024-12-14 19:09:06 +08:00
package ws
import (
"context"
"encoding/json"
"errors"
"fmt"
2025-03-05 20:52:20 +08:00
. "gitea.zjmud.xyz/phyer/v5sdkgo/config"
. "gitea.zjmud.xyz/phyer/v5sdkgo/utils"
. "gitea.zjmud.xyz/phyer/v5sdkgo/ws/wImpl"
2024-12-14 19:09:06 +08:00
"log"
"regexp"
"runtime/debug"
"sync"
"time"
"github.com/gorilla/websocket"
)
// Callback signatures through which WsClient hands received data to user code.
type (
	// ReceivedDataCallback receives a whole message; WsClient uses it for
	// the global message hook and for the error hook.
	ReceivedDataCallback func(*Msg) error

	// ReceivedMsgDataCallback receives pushed data of an ordinary
	// subscription together with the time the message was received.
	ReceivedMsgDataCallback func(time.Time, MsgData) error

	// ReceivedDepthDataCallback receives pushed data of a depth
	// subscription together with the time the message was received.
	ReceivedDepthDataCallback func(time.Time, DepthData) error
)
// WsClient is a websocket client. It holds the connection, the internal
// message channels, the user-supplied callbacks and the locally recorded
// depth data.
type WsClient struct {
WsEndPoint string
WsApi *ApiInfo
conn *websocket.Conn
sendCh chan string // outgoing message queue
resCh chan *Msg // incoming message queue
errCh chan *Msg
regCh map[Event]chan *Msg // request/response queues, keyed by event
quitCh chan struct{}
lock sync.RWMutex
onMessageHook ReceivedDataCallback // global message callback
onBookMsgHook ReceivedMsgDataCallback // callback for ordinary subscription messages
onDepthHook ReceivedDepthDataCallback // callback for depth subscription messages
OnErrorHook ReceivedDataCallback // error handling callback
// Recorded depth information, keyed by the JSON encoding of the channel arguments.
DepthDataList map[string]DepthDetail
autoDepthMgr bool // automatic depth data management (checksum etc.)
DepthDataLock sync.RWMutex
isStarted bool // guards against repeated start and stop
dailTimeout time.Duration
}
// Msg is one message received from the server.
//
// Timestamp is the time the message was received and Info is the message
// content.
type Msg struct {
	Timestamp time.Time   `json:"timestamp"`
	Info      interface{} `json:"info"`
}

// Print writes the receive time and the JSON-encoded content of the message
// to standard output. A marshalling error is ignored, in which case the
// content line is printed empty.
func (m *Msg) Print() {
	const layout = "2006-01-02 15:04:05.000"

	fmt.Println("【消息时间】", m.Timestamp.Format(layout))
	// Best-effort debug output: the marshal error is deliberately dropped.
	encoded, _ := json.Marshal(m.Info)
	fmt.Println("【消息内容】", string(encoded))
}
// ProcessDetail is the message structure that wraps the result of a
// subscription request.
type ProcessDetail struct {
EndPoint string `json:"endPoint"`
ReqInfo string `json:"ReqInfo"` // the subscription request
SendTime time.Time `json:"sendTime"` // time the subscription request was sent
RecvTime time.Time `json:"recvTime"` // time the subscription result was received
UsedTime time.Duration `json:"UsedTime"` // elapsed time
Data []*Msg `json:"data"` // subscription result data
}
// String returns the JSON encoding of p. If marshalling fails the error is
// ignored and the empty string is returned.
func (p *ProcessDetail) String() string {
	if encoded, err := json.Marshal(p); err == nil {
		return string(encoded)
	}
	return ""
}
// NewWsClient creates a WsClient for the websocket endpoint ep.
//
// All internal channels are unbuffered, the dial timeout defaults to ten
// seconds and automatic depth management is enabled. An empty ep yields a
// nil client and an error.
func NewWsClient(ep string) (r *WsClient, err error) {
	if len(ep) == 0 {
		return nil, errors.New("websocket endpoint cannot be null")
	}

	client := new(WsClient)
	client.WsEndPoint = ep
	client.sendCh = make(chan string)
	client.resCh = make(chan *Msg)
	client.errCh = make(chan *Msg)
	client.regCh = make(map[Event]chan *Msg)
	client.quitCh = make(chan struct{})
	client.DepthDataList = make(map[string]DepthDetail)
	client.dailTimeout = 10 * time.Second
	// Automatic depth verification is on by default.
	client.autoDepthMgr = true

	return client, nil
}
// addDepthDataList records dd as the depth information for key, replacing
// any record already stored under that key. It holds DepthDataLock for
// writing while it updates the map and always returns nil.
func (a *WsClient) addDepthDataList(key string, dd DepthDetail) error {
a.DepthDataLock.Lock()
defer a.DepthDataLock.Unlock()
a.DepthDataList[key] = dd
return nil
}
// updateDepthDataList replaces the depth information recorded for key with
// dd. Unlike addDepthDataList it never creates a record: if nothing is stored
// under key yet, the map is left untouched and an error is returned.
func (a *WsClient) updateDepthDataList(key string, dd DepthDetail) error {
	a.DepthDataLock.Lock()
	defer a.DepthDataLock.Unlock()

	if _, found := a.DepthDataList[key]; found {
		a.DepthDataList[key] = dd
		return nil
	}
	return errors.New("更新失败!未发现记录" + key)
}
// deleteDepthDataList removes the depth information recorded for key, if
// any. It holds DepthDataLock for writing and always returns nil.
func (a *WsClient) deleteDepthDataList(key string) error {
a.DepthDataLock.Lock()
defer a.DepthDataLock.Unlock()
delete(a.DepthDataList, key)
return nil
}
// EnableAutoDepthMgr switches automatic depth management on (true) or off
// (false). The setting can only be changed while no depth data is recorded;
// otherwise an error is returned and the setting is left as it was.
func (a *WsClient) EnableAutoDepthMgr(b bool) error {
	a.DepthDataLock.Lock()
	defer a.DepthDataLock.Unlock()

	if len(a.DepthDataList) > 0 {
		return errors.New("当前有深度数据处于订阅中")
	}

	a.autoDepthMgr = b
	return nil
}
// GetSnapshotByChannel returns a copy of the currently recorded (merged)
// depth snapshot for the channel identified by data.Arg.
//
// The record is looked up under the JSON encoding of data.Arg. When no record
// exists the result is (nil, nil). The returned value is produced by a JSON
// round trip, so modifying it does not affect the stored record. On any
// error the returned snapshot is nil.
func (a *WsClient) GetSnapshotByChannel(data DepthData) (snapshot *DepthDetail, err error) {
	key, err := json.Marshal(data.Arg)
	if err != nil {
		return nil, err
	}

	// The map is only read here, so the shared lock is sufficient.
	a.DepthDataLock.RLock()
	defer a.DepthDataLock.RUnlock()

	val, ok := a.DepthDataList[string(key)]
	if !ok {
		return nil, nil
	}

	raw, err := json.Marshal(val)
	if err != nil {
		return nil, err
	}

	copied := new(DepthDetail)
	if err = json.Unmarshal(raw, copied); err != nil {
		return nil, err
	}
	return copied, nil
}
// SetDailTimeout sets the timeout that Start applies when dialing the
// websocket endpoint. The field is written without taking a lock.
func (a *WsClient) SetDailTimeout(tm time.Duration) {
a.dailTimeout = tm
}
// Start connects to WsEndPoint and launches the receive and work loops in
// background goroutines, so it returns as soon as the connection is
// established.
//
// Calling Start on a client that is already started is a no-op that returns
// nil. The dial is bounded by the configured dial timeout: when the timeout
// expires an error is returned, and any other dial failure is returned as a
// "dial error:" error. In both cases the client is left not started and no
// goroutine is launched.
func (a *WsClient) Start() error {
	// Hold the write lock for the whole check-and-start sequence so that two
	// concurrent callers cannot both see isStarted == false.
	a.lock.Lock()
	defer a.lock.Unlock()

	if a.isStarted {
		fmt.Println("ws已经启动")
		return nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), a.dailTimeout)
	defer cancel()

	fmt.Println("a.WsEndPoint: ", a.WsEndPoint)
	// DialContext aborts the dial itself when ctx expires, so no helper
	// goroutine can assign a.conn after this function has given up.
	c, _, err := websocket.DefaultDialer.DialContext(ctx, a.WsEndPoint, nil)
	if err != nil {
		if ctx.Err() != nil {
			return errors.New("连接超时退出!")
		}
		return errors.New("dial error:" + err.Error())
	}
	a.conn = c

	// TODO: try installing custom push-message callbacks here.
	go a.receive()
	go a.work()
	a.isStarted = true
	log.Println("客户端已启动!", a.WsEndPoint)
	return nil
}
// IsQuit returns the client's quit channel as a receive-only channel. Stop
// closes this channel, so a receive on it unblocks once the client has been
// stopped.
func (a *WsClient) IsQuit() <-chan struct{} {
return a.quitCh
}
// work is the client's main loop. Until the client quits it
//   - pings the server every ten seconds and stops the client when the ping
//     fails,
//   - passes every message from resCh to the global message hook,
//   - passes every message from errCh to the error hook,
//   - writes every request from sendCh to the connection as a text message.
//
// The loop ends when quitCh or any of the channels above is closed, or when a
// write to the connection fails. On exit the client is stopped, and a panic
// raised inside the loop is recovered and logged.
func (a *WsClient) work() {
	defer func() {
		a.Stop()
		if r := recover(); r != nil {
			// Log the recovered panic value; the client struct itself says
			// nothing about what went wrong.
			log.Printf("work End. Recover msg: %+v", r)
			debug.PrintStack()
		}
	}()

	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C: // heartbeat
			// Ping in its own goroutine so a slow reply does not stall the loop.
			go func() {
				if _, _, err := a.Ping(1000); err != nil {
					fmt.Println("心跳检测失败!", err)
					a.Stop()
				}
			}()

		case <-a.quitCh: // client is shutting down
			return

		case data, ok := <-a.resCh: // message received from the server
			if !ok {
				return
			}
			if a.onMessageHook != nil {
				if err := a.onMessageHook(data); err != nil {
					log.Println("执行onMessageHook函数错误", err)
				}
			}

		case errMsg, ok := <-a.errCh: // error handling
			if !ok {
				return
			}
			if a.OnErrorHook != nil {
				if err := a.OnErrorHook(errMsg); err != nil {
					log.Println("执行OnErrorHook函数错误", err)
				}
			}

		case req, ok := <-a.sendCh: // next queued request to send to the server
			if !ok {
				return
			}
			if err := a.conn.WriteMessage(websocket.TextMessage, []byte(req)); err != nil {
				log.Printf("发送请求失败: %s\n", err)
				return
			}
			log.Printf("[发送请求] %v\n", req)
		}
	}
}
// receive reads messages from the connection until a read fails.
//
// For every message it
//  1. gzip-decodes binary frames (a frame that fails to decode is skipped),
//  2. forwards the raw text to resCh for the global message hook,
//  3. parses the message and forwards the parsed value to the channel
//     registered in regCh for its event.
//
// Channels for pushed data (EVENT_BOOKED_DATA and EVENT_DEPTH_DATA) are
// created on first use together with a consumer goroutine; messages of any
// other event that has no registered channel are dropped. Forwarding blocks
// until the message is taken: dropping pushed data on a timeout would easily
// corrupt downstream processing (e.g. incremental depth merging).
//
// On exit the client is stopped, and a panic raised inside the loop is
// recovered and logged.
func (a *WsClient) receive() {
	defer func() {
		a.Stop()
		if r := recover(); r != nil {
			// Log the recovered panic value rather than the client struct.
			log.Printf("Receive End. Recover msg: %+v", r)
			debug.PrintStack()
		}
	}()

	for {
		messageType, message, err := a.conn.ReadMessage()
		if err != nil {
			// A read error after Stop is the expected way out; only report
			// it while the client is still considered started.
			if a.isStarted {
				log.Println("receive message error!" + err.Error())
			}
			break
		}

		txtMsg := message
		if messageType == websocket.BinaryMessage {
			txtMsg, err = GzipDecode(message)
			if err != nil {
				log.Println("解压失败!")
				continue
			}
		}
		log.Println("[收到消息]", string(txtMsg))

		// Every message goes to the default message channel first.
		timestamp := time.Now()
		a.resCh <- &Msg{Timestamp: timestamp, Info: string(txtMsg)}

		evt, data, err := a.parseMessage(txtMsg)
		if err != nil {
			log.Println("解析消息失败!", err)
			continue
		}

		a.lock.RLock()
		ch, ok := a.regCh[evt]
		a.lock.RUnlock()
		if !ok {
			// Only pushed data creates its channel and consumer on demand.
			if evt != EVENT_BOOKED_DATA && evt != EVENT_DEPTH_DATA {
				continue
			}
			ch = make(chan *Msg)
			a.lock.Lock()
			a.regCh[evt] = ch
			a.lock.Unlock()
			// Hand the channel to the consumer directly so it never has to
			// read the shared map without the lock.
			go a.consumePushedMsgs(evt, ch)
		}

		ch <- &Msg{Timestamp: timestamp, Info: data}
	}
}

// consumePushedMsgs delivers the pushed messages arriving on ch to the user
// hook that belongs to evt, until ch is closed.
func (a *WsClient) consumePushedMsgs(evt Event, ch <-chan *Msg) {
	for msg := range ch {
		switch evt {
		case EVENT_BOOKED_DATA: // ordinary pushed data
			fn := a.onBookMsgHook
			if fn == nil {
				continue
			}
			payload, ok := msg.Info.(MsgData)
			if !ok {
				// A failed assertion would otherwise panic in a goroutine
				// without recover and take the whole process down.
				log.Printf("unexpected payload type %T for pushed data, message skipped", msg.Info)
				continue
			}
			if err := fn(msg.Timestamp, payload); err != nil {
				log.Println("订阅数据回调函数执行失败!", err)
			}

		case EVENT_DEPTH_DATA: // pushed depth data
			fn := a.onDepthHook
			depData, ok := msg.Info.(DepthData)
			if !ok {
				log.Printf("unexpected payload type %T for depth data, message skipped", msg.Info)
				continue
			}
			// With depth management enabled the update is merged into the
			// local book before the user hook runs. The merge result is
			// ignored: a failed merge does not stop delivery to the hook.
			if a.autoDepthMgr {
				_ = a.MergeDepth(depData)
			}
			if fn != nil {
				if err := fn(msg.Timestamp, depData); err != nil {
					log.Println("深度回调函数执行失败!", err)
				}
			}
		}
	}
}
// MergeDepth merges one depth push into the locally recorded depth data.
// It does nothing (and returns nil) when automatic depth management is
// disabled.
//
// The record key is the JSON encoding of depData.Arg.
//   - "books5" pushes are stored as they are, without a checksum.
//   - A "snapshot" push is checksummed and, if valid, stored as the new
//     record.
//   - Any other push is treated as an incremental update: it is applied to
//     the recorded snapshot via CheckSum and the result replaces the record.
//     An update without a recorded snapshot is an error.
//
// An error is returned when the arguments cannot be encoded, when a push
// that must carry data has none, when the checksum fails, or when the record
// cannot be stored.
func (a *WsClient) MergeDepth(depData DepthData) (err error) {
	if !a.autoDepthMgr {
		return nil
	}

	rawKey, err := json.Marshal(depData.Arg)
	if err != nil {
		return errors.New("数据错误")
	}
	key := string(rawKey)

	// books5 does not need a checksum.
	if depData.Arg["channel"] == "books5" {
		if len(depData.Data) == 0 {
			return errors.New("数据错误")
		}
		return a.addDepthDataList(key, depData.Data[0])
	}

	if depData.Action == "snapshot" {
		if len(depData.Data) == 0 {
			return errors.New("数据错误")
		}
		if _, err = depData.CheckSum(nil); err != nil {
			log.Println("校验失败", err)
			return err
		}
		return a.addDepthDataList(key, depData.Data[0])
	}

	// Incremental update: release the read lock before any return so a
	// missing snapshot cannot leave the lock held and block later writers.
	a.DepthDataLock.RLock()
	oldSnapshot, ok := a.DepthDataList[key]
	a.DepthDataLock.RUnlock()
	if !ok {
		log.Println("深度数据错误,全量数据未发现!")
		return errors.New("数据错误")
	}

	newSnapshot, err := depData.CheckSum(&oldSnapshot)
	if err != nil {
		log.Println("深度校验失败", err)
		return errors.New("校验失败")
	}
	return a.updateDepthDataList(key, *newSnapshot)
}
// GetInfoFromErrCode derives the event type from the error code of data.
// Codes "60001" through "60011" map to EVENT_LOGIN; every other code maps to
// EVENT_UNKNOWN.
func GetInfoFromErrCode(data ErrData) Event {
	switch data.Code {
	case "60001", "60002", "60003", "60004", "60005", "60006",
		"60007", "60008", "60009", "60010", "60011":
		return EVENT_LOGIN
	default:
		return EVENT_UNKNOWN
	}
}
// errMsgChannelRe captures the channel name from the "channel:<name>,"
// fragment of a server error message. It is compiled once at package
// initialisation instead of on every call.
var errMsgChannelRe = regexp.MustCompile(`channel:(.*?),`)

// GetInfoFromErrMsg extracts the channel name from the msg field of an error
// response. Sample error response:
//
//	{"event":"error","msg":"channel:index-tickers,instId:BTC-USDT1 doesn't exist","code":"60018"}
//
// for which the result is "index-tickers". When raw contains several
// "channel:<name>," fragments the last one wins; when it contains none the
// result is the empty string.
func GetInfoFromErrMsg(raw string) (channel string) {
	matches := errMsgChannelRe.FindAllStringSubmatch(raw, -1)
	if len(matches) == 0 {
		return ""
	}
	return matches[len(matches)-1][1]
}
// parseMessage determines the event type of a raw server message and decodes
// it into the matching structure.
//
// The candidates are tried in a fixed order and the first one that applies
// wins:
//  1. the literal text "pong": EVENT_PING, data is the raw bytes;
//  2. RspData whose event is subscribe or unsubscribe: event taken from
//     GetEventId of the "channel" argument;
//  3. ErrData whose event is login: EVENT_LOGIN; ErrData whose event is
//     error: event derived from the error code, then from the channel named
//     in the error message, and finally EVENT_ERROR;
//  4. JRPCRsp whose Op maps to a known event;
//  5. DepthData on channel books, books-l2-tbt, books50-l2-tbt or books5:
//     EVENT_DEPTH_DATA;
//  6. MsgData: EVENT_BOOKED_DATA.
//
// If nothing applies, evt is EVENT_UNKNOWN and err is "message unknown"; data
// may then still hold the value of an earlier, rejected candidate.
// The receiver is not used.
func (a *WsClient) parseMessage(raw []byte) (evt Event, data interface{}, err error) {
evt = EVENT_UNKNOWN
// Heartbeat reply.
if string(raw) == "pong" {
evt = EVENT_PING
data = raw
return
}
// Subscribe / unsubscribe response.
var rspData = RspData{}
err = json.Unmarshal(raw, &rspData)
if err == nil {
op := rspData.Event
if op == OP_SUBSCRIBE || op == OP_UNSUBSCRIBE {
channel := rspData.Arg["channel"]
evt = GetEventId(channel)
data = rspData
return
}
}
// Login response or error event.
var errData = ErrData{}
err = json.Unmarshal(raw, &errData)
if err == nil {
op := errData.Event
switch op {
case OP_LOGIN:
evt = EVENT_LOGIN
data = errData
return
case OP_ERROR:
data = errData
// TODO: refine the mapping from an error to its event.
// First try the error code, then the channel named in the msg field.
evt = GetInfoFromErrCode(errData)
if evt != EVENT_UNKNOWN {
return
}
evt = GetEventId(GetInfoFromErrMsg(errData.Msg))
if evt == EVENT_UNKNOWN {
evt = EVENT_ERROR
return
}
return
}
// Any other event value falls through to the next candidate.
}
// JRPC response; accepted only when its Op maps to a known event.
var jRPCRsp = JRPCRsp{}
err = json.Unmarshal(raw, &jRPCRsp)
if err == nil {
data = jRPCRsp
evt = GetEventId(jRPCRsp.Op)
if evt != EVENT_UNKNOWN {
return
}
}
// Depth push; accepted only for the depth channels listed below.
var depthData = DepthData{}
err = json.Unmarshal(raw, &depthData)
if err == nil {
evt = EVENT_DEPTH_DATA
data = depthData
switch depthData.Arg["channel"] {
case "books":
return
case "books-l2-tbt":
return
case "books50-l2-tbt":
return
case "books5":
return
default:
// Not a depth channel: fall through and treat it as ordinary pushed data.
}
}
// Ordinary pushed data.
var msgData = MsgData{}
err = json.Unmarshal(raw, &msgData)
if err == nil {
evt = EVENT_BOOKED_DATA
data = msgData
return
}
evt = EVENT_UNKNOWN
err = errors.New("message unknown")
return
}
// Stop shuts the client down. Under a.lock it clears isStarted, closes the
// connection if there is one, closes errCh, sendCh, resCh and quitCh as well
// as every channel registered in regCh, and logs the exit. Calling Stop on a
// client that is not started does nothing. It always returns nil.
//
// NOTE(review): the closed channels are not re-created here or in Start, so
// a Start after Stop would run on closed channels - confirm whether restart
// is meant to be supported.
func (a *WsClient) Stop() error {
a.lock.Lock()
defer a.lock.Unlock()
// Already stopped (or never started): nothing to close.
if !a.isStarted {
return nil
}
a.isStarted = false
if a.conn != nil {
a.conn.Close()
}
close(a.errCh)
close(a.sendCh)
close(a.resCh)
close(a.quitCh)
for _, ch := range a.regCh {
close(ch)
}
log.Println("ws客户端退出!")
return nil
}
// AddMessageHook sets the global message callback, which the work loop calls
// for every message taken from resCh. Any previously set callback is
// replaced. It always returns nil.
func (a *WsClient) AddMessageHook(fn ReceivedDataCallback) error {
a.onMessageHook = fn
return nil
}
// AddBookMsgHook sets the callback for subscription push messages
// (EVENT_BOOKED_DATA). Any previously set callback is replaced. It always
// returns nil.
func (a *WsClient) AddBookMsgHook(fn ReceivedMsgDataCallback) error {
a.onBookMsgHook = fn
return nil
}
// AddDepthHook sets the callback for depth push messages (EVENT_DEPTH_DATA).
// Any previously set callback is replaced. It always returns nil.
//
// Example:
//
//	cli.AddDepthHook(func(ts time.Time, data DepthData) error { return nil })
func (a *WsClient) AddDepthHook(fn ReceivedDepthDataCallback) error {
a.onDepthHook = fn
return nil
}
// AddErrMsgHook sets the callback for error messages, which the work loop
// calls for every message taken from errCh. Any previously set callback is
// replaced. It always returns nil.
func (a *WsClient) AddErrMsgHook(fn ReceivedDataCallback) error {
a.OnErrorHook = fn
return nil
}
// IsAlive reports whether the connection is alive. Without a connection it
// returns false right away; otherwise it returns the first result of
// a.Ping(500) and ignores the other two results.
func (a *WsClient) IsAlive() bool {
	if a.conn == nil {
		return false
	}

	var alive bool
	alive, _, _ = a.Ping(500)
	return alive
}