1. Initialize streamsQuota at object creation.
2. Defer adding back to streamsQuota pool in CloseStream
This commit is contained in:
@ -2676,7 +2676,7 @@ const defaultMaxStreamsClient = 100
|
||||
func TestClientExceedMaxStreamsLimit(t *testing.T) {
|
||||
defer leakCheck(t)()
|
||||
for _, e := range listTestEnv() {
|
||||
testExceedMaxStreamsLimit(t, e)
|
||||
testClientExceedMaxStreamsLimit(t, e)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -209,6 +209,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
|
||||
activeStreams: make(map[uint32]*Stream),
|
||||
creds: opts.PerRPCCredentials,
|
||||
maxStreams: defaultMaxStreamsClient,
|
||||
streamsQuota: newQuotaPool(defaultMaxStreamsClient),
|
||||
streamSendQuota: defaultWindowSize,
|
||||
statsHandler: opts.StatsHandler,
|
||||
}
|
||||
@ -337,21 +338,18 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
||||
t.mu.Unlock()
|
||||
return nil, ErrConnClosing
|
||||
}
|
||||
checkStreamsQuota := t.streamsQuota != nil
|
||||
t.mu.Unlock()
|
||||
if checkStreamsQuota {
|
||||
sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Returns the quota balance back.
|
||||
if sq > 1 {
|
||||
t.streamsQuota.add(sq - 1)
|
||||
}
|
||||
sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Returns the quota balance back.
|
||||
if sq > 1 {
|
||||
t.streamsQuota.add(sq - 1)
|
||||
}
|
||||
if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
|
||||
// Return the quota back now because there is no stream returned to the caller.
|
||||
if _, ok := err.(StreamError); ok && checkStreamsQuota {
|
||||
if _, ok := err.(StreamError); ok {
|
||||
t.streamsQuota.add(1)
|
||||
}
|
||||
return nil, err
|
||||
@ -359,9 +357,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
||||
t.mu.Lock()
|
||||
if t.state == draining {
|
||||
t.mu.Unlock()
|
||||
if checkStreamsQuota {
|
||||
t.streamsQuota.add(1)
|
||||
}
|
||||
t.streamsQuota.add(1)
|
||||
// Need to make t writable again so that the rpc in flight can still proceed.
|
||||
t.writableChan <- 0
|
||||
return nil, ErrStreamDrain
|
||||
@ -374,16 +370,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
||||
s.clientStatsCtx = userCtx
|
||||
t.activeStreams[s.id] = s
|
||||
|
||||
// This stream is not counted when applySetings(...) initialize t.streamsQuota.
|
||||
// Reset t.streamsQuota to the right value.
|
||||
var reset bool
|
||||
if !checkStreamsQuota && t.streamsQuota != nil {
|
||||
reset = true
|
||||
}
|
||||
t.mu.Unlock()
|
||||
if reset {
|
||||
t.streamsQuota.add(-1)
|
||||
}
|
||||
|
||||
// HPACK encodes various headers. Note that once WriteField(...) is
|
||||
// called, the corresponding headers/continuation frame has to be sent
|
||||
@ -491,15 +478,11 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
||||
// CloseStream clears the footprint of a stream when the stream is not needed any more.
|
||||
// This must not be executed in reader's goroutine.
|
||||
func (t *http2Client) CloseStream(s *Stream, err error) {
|
||||
var updateStreams bool
|
||||
t.mu.Lock()
|
||||
if t.activeStreams == nil {
|
||||
t.mu.Unlock()
|
||||
return
|
||||
}
|
||||
if t.streamsQuota != nil {
|
||||
updateStreams = true
|
||||
}
|
||||
delete(t.activeStreams, s.id)
|
||||
if t.state == draining && len(t.activeStreams) == 0 {
|
||||
// The transport is draining and s is the last live stream on t.
|
||||
@ -508,9 +491,9 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
|
||||
return
|
||||
}
|
||||
t.mu.Unlock()
|
||||
if updateStreams {
|
||||
defer func() {
|
||||
t.streamsQuota.add(1)
|
||||
}
|
||||
}()
|
||||
s.mu.Lock()
|
||||
if q := s.fc.resetPendingData(); q > 0 {
|
||||
if n := t.fc.onRead(q); n > 0 {
|
||||
@ -1043,16 +1026,10 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
|
||||
s.Val = math.MaxInt32
|
||||
}
|
||||
t.mu.Lock()
|
||||
reset := t.streamsQuota != nil
|
||||
if !reset {
|
||||
t.streamsQuota = newQuotaPool(int(s.Val) - len(t.activeStreams))
|
||||
}
|
||||
ms := t.maxStreams
|
||||
t.maxStreams = int(s.Val)
|
||||
t.mu.Unlock()
|
||||
if reset {
|
||||
t.streamsQuota.add(int(s.Val) - ms)
|
||||
}
|
||||
t.streamsQuota.add(int(s.Val) - ms)
|
||||
case http2.SettingInitialWindowSize:
|
||||
t.mu.Lock()
|
||||
for _, stream := range t.activeStreams {
|
||||
|
@ -507,7 +507,10 @@ func TestMaxStreams(t *testing.T) {
|
||||
case <-cc.streamsQuota.acquire():
|
||||
t.Fatalf("streamsQuota.acquire() becomes readable mistakenly.")
|
||||
default:
|
||||
if cc.streamsQuota.quota != 0 {
|
||||
cc.streamsQuota.mu.Lock()
|
||||
quota := cc.streamsQuota.quota
|
||||
cc.streamsQuota.mu.Unlock()
|
||||
if quota != 0 {
|
||||
t.Fatalf("streamsQuota.quota got non-zero quota mistakenly.")
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user