new streams block when the max concurrent stram limit is reached.

This commit is contained in:
iamqizhao
2015-05-27 19:03:21 -07:00
parent ffe2c5d7aa
commit 458d514e70
2 changed files with 9 additions and 8 deletions

View File

@ -806,12 +806,12 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// The 2nd stream should block until its deadline exceeds.
ctx, _ := context.WithTimeout(context.Background(), time.Second)
if _, err := tc.StreamingInputCall(ctx); grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("%v.StreamingInputCall(%v) = _, %v, want error code %d", tc, ctx, err, codes.DeadlineExceeded)
t.Errorf("%v.StreamingInputCall(%v) = _, %v, want error code %d", tc, ctx, err, codes.DeadlineExceeded)
}
wg.Done()
}()
wg.Wait()
}

View File

@ -243,19 +243,20 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.mu.Unlock()
return nil, ErrConnClosing
}
if t.streamsQuota != nil {
q, err := wait(ctx, t.shutdownChan, t.streamsQuota.acquire())
checkStreamsQuota := t.streamsQuota != nil
t.mu.Unlock()
if checkStreamsQuota {
sq, err := wait(ctx, t.shutdownChan, t.streamsQuota.acquire())
if err != nil {
t.mu.Unlock()
return nil, err
}
// Returns the quota balance back.
if q > 1 {
t.streamsQuota.add(q - 1)
if sq > 1 {
t.streamsQuota.add(sq - 1)
}
}
t.mu.Unlock()
if _, err := wait(ctx, t.shutdownChan, t.writableChan); err != nil {
// t.streamsQuota will be updated when t.CloseStream is invoked.
return nil, err
}
t.mu.Lock()