From 458d514e70b3e5ba71287de5c338c8a6b8b92be3 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 27 May 2015 19:03:21 -0700 Subject: [PATCH] new streams block when the max concurrent stram limit is reached. --- test/end2end_test.go | 4 ++-- transport/http2_client.go | 13 +++++++------ 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index cf1e2915..69c131eb 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -806,12 +806,12 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) { var wg sync.WaitGroup wg.Add(1) go func() { + defer wg.Done() // The 2nd stream should block until its deadline exceeds. ctx, _ := context.WithTimeout(context.Background(), time.Second) if _, err := tc.StreamingInputCall(ctx); grpc.Code(err) != codes.DeadlineExceeded { - t.Fatalf("%v.StreamingInputCall(%v) = _, %v, want error code %d", tc, ctx, err, codes.DeadlineExceeded) + t.Errorf("%v.StreamingInputCall(%v) = _, %v, want error code %d", tc, ctx, err, codes.DeadlineExceeded) } - wg.Done() }() wg.Wait() } diff --git a/transport/http2_client.go b/transport/http2_client.go index 35a69fc3..07c93299 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -243,19 +243,20 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea t.mu.Unlock() return nil, ErrConnClosing } - if t.streamsQuota != nil { - q, err := wait(ctx, t.shutdownChan, t.streamsQuota.acquire()) + checkStreamsQuota := t.streamsQuota != nil + t.mu.Unlock() + if checkStreamsQuota { + sq, err := wait(ctx, t.shutdownChan, t.streamsQuota.acquire()) if err != nil { - t.mu.Unlock() return nil, err } // Returns the quota balance back. - if q > 1 { - t.streamsQuota.add(q - 1) + if sq > 1 { + t.streamsQuota.add(sq - 1) } } - t.mu.Unlock() if _, err := wait(ctx, t.shutdownChan, t.writableChan); err != nil { + // t.streamsQuota will be updated when t.CloseStream is invoked. return nil, err } t.mu.Lock()