Validate send quota again after acquiring writable channel (#1367)
* Validate send qouta again after acquring writable channel. * Debug * increase version only after settings have been updated * clean-up
This commit is contained in:
@ -98,7 +98,8 @@ type http2Client struct {
|
||||
|
||||
initialWindowSize int32
|
||||
|
||||
bdpEst *bdpEstimator
|
||||
bdpEst *bdpEstimator
|
||||
outQuotaVersion uint64
|
||||
|
||||
mu sync.Mutex // guard the following variables
|
||||
state transportState // the state of underlying connection
|
||||
@ -679,9 +680,13 @@ func (t *http2Client) GracefulClose() error {
|
||||
// if it improves the performance.
|
||||
func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
|
||||
r := bytes.NewBuffer(data)
|
||||
var (
|
||||
p []byte
|
||||
oqv uint64
|
||||
)
|
||||
for {
|
||||
var p []byte
|
||||
if r.Len() > 0 {
|
||||
oqv = atomic.LoadUint64(&t.outQuotaVersion)
|
||||
if r.Len() > 0 || p != nil {
|
||||
size := http2MaxFrameLen
|
||||
// Wait until the stream has some quota to send the data.
|
||||
sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire())
|
||||
@ -699,7 +704,9 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
|
||||
if tq < size {
|
||||
size = tq
|
||||
}
|
||||
p = r.Next(size)
|
||||
if p == nil {
|
||||
p = r.Next(size)
|
||||
}
|
||||
ps := len(p)
|
||||
if ps < sq {
|
||||
// Overbooked stream quota. Return it back.
|
||||
@ -744,6 +751,18 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
|
||||
return ContextErr(s.ctx.Err())
|
||||
default:
|
||||
}
|
||||
if oqv != atomic.LoadUint64(&t.outQuotaVersion) {
|
||||
// InitialWindowSize settings frame must have been received after we
|
||||
// acquired send quota but before we got the writable channel.
|
||||
// We must forsake this write.
|
||||
t.sendQuotaPool.add(len(p))
|
||||
s.sendQuotaPool.add(len(p))
|
||||
if t.framer.adjustNumWriters(-1) == 0 {
|
||||
t.controlBuf.put(&flushIO{})
|
||||
}
|
||||
t.writableChan <- 0
|
||||
continue
|
||||
}
|
||||
if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 {
|
||||
// Do a force flush iff this is last frame for the entire gRPC message
|
||||
// and the caller is the only writer at this moment.
|
||||
@ -756,6 +775,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
|
||||
t.notifyError(err)
|
||||
return connectionErrorf(true, err, "transport: %v", err)
|
||||
}
|
||||
p = nil
|
||||
if t.framer.adjustNumWriters(-1) == 0 {
|
||||
t.framer.flushWrite()
|
||||
}
|
||||
@ -1216,6 +1236,7 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
|
||||
}
|
||||
t.streamSendQuota = s.Val
|
||||
t.mu.Unlock()
|
||||
atomic.AddUint64(&t.outQuotaVersion, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -99,7 +99,8 @@ type http2Server struct {
|
||||
|
||||
initialWindowSize int32
|
||||
|
||||
bdpEst *bdpEstimator
|
||||
bdpEst *bdpEstimator
|
||||
outQuotaVersion uint64
|
||||
|
||||
mu sync.Mutex // guard the following
|
||||
state transportState
|
||||
@ -828,10 +829,15 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
|
||||
t.WriteHeader(s, nil)
|
||||
}
|
||||
r := bytes.NewBuffer(data)
|
||||
var (
|
||||
p []byte
|
||||
oqv uint64
|
||||
)
|
||||
for {
|
||||
if r.Len() == 0 {
|
||||
if r.Len() == 0 && p == nil {
|
||||
return nil
|
||||
}
|
||||
oqv = atomic.LoadUint64(&t.outQuotaVersion)
|
||||
size := http2MaxFrameLen
|
||||
// Wait until the stream has some quota to send the data.
|
||||
sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire())
|
||||
@ -849,7 +855,9 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
|
||||
if tq < size {
|
||||
size = tq
|
||||
}
|
||||
p := r.Next(size)
|
||||
if p == nil {
|
||||
p = r.Next(size)
|
||||
}
|
||||
ps := len(p)
|
||||
if ps < sq {
|
||||
// Overbooked stream quota. Return it back.
|
||||
@ -886,6 +894,18 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
|
||||
return ContextErr(s.ctx.Err())
|
||||
default:
|
||||
}
|
||||
if oqv != atomic.LoadUint64(&t.outQuotaVersion) {
|
||||
// InitialWindowSize settings frame must have been received after we
|
||||
// acquired send quota but before we got the writable channel.
|
||||
// We must forsake this write.
|
||||
t.sendQuotaPool.add(ps)
|
||||
s.sendQuotaPool.add(ps)
|
||||
if t.framer.adjustNumWriters(-1) == 0 {
|
||||
t.controlBuf.put(&flushIO{})
|
||||
}
|
||||
t.writableChan <- 0
|
||||
continue
|
||||
}
|
||||
var forceFlush bool
|
||||
if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last {
|
||||
forceFlush = true
|
||||
@ -897,6 +917,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
|
||||
t.Close()
|
||||
return connectionErrorf(true, err, "transport: %v", err)
|
||||
}
|
||||
p = nil
|
||||
if t.framer.adjustNumWriters(-1) == 0 {
|
||||
t.framer.flushWrite()
|
||||
}
|
||||
@ -914,6 +935,7 @@ func (t *http2Server) applySettings(ss []http2.Setting) {
|
||||
stream.sendQuotaPool.add(int(s.Val) - int(t.streamSendQuota))
|
||||
}
|
||||
t.streamSendQuota = s.Val
|
||||
atomic.AddUint64(&t.outQuotaVersion, 1)
|
||||
}
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user