Add pod removal code

Signed-off-by: Matthew Heon <matthew.heon@gmail.com>

Closes: #268
Approved by: rhatdan
This commit is contained in:
Matthew Heon
2018-01-28 05:31:01 -05:00
committed by Atomic Bot
parent cfd6da22df
commit 6b7b4b03a8
5 changed files with 326 additions and 26 deletions

View File

@ -423,6 +423,52 @@ func (s *InMemoryState) RemovePod(pod *Pod) error {
return nil
}
// RemovePodContainers removes all containers from a pod
// This is used to simultaneously remove a number of containers with
// many interdependencies
// Will only remove containers if no dependencies outside of the pod are present
func (s *InMemoryState) RemovePodContainers(pod *Pod) error {
if !pod.valid {
return errors.Wrapf(ErrPodRemoved, "pod %s is not valid", pod.ID())
}
// Get pod containers
podCtrs, ok := s.podContainers[pod.ID()]
if !ok {
return errors.Wrapf(ErrNoSuchPod, "no pod exists in state with ID %s", pod.ID())
}
// Go through container dependencies. Check to see if any are outside the pod.
for ctr := range podCtrs {
ctrDeps, ok := s.ctrDepends[ctr]
if !ok {
return errors.Wrapf(ErrInternal, "cannot find dependencies for container %s", ctr)
}
for _, dep := range ctrDeps {
_, ok := podCtrs[dep]
if !ok {
return errors.Wrapf(ErrCtrExists, "container %s has dependency %s outside of pod %s", ctr, dep, pod.ID())
}
}
}
// All dependencies are OK to remove
// Remove all containers
s.podContainers[pod.ID()] = make(map[string]*Container)
for _, ctr := range podCtrs {
if err := s.ctrIDIndex.Delete(ctr.ID()); err != nil {
return errors.Wrapf(err, "error removing container ID from index")
}
s.ctrNameIndex.Release(ctr.Name())
delete(s.containers, ctr.ID())
delete(s.ctrDepends, ctr.ID())
}
return nil
}
// AddContainerToPod adds a container to the given pod, also adding it to the
// state
func (s *InMemoryState) AddContainerToPod(pod *Pod, ctr *Container) error {

View File

@ -83,6 +83,11 @@ func (r *Runtime) NewContainer(rSpec *spec.Spec, options ...CtrCreateOption) (c
return nil, errors.Wrapf(err, "cannot add container %s to pod %s", ctr.ID(), ctr.config.Pod)
}
// Lock the pod to ensure we can't add containers to pods
// being removed
pod.lock.Lock()
defer pod.lock.Unlock()
if err := r.state.AddContainerToPod(pod, ctr); err != nil {
return nil, err
}
@ -107,6 +112,26 @@ func (r *Runtime) RemoveContainer(c *Container, force bool) error {
// Internal function to remove a container
// Locks the container, but does not lock the runtime
func (r *Runtime) removeContainer(c *Container, force bool) error {
if !c.valid {
// Container probably already removed
// Or was never in the runtime to begin with
return nil
}
// We need to lock the pod before we lock the container
// To avoid races around removing a container and the pod it is in
var pod *Pod
if c.config.Pod != "" {
pod, err := r.state.Pod(c.config.Pod)
if err != nil {
return errors.Wrapf(err, "container %s is in pod %s, but pod cannot be retrieved", c.ID(), pod.ID())
}
// Lock the pod while we're removing container
pod.lock.Lock()
defer pod.lock.Unlock()
}
c.lock.Lock()
defer c.lock.Unlock()
@ -123,16 +148,6 @@ func (r *Runtime) removeContainer(c *Container, force bool) error {
return errors.Wrapf(ErrCtrStateInvalid, "container %s is paused, cannot remove until unpaused", c.ID())
}
// Check that no other containers depend on the container
deps, err := r.state.ContainerInUse(c)
if err != nil {
return err
}
if len(deps) != 0 {
depsStr := strings.Join(deps, ", ")
return errors.Wrapf(ErrCtrExists, "container %s has dependent containers which must be removed before it: %s", c.ID(), depsStr)
}
// Check that the container's in a good state to be removed
if c.state.State == ContainerStateRunning && force {
if err := r.ociRuntime.stopContainer(c, c.StopTimeout()); err != nil {
@ -149,6 +164,16 @@ func (r *Runtime) removeContainer(c *Container, force bool) error {
return errors.Wrapf(ErrCtrStateInvalid, "cannot remove container %s as it is %s - running or paused containers cannot be removed", c.ID(), c.state.State.String())
}
// Check that no other containers depend on the container
deps, err := r.state.ContainerInUse(c)
if err != nil {
return err
}
if len(deps) != 0 {
depsStr := strings.Join(deps, ", ")
return errors.Wrapf(ErrCtrExists, "container %s has dependent containers which must be removed before it: %s", c.ID(), depsStr)
}
// Stop the container's network namespace (if it has one)
if err := r.teardownNetNS(c); err != nil {
return err
@ -161,11 +186,6 @@ func (r *Runtime) removeContainer(c *Container, force bool) error {
// Remove the container from the state
if c.config.Pod != "" {
pod, err := r.state.Pod(c.config.Pod)
if err != nil {
return errors.Wrapf(err, "container %s is in pod %s, but pod cannot be retrieved", c.ID(), pod.ID())
}
if err := r.state.RemoveContainerFromPod(pod, c); err != nil {
return err
}
@ -178,7 +198,7 @@ func (r *Runtime) removeContainer(c *Container, force bool) error {
// Delete the container
// Only do this if we're not ContainerStateConfigured - if we are,
// we haven't been created in the runtime yet
if c.state.State == ContainerStateConfigured {
if c.state.State != ContainerStateConfigured {
if err := r.ociRuntime.deleteContainer(c); err != nil {
return errors.Wrapf(err, "error removing container %s from runc", c.ID())
}

View File

@ -44,12 +44,142 @@ func (r *Runtime) NewPod(options ...PodCreateOption) (*Pod, error) {
return nil, ErrNotImplemented
}
// RemovePod removes a pod and all containers in it
// If force is specified, all containers in the pod will be stopped first
// Otherwise, RemovePod will return an error if any container in the pod is running
// Remove acts atomically, removing all containers or no containers
func (r *Runtime) RemovePod(p *Pod, force bool) error {
return ErrNotImplemented
// RemovePod removes a pod
// If removeCtrs is specified, containers will be removed
// Otherwise, a pod that is not empty will return an error and not be removed
// If force is specified with removeCtrs, all containers will be stopped before
// being removed
// Otherwise, the pod will not be removed if any containers are running
func (r *Runtime) RemovePod(p *Pod, removeCtrs, force bool) error {
r.lock.Lock()
defer r.lock.Unlock()
if !r.valid {
return ErrRuntimeStopped
}
p.lock.Lock()
defer p.lock.Unlock()
if !p.valid {
return ErrPodRemoved
}
ctrs, err := r.state.PodContainers(p)
if err != nil {
return err
}
numCtrs := len(ctrs)
if !removeCtrs && numCtrs > 0 {
return errors.Wrapf(ErrCtrExists, "pod %s contains containers and cannot be removed", p.ID())
}
// Go through and lock all containers so we can operate on them all at once
dependencies := make(map[string][]string)
for _, ctr := range ctrs {
ctr.lock.Lock()
defer ctr.lock.Unlock()
// Sync all containers
if err := ctr.syncContainer(); err != nil {
return err
}
// Check if the container is in a good state to be removed
if ctr.state.State == ContainerStatePaused {
return errors.Wrapf(ErrCtrStateInvalid, "pod %s contains paused container %s, cannot remove", p.ID(), ctr.ID())
}
if ctr.state.State == ContainerStateUnknown {
return errors.Wrapf(ErrCtrStateInvalid, "pod %s contains container %s with invalid state", p.ID(), ctr.ID())
}
// If the container is running and force is not set we can't do anything
if ctr.state.State == ContainerStateRunning {
return errors.Wrapf(ErrCtrStateInvalid, "pod %s contains container %s which is running", p.ID(), ctr.ID())
}
deps, err := r.state.ContainerInUse(ctr)
if err != nil {
return err
}
dependencies[ctr.ID()] = deps
}
// Check if containers have dependencies
// If they do, and the dependencies are not in the pod, error
for ctr, deps := range dependencies {
for _, dep := range deps {
if _, ok := dependencies[dep]; !ok {
return errors.Wrapf(ErrCtrExists, "container %s depends on container %s not in pod %s", ctr, dep, p.ID())
}
}
}
// First loop through all containers and stop them
// Do not remove in this loop to ensure that we don't remove unless all
// containers are in a good state
if force {
for _, ctr := range ctrs {
// If force is set and the container is running, stop it now
if ctr.state.State == ContainerStateRunning {
if err := r.ociRuntime.stopContainer(ctr, ctr.StopTimeout()); err != nil {
return errors.Wrapf(err, "error stopping container %s to remove pod %s", ctr.ID(), p.ID())
}
// Sync again to pick up stopped state
if err := ctr.syncContainer(); err != nil {
return err
}
}
}
}
// Start removing containers
// We can remove containers even if they have dependencies now
// As we have guaranteed their dependencies are in the pod
for _, ctr := range ctrs {
// Stop network NS
if err := r.teardownNetNS(ctr); err != nil {
return err
}
// Stop container's storage
if err := ctr.teardownStorage(); err != nil {
return err
}
// Delete the container from runc (only if we are not
// ContainerStateConfigured)
if ctr.state.State != ContainerStateConfigured {
if err := r.ociRuntime.deleteContainer(ctr); err != nil {
return errors.Wrapf(err, "error removing container %s from runc", ctr.ID())
}
}
}
// Remove containers from the state
if err := r.state.RemovePodContainers(p); err != nil {
return err
}
// Mark containers invalid
for _, ctr := range ctrs {
ctr.valid = false
}
// Remove pod from state
if err := r.state.RemovePod(p); err != nil {
return err
}
// Mark pod invalid
p.valid = false
return nil
}
// GetPod retrieves a pod by its ID

View File

@ -604,7 +604,6 @@ func (s *SQLState) LookupPod(idOrName string) (*Pod, error) {
return nil, errors.Wrapf(ErrNoSuchCtr, "no pod with ID or name %s found", idOrName)
}
return pod, nil
}
@ -652,7 +651,6 @@ func (s *SQLState) PodHasContainer(pod *Pod, ctrID string) (bool, error) {
return false, errors.Wrapf(ErrInternal, "check digit for PodHasContainer query incorrect")
}
return true, nil
}
@ -704,7 +702,6 @@ func (s *SQLState) PodContainersByID(pod *Pod) ([]string, error) {
return nil, errors.Wrapf(err, "error retrieving container rows")
}
return containers, nil
}
@ -820,7 +817,7 @@ func (s *SQLState) RemovePod(pod *Pod) error {
// Check rows acted on for the first transaction, verify we actually removed something
result, err := tx.Exec(query, pod.ID())
if err != nil {
return errors.Wrapf(err, "error removing pod %s from containers table", pod.ID())
return errors.Wrapf(err, "error removing pod %s from pods table", pod.ID())
}
rows, err := result.RowsAffected()
if err != nil {
@ -836,6 +833,108 @@ func (s *SQLState) RemovePod(pod *Pod) error {
return nil
}
// RemovePodContainers removes all containers in a pod simultaneously
// This can avoid issues with dependencies within the pod
// The operation will fail if any container in the pod has a dependency from
// outside the pod
func (s *SQLState) RemovePodContainers(pod *Pod) error {
const (
getPodCtrs = "SELECT Id FROM containers WHERE pod=?;"
removeCtr = "DELETE FROM containers WHERE pod=?;"
removeCtrState = "DELETE FROM containerState WHERE ID=ANY(SELECT Id FROM containers WHERE pod=?);"
)
if !s.valid {
return ErrDBClosed
}
if !pod.valid {
return ErrPodRemoved
}
committed := false
tx, err := s.db.Begin()
if err != nil {
return errors.Wrapf(err, "error beginning database transaction")
}
defer func() {
if err != nil && !committed {
if err2 := tx.Rollback(); err2 != nil {
logrus.Errorf("Error rolling back transaction to remove pod %s containers: %v", pod.ID(), err2)
}
}
}()
// First get all containers in the pod
rows, err := tx.Query(getPodCtrs, pod.ID())
if err != nil {
return errors.Wrapf(err, "error retrieving containers from database")
}
defer rows.Close()
containers := []string{}
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
if err == sql.ErrNoRows {
return ErrNoSuchCtr
}
return errors.Wrapf(err, "error parsing database row into container ID")
}
containers = append(containers, id)
}
if err := rows.Err(); err != nil {
return errors.Wrapf(err, "error retrieving container rows")
}
// Have container IDs, now exec SQL to remove contianers in the pod
// Remove state first, as it needs the subquery on containers
// Don't bother checking if we actually removed anything, we just want to
// empty the pod
_, err = tx.Exec(removeCtrState, pod.ID())
if err != nil {
return errors.Wrapf(err, "error removing pod %s containers from state table", pod.ID())
}
_, err = tx.Exec(removeCtr, pod.ID())
if err != nil {
return errors.Wrapf(err, "error removing pod %s containers from containers table", pod.ID())
}
if err := tx.Commit(); err != nil {
return errors.Wrapf(err, "error committing transaction to add pod %s", pod.ID())
}
committed = true
// Remove JSON files from the containers in question
hasError := false
for _, ctr := range containers {
jsonPath := getSpecPath(s.specsDir, ctr)
if err := os.Remove(jsonPath); err != nil {
logrus.Errorf("Error removing spec JSON for container %s: %v", ctr, err)
hasError = true
}
portsPath := getPortsPath(s.specsDir, ctr)
if err := os.Remove(portsPath); err != nil {
if !os.IsNotExist(err) {
logrus.Errorf("Error removing ports JSON for container %s: %v", ctr, err)
hasError = true
}
}
}
if hasError {
return errors.Wrapf(ErrInternal, "error removing JSON state for some containers")
}
return nil
}
// AddContainerToPod adds a container to the given pod
func (s *SQLState) AddContainerToPod(pod *Pod, ctr *Container) error {
if !pod.valid {

View File

@ -53,6 +53,11 @@ type State interface {
// Removes pod from state
// Only empty pods can be removed from the state
RemovePod(pod *Pod) error
// Remove all containers from a pod
// Used to simulataneously remove containers that might otherwise have
// dependency issues
// Will fail if a dependency outside the pod is encountered
RemovePodContainers(pod *Pod) error
// AddContainerToPod adds a container to an existing pod
// The container given will be added to the state and the pod
AddContainerToPod(pod *Pod, ctr *Container) error