mirror of
https://github.com/grafana/grafana.git
synced 2025-07-31 04:52:24 +08:00
Alerting: Recording rules understand errors embedded in dataframes (#88946)
* Make MakeDependencyError public for tests in another package
* Create tests for errors in eval results
* Extract logic to pull frame errors out into exported function
* Maybe we can drop cyclomatic complexity lint suppression now?
* Extract frame errors and fail recording rules if frames contain errors
* Fix up retry logic to actually work
* Do not retry non-retryable errors
This commit is contained in:
@ -63,7 +63,7 @@ var DependencyError = errutil.NewBase(
|
|||||||
depErrStr,
|
depErrStr,
|
||||||
errutil.WithPublic(depErrStr))
|
errutil.WithPublic(depErrStr))
|
||||||
|
|
||||||
func makeDependencyError(refID, depRefID string) error {
|
func MakeDependencyError(refID, depRefID string) error {
|
||||||
data := errutil.TemplateData{
|
data := errutil.TemplateData{
|
||||||
Public: map[string]interface{}{
|
Public: map[string]interface{}{
|
||||||
"refId": refID,
|
"refId": refID,
|
||||||
|
@ -90,7 +90,7 @@ func (dp *DataPipeline) execute(c context.Context, now time.Time, s *Service) (m
|
|||||||
if res, ok := vars[neededVar]; ok {
|
if res, ok := vars[neededVar]; ok {
|
||||||
if res.Error != nil {
|
if res.Error != nil {
|
||||||
errResult := mathexp.Results{
|
errResult := mathexp.Results{
|
||||||
Error: makeDependencyError(node.RefID(), neededVar),
|
Error: MakeDependencyError(node.RefID(), neededVar),
|
||||||
}
|
}
|
||||||
vars[node.RefID()] = errResult
|
vars[node.RefID()] = errResult
|
||||||
hasDepError = true
|
hasDepError = true
|
||||||
|
@ -141,9 +141,6 @@ type ExecutionResults struct {
|
|||||||
// Results contains the results of all queries, reduce and math expressions
|
// Results contains the results of all queries, reduce and math expressions
|
||||||
Results map[string]data.Frames
|
Results map[string]data.Frames
|
||||||
|
|
||||||
// Errors contains a map of RefIDs that returned an error
|
|
||||||
Errors map[string]error
|
|
||||||
|
|
||||||
// NoData contains the DatasourceUID for RefIDs that returned no data.
|
// NoData contains the DatasourceUID for RefIDs that returned no data.
|
||||||
NoData map[string]string
|
NoData map[string]string
|
||||||
|
|
||||||
@ -166,20 +163,29 @@ func (evalResults Results) HasErrors() bool {
|
|||||||
// HasNonRetryableErrors returns true if we have at least 1 result with:
|
// HasNonRetryableErrors returns true if we have at least 1 result with:
|
||||||
// 1. A `State` of `Error`
|
// 1. A `State` of `Error`
|
||||||
// 2. The `Error` attribute is not nil
|
// 2. The `Error` attribute is not nil
|
||||||
// 3. The `Error` type is of `&invalidEvalResultFormatError` or `ErrSeriesMustBeWide`
|
// 3. The `Error` matches IsNonRetryableError
|
||||||
// Our thinking with this approach, is that we don't want to retry errors that have relation with invalid alert definition format.
|
// Our thinking with this approach, is that we don't want to retry errors that have relation with invalid alert definition format.
|
||||||
func (evalResults Results) HasNonRetryableErrors() bool {
|
func (evalResults Results) HasNonRetryableErrors() bool {
|
||||||
for _, r := range evalResults {
|
for _, r := range evalResults {
|
||||||
if r.State == Error && r.Error != nil {
|
if r.State == Error && r.Error != nil {
|
||||||
|
if IsNonRetryableError(r.Error) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsNonRetryableError indicates whether an error is considered persistent and not worth performing evaluation retries.
|
||||||
|
// Currently it is true if err is `&invalidEvalResultFormatError` or `ErrSeriesMustBeWide`
|
||||||
|
func IsNonRetryableError(err error) bool {
|
||||||
var nonRetryableError *invalidEvalResultFormatError
|
var nonRetryableError *invalidEvalResultFormatError
|
||||||
if errors.As(r.Error, &nonRetryableError) {
|
if errors.As(err, &nonRetryableError) {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if errors.Is(r.Error, expr.ErrSeriesMustBeWide) {
|
if errors.Is(err, expr.ErrSeriesMustBeWide) {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -407,7 +413,6 @@ type NumberValueCapture struct {
|
|||||||
Value *float64
|
Value *float64
|
||||||
}
|
}
|
||||||
|
|
||||||
//nolint:gocyclo
|
|
||||||
func queryDataResponseToExecutionResults(c models.Condition, execResp *backend.QueryDataResponse) ExecutionResults {
|
func queryDataResponseToExecutionResults(c models.Condition, execResp *backend.QueryDataResponse) ExecutionResults {
|
||||||
// captures contains the values of all instant queries and expressions for each dimension
|
// captures contains the values of all instant queries and expressions for each dimension
|
||||||
captures := make(map[string]map[data.Fingerprint]NumberValueCapture)
|
captures := make(map[string]map[data.Fingerprint]NumberValueCapture)
|
||||||
@ -433,17 +438,10 @@ func queryDataResponseToExecutionResults(c models.Condition, execResp *backend.Q
|
|||||||
}
|
}
|
||||||
|
|
||||||
result := ExecutionResults{Results: make(map[string]data.Frames)}
|
result := ExecutionResults{Results: make(map[string]data.Frames)}
|
||||||
for refID, res := range execResp.Responses {
|
|
||||||
if res.Error != nil {
|
|
||||||
if result.Errors == nil {
|
|
||||||
result.Errors = make(map[string]error)
|
|
||||||
}
|
|
||||||
result.Errors[refID] = res.Error
|
|
||||||
if refID == c.Condition {
|
|
||||||
result.Error = res.Error
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
result.Error = FindConditionError(execResp, c.Condition)
|
||||||
|
|
||||||
|
for refID, res := range execResp.Responses {
|
||||||
// There are two possible frame formats for No Data:
|
// There are two possible frame formats for No Data:
|
||||||
//
|
//
|
||||||
// 1. A response with no frames
|
// 1. A response with no frames
|
||||||
@ -526,30 +524,50 @@ func queryDataResponseToExecutionResults(c models.Condition, execResp *backend.Q
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindConditionError extracts the error from a query response that caused the given condition to fail.
|
||||||
|
// If a condition failed because a node it depends on had an error, that error is returned instead.
|
||||||
|
// It returns nil if there are no errors related to the condition.
|
||||||
|
func FindConditionError(resp *backend.QueryDataResponse, condition string) error {
|
||||||
|
if resp == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
errs := make(map[string]error)
|
||||||
|
for refID, node := range resp.Responses {
|
||||||
|
if node.Error != nil {
|
||||||
|
errs[refID] = node.Error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
conditionErr := errs[condition]
|
||||||
|
|
||||||
// If the error of the condition is an Error that indicates the condition failed
|
// If the error of the condition is an Error that indicates the condition failed
|
||||||
// because one of its dependent query or expressions failed, then we follow
|
// because one of its dependent query or expressions failed, then we follow
|
||||||
// the dependency chain to an error that is not a dependency error.
|
// the dependency chain to an error that is not a dependency error.
|
||||||
if len(result.Errors) > 0 && result.Error != nil {
|
if conditionErr != nil {
|
||||||
if errors.Is(result.Error, expr.DependencyError) {
|
if errors.Is(conditionErr, expr.DependencyError) {
|
||||||
var utilError errutil.Error
|
var utilError errutil.Error
|
||||||
e := result.Error
|
e := conditionErr
|
||||||
for {
|
for {
|
||||||
errors.As(e, &utilError)
|
errors.As(e, &utilError)
|
||||||
depRefID := utilError.PublicPayload["depRefId"].(string)
|
depRefID := utilError.PublicPayload["depRefId"].(string)
|
||||||
depError, ok := result.Errors[depRefID]
|
depError, ok := errs[depRefID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return result
|
return conditionErr
|
||||||
}
|
}
|
||||||
if !errors.Is(depError, expr.DependencyError) {
|
if !errors.Is(depError, expr.DependencyError) {
|
||||||
result.Error = depError
|
conditionErr = depError
|
||||||
return result
|
return conditionErr
|
||||||
}
|
}
|
||||||
e = depError
|
e = depError
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return result
|
return conditionErr
|
||||||
}
|
}
|
||||||
|
|
||||||
// datasourceUIDsToRefIDs returns a sorted slice of Ref IDs for each Datasource UID.
|
// datasourceUIDsToRefIDs returns a sorted slice of Ref IDs for each Datasource UID.
|
||||||
|
@ -894,6 +894,85 @@ func TestEvaluate(t *testing.T) {
|
|||||||
},
|
},
|
||||||
EvaluationString: "[ var='A' labels={foo=bar} value=10 ], [ var='B' labels={bar=baz, foo=bar} value=1 ]",
|
EvaluationString: "[ var='A' labels={foo=bar} value=10 ], [ var='B' labels={bar=baz, foo=bar} value=1 ]",
|
||||||
}},
|
}},
|
||||||
|
}, {
|
||||||
|
name: "results contains error if condition frame has error",
|
||||||
|
cond: models.Condition{
|
||||||
|
Condition: "B",
|
||||||
|
},
|
||||||
|
resp: backend.QueryDataResponse{
|
||||||
|
Responses: backend.Responses{
|
||||||
|
"A": {
|
||||||
|
Frames: []*data.Frame{{
|
||||||
|
RefID: "A",
|
||||||
|
Fields: []*data.Field{
|
||||||
|
data.NewField(
|
||||||
|
"Value",
|
||||||
|
data.Labels{"foo": "bar"},
|
||||||
|
[]*float64{util.Pointer(10.0)},
|
||||||
|
),
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
},
|
||||||
|
"B": {
|
||||||
|
Frames: []*data.Frame{{
|
||||||
|
RefID: "B",
|
||||||
|
Fields: []*data.Field{
|
||||||
|
data.NewField(
|
||||||
|
"Value",
|
||||||
|
data.Labels{"foo": "bar", "bar": "baz"},
|
||||||
|
[]*float64{util.Pointer(1.0)},
|
||||||
|
),
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
Error: errors.New("some frame error"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expected: Results{{
|
||||||
|
State: Error,
|
||||||
|
Error: errors.New("some frame error"),
|
||||||
|
EvaluationString: "",
|
||||||
|
}},
|
||||||
|
}, {
|
||||||
|
name: "results contain underlying error if condition frame has error that depends on another node",
|
||||||
|
cond: models.Condition{
|
||||||
|
Condition: "B",
|
||||||
|
},
|
||||||
|
resp: backend.QueryDataResponse{
|
||||||
|
Responses: backend.Responses{
|
||||||
|
"A": {
|
||||||
|
Frames: []*data.Frame{{
|
||||||
|
RefID: "A",
|
||||||
|
Fields: []*data.Field{
|
||||||
|
data.NewField(
|
||||||
|
"Value",
|
||||||
|
data.Labels{"foo": "bar"},
|
||||||
|
[]*float64{util.Pointer(10.0)},
|
||||||
|
),
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
Error: errors.New("another error depends on me"),
|
||||||
|
},
|
||||||
|
"B": {
|
||||||
|
Frames: []*data.Frame{{
|
||||||
|
RefID: "B",
|
||||||
|
Fields: []*data.Field{
|
||||||
|
data.NewField(
|
||||||
|
"Value",
|
||||||
|
data.Labels{"foo": "bar", "bar": "baz"},
|
||||||
|
[]*float64{util.Pointer(1.0)},
|
||||||
|
),
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
Error: expr.MakeDependencyError("B", "A"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expected: Results{{
|
||||||
|
State: Error,
|
||||||
|
Error: errors.New("another error depends on me"),
|
||||||
|
EvaluationString: "",
|
||||||
|
}},
|
||||||
}}
|
}}
|
||||||
|
|
||||||
for _, tc := range cases {
|
for _, tc := range cases {
|
||||||
|
@ -116,7 +116,10 @@ func (r *recordingRule) doEvaluate(ctx context.Context, ev *Evaluation) {
|
|||||||
logger := r.logger.FromContext(ctx).New("now", ev.scheduledAt, "fingerprint", ev.Fingerprint())
|
logger := r.logger.FromContext(ctx).New("now", ev.scheduledAt, "fingerprint", ev.Fingerprint())
|
||||||
orgID := fmt.Sprint(ev.rule.OrgID)
|
orgID := fmt.Sprint(ev.rule.OrgID)
|
||||||
evalDuration := r.metrics.EvalDuration.WithLabelValues(orgID)
|
evalDuration := r.metrics.EvalDuration.WithLabelValues(orgID)
|
||||||
|
evalAttemptTotal := r.metrics.EvalAttemptTotal.WithLabelValues(orgID)
|
||||||
|
evalAttemptFailures := r.metrics.EvalAttemptFailures.WithLabelValues(orgID)
|
||||||
evalTotal := r.metrics.EvalTotal.WithLabelValues(orgID)
|
evalTotal := r.metrics.EvalTotal.WithLabelValues(orgID)
|
||||||
|
evalTotalFailures := r.metrics.EvalFailures.WithLabelValues(orgID)
|
||||||
evalStart := r.clock.Now()
|
evalStart := r.clock.Now()
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
@ -139,6 +142,7 @@ func (r *recordingRule) doEvaluate(ctx context.Context, ev *Evaluation) {
|
|||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
var latestError error
|
||||||
for attempt := int64(1); attempt <= r.maxAttempts; attempt++ {
|
for attempt := int64(1); attempt <= r.maxAttempts; attempt++ {
|
||||||
logger := logger.New("attempt", attempt)
|
logger := logger.New("attempt", attempt)
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
@ -147,12 +151,21 @@ func (r *recordingRule) doEvaluate(ctx context.Context, ev *Evaluation) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
evalAttemptTotal.Inc()
|
||||||
err := r.tryEvaluation(ctx, ev, logger)
|
err := r.tryEvaluation(ctx, ev, logger)
|
||||||
|
latestError = err
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Error("Failed to evaluate rule", "attempt", attempt, "error", err)
|
logger.Error("Failed to evaluate rule", "attempt", attempt, "error", err)
|
||||||
|
evalAttemptFailures.Inc()
|
||||||
|
|
||||||
|
if eval.IsNonRetryableError(err) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if attempt < r.maxAttempts {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
logger.Error("Context has been cancelled while backing off", "attempt", attempt)
|
logger.Error("Context has been cancelled while backing off", "attempt", attempt)
|
||||||
@ -161,35 +174,36 @@ func (r *recordingRule) doEvaluate(ctx context.Context, ev *Evaluation) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if latestError != nil {
|
||||||
|
evalTotalFailures.Inc()
|
||||||
|
span.SetStatus(codes.Error, "rule evaluation failed")
|
||||||
|
span.RecordError(latestError)
|
||||||
|
if r.maxAttempts > 0 {
|
||||||
|
logger.Error("Recording rule evaluation failed after all attempts", "lastError", latestError)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.Debug("Recording rule evaluation succeeded")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *recordingRule) tryEvaluation(ctx context.Context, ev *Evaluation, logger log.Logger) error {
|
func (r *recordingRule) tryEvaluation(ctx context.Context, ev *Evaluation, logger log.Logger) error {
|
||||||
orgID := fmt.Sprint(ev.rule.OrgID)
|
|
||||||
evalAttemptTotal := r.metrics.EvalAttemptTotal.WithLabelValues(orgID)
|
|
||||||
evalAttemptFailures := r.metrics.EvalAttemptFailures.WithLabelValues(orgID)
|
|
||||||
evalTotalFailures := r.metrics.EvalFailures.WithLabelValues(orgID)
|
|
||||||
|
|
||||||
evalStart := r.clock.Now()
|
evalStart := r.clock.Now()
|
||||||
evalCtx := eval.NewContext(ctx, SchedulerUserFor(ev.rule.OrgID))
|
evalCtx := eval.NewContext(ctx, SchedulerUserFor(ev.rule.OrgID))
|
||||||
result, err := r.buildAndExecutePipeline(ctx, evalCtx, ev, logger)
|
result, err := r.buildAndExecutePipeline(ctx, evalCtx, ev, logger)
|
||||||
evalDur := r.clock.Now().Sub(evalStart)
|
evalDur := r.clock.Now().Sub(evalStart)
|
||||||
|
|
||||||
evalAttemptTotal.Inc()
|
|
||||||
span := trace.SpanFromContext(ctx)
|
|
||||||
|
|
||||||
// TODO: In some cases, err can be nil but the dataframe itself contains embedded error frames. Parse these out like we do when evaluating alert rules.
|
|
||||||
// TODO: (Maybe, refactor something in eval package so we can use shared code for this)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
evalAttemptFailures.Inc()
|
|
||||||
// TODO: Only errors embedded in the frame can be considered retryable.
|
|
||||||
// TODO: Since we are not handling these yet per the above TODO, we can blindly consider all errors to be non-retryable for now, and just exit.
|
|
||||||
evalTotalFailures.Inc()
|
|
||||||
span.SetStatus(codes.Error, "rule evaluation failed")
|
|
||||||
span.RecordError(err)
|
|
||||||
return fmt.Errorf("server side expressions pipeline returned an error: %w", err)
|
return fmt.Errorf("server side expressions pipeline returned an error: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// There might be errors in the pipeline results, even if the query succeeded.
|
||||||
|
if err := eval.FindConditionError(result, ev.rule.Record.From); err != nil {
|
||||||
|
return fmt.Errorf("the query failed with an error: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
logger.Info("Recording rule evaluated", "results", result, "duration", evalDur)
|
logger.Info("Recording rule evaluated", "results", result, "duration", evalDur)
|
||||||
|
span := trace.SpanFromContext(ctx)
|
||||||
span.AddEvent("rule evaluated", trace.WithAttributes(
|
span.AddEvent("rule evaluated", trace.WithAttributes(
|
||||||
attribute.Int64("results", int64(len(result.Responses))),
|
attribute.Int64("results", int64(len(result.Responses))),
|
||||||
))
|
))
|
||||||
|
Reference in New Issue
Block a user