mirror of
https://github.com/grafana/grafana.git
synced 2025-07-30 21:12:37 +08:00
Elasticsearch: Refactor parse query (#60440)
* Refactor parse query to functions * Move parsing to new file * Create empty result variable and use it when returning early * Fix linting * Revert "Create empty result variable and use it when returning early" This reverts commit 36a503f66e52f8213c673972774329a963a78100.
This commit is contained in:
110
pkg/tsdb/elasticsearch/parse_query.go
Normal file
110
pkg/tsdb/elasticsearch/parse_query.go
Normal file
@ -0,0 +1,110 @@
|
||||
package elasticsearch
|
||||
|
||||
import (
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
)
|
||||
|
||||
func parseQuery(tsdbQuery []backend.DataQuery) ([]*Query, error) {
|
||||
queries := make([]*Query, 0)
|
||||
for _, q := range tsdbQuery {
|
||||
model, err := simplejson.NewJson(q.JSON)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
timeField, err := model.Get("timeField").String()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rawQuery := model.Get("query").MustString()
|
||||
bucketAggs, err := parseBucketAggs(model)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
metrics, err := parseMetrics(model)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
alias := model.Get("alias").MustString("")
|
||||
interval := model.Get("interval").MustString("")
|
||||
|
||||
queries = append(queries, &Query{
|
||||
TimeField: timeField,
|
||||
RawQuery: rawQuery,
|
||||
BucketAggs: bucketAggs,
|
||||
Metrics: metrics,
|
||||
Alias: alias,
|
||||
Interval: interval,
|
||||
RefID: q.RefID,
|
||||
MaxDataPoints: q.MaxDataPoints,
|
||||
})
|
||||
}
|
||||
|
||||
return queries, nil
|
||||
}
|
||||
|
||||
func parseBucketAggs(model *simplejson.Json) ([]*BucketAgg, error) {
|
||||
var err error
|
||||
var result []*BucketAgg
|
||||
for _, t := range model.Get("bucketAggs").MustArray() {
|
||||
aggJSON := simplejson.NewFromAny(t)
|
||||
agg := &BucketAgg{}
|
||||
|
||||
agg.Type, err = aggJSON.Get("type").String()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
agg.ID, err = aggJSON.Get("id").String()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
agg.Field = aggJSON.Get("field").MustString()
|
||||
agg.Settings = simplejson.NewFromAny(aggJSON.Get("settings").MustMap())
|
||||
|
||||
result = append(result, agg)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func parseMetrics(model *simplejson.Json) ([]*MetricAgg, error) {
|
||||
var err error
|
||||
var result []*MetricAgg
|
||||
for _, t := range model.Get("metrics").MustArray() {
|
||||
metricJSON := simplejson.NewFromAny(t)
|
||||
metric := &MetricAgg{}
|
||||
|
||||
metric.Field = metricJSON.Get("field").MustString()
|
||||
metric.Hide = metricJSON.Get("hide").MustBool(false)
|
||||
metric.ID = metricJSON.Get("id").MustString()
|
||||
metric.PipelineAggregate = metricJSON.Get("pipelineAgg").MustString()
|
||||
// In legacy editors, we were storing empty settings values as "null"
|
||||
// The new editor doesn't store empty strings at all
|
||||
// We need to ensures backward compatibility with old queries and remove empty fields
|
||||
settings := metricJSON.Get("settings").MustMap()
|
||||
for k, v := range settings {
|
||||
if v == "null" {
|
||||
delete(settings, k)
|
||||
}
|
||||
}
|
||||
metric.Settings = simplejson.NewFromAny(settings)
|
||||
metric.Meta = simplejson.NewFromAny(metricJSON.Get("meta").MustMap())
|
||||
metric.Type, err = metricJSON.Get("type").String()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if isPipelineAggWithMultipleBucketPaths(metric.Type) {
|
||||
metric.PipelineVariables = map[string]string{}
|
||||
pvArr := metricJSON.Get("pipelineVariables").MustArray()
|
||||
for _, v := range pvArr {
|
||||
kv := v.(map[string]interface{})
|
||||
metric.PipelineVariables[kv["name"].(string)] = kv["pipelineAgg"].(string)
|
||||
}
|
||||
}
|
||||
|
||||
result = append(result, metric)
|
||||
}
|
||||
return result, nil
|
||||
}
|
107
pkg/tsdb/elasticsearch/parse_query_test.go
Normal file
107
pkg/tsdb/elasticsearch/parse_query_test.go
Normal file
@ -0,0 +1,107 @@
|
||||
package elasticsearch
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestParseQuery(t *testing.T) {
|
||||
t.Run("Test parse query", func(t *testing.T) {
|
||||
t.Run("Should be able to parse query", func(t *testing.T) {
|
||||
body := `{
|
||||
"timeField": "@timestamp",
|
||||
"query": "@metric:cpu",
|
||||
"alias": "{{@hostname}} {{metric}}",
|
||||
"interval": "10m",
|
||||
"metrics": [
|
||||
{
|
||||
"field": "@value",
|
||||
"id": "1",
|
||||
"meta": {},
|
||||
"settings": {
|
||||
"percents": [
|
||||
"90"
|
||||
]
|
||||
},
|
||||
"type": "percentiles"
|
||||
},
|
||||
{
|
||||
"type": "count",
|
||||
"field": "select field",
|
||||
"id": "4",
|
||||
"settings": {},
|
||||
"meta": {}
|
||||
}
|
||||
],
|
||||
"bucketAggs": [
|
||||
{
|
||||
"fake": true,
|
||||
"field": "@hostname",
|
||||
"id": "3",
|
||||
"settings": {
|
||||
"min_doc_count": 1,
|
||||
"order": "desc",
|
||||
"orderBy": "_term",
|
||||
"size": "10"
|
||||
},
|
||||
"type": "terms"
|
||||
},
|
||||
{
|
||||
"field": "@timestamp",
|
||||
"id": "2",
|
||||
"settings": {
|
||||
"interval": "5m",
|
||||
"min_doc_count": 0,
|
||||
"trimEdges": 0
|
||||
},
|
||||
"type": "date_histogram"
|
||||
}
|
||||
]
|
||||
}`
|
||||
dataQuery, err := newDataQuery(body)
|
||||
require.NoError(t, err)
|
||||
queries, err := parseQuery(dataQuery.Queries)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, 1)
|
||||
|
||||
q := queries[0]
|
||||
|
||||
require.Equal(t, q.TimeField, "@timestamp")
|
||||
require.Equal(t, q.RawQuery, "@metric:cpu")
|
||||
require.Equal(t, q.Alias, "{{@hostname}} {{metric}}")
|
||||
require.Equal(t, q.Interval, "10m")
|
||||
|
||||
require.Len(t, q.Metrics, 2)
|
||||
require.Equal(t, q.Metrics[0].Field, "@value")
|
||||
require.Equal(t, q.Metrics[0].ID, "1")
|
||||
require.Equal(t, q.Metrics[0].Type, "percentiles")
|
||||
require.False(t, q.Metrics[0].Hide)
|
||||
require.Equal(t, q.Metrics[0].PipelineAggregate, "")
|
||||
require.Equal(t, q.Metrics[0].Settings.Get("percents").MustStringArray()[0], "90")
|
||||
|
||||
require.Equal(t, q.Metrics[1].Field, "select field")
|
||||
require.Equal(t, q.Metrics[1].ID, "4")
|
||||
require.Equal(t, q.Metrics[1].Type, "count")
|
||||
require.False(t, q.Metrics[1].Hide)
|
||||
require.Equal(t, q.Metrics[1].PipelineAggregate, "")
|
||||
require.Empty(t, q.Metrics[1].Settings.MustMap())
|
||||
|
||||
require.Len(t, q.BucketAggs, 2)
|
||||
require.Equal(t, q.BucketAggs[0].Field, "@hostname")
|
||||
require.Equal(t, q.BucketAggs[0].ID, "3")
|
||||
require.Equal(t, q.BucketAggs[0].Type, "terms")
|
||||
require.Equal(t, q.BucketAggs[0].Settings.Get("min_doc_count").MustInt(), 1)
|
||||
require.Equal(t, q.BucketAggs[0].Settings.Get("order").MustString(), "desc")
|
||||
require.Equal(t, q.BucketAggs[0].Settings.Get("orderBy").MustString(), "_term")
|
||||
require.Equal(t, q.BucketAggs[0].Settings.Get("size").MustString(), "10")
|
||||
|
||||
require.Equal(t, q.BucketAggs[1].Field, "@timestamp")
|
||||
require.Equal(t, q.BucketAggs[1].ID, "2")
|
||||
require.Equal(t, q.BucketAggs[1].Type, "date_histogram")
|
||||
require.Equal(t, q.BucketAggs[1].Settings.Get("interval").MustString(), "5m")
|
||||
require.Equal(t, q.BucketAggs[1].Settings.Get("min_doc_count").MustInt(), 0)
|
||||
require.Equal(t, q.BucketAggs[1].Settings.Get("trimEdges").MustInt(), 0)
|
||||
})
|
||||
})
|
||||
}
|
@ -1176,8 +1176,7 @@ func parseTestResponse(tsdbQueries map[string]string, responseBody string) (*bac
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tsQueryParser := newTimeSeriesQueryParser()
|
||||
queries, err := tsQueryParser.parse(tsdbQuery.Queries)
|
||||
queries, err := parseQuery(tsdbQuery.Queries)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -28,8 +28,7 @@ var newTimeSeriesQuery = func(client es.Client, dataQuery []backend.DataQuery,
|
||||
}
|
||||
|
||||
func (e *timeSeriesQuery) execute() (*backend.QueryDataResponse, error) {
|
||||
tsQueryParser := newTimeSeriesQueryParser()
|
||||
queries, err := tsQueryParser.parse(e.dataQueries)
|
||||
queries, err := parseQuery(e.dataQueries)
|
||||
if err != nil {
|
||||
return &backend.QueryDataResponse{}, err
|
||||
}
|
||||
@ -398,113 +397,3 @@ func addGeoHashGridAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBui
|
||||
|
||||
return aggBuilder
|
||||
}
|
||||
|
||||
type timeSeriesQueryParser struct{}
|
||||
|
||||
func newTimeSeriesQueryParser() *timeSeriesQueryParser {
|
||||
return &timeSeriesQueryParser{}
|
||||
}
|
||||
|
||||
func (p *timeSeriesQueryParser) parse(tsdbQuery []backend.DataQuery) ([]*Query, error) {
|
||||
queries := make([]*Query, 0)
|
||||
for _, q := range tsdbQuery {
|
||||
model, err := simplejson.NewJson(q.JSON)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
timeField, err := model.Get("timeField").String()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rawQuery := model.Get("query").MustString()
|
||||
bucketAggs, err := p.parseBucketAggs(model)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
metrics, err := p.parseMetrics(model)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
alias := model.Get("alias").MustString("")
|
||||
interval := model.Get("interval").MustString("")
|
||||
|
||||
queries = append(queries, &Query{
|
||||
TimeField: timeField,
|
||||
RawQuery: rawQuery,
|
||||
BucketAggs: bucketAggs,
|
||||
Metrics: metrics,
|
||||
Alias: alias,
|
||||
Interval: interval,
|
||||
RefID: q.RefID,
|
||||
MaxDataPoints: q.MaxDataPoints,
|
||||
})
|
||||
}
|
||||
|
||||
return queries, nil
|
||||
}
|
||||
|
||||
func (p *timeSeriesQueryParser) parseBucketAggs(model *simplejson.Json) ([]*BucketAgg, error) {
|
||||
var err error
|
||||
var result []*BucketAgg
|
||||
for _, t := range model.Get("bucketAggs").MustArray() {
|
||||
aggJSON := simplejson.NewFromAny(t)
|
||||
agg := &BucketAgg{}
|
||||
|
||||
agg.Type, err = aggJSON.Get("type").String()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
agg.ID, err = aggJSON.Get("id").String()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
agg.Field = aggJSON.Get("field").MustString()
|
||||
agg.Settings = simplejson.NewFromAny(aggJSON.Get("settings").MustMap())
|
||||
|
||||
result = append(result, agg)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (p *timeSeriesQueryParser) parseMetrics(model *simplejson.Json) ([]*MetricAgg, error) {
|
||||
var err error
|
||||
var result []*MetricAgg
|
||||
for _, t := range model.Get("metrics").MustArray() {
|
||||
metricJSON := simplejson.NewFromAny(t)
|
||||
metric := &MetricAgg{}
|
||||
|
||||
metric.Field = metricJSON.Get("field").MustString()
|
||||
metric.Hide = metricJSON.Get("hide").MustBool(false)
|
||||
metric.ID = metricJSON.Get("id").MustString()
|
||||
metric.PipelineAggregate = metricJSON.Get("pipelineAgg").MustString()
|
||||
// In legacy editors, we were storing empty settings values as "null"
|
||||
// The new editor doesn't store empty strings at all
|
||||
// We need to ensures backward compatibility with old queries and remove empty fields
|
||||
settings := metricJSON.Get("settings").MustMap()
|
||||
for k, v := range settings {
|
||||
if v == "null" {
|
||||
delete(settings, k)
|
||||
}
|
||||
}
|
||||
metric.Settings = simplejson.NewFromAny(settings)
|
||||
metric.Meta = simplejson.NewFromAny(metricJSON.Get("meta").MustMap())
|
||||
metric.Type, err = metricJSON.Get("type").String()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if isPipelineAggWithMultipleBucketPaths(metric.Type) {
|
||||
metric.PipelineVariables = map[string]string{}
|
||||
pvArr := metricJSON.Get("pipelineVariables").MustArray()
|
||||
for _, v := range pvArr {
|
||||
kv := v.(map[string]interface{})
|
||||
metric.PipelineVariables[kv["name"].(string)] = kv["pipelineAgg"].(string)
|
||||
}
|
||||
}
|
||||
|
||||
result = append(result, metric)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
@ -1814,105 +1814,3 @@ func executeTsdbQuery(c es.Client, body string, from, to time.Time, minInterval
|
||||
query := newTimeSeriesQuery(c, dataRequest.Queries, intervalv2.NewCalculator(intervalv2.CalculatorOptions{MinInterval: minInterval}))
|
||||
return query.execute()
|
||||
}
|
||||
|
||||
func TestTimeSeriesQueryParser(t *testing.T) {
|
||||
t.Run("Test time series query parser", func(t *testing.T) {
|
||||
p := newTimeSeriesQueryParser()
|
||||
|
||||
t.Run("Should be able to parse query", func(t *testing.T) {
|
||||
body := `{
|
||||
"timeField": "@timestamp",
|
||||
"query": "@metric:cpu",
|
||||
"alias": "{{@hostname}} {{metric}}",
|
||||
"interval": "10m",
|
||||
"metrics": [
|
||||
{
|
||||
"field": "@value",
|
||||
"id": "1",
|
||||
"meta": {},
|
||||
"settings": {
|
||||
"percents": [
|
||||
"90"
|
||||
]
|
||||
},
|
||||
"type": "percentiles"
|
||||
},
|
||||
{
|
||||
"type": "count",
|
||||
"field": "select field",
|
||||
"id": "4",
|
||||
"settings": {},
|
||||
"meta": {}
|
||||
}
|
||||
],
|
||||
"bucketAggs": [
|
||||
{
|
||||
"fake": true,
|
||||
"field": "@hostname",
|
||||
"id": "3",
|
||||
"settings": {
|
||||
"min_doc_count": 1,
|
||||
"order": "desc",
|
||||
"orderBy": "_term",
|
||||
"size": "10"
|
||||
},
|
||||
"type": "terms"
|
||||
},
|
||||
{
|
||||
"field": "@timestamp",
|
||||
"id": "2",
|
||||
"settings": {
|
||||
"interval": "5m",
|
||||
"min_doc_count": 0,
|
||||
"trimEdges": 0
|
||||
},
|
||||
"type": "date_histogram"
|
||||
}
|
||||
]
|
||||
}`
|
||||
dataQuery, err := newDataQuery(body)
|
||||
require.NoError(t, err)
|
||||
queries, err := p.parse(dataQuery.Queries)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queries, 1)
|
||||
|
||||
q := queries[0]
|
||||
|
||||
require.Equal(t, q.TimeField, "@timestamp")
|
||||
require.Equal(t, q.RawQuery, "@metric:cpu")
|
||||
require.Equal(t, q.Alias, "{{@hostname}} {{metric}}")
|
||||
require.Equal(t, q.Interval, "10m")
|
||||
|
||||
require.Len(t, q.Metrics, 2)
|
||||
require.Equal(t, q.Metrics[0].Field, "@value")
|
||||
require.Equal(t, q.Metrics[0].ID, "1")
|
||||
require.Equal(t, q.Metrics[0].Type, "percentiles")
|
||||
require.False(t, q.Metrics[0].Hide)
|
||||
require.Equal(t, q.Metrics[0].PipelineAggregate, "")
|
||||
require.Equal(t, q.Metrics[0].Settings.Get("percents").MustStringArray()[0], "90")
|
||||
|
||||
require.Equal(t, q.Metrics[1].Field, "select field")
|
||||
require.Equal(t, q.Metrics[1].ID, "4")
|
||||
require.Equal(t, q.Metrics[1].Type, "count")
|
||||
require.False(t, q.Metrics[1].Hide)
|
||||
require.Equal(t, q.Metrics[1].PipelineAggregate, "")
|
||||
require.Empty(t, q.Metrics[1].Settings.MustMap())
|
||||
|
||||
require.Len(t, q.BucketAggs, 2)
|
||||
require.Equal(t, q.BucketAggs[0].Field, "@hostname")
|
||||
require.Equal(t, q.BucketAggs[0].ID, "3")
|
||||
require.Equal(t, q.BucketAggs[0].Type, "terms")
|
||||
require.Equal(t, q.BucketAggs[0].Settings.Get("min_doc_count").MustInt(), 1)
|
||||
require.Equal(t, q.BucketAggs[0].Settings.Get("order").MustString(), "desc")
|
||||
require.Equal(t, q.BucketAggs[0].Settings.Get("orderBy").MustString(), "_term")
|
||||
require.Equal(t, q.BucketAggs[0].Settings.Get("size").MustString(), "10")
|
||||
|
||||
require.Equal(t, q.BucketAggs[1].Field, "@timestamp")
|
||||
require.Equal(t, q.BucketAggs[1].ID, "2")
|
||||
require.Equal(t, q.BucketAggs[1].Type, "date_histogram")
|
||||
require.Equal(t, q.BucketAggs[1].Settings.Get("interval").MustString(), "5m")
|
||||
require.Equal(t, q.BucketAggs[1].Settings.Get("min_doc_count").MustInt(), 0)
|
||||
require.Equal(t, q.BucketAggs[1].Settings.Get("trimEdges").MustInt(), 0)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
Reference in New Issue
Block a user