From 1e1a47f0f28f70686674869a7dee0f1096d86919 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 28 Nov 2017 13:16:53 -0800 Subject: [PATCH] Re-resolve target when one connection becomes TransientFailure (#1679) This allows ClientConn to get more up-to-date addresses from resolver. ClientConn compares new addresses with the cached ones. So if resolver returns the same set of addresses, ClientConn will not notify balancer about it. Also moved the initialization of resolver and balancer to avoid a race. Balancer will only be started when ClientConn gets resolved addresses from resolver. --- clientconn.go | 48 +++++++++++++++++++++++++++++----------- resolver/resolver.go | 6 +++-- resolver_conn_wrapper.go | 9 +++++++- 3 files changed, 47 insertions(+), 16 deletions(-) diff --git a/clientconn.go b/clientconn.go index 87b3b578..346dcb85 100644 --- a/clientconn.go +++ b/clientconn.go @@ -494,17 +494,19 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * Dialer: cc.dopts.copts.Dialer, } - if cc.dopts.balancerBuilder != nil { - cc.customBalancer = true - // Build should not take long time. So it's ok to not have a goroutine for it. - cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts) - } - // Build the resolver. cc.resolverWrapper, err = newCCResolverWrapper(cc) if err != nil { return nil, fmt.Errorf("failed to build resolver: %v", err) } + // Start the resolver wrapper goroutine after resolverWrapper is created. + // + // If the goroutine is started before resolverWrapper is ready, the + // following may happen: The goroutine sends updates to cc. cc forwards + // those to balancer. Balancer creates new addrConn. addrConn fails to + // connect, and calls resolveNow(). resolveNow() tries to use the non-ready + // resolverWrapper. + cc.resolverWrapper.start() // A blocking dial blocks until the clientConn is ready. 
if cc.dopts.block { @@ -577,7 +579,6 @@ type ClientConn struct { dopts dialOptions csMgr *connectivityStateManager - customBalancer bool // If this is true, switching balancer will be disabled. balancerBuildOpts balancer.BuildOptions resolverWrapper *ccResolverWrapper blockingpicker *pickerWrapper @@ -638,19 +639,28 @@ func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) { cc.mu.Lock() defer cc.mu.Unlock() if cc.conns == nil { + // cc was closed. + return + } + + if reflect.DeepEqual(cc.curAddresses, addrs) { return } // TODO(bar switching) when grpclb is submitted, check address type and start grpclb. - if !cc.customBalancer && cc.balancerWrapper == nil { - // No customBalancer was specified by DialOption, and this is the first - // time handling resolved addresses, create a pickfirst balancer. - builder := newPickfirstBuilder() + if cc.balancerWrapper == nil { + // First time handling resolved addresses. Build a balancer using either + // the builder specified by dial option, or pickfirst. + builder := cc.dopts.balancerBuilder + if builder == nil { + // No customBalancer was specified by DialOption, and this is the first + // time handling resolved addresses, create a pickfirst balancer. + builder = newPickfirstBuilder() + } cc.curBalancerName = builder.Name() cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts) } - // TODO(bar switching) compare addresses, if there's no update, don't notify balancer. 
cc.curAddresses = addrs cc.balancerWrapper.handleResolvedAddrs(addrs, nil) } @@ -664,7 +674,7 @@ func (cc *ClientConn) switchBalancer(name string) { } grpclog.Infof("ClientConn switching balancer to %q", name) - if cc.customBalancer { + if cc.dopts.balancerBuilder != nil { grpclog.Infoln("ignoring service config balancer configuration: WithBalancer DialOption used instead") return } @@ -843,6 +853,16 @@ func (cc *ClientConn) handleServiceConfig(js string) error { return nil } +func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) { + cc.mu.Lock() + r := cc.resolverWrapper + cc.mu.Unlock() + if r == nil { + return + } + go r.resolveNow(o) +} + // Close tears down the ClientConn and all underlying connections. func (cc *ClientConn) Close() error { cc.cancel() @@ -1029,6 +1049,7 @@ func (ac *addrConn) resetTransport() error { ac.mu.Lock() ac.state = connectivity.TransientFailure ac.cc.handleSubConnStateChange(ac.acbw, ac.state) + ac.cc.resolveNow(resolver.ResolveNowOption{}) if ac.ready != nil { close(ac.ready) ac.ready = nil @@ -1073,6 +1094,7 @@ func (ac *addrConn) transportMonitor() { // resetTransport. Transition READY->CONNECTING is not valid. ac.state = connectivity.TransientFailure ac.cc.handleSubConnStateChange(ac.acbw, ac.state) + ac.cc.resolveNow(resolver.ResolveNowOption{}) ac.curAddr = resolver.Address{} ac.mu.Unlock() if err := ac.resetTransport(); err != nil { diff --git a/resolver/resolver.go b/resolver/resolver.go index e27db82f..e6c8faf9 100644 --- a/resolver/resolver.go +++ b/resolver/resolver.go @@ -130,8 +130,10 @@ type ResolveNowOption struct{} // Resolver watches for the updates on the specified target. // Updates include address updates and service config updates. type Resolver interface { - // ResolveNow will be called by gRPC to try to resolve the target name again. - // It's just a hint, resolver can ignore this if it's not necessary. + // ResolveNow will be called by gRPC to try to resolve the target name + // again. 
It's just a hint, resolver can ignore this if it's not necessary. + // + // It could be called multiple times concurrently. ResolveNow(ResolveNowOption) // Close closes the resolver. Close() diff --git a/resolver_conn_wrapper.go b/resolver_conn_wrapper.go index e285ad9b..1a1591e8 100644 --- a/resolver_conn_wrapper.go +++ b/resolver_conn_wrapper.go @@ -87,10 +87,13 @@ func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) { if err != nil { return nil, err } - go ccr.watcher() return ccr, nil } +func (ccr *ccResolverWrapper) start() { + go ccr.watcher() +} + // watcher processes address updates and service config updates sequencially. // Otherwise, we need to resolve possible races between address and service // config (e.g. they specify different balancer types). @@ -125,6 +128,10 @@ func (ccr *ccResolverWrapper) watcher() { } } +func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOption) { + ccr.resolver.ResolveNow(o) +} + func (ccr *ccResolverWrapper) close() { ccr.resolver.Close() close(ccr.done)