diff --git a/benchmark/benchmark.go b/benchmark/benchmark.go index df47f69d..90574cf5 100644 --- a/benchmark/benchmark.go +++ b/benchmark/benchmark.go @@ -37,6 +37,34 @@ import ( "google.golang.org/grpc/grpclog" ) +// Features contains most fields for a benchmark +type Features struct { + EnableTrace bool + Latency time.Duration + Kbps int + Mtu int + MaxConcurrentCalls int + ReqSizeBytes int + RespSizeBytes int +} + +func (f Features) String() string { + return fmt.Sprintf("latency_%s-kbps_%#v-MTU_%#v-maxConcurrentCalls_"+ + "%#v-reqSize_%#vB-respSize_%#vB", + f.Latency.String(), f.Kbps, f.Mtu, f.MaxConcurrentCalls, f.ReqSizeBytes, f.RespSizeBytes) +} + +// AddOne add 1 to the features slice +func AddOne(features []int, featuresMaxPosition []int) { + for i := len(features) - 1; i >= 0; i-- { + features[i] = (features[i] + 1) + if features[i]/featuresMaxPosition[i] == 0 { + break + } + features[i] = features[i] % featuresMaxPosition[i] + } +} + // Allows reuse of the same testpb.Payload object. func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) { if size < 0 { @@ -230,11 +258,11 @@ func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn { return conn } -func runUnary(b *testing.B, maxConcurrentCalls, reqSize, respSize, kbps, mtu int, ltc time.Duration) { +func runUnary(b *testing.B, benchFeatures Features) { s := stats.AddStats(b, 38) - nw := &latency.Network{Kbps: kbps, Latency: ltc, MTU: mtu} + nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu} b.StopTimer() - target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, grpc.MaxConcurrentStreams(uint32(maxConcurrentCalls+1))) + target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1))) defer stopper() conn := NewClientConn( target, grpc.WithInsecure(), @@ -246,21 +274,21 @@ func runUnary(b *testing.B, maxConcurrentCalls, reqSize, respSize, kbps, mtu int // Warm up connection. for i := 0; i < 10; i++ { - unaryCaller(tc, reqSize, respSize) + unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) } - ch := make(chan int, maxConcurrentCalls*4) + ch := make(chan int, benchFeatures.MaxConcurrentCalls*4) var ( mu sync.Mutex wg sync.WaitGroup ) - wg.Add(maxConcurrentCalls) + wg.Add(benchFeatures.MaxConcurrentCalls) // Distribute the b.N calls over maxConcurrentCalls workers. - for i := 0; i < maxConcurrentCalls; i++ { + for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { go func() { for range ch { start := time.Now() - unaryCaller(tc, reqSize, respSize) + unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) elapse := time.Since(start) mu.Lock() s.Add(elapse) @@ -279,11 +307,11 @@ func runUnary(b *testing.B, maxConcurrentCalls, reqSize, respSize, kbps, mtu int conn.Close() } -func runStream(b *testing.B, maxConcurrentCalls, reqSize, respSize, kbps, mtu int, ltc time.Duration) { +func runStream(b *testing.B, benchFeatures Features) { s := stats.AddStats(b, 38) - nw := &latency.Network{Kbps: kbps, Latency: ltc, MTU: mtu} + nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu} b.StopTimer() - target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, grpc.MaxConcurrentStreams(uint32(maxConcurrentCalls+1))) + target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1))) defer stopper() conn := NewClientConn( target, grpc.WithInsecure(), @@ -299,18 +327,18 @@ func runStream(b *testing.B, maxConcurrentCalls, reqSize, respSize, kbps, mtu in b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) } for i := 0; i < 10; i++ { - streamCaller(stream, reqSize, respSize) + streamCaller(stream, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) } - ch := make(chan struct{}, maxConcurrentCalls*4) + ch := make(chan struct{}, benchFeatures.MaxConcurrentCalls*4) var ( mu sync.Mutex wg sync.WaitGroup ) - wg.Add(maxConcurrentCalls) + wg.Add(benchFeatures.MaxConcurrentCalls) // Distribute the b.N calls over maxConcurrentCalls workers. - for i := 0; i < maxConcurrentCalls; i++ { + for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { stream, err := tc.StreamingCall(context.Background()) if err != nil { b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) @@ -318,7 +346,7 @@ func runStream(b *testing.B, maxConcurrentCalls, reqSize, respSize, kbps, mtu in go func() { for range ch { start := time.Now() - streamCaller(stream, reqSize, respSize) + streamCaller(stream, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) elapse := time.Since(start) mu.Lock() s.Add(elapse) diff --git a/benchmark/benchmark16_test.go b/benchmark/benchmark16_test.go index bff52c6c..ba4797d6 100644 --- a/benchmark/benchmark16_test.go +++ b/benchmark/benchmark16_test.go @@ -30,80 +30,80 @@ import ( func BenchmarkClientStreamc1(b *testing.B) { grpc.EnableTracing = true - runStream(b, 1, 1, 1, 0, 0, 0) + runStream(b, Features{true, 0, 0, 0, 1, 1, 1}) } func BenchmarkClientStreamc8(b *testing.B) { grpc.EnableTracing = true - runStream(b, 8, 1, 1, 0, 0, 0) + runStream(b, Features{true, 0, 0, 0, 8, 1, 1}) } func BenchmarkClientStreamc64(b *testing.B) { grpc.EnableTracing = true - runStream(b, 64, 1, 1, 0, 0, 0) + runStream(b, Features{true, 0, 0, 0, 64, 1, 1}) } func BenchmarkClientStreamc512(b *testing.B) { grpc.EnableTracing = true - runStream(b, 512, 1, 1, 0, 0, 0) + runStream(b, Features{true, 0, 0, 0, 512, 1, 1}) } func BenchmarkClientUnaryc1(b *testing.B) { grpc.EnableTracing = true - runUnary(b, 1, 1, 1, 0, 0, 0) + runStream(b, Features{true, 0, 0, 0, 1, 1, 1}) } func BenchmarkClientUnaryc8(b *testing.B) { grpc.EnableTracing = true - runUnary(b, 8, 1, 1, 0, 0, 0) + runStream(b, Features{true, 0, 0, 0, 8, 1, 1}) } func BenchmarkClientUnaryc64(b *testing.B) { grpc.EnableTracing = true - runUnary(b, 64, 1, 1, 0, 0, 0) + runStream(b, Features{true, 0, 0, 0, 64, 1, 1}) } func BenchmarkClientUnaryc512(b *testing.B) { grpc.EnableTracing = true - runUnary(b, 512, 1, 1, 0, 0, 0) + runStream(b, Features{true, 0, 0, 0, 512, 1, 1}) } func BenchmarkClientStreamNoTracec1(b *testing.B) { grpc.EnableTracing = false - runStream(b, 1, 1, 1, 0, 0, 0) + runStream(b, Features{false, 0, 0, 0, 1, 1, 1}) } func BenchmarkClientStreamNoTracec8(b *testing.B) { grpc.EnableTracing = false - runStream(b, 8, 1, 1, 0, 0, 0) + runStream(b, Features{false, 0, 0, 0, 8, 1, 1}) } func BenchmarkClientStreamNoTracec64(b *testing.B) { grpc.EnableTracing = false - runStream(b, 64, 1, 1, 0, 0, 0) + runStream(b, Features{false, 0, 0, 0, 64, 1, 1}) } func BenchmarkClientStreamNoTracec512(b *testing.B) { grpc.EnableTracing = false - runStream(b, 512, 1, 1, 0, 0, 0) + runStream(b, Features{false, 0, 0, 0, 512, 1, 1}) } func BenchmarkClientUnaryNoTracec1(b *testing.B) { grpc.EnableTracing = false - runUnary(b, 1, 1, 1, 0, 0, 0) + runStream(b, Features{false, 0, 0, 0, 1, 1, 1}) } func BenchmarkClientUnaryNoTracec8(b *testing.B) { grpc.EnableTracing = false - runUnary(b, 8, 1, 1, 0, 0, 0) + runStream(b, Features{false, 0, 0, 0, 8, 1, 1}) } func BenchmarkClientUnaryNoTracec64(b *testing.B) { grpc.EnableTracing = false - runUnary(b, 64, 1, 1, 0, 0, 0) + runStream(b, Features{false, 0, 0, 0, 64, 1, 1}) } func BenchmarkClientUnaryNoTracec512(b *testing.B) { grpc.EnableTracing = false - runUnary(b, 512, 1, 1, 0, 0, 0) + runStream(b, Features{false, 0, 0, 0, 512, 1, 1}) } func TestMain(m *testing.M) { diff --git a/benchmark/benchmark17_test.go b/benchmark/benchmark17_test.go index cb6231ad..df1bb431 100644 --- a/benchmark/benchmark17_test.go +++ b/benchmark/benchmark17_test.go @@ -23,6 +23,7 @@ package benchmark import ( "fmt" "os" + "reflect" "testing" "time" @@ -31,45 +32,52 @@ import ( ) func BenchmarkClient(b *testing.B) { - maxConcurrentCalls := []int{1, 8, 64, 512} - reqSizeBytes := []int{1, 1024, 1024 * 1024} - reqspSizeBytes := []int{1, 1024, 1024 * 1024} - kbps := []int{0, 10240} // if non-positive, infinite - MTU := []int{0, 10} // if non-positive, infinite + enableTrace := []bool{true, false} // run both enable and disable by default // When set the latency to 0 (no delay), the result is slower than the real result with no delay // because latency simulation section has extra operations latency := []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay. + kbps := []int{0, 10240} // if non-positive, infinite + mtu := []int{0} // if non-positive, infinite + maxConcurrentCalls := []int{1, 8, 64, 512} + reqSizeBytes := []int{1, 1024 * 1024} + respSizeBytes := []int{1, 1024 * 1024} + featuresCurPos := make([]int, 7) - for _, enableTracing := range []bool{true, false} { - grpc.EnableTracing = enableTracing - tracing := "Tracing" - if !enableTracing { + // 0:enableTracing 1:md 2:ltc 3:kbps 4:mtu 5:maxC 6:connCount 7:reqSize 8:respSize + featuresMaxPosition := []int{len(enableTrace), len(latency), len(kbps), len(mtu), len(maxConcurrentCalls), len(reqSizeBytes), len(respSizeBytes)} + initalPos := make([]int, len(featuresCurPos)) + + // run benchmarks + start := true + for !reflect.DeepEqual(featuresCurPos, initalPos) || start { + start = false + tracing := "Trace" + if !enableTrace[featuresCurPos[0]] { tracing = "noTrace" } - for _, ltc := range latency { - for _, k := range kbps { - for _, mtu := range MTU { - for _, maxC := range maxConcurrentCalls { - for _, reqS := range reqSizeBytes { - for _, respS := range reqspSizeBytes { - b.Run(fmt.Sprintf("Unary-%s-kbps_%#v-MTU_%#v-maxConcurrentCalls_"+ - "%#v-reqSize_%#vB-respSize_%#vB-latency_%s", - tracing, k, mtu, maxC, reqS, respS, ltc.String()), func(b *testing.B) { - runUnary(b, maxC, reqS, respS, k, mtu, ltc) - }) - b.Run(fmt.Sprintf("Stream-%s-kbps_%#v-MTU_%#v-maxConcurrentCalls_"+ - "%#v-reqSize_%#vB-respSize_%#vB-latency_%s", - tracing, k, mtu, maxC, reqS, respS, ltc.String()), func(b *testing.B) { - runStream(b, maxC, reqS, respS, k, mtu, ltc) - }) - } - } - } - } - } - } - } + benchFeature := Features{ + EnableTrace: enableTrace[featuresCurPos[0]], + Latency: latency[featuresCurPos[1]], + Kbps: kbps[featuresCurPos[2]], + Mtu: mtu[featuresCurPos[3]], + MaxConcurrentCalls: maxConcurrentCalls[featuresCurPos[4]], + ReqSizeBytes: reqSizeBytes[featuresCurPos[5]], + RespSizeBytes: respSizeBytes[featuresCurPos[6]], + } + + grpc.EnableTracing = enableTrace[featuresCurPos[0]] + b.Run(fmt.Sprintf("Unary-%s-%s", + tracing, benchFeature.String()), func(b *testing.B) { + runUnary(b, benchFeature) + }) + + b.Run(fmt.Sprintf("Stream-%s-%s", + tracing, benchFeature.String()), func(b *testing.B) { + runStream(b, benchFeature) + }) + AddOne(featuresCurPos, featuresMaxPosition) + } } func TestMain(m *testing.M) {