Reconstruct startBenchmarkClient

This commit is contained in:
Menghan Li
2016-05-03 16:55:15 -07:00
parent 14d95fc632
commit 47152e8076

View File

@ -60,43 +60,84 @@ type benchmarkClient struct {
histogram *stats.Histogram histogram *stats.Histogram
} }
func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) { func printClientConfig(config *testpb.ClientConfig) {
var opts []grpc.DialOption
// Some config options are ignored: // Some config options are ignored:
// - client type: // - client type:
// will always create sync client // will always create sync client
// - async client threads. // - async client threads.
// - core list // - core list
grpclog.Printf(" * client type: %v (ignored, always creates sync client)", config.ClientType) grpclog.Printf(" * client type: %v (ignored, always creates sync client)", config.ClientType)
switch config.ClientType {
case testpb.ClientType_SYNC_CLIENT:
case testpb.ClientType_ASYNC_CLIENT:
default:
return nil, grpc.Errorf(codes.InvalidArgument, "unknow client type: %v", config.ClientType)
}
grpclog.Printf(" * async client threads: %v (ignored)", config.AsyncClientThreads) grpclog.Printf(" * async client threads: %v (ignored)", config.AsyncClientThreads)
grpclog.Printf(" * core list: %v (ignored)", config.CoreList) grpclog.Printf(" * core list: %v (ignored)", config.CoreList)
grpclog.Printf(" - security params: %v", config.SecurityParams) grpclog.Printf(" - security params: %v", config.SecurityParams)
grpclog.Printf(" - core limit: %v", config.CoreLimit)
grpclog.Printf(" - payload config: %v", config.PayloadConfig)
grpclog.Printf(" - rpcs per chann: %v", config.OutstandingRpcsPerChannel)
grpclog.Printf(" - channel number: %v", config.ClientChannels)
grpclog.Printf(" - load params: %v", config.LoadParams)
grpclog.Printf(" - rpc type: %v", config.RpcType)
grpclog.Printf(" - histogram params: %v", config.HistogramParams)
grpclog.Printf(" - server targets: %v", config.ServerTargets)
}
func setupClientEnv(config *testpb.ClientConfig) {
// Use one cpu core by default.
// TODO: change default number of cores used if 1 is not fastest.
if config.CoreLimit > 1 {
runtime.GOMAXPROCS(int(config.CoreLimit))
}
}
func createConns(config *testpb.ClientConfig) ([]*grpc.ClientConn, func(), error) {
var opts []grpc.DialOption
// Sanity check for client type.
switch config.ClientType {
case testpb.ClientType_SYNC_CLIENT:
case testpb.ClientType_ASYNC_CLIENT:
default:
return nil, nil, grpc.Errorf(codes.InvalidArgument, "unknow client type: %v", config.ClientType)
}
// Check and set security options.
if config.SecurityParams != nil { if config.SecurityParams != nil {
creds, err := credentials.NewClientTLSFromFile(abs(caFile), config.SecurityParams.ServerHostOverride) creds, err := credentials.NewClientTLSFromFile(abs(caFile), config.SecurityParams.ServerHostOverride)
if err != nil { if err != nil {
return nil, grpc.Errorf(codes.InvalidArgument, "failed to create TLS credentials %v", err) return nil, nil, grpc.Errorf(codes.InvalidArgument, "failed to create TLS credentials %v", err)
} }
opts = append(opts, grpc.WithTransportCredentials(creds)) opts = append(opts, grpc.WithTransportCredentials(creds))
} else { } else {
opts = append(opts, grpc.WithInsecure()) opts = append(opts, grpc.WithInsecure())
} }
grpclog.Printf(" - core limit: %v", config.CoreLimit) // Use byteBufCodec is required.
// Use one cpu core by default. if config.PayloadConfig != nil {
// TODO: change default number of cores used if 1 is not fastest. switch config.PayloadConfig.Payload.(type) {
if config.CoreLimit > 1 { case *testpb.PayloadConfig_BytebufParams:
runtime.GOMAXPROCS(int(config.CoreLimit)) opts = append(opts, grpc.WithCodec(byteBufCodec{}))
case *testpb.PayloadConfig_SimpleParams:
default:
return nil, nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", config.PayloadConfig)
}
} }
grpclog.Printf(" - payload config: %v", config.PayloadConfig) // Create connestions.
connCount := int(config.ClientChannels)
conns := make([]*grpc.ClientConn, connCount)
for connIndex := 0; connIndex < connCount; connIndex++ {
conns[connIndex] = benchmark.NewClientConn(config.ServerTargets[connIndex%len(config.ServerTargets)], opts...)
}
return conns, func() {
for _, conn := range conns {
conn.Close()
}
}, nil
}
func performRPCs(config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benchmarkClient) error {
// Read payload size and type from config.
var ( var (
payloadReqSize, payloadRespSize int payloadReqSize, payloadRespSize int
payloadType string payloadType string
@ -104,7 +145,6 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error)
if config.PayloadConfig != nil { if config.PayloadConfig != nil {
switch c := config.PayloadConfig.Payload.(type) { switch c := config.PayloadConfig.Payload.(type) {
case *testpb.PayloadConfig_BytebufParams: case *testpb.PayloadConfig_BytebufParams:
opts = append(opts, grpc.WithCodec(byteBufCodec{}))
payloadReqSize = int(c.BytebufParams.ReqSize) payloadReqSize = int(c.BytebufParams.ReqSize)
payloadRespSize = int(c.BytebufParams.RespSize) payloadRespSize = int(c.BytebufParams.RespSize)
payloadType = "bytebuf" payloadType = "bytebuf"
@ -112,52 +152,45 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error)
payloadReqSize = int(c.SimpleParams.ReqSize) payloadReqSize = int(c.SimpleParams.ReqSize)
payloadRespSize = int(c.SimpleParams.RespSize) payloadRespSize = int(c.SimpleParams.RespSize)
payloadType = "protobuf" payloadType = "protobuf"
case *testpb.PayloadConfig_ComplexParams:
return nil, grpc.Errorf(codes.Unimplemented, "unsupported payload config: %v", config.PayloadConfig)
default: default:
return nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", config.PayloadConfig) return grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", config.PayloadConfig)
} }
} }
grpclog.Printf(" - rpcs per chann: %v", config.OutstandingRpcsPerChannel)
grpclog.Printf(" - channel number: %v", config.ClientChannels)
rpcCountPerConn, connCount := int(config.OutstandingRpcsPerChannel), int(config.ClientChannels)
grpclog.Printf(" - load params: %v", config.LoadParams)
// TODO add open loop distribution. // TODO add open loop distribution.
switch config.LoadParams.Load.(type) { switch config.LoadParams.Load.(type) {
case *testpb.LoadParams_ClosedLoop: case *testpb.LoadParams_ClosedLoop:
case *testpb.LoadParams_Poisson: case *testpb.LoadParams_Poisson:
return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams) return grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams)
case *testpb.LoadParams_Uniform:
return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams)
case *testpb.LoadParams_Determ:
return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams)
case *testpb.LoadParams_Pareto:
return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams)
default: default:
return nil, grpc.Errorf(codes.InvalidArgument, "unknown load params: %v", config.LoadParams) return grpc.Errorf(codes.InvalidArgument, "unknown load params: %v", config.LoadParams)
} }
grpclog.Printf(" - rpc type: %v", config.RpcType) rpcCountPerConn := int(config.OutstandingRpcsPerChannel)
var rpcType string
switch config.RpcType { switch config.RpcType {
case testpb.RpcType_UNARY: case testpb.RpcType_UNARY:
rpcType = "unary" bc.doCloseLoopUnary(conns, rpcCountPerConn, payloadReqSize, payloadRespSize)
// TODO open loop.
case testpb.RpcType_STREAMING: case testpb.RpcType_STREAMING:
rpcType = "streaming" bc.doCloseLoopStreaming(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType)
// TODO open loop.
default: default:
return nil, grpc.Errorf(codes.InvalidArgument, "unknown rpc type: %v", config.RpcType) return grpc.Errorf(codes.InvalidArgument, "unknown rpc type: %v", config.RpcType)
} }
grpclog.Printf(" - histogram params: %v", config.HistogramParams) return nil
grpclog.Printf(" - server targets: %v", config.ServerTargets) }
conns := make([]*grpc.ClientConn, connCount) func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) {
printClientConfig(config)
for connIndex := 0; connIndex < connCount; connIndex++ { // Set running environment like how many cores to use.
conns[connIndex] = benchmark.NewClientConn(config.ServerTargets[connIndex%len(config.ServerTargets)], opts...) setupClientEnv(config)
conns, closeConns, err := createConns(config)
if err != nil {
return nil, err
} }
bc := benchmarkClient{ bc := benchmarkClient{
@ -171,13 +204,11 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error)
lastResetTime: time.Now(), lastResetTime: time.Now(),
} }
switch rpcType { err = performRPCs(config, conns, &bc)
case "unary": if err != nil {
bc.doCloseLoopUnary(conns, rpcCountPerConn, payloadReqSize, payloadRespSize) // Close all conns if failed to performRPCs.
// TODO open loop. closeConns()
case "streaming": return nil, err
bc.doCloseLoopStreaming(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType)
// TODO open loop.
} }
return &bc, nil return &bc, nil