From 8d1806945ef6811990c3cd01e3b5ea70badee0e6 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 11 May 2016 15:50:22 -0700 Subject: [PATCH 1/2] Release the quota taken in streamsQuota when an rpc is timeout or canceled. --- test/end2end_test.go | 55 +++++++++++++++++++++++++++++++++++++++ transport/http2_client.go | 5 +++- 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 5338e4aa..4d011196 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -1595,6 +1595,61 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) { } } +func TestStreamsQuotaRecovery(t *testing.T) { + defer leakCheck(t)() + for _, e := range listTestEnv() { + testStreamsQuotaRecovery(t, e) + } +} + +func testStreamsQuotaRecovery(t *testing.T, e env) { + te := newTest(t, e) + te.declareLogNoise( + "http2Client.notifyError got notified that the client transport was broken", + "Conn.resetTransport failed to create client transport", + "grpc: the client connection is closing", + ) + te.maxStream = 1 // Allows 1 live stream. + te.startServer() + defer te.tearDown() + + cc := te.clientConn() + tc := testpb.NewTestServiceClient(cc) + ctx, cancel := context.WithCancel(context.Background()) + if _, err := tc.StreamingInputCall(ctx); err != nil { + t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, ", tc, err) + } + // Loop until the new max stream setting is effective. + for { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + _, err := tc.StreamingInputCall(ctx) + if err == nil { + time.Sleep(time.Second) + continue + } + if grpc.Code(err) == codes.DeadlineExceeded { + break + } + t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %d", tc, err, codes.DeadlineExceeded) + } + cancel() + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + ctx, cancel := context.WithCancel(context.Background()) + if _, err := tc.StreamingInputCall(ctx); err != nil { + t.Errorf("%v.StreamingInputCall(_) = _, %v, want _, ", tc, err) + } + cancel() + }() + } + wg.Wait() +} + func TestCompressServerHasNoSupport(t *testing.T) { defer leakCheck(t)() for _, e := range listTestEnv() { diff --git a/transport/http2_client.go b/transport/http2_client.go index 8082fdc8..be521ffb 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -289,7 +289,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea } } if _, err := wait(ctx, t.shutdownChan, t.writableChan); err != nil { - // t.streamsQuota will be updated when t.CloseStream is invoked. + // Return the quota back now because there is no stream returned to the caller. + if _, ok := err.(StreamError); ok && checkStreamsQuota { + t.streamsQuota.add(1) + } return nil, err } t.mu.Lock() From c4adca718a3d7f08f08f146816b2272654756d43 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Thu, 12 May 2016 11:07:32 -0700 Subject: [PATCH 2/2] Documentation: fix metadata.FromContext example metadata.FromContext returns additional boolean value. This fixes the example in metadata documentation. --- Documentation/grpc-metadata.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Documentation/grpc-metadata.md b/Documentation/grpc-metadata.md index b387e880..928f5572 100644 --- a/Documentation/grpc-metadata.md +++ b/Documentation/grpc-metadata.md @@ -70,7 +70,8 @@ Metadata can be retrieved from context using `FromContext`: ```go func (s *server) SomeRPC(ctx context.Context, in *pb.SomeRequest) (*pb.SomeResponse, err) { - md := metadata.FromContext(ctx) + md, ok := metadata.FromContext(ctx) + // do something with metadata } ```