diff --git a/.gitignore b/.gitignore index 48b8bf9..8c126ea 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ vendor/ +.DS_Store diff --git a/main.go b/main.go index 97aba21..da6a1b7 100644 --- a/main.go +++ b/main.go @@ -30,43 +30,32 @@ func main() { // 目前只有phyer里部署的tunas会发布tickerInfo信息 // fmt.Println("len of rdsLs: ", len(rdsLs)) - // 订阅 redis TickerInfo - go func(vv *core.RedisConfig) { - allowed := os.Getenv("SIAGA_ACCEPTTICKER") == "true" - if !allowed { - return - } - logrus.Info("start subscribe core.TICKERINFO_PUBLISH") - md.LoopSubscribe(&cr, core.TICKERINFO_PUBLISH, vv) - }(rdsLs[0]) + // 定义订阅配置 + subscriptions := []struct { + envVar string + channel string + logPrefix string + }{ + {"SIAGA_ACCEPTTICKER", core.TICKERINFO_PUBLISH, "TickerInfo"}, + {"SIAGA_ACCEPTCANDLE", core.ALLCANDLES_PUBLISH, "Candles"}, + {"SIAGA_ACCEPTMAX", core.ALLMAXES_PUBLISH, "Max"}, + {"SIAGA_ACCEPTSERIES", core.ALLSERIESINFO_PUBLISH, "Series"}, + } - // 订阅 redis Candles - go func(vv *core.RedisConfig) { - allowed := os.Getenv("SIAGA_ACCEPTCANDLE") == "true" - if !allowed { - return - } - logrus.Info("start subscribe core.TICKERINFO_PUBLISH") - md.LoopSubscribe(&cr, core.ALLCANDLES_PUBLISH, vv) - }(rdsLs[0]) - - // 订阅 redis Max - go func(vv *core.RedisConfig) { - allowed := os.Getenv("SIAGA_ACCEPTMAX") == "true" - if !allowed { - return - } - md.LoopSubscribe(&cr, core.ALLMAXES_PUBLISH, vv) - }(rdsLs[0]) - - // 下面这个暂时不运行, 在环境变量里把它关掉 - go func(vv *core.RedisConfig) { - allowed := os.Getenv("SIAGA_ACCEPTSERIES") == "true" - if !allowed { - return - } - md.LoopSubscribe(&cr, core.ALLSERIESINFO_PUBLISH, vv) - }(rdsLs[0]) + // 启动所有订阅 + for _, sub := range subscriptions { + go func(s struct { + envVar string + channel string + logPrefix string + }) { + if os.Getenv(s.envVar) != "true" { + return + } + logrus.Infof("start subscribe %s: %s", s.logPrefix, s.channel) + md.LoopSubscribe(&cr, s.channel, rdsLs[0]) + }(sub) + } go func() { md.TickerInfoProcess(&cr) diff --git a/modules/extent.go b/modules/extent.go index 2ea955c..27db57b 100644 --- a/modules/extent.go +++ b/modules/extent.go @@ -1,6 +1,7 @@ package module import ( + "context" "encoding/json" "errors" "fmt" @@ -53,102 +54,161 @@ func GetRemoteRedisConfigList() ([]*core.RedisConfig, error) { return list, nil } +func getChannelType(channelName string) string { + switch { + case strings.Contains(channelName, "allCandle"): + return "candle" + case strings.Contains(channelName, "allMaX"): + return "maX" + case strings.Contains(channelName, "ticker"): + return "tickerInfo" + case strings.Contains(channelName, "ccyPositions"): + return "ccyPositions" + case strings.Contains(channelName, "allseriesinfo"): + return "seriesinfo" + case strings.Contains(channelName, "private|order"): + return "private|order" + default: + return "" + } +} + +// LoopSubscribe 循环订阅指定的 Redis 频道 +// 参数: +// - cr: 核心对象 +// - channelName: 要订阅的频道名称 +// - redisConf: Redis 配置信息 func LoopSubscribe(cr *core.Core, channelName string, redisConf *core.RedisConfig) { - redisRemoteCli, _ := cr.GetRedisCliFromConf(*redisConf) + // 创建上下文用于控制循环 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // 重试机制配置 + retryCount := 0 // 当前重试次数 + maxRetries := 3 // 最大重试次数 + retryDelay := time.Second * 5 // 重试间隔时间 + + // 主循环 + for { + select { + case <-ctx.Done(): // 如果上下文被取消 + return + default: + // 尝试订阅并处理消息 + err := subscribeAndProcess(ctx, cr, channelName, redisConf) + if err != nil { + // 如果出错,增加重试计数 + retryCount++ + if retryCount > maxRetries { + // 超过最大重试次数,记录错误并返回 + logrus.Errorf("Max retries reached for channel %s, giving up", channelName) + return + } + // 记录警告并等待重试 + logrus.Warnf("Subscription failed, retrying in %v (attempt %d/%d)", retryDelay, retryCount, maxRetries) + time.Sleep(retryDelay) + continue + } + // 成功则返回 + return + } + } +} + +// subscribeAndProcess 订阅指定的 Redis 频道并处理接收到的消息 +// 参数: +// - ctx: 上下文对象,用于控制订阅的生命周期 +// - cr: 核心对象,包含系统配置和功能 +// - channelName: 要订阅的频道名称 +// - redisConf: Redis 连接配置信息 +// +// 返回值: +// - error: 如果订阅或处理过程中出现错误则返回错误信息 +func subscribeAndProcess(ctx context.Context, cr *core.Core, channelName string, redisConf *core.RedisConfig) error { + // 根据配置获取 Redis 客户端 + redisRemoteCli, err := cr.GetRedisCliFromConf(*redisConf) + if err != nil { + return fmt.Errorf("failed to get redis client: %w", err) + } + + // 根据环境变量判断是否需要添加后缀 suffix := "" - env := os.Getenv("GO_ENV") - if strings.Contains(env, "demoEnv") { + if strings.Contains(os.Getenv("GO_ENV"), "demoEnv") { suffix = "-demoEnv" } - logrus.Info("loopSubscribe: ", channelName+suffix) - pubsub := redisRemoteCli.Subscribe(channelName + suffix) - _, err := pubsub.Receive() - if err != nil { - // cr.ErrorToRobot(utils.GetFuncName(), err) - logrus.Error(GetFuncName(), " ", err) - panic(err) + + // 构建完整的频道名称 + fullChannelName := channelName + suffix + logrus.Infof("Starting subscription to channel: %s", fullChannelName) + + // 订阅指定频道 + pubsub := redisRemoteCli.Subscribe(fullChannelName) + defer pubsub.Close() // 确保在函数结束时关闭订阅 + + // 设置5秒超时等待订阅确认 + if _, err := pubsub.ReceiveTimeout(time.Second * 5); err != nil { + return fmt.Errorf("failed to subscribe to channel %s: %w", fullChannelName, err) } - // 用管道来接收消息 + // 获取消息通道 ch := pubsub.Channel() - // 处理消息 - for msg := range ch { - if msg.Payload == "" { - continue - } - ctype := "" - if strings.Contains(channelName, "allCandle") { - ctype = "candle" - } else if strings.Contains(channelName, "allMaX") { - ctype = "maX" - } else if strings.Contains(channelName, "ticker") { - ctype = "tickerInfo" - } else if strings.Contains(channelName, "ccyPositions") { - ctype = "ccyPositions" - } else if strings.Contains(channelName, "allseriesinfo") { - ctype = "seriesinfo" - } else if strings.Contains(channelName, "private|order") { - ctype = "private|order" - } else { - logrus.Warning("channelname not match", channelName) - } - logrus.Debug("msg.Payload: ", msg.Payload) - // fmt.Println("channelName: ", channelName, " msg.Payload: ", msg.Payload) - switch ctype { - // 接收到的candle扔到 candle 二次加工流水线 - case "candle": - { - cd := core.Candle{} - json.Unmarshal([]byte(msg.Payload), &cd) - founded := cd.Filter(cr) - if !founded { - break - } - cr.CandlesProcessChan <- &cd + + // 主消息处理循环 + for { + select { + case <-ctx.Done(): // 如果上下文被取消则退出 + return nil + case msg := <-ch: // 接收到新消息 + // 过滤无效消息 + if msg == nil || msg.Payload == "" { + continue } - // 接收到的maX扔到 maX 二次加工流水线 - case "maX": - { - mx := core.MaX{} - if msg.Payload == "" { - continue - } - json.Unmarshal([]byte(msg.Payload), &mx) - dt := []interface{}{} - dt = append(dt, mx.Ts) - dt = append(dt, mx.AvgVal) - mx.Data = dt - cr.MaXProcessChan <- &mx - + // 处理消息内容 + if err := processMessage(cr, channelName, msg.Payload); err != nil { + logrus.Warnf("Failed to process message: %v", err) } - - // 接收到的tinckerInfo扔到 tickerInfo 二次加工流水线 - case "tickerInfo": - { - //tickerInfo: map[askPx:2.2164 askSz:17.109531 bidPx:2.2136 bidSz:73 high24h:2.497 instId:STX-USDT instType:SPOT last:2.2136 lastSz:0 low24h:2.0508 open24h:2.42 sodUtc0:2.4266 sodUtc8:2.4224 ts:1637077323552 vol24h:5355479.488179 :12247398.975501] - ti := core.TickerInfo{} - err := json.Unmarshal([]byte(msg.Payload), &ti) - if err != nil { - logrus.Warning("tickerInfo payload unmarshal err: ", err, msg.Payload) - } - cr.TickerInforocessChan <- &ti - } - // case "seriesInfo": - // { - // //tickerInfo: map[askPx:2.2164 askSz:17.109531 bidPx:2.2136 bidSz:73 high24h:2.497 instId:STX-USDT instType:SPOT last:2.2136 lastSz:0 low24h:2.0508 open24h:2.42 sodUtc0:2.4266 sodUtc8:2.4224 ts:1637077323552 vol24h:5355479.488179 :12247398.975501] - // sei := SeriesInfo{} - // err := json.Unmarshal([]byte(msg.Payload), &sei) - // if err != nil { - // logrus.Warning("seriesInfo payload unmarshal err: ", err, msg.Payload) - // } - // cr.SeriesChan <- &sei - // continue - // } } } } +func processMessage(cr *core.Core, channelName string, payload string) error { + ctype := getChannelType(channelName) + if ctype == "" { + return fmt.Errorf("unrecognized channel name: %s", channelName) + } + + logrus.Debugf("Received message from %s", channelName) + + switch ctype { + case "candle": + var cd core.Candle + if err := json.Unmarshal([]byte(payload), &cd); err != nil { + return fmt.Errorf("failed to unmarshal candle: %w", err) + } + if cd.Filter(cr) { + cr.CandlesProcessChan <- &cd + } + + case "maX": + var mx core.MaX + if err := json.Unmarshal([]byte(payload), &mx); err != nil { + return fmt.Errorf("failed to unmarshal maX: %w", err) + } + mx.Data = []interface{}{mx.Ts, mx.AvgVal} + cr.MaXProcessChan <- &mx + + case "tickerInfo": + var ti core.TickerInfo + if err := json.Unmarshal([]byte(payload), &ti); err != nil { + return fmt.Errorf("failed to unmarshal tickerInfo: %w", err) + } + cr.TickerInforocessChan <- &ti + } + + return nil +} + func LoopMakeMaX(cr *core.Core) { for { cd := <-cr.MakeMaXsChan @@ -241,46 +301,72 @@ func InvokeCandle(cr *core.Core, candleName string, period string, from int64, t // count: 倒推多少个周期开始拿数据 // from: 倒推的起始时间点 // ctype: candle或者maX +// +// GetRangeCandleSortedSet 从 Redis 的有序集合中获取指定时间范围内的蜡烛图数据 +// 参数: +// - cr: 核心对象,包含 Redis 连接等配置 +// - setName: Redis 有序集合的名称,格式为 "candle{period}|{instId}|sortedSet" +// - count: 需要获取的蜡烛图数量 +// - from: 时间范围的结束时间点 +// +// 返回值: +// - *core.CandleList: 获取到的蜡烛图列表 +// - error: 如果获取过程中出现错误则返回错误信息 func GetRangeCandleSortedSet(cr *core.Core, setName string, count int, from time.Time) (*core.CandleList, error) { cdl := core.CandleList{} + + // 解析 setName 获取周期和交易对信息 + // setName 格式示例:"candle1m|BTC-USDT|sortedSet" ary1 := strings.Split(setName, "|") ary2 := []string{} period := "" ary2 = strings.Split(ary1[0], "candle") - period = ary2[1] + period = ary2[1] // 获取周期,如 "1m" + // 将周期转换为分钟数 dui, err := cr.PeriodToMinutes(period) if err != nil { return nil, err } + + // 将时间转换为毫秒时间戳 fromt := from.UnixMilli() nw := time.Now().UnixMilli() + + // 检查时间是否合理,防止时间戳错误 if fromt > nw*2 { err := errors.New("时间错了需要debug") logrus.Warning(err.Error()) return nil, err } - froms := strconv.FormatInt(fromt, 10) - sti := fromt - dui*int64(count)*60*1000 - sts := strconv.FormatInt(sti, 10) + // 计算时间范围 + froms := strconv.FormatInt(fromt, 10) // 结束时间 + sti := fromt - dui*int64(count)*60*1000 // 开始时间 = 结束时间 - (周期 * 数量) + sts := strconv.FormatInt(sti, 10) // 开始时间字符串 + + // 构建 Redis ZRangeBy 查询参数 opt := redis.ZRangeBy{ - Min: sts, - Max: froms, - Count: int64(count), + Min: sts, // 最小时间戳 + Max: froms, // 最大时间戳 + Count: int64(count), // 最大数量 } ary := []string{} - extt, err := cr.GetExpiration(period) - ot := time.Now().Add(extt * -1) - oti := ot.UnixMilli() + + // 清理过期数据 + extt, err := cr.GetExpiration(period) // 获取过期时间 + ot := time.Now().Add(extt * -1) // 计算过期时间点 + oti := ot.UnixMilli() // 转换为毫秒时间戳 cli := cr.RedisLocalCli - cli.LTrim(setName, 0, oti) - cunt, _ := cli.ZRemRangeByScore(setName, "0", strconv.FormatInt(oti, 10)).Result() + + // 清理过期数据 + cli.LTrim(setName, 0, oti) // 修剪列表 + cunt, _ := cli.ZRemRangeByScore(setName, "0", strconv.FormatInt(oti, 10)).Result() // 移除过期数据 if cunt > 0 { logrus.Warning("移出过期的引用数量:setName: ", setName, " cunt: ", cunt, " , ZRemRangeByScore ", setName, 0, strconv.FormatInt(oti, 10)) } + // 从 Redis 有序集合中获取数据 logrus.Info("ZRevRangeByScore in GetRangeCandleSortedSet: setName:", setName, " opt:", opt) - ary, err = cli.ZRevRangeByScore(setName, opt).Result() - // fmt.Println("ary: ", ary, " setName: ", setName, " opt: ", opt) + ary, err = cli.ZRevRangeByScore(setName, opt).Result() // 按分数范围获取数据 if err != nil { return &cdl, err } @@ -296,22 +382,30 @@ func GetRangeCandleSortedSet(cr *core.Core, setName string, count int, from time // }() return &cdl, err } + // 解析并处理获取到的蜡烛图数据 for _, str := range keyAry { if str == nil { continue } cd := core.Candle{} + // 反序列化 JSON 数据 err := json.Unmarshal([]byte(str.(string)), &cd) if err != nil { logrus.Warn(GetFuncName(), err, str.(string)) } + + // 检查时间戳是否在指定范围内 tmi := ToInt64(cd.Data[0]) tm := time.UnixMilli(tmi) if tm.Sub(from) > 0 { break } + + // 将有效的蜡烛图数据添加到列表中 cdl.List = append(cdl.List, &cd) } + + // 设置返回的蜡烛图数量 cdl.Count = count return &cdl, nil } diff --git a/modules/plate.go b/modules/plate.go index 237859c..324f9b9 100644 --- a/modules/plate.go +++ b/modules/plate.go @@ -3,34 +3,96 @@ package module import ( "encoding/json" // "errors" - // "fmt" + "fmt" "github.com/phyer/core" // "os" // "strconv" // "strings" - // // "sync" + // "sync" // "time" - // // - // // simple "github.com/bitly/go-simplejson" + // + // simple "github.com/bitly/go-simplejson" // "github.com/go-redis/redis" - // // "github.com/phyer/core/utils" - // logrus "github.com/sirupsen/logrus" + // "github.com/phyer/core/utils" + logrus "github.com/sirupsen/logrus" ) // TODO 从redis里读出来已经存储的plate,如果不存在就创建一个新的 +// LoadPlate 加载或创建指定Instrument的Plate +// 1. 尝试从Redis加载已存储的Plate +// 2. 如果Redis中不存在,则初始化一个新的Plate +// 3. 根据配置创建所有需要的Coaster +// 参数: +// - cr: 核心上下文对象 +// - instId: Instrument ID +// +// 返回值: +// - *core.Plate: 加载或新建的Plate对象 +// - error: 操作过程中发生的错误 func LoadPlate(cr *core.Core, instId string) (*core.Plate, error) { + // 初始化一个空的Plate对象 pl := core.Plate{} + + // 构造Redis中存储Plate数据的key,格式为"instId|plate" plateName := instId + "|plate" - _, err := cr.RedisLocalCli.Exists().Result() - if err == nil { - str, _ := cr.RedisLocalCli.Get(plateName).Result() - json.Unmarshal([]byte(str), &pl) - } else { - pl.Init(instId) - prs := cr.Cfg.Config.Get("candleDimentions").MustArray() - for _, v := range prs { - pl.MakeCoaster(cr, v.(string)) + + // 尝试从Redis获取Plate数据 + str, err := cr.RedisLocalCli.Get(plateName).Result() + + // 如果Redis中存在数据且没有错误 + if err == nil && str != "" { + // 将JSON字符串反序列化为Plate对象 + if err := json.Unmarshal([]byte(str), &pl); err != nil { + // 反序列化失败时返回错误 + return nil, fmt.Errorf("failed to unmarshal plate data: %v", err) + } + // 返回从Redis加载的Plate对象 + return &pl, nil + } + + // Redis不可用或数据不存在时,初始化一个新的Plate + pl.Init(instId) + + // 从配置中获取candleDimentions配置项 + prs := cr.Cfg.Config.Get("candleDimentions") + + // 检查配置项是否存在 + if prs == nil || prs.Interface() == nil { + return nil, fmt.Errorf("candleDimentions config not found") + } + + // 将配置项转换为数组 + periods := prs.MustArray() + + // 遍历所有周期配置 + for _, v := range periods { + // 将interface{}类型转换为string + period, ok := v.(string) + // 检查周期字符串是否有效 + if !ok || period == "" { + continue // 跳过无效的周期配置 + } + // 为每个周期创建Coaster + if err := pl.MakeCoaster(cr, period); err != nil { + // 如果创建失败,返回错误 + return nil, fmt.Errorf("failed to create coaster for period %s: %v", period, err) } } + + // 将新创建的Plate保存到Redis(可选操作) + if err := savePlateToRedis(cr, plateName, &pl); err != nil { + // 保存失败时记录警告日志 + logrus.Warnf("failed to save plate to redis: %v", err) + } + + // 返回创建好的Plate对象 return &pl, nil } + +func savePlateToRedis(cr *core.Core, key string, pl *core.Plate) error { + data, err := json.Marshal(pl) + if err != nil { + return err + } + return cr.RedisLocalCli.Set(key, data, 0).Err() +} diff --git a/siaga b/siaga deleted file mode 100755 index b0c83d0..0000000 Binary files a/siaga and /dev/null differ