Merge branch 'master' of https://github.com/grpc/grpc-go into race-fix

This commit is contained in:
iamqizhao
2016-07-29 18:18:19 -07:00

View File

@ -252,7 +252,7 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
waitC := make(chan error, 1)
go func() {
for _, a := range addrs {
if err := cc.newAddrConn(a, false); err != nil {
if err := cc.resetAddrConn(a, false, nil); err != nil {
waitC <- err
return
}
@ -349,11 +349,12 @@ func (cc *ClientConn) lbWatcher() {
}
if !keep {
del = append(del, c)
delete(cc.conns, c.addr)
}
}
cc.mu.Unlock()
for _, a := range add {
cc.newAddrConn(a, true)
cc.resetAddrConn(a, true, nil)
}
for _, c := range del {
c.tearDown(errConnDrain)
@ -361,7 +362,10 @@ func (cc *ClientConn) lbWatcher() {
}
}
func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
// If there is an old addrConn for addr, it will be torn down, using errTearDown as the reason.
// If errTearDown is nil, errConnDrain will be used instead.
func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, errTearDown error) error {
ac := &addrConn{
cc: cc,
addr: addr,
@ -397,13 +401,27 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
ac.cc.mu.Unlock()
if stale != nil {
// There is an addrConn alive on ac.addr already. This could be due to
// i) stale's Close is undergoing;
// ii) a buggy Balancer notifies duplicated Addresses.
stale.tearDown(errConnDrain)
// 1) stale's Close is undergoing;
// 2) a buggy Balancer notifies duplicated Addresses;
// 3) goaway was received, a new ac will replace the old ac.
// The old ac should be deleted from cc.conns, but the
// underlying transport should drain rather than close.
if errTearDown == nil {
// errTearDown is nil if resetAddrConn is called by
// 1) Dial
// 2) lbWatcher
// In both cases, the stale ac should drain, not close.
stale.tearDown(errConnDrain)
} else {
stale.tearDown(errTearDown)
}
}
// skipWait may overwrite the decision in ac.dopts.block.
if ac.dopts.block && !skipWait {
if err := ac.resetTransport(false); err != nil {
ac.cc.mu.Lock()
delete(ac.cc.conns, ac.addr)
ac.cc.mu.Unlock()
ac.tearDown(err)
return err
}
@ -414,6 +432,9 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
go func() {
if err := ac.resetTransport(false); err != nil {
grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
ac.cc.mu.Lock()
delete(ac.cc.conns, ac.addr)
ac.cc.mu.Unlock()
ac.tearDown(err)
return
}
@ -641,11 +662,10 @@ func (ac *addrConn) transportMonitor() {
// In both cases, a new ac is created.
select {
case <-t.Error():
ac.tearDown(errNetworkIO)
ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
default:
ac.tearDown(errConnDrain)
ac.cc.resetAddrConn(ac.addr, true, errConnDrain)
}
ac.cc.newAddrConn(ac.addr, true)
return
case <-t.Error():
select {
@ -653,8 +673,7 @@ func (ac *addrConn) transportMonitor() {
t.Close()
return
case <-t.GoAway():
ac.tearDown(errNetworkIO)
ac.cc.newAddrConn(ac.addr, true)
ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
return
default:
}
@ -715,16 +734,10 @@ func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTr
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many addrConn's in a
// tight loop.
// tearDown doesn't remove ac from ac.cc.conns.
func (ac *addrConn) tearDown(err error) {
ac.mu.Lock()
defer func() {
ac.mu.Unlock()
ac.cc.mu.Lock()
if ac.cc.conns != nil {
delete(ac.cc.conns, ac.addr)
}
ac.cc.mu.Unlock()
}()
defer ac.mu.Unlock()
if ac.down != nil {
ac.down(downErrorf(false, false, "%v", err))
ac.down = nil