benchmark: remove multi-layer for loop (#1339)

This commit is contained in:
ZhouyihaiDing
2017-07-21 13:39:06 -07:00
committed by dfawley
parent 0c41876308
commit 6495e8dfeb
3 changed files with 100 additions and 64 deletions

View File

@ -37,6 +37,34 @@ import (
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
) )
// Features contains most fields for a benchmark
type Features struct {
EnableTrace bool
Latency time.Duration
Kbps int
Mtu int
MaxConcurrentCalls int
ReqSizeBytes int
RespSizeBytes int
}
func (f Features) String() string {
return fmt.Sprintf("latency_%s-kbps_%#v-MTU_%#v-maxConcurrentCalls_"+
"%#v-reqSize_%#vB-respSize_%#vB",
f.Latency.String(), f.Kbps, f.Mtu, f.MaxConcurrentCalls, f.ReqSizeBytes, f.RespSizeBytes)
}
// AddOne add 1 to the features slice
func AddOne(features []int, featuresMaxPosition []int) {
for i := len(features) - 1; i >= 0; i-- {
features[i] = (features[i] + 1)
if features[i]/featuresMaxPosition[i] == 0 {
break
}
features[i] = features[i] % featuresMaxPosition[i]
}
}
// Allows reuse of the same testpb.Payload object. // Allows reuse of the same testpb.Payload object.
func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) { func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) {
if size < 0 { if size < 0 {
@ -230,11 +258,11 @@ func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
return conn return conn
} }
func runUnary(b *testing.B, maxConcurrentCalls, reqSize, respSize, kbps, mtu int, ltc time.Duration) { func runUnary(b *testing.B, benchFeatures Features) {
s := stats.AddStats(b, 38) s := stats.AddStats(b, 38)
nw := &latency.Network{Kbps: kbps, Latency: ltc, MTU: mtu} nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
b.StopTimer() b.StopTimer()
target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, grpc.MaxConcurrentStreams(uint32(maxConcurrentCalls+1))) target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
defer stopper() defer stopper()
conn := NewClientConn( conn := NewClientConn(
target, grpc.WithInsecure(), target, grpc.WithInsecure(),
@ -246,21 +274,21 @@ func runUnary(b *testing.B, maxConcurrentCalls, reqSize, respSize, kbps, mtu int
// Warm up connection. // Warm up connection.
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
unaryCaller(tc, reqSize, respSize) unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
} }
ch := make(chan int, maxConcurrentCalls*4) ch := make(chan int, benchFeatures.MaxConcurrentCalls*4)
var ( var (
mu sync.Mutex mu sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
) )
wg.Add(maxConcurrentCalls) wg.Add(benchFeatures.MaxConcurrentCalls)
// Distribute the b.N calls over maxConcurrentCalls workers. // Distribute the b.N calls over maxConcurrentCalls workers.
for i := 0; i < maxConcurrentCalls; i++ { for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
go func() { go func() {
for range ch { for range ch {
start := time.Now() start := time.Now()
unaryCaller(tc, reqSize, respSize) unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
elapse := time.Since(start) elapse := time.Since(start)
mu.Lock() mu.Lock()
s.Add(elapse) s.Add(elapse)
@ -279,11 +307,11 @@ func runUnary(b *testing.B, maxConcurrentCalls, reqSize, respSize, kbps, mtu int
conn.Close() conn.Close()
} }
func runStream(b *testing.B, maxConcurrentCalls, reqSize, respSize, kbps, mtu int, ltc time.Duration) { func runStream(b *testing.B, benchFeatures Features) {
s := stats.AddStats(b, 38) s := stats.AddStats(b, 38)
nw := &latency.Network{Kbps: kbps, Latency: ltc, MTU: mtu} nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
b.StopTimer() b.StopTimer()
target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, grpc.MaxConcurrentStreams(uint32(maxConcurrentCalls+1))) target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
defer stopper() defer stopper()
conn := NewClientConn( conn := NewClientConn(
target, grpc.WithInsecure(), target, grpc.WithInsecure(),
@ -299,18 +327,18 @@ func runStream(b *testing.B, maxConcurrentCalls, reqSize, respSize, kbps, mtu in
b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
} }
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
streamCaller(stream, reqSize, respSize) streamCaller(stream, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
} }
ch := make(chan struct{}, maxConcurrentCalls*4) ch := make(chan struct{}, benchFeatures.MaxConcurrentCalls*4)
var ( var (
mu sync.Mutex mu sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
) )
wg.Add(maxConcurrentCalls) wg.Add(benchFeatures.MaxConcurrentCalls)
// Distribute the b.N calls over maxConcurrentCalls workers. // Distribute the b.N calls over maxConcurrentCalls workers.
for i := 0; i < maxConcurrentCalls; i++ { for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
stream, err := tc.StreamingCall(context.Background()) stream, err := tc.StreamingCall(context.Background())
if err != nil { if err != nil {
b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
@ -318,7 +346,7 @@ func runStream(b *testing.B, maxConcurrentCalls, reqSize, respSize, kbps, mtu in
go func() { go func() {
for range ch { for range ch {
start := time.Now() start := time.Now()
streamCaller(stream, reqSize, respSize) streamCaller(stream, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
elapse := time.Since(start) elapse := time.Since(start)
mu.Lock() mu.Lock()
s.Add(elapse) s.Add(elapse)

View File

@ -30,80 +30,80 @@ import (
func BenchmarkClientStreamc1(b *testing.B) { func BenchmarkClientStreamc1(b *testing.B) {
grpc.EnableTracing = true grpc.EnableTracing = true
runStream(b, 1, 1, 1, 0, 0, 0) runStream(b, Features{true, 0, 0, 0, 1, 1, 1})
} }
func BenchmarkClientStreamc8(b *testing.B) { func BenchmarkClientStreamc8(b *testing.B) {
grpc.EnableTracing = true grpc.EnableTracing = true
runStream(b, 8, 1, 1, 0, 0, 0) runStream(b, Features{true, 0, 0, 0, 8, 1, 1})
} }
func BenchmarkClientStreamc64(b *testing.B) { func BenchmarkClientStreamc64(b *testing.B) {
grpc.EnableTracing = true grpc.EnableTracing = true
runStream(b, 64, 1, 1, 0, 0, 0) runStream(b, Features{true, 0, 0, 0, 64, 1, 1})
} }
func BenchmarkClientStreamc512(b *testing.B) { func BenchmarkClientStreamc512(b *testing.B) {
grpc.EnableTracing = true grpc.EnableTracing = true
runStream(b, 512, 1, 1, 0, 0, 0) runStream(b, Features{true, 0, 0, 0, 512, 1, 1})
} }
func BenchmarkClientUnaryc1(b *testing.B) { func BenchmarkClientUnaryc1(b *testing.B) {
grpc.EnableTracing = true grpc.EnableTracing = true
runUnary(b, 1, 1, 1, 0, 0, 0) runStream(b, Features{true, 0, 0, 0, 1, 1, 1})
} }
func BenchmarkClientUnaryc8(b *testing.B) { func BenchmarkClientUnaryc8(b *testing.B) {
grpc.EnableTracing = true grpc.EnableTracing = true
runUnary(b, 8, 1, 1, 0, 0, 0) runStream(b, Features{true, 0, 0, 0, 8, 1, 1})
} }
func BenchmarkClientUnaryc64(b *testing.B) { func BenchmarkClientUnaryc64(b *testing.B) {
grpc.EnableTracing = true grpc.EnableTracing = true
runUnary(b, 64, 1, 1, 0, 0, 0) runStream(b, Features{true, 0, 0, 0, 64, 1, 1})
} }
func BenchmarkClientUnaryc512(b *testing.B) { func BenchmarkClientUnaryc512(b *testing.B) {
grpc.EnableTracing = true grpc.EnableTracing = true
runUnary(b, 512, 1, 1, 0, 0, 0) runStream(b, Features{true, 0, 0, 0, 512, 1, 1})
} }
func BenchmarkClientStreamNoTracec1(b *testing.B) { func BenchmarkClientStreamNoTracec1(b *testing.B) {
grpc.EnableTracing = false grpc.EnableTracing = false
runStream(b, 1, 1, 1, 0, 0, 0) runStream(b, Features{false, 0, 0, 0, 1, 1, 1})
} }
func BenchmarkClientStreamNoTracec8(b *testing.B) { func BenchmarkClientStreamNoTracec8(b *testing.B) {
grpc.EnableTracing = false grpc.EnableTracing = false
runStream(b, 8, 1, 1, 0, 0, 0) runStream(b, Features{false, 0, 0, 0, 8, 1, 1})
} }
func BenchmarkClientStreamNoTracec64(b *testing.B) { func BenchmarkClientStreamNoTracec64(b *testing.B) {
grpc.EnableTracing = false grpc.EnableTracing = false
runStream(b, 64, 1, 1, 0, 0, 0) runStream(b, Features{false, 0, 0, 0, 64, 1, 1})
} }
func BenchmarkClientStreamNoTracec512(b *testing.B) { func BenchmarkClientStreamNoTracec512(b *testing.B) {
grpc.EnableTracing = false grpc.EnableTracing = false
runStream(b, 512, 1, 1, 0, 0, 0) runStream(b, Features{false, 0, 0, 0, 512, 1, 1})
} }
func BenchmarkClientUnaryNoTracec1(b *testing.B) { func BenchmarkClientUnaryNoTracec1(b *testing.B) {
grpc.EnableTracing = false grpc.EnableTracing = false
runUnary(b, 1, 1, 1, 0, 0, 0) runStream(b, Features{false, 0, 0, 0, 1, 1, 1})
} }
func BenchmarkClientUnaryNoTracec8(b *testing.B) { func BenchmarkClientUnaryNoTracec8(b *testing.B) {
grpc.EnableTracing = false grpc.EnableTracing = false
runUnary(b, 8, 1, 1, 0, 0, 0) runStream(b, Features{false, 0, 0, 0, 8, 1, 1})
} }
func BenchmarkClientUnaryNoTracec64(b *testing.B) { func BenchmarkClientUnaryNoTracec64(b *testing.B) {
grpc.EnableTracing = false grpc.EnableTracing = false
runUnary(b, 64, 1, 1, 0, 0, 0) runStream(b, Features{false, 0, 0, 0, 64, 1, 1})
} }
func BenchmarkClientUnaryNoTracec512(b *testing.B) { func BenchmarkClientUnaryNoTracec512(b *testing.B) {
grpc.EnableTracing = false grpc.EnableTracing = false
runUnary(b, 512, 1, 1, 0, 0, 0) runStream(b, Features{false, 0, 0, 0, 512, 1, 1})
} }
func TestMain(m *testing.M) { func TestMain(m *testing.M) {

View File

@ -23,6 +23,7 @@ package benchmark
import ( import (
"fmt" "fmt"
"os" "os"
"reflect"
"testing" "testing"
"time" "time"
@ -31,45 +32,52 @@ import (
) )
func BenchmarkClient(b *testing.B) { func BenchmarkClient(b *testing.B) {
maxConcurrentCalls := []int{1, 8, 64, 512} enableTrace := []bool{true, false} // run both enable and disable by default
reqSizeBytes := []int{1, 1024, 1024 * 1024}
reqspSizeBytes := []int{1, 1024, 1024 * 1024}
kbps := []int{0, 10240} // if non-positive, infinite
MTU := []int{0, 10} // if non-positive, infinite
// When set the latency to 0 (no delay), the result is slower than the real result with no delay // When set the latency to 0 (no delay), the result is slower than the real result with no delay
// because latency simulation section has extra operations // because latency simulation section has extra operations
latency := []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay. latency := []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay.
kbps := []int{0, 10240} // if non-positive, infinite
mtu := []int{0} // if non-positive, infinite
maxConcurrentCalls := []int{1, 8, 64, 512}
reqSizeBytes := []int{1, 1024 * 1024}
respSizeBytes := []int{1, 1024 * 1024}
featuresCurPos := make([]int, 7)
for _, enableTracing := range []bool{true, false} { // 0:enableTracing 1:md 2:ltc 3:kbps 4:mtu 5:maxC 6:connCount 7:reqSize 8:respSize
grpc.EnableTracing = enableTracing featuresMaxPosition := []int{len(enableTrace), len(latency), len(kbps), len(mtu), len(maxConcurrentCalls), len(reqSizeBytes), len(respSizeBytes)}
tracing := "Tracing" initalPos := make([]int, len(featuresCurPos))
if !enableTracing {
// run benchmarks
start := true
for !reflect.DeepEqual(featuresCurPos, initalPos) || start {
start = false
tracing := "Trace"
if !enableTrace[featuresCurPos[0]] {
tracing = "noTrace" tracing = "noTrace"
} }
for _, ltc := range latency {
for _, k := range kbps { benchFeature := Features{
for _, mtu := range MTU { EnableTrace: enableTrace[featuresCurPos[0]],
for _, maxC := range maxConcurrentCalls { Latency: latency[featuresCurPos[1]],
for _, reqS := range reqSizeBytes { Kbps: kbps[featuresCurPos[2]],
for _, respS := range reqspSizeBytes { Mtu: mtu[featuresCurPos[3]],
b.Run(fmt.Sprintf("Unary-%s-kbps_%#v-MTU_%#v-maxConcurrentCalls_"+ MaxConcurrentCalls: maxConcurrentCalls[featuresCurPos[4]],
"%#v-reqSize_%#vB-respSize_%#vB-latency_%s", ReqSizeBytes: reqSizeBytes[featuresCurPos[5]],
tracing, k, mtu, maxC, reqS, respS, ltc.String()), func(b *testing.B) { RespSizeBytes: respSizeBytes[featuresCurPos[6]],
runUnary(b, maxC, reqS, respS, k, mtu, ltc)
})
b.Run(fmt.Sprintf("Stream-%s-kbps_%#v-MTU_%#v-maxConcurrentCalls_"+
"%#v-reqSize_%#vB-respSize_%#vB-latency_%s",
tracing, k, mtu, maxC, reqS, respS, ltc.String()), func(b *testing.B) {
runStream(b, maxC, reqS, respS, k, mtu, ltc)
})
}
}
}
}
}
}
} }
grpc.EnableTracing = enableTrace[featuresCurPos[0]]
b.Run(fmt.Sprintf("Unary-%s-%s",
tracing, benchFeature.String()), func(b *testing.B) {
runUnary(b, benchFeature)
})
b.Run(fmt.Sprintf("Stream-%s-%s",
tracing, benchFeature.String()), func(b *testing.B) {
runStream(b, benchFeature)
})
AddOne(featuresCurPos, featuresMaxPosition)
}
} }
func TestMain(m *testing.M) { func TestMain(m *testing.M) {