benchmarks: add flag to benchmain to use bufconn instead of network (#1837)
This will allow us to focus on grpc internals in profiling by excluding the cost of syscalls, which is a significant part of our time spent.
This commit is contained in:
@ -66,6 +66,7 @@ import (
|
|||||||
"google.golang.org/grpc/benchmark/latency"
|
"google.golang.org/grpc/benchmark/latency"
|
||||||
"google.golang.org/grpc/benchmark/stats"
|
"google.golang.org/grpc/benchmark/stats"
|
||||||
"google.golang.org/grpc/grpclog"
|
"google.golang.org/grpc/grpclog"
|
||||||
|
"google.golang.org/grpc/test/bufconn"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -137,13 +138,31 @@ func makeFuncUnary(benchFeatures stats.Features) (func(int), func()) {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
|
sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
|
||||||
opts = append(opts, grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) {
|
|
||||||
return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout)
|
|
||||||
}))
|
|
||||||
opts = append(opts, grpc.WithInsecure())
|
opts = append(opts, grpc.WithInsecure())
|
||||||
|
|
||||||
target, stopper := bm.StartServer(bm.ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, sopts...)
|
var lis net.Listener
|
||||||
conn := bm.NewClientConn(target, opts...)
|
if *useBufconn {
|
||||||
|
bcLis := bufconn.Listen(256 * 1024)
|
||||||
|
lis = bcLis
|
||||||
|
opts = append(opts, grpc.WithDialer(func(string, time.Duration) (net.Conn, error) {
|
||||||
|
return nw.TimeoutDialer(
|
||||||
|
func(string, string, time.Duration) (net.Conn, error) {
|
||||||
|
return bcLis.Dial()
|
||||||
|
})("", "", 0)
|
||||||
|
}))
|
||||||
|
} else {
|
||||||
|
var err error
|
||||||
|
lis, err = net.Listen("tcp", "localhost:0")
|
||||||
|
if err != nil {
|
||||||
|
grpclog.Fatalf("Failed to listen: %v", err)
|
||||||
|
}
|
||||||
|
opts = append(opts, grpc.WithDialer(func(_ string, timeout time.Duration) (net.Conn, error) {
|
||||||
|
return nw.TimeoutDialer(net.DialTimeout)("tcp", lis.Addr().String(), timeout)
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
lis = nw.Listener(lis)
|
||||||
|
stopper := bm.StartServer(bm.ServerInfo{Type: "protobuf", Listener: lis}, sopts...)
|
||||||
|
conn := bm.NewClientConn("" /* target not used */, opts...)
|
||||||
tc := testpb.NewBenchmarkServiceClient(conn)
|
tc := testpb.NewBenchmarkServiceClient(conn)
|
||||||
return func(int) {
|
return func(int) {
|
||||||
unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
|
unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
|
||||||
@ -154,6 +173,7 @@ func makeFuncUnary(benchFeatures stats.Features) (func(int), func()) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func makeFuncStream(benchFeatures stats.Features) (func(int), func()) {
|
func makeFuncStream(benchFeatures stats.Features) (func(int), func()) {
|
||||||
|
// TODO: Refactor to remove duplication with makeFuncUnary.
|
||||||
nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
|
nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
|
||||||
opts := []grpc.DialOption{}
|
opts := []grpc.DialOption{}
|
||||||
sopts := []grpc.ServerOption{}
|
sopts := []grpc.ServerOption{}
|
||||||
@ -168,13 +188,31 @@ func makeFuncStream(benchFeatures stats.Features) (func(int), func()) {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
|
sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
|
||||||
opts = append(opts, grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) {
|
|
||||||
return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout)
|
|
||||||
}))
|
|
||||||
opts = append(opts, grpc.WithInsecure())
|
opts = append(opts, grpc.WithInsecure())
|
||||||
|
|
||||||
target, stopper := bm.StartServer(bm.ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, sopts...)
|
var lis net.Listener
|
||||||
conn := bm.NewClientConn(target, opts...)
|
if *useBufconn {
|
||||||
|
bcLis := bufconn.Listen(256 * 1024)
|
||||||
|
lis = bcLis
|
||||||
|
opts = append(opts, grpc.WithDialer(func(string, time.Duration) (net.Conn, error) {
|
||||||
|
return nw.TimeoutDialer(
|
||||||
|
func(string, string, time.Duration) (net.Conn, error) {
|
||||||
|
return bcLis.Dial()
|
||||||
|
})("", "", 0)
|
||||||
|
}))
|
||||||
|
} else {
|
||||||
|
var err error
|
||||||
|
lis, err = net.Listen("tcp", "localhost:0")
|
||||||
|
if err != nil {
|
||||||
|
grpclog.Fatalf("Failed to listen: %v", err)
|
||||||
|
}
|
||||||
|
opts = append(opts, grpc.WithDialer(func(_ string, timeout time.Duration) (net.Conn, error) {
|
||||||
|
return nw.TimeoutDialer(net.DialTimeout)("tcp", lis.Addr().String(), timeout)
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
lis = nw.Listener(lis)
|
||||||
|
stopper := bm.StartServer(bm.ServerInfo{Type: "protobuf", Listener: lis}, sopts...)
|
||||||
|
conn := bm.NewClientConn("" /* target not used */, opts...)
|
||||||
tc := testpb.NewBenchmarkServiceClient(conn)
|
tc := testpb.NewBenchmarkServiceClient(conn)
|
||||||
streams := make([]testpb.BenchmarkService_StreamingCallClient, benchFeatures.MaxConcurrentCalls)
|
streams := make([]testpb.BenchmarkService_StreamingCallClient, benchFeatures.MaxConcurrentCalls)
|
||||||
for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
|
for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
|
||||||
@ -240,6 +278,8 @@ func runBenchmark(caller func(int), startTimer func(), stopTimer func(int32), be
|
|||||||
stopTimer(count)
|
stopTimer(count)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var useBufconn = flag.Bool("bufconn", false, "Use in-memory connection instead of system network I/O")
|
||||||
|
|
||||||
// Initiate main function to get settings of features.
|
// Initiate main function to get settings of features.
|
||||||
func init() {
|
func init() {
|
||||||
var (
|
var (
|
||||||
|
@ -136,9 +136,6 @@ func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCa
|
|||||||
|
|
||||||
// ServerInfo contains the information to create a gRPC benchmark server.
|
// ServerInfo contains the information to create a gRPC benchmark server.
|
||||||
type ServerInfo struct {
|
type ServerInfo struct {
|
||||||
// Addr is the address of the server.
|
|
||||||
Addr string
|
|
||||||
|
|
||||||
// Type is the type of the server.
|
// Type is the type of the server.
|
||||||
// It should be "protobuf" or "bytebuf".
|
// It should be "protobuf" or "bytebuf".
|
||||||
Type string
|
Type string
|
||||||
@ -148,21 +145,13 @@ type ServerInfo struct {
|
|||||||
// For "bytebuf", it should be an int representing response size.
|
// For "bytebuf", it should be an int representing response size.
|
||||||
Metadata interface{}
|
Metadata interface{}
|
||||||
|
|
||||||
// Network can simulate latency
|
// Listener is the network listener for the server to use
|
||||||
Network *latency.Network
|
Listener net.Listener
|
||||||
}
|
}
|
||||||
|
|
||||||
// StartServer starts a gRPC server serving a benchmark service according to info.
|
// StartServer starts a gRPC server serving a benchmark service according to info.
|
||||||
// It returns its listen address and a function to stop the server.
|
// It returns a function to stop the server.
|
||||||
func StartServer(info ServerInfo, opts ...grpc.ServerOption) (string, func()) {
|
func StartServer(info ServerInfo, opts ...grpc.ServerOption) func() {
|
||||||
lis, err := net.Listen("tcp", info.Addr)
|
|
||||||
if err != nil {
|
|
||||||
grpclog.Fatalf("Failed to listen: %v", err)
|
|
||||||
}
|
|
||||||
nw := info.Network
|
|
||||||
if nw != nil {
|
|
||||||
lis = nw.Listener(lis)
|
|
||||||
}
|
|
||||||
opts = append(opts, grpc.WriteBufferSize(128*1024))
|
opts = append(opts, grpc.WriteBufferSize(128*1024))
|
||||||
opts = append(opts, grpc.ReadBufferSize(128*1024))
|
opts = append(opts, grpc.ReadBufferSize(128*1024))
|
||||||
s := grpc.NewServer(opts...)
|
s := grpc.NewServer(opts...)
|
||||||
@ -178,8 +167,8 @@ func StartServer(info ServerInfo, opts ...grpc.ServerOption) (string, func()) {
|
|||||||
default:
|
default:
|
||||||
grpclog.Fatalf("failed to StartServer, unknown Type: %v", info.Type)
|
grpclog.Fatalf("failed to StartServer, unknown Type: %v", info.Type)
|
||||||
}
|
}
|
||||||
go s.Serve(lis)
|
go s.Serve(info.Listener)
|
||||||
return lis.Addr().String(), func() {
|
return func() {
|
||||||
s.Stop()
|
s.Stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -250,7 +239,13 @@ func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
|
|||||||
func runUnary(b *testing.B, benchFeatures stats.Features) {
|
func runUnary(b *testing.B, benchFeatures stats.Features) {
|
||||||
s := stats.AddStats(b, 38)
|
s := stats.AddStats(b, 38)
|
||||||
nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
|
nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
|
||||||
target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
|
lis, err := net.Listen("tcp", "localhost:0")
|
||||||
|
if err != nil {
|
||||||
|
grpclog.Fatalf("Failed to listen: %v", err)
|
||||||
|
}
|
||||||
|
target := lis.Addr().String()
|
||||||
|
lis = nw.Listener(lis)
|
||||||
|
stopper := StartServer(ServerInfo{Type: "protobuf", Listener: lis}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
|
||||||
defer stopper()
|
defer stopper()
|
||||||
conn := NewClientConn(
|
conn := NewClientConn(
|
||||||
target, grpc.WithInsecure(),
|
target, grpc.WithInsecure(),
|
||||||
@ -298,7 +293,13 @@ func runUnary(b *testing.B, benchFeatures stats.Features) {
|
|||||||
func runStream(b *testing.B, benchFeatures stats.Features) {
|
func runStream(b *testing.B, benchFeatures stats.Features) {
|
||||||
s := stats.AddStats(b, 38)
|
s := stats.AddStats(b, 38)
|
||||||
nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
|
nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
|
||||||
target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
|
lis, err := net.Listen("tcp", "localhost:0")
|
||||||
|
if err != nil {
|
||||||
|
grpclog.Fatalf("Failed to listen: %v", err)
|
||||||
|
}
|
||||||
|
target := lis.Addr().String()
|
||||||
|
lis = nw.Listener(lis)
|
||||||
|
stopper := StartServer(ServerInfo{Type: "protobuf", Listener: lis}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
|
||||||
defer stopper()
|
defer stopper()
|
||||||
conn := NewClientConn(
|
conn := NewClientConn(
|
||||||
target, grpc.WithInsecure(),
|
target, grpc.WithInsecure(),
|
||||||
|
@ -44,7 +44,12 @@ func main() {
|
|||||||
grpclog.Fatalf("Failed to serve: %v", err)
|
grpclog.Fatalf("Failed to serve: %v", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
addr, stopper := benchmark.StartServer(benchmark.ServerInfo{Addr: ":0", Type: "protobuf"}) // listen on all interfaces
|
lis, err := net.Listen("tcp", "localhost:0")
|
||||||
|
if err != nil {
|
||||||
|
grpclog.Fatalf("Failed to listen: %v", err)
|
||||||
|
}
|
||||||
|
addr := lis.Addr().String()
|
||||||
|
stopper := benchmark.StartServer(benchmark.ServerInfo{Type: "protobuf", Listener: lis}) // listen on all interfaces
|
||||||
grpclog.Println("Server Address: ", addr)
|
grpclog.Println("Server Address: ", addr)
|
||||||
<-time.After(time.Duration(*duration) * time.Second)
|
<-time.After(time.Duration(*duration) * time.Second)
|
||||||
stopper()
|
stopper()
|
||||||
|
@ -20,6 +20,8 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@ -110,26 +112,27 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma
|
|||||||
if port == 0 {
|
if port == 0 {
|
||||||
port = serverPort
|
port = serverPort
|
||||||
}
|
}
|
||||||
|
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
|
||||||
|
if err != nil {
|
||||||
|
grpclog.Fatalf("Failed to listen: %v", err)
|
||||||
|
}
|
||||||
|
addr := lis.Addr().String()
|
||||||
|
|
||||||
// Create different benchmark server according to config.
|
// Create different benchmark server according to config.
|
||||||
var (
|
var closeFunc func()
|
||||||
addr string
|
|
||||||
closeFunc func()
|
|
||||||
err error
|
|
||||||
)
|
|
||||||
if config.PayloadConfig != nil {
|
if config.PayloadConfig != nil {
|
||||||
switch payload := config.PayloadConfig.Payload.(type) {
|
switch payload := config.PayloadConfig.Payload.(type) {
|
||||||
case *testpb.PayloadConfig_BytebufParams:
|
case *testpb.PayloadConfig_BytebufParams:
|
||||||
opts = append(opts, grpc.CustomCodec(byteBufCodec{}))
|
opts = append(opts, grpc.CustomCodec(byteBufCodec{}))
|
||||||
addr, closeFunc = benchmark.StartServer(benchmark.ServerInfo{
|
closeFunc = benchmark.StartServer(benchmark.ServerInfo{
|
||||||
Addr: ":" + strconv.Itoa(port),
|
|
||||||
Type: "bytebuf",
|
Type: "bytebuf",
|
||||||
Metadata: payload.BytebufParams.RespSize,
|
Metadata: payload.BytebufParams.RespSize,
|
||||||
|
Listener: lis,
|
||||||
}, opts...)
|
}, opts...)
|
||||||
case *testpb.PayloadConfig_SimpleParams:
|
case *testpb.PayloadConfig_SimpleParams:
|
||||||
addr, closeFunc = benchmark.StartServer(benchmark.ServerInfo{
|
closeFunc = benchmark.StartServer(benchmark.ServerInfo{
|
||||||
Addr: ":" + strconv.Itoa(port),
|
Type: "protobuf",
|
||||||
Type: "protobuf",
|
Listener: lis,
|
||||||
}, opts...)
|
}, opts...)
|
||||||
case *testpb.PayloadConfig_ComplexParams:
|
case *testpb.PayloadConfig_ComplexParams:
|
||||||
return nil, status.Errorf(codes.Unimplemented, "unsupported payload config: %v", config.PayloadConfig)
|
return nil, status.Errorf(codes.Unimplemented, "unsupported payload config: %v", config.PayloadConfig)
|
||||||
@ -138,9 +141,9 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Start protobuf server if payload config is nil.
|
// Start protobuf server if payload config is nil.
|
||||||
addr, closeFunc = benchmark.StartServer(benchmark.ServerInfo{
|
closeFunc = benchmark.StartServer(benchmark.ServerInfo{
|
||||||
Addr: ":" + strconv.Itoa(port),
|
Type: "protobuf",
|
||||||
Type: "protobuf",
|
Listener: lis,
|
||||||
}, opts...)
|
}, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user