From df4f24b125eb9876766d0c3acb30ede1c5700368 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Mon, 13 Feb 2017 14:24:31 -0800 Subject: [PATCH] 1. Initialize streamsQuota at object creation. 2. Defer adding back to streamsQuota pool in CloseStream --- test/end2end_test.go | 2 +- transport/http2_client.go | 49 ++++++++++--------------------------- transport/transport_test.go | 5 +++- 3 files changed, 18 insertions(+), 38 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 0ba0256f..f79b91e0 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -2676,7 +2676,7 @@ const defaultMaxStreamsClient = 100 func TestClientExceedMaxStreamsLimit(t *testing.T) { defer leakCheck(t)() for _, e := range listTestEnv() { - testExceedMaxStreamsLimit(t, e) + testClientExceedMaxStreamsLimit(t, e) } } diff --git a/transport/http2_client.go b/transport/http2_client.go index a85552be..c9458fe1 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -209,6 +209,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( activeStreams: make(map[uint32]*Stream), creds: opts.PerRPCCredentials, maxStreams: defaultMaxStreamsClient, + streamsQuota: newQuotaPool(defaultMaxStreamsClient), streamSendQuota: defaultWindowSize, statsHandler: opts.StatsHandler, } @@ -337,21 +338,18 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea t.mu.Unlock() return nil, ErrConnClosing } - checkStreamsQuota := t.streamsQuota != nil t.mu.Unlock() - if checkStreamsQuota { - sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire()) - if err != nil { - return nil, err - } - // Returns the quota balance back. - if sq > 1 { - t.streamsQuota.add(sq - 1) - } + sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire()) + if err != nil { + return nil, err + } + // Returns the quota balance back. + if sq > 1 { + t.streamsQuota.add(sq - 1) } if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil { // Return the quota back now because there is no stream returned to the caller. - if _, ok := err.(StreamError); ok && checkStreamsQuota { + if _, ok := err.(StreamError); ok { t.streamsQuota.add(1) } return nil, err @@ -359,9 +357,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea t.mu.Lock() if t.state == draining { t.mu.Unlock() - if checkStreamsQuota { - t.streamsQuota.add(1) - } + t.streamsQuota.add(1) // Need to make t writable again so that the rpc in flight can still proceed. t.writableChan <- 0 return nil, ErrStreamDrain @@ -374,16 +370,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea s.clientStatsCtx = userCtx t.activeStreams[s.id] = s - // This stream is not counted when applySetings(...) initialize t.streamsQuota. - // Reset t.streamsQuota to the right value. - var reset bool - if !checkStreamsQuota && t.streamsQuota != nil { - reset = true - } t.mu.Unlock() - if reset { - t.streamsQuota.add(-1) - } // HPACK encodes various headers. Note that once WriteField(...) is // called, the corresponding headers/continuation frame has to be sent @@ -491,15 +478,11 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea // CloseStream clears the footprint of a stream when the stream is not needed any more. // This must not be executed in reader's goroutine. func (t *http2Client) CloseStream(s *Stream, err error) { - var updateStreams bool t.mu.Lock() if t.activeStreams == nil { t.mu.Unlock() return } - if t.streamsQuota != nil { - updateStreams = true - } delete(t.activeStreams, s.id) if t.state == draining && len(t.activeStreams) == 0 { // The transport is draining and s is the last live stream on t. @@ -508,9 +491,9 @@ func (t *http2Client) CloseStream(s *Stream, err error) { return } t.mu.Unlock() - if updateStreams { + defer func() { t.streamsQuota.add(1) - } + }() s.mu.Lock() if q := s.fc.resetPendingData(); q > 0 { if n := t.fc.onRead(q); n > 0 { @@ -1043,16 +1026,10 @@ func (t *http2Client) applySettings(ss []http2.Setting) { s.Val = math.MaxInt32 } t.mu.Lock() - reset := t.streamsQuota != nil - if !reset { - t.streamsQuota = newQuotaPool(int(s.Val) - len(t.activeStreams)) - } ms := t.maxStreams t.maxStreams = int(s.Val) t.mu.Unlock() - if reset { - t.streamsQuota.add(int(s.Val) - ms) - } + t.streamsQuota.add(int(s.Val) - ms) case http2.SettingInitialWindowSize: t.mu.Lock() for _, stream := range t.activeStreams { diff --git a/transport/transport_test.go b/transport/transport_test.go index 1ca6eb1a..e91fc6ed 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -507,7 +507,10 @@ func TestMaxStreams(t *testing.T) { case <-cc.streamsQuota.acquire(): t.Fatalf("streamsQuota.acquire() becomes readable mistakenly.") default: - if cc.streamsQuota.quota != 0 { + cc.streamsQuota.mu.Lock() + quota := cc.streamsQuota.quota + cc.streamsQuota.mu.Unlock() + if quota != 0 { t.Fatalf("streamsQuota.quota got non-zero quota mistakenly.") } }