diff --git a/Documentation/grpc-metadata.md b/Documentation/grpc-metadata.md index b387e880..928f5572 100644 --- a/Documentation/grpc-metadata.md +++ b/Documentation/grpc-metadata.md @@ -70,7 +70,8 @@ Metadata can be retrieved from context using `FromContext`: ```go func (s *server) SomeRPC(ctx context.Context, in *pb.SomeRequest) (*pb.SomeResponse, err) { - md := metadata.FromContext(ctx) + md, ok := metadata.FromContext(ctx) + // do something with metadata } ``` diff --git a/test/end2end_test.go b/test/end2end_test.go index 99f48f63..acfb3828 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -1598,6 +1598,61 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) { } } +func TestStreamsQuotaRecovery(t *testing.T) { + defer leakCheck(t)() + for _, e := range listTestEnv() { + testStreamsQuotaRecovery(t, e) + } +} + +func testStreamsQuotaRecovery(t *testing.T, e env) { + te := newTest(t, e) + te.declareLogNoise( + "http2Client.notifyError got notified that the client transport was broken", + "Conn.resetTransport failed to create client transport", + "grpc: the client connection is closing", + ) + te.maxStream = 1 // Allows 1 live stream. + te.startServer() + defer te.tearDown() + + cc := te.clientConn() + tc := testpb.NewTestServiceClient(cc) + ctx, cancel := context.WithCancel(context.Background()) + if _, err := tc.StreamingInputCall(ctx); err != nil { + t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, ", tc, err) + } + // Loop until the new max stream setting is effective. + for { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + _, err := tc.StreamingInputCall(ctx) + if err == nil { + time.Sleep(time.Second) + continue + } + if grpc.Code(err) == codes.DeadlineExceeded { + break + } + t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %d", tc, err, codes.DeadlineExceeded) + } + cancel() + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + ctx, cancel := context.WithCancel(context.Background()) + if _, err := tc.StreamingInputCall(ctx); err != nil { + t.Errorf("%v.StreamingInputCall(_) = _, %v, want _, ", tc, err) + } + cancel() + }() + } + wg.Wait() +} + func TestCompressServerHasNoSupport(t *testing.T) { defer leakCheck(t)() for _, e := range listTestEnv() { diff --git a/transport/http2_client.go b/transport/http2_client.go index 4027614b..10004e4d 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -293,7 +293,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea } } if _, err := wait(ctx, t.shutdownChan, t.writableChan); err != nil { - // t.streamsQuota will be updated when t.CloseStream is invoked. + // Return the quota back now because there is no stream returned to the caller. + if _, ok := err.(StreamError); ok && checkStreamsQuota { + t.streamsQuota.add(1) + } return nil, err } t.mu.Lock()