Merge pull request #1165 from lyuxuan/service_config_pr

Expand service config support
This commit is contained in:
lyuxuan
2017-05-22 11:15:26 -07:00
committed by GitHub
6 changed files with 946 additions and 176 deletions

37
call.go
View File

@ -73,7 +73,10 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran
} }
} }
for { for {
if err = recv(p, dopts.codec, stream, dopts.dc, reply, dopts.maxMsgSize, inPayload); err != nil { if c.maxReceiveMessageSize == nil {
return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)")
}
if err = recv(p, dopts.codec, stream, dopts.dc, reply, *c.maxReceiveMessageSize, inPayload); err != nil {
if err == io.EOF { if err == io.EOF {
break break
} }
@ -93,7 +96,7 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran
} }
// sendRequest writes out various information of an RPC such as Context and Message. // sendRequest writes out various information of an RPC such as Context and Message.
func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, callHdr *transport.CallHdr, stream *transport.Stream, t transport.ClientTransport, args interface{}, opts *transport.Options) (err error) { func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, c *callInfo, callHdr *transport.CallHdr, stream *transport.Stream, t transport.ClientTransport, args interface{}, opts *transport.Options) (err error) {
defer func() { defer func() {
if err != nil { if err != nil {
// If err is connection error, t will be closed, no need to close stream here. // If err is connection error, t will be closed, no need to close stream here.
@ -118,6 +121,12 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor,
if err != nil { if err != nil {
return Errorf(codes.Internal, "grpc: %v", err) return Errorf(codes.Internal, "grpc: %v", err)
} }
if c.maxSendMessageSize == nil {
return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)")
}
if len(outBuf) > *c.maxSendMessageSize {
return Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(outBuf), *c.maxSendMessageSize)
}
err = t.Write(stream, outBuf, opts) err = t.Write(stream, outBuf, opts)
if err == nil && outPayload != nil { if err == nil && outPayload != nil {
outPayload.SentTime = time.Now() outPayload.SentTime = time.Now()
@ -145,14 +154,18 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) { func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {
c := defaultCallInfo c := defaultCallInfo
if mc, ok := cc.getMethodConfig(method); ok { mc := cc.GetMethodConfig(method)
c.failFast = !mc.WaitForReady if mc.WaitForReady != nil {
if mc.Timeout > 0 { c.failFast = !*mc.WaitForReady
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, mc.Timeout)
defer cancel()
}
} }
if mc.Timeout != nil && *mc.Timeout >= 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
defer cancel()
}
opts = append(cc.dopts.callOptions, opts...)
for _, o := range opts { for _, o := range opts {
if err := o.before(&c); err != nil { if err := o.before(&c); err != nil {
return toRPCErr(err) return toRPCErr(err)
@ -163,6 +176,10 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
o.after(&c) o.after(&c)
} }
}() }()
c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
if EnableTracing { if EnableTracing {
c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
defer c.traceInfo.tr.Finish() defer c.traceInfo.tr.Finish()
@ -260,7 +277,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
} }
return toRPCErr(err) return toRPCErr(err)
} }
err = sendRequest(ctx, cc.dopts, cc.dopts.cp, callHdr, stream, t, args, topts) err = sendRequest(ctx, cc.dopts, cc.dopts.cp, &c, callHdr, stream, t, args, topts)
if err != nil { if err != nil {
if put != nil { if put != nil {
updateRPCInfoInContext(ctx, rpcInfo{ updateRPCInfoInContext(ctx, rpcInfo{

View File

@ -36,8 +36,8 @@ package grpc
import ( import (
"errors" "errors"
"fmt" "fmt"
"math"
"net" "net"
"strings"
"sync" "sync"
"time" "time"
@ -87,22 +87,25 @@ var (
// dialOptions configure a Dial call. dialOptions are set by the DialOption // dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial. // values passed to Dial.
type dialOptions struct { type dialOptions struct {
unaryInt UnaryClientInterceptor unaryInt UnaryClientInterceptor
streamInt StreamClientInterceptor streamInt StreamClientInterceptor
codec Codec codec Codec
cp Compressor cp Compressor
dc Decompressor dc Decompressor
bs backoffStrategy bs backoffStrategy
balancer Balancer balancer Balancer
block bool block bool
insecure bool insecure bool
timeout time.Duration timeout time.Duration
scChan <-chan ServiceConfig scChan <-chan ServiceConfig
copts transport.ConnectOptions copts transport.ConnectOptions
maxMsgSize int callOptions []CallOption
} }
const defaultClientMaxMsgSize = math.MaxInt32 const (
defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
defaultClientMaxSendMessageSize = 1024 * 1024 * 4
)
// DialOption configures how we set up the connection. // DialOption configures how we set up the connection.
type DialOption func(*dialOptions) type DialOption func(*dialOptions)
@ -123,10 +126,15 @@ func WithInitialConnWindowSize(s int32) DialOption {
} }
} }
// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. // WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
func WithMaxMsgSize(s int) DialOption { func WithMaxMsgSize(s int) DialOption {
return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
}
// WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection.
func WithDefaultCallOptions(cos ...CallOption) DialOption {
return func(o *dialOptions) { return func(o *dialOptions) {
o.maxMsgSize = s o.callOptions = append(o.callOptions, cos...)
} }
} }
@ -321,7 +329,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
conns: make(map[Address]*addrConn), conns: make(map[Address]*addrConn),
} }
cc.ctx, cc.cancel = context.WithCancel(context.Background()) cc.ctx, cc.cancel = context.WithCancel(context.Background())
cc.dopts.maxMsgSize = defaultClientMaxMsgSize
for _, opt := range opts { for _, opt := range opts {
opt(&cc.dopts) opt(&cc.dopts)
} }
@ -359,15 +367,16 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
} }
}() }()
scSet := false
if cc.dopts.scChan != nil { if cc.dopts.scChan != nil {
// Wait for the initial service config. // Try to get an initial service config.
select { select {
case sc, ok := <-cc.dopts.scChan: case sc, ok := <-cc.dopts.scChan:
if ok { if ok {
cc.sc = sc cc.sc = sc
scSet = true
} }
case <-ctx.Done(): default:
return nil, ctx.Err()
} }
} }
// Set defaults. // Set defaults.
@ -430,7 +439,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
return nil, err return nil, err
} }
} }
if cc.dopts.scChan != nil && !scSet {
// Blocking wait for the initial service config.
select {
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = sc
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
if cc.dopts.scChan != nil { if cc.dopts.scChan != nil {
go cc.scWatcher() go cc.scWatcher()
} }
@ -640,12 +659,23 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error)
return nil return nil
} }
// TODO: Avoid the locking here. // GetMethodConfig gets the method config of the input method.
func (cc *ClientConn) getMethodConfig(method string) (m MethodConfig, ok bool) { // If there's an exact match for input method (i.e. /service/method), we return
// the corresponding MethodConfig.
// If there isn't an exact match for the input method, we look for the default config
// under the service (i.e /service/). If there is a default MethodConfig for
// the serivce, we return it.
// Otherwise, we return an empty MethodConfig.
func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
// TODO: Avoid the locking here.
cc.mu.RLock() cc.mu.RLock()
defer cc.mu.RUnlock() defer cc.mu.RUnlock()
m, ok = cc.sc.Methods[method] m, ok := cc.sc.Methods[method]
return if !ok {
i := strings.LastIndex(method, "/")
m, _ = cc.sc.Methods[method[:i+1]]
}
return m
} }
func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) { func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {

View File

@ -137,12 +137,14 @@ func (d *gzipDecompressor) Type() string {
// callInfo contains all related configuration and information about an RPC. // callInfo contains all related configuration and information about an RPC.
type callInfo struct { type callInfo struct {
failFast bool failFast bool
headerMD metadata.MD headerMD metadata.MD
trailerMD metadata.MD trailerMD metadata.MD
peer *peer.Peer peer *peer.Peer
traceInfo traceInfo // in trace.go traceInfo traceInfo // in trace.go
creds credentials.PerRPCCredentials maxReceiveMessageSize *int
maxSendMessageSize *int
creds credentials.PerRPCCredentials
} }
var defaultCallInfo = callInfo{failFast: true} var defaultCallInfo = callInfo{failFast: true}
@ -217,6 +219,22 @@ func FailFast(failFast bool) CallOption {
}) })
} }
// MaxCallRecvMsgSize returns a CallOption which sets the maximum message size the client can receive.
func MaxCallRecvMsgSize(s int) CallOption {
return beforeCall(func(o *callInfo) error {
o.maxReceiveMessageSize = &s
return nil
})
}
// MaxCallSendMsgSize returns a CallOption which sets the maximum message size the client can send.
func MaxCallSendMsgSize(s int) CallOption {
return beforeCall(func(o *callInfo) error {
o.maxSendMessageSize = &s
return nil
})
}
// PerRPCCredentials returns a CallOption that sets credentials.PerRPCCredentials // PerRPCCredentials returns a CallOption that sets credentials.PerRPCCredentials
// for a call. // for a call.
func PerRPCCredentials(creds credentials.PerRPCCredentials) CallOption { func PerRPCCredentials(creds credentials.PerRPCCredentials) CallOption {
@ -259,7 +277,7 @@ type parser struct {
// No other error values or types must be returned, which also means // No other error values or types must be returned, which also means
// that the underlying io.Reader must not return an incompatible // that the underlying io.Reader must not return an incompatible
// error. // error.
func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err error) { func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
if _, err := io.ReadFull(p.r, p.header[:]); err != nil { if _, err := io.ReadFull(p.r, p.header[:]); err != nil {
return 0, nil, err return 0, nil, err
} }
@ -270,8 +288,8 @@ func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err erro
if length == 0 { if length == 0 {
return pf, nil, nil return pf, nil, nil
} }
if length > uint32(maxMsgSize) { if length > uint32(maxReceiveMessageSize) {
return 0, nil, Errorf(codes.Internal, "grpc: received message length %d exceeding the max size %d", length, maxMsgSize) return 0, nil, Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
} }
// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead // TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
// of making it for each message: // of making it for each message:
@ -314,7 +332,7 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayl
length = uint(len(b)) length = uint(len(b))
} }
if length > math.MaxUint32 { if length > math.MaxUint32 {
return nil, Errorf(codes.InvalidArgument, "grpc: message too large (%d bytes)", length) return nil, Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", length)
} }
const ( const (
@ -355,8 +373,8 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) er
return nil return nil
} }
func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, inPayload *stats.InPayload) error { func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, inPayload *stats.InPayload) error {
pf, d, err := p.recvMsg(maxMsgSize) pf, d, err := p.recvMsg(maxReceiveMessageSize)
if err != nil { if err != nil {
return err return err
} }
@ -372,10 +390,10 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{
return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
} }
} }
if len(d) > maxMsgSize { if len(d) > maxReceiveMessageSize {
// TODO: Revisit the error code. Currently keep it consistent with java // TODO: Revisit the error code. Currently keep it consistent with java
// implementation. // implementation.
return Errorf(codes.Internal, "grpc: received a message of %d bytes exceeding %d limit", len(d), maxMsgSize) return Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(d), maxReceiveMessageSize)
} }
if err := c.Unmarshal(d, m); err != nil { if err := c.Unmarshal(d, m); err != nil {
return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err) return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
@ -501,24 +519,22 @@ type MethodConfig struct {
// WaitForReady indicates whether RPCs sent to this method should wait until // WaitForReady indicates whether RPCs sent to this method should wait until
// the connection is ready by default (!failfast). The value specified via the // the connection is ready by default (!failfast). The value specified via the
// gRPC client API will override the value set here. // gRPC client API will override the value set here.
WaitForReady bool WaitForReady *bool
// Timeout is the default timeout for RPCs sent to this method. The actual // Timeout is the default timeout for RPCs sent to this method. The actual
// deadline used will be the minimum of the value specified here and the value // deadline used will be the minimum of the value specified here and the value
// set by the application via the gRPC client API. If either one is not set, // set by the application via the gRPC client API. If either one is not set,
// then the other will be used. If neither is set, then the RPC has no deadline. // then the other will be used. If neither is set, then the RPC has no deadline.
Timeout time.Duration Timeout *time.Duration
// MaxReqSize is the maximum allowed payload size for an individual request in a // MaxReqSize is the maximum allowed payload size for an individual request in a
// stream (client->server) in bytes. The size which is measured is the serialized // stream (client->server) in bytes. The size which is measured is the serialized
// payload after per-message compression (but before stream compression) in bytes. // payload after per-message compression (but before stream compression) in bytes.
// The actual value used is the minumum of the value specified here and the value set // The actual value used is the minumum of the value specified here and the value set
// by the application via the gRPC client API. If either one is not set, then the other // by the application via the gRPC client API. If either one is not set, then the other
// will be used. If neither is set, then the built-in default is used. // will be used. If neither is set, then the built-in default is used.
// TODO: support this. MaxReqSize *int
MaxReqSize uint32
// MaxRespSize is the maximum allowed payload size for an individual response in a // MaxRespSize is the maximum allowed payload size for an individual response in a
// stream (server->client) in bytes. // stream (server->client) in bytes.
// TODO: support this. MaxRespSize *int
MaxRespSize uint32
} }
// ServiceConfig is provided by the service provider and contains parameters for how // ServiceConfig is provided by the service provider and contains parameters for how
@ -529,9 +545,32 @@ type ServiceConfig struct {
// via grpc.WithBalancer will override this. // via grpc.WithBalancer will override this.
LB Balancer LB Balancer
// Methods contains a map for the methods in this service. // Methods contains a map for the methods in this service.
// If there is an exact match for a method (i.e. /service/method) in the map, use the corresponding MethodConfig.
// If there's no exact match, look for the default config for the service (/service/) and use the corresponding MethodConfig if it exists.
// Otherwise, the method has no MethodConfig to use.
Methods map[string]MethodConfig Methods map[string]MethodConfig
} }
func min(a, b *int) *int {
if *a < *b {
return a
}
return b
}
func getMaxSize(mcMax, doptMax *int, defaultVal int) *int {
if mcMax == nil && doptMax == nil {
return &defaultVal
}
if mcMax != nil && doptMax != nil {
return min(mcMax, doptMax)
}
if mcMax != nil {
return mcMax
}
return doptMax
}
// SupportPackageIsVersion4 is referenced from generated protocol buffer files // SupportPackageIsVersion4 is referenced from generated protocol buffer files
// to assert that that code is compatible with this version of the grpc package. // to assert that that code is compatible with this version of the grpc package.
// //

