From 46fd263cc8f290fe631163b8dbe7c28d5b300310 Mon Sep 17 00:00:00 2001 From: dfawley Date: Thu, 15 Feb 2018 14:52:38 -0800 Subject: [PATCH] benchmarks: add flag to benchmain to use bufconn instead of network (#1837) This will allow us to focus on grpc internals in profiling by excluding the cost of syscalls, which is a significant part of our time spent. --- benchmark/benchmain/main.go | 60 +++++++++++++++++++++++----- benchmark/benchmark.go | 39 +++++++++--------- benchmark/server/main.go | 7 +++- benchmark/worker/benchmark_server.go | 29 ++++++++------ 4 files changed, 92 insertions(+), 43 deletions(-) diff --git a/benchmark/benchmain/main.go b/benchmark/benchmain/main.go index 0cc1f25e..d9c63aea 100644 --- a/benchmark/benchmain/main.go +++ b/benchmark/benchmain/main.go @@ -66,6 +66,7 @@ import ( "google.golang.org/grpc/benchmark/latency" "google.golang.org/grpc/benchmark/stats" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/test/bufconn" ) const ( @@ -137,13 +138,31 @@ func makeFuncUnary(benchFeatures stats.Features) (func(int), func()) { ) } sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1))) - opts = append(opts, grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) { - return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout) - })) opts = append(opts, grpc.WithInsecure()) - target, stopper := bm.StartServer(bm.ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, sopts...) - conn := bm.NewClientConn(target, opts...) + var lis net.Listener + if *useBufconn { + bcLis := bufconn.Listen(256 * 1024) + lis = bcLis + opts = append(opts, grpc.WithDialer(func(string, time.Duration) (net.Conn, error) { + return nw.TimeoutDialer( + func(string, string, time.Duration) (net.Conn, error) { + return bcLis.Dial() + })("", "", 0) + })) + } else { + var err error + lis, err = net.Listen("tcp", "localhost:0") + if err != nil { + grpclog.Fatalf("Failed to listen: %v", err) + } + opts = append(opts, grpc.WithDialer(func(_ string, timeout time.Duration) (net.Conn, error) { + return nw.TimeoutDialer(net.DialTimeout)("tcp", lis.Addr().String(), timeout) + })) + } + lis = nw.Listener(lis) + stopper := bm.StartServer(bm.ServerInfo{Type: "protobuf", Listener: lis}, sopts...) + conn := bm.NewClientConn("" /* target not used */, opts...) tc := testpb.NewBenchmarkServiceClient(conn) return func(int) { unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) @@ -154,6 +173,7 @@ func makeFuncUnary(benchFeatures stats.Features) (func(int), func()) { } func makeFuncStream(benchFeatures stats.Features) (func(int), func()) { + // TODO: Refactor to remove duplication with makeFuncUnary. nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu} opts := []grpc.DialOption{} sopts := []grpc.ServerOption{} @@ -168,13 +188,31 @@ func makeFuncStream(benchFeatures stats.Features) (func(int), func()) { ) } sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1))) - opts = append(opts, grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) { - return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout) - })) opts = append(opts, grpc.WithInsecure()) - target, stopper := bm.StartServer(bm.ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, sopts...) - conn := bm.NewClientConn(target, opts...) + var lis net.Listener + if *useBufconn { + bcLis := bufconn.Listen(256 * 1024) + lis = bcLis + opts = append(opts, grpc.WithDialer(func(string, time.Duration) (net.Conn, error) { + return nw.TimeoutDialer( + func(string, string, time.Duration) (net.Conn, error) { + return bcLis.Dial() + })("", "", 0) + })) + } else { + var err error + lis, err = net.Listen("tcp", "localhost:0") + if err != nil { + grpclog.Fatalf("Failed to listen: %v", err) + } + opts = append(opts, grpc.WithDialer(func(_ string, timeout time.Duration) (net.Conn, error) { + return nw.TimeoutDialer(net.DialTimeout)("tcp", lis.Addr().String(), timeout) + })) + } + lis = nw.Listener(lis) + stopper := bm.StartServer(bm.ServerInfo{Type: "protobuf", Listener: lis}, sopts...) + conn := bm.NewClientConn("" /* target not used */, opts...) tc := testpb.NewBenchmarkServiceClient(conn) streams := make([]testpb.BenchmarkService_StreamingCallClient, benchFeatures.MaxConcurrentCalls) for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { @@ -240,6 +278,8 @@ func runBenchmark(caller func(int), startTimer func(), stopTimer func(int32), be stopTimer(count) } +var useBufconn = flag.Bool("bufconn", false, "Use in-memory connection instead of system network I/O") + // Initiate main function to get settings of features. func init() { var ( diff --git a/benchmark/benchmark.go b/benchmark/benchmark.go index f09fa454..956ae35f 100644 --- a/benchmark/benchmark.go +++ b/benchmark/benchmark.go @@ -136,9 +136,6 @@ func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCa // ServerInfo contains the information to create a gRPC benchmark server. type ServerInfo struct { - // Addr is the address of the server. - Addr string - // Type is the type of the server. // It should be "protobuf" or "bytebuf". Type string @@ -148,21 +145,13 @@ type ServerInfo struct { // For "bytebuf", it should be an int representing response size. Metadata interface{} - // Network can simulate latency - Network *latency.Network + // Listener is the network listener for the server to use + Listener net.Listener } // StartServer starts a gRPC server serving a benchmark service according to info. -// It returns its listen address and a function to stop the server. -func StartServer(info ServerInfo, opts ...grpc.ServerOption) (string, func()) { - lis, err := net.Listen("tcp", info.Addr) - if err != nil { - grpclog.Fatalf("Failed to listen: %v", err) - } - nw := info.Network - if nw != nil { - lis = nw.Listener(lis) - } +// It returns a function to stop the server. +func StartServer(info ServerInfo, opts ...grpc.ServerOption) func() { opts = append(opts, grpc.WriteBufferSize(128*1024)) opts = append(opts, grpc.ReadBufferSize(128*1024)) s := grpc.NewServer(opts...) @@ -178,8 +167,8 @@ func StartServer(info ServerInfo, opts ...grpc.ServerOption) (string, func()) { default: grpclog.Fatalf("failed to StartServer, unknown Type: %v", info.Type) } - go s.Serve(lis) - return lis.Addr().String(), func() { + go s.Serve(info.Listener) + return func() { s.Stop() } } @@ -250,7 +239,13 @@ func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn { func runUnary(b *testing.B, benchFeatures stats.Features) { s := stats.AddStats(b, 38) nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu} - target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1))) + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + grpclog.Fatalf("Failed to listen: %v", err) + } + target := lis.Addr().String() + lis = nw.Listener(lis) + stopper := StartServer(ServerInfo{Type: "protobuf", Listener: lis}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1))) defer stopper() conn := NewClientConn( target, grpc.WithInsecure(), @@ -298,7 +293,13 @@ func runUnary(b *testing.B, benchFeatures stats.Features) { func runStream(b *testing.B, benchFeatures stats.Features) { s := stats.AddStats(b, 38) nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu} - target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1))) + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + grpclog.Fatalf("Failed to listen: %v", err) + } + target := lis.Addr().String() + lis = nw.Listener(lis) + stopper := StartServer(ServerInfo{Type: "protobuf", Listener: lis}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1))) defer stopper() conn := NewClientConn( target, grpc.WithInsecure(), diff --git a/benchmark/server/main.go b/benchmark/server/main.go index ac59ff77..b6e87aa7 100644 --- a/benchmark/server/main.go +++ b/benchmark/server/main.go @@ -44,7 +44,12 @@ func main() { grpclog.Fatalf("Failed to serve: %v", err) } }() - addr, stopper := benchmark.StartServer(benchmark.ServerInfo{Addr: ":0", Type: "protobuf"}) // listen on all interfaces + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + grpclog.Fatalf("Failed to listen: %v", err) + } + addr := lis.Addr().String() + stopper := benchmark.StartServer(benchmark.ServerInfo{Type: "protobuf", Listener: lis}) // listen on all interfaces grpclog.Println("Server Address: ", addr) <-time.After(time.Duration(*duration) * time.Second) stopper() diff --git a/benchmark/worker/benchmark_server.go b/benchmark/worker/benchmark_server.go index f130efde..9770776f 100644 --- a/benchmark/worker/benchmark_server.go +++ b/benchmark/worker/benchmark_server.go @@ -20,6 +20,8 @@ package main import ( "flag" + "fmt" + "net" "runtime" "strconv" "strings" @@ -110,26 +112,27 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma if port == 0 { port = serverPort } + lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port)) + if err != nil { + grpclog.Fatalf("Failed to listen: %v", err) + } + addr := lis.Addr().String() // Create different benchmark server according to config. - var ( - addr string - closeFunc func() - err error - ) + var closeFunc func() if config.PayloadConfig != nil { switch payload := config.PayloadConfig.Payload.(type) { case *testpb.PayloadConfig_BytebufParams: opts = append(opts, grpc.CustomCodec(byteBufCodec{})) - addr, closeFunc = benchmark.StartServer(benchmark.ServerInfo{ - Addr: ":" + strconv.Itoa(port), + closeFunc = benchmark.StartServer(benchmark.ServerInfo{ Type: "bytebuf", Metadata: payload.BytebufParams.RespSize, + Listener: lis, }, opts...) case *testpb.PayloadConfig_SimpleParams: - addr, closeFunc = benchmark.StartServer(benchmark.ServerInfo{ - Addr: ":" + strconv.Itoa(port), - Type: "protobuf", + closeFunc = benchmark.StartServer(benchmark.ServerInfo{ + Type: "protobuf", + Listener: lis, }, opts...) case *testpb.PayloadConfig_ComplexParams: return nil, status.Errorf(codes.Unimplemented, "unsupported payload config: %v", config.PayloadConfig) @@ -138,9 +141,9 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma } } else { // Start protobuf server if payload config is nil. - addr, closeFunc = benchmark.StartServer(benchmark.ServerInfo{ - Addr: ":" + strconv.Itoa(port), - Type: "protobuf", + closeFunc = benchmark.StartServer(benchmark.ServerInfo{ + Type: "protobuf", + Listener: lis, }, opts...) }