chore(blooms): Add number of skipped bloom blocks to stats-report log line (#14771)

We already log the number of processed blocks, so we also want to know the number of skipped blocks, because they have not been available. This will give better insights on a per-query basis, additionally to the metric of overall skipped unavailable blocks `loki_bloom_gateway_blocks_not_available_total`.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
This commit is contained in:
Christian Haudum
2024-11-05 15:09:47 +01:00
committed by GitHub
parent 76d35cc97f
commit 03f7eab9c0
2 changed files with 25 additions and 13 deletions

View File

@@ -114,26 +114,28 @@ func (p *processor) processBlocks(ctx context.Context, bqs []*bloomshipper.Close
}()
return concurrency.ForEachJob(ctx, len(bqs), p.concurrency, func(ctx context.Context, i int) error {
bq := bqs[i]
if bq == nil {
blockQuerier := bqs[i]
blockWithTasks := data[i]
// block has not been downloaded or is otherwise not available (yet)
// therefore no querier for this block available
if blockQuerier == nil {
for _, task := range blockWithTasks.tasks {
stats := FromContext(task.ctx)
stats.IncSkippedBlocks()
}
p.metrics.blocksNotAvailable.WithLabelValues(p.id).Inc()
return nil
}
block := data[i]
if !block.ref.Bounds.Equal(bq.Bounds) {
return errors.Errorf("block and querier bounds differ: %s vs %s", block.ref.Bounds, bq.Bounds)
if !blockWithTasks.ref.Bounds.Equal(blockQuerier.Bounds) {
return errors.Errorf("block and querier bounds differ: %s vs %s", blockWithTasks.ref.Bounds, blockQuerier.Bounds)
}
var errs multierror.MultiError
errs.Add(
errors.Wrap(
p.processBlock(ctx, bq, block.tasks),
"processing block",
),
)
errs.Add(bq.Close())
errs.Add(errors.Wrap(p.processBlock(ctx, blockQuerier, blockWithTasks.tasks), "processing block"))
errs.Add(blockQuerier.Close())
hasClosed[i] = true
return errs.Err()
})

View File

@@ -16,6 +16,7 @@ type Stats struct {
BlocksFetchTime *atomic.Duration
ProcessingTime, TotalProcessingTime *atomic.Duration
PostProcessingTime *atomic.Duration
SkippedBlocks *atomic.Int32 // blocks skipped because they were not available (yet)
ProcessedBlocks *atomic.Int32 // blocks processed for this specific request
ProcessedBlocksTotal *atomic.Int32 // blocks processed for multiplexed request
}
@@ -28,6 +29,7 @@ var ctxKey = statsKey(0)
func ContextWithEmptyStats(ctx context.Context) (*Stats, context.Context) {
stats := &Stats{
Status: "unknown",
SkippedBlocks: atomic.NewInt32(0),
ProcessedBlocks: atomic.NewInt32(0),
ProcessedBlocksTotal: atomic.NewInt32(0),
QueueTime: atomic.NewDuration(0),
@@ -71,6 +73,7 @@ func (s *Stats) KVArgs() []any {
"status", s.Status,
"tasks", s.NumTasks,
"matchers", s.NumMatchers,
"blocks_skipped", s.SkippedBlocks.Load(),
"blocks_processed", s.ProcessedBlocks.Load(),
"blocks_processed_total", s.ProcessedBlocksTotal.Load(),
"series_requested", s.SeriesRequested,
@@ -122,6 +125,13 @@ func (s *Stats) AddPostProcessingTime(t time.Duration) {
s.PostProcessingTime.Add(t)
}
func (s *Stats) IncSkippedBlocks() {
if s == nil {
return
}
s.SkippedBlocks.Inc()
}
func (s *Stats) IncProcessedBlocks() {
if s == nil {
return