play kube: Add --wait option

Add a way to keep kube play running in the foreground, tearing down
all pods when a SIGINT or SIGTERM signal is received. The pods are
also cleaned up once their containers have exited. If an error occurs
during kube play, any resources created up to the point of failure
are cleaned up as well.
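
An illustrative session with the new flag (the prompt and cleanup
messages match the ones added in this patch; pod and container IDs
are placeholders):

    $ podman kube play --wait demo.yml
    Use ctrl+c to clean up or wait for pods to exit
    Pod:
    <pod ID>
    Container:
    <container ID>
    ^C
    Cleaning up containers, pods, and volumes...
    Pods stopped:
    <pod ID>
    Pods removed:
    <pod ID>
    Secrets removed:
    Volumes removed: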

Add tests for the various scenarios.

Fixes #14522

Signed-off-by: Urvashi Mohnani <umohnani@redhat.com>
Urvashi Mohnani
2023-01-11 10:08:06 +05:30
parent db53f38711
commit 20a42d0e4f
13 changed files with 244 additions and 62 deletions


@@ -52,5 +52,5 @@ func down(cmd *cobra.Command, args []string) error {
 	if err != nil {
 		return err
 	}
-	return teardown(reader, entities.PlayKubeDownOptions{Force: downOptions.Force}, false)
+	return teardown(reader, entities.PlayKubeDownOptions{Force: downOptions.Force})
 }


@@ -8,7 +8,9 @@ import (
 	"net"
 	"net/http"
 	"os"
+	"os/signal"
 	"strings"
+	"syscall"
 
 	"github.com/containers/common/pkg/auth"
 	"github.com/containers/common/pkg/completion"
@@ -18,6 +20,7 @@ import (
 	"github.com/containers/podman/v4/cmd/podman/registry"
 	"github.com/containers/podman/v4/cmd/podman/utils"
 	"github.com/containers/podman/v4/libpod/define"
+	"github.com/containers/podman/v4/libpod/shutdown"
 	"github.com/containers/podman/v4/pkg/domain/entities"
 	"github.com/containers/podman/v4/pkg/errorhandling"
 	"github.com/containers/podman/v4/pkg/util"
@@ -155,6 +158,9 @@ func playFlags(cmd *cobra.Command) {
 	flags.StringSliceVar(&playOptions.PublishPorts, publishPortsFlagName, []string{}, "Publish a container's port, or a range of ports, to the host")
 	_ = cmd.RegisterFlagCompletionFunc(publishPortsFlagName, completion.AutocompleteNone)
 
+	waitFlagName := "wait"
+	flags.BoolVarP(&playOptions.Wait, waitFlagName, "w", false, "Clean up all objects created when a SIGTERM is received or pods exit")
+
 	if !registry.IsRemote() {
 		certDirFlagName := "cert-dir"
 		flags.StringVar(&playOptions.CertDir, certDirFlagName, "", "`Pathname` of a directory containing TLS certificates and keys")
@@ -257,11 +263,11 @@ func play(cmd *cobra.Command, args []string) error {
 	}
 
 	if playOptions.Down {
-		return teardown(reader, entities.PlayKubeDownOptions{Force: playOptions.Force}, false)
+		return teardown(reader, entities.PlayKubeDownOptions{Force: playOptions.Force})
 	}
 
 	if playOptions.Replace {
-		if err := teardown(reader, entities.PlayKubeDownOptions{Force: playOptions.Force}, false); err != nil && !errorhandling.Contains(err, define.ErrNoSuchPod) {
+		if err := teardown(reader, entities.PlayKubeDownOptions{Force: playOptions.Force}); err != nil && !errorhandling.Contains(err, define.ErrNoSuchPod) {
 			return err
 		}
 		if _, err := reader.Seek(0, 0); err != nil {
@@ -269,22 +275,68 @@ func play(cmd *cobra.Command, args []string) error {
 		}
 	}
 
-	if err := kubeplay(reader); err != nil {
+	// Create a channel to catch an interrupt or SIGTERM signal
+	ch := make(chan os.Signal, 1)
+	var teardownReader *bytes.Reader
+	if playOptions.Wait {
+		// Stop the shutdown signal handler so we can actually clean up after a SIGTERM or interrupt
+		if err := shutdown.Stop(); err != nil && err != shutdown.ErrNotStarted {
+			return err
+		}
+		signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
+		playOptions.ServiceContainer = true
+
+		// Read the kube yaml file again so that a reader can be passed down to the teardown function
+		teardownReader, err = readerFromArg(args[0])
+		if err != nil {
+			return err
+		}
+		fmt.Println("Use ctrl+c to clean up or wait for pods to exit")
+	}
+
+	var teardownErr error
+	cancelled := false
+	if playOptions.Wait {
+		// use a goroutine to wait for a sigterm or interrupt
+		go func() {
+			<-ch
+			// clean up any volumes that were created as well
+			fmt.Println("\nCleaning up containers, pods, and volumes...")
+			cancelled = true
+			if err := teardown(teardownReader, entities.PlayKubeDownOptions{Force: true}); err != nil && !errorhandling.Contains(err, define.ErrNoSuchPod) {
+				teardownErr = fmt.Errorf("error during cleanup: %v", err)
+			}
+		}()
+	}
+
+	if playErr := kubeplay(reader); playErr != nil {
 		// FIXME: The cleanup logic below must be fixed to only remove
 		// resources that were created before a failure. Otherwise,
 		// rerunning the same YAML file will cause an error and remove
 		// the previously created workload.
 		//
 		// teardown any containers, pods, and volumes that might have created before we hit the error
-		// teardownReader, trErr := readerFromArg(args[0])
-		// if trErr != nil {
-		// 	return trErr
+		// reader, err := readerFromArg(args[0])
+		// if err != nil {
+		// 	return err
 		// }
-		// if tErr := teardown(teardownReader, entities.PlayKubeDownOptions{Force: true}, true); tErr != nil && !errorhandling.Contains(tErr, define.ErrNoSuchPod) {
-		// 	return fmt.Errorf("error tearing down workloads %q after kube play error %q", tErr, err)
+		// if err := teardown(reader, entities.PlayKubeDownOptions{Force: true}, true); err != nil && !errorhandling.Contains(err, define.ErrNoSuchPod) {
+		// 	return fmt.Errorf("error tearing down workloads %q after kube play error %q", err, playErr)
 		// }
-		return err
+		return playErr
+	}
+	if teardownErr != nil {
+		return teardownErr
+	}
+
+	// cleanup if --wait=true and the pods have exited
+	if playOptions.Wait && !cancelled {
+		fmt.Println("Cleaning up containers, pods, and volumes...")
+		// clean up any volumes that were created as well
+		if err := teardown(teardownReader, entities.PlayKubeDownOptions{Force: true}); err != nil && !errorhandling.Contains(err, define.ErrNoSuchPod) {
+			return err
+		}
 	}
+
 	return nil
 }
@@ -328,7 +380,7 @@ func readerFromArg(fileName string) (*bytes.Reader, error) {
 	return bytes.NewReader(data), nil
 }
 
-func teardown(body io.Reader, options entities.PlayKubeDownOptions, quiet bool) error {
+func teardown(body io.Reader, options entities.PlayKubeDownOptions) error {
 	var (
 		podStopErrors utils.OutputErrors
 		podRmErrors   utils.OutputErrors
@@ -341,14 +393,11 @@ func teardown(body io.Reader, options entities.PlayKubeDownOptions, quiet bool)
 	}
 
 	// Output stopped pods
-	if !quiet {
-		fmt.Println("Pods stopped:")
-	}
+	fmt.Println("Pods stopped:")
 	for _, stopped := range reports.StopReport {
 		switch {
 		case len(stopped.Errs) > 0:
 			podStopErrors = append(podStopErrors, stopped.Errs...)
-		case quiet:
 		default:
 			fmt.Println(stopped.Id)
 		}
@@ -360,14 +409,11 @@ func teardown(body io.Reader, options entities.PlayKubeDownOptions, quiet bool)
 	}
 
 	// Output rm'd pods
-	if !quiet {
-		fmt.Println("Pods removed:")
-	}
+	fmt.Println("Pods removed:")
 	for _, removed := range reports.RmReport {
 		switch {
 		case removed.Err != nil:
 			podRmErrors = append(podRmErrors, removed.Err)
-		case quiet:
 		default:
 			fmt.Println(removed.Id)
 		}
@@ -379,14 +425,11 @@ func teardown(body io.Reader, options entities.PlayKubeDownOptions, quiet bool)
 	}
 
 	// Output rm'd volumes
-	if !quiet {
-		fmt.Println("Secrets removed:")
-	}
+	fmt.Println("Secrets removed:")
 	for _, removed := range reports.SecretRmReport {
 		switch {
 		case removed.Err != nil:
 			secRmErrors = append(secRmErrors, removed.Err)
-		case quiet:
 		default:
 			fmt.Println(removed.ID)
 		}
@@ -397,14 +440,11 @@ func teardown(body io.Reader, options entities.PlayKubeDownOptions, quiet bool)
 	}
 
 	// Output rm'd volumes
-	if !quiet {
-		fmt.Println("Volumes removed:")
-	}
+	fmt.Println("Volumes removed:")
 	for _, removed := range reports.VolumeRmReport {
 		switch {
 		case removed.Err != nil:
 			volRmErrors = append(volRmErrors, removed.Err)
-		case quiet:
 		default:
 			fmt.Println(removed.Id)
 		}
@@ -418,6 +458,23 @@ func kubeplay(body io.Reader) error {
 	if err != nil {
 		return err
 	}
+	if err := printPlayReport(report); err != nil {
+		return err
+	}
+
+	// If --wait=true, we need to wait for the service container to exit so that we know that the pod has exited and we can clean up
+	if playOptions.Wait {
+		_, err := registry.ContainerEngine().ContainerWait(registry.GetContext(), []string{report.ServiceContainerID}, entities.WaitOptions{})
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// printPlayReport goes through the report returned by KubePlay and prints it out in a human
+// friendly format.
+func printPlayReport(report *entities.PlayKubeReport) error {
 	// Print volumes report
 	for i, volume := range report.Volumes {
 		if i == 0 {
@@ -473,6 +530,5 @@ func kubeplay(body io.Reader) error {
 	if ctrsFailed > 0 {
 		return fmt.Errorf("failed to start %d containers", ctrsFailed)
 	}
-
 	return nil
 }


@@ -205,6 +205,18 @@ Start the pod after creating it, set to false to only create it.
 
 @@option userns.container
 
+#### **--wait**, **-w**
+
+Run pods and containers in the foreground. Default is false.
+
+At any time you can run `podman pod ps` in another shell to view a list of
+the running pods and containers.
+
+When attached in tty mode, you can stop the pods and containers by pressing
+Ctrl-C or by sending any other interrupt signal.
+
+Volumes created with `podman kube play` will be removed when `--wait=true`.
+
 ## EXAMPLES
 
 Recreate the pod and containers as described in a file called `demo.yml`
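
As a sketch of the non-interactive path described above: when the pods
exit on their own, `--wait` blocks until the service container exits and
then runs the same teardown without any signal (illustrative transcript;
`pod.yaml` is a placeholder name and the report output is elided):

    $ podman kube play --wait pod.yaml
    Use ctrl+c to clean up or wait for pods to exit
    ...
    Cleaning up containers, pods, and volumes...
    ...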


@@ -134,10 +134,12 @@ func (p *Pod) maybeStopServiceContainer() error {
 			return
 		}
 		logrus.Debugf("Stopping service container %s", serviceCtr.ID())
-		if err := serviceCtr.Stop(); err != nil {
-			if !errors.Is(err, define.ErrCtrStopped) {
-				logrus.Errorf("Stopping service container %s: %v", serviceCtr.ID(), err)
-			}
+		if err := serviceCtr.Stop(); err != nil && !errors.Is(err, define.ErrCtrStopped) {
+			// Log this in debug mode so that we don't print out an error and confuse the user
+			// when the service container can't be stopped because it is in created state.
+			// This can happen when an error happens during kube play and we are trying to
+			// clean up after the error.
+			logrus.Debugf("Error stopping service container %s: %v", serviceCtr.ID(), err)
 		}
 	})
 	return nil


@@ -27,6 +27,7 @@ var (
 	handlerOrder    []string
 	shutdownInhibit sync.RWMutex
 	logrus          = logrusImport.WithField("PID", os.Getpid())
+	ErrNotStarted   = errors.New("shutdown signal handler has not yet been started")
 )
 
 // Start begins handling SIGTERM and SIGINT and will run the given on-signal
@@ -84,7 +85,7 @@ func Start() error {
 // Stop the shutdown signal handler.
 func Stop() error {
 	if cancelChan == nil {
-		return errors.New("shutdown signal handler has not yet been started")
+		return ErrNotStarted
 	}
 	if stopped {
 		return nil


@@ -29,6 +29,8 @@ func KubePlay(w http.ResponseWriter, r *http.Request) {
 		StaticMACs       []string `schema:"staticMACs"`
 		NoHosts          bool     `schema:"noHosts"`
 		PublishPorts     []string `schema:"publishPorts"`
+		Wait             bool     `schema:"wait"`
+		ServiceContainer bool     `schema:"serviceContainer"`
 	}{
 		TLSVerify: true,
 		Start:     true,
@@ -96,6 +98,8 @@ func KubePlay(w http.ResponseWriter, r *http.Request) {
 		StaticMACs:       staticMACs,
 		IsRemote:         true,
 		PublishPorts:     query.PublishPorts,
+		Wait:             query.Wait,
+		ServiceContainer: query.ServiceContainer,
 	}
 	if _, found := r.URL.Query()["tlsVerify"]; found {
 		options.SkipTLSVerify = types.NewOptionalBool(!query.TLSVerify)


@@ -37,6 +37,11 @@ func (s *APIServer) registerKubeHandlers(r *mux.Router) error {
	//    default: true
	//    description: Start the pod after creating it.
	//  - in: query
+	//    name: serviceContainer
+	//    type: boolean
+	//    default: false
+	//    description: Starts a service container before all pods.
+	//  - in: query
	//    name: staticIPs
	//    type: array
	//    description: Static IPs used for the pods.
@@ -48,6 +53,11 @@ func (s *APIServer) registerKubeHandlers(r *mux.Router) error {
	//    description: Static MACs used for the pods.
	//    items:
	//      type: string
+	//  - in: query
+	//    name: wait
+	//    type: boolean
+	//    default: false
+	//    description: Clean up all objects created when a SIGTERM is received or pods exit.
	//  - in: body
	//    name: request
	//    description: Kubernetes YAML file.


@@ -50,6 +50,9 @@ type PlayOptions struct {
 	Force *bool
 	// PublishPorts - configure how to expose ports configured inside the K8S YAML file
 	PublishPorts []string
+	// Wait - indicates whether to return after having created the pods
+	Wait             *bool
+	ServiceContainer *bool
 }
 
 // ApplyOptions are optional options for applying kube YAML files to a k8s cluster
// ApplyOptions are optional options for applying kube YAML files to a k8s cluster // ApplyOptions are optional options for applying kube YAML files to a k8s cluster


@@ -317,3 +317,33 @@ func (o *PlayOptions) GetPublishPorts() []string {
 	}
 	return o.PublishPorts
 }
+
+// WithWait set field Wait to given value
+func (o *PlayOptions) WithWait(value bool) *PlayOptions {
+	o.Wait = &value
+	return o
+}
+
+// GetWait returns value of field Wait
+func (o *PlayOptions) GetWait() bool {
+	if o.Wait == nil {
+		var z bool
+		return z
+	}
+	return *o.Wait
+}
+
+// WithServiceContainer set field ServiceContainer to given value
+func (o *PlayOptions) WithServiceContainer(value bool) *PlayOptions {
+	o.ServiceContainer = &value
+	return o
+}
+
+// GetServiceContainer returns value of field ServiceContainer
+func (o *PlayOptions) GetServiceContainer() bool {
+	if o.ServiceContainer == nil {
+		var z bool
+		return z
+	}
+	return *o.ServiceContainer
+}


@@ -64,6 +64,8 @@ type PlayKubeOptions struct {
 	Force bool
 	// PublishPorts - configure how to expose ports configured inside the K8S YAML file
 	PublishPorts []string
+	// Wait - indicates whether to return after having created the pods
+	Wait bool
 }
 
 // PlayKubePod represents a single pod and associated containers created by play kube
@@ -94,7 +96,10 @@ type PlayKubeReport struct {
 	// Volumes - volumes created by play kube.
 	Volumes []PlayKubeVolume
 	PlayKubeTeardown
+	// Secrets - secrets created by play kube
 	Secrets []PlaySecret
+	// ServiceContainerID - ID of the service container if one is created
+	ServiceContainerID string
 }
 
 type KubePlayReport = PlayKubeReport


@@ -200,8 +200,13 @@ func (ic *ContainerEngine) PlayKube(ctx context.Context, body io.Reader, options
 			if finalErr == nil {
 				return
 			}
-			if err := ic.Libpod.RemoveContainer(ctx, ctr, true, false, nil); err != nil {
-				logrus.Errorf("Cleaning up service container after failure: %v", err)
+			if err := ic.Libpod.RemoveContainer(ctx, ctr, true, true, nil); err != nil {
+				// Log this in debug mode so that we don't print out an error and confuse the user
+				// when the service container can't be removed because the pod still exists.
+				// This can happen when an error happens during kube play and we are trying to
+				// clean up after the error. The service container will be removed as part of the
+				// teardown function.
+				logrus.Debugf("Error cleaning up service container after failure: %v", err)
 			}
 		}()
 	}
@@ -316,6 +321,7 @@ func (ic *ContainerEngine) PlayKube(ctx context.Context, body io.Reader, options
 	// If we started containers along with a service container, we are
 	// running inside a systemd unit and need to set the main PID.
 	if options.ServiceContainer && ranContainers {
+
 		switch len(notifyProxies) {
 		case 0: // Optimization for containers/podman/issues/17345
@@ -341,11 +347,12 @@ func (ic *ContainerEngine) PlayKube(ctx context.Context, body io.Reader, options
 			if err := notifyproxy.SendMessage("", message); err != nil {
 				return nil, err
 			}
 
 			if _, err := serviceContainer.Wait(ctx); err != nil {
 				return nil, fmt.Errorf("waiting for service container: %w", err)
 			}
 		}
+		report.ServiceContainerID = serviceContainer.ID()
 	}
 
 	return report, nil


@@ -58,7 +58,7 @@ func (ic *ContainerEngine) PlayKube(ctx context.Context, body io.Reader, opts en
 	options := new(kube.PlayOptions).WithAuthfile(opts.Authfile).WithUsername(opts.Username).WithPassword(opts.Password)
 	options.WithCertDir(opts.CertDir).WithQuiet(opts.Quiet).WithSignaturePolicy(opts.SignaturePolicy).WithConfigMaps(opts.ConfigMaps)
 	options.WithLogDriver(opts.LogDriver).WithNetwork(opts.Networks).WithSeccompProfileRoot(opts.SeccompProfileRoot)
-	options.WithStaticIPs(opts.StaticIPs).WithStaticMACs(opts.StaticMACs)
+	options.WithStaticIPs(opts.StaticIPs).WithStaticMACs(opts.StaticMACs).WithWait(opts.Wait).WithServiceContainer(opts.ServiceContainer)
 	if len(opts.LogOptions) > 0 {
 		options.WithLogOptions(opts.LogOptions)
 	}


@@ -572,3 +572,55 @@ EOF
 	fi
 	run_podman kube down $YAML
 }
+
+# kube play --wait=true, where we clean up the created containers, pods, and volumes when a kill or sigterm is triggered
+@test "podman kube play --wait with siginterrupt" {
+    cname=c$(random_string 15)
+    fname="/tmp/play_kube_wait_$(random_string 6).yaml"
+    run_podman container create --name $cname $IMAGE top
+    run_podman kube generate -f $fname $cname
+
+    # delete the container we generated from
+    run_podman rm -f $cname
+
+    # force a timeout to happen so that the kube play command is killed
+    # and expect the timeout code 124 to happen so that we can clean up
+    local t0=$SECONDS
+    PODMAN_TIMEOUT=15 run_podman 124 kube play --wait $fname
+    local t1=$SECONDS
+    local delta_t=$((t1 - t0))
+    assert $delta_t -le 20 \
+           "podman kube play did not get killed within 20 seconds"
+
+    # there should be no containers running or created
+    run_podman ps -aq
+    is "$output" "" "There should be no containers"
+    run_podman rmi $(pause_image)
+}
+
+@test "podman kube play --wait - wait for pod to exit" {
+    fname="/tmp/play_kube_wait_$(random_string 6).yaml"
+    echo "
+apiVersion: v1
+kind: Pod
+metadata:
+  labels:
+    app: test
+  name: test_pod
+spec:
+  restartPolicy: Never
+  containers:
+    - name: server
+      image: $IMAGE
+      command:
+      - sleep
+      - \"5\"
+" > $fname
+
+    run_podman kube play --wait $fname
+
+    # there should be no containers running or created
+    run_podman ps -aq
+    is "$output" "" "There should be no containers"
+    run_podman rmi $(pause_image)
+}