Add stats test for client streaming and server streaming RPCs (#1140)

This commit is contained in:
Charith Ellawala
2017-05-11 19:58:13 +01:00
committed by Menghan Li
parent 88a73d35c9
commit a0c3e72252
3 changed files with 455 additions and 47 deletions

View File

@ -34,7 +34,6 @@ var _ = math.Inf
// proto package needs to be updated. // proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
// Unary request.
type SimpleRequest struct { type SimpleRequest struct {
Id int32 `protobuf:"varint,2,opt,name=id" json:"id,omitempty"` Id int32 `protobuf:"varint,2,opt,name=id" json:"id,omitempty"`
} }
@ -44,7 +43,13 @@ func (m *SimpleRequest) String() string { return proto.CompactTextStr
func (*SimpleRequest) ProtoMessage() {} func (*SimpleRequest) ProtoMessage() {}
func (*SimpleRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } func (*SimpleRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
// Unary response, as configured by the request. func (m *SimpleRequest) GetId() int32 {
if m != nil {
return m.Id
}
return 0
}
type SimpleResponse struct { type SimpleResponse struct {
Id int32 `protobuf:"varint,3,opt,name=id" json:"id,omitempty"` Id int32 `protobuf:"varint,3,opt,name=id" json:"id,omitempty"`
} }
@ -54,6 +59,13 @@ func (m *SimpleResponse) String() string { return proto.CompactTextSt
func (*SimpleResponse) ProtoMessage() {} func (*SimpleResponse) ProtoMessage() {}
func (*SimpleResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } func (*SimpleResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *SimpleResponse) GetId() int32 {
if m != nil {
return m.Id
}
return 0
}
func init() { func init() {
proto.RegisterType((*SimpleRequest)(nil), "grpc.testing.SimpleRequest") proto.RegisterType((*SimpleRequest)(nil), "grpc.testing.SimpleRequest")
proto.RegisterType((*SimpleResponse)(nil), "grpc.testing.SimpleResponse") proto.RegisterType((*SimpleResponse)(nil), "grpc.testing.SimpleResponse")
@ -77,6 +89,10 @@ type TestServiceClient interface {
// As one request could lead to multiple responses, this interface // As one request could lead to multiple responses, this interface
// demonstrates the idea of full duplexing. // demonstrates the idea of full duplexing.
FullDuplexCall(ctx context.Context, opts ...grpc.CallOption) (TestService_FullDuplexCallClient, error) FullDuplexCall(ctx context.Context, opts ...grpc.CallOption) (TestService_FullDuplexCallClient, error)
// Client stream
ClientStreamCall(ctx context.Context, opts ...grpc.CallOption) (TestService_ClientStreamCallClient, error)
// Server stream
ServerStreamCall(ctx context.Context, in *SimpleRequest, opts ...grpc.CallOption) (TestService_ServerStreamCallClient, error)
} }
type testServiceClient struct { type testServiceClient struct {
@ -127,6 +143,72 @@ func (x *testServiceFullDuplexCallClient) Recv() (*SimpleResponse, error) {
return m, nil return m, nil
} }
func (c *testServiceClient) ClientStreamCall(ctx context.Context, opts ...grpc.CallOption) (TestService_ClientStreamCallClient, error) {
stream, err := grpc.NewClientStream(ctx, &_TestService_serviceDesc.Streams[1], c.cc, "/grpc.testing.TestService/ClientStreamCall", opts...)
if err != nil {
return nil, err
}
x := &testServiceClientStreamCallClient{stream}
return x, nil
}
type TestService_ClientStreamCallClient interface {
Send(*SimpleRequest) error
CloseAndRecv() (*SimpleResponse, error)
grpc.ClientStream
}
type testServiceClientStreamCallClient struct {
grpc.ClientStream
}
func (x *testServiceClientStreamCallClient) Send(m *SimpleRequest) error {
return x.ClientStream.SendMsg(m)
}
func (x *testServiceClientStreamCallClient) CloseAndRecv() (*SimpleResponse, error) {
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
m := new(SimpleResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *testServiceClient) ServerStreamCall(ctx context.Context, in *SimpleRequest, opts ...grpc.CallOption) (TestService_ServerStreamCallClient, error) {
stream, err := grpc.NewClientStream(ctx, &_TestService_serviceDesc.Streams[2], c.cc, "/grpc.testing.TestService/ServerStreamCall", opts...)
if err != nil {
return nil, err
}
x := &testServiceServerStreamCallClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type TestService_ServerStreamCallClient interface {
Recv() (*SimpleResponse, error)
grpc.ClientStream
}
type testServiceServerStreamCallClient struct {
grpc.ClientStream
}
func (x *testServiceServerStreamCallClient) Recv() (*SimpleResponse, error) {
m := new(SimpleResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// Server API for TestService service // Server API for TestService service
type TestServiceServer interface { type TestServiceServer interface {
@ -137,6 +219,10 @@ type TestServiceServer interface {
// As one request could lead to multiple responses, this interface // As one request could lead to multiple responses, this interface
// demonstrates the idea of full duplexing. // demonstrates the idea of full duplexing.
FullDuplexCall(TestService_FullDuplexCallServer) error FullDuplexCall(TestService_FullDuplexCallServer) error
// Client stream
ClientStreamCall(TestService_ClientStreamCallServer) error
// Server stream
ServerStreamCall(*SimpleRequest, TestService_ServerStreamCallServer) error
} }
func RegisterTestServiceServer(s *grpc.Server, srv TestServiceServer) { func RegisterTestServiceServer(s *grpc.Server, srv TestServiceServer) {
@ -187,6 +273,53 @@ func (x *testServiceFullDuplexCallServer) Recv() (*SimpleRequest, error) {
return m, nil return m, nil
} }
func _TestService_ClientStreamCall_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(TestServiceServer).ClientStreamCall(&testServiceClientStreamCallServer{stream})
}
type TestService_ClientStreamCallServer interface {
SendAndClose(*SimpleResponse) error
Recv() (*SimpleRequest, error)
grpc.ServerStream
}
type testServiceClientStreamCallServer struct {
grpc.ServerStream
}
func (x *testServiceClientStreamCallServer) SendAndClose(m *SimpleResponse) error {
return x.ServerStream.SendMsg(m)
}
func (x *testServiceClientStreamCallServer) Recv() (*SimpleRequest, error) {
m := new(SimpleRequest)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func _TestService_ServerStreamCall_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(SimpleRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(TestServiceServer).ServerStreamCall(m, &testServiceServerStreamCallServer{stream})
}
type TestService_ServerStreamCallServer interface {
Send(*SimpleResponse) error
grpc.ServerStream
}
type testServiceServerStreamCallServer struct {
grpc.ServerStream
}
func (x *testServiceServerStreamCallServer) Send(m *SimpleResponse) error {
return x.ServerStream.SendMsg(m)
}
var _TestService_serviceDesc = grpc.ServiceDesc{ var _TestService_serviceDesc = grpc.ServiceDesc{
ServiceName: "grpc.testing.TestService", ServiceName: "grpc.testing.TestService",
HandlerType: (*TestServiceServer)(nil), HandlerType: (*TestServiceServer)(nil),
@ -203,6 +336,16 @@ var _TestService_serviceDesc = grpc.ServiceDesc{
ServerStreams: true, ServerStreams: true,
ClientStreams: true, ClientStreams: true,
}, },
{
StreamName: "ClientStreamCall",
Handler: _TestService_ClientStreamCall_Handler,
ClientStreams: true,
},
{
StreamName: "ServerStreamCall",
Handler: _TestService_ServerStreamCall_Handler,
ServerStreams: true,
},
}, },
Metadata: "test.proto", Metadata: "test.proto",
} }
@ -210,16 +353,18 @@ var _TestService_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("test.proto", fileDescriptor0) } func init() { proto.RegisterFile("test.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{ var fileDescriptor0 = []byte{
// 167 bytes of a gzipped FileDescriptorProto // 196 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2a, 0x49, 0x2d, 0x2e, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2a, 0x49, 0x2d, 0x2e,
0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x49, 0x2f, 0x2a, 0x48, 0xd6, 0x03, 0x09, 0x64, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x49, 0x2f, 0x2a, 0x48, 0xd6, 0x03, 0x09, 0x64,
0xe6, 0xa5, 0x2b, 0xc9, 0x73, 0xf1, 0x06, 0x67, 0xe6, 0x16, 0xe4, 0xa4, 0x06, 0xa5, 0x16, 0x96, 0xe6, 0xa5, 0x2b, 0xc9, 0x73, 0xf1, 0x06, 0x67, 0xe6, 0x16, 0xe4, 0xa4, 0x06, 0xa5, 0x16, 0x96,
0xa6, 0x16, 0x97, 0x08, 0xf1, 0x71, 0x31, 0x65, 0xa6, 0x48, 0x30, 0x29, 0x30, 0x6a, 0xb0, 0x06, 0xa6, 0x16, 0x97, 0x08, 0xf1, 0x71, 0x31, 0x65, 0xa6, 0x48, 0x30, 0x29, 0x30, 0x6a, 0xb0, 0x06,
0x31, 0x65, 0xa6, 0x28, 0x29, 0x70, 0xf1, 0xc1, 0x14, 0x14, 0x17, 0xe4, 0xe7, 0x15, 0xa7, 0x42, 0x31, 0x65, 0xa6, 0x28, 0x29, 0x70, 0xf1, 0xc1, 0x14, 0x14, 0x17, 0xe4, 0xe7, 0x15, 0xa7, 0x42,
0x55, 0x30, 0xc3, 0x54, 0x18, 0x2d, 0x63, 0xe4, 0xe2, 0x0e, 0x49, 0x2d, 0x2e, 0x09, 0x4e, 0x2d, 0x55, 0x30, 0xc3, 0x54, 0x18, 0x9d, 0x60, 0xe2, 0xe2, 0x0e, 0x49, 0x2d, 0x2e, 0x09, 0x4e, 0x2d,
0x2a, 0xcb, 0x4c, 0x4e, 0x15, 0x72, 0xe3, 0xe2, 0x0c, 0xcd, 0x4b, 0x2c, 0xaa, 0x74, 0x4e, 0xcc, 0x2a, 0xcb, 0x4c, 0x4e, 0x15, 0x72, 0xe3, 0xe2, 0x0c, 0xcd, 0x4b, 0x2c, 0xaa, 0x74, 0x4e, 0xcc,
0xc9, 0x11, 0x92, 0xd6, 0x43, 0xb6, 0x4e, 0x0f, 0xc5, 0x2e, 0x29, 0x19, 0xec, 0x92, 0x50, 0x7b, 0xc9, 0x11, 0x92, 0xd6, 0x43, 0xb6, 0x4e, 0x0f, 0xc5, 0x2e, 0x29, 0x19, 0xec, 0x92, 0x50, 0x7b,
0xfc, 0xb9, 0xf8, 0xdc, 0x4a, 0x73, 0x72, 0x5c, 0x4a, 0x0b, 0x72, 0x52, 0x2b, 0x28, 0x34, 0x4c, 0xfc, 0xb9, 0xf8, 0xdc, 0x4a, 0x73, 0x72, 0x5c, 0x4a, 0x0b, 0x72, 0x52, 0x2b, 0x28, 0x34, 0x4c,
0x83, 0xd1, 0x80, 0x31, 0x89, 0x0d, 0x1c, 0x00, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x8d, 0x83, 0xd1, 0x80, 0x51, 0xc8, 0x9f, 0x4b, 0xc0, 0x39, 0x27, 0x33, 0x35, 0xaf, 0x24, 0xb8, 0xa4,
0x82, 0x5b, 0xdd, 0x0e, 0x01, 0x00, 0x00, 0x28, 0x35, 0x31, 0x97, 0x62, 0x23, 0x41, 0x06, 0x82, 0x3c, 0x9d, 0x5a, 0x44, 0x15, 0x03, 0x0d,
0x18, 0x93, 0xd8, 0xc0, 0x51, 0x64, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x61, 0x49, 0x59, 0xe6,
0xb0, 0x01, 0x00, 0x00,
} }

View File

@ -20,4 +20,10 @@ service TestService {
// As one request could lead to multiple responses, this interface // As one request could lead to multiple responses, this interface
// demonstrates the idea of full duplexing. // demonstrates the idea of full duplexing.
rpc FullDuplexCall(stream SimpleRequest) returns (stream SimpleResponse); rpc FullDuplexCall(stream SimpleRequest) returns (stream SimpleResponse);
// Client stream
rpc ClientStreamCall(stream SimpleRequest) returns (SimpleResponse);
// Server stream
rpc ServerStreamCall(SimpleRequest) returns (stream SimpleResponse);
} }

View File

@ -120,6 +120,51 @@ func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServ
} }
} }
func (s *testServer) ClientStreamCall(stream testpb.TestService_ClientStreamCallServer) error {
md, ok := metadata.FromContext(stream.Context())
if ok {
if err := stream.SendHeader(md); err != nil {
return grpc.Errorf(grpc.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil)
}
stream.SetTrailer(testTrailerMetadata)
}
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
return stream.SendAndClose(&testpb.SimpleResponse{Id: int32(0)})
}
if err != nil {
return err
}
if in.Id == errorID {
return fmt.Errorf("got error id: %v", in.Id)
}
}
}
func (s *testServer) ServerStreamCall(in *testpb.SimpleRequest, stream testpb.TestService_ServerStreamCallServer) error {
md, ok := metadata.FromContext(stream.Context())
if ok {
if err := stream.SendHeader(md); err != nil {
return grpc.Errorf(grpc.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil)
}
stream.SetTrailer(testTrailerMetadata)
}
if in.Id == errorID {
return fmt.Errorf("got error id: %v", in.Id)
}
for i := 0; i < 5; i++ {
if err := stream.Send(&testpb.SimpleResponse{Id: in.Id}); err != nil {
return err
}
}
return nil
}
// test is an end-to-end test. It should be created with the newTest // test is an end-to-end test. It should be created with the newTest
// func, modified as needed, and then started with its startServer method. // func, modified as needed, and then started with its startServer method.
// It should be cleaned up with the tearDown method. // It should be cleaned up with the tearDown method.
@ -218,11 +263,20 @@ func (te *test) clientConn() *grpc.ClientConn {
return te.cc return te.cc
} }
type rpcType int
const (
unaryRPC rpcType = iota
clientStreamRPC
serverStreamRPC
fullDuplexStreamRPC
)
type rpcConfig struct { type rpcConfig struct {
count int // Number of requests and responses for streaming RPCs. count int // Number of requests and responses for streaming RPCs.
success bool // Whether the RPC should succeed or return error. success bool // Whether the RPC should succeed or return error.
failfast bool failfast bool
streaming bool // Whether the rpc should be a streaming RPC. callType rpcType // Type of RPC.
noLastRecv bool // Whether to call recv for io.EOF. When true, last recv won't be called. Only valid for streaming RPCs. noLastRecv bool // Whether to call recv for io.EOF. When true, last recv won't be called. Only valid for streaming RPCs.
} }
@ -289,6 +343,64 @@ func (te *test) doFullDuplexCallRoundtrip(c *rpcConfig) ([]*testpb.SimpleRequest
return reqs, resps, nil return reqs, resps, nil
} }
func (te *test) doClientStreamCall(c *rpcConfig) ([]*testpb.SimpleRequest, *testpb.SimpleResponse, error) {
var (
reqs []*testpb.SimpleRequest
resp *testpb.SimpleResponse
err error
)
tc := testpb.NewTestServiceClient(te.clientConn())
stream, err := tc.ClientStreamCall(metadata.NewContext(context.Background(), testMetadata), grpc.FailFast(c.failfast))
if err != nil {
return reqs, resp, err
}
var startID int32
if !c.success {
startID = errorID
}
for i := 0; i < c.count; i++ {
req := &testpb.SimpleRequest{
Id: int32(i) + startID,
}
reqs = append(reqs, req)
if err = stream.Send(req); err != nil {
return reqs, resp, err
}
}
resp, err = stream.CloseAndRecv()
return reqs, resp, err
}
func (te *test) doServerStreamCall(c *rpcConfig) (*testpb.SimpleRequest, []*testpb.SimpleResponse, error) {
var (
req *testpb.SimpleRequest
resps []*testpb.SimpleResponse
err error
)
tc := testpb.NewTestServiceClient(te.clientConn())
var startID int32
if !c.success {
startID = errorID
}
req = &testpb.SimpleRequest{Id: startID}
stream, err := tc.ServerStreamCall(metadata.NewContext(context.Background(), testMetadata), req, grpc.FailFast(c.failfast))
if err != nil {
return req, resps, err
}
for {
var resp *testpb.SimpleResponse
resp, err := stream.Recv()
if err == io.EOF {
return req, resps, nil
} else if err != nil {
return req, resps, err
}
resps = append(resps, resp)
}
}
type expectedData struct { type expectedData struct {
method string method string
serverAddr string serverAddr string
@ -675,13 +787,32 @@ func testServerStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs []f
reqs []*testpb.SimpleRequest reqs []*testpb.SimpleRequest
resps []*testpb.SimpleResponse resps []*testpb.SimpleResponse
err error err error
method string
req *testpb.SimpleRequest
resp *testpb.SimpleResponse
e error
) )
if !cc.streaming {
req, resp, e := te.doUnaryCall(cc) switch cc.callType {
case unaryRPC:
method = "/grpc.testing.TestService/UnaryCall"
req, resp, e = te.doUnaryCall(cc)
reqs = []*testpb.SimpleRequest{req} reqs = []*testpb.SimpleRequest{req}
resps = []*testpb.SimpleResponse{resp} resps = []*testpb.SimpleResponse{resp}
err = e err = e
} else { case clientStreamRPC:
method = "/grpc.testing.TestService/ClientStreamCall"
reqs, resp, e = te.doClientStreamCall(cc)
resps = []*testpb.SimpleResponse{resp}
err = e
case serverStreamRPC:
method = "/grpc.testing.TestService/ServerStreamCall"
req, resps, e = te.doServerStreamCall(cc)
reqs = []*testpb.SimpleRequest{req}
err = e
case fullDuplexStreamRPC:
method = "/grpc.testing.TestService/FullDuplexCall"
reqs, resps, err = te.doFullDuplexCallRoundtrip(cc) reqs, resps, err = te.doFullDuplexCallRoundtrip(cc)
} }
if cc.success != (err == nil) { if cc.success != (err == nil) {
@ -713,22 +844,18 @@ func testServerStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs []f
expect := &expectedData{ expect := &expectedData{
serverAddr: te.srvAddr, serverAddr: te.srvAddr,
compression: tc.compress, compression: tc.compress,
method: method,
requests: reqs, requests: reqs,
responses: resps, responses: resps,
err: err, err: err,
} }
if !cc.streaming {
expect.method = "/grpc.testing.TestService/UnaryCall"
} else {
expect.method = "/grpc.testing.TestService/FullDuplexCall"
}
checkConnStats(t, h.gotConn) checkConnStats(t, h.gotConn)
checkServerStats(t, h.gotRPC, expect, checkFuncs) checkServerStats(t, h.gotRPC, expect, checkFuncs)
} }
func TestServerStatsUnaryRPC(t *testing.T) { func TestServerStatsUnaryRPC(t *testing.T) {
testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: true}, []func(t *testing.T, d *gotData, e *expectedData){ testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, callType: unaryRPC}, []func(t *testing.T, d *gotData, e *expectedData){
checkInHeader, checkInHeader,
checkBegin, checkBegin,
checkInPayload, checkInPayload,
@ -740,7 +867,7 @@ func TestServerStatsUnaryRPC(t *testing.T) {
} }
func TestServerStatsUnaryRPCError(t *testing.T) { func TestServerStatsUnaryRPCError(t *testing.T) {
testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: false}, []func(t *testing.T, d *gotData, e *expectedData){ testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, callType: unaryRPC}, []func(t *testing.T, d *gotData, e *expectedData){
checkInHeader, checkInHeader,
checkBegin, checkBegin,
checkInPayload, checkInPayload,
@ -750,7 +877,73 @@ func TestServerStatsUnaryRPCError(t *testing.T) {
}) })
} }
func TestServerStatsStreamingRPC(t *testing.T) { func TestServerStatsClientStreamRPC(t *testing.T) {
count := 5
checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
checkInHeader,
checkBegin,
checkOutHeader,
}
ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
checkInPayload,
}
for i := 0; i < count; i++ {
checkFuncs = append(checkFuncs, ioPayFuncs...)
}
checkFuncs = append(checkFuncs,
checkOutPayload,
checkOutTrailer,
checkEnd,
)
testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: clientStreamRPC}, checkFuncs)
}
func TestServerStatsClientStreamRPCError(t *testing.T) {
count := 1
testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: clientStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
checkInHeader,
checkBegin,
checkOutHeader,
checkInPayload,
checkOutTrailer,
checkEnd,
})
}
func TestServerStatsServerStreamRPC(t *testing.T) {
count := 5
checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
checkInHeader,
checkBegin,
checkInPayload,
checkOutHeader,
}
ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
checkOutPayload,
}
for i := 0; i < count; i++ {
checkFuncs = append(checkFuncs, ioPayFuncs...)
}
checkFuncs = append(checkFuncs,
checkOutTrailer,
checkEnd,
)
testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: serverStreamRPC}, checkFuncs)
}
func TestServerStatsServerStreamRPCError(t *testing.T) {
count := 5
testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: serverStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
checkInHeader,
checkBegin,
checkInPayload,
checkOutHeader,
checkOutTrailer,
checkEnd,
})
}
func TestServerStatsFullDuplexRPC(t *testing.T) {
count := 5 count := 5
checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
checkInHeader, checkInHeader,
@ -768,12 +961,12 @@ func TestServerStatsStreamingRPC(t *testing.T) {
checkOutTrailer, checkOutTrailer,
checkEnd, checkEnd,
) )
testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, streaming: true}, checkFuncs) testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: fullDuplexStreamRPC}, checkFuncs)
} }
func TestServerStatsStreamingRPCError(t *testing.T) { func TestServerStatsFullDuplexRPCError(t *testing.T) {
count := 5 count := 5
testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, streaming: true}, []func(t *testing.T, d *gotData, e *expectedData){ testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: fullDuplexStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
checkInHeader, checkInHeader,
checkBegin, checkBegin,
checkOutHeader, checkOutHeader,
@ -882,14 +1075,32 @@ func testClientStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs map
var ( var (
reqs []*testpb.SimpleRequest reqs []*testpb.SimpleRequest
resps []*testpb.SimpleResponse resps []*testpb.SimpleResponse
method string
err error err error
req *testpb.SimpleRequest
resp *testpb.SimpleResponse
e error
) )
if !cc.streaming { switch cc.callType {
req, resp, e := te.doUnaryCall(cc) case unaryRPC:
method = "/grpc.testing.TestService/UnaryCall"
req, resp, e = te.doUnaryCall(cc)
reqs = []*testpb.SimpleRequest{req} reqs = []*testpb.SimpleRequest{req}
resps = []*testpb.SimpleResponse{resp} resps = []*testpb.SimpleResponse{resp}
err = e err = e
} else { case clientStreamRPC:
method = "/grpc.testing.TestService/ClientStreamCall"
reqs, resp, e = te.doClientStreamCall(cc)
resps = []*testpb.SimpleResponse{resp}
err = e
case serverStreamRPC:
method = "/grpc.testing.TestService/ServerStreamCall"
req, resps, e = te.doServerStreamCall(cc)
reqs = []*testpb.SimpleRequest{req}
err = e
case fullDuplexStreamRPC:
method = "/grpc.testing.TestService/FullDuplexCall"
reqs, resps, err = te.doFullDuplexCallRoundtrip(cc) reqs, resps, err = te.doFullDuplexCallRoundtrip(cc)
} }
if cc.success != (err == nil) { if cc.success != (err == nil) {
@ -925,23 +1136,19 @@ func testClientStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs map
expect := &expectedData{ expect := &expectedData{
serverAddr: te.srvAddr, serverAddr: te.srvAddr,
compression: tc.compress, compression: tc.compress,
method: method,
requests: reqs, requests: reqs,
responses: resps, responses: resps,
failfast: cc.failfast, failfast: cc.failfast,
err: err, err: err,
} }
if !cc.streaming {
expect.method = "/grpc.testing.TestService/UnaryCall"
} else {
expect.method = "/grpc.testing.TestService/FullDuplexCall"
}
checkConnStats(t, h.gotConn) checkConnStats(t, h.gotConn)
checkClientStats(t, h.gotRPC, expect, checkFuncs) checkClientStats(t, h.gotRPC, expect, checkFuncs)
} }
func TestClientStatsUnaryRPC(t *testing.T) { func TestClientStatsUnaryRPC(t *testing.T) {
testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, failfast: false}, map[int]*checkFuncWithCount{ testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, failfast: false, callType: unaryRPC}, map[int]*checkFuncWithCount{
begin: {checkBegin, 1}, begin: {checkBegin, 1},
outHeader: {checkOutHeader, 1}, outHeader: {checkOutHeader, 1},
outPayload: {checkOutPayload, 1}, outPayload: {checkOutPayload, 1},
@ -953,7 +1160,7 @@ func TestClientStatsUnaryRPC(t *testing.T) {
} }
func TestClientStatsUnaryRPCError(t *testing.T) { func TestClientStatsUnaryRPCError(t *testing.T) {
testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, failfast: false}, map[int]*checkFuncWithCount{ testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, failfast: false, callType: unaryRPC}, map[int]*checkFuncWithCount{
begin: {checkBegin, 1}, begin: {checkBegin, 1},
outHeader: {checkOutHeader, 1}, outHeader: {checkOutHeader, 1},
outPayload: {checkOutPayload, 1}, outPayload: {checkOutPayload, 1},
@ -963,23 +1170,59 @@ func TestClientStatsUnaryRPCError(t *testing.T) {
}) })
} }
func TestClientStatsStreamingRPC(t *testing.T) { func TestClientStatsClientStreamRPC(t *testing.T) {
count := 5 count := 5
testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, streaming: true}, map[int]*checkFuncWithCount{ testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: clientStreamRPC}, map[int]*checkFuncWithCount{
begin: {checkBegin, 1}, begin: {checkBegin, 1},
outHeader: {checkOutHeader, 1}, outHeader: {checkOutHeader, 1},
outPayload: {checkOutPayload, count},
inHeader: {checkInHeader, 1}, inHeader: {checkInHeader, 1},
inPayload: {checkInPayload, count}, outPayload: {checkOutPayload, count},
inTrailer: {checkInTrailer, 1}, inTrailer: {checkInTrailer, 1},
inPayload: {checkInPayload, 1},
end: {checkEnd, 1}, end: {checkEnd, 1},
}) })
} }
// If the user doesn't call the last recv() on clientSteam. func TestClientStatsClientStreamRPCError(t *testing.T) {
func TestClientStatsStreamingRPCNotCallingLastRecv(t *testing.T) {
count := 1 count := 1
testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, streaming: true, noLastRecv: true}, map[int]*checkFuncWithCount{ testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: clientStreamRPC}, map[int]*checkFuncWithCount{
begin: {checkBegin, 1},
outHeader: {checkOutHeader, 1},
inHeader: {checkInHeader, 1},
outPayload: {checkOutPayload, 1},
inTrailer: {checkInTrailer, 1},
end: {checkEnd, 1},
})
}
func TestClientStatsServerStreamRPC(t *testing.T) {
count := 5
testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: serverStreamRPC}, map[int]*checkFuncWithCount{
begin: {checkBegin, 1},
outHeader: {checkOutHeader, 1},
outPayload: {checkOutPayload, 1},
inHeader: {checkInHeader, 1},
inPayload: {checkInPayload, count},
inTrailer: {checkInTrailer, 1},
end: {checkEnd, 1},
})
}
func TestClientStatsServerStreamRPCError(t *testing.T) {
count := 5
testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: serverStreamRPC}, map[int]*checkFuncWithCount{
begin: {checkBegin, 1},
outHeader: {checkOutHeader, 1},
outPayload: {checkOutPayload, 1},
inHeader: {checkInHeader, 1},
inTrailer: {checkInTrailer, 1},
end: {checkEnd, 1},
})
}
func TestClientStatsFullDuplexRPC(t *testing.T) {
count := 5
testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: fullDuplexStreamRPC}, map[int]*checkFuncWithCount{
begin: {checkBegin, 1}, begin: {checkBegin, 1},
outHeader: {checkOutHeader, 1}, outHeader: {checkOutHeader, 1},
outPayload: {checkOutPayload, count}, outPayload: {checkOutPayload, count},
@ -990,9 +1233,9 @@ func TestClientStatsStreamingRPCNotCallingLastRecv(t *testing.T) {
}) })
} }
func TestClientStatsStreamingRPCError(t *testing.T) { func TestClientStatsFullDuplexRPCError(t *testing.T) {
count := 5 count := 5
testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, streaming: true}, map[int]*checkFuncWithCount{ testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: fullDuplexStreamRPC}, map[int]*checkFuncWithCount{
begin: {checkBegin, 1}, begin: {checkBegin, 1},
outHeader: {checkOutHeader, 1}, outHeader: {checkOutHeader, 1},
outPayload: {checkOutPayload, 1}, outPayload: {checkOutPayload, 1},
@ -1001,3 +1244,17 @@ func TestClientStatsStreamingRPCError(t *testing.T) {
end: {checkEnd, 1}, end: {checkEnd, 1},
}) })
} }
// If the user doesn't call the last recv() on clientStream.
func TestClientStatsFullDuplexRPCNotCallingLastRecv(t *testing.T) {
count := 1
testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: fullDuplexStreamRPC, noLastRecv: true}, map[int]*checkFuncWithCount{
begin: {checkBegin, 1},
outHeader: {checkOutHeader, 1},
outPayload: {checkOutPayload, count},
inHeader: {checkInHeader, 1},
inPayload: {checkInPayload, count},
inTrailer: {checkInTrailer, 1},
end: {checkEnd, 1},
})
}