mirror of
				https://github.com/fluxcd/flux2.git
				synced 2025-10-31 08:17:19 +08:00 
			
		
		
		
	Refactor create, reconcile and resume cmds
* Take ObservedGeneration into account in readiness checks where applicable * Reduce amount of code (and duplicate GETs) by working with pointers where possible * Improve logged messages to properly take resource names into account and better describe processes
This commit is contained in:
		| @ -246,13 +246,15 @@ func applySyncManifests(ctx context.Context, kubeClient client.Client, name, nam | ||||
|  | ||||
| 	logger.Waitingf("waiting for cluster sync") | ||||
|  | ||||
| 	var gitRepository sourcev1.GitRepository | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isGitRepositoryReady(ctx, kubeClient, name, namespace)); err != nil { | ||||
| 		isGitRepositoryReady(ctx, kubeClient, types.NamespacedName{Name: name, Namespace: namespace}, &gitRepository)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	var kustomization kustomizev1.Kustomization | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isKustomizationReady(ctx, kubeClient, name, namespace)); err != nil { | ||||
| 		isKustomizationReady(ctx, kubeClient, types.NamespacedName{Name: name, Namespace: namespace}, &kustomization)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
|  | ||||
| @ -61,7 +61,7 @@ func init() { | ||||
|  | ||||
| func createAlertCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if len(args) < 1 { | ||||
| 		return fmt.Errorf("alert name is required") | ||||
| 		return fmt.Errorf("Alert name is required") | ||||
| 	} | ||||
| 	name := args[0] | ||||
|  | ||||
| @ -92,7 +92,7 @@ func createAlertCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	} | ||||
|  | ||||
| 	if !export { | ||||
| 		logger.Generatef("generating alert") | ||||
| 		logger.Generatef("generating Alert") | ||||
| 	} | ||||
|  | ||||
| 	alert := notificationv1.Alert{ | ||||
| @ -123,23 +123,23 @@ func createAlertCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Actionf("applying alert") | ||||
| 	if err := upsertAlert(ctx, kubeClient, alert); err != nil { | ||||
| 	logger.Actionf("applying Alert") | ||||
| 	namespacedName, err := upsertAlert(ctx, kubeClient, &alert) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Waitingf("waiting for reconciliation") | ||||
| 	logger.Waitingf("waiting for Alert reconciliation") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isAlertReady(ctx, kubeClient, name, namespace)); err != nil { | ||||
| 		isAlertReady(ctx, kubeClient, namespacedName, &alert)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("alert %s is ready", name) | ||||
|  | ||||
| 	logger.Successf("Alert %s is ready", name) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func upsertAlert(ctx context.Context, kubeClient client.Client, alert notificationv1.Alert) error { | ||||
| func upsertAlert(ctx context.Context, kubeClient client.Client, | ||||
| 	alert *notificationv1.Alert) (types.NamespacedName, error) { | ||||
| 	namespacedName := types.NamespacedName{ | ||||
| 		Namespace: alert.GetNamespace(), | ||||
| 		Name:      alert.GetName(), | ||||
| @ -149,35 +149,30 @@ func upsertAlert(ctx context.Context, kubeClient client.Client, alert notificati | ||||
| 	err := kubeClient.Get(ctx, namespacedName, &existing) | ||||
| 	if err != nil { | ||||
| 		if errors.IsNotFound(err) { | ||||
| 			if err := kubeClient.Create(ctx, &alert); err != nil { | ||||
| 				return err | ||||
| 			if err := kubeClient.Create(ctx, alert); err != nil { | ||||
| 				return namespacedName, err | ||||
| 			} else { | ||||
| 				logger.Successf("alert created") | ||||
| 				return nil | ||||
| 				logger.Successf("Alert created") | ||||
| 				return namespacedName, nil | ||||
| 			} | ||||
| 		} | ||||
| 		return err | ||||
| 		return namespacedName, err | ||||
| 	} | ||||
|  | ||||
| 	existing.Labels = alert.Labels | ||||
| 	existing.Spec = alert.Spec | ||||
| 	if err := kubeClient.Update(ctx, &existing); err != nil { | ||||
| 		return err | ||||
| 		return namespacedName, err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("alert updated") | ||||
| 	return nil | ||||
| 	alert = &existing | ||||
| 	logger.Successf("Alert updated") | ||||
| 	return namespacedName, nil | ||||
| } | ||||
|  | ||||
| func isAlertReady(ctx context.Context, kubeClient client.Client, name, namespace string) wait.ConditionFunc { | ||||
| func isAlertReady(ctx context.Context, kubeClient client.Client, | ||||
| 	namespacedName types.NamespacedName, alert *notificationv1.Alert) wait.ConditionFunc { | ||||
| 	return func() (bool, error) { | ||||
| 		var alert notificationv1.Alert | ||||
| 		namespacedName := types.NamespacedName{ | ||||
| 			Namespace: namespace, | ||||
| 			Name:      name, | ||||
| 		} | ||||
|  | ||||
| 		err := kubeClient.Get(ctx, namespacedName, &alert) | ||||
| 		err := kubeClient.Get(ctx, namespacedName, alert) | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
|  | ||||
| @ -71,12 +71,12 @@ func init() { | ||||
|  | ||||
| func createAlertProviderCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if len(args) < 1 { | ||||
| 		return fmt.Errorf("provider name is required") | ||||
| 		return fmt.Errorf("Provider name is required") | ||||
| 	} | ||||
| 	name := args[0] | ||||
|  | ||||
| 	if apType == "" { | ||||
| 		return fmt.Errorf("type is required") | ||||
| 		return fmt.Errorf("Provider type is required") | ||||
| 	} | ||||
|  | ||||
| 	sourceLabels, err := parseLabels() | ||||
| @ -85,10 +85,10 @@ func createAlertProviderCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	} | ||||
|  | ||||
| 	if !export { | ||||
| 		logger.Generatef("generating provider") | ||||
| 		logger.Generatef("generating Provider") | ||||
| 	} | ||||
|  | ||||
| 	alertProvider := notificationv1.Provider{ | ||||
| 	provider := notificationv1.Provider{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			Name:      name, | ||||
| 			Namespace: namespace, | ||||
| @ -103,13 +103,13 @@ func createAlertProviderCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	} | ||||
|  | ||||
| 	if apSecretRef != "" { | ||||
| 		alertProvider.Spec.SecretRef = &corev1.LocalObjectReference{ | ||||
| 		provider.Spec.SecretRef = &corev1.LocalObjectReference{ | ||||
| 			Name: apSecretRef, | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if export { | ||||
| 		return exportAlertProvider(alertProvider) | ||||
| 		return exportAlertProvider(provider) | ||||
| 	} | ||||
|  | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), timeout) | ||||
| @ -120,66 +120,63 @@ func createAlertProviderCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Actionf("applying provider") | ||||
| 	if err := upsertAlertProvider(ctx, kubeClient, alertProvider); err != nil { | ||||
| 	logger.Actionf("applying Provider") | ||||
| 	namespacedName, err := upsertAlertProvider(ctx, kubeClient, &provider) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Waitingf("waiting for reconciliation") | ||||
| 	logger.Waitingf("waiting for Provider reconciliation") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isAlertProviderReady(ctx, kubeClient, name, namespace)); err != nil { | ||||
| 		isAlertProviderReady(ctx, kubeClient, namespacedName, &provider)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("provider %s is ready", name) | ||||
| 	logger.Successf("Provider %s is ready", name) | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func upsertAlertProvider(ctx context.Context, kubeClient client.Client, alertProvider notificationv1.Provider) error { | ||||
| func upsertAlertProvider(ctx context.Context, kubeClient client.Client, | ||||
| 	provider *notificationv1.Provider) (types.NamespacedName, error) { | ||||
| 	namespacedName := types.NamespacedName{ | ||||
| 		Namespace: alertProvider.GetNamespace(), | ||||
| 		Name:      alertProvider.GetName(), | ||||
| 		Namespace: provider.GetNamespace(), | ||||
| 		Name:      provider.GetName(), | ||||
| 	} | ||||
|  | ||||
| 	var existing notificationv1.Provider | ||||
| 	err := kubeClient.Get(ctx, namespacedName, &existing) | ||||
| 	if err != nil { | ||||
| 		if errors.IsNotFound(err) { | ||||
| 			if err := kubeClient.Create(ctx, &alertProvider); err != nil { | ||||
| 				return err | ||||
| 			if err := kubeClient.Create(ctx, provider); err != nil { | ||||
| 				return namespacedName, err | ||||
| 			} else { | ||||
| 				logger.Successf("provider created") | ||||
| 				return nil | ||||
| 				logger.Successf("Provider created") | ||||
| 				return namespacedName, nil | ||||
| 			} | ||||
| 		} | ||||
| 		return err | ||||
| 		return namespacedName, err | ||||
| 	} | ||||
|  | ||||
| 	existing.Labels = alertProvider.Labels | ||||
| 	existing.Spec = alertProvider.Spec | ||||
| 	existing.Labels = provider.Labels | ||||
| 	existing.Spec = provider.Spec | ||||
| 	if err := kubeClient.Update(ctx, &existing); err != nil { | ||||
| 		return err | ||||
| 		return namespacedName, err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("provider updated") | ||||
| 	return nil | ||||
| 	provider = &existing | ||||
| 	logger.Successf("Provider updated") | ||||
| 	return namespacedName, nil | ||||
| } | ||||
|  | ||||
| func isAlertProviderReady(ctx context.Context, kubeClient client.Client, name, namespace string) wait.ConditionFunc { | ||||
| func isAlertProviderReady(ctx context.Context, kubeClient client.Client, | ||||
| 	namespacedName types.NamespacedName, provider *notificationv1.Provider) wait.ConditionFunc { | ||||
| 	return func() (bool, error) { | ||||
| 		var alertProvider notificationv1.Provider | ||||
| 		namespacedName := types.NamespacedName{ | ||||
| 			Namespace: namespace, | ||||
| 			Name:      name, | ||||
| 		} | ||||
|  | ||||
| 		err := kubeClient.Get(ctx, namespacedName, &alertProvider) | ||||
| 		err := kubeClient.Get(ctx, namespacedName, provider) | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
|  | ||||
| 		if c := meta.GetCondition(alertProvider.Status.Conditions, meta.ReadyCondition); c != nil { | ||||
| 		if c := meta.GetCondition(provider.Status.Conditions, meta.ReadyCondition); c != nil { | ||||
| 			switch c.Status { | ||||
| 			case corev1.ConditionTrue: | ||||
| 				return true, nil | ||||
|  | ||||
| @ -19,14 +19,13 @@ package main | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"github.com/fluxcd/pkg/apis/meta" | ||||
| 	"io/ioutil" | ||||
|  | ||||
| 	"github.com/fluxcd/pkg/apis/meta" | ||||
|  | ||||
| 	"github.com/spf13/cobra" | ||||
| 	corev1 "k8s.io/api/core/v1" | ||||
| 	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" | ||||
| 	"k8s.io/apimachinery/pkg/api/errors" | ||||
| 	apierrors "k8s.io/apimachinery/pkg/api/errors" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| @ -34,7 +33,6 @@ import ( | ||||
| 	"sigs.k8s.io/yaml" | ||||
|  | ||||
| 	helmv2 "github.com/fluxcd/helm-controller/api/v2beta1" | ||||
| 	sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" | ||||
| ) | ||||
|  | ||||
| var createHelmReleaseCmd = &cobra.Command{ | ||||
| @ -100,7 +98,7 @@ var ( | ||||
| ) | ||||
|  | ||||
| func init() { | ||||
| 	createHelmReleaseCmd.Flags().StringVar(&hrName, "release-name", "", "name used for the Helm release, defaults to a composition of '[<target-namespace>-]<hr-name>'") | ||||
| 	createHelmReleaseCmd.Flags().StringVar(&hrName, "release-name", "", "name used for the Helm release, defaults to a composition of '[<target-namespace>-]<HelmRelease-name>'") | ||||
| 	createHelmReleaseCmd.Flags().StringVar(&hrSource, "source", "", "source that contains the chart (<kind>/<name>)") | ||||
| 	createHelmReleaseCmd.Flags().StringVar(&hrChart, "chart", "", "Helm chart name or path") | ||||
| 	createHelmReleaseCmd.Flags().StringVar(&hrChartVersion, "chart-version", "", "Helm chart version, accepts a semver range (ignored for charts from GitRepository sources)") | ||||
| @ -112,7 +110,7 @@ func init() { | ||||
|  | ||||
| func createHelmReleaseCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if len(args) < 1 { | ||||
| 		return fmt.Errorf("release name is required") | ||||
| 		return fmt.Errorf("HelmRelease name is required") | ||||
| 	} | ||||
| 	name := args[0] | ||||
|  | ||||
| @ -137,7 +135,7 @@ func createHelmReleaseCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	} | ||||
|  | ||||
| 	if !export { | ||||
| 		logger.Generatef("generating release") | ||||
| 		logger.Generatef("generating HelmRelease") | ||||
| 	} | ||||
|  | ||||
| 	helmRelease := helmv2.HelmRelease{ | ||||
| @ -193,43 +191,25 @@ func createHelmReleaseCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Actionf("applying release") | ||||
| 	if err := upsertHelmRelease(ctx, kubeClient, helmRelease); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Waitingf("waiting for reconciliation") | ||||
| 	chartName := fmt.Sprintf("%s-%s", namespace, name) | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isHelmChartReady(ctx, kubeClient, chartName, namespace)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isHelmReleaseReady(ctx, kubeClient, name, namespace)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("release %s is ready", name) | ||||
|  | ||||
| 	namespacedName := types.NamespacedName{ | ||||
| 		Namespace: namespace, | ||||
| 		Name:      name, | ||||
| 	} | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &helmRelease) | ||||
| 	logger.Actionf("applying HelmRelease") | ||||
| 	namespacedName, err := upsertHelmRelease(ctx, kubeClient, &helmRelease) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("release failed: %w", err) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if helmRelease.Status.LastAppliedRevision != "" { | ||||
| 		logger.Successf("applied revision %s", helmRelease.Status.LastAppliedRevision) | ||||
| 	} else { | ||||
| 		return fmt.Errorf("reconciliation failed") | ||||
| 	logger.Waitingf("waiting for HelmRelease reconciliation") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isHelmReleaseReady(ctx, kubeClient, namespacedName, &helmRelease)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("HelmRelease %s is ready", name) | ||||
|  | ||||
| 	logger.Successf("applied revision %s", helmRelease.Status.LastAppliedRevision) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func upsertHelmRelease(ctx context.Context, kubeClient client.Client, helmRelease helmv2.HelmRelease) error { | ||||
| func upsertHelmRelease(ctx context.Context, kubeClient client.Client, | ||||
| 	helmRelease *helmv2.HelmRelease) (types.NamespacedName, error) { | ||||
| 	namespacedName := types.NamespacedName{ | ||||
| 		Namespace: helmRelease.GetNamespace(), | ||||
| 		Name:      helmRelease.GetName(), | ||||
| @ -239,75 +219,39 @@ func upsertHelmRelease(ctx context.Context, kubeClient client.Client, helmReleas | ||||
| 	err := kubeClient.Get(ctx, namespacedName, &existing) | ||||
| 	if err != nil { | ||||
| 		if errors.IsNotFound(err) { | ||||
| 			if err := kubeClient.Create(ctx, &helmRelease); err != nil { | ||||
| 				return err | ||||
| 			if err := kubeClient.Create(ctx, helmRelease); err != nil { | ||||
| 				return namespacedName, err | ||||
| 			} else { | ||||
| 				logger.Successf("release created") | ||||
| 				return nil | ||||
| 				logger.Successf("HelmRelease created") | ||||
| 				return namespacedName, nil | ||||
| 			} | ||||
| 		} | ||||
| 		return err | ||||
| 		return namespacedName, err | ||||
| 	} | ||||
|  | ||||
| 	existing.Labels = helmRelease.Labels | ||||
| 	existing.Spec = helmRelease.Spec | ||||
| 	if err := kubeClient.Update(ctx, &existing); err != nil { | ||||
| 		return err | ||||
| 		return namespacedName, err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("release updated") | ||||
| 	return nil | ||||
| 	helmRelease = &existing | ||||
| 	logger.Successf("HelmRelease updated") | ||||
| 	return namespacedName, nil | ||||
| } | ||||
|  | ||||
| func isHelmChartReady(ctx context.Context, kubeClient client.Client, name, namespace string) wait.ConditionFunc { | ||||
| func isHelmReleaseReady(ctx context.Context, kubeClient client.Client, | ||||
| 	namespacedName types.NamespacedName, helmRelease *helmv2.HelmRelease) wait.ConditionFunc { | ||||
| 	return func() (bool, error) { | ||||
| 		var helmChart sourcev1.HelmChart | ||||
| 		namespacedName := types.NamespacedName{ | ||||
| 			Namespace: namespace, | ||||
| 			Name:      name, | ||||
| 		} | ||||
|  | ||||
| 		err := kubeClient.Get(ctx, namespacedName, &helmChart) | ||||
| 		if err != nil { | ||||
| 			if apierrors.IsNotFound(err) { | ||||
| 				return false, nil | ||||
| 			} | ||||
| 			return false, err | ||||
| 		} | ||||
|  | ||||
| 		if c := meta.GetCondition(helmChart.Status.Conditions, meta.ReadyCondition); c != nil { | ||||
| 			switch c.Status { | ||||
| 			case corev1.ConditionTrue: | ||||
| 				return true, nil | ||||
| 			case corev1.ConditionFalse: | ||||
| 				return false, fmt.Errorf(c.Message) | ||||
| 			} | ||||
| 		} | ||||
| 		return false, nil | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func isHelmReleaseReady(ctx context.Context, kubeClient client.Client, name, namespace string) wait.ConditionFunc { | ||||
| 	return func() (bool, error) { | ||||
| 		var helmRelease helmv2.HelmRelease | ||||
| 		namespacedName := types.NamespacedName{ | ||||
| 			Namespace: namespace, | ||||
| 			Name:      name, | ||||
| 		} | ||||
|  | ||||
| 		err := kubeClient.Get(ctx, namespacedName, &helmRelease) | ||||
| 		err := kubeClient.Get(ctx, namespacedName, helmRelease) | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
|  | ||||
| 		if c := meta.GetCondition(helmRelease.Status.Conditions, meta.ReadyCondition); c != nil { | ||||
| 			switch c.Status { | ||||
| 			case corev1.ConditionTrue: | ||||
| 				return true, nil | ||||
| 			case corev1.ConditionFalse: | ||||
| 				return false, fmt.Errorf(c.Message) | ||||
| 			} | ||||
| 		// Confirm the state we are observing is for the current generation | ||||
| 		if helmRelease.Generation != helmRelease.Status.ObservedGeneration { | ||||
| 			return false, nil | ||||
| 		} | ||||
| 		return false, nil | ||||
|  | ||||
| 		return meta.HasReadyCondition(helmRelease.Status.Conditions), nil | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @ -102,7 +102,7 @@ func init() { | ||||
|  | ||||
| func createKsCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if len(args) < 1 { | ||||
| 		return fmt.Errorf("kustomization name is required") | ||||
| 		return fmt.Errorf("Kustomization name is required") | ||||
| 	} | ||||
| 	name := args[0] | ||||
|  | ||||
| @ -127,7 +127,7 @@ func createKsCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	} | ||||
|  | ||||
| 	if !export { | ||||
| 		logger.Generatef("generating kustomization") | ||||
| 		logger.Generatef("generating Kustomization") | ||||
| 	} | ||||
|  | ||||
| 	ksLabels, err := parseLabels() | ||||
| @ -232,38 +232,25 @@ func createKsCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Actionf("applying kustomization") | ||||
| 	if err := upsertKustomization(ctx, kubeClient, kustomization); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Waitingf("waiting for kustomization sync") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isKustomizationReady(ctx, kubeClient, name, namespace)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("kustomization %s is ready", name) | ||||
|  | ||||
| 	namespacedName := types.NamespacedName{ | ||||
| 		Namespace: namespace, | ||||
| 		Name:      name, | ||||
| 	} | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &kustomization) | ||||
| 	logger.Actionf("applying Kustomization") | ||||
| 	namespacedName, err := upsertKustomization(ctx, kubeClient, &kustomization) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("kustomization sync failed: %w", err) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if kustomization.Status.LastAppliedRevision != "" { | ||||
| 		logger.Successf("applied revision %s", kustomization.Status.LastAppliedRevision) | ||||
| 	} else { | ||||
| 		return fmt.Errorf("kustomization sync failed") | ||||
| 	logger.Waitingf("waiting for Kustomization reconciliation") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isKustomizationReady(ctx, kubeClient, namespacedName, &kustomization)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("Kustomization %s is ready", name) | ||||
|  | ||||
| 	logger.Successf("applied revision %s", kustomization.Status.LastAppliedRevision) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func upsertKustomization(ctx context.Context, kubeClient client.Client, kustomization kustomizev1.Kustomization) error { | ||||
| func upsertKustomization(ctx context.Context, kubeClient client.Client, | ||||
| 	kustomization *kustomizev1.Kustomization) (types.NamespacedName, error) { | ||||
| 	namespacedName := types.NamespacedName{ | ||||
| 		Namespace: kustomization.GetNamespace(), | ||||
| 		Name:      kustomization.GetName(), | ||||
| @ -273,39 +260,39 @@ func upsertKustomization(ctx context.Context, kubeClient client.Client, kustomiz | ||||
| 	err := kubeClient.Get(ctx, namespacedName, &existing) | ||||
| 	if err != nil { | ||||
| 		if errors.IsNotFound(err) { | ||||
| 			if err := kubeClient.Create(ctx, &kustomization); err != nil { | ||||
| 				return err | ||||
| 			if err := kubeClient.Create(ctx, kustomization); err != nil { | ||||
| 				return namespacedName, err | ||||
| 			} else { | ||||
| 				logger.Successf("kustomization created") | ||||
| 				return nil | ||||
| 				logger.Successf("Kustomization created") | ||||
| 				return namespacedName, nil | ||||
| 			} | ||||
| 		} | ||||
| 		return err | ||||
| 		return namespacedName, err | ||||
| 	} | ||||
|  | ||||
| 	existing.Labels = kustomization.Labels | ||||
| 	existing.Spec = kustomization.Spec | ||||
| 	if err := kubeClient.Update(ctx, &existing); err != nil { | ||||
| 		return err | ||||
| 		return namespacedName, err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("kustomization updated") | ||||
| 	return nil | ||||
| 	kustomization = &existing | ||||
| 	logger.Successf("Kustomization updated") | ||||
| 	return namespacedName, nil | ||||
| } | ||||
|  | ||||
| func isKustomizationReady(ctx context.Context, kubeClient client.Client, name, namespace string) wait.ConditionFunc { | ||||
| func isKustomizationReady(ctx context.Context, kubeClient client.Client, | ||||
| 	namespacedName types.NamespacedName, kustomization *kustomizev1.Kustomization) wait.ConditionFunc { | ||||
| 	return func() (bool, error) { | ||||
| 		var kustomization kustomizev1.Kustomization | ||||
| 		namespacedName := types.NamespacedName{ | ||||
| 			Namespace: namespace, | ||||
| 			Name:      name, | ||||
| 		} | ||||
|  | ||||
| 		err := kubeClient.Get(ctx, namespacedName, &kustomization) | ||||
| 		err := kubeClient.Get(ctx, namespacedName, kustomization) | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
|  | ||||
| 		// Confirm the state we are observing is for the current generation | ||||
| 		if kustomization.Generation != kustomization.Status.ObservedGeneration { | ||||
| 			return false, nil | ||||
| 		} | ||||
|  | ||||
| 		if c := meta.GetCondition(kustomization.Status.Conditions, meta.ReadyCondition); c != nil { | ||||
| 			switch c.Status { | ||||
| 			case corev1.ConditionTrue: | ||||
|  | ||||
| @ -65,12 +65,12 @@ func init() { | ||||
|  | ||||
| func createReceiverCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if len(args) < 1 { | ||||
| 		return fmt.Errorf("receiver name is required") | ||||
| 		return fmt.Errorf("Receiver name is required") | ||||
| 	} | ||||
| 	name := args[0] | ||||
|  | ||||
| 	if rcvType == "" { | ||||
| 		return fmt.Errorf("type is required") | ||||
| 		return fmt.Errorf("Receiver type is required") | ||||
| 	} | ||||
|  | ||||
| 	if rcvSecretRef == "" { | ||||
| @ -100,7 +100,7 @@ func createReceiverCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	} | ||||
|  | ||||
| 	if !export { | ||||
| 		logger.Generatef("generating receiver") | ||||
| 		logger.Generatef("generating Receiver") | ||||
| 	} | ||||
|  | ||||
| 	receiver := notificationv1.Receiver{ | ||||
| @ -132,34 +132,25 @@ func createReceiverCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Actionf("applying receiver") | ||||
| 	if err := upsertReceiver(ctx, kubeClient, receiver); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Waitingf("waiting for reconciliation") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isReceiverReady(ctx, kubeClient, name, namespace)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("receiver %s is ready", name) | ||||
|  | ||||
| 	namespacedName := types.NamespacedName{ | ||||
| 		Namespace: namespace, | ||||
| 		Name:      name, | ||||
| 	} | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &receiver) | ||||
| 	logger.Actionf("applying Receiver") | ||||
| 	namespacedName, err := upsertReceiver(ctx, kubeClient, &receiver) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("receiver sync failed: %w", err) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Waitingf("waiting for Receiver reconciliation") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isReceiverReady(ctx, kubeClient, namespacedName, &receiver)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("Receiver %s is ready", name) | ||||
|  | ||||
| 	logger.Successf("generated webhook URL %s", receiver.Status.URL) | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func upsertReceiver(ctx context.Context, kubeClient client.Client, receiver notificationv1.Receiver) error { | ||||
| func upsertReceiver(ctx context.Context, kubeClient client.Client, | ||||
| 	receiver *notificationv1.Receiver) (types.NamespacedName, error) { | ||||
| 	namespacedName := types.NamespacedName{ | ||||
| 		Namespace: receiver.GetNamespace(), | ||||
| 		Name:      receiver.GetName(), | ||||
| @ -169,35 +160,30 @@ func upsertReceiver(ctx context.Context, kubeClient client.Client, receiver noti | ||||
| 	err := kubeClient.Get(ctx, namespacedName, &existing) | ||||
| 	if err != nil { | ||||
| 		if errors.IsNotFound(err) { | ||||
| 			if err := kubeClient.Create(ctx, &receiver); err != nil { | ||||
| 				return err | ||||
| 			if err := kubeClient.Create(ctx, receiver); err != nil { | ||||
| 				return namespacedName, err | ||||
| 			} else { | ||||
| 				logger.Successf("receiver created") | ||||
| 				return nil | ||||
| 				logger.Successf("Receiver created") | ||||
| 				return namespacedName, nil | ||||
| 			} | ||||
| 		} | ||||
| 		return err | ||||
| 		return namespacedName, err | ||||
| 	} | ||||
|  | ||||
| 	existing.Labels = receiver.Labels | ||||
| 	existing.Spec = receiver.Spec | ||||
| 	if err := kubeClient.Update(ctx, &existing); err != nil { | ||||
| 		return err | ||||
| 		return namespacedName, err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("receiver updated") | ||||
| 	return nil | ||||
| 	receiver = &existing | ||||
| 	logger.Successf("Receiver updated") | ||||
| 	return namespacedName, nil | ||||
| } | ||||
|  | ||||
| func isReceiverReady(ctx context.Context, kubeClient client.Client, name, namespace string) wait.ConditionFunc { | ||||
| func isReceiverReady(ctx context.Context, kubeClient client.Client, | ||||
| 	namespacedName types.NamespacedName, receiver *notificationv1.Receiver) wait.ConditionFunc { | ||||
| 	return func() (bool, error) { | ||||
| 		var receiver notificationv1.Receiver | ||||
| 		namespacedName := types.NamespacedName{ | ||||
| 			Namespace: namespace, | ||||
| 			Name:      name, | ||||
| 		} | ||||
|  | ||||
| 		err := kubeClient.Get(ctx, namespacedName, &receiver) | ||||
| 		err := kubeClient.Get(ctx, namespacedName, receiver) | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
|  | ||||
| @ -83,13 +83,13 @@ func init() { | ||||
|  | ||||
| func createSourceBucketCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if len(args) < 1 { | ||||
| 		return fmt.Errorf("source name is required") | ||||
| 		return fmt.Errorf("Bucket source name is required") | ||||
| 	} | ||||
| 	name := args[0] | ||||
| 	secretName := fmt.Sprintf("bucket-%s", name) | ||||
|  | ||||
| 	if !utils.containsItemString(supportedSourceBucketProviders, sourceBucketProvider) { | ||||
| 		return fmt.Errorf("bucket provider %s is not supported, can be %v", | ||||
| 		return fmt.Errorf("Bucket provider %s is not supported, can be %v", | ||||
| 			sourceBucketProvider, supportedSourceBucketProviders) | ||||
| 	} | ||||
|  | ||||
| @ -112,7 +112,7 @@ func createSourceBucketCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	} | ||||
| 	defer os.RemoveAll(tmpDir) | ||||
|  | ||||
| 	bucket := sourcev1.Bucket{ | ||||
| 	bucket := &sourcev1.Bucket{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			Name:      name, | ||||
| 			Namespace: namespace, | ||||
| @ -131,7 +131,7 @@ func createSourceBucketCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	} | ||||
|  | ||||
| 	if export { | ||||
| 		return exportBucket(bucket) | ||||
| 		return exportBucket(*bucket) | ||||
| 	} | ||||
|  | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), timeout) | ||||
| @ -142,7 +142,7 @@ func createSourceBucketCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Generatef("generating source") | ||||
| 	logger.Generatef("generating Bucket source") | ||||
|  | ||||
| 	secret := corev1.Secret{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| @ -168,38 +168,28 @@ func createSourceBucketCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 		logger.Successf("authentication configured") | ||||
| 	} | ||||
|  | ||||
| 	logger.Actionf("applying source") | ||||
| 	if err := upsertBucket(ctx, kubeClient, bucket); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Waitingf("waiting for download") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isBucketReady(ctx, kubeClient, name, namespace)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("download completed") | ||||
|  | ||||
| 	namespacedName := types.NamespacedName{ | ||||
| 		Namespace: namespace, | ||||
| 		Name:      name, | ||||
| 	} | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &bucket) | ||||
| 	logger.Actionf("applying Bucket source") | ||||
| 	namespacedName, err := upsertBucket(ctx, kubeClient, bucket) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("could not retrieve bucket: %w", err) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if bucket.Status.Artifact != nil { | ||||
| 		logger.Successf("fetched revision: %s", bucket.Status.Artifact.Revision) | ||||
| 	} else { | ||||
| 		return fmt.Errorf("download failed, artifact not found") | ||||
| 	logger.Waitingf("waiting for Bucket source reconciliation") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isBucketReady(ctx, kubeClient, namespacedName, bucket)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("Bucket source reconciliation completed") | ||||
|  | ||||
| 	if bucket.Status.Artifact == nil { | ||||
| 		return fmt.Errorf("Bucket source reconciliation but no artifact was found") | ||||
| 	} | ||||
| 	logger.Successf("fetched revision: %s", bucket.Status.Artifact.Revision) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func upsertBucket(ctx context.Context, kubeClient client.Client, bucket sourcev1.Bucket) error { | ||||
| func upsertBucket(ctx context.Context, kubeClient client.Client, | ||||
| 	bucket *sourcev1.Bucket) (types.NamespacedName, error) { | ||||
| 	namespacedName := types.NamespacedName{ | ||||
| 		Namespace: bucket.GetNamespace(), | ||||
| 		Name:      bucket.GetName(), | ||||
| @ -209,22 +199,22 @@ func upsertBucket(ctx context.Context, kubeClient client.Client, bucket sourcev1 | ||||
| 	err := kubeClient.Get(ctx, namespacedName, &existing) | ||||
| 	if err != nil { | ||||
| 		if errors.IsNotFound(err) { | ||||
| 			if err := kubeClient.Create(ctx, &bucket); err != nil { | ||||
| 				return err | ||||
| 			if err := kubeClient.Create(ctx, bucket); err != nil { | ||||
| 				return namespacedName, err | ||||
| 			} else { | ||||
| 				logger.Successf("source created") | ||||
| 				return nil | ||||
| 				logger.Successf("Bucket source created") | ||||
| 				return namespacedName, nil | ||||
| 			} | ||||
| 		} | ||||
| 		return err | ||||
| 		return namespacedName, err | ||||
| 	} | ||||
|  | ||||
| 	existing.Labels = bucket.Labels | ||||
| 	existing.Spec = bucket.Spec | ||||
| 	if err := kubeClient.Update(ctx, &existing); err != nil { | ||||
| 		return err | ||||
| 		return namespacedName, err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("source updated") | ||||
| 	return nil | ||||
| 	bucket = &existing | ||||
| 	logger.Successf("Bucket source updated") | ||||
| 	return namespacedName, nil | ||||
| } | ||||
|  | ||||
| @ -111,7 +111,7 @@ func init() { | ||||
|  | ||||
| func createSourceGitCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if len(args) < 1 { | ||||
| 		return fmt.Errorf("source name is required") | ||||
| 		return fmt.Errorf("GitRepository source name is required") | ||||
| 	} | ||||
| 	name := args[0] | ||||
|  | ||||
| @ -234,7 +234,7 @@ func createSourceGitCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 		logger.Successf("authentication configured") | ||||
| 	} | ||||
|  | ||||
| 	logger.Generatef("generating source") | ||||
| 	logger.Generatef("generating GitRepository source") | ||||
|  | ||||
| 	if withAuth { | ||||
| 		gitRepository.Spec.SecretRef = &corev1.LocalObjectReference{ | ||||
| @ -242,34 +242,23 @@ func createSourceGitCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	logger.Actionf("applying source") | ||||
| 	if err := upsertGitRepository(ctx, kubeClient, gitRepository); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Waitingf("waiting for git sync") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isGitRepositoryReady(ctx, kubeClient, name, namespace)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("git sync completed") | ||||
|  | ||||
| 	namespacedName := types.NamespacedName{ | ||||
| 		Namespace: namespace, | ||||
| 		Name:      name, | ||||
| 	} | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &gitRepository) | ||||
| 	logger.Actionf("applying GitRepository source") | ||||
| 	namespacedName, err := upsertGitRepository(ctx, kubeClient, &gitRepository) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("git sync failed: %w", err) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if gitRepository.Status.Artifact != nil { | ||||
| 		logger.Successf("fetched revision: %s", gitRepository.Status.Artifact.Revision) | ||||
| 	} else { | ||||
| 		return fmt.Errorf("git sync failed, artifact not found") | ||||
| 	logger.Waitingf("waiting for GitRepository source reconciliation") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isGitRepositoryReady(ctx, kubeClient, namespacedName, &gitRepository)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("GitRepository source reconciliation completed") | ||||
|  | ||||
| 	if gitRepository.Status.Artifact == nil { | ||||
| 		return fmt.Errorf("GitRepository source reconciliation completed but no artifact was found") | ||||
| 	} | ||||
| 	logger.Successf("fetched revision: %s", gitRepository.Status.Artifact.Revision) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @ -330,7 +319,8 @@ func upsertSecret(ctx context.Context, kubeClient client.Client, secret corev1.S | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func upsertGitRepository(ctx context.Context, kubeClient client.Client, gitRepository sourcev1.GitRepository) error { | ||||
| func upsertGitRepository(ctx context.Context, kubeClient client.Client, | ||||
| 	gitRepository *sourcev1.GitRepository) (types.NamespacedName, error) { | ||||
| 	namespacedName := types.NamespacedName{ | ||||
| 		Namespace: gitRepository.GetNamespace(), | ||||
| 		Name:      gitRepository.GetName(), | ||||
| @ -340,35 +330,30 @@ func upsertGitRepository(ctx context.Context, kubeClient client.Client, gitRepos | ||||
| 	err := kubeClient.Get(ctx, namespacedName, &existing) | ||||
| 	if err != nil { | ||||
| 		if errors.IsNotFound(err) { | ||||
| 			if err := kubeClient.Create(ctx, &gitRepository); err != nil { | ||||
| 				return err | ||||
| 			if err := kubeClient.Create(ctx, gitRepository); err != nil { | ||||
| 				return namespacedName, err | ||||
| 			} else { | ||||
| 				logger.Successf("source created") | ||||
| 				return nil | ||||
| 				logger.Successf("GitRepository source created") | ||||
| 				return namespacedName, nil | ||||
| 			} | ||||
| 		} | ||||
| 		return err | ||||
| 		return namespacedName, err | ||||
| 	} | ||||
|  | ||||
| 	existing.Labels = gitRepository.Labels | ||||
| 	existing.Spec = gitRepository.Spec | ||||
| 	if err := kubeClient.Update(ctx, &existing); err != nil { | ||||
| 		return err | ||||
| 		return namespacedName, err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("source updated") | ||||
| 	return nil | ||||
| 	gitRepository = &existing | ||||
| 	logger.Successf("GitRepository source updated") | ||||
| 	return namespacedName, nil | ||||
| } | ||||
|  | ||||
| func isGitRepositoryReady(ctx context.Context, kubeClient client.Client, name, namespace string) wait.ConditionFunc { | ||||
| func isGitRepositoryReady(ctx context.Context, kubeClient client.Client, | ||||
| 	namespacedName types.NamespacedName, gitRepository *sourcev1.GitRepository) wait.ConditionFunc { | ||||
| 	return func() (bool, error) { | ||||
| 		var gitRepository sourcev1.GitRepository | ||||
| 		namespacedName := types.NamespacedName{ | ||||
| 			Namespace: namespace, | ||||
| 			Name:      name, | ||||
| 		} | ||||
|  | ||||
| 		err := kubeClient.Get(ctx, namespacedName, &gitRepository) | ||||
| 		err := kubeClient.Get(ctx, namespacedName, gitRepository) | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
|  | ||||
| @ -83,7 +83,7 @@ func init() { | ||||
|  | ||||
| func createSourceHelmCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if len(args) < 1 { | ||||
| 		return fmt.Errorf("source name is required") | ||||
| 		return fmt.Errorf("HelmRepository source name is required") | ||||
| 	} | ||||
| 	name := args[0] | ||||
| 	secretName := fmt.Sprintf("helm-%s", name) | ||||
| @ -107,7 +107,7 @@ func createSourceHelmCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 		return fmt.Errorf("url parse failed: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	helmRepository := sourcev1.HelmRepository{ | ||||
| 	helmRepository := &sourcev1.HelmRepository{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			Name:      name, | ||||
| 			Namespace: namespace, | ||||
| @ -122,7 +122,7 @@ func createSourceHelmCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	} | ||||
|  | ||||
| 	if export { | ||||
| 		return exportHelmRepository(helmRepository) | ||||
| 		return exportHelmRepository(*helmRepository) | ||||
| 	} | ||||
|  | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), timeout) | ||||
| @ -133,7 +133,7 @@ func createSourceHelmCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Generatef("generating source") | ||||
| 	logger.Generatef("generating HelmRepository source") | ||||
|  | ||||
| 	secret := corev1.Secret{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| @ -181,38 +181,28 @@ func createSourceHelmCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 		logger.Successf("authentication configured") | ||||
| 	} | ||||
|  | ||||
| 	logger.Actionf("applying source") | ||||
| 	if err := upsertHelmRepository(ctx, kubeClient, helmRepository); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Waitingf("waiting for index download") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isHelmRepositoryReady(ctx, kubeClient, name, namespace)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("index download completed") | ||||
|  | ||||
| 	namespacedName := types.NamespacedName{ | ||||
| 		Namespace: namespace, | ||||
| 		Name:      name, | ||||
| 	} | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &helmRepository) | ||||
| 	logger.Actionf("applying HelmRepository source") | ||||
| 	namespacedName, err := upsertHelmRepository(ctx, kubeClient, helmRepository) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("helm index failed: %w", err) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if helmRepository.Status.Artifact != nil { | ||||
| 		logger.Successf("fetched revision: %s", helmRepository.Status.Artifact.Revision) | ||||
| 	} else { | ||||
| 		return fmt.Errorf("index download failed, artifact not found") | ||||
| 	logger.Waitingf("waiting for HelmRepository source reconciliation") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isHelmRepositoryReady(ctx, kubeClient, namespacedName, helmRepository)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("HelmRepository source reconciliation completed") | ||||
|  | ||||
| 	if helmRepository.Status.Artifact == nil { | ||||
| 		return fmt.Errorf("HelmRepository source reconciliation completed but no artifact was found") | ||||
| 	} | ||||
| 	logger.Successf("fetched revision: %s", helmRepository.Status.Artifact.Revision) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func upsertHelmRepository(ctx context.Context, kubeClient client.Client, helmRepository sourcev1.HelmRepository) error { | ||||
| func upsertHelmRepository(ctx context.Context, kubeClient client.Client, | ||||
| 	helmRepository *sourcev1.HelmRepository) (types.NamespacedName, error) { | ||||
| 	namespacedName := types.NamespacedName{ | ||||
| 		Namespace: helmRepository.GetNamespace(), | ||||
| 		Name:      helmRepository.GetName(), | ||||
| @ -222,22 +212,22 @@ func upsertHelmRepository(ctx context.Context, kubeClient client.Client, helmRep | ||||
| 	err := kubeClient.Get(ctx, namespacedName, &existing) | ||||
| 	if err != nil { | ||||
| 		if errors.IsNotFound(err) { | ||||
| 			if err := kubeClient.Create(ctx, &helmRepository); err != nil { | ||||
| 				return err | ||||
| 			if err := kubeClient.Create(ctx, helmRepository); err != nil { | ||||
| 				return namespacedName, err | ||||
| 			} else { | ||||
| 				logger.Successf("source created") | ||||
| 				return nil | ||||
| 				return namespacedName, nil | ||||
| 			} | ||||
| 		} | ||||
| 		return err | ||||
| 		return namespacedName, err | ||||
| 	} | ||||
|  | ||||
| 	existing.Labels = helmRepository.Labels | ||||
| 	existing.Spec = helmRepository.Spec | ||||
| 	if err := kubeClient.Update(ctx, &existing); err != nil { | ||||
| 		return err | ||||
| 		return namespacedName, err | ||||
| 	} | ||||
|  | ||||
| 	helmRepository = &existing | ||||
| 	logger.Successf("source updated") | ||||
| 	return nil | ||||
| 	return namespacedName, nil | ||||
| } | ||||
|  | ||||
| @ -45,7 +45,7 @@ func init() { | ||||
|  | ||||
| func reconcileAlertCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if len(args) < 1 { | ||||
| 		return fmt.Errorf("alert name is required") | ||||
| 		return fmt.Errorf("Alert name is required") | ||||
| 	} | ||||
| 	name := args[0] | ||||
|  | ||||
| @ -62,7 +62,7 @@ func reconcileAlertCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 		Name:      name, | ||||
| 	} | ||||
|  | ||||
| 	logger.Actionf("annotating alert %s in %s namespace", name, namespace) | ||||
| 	logger.Actionf("annotating Alert %s in %s namespace", name, namespace) | ||||
| 	var alert notificationv1.Alert | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &alert) | ||||
| 	if err != nil { | ||||
| @ -79,15 +79,13 @@ func reconcileAlertCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if err := kubeClient.Update(ctx, &alert); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("alert annotated") | ||||
| 	logger.Successf("Alert annotated") | ||||
|  | ||||
| 	logger.Waitingf("waiting for reconciliation") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isAlertReady(ctx, kubeClient, name, namespace)); err != nil { | ||||
| 		isAlertReady(ctx, kubeClient, namespacedName, &alert)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("alert reconciliation completed") | ||||
|  | ||||
| 	logger.Successf("Alert reconciliation completed") | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @ -45,7 +45,7 @@ func init() { | ||||
|  | ||||
| func reconcileAlertProviderCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if len(args) < 1 { | ||||
| 		return fmt.Errorf("provider name is required") | ||||
| 		return fmt.Errorf("Provider name is required") | ||||
| 	} | ||||
| 	name := args[0] | ||||
|  | ||||
| @ -62,7 +62,7 @@ func reconcileAlertProviderCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 		Name:      name, | ||||
| 	} | ||||
|  | ||||
| 	logger.Actionf("annotating provider %s in %s namespace", name, namespace) | ||||
| 	logger.Actionf("annotating Provider %s in %s namespace", name, namespace) | ||||
| 	var alertProvider notificationv1.Provider | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &alertProvider) | ||||
| 	if err != nil { | ||||
| @ -79,15 +79,13 @@ func reconcileAlertProviderCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if err := kubeClient.Update(ctx, &alertProvider); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("provider annotated") | ||||
| 	logger.Successf("Provider annotated") | ||||
|  | ||||
| 	logger.Waitingf("waiting for reconciliation") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isAlertProviderReady(ctx, kubeClient, name, namespace)); err != nil { | ||||
| 		isAlertProviderReady(ctx, kubeClient, namespacedName, &alertProvider)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("provider reconciliation completed") | ||||
|  | ||||
| 	logger.Successf("Provider reconciliation completed") | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @ -98,19 +98,19 @@ func reconcileHrCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	lastHandledReconcileAt := helmRelease.Status.LastHandledReconcileAt | ||||
| 	logger.Actionf("annotating HelmRelease %s in %s namespace", name, namespace) | ||||
| 	if err := requestHelmReleaseReconciliation(ctx, kubeClient, namespacedName); err != nil { | ||||
| 	if err := requestHelmReleaseReconciliation(ctx, kubeClient, namespacedName, &helmRelease); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("HelmRelease annotated") | ||||
|  | ||||
| 	logger.Waitingf("waiting for HelmRelease reconciliation") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		helmReleaseReconciliationHandled(ctx, kubeClient, name, namespace, helmRelease.Status.LastHandledReconcileAt), | ||||
| 		helmReleaseReconciliationHandled(ctx, kubeClient, namespacedName, &helmRelease, lastHandledReconcileAt), | ||||
| 	); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("HelmRelease reconciliation completed") | ||||
|  | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &helmRelease) | ||||
| @ -120,7 +120,7 @@ func reconcileHrCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if c := meta.GetCondition(helmRelease.Status.Conditions, meta.ReadyCondition); c != nil { | ||||
| 		switch c.Status { | ||||
| 		case corev1.ConditionFalse: | ||||
| 			return fmt.Errorf("HelmRelease reconciliation failed") | ||||
| 			return fmt.Errorf("HelmRelease reconciliation failed: %s", c.Message) | ||||
| 		default: | ||||
| 			logger.Successf("reconciled revision %s", helmRelease.Status.LastAppliedRevision) | ||||
| 		} | ||||
| @ -129,39 +129,29 @@ func reconcileHrCmdRun(cmd *cobra.Command, args []string) error { | ||||
| } | ||||
|  | ||||
| func helmReleaseReconciliationHandled(ctx context.Context, kubeClient client.Client, | ||||
| 	name, namespace, lastHandledReconcileAt string) wait.ConditionFunc { | ||||
| 	namespacedName types.NamespacedName, helmRelease *helmv2.HelmRelease, lastHandledReconcileAt string) wait.ConditionFunc { | ||||
| 	return func() (bool, error) { | ||||
| 		var helmRelease helmv2.HelmRelease | ||||
| 		namespacedName := types.NamespacedName{ | ||||
| 			Namespace: namespace, | ||||
| 			Name:      name, | ||||
| 		} | ||||
|  | ||||
| 		err := kubeClient.Get(ctx, namespacedName, &helmRelease) | ||||
| 		err := kubeClient.Get(ctx, namespacedName, helmRelease) | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
|  | ||||
| 		return helmRelease.Status.LastHandledReconcileAt != lastHandledReconcileAt, nil | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func requestHelmReleaseReconciliation(ctx context.Context, kubeClient client.Client, namespacedName types.NamespacedName) error { | ||||
| 	var release helmv2.HelmRelease | ||||
| func requestHelmReleaseReconciliation(ctx context.Context, kubeClient client.Client, | ||||
| 	namespacedName types.NamespacedName, helmRelease *helmv2.HelmRelease) error { | ||||
| 	return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) { | ||||
| 		if err := kubeClient.Get(ctx, namespacedName, &release); err != nil { | ||||
| 		if err := kubeClient.Get(ctx, namespacedName, helmRelease); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		if release.Annotations == nil { | ||||
| 			release.Annotations = map[string]string{ | ||||
| 		if helmRelease.Annotations == nil { | ||||
| 			helmRelease.Annotations = map[string]string{ | ||||
| 				meta.ReconcileAtAnnotation: time.Now().Format(time.RFC3339Nano), | ||||
| 			} | ||||
| 		} else { | ||||
| 			release.Annotations[meta.ReconcileAtAnnotation] = time.Now().Format(time.RFC3339Nano) | ||||
| 			helmRelease.Annotations[meta.ReconcileAtAnnotation] = time.Now().Format(time.RFC3339Nano) | ||||
| 		} | ||||
|  | ||||
| 		err = kubeClient.Update(ctx, &release) | ||||
| 		return | ||||
| 		return kubeClient.Update(ctx, helmRelease) | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| @ -54,14 +54,14 @@ var ( | ||||
| ) | ||||
|  | ||||
| func init() { | ||||
| 	reconcileKsCmd.Flags().BoolVar(&syncKsWithSource, "with-source", false, "reconcile kustomization source") | ||||
| 	reconcileKsCmd.Flags().BoolVar(&syncKsWithSource, "with-source", false, "reconcile Kustomization source") | ||||
|  | ||||
| 	reconcileCmd.AddCommand(reconcileKsCmd) | ||||
| } | ||||
|  | ||||
| func reconcileKsCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if len(args) < 1 { | ||||
| 		return fmt.Errorf("kustomization name is required") | ||||
| 		return fmt.Errorf("Kustomization name is required") | ||||
| 	} | ||||
| 	name := args[0] | ||||
|  | ||||
| @ -77,7 +77,6 @@ func reconcileKsCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 		Namespace: namespace, | ||||
| 		Name:      name, | ||||
| 	} | ||||
|  | ||||
| 	var kustomization kustomizev1.Kustomization | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &kustomization) | ||||
| 	if err != nil { | ||||
| @ -96,30 +95,26 @@ func reconcileKsCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	logger.Actionf("annotating kustomization %s in %s namespace", name, namespace) | ||||
| 	if err := requestKustomizeReconciliation(ctx, kubeClient, namespacedName); err != nil { | ||||
| 	lastHandledReconcileAt := kustomization.Status.LastHandledReconcileAt | ||||
| 	logger.Actionf("annotating Kustomization %s in %s namespace", name, namespace) | ||||
| 	if err := requestKustomizeReconciliation(ctx, kubeClient, namespacedName, &kustomization); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("kustomization annotated") | ||||
| 	logger.Successf("Kustomization annotated") | ||||
|  | ||||
| 	logger.Waitingf("waiting for kustomization reconciliation") | ||||
| 	logger.Waitingf("waiting for Kustomization reconciliation") | ||||
| 	if err := wait.PollImmediate( | ||||
| 		pollInterval, timeout, | ||||
| 		kustomizeReconciliationHandled(ctx, kubeClient, name, namespace, kustomization.Status.LastHandledReconcileAt), | ||||
| 		kustomizeReconciliationHandled(ctx, kubeClient, namespacedName, &kustomization, lastHandledReconcileAt), | ||||
| 	); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("Kustomization reconciliation completed") | ||||
|  | ||||
| 	logger.Successf("kustomization reconciliation completed") | ||||
|  | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &kustomization) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if c := meta.GetCondition(kustomization.Status.Conditions, meta.ReadyCondition); c != nil { | ||||
| 		switch c.Status { | ||||
| 		case corev1.ConditionFalse: | ||||
| 			return fmt.Errorf("kustomization reconciliation failed") | ||||
| 			return fmt.Errorf("Kustomization reconciliation failed") | ||||
| 		default: | ||||
| 			logger.Successf("reconciled revision %s", kustomization.Status.LastAppliedRevision) | ||||
| 		} | ||||
| @ -128,30 +123,22 @@ func reconcileKsCmdRun(cmd *cobra.Command, args []string) error { | ||||
| } | ||||
|  | ||||
| func kustomizeReconciliationHandled(ctx context.Context, kubeClient client.Client, | ||||
| 	name, namespace, lastHandledReconcileAt string) wait.ConditionFunc { | ||||
| 	namespacedName types.NamespacedName, kustomization *kustomizev1.Kustomization, lastHandledReconcileAt string) wait.ConditionFunc { | ||||
| 	return func() (bool, error) { | ||||
| 		var kustomize kustomizev1.Kustomization | ||||
| 		namespacedName := types.NamespacedName{ | ||||
| 			Namespace: namespace, | ||||
| 			Name:      name, | ||||
| 		} | ||||
|  | ||||
| 		err := kubeClient.Get(ctx, namespacedName, &kustomize) | ||||
| 		err := kubeClient.Get(ctx, namespacedName, kustomization) | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
|  | ||||
| 		return kustomize.Status.LastHandledReconcileAt != lastHandledReconcileAt, nil | ||||
| 		return kustomization.Status.LastHandledReconcileAt != lastHandledReconcileAt, nil | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func requestKustomizeReconciliation(ctx context.Context, kubeClient client.Client, namespacedName types.NamespacedName) error { | ||||
| 	var kustomization kustomizev1.Kustomization | ||||
| func requestKustomizeReconciliation(ctx context.Context, kubeClient client.Client, | ||||
| 	namespacedName types.NamespacedName, kustomization *kustomizev1.Kustomization) error { | ||||
| 	return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) { | ||||
| 		if err := kubeClient.Get(ctx, namespacedName, &kustomization); err != nil { | ||||
| 		if err := kubeClient.Get(ctx, namespacedName, kustomization); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		if kustomization.Annotations == nil { | ||||
| 			kustomization.Annotations = map[string]string{ | ||||
| 				meta.ReconcileAtAnnotation: time.Now().Format(time.RFC3339Nano), | ||||
| @ -159,8 +146,6 @@ func requestKustomizeReconciliation(ctx context.Context, kubeClient client.Clien | ||||
| 		} else { | ||||
| 			kustomization.Annotations[meta.ReconcileAtAnnotation] = time.Now().Format(time.RFC3339Nano) | ||||
| 		} | ||||
|  | ||||
| 		err = kubeClient.Update(ctx, &kustomization) | ||||
| 		return | ||||
| 		return kubeClient.Update(ctx, kustomization) | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| @ -62,7 +62,7 @@ func reconcileReceiverCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 		Name:      name, | ||||
| 	} | ||||
|  | ||||
| 	logger.Actionf("annotating receiver %s in %s namespace", name, namespace) | ||||
| 	logger.Actionf("annotating Receiver %s in %s namespace", name, namespace) | ||||
| 	var receiver notificationv1.Receiver | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &receiver) | ||||
| 	if err != nil { | ||||
| @ -79,15 +79,15 @@ func reconcileReceiverCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if err := kubeClient.Update(ctx, &receiver); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("receiver annotated") | ||||
| 	logger.Successf("Receiver annotated") | ||||
|  | ||||
| 	logger.Waitingf("waiting for reconciliation") | ||||
| 	logger.Waitingf("waiting for Receiver reconciliation") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isReceiverReady(ctx, kubeClient, name, namespace)); err != nil { | ||||
| 		isReceiverReady(ctx, kubeClient, namespacedName, &receiver)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("receiver reconciliation completed") | ||||
| 	logger.Successf("Receiver reconciliation completed") | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @ -64,7 +64,7 @@ func reconcileSourceBucketCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 		Name:      name, | ||||
| 	} | ||||
|  | ||||
| 	logger.Actionf("annotating source %s in %s namespace", name, namespace) | ||||
| 	logger.Actionf("annotating Bucket source %s in %s namespace", name, namespace) | ||||
| 	var bucket sourcev1.Bucket | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &bucket) | ||||
| 	if err != nil { | ||||
| @ -81,42 +81,35 @@ func reconcileSourceBucketCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if err := kubeClient.Update(ctx, &bucket); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("source annotated") | ||||
| 	logger.Successf("Bucket source annotated") | ||||
|  | ||||
| 	logger.Waitingf("waiting for reconciliation") | ||||
| 	logger.Waitingf("waiting for Bucket source reconciliation") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isBucketReady(ctx, kubeClient, name, namespace)); err != nil { | ||||
| 		isBucketReady(ctx, kubeClient, namespacedName, &bucket)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("Bucket source reconciliation completed") | ||||
|  | ||||
| 	logger.Successf("bucket reconciliation completed") | ||||
|  | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &bucket) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if bucket.Status.Artifact != nil { | ||||
| 		logger.Successf("fetched revision %s", bucket.Status.Artifact.Revision) | ||||
| 	} else { | ||||
| 		return fmt.Errorf("bucket reconciliation failed, artifact not found") | ||||
| 	if bucket.Status.Artifact == nil { | ||||
| 		return fmt.Errorf("Bucket source reconciliation completed but no artifact was found") | ||||
| 	} | ||||
| 	logger.Successf("fetched revision %s", bucket.Status.Artifact.Revision) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func isBucketReady(ctx context.Context, kubeClient client.Client, name, namespace string) wait.ConditionFunc { | ||||
| func isBucketReady(ctx context.Context, kubeClient client.Client, | ||||
| 	namespacedName types.NamespacedName, bucket *sourcev1.Bucket) wait.ConditionFunc { | ||||
| 	return func() (bool, error) { | ||||
| 		var bucket sourcev1.Bucket | ||||
| 		namespacedName := types.NamespacedName{ | ||||
| 			Namespace: namespace, | ||||
| 			Name:      name, | ||||
| 		} | ||||
|  | ||||
| 		err := kubeClient.Get(ctx, namespacedName, &bucket) | ||||
| 		err := kubeClient.Get(ctx, namespacedName, bucket) | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
|  | ||||
| 		// Confirm the state we are observing is for the current generation | ||||
| 		if bucket.Generation != bucket.Status.ObservedGeneration { | ||||
| 			return false, nil | ||||
| 		} | ||||
|  | ||||
| 		if c := meta.GetCondition(bucket.Status.Conditions, meta.ReadyCondition); c != nil { | ||||
| 			switch c.Status { | ||||
| 			case corev1.ConditionTrue: | ||||
|  | ||||
| @ -62,7 +62,7 @@ func reconcileSourceGitCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 		Name:      name, | ||||
| 	} | ||||
|  | ||||
| 	logger.Actionf("annotating source %s in %s namespace", name, namespace) | ||||
| 	logger.Actionf("annotating GitRepository source %s in %s namespace", name, namespace) | ||||
| 	var gitRepository sourcev1.GitRepository | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &gitRepository) | ||||
| 	if err != nil { | ||||
| @ -79,25 +79,18 @@ func reconcileSourceGitCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if err := kubeClient.Update(ctx, &gitRepository); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("source annotated") | ||||
| 	logger.Successf("GitRepository source annotated") | ||||
|  | ||||
| 	logger.Waitingf("waiting for reconciliation") | ||||
| 	logger.Waitingf("waiting for GitRepository source reconciliation") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isGitRepositoryReady(ctx, kubeClient, name, namespace)); err != nil { | ||||
| 		isGitRepositoryReady(ctx, kubeClient, namespacedName, &gitRepository)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("GitRepository source reconciliation completed") | ||||
|  | ||||
| 	logger.Successf("git reconciliation completed") | ||||
|  | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &gitRepository) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if gitRepository.Status.Artifact != nil { | ||||
| 		logger.Successf("fetched revision %s", gitRepository.Status.Artifact.Revision) | ||||
| 	} else { | ||||
| 		return fmt.Errorf("git reconciliation failed, artifact not found") | ||||
| 	if gitRepository.Status.Artifact == nil { | ||||
| 		return fmt.Errorf("GitRepository source reconciliation completed but no artifact was found") | ||||
| 	} | ||||
| 	logger.Successf("fetched revision %s", gitRepository.Status.Artifact.Revision) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @ -47,7 +47,7 @@ func init() { | ||||
|  | ||||
| func reconcileSourceHelmCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if len(args) < 1 { | ||||
| 		return fmt.Errorf("source name is required") | ||||
| 		return fmt.Errorf("HelmRepository source name is required") | ||||
| 	} | ||||
| 	name := args[0] | ||||
|  | ||||
| @ -64,7 +64,7 @@ func reconcileSourceHelmCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 		Name:      name, | ||||
| 	} | ||||
|  | ||||
| 	logger.Actionf("annotating source %s in %s namespace", name, namespace) | ||||
| 	logger.Actionf("annotating HelmRepository source %s in %s namespace", name, namespace) | ||||
| 	var helmRepository sourcev1.HelmRepository | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &helmRepository) | ||||
| 	if err != nil { | ||||
| @ -81,42 +81,35 @@ func reconcileSourceHelmCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if err := kubeClient.Update(ctx, &helmRepository); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("source annotated") | ||||
| 	logger.Successf("HelmRepository source annotated") | ||||
|  | ||||
| 	logger.Waitingf("waiting for reconciliation") | ||||
| 	logger.Waitingf("waiting for HelmRepository source reconciliation") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isHelmRepositoryReady(ctx, kubeClient, name, namespace)); err != nil { | ||||
| 		isHelmRepositoryReady(ctx, kubeClient, namespacedName, &helmRepository)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("HelmRepository source reconciliation completed") | ||||
|  | ||||
| 	logger.Successf("helm reconciliation completed") | ||||
|  | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &helmRepository) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if helmRepository.Status.Artifact != nil { | ||||
| 		logger.Successf("fetched revision %s", helmRepository.Status.Artifact.Revision) | ||||
| 	} else { | ||||
| 		return fmt.Errorf("helm reconciliation failed, artifact not found") | ||||
| 	if helmRepository.Status.Artifact == nil { | ||||
| 		return fmt.Errorf("HelmRepository source reconciliation completed but no artifact was found") | ||||
| 	} | ||||
| 	logger.Successf("fetched revision %s", helmRepository.Status.Artifact.Revision) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func isHelmRepositoryReady(ctx context.Context, kubeClient client.Client, name, namespace string) wait.ConditionFunc { | ||||
| func isHelmRepositoryReady(ctx context.Context, kubeClient client.Client, | ||||
| 	namespacedName types.NamespacedName, helmRepository *sourcev1.HelmRepository) wait.ConditionFunc { | ||||
| 	return func() (bool, error) { | ||||
| 		var helmRepository sourcev1.HelmRepository | ||||
| 		namespacedName := types.NamespacedName{ | ||||
| 			Namespace: namespace, | ||||
| 			Name:      name, | ||||
| 		} | ||||
|  | ||||
| 		err := kubeClient.Get(ctx, namespacedName, &helmRepository) | ||||
| 		err := kubeClient.Get(ctx, namespacedName, helmRepository) | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
|  | ||||
| 		// Confirm the state we are observing is for the current generation | ||||
| 		if helmRepository.Generation != helmRepository.Status.ObservedGeneration { | ||||
| 			return false, nil | ||||
| 		} | ||||
|  | ||||
| 		if c := meta.GetCondition(helmRepository.Status.Conditions, meta.ReadyCondition); c != nil { | ||||
| 			switch c.Status { | ||||
| 			case corev1.ConditionTrue: | ||||
|  | ||||
| @ -78,24 +78,17 @@ func resumeAlertCmdRun(cmd *cobra.Command, args []string) error { | ||||
|  | ||||
| 	logger.Waitingf("waiting for Alert reconciliation") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isAlertResumed(ctx, kubeClient, name, namespace)); err != nil { | ||||
| 		isAlertResumed(ctx, kubeClient, namespacedName, &alert)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("Alert reconciliation completed") | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func isAlertResumed(ctx context.Context, kubeClient client.Client, name, namespace string) wait.ConditionFunc { | ||||
| func isAlertResumed(ctx context.Context, kubeClient client.Client, | ||||
| 	namespacedName types.NamespacedName, alert *notificationv1.Alert) wait.ConditionFunc { | ||||
| 	return func() (bool, error) { | ||||
| 		var alert notificationv1.Alert | ||||
| 		namespacedName := types.NamespacedName{ | ||||
| 			Namespace: namespace, | ||||
| 			Name:      name, | ||||
| 		} | ||||
|  | ||||
| 		err := kubeClient.Get(ctx, namespacedName, &alert) | ||||
| 		err := kubeClient.Get(ctx, namespacedName, alert) | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
|  | ||||
| @ -79,36 +79,25 @@ func resumeHrCmdRun(cmd *cobra.Command, args []string) error { | ||||
|  | ||||
| 	logger.Waitingf("waiting for HelmRelease reconciliation") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isHelmReleaseResumed(ctx, kubeClient, name, namespace)); err != nil { | ||||
| 		isHelmReleaseResumed(ctx, kubeClient, namespacedName, &helmRelease)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("HelmRelease reconciliation completed") | ||||
|  | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &helmRelease) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if helmRelease.Status.LastAppliedRevision != "" { | ||||
| 		logger.Successf("applied revision %s", helmRelease.Status.LastAppliedRevision) | ||||
| 	} else { | ||||
| 		return fmt.Errorf("HelmRelease reconciliation failed") | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("applied revision %s", helmRelease.Status.LastAppliedRevision) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func isHelmReleaseResumed(ctx context.Context, kubeClient client.Client, name, namespace string) wait.ConditionFunc { | ||||
| func isHelmReleaseResumed(ctx context.Context, kubeClient client.Client, | ||||
| 	namespacedName types.NamespacedName, helmRelease *helmv2.HelmRelease) wait.ConditionFunc { | ||||
| 	return func() (bool, error) { | ||||
| 		var helmRelease helmv2.HelmRelease | ||||
| 		namespacedName := types.NamespacedName{ | ||||
| 			Namespace: namespace, | ||||
| 			Name:      name, | ||||
| 		err := kubeClient.Get(ctx, namespacedName, helmRelease) | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
|  | ||||
| 		err := kubeClient.Get(ctx, namespacedName, &helmRelease) | ||||
| 		if err != nil { | ||||
| 		// Confirm the state we are observing is for the current generation | ||||
| 		if helmRelease.Generation != helmRelease.Status.ObservedGeneration { | ||||
| 			return false, err | ||||
| 		} | ||||
|  | ||||
|  | ||||
| @ -47,7 +47,7 @@ func init() { | ||||
|  | ||||
| func resumeKsCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 	if len(args) < 1 { | ||||
| 		return fmt.Errorf("kustomization name is required") | ||||
| 		return fmt.Errorf("Kustomization name is required") | ||||
| 	} | ||||
| 	name := args[0] | ||||
|  | ||||
| @ -69,48 +69,37 @@ func resumeKsCmdRun(cmd *cobra.Command, args []string) error { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Actionf("resuming kustomization %s in %s namespace", name, namespace) | ||||
| 	logger.Actionf("resuming Kustomization %s in %s namespace", name, namespace) | ||||
| 	kustomization.Spec.Suspend = false | ||||
| 	if err := kubeClient.Update(ctx, &kustomization); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("kustomization resumed") | ||||
| 	logger.Successf("Kustomization resumed") | ||||
|  | ||||
| 	logger.Waitingf("waiting for kustomization sync") | ||||
| 	logger.Waitingf("waiting for Kustomization reconciliation") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isKustomizationResumed(ctx, kubeClient, name, namespace)); err != nil { | ||||
| 		isKustomizationResumed(ctx, kubeClient, namespacedName, &kustomization)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	logger.Successf("Kustomization reconciliation completed") | ||||
|  | ||||
| 	logger.Successf("kustomization sync completed") | ||||
|  | ||||
| 	err = kubeClient.Get(ctx, namespacedName, &kustomization) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if kustomization.Status.LastAppliedRevision != "" { | ||||
| 		logger.Successf("applied revision %s", kustomization.Status.LastAppliedRevision) | ||||
| 	} else { | ||||
| 		return fmt.Errorf("kustomization sync failed") | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("applied revision %s", kustomization.Status.LastAppliedRevision) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func isKustomizationResumed(ctx context.Context, kubeClient client.Client, name, namespace string) wait.ConditionFunc { | ||||
| func isKustomizationResumed(ctx context.Context, kubeClient client.Client, | ||||
| 	namespacedName types.NamespacedName, kustomization *kustomizev1.Kustomization) wait.ConditionFunc { | ||||
| 	return func() (bool, error) { | ||||
| 		var kustomization kustomizev1.Kustomization | ||||
| 		namespacedName := types.NamespacedName{ | ||||
| 			Namespace: namespace, | ||||
| 			Name:      name, | ||||
| 		} | ||||
|  | ||||
| 		err := kubeClient.Get(ctx, namespacedName, &kustomization) | ||||
| 		err := kubeClient.Get(ctx, namespacedName, kustomization) | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
|  | ||||
| 		// Confirm the state we are observing is for the current generation | ||||
| 		if kustomization.Generation != kustomization.Status.ObservedGeneration { | ||||
| 			return false, nil | ||||
| 		} | ||||
|  | ||||
| 		if c := meta.GetCondition(kustomization.Status.Conditions, meta.ReadyCondition); c != nil { | ||||
| 			switch c.Status { | ||||
| 			case corev1.ConditionTrue: | ||||
|  | ||||
| @ -78,24 +78,18 @@ func resumeReceiverCmdRun(cmd *cobra.Command, args []string) error { | ||||
|  | ||||
| 	logger.Waitingf("waiting for Receiver reconciliation") | ||||
| 	if err := wait.PollImmediate(pollInterval, timeout, | ||||
| 		isReceiverResumed(ctx, kubeClient, name, namespace)); err != nil { | ||||
| 		isReceiverResumed(ctx, kubeClient, namespacedName, &receiver)); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	logger.Successf("Receiver reconciliation completed") | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func isReceiverResumed(ctx context.Context, kubeClient client.Client, name, namespace string) wait.ConditionFunc { | ||||
| func isReceiverResumed(ctx context.Context, kubeClient client.Client, | ||||
| 	namespacedName types.NamespacedName, receiver *notificationv1.Receiver) wait.ConditionFunc { | ||||
| 	return func() (bool, error) { | ||||
| 		var receiver notificationv1.Receiver | ||||
| 		namespacedName := types.NamespacedName{ | ||||
| 			Namespace: namespace, | ||||
| 			Name:      name, | ||||
| 		} | ||||
|  | ||||
| 		err := kubeClient.Get(ctx, namespacedName, &receiver) | ||||
| 		err := kubeClient.Get(ctx, namespacedName, receiver) | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
|  | ||||
| @ -66,7 +66,7 @@ gotk create helmrelease [name] [flags] | ||||
|       --chart-version string      Helm chart version, accepts a semver range (ignored for charts from GitRepository sources) | ||||
|       --depends-on stringArray    HelmReleases that must be ready before this release can be installed, supported formats '<name>' and '<namespace>/<name>' | ||||
|   -h, --help                      help for helmrelease | ||||
|       --release-name string       name used for the Helm release, defaults to a composition of '[<target-namespace>-]<hr-name>' | ||||
|       --release-name string       name used for the Helm release, defaults to a composition of '[<target-namespace>-]<HelmRelease-name>' | ||||
|       --source string             source that contains the chart (<kind>/<name>) | ||||
|       --target-namespace string   namespace to install this release, defaults to the HelmRelease namespace | ||||
|       --values string             local path to the values.yaml file | ||||
|  | ||||
| @ -26,7 +26,7 @@ gotk reconcile kustomization [name] [flags] | ||||
|  | ||||
| ``` | ||||
|   -h, --help          help for kustomization | ||||
|       --with-source   reconcile kustomization source | ||||
|       --with-source   reconcile Kustomization source | ||||
| ``` | ||||
|  | ||||
| ### Options inherited from parent commands | ||||
|  | ||||
		Reference in New Issue
	
	Block a user
	 Hidde Beydals
					Hidde Beydals