diff --git a/transport/control.go b/transport/control.go index c5fbe23a..6629ce7b 100644 --- a/transport/control.go +++ b/transport/control.go @@ -105,7 +105,9 @@ type quotaPool struct { // newQuotaPool creates a quotaPool which has quota q available to consume. func newQuotaPool(q int) *quotaPool { qb := "aPool{c: make(chan int, 1)} - qb.c <- q + if q > 0 { + qb.c <- q + } return qb } diff --git a/transport/http2_client.go b/transport/http2_client.go index f956b1ea..035b53f6 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -196,17 +196,16 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e return t, nil } -func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr, sq bool) *Stream { +func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { fc := &inFlow{ limit: initialWindowSize, conn: t.fc, } // TODO(zhaoq): Handle uint32 overflow of Stream.id. s := &Stream{ - id: t.nextID, - method: callHdr.Method, - buf: newRecvBuffer(), - updateStreams: sq, + id: t.nextID, + method: callHdr.Method, + buf: newRecvBuffer(), fc: fc, sendQuotaPool: newQuotaPool(int(t.streamSendQuota)), headerChan: make(chan struct{}), @@ -267,7 +266,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea return nil, err } t.mu.Lock() - s := t.newStream(ctx, callHdr, checkStreamsQuota) + s := t.newStream(ctx, callHdr) t.activeStreams[s.id] = s t.mu.Unlock() // HPACK encodes various headers. Note that once WriteField(...) is @@ -336,10 +335,14 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea // CloseStream clears the footprint of a stream when the stream is not needed any more. // This must not be executed in reader's goroutine. func (t *http2Client) CloseStream(s *Stream, err error) { + var updateStreams bool t.mu.Lock() + if t.streamsQuota != nil { + updateStreams = true + } delete(t.activeStreams, s.id) t.mu.Unlock() - if s.updateStreams { + if updateStreams { t.streamsQuota.add(1) } s.mu.Lock() @@ -737,7 +740,7 @@ func (t *http2Client) applySettings(ss []http2.Setting) { t.mu.Lock() reset := t.streamsQuota != nil if !reset { - t.streamsQuota = newQuotaPool(int(s.Val)) + t.streamsQuota = newQuotaPool(int(s.Val) - len(t.activeStreams)) } ms := t.maxStreams t.maxStreams = int(s.Val) diff --git a/transport/transport.go b/transport/transport.go index c2ac3f88..58436f01 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -169,17 +169,11 @@ type Stream struct { ctx context.Context cancel context.CancelFunc // method records the associated RPC method of the stream. - method string - buf *recvBuffer - dec io.Reader - - // updateStreams indicates whether the transport's streamsQuota needed - // to be updated when this stream is closed. It is false when the transport - // sticks to the initial infinite value of the number of concurrent streams. - // Ture otherwise. - updateStreams bool - fc *inFlow - recvQuota uint32 + method string + buf *recvBuffer + dec io.Reader + fc *inFlow + recvQuota uint32 // The accumulated inbound quota pending for window update. updateQuota uint32 // The handler to control the window update procedure for both this