Files
Valentin Rothberg c069d11759 vendor containers/image v1.4
This requires some additional changes to the dependencies since the
progress-bar library has been changed to github.com/vbauerster/mpb.

Please refer to the following link for the release notes:
https://github.com/containers/image/releases/tag/v1.4

Signed-off-by: Valentin Rothberg <rothberg@redhat.com>
2019-02-21 11:54:04 +01:00

252 lines
5.6 KiB
Go

package mpb
import (
"container/heap"
"fmt"
"io"
"io/ioutil"
"os"
"sync"
"time"
"github.com/vbauerster/mpb/cwriter"
)
const (
// default RefreshRate
prr = 120 * time.Millisecond
// default width
pwidth = 80
// default format
pformat = "[=>-]"
)
// Progress represents the container that renders Progress bars
type Progress struct {
wg *sync.WaitGroup
uwg *sync.WaitGroup
operateState chan func(*pState)
done chan struct{}
}
type pState struct {
bHeap *priorityQueue
shutdownPending []*Bar
heapUpdated bool
zeroWait bool
idCounter int
width int
format string
rr time.Duration
cw *cwriter.Writer
pMatrix map[int][]chan int
aMatrix map[int][]chan int
// following are provided by user
uwg *sync.WaitGroup
manualRefreshCh <-chan time.Time
cancel <-chan struct{}
shutdownNotifier chan struct{}
waitBars map[*Bar]*Bar
debugOut io.Writer
}
// New creates new Progress instance, which orchestrates bars rendering process.
// Accepts mpb.ProgressOption funcs for customization.
func New(options ...ProgressOption) *Progress {
pq := make(priorityQueue, 0)
heap.Init(&pq)
s := &pState{
bHeap: &pq,
width: pwidth,
format: pformat,
cw: cwriter.New(os.Stdout),
rr: prr,
waitBars: make(map[*Bar]*Bar),
debugOut: ioutil.Discard,
}
for _, opt := range options {
if opt != nil {
opt(s)
}
}
p := &Progress{
uwg: s.uwg,
wg: new(sync.WaitGroup),
operateState: make(chan func(*pState)),
done: make(chan struct{}),
}
go p.serve(s)
return p
}
// AddBar creates a new progress bar and adds to the container.
func (p *Progress) AddBar(total int64, options ...BarOption) *Bar {
p.wg.Add(1)
result := make(chan *Bar)
select {
case p.operateState <- func(s *pState) {
options = append(options, barWidth(s.width), barFormat(s.format))
b := newBar(p.wg, s.idCounter, total, s.cancel, options...)
if b.runningBar != nil {
s.waitBars[b.runningBar] = b
} else {
heap.Push(s.bHeap, b)
s.heapUpdated = true
}
s.idCounter++
result <- b
}:
return <-result
case <-p.done:
p.wg.Done()
return nil
}
}
// Abort is only effective while bar progress is running,
// it means remove bar now without waiting for its completion.
// If bar is already completed, there is nothing to abort.
// If you need to remove bar after completion, use BarRemoveOnComplete BarOption.
func (p *Progress) Abort(b *Bar, remove bool) {
select {
case p.operateState <- func(s *pState) {
if b.index < 0 {
return
}
if remove {
s.heapUpdated = heap.Remove(s.bHeap, b.index) != nil
}
s.shutdownPending = append(s.shutdownPending, b)
}:
case <-p.done:
}
}
// UpdateBarPriority provides a way to change bar's order position.
// Zero is highest priority, i.e. bar will be on top.
func (p *Progress) UpdateBarPriority(b *Bar, priority int) {
select {
case p.operateState <- func(s *pState) { s.bHeap.update(b, priority) }:
case <-p.done:
}
}
// BarCount returns bars count
func (p *Progress) BarCount() int {
result := make(chan int, 1)
select {
case p.operateState <- func(s *pState) { result <- s.bHeap.Len() }:
return <-result
case <-p.done:
return 0
}
}
// Wait first waits for user provided *sync.WaitGroup, if any,
// then waits far all bars to complete and finally shutdowns master goroutine.
// After this method has been called, there is no way to reuse *Progress instance.
func (p *Progress) Wait() {
if p.uwg != nil {
p.uwg.Wait()
}
p.wg.Wait()
select {
case p.operateState <- func(s *pState) { s.zeroWait = true }:
<-p.done
case <-p.done:
}
}
func (s *pState) updateSyncMatrix() {
s.pMatrix = make(map[int][]chan int)
s.aMatrix = make(map[int][]chan int)
for i := 0; i < s.bHeap.Len(); i++ {
bar := (*s.bHeap)[i]
table := bar.wSyncTable()
pRow, aRow := table[0], table[1]
for i, ch := range pRow {
s.pMatrix[i] = append(s.pMatrix[i], ch)
}
for i, ch := range aRow {
s.aMatrix[i] = append(s.aMatrix[i], ch)
}
}
}
func (s *pState) render(tw int) {
if s.heapUpdated {
s.updateSyncMatrix()
s.heapUpdated = false
}
syncWidth(s.pMatrix)
syncWidth(s.aMatrix)
for i := 0; i < s.bHeap.Len(); i++ {
bar := (*s.bHeap)[i]
go bar.render(s.debugOut, tw)
}
if err := s.flush(s.bHeap.Len()); err != nil {
fmt.Fprintf(s.debugOut, "%s %s %v\n", "[mpb]", time.Now(), err)
}
}
func (s *pState) flush(lineCount int) error {
for s.bHeap.Len() > 0 {
bar := heap.Pop(s.bHeap).(*Bar)
frameReader := <-bar.frameReaderCh
defer func() {
if frameReader.toShutdown {
// shutdown at next flush, in other words decrement underlying WaitGroup
// only after the bar with completed state has been flushed.
// this ensures no bar ends up with less than 100% rendered.
s.shutdownPending = append(s.shutdownPending, bar)
if replacementBar, ok := s.waitBars[bar]; ok {
heap.Push(s.bHeap, replacementBar)
s.heapUpdated = true
delete(s.waitBars, bar)
}
if frameReader.removeOnComplete {
s.heapUpdated = true
return
}
}
heap.Push(s.bHeap, bar)
}()
s.cw.ReadFrom(frameReader)
lineCount += frameReader.extendedLines
}
for i := len(s.shutdownPending) - 1; i >= 0; i-- {
close(s.shutdownPending[i].shutdown)
s.shutdownPending = s.shutdownPending[:i]
}
return s.cw.Flush(lineCount)
}
func syncWidth(matrix map[int][]chan int) {
for _, column := range matrix {
column := column
go func() {
var maxWidth int
for _, ch := range column {
w := <-ch
if w > maxWidth {
maxWidth = w
}
}
for _, ch := range column {
ch <- maxWidth
}
}()
}
}