From 70e8b380527e63d26e65f8b588edfa4b4ea239f5 Mon Sep 17 00:00:00 2001
From: Easwar Swaminathan <easwars@google.com>
Date: Mon, 24 Jun 2019 14:53:45 -0700
Subject: [PATCH] test: end2end test improvements: separate server and client
 configs. (#2877)

- Separated and documented the options for the client and server sides.
- Better support for multiple grpc.Servers. This will be used in other
  improvements that I have in the works.
- Moved some common functionality from channelz_test.go to
  end2end_test.go.
- Added an option to use the default health service implementation, instead
  of each test creating a new health.Server and passing it in (see the usage
  sketch below). The individual tests have not been changed in this PR; I
  will do that in a follow-up PR to keep the changes to a reasonable size.
- Fixed one of the tests that needed changes because of the separation of
  client and server configs.
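
As an illustration, a test that wants the default health service
implementation can now opt in with a single knob. The sketch below is
hypothetical (the test name and the RPCs it would issue are placeholders,
not part of this change):

    func (s) TestDefaultHealthServerExample(t *testing.T) {
        for _, e := range listTestEnv() {
            testDefaultHealthServerExample(t, e)
        }
    }

    func testDefaultHealthServerExample(t *testing.T, e env) {
        te := newTest(t, e)
        // Ask the harness to register the default health.Server instead of
        // constructing one in the test and passing it in.
        te.enableHealthServer = true
        te.startServer(&testServer{security: e.security})
        defer te.tearDown()

        cc := te.clientConn()
        // ... exercise the health service and/or TestService over cc ...
        _ = cc
    }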
---
 test/channelz_test.go |  19 ------
 test/end2end_test.go  | 148 ++++++++++++++++++++++++++----------------
 2 files changed, 93 insertions(+), 74 deletions(-)

diff --git a/test/channelz_test.go b/test/channelz_test.go
index 46f304e4..ebbb3e54 100644
--- a/test/channelz_test.go
+++ b/test/channelz_test.go
@@ -41,7 +41,6 @@ import (
 	"google.golang.org/grpc/keepalive"
 	"google.golang.org/grpc/resolver"
 	"google.golang.org/grpc/resolver/manual"
-	"google.golang.org/grpc/serviceconfig"
 	"google.golang.org/grpc/status"
 	testpb "google.golang.org/grpc/test/grpc_testing"
 	"google.golang.org/grpc/testdata"
@@ -53,16 +52,6 @@ func czCleanupWrapper(cleanup func() error, t *testing.T) {
 	}
 }
 
-func (te *test) startServers(ts testpb.TestServiceServer, num int) {
-	for i := 0; i < num; i++ {
-		te.startServer(ts)
-		te.srvs = append(te.srvs, te.srv.(*grpc.Server))
-		te.srvAddrs = append(te.srvAddrs, te.srvAddr)
-		te.srv = nil
-		te.srvAddr = ""
-	}
-}
-
 func verifyResultWithDelay(f func() (bool, error)) error {
 	var ok bool
 	var err error
@@ -2014,11 +2003,3 @@ func (s) TestCZTraceTopChannelDeletionTraceClear(t *testing.T) {
 		t.Fatal(err)
 	}
 }
-
-func parseCfg(s string) serviceconfig.Config {
-	c, err := serviceconfig.Parse(s)
-	if err != nil {
-		panic(fmt.Sprintf("Error parsing config %q: %v", s, err))
-	}
-	return c
-}
diff --git a/test/end2end_test.go b/test/end2end_test.go
index 396a9bd3..30356a6e 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -70,6 +70,7 @@ import (
 	"google.golang.org/grpc/resolver"
 	"google.golang.org/grpc/resolver/manual"
 	_ "google.golang.org/grpc/resolver/passthrough"
+	"google.golang.org/grpc/serviceconfig"
 	"google.golang.org/grpc/stats"
 	"google.golang.org/grpc/status"
 	"google.golang.org/grpc/tap"
@@ -470,59 +471,74 @@ func listTestEnv() (envs []env) {
 // func, modified as needed, and then started with its startServer method.
 // It should be cleaned up with the tearDown method.
 type test struct {
-	t *testing.T
-	e env
-
+	// The following are set up in newTest().
+	t      *testing.T
+	e      env
 	ctx    context.Context // valid for life of test, before tearDown
 	cancel context.CancelFunc
 
-	// Configurable knobs, after newTest returns:
-	testServer              testpb.TestServiceServer // nil means none
-	healthServer            *health.Server           // nil means disabled
+	// The following knobs are for the server side, and should be set after
+	// calling newTest() and before calling startServer().
+
+	// Whether or not to expose the server's health via the default health
+	// service implementation.
+	enableHealthServer bool
+	// healthServer should be set only when a non-default health service
+	// implementation is required. In almost all cases, set the
+	// 'enableHealthServer' flag above to expose the server's health using the
+	// default implementation.
+	healthServer            healthpb.HealthServer
 	maxStream               uint32
 	tapHandle               tap.ServerInHandle
-	maxMsgSize              *int
-	maxClientReceiveMsgSize *int
-	maxClientSendMsgSize    *int
+	maxServerMsgSize        *int
 	maxServerReceiveMsgSize *int
 	maxServerSendMsgSize    *int
-	maxClientHeaderListSize *uint32
 	maxServerHeaderListSize *uint32
+	// Used to test the deprecated server-side compression options.
+	serverCompression           bool
+	unknownHandler              grpc.StreamHandler
+	unaryServerInt              grpc.UnaryServerInterceptor
+	streamServerInt             grpc.StreamServerInterceptor
+	serverInitialWindowSize     int32
+	serverInitialConnWindowSize int32
+	customServerOptions         []grpc.ServerOption
+
+	// The following knobs are for the client side, and should be set after
+	// calling newTest() and before calling clientConn().
+	maxClientMsgSize        *int
+	maxClientReceiveMsgSize *int
+	maxClientSendMsgSize    *int
+	maxClientHeaderListSize *uint32
 	userAgent               string
-	// clientCompression and serverCompression are set to test the deprecated API
-	// WithCompressor and WithDecompressor.
+	// Used to test the deprecated API WithCompressor and WithDecompressor.
 	clientCompression bool
-	serverCompression bool
-	// clientUseCompression is set to test the new compressor registration API UseCompressor.
+	// Used to test the new compressor registration API UseCompressor.
 	clientUseCompression bool
 	// clientNopCompression is set to create a compressor whose type is not supported.
 	clientNopCompression        bool
 	unaryClientInt              grpc.UnaryClientInterceptor
 	streamClientInt             grpc.StreamClientInterceptor
-	unaryServerInt              grpc.UnaryServerInterceptor
-	streamServerInt             grpc.StreamServerInterceptor
-	unknownHandler              grpc.StreamHandler
 	sc                          <-chan grpc.ServiceConfig
 	customCodec                 encoding.Codec
-	serverInitialWindowSize     int32
-	serverInitialConnWindowSize int32
 	clientInitialWindowSize     int32
 	clientInitialConnWindowSize int32
 	perRPCCreds                 credentials.PerRPCCredentials
 	customDialOptions           []grpc.DialOption
-	customServerOptions         []grpc.ServerOption
 	resolverScheme              string
 
 	// All test dialing is blocking by default. Set this to true if dial
 	// should be non-blocking.
 	nonBlockingDial bool
 
-	// srv and srvAddr are set once startServer is called.
+	// These are set once startServer is called. The common case is to have
+	// only one testServer.
 	srv     stopper
+	hSrv    healthpb.HealthServer
 	srvAddr string
 
-	// srvs and srvAddrs are set once startServers is called.
-	srvs     []*grpc.Server
+	// These are set once startServers is called.
+	srvs     []stopper
+	hSrvs    []healthpb.HealthServer
 	srvAddrs []string
 
 	cc          *grpc.ClientConn // nil until requested via clientConn
@@ -553,10 +569,8 @@ func (te *test) tearDown() {
 	if te.srv != nil {
 		te.srv.Stop()
 	}
-	if len(te.srvs) != 0 {
-		for _, s := range te.srvs {
-			s.Stop()
-		}
+	for _, s := range te.srvs {
+		s.Stop()
 	}
 }
 
@@ -574,11 +588,10 @@ func newTest(t *testing.T, e env) *test {
 }
 
 func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network, address string) (net.Listener, error)) net.Listener {
-	te.testServer = ts
 	te.t.Logf("Running test in %s environment...", te.e.name)
 	sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)}
-	if te.maxMsgSize != nil {
-		sopts = append(sopts, grpc.MaxMsgSize(*te.maxMsgSize))
+	if te.maxServerMsgSize != nil {
+		sopts = append(sopts, grpc.MaxMsgSize(*te.maxServerMsgSize))
 	}
 	if te.maxServerReceiveMsgSize != nil {
 		sopts = append(sopts, grpc.MaxRecvMsgSize(*te.maxServerReceiveMsgSize))
@@ -635,13 +648,20 @@ func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network,
 	}
 	sopts = append(sopts, te.customServerOptions...)
 	s := grpc.NewServer(sopts...)
-	te.srv = s
-	if te.healthServer != nil {
-		healthgrpc.RegisterHealthServer(s, te.healthServer)
+	if ts != nil {
+		testpb.RegisterTestServiceServer(s, ts)
 	}
-	if te.testServer != nil {
-		testpb.RegisterTestServiceServer(s, te.testServer)
+
+	// Create a new default health server if enableHealthServer is set, or use
+	// the provided one.
+	hs := te.healthServer
+	if te.enableHealthServer {
+		hs = health.NewServer()
 	}
+	if hs != nil {
+		healthgrpc.RegisterHealthServer(s, hs)
+	}
+
 	addr := la
 	switch te.e.network {
 	case "unix":
@@ -653,6 +673,8 @@ func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network,
 		addr = "localhost:" + port
 	}
 
+	te.srv = s
+	te.hSrv = hs
 	te.srvAddr = addr
 
 	if te.e.httpHandler {
@@ -697,12 +719,25 @@ func (te *test) startServerWithConnControl(ts testpb.TestServiceServer) *listene
 	return l.(*listenerWrapper)
 }
 
-// startServer starts a gRPC server listening. Callers should defer a
-// call to te.tearDown to clean up.
+// startServer starts a gRPC server exposing the provided TestService
+// implementation. Callers should defer a call to te.tearDown to clean up.
 func (te *test) startServer(ts testpb.TestServiceServer) {
 	te.listenAndServe(ts, net.Listen)
 }
 
+// startServers starts 'num' gRPC servers exposing the provided TestService.
+func (te *test) startServers(ts testpb.TestServiceServer, num int) {
+	for i := 0; i < num; i++ {
+		te.startServer(ts)
+		te.srvs = append(te.srvs, te.srv.(*grpc.Server))
+		te.hSrvs = append(te.hSrvs, te.hSrv)
+		te.srvAddrs = append(te.srvAddrs, te.srvAddr)
+		te.srv = nil
+		te.hSrv = nil
+		te.srvAddr = ""
+	}
+}
+
 type nopCompressor struct {
 	grpc.Compressor
 }
@@ -757,8 +792,8 @@ func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string)
 	if te.streamClientInt != nil {
 		opts = append(opts, grpc.WithStreamInterceptor(te.streamClientInt))
 	}
-	if te.maxMsgSize != nil {
-		opts = append(opts, grpc.WithMaxMsgSize(*te.maxMsgSize))
+	if te.maxClientMsgSize != nil {
+		opts = append(opts, grpc.WithMaxMsgSize(*te.maxClientMsgSize))
 	}
 	if te.maxClientReceiveMsgSize != nil {
 		opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(*te.maxClientReceiveMsgSize)))
@@ -2881,15 +2916,16 @@ func (s) TestExceedMsgLimit(t *testing.T) {
 
 func testExceedMsgLimit(t *testing.T, e env) {
 	te := newTest(t, e)
-	te.maxMsgSize = newInt(1024)
+	maxMsgSize := 1024
+	te.maxServerMsgSize, te.maxClientMsgSize = newInt(maxMsgSize), newInt(maxMsgSize)
 	te.startServer(&testServer{security: e.security})
 	defer te.tearDown()
 	tc := testpb.NewTestServiceClient(te.clientConn())
 
-	argSize := int32(*te.maxMsgSize + 1)
+	largeSize := int32(maxMsgSize + 1)
 	const smallSize = 1
 
-	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
+	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -2898,23 +2934,23 @@ func testExceedMsgLimit(t *testing.T, e env) {
 		t.Fatal(err)
 	}
 
-	// Test on server side for unary RPC.
+	// Make sure the server cannot receive a unary RPC of largeSize.
 	req := &testpb.SimpleRequest{
 		ResponseType: testpb.PayloadType_COMPRESSABLE,
 		ResponseSize: smallSize,
-		Payload:      payload,
+		Payload:      largePayload,
 	}
 	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
 		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
 	}
-	// Test on client side for unary RPC.
-	req.ResponseSize = int32(*te.maxMsgSize) + 1
+	// Make sure the client cannot receive a unary RPC of largeSize.
+	req.ResponseSize = largeSize
 	req.Payload = smallPayload
 	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
 		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
 	}
 
-	// Test on server side for streaming RPC.
+	// Make sure the server cannot receive a streaming RPC of largeSize.
 	stream, err := tc.FullDuplexCall(te.ctx)
 	if err != nil {
 		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
@@ -2925,15 +2961,10 @@ func testExceedMsgLimit(t *testing.T, e env) {
 		},
 	}
 
-	spayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(*te.maxMsgSize+1))
-	if err != nil {
-		t.Fatal(err)
-	}
-
 	sreq := &testpb.StreamingOutputCallRequest{
 		ResponseType:       testpb.PayloadType_COMPRESSABLE,
 		ResponseParameters: respParam,
-		Payload:            spayload,
+		Payload:            largePayload,
 	}
 	if err := stream.Send(sreq); err != nil {
 		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
@@ -2947,7 +2978,7 @@ func testExceedMsgLimit(t *testing.T, e env) {
 	if err != nil {
 		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
 	}
-	respParam[0].Size = int32(*te.maxMsgSize) + 1
+	respParam[0].Size = largeSize
 	sreq.Payload = smallPayload
 	if err := stream.Send(sreq); err != nil {
 		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
@@ -2955,7 +2986,6 @@ func testExceedMsgLimit(t *testing.T, e env) {
 	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
 		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
 	}
-
 }
 
 func (s) TestPeerClientSide(t *testing.T) {
@@ -7412,3 +7442,11 @@ func doHTTPHeaderTest(t *testing.T, errCode codes.Code, headerFields ...[]string
 		t.Fatalf("stream.Recv() = _, %v, want error code: %v", err, errCode)
 	}
 }
+
+func parseCfg(s string) serviceconfig.Config {
+	c, err := serviceconfig.Parse(s)
+	if err != nil {
+		panic(fmt.Sprintf("Error parsing config %q: %v", s, err))
+	}
+	return c
+}