mirror of
https://github.com/grafana/loki.git
synced 2026-03-13 09:33:58 +08:00
refactor(stringlabels): Support stringlabels in logql/log package (#17838)
This is another step towards supporting Prometheus' stringlabels implementation in Loki. It adds support in the `logql/log` package. Part of https://github.com/grafana/loki/issues/17122 The tests should now compile and pass with `-tags stringlabels`. --- Co-authored-by: Karsten Jeschkies <karsten.jeschkies@grafana.com>
This commit is contained in:
@@ -1317,7 +1317,7 @@ func (hb *headBlock) SampleIterator(
|
||||
for _, e := range hb.entries {
|
||||
for _, extractor := range extractors {
|
||||
stats.AddHeadChunkBytes(int64(len(e.s)))
|
||||
samples, ok := extractor.ProcessString(e.t, e.s, e.structuredMetadata...)
|
||||
samples, ok := extractor.ProcessString(e.t, e.s, e.structuredMetadata)
|
||||
if !ok || len(samples) == 0 {
|
||||
continue
|
||||
}
|
||||
@@ -1769,7 +1769,7 @@ func (e *sampleBufferedIterator) Next() bool {
|
||||
for e.bufferedIterator.Next() {
|
||||
e.stats.AddPostFilterLines(1)
|
||||
|
||||
samples, ok := e.extractor.Process(e.currTs, e.currLine, e.currStructuredMetadata...)
|
||||
samples, ok := e.extractor.Process(e.currTs, e.currLine, e.currStructuredMetadata)
|
||||
if !ok || len(samples) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -527,7 +527,7 @@ func TestSerialization(t *testing.T) {
|
||||
require.Equal(t, labels.FromStrings("foo", strconv.Itoa(i)), logproto.FromLabelAdaptersToLabels(e.StructuredMetadata))
|
||||
} else {
|
||||
require.Equal(t, labels.EmptyLabels().String(), it.Labels())
|
||||
require.Nil(t, e.StructuredMetadata)
|
||||
require.Empty(t, logproto.FromLabelAdaptersToLabels(e.StructuredMetadata))
|
||||
}
|
||||
}
|
||||
require.NoError(t, it.Err())
|
||||
|
||||
@@ -161,32 +161,32 @@ func TestSymbolizerLabelNormalization(t *testing.T) {
|
||||
{
|
||||
name: "basic label normalization",
|
||||
labelsToAdd: []labels.Labels{
|
||||
{
|
||||
{Name: "foo-bar", Value: "value1"},
|
||||
{Name: "fizz_buzz", Value: "value2"},
|
||||
},
|
||||
labels.FromStrings(
|
||||
"foo-bar", "value1",
|
||||
"fizz_buzz", "value2",
|
||||
),
|
||||
},
|
||||
expectedLabels: []labels.Labels{
|
||||
{
|
||||
{Name: "foo_bar", Value: "value1"},
|
||||
{Name: "fizz_buzz", Value: "value2"},
|
||||
},
|
||||
labels.FromStrings(
|
||||
"foo_bar", "value1",
|
||||
"fizz_buzz", "value2",
|
||||
),
|
||||
},
|
||||
description: "hyphens should be converted to underscores in label names",
|
||||
},
|
||||
{
|
||||
name: "same string as name and value",
|
||||
labelsToAdd: []labels.Labels{
|
||||
{
|
||||
{Name: "foo-bar", Value: "foo-bar"},
|
||||
{Name: "fizz-buzz", Value: "fizz-buzz"},
|
||||
},
|
||||
labels.FromStrings(
|
||||
"foo-bar", "foo-bar",
|
||||
"fizz-buzz", "fizz-buzz",
|
||||
),
|
||||
},
|
||||
expectedLabels: []labels.Labels{
|
||||
{
|
||||
{Name: "foo_bar", Value: "foo-bar"},
|
||||
{Name: "fizz_buzz", Value: "fizz-buzz"},
|
||||
},
|
||||
labels.FromStrings(
|
||||
"foo_bar", "foo-bar",
|
||||
"fizz_buzz", "fizz-buzz",
|
||||
),
|
||||
},
|
||||
description: "only normalize when string is used as a name, not as a value",
|
||||
},
|
||||
@@ -267,10 +267,10 @@ func TestSymbolizerLabelNormalizationAfterCheckpointing(t *testing.T) {
|
||||
s := newSymbolizer()
|
||||
|
||||
// Add some labels and serialize them
|
||||
originalLabels := labels.Labels{
|
||||
{Name: "foo-bar", Value: "value1"},
|
||||
{Name: "fizz-buzz", Value: "value2"},
|
||||
}
|
||||
originalLabels := labels.FromStrings(
|
||||
"foo-bar", "value1",
|
||||
"fizz-buzz", "value2",
|
||||
)
|
||||
_, err := s.Add(originalLabels)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -282,19 +282,21 @@ func TestSymbolizerLabelNormalizationAfterCheckpointing(t *testing.T) {
|
||||
loaded := symbolizerFromCheckpoint(buf.Bytes())
|
||||
|
||||
// Add new labels with the same names but different values
|
||||
newLabels := labels.Labels{
|
||||
{Name: "foo-bar", Value: "new-value1"},
|
||||
{Name: "fizz-buzz", Value: "new-value2"},
|
||||
}
|
||||
newLabels := labels.FromStrings(
|
||||
"foo-bar", "new-value1",
|
||||
"fizz-buzz", "new-value2",
|
||||
)
|
||||
symbols, err := loaded.Add(newLabels)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check that the normalization is consistent
|
||||
result := loaded.Lookup(symbols, nil)
|
||||
require.Equal(t, "foo_bar", result[0].Name, "first label should be normalized")
|
||||
require.Equal(t, "new-value1", result[0].Value, "first value should be unchanged")
|
||||
require.Equal(t, "fizz_buzz", result[1].Name, "second label should be normalized")
|
||||
require.Equal(t, "new-value2", result[1].Value, "second value should be unchanged")
|
||||
expected := map[string]string{
|
||||
"foo_bar": "new-value1",
|
||||
"fizz_buzz": "new-value2",
|
||||
}
|
||||
|
||||
require.Equal(t, expected, result.Map(), "label names should be normalized")
|
||||
}
|
||||
|
||||
func TestSymbolizerLabelNormalizationSameNameValue(t *testing.T) {
|
||||
|
||||
@@ -333,7 +333,7 @@ func (hb *unorderedHeadBlock) SampleIterator(
|
||||
structuredMetadata = hb.symbolizer.Lookup(structuredMetadataSymbols, labelsBuilder)
|
||||
|
||||
for _, extractor := range extractor {
|
||||
samples, ok := extractor.ProcessString(ts, line, structuredMetadata...)
|
||||
samples, ok := extractor.ProcessString(ts, line, structuredMetadata)
|
||||
if !ok || len(samples) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -26,9 +26,7 @@ func iterEq(t *testing.T, exp []entry, got iter.EntryIterator) {
|
||||
Timestamp: time.Unix(0, exp[i].t),
|
||||
Line: exp[i].s,
|
||||
StructuredMetadata: logproto.FromLabelsToLabelAdapters(exp[i].structuredMetadata),
|
||||
}
|
||||
if exp[i].structuredMetadata.IsEmpty() {
|
||||
expected.StructuredMetadata = nil
|
||||
Parsed: logproto.EmptyLabelAdapters(),
|
||||
}
|
||||
require.Equal(t, expected, got.At())
|
||||
require.Equal(t, exp[i].structuredMetadata.String(), got.Labels())
|
||||
@@ -729,20 +727,24 @@ func TestReorderAcrossBlocks(t *testing.T) {
|
||||
|
||||
exp := []entry{
|
||||
{
|
||||
t: time.Unix(1, 0).UnixNano(),
|
||||
s: "1",
|
||||
t: time.Unix(1, 0).UnixNano(),
|
||||
s: "1",
|
||||
structuredMetadata: labels.EmptyLabels(),
|
||||
},
|
||||
{
|
||||
t: time.Unix(3, 0).UnixNano(),
|
||||
s: "3",
|
||||
t: time.Unix(3, 0).UnixNano(),
|
||||
s: "3",
|
||||
structuredMetadata: labels.EmptyLabels(),
|
||||
},
|
||||
{
|
||||
t: time.Unix(5, 0).UnixNano(),
|
||||
s: "5",
|
||||
t: time.Unix(5, 0).UnixNano(),
|
||||
s: "5",
|
||||
structuredMetadata: labels.EmptyLabels(),
|
||||
},
|
||||
{
|
||||
t: time.Unix(7, 0).UnixNano(),
|
||||
s: "7",
|
||||
t: time.Unix(7, 0).UnixNano(),
|
||||
s: "7",
|
||||
structuredMetadata: labels.EmptyLabels(),
|
||||
},
|
||||
}
|
||||
iterEq(t, exp, itr)
|
||||
|
||||
@@ -58,7 +58,7 @@ func (e *multiExtractorSampleBufferedIterator) Next() bool {
|
||||
e.stats.AddPostFilterLines(1)
|
||||
|
||||
for _, extractor := range e.extractors {
|
||||
samples, ok := extractor.Process(e.currTs, e.currLine, e.currStructuredMetadata...)
|
||||
samples, ok := extractor.Process(e.currTs, e.currLine, e.currStructuredMetadata)
|
||||
if !ok || len(samples) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -631,6 +631,7 @@ func TestChunkRewriter(t *testing.T) {
|
||||
Timestamp: curr.Time(),
|
||||
Line: curr.String(),
|
||||
StructuredMetadata: logproto.FromLabelsToLabelAdapters(expectedStructuredMetadata),
|
||||
Parsed: logproto.EmptyLabelAdapters(),
|
||||
}, newChunkItr.At())
|
||||
require.Equal(t, expectedStructuredMetadata.String(), newChunkItr.Labels())
|
||||
}
|
||||
|
||||
@@ -252,7 +252,7 @@ func newSampleIterator(ctx context.Context,
|
||||
timestamp := record.Timestamp.UnixNano()
|
||||
|
||||
statistics.AddDecompressedLines(1)
|
||||
samples, ok := streamExtractor.Process(timestamp, record.Line, record.Metadata...)
|
||||
samples, ok := streamExtractor.Process(timestamp, record.Line, record.Metadata)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -377,8 +377,21 @@ func TestStore_SelectLogs(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
emptyLabelAdapters := logproto.EmptyLabelAdapters()
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// Make sure all empty label sets use an empty slice, rather than nil, to make assertions below easier.
|
||||
for i := range tt.want {
|
||||
if len(tt.want[i].Entry.Parsed) == 0 {
|
||||
tt.want[i].Entry.Parsed = emptyLabelAdapters
|
||||
}
|
||||
|
||||
if len(tt.want[i].Entry.StructuredMetadata) == 0 {
|
||||
tt.want[i].Entry.StructuredMetadata = emptyLabelAdapters
|
||||
}
|
||||
}
|
||||
|
||||
it, err := store.SelectLogs(ctx, logql.SelectLogParams{
|
||||
QueryRequest: &logproto.QueryRequest{
|
||||
Start: tt.start,
|
||||
|
||||
@@ -424,12 +424,16 @@ var (
|
||||
Labels: labels.FromStrings("stream", "1").String(),
|
||||
Entries: []logproto.Entry{
|
||||
{
|
||||
Timestamp: time.Unix(0, 1),
|
||||
Line: "1",
|
||||
Timestamp: time.Unix(0, 1),
|
||||
Line: "1",
|
||||
StructuredMetadata: logproto.EmptyLabelAdapters(),
|
||||
Parsed: logproto.EmptyLabelAdapters(),
|
||||
},
|
||||
{
|
||||
Timestamp: time.Unix(0, 2),
|
||||
Line: "2",
|
||||
Timestamp: time.Unix(0, 2),
|
||||
Line: "2",
|
||||
StructuredMetadata: logproto.EmptyLabelAdapters(),
|
||||
Parsed: logproto.EmptyLabelAdapters(),
|
||||
},
|
||||
},
|
||||
}
|
||||
@@ -437,12 +441,16 @@ var (
|
||||
Labels: labels.FromStrings("stream", "2").String(),
|
||||
Entries: []logproto.Entry{
|
||||
{
|
||||
Timestamp: time.Unix(0, 1),
|
||||
Line: "3",
|
||||
Timestamp: time.Unix(0, 1),
|
||||
Line: "3",
|
||||
StructuredMetadata: logproto.EmptyLabelAdapters(),
|
||||
Parsed: logproto.EmptyLabelAdapters(),
|
||||
},
|
||||
{
|
||||
Timestamp: time.Unix(0, 2),
|
||||
Line: "4",
|
||||
Timestamp: time.Unix(0, 2),
|
||||
Line: "4",
|
||||
StructuredMetadata: logproto.EmptyLabelAdapters(),
|
||||
Parsed: logproto.EmptyLabelAdapters(),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -568,6 +568,19 @@ func buildTestStreams(offset int) []logproto.Stream {
|
||||
// check that the store is holding data equivalent to what we expect
|
||||
func (s *testStore) checkData(t *testing.T, testData map[string][]logproto.Stream) {
|
||||
for userID, expected := range testData {
|
||||
// Ensure all empty label sets use an empty set of adapters, rather than a nil slice, to make the assertion below easier.
|
||||
for _, stream := range expected {
|
||||
for i := range stream.Entries {
|
||||
if len(stream.Entries[i].Parsed) == 0 {
|
||||
stream.Entries[i].Parsed = logproto.EmptyLabelAdapters()
|
||||
}
|
||||
|
||||
if len(stream.Entries[i].StructuredMetadata) == 0 {
|
||||
stream.Entries[i].StructuredMetadata = logproto.EmptyLabelAdapters()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
streams := s.getStreamsForUser(t, userID)
|
||||
require.Equal(t, expected, streams)
|
||||
}
|
||||
|
||||
@@ -1035,14 +1035,14 @@ func (p *mockStreamExtractor) BaseLabels() log.LabelsResult {
|
||||
return p.wrappedSP.BaseLabels()
|
||||
}
|
||||
|
||||
func (p *mockStreamExtractor) Process(ts int64, line []byte, lbs ...labels.Label) ([]log.ExtractedSample, bool) {
|
||||
func (p *mockStreamExtractor) Process(ts int64, line []byte, lbs labels.Labels) ([]log.ExtractedSample, bool) {
|
||||
p.called++
|
||||
return p.wrappedSP.Process(ts, line, lbs...)
|
||||
return p.wrappedSP.Process(ts, line, lbs)
|
||||
}
|
||||
|
||||
func (p *mockStreamExtractor) ProcessString(ts int64, line string, lbs ...labels.Label) ([]log.ExtractedSample, bool) {
|
||||
func (p *mockStreamExtractor) ProcessString(ts int64, line string, lbs labels.Labels) ([]log.ExtractedSample, bool) {
|
||||
p.called++
|
||||
return p.wrappedSP.ProcessString(ts, line, lbs...)
|
||||
return p.wrappedSP.ProcessString(ts, line, lbs)
|
||||
}
|
||||
|
||||
func Test_QueryWithDelete(t *testing.T) {
|
||||
|
||||
@@ -294,8 +294,10 @@ func TestSeriesRecoveryNoDuplicates(t *testing.T) {
|
||||
Labels: lbls.String(),
|
||||
Entries: []logproto.Entry{
|
||||
{
|
||||
Timestamp: time.Unix(1, 0),
|
||||
Line: "line 1",
|
||||
Timestamp: time.Unix(1, 0),
|
||||
Line: "line 1",
|
||||
StructuredMetadata: logproto.EmptyLabelAdapters(),
|
||||
Parsed: logproto.EmptyLabelAdapters(),
|
||||
},
|
||||
},
|
||||
Hash: lbls.Hash(),
|
||||
|
||||
@@ -315,9 +315,10 @@ func Test_StructuredMetadata(t *testing.T) {
|
||||
Labels: labels.NewBuilder(lbs).Set("foo", "1").Labels().String(),
|
||||
Entries: []logproto.Entry{
|
||||
{
|
||||
Timestamp: time.Unix(0, 1),
|
||||
Line: "foo=1",
|
||||
Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "1")),
|
||||
Timestamp: time.Unix(0, 1),
|
||||
Line: "foo=1",
|
||||
Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "1")),
|
||||
StructuredMetadata: logproto.EmptyLabelAdapters(),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
@@ -61,6 +61,10 @@ func FromLabelsToLabelAdapters(ls labels.Labels) []LabelAdapter {
|
||||
return *(*[]LabelAdapter)(unsafe.Pointer(&ls)) // #nosec G103 -- we know the string is not mutated
|
||||
}
|
||||
|
||||
func EmptyLabelAdapters() []LabelAdapter {
|
||||
return FromLabelsToLabelAdapters(labels.EmptyLabels())
|
||||
}
|
||||
|
||||
// FromLabelAdaptersToMetric converts []LabelAdapter to a model.Metric.
|
||||
// Don't do this on any performance sensitive paths.
|
||||
func FromLabelAdaptersToMetric(ls []LabelAdapter) model.Metric {
|
||||
|
||||
@@ -37,7 +37,7 @@ func (c *consolidatedMultiVariantStreamExtractor) BaseLabels() LabelsResult {
|
||||
return c.commonPipeline.BaseLabels()
|
||||
}
|
||||
|
||||
func (c *consolidatedMultiVariantStreamExtractor) Process(ts int64, line []byte, structuredMetadata ...labels.Label) ([]ExtractedSample, bool) {
|
||||
func (c *consolidatedMultiVariantStreamExtractor) Process(ts int64, line []byte, structuredMetadata labels.Labels) ([]ExtractedSample, bool) {
|
||||
// Process the line through the common pipeline
|
||||
processedLine, commonLabels, ok := c.commonPipeline.Process(ts, line, structuredMetadata)
|
||||
if !ok {
|
||||
@@ -57,7 +57,7 @@ func (c *consolidatedMultiVariantStreamExtractor) Process(ts int64, line []byte,
|
||||
lbls := commonLabels.Labels()
|
||||
for i, variant := range c.variants {
|
||||
streamVariantExtractor := variant.ForStream(lbls)
|
||||
samples, ok := streamVariantExtractor.Process(ts, processedLine, commonStructuredMetadata...)
|
||||
samples, ok := streamVariantExtractor.Process(ts, processedLine, commonStructuredMetadata)
|
||||
if ok {
|
||||
for _, sample := range samples {
|
||||
sample.Labels = appendVariantLabel(sample.Labels, i)
|
||||
@@ -74,14 +74,15 @@ func (c *consolidatedMultiVariantStreamExtractor) Process(ts int64, line []byte,
|
||||
}
|
||||
|
||||
func appendVariantLabel(lbls LabelsResult, variantIndex int) LabelsResult {
|
||||
streamLbls := lbls.Stream()
|
||||
newLblsBuilder := labels.NewScratchBuilder(lbls.Stream().Len() + 1)
|
||||
|
||||
newLbls := make(labels.Labels, 0, len(streamLbls)+1)
|
||||
newLbls = append(newLbls, labels.Label{
|
||||
Name: constants.VariantLabel,
|
||||
Value: strconv.Itoa(variantIndex),
|
||||
lbls.Stream().Range(func(l labels.Label) {
|
||||
newLblsBuilder.Add(l.Name, l.Value)
|
||||
})
|
||||
newLbls = append(newLbls, streamLbls...)
|
||||
|
||||
newLblsBuilder.Add(constants.VariantLabel, strconv.Itoa(variantIndex))
|
||||
newLblsBuilder.Sort()
|
||||
newLbls := newLblsBuilder.Labels()
|
||||
|
||||
builder := NewBaseLabelsBuilder().ForLabels(newLbls, newLbls.Hash())
|
||||
builder.Add(StructuredMetadataLabel, lbls.StructuredMetadata())
|
||||
@@ -89,8 +90,8 @@ func appendVariantLabel(lbls LabelsResult, variantIndex int) LabelsResult {
|
||||
return builder.LabelsResult()
|
||||
}
|
||||
|
||||
func (c *consolidatedMultiVariantStreamExtractor) ProcessString(ts int64, line string, structuredMetadata ...labels.Label) ([]ExtractedSample, bool) {
|
||||
return c.Process(ts, unsafeGetBytes(line), structuredMetadata...)
|
||||
func (c *consolidatedMultiVariantStreamExtractor) ProcessString(ts int64, line string, structuredMetadata labels.Labels) ([]ExtractedSample, bool) {
|
||||
return c.Process(ts, unsafeGetBytes(line), structuredMetadata)
|
||||
}
|
||||
|
||||
func (c *consolidatedMultiVariantStreamExtractor) ReferencedStructuredMetadata() bool {
|
||||
|
||||
@@ -34,12 +34,10 @@ func Test_ConsolidatedMultiVariantExtractor(t *testing.T) {
|
||||
// Attempt to create a consolidated multi-variant extractor
|
||||
// This will fail because the function doesn't exist yet
|
||||
extractor := NewConsolidatedMultiVariantExtractor(commonPipeline, variants)
|
||||
streamExtractor := extractor.ForStream(labels.Labels{
|
||||
{Name: "foo", Value: "bar"},
|
||||
})
|
||||
streamExtractor := extractor.ForStream(labels.FromStrings("foo", "bar"))
|
||||
|
||||
// Process a log line
|
||||
samples, ok := streamExtractor.ProcessString(1000, "test log line")
|
||||
samples, ok := streamExtractor.ProcessString(1000, "test log line", labels.EmptyLabels())
|
||||
require.True(t, ok)
|
||||
require.Len(t, samples, 2)
|
||||
|
||||
@@ -80,8 +78,8 @@ func (m *MockCommonPipeline) Reset() {
|
||||
panic("not implemented") // TODO: Implement
|
||||
}
|
||||
|
||||
func (m *MockCommonPipeline) ForStream(labels labels.Labels) StreamPipeline {
|
||||
lblsResult := NewLabelsResult(labels.String(), labels.Hash(), labels, nil, nil)
|
||||
func (m *MockCommonPipeline) ForStream(lbls labels.Labels) StreamPipeline {
|
||||
lblsResult := NewLabelsResult(lbls.String(), lbls.Hash(), lbls, labels.EmptyLabels(), labels.EmptyLabels())
|
||||
return &MockCommonStreamPipeline{
|
||||
counter: m.counter,
|
||||
shouldPass: m.shouldPass,
|
||||
@@ -122,8 +120,8 @@ type MockVariantSpecificExtractor struct {
|
||||
shouldExtract bool
|
||||
}
|
||||
|
||||
func (m *MockVariantSpecificExtractor) ForStream(labels labels.Labels) StreamSampleExtractor {
|
||||
lblsResult := NewLabelsResult(labels.String(), labels.Hash(), labels, nil, nil)
|
||||
func (m *MockVariantSpecificExtractor) ForStream(lbls labels.Labels) StreamSampleExtractor {
|
||||
lblsResult := NewLabelsResult(lbls.String(), lbls.Hash(), lbls, labels.EmptyLabels(), labels.EmptyLabels())
|
||||
return &mockVariantSpecificStreamExtractor{
|
||||
valueToExtract: m.valueToExtract,
|
||||
shouldExtract: m.shouldExtract,
|
||||
@@ -142,7 +140,7 @@ func (m *mockVariantSpecificStreamExtractor) BaseLabels() LabelsResult {
|
||||
panic("not implemented") // TODO: Implement
|
||||
}
|
||||
|
||||
func (m *mockVariantSpecificStreamExtractor) Process(_ int64, _ []byte, _ ...labels.Label) ([]ExtractedSample, bool) {
|
||||
func (m *mockVariantSpecificStreamExtractor) Process(_ int64, _ []byte, _ labels.Labels) ([]ExtractedSample, bool) {
|
||||
result := []ExtractedSample{
|
||||
{
|
||||
Value: m.valueToExtract,
|
||||
@@ -153,8 +151,8 @@ func (m *mockVariantSpecificStreamExtractor) Process(_ int64, _ []byte, _ ...lab
|
||||
return result, m.shouldExtract
|
||||
}
|
||||
|
||||
func (m *mockVariantSpecificStreamExtractor) ProcessString(ts int64, line string, lbls ...labels.Label) ([]ExtractedSample, bool) {
|
||||
return m.Process(ts, []byte(line), lbls...)
|
||||
func (m *mockVariantSpecificStreamExtractor) ProcessString(ts int64, line string, lbls labels.Labels) ([]ExtractedSample, bool) {
|
||||
return m.Process(ts, []byte(line), lbls)
|
||||
}
|
||||
|
||||
func (m *mockVariantSpecificStreamExtractor) ReferencedStructuredMetadata() bool {
|
||||
|
||||
@@ -711,47 +711,47 @@ func Test_labelsFormatter_Format(t *testing.T) {
|
||||
{
|
||||
"unixToTime days",
|
||||
mustNewLabelsFormatter([]LabelFmt{NewTemplateLabelFmt("foo", `{{ .bar | unixToTime }}`)}),
|
||||
labels.Labels{{Name: "foo", Value: ""}, {Name: "bar", Value: "19503"}},
|
||||
labels.Labels{
|
||||
{Name: "bar", Value: "19503"},
|
||||
{Name: "foo", Value: epochDay19503.String()},
|
||||
},
|
||||
labels.FromStrings("foo", "", "bar", "19503"),
|
||||
labels.FromStrings(
|
||||
"bar", "19503",
|
||||
"foo", epochDay19503.String(),
|
||||
),
|
||||
},
|
||||
{
|
||||
"unixToTime seconds",
|
||||
mustNewLabelsFormatter([]LabelFmt{NewTemplateLabelFmt("foo", `{{ .bar | unixToTime }}`)}),
|
||||
labels.Labels{{Name: "foo", Value: ""}, {Name: "bar", Value: "1679577215"}},
|
||||
labels.Labels{
|
||||
{Name: "bar", Value: "1679577215"},
|
||||
{Name: "foo", Value: epochSeconds1679577215.String()},
|
||||
},
|
||||
labels.FromStrings("foo", "", "bar", "1679577215"),
|
||||
labels.FromStrings(
|
||||
"bar", "1679577215",
|
||||
"foo", epochSeconds1679577215.String(),
|
||||
),
|
||||
},
|
||||
{
|
||||
"unixToTime milliseconds",
|
||||
mustNewLabelsFormatter([]LabelFmt{NewTemplateLabelFmt("foo", `{{ .bar | unixToTime }}`)}),
|
||||
labels.Labels{{Name: "foo", Value: ""}, {Name: "bar", Value: "1257894000000"}},
|
||||
labels.Labels{
|
||||
{Name: "bar", Value: "1257894000000"},
|
||||
{Name: "foo", Value: epochMilliseconds1257894000000.String()},
|
||||
},
|
||||
labels.FromStrings("foo", "", "bar", "1257894000000"),
|
||||
labels.FromStrings(
|
||||
"bar", "1257894000000",
|
||||
"foo", epochMilliseconds1257894000000.String(),
|
||||
),
|
||||
},
|
||||
{
|
||||
"unixToTime microseconds",
|
||||
mustNewLabelsFormatter([]LabelFmt{NewTemplateLabelFmt("foo", `{{ .bar | unixToTime }}`)}),
|
||||
labels.Labels{{Name: "foo", Value: ""}, {Name: "bar", Value: "1673798889902000"}},
|
||||
labels.Labels{
|
||||
{Name: "bar", Value: "1673798889902000"},
|
||||
{Name: "foo", Value: epochMicroseconds1673798889902000.String()},
|
||||
},
|
||||
labels.FromStrings("foo", "", "bar", "1673798889902000"),
|
||||
labels.FromStrings(
|
||||
"bar", "1673798889902000",
|
||||
"foo", epochMicroseconds1673798889902000.String(),
|
||||
),
|
||||
},
|
||||
{
|
||||
"unixToTime nanoseconds",
|
||||
mustNewLabelsFormatter([]LabelFmt{NewTemplateLabelFmt("foo", `{{ .bar | unixToTime }}`)}),
|
||||
labels.Labels{{Name: "foo", Value: ""}, {Name: "bar", Value: "1000000000000000000"}},
|
||||
labels.Labels{
|
||||
{Name: "bar", Value: "1000000000000000000"},
|
||||
{Name: "foo", Value: epochNanoseconds1000000000000000000.String()},
|
||||
},
|
||||
labels.FromStrings("foo", "", "bar", "1000000000000000000"),
|
||||
labels.FromStrings(
|
||||
"bar", "1000000000000000000",
|
||||
"foo", epochNanoseconds1000000000000000000.String(),
|
||||
),
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -385,25 +385,25 @@ func TestStringLabelFilter(t *testing.T) {
|
||||
{
|
||||
name: `logfmt|msg=~"(?i)hello" (with label)`,
|
||||
filter: NewStringLabelFilter(labels.MustNewMatcher(labels.MatchRegexp, "msg", "(?i)hello")),
|
||||
labels: labels.Labels{{Name: "msg", Value: "HELLO"}, {Name: "subqueries", Value: ""}}, // label `msg` contains HELLO
|
||||
labels: labels.FromStrings("msg", "HELLO", "subqueries", ""), // label `msg` contains HELLO
|
||||
shouldMatch: true,
|
||||
},
|
||||
{
|
||||
name: `logfmt|msg=~"(?i)hello" (with label)`,
|
||||
filter: NewStringLabelFilter(labels.MustNewMatcher(labels.MatchRegexp, "msg", "(?i)hello")),
|
||||
labels: labels.Labels{{Name: "msg", Value: "hello"}, {Name: "subqueries", Value: ""}}, // label `msg` contains hello
|
||||
labels: labels.FromStrings("msg", "hello", "subqueries", ""), // label `msg` contains hello
|
||||
shouldMatch: true,
|
||||
},
|
||||
{
|
||||
name: `logfmt|msg=~"(?i)HELLO" (with label)`,
|
||||
filter: NewStringLabelFilter(labels.MustNewMatcher(labels.MatchRegexp, "msg", "(?i)HELLO")),
|
||||
labels: labels.Labels{{Name: "msg", Value: "HELLO"}, {Name: "subqueries", Value: ""}}, // label `msg` contains HELLO
|
||||
labels: labels.FromStrings("msg", "HELLO", "subqueries", ""), // label `msg` contains HELLO
|
||||
shouldMatch: true,
|
||||
},
|
||||
{
|
||||
name: `logfmt|msg=~"(?i)HELLO" (with label)`,
|
||||
filter: NewStringLabelFilter(labels.MustNewMatcher(labels.MatchRegexp, "msg", "(?i)HELLO")),
|
||||
labels: labels.Labels{{Name: "msg", Value: "hello"}, {Name: "subqueries", Value: ""}}, // label `msg` contains hello
|
||||
labels: labels.FromStrings("msg", "hello", "subqueries", ""), // label `msg` contains hello
|
||||
shouldMatch: true,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -2,9 +2,6 @@ package log
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"slices"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
@@ -53,7 +50,21 @@ func (l labelsResult) String() string {
|
||||
}
|
||||
|
||||
func (l labelsResult) Labels() labels.Labels {
|
||||
return flattenLabels(nil, l.stream, l.structuredMetadata, l.parsed)
|
||||
size := l.stream.Len() + l.structuredMetadata.Len() + l.parsed.Len()
|
||||
b := labels.NewScratchBuilder(size)
|
||||
|
||||
l.stream.Range(func(l labels.Label) {
|
||||
b.Add(l.Name, l.Value)
|
||||
})
|
||||
l.structuredMetadata.Range(func(l labels.Label) {
|
||||
b.Add(l.Name, l.Value)
|
||||
})
|
||||
l.parsed.Range(func(l labels.Label) {
|
||||
b.Add(l.Name, l.Value)
|
||||
})
|
||||
|
||||
b.Sort()
|
||||
return b.Labels()
|
||||
}
|
||||
|
||||
func (l labelsResult) Hash() uint64 {
|
||||
@@ -61,44 +72,17 @@ func (l labelsResult) Hash() uint64 {
|
||||
}
|
||||
|
||||
func (l labelsResult) Stream() labels.Labels {
|
||||
if len(l.stream) == 0 {
|
||||
return nil
|
||||
}
|
||||
return l.stream
|
||||
}
|
||||
|
||||
func (l labelsResult) StructuredMetadata() labels.Labels {
|
||||
if len(l.structuredMetadata) == 0 {
|
||||
return nil
|
||||
}
|
||||
return l.structuredMetadata
|
||||
}
|
||||
|
||||
func (l labelsResult) Parsed() labels.Labels {
|
||||
if len(l.parsed) == 0 {
|
||||
return nil
|
||||
}
|
||||
return l.parsed
|
||||
}
|
||||
|
||||
type hasher struct {
|
||||
buf []byte // buffer for computing hash without bytes slice allocation.
|
||||
}
|
||||
|
||||
// newHasher allow to compute hashes for labels by reusing the same buffer.
|
||||
func newHasher() *hasher {
|
||||
return &hasher{
|
||||
buf: make([]byte, 0, 1024),
|
||||
}
|
||||
}
|
||||
|
||||
// Hash hashes the labels
|
||||
func (h *hasher) Hash(lbs labels.Labels) uint64 {
|
||||
var hash uint64
|
||||
hash, h.buf = lbs.HashWithoutLabels(h.buf, []string(nil)...)
|
||||
return hash
|
||||
}
|
||||
|
||||
type LabelCategory int
|
||||
|
||||
const (
|
||||
@@ -129,7 +113,7 @@ func categoriesContain(categories []LabelCategory, category LabelCategory) bool
|
||||
// Only one base builder is used and it contains cache for each LabelsBuilders.
|
||||
type BaseLabelsBuilder struct {
|
||||
del []string
|
||||
add [numValidCategories]labels.Labels
|
||||
add [numValidCategories][]labels.Label
|
||||
// nolint:structcheck
|
||||
// https://github.com/golangci/golangci-lint/issues/826
|
||||
err string
|
||||
@@ -150,7 +134,7 @@ type BaseLabelsBuilder struct {
|
||||
// LabelsBuilder is the same as labels.Builder but tailored for this package.
|
||||
type LabelsBuilder struct {
|
||||
base labels.Labels
|
||||
buf labels.Labels
|
||||
buf []labels.Label
|
||||
currentResult LabelsResult
|
||||
groupedResult LabelsResult
|
||||
|
||||
@@ -166,10 +150,10 @@ func NewBaseLabelsBuilderWithGrouping(groups []string, parserKeyHints ParserHint
|
||||
const labelsCapacity = 16
|
||||
return &BaseLabelsBuilder{
|
||||
del: make([]string, 0, 5),
|
||||
add: [numValidCategories]labels.Labels{
|
||||
StreamLabel: make(labels.Labels, 0, labelsCapacity),
|
||||
StructuredMetadataLabel: make(labels.Labels, 0, labelsCapacity),
|
||||
ParsedLabel: make(labels.Labels, 0, labelsCapacity),
|
||||
add: [numValidCategories][]labels.Label{
|
||||
StreamLabel: make([]labels.Label, 0, labelsCapacity),
|
||||
StructuredMetadataLabel: make([]labels.Label, 0, labelsCapacity),
|
||||
ParsedLabel: make([]labels.Label, 0, labelsCapacity),
|
||||
},
|
||||
resultCache: make(map[uint64]LabelsResult),
|
||||
hasher: newHasher(),
|
||||
@@ -315,11 +299,12 @@ func (b *LabelsBuilder) getWithCategory(key string) (string, LabelCategory, bool
|
||||
}
|
||||
}
|
||||
|
||||
for _, l := range b.base {
|
||||
if l.Name == key {
|
||||
return l.Value, StreamLabel, true
|
||||
}
|
||||
value := b.base.Get(key)
|
||||
|
||||
if value != "" {
|
||||
return value, StreamLabel, true
|
||||
}
|
||||
|
||||
return "", InvalidCategory, false
|
||||
}
|
||||
|
||||
@@ -437,15 +422,7 @@ func (b *LabelsBuilder) GetJSONPath(labelName string) []string {
|
||||
return path
|
||||
}
|
||||
|
||||
// Labels returns the labels from the builder. If no modifications
|
||||
// were made, the original labels are returned.
|
||||
func (b *LabelsBuilder) labels(categories ...LabelCategory) labels.Labels {
|
||||
b.buf = b.UnsortedLabels(b.buf, categories...)
|
||||
sort.Sort(b.buf)
|
||||
return b.buf
|
||||
}
|
||||
|
||||
func (b *LabelsBuilder) appendErrors(buf labels.Labels) labels.Labels {
|
||||
func (b *LabelsBuilder) appendErrors(buf []labels.Label) []labels.Label {
|
||||
if b.err != "" {
|
||||
buf = append(buf, labels.Label{
|
||||
Name: logqlmodel.ErrorLabel,
|
||||
@@ -461,18 +438,22 @@ func (b *LabelsBuilder) appendErrors(buf labels.Labels) labels.Labels {
|
||||
return buf
|
||||
}
|
||||
|
||||
func (b *LabelsBuilder) UnsortedLabels(buf labels.Labels, categories ...LabelCategory) labels.Labels {
|
||||
func (b *LabelsBuilder) UnsortedLabels(buf []labels.Label, categories ...LabelCategory) []labels.Label {
|
||||
if categories == nil {
|
||||
categories = allCategories
|
||||
}
|
||||
|
||||
if !b.hasDel() && !b.hasAdd() && categoriesContain(categories, StreamLabel) {
|
||||
if buf == nil {
|
||||
buf = make(labels.Labels, 0, len(b.base)+1) // +1 for error label.
|
||||
buf = make([]labels.Label, 0, b.base.Len()+1) // +1 for error label.
|
||||
} else {
|
||||
buf = buf[:0]
|
||||
}
|
||||
buf = append(buf, b.base...)
|
||||
|
||||
b.base.Range(func(l labels.Label) {
|
||||
buf = append(buf, l)
|
||||
})
|
||||
|
||||
if categoriesContain(categories, ParsedLabel) {
|
||||
buf = b.appendErrors(buf)
|
||||
}
|
||||
@@ -483,39 +464,38 @@ func (b *LabelsBuilder) UnsortedLabels(buf labels.Labels, categories ...LabelCat
|
||||
// In the general case, labels are removed, modified or moved
|
||||
// rather than added.
|
||||
if buf == nil {
|
||||
size := len(b.base) + b.sizeAdd() + 1
|
||||
buf = make(labels.Labels, 0, size)
|
||||
size := b.base.Len() + b.sizeAdd() + 1
|
||||
buf = make([]labels.Label, 0, size)
|
||||
} else {
|
||||
buf = buf[:0]
|
||||
}
|
||||
|
||||
if categoriesContain(categories, StreamLabel) {
|
||||
Outer:
|
||||
for _, l := range b.base {
|
||||
b.base.Range(func(l labels.Label) {
|
||||
// Skip stream labels to be deleted
|
||||
for _, n := range b.del {
|
||||
if l.Name == n {
|
||||
continue Outer
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Skip stream labels which value will be replaced by structured metadata
|
||||
if labelsContain(b.add[StructuredMetadataLabel], l.Name) {
|
||||
continue
|
||||
return
|
||||
}
|
||||
|
||||
// Skip stream labels which value will be replaced by parsed labels
|
||||
if labelsContain(b.add[ParsedLabel], l.Name) {
|
||||
continue
|
||||
return
|
||||
}
|
||||
|
||||
// Take value from stream label if present
|
||||
if labelsContain(b.add[StreamLabel], l.Name) {
|
||||
buf = append(buf, labels.Label{Name: l.Name, Value: b.add[StreamLabel].Get(l.Name)})
|
||||
if value, found := findLabelValue(b.add[StreamLabel], l.Name); found {
|
||||
buf = append(buf, labels.Label{Name: l.Name, Value: value})
|
||||
} else {
|
||||
buf = append(buf, l)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
if categoriesContain(categories, StructuredMetadataLabel) {
|
||||
@@ -611,9 +591,8 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult {
|
||||
|
||||
// Get all labels at once and sort them
|
||||
b.buf = b.UnsortedLabels(b.buf)
|
||||
// sort.Sort(b.buf)
|
||||
slices.SortFunc(b.buf, func(a, b labels.Label) int { return strings.Compare(a.Name, b.Name) })
|
||||
hash := b.hasher.Hash(b.buf)
|
||||
lbls := labels.New(b.buf...)
|
||||
hash := b.hasher.Hash(lbls)
|
||||
|
||||
if cached, ok := b.resultCache[hash]; ok {
|
||||
return cached
|
||||
@@ -639,32 +618,13 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult {
|
||||
}
|
||||
}
|
||||
|
||||
result := NewLabelsResult(b.buf.String(), hash, labels.New(stream...), labels.New(meta...), labels.New(parsed...))
|
||||
result := NewLabelsResult(lbls.String(), hash, labels.New(stream...), labels.New(meta...), labels.New(parsed...))
|
||||
b.resultCache[hash] = result
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func flattenLabels(buf labels.Labels, many ...labels.Labels) labels.Labels {
|
||||
var size int
|
||||
for _, lbls := range many {
|
||||
size += len(lbls)
|
||||
}
|
||||
|
||||
if buf == nil || cap(buf) < size {
|
||||
buf = make(labels.Labels, 0, size)
|
||||
} else {
|
||||
buf = buf[:0]
|
||||
}
|
||||
|
||||
for _, lbls := range many {
|
||||
buf = append(buf, lbls...)
|
||||
}
|
||||
sort.Sort(buf)
|
||||
return buf
|
||||
}
|
||||
|
||||
func labelsContain(labels labels.Labels, name string) bool {
|
||||
func labelsContain(labels []labels.Label, name string) bool {
|
||||
for _, l := range labels {
|
||||
if l.Name == name {
|
||||
return true
|
||||
@@ -673,13 +633,23 @@ func labelsContain(labels labels.Labels, name string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (b *BaseLabelsBuilder) toUncategorizedResult(buf labels.Labels) LabelsResult {
|
||||
hash := b.hasher.Hash(buf)
|
||||
func findLabelValue(labels []labels.Label, name string) (string, bool) {
|
||||
for _, l := range labels {
|
||||
if l.Name == name {
|
||||
return l.Value, true
|
||||
}
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
|
||||
func (b *BaseLabelsBuilder) toUncategorizedResult(buf []labels.Label) LabelsResult {
|
||||
lbls := labels.New(buf...)
|
||||
hash := b.hasher.Hash(lbls)
|
||||
if cached, ok := b.resultCache[hash]; ok {
|
||||
return cached
|
||||
}
|
||||
|
||||
res := NewLabelsResult(buf.String(), hash, buf.Copy(), nil, nil)
|
||||
res := NewLabelsResult(lbls.String(), hash, lbls, labels.EmptyLabels(), labels.EmptyLabels())
|
||||
b.resultCache[hash] = res
|
||||
return res
|
||||
}
|
||||
@@ -714,7 +684,7 @@ func (b *LabelsBuilder) GroupedLabels() LabelsResult {
|
||||
|
||||
func (b *LabelsBuilder) withResult() LabelsResult {
|
||||
if b.buf == nil {
|
||||
b.buf = make(labels.Labels, 0, len(b.groups))
|
||||
b.buf = make([]labels.Label, 0, len(b.groups))
|
||||
} else {
|
||||
b.buf = b.buf[:0]
|
||||
}
|
||||
@@ -736,11 +706,10 @@ Outer:
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, l := range b.base {
|
||||
if g == l.Name {
|
||||
b.buf = append(b.buf, l)
|
||||
continue Outer
|
||||
}
|
||||
|
||||
value := b.base.Get(g)
|
||||
if value != "" {
|
||||
b.buf = append(b.buf, labels.Label{Name: g, Value: value})
|
||||
}
|
||||
}
|
||||
return b.toUncategorizedResult(b.buf)
|
||||
@@ -748,35 +717,35 @@ Outer:
|
||||
|
||||
func (b *LabelsBuilder) withoutResult() LabelsResult {
|
||||
if b.buf == nil {
|
||||
size := len(b.base) + b.sizeAdd() - len(b.del) - len(b.groups)
|
||||
size := b.base.Len() + b.sizeAdd() - len(b.del) - len(b.groups)
|
||||
if size < 0 {
|
||||
size = 0
|
||||
}
|
||||
b.buf = make(labels.Labels, 0, size)
|
||||
b.buf = make([]labels.Label, 0, size)
|
||||
} else {
|
||||
b.buf = b.buf[:0]
|
||||
}
|
||||
Outer:
|
||||
for _, l := range b.base {
|
||||
|
||||
b.base.Range(func(l labels.Label) {
|
||||
for _, n := range b.del {
|
||||
if l.Name == n {
|
||||
continue Outer
|
||||
return
|
||||
}
|
||||
}
|
||||
for _, lbls := range b.add {
|
||||
for _, la := range lbls {
|
||||
if l.Name == la.Name {
|
||||
continue Outer
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, lg := range b.groups {
|
||||
if l.Name == lg {
|
||||
continue Outer
|
||||
return
|
||||
}
|
||||
}
|
||||
b.buf = append(b.buf, l)
|
||||
}
|
||||
})
|
||||
|
||||
for category, lbls := range b.add {
|
||||
OuterAdd:
|
||||
@@ -792,7 +761,7 @@ Outer:
|
||||
b.buf = append(b.buf, la)
|
||||
}
|
||||
}
|
||||
sort.Sort(b.buf)
|
||||
|
||||
return b.toUncategorizedResult(b.buf)
|
||||
}
|
||||
|
||||
@@ -806,7 +775,7 @@ func (b *LabelsBuilder) toBaseGroup() LabelsResult {
|
||||
} else {
|
||||
lbs = labels.NewBuilder(b.base).Keep(b.groups...).Labels()
|
||||
}
|
||||
res := NewLabelsResult(lbs.String(), lbs.Hash(), lbs, nil, nil)
|
||||
res := NewLabelsResult(lbs.String(), lbs.Hash(), lbs, labels.EmptyLabels(), labels.EmptyLabels())
|
||||
b.groupedResult = res
|
||||
return res
|
||||
}
|
||||
@@ -831,26 +800,3 @@ func (i internedStringSet) Get(data []byte, createNew func() (string, bool)) (st
|
||||
}{s: newStr, ok: ok}
|
||||
return newStr, ok
|
||||
}
|
||||
|
||||
// BufferedLabelsBuilder is a simple builder that uses a label buffer passed in.
|
||||
// It is used to avoid allocations when building labels.
|
||||
type BufferedLabelsBuilder struct {
|
||||
buf labels.Labels
|
||||
}
|
||||
|
||||
func NewBufferedLabelsBuilder(labels labels.Labels) *BufferedLabelsBuilder {
|
||||
return &BufferedLabelsBuilder{buf: labels[:0]}
|
||||
}
|
||||
|
||||
func (b *BufferedLabelsBuilder) Reset() {
|
||||
b.buf = b.buf[:0]
|
||||
}
|
||||
|
||||
func (b *BufferedLabelsBuilder) Add(label labels.Label) {
|
||||
b.buf = append(b.buf, label)
|
||||
}
|
||||
|
||||
func (b *BufferedLabelsBuilder) Labels() labels.Labels {
|
||||
//slices.SortFunc(b.buf, func(a, b labels.Label) int { return strings.Compare(a.Name, b.Name) })
|
||||
return b.buf
|
||||
}
|
||||
|
||||
47
pkg/logql/log/labels_slicelabels.go
Normal file
47
pkg/logql/log/labels_slicelabels.go
Normal file
@@ -0,0 +1,47 @@
|
||||
//go:build !stringlabels && !dedupelabels
|
||||
|
||||
package log
|
||||
|
||||
import "github.com/prometheus/prometheus/model/labels"
|
||||
|
||||
type hasher struct {
|
||||
buf []byte // buffer for computing hash without bytes slice allocation.
|
||||
}
|
||||
|
||||
// newHasher returns a hasher that computes hashes for labels by reusing the same buffer.
|
||||
func newHasher() *hasher {
|
||||
return &hasher{
|
||||
buf: make([]byte, 0, 1024),
|
||||
}
|
||||
}
|
||||
|
||||
// Hash computes a hash of lbs.
|
||||
// It is not guaranteed to be stable across different Loki processes or versions.
|
||||
func (h *hasher) Hash(lbs labels.Labels) uint64 {
|
||||
var hash uint64
|
||||
hash, h.buf = lbs.HashWithoutLabels(h.buf, []string(nil)...)
|
||||
return hash
|
||||
}
|
||||
|
||||
// BufferedLabelsBuilder is a simple builder that uses a label buffer passed in.
|
||||
// It is used to avoid allocations when building labels.
|
||||
type BufferedLabelsBuilder struct {
|
||||
buf labels.Labels
|
||||
}
|
||||
|
||||
func NewBufferedLabelsBuilder(labels labels.Labels) *BufferedLabelsBuilder {
|
||||
return &BufferedLabelsBuilder{buf: labels[:0]}
|
||||
}
|
||||
|
||||
func (b *BufferedLabelsBuilder) Reset() {
|
||||
b.buf = b.buf[:0]
|
||||
}
|
||||
|
||||
func (b *BufferedLabelsBuilder) Add(label labels.Label) {
|
||||
b.buf = append(b.buf, label)
|
||||
}
|
||||
|
||||
func (b *BufferedLabelsBuilder) Labels() labels.Labels {
|
||||
//slices.SortFunc(b.buf, func(a, b labels.Label) int { return strings.Compare(a.Name, b.Name) })
|
||||
return b.buf
|
||||
}
|
||||
43
pkg/logql/log/labels_stringlabels.go
Normal file
43
pkg/logql/log/labels_stringlabels.go
Normal file
@@ -0,0 +1,43 @@
|
||||
//go:build stringlabels
|
||||
|
||||
package log
|
||||
|
||||
import "github.com/prometheus/prometheus/model/labels"
|
||||
|
||||
type hasher struct{}
|
||||
|
||||
// newHasher returns a hasher that computes hashes for labels.
|
||||
func newHasher() *hasher {
|
||||
return &hasher{}
|
||||
}
|
||||
|
||||
// Hash computes a hash of lbs.
|
||||
// It is not guaranteed to be stable across different Loki processes or versions.
|
||||
func (h *hasher) Hash(lbs labels.Labels) uint64 {
|
||||
// We use Hash() here because there's no performance advantage to using HashWithoutLabels() with stringlabels.
|
||||
// The results from Hash(l) and HashWithoutLabels(l, []string{}) are different with stringlabels, so using Hash
|
||||
// here also simplifies our tests.
|
||||
return lbs.Hash()
|
||||
}
|
||||
|
||||
// BufferedLabelsBuilder is a simple builder that uses a label buffer passed in.
|
||||
// It is used to avoid allocations when building labels.
|
||||
type BufferedLabelsBuilder struct {
|
||||
builder *labels.Builder
|
||||
}
|
||||
|
||||
func NewBufferedLabelsBuilder(l labels.Labels) *BufferedLabelsBuilder {
|
||||
return &BufferedLabelsBuilder{builder: labels.NewBuilder(l)}
|
||||
}
|
||||
|
||||
func (b *BufferedLabelsBuilder) Reset() {
|
||||
b.builder.Reset(labels.EmptyLabels())
|
||||
}
|
||||
|
||||
func (b *BufferedLabelsBuilder) Add(label labels.Label) {
|
||||
b.builder.Set(label.Name, label.Value)
|
||||
}
|
||||
|
||||
func (b *BufferedLabelsBuilder) Labels() labels.Labels {
|
||||
return b.builder.Labels()
|
||||
}
|
||||
@@ -2,7 +2,8 @@ package log
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"slices"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -64,7 +65,7 @@ func TestLabelsBuilder_LabelsError(t *testing.T) {
|
||||
require.Equal(t, expectedLbs.String(), lbsWithErr.String())
|
||||
require.Equal(t, expectedLbs.Hash(), lbsWithErr.Hash())
|
||||
require.Equal(t, labels.FromStrings("already", "in"), lbsWithErr.Stream())
|
||||
require.Nil(t, lbsWithErr.StructuredMetadata())
|
||||
require.Equal(t, labels.EmptyLabels(), lbsWithErr.StructuredMetadata())
|
||||
require.Equal(t, labels.FromStrings(logqlmodel.ErrorLabel, "err"), lbsWithErr.Parsed())
|
||||
|
||||
// make sure the original labels is unchanged.
|
||||
@@ -89,7 +90,7 @@ func TestLabelsBuilder_LabelsErrorFromAdd(t *testing.T) {
|
||||
require.Equal(t, expectedLbs.String(), lbsWithErr.String())
|
||||
require.Equal(t, expectedLbs.Hash(), lbsWithErr.Hash())
|
||||
require.Equal(t, labels.FromStrings("already", "in"), lbsWithErr.Stream())
|
||||
require.Nil(t, lbsWithErr.StructuredMetadata())
|
||||
require.Equal(t, labels.EmptyLabels(), lbsWithErr.StructuredMetadata())
|
||||
require.Equal(t, labels.FromStrings(logqlmodel.ErrorLabel, "test error", logqlmodel.ErrorDetailsLabel, "test details"), lbsWithErr.Parsed())
|
||||
|
||||
// make sure the original labels is unchanged.
|
||||
@@ -187,11 +188,8 @@ func TestLabelsBuilder_LabelsResult(t *testing.T) {
|
||||
"buzz", "fuzz",
|
||||
"ToReplace", "other",
|
||||
)
|
||||
expected := make(labels.Labels, 0, len(expectedStreamLbls)+len(expectedStucturedMetadataLbls)+len(expectedParsedLbls))
|
||||
expected = append(expected, expectedStreamLbls...)
|
||||
expected = append(expected, expectedStucturedMetadataLbls...)
|
||||
expected = append(expected, expectedParsedLbls...)
|
||||
expected = labels.New(expected...)
|
||||
|
||||
expected := mergeLabels(expectedStreamLbls, expectedStucturedMetadataLbls, expectedParsedLbls)
|
||||
|
||||
assertLabelResult(t, expected, b.LabelsResult())
|
||||
// cached.
|
||||
@@ -220,11 +218,7 @@ func TestLabelsBuilder_LabelsResult(t *testing.T) {
|
||||
"ToReplace", "other",
|
||||
)
|
||||
|
||||
expected = make(labels.Labels, 0, len(expectedStreamLbls)+len(expectedStucturedMetadataLbls)+len(expectedParsedLbls))
|
||||
expected = append(expected, expectedStreamLbls...)
|
||||
expected = append(expected, expectedStucturedMetadataLbls...)
|
||||
expected = append(expected, expectedParsedLbls...)
|
||||
expected = labels.New(expected...)
|
||||
expected = mergeLabels(expectedStreamLbls, expectedStucturedMetadataLbls, expectedParsedLbls)
|
||||
assertLabelResult(t, expected, b.LabelsResult())
|
||||
// cached.
|
||||
assertLabelResult(t, expected, b.LabelsResult())
|
||||
@@ -250,11 +244,7 @@ func TestLabelsBuilder_Set(t *testing.T) {
|
||||
expectedStucturedMetadataLbls := labels.FromStrings("stzz", "stvzz")
|
||||
expectedParsedLbls := labels.FromStrings("toreplace", "buzz")
|
||||
|
||||
expected := make(labels.Labels, 0, len(expectedStreamLbls)+len(expectedStucturedMetadataLbls)+len(expectedParsedLbls))
|
||||
expected = append(expected, expectedStreamLbls...)
|
||||
expected = append(expected, expectedStucturedMetadataLbls...)
|
||||
expected = append(expected, expectedParsedLbls...)
|
||||
expected = labels.New(expected...)
|
||||
expected := mergeLabels(expectedStreamLbls, expectedStucturedMetadataLbls, expectedParsedLbls)
|
||||
|
||||
actual := b.LabelsResult()
|
||||
assertLabelResult(t, expected, actual)
|
||||
@@ -272,11 +262,7 @@ func TestLabelsBuilder_Set(t *testing.T) {
|
||||
expectedStucturedMetadataLbls = labels.FromStrings("stzz", "stvzz")
|
||||
expectedParsedLbls = labels.FromStrings("toreplace", "buzz")
|
||||
|
||||
expected = make(labels.Labels, 0, len(expectedStreamLbls)+len(expectedStucturedMetadataLbls)+len(expectedParsedLbls))
|
||||
expected = append(expected, expectedStreamLbls...)
|
||||
expected = append(expected, expectedStucturedMetadataLbls...)
|
||||
expected = append(expected, expectedParsedLbls...)
|
||||
expected = labels.New(expected...)
|
||||
expected = mergeLabels(expectedStreamLbls, expectedStucturedMetadataLbls, expectedParsedLbls)
|
||||
|
||||
actual = b.LabelsResult()
|
||||
assertLabelResult(t, expected, actual)
|
||||
@@ -293,11 +279,7 @@ func TestLabelsBuilder_Set(t *testing.T) {
|
||||
expectedStucturedMetadataLbls = labels.FromStrings("toreplace", "muzz")
|
||||
expectedParsedLbls = labels.FromStrings("stzz", "stvzz")
|
||||
|
||||
expected = make(labels.Labels, 0, len(expectedStreamLbls)+len(expectedStucturedMetadataLbls)+len(expectedParsedLbls))
|
||||
expected = append(expected, expectedStreamLbls...)
|
||||
expected = append(expected, expectedStucturedMetadataLbls...)
|
||||
expected = append(expected, expectedParsedLbls...)
|
||||
expected = labels.New(expected...)
|
||||
expected = mergeLabels(expectedStreamLbls, expectedStucturedMetadataLbls, expectedParsedLbls)
|
||||
|
||||
actual = b.LabelsResult()
|
||||
assertLabelResult(t, expected, actual)
|
||||
@@ -315,11 +297,7 @@ func TestLabelsBuilder_Set(t *testing.T) {
|
||||
expectedStucturedMetadataLbls = labels.FromStrings("stzz", "stvzzz")
|
||||
expectedParsedLbls = labels.FromStrings("toreplace", "puzz")
|
||||
|
||||
expected = make(labels.Labels, 0, len(expectedStreamLbls)+len(expectedStucturedMetadataLbls)+len(expectedParsedLbls))
|
||||
expected = append(expected, expectedStreamLbls...)
|
||||
expected = append(expected, expectedStucturedMetadataLbls...)
|
||||
expected = append(expected, expectedParsedLbls...)
|
||||
expected = labels.New(expected...)
|
||||
expected = mergeLabels(expectedStreamLbls, expectedStucturedMetadataLbls, expectedParsedLbls)
|
||||
|
||||
actual = b.LabelsResult()
|
||||
assertLabelResult(t, expected, actual)
|
||||
@@ -336,11 +314,7 @@ func TestLabelsBuilder_Set(t *testing.T) {
|
||||
expectedStucturedMetadataLbls = labels.FromStrings("toreplace", "muzz")
|
||||
expectedParsedLbls = labels.FromStrings("stzz", "stvzzz")
|
||||
|
||||
expected = make(labels.Labels, 0, len(expectedStreamLbls)+len(expectedStucturedMetadataLbls)+len(expectedParsedLbls))
|
||||
expected = append(expected, expectedStreamLbls...)
|
||||
expected = append(expected, expectedStucturedMetadataLbls...)
|
||||
expected = append(expected, expectedParsedLbls...)
|
||||
expected = labels.New(expected...)
|
||||
expected = mergeLabels(expectedStreamLbls, expectedStucturedMetadataLbls, expectedParsedLbls)
|
||||
|
||||
actual = b.LabelsResult()
|
||||
assertLabelResult(t, expected, actual)
|
||||
@@ -357,31 +331,37 @@ func TestLabelsBuilder_UnsortedLabels(t *testing.T) {
|
||||
}
|
||||
lbs := labels.FromStrings(strs...)
|
||||
b := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
|
||||
b.add[StructuredMetadataLabel] = labels.FromStrings("toreplace", "buzz", "fzz", "bzz")
|
||||
b.add[ParsedLabel] = labels.FromStrings("pzz", "pvzz")
|
||||
expected := labels.FromStrings("cluster", "us-central1", "namespace", "loki", "fzz", "bzz", "toreplace", "buzz", "pzz", "pvzz")
|
||||
b.add[StructuredMetadataLabel] = []labels.Label{{"toreplace", "buzz"}, {"fzz", "bzz"}}
|
||||
b.add[ParsedLabel] = []labels.Label{{"pzz", "pvzz"}}
|
||||
expected := []labels.Label{{"cluster", "us-central1"}, {"namespace", "loki"}, {"fzz", "bzz"}, {"toreplace", "buzz"}, {"pzz", "pvzz"}}
|
||||
actual := b.UnsortedLabels(nil)
|
||||
require.ElementsMatch(t, expected, actual)
|
||||
|
||||
b.Reset()
|
||||
b.add[StructuredMetadataLabel] = labels.FromStrings("fzz", "bzz")
|
||||
b.add[ParsedLabel] = labels.FromStrings("toreplace", "buzz", "pzz", "pvzz")
|
||||
expected = labels.FromStrings("cluster", "us-central1", "namespace", "loki", "fzz", "bzz", "toreplace", "buzz", "pzz", "pvzz")
|
||||
b.add[StructuredMetadataLabel] = []labels.Label{{"fzz", "bzz"}}
|
||||
b.add[ParsedLabel] = []labels.Label{{"toreplace", "buzz"}, {"pzz", "pvzz"}}
|
||||
expected = []labels.Label{{"cluster", "us-central1"}, {"namespace", "loki"}, {"fzz", "bzz"}, {"toreplace", "buzz"}, {"pzz", "pvzz"}}
|
||||
actual = b.UnsortedLabels(nil)
|
||||
sort.Sort(expected)
|
||||
sort.Sort(actual)
|
||||
sortLabelSlice(expected)
|
||||
sortLabelSlice(actual)
|
||||
assert.Equal(t, expected, actual)
|
||||
|
||||
b.Reset()
|
||||
b.add[StructuredMetadataLabel] = labels.FromStrings("fzz", "bzz", "toreplacezz", "test")
|
||||
b.add[ParsedLabel] = labels.FromStrings("toreplacezz", "buzz", "pzz", "pvzz")
|
||||
expected = labels.FromStrings("cluster", "us-central1", "namespace", "loki", "fzz", "bzz", "toreplace", "fuzz", "pzz", "pvzz", "toreplacezz", "buzz")
|
||||
b.add[StructuredMetadataLabel] = []labels.Label{{"fzz", "bzz"}, {"toreplacezz", "test"}}
|
||||
b.add[ParsedLabel] = []labels.Label{{"toreplacezz", "buzz"}, {"pzz", "pvzz"}}
|
||||
expected = []labels.Label{{"cluster", "us-central1"}, {"namespace", "loki"}, {"fzz", "bzz"}, {"toreplace", "fuzz"}, {"pzz", "pvzz"}, {"toreplacezz", "buzz"}}
|
||||
actual = b.UnsortedLabels(nil)
|
||||
sort.Sort(expected)
|
||||
sort.Sort(actual)
|
||||
sortLabelSlice(expected)
|
||||
sortLabelSlice(actual)
|
||||
assert.Equal(t, expected, actual)
|
||||
}
|
||||
|
||||
func sortLabelSlice(l []labels.Label) {
|
||||
slices.SortFunc(l, func(a, b labels.Label) int {
|
||||
return strings.Compare(a.Name, b.Name)
|
||||
})
|
||||
}
|
||||
|
||||
func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) {
|
||||
strs := []string{"namespace", "loki",
|
||||
"job", "us-central1/loki",
|
||||
@@ -466,6 +446,20 @@ func assertLabelResult(t *testing.T, lbs labels.Labels, res LabelsResult) {
|
||||
)
|
||||
}
|
||||
|
||||
func mergeLabels(streamLabels, structuredMetadataLabels, parsedLabels labels.Labels) labels.Labels {
|
||||
builder := labels.NewBuilder(streamLabels)
|
||||
|
||||
structuredMetadataLabels.Range(func(l labels.Label) {
|
||||
builder.Set(l.Name, l.Value)
|
||||
})
|
||||
|
||||
parsedLabels.Range(func(l labels.Label) {
|
||||
builder.Set(l.Name, l.Value)
|
||||
})
|
||||
|
||||
return builder.Labels()
|
||||
}
|
||||
|
||||
// benchmark streamLineSampleExtractor.Process method
|
||||
func BenchmarkStreamLineSampleExtractor_Process(b *testing.B) {
|
||||
// Setup some test data
|
||||
@@ -477,12 +471,12 @@ func BenchmarkStreamLineSampleExtractor_Process(b *testing.B) {
|
||||
"stream", "stdout",
|
||||
)
|
||||
|
||||
structuredMeta := []labels.Label{
|
||||
{Name: "level", Value: "info"},
|
||||
{Name: "caller", Value: "http.go:42"},
|
||||
{Name: "user", Value: "john"},
|
||||
{Name: "trace_id", Value: "abc123"},
|
||||
}
|
||||
structuredMeta := labels.FromStrings(
|
||||
"level", "info",
|
||||
"caller", "http.go:42",
|
||||
"user", "john",
|
||||
"trace_id", "abc123",
|
||||
)
|
||||
|
||||
testLine := []byte(`{"timestamp":"2024-01-01T00:00:00Z","level":"info","message":"test message","duration_ms":150}`)
|
||||
|
||||
@@ -500,7 +494,7 @@ func BenchmarkStreamLineSampleExtractor_Process(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, _ = streamEx.Process(time.Now().UnixNano(), testLine, structuredMeta...)
|
||||
_, _ = streamEx.Process(time.Now().UnixNano(), testLine, structuredMeta)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -35,8 +35,8 @@ type SampleExtractor interface {
|
||||
// A StreamSampleExtractor never mutates the received line.
|
||||
type StreamSampleExtractor interface {
|
||||
BaseLabels() LabelsResult
|
||||
Process(ts int64, line []byte, structuredMetadata ...labels.Label) ([]ExtractedSample, bool)
|
||||
ProcessString(ts int64, line string, structuredMetadata ...labels.Label) ([]ExtractedSample, bool)
|
||||
Process(ts int64, line []byte, structuredMetadata labels.Labels) ([]ExtractedSample, bool)
|
||||
ProcessString(ts int64, line string, structuredMetadata labels.Labels) ([]ExtractedSample, bool)
|
||||
ReferencedStructuredMetadata() bool
|
||||
}
|
||||
|
||||
@@ -99,7 +99,7 @@ func (l *streamLineSampleExtractor) ReferencedStructuredMetadata() bool {
|
||||
return l.builder.referencedStructuredMetadata
|
||||
}
|
||||
|
||||
func (l *streamLineSampleExtractor) Process(ts int64, line []byte, structuredMetadata ...labels.Label) ([]ExtractedSample, bool) {
|
||||
func (l *streamLineSampleExtractor) Process(ts int64, line []byte, structuredMetadata labels.Labels) ([]ExtractedSample, bool) {
|
||||
l.builder.Reset()
|
||||
l.builder.Add(StructuredMetadataLabel, structuredMetadata)
|
||||
|
||||
@@ -120,9 +120,9 @@ func (l *streamLineSampleExtractor) Process(ts int64, line []byte, structuredMet
|
||||
return []ExtractedSample{{Value: value, Labels: labels}}, true
|
||||
}
|
||||
|
||||
func (l *streamLineSampleExtractor) ProcessString(ts int64, line string, structuredMetadata ...labels.Label) ([]ExtractedSample, bool) {
|
||||
func (l *streamLineSampleExtractor) ProcessString(ts int64, line string, structuredMetadata labels.Labels) ([]ExtractedSample, bool) {
|
||||
// unsafe get bytes since we have the guarantee that the line won't be mutated.
|
||||
return l.Process(ts, unsafeGetBytes(line), structuredMetadata...)
|
||||
return l.Process(ts, unsafeGetBytes(line), structuredMetadata)
|
||||
}
|
||||
|
||||
func (l *streamLineSampleExtractor) BaseLabels() LabelsResult { return l.builder.currentResult }
|
||||
@@ -199,7 +199,7 @@ func (l *labelSampleExtractor) ForStream(labels labels.Labels) StreamSampleExtra
|
||||
return res
|
||||
}
|
||||
|
||||
func (l *streamLabelSampleExtractor) Process(ts int64, line []byte, structuredMetadata ...labels.Label) ([]ExtractedSample, bool) {
|
||||
func (l *streamLabelSampleExtractor) Process(ts int64, line []byte, structuredMetadata labels.Labels) ([]ExtractedSample, bool) {
|
||||
// Apply the pipeline first.
|
||||
l.builder.Reset()
|
||||
l.builder.Add(StructuredMetadataLabel, structuredMetadata)
|
||||
@@ -230,9 +230,9 @@ func (l *streamLabelSampleExtractor) Process(ts int64, line []byte, structuredMe
|
||||
return []ExtractedSample{{Value: v, Labels: l.builder.GroupedLabels()}}, true
|
||||
}
|
||||
|
||||
func (l *streamLabelSampleExtractor) ProcessString(ts int64, line string, structuredMetadata ...labels.Label) ([]ExtractedSample, bool) {
|
||||
func (l *streamLabelSampleExtractor) ProcessString(ts int64, line string, structuredMetadata labels.Labels) ([]ExtractedSample, bool) {
|
||||
// unsafe get bytes since we have the guarantee that the line won't be mutated.
|
||||
return l.Process(ts, unsafeGetBytes(line), structuredMetadata...)
|
||||
return l.Process(ts, unsafeGetBytes(line), structuredMetadata)
|
||||
}
|
||||
|
||||
func (l *streamLabelSampleExtractor) BaseLabels() LabelsResult { return l.builder.currentResult }
|
||||
@@ -283,7 +283,7 @@ func (sp *filteringStreamExtractor) BaseLabels() LabelsResult {
|
||||
return sp.extractor.BaseLabels()
|
||||
}
|
||||
|
||||
func (sp *filteringStreamExtractor) Process(ts int64, line []byte, structuredMetadata ...labels.Label) ([]ExtractedSample, bool) {
|
||||
func (sp *filteringStreamExtractor) Process(ts int64, line []byte, structuredMetadata labels.Labels) ([]ExtractedSample, bool) {
|
||||
for _, filter := range sp.filters {
|
||||
if ts < filter.start || ts > filter.end {
|
||||
continue
|
||||
@@ -295,10 +295,10 @@ func (sp *filteringStreamExtractor) Process(ts int64, line []byte, structuredMet
|
||||
}
|
||||
}
|
||||
|
||||
return sp.extractor.Process(ts, line, structuredMetadata...)
|
||||
return sp.extractor.Process(ts, line, structuredMetadata)
|
||||
}
|
||||
|
||||
func (sp *filteringStreamExtractor) ProcessString(ts int64, line string, structuredMetadata ...labels.Label) ([]ExtractedSample, bool) {
|
||||
func (sp *filteringStreamExtractor) ProcessString(ts int64, line string, structuredMetadata labels.Labels) ([]ExtractedSample, bool) {
|
||||
for _, filter := range sp.filters {
|
||||
if ts < filter.start || ts > filter.end {
|
||||
continue
|
||||
@@ -310,7 +310,7 @@ func (sp *filteringStreamExtractor) ProcessString(ts int64, line string, structu
|
||||
}
|
||||
}
|
||||
|
||||
return sp.extractor.ProcessString(ts, line, labels.EmptyLabels()...)
|
||||
return sp.extractor.ProcessString(ts, line, structuredMetadata)
|
||||
}
|
||||
|
||||
func convertFloat(v string) (float64, error) {
|
||||
|
||||
@@ -251,7 +251,7 @@ func Test_labelSampleExtractor_Extract(t *testing.T) {
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
samples, ok := tt.ex.ForStream(tt.in).Process(0, []byte(tt.line), tt.structuredMetadata...)
|
||||
samples, ok := tt.ex.ForStream(tt.in).Process(0, []byte(tt.line), tt.structuredMetadata)
|
||||
require.Equal(t, tt.wantOk, ok)
|
||||
if ok {
|
||||
require.Len(t, samples, 1, "Expected exactly one sample")
|
||||
@@ -259,7 +259,7 @@ func Test_labelSampleExtractor_Extract(t *testing.T) {
|
||||
require.Equal(t, tt.wantLbs, samples[0].Labels.Labels())
|
||||
}
|
||||
|
||||
samples, ok = tt.ex.ForStream(tt.in).ProcessString(0, tt.line, tt.structuredMetadata...)
|
||||
samples, ok = tt.ex.ForStream(tt.in).ProcessString(0, tt.line, tt.structuredMetadata)
|
||||
require.Equal(t, tt.wantOk, ok)
|
||||
if ok {
|
||||
require.Len(t, samples, 1, "Expected exactly one sample")
|
||||
@@ -273,7 +273,7 @@ func Test_labelSampleExtractor_Extract(t *testing.T) {
|
||||
func Test_Extract_ExpectedLabels(t *testing.T) {
|
||||
ex := mustSampleExtractor(LabelExtractorWithStages("duration", ConvertDuration, []string{"foo"}, false, false, []Stage{NewJSONParser(false)}, NoopStage))
|
||||
|
||||
samples, ok := ex.ForStream(labels.FromStrings("bar", "foo")).ProcessString(0, `{"duration":"20ms","foo":"json"}`)
|
||||
samples, ok := ex.ForStream(labels.FromStrings("bar", "foo")).ProcessString(0, `{"duration":"20ms","foo":"json"}`, labels.EmptyLabels())
|
||||
require.True(t, ok)
|
||||
require.Len(t, samples, 1, "Expected exactly one sample")
|
||||
require.Equal(t, (20 * time.Millisecond).Seconds(), samples[0].Value)
|
||||
@@ -325,7 +325,7 @@ func TestLabelExtractorWithStages(t *testing.T) {
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
for _, line := range tc.checkLines {
|
||||
samples, ok := tc.extractor.ForStream(labels.FromStrings("bar", "foo")).ProcessString(0, line.logLine)
|
||||
samples, ok := tc.extractor.ForStream(labels.FromStrings("bar", "foo")).ProcessString(0, line.logLine, labels.EmptyLabels())
|
||||
skipped := !ok
|
||||
assert.Equal(t, line.skip, skipped, "line", line.logLine)
|
||||
if !skipped {
|
||||
@@ -360,13 +360,13 @@ func TestNewLineSampleExtractor(t *testing.T) {
|
||||
)
|
||||
|
||||
sse := se.ForStream(lbs)
|
||||
samples, ok := sse.Process(0, []byte(`foo`))
|
||||
samples, ok := sse.Process(0, []byte(`foo`), labels.EmptyLabels())
|
||||
require.True(t, ok)
|
||||
require.Len(t, samples, 1, "Expected exactly one sample")
|
||||
require.Equal(t, 1., samples[0].Value)
|
||||
assertLabelResult(t, lbs, samples[0].Labels)
|
||||
|
||||
samples, ok = sse.ProcessString(0, `foo`)
|
||||
samples, ok = sse.ProcessString(0, `foo`, labels.EmptyLabels())
|
||||
require.True(t, ok)
|
||||
require.Len(t, samples, 1, "Expected exactly one sample")
|
||||
require.Equal(t, 1., samples[0].Value)
|
||||
@@ -377,21 +377,21 @@ func TestNewLineSampleExtractor(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
sse = se.ForStream(lbs)
|
||||
samples, ok = sse.Process(0, []byte(`foo`))
|
||||
samples, ok = sse.Process(0, []byte(`foo`), labels.EmptyLabels())
|
||||
require.True(t, ok)
|
||||
require.Len(t, samples, 1, "Expected exactly one sample")
|
||||
require.Equal(t, 3., samples[0].Value)
|
||||
assertLabelResult(t, labels.FromStrings("namespace", "dev"), samples[0].Labels)
|
||||
|
||||
sse = se.ForStream(lbs)
|
||||
_, ok = sse.Process(0, []byte(`nope`))
|
||||
_, ok = sse.Process(0, []byte(`nope`), labels.EmptyLabels())
|
||||
require.False(t, ok)
|
||||
}
|
||||
|
||||
func TestNewLineSampleExtractorWithStructuredMetadata(t *testing.T) {
|
||||
lbs := labels.FromStrings("foo", "bar")
|
||||
structuredMetadata := labels.FromStrings("user", "bob")
|
||||
expectedLabelsResults := append(lbs, structuredMetadata...)
|
||||
expectedLabelsResults := appendLabels(lbs, structuredMetadata)
|
||||
se, err := NewLineSampleExtractor(CountExtractor, []Stage{
|
||||
NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")),
|
||||
NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "user", "bob")),
|
||||
@@ -399,34 +399,28 @@ func TestNewLineSampleExtractorWithStructuredMetadata(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
sse := se.ForStream(lbs)
|
||||
samples, ok := sse.Process(0, []byte(`foo`), structuredMetadata...)
|
||||
samples, ok := sse.Process(0, []byte(`foo`), structuredMetadata)
|
||||
require.True(t, ok)
|
||||
require.Len(t, samples, 1, "Expected exactly one sample")
|
||||
require.Equal(t, 1., samples[0].Value)
|
||||
assertLabelResult(t, expectedLabelsResults, samples[0].Labels)
|
||||
|
||||
samples, ok = sse.ProcessString(0, `foo`, structuredMetadata...)
|
||||
samples, ok = sse.ProcessString(0, `foo`, structuredMetadata)
|
||||
require.True(t, ok)
|
||||
require.Len(t, samples, 1, "Expected exactly one sample")
|
||||
require.Equal(t, 1., samples[0].Value)
|
||||
assertLabelResult(t, expectedLabelsResults, samples[0].Labels)
|
||||
|
||||
// test duplicated structured metadata with stream labels
|
||||
expectedLabelsResults = append(lbs, labels.Label{
|
||||
Name: "foo_extracted", Value: "baz",
|
||||
})
|
||||
expectedLabelsResults = append(expectedLabelsResults, structuredMetadata...)
|
||||
samples, ok = sse.Process(0, []byte(`foo`), append(structuredMetadata, labels.Label{
|
||||
Name: "foo", Value: "baz",
|
||||
})...)
|
||||
expectedLabelsResults = appendLabel(lbs, "foo_extracted", "baz")
|
||||
expectedLabelsResults = appendLabels(expectedLabelsResults, structuredMetadata)
|
||||
samples, ok = sse.Process(0, []byte(`foo`), appendLabel(structuredMetadata, "foo", "baz"))
|
||||
require.True(t, ok)
|
||||
require.Len(t, samples, 1, "Expected exactly one sample")
|
||||
require.Equal(t, 1., samples[0].Value)
|
||||
assertLabelResult(t, expectedLabelsResults, samples[0].Labels)
|
||||
|
||||
samples, ok = sse.ProcessString(0, `foo`, append(structuredMetadata, labels.Label{
|
||||
Name: "foo", Value: "baz",
|
||||
})...)
|
||||
samples, ok = sse.ProcessString(0, `foo`, appendLabel(structuredMetadata, "foo", "baz"))
|
||||
require.True(t, ok)
|
||||
require.Len(t, samples, 1, "Expected exactly one sample")
|
||||
require.Equal(t, 1., samples[0].Value)
|
||||
@@ -440,21 +434,27 @@ func TestNewLineSampleExtractorWithStructuredMetadata(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
sse = se.ForStream(lbs)
|
||||
samples, ok = sse.Process(0, []byte(`foo`), structuredMetadata...)
|
||||
samples, ok = sse.Process(0, []byte(`foo`), structuredMetadata)
|
||||
require.True(t, ok)
|
||||
require.Len(t, samples, 1, "Expected exactly one sample")
|
||||
require.Equal(t, 3., samples[0].Value)
|
||||
assertLabelResult(t, labels.FromStrings("foo", "bar"), samples[0].Labels)
|
||||
|
||||
sse = se.ForStream(lbs)
|
||||
_, ok = sse.Process(0, []byte(`nope`))
|
||||
_, ok = sse.Process(0, []byte(`nope`), labels.EmptyLabels())
|
||||
require.False(t, ok)
|
||||
}
|
||||
|
||||
func appendLabel(l labels.Labels, name, value string) labels.Labels {
|
||||
b := labels.NewBuilder(l)
|
||||
b.Set(name, value)
|
||||
return b.Labels()
|
||||
}
|
||||
|
||||
func TestFilteringSampleExtractor(t *testing.T) {
|
||||
se := NewFilteringSampleExtractor([]PipelineFilter{
|
||||
newPipelineFilter(2, 4, labels.FromStrings("foo", "bar", "bar", "baz"), nil, "e"),
|
||||
newPipelineFilter(3, 5, labels.FromStrings("baz", "foo"), nil, "e"),
|
||||
newPipelineFilter(2, 4, labels.FromStrings("foo", "bar", "bar", "baz"), labels.EmptyLabels(), "e"),
|
||||
newPipelineFilter(3, 5, labels.FromStrings("baz", "foo"), labels.EmptyLabels(), "e"),
|
||||
newPipelineFilter(3, 5, labels.FromStrings("foo", "baz"), labels.FromStrings("user", "bob"), "e"),
|
||||
}, newStubExtractor())
|
||||
|
||||
@@ -466,23 +466,23 @@ func TestFilteringSampleExtractor(t *testing.T) {
|
||||
structuredMetadata labels.Labels
|
||||
ok bool
|
||||
}{
|
||||
{"it is after the timerange", 6, "line", labels.FromStrings("baz", "foo"), nil, true},
|
||||
{"it is before the timerange", 1, "line", labels.FromStrings("baz", "foo"), nil, true},
|
||||
{"it doesn't match the filter", 3, "all good", labels.FromStrings("baz", "foo"), nil, true},
|
||||
{"it doesn't match all the selectors", 3, "line", labels.FromStrings("foo", "bar"), nil, true},
|
||||
{"it doesn't match any selectors", 3, "line", labels.FromStrings("beep", "boop"), nil, true},
|
||||
{"it matches all selectors", 3, "line", labels.FromStrings("foo", "bar", "bar", "baz"), nil, false},
|
||||
{"it is after the timerange", 6, "line", labels.FromStrings("baz", "foo"), labels.EmptyLabels(), true},
|
||||
{"it is before the timerange", 1, "line", labels.FromStrings("baz", "foo"), labels.EmptyLabels(), true},
|
||||
{"it doesn't match the filter", 3, "all good", labels.FromStrings("baz", "foo"), labels.EmptyLabels(), true},
|
||||
{"it doesn't match all the selectors", 3, "line", labels.FromStrings("foo", "bar"), labels.EmptyLabels(), true},
|
||||
{"it doesn't match any selectors", 3, "line", labels.FromStrings("beep", "boop"), labels.EmptyLabels(), true},
|
||||
{"it matches all selectors", 3, "line", labels.FromStrings("foo", "bar", "bar", "baz"), labels.EmptyLabels(), false},
|
||||
{"it doesn't match all structured metadata", 3, "line", labels.FromStrings("foo", "baz"), labels.FromStrings("user", "alice"), true},
|
||||
{"it matches all structured metadata", 3, "line", labels.FromStrings("foo", "baz"), labels.FromStrings("user", "bob"), false},
|
||||
{"it tries all the filters", 5, "line", labels.FromStrings("baz", "foo"), nil, false},
|
||||
{"it tries all the filters", 5, "line", labels.FromStrings("baz", "foo"), labels.EmptyLabels(), false},
|
||||
}
|
||||
|
||||
for _, test := range tt {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
_, ok := se.ForStream(test.labels).Process(test.ts, []byte(test.line), test.structuredMetadata...)
|
||||
_, ok := se.ForStream(test.labels).Process(test.ts, []byte(test.line), test.structuredMetadata)
|
||||
require.Equal(t, test.ok, ok)
|
||||
|
||||
_, ok = se.ForStream(test.labels).ProcessString(test.ts, test.line, test.structuredMetadata...)
|
||||
_, ok = se.ForStream(test.labels).ProcessString(test.ts, test.line, test.structuredMetadata)
|
||||
require.Equal(t, test.ok, ok)
|
||||
})
|
||||
}
|
||||
@@ -514,7 +514,7 @@ func (p *stubStreamExtractor) BaseLabels() LabelsResult {
|
||||
func (p *stubStreamExtractor) Process(
|
||||
_ int64,
|
||||
_ []byte,
|
||||
structuredMetadata ...labels.Label,
|
||||
structuredMetadata labels.Labels,
|
||||
) ([]ExtractedSample, bool) {
|
||||
builder := NewBaseLabelsBuilder().ForLabels(labels.FromStrings("foo", "bar"), 0)
|
||||
builder.Add(StructuredMetadataLabel, structuredMetadata)
|
||||
@@ -527,7 +527,7 @@ func (p *stubStreamExtractor) Process(
|
||||
func (p *stubStreamExtractor) ProcessString(
|
||||
_ int64,
|
||||
_ string,
|
||||
structuredMetadata ...labels.Label,
|
||||
structuredMetadata labels.Labels,
|
||||
) ([]ExtractedSample, bool) {
|
||||
builder := NewBaseLabelsBuilder().ForLabels(labels.FromStrings("foo", "bar"), 0)
|
||||
builder.Add(StructuredMetadataLabel, structuredMetadata)
|
||||
|
||||
@@ -234,7 +234,7 @@ func Test_ParserHints(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
for i, ex := range exs {
|
||||
res, ok := ex.ForStream(lbs).Process(0, append([]byte{}, tt.line...))
|
||||
res, ok := ex.ForStream(lbs).Process(0, append([]byte{}, tt.line...), labels.EmptyLabels())
|
||||
require.Equal(t, tt.expectOk, ok)
|
||||
|
||||
for _, sample := range res {
|
||||
|
||||
@@ -2,7 +2,6 @@ package log
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/grafana/loki/v3/pkg/logqlmodel"
|
||||
@@ -238,7 +237,7 @@ func TestKeyShortCircuit(t *testing.T) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
_, result = tt.p.Process(0, tt.line, lbs)
|
||||
|
||||
require.Len(t, lbs.labels(), 1)
|
||||
require.Equal(t, 1, lbs.LabelsResult().Labels().Len())
|
||||
require.False(t, result)
|
||||
})
|
||||
}
|
||||
@@ -278,7 +277,7 @@ func TestLabelShortCircuit(t *testing.T) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
_, result = tt.p.Process(0, tt.line, lbs)
|
||||
|
||||
require.Len(t, lbs.labels(), 1)
|
||||
require.Equal(t, 1, lbs.LabelsResult().Labels().Len())
|
||||
name, category, ok := lbs.GetWithCategory("name")
|
||||
require.True(t, ok)
|
||||
require.Equal(t, ParsedLabel, category)
|
||||
@@ -1075,7 +1074,7 @@ func TestLogfmtParser_parse(t *testing.T) {
|
||||
[]byte(`buzz=foo bar=<3D>f`),
|
||||
labels.EmptyLabels(),
|
||||
labels.FromStrings("bar", " f", "buzz", "foo"),
|
||||
nil,
|
||||
labels.EmptyLabels(),
|
||||
NoParserHints(),
|
||||
},
|
||||
{
|
||||
@@ -1084,7 +1083,7 @@ func TestLogfmtParser_parse(t *testing.T) {
|
||||
labels.FromStrings("foo", "bar"),
|
||||
labels.FromStrings("foo", "bar",
|
||||
"bar", "foo"),
|
||||
nil,
|
||||
labels.EmptyLabels(),
|
||||
NoParserHints(),
|
||||
},
|
||||
{
|
||||
@@ -1094,7 +1093,7 @@ func TestLogfmtParser_parse(t *testing.T) {
|
||||
labels.FromStrings("foo", "bar",
|
||||
"foobar", "foo bar",
|
||||
),
|
||||
nil,
|
||||
labels.EmptyLabels(),
|
||||
NoParserHints(),
|
||||
},
|
||||
{
|
||||
@@ -1104,7 +1103,7 @@ func TestLogfmtParser_parse(t *testing.T) {
|
||||
labels.FromStrings("a", "b",
|
||||
"foobar", "foo\nbar\tbaz",
|
||||
),
|
||||
nil,
|
||||
labels.EmptyLabels(),
|
||||
NoParserHints(),
|
||||
},
|
||||
{
|
||||
@@ -1114,7 +1113,7 @@ func TestLogfmtParser_parse(t *testing.T) {
|
||||
labels.FromStrings("a", "b",
|
||||
"foobar", "foo\nbar\tbaz",
|
||||
),
|
||||
nil,
|
||||
labels.EmptyLabels(),
|
||||
NoParserHints(),
|
||||
},
|
||||
{
|
||||
@@ -1124,7 +1123,7 @@ func TestLogfmtParser_parse(t *testing.T) {
|
||||
labels.FromStrings("a", "b",
|
||||
"foobar", `foo ba\r baz`,
|
||||
),
|
||||
nil,
|
||||
labels.EmptyLabels(),
|
||||
NoParserHints(),
|
||||
},
|
||||
{
|
||||
@@ -1134,7 +1133,7 @@ func TestLogfmtParser_parse(t *testing.T) {
|
||||
labels.FromStrings("a", "b",
|
||||
"foobar", "foo bar\nb\\az",
|
||||
),
|
||||
nil,
|
||||
labels.EmptyLabels(),
|
||||
NoParserHints(),
|
||||
},
|
||||
{
|
||||
@@ -1145,7 +1144,7 @@ func TestLogfmtParser_parse(t *testing.T) {
|
||||
"foobar", "foo bar",
|
||||
"latency", "10ms",
|
||||
),
|
||||
nil,
|
||||
labels.EmptyLabels(),
|
||||
NoParserHints(),
|
||||
},
|
||||
{
|
||||
@@ -1155,7 +1154,7 @@ func TestLogfmtParser_parse(t *testing.T) {
|
||||
labels.FromStrings("foo", "bar",
|
||||
"foobar", "foo bar",
|
||||
),
|
||||
nil,
|
||||
labels.EmptyLabels(),
|
||||
NoParserHints(),
|
||||
},
|
||||
{
|
||||
@@ -1166,7 +1165,7 @@ func TestLogfmtParser_parse(t *testing.T) {
|
||||
"foo_extracted", "foo bar",
|
||||
"foobar", "10ms",
|
||||
),
|
||||
nil,
|
||||
labels.EmptyLabels(),
|
||||
NoParserHints(),
|
||||
},
|
||||
{
|
||||
@@ -1178,7 +1177,7 @@ func TestLogfmtParser_parse(t *testing.T) {
|
||||
"foo_bar", "10ms",
|
||||
"test_dash", "foo",
|
||||
),
|
||||
nil,
|
||||
labels.EmptyLabels(),
|
||||
NoParserHints(),
|
||||
},
|
||||
{
|
||||
@@ -1186,7 +1185,7 @@ func TestLogfmtParser_parse(t *testing.T) {
|
||||
nil,
|
||||
labels.FromStrings("foo", "bar"),
|
||||
labels.FromStrings("foo", "bar"),
|
||||
nil,
|
||||
labels.EmptyLabels(),
|
||||
NoParserHints(),
|
||||
},
|
||||
{
|
||||
@@ -1268,10 +1267,9 @@ func TestLogfmtParser_parse(t *testing.T) {
|
||||
_, _ = p.Process(0, tt.line, b)
|
||||
|
||||
want := tt.want
|
||||
if tt.wantStrict != nil {
|
||||
if !tt.wantStrict.IsEmpty() {
|
||||
want = tt.wantStrict
|
||||
}
|
||||
sort.Sort(want)
|
||||
require.Equal(t, want, b.LabelsResult().Labels())
|
||||
})
|
||||
}
|
||||
@@ -1338,7 +1336,6 @@ func TestLogfmtParser_keepEmpty(t *testing.T) {
|
||||
_, _ = p.Process(0, tt.line, b)
|
||||
|
||||
want := tt.want
|
||||
sort.Sort(want)
|
||||
require.Equal(t, want, b.LabelsResult().Labels())
|
||||
})
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ func TestNoopPipeline(t *testing.T) {
|
||||
require.Equal(t, true, matches)
|
||||
|
||||
structuredMetadata := labels.FromStrings("y", "1", "z", "2")
|
||||
expectedLabelsResults := append(lbs, structuredMetadata...)
|
||||
expectedLabelsResults := appendLabels(lbs, structuredMetadata)
|
||||
l, lbr, matches = pipeline.ForStream(lbs).Process(0, []byte(""), structuredMetadata)
|
||||
require.Equal(t, []byte(""), l)
|
||||
require.Equal(t, NewLabelsResult(expectedLabelsResults.String(), expectedLabelsResults.Hash(), lbs, structuredMetadata, labels.EmptyLabels()), lbr)
|
||||
@@ -91,8 +91,8 @@ func TestPipeline(t *testing.T) {
|
||||
|
||||
// Reset caches
|
||||
p.baseBuilder.del = []string{"foo", "bar"}
|
||||
p.baseBuilder.add = [numValidCategories]labels.Labels{
|
||||
ParsedLabel: labels.FromStrings("baz", "blip"),
|
||||
p.baseBuilder.add = [numValidCategories][]labels.Label{
|
||||
ParsedLabel: {{"baz", "blip"}},
|
||||
}
|
||||
|
||||
p.Reset()
|
||||
@@ -106,7 +106,7 @@ func TestPipeline(t *testing.T) {
|
||||
func TestPipelineWithStructuredMetadata(t *testing.T) {
|
||||
lbs := labels.FromStrings("foo", "bar")
|
||||
structuredMetadata := labels.FromStrings("user", "bob")
|
||||
expectedLabelsResults := append(lbs, structuredMetadata...)
|
||||
expectedLabelsResults := appendLabels(lbs, structuredMetadata)
|
||||
p := NewPipeline([]Stage{
|
||||
NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")),
|
||||
NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "user", "bob")),
|
||||
@@ -130,7 +130,7 @@ func TestPipelineWithStructuredMetadata(t *testing.T) {
|
||||
// test duplicated structured metadata with stream labels
|
||||
expectedNonIndexedLabels := labels.FromStrings("user", "bob", "foo_extracted", "baz")
|
||||
expectedLabelsResults = labels.FromStrings("foo", "bar", "foo_extracted", "baz")
|
||||
expectedLabelsResults = append(expectedLabelsResults, structuredMetadata...)
|
||||
expectedLabelsResults = appendLabels(expectedLabelsResults, structuredMetadata)
|
||||
l, lbr, matches = p.ForStream(lbs).Process(0, []byte("line"), labels.NewBuilder(structuredMetadata).Set("foo", "baz").Labels())
|
||||
require.Equal(t, []byte("lbs bar bob"), l)
|
||||
require.Equal(t, NewLabelsResult(expectedLabelsResults.String(), expectedLabelsResults.Hash(), lbs, expectedNonIndexedLabels, labels.EmptyLabels()), lbr)
|
||||
@@ -167,8 +167,8 @@ func TestPipelineWithStructuredMetadata(t *testing.T) {
|
||||
|
||||
// Reset caches
|
||||
p.baseBuilder.del = []string{"foo", "bar"}
|
||||
p.baseBuilder.add = [numValidCategories]labels.Labels{
|
||||
ParsedLabel: labels.FromStrings("baz", "blip"),
|
||||
p.baseBuilder.add = [numValidCategories][]labels.Label{
|
||||
ParsedLabel: {{"baz", "blip"}},
|
||||
}
|
||||
|
||||
p.Reset()
|
||||
@@ -179,6 +179,16 @@ func TestPipelineWithStructuredMetadata(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func appendLabels(base labels.Labels, l labels.Labels) labels.Labels {
|
||||
b := labels.NewBuilder(base)
|
||||
|
||||
l.Range(func(l labels.Label) {
|
||||
b.Set(l.Name, l.Value)
|
||||
})
|
||||
|
||||
return b.Labels()
|
||||
}
|
||||
|
||||
func TestFilteringPipeline(t *testing.T) {
|
||||
tt := []struct {
|
||||
name string
|
||||
@@ -188,22 +198,22 @@ func TestFilteringPipeline(t *testing.T) {
|
||||
structuredMetadata labels.Labels
|
||||
ok bool
|
||||
}{
|
||||
{"it is before the timerange", 1, "line", labels.FromStrings("baz", "foo"), nil, true},
|
||||
{"it is after the timerange", 6, "line", labels.FromStrings("baz", "foo"), nil, true},
|
||||
{"it doesn't match the filter", 3, "all good", labels.FromStrings("baz", "foo"), nil, true},
|
||||
{"it doesn't match all the selectors", 3, "line", labels.FromStrings("foo", "bar"), nil, true},
|
||||
{"it doesn't match any selectors", 3, "line", labels.FromStrings("beep", "boop"), nil, true},
|
||||
{"it matches all selectors", 3, "line", labels.FromStrings("foo", "bar", "bar", "baz"), nil, false},
|
||||
{"it is before the timerange", 1, "line", labels.FromStrings("baz", "foo"), labels.EmptyLabels(), true},
|
||||
{"it is after the timerange", 6, "line", labels.FromStrings("baz", "foo"), labels.EmptyLabels(), true},
|
||||
{"it doesn't match the filter", 3, "all good", labels.FromStrings("baz", "foo"), labels.EmptyLabels(), true},
|
||||
{"it doesn't match all the selectors", 3, "line", labels.FromStrings("foo", "bar"), labels.EmptyLabels(), true},
|
||||
{"it doesn't match any selectors", 3, "line", labels.FromStrings("beep", "boop"), labels.EmptyLabels(), true},
|
||||
{"it matches all selectors", 3, "line", labels.FromStrings("foo", "bar", "bar", "baz"), labels.EmptyLabels(), false},
|
||||
{"it doesn't match all structured metadata", 3, "line", labels.FromStrings("foo", "baz"), labels.FromStrings("user", "alice"), true},
|
||||
{"it matches all structured metadata", 3, "line", labels.FromStrings("foo", "baz"), labels.FromStrings("user", "bob"), false},
|
||||
{"it tries all the filters", 5, "line", labels.FromStrings("baz", "foo"), nil, false},
|
||||
{"it tries all the filters", 5, "line", labels.FromStrings("baz", "foo"), labels.EmptyLabels(), false},
|
||||
}
|
||||
|
||||
for _, test := range tt {
|
||||
downstream := newStubPipeline()
|
||||
p := NewFilteringPipeline([]PipelineFilter{
|
||||
newPipelineFilter(2, 4, labels.FromStrings("foo", "bar", "bar", "baz"), nil, "e"),
|
||||
newPipelineFilter(3, 5, labels.FromStrings("baz", "foo"), nil, "e"),
|
||||
newPipelineFilter(2, 4, labels.FromStrings("foo", "bar", "bar", "baz"), labels.EmptyLabels(), "e"),
|
||||
newPipelineFilter(3, 5, labels.FromStrings("baz", "foo"), labels.EmptyLabels(), "e"),
|
||||
newPipelineFilter(3, 5, labels.FromStrings("foo", "baz"), labels.FromStrings("user", "bob"), "e"),
|
||||
}, downstream)
|
||||
|
||||
@@ -380,8 +390,8 @@ func TestDropLabelsPipeline(t *testing.T) {
|
||||
for i, line := range tt.lines {
|
||||
_, finalLbs, _ := sp.Process(0, line, labels.EmptyLabels())
|
||||
require.Equal(t, tt.wantLabels[i], finalLbs.Labels())
|
||||
require.Nil(t, finalLbs.Stream())
|
||||
require.Nil(t, finalLbs.StructuredMetadata())
|
||||
require.Equal(t, labels.EmptyLabels(), finalLbs.Stream())
|
||||
require.Equal(t, labels.EmptyLabels(), finalLbs.StructuredMetadata())
|
||||
require.Equal(t, tt.wantLabels[i], finalLbs.Parsed())
|
||||
require.Equal(t, tt.wantLabels[i].Hash(), finalLbs.Hash())
|
||||
}
|
||||
@@ -502,13 +512,9 @@ func TestKeepLabelsPipeline(t *testing.T) {
|
||||
finalLine, finalLbs, _ := sp.Process(0, line, labels.EmptyLabels())
|
||||
require.Equal(t, tt.wantLine[i], finalLine)
|
||||
require.Equal(t, tt.wantLabels[i], finalLbs.Labels())
|
||||
require.Nil(t, finalLbs.Stream())
|
||||
require.Nil(t, finalLbs.StructuredMetadata())
|
||||
if len(tt.wantLabels[i]) > 0 {
|
||||
require.Equal(t, tt.wantLabels[i], finalLbs.Parsed())
|
||||
} else {
|
||||
require.Nil(t, finalLbs.Parsed())
|
||||
}
|
||||
require.Equal(t, labels.EmptyLabels(), finalLbs.Stream())
|
||||
require.Equal(t, labels.EmptyLabels(), finalLbs.StructuredMetadata())
|
||||
require.Equal(t, tt.wantLabels[i], finalLbs.Parsed())
|
||||
require.Equal(t, tt.wantLabels[i].Hash(), finalLbs.Hash())
|
||||
require.Equal(t, tt.wantLabels[i].String(), finalLbs.String())
|
||||
}
|
||||
@@ -615,7 +621,7 @@ func Benchmark_Pipeline(b *testing.B) {
|
||||
b.Run("line extractor bytes", func(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for n := 0; n < b.N; n++ {
|
||||
samples, ok := ex.Process(0, line)
|
||||
samples, ok := ex.Process(0, line, labels.EmptyLabels())
|
||||
if ok && len(samples) > 0 {
|
||||
resSample = samples[0].Value
|
||||
resLbs = samples[0].Labels
|
||||
@@ -628,7 +634,7 @@ func Benchmark_Pipeline(b *testing.B) {
|
||||
b.Run("line extractor string", func(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for n := 0; n < b.N; n++ {
|
||||
samples, ok := ex.ProcessString(0, lineString)
|
||||
samples, ok := ex.ProcessString(0, lineString, labels.EmptyLabels())
|
||||
if ok && len(samples) > 0 {
|
||||
resSample = samples[0].Value
|
||||
resLbs = samples[0].Labels
|
||||
@@ -646,7 +652,7 @@ func Benchmark_Pipeline(b *testing.B) {
|
||||
b.Run("label extractor bytes", func(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for n := 0; n < b.N; n++ {
|
||||
samples, ok := ex.Process(0, line)
|
||||
samples, ok := ex.Process(0, line, labels.EmptyLabels())
|
||||
if ok && len(samples) > 0 {
|
||||
resSample = samples[0].Value
|
||||
resLbs = samples[0].Labels
|
||||
@@ -659,7 +665,7 @@ func Benchmark_Pipeline(b *testing.B) {
|
||||
b.Run("label extractor string", func(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for n := 0; n < b.N; n++ {
|
||||
samples, ok := ex.ProcessString(0, lineString)
|
||||
samples, ok := ex.ProcessString(0, lineString, labels.EmptyLabels())
|
||||
if ok && len(samples) > 0 {
|
||||
resSample = samples[0].Value
|
||||
resLbs = samples[0].Labels
|
||||
|
||||
@@ -157,7 +157,7 @@ func Test_MultiVariantExpr_Extractors(t *testing.T) {
|
||||
streamExtractor := extractors[0].ForStream(lbls)
|
||||
require.NotNil(t, streamExtractor, "stream extractor should not be nil")
|
||||
|
||||
samples, ok := streamExtractor.Process(now.UnixNano(), []byte(tc.testLine))
|
||||
samples, ok := streamExtractor.Process(now.UnixNano(), []byte(tc.testLine), labels.EmptyLabels())
|
||||
require.True(t, ok)
|
||||
|
||||
seen := make(map[string]float64, len(samples))
|
||||
@@ -176,7 +176,7 @@ func Test_MultiVariantExpr_Extractors(t *testing.T) {
|
||||
streamExtractor = extractors[0].ForStream(lbls)
|
||||
require.NotNil(t, streamExtractor, "multi-variant stream extractor should not be nil")
|
||||
|
||||
mvSamples, ok := streamExtractor.Process(now.UnixNano(), []byte(tc.testLine))
|
||||
mvSamples, ok := streamExtractor.Process(now.UnixNano(), []byte(tc.testLine), labels.EmptyLabels())
|
||||
require.True(t, ok)
|
||||
|
||||
// remove variant label
|
||||
@@ -412,7 +412,7 @@ func Test_MultiVariantExpr_Extractors(t *testing.T) {
|
||||
streamExtractor := extractors[0].ForStream(lbls)
|
||||
require.NotNil(t, streamExtractor, "stream extractor should not be nil")
|
||||
|
||||
samples, ok := streamExtractor.Process(now.UnixNano(), []byte(tc.testLine))
|
||||
samples, ok := streamExtractor.Process(now.UnixNano(), []byte(tc.testLine), labels.EmptyLabels())
|
||||
require.True(t, ok)
|
||||
|
||||
expectedSamples := make(map[string]float64, len(tc.expected))
|
||||
|
||||
@@ -3,7 +3,6 @@ package syntax
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
@@ -272,11 +271,8 @@ func ParseLogSelector(input string, validate bool) (LogSelectorExpr, error) {
|
||||
func ParseLabels(lbs string) (labels.Labels, error) {
|
||||
ls, err := promql_parser.ParseMetric(lbs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return labels.EmptyLabels(), err
|
||||
}
|
||||
// Sort labels to ensure functionally equivalent
|
||||
// inputs map to the same output
|
||||
sort.Sort(ls)
|
||||
|
||||
// Use the label builder to trim empty label values.
|
||||
// Empty label values are equivalent to absent labels
|
||||
|
||||
@@ -3506,7 +3506,7 @@ func Benchmark_MetricPipelineCombined(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
samples, matches = sp.Process(0, in)
|
||||
samples, matches = sp.Process(0, in, labels.EmptyLabels())
|
||||
}
|
||||
|
||||
v = samples[0].Value
|
||||
@@ -3678,6 +3678,22 @@ func TestParseLabels(t *testing.T) {
|
||||
input: `{job="foo"}`,
|
||||
output: labels.FromStrings("job", "foo"),
|
||||
},
|
||||
{
|
||||
desc: "multiple labels, already sorted",
|
||||
input: `{env="a", job="foo"}`,
|
||||
output: labels.FromStrings(
|
||||
"env", "a",
|
||||
"job", "foo",
|
||||
),
|
||||
},
|
||||
{
|
||||
desc: "multiple labels, not sorted",
|
||||
input: `{job="foo", env="a"}`,
|
||||
output: labels.FromStrings(
|
||||
"env", "a",
|
||||
"job", "foo",
|
||||
),
|
||||
},
|
||||
{
|
||||
desc: "strip empty label value",
|
||||
input: `{job="foo", bar=""}`,
|
||||
|
||||
@@ -148,7 +148,7 @@ func processSeries(in []logproto.Stream, ex []log.SampleExtractor) ([]logproto.S
|
||||
exs := extractor.ForStream(mustParseLabels(stream.Labels))
|
||||
for _, e := range stream.Entries {
|
||||
|
||||
if samples, ok := exs.Process(e.Timestamp.UnixNano(), []byte(e.Line)); ok {
|
||||
if samples, ok := exs.Process(e.Timestamp.UnixNano(), []byte(e.Line), labels.EmptyLabels()); ok {
|
||||
for _, sample := range samples {
|
||||
lbs := sample.Labels
|
||||
f := sample.Value
|
||||
|
||||
@@ -52,8 +52,10 @@ func TestLazyChunkIterator(t *testing.T) {
|
||||
Hash: fooLabelsWithName.Hash(),
|
||||
Entries: []logproto.Entry{
|
||||
{
|
||||
Timestamp: from,
|
||||
Line: "1",
|
||||
Timestamp: from,
|
||||
Line: "1",
|
||||
Parsed: logproto.EmptyLabelAdapters(),
|
||||
StructuredMetadata: logproto.EmptyLabelAdapters(),
|
||||
},
|
||||
},
|
||||
}),
|
||||
@@ -63,8 +65,10 @@ func TestLazyChunkIterator(t *testing.T) {
|
||||
Hash: fooLabels.Hash(),
|
||||
Entries: []logproto.Entry{
|
||||
{
|
||||
Timestamp: from,
|
||||
Line: "1",
|
||||
Timestamp: from,
|
||||
Line: "1",
|
||||
Parsed: logproto.EmptyLabelAdapters(),
|
||||
StructuredMetadata: logproto.EmptyLabelAdapters(),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
@@ -1133,14 +1133,14 @@ func (p *mockStreamExtractor) BaseLabels() lokilog.LabelsResult {
|
||||
return p.wrappedSP.BaseLabels()
|
||||
}
|
||||
|
||||
func (p *mockStreamExtractor) Process(ts int64, line []byte, lbs ...labels.Label) ([]lokilog.ExtractedSample, bool) {
|
||||
func (p *mockStreamExtractor) Process(ts int64, line []byte, lbs labels.Labels) ([]lokilog.ExtractedSample, bool) {
|
||||
p.called++
|
||||
return p.wrappedSP.Process(ts, line, lbs...)
|
||||
return p.wrappedSP.Process(ts, line, lbs)
|
||||
}
|
||||
|
||||
func (p *mockStreamExtractor) ProcessString(ts int64, line string, lbs ...labels.Label) ([]lokilog.ExtractedSample, bool) {
|
||||
func (p *mockStreamExtractor) ProcessString(ts int64, line string, lbs labels.Labels) ([]lokilog.ExtractedSample, bool) {
|
||||
p.called++
|
||||
return p.wrappedSP.ProcessString(ts, line, lbs...)
|
||||
return p.wrappedSP.ProcessString(ts, line, lbs)
|
||||
}
|
||||
|
||||
func Test_store_GetSeries(t *testing.T) {
|
||||
@@ -2041,6 +2041,7 @@ func TestQueryReferencingStructuredMetadata(t *testing.T) {
|
||||
entry := logproto.Entry{
|
||||
Timestamp: ts,
|
||||
Line: fmt.Sprintf("ts=%d level=info", ts.Unix()),
|
||||
Parsed: logproto.EmptyLabelAdapters(),
|
||||
}
|
||||
|
||||
if withStructuredMetadata {
|
||||
@@ -2054,6 +2055,8 @@ func TestQueryReferencingStructuredMetadata(t *testing.T) {
|
||||
Value: "1",
|
||||
},
|
||||
}
|
||||
} else {
|
||||
entry.StructuredMetadata = logproto.EmptyLabelAdapters()
|
||||
}
|
||||
dup, err := chunkEnc.Append(&entry)
|
||||
require.False(t, dup)
|
||||
@@ -2086,6 +2089,7 @@ func TestQueryReferencingStructuredMetadata(t *testing.T) {
|
||||
expectedEntry := logproto.Entry{
|
||||
Timestamp: ts.Truncate(0),
|
||||
Line: fmt.Sprintf("ts=%d level=info", ts.Unix()),
|
||||
Parsed: logproto.EmptyLabelAdapters(),
|
||||
}
|
||||
|
||||
if withStructuredMetadata {
|
||||
@@ -2099,6 +2103,8 @@ func TestQueryReferencingStructuredMetadata(t *testing.T) {
|
||||
Value: "1",
|
||||
},
|
||||
}
|
||||
} else {
|
||||
expectedEntry.StructuredMetadata = logproto.EmptyLabelAdapters()
|
||||
}
|
||||
require.Equal(t, expectedEntry, it.At())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user