fix a race condition causing http2Client.controller leaking
This commit is contained in:
@ -363,9 +363,9 @@ func (cc *ClientConn) lbWatcher() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
|
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
|
||||||
// If there is an old addrConn for addr, it will be torn down, using errTearDown as the reason.
|
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
|
||||||
// If errTearDown is nil, errConnDrain will be used instead.
|
// If tearDownErr is nil, errConnDrain will be used instead.
|
||||||
func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, errTearDown error) error {
|
func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error {
|
||||||
ac := &addrConn{
|
ac := &addrConn{
|
||||||
cc: cc,
|
cc: cc,
|
||||||
addr: addr,
|
addr: addr,
|
||||||
@ -401,19 +401,18 @@ func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, errTearDown err
|
|||||||
ac.cc.mu.Unlock()
|
ac.cc.mu.Unlock()
|
||||||
if stale != nil {
|
if stale != nil {
|
||||||
// There is an addrConn alive on ac.addr already. This could be due to
|
// There is an addrConn alive on ac.addr already. This could be due to
|
||||||
// 1) stale's Close is undergoing;
|
// 1) a buggy Balancer notifies duplicated Addresses;
|
||||||
// 2) a buggy Balancer notifies duplicated Addresses;
|
// 2) goaway was received, a new ac will replace the old ac.
|
||||||
// 3) goaway was received, a new ac will replace the old ac.
|
|
||||||
// The old ac should be deleted from cc.conns, but the
|
// The old ac should be deleted from cc.conns, but the
|
||||||
// underlying transport should drain rather than close.
|
// underlying transport should drain rather than close.
|
||||||
if errTearDown == nil {
|
if tearDownErr == nil {
|
||||||
// errTearDown is nil if resetAddrConn is called by
|
// tearDownErr is nil if resetAddrConn is called by
|
||||||
// 1) Dial
|
// 1) Dial
|
||||||
// 2) lbWatcher
|
// 2) lbWatcher
|
||||||
// In both cases, the stale ac should drain, not close.
|
// In both cases, the stale ac should drain, not close.
|
||||||
stale.tearDown(errConnDrain)
|
stale.tearDown(errConnDrain)
|
||||||
} else {
|
} else {
|
||||||
stale.tearDown(errTearDown)
|
stale.tearDown(tearDownErr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// skipWait may overwrite the decision in ac.dopts.block.
|
// skipWait may overwrite the decision in ac.dopts.block.
|
||||||
|
@ -613,9 +613,9 @@ func testServerGoAway(t *testing.T, e env) {
|
|||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
// A new RPC should fail with Unavailable error.
|
// A new RPC should fail.
|
||||||
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err == nil || grpc.Code(err) != codes.Unavailable {
|
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable && grpc.Code(err) != codes.Internal {
|
||||||
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, error code: %d", err, codes.Unavailable)
|
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s or %s", err, codes.Unavailable, codes.Internal)
|
||||||
}
|
}
|
||||||
<-ch
|
<-ch
|
||||||
awaitNewConnLogOutput()
|
awaitNewConnLogOutput()
|
||||||
|
@ -496,7 +496,14 @@ func (t *http2Client) Close() (err error) {
|
|||||||
|
|
||||||
func (t *http2Client) GracefulClose() error {
|
func (t *http2Client) GracefulClose() error {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
if t.state == closing || t.state == unreachable {
|
switch t.state {
|
||||||
|
case unreachable:
|
||||||
|
// The server may close the connection concurrently. t is not available for
|
||||||
|
// any streams. Close it now.
|
||||||
|
t.mu.Unlock()
|
||||||
|
t.Close()
|
||||||
|
return nil
|
||||||
|
case closing:
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user