Add bytebuf codec and generic benchmark server

This commit is contained in:
Menghan Li
2016-04-20 11:54:51 -07:00
parent 9fd1d5bee8
commit 643486f084
3 changed files with 82 additions and 19 deletions

View File

@ -93,7 +93,7 @@ func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallS
// StartServer starts a gRPC server serving a benchmark service on the given
// address, which may be something like "localhost:0". It returns its listen
// address and a function to stop the server.
// port number and a function to stop the server.
func StartServer(addr string, opts ...grpc.ServerOption) (int, func()) {
lis, err := net.Listen("tcp", addr)
if err != nil {
@ -107,6 +107,47 @@ func StartServer(addr string, opts ...grpc.ServerOption) (int, func()) {
}
}
type genericTestServer struct {
reqSize int32
respSize int32
}
func (s *genericTestServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
}
func (s *genericTestServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
for {
m := make([]byte, s.reqSize)
err := stream.(grpc.ServerStream).RecvMsg(m)
if err == io.EOF {
// read done.
return nil
}
if err != nil {
return err
}
if err := stream.(grpc.ServerStream).SendMsg(make([]byte, s.respSize)); err != nil {
return err
}
}
}
// StartGenericServer starts a gRPC a benchmark service server, which supports custom codec.
// It returns its listen port number and a function to stop the server.
func StartGenericServer(addr string, reqSize, respSize int32, opts ...grpc.ServerOption) (int, func()) {
lis, err := net.Listen("tcp", addr)
if err != nil {
grpclog.Fatalf("Failed to listen: %v", err)
}
s := grpc.NewServer(opts...)
testpb.RegisterBenchmarkServiceServer(s, &genericTestServer{reqSize: reqSize, respSize: respSize})
go s.Serve(lis)
return lis.Addr().(*net.TCPAddr).Port, func() {
s.Stop()
}
}
// DoUnaryCall performs an unary RPC with given stub and request and response sizes.
func DoUnaryCall(tc testpb.BenchmarkServiceClient, reqSize, respSize int) {
pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize)

View File

@ -52,18 +52,11 @@ func startBenchmarkServerWithSetup(setup *testpb.ServerConfig) (*benchmarkServer
// Ignore async server threads.
grpclog.Printf(" - core limit: %v", setup.CoreLimit)
grpclog.Printf(" - payload config: %v", setup.PayloadConfig)
if setup.PayloadConfig != nil {
// TODO payload config
grpclog.Printf("payload config: %v", setup.PayloadConfig)
switch setup.PayloadConfig.Payload.(type) {
case *testpb.PayloadConfig_BytebufParams:
case *testpb.PayloadConfig_SimpleParams:
case *testpb.PayloadConfig_ComplexParams:
default:
return nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", setup.PayloadConfig)
}
if setup.CoreLimit > 0 {
runtime.GOMAXPROCS(int(setup.CoreLimit))
} else {
// runtime.GOMAXPROCS(runtime.NumCPU())
runtime.GOMAXPROCS(1)
}
grpclog.Printf(" - core list: %v", setup.CoreList)
@ -72,15 +65,28 @@ func startBenchmarkServerWithSetup(setup *testpb.ServerConfig) (*benchmarkServer
grpclog.Printf("specifying cores to run server on: %v", setup.CoreList)
}
if setup.CoreLimit > 0 {
runtime.GOMAXPROCS(int(setup.CoreLimit))
grpclog.Printf(" - port: %v", setup.Port)
grpclog.Printf(" - payload config: %v", setup.PayloadConfig)
var p int
var close func()
if setup.PayloadConfig != nil {
// TODO payload config
grpclog.Printf("payload config: %v", setup.PayloadConfig)
switch payload := setup.PayloadConfig.Payload.(type) {
case *testpb.PayloadConfig_BytebufParams:
opts = append(opts, grpc.CustomCodec(byteBufCodec{}))
p, close = benchmark.StartGenericServer(":"+strconv.Itoa(int(setup.Port)), payload.BytebufParams.ReqSize, payload.BytebufParams.RespSize, opts...)
case *testpb.PayloadConfig_SimpleParams:
p, close = benchmark.StartServer(":"+strconv.Itoa(int(setup.Port)), opts...)
case *testpb.PayloadConfig_ComplexParams:
default:
return nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", setup.PayloadConfig)
}
} else {
// runtime.GOMAXPROCS(runtime.NumCPU())
runtime.GOMAXPROCS(1)
// Start protobuf server is payload config is nil
p, close = benchmark.StartServer(":"+strconv.Itoa(int(setup.Port)), opts...)
}
grpclog.Printf(" - port: %v", setup.Port)
p, close := benchmark.StartServer(":"+strconv.Itoa(int(setup.Port)), opts...)
grpclog.Printf("benchmark server listening at port %v", p)
// temp := strings.Split(addr, ":")

View File

@ -18,6 +18,22 @@ var (
// ports = []string{":10010"}
)
type byteBufCodec struct {
}
func (byteBufCodec) Marshal(v interface{}) ([]byte, error) {
return v.([]byte), nil
}
func (byteBufCodec) Unmarshal(data []byte, v interface{}) error {
v = data
return nil
}
func (byteBufCodec) String() string {
return "byteBufCodec"
}
type workerServer struct {
bs *benchmarkServer
}