Address review comments

Change startBenchmarkClient and startBenchmarkServer
This commit is contained in:
Menghan Li
2016-04-28 16:16:54 -07:00
parent dad9308fa3
commit 3a13913bba
3 changed files with 61 additions and 61 deletions

View File

@ -60,27 +60,27 @@ type benchmarkClient struct {
histogram *stats.Histogram histogram *stats.Histogram
} }
func startBenchmarkClientWithSetup(setup *testpb.ClientConfig) (*benchmarkClient, error) { func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) {
var opts []grpc.DialOption var opts []grpc.DialOption
// Some setup options are ignored: // Some config options are ignored:
// - client type: // - client type:
// will always create sync client // will always create sync client
// - async client threads. // - async client threads.
// - core list // - core list
grpclog.Printf(" * client type: %v (ignored, always creates sync client)", setup.ClientType) grpclog.Printf(" * client type: %v (ignored, always creates sync client)", config.ClientType)
switch setup.ClientType { switch config.ClientType {
case testpb.ClientType_SYNC_CLIENT: case testpb.ClientType_SYNC_CLIENT:
case testpb.ClientType_ASYNC_CLIENT: case testpb.ClientType_ASYNC_CLIENT:
default: default:
return nil, grpc.Errorf(codes.InvalidArgument, "unknow client type: %v", setup.ClientType) return nil, grpc.Errorf(codes.InvalidArgument, "unknow client type: %v", config.ClientType)
} }
grpclog.Printf(" * async client threads: %v (ignored)", setup.AsyncClientThreads) grpclog.Printf(" * async client threads: %v (ignored)", config.AsyncClientThreads)
grpclog.Printf(" * core list: %v (ignored)", setup.CoreList) grpclog.Printf(" * core list: %v (ignored)", config.CoreList)
grpclog.Printf(" - security params: %v", setup.SecurityParams) grpclog.Printf(" - security params: %v", config.SecurityParams)
if setup.SecurityParams != nil { if config.SecurityParams != nil {
creds, err := credentials.NewClientTLSFromFile(Abs(caFile), setup.SecurityParams.ServerHostOverride) creds, err := credentials.NewClientTLSFromFile(Abs(caFile), config.SecurityParams.ServerHostOverride)
if err != nil { if err != nil {
return nil, grpc.Errorf(codes.InvalidArgument, "failed to create TLS credentials %v", err) return nil, grpc.Errorf(codes.InvalidArgument, "failed to create TLS credentials %v", err)
} }
@ -89,19 +89,19 @@ func startBenchmarkClientWithSetup(setup *testpb.ClientConfig) (*benchmarkClient
opts = append(opts, grpc.WithInsecure()) opts = append(opts, grpc.WithInsecure())
} }
grpclog.Printf(" - core limit: %v", setup.CoreLimit) grpclog.Printf(" - core limit: %v", config.CoreLimit)
// Use one cpu core by default // Use one cpu core by default
numOfCores := 1 numOfCores := 1
if setup.CoreLimit > 0 { if config.CoreLimit > 0 {
numOfCores = int(setup.CoreLimit) numOfCores = int(config.CoreLimit)
} }
runtime.GOMAXPROCS(numOfCores) runtime.GOMAXPROCS(numOfCores)
grpclog.Printf(" - payload config: %v", setup.PayloadConfig) grpclog.Printf(" - payload config: %v", config.PayloadConfig)
var payloadReqSize, payloadRespSize int var payloadReqSize, payloadRespSize int
var payloadType string var payloadType string
if setup.PayloadConfig != nil { if config.PayloadConfig != nil {
switch c := setup.PayloadConfig.Payload.(type) { switch c := config.PayloadConfig.Payload.(type) {
case *testpb.PayloadConfig_BytebufParams: case *testpb.PayloadConfig_BytebufParams:
opts = append(opts, grpc.WithCodec(byteBufCodec{})) opts = append(opts, grpc.WithCodec(byteBufCodec{}))
payloadReqSize = int(c.BytebufParams.ReqSize) payloadReqSize = int(c.BytebufParams.ReqSize)
@ -112,60 +112,60 @@ func startBenchmarkClientWithSetup(setup *testpb.ClientConfig) (*benchmarkClient
payloadRespSize = int(c.SimpleParams.RespSize) payloadRespSize = int(c.SimpleParams.RespSize)
payloadType = "protobuf" payloadType = "protobuf"
case *testpb.PayloadConfig_ComplexParams: case *testpb.PayloadConfig_ComplexParams:
return nil, grpc.Errorf(codes.Unimplemented, "unsupported payload config: %v", setup.PayloadConfig) return nil, grpc.Errorf(codes.Unimplemented, "unsupported payload config: %v", config.PayloadConfig)
default: default:
return nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", setup.PayloadConfig) return nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", config.PayloadConfig)
} }
} }
grpclog.Printf(" - rpcs per chann: %v", setup.OutstandingRpcsPerChannel) grpclog.Printf(" - rpcs per chann: %v", config.OutstandingRpcsPerChannel)
grpclog.Printf(" - channel number: %v", setup.ClientChannels) grpclog.Printf(" - channel number: %v", config.ClientChannels)
rpcCountPerConn, connCount := int(setup.OutstandingRpcsPerChannel), int(setup.ClientChannels) rpcCountPerConn, connCount := int(config.OutstandingRpcsPerChannel), int(config.ClientChannels)
grpclog.Printf(" - load params: %v", setup.LoadParams) grpclog.Printf(" - load params: %v", config.LoadParams)
var dist *int var dist *int
switch lp := setup.LoadParams.Load.(type) { switch lp := config.LoadParams.Load.(type) {
case *testpb.LoadParams_ClosedLoop: case *testpb.LoadParams_ClosedLoop:
case *testpb.LoadParams_Poisson: case *testpb.LoadParams_Poisson:
grpclog.Printf(" - %v", lp.Poisson) grpclog.Printf(" - %v", lp.Poisson)
return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", setup.LoadParams) return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams)
// TODO poisson // TODO poisson
case *testpb.LoadParams_Uniform: case *testpb.LoadParams_Uniform:
return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", setup.LoadParams) return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams)
case *testpb.LoadParams_Determ: case *testpb.LoadParams_Determ:
return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", setup.LoadParams) return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams)
case *testpb.LoadParams_Pareto: case *testpb.LoadParams_Pareto:
return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", setup.LoadParams) return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams)
default: default:
return nil, grpc.Errorf(codes.InvalidArgument, "unknown load params: %v", setup.LoadParams) return nil, grpc.Errorf(codes.InvalidArgument, "unknown load params: %v", config.LoadParams)
} }
grpclog.Printf(" - rpc type: %v", setup.RpcType) grpclog.Printf(" - rpc type: %v", config.RpcType)
var rpcType string var rpcType string
switch setup.RpcType { switch config.RpcType {
case testpb.RpcType_UNARY: case testpb.RpcType_UNARY:
rpcType = "unary" rpcType = "unary"
case testpb.RpcType_STREAMING: case testpb.RpcType_STREAMING:
rpcType = "streaming" rpcType = "streaming"
default: default:
return nil, grpc.Errorf(codes.InvalidArgument, "unknown rpc type: %v", setup.RpcType) return nil, grpc.Errorf(codes.InvalidArgument, "unknown rpc type: %v", config.RpcType)
} }
grpclog.Printf(" - histogram params: %v", setup.HistogramParams) grpclog.Printf(" - histogram params: %v", config.HistogramParams)
grpclog.Printf(" - server targets: %v", setup.ServerTargets) grpclog.Printf(" - server targets: %v", config.ServerTargets)
conns := make([]*grpc.ClientConn, connCount) conns := make([]*grpc.ClientConn, connCount)
for connIndex := 0; connIndex < connCount; connIndex++ { for connIndex := 0; connIndex < connCount; connIndex++ {
conns[connIndex] = benchmark.NewClientConn(setup.ServerTargets[connIndex%len(setup.ServerTargets)], opts...) conns[connIndex] = benchmark.NewClientConn(config.ServerTargets[connIndex%len(config.ServerTargets)], opts...)
} }
bc := benchmarkClient{ bc := benchmarkClient{
histogram: stats.NewHistogram(stats.HistogramOptions{ histogram: stats.NewHistogram(stats.HistogramOptions{
NumBuckets: int(math.Log(setup.HistogramParams.MaxPossible)/math.Log(1+setup.HistogramParams.Resolution)) + 1, NumBuckets: int(math.Log(config.HistogramParams.MaxPossible)/math.Log(1+config.HistogramParams.Resolution)) + 1,
GrowthFactor: setup.HistogramParams.Resolution, GrowthFactor: config.HistogramParams.Resolution,
BaseBucketSize: (1 + setup.HistogramParams.Resolution), BaseBucketSize: (1 + config.HistogramParams.Resolution),
MinValue: 0, MinValue: 0,
}), }),
stop: make(chan bool), stop: make(chan bool),

View File

@ -62,27 +62,27 @@ type benchmarkServer struct {
lastResetTime time.Time lastResetTime time.Time
} }
func startBenchmarkServerWithSetup(setup *testpb.ServerConfig, serverPort int) (*benchmarkServer, error) { func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchmarkServer, error) {
var opts []grpc.ServerOption var opts []grpc.ServerOption
// Some setup options are ignored: // Some config options are ignored:
// - server type: // - server type:
// will always start sync server // will always start sync server
// - async server threads // - async server threads
// - core list // - core list
grpclog.Printf(" * server type: %v (ignored, always starts sync server)", setup.ServerType) grpclog.Printf(" * server type: %v (ignored, always starts sync server)", config.ServerType)
switch setup.ServerType { switch config.ServerType {
case testpb.ServerType_SYNC_SERVER: case testpb.ServerType_SYNC_SERVER:
case testpb.ServerType_ASYNC_SERVER: case testpb.ServerType_ASYNC_SERVER:
case testpb.ServerType_ASYNC_GENERIC_SERVER: case testpb.ServerType_ASYNC_GENERIC_SERVER:
default: default:
return nil, grpc.Errorf(codes.InvalidArgument, "unknow server type: %v", setup.ServerType) return nil, grpc.Errorf(codes.InvalidArgument, "unknow server type: %v", config.ServerType)
} }
grpclog.Printf(" * async server threads: %v (ignored)", setup.AsyncServerThreads) grpclog.Printf(" * async server threads: %v (ignored)", config.AsyncServerThreads)
grpclog.Printf(" * core list: %v (ignored)", setup.CoreList) grpclog.Printf(" * core list: %v (ignored)", config.CoreList)
grpclog.Printf(" - security params: %v", setup.SecurityParams) grpclog.Printf(" - security params: %v", config.SecurityParams)
if setup.SecurityParams != nil { if config.SecurityParams != nil {
creds, err := credentials.NewServerTLSFromFile(Abs(certFile), Abs(keyFile)) creds, err := credentials.NewServerTLSFromFile(Abs(certFile), Abs(keyFile))
if err != nil { if err != nil {
grpclog.Fatalf("failed to generate credentials %v", err) grpclog.Fatalf("failed to generate credentials %v", err)
@ -90,37 +90,37 @@ func startBenchmarkServerWithSetup(setup *testpb.ServerConfig, serverPort int) (
opts = append(opts, grpc.Creds(creds)) opts = append(opts, grpc.Creds(creds))
} }
grpclog.Printf(" - core limit: %v", setup.CoreLimit) grpclog.Printf(" - core limit: %v", config.CoreLimit)
// Use one cpu core by default. // Use one cpu core by default.
numOfCores := 1 numOfCores := 1
if setup.CoreLimit > 0 { if config.CoreLimit > 0 {
numOfCores = int(setup.CoreLimit) numOfCores = int(config.CoreLimit)
} }
runtime.GOMAXPROCS(numOfCores) runtime.GOMAXPROCS(numOfCores)
grpclog.Printf(" - port: %v", setup.Port) grpclog.Printf(" - port: %v", config.Port)
var port int var port int
// Priority: setup.Port > serverPort > default (0). // Priority: config.Port > serverPort > default (0).
if setup.Port != 0 { if config.Port != 0 {
port = int(setup.Port) port = int(config.Port)
} else if serverPort != 0 { } else if serverPort != 0 {
port = serverPort port = serverPort
} }
grpclog.Printf(" - payload config: %v", setup.PayloadConfig) grpclog.Printf(" - payload config: %v", config.PayloadConfig)
var addr string var addr string
var close func() var close func()
if setup.PayloadConfig != nil { if config.PayloadConfig != nil {
switch payload := setup.PayloadConfig.Payload.(type) { switch payload := config.PayloadConfig.Payload.(type) {
case *testpb.PayloadConfig_BytebufParams: case *testpb.PayloadConfig_BytebufParams:
opts = append(opts, grpc.CustomCodec(byteBufCodec{})) opts = append(opts, grpc.CustomCodec(byteBufCodec{}))
addr, close = benchmark.StartByteBufServer(":"+strconv.Itoa(port), payload.BytebufParams.RespSize, opts...) addr, close = benchmark.StartByteBufServer(":"+strconv.Itoa(port), payload.BytebufParams.RespSize, opts...)
case *testpb.PayloadConfig_SimpleParams: case *testpb.PayloadConfig_SimpleParams:
addr, close = benchmark.StartServer(":"+strconv.Itoa(port), opts...) addr, close = benchmark.StartServer(":"+strconv.Itoa(port), opts...)
case *testpb.PayloadConfig_ComplexParams: case *testpb.PayloadConfig_ComplexParams:
return nil, grpc.Errorf(codes.Unimplemented, "unsupported payload config: %v", setup.PayloadConfig) return nil, grpc.Errorf(codes.Unimplemented, "unsupported payload config: %v", config.PayloadConfig)
default: default:
return nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", setup.PayloadConfig) return nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", config.PayloadConfig)
} }
} else { } else {
// Start protobuf server is payload config is nil. // Start protobuf server is payload config is nil.

View File

@ -50,7 +50,7 @@ import (
var ( var (
driverPort = flag.Int("driver_port", 10000, "port for communication with driver") driverPort = flag.Int("driver_port", 10000, "port for communication with driver")
serverPort = flag.Int("server_port", 0, "default port for benchmark server") serverPort = flag.Int("server_port", 0, "port for benchmark server if not specified by server config message")
) )
type byteBufCodec struct { type byteBufCodec struct {
@ -104,7 +104,7 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er
switch argtype := in.Argtype.(type) { switch argtype := in.Argtype.(type) {
case *testpb.ServerArgs_Setup: case *testpb.ServerArgs_Setup:
grpclog.Printf("server setup received:") grpclog.Printf("server setup received:")
newbs, err := startBenchmarkServerWithSetup(argtype.Setup, s.serverPort) newbs, err := startBenchmarkServer(argtype.Setup, s.serverPort)
if err != nil { if err != nil {
return err return err
} }
@ -165,7 +165,7 @@ func (s *workerServer) RunClient(stream testpb.WorkerService_RunClientServer) er
switch t := in.Argtype.(type) { switch t := in.Argtype.(type) {
case *testpb.ClientArgs_Setup: case *testpb.ClientArgs_Setup:
grpclog.Printf("client setup received:") grpclog.Printf("client setup received:")
newbc, err := startBenchmarkClientWithSetup(t.Setup) newbc, err := startBenchmarkClient(t.Setup)
if err != nil { if err != nil {
return err return err
} }