From 47152e8076bea20c214cc9bcf98897282c44ff8c Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 3 May 2016 16:55:15 -0700 Subject: [PATCH] Reconstruct startBenchmarkClient --- benchmark/worker/benchmark_client.go | 133 +++++++++++++++++---------- 1 file changed, 82 insertions(+), 51 deletions(-) diff --git a/benchmark/worker/benchmark_client.go b/benchmark/worker/benchmark_client.go index e2f78bb8..a4350661 100644 --- a/benchmark/worker/benchmark_client.go +++ b/benchmark/worker/benchmark_client.go @@ -60,43 +60,84 @@ type benchmarkClient struct { histogram *stats.Histogram } -func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) { - var opts []grpc.DialOption - +func printClientConfig(config *testpb.ClientConfig) { // Some config options are ignored: // - client type: // will always create sync client // - async client threads. // - core list grpclog.Printf(" * client type: %v (ignored, always creates sync client)", config.ClientType) - switch config.ClientType { - case testpb.ClientType_SYNC_CLIENT: - case testpb.ClientType_ASYNC_CLIENT: - default: - return nil, grpc.Errorf(codes.InvalidArgument, "unknow client type: %v", config.ClientType) - } grpclog.Printf(" * async client threads: %v (ignored)", config.AsyncClientThreads) grpclog.Printf(" * core list: %v (ignored)", config.CoreList) grpclog.Printf(" - security params: %v", config.SecurityParams) + grpclog.Printf(" - core limit: %v", config.CoreLimit) + grpclog.Printf(" - payload config: %v", config.PayloadConfig) + grpclog.Printf(" - rpcs per chann: %v", config.OutstandingRpcsPerChannel) + grpclog.Printf(" - channel number: %v", config.ClientChannels) + grpclog.Printf(" - load params: %v", config.LoadParams) + grpclog.Printf(" - rpc type: %v", config.RpcType) + grpclog.Printf(" - histogram params: %v", config.HistogramParams) + grpclog.Printf(" - server targets: %v", config.ServerTargets) +} + +func setupClientEnv(config *testpb.ClientConfig) { + // Use one cpu core by default. + // TODO: change default number of cores used if 1 is not fastest. + if config.CoreLimit > 1 { + runtime.GOMAXPROCS(int(config.CoreLimit)) + } +} + +func createConns(config *testpb.ClientConfig) ([]*grpc.ClientConn, func(), error) { + var opts []grpc.DialOption + + // Sanity check for client type. + switch config.ClientType { + case testpb.ClientType_SYNC_CLIENT: + case testpb.ClientType_ASYNC_CLIENT: + default: + return nil, nil, grpc.Errorf(codes.InvalidArgument, "unknow client type: %v", config.ClientType) + } + + // Check and set security options. if config.SecurityParams != nil { creds, err := credentials.NewClientTLSFromFile(abs(caFile), config.SecurityParams.ServerHostOverride) if err != nil { - return nil, grpc.Errorf(codes.InvalidArgument, "failed to create TLS credentials %v", err) + return nil, nil, grpc.Errorf(codes.InvalidArgument, "failed to create TLS credentials %v", err) } opts = append(opts, grpc.WithTransportCredentials(creds)) } else { opts = append(opts, grpc.WithInsecure()) } - grpclog.Printf(" - core limit: %v", config.CoreLimit) - // Use one cpu core by default. - // TODO: change default number of cores used if 1 is not fastest. - if config.CoreLimit > 1 { - runtime.GOMAXPROCS(int(config.CoreLimit)) + // Use byteBufCodec is required. + if config.PayloadConfig != nil { + switch config.PayloadConfig.Payload.(type) { + case *testpb.PayloadConfig_BytebufParams: + opts = append(opts, grpc.WithCodec(byteBufCodec{})) + case *testpb.PayloadConfig_SimpleParams: + default: + return nil, nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", config.PayloadConfig) + } } - grpclog.Printf(" - payload config: %v", config.PayloadConfig) + // Create connestions. + connCount := int(config.ClientChannels) + conns := make([]*grpc.ClientConn, connCount) + for connIndex := 0; connIndex < connCount; connIndex++ { + conns[connIndex] = benchmark.NewClientConn(config.ServerTargets[connIndex%len(config.ServerTargets)], opts...) + } + + return conns, func() { + for _, conn := range conns { + conn.Close() + } + }, nil +} + +func performRPCs(config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benchmarkClient) error { + // Read payload size and type from config. var ( payloadReqSize, payloadRespSize int payloadType string @@ -104,7 +145,6 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) if config.PayloadConfig != nil { switch c := config.PayloadConfig.Payload.(type) { case *testpb.PayloadConfig_BytebufParams: - opts = append(opts, grpc.WithCodec(byteBufCodec{})) payloadReqSize = int(c.BytebufParams.ReqSize) payloadRespSize = int(c.BytebufParams.RespSize) payloadType = "bytebuf" @@ -112,52 +152,45 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) payloadReqSize = int(c.SimpleParams.ReqSize) payloadRespSize = int(c.SimpleParams.RespSize) payloadType = "protobuf" - case *testpb.PayloadConfig_ComplexParams: - return nil, grpc.Errorf(codes.Unimplemented, "unsupported payload config: %v", config.PayloadConfig) default: - return nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", config.PayloadConfig) + return grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", config.PayloadConfig) } } - grpclog.Printf(" - rpcs per chann: %v", config.OutstandingRpcsPerChannel) - grpclog.Printf(" - channel number: %v", config.ClientChannels) - - rpcCountPerConn, connCount := int(config.OutstandingRpcsPerChannel), int(config.ClientChannels) - - grpclog.Printf(" - load params: %v", config.LoadParams) // TODO add open loop distribution. switch config.LoadParams.Load.(type) { case *testpb.LoadParams_ClosedLoop: case *testpb.LoadParams_Poisson: - return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams) - case *testpb.LoadParams_Uniform: - return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams) - case *testpb.LoadParams_Determ: - return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams) - case *testpb.LoadParams_Pareto: - return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams) + return grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams) default: - return nil, grpc.Errorf(codes.InvalidArgument, "unknown load params: %v", config.LoadParams) + return grpc.Errorf(codes.InvalidArgument, "unknown load params: %v", config.LoadParams) } - grpclog.Printf(" - rpc type: %v", config.RpcType) - var rpcType string + rpcCountPerConn := int(config.OutstandingRpcsPerChannel) + switch config.RpcType { case testpb.RpcType_UNARY: - rpcType = "unary" + bc.doCloseLoopUnary(conns, rpcCountPerConn, payloadReqSize, payloadRespSize) + // TODO open loop. case testpb.RpcType_STREAMING: - rpcType = "streaming" + bc.doCloseLoopStreaming(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType) + // TODO open loop. default: - return nil, grpc.Errorf(codes.InvalidArgument, "unknown rpc type: %v", config.RpcType) + return grpc.Errorf(codes.InvalidArgument, "unknown rpc type: %v", config.RpcType) } - grpclog.Printf(" - histogram params: %v", config.HistogramParams) - grpclog.Printf(" - server targets: %v", config.ServerTargets) + return nil +} - conns := make([]*grpc.ClientConn, connCount) +func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) { + printClientConfig(config) - for connIndex := 0; connIndex < connCount; connIndex++ { - conns[connIndex] = benchmark.NewClientConn(config.ServerTargets[connIndex%len(config.ServerTargets)], opts...) + // Set running environment like how many cores to use. + setupClientEnv(config) + + conns, closeConns, err := createConns(config) + if err != nil { + return nil, err } bc := benchmarkClient{ @@ -171,13 +204,11 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) lastResetTime: time.Now(), } - switch rpcType { - case "unary": - bc.doCloseLoopUnary(conns, rpcCountPerConn, payloadReqSize, payloadRespSize) - // TODO open loop. - case "streaming": - bc.doCloseLoopStreaming(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType) - // TODO open loop. + err = performRPCs(config, conns, &bc) + if err != nil { + // Close all conns if failed to performRPCs. + closeConns() + return nil, err } return &bc, nil