Add RunClient()

This commit is contained in:
Menghan Li
2016-04-18 14:49:05 -07:00
parent c2e8421003
commit 2aaff82a6e
7 changed files with 298 additions and 12 deletions

View File

@ -161,7 +161,7 @@ func DoUnaryCall(tc testpb.BenchmarkServiceClient, reqSize, respSize int) {
}
// DoStreamingRoundTrip performs a round trip for a single streaming rpc.
func DoStreamingRoundTrip(tc testpb.BenchmarkServiceClient, stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
func DoStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
req := &testpb.SimpleRequest{
ResponseType: pl.Type,
@ -177,8 +177,11 @@ func DoStreamingRoundTrip(tc testpb.BenchmarkServiceClient, stream testpb.Benchm
}
// NewClientConn creates a gRPC client connection to addr.
func NewClientConn(addr string) *grpc.ClientConn {
conn, err := grpc.Dial(addr, grpc.WithInsecure())
func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
if len(opts) <= 0 {
opts = append(opts, grpc.WithInsecure())
}
conn, err := grpc.Dial(addr, opts...)
if err != nil {
grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err)
}

View File

@ -70,7 +70,7 @@ func runStream(b *testing.B, maxConcurrentCalls int) {
b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
}
for i := 0; i < 10; i++ {
streamCaller(tc, stream)
streamCaller(stream)
}
ch := make(chan int, maxConcurrentCalls*4)
@ -89,7 +89,7 @@ func runStream(b *testing.B, maxConcurrentCalls int) {
}
for range ch {
start := time.Now()
streamCaller(tc, stream)
streamCaller(stream)
elapse := time.Since(start)
mu.Lock()
s.Add(elapse)
@ -111,8 +111,8 @@ func unaryCaller(client testpb.BenchmarkServiceClient) {
DoUnaryCall(client, 1, 1)
}
func streamCaller(client testpb.BenchmarkServiceClient, stream testpb.BenchmarkService_StreamingCallClient) {
DoStreamingRoundTrip(client, stream, 1, 1)
func streamCaller(stream testpb.BenchmarkService_StreamingCallClient) {
DoStreamingRoundTrip(stream, 1, 1)
}
func BenchmarkClientStreamc1(b *testing.B) {

View File

@ -32,8 +32,8 @@ func unaryCaller(client testpb.BenchmarkServiceClient) {
benchmark.DoUnaryCall(client, 1, 1)
}
func streamCaller(client testpb.BenchmarkServiceClient, stream testpb.BenchmarkService_StreamingCallClient) {
benchmark.DoStreamingRoundTrip(client, stream, 1, 1)
func streamCaller(stream testpb.BenchmarkService_StreamingCallClient) {
benchmark.DoStreamingRoundTrip(stream, 1, 1)
}
func buildConnection() (s *stats.Stats, conn *grpc.ClientConn, tc testpb.BenchmarkServiceClient) {
@ -107,11 +107,11 @@ func closeLoopStream() {
}
// Do some warm up.
for i := 0; i < 100; i++ {
streamCaller(tc, stream)
streamCaller(stream)
}
for range ch {
start := time.Now()
streamCaller(tc, stream)
streamCaller(stream)
elapse := time.Since(start)
mu.Lock()
s.Add(elapse)

15
benchmark/server/testdata/ca.pem vendored Normal file
View File

@ -0,0 +1,15 @@
-----BEGIN CERTIFICATE-----
MIICSjCCAbOgAwIBAgIJAJHGGR4dGioHMA0GCSqGSIb3DQEBCwUAMFYxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQxDzANBgNVBAMTBnRlc3RjYTAeFw0xNDExMTEyMjMxMjla
Fw0yNDExMDgyMjMxMjlaMFYxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0
YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMT
BnRlc3RjYTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAwEDfBV5MYdlHVHJ7
+L4nxrZy7mBfAVXpOc5vMYztssUI7mL2/iYujiIXM+weZYNTEpLdjyJdu7R5gGUu
g1jSVK/EPHfc74O7AyZU34PNIP4Sh33N+/A5YexrNgJlPY+E3GdVYi4ldWJjgkAd
Qah2PH5ACLrIIC6tRka9hcaBlIECAwEAAaMgMB4wDAYDVR0TBAUwAwEB/zAOBgNV
HQ8BAf8EBAMCAgQwDQYJKoZIhvcNAQELBQADgYEAHzC7jdYlzAVmddi/gdAeKPau
sPBG/C2HCWqHzpCUHcKuvMzDVkY/MP2o6JIW2DBbY64bO/FceExhjcykgaYtCH/m
oIU63+CFOTtR7otyQAWHqXa7q4SbCDlG7DyRFxqG0txPtGvy12lgldA2+RgcigQG
Dfcog5wrJytaQ6UA0wE=
-----END CERTIFICATE-----

View File

@ -145,6 +145,15 @@ func NewHistogram(opts HistogramOptions) *Histogram {
return &h
}
func (h *Histogram) Clear() {
h.count = newCounter()
h.sum = newCounter()
h.tracker = newTracker()
for _, v := range h.buckets {
v.count = newCounter()
}
}
// Opts returns a copy of the options used to create the Histogram.
func (h *Histogram) Opts() HistogramOptions {
return h.opts

View File

@ -0,0 +1,214 @@
package main
import (
"math"
"runtime"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/benchmark"
testpb "google.golang.org/grpc/benchmark/grpc_testing"
"google.golang.org/grpc/benchmark/stats"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
)
var (
caFile = "/usr/local/google/home/menghanl/go/src/google.golang.org/grpc/benchmark/server/testdata/ca.pem"
)
type benchmarkClient struct {
conns []*grpc.ClientConn
histogramGrowFactor float64
histogramMaxPossible float64
stop chan bool
mu sync.RWMutex
lastResetTime time.Time
histogram *stats.Histogram
}
func startBenchmarkClientWithSetup(setup *testpb.ClientConfig) (*benchmarkClient, error) {
var opts []grpc.DialOption
grpclog.Printf(" - client type: %v", setup.ClientType)
switch setup.ClientType {
// Ignore client type
case testpb.ClientType_SYNC_CLIENT:
case testpb.ClientType_ASYNC_CLIENT:
default:
return nil, grpc.Errorf(codes.InvalidArgument, "unknow client type: %v", setup.ClientType)
}
grpclog.Printf(" - security params: %v", setup.SecurityParams)
if setup.SecurityParams != nil {
creds, err := credentials.NewClientTLSFromFile(caFile, setup.SecurityParams.ServerHostOverride)
if err != nil {
grpclog.Fatalf("failed to create TLS credentials %v", err)
}
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithInsecure())
}
// Ignore async client threads.
grpclog.Printf(" - core limit: %v", setup.CoreLimit)
if setup.CoreLimit > 0 {
runtime.GOMAXPROCS(int(setup.CoreLimit))
} else {
// runtime.GOMAXPROCS(runtime.NumCPU())
runtime.GOMAXPROCS(1)
}
// TODO payload config
grpclog.Printf(" - payload config: %v", setup.PayloadConfig)
var payloadReqSize, payloadRespSize int
if setup.PayloadConfig != nil {
// TODO payload config
grpclog.Printf("payload config: %v", setup.PayloadConfig)
switch c := setup.PayloadConfig.Payload.(type) {
case *testpb.PayloadConfig_BytebufParams:
opts = append(opts, grpc.WithCodec(byteBufCodec{}))
payloadReqSize = int(c.BytebufParams.ReqSize)
payloadRespSize = int(c.BytebufParams.RespSize)
case *testpb.PayloadConfig_SimpleParams:
payloadReqSize = int(c.SimpleParams.ReqSize)
payloadRespSize = int(c.SimpleParams.RespSize)
case *testpb.PayloadConfig_ComplexParams:
default:
return nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", setup.PayloadConfig)
}
}
// TODO core list
grpclog.Printf(" - core list: %v", setup.CoreList)
grpclog.Printf(" - histogram params: %v", setup.HistogramParams)
grpclog.Printf(" - server targets: %v", setup.ServerTargets)
grpclog.Printf(" - rpcs per chann: %v", setup.OutstandingRpcsPerChannel)
grpclog.Printf(" - channel number: %v", setup.ClientChannels)
rpcCount, connCount := int(setup.OutstandingRpcsPerChannel), int(setup.ClientChannels)
bc := &benchmarkClient{
conns: make([]*grpc.ClientConn, connCount),
histogramGrowFactor: setup.HistogramParams.Resolution,
histogramMaxPossible: setup.HistogramParams.MaxPossible,
}
for connIndex := 0; connIndex < connCount; connIndex++ {
bc.conns[connIndex] = benchmark.NewClientConn(setup.ServerTargets[connIndex%len(setup.ServerTargets)], opts...)
}
bc.histogram = stats.NewHistogram(stats.HistogramOptions{
NumBuckets: int(math.Log(bc.histogramMaxPossible)/math.Log(1+bc.histogramGrowFactor)) + 1,
GrowthFactor: bc.histogramGrowFactor,
MinValue: 0,
})
grpclog.Printf(" - rpc type: %v", setup.RpcType)
var rpc func(testpb.BenchmarkServiceClient)
switch setup.RpcType {
case testpb.RpcType_UNARY:
rpc = func(client testpb.BenchmarkServiceClient) {
benchmark.DoUnaryCall(client, payloadReqSize, payloadRespSize)
}
case testpb.RpcType_STREAMING:
default:
return nil, grpc.Errorf(codes.InvalidArgument, "unknown rpc type: %v", setup.RpcType)
}
grpclog.Printf(" - load params: %v", setup.LoadParams)
bc.stop = make(chan bool)
switch lp := setup.LoadParams.Load.(type) {
case *testpb.LoadParams_ClosedLoop:
grpclog.Printf(" - %v", lp.ClosedLoop)
doCloseLoop(bc.histogram, bc.conns, rpcCount, rpc, bc.stop)
case *testpb.LoadParams_Poisson:
grpclog.Printf(" - %v", lp.Poisson)
case *testpb.LoadParams_Uniform:
grpclog.Printf(" - %v", lp.Uniform)
case *testpb.LoadParams_Determ:
grpclog.Printf(" - %v", lp.Determ)
case *testpb.LoadParams_Pareto:
grpclog.Printf(" - %v", lp.Pareto)
default:
return nil, grpc.Errorf(codes.InvalidArgument, "unknown load params: %v", setup.LoadParams)
}
bc.mu.Lock()
defer bc.mu.Unlock()
bc.lastResetTime = time.Now()
return bc, nil
}
func doCloseLoop(h *stats.Histogram, conns []*grpc.ClientConn, rpcCount int, rpc func(testpb.BenchmarkServiceClient), stop <-chan bool) {
clients := make([]testpb.BenchmarkServiceClient, len(conns))
for ic, conn := range conns {
clients[ic] = testpb.NewBenchmarkServiceClient(conn)
for j := 0; j < 100/len(conns); j++ {
rpc(clients[ic])
}
}
var mu sync.Mutex
for ic, _ := range conns {
for j := 0; j < rpcCount; j++ {
go func() {
for {
select {
case <-stop:
grpclog.Printf("stopped")
return
default:
start := time.Now()
rpc(clients[ic])
elapse := time.Since(start)
go func() {
mu.Lock()
h.Add(int64(elapse / time.Nanosecond))
mu.Unlock()
}()
}
}
}()
}
}
grpclog.Printf("close loop done, count: %v", rpcCount)
}
func (bc *benchmarkClient) getStats() *testpb.ClientStats {
bc.mu.RLock()
// time.Sleep(1 * time.Second)
defer bc.mu.RUnlock()
histogramValue := bc.histogram.Value()
b := make([]uint32, len(histogramValue.Buckets))
tempCount := make(map[int64]int)
for i, v := range histogramValue.Buckets {
b[i] = uint32(v.Count)
tempCount[v.Count] += 1
}
grpclog.Printf("+++++\n%v count: %v\n+++++", tempCount, histogramValue.Count)
return &testpb.ClientStats{
Latencies: &testpb.HistogramData{
Bucket: b,
MinSeen: float64(histogramValue.Min),
MaxSeen: float64(histogramValue.Max),
Sum: float64(histogramValue.Sum),
// TODO change to squares
SumOfSquares: float64(histogramValue.Sum),
Count: float64(histogramValue.Count),
},
TimeElapsed: time.Since(bc.lastResetTime).Seconds(),
TimeUser: 0,
TimeSystem: 0,
}
}
func (bc *benchmarkClient) reset() {
bc.mu.Lock()
defer bc.mu.Unlock()
bc.lastResetTime = time.Now()
bc.histogram.Clear()
}

View File

@ -71,6 +71,7 @@ func (byteBufCodec) String() string {
type workerServer struct {
stop chan<- bool
serverPort int
bc *benchmarkClient
}
func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) error {
@ -128,7 +129,48 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er
}
func (s *workerServer) RunClient(stream testpb.WorkerService_RunClientServer) error {
return grpc.Errorf(codes.Unimplemented, "RunClient not implemented")
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
var out *testpb.ClientStatus
switch t := in.Argtype.(type) {
case *testpb.ClientArgs_Setup:
grpclog.Printf("client setup received:")
bc, err := startBenchmarkClientWithSetup(t.Setup)
if err != nil {
return err
}
s.bc = bc
out = &testpb.ClientStatus{
Stats: s.bc.getStats(),
}
case *testpb.ClientArgs_Mark:
grpclog.Printf("client mark received:")
grpclog.Printf(" - %v", t)
if s.bc == nil {
return grpc.Errorf(codes.InvalidArgument, "client does not exist when mark received")
}
out = &testpb.ClientStatus{
Stats: s.bc.getStats(),
}
if t.Mark.Reset_ {
s.bc.reset()
}
}
if err := stream.Send(out); err != nil {
return err
}
}
return nil
}
func (s *workerServer) CoreCount(ctx context.Context, in *testpb.CoreRequest) (*testpb.CoreResponse, error) {
@ -139,6 +181,9 @@ func (s *workerServer) CoreCount(ctx context.Context, in *testpb.CoreRequest) (*
func (s *workerServer) QuitWorker(ctx context.Context, in *testpb.Void) (*testpb.Void, error) {
grpclog.Printf("quiting worker")
defer func() { s.stop <- true }()
if s.bc != nil {
close(s.bc.stop)
}
return &testpb.Void{}, nil
}