Merge pull request #409 from iamqizhao/master

fix a counting race for max streams.
This commit is contained in:
Qi Zhao
2015-10-22 11:07:05 -07:00
3 changed files with 35 additions and 33 deletions

View File

@ -1060,33 +1060,24 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) {
s, cc := setUp(t, nil, 1, "", e) s, cc := setUp(t, nil, 1, "", e)
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc) defer tearDown(s, cc)
done := make(chan struct{}) stream, err := tc.StreamingInputCall(context.Background())
ch := make(chan int) if err != nil {
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
}
go func() { go func() {
timer := time.After(5 * time.Second) stream.Header()
for {
select {
case <-time.After(5 * time.Millisecond):
ch <- 0
case <-timer:
close(done)
return
}
}
}() }()
// Loop until a stream creation hangs due to the new max stream setting. // Loop until receiving the new max stream setting from the server.
for { for {
select {
case <-ch:
ctx, _ := context.WithTimeout(context.Background(), time.Second) ctx, _ := context.WithTimeout(context.Background(), time.Second)
if _, err := tc.StreamingInputCall(ctx); err != nil { _, err := tc.StreamingInputCall(ctx)
if err == nil {
time.Sleep(time.Second)
continue
}
if grpc.Code(err) == codes.DeadlineExceeded { if grpc.Code(err) == codes.DeadlineExceeded {
return break
}
t.Fatalf("%v.StreamingInputCall(_) = %v, want <nil>", tc, err)
}
case <-done:
t.Fatalf("Client has not received the max stream setting in 5 seconds.")
} }
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %d", tc, err, codes.DeadlineExceeded)
} }
} }

View File

@ -293,7 +293,18 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
} }
s := t.newStream(ctx, callHdr) s := t.newStream(ctx, callHdr)
t.activeStreams[s.id] = s t.activeStreams[s.id] = s
// This stream is not counted when applySetings(...) initialize t.streamsQuota.
// Reset t.streamsQuota to the right value.
var reset bool
if !checkStreamsQuota && t.streamsQuota != nil {
reset = true
}
t.mu.Unlock() t.mu.Unlock()
if reset {
t.streamsQuota.reset(-1)
}
// HPACK encodes various headers. Note that once WriteField(...) is // HPACK encodes various headers. Note that once WriteField(...) is
// called, the corresponding headers/continuation frame has to be sent // called, the corresponding headers/continuation frame has to be sent
// because hpack.Encoder is stateful. // because hpack.Encoder is stateful.