ccxt-go/exchange_throttler.go

101 lines
1.8 KiB
Go
Raw Normal View History

2025-02-28 10:33:20 +08:00
package ccxt
import (
"time"
u "github.com/google/uuid"
)
type Throttler struct {
Queue Queue
Running bool
Config map[string]interface{}
}
func NewThrottler(config map[string]interface{}) Throttler {
defaultConfig := map[string]interface{}{
"refillRate": 1.0,
"delay": 0.001,
"capacity": 1.0,
"maxCapacity": 2000,
"tokens": 0,
"cost": 1.0,
}
return Throttler{
Queue: NewQueue(),
Running: false,
Config: ExtendMap(defaultConfig, config),
}
}
func (t *Throttler) Throttle(cost2 interface{}) <-chan bool {
var cost float64 = -1
if cost2 != nil {
cost = cost2.(float64)
} else {
cost = ToFloat64(t.Config["cost"])
}
task := make(chan bool)
queueElement := QueueElement{
Cost: cost,
Task: task,
Id: u.New().String(),
}
t.Queue.Enqueue(queueElement)
if !t.Running {
t.Running = true
go t.Loop()
}
return task
}
func (t *Throttler) Loop() {
lastTimestamp := Milliseconds()
for t.Running {
if t.Queue.IsEmpty() {
t.Running = false
continue
}
first, _ := t.Queue.Peek()
task := first.Task
cost := first.Cost
tokens := ToFloat64(t.Config["tokens"])
if tokens >= 0 {
t.Config["tokens"] = tokens - cost
if task != nil {
task <- true
close(task)
}
t.Queue.Dequeue()
if t.Queue.IsEmpty() {
t.Running = false
}
} else {
sleepTime := ToFloat64(t.Config["delay"]) * 1000
time.Sleep(time.Duration(sleepTime) * time.Millisecond)
current := Milliseconds()
elapsed := current - lastTimestamp
lastTimestamp = current
sumTokens := ToFloat64(t.Config["refillRate"]) * ToFloat64(elapsed)
tokens := ToFloat64(t.Config["tokens"]) + sumTokens
t.Config["tokens"] = MathMin(tokens, ToFloat64(t.Config["capacity"]))
}
}
}
func Milliseconds() int64 {
return time.Now().UnixNano() / int64(time.Millisecond)
}