From 20a42d0e4f5231d2bc4ff866ebb4c6d07569ebc0 Mon Sep 17 00:00:00 2001
From: Urvashi Mohnani
Date: Wed, 11 Jan 2023 10:08:06 +0530
Subject: [PATCH] play kube: Add --wait option

Add a way to keep play kube running in the foreground, terminating all
pods after receiving a SIGINT or SIGTERM signal. The pods will also be
cleaned up after the containers in them have exited.
If an error occurs during kube play, any resources created up to the
error point will be cleaned up as well.

Add tests for the various scenarios.

Fixes #14522

Signed-off-by: Urvashi Mohnani
---
 cmd/podman/kube/down.go                       |   2 +-
 cmd/podman/kube/play.go                       | 114 +++++++++++++-----
 docs/source/markdown/podman-kube-play.1.md.in |  12 ++
 libpod/service.go                             |  10 +-
 libpod/shutdown/handler.go                    |   3 +-
 pkg/api/handlers/libpod/kube.go               |  50 ++++----
 pkg/api/server/register_kube.go               |  10 ++
 pkg/bindings/kube/types.go                    |   3 +
 pkg/bindings/kube/types_play_options.go       |  30 +++++
 pkg/domain/entities/play.go                   |   5 +
 pkg/domain/infra/abi/play.go                  |  13 +-
 pkg/domain/infra/tunnel/kube.go               |   2 +-
 test/system/700-play.bats                     |  52 ++++++++
 13 files changed, 244 insertions(+), 62 deletions(-)

diff --git a/cmd/podman/kube/down.go b/cmd/podman/kube/down.go
index b027454781..6904df488b 100644
--- a/cmd/podman/kube/down.go
+++ b/cmd/podman/kube/down.go
@@ -52,5 +52,5 @@ func down(cmd *cobra.Command, args []string) error {
 	if err != nil {
 		return err
 	}
-	return teardown(reader, entities.PlayKubeDownOptions{Force: downOptions.Force}, false)
+	return teardown(reader, entities.PlayKubeDownOptions{Force: downOptions.Force})
 }
diff --git a/cmd/podman/kube/play.go b/cmd/podman/kube/play.go
index 84e64a8969..93e8ed4d39 100644
--- a/cmd/podman/kube/play.go
+++ b/cmd/podman/kube/play.go
@@ -8,7 +8,9 @@ import (
 	"net"
 	"net/http"
 	"os"
+	"os/signal"
 	"strings"
+	"syscall"
 
 	"github.com/containers/common/pkg/auth"
 	"github.com/containers/common/pkg/completion"
@@ -18,6 +20,7 @@ import (
 	"github.com/containers/podman/v4/cmd/podman/registry"
 	"github.com/containers/podman/v4/cmd/podman/utils"
 	"github.com/containers/podman/v4/libpod/define"
+	"github.com/containers/podman/v4/libpod/shutdown"
 	"github.com/containers/podman/v4/pkg/domain/entities"
 	"github.com/containers/podman/v4/pkg/errorhandling"
 	"github.com/containers/podman/v4/pkg/util"
@@ -155,6 +158,9 @@ func playFlags(cmd *cobra.Command) {
 	flags.StringSliceVar(&playOptions.PublishPorts, publishPortsFlagName, []string{}, "Publish a container's port, or a range of ports, to the host")
 	_ = cmd.RegisterFlagCompletionFunc(publishPortsFlagName, completion.AutocompleteNone)
 
+	waitFlagName := "wait"
+	flags.BoolVarP(&playOptions.Wait, waitFlagName, "w", false, "Clean up all objects created when a SIGTERM is received or pods exit")
+
 	if !registry.IsRemote() {
 		certDirFlagName := "cert-dir"
 		flags.StringVar(&playOptions.CertDir, certDirFlagName, "", "`Pathname` of a directory containing TLS certificates and keys")
@@ -257,11 +263,11 @@ func play(cmd *cobra.Command, args []string) error {
 	}
 
 	if playOptions.Down {
-		return teardown(reader, entities.PlayKubeDownOptions{Force: playOptions.Force}, false)
+		return teardown(reader, entities.PlayKubeDownOptions{Force: playOptions.Force})
 	}
 
 	if playOptions.Replace {
-		if err := teardown(reader, entities.PlayKubeDownOptions{Force: playOptions.Force}, false); err != nil && !errorhandling.Contains(err, define.ErrNoSuchPod) {
+		if err := teardown(reader, entities.PlayKubeDownOptions{Force: playOptions.Force}); err != nil && !errorhandling.Contains(err, define.ErrNoSuchPod) {
			return err
		}
		if _, err := reader.Seek(0, 0); err != nil {
@@ -269,21 +275,67 @@ func play(cmd *cobra.Command, args []string) error {
 		}
 	}
 
-	if err := kubeplay(reader); err != nil {
+	// Create a channel to catch an interrupt or SIGTERM signal
+	ch := make(chan os.Signal, 1)
+	var teardownReader *bytes.Reader
+	if playOptions.Wait {
+		// Stop the shutdown signal handler so we can actually clean up after a SIGTERM or interrupt
+		if err := shutdown.Stop(); err != nil && err != shutdown.ErrNotStarted {
+			return err
+		}
+		signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
+		playOptions.ServiceContainer = true
+
+		// Read the kube yaml file again so that a reader can be passed down to the teardown function
+		teardownReader, err = readerFromArg(args[0])
+		if err != nil {
+			return err
+		}
+		fmt.Println("Use ctrl+c to clean up or wait for pods to exit")
+	}
+
+	var teardownErr error
+	cancelled := false
+	if playOptions.Wait {
+		// use a goroutine to wait for a SIGTERM or interrupt
+		go func() {
+			<-ch
+			// clean up any volumes that were created as well
+			fmt.Println("\nCleaning up containers, pods, and volumes...")
+			cancelled = true
+			if err := teardown(teardownReader, entities.PlayKubeDownOptions{Force: true}); err != nil && !errorhandling.Contains(err, define.ErrNoSuchPod) {
+				teardownErr = fmt.Errorf("error during cleanup: %v", err)
+			}
+		}()
+	}
+
+	if playErr := kubeplay(reader); playErr != nil {
 		// FIXME: The cleanup logic below must be fixed to only remove
 		// resources that were created before a failure. Otherwise,
 		// rerunning the same YAML file will cause an error and remove
 		// the previously created workload.
 		//
 		// teardown any containers, pods, and volumes that might have created before we hit the error
-		// teardownReader, trErr := readerFromArg(args[0])
-		// if trErr != nil {
-		// 	return trErr
-		// }
-		// if tErr := teardown(teardownReader, entities.PlayKubeDownOptions{Force: true}, true); tErr != nil && !errorhandling.Contains(tErr, define.ErrNoSuchPod) {
-		// 	return fmt.Errorf("error tearing down workloads %q after kube play error %q", tErr, err)
-		// }
-		return err
+		// reader, err := readerFromArg(args[0])
+		// if err != nil {
+		// 	return err
+		// }
+		// if err := teardown(reader, entities.PlayKubeDownOptions{Force: true}, true); err != nil && !errorhandling.Contains(err, define.ErrNoSuchPod) {
+		// 	return fmt.Errorf("error tearing down workloads %q after kube play error %q", err, playErr)
+		// }
+		return playErr
+	}
+	if teardownErr != nil {
+		return teardownErr
+	}
+
+	// cleanup if --wait=true and the pods have exited
+	if playOptions.Wait && !cancelled {
+		fmt.Println("Cleaning up containers, pods, and volumes...")
+		// clean up any volumes that were created as well
+		if err := teardown(teardownReader, entities.PlayKubeDownOptions{Force: true}); err != nil && !errorhandling.Contains(err, define.ErrNoSuchPod) {
+			return err
+		}
+	}
 
 	return nil
@@ -328,7 +380,7 @@ func readerFromArg(fileName string) (*bytes.Reader, error) {
 	return bytes.NewReader(data), nil
 }
 
-func teardown(body io.Reader, options entities.PlayKubeDownOptions, quiet bool) error {
+func teardown(body io.Reader, options entities.PlayKubeDownOptions) error {
 	var (
 		podStopErrors utils.OutputErrors
 		podRmErrors   utils.OutputErrors
@@ -341,14 +393,11 @@ func teardown(body io.Reader, options entities.PlayKubeDownOptions, quiet bool)
 	}
 
 	// Output stopped pods
-	if !quiet {
-		fmt.Println("Pods stopped:")
-	}
+	fmt.Println("Pods stopped:")
 	for _, stopped := range reports.StopReport {
 		switch {
 		case len(stopped.Errs) > 0:
			podStopErrors = append(podStopErrors, stopped.Errs...)
-		case quiet:
 		default:
 			fmt.Println(stopped.Id)
 		}
 	}
@@ -360,14 +409,11 @@ func teardown(body io.Reader, options entities.PlayKubeDownOptions, quiet bool)
 	}
 
 	// Output rm'd pods
-	if !quiet {
-		fmt.Println("Pods removed:")
-	}
+	fmt.Println("Pods removed:")
 	for _, removed := range reports.RmReport {
 		switch {
 		case removed.Err != nil:
 			podRmErrors = append(podRmErrors, removed.Err)
-		case quiet:
 		default:
 			fmt.Println(removed.Id)
 		}
 	}
@@ -379,14 +425,11 @@ func teardown(body io.Reader, options entities.PlayKubeDownOptions, quiet bool)
 	}
 
 	// Output rm'd volumes
-	if !quiet {
-		fmt.Println("Secrets removed:")
-	}
+	fmt.Println("Secrets removed:")
 	for _, removed := range reports.SecretRmReport {
 		switch {
 		case removed.Err != nil:
 			secRmErrors = append(secRmErrors, removed.Err)
-		case quiet:
 		default:
 			fmt.Println(removed.ID)
 		}
 	}
@@ -397,14 +440,11 @@ func teardown(body io.Reader, options entities.PlayKubeDownOptions, quiet bool)
 	}
 
 	// Output rm'd volumes
-	if !quiet {
-		fmt.Println("Volumes removed:")
-	}
+	fmt.Println("Volumes removed:")
 	for _, removed := range reports.VolumeRmReport {
 		switch {
 		case removed.Err != nil:
 			volRmErrors = append(volRmErrors, removed.Err)
-		case quiet:
 		default:
 			fmt.Println(removed.Id)
 		}
 	}
@@ -418,6 +458,23 @@ func kubeplay(body io.Reader) error {
 	if err != nil {
 		return err
 	}
+	if err := printPlayReport(report); err != nil {
+		return err
+	}
+
+	// If --wait=true, we need to wait for the service container to exit so that we know that the pod has exited and we can clean up
+	if playOptions.Wait {
+		_, err := registry.ContainerEngine().ContainerWait(registry.GetContext(), []string{report.ServiceContainerID}, entities.WaitOptions{})
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// printPlayReport goes through the report returned by KubePlay and prints it out in a human
+// friendly format.
+func printPlayReport(report *entities.PlayKubeReport) error {
 	// Print volumes report
 	for i, volume := range report.Volumes {
 		if i == 0 {
@@ -473,6 +530,5 @@ func kubeplay(body io.Reader) error {
 	if ctrsFailed > 0 {
 		return fmt.Errorf("failed to start %d containers", ctrsFailed)
 	}
-
 	return nil
 }
diff --git a/docs/source/markdown/podman-kube-play.1.md.in b/docs/source/markdown/podman-kube-play.1.md.in
index 254eebfbbb..dbbd2d6e4b 100644
--- a/docs/source/markdown/podman-kube-play.1.md.in
+++ b/docs/source/markdown/podman-kube-play.1.md.in
@@ -205,6 +205,18 @@ Start the pod after creating it, set to false to only create it.
 
 @@option userns.container
 
+#### **--wait**, **-w**
+
+Run pods and containers in the foreground. Default is false.
+
+At any time you can run `podman pod ps` in another shell to view a list of
+the running pods and containers.
+
+When attached in tty mode, you can kill the pods and containers by pressing
+Ctrl-C or by sending any other interrupt signal.
+
+Volumes created with `podman kube play` will be removed when `--wait=true`.
+
 
 ## EXAMPLES
 
 Recreate the pod and containers as described in a file called `demo.yml`
diff --git a/libpod/service.go b/libpod/service.go
index 3304c73e0d..976ccd88bf 100644
--- a/libpod/service.go
+++ b/libpod/service.go
@@ -134,10 +134,12 @@ func (p *Pod) maybeStopServiceContainer() error {
 			return
 		}
 		logrus.Debugf("Stopping service container %s", serviceCtr.ID())
-		if err := serviceCtr.Stop(); err != nil {
-			if !errors.Is(err, define.ErrCtrStopped) {
-				logrus.Errorf("Stopping service container %s: %v", serviceCtr.ID(), err)
-			}
+		if err := serviceCtr.Stop(); err != nil && !errors.Is(err, define.ErrCtrStopped) {
+			// Log this in debug mode so that we don't print out an error and confuse the user
+			// when the service container can't be stopped because it is in the created state.
+			// This can happen when an error occurs during kube play and we are trying to
+			// clean up after the error.
+			logrus.Debugf("Error stopping service container %s: %v", serviceCtr.ID(), err)
 		}
 	})
 	return nil
diff --git a/libpod/shutdown/handler.go b/libpod/shutdown/handler.go
index 75e9b4e8a5..2b30a95768 100644
--- a/libpod/shutdown/handler.go
+++ b/libpod/shutdown/handler.go
@@ -27,6 +27,7 @@ var (
 	handlerOrder    []string
 	shutdownInhibit sync.RWMutex
 	logrus          = logrusImport.WithField("PID", os.Getpid())
+	ErrNotStarted   = errors.New("shutdown signal handler has not yet been started")
 )
 
 // Start begins handling SIGTERM and SIGINT and will run the given on-signal
@@ -84,7 +85,7 @@ func Start() error {
 // Stop the shutdown signal handler.
 func Stop() error {
 	if cancelChan == nil {
-		return errors.New("shutdown signal handler has not yet been started")
+		return ErrNotStarted
 	}
 	if stopped {
 		return nil
diff --git a/pkg/api/handlers/libpod/kube.go b/pkg/api/handlers/libpod/kube.go
index 77dddaf02d..2abefa5d1d 100644
--- a/pkg/api/handlers/libpod/kube.go
+++ b/pkg/api/handlers/libpod/kube.go
@@ -19,16 +19,18 @@ func KubePlay(w http.ResponseWriter, r *http.Request) {
 	runtime := r.Context().Value(api.RuntimeKey).(*libpod.Runtime)
 	decoder := r.Context().Value(api.DecoderKey).(*schema.Decoder)
 	query := struct {
-		Annotations  map[string]string `schema:"annotations"`
-		Network      []string          `schema:"network"`
-		TLSVerify    bool              `schema:"tlsVerify"`
-		LogDriver    string            `schema:"logDriver"`
-		LogOptions   []string          `schema:"logOptions"`
-		Start        bool              `schema:"start"`
-		StaticIPs    []string          `schema:"staticIPs"`
-		StaticMACs   []string          `schema:"staticMACs"`
-		NoHosts      bool              `schema:"noHosts"`
-		PublishPorts []string          `schema:"publishPorts"`
+		Annotations      map[string]string `schema:"annotations"`
+		Network          []string          `schema:"network"`
+		TLSVerify        bool              `schema:"tlsVerify"`
+		LogDriver        string            `schema:"logDriver"`
+		LogOptions       []string          `schema:"logOptions"`
+		Start            bool              `schema:"start"`
+		StaticIPs        []string          `schema:"staticIPs"`
+		StaticMACs       []string          `schema:"staticMACs"`
+		NoHosts          bool              `schema:"noHosts"`
+		PublishPorts     []string          `schema:"publishPorts"`
+		Wait             bool              `schema:"wait"`
+		ServiceContainer bool              `schema:"serviceContainer"`
 	}{
 		TLSVerify: true,
 		Start:     true,
@@ -83,19 +85,21 @@ func KubePlay(w http.ResponseWriter, r *http.Request) {
 
 	containerEngine := abi.ContainerEngine{Libpod: runtime}
 	options := entities.PlayKubeOptions{
-		Annotations:  query.Annotations,
-		Authfile:     authfile,
-		Username:     username,
-		Password:     password,
-		Networks:     query.Network,
-		NoHosts:      query.NoHosts,
-		Quiet:        true,
-		LogDriver:    logDriver,
-		LogOptions:   query.LogOptions,
-		StaticIPs:    staticIPs,
-		StaticMACs:   staticMACs,
-		IsRemote:     true,
-		PublishPorts: query.PublishPorts,
+		Annotations:      query.Annotations,
+		Authfile:         authfile,
+		Username:         username,
+		Password:         password,
+		Networks:         query.Network,
+		NoHosts:          query.NoHosts,
+		Quiet:            true,
+		LogDriver:        logDriver,
+		LogOptions:       query.LogOptions,
+		StaticIPs:        staticIPs,
+		StaticMACs:       staticMACs,
+		IsRemote:         true,
+		PublishPorts:     query.PublishPorts,
+		Wait:             query.Wait,
+		ServiceContainer: query.ServiceContainer,
 	}
 	if _, found := r.URL.Query()["tlsVerify"]; found {
 		options.SkipTLSVerify = types.NewOptionalBool(!query.TLSVerify)
diff --git a/pkg/api/server/register_kube.go b/pkg/api/server/register_kube.go
index 963bfdb5af..73f4bc0ecf 100644
--- a/pkg/api/server/register_kube.go
+++ b/pkg/api/server/register_kube.go
@@ -37,6 +37,11 @@ func (s *APIServer) registerKubeHandlers(r *mux.Router) error {
 	//    default: true
 	//    description: Start the pod after creating it.
 	//  - in: query
+	//    name: serviceContainer
+	//    type: boolean
+	//    default: false
+	//    description: Starts a service container before all pods.
+	//  - in: query
 	//    name: staticIPs
 	//    type: array
 	//    description: Static IPs used for the pods.
@@ -48,6 +53,11 @@ func (s *APIServer) registerKubeHandlers(r *mux.Router) error {
 	//    description: Static MACs used for the pods.
 	//    items:
 	//      type: string
+	//  - in: query
+	//    name: wait
+	//    type: boolean
+	//    default: false
+	//    description: Clean up all objects created when a SIGTERM is received or pods exit.
 	//  - in: body
 	//    name: request
 	//    description: Kubernetes YAML file.
diff --git a/pkg/bindings/kube/types.go b/pkg/bindings/kube/types.go
index 8cd7c79262..a5352333ba 100644
--- a/pkg/bindings/kube/types.go
+++ b/pkg/bindings/kube/types.go
@@ -50,6 +50,9 @@ type PlayOptions struct {
 	Force *bool
 	// PublishPorts - configure how to expose ports configured inside the K8S YAML file
 	PublishPorts []string
+	// Wait - indicates whether to wait for the pods to exit and be cleaned up before returning
+	Wait *bool
+	ServiceContainer *bool
 }
 
 // ApplyOptions are optional options for applying kube YAML files to a k8s cluster
diff --git a/pkg/bindings/kube/types_play_options.go b/pkg/bindings/kube/types_play_options.go
index 522fac85d8..bd83c6f576 100644
--- a/pkg/bindings/kube/types_play_options.go
+++ b/pkg/bindings/kube/types_play_options.go
@@ -317,3 +317,33 @@ func (o *PlayOptions) GetPublishPorts() []string {
 	}
 	return o.PublishPorts
 }
+
+// WithWait set field Wait to given value
+func (o *PlayOptions) WithWait(value bool) *PlayOptions {
+	o.Wait = &value
+	return o
+}
+
+// GetWait returns value of field Wait
+func (o *PlayOptions) GetWait() bool {
+	if o.Wait == nil {
+		var z bool
+		return z
+	}
+	return *o.Wait
+}
+
+// WithServiceContainer set field ServiceContainer to given value
+func (o *PlayOptions) WithServiceContainer(value bool) *PlayOptions {
+	o.ServiceContainer = &value
+	return o
+}
+
+// GetServiceContainer returns value of field ServiceContainer
+func (o *PlayOptions) GetServiceContainer() bool {
+	if o.ServiceContainer == nil {
+		var z bool
+		return z
+	}
+	return *o.ServiceContainer
+}
diff --git a/pkg/domain/entities/play.go b/pkg/domain/entities/play.go
index 3f791ab2fa..3989f96a68 100644
--- a/pkg/domain/entities/play.go
+++ b/pkg/domain/entities/play.go
@@ -64,6 +64,8 @@ type PlayKubeOptions struct {
 	Force bool
 	// PublishPorts - configure how to expose ports configured inside the K8S YAML file
 	PublishPorts []string
+	// Wait - indicates whether to wait for the pods to exit and be cleaned up before returning
+	Wait bool
 }
 
 // PlayKubePod represents a single pod and associated containers created by play kube
@@ -94,7 +96,10 @@ type PlayKubeReport struct {
 	// Volumes - volumes created by play kube.
 	Volumes []PlayKubeVolume
 	PlayKubeTeardown
+	// Secrets - secrets created by play kube
 	Secrets []PlaySecret
+	// ServiceContainerID - ID of the service container if one is created
+	ServiceContainerID string
 }
 
 type KubePlayReport = PlayKubeReport
diff --git a/pkg/domain/infra/abi/play.go b/pkg/domain/infra/abi/play.go
index 7c34af34f8..a9a821c076 100644
--- a/pkg/domain/infra/abi/play.go
+++ b/pkg/domain/infra/abi/play.go
@@ -200,8 +200,13 @@ func (ic *ContainerEngine) PlayKube(ctx context.Context, body io.Reader, options
 			if finalErr == nil {
 				return
 			}
-			if err := ic.Libpod.RemoveContainer(ctx, ctr, true, false, nil); err != nil {
-				logrus.Errorf("Cleaning up service container after failure: %v", err)
+			if err := ic.Libpod.RemoveContainer(ctx, ctr, true, true, nil); err != nil {
+				// Log this in debug mode so that we don't print out an error and confuse the user
+				// when the service container can't be removed because the pod still exists.
+				// This can happen when an error occurs during kube play and we are trying to
+				// clean up after it. The service container will be removed as part of the
+				// teardown function.
+				logrus.Debugf("Error cleaning up service container after failure: %v", err)
 			}
 		}()
 	}
@@ -316,6 +321,7 @@ func (ic *ContainerEngine) PlayKube(ctx context.Context, body io.Reader, options
 
 	// If we started containers along with a service container, we are
 	// running inside a systemd unit and need to set the main PID.
+
 	if options.ServiceContainer && ranContainers {
 		switch len(notifyProxies) {
 		case 0: // Optimization for containers/podman/issues/17345
@@ -341,11 +347,12 @@ func (ic *ContainerEngine) PlayKube(ctx context.Context, body io.Reader, options
 			if err := notifyproxy.SendMessage("", message); err != nil {
 				return nil, err
 			}
-
 			if _, err := serviceContainer.Wait(ctx); err != nil {
 				return nil, fmt.Errorf("waiting for service container: %w", err)
 			}
 		}
+
+		report.ServiceContainerID = serviceContainer.ID()
 	}
 
 	return report, nil
diff --git a/pkg/domain/infra/tunnel/kube.go b/pkg/domain/infra/tunnel/kube.go
index 6e42240fe2..d5c0fcb4d7 100644
--- a/pkg/domain/infra/tunnel/kube.go
+++ b/pkg/domain/infra/tunnel/kube.go
@@ -58,7 +58,7 @@ func (ic *ContainerEngine) PlayKube(ctx context.Context, body io.Reader, opts en
 	options := new(kube.PlayOptions).WithAuthfile(opts.Authfile).WithUsername(opts.Username).WithPassword(opts.Password)
 	options.WithCertDir(opts.CertDir).WithQuiet(opts.Quiet).WithSignaturePolicy(opts.SignaturePolicy).WithConfigMaps(opts.ConfigMaps)
 	options.WithLogDriver(opts.LogDriver).WithNetwork(opts.Networks).WithSeccompProfileRoot(opts.SeccompProfileRoot)
-	options.WithStaticIPs(opts.StaticIPs).WithStaticMACs(opts.StaticMACs)
+	options.WithStaticIPs(opts.StaticIPs).WithStaticMACs(opts.StaticMACs).WithWait(opts.Wait).WithServiceContainer(opts.ServiceContainer)
 	if len(opts.LogOptions) > 0 {
 		options.WithLogOptions(opts.LogOptions)
 	}
diff --git a/test/system/700-play.bats b/test/system/700-play.bats
index 7c694aeddf..9769c9f0eb 100644
--- a/test/system/700-play.bats
+++ b/test/system/700-play.bats
@@ -572,3 +572,55 @@ EOF
     fi
     run_podman kube down $YAML
 }
+
+# kube play --wait=true, where we clean up the created containers, pods, and volumes when a kill or sigterm is triggered
+@test "podman kube play --wait with siginterrupt" {
+    cname=c$(random_string 15)
+    fname="/tmp/play_kube_wait_$(random_string 6).yaml"
+    run_podman container create --name $cname $IMAGE top
+    run_podman kube generate -f $fname $cname
+
+    # delete the container the YAML was generated from
+    run_podman rm -f $cname
+
+    # force a timeout to happen so that the kube play command is killed
+    # and expect the timeout exit code 124 so that we can clean up
+    local t0=$SECONDS
+    PODMAN_TIMEOUT=15 run_podman 124 kube play --wait $fname
+    local t1=$SECONDS
+    local delta_t=$((t1 - t0))
+    assert $delta_t -le 20 \
+           "podman kube play did not get killed within 20 seconds"
+
+    # there should be no containers running or created
+    run_podman ps -aq
+    is "$output" "" "There should be no containers"
+    run_podman rmi $(pause_image)
+}
+
+@test "podman kube play --wait - wait for pod to exit" {
+    fname="/tmp/play_kube_wait_$(random_string 6).yaml"
+    echo "
+apiVersion: v1
+kind: Pod
+metadata:
+  labels:
+    app: test
+  name: test_pod
+spec:
+  restartPolicy: Never
+  containers:
+    - name: server
+      image: $IMAGE
+      command:
+      - sleep
+      - \"5\"
+" > $fname
+
+    run_podman kube play --wait $fname
+
+    # there should be no containers running or created
+    run_podman ps -aq
+    is "$output" "" "There should be no containers"
+    run_podman rmi $(pause_image)
+}
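
A quick sketch of the new flag in use, not part of the patch itself: the file
name demo.yaml is hypothetical, and the output is abbreviated to the messages
this patch prints. Interrupting with Ctrl-C exercises the new signal-handler
path; letting the pods exit on their own exercises the ContainerWait path on
the service container, and both paths run the same cleanup:

    $ podman kube play --wait demo.yaml
    Use ctrl+c to clean up or wait for pods to exit
    ^C
    Cleaning up containers, pods, and volumes...
    Pods stopped:
    <pod ID>
    Pods removed:
    <pod ID>
    Secrets removed:
    Volumes removed:

Because both paths invoke teardown() with Force: true, volumes created by the
play are removed along with the pods and containers.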