mirror of https://github.com/grafana/grafana.git, synced 2025-08-02 05:46:28 +08:00
Elasticsearch: Add tracing to data source (#74750)
* Elasticsearch: Add tracing to data source
* Fix tests
* Address feedback
* Update pkg/tsdb/elasticsearch/response_parser.go
  Co-authored-by: Sven Grossmann <sven.grossmann@grafana.com>
* Update pkg/tsdb/elasticsearch/response_parser.go
  Co-authored-by: Sven Grossmann <sven.grossmann@grafana.com>
* Track error across both spans
* Add span for decoding of response
* Fix test
* Update setting of errors + fix test

---------

Co-authored-by: Sven Grossmann <sven.grossmann@grafana.com>
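The same span lifecycle recurs throughout the hunks below: start a span, attach attributes, and let a deferred closure record any error before the span is ended. A minimal, self-contained sketch of that pattern, written against the upstream OpenTelemetry Go API rather than Grafana's internal tracing.Tracer wrapper (function and span names here are illustrative, not part of this commit):

package main

import (
    "context"
    "errors"
    "fmt"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/codes"
)

// executeQuery mirrors the lifecycle added around each request in the diff:
// start a span, attach attributes, and let a deferred closure record any
// error assigned to err before the span is ended.
func executeQuery(ctx context.Context, url string) (err error) {
    _, span := otel.Tracer("elasticsearch").Start(ctx, "datasource.elasticsearch.queryData.executeMultisearch")
    span.SetAttributes(attribute.String("url", url))
    defer func() {
        if err != nil {
            span.RecordError(err)
            span.SetStatus(codes.Error, err.Error())
        }
        span.End()
    }()

    // Placeholder for the real HTTP round trip; returning an error exercises the deferred path.
    return errors.New("elasticsearch request failed")
}

func main() {
    // With no tracer provider registered, otel.Tracer hands back a no-op tracer,
    // so this sketch runs without any collector configured.
    fmt.Println(executeQuery(context.Background(), "http://localhost:9200"))
}

In Grafana itself the provider is injected by the server, which is why the diff threads a tracing.Tracer argument from ProvideService down to the client and the response parser.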
@@ -82,7 +82,7 @@ func TestIntegrationPluginManager(t *testing.T) {
     am := azuremonitor.ProvideService(cfg, hcp, features, tracer)
     cw := cloudwatch.ProvideService(cfg, hcp, features)
     cm := cloudmonitoring.ProvideService(hcp, tracer)
-    es := elasticsearch.ProvideService(hcp)
+    es := elasticsearch.ProvideService(hcp, tracer)
     grap := graphite.ProvideService(hcp, tracer)
     idb := influxdb.ProvideService(hcp)
     lk := loki.ProvideService(hcp, features, tracer)
@@ -14,8 +14,11 @@ import (
     "time"
 
     "github.com/grafana/grafana-plugin-sdk-go/backend"
+    "go.opentelemetry.io/otel/attribute"
+    "go.opentelemetry.io/otel/codes"
 
     "github.com/grafana/grafana/pkg/infra/log"
+    "github.com/grafana/grafana/pkg/infra/tracing"
 )
 
 // Used in logging to mark a stage
@@ -52,7 +55,7 @@ type Client interface {
 }
 
 // NewClient creates a new elasticsearch client
-var NewClient = func(ctx context.Context, ds *DatasourceInfo, timeRange backend.TimeRange, logger log.Logger) (Client, error) {
+var NewClient = func(ctx context.Context, ds *DatasourceInfo, timeRange backend.TimeRange, logger log.Logger, tracer tracing.Tracer) (Client, error) {
     logger = logger.New("entity", "client")
 
     ip, err := newIndexPattern(ds.Interval, ds.Database)
@@ -74,6 +77,7 @@ var NewClient = func(ctx context.Context, ds *DatasourceInfo, timeRange backend.
         configuredFields: ds.ConfiguredFields,
         indices: indices,
         timeRange: timeRange,
+        tracer: tracer,
     }, nil
 }
 
@@ -84,6 +88,7 @@ type baseClientImpl struct {
     indices []string
     timeRange backend.TimeRange
     logger log.Logger
+    tracer tracing.Tracer
 }
 
 func (c *baseClientImpl) GetConfiguredFields() ConfiguredFields {
@@ -163,8 +168,20 @@ func (c *baseClientImpl) executeRequest(method, uriPath, uriQuery string, body [
 }
 
 func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error) {
+    var err error
     multiRequests := c.createMultiSearchRequests(r.Requests)
     queryParams := c.getMultiSearchQueryParameters()
+    _, span := c.tracer.Start(c.ctx, "datasource.elasticsearch.queryData.executeMultisearch")
+    span.SetAttributes("queryParams", queryParams, attribute.Key("queryParams").String(queryParams))
+    span.SetAttributes("url", c.ds.URL, attribute.Key("url").String(c.ds.URL))
+    defer func() {
+        if err != nil {
+            span.RecordError(err)
+            span.SetStatus(codes.Error, err.Error())
+        }
+        span.End()
+    }()
+
     start := time.Now()
     clientRes, err := c.executeBatchRequest("_msearch", queryParams, multiRequests)
     if err != nil {
@@ -191,6 +208,14 @@ func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearch
     start = time.Now()
     var msr MultiSearchResponse
     dec := json.NewDecoder(res.Body)
+    _, resSpan := c.tracer.Start(c.ctx, "datasource.elasticsearch.queryData.executeMultisearch.decodeResponse")
+    defer func() {
+        if err != nil {
+            resSpan.RecordError(err)
+            resSpan.SetStatus(codes.Error, err.Error())
+        }
+        resSpan.End()
+    }()
     err = dec.Decode(&msr)
     if err != nil {
         c.logger.Error("Failed to decode response from Elasticsearch", "error", err, "duration", time.Since(start))
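One detail worth noting in ExecuteMultisearch above: err is declared up front and only assigned (never re-declared with :=) in the statements that follow, so both deferred closures observe the final error value. If an inner block shadowed err with :=, the spans would silently miss the failure. A small, self-contained illustration of that Go scoping behavior (plain Go, no Grafana APIs):

package main

import (
    "errors"
    "fmt"
)

func fetch() (string, error) { return "", errors.New("boom") }

func shadowed() (msg string) {
    var err error
    defer func() { msg = fmt.Sprintf("deferred sees err = %v", err) }()

    if data, err := fetch(); err != nil {
        // This err is a new variable scoped to the if statement;
        // the outer err stays nil, so the deferred closure misses the failure.
        _ = data
    }
    return
}

func assigned() (msg string) {
    var err error
    defer func() { msg = fmt.Sprintf("deferred sees err = %v", err) }()

    var data string
    data, err = fetch() // plain assignment updates the outer err
    _ = data
    return
}

func main() {
    fmt.Println(shadowed()) // deferred sees err = <nil>
    fmt.Println(assigned()) // deferred sees err = boom
}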
@@ -15,6 +15,7 @@ import (
 
     "github.com/grafana/grafana/pkg/components/simplejson"
     "github.com/grafana/grafana/pkg/infra/log"
+    "github.com/grafana/grafana/pkg/infra/tracing"
 )
 
 func TestClient_ExecuteMultisearch(t *testing.T) {
@@ -67,7 +68,7 @@ func TestClient_ExecuteMultisearch(t *testing.T) {
         To: to,
     }
 
-    c, err := NewClient(context.Background(), &ds, timeRange, log.New("test", "test"))
+    c, err := NewClient(context.Background(), &ds, timeRange, log.New("test", "test"), tracing.NewFakeTracer())
     require.NoError(t, err)
     require.NotNil(t, c)
 
@@ -189,7 +190,7 @@ func TestClient_Index(t *testing.T) {
         To: to,
     }
 
-    c, err := NewClient(context.Background(), &ds, timeRange, log.New("test", "test"))
+    c, err := NewClient(context.Background(), &ds, timeRange, log.New("test", "test"), tracing.NewFakeTracer())
     require.NoError(t, err)
     require.NotNil(t, c)
 
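The tests above satisfy the new tracer parameter with Grafana's tracing.NewFakeTracer(), which keeps the tracing calls inert. If a test also needed to assert on the spans themselves, the upstream OpenTelemetry SDK ships an in-memory span recorder; a rough sketch under that assumption (these are OpenTelemetry package names, not Grafana test helpers):

package example

import (
    "context"
    "testing"

    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    "go.opentelemetry.io/otel/sdk/trace/tracetest"
)

func TestSpansAreRecorded(t *testing.T) {
    // SpanRecorder keeps ended spans in memory so the test can inspect them.
    recorder := tracetest.NewSpanRecorder()
    provider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(recorder))
    tracer := provider.Tracer("test")

    _, span := tracer.Start(context.Background(), "datasource.elasticsearch.queryData.executeMultisearch")
    span.End()

    ended := recorder.Ended()
    if len(ended) != 1 || ended[0].Name() != "datasource.elasticsearch.queryData.executeMultisearch" {
        t.Fatalf("unexpected spans recorded: %v", ended)
    }
}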
@@ -12,6 +12,7 @@ import (
 
     "github.com/grafana/grafana/pkg/components/simplejson"
     "github.com/grafana/grafana/pkg/infra/log"
+    "github.com/grafana/grafana/pkg/infra/tracing"
     es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
 )
 
@@ -24,14 +25,16 @@ type elasticsearchDataQuery struct {
     dataQueries []backend.DataQuery
     logger log.Logger
     ctx context.Context
+    tracer tracing.Tracer
 }
 
-var newElasticsearchDataQuery = func(ctx context.Context, client es.Client, dataQuery []backend.DataQuery, logger log.Logger) *elasticsearchDataQuery {
+var newElasticsearchDataQuery = func(ctx context.Context, client es.Client, dataQuery []backend.DataQuery, logger log.Logger, tracer tracing.Tracer) *elasticsearchDataQuery {
     return &elasticsearchDataQuery{
         client: client,
         dataQueries: dataQuery,
         logger: logger,
         ctx: ctx,
+        tracer: tracer,
     }
 }
 
@@ -70,7 +73,7 @@ func (e *elasticsearchDataQuery) execute() (*backend.QueryDataResponse, error) {
         return &backend.QueryDataResponse{}, err
     }
 
-    return parseResponse(e.ctx, res.Responses, queries, e.client.GetConfiguredFields(), e.logger)
+    return parseResponse(e.ctx, res.Responses, queries, e.client.GetConfiguredFields(), e.logger, e.tracer)
 }
 
 func (e *elasticsearchDataQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilder, from, to int64) error {
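elasticsearchDataQuery now carries both the request context and the tracer, so execute() can hand them straight to parseResponse without widening every helper signature. A compact sketch of that request-scoped shape (types here are illustrative, using the upstream trace.Tracer interface rather than Grafana's):

package example

import (
    "context"

    "go.opentelemetry.io/otel/trace"
)

// requestScope bundles what a single query execution needs so that helpers
// can start child spans without growing their parameter lists.
type requestScope struct {
    ctx    context.Context
    tracer trace.Tracer
}

func newRequestScope(ctx context.Context, tracer trace.Tracer) *requestScope {
    return &requestScope{ctx: ctx, tracer: tracer}
}

func (r *requestScope) parse(rawResponses []string) int {
    _, span := r.tracer.Start(r.ctx, "parseResponse")
    defer span.End()

    // Real parsing would go here; the count stands in for the parsed result.
    return len(rawResponses)
}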
@@ -11,6 +11,7 @@ import (
     "github.com/stretchr/testify/require"
 
     "github.com/grafana/grafana/pkg/infra/log"
+    "github.com/grafana/grafana/pkg/infra/tracing"
     es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
 )
 
@@ -1818,6 +1819,6 @@ func executeElasticsearchDataQuery(c es.Client, body string, from, to time.Time)
             },
         },
     }
-    query := newElasticsearchDataQuery(context.Background(), c, dataRequest.Queries, log.New("test.logger"))
+    query := newElasticsearchDataQuery(context.Background(), c, dataRequest.Queries, log.New("test.logger"), tracing.NewFakeTracer())
     return query.execute()
 }
@@ -20,6 +20,7 @@ import (
 
     "github.com/grafana/grafana/pkg/infra/httpclient"
     "github.com/grafana/grafana/pkg/infra/log"
+    "github.com/grafana/grafana/pkg/infra/tracing"
     ngalertmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
     es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
 )
@@ -29,39 +30,43 @@ var eslog = log.New("tsdb.elasticsearch")
 type Service struct {
     httpClientProvider httpclient.Provider
     im instancemgmt.InstanceManager
+    tracer tracing.Tracer
+    logger *log.ConcreteLogger
 }
 
-func ProvideService(httpClientProvider httpclient.Provider) *Service {
+func ProvideService(httpClientProvider httpclient.Provider, tracer tracing.Tracer) *Service {
     return &Service{
         im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)),
         httpClientProvider: httpClientProvider,
+        tracer: tracer,
+        logger: eslog,
     }
 }
 
 func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
     dsInfo, err := s.getDSInfo(ctx, req.PluginContext)
     _, fromAlert := req.Headers[ngalertmodels.FromAlertHeaderName]
-    logger := eslog.FromContext(ctx).New("fromAlert", fromAlert)
+    logger := s.logger.FromContext(ctx).New("fromAlert", fromAlert)
 
     if err != nil {
         logger.Error("Failed to get data source info", "error", err)
         return &backend.QueryDataResponse{}, err
     }
 
-    return queryData(ctx, req.Queries, dsInfo, logger)
+    return queryData(ctx, req.Queries, dsInfo, logger, s.tracer)
 }
 
 // separate function to allow testing the whole transformation and query flow
-func queryData(ctx context.Context, queries []backend.DataQuery, dsInfo *es.DatasourceInfo, logger log.Logger) (*backend.QueryDataResponse, error) {
+func queryData(ctx context.Context, queries []backend.DataQuery, dsInfo *es.DatasourceInfo, logger log.Logger, tracer tracing.Tracer) (*backend.QueryDataResponse, error) {
     if len(queries) == 0 {
         return &backend.QueryDataResponse{}, fmt.Errorf("query contains no queries")
     }
 
-    client, err := es.NewClient(ctx, dsInfo, queries[0].TimeRange, logger)
+    client, err := es.NewClient(ctx, dsInfo, queries[0].TimeRange, logger, tracer)
     if err != nil {
         return &backend.QueryDataResponse{}, err
     }
-    query := newElasticsearchDataQuery(ctx, client, queries, logger)
+    query := newElasticsearchDataQuery(ctx, client, queries, logger, tracer)
     return query.execute()
 }
 
@@ -12,6 +12,7 @@ import (
     "github.com/grafana/grafana-plugin-sdk-go/backend"
 
     "github.com/grafana/grafana/pkg/infra/log"
+    "github.com/grafana/grafana/pkg/infra/tracing"
     es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
 )
 
@@ -138,7 +139,7 @@ func queryDataTestWithResponseCode(queriesBytes []byte, responseStatusCode int,
         return nil
     })
 
-    result, err := queryData(context.Background(), queries, dsInfo, log.New("test.logger"))
+    result, err := queryData(context.Background(), queries, dsInfo, log.New("test.logger"), tracing.NewFakeTracer())
     if err != nil {
         return queryDataTestResult{}, err
     }
@@ -13,9 +13,12 @@ import (
 
     "github.com/grafana/grafana-plugin-sdk-go/backend"
     "github.com/grafana/grafana-plugin-sdk-go/data"
+    "go.opentelemetry.io/otel/attribute"
+    "go.opentelemetry.io/otel/codes"
 
     "github.com/grafana/grafana/pkg/components/simplejson"
     "github.com/grafana/grafana/pkg/infra/log"
+    "github.com/grafana/grafana/pkg/infra/tracing"
     es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
 )
@@ -41,21 +44,29 @@ const (
 
 var searchWordsRegex = regexp.MustCompile(regexp.QuoteMeta(es.HighlightPreTagsString) + `(.*?)` + regexp.QuoteMeta(es.HighlightPostTagsString))
 
-func parseResponse(ctx context.Context, responses []*es.SearchResponse, targets []*Query, configuredFields es.ConfiguredFields, logger log.Logger) (*backend.QueryDataResponse, error) {
+func parseResponse(ctx context.Context, responses []*es.SearchResponse, targets []*Query, configuredFields es.ConfiguredFields, logger log.Logger, tracer tracing.Tracer) (*backend.QueryDataResponse, error) {
     result := backend.QueryDataResponse{
         Responses: backend.Responses{},
     }
     if responses == nil {
         return &result, nil
     }
+    ctx, span := tracer.Start(ctx, "datasource.elastic.parseResponse")
+    span.SetAttributes("responseLength", len(responses), attribute.Key("responseLength").Int(len(responses)))
+    defer span.End()
 
     for i, res := range responses {
+        _, resSpan := tracer.Start(ctx, "datasource.elastic.parseResponse.response")
+        resSpan.SetAttributes("queryMetricType", targets[i].Metrics[0].Type, attribute.Key("queryMetricType").String(targets[i].Metrics[0].Type))
         start := time.Now()
         target := targets[i]
 
         if res.Error != nil {
             mt, _ := json.Marshal(target)
             me, _ := json.Marshal(res.Error)
+            resSpan.RecordError(errors.New(string(me)))
+            resSpan.SetStatus(codes.Error, string(me))
+            resSpan.End()
             logger.Error("Processing error response from Elasticsearch", "error", string(me), "query", string(mt))
             errResult := getErrorFromElasticResponse(res)
             result.Responses[target.RefID] = backend.DataResponse{
@@ -94,6 +105,11 @@ func parseResponse(ctx context.Context, responses []*es.SearchResponse, targets
             logger.Debug("Processed metric query response")
             if err != nil {
                 mt, _ := json.Marshal(target)
+                span.RecordError(err)
+                span.SetStatus(codes.Error, err.Error())
+                resSpan.RecordError(err)
+                resSpan.SetStatus(codes.Error, err.Error())
+                resSpan.End()
                 logger.Error("Error processing buckets", "error", err, "query", string(mt), "aggregationsLength", len(res.Aggregations))
                 return &backend.QueryDataResponse{}, err
             }
@@ -102,9 +118,9 @@ func parseResponse(ctx context.Context, responses []*es.SearchResponse, targets
 
             result.Responses[target.RefID] = queryRes
         }
+        resSpan.End()
         logger.Info("Finished processing of response", "duration", time.Since(start), "stage", es.StageParseResponse)
     }
 
     return &result, nil
 }
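parseResponse above opens one parent span for the whole batch and a child span per response, ending the child early when that response carries an error. A stripped-down sketch of the same per-item span structure (again using upstream OpenTelemetry names rather than Grafana's tracing.Tracer wrapper):

package main

import (
    "context"
    "errors"
    "fmt"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/codes"
)

type searchResponse struct {
    refID string
    err   error
}

func parseAll(ctx context.Context, responses []searchResponse) {
    tracer := otel.Tracer("elasticsearch")
    // The parent span covers the whole batch; passing its ctx into the
    // per-response Start calls makes the child spans nest under it.
    ctx, span := tracer.Start(ctx, "parseResponse")
    span.SetAttributes(attribute.Int("responseLength", len(responses)))
    defer span.End()

    for _, res := range responses {
        _, resSpan := tracer.Start(ctx, "parseResponse.response")
        if res.err != nil {
            // Record the per-response failure and close the child span early.
            resSpan.RecordError(res.err)
            resSpan.SetStatus(codes.Error, res.err.Error())
            resSpan.End()
            continue
        }
        // ... process the successful response ...
        resSpan.End()
    }
}

func main() {
    parseAll(context.Background(), []searchResponse{
        {refID: "A"},
        {refID: "B", err: errors.New("shard failure")},
    })
    fmt.Println("done")
}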
@@ -15,6 +15,7 @@ import (
     "github.com/stretchr/testify/require"
 
     "github.com/grafana/grafana/pkg/infra/log"
+    "github.com/grafana/grafana/pkg/infra/tracing"
     es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
 )
 
@@ -3533,7 +3534,7 @@ func parseTestResponse(tsdbQueries map[string]string, responseBody string) (*bac
         return nil, err
     }
 
-    return parseResponse(context.Background(), response.Responses, queries, configuredFields, log.New("test.logger"))
+    return parseResponse(context.Background(), response.Responses, queries, configuredFields, log.New("test.logger"), tracing.NewFakeTracer())
 }
 
 func requireTimeValue(t *testing.T, expected int64, frame *data.Frame, index int) {
@@ -3583,7 +3584,7 @@ func requireStringAt(t *testing.T, expected string, field *data.Field, index int
 
 func requireFloatAt(t *testing.T, expected float64, field *data.Field, index int) {
     v := field.At(index).(*float64)
-    require.Equal(t, expected, *v, fmt.Sprintf("wrong flaot at index %v", index))
+    require.Equal(t, expected, *v, fmt.Sprintf("wrong float at index %v", index))
 }
 
 func requireTimeSeriesName(t *testing.T, expected string, frame *data.Frame) {