From 2e7aa9a2b106cce5d21e35f6ea2496876ece6b6a Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Tue, 12 Jul 2016 19:24:33 -0700 Subject: [PATCH 01/20] Error out the send call for a client streaming rpc if the server has returned an error. --- test/end2end_test.go | 134 +++++++++++++++++++++++++------------- transport/http2_client.go | 6 ++ 2 files changed, 95 insertions(+), 45 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index cc8bae5f..b4165f9a 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -90,7 +90,8 @@ var ( var raceMode bool // set by race_test.go in race mode type testServer struct { - security string // indicate the authentication protocol used by this server. + security string // indicate the authentication protocol used by this server. + streamingInputCallErr bool // whether to error out the StreamingInputCall handler prematurely. } func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { @@ -219,6 +220,9 @@ func (s *testServer) StreamingInputCall(stream testpb.TestService_StreamingInput } p := in.GetPayload().GetBody() sum += len(p) + if s.streamingInputCallErr { + return grpc.Errorf(codes.NotFound, "not found") + } } } @@ -404,10 +408,10 @@ func (te *test) tearDown() { // modify it before calling its startServer and clientConn methods. func newTest(t *testing.T, e env) *test { te := &test{ - t: t, - e: e, - testServer: &testServer{security: e.security}, - maxStream: math.MaxUint32, + t: t, + e: e, + //testServer: &testServer{security: e.security}, + maxStream: math.MaxUint32, } te.ctx, te.cancel = context.WithCancel(context.Background()) return te @@ -415,7 +419,8 @@ func newTest(t *testing.T, e env) *test { // startServer starts a gRPC server listening. Callers should defer a // call to te.tearDown to clean up. -func (te *test) startServer() { +func (te *test) startServer(ts testpb.TestServiceServer) { + te.testServer = ts e := te.e te.t.Logf("Running test in %s environment...", e.name) sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)} @@ -545,7 +550,7 @@ func testTimeoutOnDeadServer(t *testing.T, e env) { "grpc: Conn.resetTransport failed to create client transport: connection error", "grpc: Conn.resetTransport failed to create client transport: connection error: desc = \"transport: dial unix", ) - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() cc := te.clientConn() @@ -580,7 +585,7 @@ func testFailFast(t *testing.T, e env) { "grpc: Conn.resetTransport failed to create client transport: connection error", "grpc: Conn.resetTransport failed to create client transport: connection error: desc = \"transport: dial unix", ) - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() cc := te.clientConn() @@ -629,7 +634,7 @@ func testHealthCheckOnSuccess(t *testing.T, e env) { hs := health.NewHealthServer() hs.SetServingStatus("grpc.health.v1.Health", 1) te.healthServer = hs - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() cc := te.clientConn() @@ -655,7 +660,7 @@ func testHealthCheckOnFailure(t *testing.T, e env) { hs := health.NewHealthServer() hs.SetServingStatus("grpc.health.v1.HealthCheck", 1) te.healthServer = hs - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() cc := te.clientConn() @@ -679,7 +684,7 @@ func TestHealthCheckOff(t *testing.T) { func testHealthCheckOff(t *testing.T, e env) { te := newTest(t, e) - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() want := grpc.Errorf(codes.Unimplemented, "unknown service grpc.health.v1.Health") if _, err := healthCheck(1*time.Second, te.clientConn(), ""); !equalErrors(err, want) { @@ -698,7 +703,7 @@ func testHealthCheckServingStatus(t *testing.T, e env) { te := newTest(t, e) hs := health.NewHealthServer() te.healthServer = hs - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() cc := te.clientConn() @@ -741,7 +746,7 @@ func TestErrorChanNoIO(t *testing.T) { func testErrorChanNoIO(t *testing.T, e env) { te := newTest(t, e) - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -760,7 +765,7 @@ func TestEmptyUnaryWithUserAgent(t *testing.T) { func testEmptyUnaryWithUserAgent(t *testing.T, e env) { te := newTest(t, e) te.userAgent = testAppUA - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() cc := te.clientConn() @@ -786,7 +791,7 @@ func TestFailedEmptyUnary(t *testing.T) { func testFailedEmptyUnary(t *testing.T, e env) { te := newTest(t, e) - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -806,7 +811,7 @@ func TestLargeUnary(t *testing.T) { func testLargeUnary(t *testing.T, e env) { te := newTest(t, e) - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -843,7 +848,7 @@ func TestMetadataUnaryRPC(t *testing.T) { func testMetadataUnaryRPC(t *testing.T, e env) { te := newTest(t, e) - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -889,7 +894,7 @@ func TestMalformedHTTP2Metadata(t *testing.T) { func testMalformedHTTP2Metadata(t *testing.T, e env) { te := newTest(t, e) - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -951,7 +956,7 @@ func TestRetry(t *testing.T) { func testRetry(t *testing.T, e env) { te := newTest(t, e) te.declareLogNoise("transport: http2Client.notifyError got notified that the client transport was broken") - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() cc := te.clientConn() @@ -1000,7 +1005,7 @@ func TestRPCTimeout(t *testing.T) { // TODO(zhaoq): Have a better test coverage of timeout and cancellation mechanism. func testRPCTimeout(t *testing.T, e env) { te := newTest(t, e) - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() cc := te.clientConn() @@ -1037,7 +1042,7 @@ func TestCancel(t *testing.T) { func testCancel(t *testing.T, e env) { te := newTest(t, e) te.declareLogNoise("grpc: the client connection is closing; please retry") - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() cc := te.clientConn() @@ -1075,7 +1080,7 @@ func testCancelNoIO(t *testing.T, e env) { te := newTest(t, e) te.declareLogNoise("http2Client.notifyError got notified that the client transport was broken") te.maxStream = 1 // Only allows 1 live stream per server transport. - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() cc := te.clientConn() @@ -1145,8 +1150,7 @@ func TestNoService(t *testing.T) { func testNoService(t *testing.T, e env) { te := newTest(t, e) - te.testServer = nil // register nothing - te.startServer() + te.startServer(nil) defer te.tearDown() cc := te.clientConn() @@ -1170,7 +1174,7 @@ func TestPingPong(t *testing.T) { func testPingPong(t *testing.T, e env) { te := newTest(t, e) - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -1230,7 +1234,7 @@ func TestMetadataStreamingRPC(t *testing.T) { func testMetadataStreamingRPC(t *testing.T, e env) { te := newTest(t, e) - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -1301,7 +1305,7 @@ func TestServerStreaming(t *testing.T) { func testServerStreaming(t *testing.T, e env) { te := newTest(t, e) - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -1356,7 +1360,7 @@ func TestFailedServerStreaming(t *testing.T) { func testFailedServerStreaming(t *testing.T, e env) { te := newTest(t, e) - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -1412,8 +1416,7 @@ func TestServerStreaming_Concurrent(t *testing.T) { func testServerStreaming_Concurrent(t *testing.T, e env) { te := newTest(t, e) - te.testServer = concurrentSendServer{} - te.startServer() + te.startServer(concurrentSendServer{}) defer te.tearDown() cc := te.clientConn() @@ -1471,7 +1474,7 @@ func TestClientStreaming(t *testing.T) { func testClientStreaming(t *testing.T, e env) { te := newTest(t, e) - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -1504,6 +1507,47 @@ func testClientStreaming(t *testing.T, e env) { } } +func TestClientStreamingError(t *testing.T) { + defer leakCheck(t)() + for _, e := range listTestEnv() { + testClientStreamingError(t, e) + } +} + +func testClientStreamingError(t *testing.T, e env) { + te := newTest(t, e) + te.startServer(&testServer{security: e.security, streamingInputCallErr: true}) + defer te.tearDown() + tc := testpb.NewTestServiceClient(te.clientConn()) + + stream, err := tc.StreamingInputCall(te.ctx) + if err != nil { + t.Fatalf("%v.StreamingInputCall(_) = _, %v, want ", tc, err) + } + payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1) + if err != nil { + t.Fatal(err) + } + + req := &testpb.StreamingInputCallRequest{ + Payload: payload, + } + // The 1st request should go through. + if err := stream.Send(req); err != nil { + t.Fatalf("%v.Send(%v) = %v, want ", stream, req, err) + } + for { + if err := stream.Send(req); err == nil { + continue + } else { + if grpc.Code(err) != codes.NotFound { + t.Fatalf("%v.Send(_) = %v, want error %d", stream, err, codes.NotFound) + } + break + } + } +} + func TestExceedMaxStreamsLimit(t *testing.T) { defer leakCheck(t)() for _, e := range listTestEnv() { @@ -1519,7 +1563,7 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) { "grpc: the client connection is closing", ) te.maxStream = 1 // Only allows 1 live stream per server transport. - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() cc := te.clientConn() @@ -1560,7 +1604,7 @@ func testStreamsQuotaRecovery(t *testing.T, e env) { "grpc: the client connection is closing", ) te.maxStream = 1 // Allows 1 live stream. - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() cc := te.clientConn() @@ -1611,7 +1655,7 @@ func testCompressServerHasNoSupport(t *testing.T, e env) { te := newTest(t, e) te.serverCompression = false te.clientCompression = true - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -1667,7 +1711,7 @@ func testCompressOK(t *testing.T, e env) { te := newTest(t, e) te.serverCompression = true te.clientCompression = true - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -1730,7 +1774,7 @@ func errInjector(ctx context.Context, req interface{}, info *grpc.UnaryServerInf func testUnaryServerInterceptor(t *testing.T, e env) { te := newTest(t, e) te.unaryInt = errInjector - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -1761,7 +1805,7 @@ func fullDuplexOnly(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServ func testStreamServerInterceptor(t *testing.T, e env) { te := newTest(t, e) te.streamInt = fullDuplexOnly - te.startServer() + te.startServer(&testServer{security: e.security}) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -1825,12 +1869,12 @@ func TestClientRequestBodyError_UnexpectedEOF(t *testing.T) { func testClientRequestBodyError_UnexpectedEOF(t *testing.T, e env) { te := newTest(t, e) - te.testServer = &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { errUnexpectedCall := errors.New("unexpected call func server method") t.Error(errUnexpectedCall) return nil, errUnexpectedCall }} - te.startServer() + te.startServer(ts) defer te.tearDown() te.withServerTester(func(st *serverTester) { st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall") @@ -1850,12 +1894,12 @@ func TestClientRequestBodyError_CloseAfterLength(t *testing.T) { func testClientRequestBodyError_CloseAfterLength(t *testing.T, e env) { te := newTest(t, e) te.declareLogNoise("Server.processUnaryRPC failed to write status") - te.testServer = &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { errUnexpectedCall := errors.New("unexpected call func server method") t.Error(errUnexpectedCall) return nil, errUnexpectedCall }} - te.startServer() + te.startServer(ts) defer te.tearDown() te.withServerTester(func(st *serverTester) { st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall") @@ -1875,11 +1919,11 @@ func TestClientRequestBodyError_Cancel(t *testing.T) { func testClientRequestBodyError_Cancel(t *testing.T, e env) { te := newTest(t, e) gotCall := make(chan bool, 1) - te.testServer = &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { gotCall <- true return new(testpb.SimpleResponse), nil }} - te.startServer() + te.startServer(ts) defer te.tearDown() te.withServerTester(func(st *serverTester) { st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall") @@ -1912,12 +1956,12 @@ func TestClientRequestBodyError_Cancel_StreamingInput(t *testing.T) { func testClientRequestBodyError_Cancel_StreamingInput(t *testing.T, e env) { te := newTest(t, e) recvErr := make(chan error, 1) - te.testServer = &funcServer{streamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error { + ts := &funcServer{streamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error { _, err := stream.Recv() recvErr <- err return nil }} - te.startServer() + te.startServer(ts) defer te.tearDown() te.withServerTester(func(st *serverTester) { st.writeHeadersGRPC(1, "/grpc.testing.TestService/StreamingInputCall") diff --git a/transport/http2_client.go b/transport/http2_client.go index f66435fd..d7f19b2e 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -497,6 +497,12 @@ func (t *http2Client) GracefulClose() error { // TODO(zhaoq): opts.Delay is ignored in this implementation. Support it later // if it improves the performance. func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { + s.mu.Lock() + if s.state == streamDone { + s.mu.Unlock() + return StreamErrorf(s.statusCode, "%s", s.statusDesc) + } + s.mu.Unlock() r := bytes.NewBuffer(data) for { var p []byte From f53faa647d8e054bd074c858d29b6ebbb5e9f49f Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Tue, 12 Jul 2016 19:26:12 -0700 Subject: [PATCH 02/20] minor fix --- test/end2end_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index b4165f9a..457f34e3 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -408,9 +408,8 @@ func (te *test) tearDown() { // modify it before calling its startServer and clientConn methods. func newTest(t *testing.T, e env) *test { te := &test{ - t: t, - e: e, - //testServer: &testServer{security: e.security}, + t: t, + e: e, maxStream: math.MaxUint32, } te.ctx, te.cancel = context.WithCancel(context.Background()) From 6205cb25abc9325b42d2eb65f3d1789fb4d2c177 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 13 Jul 2016 18:05:02 -0700 Subject: [PATCH 03/20] fix some bugs --- clientconn_test.go | 5 +++-- test/end2end_test.go | 17 ++++++++--------- transport/http2_client.go | 38 +++++++++++++++++++++++++------------- transport/transport.go | 8 ++++++++ 4 files changed, 44 insertions(+), 24 deletions(-) diff --git a/clientconn_test.go b/clientconn_test.go index 29db8bfc..71ba45e6 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -73,7 +73,7 @@ func TestCredentialsMisuse(t *testing.T) { t.Fatalf("Failed to create authenticator %v", err) } // Two conflicting credential configurations - if _, err := Dial("Non-Existent.Server:80", WithTransportCredentials(tlsCreds), WithTimeout(time.Millisecond), WithBlock(), WithInsecure()); err != errCredentialsConflict { + if _, err := Dial("Non-Existent.Server:80", WithTransportCredentials(tlsCreds), WithBlock(), WithInsecure()); err != errCredentialsConflict { t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, errCredentialsConflict) } rpcCreds, err := oauth.NewJWTAccessFromKey(nil) @@ -81,7 +81,7 @@ func TestCredentialsMisuse(t *testing.T) { t.Fatalf("Failed to create credentials %v", err) } // security info on insecure connection - if _, err := Dial("Non-Existent.Server:80", WithPerRPCCredentials(rpcCreds), WithTimeout(time.Millisecond), WithBlock(), WithInsecure()); err != errTransportCredentialsMissing { + if _, err := Dial("Non-Existent.Server:80", WithPerRPCCredentials(rpcCreds), WithBlock(), WithInsecure()); err != errTransportCredentialsMissing { t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, errTransportCredentialsMissing) } } @@ -123,4 +123,5 @@ func testBackoffConfigSet(t *testing.T, expected *BackoffConfig, opts ...DialOpt if actual != *expected { t.Fatalf("unexpected backoff config on connection: %v, want %v", actual, expected) } + conn.Close() } diff --git a/test/end2end_test.go b/test/end2end_test.go index 457f34e3..2f4e8e8e 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -90,8 +90,8 @@ var ( var raceMode bool // set by race_test.go in race mode type testServer struct { - security string // indicate the authentication protocol used by this server. - streamingInputCallErr bool // whether to error out the StreamingInputCall handler prematurely. + security string // indicate the authentication protocol used by this server. + earlyFail bool // whether to error out the execution of a service handler. } func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { @@ -220,7 +220,7 @@ func (s *testServer) StreamingInputCall(stream testpb.TestService_StreamingInput } p := in.GetPayload().GetBody() sum += len(p) - if s.streamingInputCallErr { + if s.earlyFail { return grpc.Errorf(codes.NotFound, "not found") } } @@ -1515,7 +1515,7 @@ func TestClientStreamingError(t *testing.T) { func testClientStreamingError(t *testing.T, e env) { te := newTest(t, e) - te.startServer(&testServer{security: e.security, streamingInputCallErr: true}) + te.startServer(&testServer{security: e.security, earlyFail: true}) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) @@ -1538,12 +1538,11 @@ func testClientStreamingError(t *testing.T, e env) { for { if err := stream.Send(req); err == nil { continue - } else { - if grpc.Code(err) != codes.NotFound { - t.Fatalf("%v.Send(_) = %v, want error %d", stream, err, codes.NotFound) - } - break } + if _, err := stream.CloseAndRecv(); grpc.Code(err) != codes.NotFound { + t.Fatalf("%v.Send(_) = %v, want error %d", stream, err, codes.NotFound) + } + break } } diff --git a/transport/http2_client.go b/transport/http2_client.go index d7f19b2e..bede3d39 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -498,9 +498,10 @@ func (t *http2Client) GracefulClose() error { // if it improves the performance. func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { s.mu.Lock() + // The stream has been done. Return the status directly. if s.state == streamDone { s.mu.Unlock() - return StreamErrorf(s.statusCode, "%s", s.statusDesc) + return StreamErrorf(s.statusCode, "%v", s.statusDesc) } s.mu.Unlock() r := bytes.NewBuffer(data) @@ -599,11 +600,11 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { } s.mu.Lock() if s.state != streamDone { - if s.state == streamReadDone { - s.state = streamDone - } else { - s.state = streamWriteDone - } + //if s.state == streamReadDone { + // s.state = streamDone + //} else { + s.state = streamWriteDone + //} } s.mu.Unlock() return nil @@ -678,11 +679,14 @@ func (t *http2Client) handleData(f *http2.DataFrame) { // the read direction is closed, and set the status appropriately. if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) { s.mu.Lock() - if s.state == streamWriteDone { - s.state = streamDone - } else { - s.state = streamReadDone - } + s.state = streamDone + /* + if s.state == streamWriteDone { + s.state = streamDone + } else { + s.state = streamReadDone + } + */ s.statusCode = codes.Internal s.statusDesc = "server closed the stream without sending trailers" s.mu.Unlock() @@ -786,12 +790,20 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { if len(state.mdata) > 0 { s.trailer = state.mdata } - s.state = streamDone s.statusCode = state.statusCode s.statusDesc = state.statusDesc + var cancel bool + if s.state != streamWriteDone { + // s will be canceled. This is required to interrupt any pending + // blocking Write calls when the final RPC status has been arrived. + cancel = true + } + s.state = streamDone s.mu.Unlock() - s.write(recvMsg{err: io.EOF}) + if cancel { + s.cancel() + } } func handleMalformedHTTP2(s *Stream, err error) { diff --git a/transport/transport.go b/transport/transport.go index d4c220a0..cb739886 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -140,6 +140,14 @@ func (r *recvBufferReader) Read(p []byte) (n int, err error) { } select { case <-r.ctx.Done(): + select { + case i := <-r.recv.get(): + m := i.(*recvMsg) + if m.err != nil { + return 0, m.err + } + default: + } return 0, ContextErr(r.ctx.Err()) case i := <-r.recv.get(): r.recv.load() From 48068bbe62e5804adb0eee0fc5352416d7f6b718 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 13 Jul 2016 18:13:28 -0700 Subject: [PATCH 04/20] remove debugging info --- test/end2end_test.go | 2 +- transport/http2_client.go | 11 ----------- transport/transport.go | 1 - 3 files changed, 1 insertion(+), 13 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 2f4e8e8e..c2404f24 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -91,7 +91,7 @@ var raceMode bool // set by race_test.go in race mode type testServer struct { security string // indicate the authentication protocol used by this server. - earlyFail bool // whether to error out the execution of a service handler. + earlyFail bool // whether to error out the execution of a service handler prematurely. } func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { diff --git a/transport/http2_client.go b/transport/http2_client.go index bede3d39..3f1ef2a5 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -600,11 +600,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { } s.mu.Lock() if s.state != streamDone { - //if s.state == streamReadDone { - // s.state = streamDone - //} else { s.state = streamWriteDone - //} } s.mu.Unlock() return nil @@ -680,13 +676,6 @@ func (t *http2Client) handleData(f *http2.DataFrame) { if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) { s.mu.Lock() s.state = streamDone - /* - if s.state == streamWriteDone { - s.state = streamDone - } else { - s.state = streamReadDone - } - */ s.statusCode = codes.Internal s.statusDesc = "server closed the stream without sending trailers" s.mu.Unlock() diff --git a/transport/transport.go b/transport/transport.go index cb739886..db0de3a6 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -165,7 +165,6 @@ type streamState uint8 const ( streamActive streamState = iota streamWriteDone // EndStream sent - streamReadDone // EndStream received streamDone // sendDone and recvDone or RSTStreamFrame is sent or received. ) From 99c9776beb2e96d410f58f50db24f5650e8bc4a3 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 13 Jul 2016 18:18:35 -0700 Subject: [PATCH 05/20] add more comments --- transport/transport.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/transport/transport.go b/transport/transport.go index db0de3a6..46df5570 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -140,6 +140,10 @@ func (r *recvBufferReader) Read(p []byte) (n int, err error) { } select { case <-r.ctx.Done(): + // ctx might be canceled by gRPC internals to unblocking pending writing operations + // when the client receives the final status prematurely (for client and bi-directional + // streaming RPCs). Used to return the real status to the users instead of the + // cancellation. select { case i := <-r.recv.get(): m := i.(*recvMsg) From 311bcfb8e80004682b26e1b84227941d8d49f388 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 14 Jul 2016 10:49:31 -0700 Subject: [PATCH 06/20] fix some comments --- transport/transport.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport/transport.go b/transport/transport.go index 46df5570..1c888083 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -169,7 +169,7 @@ type streamState uint8 const ( streamActive streamState = iota streamWriteDone // EndStream sent - streamDone // sendDone and recvDone or RSTStreamFrame is sent or received. + streamDone // the entire stream is finished. ) // Stream represents an RPC in the transport layer. From 7b7cb1ae8cee262f2202115cd544c6803241982f Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 14 Jul 2016 17:12:11 -0700 Subject: [PATCH 07/20] fix bugs and flaky tests --- stream.go | 7 ++++++- test/end2end_test.go | 2 +- transport/http2_client.go | 28 +++++++++------------------- transport/http2_server.go | 18 +++++++----------- transport/transport.go | 25 ++++++++++--------------- 5 files changed, 33 insertions(+), 47 deletions(-) diff --git a/stream.go b/stream.go index 7a3bef51..a0373600 100644 --- a/stream.go +++ b/stream.go @@ -251,7 +251,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { if err != nil { cs.finish(err) } - if err == nil || err == io.EOF { + if err == nil || err == io.EOF || err == transport.ErrEarlyDone { return } if _, ok := err.(transport.ConnectionError); !ok { @@ -328,6 +328,11 @@ func (cs *clientStream) CloseSend() (err error) { if err == nil || err == io.EOF { return } + if err == transport.ErrEarlyDone { + // If the RPC is done prematurely, Stream.RecvMsg(...) needs to be + // called to get the final status and clear the footprint. + return nil + } if _, ok := err.(transport.ConnectionError); !ok { cs.closeTransportStream(err) } diff --git a/test/end2end_test.go b/test/end2end_test.go index c2404f24..c109c885 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -1540,7 +1540,7 @@ func testClientStreamingError(t *testing.T, e env) { continue } if _, err := stream.CloseAndRecv(); grpc.Code(err) != codes.NotFound { - t.Fatalf("%v.Send(_) = %v, want error %d", stream, err, codes.NotFound) + t.Fatalf("%v.CloseAndRecv() = %v, want error %d", stream, err, codes.NotFound) } break } diff --git a/transport/http2_client.go b/transport/http2_client.go index 3f1ef2a5..2715e2d0 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -202,6 +202,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { // TODO(zhaoq): Handle uint32 overflow of Stream.id. s := &Stream{ id: t.nextID, + earlyDone: make(chan struct{}), method: callHdr.Method, sendCompress: callHdr.SendCompress, buf: newRecvBuffer(), @@ -278,7 +279,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea checkStreamsQuota := t.streamsQuota != nil t.mu.Unlock() if checkStreamsQuota { - sq, err := wait(ctx, t.shutdownChan, t.streamsQuota.acquire()) + sq, err := wait(ctx, nil, t.shutdownChan, t.streamsQuota.acquire()) if err != nil { return nil, err } @@ -287,7 +288,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea t.streamsQuota.add(sq - 1) } } - if _, err := wait(ctx, t.shutdownChan, t.writableChan); err != nil { + if _, err := wait(ctx, nil, t.shutdownChan, t.writableChan); err != nil { // Return the quota back now because there is no stream returned to the caller. if _, ok := err.(StreamError); ok && checkStreamsQuota { t.streamsQuota.add(1) @@ -497,13 +498,6 @@ func (t *http2Client) GracefulClose() error { // TODO(zhaoq): opts.Delay is ignored in this implementation. Support it later // if it improves the performance. func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { - s.mu.Lock() - // The stream has been done. Return the status directly. - if s.state == streamDone { - s.mu.Unlock() - return StreamErrorf(s.statusCode, "%v", s.statusDesc) - } - s.mu.Unlock() r := bytes.NewBuffer(data) for { var p []byte @@ -511,13 +505,13 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { size := http2MaxFrameLen s.sendQuotaPool.add(0) // Wait until the stream has some quota to send the data. - sq, err := wait(s.ctx, t.shutdownChan, s.sendQuotaPool.acquire()) + sq, err := wait(s.ctx, s.earlyDone, t.shutdownChan, s.sendQuotaPool.acquire()) if err != nil { return err } t.sendQuotaPool.add(0) // Wait until the transport has some quota to send the data. - tq, err := wait(s.ctx, t.shutdownChan, t.sendQuotaPool.acquire()) + tq, err := wait(s.ctx, s.earlyDone, t.shutdownChan, t.sendQuotaPool.acquire()) if err != nil { if _, ok := err.(StreamError); ok { t.sendQuotaPool.cancel() @@ -551,7 +545,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { // Indicate there is a writer who is about to write a data frame. t.framer.adjustNumWriters(1) // Got some quota. Try to acquire writing privilege on the transport. - if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil { + if _, err := wait(s.ctx, s.earlyDone, t.shutdownChan, t.writableChan); err != nil { if _, ok := err.(StreamError); ok { // Return the connection quota back. t.sendQuotaPool.add(len(p)) @@ -781,18 +775,14 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } s.statusCode = state.statusCode s.statusDesc = state.statusDesc - var cancel bool if s.state != streamWriteDone { - // s will be canceled. This is required to interrupt any pending - // blocking Write calls when the final RPC status has been arrived. - cancel = true + // This is required to interrupt any pending blocking Write calls + // when the final RPC status has been arrived. + close(s.earlyDone) } s.state = streamDone s.mu.Unlock() s.write(recvMsg{err: io.EOF}) - if cancel { - s.cancel() - } } func handleMalformedHTTP2(s *Stream, err error) { diff --git a/transport/http2_server.go b/transport/http2_server.go index cee15429..9e35fdd8 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -364,11 +364,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) { // Received the end of stream from the client. s.mu.Lock() if s.state != streamDone { - if s.state == streamWriteDone { - s.state = streamDone - } else { - s.state = streamReadDone - } + s.state = streamReadDone } s.mu.Unlock() s.write(recvMsg{err: io.EOF}) @@ -455,7 +451,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { } s.headerOk = true s.mu.Unlock() - if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil { + if _, err := wait(s.ctx, nil, t.shutdownChan, t.writableChan); err != nil { return err } t.hBuf.Reset() @@ -495,7 +491,7 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s headersSent = true } s.mu.Unlock() - if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil { + if _, err := wait(s.ctx, nil, t.shutdownChan, t.writableChan); err != nil { return err } t.hBuf.Reset() @@ -544,7 +540,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error { } s.mu.Unlock() if writeHeaderFrame { - if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil { + if _, err := wait(s.ctx, nil, t.shutdownChan, t.writableChan); err != nil { return err } t.hBuf.Reset() @@ -572,13 +568,13 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error { size := http2MaxFrameLen s.sendQuotaPool.add(0) // Wait until the stream has some quota to send the data. - sq, err := wait(s.ctx, t.shutdownChan, s.sendQuotaPool.acquire()) + sq, err := wait(s.ctx, nil, t.shutdownChan, s.sendQuotaPool.acquire()) if err != nil { return err } t.sendQuotaPool.add(0) // Wait until the transport has some quota to send the data. - tq, err := wait(s.ctx, t.shutdownChan, t.sendQuotaPool.acquire()) + tq, err := wait(s.ctx, nil, t.shutdownChan, t.sendQuotaPool.acquire()) if err != nil { if _, ok := err.(StreamError); ok { t.sendQuotaPool.cancel() @@ -604,7 +600,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error { t.framer.adjustNumWriters(1) // Got some quota. Try to acquire writing privilege on the // transport. - if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil { + if _, err := wait(s.ctx, nil, t.shutdownChan, t.writableChan); err != nil { if _, ok := err.(StreamError); ok { // Return the connection quota back. t.sendQuotaPool.add(ps) diff --git a/transport/transport.go b/transport/transport.go index 1c888083..3340e9ad 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -140,18 +140,6 @@ func (r *recvBufferReader) Read(p []byte) (n int, err error) { } select { case <-r.ctx.Done(): - // ctx might be canceled by gRPC internals to unblocking pending writing operations - // when the client receives the final status prematurely (for client and bi-directional - // streaming RPCs). Used to return the real status to the users instead of the - // cancellation. - select { - case i := <-r.recv.get(): - m := i.(*recvMsg) - if m.err != nil { - return 0, m.err - } - default: - } return 0, ContextErr(r.ctx.Err()) case i := <-r.recv.get(): r.recv.load() @@ -169,6 +157,7 @@ type streamState uint8 const ( streamActive streamState = iota streamWriteDone // EndStream sent + streamReadDone // EndStream received streamDone // the entire stream is finished. ) @@ -178,8 +167,9 @@ type Stream struct { // nil for client side Stream. st ServerTransport // ctx is the associated context of the stream. - ctx context.Context - cancel context.CancelFunc + ctx context.Context + cancel context.CancelFunc + earlyDone chan struct{} // method records the associated RPC method of the stream. method string recvCompress string @@ -469,6 +459,8 @@ func StreamErrorf(c codes.Code, format string, a ...interface{}) StreamError { } } +var ErrEarlyDone = StreamErrorf(codes.Internal, "rpc is done prematurely") + // ConnectionErrorf creates an ConnectionError with the specified error description. func ConnectionErrorf(format string, a ...interface{}) ConnectionError { return ConnectionError{ @@ -512,12 +504,15 @@ func ContextErr(err error) StreamError { // wait blocks until it can receive from ctx.Done, closing, or proceed. // If it receives from ctx.Done, it returns 0, the StreamError for ctx.Err. +// If it receives from earlyDone, it returns 0, errEarlyDone. // If it receives from closing, it returns 0, ErrConnClosing. // If it receives from proceed, it returns the received integer, nil. -func wait(ctx context.Context, closing <-chan struct{}, proceed <-chan int) (int, error) { +func wait(ctx context.Context, earlyDone, closing <-chan struct{}, proceed <-chan int) (int, error) { select { case <-ctx.Done(): return 0, ContextErr(ctx.Err()) + case <-earlyDone: + return 0, ErrEarlyDone case <-closing: return 0, ErrConnClosing case i := <-proceed: From e558cf17ffbb1ea16873c38a642cf421da39baf6 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 14 Jul 2016 17:13:20 -0700 Subject: [PATCH 08/20] fix comments --- transport/transport.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport/transport.go b/transport/transport.go index 3340e9ad..fb6cc4cf 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -504,7 +504,7 @@ func ContextErr(err error) StreamError { // wait blocks until it can receive from ctx.Done, closing, or proceed. // If it receives from ctx.Done, it returns 0, the StreamError for ctx.Err. -// If it receives from earlyDone, it returns 0, errEarlyDone. +// If it receives from earlyDone, it returns 0, ErrEarlyDone. // If it receives from closing, it returns 0, ErrConnClosing. // If it receives from proceed, it returns the received integer, nil. func wait(ctx context.Context, earlyDone, closing <-chan struct{}, proceed <-chan int) (int, error) { From 7ce43067bde0ac1ac334e4646dac7f16cda4a69c Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 14 Jul 2016 17:17:46 -0700 Subject: [PATCH 09/20] more comments --- transport/transport.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/transport/transport.go b/transport/transport.go index fb6cc4cf..502c1481 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -167,8 +167,9 @@ type Stream struct { // nil for client side Stream. st ServerTransport // ctx is the associated context of the stream. - ctx context.Context - cancel context.CancelFunc + ctx context.Context + cancel context.CancelFunc + // earlyDone is closed when the final status arrives prematurely. earlyDone chan struct{} // method records the associated RPC method of the stream. method string From 0f1aeede97b7a43ed8943a35210340d835e39893 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 14 Jul 2016 18:38:43 -0700 Subject: [PATCH 10/20] do not cancel stream in http2_client.go, use done chanel instead --- stream.go | 11 +++++------ transport/http2_client.go | 20 ++++++++------------ transport/transport.go | 26 +++++++++++++++++++------- 3 files changed, 32 insertions(+), 25 deletions(-) diff --git a/stream.go b/stream.go index a0373600..70c6447a 100644 --- a/stream.go +++ b/stream.go @@ -189,6 +189,10 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth select { case <-t.Error(): // Incur transport error, simply exit. + case <-s.Done(): + err := Errorf(s.StatusCode(), s.StatusDesc()) + cs.finish(err) + cs.closeTransportStream(err) case <-s.Context().Done(): err := s.Context().Err() cs.finish(err) @@ -251,7 +255,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { if err != nil { cs.finish(err) } - if err == nil || err == io.EOF || err == transport.ErrEarlyDone { + if err == nil || err == io.EOF { return } if _, ok := err.(transport.ConnectionError); !ok { @@ -326,11 +330,6 @@ func (cs *clientStream) CloseSend() (err error) { } }() if err == nil || err == io.EOF { - return - } - if err == transport.ErrEarlyDone { - // If the RPC is done prematurely, Stream.RecvMsg(...) needs to be - // called to get the final status and clear the footprint. return nil } if _, ok := err.(transport.ConnectionError); !ok { diff --git a/transport/http2_client.go b/transport/http2_client.go index 2715e2d0..373a5f13 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -202,7 +202,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { // TODO(zhaoq): Handle uint32 overflow of Stream.id. s := &Stream{ id: t.nextID, - earlyDone: make(chan struct{}), + done: make(chan struct{}), method: callHdr.Method, sendCompress: callHdr.SendCompress, buf: newRecvBuffer(), @@ -419,7 +419,7 @@ func (t *http2Client) CloseStream(s *Stream, err error) { // goroutines (e.g., bi-directional streaming), the caller needs // to call cancel on the stream to interrupt the blocking on // other goroutines. - s.cancel() + //s.cancel() s.mu.Lock() if q := s.fc.resetPendingData(); q > 0 { if n := t.fc.onRead(q); n > 0 { @@ -505,15 +505,15 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { size := http2MaxFrameLen s.sendQuotaPool.add(0) // Wait until the stream has some quota to send the data. - sq, err := wait(s.ctx, s.earlyDone, t.shutdownChan, s.sendQuotaPool.acquire()) + sq, err := wait(s.ctx, s.done, t.shutdownChan, s.sendQuotaPool.acquire()) if err != nil { return err } t.sendQuotaPool.add(0) // Wait until the transport has some quota to send the data. - tq, err := wait(s.ctx, s.earlyDone, t.shutdownChan, t.sendQuotaPool.acquire()) + tq, err := wait(s.ctx, s.done, t.shutdownChan, t.sendQuotaPool.acquire()) if err != nil { - if _, ok := err.(StreamError); ok { + if _, ok := err.(StreamError); ok || err == io.EOF { t.sendQuotaPool.cancel() } return err @@ -545,8 +545,8 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { // Indicate there is a writer who is about to write a data frame. t.framer.adjustNumWriters(1) // Got some quota. Try to acquire writing privilege on the transport. - if _, err := wait(s.ctx, s.earlyDone, t.shutdownChan, t.writableChan); err != nil { - if _, ok := err.(StreamError); ok { + if _, err := wait(s.ctx, s.done, t.shutdownChan, t.writableChan); err != nil { + if _, ok := err.(StreamError); ok || err == io.EOF { // Return the connection quota back. t.sendQuotaPool.add(len(p)) } @@ -775,11 +775,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } s.statusCode = state.statusCode s.statusDesc = state.statusDesc - if s.state != streamWriteDone { - // This is required to interrupt any pending blocking Write calls - // when the final RPC status has been arrived. - close(s.earlyDone) - } + close(s.done) s.state = streamDone s.mu.Unlock() s.write(recvMsg{err: io.EOF}) diff --git a/transport/transport.go b/transport/transport.go index 502c1481..b1f4c077 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -169,8 +169,8 @@ type Stream struct { // ctx is the associated context of the stream. ctx context.Context cancel context.CancelFunc - // earlyDone is closed when the final status arrives prematurely. - earlyDone chan struct{} + // done is closed when the final status arrives prematurely. + done chan struct{} // method records the associated RPC method of the stream. method string recvCompress string @@ -216,6 +216,10 @@ func (s *Stream) SetSendCompress(str string) { s.sendCompress = str } +func (s *Stream) Done() <-chan struct{} { + return s.done +} + // Header acquires the key-value pairs of header metadata once it // is available. It blocks until i) the metadata is ready or ii) there is no // header metadata or iii) the stream is cancelled/expired. @@ -460,7 +464,8 @@ func StreamErrorf(c codes.Code, format string, a ...interface{}) StreamError { } } -var ErrEarlyDone = StreamErrorf(codes.Internal, "rpc is done prematurely") +// ErrDone indicates +//var ErrDone = StreamErrorf(codes.Internal, "rpc is done") // ConnectionErrorf creates an ConnectionError with the specified error description. func ConnectionErrorf(format string, a ...interface{}) ConnectionError { @@ -505,15 +510,22 @@ func ContextErr(err error) StreamError { // wait blocks until it can receive from ctx.Done, closing, or proceed. // If it receives from ctx.Done, it returns 0, the StreamError for ctx.Err. -// If it receives from earlyDone, it returns 0, ErrEarlyDone. +// If it receives from done, it returns 0, io.EOF if ctx is not done; otherwise +// it return the StreamError for ctx.Err. // If it receives from closing, it returns 0, ErrConnClosing. // If it receives from proceed, it returns the received integer, nil. -func wait(ctx context.Context, earlyDone, closing <-chan struct{}, proceed <-chan int) (int, error) { +func wait(ctx context.Context, done, closing <-chan struct{}, proceed <-chan int) (int, error) { select { case <-ctx.Done(): return 0, ContextErr(ctx.Err()) - case <-earlyDone: - return 0, ErrEarlyDone + case <-done: + // User cancellation has precedence. + select { + case <-ctx.Done(): + return 0, ContextErr(ctx.Err()) + default: + } + return 0, io.EOF case <-closing: return 0, ErrConnClosing case i := <-proceed: From 69e799fc76388c8da4c768c0cc6927176c4d9cee Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 14 Jul 2016 18:41:53 -0700 Subject: [PATCH 11/20] fix a comment --- transport/transport.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport/transport.go b/transport/transport.go index b1f4c077..70654027 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -169,7 +169,7 @@ type Stream struct { // ctx is the associated context of the stream. ctx context.Context cancel context.CancelFunc - // done is closed when the final status arrives prematurely. + // done is closed when the final status arrives. done chan struct{} // method records the associated RPC method of the stream. method string From ec7eacfcc4d6700c3c8258bb8d7f1e63630a1bd5 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 15 Jul 2016 11:02:03 -0700 Subject: [PATCH 12/20] some fix --- stream.go | 4 +--- test/end2end_test.go | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/stream.go b/stream.go index 70c6447a..d5b82a4b 100644 --- a/stream.go +++ b/stream.go @@ -190,9 +190,7 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth case <-t.Error(): // Incur transport error, simply exit. case <-s.Done(): - err := Errorf(s.StatusCode(), s.StatusDesc()) - cs.finish(err) - cs.closeTransportStream(err) + // Simply exit. Leave the next I/O op to do cleanup. case <-s.Context().Done(): err := s.Context().Err() cs.finish(err) diff --git a/test/end2end_test.go b/test/end2end_test.go index c109c885..fb9d91ed 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -1536,7 +1536,7 @@ func testClientStreamingError(t *testing.T, e env) { t.Fatalf("%v.Send(%v) = %v, want ", stream, req, err) } for { - if err := stream.Send(req); err == nil { + if err := stream.Send(req); err != io.EOF { continue } if _, err := stream.CloseAndRecv(); grpc.Code(err) != codes.NotFound { From 1391d99481d8c29ebb9e3aec00d87c114b80c3f8 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 15 Jul 2016 11:18:18 -0700 Subject: [PATCH 13/20] clear dead code --- transport/transport.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/transport/transport.go b/transport/transport.go index 70654027..4dab5745 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -464,9 +464,6 @@ func StreamErrorf(c codes.Code, format string, a ...interface{}) StreamError { } } -// ErrDone indicates -//var ErrDone = StreamErrorf(codes.Internal, "rpc is done") - // ConnectionErrorf creates an ConnectionError with the specified error description. func ConnectionErrorf(format string, a ...interface{}) ConnectionError { return ConnectionError{ From 0223eb7067df32e4c3d16e9ec8e4c988c73f76fe Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 15 Jul 2016 11:21:50 -0700 Subject: [PATCH 14/20] remove remaining debugging logs --- transport/http2_client.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/transport/http2_client.go b/transport/http2_client.go index 373a5f13..f4fad840 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -415,11 +415,6 @@ func (t *http2Client) CloseStream(s *Stream, err error) { if updateStreams { t.streamsQuota.add(1) } - // In case stream sending and receiving are invoked in separate - // goroutines (e.g., bi-directional streaming), the caller needs - // to call cancel on the stream to interrupt the blocking on - // other goroutines. - //s.cancel() s.mu.Lock() if q := s.fc.resetPendingData(); q > 0 { if n := t.fc.onRead(q); n > 0 { From c9f0b89a96ecad493b145c0d475caf79f9787eea Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 15 Jul 2016 14:53:49 -0700 Subject: [PATCH 15/20] clean up underlying footprint when Stream.Done() is read --- stream.go | 3 ++- transport/http2_client.go | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/stream.go b/stream.go index d5b82a4b..89e4ec8a 100644 --- a/stream.go +++ b/stream.go @@ -190,7 +190,8 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth case <-t.Error(): // Incur transport error, simply exit. case <-s.Done(): - // Simply exit. Leave the next I/O op to do cleanup. + cs.finish(err) + cs.closeTransportStream(nil) case <-s.Context().Done(): err := s.Context().Err() cs.finish(err) diff --git a/transport/http2_client.go b/transport/http2_client.go index f4fad840..3a0fdac0 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -692,6 +692,8 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) { grpclog.Println("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error ", f.ErrCode) s.statusCode = codes.Unknown } + //grpclog.Println("DEBUG handleRSTStram: ", f, " | ", f.ErrCode) + close(s.done) s.mu.Unlock() s.write(recvMsg{err: io.EOF}) } From df0a2ae779fec0de382166343969d8cd8ced2ee3 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 15 Jul 2016 15:06:00 -0700 Subject: [PATCH 16/20] finish trace with the right status --- stream.go | 10 +++++++--- transport/http2_client.go | 1 - 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/stream.go b/stream.go index 89e4ec8a..591d21b5 100644 --- a/stream.go +++ b/stream.go @@ -183,14 +183,18 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth cs.t = t cs.s = s cs.p = &parser{r: s} - // Listen on ctx.Done() to detect cancellation when there is no pending - // I/O operations on this stream. + // Listen on ctx.Done() to detect cancellation and s.Done() to detect normal termination + // when there is no pending I/O operations on this stream. go func() { select { case <-t.Error(): // Incur transport error, simply exit. case <-s.Done(): - cs.finish(err) + if s.StatusCode() == codes.OK { + cs.finish(nil) + } else { + cs.finish(Errorf(s.StatusCode(), "%s", s.StatusDesc())) + } cs.closeTransportStream(nil) case <-s.Context().Done(): err := s.Context().Err() diff --git a/transport/http2_client.go b/transport/http2_client.go index 3a0fdac0..457536e9 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -692,7 +692,6 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) { grpclog.Println("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error ", f.ErrCode) s.statusCode = codes.Unknown } - //grpclog.Println("DEBUG handleRSTStram: ", f, " | ", f.ErrCode) close(s.done) s.mu.Unlock() s.write(recvMsg{err: io.EOF}) From 0bfa80150a981db9e4435eae2985da963d83eaa4 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 15 Jul 2016 15:43:42 -0700 Subject: [PATCH 17/20] add a TODO --- stream.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/stream.go b/stream.go index 591d21b5..5cde4be9 100644 --- a/stream.go +++ b/stream.go @@ -190,6 +190,8 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth case <-t.Error(): // Incur transport error, simply exit. case <-s.Done(): + // TODO: The trace of the RPC is terminated here when there is no pending + // I/O, which is probably not the optimal solution. if s.StatusCode() == codes.OK { cs.finish(nil) } else { From 07bf1082901dfc3513a50f59b0fa0d1f8baee6b4 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 15 Jul 2016 16:02:21 -0700 Subject: [PATCH 18/20] fix Invoke to accommodate the change --- call.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/call.go b/call.go index 84ac178c..451f7985 100644 --- a/call.go +++ b/call.go @@ -90,7 +90,10 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd return nil, transport.StreamErrorf(codes.Internal, "grpc: %v", err) } err = t.Write(stream, outBuf, opts) - if err != nil { + // NewStream could lead to an early rejection of the RPC (e.g., the service/method + // does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following + // recvResponse to get the final status. + if err != nil || err != io.EOF { return nil, err } // Sent successfully. From 55b7bd0424bfb6365bccf428ea7bbc067cc0776c Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 15 Jul 2016 16:39:31 -0700 Subject: [PATCH 19/20] add the missing close(s.done) --- call.go | 7 ++----- transport/http2_client.go | 6 ++++++ 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/call.go b/call.go index 451f7985..790e87d3 100644 --- a/call.go +++ b/call.go @@ -86,14 +86,11 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd cbuf = new(bytes.Buffer) } outBuf, err := encode(codec, args, compressor, cbuf) - if err != nil { + if err != nil && err != io.EOF { return nil, transport.StreamErrorf(codes.Internal, "grpc: %v", err) } err = t.Write(stream, outBuf, opts) - // NewStream could lead to an early rejection of the RPC (e.g., the service/method - // does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following - // recvResponse to get the final status. - if err != nil || err != io.EOF { + if err != nil { return nil, err } // Sent successfully. diff --git a/transport/http2_client.go b/transport/http2_client.go index 457536e9..4f22be09 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -647,6 +647,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) { s.state = streamDone s.statusCode = codes.Internal s.statusDesc = err.Error() + close(s.done) s.mu.Unlock() s.write(recvMsg{err: io.EOF}) t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl}) @@ -664,9 +665,14 @@ func (t *http2Client) handleData(f *http2.DataFrame) { // the read direction is closed, and set the status appropriately. if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) { s.mu.Lock() + if s.state == streamDone { + s.mu.Unlock() + return + } s.state = streamDone s.statusCode = codes.Internal s.statusDesc = "server closed the stream without sending trailers" + close(s.done) s.mu.Unlock() s.write(recvMsg{err: io.EOF}) } From 1ea428008b19962f3c55d64376f6de1f492d4f4d Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 15 Jul 2016 16:55:51 -0700 Subject: [PATCH 20/20] add the missing close(s.done) --- call.go | 7 +++++-- test/end2end_test.go | 5 +---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/call.go b/call.go index 790e87d3..d6326ea0 100644 --- a/call.go +++ b/call.go @@ -86,11 +86,14 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd cbuf = new(bytes.Buffer) } outBuf, err := encode(codec, args, compressor, cbuf) - if err != nil && err != io.EOF { + if err != nil { return nil, transport.StreamErrorf(codes.Internal, "grpc: %v", err) } err = t.Write(stream, outBuf, opts) - if err != nil { + // t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method + // does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following + // recvResponse to get the final status. + if err != nil && err != io.EOF { return nil, err } // Sent successfully. diff --git a/test/end2end_test.go b/test/end2end_test.go index fb9d91ed..fdac5815 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -558,10 +558,7 @@ func testTimeoutOnDeadServer(t *testing.T, e env) { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, ", err) } te.srv.Stop() - // Set -1 as the timeout to make sure if transportMonitor gets error - // notification in time the failure path of the 1st invoke of - // ClientConn.wait hits the deadline exceeded error. - ctx, _ := context.WithTimeout(context.Background(), -1) + ctx, _ := context.WithTimeout(context.Background(), time.Millisecond) if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(%v, _) = _, %v, want _, error code: %d", ctx, err, codes.DeadlineExceeded) }