Remove self-imposed limit on max concurrent streams if the server doesn't impose any. (#1624)
* Remove self-imposed limit on max concurrent streams if the server allows it. * Remove test necessitating buggy behavior.
This commit is contained in:
@ -3675,52 +3675,6 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const defaultMaxStreamsClient = 100
|
|
||||||
|
|
||||||
func TestExceedDefaultMaxStreamsLimit(t *testing.T) {
|
|
||||||
defer leakcheck.Check(t)
|
|
||||||
for _, e := range listTestEnv() {
|
|
||||||
if e.name == "handler-tls" {
|
|
||||||
// The default max stream limit in handler_server is not 100?
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
testExceedDefaultMaxStreamsLimit(t, e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func testExceedDefaultMaxStreamsLimit(t *testing.T, e env) {
|
|
||||||
te := newTest(t, e)
|
|
||||||
te.declareLogNoise(
|
|
||||||
"http2Client.notifyError got notified that the client transport was broken",
|
|
||||||
"Conn.resetTransport failed to create client transport",
|
|
||||||
"grpc: the connection is closing",
|
|
||||||
)
|
|
||||||
// When masStream is set to 0 the server doesn't send a settings frame for
|
|
||||||
// MaxConcurrentStreams, essentially allowing infinite (math.MaxInt32) streams.
|
|
||||||
// In such a case, there should be a default cap on the client-side.
|
|
||||||
te.maxStream = 0
|
|
||||||
te.startServer(&testServer{security: e.security})
|
|
||||||
defer te.tearDown()
|
|
||||||
|
|
||||||
cc := te.clientConn()
|
|
||||||
tc := testpb.NewTestServiceClient(cc)
|
|
||||||
|
|
||||||
// Create as many streams as a client can.
|
|
||||||
for i := 0; i < defaultMaxStreamsClient; i++ {
|
|
||||||
if _, err := tc.StreamingInputCall(te.ctx); err != nil {
|
|
||||||
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Trying to create one more should timeout.
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
||||||
defer cancel()
|
|
||||||
_, err := tc.StreamingInputCall(ctx)
|
|
||||||
if err == nil || grpc.Code(err) != codes.DeadlineExceeded {
|
|
||||||
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestStreamsQuotaRecovery(t *testing.T) {
|
func TestStreamsQuotaRecovery(t *testing.T) {
|
||||||
defer leakcheck.Check(t)
|
defer leakcheck.Check(t)
|
||||||
for _, e := range listTestEnv() {
|
for _, e := range listTestEnv() {
|
||||||
|
@ -918,15 +918,28 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
|
|||||||
s.write(recvMsg{err: io.EOF})
|
s.write(recvMsg{err: io.EOF})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
|
func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
|
||||||
if f.IsAck() {
|
if f.IsAck() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
var ss []http2.Setting
|
var ss []http2.Setting
|
||||||
|
isMaxConcurrentStreamsMissing := true
|
||||||
f.ForeachSetting(func(s http2.Setting) error {
|
f.ForeachSetting(func(s http2.Setting) error {
|
||||||
|
if s.ID == http2.SettingMaxConcurrentStreams {
|
||||||
|
isMaxConcurrentStreamsMissing = false
|
||||||
|
}
|
||||||
ss = append(ss, s)
|
ss = append(ss, s)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
if isFirst && isMaxConcurrentStreamsMissing {
|
||||||
|
// This means server is imposing no limits on
|
||||||
|
// maximum number of concurrent streams initiated by client.
|
||||||
|
// So we must remove our self-imposed limit.
|
||||||
|
ss = append(ss, http2.Setting{
|
||||||
|
ID: http2.SettingMaxConcurrentStreams,
|
||||||
|
Val: math.MaxUint32,
|
||||||
|
})
|
||||||
|
}
|
||||||
// The settings will be applied once the ack is sent.
|
// The settings will be applied once the ack is sent.
|
||||||
t.controlBuf.put(&settings{ack: true, ss: ss})
|
t.controlBuf.put(&settings{ack: true, ss: ss})
|
||||||
}
|
}
|
||||||
@ -1125,7 +1138,7 @@ func (t *http2Client) reader() {
|
|||||||
t.Close()
|
t.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
t.handleSettings(sf)
|
t.handleSettings(sf, true)
|
||||||
|
|
||||||
// loop to keep reading incoming messages on this transport.
|
// loop to keep reading incoming messages on this transport.
|
||||||
for {
|
for {
|
||||||
@ -1158,7 +1171,7 @@ func (t *http2Client) reader() {
|
|||||||
case *http2.RSTStreamFrame:
|
case *http2.RSTStreamFrame:
|
||||||
t.handleRSTStream(frame)
|
t.handleRSTStream(frame)
|
||||||
case *http2.SettingsFrame:
|
case *http2.SettingsFrame:
|
||||||
t.handleSettings(frame)
|
t.handleSettings(frame, false)
|
||||||
case *http2.PingFrame:
|
case *http2.PingFrame:
|
||||||
t.handlePing(frame)
|
t.handlePing(frame)
|
||||||
case *http2.GoAwayFrame:
|
case *http2.GoAwayFrame:
|
||||||
|
Reference in New Issue
Block a user