Add --max-workers and heuristics for parallel operations

Add a global --max-workers flag so users can limit the number of
parallel operations for a given command. When --max-workers is not
set, a heuristic function returns the preferred number of parallel
workers based on the number of CPUs and the operation being
performed.

Signed-off-by: baude <bbaude@redhat.com>
Commit: 3e5a5c68da
Parent: 57f778aed9
Author: baude
Date:   2018-10-23 18:40:34 -05:00

7 changed files with 135 additions and 83 deletions
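
For a quick sense of the heuristic, here is a minimal, hypothetical Go snippet (not part of this commit) that exercises the shared.Parallelize() function introduced below; it assumes the commit's cmd/podman/shared import path:

	package main

	import (
		"fmt"

		"github.com/containers/libpod/cmd/podman/shared"
	)

	func main() {
		// Print the heuristic worker count per operation. On a 4-CPU
		// machine this yields: stop -> 12 (3x CPUs), rm -> 16 (4x CPUs),
		// and ps -> 8 (fixed).
		for _, op := range []string{"stop", "rm", "ps"} {
			fmt.Printf("%s: %d workers\n", op, shared.Parallelize(op))
		}
	}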

cmd/podman/main.go

@@ -211,6 +211,11 @@ func main() {
 			Value:  hooks.DefaultDir,
 			Hidden: true,
 		},
+		cli.IntFlag{
+			Name:   "max-workers",
+			Usage:  "the maximum number of workers for parallel operations",
+			Hidden: true,
+		},
 		cli.StringFlag{
 			Name:  "log-level",
 			Usage: "log messages above specified level: debug, info, warn, error (default), fatal or panic",

cmd/podman/ps.go

@@ -20,6 +20,7 @@ import (
 	"github.com/cri-o/ocicni/pkg/ocicni"
 	"github.com/docker/go-units"
 	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
 	"github.com/urfave/cli"
 	"k8s.io/apimachinery/pkg/fields"
 )
@@ -300,7 +301,13 @@ func psCmd(c *cli.Context) error {
 		outputContainers = []*libpod.Container{latestCtr}
 	}
 
-	pss := shared.PBatch(outputContainers, 8, opts)
+	maxWorkers := shared.Parallelize("ps")
+	if c.GlobalIsSet("max-workers") {
+		maxWorkers = c.GlobalInt("max-workers")
+	}
+	logrus.Debugf("Setting maximum workers to %d", maxWorkers)
+
+	pss := shared.PBatch(outputContainers, maxWorkers, opts)
 	if opts.Sort != "" {
 		pss, err = sortPsOutput(opts.Sort, pss)
 		if err != nil {

cmd/podman/rm.go

@@ -2,11 +2,11 @@ package main
 
 import (
 	"fmt"
-	rt "runtime"
-
 	"github.com/containers/libpod/cmd/podman/libpodruntime"
+	"github.com/containers/libpod/cmd/podman/shared"
 	"github.com/containers/libpod/libpod"
 	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
 	"github.com/urfave/cli"
 )
@@ -48,14 +48,13 @@ func rmCmd(c *cli.Context) error {
 	var (
 		delContainers []*libpod.Container
 		lastError     error
-		deleteFuncs   []workerInput
+		deleteFuncs   []shared.ParallelWorkerInput
 	)
-
 	ctx := getContext()
 	if err := validateFlags(c, rmFlags); err != nil {
 		return err
 	}
 
 	runtime, err := libpodruntime.GetRuntime(c)
 	if err != nil {
 		return errors.Wrapf(err, "could not get runtime")
@@ -69,17 +68,23 @@ func rmCmd(c *cli.Context) error {
 
 	delContainers, lastError = getAllOrLatestContainers(c, runtime, -1, "all")
 
 	for _, container := range delContainers {
+		con := container
 		f := func() error {
-			return runtime.RemoveContainer(ctx, container, c.Bool("force"))
+			return runtime.RemoveContainer(ctx, con, c.Bool("force"))
 		}
-		deleteFuncs = append(deleteFuncs, workerInput{
-			containerID:  container.ID(),
-			parallelFunc: f,
+		deleteFuncs = append(deleteFuncs, shared.ParallelWorkerInput{
+			ContainerID:  con.ID(),
+			ParallelFunc: f,
 		})
 	}
+	maxWorkers := shared.Parallelize("rm")
+	if c.GlobalIsSet("max-workers") {
+		maxWorkers = c.GlobalInt("max-workers")
+	}
+	logrus.Debugf("Setting maximum workers to %d", maxWorkers)
 
-	deleteErrors := parallelExecuteWorkerPool(rt.NumCPU()*3, deleteFuncs)
+	deleteErrors := shared.ParallelExecuteWorkerPool(maxWorkers, deleteFuncs)
 	for cid, result := range deleteErrors {
 		if result != nil {
 			fmt.Println(result.Error())

cmd/podman/shared/container.go

@@ -226,10 +226,10 @@ func NewBatchContainer(ctr *libpod.Container, opts PsOptions) (PsContainerOutput, error) {
 	return pso, nil
 }
 
-type pFunc func() (PsContainerOutput, error)
+type batchFunc func() (PsContainerOutput, error)
 
 type workerInput struct {
-	parallelFunc pFunc
+	parallelFunc batchFunc
 	opts         PsOptions
 	cid          string
 	job          int

cmd/podman/shared/parallel.go (new file)

@@ -0,0 +1,91 @@
+package shared
+
+import (
+	"runtime"
+	"sync"
+)
+
+type pFunc func() error
+
+// ParallelWorkerInput is a struct used to pass in a slice of parallel funcs to be
+// performed on a container ID
+type ParallelWorkerInput struct {
+	ContainerID  string
+	ParallelFunc pFunc
+}
+
+type containerError struct {
+	ContainerID string
+	Err         error
+}
+
+// ParallelWorker is a "threaded" worker that takes jobs from the channel "queue"
+func ParallelWorker(wg *sync.WaitGroup, jobs <-chan ParallelWorkerInput, results chan<- containerError) {
+	for j := range jobs {
+		err := j.ParallelFunc()
+		results <- containerError{ContainerID: j.ContainerID, Err: err}
+		wg.Done()
+	}
+}
+
+// ParallelExecuteWorkerPool takes container jobs and performs them in parallel. The worker
+// int determines how many workers/threads should be premade.
+func ParallelExecuteWorkerPool(workers int, functions []ParallelWorkerInput) map[string]error {
+	var (
+		wg sync.WaitGroup
+	)
+
+	resultChan := make(chan containerError, len(functions))
+	results := make(map[string]error)
+	paraJobs := make(chan ParallelWorkerInput, len(functions))
+
+	// If we have more workers than functions, match up the number of workers and functions
+	if workers > len(functions) {
+		workers = len(functions)
+	}
+
+	// Create the workers
+	for w := 1; w <= workers; w++ {
+		go ParallelWorker(&wg, paraJobs, resultChan)
+	}
+
+	// Add jobs to the workers
+	for _, j := range functions {
+		j := j
+		wg.Add(1)
+		paraJobs <- j
+	}
+
+	close(paraJobs)
+	wg.Wait()
+	close(resultChan)
+
+	for ctrError := range resultChan {
+		results[ctrError.ContainerID] = ctrError.Err
+	}
+
+	return results
+}
+
+// Parallelize provides the maximum number of parallel workers (int) as calculated by a basic
+// heuristic. This can be overridden by the --max-workers primary switch to podman.
+func Parallelize(job string) int {
+	numCpus := runtime.NumCPU()
+	switch job {
+	case "stop":
+		if numCpus <= 2 {
+			return 4
+		} else {
+			return numCpus * 3
+		}
+	case "rm":
+		if numCpus <= 3 {
+			return numCpus * 3
+		} else {
+			return numCpus * 4
+		}
+	case "ps":
+		return 8
+	}
+	return 3
+}
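
A minimal usage sketch for the new pool (hypothetical caller, not part of this commit; it mirrors what the rm and stop hunks do and assumes the same import path). Jobs are queued on a buffered channel and drained by the requested number of worker goroutines:

	package main

	import (
		"fmt"

		"github.com/containers/libpod/cmd/podman/shared"
	)

	func main() {
		// Queue one job per container ID; each job is just a func() error.
		jobs := []shared.ParallelWorkerInput{
			{ContainerID: "ctr1", ParallelFunc: func() error { return nil }},
			{ContainerID: "ctr2", ParallelFunc: func() error { return fmt.Errorf("failed") }},
		}

		// Run the jobs on the heuristic number of workers; the result maps
		// each container ID to the error its job returned (nil on success).
		for cid, err := range shared.ParallelExecuteWorkerPool(shared.Parallelize("rm"), jobs) {
			fmt.Println(cid, err)
		}
	}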

cmd/podman/stop.go

@@ -2,12 +2,12 @@ package main
 
 import (
 	"fmt"
-	rt "runtime"
-
 	"github.com/containers/libpod/cmd/podman/libpodruntime"
+	"github.com/containers/libpod/cmd/podman/shared"
 	"github.com/containers/libpod/libpod"
 	"github.com/containers/libpod/pkg/rootless"
 	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
 	"github.com/urfave/cli"
 )
@@ -61,7 +61,7 @@ func stopCmd(c *cli.Context) error {
 
 	containers, lastError := getAllOrLatestContainers(c, runtime, libpod.ContainerStateRunning, "running")
 
-	var stopFuncs []workerInput
+	var stopFuncs []shared.ParallelWorkerInput
 	for _, ctr := range containers {
 		con := ctr
 		var stopTimeout uint
@@ -73,13 +73,19 @@ func stopCmd(c *cli.Context) error {
 		f := func() error {
 			return con.StopWithTimeout(stopTimeout)
 		}
-		stopFuncs = append(stopFuncs, workerInput{
-			containerID:  con.ID(),
-			parallelFunc: f,
+
+		stopFuncs = append(stopFuncs, shared.ParallelWorkerInput{
+			ContainerID:  con.ID(),
+			ParallelFunc: f,
 		})
 	}
 
-	stopErrors := parallelExecuteWorkerPool(rt.NumCPU()*3, stopFuncs)
+	maxWorkers := shared.Parallelize("stop")
+	if c.GlobalIsSet("max-workers") {
+		maxWorkers = c.GlobalInt("max-workers")
+	}
+	logrus.Debugf("Setting maximum workers to %d", maxWorkers)
+	stopErrors := shared.ParallelExecuteWorkerPool(maxWorkers, stopFuncs)
 
 	for cid, result := range stopErrors {
 		if result != nil && result != libpod.ErrCtrStopped {

cmd/podman/utils.go

@@ -3,10 +3,6 @@ package main
 import (
 	"context"
 	"fmt"
-	"os"
-	gosignal "os/signal"
-	"sync"
-
 	"github.com/containers/libpod/libpod"
 	"github.com/docker/docker/pkg/signal"
 	"github.com/docker/docker/pkg/term"
@@ -15,6 +11,8 @@ import (
 	"github.com/urfave/cli"
 	"golang.org/x/crypto/ssh/terminal"
 	"k8s.io/client-go/tools/remotecommand"
+	"os"
+	gosignal "os/signal"
 )
 
 type RawTtyFormatter struct {
@@ -209,63 +207,3 @@ func getPodsFromContext(c *cli.Context, r *libpod.Runtime) ([]*libpod.Pod, error) {
 	}
 	return pods, lastError
 }
-
-type pFunc func() error
-
-type workerInput struct {
-	containerID  string
-	parallelFunc pFunc
-}
-
-type containerError struct {
-	containerID string
-	err         error
-}
-
-// worker is a "threaded" worker that takes jobs from the channel "queue"
-func worker(wg *sync.WaitGroup, jobs <-chan workerInput, results chan<- containerError) {
-	for j := range jobs {
-		err := j.parallelFunc()
-		results <- containerError{containerID: j.containerID, err: err}
-		wg.Done()
-	}
-}
-
-// parallelExecuteWorkerPool takes container jobs and performs them in parallel. The worker
-// int is determines how many workers/threads should be premade.
-func parallelExecuteWorkerPool(workers int, functions []workerInput) map[string]error {
-	var (
-		wg sync.WaitGroup
-	)
-
-	resultChan := make(chan containerError, len(functions))
-	results := make(map[string]error)
-	paraJobs := make(chan workerInput, len(functions))
-
-	// If we have more workers than functions, match up the number of workers and functions
-	if workers > len(functions) {
-		workers = len(functions)
-	}
-
-	// Create the workers
-	for w := 1; w <= workers; w++ {
-		go worker(&wg, paraJobs, resultChan)
-	}
-
-	// Add jobs to the workers
-	for _, j := range functions {
-		j := j
-		wg.Add(1)
-		paraJobs <- j
-	}
-
-	close(paraJobs)
-	wg.Wait()
-	close(resultChan)
-
-	for ctrError := range resultChan {
-		results[ctrError.containerID] = ctrError.err
-	}
-
-	return results
-}