Re-resolve target when one connection becomes TransientFailure (#1679)

This allows ClientConn to get more up-to-date addresses from resolver.
ClientConn compares new addresses with the cached ones. So if resolver returns the same set of addresses, ClientConn will not notify balancer about it.

Also moved the initialization of resolver and balancer to avoid race. Balancer will only be started when ClientConn gets resolved addresses from balancer.
This commit is contained in:
Menghan Li
2017-11-28 13:16:53 -08:00
committed by GitHub
parent 2ef021f78d
commit 1e1a47f0f2
3 changed files with 47 additions and 16 deletions

View File

@ -494,17 +494,19 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
Dialer: cc.dopts.copts.Dialer, Dialer: cc.dopts.copts.Dialer,
} }
if cc.dopts.balancerBuilder != nil {
cc.customBalancer = true
// Build should not take long time. So it's ok to not have a goroutine for it.
cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
}
// Build the resolver. // Build the resolver.
cc.resolverWrapper, err = newCCResolverWrapper(cc) cc.resolverWrapper, err = newCCResolverWrapper(cc)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to build resolver: %v", err) return nil, fmt.Errorf("failed to build resolver: %v", err)
} }
// Start the resolver wrapper goroutine after resolverWrapper is created.
//
// If the goroutine is started before resolverWrapper is ready, the
// following may happen: The goroutine sends updates to cc. cc forwards
// those to balancer. Balancer creates new addrConn. addrConn fails to
// connect, and calls resolveNow(). resolveNow() tries to use the non-ready
// resolverWrapper.
cc.resolverWrapper.start()
// A blocking dial blocks until the clientConn is ready. // A blocking dial blocks until the clientConn is ready.
if cc.dopts.block { if cc.dopts.block {
@ -577,7 +579,6 @@ type ClientConn struct {
dopts dialOptions dopts dialOptions
csMgr *connectivityStateManager csMgr *connectivityStateManager
customBalancer bool // If this is true, switching balancer will be disabled.
balancerBuildOpts balancer.BuildOptions balancerBuildOpts balancer.BuildOptions
resolverWrapper *ccResolverWrapper resolverWrapper *ccResolverWrapper
blockingpicker *pickerWrapper blockingpicker *pickerWrapper
@ -638,19 +639,28 @@ func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
cc.mu.Lock() cc.mu.Lock()
defer cc.mu.Unlock() defer cc.mu.Unlock()
if cc.conns == nil { if cc.conns == nil {
// cc was closed.
return
}
if reflect.DeepEqual(cc.curAddresses, addrs) {
return return
} }
// TODO(bar switching) when grpclb is submitted, check address type and start grpclb. // TODO(bar switching) when grpclb is submitted, check address type and start grpclb.
if !cc.customBalancer && cc.balancerWrapper == nil { if cc.balancerWrapper == nil {
// No customBalancer was specified by DialOption, and this is the first // First time handling resolved addresses. Build a balancer use either
// time handling resolved addresses, create a pickfirst balancer. // the builder specified by dial option, or pickfirst.
builder := newPickfirstBuilder() builder := cc.dopts.balancerBuilder
if builder == nil {
// No customBalancer was specified by DialOption, and this is the first
// time handling resolved addresses, create a pickfirst balancer.
builder = newPickfirstBuilder()
}
cc.curBalancerName = builder.Name() cc.curBalancerName = builder.Name()
cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts) cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
} }
// TODO(bar switching) compare addresses, if there's no update, don't notify balancer.
cc.curAddresses = addrs cc.curAddresses = addrs
cc.balancerWrapper.handleResolvedAddrs(addrs, nil) cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
} }
@ -664,7 +674,7 @@ func (cc *ClientConn) switchBalancer(name string) {
} }
grpclog.Infof("ClientConn switching balancer to %q", name) grpclog.Infof("ClientConn switching balancer to %q", name)
if cc.customBalancer { if cc.dopts.balancerBuilder != nil {
grpclog.Infoln("ignoring service config balancer configuration: WithBalancer DialOption used instead") grpclog.Infoln("ignoring service config balancer configuration: WithBalancer DialOption used instead")
return return
} }
@ -843,6 +853,16 @@ func (cc *ClientConn) handleServiceConfig(js string) error {
return nil return nil
} }
func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
cc.mu.Lock()
r := cc.resolverWrapper
cc.mu.Unlock()
if r == nil {
return
}
go r.resolveNow(o)
}
// Close tears down the ClientConn and all underlying connections. // Close tears down the ClientConn and all underlying connections.
func (cc *ClientConn) Close() error { func (cc *ClientConn) Close() error {
cc.cancel() cc.cancel()
@ -1029,6 +1049,7 @@ func (ac *addrConn) resetTransport() error {
ac.mu.Lock() ac.mu.Lock()
ac.state = connectivity.TransientFailure ac.state = connectivity.TransientFailure
ac.cc.handleSubConnStateChange(ac.acbw, ac.state) ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.cc.resolveNow(resolver.ResolveNowOption{})
if ac.ready != nil { if ac.ready != nil {
close(ac.ready) close(ac.ready)
ac.ready = nil ac.ready = nil
@ -1073,6 +1094,7 @@ func (ac *addrConn) transportMonitor() {
// resetTransport. Transition READY->CONNECTING is not valid. // resetTransport. Transition READY->CONNECTING is not valid.
ac.state = connectivity.TransientFailure ac.state = connectivity.TransientFailure
ac.cc.handleSubConnStateChange(ac.acbw, ac.state) ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.cc.resolveNow(resolver.ResolveNowOption{})
ac.curAddr = resolver.Address{} ac.curAddr = resolver.Address{}
ac.mu.Unlock() ac.mu.Unlock()
if err := ac.resetTransport(); err != nil { if err := ac.resetTransport(); err != nil {

View File

@ -130,8 +130,10 @@ type ResolveNowOption struct{}
// Resolver watches for the updates on the specified target. // Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates. // Updates include address updates and service config updates.
type Resolver interface { type Resolver interface {
// ResolveNow will be called by gRPC to try to resolve the target name again. // ResolveNow will be called by gRPC to try to resolve the target name
// It's just a hint, resolver can ignore this if it's not necessary. // again. It's just a hint, resolver can ignore this if it's not necessary.
//
// It could be called multiple times concurrently.
ResolveNow(ResolveNowOption) ResolveNow(ResolveNowOption)
// Close closes the resolver. // Close closes the resolver.
Close() Close()

View File

@ -87,10 +87,13 @@ func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
go ccr.watcher()
return ccr, nil return ccr, nil
} }
func (ccr *ccResolverWrapper) start() {
go ccr.watcher()
}
// watcher processes address updates and service config updates sequencially. // watcher processes address updates and service config updates sequencially.
// Otherwise, we need to resolve possible races between address and service // Otherwise, we need to resolve possible races between address and service
// config (e.g. they specify different balancer types). // config (e.g. they specify different balancer types).
@ -125,6 +128,10 @@ func (ccr *ccResolverWrapper) watcher() {
} }
} }
func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOption) {
ccr.resolver.ResolveNow(o)
}
func (ccr *ccResolverWrapper) close() { func (ccr *ccResolverWrapper) close() {
ccr.resolver.Close() ccr.resolver.Close()
close(ccr.done) close(ccr.done)