mirror of
https://github.com/grafana/loki.git
synced 2026-03-13 09:33:58 +08:00
fix(alloc): set a limit on preallocations (backport release-3.6.x) (#20920)
Co-authored-by: Ivan Kalita <ivan.kalita@grafana.com>
This commit is contained in:
@@ -1000,7 +1000,8 @@ func determineType(value string) logproto.DetectedFieldType {
|
||||
}
|
||||
|
||||
func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*parsedFields {
|
||||
detectedFields := make(map[string]*parsedFields, limit)
|
||||
const maxDetectedFieldsPreAlloc = 1000
|
||||
detectedFields := make(map[string]*parsedFields, min(maxDetectedFieldsPreAlloc, limit))
|
||||
fieldCount := uint32(0)
|
||||
emtpyparsers := []string{}
|
||||
|
||||
|
||||
@@ -280,7 +280,8 @@ func determineType(value string) logproto.DetectedFieldType {
|
||||
}
|
||||
|
||||
func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*parsedFields {
|
||||
detectedFields := make(map[string]*parsedFields, limit)
|
||||
const maxDetectedFieldsPreAlloc = 1000
|
||||
detectedFields := make(map[string]*parsedFields, min(maxDetectedFieldsPreAlloc, limit))
|
||||
fieldCount := uint32(0)
|
||||
emtpyparsers := []string{}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
runtime "runtime"
|
||||
"slices"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -117,6 +118,27 @@ func Test_parseDetectedFields(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("detects fields with huge limit doesn't explode memory", func(t *testing.T) {
|
||||
runtime.GC()
|
||||
var before runtime.MemStats
|
||||
runtime.ReadMemStats(&before)
|
||||
|
||||
df := parseDetectedFields(1000000, logqlmodel.Streams([]push.Stream{rulerStream}))
|
||||
require.True(t, len(df) > 0)
|
||||
|
||||
runtime.GC()
|
||||
var after runtime.MemStats
|
||||
runtime.ReadMemStats(&after)
|
||||
|
||||
delta := int64(after.TotalAlloc) - int64(before.TotalAlloc)
|
||||
// 10 MB
|
||||
if delta > 10*1024*1024 {
|
||||
t.Fatalf("heap grew too much: %d MB", delta/1024/1024)
|
||||
}
|
||||
|
||||
runtime.KeepAlive(df)
|
||||
})
|
||||
|
||||
t.Run("detects json fields", func(t *testing.T) {
|
||||
df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{nginxStream}))
|
||||
for _, expected := range []string{"host", "user_identifier", "datetime", "method", "request", "protocol", "status", "bytes", "referer"} {
|
||||
|
||||
@@ -55,7 +55,8 @@ func MergeFields(
|
||||
fields []*logproto.DetectedField,
|
||||
limit uint32,
|
||||
) ([]*logproto.DetectedField, error) {
|
||||
mergedFields := make(map[string]*UnmarshaledDetectedField, limit)
|
||||
const maxMergedFieldsPreAlloc = 1000
|
||||
mergedFields := make(map[string]*UnmarshaledDetectedField, min(maxMergedFieldsPreAlloc, limit))
|
||||
foundFields := uint32(0)
|
||||
|
||||
for _, field := range fields {
|
||||
@@ -86,7 +87,7 @@ func MergeFields(
|
||||
}
|
||||
}
|
||||
|
||||
result := make([]*logproto.DetectedField, 0, limit)
|
||||
result := make([]*logproto.DetectedField, 0, len(mergedFields))
|
||||
for _, field := range mergedFields {
|
||||
detectedField := &logproto.DetectedField{
|
||||
Label: field.Label,
|
||||
@@ -105,7 +106,8 @@ func MergeValues(
|
||||
values []string,
|
||||
limit uint32,
|
||||
) ([]string, error) {
|
||||
mergedValues := make(map[string]struct{}, limit)
|
||||
const maxMergedValuesPreAlloc = 1000
|
||||
mergedValues := make(map[string]struct{}, min(maxMergedValuesPreAlloc, limit))
|
||||
|
||||
for _, value := range values {
|
||||
if value == "" {
|
||||
@@ -119,7 +121,7 @@ func MergeValues(
|
||||
mergedValues[value] = struct{}{}
|
||||
}
|
||||
|
||||
result := make([]string, 0, limit)
|
||||
result := make([]string, 0, len(mergedValues))
|
||||
for value := range mergedValues {
|
||||
result = append(result, value)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package detected
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
"github.com/axiomhq/hyperloglog"
|
||||
@@ -88,6 +89,27 @@ func Test_MergeFields(t *testing.T) {
|
||||
assert.Equal(t, logproto.DetectedFieldString, baz.Type)
|
||||
})
|
||||
|
||||
t.Run("huge limit doesn't explode the heap", func(t *testing.T) {
|
||||
runtime.GC()
|
||||
var before runtime.MemStats
|
||||
runtime.ReadMemStats(&before)
|
||||
|
||||
result, err := MergeFields(fields, 10000000)
|
||||
require.NoError(t, err)
|
||||
|
||||
runtime.GC()
|
||||
var after runtime.MemStats
|
||||
runtime.ReadMemStats(&after)
|
||||
|
||||
delta := int64(after.TotalAlloc) - int64(before.TotalAlloc)
|
||||
// 10 MB
|
||||
if delta > 10*1024*1024 {
|
||||
t.Fatalf("heap grew too much: %d MB", delta/1024/1024)
|
||||
}
|
||||
|
||||
runtime.KeepAlive(result)
|
||||
})
|
||||
|
||||
t.Run("returns up to limit number of fields", func(t *testing.T) {
|
||||
lowLimit := uint32(1)
|
||||
result, err := MergeFields(fields, lowLimit)
|
||||
@@ -125,6 +147,28 @@ func Test_MergeValues(t *testing.T) {
|
||||
assert.ElementsMatch(t, []string{"foo", "bar", "baz", "qux"}, result)
|
||||
})
|
||||
|
||||
t.Run("huge limit doesn't explode the heap", func(t *testing.T) {
|
||||
runtime.GC()
|
||||
var before runtime.MemStats
|
||||
runtime.ReadMemStats(&before)
|
||||
|
||||
values := []string{"foo", "bar", "baz", "qux"}
|
||||
result, err := MergeValues(values, 1000000)
|
||||
require.NoError(t, err)
|
||||
|
||||
runtime.GC()
|
||||
var after runtime.MemStats
|
||||
runtime.ReadMemStats(&after)
|
||||
|
||||
delta := int64(after.TotalAlloc) - int64(before.TotalAlloc)
|
||||
// 10 MB
|
||||
if delta > 10*1024*1024 {
|
||||
t.Fatalf("heap grew too much: %d MB", delta/1024/1024)
|
||||
}
|
||||
|
||||
runtime.KeepAlive(result)
|
||||
})
|
||||
|
||||
t.Run("merges repeating values", func(t *testing.T) {
|
||||
values := []string{"foo", "bar", "baz", "qux", "foo", "bar", "baz", "qux"}
|
||||
limit := uint32(50)
|
||||
|
||||
Reference in New Issue
Block a user