From b3ed81a60ba74672ff49e0ba510b9a0c92e6a0e1 Mon Sep 17 00:00:00 2001
From: Menghan Li
Date: Mon, 23 Oct 2017 14:06:33 -0700
Subject: [PATCH] Fix connectivity state transitions when dialing (#1596)

---
 balancer/roundrobin/roundrobin_test.go | 38 ++++++++++----------
 balancer_conn_wrappers.go              |  4 +--
 clientconn.go                          | 48 +++++++++++---------------
 pickfirst_test.go                      |  4 +--
 4 files changed, 44 insertions(+), 50 deletions(-)

diff --git a/balancer/roundrobin/roundrobin_test.go b/balancer/roundrobin/roundrobin_test.go
index 0335e486..857dfdea 100644
--- a/balancer/roundrobin/roundrobin_test.go
+++ b/balancer/roundrobin/roundrobin_test.go
@@ -111,13 +111,13 @@ func TestOneBackend(t *testing.T) {
 	// The first RPC should fail because there's no address.
 	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
 	defer cancel()
-	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
+	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
 		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
 	}
 
 	r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
 	// The second RPC should succeed.
-	if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+	if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
 		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
 	}
 }
@@ -143,7 +143,7 @@ func TestBackendsRoundRobin(t *testing.T) {
 	// The first RPC should fail because there's no address.
 	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
 	defer cancel()
-	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
+	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
 		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
 	}
 
@@ -158,7 +158,7 @@ func TestBackendsRoundRobin(t *testing.T) {
 	for si := 0; si < backendCount; si++ {
 		var connected bool
 		for i := 0; i < 1000; i++ {
-			if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
+			if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
 				t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
 			}
 			if p.Addr.String() == test.addresses[si] {
@@ -173,7 +173,7 @@ func TestBackendsRoundRobin(t *testing.T) {
 	}
 
 	for i := 0; i < 3*backendCount; i++ {
-		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
+		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
 			t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
 		}
 		if p.Addr.String() != test.addresses[i%backendCount] {
@@ -202,13 +202,13 @@ func TestAddressesRemoved(t *testing.T) {
 	// The first RPC should fail because there's no address.
 	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
 	defer cancel()
-	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
+	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
 		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
 	}
 
 	r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
 	// The second RPC should succeed.
-	if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+	if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
 		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
 	}
 
@@ -248,7 +248,7 @@ func TestCloseWithPendingRPC(t *testing.T) {
 			defer wg.Done()
 			// This RPC blocks until cc is closed.
 			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-			if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) == codes.DeadlineExceeded {
+			if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); grpc.Code(err) == codes.DeadlineExceeded {
 				t.Errorf("RPC failed because of deadline after cc is closed; want error the client connection is closing")
 			}
 			cancel()
@@ -278,7 +278,7 @@ func TestNewAddressWhileBlocking(t *testing.T) {
 	// The first RPC should fail because there's no address.
 	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
 	defer cancel()
-	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
+	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
 		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
 	}
 
@@ -286,7 +286,7 @@ func TestNewAddressWhileBlocking(t *testing.T) {
 	// The second RPC should succeed.
 	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
 	defer cancel()
-	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
 		t.Fatalf("EmptyCall() = _, %v, want _, nil", err)
 	}
 
@@ -298,7 +298,7 @@ func TestNewAddressWhileBlocking(t *testing.T) {
 		go func() {
 			defer wg.Done()
 			// This RPC blocks until NewAddress is called.
-			testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false))
+			testc.EmptyCall(context.Background(), &testpb.Empty{})
 		}()
 	}
 	time.Sleep(50 * time.Millisecond)
@@ -327,7 +327,7 @@ func TestOneServerDown(t *testing.T) {
 	// The first RPC should fail because there's no address.
 	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
 	defer cancel()
-	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
+	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
 		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
 	}
 
@@ -342,7 +342,7 @@ func TestOneServerDown(t *testing.T) {
 	for si := 0; si < backendCount; si++ {
 		var connected bool
 		for i := 0; i < 1000; i++ {
-			if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
+			if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
 				t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
 			}
 			if p.Addr.String() == test.addresses[si] {
@@ -357,7 +357,7 @@ func TestOneServerDown(t *testing.T) {
 	}
 
 	for i := 0; i < 3*backendCount; i++ {
-		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
+		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
 			t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
 		}
 		if p.Addr.String() != test.addresses[i%backendCount] {
@@ -371,7 +371,7 @@ func TestOneServerDown(t *testing.T) {
 	// Loop until see server[backendCount-1] twice without seeing server[backendCount].
 	var targetSeen int
 	for i := 0; i < 1000; i++ {
-		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
+		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
 			t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
 		}
 		switch p.Addr.String() {
@@ -390,7 +390,7 @@ func TestOneServerDown(t *testing.T) {
 		t.Fatal("Failed to see server[backendCount-1] twice without seeing server[backendCount]")
 	}
 	for i := 0; i < 3*backendCount; i++ {
-		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
+		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
 			t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
 		}
 		if p.Addr.String() != test.addresses[i%backendCount] {
@@ -420,7 +420,7 @@ func TestAllServersDown(t *testing.T) {
 	// The first RPC should fail because there's no address.
 	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
 	defer cancel()
-	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
+	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
 		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
 	}
 
@@ -435,7 +435,7 @@ func TestAllServersDown(t *testing.T) {
 	for si := 0; si < backendCount; si++ {
 		var connected bool
 		for i := 0; i < 1000; i++ {
-			if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
+			if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
 				t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
 			}
 			if p.Addr.String() == test.addresses[si] {
@@ -450,7 +450,7 @@ func TestAllServersDown(t *testing.T) {
 	}
 
 	for i := 0; i < 3*backendCount; i++ {
-		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
+		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
 			t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
 		}
 		if p.Addr.String() != test.addresses[i%backendCount] {
diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go
index 8ec74920..7ab1e78a 100644
--- a/balancer_conn_wrappers.go
+++ b/balancer_conn_wrappers.go
@@ -252,7 +252,7 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
 		acbw.ac = ac
 		ac.acbw = acbw
 		if acState != connectivity.Idle {
-			ac.connect(false)
+			ac.connect()
 		}
 	}
 }
@@ -260,7 +260,7 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
 func (acbw *acBalancerWrapper) Connect() {
 	acbw.mu.Lock()
 	defer acbw.mu.Unlock()
-	acbw.ac.connect(false)
+	acbw.ac.connect()
 }
 
 func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
diff --git a/clientconn.go b/clientconn.go
index 61ac1a0d..5f5aac41 100644
--- a/clientconn.go
+++ b/clientconn.go
@@ -709,7 +709,7 @@ func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
 // It does nothing if the ac is not IDLE.
 // TODO(bar) Move this to the addrConn section.
 // This was part of resetAddrConn, keep it here to make the diff look clean.
-func (ac *addrConn) connect(block bool) error {
+func (ac *addrConn) connect() error {
 	ac.mu.Lock()
 	if ac.state == connectivity.Shutdown {
 		ac.mu.Unlock()
@@ -723,32 +723,18 @@
 	ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
 	ac.mu.Unlock()
 
-	if block {
+	// Start a goroutine connecting to the server asynchronously.
+	go func() {
 		if err := ac.resetTransport(); err != nil {
+			grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
 			if err != errConnClosing {
+				// Keep this ac in cc.conns, to get the reason it's torn down.
 				ac.tearDown(err)
 			}
-			if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
-				return e.Origin()
-			}
-			return err
+			return
 		}
-		// Start to monitor the error status of transport.
-		go ac.transportMonitor()
-	} else {
-		// Start a goroutine connecting to the server asynchronously.
-		go func() {
-			if err := ac.resetTransport(); err != nil {
-				grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
-				if err != errConnClosing {
-					// Keep this ac in cc.conns, to get the reason it's torn down.
-					ac.tearDown(err)
-				}
-				return
-			}
-			ac.transportMonitor()
-		}()
-	}
+		ac.transportMonitor()
+	}()
 	return nil
 }
 
@@ -909,6 +895,7 @@ func (ac *addrConn) errorf(format string, a ...interface{}) {
 
 // resetTransport recreates a transport to the address for ac. The old
 // transport will close itself on error or when the clientconn is closed.
+//
 // TODO(bar) make sure all state transitions are valid.
 func (ac *addrConn) resetTransport() error {
 	ac.mu.Lock()
@@ -916,8 +903,6 @@ func (ac *addrConn) resetTransport() error {
 		ac.mu.Unlock()
 		return errConnClosing
 	}
-	ac.state = connectivity.TransientFailure
-	ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
 	if ac.ready != nil {
 		close(ac.ready)
 		ac.ready = nil
@@ -941,8 +926,10 @@ func (ac *addrConn) resetTransport() error {
 			return errConnClosing
 		}
 		ac.printf("connecting")
-		ac.state = connectivity.Connecting
-		ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+		if ac.state != connectivity.Connecting {
+			ac.state = connectivity.Connecting
+			ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+		}
 		// copy ac.addrs in case of race
 		addrsIter := make([]resolver.Address, len(ac.addrs))
 		copy(addrsIter, ac.addrs)
@@ -1037,6 +1024,13 @@ func (ac *addrConn) transportMonitor() {
 			ac.adjustParams(t.GetGoAwayReason())
 		default:
 		}
+		ac.mu.Lock()
+		// Set connectivity state to TransientFailure before calling
+		// resetTransport. Transition READY->CONNECTING is not valid.
+		ac.state = connectivity.TransientFailure
+		ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+		ac.curAddr = resolver.Address{}
+		ac.mu.Unlock()
 		if err := ac.resetTransport(); err != nil {
 			ac.mu.Lock()
 			ac.printf("transport exiting: %v", err)
@@ -1108,7 +1102,7 @@ func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
 	ac.mu.Unlock()
 	// Trigger idle ac to connect.
 	if idle {
-		ac.connect(false)
+		ac.connect()
 	}
 	return nil, false
 }
diff --git a/pickfirst_test.go b/pickfirst_test.go
index f2887543..e58b3422 100644
--- a/pickfirst_test.go
+++ b/pickfirst_test.go
@@ -128,7 +128,7 @@ func TestNewAddressWhileBlockingPickfirst(t *testing.T) {
 		go func() {
 			defer wg.Done()
 			// This RPC blocks until NewAddress is called.
-			Invoke(context.Background(), "/foo/bar", &req, &reply, cc, FailFast(false))
+			Invoke(context.Background(), "/foo/bar", &req, &reply, cc)
 		}()
 	}
 	time.Sleep(50 * time.Millisecond)
@@ -165,7 +165,7 @@ func TestCloseWithPendingRPCPickfirst(t *testing.T) {
 		go func() {
 			defer wg.Done()
 			// This RPC blocks until NewAddress is called.
-			Invoke(context.Background(), "/foo/bar", &req, &reply, cc, FailFast(false))
+			Invoke(context.Background(), "/foo/bar", &req, &reply, cc)
 		}()
 	}
 	time.Sleep(50 * time.Millisecond)
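
Reviewer note (illustrative, not part of the commit): the patch makes addrConn.connect always non-blocking, stops resetTransport from unconditionally reporting TRANSIENT_FAILURE on entry, and has transportMonitor move a broken READY subconn to TRANSIENT_FAILURE itself before re-dialing, since READY -> CONNECTING is not a valid transition. The sketch below is a simplified model of that flow: only the google.golang.org/grpc/connectivity states are real gRPC API; the conn type, its dial callback, and connect signature are made up for illustration and are not grpc-go internals.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"

	"google.golang.org/grpc/connectivity"
)

// conn is a toy stand-in for an address connection, used only to
// illustrate the state transitions the patch enforces.
type conn struct {
	mu    sync.Mutex
	state connectivity.State
}

func (c *conn) setState(s connectivity.State) {
	c.mu.Lock()
	c.state = s
	c.mu.Unlock()
	fmt.Println("state:", s)
}

// connect mirrors the shape of the patched flow: it acts only on an IDLE
// conn, reports CONNECTING, and does all dialing work on a background
// goroutine instead of optionally blocking the caller.
func (c *conn) connect(dial func() error) error {
	c.mu.Lock()
	if c.state == connectivity.Shutdown {
		c.mu.Unlock()
		return errors.New("conn is closing")
	}
	if c.state != connectivity.Idle {
		c.mu.Unlock()
		return nil
	}
	c.state = connectivity.Connecting
	c.mu.Unlock()
	fmt.Println("state:", connectivity.Connecting)

	go func() {
		if err := dial(); err != nil {
			// Dial failed: report TRANSIENT_FAILURE and stop (the real code
			// tears the connection down or retries with backoff).
			c.setState(connectivity.TransientFailure)
			return
		}
		c.setState(connectivity.Ready)
		// When a READY transport later breaks, the monitor goes
		// READY -> TRANSIENT_FAILURE before dialing again, never
		// READY -> CONNECTING directly.
	}()
	return nil
}

func main() {
	c := &conn{state: connectivity.Idle}
	_ = c.connect(func() error { return nil }) // pretend the dial succeeds
	time.Sleep(100 * time.Millisecond)         // give the goroutine time to run
}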