diff --git a/clientconn.go b/clientconn.go index 6e018133..45e1fc83 100644 --- a/clientconn.go +++ b/clientconn.go @@ -252,7 +252,7 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) { waitC := make(chan error, 1) go func() { for _, a := range addrs { - if err := cc.newAddrConn(a, false); err != nil { + if err := cc.resetAddrConn(a, false, nil); err != nil { waitC <- err return } @@ -349,11 +349,12 @@ func (cc *ClientConn) lbWatcher() { } if !keep { del = append(del, c) + delete(cc.conns, c.addr) } } cc.mu.Unlock() for _, a := range add { - cc.newAddrConn(a, true) + cc.resetAddrConn(a, true, nil) } for _, c := range del { c.tearDown(errConnDrain) @@ -361,7 +362,10 @@ func (cc *ClientConn) lbWatcher() { } } -func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error { +// resetAddrConn creates an addrConn for addr and adds it to cc.conns. +// If there is an old addrConn for addr, it will be torn down, using errTearDown as the reason. +// If errTearDown is nil, errConnDrain will be used instead. +func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, errTearDown error) error { ac := &addrConn{ cc: cc, addr: addr, @@ -397,13 +401,27 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error { ac.cc.mu.Unlock() if stale != nil { // There is an addrConn alive on ac.addr already. This could be due to - // i) stale's Close is undergoing; - // ii) a buggy Balancer notifies duplicated Addresses. - stale.tearDown(errConnDrain) + // 1) stale's Close is undergoing; + // 2) a buggy Balancer notifies duplicated Addresses; + // 3) goaway was received, a new ac will replace the old ac. + // The old ac should be deleted from cc.conns, but the + // underlying transport should drain rather than close. + if errTearDown == nil { + // errTearDown is nil if resetAddrConn is called by + // 1) Dial + // 2) lbWatcher + // In both cases, the stale ac should drain, not close. + stale.tearDown(errConnDrain) + } else { + stale.tearDown(errTearDown) + } } // skipWait may overwrite the decision in ac.dopts.block. if ac.dopts.block && !skipWait { if err := ac.resetTransport(false); err != nil { + ac.cc.mu.Lock() + delete(ac.cc.conns, ac.addr) + ac.cc.mu.Unlock() ac.tearDown(err) return err } @@ -414,6 +432,9 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error { go func() { if err := ac.resetTransport(false); err != nil { grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err) + ac.cc.mu.Lock() + delete(ac.cc.conns, ac.addr) + ac.cc.mu.Unlock() ac.tearDown(err) return } @@ -641,11 +662,10 @@ func (ac *addrConn) transportMonitor() { // In both cases, a new ac is created. select { case <-t.Error(): - ac.tearDown(errNetworkIO) + ac.cc.resetAddrConn(ac.addr, true, errNetworkIO) default: - ac.tearDown(errConnDrain) + ac.cc.resetAddrConn(ac.addr, true, errConnDrain) } - ac.cc.newAddrConn(ac.addr, true) return case <-t.Error(): select { @@ -653,8 +673,7 @@ func (ac *addrConn) transportMonitor() { t.Close() return case <-t.GoAway(): - ac.tearDown(errNetworkIO) - ac.cc.newAddrConn(ac.addr, true) + ac.cc.resetAddrConn(ac.addr, true, errNetworkIO) return default: } @@ -715,16 +734,10 @@ func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTr // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in // some edge cases (e.g., the caller opens and closes many addrConn's in a // tight loop. +// tearDown doesn't remove ac from ac.cc.conns. func (ac *addrConn) tearDown(err error) { ac.mu.Lock() - defer func() { - ac.mu.Unlock() - ac.cc.mu.Lock() - if ac.cc.conns != nil { - delete(ac.cc.conns, ac.addr) - } - ac.cc.mu.Unlock() - }() + defer ac.mu.Unlock() if ac.down != nil { ac.down(downErrorf(false, false, "%v", err)) ac.down = nil