Elasticsearch: Fix pipeline aggregation queries in backend to pass frontend tests (#60337)

* Elasticsearch: Fix pipeline aggregation queries in backend

* Update

* Update lint

* Update pkg/tsdb/elasticsearch/time_series_query.go

* Fix lint

* Fix merge
This commit is contained in:
Ivana Huckova
2022-12-16 17:45:43 +01:00
committed by GitHub
parent cc007e9727
commit 09bb4423d2
2 changed files with 55 additions and 58 deletions

View File

@ -172,16 +172,17 @@ func (e *timeSeriesQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilde
continue
}
} else {
if _, err := strconv.Atoi(m.PipelineAggregate); err == nil {
pipelineAggField := getPipelineAggField(m)
if _, err := strconv.Atoi(pipelineAggField); err == nil {
var appliedAgg *MetricAgg
for _, pipelineMetric := range q.Metrics {
if pipelineMetric.ID == m.PipelineAggregate {
if pipelineMetric.ID == pipelineAggField {
appliedAgg = pipelineMetric
break
}
}
if appliedAgg != nil {
bucketPath := m.PipelineAggregate
bucketPath := pipelineAggField
if appliedAgg.Type == countType {
bucketPath = "_count"
}
@ -397,3 +398,16 @@ func addGeoHashGridAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBui
return aggBuilder
}
func getPipelineAggField(m *MetricAgg) string {
// In frontend we are using Field as pipelineAggField
// There might be historical reason why in backend we were using PipelineAggregate as pipelineAggField
// So for now let's check Field first and then PipelineAggregate to ensure that we are not breaking anything
// TODO: Investigate, if we can remove check for PipelineAggregate
pipelineAggField := m.Field
if pipelineAggField == "" {
pipelineAggField = m.PipelineAggregate
}
return pipelineAggField
}

View File

@ -674,8 +674,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
sr := c.multisearchRequests[0].Requests[0]
firstLevel := sr.Aggs[0]
require.Equal(t, firstLevel.Key, "4")
// FIXME: Currently this is 1 as movingAvg is completely missing. We have only sum.
// require.Equal(t, len(firstLevel.Aggregation.Aggs), 2)
require.Equal(t, len(firstLevel.Aggregation.Aggs), 2)
sumAgg := firstLevel.Aggregation.Aggs[0]
require.Equal(t, sumAgg.Key, "3")
@ -683,17 +682,14 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
mAgg := sumAgg.Aggregation.Aggregation.(*es.MetricAggregation)
require.Equal(t, mAgg.Field, "@value")
// FIXME: This is currently fully missing
// in the test bellow with pipelineAgg it is working as expected
// movingAvgAgg := firstLevel.Aggregation.Aggs[1]
// require.Equal(t, movingAvgAgg.Key, "2")
// require.Equal(t, movingAvgAgg.Aggregation.Type, "moving_avg")
// pl := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
// require.Equal(t, pl.BucketPath, "3")
movingAvgAgg := firstLevel.Aggregation.Aggs[1]
require.Equal(t, movingAvgAgg.Key, "2")
require.Equal(t, movingAvgAgg.Aggregation.Type, "moving_avg")
pl := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
require.Equal(t, pl.BucketPath, "3")
})
t.Run("With moving average", func(t *testing.T) {
// This test is with pipelineAgg and is passing. Same test without pipelineAgg is failing.
c := newFakeClient()
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
@ -753,19 +749,16 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
firstLevel := sr.Aggs[0]
require.Equal(t, firstLevel.Key, "4")
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
// FIXME: Currently, movingAvg is completely missing
// in the test bellow with pipelineAgg it is working as expected
// require.Len(t, firstLevel.Aggregation.Aggs, 1)
require.Len(t, firstLevel.Aggregation.Aggs, 1)
// movingAvgAgg := firstLevel.Aggregation.Aggs[0]
// require.Equal(t, movingAvgAgg.Key, "2")
// require.Equal(t, movingAvgAgg.Aggregation.Type, "moving_avg")
// pl := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
// require.Equal(t, pl.BucketPath, "_count")
movingAvgAgg := firstLevel.Aggregation.Aggs[0]
require.Equal(t, movingAvgAgg.Key, "2")
require.Equal(t, movingAvgAgg.Aggregation.Type, "moving_avg")
pl := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
require.Equal(t, pl.BucketPath, "_count")
})
t.Run("With moving average doc count", func(t *testing.T) {
// This test is with pipelineAgg and is passing. Same test without pipelineAgg is failing.
c := newFakeClient()
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
@ -822,21 +815,18 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
firstLevel := sr.Aggs[0]
require.Equal(t, firstLevel.Key, "3")
// FIXME: Currently, movingAvg is completely missing
// in the test bellow with pipelineAgg it is working as expected
// require.Len(t, firstLevel.Aggregation.Aggs, 2)
require.Len(t, firstLevel.Aggregation.Aggs, 2)
sumAgg := firstLevel.Aggregation.Aggs[0]
require.Equal(t, sumAgg.Key, "3")
// movingAvgAgg := firstLevel.Aggregation.Aggs[1]
// require.Equal(t, movingAvgAgg.Key, "2")
// plAgg := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
// require.Equal(t, plAgg.BucketPath, "3")
movingAvgAgg := firstLevel.Aggregation.Aggs[1]
require.Equal(t, movingAvgAgg.Key, "2")
plAgg := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
require.Equal(t, plAgg.BucketPath, "3")
})
t.Run("With broken moving average", func(t *testing.T) {
// This test is with pipelineAgg and is passing. Same test without pipelineAgg is failing.
c := newFakeClient()
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
@ -1084,12 +1074,10 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
require.Equal(t, firstLevel.Key, "4")
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
// FIXME: This is currently fully missing
// in the test above with pipelineAgg it is working as expected
// derivativeAgg := firstLevel.Aggregation.Aggs[0]
// require.Equal(t, derivativeAgg.Key, "2")
// plAgg := derivativeAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
// require.Equal(t, plAgg.BucketPath, "_count")
derivativeAgg := firstLevel.Aggregation.Aggs[0]
require.Equal(t, derivativeAgg.Key, "2")
plAgg := derivativeAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
require.Equal(t, plAgg.BucketPath, "_count")
})
t.Run("With serial_diff", func(t *testing.T) {
@ -1147,13 +1135,11 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
firstLevel := sr.Aggs[0]
require.Equal(t, firstLevel.Key, "3")
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
// FIXME: This is currently fully missing
// in the test above with pipelineAgg it is working as expected
// serialDiffAgg := firstLevel.Aggregation.Aggs[1]
// require.Equal(t, serialDiffAgg.Key, "2")
// plAgg := serialDiffAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
// require.Equal(t, plAgg.BucketPath, "3")
// require.Equal(t, plAgg.Settings["lag"], "5")
serialDiffAgg := firstLevel.Aggregation.Aggs[1]
require.Equal(t, serialDiffAgg.Key, "2")
plAgg := serialDiffAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
require.Equal(t, plAgg.BucketPath, "3")
require.Equal(t, plAgg.Settings["lag"], 5.0)
})
t.Run("With serial_diff doc count", func(t *testing.T) {
@ -1447,20 +1433,18 @@ func TestSettingsCasting(t *testing.T) {
"bucketAggs": [{"type": "date_histogram", "field": "@timestamp", "id": "1"}]
}`, from, to, 15*time.Second)
require.NoError(t, err)
// FIXME
// This is working correctly if instead of field we use pipelineAgg
// sr := c.multisearchRequests[0].Requests[0]
// movingAvgSettings := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggregation.(*es.PipelineAggregation).Settings
sr := c.multisearchRequests[0].Requests[0]
movingAvgSettings := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggregation.(*es.PipelineAggregation).Settings
// assert.Equal(t, movingAvgSettings["window"], 5)
// assert.Equal(t, movingAvgSettings["predict"], 10)
assert.Equal(t, movingAvgSettings["window"], 5.0)
assert.Equal(t, movingAvgSettings["predict"], 10.0)
// modelSettings := movingAvgSettings["settings"].(map[string]interface{})
modelSettings := movingAvgSettings["settings"].(map[string]interface{})
// assert.Equal(t, modelSettings["alpha"], 1)
// assert.Equal(t, modelSettings["beta"], 2)
// assert.Equal(t, modelSettings["gamma"], 3)
// assert.Equal(t, modelSettings["period"], 4)
assert.Equal(t, modelSettings["alpha"], 1.0)
assert.Equal(t, modelSettings["beta"], 2.0)
assert.Equal(t, modelSettings["gamma"], 3.0)
assert.Equal(t, modelSettings["period"], 4.0)
})
t.Run("Correctly transforms moving_average settings", func(t *testing.T) {
@ -1528,10 +1512,9 @@ func TestSettingsCasting(t *testing.T) {
]
}`, from, to, 15*time.Second)
assert.Nil(t, err)
// FIXME This fails, but if we add pipelineAgg it works
// sr := c.multisearchRequests[0].Requests[0]
// serialDiffSettings := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggregation.(*es.PipelineAggregation).Settings
// assert.Equal(t, serialDiffSettings["lag"], 1.)
sr := c.multisearchRequests[0].Requests[0]
serialDiffSettings := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggregation.(*es.PipelineAggregation).Settings
assert.Equal(t, serialDiffSettings["lag"], 1.)
})
t.Run("Correctly transforms serial_diff settings", func(t *testing.T) {