post review updates

This commit is contained in:
Mahak Mukhi
2017-03-01 11:03:46 -08:00
parent f28d487753
commit 0dc1a7dd67
3 changed files with 23 additions and 6 deletions

View File

@ -2673,21 +2673,24 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) {
const defaultMaxStreamsClient = 100 const defaultMaxStreamsClient = 100
func TestClientExceedMaxStreamsLimit(t *testing.T) { func TestExceedDefaultMaxStreamsLimit(t *testing.T) {
defer leakCheck(t)() defer leakCheck(t)()
for _, e := range listTestEnv() { for _, e := range listTestEnv() {
testClientExceedMaxStreamsLimit(t, e) testExceedDefaultMaxStreamsLimit(t, e)
} }
} }
func testClientExceedMaxStreamsLimit(t *testing.T, e env) { func testExceedDefaultMaxStreamsLimit(t *testing.T, e env) {
te := newTest(t, e) te := newTest(t, e)
te.declareLogNoise( te.declareLogNoise(
"http2Client.notifyError got notified that the client transport was broken", "http2Client.notifyError got notified that the client transport was broken",
"Conn.resetTransport failed to create client transport", "Conn.resetTransport failed to create client transport",
"grpc: the connection is closing", "grpc: the connection is closing",
) )
te.maxStream = 0 // Server allows infinite streams. The cap should be on client side. // When masStream is set to 0 the server doesn't send a settings frame for
// MaxConcurrentStreams, essentially allowing infinite (math.MaxInt32) streams.
// In such a case, there should be a default cap on the client-side.
te.maxStream = 0
te.startServer(&testServer{security: e.security}) te.startServer(&testServer{security: e.security})
defer te.tearDown() defer te.tearDown()

View File

@ -491,8 +491,17 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
return return
} }
t.mu.Unlock() t.mu.Unlock()
// rstStream is true in case the stream is being closed at the client-side
// and the server needs to be intimated about it by sending a RST_STREAM
// frame.
// To make sure this frame is written to the wire before the headers of the
// next stream waiting for streamsQuota, we add to streamsQuota pool only
// after having acquired the writableChan to send RST_STREAM out (look at
// the controller() routine).
var rstStream bool var rstStream bool
defer func() { defer func() {
// In case, the client doesn't have to send RST_STREAM to server
// we can safely add back to streamsQuota pool now.
if !rstStream { if !rstStream {
t.streamsQuota.add(1) t.streamsQuota.add(1)
return return
@ -1068,6 +1077,11 @@ func (t *http2Client) controller() {
t.framer.writeSettings(true, i.ss...) t.framer.writeSettings(true, i.ss...)
} }
case *resetStream: case *resetStream:
// If the server needs to be to intimated about stream closing,
// then we need to make sure the RST_STREAM frame is written to
// the wire before the headers of the next stream waiting on
// streamQuota. We ensure this by adding to the streamsQuota pool
// only after having acquired the writableChan to send RST_STREAM.
t.streamsQuota.add(1) t.streamsQuota.add(1)
t.framer.writeRSTStream(true, i.streamID, i.code) t.framer.writeRSTStream(true, i.streamID, i.code)
case *flushIO: case *flushIO:

View File

@ -213,8 +213,8 @@ type Stream struct {
// the status received from the server. // the status received from the server.
statusCode codes.Code statusCode codes.Code
statusDesc string statusDesc string
// rstStream is a flag that is true when a RST stream frame // rstStream indicates whether a RST_STREAM frame needs to be sent
// is sent to the server signifying that this stream is closing. // to the server to signify that this stream is closing.
rstStream bool rstStream bool
} }