mirror of
				https://github.com/fluxcd/flux2.git
				synced 2025-10-31 16:26:36 +08:00 
			
		
		
		
	Refactor reconcile and resume cmd for alert and receiver
Signed-off-by: Somtochi Onyekwere <somtochionyekwere@gmail.com>
This commit is contained in:
		| @ -19,6 +19,7 @@ package main | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"github.com/fluxcd/notification-controller/api/v1beta1" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/fluxcd/pkg/apis/meta" | ||||
| @ -97,12 +98,23 @@ func (reconcile reconcileCommand) run(cmd *cobra.Command, args []string) error { | ||||
| 	} | ||||
| 	logger.Successf("%s annotated", reconcile.kind) | ||||
|  | ||||
| 	if reconcile.kind == v1beta1.AlertKind || reconcile.kind == v1beta1.ReceiverKind { | ||||
| 		if err = wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, | ||||
| 			isReconcileReady(ctx, kubeClient, namespacedName, reconcile.object)); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		logger.Successf(reconcile.object.successMessage()) | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	lastHandledReconcileAt := reconcile.object.lastHandledReconcileRequest() | ||||
| 	logger.Waitingf("waiting for %s reconciliation", reconcile.kind) | ||||
| 	if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, | ||||
| 		reconciliationHandled(ctx, kubeClient, namespacedName, reconcile.object, lastHandledReconcileAt)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("%s reconciliation completed", reconcile.kind) | ||||
|  | ||||
| 	if apimeta.IsStatusConditionFalse(*reconcile.object.GetStatusConditions(), meta.ReadyCondition) { | ||||
| @ -140,3 +152,23 @@ func requestReconciliation(ctx context.Context, kubeClient client.Client, | ||||
| 		return kubeClient.Update(ctx, obj.asClientObject()) | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| func isReconcileReady(ctx context.Context, kubeClient client.Client, | ||||
| 	namespacedName types.NamespacedName, obj reconcilable) wait.ConditionFunc { | ||||
| 	return func() (bool, error) { | ||||
| 		err := kubeClient.Get(ctx, namespacedName, obj.asClientObject()) | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
|  | ||||
| 		if c := apimeta.FindStatusCondition(*obj.GetStatusConditions(), meta.ReadyCondition); c != nil { | ||||
| 			switch c.Status { | ||||
| 			case metav1.ConditionTrue: | ||||
| 				return true, nil | ||||
| 			case metav1.ConditionFalse: | ||||
| 				return false, fmt.Errorf(c.Message) | ||||
| 			} | ||||
| 		} | ||||
| 		return false, nil | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @ -17,18 +17,8 @@ limitations under the License. | ||||
| package main | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/fluxcd/flux2/internal/utils" | ||||
| 	"github.com/fluxcd/pkg/apis/meta" | ||||
|  | ||||
| 	"github.com/spf13/cobra" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
|  | ||||
| 	notificationv1 "github.com/fluxcd/notification-controller/api/v1beta1" | ||||
| 	"github.com/spf13/cobra" | ||||
| ) | ||||
|  | ||||
| var reconcileAlertCmd = &cobra.Command{ | ||||
| @ -37,61 +27,16 @@ var reconcileAlertCmd = &cobra.Command{ | ||||
| 	Long:  `The reconcile alert command triggers a reconciliation of an Alert resource and waits for it to finish.`, | ||||
| 	Example: `  # Trigger a reconciliation for an existing alert | ||||
|   flux reconcile alert main`, | ||||
| 	RunE: reconcileAlertCmdRun, | ||||
| 	RunE: reconcileCommand{ | ||||
| 		apiType: alertType, | ||||
| 		object:  alertAdapter{¬ificationv1.Alert{}}, | ||||
| 	}.run, | ||||
| } | ||||
|  | ||||
| func init() { | ||||
| 	reconcileCmd.AddCommand(reconcileAlertCmd) | ||||
| } | ||||
|  | ||||
| func reconcileAlertCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if len(args) < 1 { | ||||
| 		return fmt.Errorf("Alert name is required") | ||||
| 	} | ||||
| 	name := args[0] | ||||
|  | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	namespacedName := types.NamespacedName{ | ||||
| 		Namespace: rootArgs.namespace, | ||||
| 		Name:      name, | ||||
| 	} | ||||
|  | ||||
| 	var alert notificationv1.Alert | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &alert) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if alert.Spec.Suspend { | ||||
| 		return fmt.Errorf("resource is suspended") | ||||
| 	} | ||||
|  | ||||
| 	logger.Actionf("annotating Alert %s in %s namespace", name, rootArgs.namespace) | ||||
| 	if alert.Annotations == nil { | ||||
| 		alert.Annotations = map[string]string{ | ||||
| 			meta.ReconcileRequestAnnotation: time.Now().Format(time.RFC3339Nano), | ||||
| 		} | ||||
| 	} else { | ||||
| 		alert.Annotations[meta.ReconcileRequestAnnotation] = time.Now().Format(time.RFC3339Nano) | ||||
| 	} | ||||
|  | ||||
| 	if err := kubeClient.Update(ctx, &alert); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("Alert annotated") | ||||
|  | ||||
| 	logger.Waitingf("waiting for reconciliation") | ||||
| 	if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, | ||||
| 		isAlertReady(ctx, kubeClient, namespacedName, &alert)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("Alert reconciliation completed") | ||||
| 	return nil | ||||
| func (obj alertAdapter) lastHandledReconcileRequest() string { | ||||
| 	return "" | ||||
| } | ||||
|  | ||||
| @ -17,23 +17,9 @@ limitations under the License. | ||||
| package main | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/spf13/cobra" | ||||
| 	apimeta "k8s.io/apimachinery/pkg/api/meta" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| 	"k8s.io/client-go/util/retry" | ||||
| 	"sigs.k8s.io/controller-runtime/pkg/client" | ||||
|  | ||||
| 	"github.com/fluxcd/flux2/internal/utils" | ||||
| 	"github.com/fluxcd/pkg/apis/meta" | ||||
|  | ||||
| 	helmv2 "github.com/fluxcd/helm-controller/api/v2beta1" | ||||
| 	sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" | ||||
| 	"github.com/spf13/cobra" | ||||
| ) | ||||
|  | ||||
| var reconcileHrCmd = &cobra.Command{ | ||||
| @ -47,7 +33,10 @@ The reconcile kustomization command triggers a reconciliation of a HelmRelease r | ||||
|  | ||||
|   # Trigger a reconciliation of the HelmRelease's source and apply changes | ||||
|   flux reconcile hr podinfo --with-source`, | ||||
| 	RunE: reconcileHrCmdRun, | ||||
| 	RunE: reconcileWithSourceCommand{ | ||||
| 		apiType: helmReleaseType, | ||||
| 		object:  helmReleaseAdapter{&helmv2.HelmRelease{}}, | ||||
| 	}.run, | ||||
| } | ||||
|  | ||||
| type reconcileHelmReleaseFlags struct { | ||||
| @ -62,117 +51,33 @@ func init() { | ||||
| 	reconcileCmd.AddCommand(reconcileHrCmd) | ||||
| } | ||||
|  | ||||
| func reconcileHrCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if len(args) < 1 { | ||||
| 		return fmt.Errorf("HelmRelease name is required") | ||||
| 	} | ||||
| 	name := args[0] | ||||
|  | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	namespacedName := types.NamespacedName{ | ||||
| 		Namespace: rootArgs.namespace, | ||||
| 		Name:      name, | ||||
| 	} | ||||
|  | ||||
| 	var helmRelease helmv2.HelmRelease | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &helmRelease) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if helmRelease.Spec.Suspend { | ||||
| 		return fmt.Errorf("resource is suspended") | ||||
| 	} | ||||
|  | ||||
| 	if rhrArgs.syncHrWithSource { | ||||
| 		nsCopy := rootArgs.namespace | ||||
| 		if helmRelease.Spec.Chart.Spec.SourceRef.Namespace != "" { | ||||
| 			rootArgs.namespace = helmRelease.Spec.Chart.Spec.SourceRef.Namespace | ||||
| 		} | ||||
| 		switch helmRelease.Spec.Chart.Spec.SourceRef.Kind { | ||||
| 		case sourcev1.HelmRepositoryKind: | ||||
| 			err = reconcileCommand{ | ||||
| 				apiType: helmRepositoryType, | ||||
| 				object:  helmRepositoryAdapter{&sourcev1.HelmRepository{}}, | ||||
| 			}.run(nil, []string{helmRelease.Spec.Chart.Spec.SourceRef.Name}) | ||||
| 		case sourcev1.GitRepositoryKind: | ||||
| 			err = reconcileCommand{ | ||||
| 				apiType: gitRepositoryType, | ||||
| 				object:  gitRepositoryAdapter{&sourcev1.GitRepository{}}, | ||||
| 			}.run(nil, []string{helmRelease.Spec.Chart.Spec.SourceRef.Name}) | ||||
| 		case sourcev1.BucketKind: | ||||
| 			err = reconcileCommand{ | ||||
| 				apiType: bucketType, | ||||
| 				object:  bucketAdapter{&sourcev1.Bucket{}}, | ||||
| 			}.run(nil, []string{helmRelease.Spec.Chart.Spec.SourceRef.Name}) | ||||
| 		} | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		rootArgs.namespace = nsCopy | ||||
| 	} | ||||
|  | ||||
| 	lastHandledReconcileAt := helmRelease.Status.LastHandledReconcileAt | ||||
| 	logger.Actionf("annotating HelmRelease %s in %s namespace", name, rootArgs.namespace) | ||||
| 	if err := requestHelmReleaseReconciliation(ctx, kubeClient, namespacedName, &helmRelease); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("HelmRelease annotated") | ||||
|  | ||||
| 	logger.Waitingf("waiting for HelmRelease reconciliation") | ||||
| 	if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, | ||||
| 		helmReleaseReconciliationHandled(ctx, kubeClient, namespacedName, &helmRelease, lastHandledReconcileAt), | ||||
| 	); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("HelmRelease reconciliation completed") | ||||
|  | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &helmRelease) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if c := apimeta.FindStatusCondition(helmRelease.Status.Conditions, meta.ReadyCondition); c != nil { | ||||
| 		switch c.Status { | ||||
| 		case metav1.ConditionFalse: | ||||
| 			return fmt.Errorf("HelmRelease reconciliation failed: %s", c.Message) | ||||
| 		default: | ||||
| 			logger.Successf("reconciled revision %s", helmRelease.Status.LastAppliedRevision) | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| func (obj helmReleaseAdapter) lastHandledReconcileRequest() string { | ||||
| 	return obj.Status.GetLastHandledReconcileRequest() | ||||
| } | ||||
|  | ||||
| func helmReleaseReconciliationHandled(ctx context.Context, kubeClient client.Client, | ||||
| 	namespacedName types.NamespacedName, helmRelease *helmv2.HelmRelease, lastHandledReconcileAt string) wait.ConditionFunc { | ||||
| 	return func() (bool, error) { | ||||
| 		err := kubeClient.Get(ctx, namespacedName, helmRelease) | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
| 		return helmRelease.Status.LastHandledReconcileAt != lastHandledReconcileAt, nil | ||||
| 	} | ||||
| func (obj helmReleaseAdapter) reconcileSource() bool { | ||||
| 	return rhrArgs.syncHrWithSource | ||||
| } | ||||
|  | ||||
| func requestHelmReleaseReconciliation(ctx context.Context, kubeClient client.Client, | ||||
| 	namespacedName types.NamespacedName, helmRelease *helmv2.HelmRelease) error { | ||||
| 	return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) { | ||||
| 		if err := kubeClient.Get(ctx, namespacedName, helmRelease); err != nil { | ||||
| 			return err | ||||
| func (obj helmReleaseAdapter) getSource() (reconcileCommand, string) { | ||||
| 	var cmd reconcileCommand | ||||
| 	switch obj.Spec.Chart.Spec.SourceRef.Kind { | ||||
| 	case sourcev1.HelmRepositoryKind: | ||||
| 		cmd = reconcileCommand{ | ||||
| 			apiType: helmRepositoryType, | ||||
| 			object:  helmRepositoryAdapter{&sourcev1.HelmRepository{}}, | ||||
| 		} | ||||
| 		if helmRelease.Annotations == nil { | ||||
| 			helmRelease.Annotations = map[string]string{ | ||||
| 				meta.ReconcileRequestAnnotation: time.Now().Format(time.RFC3339Nano), | ||||
| 			} | ||||
| 		} else { | ||||
| 			helmRelease.Annotations[meta.ReconcileRequestAnnotation] = time.Now().Format(time.RFC3339Nano) | ||||
| 	case sourcev1.GitRepositoryKind: | ||||
| 		cmd = reconcileCommand{ | ||||
| 			apiType: gitRepositoryType, | ||||
| 			object:  gitRepositoryAdapter{&sourcev1.GitRepository{}}, | ||||
| 		} | ||||
| 		return kubeClient.Update(ctx, helmRelease) | ||||
| 	}) | ||||
| 	case sourcev1.BucketKind: | ||||
| 		cmd = reconcileCommand{ | ||||
| 			apiType: bucketType, | ||||
| 			object:  bucketAdapter{&sourcev1.Bucket{}}, | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return cmd, obj.Spec.Chart.Spec.SourceRef.Name | ||||
| } | ||||
|  | ||||
| @ -17,22 +17,9 @@ limitations under the License. | ||||
| package main | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"time" | ||||
|  | ||||
| 	apimeta "k8s.io/apimachinery/pkg/api/meta" | ||||
| 	"k8s.io/client-go/util/retry" | ||||
| 	"sigs.k8s.io/controller-runtime/pkg/client" | ||||
|  | ||||
| 	"github.com/fluxcd/flux2/internal/utils" | ||||
| 	"github.com/fluxcd/pkg/apis/meta" | ||||
| 	"github.com/spf13/cobra" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
|  | ||||
| 	kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1beta1" | ||||
| 	sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" | ||||
| 	"github.com/spf13/cobra" | ||||
| ) | ||||
|  | ||||
| var reconcileKsCmd = &cobra.Command{ | ||||
| @ -46,7 +33,10 @@ The reconcile kustomization command triggers a reconciliation of a Kustomization | ||||
|  | ||||
|   # Trigger a sync of the Kustomization's source and apply changes | ||||
|   flux reconcile kustomization podinfo --with-source`, | ||||
| 	RunE: reconcileKsCmdRun, | ||||
| 	RunE: reconcileWithSourceCommand{ | ||||
| 		apiType: kustomizationType, | ||||
| 		object:  kustomizationAdapter{&kustomizev1.Kustomization{}}, | ||||
| 	}.run, | ||||
| } | ||||
|  | ||||
| type reconcileKsFlags struct { | ||||
| @ -61,104 +51,28 @@ func init() { | ||||
| 	reconcileCmd.AddCommand(reconcileKsCmd) | ||||
| } | ||||
|  | ||||
| func reconcileKsCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if len(args) < 1 { | ||||
| 		return fmt.Errorf("Kustomization name is required") | ||||
| 	} | ||||
| 	name := args[0] | ||||
|  | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	namespacedName := types.NamespacedName{ | ||||
| 		Namespace: rootArgs.namespace, | ||||
| 		Name:      name, | ||||
| 	} | ||||
| 	var kustomization kustomizev1.Kustomization | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &kustomization) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if kustomization.Spec.Suspend { | ||||
| 		return fmt.Errorf("resource is suspended") | ||||
| 	} | ||||
|  | ||||
| 	if rksArgs.syncKsWithSource { | ||||
| 		nsCopy := rootArgs.namespace | ||||
| 		if kustomization.Spec.SourceRef.Namespace != "" { | ||||
| 			rootArgs.namespace = kustomization.Spec.SourceRef.Namespace | ||||
| 		} | ||||
| 		switch kustomization.Spec.SourceRef.Kind { | ||||
| 		case sourcev1.GitRepositoryKind: | ||||
| 			err = reconcileCommand{ | ||||
| 				apiType: gitRepositoryType, | ||||
| 				object:  gitRepositoryAdapter{&sourcev1.GitRepository{}}, | ||||
| 			}.run(nil, []string{kustomization.Spec.SourceRef.Name}) | ||||
| 		case sourcev1.BucketKind: | ||||
| 			err = reconcileCommand{ | ||||
| 				apiType: bucketType, | ||||
| 				object:  bucketAdapter{&sourcev1.Bucket{}}, | ||||
| 			}.run(nil, []string{kustomization.Spec.SourceRef.Name}) | ||||
| 		} | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		rootArgs.namespace = nsCopy | ||||
| 	} | ||||
|  | ||||
| 	lastHandledReconcileAt := kustomization.Status.LastHandledReconcileAt | ||||
| 	logger.Actionf("annotating Kustomization %s in %s namespace", name, rootArgs.namespace) | ||||
| 	if err := requestKustomizeReconciliation(ctx, kubeClient, namespacedName, &kustomization); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("Kustomization annotated") | ||||
|  | ||||
| 	logger.Waitingf("waiting for Kustomization reconciliation") | ||||
| 	if err := wait.PollImmediate( | ||||
| 		rootArgs.pollInterval, rootArgs.timeout, | ||||
| 		kustomizeReconciliationHandled(ctx, kubeClient, namespacedName, &kustomization, lastHandledReconcileAt), | ||||
| 	); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("Kustomization reconciliation completed") | ||||
|  | ||||
| 	if apimeta.IsStatusConditionFalse(kustomization.Status.Conditions, meta.ReadyCondition) { | ||||
| 		return fmt.Errorf("Kustomization reconciliation failed") | ||||
| 	} | ||||
| 	logger.Successf("reconciled revision %s", kustomization.Status.LastAppliedRevision) | ||||
| 	return nil | ||||
| func (obj kustomizationAdapter) lastHandledReconcileRequest() string { | ||||
| 	return obj.Status.GetLastHandledReconcileRequest() | ||||
| } | ||||
|  | ||||
| func kustomizeReconciliationHandled(ctx context.Context, kubeClient client.Client, | ||||
| 	namespacedName types.NamespacedName, kustomization *kustomizev1.Kustomization, lastHandledReconcileAt string) wait.ConditionFunc { | ||||
| 	return func() (bool, error) { | ||||
| 		err := kubeClient.Get(ctx, namespacedName, kustomization) | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
| 		return kustomization.Status.LastHandledReconcileAt != lastHandledReconcileAt, nil | ||||
| 	} | ||||
| func (obj kustomizationAdapter) reconcileSource() bool { | ||||
| 	return rksArgs.syncKsWithSource | ||||
| } | ||||
|  | ||||
| func requestKustomizeReconciliation(ctx context.Context, kubeClient client.Client, | ||||
| 	namespacedName types.NamespacedName, kustomization *kustomizev1.Kustomization) error { | ||||
| 	return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) { | ||||
| 		if err := kubeClient.Get(ctx, namespacedName, kustomization); err != nil { | ||||
| 			return err | ||||
| func (obj kustomizationAdapter) getSource() (reconcileCommand, string) { | ||||
| 	var cmd reconcileCommand | ||||
| 	switch obj.Spec.SourceRef.Kind { | ||||
| 	case sourcev1.GitRepositoryKind: | ||||
| 		cmd = reconcileCommand{ | ||||
| 			apiType: gitRepositoryType, | ||||
| 			object:  gitRepositoryAdapter{&sourcev1.GitRepository{}}, | ||||
| 		} | ||||
| 		if kustomization.Annotations == nil { | ||||
| 			kustomization.Annotations = map[string]string{ | ||||
| 				meta.ReconcileRequestAnnotation: time.Now().Format(time.RFC3339Nano), | ||||
| 			} | ||||
| 		} else { | ||||
| 			kustomization.Annotations[meta.ReconcileRequestAnnotation] = time.Now().Format(time.RFC3339Nano) | ||||
| 	case sourcev1.BucketKind: | ||||
| 		cmd = reconcileCommand{ | ||||
| 			apiType: bucketType, | ||||
| 			object:  bucketAdapter{&sourcev1.Bucket{}}, | ||||
| 		} | ||||
| 		return kubeClient.Update(ctx, kustomization) | ||||
| 	}) | ||||
| 	} | ||||
|  | ||||
| 	return cmd, obj.Spec.SourceRef.Name | ||||
| } | ||||
|  | ||||
							
								
								
									
										88
									
								
								cmd/flux/reconcile_with_source.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										88
									
								
								cmd/flux/reconcile_with_source.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,88 @@ | ||||
| package main | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"github.com/fluxcd/flux2/internal/utils" | ||||
| 	"github.com/fluxcd/pkg/apis/meta" | ||||
| 	"github.com/spf13/cobra" | ||||
| 	apimeta "k8s.io/apimachinery/pkg/api/meta" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| ) | ||||
|  | ||||
| type reconcileWithSource interface { | ||||
| 	adapter | ||||
| 	reconcilable | ||||
| 	reconcileSource() bool | ||||
| 	getSource() (reconcileCommand, string) | ||||
| } | ||||
|  | ||||
| type reconcileWithSourceCommand struct { | ||||
| 	apiType | ||||
| 	object reconcileWithSource | ||||
| } | ||||
|  | ||||
| func (reconcile reconcileWithSourceCommand) run(cmd *cobra.Command, args []string) error { | ||||
| 	if len(args) < 1 { | ||||
| 		return fmt.Errorf("%s name is required", reconcile.kind) | ||||
| 	} | ||||
| 	name := args[0] | ||||
|  | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	namespacedName := types.NamespacedName{ | ||||
| 		Namespace: rootArgs.namespace, | ||||
| 		Name:      name, | ||||
| 	} | ||||
|  | ||||
| 	err = kubeClient.Get(ctx, namespacedName, reconcile.object.asClientObject()) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if reconcile.object.isSuspended() { | ||||
| 		return fmt.Errorf("resource is suspended") | ||||
| 	} | ||||
|  | ||||
| 	if reconcile.object.reconcileSource() { | ||||
| 		nsCopy := rootArgs.namespace | ||||
| 		objectNs := reconcile.object.asClientObject().GetNamespace() | ||||
| 		if objectNs != "" { | ||||
| 			rootArgs.namespace = reconcile.object.asClientObject().GetNamespace() | ||||
| 		} | ||||
|  | ||||
| 		reconcileCmd, sourceName := reconcile.object.getSource() | ||||
| 		err := reconcileCmd.run(nil, []string{sourceName}) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		rootArgs.namespace = nsCopy | ||||
| 	} | ||||
|  | ||||
| 	logger.Actionf("annotating %s %s in %s namespace", reconcile.kind, name, rootArgs.namespace) | ||||
| 	if err := requestReconciliation(ctx, kubeClient, namespacedName, reconcile.object); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("%s annotated", reconcile.kind) | ||||
|  | ||||
| 	lastHandledReconcileAt := reconcile.object.lastHandledReconcileRequest() | ||||
| 	logger.Waitingf("waiting for %s reconciliation", reconcile.kind) | ||||
| 	if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, | ||||
| 		reconciliationHandled(ctx, kubeClient, namespacedName, reconcile.object, lastHandledReconcileAt)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("%s reconciliation completed", reconcile.kind) | ||||
|  | ||||
| 	if apimeta.IsStatusConditionFalse(*reconcile.object.GetStatusConditions(), meta.ReadyCondition) { | ||||
| 		return fmt.Errorf("%s reconciliation failed", reconcile.kind) | ||||
| 	} | ||||
| 	logger.Successf(reconcile.object.successMessage()) | ||||
| 	return nil | ||||
| } | ||||
| @ -17,20 +17,9 @@ limitations under the License. | ||||
| package main | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
|  | ||||
| 	"github.com/fluxcd/flux2/internal/utils" | ||||
| 	"github.com/fluxcd/pkg/apis/meta" | ||||
| 	notificationv1 "github.com/fluxcd/notification-controller/api/v1beta1" | ||||
|  | ||||
| 	"github.com/spf13/cobra" | ||||
| 	apimeta "k8s.io/apimachinery/pkg/api/meta" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| 	"sigs.k8s.io/controller-runtime/pkg/client" | ||||
|  | ||||
| 	notificationv1 "github.com/fluxcd/notification-controller/api/v1beta1" | ||||
| ) | ||||
|  | ||||
| var resumeAlertCmd = &cobra.Command{ | ||||
| @ -40,72 +29,24 @@ var resumeAlertCmd = &cobra.Command{ | ||||
| finish the apply.`, | ||||
| 	Example: `  # Resume reconciliation for an existing Alert | ||||
|   flux resume alert main`, | ||||
| 	RunE: resumeAlertCmdRun, | ||||
| 	RunE: resumeCommand{ | ||||
| 		apiType: alertType, | ||||
| 		object:  alertAdapter{¬ificationv1.Alert{}}, | ||||
| 	}.run, | ||||
| } | ||||
|  | ||||
| func init() { | ||||
| 	resumeCmd.AddCommand(resumeAlertCmd) | ||||
| } | ||||
|  | ||||
| func resumeAlertCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if len(args) < 1 { | ||||
| 		return fmt.Errorf("Alert name is required") | ||||
| 	} | ||||
| 	name := args[0] | ||||
|  | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	namespacedName := types.NamespacedName{ | ||||
| 		Namespace: rootArgs.namespace, | ||||
| 		Name:      name, | ||||
| 	} | ||||
| 	var alert notificationv1.Alert | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &alert) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Actionf("resuming Alert %s in %s namespace", name, rootArgs.namespace) | ||||
| 	alert.Spec.Suspend = false | ||||
| 	if err := kubeClient.Update(ctx, &alert); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("Alert resumed") | ||||
|  | ||||
| 	logger.Waitingf("waiting for Alert reconciliation") | ||||
| 	if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, | ||||
| 		isAlertResumed(ctx, kubeClient, namespacedName, &alert)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("Alert reconciliation completed") | ||||
| 	return nil | ||||
| func (obj alertAdapter) getObservedGeneration() int64 { | ||||
| 	return obj.Alert.Status.ObservedGeneration | ||||
| } | ||||
|  | ||||
| func isAlertResumed(ctx context.Context, kubeClient client.Client, | ||||
| 	namespacedName types.NamespacedName, alert *notificationv1.Alert) wait.ConditionFunc { | ||||
| 	return func() (bool, error) { | ||||
| 		err := kubeClient.Get(ctx, namespacedName, alert) | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
|  | ||||
| 		if c := apimeta.FindStatusCondition(alert.Status.Conditions, meta.ReadyCondition); c != nil { | ||||
| 			switch c.Status { | ||||
| 			case metav1.ConditionTrue: | ||||
| 				return true, nil | ||||
| 			case metav1.ConditionFalse: | ||||
| 				if c.Reason == meta.SuspendedReason { | ||||
| 					return false, nil | ||||
| 				} | ||||
| 				return false, fmt.Errorf(c.Message) | ||||
| 			} | ||||
| 		} | ||||
| 		return false, nil | ||||
| 	} | ||||
| func (obj alertAdapter) setUnsuspended() { | ||||
| 	obj.Alert.Spec.Suspend = false | ||||
| } | ||||
|  | ||||
| func (obj alertAdapter) successMessage() string { | ||||
| 	return "Alert reconciliation completed" | ||||
| } | ||||
|  | ||||
| @ -17,20 +17,8 @@ limitations under the License. | ||||
| package main | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
|  | ||||
| 	"github.com/fluxcd/flux2/internal/utils" | ||||
| 	"github.com/fluxcd/pkg/apis/meta" | ||||
|  | ||||
| 	"github.com/spf13/cobra" | ||||
| 	apimeta "k8s.io/apimachinery/pkg/api/meta" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| 	"sigs.k8s.io/controller-runtime/pkg/client" | ||||
|  | ||||
| 	notificationv1 "github.com/fluxcd/notification-controller/api/v1beta1" | ||||
| 	"github.com/spf13/cobra" | ||||
| ) | ||||
|  | ||||
| var resumeReceiverCmd = &cobra.Command{ | ||||
| @ -40,73 +28,24 @@ var resumeReceiverCmd = &cobra.Command{ | ||||
| finish the apply.`, | ||||
| 	Example: `  # Resume reconciliation for an existing Receiver | ||||
|   flux resume receiver main`, | ||||
| 	RunE: resumeReceiverCmdRun, | ||||
| 	RunE: resumeCommand{ | ||||
| 		apiType: receiverType, | ||||
| 		object:  receiverAdapter{¬ificationv1.Receiver{}}, | ||||
| 	}.run, | ||||
| } | ||||
|  | ||||
| func init() { | ||||
| 	resumeCmd.AddCommand(resumeReceiverCmd) | ||||
| } | ||||
|  | ||||
| func resumeReceiverCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if len(args) < 1 { | ||||
| 		return fmt.Errorf("Receiver name is required") | ||||
| 	} | ||||
| 	name := args[0] | ||||
|  | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	namespacedName := types.NamespacedName{ | ||||
| 		Namespace: rootArgs.namespace, | ||||
| 		Name:      name, | ||||
| 	} | ||||
| 	var receiver notificationv1.Receiver | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &receiver) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Actionf("resuming Receiver %s in %s namespace", name, rootArgs.namespace) | ||||
| 	receiver.Spec.Suspend = false | ||||
| 	if err := kubeClient.Update(ctx, &receiver); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("Receiver resumed") | ||||
|  | ||||
| 	logger.Waitingf("waiting for Receiver reconciliation") | ||||
| 	if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, | ||||
| 		isReceiverResumed(ctx, kubeClient, namespacedName, &receiver)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("Receiver reconciliation completed") | ||||
| 	return nil | ||||
| func (obj receiverAdapter) getObservedGeneration() int64 { | ||||
| 	return obj.Receiver.Status.ObservedGeneration | ||||
| } | ||||
|  | ||||
| func isReceiverResumed(ctx context.Context, kubeClient client.Client, | ||||
| 	namespacedName types.NamespacedName, receiver *notificationv1.Receiver) wait.ConditionFunc { | ||||
| 	return func() (bool, error) { | ||||
| 		err := kubeClient.Get(ctx, namespacedName, receiver) | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
|  | ||||
| 		if c := apimeta.FindStatusCondition(receiver.Status.Conditions, meta.ReadyCondition); c != nil { | ||||
| 			switch c.Status { | ||||
| 			case metav1.ConditionTrue: | ||||
| 				return true, nil | ||||
| 			case metav1.ConditionFalse: | ||||
| 				if c.Reason == meta.SuspendedReason { | ||||
| 					return false, nil | ||||
| 				} | ||||
| 				return false, fmt.Errorf(c.Message) | ||||
| 			} | ||||
| 		} | ||||
| 		return false, nil | ||||
| 	} | ||||
| func (obj receiverAdapter) setUnsuspended() { | ||||
| 	obj.Receiver.Spec.Suspend = false | ||||
| } | ||||
|  | ||||
| func (obj receiverAdapter) successMessage() string { | ||||
| 	return "Receiver reconciliation completed" | ||||
| } | ||||
|  | ||||
| @ -17,14 +17,8 @@ limitations under the License. | ||||
| package main | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
|  | ||||
| 	"github.com/spf13/cobra" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
|  | ||||
| 	"github.com/fluxcd/flux2/internal/utils" | ||||
| 	notificationv1 "github.com/fluxcd/notification-controller/api/v1beta1" | ||||
| 	"github.com/spf13/cobra" | ||||
| ) | ||||
|  | ||||
| var suspendAlertCmd = &cobra.Command{ | ||||
| @ -33,43 +27,20 @@ var suspendAlertCmd = &cobra.Command{ | ||||
| 	Long:  "The suspend command disables the reconciliation of a Alert resource.", | ||||
| 	Example: `  # Suspend reconciliation for an existing Alert | ||||
|   flux suspend alert main`, | ||||
| 	RunE: suspendAlertCmdRun, | ||||
| 	RunE: suspendCommand{ | ||||
| 		apiType: alertType, | ||||
| 		object:  &alertAdapter{¬ificationv1.Alert{}}, | ||||
| 	}.run, | ||||
| } | ||||
|  | ||||
| func init() { | ||||
| 	suspendCmd.AddCommand(suspendAlertCmd) | ||||
| } | ||||
|  | ||||
| func suspendAlertCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if len(args) < 1 { | ||||
| 		return fmt.Errorf("Alert name is required") | ||||
| 	} | ||||
| 	name := args[0] | ||||
|  | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	namespacedName := types.NamespacedName{ | ||||
| 		Namespace: rootArgs.namespace, | ||||
| 		Name:      name, | ||||
| 	} | ||||
| 	var alert notificationv1.Alert | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &alert) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Actionf("suspending Alert %s in %s namespace", name, rootArgs.namespace) | ||||
| 	alert.Spec.Suspend = true | ||||
| 	if err := kubeClient.Update(ctx, &alert); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("Alert suspended") | ||||
|  | ||||
| 	return nil | ||||
| func (obj alertAdapter) isSuspended() bool { | ||||
| 	return obj.Alert.Spec.Suspend | ||||
| } | ||||
|  | ||||
| func (obj alertAdapter) setSuspended() { | ||||
| 	obj.Alert.Spec.Suspend = true | ||||
| } | ||||
|  | ||||
| @ -17,14 +17,8 @@ limitations under the License. | ||||
| package main | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
|  | ||||
| 	"github.com/spf13/cobra" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
|  | ||||
| 	"github.com/fluxcd/flux2/internal/utils" | ||||
| 	notificationv1 "github.com/fluxcd/notification-controller/api/v1beta1" | ||||
| 	"github.com/spf13/cobra" | ||||
| ) | ||||
|  | ||||
| var suspendReceiverCmd = &cobra.Command{ | ||||
| @ -33,43 +27,20 @@ var suspendReceiverCmd = &cobra.Command{ | ||||
| 	Long:  "The suspend command disables the reconciliation of a Receiver resource.", | ||||
| 	Example: `  # Suspend reconciliation for an existing Receiver | ||||
|   flux suspend receiver main`, | ||||
| 	RunE: suspendReceiverCmdRun, | ||||
| 	RunE: suspendCommand{ | ||||
| 		apiType: receiverType, | ||||
| 		object:  &receiverAdapter{¬ificationv1.Receiver{}}, | ||||
| 	}.run, | ||||
| } | ||||
|  | ||||
| func init() { | ||||
| 	suspendCmd.AddCommand(suspendReceiverCmd) | ||||
| } | ||||
|  | ||||
| func suspendReceiverCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if len(args) < 1 { | ||||
| 		return fmt.Errorf("Receiver name is required") | ||||
| 	} | ||||
| 	name := args[0] | ||||
|  | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	namespacedName := types.NamespacedName{ | ||||
| 		Namespace: rootArgs.namespace, | ||||
| 		Name:      name, | ||||
| 	} | ||||
| 	var receiver notificationv1.Receiver | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &receiver) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Actionf("suspending Receiver %s in %s namespace", name, rootArgs.namespace) | ||||
| 	receiver.Spec.Suspend = true | ||||
| 	if err := kubeClient.Update(ctx, &receiver); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("Receiver suspended") | ||||
|  | ||||
| 	return nil | ||||
| func (obj receiverAdapter) isSuspended() bool { | ||||
| 	return obj.Receiver.Spec.Suspend | ||||
| } | ||||
|  | ||||
| func (obj receiverAdapter) setSuspended() { | ||||
| 	obj.Receiver.Spec.Suspend = true | ||||
| } | ||||
|  | ||||
		Reference in New Issue
	
	Block a user
	 Somtochi Onyekwere
					Somtochi Onyekwere