diff --git a/stats/grpc_testing/test.pb.go b/stats/grpc_testing/test.pb.go index b24dcd8d..5730004a 100644 --- a/stats/grpc_testing/test.pb.go +++ b/stats/grpc_testing/test.pb.go @@ -34,7 +34,6 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package -// Unary request. type SimpleRequest struct { Id int32 `protobuf:"varint,2,opt,name=id" json:"id,omitempty"` } @@ -44,7 +43,13 @@ func (m *SimpleRequest) String() string { return proto.CompactTextStr func (*SimpleRequest) ProtoMessage() {} func (*SimpleRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } -// Unary response, as configured by the request. +func (m *SimpleRequest) GetId() int32 { + if m != nil { + return m.Id + } + return 0 +} + type SimpleResponse struct { Id int32 `protobuf:"varint,3,opt,name=id" json:"id,omitempty"` } @@ -54,6 +59,13 @@ func (m *SimpleResponse) String() string { return proto.CompactTextSt func (*SimpleResponse) ProtoMessage() {} func (*SimpleResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } +func (m *SimpleResponse) GetId() int32 { + if m != nil { + return m.Id + } + return 0 +} + func init() { proto.RegisterType((*SimpleRequest)(nil), "grpc.testing.SimpleRequest") proto.RegisterType((*SimpleResponse)(nil), "grpc.testing.SimpleResponse") @@ -77,6 +89,10 @@ type TestServiceClient interface { // As one request could lead to multiple responses, this interface // demonstrates the idea of full duplexing. FullDuplexCall(ctx context.Context, opts ...grpc.CallOption) (TestService_FullDuplexCallClient, error) + // Client stream + ClientStreamCall(ctx context.Context, opts ...grpc.CallOption) (TestService_ClientStreamCallClient, error) + // Server stream + ServerStreamCall(ctx context.Context, in *SimpleRequest, opts ...grpc.CallOption) (TestService_ServerStreamCallClient, error) } type testServiceClient struct { @@ -127,6 +143,72 @@ func (x *testServiceFullDuplexCallClient) Recv() (*SimpleResponse, error) { return m, nil } +func (c *testServiceClient) ClientStreamCall(ctx context.Context, opts ...grpc.CallOption) (TestService_ClientStreamCallClient, error) { + stream, err := grpc.NewClientStream(ctx, &_TestService_serviceDesc.Streams[1], c.cc, "/grpc.testing.TestService/ClientStreamCall", opts...) + if err != nil { + return nil, err + } + x := &testServiceClientStreamCallClient{stream} + return x, nil +} + +type TestService_ClientStreamCallClient interface { + Send(*SimpleRequest) error + CloseAndRecv() (*SimpleResponse, error) + grpc.ClientStream +} + +type testServiceClientStreamCallClient struct { + grpc.ClientStream +} + +func (x *testServiceClientStreamCallClient) Send(m *SimpleRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *testServiceClientStreamCallClient) CloseAndRecv() (*SimpleResponse, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(SimpleResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *testServiceClient) ServerStreamCall(ctx context.Context, in *SimpleRequest, opts ...grpc.CallOption) (TestService_ServerStreamCallClient, error) { + stream, err := grpc.NewClientStream(ctx, &_TestService_serviceDesc.Streams[2], c.cc, "/grpc.testing.TestService/ServerStreamCall", opts...) + if err != nil { + return nil, err + } + x := &testServiceServerStreamCallClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type TestService_ServerStreamCallClient interface { + Recv() (*SimpleResponse, error) + grpc.ClientStream +} + +type testServiceServerStreamCallClient struct { + grpc.ClientStream +} + +func (x *testServiceServerStreamCallClient) Recv() (*SimpleResponse, error) { + m := new(SimpleResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // Server API for TestService service type TestServiceServer interface { @@ -137,6 +219,10 @@ type TestServiceServer interface { // As one request could lead to multiple responses, this interface // demonstrates the idea of full duplexing. FullDuplexCall(TestService_FullDuplexCallServer) error + // Client stream + ClientStreamCall(TestService_ClientStreamCallServer) error + // Server stream + ServerStreamCall(*SimpleRequest, TestService_ServerStreamCallServer) error } func RegisterTestServiceServer(s *grpc.Server, srv TestServiceServer) { @@ -187,6 +273,53 @@ func (x *testServiceFullDuplexCallServer) Recv() (*SimpleRequest, error) { return m, nil } +func _TestService_ClientStreamCall_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(TestServiceServer).ClientStreamCall(&testServiceClientStreamCallServer{stream}) +} + +type TestService_ClientStreamCallServer interface { + SendAndClose(*SimpleResponse) error + Recv() (*SimpleRequest, error) + grpc.ServerStream +} + +type testServiceClientStreamCallServer struct { + grpc.ServerStream +} + +func (x *testServiceClientStreamCallServer) SendAndClose(m *SimpleResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *testServiceClientStreamCallServer) Recv() (*SimpleRequest, error) { + m := new(SimpleRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _TestService_ServerStreamCall_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(SimpleRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(TestServiceServer).ServerStreamCall(m, &testServiceServerStreamCallServer{stream}) +} + +type TestService_ServerStreamCallServer interface { + Send(*SimpleResponse) error + grpc.ServerStream +} + +type testServiceServerStreamCallServer struct { + grpc.ServerStream +} + +func (x *testServiceServerStreamCallServer) Send(m *SimpleResponse) error { + return x.ServerStream.SendMsg(m) +} + var _TestService_serviceDesc = grpc.ServiceDesc{ ServiceName: "grpc.testing.TestService", HandlerType: (*TestServiceServer)(nil), @@ -203,6 +336,16 @@ var _TestService_serviceDesc = grpc.ServiceDesc{ ServerStreams: true, ClientStreams: true, }, + { + StreamName: "ClientStreamCall", + Handler: _TestService_ClientStreamCall_Handler, + ClientStreams: true, + }, + { + StreamName: "ServerStreamCall", + Handler: _TestService_ServerStreamCall_Handler, + ServerStreams: true, + }, }, Metadata: "test.proto", } @@ -210,16 +353,18 @@ var _TestService_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("test.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 167 bytes of a gzipped FileDescriptorProto + // 196 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2a, 0x49, 0x2d, 0x2e, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x49, 0x2f, 0x2a, 0x48, 0xd6, 0x03, 0x09, 0x64, 0xe6, 0xa5, 0x2b, 0xc9, 0x73, 0xf1, 0x06, 0x67, 0xe6, 0x16, 0xe4, 0xa4, 0x06, 0xa5, 0x16, 0x96, 0xa6, 0x16, 0x97, 0x08, 0xf1, 0x71, 0x31, 0x65, 0xa6, 0x48, 0x30, 0x29, 0x30, 0x6a, 0xb0, 0x06, 0x31, 0x65, 0xa6, 0x28, 0x29, 0x70, 0xf1, 0xc1, 0x14, 0x14, 0x17, 0xe4, 0xe7, 0x15, 0xa7, 0x42, - 0x55, 0x30, 0xc3, 0x54, 0x18, 0x2d, 0x63, 0xe4, 0xe2, 0x0e, 0x49, 0x2d, 0x2e, 0x09, 0x4e, 0x2d, + 0x55, 0x30, 0xc3, 0x54, 0x18, 0x9d, 0x60, 0xe2, 0xe2, 0x0e, 0x49, 0x2d, 0x2e, 0x09, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0x15, 0x72, 0xe3, 0xe2, 0x0c, 0xcd, 0x4b, 0x2c, 0xaa, 0x74, 0x4e, 0xcc, 0xc9, 0x11, 0x92, 0xd6, 0x43, 0xb6, 0x4e, 0x0f, 0xc5, 0x2e, 0x29, 0x19, 0xec, 0x92, 0x50, 0x7b, 0xfc, 0xb9, 0xf8, 0xdc, 0x4a, 0x73, 0x72, 0x5c, 0x4a, 0x0b, 0x72, 0x52, 0x2b, 0x28, 0x34, 0x4c, - 0x83, 0xd1, 0x80, 0x31, 0x89, 0x0d, 0x1c, 0x00, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x8d, - 0x82, 0x5b, 0xdd, 0x0e, 0x01, 0x00, 0x00, + 0x83, 0xd1, 0x80, 0x51, 0xc8, 0x9f, 0x4b, 0xc0, 0x39, 0x27, 0x33, 0x35, 0xaf, 0x24, 0xb8, 0xa4, + 0x28, 0x35, 0x31, 0x97, 0x62, 0x23, 0x41, 0x06, 0x82, 0x3c, 0x9d, 0x5a, 0x44, 0x15, 0x03, 0x0d, + 0x18, 0x93, 0xd8, 0xc0, 0x51, 0x64, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x61, 0x49, 0x59, 0xe6, + 0xb0, 0x01, 0x00, 0x00, } diff --git a/stats/grpc_testing/test.proto b/stats/grpc_testing/test.proto index 54e6f744..bea8c4c7 100644 --- a/stats/grpc_testing/test.proto +++ b/stats/grpc_testing/test.proto @@ -20,4 +20,10 @@ service TestService { // As one request could lead to multiple responses, this interface // demonstrates the idea of full duplexing. rpc FullDuplexCall(stream SimpleRequest) returns (stream SimpleResponse); + + // Client stream + rpc ClientStreamCall(stream SimpleRequest) returns (SimpleResponse); + + // Server stream + rpc ServerStreamCall(SimpleRequest) returns (stream SimpleResponse); } diff --git a/stats/stats_test.go b/stats/stats_test.go index 35d60a45..467d6a58 100644 --- a/stats/stats_test.go +++ b/stats/stats_test.go @@ -120,6 +120,51 @@ func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServ } } +func (s *testServer) ClientStreamCall(stream testpb.TestService_ClientStreamCallServer) error { + md, ok := metadata.FromContext(stream.Context()) + if ok { + if err := stream.SendHeader(md); err != nil { + return grpc.Errorf(grpc.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil) + } + stream.SetTrailer(testTrailerMetadata) + } + for { + in, err := stream.Recv() + if err == io.EOF { + // read done. + return stream.SendAndClose(&testpb.SimpleResponse{Id: int32(0)}) + } + if err != nil { + return err + } + + if in.Id == errorID { + return fmt.Errorf("got error id: %v", in.Id) + } + } +} + +func (s *testServer) ServerStreamCall(in *testpb.SimpleRequest, stream testpb.TestService_ServerStreamCallServer) error { + md, ok := metadata.FromContext(stream.Context()) + if ok { + if err := stream.SendHeader(md); err != nil { + return grpc.Errorf(grpc.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil) + } + stream.SetTrailer(testTrailerMetadata) + } + + if in.Id == errorID { + return fmt.Errorf("got error id: %v", in.Id) + } + + for i := 0; i < 5; i++ { + if err := stream.Send(&testpb.SimpleResponse{Id: in.Id}); err != nil { + return err + } + } + return nil +} + // test is an end-to-end test. It should be created with the newTest // func, modified as needed, and then started with its startServer method. // It should be cleaned up with the tearDown method. @@ -218,12 +263,21 @@ func (te *test) clientConn() *grpc.ClientConn { return te.cc } +type rpcType int + +const ( + unaryRPC rpcType = iota + clientStreamRPC + serverStreamRPC + fullDuplexStreamRPC +) + type rpcConfig struct { count int // Number of requests and responses for streaming RPCs. success bool // Whether the RPC should succeed or return error. failfast bool - streaming bool // Whether the rpc should be a streaming RPC. - noLastRecv bool // Whether to call recv for io.EOF. When true, last recv won't be called. Only valid for streaming RPCs. + callType rpcType // Type of RPC. + noLastRecv bool // Whether to call recv for io.EOF. When true, last recv won't be called. Only valid for streaming RPCs. } func (te *test) doUnaryCall(c *rpcConfig) (*testpb.SimpleRequest, *testpb.SimpleResponse, error) { @@ -289,6 +343,64 @@ func (te *test) doFullDuplexCallRoundtrip(c *rpcConfig) ([]*testpb.SimpleRequest return reqs, resps, nil } +func (te *test) doClientStreamCall(c *rpcConfig) ([]*testpb.SimpleRequest, *testpb.SimpleResponse, error) { + var ( + reqs []*testpb.SimpleRequest + resp *testpb.SimpleResponse + err error + ) + tc := testpb.NewTestServiceClient(te.clientConn()) + stream, err := tc.ClientStreamCall(metadata.NewContext(context.Background(), testMetadata), grpc.FailFast(c.failfast)) + if err != nil { + return reqs, resp, err + } + var startID int32 + if !c.success { + startID = errorID + } + for i := 0; i < c.count; i++ { + req := &testpb.SimpleRequest{ + Id: int32(i) + startID, + } + reqs = append(reqs, req) + if err = stream.Send(req); err != nil { + return reqs, resp, err + } + } + resp, err = stream.CloseAndRecv() + return reqs, resp, err +} + +func (te *test) doServerStreamCall(c *rpcConfig) (*testpb.SimpleRequest, []*testpb.SimpleResponse, error) { + var ( + req *testpb.SimpleRequest + resps []*testpb.SimpleResponse + err error + ) + + tc := testpb.NewTestServiceClient(te.clientConn()) + + var startID int32 + if !c.success { + startID = errorID + } + req = &testpb.SimpleRequest{Id: startID} + stream, err := tc.ServerStreamCall(metadata.NewContext(context.Background(), testMetadata), req, grpc.FailFast(c.failfast)) + if err != nil { + return req, resps, err + } + for { + var resp *testpb.SimpleResponse + resp, err := stream.Recv() + if err == io.EOF { + return req, resps, nil + } else if err != nil { + return req, resps, err + } + resps = append(resps, resp) + } +} + type expectedData struct { method string serverAddr string @@ -672,16 +784,35 @@ func testServerStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs []f defer te.tearDown() var ( - reqs []*testpb.SimpleRequest - resps []*testpb.SimpleResponse - err error + reqs []*testpb.SimpleRequest + resps []*testpb.SimpleResponse + err error + method string + + req *testpb.SimpleRequest + resp *testpb.SimpleResponse + e error ) - if !cc.streaming { - req, resp, e := te.doUnaryCall(cc) + + switch cc.callType { + case unaryRPC: + method = "/grpc.testing.TestService/UnaryCall" + req, resp, e = te.doUnaryCall(cc) reqs = []*testpb.SimpleRequest{req} resps = []*testpb.SimpleResponse{resp} err = e - } else { + case clientStreamRPC: + method = "/grpc.testing.TestService/ClientStreamCall" + reqs, resp, e = te.doClientStreamCall(cc) + resps = []*testpb.SimpleResponse{resp} + err = e + case serverStreamRPC: + method = "/grpc.testing.TestService/ServerStreamCall" + req, resps, e = te.doServerStreamCall(cc) + reqs = []*testpb.SimpleRequest{req} + err = e + case fullDuplexStreamRPC: + method = "/grpc.testing.TestService/FullDuplexCall" reqs, resps, err = te.doFullDuplexCallRoundtrip(cc) } if cc.success != (err == nil) { @@ -713,22 +844,18 @@ func testServerStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs []f expect := &expectedData{ serverAddr: te.srvAddr, compression: tc.compress, + method: method, requests: reqs, responses: resps, err: err, } - if !cc.streaming { - expect.method = "/grpc.testing.TestService/UnaryCall" - } else { - expect.method = "/grpc.testing.TestService/FullDuplexCall" - } checkConnStats(t, h.gotConn) checkServerStats(t, h.gotRPC, expect, checkFuncs) } func TestServerStatsUnaryRPC(t *testing.T) { - testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: true}, []func(t *testing.T, d *gotData, e *expectedData){ + testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, callType: unaryRPC}, []func(t *testing.T, d *gotData, e *expectedData){ checkInHeader, checkBegin, checkInPayload, @@ -740,7 +867,7 @@ func TestServerStatsUnaryRPC(t *testing.T) { } func TestServerStatsUnaryRPCError(t *testing.T) { - testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: false}, []func(t *testing.T, d *gotData, e *expectedData){ + testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, callType: unaryRPC}, []func(t *testing.T, d *gotData, e *expectedData){ checkInHeader, checkBegin, checkInPayload, @@ -750,7 +877,73 @@ func TestServerStatsUnaryRPCError(t *testing.T) { }) } -func TestServerStatsStreamingRPC(t *testing.T) { +func TestServerStatsClientStreamRPC(t *testing.T) { + count := 5 + checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ + checkInHeader, + checkBegin, + checkOutHeader, + } + ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){ + checkInPayload, + } + for i := 0; i < count; i++ { + checkFuncs = append(checkFuncs, ioPayFuncs...) + } + checkFuncs = append(checkFuncs, + checkOutPayload, + checkOutTrailer, + checkEnd, + ) + testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: clientStreamRPC}, checkFuncs) +} + +func TestServerStatsClientStreamRPCError(t *testing.T) { + count := 1 + testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: clientStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){ + checkInHeader, + checkBegin, + checkOutHeader, + checkInPayload, + checkOutTrailer, + checkEnd, + }) +} + +func TestServerStatsServerStreamRPC(t *testing.T) { + count := 5 + checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ + checkInHeader, + checkBegin, + checkInPayload, + checkOutHeader, + } + ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){ + checkOutPayload, + } + for i := 0; i < count; i++ { + checkFuncs = append(checkFuncs, ioPayFuncs...) + } + checkFuncs = append(checkFuncs, + checkOutTrailer, + checkEnd, + ) + testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: serverStreamRPC}, checkFuncs) +} + +func TestServerStatsServerStreamRPCError(t *testing.T) { + count := 5 + testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: serverStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){ + checkInHeader, + checkBegin, + checkInPayload, + checkOutHeader, + checkOutTrailer, + checkEnd, + }) +} + +func TestServerStatsFullDuplexRPC(t *testing.T) { count := 5 checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ checkInHeader, @@ -768,12 +961,12 @@ func TestServerStatsStreamingRPC(t *testing.T) { checkOutTrailer, checkEnd, ) - testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, streaming: true}, checkFuncs) + testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: fullDuplexStreamRPC}, checkFuncs) } -func TestServerStatsStreamingRPCError(t *testing.T) { +func TestServerStatsFullDuplexRPCError(t *testing.T) { count := 5 - testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, streaming: true}, []func(t *testing.T, d *gotData, e *expectedData){ + testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: fullDuplexStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){ checkInHeader, checkBegin, checkOutHeader, @@ -880,16 +1073,34 @@ func testClientStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs map defer te.tearDown() var ( - reqs []*testpb.SimpleRequest - resps []*testpb.SimpleResponse - err error + reqs []*testpb.SimpleRequest + resps []*testpb.SimpleResponse + method string + err error + + req *testpb.SimpleRequest + resp *testpb.SimpleResponse + e error ) - if !cc.streaming { - req, resp, e := te.doUnaryCall(cc) + switch cc.callType { + case unaryRPC: + method = "/grpc.testing.TestService/UnaryCall" + req, resp, e = te.doUnaryCall(cc) reqs = []*testpb.SimpleRequest{req} resps = []*testpb.SimpleResponse{resp} err = e - } else { + case clientStreamRPC: + method = "/grpc.testing.TestService/ClientStreamCall" + reqs, resp, e = te.doClientStreamCall(cc) + resps = []*testpb.SimpleResponse{resp} + err = e + case serverStreamRPC: + method = "/grpc.testing.TestService/ServerStreamCall" + req, resps, e = te.doServerStreamCall(cc) + reqs = []*testpb.SimpleRequest{req} + err = e + case fullDuplexStreamRPC: + method = "/grpc.testing.TestService/FullDuplexCall" reqs, resps, err = te.doFullDuplexCallRoundtrip(cc) } if cc.success != (err == nil) { @@ -925,23 +1136,19 @@ func testClientStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs map expect := &expectedData{ serverAddr: te.srvAddr, compression: tc.compress, + method: method, requests: reqs, responses: resps, failfast: cc.failfast, err: err, } - if !cc.streaming { - expect.method = "/grpc.testing.TestService/UnaryCall" - } else { - expect.method = "/grpc.testing.TestService/FullDuplexCall" - } checkConnStats(t, h.gotConn) checkClientStats(t, h.gotRPC, expect, checkFuncs) } func TestClientStatsUnaryRPC(t *testing.T) { - testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, failfast: false}, map[int]*checkFuncWithCount{ + testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, failfast: false, callType: unaryRPC}, map[int]*checkFuncWithCount{ begin: {checkBegin, 1}, outHeader: {checkOutHeader, 1}, outPayload: {checkOutPayload, 1}, @@ -953,7 +1160,7 @@ func TestClientStatsUnaryRPC(t *testing.T) { } func TestClientStatsUnaryRPCError(t *testing.T) { - testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, failfast: false}, map[int]*checkFuncWithCount{ + testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, failfast: false, callType: unaryRPC}, map[int]*checkFuncWithCount{ begin: {checkBegin, 1}, outHeader: {checkOutHeader, 1}, outPayload: {checkOutPayload, 1}, @@ -963,23 +1170,59 @@ func TestClientStatsUnaryRPCError(t *testing.T) { }) } -func TestClientStatsStreamingRPC(t *testing.T) { +func TestClientStatsClientStreamRPC(t *testing.T) { count := 5 - testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, streaming: true}, map[int]*checkFuncWithCount{ + testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: clientStreamRPC}, map[int]*checkFuncWithCount{ begin: {checkBegin, 1}, outHeader: {checkOutHeader, 1}, - outPayload: {checkOutPayload, count}, inHeader: {checkInHeader, 1}, - inPayload: {checkInPayload, count}, + outPayload: {checkOutPayload, count}, inTrailer: {checkInTrailer, 1}, + inPayload: {checkInPayload, 1}, end: {checkEnd, 1}, }) } -// If the user doesn't call the last recv() on clientSteam. -func TestClientStatsStreamingRPCNotCallingLastRecv(t *testing.T) { +func TestClientStatsClientStreamRPCError(t *testing.T) { count := 1 - testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, streaming: true, noLastRecv: true}, map[int]*checkFuncWithCount{ + testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: clientStreamRPC}, map[int]*checkFuncWithCount{ + begin: {checkBegin, 1}, + outHeader: {checkOutHeader, 1}, + inHeader: {checkInHeader, 1}, + outPayload: {checkOutPayload, 1}, + inTrailer: {checkInTrailer, 1}, + end: {checkEnd, 1}, + }) +} + +func TestClientStatsServerStreamRPC(t *testing.T) { + count := 5 + testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: serverStreamRPC}, map[int]*checkFuncWithCount{ + begin: {checkBegin, 1}, + outHeader: {checkOutHeader, 1}, + outPayload: {checkOutPayload, 1}, + inHeader: {checkInHeader, 1}, + inPayload: {checkInPayload, count}, + inTrailer: {checkInTrailer, 1}, + end: {checkEnd, 1}, + }) +} + +func TestClientStatsServerStreamRPCError(t *testing.T) { + count := 5 + testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: serverStreamRPC}, map[int]*checkFuncWithCount{ + begin: {checkBegin, 1}, + outHeader: {checkOutHeader, 1}, + outPayload: {checkOutPayload, 1}, + inHeader: {checkInHeader, 1}, + inTrailer: {checkInTrailer, 1}, + end: {checkEnd, 1}, + }) +} + +func TestClientStatsFullDuplexRPC(t *testing.T) { + count := 5 + testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: fullDuplexStreamRPC}, map[int]*checkFuncWithCount{ begin: {checkBegin, 1}, outHeader: {checkOutHeader, 1}, outPayload: {checkOutPayload, count}, @@ -990,9 +1233,9 @@ func TestClientStatsStreamingRPCNotCallingLastRecv(t *testing.T) { }) } -func TestClientStatsStreamingRPCError(t *testing.T) { +func TestClientStatsFullDuplexRPCError(t *testing.T) { count := 5 - testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, streaming: true}, map[int]*checkFuncWithCount{ + testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: fullDuplexStreamRPC}, map[int]*checkFuncWithCount{ begin: {checkBegin, 1}, outHeader: {checkOutHeader, 1}, outPayload: {checkOutPayload, 1}, @@ -1001,3 +1244,17 @@ func TestClientStatsStreamingRPCError(t *testing.T) { end: {checkEnd, 1}, }) } + +// If the user doesn't call the last recv() on clientStream. +func TestClientStatsFullDuplexRPCNotCallingLastRecv(t *testing.T) { + count := 1 + testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: fullDuplexStreamRPC, noLastRecv: true}, map[int]*checkFuncWithCount{ + begin: {checkBegin, 1}, + outHeader: {checkOutHeader, 1}, + outPayload: {checkOutPayload, count}, + inHeader: {checkInHeader, 1}, + inPayload: {checkInPayload, count}, + inTrailer: {checkInTrailer, 1}, + end: {checkEnd, 1}, + }) +}