mirror of
https://github.com/grafana/grafana.git
synced 2025-07-29 20:52:21 +08:00

Handle all replacements of interval template variables in the client. Fix issue with client and different versions. Add better tests of the client.
319 lines
8.7 KiB
Go
319 lines
8.7 KiB
Go
package elasticsearch
|
|
|
|
import (
|
|
"fmt"
|
|
"strconv"
|
|
|
|
"github.com/grafana/grafana/pkg/components/simplejson"
|
|
"github.com/grafana/grafana/pkg/tsdb"
|
|
"github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
|
|
)
|
|
|
|
type timeSeriesQuery struct {
|
|
client es.Client
|
|
tsdbQuery *tsdb.TsdbQuery
|
|
intervalCalculator tsdb.IntervalCalculator
|
|
}
|
|
|
|
var newTimeSeriesQuery = func(client es.Client, tsdbQuery *tsdb.TsdbQuery, intervalCalculator tsdb.IntervalCalculator) *timeSeriesQuery {
|
|
return &timeSeriesQuery{
|
|
client: client,
|
|
tsdbQuery: tsdbQuery,
|
|
intervalCalculator: intervalCalculator,
|
|
}
|
|
}
|
|
|
|
func (e *timeSeriesQuery) execute() (*tsdb.Response, error) {
|
|
result := &tsdb.Response{}
|
|
result.Results = make(map[string]*tsdb.QueryResult)
|
|
|
|
tsQueryParser := newTimeSeriesQueryParser()
|
|
queries, err := tsQueryParser.parse(e.tsdbQuery)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ms := e.client.MultiSearch()
|
|
|
|
from := fmt.Sprintf("%d", e.tsdbQuery.TimeRange.GetFromAsMsEpoch())
|
|
to := fmt.Sprintf("%d", e.tsdbQuery.TimeRange.GetToAsMsEpoch())
|
|
|
|
for _, q := range queries {
|
|
minInterval, err := e.client.GetMinInterval(q.Interval)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
interval := e.intervalCalculator.Calculate(e.tsdbQuery.TimeRange, minInterval)
|
|
|
|
b := ms.Search(interval)
|
|
b.Size(0)
|
|
filters := b.Query().Bool().Filter()
|
|
filters.AddDateRangeFilter(e.client.GetTimeField(), to, from, es.DateFormatEpochMS)
|
|
|
|
if q.RawQuery != "" {
|
|
filters.AddQueryStringFilter(q.RawQuery, true)
|
|
}
|
|
|
|
if len(q.BucketAggs) == 0 {
|
|
if len(q.Metrics) == 0 || q.Metrics[0].Type != "raw_document" {
|
|
result.Results[q.RefID] = &tsdb.QueryResult{
|
|
RefId: q.RefID,
|
|
Error: fmt.Errorf("invalid query, missing metrics and aggregations"),
|
|
ErrorString: "invalid query, missing metrics and aggregations",
|
|
}
|
|
continue
|
|
}
|
|
metric := q.Metrics[0]
|
|
b.Size(metric.Settings.Get("size").MustInt(500))
|
|
b.SortDesc("@timestamp", "boolean")
|
|
b.AddDocValueField("@timestamp")
|
|
continue
|
|
}
|
|
|
|
aggBuilder := b.Agg()
|
|
|
|
// iterate backwards to create aggregations bottom-down
|
|
for _, bucketAgg := range q.BucketAggs {
|
|
switch bucketAgg.Type {
|
|
case "date_histogram":
|
|
aggBuilder = addDateHistogramAgg(aggBuilder, bucketAgg, from, to)
|
|
case "histogram":
|
|
aggBuilder = addHistogramAgg(aggBuilder, bucketAgg)
|
|
case "filters":
|
|
aggBuilder = addFiltersAgg(aggBuilder, bucketAgg)
|
|
case "terms":
|
|
aggBuilder = addTermsAgg(aggBuilder, bucketAgg, q.Metrics)
|
|
case "geohash_grid":
|
|
aggBuilder = addGeoHashGridAgg(aggBuilder, bucketAgg)
|
|
}
|
|
}
|
|
|
|
for _, m := range q.Metrics {
|
|
if m.Type == "count" {
|
|
continue
|
|
}
|
|
|
|
if isPipelineAgg(m.Type) {
|
|
if _, err := strconv.Atoi(m.PipelineAggregate); err == nil {
|
|
aggBuilder.Pipeline(m.ID, m.Type, m.PipelineAggregate, func(a *es.PipelineAggregation) {
|
|
a.Settings = m.Settings.MustMap()
|
|
})
|
|
} else {
|
|
continue
|
|
}
|
|
} else {
|
|
aggBuilder.Metric(m.ID, m.Type, m.Field, func(a *es.MetricAggregation) {
|
|
a.Settings = m.Settings.MustMap()
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
req, err := ms.Build()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res, err := e.client.ExecuteMultisearch(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rp := newResponseParser(res.Responses, queries)
|
|
return rp.getTimeSeries()
|
|
}
|
|
|
|
func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo string) es.AggBuilder {
|
|
aggBuilder.DateHistogram(bucketAgg.ID, bucketAgg.Field, func(a *es.DateHistogramAgg, b es.AggBuilder) {
|
|
a.Interval = bucketAgg.Settings.Get("interval").MustString("auto")
|
|
a.MinDocCount = bucketAgg.Settings.Get("min_doc_count").MustInt(0)
|
|
a.ExtendedBounds = &es.ExtendedBounds{Min: timeFrom, Max: timeTo}
|
|
a.Format = bucketAgg.Settings.Get("format").MustString(es.DateFormatEpochMS)
|
|
|
|
if a.Interval == "auto" {
|
|
a.Interval = "$__interval"
|
|
}
|
|
|
|
if missing, err := bucketAgg.Settings.Get("missing").String(); err == nil {
|
|
a.Missing = &missing
|
|
}
|
|
|
|
aggBuilder = b
|
|
})
|
|
|
|
return aggBuilder
|
|
}
|
|
|
|
func addHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
|
|
aggBuilder.Histogram(bucketAgg.ID, bucketAgg.Field, func(a *es.HistogramAgg, b es.AggBuilder) {
|
|
a.Interval = bucketAgg.Settings.Get("interval").MustInt(1000)
|
|
a.MinDocCount = bucketAgg.Settings.Get("min_doc_count").MustInt(0)
|
|
|
|
if missing, err := bucketAgg.Settings.Get("missing").Int(); err == nil {
|
|
a.Missing = &missing
|
|
}
|
|
|
|
aggBuilder = b
|
|
})
|
|
|
|
return aggBuilder
|
|
}
|
|
|
|
func addTermsAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, metrics []*MetricAgg) es.AggBuilder {
|
|
aggBuilder.Terms(bucketAgg.ID, bucketAgg.Field, func(a *es.TermsAggregation, b es.AggBuilder) {
|
|
if size, err := bucketAgg.Settings.Get("size").Int(); err == nil {
|
|
a.Size = size
|
|
} else if size, err := bucketAgg.Settings.Get("size").String(); err == nil {
|
|
a.Size, err = strconv.Atoi(size)
|
|
if err != nil {
|
|
a.Size = 500
|
|
}
|
|
} else {
|
|
a.Size = 500
|
|
}
|
|
if minDocCount, err := bucketAgg.Settings.Get("min_doc_count").Int(); err == nil {
|
|
a.MinDocCount = &minDocCount
|
|
}
|
|
if missing, err := bucketAgg.Settings.Get("missing").String(); err == nil {
|
|
a.Missing = &missing
|
|
}
|
|
|
|
if orderBy, err := bucketAgg.Settings.Get("orderBy").String(); err == nil {
|
|
a.Order[orderBy] = bucketAgg.Settings.Get("order").MustString("desc")
|
|
|
|
if _, err := strconv.Atoi(orderBy); err == nil {
|
|
for _, m := range metrics {
|
|
if m.ID == orderBy {
|
|
b.Metric(m.ID, m.Type, m.Field, nil)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
aggBuilder = b
|
|
})
|
|
|
|
return aggBuilder
|
|
}
|
|
|
|
func addFiltersAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
|
|
filters := make(map[string]interface{})
|
|
for _, filter := range bucketAgg.Settings.Get("filters").MustArray() {
|
|
json := simplejson.NewFromAny(filter)
|
|
query := json.Get("query").MustString()
|
|
label := json.Get("label").MustString()
|
|
if label == "" {
|
|
label = query
|
|
}
|
|
filters[label] = &es.QueryStringFilter{Query: query, AnalyzeWildcard: true}
|
|
}
|
|
|
|
if len(filters) > 0 {
|
|
aggBuilder.Filters(bucketAgg.ID, func(a *es.FiltersAggregation, b es.AggBuilder) {
|
|
a.Filters = filters
|
|
aggBuilder = b
|
|
})
|
|
}
|
|
|
|
return aggBuilder
|
|
}
|
|
|
|
func addGeoHashGridAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
|
|
aggBuilder.GeoHashGrid(bucketAgg.ID, bucketAgg.Field, func(a *es.GeoHashGridAggregation, b es.AggBuilder) {
|
|
a.Precision = bucketAgg.Settings.Get("precision").MustInt(3)
|
|
aggBuilder = b
|
|
})
|
|
|
|
return aggBuilder
|
|
}
|
|
|
|
// timeSeriesQueryParser converts tsdb queries into elasticsearch Query
// values. It carries no state.
type timeSeriesQueryParser struct{}
|
|
|
|
func newTimeSeriesQueryParser() *timeSeriesQueryParser {
|
|
return &timeSeriesQueryParser{}
|
|
}
|
|
|
|
func (p *timeSeriesQueryParser) parse(tsdbQuery *tsdb.TsdbQuery) ([]*Query, error) {
|
|
queries := make([]*Query, 0)
|
|
for _, q := range tsdbQuery.Queries {
|
|
model := q.Model
|
|
timeField, err := model.Get("timeField").String()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rawQuery := model.Get("query").MustString()
|
|
bucketAggs, err := p.parseBucketAggs(model)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
metrics, err := p.parseMetrics(model)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
alias := model.Get("alias").MustString("")
|
|
interval := strconv.FormatInt(q.IntervalMs, 10) + "ms"
|
|
|
|
queries = append(queries, &Query{
|
|
TimeField: timeField,
|
|
RawQuery: rawQuery,
|
|
BucketAggs: bucketAggs,
|
|
Metrics: metrics,
|
|
Alias: alias,
|
|
Interval: interval,
|
|
RefID: q.RefId,
|
|
})
|
|
}
|
|
|
|
return queries, nil
|
|
}
|
|
|
|
func (p *timeSeriesQueryParser) parseBucketAggs(model *simplejson.Json) ([]*BucketAgg, error) {
|
|
var err error
|
|
var result []*BucketAgg
|
|
for _, t := range model.Get("bucketAggs").MustArray() {
|
|
aggJSON := simplejson.NewFromAny(t)
|
|
agg := &BucketAgg{}
|
|
|
|
agg.Type, err = aggJSON.Get("type").String()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
agg.ID, err = aggJSON.Get("id").String()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
agg.Field = aggJSON.Get("field").MustString()
|
|
agg.Settings = simplejson.NewFromAny(aggJSON.Get("settings").MustMap())
|
|
|
|
result = append(result, agg)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (p *timeSeriesQueryParser) parseMetrics(model *simplejson.Json) ([]*MetricAgg, error) {
|
|
var err error
|
|
var result []*MetricAgg
|
|
for _, t := range model.Get("metrics").MustArray() {
|
|
metricJSON := simplejson.NewFromAny(t)
|
|
metric := &MetricAgg{}
|
|
|
|
metric.Field = metricJSON.Get("field").MustString()
|
|
metric.Hide = metricJSON.Get("hide").MustBool(false)
|
|
metric.ID = metricJSON.Get("id").MustString()
|
|
metric.PipelineAggregate = metricJSON.Get("pipelineAgg").MustString()
|
|
metric.Settings = simplejson.NewFromAny(metricJSON.Get("settings").MustMap())
|
|
metric.Meta = simplejson.NewFromAny(metricJSON.Get("meta").MustMap())
|
|
|
|
metric.Type, err = metricJSON.Get("type").String()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result = append(result, metric)
|
|
}
|
|
return result, nil
|
|
}
|