fix some other issues

This commit is contained in:
iamqizhao
2016-09-20 15:45:17 -07:00
parent ac49e6fc55
commit 766825a8b4

View File

@ -70,6 +70,7 @@ type addrInfo struct {
type balancer struct { type balancer struct {
r naming.Resolver r naming.Resolver
mu sync.Mutex mu sync.Mutex
w naming.Watcher
addrCh chan []grpc.Address addrCh chan []grpc.Address
rbs []remoteBalancerInfo rbs []remoteBalancerInfo
addrs []addrInfo addrs []addrInfo
@ -86,6 +87,9 @@ func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan remoteBalancerInfo
var bAddr remoteBalancerInfo var bAddr remoteBalancerInfo
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
if b.done {
return grpc.ErrClientConnClosing
}
if len(b.rbs) > 0 { if len(b.rbs) > 0 {
bAddr = b.rbs[0] bAddr = b.rbs[0]
} }
@ -120,10 +124,6 @@ func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan remoteBalancerInfo
grpclog.Println("Unknown update.Op ", update.Op) grpclog.Println("Unknown update.Op ", update.Op)
} }
} }
if b.done {
close(ch)
return grpc.ErrClientConnClosing
}
// TODO: Fall back to the basic round-robin load balancing if the resulting address is // TODO: Fall back to the basic round-robin load balancing if the resulting address is
// not a load balancer. // not a load balancer.
if len(b.rbs) > 0 { if len(b.rbs) > 0 {
@ -160,7 +160,6 @@ func (b *balancer) processServerList(l *lbpb.ServerList) {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
if b.done { if b.done {
close(b.addrCh)
return return
} }
if len(sl) > 0 { if len(sl) > 0 {
@ -235,18 +234,20 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error {
return grpc.ErrClientConnClosing return grpc.ErrClientConnClosing
} }
b.addrCh = make(chan []grpc.Address) b.addrCh = make(chan []grpc.Address)
b.mu.Unlock()
w, err := b.r.Resolve(target) w, err := b.r.Resolve(target)
if err != nil { if err != nil {
b.mu.Unlock()
return err return err
} }
b.w = w
b.mu.Unlock()
balancerAddrCh := make(chan remoteBalancerInfo, 1) balancerAddrCh := make(chan remoteBalancerInfo, 1)
// Spawn a goroutine to monitor the name resolution of remote load balancer. // Spawn a goroutine to monitor the name resolution of remote load balancer.
go func() { go func() {
for { for {
if err := b.watchAddrUpdates(w, balancerAddrCh); err != nil { if err := b.watchAddrUpdates(w, balancerAddrCh); err != nil {
w.Close()
grpclog.Printf("grpc: the naming watcher stops working due to %v.\n", err) grpclog.Printf("grpc: the naming watcher stops working due to %v.\n", err)
close(balancerAddrCh)
return return
} }
} }
@ -432,5 +433,11 @@ func (b *balancer) Close() error {
if b.waitCh != nil { if b.waitCh != nil {
close(b.waitCh) close(b.waitCh)
} }
if b.addrCh != nil {
close(b.addrCh)
}
if b.w != nil {
b.w.Close()
}
return nil return nil
} }