diff --git a/transport/control.go b/transport/control.go index c99f363e..4c4eeb48 100644 --- a/transport/control.go +++ b/transport/control.go @@ -196,6 +196,25 @@ func (f *inFlow) onData(n uint32) error { return nil } +// adjustConnPendingUpdate increments the connection level pending updates by n. +// This is called to make the proper connection level window updates when +// receiving data frame targeting the canceled RPCs. +func (f *inFlow) adjustConnPendingUpdate(n uint32) uint32 { + if n == 0 || f.conn != nil { + return 0 + } + f.mu.Lock() + defer f.mu.Unlock() + f.pendingUpdate += n + if f.pendingUpdate >= f.limit/4 { + ret := f.pendingUpdate + f.pendingUpdate = 0 + return ret + } + return 0 + +} + // connOnRead updates the connection level states when the application consumes data. func (f *inFlow) connOnRead(n uint32) uint32 { if n == 0 || f.conn != nil { diff --git a/transport/http2_client.go b/transport/http2_client.go index 5d4a8c4b..cd18f9d1 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -573,6 +573,9 @@ func (t *http2Client) handleData(f *http2.DataFrame) { // Select the right stream to dispatch. s, ok := t.getStream(f) if !ok { + if cwu := t.fc.adjustConnPendingUpdate(uint32(len(f.Data()))); cwu > 0 { + t.controlBuf.put(&windowUpdate{0, cwu}) + } return } size := len(f.Data()) diff --git a/transport/http2_server.go b/transport/http2_server.go index 03164236..335872a6 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -320,6 +320,9 @@ func (t *http2Server) handleData(f *http2.DataFrame) { // Select the right stream to dispatch. s, ok := t.getStream(f) if !ok { + if cwu := t.fc.adjustConnPendingUpdate(uint32(len(f.Data()))); cwu > 0 { + t.controlBuf.put(&windowUpdate{0, cwu}) + } return } size := len(f.Data())