diff --git a/Makefile b/Makefile index e9fceab4..03bb01f0 100644 --- a/Makefile +++ b/Makefile @@ -9,9 +9,6 @@ updatedeps: testdeps: go get -d -v -t google.golang.org/grpc/... -benchdeps: testdeps - go get -d -v golang.org/x/perf/cmd/benchstat - updatetestdeps: go get -d -v -t -u -f google.golang.org/grpc/... @@ -35,9 +32,6 @@ test: testdeps testrace: testdeps go test -v -race -cpu 1,4 google.golang.org/grpc/... -benchmark: benchdeps - go test google.golang.org/grpc/benchmark/... -benchmem -bench=. | tee /tmp/tmp.result && benchstat /tmp/tmp.result && rm /tmp/tmp.result - clean: go clean -i google.golang.org/grpc/... @@ -55,6 +49,4 @@ coverage: testdeps test \ testrace \ clean \ - coverage \ - benchdeps \ - benchmark + coverage diff --git a/benchmark/benchmark.go b/benchmark/benchmark.go index 7afd49a9..df47f69d 100644 --- a/benchmark/benchmark.go +++ b/benchmark/benchmark.go @@ -32,6 +32,7 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" testpb "google.golang.org/grpc/benchmark/grpc_testing" + "google.golang.org/grpc/benchmark/latency" "google.golang.org/grpc/benchmark/stats" "google.golang.org/grpc/grpclog" ) @@ -133,6 +134,9 @@ type ServerInfo struct { // For "protobuf", it's ignored. // For "bytebuf", it should be an int representing response size. Metadata interface{} + + // Network can simulate latency + Network *latency.Network } // StartServer starts a gRPC server serving a benchmark service according to info. @@ -142,6 +146,10 @@ func StartServer(info ServerInfo, opts ...grpc.ServerOption) (string, func()) { if err != nil { grpclog.Fatalf("Failed to listen: %v", err) } + nw := info.Network + if nw != nil { + lis = nw.Listener(lis) + } s := grpc.NewServer(opts...) switch info.Type { case "protobuf": @@ -222,12 +230,18 @@ func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn { return conn } -func runUnary(b *testing.B, maxConcurrentCalls, reqSize, respSize int) { +func runUnary(b *testing.B, maxConcurrentCalls, reqSize, respSize, kbps, mtu int, ltc time.Duration) { s := stats.AddStats(b, 38) + nw := &latency.Network{Kbps: kbps, Latency: ltc, MTU: mtu} b.StopTimer() - target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf"}, grpc.MaxConcurrentStreams(uint32(maxConcurrentCalls+1))) + target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, grpc.MaxConcurrentStreams(uint32(maxConcurrentCalls+1))) defer stopper() - conn := NewClientConn(target, grpc.WithInsecure()) + conn := NewClientConn( + target, grpc.WithInsecure(), + grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) { + return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout) + }), + ) tc := testpb.NewBenchmarkServiceClient(conn) // Warm up connection. @@ -265,12 +279,18 @@ func runUnary(b *testing.B, maxConcurrentCalls, reqSize, respSize int) { conn.Close() } -func runStream(b *testing.B, maxConcurrentCalls, reqSize, respSize int) { +func runStream(b *testing.B, maxConcurrentCalls, reqSize, respSize, kbps, mtu int, ltc time.Duration) { s := stats.AddStats(b, 38) + nw := &latency.Network{Kbps: kbps, Latency: ltc, MTU: mtu} b.StopTimer() - target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf"}, grpc.MaxConcurrentStreams(uint32(maxConcurrentCalls+1))) + target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, grpc.MaxConcurrentStreams(uint32(maxConcurrentCalls+1))) defer stopper() - conn := NewClientConn(target, grpc.WithInsecure()) + conn := NewClientConn( + target, grpc.WithInsecure(), + grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) { + return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout) + }), + ) tc := testpb.NewBenchmarkServiceClient(conn) // Warm up connection. diff --git a/benchmark/benchmark16_test.go b/benchmark/benchmark16_test.go index fcbcb15c..bff52c6c 100644 --- a/benchmark/benchmark16_test.go +++ b/benchmark/benchmark16_test.go @@ -30,80 +30,80 @@ import ( func BenchmarkClientStreamc1(b *testing.B) { grpc.EnableTracing = true - runStream(b, 1, 1, 1) + runStream(b, 1, 1, 1, 0, 0, 0) } func BenchmarkClientStreamc8(b *testing.B) { grpc.EnableTracing = true - runStream(b, 8, 1, 1) + runStream(b, 8, 1, 1, 0, 0, 0) } func BenchmarkClientStreamc64(b *testing.B) { grpc.EnableTracing = true - runStream(b, 64, 1, 1) + runStream(b, 64, 1, 1, 0, 0, 0) } func BenchmarkClientStreamc512(b *testing.B) { grpc.EnableTracing = true - runStream(b, 512, 1, 1) + runStream(b, 512, 1, 1, 0, 0, 0) } func BenchmarkClientUnaryc1(b *testing.B) { grpc.EnableTracing = true - runUnary(b, 1, 1, 1) + runUnary(b, 1, 1, 1, 0, 0, 0) } func BenchmarkClientUnaryc8(b *testing.B) { grpc.EnableTracing = true - runUnary(b, 8, 1, 1) + runUnary(b, 8, 1, 1, 0, 0, 0) } func BenchmarkClientUnaryc64(b *testing.B) { grpc.EnableTracing = true - runUnary(b, 64, 1, 1) + runUnary(b, 64, 1, 1, 0, 0, 0) } func BenchmarkClientUnaryc512(b *testing.B) { grpc.EnableTracing = true - runUnary(b, 512, 1, 1) + runUnary(b, 512, 1, 1, 0, 0, 0) } func BenchmarkClientStreamNoTracec1(b *testing.B) { grpc.EnableTracing = false - runStream(b, 1, 1, 1) + runStream(b, 1, 1, 1, 0, 0, 0) } func BenchmarkClientStreamNoTracec8(b *testing.B) { grpc.EnableTracing = false - runStream(b, 8, 1, 1) + runStream(b, 8, 1, 1, 0, 0, 0) } func BenchmarkClientStreamNoTracec64(b *testing.B) { grpc.EnableTracing = false - runStream(b, 64, 1, 1) + runStream(b, 64, 1, 1, 0, 0, 0) } func BenchmarkClientStreamNoTracec512(b *testing.B) { grpc.EnableTracing = false - runStream(b, 512, 1, 1) + runStream(b, 512, 1, 1, 0, 0, 0) } func BenchmarkClientUnaryNoTracec1(b *testing.B) { grpc.EnableTracing = false - runUnary(b, 1, 1, 1) + runUnary(b, 1, 1, 1, 0, 0, 0) } func BenchmarkClientUnaryNoTracec8(b *testing.B) { grpc.EnableTracing = false - runUnary(b, 8, 1, 1) + runUnary(b, 8, 1, 1, 0, 0, 0) } func BenchmarkClientUnaryNoTracec64(b *testing.B) { grpc.EnableTracing = false - runUnary(b, 64, 1, 1) + runUnary(b, 64, 1, 1, 0, 0, 0) } func BenchmarkClientUnaryNoTracec512(b *testing.B) { grpc.EnableTracing = false - runUnary(b, 512, 1, 1) + runUnary(b, 512, 1, 1, 0, 0, 0) } func TestMain(m *testing.M) { diff --git a/benchmark/benchmark17_test.go b/benchmark/benchmark17_test.go index f6353d48..cb6231ad 100644 --- a/benchmark/benchmark17_test.go +++ b/benchmark/benchmark17_test.go @@ -24,6 +24,7 @@ import ( "fmt" "os" "testing" + "time" "google.golang.org/grpc" "google.golang.org/grpc/benchmark/stats" @@ -31,25 +32,39 @@ import ( func BenchmarkClient(b *testing.B) { maxConcurrentCalls := []int{1, 8, 64, 512} - reqSizeBytes := []int{1, 1024} - reqspSizeBytes := []int{1, 1024} + reqSizeBytes := []int{1, 1024, 1024 * 1024} + reqspSizeBytes := []int{1, 1024, 1024 * 1024} + kbps := []int{0, 10240} // if non-positive, infinite + MTU := []int{0, 10} // if non-positive, infinite + // When set the latency to 0 (no delay), the result is slower than the real result with no delay + // because latency simulation section has extra operations + latency := []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay. + for _, enableTracing := range []bool{true, false} { grpc.EnableTracing = enableTracing tracing := "Tracing" if !enableTracing { tracing = "noTrace" } - for _, maxC := range maxConcurrentCalls { - for _, reqS := range reqSizeBytes { - for _, respS := range reqspSizeBytes { - b.Run(fmt.Sprintf("Unary-%s-maxConcurrentCalls_"+ - "%#v-reqSize_%#vB-respSize_%#vB", tracing, maxC, reqS, respS), func(b *testing.B) { - runUnary(b, maxC, reqS, respS) - }) - b.Run(fmt.Sprintf("Stream-%s-maxConcurrentCalls_"+ - "%#v-reqSize_%#vB-respSize_%#vB", tracing, maxC, reqS, respS), func(b *testing.B) { - runStream(b, maxC, reqS, respS) - }) + for _, ltc := range latency { + for _, k := range kbps { + for _, mtu := range MTU { + for _, maxC := range maxConcurrentCalls { + for _, reqS := range reqSizeBytes { + for _, respS := range reqspSizeBytes { + b.Run(fmt.Sprintf("Unary-%s-kbps_%#v-MTU_%#v-maxConcurrentCalls_"+ + "%#v-reqSize_%#vB-respSize_%#vB-latency_%s", + tracing, k, mtu, maxC, reqS, respS, ltc.String()), func(b *testing.B) { + runUnary(b, maxC, reqS, respS, k, mtu, ltc) + }) + b.Run(fmt.Sprintf("Stream-%s-kbps_%#v-MTU_%#v-maxConcurrentCalls_"+ + "%#v-reqSize_%#vB-respSize_%#vB-latency_%s", + tracing, k, mtu, maxC, reqS, respS, ltc.String()), func(b *testing.B) { + runStream(b, maxC, reqS, respS, k, mtu, ltc) + }) + } + } + } } } } diff --git a/benchmark/stats/histogram.go b/benchmark/stats/histogram.go index f2fdf231..f038d26e 100644 --- a/benchmark/stats/histogram.go +++ b/benchmark/stats/histogram.go @@ -104,8 +104,14 @@ func NewHistogram(opts HistogramOptions) *Histogram { // Print writes textual output of the histogram values. func (h *Histogram) Print(w io.Writer) { + h.PrintWithUnit(w, 1) +} + +// PrintWithUnit writes textual output of the histogram values . +// Data in histogram is divided by a Unit before print. +func (h *Histogram) PrintWithUnit(w io.Writer, unit float64) { avg := float64(h.Sum) / float64(h.Count) - fmt.Fprintf(w, "Count: %d Min: %d Max: %d Avg: %.2f\n", h.Count, h.Min, h.Max, avg) + fmt.Fprintf(w, "Count: %d Min: %5.1f Max: %5.1f Avg: %.2f\n", h.Count, float64(h.Min)/unit, float64(h.Max)/unit, avg/unit) fmt.Fprintf(w, "%s\n", strings.Repeat("-", 60)) if h.Count <= 0 { return @@ -121,9 +127,9 @@ func (h *Histogram) Print(w io.Writer) { accCount := int64(0) for i, b := range h.Buckets { - fmt.Fprintf(w, "[%*f, ", maxBucketDigitLen, b.LowBound) + fmt.Fprintf(w, "[%*f, ", maxBucketDigitLen, b.LowBound/unit) if i+1 < len(h.Buckets) { - fmt.Fprintf(w, "%*f)", maxBucketDigitLen, h.Buckets[i+1].LowBound) + fmt.Fprintf(w, "%*f)", maxBucketDigitLen, h.Buckets[i+1].LowBound/unit) } else { fmt.Fprintf(w, "%*s)", maxBucketDigitLen, "inf") } diff --git a/benchmark/stats/stats.go b/benchmark/stats/stats.go index d5f22203..03569ae2 100644 --- a/benchmark/stats/stats.go +++ b/benchmark/stats/stats.go @@ -93,9 +93,6 @@ func (stats *Stats) maybeUpdate() { stats.unit = u } - // Adjust the min/max according to the new unit. - stats.min /= int64(stats.unit) - stats.max /= int64(stats.unit) numBuckets := stats.numBuckets if n := int(stats.max - stats.min + 1); n < numBuckets { numBuckets = n @@ -108,7 +105,7 @@ func (stats *Stats) maybeUpdate() { MinValue: stats.min}) for _, d := range stats.durations { - stats.histogram.Add(int64(d / stats.unit)) + stats.histogram.Add(int64(d)) } stats.dirty = false @@ -122,7 +119,7 @@ func (stats *Stats) Print(w io.Writer) { fmt.Fprint(w, "Histogram (empty)\n") } else { fmt.Fprintf(w, "Histogram (unit: %s)\n", fmt.Sprintf("%v", stats.unit)[1:]) - stats.histogram.Print(w) + stats.histogram.PrintWithUnit(w, float64(stats.unit)) } }