Elasticsearch: Remove frontend testDatasource method (#99894)
@@ -59,7 +59,7 @@ type Client interface {
 var NewClient = func(ctx context.Context, ds *DatasourceInfo, logger log.Logger) (Client, error) {
     logger = logger.FromContext(ctx).With("entity", "client")
 
-    ip, err := newIndexPattern(ds.Interval, ds.Database)
+    ip, err := NewIndexPattern(ds.Interval, ds.Database)
     if err != nil {
         logger.Error("Failed creating index pattern", "error", err, "interval", ds.Interval, "index", ds.Database)
         return nil, err
@@ -73,7 +73,7 @@ type IndexPattern interface {
     GetIndices(timeRange backend.TimeRange) ([]string, error)
 }
 
-var newIndexPattern = func(interval string, pattern string) (IndexPattern, error) {
+var NewIndexPattern = func(interval string, pattern string) (IndexPattern, error) {
     if interval == noInterval {
         return &staticIndexPattern{indexName: pattern}, nil
     }
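Renaming `newIndexPattern` to `NewIndexPattern` exports the constructor from the `client` package so the backend health check added later in this diff can resolve concrete index names itself. Below is a minimal sketch (not part of the commit) of how a caller outside the package might use it; it assumes the `es` import alias used elsewhere in this commit and the interval name `"Daily"`, which is not shown in these hunks.

```go
package main

import (
	"fmt"
	"time"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
	es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
)

func main() {
	// Build an index pattern for a daily-rotated index; an empty interval
	// (noInterval) would produce a static pattern instead.
	ip, err := es.NewIndexPattern("Daily", "[logs-]YYYY.MM.DD")
	if err != nil {
		panic(err)
	}

	// Resolve the concrete index names covered by a time range.
	now := time.Now().UTC()
	indices, err := ip.GetIndices(backend.TimeRange{From: now.Add(-24 * time.Hour), To: now})
	if err != nil {
		panic(err)
	}
	fmt.Println(indices) // e.g. [logs-2025.01.01 logs-2025.01.02]
}
```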
@@ -290,7 +290,7 @@ func TestIndexPattern(t *testing.T) {
 func indexPatternScenario(t *testing.T, interval string, pattern string, timeRange backend.TimeRange, fn func(indices []string)) {
     testName := fmt.Sprintf("Index pattern (interval=%s, index=%s", interval, pattern)
     t.Run(testName, func(t *testing.T) {
-        ip, err := newIndexPattern(interval, pattern)
+        ip, err := NewIndexPattern(interval, pattern)
         require.NoError(t, err)
         require.NotNil(t, ip)
         indices, err := ip.GetIndices(timeRange)
@@ -8,12 +8,15 @@ import (
     "net/http"
     "net/url"
     "path"
+    "strings"
     "time"
 
     "github.com/grafana/grafana-plugin-sdk-go/backend"
-    "github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
+    es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
 )
 
+const ErrorBodyMaxSize = 200
+
 func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
     logger := s.logger.FromContext(ctx)
 
@@ -22,59 +25,56 @@ func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthReque
         logger.Error("Failed to get data source info", "error", err)
         return &backend.CheckHealthResult{
             Status: backend.HealthStatusUnknown,
-            Message: "Failed to get data source info",
-        }, err
+            Message: "Health check failed: Failed to get data source info",
+        }, nil
     }
 
-    esUrl, err := url.Parse(ds.URL)
+    healthStatusUrl, err := url.Parse(ds.URL)
     if err != nil {
-        logger.Error("Failed to parse data source URL", "error", err, "url", ds.URL)
+        logger.Error("Failed to parse data source URL", "error", err)
         return &backend.CheckHealthResult{
             Status: backend.HealthStatusUnknown,
             Message: "Failed to parse data source URL",
-        }, err
+        }, nil
     }
 
-    esUrl.Path = path.Join(esUrl.Path, "_cluster/health")
-    esUrl.RawQuery = "wait_for_status=yellow"
+    // check that ES is healthy
+    healthStatusUrl.Path = path.Join(healthStatusUrl.Path, "_cluster/health")
+    healthStatusUrl.RawQuery = "wait_for_status=yellow"
 
-    request, err := http.NewRequestWithContext(ctx, "GET", esUrl.String(), nil)
+    request, err := http.NewRequestWithContext(ctx, http.MethodGet, healthStatusUrl.String(), nil)
     if err != nil {
-        logger.Error("Failed to create request", "error", err, "url", esUrl.String())
+        logger.Error("Failed to create request", "error", err, "url", healthStatusUrl.String())
         return &backend.CheckHealthResult{
             Status: backend.HealthStatusUnknown,
             Message: "Failed to create request",
-        }, err
+        }, nil
     }
 
     start := time.Now()
-    logger.Debug("Sending healthcheck request to Elasticsearch", "url", esUrl.String())
+    logger.Debug("Sending healthcheck request to Elasticsearch", "url", healthStatusUrl.String())
     response, err := ds.HTTPClient.Do(request)
 
     if err != nil {
-        logger.Error("Failed to do healthcheck request", "error", err, "url", esUrl.String())
-        if backend.IsDownstreamHTTPError(err) {
-            err = errorsource.DownstreamError(err, false)
-        }
+        logger.Error("Failed to connect to Elasticsearch", "error", err, "url", healthStatusUrl.String())
         return &backend.CheckHealthResult{
-            Status: backend.HealthStatusUnknown,
-            Message: "Failed to do healthcheck request",
-        }, err
+            Status: backend.HealthStatusError,
+            Message: "Health check failed: Failed to connect to Elasticsearch",
+        }, nil
     }
 
     if response.StatusCode == http.StatusRequestTimeout {
         return &backend.CheckHealthResult{
             Status: backend.HealthStatusError,
-            Message: "Elasticsearch data source is not healthy",
+            Message: "Health check failed: Elasticsearch data source is not healthy. Request timed out",
         }, nil
     }
 
     if response.StatusCode >= 400 {
-        errWithSource := errorsource.SourceError(backend.ErrorSourceFromHTTPStatus(response.StatusCode), fmt.Errorf("unexpected status code: %d", response.StatusCode), false)
         return &backend.CheckHealthResult{
             Status: backend.HealthStatusError,
-            Message: fmt.Sprintf("Elasticsearch data source is not healthy. Status: %s", response.Status),
-        }, errWithSource
+            Message: fmt.Sprintf("Health check failed: Elasticsearch data source is not healthy. Status: %s", response.Status),
+        }, nil
     }
 
     logger.Info("Response received from Elasticsearch", "statusCode", response.StatusCode, "status", "ok", "duration", time.Since(start))
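A note on this hunk: every failure path now returns `nil` as the Go error and carries the failure inside the `CheckHealthResult` (with a "Health check failed:" prefix), so the message can be shown to the user in the data source settings instead of propagating as a plugin error; the `errorsource` wrapping is dropped along the way. The sketch below illustrates that convention only; `runCheck` and `doCheck` are hypothetical names, not part of the commit.

```go
package example

import (
	"github.com/grafana/grafana-plugin-sdk-go/backend"
)

// runCheck illustrates the convention adopted above: failures are reported
// through the CheckHealthResult, and the returned Go error stays nil.
func runCheck(doCheck func() error) (*backend.CheckHealthResult, error) {
	if err := doCheck(); err != nil {
		return &backend.CheckHealthResult{
			Status:  backend.HealthStatusError,
			Message: "Health check failed: " + err.Error(),
		}, nil // nil, not err: the message is what the user sees
	}
	return &backend.CheckHealthResult{
		Status:  backend.HealthStatusOk,
		Message: "Elasticsearch data source is healthy.",
	}, nil
}
```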
@@ -90,31 +90,131 @@ func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthReque
         logger.Error("Error reading response body bytes", "error", err)
         return &backend.CheckHealthResult{
             Status: backend.HealthStatusUnknown,
-            Message: "Failed to read response",
-        }, err
+            Message: "Health check failed: Failed to read response",
+        }, nil
     }
 
     jsonData := map[string]any{}
 
     err = json.Unmarshal(body, &jsonData)
     if err != nil {
-        logger.Error("Error during json unmarshal of the body", "error", err)
+        truncatedBody := string(body)
+        if len(truncatedBody) > ErrorBodyMaxSize {
+            truncatedBody = truncatedBody[:ErrorBodyMaxSize] + "..."
+        }
         return &backend.CheckHealthResult{
             Status: backend.HealthStatusUnknown,
-            Message: "Failed to unmarshal response",
-        }, err
+            Message: fmt.Sprintf("Health check failed: Failed to parse response from Elasticsearch. Response received: %s", truncatedBody),
+        }, nil
     }
 
-    status := backend.HealthStatusOk
-    message := "Elasticsearch data source is healthy"
-
     if jsonData["status"] == "red" {
-        status = backend.HealthStatusError
-        message = "Elasticsearch data source is not healthy"
+        return &backend.CheckHealthResult{
+            Status: backend.HealthStatusError,
+            Message: "Health check failed: Elasticsearch data source is not healthy",
+        }, nil
+    }
+
+    successMessage := "Elasticsearch data source is healthy."
+    indexWarningMessage := ""
+
+    // validate index and time field
+    cfg := backend.GrafanaConfigFromContext(ctx)
+    crossClusterSearchEnabled := cfg.FeatureToggles().IsEnabled("elasticsearchCrossClusterSearch")
+
+    if crossClusterSearchEnabled {
+        message, level := validateIndex(ctx, ds)
+        if level == "warning" {
+            indexWarningMessage = message
+        }
+        if level == "error" {
+            return &backend.CheckHealthResult{
+                Status: backend.HealthStatusError,
+                Message: message,
+            }, nil
+        }
+    }
+
+    if indexWarningMessage != "" {
+        successMessage = fmt.Sprintf("%s Warning: %s", successMessage, indexWarningMessage)
     }
 
     return &backend.CheckHealthResult{
-        Status: status,
-        Message: message,
+        Status: backend.HealthStatusOk,
+        Message: successMessage,
     }, nil
 }
+
+func validateIndex(ctx context.Context, ds *es.DatasourceInfo) (message string, level string) {
+    // validate that the index exist and has date field
+    ip, err := es.NewIndexPattern(ds.Interval, ds.Database)
+    if err != nil {
+        return fmt.Sprintf("Failed to get build index pattern: %s", err), "error"
+    }
+
+    indices, err := ip.GetIndices(backend.TimeRange{
+        From: time.Now().UTC(),
+        To: time.Now().UTC(),
+    })
+    if err != nil {
+        return fmt.Sprintf("Failed to get index pattern: %s", err), "error"
+    }
+
+    indexList := strings.Join(indices, ",")
+
+    validateUrl := fmt.Sprintf("%s/%s/_field_caps?fields=%s", ds.URL, indexList, ds.ConfiguredFields.TimeField)
+    if indexList == "" || strings.Replace(indexList, ",", "", -1) == "" {
+        validateUrl = fmt.Sprintf("%s/_field_caps?fields=%s", ds.URL, ds.ConfiguredFields.TimeField)
+    }
+
+    request, err := http.NewRequestWithContext(ctx, http.MethodGet, validateUrl, nil)
+    if err != nil {
+        return fmt.Sprint("Failed to create request", "error", err, "url", validateUrl), "error"
+    }
+    response, err := ds.HTTPClient.Do(request)
+    if err != nil {
+        return fmt.Sprint("Failed to fetch field capabilities", "error", err, "url", validateUrl), "error"
+    }
+    defer func() {
+        if err := response.Body.Close(); err != nil {
+            backend.Logger.Warn("Failed to close response body", "error", err)
+        }
+    }()
+
+    fieldCaps := map[string]any{}
+    body, err := io.ReadAll(response.Body)
+    if err != nil {
+        return "Could not read response body while checking time field", "error"
+    }
+    err = json.Unmarshal(body, &fieldCaps)
+    if err != nil {
+        return "Failed to unmarshal field capabilities response", "error"
+    }
+    if fieldCaps["error"] != nil {
+        if errorMessage, ok := fieldCaps["error"].(map[string]any)["reason"].(string); ok {
+            return fmt.Sprintf("Error validating index: %s", errorMessage), "warning"
+        } else {
+            return "Error validating index", "warning"
+        }
+    }
+
+    fields, ok := fieldCaps["fields"].(map[string]any)
+    if !ok {
+        return "Failed to parse fields from response", "error"
+    }
+    if len(fields) == 0 {
+        return fmt.Sprintf("Could not find field %s in index", ds.ConfiguredFields.TimeField), "warning"
+    }
+
+    timeFieldInfo, ok := fields[ds.ConfiguredFields.TimeField].(map[string]any)
+    if !ok {
+        return "Failed to parse time field info from response", "error"
+    }
+
+    dateTypeField, ok := timeFieldInfo["date"].(map[string]any)
+    if !ok || dateTypeField == nil {
+        return fmt.Sprintf("Could not find time field '%s' with type date in index", ds.ConfiguredFields.TimeField), "warning"
+    }
+
+    return "", ""
+}
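The new `validateIndex` helper walks the `_field_caps` response as a generic `map[string]any`. The standalone sketch below (not part of the commit) shows just that navigation, reusing the same JSON fixture the tests later in this diff stub out; the hard-coded `"timestamp"` field name mirrors the test setup.

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Sample _field_caps payload; shape taken from the test fixtures below.
	body := []byte(`{"fields":{"timestamp":{"date":{"metadata_field":true}}}}`)

	fieldCaps := map[string]any{}
	if err := json.Unmarshal(body, &fieldCaps); err != nil {
		panic(err)
	}

	// Same navigation as validateIndex: fields -> <configured time field> -> "date".
	fields, ok := fieldCaps["fields"].(map[string]any)
	if !ok || len(fields) == 0 {
		fmt.Println("warning: time field not found in index")
		return
	}
	timeFieldInfo, _ := fields["timestamp"].(map[string]any)
	if _, hasDate := timeFieldInfo["date"].(map[string]any); hasDate {
		fmt.Println("timestamp is mapped as a date field")
	} else {
		fmt.Println("warning: timestamp exists but is not a date field")
	}
}
```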
@@ -11,74 +11,132 @@ import (
     "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
     "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
     "github.com/grafana/grafana-plugin-sdk-go/backend/log"
+    "github.com/grafana/grafana-plugin-sdk-go/experimental/featuretoggles"
     es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
     "github.com/stretchr/testify/assert"
 )
 
+var mockedCfg = backend.WithGrafanaConfig(context.Background(), backend.NewGrafanaCfg(map[string]string{featuretoggles.EnabledFeatures: "elasticsearchCrossClusterSearch"}))
+
 func Test_Healthcheck_OK(t *testing.T) {
-    service := GetMockService(http.StatusOK, "200 OK")
+    service := GetMockService(http.StatusOK, "200 OK", `{"status":"green"}`, `{"fields":{"timestamp":{"date":{"metadata_field":true}}}}`)
     res, _ := service.CheckHealth(context.Background(), &backend.CheckHealthRequest{
         PluginContext: backend.PluginContext{},
         Headers: nil,
     })
     assert.Equal(t, backend.HealthStatusOk, res.Status)
-    assert.Equal(t, "Elasticsearch data source is healthy", res.Message)
+    assert.Equal(t, "Elasticsearch data source is healthy.", res.Message)
 }
 
 func Test_Healthcheck_Timeout(t *testing.T) {
-    service := GetMockService(http.StatusRequestTimeout, "408 Request Timeout")
+    service := GetMockService(http.StatusRequestTimeout, "408 Request Timeout", `{"status":"red"}`, `{"fields":{"timestamp":{"date":{"metadata_field":true}}}}`)
     res, _ := service.CheckHealth(context.Background(), &backend.CheckHealthRequest{
         PluginContext: backend.PluginContext{},
        Headers: nil,
     })
     assert.Equal(t, backend.HealthStatusError, res.Status)
-    assert.Equal(t, "Elasticsearch data source is not healthy", res.Message)
+    assert.Equal(t, "Health check failed: Elasticsearch data source is not healthy. Request timed out", res.Message)
 }
 
 func Test_Healthcheck_Error(t *testing.T) {
-    service := GetMockService(http.StatusBadGateway, "502 Bad Gateway")
+    service := GetMockService(http.StatusBadGateway, "502 Bad Gateway", `{"status":"red"}`, `{"fields":{"timestamp":{"date":{"metadata_field":true}}}}`)
     res, _ := service.CheckHealth(context.Background(), &backend.CheckHealthRequest{
         PluginContext: backend.PluginContext{},
         Headers: nil,
     })
     assert.Equal(t, backend.HealthStatusError, res.Status)
-    assert.Equal(t, "Elasticsearch data source is not healthy. Status: 502 Bad Gateway", res.Message)
+    assert.Equal(t, "Health check failed: Elasticsearch data source is not healthy. Status: 502 Bad Gateway", res.Message)
+}
+
+func Test_validateIndex_Warning_ErrorValidatingIndex(t *testing.T) {
+    service := GetMockService(http.StatusOK, "200 OK", `{"status":"green"}`, `{"error":{"reason":"index_not_found"}}`)
+    res, _ := service.CheckHealth(mockedCfg, &backend.CheckHealthRequest{
+        PluginContext: backend.PluginContext{},
+        Headers: nil,
+    })
+    assert.Equal(t, backend.HealthStatusOk, res.Status)
+    assert.Equal(t, "Elasticsearch data source is healthy. Warning: Error validating index: index_not_found", res.Message)
+}
+
+func Test_validateIndex_Warning_WrongTimestampType(t *testing.T) {
+    service := GetMockService(http.StatusOK, "200 OK", `{"status":"green"}`, `{"fields":{"timestamp":{"float":{"metadata_field":true}}}}`)
+    res, _ := service.CheckHealth(mockedCfg, &backend.CheckHealthRequest{
+        PluginContext: backend.PluginContext{},
+        Headers: nil,
+    })
+    assert.Equal(t, backend.HealthStatusOk, res.Status)
+    assert.Equal(t, "Elasticsearch data source is healthy. Warning: Could not find time field 'timestamp' with type date in index", res.Message)
+}
+func Test_validateIndex_Error_FailedToUnmarshalValidateResponse(t *testing.T) {
+    service := GetMockService(http.StatusOK, "200 OK", `{"status":"green"}`, `\\\///{"fields":null}"`)
+    res, _ := service.CheckHealth(mockedCfg, &backend.CheckHealthRequest{
+        PluginContext: backend.PluginContext{},
+        Headers: nil,
+    })
+    assert.Equal(t, backend.HealthStatusError, res.Status)
+    assert.Equal(t, "Failed to unmarshal field capabilities response", res.Message)
+}
+func Test_validateIndex_Success_SuccessValidatingIndex(t *testing.T) {
+    service := GetMockService(http.StatusOK, "200 OK", `{"status":"green"}`, `{"fields":{"timestamp":{"date":{"metadata_field":true}}}}`)
+    res, _ := service.CheckHealth(mockedCfg, &backend.CheckHealthRequest{
+        PluginContext: backend.PluginContext{},
+        Headers: nil,
+    })
+    assert.Equal(t, backend.HealthStatusOk, res.Status)
+    assert.Equal(t, "Elasticsearch data source is healthy.", res.Message)
 }
 
 type FakeRoundTripper struct {
     statusCode int
     status string
+    index int
+    elasticSearchResponse string
+    fieldCapsResponse string
 }
 
 func (fakeRoundTripper *FakeRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
     var res *http.Response
-    if fakeRoundTripper.statusCode == http.StatusOK {
+    if fakeRoundTripper.index == 0 {
+        if fakeRoundTripper.statusCode == http.StatusOK {
+            res = &http.Response{
+                StatusCode: http.StatusOK,
+                Status: "200 OK",
+                Body: io.NopCloser(bytes.NewBufferString(fakeRoundTripper.elasticSearchResponse)),
+            }
+        } else {
+            res = &http.Response{
+                StatusCode: fakeRoundTripper.statusCode,
+                Status: fakeRoundTripper.status,
+                Body: io.NopCloser(bytes.NewBufferString(fakeRoundTripper.elasticSearchResponse)),
+            }
+        }
+        fakeRoundTripper.index++
+    } else {
         res = &http.Response{
             StatusCode: http.StatusOK,
             Status: "200 OK",
-            Body: io.NopCloser(bytes.NewBufferString("{\"status\":\"green\"}")),
-        }
-    } else {
-        res = &http.Response{
-            StatusCode: fakeRoundTripper.statusCode,
-            Status: fakeRoundTripper.status,
-            Body: io.NopCloser(bytes.NewBufferString("{\"status\":\"red\"}")),
+            Body: io.NopCloser(bytes.NewBufferString(fakeRoundTripper.fieldCapsResponse)),
         }
     }
     return res, nil
 }
 
 type FakeInstanceManager struct {
     statusCode int
     status string
+    elasticSearchResponse string
+    fieldCapsResponse string
 }
 
 func (fakeInstanceManager *FakeInstanceManager) Get(tx context.Context, pluginContext backend.PluginContext) (instancemgmt.Instance, error) {
     httpClient, _ := httpclient.New(httpclient.Options{})
-    httpClient.Transport = &FakeRoundTripper{statusCode: fakeInstanceManager.statusCode, status: fakeInstanceManager.status}
+    httpClient.Transport = &FakeRoundTripper{statusCode: fakeInstanceManager.statusCode, status: fakeInstanceManager.status, elasticSearchResponse: fakeInstanceManager.elasticSearchResponse, fieldCapsResponse: fakeInstanceManager.fieldCapsResponse, index: 0}
 
     return es.DatasourceInfo{
         HTTPClient: httpClient,
+        ConfiguredFields: es.ConfiguredFields{
+            TimeField: "timestamp",
+        },
     }, nil
 }
 
@@ -86,9 +144,9 @@ func (*FakeInstanceManager) Do(_ context.Context, _ backend.PluginContext, _ ins
     return nil
 }
 
-func GetMockService(statusCode int, status string) *Service {
+func GetMockService(statusCode int, status string, elasticSearchResponse string, fieldCapsResponse string) *Service {
     return &Service{
-        im: &FakeInstanceManager{statusCode: statusCode, status: status},
+        im: &FakeInstanceManager{statusCode: statusCode, status: status, elasticSearchResponse: elasticSearchResponse, fieldCapsResponse: fieldCapsResponse},
         logger: log.New(),
     }
 }
@@ -1,4 +1,4 @@
-import { cloneDeep, first as _first, isNumber, isString, map as _map, find, isObject } from 'lodash';
+import { cloneDeep, first as _first, isNumber, isString, map as _map, isObject } from 'lodash';
 import { from, generate, lastValueFrom, Observable, of } from 'rxjs';
 import { catchError, first, map, mergeMap, skipWhile, throwIfEmpty, tap } from 'rxjs/operators';
 import { SemVer } from 'semver';
@@ -83,7 +83,7 @@ import {
   isElasticsearchResponseWithHits,
   ElasticsearchHits,
 } from './types';
-import { getScriptValue, isSupportedVersion, isTimeSeriesQuery, unsupportedVersionMessage } from './utils';
+import { getScriptValue, isTimeSeriesQuery } from './utils';
 
 export const REF_ID_STARTER_LOG_VOLUME = 'log-volume-';
 export const REF_ID_STARTER_LOG_SAMPLE = 'log-sample-';
@@ -441,37 +441,6 @@ export class ElasticDatasource
     return queries.map((q) => this.applyTemplateVariables(q, scopedVars, filters));
   }
 
-  /**
-   * @todo Remove as we have health checks in the backend
-   */
-  async testDatasource() {
-    // we explicitly ask for uncached, "fresh" data here
-    const dbVersion = await this.getDatabaseVersion(false);
-    // if we are not able to determine the elastic-version, we assume it is a good version.
-    const isSupported = dbVersion != null ? isSupportedVersion(dbVersion) : true;
-    const versionMessage = isSupported ? '' : `WARNING: ${unsupportedVersionMessage} `;
-    // validate that the index exist and has date field
-    return lastValueFrom(
-      this.getFields(['date']).pipe(
-        mergeMap((dateFields) => {
-          const timeField = find(dateFields, { text: this.timeField });
-          if (!timeField) {
-            return of({
-              status: 'error',
-              message: 'No date field named ' + this.timeField + ' found',
-            });
-          }
-          return of({ status: 'success', message: `${versionMessage}Data source successfully connected.` });
-        }),
-        catchError((err) => {
-          const infoInParentheses = err.message ? ` (${err.message})` : '';
-          const message = `Unable to connect with Elasticsearch${infoInParentheses}. Please check the server logs for more details.`;
-          return of({ status: 'error', message });
-        })
-      )
-    );
-  }
-
   // Private method used in `getTerms` to get the header for the Elasticsearch query
   private getQueryHeader(searchType: string, timeFrom?: DateTime, timeTo?: DateTime): string {
     const queryHeader = {