在deepSeep V3提示下进行修改

This commit is contained in:
zhangkun9038@dingtalk.com 2025-01-27 16:43:18 +08:00
parent 9cd51686cc
commit 0a21cb294c
5 changed files with 293 additions and 147 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
vendor/
.DS_Store

53
main.go
View File

@ -30,43 +30,32 @@ func main() {
// 目前只有phyer里部署的tunas会发布tickerInfo信息
// fmt.Println("len of rdsLs: ", len(rdsLs))
// 订阅 redis TickerInfo
go func(vv *core.RedisConfig) {
allowed := os.Getenv("SIAGA_ACCEPTTICKER") == "true"
if !allowed {
return
// 定义订阅配置
subscriptions := []struct {
envVar string
channel string
logPrefix string
}{
{"SIAGA_ACCEPTTICKER", core.TICKERINFO_PUBLISH, "TickerInfo"},
{"SIAGA_ACCEPTCANDLE", core.ALLCANDLES_PUBLISH, "Candles"},
{"SIAGA_ACCEPTMAX", core.ALLMAXES_PUBLISH, "Max"},
{"SIAGA_ACCEPTSERIES", core.ALLSERIESINFO_PUBLISH, "Series"},
}
logrus.Info("start subscribe core.TICKERINFO_PUBLISH")
md.LoopSubscribe(&cr, core.TICKERINFO_PUBLISH, vv)
}(rdsLs[0])
// 订阅 redis Candles
go func(vv *core.RedisConfig) {
allowed := os.Getenv("SIAGA_ACCEPTCANDLE") == "true"
if !allowed {
// 启动所有订阅
for _, sub := range subscriptions {
go func(s struct {
envVar string
channel string
logPrefix string
}) {
if os.Getenv(s.envVar) != "true" {
return
}
logrus.Info("start subscribe core.TICKERINFO_PUBLISH")
md.LoopSubscribe(&cr, core.ALLCANDLES_PUBLISH, vv)
}(rdsLs[0])
// 订阅 redis Max
go func(vv *core.RedisConfig) {
allowed := os.Getenv("SIAGA_ACCEPTMAX") == "true"
if !allowed {
return
logrus.Infof("start subscribe %s: %s", s.logPrefix, s.channel)
md.LoopSubscribe(&cr, s.channel, rdsLs[0])
}(sub)
}
md.LoopSubscribe(&cr, core.ALLMAXES_PUBLISH, vv)
}(rdsLs[0])
// 下面这个暂时不运行, 在环境变量里把它关掉
go func(vv *core.RedisConfig) {
allowed := os.Getenv("SIAGA_ACCEPTSERIES") == "true"
if !allowed {
return
}
md.LoopSubscribe(&cr, core.ALLSERIESINFO_PUBLISH, vv)
}(rdsLs[0])
go func() {
md.TickerInfoProcess(&cr)

View File

@ -1,6 +1,7 @@
package module
import (
"context"
"encoding/json"
"errors"
"fmt"
@ -53,100 +54,159 @@ func GetRemoteRedisConfigList() ([]*core.RedisConfig, error) {
return list, nil
}
func LoopSubscribe(cr *core.Core, channelName string, redisConf *core.RedisConfig) {
redisRemoteCli, _ := cr.GetRedisCliFromConf(*redisConf)
suffix := ""
env := os.Getenv("GO_ENV")
if strings.Contains(env, "demoEnv") {
suffix = "-demoEnv"
func getChannelType(channelName string) string {
switch {
case strings.Contains(channelName, "allCandle"):
return "candle"
case strings.Contains(channelName, "allMaX"):
return "maX"
case strings.Contains(channelName, "ticker"):
return "tickerInfo"
case strings.Contains(channelName, "ccyPositions"):
return "ccyPositions"
case strings.Contains(channelName, "allseriesinfo"):
return "seriesinfo"
case strings.Contains(channelName, "private|order"):
return "private|order"
default:
return ""
}
logrus.Info("loopSubscribe: ", channelName+suffix)
pubsub := redisRemoteCli.Subscribe(channelName + suffix)
_, err := pubsub.Receive()
if err != nil {
// cr.ErrorToRobot(utils.GetFuncName(), err)
logrus.Error(GetFuncName(), " ", err)
panic(err)
}
// 用管道来接收消息
ch := pubsub.Channel()
// 处理消息
for msg := range ch {
if msg.Payload == "" {
// LoopSubscribe 循环订阅指定的 Redis 频道
// 参数:
// - cr: 核心对象
// - channelName: 要订阅的频道名称
// - redisConf: Redis 配置信息
func LoopSubscribe(cr *core.Core, channelName string, redisConf *core.RedisConfig) {
// 创建上下文用于控制循环
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 重试机制配置
retryCount := 0 // 当前重试次数
maxRetries := 3 // 最大重试次数
retryDelay := time.Second * 5 // 重试间隔时间
// 主循环
for {
select {
case <-ctx.Done(): // 如果上下文被取消
return
default:
// 尝试订阅并处理消息
err := subscribeAndProcess(ctx, cr, channelName, redisConf)
if err != nil {
// 如果出错,增加重试计数
retryCount++
if retryCount > maxRetries {
// 超过最大重试次数,记录错误并返回
logrus.Errorf("Max retries reached for channel %s, giving up", channelName)
return
}
// 记录警告并等待重试
logrus.Warnf("Subscription failed, retrying in %v (attempt %d/%d)", retryDelay, retryCount, maxRetries)
time.Sleep(retryDelay)
continue
}
ctype := ""
if strings.Contains(channelName, "allCandle") {
ctype = "candle"
} else if strings.Contains(channelName, "allMaX") {
ctype = "maX"
} else if strings.Contains(channelName, "ticker") {
ctype = "tickerInfo"
} else if strings.Contains(channelName, "ccyPositions") {
ctype = "ccyPositions"
} else if strings.Contains(channelName, "allseriesinfo") {
ctype = "seriesinfo"
} else if strings.Contains(channelName, "private|order") {
ctype = "private|order"
} else {
logrus.Warning("channelname not match", channelName)
// 成功则返回
return
}
logrus.Debug("msg.Payload: ", msg.Payload)
// fmt.Println("channelName: ", channelName, " msg.Payload: ", msg.Payload)
}
}
// subscribeAndProcess 订阅指定的 Redis 频道并处理接收到的消息
// 参数:
// - ctx: 上下文对象,用于控制订阅的生命周期
// - cr: 核心对象,包含系统配置和功能
// - channelName: 要订阅的频道名称
// - redisConf: Redis 连接配置信息
//
// 返回值:
// - error: 如果订阅或处理过程中出现错误则返回错误信息
func subscribeAndProcess(ctx context.Context, cr *core.Core, channelName string, redisConf *core.RedisConfig) error {
// 根据配置获取 Redis 客户端
redisRemoteCli, err := cr.GetRedisCliFromConf(*redisConf)
if err != nil {
return fmt.Errorf("failed to get redis client: %w", err)
}
// 根据环境变量判断是否需要添加后缀
suffix := ""
if strings.Contains(os.Getenv("GO_ENV"), "demoEnv") {
suffix = "-demoEnv"
}
// 构建完整的频道名称
fullChannelName := channelName + suffix
logrus.Infof("Starting subscription to channel: %s", fullChannelName)
// 订阅指定频道
pubsub := redisRemoteCli.Subscribe(fullChannelName)
defer pubsub.Close() // 确保在函数结束时关闭订阅
// 设置5秒超时等待订阅确认
if _, err := pubsub.ReceiveTimeout(time.Second * 5); err != nil {
return fmt.Errorf("failed to subscribe to channel %s: %w", fullChannelName, err)
}
// 获取消息通道
ch := pubsub.Channel()
// 主消息处理循环
for {
select {
case <-ctx.Done(): // 如果上下文被取消则退出
return nil
case msg := <-ch: // 接收到新消息
// 过滤无效消息
if msg == nil || msg.Payload == "" {
continue
}
// 处理消息内容
if err := processMessage(cr, channelName, msg.Payload); err != nil {
logrus.Warnf("Failed to process message: %v", err)
}
}
}
}
func processMessage(cr *core.Core, channelName string, payload string) error {
ctype := getChannelType(channelName)
if ctype == "" {
return fmt.Errorf("unrecognized channel name: %s", channelName)
}
logrus.Debugf("Received message from %s", channelName)
switch ctype {
// 接收到的candle扔到 candle 二次加工流水线
case "candle":
{
cd := core.Candle{}
json.Unmarshal([]byte(msg.Payload), &cd)
founded := cd.Filter(cr)
if !founded {
break
var cd core.Candle
if err := json.Unmarshal([]byte(payload), &cd); err != nil {
return fmt.Errorf("failed to unmarshal candle: %w", err)
}
if cd.Filter(cr) {
cr.CandlesProcessChan <- &cd
}
// 接收到的maX扔到 maX 二次加工流水线
case "maX":
{
mx := core.MaX{}
if msg.Payload == "" {
continue
var mx core.MaX
if err := json.Unmarshal([]byte(payload), &mx); err != nil {
return fmt.Errorf("failed to unmarshal maX: %w", err)
}
json.Unmarshal([]byte(msg.Payload), &mx)
dt := []interface{}{}
dt = append(dt, mx.Ts)
dt = append(dt, mx.AvgVal)
mx.Data = dt
mx.Data = []interface{}{mx.Ts, mx.AvgVal}
cr.MaXProcessChan <- &mx
}
// 接收到的tinckerInfo扔到 tickerInfo 二次加工流水线
case "tickerInfo":
{
//tickerInfo: map[askPx:2.2164 askSz:17.109531 bidPx:2.2136 bidSz:73 high24h:2.497 instId:STX-USDT instType:SPOT last:2.2136 lastSz:0 low24h:2.0508 open24h:2.42 sodUtc0:2.4266 sodUtc8:2.4224 ts:1637077323552 vol24h:5355479.488179 :12247398.975501]
ti := core.TickerInfo{}
err := json.Unmarshal([]byte(msg.Payload), &ti)
if err != nil {
logrus.Warning("tickerInfo payload unmarshal err: ", err, msg.Payload)
var ti core.TickerInfo
if err := json.Unmarshal([]byte(payload), &ti); err != nil {
return fmt.Errorf("failed to unmarshal tickerInfo: %w", err)
}
cr.TickerInforocessChan <- &ti
}
// case "seriesInfo":
// {
// //tickerInfo: map[askPx:2.2164 askSz:17.109531 bidPx:2.2136 bidSz:73 high24h:2.497 instId:STX-USDT instType:SPOT last:2.2136 lastSz:0 low24h:2.0508 open24h:2.42 sodUtc0:2.4266 sodUtc8:2.4224 ts:1637077323552 vol24h:5355479.488179 :12247398.975501]
// sei := SeriesInfo{}
// err := json.Unmarshal([]byte(msg.Payload), &sei)
// if err != nil {
// logrus.Warning("seriesInfo payload unmarshal err: ", err, msg.Payload)
// }
// cr.SeriesChan <- &sei
// continue
// }
}
}
return nil
}
func LoopMakeMaX(cr *core.Core) {
@ -241,46 +301,72 @@ func InvokeCandle(cr *core.Core, candleName string, period string, from int64, t
// count: 倒推多少个周期开始拿数据
// from: 倒推的起始时间点
// ctype: candle或者maX
//
// GetRangeCandleSortedSet 从 Redis 的有序集合中获取指定时间范围内的蜡烛图数据
// 参数:
// - cr: 核心对象,包含 Redis 连接等配置
// - setName: Redis 有序集合的名称,格式为 "candle{period}|{instId}|sortedSet"
// - count: 需要获取的蜡烛图数量
// - from: 时间范围的结束时间点
//
// 返回值:
// - *core.CandleList: 获取到的蜡烛图列表
// - error: 如果获取过程中出现错误则返回错误信息
func GetRangeCandleSortedSet(cr *core.Core, setName string, count int, from time.Time) (*core.CandleList, error) {
cdl := core.CandleList{}
// 解析 setName 获取周期和交易对信息
// setName 格式示例:"candle1m|BTC-USDT|sortedSet"
ary1 := strings.Split(setName, "|")
ary2 := []string{}
period := ""
ary2 = strings.Split(ary1[0], "candle")
period = ary2[1]
period = ary2[1] // 获取周期,如 "1m"
// 将周期转换为分钟数
dui, err := cr.PeriodToMinutes(period)
if err != nil {
return nil, err
}
// 将时间转换为毫秒时间戳
fromt := from.UnixMilli()
nw := time.Now().UnixMilli()
// 检查时间是否合理,防止时间戳错误
if fromt > nw*2 {
err := errors.New("时间错了需要debug")
logrus.Warning(err.Error())
return nil, err
}
froms := strconv.FormatInt(fromt, 10)
sti := fromt - dui*int64(count)*60*1000
sts := strconv.FormatInt(sti, 10)
// 计算时间范围
froms := strconv.FormatInt(fromt, 10) // 结束时间
sti := fromt - dui*int64(count)*60*1000 // 开始时间 = 结束时间 - (周期 * 数量)
sts := strconv.FormatInt(sti, 10) // 开始时间字符串
// 构建 Redis ZRangeBy 查询参数
opt := redis.ZRangeBy{
Min: sts,
Max: froms,
Count: int64(count),
Min: sts, // 最小时间戳
Max: froms, // 最大时间戳
Count: int64(count), // 最大数量
}
ary := []string{}
extt, err := cr.GetExpiration(period)
ot := time.Now().Add(extt * -1)
oti := ot.UnixMilli()
// 清理过期数据
extt, err := cr.GetExpiration(period) // 获取过期时间
ot := time.Now().Add(extt * -1) // 计算过期时间点
oti := ot.UnixMilli() // 转换为毫秒时间戳
cli := cr.RedisLocalCli
cli.LTrim(setName, 0, oti)
cunt, _ := cli.ZRemRangeByScore(setName, "0", strconv.FormatInt(oti, 10)).Result()
// 清理过期数据
cli.LTrim(setName, 0, oti) // 修剪列表
cunt, _ := cli.ZRemRangeByScore(setName, "0", strconv.FormatInt(oti, 10)).Result() // 移除过期数据
if cunt > 0 {
logrus.Warning("移出过期的引用数量setName: ", setName, " cunt: ", cunt, " , ZRemRangeByScore ", setName, 0, strconv.FormatInt(oti, 10))
}
// 从 Redis 有序集合中获取数据
logrus.Info("ZRevRangeByScore in GetRangeCandleSortedSet: setName:", setName, " opt:", opt)
ary, err = cli.ZRevRangeByScore(setName, opt).Result()
// fmt.Println("ary: ", ary, " setName: ", setName, " opt: ", opt)
ary, err = cli.ZRevRangeByScore(setName, opt).Result() // 按分数范围获取数据
if err != nil {
return &cdl, err
}
@ -296,22 +382,30 @@ func GetRangeCandleSortedSet(cr *core.Core, setName string, count int, from time
// }()
return &cdl, err
}
// 解析并处理获取到的蜡烛图数据
for _, str := range keyAry {
if str == nil {
continue
}
cd := core.Candle{}
// 反序列化 JSON 数据
err := json.Unmarshal([]byte(str.(string)), &cd)
if err != nil {
logrus.Warn(GetFuncName(), err, str.(string))
}
// 检查时间戳是否在指定范围内
tmi := ToInt64(cd.Data[0])
tm := time.UnixMilli(tmi)
if tm.Sub(from) > 0 {
break
}
// 将有效的蜡烛图数据添加到列表中
cdl.List = append(cdl.List, &cd)
}
// 设置返回的蜡烛图数量
cdl.Count = count
return &cdl, nil
}

View File

@ -3,34 +3,96 @@ package module
import (
"encoding/json"
// "errors"
// "fmt"
"fmt"
"github.com/phyer/core"
// "os"
// "strconv"
// "strings"
// // "sync"
// "sync"
// "time"
// //
// // simple "github.com/bitly/go-simplejson"
//
// simple "github.com/bitly/go-simplejson"
// "github.com/go-redis/redis"
// // "github.com/phyer/core/utils"
// logrus "github.com/sirupsen/logrus"
// "github.com/phyer/core/utils"
logrus "github.com/sirupsen/logrus"
)
// TODO 从redis里读出来已经存储的plate如果不存在就创建一个新的
// LoadPlate 加载或创建指定Instrument的Plate
// 1. 尝试从Redis加载已存储的Plate
// 2. 如果Redis中不存在则初始化一个新的Plate
// 3. 根据配置创建所有需要的Coaster
// 参数:
// - cr: 核心上下文对象
// - instId: Instrument ID
//
// 返回值:
// - *core.Plate: 加载或新建的Plate对象
// - error: 操作过程中发生的错误
func LoadPlate(cr *core.Core, instId string) (*core.Plate, error) {
// 初始化一个空的Plate对象
pl := core.Plate{}
// 构造Redis中存储Plate数据的key格式为"instId|plate"
plateName := instId + "|plate"
_, err := cr.RedisLocalCli.Exists().Result()
if err == nil {
str, _ := cr.RedisLocalCli.Get(plateName).Result()
json.Unmarshal([]byte(str), &pl)
} else {
pl.Init(instId)
prs := cr.Cfg.Config.Get("candleDimentions").MustArray()
for _, v := range prs {
pl.MakeCoaster(cr, v.(string))
}
// 尝试从Redis获取Plate数据
str, err := cr.RedisLocalCli.Get(plateName).Result()
// 如果Redis中存在数据且没有错误
if err == nil && str != "" {
// 将JSON字符串反序列化为Plate对象
if err := json.Unmarshal([]byte(str), &pl); err != nil {
// 反序列化失败时返回错误
return nil, fmt.Errorf("failed to unmarshal plate data: %v", err)
}
// 返回从Redis加载的Plate对象
return &pl, nil
}
// Redis不可用或数据不存在时初始化一个新的Plate
pl.Init(instId)
// 从配置中获取candleDimentions配置项
prs := cr.Cfg.Config.Get("candleDimentions")
// 检查配置项是否存在
if prs == nil || prs.Interface() == nil {
return nil, fmt.Errorf("candleDimentions config not found")
}
// 将配置项转换为数组
periods := prs.MustArray()
// 遍历所有周期配置
for _, v := range periods {
// 将interface{}类型转换为string
period, ok := v.(string)
// 检查周期字符串是否有效
if !ok || period == "" {
continue // 跳过无效的周期配置
}
// 为每个周期创建Coaster
if err := pl.MakeCoaster(cr, period); err != nil {
// 如果创建失败,返回错误
return nil, fmt.Errorf("failed to create coaster for period %s: %v", period, err)
}
}
// 将新创建的Plate保存到Redis可选操作
if err := savePlateToRedis(cr, plateName, &pl); err != nil {
// 保存失败时记录警告日志
logrus.Warnf("failed to save plate to redis: %v", err)
}
// 返回创建好的Plate对象
return &pl, nil
}
func savePlateToRedis(cr *core.Core, key string, pl *core.Plate) error {
data, err := json.Marshal(pl)
if err != nil {
return err
}
return cr.RedisLocalCli.Set(key, data, 0).Err()
}

BIN
siaga

Binary file not shown.