Merge branch 'master' of github.com:grpc/grpc-go
This commit is contained in:
@ -24,26 +24,31 @@ func run(b *testing.B, maxConcurrentCalls int, caller func(testpb.TestServiceCli
|
|||||||
}
|
}
|
||||||
|
|
||||||
ch := make(chan int, maxConcurrentCalls*4)
|
ch := make(chan int, maxConcurrentCalls*4)
|
||||||
var wg sync.WaitGroup
|
var (
|
||||||
|
mu sync.Mutex
|
||||||
|
wg sync.WaitGroup
|
||||||
|
)
|
||||||
wg.Add(maxConcurrentCalls)
|
wg.Add(maxConcurrentCalls)
|
||||||
|
|
||||||
// Distribute the b.N calls over maxConcurrentCalls workers.
|
// Distribute the b.N calls over maxConcurrentCalls workers.
|
||||||
for i := 0; i < maxConcurrentCalls; i++ {
|
for i := 0; i < maxConcurrentCalls; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
for _ = range ch {
|
for _ = range ch {
|
||||||
|
start := time.Now()
|
||||||
caller(tc)
|
caller(tc)
|
||||||
|
elapse := time.Since(start)
|
||||||
|
mu.Lock()
|
||||||
|
s.Add(elapse)
|
||||||
|
mu.Unlock()
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
for i := 0; i < b.N; i++ {
|
|
||||||
b.StartTimer()
|
b.StartTimer()
|
||||||
start := time.Now()
|
for i := 0; i < b.N; i++ {
|
||||||
ch <- i
|
ch <- i
|
||||||
elapsed := time.Since(start)
|
|
||||||
b.StopTimer()
|
|
||||||
s.Add(elapsed)
|
|
||||||
}
|
}
|
||||||
|
b.StopTimer()
|
||||||
close(ch)
|
close(ch)
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
conn.Close()
|
conn.Close()
|
||||||
|
89
benchmark/client/main.go
Normal file
89
benchmark/client/main.go
Normal file
@ -0,0 +1,89 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"flag"
|
||||||
|
"log"
|
||||||
|
"math"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
_ "net/http/pprof"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"google.golang.org/grpc/benchmark"
|
||||||
|
"google.golang.org/grpc/benchmark/stats"
|
||||||
|
testpb "google.golang.org/grpc/interop/grpc_testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
server = flag.String("server", "", "The server address")
|
||||||
|
maxConcurrentRPCs = flag.Int("max_concurrent_rpcs", 1, "The max number of concurrent RPCs")
|
||||||
|
duration = flag.Int("duration", math.MaxInt32, "The duration in seconds to run the benchmark client")
|
||||||
|
)
|
||||||
|
|
||||||
|
func caller(client testpb.TestServiceClient) {
|
||||||
|
benchmark.DoUnaryCall(client, 1, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func closeLoop() {
|
||||||
|
s := stats.NewStats(256)
|
||||||
|
conn := benchmark.NewClientConn(*server)
|
||||||
|
tc := testpb.NewTestServiceClient(conn)
|
||||||
|
// Warm up connection.
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
caller(tc)
|
||||||
|
}
|
||||||
|
ch := make(chan int, *maxConcurrentRPCs*4)
|
||||||
|
var (
|
||||||
|
mu sync.Mutex
|
||||||
|
wg sync.WaitGroup
|
||||||
|
)
|
||||||
|
wg.Add(*maxConcurrentRPCs)
|
||||||
|
// Distribute RPCs over maxConcurrentCalls workers.
|
||||||
|
for i := 0; i < *maxConcurrentRPCs; i++ {
|
||||||
|
go func() {
|
||||||
|
for _ = range ch {
|
||||||
|
start := time.Now()
|
||||||
|
caller(tc)
|
||||||
|
elapse := time.Since(start)
|
||||||
|
mu.Lock()
|
||||||
|
s.Add(elapse)
|
||||||
|
mu.Unlock()
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
// Stop the client when time is up.
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
<-time.After(time.Duration(*duration) * time.Second)
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
ok := true
|
||||||
|
for ok {
|
||||||
|
select {
|
||||||
|
case ch <- 0:
|
||||||
|
case <-done:
|
||||||
|
ok = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(ch)
|
||||||
|
wg.Wait()
|
||||||
|
conn.Close()
|
||||||
|
log.Println(s.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
flag.Parse()
|
||||||
|
go func() {
|
||||||
|
lis, err := net.Listen("tcp", ":0")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to listen: %v", err)
|
||||||
|
}
|
||||||
|
log.Println("Client profiling address: ", lis.Addr().String())
|
||||||
|
if err := http.Serve(lis, nil); err != nil {
|
||||||
|
log.Fatalf("Failed to serve: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
closeLoop()
|
||||||
|
}
|
35
benchmark/server/main.go
Normal file
35
benchmark/server/main.go
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"flag"
|
||||||
|
"log"
|
||||||
|
"math"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
_ "net/http/pprof"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"google.golang.org/grpc/benchmark"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
duration = flag.Int("duration", math.MaxInt32, "The duration in seconds to run the benchmark server")
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
flag.Parse()
|
||||||
|
go func() {
|
||||||
|
lis, err := net.Listen("tcp", ":0")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to listen: %v", err)
|
||||||
|
}
|
||||||
|
log.Println("Server profiling address: ", lis.Addr().String())
|
||||||
|
if err := http.Serve(lis, nil); err != nil {
|
||||||
|
log.Fatalf("Failed to serve: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
addr, stopper := benchmark.StartServer()
|
||||||
|
log.Println("Server Address: ", addr)
|
||||||
|
<-time.After(time.Duration(*duration) * time.Second)
|
||||||
|
stopper()
|
||||||
|
}
|
21
server.go
21
server.go
@ -85,6 +85,7 @@ type Server struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type options struct {
|
type options struct {
|
||||||
|
handshaker func(net.Conn) error
|
||||||
codec Codec
|
codec Codec
|
||||||
maxConcurrentStreams uint32
|
maxConcurrentStreams uint32
|
||||||
}
|
}
|
||||||
@ -92,13 +93,22 @@ type options struct {
|
|||||||
// A ServerOption sets options.
|
// A ServerOption sets options.
|
||||||
type ServerOption func(*options)
|
type ServerOption func(*options)
|
||||||
|
|
||||||
|
// Handshaker returns a ServerOption that specifies a function to perform user-specified
|
||||||
|
// handshaking on the connection before it becomes usable for gRPC.
|
||||||
|
func Handshaker(f func(net.Conn) error) ServerOption {
|
||||||
|
return func(o *options) {
|
||||||
|
o.handshaker = f
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
|
||||||
func CustomCodec(codec Codec) ServerOption {
|
func CustomCodec(codec Codec) ServerOption {
|
||||||
return func(o *options) {
|
return func(o *options) {
|
||||||
o.codec = codec
|
o.codec = codec
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// MaxConcurrentStreams returns an Option that will apply a limit on the number
|
// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
|
||||||
// of concurrent streams to each ServerTransport.
|
// of concurrent streams to each ServerTransport.
|
||||||
func MaxConcurrentStreams(n uint32) ServerOption {
|
func MaxConcurrentStreams(n uint32) ServerOption {
|
||||||
return func(o *options) {
|
return func(o *options) {
|
||||||
@ -185,7 +195,14 @@ func (s *Server) Serve(lis net.Listener) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// Perform handshaking if it is required.
|
||||||
|
if s.opts.handshaker != nil {
|
||||||
|
if err := s.opts.handshaker(c); err != nil {
|
||||||
|
log.Println("grpc: Server.Serve failed to complete handshake.")
|
||||||
|
c.Close()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
if s.conns == nil {
|
if s.conns == nil {
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
|
Reference in New Issue
Block a user