Decouple transport flow control from application read. (#1265)

* Decouple transport flow control from application read.

* post-review update

* Added comment in http2_server as well.

* Added another test

* Fixed typos in comments.
This commit is contained in:
MakMukhi
2017-06-01 09:42:07 -07:00
committed by GitHub
parent a113590521
commit 1e47334c73
4 changed files with 209 additions and 81 deletions

View File

@ -240,11 +240,3 @@ func (f *inFlow) onRead(n uint32) uint32 {
}
return 0
}
func (f *inFlow) resetPendingData() uint32 {
f.mu.Lock()
defer f.mu.Unlock()
n := f.pendingData
f.pendingData = 0
return n
}

View File

@ -595,11 +595,6 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
s.mu.Lock()
rstStream = s.rstStream
rstError = s.rstError
if q := s.fc.resetPendingData(); q > 0 {
if n := t.fc.onRead(q); n > 0 {
t.controlBuf.put(&windowUpdate{0, n})
}
}
if s.state == streamDone {
s.mu.Unlock()
return
@ -831,9 +826,6 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
if s.state == streamDone {
return
}
if w := t.fc.onRead(n); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
if w := s.fc.onRead(n); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w})
}
@ -845,22 +837,26 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
t.notifyError(connectionErrorf(true, err, "%v", err))
return
}
// Decouple connection's flow control from application's read.
// An update on connection's flow control should not depend on
// whether user application has read the data or not. Such a
// restriction is already imposed on the stream's flow control,
// and therefore the sender will be blocked anyways.
// Decoupling the connection flow control will prevent other
// active(fast) streams from starving in presence of slow or
// inactive streams.
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
// Select the right stream to dispatch.
s, ok := t.getStream(f)
if !ok {
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
return
}
if size > 0 {
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
// The stream has been closed. Release the corresponding quota.
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
return
}
if err := s.fc.onData(uint32(size)); err != nil {
@ -872,9 +868,6 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w})
}

View File

