diff --git a/transport/.control.go.swp b/transport/.control.go.swp new file mode 100644 index 00000000..1d0a4f36 Binary files /dev/null and b/transport/.control.go.swp differ diff --git a/transport/.http2_server.go.swp b/transport/.http2_server.go.swp new file mode 100644 index 00000000..b51fa10c Binary files /dev/null and b/transport/.http2_server.go.swp differ diff --git a/transport/.transport.go.swp b/transport/.transport.go.swp new file mode 100644 index 00000000..785feb91 Binary files /dev/null and b/transport/.transport.go.swp differ diff --git a/transport/control.go b/transport/control.go index fc7806a8..c00ee4b3 100644 --- a/transport/control.go +++ b/transport/control.go @@ -164,12 +164,10 @@ type inFlow struct { mu sync.Mutex // pendingData is the overall data which have been received but not been - // fully consumed (either pending for application to read or pending for - // window update). + // consumed by applications. pendingData uint32 // The amount of data the application has consumed but grpc has not sent - // window update for them. Used to reduce window update frequency. It is - // always part of pendingData. + // window update for them. Used to reduce window update frequency. pendingUpdate uint32 } @@ -181,8 +179,8 @@ func (f *inFlow) onData(n uint32) error { } f.mu.Lock() defer f.mu.Unlock() - if f.pendingData+n > f.limit { - return fmt.Errorf("recieved %d-bytes data exceeding the limit %d bytes", f.pendingData+n, f.limit) + if f.pendingData+f.pendingUpdate+n > f.limit { + return fmt.Errorf("recieved %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate+n, f.limit) } if f.conn != nil { if err := f.conn.onData(n); err != nil { @@ -196,14 +194,18 @@ func (f *inFlow) onData(n uint32) error { // onRead is invoked when the application reads the data. func (f *inFlow) onRead(n uint32) uint32 { if n == 0 { - return 0 + return 0 } f.mu.Lock() defer f.mu.Unlock() + if f.pendingData == 0 { + // pendingData has been adjusted by restoreConn. + return 0 + } + f.pendingData -= n f.pendingUpdate += n if f.pendingUpdate >= f.limit/4 { ret := f.pendingUpdate - f.pendingData -= ret f.pendingUpdate = 0 return ret } @@ -218,14 +220,8 @@ func (f *inFlow) restoreConn() uint32 { } f.mu.Lock() defer f.mu.Unlock() - ret := f.pendingData - f.conn.mu.Lock() - f.conn.pendingData -= ret - if f.conn.pendingUpdate > f.conn.pendingData { - f.conn.pendingUpdate = f.conn.pendingData - } - f.conn.mu.Unlock() + n := f.pendingData f.pendingData = 0 f.pendingUpdate = 0 - return ret + return f.conn.onRead(n) } diff --git a/transport/http2_client.go b/transport/http2_client.go index b9cef877..4803390d 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -479,12 +479,12 @@ func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) { // Window updates will deliver to the controller for sending when // the cumulative quota exceeds the corresponding threshold. func (t *http2Client) updateWindow(s *Stream, n uint32) { - if q := t.fc.onRead(n); q > 0 { - t.controlBuf.put(&windowUpdate{0, q}) - } if q := s.fc.onRead(n); q > 0 { t.controlBuf.put(&windowUpdate{s.id, q}) } + if q := t.fc.onRead(n); q > 0 { + t.controlBuf.put(&windowUpdate{0, q}) + } } func (t *http2Client) handleData(f *http2.DataFrame) { diff --git a/transport/http2_server.go b/transport/http2_server.go index a6576584..21e487a0 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -305,12 +305,12 @@ func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) { // Window updates will deliver to the controller for sending when // the cumulative quota exceeds the corresponding threshold. func (t *http2Server) updateWindow(s *Stream, n uint32) { - if q := t.fc.onRead(n); q > 0 { - t.controlBuf.put(&windowUpdate{0, q}) - } if q := s.fc.onRead(n); q > 0 { t.controlBuf.put(&windowUpdate{s.id, q}) } + if q := t.fc.onRead(n); q > 0 { + t.controlBuf.put(&windowUpdate{0, q}) + } } func (t *http2Server) handleData(f *http2.DataFrame) { diff --git a/transport/transport.go b/transport/transport.go index 35d31b71..f6b80738 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -199,7 +199,6 @@ type Stream struct { // the status received from the server. statusCode codes.Code statusDesc string - pendingData uint32 } // Header acquires the key-value pairs of header metadata once it diff --git a/transport/transport_test.go b/transport/transport_test.go index 70343396..7559cf8f 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -489,11 +489,11 @@ func TestServerWithMisbehavedClient(t *testing.T) { // Keep sending until the server inbound window is drained for that stream. for sent <= initialWindowSize { <-cc.writableChan - if err = cc.framer.writeData(true, s.id, false, make([]byte, http2MaxFrameLen)); err != nil { + if err = cc.framer.writeData(true, s.id, false, make([]byte, 1)); err != nil { t.Fatalf("Failed to write data: ", err) } cc.writableChan <- 0 - sent += http2MaxFrameLen + sent += 1 } // Server sent a resetStream for s already. code := http2RSTErrConvTab[http2.ErrCodeFlowControl] @@ -501,8 +501,8 @@ func TestServerWithMisbehavedClient(t *testing.T) { t.Fatalf("%v got err %v with statusCode %d, want err with statusCode %d", s, err, s.statusCode, code) } - if ss.fc.pendingData != 0 || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate != 0 { - t.Fatalf("Server mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, 0", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate) + if ss.fc.pendingData != 0 || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate != initialWindowSize { + t.Fatalf("Server mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, %d", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate, initialWindowSize) } ct.CloseStream(s, nil) // Test server behavior for violation of connection flow control window size restriction. @@ -512,14 +512,11 @@ func TestServerWithMisbehavedClient(t *testing.T) { for { s, err := ct.NewStream(context.Background(), callHdr) if err != nil { - t.Fatalf("Failed to open stream: %v", err) - } - <-cc.writableChan - // Write will fail when connection flow control window runs out. - if err := cc.framer.writeData(true, s.id, true, make([]byte, http2MaxFrameLen)); err != nil { // The server tears down the connection. break } + <-cc.writableChan + cc.framer.writeData(true, s.id, true, make([]byte, http2MaxFrameLen)) cc.writableChan <- 0 } ct.Close() @@ -558,8 +555,8 @@ func TestClientWithMisbehavedServer(t *testing.T) { t.Fatalf("Got err %v and the status code %d, want and the code %d", err, s.statusCode, codes.Internal) } conn.CloseStream(s, err) - if s.fc.pendingData != 0 || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate != 0 { - t.Fatalf("Client mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, 0", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate) + if s.fc.pendingData != 0 || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate != initialWindowSize { + t.Fatalf("Client mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, %d", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize) } // Test the logic for the violation of the connection flow control window size restriction. //