From 7092fd269d79e65a20cdb17ee43fcc78dd61a56c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ida=20=C5=A0tambuk?= Date: Mon, 10 Feb 2025 18:33:36 +0100 Subject: [PATCH] Elasticsearch: Remove frontend testDatasource method (#99894) --- pkg/tsdb/elasticsearch/client/client.go | 2 +- .../elasticsearch/client/index_pattern.go | 2 +- .../client/index_pattern_test.go | 2 +- pkg/tsdb/elasticsearch/healthcheck.go | 170 ++++++++++++++---- pkg/tsdb/elasticsearch/healthcheck_test.go | 100 ++++++++--- .../datasource/elasticsearch/datasource.ts | 35 +--- 6 files changed, 219 insertions(+), 92 deletions(-) diff --git a/pkg/tsdb/elasticsearch/client/client.go b/pkg/tsdb/elasticsearch/client/client.go index 6bffc08ba72..73d03dcba7b 100644 --- a/pkg/tsdb/elasticsearch/client/client.go +++ b/pkg/tsdb/elasticsearch/client/client.go @@ -59,7 +59,7 @@ type Client interface { var NewClient = func(ctx context.Context, ds *DatasourceInfo, logger log.Logger) (Client, error) { logger = logger.FromContext(ctx).With("entity", "client") - ip, err := newIndexPattern(ds.Interval, ds.Database) + ip, err := NewIndexPattern(ds.Interval, ds.Database) if err != nil { logger.Error("Failed creating index pattern", "error", err, "interval", ds.Interval, "index", ds.Database) return nil, err diff --git a/pkg/tsdb/elasticsearch/client/index_pattern.go b/pkg/tsdb/elasticsearch/client/index_pattern.go index a7a7a6096f8..ff01d759c5c 100644 --- a/pkg/tsdb/elasticsearch/client/index_pattern.go +++ b/pkg/tsdb/elasticsearch/client/index_pattern.go @@ -73,7 +73,7 @@ type IndexPattern interface { GetIndices(timeRange backend.TimeRange) ([]string, error) } -var newIndexPattern = func(interval string, pattern string) (IndexPattern, error) { +var NewIndexPattern = func(interval string, pattern string) (IndexPattern, error) { if interval == noInterval { return &staticIndexPattern{indexName: pattern}, nil } diff --git a/pkg/tsdb/elasticsearch/client/index_pattern_test.go b/pkg/tsdb/elasticsearch/client/index_pattern_test.go index a309c07a472..a30408cf4a8 100644 --- a/pkg/tsdb/elasticsearch/client/index_pattern_test.go +++ b/pkg/tsdb/elasticsearch/client/index_pattern_test.go @@ -290,7 +290,7 @@ func TestIndexPattern(t *testing.T) { func indexPatternScenario(t *testing.T, interval string, pattern string, timeRange backend.TimeRange, fn func(indices []string)) { testName := fmt.Sprintf("Index pattern (interval=%s, index=%s", interval, pattern) t.Run(testName, func(t *testing.T) { - ip, err := newIndexPattern(interval, pattern) + ip, err := NewIndexPattern(interval, pattern) require.NoError(t, err) require.NotNil(t, ip) indices, err := ip.GetIndices(timeRange) diff --git a/pkg/tsdb/elasticsearch/healthcheck.go b/pkg/tsdb/elasticsearch/healthcheck.go index 6218aea2754..3ef6f524a7b 100644 --- a/pkg/tsdb/elasticsearch/healthcheck.go +++ b/pkg/tsdb/elasticsearch/healthcheck.go @@ -8,12 +8,15 @@ import ( "net/http" "net/url" "path" + "strings" "time" "github.com/grafana/grafana-plugin-sdk-go/backend" - "github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource" + es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client" ) +const ErrorBodyMaxSize = 200 + func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { logger := s.logger.FromContext(ctx) @@ -22,59 +25,56 @@ func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthReque logger.Error("Failed to get data source info", "error", err) return &backend.CheckHealthResult{ Status: backend.HealthStatusUnknown, - Message: "Failed to get data source info", - }, err + Message: "Health check failed: Failed to get data source info", + }, nil } - esUrl, err := url.Parse(ds.URL) + healthStatusUrl, err := url.Parse(ds.URL) if err != nil { - logger.Error("Failed to parse data source URL", "error", err, "url", ds.URL) + logger.Error("Failed to parse data source URL", "error", err) return &backend.CheckHealthResult{ Status: backend.HealthStatusUnknown, Message: "Failed to parse data source URL", - }, err + }, nil } - esUrl.Path = path.Join(esUrl.Path, "_cluster/health") - esUrl.RawQuery = "wait_for_status=yellow" + // check that ES is healthy + healthStatusUrl.Path = path.Join(healthStatusUrl.Path, "_cluster/health") + healthStatusUrl.RawQuery = "wait_for_status=yellow" - request, err := http.NewRequestWithContext(ctx, "GET", esUrl.String(), nil) + request, err := http.NewRequestWithContext(ctx, http.MethodGet, healthStatusUrl.String(), nil) if err != nil { - logger.Error("Failed to create request", "error", err, "url", esUrl.String()) + logger.Error("Failed to create request", "error", err, "url", healthStatusUrl.String()) return &backend.CheckHealthResult{ Status: backend.HealthStatusUnknown, Message: "Failed to create request", - }, err + }, nil } start := time.Now() - logger.Debug("Sending healthcheck request to Elasticsearch", "url", esUrl.String()) + logger.Debug("Sending healthcheck request to Elasticsearch", "url", healthStatusUrl.String()) response, err := ds.HTTPClient.Do(request) if err != nil { - logger.Error("Failed to do healthcheck request", "error", err, "url", esUrl.String()) - if backend.IsDownstreamHTTPError(err) { - err = errorsource.DownstreamError(err, false) - } + logger.Error("Failed to connect to Elasticsearch", "error", err, "url", healthStatusUrl.String()) return &backend.CheckHealthResult{ - Status: backend.HealthStatusUnknown, - Message: "Failed to do healthcheck request", - }, err + Status: backend.HealthStatusError, + Message: "Health check failed: Failed to connect to Elasticsearch", + }, nil } if response.StatusCode == http.StatusRequestTimeout { return &backend.CheckHealthResult{ Status: backend.HealthStatusError, - Message: "Elasticsearch data source is not healthy", + Message: "Health check failed: Elasticsearch data source is not healthy. Request timed out", }, nil } if response.StatusCode >= 400 { - errWithSource := errorsource.SourceError(backend.ErrorSourceFromHTTPStatus(response.StatusCode), fmt.Errorf("unexpected status code: %d", response.StatusCode), false) return &backend.CheckHealthResult{ Status: backend.HealthStatusError, - Message: fmt.Sprintf("Elasticsearch data source is not healthy. Status: %s", response.Status), - }, errWithSource + Message: fmt.Sprintf("Health check failed: Elasticsearch data source is not healthy. Status: %s", response.Status), + }, nil } logger.Info("Response received from Elasticsearch", "statusCode", response.StatusCode, "status", "ok", "duration", time.Since(start)) @@ -90,31 +90,131 @@ func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthReque logger.Error("Error reading response body bytes", "error", err) return &backend.CheckHealthResult{ Status: backend.HealthStatusUnknown, - Message: "Failed to read response", - }, err + Message: "Health check failed: Failed to read response", + }, nil } jsonData := map[string]any{} err = json.Unmarshal(body, &jsonData) if err != nil { - logger.Error("Error during json unmarshal of the body", "error", err) + truncatedBody := string(body) + if len(truncatedBody) > ErrorBodyMaxSize { + truncatedBody = truncatedBody[:ErrorBodyMaxSize] + "..." + } return &backend.CheckHealthResult{ Status: backend.HealthStatusUnknown, - Message: "Failed to unmarshal response", - }, err + Message: fmt.Sprintf("Health check failed: Failed to parse response from Elasticsearch. Response received: %s", truncatedBody), + }, nil } - status := backend.HealthStatusOk - message := "Elasticsearch data source is healthy" - if jsonData["status"] == "red" { - status = backend.HealthStatusError - message = "Elasticsearch data source is not healthy" + return &backend.CheckHealthResult{ + Status: backend.HealthStatusError, + Message: "Health check failed: Elasticsearch data source is not healthy", + }, nil + } + + successMessage := "Elasticsearch data source is healthy." + indexWarningMessage := "" + + // validate index and time field + cfg := backend.GrafanaConfigFromContext(ctx) + crossClusterSearchEnabled := cfg.FeatureToggles().IsEnabled("elasticsearchCrossClusterSearch") + + if crossClusterSearchEnabled { + message, level := validateIndex(ctx, ds) + if level == "warning" { + indexWarningMessage = message + } + if level == "error" { + return &backend.CheckHealthResult{ + Status: backend.HealthStatusError, + Message: message, + }, nil + } + } + + if indexWarningMessage != "" { + successMessage = fmt.Sprintf("%s Warning: %s", successMessage, indexWarningMessage) } return &backend.CheckHealthResult{ - Status: status, - Message: message, + Status: backend.HealthStatusOk, + Message: successMessage, }, nil } + +func validateIndex(ctx context.Context, ds *es.DatasourceInfo) (message string, level string) { + // validate that the index exist and has date field + ip, err := es.NewIndexPattern(ds.Interval, ds.Database) + if err != nil { + return fmt.Sprintf("Failed to get build index pattern: %s", err), "error" + } + + indices, err := ip.GetIndices(backend.TimeRange{ + From: time.Now().UTC(), + To: time.Now().UTC(), + }) + if err != nil { + return fmt.Sprintf("Failed to get index pattern: %s", err), "error" + } + + indexList := strings.Join(indices, ",") + + validateUrl := fmt.Sprintf("%s/%s/_field_caps?fields=%s", ds.URL, indexList, ds.ConfiguredFields.TimeField) + if indexList == "" || strings.Replace(indexList, ",", "", -1) == "" { + validateUrl = fmt.Sprintf("%s/_field_caps?fields=%s", ds.URL, ds.ConfiguredFields.TimeField) + } + + request, err := http.NewRequestWithContext(ctx, http.MethodGet, validateUrl, nil) + if err != nil { + return fmt.Sprint("Failed to create request", "error", err, "url", validateUrl), "error" + } + response, err := ds.HTTPClient.Do(request) + if err != nil { + return fmt.Sprint("Failed to fetch field capabilities", "error", err, "url", validateUrl), "error" + } + defer func() { + if err := response.Body.Close(); err != nil { + backend.Logger.Warn("Failed to close response body", "error", err) + } + }() + + fieldCaps := map[string]any{} + body, err := io.ReadAll(response.Body) + if err != nil { + return "Could not read response body while checking time field", "error" + } + err = json.Unmarshal(body, &fieldCaps) + if err != nil { + return "Failed to unmarshal field capabilities response", "error" + } + if fieldCaps["error"] != nil { + if errorMessage, ok := fieldCaps["error"].(map[string]any)["reason"].(string); ok { + return fmt.Sprintf("Error validating index: %s", errorMessage), "warning" + } else { + return "Error validating index", "warning" + } + } + + fields, ok := fieldCaps["fields"].(map[string]any) + if !ok { + return "Failed to parse fields from response", "error" + } + if len(fields) == 0 { + return fmt.Sprintf("Could not find field %s in index", ds.ConfiguredFields.TimeField), "warning" + } + + timeFieldInfo, ok := fields[ds.ConfiguredFields.TimeField].(map[string]any) + if !ok { + return "Failed to parse time field info from response", "error" + } + + dateTypeField, ok := timeFieldInfo["date"].(map[string]any) + if !ok || dateTypeField == nil { + return fmt.Sprintf("Could not find time field '%s' with type date in index", ds.ConfiguredFields.TimeField), "warning" + } + + return "", "" +} diff --git a/pkg/tsdb/elasticsearch/healthcheck_test.go b/pkg/tsdb/elasticsearch/healthcheck_test.go index 93b6c1252cd..b3f6dc97c93 100644 --- a/pkg/tsdb/elasticsearch/healthcheck_test.go +++ b/pkg/tsdb/elasticsearch/healthcheck_test.go @@ -11,74 +11,132 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" "github.com/grafana/grafana-plugin-sdk-go/backend/log" + "github.com/grafana/grafana-plugin-sdk-go/experimental/featuretoggles" es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client" "github.com/stretchr/testify/assert" ) +var mockedCfg = backend.WithGrafanaConfig(context.Background(), backend.NewGrafanaCfg(map[string]string{featuretoggles.EnabledFeatures: "elasticsearchCrossClusterSearch"})) + func Test_Healthcheck_OK(t *testing.T) { - service := GetMockService(http.StatusOK, "200 OK") + service := GetMockService(http.StatusOK, "200 OK", `{"status":"green"}`, `{"fields":{"timestamp":{"date":{"metadata_field":true}}}}`) res, _ := service.CheckHealth(context.Background(), &backend.CheckHealthRequest{ PluginContext: backend.PluginContext{}, Headers: nil, }) assert.Equal(t, backend.HealthStatusOk, res.Status) - assert.Equal(t, "Elasticsearch data source is healthy", res.Message) + assert.Equal(t, "Elasticsearch data source is healthy.", res.Message) } func Test_Healthcheck_Timeout(t *testing.T) { - service := GetMockService(http.StatusRequestTimeout, "408 Request Timeout") + service := GetMockService(http.StatusRequestTimeout, "408 Request Timeout", `{"status":"red"}`, `{"fields":{"timestamp":{"date":{"metadata_field":true}}}}`) res, _ := service.CheckHealth(context.Background(), &backend.CheckHealthRequest{ PluginContext: backend.PluginContext{}, Headers: nil, }) assert.Equal(t, backend.HealthStatusError, res.Status) - assert.Equal(t, "Elasticsearch data source is not healthy", res.Message) + assert.Equal(t, "Health check failed: Elasticsearch data source is not healthy. Request timed out", res.Message) } func Test_Healthcheck_Error(t *testing.T) { - service := GetMockService(http.StatusBadGateway, "502 Bad Gateway") + service := GetMockService(http.StatusBadGateway, "502 Bad Gateway", `{"status":"red"}`, `{"fields":{"timestamp":{"date":{"metadata_field":true}}}}`) res, _ := service.CheckHealth(context.Background(), &backend.CheckHealthRequest{ PluginContext: backend.PluginContext{}, Headers: nil, }) assert.Equal(t, backend.HealthStatusError, res.Status) - assert.Equal(t, "Elasticsearch data source is not healthy. Status: 502 Bad Gateway", res.Message) + assert.Equal(t, "Health check failed: Elasticsearch data source is not healthy. Status: 502 Bad Gateway", res.Message) +} + +func Test_validateIndex_Warning_ErrorValidatingIndex(t *testing.T) { + service := GetMockService(http.StatusOK, "200 OK", `{"status":"green"}`, `{"error":{"reason":"index_not_found"}}`) + res, _ := service.CheckHealth(mockedCfg, &backend.CheckHealthRequest{ + PluginContext: backend.PluginContext{}, + Headers: nil, + }) + assert.Equal(t, backend.HealthStatusOk, res.Status) + assert.Equal(t, "Elasticsearch data source is healthy. Warning: Error validating index: index_not_found", res.Message) +} + +func Test_validateIndex_Warning_WrongTimestampType(t *testing.T) { + service := GetMockService(http.StatusOK, "200 OK", `{"status":"green"}`, `{"fields":{"timestamp":{"float":{"metadata_field":true}}}}`) + res, _ := service.CheckHealth(mockedCfg, &backend.CheckHealthRequest{ + PluginContext: backend.PluginContext{}, + Headers: nil, + }) + assert.Equal(t, backend.HealthStatusOk, res.Status) + assert.Equal(t, "Elasticsearch data source is healthy. Warning: Could not find time field 'timestamp' with type date in index", res.Message) +} +func Test_validateIndex_Error_FailedToUnmarshalValidateResponse(t *testing.T) { + service := GetMockService(http.StatusOK, "200 OK", `{"status":"green"}`, `\\\///{"fields":null}"`) + res, _ := service.CheckHealth(mockedCfg, &backend.CheckHealthRequest{ + PluginContext: backend.PluginContext{}, + Headers: nil, + }) + assert.Equal(t, backend.HealthStatusError, res.Status) + assert.Equal(t, "Failed to unmarshal field capabilities response", res.Message) +} +func Test_validateIndex_Success_SuccessValidatingIndex(t *testing.T) { + service := GetMockService(http.StatusOK, "200 OK", `{"status":"green"}`, `{"fields":{"timestamp":{"date":{"metadata_field":true}}}}`) + res, _ := service.CheckHealth(mockedCfg, &backend.CheckHealthRequest{ + PluginContext: backend.PluginContext{}, + Headers: nil, + }) + assert.Equal(t, backend.HealthStatusOk, res.Status) + assert.Equal(t, "Elasticsearch data source is healthy.", res.Message) } type FakeRoundTripper struct { - statusCode int - status string + statusCode int + status string + index int + elasticSearchResponse string + fieldCapsResponse string } func (fakeRoundTripper *FakeRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { var res *http.Response - if fakeRoundTripper.statusCode == http.StatusOK { + if fakeRoundTripper.index == 0 { + if fakeRoundTripper.statusCode == http.StatusOK { + res = &http.Response{ + StatusCode: http.StatusOK, + Status: "200 OK", + Body: io.NopCloser(bytes.NewBufferString(fakeRoundTripper.elasticSearchResponse)), + } + } else { + res = &http.Response{ + StatusCode: fakeRoundTripper.statusCode, + Status: fakeRoundTripper.status, + Body: io.NopCloser(bytes.NewBufferString(fakeRoundTripper.elasticSearchResponse)), + } + } + fakeRoundTripper.index++ + } else { res = &http.Response{ StatusCode: http.StatusOK, Status: "200 OK", - Body: io.NopCloser(bytes.NewBufferString("{\"status\":\"green\"}")), - } - } else { - res = &http.Response{ - StatusCode: fakeRoundTripper.statusCode, - Status: fakeRoundTripper.status, - Body: io.NopCloser(bytes.NewBufferString("{\"status\":\"red\"}")), + Body: io.NopCloser(bytes.NewBufferString(fakeRoundTripper.fieldCapsResponse)), } } return res, nil } type FakeInstanceManager struct { - statusCode int - status string + statusCode int + status string + elasticSearchResponse string + fieldCapsResponse string } func (fakeInstanceManager *FakeInstanceManager) Get(tx context.Context, pluginContext backend.PluginContext) (instancemgmt.Instance, error) { httpClient, _ := httpclient.New(httpclient.Options{}) - httpClient.Transport = &FakeRoundTripper{statusCode: fakeInstanceManager.statusCode, status: fakeInstanceManager.status} + httpClient.Transport = &FakeRoundTripper{statusCode: fakeInstanceManager.statusCode, status: fakeInstanceManager.status, elasticSearchResponse: fakeInstanceManager.elasticSearchResponse, fieldCapsResponse: fakeInstanceManager.fieldCapsResponse, index: 0} return es.DatasourceInfo{ HTTPClient: httpClient, + ConfiguredFields: es.ConfiguredFields{ + TimeField: "timestamp", + }, }, nil } @@ -86,9 +144,9 @@ func (*FakeInstanceManager) Do(_ context.Context, _ backend.PluginContext, _ ins return nil } -func GetMockService(statusCode int, status string) *Service { +func GetMockService(statusCode int, status string, elasticSearchResponse string, fieldCapsResponse string) *Service { return &Service{ - im: &FakeInstanceManager{statusCode: statusCode, status: status}, + im: &FakeInstanceManager{statusCode: statusCode, status: status, elasticSearchResponse: elasticSearchResponse, fieldCapsResponse: fieldCapsResponse}, logger: log.New(), } } diff --git a/public/app/plugins/datasource/elasticsearch/datasource.ts b/public/app/plugins/datasource/elasticsearch/datasource.ts index 99b68fb3aa8..34d22b1e33a 100644 --- a/public/app/plugins/datasource/elasticsearch/datasource.ts +++ b/public/app/plugins/datasource/elasticsearch/datasource.ts @@ -1,4 +1,4 @@ -import { cloneDeep, first as _first, isNumber, isString, map as _map, find, isObject } from 'lodash'; +import { cloneDeep, first as _first, isNumber, isString, map as _map, isObject } from 'lodash'; import { from, generate, lastValueFrom, Observable, of } from 'rxjs'; import { catchError, first, map, mergeMap, skipWhile, throwIfEmpty, tap } from 'rxjs/operators'; import { SemVer } from 'semver'; @@ -83,7 +83,7 @@ import { isElasticsearchResponseWithHits, ElasticsearchHits, } from './types'; -import { getScriptValue, isSupportedVersion, isTimeSeriesQuery, unsupportedVersionMessage } from './utils'; +import { getScriptValue, isTimeSeriesQuery } from './utils'; export const REF_ID_STARTER_LOG_VOLUME = 'log-volume-'; export const REF_ID_STARTER_LOG_SAMPLE = 'log-sample-'; @@ -441,37 +441,6 @@ export class ElasticDatasource return queries.map((q) => this.applyTemplateVariables(q, scopedVars, filters)); } - /** - * @todo Remove as we have health checks in the backend - */ - async testDatasource() { - // we explicitly ask for uncached, "fresh" data here - const dbVersion = await this.getDatabaseVersion(false); - // if we are not able to determine the elastic-version, we assume it is a good version. - const isSupported = dbVersion != null ? isSupportedVersion(dbVersion) : true; - const versionMessage = isSupported ? '' : `WARNING: ${unsupportedVersionMessage} `; - // validate that the index exist and has date field - return lastValueFrom( - this.getFields(['date']).pipe( - mergeMap((dateFields) => { - const timeField = find(dateFields, { text: this.timeField }); - if (!timeField) { - return of({ - status: 'error', - message: 'No date field named ' + this.timeField + ' found', - }); - } - return of({ status: 'success', message: `${versionMessage}Data source successfully connected.` }); - }), - catchError((err) => { - const infoInParentheses = err.message ? ` (${err.message})` : ''; - const message = `Unable to connect with Elasticsearch${infoInParentheses}. Please check the server logs for more details.`; - return of({ status: 'error', message }); - }) - ) - ); - } - // Private method used in `getTerms` to get the header for the Elasticsearch query private getQueryHeader(searchType: string, timeFrom?: DateTime, timeTo?: DateTime): string { const queryHeader = {