Compare commits
8 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
35e8f47bbf | ||
![]() |
51f087589a | ||
![]() |
9a83607b2c | ||
![]() |
6debb547e1 | ||
![]() |
3132c2966d | ||
![]() |
6cb2bcb49b | ||
![]() |
7ce7deba92 | ||
![]() |
3f2c113e88 |
6
.vscode/settings.json
vendored
Normal file
6
.vscode/settings.json
vendored
Normal file
@ -0,0 +1,6 @@
|
||||
{
|
||||
"marscode.codeCompletionPro": {
|
||||
"enableCodeCompletionPro": false
|
||||
},
|
||||
"marscode.enableInlineCommand": true
|
||||
}
|
3
build.sh
Executable file
3
build.sh
Executable file
@ -0,0 +1,3 @@
|
||||
#!/bin/sh
|
||||
go build -ldflags "-s -w" -trimpath -o ./bin/core ./cmd/core
|
||||
rm bin/core
|
305
chat2.md
Normal file
305
chat2.md
Normal file
@ -0,0 +1,305 @@
|
||||
> 这里,我引入 github.com/phyer/core/service 为的是让 Candle struct里的 属性 core 得到正确的类型, 结果出现了 循环以来, 那么我该如何用刚才提到的依赖注入和接口隔离的方式来规避
|
||||
>
|
||||
|
||||
🤔 Thought content:
|
||||
|
||||
|
||||
首先,我得理解问题所在。循环依赖通常发生在两个或多个包相互引用的时候。比如,如果model包引用了service包,而service包又引用了model包,就会形成循环。用户的结构体Candle有一个core字段,类型是service.Core,这可能导致model包依赖service包,而如果service包也依赖model包,就会循环。
|
||||
|
||||
接下来,用户想通过依赖注入和接口隔离来解决。依赖注入的意思是,不直接持有具体类型的实例,而是通过接口来抽象,从而解耦。接口隔离则是定义小接口,只暴露必要的方法,减少依赖。
|
||||
|
||||
我需要先检查用户的代码。在提供的代码中,Candle结构体的core字段类型是*service.Core。但用户提到引入了"github.com/phyer/core/service",这可能是指service包中的Core结构体。如果model包(即当前文件所在的包)引用了service包,而service包又引用了model包,就会形成循环。
|
||||
|
||||
解决方案应该是将core字段的类型从具体的service.Core改为一个接口类型,该接口包含Candle所需的方法。这样,model包就不再直接依赖service包,而是依赖一个本地定义的接口。service包中的Core结构体可以实现这个接口,从而解除循环依赖。
|
||||
|
||||
具体步骤可能如下:
|
||||
|
||||
1. 在model包中定义一个接口,比如CoreService,包含Candle结构体需要使用的方法。例如,如果Candle的core字段需要调用Cfg、RedisLocalCli等方法,就在接口中声明这些方法。
|
||||
|
||||
2. 将Candle结构体中的core字段类型改为这个接口类型,比如core CoreService。
|
||||
|
||||
3. 在service包中,确保Core结构体实现了model.CoreService接口的所有方法。这样,当service包导入model包时,不会有循环,因为model包现在只依赖接口,而接口在model包内部定义。
|
||||
|
||||
4. 调整代码中所有使用core字段的地方,确保通过接口方法访问,而不是直接访问结构体字段。
|
||||
|
||||
例如,用户代码中Candle结构体的core字段是*service.Core类型,改为CoreService接口类型后,循环依赖就被打破了。因为model包现在只依赖自己定义的接口,而service包实现该接口,不需要反过来引用model包中的具体类型。
|
||||
|
||||
需要检查用户提供的代码中core字段被使用的地方。例如,在SaveCandle方法中,core.Cfg.Config.Get这样的调用,说明Core结构体需要有Cfg字段。因此在CoreService接口中,可能需要一个GetCfg()方法,返回配置对象。
|
||||
|
||||
这样,model包中的代码通过接口方法访问core的功能,而不直接依赖service包的具体实现。service包中的Core结构体实现这个接口,从而解除了循环依赖。
|
||||
|
||||
最后,用户需要修改Candle结构体的core字段类型,定义接口,并调整所有使用core的地方,改为调用接口方法。这样就能避免循环依赖的问题。我们可以通过以下步骤用接口隔离和依赖注入解决循环依赖问题:
|
||||
|
||||
1. 首先在model包中定义核心服务需要的接口
|
||||
2. 修改Candle结构体使用接口类型
|
||||
3. 保持service包实现这个接口
|
||||
|
||||
具体修改如下:
|
||||
|
||||
Filepath: model/candle.go
|
||||
Replace lines: 23-26
|
||||
```go
|
||||
// 定义核心服务接口
|
||||
type CoreService interface {
|
||||
GetCfg() *simple.Json
|
||||
GetRedisLocalCli() *redis.Client
|
||||
AddToGeneralCandleChnl(candle *Candle, channels []string)
|
||||
PeriodToMinutes(period string) (int64, error)
|
||||
// 其他需要的方法...
|
||||
}
|
||||
|
||||
type Candle struct {
|
||||
Id string `json:"_id"`
|
||||
core CoreService // 改为接口类型
|
||||
InstID string `json:"instID"`
|
||||
```
|
||||
|
||||
需要确保在service包中让Core结构体实现这个接口:
|
||||
|
||||
```go
|
||||
// 在service包中
|
||||
func (c *Core) GetCfg() *simple.Json {
|
||||
return c.Cfg
|
||||
}
|
||||
|
||||
func (c *Core) GetRedisLocalCli() *redis.Client {
|
||||
return c.RedisLocalCli
|
||||
}
|
||||
|
||||
func (c *Core) AddToGeneralCandleChnl(candle *model.Candle, channels []string) {
|
||||
// 原有实现...
|
||||
}
|
||||
|
||||
func (c *Core) PeriodToMinutes(period string) (int64, error) {
|
||||
// 原有实现...
|
||||
}
|
||||
```
|
||||
|
||||
这种修改的好处:
|
||||
1. 解除了model和service包的循环依赖
|
||||
2. model包只依赖自己定义的接口
|
||||
3. service包实现model定义的接口
|
||||
4. 更容易进行单元测试(可以mock核心服务)
|
||||
5. 明确声明了依赖关系和方法契约
|
||||
|
||||
需要同步修改所有使用core字段的地方,例如:
|
||||
Filepath: model/candle.go
|
||||
Replace lines: 0-0
|
||||
```go
|
||||
core.GetCfg().Get(...)
|
||||
```
|
||||
|
||||
这样的接口隔离配合依赖注入(在创建Candle时传入实现了CoreService的对象),可以有效解决循环依赖问题。
|
||||
|
||||
---
|
||||
|
||||
- Datetime: 2025-02-21 13:48:10
|
||||
|
||||
- Model: alidsr1/deepseek-r1
|
||||
|
||||
- Selected files:
|
||||
- model/candle.go
|
||||
|
||||
- Selected code:
|
||||
|
||||
```go
|
||||
|
||||
simple "github.com/bitly/go-simplejson"
|
||||
"github.com/go-redis/redis"
|
||||
"github.com/phyer/core/service"
|
||||
"github.com/phyer/texus/utils"
|
||||
logrus "github.com/sirupsen/logrus"
|
||||
)
|
||||
```
|
||||
|
||||
> 这个model包自己定义的接口能放到util包下吗, util包不依赖任何其他包,
|
||||
|
||||
🤔 Thought content:
|
||||
|
||||
> 首先,用户提供的代码中,model包定义了几个接口,比如CoreService、Sample、SampleList,以及结构体如Candle、CandleList等。这些接口可能被其他包或model内部的结构体和方法所依赖。
|
||||
|
||||
|
||||
用户的问题是将这些接口移到util包下是否可行,同时确保util包不依赖其他包。这意味着,我需要检查这些接口是否引用了model包或其他包的类型。如果接口中的方法参数或返回值使用了model或其他包的类型,那么将这些接口移到util包会导致util包依赖这些类型所在的包,从而违反util包不依赖其他包的条件。
|
||||
|
||||
查看代码中的CoreService接口,它的方法如GetCfg()返回了*simple.Json,而simple来自"github.com/bitly/go-simplejson",这属于外部依赖。同样,GetRedisLocalCli()返回*redis.Client,来自"github.com/go-redis/redis"。还有AddToGeneralCandleChnl方法参数中有*Candle,这是model包内的类型。因此,CoreService接口依赖了外部包和model包本身,无法移到util包,因为util不能有这些依赖。
|
||||
|
||||
接下来是Sample接口,SetToKey方法参数是*CoreService,同样CoreService在model中定义,并且CoreService本身依赖了外部包。所以Sample接口也不能移到util。
|
||||
|
||||
SampleList接口中的方法返回[]*Sample,而Sample是当前包的接口,同样涉及model包的依赖。
|
||||
|
||||
因此,结论是这些接口不能移到util包,因为它们引用了model包内或外部的类型,导致util包会产生依赖。需要保持接口在model包中,或者重新设计接口以避免依赖,但这可能影响现有功能。
|
||||
|
||||
最后,向用户解释这些分析,说明原因,并建议保持接口在model包中,或者考虑其他重构方式。通过分析代码,model包中定义的接口存在以下依赖问题,无法直接移动到独立的util包:
|
||||
|
||||
1. CoreService接口依赖了其他包的组件:
|
||||
```go
|
||||
type CoreService interface {
|
||||
GetCfg() *simple.Json // 依赖github.com/bitly/go-simplejson
|
||||
GetRedisLocalCli() *redis.Client // 依赖github.com/go-redis/redis
|
||||
AddToGeneralCandleChnl(candle *Candle, channels []string) // 依赖本包Candle类型
|
||||
//...
|
||||
}
|
||||
```
|
||||
|
||||
2. Sample接口也依赖了本包类型:
|
||||
```go
|
||||
type Sample interface {
|
||||
SetToKey(cr *CoreService) ([]interface{}, error) // 参数依赖本包CoreService
|
||||
}
|
||||
```
|
||||
|
||||
3. SampleList接口依赖本包类型:
|
||||
```go
|
||||
type SampleList interface {
|
||||
GetSectionOf(start int, end int) ([]*Sample, error) // 返回本包Sample类型
|
||||
}
|
||||
```
|
||||
|
||||
建议方案:
|
||||
1. 保持接口定义在model包中(当前最佳方案)
|
||||
2. 如果确实需要独立util包,需要解耦依赖:
|
||||
```go
|
||||
// 需要先定义基础接口(在util包)
|
||||
type BasicService interface {
|
||||
// 只包含最基础的方法,不依赖具体实现
|
||||
}
|
||||
|
||||
// 原model包中扩展接口
|
||||
type CoreService interface {
|
||||
util.BasicService // 组合基础接口
|
||||
// 保留原项目特定方法
|
||||
}
|
||||
```
|
||||
|
||||
需要我继续提供解耦方案的具体实现代码吗?这需要对现有代码结构进行较大调整。
|
||||
|
||||
---
|
||||
|
||||
- Datetime: 2025-02-21 14:06:06
|
||||
|
||||
- Model: alidsr1/deepseek-r1
|
||||
|
||||
- Selected files:
|
||||
- core/interfaces.go
|
||||
- config/config.go
|
||||
- config/const.go
|
||||
- data/coaster.go
|
||||
- data/pixel.go
|
||||
- data/plate.go
|
||||
- data/rsi.go
|
||||
- data/segmentItem.go
|
||||
- data/series.go
|
||||
- data/shearForceGrp.go
|
||||
- data/sorted.go
|
||||
- data/tray.go
|
||||
- model/candle.go
|
||||
- model/maX.go
|
||||
- model/ticker.go
|
||||
- service/service_context.go
|
||||
- util/util.go
|
||||
- util/writeLog.go
|
||||
|
||||
- Selected code:
|
||||
|
||||
```go
|
||||
// 领域服务接口
|
||||
type CandleManager interface {
|
||||
GetCandles(instID string, period string) ([]*Candle, error)
|
||||
SaveCandle(candle *Candle) error
|
||||
}
|
||||
|
||||
// 基础设施接口
|
||||
type RedisService interface {
|
||||
GetClient(options *redis.Options) (*redis.Client, error)
|
||||
Ping(client *redis.Client) error
|
||||
}
|
||||
```
|
||||
|
||||
> 那既然model模块中定义接口, 让 service模块里的 core的struct去实现这个接口, 那么 core/interfaces.go里的接口是做什么的
|
||||
>
|
||||
|
||||
🤔 Thought content:
|
||||
让 service模块里的 core的struct去实现这个接口, 那么 core/interfaces.go里的接口是做什么的”。首先,我需要理解用户的问题背景和他们可能存在的困惑。
|
||||
|
||||
用户提到在model模块中定义了接口,并让service模块的core结构体实现这些接口,然后问core/interfaces.go中的接口的作用。我需要先看看提供的代码内容,特别是interfaces.go文件中的内容。
|
||||
|
||||
查看用户提供的core/interfaces.go文件,里面定义了多个接口,如CandleManager、RedisService、HTTPRequester、EventPublisher等。这些接口似乎是领域服务和基础设施接口的抽象。例如,CandleManager定义了获取和保存蜡烛图数据的方法,而RedisService涉及Redis客户端的获取和健康检查。
|
||||
|
||||
用户可能混淆了不同层次接口的用途。model模块中的接口可能是领域模型相关的,而core/interfaces.go中的接口可能是更上层的应用服务或基础设施接口。需要明确两者在架构中的不同角色。
|
||||
|
||||
接下来,我应该解释接口的分层设计。在六边形架构或清洁架构中,通常会有领域层、应用层和基础设施层。领域层接口定义核心业务逻辑,应用层接口定义用例,基础设施层实现具体技术细节。core/interfaces.go中的接口可能属于应用层或基础设施层,而model中的接口属于领域层。
|
||||
|
||||
需要指出core/interfaces.go中的接口作用,例如CandleManager作为领域服务接口,由service层的Core结构体实现,这样可以将业务逻辑与技术实现分离,提高可测试性和可维护性。RedisService则是基础设施接口,由具体的Redis客户端实现,遵循依赖倒置原则,允许更换底层技术而不影响上层逻辑。
|
||||
|
||||
总结时,应强调core/interfaces.go中的接口定义了服务层和基础设施的契约,而model中的接口可能更偏向领域模型,两者各司其职,共同支持系统的松耦合设计。通过分析代码结构和架构设计,可以明确不同接口的职责定位:
|
||||
|
||||
1. `core/interfaces.go` 中的接口是领域层与基础设施层的契约
|
||||
```go
|
||||
// 领域服务接口(面向业务能力)
|
||||
type CandleManager interface {
|
||||
GetCandles(instID string, period string) ([]*Candle, error)
|
||||
SaveCandle(candle *Candle) error
|
||||
}
|
||||
|
||||
// 基础设施接口(面向技术实现)
|
||||
type RedisService interface {
|
||||
GetClient(options *redis.Options) (*redis.Client, error)
|
||||
Ping(client *redis.Client) error
|
||||
}
|
||||
```
|
||||
|
||||
2. 具体实现关系示意图
|
||||
```
|
||||
┌───────────────┐ ┌───────────────┐
|
||||
│ Domain │ │ Infrastructure│
|
||||
│ (model包) │<─依赖注入─┤ (service包) │
|
||||
└──────┬────────┘ └──────┬────────┘
|
||||
│ │
|
||||
│ core/interfaces.go │
|
||||
│ ┌───────────────────┐ │
|
||||
└─>│ CandleManager │ │
|
||||
│ HTTPRequester │<──┘
|
||||
└───────────────────┘
|
||||
```
|
||||
|
||||
3. 关键区别说明:
|
||||
|
||||
| 维度 | model包接口 | core/interfaces.go接口 |
|
||||
|--------------------|-------------------------------------|-----------------------------------|
|
||||
| **定位** | 领域模型内部契约 | 跨层协作的抽象边界 |
|
||||
| **使用者** | 领域对象之间相互调用 | 业务服务与基础设施实现的对接点 |
|
||||
| **变化频率** | 随业务需求变化 | 随系统架构演进变化 |
|
||||
| **实现位置** | 领域层内部实现 | 基础设施层(如redis/http实现) |
|
||||
| **示例** | 如Candle实体行为 | 如RedisService定义存储访问规范 |
|
||||
|
||||
4. 设计意图解析:
|
||||
- 控制反转(IoC):通过`core/interfaces.go`定义抽象,允许service层实现具体逻辑
|
||||
- 解耦分层:领域层不直接依赖具体redis/http实现,而是通过接口抽象
|
||||
- 可测试性:Mock这些接口可以方便进行单元测试
|
||||
- 可扩展性:更换存储方案只需实现相同接口,不影响业务逻辑
|
||||
|
||||
5. 典型工作流程示例:
|
||||
```go
|
||||
// 基础设施层实现接口
|
||||
type Core struct {
|
||||
redisService RedisService // 依赖接口
|
||||
}
|
||||
|
||||
func (c *Core) GetClient(opts *redis.Options) (*redis.Client, error) {
|
||||
// 具体实现redis连接逻辑
|
||||
}
|
||||
|
||||
// 领域层通过接口使用
|
||||
func ProcessCandle(cm CandleManager) {
|
||||
cm.SaveCandle(&Candle{...}) // 实际调用Core的实现
|
||||
}
|
||||
```
|
||||
|
||||
这种设计模式遵循了SOLID原则中的:
|
||||
- 依赖倒置原则(DIP)
|
||||
- 接口隔离原则(ISP)
|
||||
- 开闭原则(OCP)
|
||||
|
||||
通过这种分层,当需要替换存储方案(如从Redis改为MySQL)时,只需实现相同的RedisService接口,而无需修改业务逻辑代码。
|
||||
|
||||
|
36
core/interfaces.go
Normal file
36
core/interfaces.go
Normal file
@ -0,0 +1,36 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"github.com/go-redis/redis"
|
||||
"github.com/phyer/core/model"
|
||||
"time"
|
||||
)
|
||||
|
||||
// 领域服务接口
|
||||
type CandleManager interface {
|
||||
GetCandles(instID string, period string) ([]*model.Candle, error)
|
||||
SaveCandle(candle *model.Candle) error
|
||||
}
|
||||
|
||||
// 基础设施接口
|
||||
type RedisService interface {
|
||||
GetClient(options *redis.Options) (*redis.Client, error)
|
||||
Ping(client *redis.Client) error
|
||||
}
|
||||
|
||||
type HTTPRequester interface {
|
||||
Get(url string) ([]byte, error)
|
||||
Post(url string, body []byte) ([]byte, error)
|
||||
}
|
||||
|
||||
// 时间段相关服务接口
|
||||
type SegmentService interface {
|
||||
GetCurrentSegment(period string) *model.CandleSegment
|
||||
ParseSegmentTime(timeStr string) (time.Time, error)
|
||||
}
|
||||
|
||||
// 领域事件接口
|
||||
type EventPublisher interface {
|
||||
Publish(topic string, message interface{}) error
|
||||
Subscribe(topic string, handler func(message []byte)) error
|
||||
}
|
318
data/segmentItem.go
Normal file
318
data/segmentItem.go
Normal file
@ -0,0 +1,318 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
logrus "github.com/sirupsen/logrus"
|
||||
"phyer.click/sardine/utils"
|
||||
)
|
||||
|
||||
// 段对象是对某个线段的表现进行评估的一个手段, 整个段会被分成3个小段, 整个段,计算整体的,字段,各自计算。包含仰角,段内极值等。
|
||||
// SegmentItem 属于一阶分析结果
|
||||
// {DMD-USDT 5D ma30 1643240793839 0 1642867200000 0xc00835fd80 0xc001687600 NaN 23 23 []}]
|
||||
type SegmentItem struct {
|
||||
InstID string
|
||||
Period string //通过InstID,Periods可以定位到Series对象, 里面有一手数据
|
||||
Ctype string //candle|ma7|ma30
|
||||
ReportTime int64
|
||||
ReportTimeStr string
|
||||
PolarQuadrant string // shangxian,manyue,xiaxian,xinyue, 分别对应圆周的四个阶段。
|
||||
LastUpdate int64
|
||||
ExtremumPixels *Extremum // 极值 是两个pixel对象
|
||||
FirstPixel *Pixel // 起始值,最后的pixel对象
|
||||
LastPixel *Pixel // 最后值,最后的maX pixel对象
|
||||
LastCandle *Pixel // 最后值,最后的Candle的pixel对象
|
||||
LastMa7 *Pixel // 最后值,最后的Ma7的pixel对象
|
||||
LastMa30 *Pixel // 最后值,最后的Ma30的pixel对象
|
||||
VerticalElevation float64 // 仰角, Interval范围内线段的仰角
|
||||
StartIdx int // 开始的坐标
|
||||
EndIdx int // 结束的坐标
|
||||
SubItemList []SegmentItem //往下一级微分
|
||||
}
|
||||
|
||||
const DAMANYUE = "damanyue"
|
||||
const DAMANYUE_POST = "damanyue_post"
|
||||
const DAMANYUE_PRE = "damanyue_pre"
|
||||
const XIAOMANYUE = "xiaomanyue"
|
||||
const XIAOMANYUE_POST = "xiaomanyue_post"
|
||||
const XIAOMANYUE_PRE = "xiaomanyue_pre"
|
||||
const DAXINYUE = "daxinyue"
|
||||
const DAXINYUE_POST = "daxinyue_post"
|
||||
const DAXINYUE_PRE = "daxinyue_pre"
|
||||
const XIAOXINYUE = "xiaoxinyue"
|
||||
const XIAOXINYUE_POST = "xiaoxinyue_post"
|
||||
const XIAOXINYUE_PRE = "xiaoxinyue_pre"
|
||||
const DASHANGXIANYUE = "dashangxianyue"
|
||||
const XIAOSHANGXIANYUE = "xiaoshangxianyue"
|
||||
const DAXIAXIANYUE = "daxiaxianyue"
|
||||
const XIAOXIAXIANYUE = "xiaoxiaxianyue"
|
||||
|
||||
const tinySeg = 0.1
|
||||
|
||||
type Extremum struct {
|
||||
Max *Pixel
|
||||
Min *Pixel
|
||||
}
|
||||
|
||||
func CalPolar(e0 float64, e1 float64, e2 float64) string {
|
||||
polarQuadrant := "default"
|
||||
//
|
||||
// ## 上弦月
|
||||
//
|
||||
// e0,e1,e2: -3.5315694477826525 -0.5773082714100172 1.0558744145515746
|
||||
if e2 >= e1 && e2 >= 0 {
|
||||
polarQuadrant = XIAOSHANGXIANYUE
|
||||
}
|
||||
// e2 > e1 > 0: 小上弦月
|
||||
// -> e1 > e0 > 0 : 大上弦月
|
||||
if e2 >= e1 && e1 >= 0 {
|
||||
polarQuadrant = XIAOSHANGXIANYUE
|
||||
if e1 >= e0 && e0 >= 0 {
|
||||
polarQuadrant = DASHANGXIANYUE
|
||||
}
|
||||
}
|
||||
//
|
||||
// ## 下弦月
|
||||
// 0 > e1 > e2:小下弦月
|
||||
// -> 0 > e0 > e1: 大下弦月
|
||||
//
|
||||
if 0 >= e1 && e1 >= e2 {
|
||||
polarQuadrant = XIAOXIAXIANYUE
|
||||
if 0 >= e0 && e0 >= e1 {
|
||||
polarQuadrant = DAXIAXIANYUE
|
||||
}
|
||||
}
|
||||
|
||||
// ## 同上
|
||||
if (0 >= e2 && 0 >= e1) && e2 >= e1 {
|
||||
polarQuadrant = XIAOXIAXIANYUE
|
||||
}
|
||||
// ##
|
||||
// ## 满月
|
||||
//
|
||||
// e1 > e2 > 0 : 小满月 pre
|
||||
// -> e0 > e1 : 大满月pre
|
||||
//
|
||||
|
||||
if e1 >= e2 && e2 >= 0 {
|
||||
polarQuadrant = XIAOMANYUE_PRE
|
||||
if e0 > e1 {
|
||||
polarQuadrant = DAMANYUE_PRE
|
||||
}
|
||||
}
|
||||
// e1 > 0.1 > e2 > 0 : 小满月
|
||||
// -> e0 > e1 : 大满月
|
||||
//
|
||||
|
||||
if e1 >= tinySeg && tinySeg >= e2 && e2 >= 0 {
|
||||
polarQuadrant = XIAOMANYUE
|
||||
if e0 > e1 {
|
||||
polarQuadrant = DAMANYUE
|
||||
}
|
||||
}
|
||||
|
||||
// e0,e1,e2: 0.9699903789854316 0.1802190672652184 -1.7888783234326784
|
||||
// e1 > 0 > e2 > -0.1 : 小满月post
|
||||
// -> e0 > e1 > 0 : 大满月post
|
||||
//
|
||||
if e1 >= 0 && 0 >= e2 && e2 >= -100000 {
|
||||
polarQuadrant = XIAOMANYUE_POST
|
||||
if e0 > e1 {
|
||||
polarQuadrant = DAMANYUE_POST
|
||||
}
|
||||
}
|
||||
// e0,e1,e2: -0.049579775302532776 0 -0.018291567587323976
|
||||
// ## 新月
|
||||
// e1 < e2 <0: 小新月pre
|
||||
// -> e1 > e0 : 大新月pre
|
||||
//
|
||||
if e1 <= e2 && e2 <= 0 && e2 >= -1*tinySeg {
|
||||
polarQuadrant = XIAOXINYUE_PRE
|
||||
if e1 > e0 {
|
||||
polarQuadrant = DAXINYUE_PRE
|
||||
}
|
||||
}
|
||||
// e1 < -0.1 < e2 <0 小新月
|
||||
// -> e1 > e0 : 大新月
|
||||
if e1 <= -1*tinySeg && -1*tinySeg <= e2 && e2 <= 0 {
|
||||
polarQuadrant = XIAOXINYUE
|
||||
if e1 > e0 {
|
||||
polarQuadrant = DAXINYUE
|
||||
}
|
||||
}
|
||||
//
|
||||
// e1 < 0 < e2 < 0.1 小新月post
|
||||
// -> e1 > e0 : 大新月post
|
||||
//e0,e1,e2: -0.03902244287114438 -0.13929829606729519 0.14828528291036536
|
||||
if e1 <= 0 && 0 <= e2 && e2 <= 1000000 {
|
||||
polarQuadrant = XIAOXINYUE_POST
|
||||
if e1 > e0 {
|
||||
polarQuadrant = DAXINYUE_POST
|
||||
}
|
||||
}
|
||||
|
||||
return polarQuadrant
|
||||
}
|
||||
|
||||
// 计算当前某段的曲线正弦所处极坐标象限
|
||||
func CalPolarQuadrant(maXSeg *SegmentItem) string {
|
||||
if len(maXSeg.SubItemList) == 0 {
|
||||
return "subItem no polarQuadrant"
|
||||
}
|
||||
m0 := maXSeg.SubItemList[0]
|
||||
m1 := maXSeg.SubItemList[1]
|
||||
m2 := maXSeg.SubItemList[2]
|
||||
e0 := m0.VerticalElevation
|
||||
e1 := m1.VerticalElevation
|
||||
e2 := m2.VerticalElevation
|
||||
polarQuadrant := CalPolar(e0, e1, e2)
|
||||
if polarQuadrant == "default" {
|
||||
env := os.Getenv("GO_ENV")
|
||||
if env != "production" {
|
||||
fmt.Println(utils.GetFuncName(), " instId:", maXSeg.InstID, " period:", maXSeg.Period, " ctype", maXSeg.Ctype, " e0,e1,e2:", e0, e1, e2)
|
||||
}
|
||||
}
|
||||
|
||||
return polarQuadrant
|
||||
}
|
||||
|
||||
func (seg *SegmentItem) SetToKey(cr *Core) error {
|
||||
if seg.InstID == "USDC-USDT" {
|
||||
return nil
|
||||
}
|
||||
keyName := seg.InstID + "|" + seg.Period + "|" + seg.Ctype + "|segmentItem"
|
||||
bj, err := json.Marshal(seg)
|
||||
if err != nil {
|
||||
logrus.Warn("se.MakeSegment: ", err, seg)
|
||||
}
|
||||
cr.RedisLocalCli.Set(keyName, string(bj), 0)
|
||||
|
||||
sf7 := float64(0)
|
||||
sf7 = seg.LastCandle.Y - seg.LastMa7.Y
|
||||
sf30 := float64(0)
|
||||
sf30 = seg.LastCandle.Y
|
||||
tms := time.Now().Format("2006-01-02 15:04:05.000")
|
||||
// fmt.Println("tms: ", seg.InstID, seg.Period, tms, seg.LastUpdate)
|
||||
she := ShearItem{
|
||||
LastUpdate: time.Now().UnixMilli(),
|
||||
LastUpdateTime: tms,
|
||||
VerticalElevation: seg.SubItemList[2].VerticalElevation,
|
||||
Ratio: seg.LastCandle.Y / seg.SubItemList[2].VerticalElevation,
|
||||
Score: seg.LastCandle.Score,
|
||||
PolarQuadrant: seg.PolarQuadrant,
|
||||
}
|
||||
|
||||
if seg.Ctype == "ma7" {
|
||||
she.ShearForce = sf7
|
||||
}
|
||||
|
||||
if seg.Ctype == "ma30" {
|
||||
she.ShearForce = sf30
|
||||
}
|
||||
sbj, _ := json.Marshal(she)
|
||||
keyName = seg.InstID + "|" + seg.Period + "|" + seg.Ctype + "|shearItem"
|
||||
cr.RedisLocalCli.Set(keyName, string(sbj), 3*time.Minute)
|
||||
cr.RedisLocal2Cli.Set(keyName, string(sbj), 3*time.Minute)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (seg *SegmentItem) Show() error {
|
||||
if seg.InstID == "USDC-USDT" {
|
||||
return nil
|
||||
}
|
||||
bj, _ := json.Marshal(*seg)
|
||||
logrus.Warn("SegmentItem Show:", string(bj))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (jgm *SegmentItem) Report(cr *Core) error {
|
||||
return nil
|
||||
}
|
||||
func (seg *SegmentItem) Process(cr *Core) {
|
||||
go func() {
|
||||
if seg == nil {
|
||||
return
|
||||
}
|
||||
seg.Show()
|
||||
seg.SetToKey(cr)
|
||||
// sheGrp, err := seg.MakeShearForceGrp(cr)
|
||||
// if err != nil {
|
||||
// log.Panic(err)
|
||||
// }
|
||||
// 当最后一个维度数据更新后,触发显示和备份
|
||||
|
||||
// 空的就可以
|
||||
shg := ShearForceGrp{
|
||||
InstID: seg.InstID,
|
||||
Ma30PeriodGroup: map[string]ShearItem{},
|
||||
Ma7PeriodGroup: map[string]ShearItem{},
|
||||
}
|
||||
if seg.Period == "4H" {
|
||||
time.Sleep(50 * time.Millisecond) //等可能存在的5D也ready
|
||||
go func() {
|
||||
cr.ShearForceGrpChan <- &shg
|
||||
}()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (srs *Series) MakeSegment(cr *Core, start int, end int, subArys [][]int, ctype string) *SegmentItem {
|
||||
list := []*Pixel{}
|
||||
if ctype == "ma7" {
|
||||
list = srs.Ma7Series.List
|
||||
}
|
||||
if ctype == "ma30" {
|
||||
list = srs.Ma30Series.List
|
||||
}
|
||||
st := start
|
||||
if len(list) == 0 {
|
||||
return nil
|
||||
}
|
||||
for i := start; i <= end; i++ {
|
||||
if list[i].X == 0 && list[i].Y == 0 {
|
||||
if i+1 < len(list) {
|
||||
st = i + 1
|
||||
} else {
|
||||
logrus.Panic(utils.GetFuncName(), "没有符合的记录")
|
||||
}
|
||||
}
|
||||
}
|
||||
extra, _ := srs.GetExtremum(cr, st, end, ctype)
|
||||
yj, err := srs.GetElevation(cr, ctype, st, end)
|
||||
if err != nil {
|
||||
fmt.Println("MakeSegment GetElevation err : ", err)
|
||||
}
|
||||
tm := time.Now()
|
||||
seg := SegmentItem{
|
||||
InstID: srs.InstID,
|
||||
Period: srs.Period,
|
||||
ReportTime: tm.UnixMilli(),
|
||||
ReportTimeStr: tm.Format("2006-01-02 15:04:05.000"),
|
||||
LastUpdate: srs.LastUpdateTime,
|
||||
FirstPixel: list[st],
|
||||
LastPixel: list[end],
|
||||
ExtremumPixels: extra,
|
||||
Ctype: ctype,
|
||||
VerticalElevation: yj,
|
||||
StartIdx: st,
|
||||
EndIdx: end,
|
||||
LastCandle: srs.CandleSeries.List[end],
|
||||
LastMa7: srs.Ma7Series.List[end],
|
||||
SubItemList: []SegmentItem{},
|
||||
PolarQuadrant: "none",
|
||||
}
|
||||
|
||||
if len(subArys) > 0 {
|
||||
for _, pair := range subArys {
|
||||
sub := [][]int{}
|
||||
curSeg := srs.MakeSegment(cr, pair[0], pair[1], sub, ctype)
|
||||
seg.SubItemList = append(seg.SubItemList, *curSeg)
|
||||
}
|
||||
}
|
||||
polar := CalPolarQuadrant(&seg)
|
||||
seg.PolarQuadrant = polar
|
||||
return &seg
|
||||
}
|
566
data/series.go
Normal file
566
data/series.go
Normal file
@ -0,0 +1,566 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis"
|
||||
logrus "github.com/sirupsen/logrus"
|
||||
// "phyer.click/sardine/utils"
|
||||
)
|
||||
|
||||
type Series struct {
|
||||
InstID string `json:"instID"`
|
||||
Period string `json:"Period"`
|
||||
Count int `json:"count,number"`
|
||||
Scale float64 `json:"scale,number"`
|
||||
LastUpdateTime int64 `json:"lastUpdateTime,number"`
|
||||
UpdateNickName string
|
||||
LastCandle1m Candle `json:"lastCandle1m"`
|
||||
CandleSeries *PixelList `json:"candleSerie"`
|
||||
Ma7Series *PixelList `json:"ma7Serie"`
|
||||
Ma30Series *PixelList `json:"ma30Serie"`
|
||||
}
|
||||
|
||||
type SeriesInfo struct {
|
||||
InstID string `json:"instID"`
|
||||
Period string `json:"period"`
|
||||
InsertedNew bool `json:"insertedNew,bool"`
|
||||
Score float64 `json:"score,number"`
|
||||
}
|
||||
type SeriesInfoScore struct {
|
||||
InstID string `json:"instID"`
|
||||
Score float64 `json:"score,number"`
|
||||
}
|
||||
|
||||
// TODO
|
||||
// redis key: verticalReportItem|BTC-USDT|4H-15m|ts:1643002300000
|
||||
// sortedSet: verticalLimit|2D-4H|rank|sortedSet
|
||||
type VerticalReportItem struct {
|
||||
InstID string
|
||||
Period string
|
||||
ReportTime int64
|
||||
LastUpdate int64
|
||||
LastUpdateTime string
|
||||
Interval int
|
||||
TrigerValue float64
|
||||
AdvUpSellPrice float64
|
||||
AdvDownSellPrice float64
|
||||
Rank float64
|
||||
ShearForce float64
|
||||
VerticalElevation float64
|
||||
SecondPeriod string
|
||||
}
|
||||
|
||||
// type Segment struct {
|
||||
// IndextStart int
|
||||
// IndexEnd int
|
||||
//
|
||||
// }
|
||||
|
||||
// 根据instId 和period 从 PlateMap里拿到coaster,创建对应的 series,
|
||||
func (sr *Series) Refresh(cr *Core) error {
|
||||
curCo, err := cr.GetCoasterFromPlate(sr.InstID, sr.Period)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ma30List := curCo.Ma30List.List
|
||||
ma30len := len(ma30List)
|
||||
if ma30len == 0 {
|
||||
err = errors.New("ma30List is empty:" + sr.InstID + "," + sr.Period)
|
||||
return err
|
||||
}
|
||||
baseMaX := ma30List[ma30len-1]
|
||||
|
||||
ma30Pxl, err := curCo.Ma30List.MakePixelList(cr, baseMaX, sr.Scale)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sr.Ma30Series = ma30Pxl
|
||||
ma7Pxl, err := curCo.Ma7List.MakePixelList(cr, baseMaX, sr.Scale)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sr.Ma7Series = ma7Pxl
|
||||
curCo.CandleList.RecursiveBubbleS(len(curCo.CandleList.List), "asc")
|
||||
candlePxl, err := curCo.CandleList.MakePixelList(cr, baseMaX, sr.Scale)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sr.CandleSeries = candlePxl
|
||||
// bj, _ := json.Marshal(sr.Ma30Series)
|
||||
// fmt.Println("sr.Ma30Series:", sr.Period, sr.InstID, string(bj))
|
||||
sr.LastUpdateTime = sr.Ma30Series.LastUpdateTime
|
||||
// fmt.Println("candlePxl: ", candlePxl)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sr *Series) SetToKey(cr *Core) (string, error) {
|
||||
if sr == nil || sr.CandleSeries == nil {
|
||||
return "", errors.New("sr.CandlesSeries == nil")
|
||||
}
|
||||
sr.CandleSeries.RecursiveBubbleS(sr.CandleSeries.Count, "asc")
|
||||
sr.CandleSeries.ReIndex()
|
||||
sr.CandleSeries.RecursiveBubbleS(sr.CandleSeries.Count, "asc")
|
||||
// sr.CandleSeries.RecursiveBubbleX(sr.CandleSeries.Count, "asc")
|
||||
sr.Ma7Series.RecursiveBubbleS(sr.CandleSeries.Count, "asc")
|
||||
// sr.Ma7Series.RecursiveBubbleX(sr.CandleSeries.Count, "asc")
|
||||
sr.Ma30Series.RecursiveBubbleS(sr.CandleSeries.Count, "asc")
|
||||
// sr.Ma30Series.RecursiveBubbleX(sr.CandleSeries.Count, "asc")
|
||||
now := time.Now().UnixMilli()
|
||||
sr.LastUpdateTime = now
|
||||
sr.CandleSeries.LastUpdateTime = now
|
||||
sr.CandleSeries.UpdateNickName = utils.GetRandomString(12)
|
||||
sr.UpdateNickName = utils.GetRandomString(12)
|
||||
js, _ := json.Marshal(*sr)
|
||||
seriesName := sr.InstID + "|" + sr.Period + "|series"
|
||||
res, err := cr.RedisLocalCli.Set(seriesName, string(js), 0).Result()
|
||||
if err != nil {
|
||||
logrus.Panic(utils.GetFuncName(), err, " seriesSetToKey1: instId:", sr.InstID, " period: ", sr.Period, " lastUpdate:", sr.LastUpdateTime, " md5:", utils.Md5V(string(js)))
|
||||
}
|
||||
res, err = cr.RedisLocal2Cli.Set(seriesName, string(js), 0).Result()
|
||||
return res, err
|
||||
}
|
||||
func PrintSerieY(cr *Core, list []redis.Z, period string, count int) {
|
||||
// fmt.Println("PrintSerieY start")
|
||||
env := os.Getenv("GO_ENV")
|
||||
isProduction := env == "production"
|
||||
//TODO 只有非产线环境,才会显示此列表
|
||||
if !isProduction {
|
||||
fmt.Println("seriesYTop count:", count, "period:", period, "sort start")
|
||||
}
|
||||
seiScrList := []*SeriesInfoScore{}
|
||||
for _, v := range list {
|
||||
sei := SeriesInfo{}
|
||||
seiScr := SeriesInfoScore{}
|
||||
json.Unmarshal([]byte(v.Member.(string)), &sei)
|
||||
seiScr.InstID = sei.InstID
|
||||
seiScr.Score = v.Score
|
||||
seiScrList = append(seiScrList, &seiScr)
|
||||
|
||||
// if k < count {
|
||||
// if !isProduction {
|
||||
// fmt.Println("seriesYTop", count, "No.", k+1, "period"+period, "InstID:", sei.InstID, "score:", v.Score)
|
||||
// }
|
||||
// 拉扯极限报告
|
||||
// }
|
||||
// if k == count+1 {
|
||||
// if !isProduction {
|
||||
// fmt.Println("seriesYTop end -------" + "period" + period + "-------------------------------------")
|
||||
// fmt.Println("seriesYLast start -------" + "period" + period + "-------------------------------------")
|
||||
// }
|
||||
// }
|
||||
// if k > len(list)-count-1 {
|
||||
// if !isProduction {
|
||||
// fmt.Println("seriesYLast", count, "No.", k+1, "period"+period, "InstID:", sei.InstID, "score:", v.Score)
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
bj, _ := json.Marshal(seiScrList)
|
||||
reqBody := bytes.NewBuffer(bj)
|
||||
cr.Env = os.Getenv("GO_ENV")
|
||||
cr.FluentBitUrl = os.Getenv("SARDINE_FluentBitUrl")
|
||||
fullUrl := "http://" + cr.FluentBitUrl + "/seriesY." + period
|
||||
|
||||
res, err := http.Post(fullUrl, "application/json", reqBody)
|
||||
fmt.Println("requested, response:", fullUrl, reqBody, res)
|
||||
if err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
|
||||
if !isProduction {
|
||||
fmt.Println("seriesYLast count:", count, "period:", period, "sort end")
|
||||
}
|
||||
}
|
||||
|
||||
func (sei *SeriesInfo) Process(cr *Core) {
|
||||
curSe, err := cr.GetPixelSeries(sei.InstID, sei.Period)
|
||||
if err != nil {
|
||||
logrus.Warn("GetPixelSeries: ", err)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO 金拱门
|
||||
// list := cr.GetMyCcyBalanceName()
|
||||
go func(se Series) {
|
||||
|
||||
threeSeg := [][]int{[]int{0, 19}, []int{19, 22}, []int{22, 23}}
|
||||
|
||||
ma7Seg := se.MakeSegment(cr, 0, 23, threeSeg, "ma7")
|
||||
go func() {
|
||||
cr.SegmentItemChan <- ma7Seg
|
||||
}()
|
||||
|
||||
ma30Seg := se.MakeSegment(cr, 0, 23, threeSeg, "ma30")
|
||||
|
||||
go func() {
|
||||
cr.SegmentItemChan <- ma30Seg
|
||||
}()
|
||||
|
||||
}(curSe)
|
||||
|
||||
cli := cr.RedisLocalCli
|
||||
go func(se Series) {
|
||||
// 拉扯极限报告
|
||||
willReport := os.Getenv("SARDINE_SERIESTOREPORT") == "true"
|
||||
logrus.Info("willReport:", willReport)
|
||||
// fmt.Println("willReport:", willReport)
|
||||
if !willReport {
|
||||
return
|
||||
}
|
||||
err = curSe.AddToYSorted(cr)
|
||||
if err != nil {
|
||||
logrus.Warn("sei addToYSorted err: ", err)
|
||||
return
|
||||
}
|
||||
// 所有维度拉扯极限
|
||||
go func(se Series) {
|
||||
if se.InstID != "BTC-USDT" {
|
||||
return
|
||||
}
|
||||
list, err := cli.ZRevRangeWithScores("series|YValue|sortedSet|period"+se.Period, 0, -1).Result()
|
||||
if err != nil {
|
||||
fmt.Println("series sorted err", err)
|
||||
}
|
||||
PrintSerieY(cr, list, se.Period, 20)
|
||||
}(se)
|
||||
|
||||
}(curSe)
|
||||
// TODO 刘海儿检测, 监测金拱门中的刘海儿,预警下跌趋势, 其实有没有金拱门并不重要,刘海儿比金拱门更有说服力
|
||||
go func(se Series) {
|
||||
// 如何定义刘海:目前定义如下,3m以上的周期时,当7个或小于7个周期内的时间内发生了一次下坠和一次上升,下坠幅度达到2%以上,并随后的上升中收复了下坠的幅度,那么疑似刘海儿发生。用的周期越少,越强烈,探底和抬升的幅度越大越强烈,所处的维度越高越强烈,比如15m的没有1H的强烈
|
||||
// 如果发生在BTC身上,那么将影响所有
|
||||
// se.CheckLiuhai() {
|
||||
//
|
||||
// }
|
||||
|
||||
}(curSe)
|
||||
go func(se Series) {
|
||||
allow := os.Getenv("SARDINE_SERIESINFOTOCHNL") == "true"
|
||||
if !allow {
|
||||
return
|
||||
}
|
||||
time.Sleep(0 * time.Second)
|
||||
sei := SeriesInfo{
|
||||
InstID: curSe.InstID,
|
||||
Period: curSe.Period,
|
||||
}
|
||||
cr.AddToGeneralSeriesInfoChnl(&sei)
|
||||
}(curSe)
|
||||
|
||||
}
|
||||
|
||||
//-------------------------------------------------------------------------------
|
||||
|
||||
// 拉扯极限相关: 加入seriesY值排行榜, 用于生成拉扯极限
|
||||
func (srs *Series) AddToYSorted(cr *Core) error {
|
||||
setName := "series|YValue|sortedSet|period" + srs.Period
|
||||
srs.CandleSeries.RecursiveBubbleS(srs.CandleSeries.Count, "asc")
|
||||
length := len(srs.CandleSeries.List)
|
||||
if length != srs.Count {
|
||||
err := errors.New("AddToYSorted err: 数据量不够")
|
||||
return err
|
||||
}
|
||||
lastCandlePixel1 := srs.CandleSeries.List[srs.Count-1]
|
||||
sei := SeriesInfo{
|
||||
InstID: srs.InstID,
|
||||
Period: srs.Period,
|
||||
}
|
||||
bj, _ := json.Marshal(sei)
|
||||
// TODO -200 是个无效的值,如果遇到-200就赋予0值,这个办法不好,后面考虑不用sortedSet,而用自定义对象更好些。
|
||||
if lastCandlePixel1.Y == -200 {
|
||||
lastCandlePixel1.Y = 0
|
||||
}
|
||||
z := redis.Z{
|
||||
Score: float64(lastCandlePixel1.Y),
|
||||
Member: string(bj),
|
||||
}
|
||||
// TODO ZAdd 有可能由于bug或者key不一样的原因,让列表变长,需要想办法怎么定期请空
|
||||
if lastCandlePixel1.Score != 0 {
|
||||
cr.RedisLocalCli.ZAdd(setName, z).Result()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 垂直极限排名有一定片面性。暂时先不开放。垂直极限推荐最高的,可能是个不太容易📈上来的股票,甚至垃圾股,而且过一会儿可能跌的更多,所以就算使用这个功能,也仅供参考,
|
||||
func (vir *VerticalReportItem) AddToVeriticalLimitSorted(cr *Core, srs *Series, period2 string) error {
|
||||
// redis key: verticalReportItem|BTC-USDT|4H-15m|ts:1643002300000
|
||||
// sortedSet: verticalLimit|2D-4H|rank|sortedSet
|
||||
|
||||
setName := "verticalLimit|" + srs.Period + "-" + period2 + "|rank|sortedSet"
|
||||
tms := strconv.FormatInt(srs.LastUpdateTime, 10)
|
||||
keyName := "verticalLimit|" + srs.InstID + "|" + srs.Period + "-" + period2 + "|ts:" + tms
|
||||
z := redis.Z{
|
||||
Score: float64(srs.LastUpdateTime),
|
||||
Member: keyName,
|
||||
}
|
||||
if vir.Rank != -1 && vir.Rank != 0 {
|
||||
extt := 48 * time.Hour
|
||||
ot := time.Now().Add(extt * -1)
|
||||
oti := ot.UnixMilli()
|
||||
count, _ := cr.RedisLocalCli.ZRemRangeByScore(setName, "0", strconv.FormatInt(oti, 10)).Result()
|
||||
if count > 0 {
|
||||
logrus.Warning("移出过期的引用数量:", setName, count, "ZRemRangeByScore ", setName, 0, strconv.FormatInt(oti, 10))
|
||||
}
|
||||
cr.RedisLocalCli.ZAdd(setName, z).Result()
|
||||
bj, _ := json.Marshal(vir)
|
||||
cr.RedisLocalCli.Set(keyName, bj, 48*time.Hour).Result()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (vri *VerticalReportItem) Report(cr *Core) error {
|
||||
dd := DingdingMsg{
|
||||
Topic: "垂直极限触发",
|
||||
RobotName: "pengpeng",
|
||||
AtAll: true,
|
||||
Ctype: "markdown",
|
||||
Content: "",
|
||||
}
|
||||
ary1 := []string{}
|
||||
str := "``币名: ``" + vri.InstID + "\n"
|
||||
str1 := fmt.Sprintln("``基础维度:``", vri.Period)
|
||||
str2 := fmt.Sprintln("``剪切维度:``", vri.SecondPeriod)
|
||||
str21 := fmt.Sprintln("``观察周期:``", vri.Interval)
|
||||
str3 := fmt.Sprintln("``平均仰角:``", vri.VerticalElevation)
|
||||
str4 := fmt.Sprintln("``剪切力:``", vri.ShearForce)
|
||||
str5 := fmt.Sprintln("``Rank:``", vri.Rank)
|
||||
score := vri.TrigerValue
|
||||
str6 := fmt.Sprintln("``触发买入价位:``", score)
|
||||
str7 := fmt.Sprintln("``建议止盈价位:``", vri.AdvUpSellPrice)
|
||||
str8 := fmt.Sprintln("``建议止损价位:``", vri.AdvDownSellPrice)
|
||||
str9 := "----------------------\n"
|
||||
ary1 = append(ary1, str, str1, str2, str21, str3, str4, str5, str6, str7, str8, str9)
|
||||
dd.AddItemListGrp("垂直极限", 2, ary1)
|
||||
ary2 := []string{}
|
||||
|
||||
tm := time.Now().Format("01-02:15:04")
|
||||
rtime := fmt.Sprintln("``报告时间:``", tm)
|
||||
ctype := fmt.Sprintln("``类型:``", "极限触发,已弃用")
|
||||
from := "来自: " + os.Getenv("HOSTNAME")
|
||||
ary2 = append(ary2, rtime, ctype, from)
|
||||
dd.AddItemListGrp("", 2, ary2)
|
||||
dd.PostToRobot("pengpeng", cr)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (vri *VerticalReportItem) Show(cr *Core) error {
|
||||
ary1 := []string{}
|
||||
str := "``币名: ``" + vri.InstID + "\n"
|
||||
str1 := fmt.Sprintln("``基础维度:``", vri.Period)
|
||||
str2 := fmt.Sprintln("``剪切维度:``", vri.SecondPeriod)
|
||||
str21 := fmt.Sprintln("``观察周期:``", vri.Interval)
|
||||
str3 := fmt.Sprintln("``平均仰角:``", vri.VerticalElevation)
|
||||
str4 := fmt.Sprintln("``剪切力:``", vri.ShearForce)
|
||||
str5 := fmt.Sprintln("``Rank:``", vri.Rank)
|
||||
score := vri.TrigerValue
|
||||
str6 := fmt.Sprintln("``触发买入价位:``", score)
|
||||
str7 := fmt.Sprintln("``建议止盈价位:``", vri.AdvUpSellPrice)
|
||||
str8 := fmt.Sprintln("``建议止损价位:``", vri.AdvDownSellPrice)
|
||||
str9 := "----------------------\n"
|
||||
ary1 = append(ary1, str, str1, str2, str21, str3, str4, str5, str6, str7, str8, str9)
|
||||
for _, v := range ary1 {
|
||||
fmt.Println("verticalReportItem: ", v)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO 求某个PixelList里两个点之间的仰角,从ridx开始,往lidx的元素画一条直线,的仰角
|
||||
|
||||
func (srs *Series) GetElevation(cr *Core, ctype string, lIdx int, rIdx int) (float64, error) {
|
||||
yj := float64(0)
|
||||
switch ctype {
|
||||
case "candle":
|
||||
{
|
||||
yj = (srs.CandleSeries.List[rIdx].Y - srs.CandleSeries.List[lIdx].Y) / float64(rIdx-lIdx)
|
||||
}
|
||||
case "ma7":
|
||||
{
|
||||
yj = (srs.Ma7Series.List[rIdx].Y - srs.Ma7Series.List[lIdx].Y) / float64(rIdx-lIdx)
|
||||
}
|
||||
case "ma30":
|
||||
{
|
||||
yj = (srs.Ma30Series.List[rIdx].Y - srs.Ma30Series.List[lIdx].Y) / float64(rIdx-lIdx)
|
||||
|
||||
}
|
||||
}
|
||||
return yj, nil
|
||||
}
|
||||
|
||||
// TODO 求极值,在某个线段上。一个最大值,一个最小值
|
||||
func (srs *Series) GetExtremum(cr *Core, lIdx int, rIdx int, ctype string) (*Extremum, error) {
|
||||
ext := Extremum{
|
||||
Max: &Pixel{},
|
||||
Min: &Pixel{},
|
||||
}
|
||||
|
||||
switch ctype {
|
||||
case "candle":
|
||||
{
|
||||
done := false
|
||||
for k, v := range srs.CandleSeries.List {
|
||||
if k < lIdx {
|
||||
continue
|
||||
}
|
||||
if v.X == 0 && v.Y == 0 {
|
||||
continue
|
||||
}
|
||||
if k > rIdx {
|
||||
continue
|
||||
}
|
||||
if !done {
|
||||
ext.Max = srs.CandleSeries.List[k]
|
||||
ext.Min = srs.CandleSeries.List[k]
|
||||
done = true
|
||||
}
|
||||
|
||||
if v.Y > ext.Max.Y {
|
||||
ext.Max = v
|
||||
}
|
||||
if v.Y < ext.Min.Y {
|
||||
ext.Min = v
|
||||
}
|
||||
}
|
||||
// ext = nil
|
||||
}
|
||||
case "ma7":
|
||||
{
|
||||
done := false
|
||||
for k, v := range srs.Ma7Series.List {
|
||||
if k < lIdx {
|
||||
continue
|
||||
}
|
||||
if v.X == 0 && v.Y == 0 {
|
||||
continue
|
||||
}
|
||||
if k > rIdx {
|
||||
continue
|
||||
}
|
||||
if !done {
|
||||
ext.Max = srs.Ma7Series.List[k]
|
||||
ext.Min = srs.Ma7Series.List[k]
|
||||
done = true
|
||||
}
|
||||
if v.Y > ext.Max.Y {
|
||||
ext.Max = v
|
||||
}
|
||||
if v.Y < ext.Min.Y {
|
||||
ext.Min = v
|
||||
}
|
||||
}
|
||||
// ext = nil
|
||||
}
|
||||
case "ma30":
|
||||
{
|
||||
|
||||
done := false
|
||||
for k, v := range srs.Ma30Series.List {
|
||||
if k < lIdx {
|
||||
continue
|
||||
}
|
||||
if v.X == 0 && v.Y == 0 {
|
||||
continue
|
||||
}
|
||||
if k > rIdx {
|
||||
continue
|
||||
}
|
||||
if !done {
|
||||
ext.Max = srs.Ma30Series.List[k]
|
||||
ext.Min = srs.Ma30Series.List[k]
|
||||
done = true
|
||||
}
|
||||
if v.Y > ext.Max.Y {
|
||||
ext.Max = v
|
||||
}
|
||||
if v.Y < ext.Min.Y {
|
||||
ext.Min = v
|
||||
}
|
||||
}
|
||||
// ext = nil
|
||||
}
|
||||
}
|
||||
return &ext, nil
|
||||
}
|
||||
|
||||
// TODO 获取垂直极限列表
|
||||
// 筛选条件:
|
||||
//
|
||||
// 1. 极大值未发生在最后周期的,排除
|
||||
// 2. n周期内,有仰角小于0的,排除
|
||||
// 注意: 仰角极值未必发生在最后一个周期
|
||||
//
|
||||
// 对剩下的币种结果,计算:
|
||||
//
|
||||
// 1. n周期平均仰角: s
|
||||
// 2. 最后周期仰角: p
|
||||
//
|
||||
// 筛选出最后仰角高于n周期平均仰角的币列表,
|
||||
// 以最后仰角为结果,得到一个值 p
|
||||
// 对此列表集合,得到每个的15分钟维度拉扯极限,每个计算后得到一个结果 f,
|
||||
//
|
||||
// f值权重更高,p值权重降一个量级,求出分值用于排名,
|
||||
//
|
||||
// rank = 2 * (lcjx * -1) * (1 + avgElevation) * (1 + lastElevation) * (1 + lastElevation)
|
||||
//
|
||||
// 存储在sortedSet里,命名:
|
||||
// verticalLimit|15m~4H|rank|sortedSet
|
||||
// return rank, err
|
||||
func (vir *VerticalReportItem) MakeVerticalLimit(cr *Core, srs *Series, startIdx int, endIdx int, period2 string) (err error) {
|
||||
count := len(srs.CandleSeries.List) - 1
|
||||
lastMa30Pixel := srs.Ma30Series.List[count]
|
||||
// func (srs *Series) GetExtremum(cr *Core, lIdx int, rIdx int, ctype string) (*Extremum, error) {
|
||||
ext, err := srs.GetExtremum(cr, startIdx, endIdx, "ma30")
|
||||
if err != nil {
|
||||
logrus.Warn(utils.GetFuncName(), ":", err)
|
||||
}
|
||||
|
||||
if ext.Max.Score < 1.05*lastMa30Pixel.Score {
|
||||
lbj, _ := json.Marshal(lastMa30Pixel)
|
||||
lext, _ := json.Marshal(ext)
|
||||
err = errors.New(fmt.Sprintln("当前pixel不是极值", " lastMa30Pixel: ", string(lbj), " ext: ", string(lext)))
|
||||
return err
|
||||
} else {
|
||||
err = errors.New(fmt.Sprintln("当前pixel满足极值", lastMa30Pixel))
|
||||
}
|
||||
|
||||
yj, err := srs.GetElevation(cr, "ma30", startIdx, endIdx)
|
||||
if err != nil {
|
||||
logrus.Warn(utils.GetFuncName(), ":", err)
|
||||
}
|
||||
|
||||
vir.VerticalElevation = yj
|
||||
lcjx, _ := LacheJixian(cr, srs, period2)
|
||||
vir.ShearForce = lcjx
|
||||
vir.TrigerValue = srs.CandleSeries.List[len(srs.CandleSeries.List)-1].Score
|
||||
vir.AdvUpSellPrice = vir.TrigerValue * 1.04
|
||||
vir.AdvDownSellPrice = vir.TrigerValue * 0.98
|
||||
// 计算rank的公式如下
|
||||
// rank := 2 * (lcjx * -1) * (1 + avgElevation) * (1 + lastElevation) * (1 + lastElevation)
|
||||
// vir.Rank = rank
|
||||
return nil
|
||||
}
|
||||
|
||||
// 计算剪切力
|
||||
func LacheJixian(cr *Core, srs *Series, period string) (float64, error) {
|
||||
curSe, _ := cr.GetPixelSeries(srs.InstID, period)
|
||||
return curSe.CandleSeries.List[len(srs.CandleSeries.List)-1].Y, nil
|
||||
}
|
||||
|
||||
// type SegmentItem struct {
|
||||
// InstID string
|
||||
// Period string
|
||||
// ReportTime int64
|
||||
// lastUpdate int64
|
||||
// Interval int
|
||||
// Direct string // up, down
|
||||
// VerticalElevation float64
|
||||
// }
|
268
data/shearForceGrp.go
Normal file
268
data/shearForceGrp.go
Normal file
@ -0,0 +1,268 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis"
|
||||
logrus "github.com/sirupsen/logrus"
|
||||
"phyer.click/sardine/utils"
|
||||
)
|
||||
|
||||
type ShearItem struct {
|
||||
ShearForce float64 // ma30-candle剪切力
|
||||
VerticalElevation float64 // 仰角, Interval范围内线段的仰角
|
||||
Ratio float64 // 剪切力除以仰角的比值
|
||||
Score float64 // 当前LastCandleY点本值
|
||||
PolarQuadrant string // shangxian,manyue,xiaxian,xinyue, 分别对应圆周的四个阶段。
|
||||
LastUpdate int64
|
||||
LastUpdateTime string
|
||||
}
|
||||
type ShearForceGrp struct {
|
||||
InstID string
|
||||
LastUpdate int64
|
||||
LastUpdateTime string
|
||||
Ma30PeriodGroup map[string]ShearItem
|
||||
Ma7PeriodGroup map[string]ShearItem
|
||||
From string
|
||||
}
|
||||
|
||||
// TODO 弃用
|
||||
// func (seg *SegmentItem) MakeShearForceGrp(cr *Core) (*ShearForceGrp, error) {
|
||||
// shg := ShearForceGrp{
|
||||
// InstID: seg.InstID,
|
||||
// Ma30PeriodGroup: map[string]ShearItem{},
|
||||
// Ma7PeriodGroup: map[string]ShearItem{},
|
||||
// }
|
||||
// err := shg.ForceUpdate(cr)
|
||||
// sf1 := float64(0)
|
||||
// sf1 = seg.LastCandle.Y - seg.LastMa7.Y
|
||||
// she := ShearItem{
|
||||
// LastUpdate: time.Now().UnixMilli(),
|
||||
// VerticalElevation: seg.SubItemList[2].VerticalElevation,
|
||||
// Ratio: seg.LastCandle.Y / seg.SubItemList[2].VerticalElevation,
|
||||
// Score: seg.LastCandle.Score,
|
||||
// PolarQuadrant: seg.PolarQuadrant,
|
||||
// }
|
||||
// if seg.Ctype == "ma7" {
|
||||
// she.ShearForce = seg.LastCandle.Y
|
||||
// shg.Ma7PeriodGroup[seg.Period] = she
|
||||
// }
|
||||
// if seg.Ctype == "ma30" {
|
||||
// she.ShearForce = sf1
|
||||
// shg.Ma30PeriodGroup[seg.Period] = she
|
||||
// }
|
||||
// return &shg, err
|
||||
// }
|
||||
|
||||
// TODO 弃用
|
||||
// func (shg *ShearForceGrp) ForceUpdate(cr *Core) error {
|
||||
// ctype := "ma7"
|
||||
// hmName := shg.InstID + "|" + ctype + "|shearForceGrp"
|
||||
// res, err := cr.RedisLocalCli.HGetAll(hmName).Result()
|
||||
//
|
||||
// for k, v := range res {
|
||||
// si := ShearItem{}
|
||||
// json.Unmarshal([]byte(v), &si)
|
||||
// shg.Ma7PeriodGroup[k] = si
|
||||
// }
|
||||
//
|
||||
// ctype = "ma30"
|
||||
// hmName = shg.InstID + "|" + ctype + "|shearForceGrp"
|
||||
// res, err = cr.RedisLocalCli.HGetAll(hmName).Result()
|
||||
//
|
||||
// for k, v := range res {
|
||||
// si := ShearItem{}
|
||||
// json.Unmarshal([]byte(v), &si)
|
||||
// shg.Ma30PeriodGroup[k] = si
|
||||
// }
|
||||
// shg.SetToKey(cr)
|
||||
// return err
|
||||
// }
|
||||
func (she *ShearForceGrp) Show(cr *Core) error {
|
||||
js, err := json.Marshal(she)
|
||||
logrus.Info(utils.GetFuncName(), ": ", string(js))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO 需要重构: 已经重构
|
||||
// 对象数据库落盘
|
||||
func (she *ShearForceGrp) SetToKey(cr *Core) error {
|
||||
keyName := she.InstID + "|shearForce"
|
||||
she.From = os.Getenv("HOSTNAME")
|
||||
she.LastUpdateTime = time.Now().Format("2006-01-02 15:04:05.000")
|
||||
js, err := json.Marshal(she)
|
||||
if err != nil {
|
||||
logrus.Panic(utils.GetFuncName(), " err: ", err)
|
||||
} else {
|
||||
cr.RedisLocalCli.Set(keyName, string(js), 0).Result()
|
||||
cr.RedisLocal2Cli.Set(keyName, string(js), 0).Result()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (she *ShearForceGrp) maXPrd(cr *Core, ctype string) {
|
||||
// 先把对象克隆,防止在处理的过程中对象发生变更
|
||||
she2 := *she
|
||||
she3 := &she2
|
||||
// 查了一下,json marshal 有线程安全问题,需要用户自己加锁,先不用了
|
||||
// bj, _ := json.Marshal(she3)
|
||||
// bytes := []byte(bj)
|
||||
// var she4 ShearForceGrp
|
||||
// json.Unmarshal(bytes, she4)
|
||||
// 先声明map
|
||||
var grp map[string]ShearItem
|
||||
// 再使用make函数创建一个非nil的map,nil map不能赋值
|
||||
grp = make(map[string]ShearItem)
|
||||
if ctype == "ma7" {
|
||||
//fmt.Println("len of ma7 she.Ma7PeriodGroup: ", len(she3.Ma7PeriodGroup))
|
||||
bj, err := json.Marshal(she3.Ma7PeriodGroup)
|
||||
if err != nil {
|
||||
logrus.Panic(utils.GetFuncName(), " err:", err)
|
||||
}
|
||||
json.Unmarshal(bj, &grp)
|
||||
//fmt.Println("len of ma30 she.Ma7PeriodGroup: ", len(she3.Ma7PeriodGroup))
|
||||
} else if ctype == "ma30" {
|
||||
bj, err := json.Marshal(she3.Ma30PeriodGroup)
|
||||
if err != nil {
|
||||
logrus.Panic(utils.GetFuncName(), " err: ", err)
|
||||
}
|
||||
json.Unmarshal(bj, &grp)
|
||||
}
|
||||
for period, shearItem := range grp {
|
||||
setName := "shearForce|ratio|" + ctype + "|" + period + "|sortedSet"
|
||||
// TODO:这个key用于判定当前instID|maX|period|的ratio排名是否已经过期
|
||||
timelinessKey := "shearForce|ratio|" + she.InstID + "|" + ctype + "|" + period + "|lastUpdate"
|
||||
sei := SeriesInfo{
|
||||
InstID: she3.InstID,
|
||||
Period: period,
|
||||
}
|
||||
// 阈值先暂且设置为 -100
|
||||
// SHEARFORCE_VERTICAL_RATE
|
||||
threahold := float64(SHEARFORCE_VERTICAL_RATE)
|
||||
bj, _ := json.Marshal(sei)
|
||||
z := redis.Z{
|
||||
Score: float64(shearItem.Ratio),
|
||||
Member: string(bj),
|
||||
}
|
||||
//无论超过阈值,还是低于阈值的负数,都是达标
|
||||
if shearItem.Ratio < -1*threahold {
|
||||
cr.RedisLocalCli.ZAdd(setName, z).Result()
|
||||
cr.RedisLocalCli.Set(timelinessKey, shearItem.LastUpdate, 3*time.Minute)
|
||||
} else if shearItem.Ratio > threahold {
|
||||
cr.RedisLocalCli.ZAdd(setName, z).Result()
|
||||
cr.RedisLocalCli.Set(timelinessKey, shearItem.LastUpdate, 3*time.Minute)
|
||||
} else {
|
||||
cr.RedisLocalCli.ZRem(setName, string(bj)).Result()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 把所有引用调用都改成传值调用,试试,看能不能解决那个陈年bug
|
||||
func (she *ShearForceGrp) AddToRatioSorted(cr *Core) error {
|
||||
she.maXPrd(cr, "ma7")
|
||||
she.maXPrd(cr, "ma30")
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO 需要重构: 看了一下,不用重构
|
||||
func (she *ShearForceGrp) MakeSnapShot(cr *Core) error {
|
||||
nw := time.Now()
|
||||
tm := nw.UnixMilli()
|
||||
tm = tm - tm%60000
|
||||
tms := strconv.FormatInt(tm, 10)
|
||||
js, err := json.Marshal(she)
|
||||
|
||||
keyName1 := fmt.Sprint(she.InstID + "|shearForce|snapShot|ts:" + tms)
|
||||
keyName2 := fmt.Sprint(she.InstID + "|shearForce|snapShot|last")
|
||||
_, err = cr.RedisLocalCli.Set(keyName1, string(js), time.Duration(24)*time.Hour).Result()
|
||||
_, err = cr.RedisLocalCli.Set(keyName2, string(js), time.Duration(24)*time.Hour).Result()
|
||||
_, err = cr.RedisLocal2Cli.Set(keyName1, string(js), time.Duration(24)*time.Hour).Result()
|
||||
_, err = cr.RedisLocal2Cli.Set(keyName2, string(js), time.Duration(24)*time.Hour).Result()
|
||||
writeLog := os.Getenv("SARDINE_WRITELOG") == "true"
|
||||
if !writeLog {
|
||||
return err
|
||||
}
|
||||
wg := WriteLog{
|
||||
Content: js,
|
||||
Tag: she.InstID + ".shearForce",
|
||||
}
|
||||
go func() {
|
||||
cr.WriteLogChan <- &wg
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sheGrp *ShearForceGrp) Refresh(cr *Core) error {
|
||||
segments := cr.Cfg.Config.Get("softCandleSegmentList").MustArray()
|
||||
ma7Grp := map[string]ShearItem{}
|
||||
ma30Grp := map[string]ShearItem{}
|
||||
//搜集各个维度未过期的shearItem数据,组合成shearForceGrp对象
|
||||
for _, v := range segments {
|
||||
cs := CandleSegment{}
|
||||
sv, _ := json.Marshal(v)
|
||||
json.Unmarshal(sv, &cs)
|
||||
shi30, err := MakeShearItem(cr, sheGrp.InstID, cs.Seg, "ma30")
|
||||
if err != nil {
|
||||
logrus.Warn(utils.GetFuncName(), err)
|
||||
} else {
|
||||
ma30Grp[cs.Seg] = *shi30
|
||||
}
|
||||
shi7, err := MakeShearItem(cr, sheGrp.InstID, cs.Seg, "ma7")
|
||||
if err != nil {
|
||||
logrus.Warn(utils.GetFuncName(), err)
|
||||
} else {
|
||||
ma7Grp[cs.Seg] = *shi7
|
||||
}
|
||||
sheGrp.Ma7PeriodGroup = ma7Grp
|
||||
sheGrp.Ma30PeriodGroup = ma30Grp
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func MakeShearItem(cr *Core, instId string, period string, ctype string) (*ShearItem, error) {
|
||||
shi := ShearItem{}
|
||||
keyn := instId + "|" + period + "|" + ctype + "|shearItem"
|
||||
res, err := cr.RedisLocalCli.Get(keyn).Result()
|
||||
if err != nil && len(res) == 0 {
|
||||
return &shi, err
|
||||
}
|
||||
json.Unmarshal([]byte(res), &shi)
|
||||
return &shi, err
|
||||
}
|
||||
|
||||
func (sheGrp *ShearForceGrp) Process(cr *Core) error {
|
||||
go func() {
|
||||
sheGrp.Show(cr)
|
||||
// 传递过来的shg对象是空的,需要从segmentItem对象创建的shearItem对象组合中来重建
|
||||
sheGrp.Refresh(cr)
|
||||
err := sheGrp.SetToKey(cr)
|
||||
if err != nil {
|
||||
logrus.Panic("srs SetToKey err: ", err)
|
||||
}
|
||||
// sheGrp.MakeSnapShot(cr)
|
||||
// 下一个阶段计算
|
||||
allow := os.Getenv("SARDINE_MAKEANALYTICS") == "true"
|
||||
if !allow {
|
||||
return
|
||||
}
|
||||
|
||||
periodList := []string{}
|
||||
for k := range sheGrp.Ma30PeriodGroup {
|
||||
periodList = append(periodList, k)
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
sheGrp.AddToRatioSorted(cr)
|
||||
}()
|
||||
go func() {
|
||||
// 另一个携程中,Analytics对象要读这里snapShot,我希望它读到的是老的而不是新的,所以等待2秒钟
|
||||
time.Sleep(2 * time.Second)
|
||||
sheGrp.MakeSnapShot(cr)
|
||||
}()
|
||||
return nil
|
||||
}
|
161
data/sorted.go
Normal file
161
data/sorted.go
Normal file
@ -0,0 +1,161 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis"
|
||||
logrus "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// 1. 成交量计算和保存
|
||||
// func (tk *TickerInfo) MakeVolSorted(cr *Core, period string) {
|
||||
// SORTED_INTERVAL 分钟的整数倍
|
||||
// minutes := cr.PeriodToMinutes(period)
|
||||
// ts := tk.Ts
|
||||
// ts = ts - ts%60
|
||||
// z := redis.Z{
|
||||
// Score: float64(tk.VolCcy24h),
|
||||
// Member: tk.InstID,
|
||||
// }
|
||||
// 计算当次成交量环比增长比值, 存入当次成交价
|
||||
// if ts%(minutes*60) == 0 {
|
||||
// 计算成交量环比增幅
|
||||
// tk.makeVolSpeed(cr, period)
|
||||
// cr.RedisLocalCli.ZAdd(SORTED_VOL+"|"+period, z).Result()
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// 2. 成交量环比增幅计算和保存
|
||||
// func (tk *TickerInfo) makeVolSpeed(cr *Core, period string) {
|
||||
// spd := float64(1)
|
||||
// preVolCcy24h, err := cr.RedisLocalCli.ZScore(SORTED_VOL+"|"+period, tk.InstID).Result()
|
||||
// if err != nil {
|
||||
// TODO 新股
|
||||
// } else {
|
||||
// if tk.VolCcy24h < preVolCcy24h {
|
||||
// 新的一天开始了
|
||||
// } else {
|
||||
// tk.VolDiff = tk.VolCcy24h - preVolCcy24h
|
||||
// }
|
||||
// }
|
||||
// tk.VolDiff = spd
|
||||
// 计算环比增幅振幅
|
||||
// tk.makeVolAcclr(cr, period)
|
||||
// z := redis.Z{
|
||||
// Score: tk.VolDiff,
|
||||
// Member: tk.InstID,
|
||||
// }
|
||||
// 保存当次成交量环比增幅
|
||||
// cr.RedisLocalCli.ZAdd(SORTED_VOLDIFF+"|"+period, z).Result()
|
||||
// }
|
||||
//
|
||||
// 3. 成交量环比增长振幅计算和保存
|
||||
// func (tk *TickerInfo) makeVolAcclr(cr *Core, period string) {
|
||||
// acclr := float64(0)
|
||||
// score, err := cr.RedisLocalCli.ZScore(SORTED_VOLDIFF+"|"+period, tk.InstID).Result()
|
||||
// if err != nil {
|
||||
// TODO 新股
|
||||
// fmt.Println("zScore err:", score, err)
|
||||
// } else {
|
||||
// 当次比值和上次比值之间的比值作为当次振幅保存
|
||||
// acclr = (tk.VolDiff + score) / (tk.VolCcy24h - score)
|
||||
// }
|
||||
// tk.VolAcclr = acclr
|
||||
// z := redis.Z{
|
||||
// Score: acclr,
|
||||
// Member: tk.InstID,
|
||||
// }
|
||||
// cr.RedisLocalCli.ZAdd(SORTED_VOLACCLER+"|"+period, z).Result()
|
||||
// }
|
||||
|
||||
// ------------------------------------------------------------------------
|
||||
// 1. 时间节点: t0; 价格保存
|
||||
func (tk *TickerInfo) MakePriceSorted(cr *Core, period string) {
|
||||
ts := tk.Ts
|
||||
tm := time.Now().UnixMilli()
|
||||
bj, _ := json.Marshal(tk)
|
||||
if tm > (ts + 100000) {
|
||||
logrus.Info("tickerInfo已经失效:", tm-(ts+1000), "毫秒", string(bj))
|
||||
} else {
|
||||
logrus.Info("tickerInfo有效:", string(bj))
|
||||
}
|
||||
ts = ts - ts%60*1000
|
||||
minutes, err := cr.PeriodToMinutes(period)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
z := redis.Z{
|
||||
Score: float64(tk.Last),
|
||||
Member: tk.InstId,
|
||||
}
|
||||
// 满足约定时间间隔,执行制作价格差和存储当次价格的动作, 每分钟都计算,不管间隔是多少分钟,分到不同的seg里去,10分钟的话,就会有9个seg
|
||||
for i := int64(0); i < minutes; i++ {
|
||||
if (ts%(minutes*60*1000))/(60*1000) == int64(i) {
|
||||
//计算成交价环比增幅
|
||||
tk.makePriceDiff(cr, period, i)
|
||||
// 保存本次成交价
|
||||
cr.RedisLocalCli.ZAdd(SORTED_PRICE+"|"+period+"|seg"+strconv.FormatInt(i, 10), z).Result()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// 2. 时间节点: t1; 成交价环比增幅计算和保存
|
||||
func (tk *TickerInfo) makePriceDiff(cr *Core, period string, seg int64) {
|
||||
spd := float64(1)
|
||||
score, err := cr.RedisLocalCli.ZScore(SORTED_PRICE+"|"+period+"|seg"+strconv.FormatInt(seg, 10), tk.InstId).Result()
|
||||
if err != nil {
|
||||
//TODO 新股
|
||||
logrus.Warn("makeVolSpeed zScore err:", score, err)
|
||||
} else {
|
||||
spd = tk.Last - score
|
||||
}
|
||||
tk.PriceDiff = spd
|
||||
tk.makePriceAcclr(cr, period, seg)
|
||||
z := redis.Z{
|
||||
Score: spd,
|
||||
Member: tk.InstId,
|
||||
}
|
||||
cr.RedisLocalCli.ZAdd(SORTED_PRICEDIFF+"|"+period+"|seg"+strconv.FormatInt(seg, 10), z).Result()
|
||||
// fmt.Println("sorted PriceDiff: ", tk.InstID, "new Price:", tk.Last, "old Price:", score, "PriceDiff:", spd, "rs:", rs)
|
||||
}
|
||||
|
||||
// 3. 时间节点: t2; 价格变化振幅排行榜, 计算振幅需要三个阶段,第一个阶段得到降/增幅,第二个阶段得到降/增幅的差值,第三个阶段得到将增幅差值的比值
|
||||
func (tk *TickerInfo) makePriceAcclr(cr *Core, period string, seg int64) {
|
||||
acclr := float64(0)
|
||||
score, err := cr.RedisLocalCli.ZScore(SORTED_PRICEDIFF+"|"+period+"|seg"+strconv.FormatInt(seg, 10), tk.InstId).Result()
|
||||
if err != nil {
|
||||
//TODO 新股
|
||||
fmt.Println("zScore err:", score, err)
|
||||
} else {
|
||||
// 在时间节点t2, 用最近两个周期增降幅的叠加之后的结果,除以t0时刻的成交价, 得到振幅
|
||||
// 时间跨度为两个周期,t0开始,t3结束
|
||||
acclr = (tk.PriceDiff + score) / (tk.Last - score)
|
||||
}
|
||||
tk.PriceAcclr = acclr
|
||||
z := redis.Z{
|
||||
Score: acclr,
|
||||
Member: tk.InstId,
|
||||
}
|
||||
zname := SORTED_PRICEACCLR + "|" + period + "|seg" + strconv.FormatInt(seg, 10)
|
||||
// fmt.Println("zname1:", zname)
|
||||
cr.RedisLocalCli.ZAdd(zname, z).Result()
|
||||
// fmt.Println("sorted Price acclr ", tk.InstID, "new diff:", tk.PriceDiff, "old PriceDiff:", score, "acclr:", acclr, "rs:", rs)
|
||||
}
|
||||
|
||||
func commonSorted(cr *Core, ctype string, period string, seg int64) []redis.Z {
|
||||
env := os.Getenv("GO_ENV")
|
||||
isProdction := env == "production"
|
||||
zname := ctype + "|" + period + "|seg" + strconv.FormatInt(seg, 10)
|
||||
ary, err := cr.RedisLocalCli.ZRevRangeWithScores(zname, 0, -1).Result()
|
||||
if err != nil {
|
||||
if !isProdction {
|
||||
fmt.Println("sortedSet err:", err)
|
||||
}
|
||||
}
|
||||
return ary
|
||||
}
|
35
ini.sh
Executable file
35
ini.sh
Executable file
@ -0,0 +1,35 @@
|
||||
#!/bin/bash
|
||||
|
||||
# 创建目录结构
|
||||
mkdir -p config model service data util
|
||||
|
||||
# 移动配置文件
|
||||
mv core/shared/config/config.go config/
|
||||
mv core/shared/config/const.go config/
|
||||
|
||||
# 移动模型文件
|
||||
mv core/models/candle.go model/
|
||||
mv core/models/maX.go model/
|
||||
mv core/models/ticker.go model/
|
||||
|
||||
# 移动服务文件
|
||||
mv core/services/service_context.go service/
|
||||
|
||||
# 移动数据层文件
|
||||
mv core/datapipes/coaster.go data/
|
||||
mv core/datapipes/pixel.go data/
|
||||
mv core/datapipes/plate.go data/
|
||||
mv core/datapipes/rsi.go data/
|
||||
mv core/datapipes/segmentItem.go data/
|
||||
mv core/datapipes/series.go data/
|
||||
mv core/datapipes/shearForceGrp.go data/
|
||||
mv core/datapipes/sorted.go data/
|
||||
mv core/datapipes/tray.go data/
|
||||
|
||||
# 移动工具类文件
|
||||
mv core/shared/util.go util/
|
||||
mv core/shared/logging/writeLog.go util/
|
||||
|
||||
echo "文件迁移完成!新目录结构:"
|
||||
tree -d .
|
||||
|
@ -1,4 +1,4 @@
|
||||
package core
|
||||
package model
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
@ -19,11 +19,20 @@ import (
|
||||
logrus "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// 定义核心服务接口
|
||||
type CoreService interface {
|
||||
GetCfg() *simple.Json
|
||||
GetRedisLocalCli() *redis.Client
|
||||
AddToGeneralCandleChnl(candle *Candle, channels []string)
|
||||
PeriodToMinutes(period string) (int64, error)
|
||||
// 其他需要的方法...
|
||||
}
|
||||
|
||||
type Candle struct {
|
||||
Id string `json:"_id"`
|
||||
core *Core
|
||||
InstID string `json:"instID"`
|
||||
Period string `json:"period"`
|
||||
Id string `json:"_id"`
|
||||
Core CoreService // 改为接口类型
|
||||
InstID string `json:"instID"`
|
||||
Period string `json:"period"`
|
||||
Data []interface{}
|
||||
From string `json:"from"`
|
||||
Timestamp time.Time `json:"timeStamp"`
|
||||
@ -36,7 +45,7 @@ type Candle struct {
|
||||
Confirm bool `json:"confirm"`
|
||||
}
|
||||
type Sample interface {
|
||||
SetToKey(cr *Core) ([]interface{}, error)
|
||||
SetToKey(cr *CoreService) ([]interface{}, error)
|
||||
}
|
||||
|
||||
type SampleList interface {
|
||||
@ -62,7 +71,7 @@ type MatchCheck struct {
|
||||
Matched bool
|
||||
}
|
||||
|
||||
func (cd *Candle) Filter(cr *Core) bool {
|
||||
func (cd *Candle) Filter(cr *CoreService) bool {
|
||||
myFocusList := cr.Cfg.Config.Get("focusList").MustArray()
|
||||
founded := false
|
||||
for _, v := range myFocusList {
|
||||
@ -77,184 +86,6 @@ func (mc *MatchCheck) SetMatched(value bool) {
|
||||
mc.Matched = value
|
||||
}
|
||||
|
||||
func (core *Core) GetCandlesWithRest(instId string, kidx int, dura time.Duration, maxCandles int) error {
|
||||
ary := []string{}
|
||||
|
||||
wsary := core.Cfg.CandleDimentions
|
||||
for k, v := range wsary {
|
||||
matched := false
|
||||
// 这个算法的目的是:越靠后的candles维度,被命中的概率越低,第一个百分之百命中,后面开始越来越低, 每分钟都会发生这样的计算,
|
||||
// 因为维度多了的话,照顾不过来
|
||||
rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
n := (k*2 + 2) * 3
|
||||
if n < 1 {
|
||||
n = 1
|
||||
}
|
||||
b := rand.Intn(n)
|
||||
if b < 8 {
|
||||
matched = true
|
||||
}
|
||||
if matched {
|
||||
ary = append(ary, v)
|
||||
}
|
||||
}
|
||||
|
||||
mdura := dura/(time.Duration(len(ary)+1)) - 50*time.Millisecond
|
||||
// fmt.Println("loop4 Ticker Start instId, dura: ", instId, dura, dura/10, mdura, len(ary), " idx: ", kidx)
|
||||
// time.Duration(len(ary)+1)
|
||||
ticker := time.NewTicker(mdura)
|
||||
done := make(chan bool)
|
||||
idx := 0
|
||||
go func(i int) {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if i >= (len(ary)) {
|
||||
done <- true
|
||||
break
|
||||
}
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
b := rand.Intn(2)
|
||||
maxCandles = maxCandles * (i + b) * 2
|
||||
|
||||
if maxCandles < 3 {
|
||||
maxCandles = 3
|
||||
}
|
||||
if maxCandles > 30 {
|
||||
maxCandles = 30
|
||||
}
|
||||
mx := strconv.Itoa(maxCandles)
|
||||
// fmt.Println("loop4 getCandlesWithRest, instId, period,limit,dura, t: ", instId, ary[i], mx, mdura)
|
||||
go func(ii int) {
|
||||
restQ := RestQueue{
|
||||
InstId: instId,
|
||||
Bar: ary[ii],
|
||||
Limit: mx,
|
||||
Duration: mdura,
|
||||
WithWs: true,
|
||||
}
|
||||
js, _ := json.Marshal(restQ)
|
||||
core.RedisLocalCli.LPush("restQueue", js)
|
||||
}(i)
|
||||
i++
|
||||
}
|
||||
}
|
||||
}(idx)
|
||||
time.Sleep(dura - 10*time.Millisecond)
|
||||
ticker.Stop()
|
||||
// fmt.Println("loop4 Ticker stopped instId, dura: ", instId, dura, mdura)
|
||||
done <- true
|
||||
return nil
|
||||
}
|
||||
|
||||
// 当前的时间毫秒数 对于某个时间段,比如3分钟,10分钟,是否可以被整除,
|
||||
func IsModOf(curInt int64, duration time.Duration) bool {
|
||||
vol := int64(0)
|
||||
if duration < 24*time.Hour {
|
||||
// 小于1天
|
||||
vol = (curInt + 28800000)
|
||||
} else if duration >= 24*time.Hour && duration < 48*time.Hour {
|
||||
// 1天
|
||||
vol = curInt - 1633881600000
|
||||
} else if duration >= 48*time.Hour && duration < 72*time.Hour {
|
||||
// 2天
|
||||
vol = curInt - 1633795200000
|
||||
} else if duration >= 72*time.Hour && duration < 120*time.Hour {
|
||||
// 3天
|
||||
vol = curInt - 1633708800000
|
||||
} else if duration >= 120*time.Hour {
|
||||
// 5天
|
||||
vol = curInt - 1633795200000
|
||||
} else {
|
||||
// fmt.Println("noMatched:", curInt)
|
||||
}
|
||||
|
||||
mody := vol % duration.Milliseconds()
|
||||
if mody == 0 {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (core *Core) SaveCandle(instId string, period string, rsp *CandleData, dura time.Duration, withWs bool) {
|
||||
leng := len(rsp.Data)
|
||||
// fmt.Println("saveCandle leng: ", leng, " instId: ", instId, " period: ", period)
|
||||
logrus.Info("saveCandles leng: ", leng, " instId: ", instId, " period: ", period, " length of rsp.Data: ", len(rsp.Data))
|
||||
// softCandleSegmentList
|
||||
segments := core.Cfg.Config.Get("softCandleSegmentList").MustArray()
|
||||
logrus.Warn("lensof segments:", len(segments))
|
||||
curSegStartTime := ""
|
||||
for k, v := range segments {
|
||||
logrus.Warn("fetch segments:", k, v)
|
||||
cs := CandleSegment{}
|
||||
sv, _ := json.Marshal(v)
|
||||
json.Unmarshal(sv, &cs)
|
||||
if !cs.Enabled {
|
||||
continue
|
||||
}
|
||||
logrus.Warn("fetch segments2: cs.Seg: ", cs.Seg, ", period:", period, ", cs.Seg == period: ", (cs.Seg == period))
|
||||
|
||||
if cs.Seg == period {
|
||||
curSegStartTime = cs.StartTime
|
||||
break
|
||||
}
|
||||
}
|
||||
logrus.Warn("curSegStartTime:", curSegStartTime)
|
||||
curTm, _ := time.ParseInLocation("2006-01-02 15:04.000", curSegStartTime, time.Local)
|
||||
curTmi := curTm.UnixMilli()
|
||||
|
||||
for k, v := range rsp.Data {
|
||||
tmi := ToInt64(v[0])
|
||||
last := ToFloat64(v[4])
|
||||
// ty := reflect.TypeOf(v[4]).Name()
|
||||
// v4It, err := strconv.ParseInt(v[4].(string), 10, 64)
|
||||
// if err != nil {
|
||||
// logrus.Info("saveCandles last is 0 is err: ", err, "v4It: ", v4It, "v[4]: ", v[4], "v[4] type: ", ty, "leng: ", leng, " instId: ", instId, " period: ", period, " length of rsp.Data: ", len(rsp.Data), " data:", rsp.Data)
|
||||
// }
|
||||
if last == 0 {
|
||||
// logrus.Info("saveCandles last is 0: ", "v[4]: ", v[4], "v[4] type: ", ty, " v4It: ", v4It, "leng: ", leng, " instId: ", instId, " period: ", period, " length of rsp.Data: ", len(rsp.Data), " data:", rsp.Data)
|
||||
continue
|
||||
}
|
||||
minutes, _ := core.PeriodToMinutes(period)
|
||||
logrus.Warn("tmi: ", tmi, " curTim:", curTmi, " minutes: ", minutes)
|
||||
if (tmi-curTmi)%(minutes*60000) != 0 {
|
||||
logrus.Warn("saveCandles error: 当前记录中的时间戳:", curSegStartTime, ",并非周期节点:", period, " 忽略")
|
||||
continue
|
||||
}
|
||||
|
||||
ts, _ := Int64ToTime(tmi)
|
||||
candle := Candle{
|
||||
InstID: instId,
|
||||
Period: period,
|
||||
Data: v,
|
||||
From: "rest",
|
||||
Timestamp: ts,
|
||||
LastUpdate: time.Now(),
|
||||
}
|
||||
|
||||
//存到elasticSearch
|
||||
candle.PushToWriteLogChan(core)
|
||||
//保存rest得到的candle
|
||||
// 发布到allCandles|publish, 给外部订阅者用于setToKey
|
||||
arys := []string{ALLCANDLES_PUBLISH}
|
||||
if withWs {
|
||||
arys = append(arys, ALLCANDLES_INNER_PUBLISH)
|
||||
time.Sleep(time.Duration(k*40) * time.Millisecond)
|
||||
}
|
||||
// 如果candle都不需要存到redis,那么AddToGeneralCandleChnl也没有意义
|
||||
saveCandle := os.Getenv("TEXUS_SAVECANDLE")
|
||||
logrus.Info("saveCandles datas: k,v: ", k, v)
|
||||
if saveCandle == "true" {
|
||||
go func(k int) {
|
||||
time.Sleep(time.Duration(k*100) * time.Millisecond)
|
||||
candle.SetToKey(core)
|
||||
core.AddToGeneralCandleChnl(&candle, arys)
|
||||
}(k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (candle *Candle) PushToWriteLogChan(cr *Core) error {
|
||||
did := candle.InstID + candle.Period + candle.Data[0].(string)
|
||||
candle.Id = HashString(did)
|
@ -1,4 +1,4 @@
|
||||
package core
|
||||
package model
|
||||
|
||||
import (
|
||||
"encoding/json"
|
@ -1,4 +1,4 @@
|
||||
package core
|
||||
package model
|
||||
|
||||
import (
|
||||
"encoding/json"
|
32
service/segment_service.go
Normal file
32
service/segment_service.go
Normal file
@ -0,0 +1,32 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/bitly/go-simplejson"
|
||||
logrus "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type DefaultSegmentService struct {
|
||||
Config *simplejson.Json
|
||||
}
|
||||
|
||||
func (s *DefaultSegmentService) GetCurrentSegment(period string) *CandleSegment {
|
||||
segments := s.Config.Get("softCandleSegmentList").MustArray()
|
||||
|
||||
for _, v := range segments {
|
||||
cs := CandleSegment{}
|
||||
sv, _ := json.Marshal(v)
|
||||
json.Unmarshal(sv, &cs)
|
||||
|
||||
if cs.Enabled && cs.Seg == period {
|
||||
return &cs
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *DefaultSegmentService) ParseSegmentTime(timeStr string) (time.Time, error) {
|
||||
return time.ParseInLocation("2006-01-02 15:04.000", timeStr, time.Local)
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package core
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -18,16 +18,22 @@ import (
|
||||
// simple "github.com/bitly/go-simplejson"
|
||||
// "v5sdk_go/ws/wImpl"
|
||||
"github.com/go-redis/redis"
|
||||
"github.com/phyer/core/core"
|
||||
"github.com/phyer/core/model"
|
||||
"github.com/phyer/texus/private"
|
||||
"github.com/phyer/v5sdkgo/rest"
|
||||
logrus "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type Core struct {
|
||||
Env string
|
||||
Cfg *MyConfig
|
||||
RedisLocalCli *redis.Client
|
||||
RedisRemoteCli *redis.Client
|
||||
Env string
|
||||
Config ConfigProvider // 改为接口类型
|
||||
Redis RedisConnector // 改为接口类型
|
||||
HTTPClient HTTPClient // 改为接口类型
|
||||
Logger Logger // 改为接口类型
|
||||
SegmentServer core.SegmentService // 新增接口依赖
|
||||
|
||||
// 保留原有的运行时字段
|
||||
FluentBitUrl string
|
||||
PlateMap map[string]*Plate
|
||||
TrayMap map[string]*Tray
|
||||
@ -35,21 +41,54 @@ type Core struct {
|
||||
Mu *sync.Mutex
|
||||
Mu1 *sync.Mutex
|
||||
Waity *sync.WaitGroup
|
||||
CandlesProcessChan chan *Candle
|
||||
CandlesProcessChan chan *model.Candle
|
||||
MaXProcessChan chan *MaX
|
||||
RsiProcessChan chan *Rsi
|
||||
StockRsiProcessChan chan *StockRsi
|
||||
TickerInforocessChan chan *TickerInfo
|
||||
CoasterChan chan *CoasterInfo
|
||||
// SeriesChan chan *SeriesInfo
|
||||
// SegmentItemChan chan *SegmentItem
|
||||
MakeMaXsChan chan *Candle
|
||||
// ShearForceGrpChan chan *ShearForceGrp
|
||||
InvokeRestQueueChan chan *RestQueue
|
||||
RedisLocal2Cli *redis.Client
|
||||
RestQueueChan chan *RestQueue
|
||||
SeriesChan chan *SeriesInfo
|
||||
SegmentItemChan chan *SegmentItem
|
||||
MakeMaXsChan chan *Candle
|
||||
ShearForceGrpChan chan *ShearForceGrp
|
||||
InvokeRestQueueChan chan *RestQueue
|
||||
RestQueueChan chan *RestQueue
|
||||
RestQueue
|
||||
WriteLogChan chan *WriteLog
|
||||
|
||||
// 降级为内部字段(后续可逐步移除)
|
||||
RedisLocalCli *redis.Client
|
||||
RedisRemoteCli *redis.Client
|
||||
RedisLocal2Cli *redis.Client
|
||||
Cfg *MyConfig
|
||||
}
|
||||
|
||||
// 各模块接口定义
|
||||
type RedisConnector interface {
|
||||
GetClient(options *redis.Options) (*redis.Client, error)
|
||||
Ping(client *redis.Client) error
|
||||
}
|
||||
|
||||
type ConfigProvider interface {
|
||||
GetString(keyPath string) (string, error)
|
||||
GetInt(keyPath string) (int, error)
|
||||
GetBool(keyPath string) (bool, error)
|
||||
}
|
||||
|
||||
type Logger interface {
|
||||
Info(args ...interface{})
|
||||
Error(args ...interface{})
|
||||
Warn(args ...interface{})
|
||||
Debug(args ...interface{})
|
||||
}
|
||||
|
||||
type HTTPClient interface {
|
||||
Do(req *http.Request) (*http.Response, error)
|
||||
}
|
||||
|
||||
type PipelineOperator interface {
|
||||
CandlePipeline() chan *Candle
|
||||
LogPipeline() chan *WriteLog
|
||||
}
|
||||
|
||||
type RestQueue struct {
|
||||
@ -127,26 +166,99 @@ func WriteLogProcess(cr *Core) {
|
||||
// fmt.Println("serverSystem time:", rsp)
|
||||
// }
|
||||
|
||||
func (core *Core) Init() {
|
||||
core.Env = os.Getenv("GO_ENV")
|
||||
gitBranch := os.Getenv("gitBranchName")
|
||||
commitID := os.Getenv("gitCommitID")
|
||||
|
||||
logrus.Info("当前环境: ", core.Env)
|
||||
logrus.Info("gitBranch: ", gitBranch)
|
||||
logrus.Info("gitCommitID: ", commitID)
|
||||
cfg := MyConfig{}
|
||||
cfg, _ = cfg.Init()
|
||||
core.Cfg = &cfg
|
||||
cli, err := core.GetRedisLocalCli()
|
||||
core.RedisLocalCli = cli
|
||||
core.RestQueueChan = make(chan *RestQueue)
|
||||
core.WriteLogChan = make(chan *WriteLog)
|
||||
// 跟订单有关的都关掉
|
||||
// core.OrderChan = make(chan *private.Order)
|
||||
if err != nil {
|
||||
logrus.Error("init redis client err: ", err)
|
||||
func NewCore(
|
||||
config ConfigProvider,
|
||||
redisConn RedisConnector,
|
||||
httpClient HTTPClient,
|
||||
logger Logger,
|
||||
) *Core {
|
||||
core := &Core{
|
||||
Config: config,
|
||||
Redis: redisConn,
|
||||
HTTPClient: httpClient,
|
||||
Logger: logger,
|
||||
RestQueueChan: make(chan *RestQueue),
|
||||
WriteLogChan: make(chan *WriteLog),
|
||||
}
|
||||
|
||||
core.Env = os.Getenv("GO_ENV")
|
||||
core.FluentBitUrl = os.Getenv("SARDINE_FluentBitUrl")
|
||||
|
||||
logger.Info("当前环境: ", core.Env)
|
||||
logger.Info("FluentBit地址: ", core.FluentBitUrl)
|
||||
|
||||
return core
|
||||
}
|
||||
|
||||
func (core *Core) GetCandlesWithRest(instId string, kidx int, dura time.Duration, maxCandles int) error {
|
||||
ary := []string{}
|
||||
|
||||
wsary := core.Cfg.CandleDimentions
|
||||
for k, v := range wsary {
|
||||
matched := false
|
||||
// 这个算法的目的是:越靠后的candles维度,被命中的概率越低,第一个百分之百命中,后面开始越来越低, 每分钟都会发生这样的计算,
|
||||
// 因为维度多了的话,照顾不过来
|
||||
rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
n := (k*2 + 2) * 3
|
||||
if n < 1 {
|
||||
n = 1
|
||||
}
|
||||
b := rand.Intn(n)
|
||||
if b < 8 {
|
||||
matched = true
|
||||
}
|
||||
if matched {
|
||||
ary = append(ary, v)
|
||||
}
|
||||
}
|
||||
|
||||
mdura := dura/(time.Duration(len(ary)+1)) - 50*time.Millisecond
|
||||
// fmt.Println("loop4 Ticker Start instId, dura: ", instId, dura, dura/10, mdura, len(ary), " idx: ", kidx)
|
||||
// time.Duration(len(ary)+1)
|
||||
ticker := time.NewTicker(mdura)
|
||||
done := make(chan bool)
|
||||
idx := 0
|
||||
go func(i int) {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if i >= (len(ary)) {
|
||||
done <- true
|
||||
break
|
||||
}
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
b := rand.Intn(2)
|
||||
maxCandles = maxCandles * (i + b) * 2
|
||||
|
||||
if maxCandles < 3 {
|
||||
maxCandles = 3
|
||||
}
|
||||
if maxCandles > 30 {
|
||||
maxCandles = 30
|
||||
}
|
||||
mx := strconv.Itoa(maxCandles)
|
||||
// fmt.Println("loop4 getCandlesWithRest, instId, period,limit,dura, t: ", instId, ary[i], mx, mdura)
|
||||
go func(ii int) {
|
||||
restQ := RestQueue{
|
||||
InstId: instId,
|
||||
Bar: ary[ii],
|
||||
Limit: mx,
|
||||
Duration: mdura,
|
||||
WithWs: true,
|
||||
}
|
||||
js, _ := json.Marshal(restQ)
|
||||
core.RedisLocalCli.LPush("restQueue", js)
|
||||
}(i)
|
||||
i++
|
||||
}
|
||||
}
|
||||
}(idx)
|
||||
time.Sleep(dura - 10*time.Millisecond)
|
||||
ticker.Stop()
|
||||
// fmt.Println("loop4 Ticker stopped instId, dura: ", instId, dura, mdura)
|
||||
done <- true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (core *Core) GetRedisCliFromConf(conf RedisConfig) (*redis.Client, error) {
|
||||
@ -593,7 +705,7 @@ func (cr *Core) ProcessOrder(od *private.Order) error {
|
||||
// return res.V5Response, nil
|
||||
// }
|
||||
|
||||
func (cr *Core) AddToGeneralCandleChnl(candle *Candle, channels []string) {
|
||||
func (cr *Core) AddToGeneralCandleChnl(candle *model.Candle, channels []string) {
|
||||
redisCli := cr.RedisLocalCli
|
||||
ab, err := json.Marshal(candle)
|
||||
if err != nil {
|
@ -207,3 +207,32 @@ func Md5V(str string) string {
|
||||
h.Write([]byte(str))
|
||||
return hex.EncodeToString(h.Sum(nil))
|
||||
}
|
||||
|
||||
// 当前的时间毫秒数 对于某个时间段,比如3分钟,10分钟,是否可以被整除,
|
||||
func IsModOf(curInt int64, duration time.Duration) bool {
|
||||
vol := int64(0)
|
||||
if duration < 24*time.Hour {
|
||||
// 小于1天
|
||||
vol = (curInt + 28800000)
|
||||
} else if duration >= 24*time.Hour && duration < 48*time.Hour {
|
||||
// 1天
|
||||
vol = curInt - 1633881600000
|
||||
} else if duration >= 48*time.Hour && duration < 72*time.Hour {
|
||||
// 2天
|
||||
vol = curInt - 1633795200000
|
||||
} else if duration >= 72*time.Hour && duration < 120*time.Hour {
|
||||
// 3天
|
||||
vol = curInt - 1633708800000
|
||||
} else if duration >= 120*time.Hour {
|
||||
// 5天
|
||||
vol = curInt - 1633795200000
|
||||
} else {
|
||||
// fmt.Println("noMatched:", curInt)
|
||||
}
|
||||
|
||||
mody := vol % duration.Milliseconds()
|
||||
if mody == 0 {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user