@ -462,9 +462,6 @@ func (t *http2Server) updateWindow(s *Stream, n uint32) {
if s.state == streamDone {
return
}
if w := t.fc.onRead(n); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
if w := s.fc.onRead(n); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w})
}
@ -477,22 +474,26 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
t.Close()
return
}
// Decouple connection's flow control from application's read.
// An update on connection's flow control should not depend on
// whether user application has read the data or not. Such a
// restriction is already imposed on the stream's flow control,
// and therefore the sender will be blocked anyways.
// Decoupling the connection flow control will prevent other
// active(fast) streams from starving in presence of slow or
// inactive streams.
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
// Select the right stream to dispatch.
s, ok := t.getStream(f)
if !ok {
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
return
}
if size > 0 {
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
// The stream has been closed. Release the corresponding quota.
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
return
}
if err := s.fc.onData(uint32(size)); err != nil {
@ -502,9 +503,6 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w})
}
@ -1066,11 +1064,6 @@ func (t *http2Server) closeStream(s *Stream) {
// called to interrupt the potential blocking on other goroutines.
s.cancel()
s.mu.Lock()
if q := s.fc.resetPendingData(); q > 0 {
if w := t.fc.onRead(q); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
}
if s.state == streamDone {
s.mu.Unlock()
return

View File

@ -481,7 +481,7 @@ func TestMaxConnectionAge(t *testing.T) {
}
}
// TestKeepaliveServer tests that a server closes conneciton with a client that doesn't respond to keepalive pings.
// TestKeepaliveServer tests that a server closes connection with a client that doesn't respond to keepalive pings.
func TestKeepaliveServer(t *testing.T) {
serverConfig := &ServerConfig{
KeepaliveParams: keepalive.ServerParameters{
@ -1164,6 +1164,186 @@ func TestServerContextCanceledOnClosedConnection(t *testing.T) {
server.stop()
}
func TestClientConnDecoupledFromApplicationRead(t *testing.T) {
connectOptions := ConnectOptions{
InitialWindowSize: defaultWindowSize,
InitialConnWindowSize: defaultWindowSize,
}
server, client := setUpWithOptions(t, 0, &ServerConfig{}, suspended, connectOptions)
defer server.stop()
defer client.Close()
waitWhileTrue(t, func() (bool, error) {
server.mu.Lock()
defer server.mu.Unlock()
if len(server.conns) == 0 {
return true, fmt.Errorf("timed-out while waiting for connection to be created on the server")
}
return false, nil
})
var st *http2Server
server.mu.Lock()
for k := range server.conns {
st = k.(*http2Server)
}
server.mu.Unlock()
cstream1, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
if err != nil {
t.Fatalf("Client failed to create first stream. Err: %v", err)
}
var sstream1 *Stream
// Access stream on the server.
waitWhileTrue(t, func() (bool, error) {
st.mu.Lock()
defer st.mu.Unlock()
if len(st.activeStreams) != 1 {
return true, fmt.Errorf("timed-out while waiting for server to have created a stream")
}
for _, v := range st.activeStreams {
sstream1 = v
}
return false, nil
})
// Exhaust client's connection window.
<-st.writableChan
if err := st.framer.writeData(true, sstream1.id, true, make([]byte, defaultWindowSize)); err != nil {
st.writableChan <- 0
t.Fatalf("Server failed to write data. Err: %v", err)
}
st.writableChan <- 0
// Create another stream on client.
cstream2, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
if err != nil {
t.Fatalf("Client failed to create second stream. Err: %v", err)
}
var sstream2 *Stream
waitWhileTrue(t, func() (bool, error) {
st.mu.Lock()
defer st.mu.Unlock()
if len(st.activeStreams) != 2 {
return true, fmt.Errorf("timed-out while waiting for server to have created the second stream")
}
for _, v := range st.activeStreams {
if v.id == cstream2.id {
sstream2 = v
}
}
if sstream2 == nil {
return true, fmt.Errorf("didn't find stream corresponding to client cstream.id: %v on the server", cstream2.id)
}
return false, nil
})
// Server should be able to send data on the new stream, even though the client hasn't read anything on the first stream.
<-st.writableChan
if err := st.framer.writeData(true, sstream2.id, true, make([]byte, defaultWindowSize)); err != nil {
st.writableChan <- 0
t.Fatalf("Server failed to write data. Err: %v", err)
}
st.writableChan <- 0
// Client should be able to read data on second stream.
if _, err := cstream2.Read(make([]byte, defaultWindowSize)); err != nil {
t.Fatalf("_.Read(_) = _, %v, want _, <nil>", err)
}
// Client should be able to read data on first stream.
if _, err := cstream1.Read(make([]byte, defaultWindowSize)); err != nil {
t.Fatalf("_.Read(_) = _, %v, want _, <nil>", err)
}
}
func TestServerConnDecoupledFromApplicationRead(t *testing.T) {
serverConfig := &ServerConfig{
InitialWindowSize: defaultWindowSize,
InitialConnWindowSize: defaultWindowSize,
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer server.stop()
defer client.Close()
waitWhileTrue(t, func() (bool, error) {
server.mu.Lock()
defer server.mu.Unlock()
if len(server.conns) == 0 {
return true, fmt.Errorf("timed-out while waiting for connection to be created on the server")
}
return false, nil
})
var st *http2Server
server.mu.Lock()
for k := range server.conns {
st = k.(*http2Server)
}
server.mu.Unlock()
cstream1, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
if err != nil {
t.Fatalf("Failed to create 1st stream. Err: %v", err)
}
// Exhaust server's connection window.
if err := client.Write(cstream1, make([]byte, defaultWindowSize), &Options{Last: true}); err != nil {
t.Fatalf("Client failed to write data. Err: %v", err)
}
//Client should be able to create another stream and send data on it.
cstream2, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
if err != nil {
t.Fatalf("Failed to create 2nd stream. Err: %v", err)
}
if err := client.Write(cstream2, make([]byte, defaultWindowSize), &Options{}); err != nil {
t.Fatalf("Client failed to write data. Err: %v", err)
}
// Get the streams on server.
waitWhileTrue(t, func() (bool, error) {
st.mu.Lock()
defer st.mu.Unlock()
if len(st.activeStreams) != 2 {
return true, fmt.Errorf("timed-out while waiting for server to have created the streams")
}
return false, nil
})
var sstream1 *Stream
st.mu.Lock()
for _, v := range st.activeStreams {
if v.id == 1 {
sstream1 = v
}
}
st.mu.Unlock()
// Trying to write more on a max-ed out stream should result in a RST_STREAM from the server.
ct := client.(*http2Client)
<-ct.writableChan
if err := ct.framer.writeData(true, cstream2.id, true, make([]byte, 1)); err != nil {
t.Fatalf("Client failed to write. Err: %v", err)
}
ct.writableChan <- 0
code := http2ErrConvTab[http2.ErrCodeFlowControl]
waitWhileTrue(t, func() (bool, error) {
cstream2.mu.Lock()
defer cstream2.mu.Unlock()
if cstream2.status.Code() != code {
return true, fmt.Errorf("want code = %v, got %v", code, cstream2.status.Code())
}
return false, nil
})
// Reading from the stream on server should succeed.
if _, err := sstream1.Read(make([]byte, defaultWindowSize)); err != nil {
t.Fatalf("_.Read(_) = %v, want <nil>", err)
}
if _, err := sstream1.Read(make([]byte, 1)); err != io.EOF {
t.Fatalf("_.Read(_) = %v, want io.EOF", err)
}
}
func TestServerWithMisbehavedClient(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, suspended)
callHdr := &CallHdr{
@ -1224,7 +1404,7 @@ func TestServerWithMisbehavedClient(t *testing.T) {
}
ss.fc.mu.Unlock()
}
if ss.fc.pendingData != http2MaxFrameLen || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != http2MaxFrameLen || sc.fc.pendingUpdate != 0 {
if ss.fc.pendingData != http2MaxFrameLen || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate != http2MaxFrameLen {
t.Fatalf("Server mistakenly updates inbound flow control params: got %d, %d, %d, %d; want %d, %d, %d, %d", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate, http2MaxFrameLen, 0, http2MaxFrameLen, 0)
}
// Keep sending until the server inbound window is drained for that stream.
@ -1245,24 +1425,10 @@ func TestServerWithMisbehavedClient(t *testing.T) {
t.Fatalf("%v got status %v; want Code=%v", s, s.status, code)
}
if ss.fc.pendingData != 0 || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate <= initialWindowSize {
t.Fatalf("Server mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, >%d", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate, initialWindowSize)
if sc.fc.pendingData != 0 || sc.fc.pendingUpdate <= initialWindowSize {
t.Fatalf("Server mistakenly resets inbound flow control params: got %d, %d; want 0, >%d", sc.fc.pendingData, sc.fc.pendingUpdate, initialWindowSize)
}
ct.CloseStream(s, nil)
// Test server behavior for violation of connection flow control window size restriction.
//
// Keep creating new streams until the connection window is drained on the server and
// the server tears down the connection.
for {
s, err := ct.NewStream(context.Background(), callHdr)
if err != nil {
// The server tears down the connection.
break
}
<-cc.writableChan
cc.framer.writeData(true, s.id, true, make([]byte, http2MaxFrameLen))
cc.writableChan <- 0
}
ct.Close()
server.stop()
}
@ -1293,7 +1459,7 @@ func TestClientWithMisbehavedServer(t *testing.T) {
break
}
}
if s.fc.pendingData <= initialWindowSize || s.fc.pendingUpdate != 0 || conn.fc.pendingData <= initialWindowSize || conn.fc.pendingUpdate != 0 {
if s.fc.pendingData <= initialWindowSize || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate <= initialWindowSize {
t.Fatalf("Client mistakenly updates inbound flow control params: got %d, %d, %d, %d; want >%d, %d, >%d, %d", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize, 0, initialWindowSize, 0)
}
@ -1305,25 +1471,9 @@ func TestClientWithMisbehavedServer(t *testing.T) {
}
conn.CloseStream(s, err)
if s.fc.pendingData != 0 || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate <= initialWindowSize {
t.Fatalf("Client mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, >%d", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize)
if conn.fc.pendingData != 0 || conn.fc.pendingUpdate <= initialWindowSize {
t.Fatalf("Client mistakenly resets inbound flow control params: got %d, %d; want 0, >%d", conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize)
}
// Test the logic for the violation of the connection flow control window size restriction.
//
// Generate enough streams to drain the connection window. Make the server flood the traffic
// to violate flow control window size of the connection.
callHdr.Method = "foo.Connection"
for i := 0; i < int(initialConnWindowSize/initialWindowSize+10); i++ {
s, err := ct.NewStream(context.Background(), callHdr)
if err != nil {
break
}
if err := ct.Write(s, d, &Options{Last: true, Delay: false}); err != nil {
break
}
}
// http2Client.errChan is closed due to connection flow control window size violation.
<-conn.Error()
ct.Close()
server.stop()
}