build(deps): bump github.com/vbauerster/mpb/v8 from 8.1.6 to 8.2.0

Bumps [github.com/vbauerster/mpb/v8](https://github.com/vbauerster/mpb) from 8.1.6 to 8.2.0.
- [Release notes](https://github.com/vbauerster/mpb/releases)
- [Commits](https://github.com/vbauerster/mpb/compare/v8.1.6...v8.2.0)

---
updated-dependencies:
- dependency-name: github.com/vbauerster/mpb/v8
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2023-02-20 12:20:18 +00:00
committed by GitHub
parent 37352a0c8c
commit 5056bd45f8
14 changed files with 459 additions and 344 deletions

2
go.mod
View File

@ -57,7 +57,7 @@ require (
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635 github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635
github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/ulikunitz/xz v0.5.11 github.com/ulikunitz/xz v0.5.11
github.com/vbauerster/mpb/v8 v8.1.6 github.com/vbauerster/mpb/v8 v8.2.0
github.com/vishvananda/netlink v1.2.1-beta.2 github.com/vishvananda/netlink v1.2.1-beta.2
go.etcd.io/bbolt v1.3.7 go.etcd.io/bbolt v1.3.7
golang.org/x/net v0.7.0 golang.org/x/net v0.7.0

4
go.sum
View File

@ -1056,8 +1056,8 @@ github.com/urfave/cli v1.22.4/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX
github.com/urfave/cli v1.22.9/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli v1.22.9/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/vbatts/tar-split v0.11.2 h1:Via6XqJr0hceW4wff3QRzD5gAk/tatMw/4ZA7cTlIME= github.com/vbatts/tar-split v0.11.2 h1:Via6XqJr0hceW4wff3QRzD5gAk/tatMw/4ZA7cTlIME=
github.com/vbatts/tar-split v0.11.2/go.mod h1:vV3ZuO2yWSVsz+pfFzDG/upWH1JhjOiEaWq6kXyQ3VI= github.com/vbatts/tar-split v0.11.2/go.mod h1:vV3ZuO2yWSVsz+pfFzDG/upWH1JhjOiEaWq6kXyQ3VI=
github.com/vbauerster/mpb/v8 v8.1.6 h1:EswHDkAsy4OQ7QBAmU1MUPz4vHzl6KlINjlh7vJoxvY= github.com/vbauerster/mpb/v8 v8.2.0 h1:zaH0DaIcUoOeItZ/Yy567ZhaPUC3GMhUyHollQDgZvs=
github.com/vbauerster/mpb/v8 v8.1.6/go.mod h1:O9/Wl8X9dUbR63tZ41MLIAxrtNfwlpwUhGkeYugUPW8= github.com/vbauerster/mpb/v8 v8.2.0/go.mod h1:HEVcHNizbUIg0l4Qwhw0BDvg50zo3CMiWkbz1WUEQ94=
github.com/vishvananda/netlink v0.0.0-20181108222139-023a6dafdcdf/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk= github.com/vishvananda/netlink v0.0.0-20181108222139-023a6dafdcdf/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk=
github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE= github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE=
github.com/vishvananda/netlink v1.1.1-0.20201029203352-d40f9887b852/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhgX83tXhKS2B/PRMpOho= github.com/vishvananda/netlink v1.1.1-0.20201029203352-d40f9887b852/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhgX83tXhKS2B/PRMpOho=

View File

@ -41,8 +41,10 @@ type bState struct {
completed bool completed bool
aborted bool aborted bool
triggerComplete bool triggerComplete bool
dropOnComplete bool rmOnComplete bool
noPop bool noPop bool
autoRefresh bool
manualRefresh bool
aDecorators []decor.Decorator aDecorators []decor.Decorator
pDecorators []decor.Decorator pDecorators []decor.Decorator
averageDecorators []decor.AverageDecorator averageDecorators []decor.AverageDecorator
@ -50,24 +52,22 @@ type bState struct {
shutdownListeners []decor.ShutdownListener shutdownListeners []decor.ShutdownListener
buffers [3]*bytes.Buffer buffers [3]*bytes.Buffer
filler BarFiller filler BarFiller
middleware func(BarFiller) BarFiller
extender extenderFunc extender extenderFunc
manualRefresh chan interface{} refreshCh chan time.Time
waitBar *Bar // key for (*pState).queueBars
wait struct {
bar *Bar // key for (*pState).queueBars
sync bool
}
} }
type renderFrame struct { type renderFrame struct {
rows []io.Reader rows []io.Reader
shutdown bool shutdown int
err error rmOnComplete bool
noPop bool
done bool
err error
} }
func newBar(container *Progress, bs *bState) *Bar { func newBar(ctx context.Context, container *Progress, bs *bState) *Bar {
ctx, cancel := context.WithCancel(container.ctx) ctx, cancel := context.WithCancel(ctx)
bar := &Bar{ bar := &Bar{
priority: bs.priority, priority: bs.priority,
@ -78,6 +78,7 @@ func newBar(container *Progress, bs *bState) *Bar {
cancel: cancel, cancel: cancel,
} }
container.bwg.Add(1)
go bar.serve(ctx, bs) go bar.serve(ctx, bs)
return bar return bar
} }
@ -153,20 +154,22 @@ func (b *Bar) SetRefill(amount int64) {
// TraverseDecorators traverses all available decorators and calls cb func on each. // TraverseDecorators traverses all available decorators and calls cb func on each.
func (b *Bar) TraverseDecorators(cb func(decor.Decorator)) { func (b *Bar) TraverseDecorators(cb func(decor.Decorator)) {
sync := make(chan struct{}) iter := make(chan decor.Decorator)
select { select {
case b.operateState <- func(s *bState) { case b.operateState <- func(s *bState) {
defer close(sync)
for _, decorators := range [][]decor.Decorator{ for _, decorators := range [][]decor.Decorator{
s.pDecorators, s.pDecorators,
s.aDecorators, s.aDecorators,
} { } {
for _, d := range decorators { for _, d := range decorators {
cb(extractBaseDecorator(d)) iter <- d
} }
} }
close(iter)
}: }:
<-sync for d := range iter {
cb(unwrap(d))
}
case <-b.done: case <-b.done:
} }
} }
@ -185,7 +188,7 @@ func (b *Bar) EnableTriggerComplete() {
if s.current >= s.total { if s.current >= s.total {
s.current = s.total s.current = s.total
s.completed = true s.completed = true
b.forceRefresh(s.manualRefresh) b.triggerCompletion(s)
} else { } else {
s.triggerComplete = true s.triggerComplete = true
} }
@ -197,9 +200,9 @@ func (b *Bar) EnableTriggerComplete() {
// SetTotal sets total to an arbitrary value. It's effective only for // SetTotal sets total to an arbitrary value. It's effective only for
// bar which was constructed with `total <= 0`. Setting total to negative // bar which was constructed with `total <= 0`. Setting total to negative
// value is equivalent to (*Bar).SetTotal((*Bar).Current(), bool) but faster. // value is equivalent to (*Bar).SetTotal((*Bar).Current(), bool) but faster.
// If triggerCompleteNow is true, total value is set to current and // If triggerCompletion is true, total value is set to current and
// complete event is triggered right away. // complete event is triggered right away.
func (b *Bar) SetTotal(total int64, triggerCompleteNow bool) { func (b *Bar) SetTotal(total int64, triggerCompletion bool) {
select { select {
case b.operateState <- func(s *bState) { case b.operateState <- func(s *bState) {
if s.triggerComplete { if s.triggerComplete {
@ -210,10 +213,10 @@ func (b *Bar) SetTotal(total int64, triggerCompleteNow bool) {
} else { } else {
s.total = total s.total = total
} }
if triggerCompleteNow { if triggerCompletion {
s.current = s.total s.current = s.total
s.completed = true s.completed = true
b.forceRefresh(s.manualRefresh) b.triggerCompletion(s)
} }
}: }:
case <-b.done: case <-b.done:
@ -231,7 +234,7 @@ func (b *Bar) SetCurrent(current int64) {
if s.triggerComplete && s.current >= s.total { if s.triggerComplete && s.current >= s.total {
s.current = s.total s.current = s.total
s.completed = true s.completed = true
b.forceRefresh(s.manualRefresh) b.triggerCompletion(s)
} }
}: }:
case <-b.done: case <-b.done:
@ -253,7 +256,7 @@ func (b *Bar) EwmaSetCurrent(current int64, iterDur time.Duration) {
if s.triggerComplete && s.current >= s.total { if s.triggerComplete && s.current >= s.total {
s.current = s.total s.current = s.total
s.completed = true s.completed = true
b.forceRefresh(s.manualRefresh) b.triggerCompletion(s)
} }
}: }:
case <-b.done: case <-b.done:
@ -281,7 +284,7 @@ func (b *Bar) IncrInt64(n int64) {
if s.triggerComplete && s.current >= s.total { if s.triggerComplete && s.current >= s.total {
s.current = s.total s.current = s.total
s.completed = true s.completed = true
b.forceRefresh(s.manualRefresh) b.triggerCompletion(s)
} }
}: }:
case <-b.done: case <-b.done:
@ -311,7 +314,7 @@ func (b *Bar) EwmaIncrInt64(n int64, iterDur time.Duration) {
if s.triggerComplete && s.current >= s.total { if s.triggerComplete && s.current >= s.total {
s.current = s.total s.current = s.total
s.completed = true s.completed = true
b.forceRefresh(s.manualRefresh) b.triggerCompletion(s)
} }
}: }:
case <-b.done: case <-b.done:
@ -348,8 +351,8 @@ func (b *Bar) Abort(drop bool) {
return return
} }
s.aborted = true s.aborted = true
s.dropOnComplete = drop s.rmOnComplete = drop
b.forceRefresh(s.manualRefresh) b.triggerCompletion(s)
}: }:
case <-b.done: case <-b.done:
} }
@ -382,9 +385,7 @@ func (b *Bar) Completed() bool {
func (b *Bar) IsRunning() bool { func (b *Bar) IsRunning() bool {
result := make(chan bool) result := make(chan bool)
select { select {
case b.operateState <- func(s *bState) { case b.operateState <- func(s *bState) { result <- !s.completed && !s.aborted }:
result <- !s.completed && !s.aborted
}:
return <-result return <-result
case <-b.done: case <-b.done:
return false return false
@ -398,9 +399,6 @@ func (b *Bar) Wait() {
func (b *Bar) serve(ctx context.Context, bs *bState) { func (b *Bar) serve(ctx context.Context, bs *bState) {
defer b.container.bwg.Done() defer b.container.bwg.Done()
if bs.wait.bar != nil && bs.wait.sync {
bs.wait.bar.Wait()
}
for { for {
select { select {
case op := <-b.operateState: case op := <-b.operateState:
@ -433,10 +431,16 @@ func (b *Bar) render(tw int) {
return return
} }
} }
frame := &renderFrame{rows: rows} frame := &renderFrame{
rows: rows,
shutdown: s.shutdown,
rmOnComplete: s.rmOnComplete,
noPop: s.noPop,
done: done,
}
if s.completed || s.aborted { if s.completed || s.aborted {
frame.shutdown = !done || s.shutdown == 1 // post increment makes sure OnComplete decorators are rendered
b.cancel() s.shutdown++
} }
b.frameCh <- frame b.frameCh <- frame
} }
@ -448,17 +452,22 @@ func (b *Bar) render(tw int) {
} }
} }
func (b *Bar) forceRefresh(refreshCh chan interface{}) { func (b *Bar) triggerCompletion(s *bState) {
b.container.bwg.Add(1) if s.autoRefresh {
go b.forceRefreshImpl(refreshCh) // Technically this call isn't required, but if refresh rate is set to
// one hour for example and bar completes within a few minutes p.Wait()
// will wait for one hour. This call helps to avoid unnecessary waiting.
go b.tryEarlyRefresh(s.refreshCh)
} else if !s.manualRefresh {
b.cancel()
}
} }
func (b *Bar) forceRefreshImpl(refreshCh chan interface{}) { func (b *Bar) tryEarlyRefresh(refreshCh chan<- time.Time) {
defer b.container.bwg.Done()
var anyOtherRunning bool var anyOtherRunning bool
b.container.traverseBars(func(bar *Bar) bool { b.container.traverseBars(func(bar *Bar) bool {
anyOtherRunning = b != bar && bar.IsRunning() anyOtherRunning = b != bar && bar.IsRunning()
return !anyOtherRunning return anyOtherRunning
}) })
if !anyOtherRunning { if !anyOtherRunning {
for { for {
@ -584,26 +593,6 @@ func (s *bState) wSyncTable() (table syncTable) {
return table return table
} }
func (s *bState) subscribeDecorators() {
for _, decorators := range [][]decor.Decorator{
s.pDecorators,
s.aDecorators,
} {
for _, d := range decorators {
d = extractBaseDecorator(d)
if d, ok := d.(decor.AverageDecorator); ok {
s.averageDecorators = append(s.averageDecorators, d)
}
if d, ok := d.(decor.EwmaDecorator); ok {
s.ewmaDecorators = append(s.ewmaDecorators, d)
}
if d, ok := d.(decor.ShutdownListener); ok {
s.shutdownListeners = append(s.shutdownListeners, d)
}
}
}
}
func (s bState) ewmaUpdate(n int64, dur time.Duration) { func (s bState) ewmaUpdate(n int64, dur time.Duration) {
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < len(s.ewmaDecorators); i++ { for i := 0; i < len(s.ewmaDecorators); i++ {
@ -643,11 +632,11 @@ func (s bState) decoratorShutdownNotify() {
for i := 0; i < len(s.shutdownListeners); i++ { for i := 0; i < len(s.shutdownListeners); i++ {
switch d := s.shutdownListeners[i]; i { switch d := s.shutdownListeners[i]; i {
case len(s.shutdownListeners) - 1: case len(s.shutdownListeners) - 1:
d.Shutdown() d.OnShutdown()
default: default:
wg.Add(1) wg.Add(1)
go func() { go func() {
d.Shutdown() d.OnShutdown()
wg.Done() wg.Done()
}() }()
} }
@ -668,9 +657,9 @@ func newStatistics(tw int, s *bState) decor.Statistics {
} }
} }
func extractBaseDecorator(d decor.Decorator) decor.Decorator { func unwrap(d decor.Decorator) decor.Decorator {
if d, ok := d.(decor.Wrapper); ok { if d, ok := d.(decor.Wrapper); ok {
return extractBaseDecorator(d.Base()) return unwrap(d.Unwrap())
} }
return d return d
} }

View File

@ -59,15 +59,9 @@ func BarWidth(width int) BarOption {
// BarQueueAfter puts this (being constructed) bar into the queue. // BarQueueAfter puts this (being constructed) bar into the queue.
// BarPriority will be inherited from the argument bar. // BarPriority will be inherited from the argument bar.
// When argument bar completes or aborts queued bar replaces its place. // When argument bar completes or aborts queued bar replaces its place.
// If sync is true queued bar is suspended until argument bar completes func BarQueueAfter(bar *Bar) BarOption {
// or aborts.
func BarQueueAfter(bar *Bar, sync bool) BarOption {
if bar == nil {
return nil
}
return func(s *bState) { return func(s *bState) {
s.wait.bar = bar s.waitBar = bar
s.wait.sync = sync
} }
} }
@ -75,7 +69,7 @@ func BarQueueAfter(bar *Bar, sync bool) BarOption {
// on complete event. // on complete event.
func BarRemoveOnComplete() BarOption { func BarRemoveOnComplete() BarOption {
return func(s *bState) { return func(s *bState) {
s.dropOnComplete = true s.rmOnComplete = true
} }
} }
@ -101,7 +95,10 @@ func BarFillerOnComplete(message string) BarOption {
// BarFillerMiddleware provides a way to augment the underlying BarFiller. // BarFillerMiddleware provides a way to augment the underlying BarFiller.
func BarFillerMiddleware(middle func(BarFiller) BarFiller) BarOption { func BarFillerMiddleware(middle func(BarFiller) BarFiller) BarOption {
return func(s *bState) { return func(s *bState) {
s.middleware = middle if middle == nil {
return
}
s.filler = middle(s.filler)
} }
} }

View File

@ -39,10 +39,23 @@ func WithRefreshRate(d time.Duration) ContainerOption {
// WithManualRefresh disables internal auto refresh time.Ticker. // WithManualRefresh disables internal auto refresh time.Ticker.
// Refresh will occur upon receive value from provided ch. // Refresh will occur upon receive value from provided ch.
func WithManualRefresh(ch chan interface{}) ContainerOption { func WithManualRefresh(ch <-chan interface{}) ContainerOption {
return func(s *pState) { return func(s *pState) {
s.manualRefresh = ch s.manualRefresh = true
s.disableAutoRefresh = true go func(refreshCh chan<- time.Time, done <-chan struct{}) {
for {
select {
case x := <-ch:
if t, ok := x.(time.Time); ok {
refreshCh <- t
} else {
refreshCh <- time.Now()
}
case <-done:
return
}
}
}(s.refreshCh, s.ctx.Done())
} }
} }
@ -56,30 +69,23 @@ func WithRenderDelay(ch <-chan struct{}) ContainerOption {
} }
} }
// WithShutdownNotifier provided chanel will be closed, after all bars // WithShutdownNotifier value of type `[]*mpb.Bar` will be send into provided
// have been rendered. // channel upon container shutdown.
func WithShutdownNotifier(ch chan struct{}) ContainerOption { func WithShutdownNotifier(ch chan<- interface{}) ContainerOption {
return func(s *pState) { return func(s *pState) {
select { s.shutdownNotifier = ch
case <-ch:
default:
s.shutdownNotifier = ch
}
} }
} }
// WithOutput overrides default os.Stdout output. Setting it to nil // WithOutput overrides default os.Stdout output. If underlying io.Writer
// will effectively disable auto refresh rate and discard any output, // is not a terminal then auto refresh is disabled unless WithAutoRefresh
// useful if you want to disable progress bars with little overhead. // option is set.
func WithOutput(w io.Writer) ContainerOption { func WithOutput(w io.Writer) ContainerOption {
var discarded bool
if w == nil { if w == nil {
w = io.Discard w = io.Discard
discarded = true
} }
return func(s *pState) { return func(s *pState) {
s.output = w s.output = w
s.outputDiscarded = discarded
} }
} }
@ -93,6 +99,14 @@ func WithDebugOutput(w io.Writer) ContainerOption {
} }
} }
// WithAutoRefresh force auto refresh regardless of what output is set to.
// Applicable only if not WithManualRefresh set.
func WithAutoRefresh() ContainerOption {
return func(s *pState) {
s.autoRefresh = true
}
}
// PopCompletedMode will pop completed bars to the top. // PopCompletedMode will pop completed bars to the top.
// To stop rendering bar after it has been popped, use // To stop rendering bar after it has been popped, use
// mpb.BarRemoveOnComplete() option on that bar. // mpb.BarRemoveOnComplete() option on that bar.

View File

@ -40,6 +40,11 @@ func New(out io.Writer) *Writer {
return w return w
} }
// IsTerminal tells whether underlying io.Writer is terminal.
func (w *Writer) IsTerminal() bool {
return w.terminal
}
// GetTermSize returns WxH of underlying terminal. // GetTermSize returns WxH of underlying terminal.
func (w *Writer) GetTermSize() (width, height int, err error) { func (w *Writer) GetTermSize() (width, height int, err error) {
return w.termSize(w.fd) return w.termSize(w.fd)

View File

@ -93,7 +93,7 @@ type Configurator interface {
// it is necessary to implement this interface to retain functionality // it is necessary to implement this interface to retain functionality
// of built-in Decorator. // of built-in Decorator.
type Wrapper interface { type Wrapper interface {
Base() Decorator Unwrap() Decorator
} }
// EwmaDecorator interface. // EwmaDecorator interface.
@ -113,7 +113,7 @@ type AverageDecorator interface {
// If decorator needs to be notified once upon bar shutdown event, so // If decorator needs to be notified once upon bar shutdown event, so
// this is the right interface to implement. // this is the right interface to implement.
type ShutdownListener interface { type ShutdownListener interface {
Shutdown() OnShutdown()
} }
// Global convenience instances of WC with sync width bit set. // Global convenience instances of WC with sync width bit set.

View File

@ -63,7 +63,7 @@ func (d *mergeDecorator) Sync() (chan int, bool) {
return d.wc.Sync() return d.wc.Sync()
} }
func (d *mergeDecorator) Base() Decorator { func (d *mergeDecorator) Unwrap() Decorator {
return d.Decorator return d.Decorator
} }

View File

@ -35,6 +35,6 @@ func (d *onAbortWrapper) Decor(s Statistics) string {
return d.Decorator.Decor(s) return d.Decorator.Decor(s)
} }
func (d *onAbortWrapper) Base() Decorator { func (d *onAbortWrapper) Unwrap() Decorator {
return d.Decorator return d.Decorator
} }

View File

@ -34,6 +34,6 @@ func (d *onCompleteWrapper) Decor(s Statistics) string {
return d.Decorator.Decor(s) return d.Decorator.Decor(s)
} }
func (d *onCompleteWrapper) Base() Decorator { func (d *onCompleteWrapper) Unwrap() Decorator {
return d.Decorator return d.Decorator
} }

171
vendor/github.com/vbauerster/mpb/v8/heap_manager.go generated vendored Normal file
View File

@ -0,0 +1,171 @@
package mpb
import (
"container/heap"
)
type heapManager chan heapRequest
type heapCmd int
const (
h_sync heapCmd = iota
h_push
h_iter
h_drain
h_fix
h_state
h_end
)
type heapRequest struct {
cmd heapCmd
data interface{}
}
type iterData struct {
iter chan<- *Bar
drop <-chan struct{}
}
type pushData struct {
bar *Bar
sync bool
}
type fixData struct {
bar *Bar
priority int
}
func (m heapManager) run() {
var bHeap priorityQueue
var pMatrix, aMatrix map[int][]chan int
var l int
var sync bool
for req := range m {
switch req.cmd {
case h_push:
data := req.data.(pushData)
heap.Push(&bHeap, data.bar)
if !sync {
sync = data.sync
}
case h_sync:
if sync || l != bHeap.Len() {
pMatrix = make(map[int][]chan int)
aMatrix = make(map[int][]chan int)
for _, b := range bHeap {
table := b.wSyncTable()
for i, ch := range table[0] {
pMatrix[i] = append(pMatrix[i], ch)
}
for i, ch := range table[1] {
aMatrix[i] = append(aMatrix[i], ch)
}
}
sync = false
l = bHeap.Len()
}
drop := req.data.(<-chan struct{})
syncWidth(pMatrix, drop)
syncWidth(aMatrix, drop)
case h_iter:
data := req.data.(iterData)
for _, b := range bHeap {
select {
case data.iter <- b:
case <-data.drop:
break
}
}
close(data.iter)
case h_drain:
data := req.data.(iterData)
for bHeap.Len() != 0 {
select {
case data.iter <- heap.Pop(&bHeap).(*Bar):
case <-data.drop:
break
}
}
close(data.iter)
case h_fix:
data := req.data.(fixData)
bar, priority := data.bar, data.priority
if bar.index < 0 {
break
}
bar.priority = priority
heap.Fix(&bHeap, bar.index)
case h_state:
ch := req.data.(chan<- bool)
ch <- sync || l != bHeap.Len()
case h_end:
ch := req.data.(chan<- interface{})
if ch != nil {
go func() {
ch <- []*Bar(bHeap)
}()
}
close(m)
}
}
}
func (m heapManager) sync(drop <-chan struct{}) {
m <- heapRequest{cmd: h_sync, data: drop}
}
func (m heapManager) push(b *Bar, sync bool) {
data := pushData{b, sync}
m <- heapRequest{cmd: h_push, data: data}
}
func (m heapManager) iter(iter chan<- *Bar, drop <-chan struct{}) {
data := iterData{iter, drop}
m <- heapRequest{cmd: h_iter, data: data}
}
func (m heapManager) drain(iter chan<- *Bar, drop <-chan struct{}) {
data := iterData{iter, drop}
m <- heapRequest{cmd: h_drain, data: data}
}
func (m heapManager) fix(b *Bar, priority int) {
data := fixData{b, priority}
m <- heapRequest{cmd: h_fix, data: data}
}
func (m heapManager) state(ch chan<- bool) {
m <- heapRequest{cmd: h_state, data: ch}
}
func (m heapManager) end(ch chan<- interface{}) {
m <- heapRequest{cmd: h_end, data: ch}
}
func syncWidth(matrix map[int][]chan int, drop <-chan struct{}) {
for _, column := range matrix {
go maxWidthDistributor(column, drop)
}
}
func maxWidthDistributor(column []chan int, drop <-chan struct{}) {
var maxWidth int
for _, ch := range column {
select {
case w := <-ch:
if w > maxWidth {
maxWidth = w
}
case <-drop:
return
}
}
for _, ch := range column {
ch <- maxWidth
}
}

View File

@ -6,7 +6,7 @@ type priorityQueue []*Bar
func (pq priorityQueue) Len() int { return len(pq) } func (pq priorityQueue) Len() int { return len(pq) }
func (pq priorityQueue) Less(i, j int) bool { func (pq priorityQueue) Less(i, j int) bool {
// less priority pops first // greater priority pops first
return pq[i].priority > pq[j].priority return pq[i].priority > pq[j].priority
} }

View File

@ -2,7 +2,6 @@ package mpb
import ( import (
"bytes" "bytes"
"container/heap"
"context" "context"
"fmt" "fmt"
"io" "io"
@ -12,6 +11,7 @@ import (
"time" "time"
"github.com/vbauerster/mpb/v8/cwriter" "github.com/vbauerster/mpb/v8/cwriter"
"github.com/vbauerster/mpb/v8/decor"
) )
const ( const (
@ -23,39 +23,35 @@ var DoneError = fmt.Errorf("%T instance can't be reused after it's done!", (*Pro
// Progress represents a container that renders one or more progress bars. // Progress represents a container that renders one or more progress bars.
type Progress struct { type Progress struct {
ctx context.Context
uwg *sync.WaitGroup uwg *sync.WaitGroup
bwg *sync.WaitGroup pwg, bwg sync.WaitGroup
operateState chan func(*pState) operateState chan func(*pState)
interceptIo chan func(io.Writer) interceptIO chan func(io.Writer)
done chan struct{} done <-chan struct{}
shutdown chan struct{}
cancel func() cancel func()
} }
// pState holds bars in its priorityQueue, it gets passed to (*Progress).serve monitor goroutine. // pState holds bars in its priorityQueue, it gets passed to (*Progress).serve monitor goroutine.
type pState struct { type pState struct {
bHeap priorityQueue ctx context.Context
heapUpdated bool hm heapManager
pMatrix map[int][]chan int dropS, dropD chan struct{}
aMatrix map[int][]chan int refreshCh chan time.Time
rows []io.Reader idCount int
popPriority int
// following are provided/overrided by user // following are provided/overrided by user
refreshRate time.Duration refreshRate time.Duration
idCount int reqWidth int
reqWidth int popCompleted bool
popPriority int manualRefresh bool
popCompleted bool autoRefresh bool
outputDiscarded bool renderDelay <-chan struct{}
disableAutoRefresh bool shutdownNotifier chan<- interface{}
manualRefresh chan interface{} queueBars map[*Bar]*Bar
renderDelay <-chan struct{} output io.Writer
shutdownNotifier chan struct{} debugOut io.Writer
queueBars map[*Bar]*Bar uwg *sync.WaitGroup
output io.Writer
debugOut io.Writer
uwg *sync.WaitGroup
} }
// New creates new Progress container instance. It's not possible to // New creates new Progress container instance. It's not possible to
@ -68,14 +64,18 @@ func New(options ...ContainerOption) *Progress {
// context. It's not possible to reuse instance after (*Progress).Wait // context. It's not possible to reuse instance after (*Progress).Wait
// method has been called. // method has been called.
func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress { func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress {
ctx, cancel := context.WithCancel(ctx)
s := &pState{ s := &pState{
rows: make([]io.Reader, 32), ctx: ctx,
refreshRate: defaultRefreshRate, hm: make(heapManager),
popPriority: math.MinInt32, dropS: make(chan struct{}),
manualRefresh: make(chan interface{}), dropD: make(chan struct{}),
queueBars: make(map[*Bar]*Bar), refreshCh: make(chan time.Time),
output: os.Stdout, refreshRate: defaultRefreshRate,
debugOut: io.Discard, popPriority: math.MinInt32,
queueBars: make(map[*Bar]*Bar),
output: os.Stdout,
debugOut: io.Discard,
} }
for _, opt := range options { for _, opt := range options {
@ -84,25 +84,26 @@ func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress {
} }
} }
ctx, cancel := context.WithCancel(ctx) go s.hm.run()
cw := cwriter.New(s.output)
if (cw.IsTerminal() || s.autoRefresh) && !s.manualRefresh {
s.autoRefresh = true
go s.newTicker(s.renderDelay != nil)
} else {
s.autoRefresh = false
}
p := &Progress{ p := &Progress{
ctx: ctx,
uwg: s.uwg, uwg: s.uwg,
bwg: new(sync.WaitGroup),
operateState: make(chan func(*pState)), operateState: make(chan func(*pState)),
interceptIo: make(chan func(io.Writer)), interceptIO: make(chan func(io.Writer)),
done: make(chan struct{}), done: ctx.Done(),
cancel: cancel, cancel: cancel,
} }
if s.shutdownNotifier != nil { p.pwg.Add(1)
p.shutdown = s.shutdownNotifier go p.serve(s, cw)
s.shutdownNotifier = nil
} else {
p.shutdown = make(chan struct{})
}
go p.serve(s, cwriter.New(s.output))
return p return p
} }
@ -128,42 +129,52 @@ func (p *Progress) AddFiller(total int64, filler BarFiller, options ...BarOption
if filler == nil { if filler == nil {
filler = NopStyle().Build() filler = NopStyle().Build()
} }
p.bwg.Add(1) type result struct {
result := make(chan *Bar) bar *Bar
bs *bState
}
ch := make(chan result)
select { select {
case p.operateState <- func(ps *pState) { case p.operateState <- func(ps *pState) {
bs := ps.makeBarState(total, filler, options...) bs := ps.makeBarState(total, filler, options...)
bar := newBar(p, bs) bar := newBar(ps.ctx, p, bs)
if bs.wait.bar != nil { if bs.waitBar != nil {
ps.queueBars[bs.wait.bar] = bar ps.queueBars[bs.waitBar] = bar
} else { } else {
heap.Push(&ps.bHeap, bar) ps.hm.push(bar, true)
ps.heapUpdated = true
} }
ps.idCount++ ps.idCount++
result <- bar ch <- result{bar, bs}
}: }:
bar := <-result res := <-ch
bar, bs := res.bar, res.bs
bar.TraverseDecorators(func(d decor.Decorator) {
if d, ok := d.(decor.AverageDecorator); ok {
bs.averageDecorators = append(bs.averageDecorators, d)
}
if d, ok := d.(decor.EwmaDecorator); ok {
bs.ewmaDecorators = append(bs.ewmaDecorators, d)
}
if d, ok := d.(decor.ShutdownListener); ok {
bs.shutdownListeners = append(bs.shutdownListeners, d)
}
})
return bar return bar
case <-p.done: case <-p.done:
p.bwg.Done()
panic(DoneError) panic(DoneError)
} }
} }
func (p *Progress) traverseBars(cb func(b *Bar) bool) { func (p *Progress) traverseBars(cb func(b *Bar) bool) {
sync := make(chan struct{}) iter, drop := make(chan *Bar), make(chan struct{})
select { select {
case p.operateState <- func(s *pState) { case p.operateState <- func(s *pState) { s.hm.iter(iter, drop) }:
defer close(sync) for b := range iter {
for i := 0; i < s.bHeap.Len(); i++ { if cb(b) {
bar := s.bHeap[i] close(drop)
if !cb(bar) {
break break
} }
} }
}:
<-sync
case <-p.done: case <-p.done:
} }
} }
@ -171,28 +182,11 @@ func (p *Progress) traverseBars(cb func(b *Bar) bool) {
// UpdateBarPriority same as *Bar.SetPriority(int). // UpdateBarPriority same as *Bar.SetPriority(int).
func (p *Progress) UpdateBarPriority(b *Bar, priority int) { func (p *Progress) UpdateBarPriority(b *Bar, priority int) {
select { select {
case p.operateState <- func(s *pState) { case p.operateState <- func(s *pState) { s.hm.fix(b, priority) }:
if b.index < 0 {
return
}
b.priority = priority
heap.Fix(&s.bHeap, b.index)
}:
case <-p.done: case <-p.done:
} }
} }
// BarCount returns bars count.
func (p *Progress) BarCount() int {
result := make(chan int)
select {
case p.operateState <- func(s *pState) { result <- s.bHeap.Len() }:
return <-result
case <-p.done:
return 0
}
}
// Write is implementation of io.Writer. // Write is implementation of io.Writer.
// Writing to `*mpb.Progress` will print lines above a running bar. // Writing to `*mpb.Progress` will print lines above a running bar.
// Writes aren't flushed immediately, but at next refresh cycle. // Writes aren't flushed immediately, but at next refresh cycle.
@ -203,11 +197,11 @@ func (p *Progress) Write(b []byte) (int, error) {
n int n int
err error err error
} }
ch := make(chan *result) ch := make(chan result)
select { select {
case p.interceptIo <- func(w io.Writer) { case p.interceptIO <- func(w io.Writer) {
n, err := w.Write(b) n, err := w.Write(b)
ch <- &result{n, err} ch <- result{n, err}
}: }:
res := <-ch res := <-ch
return res.n, res.err return res.n, res.err
@ -233,190 +227,160 @@ func (p *Progress) Wait() {
// are doing. Proper way to shutdown is to call (*Progress).Wait() instead. // are doing. Proper way to shutdown is to call (*Progress).Wait() instead.
func (p *Progress) Shutdown() { func (p *Progress) Shutdown() {
p.cancel() p.cancel()
<-p.shutdown p.pwg.Wait()
}
func (p *Progress) newTicker(s *pState) chan time.Time {
ch := make(chan time.Time)
go func() {
var autoRefresh <-chan time.Time
if !s.disableAutoRefresh && !s.outputDiscarded {
if s.renderDelay != nil {
<-s.renderDelay
}
ticker := time.NewTicker(s.refreshRate)
defer ticker.Stop()
autoRefresh = ticker.C
}
for {
select {
case t := <-autoRefresh:
ch <- t
case x := <-s.manualRefresh:
if t, ok := x.(time.Time); ok {
ch <- t
} else {
ch <- time.Now()
}
case <-p.ctx.Done():
close(p.done)
return
}
}
}()
return ch
} }
func (p *Progress) serve(s *pState, cw *cwriter.Writer) { func (p *Progress) serve(s *pState, cw *cwriter.Writer) {
defer close(p.shutdown) defer p.pwg.Done()
render := func() error { return s.render(cw) }
render := func() error { var err error
return s.render(cw)
}
refreshCh := p.newTicker(s)
for { for {
select { select {
case op := <-p.operateState: case op := <-p.operateState:
op(s) op(s)
case fn := <-p.interceptIo: case fn := <-p.interceptIO:
fn(cw) fn(cw)
case <-refreshCh: case <-s.refreshCh:
err := render() e := render()
if err != nil { if e != nil {
s.heapUpdated = false
render = func() error { return nil }
_, _ = fmt.Fprintln(s.debugOut, err.Error())
p.cancel() // cancel all bars p.cancel() // cancel all bars
render = func() error { return nil }
err = e
} }
case <-p.done: case <-p.done:
for s.heapUpdated { update := make(chan bool)
err := render() for s.autoRefresh && err == nil {
if err != nil { s.hm.state(update)
_, _ = fmt.Fprintln(s.debugOut, err.Error()) if <-update {
return err = render()
} else {
break
} }
} }
if err != nil {
_, _ = fmt.Fprintln(s.debugOut, err.Error())
}
s.hm.end(s.shutdownNotifier)
return return
} }
} }
} }
func (s *pState) render(cw *cwriter.Writer) error { func (s *pState) newTicker(delay bool) {
var wg sync.WaitGroup if delay {
if s.heapUpdated { <-s.renderDelay
s.updateSyncMatrix()
s.heapUpdated = false
} }
syncWidth(&wg, s.pMatrix) ticker := time.NewTicker(s.refreshRate)
syncWidth(&wg, s.aMatrix) defer ticker.Stop()
for {
width, height, err := cw.GetTermSize() select {
if err != nil { case t := <-ticker.C:
width = s.reqWidth s.refreshCh <- t
height = s.bHeap.Len() case <-s.ctx.Done():
return
}
} }
for i := 0; i < s.bHeap.Len(); i++ {
bar := s.bHeap[i]
go bar.render(width)
}
err = s.flush(&wg, cw, height)
wg.Wait()
return err
} }
func (s *pState) flush(wg *sync.WaitGroup, cw *cwriter.Writer, height int) error { func (s *pState) render(cw *cwriter.Writer) (err error) {
var popCount int s.hm.sync(s.dropS)
pool := make([]*Bar, 0, s.bHeap.Len()) iter := make(chan *Bar)
s.rows = s.rows[:0] go s.hm.iter(iter, s.dropS)
for s.bHeap.Len() > 0 { var width, height int
b := heap.Pop(&s.bHeap).(*Bar) if cw.IsTerminal() {
width, height, err = cw.GetTermSize()
if err != nil {
close(s.dropS)
return err
}
} else {
if s.reqWidth > 0 {
width = s.reqWidth
} else {
width = 100
}
height = 100
}
for b := range iter {
go b.render(width)
}
return s.flush(cw, height)
}
func (s *pState) flush(cw *cwriter.Writer, height int) error {
wg := new(sync.WaitGroup)
defer wg.Wait() // waiting for all s.hm.push to complete
var popCount int
var rows []io.Reader
iter := make(chan *Bar)
s.hm.drain(iter, s.dropD)
for b := range iter {
frame := <-b.frameCh frame := <-b.frameCh
if frame.err != nil { if frame.err != nil {
// b.frameCh is buffered it's ok to return here close(s.dropD)
return frame.err b.cancel()
return frame.err // b.frameCh is buffered it's ok to return here
} }
var usedRows int var usedRows int
for i := len(frame.rows) - 1; i >= 0; i-- { for i := len(frame.rows) - 1; i >= 0; i-- {
if row := frame.rows[i]; len(s.rows) < height { if row := frame.rows[i]; len(rows) < height {
s.rows = append(s.rows, row) rows = append(rows, row)
usedRows++ usedRows++
} else { } else {
wg.Add(1) _, _ = io.Copy(io.Discard, row)
go func() {
_, _ = io.Copy(io.Discard, row)
wg.Done()
}()
} }
} }
if frame.shutdown { if frame.shutdown != 0 && !frame.done {
b.Wait() // waiting for b.done, so it's safe to read b.bs
if qb, ok := s.queueBars[b]; ok { if qb, ok := s.queueBars[b]; ok {
b.cancel()
delete(s.queueBars, b) delete(s.queueBars, b)
qb.priority = b.priority qb.priority = b.priority
pool = append(pool, qb) wg.Add(1)
s.heapUpdated = true go func(b *Bar) {
s.hm.push(b, true)
wg.Done()
}(qb)
continue continue
} }
if s.popCompleted && !b.bs.noPop { if s.popCompleted && !frame.noPop {
switch b.bs.shutdown++; b.bs.shutdown { switch frame.shutdown {
case 1: case 1:
b.priority = s.popPriority b.priority = s.popPriority
s.popPriority++ s.popPriority++
default: default:
if b.bs.dropOnComplete { b.cancel()
popCount += usedRows popCount += usedRows
s.heapUpdated = true continue
continue
}
} }
} else if b.bs.dropOnComplete { } else if frame.rmOnComplete {
s.heapUpdated = true b.cancel()
continue continue
} else {
b.cancel()
} }
} }
pool = append(pool, b)
}
if len(pool) != 0 {
wg.Add(1) wg.Add(1)
go func() { go func(b *Bar) {
for _, b := range pool { s.hm.push(b, false)
heap.Push(&s.bHeap, b)
}
wg.Done() wg.Done()
}() }(b)
} }
for i := len(s.rows) - 1; i >= 0; i-- { for i := len(rows) - 1; i >= 0; i-- {
_, err := cw.ReadFrom(s.rows[i]) _, err := cw.ReadFrom(rows[i])
if err != nil { if err != nil {
return err return err
} }
} }
err := cw.Flush(len(s.rows) - popCount) return cw.Flush(len(rows) - popCount)
return err
}
func (s *pState) updateSyncMatrix() {
s.pMatrix = make(map[int][]chan int)
s.aMatrix = make(map[int][]chan int)
for i := 0; i < s.bHeap.Len(); i++ {
bar := s.bHeap[i]
table := bar.wSyncTable()
for i, ch := range table[0] {
s.pMatrix[i] = append(s.pMatrix[i], ch)
}
for i, ch := range table[1] {
s.aMatrix[i] = append(s.aMatrix[i], ch)
}
}
} }
func (s *pState) makeBarState(total int64, filler BarFiller, options ...BarOption) *bState { func (s *pState) makeBarState(total int64, filler BarFiller, options ...BarOption) *bState {
@ -426,6 +390,8 @@ func (s *pState) makeBarState(total int64, filler BarFiller, options ...BarOptio
reqWidth: s.reqWidth, reqWidth: s.reqWidth,
total: total, total: total,
filler: filler, filler: filler,
refreshCh: s.refreshCh,
autoRefresh: s.autoRefresh,
manualRefresh: s.manualRefresh, manualRefresh: s.manualRefresh,
} }
@ -439,36 +405,9 @@ func (s *pState) makeBarState(total int64, filler BarFiller, options ...BarOptio
} }
} }
if bs.middleware != nil {
bs.filler = bs.middleware(filler)
bs.middleware = nil
}
for i := 0; i < len(bs.buffers); i++ { for i := 0; i < len(bs.buffers); i++ {
bs.buffers[i] = bytes.NewBuffer(make([]byte, 0, 512)) bs.buffers[i] = bytes.NewBuffer(make([]byte, 0, 512))
} }
bs.subscribeDecorators()
return bs return bs
} }
func syncWidth(wg *sync.WaitGroup, matrix map[int][]chan int) {
for _, column := range matrix {
wg.Add(1)
go maxWidthDistributor(wg, column)
}
}
func maxWidthDistributor(wg *sync.WaitGroup, column []chan int) {
var maxWidth int
for _, ch := range column {
if w := <-ch; w > maxWidth {
maxWidth = w
}
}
for _, ch := range column {
ch <- maxWidth
}
wg.Done()
}

2
vendor/modules.txt vendored
View File

@ -867,7 +867,7 @@ github.com/ulikunitz/xz/lzma
github.com/vbatts/tar-split/archive/tar github.com/vbatts/tar-split/archive/tar
github.com/vbatts/tar-split/tar/asm github.com/vbatts/tar-split/tar/asm
github.com/vbatts/tar-split/tar/storage github.com/vbatts/tar-split/tar/storage
# github.com/vbauerster/mpb/v8 v8.1.6 # github.com/vbauerster/mpb/v8 v8.2.0
## explicit; go 1.17 ## explicit; go 1.17
github.com/vbauerster/mpb/v8 github.com/vbauerster/mpb/v8
github.com/vbauerster/mpb/v8/cwriter github.com/vbauerster/mpb/v8/cwriter