test: end2end test improvements separate server and client configs. (#2877)

- Seperated and documented the options for client and server sides.
- Better support for multiple grpc.Servers. This will be used in other
  improvements that I have in the works.
- Moved some common functionality from channelz_test.go to
  end2end_test.go.
- Added an option to use the default health service implementation, instead
 of each test creating a new health.Server and passing it in. The
 inidividual tests have not been changed in this PR. I will do that in a
 follow up PR to keep the changes to a reasonable size.
- Fixed one of the tests which had to be fixed because of the separation
  of client and server configs.
This commit is contained in:
Easwar Swaminathan
2019-06-24 14:53:45 -07:00
committed by GitHub
parent e8b58ff637
commit 70e8b38052
2 changed files with 93 additions and 74 deletions

View File

@ -41,7 +41,6 @@ import (
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual" "google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
testpb "google.golang.org/grpc/test/grpc_testing" testpb "google.golang.org/grpc/test/grpc_testing"
"google.golang.org/grpc/testdata" "google.golang.org/grpc/testdata"
@ -53,16 +52,6 @@ func czCleanupWrapper(cleanup func() error, t *testing.T) {
} }
} }
func (te *test) startServers(ts testpb.TestServiceServer, num int) {
for i := 0; i < num; i++ {
te.startServer(ts)
te.srvs = append(te.srvs, te.srv.(*grpc.Server))
te.srvAddrs = append(te.srvAddrs, te.srvAddr)
te.srv = nil
te.srvAddr = ""
}
}
func verifyResultWithDelay(f func() (bool, error)) error { func verifyResultWithDelay(f func() (bool, error)) error {
var ok bool var ok bool
var err error var err error
@ -2014,11 +2003,3 @@ func (s) TestCZTraceTopChannelDeletionTraceClear(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
} }
func parseCfg(s string) serviceconfig.Config {
c, err := serviceconfig.Parse(s)
if err != nil {
panic(fmt.Sprintf("Error parsing config %q: %v", s, err))
}
return c
}

View File

@ -70,6 +70,7 @@ import (
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual" "google.golang.org/grpc/resolver/manual"
_ "google.golang.org/grpc/resolver/passthrough" _ "google.golang.org/grpc/resolver/passthrough"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/stats" "google.golang.org/grpc/stats"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"google.golang.org/grpc/tap" "google.golang.org/grpc/tap"
@ -470,59 +471,74 @@ func listTestEnv() (envs []env) {
// func, modified as needed, and then started with its startServer method. // func, modified as needed, and then started with its startServer method.
// It should be cleaned up with the tearDown method. // It should be cleaned up with the tearDown method.
type test struct { type test struct {
t *testing.T // The following are setup in newTest().
e env t *testing.T
e env
ctx context.Context // valid for life of test, before tearDown ctx context.Context // valid for life of test, before tearDown
cancel context.CancelFunc cancel context.CancelFunc
// Configurable knobs, after newTest returns: // The following knobs are for the server-side, and should be set after
testServer testpb.TestServiceServer // nil means none // calling newTest() and before calling startServer().
healthServer *health.Server // nil means disabled
// whether or not to expose the server's health via the default health
// service implementation.
enableHealthServer bool
// In almost all cases, one should set the 'enableHealthServer' flag above to
// expose the server's health using the default health service
// implementation. This should only be used when a non-default health service
// implementation is required.
healthServer healthpb.HealthServer
maxStream uint32 maxStream uint32
tapHandle tap.ServerInHandle tapHandle tap.ServerInHandle
maxMsgSize *int maxServerMsgSize *int
maxClientReceiveMsgSize *int
maxClientSendMsgSize *int
maxServerReceiveMsgSize *int maxServerReceiveMsgSize *int
maxServerSendMsgSize *int maxServerSendMsgSize *int
maxClientHeaderListSize *uint32
maxServerHeaderListSize *uint32 maxServerHeaderListSize *uint32
// Used to test the deprecated API WithCompressor and WithDecompressor.
serverCompression bool
unknownHandler grpc.StreamHandler
unaryServerInt grpc.UnaryServerInterceptor
streamServerInt grpc.StreamServerInterceptor
serverInitialWindowSize int32
serverInitialConnWindowSize int32
customServerOptions []grpc.ServerOption
// The following knobs are for the client-side, and should be set after
// calling newTest() and before calling clientConn().
maxClientMsgSize *int
maxClientReceiveMsgSize *int
maxClientSendMsgSize *int
maxClientHeaderListSize *uint32
userAgent string userAgent string
// clientCompression and serverCompression are set to test the deprecated API // Used to test the deprecated API WithCompressor and WithDecompressor.
// WithCompressor and WithDecompressor.
clientCompression bool clientCompression bool
serverCompression bool // Used to test the new compressor registration API UseCompressor.
// clientUseCompression is set to test the new compressor registration API UseCompressor.
clientUseCompression bool clientUseCompression bool
// clientNopCompression is set to create a compressor whose type is not supported. // clientNopCompression is set to create a compressor whose type is not supported.
clientNopCompression bool clientNopCompression bool
unaryClientInt grpc.UnaryClientInterceptor unaryClientInt grpc.UnaryClientInterceptor
streamClientInt grpc.StreamClientInterceptor streamClientInt grpc.StreamClientInterceptor
unaryServerInt grpc.UnaryServerInterceptor
streamServerInt grpc.StreamServerInterceptor
unknownHandler grpc.StreamHandler
sc <-chan grpc.ServiceConfig sc <-chan grpc.ServiceConfig
customCodec encoding.Codec customCodec encoding.Codec
serverInitialWindowSize int32
serverInitialConnWindowSize int32
clientInitialWindowSize int32 clientInitialWindowSize int32
clientInitialConnWindowSize int32 clientInitialConnWindowSize int32
perRPCCreds credentials.PerRPCCredentials perRPCCreds credentials.PerRPCCredentials
customDialOptions []grpc.DialOption customDialOptions []grpc.DialOption
customServerOptions []grpc.ServerOption
resolverScheme string resolverScheme string
// All test dialing is blocking by default. Set this to true if dial // All test dialing is blocking by default. Set this to true if dial
// should be non-blocking. // should be non-blocking.
nonBlockingDial bool nonBlockingDial bool
// srv and srvAddr are set once startServer is called. // These are are set once startServer is called. The common case is to have
// only one testServer.
srv stopper srv stopper
hSrv healthpb.HealthServer
srvAddr string srvAddr string
// srvs and srvAddrs are set once startServers is called. // These are are set once startServers is called.
srvs []*grpc.Server srvs []stopper
hSrvs []healthpb.HealthServer
srvAddrs []string srvAddrs []string
cc *grpc.ClientConn // nil until requested via clientConn cc *grpc.ClientConn // nil until requested via clientConn
@ -553,10 +569,8 @@ func (te *test) tearDown() {
if te.srv != nil { if te.srv != nil {
te.srv.Stop() te.srv.Stop()
} }
if len(te.srvs) != 0 { for _, s := range te.srvs {
for _, s := range te.srvs { s.Stop()
s.Stop()
}
} }
} }
@ -574,11 +588,10 @@ func newTest(t *testing.T, e env) *test {
} }
func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network, address string) (net.Listener, error)) net.Listener { func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network, address string) (net.Listener, error)) net.Listener {
te.testServer = ts
te.t.Logf("Running test in %s environment...", te.e.name) te.t.Logf("Running test in %s environment...", te.e.name)
sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)} sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)}
if te.maxMsgSize != nil { if te.maxServerMsgSize != nil {
sopts = append(sopts, grpc.MaxMsgSize(*te.maxMsgSize)) sopts = append(sopts, grpc.MaxMsgSize(*te.maxServerMsgSize))
} }
if te.maxServerReceiveMsgSize != nil { if te.maxServerReceiveMsgSize != nil {
sopts = append(sopts, grpc.MaxRecvMsgSize(*te.maxServerReceiveMsgSize)) sopts = append(sopts, grpc.MaxRecvMsgSize(*te.maxServerReceiveMsgSize))
@ -635,13 +648,20 @@ func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network,
} }
sopts = append(sopts, te.customServerOptions...) sopts = append(sopts, te.customServerOptions...)
s := grpc.NewServer(sopts...) s := grpc.NewServer(sopts...)
te.srv = s if ts != nil {
if te.healthServer != nil { testpb.RegisterTestServiceServer(s, ts)
healthgrpc.RegisterHealthServer(s, te.healthServer)
} }
if te.testServer != nil {
testpb.RegisterTestServiceServer(s, te.testServer) // Create a new default health server if enableHealthServer is set, or use
// the provided one.
hs := te.healthServer
if te.enableHealthServer {
hs = health.NewServer()
} }
if hs != nil {
healthgrpc.RegisterHealthServer(s, hs)
}
addr := la addr := la
switch te.e.network { switch te.e.network {
case "unix": case "unix":
@ -653,6 +673,8 @@ func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network,
addr = "localhost:" + port addr = "localhost:" + port
} }
te.srv = s
te.hSrv = hs
te.srvAddr = addr te.srvAddr = addr
if te.e.httpHandler { if te.e.httpHandler {
@ -697,12 +719,25 @@ func (te *test) startServerWithConnControl(ts testpb.TestServiceServer) *listene
return l.(*listenerWrapper) return l.(*listenerWrapper)
} }
// startServer starts a gRPC server listening. Callers should defer a // startServer starts a gRPC server exposing the provided TestService
// call to te.tearDown to clean up. // implementation. Callers should defer a call to te.tearDown to clean up
func (te *test) startServer(ts testpb.TestServiceServer) { func (te *test) startServer(ts testpb.TestServiceServer) {
te.listenAndServe(ts, net.Listen) te.listenAndServe(ts, net.Listen)
} }
// startServers starts 'num' gRPC servers exposing the provided TestService.
func (te *test) startServers(ts testpb.TestServiceServer, num int) {
for i := 0; i < num; i++ {
te.startServer(ts)
te.srvs = append(te.srvs, te.srv.(*grpc.Server))
te.hSrvs = append(te.hSrvs, te.hSrv)
te.srvAddrs = append(te.srvAddrs, te.srvAddr)
te.srv = nil
te.hSrv = nil
te.srvAddr = ""
}
}
type nopCompressor struct { type nopCompressor struct {
grpc.Compressor grpc.Compressor
} }
@ -757,8 +792,8 @@ func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string)
if te.streamClientInt != nil { if te.streamClientInt != nil {
opts = append(opts, grpc.WithStreamInterceptor(te.streamClientInt)) opts = append(opts, grpc.WithStreamInterceptor(te.streamClientInt))
} }
if te.maxMsgSize != nil { if te.maxClientMsgSize != nil {
opts = append(opts, grpc.WithMaxMsgSize(*te.maxMsgSize)) opts = append(opts, grpc.WithMaxMsgSize(*te.maxClientMsgSize))
} }
if te.maxClientReceiveMsgSize != nil { if te.maxClientReceiveMsgSize != nil {
opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(*te.maxClientReceiveMsgSize))) opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(*te.maxClientReceiveMsgSize)))
@ -2881,15 +2916,16 @@ func (s) TestExceedMsgLimit(t *testing.T) {
func testExceedMsgLimit(t *testing.T, e env) { func testExceedMsgLimit(t *testing.T, e env) {
te := newTest(t, e) te := newTest(t, e)
te.maxMsgSize = newInt(1024) maxMsgSize := 1024
te.maxServerMsgSize, te.maxClientMsgSize = newInt(maxMsgSize), newInt(maxMsgSize)
te.startServer(&testServer{security: e.security}) te.startServer(&testServer{security: e.security})
defer te.tearDown() defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn()) tc := testpb.NewTestServiceClient(te.clientConn())
argSize := int32(*te.maxMsgSize + 1) largeSize := int32(maxMsgSize + 1)
const smallSize = 1 const smallSize = 1
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -2898,23 +2934,23 @@ func testExceedMsgLimit(t *testing.T, e env) {
t.Fatal(err) t.Fatal(err)
} }
// Test on server side for unary RPC. // Make sure the server cannot receive a unary RPC of largeSize.
req := &testpb.SimpleRequest{ req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE, ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: smallSize, ResponseSize: smallSize,
Payload: payload, Payload: largePayload,
} }
if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
} }
// Test on client side for unary RPC. // Make sure the client cannot receive a unary RPC of largeSize.
req.ResponseSize = int32(*te.maxMsgSize) + 1 req.ResponseSize = largeSize
req.Payload = smallPayload req.Payload = smallPayload
if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
} }
// Test on server side for streaming RPC. // Make sure the server cannot receive a streaming RPC of largeSize.
stream, err := tc.FullDuplexCall(te.ctx) stream, err := tc.FullDuplexCall(te.ctx)
if err != nil { if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
@ -2925,15 +2961,10 @@ func testExceedMsgLimit(t *testing.T, e env) {
}, },
} }
spayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(*te.maxMsgSize+1))
if err != nil {
t.Fatal(err)
}
sreq := &testpb.StreamingOutputCallRequest{ sreq := &testpb.StreamingOutputCallRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE, ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseParameters: respParam, ResponseParameters: respParam,
Payload: spayload, Payload: largePayload,
} }
if err := stream.Send(sreq); err != nil { if err := stream.Send(sreq); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
@ -2947,7 +2978,7 @@ func testExceedMsgLimit(t *testing.T, e env) {
if err != nil { if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
} }
respParam[0].Size = int32(*te.maxMsgSize) + 1 respParam[0].Size = largeSize
sreq.Payload = smallPayload sreq.Payload = smallPayload
if err := stream.Send(sreq); err != nil { if err := stream.Send(sreq); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
@ -2955,7 +2986,6 @@ func testExceedMsgLimit(t *testing.T, e env) {
if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
} }
} }
func (s) TestPeerClientSide(t *testing.T) { func (s) TestPeerClientSide(t *testing.T) {
@ -7412,3 +7442,11 @@ func doHTTPHeaderTest(t *testing.T, errCode codes.Code, headerFields ...[]string
t.Fatalf("stream.Recv() = _, %v, want error code: %v", err, errCode) t.Fatalf("stream.Recv() = _, %v, want error code: %v", err, errCode)
} }
} }
func parseCfg(s string) serviceconfig.Config {
c, err := serviceconfig.Parse(s)
if err != nil {
panic(fmt.Sprintf("Error parsing config %q: %v", s, err))
}
return c
}