Add close benchmark rpc calls

Author: Menghan Li
Date:   2016-04-18 18:30:38 -07:00
parent 2aaff82a6e
commit 257710d39c


@@ -6,6 +6,7 @@ import (
     "sync"
     "time"
+    "golang.org/x/net/context"
     "google.golang.org/grpc"
     "google.golang.org/grpc/benchmark"
     testpb "google.golang.org/grpc/benchmark/grpc_testing"
@@ -108,24 +109,10 @@ func startBenchmarkClientWithSetup(setup *testpb.ClientConfig) (*benchmarkClient
         MinValue: 0,
     })
-    grpclog.Printf(" - rpc type: %v", setup.RpcType)
-    var rpc func(testpb.BenchmarkServiceClient)
-    switch setup.RpcType {
-    case testpb.RpcType_UNARY:
-        rpc = func(client testpb.BenchmarkServiceClient) {
-            benchmark.DoUnaryCall(client, payloadReqSize, payloadRespSize)
-        }
-    case testpb.RpcType_STREAMING:
-    default:
-        return nil, grpc.Errorf(codes.InvalidArgument, "unknown rpc type: %v", setup.RpcType)
-    }
     grpclog.Printf(" - load params: %v", setup.LoadParams)
-    bc.stop = make(chan bool)
     switch lp := setup.LoadParams.Load.(type) {
     case *testpb.LoadParams_ClosedLoop:
         grpclog.Printf(" - %v", lp.ClosedLoop)
-        doCloseLoop(bc.histogram, bc.conns, rpcCount, rpc, bc.stop)
     case *testpb.LoadParams_Poisson:
         grpclog.Printf(" - %v", lp.Poisson)
     case *testpb.LoadParams_Uniform:
@@ -138,18 +125,29 @@ func startBenchmarkClientWithSetup(setup *testpb.ClientConfig) (*benchmarkClient
         return nil, grpc.Errorf(codes.InvalidArgument, "unknown load params: %v", setup.LoadParams)
     }
+    grpclog.Printf(" - rpc type: %v", setup.RpcType)
+    bc.stop = make(chan bool)
+    switch setup.RpcType {
+    case testpb.RpcType_UNARY:
+        doCloseLoopUnaryBenchmark(bc.histogram, bc.conns, rpcCount, payloadReqSize, payloadRespSize, bc.stop)
+    case testpb.RpcType_STREAMING:
+        doCloseLoopStreamingBenchmark(bc.histogram, bc.conns, rpcCount, payloadReqSize, payloadRespSize, bc.stop)
+    default:
+        return nil, grpc.Errorf(codes.InvalidArgument, "unknown rpc type: %v", setup.RpcType)
+    }
     bc.mu.Lock()
     defer bc.mu.Unlock()
     bc.lastResetTime = time.Now()
     return bc, nil
 }
 
-func doCloseLoop(h *stats.Histogram, conns []*grpc.ClientConn, rpcCount int, rpc func(testpb.BenchmarkServiceClient), stop <-chan bool) {
+func doCloseLoopUnaryBenchmark(h *stats.Histogram, conns []*grpc.ClientConn, rpcCount int, reqSize int, respSize int, stop <-chan bool) {
     clients := make([]testpb.BenchmarkServiceClient, len(conns))
     for ic, conn := range conns {
         clients[ic] = testpb.NewBenchmarkServiceClient(conn)
         for j := 0; j < 100/len(conns); j++ {
-            rpc(clients[ic])
+            benchmark.DoUnaryCall(clients[ic], reqSize, respSize)
         }
     }
     var mu sync.Mutex
@@ -157,19 +155,62 @@ func doCloseLoop(h *stats.Histogram, conns []*grpc.ClientConn, rpcCount int, rpc
         for j := 0; j < rpcCount; j++ {
             go func() {
                 for {
+                    done := make(chan bool)
+                    go func() {
+                        start := time.Now()
+                        benchmark.DoUnaryCall(clients[ic], reqSize, respSize)
+                        elapse := time.Since(start)
+                        mu.Lock()
+                        h.Add(int64(elapse / time.Nanosecond))
+                        mu.Unlock()
+                        done <- true
+                    }()
                     select {
                     case <-stop:
                         grpclog.Printf("stopped")
                         return
-                    default:
+                    case <-done:
                     }
-                    start := time.Now()
-                    rpc(clients[ic])
-                    elapse := time.Since(start)
-                    go func() {
-                        mu.Lock()
-                        h.Add(int64(elapse / time.Nanosecond))
-                        mu.Unlock()
-                    }()
                 }
             }()
         }
     }
     grpclog.Printf("close loop done, count: %v", rpcCount)
 }
+
+func doCloseLoopStreamingBenchmark(h *stats.Histogram, conns []*grpc.ClientConn, rpcCount int, reqSize int, respSize int, stop <-chan bool) {
+    streams := make([]testpb.BenchmarkService_StreamingCallClient, len(conns))
+    for ic, conn := range conns {
+        c := testpb.NewBenchmarkServiceClient(conn)
+        s, err := c.StreamingCall(context.Background())
+        if err != nil {
+            grpclog.Fatalf("%v.StreamingCall(_) = _, %v", c, err)
+        }
+        streams[ic] = s
+        for j := 0; j < 100/len(conns); j++ {
+            benchmark.DoStreamingRoundTrip(streams[ic], reqSize, respSize)
+        }
+    }
+    var mu sync.Mutex
+    for ic, _ := range conns {
+        for j := 0; j < rpcCount; j++ {
+            go func() {
+                for {
+                    done := make(chan bool)
+                    go func() {
+                        start := time.Now()
+                        benchmark.DoStreamingRoundTrip(streams[ic], reqSize, respSize)
+                        elapse := time.Since(start)
+                        mu.Lock()
+                        h.Add(int64(elapse / time.Nanosecond))
+                        mu.Unlock()
+                        done <- true
+                    }()
+                    select {
+                    case <-stop:
+                        grpclog.Printf("stopped")
+                        return
+                    case <-done:
+                    }
+                }
+            }()
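
Both new helpers follow the same closed-loop pattern: each worker goroutine runs one timed RPC at a time in a child goroutine, records the latency in the shared histogram under a mutex, and then waits on either the benchmark's stop channel or the per-call done channel. The sketch below is a minimal, self-contained illustration of that pattern, not the code from this commit: doCloseLoop, call, workers, and the latency slice are placeholder names standing in for doCloseLoopUnaryBenchmark/doCloseLoopStreamingBenchmark, benchmark.DoUnaryCall/benchmark.DoStreamingRoundTrip, rpcCount, and stats.Histogram, and the drain-on-stop step is a simplification not present in the original.

package main

import (
    "fmt"
    "sync"
    "time"
)

// doCloseLoop is a simplified stand-in for the two benchmark helpers:
// call represents one RPC, and the latency slice replaces the histogram.
func doCloseLoop(call func(), workers int, stop <-chan bool) []time.Duration {
    var (
        mu        sync.Mutex
        latencies []time.Duration
        wg        sync.WaitGroup
    )
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                // Run one timed call in a child goroutine so the worker
                // can notice stop while a call is still in flight.
                done := make(chan bool)
                go func() {
                    start := time.Now()
                    call()
                    elapse := time.Since(start)
                    mu.Lock()
                    latencies = append(latencies, elapse)
                    mu.Unlock()
                    done <- true
                }()
                select {
                case <-stop:
                    // Simplification: let the in-flight call finish so its
                    // goroutine is not left blocked on the done channel.
                    <-done
                    return
                case <-done:
                }
            }
        }()
    }
    wg.Wait()
    return latencies
}

func main() {
    stop := make(chan bool)
    go func() {
        time.Sleep(50 * time.Millisecond)
        close(stop) // signal all workers to quit
    }()
    // A fake "RPC" that just sleeps; the real helpers call into gRPC here.
    latencies := doCloseLoop(func() { time.Sleep(time.Millisecond) }, 4, stop)
    fmt.Printf("recorded %d calls\n", len(latencies))
}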