mirror of
https://github.com/grafana/loki.git
synced 2026-03-13 09:33:58 +08:00
refactor(stringlabels): Support stringlabels in ingester (#17243)
This commit is contained in:
committed by
GitHub
parent
525b41155d
commit
845822b124
@@ -37,13 +37,13 @@ func FastFingerprint(ls []logproto.LabelAdapter) model.Fingerprint {
|
||||
}
|
||||
|
||||
// Fingerprint runs the same algorithm as Prometheus labelSetToFingerprint()
|
||||
func Fingerprint(labels labels.Labels) model.Fingerprint {
|
||||
func Fingerprint(lbls labels.Labels) model.Fingerprint {
|
||||
sum := hashNew()
|
||||
for _, label := range labels {
|
||||
lbls.Range(func(label labels.Label) {
|
||||
sum = hashAddString(sum, label.Name)
|
||||
sum = hashAddByte(sum, model.SeparatorByte)
|
||||
sum = hashAddString(sum, label.Value)
|
||||
sum = hashAddByte(sum, model.SeparatorByte)
|
||||
}
|
||||
})
|
||||
return model.Fingerprint(sum)
|
||||
}
|
||||
|
||||
@@ -320,7 +320,7 @@ func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint
|
||||
stream, ok = instance.streams.LoadByFP(fp)
|
||||
|
||||
if !ok {
|
||||
return nil, nil, nil
|
||||
return nil, labels.EmptyLabels(), nil
|
||||
}
|
||||
|
||||
stream.chunkMtx.Lock()
|
||||
|
||||
@@ -183,14 +183,14 @@ func Test_BitPrefixDeleteAddLoopkup(t *testing.T) {
|
||||
}
|
||||
|
||||
func Test_BitPrefix_hash_mapping(t *testing.T) {
|
||||
lbs := labels.Labels{
|
||||
labels.Label{Name: "compose_project", Value: "loki-tsdb-storage-s3"},
|
||||
labels.Label{Name: "compose_service", Value: "ingester-2"},
|
||||
labels.Label{Name: "container_name", Value: "loki-tsdb-storage-s3_ingester-2_1"},
|
||||
labels.Label{Name: "filename", Value: "/var/log/docker/790fef4c6a587c3b386fe85c07e03f3a1613f4929ca3abaa4880e14caadb5ad1/json.log"},
|
||||
labels.Label{Name: "host", Value: "docker-desktop"},
|
||||
labels.Label{Name: "source", Value: "stderr"},
|
||||
}
|
||||
lbs := labels.FromStrings(
|
||||
"compose_project", "loki-tsdb-storage-s3",
|
||||
"compose_service", "ingester-2",
|
||||
"container_name", "loki-tsdb-storage-s3_ingester-2_1",
|
||||
"filename", "/var/log/docker/790fef4c6a587c3b386fe85c07e03f3a1613f4929ca3abaa4880e14caadb5ad1/json.log",
|
||||
"host", "docker-desktop",
|
||||
"source", "stderr",
|
||||
)
|
||||
|
||||
// for _, shard := range []uint32{2, 4, 8, 16, 32, 64, 128} {
|
||||
for _, shard := range []uint32{2} {
|
||||
@@ -223,10 +223,10 @@ func Test_BitPrefix_hash_mapping(t *testing.T) {
|
||||
}
|
||||
|
||||
func Test_BitPrefixNoMatcherLookup(t *testing.T) {
|
||||
lbs := labels.Labels{
|
||||
labels.Label{Name: "foo", Value: "bar"},
|
||||
labels.Label{Name: "hi", Value: "hello"},
|
||||
}
|
||||
lbs := labels.FromStrings(
|
||||
"foo", "bar",
|
||||
"hi", "hello",
|
||||
)
|
||||
// with no shard param
|
||||
ii, err := NewBitPrefixWithShards(16)
|
||||
require.Nil(t, err)
|
||||
@@ -253,10 +253,10 @@ func Test_BitPrefixConsistentMapping(t *testing.T) {
|
||||
require.Nil(t, err)
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
lbs := labels.Labels{
|
||||
labels.Label{Name: "foo", Value: "bar"},
|
||||
labels.Label{Name: "hi", Value: fmt.Sprint(i)},
|
||||
}
|
||||
lbs := labels.FromStrings(
|
||||
"foo", "bar",
|
||||
"hi", fmt.Sprint(i),
|
||||
)
|
||||
|
||||
fp := model.Fingerprint(lbs.Hash())
|
||||
a.Add(logproto.FromLabelsToLabelAdapters(lbs), fp)
|
||||
|
||||
@@ -139,9 +139,9 @@ func labelsString(b *bytes.Buffer, ls labels.Labels) {
|
||||
b.WriteString("logs")
|
||||
b.WriteByte('{')
|
||||
i := 0
|
||||
for _, l := range ls {
|
||||
ls.Range(func(l labels.Label) {
|
||||
if l.Name == labels.MetricName {
|
||||
continue
|
||||
return
|
||||
}
|
||||
if i > 0 {
|
||||
b.WriteByte(',')
|
||||
@@ -152,7 +152,7 @@ func labelsString(b *bytes.Buffer, ls labels.Labels) {
|
||||
var buf [1000]byte
|
||||
b.Write(strconv.AppendQuote(buf[:0], l.Value))
|
||||
i++
|
||||
}
|
||||
})
|
||||
b.WriteByte('}')
|
||||
}
|
||||
|
||||
@@ -258,9 +258,9 @@ func (shard *indexShard) add(metric []logproto.LabelAdapter, fp model.Fingerprin
|
||||
shard.mtx.Lock()
|
||||
defer shard.mtx.Unlock()
|
||||
|
||||
internedLabels := make(labels.Labels, len(metric))
|
||||
builder := labels.NewScratchBuilder(len(metric))
|
||||
|
||||
for i, pair := range metric {
|
||||
for _, pair := range metric {
|
||||
values, ok := shard.idx[pair.Name]
|
||||
if !ok {
|
||||
values = indexEntry{
|
||||
@@ -283,10 +283,9 @@ func (shard *indexShard) add(metric []logproto.LabelAdapter, fp model.Fingerprin
|
||||
copy(fingerprints.fps[j+1:], fingerprints.fps[j:])
|
||||
fingerprints.fps[j] = fp
|
||||
values.fps[fingerprints.value] = fingerprints
|
||||
internedLabels[i] = labels.Label{Name: values.name, Value: fingerprints.value}
|
||||
builder.Add(values.name, fingerprints.value)
|
||||
}
|
||||
sort.Sort(internedLabels)
|
||||
return internedLabels
|
||||
return builder.Labels()
|
||||
}
|
||||
|
||||
func (shard *indexShard) lookup(matchers []*labels.Matcher) []model.Fingerprint {
|
||||
@@ -400,19 +399,19 @@ func (shard *indexShard) labelValues(
|
||||
return extractor(values)
|
||||
}
|
||||
|
||||
func (shard *indexShard) delete(labels labels.Labels, fp model.Fingerprint) {
|
||||
func (shard *indexShard) delete(lbls labels.Labels, fp model.Fingerprint) {
|
||||
shard.mtx.Lock()
|
||||
defer shard.mtx.Unlock()
|
||||
|
||||
for _, pair := range labels {
|
||||
lbls.Range(func(pair labels.Label) {
|
||||
name, value := pair.Name, pair.Value
|
||||
values, ok := shard.idx[name]
|
||||
if !ok {
|
||||
continue
|
||||
return
|
||||
}
|
||||
fingerprints, ok := values.fps[value]
|
||||
if !ok {
|
||||
continue
|
||||
return
|
||||
}
|
||||
|
||||
j := sort.Search(len(fingerprints.fps), func(i int) bool {
|
||||
@@ -421,7 +420,7 @@ func (shard *indexShard) delete(labels labels.Labels, fp model.Fingerprint) {
|
||||
|
||||
// see if search didn't find fp which matches the condition which means we don't have to do anything.
|
||||
if j >= len(fingerprints.fps) || fingerprints.fps[j] != fp {
|
||||
continue
|
||||
return
|
||||
}
|
||||
fingerprints.fps = fingerprints.fps[:j+copy(fingerprints.fps[j:], fingerprints.fps[j+1:])]
|
||||
|
||||
@@ -436,7 +435,7 @@ func (shard *indexShard) delete(labels labels.Labels, fp model.Fingerprint) {
|
||||
} else {
|
||||
shard.idx[name] = values
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// intersect two sorted lists of fingerprints. Assumes there are no duplicate
|
||||
|
||||
@@ -97,14 +97,14 @@ func TestDeleteAddLoopkup(t *testing.T) {
|
||||
}
|
||||
|
||||
func Test_hash_mapping(t *testing.T) {
|
||||
lbs := labels.Labels{
|
||||
labels.Label{Name: "compose_project", Value: "loki-tsdb-storage-s3"},
|
||||
labels.Label{Name: "compose_service", Value: "ingester-2"},
|
||||
labels.Label{Name: "container_name", Value: "loki-tsdb-storage-s3_ingester-2_1"},
|
||||
labels.Label{Name: "filename", Value: "/var/log/docker/790fef4c6a587c3b386fe85c07e03f3a1613f4929ca3abaa4880e14caadb5ad1/json.log"},
|
||||
labels.Label{Name: "host", Value: "docker-desktop"},
|
||||
labels.Label{Name: "source", Value: "stderr"},
|
||||
}
|
||||
lbs := labels.FromStrings(
|
||||
"compose_project", "loki-tsdb-storage-s3",
|
||||
"compose_service", "ingester-2",
|
||||
"container_name", "loki-tsdb-storage-s3_ingester-2_1",
|
||||
"filename", "/var/log/docker/790fef4c6a587c3b386fe85c07e03f3a1613f4929ca3abaa4880e14caadb5ad1/json.log",
|
||||
"host", "docker-desktop",
|
||||
"source", "stderr",
|
||||
)
|
||||
|
||||
for _, shard := range []uint32{16, 32, 64, 128} {
|
||||
t.Run(fmt.Sprintf("%d", shard), func(t *testing.T) {
|
||||
@@ -121,10 +121,10 @@ func Test_hash_mapping(t *testing.T) {
|
||||
}
|
||||
|
||||
func Test_NoMatcherLookup(t *testing.T) {
|
||||
lbs := labels.Labels{
|
||||
labels.Label{Name: "foo", Value: "bar"},
|
||||
labels.Label{Name: "hi", Value: "hello"},
|
||||
}
|
||||
lbs := labels.FromStrings(
|
||||
"foo", "bar",
|
||||
"hi", "hello",
|
||||
)
|
||||
// with no shard param
|
||||
ii := NewWithShards(16)
|
||||
ii.Add(logproto.FromLabelsToLabelAdapters(lbs), 1)
|
||||
@@ -146,10 +146,10 @@ func Test_ConsistentMapping(t *testing.T) {
|
||||
b := NewWithShards(32)
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
lbs := labels.Labels{
|
||||
labels.Label{Name: "foo", Value: "bar"},
|
||||
labels.Label{Name: "hi", Value: fmt.Sprint(i)},
|
||||
}
|
||||
lbs := labels.FromStrings(
|
||||
"foo", "bar",
|
||||
"hi", fmt.Sprint(i),
|
||||
)
|
||||
a.Add(logproto.FromLabelsToLabelAdapters(lbs), model.Fingerprint(i))
|
||||
b.Add(logproto.FromLabelsToLabelAdapters(lbs), model.Fingerprint(i))
|
||||
}
|
||||
|
||||
@@ -107,7 +107,7 @@ func (m *Multi) indexFor(t time.Time) Interface {
|
||||
type noopInvertedIndex struct{}
|
||||
|
||||
func (noopInvertedIndex) Add(_ []logproto.LabelAdapter, _ model.Fingerprint) labels.Labels {
|
||||
return nil
|
||||
return labels.EmptyLabels()
|
||||
}
|
||||
|
||||
func (noopInvertedIndex) Delete(_ labels.Labels, _ model.Fingerprint) {}
|
||||
|
||||
@@ -461,7 +461,7 @@ func (i *instance) getHashForLabels(ls labels.Labels) model.Fingerprint {
|
||||
func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels {
|
||||
s, ok := i.streams.LoadByFP(fp)
|
||||
if !ok {
|
||||
return nil
|
||||
return labels.EmptyLabels()
|
||||
}
|
||||
return s.labels
|
||||
}
|
||||
@@ -640,17 +640,17 @@ func (i *instance) label(ctx context.Context, req *logproto.LabelRequest, matche
|
||||
}, nil
|
||||
}
|
||||
|
||||
labels := util.NewUniqueStrings(0)
|
||||
lbls := util.NewUniqueStrings(0)
|
||||
err := i.forMatchingStreams(ctx, *req.Start, matchers, nil, func(s *stream) error {
|
||||
for _, label := range s.labels {
|
||||
s.labels.Range(func(label labels.Label) {
|
||||
if req.Values && label.Name == req.Name {
|
||||
labels.Add(label.Value)
|
||||
continue
|
||||
lbls.Add(label.Value)
|
||||
return
|
||||
}
|
||||
if !req.Values {
|
||||
labels.Add(label.Name)
|
||||
lbls.Add(label.Name)
|
||||
}
|
||||
}
|
||||
})
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
@@ -658,7 +658,7 @@ func (i *instance) label(ctx context.Context, req *logproto.LabelRequest, matche
|
||||
}
|
||||
|
||||
return &logproto.LabelResponse{
|
||||
Values: labels.Strings(),
|
||||
Values: lbls.Strings(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -692,7 +692,7 @@ func (i *instance) LabelsWithValues(ctx context.Context, startTime time.Time, ma
|
||||
}
|
||||
|
||||
err := i.forMatchingStreams(ctx, startTime, matchers, nil, func(s *stream) error {
|
||||
for _, label := range s.labels {
|
||||
s.labels.Range(func(label labels.Label) {
|
||||
v, exists := labelMap[label.Name]
|
||||
if !exists {
|
||||
v = make(map[string]struct{})
|
||||
@@ -701,7 +701,7 @@ func (i *instance) LabelsWithValues(ctx context.Context, startTime time.Time, ma
|
||||
v[label.Value] = struct{}{}
|
||||
}
|
||||
labelMap[label.Name] = v
|
||||
}
|
||||
})
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
@@ -847,7 +847,7 @@ func (i *instance) getVolume(ctx context.Context, req *logproto.VolumeRequest) (
|
||||
matchAny = matchAny || len(matchers) == 0
|
||||
|
||||
seriesNames := make(map[uint64]string)
|
||||
seriesLabels := labels.Labels(make([]labels.Label, 0, len(labelsToMatch)))
|
||||
seriesLabelsBuilder := labels.NewScratchBuilder(len(labelsToMatch))
|
||||
|
||||
from, through := req.From.Time(), req.Through.Time()
|
||||
volumes := make(map[string]uint64)
|
||||
@@ -874,15 +874,15 @@ func (i *instance) getVolume(ctx context.Context, req *logproto.VolumeRequest) (
|
||||
|
||||
var labelVolumes map[string]uint64
|
||||
if aggregateBySeries {
|
||||
seriesLabels = seriesLabels[:0]
|
||||
for _, l := range s.labels {
|
||||
seriesLabelsBuilder.Reset()
|
||||
s.labels.Range(func(l labels.Label) {
|
||||
if _, ok := labelsToMatch[l.Name]; matchAny || ok {
|
||||
seriesLabels = append(seriesLabels, l)
|
||||
seriesLabelsBuilder.Add(l.Name, l.Value)
|
||||
}
|
||||
}
|
||||
})
|
||||
} else {
|
||||
labelVolumes = make(map[string]uint64, len(s.labels))
|
||||
for _, l := range s.labels {
|
||||
labelVolumes = make(map[string]uint64, s.labels.Len())
|
||||
s.labels.Range(func(l labels.Label) {
|
||||
if len(targetLabels) > 0 {
|
||||
if _, ok := labelsToMatch[l.Name]; matchAny || ok {
|
||||
labelVolumes[l.Name] += size
|
||||
@@ -890,11 +890,12 @@ func (i *instance) getVolume(ctx context.Context, req *logproto.VolumeRequest) (
|
||||
} else {
|
||||
labelVolumes[l.Name] += size
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// If the labels are < 1k, this does not alloc
|
||||
// https://github.com/prometheus/prometheus/pull/8025
|
||||
seriesLabels := seriesLabelsBuilder.Labels()
|
||||
hash := seriesLabels.Hash()
|
||||
if _, ok := seriesNames[hash]; !ok {
|
||||
seriesNames[hash] = seriesLabels.String()
|
||||
|
||||
@@ -59,7 +59,7 @@ func (m *FpMapper) MapFP(fp model.Fingerprint, metric labels.Labels) model.Finge
|
||||
// Then check the most likely case: This fp belongs to a series that is
|
||||
// already in memory.
|
||||
s := m.fpToLabels(fp)
|
||||
if s != nil {
|
||||
if !s.IsEmpty() {
|
||||
// FP exists in memory, but is it for the same metric?
|
||||
if labels.Equal(metric, s) {
|
||||
// Yupp. We are done.
|
||||
@@ -143,10 +143,10 @@ func (m *FpMapper) nextMappedFP() model.Fingerprint {
|
||||
// and indexes as it might become really large, causing a lot of hashing effort
|
||||
// in maps and a lot of storage overhead in indexes.
|
||||
func metricToUniqueString(m labels.Labels) string {
|
||||
parts := make([]string, 0, len(m))
|
||||
for _, pair := range m {
|
||||
parts = append(parts, pair.Name+separatorString+pair.Value)
|
||||
}
|
||||
parts := make([]string, 0, m.Len())
|
||||
m.Range(func(l labels.Label) {
|
||||
parts = append(parts, l.Name+separatorString+l.Value)
|
||||
})
|
||||
sort.Strings(parts)
|
||||
return strings.Join(parts, separatorString)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user