mirror of
https://github.com/grafana/loki.git
synced 2026-03-13 09:33:58 +08:00
This change adds an internal request queue to the bloom gateway. Instead of executing every single request individually, which involves resolving bloom blocks, downloading them if needed, and executing the chunk filtering, requests are now enqueued to the internal, per-tenant queue. The queue implements the same shuffle sharding mechanism as the queue in the query scheduler component. Workers then dequeue a batch of requests for a single tenant and multiplex them into a single processing task for each day. This has the big advantage that the chunks of multiple requests can be processed in a single sequential scan through a set of bloom blocks, without needing to skip back and forth within the binary stream of the block. --------- Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
26 lines
548 B
Go
26 lines
548 B
Go
package queue
|
|
|
|
import "github.com/prometheus/prometheus/util/pool"
|
|
|
|
// SlicePool uses a bucket pool and wraps the Get() and Put() functions for
|
|
// simpler access.
|
|
type SlicePool[T any] struct {
|
|
p *pool.Pool
|
|
}
|
|
|
|
func NewSlicePool[T any](minSize, maxSize int, factor float64) *SlicePool[T] {
|
|
return &SlicePool[T]{
|
|
p: pool.New(minSize, maxSize, factor, func(i int) interface{} {
|
|
return make([]T, 0, i)
|
|
}),
|
|
}
|
|
}
|
|
|
|
func (sp *SlicePool[T]) Get(n int) []T {
|
|
return sp.p.Get(n).([]T)
|
|
}
|
|
|
|
func (sp *SlicePool[T]) Put(buf []T) {
|
|
sp.p.Put(buf[0:0])
|
|
}
|