From 4da1327a66482fbdc67ab7f3a4f13dc60b84ec73 Mon Sep 17 00:00:00 2001
From: iamqizhao <toqizhao@gmail.com>
Date: Tue, 28 Jul 2015 16:41:46 -0700
Subject: [PATCH] do right counting for max-streams flow control

---
 transport/control.go      |  4 +++-
 transport/http2_client.go | 19 +++++++++++--------
 transport/transport.go    | 16 +++++-----------
 3 files changed, 19 insertions(+), 20 deletions(-)

diff --git a/transport/control.go b/transport/control.go
index c5fbe23a..6629ce7b 100644
--- a/transport/control.go
+++ b/transport/control.go
@@ -105,7 +105,9 @@ type quotaPool struct {
 // newQuotaPool creates a quotaPool which has quota q available to consume.
 func newQuotaPool(q int) *quotaPool {
 	qb := &quotaPool{c: make(chan int, 1)}
-	qb.c <- q
+	if q > 0 {
+		qb.c <- q
+	}
 	return qb
 }
 
diff --git a/transport/http2_client.go b/transport/http2_client.go
index f956b1ea..035b53f6 100644
--- a/transport/http2_client.go
+++ b/transport/http2_client.go
@@ -196,17 +196,16 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e
 	return t, nil
 }
 
-func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr, sq bool) *Stream {
+func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
 	fc := &inFlow{
 		limit: initialWindowSize,
 		conn:  t.fc,
 	}
 	// TODO(zhaoq): Handle uint32 overflow of Stream.id.
 	s := &Stream{
-		id:            t.nextID,
-		method:        callHdr.Method,
-		buf:           newRecvBuffer(),
-		updateStreams: sq,
+		id:     t.nextID,
+		method: callHdr.Method,
+		buf:    newRecvBuffer(),
 		fc:            fc,
 		sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
 		headerChan:    make(chan struct{}),
@@ -267,7 +266,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
 		return nil, err
 	}
 	t.mu.Lock()
-	s := t.newStream(ctx, callHdr, checkStreamsQuota)
+	s := t.newStream(ctx, callHdr)
 	t.activeStreams[s.id] = s
 	t.mu.Unlock()
 	// HPACK encodes various headers. Note that once WriteField(...) is
@@ -336,10 +335,14 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
 // CloseStream clears the footprint of a stream when the stream is not needed any more.
 // This must not be executed in reader's goroutine.
 func (t *http2Client) CloseStream(s *Stream, err error) {
+	var updateStreams bool
 	t.mu.Lock()
+	if t.streamsQuota != nil {
+		updateStreams = true
+	}
 	delete(t.activeStreams, s.id)
 	t.mu.Unlock()
-	if s.updateStreams {
+	if updateStreams {
 		t.streamsQuota.add(1)
 	}
 	s.mu.Lock()
@@ -737,7 +740,7 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
 			t.mu.Lock()
 			reset := t.streamsQuota != nil
 			if !reset {
-				t.streamsQuota = newQuotaPool(int(s.Val))
+				t.streamsQuota = newQuotaPool(int(s.Val) - len(t.activeStreams))
 			}
 			ms := t.maxStreams
 			t.maxStreams = int(s.Val)
diff --git a/transport/transport.go b/transport/transport.go
index c2ac3f88..58436f01 100644
--- a/transport/transport.go
+++ b/transport/transport.go
@@ -169,17 +169,11 @@ type Stream struct {
 	ctx    context.Context
 	cancel context.CancelFunc
 	// method records the associated RPC method of the stream.
-	method string
-	buf    *recvBuffer
-	dec    io.Reader
-
-	// updateStreams indicates whether the transport's streamsQuota needed
-	// to be updated when this stream is closed. It is false when the transport
-	// sticks to the initial infinite value of the number of concurrent streams.
-	// Ture otherwise.
-	updateStreams bool
-	fc            *inFlow
-	recvQuota     uint32
+	method    string
+	buf       *recvBuffer
+	dec       io.Reader
+	fc        *inFlow
+	recvQuota uint32
 	// The accumulated inbound quota pending for window update.
 	updateQuota uint32
 	// The handler to control the window update procedure for both this