diff --git a/apps/advisor/pkg/app/app.go b/apps/advisor/pkg/app/app.go index ba84a14f7f7..fae11df0c6c 100644 --- a/apps/advisor/pkg/app/app.go +++ b/apps/advisor/pkg/app/app.go @@ -78,7 +78,7 @@ func New(cfg app.Config) (app.App, error) { } }() } - if req.Action == resource.AdmissionActionUpdate { + if req.Action == resource.AdmissionActionUpdate && retryAnnotationChanged(req.OldObject, req.Object) { go func() { logger := log.WithContext(ctx).With("check", check.ID()) logger.Debug("Updating check", "namespace", req.Object.GetNamespace(), "name", req.Object.GetName()) diff --git a/apps/advisor/pkg/app/utils.go b/apps/advisor/pkg/app/utils.go index 80f71cc679e..4ab55391bb1 100644 --- a/apps/advisor/pkg/app/utils.go +++ b/apps/advisor/pkg/app/utils.go @@ -7,6 +7,7 @@ import ( "slices" "strings" "sync" + "time" "github.com/grafana/grafana-app-sdk/logging" "github.com/grafana/grafana-app-sdk/resource" @@ -15,6 +16,8 @@ import ( "github.com/grafana/grafana/pkg/services/contexthandler" ) +var retryAnnotationPollingInterval = 1 * time.Second + func getCheck(obj resource.Object, checkMap map[string]checks.Check) (checks.Check, error) { labels := obj.GetLabels() objTypeLabel, ok := labels[checks.TypeLabel] @@ -100,13 +103,17 @@ func processCheckRetry(ctx context.Context, log logging.Logger, client resource. status := checks.GetStatusAnnotation(obj) if status == "" || status == checks.StatusAnnotationError { // Check not processed yet or errored + log.Debug("Check not processed yet or errored, skipping retry", "check", obj.GetName(), "status", status) return nil } // Get the item to retry from the annotation itemToRetry := checks.GetRetryAnnotation(obj) if itemToRetry == "" { // No item to retry, nothing to do + log.Debug("No item to retry, skipping retry", "check", obj.GetName()) return nil + } else { + log.Debug("Item to retry found", "check", obj.GetName(), "item", itemToRetry) } c, ok := obj.(*advisorv0alpha1.Check) if !ok { @@ -165,14 +172,23 @@ func processCheckRetry(ctx context.Context, log logging.Logger, client resource. // Failure not in the list of items to retry, keep it return false }) + // Wait for the retry annotation to be persisted before patching the object + err = waitForRetryAnnotation(ctx, log, client, obj, itemToRetry) + if err != nil { + return err + } + // Set the status err = checks.SetStatus(ctx, client, obj, c.Status) + log.Debug("Status set", "check", obj.GetName(), "status.count", c.Status.Report.Count) if err != nil { return err } // Delete the retry annotation to mark the check as processed annotations := checks.DeleteAnnotations(ctx, obj, []string{checks.RetryAnnotation}) + err = checks.SetAnnotations(ctx, client, obj, annotations) + log.Debug("Annotations set", "check", obj.GetName(), "annotations", annotations) - return checks.SetAnnotations(ctx, client, obj, annotations) + return err } func runStepsInParallel(ctx context.Context, log logging.Logger, spec *advisorv0alpha1.CheckSpec, steps []checks.Step, items []any) ([]advisorv0alpha1.CheckReportFailure, error) { @@ -234,3 +250,47 @@ func filterSteps(checkType resource.Object, steps []checks.Step) ([]checks.Step, } return steps, nil } + +// retryAnnotationChanged compares the retry annotation between old and new objects +func retryAnnotationChanged(oldObj, newObj resource.Object) bool { + if oldObj == nil || newObj == nil { + return true // If either is nil, consider it changed + } + + // Compare annotations + oldAnnotations := oldObj.GetAnnotations() + newAnnotations := newObj.GetAnnotations() + return newAnnotations[checks.RetryAnnotation] != "" && + oldAnnotations[checks.RetryAnnotation] != newAnnotations[checks.RetryAnnotation] +} + +// waitForRetryAnnotation waits for the retry annotation to match the item to retry +func waitForRetryAnnotation(ctx context.Context, log logging.Logger, client resource.Client, obj resource.Object, itemToRetry string) error { + currentObj, err := client.Get(ctx, resource.Identifier{ + Namespace: obj.GetNamespace(), + Name: obj.GetName(), + }) + if err != nil { + return err + } + retries := 0 + currentRetryAnnotation := checks.GetRetryAnnotation(currentObj) + for currentRetryAnnotation != itemToRetry { + log.Debug("Waiting for retry annotation to be persisted", "check", obj.GetName(), "item", itemToRetry, "currentRetryAnnotation", currentRetryAnnotation) + time.Sleep(retryAnnotationPollingInterval) + retries++ + if retries > 5 { + return fmt.Errorf("timeout waiting for retry annotation to be persisted") + } + currentObj, err = client.Get(ctx, resource.Identifier{ + Namespace: obj.GetNamespace(), + Name: obj.GetName(), + }) + if err != nil { + return err + } + currentRetryAnnotation = checks.GetRetryAnnotation(currentObj) + } + log.Debug("Retry annotation persisted", "check", obj.GetName(), "item", itemToRetry) + return nil +} diff --git a/apps/advisor/pkg/app/utils_test.go b/apps/advisor/pkg/app/utils_test.go index 663d7cfdcf6..ea905feee2a 100644 --- a/apps/advisor/pkg/app/utils_test.go +++ b/apps/advisor/pkg/app/utils_test.go @@ -6,6 +6,7 @@ import ( "fmt" "net/http" "testing" + "time" "github.com/grafana/grafana-app-sdk/logging" "github.com/grafana/grafana-app-sdk/resource" @@ -249,7 +250,9 @@ func TestProcessCheckRetry_SkipMissingItem(t *testing.T) { t.Fatal(err) } meta.SetCreatedBy("user:1") - client := &mockClient{} + client := &mockClient{ + res: obj, + } typesClient := &mockTypesClient{} ctx := context.TODO() @@ -281,7 +284,9 @@ func TestProcessCheckRetry_Success(t *testing.T) { t.Fatal(err) } meta.SetCreatedBy("user:1") - client := &mockClient{} + client := &mockClient{ + res: obj, + } typesClient := &mockTypesClient{} ctx := context.TODO() @@ -296,6 +301,51 @@ func TestProcessCheckRetry_Success(t *testing.T) { assert.Empty(t, obj.Status.Report.Failures) } +func TestProcessCheckRetry_Success_Polling(t *testing.T) { + retryAnnotationPollingInterval = 1 * time.Millisecond + obj := &advisorv0alpha1.Check{} + obj.SetAnnotations(map[string]string{ + checks.RetryAnnotation: "item", + checks.StatusAnnotation: checks.StatusAnnotationProcessed, + }) + obj.Status.Report.Failures = []advisorv0alpha1.CheckReportFailure{ + { + ItemID: "item", + StepID: "step", + }, + } + meta, err := utils.MetaAccessor(obj) + if err != nil { + t.Fatal(err) + } + meta.SetCreatedBy("user:1") + retryCount := 0 + client := &mockClient{ + get: func(ctx context.Context, id resource.Identifier) (resource.Object, error) { + if retryCount > 0 { + // obj contains the retry annotation + return obj, nil + } + retryCount++ + oldObject := &advisorv0alpha1.Check{} + oldObject.SetAnnotations(map[string]string{ + checks.RetryAnnotation: "", + }) + return oldObject, nil + }, + } + typesClient := &mockTypesClient{} + ctx := context.TODO() + + check := &mockCheck{ + items: []any{"item"}, + } + + err = processCheckRetry(ctx, logging.DefaultLogger, client, typesClient, obj, check) + assert.NoError(t, err) + assert.Equal(t, 1, retryCount) +} + func TestRunStepsInParallel_ConcurrentHeaderAccess(t *testing.T) { // Create an HTTP request with headers to simulate the real scenario req, err := http.NewRequest("GET", "/test", nil) @@ -360,6 +410,8 @@ func TestRunStepsInParallel_ConcurrentHeaderAccess(t *testing.T) { type mockClient struct { resource.Client values []any + res resource.Object + get func(ctx context.Context, id resource.Identifier) (resource.Object, error) } func (m *mockClient) PatchInto(ctx context.Context, id resource.Identifier, req resource.PatchRequest, opts resource.PatchOptions, obj resource.Object) error { @@ -368,6 +420,13 @@ func (m *mockClient) PatchInto(ctx context.Context, id resource.Identifier, req return nil } +func (m *mockClient) Get(ctx context.Context, id resource.Identifier) (resource.Object, error) { + if m.get != nil { + return m.get(ctx, id) + } + return m.res, nil +} + type mockTypesClient struct { resource.Client res resource.Object