chore(thor): Performance optimization for aggregator (#20934)

This commit is contained in:
Stas Spiridonov
2026-02-23 23:07:43 -05:00
committed by GitHub
parent 1bf3b788b0
commit 32248ef0ba
3 changed files with 46 additions and 63 deletions

View File

@@ -18,10 +18,8 @@ import (
var ErrSeriesLimitExceeded = errors.New("maximum number of series limit exceeded")
type groupState struct {
value float64 // aggregated value
count int64 // values counter
labels []arrow.Field // grouping labels
labelValues []string // grouping label values
value float64 // aggregated value
count int64 // values counter
}
type aggregationOperation int
@@ -42,12 +40,12 @@ type aggregator struct {
points map[time.Time]map[uint64]*groupState // holds the groupState for each point in time series
digest *xxhash.Digest // used to compute key for each group
operation aggregationOperation // aggregation type
labels []arrow.Field // combined list of all label fields for all sample values
labels map[string]arrow.Field // combined list of all label fields for all sample values
clonedLabelValues map[string]string // cache of cloned strings to reduce allocations for repeated values
// Track unique series across all timestamps to enforce maxSeries limit
maxSeries int // maximum number of unique series allowed (0 means no limit)
uniqueSeries map[uint64]struct{} // tracks unique series across all timestamps
maxSeries int // maximum number of unique series allowed (0 means no limit)
uniqueSeries map[uint64]map[string]string // tracks unique series across all timestamps
}
// newAggregator creates a new aggregator with the specified grouping.
@@ -56,7 +54,8 @@ func newAggregator(pointsSizeHint int, operation aggregationOperation) *aggregat
digest: xxhash.New(),
operation: operation,
clonedLabelValues: make(map[string]string),
uniqueSeries: make(map[uint64]struct{}),
labels: make(map[string]arrow.Field),
uniqueSeries: make(map[uint64]map[string]string),
}
if pointsSizeHint > 0 {
@@ -72,10 +71,8 @@ func newAggregator(pointsSizeHint int, operation aggregationOperation) *aggregat
// over the lifetime of an aggregator to accommodate processing of multiple records with different schemas.
func (a *aggregator) AddLabels(labels []arrow.Field) {
for _, label := range labels {
if !slices.ContainsFunc(a.labels, func(l arrow.Field) bool {
return label.Equal(l)
}) {
a.labels = append(a.labels, label)
if _, ok := a.labels[label.Name]; !ok {
a.labels[label.Name] = label
}
}
}
@@ -134,47 +131,33 @@ func (a *aggregator) Add(ts time.Time, value float64, labels []arrow.Field, labe
state.count++
} else {
if a.maxSeries > 0 {
if series, exists := a.uniqueSeries[key]; !exists {
// Check series limit before adding a new series
if _, exists := a.uniqueSeries[key]; !exists {
if len(a.uniqueSeries) >= a.maxSeries {
return ErrSeriesLimitExceeded
if a.maxSeries > 0 && len(a.uniqueSeries) >= a.maxSeries {
return ErrSeriesLimitExceeded
}
if len(labels) > 0 {
series = make(map[string]string)
for i, v := range labelValues {
// copy the value as this is backed by the arrow array data buffer.
// We could retain the record to avoid this copy, but that would hold
// all other columns in memory for as long as the query is evaluated.
cloned, ok := a.clonedLabelValues[v]
if !ok {
cloned = strings.Clone(v)
a.clonedLabelValues[v] = cloned
}
series[labels[i].Name] = cloned
}
a.uniqueSeries[key] = struct{}{}
}
}
count := int64(1)
if len(labels) == 0 {
// special case: All values aggregated into a single group.
point[key] = &groupState{
value: value,
count: count,
}
return nil
}
labelValuesCopy := make([]string, len(labelValues))
for i, v := range labelValues {
// copy the value as this is backed by the arrow array data buffer.
// We could retain the record to avoid this copy, but that would hold
// all other columns in memory for as long as the query is evaluated.
cloned, ok := a.clonedLabelValues[v]
if !ok {
cloned = strings.Clone(v)
a.clonedLabelValues[v] = cloned
}
labelValuesCopy[i] = cloned
a.uniqueSeries[key] = series
}
point[key] = &groupState{
labels: labels,
labelValues: labelValuesCopy,
value: value,
count: count,
value: value,
count: int64(1),
}
}
return nil
@@ -186,7 +169,9 @@ func (a *aggregator) BuildRecord() (arrow.RecordBatch, error) {
semconv.FieldFromIdent(semconv.ColumnIdentTimestamp, false),
semconv.FieldFromIdent(semconv.ColumnIdentValue, false),
)
fields = append(fields, a.labels...)
for _, label := range a.labels {
fields = append(fields, label)
}
schema := arrow.NewSchema(fields, nil)
rb := array.NewRecordBuilder(memory.NewGoAllocator(), schema)
@@ -203,7 +188,7 @@ func (a *aggregator) BuildRecord() (arrow.RecordBatch, error) {
for _, ts := range sortedTimestamps {
tsValue, _ := arrow.TimestampFromTime(ts, arrow.Nanosecond)
for _, entry := range a.points[ts] {
for key, entry := range a.points[ts] {
var value float64
switch a.operation {
case aggregationOperationAvg:
@@ -217,21 +202,14 @@ func (a *aggregator) BuildRecord() (arrow.RecordBatch, error) {
rb.Field(0).(*array.TimestampBuilder).Append(tsValue)
rb.Field(1).(*array.Float64Builder).Append(value)
for i, label := range a.labels {
builder := rb.Field(2 + i) // offset by 2 as the first 2 fields are timestamp and value
series := a.uniqueSeries[key]
for i := 2; i < len(fields); i++ { // offset by 2 as the first 2 fields are timestamp and value
builder := rb.Field(i)
j := slices.IndexFunc(entry.labels, func(l arrow.Field) bool {
return l.Name == label.Name
})
if j == -1 {
builder.(*array.StringBuilder).AppendNull()
if v, ok := series[fields[i].Name]; ok {
builder.(*array.StringBuilder).Append(v)
} else {
// TODO: differentiate between null and actual empty string
if entry.labelValues[j] == "" {
builder.(*array.StringBuilder).AppendNull()
} else {
builder.(*array.StringBuilder).Append(entry.labelValues[j])
}
builder.(*array.StringBuilder).AppendNull()
}
}
}
@@ -248,6 +226,7 @@ func (a *aggregator) Reset() {
}
clear(a.uniqueSeries)
clear(a.clonedLabelValues)
}
// getSortedTimestamps returns all timestamps in sorted order

View File

@@ -299,12 +299,14 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.RecordBatch,
r.inputsExhausted = true
rec, err := r.aggregator.BuildRecord()
if region := xcap.RegionFromContext(ctx); region != nil {
computeTime := time.Since(startedAt) - inputReadTime
region.Record(xcap.StatPipelineExecDuration.Observe(computeTime.Seconds()))
}
return r.aggregator.BuildRecord()
return rec, err
}
// Close closes the resources of the pipeline.

View File

@@ -237,12 +237,14 @@ func (v *vectorAggregationPipeline) read(ctx context.Context) (arrow.RecordBatch
v.inputsExhausted = true
rec, err := v.aggregator.BuildRecord()
if region := xcap.RegionFromContext(ctx); region != nil {
computeTime := time.Since(startedAt) - inputReadTime
region.Record(xcap.StatPipelineExecDuration.Observe(computeTime.Seconds()))
}
return v.aggregator.BuildRecord()
return rec, err
}
// Close closes the resources of the pipeline.