diff --git a/go.mod b/go.mod index b81d64a3a2..6516dfb65e 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ ignore ./tools/dev require ( cloud.google.com/go/bigtable v1.41.0 cloud.google.com/go/pubsub/v2 v2.3.0 - cloud.google.com/go/storage v1.59.1 + cloud.google.com/go/storage v1.59.2 dario.cat/mergo v1.0.2 github.com/Azure/azure-pipeline-go v0.2.3 github.com/Azure/azure-storage-blob-go v0.15.0 diff --git a/go.sum b/go.sum index 58d8c06876..69e0903ddd 100644 --- a/go.sum +++ b/go.sum @@ -52,8 +52,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= -cloud.google.com/go/storage v1.59.1 h1:DXAZLcTimtiXdGqDSnebROVPd9QvRsFVVlptz02Wk58= -cloud.google.com/go/storage v1.59.1/go.mod h1:cMWbtM+anpC74gn6qjLh+exqYcfmB9Hqe5z6adx+CLI= +cloud.google.com/go/storage v1.59.2 h1:gmOAuG1opU8YvycMNpP+DvHfT9BfzzK5Cy+arP+Nocw= +cloud.google.com/go/storage v1.59.2/go.mod h1:cMWbtM+anpC74gn6qjLh+exqYcfmB9Hqe5z6adx+CLI= cloud.google.com/go/trace v1.11.6 h1:2O2zjPzqPYAHrn3OKl029qlqG6W8ZdYaOWRyr8NgMT4= cloud.google.com/go/trace v1.11.6/go.mod h1:GA855OeDEBiBMzcckLPE2kDunIpC72N+Pq8WFieFjnI= dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8= diff --git a/vendor/cloud.google.com/go/storage/CHANGES.md b/vendor/cloud.google.com/go/storage/CHANGES.md index ea259da7d8..efa8edce8d 100644 --- a/vendor/cloud.google.com/go/storage/CHANGES.md +++ b/vendor/cloud.google.com/go/storage/CHANGES.md @@ -1,6 +1,12 @@ # Changes +## [1.59.2](https://github.com/googleapis/google-cloud-go/releases/tag/storage%2Fv1.59.2) (2026-01-28) + +### Bug Fixes + +* deadlock in event loop while coordinating channels (#13652) ([ff6c8e7](https://github.com/googleapis/google-cloud-go/commit/ff6c8e780b2207b154808ba22e3124b68d6b4f7d)) + ## [1.59.1](https://github.com/googleapis/google-cloud-go/releases/tag/storage%2Fv1.59.1) (2026-01-14) ### Bug Fixes diff --git a/vendor/cloud.google.com/go/storage/grpc_reader_multi_range.go b/vendor/cloud.google.com/go/storage/grpc_reader_multi_range.go index 4b05813c74..1e3f4fd16b 100644 --- a/vendor/cloud.google.com/go/storage/grpc_reader_multi_range.go +++ b/vendor/cloud.google.com/go/storage/grpc_reader_multi_range.go @@ -15,6 +15,7 @@ package storage import ( + "container/list" "context" "errors" "fmt" @@ -34,6 +35,10 @@ import ( const ( mrdCommandChannelSize = 1 mrdResponseChannelSize = 100 + // This should never be hit in practice, but is a safety valve to prevent + // unbounded memory usage if the user is adding ranges faster than they + // can be processed. + mrdAddInternalQueueMaxSize = 50000 ) // --- internalMultiRangeDownloader Interface --- @@ -83,18 +88,19 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params // Create the manager manager := &multiRangeDownloaderManager{ - ctx: mCtx, - cancel: cancel, - client: c, - settings: s, - params: params, - cmds: make(chan mrdCommand, mrdCommandChannelSize), - sessionResps: make(chan mrdSessionResult, mrdResponseChannelSize), - pendingRanges: make(map[int64]*rangeRequest), - readIDCounter: 1, - readSpec: readSpec, - attrsReady: make(chan struct{}), - spanCtx: ctx, + ctx: mCtx, + cancel: cancel, + client: c, + settings: s, + params: params, + cmds: make(chan mrdCommand, mrdCommandChannelSize), + sessionResps: make(chan mrdSessionResult, mrdResponseChannelSize), + pendingRanges: make(map[int64]*rangeRequest), + readIDCounter: 1, + readSpec: readSpec, + attrsReady: make(chan struct{}), + spanCtx: ctx, + unsentRequests: newRequestQueue(), } mrd := &MultiRangeDownloader{ @@ -227,6 +233,7 @@ type multiRangeDownloaderManager struct { attrsOnce sync.Once spanCtx context.Context callbackWg sync.WaitGroup + unsentRequests *requestQueue } type rangeRequest struct { @@ -374,10 +381,29 @@ func (m *multiRangeDownloaderManager) eventLoop() { } for { + var nextReq *storagepb.BidiReadObjectRequest + var targetChan chan<- *storagepb.BidiReadObjectRequest + + // Only try to send if we have queued requests + if m.unsentRequests.Len() > 0 && m.currentSession != nil { + nextReq = m.unsentRequests.Front() + if nextReq != nil { + targetChan = m.currentSession.reqC + } + } + // Only read from cmds if we have space in the unsentRequests queue. + var cmdsChan chan mrdCommand + if m.unsentRequests.Len() < mrdAddInternalQueueMaxSize { + cmdsChan = m.cmds + } select { case <-m.ctx.Done(): return - case cmd := <-m.cmds: + // This path only triggers if space is available in the channel. + // It never blocks the eventLoop. + case targetChan <- nextReq: + m.unsentRequests.RemoveFront() + case cmd := <-cmdsChan: cmd.apply(m.ctx, m) if _, ok := cmd.(*mrdCloseCmd); ok { return @@ -386,7 +412,7 @@ func (m *multiRangeDownloaderManager) eventLoop() { m.processSessionResult(result) } - if len(m.pendingRanges) == 0 { + if len(m.pendingRanges) == 0 && m.unsentRequests.Len() == 0 { for _, waiter := range m.waiters { close(waiter) } @@ -512,7 +538,7 @@ func (m *multiRangeDownloaderManager) handleAddCmd(ctx context.Context, cmd *mrd ReadId: req.readID, }}, } - m.currentSession.SendRequest(protoReq) + m.unsentRequests.PushBack(protoReq) } func (m *multiRangeDownloaderManager) convertToPositiveOffset(req *rangeRequest) error { @@ -655,7 +681,8 @@ func (m *multiRangeDownloaderManager) ensureSession(ctx context.Context) error { } } if len(rangesToResend) > 0 { - m.currentSession.SendRequest(&storagepb.BidiReadObjectRequest{ReadRanges: rangesToResend}) + retryReq := &storagepb.BidiReadObjectRequest{ReadRanges: rangesToResend} + m.unsentRequests.PushFront(retryReq) } return nil }, m.settings.retry, true) @@ -900,3 +927,28 @@ func readerAttrsFromObject(o *ObjectAttrs) ReaderObjectAttrs { CRC32C: o.CRC32C, } } + +type requestQueue struct { + l *list.List +} + +func newRequestQueue() *requestQueue { + return &requestQueue{l: list.New()} +} + +func (q *requestQueue) PushBack(r *storagepb.BidiReadObjectRequest) { q.l.PushBack(r) } +func (q *requestQueue) PushFront(r *storagepb.BidiReadObjectRequest) { q.l.PushFront(r) } +func (q *requestQueue) Len() int { return q.l.Len() } + +func (q *requestQueue) Front() *storagepb.BidiReadObjectRequest { + if f := q.l.Front(); f != nil { + return f.Value.(*storagepb.BidiReadObjectRequest) + } + return nil +} + +func (q *requestQueue) RemoveFront() { + if f := q.l.Front(); f != nil { + q.l.Remove(f) + } +} diff --git a/vendor/cloud.google.com/go/storage/internal/version.go b/vendor/cloud.google.com/go/storage/internal/version.go index 0db74236e8..cee1b2a070 100644 --- a/vendor/cloud.google.com/go/storage/internal/version.go +++ b/vendor/cloud.google.com/go/storage/internal/version.go @@ -17,4 +17,4 @@ package internal // Version is the current tagged release of the library. -const Version = "1.59.1" +const Version = "1.59.2" diff --git a/vendor/modules.txt b/vendor/modules.txt index ea00518a30..b1fa97dce5 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -68,7 +68,7 @@ cloud.google.com/go/pubsub/v2/apiv1/pubsubpb cloud.google.com/go/pubsub/v2/internal cloud.google.com/go/pubsub/v2/internal/distribution cloud.google.com/go/pubsub/v2/internal/scheduler -# cloud.google.com/go/storage v1.59.1 +# cloud.google.com/go/storage v1.59.2 ## explicit; go 1.24.0 cloud.google.com/go/storage cloud.google.com/go/storage/experimental