Merge pull request #6495 from mheon/parallel_execution

Add parallel execution code for container operations
This commit is contained in:
OpenShift Merge Robot
2020-06-05 12:27:25 -04:00
committed by GitHub
4 changed files with 124 additions and 9 deletions

View File

@ -4,12 +4,14 @@ import (
"fmt"
"os"
"path"
"runtime"
"runtime/pprof"
"strings"
"github.com/containers/libpod/cmd/podman/registry"
"github.com/containers/libpod/cmd/podman/validate"
"github.com/containers/libpod/pkg/domain/entities"
"github.com/containers/libpod/pkg/parallel"
"github.com/containers/libpod/pkg/rootless"
"github.com/containers/libpod/pkg/tracing"
"github.com/containers/libpod/version"
@ -137,6 +139,13 @@ func persistentPreRunE(cmd *cobra.Command, args []string) error {
opentracing.StartSpanFromContext(cfg.SpanCtx, cmd.Name())
}
if cfg.MaxWorks <= 0 {
return errors.Errorf("maximum workers must be set to a positive number (got %d)", cfg.MaxWorks)
}
if err := parallel.SetMaxThreads(uint(cfg.MaxWorks)); err != nil {
return err
}
// Setup Rootless environment, IFF:
// 1) in ABI mode
// 2) running as non-root
@ -216,7 +225,7 @@ func rootFlags(opts *entities.PodmanConfig, flags *pflag.FlagSet) {
flags.StringVar(&cfg.Containers.DefaultMountsFile, "default-mounts-file", cfg.Containers.DefaultMountsFile, "Path to default mounts file")
flags.StringVar(&cfg.Engine.EventsLogger, "events-backend", cfg.Engine.EventsLogger, `Events backend to use ("file"|"journald"|"none")`)
flags.StringSliceVar(&cfg.Engine.HooksDir, "hooks-dir", cfg.Engine.HooksDir, "Set the OCI hooks directory path (may be set multiple times)")
flags.IntVar(&opts.MaxWorks, "max-workers", 0, "The maximum number of workers for parallel operations")
flags.IntVar(&opts.MaxWorks, "max-workers", (runtime.NumCPU()*3)+1, "The maximum number of workers for parallel operations")
flags.StringVar(&cfg.Engine.Namespace, "namespace", cfg.Engine.Namespace, "Set the libpod namespace, used to create separate views of the containers and pods on the system")
flags.StringVar(&cfg.Engine.StaticDir, "root", "", "Path to the root directory in which data, including images, is stored")
flags.StringVar(&opts.RegistriesConf, "registries-conf", "", "Path to a registries.conf to use for image processing")

View File

@ -23,6 +23,7 @@ import (
"github.com/containers/libpod/pkg/checkpoint"
"github.com/containers/libpod/pkg/domain/entities"
"github.com/containers/libpod/pkg/domain/infra/abi/terminal"
"github.com/containers/libpod/pkg/parallel"
"github.com/containers/libpod/pkg/ps"
"github.com/containers/libpod/pkg/rootless"
"github.com/containers/libpod/pkg/signal"
@ -321,21 +322,25 @@ func (ic *ContainerEngine) ContainerRm(ctx context.Context, namesOrIds []string,
return reports, nil
}
for _, c := range ctrs {
report := entities.RmReport{Id: c.ID()}
errMap, err := parallel.ParallelContainerOp(ctx, ctrs, func(c *libpod.Container) error {
err := ic.Libpod.RemoveContainer(ctx, c, options.Force, options.Volumes)
if err != nil {
if options.Ignore && errors.Cause(err) == define.ErrNoSuchCtr {
logrus.Debugf("Ignoring error (--allow-missing): %v", err)
reports = append(reports, &report)
continue
return nil
}
logrus.Debugf("Failed to remove container %s: %s", c.ID(), err.Error())
report.Err = err
reports = append(reports, &report)
continue
}
reports = append(reports, &report)
return err
})
if err != nil {
return nil, err
}
for ctr, err := range errMap {
report := new(entities.RmReport)
report.Id = ctr.ID()
report.Err = err
reports = append(reports, report)
}
return reports, nil
}

44
pkg/parallel/parallel.go Normal file
View File

@ -0,0 +1,44 @@
package parallel
import (
"sync"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)
var (
// Maximum number of jobs that will be used.
// Set a low, but non-zero, default. We'll be overriding it by default
// anyways.
numThreads uint = 8
// Semaphore to control thread creation and ensure numThreads is
// respected.
jobControl *semaphore.Weighted
// Lock to control changing the semaphore - we don't want to do it
// while anyone is using it.
jobControlLock sync.RWMutex
)
// SetMaxThreads sets the number of threads that will be used for parallel jobs.
func SetMaxThreads(threads uint) error {
if threads == 0 {
return errors.New("must give a non-zero number of threads to execute with")
}
jobControlLock.Lock()
defer jobControlLock.Unlock()
numThreads = threads
jobControl = semaphore.NewWeighted(int64(threads))
logrus.Infof("Setting parallel job count to %d", threads)
return nil
}
// GetMaxThreads returns the current number of threads that will be used for
// parallel jobs.
func GetMaxThreads() uint {
return numThreads
}

View File

@ -0,0 +1,57 @@
package parallel
import (
"context"
"sync"
"github.com/containers/libpod/libpod"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// ParallelContainerOp performs the given function on the given set of
// containers, using a number of parallel threads.
// If no error is returned, each container specified in ctrs will have an entry
// in the resulting map; containers with no error will be set to nil.
func ParallelContainerOp(ctx context.Context, ctrs []*libpod.Container, applyFunc func(*libpod.Container) error) (map[*libpod.Container]error, error) {
jobControlLock.RLock()
defer jobControlLock.RUnlock()
// We could use a sync.Map but given Go's lack of generic I'd rather
// just use a lock on a normal map...
// The expectation is that most of the time is spent in applyFunc
// anyways.
var (
errMap map[*libpod.Container]error = make(map[*libpod.Container]error)
errLock sync.Mutex
allDone sync.WaitGroup
)
for _, ctr := range ctrs {
// Block until a thread is available
if err := jobControl.Acquire(ctx, 1); err != nil {
return nil, errors.Wrapf(err, "error acquiring job control semaphore")
}
allDone.Add(1)
c := ctr
go func() {
logrus.Debugf("Launching job on container %s", c.ID())
err := applyFunc(c)
errLock.Lock()
errMap[c] = err
errLock.Unlock()
allDone.Done()
jobControl.Release(1)
}()
}
allDone.Wait()
return errMap, nil
}
// TODO: Add an Enqueue() function that returns a promise