Revert "Less mem (#1987)" (#2049)

This reverts commit 7a8c9895074e03bc7fcff30bd561eaef4fa76dfa.
This commit is contained in:
mmukhi
2018-05-03 11:37:59 -07:00
committed by GitHub
parent ed2472917f
commit 0bc7c3280e
18 changed files with 1059 additions and 1134 deletions

View File

@ -21,6 +21,7 @@ package grpc
import (
"bytes"
"compress/gzip"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
@ -414,86 +415,141 @@ func (o CustomCodecCallOption) before(c *callInfo) error {
}
func (o CustomCodecCallOption) after(c *callInfo) {}
// The format of the payload: compressed or not?
type payloadFormat uint8
const (
compressionNone payloadFormat = iota // no compression
compressionMade
)
// parser reads complete gRPC messages from the underlying reader.
type parser struct {
// r is the underlying reader.
// See the comment on recvMsg for the permissible
// error types.
r io.Reader
// The header of a gRPC message. Find more detail at
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
header [5]byte
}
// recvMsg reads a complete gRPC message from the stream.
//
// It returns a flag set to true if message was compressed,
// the message as a byte slice or error if so.
// The caller owns the returned msg memory.
// It returns the message and its payload (compression/encoding)
// format. The caller owns the returned msg memory.
//
// If there is an error, possible values are:
// * io.EOF, when no messages remain
// * io.ErrUnexpectedEOF
// * of type transport.ConnectionError
// * of type transport.StreamError
// No other error values or types must be returned.
func recvMsg(s *transport.Stream, maxRecvMsgSize int) (bool, []byte, error) {
isCompressed, msg, err := s.Read(maxRecvMsgSize)
if err != nil {
return false, nil, err
// No other error values or types must be returned, which also means
// that the underlying io.Reader must not return an incompatible
// error.
func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
if _, err := p.r.Read(p.header[:]); err != nil {
return 0, nil, err
}
return isCompressed, msg, nil
pf = payloadFormat(p.header[0])
length := binary.BigEndian.Uint32(p.header[1:])
if length == 0 {
return pf, nil, nil
}
if int64(length) > int64(maxInt) {
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt)
}
if int(length) > maxReceiveMessageSize {
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
}
// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
// of making it for each message:
msg = make([]byte, int(length))
if _, err := p.r.Read(msg); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
return 0, nil, err
}
return pf, msg, nil
}
// encode serializes msg and returns a buffer of msg.
func encode(c baseCodec, msg interface{}, outPayload *stats.OutPayload) ([]byte, error) {
b, err := c.Marshal(msg)
if err != nil {
return nil, status.Errorf(codes.Internal, "grpc: error while marshaling: %v", err.Error())
}
if b == nil {
// If there was no error while marshalling, yet payload was nil,
// we update it to an empty slice, since a nil payload leads to
// an empty data frame(no gRPC message header is added).
b = []byte{}
}
if outPayload != nil {
outPayload.Payload = msg
// TODO truncate large payload.
outPayload.Data = b
outPayload.Length = len(b)
}
return b, nil
}
// compress the message if there is a compressor registered.
// TODO(mmukhi, dfawley): eliminate extra Compressor parameter.
func compress(b []byte, cp Compressor, compressor encoding.Compressor, outPayload *stats.OutPayload) ([]byte, bool, error) {
if len(b) <= 0 || (compressor == nil && cp == nil) {
return b, false, nil
}
cbuf := new(bytes.Buffer)
// Has compressor, check Compressor is set by UseCompressor first.
if compressor != nil {
z, _ := compressor.Compress(cbuf)
if _, err := z.Write(b); err != nil {
return nil, false, status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error())
// encode serializes msg and returns a buffer of message header and a buffer of msg.
// If msg is nil, it generates the message header and an empty msg buffer.
// TODO(ddyihai): eliminate extra Compressor parameter.
func encode(c baseCodec, msg interface{}, cp Compressor, outPayload *stats.OutPayload, compressor encoding.Compressor) ([]byte, []byte, error) {
var (
b []byte
cbuf *bytes.Buffer
)
const (
payloadLen = 1
sizeLen = 4
)
if msg != nil {
var err error
b, err = c.Marshal(msg)
if err != nil {
return nil, nil, status.Errorf(codes.Internal, "grpc: error while marshaling: %v", err.Error())
}
z.Close()
} else {
// If Compressor is not set by UseCompressor, use default Compressor
if err := cp.Do(cbuf, b); err != nil {
return nil, false, status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error())
if outPayload != nil {
outPayload.Payload = msg
// TODO truncate large payload.
outPayload.Data = b
outPayload.Length = len(b)
}
if compressor != nil || cp != nil {
cbuf = new(bytes.Buffer)
// Has compressor, check Compressor is set by UseCompressor first.
if compressor != nil {
z, _ := compressor.Compress(cbuf)
if _, err := z.Write(b); err != nil {
return nil, nil, status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error())
}
z.Close()
} else {
// If Compressor is not set by UseCompressor, use default Compressor
if err := cp.Do(cbuf, b); err != nil {
return nil, nil, status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error())
}
}
b = cbuf.Bytes()
}
}
b = cbuf.Bytes()
if uint(len(b)) > math.MaxUint32 {
return nil, false, status.Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", len(b))
return nil, nil, status.Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", len(b))
}
if outPayload != nil {
// A 5 byte gRPC-specific message header will added to this message
// before it's put on wire.
outPayload.WireLength = 5 + len(b)
bufHeader := make([]byte, payloadLen+sizeLen)
if compressor != nil || cp != nil {
bufHeader[0] = byte(compressionMade)
} else {
bufHeader[0] = byte(compressionNone)
}
return b, true, nil
// Write length of b into buf
binary.BigEndian.PutUint32(bufHeader[payloadLen:], uint32(len(b)))
if outPayload != nil {
outPayload.WireLength = payloadLen + sizeLen + len(b)
}
return bufHeader, b, nil
}
func checkRecvPayload(recvCompress string, haveCompressor bool) *status.Status {
if recvCompress == "" || recvCompress == encoding.Identity {
return status.New(codes.Internal, "grpc: compressed flag set with identity or empty encoding")
}
if !haveCompressor {
return status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
func checkRecvPayload(pf payloadFormat, recvCompress string, haveCompressor bool) *status.Status {
switch pf {
case compressionNone:
case compressionMade:
if recvCompress == "" || recvCompress == encoding.Identity {
return status.New(codes.Internal, "grpc: compressed flag set with identity or empty encoding")
}
if !haveCompressor {
return status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
}
default:
return status.Newf(codes.Internal, "grpc: received unexpected payload format %d", pf)
}
return nil
}
@ -501,8 +557,8 @@ func checkRecvPayload(recvCompress string, haveCompressor bool) *status.Status {
// For the two compressor parameters, both should not be set, but if they are,
// dc takes precedence over compressor.
// TODO(dfawley): wrap the old compressor/decompressor using the new API?
func recv(c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, inPayload *stats.InPayload, compressor encoding.Compressor) error {
isCompressed, d, err := recvMsg(s, maxReceiveMessageSize)
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, inPayload *stats.InPayload, compressor encoding.Compressor) error {
pf, d, err := p.recvMsg(maxReceiveMessageSize)
if err != nil {
return err
}
@ -510,10 +566,11 @@ func recv(c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxR
inPayload.WireLength = len(d)
}
if isCompressed {
if st := checkRecvPayload(s.RecvCompress(), compressor != nil || dc != nil); st != nil {
return st.Err()
}
if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil {
return st.Err()
}
if pf == compressionMade {
// To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor,
// use this decompressor as the default.
if dc != nil {
@ -531,11 +588,11 @@ func recv(c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxR
return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
}
}
if len(d) > maxReceiveMessageSize {
// TODO: Revisit the error code. Currently keep it consistent with java
// implementation.
return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(d), maxReceiveMessageSize)
}
}
if len(d) > maxReceiveMessageSize {
// TODO: Revisit the error code. Currently keep it consistent with java
// implementation.
return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(d), maxReceiveMessageSize)
}
if err := c.Unmarshal(d, m); err != nil {
return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)