875 lines
26 KiB
Go
875 lines
26 KiB
Go
package module
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"os"
|
||
"strconv"
|
||
"strings"
|
||
|
||
"github.com/phyer/core"
|
||
|
||
// "sync"
|
||
"time"
|
||
//
|
||
// simple "github.com/bitly/go-simplejson"
|
||
"github.com/go-redis/redis"
|
||
// "github.com/phyer/core/utils"
|
||
logrus "github.com/sirupsen/logrus"
|
||
)
|
||
|
||
func GetRemoteRedisConfigList() ([]*core.RedisConfig, error) {
|
||
list := []*core.RedisConfig{}
|
||
envListStr := os.Getenv("SIAGA_UPSTREAM_REDIS_LIST")
|
||
envList := strings.Split(envListStr, "|")
|
||
for _, v := range envList {
|
||
if len(v) == 0 {
|
||
continue
|
||
}
|
||
urlstr := "SIAGA_UPSTREAM_REDIS_" + v + "_URL"
|
||
indexstr := "SIAGA_UPSTREAM_REDIS_" + v + "_INDEX"
|
||
password := os.Getenv("SIAGA_UPSTREAM_REDIS_" + v + "_PASSWORD")
|
||
// channelstr := core.REMOTE_REDIS_PRE_NAME + v + "_CHANNEL_PRENAME"
|
||
// channelPreName := os.Getenv(channelstr)
|
||
url := os.Getenv(urlstr)
|
||
index := os.Getenv(indexstr)
|
||
if len(url) == 0 || len(index) == 0 {
|
||
err := errors.New("remote redis config err:" + urlstr + "," + url + "," + indexstr + "," + index)
|
||
return list, err
|
||
}
|
||
idx, err := strconv.Atoi(index)
|
||
if err != nil {
|
||
return list, err
|
||
}
|
||
curConf := core.RedisConfig{
|
||
Url: url,
|
||
Password: password,
|
||
Index: idx,
|
||
// ChannelPreName: channelPreName,
|
||
}
|
||
list = append(list, &curConf)
|
||
}
|
||
return list, nil
|
||
}
|
||
|
||
func getChannelType(channelName string) string {
|
||
switch {
|
||
case strings.Contains(channelName, "allCandle"):
|
||
return "candle"
|
||
case strings.Contains(channelName, "allMaX"):
|
||
return "maX"
|
||
case strings.Contains(channelName, "ticker"):
|
||
return "tickerInfo"
|
||
case strings.Contains(channelName, "ccyPositions"):
|
||
return "ccyPositions"
|
||
case strings.Contains(channelName, "allseriesinfo"):
|
||
return "seriesinfo"
|
||
case strings.Contains(channelName, "private|order"):
|
||
return "private|order"
|
||
default:
|
||
return ""
|
||
}
|
||
}
|
||
|
||
// LoopSubscribe 循环订阅指定的 Redis 频道
|
||
// 参数:
|
||
// - cr: 核心对象
|
||
// - channelName: 要订阅的频道名称
|
||
// - redisConf: Redis 配置信息
|
||
func LoopSubscribe(cr *core.Core, channelName string, redisConf *core.RedisConfig) {
|
||
// 创建上下文用于控制循环
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
defer cancel()
|
||
|
||
// 重试机制配置
|
||
retryCount := 0 // 当前重试次数
|
||
maxRetries := 3 // 最大重试次数
|
||
retryDelay := time.Second * 5 // 重试间隔时间
|
||
|
||
// 主循环
|
||
for {
|
||
select {
|
||
case <-ctx.Done(): // 如果上下文被取消
|
||
return
|
||
default:
|
||
// 尝试订阅并处理消息
|
||
err := subscribeAndProcess(ctx, cr, channelName, redisConf)
|
||
if err != nil {
|
||
// 如果出错,增加重试计数
|
||
retryCount++
|
||
if retryCount > maxRetries {
|
||
// 超过最大重试次数,记录错误并返回
|
||
logrus.Errorf("Max retries reached for channel %s, giving up", channelName)
|
||
return
|
||
}
|
||
// 记录警告并等待重试
|
||
logrus.Warnf("Subscription failed, retrying in %v (attempt %d/%d)", retryDelay, retryCount, maxRetries)
|
||
time.Sleep(retryDelay)
|
||
continue
|
||
}
|
||
// 成功则返回
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
// subscribeAndProcess 订阅指定的 Redis 频道并处理接收到的消息
|
||
// 参数:
|
||
// - ctx: 上下文对象,用于控制订阅的生命周期
|
||
// - cr: 核心对象,包含系统配置和功能
|
||
// - channelName: 要订阅的频道名称
|
||
// - redisConf: Redis 连接配置信息
|
||
//
|
||
// 返回值:
|
||
// - error: 如果订阅或处理过程中出现错误则返回错误信息
|
||
func subscribeAndProcess(ctx context.Context, cr *core.Core, channelName string, redisConf *core.RedisConfig) error {
|
||
// 根据配置获取 Redis 客户端
|
||
redisRemoteCli, err := cr.GetRedisCliFromConf(*redisConf)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to get redis client: %w", err)
|
||
}
|
||
|
||
// 根据环境变量判断是否需要添加后缀
|
||
suffix := ""
|
||
if strings.Contains(os.Getenv("GO_ENV"), "demoEnv") {
|
||
suffix = "-demoEnv"
|
||
}
|
||
|
||
// 构建完整的频道名称
|
||
fullChannelName := channelName + suffix
|
||
logrus.Infof("Starting subscription to channel: %s", fullChannelName)
|
||
|
||
// 订阅指定频道
|
||
pubsub := redisRemoteCli.Subscribe(fullChannelName)
|
||
defer pubsub.Close() // 确保在函数结束时关闭订阅
|
||
|
||
// 设置5秒超时等待订阅确认
|
||
if _, err := pubsub.ReceiveTimeout(time.Second * 5); err != nil {
|
||
return fmt.Errorf("failed to subscribe to channel %s: %w", fullChannelName, err)
|
||
}
|
||
|
||
// 获取消息通道
|
||
ch := pubsub.Channel()
|
||
|
||
// 主消息处理循环
|
||
for {
|
||
select {
|
||
case <-ctx.Done(): // 如果上下文被取消则退出
|
||
return nil
|
||
case msg := <-ch: // 接收到新消息
|
||
// 过滤无效消息
|
||
if msg == nil || msg.Payload == "" {
|
||
continue
|
||
}
|
||
|
||
// 处理消息内容
|
||
if err := processMessage(cr, channelName, msg.Payload); err != nil {
|
||
logrus.Warnf("Failed to process message: %v", err)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
func processMessage(cr *core.Core, channelName string, payload string) error {
|
||
ctype := getChannelType(channelName)
|
||
if ctype == "" {
|
||
return fmt.Errorf("unrecognized channel name: %s", channelName)
|
||
}
|
||
|
||
logrus.Debugf("Received message from %s", channelName)
|
||
|
||
switch ctype {
|
||
case "candle":
|
||
var cd core.Candle
|
||
if err := json.Unmarshal([]byte(payload), &cd); err != nil {
|
||
return fmt.Errorf("failed to unmarshal candle: %w", err)
|
||
}
|
||
if cd.Filter(cr) {
|
||
cr.CandlesProcessChan <- &cd
|
||
}
|
||
|
||
case "maX":
|
||
var mx core.MaX
|
||
if err := json.Unmarshal([]byte(payload), &mx); err != nil {
|
||
return fmt.Errorf("failed to unmarshal maX: %w", err)
|
||
}
|
||
mx.Data = []interface{}{mx.Ts, mx.AvgVal}
|
||
cr.MaXProcessChan <- &mx
|
||
|
||
case "tickerInfo":
|
||
var ti core.TickerInfo
|
||
if err := json.Unmarshal([]byte(payload), &ti); err != nil {
|
||
return fmt.Errorf("failed to unmarshal tickerInfo: %w", err)
|
||
}
|
||
cr.TickerInforocessChan <- &ti
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func LoopMakeMaX(cr *core.Core) {
|
||
for {
|
||
cd := <-cr.MakeMaXsChan
|
||
go func(cad *core.Candle) {
|
||
//当一个candle的多个时间点的数据几乎同时到达时,顺序无法保证,制作maX会因为中间缺失数据而计算错,因此,等待一秒钟等数据都全了再算
|
||
// sz := utils.ShaiziInt(1500) + 500
|
||
time.Sleep(time.Duration(1500) * time.Millisecond)
|
||
err, ct := MakeMaX(cr, cad, 7)
|
||
if err != nil {
|
||
logrus.Warn(GetFuncName(), " ma7 err:", err, " ct:", ct, " cd.InstID:", cd.InstID, " cd.Period:", cd.Period, " cd.Timestamp: ", cd.Timestamp, " cd.Data[0]: ", cd.Data[0])
|
||
}
|
||
//TODO 这个思路不错,单行不通,远程redis禁不住这么频繁的请求
|
||
// cd.InvokeRestQFromRemote(cr, ct)
|
||
}(cd)
|
||
go func(cad *core.Candle) {
|
||
//当一个candle的多个时间点的数据几乎同时到达时,顺序无法保证,制作maX会因为中间缺失数据而计算错,因此,等待一秒钟等数据都全了再算
|
||
// sz := utils.ShaiziInt(2000) + 500
|
||
time.Sleep(time.Duration(1600) * time.Millisecond)
|
||
err, ct := MakeMaX(cr, cad, 30)
|
||
if err != nil {
|
||
logrus.Warn(GetFuncName(), " ma30 err:", err, " ct:", ct, " cd.InstID:", cd.InstID, " cd.Period:", cd.Period, " cd.Timestamp: ", cd.Timestamp)
|
||
}
|
||
// cd.InvokeRestQFromRemote(cr, ct)
|
||
}(cd)
|
||
go func(cad *core.Candle) {
|
||
time.Sleep(time.Duration(1700) * time.Millisecond)
|
||
err, ct := MakeRsi(cr, cad, 14, true)
|
||
logrus.Warn(GetFuncName(), " rsi14 err:", err, " ct:", ct, " cd.InstID:", cd.InstID, " cd.Period:", cd.Period)
|
||
}(cd)
|
||
go func(cad *core.Candle) {
|
||
time.Sleep(time.Duration(1700) * time.Millisecond)
|
||
err, ct := MakeRsi(cr, cad, 12, false)
|
||
logrus.Warn(GetFuncName(), " rsi12 err:", err, " ct:", ct, " cd.InstID:", cd.InstID, " cd.Period:", cd.Period)
|
||
}(cd)
|
||
go func(cad *core.Candle) {
|
||
time.Sleep(time.Duration(1700) * time.Millisecond)
|
||
err, ct := MakeRsi(cr, cad, 24, false)
|
||
logrus.Warn(GetFuncName(), " rsi24 err:", err, " ct:", ct, " cd.InstID:", cd.InstID, " cd.Period:", cd.Period)
|
||
}(cd)
|
||
// TODO TODO 这地方不能加延时,否则makeMax处理不过来,多的就丢弃了,造成maX的sortedSet比candle的短很多。后面所有依赖的逻辑都受影响.
|
||
// time.Sleep(300 * time.Millisecond)
|
||
}
|
||
}
|
||
|
||
func InvokeCandle(cr *core.Core, candleName string, period string, from int64, to int64) error {
|
||
// 计算from到to之间的时间差(毫秒)
|
||
timeDiff := to - from
|
||
|
||
// 根据period计算每个周期的毫秒数
|
||
periodMinutes, err := cr.PeriodToMinutes(period)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to convert period to minutes: %v", err)
|
||
}
|
||
periodMs := periodMinutes * 60 * 1000 // 转换成毫秒
|
||
|
||
// 计算需要多少个周期的数据
|
||
candleCount := int(timeDiff / periodMs)
|
||
if candleCount <= 0 {
|
||
return fmt.Errorf("invalid time range: from %d to %d", from, to)
|
||
}
|
||
|
||
// 限制最大请求数量,避免请求过大
|
||
if candleCount > 100 {
|
||
candleCount = 100
|
||
}
|
||
|
||
restQ := core.RestQueue{
|
||
InstId: candleName,
|
||
Bar: period,
|
||
WithWs: false,
|
||
Limit: strconv.Itoa(candleCount), // 动态计算limit
|
||
After: from,
|
||
}
|
||
|
||
js, err := json.Marshal(restQ)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to marshal RestQueue: %v", err)
|
||
}
|
||
|
||
cli := cr.RedisLocalCli
|
||
_, err = cli.LPush("restQueue", js).Result()
|
||
if err != nil {
|
||
return fmt.Errorf("failed to push to redis: %v", err)
|
||
}
|
||
|
||
return err
|
||
}
|
||
|
||
// setName := "candle" + period + "|" + instId + "|sortedSet"
|
||
// count: 倒推多少个周期开始拿数据
|
||
// from: 倒推的起始时间点
|
||
// ctype: candle或者maX
|
||
//
|
||
// GetRangeCandleSortedSet 从 Redis 的有序集合中获取指定时间范围内的蜡烛图数据
|
||
// 参数:
|
||
// - cr: 核心对象,包含 Redis 连接等配置
|
||
// - setName: Redis 有序集合的名称,格式为 "candle{period}|{instId}|sortedSet"
|
||
// - count: 需要获取的蜡烛图数量
|
||
// - from: 时间范围的结束时间点
|
||
//
|
||
// 返回值:
|
||
// - *core.CandleList: 获取到的蜡烛图列表
|
||
// - error: 如果获取过程中出现错误则返回错误信息
|
||
func GetRangeCandleSortedSet(cr *core.Core, setName string, count int, from time.Time) (*core.CandleList, error) {
|
||
cdl := core.CandleList{}
|
||
|
||
// 解析 setName 获取周期和交易对信息
|
||
// setName 格式示例:"candle1m|BTC-USDT|sortedSet"
|
||
ary1 := strings.Split(setName, "|")
|
||
ary2 := []string{}
|
||
period := ""
|
||
ary2 = strings.Split(ary1[0], "candle")
|
||
period = ary2[1] // 获取周期,如 "1m"
|
||
|
||
// 将周期转换为分钟数
|
||
dui, err := cr.PeriodToMinutes(period)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 将时间转换为毫秒时间戳
|
||
fromt := from.UnixMilli()
|
||
nw := time.Now().UnixMilli()
|
||
|
||
// 检查时间是否合理,防止时间戳错误
|
||
if fromt > nw*2 {
|
||
err := errors.New("时间错了需要debug")
|
||
logrus.Warning(err.Error())
|
||
return nil, err
|
||
}
|
||
// 计算时间范围
|
||
froms := strconv.FormatInt(fromt, 10) // 结束时间
|
||
sti := fromt - dui*int64(count)*60*1000 // 开始时间 = 结束时间 - (周期 * 数量)
|
||
sts := strconv.FormatInt(sti, 10) // 开始时间字符串
|
||
|
||
// 构建 Redis ZRangeBy 查询参数
|
||
opt := redis.ZRangeBy{
|
||
Min: sts, // 最小时间戳
|
||
Max: froms, // 最大时间戳
|
||
Count: int64(count), // 最大数量
|
||
}
|
||
ary := []string{}
|
||
|
||
// 清理过期数据
|
||
extt, err := cr.GetExpiration(period) // 获取过期时间
|
||
ot := time.Now().Add(extt * -1) // 计算过期时间点
|
||
oti := ot.UnixMilli() // 转换为毫秒时间戳
|
||
cli := cr.RedisLocalCli
|
||
|
||
// 清理过期数据
|
||
cli.LTrim(setName, 0, oti) // 修剪列表
|
||
cunt, _ := cli.ZRemRangeByScore(setName, "0", strconv.FormatInt(oti, 10)).Result() // 移除过期数据
|
||
if cunt > 0 {
|
||
logrus.Warning("移出过期的引用数量:setName: ", setName, " cunt: ", cunt, " , ZRemRangeByScore ", setName, 0, strconv.FormatInt(oti, 10))
|
||
}
|
||
// 从 Redis 有序集合中获取数据
|
||
logrus.Info("ZRevRangeByScore in GetRangeCandleSortedSet: setName:", setName, " opt:", opt)
|
||
ary, err = cli.ZRevRangeByScore(setName, opt).Result() // 按分数范围获取数据
|
||
if err != nil {
|
||
return &cdl, err
|
||
}
|
||
keyAry, err := cli.MGet(ary...).Result()
|
||
if err != nil || len(keyAry) == 0 {
|
||
logrus.Warning("no record with cmd: ZRevRangeByScore ", "setName: ", setName, " from: ", froms, " sts: ", sts, " err:", err.Error())
|
||
logrus.Warning("zrev lens of ary: lens: ", len(ary), "GetRangeSortedSet ZRevRangeByScore:", "setName:", setName, " opt.Max:", opt.Max, " opt.Min:", opt.Min)
|
||
// go func() {
|
||
// parts := strings.Split(setName, "|")
|
||
// instId := parts[1]
|
||
// period, _ := extractString(setName)
|
||
// InvokeCandle(cr, instId, period, fromt, sti)
|
||
// }()
|
||
return &cdl, err
|
||
}
|
||
// 解析并处理获取到的蜡烛图数据
|
||
for _, str := range keyAry {
|
||
if str == nil {
|
||
continue
|
||
}
|
||
cd := core.Candle{}
|
||
// 反序列化 JSON 数据
|
||
err := json.Unmarshal([]byte(str.(string)), &cd)
|
||
if err != nil {
|
||
logrus.Warn(GetFuncName(), err, str.(string))
|
||
}
|
||
|
||
// 检查时间戳是否在指定范围内
|
||
tmi := ToInt64(cd.Data[0])
|
||
tm := time.UnixMilli(tmi)
|
||
if tm.Sub(from) > 0 {
|
||
break
|
||
}
|
||
|
||
// 将有效的蜡烛图数据添加到列表中
|
||
cdl.List = append(cdl.List, &cd)
|
||
}
|
||
|
||
// 设置返回的蜡烛图数量
|
||
cdl.Count = count
|
||
return &cdl, nil
|
||
}
|
||
|
||
func extractString(input string) (string, error) {
|
||
// 定位关键词 maX 或 candle
|
||
var prefix string
|
||
if strings.HasPrefix(input, "maX") {
|
||
prefix = "maX"
|
||
} else if strings.HasPrefix(input, "candle") {
|
||
prefix = "candle"
|
||
} else {
|
||
return "", fmt.Errorf("input does not start with 'maX' or 'candle'")
|
||
}
|
||
|
||
// 去掉前缀部分
|
||
remaining := strings.TrimPrefix(input, prefix)
|
||
|
||
// 找到第一个竖线的位置
|
||
pipeIndex := strings.Index(remaining, "|")
|
||
if pipeIndex == -1 {
|
||
return "", fmt.Errorf("no '|' found in the input")
|
||
}
|
||
|
||
// 返回竖线之前的部分
|
||
return remaining[:pipeIndex], nil
|
||
}
|
||
|
||
// func GetExpiration(cr *core.Core, per string) (time.Duration, error) {
|
||
// if len(per) == 0 {
|
||
// erstr := fmt.Sprint("period没有设置")
|
||
// logrus.Warn(erstr)
|
||
// err := errors.New(erstr)
|
||
// return 0, err
|
||
// }
|
||
// exp, err := cr.PeriodToMinutes(per)
|
||
// dur := time.Duration(exp*319) * time.Minute
|
||
// return dur, err
|
||
// }
|
||
func MakeRsi(cr *core.Core, cl *core.Candle, count int, makeStock bool) (error, int) {
|
||
data := cl.Data
|
||
js, _ := json.Marshal(data)
|
||
if len(data) == 0 {
|
||
err := errors.New("data is block: " + string(js))
|
||
return err, 0
|
||
}
|
||
tsi := ToInt64(data[0])
|
||
// tss := strconv.FormatInt(tsi, 10)
|
||
// keyName := "candle" + cl.Period + "|" + cl.InstID + "|ts:" + tss
|
||
lastTime := time.UnixMilli(tsi)
|
||
setName := "candle" + cl.Period + "|" + cl.InstID + "|sortedSet"
|
||
// dcount := count * 2
|
||
cdl, err := GetRangeCandleSortedSet(cr, setName, count*2, lastTime)
|
||
if err != nil {
|
||
return err, 0
|
||
}
|
||
// amountLast := float64(0)
|
||
// ct := float64(0)
|
||
if len(cdl.List) < 2*count {
|
||
err = errors.New("sortedSet长度不足, 实际长度:" + ToString(len(cdl.List)) + "需要长度:" + ToString(count) + "的2倍, 无法进行rsi计算," + " setName:" + setName + ", fromTime: " + ToString(tsi))
|
||
return err, 0
|
||
}
|
||
cdl.RecursiveBubbleS(len(cdl.List), "asc")
|
||
closeList := []float64{}
|
||
// ll := len(cdl.List)
|
||
// fmt.Println("candleList len:", ll)
|
||
for _, v := range cdl.List {
|
||
// fmt.Println("candle in list", ll, k, v)
|
||
closeList = append(closeList, ToFloat64(v.Data[4]))
|
||
}
|
||
rsiList, err := CalculateRSI(closeList, count)
|
||
if err != nil {
|
||
logrus.Error("Error calculating RSI:", err)
|
||
return err, 0
|
||
}
|
||
rsi := core.Rsi{
|
||
InstID: cl.InstID,
|
||
Period: cl.Period,
|
||
Timestamp: cl.Timestamp,
|
||
Ts: tsi,
|
||
Count: count,
|
||
LastUpdate: time.Now(),
|
||
RsiVol: rsiList[len(rsiList)-1],
|
||
Confirm: false,
|
||
}
|
||
periodMins, err := cr.PeriodToMinutes(cl.Period)
|
||
duration := rsi.LastUpdate.Sub(cl.Timestamp) // 获取时间差
|
||
//最后更新时间差不多大于一个周期,判定为已完成
|
||
if duration > time.Duration(periodMins-1)*time.Minute {
|
||
rsi.Confirm = true
|
||
}
|
||
|
||
// fmt.Println("will send rsi")
|
||
go func() {
|
||
// fmt.Println("make a rsi")
|
||
cr.RsiProcessChan <- &rsi
|
||
}()
|
||
if !makeStock {
|
||
return nil, 0
|
||
}
|
||
|
||
percentK, percentD, err := CalculateStochRSI(rsiList, count, 3, 3)
|
||
|
||
if err != nil {
|
||
logrus.Error("Error calculating StochRSI:", err)
|
||
return err, 0
|
||
}
|
||
srsi := core.StockRsi{
|
||
InstID: cl.InstID,
|
||
Period: cl.Period,
|
||
Timestamp: cl.Timestamp,
|
||
Ts: tsi,
|
||
Count: count,
|
||
LastUpdate: time.Now(),
|
||
KVol: percentK[len(percentK)-1],
|
||
DVol: percentD[len(percentD)-1],
|
||
Confirm: true,
|
||
}
|
||
|
||
// fmt.Println("will send stockrsi")
|
||
go func() {
|
||
// fmt.Println("make a stockrsi")
|
||
cr.StockRsiProcessChan <- &srsi
|
||
}()
|
||
|
||
return nil, 0
|
||
}
|
||
func MakeMaX(cr *core.Core, cl *core.Candle, count int) (error, int) {
|
||
data := cl.Data
|
||
js, _ := json.Marshal(data)
|
||
// cjs, _ := json.Marshal(cl)
|
||
if len(data) == 0 {
|
||
err := errors.New("data is block: " + string(js))
|
||
return err, 0
|
||
}
|
||
|
||
tsi := ToInt64(data[0])
|
||
// tsa := time.UnixMilli(tsi).Format("01-02 15:03:04")
|
||
// fmt.Println("MakeMaX candle: ", cl.InstID, cl.Period, tsa, cl.From)
|
||
tss := strconv.FormatInt(tsi, 10)
|
||
keyName := "candle" + cl.Period + "|" + cl.InstID + "|ts:" + tss
|
||
//过期时间:根号(当前candle的周期/1分钟)*10000
|
||
|
||
lastTime := time.UnixMilli(tsi)
|
||
// lasts := lastTime.Format("2006-01-02 15:04")
|
||
// 以当前candle的时间戳为起点倒推count个周期,取得所需candle用于计算maX
|
||
setName := "candle" + cl.Period + "|" + cl.InstID + "|sortedSet"
|
||
// cdl, err := cr.GetLastCandleListOfCoin(cl.InstID, cl.Period, count, lastTime)
|
||
cdl, err := GetRangeCandleSortedSet(cr, setName, count, lastTime)
|
||
if err != nil {
|
||
// to, _ := cr.PeriodToMinutes(cl.Period)
|
||
// to = tsi + to*ToInt64(count)
|
||
// InvokeCandle(cr, cl.InstID, cl.Period, tsi, to)
|
||
return err, 0
|
||
}
|
||
|
||
fmt.Println("makeMaX: list: ", "instId: ", cl.InstID, "cl.Period: ", cl.Period, " lastTime:", lastTime, " count: ", count)
|
||
amountLast := float64(0)
|
||
ct := float64(0)
|
||
// fmt.Println("makeMax len of GetLastCandleListOfCoin list: ", len(cdl.List), "makeMax err of GetLastCandleListOfCoin: ", err)
|
||
if len(cdl.List) == 0 {
|
||
return err, 0
|
||
}
|
||
// ljs, _ := json.Marshal(cdl.List)
|
||
// fmt.Println("makeMax: ljs: ", string(ljs))
|
||
if len(cdl.List) < count {
|
||
err := errors.New("由于sortedSet容量有限,没有足够的元素数量来计算 maX, setName: " + setName + " ct: " + ToString(ct) + "count: " + ToString(count))
|
||
return err, int(float64(count) - ct)
|
||
}
|
||
for _, v := range cdl.List {
|
||
curLast, err := strconv.ParseFloat(v.Data[4].(string), 64)
|
||
if err != nil {
|
||
logrus.Warn("strconv.ParseFloat err:", err)
|
||
continue
|
||
}
|
||
if curLast > 0 {
|
||
ct++
|
||
} else {
|
||
logrus.Warn("strconv.ParseFloat curLast:", curLast)
|
||
}
|
||
amountLast += curLast
|
||
//----------------------------------------------
|
||
}
|
||
avgLast := amountLast / ct
|
||
if float64(ct) < float64(count) {
|
||
err := errors.New("no enough source to calculate maX, setName: " + setName + " ct: " + ToString(ct) + "count: " + ToString(count))
|
||
return err, int(float64(count) - ct)
|
||
// fmt.Println("makeMax err: 没有足够的数据进行计算ma", "candle:", cl, "counts:", count, "ct:", ct, "avgLast: ")
|
||
} else {
|
||
// fmt.Println("makeMax keyName: ma", count, keyName, " avgLast: ", avgLast, "ts: ", tsi, "ct: ", ct, "ots: ", ots, "candle: ", string(cjs))
|
||
|
||
}
|
||
tm, _ := core.Int64ToTime(tsi)
|
||
logrus.Debug("max tm:", tm)
|
||
mx := core.MaX{
|
||
KeyName: keyName,
|
||
InstID: cl.InstID,
|
||
Period: cl.Period,
|
||
From: cl.From,
|
||
Count: count,
|
||
Ts: tsi,
|
||
AvgVal: avgLast,
|
||
Timestamp: tm,
|
||
}
|
||
// MaX的Data里包含三个有效信息:时间戳,平均值,计算平均值所采用的数列长度
|
||
dt := []interface{}{}
|
||
dt = append(dt, mx.Ts)
|
||
dt = append(dt, mx.AvgVal)
|
||
dt = append(dt, ct)
|
||
mx.Data = dt
|
||
|
||
// key存到redis
|
||
|
||
cr.MaXProcessChan <- &mx
|
||
return nil, 0
|
||
}
|
||
|
||
func CandlesProcess(cr *core.Core) {
|
||
for {
|
||
cd := <-cr.CandlesProcessChan
|
||
cd.LastUpdate = time.Now()
|
||
logrus.Debug("candle in process: ", cd)
|
||
go func(cad *core.Candle) {
|
||
mcd := MyCandle{
|
||
Candle: *cad,
|
||
}
|
||
mcd.Process(cr)
|
||
}(cd)
|
||
}
|
||
}
|
||
|
||
// 使用当前某个原始维度的candle对象,生成其他目标维度的candle对象,比如用3分钟的candle可以生成15分钟及以上的candle
|
||
// {
|
||
// "startTime": "2021-12-04 20:00",
|
||
// "seg": "m",
|
||
// "count": 1
|
||
// },
|
||
// 从startTime开始,经历整数个(count * seg)之后,还能不大于分钟粒度的当前时间的话,那个时间点就是最近的当前段起始时间点
|
||
func MakeSoftCandles(cr *core.Core, mcd *MyCandle) {
|
||
segments := cr.Cfg.Config.Get("softCandleSegmentList").MustArray()
|
||
for k, v := range segments {
|
||
cs := core.CandleSegment{}
|
||
sv, _ := json.Marshal(v)
|
||
json.Unmarshal(sv, &cs)
|
||
// if k > 2 {
|
||
// continue
|
||
// }
|
||
if !cs.Enabled {
|
||
continue
|
||
}
|
||
// TODO: 通过序列化和反序列化,对原始的candle进行克隆,因为是对引用进行操作,所以每个seg里对candle进行操作都会改变原始对象,这和预期不符
|
||
bt, _ := json.Marshal(mcd.Candle)
|
||
cd0 := core.Candle{}
|
||
json.Unmarshal(bt, &cd0)
|
||
|
||
tmi := ToInt64(cd0.Data[0])
|
||
ts, _ := core.Int64ToTime(tmi)
|
||
tm := time.UnixMilli(tmi)
|
||
if tm.Unix() > 10*time.Now().Unix() {
|
||
continue
|
||
}
|
||
// 下面这几种目标维度的,不生成softCandle
|
||
if cs.Seg == "1m" {
|
||
continue
|
||
}
|
||
|
||
otm, err := cr.PeriodToLastTime(cs.Seg, tm)
|
||
logrus.Warn("MakeSoftCandles cs.Seg: ", cs.Seg, ", otm:", otm)
|
||
|
||
if err != nil {
|
||
logrus.Warning("MakeSoftCandles err: ", err)
|
||
}
|
||
otmi := otm.UnixMilli()
|
||
cd1 := core.Candle{
|
||
InstID: cd0.InstID, // string `json:"instId", string`
|
||
Period: cs.Seg, // `json:"period", string`
|
||
Data: cd0.Data, // `json:"data"`
|
||
From: "soft|" + os.Getenv("HOSTNAME"), // string `json:"from"`
|
||
Timestamp: ts,
|
||
}
|
||
|
||
// fmt.Println("makeSoftCandles for: ", cd1)
|
||
// cd0是从tickerInfo创建的1m Candle克隆来的, Data里只有Data[4]被赋值,是last,其他都是"-1"
|
||
// TODO 填充其余几个未赋值的字段,除了成交量和成交美元数以外,并存入redis待用
|
||
// strconv.FormatInt(otmi, 10)
|
||
mcd := MyCandle{
|
||
Candle: cd0,
|
||
}
|
||
cd1.Data = mcd.GetSetCandleInfo(cr, cs.Seg, otmi)
|
||
tmi = ToInt64(cd1.Data[0])
|
||
tm = time.UnixMilli(tmi)
|
||
cd1.Timestamp = tm
|
||
cd1.Open = ToFloat64(cd1.Data[1])
|
||
cd1.High = ToFloat64(cd1.Data[2])
|
||
cd1.Low = ToFloat64(cd1.Data[3])
|
||
cd1.Close = ToFloat64(cd1.Data[4])
|
||
cd1.VolCcy = ToFloat64(cd1.Data[6])
|
||
|
||
// 生成软交易量和交易数对,用于代替last生成max
|
||
go func(k int) {
|
||
time.Sleep(time.Duration(10*k) * time.Millisecond)
|
||
cr.CandlesProcessChan <- &cd1
|
||
}(k)
|
||
}
|
||
}
|
||
|
||
func MaXsProcess(cr *core.Core) {
|
||
for {
|
||
mx := <-cr.MaXProcessChan
|
||
mx.LastUpdate = time.Now()
|
||
logrus.Debug("mx: ", mx)
|
||
go func(maX *core.MaX) {
|
||
mmx := MyMaX{
|
||
MaX: *mx,
|
||
}
|
||
mmx.Process(cr)
|
||
}(mx)
|
||
}
|
||
}
|
||
func RsisProcess(cr *core.Core) {
|
||
for {
|
||
rsi := <-cr.RsiProcessChan
|
||
// logrus.Debug("mx: ", mx)
|
||
logrus.Debug("rsi recieved:", rsi)
|
||
go func(rsi *core.Rsi) {
|
||
mrs := MyRsi{
|
||
Rsi: *rsi,
|
||
}
|
||
mrs.Process(cr)
|
||
}(rsi)
|
||
}
|
||
}
|
||
|
||
func StockRsisProcess(cr *core.Core) {
|
||
for {
|
||
srsi := <-cr.StockRsiProcessChan
|
||
// logrus.Debug("mx: ", mx)
|
||
logrus.Debug("stockrsi recieved:", srsi)
|
||
go func(srsi *core.StockRsi) {
|
||
mrs := MyStockRsi{
|
||
StockRsi: *srsi,
|
||
}
|
||
mrs.Process(cr)
|
||
}(srsi)
|
||
}
|
||
}
|
||
func TickerInfoProcess(cr *core.Core) {
|
||
for {
|
||
ti := <-cr.TickerInforocessChan
|
||
logrus.Debug("ti: ", ti)
|
||
go func(ti *core.TickerInfo) {
|
||
mti := MyTickerInfo{
|
||
TickerInfo: *ti,
|
||
}
|
||
mti.Process(cr)
|
||
}(ti)
|
||
}
|
||
}
|
||
|
||
// 计算 RSI 的函数
|
||
|
||
// CalculateRSI calculates the RSI value for a given period and price data.
|
||
// prices: input price data, must be equal to the period length.
|
||
|
||
// CalculateRSI calculates the Relative Strength Index (RSI) for a given period.
|
||
func CalculateRSI(prices []float64, period int) ([]float64, error) {
|
||
if len(prices) < period {
|
||
return nil, errors.New("not enough data to calculate RSI")
|
||
}
|
||
|
||
rsi := make([]float64, len(prices)-period+1)
|
||
var avgGain, avgLoss float64
|
||
|
||
// Initial average gain and loss
|
||
for i := 1; i <= period; i++ {
|
||
change := prices[i] - prices[i-1]
|
||
if change > 0 {
|
||
avgGain += change
|
||
} else {
|
||
avgLoss -= change
|
||
}
|
||
}
|
||
avgGain /= float64(period)
|
||
avgLoss /= float64(period)
|
||
|
||
if avgLoss == 0 {
|
||
rsi[0] = 100
|
||
} else {
|
||
rs := avgGain / avgLoss
|
||
rsi[0] = 100 - (100 / (1 + rs))
|
||
}
|
||
|
||
// Calculate RSI for the rest of the data
|
||
for i := period; i < len(prices); i++ {
|
||
change := prices[i] - prices[i-1]
|
||
if change > 0 {
|
||
avgGain = (avgGain*(float64(period)-1) + change) / float64(period)
|
||
avgLoss = (avgLoss * (float64(period) - 1)) / float64(period)
|
||
} else {
|
||
avgGain = (avgGain * (float64(period) - 1)) / float64(period)
|
||
avgLoss = (avgLoss*(float64(period)-1) - change) / float64(period)
|
||
}
|
||
|
||
if avgLoss == 0 {
|
||
rsi[i-period+1] = 100
|
||
} else {
|
||
rs := avgGain / avgLoss
|
||
rsi[i-period+1] = 100 - (100 / (1 + rs))
|
||
}
|
||
}
|
||
|
||
return rsi, nil
|
||
}
|
||
|
||
// CalculateStochRSI calculates the Stochastic RSI.
|
||
func CalculateStochRSI(rsi []float64, period int, kSmoothing int, dSmoothing int) ([]float64, []float64, error) {
|
||
if len(rsi) < period {
|
||
return nil, nil, errors.New("not enough data to calculate StochRSI")
|
||
}
|
||
|
||
stochRsi := make([]float64, len(rsi)-period+1)
|
||
for i := period; i <= len(rsi); i++ {
|
||
lowest := rsi[i-period]
|
||
highest := rsi[i-period]
|
||
for j := i - period + 1; j < i; j++ {
|
||
if rsi[j] < lowest {
|
||
lowest = rsi[j]
|
||
}
|
||
if rsi[j] > highest {
|
||
highest = rsi[j]
|
||
}
|
||
}
|
||
|
||
if highest == lowest {
|
||
stochRsi[i-period] = 0
|
||
} else {
|
||
stochRsi[i-period] = (rsi[i-1] - lowest) / (highest - lowest)
|
||
}
|
||
}
|
||
|
||
// Smooth %K
|
||
percentK := smooth(stochRsi, kSmoothing)
|
||
|
||
// Smooth %D (signal line)
|
||
percentD := smooth(percentK, dSmoothing)
|
||
|
||
return percentK, percentD, nil
|
||
}
|
||
|
||
// Smooth applies a simple moving average to smooth the data.
|
||
func smooth(data []float64, period int) []float64 {
|
||
if period <= 1 || len(data) < period {
|
||
return data
|
||
}
|
||
|
||
smoothed := make([]float64, len(data)-period+1)
|
||
for i := period - 1; i < len(data); i++ {
|
||
sum := 0.0
|
||
for j := i - period + 1; j <= i; j++ {
|
||
sum += data[j]
|
||
}
|
||
smoothed[i-period+1] = sum / float64(period)
|
||
}
|
||
|
||
return smoothed
|
||
}
|