From 32248ef0ba17165e75c4ffc8e2af759d8f156803 Mon Sep 17 00:00:00 2001 From: Stas Spiridonov Date: Mon, 23 Feb 2026 23:07:43 -0500 Subject: [PATCH] chore(thor): Performance optimization for aggregator (#20934) --- pkg/engine/internal/executor/aggregator.go | 101 +++++++----------- .../internal/executor/range_aggregation.go | 4 +- .../internal/executor/vector_aggregate.go | 4 +- 3 files changed, 46 insertions(+), 63 deletions(-) diff --git a/pkg/engine/internal/executor/aggregator.go b/pkg/engine/internal/executor/aggregator.go index 77816050eb..32ae1a7e1a 100644 --- a/pkg/engine/internal/executor/aggregator.go +++ b/pkg/engine/internal/executor/aggregator.go @@ -18,10 +18,8 @@ import ( var ErrSeriesLimitExceeded = errors.New("maximum number of series limit exceeded") type groupState struct { - value float64 // aggregated value - count int64 // values counter - labels []arrow.Field // grouping labels - labelValues []string // grouping label values + value float64 // aggregated value + count int64 // values counter } type aggregationOperation int @@ -42,12 +40,12 @@ type aggregator struct { points map[time.Time]map[uint64]*groupState // holds the groupState for each point in time series digest *xxhash.Digest // used to compute key for each group operation aggregationOperation // aggregation type - labels []arrow.Field // combined list of all label fields for all sample values + labels map[string]arrow.Field // combined list of all label fields for all sample values clonedLabelValues map[string]string // cache of cloned strings to reduce allocations for repeated values // Track unique series across all timestamps to enforce maxSeries limit - maxSeries int // maximum number of unique series allowed (0 means no limit) - uniqueSeries map[uint64]struct{} // tracks unique series across all timestamps + maxSeries int // maximum number of unique series allowed (0 means no limit) + uniqueSeries map[uint64]map[string]string // tracks unique series across all timestamps } // newAggregator creates a new aggregator with the specified grouping. @@ -56,7 +54,8 @@ func newAggregator(pointsSizeHint int, operation aggregationOperation) *aggregat digest: xxhash.New(), operation: operation, clonedLabelValues: make(map[string]string), - uniqueSeries: make(map[uint64]struct{}), + labels: make(map[string]arrow.Field), + uniqueSeries: make(map[uint64]map[string]string), } if pointsSizeHint > 0 { @@ -72,10 +71,8 @@ func newAggregator(pointsSizeHint int, operation aggregationOperation) *aggregat // over the lifetime of an aggregator to accommodate processing of multiple records with different schemas. func (a *aggregator) AddLabels(labels []arrow.Field) { for _, label := range labels { - if !slices.ContainsFunc(a.labels, func(l arrow.Field) bool { - return label.Equal(l) - }) { - a.labels = append(a.labels, label) + if _, ok := a.labels[label.Name]; !ok { + a.labels[label.Name] = label } } } @@ -134,47 +131,33 @@ func (a *aggregator) Add(ts time.Time, value float64, labels []arrow.Field, labe state.count++ } else { - - if a.maxSeries > 0 { + if series, exists := a.uniqueSeries[key]; !exists { // Check series limit before adding a new series - if _, exists := a.uniqueSeries[key]; !exists { - if len(a.uniqueSeries) >= a.maxSeries { - return ErrSeriesLimitExceeded + if a.maxSeries > 0 && len(a.uniqueSeries) >= a.maxSeries { + return ErrSeriesLimitExceeded + } + + if len(labels) > 0 { + series = make(map[string]string) + for i, v := range labelValues { + // copy the value as this is backed by the arrow array data buffer. + // We could retain the record to avoid this copy, but that would hold + // all other columns in memory for as long as the query is evaluated. + cloned, ok := a.clonedLabelValues[v] + if !ok { + cloned = strings.Clone(v) + a.clonedLabelValues[v] = cloned + } + series[labels[i].Name] = cloned } - - a.uniqueSeries[key] = struct{}{} } - } - count := int64(1) - - if len(labels) == 0 { - // special case: All values aggregated into a single group. - point[key] = &groupState{ - value: value, - count: count, - } - return nil - } - - labelValuesCopy := make([]string, len(labelValues)) - for i, v := range labelValues { - // copy the value as this is backed by the arrow array data buffer. - // We could retain the record to avoid this copy, but that would hold - // all other columns in memory for as long as the query is evaluated. - cloned, ok := a.clonedLabelValues[v] - if !ok { - cloned = strings.Clone(v) - a.clonedLabelValues[v] = cloned - } - labelValuesCopy[i] = cloned + a.uniqueSeries[key] = series } point[key] = &groupState{ - labels: labels, - labelValues: labelValuesCopy, - value: value, - count: count, + value: value, + count: int64(1), } } return nil @@ -186,7 +169,9 @@ func (a *aggregator) BuildRecord() (arrow.RecordBatch, error) { semconv.FieldFromIdent(semconv.ColumnIdentTimestamp, false), semconv.FieldFromIdent(semconv.ColumnIdentValue, false), ) - fields = append(fields, a.labels...) + for _, label := range a.labels { + fields = append(fields, label) + } schema := arrow.NewSchema(fields, nil) rb := array.NewRecordBuilder(memory.NewGoAllocator(), schema) @@ -203,7 +188,7 @@ func (a *aggregator) BuildRecord() (arrow.RecordBatch, error) { for _, ts := range sortedTimestamps { tsValue, _ := arrow.TimestampFromTime(ts, arrow.Nanosecond) - for _, entry := range a.points[ts] { + for key, entry := range a.points[ts] { var value float64 switch a.operation { case aggregationOperationAvg: @@ -217,21 +202,14 @@ func (a *aggregator) BuildRecord() (arrow.RecordBatch, error) { rb.Field(0).(*array.TimestampBuilder).Append(tsValue) rb.Field(1).(*array.Float64Builder).Append(value) - for i, label := range a.labels { - builder := rb.Field(2 + i) // offset by 2 as the first 2 fields are timestamp and value + series := a.uniqueSeries[key] + for i := 2; i < len(fields); i++ { // offset by 2 as the first 2 fields are timestamp and value + builder := rb.Field(i) - j := slices.IndexFunc(entry.labels, func(l arrow.Field) bool { - return l.Name == label.Name - }) - if j == -1 { - builder.(*array.StringBuilder).AppendNull() + if v, ok := series[fields[i].Name]; ok { + builder.(*array.StringBuilder).Append(v) } else { - // TODO: differentiate between null and actual empty string - if entry.labelValues[j] == "" { - builder.(*array.StringBuilder).AppendNull() - } else { - builder.(*array.StringBuilder).Append(entry.labelValues[j]) - } + builder.(*array.StringBuilder).AppendNull() } } } @@ -248,6 +226,7 @@ func (a *aggregator) Reset() { } clear(a.uniqueSeries) + clear(a.clonedLabelValues) } // getSortedTimestamps returns all timestamps in sorted order diff --git a/pkg/engine/internal/executor/range_aggregation.go b/pkg/engine/internal/executor/range_aggregation.go index 4703908150..4f33de07d6 100644 --- a/pkg/engine/internal/executor/range_aggregation.go +++ b/pkg/engine/internal/executor/range_aggregation.go @@ -299,12 +299,14 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.RecordBatch, r.inputsExhausted = true + rec, err := r.aggregator.BuildRecord() + if region := xcap.RegionFromContext(ctx); region != nil { computeTime := time.Since(startedAt) - inputReadTime region.Record(xcap.StatPipelineExecDuration.Observe(computeTime.Seconds())) } - return r.aggregator.BuildRecord() + return rec, err } // Close closes the resources of the pipeline. diff --git a/pkg/engine/internal/executor/vector_aggregate.go b/pkg/engine/internal/executor/vector_aggregate.go index f890e67d0f..557717c817 100644 --- a/pkg/engine/internal/executor/vector_aggregate.go +++ b/pkg/engine/internal/executor/vector_aggregate.go @@ -237,12 +237,14 @@ func (v *vectorAggregationPipeline) read(ctx context.Context) (arrow.RecordBatch v.inputsExhausted = true + rec, err := v.aggregator.BuildRecord() + if region := xcap.RegionFromContext(ctx); region != nil { computeTime := time.Since(startedAt) - inputReadTime region.Record(xcap.StatPipelineExecDuration.Observe(computeTime.Seconds())) } - return v.aggregator.BuildRecord() + return rec, err } // Close closes the resources of the pipeline.