diff --git a/test/channelz_test.go b/test/channelz_test.go index 46f304e4..ebbb3e54 100644 --- a/test/channelz_test.go +++ b/test/channelz_test.go @@ -41,7 +41,6 @@ import ( "google.golang.org/grpc/keepalive" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" - "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/status" testpb "google.golang.org/grpc/test/grpc_testing" "google.golang.org/grpc/testdata" @@ -53,16 +52,6 @@ func czCleanupWrapper(cleanup func() error, t *testing.T) { } } -func (te *test) startServers(ts testpb.TestServiceServer, num int) { - for i := 0; i < num; i++ { - te.startServer(ts) - te.srvs = append(te.srvs, te.srv.(*grpc.Server)) - te.srvAddrs = append(te.srvAddrs, te.srvAddr) - te.srv = nil - te.srvAddr = "" - } -} - func verifyResultWithDelay(f func() (bool, error)) error { var ok bool var err error @@ -2014,11 +2003,3 @@ func (s) TestCZTraceTopChannelDeletionTraceClear(t *testing.T) { t.Fatal(err) } } - -func parseCfg(s string) serviceconfig.Config { - c, err := serviceconfig.Parse(s) - if err != nil { - panic(fmt.Sprintf("Error parsing config %q: %v", s, err)) - } - return c -} diff --git a/test/end2end_test.go b/test/end2end_test.go index 396a9bd3..30356a6e 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -70,6 +70,7 @@ import ( "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" _ "google.golang.org/grpc/resolver/passthrough" + "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/stats" "google.golang.org/grpc/status" "google.golang.org/grpc/tap" @@ -470,59 +471,74 @@ func listTestEnv() (envs []env) { // func, modified as needed, and then started with its startServer method. // It should be cleaned up with the tearDown method. type test struct { - t *testing.T - e env - + // The following are setup in newTest(). + t *testing.T + e env ctx context.Context // valid for life of test, before tearDown cancel context.CancelFunc - // Configurable knobs, after newTest returns: - testServer testpb.TestServiceServer // nil means none - healthServer *health.Server // nil means disabled + // The following knobs are for the server-side, and should be set after + // calling newTest() and before calling startServer(). + + // whether or not to expose the server's health via the default health + // service implementation. + enableHealthServer bool + // In almost all cases, one should set the 'enableHealthServer' flag above to + // expose the server's health using the default health service + // implementation. This should only be used when a non-default health service + // implementation is required. + healthServer healthpb.HealthServer maxStream uint32 tapHandle tap.ServerInHandle - maxMsgSize *int - maxClientReceiveMsgSize *int - maxClientSendMsgSize *int + maxServerMsgSize *int maxServerReceiveMsgSize *int maxServerSendMsgSize *int - maxClientHeaderListSize *uint32 maxServerHeaderListSize *uint32 + // Used to test the deprecated API WithCompressor and WithDecompressor. + serverCompression bool + unknownHandler grpc.StreamHandler + unaryServerInt grpc.UnaryServerInterceptor + streamServerInt grpc.StreamServerInterceptor + serverInitialWindowSize int32 + serverInitialConnWindowSize int32 + customServerOptions []grpc.ServerOption + + // The following knobs are for the client-side, and should be set after + // calling newTest() and before calling clientConn(). + maxClientMsgSize *int + maxClientReceiveMsgSize *int + maxClientSendMsgSize *int + maxClientHeaderListSize *uint32 userAgent string - // clientCompression and serverCompression are set to test the deprecated API - // WithCompressor and WithDecompressor. + // Used to test the deprecated API WithCompressor and WithDecompressor. clientCompression bool - serverCompression bool - // clientUseCompression is set to test the new compressor registration API UseCompressor. + // Used to test the new compressor registration API UseCompressor. clientUseCompression bool // clientNopCompression is set to create a compressor whose type is not supported. clientNopCompression bool unaryClientInt grpc.UnaryClientInterceptor streamClientInt grpc.StreamClientInterceptor - unaryServerInt grpc.UnaryServerInterceptor - streamServerInt grpc.StreamServerInterceptor - unknownHandler grpc.StreamHandler sc <-chan grpc.ServiceConfig customCodec encoding.Codec - serverInitialWindowSize int32 - serverInitialConnWindowSize int32 clientInitialWindowSize int32 clientInitialConnWindowSize int32 perRPCCreds credentials.PerRPCCredentials customDialOptions []grpc.DialOption - customServerOptions []grpc.ServerOption resolverScheme string // All test dialing is blocking by default. Set this to true if dial // should be non-blocking. nonBlockingDial bool - // srv and srvAddr are set once startServer is called. + // These are are set once startServer is called. The common case is to have + // only one testServer. srv stopper + hSrv healthpb.HealthServer srvAddr string - // srvs and srvAddrs are set once startServers is called. - srvs []*grpc.Server + // These are are set once startServers is called. + srvs []stopper + hSrvs []healthpb.HealthServer srvAddrs []string cc *grpc.ClientConn // nil until requested via clientConn @@ -553,10 +569,8 @@ func (te *test) tearDown() { if te.srv != nil { te.srv.Stop() } - if len(te.srvs) != 0 { - for _, s := range te.srvs { - s.Stop() - } + for _, s := range te.srvs { + s.Stop() } } @@ -574,11 +588,10 @@ func newTest(t *testing.T, e env) *test { } func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network, address string) (net.Listener, error)) net.Listener { - te.testServer = ts te.t.Logf("Running test in %s environment...", te.e.name) sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)} - if te.maxMsgSize != nil { - sopts = append(sopts, grpc.MaxMsgSize(*te.maxMsgSize)) + if te.maxServerMsgSize != nil { + sopts = append(sopts, grpc.MaxMsgSize(*te.maxServerMsgSize)) } if te.maxServerReceiveMsgSize != nil { sopts = append(sopts, grpc.MaxRecvMsgSize(*te.maxServerReceiveMsgSize)) @@ -635,13 +648,20 @@ func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network, } sopts = append(sopts, te.customServerOptions...) s := grpc.NewServer(sopts...) - te.srv = s - if te.healthServer != nil { - healthgrpc.RegisterHealthServer(s, te.healthServer) + if ts != nil { + testpb.RegisterTestServiceServer(s, ts) } - if te.testServer != nil { - testpb.RegisterTestServiceServer(s, te.testServer) + + // Create a new default health server if enableHealthServer is set, or use + // the provided one. + hs := te.healthServer + if te.enableHealthServer { + hs = health.NewServer() } + if hs != nil { + healthgrpc.RegisterHealthServer(s, hs) + } + addr := la switch te.e.network { case "unix": @@ -653,6 +673,8 @@ func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network, addr = "localhost:" + port } + te.srv = s + te.hSrv = hs te.srvAddr = addr if te.e.httpHandler { @@ -697,12 +719,25 @@ func (te *test) startServerWithConnControl(ts testpb.TestServiceServer) *listene return l.(*listenerWrapper) } -// startServer starts a gRPC server listening. Callers should defer a -// call to te.tearDown to clean up. +// startServer starts a gRPC server exposing the provided TestService +// implementation. Callers should defer a call to te.tearDown to clean up func (te *test) startServer(ts testpb.TestServiceServer) { te.listenAndServe(ts, net.Listen) } +// startServers starts 'num' gRPC servers exposing the provided TestService. +func (te *test) startServers(ts testpb.TestServiceServer, num int) { + for i := 0; i < num; i++ { + te.startServer(ts) + te.srvs = append(te.srvs, te.srv.(*grpc.Server)) + te.hSrvs = append(te.hSrvs, te.hSrv) + te.srvAddrs = append(te.srvAddrs, te.srvAddr) + te.srv = nil + te.hSrv = nil + te.srvAddr = "" + } +} + type nopCompressor struct { grpc.Compressor } @@ -757,8 +792,8 @@ func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string) if te.streamClientInt != nil { opts = append(opts, grpc.WithStreamInterceptor(te.streamClientInt)) } - if te.maxMsgSize != nil { - opts = append(opts, grpc.WithMaxMsgSize(*te.maxMsgSize)) + if te.maxClientMsgSize != nil { + opts = append(opts, grpc.WithMaxMsgSize(*te.maxClientMsgSize)) } if te.maxClientReceiveMsgSize != nil { opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(*te.maxClientReceiveMsgSize))) @@ -2881,15 +2916,16 @@ func (s) TestExceedMsgLimit(t *testing.T) { func testExceedMsgLimit(t *testing.T, e env) { te := newTest(t, e) - te.maxMsgSize = newInt(1024) + maxMsgSize := 1024 + te.maxServerMsgSize, te.maxClientMsgSize = newInt(maxMsgSize), newInt(maxMsgSize) te.startServer(&testServer{security: e.security}) defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) - argSize := int32(*te.maxMsgSize + 1) + largeSize := int32(maxMsgSize + 1) const smallSize = 1 - payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) + largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) if err != nil { t.Fatal(err) } @@ -2898,23 +2934,23 @@ func testExceedMsgLimit(t *testing.T, e env) { t.Fatal(err) } - // Test on server side for unary RPC. + // Make sure the server cannot receive a unary RPC of largeSize. req := &testpb.SimpleRequest{ ResponseType: testpb.PayloadType_COMPRESSABLE, ResponseSize: smallSize, - Payload: payload, + Payload: largePayload, } if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } - // Test on client side for unary RPC. - req.ResponseSize = int32(*te.maxMsgSize) + 1 + // Make sure the client cannot receive a unary RPC of largeSize. + req.ResponseSize = largeSize req.Payload = smallPayload if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } - // Test on server side for streaming RPC. + // Make sure the server cannot receive a streaming RPC of largeSize. stream, err := tc.FullDuplexCall(te.ctx) if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) @@ -2925,15 +2961,10 @@ func testExceedMsgLimit(t *testing.T, e env) { }, } - spayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(*te.maxMsgSize+1)) - if err != nil { - t.Fatal(err) - } - sreq := &testpb.StreamingOutputCallRequest{ ResponseType: testpb.PayloadType_COMPRESSABLE, ResponseParameters: respParam, - Payload: spayload, + Payload: largePayload, } if err := stream.Send(sreq); err != nil { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) @@ -2947,7 +2978,7 @@ func testExceedMsgLimit(t *testing.T, e env) { if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) } - respParam[0].Size = int32(*te.maxMsgSize) + 1 + respParam[0].Size = largeSize sreq.Payload = smallPayload if err := stream.Send(sreq); err != nil { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) @@ -2955,7 +2986,6 @@ func testExceedMsgLimit(t *testing.T, e env) { if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) } - } func (s) TestPeerClientSide(t *testing.T) { @@ -7412,3 +7442,11 @@ func doHTTPHeaderTest(t *testing.T, errCode codes.Code, headerFields ...[]string t.Fatalf("stream.Recv() = _, %v, want error code: %v", err, errCode) } } + +func parseCfg(s string) serviceconfig.Config { + c, err := serviceconfig.Parse(s) + if err != nil { + panic(fmt.Sprintf("Error parsing config %q: %v", s, err)) + } + return c +}