fix(blooms): Exclude label filters where label name is part of the series labels. (#14661)

This commit is contained in:
Salva Corts
2024-10-31 15:51:15 +01:00
committed by GitHub
parent ebbbccfdaf
commit d1668f6a11
16 changed files with 278 additions and 112 deletions

View File

@@ -15,6 +15,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/logproto"
@@ -41,7 +42,15 @@ func stringSlice[T fmt.Stringer](s []T) []string {
func groupRefs(t *testing.T, chunkRefs []*logproto.ChunkRef) []*logproto.GroupedChunkRefs {
t.Helper()
return groupChunkRefs(chunkRefs, nil)
grouped := groupChunkRefs(nil, chunkRefs, nil)
// Put fake labels to the series
for _, g := range grouped {
g.Labels = &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", fmt.Sprintf("%d", g.Fingerprint))),
}
}
return grouped
}
func newLimits() *validation.Overrides {
@@ -295,12 +304,18 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
{Fingerprint: 1000, Tenant: tenantID, Refs: []*logproto.ShortRef{
{From: 1696248000000, Through: 1696251600000, Checksum: 2},
{From: 1696244400000, Through: 1696248000000, Checksum: 4},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "1000")),
}},
{Fingerprint: 2000, Tenant: tenantID, Refs: []*logproto.ShortRef{
{From: 1696255200000, Through: 1696258800000, Checksum: 3},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "2000")),
}},
{Fingerprint: 3000, Tenant: tenantID, Refs: []*logproto.ShortRef{
{From: 1696240800000, Through: 1696244400000, Checksum: 1},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "3000")),
}},
},
}, res)
@@ -405,6 +420,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
// see MkBasicSeriesWithBlooms() in pkg/storage/bloom/v1/test_util.go
rnd := rand.Intn(len(inputChunkRefs))
fp := inputChunkRefs[rnd].Fingerprint
lbs := inputChunkRefs[rnd].Labels
chks := inputChunkRefs[rnd].Refs
key := fmt.Sprintf("%s:%04x", model.Fingerprint(fp), 0)
@@ -428,6 +444,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
ChunkRefs: []*logproto.GroupedChunkRefs{
{
Fingerprint: fp,
Labels: lbs,
Refs: chks,
Tenant: tenantID,
},

View File

@@ -72,6 +72,7 @@ func (e extractor) Extract(start, end int64, r resultscache.Response, _, _ int64
if len(refs) > 0 {
chunkRefs = append(chunkRefs, &logproto.GroupedChunkRefs{
Fingerprint: chunkRef.Fingerprint,
Labels: chunkRef.Labels,
Tenant: chunkRef.Tenant,
Refs: refs,
})

View File

@@ -322,6 +322,7 @@ func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedCh
}
return &logproto.GroupedChunkRefs{
Fingerprint: a.Fingerprint,
Labels: a.Labels,
Tenant: a.Tenant,
Refs: mergeChunkSets(a.Refs, b.Refs),
}

View File

@@ -174,6 +174,7 @@ func (it *requestIterator) Next() bool {
it.curr = v1.Request{
Recorder: it.recorder,
Fp: model.Fingerprint(group.Fingerprint),
Labels: logproto.FromLabelAdaptersToLabels(group.Labels.Labels),
Chks: convertToChunkRefs(group.Refs),
Search: it.search,
Response: it.channel,

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
v2 "github.com/grafana/loki/v3/pkg/iter/v2"
@@ -73,6 +74,8 @@ func TestTask_RequestIterator(t *testing.T) {
Refs: []*logproto.GroupedChunkRefs{
{Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-3 * time.Hour), Through: ts.Add(-2 * time.Hour), Checksum: 100},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "100")),
}},
},
}
@@ -83,9 +86,13 @@ func TestTask_RequestIterator(t *testing.T) {
Refs: []*logproto.GroupedChunkRefs{
{Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 200},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "100")),
}},
{Fingerprint: 200, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 300},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "200")),
}},
},
}
@@ -96,6 +103,8 @@ func TestTask_RequestIterator(t *testing.T) {
Refs: []*logproto.GroupedChunkRefs{
{Fingerprint: 200, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 400},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "200")),
}},
},
}

View File

