fix(deps): update module github.com/vbauerster/mpb/v8 to v8.9.1

Signed-off-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
This commit is contained in:
renovate[bot]
2025-01-07 19:11:33 +00:00
committed by GitHub
parent c7882f0ce2
commit c75e1c41d0
6 changed files with 71 additions and 68 deletions

2
go.mod
View File

@ -68,7 +68,7 @@ require (
github.com/spf13/cobra v1.8.1 github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5 github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.10.0 github.com/stretchr/testify v1.10.0
github.com/vbauerster/mpb/v8 v8.8.3 github.com/vbauerster/mpb/v8 v8.9.1
github.com/vishvananda/netlink v1.3.0 github.com/vishvananda/netlink v1.3.0
go.etcd.io/bbolt v1.3.11 go.etcd.io/bbolt v1.3.11
golang.org/x/crypto v0.31.0 golang.org/x/crypto v0.31.0

4
go.sum
View File

@ -515,8 +515,8 @@ github.com/ulikunitz/xz v0.5.12 h1:37Nm15o69RwBkXM0J6A5OlE67RZTfzUxTj8fB3dfcsc=
github.com/ulikunitz/xz v0.5.12/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14= github.com/ulikunitz/xz v0.5.12/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
github.com/vbatts/tar-split v0.11.6 h1:4SjTW5+PU11n6fZenf2IPoV8/tz3AaYHMWjf23envGs= github.com/vbatts/tar-split v0.11.6 h1:4SjTW5+PU11n6fZenf2IPoV8/tz3AaYHMWjf23envGs=
github.com/vbatts/tar-split v0.11.6/go.mod h1:dqKNtesIOr2j2Qv3W/cHjnvk9I8+G7oAkFDFN6TCBEI= github.com/vbatts/tar-split v0.11.6/go.mod h1:dqKNtesIOr2j2Qv3W/cHjnvk9I8+G7oAkFDFN6TCBEI=
github.com/vbauerster/mpb/v8 v8.8.3 h1:dTOByGoqwaTJYPubhVz3lO5O6MK553XVgUo33LdnNsQ= github.com/vbauerster/mpb/v8 v8.9.1 h1:LH5R3lXPfE2e3lIGxN7WNWv3Hl5nWO6LRi2B0L0ERHw=
github.com/vbauerster/mpb/v8 v8.8.3/go.mod h1:JfCCrtcMsJwP6ZwMn9e5LMnNyp3TVNpUWWkN+nd4EWk= github.com/vbauerster/mpb/v8 v8.9.1/go.mod h1:4XMvznPh8nfe2NpnDo1QTPvW9MVkUhbG90mPWvmOzcQ=
github.com/vishvananda/netlink v1.3.0 h1:X7l42GfcV4S6E4vHTsw48qbrV+9PVojNfIhZcwQdrZk= github.com/vishvananda/netlink v1.3.0 h1:X7l42GfcV4S6E4vHTsw48qbrV+9PVojNfIhZcwQdrZk=
github.com/vishvananda/netlink v1.3.0/go.mod h1:i6NetklAujEcC6fK0JPjT8qSwWyO0HLn4UKG+hGqeJs= github.com/vishvananda/netlink v1.3.0/go.mod h1:i6NetklAujEcC6fK0JPjT8qSwWyO0HLn4UKG+hGqeJs=
github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8=

View File

@ -30,6 +30,17 @@ func WithWidth(width int) ContainerOption {
} }
} }
// WithQueueLen sets buffer size of heap manager channel. Ideally it must be
// kept at MAX value, where MAX is number of bars to be rendered at the same
// time. If len < MAX then backpressure to the scheduler will be increased as
// MAX-len extra goroutines will be launched at each render cycle.
// Default queue len is 128.
func WithQueueLen(len int) ContainerOption {
return func(s *pState) {
s.hmQueueLen = len
}
}
// WithRefreshRate overrides default 150ms refresh rate. // WithRefreshRate overrides default 150ms refresh rate.
func WithRefreshRate(d time.Duration) ContainerOption { func WithRefreshRate(d time.Duration) ContainerOption {
return func(s *pState) { return func(s *pState) {

View File

@ -10,7 +10,6 @@ const (
h_sync heapCmd = iota h_sync heapCmd = iota
h_push h_push
h_iter h_iter
h_drain
h_fix h_fix
h_state h_state
h_end h_end
@ -22,8 +21,9 @@ type heapRequest struct {
} }
type iterData struct { type iterData struct {
iter chan<- *Bar drop <-chan struct{}
drop <-chan struct{} iter chan<- *Bar
iterPop chan<- *Bar
} }
type pushData struct { type pushData struct {
@ -41,7 +41,7 @@ func (m heapManager) run() {
var bHeap priorityQueue var bHeap priorityQueue
var pMatrix, aMatrix map[int][]chan int var pMatrix, aMatrix map[int][]chan int
var l int var len int
var sync bool var sync bool
for req := range m { for req := range m {
@ -49,11 +49,9 @@ func (m heapManager) run() {
case h_push: case h_push:
data := req.data.(pushData) data := req.data.(pushData)
heap.Push(&bHeap, data.bar) heap.Push(&bHeap, data.bar)
if !sync { sync = sync || data.sync
sync = data.sync
}
case h_sync: case h_sync:
if sync || l != bHeap.Len() { if sync || len != bHeap.Len() {
pMatrix = make(map[int][]chan int) pMatrix = make(map[int][]chan int)
aMatrix = make(map[int][]chan int) aMatrix = make(map[int][]chan int)
for _, b := range bHeap { for _, b := range bHeap {
@ -66,33 +64,37 @@ func (m heapManager) run() {
} }
} }
sync = false sync = false
l = bHeap.Len() len = bHeap.Len()
} }
drop := req.data.(<-chan struct{}) drop := req.data.(<-chan struct{})
syncWidth(pMatrix, drop) syncWidth(pMatrix, drop)
syncWidth(aMatrix, drop) syncWidth(aMatrix, drop)
case h_iter: case h_iter:
data := req.data.(iterData) data := req.data.(iterData)
drop_iter: loop: // unordered iteration
for _, b := range bHeap { for _, b := range bHeap {
select { select {
case data.iter <- b: case data.iter <- b:
case <-data.drop: case <-data.drop:
break drop_iter data.iterPop = nil
break loop
} }
} }
close(data.iter) close(data.iter)
case h_drain: if data.iterPop == nil {
data := req.data.(iterData) break
drop_drain: }
loop_pop: // ordered iteration
for bHeap.Len() != 0 { for bHeap.Len() != 0 {
bar := heap.Pop(&bHeap).(*Bar)
select { select {
case data.iter <- heap.Pop(&bHeap).(*Bar): case data.iterPop <- bar:
case <-data.drop: case <-data.drop:
break drop_drain heap.Push(&bHeap, bar)
break loop_pop
} }
} }
close(data.iter) close(data.iterPop)
case h_fix: case h_fix:
data := req.data.(fixData) data := req.data.(fixData)
if data.bar.index < 0 { if data.bar.index < 0 {
@ -104,7 +106,7 @@ func (m heapManager) run() {
} }
case h_state: case h_state:
ch := req.data.(chan<- bool) ch := req.data.(chan<- bool)
ch <- sync || l != bHeap.Len() ch <- sync || len != bHeap.Len()
case h_end: case h_end:
ch := req.data.(chan<- interface{}) ch := req.data.(chan<- interface{})
if ch != nil { if ch != nil {
@ -123,19 +125,21 @@ func (m heapManager) sync(drop <-chan struct{}) {
func (m heapManager) push(b *Bar, sync bool) { func (m heapManager) push(b *Bar, sync bool) {
data := pushData{b, sync} data := pushData{b, sync}
m <- heapRequest{cmd: h_push, data: data} req := heapRequest{cmd: h_push, data: data}
select {
case m <- req:
default:
go func() {
m <- req
}()
}
} }
func (m heapManager) iter(iter chan<- *Bar, drop <-chan struct{}) { func (m heapManager) iter(drop <-chan struct{}, iter, iterPop chan<- *Bar) {
data := iterData{iter, drop} data := iterData{drop, iter, iterPop}
m <- heapRequest{cmd: h_iter, data: data} m <- heapRequest{cmd: h_iter, data: data}
} }
func (m heapManager) drain(iter chan<- *Bar, drop <-chan struct{}) {
data := iterData{iter, drop}
m <- heapRequest{cmd: h_drain, data: data}
}
func (m heapManager) fix(b *Bar, priority int, lazy bool) { func (m heapManager) fix(b *Bar, priority int, lazy bool) {
data := fixData{b, priority, lazy} data := fixData{b, priority, lazy}
m <- heapRequest{cmd: h_fix, data: data} m <- heapRequest{cmd: h_fix, data: data}

View File

@ -15,6 +15,7 @@ import (
) )
const defaultRefreshRate = 150 * time.Millisecond const defaultRefreshRate = 150 * time.Millisecond
const defaultHmQueueLength = 128
// DoneError represents use after `(*Progress).Wait()` error. // DoneError represents use after `(*Progress).Wait()` error.
var DoneError = fmt.Errorf("%T instance can't be reused after %[1]T.Wait()", (*Progress)(nil)) var DoneError = fmt.Errorf("%T instance can't be reused after %[1]T.Wait()", (*Progress)(nil))
@ -31,16 +32,17 @@ type Progress struct {
// pState holds bars in its priorityQueue, it gets passed to (*Progress).serve monitor goroutine. // pState holds bars in its priorityQueue, it gets passed to (*Progress).serve monitor goroutine.
type pState struct { type pState struct {
ctx context.Context ctx context.Context
hm heapManager hm heapManager
dropS, dropD chan struct{} iterDrop chan struct{}
renderReq chan time.Time renderReq chan time.Time
idCount int idCount int
popPriority int popPriority int
// following are provided/overrided by user // following are provided/overrided by user
refreshRate time.Duration hmQueueLen int
reqWidth int reqWidth int
refreshRate time.Duration
popCompleted bool popCompleted bool
autoRefresh bool autoRefresh bool
delayRC <-chan struct{} delayRC <-chan struct{}
@ -68,9 +70,8 @@ func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
s := &pState{ s := &pState{
ctx: ctx, ctx: ctx,
hm: make(heapManager), hmQueueLen: defaultHmQueueLength,
dropS: make(chan struct{}), iterDrop: make(chan struct{}),
dropD: make(chan struct{}),
renderReq: make(chan time.Time), renderReq: make(chan time.Time),
popPriority: math.MinInt32, popPriority: math.MinInt32,
refreshRate: defaultRefreshRate, refreshRate: defaultRefreshRate,
@ -85,6 +86,8 @@ func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress {
} }
} }
s.hm = make(heapManager, s.hmQueueLen)
p := &Progress{ p := &Progress{
uwg: s.uwg, uwg: s.uwg,
operateState: make(chan func(*pState)), operateState: make(chan func(*pState)),
@ -173,9 +176,9 @@ func (p *Progress) Add(total int64, filler BarFiller, options ...BarOption) (*Ba
} }
func (p *Progress) traverseBars(cb func(b *Bar) bool) { func (p *Progress) traverseBars(cb func(b *Bar) bool) {
iter, drop := make(chan *Bar), make(chan struct{}) drop, iter := make(chan struct{}), make(chan *Bar)
select { select {
case p.operateState <- func(s *pState) { s.hm.iter(iter, drop) }: case p.operateState <- func(s *pState) { s.hm.iter(drop, iter, nil) }:
for b := range iter { for b := range iter {
if !cb(b) { if !cb(b) {
close(drop) close(drop)
@ -333,15 +336,15 @@ func (s *pState) manualRefreshListener(done chan struct{}) {
} }
func (s *pState) render(cw *cwriter.Writer) (err error) { func (s *pState) render(cw *cwriter.Writer) (err error) {
s.hm.sync(s.dropS) iter, iterPop := make(chan *Bar), make(chan *Bar)
iter := make(chan *Bar) s.hm.sync(s.iterDrop)
go s.hm.iter(iter, s.dropS) s.hm.iter(s.iterDrop, iter, iterPop)
var width, height int var width, height int
if cw.IsTerminal() { if cw.IsTerminal() {
width, height, err = cw.GetTermSize() width, height, err = cw.GetTermSize()
if err != nil { if err != nil {
close(s.dropS) close(s.iterDrop)
return err return err
} }
} else { } else {
@ -357,23 +360,17 @@ func (s *pState) render(cw *cwriter.Writer) (err error) {
go b.render(width) go b.render(width)
} }
return s.flush(cw, height) return s.flush(cw, height, iterPop)
} }
func (s *pState) flush(cw *cwriter.Writer, height int) error { func (s *pState) flush(cw *cwriter.Writer, height int, iter <-chan *Bar) error {
var wg sync.WaitGroup
defer wg.Wait() // waiting for all s.push to complete
var popCount int var popCount int
var rows []io.Reader var rows []io.Reader
iter := make(chan *Bar)
s.hm.drain(iter, s.dropD)
for b := range iter { for b := range iter {
frame := <-b.frameCh frame := <-b.frameCh
if frame.err != nil { if frame.err != nil {
close(s.dropD) close(s.iterDrop)
b.cancel() b.cancel()
return frame.err // b.frameCh is buffered it's ok to return here return frame.err // b.frameCh is buffered it's ok to return here
} }
@ -393,16 +390,13 @@ func (s *pState) flush(cw *cwriter.Writer, height int) error {
if qb, ok := s.queueBars[b]; ok { if qb, ok := s.queueBars[b]; ok {
delete(s.queueBars, b) delete(s.queueBars, b)
qb.priority = b.priority qb.priority = b.priority
wg.Add(1) s.hm.push(qb, true)
go s.push(&wg, qb, true)
} else if s.popCompleted && !frame.noPop { } else if s.popCompleted && !frame.noPop {
b.priority = s.popPriority b.priority = s.popPriority
s.popPriority++ s.popPriority++
wg.Add(1) s.hm.push(b, false)
go s.push(&wg, b, false)
} else if !frame.rmOnComplete { } else if !frame.rmOnComplete {
wg.Add(1) s.hm.push(b, false)
go s.push(&wg, b, false)
} }
case 2: case 2:
if s.popCompleted && !frame.noPop { if s.popCompleted && !frame.noPop {
@ -411,8 +405,7 @@ func (s *pState) flush(cw *cwriter.Writer, height int) error {
} }
fallthrough fallthrough
default: default:
wg.Add(1) s.hm.push(b, false)
go s.push(&wg, b, false)
} }
} }
@ -426,11 +419,6 @@ func (s *pState) flush(cw *cwriter.Writer, height int) error {
return cw.Flush(len(rows) - popCount) return cw.Flush(len(rows) - popCount)
} }
func (s *pState) push(wg *sync.WaitGroup, b *Bar, sync bool) {
s.hm.push(b, sync)
wg.Done()
}
func (s pState) makeBarState(total int64, filler BarFiller, options ...BarOption) *bState { func (s pState) makeBarState(total int64, filler BarFiller, options ...BarOption) *bState {
bs := &bState{ bs := &bState{
id: s.idCount, id: s.idCount,

2
vendor/modules.txt vendored
View File

@ -1107,7 +1107,7 @@ github.com/ulikunitz/xz/lzma
github.com/vbatts/tar-split/archive/tar github.com/vbatts/tar-split/archive/tar
github.com/vbatts/tar-split/tar/asm github.com/vbatts/tar-split/tar/asm
github.com/vbatts/tar-split/tar/storage github.com/vbatts/tar-split/tar/storage
# github.com/vbauerster/mpb/v8 v8.8.3 # github.com/vbauerster/mpb/v8 v8.9.1
## explicit; go 1.17 ## explicit; go 1.17
github.com/vbauerster/mpb/v8 github.com/vbauerster/mpb/v8
github.com/vbauerster/mpb/v8/cwriter github.com/vbauerster/mpb/v8/cwriter