From d004f8a98de3d714ebb6628e4179ab827a8f3d0c Mon Sep 17 00:00:00 2001 From: Alexander Weaver Date: Tue, 11 Jun 2024 10:37:10 -0500 Subject: [PATCH] Alerting: Recording rules understands errors embedded in dataframes (#88946) * Make MakeDependencyError public for tests in another package * Create tests for errors in eval results * Extract logic to pull frame errors out into exported function * Maybe we can drop cyclomatic complexity lint suppression now? * extract frame errors and fail recording rules if frames contain error * Fix up retry logic to actually work * Do not retry non retryable errors --- pkg/expr/errors.go | 2 +- pkg/expr/graph.go | 2 +- pkg/services/ngalert/eval/eval.go | 74 ++++++++++------- pkg/services/ngalert/eval/eval_test.go | 79 +++++++++++++++++++ .../ngalert/schedule/recording_rule.go | 62 +++++++++------ 5 files changed, 165 insertions(+), 54 deletions(-) diff --git a/pkg/expr/errors.go b/pkg/expr/errors.go index 63c216fb735..3f162b7026c 100644 --- a/pkg/expr/errors.go +++ b/pkg/expr/errors.go @@ -63,7 +63,7 @@ var DependencyError = errutil.NewBase( depErrStr, errutil.WithPublic(depErrStr)) -func makeDependencyError(refID, depRefID string) error { +func MakeDependencyError(refID, depRefID string) error { data := errutil.TemplateData{ Public: map[string]interface{}{ "refId": refID, diff --git a/pkg/expr/graph.go b/pkg/expr/graph.go index d3d25e8e13a..369ff116a94 100644 --- a/pkg/expr/graph.go +++ b/pkg/expr/graph.go @@ -90,7 +90,7 @@ func (dp *DataPipeline) execute(c context.Context, now time.Time, s *Service) (m if res, ok := vars[neededVar]; ok { if res.Error != nil { errResult := mathexp.Results{ - Error: makeDependencyError(node.RefID(), neededVar), + Error: MakeDependencyError(node.RefID(), neededVar), } vars[node.RefID()] = errResult hasDepError = true diff --git a/pkg/services/ngalert/eval/eval.go b/pkg/services/ngalert/eval/eval.go index 896e4c1c1b8..b22b4e3fa16 100644 --- a/pkg/services/ngalert/eval/eval.go +++ b/pkg/services/ngalert/eval/eval.go @@ -141,9 +141,6 @@ type ExecutionResults struct { // Results contains the results of all queries, reduce and math expressions Results map[string]data.Frames - // Errors contains a map of RefIDs that returned an error - Errors map[string]error - // NoData contains the DatasourceUID for RefIDs that returned no data. NoData map[string]string @@ -166,16 +163,12 @@ func (evalResults Results) HasErrors() bool { // HasNonRetryableErrors returns true if we have at least 1 result with: // 1. A `State` of `Error` // 2. The `Error` attribute is not nil -// 3. The `Error` type is of `&invalidEvalResultFormatError` or `ErrSeriesMustBeWide` +// 3. The `Error` matches IsNonRetryableError // Our thinking with this approach, is that we don't want to retry errors that have relation with invalid alert definition format. func (evalResults Results) HasNonRetryableErrors() bool { for _, r := range evalResults { if r.State == Error && r.Error != nil { - var nonRetryableError *invalidEvalResultFormatError - if errors.As(r.Error, &nonRetryableError) { - return true - } - if errors.Is(r.Error, expr.ErrSeriesMustBeWide) { + if IsNonRetryableError(r.Error) { return true } } @@ -183,6 +176,19 @@ func (evalResults Results) HasNonRetryableErrors() bool { return false } +// IsNonRetryableError indicates whether an error is considered persistent and not worth performing evaluation retries. +// Currently it is true if err is `&invalidEvalResultFormatError` or `ErrSeriesMustBeWide` +func IsNonRetryableError(err error) bool { + var nonRetryableError *invalidEvalResultFormatError + if errors.As(err, &nonRetryableError) { + return true + } + if errors.Is(err, expr.ErrSeriesMustBeWide) { + return true + } + return false +} + // HasErrors returns true when Results contains at least one element and all elements are errors func (evalResults Results) IsError() bool { for _, r := range evalResults { @@ -407,7 +413,6 @@ type NumberValueCapture struct { Value *float64 } -//nolint:gocyclo func queryDataResponseToExecutionResults(c models.Condition, execResp *backend.QueryDataResponse) ExecutionResults { // captures contains the values of all instant queries and expressions for each dimension captures := make(map[string]map[data.Fingerprint]NumberValueCapture) @@ -433,17 +438,10 @@ func queryDataResponseToExecutionResults(c models.Condition, execResp *backend.Q } result := ExecutionResults{Results: make(map[string]data.Frames)} - for refID, res := range execResp.Responses { - if res.Error != nil { - if result.Errors == nil { - result.Errors = make(map[string]error) - } - result.Errors[refID] = res.Error - if refID == c.Condition { - result.Error = res.Error - } - } + result.Error = FindConditionError(execResp, c.Condition) + + for refID, res := range execResp.Responses { // There are two possible frame formats for No Data: // // 1. A response with no frames @@ -526,30 +524,50 @@ func queryDataResponseToExecutionResults(c models.Condition, execResp *backend.Q } } + return result +} + +// FindConditionError extracts the error from a query response that caused the given condition to fail. +// If a condition failed because a node it depends on had an error, that error is returned instead. +// It returns nil if there are no errors related to the condition. +func FindConditionError(resp *backend.QueryDataResponse, condition string) error { + if resp == nil { + return nil + } + + errs := make(map[string]error) + for refID, node := range resp.Responses { + if node.Error != nil { + errs[refID] = node.Error + } + } + + conditionErr := errs[condition] + // If the error of the condition is an Error that indicates the condition failed // because one of its dependent query or expressions failed, then we follow // the dependency chain to an error that is not a dependency error. - if len(result.Errors) > 0 && result.Error != nil { - if errors.Is(result.Error, expr.DependencyError) { + if conditionErr != nil { + if errors.Is(conditionErr, expr.DependencyError) { var utilError errutil.Error - e := result.Error + e := conditionErr for { errors.As(e, &utilError) depRefID := utilError.PublicPayload["depRefId"].(string) - depError, ok := result.Errors[depRefID] + depError, ok := errs[depRefID] if !ok { - return result + return conditionErr } if !errors.Is(depError, expr.DependencyError) { - result.Error = depError - return result + conditionErr = depError + return conditionErr } e = depError } } } - return result + return conditionErr } // datasourceUIDsToRefIDs returns a sorted slice of Ref IDs for each Datasource UID. diff --git a/pkg/services/ngalert/eval/eval_test.go b/pkg/services/ngalert/eval/eval_test.go index 8888235fb72..8f0c0e473ad 100644 --- a/pkg/services/ngalert/eval/eval_test.go +++ b/pkg/services/ngalert/eval/eval_test.go @@ -894,6 +894,85 @@ func TestEvaluate(t *testing.T) { }, EvaluationString: "[ var='A' labels={foo=bar} value=10 ], [ var='B' labels={bar=baz, foo=bar} value=1 ]", }}, + }, { + name: "results contains error if condition frame has error", + cond: models.Condition{ + Condition: "B", + }, + resp: backend.QueryDataResponse{ + Responses: backend.Responses{ + "A": { + Frames: []*data.Frame{{ + RefID: "A", + Fields: []*data.Field{ + data.NewField( + "Value", + data.Labels{"foo": "bar"}, + []*float64{util.Pointer(10.0)}, + ), + }, + }}, + }, + "B": { + Frames: []*data.Frame{{ + RefID: "B", + Fields: []*data.Field{ + data.NewField( + "Value", + data.Labels{"foo": "bar", "bar": "baz"}, + []*float64{util.Pointer(1.0)}, + ), + }, + }}, + Error: errors.New("some frame error"), + }, + }, + }, + expected: Results{{ + State: Error, + Error: errors.New("some frame error"), + EvaluationString: "", + }}, + }, { + name: "results contain underlying error if condition frame has error that depends on another node", + cond: models.Condition{ + Condition: "B", + }, + resp: backend.QueryDataResponse{ + Responses: backend.Responses{ + "A": { + Frames: []*data.Frame{{ + RefID: "A", + Fields: []*data.Field{ + data.NewField( + "Value", + data.Labels{"foo": "bar"}, + []*float64{util.Pointer(10.0)}, + ), + }, + }}, + Error: errors.New("another error depends on me"), + }, + "B": { + Frames: []*data.Frame{{ + RefID: "B", + Fields: []*data.Field{ + data.NewField( + "Value", + data.Labels{"foo": "bar", "bar": "baz"}, + []*float64{util.Pointer(1.0)}, + ), + }, + }}, + Error: expr.MakeDependencyError("B", "A"), + }, + }, + }, + expected: Results{{ + State: Error, + Error: errors.New("another error depends on me"), + EvaluationString: "", + }}, }} for _, tc := range cases { diff --git a/pkg/services/ngalert/schedule/recording_rule.go b/pkg/services/ngalert/schedule/recording_rule.go index 01f4824e778..b84a98b6314 100644 --- a/pkg/services/ngalert/schedule/recording_rule.go +++ b/pkg/services/ngalert/schedule/recording_rule.go @@ -116,7 +116,10 @@ func (r *recordingRule) doEvaluate(ctx context.Context, ev *Evaluation) { logger := r.logger.FromContext(ctx).New("now", ev.scheduledAt, "fingerprint", ev.Fingerprint()) orgID := fmt.Sprint(ev.rule.OrgID) evalDuration := r.metrics.EvalDuration.WithLabelValues(orgID) + evalAttemptTotal := r.metrics.EvalAttemptTotal.WithLabelValues(orgID) + evalAttemptFailures := r.metrics.EvalAttemptFailures.WithLabelValues(orgID) evalTotal := r.metrics.EvalTotal.WithLabelValues(orgID) + evalTotalFailures := r.metrics.EvalFailures.WithLabelValues(orgID) evalStart := r.clock.Now() defer func() { @@ -139,6 +142,7 @@ func (r *recordingRule) doEvaluate(ctx context.Context, ev *Evaluation) { )) defer span.End() + var latestError error for attempt := int64(1); attempt <= r.maxAttempts; attempt++ { logger := logger.New("attempt", attempt) if ctx.Err() != nil { @@ -147,49 +151,59 @@ func (r *recordingRule) doEvaluate(ctx context.Context, ev *Evaluation) { return } + evalAttemptTotal.Inc() err := r.tryEvaluation(ctx, ev, logger) + latestError = err if err == nil { - return + break } logger.Error("Failed to evaluate rule", "attempt", attempt, "error", err) - select { - case <-ctx.Done(): - logger.Error("Context has been cancelled while backing off", "attempt", attempt) - return - case <-time.After(retryDelay): - continue + evalAttemptFailures.Inc() + + if eval.IsNonRetryableError(err) { + break } + + if attempt < r.maxAttempts { + select { + case <-ctx.Done(): + logger.Error("Context has been cancelled while backing off", "attempt", attempt) + return + case <-time.After(retryDelay): + continue + } + } + } + + if latestError != nil { + evalTotalFailures.Inc() + span.SetStatus(codes.Error, "rule evaluation failed") + span.RecordError(latestError) + if r.maxAttempts > 0 { + logger.Error("Recording rule evaluation failed after all attempts", "lastError", latestError) + } + } else { + logger.Debug("Recording rule evaluation succeeded") } } func (r *recordingRule) tryEvaluation(ctx context.Context, ev *Evaluation, logger log.Logger) error { - orgID := fmt.Sprint(ev.rule.OrgID) - evalAttemptTotal := r.metrics.EvalAttemptTotal.WithLabelValues(orgID) - evalAttemptFailures := r.metrics.EvalAttemptFailures.WithLabelValues(orgID) - evalTotalFailures := r.metrics.EvalFailures.WithLabelValues(orgID) - evalStart := r.clock.Now() evalCtx := eval.NewContext(ctx, SchedulerUserFor(ev.rule.OrgID)) result, err := r.buildAndExecutePipeline(ctx, evalCtx, ev, logger) evalDur := r.clock.Now().Sub(evalStart) - - evalAttemptTotal.Inc() - span := trace.SpanFromContext(ctx) - - // TODO: In some cases, err can be nil but the dataframe itself contains embedded error frames. Parse these out like we do when evaluating alert rules. - // TODO: (Maybe, refactor something in eval package so we can use shared code for this) if err != nil { - evalAttemptFailures.Inc() - // TODO: Only errors embedded in the frame can be considered retryable. - // TODO: Since we are not handling these yet per the above TODO, we can blindly consider all errors to be non-retryable for now, and just exit. - evalTotalFailures.Inc() - span.SetStatus(codes.Error, "rule evaluation failed") - span.RecordError(err) return fmt.Errorf("server side expressions pipeline returned an error: %w", err) } + // There might be errors in the pipeline results, even if the query succeeded. + if err := eval.FindConditionError(result, ev.rule.Record.From); err != nil { + return fmt.Errorf("the query failed with an error: %w", err) + } + logger.Info("Recording rule evaluated", "results", result, "duration", evalDur) + span := trace.SpanFromContext(ctx) span.AddEvent("rule evaluated", trace.WithAttributes( attribute.Int64("results", int64(len(result.Responses))), ))