i) update streamsQuota only if streamsQuota is there when the stream is created; ii) move ops of streamsQuota out of mutex of the transport (except acquire())
This commit is contained in:
@ -795,7 +795,7 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) {
|
|||||||
s, cc := setUp(1, e)
|
s, cc := setUp(1, e)
|
||||||
tc := testpb.NewTestServiceClient(cc)
|
tc := testpb.NewTestServiceClient(cc)
|
||||||
defer tearDown(s, cc)
|
defer tearDown(s, cc)
|
||||||
// Perform an unary RPC to make sure the new settings were propagated to the client.
|
// Perform a unary RPC to make sure the new settings were propagated to the client.
|
||||||
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
|
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
|
||||||
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", tc, err)
|
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", tc, err)
|
||||||
}
|
}
|
||||||
|
@ -190,7 +190,7 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e
|
|||||||
return t, nil
|
return t, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
|
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr, sq bool) *Stream {
|
||||||
fc := &inFlow{
|
fc := &inFlow{
|
||||||
limit: initialWindowSize,
|
limit: initialWindowSize,
|
||||||
conn: t.fc,
|
conn: t.fc,
|
||||||
@ -200,6 +200,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
|
|||||||
id: t.nextID,
|
id: t.nextID,
|
||||||
method: callHdr.Method,
|
method: callHdr.Method,
|
||||||
buf: newRecvBuffer(),
|
buf: newRecvBuffer(),
|
||||||
|
updateStreams: sq,
|
||||||
fc: fc,
|
fc: fc,
|
||||||
sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
|
sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
|
||||||
headerChan: make(chan struct{}),
|
headerChan: make(chan struct{}),
|
||||||
@ -260,7 +261,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
s := t.newStream(ctx, callHdr)
|
s := t.newStream(ctx, callHdr, checkStreamsQuota)
|
||||||
t.activeStreams[s.id] = s
|
t.activeStreams[s.id] = s
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
// HPACK encodes various headers. Note that once WriteField(...) is
|
// HPACK encodes various headers. Note that once WriteField(...) is
|
||||||
@ -329,10 +330,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||||||
func (t *http2Client) CloseStream(s *Stream, err error) {
|
func (t *http2Client) CloseStream(s *Stream, err error) {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
delete(t.activeStreams, s.id)
|
delete(t.activeStreams, s.id)
|
||||||
if t.streamsQuota != nil {
|
t.mu.Unlock()
|
||||||
|
if s.updateStreams {
|
||||||
t.streamsQuota.add(1)
|
t.streamsQuota.add(1)
|
||||||
}
|
}
|
||||||
t.mu.Unlock()
|
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
if q := s.fc.restoreConn(); q > 0 {
|
if q := s.fc.restoreConn(); q > 0 {
|
||||||
t.controlBuf.put(&windowUpdate{0, q})
|
t.controlBuf.put(&windowUpdate{0, q})
|
||||||
@ -568,8 +569,6 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
|
|||||||
}
|
}
|
||||||
f.ForeachSetting(func(s http2.Setting) error {
|
f.ForeachSetting(func(s http2.Setting) error {
|
||||||
if v, ok := f.Value(s.ID); ok {
|
if v, ok := f.Value(s.ID); ok {
|
||||||
t.mu.Lock()
|
|
||||||
defer t.mu.Unlock()
|
|
||||||
switch s.ID {
|
switch s.ID {
|
||||||
case http2.SettingMaxConcurrentStreams:
|
case http2.SettingMaxConcurrentStreams:
|
||||||
// TODO(zhaoq): This is a hack to avoid significant refactoring of the
|
// TODO(zhaoq): This is a hack to avoid significant refactoring of the
|
||||||
@ -578,18 +577,24 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
|
|||||||
if v > math.MaxInt32 {
|
if v > math.MaxInt32 {
|
||||||
v = math.MaxInt32
|
v = math.MaxInt32
|
||||||
}
|
}
|
||||||
if t.streamsQuota == nil {
|
t.mu.Lock()
|
||||||
|
reset := t.streamsQuota != nil
|
||||||
|
ms := t.maxStreams
|
||||||
|
t.maxStreams = int(v)
|
||||||
|
t.mu.Unlock()
|
||||||
|
if !reset {
|
||||||
t.streamsQuota = newQuotaPool(int(v))
|
t.streamsQuota = newQuotaPool(int(v))
|
||||||
} else {
|
} else {
|
||||||
t.streamsQuota.reset(int(v) - t.maxStreams)
|
t.streamsQuota.reset(int(v) - ms)
|
||||||
}
|
}
|
||||||
t.maxStreams = int(v)
|
|
||||||
case http2.SettingInitialWindowSize:
|
case http2.SettingInitialWindowSize:
|
||||||
|
t.mu.Lock()
|
||||||
for _, s := range t.activeStreams {
|
for _, s := range t.activeStreams {
|
||||||
// Adjust the sending quota for each s.
|
// Adjust the sending quota for each s.
|
||||||
s.sendQuotaPool.reset(int(v - t.streamSendQuota))
|
s.sendQuotaPool.reset(int(v - t.streamSendQuota))
|
||||||
}
|
}
|
||||||
t.streamSendQuota = v
|
t.streamSendQuota = v
|
||||||
|
t.mu.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -173,6 +173,11 @@ type Stream struct {
|
|||||||
buf *recvBuffer
|
buf *recvBuffer
|
||||||
dec io.Reader
|
dec io.Reader
|
||||||
|
|
||||||
|
// updateStreams indicates whether the transport's streamsQuota needed
|
||||||
|
// to be updated when this stream is closed. It is false when the transport
|
||||||
|
// sticks to the initial infinite value of the number of concurrent streams.
|
||||||
|
// Ture otherwise.
|
||||||
|
updateStreams bool
|
||||||
fc *inFlow
|
fc *inFlow
|
||||||
recvQuota uint32
|
recvQuota uint32
|
||||||
// The accumulated inbound quota pending for window update.
|
// The accumulated inbound quota pending for window update.
|
||||||
|
Reference in New Issue
Block a user