diff --git a/transport/http2_client.go b/transport/http2_client.go index d4fc6815..e28172f3 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -98,7 +98,8 @@ type http2Client struct { initialWindowSize int32 - bdpEst *bdpEstimator + bdpEst *bdpEstimator + outQuotaVersion uint64 mu sync.Mutex // guard the following variables state transportState // the state of underlying connection @@ -679,9 +680,13 @@ func (t *http2Client) GracefulClose() error { // if it improves the performance. func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { r := bytes.NewBuffer(data) + var ( + p []byte + oqv uint64 + ) for { - var p []byte - if r.Len() > 0 { + oqv = atomic.LoadUint64(&t.outQuotaVersion) + if r.Len() > 0 || p != nil { size := http2MaxFrameLen // Wait until the stream has some quota to send the data. sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire()) @@ -699,7 +704,9 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { if tq < size { size = tq } - p = r.Next(size) + if p == nil { + p = r.Next(size) + } ps := len(p) if ps < sq { // Overbooked stream quota. Return it back. @@ -744,6 +751,18 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { return ContextErr(s.ctx.Err()) default: } + if oqv != atomic.LoadUint64(&t.outQuotaVersion) { + // InitialWindowSize settings frame must have been received after we + // acquired send quota but before we got the writable channel. + // We must forsake this write. + t.sendQuotaPool.add(len(p)) + s.sendQuotaPool.add(len(p)) + if t.framer.adjustNumWriters(-1) == 0 { + t.controlBuf.put(&flushIO{}) + } + t.writableChan <- 0 + continue + } if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 { // Do a force flush iff this is last frame for the entire gRPC message // and the caller is the only writer at this moment. @@ -756,6 +775,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { t.notifyError(err) return connectionErrorf(true, err, "transport: %v", err) } + p = nil if t.framer.adjustNumWriters(-1) == 0 { t.framer.flushWrite() } @@ -1216,6 +1236,7 @@ func (t *http2Client) applySettings(ss []http2.Setting) { } t.streamSendQuota = s.Val t.mu.Unlock() + atomic.AddUint64(&t.outQuotaVersion, 1) } } } diff --git a/transport/http2_server.go b/transport/http2_server.go index 76808ced..c54585e2 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -99,7 +99,8 @@ type http2Server struct { initialWindowSize int32 - bdpEst *bdpEstimator + bdpEst *bdpEstimator + outQuotaVersion uint64 mu sync.Mutex // guard the following state transportState @@ -828,10 +829,15 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) { t.WriteHeader(s, nil) } r := bytes.NewBuffer(data) + var ( + p []byte + oqv uint64 + ) for { - if r.Len() == 0 { + if r.Len() == 0 && p == nil { return nil } + oqv = atomic.LoadUint64(&t.outQuotaVersion) size := http2MaxFrameLen // Wait until the stream has some quota to send the data. sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire()) @@ -849,7 +855,9 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) { if tq < size { size = tq } - p := r.Next(size) + if p == nil { + p = r.Next(size) + } ps := len(p) if ps < sq { // Overbooked stream quota. Return it back. @@ -886,6 +894,18 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) { return ContextErr(s.ctx.Err()) default: } + if oqv != atomic.LoadUint64(&t.outQuotaVersion) { + // InitialWindowSize settings frame must have been received after we + // acquired send quota but before we got the writable channel. + // We must forsake this write. + t.sendQuotaPool.add(ps) + s.sendQuotaPool.add(ps) + if t.framer.adjustNumWriters(-1) == 0 { + t.controlBuf.put(&flushIO{}) + } + t.writableChan <- 0 + continue + } var forceFlush bool if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last { forceFlush = true @@ -897,6 +917,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) { t.Close() return connectionErrorf(true, err, "transport: %v", err) } + p = nil if t.framer.adjustNumWriters(-1) == 0 { t.framer.flushWrite() } @@ -914,6 +935,7 @@ func (t *http2Server) applySettings(ss []http2.Setting) { stream.sendQuotaPool.add(int(s.Val) - int(t.streamSendQuota)) } t.streamSendQuota = s.Val + atomic.AddUint64(&t.outQuotaVersion, 1) } }