From bdf9a640e4fb00373ad986649c3af5bc594bc59a Mon Sep 17 00:00:00 2001 From: Yuxuan Li Date: Mon, 15 May 2017 13:51:11 -0700 Subject: [PATCH] add timeout test, add check or pointer filed in callOption, fix minor issues --- call.go | 10 +++++- clientconn.go | 26 ++++++++++---- rpc_util.go | 4 +-- server.go | 7 ++-- stream.go | 62 ++++++++++++++++++--------------- test/end2end_test.go | 81 ++++++++++++++++++++++++++++++++++++-------- 6 files changed, 136 insertions(+), 54 deletions(-) diff --git a/call.go b/call.go index 5822086f..eaccdad4 100644 --- a/call.go +++ b/call.go @@ -73,6 +73,10 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran } } for { + if c.maxReceiveMessageSize == nil { + // TODO(lyuxuan): codes.Internal the right error to return here? + return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") + } if err = recv(p, dopts.codec, stream, dopts.dc, reply, *c.maxReceiveMessageSize, inPayload); err != nil { if err == io.EOF { break @@ -118,6 +122,10 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, if err != nil { return Errorf(codes.Internal, "grpc: %v", err) } + if c.maxSendMessageSize == nil { + // TODO(lyuxuan): codes.Internal the right error to return here? + return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)") + } if len(outBuf) > *c.maxSendMessageSize { return Errorf(codes.ResourceExhausted, "Sent message larger than max (%d vs. %d)", len(outBuf), *c.maxSendMessageSize) } @@ -148,7 +156,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) { c := defaultCallInfo - mc, _ := cc.GetMethodConfig(method) + mc := cc.GetMethodConfig(method) if mc.WaitForReady != nil { c.failFast = !*mc.WaitForReady } diff --git a/clientconn.go b/clientconn.go index 81258ec2..db6ad41d 100644 --- a/clientconn.go +++ b/clientconn.go @@ -126,7 +126,7 @@ func WithInitialConnWindowSize(s int32) DialOption { } } -// WithMaxMsgSize Deprecated: use WithMaxReceiveMessageSize instead. +// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. Deprecated: use WithMaxReceiveMessageSize instead. func WithMaxMsgSize(s int) DialOption { return WithDefaultCallOptions(WithMaxReceiveMessageSize(s)) } @@ -367,12 +367,14 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } }() + scSet := false if cc.dopts.scChan != nil { // Try to get an initial service config. select { case sc, ok := <-cc.dopts.scChan: if ok { cc.sc = sc + scSet = true } default: } @@ -436,7 +438,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * return nil, err } } - + if cc.dopts.scChan != nil && !scSet { + // Blocking Wait for the initial service config. + select { + case sc, ok := <-cc.dopts.scChan: + if ok { + cc.sc = sc + } + case <-ctx.Done(): + return nil, ctx.Err() + } + } if cc.dopts.scChan != nil { go cc.scWatcher() } @@ -646,17 +658,19 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) return nil } -// GetMethodConfig gets the method config of the input method. If there's no exact match for the input method (i.e. /service/method), we will return the default config for all methods under the service (/service/). +// GetMethodConfig gets the method config of the input method. If there's no exact +// match for the input method (i.e. /service/method), we will return the default +// config for all methods under the service (/service/). // TODO: Avoid the locking here. -func (cc *ClientConn) GetMethodConfig(method string) (m MethodConfig, ok bool) { +func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { cc.mu.RLock() defer cc.mu.RUnlock() - m, ok = cc.sc.Methods[method] + m, ok := cc.sc.Methods[method] if !ok { i := strings.LastIndex(method, "/") m, ok = cc.sc.Methods[method[:i+1]] } - return + return m } func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) { diff --git a/rpc_util.go b/rpc_util.go index 58f4ed9e..6d46706d 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -184,7 +184,7 @@ func FailFast(failFast bool) CallOption { }) } -// WithMaxReceiveMessageSize returns a DialOption which sets the maximum message size the client can receive. +// WithMaxReceiveMessageSize returns a CallOption which sets the maximum message size the client can receive. func WithMaxReceiveMessageSize(s int) CallOption { return beforeCall(func(o *callInfo) error { o.maxReceiveMessageSize = &s @@ -192,7 +192,7 @@ func WithMaxReceiveMessageSize(s int) CallOption { }) } -// WithMaxSendMessageSize returns a DialOption which sets the maximum message size the client can send. +// WithMaxSendMessageSize returns a CallOption which sets the maximum message size the client can send. func WithMaxSendMessageSize(s int) CallOption { return beforeCall(func(o *callInfo) error { o.maxSendMessageSize = &s diff --git a/server.go b/server.go index ddaf2622..d7eff5f6 100644 --- a/server.go +++ b/server.go @@ -116,7 +116,6 @@ type options struct { codec Codec cp Compressor dc Decompressor - maxMsgSize int unaryInt UnaryServerInterceptor streamInt StreamServerInterceptor inTapHandle tap.ServerInHandle @@ -132,6 +131,8 @@ type options struct { initialConnWindowSize int32 } +var defaultServerOptions = options{maxReceiveMessageSize: defaultServerMaxReceiveMessageSize, maxSendMessageSize: defaultServerMaxSendMessageSize} + // A ServerOption sets options such as credentials, codec and keepalive parameters, etc. type ServerOption func(*options) @@ -284,9 +285,7 @@ func UnknownServiceHandler(streamHandler StreamHandler) ServerOption { // NewServer creates a gRPC server which has no service registered and has not // started to accept requests yet. func NewServer(opt ...ServerOption) *Server { - var opts options - opts.maxReceiveMessageSize = defaultServerMaxReceiveMessageSize - opts.maxSendMessageSize = defaultServerMaxSendMessageSize + opts := defaultServerOptions for _, o := range opt { o(&opts) } diff --git a/stream.go b/stream.go index a644f8f8..bf33dc38 100644 --- a/stream.go +++ b/stream.go @@ -113,7 +113,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth cancel context.CancelFunc ) c := defaultCallInfo - mc, _ := cc.GetMethodConfig(method) + mc := cc.GetMethodConfig(method) if mc.WaitForReady != nil { c.failFast = !*mc.WaitForReady } @@ -219,15 +219,13 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth break } cs := &clientStream{ - opts: opts, - c: c, - desc: desc, - codec: cc.dopts.codec, - cp: cc.dopts.cp, - dc: cc.dopts.dc, - maxReceiveMessageSize: *c.maxReceiveMessageSize, - maxSendMessageSize: *c.maxSendMessageSize, - cancel: cancel, + opts: opts, + c: c, + desc: desc, + codec: cc.dopts.codec, + cp: cc.dopts.cp, + dc: cc.dopts.dc, + cancel: cancel, put: put, t: t, @@ -271,19 +269,17 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth // clientStream implements a client side Stream. type clientStream struct { - opts []CallOption - c callInfo - t transport.ClientTransport - s *transport.Stream - p *parser - desc *StreamDesc - codec Codec - cp Compressor - cbuf *bytes.Buffer - dc Decompressor - maxReceiveMessageSize int - maxSendMessageSize int - cancel context.CancelFunc + opts []CallOption + c callInfo + t transport.ClientTransport + s *transport.Stream + p *parser + desc *StreamDesc + codec Codec + cp Compressor + cbuf *bytes.Buffer + dc Decompressor + cancel context.CancelFunc tracing bool // set to EnableTracing when the clientStream is created. @@ -367,8 +363,12 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { if err != nil { return Errorf(codes.Internal, "grpc: %v", err) } - if len(out) > cs.maxSendMessageSize { - return Errorf(codes.ResourceExhausted, "Sent message larger than max (%d vs. %d)", len(out), cs.maxSendMessageSize) + if cs.c.maxSendMessageSize == nil { + // TODO(lyuxuan): codes.Internal the right error to return here? + return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)") + } + if len(out) > *cs.c.maxSendMessageSize { + return Errorf(codes.ResourceExhausted, "Sent message larger than max (%d vs. %d)", len(out), *cs.c.maxSendMessageSize) } err = cs.t.Write(cs.s, out, &transport.Options{Last: false}) if err == nil && outPayload != nil { @@ -385,7 +385,11 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { Client: true, } } - err = recv(cs.p, cs.codec, cs.s, cs.dc, m, cs.maxReceiveMessageSize, inPayload) + if cs.c.maxReceiveMessageSize == nil { + // TODO(lyuxuan): codes.Internal the right error to return here? + return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") + } + err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, inPayload) defer func() { // err != nil indicates the termination of the stream. if err != nil { @@ -408,7 +412,11 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { } // Special handling for client streaming rpc. // This recv expects EOF or errors, so we don't collect inPayload. - err = recv(cs.p, cs.codec, cs.s, cs.dc, m, cs.maxReceiveMessageSize, nil) + if cs.c.maxReceiveMessageSize == nil { + // TODO(lyuxuan): codes.Internal the right error to return here? + return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") + } + err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, nil) cs.closeTransportStream(err) if err == nil { return toRPCErr(errors.New("grpc: client streaming protocol violation: get , want ")) diff --git a/test/end2end_test.go b/test/end2end_test.go index 2b0ea5dd..8835a5cd 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -443,7 +443,6 @@ type test struct { userAgent string clientCompression bool serverCompression bool - timeout time.Duration unaryClientInt grpc.UnaryClientInterceptor streamClientInt grpc.StreamClientInterceptor unaryServerInt grpc.UnaryServerInterceptor @@ -615,9 +614,6 @@ func (te *test) clientConn() *grpc.ClientConn { if te.maxClientSendMsgSize != nil { opts = append(opts, grpc.WithDefaultCallOptions(grpc.WithMaxSendMessageSize(*te.maxClientSendMsgSize))) } - if te.timeout > 0 { - opts = append(opts, grpc.WithTimeout(te.timeout)) - } switch te.e.security { case "tls": creds, err := credentials.NewClientTLSFromFile(tlsDir+"ca.pem", "x.test.youtube.com") @@ -1118,7 +1114,7 @@ func TestServiceConfig(t *testing.T) { for _, e := range listTestEnv() { testGetMethodConfig(t, e) testServiceConfigWaitForReady(t, e) - // Timeout logic (min of service config and client API) is implemented implicitly in context. WithTimeout(). No need to test here. + testServiceConfigTimeout(t, e) testServiceConfigMaxMsgSize(t, e) } } @@ -1139,15 +1135,11 @@ func testServiceConfigSetup(t *testing.T, e env) (*test, chan grpc.ServiceConfig } func newBool(b bool) (a *bool) { - a = new(bool) - *a = b - return + return &b } func newInt(b int) (a *int) { - a = new(int) - *a = b - return + return &b } func newDuration(b time.Duration) (a *time.Duration) { @@ -1238,11 +1230,11 @@ func testServiceConfigWaitForReady(t *testing.T, e env) { ch <- sc // Wait for the new service config to take effect. - mc, ok := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall") + mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall") for { - if ok && !*mc.WaitForReady { + if !*mc.WaitForReady { time.Sleep(100 * time.Millisecond) - mc, ok = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall") + mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall") continue } break @@ -1256,6 +1248,67 @@ func testServiceConfigWaitForReady(t *testing.T, e env) { } } +func testServiceConfigTimeout(t *testing.T, e env) { + te, ch := testServiceConfigSetup(t, e) + defer te.tearDown() + + // Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds. + mc := grpc.MethodConfig{ + Timeout: newDuration(time.Hour), + } + m := make(map[string]grpc.MethodConfig) + m["/grpc.testing.TestService/EmptyCall"] = mc + m["/grpc.testing.TestService/FullDuplexCall"] = mc + sc := grpc.ServiceConfig{ + Methods: m, + } + ch <- sc + + cc := te.clientConn() + tc := testpb.NewTestServiceClient(cc) + // The following RPCs are expected to become non-fail-fast ones with 1ns deadline. + ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond) + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) + } + ctx, _ = context.WithTimeout(context.Background(), time.Nanosecond) + if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) + } + + // Generate a service config update. + // Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds. + mc.Timeout = newDuration(time.Nanosecond) + m = make(map[string]grpc.MethodConfig) + m["/grpc.testing.TestService/EmptyCall"] = mc + m["/grpc.testing.TestService/FullDuplexCall"] = mc + sc = grpc.ServiceConfig{ + Methods: m, + } + ch <- sc + + // // Wait for the new service config to take effect. + mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall") + for { + if *mc.Timeout != time.Nanosecond { + time.Sleep(100 * time.Millisecond) + mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall") + continue + } + break + } + + ctx, _ = context.WithTimeout(context.Background(), time.Hour) + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) + } + + ctx, _ = context.WithTimeout(context.Background(), time.Hour) + if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) + } +} + func testServiceConfigMaxMsgSize(t *testing.T, e env) { // Setting up values and objects shared across all test cases. const smallSize = 1