Merge pull request #271 from iamqizhao/master
Do right counting for max-stream settings
This commit is contained in:
@ -104,8 +104,14 @@ type quotaPool struct {
|
|||||||
|
|
||||||
// newQuotaPool creates a quotaPool which has quota q available to consume.
|
// newQuotaPool creates a quotaPool which has quota q available to consume.
|
||||||
func newQuotaPool(q int) *quotaPool {
|
func newQuotaPool(q int) *quotaPool {
|
||||||
qb := "aPool{c: make(chan int, 1)}
|
qb := "aPool{
|
||||||
|
c: make(chan int, 1),
|
||||||
|
}
|
||||||
|
if q > 0 {
|
||||||
qb.c <- q
|
qb.c <- q
|
||||||
|
} else {
|
||||||
|
qb.quota = q
|
||||||
|
}
|
||||||
return qb
|
return qb
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -196,7 +196,7 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e
|
|||||||
return t, nil
|
return t, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr, sq bool) *Stream {
|
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
|
||||||
fc := &inFlow{
|
fc := &inFlow{
|
||||||
limit: initialWindowSize,
|
limit: initialWindowSize,
|
||||||
conn: t.fc,
|
conn: t.fc,
|
||||||
@ -206,7 +206,6 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr, sq bool)
|
|||||||
id: t.nextID,
|
id: t.nextID,
|
||||||
method: callHdr.Method,
|
method: callHdr.Method,
|
||||||
buf: newRecvBuffer(),
|
buf: newRecvBuffer(),
|
||||||
updateStreams: sq,
|
|
||||||
fc: fc,
|
fc: fc,
|
||||||
sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
|
sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
|
||||||
headerChan: make(chan struct{}),
|
headerChan: make(chan struct{}),
|
||||||
@ -267,7 +266,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
s := t.newStream(ctx, callHdr, checkStreamsQuota)
|
s := t.newStream(ctx, callHdr)
|
||||||
t.activeStreams[s.id] = s
|
t.activeStreams[s.id] = s
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
// HPACK encodes various headers. Note that once WriteField(...) is
|
// HPACK encodes various headers. Note that once WriteField(...) is
|
||||||
@ -336,10 +335,14 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||||||
// CloseStream clears the footprint of a stream when the stream is not needed any more.
|
// CloseStream clears the footprint of a stream when the stream is not needed any more.
|
||||||
// This must not be executed in reader's goroutine.
|
// This must not be executed in reader's goroutine.
|
||||||
func (t *http2Client) CloseStream(s *Stream, err error) {
|
func (t *http2Client) CloseStream(s *Stream, err error) {
|
||||||
|
var updateStreams bool
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
|
if t.streamsQuota != nil {
|
||||||
|
updateStreams = true
|
||||||
|
}
|
||||||
delete(t.activeStreams, s.id)
|
delete(t.activeStreams, s.id)
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
if s.updateStreams {
|
if updateStreams {
|
||||||
t.streamsQuota.add(1)
|
t.streamsQuota.add(1)
|
||||||
}
|
}
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
@ -737,7 +740,7 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
|
|||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
reset := t.streamsQuota != nil
|
reset := t.streamsQuota != nil
|
||||||
if !reset {
|
if !reset {
|
||||||
t.streamsQuota = newQuotaPool(int(s.Val))
|
t.streamsQuota = newQuotaPool(int(s.Val) - len(t.activeStreams))
|
||||||
}
|
}
|
||||||
ms := t.maxStreams
|
ms := t.maxStreams
|
||||||
t.maxStreams = int(s.Val)
|
t.maxStreams = int(s.Val)
|
||||||
|
@ -172,12 +172,6 @@ type Stream struct {
|
|||||||
method string
|
method string
|
||||||
buf *recvBuffer
|
buf *recvBuffer
|
||||||
dec io.Reader
|
dec io.Reader
|
||||||
|
|
||||||
// updateStreams indicates whether the transport's streamsQuota needed
|
|
||||||
// to be updated when this stream is closed. It is false when the transport
|
|
||||||
// sticks to the initial infinite value of the number of concurrent streams.
|
|
||||||
// Ture otherwise.
|
|
||||||
updateStreams bool
|
|
||||||
fc *inFlow
|
fc *inFlow
|
||||||
recvQuota uint32
|
recvQuota uint32
|
||||||
// The accumulated inbound quota pending for window update.
|
// The accumulated inbound quota pending for window update.
|
||||||
|
@ -355,6 +355,75 @@ func TestLargeMessageSuspension(t *testing.T) {
|
|||||||
server.stop()
|
server.stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMaxStreams(t *testing.T) {
|
||||||
|
server, ct := setUp(t, 0, 1, suspended)
|
||||||
|
callHdr := &CallHdr{
|
||||||
|
Host: "localhost",
|
||||||
|
Method: "foo.Large",
|
||||||
|
}
|
||||||
|
// Have a pending stream which takes all streams quota.
|
||||||
|
s, err := ct.NewStream(context.Background(), callHdr)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to open stream: %v", err)
|
||||||
|
}
|
||||||
|
cc, ok := ct.(*http2Client)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Failed to convert %v to *http2Client", ct)
|
||||||
|
}
|
||||||
|
done := make(chan struct{})
|
||||||
|
ch := make(chan int)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-time.After(5 * time.Millisecond):
|
||||||
|
ch <- 0
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
close(done)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
case <-done:
|
||||||
|
t.Fatalf("Client has not received the max stream setting in 5 seconds.")
|
||||||
|
}
|
||||||
|
cc.mu.Lock()
|
||||||
|
// cc.streamsQuota should be initialized once receiving the 1st setting frame from
|
||||||
|
// the server.
|
||||||
|
if cc.streamsQuota != nil {
|
||||||
|
cc.mu.Unlock()
|
||||||
|
select {
|
||||||
|
case <-cc.streamsQuota.acquire():
|
||||||
|
t.Fatalf("streamsQuota.acquire() becomes readable mistakenly.")
|
||||||
|
default:
|
||||||
|
if cc.streamsQuota.quota != 0 {
|
||||||
|
t.Fatalf("streamsQuota.quota got non-zero quota mistakenly.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
cc.mu.Unlock()
|
||||||
|
}
|
||||||
|
// Close the pending stream so that the streams quota becomes available for the next new stream.
|
||||||
|
ct.CloseStream(s, nil)
|
||||||
|
select {
|
||||||
|
case i := <-cc.streamsQuota.acquire():
|
||||||
|
if i != 1 {
|
||||||
|
t.Fatalf("streamsQuota.acquire() got %d quota, want 1.", i)
|
||||||
|
}
|
||||||
|
cc.streamsQuota.add(i)
|
||||||
|
default:
|
||||||
|
t.Fatalf("streamsQuota.acquire() is not readable.")
|
||||||
|
}
|
||||||
|
if _, err := ct.NewStream(context.Background(), callHdr); err != nil {
|
||||||
|
t.Fatalf("Failed to open stream: %v", err)
|
||||||
|
}
|
||||||
|
ct.Close()
|
||||||
|
server.stop()
|
||||||
|
}
|
||||||
|
|
||||||
func TestServerWithMisbehavedClient(t *testing.T) {
|
func TestServerWithMisbehavedClient(t *testing.T) {
|
||||||
server, ct := setUp(t, 0, math.MaxUint32, suspended)
|
server, ct := setUp(t, 0, math.MaxUint32, suspended)
|
||||||
callHdr := &CallHdr{
|
callHdr := &CallHdr{
|
||||||
|
Reference in New Issue
Block a user