Advisor: Fixes for retry item logic (#109918)

This commit is contained in:
Andres Martinez Gotor
2025-08-26 10:44:47 +02:00
committed by GitHub
parent f144f74995
commit 07b208ac43
3 changed files with 123 additions and 4 deletions

View File

@ -78,7 +78,7 @@ func New(cfg app.Config) (app.App, error) {
}
}()
}
if req.Action == resource.AdmissionActionUpdate {
if req.Action == resource.AdmissionActionUpdate && retryAnnotationChanged(req.OldObject, req.Object) {
go func() {
logger := log.WithContext(ctx).With("check", check.ID())
logger.Debug("Updating check", "namespace", req.Object.GetNamespace(), "name", req.Object.GetName())

View File

@ -7,6 +7,7 @@ import (
"slices"
"strings"
"sync"
"time"
"github.com/grafana/grafana-app-sdk/logging"
"github.com/grafana/grafana-app-sdk/resource"
@ -15,6 +16,8 @@ import (
"github.com/grafana/grafana/pkg/services/contexthandler"
)
// retryAnnotationPollingInterval is how long waitForRetryAnnotation waits
// between reads of the check while the retry annotation is not yet visible.
// It is a variable rather than a constant so tests can shorten it.
var retryAnnotationPollingInterval = 1 * time.Second
func getCheck(obj resource.Object, checkMap map[string]checks.Check) (checks.Check, error) {
labels := obj.GetLabels()
objTypeLabel, ok := labels[checks.TypeLabel]
@ -100,13 +103,17 @@ func processCheckRetry(ctx context.Context, log logging.Logger, client resource.
status := checks.GetStatusAnnotation(obj)
if status == "" || status == checks.StatusAnnotationError {
// Check not processed yet or errored
log.Debug("Check not processed yet or errored, skipping retry", "check", obj.GetName(), "status", status)
return nil
}
// Get the item to retry from the annotation
itemToRetry := checks.GetRetryAnnotation(obj)
if itemToRetry == "" {
// No item to retry, nothing to do
log.Debug("No item to retry, skipping retry", "check", obj.GetName())
return nil
} else {
log.Debug("Item to retry found", "check", obj.GetName(), "item", itemToRetry)
}
c, ok := obj.(*advisorv0alpha1.Check)
if !ok {
@ -165,14 +172,23 @@ func processCheckRetry(ctx context.Context, log logging.Logger, client resource.
// Failure not in the list of items to retry, keep it
return false
})
// Wait for the retry annotation to be persisted before patching the object
err = waitForRetryAnnotation(ctx, log, client, obj, itemToRetry)
if err != nil {
return err
}
// Set the status
err = checks.SetStatus(ctx, client, obj, c.Status)
log.Debug("Status set", "check", obj.GetName(), "status.count", c.Status.Report.Count)
if err != nil {
return err
}
// Delete the retry annotation to mark the check as processed
annotations := checks.DeleteAnnotations(ctx, obj, []string{checks.RetryAnnotation})
err = checks.SetAnnotations(ctx, client, obj, annotations)
log.Debug("Annotations set", "check", obj.GetName(), "annotations", annotations)
return checks.SetAnnotations(ctx, client, obj, annotations)
return err
}
func runStepsInParallel(ctx context.Context, log logging.Logger, spec *advisorv0alpha1.CheckSpec, steps []checks.Step, items []any) ([]advisorv0alpha1.CheckReportFailure, error) {
@ -234,3 +250,47 @@ func filterSteps(checkType resource.Object, steps []checks.Step) ([]checks.Step,
}
return steps, nil
}
// retryAnnotationChanged reports whether an update carries a new retry request.
//
// It returns true when either object is nil (there is nothing to compare, so
// the update is treated as a change). Otherwise it returns true only if the
// new object has a non-empty retry annotation whose value differs from the
// one on the old object.
func retryAnnotationChanged(oldObj, newObj resource.Object) bool {
	if oldObj == nil || newObj == nil {
		return true
	}

	previous := oldObj.GetAnnotations()[checks.RetryAnnotation]
	requested := newObj.GetAnnotations()[checks.RetryAnnotation]

	// An empty annotation on the new object means no retry was requested,
	// regardless of what the old object carried.
	if requested == "" {
		return false
	}
	return previous != requested
}
// waitForRetryAnnotation polls the stored check until its retry annotation
// equals itemToRetry.
//
// The object is fetched once immediately and then re-fetched up to five more
// times, waiting retryAnnotationPollingInterval between fetches.
//
// It returns nil as soon as a fetched object carries the expected annotation.
// It returns an error when a fetch fails, when ctx is cancelled while waiting,
// or when the annotation still does not match after the last fetch.
func waitForRetryAnnotation(ctx context.Context, log logging.Logger, client resource.Client, obj resource.Object, itemToRetry string) error {
	// Number of re-fetches allowed after the initial one.
	const maxRetries = 5

	id := resource.Identifier{
		Namespace: obj.GetNamespace(),
		Name:      obj.GetName(),
	}

	for retries := 0; ; retries++ {
		currentObj, err := client.Get(ctx, id)
		if err != nil {
			return err
		}
		currentRetryAnnotation := checks.GetRetryAnnotation(currentObj)
		if currentRetryAnnotation == itemToRetry {
			log.Debug("Retry annotation persisted", "check", obj.GetName(), "item", itemToRetry)
			return nil
		}
		// Give up right away once the budget is spent: waiting another
		// interval without a follow-up fetch would only delay the error.
		if retries >= maxRetries {
			return fmt.Errorf("timeout waiting for retry annotation to be persisted")
		}

		log.Debug("Waiting for retry annotation to be persisted", "check", obj.GetName(), "item", itemToRetry, "currentRetryAnnotation", currentRetryAnnotation)

		// Wait for the next poll, but stop early if the caller cancels so the
		// goroutine does not linger for the whole polling budget.
		timer := time.NewTimer(retryAnnotationPollingInterval)
		select {
		case <-ctx.Done():
			timer.Stop()
			return fmt.Errorf("waiting for retry annotation to be persisted: %w", ctx.Err())
		case <-timer.C:
		}
	}
}

View File

@ -6,6 +6,7 @@ import (
"fmt"
"net/http"
"testing"
"time"
"github.com/grafana/grafana-app-sdk/logging"
"github.com/grafana/grafana-app-sdk/resource"
@ -249,7 +250,9 @@ func TestProcessCheckRetry_SkipMissingItem(t *testing.T) {
t.Fatal(err)
}
meta.SetCreatedBy("user:1")
client := &mockClient{}
client := &mockClient{
res: obj,
}
typesClient := &mockTypesClient{}
ctx := context.TODO()
@ -281,7 +284,9 @@ func TestProcessCheckRetry_Success(t *testing.T) {
t.Fatal(err)
}
meta.SetCreatedBy("user:1")
client := &mockClient{}
client := &mockClient{
res: obj,
}
typesClient := &mockTypesClient{}
ctx := context.TODO()
@ -296,6 +301,51 @@ func TestProcessCheckRetry_Success(t *testing.T) {
assert.Empty(t, obj.Status.Report.Failures)
}
// TestProcessCheckRetry_Success_Polling verifies that processCheckRetry
// succeeds when the first read of the check does not yet show the retry
// annotation and a later read does.
func TestProcessCheckRetry_Success_Polling(t *testing.T) {
	// Shorten the polling interval so the test does not wait a full second,
	// and restore it afterwards so other tests see the default value.
	originalInterval := retryAnnotationPollingInterval
	retryAnnotationPollingInterval = 1 * time.Millisecond
	t.Cleanup(func() { retryAnnotationPollingInterval = originalInterval })

	obj := &advisorv0alpha1.Check{}
	obj.SetAnnotations(map[string]string{
		checks.RetryAnnotation:  "item",
		checks.StatusAnnotation: checks.StatusAnnotationProcessed,
	})
	obj.Status.Report.Failures = []advisorv0alpha1.CheckReportFailure{
		{
			ItemID: "item",
			StepID: "step",
		},
	}
	meta, err := utils.MetaAccessor(obj)
	if err != nil {
		t.Fatal(err)
	}
	meta.SetCreatedBy("user:1")

	// staleReads counts how many times Get answered with an object that has
	// no retry annotation yet.
	staleReads := 0
	client := &mockClient{
		get: func(ctx context.Context, id resource.Identifier) (resource.Object, error) {
			if staleReads > 0 {
				// obj contains the retry annotation
				return obj, nil
			}
			staleReads++
			// First read: simulate the annotation not being persisted yet.
			oldObject := &advisorv0alpha1.Check{}
			oldObject.SetAnnotations(map[string]string{
				checks.RetryAnnotation: "",
			})
			return oldObject, nil
		},
	}
	typesClient := &mockTypesClient{}
	ctx := context.TODO()
	check := &mockCheck{
		items: []any{"item"},
	}

	err = processCheckRetry(ctx, logging.DefaultLogger, client, typesClient, obj, check)
	assert.NoError(t, err)
	// Exactly one stale read must have happened before the annotation was seen.
	assert.Equal(t, 1, staleReads)
}
func TestRunStepsInParallel_ConcurrentHeaderAccess(t *testing.T) {
// Create an HTTP request with headers to simulate the real scenario
req, err := http.NewRequest("GET", "/test", nil)
@ -360,6 +410,8 @@ func TestRunStepsInParallel_ConcurrentHeaderAccess(t *testing.T) {
// mockClient is a test double for resource.Client. It embeds the interface so
// that only the methods the tests exercise need to be defined on it.
type mockClient struct {
	resource.Client
	// values is not written in the visible part of this file; presumably
	// PatchInto records what it was asked to patch here — TODO confirm.
	values []any
	// res is the object Get returns when no get hook is set.
	res resource.Object
	// get, when non-nil, is called by Get in place of returning res.
	get func(ctx context.Context, id resource.Identifier) (resource.Object, error)
}
func (m *mockClient) PatchInto(ctx context.Context, id resource.Identifier, req resource.PatchRequest, opts resource.PatchOptions, obj resource.Object) error {
@ -368,6 +420,13 @@ func (m *mockClient) PatchInto(ctx context.Context, id resource.Identifier, req
return nil
}
// Get returns the canned object m.res with a nil error, unless a get hook is
// configured, in which case the call is delegated to that hook.
func (m *mockClient) Get(ctx context.Context, id resource.Identifier) (resource.Object, error) {
	if m.get == nil {
		return m.res, nil
	}
	return m.get(ctx, id)
}
type mockTypesClient struct {
resource.Client
res resource.Object