From ccdc150c372bbca0ecf9341942bce43f29ff46b2 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Tue, 12 Apr 2016 11:06:27 -0700 Subject: [PATCH] Fix window update counting for the canceled streams --- transport/control.go | 19 +++++++++++++++++++ transport/http2_client.go | 3 +++ transport/http2_server.go | 3 +++ 3 files changed, 25 insertions(+) diff --git a/transport/control.go b/transport/control.go index c99f363e..4c4eeb48 100644 --- a/transport/control.go +++ b/transport/control.go @@ -196,6 +196,25 @@ func (f *inFlow) onData(n uint32) error { return nil } +// adjustConnPendingUpdate increments the connection level pending updates by n. +// This is called to make the proper connection level window updates when +// receiving data frame targeting the canceled RPCs. +func (f *inFlow) adjustConnPendingUpdate(n uint32) uint32 { + if n == 0 || f.conn != nil { + return 0 + } + f.mu.Lock() + defer f.mu.Unlock() + f.pendingUpdate += n + if f.pendingUpdate >= f.limit/4 { + ret := f.pendingUpdate + f.pendingUpdate = 0 + return ret + } + return 0 + +} + // connOnRead updates the connection level states when the application consumes data. func (f *inFlow) connOnRead(n uint32) uint32 { if n == 0 || f.conn != nil { diff --git a/transport/http2_client.go b/transport/http2_client.go index 5d4a8c4b..cd18f9d1 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -573,6 +573,9 @@ func (t *http2Client) handleData(f *http2.DataFrame) { // Select the right stream to dispatch. s, ok := t.getStream(f) if !ok { + if cwu := t.fc.adjustConnPendingUpdate(uint32(len(f.Data()))); cwu > 0 { + t.controlBuf.put(&windowUpdate{0, cwu}) + } return } size := len(f.Data()) diff --git a/transport/http2_server.go b/transport/http2_server.go index 03164236..335872a6 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -320,6 +320,9 @@ func (t *http2Server) handleData(f *http2.DataFrame) { // Select the right stream to dispatch. s, ok := t.getStream(f) if !ok { + if cwu := t.fc.adjustConnPendingUpdate(uint32(len(f.Data()))); cwu > 0 { + t.controlBuf.put(&windowUpdate{0, cwu}) + } return } size := len(f.Data())