把core相关的部分单独抽离出来做成公共的模块w
This commit is contained in:
commit
7c451a49d8
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
|||||||
|
submodules/
|
3
.gitmodules
vendored
Normal file
3
.gitmodules
vendored
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
[submodule "submodule/okex"]
|
||||||
|
path = submodule/okex
|
||||||
|
url = baidu:/root/repos/go/okexV5Api
|
13
README.md
Normal file
13
README.md
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
|
||||||
|
## Init
|
||||||
|
|
||||||
|
mkdir submodules && cd submodules
|
||||||
|
git submodule add baidu:/root/repos/go/okexV5Api okex
|
||||||
|
cd ../
|
||||||
|
git pull
|
||||||
|
git submodule init
|
||||||
|
git submodule update --force --recursive --init --remote
|
||||||
|
go mod tidy
|
||||||
|
go mod vendor
|
||||||
|
|
||||||
|
|
470
candle.go
Normal file
470
candle.go
Normal file
@ -0,0 +1,470 @@
|
|||||||
|
package core
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/hex"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
// "v5sdk_go/rest"
|
||||||
|
|
||||||
|
simple "github.com/bitly/go-simplejson"
|
||||||
|
"github.com/go-redis/redis"
|
||||||
|
"github.com/phyer/texus/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Candle struct {
|
||||||
|
Id string `json:"_id"`
|
||||||
|
core *Core
|
||||||
|
InstId string
|
||||||
|
Period string
|
||||||
|
Data []interface{}
|
||||||
|
From string
|
||||||
|
Timestamp time.Time
|
||||||
|
Open float64
|
||||||
|
High float64
|
||||||
|
Low float64
|
||||||
|
Close float64
|
||||||
|
VolCcy float64
|
||||||
|
Confirm bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type MaX struct {
|
||||||
|
Core *Core
|
||||||
|
InstId string
|
||||||
|
Period string
|
||||||
|
KeyName string
|
||||||
|
Count int
|
||||||
|
Ts int64
|
||||||
|
Value float64
|
||||||
|
Data []interface{}
|
||||||
|
From string
|
||||||
|
}
|
||||||
|
|
||||||
|
type MatchCheck struct {
|
||||||
|
Minutes int64
|
||||||
|
Matched bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mc *MatchCheck) SetMatched(value bool) {
|
||||||
|
mc.Matched = value
|
||||||
|
}
|
||||||
|
|
||||||
|
func (core *Core) GetCandlesWithRest(instId string, kidx int, dura time.Duration, maxCandles int) error {
|
||||||
|
ary := []string{}
|
||||||
|
|
||||||
|
wsary := core.Cfg.CandleDimentions
|
||||||
|
for k, v := range wsary {
|
||||||
|
matched := false
|
||||||
|
// 这个算法的目的是:越靠后的candles维度,被命中的概率越低,第一个百分之百命中,后面开始越来越低, 每分钟都会发生这样的计算,
|
||||||
|
// 因为维度多了的话,照顾不过来
|
||||||
|
rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
rand.Seed(time.Now().UnixNano())
|
||||||
|
n := (k*2 + 2) * 3
|
||||||
|
if n < 1 {
|
||||||
|
n = 1
|
||||||
|
}
|
||||||
|
b := rand.Intn(n)
|
||||||
|
if b < 8 {
|
||||||
|
matched = true
|
||||||
|
}
|
||||||
|
if matched {
|
||||||
|
ary = append(ary, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mdura := dura/(time.Duration(len(ary)+1)) - 50*time.Millisecond
|
||||||
|
// fmt.Println("loop4 Ticker Start instId, dura: ", instId, dura, dura/10, mdura, len(ary), " idx: ", kidx)
|
||||||
|
// time.Duration(len(ary)+1)
|
||||||
|
ticker := time.NewTicker(mdura)
|
||||||
|
done := make(chan bool)
|
||||||
|
idx := 0
|
||||||
|
go func(i int) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
if i >= (len(ary)) {
|
||||||
|
done <- true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
rand.Seed(time.Now().UnixNano())
|
||||||
|
b := rand.Intn(2)
|
||||||
|
maxCandles = maxCandles * (i + b) * 2
|
||||||
|
|
||||||
|
if maxCandles < 3 {
|
||||||
|
maxCandles = 3
|
||||||
|
}
|
||||||
|
if maxCandles > 30 {
|
||||||
|
maxCandles = 30
|
||||||
|
}
|
||||||
|
mx := strconv.Itoa(maxCandles)
|
||||||
|
// fmt.Println("loop4 getCandlesWithRest, instId, period,limit,dura, t: ", instId, ary[i], mx, mdura)
|
||||||
|
go func(ii int) {
|
||||||
|
restQ := RestQueue{
|
||||||
|
InstId: instId,
|
||||||
|
Bar: ary[ii],
|
||||||
|
Limit: mx,
|
||||||
|
Duration: mdura,
|
||||||
|
WithWs: true,
|
||||||
|
}
|
||||||
|
js, _ := json.Marshal(restQ)
|
||||||
|
core.RedisCli.LPush("restQueue", js)
|
||||||
|
}(i)
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(idx)
|
||||||
|
time.Sleep(dura - 10*time.Millisecond)
|
||||||
|
ticker.Stop()
|
||||||
|
// fmt.Println("loop4 Ticker stopped instId, dura: ", instId, dura, mdura)
|
||||||
|
done <- true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 当前的时间毫秒数 对于某个时间段,比如3分钟,10分钟,是否可以被整除,
|
||||||
|
func IsModOf(curInt int64, duration time.Duration) bool {
|
||||||
|
vol := int64(0)
|
||||||
|
if duration < 24*time.Hour {
|
||||||
|
// 小于1天
|
||||||
|
vol = (curInt + 28800000)
|
||||||
|
} else if duration >= 24*time.Hour && duration < 48*time.Hour {
|
||||||
|
// 1天
|
||||||
|
vol = curInt - 1633881600000
|
||||||
|
} else if duration >= 48*time.Hour && duration < 72*time.Hour {
|
||||||
|
// 2天
|
||||||
|
vol = curInt - 1633795200000
|
||||||
|
} else if duration >= 72*time.Hour && duration < 120*time.Hour {
|
||||||
|
// 3天
|
||||||
|
vol = curInt - 1633708800000
|
||||||
|
} else if duration >= 120*time.Hour {
|
||||||
|
// 5天
|
||||||
|
vol = curInt - 1633795200000
|
||||||
|
} else {
|
||||||
|
// fmt.Println("noMatched:", curInt)
|
||||||
|
}
|
||||||
|
|
||||||
|
mody := vol % duration.Milliseconds()
|
||||||
|
if mody == 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (core *Core) SaveCandle(instId string, period string, rsp *CandleData, dura time.Duration, withWs bool) {
|
||||||
|
leng := len(rsp.Data)
|
||||||
|
for _, v := range rsp.Data {
|
||||||
|
candle := Candle{
|
||||||
|
InstId: instId,
|
||||||
|
Period: period,
|
||||||
|
Data: v,
|
||||||
|
From: "rest",
|
||||||
|
}
|
||||||
|
//存到elasticSearch
|
||||||
|
candle.PushToWriteLogChan(core)
|
||||||
|
//保存rest得到的candle
|
||||||
|
// 发布到allCandles|publish, 给外部订阅者用于setToKey
|
||||||
|
arys := []string{ALLCANDLES_PUBLISH}
|
||||||
|
if withWs {
|
||||||
|
arys = append(arys, ALLCANDLES_INNER_PUBLISH)
|
||||||
|
}
|
||||||
|
// 如果candle都不需要存到redis,那么AddToGeneralCandleChnl也没有意义
|
||||||
|
saveCandle := os.Getenv("TEXUS_SAVECANDLE")
|
||||||
|
if saveCandle == "true" {
|
||||||
|
candle.SetToKey(core)
|
||||||
|
core.AddToGeneralCandleChnl(&candle, arys)
|
||||||
|
time.Sleep(dura / time.Duration(leng))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (candle *Candle) PushToWriteLogChan(cr *Core) error {
|
||||||
|
did := candle.InstId + candle.Period + candle.Data[0].(string)
|
||||||
|
candle.Id = HashString(did)
|
||||||
|
ncd, _ := candle.ToStruct(cr)
|
||||||
|
fmt.Println("ncd: ", ncd)
|
||||||
|
cd, _ := json.Marshal(ncd)
|
||||||
|
wg := WriteLog{
|
||||||
|
Content: cd,
|
||||||
|
Tag: "sardine.log.candle." + candle.Period,
|
||||||
|
Id: candle.Id,
|
||||||
|
}
|
||||||
|
cr.WriteLogChan <- &wg
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func Daoxu(arr []interface{}) {
|
||||||
|
var temp interface{}
|
||||||
|
length := len(arr)
|
||||||
|
for i := 0; i < length/2; i++ {
|
||||||
|
temp = arr[i]
|
||||||
|
arr[i] = arr[length-1-i]
|
||||||
|
arr[length-1-i] = temp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func HashString(input string) string {
|
||||||
|
// 计算SHA-256哈希值
|
||||||
|
hash := sha256.Sum256([]byte(input))
|
||||||
|
// 转换为十六进制字符串
|
||||||
|
hashHex := hex.EncodeToString(hash[:])
|
||||||
|
// 返回前20位
|
||||||
|
return hashHex[:23]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cl *Candle) ToStruct(core *Core) (*Candle, error) {
|
||||||
|
// cl.Timestamp
|
||||||
|
ncd := Candle{}
|
||||||
|
ncd.Id = cl.Id
|
||||||
|
ncd.Period = cl.Period
|
||||||
|
ncd.InstId = cl.InstId
|
||||||
|
ncd.From = cl.From
|
||||||
|
|
||||||
|
// 将字符串转换为 int64 类型的时间戳
|
||||||
|
ts, err := strconv.ParseInt(cl.Data[0].(string), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("Error parsing timestamp:", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ncd.Timestamp = time.Unix(ts/1000, (ts%1000)*1000000) // 纳秒级别
|
||||||
|
op, err := strconv.ParseFloat(cl.Data[1].(string), 64)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("Error parsing string to float64:", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ncd.Open = op
|
||||||
|
hi, err := strconv.ParseFloat(cl.Data[2].(string), 64)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("Error parsing string to float64:", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ncd.High = hi
|
||||||
|
lo, err := strconv.ParseFloat(cl.Data[3].(string), 64)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("Error parsing string to float64:", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ncd.Low = lo
|
||||||
|
clse, err := strconv.ParseFloat(cl.Data[4].(string), 64)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("Error parsing string to float64:", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ncd.Close = clse
|
||||||
|
ncd.VolCcy, err = strconv.ParseFloat(cl.Data[6].(string), 64)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("Error parsing string to float64:", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if cl.Data[6].(string) == "1" {
|
||||||
|
ncd.Confirm = true
|
||||||
|
} else {
|
||||||
|
ncd.Confirm = false
|
||||||
|
}
|
||||||
|
return &ncd, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mx *MaX) SetToKey() ([]interface{}, error) {
|
||||||
|
cstr := strconv.Itoa(mx.Count)
|
||||||
|
tss := strconv.FormatInt(mx.Ts, 10)
|
||||||
|
keyName := "ma" + cstr + "|candle" + mx.Period + "|" + mx.InstId + "|ts:" + tss
|
||||||
|
//过期时间:根号(当前candle的周期/1分钟)*10000
|
||||||
|
dt := []interface{}{}
|
||||||
|
dt = append(dt, mx.Ts)
|
||||||
|
dt = append(dt, mx.Value)
|
||||||
|
dj, _ := json.Marshal(dt)
|
||||||
|
exp := mx.Core.PeriodToMinutes(mx.Period)
|
||||||
|
expf := utils.Sqrt(float64(exp)) * 100
|
||||||
|
extt := time.Duration(expf) * time.Minute
|
||||||
|
// loc, _ := time.LoadLocation("Asia/Shanghai")
|
||||||
|
// tm := time.UnixMilli(mx.Ts).In(loc).Format("2006-01-02 15:04")
|
||||||
|
// fmt.Println("setToKey:", keyName, "ts:", tm, string(dj), "from: ", mx.From)
|
||||||
|
_, err := mx.Core.RedisCli.GetSet(keyName, dj).Result()
|
||||||
|
mx.Core.RedisCli.Expire(keyName, extt)
|
||||||
|
return dt, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 保证同一个 period, keyName ,在一个周期里,SaveToSortSet只会被执行一次
|
||||||
|
func (core *Core) SaveUniKey(period string, keyName string, extt time.Duration, tsi int64, cl *Candle) {
|
||||||
|
|
||||||
|
refName := keyName + "|refer"
|
||||||
|
refRes, _ := core.RedisCli.GetSet(refName, 1).Result()
|
||||||
|
core.RedisCli.Expire(refName, extt)
|
||||||
|
// 为保证唯一性机制,防止SaveToSortSet 被重复执行
|
||||||
|
if len(refRes) != 0 {
|
||||||
|
fmt.Println("refName exist: ", refName)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
core.SaveToSortSet(period, keyName, extt, tsi)
|
||||||
|
}
|
||||||
|
|
||||||
|
// tsi: 上报时间timeStamp millinSecond
|
||||||
|
func (core *Core) SaveToSortSet(period string, keyName string, extt time.Duration, tsi int64) {
|
||||||
|
ary := strings.Split(keyName, "ts:")
|
||||||
|
setName := ary[0] + "sortedSet"
|
||||||
|
z := redis.Z{
|
||||||
|
Score: float64(tsi),
|
||||||
|
Member: keyName,
|
||||||
|
}
|
||||||
|
rs, err := core.RedisCli.ZAdd(setName, z).Result()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("err of ma7|ma30 add to redis:", err)
|
||||||
|
} else {
|
||||||
|
fmt.Println("sortedSet added to redis:", rs, keyName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cr *Core) PeriodToMinutes(period string) int64 {
|
||||||
|
ary := strings.Split(period, "")
|
||||||
|
beiStr := "1"
|
||||||
|
danwei := ""
|
||||||
|
if len(ary) == 3 {
|
||||||
|
beiStr = ary[0] + ary[1]
|
||||||
|
danwei = ary[2]
|
||||||
|
} else {
|
||||||
|
beiStr = ary[0]
|
||||||
|
danwei = ary[1]
|
||||||
|
}
|
||||||
|
cheng := 1
|
||||||
|
bei, _ := strconv.Atoi(beiStr)
|
||||||
|
switch danwei {
|
||||||
|
case "m":
|
||||||
|
{
|
||||||
|
cheng = bei
|
||||||
|
break
|
||||||
|
}
|
||||||
|
case "H":
|
||||||
|
{
|
||||||
|
cheng = bei * 60
|
||||||
|
break
|
||||||
|
}
|
||||||
|
case "D":
|
||||||
|
{
|
||||||
|
cheng = bei * 60 * 24
|
||||||
|
break
|
||||||
|
}
|
||||||
|
case "W":
|
||||||
|
{
|
||||||
|
cheng = bei * 60 * 24 * 7
|
||||||
|
break
|
||||||
|
}
|
||||||
|
case "M":
|
||||||
|
{
|
||||||
|
cheng = bei * 60 * 24 * 30
|
||||||
|
break
|
||||||
|
}
|
||||||
|
case "Y":
|
||||||
|
{
|
||||||
|
cheng = bei * 60 * 24 * 365
|
||||||
|
break
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
{
|
||||||
|
fmt.Println("notmatch:", danwei)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return int64(cheng)
|
||||||
|
}
|
||||||
|
|
||||||
|
// type ScanCmd struct {
|
||||||
|
// baseCmd
|
||||||
|
//
|
||||||
|
// page []string
|
||||||
|
// cursor uint64
|
||||||
|
//
|
||||||
|
// process func(cmd Cmder) error
|
||||||
|
// }
|
||||||
|
func (core *Core) GetRangeKeyList(pattern string, from time.Time) ([]*simple.Json, error) {
|
||||||
|
// 比如,用来计算ma30或ma7,倒推多少时间范围,
|
||||||
|
redisCli := core.RedisCli
|
||||||
|
cursor := uint64(0)
|
||||||
|
n := 0
|
||||||
|
allTs := []int64{}
|
||||||
|
var keys []string
|
||||||
|
for {
|
||||||
|
var err error
|
||||||
|
keys, cursor, _ = redisCli.Scan(cursor, pattern+"*", 2000).Result()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
n += len(keys)
|
||||||
|
if n == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// keys, _ := redisCli.Keys(pattern + "*").Result()
|
||||||
|
for _, key := range keys {
|
||||||
|
keyAry := strings.Split(key, ":")
|
||||||
|
key = keyAry[1]
|
||||||
|
keyi64, _ := strconv.ParseInt(key, 10, 64)
|
||||||
|
allTs = append(allTs, keyi64)
|
||||||
|
}
|
||||||
|
nary := utils.RecursiveBubble(allTs, len(allTs))
|
||||||
|
tt := from.UnixMilli()
|
||||||
|
ff := tt - tt%60000
|
||||||
|
fi := int64(ff)
|
||||||
|
mary := []int64{}
|
||||||
|
for _, v := range nary {
|
||||||
|
if v < fi {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
mary = append(mary, v)
|
||||||
|
}
|
||||||
|
res := []*simple.Json{}
|
||||||
|
for _, v := range mary {
|
||||||
|
// if k > 1 {
|
||||||
|
// break
|
||||||
|
// }
|
||||||
|
nv := pattern + strconv.FormatInt(v, 10)
|
||||||
|
str, err := redisCli.Get(nv).Result()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("err of redis get key:", nv, err)
|
||||||
|
}
|
||||||
|
cur, err := simple.NewJson([]byte(str))
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("err of create newJson:", str, err)
|
||||||
|
}
|
||||||
|
res = append(res, cur)
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cl *Candle) SetToKey(core *Core) ([]interface{}, error) {
|
||||||
|
data := cl.Data
|
||||||
|
tsi, err := strconv.ParseInt(data[0].(string), 10, 64)
|
||||||
|
tss := strconv.FormatInt(tsi, 10)
|
||||||
|
keyName := "candle" + cl.Period + "|" + cl.InstId + "|ts:" + tss
|
||||||
|
//过期时间:根号(当前candle的周期/1分钟)*10000
|
||||||
|
|
||||||
|
dt, err := json.Marshal(cl.Data)
|
||||||
|
exp := core.PeriodToMinutes(cl.Period)
|
||||||
|
// expf := float64(exp) * 60
|
||||||
|
expf := utils.Sqrt(float64(exp)) * 100
|
||||||
|
extt := time.Duration(expf) * time.Minute
|
||||||
|
curVolstr, _ := data[5].(string)
|
||||||
|
curVol, err := strconv.ParseFloat(curVolstr, 64)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("err of convert ts:", err)
|
||||||
|
}
|
||||||
|
curVolCcystr, _ := data[6].(string)
|
||||||
|
curVolCcy, err := strconv.ParseFloat(curVolCcystr, 64)
|
||||||
|
curPrice := curVolCcy / curVol
|
||||||
|
if curPrice <= 0 {
|
||||||
|
fmt.Println("price有问题", curPrice, "dt: ", string(dt), "from:", cl.From)
|
||||||
|
err = errors.New("price有问题")
|
||||||
|
return cl.Data, err
|
||||||
|
}
|
||||||
|
redisCli := core.RedisCli
|
||||||
|
// tm := time.UnixMilli(tsi).Format("2006-01-02 15:04")
|
||||||
|
fmt.Println("setToKey:", keyName, "ts: ", "price: ", curPrice, "from:", cl.From)
|
||||||
|
redisCli.Set(keyName, dt, extt).Result()
|
||||||
|
core.SaveUniKey(cl.Period, keyName, extt, tsi, cl)
|
||||||
|
return cl.Data, err
|
||||||
|
}
|
122
config.go
Normal file
122
config.go
Normal file
@ -0,0 +1,122 @@
|
|||||||
|
package core
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
simple "github.com/bitly/go-simplejson"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MyConfig struct {
|
||||||
|
Env string `json:"env", string`
|
||||||
|
Config *simple.Json
|
||||||
|
CandleDimentions []string `json:"candleDimentions"`
|
||||||
|
RedisConf *RedisConfig `json:"redis"`
|
||||||
|
CredentialReadOnlyConf *CredentialConfig `json:"credential"`
|
||||||
|
CredentialMutableConf *CredentialConfig `json:"credential"`
|
||||||
|
ConnectConf *ConnectConfig `json:"connect"`
|
||||||
|
// ThreadsConf *ThreadsConfig `json:"threads"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type RedisConfig struct {
|
||||||
|
Url string `json:"url", string`
|
||||||
|
Password string `json:"password", string`
|
||||||
|
Index int `json:"index", string`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CredentialConfig struct {
|
||||||
|
SecretKey string `json:"secretKey", string`
|
||||||
|
BaseUrl string `json:"baseUrl", string`
|
||||||
|
OkAccessKey string `json:"okAccessKey", string`
|
||||||
|
OkAccessPassphrase string `json:"okAccessPassphrase", string`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ConnectConfig struct {
|
||||||
|
LoginSubUrl string `json:"loginSubUrl", string`
|
||||||
|
WsPrivateBaseUrl string `json:"wsPrivateBaseUrl", string`
|
||||||
|
WsPublicBaseUrl string `json:"wsPublicBaseUrl", string`
|
||||||
|
RestBaseUrl string `json:"restBaseUrl", string`
|
||||||
|
}
|
||||||
|
type ThreadsConfig struct {
|
||||||
|
MaxLenTickerStream int `json:"maxLenTickerStream", int`
|
||||||
|
MaxCandles int `json:"maxCandles", string`
|
||||||
|
AsyncChannels int `json:"asyncChannels", int`
|
||||||
|
MaxTickers int `json:"maxTickers", int`
|
||||||
|
RestPeriod int `json:"restPeriod", int`
|
||||||
|
WaitWs int `json:"waitWs", int`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cfg MyConfig) Init() (MyConfig, error) {
|
||||||
|
env := os.Getenv("GO_ENV")
|
||||||
|
arystr := os.Getenv("TUNAS_CANDLESDIMENTIONS")
|
||||||
|
ary := strings.Split(arystr, "|")
|
||||||
|
cfg.CandleDimentions = ary
|
||||||
|
jsonStr, err := ioutil.ReadFile("/go/json/basicConfig.json")
|
||||||
|
if err != nil {
|
||||||
|
jsonStr, err = ioutil.ReadFile("configs/basicConfig.json")
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("err2:", err.Error())
|
||||||
|
return cfg, err
|
||||||
|
}
|
||||||
|
cfg.Config, err = simple.NewJson([]byte(jsonStr))
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("err2:", err.Error())
|
||||||
|
return cfg, err
|
||||||
|
}
|
||||||
|
cfg.Env = env
|
||||||
|
}
|
||||||
|
cfg.Config = cfg.Config.Get(env)
|
||||||
|
|
||||||
|
ru, err := cfg.Config.Get("redis").Get("url").String()
|
||||||
|
rp, _ := cfg.Config.Get("redis").Get("password").String()
|
||||||
|
ri, _ := cfg.Config.Get("redis").Get("index").Int()
|
||||||
|
redisConf := RedisConfig{
|
||||||
|
Url: ru,
|
||||||
|
Password: rp,
|
||||||
|
Index: ri,
|
||||||
|
}
|
||||||
|
// fmt.Println("cfg: ", cfg)
|
||||||
|
cfg.RedisConf = &redisConf
|
||||||
|
ls, _ := cfg.Config.Get("connect").Get("loginSubUrl").String()
|
||||||
|
wsPub, _ := cfg.Config.Get("connect").Get("wsPrivateBaseUrl").String()
|
||||||
|
wsPri, _ := cfg.Config.Get("connect").Get("wsPublicBaseUrl").String()
|
||||||
|
restBu, _ := cfg.Config.Get("connect").Get("restBaseUrl").String()
|
||||||
|
connectConfig := ConnectConfig{
|
||||||
|
LoginSubUrl: ls,
|
||||||
|
WsPublicBaseUrl: wsPub,
|
||||||
|
WsPrivateBaseUrl: wsPri,
|
||||||
|
RestBaseUrl: restBu,
|
||||||
|
}
|
||||||
|
cfg.ConnectConf = &connectConfig
|
||||||
|
return cfg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cfg *MyConfig) GetConfigJson(arr []string) *simple.Json {
|
||||||
|
env := os.Getenv("GO_ENV")
|
||||||
|
fmt.Println("env: ", env)
|
||||||
|
cfg.Env = env
|
||||||
|
|
||||||
|
json, err := ioutil.ReadFile("/go/json/basicConfig.json")
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
json, err = ioutil.ReadFile("configs/basicConfig.json")
|
||||||
|
if err != nil {
|
||||||
|
log.Panic("read config error: ", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("read file err: ", err)
|
||||||
|
}
|
||||||
|
rjson, err := simple.NewJson(json)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("newJson err: ", err)
|
||||||
|
}
|
||||||
|
for _, s := range arr {
|
||||||
|
rjson = rjson.Get(s)
|
||||||
|
// fmt.Println(s, ": ", rjson)
|
||||||
|
}
|
||||||
|
return rjson
|
||||||
|
}
|
12
const.go
Normal file
12
const.go
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
package core
|
||||||
|
|
||||||
|
const MAIN_ALLCOINS_PERIOD_MINUTES = 1
|
||||||
|
const MAIN_ALLCOINS_ONCE_COUNTS = 3
|
||||||
|
const MAIN_ALLCOINS_BAR_PERIOD = "3m"
|
||||||
|
const ALLCANDLES_PUBLISH = "allCandles|publish"
|
||||||
|
const ALLCANDLES_INNER_PUBLISH = "allCandlesInner|publish"
|
||||||
|
const ORDER_PUBLISH = "private|order|publish"
|
||||||
|
const TICKERINFO_PUBLISH = "tickerInfo|publish"
|
||||||
|
const CCYPOSISTIONS_PUBLISH = "ccyPositions|publish"
|
||||||
|
const SUBACTIONS_PUBLISH = "subActions|publish"
|
||||||
|
const ORDER_RESP_PUBLISH = "private|actionResp|publish"
|
532
core.go
Normal file
532
core.go
Normal file
@ -0,0 +1,532 @@
|
|||||||
|
package core
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
// "errors"
|
||||||
|
"fmt"
|
||||||
|
// "math/rand"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
"v5sdk_go/rest"
|
||||||
|
"v5sdk_go/ws"
|
||||||
|
// "v5sdk_go/ws/wImpl"
|
||||||
|
|
||||||
|
simple "github.com/bitly/go-simplejson"
|
||||||
|
"github.com/go-redis/redis"
|
||||||
|
"github.com/phyer/texus/private"
|
||||||
|
"github.com/phyer/texus/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Core struct {
|
||||||
|
Env string
|
||||||
|
Cfg *MyConfig
|
||||||
|
RedisCli *redis.Client
|
||||||
|
RedisCli2 *redis.Client
|
||||||
|
FluentBitUrl string
|
||||||
|
Wg sync.WaitGroup
|
||||||
|
RestQueueChan chan *RestQueue
|
||||||
|
OrderChan chan *private.Order
|
||||||
|
WriteLogChan chan *WriteLog
|
||||||
|
}
|
||||||
|
type RestQueue struct {
|
||||||
|
InstId string
|
||||||
|
Bar string
|
||||||
|
After int64
|
||||||
|
Before int64
|
||||||
|
Limit string
|
||||||
|
Duration time.Duration
|
||||||
|
WithWs bool
|
||||||
|
}
|
||||||
|
type CandleData struct {
|
||||||
|
Code string `json:"code"`
|
||||||
|
Msg string `json:"msg"`
|
||||||
|
Data [][]interface{} `json:"data"` // 用二维数组来接收 candles 数据
|
||||||
|
}
|
||||||
|
type SubAction struct {
|
||||||
|
ActionName string
|
||||||
|
ForAll bool
|
||||||
|
MetaInfo map[string]interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rst *RestQueue) Show(cr *Core) {
|
||||||
|
fmt.Println("restQueue:", rst.InstId, rst.Bar, rst.Limit)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rst *RestQueue) Save(cr *Core) {
|
||||||
|
afterSec := ""
|
||||||
|
if rst.After > 0 {
|
||||||
|
afterSec = fmt.Sprint("&after=", rst.After)
|
||||||
|
}
|
||||||
|
beforeSec := ""
|
||||||
|
if rst.Before > 0 {
|
||||||
|
beforeSec = fmt.Sprint("&before=", rst.Before)
|
||||||
|
}
|
||||||
|
limitSec := ""
|
||||||
|
if len(rst.Limit) > 0 {
|
||||||
|
limitSec = fmt.Sprint("&limit=", rst.Limit)
|
||||||
|
}
|
||||||
|
link := "/api/v5/market/candles?instId=" + rst.InstId + "&bar=" + rst.Bar + limitSec + afterSec + beforeSec
|
||||||
|
|
||||||
|
fmt.Println("restLink: ", link)
|
||||||
|
rsp, err := cr.v5PublicInvoke(link)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("cr.v5PublicInvoke err:", err)
|
||||||
|
} else {
|
||||||
|
fmt.Println("cr.v5PublicInvoke result count:", len(rsp.Data))
|
||||||
|
}
|
||||||
|
cr.SaveCandle(rst.InstId, rst.Bar, rsp, rst.Duration, rst.WithWs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func WriteLogProcess(cr *Core) {
|
||||||
|
for {
|
||||||
|
wg := <-cr.WriteLogChan
|
||||||
|
go func(wg *WriteLog) {
|
||||||
|
fmt.Println("start writelog: " + wg.Tag + " " + wg.Id)
|
||||||
|
wg.Process(cr)
|
||||||
|
}(wg)
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cr *Core) ShowSysTime() {
|
||||||
|
rsp, _ := cr.RestInvoke("/api/v5/public/time", rest.GET)
|
||||||
|
fmt.Println("serverSystem time:", rsp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (core *Core) Init() {
|
||||||
|
core.Env = os.Getenv("GO_ENV")
|
||||||
|
gitBranch := os.Getenv("gitBranchName")
|
||||||
|
commitID := os.Getenv("gitCommitID")
|
||||||
|
|
||||||
|
fmt.Println("当前环境: ", core.Env)
|
||||||
|
fmt.Println("gitBranch: ", gitBranch)
|
||||||
|
fmt.Println("gitCommitID: ", commitID)
|
||||||
|
cfg := MyConfig{}
|
||||||
|
cfg, _ = cfg.Init()
|
||||||
|
core.Cfg = &cfg
|
||||||
|
cli, err := core.GetRedisCli()
|
||||||
|
core.RedisCli = cli
|
||||||
|
core.RestQueueChan = make(chan *RestQueue)
|
||||||
|
core.WriteLogChan = make(chan *WriteLog)
|
||||||
|
core.OrderChan = make(chan *private.Order)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("init redis client err: ", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (core *Core) GetRedisCli() (*redis.Client, error) {
|
||||||
|
ru := core.Cfg.RedisConf.Url
|
||||||
|
rp := core.Cfg.RedisConf.Password
|
||||||
|
ri := core.Cfg.RedisConf.Index
|
||||||
|
re := os.Getenv("REDIS_URL")
|
||||||
|
if len(re) > 0 {
|
||||||
|
ru = re
|
||||||
|
}
|
||||||
|
client := redis.NewClient(&redis.Options{
|
||||||
|
Addr: ru,
|
||||||
|
Password: rp, //默认空密码
|
||||||
|
DB: ri, //使用默认数据库
|
||||||
|
})
|
||||||
|
pong, err := client.Ping().Result()
|
||||||
|
if pong == "PONG" && err == nil {
|
||||||
|
return client, err
|
||||||
|
} else {
|
||||||
|
fmt.Println("redis状态不可用:", ru, rp, ri, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return client, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (core *Core) GetAllTickerInfo() (*rest.RESTAPIResult, error) {
|
||||||
|
// GET / 获取所有产品行情信息
|
||||||
|
rsp, err := core.RestInvoke("/api/v5/market/tickers?instType=SPOT", rest.GET)
|
||||||
|
return rsp, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (core *Core) GetBalances() (*rest.RESTAPIResult, error) {
|
||||||
|
// TODO 临时用了两个实现,restInvoke,复用原来的会有bug,不知道是谁的bug
|
||||||
|
rsp, err := core.RestInvoke2("/api/v5/account/balance", rest.GET, nil)
|
||||||
|
return rsp, err
|
||||||
|
}
|
||||||
|
func (core *Core) GetLivingOrderList() ([]*private.Order, error) {
|
||||||
|
// TODO 临时用了两个实现,restInvoke,复用原来的会有bug,不知道是谁的bug
|
||||||
|
params := make(map[string]interface{})
|
||||||
|
data, err := core.RestInvoke2("/api/v5/trade/orders-pending", rest.GET, ¶ms)
|
||||||
|
odrsp := private.OrderResp{}
|
||||||
|
err = json.Unmarshal([]byte(data.Body), &odrsp)
|
||||||
|
str, _ := json.Marshal(odrsp)
|
||||||
|
fmt.Println("convert: err:", err, " body: ", data.Body, odrsp, " string:", string(str))
|
||||||
|
list, err := odrsp.Convert()
|
||||||
|
fmt.Println("loopLivingOrder response data:", str)
|
||||||
|
fmt.Println(utils.GetFuncName(), " 当前数据是 ", data.V5Response.Code, " list len:", len(list))
|
||||||
|
return list, err
|
||||||
|
}
|
||||||
|
func (core *Core) LoopInstrumentList() error {
|
||||||
|
for {
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
ctype := ws.SPOT
|
||||||
|
|
||||||
|
redisCli := core.RedisCli
|
||||||
|
counts, err := redisCli.HLen("instruments|" + ctype + "|hash").Result()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("err of hset to redis:", err)
|
||||||
|
}
|
||||||
|
if counts == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (core *Core) SubscribeTicker(op string) error {
|
||||||
|
mp := make(map[string]string)
|
||||||
|
|
||||||
|
redisCli := core.RedisCli
|
||||||
|
ctype := ws.SPOT
|
||||||
|
mp, err := redisCli.HGetAll("instruments|" + ctype + "|hash").Result()
|
||||||
|
b, err := json.Marshal(mp)
|
||||||
|
js, err := simple.NewJson(b)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("err of unMarshalJson3:", js)
|
||||||
|
}
|
||||||
|
// fmt.Println("ticker js: ", js)
|
||||||
|
instAry := js.MustMap()
|
||||||
|
for k, v := range instAry {
|
||||||
|
b = []byte(v.(string))
|
||||||
|
_, err := simple.NewJson(b)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("err of unMarshalJson4:", js)
|
||||||
|
}
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
go func(instId string, op string) {
|
||||||
|
|
||||||
|
redisCli := core.RedisCli
|
||||||
|
_, err = redisCli.SAdd("tickers|"+op+"|set", instId).Result()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("err of unMarshalJson5:", js)
|
||||||
|
}
|
||||||
|
}(k, op)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (core *Core) v5PublicInvoke(subUrl string) (*CandleData, error) {
|
||||||
|
restUrl, _ := core.Cfg.Config.Get("connect").Get("restBaseUrl").String()
|
||||||
|
url := restUrl + subUrl
|
||||||
|
resp, err := http.Get(url)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
var result CandleData
|
||||||
|
if err := json.Unmarshal(body, &result); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &result, nil
|
||||||
|
}
|
||||||
|
func (core *Core) RestInvoke(subUrl string, method string) (*rest.RESTAPIResult, error) {
|
||||||
|
restUrl, _ := core.Cfg.Config.Get("connect").Get("restBaseUrl").String()
|
||||||
|
//ep, method, uri string, param *map[string]interface{}
|
||||||
|
rest := rest.NewRESTAPI(restUrl, method, subUrl, nil)
|
||||||
|
key, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String()
|
||||||
|
secure, _ := core.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String()
|
||||||
|
pass, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String()
|
||||||
|
isDemo := false
|
||||||
|
if core.Env == "demoEnv" {
|
||||||
|
isDemo = true
|
||||||
|
}
|
||||||
|
rest.SetSimulate(isDemo).SetAPIKey(key, secure, pass)
|
||||||
|
response, err := rest.Run(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("restInvoke1 err:", subUrl, err)
|
||||||
|
}
|
||||||
|
return response, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (core *Core) RestInvoke2(subUrl string, method string, param *map[string]interface{}) (*rest.RESTAPIResult, error) {
|
||||||
|
key, err1 := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String()
|
||||||
|
secret, err2 := core.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String()
|
||||||
|
pass, err3 := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String()
|
||||||
|
userId, err4 := core.Cfg.Config.Get("connect").Get("userId").String()
|
||||||
|
restUrl, err5 := core.Cfg.Config.Get("connect").Get("restBaseUrl").String()
|
||||||
|
if err1 != nil || err2 != nil || err3 != nil || err4 != nil || err5 != nil {
|
||||||
|
fmt.Println(err1, err2, err3, err4, err5)
|
||||||
|
} else {
|
||||||
|
fmt.Println("key:", key, secret, pass, "userId:", userId, "restUrl: ", restUrl)
|
||||||
|
}
|
||||||
|
// reqParam := make(map[string]interface{})
|
||||||
|
// if param != nil {
|
||||||
|
// reqParam = *param
|
||||||
|
// }
|
||||||
|
// rs := rest.NewRESTAPI(restUrl, method, subUrl, &reqParam)
|
||||||
|
isDemo := false
|
||||||
|
if core.Env == "demoEnv" {
|
||||||
|
isDemo = true
|
||||||
|
}
|
||||||
|
// rs.SetSimulate(isDemo).SetAPIKey(key, secret, pass).SetUserId(userId)
|
||||||
|
// response, err := rs.Run(context.Background())
|
||||||
|
// if err != nil {
|
||||||
|
// fmt.Println("restInvoke2 err:", subUrl, err)
|
||||||
|
// }
|
||||||
|
apikey := rest.APIKeyInfo{
|
||||||
|
ApiKey: key,
|
||||||
|
SecKey: secret,
|
||||||
|
PassPhrase: pass,
|
||||||
|
}
|
||||||
|
cli := rest.NewRESTClient(restUrl, &apikey, isDemo)
|
||||||
|
rsp, err := cli.Get(context.Background(), subUrl, param)
|
||||||
|
if err != nil {
|
||||||
|
return rsp, err
|
||||||
|
}
|
||||||
|
fmt.Println("response:", rsp, err)
|
||||||
|
return rsp, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (core *Core) RestPost(subUrl string, param *map[string]interface{}) (*rest.RESTAPIResult, error) {
|
||||||
|
key, err1 := core.Cfg.Config.Get("credentialMutable").Get("okAccessKey").String()
|
||||||
|
secret, err2 := core.Cfg.Config.Get("credentialMutable").Get("secretKey").String()
|
||||||
|
pass, err3 := core.Cfg.Config.Get("credentialMutable").Get("okAccessPassphrase").String()
|
||||||
|
userId, err4 := core.Cfg.Config.Get("connect").Get("userId").String()
|
||||||
|
restUrl, err5 := core.Cfg.Config.Get("connect").Get("restBaseUrl").String()
|
||||||
|
if err1 != nil || err2 != nil || err3 != nil || err4 != nil || err5 != nil {
|
||||||
|
fmt.Println(err1, err2, err3, err4, err5)
|
||||||
|
} else {
|
||||||
|
fmt.Println("key:", key, secret, pass, "userId:", userId, "restUrl: ", restUrl)
|
||||||
|
}
|
||||||
|
// 请求的另一种方式
|
||||||
|
apikey := rest.APIKeyInfo{
|
||||||
|
ApiKey: key,
|
||||||
|
SecKey: secret,
|
||||||
|
PassPhrase: pass,
|
||||||
|
}
|
||||||
|
isDemo := false
|
||||||
|
if core.Env == "demoEnv" {
|
||||||
|
isDemo = true
|
||||||
|
}
|
||||||
|
cli := rest.NewRESTClient(restUrl, &apikey, isDemo)
|
||||||
|
rsp, err := cli.Post(context.Background(), subUrl, param)
|
||||||
|
if err != nil {
|
||||||
|
return rsp, err
|
||||||
|
}
|
||||||
|
return rsp, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 我当前持有的币,每分钟刷新
|
||||||
|
func (core *Core) GetMyFavorList() []string {
|
||||||
|
redisCli := core.RedisCli
|
||||||
|
opt := redis.ZRangeBy{
|
||||||
|
Min: "10",
|
||||||
|
Max: "100000000000",
|
||||||
|
}
|
||||||
|
cary, _ := redisCli.ZRevRangeByScore("private|positions|sortedSet", opt).Result()
|
||||||
|
cl := []string{}
|
||||||
|
for _, v := range cary {
|
||||||
|
cry := strings.Split(v, "|")[0] + "-USDT"
|
||||||
|
cl = append(cl, cry)
|
||||||
|
}
|
||||||
|
return cary
|
||||||
|
}
|
||||||
|
|
||||||
|
// 得到交易量排行榜,订阅其中前N名的各维度k线,并merge进来我已经购买的币列表,这个列表是动态更新的
|
||||||
|
// 改了,不需要交易排行榜,我手动指定一个排行即可, tickersVol|sortedSet 改成 tickersList|sortedSet
|
||||||
|
func (core *Core) GetScoreList(count int) []string {
|
||||||
|
|
||||||
|
redisCli := core.RedisCli
|
||||||
|
|
||||||
|
curList, err := redisCli.ZRange("tickersList|sortedSet", 0, int64(count-1)).Result()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("zrevrange err:", err)
|
||||||
|
}
|
||||||
|
fmt.Println("curList: ", curList)
|
||||||
|
return curList
|
||||||
|
}
|
||||||
|
|
||||||
|
func LoopBalances(cr *Core, mdura time.Duration) {
|
||||||
|
//协程:动态维护topScore
|
||||||
|
ticker := time.NewTicker(mdura)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
//协程:循环执行rest请求candle
|
||||||
|
fmt.Println("loopBalance: receive ccyChannel start")
|
||||||
|
RestBalances(cr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func LoopLivingOrders(cr *Core, mdura time.Duration) {
|
||||||
|
//协程:动态维护topScore
|
||||||
|
ticker := time.NewTicker(mdura)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
//协程:循环执行rest请求candle
|
||||||
|
fmt.Println("loopLivingOrder: receive ccyChannel start")
|
||||||
|
RestLivingOrder(cr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func RestBalances(cr *Core) ([]*private.Ccy, error) {
|
||||||
|
// fmt.Println("restBalance loopBalance loop start")
|
||||||
|
ccyList := []*private.Ccy{}
|
||||||
|
rsp, err := cr.GetBalances()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("loopBalance err00: ", err)
|
||||||
|
}
|
||||||
|
fmt.Println("loopBalance balance rsp: ", rsp)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("loopBalance err01: ", err)
|
||||||
|
return ccyList, err
|
||||||
|
}
|
||||||
|
if len(rsp.Body) == 0 {
|
||||||
|
fmt.Println("loopBalance err03: rsp body is null")
|
||||||
|
return ccyList, err
|
||||||
|
}
|
||||||
|
js1, err := simple.NewJson([]byte(rsp.Body))
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("loopBalance err1: ", err)
|
||||||
|
}
|
||||||
|
itemList := js1.Get("data").GetIndex(0).Get("details").MustArray()
|
||||||
|
// maxTickers是重点关注的topScore的coins的数量
|
||||||
|
cli := cr.RedisCli
|
||||||
|
for _, v := range itemList {
|
||||||
|
js, _ := json.Marshal(v)
|
||||||
|
ccyResp := private.CcyResp{}
|
||||||
|
err := json.Unmarshal(js, &ccyResp)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("loopBalance err2: ", err)
|
||||||
|
}
|
||||||
|
ccy, err := ccyResp.Convert()
|
||||||
|
ccyList = append(ccyList, ccy)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("loopBalance err2: ", err)
|
||||||
|
}
|
||||||
|
z := redis.Z{
|
||||||
|
Score: ccy.EqUsd,
|
||||||
|
Member: ccy.Ccy + "|position|key",
|
||||||
|
}
|
||||||
|
res, err := cli.ZAdd("private|positions|sortedSet", z).Result()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("loopBalance err3: ", res, err)
|
||||||
|
}
|
||||||
|
res1, err := cli.Set(ccy.Ccy+"|position|key", js, 0).Result()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("loopBalance err4: ", res1, err)
|
||||||
|
}
|
||||||
|
bjs, _ := json.Marshal(ccy)
|
||||||
|
tsi := time.Now().Unix()
|
||||||
|
tsii := tsi - tsi%600
|
||||||
|
tss := strconv.FormatInt(tsii, 10)
|
||||||
|
cli.Set(CCYPOSISTIONS_PUBLISH+"|ts:"+tss, 1, 10*time.Minute).Result()
|
||||||
|
fmt.Println("ccy published: ", string(bjs))
|
||||||
|
//TODO FIXME 50毫秒,每分钟上限是1200个订单,超过就无法遍历完成
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
suffix := ""
|
||||||
|
if cr.Env == "demoEnv" {
|
||||||
|
suffix = "-demoEnv"
|
||||||
|
}
|
||||||
|
// TODO FIXME cli2
|
||||||
|
cli.Publish(CCYPOSISTIONS_PUBLISH+suffix, string(bjs)).Result()
|
||||||
|
}
|
||||||
|
return ccyList, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func RestLivingOrder(cr *Core) ([]*private.Order, error) {
|
||||||
|
// fmt.Println("restOrder loopOrder loop start")
|
||||||
|
orderList := []*private.Order{}
|
||||||
|
list, err := cr.GetLivingOrderList()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("loopLivingOrder err00: ", err)
|
||||||
|
}
|
||||||
|
fmt.Println("loopLivingOrder response:", list)
|
||||||
|
go func() {
|
||||||
|
for _, v := range list {
|
||||||
|
fmt.Println("order orderV:", v)
|
||||||
|
time.Sleep(30 * time.Millisecond)
|
||||||
|
cr.OrderChan <- v
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return orderList, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cr *Core) ProcessOrder(od *private.Order) error {
|
||||||
|
// publish
|
||||||
|
go func() {
|
||||||
|
suffix := ""
|
||||||
|
if cr.Env == "demoEnv" {
|
||||||
|
suffix = "-demoEnv"
|
||||||
|
}
|
||||||
|
cn := ORDER_PUBLISH + suffix
|
||||||
|
bj, _ := json.Marshal(od)
|
||||||
|
|
||||||
|
// TODO FIXME cli2
|
||||||
|
res, _ := cr.RedisCli.Publish(cn, string(bj)).Result()
|
||||||
|
fmt.Println("order publish res: ", res, " content: ", string(bj))
|
||||||
|
rsch := ORDER_RESP_PUBLISH + suffix
|
||||||
|
bj1, _ := json.Marshal(res)
|
||||||
|
|
||||||
|
// TODO FIXME cli2
|
||||||
|
res, _ = cr.RedisCli.Publish(rsch, string(bj1)).Result()
|
||||||
|
}()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cr *Core) DispatchSubAction(action *SubAction) error {
|
||||||
|
go func() {
|
||||||
|
suffix := ""
|
||||||
|
if cr.Env == "demoEnv" {
|
||||||
|
suffix = "-demoEnv"
|
||||||
|
}
|
||||||
|
fmt.Println("action: ", action.ActionName, action.MetaInfo)
|
||||||
|
res, err := cr.RestPostWrapper("/api/v5/trade/"+action.ActionName, action.MetaInfo)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(utils.GetFuncName(), " actionRes 1:", err)
|
||||||
|
}
|
||||||
|
rsch := ORDER_RESP_PUBLISH + suffix
|
||||||
|
bj1, _ := json.Marshal(res)
|
||||||
|
|
||||||
|
// TODO FIXME cli2
|
||||||
|
rs, _ := cr.RedisCli.Publish(rsch, string(bj1)).Result()
|
||||||
|
fmt.Println("action rs: ", rs)
|
||||||
|
}()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cr *Core) RestPostWrapper(url string, param map[string]interface{}) (rest.Okexv5APIResponse, error) {
|
||||||
|
suffix := ""
|
||||||
|
if cr.Env == "demoEnv" {
|
||||||
|
suffix = "-demoEnv"
|
||||||
|
}
|
||||||
|
res, err := cr.RestPost(url, ¶m)
|
||||||
|
fmt.Println("actionRes 2:", res.V5Response.Msg, res.V5Response.Data, err)
|
||||||
|
bj, _ := json.Marshal(res)
|
||||||
|
|
||||||
|
// TODO FIXME cli2
|
||||||
|
cr.RedisCli.Publish(ORDER_RESP_PUBLISH+suffix, string(bj))
|
||||||
|
return res.V5Response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cr *Core) AddToGeneralCandleChnl(candle *Candle, channels []string) {
|
||||||
|
redisCli := cr.RedisCli
|
||||||
|
ab, _ := json.Marshal(candle)
|
||||||
|
for _, v := range channels {
|
||||||
|
suffix := ""
|
||||||
|
env := os.Getenv("GO_ENV")
|
||||||
|
if env == "demoEnv" {
|
||||||
|
suffix = "-demoEnv"
|
||||||
|
}
|
||||||
|
vd := v + suffix
|
||||||
|
_, err := redisCli.Publish(vd, string(ab)).Result()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("err of ma7|ma30 add to redis2:", err, candle.From)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
31
go.mod
Normal file
31
go.mod
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
module github.com/phyer/core
|
||||||
|
|
||||||
|
replace (
|
||||||
|
v5sdk_go/config => ./submodules/okex/config
|
||||||
|
v5sdk_go/rest => ./submodules/okex/rest
|
||||||
|
v5sdk_go/utils => ./submodules/okex/utils
|
||||||
|
v5sdk_go/ws => ./submodules/okex/ws
|
||||||
|
)
|
||||||
|
|
||||||
|
go 1.21
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/bitly/go-simplejson v0.5.0
|
||||||
|
github.com/go-redis/redis v6.15.9+incompatible
|
||||||
|
github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196
|
||||||
|
github.com/sirupsen/logrus v1.9.3
|
||||||
|
v5sdk_go/rest v0.0.0-00010101000000-000000000000
|
||||||
|
v5sdk_go/ws v0.0.0-00010101000000-000000000000
|
||||||
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/gorilla/websocket v1.5.1 // indirect
|
||||||
|
github.com/kr/text v0.2.0 // indirect
|
||||||
|
github.com/nxadm/tail v1.4.8 // indirect
|
||||||
|
github.com/rogpeppe/go-internal v1.6.1 // indirect
|
||||||
|
golang.org/x/net v0.17.0 // indirect
|
||||||
|
golang.org/x/sys v0.13.0 // indirect
|
||||||
|
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||||
|
v5sdk_go/config v0.0.0-00010101000000-000000000000 // indirect
|
||||||
|
v5sdk_go/utils v0.0.0-00010101000000-000000000000 // indirect
|
||||||
|
)
|
55
go.sum
Normal file
55
go.sum
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
github.com/bitly/go-simplejson v0.5.0 h1:6IH+V8/tVMab511d5bn4M7EwGXZf9Hj6i2xSwkNEM+Y=
|
||||||
|
github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA=
|
||||||
|
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
|
||||||
|
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
|
||||||
|
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||||
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
|
||||||
|
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
|
||||||
|
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
|
||||||
|
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
|
||||||
|
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
|
||||||
|
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
|
||||||
|
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||||
|
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
|
||||||
|
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
|
||||||
|
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||||
|
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||||
|
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||||
|
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||||
|
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
|
||||||
|
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
|
||||||
|
github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc=
|
||||||
|
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
|
||||||
|
github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c=
|
||||||
|
github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
|
||||||
|
github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196 h1:P1sxgCsS0VIL38ufZzgUuZLLyY/B+po6kSY7ziNZT7E=
|
||||||
|
github.com/phyer/texus v0.0.0-20241207132635-0e7fb63f8196/go.mod h1:iZexs5agdApNlp8HW/FqKgma4Ij1x8/o+ZLcMvY3f80=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
|
||||||
|
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
|
||||||
|
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
|
||||||
|
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
|
||||||
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
|
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
|
||||||
|
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
|
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
|
||||||
|
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
|
||||||
|
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
|
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
|
||||||
|
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
|
||||||
|
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||||
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
|
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
|
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
|
||||||
|
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
|
||||||
|
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
|
||||||
|
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
||||||
|
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||||
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
|
||||||
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
70
ticker.go
Normal file
70
ticker.go
Normal file
@ -0,0 +1,70 @@
|
|||||||
|
package core
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"strconv"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TickerInfo struct {
|
||||||
|
Id string `json:"_id"`
|
||||||
|
InstId string `json:"instId"`
|
||||||
|
Last float64 `json:"last"`
|
||||||
|
InstType string `json:"instType"`
|
||||||
|
VolCcy24h float64 `json:"volCcy24h"`
|
||||||
|
Ts int64 `json:"ts"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type TickerInfoResp struct {
|
||||||
|
InstId string `json:"instId"`
|
||||||
|
Last string `json:"last"`
|
||||||
|
InstType string `json:"instType"`
|
||||||
|
VolCcy24h string `json:"volCcy24h"`
|
||||||
|
Ts string `json:"ts"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tir *TickerInfoResp) Convert() TickerInfo {
|
||||||
|
ti := TickerInfo{
|
||||||
|
Id: HashString(tir.InstId + tir.Ts),
|
||||||
|
InstId: tir.InstId,
|
||||||
|
InstType: tir.InstType,
|
||||||
|
Last: ToFloat64(tir.Last),
|
||||||
|
VolCcy24h: ToFloat64(tir.VolCcy24h),
|
||||||
|
Ts: ToInt64(tir.Ts),
|
||||||
|
}
|
||||||
|
return ti
|
||||||
|
}
|
||||||
|
|
||||||
|
func ToString(val interface{}) string {
|
||||||
|
valstr := ""
|
||||||
|
if reflect.TypeOf(val).Name() == "string" {
|
||||||
|
valstr = val.(string)
|
||||||
|
} else if reflect.TypeOf(val).Name() == "float64" {
|
||||||
|
valstr = fmt.Sprintf("%f", val)
|
||||||
|
} else if reflect.TypeOf(val).Name() == "int64" {
|
||||||
|
valstr = strconv.FormatInt(val.(int64), 16)
|
||||||
|
}
|
||||||
|
return valstr
|
||||||
|
}
|
||||||
|
|
||||||
|
func ToInt64(val interface{}) int64 {
|
||||||
|
vali := int64(0)
|
||||||
|
if reflect.TypeOf(val).Name() == "string" {
|
||||||
|
vali, _ = strconv.ParseInt(val.(string), 10, 64)
|
||||||
|
} else if reflect.TypeOf(val).Name() == "float64" {
|
||||||
|
vali = int64(val.(float64))
|
||||||
|
}
|
||||||
|
return vali
|
||||||
|
}
|
||||||
|
|
||||||
|
func ToFloat64(val interface{}) float64 {
|
||||||
|
valf := float64(0)
|
||||||
|
if reflect.TypeOf(val).Name() == "string" {
|
||||||
|
valf, _ = strconv.ParseFloat(val.(string), 64)
|
||||||
|
} else if reflect.TypeOf(val).Name() == "float64" {
|
||||||
|
valf = val.(float64)
|
||||||
|
} else if reflect.TypeOf(val).Name() == "int64" {
|
||||||
|
valf = float64(val.(int64))
|
||||||
|
}
|
||||||
|
return valf
|
||||||
|
}
|
32
writeLog.go
Normal file
32
writeLog.go
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
package core
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
logrus "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
type WriteLog struct {
|
||||||
|
Content []byte
|
||||||
|
Tag string
|
||||||
|
Id string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wg *WriteLog) Process(cr *Core) error {
|
||||||
|
go func() {
|
||||||
|
reqBody := bytes.NewBuffer(wg.Content)
|
||||||
|
cr.Env = os.Getenv("GO_ENV")
|
||||||
|
cr.FluentBitUrl = os.Getenv("TEXUS_FluentBitUrl")
|
||||||
|
fullUrl := "http://" + cr.FluentBitUrl + "/" + wg.Tag
|
||||||
|
res, err := http.Post(fullUrl, "application/json", reqBody)
|
||||||
|
|
||||||
|
fmt.Println("requested, response:", fullUrl, string(wg.Content), res)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Error(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user