Stream: do not cancel ctx created with service config timeout (#1838)

This commit is contained in:
Menghan Li
2018-02-02 10:35:15 -08:00
committed by GitHub
parent f9628db66d
commit 424e3e9894
2 changed files with 96 additions and 7 deletions

View File

@ -90,8 +90,9 @@ type ClientStream interface {
// Stream.SendMsg() may return a non-nil error when something wrong happens sending
// the request. The returned error indicates the status of this sending, not the final
// status of the RPC.
// Always call Stream.RecvMsg() to get the final status if you care about the status of
// the RPC.
//
// Always call Stream.RecvMsg() to drain the stream and get the final
// status, otherwise there could be leaked resources.
Stream
}
@ -126,6 +127,14 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
if mc.Timeout != nil && *mc.Timeout >= 0 {
// The cancel function for this context will only be called when RecvMsg
// returns non-nil error, which means the stream finishes with error or
// io.EOF. https://github.com/grpc/grpc-go/issues/1818.
//
// Possible context leak:
// - If user provided context is Background, and the user doesn't call
// RecvMsg() for the final status, this ctx will be leaked after the
// stream is done, until the service config timeout happens.
ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
defer func() {
if err != nil {
@ -322,6 +331,8 @@ type clientStream struct {
decomp encoding.Compressor
decompSet bool
// cancel is only called when RecvMsg() returns non-nil error, which means
// the stream finishes with error or with io.EOF.
cancel context.CancelFunc
tracing bool // set to EnableTracing when the clientStream is created.
@ -446,6 +457,9 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
// err != nil indicates the termination of the stream.
if err != nil {
cs.finish(err)
if cs.cancel != nil {
cs.cancel()
}
}
}()
if err == nil {
@ -477,6 +491,9 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
return se
}
cs.finish(err)
if cs.cancel != nil {
cs.cancel()
}
return nil
}
return toRPCErr(err)
@ -523,17 +540,15 @@ func (cs *clientStream) closeTransportStream(err error) {
}
func (cs *clientStream) finish(err error) {
// Do not call cs.cancel in this function. Only call it when RecvMag()
// returns non-nil error because of
// https://github.com/grpc/grpc-go/issues/1818.
cs.mu.Lock()
defer cs.mu.Unlock()
if cs.finished {
return
}
cs.finished = true
defer func() {
if cs.cancel != nil {
cs.cancel()
}
}()
for _, o := range cs.opts {
o.after(cs.c)
}

View File

@ -1808,6 +1808,80 @@ func TestServiceConfigMaxMsgSize(t *testing.T) {
}
}
// Reading from a streaming RPC may fail with context canceled if timeout was
// set by service config (https://github.com/grpc/grpc-go/issues/1818). This
// test makes sure read from streaming RPC doesn't fail in this case.
func TestStreamingRPCWithTimeoutInServiceConfigRecv(t *testing.T) {
te := testServiceConfigSetup(t, tcpClearRREnv)
te.startServer(&testServer{security: tcpClearRREnv.security})
defer te.tearDown()
r, rcleanup := manual.GenerateAndRegisterManualResolver()
defer rcleanup()
te.resolverScheme = r.Scheme()
te.nonBlockingDial = true
fmt.Println("1")
cc := te.clientConn()
fmt.Println("10")
tc := testpb.NewTestServiceClient(cc)
r.NewAddress([]resolver.Address{{Addr: te.srvAddr}})
r.NewServiceConfig(`{
"methodConfig": [
{
"name": [
{
"service": "grpc.testing.TestService",
"method": "FullDuplexCall"
}
],
"waitForReady": true,
"timeout": "10s"
}
]
}`)
// Make sure service config has been processed by grpc.
for {
if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil {
break
}
time.Sleep(time.Millisecond)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
if err != nil {
t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
}
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 0)
if err != nil {
t.Fatalf("failed to newPayload: %v", err)
}
req := &testpb.StreamingOutputCallRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseParameters: []*testpb.ResponseParameters{{Size: 0}},
Payload: payload,
}
if err := stream.Send(req); err != nil {
t.Fatalf("stream.Send(%v) = %v, want <nil>", req, err)
}
stream.CloseSend()
time.Sleep(time.Second)
// Sleep 1 second before recv to make sure the final status is received
// before the recv.
if _, err := stream.Recv(); err != nil {
t.Fatalf("stream.Recv = _, %v, want _, <nil>", err)
}
// Keep reading to drain the stream.
for {
if _, err := stream.Recv(); err != nil {
break
}
}
}
func TestMaxMsgSizeClientDefault(t *testing.T) {
defer leakcheck.Check(t)
for _, e := range listTestEnv() {