From 09aecb094ef6b9ebe49dd999f52c505beeeb402e Mon Sep 17 00:00:00 2001 From: Qi Zhao Date: Mon, 19 Dec 2016 16:31:00 -0800 Subject: [PATCH] Add the initial service config support (#1009) * Add the initial service config support * start scWatcher later * remove timeoutCh * address the comments * deal with dial timeout * defer cancel for the newly created context for correct lifetime management * fix the defer order * added other 2 missing cancels --- call.go | 9 +++++ clientconn.go | 73 ++++++++++++++++++++++++++++++++++----- clientconn_test.go | 8 ++--- rpc_util.go | 38 ++++++++++++++++++++ stream.go | 52 ++++++++++++++++++---------- test/end2end_test.go | 82 +++++++++++++++++++++++++++++++++++++++++++- 6 files changed, 229 insertions(+), 33 deletions(-) diff --git a/call.go b/call.go index fc8e18af..4d8023d9 100644 --- a/call.go +++ b/call.go @@ -145,6 +145,14 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) { c := defaultCallInfo + if mc, ok := cc.getMethodConfig(method); ok { + c.failFast = !mc.WaitForReady + if mc.Timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, mc.Timeout) + defer cancel() + } + } for _, o := range opts { if err := o.before(&c); err != nil { return toRPCErr(err) @@ -211,6 +219,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli if cc.dopts.cp != nil { callHdr.SendCompress = cc.dopts.cp.Type() } + gopts := BalancerGetOptions{ BlockingWait: !c.failFast, } diff --git a/clientconn.go b/clientconn.go index f6dab4b7..aa6b63de 100644 --- a/clientconn.go +++ b/clientconn.go @@ -54,6 +54,8 @@ var ( ErrClientConnClosing = errors.New("grpc: the client connection is closing") // ErrClientConnTimeout indicates that the ClientConn cannot establish the // underlying connections within the specified timeout. + // DEPRECATED: Please use context.DeadlineExceeded instead. This error will be + // removed in Q1 2017. ErrClientConnTimeout = errors.New("grpc: timed out when dialing") // errNoTransportSecurity indicates that there is no transport security @@ -93,6 +95,7 @@ type dialOptions struct { block bool insecure bool timeout time.Duration + scChan <-chan ServiceConfig copts transport.ConnectOptions } @@ -129,6 +132,13 @@ func WithBalancer(b Balancer) DialOption { } } +// WithServiceConfig returns a DialOption which has a channel to read the service configuration. +func WithServiceConfig(c <-chan ServiceConfig) DialOption { + return func(o *dialOptions) { + o.scChan = c + } +} + // WithBackoffMaxDelay configures the dialer to use the provided maximum delay // when backing off after failed connection attempts. func WithBackoffMaxDelay(md time.Duration) DialOption { @@ -260,6 +270,15 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * conns: make(map[Address]*addrConn), } cc.ctx, cc.cancel = context.WithCancel(context.Background()) + for _, opt := range opts { + opt(&cc.dopts) + } + if cc.dopts.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout) + defer cancel() + } + defer func() { select { case <-ctx.Done(): @@ -272,10 +291,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } }() - for _, opt := range opts { - opt(&cc.dopts) + if cc.dopts.scChan != nil { + // Wait for the initial service config. + select { + case sc, ok := <-cc.dopts.scChan: + if ok { + cc.sc = sc + } + case <-ctx.Done(): + return nil, ctx.Err() + } } - // Set defaults. if cc.dopts.codec == nil { cc.dopts.codec = protoCodec{} @@ -297,6 +323,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * waitC := make(chan error, 1) go func() { var addrs []Address + if cc.dopts.balancer == nil && cc.sc.LB != nil { + cc.dopts.balancer = cc.sc.LB + } if cc.dopts.balancer == nil { // Connect to target directly if balancer is nil. addrs = append(addrs, Address{Addr: target}) @@ -332,10 +361,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } close(waitC) }() - var timeoutCh <-chan time.Time - if cc.dopts.timeout > 0 { - timeoutCh = time.After(cc.dopts.timeout) - } select { case <-ctx.Done(): return nil, ctx.Err() @@ -343,14 +368,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * if err != nil { return nil, err } - case <-timeoutCh: - return nil, ErrClientConnTimeout } + // If balancer is nil or balancer.Notify() is nil, ok will be false here. // The lbWatcher goroutine will not be created. if ok { go cc.lbWatcher() } + + if cc.dopts.scChan != nil { + go cc.scWatcher() + } return cc, nil } @@ -397,6 +425,7 @@ type ClientConn struct { dopts dialOptions mu sync.RWMutex + sc ServiceConfig conns map[Address]*addrConn } @@ -435,6 +464,24 @@ func (cc *ClientConn) lbWatcher() { } } +func (cc *ClientConn) scWatcher() { + for { + select { + case sc, ok := <-cc.dopts.scChan: + if !ok { + return + } + cc.mu.Lock() + // TODO: load balance policy runtime change is ignored. + // We may revist this decision in the future. + cc.sc = sc + cc.mu.Unlock() + case <-cc.ctx.Done(): + return + } + } +} + // resetAddrConn creates an addrConn for addr and adds it to cc.conns. // If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason. // If tearDownErr is nil, errConnDrain will be used instead. @@ -522,6 +569,14 @@ func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr err return nil } +// TODO: Avoid the locking here. +func (cc *ClientConn) getMethodConfig(method string) (m MethodConfig, ok bool) { + cc.mu.RLock() + defer cc.mu.RUnlock() + m, ok = cc.sc.Methods[method] + return +} + func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) { var ( ac *addrConn diff --git a/clientconn_test.go b/clientconn_test.go index 98612f45..9b759a1f 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -50,8 +50,8 @@ func TestDialTimeout(t *testing.T) { if err == nil { conn.Close() } - if err != ErrClientConnTimeout { - t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, ErrClientConnTimeout) + if err != context.DeadlineExceeded { + t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, context.DeadlineExceeded) } } @@ -64,8 +64,8 @@ func TestTLSDialTimeout(t *testing.T) { if err == nil { conn.Close() } - if err != ErrClientConnTimeout { - t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, ErrClientConnTimeout) + if err != context.DeadlineExceeded { + t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, context.DeadlineExceeded) } } diff --git a/rpc_util.go b/rpc_util.go index 66d08b5a..2619d396 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -472,6 +472,44 @@ func convertCode(err error) codes.Code { return codes.Unknown } +// MethodConfig defines the configuration recommended by the service providers for a +// particular method. +// This is EXPERIMENTAL and subject to change. +type MethodConfig struct { + // WaitForReady indicates whether RPCs sent to this method should wait until + // the connection is ready by default (!failfast). The value specified via the + // gRPC client API will override the value set here. + WaitForReady bool + // Timeout is the default timeout for RPCs sent to this method. The actual + // deadline used will be the minimum of the value specified here and the value + // set by the application via the gRPC client API. If either one is not set, + // then the other will be used. If neither is set, then the RPC has no deadline. + Timeout time.Duration + // MaxReqSize is the maximum allowed payload size for an individual request in a + // stream (client->server) in bytes. The size which is measured is the serialized, + // uncompressed payload in bytes. The actual value used is the minumum of the value + // specified here and the value set by the application via the gRPC client API. If + // either one is not set, then the other will be used. If neither is set, then the + // built-in default is used. + // TODO: support this. + MaxReqSize uint64 + // MaxRespSize is the maximum allowed payload size for an individual response in a + // stream (server->client) in bytes. + // TODO: support this. + MaxRespSize uint64 +} + +// ServiceConfig is provided by the service provider and contains parameters for how +// clients that connect to the service should behave. +// This is EXPERIMENTAL and subject to change. +type ServiceConfig struct { + // LB is the load balancer the service providers recommends. The balancer specified + // via grpc.WithBalancer will override this. + LB Balancer + // Methods contains a map for the methods in this service. + Methods map[string]MethodConfig +} + // SupportPackageIsVersion4 is referenced from generated protocol buffer files // to assert that that code is compatible with this version of the grpc package. // diff --git a/stream.go b/stream.go index 1bcd2183..d3a4debf 100644 --- a/stream.go +++ b/stream.go @@ -107,11 +107,18 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { var ( - t transport.ClientTransport - s *transport.Stream - put func() + t transport.ClientTransport + s *transport.Stream + put func() + cancel context.CancelFunc ) c := defaultCallInfo + if mc, ok := cc.getMethodConfig(method); ok { + c.failFast = !mc.WaitForReady + if mc.Timeout > 0 { + ctx, cancel = context.WithTimeout(ctx, mc.Timeout) + } + } for _, o := range opts { if err := o.before(&c); err != nil { return nil, toRPCErr(err) @@ -200,12 +207,13 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth break } cs := &clientStream{ - opts: opts, - c: c, - desc: desc, - codec: cc.dopts.codec, - cp: cc.dopts.cp, - dc: cc.dopts.dc, + opts: opts, + c: c, + desc: desc, + codec: cc.dopts.codec, + cp: cc.dopts.cp, + dc: cc.dopts.dc, + cancel: cancel, put: put, t: t, @@ -249,16 +257,17 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth // clientStream implements a client side Stream. type clientStream struct { - opts []CallOption - c callInfo - t transport.ClientTransport - s *transport.Stream - p *parser - desc *StreamDesc - codec Codec - cp Compressor - cbuf *bytes.Buffer - dc Decompressor + opts []CallOption + c callInfo + t transport.ClientTransport + s *transport.Stream + p *parser + desc *StreamDesc + codec Codec + cp Compressor + cbuf *bytes.Buffer + dc Decompressor + cancel context.CancelFunc tracing bool // set to EnableTracing when the clientStream is created. @@ -449,6 +458,11 @@ func (cs *clientStream) closeTransportStream(err error) { } func (cs *clientStream) finish(err error) { + defer func() { + if cs.cancel != nil { + cs.cancel() + } + }() cs.mu.Lock() defer cs.mu.Unlock() for _, o := range cs.opts { diff --git a/test/end2end_test.go b/test/end2end_test.go index 61ff1741..6fe45f21 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -428,6 +428,7 @@ type test struct { streamClientInt grpc.StreamClientInterceptor unaryServerInt grpc.UnaryServerInterceptor streamServerInt grpc.StreamServerInterceptor + sc <-chan grpc.ServiceConfig // srv and srvAddr are set once startServer is called. srv *grpc.Server @@ -450,7 +451,9 @@ func (te *test) tearDown() { te.restoreLogs() te.restoreLogs = nil } - te.srv.Stop() + if te.srv != nil { + te.srv.Stop() + } } // newTest returns a new test using the provided testing.T and @@ -547,6 +550,10 @@ func (te *test) clientConn() *grpc.ClientConn { grpc.WithUserAgent(te.userAgent), } + if te.sc != nil { + opts = append(opts, grpc.WithServiceConfig(te.sc)) + } + if te.clientCompression { opts = append(opts, grpc.WithCompressor(grpc.NewGZIPCompressor()), @@ -1013,6 +1020,79 @@ func testFailFast(t *testing.T, e env) { awaitNewConnLogOutput() } +func TestServiceConfig(t *testing.T) { + defer leakCheck(t)() + for _, e := range listTestEnv() { + testServiceConfig(t, e) + } +} + +func testServiceConfig(t *testing.T, e env) { + te := newTest(t, e) + ch := make(chan grpc.ServiceConfig) + te.sc = ch + te.userAgent = testAppUA + te.declareLogNoise( + "transport: http2Client.notifyError got notified that the client transport was broken EOF", + "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", + "grpc: addrConn.resetTransport failed to create client transport: connection error", + "Failed to dial : context canceled; please retry.", + ) + defer te.tearDown() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + mc := grpc.MethodConfig{ + WaitForReady: true, + Timeout: time.Millisecond, + } + m := make(map[string]grpc.MethodConfig) + m["/grpc.testing.TestService/EmptyCall"] = mc + m["/grpc.testing.TestService/FullDuplexCall"] = mc + sc := grpc.ServiceConfig{ + Methods: m, + } + ch <- sc + }() + cc := te.clientConn() + tc := testpb.NewTestServiceClient(cc) + // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded { + t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) + } + if _, err := tc.FullDuplexCall(context.Background()); grpc.Code(err) != codes.DeadlineExceeded { + t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) + } + wg.Wait() + // Generate a service config update. + mc := grpc.MethodConfig{ + WaitForReady: false, + } + m := make(map[string]grpc.MethodConfig) + m["/grpc.testing.TestService/EmptyCall"] = mc + m["/grpc.testing.TestService/FullDuplexCall"] = mc + sc := grpc.ServiceConfig{ + Methods: m, + } + ch <- sc + // Loop until the new update becomes effective. + for { + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable { + continue + } + break + } + // The following RPCs are expected to become fail-fast. + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable { + t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable) + } + if _, err := tc.FullDuplexCall(context.Background()); grpc.Code(err) != codes.Unavailable { + t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.Unavailable) + } +} + func TestTap(t *testing.T) { defer leakCheck(t)() for _, e := range listTestEnv() {