
This commit introduces the first microbenchmark for grpc, wherein `encode` is benchmarked according to message size. A conclusion of the benchmark is that removing the type switching performed by `binary.Write`, which `encode` previously used, produces the following encoding time and memory allocation footprint:

```
$ # Return to previous commit but benchmark.
$ go test ./... -test.bench="Benchmark*" > /tmp/before
$ # Return to working copy.
$ go test ./... -test.bench="Benchmark*" > /tmp/after
$ benchcmp /tmp/before /tmp/after
benchmark                old ns/op    new ns/op    delta
BenchmarkEncode1B        1282         936          -26.99%
BenchmarkEncode1KiB      4865         4184         -14.00%
BenchmarkEncode8KiB      22686        21560        -4.96%
BenchmarkEncode64KiB     134451       116762       -13.16%
BenchmarkEncode512KiB    514044       361224       -29.73%
BenchmarkEncode1MiB      767096       636725       -17.00%

benchmark                old MB/s     new MB/s     speedup
BenchmarkEncode1B        6.24         8.55         1.37x
BenchmarkEncode1KiB      212.11       246.63       1.16x
BenchmarkEncode8KiB      361.46       380.33       1.05x
BenchmarkEncode64KiB     487.50       561.35       1.15x
BenchmarkEncode512KiB    1019.94      1451.45      1.42x
BenchmarkEncode1MiB      1366.95      1646.84      1.20x

benchmark                old allocs   new allocs   delta
BenchmarkEncode1B        6            3            -50.00%
BenchmarkEncode1KiB      8            5            -37.50%
BenchmarkEncode8KiB      8            5            -37.50%
BenchmarkEncode64KiB     8            5            -37.50%
BenchmarkEncode512KiB    8            5            -37.50%
BenchmarkEncode1MiB      8            5            -37.50%

benchmark                old bytes    new bytes    delta
BenchmarkEncode1B        384          328          -14.58%
BenchmarkEncode1KiB      2816         2760         -1.99%
BenchmarkEncode8KiB      17283        17227        -0.32%
BenchmarkEncode64KiB     147856       147802       -0.04%
BenchmarkEncode512KiB    1065344      1065288      -0.01%
BenchmarkEncode1MiB      2113920      2113864      -0.00%
```

This is apropos of the comment in [encoding/binary](http://golang.org/pkg/encoding/binary):

> This package favors simplicity over efficiency.

If `encode` is deemed to need further memory efficiencies, a mechanism whereby a `proto.Buffer` is retained may be warranted, which is why the original TODO remains. The improvement proposed in this change is simple, low-hanging fruit.

I did not want to introduce yet another protocol buffer message for tests, but the ones under

> interop/grpc_testing/test.proto
> test/grpc_testing/test.proto

have a fundamental dependency on the `grpc` package through their generated stubs, which would produce an import cycle if the benchmark attempted to import them for profiling. The newly created

> test/grpc_message/test.proto

protocol buffer package has no generated RPC service stubs, which means it can be imported into the `grpc` package root without a cycle.
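For reference, a minimal sketch of how such a size-parameterized benchmark could be wired up is shown below. It assumes the `encode` and `compressionNone` identifiers from this package; `newTestMessage` is a hypothetical helper standing in for construction of a message with an n-byte payload from the new `test/grpc_message` package, and is not necessarily the helper this commit actually uses.

```go
package grpc

import (
	"testing"

	"github.com/golang/protobuf/proto"
)

// benchmarkEncode measures encode for a single message, reporting throughput
// against the framed wire size via b.SetBytes.
func benchmarkEncode(b *testing.B, msg proto.Message) {
	// Encode once up front to learn the framed message size.
	framed, err := encode(msg, compressionNone)
	if err != nil {
		b.Fatalf("encode: %v", err)
	}
	b.SetBytes(int64(len(framed)))
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		if _, err := encode(msg, compressionNone); err != nil {
			b.Fatal(err)
		}
	}
}

// One benchmark per payload size; newTestMessage(n) is illustrative only.
func BenchmarkEncode1KiB(b *testing.B) { benchmarkEncode(b, newTestMessage(1024)) }
```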
/*
 *
 * Copyright 2014, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

package grpc

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"math/rand"
	"os"
	"time"

	"github.com/golang/protobuf/proto"
	"golang.org/x/net/context"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/transport"
)

// CallOption configures a Call before it starts or extracts information from
// a Call after it completes.
type CallOption interface {
	// before is called before the call is sent to any server. If before
	// returns a non-nil error, the RPC fails with that error.
	before(*callInfo) error

	// after is called after the call has completed. after cannot return an
	// error, so any failures should be reported via output parameters.
	after(*callInfo)
}

type beforeCall func(c *callInfo) error

func (o beforeCall) before(c *callInfo) error { return o(c) }
func (o beforeCall) after(c *callInfo)        {}

type afterCall func(c *callInfo)

func (o afterCall) before(c *callInfo) error { return nil }
func (o afterCall) after(c *callInfo)        { o(c) }

// Header returns a CallOption that retrieves the header metadata
// for a unary RPC.
func Header(md *metadata.MD) CallOption {
	return afterCall(func(c *callInfo) {
		*md = c.headerMD
	})
}

// Trailer returns a CallOption that retrieves the trailer metadata
// for a unary RPC.
func Trailer(md *metadata.MD) CallOption {
	return afterCall(func(c *callInfo) {
		*md = c.trailerMD
	})
}

// The format of the payload: compressed or not?
type payloadFormat uint8

const (
	compressionNone payloadFormat = iota // no compression
	compressionFlate
	// More formats
)

// parser reads complete gRPC messages from the underlying reader.
type parser struct {
	s io.Reader
}

// msgFixedHeader defines the header of a gRPC message (go/grpc-wirefmt).
type msgFixedHeader struct {
	T      payloadFormat
	Length uint32
}

// recvMsg reads a complete gRPC message from the stream. It blocks until the
// message has been received in full. It returns the message and its payload
// format; io.EOF is returned with a nil msg and a zero pf if the entire
// stream is done. Any other non-nil error indicates a read failure.
func (p *parser) recvMsg() (pf payloadFormat, msg []byte, err error) {
	var hdr msgFixedHeader
	if err := binary.Read(p.s, binary.BigEndian, &hdr); err != nil {
		return 0, nil, err
	}
	if hdr.Length == 0 {
		return hdr.T, nil, nil
	}
	msg = make([]byte, int(hdr.Length))
	if _, err := io.ReadFull(p.s, msg); err != nil {
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
		return 0, nil, err
	}
	return hdr.T, msg, nil
}

// encode serializes msg and prepends the message header. If msg is nil, it
// generates a header indicating a message length of zero.
func encode(msg proto.Message, pf payloadFormat) ([]byte, error) {
	var buf bytes.Buffer
	// Write the payload format byte of the fixed header.
	if err := buf.WriteByte(uint8(pf)); err != nil {
		return nil, err
	}
	var b []byte
	var length uint32
	if msg != nil {
		var err error
		// TODO(zhaoq): optimize to reduce memory alloc and copying.
		b, err = proto.Marshal(msg)
		if err != nil {
			return nil, err
		}
		length = uint32(len(b))
	}
	// Write the length field of the fixed header, then the payload itself.
	var szHdr [4]byte
	binary.BigEndian.PutUint32(szHdr[:], length)
	if _, err := buf.Write(szHdr[:]); err != nil {
		return nil, err
	}
	if _, err := buf.Write(b); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// recvProto reads a complete gRPC message from p and unmarshals it into m.
func recvProto(p *parser, m proto.Message) error {
	pf, d, err := p.recvMsg()
	if err != nil {
		return err
	}
	switch pf {
	case compressionNone:
		if err := proto.Unmarshal(d, m); err != nil {
			return Errorf(codes.Internal, "grpc: %v", err)
		}
	default:
		return Errorf(codes.Internal, "grpc: compression is not supported yet.")
	}
	return nil
}

// rpcError defines the status from an RPC.
type rpcError struct {
	code codes.Code
	desc string
}

func (e rpcError) Error() string {
	return fmt.Sprintf("rpc error: code = %d desc = %q", e.code, e.desc)
}

// Code returns the error code for err if it was produced by the rpc system.
// Otherwise, it returns codes.Unknown.
func Code(err error) codes.Code {
	if e, ok := err.(rpcError); ok {
		return e.code
	}
	return codes.Unknown
}

// Errorf returns an error containing an error code and a description;
// Code extracts the code. Errorf returns nil if c is OK.
func Errorf(c codes.Code, format string, a ...interface{}) error {
	if c == codes.OK {
		return nil
	}
	return rpcError{
		code: c,
		desc: fmt.Sprintf(format, a...),
	}
}

// toRPCErr converts a transport error into an rpcError if possible.
func toRPCErr(err error) error {
	switch e := err.(type) {
	case transport.StreamError:
		return rpcError{
			code: e.Code,
			desc: e.Desc,
		}
	case transport.ConnectionError:
		return rpcError{
			code: codes.Internal,
			desc: e.Desc,
		}
	}
	return Errorf(codes.Unknown, "grpc: failed to convert %v to rpcErr", err)
}

// convertCode converts a standard Go error into its canonical code. Note that
// this is only used to translate errors returned by server applications.
func convertCode(err error) codes.Code {
	switch err {
	case nil:
		return codes.OK
	case io.EOF:
		return codes.OutOfRange
	case io.ErrClosedPipe, io.ErrNoProgress, io.ErrShortBuffer, io.ErrShortWrite, io.ErrUnexpectedEOF:
		return codes.FailedPrecondition
	case os.ErrInvalid:
		return codes.InvalidArgument
	case context.Canceled:
		return codes.Canceled
	case context.DeadlineExceeded:
		return codes.DeadlineExceeded
	}
	switch {
	case os.IsExist(err):
		return codes.AlreadyExists
	case os.IsNotExist(err):
		return codes.NotFound
	case os.IsPermission(err):
		return codes.PermissionDenied
	}
	return codes.Unknown
}

const (
	// how long to wait after the first failure before retrying
	baseDelay = 1.0 * time.Second
	// upper bound on backoff delay
	maxDelay      = 120 * time.Second
	backoffFactor = 2.0 // backoff increases by this factor on each retry
	backoffRange  = 0.4 // backoff is randomized downwards by this factor
)

// backoff returns a value in [0, maxDelay] that increases exponentially with
// retries, starting from baseDelay.
func backoff(retries int) time.Duration {
	backoff, max := float64(baseDelay), float64(maxDelay)
	for backoff < max && retries > 0 {
		backoff = backoff * backoffFactor
		retries--
	}
	if backoff > max {
		backoff = max
	}

	// Randomize backoff delays so that if a cluster of requests start at
	// the same time, they won't operate in lockstep. We just subtract up
	// to 40% so that we obey maxDelay.
	backoff -= backoff * backoffRange * rand.Float64()
	if backoff < 0 {
		return 0
	}
	return time.Duration(backoff)
}