v5sdkgo/ws/wImpl/BookData.go

227 lines
4.5 KiB
Go
Raw Permalink Normal View History

2024-12-14 19:09:06 +08:00
/*
订阅频道后收到的推送数据
*/
package wImpl
import (
"bytes"
"errors"
"fmt"
"hash/crc32"
"log"
"strconv"
"strings"
)
// 普通推送
type MsgData struct {
Arg map[string]string `json:"arg"`
Data []interface{} `json:"data"`
}
// 深度数据
type DepthData struct {
Arg map[string]string `json:"arg"`
Action string `json:"action"`
Data []DepthDetail `json:"data"`
}
type DepthDetail struct {
Asks [][]string `json:"asks"`
Bids [][]string `json:"bids"`
Ts string `json:"ts"`
Checksum int32 `json:"checksum"`
}
/*
深度数据校验
*/
func (this *DepthData) CheckSum(snap *DepthDetail) (pDepData *DepthDetail, err error) {
if len(this.Data) != 1 {
err = errors.New("深度数据错误!")
return
}
if this.Action == DEPTH_SNAPSHOT {
_, cs := CalCrc32(this.Data[0].Asks, this.Data[0].Bids)
if cs != this.Data[0].Checksum {
err = errors.New("校验失败!")
return
}
pDepData = &this.Data[0]
log.Println("snapshot校验成功", this.Data[0].Checksum)
}
if this.Action == DEPTH_UPDATE {
if snap == nil {
err = errors.New("深度快照数据不可为空!")
return
}
pDepData, err = MergDepthData(*snap, this.Data[0], this.Data[0].Checksum)
if err != nil {
return
}
log.Println("update校验成功", this.Data[0].Checksum)
}
return
}
func CalCrc32(askDepths [][]string, bidDepths [][]string) (bytes.Buffer, int32) {
crc32BaseBuffer := bytes.Buffer{}
crcAskDepth, crcBidDepth := 25, 25
if len(askDepths) < 25 {
crcAskDepth = len(askDepths)
}
if len(bidDepths) < 25 {
crcBidDepth = len(bidDepths)
}
if crcAskDepth == crcBidDepth {
for i := 0; i < crcAskDepth; i++ {
if crc32BaseBuffer.Len() > 0 {
crc32BaseBuffer.WriteString(":")
}
crc32BaseBuffer.WriteString(
fmt.Sprintf("%v:%v:%v:%v",
(bidDepths)[i][0], (bidDepths)[i][1],
(askDepths)[i][0], (askDepths)[i][1]))
}
} else {
var crcArr []string
for i, j := 0, 0; i < crcBidDepth || j < crcAskDepth; {
if i < crcBidDepth {
crcArr = append(crcArr, fmt.Sprintf("%v:%v", (bidDepths)[i][0], (bidDepths)[i][1]))
i++
}
if j < crcAskDepth {
crcArr = append(crcArr, fmt.Sprintf("%v:%v", (askDepths)[j][0], (askDepths)[j][1]))
j++
}
}
crc32BaseBuffer.WriteString(strings.Join(crcArr, ":"))
}
expectCrc32 := int32(crc32.ChecksumIEEE(crc32BaseBuffer.Bytes()))
return crc32BaseBuffer, expectCrc32
}
/*
深度合并的内部方法
返回结果
res合并后的深度
index: 最新的 ask1/bids1 的索引
*/
func mergeDepth(oldDepths [][]string, newDepths [][]string, method string) (res [][]string, err error) {
oldIdx, newIdx := 0, 0
for oldIdx < len(oldDepths) && newIdx < len(newDepths) {
oldItem := oldDepths[oldIdx]
newItem := newDepths[newIdx]
var oldPrice, newPrice float64
oldPrice, err = strconv.ParseFloat(oldItem[0], 10)
if err != nil {
return
}
newPrice, err = strconv.ParseFloat(newItem[0], 10)
if err != nil {
return
}
if oldPrice == newPrice {
if newItem[1] != "0" {
res = append(res, newItem)
}
oldIdx++
newIdx++
} else {
switch method {
// 降序
case "bids":
if oldPrice < newPrice {
res = append(res, newItem)
newIdx++
} else {
res = append(res, oldItem)
oldIdx++
}
// 升序
case "asks":
if oldPrice > newPrice {
res = append(res, newItem)
newIdx++
} else {
res = append(res, oldItem)
oldIdx++
}
}
}
}
if oldIdx < len(oldDepths) {
res = append(res, oldDepths[oldIdx:]...)
}
if newIdx < len(newDepths) {
res = append(res, newDepths[newIdx:]...)
}
return
}
/*
深度合并并校验
*/
func MergDepthData(snap DepthDetail, update DepthDetail, expChecksum int32) (res *DepthDetail, err error) {
newAskDepths, err1 := mergeDepth(snap.Asks, update.Asks, "asks")
if err1 != nil {
return
}
// log.Println("old Ask - ", snap.Asks)
// log.Println("update Ask - ", update.Asks)
// log.Println("new Ask - ", newAskDepths)
newBidDepths, err2 := mergeDepth(snap.Bids, update.Bids, "bids")
if err2 != nil {
return
}
// log.Println("old Bids - ", snap.Bids)
// log.Println("update Bids - ", update.Bids)
// log.Println("new Bids - ", newBidDepths)
cBuf, checksum := CalCrc32(newAskDepths, newBidDepths)
if checksum != expChecksum {
err = errors.New("校验失败!")
log.Println("buffer:", cBuf.String())
log.Fatal(checksum, expChecksum)
return
}
res = &DepthDetail{
Asks: newAskDepths,
Bids: newBidDepths,
Ts: update.Ts,
Checksum: update.Checksum,
}
return
}