diff --git a/benchmark/benchmark.go b/benchmark/benchmark.go index 574c4a92..56e2b78f 100644 --- a/benchmark/benchmark.go +++ b/benchmark/benchmark.go @@ -108,7 +108,6 @@ func StartServer(addr string, opts ...grpc.ServerOption) (string, func()) { } type byteBufServer struct { - reqSize int32 respSize int32 } @@ -118,15 +117,16 @@ func (s *byteBufServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error { for { - m := make([]byte, s.reqSize) - err := stream.(grpc.ServerStream).RecvMsg(m) + var in []byte + err := stream.(grpc.ServerStream).RecvMsg(&in) if err == io.EOF { return nil } if err != nil { return err } - if err := stream.(grpc.ServerStream).SendMsg(make([]byte, s.respSize)); err != nil { + out := make([]byte, s.respSize) + if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil { return err } } @@ -134,13 +134,13 @@ func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCa // StartbyteBufServer starts a benchmark service server that supports custom codec. // It returns its listen address and a function to stop the server. -func StartByteBufServer(addr string, reqSize, respSize int32, opts ...grpc.ServerOption) (string, func()) { +func StartByteBufServer(addr string, respSize int32, opts ...grpc.ServerOption) (string, func()) { lis, err := net.Listen("tcp", addr) if err != nil { grpclog.Fatalf("Failed to listen: %v", err) } s := grpc.NewServer(opts...) - testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{reqSize: reqSize, respSize: respSize}) + testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize}) go s.Serve(lis) return lis.Addr().String(), func() { s.Stop() @@ -180,11 +180,12 @@ func DoStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, re // DoByteBufStreamingRoundTrip performs a round trip for a single streaming rpc, using custom codec. func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error { - if err := stream.(grpc.ClientStream).SendMsg(make([]byte, reqSize)); err != nil { + out := make([]byte, reqSize) + if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil { return grpc.Errorf(grpc.Code(err), "StreamingCall(_).(ClientStream).SendMsg: %v", grpc.ErrorDesc(err)) } - m := make([]byte, respSize) - if err := stream.(grpc.ClientStream).RecvMsg(m); err != nil { + var in []byte + if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil { return grpc.Errorf(grpc.Code(err), "StreamingCall(_).(ClientStream).RecvMsg: %v", grpc.ErrorDesc(err)) } return nil diff --git a/benchmark/worker/benchmark_server.go b/benchmark/worker/benchmark_server.go index 1b58a854..5e2fd7b3 100644 --- a/benchmark/worker/benchmark_server.go +++ b/benchmark/worker/benchmark_server.go @@ -114,7 +114,7 @@ func startBenchmarkServerWithSetup(setup *testpb.ServerConfig, serverPort int) ( switch payload := setup.PayloadConfig.Payload.(type) { case *testpb.PayloadConfig_BytebufParams: opts = append(opts, grpc.CustomCodec(byteBufCodec{})) - addr, close = benchmark.StartByteBufServer(":"+strconv.Itoa(port), payload.BytebufParams.ReqSize, payload.BytebufParams.RespSize, opts...) + addr, close = benchmark.StartByteBufServer(":"+strconv.Itoa(port), payload.BytebufParams.RespSize, opts...) case *testpb.PayloadConfig_SimpleParams: addr, close = benchmark.StartServer(":"+strconv.Itoa(port), opts...) case *testpb.PayloadConfig_ComplexParams: diff --git a/benchmark/worker/main.go b/benchmark/worker/main.go index 52990929..72ed63d4 100644 --- a/benchmark/worker/main.go +++ b/benchmark/worker/main.go @@ -35,6 +35,7 @@ package main import ( "flag" + "fmt" "io" "net" "runtime" @@ -56,11 +57,19 @@ type byteBufCodec struct { } func (byteBufCodec) Marshal(v interface{}) ([]byte, error) { - return v.([]byte), nil + b, ok := v.(*[]byte) + if !ok { + return nil, fmt.Errorf("Failed to marshal: %v is not type of *[]byte") + } + return *b, nil } func (byteBufCodec) Unmarshal(data []byte, v interface{}) error { - v = data + b, ok := v.(*[]byte) + if !ok { + return fmt.Errorf("Failed to marshal: %v is not type of *[]byte") + } + *b = data return nil }