Merge pull request #14159 from vrothberg/service-container

play kube: service container
This commit is contained in:
Daniel J Walsh
2022-05-12 13:35:56 -04:00
committed by GitHub
53 changed files with 691 additions and 183 deletions

View File

@ -211,6 +211,14 @@ type ContainerState struct {
// network and an interface names
NetInterfaceDescriptions ContainerNetworkDescriptions `json:"networkDescriptions,omitempty"`
// Service indicates that container is the service container of a
// service. A service consists of one or more pods. The service
// container is started before all pods and is stopped when the last
// pod stops. The service container allows for tracking and managing
// the entire life cycle of service which may be started via
// `podman-play-kube`.
Service Service
// containerPlatformState holds platform-specific container state.
containerPlatformState

View File

@ -382,6 +382,9 @@ type ContainerMiscConfig struct {
// IsInfra is a bool indicating whether this container is an infra container used for
// sharing kernel namespaces in a pod
IsInfra bool `json:"pause"`
// IsService is a bool indicating whether this container is a service container used for
// tracking the life cycle of K8s service.
IsService bool `json:"isService"`
// SdNotifyMode tells libpod what to do with a NOTIFY_SOCKET if passed
SdNotifyMode string `json:"sdnotifyMode,omitempty"`
// Systemd tells libpod to setup the container in systemd mode, a value of nil denotes false

View File

@ -171,6 +171,7 @@ func (c *Container) getContainerInspectData(size bool, driverData *define.Driver
Mounts: inspectMounts,
Dependencies: c.Dependencies(),
IsInfra: c.IsInfra(),
IsService: c.isService(),
}
if c.state.ConfigPath != "" {

View File

@ -1,6 +1,8 @@
package libpod
import (
"fmt"
"github.com/containers/podman/v4/libpod/define"
spec "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
@ -27,6 +29,12 @@ func (c *Container) validate() error {
return errors.Wrapf(define.ErrInvalidArg, "must set root filesystem source to either image or rootfs")
}
// A container cannot be marked as an infra and service container at
// the same time.
if c.IsInfra() && c.isService() {
return fmt.Errorf("cannot be infra and service container at the same time: %w", define.ErrInvalidArg)
}
// Cannot make a network namespace if we are joining another container's
// network namespace
if c.config.CreateNetNS && c.config.NetNsCtr != "" {

View File

@ -683,6 +683,7 @@ type InspectContainerData struct {
NetworkSettings *InspectNetworkSettings `json:"NetworkSettings"`
Namespace string `json:"Namespace"`
IsInfra bool `json:"IsInfra"`
IsService bool `json:"IsService"`
Config *InspectContainerConfig `json:"Config"`
HostConfig *InspectContainerHostConfig `json:"HostConfig"`
}

View File

@ -1,6 +1,7 @@
package libpod
import (
"fmt"
"net"
"os"
"path/filepath"
@ -1477,7 +1478,7 @@ func WithCreateCommand(cmd []string) CtrCreateOption {
}
}
// withIsInfra allows us to dfferentiate between infra containers and regular containers
// withIsInfra allows us to dfferentiate between infra containers and other containers
// within the container config
func withIsInfra() CtrCreateOption {
return func(ctr *Container) error {
@ -1491,6 +1492,20 @@ func withIsInfra() CtrCreateOption {
}
}
// WithIsService allows us to dfferentiate between service containers and other container
// within the container config
func WithIsService() CtrCreateOption {
return func(ctr *Container) error {
if ctr.valid {
return define.ErrCtrFinalized
}
ctr.config.IsService = true
return nil
}
}
// WithCreateWorkingDir tells Podman to create the container's working directory
// if it does not exist.
func WithCreateWorkingDir() CtrCreateOption {
@ -2081,6 +2096,27 @@ func WithInfraContainer() PodCreateOption {
}
}
// WithServiceContainer associates the specified service container ID with the pod.
func WithServiceContainer(id string) PodCreateOption {
return func(pod *Pod) error {
if pod.valid {
return define.ErrPodFinalized
}
ctr, err := pod.runtime.LookupContainer(id)
if err != nil {
return fmt.Errorf("looking up service container: %w", err)
}
if err := ctr.addServicePodLocked(pod.ID()); err != nil {
return fmt.Errorf("associating service container %s with pod %s: %w", id, pod.ID(), err)
}
pod.config.ServiceContainerID = id
return nil
}
}
// WithVolatile sets the volatile flag for the container storage.
// The option can potentially cause data loss when used on a container that must survive a machine reboot.
func WithVolatile() CtrCreateOption {

View File

@ -64,6 +64,13 @@ type PodConfig struct {
HasInfra bool `json:"hasInfra,omitempty"`
// ServiceContainerID is the main container of a service. A service
// consists of one or more pods. The service container is started
// before all pods and is stopped when the last pod stops.
// The service container allows for tracking and managing the entire
// life cycle of service which may be started via `podman-play-kube`.
ServiceContainerID string `json:"serviceContainerID,omitempty"`
// Time pod was created
CreatedTime time.Time `json:"created"`

View File

@ -75,6 +75,10 @@ func (p *Pod) Start(ctx context.Context) (map[string]error, error) {
return nil, define.ErrPodRemoved
}
if err := p.maybeStartServiceContainer(ctx); err != nil {
return nil, err
}
// Before "regular" containers start in the pod, all init containers
// must have run and exited successfully.
if err := p.startInitContainers(ctx); err != nil {
@ -197,6 +201,11 @@ func (p *Pod) stopWithTimeout(ctx context.Context, cleanup bool, timeout int) (m
if len(ctrErrors) > 0 {
return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error stopping some containers")
}
if err := p.maybeStopServiceContainer(); err != nil {
return nil, err
}
return nil, nil
}
@ -297,6 +306,10 @@ func (p *Pod) Cleanup(ctx context.Context) (map[string]error, error) {
return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error cleaning up some containers")
}
if err := p.maybeStopServiceContainer(); err != nil {
return nil, err
}
return nil, nil
}
@ -443,6 +456,10 @@ func (p *Pod) Restart(ctx context.Context) (map[string]error, error) {
return nil, define.ErrPodRemoved
}
if err := p.maybeStartServiceContainer(ctx); err != nil {
return nil, err
}
allCtrs, err := p.runtime.state.PodContainers(p)
if err != nil {
return nil, err
@ -530,6 +547,11 @@ func (p *Pod) Kill(ctx context.Context, signal uint) (map[string]error, error) {
if len(ctrErrors) > 0 {
return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error killing some containers")
}
if err := p.maybeStopServiceContainer(); err != nil {
return nil, err
}
return nil, nil
}

View File

@ -40,7 +40,7 @@ func (r *Runtime) RemoveContainersForImageCallback(ctx context.Context) libimage
if ctr.config.IsInfra {
pod, err := r.state.Pod(ctr.config.Pod)
if err != nil {
return errors.Wrapf(err, "container %s is in pod %s, but pod cannot be retrieved", ctr.ID(), pod.ID())
return errors.Wrapf(err, "container %s is in pod %s, but pod cannot be retrieved", ctr.ID(), ctr.config.Pod)
}
if err := r.removePod(ctx, pod, true, true, timeout); err != nil {
return errors.Wrapf(err, "removing image %s: container %s using image could not be removed", imageID, ctr.ID())

View File

@ -380,6 +380,10 @@ func (r *Runtime) removePod(ctx context.Context, p *Pod, removeCtrs, force bool,
}
}
if err := p.maybeRemoveServiceContainer(); err != nil {
return err
}
// Remove pod from state
if err := r.state.RemovePod(p); err != nil {
if removalErr != nil {

213
libpod/service.go Normal file
View File

@ -0,0 +1,213 @@
package libpod
import (
"context"
"fmt"
"github.com/containers/podman/v4/libpod/define"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// A service consists of one or more pods. The service container is started
// before all pods and is stopped when the last pod stops. The service
// container allows for tracking and managing the entire life cycle of service
// which may be started via `podman-play-kube`.
type Service struct {
// Pods running as part of the service.
Pods []string `json:"servicePods"`
}
// Indicates whether the pod is associated with a service container.
// The pod is expected to be updated and locked.
func (p *Pod) hasServiceContainer() bool {
return p.config.ServiceContainerID != ""
}
// Returns the pod's service container.
// The pod is expected to be updated and locked.
func (p *Pod) serviceContainer() (*Container, error) {
id := p.config.ServiceContainerID
if id == "" {
return nil, errors.Wrap(define.ErrNoSuchCtr, "pod has no service container")
}
return p.runtime.state.Container(id)
}
// ServiceContainer returns the service container.
func (p *Pod) ServiceContainer() (*Container, error) {
p.lock.Lock()
defer p.lock.Unlock()
if err := p.updatePod(); err != nil {
return nil, err
}
return p.serviceContainer()
}
func (c *Container) addServicePodLocked(id string) error {
c.lock.Lock()
defer c.lock.Unlock()
if err := c.syncContainer(); err != nil {
return err
}
c.state.Service.Pods = append(c.state.Service.Pods, id)
return c.save()
}
func (c *Container) isService() bool {
return c.config.IsService
}
// canStopServiceContainer returns true if all pods of the service are stopped.
// Note that the method acquires the container lock.
func (c *Container) canStopServiceContainerLocked() (bool, error) {
c.lock.Lock()
defer c.lock.Unlock()
if err := c.syncContainer(); err != nil {
return false, err
}
if !c.isService() {
return false, fmt.Errorf("internal error: checking service: container %s is not a service container", c.ID())
}
for _, id := range c.state.Service.Pods {
pod, err := c.runtime.LookupPod(id)
if err != nil {
if errors.Is(err, define.ErrNoSuchPod) {
continue
}
return false, err
}
status, err := pod.GetPodStatus()
if err != nil {
return false, err
}
// We can only stop the service if all pods are done.
switch status {
case define.PodStateStopped, define.PodStateExited, define.PodStateErrored:
continue
default:
return false, nil
}
}
return true, nil
}
// Checks whether the service container can be stopped and does so.
func (p *Pod) maybeStopServiceContainer() error {
if !p.hasServiceContainer() {
return nil
}
serviceCtr, err := p.serviceContainer()
if err != nil {
return fmt.Errorf("getting pod's service container: %w", err)
}
// Checking whether the service can be stopped must be done in
// the runtime's work queue to resolve ABBA dead locks in the
// pod->container->servicePods hierarchy.
p.runtime.queueWork(func() {
logrus.Debugf("Pod %s has a service %s: checking if it can be stopped", p.ID(), serviceCtr.ID())
canStop, err := serviceCtr.canStopServiceContainerLocked()
if err != nil {
logrus.Errorf("Checking whether service of container %s can be stopped: %v", serviceCtr.ID(), err)
return
}
if !canStop {
return
}
logrus.Debugf("Stopping service container %s", serviceCtr.ID())
if err := serviceCtr.Stop(); err != nil {
logrus.Errorf("Stopping service container %s: %v", serviceCtr.ID(), err)
}
})
return nil
}
// Starts the pod's service container if it's not already running.
func (p *Pod) maybeStartServiceContainer(ctx context.Context) error {
if !p.hasServiceContainer() {
return nil
}
serviceCtr, err := p.serviceContainer()
if err != nil {
return fmt.Errorf("getting pod's service container: %w", err)
}
serviceCtr.lock.Lock()
defer serviceCtr.lock.Unlock()
if err := serviceCtr.syncContainer(); err != nil {
return err
}
if serviceCtr.state.State == define.ContainerStateRunning {
return nil
}
// Restart will reinit among other things.
return serviceCtr.restartWithTimeout(ctx, 0)
}
// canRemoveServiceContainer returns true if all pods of the service are removed.
// Note that the method acquires the container lock.
func (c *Container) canRemoveServiceContainerLocked() (bool, error) {
c.lock.Lock()
defer c.lock.Unlock()
if err := c.syncContainer(); err != nil {
return false, err
}
if !c.isService() {
return false, fmt.Errorf("internal error: checking service: container %s is not a service container", c.ID())
}
for _, id := range c.state.Service.Pods {
if _, err := c.runtime.LookupPod(id); err != nil {
if errors.Is(err, define.ErrNoSuchPod) {
continue
}
return false, err
}
return false, nil
}
return true, nil
}
// Checks whether the service container can be removed and does so.
func (p *Pod) maybeRemoveServiceContainer() error {
if !p.hasServiceContainer() {
return nil
}
serviceCtr, err := p.serviceContainer()
if err != nil {
return fmt.Errorf("getting pod's service container: %w", err)
}
// Checking whether the service can be stopped must be done in
// the runtime's work queue to resolve ABBA dead locks in the
// pod->container->servicePods hierarchy.
p.runtime.queueWork(func() {
logrus.Debugf("Pod %s has a service %s: checking if it can be removed", p.ID(), serviceCtr.ID())
canRemove, err := serviceCtr.canRemoveServiceContainerLocked()
if err != nil {
logrus.Errorf("Checking whether service of container %s can be removed: %v", serviceCtr.ID(), err)
return
}
if !canRemove {
return
}
timeout := uint(0)
logrus.Debugf("Removing service container %s", serviceCtr.ID())
if err := p.runtime.RemoveContainer(context.Background(), serviceCtr, true, false, &timeout); err != nil {
logrus.Errorf("Removing service container %s: %v", serviceCtr.ID(), err)
}
})
return nil
}