From 9f83f476dc8c5af385ac4b80ab7a783cd1581431 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Tue, 28 Apr 2015 17:34:24 -0700 Subject: [PATCH 01/11] add standalone benchmark server --- benchmark/server/main.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 benchmark/server/main.go diff --git a/benchmark/server/main.go b/benchmark/server/main.go new file mode 100644 index 00000000..7bcfb5c3 --- /dev/null +++ b/benchmark/server/main.go @@ -0,0 +1,22 @@ +package main + +import ( + "flag" + "fmt" + "math" + "time" + + "google.golang.org/grpc/benchmark" +) + +var ( + duration = flag.Int("duration", math.MaxInt32, "The duration in seconds to run the benchmark server") +) + +func main() { + flag.Parse() + addr, stopper := benchmark.StartServer() + fmt.Println("Server Address: ", addr) + <-time.After(time.Duration(*duration) * time.Second) + stopper() +} From c0ead53d5e64ed9ccb6d1b1e87f2e1fc93ba134c Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Tue, 28 Apr 2015 19:23:26 -0700 Subject: [PATCH 02/11] Add the standalone benchmark client --- benchmark/client/main.go | 72 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 benchmark/client/main.go diff --git a/benchmark/client/main.go b/benchmark/client/main.go new file mode 100644 index 00000000..f85f7d33 --- /dev/null +++ b/benchmark/client/main.go @@ -0,0 +1,72 @@ +package main + +import ( + "flag" + "math" + "sync" + "time" + + "fmt" + + "google.golang.org/grpc/benchmark" + "google.golang.org/grpc/benchmark/stats" + testpb "google.golang.org/grpc/interop/grpc_testing" +) + +var ( + server = flag.String("server", "", "The server address") + maxConcurrentRPCs = flag.Int("max_concurrent_rpcs", 1, "The max number of concurrent RPCs") + duration = flag.Int("duration", math.MaxInt32, "The duration in seconds to run the benchmark client") +) + +func caller(client testpb.TestServiceClient) { + benchmark.DoUnaryCall(client, 1, 1) +} + +func closeLoop() { + s := stats.NewStats(256) + conn := benchmark.NewClientConn(*server) + tc := testpb.NewTestServiceClient(conn) + // Warm up connection. + for i := 0; i < 100; i++ { + caller(tc) + } + ch := make(chan int, *maxConcurrentRPCs*4) + var wg sync.WaitGroup + wg.Add(*maxConcurrentRPCs) + + // Distribute RPCs over maxConcurrentCalls workers. + for i := 0; i < *maxConcurrentRPCs; i++ { + go func() { + for _ = range ch { + caller(tc) + } + wg.Done() + }() + } + // Stop the client when time is up. + done := make(chan struct{}) + go func() { + <-time.After(time.Duration(*duration) * time.Second) + close(done) + }() + ok := true + for ok { + start := time.Now() + select { + case ch <-0: + s.Add(time.Since(start)) + case <-done: + ok = false + } + } + close(ch) + wg.Wait() + conn.Close() + fmt.Println(s.String()) +} + +func main() { + flag.Parse() + closeLoop() +} From 93ec6daf9cd4057aa17b482095bc0867110d131c Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 29 Apr 2015 10:58:56 -0700 Subject: [PATCH 03/11] record per-rpc latency instead of per-item --- benchmark/client/main.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/benchmark/client/main.go b/benchmark/client/main.go index f85f7d33..7319c11f 100644 --- a/benchmark/client/main.go +++ b/benchmark/client/main.go @@ -2,12 +2,11 @@ package main import ( "flag" + "fmt" "math" "sync" "time" - "fmt" - "google.golang.org/grpc/benchmark" "google.golang.org/grpc/benchmark/stats" testpb "google.golang.org/grpc/interop/grpc_testing" @@ -32,14 +31,21 @@ func closeLoop() { caller(tc) } ch := make(chan int, *maxConcurrentRPCs*4) - var wg sync.WaitGroup + var ( + mu sync.Mutex + wg sync.WaitGroup + ) wg.Add(*maxConcurrentRPCs) - // Distribute RPCs over maxConcurrentCalls workers. for i := 0; i < *maxConcurrentRPCs; i++ { go func() { for _ = range ch { + start := time.Now() caller(tc) + elapse := time.Since(start) + mu.Lock() + s.Add(elapse) + mu.Unlock() } wg.Done() }() @@ -52,10 +58,8 @@ func closeLoop() { }() ok := true for ok { - start := time.Now() select { case ch <-0: - s.Add(time.Since(start)) case <-done: ok = false } From f29a6f2d85518e625a1d7813dca55fc9844f9986 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 29 Apr 2015 11:02:40 -0700 Subject: [PATCH 04/11] record per-rpc rtt in benchmark_test --- benchmark/benchmark_test.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/benchmark/benchmark_test.go b/benchmark/benchmark_test.go index ecdfa1a7..16e72817 100644 --- a/benchmark/benchmark_test.go +++ b/benchmark/benchmark_test.go @@ -24,25 +24,30 @@ func run(b *testing.B, maxConcurrentCalls int, caller func(testpb.TestServiceCli } ch := make(chan int, maxConcurrentCalls*4) - var wg sync.WaitGroup + var ( + mu sync.Mutex + wg sync.WaitGroup + ) wg.Add(maxConcurrentCalls) // Distribute the b.N calls over maxConcurrentCalls workers. for i := 0; i < maxConcurrentCalls; i++ { go func() { for _ = range ch { + start := time.Now() caller(tc) + elapse := time.Since(start) + mu.Lock() + s.Add(elapse) + mu.Unlock() } wg.Done() }() } for i := 0; i < b.N; i++ { b.StartTimer() - start := time.Now() ch <- i - elapsed := time.Since(start) b.StopTimer() - s.Add(elapsed) } close(ch) wg.Wait() From dfda1a106157f92c08ed1bfac09e9b2bf3dbf8f5 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 29 Apr 2015 11:03:25 -0700 Subject: [PATCH 05/11] gofmt -w --- benchmark/client/main.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/benchmark/client/main.go b/benchmark/client/main.go index 7319c11f..4fe7102d 100644 --- a/benchmark/client/main.go +++ b/benchmark/client/main.go @@ -13,9 +13,9 @@ import ( ) var ( - server = flag.String("server", "", "The server address") + server = flag.String("server", "", "The server address") maxConcurrentRPCs = flag.Int("max_concurrent_rpcs", 1, "The max number of concurrent RPCs") - duration = flag.Int("duration", math.MaxInt32, "The duration in seconds to run the benchmark client") + duration = flag.Int("duration", math.MaxInt32, "The duration in seconds to run the benchmark client") ) func caller(client testpb.TestServiceClient) { @@ -59,7 +59,7 @@ func closeLoop() { ok := true for ok { select { - case ch <-0: + case ch <- 0: case <-done: ok = false } From 58e0450a3c03af6f21643919747c7e2f646c5d0f Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 29 Apr 2015 11:20:15 -0700 Subject: [PATCH 06/11] put Start/StopTimer() at right place --- benchmark/benchmark_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmark/benchmark_test.go b/benchmark/benchmark_test.go index 16e72817..2cd13b5b 100644 --- a/benchmark/benchmark_test.go +++ b/benchmark/benchmark_test.go @@ -44,11 +44,11 @@ func run(b *testing.B, maxConcurrentCalls int, caller func(testpb.TestServiceCli wg.Done() }() } + b.StartTimer() for i := 0; i < b.N; i++ { - b.StartTimer() ch <- i - b.StopTimer() } + b.StopTimer() close(ch) wg.Wait() conn.Close() From c6f6e4701d52b9be5044c30e785f5fa980eaa359 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 30 Apr 2015 16:30:48 -0700 Subject: [PATCH 07/11] Add a handshaker option to server --- server.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/server.go b/server.go index 79f17f3d..0d9a5915 100644 --- a/server.go +++ b/server.go @@ -85,6 +85,7 @@ type Server struct { } type options struct { + handshaker func(net.Conn) error codec Codec maxConcurrentStreams uint32 } @@ -92,6 +93,12 @@ type options struct { // A ServerOption sets options. type ServerOption func(*options) +func Handshaker(f func(net.Conn) error) ServerOption { + return func(o *options) { + o.handshaker = f + } +} + func CustomCodec(codec Codec) ServerOption { return func(o *options) { o.codec = codec @@ -185,7 +192,14 @@ func (s *Server) Serve(lis net.Listener) error { if err != nil { return err } - + // Perform handshaking if it is required. + if s.opts.handshaker != nil { + if err := s.opts.handshaker(c); err != nil { + log.Println("grpc: Server.Serve failed to complete handshake.") + c.Close() + continue + } + } s.mu.Lock() if s.conns == nil { s.mu.Unlock() From c3da701487d1873610b7b580b343d645b3bfb6c5 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 30 Apr 2015 16:36:21 -0700 Subject: [PATCH 08/11] gofmt -w --- server.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server.go b/server.go index 0d9a5915..700e4563 100644 --- a/server.go +++ b/server.go @@ -93,19 +93,22 @@ type options struct { // A ServerOption sets options. type ServerOption func(*options) +// Handshaker returns a ServerOption that specifies a function to perform user-specified +// handshaking on the connection before it becomes usable for gRPC. func Handshaker(f func(net.Conn) error) ServerOption { return func(o *options) { o.handshaker = f } } +// CustomeCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. func CustomCodec(codec Codec) ServerOption { return func(o *options) { o.codec = codec } } -// MaxConcurrentStreams returns an Option that will apply a limit on the number +// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number // of concurrent streams to each ServerTransport. func MaxConcurrentStreams(n uint32) ServerOption { return func(o *options) { From c90971aee74b1f490083ad84b1d948242de5776e Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 30 Apr 2015 17:59:40 -0700 Subject: [PATCH 09/11] fix a typo --- server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server.go b/server.go index 700e4563..c8e677e2 100644 --- a/server.go +++ b/server.go @@ -101,7 +101,7 @@ func Handshaker(f func(net.Conn) error) ServerOption { } } -// CustomeCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. +// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. func CustomCodec(codec Codec) ServerOption { return func(o *options) { o.codec = codec From ae401535042d0d6f1c2b4ff6c7290d60437c1579 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 30 Apr 2015 19:05:16 -0700 Subject: [PATCH 10/11] add profiling port for benchmark client --- benchmark/client/main.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/benchmark/client/main.go b/benchmark/client/main.go index 4fe7102d..180c2e6c 100644 --- a/benchmark/client/main.go +++ b/benchmark/client/main.go @@ -3,7 +3,11 @@ package main import ( "flag" "fmt" + "log" "math" + "net" + "net/http" + _ "net/http/pprof" "sync" "time" @@ -72,5 +76,15 @@ func closeLoop() { func main() { flag.Parse() + go func() { + lis, err := net.Listen("tcp", ":0") + if err != nil { + log.Fatalf("Failed to listen: %v", err) + } + log.Println("Client profiling address: ", lis.Addr().String()) + if err := http.Serve(lis, nil); err != nil { + log.Fatalf("Failed to serve: %v", err) + } + }() closeLoop() } From a239e5e55fb1a507c1ac5b7757fbc949dac5783b Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 30 Apr 2015 19:16:27 -0700 Subject: [PATCH 11/11] Add profiling port for benchmark server --- benchmark/client/main.go | 3 +-- benchmark/server/main.go | 17 +++++++++++++++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/benchmark/client/main.go b/benchmark/client/main.go index 180c2e6c..4bd87a0e 100644 --- a/benchmark/client/main.go +++ b/benchmark/client/main.go @@ -2,7 +2,6 @@ package main import ( "flag" - "fmt" "log" "math" "net" @@ -71,7 +70,7 @@ func closeLoop() { close(ch) wg.Wait() conn.Close() - fmt.Println(s.String()) + log.Println(s.String()) } func main() { diff --git a/benchmark/server/main.go b/benchmark/server/main.go index 7bcfb5c3..d24a4478 100644 --- a/benchmark/server/main.go +++ b/benchmark/server/main.go @@ -2,8 +2,11 @@ package main import ( "flag" - "fmt" + "log" "math" + "net" + "net/http" + _ "net/http/pprof" "time" "google.golang.org/grpc/benchmark" @@ -15,8 +18,18 @@ var ( func main() { flag.Parse() + go func() { + lis, err := net.Listen("tcp", ":0") + if err != nil { + log.Fatalf("Failed to listen: %v", err) + } + log.Println("Server profiling address: ", lis.Addr().String()) + if err := http.Serve(lis, nil); err != nil { + log.Fatalf("Failed to serve: %v", err) + } + }() addr, stopper := benchmark.StartServer() - fmt.Println("Server Address: ", addr) + log.Println("Server Address: ", addr) <-time.After(time.Duration(*duration) * time.Second) stopper() }