Swap from map to channels for podman stop workers

We were encountering sync issues with the map, so swap to a
thread-safe channel and convert into a map when we output

Signed-off-by: Matthew Heon <matthew.heon@gmail.com>
This commit is contained in:
Matthew Heon
2018-10-11 15:24:05 -04:00
parent 5f6e4cc830
commit e0c980925b

View File

@ -224,11 +224,16 @@ type workerInput struct {
parallelFunc pFunc parallelFunc pFunc
} }
type containerError struct {
containerID string
err error
}
// worker is a "threaded" worker that takes jobs from the channel "queue" // worker is a "threaded" worker that takes jobs from the channel "queue"
func worker(wg *sync.WaitGroup, jobs <-chan workerInput, results map[string]error) { func worker(wg *sync.WaitGroup, jobs <-chan workerInput, results chan<- containerError) {
for j := range jobs { for j := range jobs {
err := j.parallelFunc() err := j.parallelFunc()
results[j.containerID] = err results <- containerError{containerID: j.containerID, err: err}
wg.Done() wg.Done()
} }
} }
@ -239,6 +244,8 @@ func parallelExecuteWorkerPool(workers int, functions []workerInput) map[string]
var ( var (
wg sync.WaitGroup wg sync.WaitGroup
) )
resultChan := make(chan containerError, len(functions))
results := make(map[string]error) results := make(map[string]error)
paraJobs := make(chan workerInput, len(functions)) paraJobs := make(chan workerInput, len(functions))
@ -249,7 +256,7 @@ func parallelExecuteWorkerPool(workers int, functions []workerInput) map[string]
// Create the workers // Create the workers
for w := 1; w <= workers; w++ { for w := 1; w <= workers; w++ {
go worker(&wg, paraJobs, results) go worker(&wg, paraJobs, resultChan)
} }
// Add jobs to the workers // Add jobs to the workers
@ -261,5 +268,11 @@ func parallelExecuteWorkerPool(workers int, functions []workerInput) map[string]
close(paraJobs) close(paraJobs)
wg.Wait() wg.Wait()
close(resultChan)
for ctrError := range resultChan {
results[ctrError.containerID] = ctrError.err
}
return results return results
} }