do right counting for max-streams flow control

This commit is contained in:
iamqizhao
2015-07-28 16:41:46 -07:00
parent 31fa21984e
commit 4da1327a66
3 changed files with 19 additions and 20 deletions

View File

@ -105,7 +105,9 @@ type quotaPool struct {
// newQuotaPool creates a quotaPool which has quota q available to consume.
func newQuotaPool(q int) *quotaPool {
qb := &quotaPool{c: make(chan int, 1)}
if q > 0 {
qb.c <- q
}
return qb
}

View File

@ -196,7 +196,7 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e
return t, nil
}
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr, sq bool) *Stream {
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
fc := &inFlow{
limit: initialWindowSize,
conn: t.fc,
@ -206,7 +206,6 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr, sq bool)
id: t.nextID,
method: callHdr.Method,
buf: newRecvBuffer(),
updateStreams: sq,
fc: fc,
sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
headerChan: make(chan struct{}),
@ -267,7 +266,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return nil, err
}
t.mu.Lock()
s := t.newStream(ctx, callHdr, checkStreamsQuota)
s := t.newStream(ctx, callHdr)
t.activeStreams[s.id] = s
t.mu.Unlock()
// HPACK encodes various headers. Note that once WriteField(...) is
@ -336,10 +335,14 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
// CloseStream clears the footprint of a stream when the stream is not needed any more.
// This must not be executed in reader's goroutine.
func (t *http2Client) CloseStream(s *Stream, err error) {
var updateStreams bool
t.mu.Lock()
if t.streamsQuota != nil {
updateStreams = true
}
delete(t.activeStreams, s.id)
t.mu.Unlock()
if s.updateStreams {
if updateStreams {
t.streamsQuota.add(1)
}
s.mu.Lock()
@ -737,7 +740,7 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
t.mu.Lock()
reset := t.streamsQuota != nil
if !reset {
t.streamsQuota = newQuotaPool(int(s.Val))
t.streamsQuota = newQuotaPool(int(s.Val) - len(t.activeStreams))
}
ms := t.maxStreams
t.maxStreams = int(s.Val)

View File

@ -172,12 +172,6 @@ type Stream struct {
method string
buf *recvBuffer
dec io.Reader
// updateStreams indicates whether the transport's streamsQuota needed
// to be updated when this stream is closed. It is false when the transport
// sticks to the initial infinite value of the number of concurrent streams.
// Ture otherwise.
updateStreams bool
fc *inFlow
recvQuota uint32
// The accumulated inbound quota pending for window update.