From 033c91440bf8642aef81787e2754b8c3bb018ccb Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Mon, 26 Sep 2016 15:07:31 -0700 Subject: [PATCH] Fix testStreamingRPCTimeoutServerError --- test/end2end_test.go | 58 ++++++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 26 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 7adc247c..730d8208 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -1698,20 +1698,18 @@ func testFailedServerStreaming(t *testing.T, e env) { // checkTimeoutErrorServer is a gRPC server checks context timeout error in FullDuplexCall(). // It is only used in TestStreamingRPCTimeoutServerError. type checkTimeoutErrorServer struct { - t *testing.T + t *testing.T + done chan struct{} testpb.TestServiceServer } -func (s checkTimeoutErrorServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { +func (s *checkTimeoutErrorServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { + defer close(s.done) for { _, err := stream.Recv() - if err == io.EOF { - // read done. - return nil - } if err != nil { if grpc.Code(err) != codes.DeadlineExceeded { - s.t.Fatalf("stream.Recv(_) = _, %v, want error code %s", err, codes.DeadlineExceeded) + s.t.Errorf("stream.Recv() = _, %v, want error code %s", err, codes.DeadlineExceeded) } return err } @@ -1721,7 +1719,7 @@ func (s checkTimeoutErrorServer) FullDuplexCall(stream testpb.TestService_FullDu }, }); err != nil { if grpc.Code(err) != codes.DeadlineExceeded { - s.t.Fatalf("stream.Send(_) = %v, want error code %s", err, codes.DeadlineExceeded) + s.t.Errorf("stream.Send(_) = %v, want error code %s", err, codes.DeadlineExceeded) } return err } @@ -1739,33 +1737,41 @@ func TestStreamingRPCTimeoutServerError(t *testing.T) { // When context timeout happens on client side, server should get deadline exceeded error. func testStreamingRPCTimeoutServerError(t *testing.T, e env) { te := newTest(t, e) - te.startServer(checkTimeoutErrorServer{t: t}) + serverDone := make(chan struct{}) + te.startServer(&checkTimeoutErrorServer{t: t, done: serverDone}) + defer te.tearDown() cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) req := &testpb.StreamingOutputCallRequest{} - duration := 100 * time.Millisecond - ctx, _ := context.WithTimeout(context.Background(), duration) - stream, err := tc.FullDuplexCall(ctx) - if err != nil { - t.Errorf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) - return - } - for { - err := stream.Send(req) - if err != nil { - break + for duration := 50 * time.Millisecond; ; duration *= 2 { + ctx, _ := context.WithTimeout(context.Background(), duration) + stream, err := tc.FullDuplexCall(ctx) + if grpc.Code(err) == codes.DeadlineExceeded { + // Redo test with double timeout. + continue } - _, err = stream.Recv() if err != nil { - break + t.Errorf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) + return + } + for { + err := stream.Send(req) + if err != nil { + break + } + _, err = stream.Recv() + if err != nil { + break + } } - } - // Wait for context timeout on server before closing connection. - time.Sleep(duration) - te.tearDown() + // Wait for context timeout on server before closing connection + // to make sure the server will get timeout error. + <-serverDone + break + } } // concurrentSendServer is a TestServiceServer whose