diff --git a/test/end2end_test.go b/test/end2end_test.go index 6d85c65b..504b8540 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -108,6 +108,7 @@ type testServer struct { setAndSendHeader bool // whether to call setHeader and sendHeader. setHeaderOnly bool // whether to only call setHeader, not sendHeader. multipleSetTrailer bool // whether to call setTrailer multiple times. + unaryCallSleepTime time.Duration } func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { @@ -203,7 +204,7 @@ func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (* } } // Simulate some service delay. - time.Sleep(time.Second) + time.Sleep(s.unaryCallSleepTime) payload, err := newPayload(in.GetResponseType(), in.GetResponseSize()) if err != nil { @@ -693,7 +694,7 @@ type lazyConn struct { func (l *lazyConn) Write(b []byte) (int, error) { if atomic.LoadInt32(&(l.beLazy)) == 1 { // The sleep duration here needs to less than the leakCheck deadline. - time.Sleep(time.Second * 5) + time.Sleep(time.Second) } return l.Conn.Write(b) } @@ -721,8 +722,7 @@ func TestContextDeadlineNotIgnored(t *testing.T) { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, ", err) } atomic.StoreInt32(&(lc.beLazy), 1) - timeout := time.Second * 1 - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) defer cancel() t1 := time.Now() if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded { @@ -1126,7 +1126,7 @@ func testClientConnCloseAfterGoAwayWithActiveStream(t *testing.T, e env) { te.srv.GracefulStop() close(done) }() - time.Sleep(time.Second) + time.Sleep(50 * time.Millisecond) cc.Close() timeout := time.NewTimer(time.Second) select { @@ -2923,7 +2923,7 @@ func TestRPCTimeout(t *testing.T) { // TODO(zhaoq): Have a better test coverage of timeout and cancellation mechanism. func testRPCTimeout(t *testing.T, e env) { te := newTest(t, e) - te.startServer(&testServer{security: e.security}) + te.startServer(&testServer{security: e.security, unaryCallSleepTime: 50 * time.Millisecond}) defer te.tearDown() cc := te.clientConn() @@ -2961,7 +2961,7 @@ func TestCancel(t *testing.T) { func testCancel(t *testing.T, e env) { te := newTest(t, e) te.declareLogNoise("grpc: the client connection is closing; please retry") - te.startServer(&testServer{security: e.security}) + te.startServer(&testServer{security: e.security, unaryCallSleepTime: time.Second}) defer te.tearDown() cc := te.clientConn() @@ -3021,11 +3021,10 @@ func testCancelNoIO(t *testing.T, e env) { // succeeding. // TODO(bradfitz): add internal test hook for this (Issue 534) for { - ctx, cancelSecond := context.WithTimeout(context.Background(), 250*time.Millisecond) + ctx, cancelSecond := context.WithTimeout(context.Background(), 50*time.Millisecond) _, err := tc.StreamingInputCall(ctx) cancelSecond() if err == nil { - time.Sleep(50 * time.Millisecond) continue } if grpc.Code(err) == codes.DeadlineExceeded { @@ -3036,21 +3035,19 @@ func testCancelNoIO(t *testing.T, e env) { // If there are any RPCs in flight before the client receives // the max streams setting, let them be expired. // TODO(bradfitz): add internal test hook for this (Issue 534) - time.Sleep(500 * time.Millisecond) + time.Sleep(50 * time.Millisecond) - ch := make(chan struct{}) go func() { - defer close(ch) - - // This should be blocked until the 1st is canceled. - ctx, cancelThird := context.WithTimeout(context.Background(), 2*time.Second) - if _, err := tc.StreamingInputCall(ctx); err != nil { - t.Errorf("%v.StreamingInputCall(_) = _, %v, want _, ", tc, err) - } - cancelThird() + time.Sleep(50 * time.Millisecond) + cancelFirst() }() - cancelFirst() - <-ch + + // This should be blocked until the 1st is canceled, then succeed. + ctx, cancelThird := context.WithTimeout(context.Background(), 500*time.Millisecond) + if _, err := tc.StreamingInputCall(ctx); err != nil { + t.Errorf("%v.StreamingInputCall(_) = _, %v, want _, ", tc, err) + } + cancelThird() } // The following tests the gRPC streaming RPC implementations. @@ -3528,11 +3525,11 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) { } // Loop until receiving the new max stream setting from the server. for { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) defer cancel() _, err := tc.StreamingInputCall(ctx) if err == nil { - time.Sleep(time.Second) + time.Sleep(50 * time.Millisecond) continue } if grpc.Code(err) == codes.DeadlineExceeded { @@ -3580,7 +3577,7 @@ func testExceedDefaultMaxStreamsLimit(t *testing.T, e env) { } // Trying to create one more should timeout. - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() _, err := tc.StreamingInputCall(ctx) if err == nil || grpc.Code(err) != codes.DeadlineExceeded { @@ -3613,11 +3610,11 @@ func testStreamsQuotaRecovery(t *testing.T, e env) { } // Loop until the new max stream setting is effective. for { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) defer cancel() _, err := tc.StreamingInputCall(ctx) if err == nil { - time.Sleep(time.Second) + time.Sleep(50 * time.Millisecond) continue } if grpc.Code(err) == codes.DeadlineExceeded {