Modify byteBufCodec to use *[]byte and remove reqSize from bytebufserver

This commit is contained in:
Menghan Li
2016-04-28 15:49:24 -07:00
parent 988c934720
commit 997b80914b
3 changed files with 22 additions and 12 deletions

View File

@ -108,7 +108,6 @@ func StartServer(addr string, opts ...grpc.ServerOption) (string, func()) {
} }
type byteBufServer struct { type byteBufServer struct {
reqSize int32
respSize int32 respSize int32
} }
@ -118,15 +117,16 @@ func (s *byteBufServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest)
func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error { func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
for { for {
m := make([]byte, s.reqSize) var in []byte
err := stream.(grpc.ServerStream).RecvMsg(m) err := stream.(grpc.ServerStream).RecvMsg(&in)
if err == io.EOF { if err == io.EOF {
return nil return nil
} }
if err != nil { if err != nil {
return err return err
} }
if err := stream.(grpc.ServerStream).SendMsg(make([]byte, s.respSize)); err != nil { out := make([]byte, s.respSize)
if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil {
return err return err
} }
} }
@ -134,13 +134,13 @@ func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCa
// StartbyteBufServer starts a benchmark service server that supports custom codec. // StartbyteBufServer starts a benchmark service server that supports custom codec.
// It returns its listen address and a function to stop the server. // It returns its listen address and a function to stop the server.
func StartByteBufServer(addr string, reqSize, respSize int32, opts ...grpc.ServerOption) (string, func()) { func StartByteBufServer(addr string, respSize int32, opts ...grpc.ServerOption) (string, func()) {
lis, err := net.Listen("tcp", addr) lis, err := net.Listen("tcp", addr)
if err != nil { if err != nil {
grpclog.Fatalf("Failed to listen: %v", err) grpclog.Fatalf("Failed to listen: %v", err)
} }
s := grpc.NewServer(opts...) s := grpc.NewServer(opts...)
testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{reqSize: reqSize, respSize: respSize}) testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize})
go s.Serve(lis) go s.Serve(lis)
return lis.Addr().String(), func() { return lis.Addr().String(), func() {
s.Stop() s.Stop()
@ -180,11 +180,12 @@ func DoStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, re
// DoByteBufStreamingRoundTrip performs a round trip for a single streaming rpc, using custom codec. // DoByteBufStreamingRoundTrip performs a round trip for a single streaming rpc, using custom codec.
func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error { func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
if err := stream.(grpc.ClientStream).SendMsg(make([]byte, reqSize)); err != nil { out := make([]byte, reqSize)
if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil {
return grpc.Errorf(grpc.Code(err), "StreamingCall(_).(ClientStream).SendMsg: %v", grpc.ErrorDesc(err)) return grpc.Errorf(grpc.Code(err), "StreamingCall(_).(ClientStream).SendMsg: %v", grpc.ErrorDesc(err))
} }
m := make([]byte, respSize) var in []byte
if err := stream.(grpc.ClientStream).RecvMsg(m); err != nil { if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil {
return grpc.Errorf(grpc.Code(err), "StreamingCall(_).(ClientStream).RecvMsg: %v", grpc.ErrorDesc(err)) return grpc.Errorf(grpc.Code(err), "StreamingCall(_).(ClientStream).RecvMsg: %v", grpc.ErrorDesc(err))
} }
return nil return nil

View File

@ -114,7 +114,7 @@ func startBenchmarkServerWithSetup(setup *testpb.ServerConfig, serverPort int) (
switch payload := setup.PayloadConfig.Payload.(type) { switch payload := setup.PayloadConfig.Payload.(type) {
case *testpb.PayloadConfig_BytebufParams: case *testpb.PayloadConfig_BytebufParams:
opts = append(opts, grpc.CustomCodec(byteBufCodec{})) opts = append(opts, grpc.CustomCodec(byteBufCodec{}))
addr, close = benchmark.StartByteBufServer(":"+strconv.Itoa(port), payload.BytebufParams.ReqSize, payload.BytebufParams.RespSize, opts...) addr, close = benchmark.StartByteBufServer(":"+strconv.Itoa(port), payload.BytebufParams.RespSize, opts...)
case *testpb.PayloadConfig_SimpleParams: case *testpb.PayloadConfig_SimpleParams:
addr, close = benchmark.StartServer(":"+strconv.Itoa(port), opts...) addr, close = benchmark.StartServer(":"+strconv.Itoa(port), opts...)
case *testpb.PayloadConfig_ComplexParams: case *testpb.PayloadConfig_ComplexParams:

View File

@ -35,6 +35,7 @@ package main
import ( import (
"flag" "flag"
"fmt"
"io" "io"
"net" "net"
"runtime" "runtime"
@ -56,11 +57,19 @@ type byteBufCodec struct {
} }
func (byteBufCodec) Marshal(v interface{}) ([]byte, error) { func (byteBufCodec) Marshal(v interface{}) ([]byte, error) {
return v.([]byte), nil b, ok := v.(*[]byte)
if !ok {
return nil, fmt.Errorf("Failed to marshal: %v is not type of *[]byte")
}
return *b, nil
} }
func (byteBufCodec) Unmarshal(data []byte, v interface{}) error { func (byteBufCodec) Unmarshal(data []byte, v interface{}) error {
v = data b, ok := v.(*[]byte)
if !ok {
return fmt.Errorf("Failed to marshal: %v is not type of *[]byte")
}
*b = data
return nil return nil
} }