add timeout test, add check or pointer filed in callOption, fix minor issues

This commit is contained in:
Yuxuan Li
2017-05-15 13:51:11 -07:00
parent 59426b3c05
commit bdf9a640e4
6 changed files with 136 additions and 54 deletions

10
call.go
View File

@ -73,6 +73,10 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran
} }
} }
for { for {
if c.maxReceiveMessageSize == nil {
// TODO(lyuxuan): codes.Internal the right error to return here?
return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)")
}
if err = recv(p, dopts.codec, stream, dopts.dc, reply, *c.maxReceiveMessageSize, inPayload); err != nil { if err = recv(p, dopts.codec, stream, dopts.dc, reply, *c.maxReceiveMessageSize, inPayload); err != nil {
if err == io.EOF { if err == io.EOF {
break break
@ -118,6 +122,10 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor,
if err != nil { if err != nil {
return Errorf(codes.Internal, "grpc: %v", err) return Errorf(codes.Internal, "grpc: %v", err)
} }
if c.maxSendMessageSize == nil {
// TODO(lyuxuan): codes.Internal the right error to return here?
return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)")
}
if len(outBuf) > *c.maxSendMessageSize { if len(outBuf) > *c.maxSendMessageSize {
return Errorf(codes.ResourceExhausted, "Sent message larger than max (%d vs. %d)", len(outBuf), *c.maxSendMessageSize) return Errorf(codes.ResourceExhausted, "Sent message larger than max (%d vs. %d)", len(outBuf), *c.maxSendMessageSize)
} }
@ -148,7 +156,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) { func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {
c := defaultCallInfo c := defaultCallInfo
mc, _ := cc.GetMethodConfig(method) mc := cc.GetMethodConfig(method)
if mc.WaitForReady != nil { if mc.WaitForReady != nil {
c.failFast = !*mc.WaitForReady c.failFast = !*mc.WaitForReady
} }

View File

@ -126,7 +126,7 @@ func WithInitialConnWindowSize(s int32) DialOption {
} }
} }
// WithMaxMsgSize Deprecated: use WithMaxReceiveMessageSize instead. // WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. Deprecated: use WithMaxReceiveMessageSize instead.
func WithMaxMsgSize(s int) DialOption { func WithMaxMsgSize(s int) DialOption {
return WithDefaultCallOptions(WithMaxReceiveMessageSize(s)) return WithDefaultCallOptions(WithMaxReceiveMessageSize(s))
} }
@ -367,12 +367,14 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
} }
}() }()
scSet := false
if cc.dopts.scChan != nil { if cc.dopts.scChan != nil {
// Try to get an initial service config. // Try to get an initial service config.
select { select {
case sc, ok := <-cc.dopts.scChan: case sc, ok := <-cc.dopts.scChan:
if ok { if ok {
cc.sc = sc cc.sc = sc
scSet = true
} }
default: default:
} }
@ -436,7 +438,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
return nil, err return nil, err
} }
} }
if cc.dopts.scChan != nil && !scSet {
// Blocking Wait for the initial service config.
select {
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = sc
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
if cc.dopts.scChan != nil { if cc.dopts.scChan != nil {
go cc.scWatcher() go cc.scWatcher()
} }
@ -646,17 +658,19 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error)
return nil return nil
} }
// GetMethodConfig gets the method config of the input method. If there's no exact match for the input method (i.e. /service/method), we will return the default config for all methods under the service (/service/). // GetMethodConfig gets the method config of the input method. If there's no exact
// match for the input method (i.e. /service/method), we will return the default
// config for all methods under the service (/service/).
// TODO: Avoid the locking here. // TODO: Avoid the locking here.
func (cc *ClientConn) GetMethodConfig(method string) (m MethodConfig, ok bool) { func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
cc.mu.RLock() cc.mu.RLock()
defer cc.mu.RUnlock() defer cc.mu.RUnlock()
m, ok = cc.sc.Methods[method] m, ok := cc.sc.Methods[method]
if !ok { if !ok {
i := strings.LastIndex(method, "/") i := strings.LastIndex(method, "/")
m, ok = cc.sc.Methods[method[:i+1]] m, ok = cc.sc.Methods[method[:i+1]]
} }
return return m
} }
func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) { func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {

View File

@ -184,7 +184,7 @@ func FailFast(failFast bool) CallOption {
}) })
} }
// WithMaxReceiveMessageSize returns a DialOption which sets the maximum message size the client can receive. // WithMaxReceiveMessageSize returns a CallOption which sets the maximum message size the client can receive.
func WithMaxReceiveMessageSize(s int) CallOption { func WithMaxReceiveMessageSize(s int) CallOption {
return beforeCall(func(o *callInfo) error { return beforeCall(func(o *callInfo) error {
o.maxReceiveMessageSize = &s o.maxReceiveMessageSize = &s
@ -192,7 +192,7 @@ func WithMaxReceiveMessageSize(s int) CallOption {
}) })
} }
// WithMaxSendMessageSize returns a DialOption which sets the maximum message size the client can send. // WithMaxSendMessageSize returns a CallOption which sets the maximum message size the client can send.
func WithMaxSendMessageSize(s int) CallOption { func WithMaxSendMessageSize(s int) CallOption {
return beforeCall(func(o *callInfo) error { return beforeCall(func(o *callInfo) error {
o.maxSendMessageSize = &s o.maxSendMessageSize = &s

View File

@ -116,7 +116,6 @@ type options struct {
codec Codec codec Codec
cp Compressor cp Compressor
dc Decompressor dc Decompressor
maxMsgSize int
unaryInt UnaryServerInterceptor unaryInt UnaryServerInterceptor
streamInt StreamServerInterceptor streamInt StreamServerInterceptor
inTapHandle tap.ServerInHandle inTapHandle tap.ServerInHandle
@ -132,6 +131,8 @@ type options struct {
initialConnWindowSize int32 initialConnWindowSize int32
} }
var defaultServerOptions = options{maxReceiveMessageSize: defaultServerMaxReceiveMessageSize, maxSendMessageSize: defaultServerMaxSendMessageSize}
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc. // A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption func(*options) type ServerOption func(*options)
@ -284,9 +285,7 @@ func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
// NewServer creates a gRPC server which has no service registered and has not // NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet. // started to accept requests yet.
func NewServer(opt ...ServerOption) *Server { func NewServer(opt ...ServerOption) *Server {
var opts options opts := defaultServerOptions
opts.maxReceiveMessageSize = defaultServerMaxReceiveMessageSize
opts.maxSendMessageSize = defaultServerMaxSendMessageSize
for _, o := range opt { for _, o := range opt {
o(&opts) o(&opts)
} }

View File

@ -113,7 +113,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
cancel context.CancelFunc cancel context.CancelFunc
) )
c := defaultCallInfo c := defaultCallInfo
mc, _ := cc.GetMethodConfig(method) mc := cc.GetMethodConfig(method)
if mc.WaitForReady != nil { if mc.WaitForReady != nil {
c.failFast = !*mc.WaitForReady c.failFast = !*mc.WaitForReady
} }
@ -219,15 +219,13 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
break break
} }
cs := &clientStream{ cs := &clientStream{
opts: opts, opts: opts,
c: c, c: c,
desc: desc, desc: desc,
codec: cc.dopts.codec, codec: cc.dopts.codec,
cp: cc.dopts.cp, cp: cc.dopts.cp,
dc: cc.dopts.dc, dc: cc.dopts.dc,
maxReceiveMessageSize: *c.maxReceiveMessageSize, cancel: cancel,
maxSendMessageSize: *c.maxSendMessageSize,
cancel: cancel,
put: put, put: put,
t: t, t: t,
@ -271,19 +269,17 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
// clientStream implements a client side Stream. // clientStream implements a client side Stream.
type clientStream struct { type clientStream struct {
opts []CallOption opts []CallOption
c callInfo c callInfo
t transport.ClientTransport t transport.ClientTransport
s *transport.Stream s *transport.Stream
p *parser p *parser
desc *StreamDesc desc *StreamDesc
codec Codec codec Codec
cp Compressor cp Compressor
cbuf *bytes.Buffer cbuf *bytes.Buffer
dc Decompressor dc Decompressor
maxReceiveMessageSize int cancel context.CancelFunc
maxSendMessageSize int
cancel context.CancelFunc
tracing bool // set to EnableTracing when the clientStream is created. tracing bool // set to EnableTracing when the clientStream is created.
@ -367,8 +363,12 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
if err != nil { if err != nil {
return Errorf(codes.Internal, "grpc: %v", err) return Errorf(codes.Internal, "grpc: %v", err)
} }
if len(out) > cs.maxSendMessageSize { if cs.c.maxSendMessageSize == nil {
return Errorf(codes.ResourceExhausted, "Sent message larger than max (%d vs. %d)", len(out), cs.maxSendMessageSize) // TODO(lyuxuan): codes.Internal the right error to return here?
return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)")
}
if len(out) > *cs.c.maxSendMessageSize {
return Errorf(codes.ResourceExhausted, "Sent message larger than max (%d vs. %d)", len(out), *cs.c.maxSendMessageSize)
} }
err = cs.t.Write(cs.s, out, &transport.Options{Last: false}) err = cs.t.Write(cs.s, out, &transport.Options{Last: false})
if err == nil && outPayload != nil { if err == nil && outPayload != nil {
@ -385,7 +385,11 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
Client: true, Client: true,
} }
} }
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, cs.maxReceiveMessageSize, inPayload) if cs.c.maxReceiveMessageSize == nil {
// TODO(lyuxuan): codes.Internal the right error to return here?
return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)")
}
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, inPayload)
defer func() { defer func() {
// err != nil indicates the termination of the stream. // err != nil indicates the termination of the stream.
if err != nil { if err != nil {
@ -408,7 +412,11 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
} }
// Special handling for client streaming rpc. // Special handling for client streaming rpc.
// This recv expects EOF or errors, so we don't collect inPayload. // This recv expects EOF or errors, so we don't collect inPayload.
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, cs.maxReceiveMessageSize, nil) if cs.c.maxReceiveMessageSize == nil {
// TODO(lyuxuan): codes.Internal the right error to return here?
return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)")
}
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, nil)
cs.closeTransportStream(err) cs.closeTransportStream(err)
if err == nil { if err == nil {
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>")) return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))

View File

@ -443,7 +443,6 @@ type test struct {
userAgent string userAgent string
clientCompression bool clientCompression bool
serverCompression bool serverCompression bool
timeout time.Duration
unaryClientInt grpc.UnaryClientInterceptor unaryClientInt grpc.UnaryClientInterceptor
streamClientInt grpc.StreamClientInterceptor streamClientInt grpc.StreamClientInterceptor
unaryServerInt grpc.UnaryServerInterceptor unaryServerInt grpc.UnaryServerInterceptor
@ -615,9 +614,6 @@ func (te *test) clientConn() *grpc.ClientConn {
if te.maxClientSendMsgSize != nil { if te.maxClientSendMsgSize != nil {
opts = append(opts, grpc.WithDefaultCallOptions(grpc.WithMaxSendMessageSize(*te.maxClientSendMsgSize))) opts = append(opts, grpc.WithDefaultCallOptions(grpc.WithMaxSendMessageSize(*te.maxClientSendMsgSize)))
} }
if te.timeout > 0 {
opts = append(opts, grpc.WithTimeout(te.timeout))
}
switch te.e.security { switch te.e.security {
case "tls": case "tls":
creds, err := credentials.NewClientTLSFromFile(tlsDir+"ca.pem", "x.test.youtube.com") creds, err := credentials.NewClientTLSFromFile(tlsDir+"ca.pem", "x.test.youtube.com")
@ -1118,7 +1114,7 @@ func TestServiceConfig(t *testing.T) {
for _, e := range listTestEnv() { for _, e := range listTestEnv() {
testGetMethodConfig(t, e) testGetMethodConfig(t, e)
testServiceConfigWaitForReady(t, e) testServiceConfigWaitForReady(t, e)
// Timeout logic (min of service config and client API) is implemented implicitly in context. WithTimeout(). No need to test here. testServiceConfigTimeout(t, e)
testServiceConfigMaxMsgSize(t, e) testServiceConfigMaxMsgSize(t, e)
} }
} }
@ -1139,15 +1135,11 @@ func testServiceConfigSetup(t *testing.T, e env) (*test, chan grpc.ServiceConfig
} }
func newBool(b bool) (a *bool) { func newBool(b bool) (a *bool) {
a = new(bool) return &b
*a = b
return
} }
func newInt(b int) (a *int) { func newInt(b int) (a *int) {
a = new(int) return &b
*a = b
return
} }
func newDuration(b time.Duration) (a *time.Duration) { func newDuration(b time.Duration) (a *time.Duration) {
@ -1238,11 +1230,11 @@ func testServiceConfigWaitForReady(t *testing.T, e env) {
ch <- sc ch <- sc
// Wait for the new service config to take effect. // Wait for the new service config to take effect.
mc, ok := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall") mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall")
for { for {
if ok && !*mc.WaitForReady { if !*mc.WaitForReady {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
mc, ok = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall") mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall")
continue continue
} }
break break
@ -1256,6 +1248,67 @@ func testServiceConfigWaitForReady(t *testing.T, e env) {
} }
} }
func testServiceConfigTimeout(t *testing.T, e env) {
te, ch := testServiceConfigSetup(t, e)
defer te.tearDown()
// Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
mc := grpc.MethodConfig{
Timeout: newDuration(time.Hour),
}
m := make(map[string]grpc.MethodConfig)
m["/grpc.testing.TestService/EmptyCall"] = mc
m["/grpc.testing.TestService/FullDuplexCall"] = mc
sc := grpc.ServiceConfig{
Methods: m,
}
ch <- sc
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
// The following RPCs are expected to become non-fail-fast ones with 1ns deadline.
ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
}
ctx, _ = context.WithTimeout(context.Background(), time.Nanosecond)
if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
}
// Generate a service config update.
// Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
mc.Timeout = newDuration(time.Nanosecond)
m = make(map[string]grpc.MethodConfig)
m["/grpc.testing.TestService/EmptyCall"] = mc
m["/grpc.testing.TestService/FullDuplexCall"] = mc
sc = grpc.ServiceConfig{
Methods: m,
}
ch <- sc
// // Wait for the new service config to take effect.
mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall")
for {
if *mc.Timeout != time.Nanosecond {
time.Sleep(100 * time.Millisecond)
mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall")
continue
}
break
}
ctx, _ = context.WithTimeout(context.Background(), time.Hour)
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
}
ctx, _ = context.WithTimeout(context.Background(), time.Hour)
if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
}
}
func testServiceConfigMaxMsgSize(t *testing.T, e env) { func testServiceConfigMaxMsgSize(t *testing.T, e env) {
// Setting up values and objects shared across all test cases. // Setting up values and objects shared across all test cases.
const smallSize = 1 const smallSize = 1