@@ -10,6 +10,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/querier/plan"
@@ -101,7 +102,7 @@ func convertToShortRef(ref *logproto.ChunkRef) *logproto.ShortRef {
return &logproto.ShortRef{From: ref.From, Through: ref.Through, Checksum: ref.Checksum}
}
func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, error) {
func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, series map[uint64]labels.Labels, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, error) {
// Shortcut that does not require any filtering
if !bq.limits.BloomGatewayEnabled(tenant) || len(chunkRefs) == 0 || len(v1.ExtractTestableLabelMatchers(queryPlan.AST)) == 0 {
return chunkRefs, nil
@@ -112,7 +113,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
grouped := groupedChunksRefPool.Get(len(chunkRefs))
defer groupedChunksRefPool.Put(grouped)
grouped = groupChunkRefs(chunkRefs, grouped)
grouped = groupChunkRefs(series, chunkRefs, grouped)
preFilterChunks := len(chunkRefs)
preFilterSeries := len(grouped)
@@ -225,7 +226,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
// groups them by fingerprint.
// The second argument `grouped` can be used to pass a buffer to avoid allocations.
// If it's nil, the returned slice will be allocated.
func groupChunkRefs(chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
func groupChunkRefs(series map[uint64]labels.Labels, chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
seen := make(map[uint64]int, len(grouped))
for _, chunkRef := range chunkRefs {
if idx, found := seen[chunkRef.Fingerprint]; found {
@@ -234,10 +235,14 @@ func groupChunkRefs(chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedC
seen[chunkRef.Fingerprint] = len(grouped)
grouped = append(grouped, &logproto.GroupedChunkRefs{
Fingerprint: chunkRef.Fingerprint,
Tenant: chunkRef.UserID,
Refs: []*logproto.ShortRef{convertToShortRef(chunkRef)},
Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(series[chunkRef.Fingerprint]),
},
Tenant: chunkRef.UserID,
Refs: []*logproto.ShortRef{convertToShortRef(chunkRef)},
})
}
}
return grouped
}

View File

@@ -2,6 +2,7 @@ package bloomgateway
import (
"context"
"fmt"
"math/rand"
"sort"
"testing"
@@ -10,6 +11,7 @@ import (
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/logproto"
@@ -79,7 +81,7 @@ func TestBloomQuerier(t *testing.T) {
}
expr, err := syntax.ParseExpr(`{foo="bar"}`)
require.NoError(t, err)
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
require.NoError(t, err)
require.Equal(t, chunkRefs, res)
require.Equal(t, 0, c.callCount)
@@ -95,7 +97,7 @@ func TestBloomQuerier(t *testing.T) {
chunkRefs := []*logproto.ChunkRef{}
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`)
require.NoError(t, err)
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
require.NoError(t, err)
require.Equal(t, chunkRefs, res)
require.Equal(t, 0, c.callCount)
@@ -115,7 +117,7 @@ func TestBloomQuerier(t *testing.T) {
}
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`)
require.NoError(t, err)
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
require.Error(t, err)
require.Nil(t, res)
})
@@ -134,7 +136,7 @@ func TestBloomQuerier(t *testing.T) {
}
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`)
require.NoError(t, err)
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
require.NoError(t, err)
require.Equal(t, chunkRefs, res)
require.Equal(t, 2, c.callCount)
@@ -143,28 +145,44 @@ func TestBloomQuerier(t *testing.T) {
}
func TestGroupChunkRefs(t *testing.T) {
chunkRefs := []*logproto.ChunkRef{
{Fingerprint: 0x00, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: 0x00, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
{Fingerprint: 0x01, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: 0x01, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
{Fingerprint: 0x02, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: 0x02, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
series := []labels.Labels{
labels.FromStrings("app", "1"),
labels.FromStrings("app", "2"),
labels.FromStrings("app", "3"),
}
seriesMap := make(map[uint64]labels.Labels)
for _, s := range series {
seriesMap[s.Hash()] = s
}
result := groupChunkRefs(chunkRefs, nil)
chunkRefs := []*logproto.ChunkRef{
{Fingerprint: series[0].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: series[0].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
{Fingerprint: series[1].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: series[1].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
{Fingerprint: series[2].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: series[2].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
}
result := groupChunkRefs(seriesMap, chunkRefs, nil)
require.Equal(t, []*logproto.GroupedChunkRefs{
{Fingerprint: 0x00, Tenant: "tenant", Refs: []*logproto.ShortRef{
{Fingerprint: series[0].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{
{From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(series[0]),
}},
{Fingerprint: 0x01, Tenant: "tenant", Refs: []*logproto.ShortRef{
{Fingerprint: series[1].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{
{From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(series[1]),
}},
{Fingerprint: 0x02, Tenant: "tenant", Refs: []*logproto.ShortRef{
{Fingerprint: series[2].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{
{From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(series[2]),
}},
}, result)
}
@@ -175,11 +193,15 @@ func BenchmarkGroupChunkRefs(b *testing.B) {
n := 1000 // num series
m := 10000 // num chunks per series
chunkRefs := make([]*logproto.ChunkRef, 0, n*m)
series := make(map[uint64]labels.Labels, n)
for i := 0; i < n; i++ {
s := labels.FromStrings("app", fmt.Sprintf("%d", i))
sFP := s.Hash()
series[sFP] = s
for j := 0; j < m; j++ {
chunkRefs = append(chunkRefs, &logproto.ChunkRef{
Fingerprint: uint64(n),
Fingerprint: sFP,
UserID: "tenant",
From: mktime("2024-04-20 00:00"),
Through: mktime("2024-04-20 00:59"),
@@ -196,5 +218,5 @@ func BenchmarkGroupChunkRefs(b *testing.B) {
b.StartTimer()
groups := make([]*logproto.GroupedChunkRefs, 0, n)
groupChunkRefs(chunkRefs, groups)
groupChunkRefs(series, chunkRefs, groups)
}

View File

@@ -110,6 +110,7 @@ func partitionSeriesByDay(from, through model.Time, seriesWithChunks []*logproto
res = append(res, &logproto.GroupedChunkRefs{
Fingerprint: series.Fingerprint,
Labels: series.Labels,
Tenant: series.Tenant,
Refs: relevantChunks,
})

View File

@@ -58,7 +58,7 @@ type IndexClientWithRange struct {
}
type BloomQuerier interface {
FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunks []*logproto.ChunkRef, plan plan.QueryPlan) ([]*logproto.ChunkRef, error)
FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, series map[uint64]labels.Labels, chunks []*logproto.ChunkRef, plan plan.QueryPlan) ([]*logproto.ChunkRef, error)
}
type Gateway struct {
@@ -225,12 +225,16 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ
return nil, err
}
series := make(map[uint64]labels.Labels)
result = &logproto.GetChunkRefResponse{
Refs: make([]*logproto.ChunkRef, 0, len(chunks)),
}
for _, cs := range chunks {
for i := range cs {
result.Refs = append(result.Refs, &cs[i].ChunkRef)
if _, ok := series[cs[i].Fingerprint]; !ok {
series[cs[i].Fingerprint] = cs[i].Metric
}
}
}
@@ -257,7 +261,7 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ
return result, nil
}
chunkRefs, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, result.Refs, req.Plan)
chunkRefs, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, series, result.Refs, req.Plan)
if err != nil {
return nil, err
}

View File

@@ -178,6 +178,9 @@ type GroupedChunkRefs struct {
Fingerprint uint64 `protobuf:"varint,1,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"`
Tenant string `protobuf:"bytes,2,opt,name=tenant,proto3" json:"tenant,omitempty"`
Refs []*ShortRef `protobuf:"bytes,3,rep,name=refs,proto3" json:"refs,omitempty"`
// Labels are only populated on FilterChunkRefRequest. They are not returned on FilterChunkRefResponse
// TODO(salvacorts): Consider two different messages for FilterChunkRefRequest and FilterChunkRefResponse
Labels *IndexSeries `protobuf:"bytes,4,opt,name=labels,proto3" json:"labels,omitempty"`
}
func (m *GroupedChunkRefs) Reset() { *m = GroupedChunkRefs{} }
@@ -233,6 +236,13 @@ func (m *GroupedChunkRefs) GetRefs() []*ShortRef {
return nil
}
func (m *GroupedChunkRefs) GetLabels() *IndexSeries {
if m != nil {
return m.Labels
}
return nil
}
func init() {
proto.RegisterType((*FilterChunkRefRequest)(nil), "logproto.FilterChunkRefRequest")
proto.RegisterType((*FilterChunkRefResponse)(nil), "logproto.FilterChunkRefResponse")
@@ -243,39 +253,41 @@ func init() {
func init() { proto.RegisterFile("pkg/logproto/bloomgateway.proto", fileDescriptor_a50b5dd1dbcd1415) }
var fileDescriptor_a50b5dd1dbcd1415 = []byte{
// 504 bytes of a gzipped FileDescriptorProto
// 532 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x53, 0xcd, 0x6e, 0xd3, 0x40,
0x10, 0xf6, 0x36, 0x21, 0x24, 0x1b, 0xfe, 0xb4, 0x82, 0xca, 0x0a, 0xd2, 0xc6, 0xca, 0x81, 0xfa,
0xe4, 0x95, 0x52, 0x21, 0x71, 0xe1, 0x92, 0x4a, 0x54, 0xdc, 0x60, 0x41, 0x1c, 0x90, 0x38, 0x38,
0xee, 0xfa, 0x47, 0xb6, 0x77, 0xdc, 0xf5, 0x1a, 0xd4, 0x1b, 0x8f, 0xc0, 0x63, 0xf0, 0x04, 0x3c,
0x43, 0x8f, 0x11, 0xa7, 0x8a, 0x43, 0x45, 0x9c, 0x0b, 0xc7, 0x3e, 0x02, 0xb2, 0x1d, 0x37, 0x4d,
0x05, 0xaa, 0xc4, 0x89, 0x93, 0x77, 0x76, 0xbe, 0x19, 0x7f, 0xf3, 0x7d, 0x3b, 0x78, 0x9c, 0xc5,
0x01, 0x4b, 0x20, 0xc8, 0x14, 0x68, 0x60, 0xf3, 0x04, 0x20, 0x0d, 0x5c, 0x2d, 0x3e, 0xb9, 0x27,
0x4e, 0x7d, 0x45, 0xfa, 0x6d, 0x72, 0xf4, 0x30, 0x80, 0x00, 0x1a, 0x5c, 0x75, 0x6a, 0xf2, 0xa3,
0xc7, 0x5b, 0x0d, 0xda, 0x43, 0x93, 0x9c, 0x7c, 0xdf, 0xc1, 0x8f, 0x5e, 0x44, 0x89, 0x16, 0xea,
0x20, 0x2c, 0x64, 0xcc, 0x85, 0xcf, 0xc5, 0x71, 0x21, 0x72, 0x4d, 0x0e, 0x70, 0xd7, 0x57, 0x90,
0x9a, 0xc8, 0x42, 0x76, 0x67, 0xc6, 0x4e, 0xcf, 0xc7, 0xc6, 0x8f, 0xf3, 0xf1, 0x5e, 0x10, 0xe9,
0xb0, 0x98, 0x3b, 0x1e, 0xa4, 0x2c, 0x53, 0x90, 0x0a, 0x1d, 0x8a, 0x22, 0x67, 0x1e, 0xa4, 0x29,
0x48, 0x96, 0xc2, 0x91, 0x48, 0x9c, 0xb7, 0x51, 0x2a, 0x78, 0x5d, 0x4c, 0x5e, 0xe2, 0xdb, 0x3a,
0x54, 0x50, 0x04, 0xa1, 0xb9, 0xf3, 0x6f, 0x7d, 0xda, 0x7a, 0xe2, 0xe0, 0xae, 0x12, 0x7e, 0x6e,
0x76, 0xac, 0x8e, 0x3d, 0x9c, 0x8e, 0x9c, 0xcb, 0x41, 0x0e, 0x15, 0x14, 0x99, 0x38, 0x6a, 0xf9,
0xe7, 0xbc, 0xc6, 0x11, 0x17, 0x77, 0xb3, 0xc4, 0x95, 0xe6, 0x2d, 0x0b, 0xd9, 0xc3, 0xe9, 0xbd,
0x0d, 0xfe, 0x55, 0xe2, 0xca, 0xd9, 0xf3, 0x35, 0x8f, 0xa7, 0x57, 0x78, 0x04, 0xca, 0xf5, 0x5d,
0xe9, 0xb2, 0x04, 0xe2, 0x88, 0x7d, 0xdc, 0x67, 0x95, 0x6e, 0xc7, 0x85, 0x50, 0x91, 0x50, 0xac,
0x6a, 0xe5, 0xbc, 0x2e, 0x84, 0x3a, 0xa9, 0xca, 0x79, 0xdd, 0x9a, 0xec, 0xe2, 0xde, 0x3c, 0x01,
0x2f, 0xce, 0xcd, 0x9e, 0xd5, 0xb1, 0x07, 0x7c, 0x1d, 0x4d, 0x38, 0xde, 0xbd, 0xae, 0x69, 0x9e,
0x81, 0xcc, 0x05, 0x79, 0x86, 0x07, 0x5e, 0xcb, 0xd3, 0x44, 0x37, 0x4e, 0xb2, 0x01, 0x4f, 0xbe,
0x21, 0xdc, 0x7f, 0x13, 0x82, 0xd2, 0x5c, 0xf8, 0xff, 0x9d, 0x37, 0x23, 0xdc, 0xf7, 0x42, 0xe1,
0xc5, 0x79, 0x91, 0x9a, 0x1d, 0x0b, 0xd9, 0x77, 0xf9, 0x65, 0x3c, 0xd1, 0xf8, 0xc1, 0xf5, 0xb9,
0x88, 0x85, 0x87, 0x7e, 0x24, 0x03, 0xa1, 0x32, 0x15, 0x49, 0x5d, 0x8f, 0xd1, 0xe5, 0x57, 0xaf,
0x2a, 0x69, 0xb5, 0x90, 0xae, 0xd4, 0x35, 0xb7, 0x01, 0x5f, 0x47, 0xe4, 0xc9, 0xd6, 0x2b, 0x20,
0x1b, 0xed, 0x5a, 0x6d, 0x1a, 0xf7, 0xa7, 0x3e, 0xbe, 0x33, 0xab, 0x56, 0xe5, 0xb0, 0x59, 0x15,
0xf2, 0x0e, 0xdf, 0xdf, 0xb6, 0x24, 0x27, 0xe3, 0x4d, 0xf1, 0x1f, 0x37, 0x60, 0x64, 0xfd, 0x1d,
0xd0, 0xd8, 0x39, 0x31, 0x66, 0x1f, 0x16, 0x4b, 0x6a, 0x9c, 0x2d, 0xa9, 0x71, 0xb1, 0xa4, 0xe8,
0x73, 0x49, 0xd1, 0xd7, 0x92, 0xa2, 0xd3, 0x92, 0xa2, 0x45, 0x49, 0xd1, 0xcf, 0x92, 0xa2, 0x5f,
0x25, 0x35, 0x2e, 0x4a, 0x8a, 0xbe, 0xac, 0xa8, 0xb1, 0x58, 0x51, 0xe3, 0x6c, 0x45, 0x8d, 0xf7,
0x7b, 0x37, 0xbc, 0xba, 0xf6, 0xbf, 0xf3, 0x5e, 0xfd, 0xd9, 0xff, 0x1d, 0x00, 0x00, 0xff, 0xff,
0x30, 0x14, 0xc6, 0xc9, 0x05, 0x04, 0x00, 0x00,
0x10, 0xf6, 0x36, 0x21, 0x24, 0x1b, 0xfe, 0xb4, 0xa2, 0x95, 0x15, 0xa4, 0x8d, 0x95, 0x03, 0xcd,
0x05, 0x5b, 0x4a, 0x85, 0xc4, 0x85, 0x4b, 0x2a, 0x51, 0xf5, 0x06, 0x5b, 0xc4, 0x01, 0x89, 0x83,
0xe3, 0x8c, 0x7f, 0x14, 0x7b, 0xd7, 0xdd, 0x5d, 0x03, 0xbd, 0xf1, 0x08, 0xbc, 0x03, 0x17, 0x9e,
0x80, 0x67, 0xe8, 0x31, 0xe2, 0x54, 0x71, 0xa8, 0x88, 0x73, 0xe1, 0xd8, 0x47, 0x40, 0x76, 0xe2,
0xba, 0xa9, 0x40, 0x95, 0x38, 0x71, 0xf2, 0xee, 0xce, 0x37, 0xe3, 0x6f, 0xbe, 0x6f, 0x06, 0xf7,
0xd3, 0x59, 0xe0, 0xc4, 0x22, 0x48, 0xa5, 0xd0, 0xc2, 0x99, 0xc4, 0x42, 0x24, 0x81, 0xab, 0xe1,
0x83, 0x7b, 0x62, 0x97, 0x4f, 0xa4, 0x5d, 0x05, 0x7b, 0x0f, 0x03, 0x11, 0x88, 0x15, 0xae, 0x38,
0xad, 0xe2, 0xbd, 0x47, 0x1b, 0x05, 0xaa, 0xc3, 0x2a, 0x38, 0xf8, 0xbe, 0x85, 0xb7, 0x5f, 0x44,
0xb1, 0x06, 0xb9, 0x1f, 0x66, 0x7c, 0xc6, 0xc0, 0x67, 0x70, 0x9c, 0x81, 0xd2, 0x64, 0x1f, 0x37,
0x7d, 0x29, 0x12, 0x13, 0x59, 0x68, 0xd8, 0x18, 0x3b, 0xa7, 0xe7, 0x7d, 0xe3, 0xc7, 0x79, 0x7f,
0x37, 0x88, 0x74, 0x98, 0x4d, 0x6c, 0x4f, 0x24, 0x4e, 0x2a, 0x45, 0x02, 0x3a, 0x84, 0x4c, 0x39,
0x9e, 0x48, 0x12, 0xc1, 0x9d, 0x44, 0x4c, 0x21, 0xb6, 0x5f, 0x47, 0x09, 0xb0, 0x32, 0x99, 0x1c,
0xe2, 0xdb, 0x3a, 0x94, 0x22, 0x0b, 0x42, 0x73, 0xeb, 0xdf, 0xea, 0x54, 0xf9, 0xc4, 0xc6, 0x4d,
0x09, 0xbe, 0x32, 0x1b, 0x56, 0x63, 0xd8, 0x1d, 0xf5, 0xec, 0xcb, 0x46, 0x0e, 0xa4, 0xc8, 0x52,
0x98, 0x56, 0xfc, 0x15, 0x2b, 0x71, 0xc4, 0xc5, 0xcd, 0x34, 0x76, 0xb9, 0x79, 0xcb, 0x42, 0xc3,
0xee, 0xe8, 0x5e, 0x8d, 0x7f, 0x19, 0xbb, 0x7c, 0xfc, 0x7c, 0xcd, 0xe3, 0xe9, 0x15, 0x1e, 0x81,
0x74, 0x7d, 0x97, 0xbb, 0x4e, 0x2c, 0x66, 0x91, 0xf3, 0x7e, 0xcf, 0x29, 0x74, 0x3b, 0xce, 0x40,
0x46, 0x20, 0x9d, 0xa2, 0x94, 0xfd, 0x2a, 0x03, 0x79, 0x52, 0xa4, 0xb3, 0xb2, 0x34, 0xd9, 0xc1,
0xad, 0x49, 0x2c, 0xbc, 0x99, 0x32, 0x5b, 0x56, 0x63, 0xd8, 0x61, 0xeb, 0xdb, 0x80, 0xe1, 0x9d,
0xeb, 0x9a, 0xaa, 0x54, 0x70, 0x05, 0xe4, 0x19, 0xee, 0x78, 0x15, 0x4f, 0x13, 0xdd, 0xd8, 0x49,
0x0d, 0x1e, 0x7c, 0x43, 0xb8, 0x7d, 0x14, 0x0a, 0xa9, 0x19, 0xf8, 0xff, 0x9d, 0x37, 0x3d, 0xdc,
0xf6, 0x42, 0xf0, 0x66, 0x2a, 0x4b, 0xcc, 0x86, 0x85, 0x86, 0x77, 0xd9, 0xe5, 0x7d, 0xf0, 0x05,
0xe1, 0x07, 0xd7, 0x1b, 0x23, 0x16, 0xee, 0xfa, 0x11, 0x0f, 0x40, 0xa6, 0x32, 0xe2, 0xba, 0xec,
0xa3, 0xc9, 0xae, 0x3e, 0x15, 0xda, 0x6a, 0xe0, 0x2e, 0xd7, 0x25, 0xb9, 0x0e, 0x5b, 0xdf, 0xc8,
0xe3, 0x8d, 0x31, 0x20, 0xb5, 0x78, 0x95, 0x38, 0x6b, 0xfb, 0x9f, 0xe0, 0x56, 0xec, 0x4e, 0x20,
0x56, 0x66, 0xb3, 0x1c, 0x80, 0xed, 0x1a, 0x79, 0xc8, 0xa7, 0xf0, 0xf1, 0xa8, 0xf0, 0x55, 0xb1,
0x35, 0x68, 0xe4, 0xe3, 0x3b, 0xe3, 0x62, 0xb5, 0x0e, 0x56, 0xab, 0x45, 0xde, 0xe0, 0xfb, 0x9b,
0x16, 0x2a, 0xd2, 0xaf, 0x2b, 0xfc, 0x71, 0x63, 0x7a, 0xd6, 0xdf, 0x01, 0x2b, 0xfb, 0x07, 0xc6,
0xf8, 0xdd, 0x7c, 0x41, 0x8d, 0xb3, 0x05, 0x35, 0x2e, 0x16, 0x14, 0x7d, 0xca, 0x29, 0xfa, 0x9a,
0x53, 0x74, 0x9a, 0x53, 0x34, 0xcf, 0x29, 0xfa, 0x99, 0x53, 0xf4, 0x2b, 0xa7, 0xc6, 0x45, 0x4e,
0xd1, 0xe7, 0x25, 0x35, 0xe6, 0x4b, 0x6a, 0x9c, 0x2d, 0xa9, 0xf1, 0x76, 0xf7, 0x86, 0x29, 0xad,
0xfe, 0x3b, 0x69, 0x95, 0x9f, 0xbd, 0xdf, 0x01, 0x00, 0x00, 0xff, 0xff, 0xcf, 0x05, 0x31, 0x08,
0x35, 0x04, 0x00, 0x00,
}
func (this *FilterChunkRefRequest) Equal(that interface{}) bool {
@@ -416,6 +428,9 @@ func (this *GroupedChunkRefs) Equal(that interface{}) bool {
return false
}
}
if !this.Labels.Equal(that1.Labels) {
return false
}
return true
}
func (this *FilterChunkRefRequest) GoString() string {
@@ -462,13 +477,16 @@ func (this *GroupedChunkRefs) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 7)
s := make([]string, 0, 8)
s = append(s, "&logproto.GroupedChunkRefs{")
s = append(s, "Fingerprint: "+fmt.Sprintf("%#v", this.Fingerprint)+",\n")
s = append(s, "Tenant: "+fmt.Sprintf("%#v", this.Tenant)+",\n")
if this.Refs != nil {
s = append(s, "Refs: "+fmt.Sprintf("%#v", this.Refs)+",\n")
}
if this.Labels != nil {
s = append(s, "Labels: "+fmt.Sprintf("%#v", this.Labels)+",\n")
}
s = append(s, "}")
return strings.Join(s, "")
}
@@ -722,6 +740,18 @@ func (m *GroupedChunkRefs) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if m.Labels != nil {
{
size, err := m.Labels.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintBloomgateway(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x22
}
if len(m.Refs) > 0 {
for iNdEx := len(m.Refs) - 1; iNdEx >= 0; iNdEx-- {
{
@@ -843,6 +873,10 @@ func (m *GroupedChunkRefs) Size() (n int) {
n += 1 + l + sovBloomgateway(uint64(l))
}
}
if m.Labels != nil {
l = m.Labels.Size()
n += 1 + l + sovBloomgateway(uint64(l))
}
return n
}
@@ -911,6 +945,7 @@ func (this *GroupedChunkRefs) String() string {
`Fingerprint:` + fmt.Sprintf("%v", this.Fingerprint) + `,`,
`Tenant:` + fmt.Sprintf("%v", this.Tenant) + `,`,
`Refs:` + repeatedStringForRefs + `,`,
`Labels:` + strings.Replace(fmt.Sprintf("%v", this.Labels), "IndexSeries", "IndexSeries", 1) + `,`,
`}`,
}, "")
return s
@@ -1424,6 +1459,42 @@ func (m *GroupedChunkRefs) Unmarshal(dAtA []byte) error {
return err
}
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowBloomgateway
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthBloomgateway
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthBloomgateway
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Labels == nil {
m.Labels = &IndexSeries{}
}
if err := m.Labels.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipBloomgateway(dAtA[iNdEx:])

View File

@@ -45,6 +45,9 @@ message GroupedChunkRefs {
uint64 fingerprint = 1;
string tenant = 2;
repeated ShortRef refs = 3;
// Labels are only populated on FilterChunkRefRequest. They are not returned on FilterChunkRefResponse
// TODO(salvacorts): Consider two different messages for FilterChunkRefRequest and FilterChunkRefResponse
IndexSeries labels = 4;
}
service BloomGateway {

View File

@@ -391,6 +391,7 @@ func (m *FilterChunkRefRequest) WithStartEndForCache(start, end time.Time) resul
if len(refs) > 0 {
chunkRefs = append(chunkRefs, &GroupedChunkRefs{
Fingerprint: chunkRef.Fingerprint,
Labels: chunkRef.Labels,
Tenant: chunkRef.Tenant,
Refs: refs,
})

View File

@@ -4,28 +4,32 @@ import (
"fmt"
"unsafe"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)
type BloomTest interface {
Matches(bloom filter.Checker) bool
MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool
Matches(series labels.Labels, bloom filter.Checker) bool
MatchesWithPrefixBuf(series labels.Labels, bloom filter.Checker, buf []byte, prefixLen int) bool
}
type BloomTests []BloomTest
func (b BloomTests) Matches(bloom filter.Checker) bool {
func (b BloomTests) Matches(series labels.Labels, bloom filter.Checker) bool {
for _, test := range b {
if !test.Matches(bloom) {
if !test.Matches(series, bloom) {
return false
}
}
return true
}
func (b BloomTests) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool {
func (b BloomTests) MatchesWithPrefixBuf(series labels.Labels, bloom filter.Checker, buf []byte, prefixLen int) bool {
for _, test := range b {
if !test.MatchesWithPrefixBuf(bloom, buf, prefixLen) {
if !test.MatchesWithPrefixBuf(series, bloom, buf, prefixLen) {
return false
}
}
@@ -37,12 +41,12 @@ type matchAllTest struct{}
var MatchAll = matchAllTest{}
// Matches implements BloomTest
func (n matchAllTest) Matches(_ filter.Checker) bool {
func (n matchAllTest) Matches(_ labels.Labels, _ filter.Checker) bool {
return true
}
// MatchesWithPrefixBuf implements BloomTest
func (n matchAllTest) MatchesWithPrefixBuf(_ filter.Checker, _ []byte, _ int) bool {
func (n matchAllTest) MatchesWithPrefixBuf(_ labels.Labels, _ filter.Checker, _ []byte, _ int) bool {
return true
}
@@ -72,13 +76,13 @@ func newOrTest(left, right BloomTest) orTest {
}
// Matches implements BloomTest
func (o orTest) Matches(bloom filter.Checker) bool {
return o.left.Matches(bloom) || o.right.Matches(bloom)
func (o orTest) Matches(series labels.Labels, bloom filter.Checker) bool {
return o.left.Matches(series, bloom) || o.right.Matches(series, bloom)
}
// MatchesWithPrefixBuf implements BloomTest
func (o orTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool {
return o.left.MatchesWithPrefixBuf(bloom, buf, prefixLen) || o.right.MatchesWithPrefixBuf(bloom, buf, prefixLen)
func (o orTest) MatchesWithPrefixBuf(series labels.Labels, bloom filter.Checker, buf []byte, prefixLen int) bool {
return o.left.MatchesWithPrefixBuf(series, bloom, buf, prefixLen) || o.right.MatchesWithPrefixBuf(series, bloom, buf, prefixLen)
}
type andTest struct {
@@ -93,13 +97,13 @@ func newAndTest(left, right BloomTest) andTest {
}
// Matches implements BloomTest
func (a andTest) Matches(bloom filter.Checker) bool {
return a.left.Matches(bloom) && a.right.Matches(bloom)
func (a andTest) Matches(series labels.Labels, bloom filter.Checker) bool {
return a.left.Matches(series, bloom) && a.right.Matches(series, bloom)
}
// MatchesWithPrefixBuf implements BloomTest
func (a andTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool {
return a.left.MatchesWithPrefixBuf(bloom, buf, prefixLen) && a.right.MatchesWithPrefixBuf(bloom, buf, prefixLen)
func (a andTest) MatchesWithPrefixBuf(series labels.Labels, bloom filter.Checker, buf []byte, prefixLen int) bool {
return a.left.MatchesWithPrefixBuf(series, bloom, buf, prefixLen) && a.right.MatchesWithPrefixBuf(series, bloom, buf, prefixLen)
}
func LabelMatchersToBloomTest(matchers ...LabelMatcher) BloomTest {
@@ -144,7 +148,7 @@ func newStringMatcherTest(matcher PlainLabelMatcher) stringMatcherTest {
return stringMatcherTest{matcher: matcher}
}
func (sm stringMatcherTest) Matches(bloom filter.Checker) bool {
func (sm stringMatcherTest) Matches(series labels.Labels, bloom filter.Checker) bool {
// TODO(rfratto): reintroduce the use of a shared tokenizer here to avoid
// desyncing between how tokens are passed during building vs passed during
// querying.
@@ -155,44 +159,38 @@ func (sm stringMatcherTest) Matches(bloom filter.Checker) bool {
// 2. It should be possible to test for just the key
var (
combined = fmt.Sprintf("%s=%s", sm.matcher.Key, sm.matcher.Value)
rawKey = unsafe.Slice(unsafe.StringData(sm.matcher.Key), len(sm.matcher.Key))
combined = fmt.Sprintf("%s=%s", sm.matcher.Key, sm.matcher.Value)
rawCombined = unsafe.Slice(unsafe.StringData(combined), len(combined))
)
if !bloom.Test(rawKey) {
// The structured metadata key wasn't indexed. However, sm.matcher might be
// checking against a label which *does* exist, so we can't safely filter
// out this chunk.
//
// TODO(rfratto): The negative test here is a bit confusing, and the key
// presence test should likely be done higher up within FuseQuerier.
return true
}
return bloom.Test(rawCombined)
return sm.match(series, bloom, rawCombined)
}
func (sm stringMatcherTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool {
func (sm stringMatcherTest) MatchesWithPrefixBuf(series labels.Labels, bloom filter.Checker, buf []byte, prefixLen int) bool {
var (
combined = fmt.Sprintf("%s=%s", sm.matcher.Key, sm.matcher.Value)
prefixedKey = appendToBuf(buf, prefixLen, sm.matcher.Key)
combined = fmt.Sprintf("%s=%s", sm.matcher.Key, sm.matcher.Value)
prefixedCombined = appendToBuf(buf, prefixLen, combined)
)
if !bloom.Test(prefixedKey) {
// The structured metadata key wasn't indexed for a prefix. However,
// sm.matcher might be checking against a label which *does* exist, so we
// can't safely filter out this chunk.
//
// TODO(rfratto): The negative test here is a bit confusing, and the key
// presence test should likely be done higher up within FuseQuerier.
return sm.match(series, bloom, prefixedCombined)
}
// match returns true if the series matches the matcher or is in the bloom filter.
func (sm stringMatcherTest) match(series labels.Labels, bloom filter.Checker, combined []byte) bool {
// If we don't have the series labels, we cannot disambiguate which labels come from the series in which case
// we may filter out chunks for queries like `{env="prod"} | env="prod"` if env=prod is not structured metadata
if len(series) == 0 {
level.Warn(util_log.Logger).Log("msg", "series has no labels, cannot filter out chunks")
return true
}
return bloom.Test(prefixedCombined)
// It's in the series if the key is set and has the same value.
// By checking val != "" we handle `{env="prod"} | user=""`.
val := series.Get(sm.matcher.Key)
inSeries := val != "" && val == sm.matcher.Value
inBloom := bloom.Test(combined)
return inSeries || inBloom
}
// appendToBuf is the equivalent of append(buf[:prefixLen], str). len(buf) must

View File

@@ -3,6 +3,7 @@ package v1
import (
"testing"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/logql/syntax"
@@ -20,9 +21,11 @@ func TestLabelMatchersToBloomTest(t *testing.T) {
tokenizer,
push.LabelAdapter{Name: "trace_id", Value: "exists_1"},
push.LabelAdapter{Name: "trace_id", Value: "exists_2"},
push.LabelAdapter{Name: "app", Value: "other"},
)
)
series := labels.FromStrings("env", "prod", "app", "fake")
tt := []struct {
name string
query string
@@ -54,15 +57,40 @@ func TestLabelMatchersToBloomTest(t *testing.T) {
match: false,
},
{
name: "ignore non-indexed key",
name: "filter non-indexed key",
query: `{app="fake"} | noexist="noexist"`,
match: false,
},
{
name: "filter non-indexed key with empty value",
query: `{app="fake"} | noexist=""`,
match: false,
},
{
name: "ignore label from series",
query: `{app="fake"} | env="prod"`,
match: true,
},
{
name: "ignore non-indexed key with empty value",
query: `{app="fake"} | noexist=""`,
name: "filter label from series",
query: `{app="fake"} | env="dev"`, // env is set to prod in the series
match: false,
},
{
name: "ignore label from series and structured metadata",
query: `{app="fake"} | app="other"`,
match: true,
},
{
name: "filter series label with non-existing value",
query: `{app="fake"} | app="noexist"`,
match: false,
},
{
name: "ignore label from series with empty value",
query: `{app="fake"} | app=""`,
match: false,
},
{
name: "ignore unsupported operator",
query: `{app="fake"} | trace_id=~".*noexist.*"`,
@@ -99,8 +127,8 @@ func TestLabelMatchersToBloomTest(t *testing.T) {
bloomTest := LabelMatchersToBloomTest(matchers...)
// .Matches and .MatchesWithPrefixBuf should both have the same result.
require.Equal(t, tc.match, bloomTest.Matches(bloom))
require.Equal(t, tc.match, bloomTest.MatchesWithPrefixBuf(bloom, []byte(prefix), len(prefix)))
require.Equal(t, tc.match, bloomTest.Matches(series, bloom))
require.Equal(t, tc.match, bloomTest.MatchesWithPrefixBuf(series, bloom, []byte(prefix), len(prefix)))
})
}
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"go.uber.org/atomic"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
@@ -15,6 +16,7 @@ import (
type Request struct {
Fp model.Fingerprint
Labels labels.Labels
Chks ChunkRefs
Search BloomTest
Response chan<- Output
@@ -252,6 +254,7 @@ func (fq *FusedQuerier) Run() error {
nextBatch := fq.inputs.At()
fp := nextBatch[0].Fp
lbs := nextBatch[0].Labels
// advance the series iterator to the next fingerprint
if err := fq.bq.Seek(fp); err != nil {
@@ -276,13 +279,13 @@ func (fq *FusedQuerier) Run() error {
continue
}
fq.runSeries(schema, series, nextBatch)
fq.runSeries(schema, lbs, series, nextBatch)
}
return nil
}
func (fq *FusedQuerier) runSeries(_ Schema, series *SeriesWithMeta, reqs []Request) {
func (fq *FusedQuerier) runSeries(_ Schema, lbs labels.Labels, series *SeriesWithMeta, reqs []Request) {
// For a given chunk|series to be removed, it must fail to match all blooms.
// Because iterating/loading blooms can be expensive, we iterate blooms one at a time, collecting
// the removals (failures) for each (bloom, chunk) pair.
@@ -372,7 +375,7 @@ func (fq *FusedQuerier) runSeries(_ Schema, series *SeriesWithMeta, reqs []Reque
// shortcut: series level removal
// we can skip testing chunk keys individually if the bloom doesn't match
// the query.
if !req.Search.Matches(bloom) {
if !req.Search.Matches(lbs, bloom) {
// Nothing else needs to be done for this (bloom, request);
// check the next input request
continue
@@ -387,7 +390,7 @@ func (fq *FusedQuerier) runSeries(_ Schema, series *SeriesWithMeta, reqs []Reque
// TODO(rfratto): reuse buffer between multiple calls to
// prefixForChunkRef and MatchesWithPrefixBuf to avoid allocations.
tokenBuf := prefixForChunkRef(chk)
if matched := req.Search.MatchesWithPrefixBuf(bloom, tokenBuf, len(tokenBuf)); matched {
if matched := req.Search.MatchesWithPrefixBuf(lbs, bloom, tokenBuf, len(tokenBuf)); matched {
inputs[j].found[k] = true
}
}

View File

@@ -10,6 +10,7 @@ import (
"github.com/go-kit/log"
"github.com/grafana/dskit/concurrency"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/compression"
@@ -27,12 +28,12 @@ var BloomPagePool = mempool.New("test", []mempool.Bucket{
type singleKeyTest []byte
// Matches implements BloomTest.
func (s singleKeyTest) Matches(bloom filter.Checker) bool {
func (s singleKeyTest) Matches(_ labels.Labels, bloom filter.Checker) bool {
return bloom.Test(s)
}
// MatchesWithPrefixBuf implements BloomTest.
func (s singleKeyTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool {
func (s singleKeyTest) MatchesWithPrefixBuf(_ labels.Labels, bloom filter.Checker, buf []byte, prefixLen int) bool {
return bloom.Test(append(buf[:prefixLen], s...))
}