From 5056bd45f84f2fab332498610a377277d1265ae5 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 20 Feb 2023 12:20:18 +0000 Subject: [PATCH] build(deps): bump github.com/vbauerster/mpb/v8 from 8.1.6 to 8.2.0 Bumps [github.com/vbauerster/mpb/v8](https://github.com/vbauerster/mpb) from 8.1.6 to 8.2.0. - [Release notes](https://github.com/vbauerster/mpb/releases) - [Commits](https://github.com/vbauerster/mpb/compare/v8.1.6...v8.2.0) --- updated-dependencies: - dependency-name: github.com/vbauerster/mpb/v8 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- go.mod | 2 +- go.sum | 4 +- vendor/github.com/vbauerster/mpb/v8/bar.go | 123 +++-- .../vbauerster/mpb/v8/bar_option.go | 17 +- .../vbauerster/mpb/v8/container_option.go | 48 +- .../vbauerster/mpb/v8/cwriter/writer.go | 5 + .../vbauerster/mpb/v8/decor/decorator.go | 4 +- .../vbauerster/mpb/v8/decor/merge.go | 2 +- .../vbauerster/mpb/v8/decor/on_abort.go | 2 +- .../vbauerster/mpb/v8/decor/on_complete.go | 2 +- .../vbauerster/mpb/v8/heap_manager.go | 171 +++++++ .../vbauerster/mpb/v8/priority_queue.go | 2 +- .../github.com/vbauerster/mpb/v8/progress.go | 419 ++++++++---------- vendor/modules.txt | 2 +- 14 files changed, 459 insertions(+), 344 deletions(-) create mode 100644 vendor/github.com/vbauerster/mpb/v8/heap_manager.go diff --git a/go.mod b/go.mod index 1bcfe32b47..9f0eab62b1 100644 --- a/go.mod +++ b/go.mod @@ -57,7 +57,7 @@ require ( github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635 github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/ulikunitz/xz v0.5.11 - github.com/vbauerster/mpb/v8 v8.1.6 + github.com/vbauerster/mpb/v8 v8.2.0 github.com/vishvananda/netlink v1.2.1-beta.2 go.etcd.io/bbolt v1.3.7 golang.org/x/net v0.7.0 diff --git a/go.sum b/go.sum index 7d4f9c041a..e7a57c60fc 100644 --- a/go.sum +++ b/go.sum @@ -1056,8 +1056,8 @@ github.com/urfave/cli v1.22.4/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX github.com/urfave/cli v1.22.9/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/vbatts/tar-split v0.11.2 h1:Via6XqJr0hceW4wff3QRzD5gAk/tatMw/4ZA7cTlIME= github.com/vbatts/tar-split v0.11.2/go.mod h1:vV3ZuO2yWSVsz+pfFzDG/upWH1JhjOiEaWq6kXyQ3VI= -github.com/vbauerster/mpb/v8 v8.1.6 h1:EswHDkAsy4OQ7QBAmU1MUPz4vHzl6KlINjlh7vJoxvY= -github.com/vbauerster/mpb/v8 v8.1.6/go.mod h1:O9/Wl8X9dUbR63tZ41MLIAxrtNfwlpwUhGkeYugUPW8= +github.com/vbauerster/mpb/v8 v8.2.0 h1:zaH0DaIcUoOeItZ/Yy567ZhaPUC3GMhUyHollQDgZvs= +github.com/vbauerster/mpb/v8 v8.2.0/go.mod h1:HEVcHNizbUIg0l4Qwhw0BDvg50zo3CMiWkbz1WUEQ94= github.com/vishvananda/netlink v0.0.0-20181108222139-023a6dafdcdf/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk= github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE= github.com/vishvananda/netlink v1.1.1-0.20201029203352-d40f9887b852/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhgX83tXhKS2B/PRMpOho= diff --git a/vendor/github.com/vbauerster/mpb/v8/bar.go b/vendor/github.com/vbauerster/mpb/v8/bar.go index b71e6b4659..f6cba15bd3 100644 --- a/vendor/github.com/vbauerster/mpb/v8/bar.go +++ b/vendor/github.com/vbauerster/mpb/v8/bar.go @@ -41,8 +41,10 @@ type bState struct { completed bool aborted bool triggerComplete bool - dropOnComplete bool + rmOnComplete bool noPop bool + autoRefresh bool + manualRefresh bool aDecorators []decor.Decorator pDecorators []decor.Decorator averageDecorators []decor.AverageDecorator @@ -50,24 +52,22 @@ type bState struct { shutdownListeners []decor.ShutdownListener buffers [3]*bytes.Buffer filler BarFiller - middleware func(BarFiller) BarFiller extender extenderFunc - manualRefresh chan interface{} - - wait struct { - bar *Bar // key for (*pState).queueBars - sync bool - } + refreshCh chan time.Time + waitBar *Bar // key for (*pState).queueBars } type renderFrame struct { - rows []io.Reader - shutdown bool - err error + rows []io.Reader + shutdown int + rmOnComplete bool + noPop bool + done bool + err error } -func newBar(container *Progress, bs *bState) *Bar { - ctx, cancel := context.WithCancel(container.ctx) +func newBar(ctx context.Context, container *Progress, bs *bState) *Bar { + ctx, cancel := context.WithCancel(ctx) bar := &Bar{ priority: bs.priority, @@ -78,6 +78,7 @@ func newBar(container *Progress, bs *bState) *Bar { cancel: cancel, } + container.bwg.Add(1) go bar.serve(ctx, bs) return bar } @@ -153,20 +154,22 @@ func (b *Bar) SetRefill(amount int64) { // TraverseDecorators traverses all available decorators and calls cb func on each. func (b *Bar) TraverseDecorators(cb func(decor.Decorator)) { - sync := make(chan struct{}) + iter := make(chan decor.Decorator) select { case b.operateState <- func(s *bState) { - defer close(sync) for _, decorators := range [][]decor.Decorator{ s.pDecorators, s.aDecorators, } { for _, d := range decorators { - cb(extractBaseDecorator(d)) + iter <- d } } + close(iter) }: - <-sync + for d := range iter { + cb(unwrap(d)) + } case <-b.done: } } @@ -185,7 +188,7 @@ func (b *Bar) EnableTriggerComplete() { if s.current >= s.total { s.current = s.total s.completed = true - b.forceRefresh(s.manualRefresh) + b.triggerCompletion(s) } else { s.triggerComplete = true } @@ -197,9 +200,9 @@ func (b *Bar) EnableTriggerComplete() { // SetTotal sets total to an arbitrary value. It's effective only for // bar which was constructed with `total <= 0`. Setting total to negative // value is equivalent to (*Bar).SetTotal((*Bar).Current(), bool) but faster. -// If triggerCompleteNow is true, total value is set to current and +// If triggerCompletion is true, total value is set to current and // complete event is triggered right away. -func (b *Bar) SetTotal(total int64, triggerCompleteNow bool) { +func (b *Bar) SetTotal(total int64, triggerCompletion bool) { select { case b.operateState <- func(s *bState) { if s.triggerComplete { @@ -210,10 +213,10 @@ func (b *Bar) SetTotal(total int64, triggerCompleteNow bool) { } else { s.total = total } - if triggerCompleteNow { + if triggerCompletion { s.current = s.total s.completed = true - b.forceRefresh(s.manualRefresh) + b.triggerCompletion(s) } }: case <-b.done: @@ -231,7 +234,7 @@ func (b *Bar) SetCurrent(current int64) { if s.triggerComplete && s.current >= s.total { s.current = s.total s.completed = true - b.forceRefresh(s.manualRefresh) + b.triggerCompletion(s) } }: case <-b.done: @@ -253,7 +256,7 @@ func (b *Bar) EwmaSetCurrent(current int64, iterDur time.Duration) { if s.triggerComplete && s.current >= s.total { s.current = s.total s.completed = true - b.forceRefresh(s.manualRefresh) + b.triggerCompletion(s) } }: case <-b.done: @@ -281,7 +284,7 @@ func (b *Bar) IncrInt64(n int64) { if s.triggerComplete && s.current >= s.total { s.current = s.total s.completed = true - b.forceRefresh(s.manualRefresh) + b.triggerCompletion(s) } }: case <-b.done: @@ -311,7 +314,7 @@ func (b *Bar) EwmaIncrInt64(n int64, iterDur time.Duration) { if s.triggerComplete && s.current >= s.total { s.current = s.total s.completed = true - b.forceRefresh(s.manualRefresh) + b.triggerCompletion(s) } }: case <-b.done: @@ -348,8 +351,8 @@ func (b *Bar) Abort(drop bool) { return } s.aborted = true - s.dropOnComplete = drop - b.forceRefresh(s.manualRefresh) + s.rmOnComplete = drop + b.triggerCompletion(s) }: case <-b.done: } @@ -382,9 +385,7 @@ func (b *Bar) Completed() bool { func (b *Bar) IsRunning() bool { result := make(chan bool) select { - case b.operateState <- func(s *bState) { - result <- !s.completed && !s.aborted - }: + case b.operateState <- func(s *bState) { result <- !s.completed && !s.aborted }: return <-result case <-b.done: return false @@ -398,9 +399,6 @@ func (b *Bar) Wait() { func (b *Bar) serve(ctx context.Context, bs *bState) { defer b.container.bwg.Done() - if bs.wait.bar != nil && bs.wait.sync { - bs.wait.bar.Wait() - } for { select { case op := <-b.operateState: @@ -433,10 +431,16 @@ func (b *Bar) render(tw int) { return } } - frame := &renderFrame{rows: rows} + frame := &renderFrame{ + rows: rows, + shutdown: s.shutdown, + rmOnComplete: s.rmOnComplete, + noPop: s.noPop, + done: done, + } if s.completed || s.aborted { - frame.shutdown = !done || s.shutdown == 1 - b.cancel() + // post increment makes sure OnComplete decorators are rendered + s.shutdown++ } b.frameCh <- frame } @@ -448,17 +452,22 @@ func (b *Bar) render(tw int) { } } -func (b *Bar) forceRefresh(refreshCh chan interface{}) { - b.container.bwg.Add(1) - go b.forceRefreshImpl(refreshCh) +func (b *Bar) triggerCompletion(s *bState) { + if s.autoRefresh { + // Technically this call isn't required, but if refresh rate is set to + // one hour for example and bar completes within a few minutes p.Wait() + // will wait for one hour. This call helps to avoid unnecessary waiting. + go b.tryEarlyRefresh(s.refreshCh) + } else if !s.manualRefresh { + b.cancel() + } } -func (b *Bar) forceRefreshImpl(refreshCh chan interface{}) { - defer b.container.bwg.Done() +func (b *Bar) tryEarlyRefresh(refreshCh chan<- time.Time) { var anyOtherRunning bool b.container.traverseBars(func(bar *Bar) bool { anyOtherRunning = b != bar && bar.IsRunning() - return !anyOtherRunning + return anyOtherRunning }) if !anyOtherRunning { for { @@ -584,26 +593,6 @@ func (s *bState) wSyncTable() (table syncTable) { return table } -func (s *bState) subscribeDecorators() { - for _, decorators := range [][]decor.Decorator{ - s.pDecorators, - s.aDecorators, - } { - for _, d := range decorators { - d = extractBaseDecorator(d) - if d, ok := d.(decor.AverageDecorator); ok { - s.averageDecorators = append(s.averageDecorators, d) - } - if d, ok := d.(decor.EwmaDecorator); ok { - s.ewmaDecorators = append(s.ewmaDecorators, d) - } - if d, ok := d.(decor.ShutdownListener); ok { - s.shutdownListeners = append(s.shutdownListeners, d) - } - } - } -} - func (s bState) ewmaUpdate(n int64, dur time.Duration) { var wg sync.WaitGroup for i := 0; i < len(s.ewmaDecorators); i++ { @@ -643,11 +632,11 @@ func (s bState) decoratorShutdownNotify() { for i := 0; i < len(s.shutdownListeners); i++ { switch d := s.shutdownListeners[i]; i { case len(s.shutdownListeners) - 1: - d.Shutdown() + d.OnShutdown() default: wg.Add(1) go func() { - d.Shutdown() + d.OnShutdown() wg.Done() }() } @@ -668,9 +657,9 @@ func newStatistics(tw int, s *bState) decor.Statistics { } } -func extractBaseDecorator(d decor.Decorator) decor.Decorator { +func unwrap(d decor.Decorator) decor.Decorator { if d, ok := d.(decor.Wrapper); ok { - return extractBaseDecorator(d.Base()) + return unwrap(d.Unwrap()) } return d } diff --git a/vendor/github.com/vbauerster/mpb/v8/bar_option.go b/vendor/github.com/vbauerster/mpb/v8/bar_option.go index 953e9d2781..b07fb43764 100644 --- a/vendor/github.com/vbauerster/mpb/v8/bar_option.go +++ b/vendor/github.com/vbauerster/mpb/v8/bar_option.go @@ -59,15 +59,9 @@ func BarWidth(width int) BarOption { // BarQueueAfter puts this (being constructed) bar into the queue. // BarPriority will be inherited from the argument bar. // When argument bar completes or aborts queued bar replaces its place. -// If sync is true queued bar is suspended until argument bar completes -// or aborts. -func BarQueueAfter(bar *Bar, sync bool) BarOption { - if bar == nil { - return nil - } +func BarQueueAfter(bar *Bar) BarOption { return func(s *bState) { - s.wait.bar = bar - s.wait.sync = sync + s.waitBar = bar } } @@ -75,7 +69,7 @@ func BarQueueAfter(bar *Bar, sync bool) BarOption { // on complete event. func BarRemoveOnComplete() BarOption { return func(s *bState) { - s.dropOnComplete = true + s.rmOnComplete = true } } @@ -101,7 +95,10 @@ func BarFillerOnComplete(message string) BarOption { // BarFillerMiddleware provides a way to augment the underlying BarFiller. func BarFillerMiddleware(middle func(BarFiller) BarFiller) BarOption { return func(s *bState) { - s.middleware = middle + if middle == nil { + return + } + s.filler = middle(s.filler) } } diff --git a/vendor/github.com/vbauerster/mpb/v8/container_option.go b/vendor/github.com/vbauerster/mpb/v8/container_option.go index da0e9b1aca..6664e53134 100644 --- a/vendor/github.com/vbauerster/mpb/v8/container_option.go +++ b/vendor/github.com/vbauerster/mpb/v8/container_option.go @@ -39,10 +39,23 @@ func WithRefreshRate(d time.Duration) ContainerOption { // WithManualRefresh disables internal auto refresh time.Ticker. // Refresh will occur upon receive value from provided ch. -func WithManualRefresh(ch chan interface{}) ContainerOption { +func WithManualRefresh(ch <-chan interface{}) ContainerOption { return func(s *pState) { - s.manualRefresh = ch - s.disableAutoRefresh = true + s.manualRefresh = true + go func(refreshCh chan<- time.Time, done <-chan struct{}) { + for { + select { + case x := <-ch: + if t, ok := x.(time.Time); ok { + refreshCh <- t + } else { + refreshCh <- time.Now() + } + case <-done: + return + } + } + }(s.refreshCh, s.ctx.Done()) } } @@ -56,30 +69,23 @@ func WithRenderDelay(ch <-chan struct{}) ContainerOption { } } -// WithShutdownNotifier provided chanel will be closed, after all bars -// have been rendered. -func WithShutdownNotifier(ch chan struct{}) ContainerOption { +// WithShutdownNotifier value of type `[]*mpb.Bar` will be send into provided +// channel upon container shutdown. +func WithShutdownNotifier(ch chan<- interface{}) ContainerOption { return func(s *pState) { - select { - case <-ch: - default: - s.shutdownNotifier = ch - } + s.shutdownNotifier = ch } } -// WithOutput overrides default os.Stdout output. Setting it to nil -// will effectively disable auto refresh rate and discard any output, -// useful if you want to disable progress bars with little overhead. +// WithOutput overrides default os.Stdout output. If underlying io.Writer +// is not a terminal then auto refresh is disabled unless WithAutoRefresh +// option is set. func WithOutput(w io.Writer) ContainerOption { - var discarded bool if w == nil { w = io.Discard - discarded = true } return func(s *pState) { s.output = w - s.outputDiscarded = discarded } } @@ -93,6 +99,14 @@ func WithDebugOutput(w io.Writer) ContainerOption { } } +// WithAutoRefresh force auto refresh regardless of what output is set to. +// Applicable only if not WithManualRefresh set. +func WithAutoRefresh() ContainerOption { + return func(s *pState) { + s.autoRefresh = true + } +} + // PopCompletedMode will pop completed bars to the top. // To stop rendering bar after it has been popped, use // mpb.BarRemoveOnComplete() option on that bar. diff --git a/vendor/github.com/vbauerster/mpb/v8/cwriter/writer.go b/vendor/github.com/vbauerster/mpb/v8/cwriter/writer.go index 35136de60b..23a72d3ec1 100644 --- a/vendor/github.com/vbauerster/mpb/v8/cwriter/writer.go +++ b/vendor/github.com/vbauerster/mpb/v8/cwriter/writer.go @@ -40,6 +40,11 @@ func New(out io.Writer) *Writer { return w } +// IsTerminal tells whether underlying io.Writer is terminal. +func (w *Writer) IsTerminal() bool { + return w.terminal +} + // GetTermSize returns WxH of underlying terminal. func (w *Writer) GetTermSize() (width, height int, err error) { return w.termSize(w.fd) diff --git a/vendor/github.com/vbauerster/mpb/v8/decor/decorator.go b/vendor/github.com/vbauerster/mpb/v8/decor/decorator.go index a43a139baf..e60d37e935 100644 --- a/vendor/github.com/vbauerster/mpb/v8/decor/decorator.go +++ b/vendor/github.com/vbauerster/mpb/v8/decor/decorator.go @@ -93,7 +93,7 @@ type Configurator interface { // it is necessary to implement this interface to retain functionality // of built-in Decorator. type Wrapper interface { - Base() Decorator + Unwrap() Decorator } // EwmaDecorator interface. @@ -113,7 +113,7 @@ type AverageDecorator interface { // If decorator needs to be notified once upon bar shutdown event, so // this is the right interface to implement. type ShutdownListener interface { - Shutdown() + OnShutdown() } // Global convenience instances of WC with sync width bit set. diff --git a/vendor/github.com/vbauerster/mpb/v8/decor/merge.go b/vendor/github.com/vbauerster/mpb/v8/decor/merge.go index cc9a512c6b..a30b785f91 100644 --- a/vendor/github.com/vbauerster/mpb/v8/decor/merge.go +++ b/vendor/github.com/vbauerster/mpb/v8/decor/merge.go @@ -63,7 +63,7 @@ func (d *mergeDecorator) Sync() (chan int, bool) { return d.wc.Sync() } -func (d *mergeDecorator) Base() Decorator { +func (d *mergeDecorator) Unwrap() Decorator { return d.Decorator } diff --git a/vendor/github.com/vbauerster/mpb/v8/decor/on_abort.go b/vendor/github.com/vbauerster/mpb/v8/decor/on_abort.go index f9a1197b52..e36a63cdaf 100644 --- a/vendor/github.com/vbauerster/mpb/v8/decor/on_abort.go +++ b/vendor/github.com/vbauerster/mpb/v8/decor/on_abort.go @@ -35,6 +35,6 @@ func (d *onAbortWrapper) Decor(s Statistics) string { return d.Decorator.Decor(s) } -func (d *onAbortWrapper) Base() Decorator { +func (d *onAbortWrapper) Unwrap() Decorator { return d.Decorator } diff --git a/vendor/github.com/vbauerster/mpb/v8/decor/on_complete.go b/vendor/github.com/vbauerster/mpb/v8/decor/on_complete.go index 663ec366de..837d0675a5 100644 --- a/vendor/github.com/vbauerster/mpb/v8/decor/on_complete.go +++ b/vendor/github.com/vbauerster/mpb/v8/decor/on_complete.go @@ -34,6 +34,6 @@ func (d *onCompleteWrapper) Decor(s Statistics) string { return d.Decorator.Decor(s) } -func (d *onCompleteWrapper) Base() Decorator { +func (d *onCompleteWrapper) Unwrap() Decorator { return d.Decorator } diff --git a/vendor/github.com/vbauerster/mpb/v8/heap_manager.go b/vendor/github.com/vbauerster/mpb/v8/heap_manager.go new file mode 100644 index 0000000000..678dd7c9f3 --- /dev/null +++ b/vendor/github.com/vbauerster/mpb/v8/heap_manager.go @@ -0,0 +1,171 @@ +package mpb + +import ( + "container/heap" +) + +type heapManager chan heapRequest + +type heapCmd int + +const ( + h_sync heapCmd = iota + h_push + h_iter + h_drain + h_fix + h_state + h_end +) + +type heapRequest struct { + cmd heapCmd + data interface{} +} + +type iterData struct { + iter chan<- *Bar + drop <-chan struct{} +} + +type pushData struct { + bar *Bar + sync bool +} + +type fixData struct { + bar *Bar + priority int +} + +func (m heapManager) run() { + var bHeap priorityQueue + var pMatrix, aMatrix map[int][]chan int + + var l int + var sync bool + + for req := range m { + switch req.cmd { + case h_push: + data := req.data.(pushData) + heap.Push(&bHeap, data.bar) + if !sync { + sync = data.sync + } + case h_sync: + if sync || l != bHeap.Len() { + pMatrix = make(map[int][]chan int) + aMatrix = make(map[int][]chan int) + for _, b := range bHeap { + table := b.wSyncTable() + for i, ch := range table[0] { + pMatrix[i] = append(pMatrix[i], ch) + } + for i, ch := range table[1] { + aMatrix[i] = append(aMatrix[i], ch) + } + } + sync = false + l = bHeap.Len() + } + drop := req.data.(<-chan struct{}) + syncWidth(pMatrix, drop) + syncWidth(aMatrix, drop) + case h_iter: + data := req.data.(iterData) + for _, b := range bHeap { + select { + case data.iter <- b: + case <-data.drop: + break + } + } + close(data.iter) + case h_drain: + data := req.data.(iterData) + for bHeap.Len() != 0 { + select { + case data.iter <- heap.Pop(&bHeap).(*Bar): + case <-data.drop: + break + } + } + close(data.iter) + case h_fix: + data := req.data.(fixData) + bar, priority := data.bar, data.priority + if bar.index < 0 { + break + } + bar.priority = priority + heap.Fix(&bHeap, bar.index) + case h_state: + ch := req.data.(chan<- bool) + ch <- sync || l != bHeap.Len() + case h_end: + ch := req.data.(chan<- interface{}) + if ch != nil { + go func() { + ch <- []*Bar(bHeap) + }() + } + close(m) + } + } +} + +func (m heapManager) sync(drop <-chan struct{}) { + m <- heapRequest{cmd: h_sync, data: drop} +} + +func (m heapManager) push(b *Bar, sync bool) { + data := pushData{b, sync} + m <- heapRequest{cmd: h_push, data: data} +} + +func (m heapManager) iter(iter chan<- *Bar, drop <-chan struct{}) { + data := iterData{iter, drop} + m <- heapRequest{cmd: h_iter, data: data} +} + +func (m heapManager) drain(iter chan<- *Bar, drop <-chan struct{}) { + data := iterData{iter, drop} + m <- heapRequest{cmd: h_drain, data: data} +} + +func (m heapManager) fix(b *Bar, priority int) { + data := fixData{b, priority} + m <- heapRequest{cmd: h_fix, data: data} +} + +func (m heapManager) state(ch chan<- bool) { + m <- heapRequest{cmd: h_state, data: ch} +} + +func (m heapManager) end(ch chan<- interface{}) { + m <- heapRequest{cmd: h_end, data: ch} +} + +func syncWidth(matrix map[int][]chan int, drop <-chan struct{}) { + for _, column := range matrix { + go maxWidthDistributor(column, drop) + } +} + +func maxWidthDistributor(column []chan int, drop <-chan struct{}) { + var maxWidth int + for _, ch := range column { + select { + case w := <-ch: + if w > maxWidth { + maxWidth = w + } + case <-drop: + return + } + } + for _, ch := range column { + ch <- maxWidth + } +} diff --git a/vendor/github.com/vbauerster/mpb/v8/priority_queue.go b/vendor/github.com/vbauerster/mpb/v8/priority_queue.go index 152482e9ac..e048e7fb24 100644 --- a/vendor/github.com/vbauerster/mpb/v8/priority_queue.go +++ b/vendor/github.com/vbauerster/mpb/v8/priority_queue.go @@ -6,7 +6,7 @@ type priorityQueue []*Bar func (pq priorityQueue) Len() int { return len(pq) } func (pq priorityQueue) Less(i, j int) bool { - // less priority pops first + // greater priority pops first return pq[i].priority > pq[j].priority } diff --git a/vendor/github.com/vbauerster/mpb/v8/progress.go b/vendor/github.com/vbauerster/mpb/v8/progress.go index 7f47e0dd47..1e3b41bbd2 100644 --- a/vendor/github.com/vbauerster/mpb/v8/progress.go +++ b/vendor/github.com/vbauerster/mpb/v8/progress.go @@ -2,7 +2,6 @@ package mpb import ( "bytes" - "container/heap" "context" "fmt" "io" @@ -12,6 +11,7 @@ import ( "time" "github.com/vbauerster/mpb/v8/cwriter" + "github.com/vbauerster/mpb/v8/decor" ) const ( @@ -23,39 +23,35 @@ var DoneError = fmt.Errorf("%T instance can't be reused after it's done!", (*Pro // Progress represents a container that renders one or more progress bars. type Progress struct { - ctx context.Context uwg *sync.WaitGroup - bwg *sync.WaitGroup + pwg, bwg sync.WaitGroup operateState chan func(*pState) - interceptIo chan func(io.Writer) - done chan struct{} - shutdown chan struct{} + interceptIO chan func(io.Writer) + done <-chan struct{} cancel func() } // pState holds bars in its priorityQueue, it gets passed to (*Progress).serve monitor goroutine. type pState struct { - bHeap priorityQueue - heapUpdated bool - pMatrix map[int][]chan int - aMatrix map[int][]chan int - rows []io.Reader + ctx context.Context + hm heapManager + dropS, dropD chan struct{} + refreshCh chan time.Time + idCount int + popPriority int // following are provided/overrided by user - refreshRate time.Duration - idCount int - reqWidth int - popPriority int - popCompleted bool - outputDiscarded bool - disableAutoRefresh bool - manualRefresh chan interface{} - renderDelay <-chan struct{} - shutdownNotifier chan struct{} - queueBars map[*Bar]*Bar - output io.Writer - debugOut io.Writer - uwg *sync.WaitGroup + refreshRate time.Duration + reqWidth int + popCompleted bool + manualRefresh bool + autoRefresh bool + renderDelay <-chan struct{} + shutdownNotifier chan<- interface{} + queueBars map[*Bar]*Bar + output io.Writer + debugOut io.Writer + uwg *sync.WaitGroup } // New creates new Progress container instance. It's not possible to @@ -68,14 +64,18 @@ func New(options ...ContainerOption) *Progress { // context. It's not possible to reuse instance after (*Progress).Wait // method has been called. func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress { + ctx, cancel := context.WithCancel(ctx) s := &pState{ - rows: make([]io.Reader, 32), - refreshRate: defaultRefreshRate, - popPriority: math.MinInt32, - manualRefresh: make(chan interface{}), - queueBars: make(map[*Bar]*Bar), - output: os.Stdout, - debugOut: io.Discard, + ctx: ctx, + hm: make(heapManager), + dropS: make(chan struct{}), + dropD: make(chan struct{}), + refreshCh: make(chan time.Time), + refreshRate: defaultRefreshRate, + popPriority: math.MinInt32, + queueBars: make(map[*Bar]*Bar), + output: os.Stdout, + debugOut: io.Discard, } for _, opt := range options { @@ -84,25 +84,26 @@ func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress { } } - ctx, cancel := context.WithCancel(ctx) + go s.hm.run() + + cw := cwriter.New(s.output) + if (cw.IsTerminal() || s.autoRefresh) && !s.manualRefresh { + s.autoRefresh = true + go s.newTicker(s.renderDelay != nil) + } else { + s.autoRefresh = false + } + p := &Progress{ - ctx: ctx, uwg: s.uwg, - bwg: new(sync.WaitGroup), operateState: make(chan func(*pState)), - interceptIo: make(chan func(io.Writer)), - done: make(chan struct{}), + interceptIO: make(chan func(io.Writer)), + done: ctx.Done(), cancel: cancel, } - if s.shutdownNotifier != nil { - p.shutdown = s.shutdownNotifier - s.shutdownNotifier = nil - } else { - p.shutdown = make(chan struct{}) - } - - go p.serve(s, cwriter.New(s.output)) + p.pwg.Add(1) + go p.serve(s, cw) return p } @@ -128,42 +129,52 @@ func (p *Progress) AddFiller(total int64, filler BarFiller, options ...BarOption if filler == nil { filler = NopStyle().Build() } - p.bwg.Add(1) - result := make(chan *Bar) + type result struct { + bar *Bar + bs *bState + } + ch := make(chan result) select { case p.operateState <- func(ps *pState) { bs := ps.makeBarState(total, filler, options...) - bar := newBar(p, bs) - if bs.wait.bar != nil { - ps.queueBars[bs.wait.bar] = bar + bar := newBar(ps.ctx, p, bs) + if bs.waitBar != nil { + ps.queueBars[bs.waitBar] = bar } else { - heap.Push(&ps.bHeap, bar) - ps.heapUpdated = true + ps.hm.push(bar, true) } ps.idCount++ - result <- bar + ch <- result{bar, bs} }: - bar := <-result + res := <-ch + bar, bs := res.bar, res.bs + bar.TraverseDecorators(func(d decor.Decorator) { + if d, ok := d.(decor.AverageDecorator); ok { + bs.averageDecorators = append(bs.averageDecorators, d) + } + if d, ok := d.(decor.EwmaDecorator); ok { + bs.ewmaDecorators = append(bs.ewmaDecorators, d) + } + if d, ok := d.(decor.ShutdownListener); ok { + bs.shutdownListeners = append(bs.shutdownListeners, d) + } + }) return bar case <-p.done: - p.bwg.Done() panic(DoneError) } } func (p *Progress) traverseBars(cb func(b *Bar) bool) { - sync := make(chan struct{}) + iter, drop := make(chan *Bar), make(chan struct{}) select { - case p.operateState <- func(s *pState) { - defer close(sync) - for i := 0; i < s.bHeap.Len(); i++ { - bar := s.bHeap[i] - if !cb(bar) { + case p.operateState <- func(s *pState) { s.hm.iter(iter, drop) }: + for b := range iter { + if cb(b) { + close(drop) break } } - }: - <-sync case <-p.done: } } @@ -171,28 +182,11 @@ func (p *Progress) traverseBars(cb func(b *Bar) bool) { // UpdateBarPriority same as *Bar.SetPriority(int). func (p *Progress) UpdateBarPriority(b *Bar, priority int) { select { - case p.operateState <- func(s *pState) { - if b.index < 0 { - return - } - b.priority = priority - heap.Fix(&s.bHeap, b.index) - }: + case p.operateState <- func(s *pState) { s.hm.fix(b, priority) }: case <-p.done: } } -// BarCount returns bars count. -func (p *Progress) BarCount() int { - result := make(chan int) - select { - case p.operateState <- func(s *pState) { result <- s.bHeap.Len() }: - return <-result - case <-p.done: - return 0 - } -} - // Write is implementation of io.Writer. // Writing to `*mpb.Progress` will print lines above a running bar. // Writes aren't flushed immediately, but at next refresh cycle. @@ -203,11 +197,11 @@ func (p *Progress) Write(b []byte) (int, error) { n int err error } - ch := make(chan *result) + ch := make(chan result) select { - case p.interceptIo <- func(w io.Writer) { + case p.interceptIO <- func(w io.Writer) { n, err := w.Write(b) - ch <- &result{n, err} + ch <- result{n, err} }: res := <-ch return res.n, res.err @@ -233,190 +227,160 @@ func (p *Progress) Wait() { // are doing. Proper way to shutdown is to call (*Progress).Wait() instead. func (p *Progress) Shutdown() { p.cancel() - <-p.shutdown -} - -func (p *Progress) newTicker(s *pState) chan time.Time { - ch := make(chan time.Time) - go func() { - var autoRefresh <-chan time.Time - if !s.disableAutoRefresh && !s.outputDiscarded { - if s.renderDelay != nil { - <-s.renderDelay - } - ticker := time.NewTicker(s.refreshRate) - defer ticker.Stop() - autoRefresh = ticker.C - } - for { - select { - case t := <-autoRefresh: - ch <- t - case x := <-s.manualRefresh: - if t, ok := x.(time.Time); ok { - ch <- t - } else { - ch <- time.Now() - } - case <-p.ctx.Done(): - close(p.done) - return - } - } - }() - return ch + p.pwg.Wait() } func (p *Progress) serve(s *pState, cw *cwriter.Writer) { - defer close(p.shutdown) - - render := func() error { - return s.render(cw) - } - - refreshCh := p.newTicker(s) + defer p.pwg.Done() + render := func() error { return s.render(cw) } + var err error for { select { case op := <-p.operateState: op(s) - case fn := <-p.interceptIo: + case fn := <-p.interceptIO: fn(cw) - case <-refreshCh: - err := render() - if err != nil { - s.heapUpdated = false - render = func() error { return nil } - _, _ = fmt.Fprintln(s.debugOut, err.Error()) + case <-s.refreshCh: + e := render() + if e != nil { p.cancel() // cancel all bars + render = func() error { return nil } + err = e } case <-p.done: - for s.heapUpdated { - err := render() - if err != nil { - _, _ = fmt.Fprintln(s.debugOut, err.Error()) - return + update := make(chan bool) + for s.autoRefresh && err == nil { + s.hm.state(update) + if <-update { + err = render() + } else { + break } } + if err != nil { + _, _ = fmt.Fprintln(s.debugOut, err.Error()) + } + s.hm.end(s.shutdownNotifier) return } } } -func (s *pState) render(cw *cwriter.Writer) error { - var wg sync.WaitGroup - if s.heapUpdated { - s.updateSyncMatrix() - s.heapUpdated = false +func (s *pState) newTicker(delay bool) { + if delay { + <-s.renderDelay } - syncWidth(&wg, s.pMatrix) - syncWidth(&wg, s.aMatrix) - - width, height, err := cw.GetTermSize() - if err != nil { - width = s.reqWidth - height = s.bHeap.Len() + ticker := time.NewTicker(s.refreshRate) + defer ticker.Stop() + for { + select { + case t := <-ticker.C: + s.refreshCh <- t + case <-s.ctx.Done(): + return + } } - for i := 0; i < s.bHeap.Len(); i++ { - bar := s.bHeap[i] - go bar.render(width) - } - - err = s.flush(&wg, cw, height) - wg.Wait() - return err } -func (s *pState) flush(wg *sync.WaitGroup, cw *cwriter.Writer, height int) error { - var popCount int - pool := make([]*Bar, 0, s.bHeap.Len()) - s.rows = s.rows[:0] +func (s *pState) render(cw *cwriter.Writer) (err error) { + s.hm.sync(s.dropS) + iter := make(chan *Bar) + go s.hm.iter(iter, s.dropS) - for s.bHeap.Len() > 0 { - b := heap.Pop(&s.bHeap).(*Bar) + var width, height int + if cw.IsTerminal() { + width, height, err = cw.GetTermSize() + if err != nil { + close(s.dropS) + return err + } + } else { + if s.reqWidth > 0 { + width = s.reqWidth + } else { + width = 100 + } + height = 100 + } + + for b := range iter { + go b.render(width) + } + + return s.flush(cw, height) +} + +func (s *pState) flush(cw *cwriter.Writer, height int) error { + wg := new(sync.WaitGroup) + defer wg.Wait() // waiting for all s.hm.push to complete + + var popCount int + var rows []io.Reader + + iter := make(chan *Bar) + s.hm.drain(iter, s.dropD) + + for b := range iter { frame := <-b.frameCh if frame.err != nil { - // b.frameCh is buffered it's ok to return here - return frame.err + close(s.dropD) + b.cancel() + return frame.err // b.frameCh is buffered it's ok to return here } var usedRows int for i := len(frame.rows) - 1; i >= 0; i-- { - if row := frame.rows[i]; len(s.rows) < height { - s.rows = append(s.rows, row) + if row := frame.rows[i]; len(rows) < height { + rows = append(rows, row) usedRows++ } else { - wg.Add(1) - go func() { - _, _ = io.Copy(io.Discard, row) - wg.Done() - }() + _, _ = io.Copy(io.Discard, row) } } - if frame.shutdown { - b.Wait() // waiting for b.done, so it's safe to read b.bs + if frame.shutdown != 0 && !frame.done { if qb, ok := s.queueBars[b]; ok { + b.cancel() delete(s.queueBars, b) qb.priority = b.priority - pool = append(pool, qb) - s.heapUpdated = true + wg.Add(1) + go func(b *Bar) { + s.hm.push(b, true) + wg.Done() + }(qb) continue } - if s.popCompleted && !b.bs.noPop { - switch b.bs.shutdown++; b.bs.shutdown { + if s.popCompleted && !frame.noPop { + switch frame.shutdown { case 1: b.priority = s.popPriority s.popPriority++ default: - if b.bs.dropOnComplete { - popCount += usedRows - s.heapUpdated = true - continue - } + b.cancel() + popCount += usedRows + continue } - } else if b.bs.dropOnComplete { - s.heapUpdated = true + } else if frame.rmOnComplete { + b.cancel() continue + } else { + b.cancel() } } - pool = append(pool, b) - } - - if len(pool) != 0 { wg.Add(1) - go func() { - for _, b := range pool { - heap.Push(&s.bHeap, b) - } + go func(b *Bar) { + s.hm.push(b, false) wg.Done() - }() + }(b) } - for i := len(s.rows) - 1; i >= 0; i-- { - _, err := cw.ReadFrom(s.rows[i]) + for i := len(rows) - 1; i >= 0; i-- { + _, err := cw.ReadFrom(rows[i]) if err != nil { return err } } - err := cw.Flush(len(s.rows) - popCount) - return err -} - -func (s *pState) updateSyncMatrix() { - s.pMatrix = make(map[int][]chan int) - s.aMatrix = make(map[int][]chan int) - for i := 0; i < s.bHeap.Len(); i++ { - bar := s.bHeap[i] - table := bar.wSyncTable() - - for i, ch := range table[0] { - s.pMatrix[i] = append(s.pMatrix[i], ch) - } - - for i, ch := range table[1] { - s.aMatrix[i] = append(s.aMatrix[i], ch) - } - } + return cw.Flush(len(rows) - popCount) } func (s *pState) makeBarState(total int64, filler BarFiller, options ...BarOption) *bState { @@ -426,6 +390,8 @@ func (s *pState) makeBarState(total int64, filler BarFiller, options ...BarOptio reqWidth: s.reqWidth, total: total, filler: filler, + refreshCh: s.refreshCh, + autoRefresh: s.autoRefresh, manualRefresh: s.manualRefresh, } @@ -439,36 +405,9 @@ func (s *pState) makeBarState(total int64, filler BarFiller, options ...BarOptio } } - if bs.middleware != nil { - bs.filler = bs.middleware(filler) - bs.middleware = nil - } - for i := 0; i < len(bs.buffers); i++ { bs.buffers[i] = bytes.NewBuffer(make([]byte, 0, 512)) } - bs.subscribeDecorators() - return bs } - -func syncWidth(wg *sync.WaitGroup, matrix map[int][]chan int) { - for _, column := range matrix { - wg.Add(1) - go maxWidthDistributor(wg, column) - } -} - -func maxWidthDistributor(wg *sync.WaitGroup, column []chan int) { - var maxWidth int - for _, ch := range column { - if w := <-ch; w > maxWidth { - maxWidth = w - } - } - for _, ch := range column { - ch <- maxWidth - } - wg.Done() -} diff --git a/vendor/modules.txt b/vendor/modules.txt index e341b524f9..098191fa92 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -867,7 +867,7 @@ github.com/ulikunitz/xz/lzma github.com/vbatts/tar-split/archive/tar github.com/vbatts/tar-split/tar/asm github.com/vbatts/tar-split/tar/storage -# github.com/vbauerster/mpb/v8 v8.1.6 +# github.com/vbauerster/mpb/v8 v8.2.0 ## explicit; go 1.17 github.com/vbauerster/mpb/v8 github.com/vbauerster/mpb/v8/cwriter