diff --git a/clientconn.go b/clientconn.go index 87b3b578..346dcb85 100644 --- a/clientconn.go +++ b/clientconn.go @@ -494,17 +494,19 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * Dialer: cc.dopts.copts.Dialer, } - if cc.dopts.balancerBuilder != nil { - cc.customBalancer = true - // Build should not take long time. So it's ok to not have a goroutine for it. - cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts) - } - // Build the resolver. cc.resolverWrapper, err = newCCResolverWrapper(cc) if err != nil { return nil, fmt.Errorf("failed to build resolver: %v", err) } + // Start the resolver wrapper goroutine after resolverWrapper is created. + // + // If the goroutine is started before resolverWrapper is ready, the + // following may happen: The goroutine sends updates to cc. cc forwards + // those to balancer. Balancer creates new addrConn. addrConn fails to + // connect, and calls resolveNow(). resolveNow() tries to use the non-ready + // resolverWrapper. + cc.resolverWrapper.start() // A blocking dial blocks until the clientConn is ready. if cc.dopts.block { @@ -577,7 +579,6 @@ type ClientConn struct { dopts dialOptions csMgr *connectivityStateManager - customBalancer bool // If this is true, switching balancer will be disabled. balancerBuildOpts balancer.BuildOptions resolverWrapper *ccResolverWrapper blockingpicker *pickerWrapper @@ -638,19 +639,28 @@ func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) { cc.mu.Lock() defer cc.mu.Unlock() if cc.conns == nil { + // cc was closed. + return + } + + if reflect.DeepEqual(cc.curAddresses, addrs) { return } // TODO(bar switching) when grpclb is submitted, check address type and start grpclb. - if !cc.customBalancer && cc.balancerWrapper == nil { - // No customBalancer was specified by DialOption, and this is the first - // time handling resolved addresses, create a pickfirst balancer. - builder := newPickfirstBuilder() + if cc.balancerWrapper == nil { + // First time handling resolved addresses. Build a balancer use either + // the builder specified by dial option, or pickfirst. + builder := cc.dopts.balancerBuilder + if builder == nil { + // No customBalancer was specified by DialOption, and this is the first + // time handling resolved addresses, create a pickfirst balancer. + builder = newPickfirstBuilder() + } cc.curBalancerName = builder.Name() cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts) } - // TODO(bar switching) compare addresses, if there's no update, don't notify balancer. cc.curAddresses = addrs cc.balancerWrapper.handleResolvedAddrs(addrs, nil) } @@ -664,7 +674,7 @@ func (cc *ClientConn) switchBalancer(name string) { } grpclog.Infof("ClientConn switching balancer to %q", name) - if cc.customBalancer { + if cc.dopts.balancerBuilder != nil { grpclog.Infoln("ignoring service config balancer configuration: WithBalancer DialOption used instead") return } @@ -843,6 +853,16 @@ func (cc *ClientConn) handleServiceConfig(js string) error { return nil } +func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) { + cc.mu.Lock() + r := cc.resolverWrapper + cc.mu.Unlock() + if r == nil { + return + } + go r.resolveNow(o) +} + // Close tears down the ClientConn and all underlying connections. func (cc *ClientConn) Close() error { cc.cancel() @@ -1029,6 +1049,7 @@ func (ac *addrConn) resetTransport() error { ac.mu.Lock() ac.state = connectivity.TransientFailure ac.cc.handleSubConnStateChange(ac.acbw, ac.state) + ac.cc.resolveNow(resolver.ResolveNowOption{}) if ac.ready != nil { close(ac.ready) ac.ready = nil @@ -1073,6 +1094,7 @@ func (ac *addrConn) transportMonitor() { // resetTransport. Transition READY->CONNECTING is not valid. ac.state = connectivity.TransientFailure ac.cc.handleSubConnStateChange(ac.acbw, ac.state) + ac.cc.resolveNow(resolver.ResolveNowOption{}) ac.curAddr = resolver.Address{} ac.mu.Unlock() if err := ac.resetTransport(); err != nil { diff --git a/resolver/resolver.go b/resolver/resolver.go index e27db82f..e6c8faf9 100644 --- a/resolver/resolver.go +++ b/resolver/resolver.go @@ -130,8 +130,10 @@ type ResolveNowOption struct{} // Resolver watches for the updates on the specified target. // Updates include address updates and service config updates. type Resolver interface { - // ResolveNow will be called by gRPC to try to resolve the target name again. - // It's just a hint, resolver can ignore this if it's not necessary. + // ResolveNow will be called by gRPC to try to resolve the target name + // again. It's just a hint, resolver can ignore this if it's not necessary. + // + // It could be called multiple times concurrently. ResolveNow(ResolveNowOption) // Close closes the resolver. Close() diff --git a/resolver_conn_wrapper.go b/resolver_conn_wrapper.go index e285ad9b..1a1591e8 100644 --- a/resolver_conn_wrapper.go +++ b/resolver_conn_wrapper.go @@ -87,10 +87,13 @@ func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) { if err != nil { return nil, err } - go ccr.watcher() return ccr, nil } +func (ccr *ccResolverWrapper) start() { + go ccr.watcher() +} + // watcher processes address updates and service config updates sequencially. // Otherwise, we need to resolve possible races between address and service // config (e.g. they specify different balancer types). @@ -125,6 +128,10 @@ func (ccr *ccResolverWrapper) watcher() { } } +func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOption) { + ccr.resolver.ResolveNow(o) +} + func (ccr *ccResolverWrapper) close() { ccr.resolver.Close() close(ccr.done)