From 3a13913bbac6ba6aad589b16c615124b55eeebec Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 28 Apr 2016 16:16:54 -0700 Subject: [PATCH] Address review comments Change startBenchmarkClient and startBenchmarkServer --- benchmark/worker/benchmark_client.go | 74 ++++++++++++++-------------- benchmark/worker/benchmark_server.go | 42 ++++++++-------- benchmark/worker/main.go | 6 +-- 3 files changed, 61 insertions(+), 61 deletions(-) diff --git a/benchmark/worker/benchmark_client.go b/benchmark/worker/benchmark_client.go index a5b1dce5..facb9e7a 100644 --- a/benchmark/worker/benchmark_client.go +++ b/benchmark/worker/benchmark_client.go @@ -60,27 +60,27 @@ type benchmarkClient struct { histogram *stats.Histogram } -func startBenchmarkClientWithSetup(setup *testpb.ClientConfig) (*benchmarkClient, error) { +func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) { var opts []grpc.DialOption - // Some setup options are ignored: + // Some config options are ignored: // - client type: // will always create sync client // - async client threads. // - core list - grpclog.Printf(" * client type: %v (ignored, always creates sync client)", setup.ClientType) - switch setup.ClientType { + grpclog.Printf(" * client type: %v (ignored, always creates sync client)", config.ClientType) + switch config.ClientType { case testpb.ClientType_SYNC_CLIENT: case testpb.ClientType_ASYNC_CLIENT: default: - return nil, grpc.Errorf(codes.InvalidArgument, "unknow client type: %v", setup.ClientType) + return nil, grpc.Errorf(codes.InvalidArgument, "unknow client type: %v", config.ClientType) } - grpclog.Printf(" * async client threads: %v (ignored)", setup.AsyncClientThreads) - grpclog.Printf(" * core list: %v (ignored)", setup.CoreList) + grpclog.Printf(" * async client threads: %v (ignored)", config.AsyncClientThreads) + grpclog.Printf(" * core list: %v (ignored)", config.CoreList) - grpclog.Printf(" - security params: %v", setup.SecurityParams) - if setup.SecurityParams != nil { - creds, err := credentials.NewClientTLSFromFile(Abs(caFile), setup.SecurityParams.ServerHostOverride) + grpclog.Printf(" - security params: %v", config.SecurityParams) + if config.SecurityParams != nil { + creds, err := credentials.NewClientTLSFromFile(Abs(caFile), config.SecurityParams.ServerHostOverride) if err != nil { return nil, grpc.Errorf(codes.InvalidArgument, "failed to create TLS credentials %v", err) } @@ -89,19 +89,19 @@ func startBenchmarkClientWithSetup(setup *testpb.ClientConfig) (*benchmarkClient opts = append(opts, grpc.WithInsecure()) } - grpclog.Printf(" - core limit: %v", setup.CoreLimit) + grpclog.Printf(" - core limit: %v", config.CoreLimit) // Use one cpu core by default numOfCores := 1 - if setup.CoreLimit > 0 { - numOfCores = int(setup.CoreLimit) + if config.CoreLimit > 0 { + numOfCores = int(config.CoreLimit) } runtime.GOMAXPROCS(numOfCores) - grpclog.Printf(" - payload config: %v", setup.PayloadConfig) + grpclog.Printf(" - payload config: %v", config.PayloadConfig) var payloadReqSize, payloadRespSize int var payloadType string - if setup.PayloadConfig != nil { - switch c := setup.PayloadConfig.Payload.(type) { + if config.PayloadConfig != nil { + switch c := config.PayloadConfig.Payload.(type) { case *testpb.PayloadConfig_BytebufParams: opts = append(opts, grpc.WithCodec(byteBufCodec{})) payloadReqSize = int(c.BytebufParams.ReqSize) @@ -112,60 +112,60 @@ func startBenchmarkClientWithSetup(setup *testpb.ClientConfig) (*benchmarkClient payloadRespSize = int(c.SimpleParams.RespSize) payloadType = "protobuf" case *testpb.PayloadConfig_ComplexParams: - return nil, grpc.Errorf(codes.Unimplemented, "unsupported payload config: %v", setup.PayloadConfig) + return nil, grpc.Errorf(codes.Unimplemented, "unsupported payload config: %v", config.PayloadConfig) default: - return nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", setup.PayloadConfig) + return nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", config.PayloadConfig) } } - grpclog.Printf(" - rpcs per chann: %v", setup.OutstandingRpcsPerChannel) - grpclog.Printf(" - channel number: %v", setup.ClientChannels) + grpclog.Printf(" - rpcs per chann: %v", config.OutstandingRpcsPerChannel) + grpclog.Printf(" - channel number: %v", config.ClientChannels) - rpcCountPerConn, connCount := int(setup.OutstandingRpcsPerChannel), int(setup.ClientChannels) + rpcCountPerConn, connCount := int(config.OutstandingRpcsPerChannel), int(config.ClientChannels) - grpclog.Printf(" - load params: %v", setup.LoadParams) + grpclog.Printf(" - load params: %v", config.LoadParams) var dist *int - switch lp := setup.LoadParams.Load.(type) { + switch lp := config.LoadParams.Load.(type) { case *testpb.LoadParams_ClosedLoop: case *testpb.LoadParams_Poisson: grpclog.Printf(" - %v", lp.Poisson) - return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", setup.LoadParams) + return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams) // TODO poisson case *testpb.LoadParams_Uniform: - return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", setup.LoadParams) + return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams) case *testpb.LoadParams_Determ: - return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", setup.LoadParams) + return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams) case *testpb.LoadParams_Pareto: - return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", setup.LoadParams) + return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams) default: - return nil, grpc.Errorf(codes.InvalidArgument, "unknown load params: %v", setup.LoadParams) + return nil, grpc.Errorf(codes.InvalidArgument, "unknown load params: %v", config.LoadParams) } - grpclog.Printf(" - rpc type: %v", setup.RpcType) + grpclog.Printf(" - rpc type: %v", config.RpcType) var rpcType string - switch setup.RpcType { + switch config.RpcType { case testpb.RpcType_UNARY: rpcType = "unary" case testpb.RpcType_STREAMING: rpcType = "streaming" default: - return nil, grpc.Errorf(codes.InvalidArgument, "unknown rpc type: %v", setup.RpcType) + return nil, grpc.Errorf(codes.InvalidArgument, "unknown rpc type: %v", config.RpcType) } - grpclog.Printf(" - histogram params: %v", setup.HistogramParams) - grpclog.Printf(" - server targets: %v", setup.ServerTargets) + grpclog.Printf(" - histogram params: %v", config.HistogramParams) + grpclog.Printf(" - server targets: %v", config.ServerTargets) conns := make([]*grpc.ClientConn, connCount) for connIndex := 0; connIndex < connCount; connIndex++ { - conns[connIndex] = benchmark.NewClientConn(setup.ServerTargets[connIndex%len(setup.ServerTargets)], opts...) + conns[connIndex] = benchmark.NewClientConn(config.ServerTargets[connIndex%len(config.ServerTargets)], opts...) } bc := benchmarkClient{ histogram: stats.NewHistogram(stats.HistogramOptions{ - NumBuckets: int(math.Log(setup.HistogramParams.MaxPossible)/math.Log(1+setup.HistogramParams.Resolution)) + 1, - GrowthFactor: setup.HistogramParams.Resolution, - BaseBucketSize: (1 + setup.HistogramParams.Resolution), + NumBuckets: int(math.Log(config.HistogramParams.MaxPossible)/math.Log(1+config.HistogramParams.Resolution)) + 1, + GrowthFactor: config.HistogramParams.Resolution, + BaseBucketSize: (1 + config.HistogramParams.Resolution), MinValue: 0, }), stop: make(chan bool), diff --git a/benchmark/worker/benchmark_server.go b/benchmark/worker/benchmark_server.go index 5e2fd7b3..de937c0c 100644 --- a/benchmark/worker/benchmark_server.go +++ b/benchmark/worker/benchmark_server.go @@ -62,27 +62,27 @@ type benchmarkServer struct { lastResetTime time.Time } -func startBenchmarkServerWithSetup(setup *testpb.ServerConfig, serverPort int) (*benchmarkServer, error) { +func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchmarkServer, error) { var opts []grpc.ServerOption - // Some setup options are ignored: + // Some config options are ignored: // - server type: // will always start sync server // - async server threads // - core list - grpclog.Printf(" * server type: %v (ignored, always starts sync server)", setup.ServerType) - switch setup.ServerType { + grpclog.Printf(" * server type: %v (ignored, always starts sync server)", config.ServerType) + switch config.ServerType { case testpb.ServerType_SYNC_SERVER: case testpb.ServerType_ASYNC_SERVER: case testpb.ServerType_ASYNC_GENERIC_SERVER: default: - return nil, grpc.Errorf(codes.InvalidArgument, "unknow server type: %v", setup.ServerType) + return nil, grpc.Errorf(codes.InvalidArgument, "unknow server type: %v", config.ServerType) } - grpclog.Printf(" * async server threads: %v (ignored)", setup.AsyncServerThreads) - grpclog.Printf(" * core list: %v (ignored)", setup.CoreList) + grpclog.Printf(" * async server threads: %v (ignored)", config.AsyncServerThreads) + grpclog.Printf(" * core list: %v (ignored)", config.CoreList) - grpclog.Printf(" - security params: %v", setup.SecurityParams) - if setup.SecurityParams != nil { + grpclog.Printf(" - security params: %v", config.SecurityParams) + if config.SecurityParams != nil { creds, err := credentials.NewServerTLSFromFile(Abs(certFile), Abs(keyFile)) if err != nil { grpclog.Fatalf("failed to generate credentials %v", err) @@ -90,37 +90,37 @@ func startBenchmarkServerWithSetup(setup *testpb.ServerConfig, serverPort int) ( opts = append(opts, grpc.Creds(creds)) } - grpclog.Printf(" - core limit: %v", setup.CoreLimit) + grpclog.Printf(" - core limit: %v", config.CoreLimit) // Use one cpu core by default. numOfCores := 1 - if setup.CoreLimit > 0 { - numOfCores = int(setup.CoreLimit) + if config.CoreLimit > 0 { + numOfCores = int(config.CoreLimit) } runtime.GOMAXPROCS(numOfCores) - grpclog.Printf(" - port: %v", setup.Port) + grpclog.Printf(" - port: %v", config.Port) var port int - // Priority: setup.Port > serverPort > default (0). - if setup.Port != 0 { - port = int(setup.Port) + // Priority: config.Port > serverPort > default (0). + if config.Port != 0 { + port = int(config.Port) } else if serverPort != 0 { port = serverPort } - grpclog.Printf(" - payload config: %v", setup.PayloadConfig) + grpclog.Printf(" - payload config: %v", config.PayloadConfig) var addr string var close func() - if setup.PayloadConfig != nil { - switch payload := setup.PayloadConfig.Payload.(type) { + if config.PayloadConfig != nil { + switch payload := config.PayloadConfig.Payload.(type) { case *testpb.PayloadConfig_BytebufParams: opts = append(opts, grpc.CustomCodec(byteBufCodec{})) addr, close = benchmark.StartByteBufServer(":"+strconv.Itoa(port), payload.BytebufParams.RespSize, opts...) case *testpb.PayloadConfig_SimpleParams: addr, close = benchmark.StartServer(":"+strconv.Itoa(port), opts...) case *testpb.PayloadConfig_ComplexParams: - return nil, grpc.Errorf(codes.Unimplemented, "unsupported payload config: %v", setup.PayloadConfig) + return nil, grpc.Errorf(codes.Unimplemented, "unsupported payload config: %v", config.PayloadConfig) default: - return nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", setup.PayloadConfig) + return nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", config.PayloadConfig) } } else { // Start protobuf server is payload config is nil. diff --git a/benchmark/worker/main.go b/benchmark/worker/main.go index 72ed63d4..e16a6304 100644 --- a/benchmark/worker/main.go +++ b/benchmark/worker/main.go @@ -50,7 +50,7 @@ import ( var ( driverPort = flag.Int("driver_port", 10000, "port for communication with driver") - serverPort = flag.Int("server_port", 0, "default port for benchmark server") + serverPort = flag.Int("server_port", 0, "port for benchmark server if not specified by server config message") ) type byteBufCodec struct { @@ -104,7 +104,7 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er switch argtype := in.Argtype.(type) { case *testpb.ServerArgs_Setup: grpclog.Printf("server setup received:") - newbs, err := startBenchmarkServerWithSetup(argtype.Setup, s.serverPort) + newbs, err := startBenchmarkServer(argtype.Setup, s.serverPort) if err != nil { return err } @@ -165,7 +165,7 @@ func (s *workerServer) RunClient(stream testpb.WorkerService_RunClientServer) er switch t := in.Argtype.(type) { case *testpb.ClientArgs_Setup: grpclog.Printf("client setup received:") - newbc, err := startBenchmarkClientWithSetup(t.Setup) + newbc, err := startBenchmarkClient(t.Setup) if err != nil { return err }