Fix connectivity state transitions when dialing (#1596)

This commit is contained in:
Menghan Li
2017-10-23 14:06:33 -07:00
committed by GitHub
parent 0c2d9db1f9
commit b3ed81a60b
4 changed files with 44 additions and 50 deletions

View File

@ -111,13 +111,13 @@ func TestOneBackend(t *testing.T) {
// The first RPC should fail because there's no address. // The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel() defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded { if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
} }
r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}}) r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
// The second RPC should succeed. // The second RPC should succeed.
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil { if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
} }
} }
@ -143,7 +143,7 @@ func TestBackendsRoundRobin(t *testing.T) {
// The first RPC should fail because there's no address. // The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel() defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded { if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
} }
@ -158,7 +158,7 @@ func TestBackendsRoundRobin(t *testing.T) {
for si := 0; si < backendCount; si++ { for si := 0; si < backendCount; si++ {
var connected bool var connected bool
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil { if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
} }
if p.Addr.String() == test.addresses[si] { if p.Addr.String() == test.addresses[si] {
@ -173,7 +173,7 @@ func TestBackendsRoundRobin(t *testing.T) {
} }
for i := 0; i < 3*backendCount; i++ { for i := 0; i < 3*backendCount; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil { if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
} }
if p.Addr.String() != test.addresses[i%backendCount] { if p.Addr.String() != test.addresses[i%backendCount] {
@ -202,13 +202,13 @@ func TestAddressesRemoved(t *testing.T) {
// The first RPC should fail because there's no address. // The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel() defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded { if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
} }
r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}}) r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
// The second RPC should succeed. // The second RPC should succeed.
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil { if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
} }
@ -248,7 +248,7 @@ func TestCloseWithPendingRPC(t *testing.T) {
defer wg.Done() defer wg.Done()
// This RPC blocks until cc is closed. // This RPC blocks until cc is closed.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) == codes.DeadlineExceeded { if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); grpc.Code(err) == codes.DeadlineExceeded {
t.Errorf("RPC failed because of deadline after cc is closed; want error the client connection is closing") t.Errorf("RPC failed because of deadline after cc is closed; want error the client connection is closing")
} }
cancel() cancel()
@ -278,7 +278,7 @@ func TestNewAddressWhileBlocking(t *testing.T) {
// The first RPC should fail because there's no address. // The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel() defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded { if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
} }
@ -286,7 +286,7 @@ func TestNewAddressWhileBlocking(t *testing.T) {
// The second RPC should succeed. // The second RPC should succeed.
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err != nil { if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, nil", err) t.Fatalf("EmptyCall() = _, %v, want _, nil", err)
} }
@ -298,7 +298,7 @@ func TestNewAddressWhileBlocking(t *testing.T) {
go func() { go func() {
defer wg.Done() defer wg.Done()
// This RPC blocks until NewAddress is called. // This RPC blocks until NewAddress is called.
testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)) testc.EmptyCall(context.Background(), &testpb.Empty{})
}() }()
} }
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
@ -327,7 +327,7 @@ func TestOneServerDown(t *testing.T) {
// The first RPC should fail because there's no address. // The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel() defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded { if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
} }
@ -342,7 +342,7 @@ func TestOneServerDown(t *testing.T) {
for si := 0; si < backendCount; si++ { for si := 0; si < backendCount; si++ {
var connected bool var connected bool
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil { if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
} }
if p.Addr.String() == test.addresses[si] { if p.Addr.String() == test.addresses[si] {
@ -357,7 +357,7 @@ func TestOneServerDown(t *testing.T) {
} }
for i := 0; i < 3*backendCount; i++ { for i := 0; i < 3*backendCount; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil { if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
} }
if p.Addr.String() != test.addresses[i%backendCount] { if p.Addr.String() != test.addresses[i%backendCount] {
@ -371,7 +371,7 @@ func TestOneServerDown(t *testing.T) {
// Loop until see server[backendCount-1] twice without seeing server[backendCount]. // Loop until see server[backendCount-1] twice without seeing server[backendCount].
var targetSeen int var targetSeen int
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil { if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
} }
switch p.Addr.String() { switch p.Addr.String() {
@ -390,7 +390,7 @@ func TestOneServerDown(t *testing.T) {
t.Fatal("Failed to see server[backendCount-1] twice without seeing server[backendCount]") t.Fatal("Failed to see server[backendCount-1] twice without seeing server[backendCount]")
} }
for i := 0; i < 3*backendCount; i++ { for i := 0; i < 3*backendCount; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil { if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
} }
if p.Addr.String() != test.addresses[i%backendCount] { if p.Addr.String() != test.addresses[i%backendCount] {
@ -420,7 +420,7 @@ func TestAllServersDown(t *testing.T) {
// The first RPC should fail because there's no address. // The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel() defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded { if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
} }
@ -435,7 +435,7 @@ func TestAllServersDown(t *testing.T) {
for si := 0; si < backendCount; si++ { for si := 0; si < backendCount; si++ {
var connected bool var connected bool
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil { if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
} }
if p.Addr.String() == test.addresses[si] { if p.Addr.String() == test.addresses[si] {
@ -450,7 +450,7 @@ func TestAllServersDown(t *testing.T) {
} }
for i := 0; i < 3*backendCount; i++ { for i := 0; i < 3*backendCount; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil { if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
} }
if p.Addr.String() != test.addresses[i%backendCount] { if p.Addr.String() != test.addresses[i%backendCount] {

View File

@ -252,7 +252,7 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
acbw.ac = ac acbw.ac = ac
ac.acbw = acbw ac.acbw = acbw
if acState != connectivity.Idle { if acState != connectivity.Idle {
ac.connect(false) ac.connect()
} }
} }
} }
@ -260,7 +260,7 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
func (acbw *acBalancerWrapper) Connect() { func (acbw *acBalancerWrapper) Connect() {
acbw.mu.Lock() acbw.mu.Lock()
defer acbw.mu.Unlock() defer acbw.mu.Unlock()
acbw.ac.connect(false) acbw.ac.connect()
} }
func (acbw *acBalancerWrapper) getAddrConn() *addrConn { func (acbw *acBalancerWrapper) getAddrConn() *addrConn {

View File

@ -709,7 +709,7 @@ func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
// It does nothing if the ac is not IDLE. // It does nothing if the ac is not IDLE.
// TODO(bar) Move this to the addrConn section. // TODO(bar) Move this to the addrConn section.
// This was part of resetAddrConn, keep it here to make the diff look clean. // This was part of resetAddrConn, keep it here to make the diff look clean.
func (ac *addrConn) connect(block bool) error { func (ac *addrConn) connect() error {
ac.mu.Lock() ac.mu.Lock()
if ac.state == connectivity.Shutdown { if ac.state == connectivity.Shutdown {
ac.mu.Unlock() ac.mu.Unlock()
@ -723,32 +723,18 @@ func (ac *addrConn) connect(block bool) error {
ac.cc.handleSubConnStateChange(ac.acbw, ac.state) ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.mu.Unlock() ac.mu.Unlock()
if block { // Start a goroutine connecting to the server asynchronously.
go func() {
if err := ac.resetTransport(); err != nil { if err := ac.resetTransport(); err != nil {
grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
if err != errConnClosing { if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err) ac.tearDown(err)
} }
if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() { return
return e.Origin()
}
return err
} }
// Start to monitor the error status of transport. ac.transportMonitor()
go ac.transportMonitor() }()
} else {
// Start a goroutine connecting to the server asynchronously.
go func() {
if err := ac.resetTransport(); err != nil {
grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
}
return
}
ac.transportMonitor()
}()
}
return nil return nil
} }
@ -909,6 +895,7 @@ func (ac *addrConn) errorf(format string, a ...interface{}) {
// resetTransport recreates a transport to the address for ac. The old // resetTransport recreates a transport to the address for ac. The old
// transport will close itself on error or when the clientconn is closed. // transport will close itself on error or when the clientconn is closed.
//
// TODO(bar) make sure all state transitions are valid. // TODO(bar) make sure all state transitions are valid.
func (ac *addrConn) resetTransport() error { func (ac *addrConn) resetTransport() error {
ac.mu.Lock() ac.mu.Lock()
@ -916,8 +903,6 @@ func (ac *addrConn) resetTransport() error {
ac.mu.Unlock() ac.mu.Unlock()
return errConnClosing return errConnClosing
} }
ac.state = connectivity.TransientFailure
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
if ac.ready != nil { if ac.ready != nil {
close(ac.ready) close(ac.ready)
ac.ready = nil ac.ready = nil
@ -941,8 +926,10 @@ func (ac *addrConn) resetTransport() error {
return errConnClosing return errConnClosing
} }
ac.printf("connecting") ac.printf("connecting")
ac.state = connectivity.Connecting if ac.state != connectivity.Connecting {
ac.cc.handleSubConnStateChange(ac.acbw, ac.state) ac.state = connectivity.Connecting
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
}
// copy ac.addrs in case of race // copy ac.addrs in case of race
addrsIter := make([]resolver.Address, len(ac.addrs)) addrsIter := make([]resolver.Address, len(ac.addrs))
copy(addrsIter, ac.addrs) copy(addrsIter, ac.addrs)
@ -1037,6 +1024,13 @@ func (ac *addrConn) transportMonitor() {
ac.adjustParams(t.GetGoAwayReason()) ac.adjustParams(t.GetGoAwayReason())
default: default:
} }
ac.mu.Lock()
// Set connectivity state to TransientFailure before calling
// resetTransport. Transition READY->CONNECTING is not valid.
ac.state = connectivity.TransientFailure
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.curAddr = resolver.Address{}
ac.mu.Unlock()
if err := ac.resetTransport(); err != nil { if err := ac.resetTransport(); err != nil {
ac.mu.Lock() ac.mu.Lock()
ac.printf("transport exiting: %v", err) ac.printf("transport exiting: %v", err)
@ -1108,7 +1102,7 @@ func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
ac.mu.Unlock() ac.mu.Unlock()
// Trigger idle ac to connect. // Trigger idle ac to connect.
if idle { if idle {
ac.connect(false) ac.connect()
} }
return nil, false return nil, false
} }

View File

@ -128,7 +128,7 @@ func TestNewAddressWhileBlockingPickfirst(t *testing.T) {
go func() { go func() {
defer wg.Done() defer wg.Done()
// This RPC blocks until NewAddress is called. // This RPC blocks until NewAddress is called.
Invoke(context.Background(), "/foo/bar", &req, &reply, cc, FailFast(false)) Invoke(context.Background(), "/foo/bar", &req, &reply, cc)
}() }()
} }
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
@ -165,7 +165,7 @@ func TestCloseWithPendingRPCPickfirst(t *testing.T) {
go func() { go func() {
defer wg.Done() defer wg.Done()
// This RPC blocks until NewAddress is called. // This RPC blocks until NewAddress is called.
Invoke(context.Background(), "/foo/bar", &req, &reply, cc, FailFast(false)) Invoke(context.Background(), "/foo/bar", &req, &reply, cc)
}() }()
} }
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)