benchmain: add nop compressor and other usability tweaks (#1489)

This commit is contained in:
dfawley
2017-08-29 11:41:55 -07:00
committed by ZhouyihaiDing
parent e60698345e
commit 85a1e381f1

View File

@ -1,5 +1,3 @@
// +build go1.7
/* /*
* *
* Copyright 2017 gRPC authors. * Copyright 2017 gRPC authors.
@ -20,11 +18,14 @@
/* /*
Package main provides benchmark with setting flags. Package main provides benchmark with setting flags.
To run a certain benchmark with profile usage, the command is
go run benchmark/benchmain/main.go -kbps=0 -mtu=0 -maxConcurrentCalls=1 \ An example to run some benchmarks with profiling enabled:
-reqSizeBytes=1,1048576 -reqspSizeBytes=1,1048576 -runUnary=true -runStream=true \
-traceMode=true -compressorMode=true -latency=0s,5ms -benchtime=10s \ go run benchmark/benchmain/main.go -benchtime=10s -workloads=all \
-cpuProfile=cpuProf -memProfile=memProf -memProfileRate=10000 -compression=on -maxConcurrentCalls=1 -traceMode=false \
-reqSizeBytes=1,1048576 -respSizeBytes=1,1048576 \
-latency=0s -kbps=0 -mtu=0 \
-cpuProfile=cpuProf -memProfile=memProf -memProfileRate=10000
*/ */
package main package main
@ -32,6 +33,8 @@ import (
"errors" "errors"
"flag" "flag"
"fmt" "fmt"
"io"
"io/ioutil"
"log" "log"
"net" "net"
"os" "os"
@ -54,10 +57,24 @@ import (
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
) )
const (
compressionOn = "on"
compressionOff = "off"
compressionBoth = "both"
)
var allCompressionModes = []string{compressionOn, compressionOff, compressionBoth}
const (
workloadsUnary = "unary"
workloadsStreaming = "streaming"
workloadsAll = "all"
)
var allWorkloads = []string{workloadsUnary, workloadsStreaming, workloadsAll}
var ( var (
// runMode{runUnary, runStream} runMode = []bool{true, true} // {runUnary, runStream}
runMode = []bool{true, true}
enableTrace = []bool{false}
// When set the latency to 0 (no delay), the result is slower than the real result with no delay // When set the latency to 0 (no delay), the result is slower than the real result with no delay
// because latency simulation section has extra operations // because latency simulation section has extra operations
ltc = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay. ltc = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay.
@ -66,10 +83,11 @@ var (
maxConcurrentCalls = []int{1, 8, 64, 512} maxConcurrentCalls = []int{1, 8, 64, 512}
reqSizeBytes = []int{1, 1024, 1024 * 1024} reqSizeBytes = []int{1, 1024, 1024 * 1024}
respSizeBytes = []int{1, 1024, 1024 * 1024} respSizeBytes = []int{1, 1024, 1024 * 1024}
enableTrace = []bool{false}
benchtime time.Duration benchtime time.Duration
memProfile, cpuProfile string memProfile, cpuProfile string
memProfileRate int memProfileRate int
enableCompressor = []bool{false} enableCompressor []bool
) )
func unaryBenchmark(startTimer func(), stopTimer func(int32), benchFeatures bm.Features, benchtime time.Duration, s *stats.Stats) { func unaryBenchmark(startTimer func(), stopTimer func(int32), benchFeatures bm.Features, benchtime time.Duration, s *stats.Stats) {
@ -90,12 +108,12 @@ func makeFuncUnary(benchFeatures bm.Features) (func(int), func()) {
sopts := []grpc.ServerOption{} sopts := []grpc.ServerOption{}
if benchFeatures.EnableCompressor { if benchFeatures.EnableCompressor {
sopts = append(sopts, sopts = append(sopts,
grpc.RPCCompressor(grpc.NewGZIPCompressor()), grpc.RPCCompressor(nopCompressor{}),
grpc.RPCDecompressor(grpc.NewGZIPDecompressor()), grpc.RPCDecompressor(nopDecompressor{}),
) )
opts = append(opts, opts = append(opts,
grpc.WithCompressor(grpc.NewGZIPCompressor()), grpc.WithCompressor(nopCompressor{}),
grpc.WithDecompressor(grpc.NewGZIPDecompressor()), grpc.WithDecompressor(nopDecompressor{}),
) )
} }
sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1))) sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
@ -205,46 +223,66 @@ func runBenchmark(caller func(int), startTimer func(), stopTimer func(int32), be
// Initiate main function to get settings of features. // Initiate main function to get settings of features.
func init() { func init() {
var runUnary, runStream bool var (
var traceMode, compressorMode bool workloads, compressorMode, readLatency string
var readLatency string readKbps, readMtu, readMaxConcurrentCalls intSliceType
var readKbps, readMtu, readMaxConcurrentCalls, readReqSizeBytes, readReqspSizeBytes intSliceType readReqSizeBytes, readRespSizeBytes intSliceType
flag.BoolVar(&runUnary, "runUnary", false, "runUnary") traceMode bool
flag.BoolVar(&runStream, "runStream", false, "runStream") )
flag.BoolVar(&traceMode, "traceMode", false, "traceMode") flag.StringVar(&workloads, "workloads", workloadsAll,
flag.StringVar(&readLatency, "latency", "", "latency") fmt.Sprintf("Workloads to execute - One of: %v", strings.Join(allWorkloads, ", ")))
flag.DurationVar(&benchtime, "benchtime", time.Second, "benchtime") flag.BoolVar(&traceMode, "traceMode", false, "Enable gRPC tracing")
flag.Var(&readKbps, "kbps", "kbps") flag.StringVar(&readLatency, "latency", "", "Simulated one-way network latency - may be a comma-separated list")
flag.Var(&readMtu, "mtu", "mtu") flag.DurationVar(&benchtime, "benchtime", time.Second, "Configures the amount of time to run each benchmark")
flag.Var(&readMaxConcurrentCalls, "maxConcurrentCalls", "maxConcurrentCalls") flag.Var(&readKbps, "kbps", "Simulated network throughput (in kbps) - may be a comma-separated list")
flag.Var(&readReqSizeBytes, "reqSizeBytes", "reqSizeBytes") flag.Var(&readMtu, "mtu", "Simulated network MTU (Maximum Transmission Unit) - may be a comma-separated list")
flag.Var(&readReqspSizeBytes, "reqspSizeBytes", "reqspSizeBytes") flag.Var(&readMaxConcurrentCalls, "maxConcurrentCalls", "Number of concurrent RPCs during benchmarks")
flag.StringVar(&memProfile, "memProfile", "", "memProfile") flag.Var(&readReqSizeBytes, "reqSizeBytes", "Request size in bytes - may be a comma-separated list")
flag.IntVar(&memProfileRate, "memProfileRate", 0, "memProfileRate") flag.Var(&readRespSizeBytes, "respSizeBytes", "Response size in bytes - may be a comma-separated list")
flag.StringVar(&cpuProfile, "cpuProfile", "", "cpuProfile") flag.StringVar(&memProfile, "memProfile", "", "Enables memory profiling output to the filename provided")
flag.BoolVar(&compressorMode, "compressorMode", false, "compressorMode") flag.IntVar(&memProfileRate, "memProfileRate", 0, "Configures the memory profiling rate")
flag.StringVar(&cpuProfile, "cpuProfile", "", "Enables CPU profiling output to the filename provided")
flag.StringVar(&compressorMode, "compression", compressionOff,
fmt.Sprintf("Compression mode - One of: %v", strings.Join(allCompressionModes, ", ")))
flag.Parse() flag.Parse()
if flag.NArg() != 0 { if flag.NArg() != 0 {
log.Fatal("Error: unparsed arguments: ", flag.Args()) log.Fatal("Error: unparsed arguments: ", flag.Args())
} }
// If no flags related to mode are set, it runs both by default. switch workloads {
if runUnary || runStream { case workloadsUnary:
runMode[0] = runUnary runMode[0] = true
runMode[1] = runStream runMode[1] = false
case workloadsStreaming:
runMode[0] = false
runMode[1] = true
case workloadsAll:
runMode[0] = true
runMode[1] = true
default:
log.Fatalf("Unknown workloads setting: %v (want %v, %v, or %v)",
workloads, strings.Join(allWorkloads, ", "))
}
switch compressorMode {
case compressionOn:
enableCompressor = []bool{true}
case compressionOff:
enableCompressor = []bool{false}
case compressionBoth:
enableCompressor = []bool{false, true}
default:
log.Fatalf("Unknown compression mode setting: %v (want one of: %v)",
compressorMode, strings.Join(allCompressionModes, ", "))
} }
if traceMode { if traceMode {
enableTrace = []bool{true} enableTrace = []bool{true}
} }
if compressorMode {
enableCompressor = []bool{true}
}
// Time input formats as (time + unit). // Time input formats as (time + unit).
readTimeFromInput(&ltc, readLatency) readTimeFromInput(&ltc, readLatency)
readIntFromIntSlice(&kbps, readKbps) readIntFromIntSlice(&kbps, readKbps)
readIntFromIntSlice(&mtu, readMtu) readIntFromIntSlice(&mtu, readMtu)
readIntFromIntSlice(&maxConcurrentCalls, readMaxConcurrentCalls) readIntFromIntSlice(&maxConcurrentCalls, readMaxConcurrentCalls)
readIntFromIntSlice(&reqSizeBytes, readReqSizeBytes) readIntFromIntSlice(&reqSizeBytes, readReqSizeBytes)
readIntFromIntSlice(&respSizeBytes, readReqspSizeBytes) readIntFromIntSlice(&respSizeBytes, readRespSizeBytes)
} }
type intSliceType []int type intSliceType []int
@ -388,3 +426,25 @@ func after() {
f.Close() f.Close()
} }
} }
// nopCompressor is a compressor that just copies data.
type nopCompressor struct{}
func (nopCompressor) Do(w io.Writer, p []byte) error {
n, err := w.Write(p)
if err != nil {
return err
}
if n != len(p) {
return fmt.Errorf("nopCompressor.Write: wrote %v bytes; want %v", n, len(p))
}
return nil
}
func (nopCompressor) Type() string { return "nop" }
// nopDecompressor is a decompressor that just copies data.
type nopDecompressor struct{}
func (nopDecompressor) Do(r io.Reader) ([]byte, error) { return ioutil.ReadAll(r) }
func (nopDecompressor) Type() string { return "nop" }