227 lines
4.5 KiB
Go
227 lines
4.5 KiB
Go
/*
Push data received after subscribing to a channel.
*/
|
||
|
||
package wImpl
|
||
|
||
import (
|
||
"bytes"
|
||
"errors"
|
||
"fmt"
|
||
"hash/crc32"
|
||
"log"
|
||
"strconv"
|
||
"strings"
|
||
)
|
||
|
||
// MsgData is an ordinary (non-depth) push message.
type MsgData struct {
	// Arg is decoded from the "arg" JSON key as a string-to-string map.
	// NOTE(review): presumably the subscription arguments echoed back by the
	// server; confirm against the API documentation.
	Arg map[string]string `json:"arg"`

	// Data is decoded from the "data" JSON key; the element type is left
	// open because it is not fixed by this declaration.
	Data []interface{} `json:"data"`
}
|
||
|
||
// 深度数据
|
||
type DepthData struct {
|
||
Arg map[string]string `json:"arg"`
|
||
Action string `json:"action"`
|
||
Data []DepthDetail `json:"data"`
|
||
}
|
||
|
||
// DepthDetail is one order book payload: either a full snapshot or an
// incremental update.
//
// Each row of Asks and Bids is a slice of strings. Within this file element 0
// of a row is read as the price and element 1 as the size.
type DepthDetail struct {
	// Asks holds the ask side; mergeDepth treats it as sorted by ascending price.
	Asks [][]string `json:"asks"`

	// Bids holds the bid side; mergeDepth treats it as sorted by descending price.
	Bids [][]string `json:"bids"`

	// Ts is decoded from the "ts" JSON key and carried through unchanged.
	// NOTE(review): presumably the push timestamp; the unit is not visible here.
	Ts string `json:"ts"`

	// Checksum is the signed 32-bit value the book is verified against
	// (compared with the result of CalCrc32).
	Checksum int32 `json:"checksum"`
}
|
||
|
||
/*
|
||
深度数据校验
|
||
*/
|
||
func (this *DepthData) CheckSum(snap *DepthDetail) (pDepData *DepthDetail, err error) {
|
||
|
||
if len(this.Data) != 1 {
|
||
err = errors.New("深度数据错误!")
|
||
return
|
||
}
|
||
|
||
if this.Action == DEPTH_SNAPSHOT {
|
||
_, cs := CalCrc32(this.Data[0].Asks, this.Data[0].Bids)
|
||
|
||
if cs != this.Data[0].Checksum {
|
||
err = errors.New("校验失败!")
|
||
return
|
||
}
|
||
pDepData = &this.Data[0]
|
||
log.Println("snapshot校验成功", this.Data[0].Checksum)
|
||
|
||
}
|
||
|
||
if this.Action == DEPTH_UPDATE {
|
||
if snap == nil {
|
||
err = errors.New("深度快照数据不可为空!")
|
||
return
|
||
}
|
||
|
||
pDepData, err = MergDepthData(*snap, this.Data[0], this.Data[0].Checksum)
|
||
if err != nil {
|
||
return
|
||
}
|
||
log.Println("update校验成功", this.Data[0].Checksum)
|
||
}
|
||
|
||
return
|
||
}
|
||
|
||
// CalCrc32 builds the checksum string for an order book and returns it
// together with its CRC32.
//
// At most the first 25 rows of bidDepths and of askDepths are used. Rows are
// interleaved level by level, bid first and then ask, and each row contributes
// "price:size" (elements 0 and 1 of the row). When one side runs out of rows
// the remaining rows of the other side follow on their own. All pieces are
// joined with ":".
//
// The first result is a buffer holding that string. The second is the IEEE
// CRC32 of the string converted to int32, so values above math.MaxInt32 wrap
// to negative numbers.
//
// Every row that is used must have at least two elements; a shorter row causes
// an index-out-of-range panic.
func CalCrc32(askDepths [][]string, bidDepths [][]string) (bytes.Buffer, int32) {
	const maxLevels = 25

	askCount := len(askDepths)
	if askCount > maxLevels {
		askCount = maxLevels
	}
	bidCount := len(bidDepths)
	if bidCount > maxLevels {
		bidCount = maxLevels
	}

	// Walk as many levels as the deeper side has; the shallower side simply
	// stops contributing once it is exhausted.
	levels := bidCount
	if askCount > levels {
		levels = askCount
	}

	parts := make([]string, 0, askCount+bidCount)
	for lvl := 0; lvl < levels; lvl++ {
		if lvl < bidCount {
			parts = append(parts, fmt.Sprintf("%v:%v", bidDepths[lvl][0], bidDepths[lvl][1]))
		}
		if lvl < askCount {
			parts = append(parts, fmt.Sprintf("%v:%v", askDepths[lvl][0], askDepths[lvl][1]))
		}
	}

	var payload bytes.Buffer
	payload.WriteString(strings.Join(parts, ":"))

	return payload, int32(crc32.ChecksumIEEE(payload.Bytes()))
}
|
||
|
||
// mergeDepth applies an incremental update to one side of an order book.
//
// oldDepths is the current side and newDepths the update. Both must already be
// sorted by price: descending when method is "bids", ascending when method is
// "asks". Each row is a slice of strings whose element 0 is the price and, for
// update rows, element 1 is the size.
//
// For every update row:
//   - a size of exactly "0" removes the level with the same price, and is
//     ignored when no such level exists;
//   - a price that already exists replaces the old row;
//   - a new price is inserted at its sorted position.
//
// It returns the merged side. An error is returned when method is neither
// "bids" nor "asks", when an old row has no price, when an update row has
// fewer than two elements, or when a compared price is not a valid number.
func mergeDepth(oldDepths [][]string, newDepths [][]string, method string) (res [][]string, err error) {
	// Reject unknown methods up front: with no ordering rule the merge loop
	// below could not advance and would spin forever.
	var descending bool
	switch method {
	case "bids":
		descending = true
	case "asks":
		descending = false
	default:
		return nil, fmt.Errorf("mergeDepth: unknown method %q", method)
	}

	// removed is the size value that marks a level for deletion.
	const removed = "0"

	// price parses the price column of row, rejecting rows shorter than minLen.
	price := func(row []string, minLen int) (float64, error) {
		if len(row) < minLen {
			return 0, fmt.Errorf("mergeDepth: %s row %v has fewer than %d fields", method, row, minLen)
		}
		p, perr := strconv.ParseFloat(row[0], 64)
		if perr != nil {
			return 0, fmt.Errorf("mergeDepth: %s price %q: %w", method, row[0], perr)
		}
		return p, nil
	}

	oldIdx, newIdx := 0, 0
	for oldIdx < len(oldDepths) && newIdx < len(newDepths) {
		oldItem, newItem := oldDepths[oldIdx], newDepths[newIdx]

		oldPrice, perr := price(oldItem, 1)
		if perr != nil {
			return nil, perr
		}
		newPrice, perr := price(newItem, 2)
		if perr != nil {
			return nil, perr
		}

		// newFirst reports whether the update row sorts before the old row.
		newFirst := oldPrice > newPrice
		if descending {
			newFirst = oldPrice < newPrice
		}

		switch {
		case oldPrice == newPrice:
			// Same level: replace it, or drop it when the size is zero.
			if newItem[1] != removed {
				res = append(res, newItem)
			}
			oldIdx++
			newIdx++
		case newFirst:
			// Level not present in the old book: insert it, unless it is a
			// deletion, which has nothing to delete.
			if newItem[1] != removed {
				res = append(res, newItem)
			}
			newIdx++
		default:
			res = append(res, oldItem)
			oldIdx++
		}
	}

	// At most one of the two tails is non-empty.
	res = append(res, oldDepths[oldIdx:]...)
	for _, row := range newDepths[newIdx:] {
		if len(row) < 2 {
			return nil, fmt.Errorf("mergeDepth: %s row %v has fewer than %d fields", method, row, 2)
		}
		if row[1] != removed {
			res = append(res, row)
		}
	}

	return res, nil
}
|
||
|
||
/*
|
||
深度合并,并校验
|
||
*/
|
||
func MergDepthData(snap DepthDetail, update DepthDetail, expChecksum int32) (res *DepthDetail, err error) {
|
||
|
||
newAskDepths, err1 := mergeDepth(snap.Asks, update.Asks, "asks")
|
||
if err1 != nil {
|
||
return
|
||
}
|
||
|
||
// log.Println("old Ask - ", snap.Asks)
|
||
// log.Println("update Ask - ", update.Asks)
|
||
// log.Println("new Ask - ", newAskDepths)
|
||
newBidDepths, err2 := mergeDepth(snap.Bids, update.Bids, "bids")
|
||
if err2 != nil {
|
||
return
|
||
}
|
||
// log.Println("old Bids - ", snap.Bids)
|
||
// log.Println("update Bids - ", update.Bids)
|
||
// log.Println("new Bids - ", newBidDepths)
|
||
|
||
cBuf, checksum := CalCrc32(newAskDepths, newBidDepths)
|
||
if checksum != expChecksum {
|
||
err = errors.New("校验失败!")
|
||
log.Println("buffer:", cBuf.String())
|
||
log.Fatal(checksum, expChecksum)
|
||
return
|
||
}
|
||
|
||
res = &DepthDetail{
|
||
Asks: newAskDepths,
|
||
Bids: newBidDepths,
|
||
Ts: update.Ts,
|
||
Checksum: update.Checksum,
|
||
}
|
||
|
||
return
|
||
}
|