View File

@ -61,6 +61,11 @@ import (
"google.golang.org/grpc/transport" "google.golang.org/grpc/transport"
) )
const (
defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
defaultServerMaxSendMessageSize = 1024 * 1024 * 4
)
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error) type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
// MethodDesc represents an RPC service's method specification. // MethodDesc represents an RPC service's method specification.
@ -111,12 +116,13 @@ type options struct {
codec Codec codec Codec
cp Compressor cp Compressor
dc Decompressor dc Decompressor
maxMsgSize int
unaryInt UnaryServerInterceptor unaryInt UnaryServerInterceptor
streamInt StreamServerInterceptor streamInt StreamServerInterceptor
inTapHandle tap.ServerInHandle inTapHandle tap.ServerInHandle
statsHandler stats.Handler statsHandler stats.Handler
maxConcurrentStreams uint32 maxConcurrentStreams uint32
maxReceiveMessageSize int
maxSendMessageSize int
useHandlerImpl bool // use http.Handler-based server useHandlerImpl bool // use http.Handler-based server
unknownStreamDesc *StreamDesc unknownStreamDesc *StreamDesc
keepaliveParams keepalive.ServerParameters keepaliveParams keepalive.ServerParameters
@ -125,7 +131,10 @@ type options struct {
initialConnWindowSize int32 initialConnWindowSize int32
} }
var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit var defaultServerOptions = options{
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
maxSendMessageSize: defaultServerMaxSendMessageSize,
}
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc. // A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption func(*options) type ServerOption func(*options)
@ -181,11 +190,25 @@ func RPCDecompressor(dc Decompressor) ServerOption {
} }
} }
// MaxMsgSize returns a ServerOption to set the max message size in bytes for inbound mesages. // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default 4MB. // If this is not set, gRPC uses the default limit. Deprecated: use MaxRecvMsgSize instead.
func MaxMsgSize(m int) ServerOption { func MaxMsgSize(m int) ServerOption {
return MaxRecvMsgSize(m)
}
// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default 4MB.
func MaxRecvMsgSize(m int) ServerOption {
return func(o *options) { return func(o *options) {
o.maxMsgSize = m o.maxReceiveMessageSize = m
}
}
// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
// If this is not set, gRPC uses the default 4MB.
func MaxSendMsgSize(m int) ServerOption {
return func(o *options) {
o.maxSendMessageSize = m
} }
} }
@ -266,8 +289,7 @@ func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
// NewServer creates a gRPC server which has no service registered and has not // NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet. // started to accept requests yet.
func NewServer(opt ...ServerOption) *Server { func NewServer(opt ...ServerOption) *Server {
var opts options opts := defaultServerOptions
opts.maxMsgSize = defaultMaxMsgSize
for _, o := range opt { for _, o := range opt {
o(&opts) o(&opts)
} }
@ -651,6 +673,9 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
// the optimal option. // the optimal option.
grpclog.Fatalf("grpc: Server failed to encode response %v", err) grpclog.Fatalf("grpc: Server failed to encode response %v", err)
} }
if len(p) > s.opts.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(p), s.opts.maxSendMessageSize)
}
err = t.Write(stream, p, opts) err = t.Write(stream, p, opts)
if err == nil && outPayload != nil { if err == nil && outPayload != nil {
outPayload.SentTime = time.Now() outPayload.SentTime = time.Now()
@ -692,7 +717,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
stream.SetSendCompress(s.opts.cp.Type()) stream.SetSendCompress(s.opts.cp.Type())
} }
p := &parser{r: stream} p := &parser{r: stream}
pf, req, err := p.recvMsg(s.opts.maxMsgSize) pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)
if err == io.EOF { if err == io.EOF {
// The entire stream is done (for unary RPC only). // The entire stream is done (for unary RPC only).
return err return err
@ -750,10 +775,10 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
return Errorf(codes.Internal, err.Error()) return Errorf(codes.Internal, err.Error())
} }
} }
if len(req) > s.opts.maxMsgSize { if len(req) > s.opts.maxReceiveMessageSize {
// TODO: Revisit the error code. Currently keep it consistent with // TODO: Revisit the error code. Currently keep it consistent with
// java implementation. // java implementation.
return status.Errorf(codes.Internal, "grpc: server received a message of %d bytes exceeding %d limit", len(req), s.opts.maxMsgSize) return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize)
} }
if err := s.opts.codec.Unmarshal(req, v); err != nil { if err := s.opts.codec.Unmarshal(req, v); err != nil {
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err) return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
@ -846,15 +871,16 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
stream.SetSendCompress(s.opts.cp.Type()) stream.SetSendCompress(s.opts.cp.Type())
} }
ss := &serverStream{ ss := &serverStream{
t: t, t: t,
s: stream, s: stream,
p: &parser{r: stream}, p: &parser{r: stream},
codec: s.opts.codec, codec: s.opts.codec,
cp: s.opts.cp, cp: s.opts.cp,
dc: s.opts.dc, dc: s.opts.dc,
maxMsgSize: s.opts.maxMsgSize, maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
trInfo: trInfo, maxSendMessageSize: s.opts.maxSendMessageSize,
statsHandler: sh, trInfo: trInfo,
statsHandler: sh,
} }
if ss.cp != nil { if ss.cp != nil {
ss.cbuf = new(bytes.Buffer) ss.cbuf = new(bytes.Buffer)
@ -929,7 +955,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.SetError() trInfo.tr.SetError()
} }
errDesc := fmt.Sprintf("malformed method name: %q", stream.Method()) errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
if err := t.WriteStatus(stream, status.New(codes.InvalidArgument, errDesc)); err != nil { if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
if trInfo != nil { if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError() trInfo.tr.SetError()

View File

@ -113,17 +113,24 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
cancel context.CancelFunc cancel context.CancelFunc
) )
c := defaultCallInfo c := defaultCallInfo
if mc, ok := cc.getMethodConfig(method); ok { mc := cc.GetMethodConfig(method)
c.failFast = !mc.WaitForReady if mc.WaitForReady != nil {
if mc.Timeout > 0 { c.failFast = !*mc.WaitForReady
ctx, cancel = context.WithTimeout(ctx, mc.Timeout)
}
} }
if mc.Timeout != nil {
ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
}
opts = append(cc.dopts.callOptions, opts...)
for _, o := range opts { for _, o := range opts {
if err := o.before(&c); err != nil { if err := o.before(&c); err != nil {
return nil, toRPCErr(err) return nil, toRPCErr(err)
} }
} }
c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
callHdr := &transport.CallHdr{ callHdr := &transport.CallHdr{
Host: cc.authority, Host: cc.authority,
Method: method, Method: method,
@ -215,14 +222,13 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
break break
} }
cs := &clientStream{ cs := &clientStream{
opts: opts, opts: opts,
c: c, c: c,
desc: desc, desc: desc,
codec: cc.dopts.codec, codec: cc.dopts.codec,
cp: cc.dopts.cp, cp: cc.dopts.cp,
dc: cc.dopts.dc, dc: cc.dopts.dc,
maxMsgSize: cc.dopts.maxMsgSize, cancel: cancel,
cancel: cancel,
put: put, put: put,
t: t, t: t,
@ -266,18 +272,17 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
// clientStream implements a client side Stream. // clientStream implements a client side Stream.
type clientStream struct { type clientStream struct {
opts []CallOption opts []CallOption
c callInfo c callInfo
t transport.ClientTransport t transport.ClientTransport
s *transport.Stream s *transport.Stream
p *parser p *parser
desc *StreamDesc desc *StreamDesc
codec Codec codec Codec
cp Compressor cp Compressor
cbuf *bytes.Buffer cbuf *bytes.Buffer
dc Decompressor dc Decompressor
maxMsgSize int cancel context.CancelFunc
cancel context.CancelFunc
tracing bool // set to EnableTracing when the clientStream is created. tracing bool // set to EnableTracing when the clientStream is created.
@ -361,6 +366,12 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
if err != nil { if err != nil {
return Errorf(codes.Internal, "grpc: %v", err) return Errorf(codes.Internal, "grpc: %v", err)
} }
if cs.c.maxSendMessageSize == nil {
return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)")
}
if len(out) > *cs.c.maxSendMessageSize {
return Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(out), *cs.c.maxSendMessageSize)
}
err = cs.t.Write(cs.s, out, &transport.Options{Last: false}) err = cs.t.Write(cs.s, out, &transport.Options{Last: false})
if err == nil && outPayload != nil { if err == nil && outPayload != nil {
outPayload.SentTime = time.Now() outPayload.SentTime = time.Now()
@ -376,7 +387,10 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
Client: true, Client: true,
} }
} }
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, cs.maxMsgSize, inPayload) if cs.c.maxReceiveMessageSize == nil {
return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)")
}
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, inPayload)
defer func() { defer func() {
// err != nil indicates the termination of the stream. // err != nil indicates the termination of the stream.
if err != nil { if err != nil {
@ -399,7 +413,10 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
} }
// Special handling for client streaming rpc. // Special handling for client streaming rpc.
// This recv expects EOF or errors, so we don't collect inPayload. // This recv expects EOF or errors, so we don't collect inPayload.
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, cs.maxMsgSize, nil) if cs.c.maxReceiveMessageSize == nil {
return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)")
}
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, nil)
cs.closeTransportStream(err) cs.closeTransportStream(err)
if err == nil { if err == nil {
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>")) return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
@ -524,15 +541,16 @@ type ServerStream interface {
// serverStream implements a server side Stream. // serverStream implements a server side Stream.
type serverStream struct { type serverStream struct {
t transport.ServerTransport t transport.ServerTransport
s *transport.Stream s *transport.Stream
p *parser p *parser
codec Codec codec Codec
cp Compressor cp Compressor
dc Decompressor dc Decompressor
cbuf *bytes.Buffer cbuf *bytes.Buffer
maxMsgSize int maxReceiveMessageSize int
trInfo *traceInfo maxSendMessageSize int
trInfo *traceInfo
statsHandler stats.Handler statsHandler stats.Handler
@ -591,6 +609,9 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
err = Errorf(codes.Internal, "grpc: %v", err) err = Errorf(codes.Internal, "grpc: %v", err)
return err return err
} }
if len(out) > ss.maxSendMessageSize {
return Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(out), ss.maxSendMessageSize)
}
if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil { if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil {
return toRPCErr(err) return toRPCErr(err)
} }
@ -620,7 +641,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
if ss.statsHandler != nil { if ss.statsHandler != nil {
inPayload = &stats.InPayload{} inPayload = &stats.InPayload{}
} }
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize, inPayload); err != nil { if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, inPayload); err != nil {
if err == io.EOF { if err == io.EOF {
return err return err
} }

View File

@ -435,7 +435,11 @@ type test struct {
healthServer *health.Server // nil means disabled healthServer *health.Server // nil means disabled
maxStream uint32 maxStream uint32
tapHandle tap.ServerInHandle tapHandle tap.ServerInHandle
maxMsgSize int maxMsgSize *int
maxClientReceiveMsgSize *int
maxClientSendMsgSize *int
maxServerReceiveMsgSize *int
maxServerSendMsgSize *int
userAgent string userAgent string
clientCompression bool clientCompression bool
serverCompression bool serverCompression bool
@ -496,8 +500,14 @@ func (te *test) startServer(ts testpb.TestServiceServer) {
te.testServer = ts te.testServer = ts
te.t.Logf("Running test in %s environment...", te.e.name) te.t.Logf("Running test in %s environment...", te.e.name)
sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)} sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)}
if te.maxMsgSize > 0 { if te.maxMsgSize != nil {
sopts = append(sopts, grpc.MaxMsgSize(te.maxMsgSize)) sopts = append(sopts, grpc.MaxMsgSize(*te.maxMsgSize))
}
if te.maxServerReceiveMsgSize != nil {
sopts = append(sopts, grpc.MaxRecvMsgSize(*te.maxServerReceiveMsgSize))
}
if te.maxServerSendMsgSize != nil {
sopts = append(sopts, grpc.MaxSendMsgSize(*te.maxServerSendMsgSize))
} }
if te.tapHandle != nil { if te.tapHandle != nil {
sopts = append(sopts, grpc.InTapHandle(te.tapHandle)) sopts = append(sopts, grpc.InTapHandle(te.tapHandle))
@ -596,8 +606,14 @@ func (te *test) clientConn() *grpc.ClientConn {
if te.streamClientInt != nil { if te.streamClientInt != nil {
opts = append(opts, grpc.WithStreamInterceptor(te.streamClientInt)) opts = append(opts, grpc.WithStreamInterceptor(te.streamClientInt))
} }
if te.maxMsgSize > 0 { if te.maxMsgSize != nil {
opts = append(opts, grpc.WithMaxMsgSize(te.maxMsgSize)) opts = append(opts, grpc.WithMaxMsgSize(*te.maxMsgSize))
}
if te.maxClientReceiveMsgSize != nil {
opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(*te.maxClientReceiveMsgSize)))
}
if te.maxClientSendMsgSize != nil {
opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(*te.maxClientSendMsgSize)))
} }
switch te.e.security { switch te.e.security {
case "tls": case "tls":
@ -1097,16 +1113,10 @@ func testFailFast(t *testing.T, e env) {
awaitNewConnLogOutput() awaitNewConnLogOutput()
} }
func TestServiceConfig(t *testing.T) { func testServiceConfigSetup(t *testing.T, e env) (*test, chan grpc.ServiceConfig) {
defer leakCheck(t)()
for _, e := range listTestEnv() {
testServiceConfig(t, e)
}
}
func testServiceConfig(t *testing.T, e env) {
te := newTest(t, e) te := newTest(t, e)
ch := make(chan grpc.ServiceConfig) // We write before read.
ch := make(chan grpc.ServiceConfig, 1)
te.sc = ch te.sc = ch
te.userAgent = testAppUA te.userAgent = testAppUA
te.declareLogNoise( te.declareLogNoise(
@ -1115,48 +1125,64 @@ func testServiceConfig(t *testing.T, e env) {
"grpc: addrConn.resetTransport failed to create client transport: connection error", "grpc: addrConn.resetTransport failed to create client transport: connection error",
"Failed to dial : context canceled; please retry.", "Failed to dial : context canceled; please retry.",
) )
return te, ch
}
func newBool(b bool) (a *bool) {
return &b
}
func newInt(b int) (a *int) {
return &b
}
func newDuration(b time.Duration) (a *time.Duration) {
a = new(time.Duration)
*a = b
return
}
func TestServiceConfigGetMethodConfig(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {
testGetMethodConfig(t, e)
}
}
func testGetMethodConfig(t *testing.T, e env) {
te, ch := testServiceConfigSetup(t, e)
defer te.tearDown() defer te.tearDown()
var wg sync.WaitGroup mc1 := grpc.MethodConfig{
wg.Add(1) WaitForReady: newBool(true),
go func() { Timeout: newDuration(time.Millisecond),
defer wg.Done() }
mc := grpc.MethodConfig{ mc2 := grpc.MethodConfig{WaitForReady: newBool(false)}
WaitForReady: true, m := make(map[string]grpc.MethodConfig)
Timeout: time.Millisecond, m["/grpc.testing.TestService/EmptyCall"] = mc1
} m["/grpc.testing.TestService/"] = mc2
m := make(map[string]grpc.MethodConfig) sc := grpc.ServiceConfig{
m["/grpc.testing.TestService/EmptyCall"] = mc Methods: m,
m["/grpc.testing.TestService/FullDuplexCall"] = mc }
sc := grpc.ServiceConfig{ ch <- sc
Methods: m,
}
ch <- sc
}()
cc := te.clientConn() cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
// The following RPCs are expected to become non-fail-fast ones with 1ms deadline. // The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded { if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
} }
if _, err := tc.FullDuplexCall(context.Background()); grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) m = make(map[string]grpc.MethodConfig)
} m["/grpc.testing.TestService/UnaryCall"] = mc1
wg.Wait() m["/grpc.testing.TestService/"] = mc2
// Generate a service config update. sc = grpc.ServiceConfig{
mc := grpc.MethodConfig{
WaitForReady: false,
}
m := make(map[string]grpc.MethodConfig)
m["/grpc.testing.TestService/EmptyCall"] = mc
m["/grpc.testing.TestService/FullDuplexCall"] = mc
sc := grpc.ServiceConfig{
Methods: m, Methods: m,
} }
ch <- sc ch <- sc
// Loop until the new update becomes effective. // Wait for the new service config to propagate.
for { for {
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable { if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) == codes.DeadlineExceeded {
continue continue
} }
break break
@ -1165,8 +1191,618 @@ func testServiceConfig(t *testing.T, e env) {
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable { if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable) t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable)
} }
if _, err := tc.FullDuplexCall(context.Background()); grpc.Code(err) != codes.Unavailable { }
t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.Unavailable)
func TestServiceConfigWaitForReady(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {
testServiceConfigWaitForReady(t, e)
}
}
func testServiceConfigWaitForReady(t *testing.T, e env) {
te, ch := testServiceConfigSetup(t, e)
defer te.tearDown()
// Case1: Client API set failfast to be false, and service config set wait_for_ready to be false, Client API should win, and the rpc will wait until deadline exceeds.
mc := grpc.MethodConfig{
WaitForReady: newBool(false),
Timeout: newDuration(time.Millisecond),
}
m := make(map[string]grpc.MethodConfig)
m["/grpc.testing.TestService/EmptyCall"] = mc
m["/grpc.testing.TestService/FullDuplexCall"] = mc
sc := grpc.ServiceConfig{
Methods: m,
}
ch <- sc
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
}
if _, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
}
// Generate a service config update.
// Case2: Client API does not set failfast, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds.
mc.WaitForReady = newBool(true)
m = make(map[string]grpc.MethodConfig)
m["/grpc.testing.TestService/EmptyCall"] = mc
m["/grpc.testing.TestService/FullDuplexCall"] = mc
sc = grpc.ServiceConfig{
Methods: m,
}
ch <- sc
// Wait for the new service config to take effect.
mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall")
for {
if !*mc.WaitForReady {
time.Sleep(100 * time.Millisecond)
mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall")
continue
}
break
}
// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
}
if _, err := tc.FullDuplexCall(context.Background()); grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
}
}
func TestServiceConfigTimeout(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {
testServiceConfigTimeout(t, e)
}
}
func testServiceConfigTimeout(t *testing.T, e env) {
te, ch := testServiceConfigSetup(t, e)
defer te.tearDown()
// Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
mc := grpc.MethodConfig{
Timeout: newDuration(time.Hour),
}
m := make(map[string]grpc.MethodConfig)
m["/grpc.testing.TestService/EmptyCall"] = mc
m["/grpc.testing.TestService/FullDuplexCall"] = mc
sc := grpc.ServiceConfig{
Methods: m,
}
ch <- sc
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
// The following RPCs are expected to become non-fail-fast ones with 1ns deadline.
ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
}
ctx, _ = context.WithTimeout(context.Background(), time.Nanosecond)
if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
}
// Generate a service config update.
// Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
mc.Timeout = newDuration(time.Nanosecond)
m = make(map[string]grpc.MethodConfig)
m["/grpc.testing.TestService/EmptyCall"] = mc
m["/grpc.testing.TestService/FullDuplexCall"] = mc
sc = grpc.ServiceConfig{
Methods: m,
}
ch <- sc
// Wait for the new service config to take effect.
mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall")
for {
if *mc.Timeout != time.Nanosecond {
time.Sleep(100 * time.Millisecond)
mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall")
continue
}
break
}
ctx, _ = context.WithTimeout(context.Background(), time.Hour)
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
}
ctx, _ = context.WithTimeout(context.Background(), time.Hour)
if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
}
}
func TestServiceConfigMaxMsgSize(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {
testServiceConfigMaxMsgSize(t, e)
}
}
func testServiceConfigMaxMsgSize(t *testing.T, e env) {
// Setting up values and objects shared across all test cases.
const smallSize = 1
const largeSize = 1024
const extraLargeSize = 2048
smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
if err != nil {
t.Fatal(err)
}
largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
if err != nil {
t.Fatal(err)
}
extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize)
if err != nil {
t.Fatal(err)
}
mc := grpc.MethodConfig{
MaxReqSize: newInt(extraLargeSize),
MaxRespSize: newInt(extraLargeSize),
}
m := make(map[string]grpc.MethodConfig)
m["/grpc.testing.TestService/UnaryCall"] = mc
m["/grpc.testing.TestService/FullDuplexCall"] = mc
sc := grpc.ServiceConfig{
Methods: m,
}
// Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
te1, ch1 := testServiceConfigSetup(t, e)
te1.startServer(&testServer{security: e.security})
defer te1.tearDown()
ch1 <- sc
tc := testpb.NewTestServiceClient(te1.clientConn())
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseSize: proto.Int32(int32(extraLargeSize)),
Payload: smallPayload,
}
// Test for unary RPC recv.
if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
}
// Test for unary RPC send.
req.Payload = extraLargePayload
req.ResponseSize = proto.Int32(int32(smallSize))
if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
}
// Test for streaming RPC recv.
respParam := []*testpb.ResponseParameters{
{
Size: proto.Int32(int32(extraLargeSize)),
},
}
sreq := &testpb.StreamingOutputCallRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseParameters: respParam,
Payload: smallPayload,
}
stream, err := tc.FullDuplexCall(te1.ctx)
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
if err := stream.Send(sreq); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
}
if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
}
// Test for streaming RPC send.
respParam[0].Size = proto.Int32(int32(smallSize))
sreq.Payload = extraLargePayload
stream, err = tc.FullDuplexCall(te1.ctx)
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
}
// Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
te2, ch2 := testServiceConfigSetup(t, e)
te2.maxClientReceiveMsgSize = newInt(1024)
te2.maxClientSendMsgSize = newInt(1024)
te2.startServer(&testServer{security: e.security})
defer te2.tearDown()
ch2 <- sc
tc = testpb.NewTestServiceClient(te2.clientConn())
// Test for unary RPC recv.
req.Payload = smallPayload
req.ResponseSize = proto.Int32(int32(largeSize))
if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
}
// Test for unary RPC send.
req.Payload = largePayload
req.ResponseSize = proto.Int32(int32(smallSize))
if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
}
// Test for streaming RPC recv.
stream, err = tc.FullDuplexCall(te2.ctx)
respParam[0].Size = proto.Int32(int32(largeSize))
sreq.Payload = smallPayload
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
if err := stream.Send(sreq); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
}
if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
}
// Test for streaming RPC send.
respParam[0].Size = proto.Int32(int32(smallSize))
sreq.Payload = largePayload
stream, err = tc.FullDuplexCall(te2.ctx)
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
}
// Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
te3, ch3 := testServiceConfigSetup(t, e)
te3.maxClientReceiveMsgSize = newInt(4096)
te3.maxClientSendMsgSize = newInt(4096)
te3.startServer(&testServer{security: e.security})
defer te3.tearDown()
ch3 <- sc
tc = testpb.NewTestServiceClient(te3.clientConn())
// Test for unary RPC recv.
req.Payload = smallPayload
req.ResponseSize = proto.Int32(int32(largeSize))
if _, err := tc.UnaryCall(context.Background(), req); err != nil {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
}
req.ResponseSize = proto.Int32(int32(extraLargeSize))
if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
}
// Test for unary RPC send.
req.Payload = largePayload
req.ResponseSize = proto.Int32(int32(smallSize))
if _, err := tc.UnaryCall(context.Background(), req); err != nil {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
}
req.Payload = extraLargePayload
if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
}
// Test for streaming RPC recv.
stream, err = tc.FullDuplexCall(te3.ctx)
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
respParam[0].Size = proto.Int32(int32(largeSize))
sreq.Payload = smallPayload
if err := stream.Send(sreq); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
}
if _, err := stream.Recv(); err != nil {
t.Fatalf("%v.Recv() = _, %v, want <nil>", stream, err)
}
respParam[0].Size = proto.Int32(int32(extraLargeSize))
if err := stream.Send(sreq); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
}
if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
}
// Test for streaming RPC send.
respParam[0].Size = proto.Int32(int32(smallSize))
sreq.Payload = largePayload
stream, err = tc.FullDuplexCall(te3.ctx)
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
if err := stream.Send(sreq); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
}
sreq.Payload = extraLargePayload
if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
}
}
func TestMaxMsgSizeClientDefault(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {
testMaxMsgSizeClientDefault(t, e)
}
}
func testMaxMsgSizeClientDefault(t *testing.T, e env) {
te := newTest(t, e)
te.userAgent = testAppUA
// To avoid error on server side.
te.maxServerSendMsgSize = newInt(5 * 1024 * 1024)
te.declareLogNoise(
"transport: http2Client.notifyError got notified that the client transport was broken EOF",
"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
"grpc: addrConn.resetTransport failed to create client transport: connection error",
"Failed to dial : context canceled; please retry.",
)
te.startServer(&testServer{security: e.security})
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
const smallSize = 1
const largeSize = 4 * 1024 * 1024
smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
if err != nil {
t.Fatal(err)
}
largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
if err != nil {
t.Fatal(err)
}
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseSize: proto.Int32(int32(largeSize)),
Payload: smallPayload,
}
// Test for unary RPC recv.
if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
}
// Test for unary RPC send.
req.Payload = largePayload
req.ResponseSize = proto.Int32(int32(smallSize))
if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
}
respParam := []*testpb.ResponseParameters{
{
Size: proto.Int32(int32(largeSize)),
},
}
sreq := &testpb.StreamingOutputCallRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseParameters: respParam,
Payload: smallPayload,
}
// Test for streaming RPC recv.
stream, err := tc.FullDuplexCall(te.ctx)
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
if err := stream.Send(sreq); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
}
if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
}
// Test for streaming RPC send.
respParam[0].Size = proto.Int32(int32(smallSize))
sreq.Payload = largePayload
stream, err = tc.FullDuplexCall(te.ctx)
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("%v.Send(%v) = %v, want _, error codes: %s", stream, sreq, err, codes.ResourceExhausted)
}
}
func TestMaxMsgSizeClientAPI(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {
testMaxMsgSizeClientAPI(t, e)
}
}
func testMaxMsgSizeClientAPI(t *testing.T, e env) {
te := newTest(t, e)
te.userAgent = testAppUA
// To avoid error on server side.
te.maxServerSendMsgSize = newInt(5 * 1024 * 1024)
te.maxClientReceiveMsgSize = newInt(1024)
te.maxClientSendMsgSize = newInt(1024)
te.declareLogNoise(
"transport: http2Client.notifyError got notified that the client transport was broken EOF",
"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
"grpc: addrConn.resetTransport failed to create client transport: connection error",
"Failed to dial : context canceled; please retry.",
)
te.startServer(&testServer{security: e.security})
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
const smallSize = 1
const largeSize = 1024
smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
if err != nil {
t.Fatal(err)
}
largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
if err != nil {
t.Fatal(err)
}
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseSize: proto.Int32(int32(largeSize)),
Payload: smallPayload,
}
// Test for unary RPC recv.
if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
}
// Test for unary RPC send.
req.Payload = largePayload
req.ResponseSize = proto.Int32(int32(smallSize))
if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
}
respParam := []*testpb.ResponseParameters{
{
Size: proto.Int32(int32(largeSize)),
},
}
sreq := &testpb.StreamingOutputCallRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseParameters: respParam,
Payload: smallPayload,
}
// Test for streaming RPC recv.
stream, err := tc.FullDuplexCall(te.ctx)
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
if err := stream.Send(sreq); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
}
if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
}
// Test for streaming RPC send.
respParam[0].Size = proto.Int32(int32(smallSize))
sreq.Payload = largePayload
stream, err = tc.FullDuplexCall(te.ctx)
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
}
}
func TestMaxMsgSizeServerAPI(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {
testMaxMsgSizeServerAPI(t, e)
}
}
func testMaxMsgSizeServerAPI(t *testing.T, e env) {
te := newTest(t, e)
te.userAgent = testAppUA
te.maxServerReceiveMsgSize = newInt(1024)
te.maxServerSendMsgSize = newInt(1024)
te.declareLogNoise(
"transport: http2Client.notifyError got notified that the client transport was broken EOF",
"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
"grpc: addrConn.resetTransport failed to create client transport: connection error",
"Failed to dial : context canceled; please retry.",
)
te.startServer(&testServer{security: e.security})
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
const smallSize = 1
const largeSize = 1024
smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
if err != nil {
t.Fatal(err)
}
largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
if err != nil {
t.Fatal(err)
}
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseSize: proto.Int32(int32(largeSize)),
Payload: smallPayload,
}
// Test for unary RPC send.
if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
}
// Test for unary RPC recv.
req.Payload = largePayload
req.ResponseSize = proto.Int32(int32(smallSize))
if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
}
respParam := []*testpb.ResponseParameters{
{
Size: proto.Int32(int32(largeSize)),
},
}
sreq := &testpb.StreamingOutputCallRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseParameters: respParam,
Payload: smallPayload,
}
// Test for streaming RPC send.
stream, err := tc.FullDuplexCall(te.ctx)
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
if err := stream.Send(sreq); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
}
if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
}
// Test for streaming RPC recv.
respParam[0].Size = proto.Int32(int32(smallSize))
sreq.Payload = largePayload
stream, err = tc.FullDuplexCall(te.ctx)
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
if err := stream.Send(sreq); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
}
if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
} }
} }
@ -1486,6 +2122,7 @@ func testLargeUnary(t *testing.T, e env) {
} }
} }
// Test backward-compatability API for setting msg size limit.
func TestExceedMsgLimit(t *testing.T) { func TestExceedMsgLimit(t *testing.T) {
defer leakCheck(t)() defer leakCheck(t)()
for _, e := range listTestEnv() { for _, e := range listTestEnv() {
@ -1495,12 +2132,12 @@ func TestExceedMsgLimit(t *testing.T) {
func testExceedMsgLimit(t *testing.T, e env) { func testExceedMsgLimit(t *testing.T, e env) {
te := newTest(t, e) te := newTest(t, e)
te.maxMsgSize = 1024 te.maxMsgSize = newInt(1024)
te.startServer(&testServer{security: e.security}) te.startServer(&testServer{security: e.security})
defer te.tearDown() defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn()) tc := testpb.NewTestServiceClient(te.clientConn())
argSize := int32(te.maxMsgSize + 1) argSize := int32(*te.maxMsgSize + 1)
const smallSize = 1 const smallSize = 1
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
@ -1512,23 +2149,23 @@ func testExceedMsgLimit(t *testing.T, e env) {
t.Fatal(err) t.Fatal(err)
} }
// test on server side for unary RPC // Test on server side for unary RPC.
req := &testpb.SimpleRequest{ req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(), ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseSize: proto.Int32(smallSize), ResponseSize: proto.Int32(smallSize),
Payload: payload, Payload: payload,
} }
if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.Internal { if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.Internal) t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
} }
// test on client side for unary RPC // Test on client side for unary RPC.
req.ResponseSize = proto.Int32(int32(te.maxMsgSize) + 1) req.ResponseSize = proto.Int32(int32(*te.maxMsgSize) + 1)
req.Payload = smallPayload req.Payload = smallPayload
if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.Internal { if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.Internal) t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
} }
// test on server side for streaming RPC // Test on server side for streaming RPC.
stream, err := tc.FullDuplexCall(te.ctx) stream, err := tc.FullDuplexCall(te.ctx)
if err != nil { if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
@ -1539,7 +2176,7 @@ func testExceedMsgLimit(t *testing.T, e env) {
}, },
} }
spayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(te.maxMsgSize+1)) spayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(*te.maxMsgSize+1))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1552,22 +2189,22 @@ func testExceedMsgLimit(t *testing.T, e env) {
if err := stream.Send(sreq); err != nil { if err := stream.Send(sreq); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
} }
if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.Internal { if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.Internal) t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
} }
// test on client side for streaming RPC // Test on client side for streaming RPC.
stream, err = tc.FullDuplexCall(te.ctx) stream, err = tc.FullDuplexCall(te.ctx)
if err != nil { if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
} }
respParam[0].Size = proto.Int32(int32(te.maxMsgSize) + 1) respParam[0].Size = proto.Int32(int32(*te.maxMsgSize) + 1)
sreq.Payload = smallPayload sreq.Payload = smallPayload
if err := stream.Send(sreq); err != nil { if err := stream.Send(sreq); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
} }
if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.Internal { if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.Internal) t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
} }
} }