diff --git a/test/end2end_test.go b/test/end2end_test.go
index be9eb275..6d59d298 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -3675,52 +3675,6 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) {
 	}
 }
 
-const defaultMaxStreamsClient = 100
-
-func TestExceedDefaultMaxStreamsLimit(t *testing.T) {
-	defer leakcheck.Check(t)
-	for _, e := range listTestEnv() {
-		if e.name == "handler-tls" {
-			// The default max stream limit in handler_server is not 100?
-			continue
-		}
-		testExceedDefaultMaxStreamsLimit(t, e)
-	}
-}
-
-func testExceedDefaultMaxStreamsLimit(t *testing.T, e env) {
-	te := newTest(t, e)
-	te.declareLogNoise(
-		"http2Client.notifyError got notified that the client transport was broken",
-		"Conn.resetTransport failed to create client transport",
-		"grpc: the connection is closing",
-	)
-	// When masStream is set to 0 the server doesn't send a settings frame for
-	// MaxConcurrentStreams, essentially allowing infinite (math.MaxInt32) streams.
-	// In such a case, there should be a default cap on the client-side.
-	te.maxStream = 0
-	te.startServer(&testServer{security: e.security})
-	defer te.tearDown()
-
-	cc := te.clientConn()
-	tc := testpb.NewTestServiceClient(cc)
-
-	// Create as many streams as a client can.
-	for i := 0; i < defaultMaxStreamsClient; i++ {
-		if _, err := tc.StreamingInputCall(te.ctx); err != nil {
-			t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
-		}
-	}
-
-	// Trying to create one more should timeout.
-	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
-	defer cancel()
-	_, err := tc.StreamingInputCall(ctx)
-	if err == nil || grpc.Code(err) != codes.DeadlineExceeded {
-		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
-	}
-}
-
 func TestStreamsQuotaRecovery(t *testing.T) {
 	defer leakcheck.Check(t)
 	for _, e := range listTestEnv() {
diff --git a/transport/http2_client.go b/transport/http2_client.go
index 1057512c..90cb5901 100644
--- a/transport/http2_client.go
+++ b/transport/http2_client.go
@@ -918,15 +918,28 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
 	s.write(recvMsg{err: io.EOF})
 }
 
-func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
+func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
 	if f.IsAck() {
 		return
 	}
 	var ss []http2.Setting
+	isMaxConcurrentStreamsMissing := true
 	f.ForeachSetting(func(s http2.Setting) error {
+		if s.ID == http2.SettingMaxConcurrentStreams {
+			isMaxConcurrentStreamsMissing = false
+		}
 		ss = append(ss, s)
 		return nil
 	})
+	if isFirst && isMaxConcurrentStreamsMissing {
+		// This means server is imposing no limits on
+		// maximum number of concurrent streams initiated by client.
+		// So we must remove our self-imposed limit.
+		ss = append(ss, http2.Setting{
+			ID:  http2.SettingMaxConcurrentStreams,
+			Val: math.MaxUint32,
+		})
+	}
 	// The settings will be applied once the ack is sent.
 	t.controlBuf.put(&settings{ack: true, ss: ss})
 }
@@ -1125,7 +1138,7 @@ func (t *http2Client) reader() {
 		t.Close()
 		return
 	}
-	t.handleSettings(sf)
+	t.handleSettings(sf, true)
 
 	// loop to keep reading incoming messages on this transport.
 	for {
@@ -1158,7 +1171,7 @@ func (t *http2Client) reader() {
 		case *http2.RSTStreamFrame:
 			t.handleRSTStream(frame)
 		case *http2.SettingsFrame:
-			t.handleSettings(frame)
+			t.handleSettings(frame, false)
 		case *http2.PingFrame:
 			t.handlePing(frame)
 		case *http2.GoAwayFrame:
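
The transport change above lifts the client's self-imposed default stream cap when the server's first SETTINGS frame omits MAX_CONCURRENT_STREAMS, since an omitted setting means the server imposes no limit. Below is a minimal standalone sketch of that detection step; applyFirstSettings is a hypothetical helper over golang.org/x/net/http2 settings, not the real handleSettings/controlBuf plumbing from this patch.

package main

import (
	"fmt"
	"math"

	"golang.org/x/net/http2"
)

// applyFirstSettings mirrors the idea in the patched handleSettings: if the
// server's first SETTINGS frame carries no MAX_CONCURRENT_STREAMS entry, the
// server imposes no stream limit, so the client records an effectively
// unlimited value instead of keeping a self-imposed default cap.
// (Illustrative helper only; not part of the patch.)
func applyFirstSettings(ss []http2.Setting, isFirst bool) []http2.Setting {
	missing := true
	for _, s := range ss {
		if s.ID == http2.SettingMaxConcurrentStreams {
			missing = false
		}
	}
	if isFirst && missing {
		ss = append(ss, http2.Setting{
			ID:  http2.SettingMaxConcurrentStreams,
			Val: math.MaxUint32,
		})
	}
	return ss
}

func main() {
	// A first SETTINGS frame that only sets the initial window size.
	first := []http2.Setting{{ID: http2.SettingInitialWindowSize, Val: 65535}}
	fmt.Println(applyFirstSettings(first, true))
	// The same contents in a later frame are left untouched (isFirst=false).
	fmt.Println(applyFirstSettings(first, false))
}

For SETTINGS frames after the first, an absent MAX_CONCURRENT_STREAMS entry is deliberately left alone, because HTTP/2 settings retain their previously advertised values; that is why the reader passes true only for the frame received during connection setup.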