make max size a pointer type and initialize function a CallOption

This commit is contained in:
Yuxuan Li
2017-04-26 15:50:58 -07:00
parent 983d8372ea
commit 9c5f260e67
5 changed files with 102 additions and 94 deletions

24
call.go
View File

@ -52,7 +52,7 @@ import (
// //
// TODO(zhaoq): Check whether the received message sequence is valid. // TODO(zhaoq): Check whether the received message sequence is valid.
// TODO ctx is used for stats collection and processing. It is the context passed from the application. // TODO ctx is used for stats collection and processing. It is the context passed from the application.
func recvResponse(ctx context.Context, dopts dialOptions, msgSizeLimit int, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) { func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) {
// Try to acquire header metadata from the server if there is any. // Try to acquire header metadata from the server if there is any.
defer func() { defer func() {
if err != nil { if err != nil {
@ -73,7 +73,7 @@ func recvResponse(ctx context.Context, dopts dialOptions, msgSizeLimit int, t tr
} }
} }
for { for {
if err = recv(p, dopts.codec, stream, dopts.dc, reply, msgSizeLimit, inPayload); err != nil { if err = recv(p, dopts.codec, stream, dopts.dc, reply, *c.maxReceiveMessageSize, inPayload); err != nil {
if err == io.EOF { if err == io.EOF {
break break
} }
@ -93,7 +93,7 @@ func recvResponse(ctx context.Context, dopts dialOptions, msgSizeLimit int, t tr
} }
// sendRequest writes out various information of an RPC such as Context and Message. // sendRequest writes out various information of an RPC such as Context and Message.
func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, msgSizeLimit int, callHdr *transport.CallHdr, t transport.ClientTransport, args interface{}, opts *transport.Options) (_ *transport.Stream, err error) { func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, c *callInfo, callHdr *transport.CallHdr, t transport.ClientTransport, args interface{}, opts *transport.Options) (_ *transport.Stream, err error) {
stream, err := t.NewStream(ctx, callHdr) stream, err := t.NewStream(ctx, callHdr)
if err != nil { if err != nil {
return nil, err return nil, err
@ -122,8 +122,8 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor,
if err != nil { if err != nil {
return nil, Errorf(codes.Internal, "grpc: %v", err) return nil, Errorf(codes.Internal, "grpc: %v", err)
} }
if len(outBuf) > msgSizeLimit { if len(outBuf) > *c.maxSendMessageSize {
return nil, Errorf(codes.ResourceExhausted, "Sent message larger than max (%d vs. %d)", len(outBuf), msgSizeLimit) return nil, Errorf(codes.ResourceExhausted, "Sent message larger than max (%d vs. %d)", len(outBuf), *c.maxSendMessageSize)
} }
err = t.Write(stream, outBuf, opts) err = t.Write(stream, outBuf, opts)
if err == nil && outPayload != nil { if err == nil && outPayload != nil {
@ -152,7 +152,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) { func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {
c := defaultCallInfo c := defaultCallInfo
mc := cc.GetMethodConfig(method) mc, _ := cc.GetMethodConfig(method)
if mc.WaitForReady != nil { if mc.WaitForReady != nil {
c.failFast = !*mc.WaitForReady c.failFast = !*mc.WaitForReady
} }
@ -163,9 +163,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
defer cancel() defer cancel()
} }
maxSendMessageSize := getMaxSize(mc.MaxReqSize, cc.dopts.maxSendMessageSize, defaultClientMaxSendMessageSize) opts = append(cc.dopts.callOptions, opts...)
maxReceiveMessageSize := getMaxSize(mc.MaxRespSize, cc.dopts.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
for _, o := range opts { for _, o := range opts {
if err := o.before(&c); err != nil { if err := o.before(&c); err != nil {
return toRPCErr(err) return toRPCErr(err)
@ -176,6 +174,10 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
o.after(&c) o.after(&c)
} }
}() }()
c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
if EnableTracing { if EnableTracing {
c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
defer c.traceInfo.tr.Finish() defer c.traceInfo.tr.Finish()
@ -255,7 +257,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
if c.traceInfo.tr != nil { if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true) c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
} }
stream, err = sendRequest(ctx, cc.dopts, cc.dopts.cp, maxSendMessageSize, callHdr, t, args, topts) stream, err = sendRequest(ctx, cc.dopts, cc.dopts.cp, &c, callHdr, t, args, topts)
if err != nil { if err != nil {
if put != nil { if put != nil {
put() put()
@ -272,7 +274,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
} }
return toRPCErr(err) return toRPCErr(err)
} }
err = recvResponse(ctx, cc.dopts, maxReceiveMessageSize, t, &c, stream, reply) err = recvResponse(ctx, cc.dopts, t, &c, stream, reply)
if err != nil { if err != nil {
if put != nil { if put != nil {
put() put()

View File

@ -86,20 +86,19 @@ var (
// dialOptions configure a Dial call. dialOptions are set by the DialOption // dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial. // values passed to Dial.
type dialOptions struct { type dialOptions struct {
unaryInt UnaryClientInterceptor unaryInt UnaryClientInterceptor
streamInt StreamClientInterceptor streamInt StreamClientInterceptor
codec Codec codec Codec
cp Compressor cp Compressor
dc Decompressor dc Decompressor
bs backoffStrategy bs backoffStrategy
balancer Balancer balancer Balancer
block bool block bool
insecure bool insecure bool
timeout time.Duration timeout time.Duration
scChan <-chan ServiceConfig scChan <-chan ServiceConfig
copts transport.ConnectOptions copts transport.ConnectOptions
maxReceiveMessageSize *int callOptions []CallOption
maxSendMessageSize *int
} }
const ( const (
@ -114,20 +113,13 @@ type DialOption func(*dialOptions)
// WithMaxMsgSize Deprecated: use WithMaxReceiveMessageSize instead. // WithMaxMsgSize Deprecated: use WithMaxReceiveMessageSize instead.
func WithMaxMsgSize(s int) DialOption { func WithMaxMsgSize(s int) DialOption {
return WithMaxReceiveMessageSize(s) return WithDefaultCallOptions(WithMaxReceiveMessageSize(s))
} }
// WithMaxReceiveMessageSize returns a DialOption which sets the maximum message size the client can receive. Negative input is invalid and has the same effect as not setting the field. // WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection.
func WithMaxReceiveMessageSize(s int) DialOption { func WithDefaultCallOptions(cos ...CallOption) DialOption {
return func(o *dialOptions) { return func(o *dialOptions) {
*o.maxReceiveMessageSize = s o.callOptions = append(o.callOptions, cos...)
}
}
// WithMaxSendMessageSize returns a DialOption which sets the maximum message size the client can send. Negative input is invalid and has the same effect as not seeting the field.
func WithMaxSendMessageSize(s int) DialOption {
return func(o *dialOptions) {
*o.maxSendMessageSize = s
} }
} }
@ -642,13 +634,13 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error)
// GetMethodConfig gets the method config of the input method. If there's no exact match for the input method (i.e. /service/method), we will return the default config for all methods under the service (/service/). // GetMethodConfig gets the method config of the input method. If there's no exact match for the input method (i.e. /service/method), we will return the default config for all methods under the service (/service/).
// TODO: Avoid the locking here. // TODO: Avoid the locking here.
func (cc *ClientConn) GetMethodConfig(method string) (m MethodConfig) { func (cc *ClientConn) GetMethodConfig(method string) (m MethodConfig, ok bool) {
cc.mu.RLock() cc.mu.RLock()
defer cc.mu.RUnlock() defer cc.mu.RUnlock()
m, ok := cc.sc.Methods[method] m, ok = cc.sc.Methods[method]
if !ok { if !ok {
i := strings.LastIndex(method, "/") i := strings.LastIndex(method, "/")
m, _ = cc.sc.Methods[method[:i+1]] m, ok = cc.sc.Methods[method[:i+1]]
} }
return return
} }

View File

@ -111,11 +111,13 @@ func (d *gzipDecompressor) Type() string {
// callInfo contains all related configuration and information about an RPC. // callInfo contains all related configuration and information about an RPC.
type callInfo struct { type callInfo struct {
failFast bool failFast bool
headerMD metadata.MD headerMD metadata.MD
trailerMD metadata.MD trailerMD metadata.MD
peer *peer.Peer peer *peer.Peer
traceInfo traceInfo // in trace.go traceInfo traceInfo // in trace.go
maxReceiveMessageSize *int
maxSendMessageSize *int
} }
var defaultCallInfo = callInfo{failFast: true} var defaultCallInfo = callInfo{failFast: true}
@ -181,6 +183,22 @@ func FailFast(failFast bool) CallOption {
}) })
} }
// WithMaxReceiveMessageSize returns a DialOption which sets the maximum message size the client can receive. Negative input is invalid and has the same effect as not setting the field.
func WithMaxReceiveMessageSize(s int) CallOption {
return beforeCall(func(o *callInfo) error {
o.maxReceiveMessageSize = &s
return nil
})
}
// WithMaxSendMessageSize returns a DialOption which sets the maximum message size the client can send. Negative input is invalid and has the same effect as not seeting the field.
func WithMaxSendMessageSize(s int) CallOption {
return beforeCall(func(o *callInfo) error {
o.maxSendMessageSize = &s
return nil
})
}
// The format of the payload: compressed or not? // The format of the payload: compressed or not?
type payloadFormat uint8 type payloadFormat uint8
@ -476,24 +494,24 @@ const SupportPackageIsVersion4 = true
// Version is the current grpc version. // Version is the current grpc version.
const Version = "1.3.0-dev" const Version = "1.3.0-dev"
func min(a, b int) int { func min(a, b *int) *int {
if a < b { if *a < *b {
return a return a
} }
return b return b
} }
func getMaxSize(mcMax, doptMax *int, defaultVal int) int { func getMaxSize(mcMax, doptMax *int, defaultVal int) *int {
if mcMax == nil && doptMax == nil { if mcMax == nil && doptMax == nil {
return defaultVal return &defaultVal
} }
if mcMax != nil && doptMax != nil { if mcMax != nil && doptMax != nil {
return min(*mcMax, *doptMax) return min(mcMax, doptMax)
} }
if mcMax != nil { if mcMax != nil {
return *mcMax return mcMax
} }
return *doptMax return doptMax
} }
const grpcUA = "grpc-go/" + Version const grpcUA = "grpc-go/" + Version

View File

@ -113,7 +113,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
cancel context.CancelFunc cancel context.CancelFunc
) )
c := defaultCallInfo c := defaultCallInfo
mc := cc.GetMethodConfig(method) mc, _ := cc.GetMethodConfig(method)
if mc.WaitForReady != nil { if mc.WaitForReady != nil {
c.failFast = !*mc.WaitForReady c.failFast = !*mc.WaitForReady
} }
@ -124,13 +124,15 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
defer cancel() defer cancel()
} }
maxSendMessageSize := getMaxSize(mc.MaxReqSize, cc.dopts.maxSendMessageSize, defaultClientMaxSendMessageSize) opts = append(cc.dopts.callOptions, opts...)
maxReceiveMessageSize := getMaxSize(mc.MaxRespSize, cc.dopts.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
for _, o := range opts { for _, o := range opts {
if err := o.before(&c); err != nil { if err := o.before(&c); err != nil {
return nil, toRPCErr(err) return nil, toRPCErr(err)
} }
} }
c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
callHdr := &transport.CallHdr{ callHdr := &transport.CallHdr{
Host: cc.authority, Host: cc.authority,
Method: method, Method: method,
@ -221,8 +223,8 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
codec: cc.dopts.codec, codec: cc.dopts.codec,
cp: cc.dopts.cp, cp: cc.dopts.cp,
dc: cc.dopts.dc, dc: cc.dopts.dc,
maxReceiveMessageSize: maxReceiveMessageSize, maxReceiveMessageSize: *c.maxReceiveMessageSize,
maxSendMessageSize: maxSendMessageSize, maxSendMessageSize: *c.maxSendMessageSize,
cancel: cancel, cancel: cancel,
put: put, put: put,

View File

@ -433,11 +433,11 @@ type test struct {
healthServer *health.Server // nil means disabled healthServer *health.Server // nil means disabled
maxStream uint32 maxStream uint32
tapHandle tap.ServerInHandle tapHandle tap.ServerInHandle
maxMsgSize int maxMsgSize *int
maxClientReceiveMsgSize int maxClientReceiveMsgSize *int
maxClientSendMsgSize int maxClientSendMsgSize *int
maxServerReceiveMsgSize int maxServerReceiveMsgSize *int
maxServerSendMsgSize int maxServerSendMsgSize *int
userAgent string userAgent string
clientCompression bool clientCompression bool
serverCompression bool serverCompression bool
@ -483,12 +483,6 @@ func newTest(t *testing.T, e env) *test {
t: t, t: t,
e: e, e: e,
maxStream: math.MaxUint32, maxStream: math.MaxUint32,
// Default value 0 is meaningful (0 byte msg size limit), thus using -1 to indiciate the field is unset.
maxClientReceiveMsgSize: -1,
maxClientSendMsgSize: -1,
maxServerReceiveMsgSize: -1,
maxServerSendMsgSize: -1,
maxMsgSize: -1,
} }
te.ctx, te.cancel = context.WithCancel(context.Background()) te.ctx, te.cancel = context.WithCancel(context.Background())
return te return te
@ -500,14 +494,14 @@ func (te *test) startServer(ts testpb.TestServiceServer) {
te.testServer = ts te.testServer = ts
te.t.Logf("Running test in %s environment...", te.e.name) te.t.Logf("Running test in %s environment...", te.e.name)
sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)} sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)}
if te.maxMsgSize >= 0 { if te.maxMsgSize != nil {
sopts = append(sopts, grpc.MaxMsgSize(te.maxMsgSize)) sopts = append(sopts, grpc.MaxMsgSize(*te.maxMsgSize))
} }
if te.maxServerReceiveMsgSize >= 0 { if te.maxServerReceiveMsgSize != nil {
sopts = append(sopts, grpc.MaxReceiveMessageSize(te.maxServerReceiveMsgSize)) sopts = append(sopts, grpc.MaxReceiveMessageSize(*te.maxServerReceiveMsgSize))
} }
if te.maxServerSendMsgSize >= 0 { if te.maxServerSendMsgSize != nil {
sopts = append(sopts, grpc.MaxSendMessageSize(te.maxServerSendMsgSize)) sopts = append(sopts, grpc.MaxSendMessageSize(*te.maxServerSendMsgSize))
} }
if te.tapHandle != nil { if te.tapHandle != nil {
sopts = append(sopts, grpc.InTapHandle(te.tapHandle)) sopts = append(sopts, grpc.InTapHandle(te.tapHandle))
@ -600,14 +594,14 @@ func (te *test) clientConn() *grpc.ClientConn {
if te.streamClientInt != nil { if te.streamClientInt != nil {
opts = append(opts, grpc.WithStreamInterceptor(te.streamClientInt)) opts = append(opts, grpc.WithStreamInterceptor(te.streamClientInt))
} }
if te.maxMsgSize >= 0 { if te.maxMsgSize != nil {
opts = append(opts, grpc.WithMaxMsgSize(te.maxMsgSize)) opts = append(opts, grpc.WithMaxMsgSize(*te.maxMsgSize))
} }
if te.maxClientReceiveMsgSize >= 0 { if te.maxClientReceiveMsgSize != nil {
opts = append(opts, grpc.WithMaxReceiveMessageSize(te.maxClientReceiveMsgSize)) opts = append(opts, grpc.WithDefaultCallOptions(grpc.WithMaxReceiveMessageSize(*te.maxClientReceiveMsgSize)))
} }
if te.maxClientSendMsgSize >= 0 { if te.maxClientSendMsgSize != nil {
opts = append(opts, grpc.WithMaxSendMessageSize(te.maxClientSendMsgSize)) opts = append(opts, grpc.WithDefaultCallOptions(grpc.WithMaxSendMessageSize(*te.maxClientSendMsgSize)))
} }
if te.timeout > 0 { if te.timeout > 0 {
opts = append(opts, grpc.WithTimeout(te.timeout)) opts = append(opts, grpc.WithTimeout(te.timeout))
@ -1334,8 +1328,8 @@ func testServiceConfigMaxMsgSize(t *testing.T, e env) {
// Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv). // Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
te2, ch2 := testServiceConfigSetup(t, e) te2, ch2 := testServiceConfigSetup(t, e)
te2.maxClientReceiveMsgSize = 1024 te2.maxClientReceiveMsgSize = newInt(1024)
te2.maxClientSendMsgSize = 1024 te2.maxClientSendMsgSize = newInt(1024)
te2.startServer(&testServer{security: e.security}) te2.startServer(&testServer{security: e.security})
defer te2.tearDown() defer te2.tearDown()
ch2 <- sc ch2 <- sc
@ -1383,8 +1377,8 @@ func testServiceConfigMaxMsgSize(t *testing.T, e env) {
// Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv). // Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
te3, ch3 := testServiceConfigSetup(t, e) te3, ch3 := testServiceConfigSetup(t, e)
te3.maxClientReceiveMsgSize = 4096 te3.maxClientReceiveMsgSize = newInt(4096)
te3.maxClientSendMsgSize = 4096 te3.maxClientSendMsgSize = newInt(4096)
te3.startServer(&testServer{security: e.security}) te3.startServer(&testServer{security: e.security})
defer te3.tearDown() defer te3.tearDown()
ch3 <- sc ch3 <- sc
@ -1468,7 +1462,7 @@ func testMaxMsgSizeClientDefault(t *testing.T, e env) {
te := newTest(t, e) te := newTest(t, e)
te.userAgent = testAppUA te.userAgent = testAppUA
// To avoid error on server side. // To avoid error on server side.
te.maxServerSendMsgSize = 5 * 1024 * 1024 te.maxServerSendMsgSize = newInt(5 * 1024 * 1024)
te.declareLogNoise( te.declareLogNoise(
"transport: http2Client.notifyError got notified that the client transport was broken EOF", "transport: http2Client.notifyError got notified that the client transport was broken EOF",
"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
@ -1547,9 +1541,9 @@ func testMaxMsgSizeClientAPI(t *testing.T, e env) {
te := newTest(t, e) te := newTest(t, e)
te.userAgent = testAppUA te.userAgent = testAppUA
// To avoid error on server side. // To avoid error on server side.
te.maxServerSendMsgSize = 5 * 1024 * 1024 te.maxServerSendMsgSize = newInt(5 * 1024 * 1024)
te.maxClientReceiveMsgSize = 1024 te.maxClientReceiveMsgSize = newInt(1024)
te.maxClientSendMsgSize = 1024 te.maxClientSendMsgSize = newInt(1024)
te.declareLogNoise( te.declareLogNoise(
"transport: http2Client.notifyError got notified that the client transport was broken EOF", "transport: http2Client.notifyError got notified that the client transport was broken EOF",
"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
@ -1627,8 +1621,8 @@ func testMaxMsgSizeClientAPI(t *testing.T, e env) {
func testMaxMsgSizeServerAPI(t *testing.T, e env) { func testMaxMsgSizeServerAPI(t *testing.T, e env) {
te := newTest(t, e) te := newTest(t, e)
te.userAgent = testAppUA te.userAgent = testAppUA
te.maxServerReceiveMsgSize = 1024 te.maxServerReceiveMsgSize = newInt(1024)
te.maxServerSendMsgSize = 1024 te.maxServerSendMsgSize = newInt(1024)
te.declareLogNoise( te.declareLogNoise(
"transport: http2Client.notifyError got notified that the client transport was broken EOF", "transport: http2Client.notifyError got notified that the client transport was broken EOF",
"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
@ -2032,12 +2026,12 @@ func TestExceedMsgLimit(t *testing.T) {
func testExceedMsgLimit(t *testing.T, e env) { func testExceedMsgLimit(t *testing.T, e env) {
te := newTest(t, e) te := newTest(t, e)
te.maxMsgSize = 1024 te.maxMsgSize = newInt(1024)
te.startServer(&testServer{security: e.security}) te.startServer(&testServer{security: e.security})
defer te.tearDown() defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn()) tc := testpb.NewTestServiceClient(te.clientConn())
argSize := int32(te.maxMsgSize + 1) argSize := int32(*te.maxMsgSize + 1)
const smallSize = 1 const smallSize = 1
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
@ -2059,7 +2053,7 @@ func testExceedMsgLimit(t *testing.T, e env) {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
} }
// Test on client side for unary RPC. // Test on client side for unary RPC.
req.ResponseSize = proto.Int32(int32(te.maxMsgSize) + 1) req.ResponseSize = proto.Int32(int32(*te.maxMsgSize) + 1)
req.Payload = smallPayload req.Payload = smallPayload
if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
@ -2076,7 +2070,7 @@ func testExceedMsgLimit(t *testing.T, e env) {
}, },
} }
spayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(te.maxMsgSize+1)) spayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(*te.maxMsgSize+1))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -2098,7 +2092,7 @@ func testExceedMsgLimit(t *testing.T, e env) {
if err != nil { if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
} }
respParam[0].Size = proto.Int32(int32(te.maxMsgSize) + 1) respParam[0].Size = proto.Int32(int32(*te.maxMsgSize) + 1)
sreq.Payload = smallPayload sreq.Payload = smallPayload
if err := stream.Send(sreq); err != nil { if err := stream.Send(sreq); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)