From 1b5f15dda802d672f832beb8ff65ebeccb3fd44b Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Tue, 26 May 2015 18:14:05 -0700 Subject: [PATCH] blocking when max concurrent stream limit is reached. --- test/end2end_test.go | 30 +++++++++++++++++----------- transport/http2_client.go | 42 +++++++++++++++++++++++++++++---------- transport/transport.go | 11 ++++++++++ 3 files changed, 61 insertions(+), 22 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 3f55024b..cc475cfb 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -795,17 +795,23 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) { s, cc := setUp(1, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) - var err error - for { - time.Sleep(2 * time.Millisecond) - _, err = tc.StreamingInputCall(context.Background()) - // Loop until the settings of max concurrent streams is - // received by the client. - if err != nil { - break + // Perform an unary RPC to make sure the new settings were propagated to the client. + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + t.Fatalf("fhaof") + } + // Initiate the 1st stream + if _, err := tc.StreamingInputCall(context.Background()); err != nil { + t.Fatalf("faf") + } + var wg sync.WaitGroup + wg.Add(1) + go func() { + // The 2nd stream should block until its deadline exceeds. + ctx, _ := context.WithTimeout(context.Background(), time.Second) + if _, err := tc.StreamingInputCall(ctx); grpc.Code(err) != codes.DeadlineExceeded { + t.Fatalf("1414") } - } - if grpc.Code(err) != codes.Unavailable { - t.Fatalf("got %v, want error code %d", err, codes.Unavailable) - } + wg.Done() + }() + wg.Wait() } diff --git a/transport/http2_client.go b/transport/http2_client.go index 6ba93448..1420b166 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -79,6 +79,7 @@ type http2Client struct { fc *inFlow // sendQuotaPool provides flow control to outbound message. sendQuotaPool *quotaPool + streamsQuota *quotaPool // The scheme used: https if TLS is on, http otherwise. scheme string @@ -89,7 +90,7 @@ type http2Client struct { state transportState // the state of underlying connection activeStreams map[uint32]*Stream // The max number of concurrent streams - maxStreams uint32 + maxStreams int // the per-stream outbound flow control window size set by the peer. streamSendQuota uint32 } @@ -174,8 +175,8 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e scheme: scheme, state: reachable, activeStreams: make(map[uint32]*Stream), - maxStreams: math.MaxUint32, authCreds: opts.AuthOptions, + maxStreams: math.MaxInt32, streamSendQuota: defaultWindowSize, } go t.controller() @@ -236,19 +237,26 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea authData[k] = v } } - if _, err := wait(ctx, t.shutdownChan, t.writableChan); err != nil { - return nil, err - } t.mu.Lock() if t.state != reachable { t.mu.Unlock() return nil, ErrConnClosing } - if uint32(len(t.activeStreams)) >= t.maxStreams { - t.mu.Unlock() - t.writableChan <- 0 - return nil, StreamErrorf(codes.Unavailable, "transport: failed to create new stream because the limit has been reached.") + if t.streamsQuota != nil { + q, err := wait(ctx, t.shutdownChan, t.streamsQuota.acquire()) + if err != nil { + t.mu.Unlock() + return nil, err + } + if q > 1 { + t.streamsQuota.add(q-1) + } } + t.mu.Unlock() + if _, err := wait(ctx, t.shutdownChan, t.writableChan); err != nil { + return nil, err + } + t.mu.Lock() s := t.newStream(ctx, callHdr) t.activeStreams[s.id] = s t.mu.Unlock() @@ -318,6 +326,9 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea func (t *http2Client) CloseStream(s *Stream, err error) { t.mu.Lock() delete(t.activeStreams, s.id) + if t.streamsQuota != nil { + t.streamsQuota.add(1) + } t.mu.Unlock() s.mu.Lock() if q := s.fc.restoreConn(); q > 0 { @@ -558,7 +569,18 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame) { defer t.mu.Unlock() switch s.ID { case http2.SettingMaxConcurrentStreams: - t.maxStreams = v + // TODO(zhaoq): This is a hack to avoid significant refactoring of the + // code to deal with int32 overflow. Have a better way to handle this + // later. + if v > math.MaxInt32 { + v = math.MaxInt32 + } + if t.streamsQuota == nil { + t.streamsQuota = newQuotaPool(int(v)) + } else { + t.streamsQuota.reset(int(v) - t.maxStreams) + } + t.maxStreams = int(v) case http2.SettingInitialWindowSize: for _, s := range t.activeStreams { // Adjust the sending quota for each s. diff --git a/transport/transport.go b/transport/transport.go index 5dfd89f0..052f2618 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -451,3 +451,14 @@ func wait(ctx context.Context, closing <-chan struct{}, proceed <-chan int) (int return i, nil } } + +func wait64(ctx context.Context, closing <-chan struct{}, proceed <-chan int64) (int64, error) { + select { + case <-ctx.Done(): + return 0, ContextErr(ctx.Err()) + case <-closing: + return 0, ErrConnClosing + case i := <-proceed: + return i, nil + } +}