From 0dc1a7dd670bfa6ce92934bd83aadaa1232f9b12 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Wed, 1 Mar 2017 11:03:46 -0800 Subject: [PATCH] post review updates --- test/end2end_test.go | 11 +++++++---- transport/http2_client.go | 14 ++++++++++++++ transport/transport.go | 4 ++-- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index f79b91e0..199b0cf8 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -2673,21 +2673,24 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) { const defaultMaxStreamsClient = 100 -func TestClientExceedMaxStreamsLimit(t *testing.T) { +func TestExceedDefaultMaxStreamsLimit(t *testing.T) { defer leakCheck(t)() for _, e := range listTestEnv() { - testClientExceedMaxStreamsLimit(t, e) + testExceedDefaultMaxStreamsLimit(t, e) } } -func testClientExceedMaxStreamsLimit(t *testing.T, e env) { +func testExceedDefaultMaxStreamsLimit(t *testing.T, e env) { te := newTest(t, e) te.declareLogNoise( "http2Client.notifyError got notified that the client transport was broken", "Conn.resetTransport failed to create client transport", "grpc: the connection is closing", ) - te.maxStream = 0 // Server allows infinite streams. The cap should be on client side. + // When maxStream is set to 0, the server doesn't send a settings frame for + // MaxConcurrentStreams, essentially allowing infinite (math.MaxInt32) streams. + // In such a case, there should be a default cap on the client-side.
+ te.maxStream = 0 te.startServer(&testServer{security: e.security}) defer te.tearDown() diff --git a/transport/http2_client.go b/transport/http2_client.go index 696bfa7b..001522bd 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -491,8 +491,17 @@ func (t *http2Client) CloseStream(s *Stream, err error) { return } t.mu.Unlock() + // rstStream is true in case the stream is being closed at the client-side + // and the server needs to be informed about it by sending a RST_STREAM + // frame. + // To make sure this frame is written to the wire before the headers of the + // next stream waiting for streamsQuota, we add to streamsQuota pool only + // after having acquired the writableChan to send RST_STREAM out (look at + // the controller() routine). var rstStream bool defer func() { + // In case the client doesn't have to send RST_STREAM to server + // we can safely add back to streamsQuota pool now. if !rstStream { t.streamsQuota.add(1) return } @@ -1068,6 +1077,11 @@ func (t *http2Client) controller() { t.framer.writeSettings(true, i.ss...) } case *resetStream: + // If the server needs to be informed about stream closing, + // then we need to make sure the RST_STREAM frame is written to + // the wire before the headers of the next stream waiting on + // streamsQuota. We ensure this by adding to the streamsQuota pool + // only after having acquired the writableChan to send RST_STREAM. t.streamsQuota.add(1) t.framer.writeRSTStream(true, i.streamID, i.code) case *flushIO: diff --git a/transport/transport.go b/transport/transport.go index a94337bb..34c4dbc9 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -213,8 +213,8 @@ type Stream struct { // the status received from the server. statusCode codes.Code statusDesc string - // rstStream is a flag that is true when a RST stream frame - // is sent to the server signifying that this stream is closing.
+ // rstStream indicates whether a RST_STREAM frame needs to be sent + // to the server to signify that this stream is closing. rstStream bool }