From 1e47334c738272e285e0a256d1f3f346bfdc8109 Mon Sep 17 00:00:00 2001 From: MakMukhi Date: Thu, 1 Jun 2017 09:42:07 -0700 Subject: [PATCH] Decouple transport flow control from application read. (#1265) * Decouple transport flow control from application read. * post-review update * Added comment in http2_server as well. * Added another test * Fixed typos in comments. --- transport/control.go | 8 -- transport/http2_client.go | 29 ++--- transport/http2_server.go | 29 ++--- transport/transport_test.go | 224 ++++++++++++++++++++++++++++++------ 4 files changed, 209 insertions(+), 81 deletions(-) diff --git a/transport/control.go b/transport/control.go index 68dfdd56..3db471a0 100644 --- a/transport/control.go +++ b/transport/control.go @@ -240,11 +240,3 @@ func (f *inFlow) onRead(n uint32) uint32 { } return 0 } - -func (f *inFlow) resetPendingData() uint32 { - f.mu.Lock() - defer f.mu.Unlock() - n := f.pendingData - f.pendingData = 0 - return n -} diff --git a/transport/http2_client.go b/transport/http2_client.go index 713f7622..d27199b2 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -595,11 +595,6 @@ func (t *http2Client) CloseStream(s *Stream, err error) { s.mu.Lock() rstStream = s.rstStream rstError = s.rstError - if q := s.fc.resetPendingData(); q > 0 { - if n := t.fc.onRead(q); n > 0 { - t.controlBuf.put(&windowUpdate{0, n}) - } - } if s.state == streamDone { s.mu.Unlock() return @@ -831,9 +826,6 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) { if s.state == streamDone { return } - if w := t.fc.onRead(n); w > 0 { - t.controlBuf.put(&windowUpdate{0, w}) - } if w := s.fc.onRead(n); w > 0 { t.controlBuf.put(&windowUpdate{s.id, w}) } @@ -845,22 +837,26 @@ func (t *http2Client) handleData(f *http2.DataFrame) { t.notifyError(connectionErrorf(true, err, "%v", err)) return } + // Decouple connection's flow control from application's read. + // An update on connection's flow control should not depend on + // whether user application has read the data or not. Such a + // restriction is already imposed on the stream's flow control, + // and therefore the sender will be blocked anyways. + // Decoupling the connection flow control will prevent other + // active(fast) streams from starving in presence of slow or + // inactive streams. + if w := t.fc.onRead(uint32(size)); w > 0 { + t.controlBuf.put(&windowUpdate{0, w}) + } // Select the right stream to dispatch. s, ok := t.getStream(f) if !ok { - if w := t.fc.onRead(uint32(size)); w > 0 { - t.controlBuf.put(&windowUpdate{0, w}) - } return } if size > 0 { s.mu.Lock() if s.state == streamDone { s.mu.Unlock() - // The stream has been closed. Release the corresponding quota. - if w := t.fc.onRead(uint32(size)); w > 0 { - t.controlBuf.put(&windowUpdate{0, w}) - } return } if err := s.fc.onData(uint32(size)); err != nil { @@ -872,9 +868,6 @@ func (t *http2Client) handleData(f *http2.DataFrame) { return } if f.Header().Flags.Has(http2.FlagDataPadded) { - if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 { - t.controlBuf.put(&windowUpdate{0, w}) - } if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 { t.controlBuf.put(&windowUpdate{s.id, w}) } diff --git a/transport/http2_server.go b/transport/http2_server.go index 559d28d3..130a39d5 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -462,9 +462,6 @@ func (t *http2Server) updateWindow(s *Stream, n uint32) { if s.state == streamDone { return } - if w := t.fc.onRead(n); w > 0 { - t.controlBuf.put(&windowUpdate{0, w}) - } if w := s.fc.onRead(n); w > 0 { t.controlBuf.put(&windowUpdate{s.id, w}) } @@ -477,22 +474,26 @@ func (t *http2Server) handleData(f *http2.DataFrame) { t.Close() return } + // Decouple connection's flow control from application's read. + // An update on connection's flow control should not depend on + // whether user application has read the data or not. Such a + // restriction is already imposed on the stream's flow control, + // and therefore the sender will be blocked anyways. + // Decoupling the connection flow control will prevent other + // active(fast) streams from starving in presence of slow or + // inactive streams. + if w := t.fc.onRead(uint32(size)); w > 0 { + t.controlBuf.put(&windowUpdate{0, w}) + } // Select the right stream to dispatch. s, ok := t.getStream(f) if !ok { - if w := t.fc.onRead(uint32(size)); w > 0 { - t.controlBuf.put(&windowUpdate{0, w}) - } return } if size > 0 { s.mu.Lock() if s.state == streamDone { s.mu.Unlock() - // The stream has been closed. Release the corresponding quota. - if w := t.fc.onRead(uint32(size)); w > 0 { - t.controlBuf.put(&windowUpdate{0, w}) - } return } if err := s.fc.onData(uint32(size)); err != nil { @@ -502,9 +503,6 @@ func (t *http2Server) handleData(f *http2.DataFrame) { return } if f.Header().Flags.Has(http2.FlagDataPadded) { - if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 { - t.controlBuf.put(&windowUpdate{0, w}) - } if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 { t.controlBuf.put(&windowUpdate{s.id, w}) } @@ -1066,11 +1064,6 @@ func (t *http2Server) closeStream(s *Stream) { // called to interrupt the potential blocking on other goroutines. s.cancel() s.mu.Lock() - if q := s.fc.resetPendingData(); q > 0 { - if w := t.fc.onRead(q); w > 0 { - t.controlBuf.put(&windowUpdate{0, w}) - } - } if s.state == streamDone { s.mu.Unlock() return diff --git a/transport/transport_test.go b/transport/transport_test.go index 72bd1047..ec45727c 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -481,7 +481,7 @@ func TestMaxConnectionAge(t *testing.T) { } } -// TestKeepaliveServer tests that a server closes conneciton with a client that doesn't respond to keepalive pings. +// TestKeepaliveServer tests that a server closes connection with a client that doesn't respond to keepalive pings. func TestKeepaliveServer(t *testing.T) { serverConfig := &ServerConfig{ KeepaliveParams: keepalive.ServerParameters{ @@ -1164,6 +1164,186 @@ func TestServerContextCanceledOnClosedConnection(t *testing.T) { server.stop() } +func TestClientConnDecoupledFromApplicationRead(t *testing.T) { + connectOptions := ConnectOptions{ + InitialWindowSize: defaultWindowSize, + InitialConnWindowSize: defaultWindowSize, + } + server, client := setUpWithOptions(t, 0, &ServerConfig{}, suspended, connectOptions) + defer server.stop() + defer client.Close() + + waitWhileTrue(t, func() (bool, error) { + server.mu.Lock() + defer server.mu.Unlock() + + if len(server.conns) == 0 { + return true, fmt.Errorf("timed-out while waiting for connection to be created on the server") + } + return false, nil + }) + + var st *http2Server + server.mu.Lock() + for k := range server.conns { + st = k.(*http2Server) + } + server.mu.Unlock() + cstream1, err := client.NewStream(context.Background(), &CallHdr{Flush: true}) + if err != nil { + t.Fatalf("Client failed to create first stream. Err: %v", err) + } + + var sstream1 *Stream + // Access stream on the server. + waitWhileTrue(t, func() (bool, error) { + st.mu.Lock() + defer st.mu.Unlock() + + if len(st.activeStreams) != 1 { + return true, fmt.Errorf("timed-out while waiting for server to have created a stream") + } + for _, v := range st.activeStreams { + sstream1 = v + } + return false, nil + }) + + // Exhaust client's connection window. + <-st.writableChan + if err := st.framer.writeData(true, sstream1.id, true, make([]byte, defaultWindowSize)); err != nil { + st.writableChan <- 0 + t.Fatalf("Server failed to write data. Err: %v", err) + } + st.writableChan <- 0 + // Create another stream on client. + cstream2, err := client.NewStream(context.Background(), &CallHdr{Flush: true}) + if err != nil { + t.Fatalf("Client failed to create second stream. Err: %v", err) + } + + var sstream2 *Stream + waitWhileTrue(t, func() (bool, error) { + st.mu.Lock() + defer st.mu.Unlock() + + if len(st.activeStreams) != 2 { + return true, fmt.Errorf("timed-out while waiting for server to have created the second stream") + } + for _, v := range st.activeStreams { + if v.id == cstream2.id { + sstream2 = v + } + } + if sstream2 == nil { + return true, fmt.Errorf("didn't find stream corresponding to client cstream.id: %v on the server", cstream2.id) + } + return false, nil + }) + + // Server should be able to send data on the new stream, even though the client hasn't read anything on the first stream. + <-st.writableChan + if err := st.framer.writeData(true, sstream2.id, true, make([]byte, defaultWindowSize)); err != nil { + st.writableChan <- 0 + t.Fatalf("Server failed to write data. Err: %v", err) + } + st.writableChan <- 0 + + // Client should be able to read data on second stream. + if _, err := cstream2.Read(make([]byte, defaultWindowSize)); err != nil { + t.Fatalf("_.Read(_) = _, %v, want _, ", err) + } + + // Client should be able to read data on first stream. + if _, err := cstream1.Read(make([]byte, defaultWindowSize)); err != nil { + t.Fatalf("_.Read(_) = _, %v, want _, ", err) + } +} + +func TestServerConnDecoupledFromApplicationRead(t *testing.T) { + serverConfig := &ServerConfig{ + InitialWindowSize: defaultWindowSize, + InitialConnWindowSize: defaultWindowSize, + } + server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) + defer server.stop() + defer client.Close() + waitWhileTrue(t, func() (bool, error) { + server.mu.Lock() + defer server.mu.Unlock() + + if len(server.conns) == 0 { + return true, fmt.Errorf("timed-out while waiting for connection to be created on the server") + } + return false, nil + }) + var st *http2Server + server.mu.Lock() + for k := range server.conns { + st = k.(*http2Server) + } + server.mu.Unlock() + cstream1, err := client.NewStream(context.Background(), &CallHdr{Flush: true}) + if err != nil { + t.Fatalf("Failed to create 1st stream. Err: %v", err) + } + // Exhaust server's connection window. + if err := client.Write(cstream1, make([]byte, defaultWindowSize), &Options{Last: true}); err != nil { + t.Fatalf("Client failed to write data. Err: %v", err) + } + //Client should be able to create another stream and send data on it. + cstream2, err := client.NewStream(context.Background(), &CallHdr{Flush: true}) + if err != nil { + t.Fatalf("Failed to create 2nd stream. Err: %v", err) + } + if err := client.Write(cstream2, make([]byte, defaultWindowSize), &Options{}); err != nil { + t.Fatalf("Client failed to write data. Err: %v", err) + } + // Get the streams on server. + waitWhileTrue(t, func() (bool, error) { + st.mu.Lock() + defer st.mu.Unlock() + + if len(st.activeStreams) != 2 { + return true, fmt.Errorf("timed-out while waiting for server to have created the streams") + } + return false, nil + }) + var sstream1 *Stream + st.mu.Lock() + for _, v := range st.activeStreams { + if v.id == 1 { + sstream1 = v + } + } + st.mu.Unlock() + // Trying to write more on a max-ed out stream should result in a RST_STREAM from the server. + ct := client.(*http2Client) + <-ct.writableChan + if err := ct.framer.writeData(true, cstream2.id, true, make([]byte, 1)); err != nil { + t.Fatalf("Client failed to write. Err: %v", err) + } + ct.writableChan <- 0 + code := http2ErrConvTab[http2.ErrCodeFlowControl] + waitWhileTrue(t, func() (bool, error) { + cstream2.mu.Lock() + defer cstream2.mu.Unlock() + if cstream2.status.Code() != code { + return true, fmt.Errorf("want code = %v, got %v", code, cstream2.status.Code()) + } + return false, nil + }) + // Reading from the stream on server should succeed. + if _, err := sstream1.Read(make([]byte, defaultWindowSize)); err != nil { + t.Fatalf("_.Read(_) = %v, want ", err) + } + + if _, err := sstream1.Read(make([]byte, 1)); err != io.EOF { + t.Fatalf("_.Read(_) = %v, want io.EOF", err) + } + +} + func TestServerWithMisbehavedClient(t *testing.T) { server, ct := setUp(t, 0, math.MaxUint32, suspended) callHdr := &CallHdr{ @@ -1224,7 +1404,7 @@ func TestServerWithMisbehavedClient(t *testing.T) { } ss.fc.mu.Unlock() } - if ss.fc.pendingData != http2MaxFrameLen || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != http2MaxFrameLen || sc.fc.pendingUpdate != 0 { + if ss.fc.pendingData != http2MaxFrameLen || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate != http2MaxFrameLen { t.Fatalf("Server mistakenly updates inbound flow control params: got %d, %d, %d, %d; want %d, %d, %d, %d", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate, http2MaxFrameLen, 0, http2MaxFrameLen, 0) } // Keep sending until the server inbound window is drained for that stream. @@ -1245,24 +1425,10 @@ func TestServerWithMisbehavedClient(t *testing.T) { t.Fatalf("%v got status %v; want Code=%v", s, s.status, code) } - if ss.fc.pendingData != 0 || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate <= initialWindowSize { - t.Fatalf("Server mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, >%d", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate, initialWindowSize) + if sc.fc.pendingData != 0 || sc.fc.pendingUpdate <= initialWindowSize { + t.Fatalf("Server mistakenly resets inbound flow control params: got %d, %d; want 0, >%d", sc.fc.pendingData, sc.fc.pendingUpdate, initialWindowSize) } ct.CloseStream(s, nil) - // Test server behavior for violation of connection flow control window size restriction. - // - // Keep creating new streams until the connection window is drained on the server and - // the server tears down the connection. - for { - s, err := ct.NewStream(context.Background(), callHdr) - if err != nil { - // The server tears down the connection. - break - } - <-cc.writableChan - cc.framer.writeData(true, s.id, true, make([]byte, http2MaxFrameLen)) - cc.writableChan <- 0 - } ct.Close() server.stop() } @@ -1293,7 +1459,7 @@ func TestClientWithMisbehavedServer(t *testing.T) { break } } - if s.fc.pendingData <= initialWindowSize || s.fc.pendingUpdate != 0 || conn.fc.pendingData <= initialWindowSize || conn.fc.pendingUpdate != 0 { + if s.fc.pendingData <= initialWindowSize || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate <= initialWindowSize { t.Fatalf("Client mistakenly updates inbound flow control params: got %d, %d, %d, %d; want >%d, %d, >%d, %d", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize, 0, initialWindowSize, 0) } @@ -1305,25 +1471,9 @@ func TestClientWithMisbehavedServer(t *testing.T) { } conn.CloseStream(s, err) - if s.fc.pendingData != 0 || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate <= initialWindowSize { - t.Fatalf("Client mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, >%d", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize) + if conn.fc.pendingData != 0 || conn.fc.pendingUpdate <= initialWindowSize { + t.Fatalf("Client mistakenly resets inbound flow control params: got %d, %d; want 0, >%d", conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize) } - // Test the logic for the violation of the connection flow control window size restriction. - // - // Generate enough streams to drain the connection window. Make the server flood the traffic - // to violate flow control window size of the connection. - callHdr.Method = "foo.Connection" - for i := 0; i < int(initialConnWindowSize/initialWindowSize+10); i++ { - s, err := ct.NewStream(context.Background(), callHdr) - if err != nil { - break - } - if err := ct.Write(s, d, &Options{Last: true, Delay: false}); err != nil { - break - } - } - // http2Client.errChan is closed due to connection flow control window size violation. - <-conn.Error() ct.Close() server.stop() }