From 4f0b31d21bcd92f7a2da307e2b5c76652fdf7471 Mon Sep 17 00:00:00 2001 From: Ivana Huckova <30407135+ivanahuckova@users.noreply.github.com> Date: Mon, 18 Sep 2023 10:49:12 +0200 Subject: [PATCH] Elasticsearch: Add tracing to data source (#74750) * Elasticsearch: Add tracing to data source * Fix tests * Address feedback * Update pkg/tsdb/elasticsearch/response_parser.go Co-authored-by: Sven Grossmann * Update pkg/tsdb/elasticsearch/response_parser.go Co-authored-by: Sven Grossmann * Track error across both spans * Add span for decoding of response * Fix test * Update setting of errors + fix test --------- Co-authored-by: Sven Grossmann --- .../plugins_integration_test.go | 2 +- pkg/tsdb/elasticsearch/client/client.go | 27 ++++++++++++++++++- pkg/tsdb/elasticsearch/client/client_test.go | 5 ++-- pkg/tsdb/elasticsearch/data_query.go | 7 +++-- pkg/tsdb/elasticsearch/data_query_test.go | 3 ++- pkg/tsdb/elasticsearch/elasticsearch.go | 17 +++++++----- pkg/tsdb/elasticsearch/querydata_test.go | 3 ++- pkg/tsdb/elasticsearch/response_parser.go | 20 ++++++++++++-- .../elasticsearch/response_parser_test.go | 5 ++-- 9 files changed, 71 insertions(+), 18 deletions(-) diff --git a/pkg/services/pluginsintegration/plugins_integration_test.go b/pkg/services/pluginsintegration/plugins_integration_test.go index 85e9c0cbf65..d4cb7d042ef 100644 --- a/pkg/services/pluginsintegration/plugins_integration_test.go +++ b/pkg/services/pluginsintegration/plugins_integration_test.go @@ -82,7 +82,7 @@ func TestIntegrationPluginManager(t *testing.T) { am := azuremonitor.ProvideService(cfg, hcp, features, tracer) cw := cloudwatch.ProvideService(cfg, hcp, features) cm := cloudmonitoring.ProvideService(hcp, tracer) - es := elasticsearch.ProvideService(hcp) + es := elasticsearch.ProvideService(hcp, tracer) grap := graphite.ProvideService(hcp, tracer) idb := influxdb.ProvideService(hcp) lk := loki.ProvideService(hcp, features, tracer) diff --git a/pkg/tsdb/elasticsearch/client/client.go 
b/pkg/tsdb/elasticsearch/client/client.go index f172a61c4b3..fa76e94fcc1 100644 --- a/pkg/tsdb/elasticsearch/client/client.go +++ b/pkg/tsdb/elasticsearch/client/client.go @@ -14,8 +14,11 @@ import ( "time" "github.com/grafana/grafana-plugin-sdk-go/backend" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/tracing" ) // Used in logging to mark a stage @@ -52,7 +55,7 @@ type Client interface { } // NewClient creates a new elasticsearch client -var NewClient = func(ctx context.Context, ds *DatasourceInfo, timeRange backend.TimeRange, logger log.Logger) (Client, error) { +var NewClient = func(ctx context.Context, ds *DatasourceInfo, timeRange backend.TimeRange, logger log.Logger, tracer tracing.Tracer) (Client, error) { logger = logger.New("entity", "client") ip, err := newIndexPattern(ds.Interval, ds.Database) @@ -74,6 +77,7 @@ var NewClient = func(ctx context.Context, ds *DatasourceInfo, timeRange backend. 
configuredFields: ds.ConfiguredFields, indices: indices, timeRange: timeRange, + tracer: tracer, }, nil } @@ -84,6 +88,7 @@ type baseClientImpl struct { indices []string timeRange backend.TimeRange logger log.Logger + tracer tracing.Tracer } func (c *baseClientImpl) GetConfiguredFields() ConfiguredFields { @@ -163,8 +168,20 @@ func (c *baseClientImpl) executeRequest(method, uriPath, uriQuery string, body [ } func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error) { + var err error multiRequests := c.createMultiSearchRequests(r.Requests) queryParams := c.getMultiSearchQueryParameters() + _, span := c.tracer.Start(c.ctx, "datasource.elasticsearch.queryData.executeMultisearch") + span.SetAttributes("queryParams", queryParams, attribute.Key("queryParams").String(queryParams)) + span.SetAttributes("url", c.ds.URL, attribute.Key("url").String(c.ds.URL)) + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.End() + }() + start := time.Now() clientRes, err := c.executeBatchRequest("_msearch", queryParams, multiRequests) if err != nil { @@ -191,6 +208,14 @@ func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearch start = time.Now() var msr MultiSearchResponse dec := json.NewDecoder(res.Body) + _, resSpan := c.tracer.Start(c.ctx, "datasource.elasticsearch.queryData.executeMultisearch.decodeResponse") + defer func() { + if err != nil { + resSpan.RecordError(err) + resSpan.SetStatus(codes.Error, err.Error()) + } + resSpan.End() + }() err = dec.Decode(&msr) if err != nil { c.logger.Error("Failed to decode response from Elasticsearch", "error", err, "duration", time.Since(start)) diff --git a/pkg/tsdb/elasticsearch/client/client_test.go b/pkg/tsdb/elasticsearch/client/client_test.go index 04f0e6c90c0..2c3e25bde3f 100644 --- a/pkg/tsdb/elasticsearch/client/client_test.go +++ b/pkg/tsdb/elasticsearch/client/client_test.go @@ -15,6 +15,7 @@ import ( 
"github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/tracing" ) func TestClient_ExecuteMultisearch(t *testing.T) { @@ -67,7 +68,7 @@ func TestClient_ExecuteMultisearch(t *testing.T) { To: to, } - c, err := NewClient(context.Background(), &ds, timeRange, log.New("test", "test")) + c, err := NewClient(context.Background(), &ds, timeRange, log.New("test", "test"), tracing.NewFakeTracer()) require.NoError(t, err) require.NotNil(t, c) @@ -189,7 +190,7 @@ func TestClient_Index(t *testing.T) { To: to, } - c, err := NewClient(context.Background(), &ds, timeRange, log.New("test", "test")) + c, err := NewClient(context.Background(), &ds, timeRange, log.New("test", "test"), tracing.NewFakeTracer()) require.NoError(t, err) require.NotNil(t, c) diff --git a/pkg/tsdb/elasticsearch/data_query.go b/pkg/tsdb/elasticsearch/data_query.go index 51ebb9037d0..94b7e45230e 100644 --- a/pkg/tsdb/elasticsearch/data_query.go +++ b/pkg/tsdb/elasticsearch/data_query.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/tracing" es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client" ) @@ -24,14 +25,16 @@ type elasticsearchDataQuery struct { dataQueries []backend.DataQuery logger log.Logger ctx context.Context + tracer tracing.Tracer } -var newElasticsearchDataQuery = func(ctx context.Context, client es.Client, dataQuery []backend.DataQuery, logger log.Logger) *elasticsearchDataQuery { +var newElasticsearchDataQuery = func(ctx context.Context, client es.Client, dataQuery []backend.DataQuery, logger log.Logger, tracer tracing.Tracer) *elasticsearchDataQuery { return &elasticsearchDataQuery{ client: client, dataQueries: dataQuery, logger: logger, ctx: ctx, + tracer: tracer, } } @@ -70,7 +73,7 @@ func (e *elasticsearchDataQuery) execute() (*backend.QueryDataResponse, error) { return 
&backend.QueryDataResponse{}, err } - return parseResponse(e.ctx, res.Responses, queries, e.client.GetConfiguredFields(), e.logger) + return parseResponse(e.ctx, res.Responses, queries, e.client.GetConfiguredFields(), e.logger, e.tracer) } func (e *elasticsearchDataQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilder, from, to int64) error { diff --git a/pkg/tsdb/elasticsearch/data_query_test.go b/pkg/tsdb/elasticsearch/data_query_test.go index b51aa65f908..127d11e27e4 100644 --- a/pkg/tsdb/elasticsearch/data_query_test.go +++ b/pkg/tsdb/elasticsearch/data_query_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/tracing" es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client" ) @@ -1818,6 +1819,6 @@ func executeElasticsearchDataQuery(c es.Client, body string, from, to time.Time) }, }, } - query := newElasticsearchDataQuery(context.Background(), c, dataRequest.Queries, log.New("test.logger")) + query := newElasticsearchDataQuery(context.Background(), c, dataRequest.Queries, log.New("test.logger"), tracing.NewFakeTracer()) return query.execute() } diff --git a/pkg/tsdb/elasticsearch/elasticsearch.go b/pkg/tsdb/elasticsearch/elasticsearch.go index 4403ced8345..d9ac1fd5e74 100644 --- a/pkg/tsdb/elasticsearch/elasticsearch.go +++ b/pkg/tsdb/elasticsearch/elasticsearch.go @@ -20,6 +20,7 @@ import ( "github.com/grafana/grafana/pkg/infra/httpclient" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/tracing" ngalertmodels "github.com/grafana/grafana/pkg/services/ngalert/models" es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client" ) @@ -29,39 +30,43 @@ var eslog = log.New("tsdb.elasticsearch") type Service struct { httpClientProvider httpclient.Provider im instancemgmt.InstanceManager + tracer tracing.Tracer + logger *log.ConcreteLogger } -func ProvideService(httpClientProvider httpclient.Provider) *Service { +func 
ProvideService(httpClientProvider httpclient.Provider, tracer tracing.Tracer) *Service { return &Service{ im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)), httpClientProvider: httpClientProvider, + tracer: tracer, + logger: eslog, } } func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { dsInfo, err := s.getDSInfo(ctx, req.PluginContext) _, fromAlert := req.Headers[ngalertmodels.FromAlertHeaderName] - logger := eslog.FromContext(ctx).New("fromAlert", fromAlert) + logger := s.logger.FromContext(ctx).New("fromAlert", fromAlert) if err != nil { logger.Error("Failed to get data source info", "error", err) return &backend.QueryDataResponse{}, err } - return queryData(ctx, req.Queries, dsInfo, logger) + return queryData(ctx, req.Queries, dsInfo, logger, s.tracer) } // separate function to allow testing the whole transformation and query flow -func queryData(ctx context.Context, queries []backend.DataQuery, dsInfo *es.DatasourceInfo, logger log.Logger) (*backend.QueryDataResponse, error) { +func queryData(ctx context.Context, queries []backend.DataQuery, dsInfo *es.DatasourceInfo, logger log.Logger, tracer tracing.Tracer) (*backend.QueryDataResponse, error) { if len(queries) == 0 { return &backend.QueryDataResponse{}, fmt.Errorf("query contains no queries") } - client, err := es.NewClient(ctx, dsInfo, queries[0].TimeRange, logger) + client, err := es.NewClient(ctx, dsInfo, queries[0].TimeRange, logger, tracer) if err != nil { return &backend.QueryDataResponse{}, err } - query := newElasticsearchDataQuery(ctx, client, queries, logger) + query := newElasticsearchDataQuery(ctx, client, queries, logger, tracer) return query.execute() } diff --git a/pkg/tsdb/elasticsearch/querydata_test.go b/pkg/tsdb/elasticsearch/querydata_test.go index e42f0b3b5ce..780b93ea5db 100644 --- a/pkg/tsdb/elasticsearch/querydata_test.go +++ b/pkg/tsdb/elasticsearch/querydata_test.go @@ -12,6 +12,7 @@ import ( 
"github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/tracing" es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client" ) @@ -138,7 +139,7 @@ func queryDataTestWithResponseCode(queriesBytes []byte, responseStatusCode int, return nil }) - result, err := queryData(context.Background(), queries, dsInfo, log.New("test.logger")) + result, err := queryData(context.Background(), queries, dsInfo, log.New("test.logger"), tracing.NewFakeTracer()) if err != nil { return queryDataTestResult{}, err } diff --git a/pkg/tsdb/elasticsearch/response_parser.go b/pkg/tsdb/elasticsearch/response_parser.go index ed186d2848f..3063fb2e0fe 100644 --- a/pkg/tsdb/elasticsearch/response_parser.go +++ b/pkg/tsdb/elasticsearch/response_parser.go @@ -13,9 +13,12 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/tracing" es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client" ) @@ -41,21 +44,29 @@ const ( var searchWordsRegex = regexp.MustCompile(regexp.QuoteMeta(es.HighlightPreTagsString) + `(.*?)` + regexp.QuoteMeta(es.HighlightPostTagsString)) -func parseResponse(ctx context.Context, responses []*es.SearchResponse, targets []*Query, configuredFields es.ConfiguredFields, logger log.Logger) (*backend.QueryDataResponse, error) { +func parseResponse(ctx context.Context, responses []*es.SearchResponse, targets []*Query, configuredFields es.ConfiguredFields, logger log.Logger, tracer tracing.Tracer) (*backend.QueryDataResponse, error) { result := backend.QueryDataResponse{ Responses: backend.Responses{}, } if responses == nil { return &result, nil } + ctx, span := tracer.Start(ctx, "datasource.elastic.parseResponse") + 
span.SetAttributes("responseLength", len(responses), attribute.Key("responseLength").Int(len(responses))) + defer span.End() for i, res := range responses { + _, resSpan := tracer.Start(ctx, "datasource.elastic.parseResponse.response") + resSpan.SetAttributes("queryMetricType", targets[i].Metrics[0].Type, attribute.Key("queryMetricType").String(targets[i].Metrics[0].Type)) start := time.Now() target := targets[i] if res.Error != nil { mt, _ := json.Marshal(target) me, _ := json.Marshal(res.Error) + resSpan.RecordError(errors.New(string(me))) + resSpan.SetStatus(codes.Error, string(me)) + resSpan.End() logger.Error("Processing error response from Elasticsearch", "error", string(me), "query", string(mt)) errResult := getErrorFromElasticResponse(res) result.Responses[target.RefID] = backend.DataResponse{ @@ -94,6 +105,11 @@ func parseResponse(ctx context.Context, responses []*es.SearchResponse, targets logger.Debug("Processed metric query response") if err != nil { mt, _ := json.Marshal(target) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + resSpan.RecordError(err) + resSpan.SetStatus(codes.Error, err.Error()) + resSpan.End() logger.Error("Error processing buckets", "error", err, "query", string(mt), "aggregationsLength", len(res.Aggregations)) return &backend.QueryDataResponse{}, err } @@ -102,9 +118,9 @@ func parseResponse(ctx context.Context, responses []*es.SearchResponse, targets result.Responses[target.RefID] = queryRes } + resSpan.End() logger.Info("Finished processing of response", "duration", time.Since(start), "stage", es.StageParseResponse) } - return &result, nil } diff --git a/pkg/tsdb/elasticsearch/response_parser_test.go b/pkg/tsdb/elasticsearch/response_parser_test.go index f65a11b946c..d285b377ade 100644 --- a/pkg/tsdb/elasticsearch/response_parser_test.go +++ b/pkg/tsdb/elasticsearch/response_parser_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/grafana/pkg/infra/log" + 
"github.com/grafana/grafana/pkg/infra/tracing" es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client" ) @@ -3533,7 +3534,7 @@ func parseTestResponse(tsdbQueries map[string]string, responseBody string) (*bac return nil, err } - return parseResponse(context.Background(), response.Responses, queries, configuredFields, log.New("test.logger")) + return parseResponse(context.Background(), response.Responses, queries, configuredFields, log.New("test.logger"), tracing.NewFakeTracer()) } func requireTimeValue(t *testing.T, expected int64, frame *data.Frame, index int) { @@ -3583,7 +3584,7 @@ func requireStringAt(t *testing.T, expected string, field *data.Field, index int func requireFloatAt(t *testing.T, expected float64, field *data.Field, index int) { v := field.At(index).(*float64) - require.Equal(t, expected, *v, fmt.Sprintf("wrong flaot at index %v", index)) + require.Equal(t, expected, *v, fmt.Sprintf("wrong float at index %v", index)) } func requireTimeSeriesName(t *testing.T, expected string, frame *data.Frame) {