diff --git a/grpclb/grpclb.go b/grpclb/grpclb.go index 5864cce0..931a4d84 100644 --- a/grpclb/grpclb.go +++ b/grpclb/grpclb.go @@ -70,6 +70,7 @@ type addrInfo struct { type balancer struct { r naming.Resolver mu sync.Mutex + w naming.Watcher addrCh chan []grpc.Address rbs []remoteBalancerInfo addrs []addrInfo @@ -86,6 +87,9 @@ func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan remoteBalancerInfo var bAddr remoteBalancerInfo b.mu.Lock() defer b.mu.Unlock() + if b.done { + return grpc.ErrClientConnClosing + } if len(b.rbs) > 0 { bAddr = b.rbs[0] } @@ -120,10 +124,6 @@ func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan remoteBalancerInfo grpclog.Println("Unknown update.Op ", update.Op) } } - if b.done { - close(ch) - return grpc.ErrClientConnClosing - } // TODO: Fall back to the basic round-robin load balancing if the resulting address is // not a load balancer. if len(b.rbs) > 0 { @@ -160,7 +160,6 @@ func (b *balancer) processServerList(l *lbpb.ServerList) { b.mu.Lock() defer b.mu.Unlock() if b.done { - close(b.addrCh) return } if len(sl) > 0 { @@ -235,18 +234,20 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error { return grpc.ErrClientConnClosing } b.addrCh = make(chan []grpc.Address) - b.mu.Unlock() w, err := b.r.Resolve(target) if err != nil { + b.mu.Unlock() return err } + b.w = w + b.mu.Unlock() balancerAddrCh := make(chan remoteBalancerInfo, 1) // Spawn a goroutine to monitor the name resolution of remote load balancer. go func() { for { if err := b.watchAddrUpdates(w, balancerAddrCh); err != nil { - w.Close() grpclog.Printf("grpc: the naming watcher stops working due to %v.\n", err) + close(balancerAddrCh) return } } @@ -432,5 +433,11 @@ func (b *balancer) Close() error { if b.waitCh != nil { close(b.waitCh) } + if b.addrCh != nil { + close(b.addrCh) + } + if b.w != nil { + b.w.Close() + } return nil }