mirror of
https://github.com/grafana/grafana.git
synced 2025-07-29 11:12:13 +08:00
221 lines
6.7 KiB
Go
221 lines
6.7 KiB
Go
package elasticsearch
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"path"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
|
|
)
|
|
|
|
// ErrorBodyMaxSize is the maximum number of bytes of an unparseable
// Elasticsearch response body that CheckHealth echoes back in its error
// message; longer bodies are cut at this length and suffixed with "...".
const ErrorBodyMaxSize = 200
|
|
|
|
func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
|
|
logger := s.logger.FromContext(ctx)
|
|
|
|
ds, err := s.getDSInfo(ctx, req.PluginContext)
|
|
if err != nil {
|
|
logger.Error("Failed to get data source info", "error", err)
|
|
return &backend.CheckHealthResult{
|
|
Status: backend.HealthStatusUnknown,
|
|
Message: "Health check failed: Failed to get data source info",
|
|
}, nil
|
|
}
|
|
|
|
healthStatusUrl, err := url.Parse(ds.URL)
|
|
if err != nil {
|
|
logger.Error("Failed to parse data source URL", "error", err)
|
|
return &backend.CheckHealthResult{
|
|
Status: backend.HealthStatusUnknown,
|
|
Message: "Failed to parse data source URL",
|
|
}, nil
|
|
}
|
|
|
|
// check that ES is healthy
|
|
healthStatusUrl.Path = path.Join(healthStatusUrl.Path, "_cluster/health")
|
|
healthStatusUrl.RawQuery = "wait_for_status=yellow"
|
|
|
|
request, err := http.NewRequestWithContext(ctx, http.MethodGet, healthStatusUrl.String(), nil)
|
|
if err != nil {
|
|
logger.Error("Failed to create request", "error", err, "url", healthStatusUrl.String())
|
|
return &backend.CheckHealthResult{
|
|
Status: backend.HealthStatusUnknown,
|
|
Message: "Failed to create request",
|
|
}, nil
|
|
}
|
|
|
|
start := time.Now()
|
|
logger.Debug("Sending healthcheck request to Elasticsearch", "url", healthStatusUrl.String())
|
|
response, err := ds.HTTPClient.Do(request)
|
|
|
|
if err != nil {
|
|
logger.Error("Failed to connect to Elasticsearch", "error", err, "url", healthStatusUrl.String())
|
|
return &backend.CheckHealthResult{
|
|
Status: backend.HealthStatusError,
|
|
Message: "Health check failed: Failed to connect to Elasticsearch",
|
|
}, nil
|
|
}
|
|
|
|
if response.StatusCode == http.StatusRequestTimeout {
|
|
return &backend.CheckHealthResult{
|
|
Status: backend.HealthStatusError,
|
|
Message: "Health check failed: Elasticsearch data source is not healthy. Request timed out",
|
|
}, nil
|
|
}
|
|
|
|
if response.StatusCode >= 400 {
|
|
return &backend.CheckHealthResult{
|
|
Status: backend.HealthStatusError,
|
|
Message: fmt.Sprintf("Health check failed: Elasticsearch data source is not healthy. Status: %s", response.Status),
|
|
}, nil
|
|
}
|
|
|
|
logger.Info("Response received from Elasticsearch", "statusCode", response.StatusCode, "status", "ok", "duration", time.Since(start))
|
|
|
|
defer func() {
|
|
if err := response.Body.Close(); err != nil {
|
|
logger.Warn("Failed to close response body", "error", err)
|
|
}
|
|
}()
|
|
|
|
body, err := io.ReadAll(response.Body)
|
|
if err != nil {
|
|
logger.Error("Error reading response body bytes", "error", err)
|
|
return &backend.CheckHealthResult{
|
|
Status: backend.HealthStatusUnknown,
|
|
Message: "Health check failed: Failed to read response",
|
|
}, nil
|
|
}
|
|
|
|
jsonData := map[string]any{}
|
|
|
|
err = json.Unmarshal(body, &jsonData)
|
|
if err != nil {
|
|
truncatedBody := string(body)
|
|
if len(truncatedBody) > ErrorBodyMaxSize {
|
|
truncatedBody = truncatedBody[:ErrorBodyMaxSize] + "..."
|
|
}
|
|
return &backend.CheckHealthResult{
|
|
Status: backend.HealthStatusUnknown,
|
|
Message: fmt.Sprintf("Health check failed: Failed to parse response from Elasticsearch. Response received: %s", truncatedBody),
|
|
}, nil
|
|
}
|
|
|
|
if jsonData["status"] == "red" {
|
|
return &backend.CheckHealthResult{
|
|
Status: backend.HealthStatusError,
|
|
Message: "Health check failed: Elasticsearch data source is not healthy",
|
|
}, nil
|
|
}
|
|
|
|
successMessage := "Elasticsearch data source is healthy."
|
|
indexWarningMessage := ""
|
|
|
|
// validate index and time field
|
|
cfg := backend.GrafanaConfigFromContext(ctx)
|
|
crossClusterSearchEnabled := cfg.FeatureToggles().IsEnabled("elasticsearchCrossClusterSearch")
|
|
|
|
if crossClusterSearchEnabled {
|
|
message, level := validateIndex(ctx, ds)
|
|
if level == "warning" {
|
|
indexWarningMessage = message
|
|
}
|
|
if level == "error" {
|
|
return &backend.CheckHealthResult{
|
|
Status: backend.HealthStatusError,
|
|
Message: message,
|
|
}, nil
|
|
}
|
|
}
|
|
|
|
if indexWarningMessage != "" {
|
|
successMessage = fmt.Sprintf("%s Warning: %s", successMessage, indexWarningMessage)
|
|
}
|
|
|
|
return &backend.CheckHealthResult{
|
|
Status: backend.HealthStatusOk,
|
|
Message: successMessage,
|
|
}, nil
|
|
}
|
|
|
|
func validateIndex(ctx context.Context, ds *es.DatasourceInfo) (message string, level string) {
|
|
// validate that the index exist and has date field
|
|
ip, err := es.NewIndexPattern(ds.Interval, ds.Database)
|
|
if err != nil {
|
|
return fmt.Sprintf("Failed to get build index pattern: %s", err), "error"
|
|
}
|
|
|
|
indices, err := ip.GetIndices(backend.TimeRange{
|
|
From: time.Now().UTC(),
|
|
To: time.Now().UTC(),
|
|
})
|
|
if err != nil {
|
|
return fmt.Sprintf("Failed to get index pattern: %s", err), "error"
|
|
}
|
|
|
|
indexList := strings.Join(indices, ",")
|
|
|
|
validateUrl := fmt.Sprintf("%s/%s/_field_caps?fields=%s", ds.URL, indexList, ds.ConfiguredFields.TimeField)
|
|
if indexList == "" || strings.ReplaceAll(indexList, ",", "") == "" {
|
|
validateUrl = fmt.Sprintf("%s/_field_caps?fields=%s", ds.URL, ds.ConfiguredFields.TimeField)
|
|
}
|
|
|
|
request, err := http.NewRequestWithContext(ctx, http.MethodGet, validateUrl, nil)
|
|
if err != nil {
|
|
return fmt.Sprint("Failed to create request", "error", err, "url", validateUrl), "error"
|
|
}
|
|
response, err := ds.HTTPClient.Do(request)
|
|
if err != nil {
|
|
return fmt.Sprint("Failed to fetch field capabilities", "error", err, "url", validateUrl), "error"
|
|
}
|
|
defer func() {
|
|
if err := response.Body.Close(); err != nil {
|
|
backend.Logger.Warn("Failed to close response body", "error", err)
|
|
}
|
|
}()
|
|
|
|
fieldCaps := map[string]any{}
|
|
body, err := io.ReadAll(response.Body)
|
|
if err != nil {
|
|
return "Could not read response body while checking time field", "error"
|
|
}
|
|
err = json.Unmarshal(body, &fieldCaps)
|
|
if err != nil {
|
|
return "Failed to unmarshal field capabilities response", "error"
|
|
}
|
|
if fieldCaps["error"] != nil {
|
|
if errorMessage, ok := fieldCaps["error"].(map[string]any)["reason"].(string); ok {
|
|
return fmt.Sprintf("Error validating index: %s", errorMessage), "warning"
|
|
} else {
|
|
return "Error validating index", "warning"
|
|
}
|
|
}
|
|
|
|
fields, ok := fieldCaps["fields"].(map[string]any)
|
|
if !ok {
|
|
return "Failed to parse fields from response", "error"
|
|
}
|
|
if len(fields) == 0 {
|
|
return fmt.Sprintf("Could not find field %s in index", ds.ConfiguredFields.TimeField), "warning"
|
|
}
|
|
|
|
timeFieldInfo, ok := fields[ds.ConfiguredFields.TimeField].(map[string]any)
|
|
if !ok {
|
|
return "Failed to parse time field info from response", "error"
|
|
}
|
|
|
|
dateTypeField, ok := timeFieldInfo["date"].(map[string]any)
|
|
if !ok || dateTypeField == nil {
|
|
return fmt.Sprintf("Could not find time field '%s' with type date in index", ds.ConfiguredFields.TimeField), "warning"
|
|
}
|
|
|
|
return "", ""
|
|
}
|