diff --git a/internal/models/candle.go b/candle.go similarity index 61% rename from internal/models/candle.go rename to candle.go index 5ddc108..77c7860 100644 --- a/internal/models/candle.go +++ b/candle.go @@ -1,6 +1,8 @@ -package models +package core import ( + "crypto/sha256" + "encoding/hex" "encoding/json" "errors" // "reflect" @@ -13,14 +15,13 @@ import ( simple "github.com/bitly/go-simplejson" "github.com/go-redis/redis" + "github.com/phyer/texus/utils" logrus "github.com/sirupsen/logrus" - - "github.com/phyer/core/internal/core" // 新增 ) type Candle struct { Id string `json:"_id"` - core *core.Core + core *Core InstID string `json:"instID"` Period string `json:"period"` Data []interface{} @@ -35,7 +36,7 @@ type Candle struct { Confirm bool `json:"confirm"` } type Sample interface { - SetToKey(cr *core.Core) ([]interface{}, error) + SetToKey(cr *Core) ([]interface{}, error) } type SampleList interface { @@ -61,7 +62,7 @@ type MatchCheck struct { Matched bool } -func (cd *Candle) Filter(cr *core.Core) bool { +func (cd *Candle) Filter(cr *Core) bool { myFocusList := cr.Cfg.Config.Get("focusList").MustArray() founded := false for _, v := range myFocusList { @@ -76,6 +77,77 @@ func (mc *MatchCheck) SetMatched(value bool) { mc.Matched = value } +func (core *Core) GetCandlesWithRest(instId string, kidx int, dura time.Duration, maxCandles int) error { + ary := []string{} + + wsary := core.Cfg.CandleDimentions + for k, v := range wsary { + matched := false + // 这个算法的目的是:越靠后的candles维度,被命中的概率越低,第一个百分之百命中,后面开始越来越低, 每分钟都会发生这样的计算, + // 因为维度多了的话,照顾不过来 + rand.New(rand.NewSource(time.Now().UnixNano())) + rand.Seed(time.Now().UnixNano()) + n := (k*2 + 2) * 3 + if n < 1 { + n = 1 + } + b := rand.Intn(n) + if b < 8 { + matched = true + } + if matched { + ary = append(ary, v) + } + } + + mdura := dura/(time.Duration(len(ary)+1)) - 50*time.Millisecond + // fmt.Println("loop4 Ticker Start instId, dura: ", instId, dura, dura/10, mdura, len(ary), " idx: ", kidx) + // time.Duration(len(ary)+1) + ticker := time.NewTicker(mdura) + done := make(chan bool) + idx := 0 + go func(i int) { + for { + select { + case <-ticker.C: + if i >= (len(ary)) { + done <- true + break + } + rand.Seed(time.Now().UnixNano()) + b := rand.Intn(2) + maxCandles = maxCandles * (i + b) * 2 + + if maxCandles < 3 { + maxCandles = 3 + } + if maxCandles > 30 { + maxCandles = 30 + } + mx := strconv.Itoa(maxCandles) + // fmt.Println("loop4 getCandlesWithRest, instId, period,limit,dura, t: ", instId, ary[i], mx, mdura) + go func(ii int) { + restQ := RestQueue{ + InstId: instId, + Bar: ary[ii], + Limit: mx, + Duration: mdura, + WithWs: true, + } + js, _ := json.Marshal(restQ) + core.RedisLocalCli.LPush("restQueue", js) + }(i) + i++ + } + } + }(idx) + time.Sleep(dura - 10*time.Millisecond) + ticker.Stop() + // fmt.Println("loop4 Ticker stopped instId, dura: ", instId, dura, mdura) + done <- true + return nil +} + // 当前的时间毫秒数 对于某个时间段,比如3分钟,10分钟,是否可以被整除, func IsModOf(curInt int64, duration time.Duration) bool { vol := int64(0) @@ -105,7 +177,7 @@ func IsModOf(curInt int64, duration time.Duration) bool { return false } -func (core *core.Core) SaveCandle(instId string, period string, rsp *CandleData, dura time.Duration, withWs bool) { +func (core *Core) SaveCandle(instId string, period string, rsp *CandleData, dura time.Duration, withWs bool) { leng := len(rsp.Data) // fmt.Println("saveCandle leng: ", leng, " instId: ", instId, " period: ", period) logrus.Info("saveCandles leng: ", leng, " instId: ", instId, " period: ", period, " length of rsp.Data: ", len(rsp.Data)) @@ -183,7 +255,7 @@ func (core *core.Core) SaveCandle(instId string, period string, rsp *CandleData, } } -func (candle *Candle) PushToWriteLogChan(cr *core.Core) error { +func (candle *Candle) PushToWriteLogChan(cr *Core) error { did := candle.InstID + candle.Period + candle.Data[0].(string) candle.Id = HashString(did) cl, _ := candle.ToStruct(cr) @@ -211,7 +283,16 @@ func Daoxu(arr []interface{}) { arr[length-1-i] = temp } } -func (cl *Candle) ToStruct(core *core.Core) (*Candle, error) { +func HashString(input string) string { + // 计算SHA-256哈希值 + hash := sha256.Sum256([]byte(input)) + // 转换为十六进制字符串 + hashHex := hex.EncodeToString(hash[:]) + // 返回前20位 + return hashHex[:23] +} + +func (cl *Candle) ToStruct(core *Core) (*Candle, error) { // cl.Timestamp // 将字符串转换为 int64 类型的时间戳 ts, err := strconv.ParseInt(cl.Data[0].(string), 10, 64) @@ -258,7 +339,181 @@ func (cl *Candle) ToStruct(core *core.Core) (*Candle, error) { return cl, nil } -func (cl *Candle) SetToKey(core *core.Core) ([]interface{}, error) { +// 保证同一个 period, keyName ,在一个周期里,SaveToSortSet只会被执行一次 +func (core *Core) SaveUniKey(period string, keyName string, extt time.Duration, tsi int64) { + + refName := keyName + "|refer" + // refRes, _ := core.RedisLocalCli.GetSet(refName, 1).Result() + core.RedisLocalCli.Expire(refName, extt) + // 为保证唯一性机制,防止SaveToSortSet 被重复执行, ps: 不需要唯一,此操作幂等在redis里 + // founded, _ := core.findInSortSet(period, keyName, extt, tsi) + // if len(refRes) != 0 { + // logrus.Error("refName exist: ", refName) + // return + // } + core.SaveToSortSet(period, keyName, extt, tsi) +} + +func (core *Core) findInSortSet(period string, keyName string, extt time.Duration, tsi int64) (bool, error) { + founded := false + ary := strings.Split(keyName, "ts:") + setName := ary[0] + "sortedSet" + opt := redis.ZRangeBy{ + Min: ToString(tsi), + Max: ToString(tsi), + } + rs, err := core.RedisLocalCli.ZRangeByScore(setName, opt).Result() + if len(rs) > 0 { + founded = true + } + if err != nil { + logrus.Error("err of ma7|ma30 add to redis:", err) + } else { + logrus.Info("sortedSet added to redis:", rs, keyName) + } + return founded, nil +} + +// tsi: 上报时间timeStamp millinSecond +func (core *Core) SaveToSortSet(period string, keyName string, extt time.Duration, tsi int64) { + ary := strings.Split(keyName, "ts:") + setName := ary[0] + "sortedSet" + z := redis.Z{ + Score: float64(tsi), + Member: keyName, + } + rs, err := core.RedisLocalCli.ZAdd(setName, z).Result() + if err != nil { + logrus.Warn("err of ma7|ma30 add to redis:", err) + } else { + logrus.Warn("sortedSet added to redis:", rs, keyName) + } +} + +// 根据周期的文本内容,返回这代表多少个分钟 +func (cr *Core) PeriodToMinutes(period string) (int64, error) { + ary := strings.Split(period, "") + beiStr := "1" + danwei := "" + if len(ary) == 0 { + err := errors.New(utils.GetFuncName() + " period is block") + return 0, err + } + if len(ary) == 3 { + beiStr = ary[0] + ary[1] + danwei = ary[2] + } else { + beiStr = ary[0] + danwei = ary[1] + } + cheng := 1 + bei, _ := strconv.Atoi(beiStr) + switch danwei { + case "m": + { + cheng = bei + break + } + case "H": + { + cheng = bei * 60 + break + } + case "D": + { + cheng = bei * 60 * 24 + break + } + case "W": + { + cheng = bei * 60 * 24 * 7 + break + } + case "M": + { + cheng = bei * 60 * 24 * 30 + break + } + case "Y": + { + cheng = bei * 60 * 24 * 365 + break + } + default: + { + logrus.Warning("notmatch:", danwei, period) + panic("notmatch:" + period) + } + } + return int64(cheng), nil +} + +// type ScanCmd struct { +// baseCmd +// +// page []string +// cursor uint64 +// +// process func(cmd Cmder) error +// } +func (core *Core) GetRangeKeyList(pattern string, from time.Time) ([]*simple.Json, error) { + // 比如,用来计算ma30或ma7,倒推多少时间范围, + redisCli := core.RedisLocalCli + cursor := uint64(0) + n := 0 + allTs := []int64{} + var keys []string + for { + var err error + keys, cursor, _ = redisCli.Scan(cursor, pattern+"*", 2000).Result() + if err != nil { + panic(err) + } + n += len(keys) + if n == 0 { + break + } + } + + // keys, _ := redisCli.Keys(pattern + "*").Result() + for _, key := range keys { + keyAry := strings.Split(key, ":") + key = keyAry[1] + keyi64, _ := strconv.ParseInt(key, 10, 64) + allTs = append(allTs, keyi64) + } + nary := utils.RecursiveBubble(allTs, len(allTs)) + tt := from.UnixMilli() + ff := tt - tt%60000 + fi := int64(ff) + mary := []int64{} + for _, v := range nary { + if v < fi { + break + } + mary = append(mary, v) + } + res := []*simple.Json{} + for _, v := range mary { + // if k > 1 { + // break + // } + nv := pattern + strconv.FormatInt(v, 10) + str, err := redisCli.Get(nv).Result() + if err != nil { + logrus.Error("err of redis get key:", nv, err) + } + cur, err := simple.NewJson([]byte(str)) + if err != nil { + logrus.Error("err of create newJson:", str, err) + } + res = append(res, cur) + } + + return res, nil +} + +func (cl *Candle) SetToKey(core *Core) ([]interface{}, error) { data := cl.Data tsi, err := strconv.ParseInt(data[0].(string), 10, 64) @@ -367,61 +622,3 @@ func (cdl *CandleList) RPush(sp *Candle) (Sample, error) { return nil, err } } - -// TODO pixel -func (cdl *CandleList) MakePixelList(cr *core.Core, mx *MaX, score float64) (*PixelList, error) { - if mx.Data[2] != float64(30) { - err := errors.New("ma30 原始数据不足30条") - return nil, err - } - pxl := PixelList{ - Count: cdl.Count, - UpdateNickName: cdl.UpdateNickName, - LastUpdateTime: cdl.LastUpdateTime, - List: []*Pixel{}, - } - realLens := len(cdl.List) - cha := cdl.Count - realLens - for i := 0; i < 24; i++ { - pix := Pixel{} - pxl.List = append(pxl.List, &pix) - } - ma30Val := ToFloat64(mx.Data[1]) - for h := cdl.Count - 1; h-cha >= 0; h-- { - // Count 是希望值,比如24,realLens是实际值, 如果希望值和实际值相等,cha就是0 - cdLast := cdl.List[h-cha].Data[4] - cdLastf := ToFloat64(cdLast) - cdOpen := cdl.List[h-cha].Data[1] - cdOpenf := ToFloat64(cdOpen) - cdHigh := cdl.List[h-cha].Data[2] - cdHighf := ToFloat64(cdHigh) - cdLow := cdl.List[h-cha].Data[3] - cdLowf := ToFloat64(cdLow) - - yCandle := YCandle{ - Open: (cdOpenf - ma30Val) / ma30Val / score, - High: (cdHighf - ma30Val) / ma30Val / score, - Low: (cdLowf - ma30Val) / ma30Val / score, - Close: (cdLastf - ma30Val) / ma30Val / score, - } - tmi := ToInt64(cdl.List[h-cha].Data[0]) - pxl.List[h].Y = (cdLastf - ma30Val) / ma30Val / score - pxl.List[h].X = float64(h) - pxl.List[h].YCandle = yCandle - pxl.List[h].Score = cdLastf - pxl.List[h].TimeStamp = tmi - } - - return &pxl, nil -} -func (cr *core.Core) AddToGeneralSeriesInfoChnl(sr *SeriesInfo) { - redisCli := cr.RedisLocalCli - ab, _ := json.Marshal(sr) - if len(string(ab)) == 0 { - return - } - _, err := redisCli.Publish(ALLSERIESINFO_PUBLISH, string(ab)).Result() - if err != nil { - logrus.Debug("err of seriesinfo add to redis2:", err, sr.InstID, sr.Period) - } -} diff --git a/cmd/core/main.go b/cmd/core/main.go deleted file mode 100644 index 2076ec5..0000000 --- a/cmd/core/main.go +++ /dev/null @@ -1,20 +0,0 @@ -package main - -import ( - "fmt" - "os" -) - -const ( - Name = "core" - Version = "1.0.0" - Author = "phyer" -) - -func main() { - if len(os.Args) > 1 && os.Args[1] == "--version" { - fmt.Printf("%s %s (by %s)\n", Name, Version, Author) - return - } - fmt.Println("This is a library package, not an executable") -} diff --git a/internal/models/coaster.go b/coaster.go similarity index 55% rename from internal/models/coaster.go rename to coaster.go index 819aa38..b1768f5 100644 --- a/internal/models/coaster.go +++ b/coaster.go @@ -1,11 +1,11 @@ -package models +package core import ( "encoding/json" - "errors" - "fmt" + // "errors" + // "fmt" logrus "github.com/sirupsen/logrus" - "os" + // "os" "strconv" "time" ) @@ -89,90 +89,89 @@ func (co *Coaster) SetToKey(cr *Core) (string, error) { return res, err } -func (coi *CoasterInfo) Process(cr *Core) { - curCo, _ := cr.GetCoasterFromPlate(coi.InstID, coi.Period) - go func(co Coaster) { - //这里执行:创建一个tray对象,用现有的co的数据计算和填充其listMap - // TODO 发到一个channel里来执行下面的任务, - allow := os.Getenv("SARDINE_MAKESERIES") == "true" - if !allow { - return - } - srs, err := co.UpdateTray(cr) - if err != nil || srs == nil { - logrus.Warn("tray err: ", err) - return - } - _, err = srs.SetToKey(cr) - if err != nil { - logrus.Warn("srs SetToKey err: ", err) - return - } - //实例化完一个tray之后,拿着这个tray去执行Analytics方法 - // - // srsinfo := SeriesInfo{ - // InstID: curCo.InstID, - // Period: curCo.Period, - // } - // - // cr.SeriesChan <- &srsinfo - }(curCo) - - go func(co Coaster) { - // 每3次会有一次触发缓存落盘 - // run := utils.Shaizi(3) - // if run { - _, err := co.SetToKey(cr) - if err != nil { - logrus.Warn("coaster process err: ", err) - fmt.Println("coaster SetToKey err: ", err) - } - // } - - }(curCo) -} - +// func (coi *CoasterInfo) Process(cr *Core) { +// curCo, _ := cr.GetCoasterFromPlate(coi.InstID, coi.Period) +// go func(co Coaster) { +// //这里执行:创建一个tray对象,用现有的co的数据计算和填充其listMap +// // TODO 发到一个channel里来执行下面的任务, +// allow := os.Getenv("SARDINE_MAKESERIES") == "true" +// if !allow { +// return +// } +// srs, err := co.UpdateTray(cr) +// if err != nil || srs == nil { +// logrus.Warn("tray err: ", err) +// return +// } +// _, err = srs.SetToKey(cr) +// if err != nil { +// logrus.Warn("srs SetToKey err: ", err) +// return +// } +// //实例化完一个tray之后,拿着这个tray去执行Analytics方法 +// // +// // srsinfo := SeriesInfo{ +// // InstID: curCo.InstID, +// // Period: curCo.Period, +// // } +// // +// // cr.SeriesChan <- &srsinfo +// }(curCo) +// +// go func(co Coaster) { +// // 每3次会有一次触发缓存落盘 +// // run := utils.Shaizi(3) +// // if run { +// _, err := co.SetToKey(cr) +// if err != nil { +// logrus.Warn("coaster process err: ", err) +// fmt.Println("coaster SetToKey err: ", err) +// } +// // } +// +// }(curCo) +// } // // TODO 类似于InsertIntoPlate函数,照猫画虎就行了 // -func (co *Coaster) UpdateTray(cr *Core) (*Series, error) { - cr.Mu1.Lock() - defer cr.Mu1.Unlock() - //尝试从内存读取tray对象 - tr, trayFounded := cr.TrayMap[co.InstID] - if !trayFounded { - tr1, err := co.LoadTray(cr) - if err != nil { - return nil, err - } - cr.TrayMap[co.InstID] = tr1 - tr = tr1 - } - srs, seriesFounded := tr.SeriesMap["period"+co.Period] - err := errors.New("") - if !seriesFounded { - srs1, err := tr.NewSeries(cr, co.Period) - if err != nil { - return nil, err - } - tr.SeriesMap["period"+co.Period] = srs1 - } else { - err = srs.Refresh(cr) - } - // if err == nil { - // bj, _ := json.Marshal(srs) - // logrus.Debug("series:,string"(bj)) - // } - return srs, err -} - +// func (co *Coaster) UpdateTray(cr *Core) (*Series, error) { +// cr.Mu1.Lock() +// defer cr.Mu1.Unlock() +// //尝试从内存读取tray对象 +// tr, trayFounded := cr.TrayMap[co.InstID] +// if !trayFounded { +// tr1, err := co.LoadTray(cr) +// if err != nil { +// return nil, err +// } +// cr.TrayMap[co.InstID] = tr1 +// tr = tr1 +// } +// srs, seriesFounded := tr.SeriesMap["period"+co.Period] +// err := errors.New("") +// if !seriesFounded { +// srs1, err := tr.NewSeries(cr, co.Period) +// if err != nil { +// return nil, err +// } +// tr.SeriesMap["period"+co.Period] = srs1 +// } else { +// err = srs.Refresh(cr) +// } +// // if err == nil { +// // bj, _ := json.Marshal(srs) +// // logrus.Debug("series:,string"(bj)) +// // } +// return srs, err +// } +// // TODO -func (co *Coaster) LoadTray(cr *Core) (*Tray, error) { - tray := Tray{} - tray.Init(co.InstID) - prs := cr.Cfg.Config.Get("candleDimentions").MustArray() - for _, v := range prs { - tray.NewSeries(cr, v.(string)) - } - return &tray, nil -} +// func (co *Coaster) LoadTray(cr *Core) (*Tray, error) { +// tray := Tray{} +// tray.Init(co.InstID) +// prs := cr.Cfg.Config.Get("candleDimentions").MustArray() +// for _, v := range prs { +// tray.NewSeries(cr, v.(string)) +// } +// return &tray, nil +// } diff --git a/internal/config/config.go b/config.go similarity index 99% rename from internal/config/config.go rename to config.go index 4e34002..df46aa5 100644 --- a/internal/config/config.go +++ b/config.go @@ -1,4 +1,4 @@ -package config +package core import ( // "fmt" diff --git a/internal/config/const.go b/const.go similarity index 99% rename from internal/config/const.go rename to const.go index 0946adb..b276661 100644 --- a/internal/config/const.go +++ b/const.go @@ -1,4 +1,4 @@ -package config +package core const MAIN_ALLCOINS_PERIOD_MINUTES = 1 const MAIN_ALLCOINS_ONCE_COUNTS = 3 diff --git a/internal/core/core.go b/core.go similarity index 68% rename from internal/core/core.go rename to core.go index 74396ae..747fe05 100644 --- a/internal/core/core.go +++ b/core.go @@ -5,6 +5,8 @@ import ( "encoding/json" "errors" "fmt" + + // "math/rand" "io/ioutil" "net/http" "os" @@ -13,115 +15,43 @@ import ( "sync" "time" + // simple "github.com/bitly/go-simplejson" + // "v5sdk_go/ws/wImpl" "github.com/go-redis/redis" - // "github.com/phyer/texus/private" - "github.com/phyer/core/analysis" - "github.com/phyer/core/config" - "github.com/phyer/core/models" + "github.com/phyer/texus/private" "github.com/phyer/v5sdkgo/rest" logrus "github.com/sirupsen/logrus" ) type Core struct { Env string - Cfg *config.MyConfig + Cfg *MyConfig RedisLocalCli *redis.Client RedisRemoteCli *redis.Client FluentBitUrl string - PlateMap map[string]*models.Plate - TrayMap map[string]*models.Tray + PlateMap map[string]*Plate + TrayMap map[string]*Tray CoasterMd5SyncMap sync.Map Mu *sync.Mutex Mu1 *sync.Mutex Waity *sync.WaitGroup - CandlesProcessChan chan *models.Candle + CandlesProcessChan chan *Candle MaXProcessChan chan *MaX RsiProcessChan chan *Rsi StockRsiProcessChan chan *StockRsi TickerInforocessChan chan *TickerInfo CoasterChan chan *CoasterInfo - analysis.SeriesChan chan *analysis.SeriesInfo // to be init - SegmentItemChan chan *SegmentItem // to be init - MakeMaXsChan chan *Candle - ShearForceGrpChan chan *ShearForceGrp // to be init - InvokeRestQueueChan chan *RestQueue - RedisLocal2Cli *redis.Client - RestQueueChan chan *RestQueue + // SeriesChan chan *SeriesInfo + // SegmentItemChan chan *SegmentItem + MakeMaXsChan chan *Candle + // ShearForceGrpChan chan *ShearForceGrp + InvokeRestQueueChan chan *RestQueue + RedisLocal2Cli *redis.Client + RestQueueChan chan *RestQueue RestQueue WriteLogChan chan *WriteLog } -func (cre *coreCore) GetCandlesWithRest(instId string, kidx int, dura time.Duration, maxCandles int) error { - ary := []string{} - - wsary := cre.Cfg.CandleDimentions - for k, v := range wsary { - matched := false - // 这个算法的目的是:越靠后的candles维度,被命中的概率越低,第一个百分之百命中,后面开始越来越低, 每分钟都会发生这样的计算, - // 因为维度多了的话,照顾不过来 - rand.New(rand.NewSource(time.Now().UnixNano())) - rand.Seed(time.Now().UnixNano()) - n := (k*2 + 2) * 3 - if n < 1 { - n = 1 - } - b := rand.Intn(n) - if b < 8 { - matched = true - } - if matched { - ary = append(ary, v) - } - } - - mdura := dura/(time.Duration(len(ary)+1)) - 50*time.Millisecond - // fmt.Println("loop4 Ticker Start instId, dura: ", instId, dura, dura/10, mdura, len(ary), " idx: ", kidx) - // time.Duration(len(ary)+1) - ticker := time.NewTicker(mdura) - done := make(chan bool) - idx := 0 - go func(i int) { - for { - select { - case <-ticker.C: - if i >= (len(ary)) { - done <- true - break - } - rand.Seed(time.Now().UnixNano()) - b := rand.Intn(2) - maxCandles = maxCandles * (i + b) * 2 - - if maxCandles < 3 { - maxCandles = 3 - } - if maxCandles > 30 { - maxCandles = 30 - } - mx := strconv.Itoa(maxCandles) - // fmt.Println("loop4 getCandlesWithRest, instId, period,limit,dura, t: ", instId, ary[i], mx, mdura) - go func(ii int) { - restQ := RestQueue{ - InstId: instId, - Bar: ary[ii], - Limit: mx, - Duration: mdura, - WithWs: true, - } - js, _ := json.Marshal(restQ) - coreRedisLocalCli.LPush("restQueue", js) - }(i) - i++ - } - } - }(idx) - time.Sleep(dura - 10*time.Millisecond) - ticker.Stop() - // fmt.Println("loop4 Ticker stopped instId, dura: ", instId, dura, mdura) - done <- true - return nil -} - type RestQueue struct { InstId string Bar string @@ -198,22 +128,22 @@ func WriteLogProcess(cr *Core) { // } func (core *Core) Init() { - coreEnv = os.Getenv("GO_ENV") + core.Env = os.Getenv("GO_ENV") gitBranch := os.Getenv("gitBranchName") commitID := os.Getenv("gitCommitID") - logrus.Info("当前环境: ", coreEnv) + logrus.Info("当前环境: ", core.Env) logrus.Info("gitBranch: ", gitBranch) logrus.Info("gitCommitID: ", commitID) cfg := MyConfig{} cfg, _ = cfg.Init() - coreCfg = &cfg - cli, err := coreGetRedisLocalCli() - coreRedisLocalCli = cli - coreRestQueueChan = make(chan *RestQueue) - coreWriteLogChan = make(chan *WriteLog) + core.Cfg = &cfg + cli, err := core.GetRedisLocalCli() + core.RedisLocalCli = cli + core.RestQueueChan = make(chan *RestQueue) + core.WriteLogChan = make(chan *WriteLog) // 跟订单有关的都关掉 - // coreOrderChan = make(chan *private.Order) + // core.OrderChan = make(chan *private.Order) if err != nil { logrus.Error("init redis client err: ", err) } @@ -236,9 +166,9 @@ func (core *Core) GetRedisCliFromConf(conf RedisConfig) (*redis.Client, error) { } func (core *Core) GetRemoteRedisLocalCli() (*redis.Client, error) { - ru := coreCfg.RedisConf.Url - rp := coreCfg.RedisConf.Password - ri := coreCfg.RedisConf.Index + ru := core.Cfg.RedisConf.Url + rp := core.Cfg.RedisConf.Password + ri := core.Cfg.RedisConf.Index re := os.Getenv("REDIS_URL") if len(re) > 0 { ru = re @@ -258,9 +188,9 @@ func (core *Core) GetRemoteRedisLocalCli() (*redis.Client, error) { return client, nil } func (core *Core) GetRedisLocalCli() (*redis.Client, error) { - ru := coreCfg.RedisConf.Url - rp := coreCfg.RedisConf.Password - ri := coreCfg.RedisConf.Index + ru := core.Cfg.RedisConf.Url + rp := core.Cfg.RedisConf.Password + ri := core.Cfg.RedisConf.Index re := os.Getenv("REDIS_URL") if len(re) > 0 { ru = re @@ -283,7 +213,7 @@ func (core *Core) GetRedisLocalCli() (*redis.Client, error) { // 这些应该是放到 texus 里实现的 func (core *Core) GetAllTickerInfo() (*rest.RESTAPIResult, error) { // GET / 获取所有产品行情信息 - rsp, err := coreRestInvoke("/api/v5/market/tickers?instType=SPOT", rest.GET) + rsp, err := core.RestInvoke("/api/v5/market/tickers?instType=SPOT", rest.GET) return rsp, err } @@ -291,14 +221,14 @@ func (core *Core) GetAllTickerInfo() (*rest.RESTAPIResult, error) { // // func (core *Core) GetBalances() (*rest.RESTAPIResult, error) { // // TODO 临时用了两个实现,restInvoke,复用原来的会有bug,不知道是谁的bug -// rsp, err := coreRestInvoke2("/api/v5/account/balance", rest.GET, nil) +// rsp, err := core.RestInvoke2("/api/v5/account/balance", rest.GET, nil) // return rsp, err // } // // func (core *Core) GetLivingOrderList() ([]*private.Order, error) { // // TODO 临时用了两个实现,restInvoke,复用原来的会有bug,不知道是谁的bug // params := make(map[string]interface{}) -// data, err := coreRestInvoke2("/api/v5/trade/orders-pending", rest.GET, ¶ms) +// data, err := core.RestInvoke2("/api/v5/trade/orders-pending", rest.GET, ¶ms) // odrsp := private.OrderResp{} // err = json.Unmarshal([]byte(data.Body), &odrsp) // str, _ := json.Marshal(odrsp) @@ -313,7 +243,7 @@ func (core *Core) GetAllTickerInfo() (*rest.RESTAPIResult, error) { // time.Sleep(3 * time.Second) // ctype := ws.SPOT // -// redisCli := coreRedisLocalCli +// redisCli := core.RedisLocalCli // counts, err := redisCli.HLen("instruments|" + ctype + "|hash").Result() // if err != nil { // fmt.Println("err of hset to redis:", err) @@ -328,7 +258,7 @@ func (core *Core) GetAllTickerInfo() (*rest.RESTAPIResult, error) { // func (core *Core) SubscribeTicker(op string) error { // mp := make(map[string]string) // -// redisCli := coreRedisLocalCli +// redisCli := core.RedisLocalCli // ctype := ws.SPOT // mp, err := redisCli.HGetAll("instruments|" + ctype + "|hash").Result() // b, err := json.Marshal(mp) @@ -347,7 +277,7 @@ func (core *Core) GetAllTickerInfo() (*rest.RESTAPIResult, error) { // time.Sleep(5 * time.Second) // go func(instId string, op string) { // -// redisCli := coreRedisLocalCli +// redisCli := core.RedisLocalCli // _, err = redisCli.SAdd("tickers|"+op+"|set", instId).Result() // if err != nil { // fmt.Println("err of unMarshalJson5:", js) @@ -360,7 +290,7 @@ func (core *Core) GetAllTickerInfo() (*rest.RESTAPIResult, error) { // 通过接口获取一个币种名下的某个时间范围内的Candle对象集合 // 按说这个应该放到 texus里实现 func (core *Core) v5PublicInvoke(subUrl string) (*CandleData, error) { - restUrl, _ := coreCfg.Config.Get("connect").Get("restBaseUrl").String() + restUrl, _ := core.Cfg.Config.Get("connect").Get("restBaseUrl").String() url := restUrl + subUrl resp, err := http.Get(url) if err != nil { @@ -377,14 +307,14 @@ func (core *Core) v5PublicInvoke(subUrl string) (*CandleData, error) { } func (core *Core) RestInvoke(subUrl string, method string) (*rest.RESTAPIResult, error) { - restUrl, _ := coreCfg.Config.Get("connect").Get("restBaseUrl").String() + restUrl, _ := core.Cfg.Config.Get("connect").Get("restBaseUrl").String() //ep, method, uri string, param *map[string]interface{} rest := rest.NewRESTAPI(restUrl, method, subUrl, nil) - key, _ := coreCfg.Config.Get("credentialReadOnly").Get("okAccessKey").String() - secure, _ := coreCfg.Config.Get("credentialReadOnly").Get("secretKey").String() - pass, _ := coreCfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String() + key, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String() + secure, _ := core.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String() + pass, _ := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String() isDemo := false - if coreEnv == "demoEnv" { + if core.Env == "demoEnv" { isDemo = true } rest.SetSimulate(isDemo).SetAPIKey(key, secure, pass) @@ -394,187 +324,13 @@ func (core *Core) RestInvoke(subUrl string, method string) (*rest.RESTAPIResult, } return response, err } -// 保证同一个 period, keyName ,在一个周期里,SaveToSortSet只会被执行一次 -func (core *core.Core) SaveUniKey(period string, keyName string, extt time.Duration, tsi int64) { - - refName := keyName + "|refer" - // refRes, _ := core.RedisLocalCli.GetSet(refName, 1).Result() - core.RedisLocalCli.Expire(refName, extt) - // 为保证唯一性机制,防止SaveToSortSet 被重复执行, ps: 不需要唯一,此操作幂等在redis里 - // founded, _ := core.findInSortSet(period, keyName, extt, tsi) - // if len(refRes) != 0 { - // logrus.Error("refName exist: ", refName) - // return - // } - core.SaveToSortSet(period, keyName, extt, tsi) -} - -func (core *core.Core) findInSortSet(period string, keyName string, extt time.Duration, tsi int64) (bool, error) { - founded := false - ary := strings.Split(keyName, "ts:") - setName := ary[0] + "sortedSet" - opt := redis.ZRangeBy{ - Min: ToString(tsi), - Max: ToString(tsi), - } - rs, err := core.RedisLocalCli.ZRangeByScore(setName, opt).Result() - if len(rs) > 0 { - founded = true - } - if err != nil { - logrus.Error("err of ma7|ma30 add to redis:", err) - } else { - logrus.Info("sortedSet added to redis:", rs, keyName) - } - return founded, nil -} - -// tsi: 上报时间timeStamp millinSecond -func (core *core.Core) SaveToSortSet(period string, keyName string, extt time.Duration, tsi int64) { - ary := strings.Split(keyName, "ts:") - setName := ary[0] + "sortedSet" - z := redis.Z{ - Score: float64(tsi), - Member: keyName, - } - rs, err := core.RedisLocalCli.ZAdd(setName, z).Result() - if err != nil { - logrus.Warn("err of ma7|ma30 add to redis:", err) - } else { - logrus.Warn("sortedSet added to redis:", rs, keyName) - } -} - -// 根据周期的文本内容,返回这代表多少个分钟 -func (cr *core.Core) PeriodToMinutes(period string) (int64, error) { - ary := strings.Split(period, "") - beiStr := "1" - danwei := "" - if len(ary) == 0 { - err := errors.New(utils.GetFuncName() + " period is block") - return 0, err - } - if len(ary) == 3 { - beiStr = ary[0] + ary[1] - danwei = ary[2] - } else { - beiStr = ary[0] - danwei = ary[1] - } - cheng := 1 - bei, _ := strconv.Atoi(beiStr) - switch danwei { - case "m": - { - cheng = bei - break - } - case "H": - { - cheng = bei * 60 - break - } - case "D": - { - cheng = bei * 60 * 24 - break - } - case "W": - { - cheng = bei * 60 * 24 * 7 - break - } - case "M": - { - cheng = bei * 60 * 24 * 30 - break - } - case "Y": - { - cheng = bei * 60 * 24 * 365 - break - } - default: - { - logrus.Warning("notmatch:", danwei, period) - panic("notmatch:" + period) - } - } - return int64(cheng), nil -} - -// type ScanCmd struct { -// baseCmd -// -// page []string -// cursor uint64 -// -// process func(cmd Cmder) error -// } -func (core *core.Core) GetRangeKeyList(pattern string, from time.Time) ([]*simple.Json, error) { - // 比如,用来计算ma30或ma7,倒推多少时间范围, - redisCli := core.RedisLocalCli - cursor := uint64(0) - n := 0 - allTs := []int64{} - var keys []string - for { - var err error - keys, cursor, _ = redisCli.Scan(cursor, pattern+"*", 2000).Result() - if err != nil { - panic(err) - } - n += len(keys) - if n == 0 { - break - } - } - - // keys, _ := redisCli.Keys(pattern + "*").Result() - for _, key := range keys { - keyAry := strings.Split(key, ":") - key = keyAry[1] - keyi64, _ := strconv.ParseInt(key, 10, 64) - allTs = append(allTs, keyi64) - } - nary := utils.RecursiveBubble(allTs, len(allTs)) - tt := from.UnixMilli() - ff := tt - tt%60000 - fi := int64(ff) - mary := []int64{} - for _, v := range nary { - if v < fi { - break - } - mary = append(mary, v) - } - res := []*simple.Json{} - for _, v := range mary { - // if k > 1 { - // break - // } - nv := pattern + strconv.FormatInt(v, 10) - str, err := redisCli.Get(nv).Result() - if err != nil { - logrus.Error("err of redis get key:", nv, err) - } - cur, err := simple.NewJson([]byte(str)) - if err != nil { - logrus.Error("err of create newJson:", str, err) - } - res = append(res, cur) - } - - return res, nil -} - // func (core *Core) RestInvoke2(subUrl string, method string, param *map[string]interface{}) (*rest.RESTAPIResult, error) { -// key, err1 := coreCfg.Config.Get("credentialReadOnly").Get("okAccessKey").String() -// secret, err2 := coreCfg.Config.Get("credentialReadOnly").Get("secretKey").String() -// pass, err3 := coreCfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String() -// userId, err4 := coreCfg.Config.Get("connect").Get("userId").String() -// restUrl, err5 := coreCfg.Config.Get("connect").Get("restBaseUrl").String() +// key, err1 := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessKey").String() +// secret, err2 := core.Cfg.Config.Get("credentialReadOnly").Get("secretKey").String() +// pass, err3 := core.Cfg.Config.Get("credentialReadOnly").Get("okAccessPassphrase").String() +// userId, err4 := core.Cfg.Config.Get("connect").Get("userId").String() +// restUrl, err5 := core.Cfg.Config.Get("connect").Get("restBaseUrl").String() // if err1 != nil || err2 != nil || err3 != nil || err4 != nil || err5 != nil { // fmt.Println(err1, err2, err3, err4, err5) // } else { @@ -586,7 +342,7 @@ func (core *core.Core) GetRangeKeyList(pattern string, from time.Time) ([]*simpl // // } // // rs := rest.NewRESTAPI(restUrl, method, subUrl, &reqParam) // isDemo := false -// if coreEnv == "demoEnv" { +// if core.Env == "demoEnv" { // isDemo = true // } // // rs.SetSimulate(isDemo).SetAPIKey(key, secret, pass).SetUserId(userId) @@ -610,11 +366,11 @@ func (core *core.Core) GetRangeKeyList(pattern string, from time.Time) ([]*simpl // 跟下单有关的都关掉,以后再说 // func (core *Core) RestPost(subUrl string, param *map[string]interface{}) (*rest.RESTAPIResult, error) { -// key, err1 := coreCfg.Config.Get("credentialMutable").Get("okAccessKey").String() -// secret, err2 := coreCfg.Config.Get("credentialMutable").Get("secretKey").String() -// pass, err3 := coreCfg.Config.Get("credentialMutable").Get("okAccessPassphrase").String() -// userId, err4 := coreCfg.Config.Get("connect").Get("userId").String() -// restUrl, err5 := coreCfg.Config.Get("connect").Get("restBaseUrl").String() +// key, err1 := core.Cfg.Config.Get("credentialMutable").Get("okAccessKey").String() +// secret, err2 := core.Cfg.Config.Get("credentialMutable").Get("secretKey").String() +// pass, err3 := core.Cfg.Config.Get("credentialMutable").Get("okAccessPassphrase").String() +// userId, err4 := core.Cfg.Config.Get("connect").Get("userId").String() +// restUrl, err5 := core.Cfg.Config.Get("connect").Get("restBaseUrl").String() // if err1 != nil || err2 != nil || err3 != nil || err4 != nil || err5 != nil { // fmt.Println(err1, err2, err3, err4, err5) // } else { @@ -627,7 +383,7 @@ func (core *core.Core) GetRangeKeyList(pattern string, from time.Time) ([]*simpl // PassPhrase: pass, // } // isDemo := false -// if coreEnv == "demoEnv" { +// if core.Env == "demoEnv" { // isDemo = true // } // cli := rest.NewRESTClient(restUrl, &apikey, isDemo) @@ -640,7 +396,7 @@ func (core *core.Core) GetRangeKeyList(pattern string, from time.Time) ([]*simpl // 我当前持有的币,每分钟刷新 func (core *Core) GetMyFavorList() []string { - redisCli := coreRedisLocalCli + redisCli := core.RedisLocalCli opt := redis.ZRangeBy{ Min: "10", Max: "100000000000", @@ -658,8 +414,8 @@ func (core *Core) GetMyFavorList() []string { // 改了,不需要交易排行榜,我手动指定一个排行即可, tickersVol|sortedSet 改成 tickersList|sortedSet func (core *Core) GetScoreList(count int) []string { - // redisCli := coreRedisLocalCli - myFocusList := coreCfg.Config.Get("focusList").MustArray() + // redisCli := core.RedisLocalCli + myFocusList := core.Cfg.Config.Get("focusList").MustArray() logrus.Debug("curList: ", myFocusList) lst := []string{} for _, v := range myFocusList { @@ -870,7 +626,7 @@ func (core *Core) GetRangeMaXSortedSet(setName string, count int, from time.Time ary2 = strings.Split(ary1[1], "candle") period = ary2[1] - dui, err := corePeriodToMinutes(period) + dui, err := core.PeriodToMinutes(period) if err != nil { return mxl, err } @@ -885,7 +641,7 @@ func (core *Core) GetRangeMaXSortedSet(setName string, count int, from time.Time } ary := []string{} logrus.Debug("ZRevRangeByScore ", " setName:", setName, " froms:", froms, " sts:", sts) - dura, err := coreGetExpiration(period) + dura, err := core.GetExpiration(period) if err != nil { return mxl, err } @@ -893,7 +649,7 @@ func (core *Core) GetRangeMaXSortedSet(setName string, count int, from time.Time ot := time.Now().Add(dura * -1) oti := ot.UnixMilli() // fmt.Println(fmt.Sprint("GetExpiration zRemRangeByScore ", setName, " ", 0, " ", strconv.FormatInt(oti, 10))) - cli := coreRedisLocalCli + cli := core.RedisLocalCli cli.LTrim(setName, 0, oti) cunt, _ := cli.ZRemRangeByScore(setName, "0", strconv.FormatInt(oti, 10)).Result() if cunt > 0 { @@ -1011,7 +767,7 @@ func (core *Core) GetRangeCandleSortedSet(setName string, count int, from time.T period := strings.TrimPrefix(ary[0], "candle") // 获取period对应的分钟数 - durationMinutes, err := corePeriodToMinutes(period) + durationMinutes, err := core.PeriodToMinutes(period) if err != nil { return nil, fmt.Errorf("failed to get period minutes: %w", err) } @@ -1027,12 +783,12 @@ func (core *Core) GetRangeCandleSortedSet(setName string, count int, from time.T startTs := fromTs - durationMinutes*int64(count)*60*1000 // 清理过期数据 - if err := corecleanExpiredData(setName, period); err != nil { + if err := core.cleanExpiredData(setName, period); err != nil { logrus.Warnf("Failed to clean expired data: %v", err) } // 从Redis获取数据 - cli := coreRedisLocalCli + cli := core.RedisLocalCli opt := redis.ZRangeBy{ Min: strconv.FormatInt(startTs, 10), Max: strconv.FormatInt(fromTs, 10), @@ -1084,12 +840,12 @@ func (core *Core) GetRangeCandleSortedSet(setName string, count int, from time.T // cleanExpiredData 清理过期的数据 func (core *Core) cleanExpiredData(setName, period string) error { - expiration, err := coreGetExpiration(period) + expiration, err := core.GetExpiration(period) if err != nil { return err } - cli := coreRedisLocalCli + cli := core.RedisLocalCli expirationTime := time.Now().Add(-expiration) expirationTs := strconv.FormatInt(expirationTime.UnixMilli(), 10) @@ -1115,43 +871,3 @@ func (cr *Core) GetCoasterFromPlate(instID string, period string) (Coaster, erro return co, nil } - -func (core *Core) GetPixelanalysis.Series(instId string, period string) (analysis.Series, error) { - srs := analysis.Series{} - srName := instId + "|" + period + "|series" - cli := coreRedisLocalCli - srsStr, err := cli.Get(srName).Result() - if err != nil { - return *new(analysis.Series), err - } - err = json.Unmarshal([]byte(srsStr), &srs) - if err != nil { - return *new(analysis.Series), err - } - logrus.Info("sei:", srsStr) - err = srs.Candleanalysis.Series.RecursiveBubbleS(srs.Candleanalysis.Series.Count, "asc") - if err != nil { - return *new(analysis.Series), err - } - // err = srs.Candleanalysis.Series.RecursiveBubbleX(srs.Candleanalysis.Series.Count, "asc") - // if err != nil { - // return nil, err - // } - err = srs.Ma7analysis.Series.RecursiveBubbleS(srs.Candleanalysis.Series.Count, "asc") - if err != nil { - return *new(analysis.Series), err - } - // err = srs.Ma7analysis.Series.RecursiveBubbleX(srs.Candleanalysis.Series.Count, "asc") - // if err != nil { - // return nil, err - // } - err = srs.Ma30analysis.Series.RecursiveBubbleS(srs.Candleanalysis.Series.Count, "asc") - if err != nil { - return *new(analysis.Series), err - } - // err = srs.Ma30analysis.Series.RecursiveBubbleX(srs.Candleanalysis.Series.Count, "asc") - // if err != nil { - // return nil, err - // } - return srs, nil -} diff --git a/internal/analysis/segmentItem.go b/internal/analysis/segmentItem.go deleted file mode 100644 index 8b5b194..0000000 --- a/internal/analysis/segmentItem.go +++ /dev/null @@ -1,318 +0,0 @@ -package analysis - -import ( - "encoding/json" - "fmt" - "os" - "time" - - "github.com/phyer/core/internal/core" - logrus "github.com/sirupsen/logrus" -) - -// 段对象是对某个线段的表现进行评估的一个手段, 整个段会被分成3个小段, 整个段,计算整体的,字段,各自计算。包含仰角,段内极值等。 -// SegmentItem 属于一阶分析结果 -// {DMD-USDT 5D ma30 1643240793839 0 1642867200000 0xc00835fd80 0xc001687600 NaN 23 23 []}] -type SegmentItem struct { - InstID string - Period string //通过InstID,Periods可以定位到Series对象, 里面有一手数据 - Ctype string //candle|ma7|ma30 - ReportTime int64 - ReportTimeStr string - PolarQuadrant string // shangxian,manyue,xiaxian,xinyue, 分别对应圆周的四个阶段。 - LastUpdate int64 - ExtremumPixels *Extremum // 极值 是两个pixel对象 - FirstPixel *core.Pixel // 起始值,最后的pixel对象 - LastPixel *core.Pixel // 最后值,最后的maX pixel对象 - LastCandle *core.Pixel // 最后值,最后的Candle的pixel对象 - LastMa7 *core.Pixel // 最后值,最后的Ma7的pixel对象 - LastMa30 *core.Pixel // 最后值,最后的Ma30的pixel对象 - VerticalElevation float64 // 仰角, Interval范围内线段的仰角 - StartIdx int // 开始的坐标 - EndIdx int // 结束的坐标 - SubItemList []SegmentItem //往下一级微分 -} - -const DAMANYUE = "damanyue" -const DAMANYUE_POST = "damanyue_post" -const DAMANYUE_PRE = "damanyue_pre" -const XIAOMANYUE = "xiaomanyue" -const XIAOMANYUE_POST = "xiaomanyue_post" -const XIAOMANYUE_PRE = "xiaomanyue_pre" -const DAXINYUE = "daxinyue" -const DAXINYUE_POST = "daxinyue_post" -const DAXINYUE_PRE = "daxinyue_pre" -const XIAOXINYUE = "xiaoxinyue" -const XIAOXINYUE_POST = "xiaoxinyue_post" -const XIAOXINYUE_PRE = "xiaoxinyue_pre" -const DASHANGXIANYUE = "dashangxianyue" -const XIAOSHANGXIANYUE = "xiaoshangxianyue" -const DAXIAXIANYUE = "daxiaxianyue" -const XIAOXIAXIANYUE = "xiaoxiaxianyue" - -const tinySeg = 0.1 - -type Extremum struct { - Max *core.Pixel - Min *core.Pixel -} - -func CalPolar(e0 float64, e1 float64, e2 float64) string { - polarQuadrant := "default" - // - // ## 上弦月 - // - // e0,e1,e2: -3.5315694477826525 -0.5773082714100172 1.0558744145515746 - if e2 >= e1 && e2 >= 0 { - polarQuadrant = XIAOSHANGXIANYUE - } - // e2 > e1 > 0: 小上弦月 - // -> e1 > e0 > 0 : 大上弦月 - if e2 >= e1 && e1 >= 0 { - polarQuadrant = XIAOSHANGXIANYUE - if e1 >= e0 && e0 >= 0 { - polarQuadrant = DASHANGXIANYUE - } - } - // - // ## 下弦月 - // 0 > e1 > e2:小下弦月 - // -> 0 > e0 > e1: 大下弦月 - // - if 0 >= e1 && e1 >= e2 { - polarQuadrant = XIAOXIAXIANYUE - if 0 >= e0 && e0 >= e1 { - polarQuadrant = DAXIAXIANYUE - } - } - - // ## 同上 - if (0 >= e2 && 0 >= e1) && e2 >= e1 { - polarQuadrant = XIAOXIAXIANYUE - } - // ## - // ## 满月 - // - // e1 > e2 > 0 : 小满月 pre - // -> e0 > e1 : 大满月pre - // - - if e1 >= e2 && e2 >= 0 { - polarQuadrant = XIAOMANYUE_PRE - if e0 > e1 { - polarQuadrant = DAMANYUE_PRE - } - } - // e1 > 0.1 > e2 > 0 : 小满月 - // -> e0 > e1 : 大满月 - // - - if e1 >= tinySeg && tinySeg >= e2 && e2 >= 0 { - polarQuadrant = XIAOMANYUE - if e0 > e1 { - polarQuadrant = DAMANYUE - } - } - - // e0,e1,e2: 0.9699903789854316 0.1802190672652184 -1.7888783234326784 - // e1 > 0 > e2 > -0.1 : 小满月post - // -> e0 > e1 > 0 : 大满月post - // - if e1 >= 0 && 0 >= e2 && e2 >= -100000 { - polarQuadrant = XIAOMANYUE_POST - if e0 > e1 { - polarQuadrant = DAMANYUE_POST - } - } - // e0,e1,e2: -0.049579775302532776 0 -0.018291567587323976 - // ## 新月 - // e1 < e2 <0: 小新月pre - // -> e1 > e0 : 大新月pre - // - if e1 <= e2 && e2 <= 0 && e2 >= -1*tinySeg { - polarQuadrant = XIAOXINYUE_PRE - if e1 > e0 { - polarQuadrant = DAXINYUE_PRE - } - } - // e1 < -0.1 < e2 <0 小新月 - // -> e1 > e0 : 大新月 - if e1 <= -1*tinySeg && -1*tinySeg <= e2 && e2 <= 0 { - polarQuadrant = XIAOXINYUE - if e1 > e0 { - polarQuadrant = DAXINYUE - } - } - // - // e1 < 0 < e2 < 0.1 小新月post - // -> e1 > e0 : 大新月post - //e0,e1,e2: -0.03902244287114438 -0.13929829606729519 0.14828528291036536 - if e1 <= 0 && 0 <= e2 && e2 <= 1000000 { - polarQuadrant = XIAOXINYUE_POST - if e1 > e0 { - polarQuadrant = DAXINYUE_POST - } - } - - return polarQuadrant -} - -// 计算当前某段的曲线正弦所处极坐标象限 -func CalPolarQuadrant(maXSeg *SegmentItem) string { - if len(maXSeg.SubItemList) == 0 { - return "subItem no polarQuadrant" - } - m0 := maXSeg.SubItemList[0] - m1 := maXSeg.SubItemList[1] - m2 := maXSeg.SubItemList[2] - e0 := m0.VerticalElevation - e1 := m1.VerticalElevation - e2 := m2.VerticalElevation - polarQuadrant := CalPolar(e0, e1, e2) - if polarQuadrant == "default" { - env := os.Getenv("GO_ENV") - if env != "production" { - fmt.Println(GetFuncName(), " instId:", maXSeg.InstID, " period:", maXSeg.Period, " ctype", maXSeg.Ctype, " e0,e1,e2:", e0, e1, e2) - } - } - - return polarQuadrant -} - -func (seg *SegmentItem) SetToKey(cr *Core) error { - if seg.InstID == "USDC-USDT" { - return nil - } - keyName := seg.InstID + "|" + seg.Period + "|" + seg.Ctype + "|segmentItem" - bj, err := json.Marshal(seg) - if err != nil { - logrus.Warn("se.MakeSegment: ", err, seg) - } - cr.RedisLocalCli.Set(keyName, string(bj), 0) - - sf7 := float64(0) - sf7 = seg.LastCandle.Y - seg.LastMa7.Y - sf30 := float64(0) - sf30 = seg.LastCandle.Y - tms := time.Now().Format("2006-01-02 15:04:05.000") - // fmt.Println("tms: ", seg.InstID, seg.Period, tms, seg.LastUpdate) - she := ShearItem{ - LastUpdate: time.Now().UnixMilli(), - LastUpdateTime: tms, - VerticalElevation: seg.SubItemList[2].VerticalElevation, - Ratio: seg.LastCandle.Y / seg.SubItemList[2].VerticalElevation, - Score: seg.LastCandle.Score, - PolarQuadrant: seg.PolarQuadrant, - } - - if seg.Ctype == "ma7" { - she.ShearForce = sf7 - } - - if seg.Ctype == "ma30" { - she.ShearForce = sf30 - } - sbj, _ := json.Marshal(she) - keyName = seg.InstID + "|" + seg.Period + "|" + seg.Ctype + "|shearItem" - cr.RedisLocalCli.Set(keyName, string(sbj), 3*time.Minute) - cr.RedisLocal2Cli.Set(keyName, string(sbj), 3*time.Minute) - return nil -} - -func (seg *SegmentItem) Show() error { - if seg.InstID == "USDC-USDT" { - return nil - } - bj, _ := json.Marshal(*seg) - logrus.Warn("SegmentItem Show:", string(bj)) - return nil -} - -func (jgm *SegmentItem) Report(cr *Core) error { - return nil -} -func (seg *SegmentItem) Process(cr *Core) { - go func() { - if seg == nil { - return - } - seg.Show() - seg.SetToKey(cr) - // sheGrp, err := seg.MakeShearForceGrp(cr) - // if err != nil { - // log.Panic(err) - // } - // 当最后一个维度数据更新后,触发显示和备份 - - // 空的就可以 - shg := ShearForceGrp{ - InstID: seg.InstID, - Ma30PeriodGroup: map[string]ShearItem{}, - Ma7PeriodGroup: map[string]ShearItem{}, - } - if seg.Period == "4H" { - time.Sleep(50 * time.Millisecond) //等可能存在的5D也ready - go func() { - cr.ShearForceGrpChan <- &shg - }() - } - }() -} - -func (srs *Series) MakeSegment(cr *core.Core, start int, end int, subArys [][]int, ctype string) *SegmentItem { - list := []*core.Pixel{} - if ctype == "ma7" { - list = srs.Ma7Series.List - } - if ctype == "ma30" { - list = srs.Ma30Series.List - } - st := start - if len(list) == 0 { - return nil - } - for i := start; i <= end; i++ { - if list[i].X == 0 && list[i].Y == 0 { - if i+1 < len(list) { - st = i + 1 - } else { - logrus.Panic(GetFuncName(), "没有符合的记录") - } - } - } - extra, _ := srs.GetExtremum(cr, st, end, ctype) - yj, err := srs.GetElevation(cr, ctype, st, end) - if err != nil { - fmt.Println("MakeSegment GetElevation err : ", err) - } - tm := time.Now() - seg := SegmentItem{ - InstID: srs.InstID, - Period: srs.Period, - ReportTime: tm.UnixMilli(), - ReportTimeStr: tm.Format("2006-01-02 15:04:05.000"), - LastUpdate: srs.LastUpdateTime, - FirstPixel: list[st], - LastPixel: list[end], - ExtremumPixels: extra, - Ctype: ctype, - VerticalElevation: yj, - StartIdx: st, - EndIdx: end, - LastCandle: srs.CandleSeries.List[end], - LastMa7: srs.Ma7Series.List[end], - SubItemList: []SegmentItem{}, - PolarQuadrant: "none", - } - - if len(subArys) > 0 { - for _, pair := range subArys { - sub := [][]int{} - curSeg := srs.MakeSegment(cr, pair[0], pair[1], sub, ctype) - seg.SubItemList = append(seg.SubItemList, *curSeg) - } - } - polar := CalPolarQuadrant(&seg) - seg.PolarQuadrant = polar - return &seg -} diff --git a/internal/analysis/series.go b/internal/analysis/series.go deleted file mode 100644 index 5246c4c..0000000 --- a/internal/analysis/series.go +++ /dev/null @@ -1,568 +0,0 @@ -package analysis - -import ( - "bytes" - "encoding/json" - "errors" - "fmt" - "net/http" - "os" - "strconv" - "time" - - "github.com/go-redis/redis" - "github.com/phyer/core/internal/core" - "github.com/phyer/core/internal/models" - "github.com/phyer/core/internal/utils" - logrus "github.com/sirupsen/logrus" -) - -type Series struct { - InstID string `json:"instID"` - Period string `json:"Period"` - Count int `json:"count,number"` - Scale float64 `json:"scale,number"` - LastUpdateTime int64 `json:"lastUpdateTime,number"` - UpdateNickName string - LastCandle1m *models.Candle `json:"lastCandle1m"` - CandleSeries *core.PixelList `json:"candleSerie"` - Ma7Series *core.PixelList `json:"ma7Serie"` - Ma30Series *core.PixelList `json:"ma30Serie"` -} - -type SeriesInfo struct { - InstID string `json:"instID"` - Period string `json:"period"` - InsertedNew bool `json:"insertedNew,bool"` - Score float64 `json:"score,number"` -} -type SeriesInfoScore struct { - InstID string `json:"instID"` - Score float64 `json:"score,number"` -} - -// TODO -// redis key: verticalReportItem|BTC-USDT|4H-15m|ts:1643002300000 -// sortedSet: verticalLimit|2D-4H|rank|sortedSet -type VerticalReportItem struct { - InstID string - Period string - ReportTime int64 - LastUpdate int64 - LastUpdateTime string - Interval int - TrigerValue float64 - AdvUpSellPrice float64 - AdvDownSellPrice float64 - Rank float64 - ShearForce float64 - VerticalElevation float64 - SecondPeriod string -} - -// type Segment struct { -// IndextStart int -// IndexEnd int -// -// } - -// 根据instId 和period 从 PlateMap里拿到coaster,创建对应的 series, -func (sr *Series) Refresh(cr *core.Core) error { - curCo, err := cr.GetCoasterFromPlate(sr.InstID, sr.Period) - if err != nil { - return err - } - - ma30List := curCo.Ma30List.List - ma30len := len(ma30List) - if ma30len == 0 { - err = errors.New("ma30List is empty:" + sr.InstID + "," + sr.Period) - return err - } - baseMaX := ma30List[ma30len-1] - - ma30Pxl, err := curCo.Ma30List.MakePixelList(cr, baseMaX, sr.Scale) - if err != nil { - return err - } - sr.Ma30Series = ma30Pxl - ma7Pxl, err := curCo.Ma7List.MakePixelList(cr, baseMaX, sr.Scale) - if err != nil { - return err - } - sr.Ma7Series = ma7Pxl - curCo.CandleList.RecursiveBubbleS(len(curCo.CandleList.List), "asc") - candlePxl, err := curCo.CandleList.MakePixelList(cr, baseMaX, sr.Scale) - if err != nil { - return err - } - sr.CandleSeries = candlePxl - // bj, _ := json.Marshal(sr.Ma30Series) - // fmt.Println("sr.Ma30Series:", sr.Period, sr.InstID, string(bj)) - sr.LastUpdateTime = sr.Ma30Series.LastUpdateTime - // fmt.Println("candlePxl: ", candlePxl) - return nil -} - -func (sr *Series) SetToKey(cr *core.Core) (string, error) { - if sr == nil || sr.CandleSeries == nil { - return "", errors.New("sr.CandlesSeries == nil") - } - sr.CandleSeries.RecursiveBubbleS(sr.CandleSeries.Count, "asc") - sr.CandleSeries.ReIndex() - sr.CandleSeries.RecursiveBubbleS(sr.CandleSeries.Count, "asc") - // sr.CandleSeries.RecursiveBubbleX(sr.CandleSeries.Count, "asc") - sr.Ma7Series.RecursiveBubbleS(sr.CandleSeries.Count, "asc") - // sr.Ma7Series.RecursiveBubbleX(sr.CandleSeries.Count, "asc") - sr.Ma30Series.RecursiveBubbleS(sr.CandleSeries.Count, "asc") - // sr.Ma30Series.RecursiveBubbleX(sr.CandleSeries.Count, "asc") - now := time.Now().UnixMilli() - sr.LastUpdateTime = now - sr.CandleSeries.LastUpdateTime = now - sr.CandleSeries.UpdateNickName = GetRandomString(12) - sr.UpdateNickName = GetRandomString(12) - js, _ := json.Marshal(*sr) - seriesName := sr.InstID + "|" + sr.Period + "|series" - res, err := cr.RedisLocalCli.Set(seriesName, string(js), 0).Result() - if err != nil { - logrus.Panic(utils.GetFuncName(), err, " seriesSetToKey1: instId:", sr.InstID, " period: ", sr.Period, " lastUpdate:", sr.LastUpdateTime, " md5:", Md5V(string(js))) - } - res, err = cr.RedisLocal2Cli.Set(seriesName, string(js), 0).Result() - return res, err -} -func PrintSerieY(cr *core.Core, list []redis.Z, period string, count int) { - // fmt.Println("PrintSerieY start") - env := os.Getenv("GO_ENV") - isProduction := env == "production" - //TODO 只有非产线环境,才会显示此列表 - if !isProduction { - fmt.Println("seriesYTop count:", count, "period:", period, "sort start") - } - seiScrList := []*SeriesInfoScore{} - for _, v := range list { - sei := SeriesInfo{} - seiScr := SeriesInfoScore{} - json.Unmarshal([]byte(v.Member.(string)), &sei) - seiScr.InstID = sei.InstID - seiScr.Score = v.Score - seiScrList = append(seiScrList, &seiScr) - - // if k < count { - // if !isProduction { - // fmt.Println("seriesYTop", count, "No.", k+1, "period"+period, "InstID:", sei.InstID, "score:", v.Score) - // } - // 拉扯极限报告 - // } - // if k == count+1 { - // if !isProduction { - // fmt.Println("seriesYTop end -------" + "period" + period + "-------------------------------------") - // fmt.Println("seriesYLast start -------" + "period" + period + "-------------------------------------") - // } - // } - // if k > len(list)-count-1 { - // if !isProduction { - // fmt.Println("seriesYLast", count, "No.", k+1, "period"+period, "InstID:", sei.InstID, "score:", v.Score) - // } - // } - } - - bj, _ := json.Marshal(seiScrList) - reqBody := bytes.NewBuffer(bj) - cr.Env = os.Getenv("GO_ENV") - cr.FluentBitUrl = os.Getenv("SARDINE_FluentBitUrl") - fullUrl := "http://" + cr.FluentBitUrl + "/seriesY." + period - - res, err := http.Post(fullUrl, "application/json", reqBody) - fmt.Println("requested, response:", fullUrl, reqBody, res) - if err != nil { - logrus.Error(err) - } - - if !isProduction { - fmt.Println("seriesYLast count:", count, "period:", period, "sort end") - } -} - -func (sei *SeriesInfo) Process(cr *core.Core) { - curSe, err := cr.GetPixelSeries(sei.InstID, sei.Period) - if err != nil { - logrus.Warn("GetPixelSeries: ", err) - return - } - - // TODO 金拱门 - // list := cr.GetMyCcyBalanceName() - go func(se Series) { - - threeSeg := [][]int{[]int{0, 19}, []int{19, 22}, []int{22, 23}} - - ma7Seg := se.MakeSegment(cr, 0, 23, threeSeg, "ma7") - go func() { - cr.SegmentItemChan <- ma7Seg - }() - - ma30Seg := se.MakeSegment(cr, 0, 23, threeSeg, "ma30") - - go func() { - cr.SegmentItemChan <- ma30Seg - }() - - }(curSe) - - cli := cr.RedisLocalCli - go func(se Series) { - // 拉扯极限报告 - willReport := os.Getenv("SARDINE_SERIESTOREPORT") == "true" - logrus.Info("willReport:", willReport) - // fmt.Println("willReport:", willReport) - if !willReport { - return - } - err = curSe.AddToYSorted(cr) - if err != nil { - logrus.Warn("sei addToYSorted err: ", err) - return - } - // 所有维度拉扯极限 - go func(se Series) { - if se.InstID != "BTC-USDT" { - return - } - list, err := cli.ZRevRangeWithScores("series|YValue|sortedSet|period"+se.Period, 0, -1).Result() - if err != nil { - fmt.Println("series sorted err", err) - } - PrintSerieY(cr, list, se.Period, 20) - }(se) - - }(curSe) - // TODO 刘海儿检测, 监测金拱门中的刘海儿,预警下跌趋势, 其实有没有金拱门并不重要,刘海儿比金拱门更有说服力 - go func(se Series) { - // 如何定义刘海:目前定义如下,3m以上的周期时,当7个或小于7个周期内的时间内发生了一次下坠和一次上升,下坠幅度达到2%以上,并随后的上升中收复了下坠的幅度,那么疑似刘海儿发生。用的周期越少,越强烈,探底和抬升的幅度越大越强烈,所处的维度越高越强烈,比如15m的没有1H的强烈 - // 如果发生在BTC身上,那么将影响所有 - // se.CheckLiuhai() { - // - // } - - }(curSe) - go func(se Series) { - allow := os.Getenv("SARDINE_SERIESINFOTOCHNL") == "true" - if !allow { - return - } - time.Sleep(0 * time.Second) - sei := SeriesInfo{ - InstID: curSe.InstID, - Period: curSe.Period, - } - cr.AddToGeneralSeriesInfoChnl(&sei) - }(curSe) - -} - -//------------------------------------------------------------------------------- - -// 拉扯极限相关: 加入seriesY值排行榜, 用于生成拉扯极限 -func (srs *Series) AddToYSorted(cr *core.Core) error { - setName := "series|YValue|sortedSet|period" + srs.Period - srs.CandleSeries.RecursiveBubbleS(srs.CandleSeries.Count, "asc") - length := len(srs.CandleSeries.List) - if length != srs.Count { - err := errors.New("AddToYSorted err: 数据量不够") - return err - } - lastCandlePixel1 := srs.CandleSeries.List[srs.Count-1] - sei := SeriesInfo{ - InstID: srs.InstID, - Period: srs.Period, - } - bj, _ := json.Marshal(sei) - // TODO -200 是个无效的值,如果遇到-200就赋予0值,这个办法不好,后面考虑不用sortedSet,而用自定义对象更好些。 - if lastCandlePixel1.Y == -200 { - lastCandlePixel1.Y = 0 - } - z := redis.Z{ - Score: float64(lastCandlePixel1.Y), - Member: string(bj), - } - // TODO ZAdd 有可能由于bug或者key不一样的原因,让列表变长,需要想办法怎么定期请空 - if lastCandlePixel1.Score != 0 { - cr.RedisLocalCli.ZAdd(setName, z).Result() - } - return nil -} - -// 垂直极限排名有一定片面性。暂时先不开放。垂直极限推荐最高的,可能是个不太容易📈上来的股票,甚至垃圾股,而且过一会儿可能跌的更多,所以就算使用这个功能,也仅供参考, -func (vir *VerticalReportItem) AddToVeriticalLimitSorted(cr *core.Core, srs *Series, period2 string) error { - // redis key: verticalReportItem|BTC-USDT|4H-15m|ts:1643002300000 - // sortedSet: verticalLimit|2D-4H|rank|sortedSet - - setName := "verticalLimit|" + srs.Period + "-" + period2 + "|rank|sortedSet" - tms := strconv.FormatInt(srs.LastUpdateTime, 10) - keyName := "verticalLimit|" + srs.InstID + "|" + srs.Period + "-" + period2 + "|ts:" + tms - z := redis.Z{ - Score: float64(srs.LastUpdateTime), - Member: keyName, - } - if vir.Rank != -1 && vir.Rank != 0 { - extt := 48 * time.Hour - ot := time.Now().Add(extt * -1) - oti := ot.UnixMilli() - count, _ := cr.RedisLocalCli.ZRemRangeByScore(setName, "0", strconv.FormatInt(oti, 10)).Result() - if count > 0 { - logrus.Warning("移出过期的引用数量:", setName, count, "ZRemRangeByScore ", setName, 0, strconv.FormatInt(oti, 10)) - } - cr.RedisLocalCli.ZAdd(setName, z).Result() - bj, _ := json.Marshal(vir) - cr.RedisLocalCli.Set(keyName, bj, 48*time.Hour).Result() - } - return nil -} - -func (vri *VerticalReportItem) Report(cr *core.Core) error { - dd := DingdingMsg{ - Topic: "垂直极限触发", - RobotName: "pengpeng", - AtAll: true, - Ctype: "markdown", - Content: "", - } - ary1 := []string{} - str := "``币名: ``" + vri.InstID + "\n" - str1 := fmt.Sprintln("``基础维度:``", vri.Period) - str2 := fmt.Sprintln("``剪切维度:``", vri.SecondPeriod) - str21 := fmt.Sprintln("``观察周期:``", vri.Interval) - str3 := fmt.Sprintln("``平均仰角:``", vri.VerticalElevation) - str4 := fmt.Sprintln("``剪切力:``", vri.ShearForce) - str5 := fmt.Sprintln("``Rank:``", vri.Rank) - score := vri.TrigerValue - str6 := fmt.Sprintln("``触发买入价位:``", score) - str7 := fmt.Sprintln("``建议止盈价位:``", vri.AdvUpSellPrice) - str8 := fmt.Sprintln("``建议止损价位:``", vri.AdvDownSellPrice) - str9 := "----------------------\n" - ary1 = append(ary1, str, str1, str2, str21, str3, str4, str5, str6, str7, str8, str9) - dd.AddItemListGrp("垂直极限", 2, ary1) - ary2 := []string{} - - tm := time.Now().Format("01-02:15:04") - rtime := fmt.Sprintln("``报告时间:``", tm) - ctype := fmt.Sprintln("``类型:``", "极限触发,已弃用") - from := "来自: " + os.Getenv("HOSTNAME") - ary2 = append(ary2, rtime, ctype, from) - dd.AddItemListGrp("", 2, ary2) - dd.PostToRobot("pengpeng", cr) - - return nil -} - -func (vri *VerticalReportItem) Show(cr *core.Core) error { - ary1 := []string{} - str := "``币名: ``" + vri.InstID + "\n" - str1 := fmt.Sprintln("``基础维度:``", vri.Period) - str2 := fmt.Sprintln("``剪切维度:``", vri.SecondPeriod) - str21 := fmt.Sprintln("``观察周期:``", vri.Interval) - str3 := fmt.Sprintln("``平均仰角:``", vri.VerticalElevation) - str4 := fmt.Sprintln("``剪切力:``", vri.ShearForce) - str5 := fmt.Sprintln("``Rank:``", vri.Rank) - score := vri.TrigerValue - str6 := fmt.Sprintln("``触发买入价位:``", score) - str7 := fmt.Sprintln("``建议止盈价位:``", vri.AdvUpSellPrice) - str8 := fmt.Sprintln("``建议止损价位:``", vri.AdvDownSellPrice) - str9 := "----------------------\n" - ary1 = append(ary1, str, str1, str2, str21, str3, str4, str5, str6, str7, str8, str9) - for _, v := range ary1 { - fmt.Println("verticalReportItem: ", v) - } - return nil -} - -// TODO 求某个PixelList里两个点之间的仰角,从ridx开始,往lidx的元素画一条直线,的仰角 - -func (srs *Series) GetElevation(cr *core.Core, ctype string, lIdx int, rIdx int) (float64, error) { - yj := float64(0) - switch ctype { - case "candle": - { - yj = (srs.CandleSeries.List[rIdx].Y - srs.CandleSeries.List[lIdx].Y) / float64(rIdx-lIdx) - } - case "ma7": - { - yj = (srs.Ma7Series.List[rIdx].Y - srs.Ma7Series.List[lIdx].Y) / float64(rIdx-lIdx) - } - case "ma30": - { - yj = (srs.Ma30Series.List[rIdx].Y - srs.Ma30Series.List[lIdx].Y) / float64(rIdx-lIdx) - - } - } - return yj, nil -} - -// TODO 求极值,在某个线段上。一个最大值,一个最小值 -func (srs *Series) GetExtremum(cr *core.Core, lIdx int, rIdx int, ctype string) (*Extremum, error) { - ext := Extremum{ - Max: &Pixel{}, - Min: &Pixel{}, - } - - switch ctype { - case "candle": - { - done := false - for k, v := range srs.CandleSeries.List { - if k < lIdx { - continue - } - if v.X == 0 && v.Y == 0 { - continue - } - if k > rIdx { - continue - } - if !done { - ext.Max = srs.CandleSeries.List[k] - ext.Min = srs.CandleSeries.List[k] - done = true - } - - if v.Y > ext.Max.Y { - ext.Max = v - } - if v.Y < ext.Min.Y { - ext.Min = v - } - } - // ext = nil - } - case "ma7": - { - done := false - for k, v := range srs.Ma7Series.List { - if k < lIdx { - continue - } - if v.X == 0 && v.Y == 0 { - continue - } - if k > rIdx { - continue - } - if !done { - ext.Max = srs.Ma7Series.List[k] - ext.Min = srs.Ma7Series.List[k] - done = true - } - if v.Y > ext.Max.Y { - ext.Max = v - } - if v.Y < ext.Min.Y { - ext.Min = v - } - } - // ext = nil - } - case "ma30": - { - - done := false - for k, v := range srs.Ma30Series.List { - if k < lIdx { - continue - } - if v.X == 0 && v.Y == 0 { - continue - } - if k > rIdx { - continue - } - if !done { - ext.Max = srs.Ma30Series.List[k] - ext.Min = srs.Ma30Series.List[k] - done = true - } - if v.Y > ext.Max.Y { - ext.Max = v - } - if v.Y < ext.Min.Y { - ext.Min = v - } - } - // ext = nil - } - } - return &ext, nil -} - -// TODO 获取垂直极限列表 -// 筛选条件: -// -// 1. 极大值未发生在最后周期的,排除 -// 2. n周期内,有仰角小于0的,排除 -// 注意: 仰角极值未必发生在最后一个周期 -// -// 对剩下的币种结果,计算: -// -// 1. n周期平均仰角: s -// 2. 最后周期仰角: p -// -// 筛选出最后仰角高于n周期平均仰角的币列表, -// 以最后仰角为结果,得到一个值 p -// 对此列表集合,得到每个的15分钟维度拉扯极限,每个计算后得到一个结果 f, -// -// f值权重更高,p值权重降一个量级,求出分值用于排名, -// -// rank = 2 * (lcjx * -1) * (1 + avgElevation) * (1 + lastElevation) * (1 + lastElevation) -// -// 存储在sortedSet里,命名: -// verticalLimit|15m~4H|rank|sortedSet -// return rank, err -func (vir *VerticalReportItem) MakeVerticalLimit(cr *core.Core, srs *Series, startIdx int, endIdx int, period2 string) (err error) { - count := len(srs.CandleSeries.List) - 1 - lastMa30Pixel := srs.Ma30Series.List[count] - // func (srs *Series) GetExtremum(cr *core.Core, lIdx int, rIdx int, ctype string) (*Extremum, error) { - ext, err := srs.GetExtremum(cr, startIdx, endIdx, "ma30") - if err != nil { - logrus.Warn(utils.GetFuncName(), ":", err) - } - - if ext.Max.Score < 1.05*lastMa30Pixel.Score { - lbj, _ := json.Marshal(lastMa30Pixel) - lext, _ := json.Marshal(ext) - err = errors.New(fmt.Sprintln("当前pixel不是极值", " lastMa30Pixel: ", string(lbj), " ext: ", string(lext))) - return err - } else { - err = errors.New(fmt.Sprintln("当前pixel满足极值", lastMa30Pixel)) - } - - yj, err := srs.GetElevation(cr, "ma30", startIdx, endIdx) - if err != nil { - logrus.Warn(utils.GetFuncName(), ":", err) - } - - vir.VerticalElevation = yj - lcjx, _ := LacheJixian(cr, srs, period2) - vir.ShearForce = lcjx - vir.TrigerValue = srs.CandleSeries.List[len(srs.CandleSeries.List)-1].Score - vir.AdvUpSellPrice = vir.TrigerValue * 1.04 - vir.AdvDownSellPrice = vir.TrigerValue * 0.98 - // 计算rank的公式如下 - // rank := 2 * (lcjx * -1) * (1 + avgElevation) * (1 + lastElevation) * (1 + lastElevation) - // vir.Rank = rank - return nil -} - -// 计算剪切力 -func LacheJixian(cr *core.Core, srs *Series, period string) (float64, error) { - curSe, _ := cr.GetPixelSeries(srs.InstID, period) - return curSe.CandleSeries.List[len(srs.CandleSeries.List)-1].Y, nil -} - -// type SegmentItem struct { -// InstID string -// Period string -// ReportTime int64 -// lastUpdate int64 -// Interval int -// Direct string // up, down -// VerticalElevation float64 -// } diff --git a/internal/analysis/shearGorceGrp.go b/internal/analysis/shearGorceGrp.go deleted file mode 100644 index fac4e85..0000000 --- a/internal/analysis/shearGorceGrp.go +++ /dev/null @@ -1,268 +0,0 @@ -package analysis - -import ( - "encoding/json" - "fmt" - "os" - "strconv" - "time" - - "github.com/go-redis/redis" - "github.com/phyer/core/internal/core" - logrus "github.com/sirupsen/logrus" -) - -type ShearItem struct { - ShearForce float64 // ma30-candle剪切力 - VerticalElevation float64 // 仰角, Interval范围内线段的仰角 - Ratio float64 // 剪切力除以仰角的比值 - Score float64 // 当前LastCandleY点本值 - PolarQuadrant string // shangxian,manyue,xiaxian,xinyue, 分别对应圆周的四个阶段。 - LastUpdate int64 - LastUpdateTime string -} -type ShearForceGrp struct { - InstID string - LastUpdate int64 - LastUpdateTime string - Ma30PeriodGroup map[string]ShearItem - Ma7PeriodGroup map[string]ShearItem - From string -} - -// TODO 弃用 -// func (seg *SegmentItem) MakeShearForceGrp(cr *Core) (*ShearForceGrp, error) { -// shg := ShearForceGrp{ -// InstID: seg.InstID, -// Ma30PeriodGroup: map[string]ShearItem{}, -// Ma7PeriodGroup: map[string]ShearItem{}, -// } -// err := shg.ForceUpdate(cr) -// sf1 := float64(0) -// sf1 = seg.LastCandle.Y - seg.LastMa7.Y -// she := ShearItem{ -// LastUpdate: time.Now().UnixMilli(), -// VerticalElevation: seg.SubItemList[2].VerticalElevation, -// Ratio: seg.LastCandle.Y / seg.SubItemList[2].VerticalElevation, -// Score: seg.LastCandle.Score, -// PolarQuadrant: seg.PolarQuadrant, -// } -// if seg.Ctype == "ma7" { -// she.ShearForce = seg.LastCandle.Y -// shg.Ma7PeriodGroup[seg.Period] = she -// } -// if seg.Ctype == "ma30" { -// she.ShearForce = sf1 -// shg.Ma30PeriodGroup[seg.Period] = she -// } -// return &shg, err -// } - -// TODO 弃用 -// func (shg *ShearForceGrp) ForceUpdate(cr *Core) error { -// ctype := "ma7" -// hmName := shg.InstID + "|" + ctype + "|shearForceGrp" -// res, err := cr.RedisLocalCli.HGetAll(hmName).Result() -// -// for k, v := range res { -// si := ShearItem{} -// json.Unmarshal([]byte(v), &si) -// shg.Ma7PeriodGroup[k] = si -// } -// -// ctype = "ma30" -// hmName = shg.InstID + "|" + ctype + "|shearForceGrp" -// res, err = cr.RedisLocalCli.HGetAll(hmName).Result() -// -// for k, v := range res { -// si := ShearItem{} -// json.Unmarshal([]byte(v), &si) -// shg.Ma30PeriodGroup[k] = si -// } -// shg.SetToKey(cr) -// return err -// } -func (she *ShearForceGrp) Show(cr *Core) error { - js, err := json.Marshal(she) - logrus.Info(GetFuncName(), ": ", string(js)) - - return err -} - -// TODO 需要重构: 已经重构 -// 对象数据库落盘 -func (she *ShearForceGrp) SetToKey(cr *Core) error { - keyName := she.InstID + "|shearForce" - she.From = os.Getenv("HOSTNAME") - she.LastUpdateTime = time.Now().Format("2006-01-02 15:04:05.000") - js, err := json.Marshal(she) - if err != nil { - logrus.Panic(GetFuncName(), " err: ", err) - } else { - cr.RedisLocalCli.Set(keyName, string(js), 0).Result() - cr.RedisLocal2Cli.Set(keyName, string(js), 0).Result() - } - return err -} - -func (she *ShearForceGrp) maXPrd(cr *Core, ctype string) { - // 先把对象克隆,防止在处理的过程中对象发生变更 - she2 := *she - she3 := &she2 - // 查了一下,json marshal 有线程安全问题,需要用户自己加锁,先不用了 - // bj, _ := json.Marshal(she3) - // bytes := []byte(bj) - // var she4 ShearForceGrp - // json.Unmarshal(bytes, she4) - // 先声明map - var grp map[string]ShearItem - // 再使用make函数创建一个非nil的map,nil map不能赋值 - grp = make(map[string]ShearItem) - if ctype == "ma7" { - //fmt.Println("len of ma7 she.Ma7PeriodGroup: ", len(she3.Ma7PeriodGroup)) - bj, err := json.Marshal(she3.Ma7PeriodGroup) - if err != nil { - logrus.Panic(GetFuncName(), " err:", err) - } - json.Unmarshal(bj, &grp) - //fmt.Println("len of ma30 she.Ma7PeriodGroup: ", len(she3.Ma7PeriodGroup)) - } else if ctype == "ma30" { - bj, err := json.Marshal(she3.Ma30PeriodGroup) - if err != nil { - logrus.Panic(GetFuncName(), " err: ", err) - } - json.Unmarshal(bj, &grp) - } - for period, shearItem := range grp { - setName := "shearForce|ratio|" + ctype + "|" + period + "|sortedSet" - // TODO:这个key用于判定当前instID|maX|period|的ratio排名是否已经过期 - timelinessKey := "shearForce|ratio|" + she.InstID + "|" + ctype + "|" + period + "|lastUpdate" - sei := SeriesInfo{ - InstID: she3.InstID, - Period: period, - } - // 阈值先暂且设置为 -100 - // SHEARFORCE_VERTICAL_RATE - threahold := float64(SHEARFORCE_VERTICAL_RATE) - bj, _ := json.Marshal(sei) - z := redis.Z{ - Score: float64(shearItem.Ratio), - Member: string(bj), - } - //无论超过阈值,还是低于阈值的负数,都是达标 - if shearItem.Ratio < -1*threahold { - cr.RedisLocalCli.ZAdd(setName, z).Result() - cr.RedisLocalCli.Set(timelinessKey, shearItem.LastUpdate, 3*time.Minute) - } else if shearItem.Ratio > threahold { - cr.RedisLocalCli.ZAdd(setName, z).Result() - cr.RedisLocalCli.Set(timelinessKey, shearItem.LastUpdate, 3*time.Minute) - } else { - cr.RedisLocalCli.ZRem(setName, string(bj)).Result() - } - } -} - -// 把所有引用调用都改成传值调用,试试,看能不能解决那个陈年bug -func (she *ShearForceGrp) AddToRatioSorted(cr *Core) error { - she.maXPrd(cr, "ma7") - she.maXPrd(cr, "ma30") - return nil -} - -// TODO 需要重构: 看了一下,不用重构 -func (she *ShearForceGrp) MakeSnapShot(cr *Core) error { - nw := time.Now() - tm := nw.UnixMilli() - tm = tm - tm%60000 - tms := strconv.FormatInt(tm, 10) - js, err := json.Marshal(she) - - keyName1 := fmt.Sprint(she.InstID + "|shearForce|snapShot|ts:" + tms) - keyName2 := fmt.Sprint(she.InstID + "|shearForce|snapShot|last") - _, err = cr.RedisLocalCli.Set(keyName1, string(js), time.Duration(24)*time.Hour).Result() - _, err = cr.RedisLocalCli.Set(keyName2, string(js), time.Duration(24)*time.Hour).Result() - _, err = cr.RedisLocal2Cli.Set(keyName1, string(js), time.Duration(24)*time.Hour).Result() - _, err = cr.RedisLocal2Cli.Set(keyName2, string(js), time.Duration(24)*time.Hour).Result() - writeLog := os.Getenv("SARDINE_WRITELOG") == "true" - if !writeLog { - return err - } - wg := WriteLog{ - Content: js, - Tag: she.InstID + ".shearForce", - } - go func() { - cr.WriteLogChan <- &wg - }() - return nil -} - -func (sheGrp *ShearForceGrp) Refresh(cr *Core) error { - segments := cr.Cfg.Config.Get("softCandleSegmentList").MustArray() - ma7Grp := map[string]ShearItem{} - ma30Grp := map[string]ShearItem{} - //搜集各个维度未过期的shearItem数据,组合成shearForceGrp对象 - for _, v := range segments { - cs := CandleSegment{} - sv, _ := json.Marshal(v) - json.Unmarshal(sv, &cs) - shi30, err := MakeShearItem(cr, sheGrp.InstID, cs.Seg, "ma30") - if err != nil { - logrus.Warn(GetFuncName(), err) - } else { - ma30Grp[cs.Seg] = *shi30 - } - shi7, err := MakeShearItem(cr, sheGrp.InstID, cs.Seg, "ma7") - if err != nil { - logrus.Warn(GetFuncName(), err) - } else { - ma7Grp[cs.Seg] = *shi7 - } - sheGrp.Ma7PeriodGroup = ma7Grp - sheGrp.Ma30PeriodGroup = ma30Grp - } - return nil -} - -func MakeShearItem(cr *Core, instId string, period string, ctype string) (*ShearItem, error) { - shi := ShearItem{} - keyn := instId + "|" + period + "|" + ctype + "|shearItem" - res, err := cr.RedisLocalCli.Get(keyn).Result() - if err != nil && len(res) == 0 { - return &shi, err - } - json.Unmarshal([]byte(res), &shi) - return &shi, err -} - -func (sheGrp *ShearForceGrp) Process(cr *Core) error { - go func() { - sheGrp.Show(cr) - // 传递过来的shg对象是空的,需要从segmentItem对象创建的shearItem对象组合中来重建 - sheGrp.Refresh(cr) - err := sheGrp.SetToKey(cr) - if err != nil { - logrus.Panic("srs SetToKey err: ", err) - } - // sheGrp.MakeSnapShot(cr) - // 下一个阶段计算 - allow := os.Getenv("SARDINE_MAKEANALYTICS") == "true" - if !allow { - return - } - - periodList := []string{} - for k := range sheGrp.Ma30PeriodGroup { - periodList = append(periodList, k) - } - }() - go func() { - sheGrp.AddToRatioSorted(cr) - }() - go func() { - // 另一个携程中,Analytics对象要读这里snapShot,我希望它读到的是老的而不是新的,所以等待2秒钟 - time.Sleep(2 * time.Second) - sheGrp.MakeSnapShot(cr) - }() - return nil -} diff --git a/internal/market/ticker.go b/internal/market/ticker.go deleted file mode 100644 index 369fcff..0000000 --- a/internal/market/ticker.go +++ /dev/null @@ -1,48 +0,0 @@ -package market - -import ( - "encoding/json" - "time" - - "github.com/phyer/core/internal/core" // 新增 - "github.com/phyer/core/internal/utils" // 新增 -) - -type TickerInfo struct { - Id string `json:"_id"` - InstID string `json:"instID"` - Last float64 `json:"last"` - LastUpdate time.Time `json:"lastUpdate"` - InstType string `json:"instType"` - VolCcy24h float64 `json:"volCcy24h"` - Ts int64 `json:"ts"` -} - -type TickerInfoResp struct { - InstID string `json:"instID"` - Last string `json:"last"` - InstType string `json:"instType"` - VolCcy24h string `json:"volCcy24h"` - Ts string `json:"ts"` -} - -func (tir *TickerInfoResp) Convert() TickerInfo { - ti := TickerInfo{ - Id: utils.HashString(tir.InstID + tir.Ts), - InstID: tir.InstID, - InstType: tir.InstType, - Last: utils.ToFloat64(tir.Last), - VolCcy24h: utils.ToFloat64(tir.VolCcy24h), - Ts: utils.ToInt64(tir.Ts), - LastUpdate: time.Now(), - } - return ti -} - -// TODO 有待实现 -func (ti *TickerInfo) SetToKey(cr *core.Core) error { - js, _ := json.Marshal(*ti) - plateName := ti.InstID + "|tickerInfo" - _, err := cr.RedisLocalCli.Set(plateName, string(js), 0).Result() - return err -} diff --git a/internal/notify/dingding.go b/internal/notify/dingding.go deleted file mode 100644 index dc5d5fa..0000000 --- a/internal/notify/dingding.go +++ /dev/null @@ -1,128 +0,0 @@ -package notify - -import ( - "crypto/hmac" - "crypto/sha256" - "encoding/base64" - "encoding/json" - "errors" - "io/ioutil" - "net/http" - "net/url" - "strconv" - "strings" - "time" - - logrus "github.com/sirupsen/logrus" -) - -const DingdingMsgType_Markdown = "markdown" - -type DingdingMsg struct { - RobotName string - Topic string - Ctype string - Content string - AtAll bool - UniqueCode string -} - -func (dd *DingdingMsg) AddItemListGrp(title string, level int, list []string) error { - pre := "" - if level < 1 { - err := errors.New("level is not allow " + strconv.FormatInt(int64(level), 10)) - return err - } - for i := level; i > 0; i-- { - pre = pre + "#" - } - title = pre + " " + title - dd.Content += "\n" - dd.Content += title - dd.Content += "\n" - for _, v := range list { - dd.Content += v - dd.Content += "\n" - } - return nil -} - -func MakeSign(baseUrl string, secure string, token string, tm time.Time) string { - tsi := tm.UnixMilli() - tsi = tsi - tsi%60000 - tss := strconv.FormatInt(tsi, 10) - sign := tss + "\n" + secure - sign = ComputeHmac256(secure, sign) - sign = url.QueryEscape(sign) - - url := baseUrl + "?access_token=" + token + "×tamp=" + tss + "&sign=" + sign - return url -} - -func (dd *DingdingMsg) MakeContent() []byte { - ctn := map[string]interface{}{} - ctn["msgtype"] = dd.Ctype - if dd.Ctype == DingdingMsgType_Markdown { - md := map[string]interface{}{} - md["title"] = dd.Topic - md["text"] = dd.Content - md["isAtAll"] = dd.AtAll - ctn[DingdingMsgType_Markdown] = md - } - btn, _ := json.Marshal(ctn) - return btn -} - -func ComputeHmac256(secret string, message string) string { - h := hmac.New(sha256.New, []byte(secret)) - h.Write([]byte(message)) - return base64.StdEncoding.EncodeToString(h.Sum(nil)) -} - -func PostHeader(url string, msg []byte, headers map[string]string) (string, error) { - client := &http.Client{} - - req, err := http.NewRequest("POST", url, strings.NewReader(string(msg))) - if err != nil { - return "", err - } - for key, header := range headers { - req.Header.Set(key, header) - } - resp, err := client.Do(req) - if err != nil { - logrus.Warn("postHeader err: ", err) - return "", err - } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return "", err - } - return string(body), nil -} -func (dd *DingdingMsg) PostToRobot(rbt string, cr *Core) (string, error) { - baseUrl, _ := cr.Cfg.Config.Get("dingding").Get("baseUrl").String() - secret, _ := cr.Cfg.Config.Get("dingding").Get("robots").Get(rbt).Get("secret").String() - token, _ := cr.Cfg.Config.Get("dingding").Get("robots").Get(rbt).Get("accessToken").String() - cli := cr.RedisLocalCli - - if len(dd.UniqueCode) > 0 { - unique := "ddPostUnique|" + dd.UniqueCode - exists, _ := cli.Exists(unique).Result() - if exists == 1 { - err := errors.New("20分钟内已经投递过了,不再重复") - return "", err - } - cli.Set(unique, 1, 20*time.Minute).Result() - } - nw := time.Now() - url := MakeSign(baseUrl, secret, token, nw) - - ctn := dd.MakeContent() - headers := make(map[string]string) - headers["Content-Type"] = "application/json;charset=utf-8" - res, err := PostHeader(url, ctn, headers) - logrus.Warn("postToRobot res:", res, string(ctn)) - return res, err -} diff --git a/internal/analysis/maX.go b/maX.go similarity index 67% rename from internal/analysis/maX.go rename to maX.go index 6d7e4e3..ee4dd4c 100644 --- a/internal/analysis/maX.go +++ b/maX.go @@ -1,14 +1,11 @@ -package analysis +package core import ( "encoding/json" "errors" "fmt" logrus "github.com/sirupsen/logrus" - - "github.com/phyer/core/internal/core" - "github.com/phyer/core/internal/models" - "github.com/phyer/core/internal/utils" + // "os" "strconv" "time" ) @@ -39,15 +36,15 @@ type WillMX struct { Count int } -func (mx MaX) SetToKey(cr *core.Core) ([]interface{}, error) { - // fmt.Println(utils.utils.GetFuncName(), " step1 ", mx.InstID, " ", mx.Period) +func (mx MaX) SetToKey(cr *Core) ([]interface{}, error) { + // fmt.Println(utils.GetFuncName(), " step1 ", mx.InstID, " ", mx.Period) // mx.Timestamp, _ = Int64ToTime(mx.Ts) cstr := strconv.Itoa(mx.Count) tss := strconv.FormatInt(mx.Ts, 10) //校验时间戳是否合法 ntm, err := cr.PeriodToLastTime(mx.Period, time.UnixMilli(mx.Ts)) if ntm.UnixMilli() != mx.Ts { - logrus.Warn(fmt.Sprint(utils.GetFuncName(), " candles时间戳有问题 ", " 应该: ", ntm, "实际:", mx.Ts)) + logrus.Warn(fmt.Sprint(GetFuncName(), " candles时间戳有问题 ", " 应该: ", ntm, "实际:", mx.Ts)) mx.Ts = ntm.UnixMilli() } keyName := "ma" + cstr + "|candle" + mx.Period + "|" + mx.InstID + "|ts:" + tss @@ -62,7 +59,7 @@ func (mx MaX) SetToKey(cr *core.Core) ([]interface{}, error) { logrus.Error("max SetToKey err: ", err) return mx.Data, err } - // fmt.Println(utils.utils.GetFuncName(), " step2 ", mx.InstID, " ", mx.Period) + // fmt.Println(utils.GetFuncName(), " step2 ", mx.InstID, " ", mx.Period) // tm := time.UnixMilli(mx.Ts).Format("01-02 15:04") cli := cr.RedisLocalCli if len(string(dj)) == 0 { @@ -70,13 +67,13 @@ func (mx MaX) SetToKey(cr *core.Core) ([]interface{}, error) { err := errors.New("data is block") return mx.Data, err } - // fmt.Println(utils.utils.GetFuncName(), " step3 ", mx.InstID, " ", mx.Period) + // fmt.Println(utils.GetFuncName(), " step3 ", mx.InstID, " ", mx.Period) _, err = cli.Set(keyName, dj, extt).Result() if err != nil { - logrus.Error(utils.GetFuncName(), " maXSetToKey err:", err) + logrus.Error(GetFuncName(), " maXSetToKey err:", err) return mx.Data, err } - // fmt.Println(utils.utils.GetFuncName(), " step4 ", mx.InstID, " ", mx.Period) + // fmt.Println(utils.GetFuncName(), " step4 ", mx.InstID, " ", mx.Period) // fmt.Println("max setToKey: ", keyName, "res:", res, "data:", string(dj), "from: ", mx.From) cr.SaveUniKey(mx.Period, keyName, extt, mx.Ts) return mx.Data, err @@ -96,14 +93,14 @@ func Int64ToTime(ts int64) (time.Time, error) { t = t.In(loc) return t, nil } -func (mx *MaX) PushToWriteLogChan(cr *core.Core) error { +func (mx *MaX) PushToWriteLogChan(cr *Core) error { s := strconv.FormatFloat(float64(mx.Ts), 'f', 0, 64) did := "ma" + ToString(mx.Count) + "|" + mx.InstID + "|" + mx.Period + "|" + s logrus.Debug("did of max:", did) hs := HashString(did) mx.Id = hs md, _ := json.Marshal(mx) - wg := logger.WriteLog{ + wg := WriteLog{ Content: md, Tag: "sardine.log.maX." + mx.Period, Id: hs, @@ -115,7 +112,7 @@ func (mx *MaX) PushToWriteLogChan(cr *core.Core) error { // TODO // 返回: // Sample:被顶出队列的元素 -func (mxl *MaXList) RPush(sm *MaX) (models.Sample, error) { +func (mxl *MaXList) RPush(sm *MaX) (Sample, error) { last := MaX{} bj, _ := json.Marshal(*sm) json.Unmarshal(bj, &sm) @@ -171,38 +168,3 @@ func (mxl *MaXList) RecursiveBubbleS(length int, ctype string) error { err := mxl.RecursiveBubbleS(length, ctype) return err } - -// TODO pixel -func (mxl *MaXList) MakePixelList(cr *core.Core, mx *MaX, score float64) (*core.PixelList, error) { - if len(mx.Data) == 2 { - err := errors.New("ma30 原始数据不足30条") - return nil, err - } - if mx.Data[2] != float64(30) { - err := errors.New("ma30 原始数据不足30条") - return nil, err - } - pxl := core.PixelList{ - Count: mxl.Count, - UpdateNickName: mxl.UpdateNickName, - LastUpdateTime: mxl.LastUpdateTime, - List: []*core.Pixel{}, - } - for i := 0; i < mxl.Count; i++ { - pix := core.Pixel{} - pxl.List = append(pxl.List, &pix) - } - ma30Val := (mx.Data[1]).(float64) - realLens := len(mxl.List) - cha := mxl.Count - realLens - // fmt.Println("mxl.Count: ", mxl.Count, "realLens: ", realLens) - for h := mxl.Count - 1; h-cha >= 0; h-- { - // Count 是希望值,比如24,realLens是实际值, 如果希望值和实际值相等,cha就是0 - cdLast := mxl.List[h-cha].Data[1] - pxl.List[h].Y = (cdLast.(float64) - ma30Val) / ma30Val / score - pxl.List[h].X = float64(h) - pxl.List[h].Score = cdLast.(float64) - pxl.List[h].TimeStamp = int64(mxl.List[h-cha].Data[0].(float64)) - } - return &pxl, nil -} diff --git a/internal/core/pixel.go b/pixel.go similarity index 100% rename from internal/core/pixel.go rename to pixel.go diff --git a/internal/models/plate.go b/plate.go similarity index 99% rename from internal/models/plate.go rename to plate.go index b84cb5f..178274b 100644 --- a/internal/models/plate.go +++ b/plate.go @@ -1,4 +1,4 @@ -package models +package core import ( "encoding/json" diff --git a/internal/analysis/rsi.go b/rsi.go similarity index 87% rename from internal/analysis/rsi.go rename to rsi.go index 5ab3f21..189ba55 100644 --- a/internal/analysis/rsi.go +++ b/rsi.go @@ -1,4 +1,4 @@ -package analysis +package core import ( // "crypto/sha256" @@ -15,15 +15,12 @@ import ( // simple "github.com/bitly/go-simplejson" // "github.com/go-redis/redis" // "github.com/phyer/texus/utils" - "github.com/phyer/core/internal/core" - - "github.com/phyer/core/internal/utils" logrus "github.com/sirupsen/logrus" ) type Rsi struct { Id string `json:"_id"` - core *core.Core + core *Core InstID string `json:"instID"` Period string `json:"period"` Timestamp time.Time `json:"timeStamp"` @@ -41,7 +38,7 @@ type RsiList struct { } type StockRsi struct { Id string `json:"_id"` - core *core.Core + core *Core InstID string `json:"instID"` Period string `json:"period"` Timestamp time.Time `json:"timeStamp"` @@ -59,7 +56,7 @@ type StockRsiList struct { List []*StockRsi `json:"list"` } -func (rsi *Rsi) PushToWriteLogChan(cr *core.Core) error { +func (rsi *Rsi) PushToWriteLogChan(cr *Core) error { did := rsi.InstID + rsi.Period + ToString(rsi.Ts) rsi.Id = HashString(did) cd, err := json.Marshal(rsi) @@ -74,9 +71,9 @@ func (rsi *Rsi) PushToWriteLogChan(cr *core.Core) error { cr.WriteLogChan <- &wg return nil } -func (srsi *StockRsi) PushToWriteLogChan(cr *core.Core) error { +func (srsi *StockRsi) PushToWriteLogChan(cr *Core) error { did := srsi.InstID + srsi.Period + ToString(srsi.Ts) - srsi.Id = util.HashString(did) + srsi.Id = HashString(did) cd, err := json.Marshal(srsi) if err != nil { logrus.Error("PushToWriteLog json marshal rsi err: ", err) diff --git a/ticker.go b/ticker.go new file mode 100644 index 0000000..67f3c85 --- /dev/null +++ b/ticker.go @@ -0,0 +1,84 @@ +package core + +import ( + "encoding/json" + "fmt" + "reflect" + "strconv" + "time" +) + +type TickerInfo struct { + Id string `json:"_id"` + InstID string `json:"instID"` + Last float64 `json:"last"` + LastUpdate time.Time `json:"lastUpdate"` + InstType string `json:"instType"` + VolCcy24h float64 `json:"volCcy24h"` + Ts int64 `json:"ts"` +} + +type TickerInfoResp struct { + InstID string `json:"instID"` + Last string `json:"last"` + InstType string `json:"instType"` + VolCcy24h string `json:"volCcy24h"` + Ts string `json:"ts"` +} + +func (tir *TickerInfoResp) Convert() TickerInfo { + ti := TickerInfo{ + Id: HashString(tir.InstID + tir.Ts), + InstID: tir.InstID, + InstType: tir.InstType, + Last: ToFloat64(tir.Last), + VolCcy24h: ToFloat64(tir.VolCcy24h), + Ts: ToInt64(tir.Ts), + LastUpdate: time.Now(), + } + return ti +} + +func ToString(val interface{}) string { + valstr := "" + if reflect.TypeOf(val).Name() == "string" { + valstr = val.(string) + } else if reflect.TypeOf(val).Name() == "float64" { + valstr = fmt.Sprintf("%f", val) + } else if reflect.TypeOf(val).Name() == "int64" { + valstr = strconv.FormatInt(val.(int64), 16) + } else if reflect.TypeOf(val).Name() == "int" { + valstr = fmt.Sprintf("%d", val) + } + return valstr +} + +func ToInt64(val interface{}) int64 { + vali := int64(0) + if reflect.TypeOf(val).Name() == "string" { + vali, _ = strconv.ParseInt(val.(string), 10, 64) + } else if reflect.TypeOf(val).Name() == "float64" { + vali = int64(val.(float64)) + } + return vali +} + +func ToFloat64(val interface{}) float64 { + valf := float64(0) + if reflect.TypeOf(val).Name() == "string" { + valf, _ = strconv.ParseFloat(val.(string), 64) + } else if reflect.TypeOf(val).Name() == "float64" { + valf = val.(float64) + } else if reflect.TypeOf(val).Name() == "int64" { + valf = float64(val.(int64)) + } + return valf +} + +// TODO 有待实现 +func (ti *TickerInfo) SetToKey(cr *Core) error { + js, _ := json.Marshal(*ti) + plateName := ti.InstID + "|tickerInfo" + _, err := cr.RedisLocalCli.Set(plateName, string(js), 0).Result() + return err +} diff --git a/internal/models/tray.go b/tray.go similarity index 50% rename from internal/models/tray.go rename to tray.go index b3f0188..923207c 100644 --- a/internal/models/tray.go +++ b/tray.go @@ -1,22 +1,22 @@ -package models +package core import ( "encoding/json" ) type Tray struct { - InstID string `json:"instId,string"` - Period string `json:"period,string"` - Count int `json:"count,number"` - Scale float64 `json:"scale,number"` - LastUpdateTime int64 `json:"lastUpdateTime,number"` - SeriesMap map[string]*Series `json:"seriesMap"` + InstID string `json:"instId,string"` + Period string `json:"period,string"` + Count int `json:"count,number"` + Scale float64 `json:"scale,number"` + LastUpdateTime int64 `json:"lastUpdateTime,number"` + // SeriesMap map[string]*Series `json:"seriesMap"` } type PixelSeries struct { Count int64 `json:"count"` Section int64 `json:"section"` - List []*analysis/Pixel `json:"list"` + List []*Pixel `json:"list"` } func (tr *Tray) Init(instId string) { @@ -43,18 +43,18 @@ func (tr *Tray) Analytics(cr *Core) { } // TODO 实例化一个series -func (tr *Tray) NewSeries(cr *Core, period string) (*Series, error) { - sr := Series{ - InstID: tr.InstID, - Period: period, - Count: tr.Count, - Scale: tr.Scale, - CandleSeries: &PixelList{}, - Ma7Series: &PixelList{}, - Ma30Series: &PixelList{}, - } - // 自我更新 - err := sr.Refresh(cr) - tr.SeriesMap["period"+period] = &sr - return &sr, err -} +// func (tr *Tray) NewSeries(cr *Core, period string) (*Series, error) { +// sr := Series{ +// InstID: tr.InstID, +// Period: period, +// Count: tr.Count, +// Scale: tr.Scale, +// CandleSeries: &PixelList{}, +// Ma7Series: &PixelList{}, +// Ma30Series: &PixelList{}, +// } +// // 自我更新 +// err := sr.Refresh(cr) +// tr.SeriesMap["period"+period] = &sr +// return &sr, err +// } diff --git a/internal/utils/util.go b/util.go similarity index 75% rename from internal/utils/util.go rename to util.go index 55abf73..41b02d9 100644 --- a/internal/utils/util.go +++ b/util.go @@ -1,8 +1,7 @@ -package utils +package core import ( "crypto/md5" - "crypto/sha256" "encoding/hex" "encoding/json" "errors" @@ -10,7 +9,6 @@ import ( logrus "github.com/sirupsen/logrus" "math" "math/rand" - "reflect" "runtime" "strconv" "time" @@ -209,47 +207,3 @@ func Md5V(str string) string { h.Write([]byte(str)) return hex.EncodeToString(h.Sum(nil)) } -func ToString(val interface{}) string { - valstr := "" - if reflect.TypeOf(val).Name() == "string" { - valstr = val.(string) - } else if reflect.TypeOf(val).Name() == "float64" { - valstr = fmt.Sprintf("%f", val) - } else if reflect.TypeOf(val).Name() == "int64" { - valstr = strconv.FormatInt(val.(int64), 16) - } else if reflect.TypeOf(val).Name() == "int" { - valstr = fmt.Sprintf("%d", val) - } - return valstr -} - -func ToInt64(val interface{}) int64 { - vali := int64(0) - if reflect.TypeOf(val).Name() == "string" { - vali, _ = strconv.ParseInt(val.(string), 10, 64) - } else if reflect.TypeOf(val).Name() == "float64" { - vali = int64(val.(float64)) - } - return vali -} - -func ToFloat64(val interface{}) float64 { - valf := float64(0) - if reflect.TypeOf(val).Name() == "string" { - valf, _ = strconv.ParseFloat(val.(string), 64) - } else if reflect.TypeOf(val).Name() == "float64" { - valf = val.(float64) - } else if reflect.TypeOf(val).Name() == "int64" { - valf = float64(val.(int64)) - } - return valf -} - -func HashString(input string) string { - // 计算SHA-256哈希值 - hash := sha256.Sum256([]byte(input)) - // 转换为十六进制字符串 - hashHex := hex.EncodeToString(hash[:]) - // 返回前20位 - return hashHex[:23] -} diff --git a/internal/logger/writeLog.go b/writeLog.go similarity index 83% rename from internal/logger/writeLog.go rename to writeLog.go index 71b57d4..fd049ef 100644 --- a/internal/logger/writeLog.go +++ b/writeLog.go @@ -1,4 +1,4 @@ -package logger +package core import ( "bytes" @@ -6,7 +6,6 @@ import ( "net/http" "os" - // "github.com/phyer/core/internal/core" logrus "github.com/sirupsen/logrus" ) @@ -16,7 +15,7 @@ type WriteLog struct { Id string } -func (wg *WriteLog) Process(cr *core.Core) error { +func (wg *WriteLog) Process(cr *Core) error { go func() { reqBody := bytes.NewBuffer(wg.Content) cr.Env = os.Getenv("GO_ENV")