diff --git a/clientconn.go b/clientconn.go
index 4f2c77ff..12ecf3b4 100644
--- a/clientconn.go
+++ b/clientconn.go
@@ -643,11 +643,9 @@ func (cc *ClientConn) incrCallsFailed() {
 	atomic.AddInt64(&cc.czData.callsFailed, 1)
 }
 
-// connect starts to creating transport and also starts the transport monitor
-// goroutine for this ac.
+// connect starts creating a transport.
 // It does nothing if the ac is not IDLE.
 // TODO(bar) Move this to the addrConn section.
-// This was part of resetAddrConn, keep it here to make the diff look clean.
 func (ac *addrConn) connect() error {
 	ac.mu.Lock()
 	if ac.state == connectivity.Shutdown {
@@ -963,6 +961,23 @@ func (ac *addrConn) resetTransport(resolveNow bool) {
 		ac.mu.Unlock()
 	}
 
+	ac.mu.Lock()
+	if ac.state == connectivity.Shutdown {
+		ac.mu.Unlock()
+		return
+	}
+
+	// If the connection is READY, a failure must have occurred.
+	// Otherwise, we'll consider this a transient failure when:
+	// We've exhausted all addresses
+	// We're in CONNECTING
+	// And it's not the very first addr to try. TODO(deklerk): find a better way to do this than checking ac.successfulHandshake
+	if ac.state == connectivity.Ready || (ac.addrIdx == len(ac.addrs)-1 && ac.state == connectivity.Connecting && !ac.successfulHandshake) {
+		ac.updateConnectivityState(connectivity.TransientFailure)
+		ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+	}
+	ac.mu.Unlock()
+
 	if err := ac.nextAddr(); err != nil {
 		return
 	}
@@ -1109,6 +1124,8 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts
 			// We got the preface - huzzah! things are good.
 		case <-onCloseCalled:
 			// The transport has already closed - noop.
+			close(allowedToReset)
+			return nil
 		}
 	} else {
 		go func() {
@@ -1213,8 +1230,6 @@ func (ac *addrConn) nextAddr() error {
 		ac.mu.Unlock()
 		return errConnClosing
 	}
-	ac.updateConnectivityState(connectivity.TransientFailure)
-	ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
 	ac.cc.resolveNow(resolver.ResolveNowOption{})
 	if ac.ready != nil {
 		close(ac.ready)
diff --git a/clientconn_test.go b/clientconn_test.go
index b8c11934..720fc35c 100644
--- a/clientconn_test.go
+++ b/clientconn_test.go
@@ -134,11 +134,11 @@ func TestDialWithMultipleBackendsNotSendingServerPreface(t *testing.T) {
 
 func TestDialWaitsForServerSettings(t *testing.T) {
 	defer leakcheck.Check(t)
-	server, err := net.Listen("tcp", "localhost:0")
+	lis, err := net.Listen("tcp", "localhost:0")
 	if err != nil {
 		t.Fatalf("Error while listening. Err: %v", err)
 	}
-	defer server.Close()
+	defer lis.Close()
 	done := make(chan struct{})
 	sent := make(chan struct{})
 	dialDone := make(chan struct{})
@@ -146,7 +146,7 @@ func TestDialWaitsForServerSettings(t *testing.T) {
 		defer func() {
 			close(done)
 		}()
-		conn, err := server.Accept()
+		conn, err := lis.Accept()
 		if err != nil {
 			t.Errorf("Error while accepting. Err: %v", err)
 			return
@@ -165,7 +165,7 @@ func TestDialWaitsForServerSettings(t *testing.T) {
 	}()
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
-	client, err := DialContext(ctx, server.Addr().String(), WithInsecure(), WithWaitForHandshake(), WithBlock())
+	client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithWaitForHandshake(), WithBlock())
 	close(dialDone)
 	if err != nil {
 		t.Fatalf("Error while dialing. Err: %v", err)
 	}
@@ -182,7 +182,7 @@ func TestDialWaitsForServerSettings(t *testing.T) {
 
 func TestDialWaitsForServerSettingsAndFails(t *testing.T) {
 	defer leakcheck.Check(t)
-	server, err := net.Listen("tcp", "localhost:0")
+	lis, err := net.Listen("tcp", "localhost:0")
 	if err != nil {
 		t.Fatalf("Error while listening. Err: %v", err)
 	}
@@ -193,7 +193,7 @@ func TestDialWaitsForServerSettingsAndFails(t *testing.T) {
 			close(done)
 		}()
 		for {
-			conn, err := server.Accept()
+			conn, err := lis.Accept()
 			if err != nil {
 				break
 			}
@@ -204,8 +204,8 @@ func TestDialWaitsForServerSettingsAndFails(t *testing.T) {
 	getMinConnectTimeout = func() time.Duration { return time.Second / 2 }
 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 	defer cancel()
-	client, err := DialContext(ctx, server.Addr().String(), WithInsecure(), WithWaitForHandshake(), WithBlock())
-	server.Close()
+	client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithWaitForHandshake(), WithBlock())
+	lis.Close()
 	if err == nil {
 		client.Close()
 		t.Fatalf("Unexpected success (err=nil) while dialing")
@@ -305,17 +305,17 @@ func TestCloseConnectionWhenServerPrefaceNotReceived(t *testing.T) {
 
 func TestBackoffWhenNoServerPrefaceReceived(t *testing.T) {
 	defer leakcheck.Check(t)
-	server, err := net.Listen("tcp", "localhost:0")
+	lis, err := net.Listen("tcp", "localhost:0")
 	if err != nil {
 		t.Fatalf("Error while listening. Err: %v", err)
 	}
-	defer server.Close()
+	defer lis.Close()
 	done := make(chan struct{})
 	go func() { // Launch the server.
 		defer func() {
 			close(done)
 		}()
-		conn, err := server.Accept() // Accept the connection only to close it immediately.
+		conn, err := lis.Accept() // Accept the connection only to close it immediately.
 		if err != nil {
 			t.Errorf("Error while accepting. Err: %v", err)
 			return
@@ -325,7 +325,7 @@ func TestBackoffWhenNoServerPrefaceReceived(t *testing.T) {
 		var prevDuration time.Duration
 		// Make sure the retry attempts are backed off properly.
 		for i := 0; i < 3; i++ {
-			conn, err := server.Accept()
+			conn, err := lis.Accept()
 			if err != nil {
 				t.Errorf("Error while accepting. Err: %v", err)
 				return
@@ -341,7 +341,7 @@ func TestBackoffWhenNoServerPrefaceReceived(t *testing.T) {
 			prevAt = meow
 		}
 	}()
-	client, err := Dial(server.Addr().String(), WithInsecure())
+	client, err := Dial(lis.Addr().String(), WithInsecure())
 	if err != nil {
 		t.Fatalf("Error while dialing. Err: %v", err)
 	}
@@ -820,3 +820,150 @@ func TestBackoffCancel(t *testing.T) {
 	cc.Close()
 	// Should not leak. May need -count 5000 to exercise.
 }
+
+// CONNECTING -> READY -> TRANSIENT FAILURE -> CONNECTING -> TRANSIENT FAILURE -> CONNECTING -> TRANSIENT FAILURE
+//
+// Note: csmgr (which drives GetState and WaitForStateChange) lags behind reality a bit because state updates go
+// through the balancer. So, we are somewhat overaggressive in using WaitForStateChange in this test in order to force
+// it to keep up.
+//
+// TODO(deklerk) Rewrite this test with a custom balancer that records state transitions. This will mean we can get
+// rid of all this synchronization and realtime state-checking, and instead just check the state transitions at
+// once after all the activity has happened.
+func TestDialCloseStateTransition(t *testing.T) {
+	defer leakcheck.Check(t)
+
+	lis, err := net.Listen("tcp", "localhost:0")
+	if err != nil {
+		t.Fatalf("Error while listening. Err: %v", err)
+	}
+	defer lis.Close()
+	testFinished := make(chan struct{})
+	backoffCaseReady := make(chan struct{})
+	killFirstConnection := make(chan struct{})
+	killSecondConnection := make(chan struct{})
+
+	// Launch the server.
+	go func() {
+		// Establish a successful connection so that we enter READY. We need to get
+		// to READY so that we can get a client back for us to introspect later (as
+		// opposed to just CONNECTING).
+		conn, err := lis.Accept()
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		defer conn.Close()
+
+		framer := http2.NewFramer(conn, conn)
+		if err := framer.WriteSettings(http2.Setting{}); err != nil {
+			t.Errorf("Error while writing settings frame. %v", err)
+			return
+		}
+
+		select {
+		case <-testFinished:
+			return
+		case <-killFirstConnection:
+		}
+
+		// Close the conn to cause onShutdown, causing us to enter TRANSIENT FAILURE. Note that we are not in
+		// WaitForHandshake at this point because the preface was sent successfully.
+		conn.Close()
+
+		// We have to re-accept and re-close the connection because the first re-connect after a successful handshake
+		// has no backoff. So, we need to get to the second re-connect after the successful handshake for our infinite
+		// backoff to happen.
+		conn, err = lis.Accept()
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		err = conn.Close()
+		if err != nil {
+			t.Error(err)
+		}
+
+		// The client should now be headed towards backoff.
+		close(backoffCaseReady)
+
+		// Re-connect (without server preface).
+		conn, err = lis.Accept()
+		if err != nil {
+			t.Error(err)
+			return
+		}
+
+		// Close the conn to cause onShutdown, causing us to enter TRANSIENT FAILURE. Note that we are in
+		// WaitForHandshake at this point because the preface has not been sent yet.
+		select {
+		case <-testFinished:
+			return
+		case <-killSecondConnection:
+		}
+		conn.Close()
+	}()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithWaitForHandshake(), WithBlock(), withBackoff(backoffForever{}))
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	// It should start in READY because the server sends the server preface.
+	if got, want := client.GetState(), connectivity.Ready; got != want {
+		t.Fatalf("expected addrconn state to be %v, was %v", want, got)
+	}
+
+	// Once the connection is killed, it should go:
+	// READY -> TRANSIENT FAILURE (no backoff) -> CONNECTING -> TRANSIENT FAILURE (infinite backoff)
+	// The first TRANSIENT FAILURE is triggered by closing a channel. Then, we wait for the server to let us know
+	// when the client has progressed past the first failure (which does not get backoff, because handshake was
+	// successful).
+	close(killFirstConnection)
+	if !client.WaitForStateChange(ctx, connectivity.Ready) {
+		t.Fatal("expected WaitForStateChange to change state, but it timed out")
+	}
+	if !client.WaitForStateChange(ctx, connectivity.TransientFailure) {
+		t.Fatal("expected WaitForStateChange to change state, but it timed out")
+	}
+	<-backoffCaseReady
+	if !client.WaitForStateChange(ctx, connectivity.Connecting) {
+		t.Fatal("expected WaitForStateChange to change state, but it timed out")
+	}
+	if got, want := client.GetState(), connectivity.TransientFailure; got != want {
+		t.Fatalf("expected addrconn state to be %v, was %v", want, got)
+	}
+
+	// Stop backing off, allowing a re-connect. Note: this races with the client actually getting to the backoff,
+	// so continually reset backoff until we notice the state change.
+	for i := 0; i < 100; i++ {
+		client.ResetConnectBackoff()
+		cctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
+		defer cancel()
+		if client.WaitForStateChange(cctx, connectivity.TransientFailure) {
+			break
+		}
+	}
+	if got, want := client.GetState(), connectivity.Connecting; got != want {
+		t.Fatalf("expected addrconn state to be %v, was %v", want, got)
+	}
+
+	select {
+	case <-testFinished:
+	case killSecondConnection <- struct{}{}:
+	}
+
+	// The connection should be killed shortly by the above goroutine, and here we watch for the first new connectivity
+	// state and make sure it's TRANSIENT FAILURE. This is racy, but fairly accurate - expect it to catch failures
+	// 90% of the time or so.
+	if !client.WaitForStateChange(ctx, connectivity.Connecting) {
+		t.Fatal("expected WaitForStateChange to change state, but it timed out")
+	}
+	if got, want := client.GetState(), connectivity.TransientFailure; got != want {
+		t.Fatalf("expected addrconn state to be %v, was %v", want, got)
+	}
+}
diff --git a/test/channelz_test.go b/test/channelz_test.go
index 44e12d94..c204e35f 100644
--- a/test/channelz_test.go
+++ b/test/channelz_test.go
@@ -1342,8 +1342,8 @@ func TestCZSubChannelTraceCreationDeletion(t *testing.T) {
 		if len(scm.Trace.Events) == 0 {
 			return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
 		}
-		if scm.Trace.Events[len(scm.Trace.Events)-1].Desc != "Subchannel Deleted" {
-			return false, fmt.Errorf("the first trace event should be \"Subchannel Deleted\", not %q", scm.Trace.Events[0].Desc)
+		if got, want := scm.Trace.Events[len(scm.Trace.Events)-1].Desc, "Subchannel Deleted"; got != want {
+			return false, fmt.Errorf("the last trace event should be %q, not %q", want, got)
 		}
 		return true, nil