Benchmarks that runs server and client and separate processes. (#1952)

This commit is contained in:
mmukhi
2018-05-21 16:00:01 -07:00
committed by GitHub
parent 8f06f82ca3
commit dea4e57312
4 changed files with 400 additions and 159 deletions

View File

@ -226,9 +226,14 @@ func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallCli
// NewClientConn creates a gRPC client connection to addr.
func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
return NewClientConnWithContext(context.Background(), addr, opts...)
}
// NewClientConnWithContext creates a gRPC client connection to addr using ctx.
func NewClientConnWithContext(ctx context.Context, addr string, opts ...grpc.DialOption) *grpc.ClientConn {
opts = append(opts, grpc.WithWriteBufferSize(128*1024))
opts = append(opts, grpc.WithReadBufferSize(128*1024))
conn, err := grpc.Dial(addr, opts...)
conn, err := grpc.DialContext(ctx, addr, opts...)
if err != nil {
grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err)
}

View File

@ -20,14 +20,15 @@ package main
import (
"flag"
"math"
"net"
"net/http"
_ "net/http/pprof"
"fmt"
"os"
"runtime"
"runtime/pprof"
"sync"
"time"
"golang.org/x/net/context"
"golang.org/x/sys/unix"
"google.golang.org/grpc"
"google.golang.org/grpc/benchmark"
testpb "google.golang.org/grpc/benchmark/grpc_testing"
@ -36,145 +37,159 @@ import (
)
var (
server = flag.String("server", "", "The server address")
maxConcurrentRPCs = flag.Int("max_concurrent_rpcs", 1, "The max number of concurrent RPCs")
duration = flag.Int("duration", math.MaxInt32, "The duration in seconds to run the benchmark client")
trace = flag.Bool("trace", true, "Whether tracing is on")
rpcType = flag.Int("rpc_type", 0,
port = flag.String("port", "50051", "Localhost port to connect to.")
numRPC = flag.Int("r", 1, "The number of concurrent RPCs on each connection.")
numConn = flag.Int("c", 1, "The number of parallel connections.")
warmupDur = flag.Int("w", 10, "Warm-up duration in seconds")
duration = flag.Int("d", 60, "Benchmark duration in seconds")
rqSize = flag.Int("req", 1, "Request message size in bytes.")
rspSize = flag.Int("resp", 1, "Response message size in bytes.")
rpcType = flag.String("rpc_type", "unary",
`Configure different client rpc type. Valid options are:
0 : unary call;
1 : streaming call.`)
unary;
streaming.`)
testName = flag.String("test_name", "", "Name of the test used for creating profiles.")
wg sync.WaitGroup
hopts = stats.HistogramOptions{
NumBuckets: 2495,
GrowthFactor: .01,
}
mu sync.Mutex
hists []*stats.Histogram
)
func unaryCaller(client testpb.BenchmarkServiceClient) {
benchmark.DoUnaryCall(client, 1, 1)
}
func streamCaller(stream testpb.BenchmarkService_StreamingCallClient) {
benchmark.DoStreamingRoundTrip(stream, 1, 1)
}
func buildConnection() (s *stats.Stats, conn *grpc.ClientConn, tc testpb.BenchmarkServiceClient) {
s = stats.NewStats(256)
conn = benchmark.NewClientConn(*server)
tc = testpb.NewBenchmarkServiceClient(conn)
return s, conn, tc
}
func closeLoopUnary() {
s, conn, tc := buildConnection()
for i := 0; i < 100; i++ {
unaryCaller(tc)
}
ch := make(chan int, *maxConcurrentRPCs*4)
var (
mu sync.Mutex
wg sync.WaitGroup
)
wg.Add(*maxConcurrentRPCs)
for i := 0; i < *maxConcurrentRPCs; i++ {
go func() {
for range ch {
start := time.Now()
unaryCaller(tc)
elapse := time.Since(start)
mu.Lock()
s.Add(elapse)
mu.Unlock()
}
wg.Done()
}()
}
// Stop the client when time is up.
done := make(chan struct{})
go func() {
<-time.After(time.Duration(*duration) * time.Second)
close(done)
}()
ok := true
for ok {
select {
case ch <- 0:
case <-done:
ok = false
}
}
close(ch)
wg.Wait()
conn.Close()
grpclog.Infoln(s.String())
}
func closeLoopStream() {
s, conn, tc := buildConnection()
ch := make(chan int, *maxConcurrentRPCs*4)
var (
mu sync.Mutex
wg sync.WaitGroup
)
wg.Add(*maxConcurrentRPCs)
// Distribute RPCs over maxConcurrentCalls workers.
for i := 0; i < *maxConcurrentRPCs; i++ {
go func() {
stream, err := tc.StreamingCall(context.Background())
if err != nil {
grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
}
// Do some warm up.
for i := 0; i < 100; i++ {
streamCaller(stream)
}
for range ch {
start := time.Now()
streamCaller(stream)
elapse := time.Since(start)
mu.Lock()
s.Add(elapse)
mu.Unlock()
}
wg.Done()
}()
}
// Stop the client when time is up.
done := make(chan struct{})
go func() {
<-time.After(time.Duration(*duration) * time.Second)
close(done)
}()
ok := true
for ok {
select {
case ch <- 0:
case <-done:
ok = false
}
}
close(ch)
wg.Wait()
conn.Close()
grpclog.Infoln(s.String())
}
func main() {
flag.Parse()
grpc.EnableTracing = *trace
go func() {
lis, err := net.Listen("tcp", ":0")
if err != nil {
grpclog.Fatalf("Failed to listen: %v", err)
}
grpclog.Infoln("Client profiling address: ", lis.Addr().String())
if err := http.Serve(lis, nil); err != nil {
grpclog.Fatalf("Failed to serve: %v", err)
}
}()
switch *rpcType {
case 0:
closeLoopUnary()
case 1:
closeLoopStream()
if *testName == "" {
grpclog.Fatalf("test_name not set")
}
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: int32(*rspSize),
Payload: &testpb.Payload{
Type: testpb.PayloadType_COMPRESSABLE,
Body: make([]byte, *rqSize),
},
}
connectCtx, connectCancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
defer connectCancel()
ccs := buildConnections(connectCtx)
warmDeadline := time.Now().Add(time.Duration(*warmupDur) * time.Second)
endDeadline := warmDeadline.Add(time.Duration(*duration) * time.Second)
cf, err := os.Create("/tmp/" + *testName + ".cpu")
if err != nil {
grpclog.Fatalf("Error creating file: %v", err)
}
defer cf.Close()
pprof.StartCPUProfile(cf)
cpuBeg := getCPUTime()
for _, cc := range ccs {
runWithConn(cc, req, warmDeadline, endDeadline)
}
wg.Wait()
cpu := time.Duration(getCPUTime() - cpuBeg)
pprof.StopCPUProfile()
mf, err := os.Create("/tmp/" + *testName + ".mem")
if err != nil {
grpclog.Fatalf("Error creating file: %v", err)
}
defer mf.Close()
runtime.GC() // materialize all statistics
if err := pprof.WriteHeapProfile(mf); err != nil {
grpclog.Fatalf("Error writing memory profile: %v", err)
}
hist := stats.NewHistogram(hopts)
for _, h := range hists {
hist.Merge(h)
}
parseHist(hist)
fmt.Println("Client CPU utilization:", cpu)
fmt.Println("Client CPU profile:", cf.Name())
fmt.Println("Client Mem Profile:", mf.Name())
}
func buildConnections(ctx context.Context) []*grpc.ClientConn {
ccs := make([]*grpc.ClientConn, *numConn)
for i := range ccs {
ccs[i] = benchmark.NewClientConnWithContext(ctx, "localhost:"+*port, grpc.WithInsecure(), grpc.WithBlock())
}
return ccs
}
func runWithConn(cc *grpc.ClientConn, req *testpb.SimpleRequest, warmDeadline, endDeadline time.Time) {
for i := 0; i < *numRPC; i++ {
wg.Add(1)
go func() {
defer wg.Done()
caller := makeCaller(cc, req)
hist := stats.NewHistogram(hopts)
for {
start := time.Now()
if start.After(endDeadline) {
mu.Lock()
hists = append(hists, hist)
mu.Unlock()
return
}
caller()
elapsed := time.Since(start)
if start.After(warmDeadline) {
hist.Add(elapsed.Nanoseconds())
}
}
}()
}
}
func makeCaller(cc *grpc.ClientConn, req *testpb.SimpleRequest) func() {
client := testpb.NewBenchmarkServiceClient(cc)
if *rpcType == "unary" {
return func() {
if _, err := client.UnaryCall(context.Background(), req); err != nil {
grpclog.Fatalf("RPC failed: %v", err)
}
}
}
stream, err := client.StreamingCall(context.Background())
if err != nil {
grpclog.Fatalf("RPC failed: %v", err)
}
return func() {
if err := stream.Send(req); err != nil {
grpclog.Fatalf("Streaming RPC failed to send: %v", err)
}
if _, err := stream.Recv(); err != nil {
grpclog.Fatalf("Streaming RPC failed to read: %v", err)
}
}
}
func parseHist(hist *stats.Histogram) {
fmt.Println("qps:", float64(hist.Count)/float64(*duration))
fmt.Printf("Latency: (50/90/99 %%ile): %v/%v/%v\n",
time.Duration(median(.5, hist)),
time.Duration(median(.9, hist)),
time.Duration(median(.99, hist)))
}
func median(percentile float64, h *stats.Histogram) int64 {
need := int64(float64(h.Count) * percentile)
have := int64(0)
for _, bucket := range h.Buckets {
count := bucket.Count
if have+count >= need {
percent := float64(need-have) / float64(count)
return int64((1.0-percent)*bucket.LowBound + percent*bucket.LowBound*(1.0+hopts.GrowthFactor))
}
have += bucket.Count
}
panic("should have found a bound")
}
func getCPUTime() int64 {
var ts unix.Timespec
if err := unix.ClockGettime(unix.CLOCK_PROCESS_CPUTIME_ID, &ts); err != nil {
grpclog.Fatal(err)
}
return ts.Nano()
}

187
benchmark/run_bench.sh Executable file
View File

@ -0,0 +1,187 @@
#!/bin/bash
rpcs=(1)
conns=(1)
warmup=10
dur=10
reqs=(1)
resps=(1)
rpc_types=(unary)
# idx[0] = idx value for rpcs
# idx[1] = idx value for conns
# idx[2] = idx value for reqs
# idx[3] = idx value for resps
# idx[4] = idx value for rpc_types
idx=(0 0 0 0 0)
idx_max=(1 1 1 1 1)
inc()
{
for i in $(seq $((${#idx[@]}-1)) -1 0); do
idx[${i}]=$((${idx[${i}]}+1))
if [ ${idx[${i}]} == ${idx_max[${i}]} ]; then
idx[${i}]=0
else
break
fi
done
local fin
fin=1
# Check to see if we have looped back to the beginning.
for v in ${idx[@]}; do
if [ ${v} != 0 ]; then
fin=0
break
fi
done
if [ ${fin} == 1 ]; then
rm -Rf ${out_dir}
clean_and_die 0
fi
}
clean_and_die() {
rm -Rf ${out_dir}
exit $1
}
run(){
local nr
nr=${rpcs[${idx[0]}]}
local nc
nc=${conns[${idx[1]}]}
req_sz=${reqs[${idx[2]}]}
resp_sz=${resps[${idx[3]}]}
r_type=${rpc_types[${idx[4]}]}
# Following runs one benchmark
base_port=50051
delta=0
test_name="r_"${nr}"_c_"${nc}"_req_"${req_sz}"_resp_"${resp_sz}"_"${r_type}"_"$(date +%s)
echo "================================================================================"
echo ${test_name}
while :
do
port=$((${base_port}+${delta}))
# Launch the server in background
${out_dir}/server --port=${port} --test_name="Server_"${test_name}&
server_pid=$(echo $!)
# Launch the client
${out_dir}/client --port=${port} --d=${dur} --w=${warmup} --r=${nr} --c=${nc} --req=${req_sz} --resp=${resp_sz} --rpc_type=${r_type} --test_name="client_"${test_name}
client_status=$(echo $?)
kill ${server_pid}
wait ${server_pid}
if [ ${client_status} == 0 ]; then
break
fi
delta=$((${delta}+1))
if [ ${delta} == 10 ]; then
echo "Continuous 10 failed runs. Exiting now."
rm -Rf ${out_dir}
clean_and_die 1
fi
done
}
set_param(){
local argname=$1
shift
local idx=$1
shift
if [ $# -eq 0 ]; then
echo "${argname} not specified"
exit 1
fi
PARAM=($(echo $1 | sed 's/,/ /g'))
if [ ${idx} -lt 0 ]; then
return
fi
idx_max[${idx}]=${#PARAM[@]}
}
while [ $# -gt 0 ]; do
case "$1" in
-r)
shift
set_param "number of rpcs" 0 $1
rpcs=(${PARAM[@]})
shift
;;
-c)
shift
set_param "number of connections" 1 $1
conns=(${PARAM[@]})
shift
;;
-w)
shift
set_param "warm-up period" -1 $1
warmup=${PARAM}
shift
;;
-d)
shift
set_param "duration" -1 $1
dur=${PARAM}
shift
;;
-req)
shift
set_param "request size" 2 $1
reqs=(${PARAM[@]})
shift
;;
-resp)
shift
set_param "response size" 3 $1
resps=(${PARAM[@]})
shift
;;
-rpc_type)
shift
set_param "rpc type" 4 $1
rpc_types=(${PARAM[@]})
shift
;;
-h|--help)
echo "Following are valid options:"
echo
echo "-h, --help show brief help"
echo "-w warm-up duration in seconds, default value is 10"
echo "-d benchmark duration in seconds, default value is 60"
echo ""
echo "Each of the following can have multiple comma separated values."
echo ""
echo "-r number of RPCs, default value is 1"
echo "-c number of Connections, default value is 1"
echo "-req req size in bytes, default value is 1"
echo "-resp resp size in bytes, default value is 1"
echo "-rpc_type valid values are unary|streaming, default is unary"
;;
*)
echo "Incorrect option $1"
exit 1
;;
esac
done
# Build server and client
out_dir=$(mktemp -d oss_benchXXX)
go build -o ${out_dir}/server $GOPATH/src/google.golang.org/grpc/benchmark/server/main.go && go build -o ${out_dir}/client $GOPATH/src/google.golang.org/grpc/benchmark/client/main.go
if [ $? != 0 ]; then
clean_and_die 1
fi
while :
do
run
inc
done

View File

@ -20,37 +20,71 @@ package main
import (
"flag"
"math"
"fmt"
"net"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"runtime"
"runtime/pprof"
"syscall"
"time"
"golang.org/x/sys/unix"
"google.golang.org/grpc/benchmark"
"google.golang.org/grpc/grpclog"
)
var duration = flag.Int("duration", math.MaxInt32, "The duration in seconds to run the benchmark server")
var (
port = flag.String("port", "50051", "Localhost port to listen on.")
testName = flag.String("test_name", "", "Name of the test used for creating profiles.")
)
func main() {
flag.Parse()
go func() {
lis, err := net.Listen("tcp", ":0")
if err != nil {
grpclog.Fatalf("Failed to listen: %v", err)
}
grpclog.Infoln("Server profiling address: ", lis.Addr().String())
if err := http.Serve(lis, nil); err != nil {
grpclog.Fatalf("Failed to serve: %v", err)
}
}()
lis, err := net.Listen("tcp", ":0")
if *testName == "" {
grpclog.Fatalf("test name not set")
}
lis, err := net.Listen("tcp", ":"+*port)
if err != nil {
grpclog.Fatalf("Failed to listen: %v", err)
}
addr := lis.Addr().String()
stopper := benchmark.StartServer(benchmark.ServerInfo{Type: "protobuf", Listener: lis}) // listen on all interfaces
grpclog.Infoln("Server Address: ", addr)
<-time.After(time.Duration(*duration) * time.Second)
stopper()
defer lis.Close()
cf, err := os.Create("/tmp/" + *testName + ".cpu")
if err != nil {
grpclog.Fatalf("Failed to create file: %v", err)
}
defer cf.Close()
pprof.StartCPUProfile(cf)
cpuBeg := getCPUTime()
// Launch server in a separate goroutine.
stop := benchmark.StartServer(benchmark.ServerInfo{Type: "protobuf", Listener: lis})
// Wait on OS terminate signal.
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM)
<-ch
cpu := time.Duration(getCPUTime() - cpuBeg)
stop()
pprof.StopCPUProfile()
mf, err := os.Create("/tmp/" + *testName + ".mem")
if err != nil {
grpclog.Fatalf("Failed to create file: %v", err)
}
defer mf.Close()
runtime.GC() // materialize all statistics
if err := pprof.WriteHeapProfile(mf); err != nil {
grpclog.Fatalf("Failed to write memory profile: %v", err)
}
fmt.Println("Server CPU utilization:", cpu)
fmt.Println("Server CPU profile:", cf.Name())
fmt.Println("Server Mem Profile:", mf.Name())
}
func getCPUTime() int64 {
var ts unix.Timespec
if err := unix.ClockGettime(unix.CLOCK_PROCESS_CPUTIME_ID, &ts); err != nil {
grpclog.Fatal(err)
}
return ts.Nano()
}