This commit is contained in:
zhangkun9038@dingtalk.com 2025-02-20 15:05:16 +08:00
parent 7ce7deba92
commit 6cb2bcb49b
5 changed files with 382 additions and 380 deletions

View File

@ -7,6 +7,8 @@ import (
logrus "github.com/sirupsen/logrus"
"github.com/phyer/core/internal/core"
"github.com/phyer/core/internal/models"
"github.com/phyer/core/internal/utils"
"strconv"
"time"
)
@ -38,14 +40,14 @@ type WillMX struct {
}
func (mx MaX) SetToKey(cr *core.Core) ([]interface{}, error) {
// fmt.Println(utils.GetFuncName(), " step1 ", mx.InstID, " ", mx.Period)
// fmt.Println(utils.utils.GetFuncName(), " step1 ", mx.InstID, " ", mx.Period)
// mx.Timestamp, _ = Int64ToTime(mx.Ts)
cstr := strconv.Itoa(mx.Count)
tss := strconv.FormatInt(mx.Ts, 10)
//校验时间戳是否合法
ntm, err := cr.PeriodToLastTime(mx.Period, time.UnixMilli(mx.Ts))
if ntm.UnixMilli() != mx.Ts {
logrus.Warn(fmt.Sprint(GetFuncName(), " candles时间戳有问题 ", " 应该: ", ntm, "实际:", mx.Ts))
logrus.Warn(fmt.Sprint(utils.GetFuncName(), " candles时间戳有问题 ", " 应该: ", ntm, "实际:", mx.Ts))
mx.Ts = ntm.UnixMilli()
}
keyName := "ma" + cstr + "|candle" + mx.Period + "|" + mx.InstID + "|ts:" + tss
@ -60,7 +62,7 @@ func (mx MaX) SetToKey(cr *core.Core) ([]interface{}, error) {
logrus.Error("max SetToKey err: ", err)
return mx.Data, err
}
// fmt.Println(utils.GetFuncName(), " step2 ", mx.InstID, " ", mx.Period)
// fmt.Println(utils.utils.GetFuncName(), " step2 ", mx.InstID, " ", mx.Period)
// tm := time.UnixMilli(mx.Ts).Format("01-02 15:04")
cli := cr.RedisLocalCli
if len(string(dj)) == 0 {
@ -68,13 +70,13 @@ func (mx MaX) SetToKey(cr *core.Core) ([]interface{}, error) {
err := errors.New("data is block")
return mx.Data, err
}
// fmt.Println(utils.GetFuncName(), " step3 ", mx.InstID, " ", mx.Period)
// fmt.Println(utils.utils.GetFuncName(), " step3 ", mx.InstID, " ", mx.Period)
_, err = cli.Set(keyName, dj, extt).Result()
if err != nil {
logrus.Error(GetFuncName(), " maXSetToKey err:", err)
logrus.Error(utils.GetFuncName(), " maXSetToKey err:", err)
return mx.Data, err
}
// fmt.Println(utils.GetFuncName(), " step4 ", mx.InstID, " ", mx.Period)
// fmt.Println(utils.utils.GetFuncName(), " step4 ", mx.InstID, " ", mx.Period)
// fmt.Println("max setToKey: ", keyName, "res:", res, "data:", string(dj), "from: ", mx.From)
cr.SaveUniKey(mx.Period, keyName, extt, mx.Ts)
return mx.Data, err
@ -94,7 +96,7 @@ func Int64ToTime(ts int64) (time.Time, error) {
t = t.In(loc)
return t, nil
}
func (mx *MaX) PushToWriteLogChan(cr *Core) error {
func (mx *MaX) PushToWriteLogChan(cr *core.Core) error {
s := strconv.FormatFloat(float64(mx.Ts), 'f', 0, 64)
did := "ma" + ToString(mx.Count) + "|" + mx.InstID + "|" + mx.Period + "|" + s
logrus.Debug("did of max:", did)
@ -113,7 +115,7 @@ func (mx *MaX) PushToWriteLogChan(cr *Core) error {
// TODO
// 返回:
// Sample被顶出队列的元素
func (mxl *MaXList) RPush(sm *MaX) (Sample, error) {
func (mxl *MaXList) RPush(sm *MaX) (models.Sample, error) {
last := MaX{}
bj, _ := json.Marshal(*sm)
json.Unmarshal(bj, &sm)
@ -171,7 +173,7 @@ func (mxl *MaXList) RecursiveBubbleS(length int, ctype string) error {
}
// TODO pixel
func (mxl *MaXList) MakePixelList(cr *Core, mx *MaX, score float64) (*PixelList, error) {
func (mxl *MaXList) MakePixelList(cr *core.Core, mx *MaX, score float64) (*core.PixelList, error) {
if len(mx.Data) == 2 {
err := errors.New("ma30 原始数据不足30条")
return nil, err
@ -180,14 +182,14 @@ func (mxl *MaXList) MakePixelList(cr *Core, mx *MaX, score float64) (*PixelList,
err := errors.New("ma30 原始数据不足30条")
return nil, err
}
pxl := PixelList{
pxl := core.PixelList{
Count: mxl.Count,
UpdateNickName: mxl.UpdateNickName,
LastUpdateTime: mxl.LastUpdateTime,
List: []*Pixel{},
List: []*core.Pixel{},
}
for i := 0; i < mxl.Count; i++ {
pix := Pixel{}
pix := core.Pixel{}
pxl.List = append(pxl.List, &pix)
}
ma30Val := (mx.Data[1]).(float64)

View File

@ -14,30 +14,33 @@ import (
"time"
"github.com/go-redis/redis"
"github.com/phyer/texus/private"
// "github.com/phyer/texus/private"
"github.com/phyer/core/analysis"
"github.com/phyer/core/config"
"github.com/phyer/core/models"
"github.com/phyer/v5sdkgo/rest"
logrus "github.com/sirupsen/logrus"
)
type Core struct {
Env string
Cfg *MyConfig
Cfg *config.MyConfig
RedisLocalCli *redis.Client
RedisRemoteCli *redis.Client
FluentBitUrl string
PlateMap map[string]*Plate
TrayMap map[string]*Tray
PlateMap map[string]*models.Plate
TrayMap map[string]*models.Tray
CoasterMd5SyncMap sync.Map
Mu *sync.Mutex
Mu1 *sync.Mutex
Waity *sync.WaitGroup
CandlesProcessChan chan *Candle
CandlesProcessChan chan *models.Candle
MaXProcessChan chan *MaX
RsiProcessChan chan *Rsi
StockRsiProcessChan chan *StockRsi
TickerInforocessChan chan *TickerInfo
CoasterChan chan *CoasterInfo
SeriesChan chan *SeriesInfo // to be init
analysis.SeriesChan chan *analysis.SeriesInfo // to be init
SegmentItemChan chan *SegmentItem // to be init
MakeMaXsChan chan *Candle
ShearForceGrpChan chan *ShearForceGrp // to be init
@ -48,6 +51,77 @@ type Core struct {
WriteLogChan chan *WriteLog
}
func (cre *coreCore) GetCandlesWithRest(instId string, kidx int, dura time.Duration, maxCandles int) error {
ary := []string{}
wsary := cre.Cfg.CandleDimentions
for k, v := range wsary {
matched := false
// 这个算法的目的是越靠后的candles维度被命中的概率越低第一个百分之百命中后面开始越来越低, 每分钟都会发生这样的计算,
// 因为维度多了的话,照顾不过来
rand.New(rand.NewSource(time.Now().UnixNano()))
rand.Seed(time.Now().UnixNano())
n := (k*2 + 2) * 3
if n < 1 {
n = 1
}
b := rand.Intn(n)
if b < 8 {
matched = true
}
if matched {
ary = append(ary, v)
}
}
mdura := dura/(time.Duration(len(ary)+1)) - 50*time.Millisecond
// fmt.Println("loop4 Ticker Start instId, dura: ", instId, dura, dura/10, mdura, len(ary), " idx: ", kidx)
// time.Duration(len(ary)+1)
ticker := time.NewTicker(mdura)
done := make(chan bool)
idx := 0
go func(i int) {
for {
select {
case <-ticker.C:
if i >= (len(ary)) {
done <- true
break
}
rand.Seed(time.Now().UnixNano())
b := rand.Intn(2)
maxCandles = maxCandles * (i + b) * 2
if maxCandles < 3 {
maxCandles = 3
}
if maxCandles > 30 {
maxCandles = 30
}
mx := strconv.Itoa(maxCandles)
// fmt.Println("loop4 getCandlesWithRest, instId, period,limit,dura, t: ", instId, ary[i], mx, mdura)
go func(ii int) {
restQ := RestQueue{
InstId: instId,
Bar: ary[ii],
Limit: mx,
Duration: mdura,
WithWs: true,
}
js, _ := json.Marshal(restQ)
coreRedisLocalCli.LPush("restQueue", js)
}(i)
i++
}
}
}(idx)
time.Sleep(dura - 10*time.Millisecond)
ticker.Stop()
// fmt.Println("loop4 Ticker stopped instId, dura: ", instId, dura, mdura)
done <- true
return nil
}
type RestQueue struct {
InstId string
Bar string
@ -124,22 +198,22 @@ func WriteLogProcess(cr *Core) {
// }
func (core *Core) Init() {
core.Env = os.Getenv("GO_ENV")
coreEnv = os.Getenv("GO_ENV")
gitBranch := os.Getenv("gitBranchName")
commitID := os.Getenv("gitCommitID")
logrus.Info("当前环境: ", core.Env)
logrus.Info("当前环境: ", coreEnv)
logrus.Info("gitBranch: ", gitBranch)
logrus.Info("gitCommitID: ", commitID)
cfg := MyConfig{}
cfg, _ = cfg.Init()
core.Cfg = &cfg
cli, err := core.GetRedisLocalCli()
core.RedisLocalCli = cli
core.RestQueueChan = make(chan *RestQueue)
core.WriteLogChan = make(chan *WriteLog)
coreCfg = &cfg
cli, err := coreGetRedisLocalCli()
coreRedisLocalCli = cli
coreRestQueueChan = make(chan *RestQueue)
coreWriteLogChan = make(chan *WriteLog)
// 跟订单有关的都关掉
// core.OrderChan = make(chan *private.Order)
// coreOrderChan = make(chan *private.Order)
if err != nil {
logrus.Error("init redis client err: ", err)
}
@ -162,9 +236,9 @@ func (core *Core) GetRedisCliFromConf(conf RedisConfig) (*redis.Client, error) {
}
func (core *Core) GetRemoteRedisLocalCli() (*redis.Client, error) {
ru := core.Cfg.RedisConf.Url
rp := core.Cfg.RedisConf.Password
ri := core.Cfg.RedisConf.Index
ru := coreCfg.RedisConf.Url
rp := coreCfg.RedisConf.Password
ri := coreCfg.RedisConf.Index
re := os.Getenv("REDIS_URL")
if len(re) > 0 {
ru = re
@ -184,9 +258,9 @@ func (core *Core) GetRemoteRedisLocalCli() (*redis.Client, error) {
return client, nil
}
func (core *Core) GetRedisLocalCli() (*redis.Client, error) {
ru := core.Cfg.RedisConf.Url
rp := core.Cfg.RedisConf.Password
ri := core.Cfg.RedisConf.Index
ru := coreCfg.RedisConf.Url
rp := coreCfg.RedisConf.Password
ri := coreCfg.RedisConf.Index
re := os.Getenv("REDIS_URL")
if len(re) > 0 {
ru = re
@ -209,7 +283,7 @@ func (core *Core) GetRedisLocalCli() (*redis.Client, error) {
// 这些应该是放到 texus 里实现的
func (core *Core) GetAllTickerInfo() (*rest.RESTAPIResult, error) {
// GET / 获取所有产品行情信息
rsp, err := core.RestInvoke("/api/v5/market/tickers?instType=SPOT", rest.GET)
rsp, err := coreRestInvoke("/api/v5/market/tickers?instType=SPOT", rest.GET)
return rsp, err
}
@ -217,14 +291,14 @@ func (core *Core) GetAllTickerInfo() (*rest.RESTAPIResult, error) {
//
// func (core *Core) GetBalances() (*rest.RESTAPIResult, error) {
// // TODO 临时用了两个实现restInvoke复用原来的会有bug不知道是谁的bug
// rsp, err := core.RestInvoke2("/api/v5/account/balance", rest.GET, nil)
// rsp, err := coreRestInvoke2("/api/v5/account/balance", rest.GET, nil)
// return rsp, err
// }
//
// func (core *Core) GetLivingOrderList() ([]*private.Order, error) {
// // TODO 临时用了两个实现restInvoke复用原来的会有bug不知道是谁的bug
// params := make(map[string]interface{})
// data, err := core.RestInvoke2("/api/v5/trade/orders-pending", rest.GET, &params)
// data, err := coreRestInvoke2("/api/v5/trade/orders-pending", rest.GET, &params)
// odrsp := private.OrderResp{}
// err = json.Unmarshal([]byte(data.Body), &odrsp)
// str, _ := json.Marshal(odrsp)
@ -239,7 +313,7 @@ func (core *Core) GetAllTickerInfo() (*rest.RESTAPIResult, error) {
// time.Sleep(3 * time.Second)
// ctype := ws.SPOT
//
// redisCli := core.RedisLocalCli
// redisCli := coreRedisLocalCli
// counts, err := redisCli.HLen("instruments|" + ctype + "|hash").Result()
// if err != nil {
// fmt.Println("err of hset to redis:", err)
@ -254,7 +328,7 @@ func (core *Core) GetAllTickerInfo() (*rest.RESTAPIResult, error) {
// func (core *Core) SubscribeTicker(op string) error {
// mp := make(map[string]string)
//
// redisCli := core.RedisLocalCli
// redisCli := coreRedisLocalCli
// ctype := ws.SPOT
// mp, err := redisCli.HGetAll("instruments|" + ctype + "|hash").Result()
// b, err := json.Marshal(mp)
@ -273,7 +347,7 @@ func (core *Core) GetAllTickerInfo() (*rest.RESTAPIResult, error) {
// time.Sleep(5 * time.Second)
// go func(instId string, op string) {
//
// redisCli := core.RedisLocalCli
// redisCli := coreRedisLocalCli
// _, err = redisCli.SAdd("tickers|"+op+"|set", instId).Result()
// if err != nil {
// fmt.Println("err of unMarshalJson5:", js)
@ -286,7 +360,7 @@ func (core *Core) GetAllTickerInfo() (*rest.RESTAPIResult, error) {
// 通过接口获取一个币种名下的某个时间范围内的Candle对象集合
// 按说这个应该放到 texus里实现
func (core *Core) v5PublicInvoke(subUrl string) (*CandleData, error) {
restUrl, _ := core.Cfg.Config.Get("connect").Get("restBaseUrl").String()
restUrl, _ := coreCfg.Config.Get("connect").Get("restBaseUrl").String()
url := restUrl + subUrl
resp, err := http.Get(url)
if err != nil {
@ -303,14 +377,14 @@ func (core *Core) v5PublicInvoke(subUrl string) (*CandleData, error) {
}
func (core *Core) RestInvoke(subUrl string, method string) (*rest.RESTAPIResult, error) {
restUrl, _ := core.Cfg.Config.Get("connect").Get("restBaseUrl").String()
restUrl, _ := coreCfg.Config.Get("connect").Get("restBaseUrl").String()
//ep, method, uri string, param *map[string]interface{}
rest := rest.NewRESTAPI(restUrl, method, subUrl, nil)
key, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String()
secure, _ := core.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String()
pass, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String()
key, _ := coreCfg.Config.Get("credentialReadOnly").Get("okAccessKey").String()
secure, _ := coreCfg.Config.Get("credentialReadOnly").Get("secretKey").String()
pass, _ := coreCfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String()
isDemo := false
if core.Env == "demoEnv" {
if coreEnv == "demoEnv" {
isDemo = true
}
rest.SetSimulate(isDemo).SetAPIKey(key, secure, pass)
@ -320,13 +394,187 @@ func (core *Core) RestInvoke(subUrl string, method string) (*rest.RESTAPIResult,
}
return response, err
}
// 保证同一个 period, keyName 在一个周期里SaveToSortSet只会被执行一次
func (core *core.Core) SaveUniKey(period string, keyName string, extt time.Duration, tsi int64) {
refName := keyName + "|refer"
// refRes, _ := core.RedisLocalCli.GetSet(refName, 1).Result()
core.RedisLocalCli.Expire(refName, extt)
// 为保证唯一性机制防止SaveToSortSet 被重复执行, ps: 不需要唯一此操作幂等在redis里
// founded, _ := core.findInSortSet(period, keyName, extt, tsi)
// if len(refRes) != 0 {
// logrus.Error("refName exist: ", refName)
// return
// }
core.SaveToSortSet(period, keyName, extt, tsi)
}
func (core *core.Core) findInSortSet(period string, keyName string, extt time.Duration, tsi int64) (bool, error) {
founded := false
ary := strings.Split(keyName, "ts:")
setName := ary[0] + "sortedSet"
opt := redis.ZRangeBy{
Min: ToString(tsi),
Max: ToString(tsi),
}
rs, err := core.RedisLocalCli.ZRangeByScore(setName, opt).Result()
if len(rs) > 0 {
founded = true
}
if err != nil {
logrus.Error("err of ma7|ma30 add to redis:", err)
} else {
logrus.Info("sortedSet added to redis:", rs, keyName)
}
return founded, nil
}
// tsi: 上报时间timeStamp millinSecond
func (core *core.Core) SaveToSortSet(period string, keyName string, extt time.Duration, tsi int64) {
ary := strings.Split(keyName, "ts:")
setName := ary[0] + "sortedSet"
z := redis.Z{
Score: float64(tsi),
Member: keyName,
}
rs, err := core.RedisLocalCli.ZAdd(setName, z).Result()
if err != nil {
logrus.Warn("err of ma7|ma30 add to redis:", err)
} else {
logrus.Warn("sortedSet added to redis:", rs, keyName)
}
}
// 根据周期的文本内容,返回这代表多少个分钟
func (cr *core.Core) PeriodToMinutes(period string) (int64, error) {
ary := strings.Split(period, "")
beiStr := "1"
danwei := ""
if len(ary) == 0 {
err := errors.New(utils.GetFuncName() + " period is block")
return 0, err
}
if len(ary) == 3 {
beiStr = ary[0] + ary[1]
danwei = ary[2]
} else {
beiStr = ary[0]
danwei = ary[1]
}
cheng := 1
bei, _ := strconv.Atoi(beiStr)
switch danwei {
case "m":
{
cheng = bei
break
}
case "H":
{
cheng = bei * 60
break
}
case "D":
{
cheng = bei * 60 * 24
break
}
case "W":
{
cheng = bei * 60 * 24 * 7
break
}
case "M":
{
cheng = bei * 60 * 24 * 30
break
}
case "Y":
{
cheng = bei * 60 * 24 * 365
break
}
default:
{
logrus.Warning("notmatch:", danwei, period)
panic("notmatch:" + period)
}
}
return int64(cheng), nil
}
// type ScanCmd struct {
// baseCmd
//
// page []string
// cursor uint64
//
// process func(cmd Cmder) error
// }
func (core *core.Core) GetRangeKeyList(pattern string, from time.Time) ([]*simple.Json, error) {
// 比如用来计算ma30或ma7倒推多少时间范围
redisCli := core.RedisLocalCli
cursor := uint64(0)
n := 0
allTs := []int64{}
var keys []string
for {
var err error
keys, cursor, _ = redisCli.Scan(cursor, pattern+"*", 2000).Result()
if err != nil {
panic(err)
}
n += len(keys)
if n == 0 {
break
}
}
// keys, _ := redisCli.Keys(pattern + "*").Result()
for _, key := range keys {
keyAry := strings.Split(key, ":")
key = keyAry[1]
keyi64, _ := strconv.ParseInt(key, 10, 64)
allTs = append(allTs, keyi64)
}
nary := utils.RecursiveBubble(allTs, len(allTs))
tt := from.UnixMilli()
ff := tt - tt%60000
fi := int64(ff)
mary := []int64{}
for _, v := range nary {
if v < fi {
break
}
mary = append(mary, v)
}
res := []*simple.Json{}
for _, v := range mary {
// if k > 1 {
// break
// }
nv := pattern + strconv.FormatInt(v, 10)
str, err := redisCli.Get(nv).Result()
if err != nil {
logrus.Error("err of redis get key:", nv, err)
}
cur, err := simple.NewJson([]byte(str))
if err != nil {
logrus.Error("err of create newJson:", str, err)
}
res = append(res, cur)
}
return res, nil
}
// func (core *Core) RestInvoke2(subUrl string, method string, param *map[string]interface{}) (*rest.RESTAPIResult, error) {
// key, err1 := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String()
// secret, err2 := core.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String()
// pass, err3 := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String()
// userId, err4 := core.Cfg.Config.Get("connect").Get("userId").String()
// restUrl, err5 := core.Cfg.Config.Get("connect").Get("restBaseUrl").String()
// key, err1 := coreCfg.Config.Get("credentialReadOnly").Get("okAccessKey").String()
// secret, err2 := coreCfg.Config.Get("credentialReadOnly").Get("secretKey").String()
// pass, err3 := coreCfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String()
// userId, err4 := coreCfg.Config.Get("connect").Get("userId").String()
// restUrl, err5 := coreCfg.Config.Get("connect").Get("restBaseUrl").String()
// if err1 != nil || err2 != nil || err3 != nil || err4 != nil || err5 != nil {
// fmt.Println(err1, err2, err3, err4, err5)
// } else {
@ -338,7 +586,7 @@ func (core *Core) RestInvoke(subUrl string, method string) (*rest.RESTAPIResult,
// // }
// // rs := rest.NewRESTAPI(restUrl, method, subUrl, &reqParam)
// isDemo := false
// if core.Env == "demoEnv" {
// if coreEnv == "demoEnv" {
// isDemo = true
// }
// // rs.SetSimulate(isDemo).SetAPIKey(key, secret, pass).SetUserId(userId)
@ -362,11 +610,11 @@ func (core *Core) RestInvoke(subUrl string, method string) (*rest.RESTAPIResult,
// 跟下单有关的都关掉,以后再说
// func (core *Core) RestPost(subUrl string, param *map[string]interface{}) (*rest.RESTAPIResult, error) {
// key, err1 := core.Cfg.Config.Get("credentialMutable").Get("okAccessKey").String()
// secret, err2 := core.Cfg.Config.Get("credentialMutable").Get("secretKey").String()
// pass, err3 := core.Cfg.Config.Get("credentialMutable").Get("okAccessPassphrase").String()
// userId, err4 := core.Cfg.Config.Get("connect").Get("userId").String()
// restUrl, err5 := core.Cfg.Config.Get("connect").Get("restBaseUrl").String()
// key, err1 := coreCfg.Config.Get("credentialMutable").Get("okAccessKey").String()
// secret, err2 := coreCfg.Config.Get("credentialMutable").Get("secretKey").String()
// pass, err3 := coreCfg.Config.Get("credentialMutable").Get("okAccessPassphrase").String()
// userId, err4 := coreCfg.Config.Get("connect").Get("userId").String()
// restUrl, err5 := coreCfg.Config.Get("connect").Get("restBaseUrl").String()
// if err1 != nil || err2 != nil || err3 != nil || err4 != nil || err5 != nil {
// fmt.Println(err1, err2, err3, err4, err5)
// } else {
@ -379,7 +627,7 @@ func (core *Core) RestInvoke(subUrl string, method string) (*rest.RESTAPIResult,
// PassPhrase: pass,
// }
// isDemo := false
// if core.Env == "demoEnv" {
// if coreEnv == "demoEnv" {
// isDemo = true
// }
// cli := rest.NewRESTClient(restUrl, &apikey, isDemo)
@ -392,7 +640,7 @@ func (core *Core) RestInvoke(subUrl string, method string) (*rest.RESTAPIResult,
// 我当前持有的币,每分钟刷新
func (core *Core) GetMyFavorList() []string {
redisCli := core.RedisLocalCli
redisCli := coreRedisLocalCli
opt := redis.ZRangeBy{
Min: "10",
Max: "100000000000",
@ -410,8 +658,8 @@ func (core *Core) GetMyFavorList() []string {
// 改了,不需要交易排行榜,我手动指定一个排行即可, tickersVol|sortedSet 改成 tickersList|sortedSet
func (core *Core) GetScoreList(count int) []string {
// redisCli := core.RedisLocalCli
myFocusList := core.Cfg.Config.Get("focusList").MustArray()
// redisCli := coreRedisLocalCli
myFocusList := coreCfg.Config.Get("focusList").MustArray()
logrus.Debug("curList: ", myFocusList)
lst := []string{}
for _, v := range myFocusList {
@ -622,7 +870,7 @@ func (core *Core) GetRangeMaXSortedSet(setName string, count int, from time.Time
ary2 = strings.Split(ary1[1], "candle")
period = ary2[1]
dui, err := core.PeriodToMinutes(period)
dui, err := corePeriodToMinutes(period)
if err != nil {
return mxl, err
}
@ -637,7 +885,7 @@ func (core *Core) GetRangeMaXSortedSet(setName string, count int, from time.Time
}
ary := []string{}
logrus.Debug("ZRevRangeByScore ", " setName:", setName, " froms:", froms, " sts:", sts)
dura, err := core.GetExpiration(period)
dura, err := coreGetExpiration(period)
if err != nil {
return mxl, err
}
@ -645,7 +893,7 @@ func (core *Core) GetRangeMaXSortedSet(setName string, count int, from time.Time
ot := time.Now().Add(dura * -1)
oti := ot.UnixMilli()
// fmt.Println(fmt.Sprint("GetExpiration zRemRangeByScore ", setName, " ", 0, " ", strconv.FormatInt(oti, 10)))
cli := core.RedisLocalCli
cli := coreRedisLocalCli
cli.LTrim(setName, 0, oti)
cunt, _ := cli.ZRemRangeByScore(setName, "0", strconv.FormatInt(oti, 10)).Result()
if cunt > 0 {
@ -763,7 +1011,7 @@ func (core *Core) GetRangeCandleSortedSet(setName string, count int, from time.T
period := strings.TrimPrefix(ary[0], "candle")
// 获取period对应的分钟数
durationMinutes, err := core.PeriodToMinutes(period)
durationMinutes, err := corePeriodToMinutes(period)
if err != nil {
return nil, fmt.Errorf("failed to get period minutes: %w", err)
}
@ -779,12 +1027,12 @@ func (core *Core) GetRangeCandleSortedSet(setName string, count int, from time.T
startTs := fromTs - durationMinutes*int64(count)*60*1000
// 清理过期数据
if err := core.cleanExpiredData(setName, period); err != nil {
if err := corecleanExpiredData(setName, period); err != nil {
logrus.Warnf("Failed to clean expired data: %v", err)
}
// 从Redis获取数据
cli := core.RedisLocalCli
cli := coreRedisLocalCli
opt := redis.ZRangeBy{
Min: strconv.FormatInt(startTs, 10),
Max: strconv.FormatInt(fromTs, 10),
@ -836,12 +1084,12 @@ func (core *Core) GetRangeCandleSortedSet(setName string, count int, from time.T
// cleanExpiredData 清理过期的数据
func (core *Core) cleanExpiredData(setName, period string) error {
expiration, err := core.GetExpiration(period)
expiration, err := coreGetExpiration(period)
if err != nil {
return err
}
cli := core.RedisLocalCli
cli := coreRedisLocalCli
expirationTime := time.Now().Add(-expiration)
expirationTs := strconv.FormatInt(expirationTime.UnixMilli(), 10)
@ -868,40 +1116,40 @@ func (cr *Core) GetCoasterFromPlate(instID string, period string) (Coaster, erro
return co, nil
}
func (core *Core) GetPixelSeries(instId string, period string) (Series, error) {
srs := Series{}
func (core *Core) GetPixelanalysis.Series(instId string, period string) (analysis.Series, error) {
srs := analysis.Series{}
srName := instId + "|" + period + "|series"
cli := core.RedisLocalCli
cli := coreRedisLocalCli
srsStr, err := cli.Get(srName).Result()
if err != nil {
return *new(Series), err
return *new(analysis.Series), err
}
err = json.Unmarshal([]byte(srsStr), &srs)
if err != nil {
return *new(Series), err
return *new(analysis.Series), err
}
logrus.Info("sei:", srsStr)
err = srs.CandleSeries.RecursiveBubbleS(srs.CandleSeries.Count, "asc")
err = srs.Candleanalysis.Series.RecursiveBubbleS(srs.Candleanalysis.Series.Count, "asc")
if err != nil {
return *new(Series), err
return *new(analysis.Series), err
}
// err = srs.CandleSeries.RecursiveBubbleX(srs.CandleSeries.Count, "asc")
// err = srs.Candleanalysis.Series.RecursiveBubbleX(srs.Candleanalysis.Series.Count, "asc")
// if err != nil {
// return nil, err
// }
err = srs.Ma7Series.RecursiveBubbleS(srs.CandleSeries.Count, "asc")
err = srs.Ma7analysis.Series.RecursiveBubbleS(srs.Candleanalysis.Series.Count, "asc")
if err != nil {
return *new(Series), err
return *new(analysis.Series), err
}
// err = srs.Ma7Series.RecursiveBubbleX(srs.CandleSeries.Count, "asc")
// err = srs.Ma7analysis.Series.RecursiveBubbleX(srs.Candleanalysis.Series.Count, "asc")
// if err != nil {
// return nil, err
// }
err = srs.Ma30Series.RecursiveBubbleS(srs.CandleSeries.Count, "asc")
err = srs.Ma30analysis.Series.RecursiveBubbleS(srs.Candleanalysis.Series.Count, "asc")
if err != nil {
return *new(Series), err
return *new(analysis.Series), err
}
// err = srs.Ma30Series.RecursiveBubbleX(srs.CandleSeries.Count, "asc")
// err = srs.Ma30analysis.Series.RecursiveBubbleX(srs.Candleanalysis.Series.Count, "asc")
// if err != nil {
// return nil, err
// }

View File

@ -2,11 +2,9 @@ package market
import (
"encoding/json"
"fmt"
"reflect"
"strconv"
"time"
"github.com/phyer/core/internal/core" // 新增
"github.com/phyer/core/internal/utils" // 新增
)
@ -33,52 +31,16 @@ func (tir *TickerInfoResp) Convert() TickerInfo {
Id: utils.HashString(tir.InstID + tir.Ts),
InstID: tir.InstID,
InstType: tir.InstType,
Last: ToFloat64(tir.Last),
VolCcy24h: ToFloat64(tir.VolCcy24h),
Ts: ToInt64(tir.Ts),
Last: utils.ToFloat64(tir.Last),
VolCcy24h: utils.ToFloat64(tir.VolCcy24h),
Ts: utils.ToInt64(tir.Ts),
LastUpdate: time.Now(),
}
return ti
}
func ToString(val interface{}) string {
valstr := ""
if reflect.TypeOf(val).Name() == "string" {
valstr = val.(string)
} else if reflect.TypeOf(val).Name() == "float64" {
valstr = fmt.Sprintf("%f", val)
} else if reflect.TypeOf(val).Name() == "int64" {
valstr = strconv.FormatInt(val.(int64), 16)
} else if reflect.TypeOf(val).Name() == "int" {
valstr = fmt.Sprintf("%d", val)
}
return valstr
}
func ToInt64(val interface{}) int64 {
vali := int64(0)
if reflect.TypeOf(val).Name() == "string" {
vali, _ = strconv.ParseInt(val.(string), 10, 64)
} else if reflect.TypeOf(val).Name() == "float64" {
vali = int64(val.(float64))
}
return vali
}
func ToFloat64(val interface{}) float64 {
valf := float64(0)
if reflect.TypeOf(val).Name() == "string" {
valf, _ = strconv.ParseFloat(val.(string), 64)
} else if reflect.TypeOf(val).Name() == "float64" {
valf = val.(float64)
} else if reflect.TypeOf(val).Name() == "int64" {
valf = float64(val.(int64))
}
return valf
}
// TODO 有待实现
func (ti *TickerInfo) SetToKey(cr *Core) error {
func (ti *TickerInfo) SetToKey(cr *core.Core) error {
js, _ := json.Marshal(*ti)
plateName := ti.InstID + "|tickerInfo"
_, err := cr.RedisLocalCli.Set(plateName, string(js), 0).Result()

View File

@ -1,8 +1,6 @@
package models
import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
// "reflect"
@ -78,77 +76,6 @@ func (mc *MatchCheck) SetMatched(value bool) {
mc.Matched = value
}
func (core *core.Core) GetCandlesWithRest(instId string, kidx int, dura time.Duration, maxCandles int) error {
ary := []string{}
wsary := core.Cfg.CandleDimentions
for k, v := range wsary {
matched := false
// 这个算法的目的是越靠后的candles维度被命中的概率越低第一个百分之百命中后面开始越来越低, 每分钟都会发生这样的计算,
// 因为维度多了的话,照顾不过来
rand.New(rand.NewSource(time.Now().UnixNano()))
rand.Seed(time.Now().UnixNano())
n := (k*2 + 2) * 3
if n < 1 {
n = 1
}
b := rand.Intn(n)
if b < 8 {
matched = true
}
if matched {
ary = append(ary, v)
}
}
mdura := dura/(time.Duration(len(ary)+1)) - 50*time.Millisecond
// fmt.Println("loop4 Ticker Start instId, dura: ", instId, dura, dura/10, mdura, len(ary), " idx: ", kidx)
// time.Duration(len(ary)+1)
ticker := time.NewTicker(mdura)
done := make(chan bool)
idx := 0
go func(i int) {
for {
select {
case <-ticker.C:
if i >= (len(ary)) {
done <- true
break
}
rand.Seed(time.Now().UnixNano())
b := rand.Intn(2)
maxCandles = maxCandles * (i + b) * 2
if maxCandles < 3 {
maxCandles = 3
}
if maxCandles > 30 {
maxCandles = 30
}
mx := strconv.Itoa(maxCandles)
// fmt.Println("loop4 getCandlesWithRest, instId, period,limit,dura, t: ", instId, ary[i], mx, mdura)
go func(ii int) {
restQ := RestQueue{
InstId: instId,
Bar: ary[ii],
Limit: mx,
Duration: mdura,
WithWs: true,
}
js, _ := json.Marshal(restQ)
core.RedisLocalCli.LPush("restQueue", js)
}(i)
i++
}
}
}(idx)
time.Sleep(dura - 10*time.Millisecond)
ticker.Stop()
// fmt.Println("loop4 Ticker stopped instId, dura: ", instId, dura, mdura)
done <- true
return nil
}
// 当前的时间毫秒数 对于某个时间段比如3分钟10分钟是否可以被整除
func IsModOf(curInt int64, duration time.Duration) bool {
vol := int64(0)
@ -284,15 +211,6 @@ func Daoxu(arr []interface{}) {
arr[length-1-i] = temp
}
}
func HashString(input string) string {
// 计算SHA-256哈希值
hash := sha256.Sum256([]byte(input))
// 转换为十六进制字符串
hashHex := hex.EncodeToString(hash[:])
// 返回前20位
return hashHex[:23]
}
func (cl *Candle) ToStruct(core *core.Core) (*Candle, error) {
// cl.Timestamp
// 将字符串转换为 int64 类型的时间戳
@ -340,180 +258,6 @@ func (cl *Candle) ToStruct(core *core.Core) (*Candle, error) {
return cl, nil
}
// 保证同一个 period, keyName 在一个周期里SaveToSortSet只会被执行一次
func (core *core.Core) SaveUniKey(period string, keyName string, extt time.Duration, tsi int64) {
refName := keyName + "|refer"
// refRes, _ := core.RedisLocalCli.GetSet(refName, 1).Result()
core.RedisLocalCli.Expire(refName, extt)
// 为保证唯一性机制防止SaveToSortSet 被重复执行, ps: 不需要唯一此操作幂等在redis里
// founded, _ := core.findInSortSet(period, keyName, extt, tsi)
// if len(refRes) != 0 {
// logrus.Error("refName exist: ", refName)
// return
// }
core.SaveToSortSet(period, keyName, extt, tsi)
}
func (core *core.Core) findInSortSet(period string, keyName string, extt time.Duration, tsi int64) (bool, error) {
founded := false
ary := strings.Split(keyName, "ts:")
setName := ary[0] + "sortedSet"
opt := redis.ZRangeBy{
Min: ToString(tsi),
Max: ToString(tsi),
}
rs, err := core.RedisLocalCli.ZRangeByScore(setName, opt).Result()
if len(rs) > 0 {
founded = true
}
if err != nil {
logrus.Error("err of ma7|ma30 add to redis:", err)
} else {
logrus.Info("sortedSet added to redis:", rs, keyName)
}
return founded, nil
}
// tsi: 上报时间timeStamp millinSecond
func (core *core.Core) SaveToSortSet(period string, keyName string, extt time.Duration, tsi int64) {
ary := strings.Split(keyName, "ts:")
setName := ary[0] + "sortedSet"
z := redis.Z{
Score: float64(tsi),
Member: keyName,
}
rs, err := core.RedisLocalCli.ZAdd(setName, z).Result()
if err != nil {
logrus.Warn("err of ma7|ma30 add to redis:", err)
} else {
logrus.Warn("sortedSet added to redis:", rs, keyName)
}
}
// 根据周期的文本内容,返回这代表多少个分钟
func (cr *core.Core) PeriodToMinutes(period string) (int64, error) {
ary := strings.Split(period, "")
beiStr := "1"
danwei := ""
if len(ary) == 0 {
err := errors.New(utils.GetFuncName() + " period is block")
return 0, err
}
if len(ary) == 3 {
beiStr = ary[0] + ary[1]
danwei = ary[2]
} else {
beiStr = ary[0]
danwei = ary[1]
}
cheng := 1
bei, _ := strconv.Atoi(beiStr)
switch danwei {
case "m":
{
cheng = bei
break
}
case "H":
{
cheng = bei * 60
break
}
case "D":
{
cheng = bei * 60 * 24
break
}
case "W":
{
cheng = bei * 60 * 24 * 7
break
}
case "M":
{
cheng = bei * 60 * 24 * 30
break
}
case "Y":
{
cheng = bei * 60 * 24 * 365
break
}
default:
{
logrus.Warning("notmatch:", danwei, period)
panic("notmatch:" + period)
}
}
return int64(cheng), nil
}
// type ScanCmd struct {
// baseCmd
//
// page []string
// cursor uint64
//
// process func(cmd Cmder) error
// }
func (core *core.Core) GetRangeKeyList(pattern string, from time.Time) ([]*simple.Json, error) {
// 比如用来计算ma30或ma7倒推多少时间范围
redisCli := core.RedisLocalCli
cursor := uint64(0)
n := 0
allTs := []int64{}
var keys []string
for {
var err error
keys, cursor, _ = redisCli.Scan(cursor, pattern+"*", 2000).Result()
if err != nil {
panic(err)
}
n += len(keys)
if n == 0 {
break
}
}
// keys, _ := redisCli.Keys(pattern + "*").Result()
for _, key := range keys {
keyAry := strings.Split(key, ":")
key = keyAry[1]
keyi64, _ := strconv.ParseInt(key, 10, 64)
allTs = append(allTs, keyi64)
}
nary := utils.RecursiveBubble(allTs, len(allTs))
tt := from.UnixMilli()
ff := tt - tt%60000
fi := int64(ff)
mary := []int64{}
for _, v := range nary {
if v < fi {
break
}
mary = append(mary, v)
}
res := []*simple.Json{}
for _, v := range mary {
// if k > 1 {
// break
// }
nv := pattern + strconv.FormatInt(v, 10)
str, err := redisCli.Get(nv).Result()
if err != nil {
logrus.Error("err of redis get key:", nv, err)
}
cur, err := simple.NewJson([]byte(str))
if err != nil {
logrus.Error("err of create newJson:", str, err)
}
res = append(res, cur)
}
return res, nil
}
func (cl *Candle) SetToKey(core *core.Core) ([]interface{}, error) {
data := cl.Data
tsi, err := strconv.ParseInt(data[0].(string), 10, 64)

View File

@ -2,6 +2,7 @@ package utils
import (
"crypto/md5"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
@ -9,6 +10,7 @@ import (
logrus "github.com/sirupsen/logrus"
"math"
"math/rand"
"reflect"
"runtime"
"strconv"
"time"
@ -207,3 +209,47 @@ func Md5V(str string) string {
h.Write([]byte(str))
return hex.EncodeToString(h.Sum(nil))
}
func ToString(val interface{}) string {
valstr := ""
if reflect.TypeOf(val).Name() == "string" {
valstr = val.(string)
} else if reflect.TypeOf(val).Name() == "float64" {
valstr = fmt.Sprintf("%f", val)
} else if reflect.TypeOf(val).Name() == "int64" {
valstr = strconv.FormatInt(val.(int64), 16)
} else if reflect.TypeOf(val).Name() == "int" {
valstr = fmt.Sprintf("%d", val)
}
return valstr
}
func ToInt64(val interface{}) int64 {
vali := int64(0)
if reflect.TypeOf(val).Name() == "string" {
vali, _ = strconv.ParseInt(val.(string), 10, 64)
} else if reflect.TypeOf(val).Name() == "float64" {
vali = int64(val.(float64))
}
return vali
}
func ToFloat64(val interface{}) float64 {
valf := float64(0)
if reflect.TypeOf(val).Name() == "string" {
valf, _ = strconv.ParseFloat(val.(string), 64)
} else if reflect.TypeOf(val).Name() == "float64" {
valf = val.(float64)
} else if reflect.TypeOf(val).Name() == "int64" {
valf = float64(val.(int64))
}
return valf
}
func HashString(input string) string {
// 计算SHA-256哈希值
hash := sha256.Sum256([]byte(input))
// 转换为十六进制字符串
hashHex := hex.EncodeToString(hash[:])
// 返回前20位
return hashHex[:23]
}