mirror of
https://github.com/grafana/loki.git
synced 2026-03-13 09:33:58 +08:00
118 lines
3.2 KiB
Go
118 lines
3.2 KiB
Go
package ingester
|
|
|
|
import (
|
|
"github.com/dustin/go-humanize"
|
|
"github.com/go-kit/log/level"
|
|
"go.uber.org/atomic"
|
|
"golang.org/x/sync/singleflight"
|
|
|
|
util_log "github.com/grafana/loki/v3/pkg/util/log"
|
|
)
|
|
|
|
type replayFlusher struct {
|
|
i *Ingester
|
|
}
|
|
|
|
func (f *replayFlusher) Flush() {
|
|
f.i.InitFlushQueues()
|
|
f.i.flush(false) // flush data but don't remove streams from the ingesters
|
|
|
|
// Similar to sweepUsers with the exception that it will not remove streams
|
|
// afterwards to prevent unlinking a stream which may receive later writes from the WAL.
|
|
// We have to do this here after the flushQueues have been drained.
|
|
instances := f.i.getInstances()
|
|
|
|
for _, instance := range instances {
|
|
|
|
_ = instance.streams.ForEach(func(s *stream) (bool, error) {
|
|
f.i.removeFlushedChunks(instance, s, false)
|
|
return true, nil
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
type Flusher interface {
|
|
Flush()
|
|
}
|
|
|
|
// replayController handles coordinating backpressure between WAL replays and chunk flushing.
|
|
type replayController struct {
|
|
// Note, this has to be defined first to make sure it is aligned properly for 32bit ARM OS
|
|
// From https://golang.org/pkg/sync/atomic/#pkg-note-BUG:
|
|
// > On ARM, 386, and 32-bit MIPS, it is the caller's responsibility to arrange for
|
|
// > 64-bit alignment of 64-bit words accessed atomically. The first word in a
|
|
// > variable or in an allocated struct, array, or slice can be relied upon to
|
|
// > be 64-bit aligned.
|
|
currentBytes atomic.Int64
|
|
cfg WALConfig
|
|
metrics *ingesterMetrics
|
|
|
|
flusher Flusher
|
|
flushSF singleflight.Group
|
|
}
|
|
|
|
// flusher is expected to reduce pressure via calling Sub
|
|
func newReplayController(metrics *ingesterMetrics, cfg WALConfig, flusher Flusher) *replayController {
|
|
return &replayController{
|
|
cfg: cfg,
|
|
metrics: metrics,
|
|
flusher: flusher,
|
|
}
|
|
}
|
|
|
|
func (c *replayController) Add(x int64) {
|
|
c.metrics.recoveredBytesTotal.Add(float64(x))
|
|
c.metrics.setRecoveryBytesInUse(c.currentBytes.Add(x))
|
|
}
|
|
|
|
func (c *replayController) Sub(x int64) {
|
|
c.metrics.setRecoveryBytesInUse(c.currentBytes.Sub(x))
|
|
|
|
}
|
|
|
|
func (c *replayController) Cur() int {
|
|
return int(c.currentBytes.Load())
|
|
}
|
|
|
|
func (c *replayController) Flush() {
|
|
// Use singleflight to ensure only one flush happens at a time
|
|
_, _, _ = c.flushSF.Do("flush", func() (interface{}, error) {
|
|
c.flush()
|
|
return nil, nil
|
|
})
|
|
}
|
|
|
|
func (c *replayController) flush() {
|
|
c.metrics.recoveryIsFlushing.Set(1)
|
|
prior := c.currentBytes.Load()
|
|
level.Debug(util_log.Logger).Log(
|
|
"msg", "replay flusher pre-flush",
|
|
"bytes", humanize.Bytes(uint64(prior)),
|
|
)
|
|
|
|
c.flusher.Flush()
|
|
|
|
after := c.currentBytes.Load()
|
|
level.Debug(util_log.Logger).Log(
|
|
"msg", "replay flusher post-flush",
|
|
"bytes", humanize.Bytes(uint64(after)),
|
|
)
|
|
|
|
c.metrics.recoveryIsFlushing.Set(0)
|
|
}
|
|
|
|
// WithBackPressure is expected to call replayController.Add in the passed function to increase the managed byte count.
|
|
// It will call the function as long as there is expected room before the memory cap and will then flush data intermittently
|
|
// when needed.
|
|
func (c *replayController) WithBackPressure(fn func() error) error {
|
|
// use 90% as a threshold since we'll be adding to it.
|
|
for c.Cur() > int(c.cfg.ReplayMemoryCeiling)*9/10 {
|
|
// too much backpressure, flush
|
|
c.Flush()
|
|
}
|
|
|
|
return fn()
|
|
}
|