From dea4e5731246e23f513b9d5eae59b16fb611483a Mon Sep 17 00:00:00 2001
From: mmukhi
Date: Mon, 21 May 2018 16:00:01 -0700
Subject: [PATCH] Benchmarks that run server and client as separate processes. (#1952)

---
 benchmark/benchmark.go   |   7 +-
 benchmark/client/main.go | 294 +++++++++++++++++++++++----------------
 benchmark/run_bench.sh   | 186 ++++++++++++++++++++++++
 benchmark/server/main.go |  72 ++++++---
 4 files changed, 400 insertions(+), 159 deletions(-)
 create mode 100755 benchmark/run_bench.sh

diff --git a/benchmark/benchmark.go b/benchmark/benchmark.go
index dc5d97af..ec9b50b2 100644
--- a/benchmark/benchmark.go
+++ b/benchmark/benchmark.go
@@ -226,9 +226,14 @@ func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallCli
 
 // NewClientConn creates a gRPC client connection to addr.
 func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
+	return NewClientConnWithContext(context.Background(), addr, opts...)
+}
+
+// NewClientConnWithContext creates a gRPC client connection to addr using ctx.
+func NewClientConnWithContext(ctx context.Context, addr string, opts ...grpc.DialOption) *grpc.ClientConn {
 	opts = append(opts, grpc.WithWriteBufferSize(128*1024))
 	opts = append(opts, grpc.WithReadBufferSize(128*1024))
-	conn, err := grpc.Dial(addr, opts...)
+	conn, err := grpc.DialContext(ctx, addr, opts...)
 	if err != nil {
 		grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err)
 	}
diff --git a/benchmark/client/main.go b/benchmark/client/main.go
index 0984b6fe..13a5fb3e 100644
--- a/benchmark/client/main.go
+++ b/benchmark/client/main.go
@@ -20,14 +20,15 @@ package main
 
 import (
 	"flag"
-	"math"
-	"net"
-	"net/http"
-	_ "net/http/pprof"
+	"fmt"
+	"os"
+	"runtime"
+	"runtime/pprof"
 	"sync"
 	"time"
 
 	"golang.org/x/net/context"
+	"golang.org/x/sys/unix"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/benchmark"
 	testpb "google.golang.org/grpc/benchmark/grpc_testing"
@@ -36,145 +37,160 @@ import (
 )
 
 var (
-	server            = flag.String("server", "", "The server address")
-	maxConcurrentRPCs = flag.Int("max_concurrent_rpcs", 1, "The max number of concurrent RPCs")
-	duration          = flag.Int("duration", math.MaxInt32, "The duration in seconds to run the benchmark client")
-	trace             = flag.Bool("trace", true, "Whether tracing is on")
-	rpcType           = flag.Int("rpc_type", 0,
+	port      = flag.String("port", "50051", "Localhost port to connect to.")
+	numRPC    = flag.Int("r", 1, "The number of concurrent RPCs on each connection.")
+	numConn   = flag.Int("c", 1, "The number of parallel connections.")
+	warmupDur = flag.Int("w", 10, "Warm-up duration in seconds")
+	duration  = flag.Int("d", 60, "Benchmark duration in seconds")
+	rqSize    = flag.Int("req", 1, "Request message size in bytes.")
+	rspSize   = flag.Int("resp", 1, "Response message size in bytes.")
+	rpcType   = flag.String("rpc_type", "unary",
 		`Configure different client rpc type. Valid options are:
-		   0 : unary call;
-		   1 : streaming call.`)
+		   unary;
+		   streaming.`)
+	testName = flag.String("test_name", "", "Name of the test used for creating profiles.")
+	wg       sync.WaitGroup
+	hopts    = stats.HistogramOptions{
+		NumBuckets:   2495,
+		GrowthFactor: .01,
+	}
+	mu    sync.Mutex
+	hists []*stats.Histogram
 )
 
-func unaryCaller(client testpb.BenchmarkServiceClient) {
-	benchmark.DoUnaryCall(client, 1, 1)
-}
-
-func streamCaller(stream testpb.BenchmarkService_StreamingCallClient) {
-	benchmark.DoStreamingRoundTrip(stream, 1, 1)
-}
-
-func buildConnection() (s *stats.Stats, conn *grpc.ClientConn, tc testpb.BenchmarkServiceClient) {
-	s = stats.NewStats(256)
-	conn = benchmark.NewClientConn(*server)
-	tc = testpb.NewBenchmarkServiceClient(conn)
-	return s, conn, tc
-}
-
-func closeLoopUnary() {
-	s, conn, tc := buildConnection()
-
-	for i := 0; i < 100; i++ {
-		unaryCaller(tc)
-	}
-	ch := make(chan int, *maxConcurrentRPCs*4)
-	var (
-		mu sync.Mutex
-		wg sync.WaitGroup
-	)
-	wg.Add(*maxConcurrentRPCs)
-
-	for i := 0; i < *maxConcurrentRPCs; i++ {
-		go func() {
-			for range ch {
-				start := time.Now()
-				unaryCaller(tc)
-				elapse := time.Since(start)
-				mu.Lock()
-				s.Add(elapse)
-				mu.Unlock()
-			}
-			wg.Done()
-		}()
-	}
-	// Stop the client when time is up.
-	done := make(chan struct{})
-	go func() {
-		<-time.After(time.Duration(*duration) * time.Second)
-		close(done)
-	}()
-	ok := true
-	for ok {
-		select {
-		case ch <- 0:
-		case <-done:
-			ok = false
-		}
-	}
-	close(ch)
-	wg.Wait()
-	conn.Close()
-	grpclog.Infoln(s.String())
-
-}
-
-func closeLoopStream() {
-	s, conn, tc := buildConnection()
-	ch := make(chan int, *maxConcurrentRPCs*4)
-	var (
-		mu sync.Mutex
-		wg sync.WaitGroup
-	)
-	wg.Add(*maxConcurrentRPCs)
-	// Distribute RPCs over maxConcurrentCalls workers.
-	for i := 0; i < *maxConcurrentRPCs; i++ {
-		go func() {
-			stream, err := tc.StreamingCall(context.Background())
-			if err != nil {
-				grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
-			}
-			// Do some warm up.
-			for i := 0; i < 100; i++ {
-				streamCaller(stream)
-			}
-			for range ch {
-				start := time.Now()
-				streamCaller(stream)
-				elapse := time.Since(start)
-				mu.Lock()
-				s.Add(elapse)
-				mu.Unlock()
-			}
-			wg.Done()
-		}()
-	}
-	// Stop the client when time is up.
-	done := make(chan struct{})
-	go func() {
-		<-time.After(time.Duration(*duration) * time.Second)
-		close(done)
-	}()
-	ok := true
-	for ok {
-		select {
-		case ch <- 0:
-		case <-done:
-			ok = false
-		}
-	}
-	close(ch)
-	wg.Wait()
-	conn.Close()
-	grpclog.Infoln(s.String())
-}
-
 func main() {
 	flag.Parse()
-	grpc.EnableTracing = *trace
-	go func() {
-		lis, err := net.Listen("tcp", ":0")
-		if err != nil {
-			grpclog.Fatalf("Failed to listen: %v", err)
-		}
-		grpclog.Infoln("Client profiling address: ", lis.Addr().String())
-		if err := http.Serve(lis, nil); err != nil {
-			grpclog.Fatalf("Failed to serve: %v", err)
-		}
-	}()
-	switch *rpcType {
-	case 0:
-		closeLoopUnary()
-	case 1:
-		closeLoopStream()
+	if *testName == "" {
+		grpclog.Fatalf("test_name not set")
+	}
+	req := &testpb.SimpleRequest{
+		ResponseType: testpb.PayloadType_COMPRESSABLE,
+		ResponseSize: int32(*rspSize),
+		Payload: &testpb.Payload{
+			Type: testpb.PayloadType_COMPRESSABLE,
+			Body: make([]byte, *rqSize),
+		},
+	}
+	connectCtx, connectCancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
+	defer connectCancel()
+	ccs := buildConnections(connectCtx)
+	warmDeadline := time.Now().Add(time.Duration(*warmupDur) * time.Second)
+	endDeadline := warmDeadline.Add(time.Duration(*duration) * time.Second)
+	cf, err := os.Create("/tmp/" + *testName + ".cpu")
+	if err != nil {
+		grpclog.Fatalf("Error creating file: %v", err)
+	}
+	defer cf.Close()
+	pprof.StartCPUProfile(cf)
+	cpuBeg := getCPUTime()
+	for _, cc := range ccs {
+		runWithConn(cc, req, warmDeadline, endDeadline)
+	}
+	wg.Wait()
+	cpu := time.Duration(getCPUTime() - cpuBeg)
+	pprof.StopCPUProfile()
+	mf, err := os.Create("/tmp/" + *testName + ".mem")
+	if err != nil {
+		grpclog.Fatalf("Error creating file: %v", err)
+	}
+	defer mf.Close()
+	runtime.GC() // materialize all statistics
+	if err := pprof.WriteHeapProfile(mf); err != nil {
+		grpclog.Fatalf("Error writing memory profile: %v", err)
+	}
+	hist := stats.NewHistogram(hopts)
+	for _, h := range hists {
+		hist.Merge(h)
+	}
+	parseHist(hist)
+	fmt.Println("Client CPU utilization:", cpu)
+	fmt.Println("Client CPU profile:", cf.Name())
+	fmt.Println("Client Mem Profile:", mf.Name())
+}
+
+func buildConnections(ctx context.Context) []*grpc.ClientConn {
+	ccs := make([]*grpc.ClientConn, *numConn)
+	for i := range ccs {
+		ccs[i] = benchmark.NewClientConnWithContext(ctx, "localhost:"+*port, grpc.WithInsecure(), grpc.WithBlock())
+	}
+	return ccs
+}
+
+func runWithConn(cc *grpc.ClientConn, req *testpb.SimpleRequest, warmDeadline, endDeadline time.Time) {
+	for i := 0; i < *numRPC; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			caller := makeCaller(cc, req)
+			hist := stats.NewHistogram(hopts)
+			for {
+				start := time.Now()
+				if start.After(endDeadline) {
+					mu.Lock()
+					hists = append(hists, hist)
+					mu.Unlock()
+					return
+				}
+				caller()
+				elapsed := time.Since(start)
+				if start.After(warmDeadline) {
+					hist.Add(elapsed.Nanoseconds())
+				}
+			}
+		}()
 	}
 }
+
+func makeCaller(cc *grpc.ClientConn, req *testpb.SimpleRequest) func() {
+	client := testpb.NewBenchmarkServiceClient(cc)
+	if *rpcType == "unary" {
+		return func() {
+			if _, err := client.UnaryCall(context.Background(), req); err != nil {
+				grpclog.Fatalf("RPC failed: %v", err)
+			}
+		}
+	}
+	stream, err := client.StreamingCall(context.Background())
+	if err != nil {
+		grpclog.Fatalf("RPC failed: %v", err)
+	}
+	return func() {
+		if err := stream.Send(req); err != nil {
+			grpclog.Fatalf("Streaming RPC failed to send: %v", err)
+		}
+		if _, err := stream.Recv(); err != nil {
+			grpclog.Fatalf("Streaming RPC failed to read: %v", err)
+		}
+	}
+}
+
+func parseHist(hist *stats.Histogram) {
+	fmt.Println("qps:", float64(hist.Count)/float64(*duration))
+	fmt.Printf("Latency: (50/90/99 %%ile): %v/%v/%v\n",
+		time.Duration(percentile(.5, hist)),
+		time.Duration(percentile(.9, hist)),
+		time.Duration(percentile(.99, hist)))
+}
+
+// percentile returns the latency below which the given fraction of measurements falls.
+func percentile(fraction float64, h *stats.Histogram) int64 {
+	need := int64(float64(h.Count) * fraction)
+	have := int64(0)
+	for _, bucket := range h.Buckets {
+		count := bucket.Count
+		if have+count >= need {
+			percent := float64(need-have) / float64(count)
+			return int64((1.0-percent)*bucket.LowBound + percent*bucket.LowBound*(1.0+hopts.GrowthFactor))
+		}
+		have += count
+	}
+	panic("should have found a bound")
+}
+
+func getCPUTime() int64 {
+	var ts unix.Timespec
+	if err := unix.ClockGettime(unix.CLOCK_PROCESS_CPUTIME_ID, &ts); err != nil {
+		grpclog.Fatal(err)
+	}
+	return ts.Nano()
+}
diff --git a/benchmark/run_bench.sh b/benchmark/run_bench.sh
new file mode 100755
index 00000000..045de160
--- /dev/null
+++ b/benchmark/run_bench.sh
@@ -0,0 +1,186 @@
+#!/bin/bash
+
+rpcs=(1)
+conns=(1)
+warmup=10
+dur=10
+reqs=(1)
+resps=(1)
+rpc_types=(unary)
+
+# idx[0] = idx value for rpcs
+# idx[1] = idx value for conns
+# idx[2] = idx value for reqs
+# idx[3] = idx value for resps
+# idx[4] = idx value for rpc_types
+idx=(0 0 0 0 0)
+idx_max=(1 1 1 1 1)
+
+inc()
+{
+  for i in $(seq $((${#idx[@]}-1)) -1 0); do
+    idx[${i}]=$((${idx[${i}]}+1))
+    if [ ${idx[${i}]} == ${idx_max[${i}]} ]; then
+      idx[${i}]=0
+    else
+      break
+    fi
+  done
+  local fin
+  fin=1
+  # Check to see if we have looped back to the beginning.
+  for v in ${idx[@]}; do
+    if [ ${v} != 0 ]; then
+      fin=0
+      break
+    fi
+  done
+  if [ ${fin} == 1 ]; then
+    clean_and_die 0
+  fi
+}
+
+clean_and_die() {
+  rm -Rf ${out_dir}
+  exit $1
+}
+
+run(){
+  local nr
+  nr=${rpcs[${idx[0]}]}
+  local nc
+  nc=${conns[${idx[1]}]}
+  req_sz=${reqs[${idx[2]}]}
+  resp_sz=${resps[${idx[3]}]}
+  r_type=${rpc_types[${idx[4]}]}
+  # Run one benchmark with the current parameter combination.
+  base_port=50051
+  delta=0
+  test_name="r_"${nr}"_c_"${nc}"_req_"${req_sz}"_resp_"${resp_sz}"_"${r_type}"_"$(date +%s)
+  echo "================================================================================"
+  echo ${test_name}
+  while :
+  do
+    port=$((${base_port}+${delta}))
+
+    # Launch the server in the background.
+    ${out_dir}/server --port=${port} --test_name="Server_"${test_name} &
+    server_pid=$!
+
+    # Launch the client.
+    ${out_dir}/client --port=${port} --d=${dur} --w=${warmup} --r=${nr} --c=${nc} --req=${req_sz} --resp=${resp_sz} --rpc_type=${r_type} --test_name="client_"${test_name}
+    client_status=$?
+
+    kill ${server_pid}
+    wait ${server_pid}
+
+    if [ ${client_status} == 0 ]; then
+      break
+    fi
+
+    delta=$((${delta}+1))
+    if [ ${delta} == 10 ]; then
+      echo "10 consecutive failed runs. Exiting now."
+      clean_and_die 1
+    fi
+  done
+
+}
+
+set_param(){
+  local argname=$1
+  shift
+  local idx=$1
+  shift
+  if [ $# -eq 0 ]; then
+    echo "${argname} not specified"
+    exit 1
+  fi
+  PARAM=($(echo $1 | sed 's/,/ /g'))
+  if [ ${idx} -lt 0 ]; then
+    return
+  fi
+  idx_max[${idx}]=${#PARAM[@]}
+}
+
+while [ $# -gt 0 ]; do
+  case "$1" in
+    -r)
+      shift
+      set_param "number of rpcs" 0 $1
+      rpcs=(${PARAM[@]})
+      shift
+      ;;
+    -c)
+      shift
+      set_param "number of connections" 1 $1
+      conns=(${PARAM[@]})
+      shift
+      ;;
+    -w)
+      shift
+      set_param "warm-up period" -1 $1
+      warmup=${PARAM}
+      shift
+      ;;
+    -d)
+      shift
+      set_param "duration" -1 $1
+      dur=${PARAM}
+      shift
+      ;;
+    -req)
+      shift
+      set_param "request size" 2 $1
+      reqs=(${PARAM[@]})
+      shift
+      ;;
+    -resp)
+      shift
+      set_param "response size" 3 $1
+      resps=(${PARAM[@]})
+      shift
+      ;;
+    -rpc_type)
+      shift
+      set_param "rpc type" 4 $1
+      rpc_types=(${PARAM[@]})
+      shift
+      ;;
+    -h|--help)
+      echo "Following are valid options:"
+      echo
+      echo "-h, --help show brief help"
+      echo "-w         warm-up duration in seconds, default value is 10"
+      echo "-d         benchmark duration in seconds, default value is 10"
+      echo ""
+      echo "Each of the following can have multiple comma-separated values."
+      echo ""
+      echo "-r         number of RPCs, default value is 1"
+      echo "-c         number of connections, default value is 1"
+      echo "-req       request size in bytes, default value is 1"
+      echo "-resp      response size in bytes, default value is 1"
+      echo "-rpc_type  valid values are unary|streaming, default is unary"
+      exit 0
+      ;;
+    *)
+      echo "Incorrect option $1"
+      exit 1
+      ;;
+  esac
+done
+
+# Build the server and client binaries.
+out_dir=$(mktemp -d oss_benchXXX)
+
+go build -o ${out_dir}/server $GOPATH/src/google.golang.org/grpc/benchmark/server/main.go && go build -o ${out_dir}/client $GOPATH/src/google.golang.org/grpc/benchmark/client/main.go
+if [ $? != 0 ]; then
+  clean_and_die 1
+fi
+
+
+while :
+do
+  run
+  inc
+done
diff --git a/benchmark/server/main.go b/benchmark/server/main.go
index 3fd117a9..253657e0 100644
--- a/benchmark/server/main.go
+++ b/benchmark/server/main.go
@@ -20,37 +20,71 @@ package main
 
 import (
 	"flag"
-	"math"
+	"fmt"
 	"net"
-	"net/http"
 	_ "net/http/pprof"
+	"os"
+	"os/signal"
+	"runtime"
+	"runtime/pprof"
+	"syscall"
 	"time"
 
+	"golang.org/x/sys/unix"
 	"google.golang.org/grpc/benchmark"
 	"google.golang.org/grpc/grpclog"
 )
 
-var duration = flag.Int("duration", math.MaxInt32, "The duration in seconds to run the benchmark server")
+var (
+	port     = flag.String("port", "50051", "Localhost port to listen on.")
+	testName = flag.String("test_name", "", "Name of the test used for creating profiles.")
+)
 
 func main() {
 	flag.Parse()
-	go func() {
-		lis, err := net.Listen("tcp", ":0")
-		if err != nil {
-			grpclog.Fatalf("Failed to listen: %v", err)
-		}
-		grpclog.Infoln("Server profiling address: ", lis.Addr().String())
-		if err := http.Serve(lis, nil); err != nil {
-			grpclog.Fatalf("Failed to serve: %v", err)
-		}
-	}()
-	lis, err := net.Listen("tcp", ":0")
+	if *testName == "" {
+		grpclog.Fatalf("test_name not set")
+	}
+	lis, err := net.Listen("tcp", ":"+*port)
 	if err != nil {
 		grpclog.Fatalf("Failed to listen: %v", err)
 	}
-	addr := lis.Addr().String()
-	stopper := benchmark.StartServer(benchmark.ServerInfo{Type: "protobuf", Listener: lis}) // listen on all interfaces
-	grpclog.Infoln("Server Address: ", addr)
-	<-time.After(time.Duration(*duration) * time.Second)
-	stopper()
+	defer lis.Close()
+
+	cf, err := os.Create("/tmp/" + *testName + ".cpu")
+	if err != nil {
+		grpclog.Fatalf("Failed to create file: %v", err)
+	}
+	defer cf.Close()
+	pprof.StartCPUProfile(cf)
+	cpuBeg := getCPUTime()
+	// Launch the server in a separate goroutine.
+	stop := benchmark.StartServer(benchmark.ServerInfo{Type: "protobuf", Listener: lis})
+	// Wait for an OS terminate signal.
+	ch := make(chan os.Signal, 1)
+	signal.Notify(ch, syscall.SIGTERM)
+	<-ch
+	cpu := time.Duration(getCPUTime() - cpuBeg)
+	stop()
+	pprof.StopCPUProfile()
+	mf, err := os.Create("/tmp/" + *testName + ".mem")
+	if err != nil {
+		grpclog.Fatalf("Failed to create file: %v", err)
+	}
+	defer mf.Close()
+	runtime.GC() // materialize all statistics
+	if err := pprof.WriteHeapProfile(mf); err != nil {
+		grpclog.Fatalf("Failed to write memory profile: %v", err)
+	}
+	fmt.Println("Server CPU utilization:", cpu)
+	fmt.Println("Server CPU profile:", cf.Name())
+	fmt.Println("Server Mem Profile:", mf.Name())
+}
+
+func getCPUTime() int64 {
+	var ts unix.Timespec
+	if err := unix.ClockGettime(unix.CLOCK_PROCESS_CPUTIME_ID, &ts); err != nil {
+		grpclog.Fatal(err)
+	}
+	return ts.Nano()
+}
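
Usage sketch (illustrative note, not part of the patch; the flag names and output paths are the ones defined in run_bench.sh and the two main.go files above, and the GOPATH checkout location is the one the script's go build lines already assume):

    # From the grpc-go repo root: 10s warm-up and 30s measurement per run,
    # sweeping 1 and 10 concurrent RPCs per connection over unary and
    # streaming calls (4 server/client runs in total).
    ./benchmark/run_bench.sh -w 10 -d 30 -r 1,10 -rpc_type unary,streaming

    # Each run prints qps, 50/90/99th-percentile latency, and CPU usage, and
    # leaves CPU and memory profiles in /tmp/<test_name>.cpu and
    # /tmp/<test_name>.mem for both processes; open them with `go tool pprof`.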