Update module github.com/vbauerster/mpb/v8 to v8.11.1

Signed-off-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
This commit is contained in:
renovate[bot]
2025-11-07 10:36:01 +00:00
committed by GitHub
parent 905721cae5
commit 6493343ddc
47 changed files with 2269 additions and 16492 deletions

View File

@@ -26,7 +26,7 @@ type Bar struct {
cancel func()
}
type syncTable [2][]chan int
type decorSyncTable [2][]*decor.Sync
type extenderFunc func(decor.Statistics, ...io.Reader) ([]io.Reader, error)
// bState is actual bar's state.
@@ -466,8 +466,8 @@ func (b *Bar) tryEarlyRefresh(renderReq chan<- time.Time) {
}
}
func (b *Bar) wSyncTable() syncTable {
result := make(chan syncTable)
func (b *Bar) wSyncTable() decorSyncTable {
result := make(chan decorSyncTable)
select {
case b.operateState <- func(s *bState) { result <- s.wSyncTable() }:
return <-result
@@ -530,14 +530,14 @@ func (s *bState) draw(stat decor.Statistics) (_ io.Reader, err error) {
), nil
}
func (s *bState) wSyncTable() (table syncTable) {
func (s *bState) wSyncTable() (table decorSyncTable) {
var start int
var row []chan int
var row []*decor.Sync
for i, group := range s.decorGroups {
for _, d := range group {
if ch, ok := d.Sync(); ok {
row = append(row, ch)
if s, ok := d.Sync(); ok {
row = append(row, s)
}
}
table[i], start = row[start:], len(row)

38
vendor/github.com/vbauerster/mpb/v8/bar_heap.go generated vendored Normal file
View File

@@ -0,0 +1,38 @@
package mpb
import "container/heap"
var _ heap.Interface = (*barHeap)(nil)
type barHeap []*Bar
func (s barHeap) Len() int { return len(s) }
// it's a reversed Less same as sort.Reverse(sort.Interface) would do
// becasuse we need greater priority item to pop first
func (s barHeap) Less(i, j int) bool {
return s[j].priority < s[i].priority
}
func (s barHeap) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
s[i].index = i
s[j].index = j
}
func (h *barHeap) Push(x interface{}) {
s := *h
b := x.(*Bar)
b.index = len(s)
*h = append(s, b)
}
func (h *barHeap) Pop() interface{} {
var b *Bar
s := *h
i := s.Len() - 1
b, s[i] = s[i], nil // nil to avoid memory leak
b.index = -1 // for safety
*h = s[:i]
return b
}

View File

@@ -66,8 +66,8 @@ func WithRenderDelay(ch <-chan struct{}) ContainerOption {
}
}
// WithShutdownNotifier value of type `[]*mpb.Bar` will be send into provided
// channel upon container shutdown.
// WithShutdownNotifier value of type `[]*mpb.Bar` will be send to provided channel
// on shutdown event, i.e. after `(*Progress) Wait()` or `(*Progress) Shutdown()` call.
func WithShutdownNotifier(ch chan<- interface{}) ContainerOption {
return func(s *pState) {
s.shutdownNotifier = ch

View File

@@ -73,11 +73,17 @@ type Decorator interface {
// To be used with `func Any(DecorFunc, ...WC) Decorator`.
type DecorFunc func(Statistics) string
// Sync type used by Synchronizer.
type Sync struct {
Tx chan int
Rx chan int
}
// Synchronizer interface.
// All decorators implement this interface implicitly. Its Sync
// method exposes width sync channel, if DSyncWidth bit is set.
type Synchronizer interface {
Sync() (chan int, bool)
Sync() (*Sync, bool)
}
// Formatter interface.
@@ -129,10 +135,10 @@ var (
// W represents width and C represents bit set of width related config.
// A decorator should embed WC, to enable width synchronization.
type WC struct {
W int
C int
fill func(s string, w int) string
wsync chan int
W int
C int
sync *Sync
fill func(s string, w int) string
}
// Format should be called by any Decorator implementation.
@@ -145,8 +151,8 @@ func (wc WC) Format(str string) (string, int) {
width++
}
if (wc.C & DSyncWidth) != 0 {
wc.wsync <- width
width = <-wc.wsync
wc.sync.Tx <- width
width = <-wc.sync.Rx
}
return wc.fill(str, width), width
}
@@ -161,17 +167,17 @@ func (wc *WC) Init() WC {
if (wc.C & DSyncWidth) != 0 {
// it's deliberate choice to override wsync on each Init() call,
// this way globals like WCSyncSpace can be reused
wc.wsync = make(chan int)
wc.sync = &Sync{make(chan int, 1), make(chan int, 1)}
}
return *wc
}
// Sync is implementation of Synchronizer interface.
func (wc WC) Sync() (chan int, bool) {
if (wc.C&DSyncWidth) != 0 && wc.wsync == nil {
func (wc WC) Sync() (*Sync, bool) {
if (wc.C&DSyncWidth) != 0 && wc.sync == nil {
panic(fmt.Sprintf("%T is not initialized", wc))
}
return wc.wsync, (wc.C & DSyncWidth) != 0
return wc.sync, (wc.C & DSyncWidth) != 0
}
func initWC(wcc ...WC) WC {

View File

@@ -1,6 +1,11 @@
package mpb
import "container/heap"
import (
"container/heap"
"time"
"github.com/vbauerster/mpb/v8/decor"
)
type heapManager chan heapRequest
@@ -20,11 +25,7 @@ type heapRequest struct {
data interface{}
}
type iterData struct {
drop <-chan struct{}
iter chan<- *Bar
iterPop chan<- *Bar
}
type iterRequest chan (<-chan *Bar)
type pushData struct {
bar *Bar
@@ -38,63 +39,47 @@ type fixData struct {
}
func (m heapManager) run() {
var bHeap priorityQueue
var pMatrix, aMatrix map[int][]chan int
var bHeap barHeap
var pMatrix map[int][]*decor.Sync
var aMatrix map[int][]*decor.Sync
var l int
var sync bool
var prevLen int
for req := range m {
switch req.cmd {
case h_sync:
if sync || prevLen != bHeap.Len() {
pMatrix = make(map[int][]*decor.Sync)
aMatrix = make(map[int][]*decor.Sync)
for _, b := range bHeap {
table := b.wSyncTable()
for i, s := range table[0] {
pMatrix[i] = append(pMatrix[i], s)
}
for i, s := range table[1] {
aMatrix[i] = append(aMatrix[i], s)
}
}
sync, prevLen = false, bHeap.Len()
}
syncWidth(pMatrix)
syncWidth(aMatrix)
case h_push:
data := req.data.(pushData)
heap.Push(&bHeap, data.bar)
sync = sync || data.sync
case h_sync:
if sync || l != bHeap.Len() {
pMatrix = make(map[int][]chan int)
aMatrix = make(map[int][]chan int)
for _, b := range bHeap {
table := b.wSyncTable()
for i, ch := range table[0] {
pMatrix[i] = append(pMatrix[i], ch)
}
for i, ch := range table[1] {
aMatrix[i] = append(aMatrix[i], ch)
}
}
sync = false
l = bHeap.Len()
}
drop := req.data.(<-chan struct{})
syncWidth(pMatrix, drop)
syncWidth(aMatrix, drop)
case h_iter:
data := req.data.(iterData)
loop: // unordered iteration
for _, b := range bHeap {
select {
case data.iter <- b:
case <-data.drop:
data.iterPop = nil
break loop
for i, req := range req.data.([]iterRequest) {
ch := make(chan *Bar, bHeap.Len())
req <- ch
switch i {
case 0:
rangeOverSlice(bHeap, ch)
case 1:
popOverHeap(&bHeap, ch)
}
}
close(data.iter)
if data.iterPop == nil {
break
}
loop_pop: // ordered iteration
for bHeap.Len() != 0 {
bar := heap.Pop(&bHeap).(*Bar)
select {
case data.iterPop <- bar:
case <-data.drop:
heap.Push(&bHeap, bar)
break loop_pop
}
}
close(data.iterPop)
case h_fix:
data := req.data.(fixData)
if data.bar.index < 0 {
@@ -106,21 +91,24 @@ func (m heapManager) run() {
}
case h_state:
ch := req.data.(chan<- bool)
ch <- sync || l != bHeap.Len()
ch <- sync || prevLen != bHeap.Len()
case h_end:
ch := req.data.(chan<- interface{})
if ch != nil {
go func() {
ch <- []*Bar(bHeap)
select {
case ch <- []*Bar(bHeap):
case <-time.After(time.Second):
}
}()
}
close(m)
return
}
}
}
func (m heapManager) sync(drop <-chan struct{}) {
m <- heapRequest{cmd: h_sync, data: drop}
func (m heapManager) sync() {
m <- heapRequest{cmd: h_sync}
}
func (m heapManager) push(b *Bar, sync bool) {
@@ -135,9 +123,8 @@ func (m heapManager) push(b *Bar, sync bool) {
}
}
func (m heapManager) iter(drop <-chan struct{}, iter, iterPop chan<- *Bar) {
data := iterData{drop, iter, iterPop}
m <- heapRequest{cmd: h_iter, data: data}
func (m heapManager) iter(req ...iterRequest) {
m <- heapRequest{cmd: h_iter, data: req}
}
func (m heapManager) fix(b *Bar, priority int, lazy bool) {
@@ -153,25 +140,37 @@ func (m heapManager) end(ch chan<- interface{}) {
m <- heapRequest{cmd: h_end, data: ch}
}
func syncWidth(matrix map[int][]chan int, drop <-chan struct{}) {
func syncWidth(matrix map[int][]*decor.Sync) {
for _, column := range matrix {
go maxWidthDistributor(column, drop)
go maxWidthDistributor(column)
}
}
func maxWidthDistributor(column []chan int, drop <-chan struct{}) {
func maxWidthDistributor(column []*decor.Sync) {
var maxWidth int
for _, ch := range column {
select {
case w := <-ch:
if w > maxWidth {
maxWidth = w
}
case <-drop:
return
for _, s := range column {
w := <-s.Tx
if w > maxWidth {
maxWidth = w
}
}
for _, ch := range column {
ch <- maxWidth
for _, s := range column {
s.Rx <- maxWidth
}
}
// unordered iteration
func rangeOverSlice(s barHeap, dst chan<- *Bar) {
defer close(dst)
for _, b := range s {
dst <- b
}
}
// ordered iteration
func popOverHeap(h heap.Interface, dst chan<- *Bar) {
defer close(dst)
for h.Len() != 0 {
dst <- heap.Pop(h).(*Bar)
}
}

View File

@@ -1,37 +0,0 @@
package mpb
import "container/heap"
var _ heap.Interface = (*priorityQueue)(nil)
type priorityQueue []*Bar
func (pq priorityQueue) Len() int { return len(pq) }
func (pq priorityQueue) Less(i, j int) bool {
// greater priority pops first
return pq[i].priority > pq[j].priority
}
func (pq priorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *priorityQueue) Push(x interface{}) {
s := *pq
b := x.(*Bar)
b.index = len(s)
*pq = append(s, b)
}
func (pq *priorityQueue) Pop() interface{} {
var b *Bar
s := *pq
i := len(s) - 1
b, s[i] = s[i], nil // nil to avoid memory leak
b.index = -1 // for safety
*pq = s[:i]
return b
}

View File

@@ -34,7 +34,6 @@ type Progress struct {
type pState struct {
ctx context.Context
hm heapManager
iterDrop chan struct{}
renderReq chan time.Time
idCount int
popPriority int
@@ -71,7 +70,6 @@ func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress {
s := &pState{
ctx: ctx,
hmQueueLen: defaultHmQueueLength,
iterDrop: make(chan struct{}),
renderReq: make(chan time.Time),
popPriority: math.MinInt32,
refreshRate: defaultRefreshRate,
@@ -176,12 +174,13 @@ func (p *Progress) Add(total int64, filler BarFiller, options ...BarOption) (*Ba
}
func (p *Progress) traverseBars(cb func(b *Bar) bool) {
drop, iter := make(chan struct{}), make(chan *Bar)
req := make(iterRequest, 1)
select {
case p.operateState <- func(s *pState) { s.hm.iter(drop, iter, nil) }:
for b := range iter {
case p.operateState <- func(s *pState) {
s.hm.iter(req)
}:
for b := range <-req {
if !cb(b) {
close(drop)
break
}
}
@@ -227,12 +226,12 @@ func (p *Progress) Write(b []byte) (int, error) {
// Wait waits for all bars to complete and finally shutdowns container. After
// this method has been called, there is no way to reuse `*Progress` instance.
func (p *Progress) Wait() {
p.bwg.Wait()
p.Shutdown()
// wait for user wg, if any
if p.uwg != nil {
p.uwg.Wait()
}
p.bwg.Wait()
p.Shutdown()
}
// Shutdown cancels any running bar immediately and then shutdowns `*Progress`
@@ -335,15 +334,14 @@ func (s *pState) manualRefreshListener(done chan struct{}) {
}
func (s *pState) render(cw *cwriter.Writer) (err error) {
iter, iterPop := make(chan *Bar), make(chan *Bar)
s.hm.sync(s.iterDrop)
s.hm.iter(s.iterDrop, iter, iterPop)
req := make(iterRequest, 1)
s.hm.sync()
s.hm.iter(req, req)
var width, height int
if cw.IsTerminal() {
width, height, err = cw.GetTermSize()
if err != nil {
close(s.iterDrop)
return err
}
} else {
@@ -355,24 +353,22 @@ func (s *pState) render(cw *cwriter.Writer) (err error) {
height = width
}
var barCount int
for b := range iter {
barCount++
for b := range <-req {
go b.render(width)
}
return s.flush(cw, height, barCount, iterPop)
return s.flush(cw, height, <-req)
}
func (s *pState) flush(cw *cwriter.Writer, height, barCount int, iter <-chan *Bar) error {
func (s *pState) flush(cw *cwriter.Writer, height int, iter <-chan *Bar) error {
var total, popCount int
rows := make([][]io.Reader, 0, barCount)
rows := make([][]io.Reader, 0, cap(iter))
for b := range iter {
frame := <-b.frameCh
if frame.err != nil {
close(s.iterDrop)
b.cancel()
s.hm.push(b, false)
return frame.err // b.frameCh is buffered it's ok to return here
}
var discarded int