blocking when max concurrent stream limit is reached.

This commit is contained in:
iamqizhao
2015-05-26 18:14:05 -07:00
parent f5ebd86be7
commit 1b5f15dda8
3 changed files with 61 additions and 22 deletions

View File

@ -795,17 +795,23 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) {
s, cc := setUp(1, e)
tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc)
var err error
for {
time.Sleep(2 * time.Millisecond)
_, err = tc.StreamingInputCall(context.Background())
// Loop until the settings of max concurrent streams is
// received by the client.
if err != nil {
break
// Perform an unary RPC to make sure the new settings were propagated to the client.
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
t.Fatalf("fhaof")
}
// Initiate the 1st stream
if _, err := tc.StreamingInputCall(context.Background()); err != nil {
t.Fatalf("faf")
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
// The 2nd stream should block until its deadline exceeds.
ctx, _ := context.WithTimeout(context.Background(), time.Second)
if _, err := tc.StreamingInputCall(ctx); grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("1414")
}
}
if grpc.Code(err) != codes.Unavailable {
t.Fatalf("got %v, want error code %d", err, codes.Unavailable)
}
wg.Done()
}()
wg.Wait()
}

View File

@ -79,6 +79,7 @@ type http2Client struct {
fc *inFlow
// sendQuotaPool provides flow control to outbound message.
sendQuotaPool *quotaPool
streamsQuota *quotaPool
// The scheme used: https if TLS is on, http otherwise.
scheme string
@ -89,7 +90,7 @@ type http2Client struct {
state transportState // the state of underlying connection
activeStreams map[uint32]*Stream
// The max number of concurrent streams
maxStreams uint32
maxStreams int
// the per-stream outbound flow control window size set by the peer.
streamSendQuota uint32
}
@ -174,8 +175,8 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e
scheme: scheme,
state: reachable,
activeStreams: make(map[uint32]*Stream),
maxStreams: math.MaxUint32,
authCreds: opts.AuthOptions,
maxStreams: math.MaxInt32,
streamSendQuota: defaultWindowSize,
}
go t.controller()
@ -236,19 +237,26 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
authData[k] = v
}
}
if _, err := wait(ctx, t.shutdownChan, t.writableChan); err != nil {
return nil, err
}
t.mu.Lock()
if t.state != reachable {
t.mu.Unlock()
return nil, ErrConnClosing
}
if uint32(len(t.activeStreams)) >= t.maxStreams {
t.mu.Unlock()
t.writableChan <- 0
return nil, StreamErrorf(codes.Unavailable, "transport: failed to create new stream because the limit has been reached.")
if t.streamsQuota != nil {
q, err := wait(ctx, t.shutdownChan, t.streamsQuota.acquire())
if err != nil {
t.mu.Unlock()
return nil, err
}
if q > 1 {
t.streamsQuota.add(q-1)
}
}
t.mu.Unlock()
if _, err := wait(ctx, t.shutdownChan, t.writableChan); err != nil {
return nil, err
}
t.mu.Lock()
s := t.newStream(ctx, callHdr)
t.activeStreams[s.id] = s
t.mu.Unlock()
@ -318,6 +326,9 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
func (t *http2Client) CloseStream(s *Stream, err error) {
t.mu.Lock()
delete(t.activeStreams, s.id)
if t.streamsQuota != nil {
t.streamsQuota.add(1)
}
t.mu.Unlock()
s.mu.Lock()
if q := s.fc.restoreConn(); q > 0 {
@ -558,7 +569,18 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
defer t.mu.Unlock()
switch s.ID {
case http2.SettingMaxConcurrentStreams:
t.maxStreams = v
// TODO(zhaoq): This is a hack to avoid significant refactoring of the
// code to deal with int32 overflow. Have a better way to handle this
// later.
if v > math.MaxInt32 {
v = math.MaxInt32
}
if t.streamsQuota == nil {
t.streamsQuota = newQuotaPool(int(v))
} else {
t.streamsQuota.reset(int(v) - t.maxStreams)
}
t.maxStreams = int(v)
case http2.SettingInitialWindowSize:
for _, s := range t.activeStreams {
// Adjust the sending quota for each s.

View File

@ -451,3 +451,14 @@ func wait(ctx context.Context, closing <-chan struct{}, proceed <-chan int) (int
return i, nil
}
}
func wait64(ctx context.Context, closing <-chan struct{}, proceed <-chan int64) (int64, error) {
select {
case <-ctx.Done():
return 0, ContextErr(ctx.Err())
case <-closing:
return 0, ErrConnClosing
case i := <-proceed:
return i, nil
}
}