mirror of
https://github.com/grafana/grafana.git
synced 2025-08-01 17:42:12 +08:00
Elasticsearch and Cloudwatch: Replace deprecated experimental/errorsource (#97363)
* Update elasticsearch * Update azure monitor * Update test * Revert "Update azure monitor" This reverts commit 21b53845b85c3a43d679d729c33879fdc3e9f027. * Update cloudwatch
This commit is contained in:
@ -11,7 +11,6 @@ import (
|
|||||||
"github.com/aws/aws-sdk-go/service/cloudwatch"
|
"github.com/aws/aws-sdk-go/service/cloudwatch"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
|
|
||||||
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/kinds/dataquery"
|
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/kinds/dataquery"
|
||||||
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils"
|
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils"
|
||||||
)
|
)
|
||||||
@ -36,7 +35,7 @@ func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, pluginC
|
|||||||
if model.Period != nil && *model.Period != "" {
|
if model.Period != nil && *model.Period != "" {
|
||||||
p, err := strconv.ParseInt(*model.Period, 10, 64)
|
p, err := strconv.ParseInt(*model.Period, 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errorsource.DownstreamError(fmt.Errorf("query period must be an int"), false)
|
return nil, backend.DownstreamError(fmt.Errorf("query period must be an int"))
|
||||||
}
|
}
|
||||||
period = p
|
period = p
|
||||||
}
|
}
|
||||||
@ -82,13 +81,13 @@ func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, pluginC
|
|||||||
}
|
}
|
||||||
resp, err := cli.DescribeAlarms(params)
|
resp, err := cli.DescribeAlarms(params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
result = errorsource.AddDownstreamErrorToResponse(query.RefID, result, fmt.Errorf("%v: %w", "failed to call cloudwatch:DescribeAlarms", err))
|
result.Responses[query.RefID] = backend.ErrorResponseWithErrorSource(backend.DownstreamError(fmt.Errorf("%v: %w", "failed to call cloudwatch:DescribeAlarms", err)))
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
alarmNames = filterAlarms(resp, utils.Depointerizer(model.Namespace), metricName, dimensions, statistic, period)
|
alarmNames = filterAlarms(resp, utils.Depointerizer(model.Namespace), metricName, dimensions, statistic, period)
|
||||||
} else {
|
} else {
|
||||||
if model.Region == nil || model.Namespace == nil || metricName == "" || statistic == "" {
|
if model.Region == nil || model.Namespace == nil || metricName == "" || statistic == "" {
|
||||||
return result, errorsource.DownstreamError(errors.New("invalid annotations query"), false)
|
return result, backend.DownstreamError(errors.New("invalid annotations query"))
|
||||||
}
|
}
|
||||||
|
|
||||||
var qd []*cloudwatch.Dimension
|
var qd []*cloudwatch.Dimension
|
||||||
@ -113,7 +112,7 @@ func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, pluginC
|
|||||||
}
|
}
|
||||||
resp, err := cli.DescribeAlarmsForMetric(params)
|
resp, err := cli.DescribeAlarmsForMetric(params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
result = errorsource.AddDownstreamErrorToResponse(query.RefID, result, fmt.Errorf("%v: %w", "failed to call cloudwatch:DescribeAlarmsForMetric", err))
|
result.Responses[query.RefID] = backend.ErrorResponseWithErrorSource(backend.DownstreamError(fmt.Errorf("%v: %w", "failed to call cloudwatch:DescribeAlarmsForMetric", err)))
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
for _, alarm := range resp.MetricAlarms {
|
for _, alarm := range resp.MetricAlarms {
|
||||||
@ -131,7 +130,7 @@ func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, pluginC
|
|||||||
}
|
}
|
||||||
resp, err := cli.DescribeAlarmHistory(params)
|
resp, err := cli.DescribeAlarmHistory(params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
result = errorsource.AddDownstreamErrorToResponse(query.RefID, result, fmt.Errorf("%v: %w", "failed to call cloudwatch:DescribeAlarmHistory", err))
|
result.Responses[query.RefID] = backend.ErrorResponseWithErrorSource(backend.DownstreamError(fmt.Errorf("%v: %w", "failed to call cloudwatch:DescribeAlarmHistory", err)))
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
for _, history := range resp.AlarmHistoryItems {
|
for _, history := range resp.AlarmHistoryItems {
|
||||||
|
@ -7,7 +7,7 @@ import (
|
|||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
"github.com/aws/aws-sdk-go/service/cloudwatch"
|
"github.com/aws/aws-sdk-go/service/cloudwatch"
|
||||||
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
|
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||||
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/features"
|
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/features"
|
||||||
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils"
|
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils"
|
||||||
)
|
)
|
||||||
@ -28,7 +28,7 @@ func (e *cloudWatchExecutor) executeRequest(ctx context.Context, client cloudwat
|
|||||||
|
|
||||||
resp, err := client.GetMetricDataWithContext(ctx, metricDataInput)
|
resp, err := client.GetMetricDataWithContext(ctx, metricDataInput)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return mdo, errorsource.DownstreamError(err, false)
|
return mdo, backend.DownstreamError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
mdo = append(mdo, resp)
|
mdo = append(mdo, resp)
|
||||||
|
@ -17,7 +17,6 @@ import (
|
|||||||
"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
|
"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
|
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/features"
|
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/features"
|
||||||
@ -101,7 +100,7 @@ func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, req *backend
|
|||||||
dataframe, err := e.executeLogAction(ectx, logsQuery, query, req.PluginContext)
|
dataframe, err := e.executeLogAction(ectx, logsQuery, query, req.PluginContext)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
resultChan <- backend.Responses{
|
resultChan <- backend.Responses{
|
||||||
query.RefID: errorsource.Response(err),
|
query.RefID: backend.ErrorResponseWithErrorSource(err),
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -182,12 +181,12 @@ func (e *cloudWatchExecutor) handleGetLogEvents(ctx context.Context, logsClient
|
|||||||
}
|
}
|
||||||
|
|
||||||
if logsQuery.LogGroupName == "" {
|
if logsQuery.LogGroupName == "" {
|
||||||
return nil, errorsource.DownstreamError(fmt.Errorf("Error: Parameter 'logGroupName' is required"), false)
|
return nil, backend.DownstreamError(fmt.Errorf("Error: Parameter 'logGroupName' is required"))
|
||||||
}
|
}
|
||||||
queryRequest.SetLogGroupName(logsQuery.LogGroupName)
|
queryRequest.SetLogGroupName(logsQuery.LogGroupName)
|
||||||
|
|
||||||
if logsQuery.LogStreamName == "" {
|
if logsQuery.LogStreamName == "" {
|
||||||
return nil, errorsource.DownstreamError(fmt.Errorf("Error: Parameter 'logStreamName' is required"), false)
|
return nil, backend.DownstreamError(fmt.Errorf("Error: Parameter 'logStreamName' is required"))
|
||||||
}
|
}
|
||||||
queryRequest.SetLogStreamName(logsQuery.LogStreamName)
|
queryRequest.SetLogStreamName(logsQuery.LogStreamName)
|
||||||
|
|
||||||
@ -201,7 +200,7 @@ func (e *cloudWatchExecutor) handleGetLogEvents(ctx context.Context, logsClient
|
|||||||
|
|
||||||
logEvents, err := logsClient.GetLogEventsWithContext(ctx, queryRequest)
|
logEvents, err := logsClient.GetLogEventsWithContext(ctx, queryRequest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errorsource.DownstreamError(err, false)
|
return nil, backend.DownstreamError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
messages := make([]*string, 0)
|
messages := make([]*string, 0)
|
||||||
@ -230,7 +229,7 @@ func (e *cloudWatchExecutor) executeStartQuery(ctx context.Context, logsClient c
|
|||||||
endTime := timeRange.To
|
endTime := timeRange.To
|
||||||
|
|
||||||
if !startTime.Before(endTime) {
|
if !startTime.Before(endTime) {
|
||||||
return nil, errorsource.DownstreamError(fmt.Errorf("invalid time range: start time must be before end time"), false)
|
return nil, backend.DownstreamError(fmt.Errorf("invalid time range: start time must be before end time"))
|
||||||
}
|
}
|
||||||
if logsQuery.QueryLanguage == nil {
|
if logsQuery.QueryLanguage == nil {
|
||||||
cwli := dataquery.LogsQueryLanguageCWLI
|
cwli := dataquery.LogsQueryLanguageCWLI
|
||||||
@ -290,7 +289,7 @@ func (e *cloudWatchExecutor) executeStartQuery(ctx context.Context, logsClient c
|
|||||||
e.logger.FromContext(ctx).Debug("ExecuteStartQuery rate exceeded", "err", awsErr)
|
e.logger.FromContext(ctx).Debug("ExecuteStartQuery rate exceeded", "err", awsErr)
|
||||||
err = &AWSError{Code: throttlingException, Message: err.Error()}
|
err = &AWSError{Code: throttlingException, Message: err.Error()}
|
||||||
}
|
}
|
||||||
err = errorsource.DownstreamError(err, false)
|
err = backend.DownstreamError(err)
|
||||||
}
|
}
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
@ -335,7 +334,7 @@ func (e *cloudWatchExecutor) executeStopQuery(ctx context.Context, logsClient cl
|
|||||||
response = &cloudwatchlogs.StopQueryOutput{Success: aws.Bool(false)}
|
response = &cloudwatchlogs.StopQueryOutput{Success: aws.Bool(false)}
|
||||||
err = nil
|
err = nil
|
||||||
} else {
|
} else {
|
||||||
err = errorsource.DownstreamError(err, false)
|
err = backend.DownstreamError(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -365,7 +364,7 @@ func (e *cloudWatchExecutor) executeGetQueryResults(ctx context.Context, logsCli
|
|||||||
if errors.As(err, &awsErr) {
|
if errors.As(err, &awsErr) {
|
||||||
err = &AWSError{Code: awsErr.Code(), Message: err.Error()}
|
err = &AWSError{Code: awsErr.Code(), Message: err.Error()}
|
||||||
}
|
}
|
||||||
err = errorsource.DownstreamError(err, false)
|
err = backend.DownstreamError(err)
|
||||||
}
|
}
|
||||||
return getQueryResultsResponse, err
|
return getQueryResultsResponse, err
|
||||||
}
|
}
|
||||||
|
@ -11,7 +11,6 @@ import (
|
|||||||
"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
|
"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
|
|
||||||
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/kinds/dataquery"
|
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/kinds/dataquery"
|
||||||
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/models"
|
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/models"
|
||||||
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils"
|
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils"
|
||||||
@ -56,9 +55,9 @@ var executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req *
|
|||||||
}
|
}
|
||||||
|
|
||||||
getQueryResultsOutput, err := e.syncQuery(ctx, logsClient, q, logsQuery, instance.Settings.LogsTimeout.Duration)
|
getQueryResultsOutput, err := e.syncQuery(ctx, logsClient, q, logsQuery, instance.Settings.LogsTimeout.Duration)
|
||||||
var sourceError errorsource.Error
|
var sourceError backend.ErrorWithSource
|
||||||
if errors.As(err, &sourceError) {
|
if errors.As(err, &sourceError) {
|
||||||
errorsource.AddErrorToResponse(refId, resp, sourceError)
|
resp.Responses[refId] = backend.ErrorResponseWithErrorSource(sourceError)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -16,7 +16,6 @@ import (
|
|||||||
|
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
|
|
||||||
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/kinds/dataquery"
|
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/kinds/dataquery"
|
||||||
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils"
|
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils"
|
||||||
)
|
)
|
||||||
@ -311,7 +310,7 @@ func (q *CloudWatchQuery) migrateLegacyQuery(query metricsDataQuery) {
|
|||||||
func (q *CloudWatchQuery) validateAndSetDefaults(refId string, metricsDataQuery metricsDataQuery, startTime, endTime time.Time,
|
func (q *CloudWatchQuery) validateAndSetDefaults(refId string, metricsDataQuery metricsDataQuery, startTime, endTime time.Time,
|
||||||
defaultRegionValue string, crossAccountQueryingEnabled bool) error {
|
defaultRegionValue string, crossAccountQueryingEnabled bool) error {
|
||||||
if metricsDataQuery.Statistic == nil && metricsDataQuery.Statistics == nil {
|
if metricsDataQuery.Statistic == nil && metricsDataQuery.Statistics == nil {
|
||||||
return errorsource.DownstreamError(fmt.Errorf("query must have either statistic or statistics field"), false)
|
return backend.DownstreamError(fmt.Errorf("query must have either statistic or statistics field"))
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
@ -8,7 +8,6 @@ import (
|
|||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
|
|
||||||
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/features"
|
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/features"
|
||||||
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/models"
|
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/models"
|
||||||
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils"
|
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils"
|
||||||
@ -24,13 +23,13 @@ func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *ba
|
|||||||
resp := backend.NewQueryDataResponse()
|
resp := backend.NewQueryDataResponse()
|
||||||
|
|
||||||
if len(req.Queries) == 0 {
|
if len(req.Queries) == 0 {
|
||||||
return nil, errorsource.DownstreamError(fmt.Errorf("request contains no queries"), false)
|
return nil, backend.DownstreamError(fmt.Errorf("request contains no queries"))
|
||||||
}
|
}
|
||||||
// startTime and endTime are always the same for all queries
|
// startTime and endTime are always the same for all queries
|
||||||
startTime := req.Queries[0].TimeRange.From
|
startTime := req.Queries[0].TimeRange.From
|
||||||
endTime := req.Queries[0].TimeRange.To
|
endTime := req.Queries[0].TimeRange.To
|
||||||
if !startTime.Before(endTime) {
|
if !startTime.Before(endTime) {
|
||||||
return nil, errorsource.DownstreamError(fmt.Errorf("invalid time range: start time must be before end time"), false)
|
return nil, backend.DownstreamError(fmt.Errorf("invalid time range: start time must be before end time"))
|
||||||
}
|
}
|
||||||
|
|
||||||
instance, err := e.getInstance(ctx, req.PluginContext)
|
instance, err := e.getInstance(ctx, req.PluginContext)
|
||||||
@ -129,7 +128,7 @@ func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *ba
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := eg.Wait(); err != nil {
|
if err := eg.Wait(); err != nil {
|
||||||
dataResponse := errorsource.Response(fmt.Errorf("metric request error: %w", err))
|
dataResponse := backend.ErrorResponseWithErrorSource(fmt.Errorf("metric request error: %w", err))
|
||||||
resultChan <- &responseWrapper{
|
resultChan <- &responseWrapper{
|
||||||
RefId: getQueryRefIdFromErrorString(err.Error(), requestQueries),
|
RefId: getQueryRefIdFromErrorString(err.Error(), requestQueries),
|
||||||
DataResponse: &dataResponse,
|
DataResponse: &dataResponse,
|
||||||
|
@ -17,9 +17,9 @@ import (
|
|||||||
"go.opentelemetry.io/otel/codes"
|
"go.opentelemetry.io/otel/codes"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
|
||||||
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
|
||||||
exp "github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Used in logging to mark a stage
|
// Used in logging to mark a stage
|
||||||
@ -182,9 +182,9 @@ func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearch
|
|||||||
status = "cancelled"
|
status = "cancelled"
|
||||||
}
|
}
|
||||||
lp := []any{"error", err, "status", status, "duration", time.Since(start), "stage", StageDatabaseRequest}
|
lp := []any{"error", err, "status", status, "duration", time.Since(start), "stage", StageDatabaseRequest}
|
||||||
sourceErr := exp.Error{}
|
sourceErr := backend.ErrorWithSource{}
|
||||||
if errors.As(err, &sourceErr) {
|
if errors.As(err, &sourceErr) {
|
||||||
lp = append(lp, "statusSource", sourceErr.Source())
|
lp = append(lp, "statusSource", sourceErr.ErrorSource())
|
||||||
}
|
}
|
||||||
if clientRes != nil {
|
if clientRes != nil {
|
||||||
lp = append(lp, "statusCode", clientRes.StatusCode)
|
lp = append(lp, "statusCode", clientRes.StatusCode)
|
||||||
|
@ -11,7 +11,6 @@ import (
|
|||||||
|
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
|
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||||
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
|
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
|
||||||
@ -52,7 +51,8 @@ func (e *elasticsearchDataQuery) execute() (*backend.QueryDataResponse, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
mq, _ := json.Marshal(e.dataQueries)
|
mq, _ := json.Marshal(e.dataQueries)
|
||||||
e.logger.Error("Failed to parse queries", "error", err, "queries", string(mq), "queriesLength", len(queries), "duration", time.Since(start), "stage", es.StagePrepareRequest)
|
e.logger.Error("Failed to parse queries", "error", err, "queries", string(mq), "queriesLength", len(queries), "duration", time.Since(start), "stage", es.StagePrepareRequest)
|
||||||
return errorsource.AddPluginErrorToResponse(e.dataQueries[0].RefID, response, err), nil
|
response.Responses[e.dataQueries[0].RefID] = backend.ErrorResponseWithErrorSource(err)
|
||||||
|
return response, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ms := e.client.MultiSearch()
|
ms := e.client.MultiSearch()
|
||||||
@ -63,7 +63,8 @@ func (e *elasticsearchDataQuery) execute() (*backend.QueryDataResponse, error) {
|
|||||||
if err := e.processQuery(q, ms, from, to); err != nil {
|
if err := e.processQuery(q, ms, from, to); err != nil {
|
||||||
mq, _ := json.Marshal(q)
|
mq, _ := json.Marshal(q)
|
||||||
e.logger.Error("Failed to process query to multisearch request builder", "error", err, "query", string(mq), "queriesLength", len(queries), "duration", time.Since(start), "stage", es.StagePrepareRequest)
|
e.logger.Error("Failed to process query to multisearch request builder", "error", err, "query", string(mq), "queriesLength", len(queries), "duration", time.Since(start), "stage", es.StagePrepareRequest)
|
||||||
return errorsource.AddPluginErrorToResponse(q.RefID, response, err), nil
|
response.Responses[q.RefID] = backend.ErrorResponseWithErrorSource(err)
|
||||||
|
return response, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -71,21 +72,28 @@ func (e *elasticsearchDataQuery) execute() (*backend.QueryDataResponse, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
mqs, _ := json.Marshal(e.dataQueries)
|
mqs, _ := json.Marshal(e.dataQueries)
|
||||||
e.logger.Error("Failed to build multisearch request", "error", err, "queriesLength", len(queries), "queries", string(mqs), "duration", time.Since(start), "stage", es.StagePrepareRequest)
|
e.logger.Error("Failed to build multisearch request", "error", err, "queriesLength", len(queries), "queries", string(mqs), "duration", time.Since(start), "stage", es.StagePrepareRequest)
|
||||||
return errorsource.AddPluginErrorToResponse(e.dataQueries[0].RefID, response, err), nil
|
response.Responses[e.dataQueries[0].RefID] = backend.ErrorResponseWithErrorSource(err)
|
||||||
|
return response, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
e.logger.Info("Prepared request", "queriesLength", len(queries), "duration", time.Since(start), "stage", es.StagePrepareRequest)
|
e.logger.Info("Prepared request", "queriesLength", len(queries), "duration", time.Since(start), "stage", es.StagePrepareRequest)
|
||||||
res, err := e.client.ExecuteMultisearch(req)
|
res, err := e.client.ExecuteMultisearch(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if backend.IsDownstreamHTTPError(err) {
|
if backend.IsDownstreamHTTPError(err) {
|
||||||
err = errorsource.DownstreamError(err, false)
|
err = backend.DownstreamError(err)
|
||||||
}
|
}
|
||||||
return errorsource.AddErrorToResponse(e.dataQueries[0].RefID, response, err), nil
|
response.Responses[e.dataQueries[0].RefID] = backend.ErrorResponseWithErrorSource(err)
|
||||||
|
return response, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if res.Status >= 400 {
|
if res.Status >= 400 {
|
||||||
errWithSource := errorsource.SourceError(backend.ErrorSourceFromHTTPStatus(res.Status), fmt.Errorf("unexpected status code: %d", res.Status), false)
|
statusErr := fmt.Errorf("unexpected status code: %d", res.Status)
|
||||||
return errorsource.AddErrorToResponse(e.dataQueries[0].RefID, response, errWithSource), nil
|
if backend.ErrorSourceFromHTTPStatus(res.Status) == backend.ErrorSourceDownstream {
|
||||||
|
response.Responses[e.dataQueries[0].RefID] = backend.ErrorResponseWithErrorSource(backend.DownstreamError(statusErr))
|
||||||
|
} else {
|
||||||
|
response.Responses[e.dataQueries[0].RefID] = backend.ErrorResponseWithErrorSource(backend.PluginError(statusErr))
|
||||||
|
}
|
||||||
|
return response, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return parseResponse(e.ctx, res.Responses, queries, e.client.GetConfiguredFields(), e.keepLabelsInResponse, e.logger)
|
return parseResponse(e.ctx, res.Responses, queries, e.client.GetConfiguredFields(), e.keepLabelsInResponse, e.logger)
|
||||||
@ -94,8 +102,7 @@ func (e *elasticsearchDataQuery) execute() (*backend.QueryDataResponse, error) {
|
|||||||
func (e *elasticsearchDataQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilder, from, to int64) error {
|
func (e *elasticsearchDataQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilder, from, to int64) error {
|
||||||
err := isQueryWithError(q)
|
err := isQueryWithError(q)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = errorsource.DownstreamError(fmt.Errorf("received invalid query. %w", err), false)
|
return backend.DownstreamError(fmt.Errorf("received invalid query. %w", err))
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
defaultTimeField := e.client.GetConfiguredFields().TimeField
|
defaultTimeField := e.client.GetConfiguredFields().TimeField
|
||||||
|
@ -1431,7 +1431,6 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) {
|
|||||||
"query": "foo",
|
"query": "foo",
|
||||||
}`, from, to)
|
}`, from, to)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, res.Responses["A"].ErrorSource, backend.ErrorSourcePlugin)
|
|
||||||
require.Equal(t, res.Responses["A"].Error.Error(), "invalid character '}' looking for beginning of object key string")
|
require.Equal(t, res.Responses["A"].Error.Error(), "invalid character '}' looking for beginning of object key string")
|
||||||
}))
|
}))
|
||||||
})
|
})
|
||||||
|
@ -19,7 +19,6 @@ import (
|
|||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
||||||
exp "github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
|
|
||||||
|
|
||||||
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
|
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
|
||||||
)
|
)
|
||||||
@ -98,11 +97,11 @@ func newInstanceSettings(httpClientProvider *httpclient.Provider) datasource.Ins
|
|||||||
|
|
||||||
timeField, ok := jsonData["timeField"].(string)
|
timeField, ok := jsonData["timeField"].(string)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, exp.DownstreamError(errors.New("timeField cannot be cast to string"), false)
|
return nil, backend.DownstreamError(errors.New("timeField cannot be cast to string"))
|
||||||
}
|
}
|
||||||
|
|
||||||
if timeField == "" {
|
if timeField == "" {
|
||||||
return nil, exp.DownstreamError(errors.New("elasticsearch time field name is required"), false)
|
return nil, backend.DownstreamError(errors.New("elasticsearch time field name is required"))
|
||||||
}
|
}
|
||||||
|
|
||||||
logLevelField, ok := jsonData["logLevelField"].(string)
|
logLevelField, ok := jsonData["logLevelField"].(string)
|
||||||
@ -221,9 +220,9 @@ func (s *Service) CallResource(ctx context.Context, req *backend.CallResourceReq
|
|||||||
status = "cancelled"
|
status = "cancelled"
|
||||||
}
|
}
|
||||||
lp := []any{"error", err, "status", status, "duration", time.Since(start), "stage", es.StageDatabaseRequest, "resourcePath", req.Path}
|
lp := []any{"error", err, "status", status, "duration", time.Since(start), "stage", es.StageDatabaseRequest, "resourcePath", req.Path}
|
||||||
sourceErr := exp.Error{}
|
sourceErr := backend.ErrorWithSource{}
|
||||||
if errors.As(err, &sourceErr) {
|
if errors.As(err, &sourceErr) {
|
||||||
lp = append(lp, "statusSource", sourceErr.Source())
|
lp = append(lp, "statusSource", sourceErr.ErrorSource())
|
||||||
}
|
}
|
||||||
if response != nil {
|
if response != nil {
|
||||||
lp = append(lp, "statusCode", response.StatusCode)
|
lp = append(lp, "statusCode", response.StatusCode)
|
||||||
|
@ -5,7 +5,6 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -121,7 +120,7 @@ func TestErrorTooManyDateHistogramBuckets(t *testing.T) {
|
|||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
require.Len(t, dataResponse.Frames, 0)
|
require.Len(t, dataResponse.Frames, 0)
|
||||||
require.ErrorContains(t, dataResponse.Error, "Trying to create too many buckets. Must be less than or equal to: [65536].")
|
require.ErrorContains(t, dataResponse.Error, "Trying to create too many buckets. Must be less than or equal to: [65536].")
|
||||||
var sourceErr errorsource.Error
|
var sourceErr backend.ErrorWithSource
|
||||||
ok = errors.As(dataResponse.Error, &sourceErr)
|
ok = errors.As(dataResponse.Error, &sourceErr)
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
require.Equal(t, sourceErr.ErrorSource().String(), "downstream")
|
require.Equal(t, sourceErr.ErrorSource().String(), "downstream")
|
||||||
|
@ -15,7 +15,6 @@ import (
|
|||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
|
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/codes"
|
"go.opentelemetry.io/otel/codes"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
@ -75,7 +74,7 @@ func parseResponse(ctx context.Context, responses []*es.SearchResponse, targets
|
|||||||
resSpan.End()
|
resSpan.End()
|
||||||
logger.Error("Processing error response from Elasticsearch", "error", string(me), "query", string(mt))
|
logger.Error("Processing error response from Elasticsearch", "error", string(me), "query", string(mt))
|
||||||
errResult := getErrorFromElasticResponse(res)
|
errResult := getErrorFromElasticResponse(res)
|
||||||
result.Responses[target.RefID] = errorsource.Response(errorsource.DownstreamError(errors.New(errResult), false))
|
result.Responses[target.RefID] = backend.ErrorResponseWithErrorSource(backend.DownstreamError(errors.New(errResult)))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user