From 2aaff82a6ecfbb37a2593245df8c5144bb24d745 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Mon, 18 Apr 2016 14:49:05 -0700 Subject: [PATCH] Add RunClient() --- benchmark/benchmark.go | 9 +- benchmark/benchmark_test.go | 8 +- benchmark/client/main.go | 8 +- benchmark/server/testdata/ca.pem | 15 ++ benchmark/stats/histogram.go | 9 ++ benchmark/worker/benchmark_client.go | 214 +++++++++++++++++++++++++++ benchmark/worker/main.go | 47 +++++- 7 files changed, 298 insertions(+), 12 deletions(-) create mode 100644 benchmark/server/testdata/ca.pem create mode 100644 benchmark/worker/benchmark_client.go diff --git a/benchmark/benchmark.go b/benchmark/benchmark.go index 514c0dd9..1a2e0cc7 100644 --- a/benchmark/benchmark.go +++ b/benchmark/benchmark.go @@ -161,7 +161,7 @@ func DoUnaryCall(tc testpb.BenchmarkServiceClient, reqSize, respSize int) { } // DoStreamingRoundTrip performs a round trip for a single streaming rpc. -func DoStreamingRoundTrip(tc testpb.BenchmarkServiceClient, stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) { +func DoStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) { pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize) req := &testpb.SimpleRequest{ ResponseType: pl.Type, @@ -177,8 +177,11 @@ func DoStreamingRoundTrip(tc testpb.BenchmarkServiceClient, stream testpb.Benchm } // NewClientConn creates a gRPC client connection to addr. -func NewClientConn(addr string) *grpc.ClientConn { - conn, err := grpc.Dial(addr, grpc.WithInsecure()) +func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn { + if len(opts) <= 0 { + opts = append(opts, grpc.WithInsecure()) + } + conn, err := grpc.Dial(addr, opts...) if err != nil { grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err) } diff --git a/benchmark/benchmark_test.go b/benchmark/benchmark_test.go index e8eaa9bb..8057c064 100644 --- a/benchmark/benchmark_test.go +++ b/benchmark/benchmark_test.go @@ -70,7 +70,7 @@ func runStream(b *testing.B, maxConcurrentCalls int) { b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) } for i := 0; i < 10; i++ { - streamCaller(tc, stream) + streamCaller(stream) } ch := make(chan int, maxConcurrentCalls*4) @@ -89,7 +89,7 @@ func runStream(b *testing.B, maxConcurrentCalls int) { } for range ch { start := time.Now() - streamCaller(tc, stream) + streamCaller(stream) elapse := time.Since(start) mu.Lock() s.Add(elapse) @@ -111,8 +111,8 @@ func unaryCaller(client testpb.BenchmarkServiceClient) { DoUnaryCall(client, 1, 1) } -func streamCaller(client testpb.BenchmarkServiceClient, stream testpb.BenchmarkService_StreamingCallClient) { - DoStreamingRoundTrip(client, stream, 1, 1) +func streamCaller(stream testpb.BenchmarkService_StreamingCallClient) { + DoStreamingRoundTrip(stream, 1, 1) } func BenchmarkClientStreamc1(b *testing.B) { diff --git a/benchmark/client/main.go b/benchmark/client/main.go index 27cc1a8b..5dfbe6a3 100644 --- a/benchmark/client/main.go +++ b/benchmark/client/main.go @@ -32,8 +32,8 @@ func unaryCaller(client testpb.BenchmarkServiceClient) { benchmark.DoUnaryCall(client, 1, 1) } -func streamCaller(client testpb.BenchmarkServiceClient, stream testpb.BenchmarkService_StreamingCallClient) { - benchmark.DoStreamingRoundTrip(client, stream, 1, 1) +func streamCaller(stream testpb.BenchmarkService_StreamingCallClient) { + benchmark.DoStreamingRoundTrip(stream, 1, 1) } func buildConnection() (s *stats.Stats, conn *grpc.ClientConn, tc testpb.BenchmarkServiceClient) { @@ -107,11 +107,11 @@ func closeLoopStream() { } // Do some warm up. for i := 0; i < 100; i++ { - streamCaller(tc, stream) + streamCaller(stream) } for range ch { start := time.Now() - streamCaller(tc, stream) + streamCaller(stream) elapse := time.Since(start) mu.Lock() s.Add(elapse) diff --git a/benchmark/server/testdata/ca.pem b/benchmark/server/testdata/ca.pem new file mode 100644 index 00000000..6c8511a7 --- /dev/null +++ b/benchmark/server/testdata/ca.pem @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICSjCCAbOgAwIBAgIJAJHGGR4dGioHMA0GCSqGSIb3DQEBCwUAMFYxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX +aWRnaXRzIFB0eSBMdGQxDzANBgNVBAMTBnRlc3RjYTAeFw0xNDExMTEyMjMxMjla +Fw0yNDExMDgyMjMxMjlaMFYxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0 +YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMT +BnRlc3RjYTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAwEDfBV5MYdlHVHJ7 ++L4nxrZy7mBfAVXpOc5vMYztssUI7mL2/iYujiIXM+weZYNTEpLdjyJdu7R5gGUu +g1jSVK/EPHfc74O7AyZU34PNIP4Sh33N+/A5YexrNgJlPY+E3GdVYi4ldWJjgkAd +Qah2PH5ACLrIIC6tRka9hcaBlIECAwEAAaMgMB4wDAYDVR0TBAUwAwEB/zAOBgNV +HQ8BAf8EBAMCAgQwDQYJKoZIhvcNAQELBQADgYEAHzC7jdYlzAVmddi/gdAeKPau +sPBG/C2HCWqHzpCUHcKuvMzDVkY/MP2o6JIW2DBbY64bO/FceExhjcykgaYtCH/m +oIU63+CFOTtR7otyQAWHqXa7q4SbCDlG7DyRFxqG0txPtGvy12lgldA2+RgcigQG +Dfcog5wrJytaQ6UA0wE= +-----END CERTIFICATE----- diff --git a/benchmark/stats/histogram.go b/benchmark/stats/histogram.go index cfb40c90..60763960 100644 --- a/benchmark/stats/histogram.go +++ b/benchmark/stats/histogram.go @@ -145,6 +145,15 @@ func NewHistogram(opts HistogramOptions) *Histogram { return &h } +func (h *Histogram) Clear() { + h.count = newCounter() + h.sum = newCounter() + h.tracker = newTracker() + for _, v := range h.buckets { + v.count = newCounter() + } +} + // Opts returns a copy of the options used to create the Histogram. func (h *Histogram) Opts() HistogramOptions { return h.opts diff --git a/benchmark/worker/benchmark_client.go b/benchmark/worker/benchmark_client.go new file mode 100644 index 00000000..94d53c7b --- /dev/null +++ b/benchmark/worker/benchmark_client.go @@ -0,0 +1,214 @@ +package main + +import ( + "math" + "runtime" + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/benchmark" + testpb "google.golang.org/grpc/benchmark/grpc_testing" + "google.golang.org/grpc/benchmark/stats" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/grpclog" +) + +var ( + caFile = "/usr/local/google/home/menghanl/go/src/google.golang.org/grpc/benchmark/server/testdata/ca.pem" +) + +type benchmarkClient struct { + conns []*grpc.ClientConn + histogramGrowFactor float64 + histogramMaxPossible float64 + stop chan bool + mu sync.RWMutex + lastResetTime time.Time + histogram *stats.Histogram +} + +func startBenchmarkClientWithSetup(setup *testpb.ClientConfig) (*benchmarkClient, error) { + var opts []grpc.DialOption + + grpclog.Printf(" - client type: %v", setup.ClientType) + switch setup.ClientType { + // Ignore client type + case testpb.ClientType_SYNC_CLIENT: + case testpb.ClientType_ASYNC_CLIENT: + default: + return nil, grpc.Errorf(codes.InvalidArgument, "unknow client type: %v", setup.ClientType) + } + + grpclog.Printf(" - security params: %v", setup.SecurityParams) + if setup.SecurityParams != nil { + creds, err := credentials.NewClientTLSFromFile(caFile, setup.SecurityParams.ServerHostOverride) + if err != nil { + grpclog.Fatalf("failed to create TLS credentials %v", err) + } + opts = append(opts, grpc.WithTransportCredentials(creds)) + } else { + opts = append(opts, grpc.WithInsecure()) + } + + // Ignore async client threads. + + grpclog.Printf(" - core limit: %v", setup.CoreLimit) + if setup.CoreLimit > 0 { + runtime.GOMAXPROCS(int(setup.CoreLimit)) + } else { + // runtime.GOMAXPROCS(runtime.NumCPU()) + runtime.GOMAXPROCS(1) + } + + // TODO payload config + grpclog.Printf(" - payload config: %v", setup.PayloadConfig) + var payloadReqSize, payloadRespSize int + if setup.PayloadConfig != nil { + // TODO payload config + grpclog.Printf("payload config: %v", setup.PayloadConfig) + switch c := setup.PayloadConfig.Payload.(type) { + case *testpb.PayloadConfig_BytebufParams: + opts = append(opts, grpc.WithCodec(byteBufCodec{})) + payloadReqSize = int(c.BytebufParams.ReqSize) + payloadRespSize = int(c.BytebufParams.RespSize) + case *testpb.PayloadConfig_SimpleParams: + payloadReqSize = int(c.SimpleParams.ReqSize) + payloadRespSize = int(c.SimpleParams.RespSize) + case *testpb.PayloadConfig_ComplexParams: + default: + return nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", setup.PayloadConfig) + } + } + + // TODO core list + grpclog.Printf(" - core list: %v", setup.CoreList) + + grpclog.Printf(" - histogram params: %v", setup.HistogramParams) + grpclog.Printf(" - server targets: %v", setup.ServerTargets) + grpclog.Printf(" - rpcs per chann: %v", setup.OutstandingRpcsPerChannel) + grpclog.Printf(" - channel number: %v", setup.ClientChannels) + + rpcCount, connCount := int(setup.OutstandingRpcsPerChannel), int(setup.ClientChannels) + + bc := &benchmarkClient{ + conns: make([]*grpc.ClientConn, connCount), + histogramGrowFactor: setup.HistogramParams.Resolution, + histogramMaxPossible: setup.HistogramParams.MaxPossible, + } + + for connIndex := 0; connIndex < connCount; connIndex++ { + bc.conns[connIndex] = benchmark.NewClientConn(setup.ServerTargets[connIndex%len(setup.ServerTargets)], opts...) + } + + bc.histogram = stats.NewHistogram(stats.HistogramOptions{ + NumBuckets: int(math.Log(bc.histogramMaxPossible)/math.Log(1+bc.histogramGrowFactor)) + 1, + GrowthFactor: bc.histogramGrowFactor, + MinValue: 0, + }) + + grpclog.Printf(" - rpc type: %v", setup.RpcType) + var rpc func(testpb.BenchmarkServiceClient) + switch setup.RpcType { + case testpb.RpcType_UNARY: + rpc = func(client testpb.BenchmarkServiceClient) { + benchmark.DoUnaryCall(client, payloadReqSize, payloadRespSize) + } + case testpb.RpcType_STREAMING: + default: + return nil, grpc.Errorf(codes.InvalidArgument, "unknown rpc type: %v", setup.RpcType) + } + + grpclog.Printf(" - load params: %v", setup.LoadParams) + bc.stop = make(chan bool) + switch lp := setup.LoadParams.Load.(type) { + case *testpb.LoadParams_ClosedLoop: + grpclog.Printf(" - %v", lp.ClosedLoop) + doCloseLoop(bc.histogram, bc.conns, rpcCount, rpc, bc.stop) + case *testpb.LoadParams_Poisson: + grpclog.Printf(" - %v", lp.Poisson) + case *testpb.LoadParams_Uniform: + grpclog.Printf(" - %v", lp.Uniform) + case *testpb.LoadParams_Determ: + grpclog.Printf(" - %v", lp.Determ) + case *testpb.LoadParams_Pareto: + grpclog.Printf(" - %v", lp.Pareto) + default: + return nil, grpc.Errorf(codes.InvalidArgument, "unknown load params: %v", setup.LoadParams) + } + + bc.mu.Lock() + defer bc.mu.Unlock() + bc.lastResetTime = time.Now() + return bc, nil +} + +func doCloseLoop(h *stats.Histogram, conns []*grpc.ClientConn, rpcCount int, rpc func(testpb.BenchmarkServiceClient), stop <-chan bool) { + clients := make([]testpb.BenchmarkServiceClient, len(conns)) + for ic, conn := range conns { + clients[ic] = testpb.NewBenchmarkServiceClient(conn) + for j := 0; j < 100/len(conns); j++ { + rpc(clients[ic]) + } + } + var mu sync.Mutex + for ic, _ := range conns { + for j := 0; j < rpcCount; j++ { + go func() { + for { + select { + case <-stop: + grpclog.Printf("stopped") + return + default: + start := time.Now() + rpc(clients[ic]) + elapse := time.Since(start) + go func() { + mu.Lock() + h.Add(int64(elapse / time.Nanosecond)) + mu.Unlock() + }() + } + } + }() + } + } + grpclog.Printf("close loop done, count: %v", rpcCount) +} + +func (bc *benchmarkClient) getStats() *testpb.ClientStats { + bc.mu.RLock() + // time.Sleep(1 * time.Second) + defer bc.mu.RUnlock() + histogramValue := bc.histogram.Value() + b := make([]uint32, len(histogramValue.Buckets)) + tempCount := make(map[int64]int) + for i, v := range histogramValue.Buckets { + b[i] = uint32(v.Count) + tempCount[v.Count] += 1 + } + grpclog.Printf("+++++\n%v count: %v\n+++++", tempCount, histogramValue.Count) + return &testpb.ClientStats{ + Latencies: &testpb.HistogramData{ + Bucket: b, + MinSeen: float64(histogramValue.Min), + MaxSeen: float64(histogramValue.Max), + Sum: float64(histogramValue.Sum), + // TODO change to squares + SumOfSquares: float64(histogramValue.Sum), + Count: float64(histogramValue.Count), + }, + TimeElapsed: time.Since(bc.lastResetTime).Seconds(), + TimeUser: 0, + TimeSystem: 0, + } +} + +func (bc *benchmarkClient) reset() { + bc.mu.Lock() + defer bc.mu.Unlock() + bc.lastResetTime = time.Now() + bc.histogram.Clear() +} diff --git a/benchmark/worker/main.go b/benchmark/worker/main.go index fb6f3e82..04668f16 100644 --- a/benchmark/worker/main.go +++ b/benchmark/worker/main.go @@ -71,6 +71,7 @@ func (byteBufCodec) String() string { type workerServer struct { stop chan<- bool serverPort int + bc *benchmarkClient } func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) error { @@ -128,7 +129,48 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er } func (s *workerServer) RunClient(stream testpb.WorkerService_RunClientServer) error { - return grpc.Errorf(codes.Unimplemented, "RunClient not implemented") + + for { + in, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + + var out *testpb.ClientStatus + switch t := in.Argtype.(type) { + case *testpb.ClientArgs_Setup: + grpclog.Printf("client setup received:") + + bc, err := startBenchmarkClientWithSetup(t.Setup) + if err != nil { + return err + } + s.bc = bc + out = &testpb.ClientStatus{ + Stats: s.bc.getStats(), + } + case *testpb.ClientArgs_Mark: + grpclog.Printf("client mark received:") + grpclog.Printf(" - %v", t) + if s.bc == nil { + return grpc.Errorf(codes.InvalidArgument, "client does not exist when mark received") + } + out = &testpb.ClientStatus{ + Stats: s.bc.getStats(), + } + if t.Mark.Reset_ { + s.bc.reset() + } + } + if err := stream.Send(out); err != nil { + return err + } + } + + return nil } func (s *workerServer) CoreCount(ctx context.Context, in *testpb.CoreRequest) (*testpb.CoreResponse, error) { @@ -139,6 +181,9 @@ func (s *workerServer) CoreCount(ctx context.Context, in *testpb.CoreRequest) (* func (s *workerServer) QuitWorker(ctx context.Context, in *testpb.Void) (*testpb.Void, error) { grpclog.Printf("quiting worker") defer func() { s.stop <- true }() + if s.bc != nil { + close(s.bc.stop) + } return &testpb.Void{}, nil }