From c91396d4e1406768fa8b43bb5aa7c2e8e4eca79e Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Wed, 30 Nov 2022 08:57:17 -0800 Subject: [PATCH] pickfirst: do not return initial subconn while connecting (#5825) Fixes https://github.com/grpc/grpc-go/issues/5293 --- clientconn.go | 6 ++++++ pickfirst.go | 4 ++-- test/goaway_test.go | 36 ++++++++++++++++++++++++------------ 3 files changed, 32 insertions(+), 14 deletions(-) diff --git a/clientconn.go b/clientconn.go index 422639c7..78c81a10 100644 --- a/clientconn.go +++ b/clientconn.go @@ -788,10 +788,16 @@ func (cc *ClientConn) incrCallsFailed() { func (ac *addrConn) connect() error { ac.mu.Lock() if ac.state == connectivity.Shutdown { + if logger.V(2) { + logger.Infof("connect called on shutdown addrConn; ignoring.") + } ac.mu.Unlock() return errConnClosing } if ac.state != connectivity.Idle { + if logger.V(2) { + logger.Infof("connect called on addrConn in non-idle state (%v); ignoring.", ac.state) + } ac.mu.Unlock() return nil } diff --git a/pickfirst.go b/pickfirst.go index fb7a99e0..b3a55481 100644 --- a/pickfirst.go +++ b/pickfirst.go @@ -102,8 +102,8 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState b.subConn = subConn b.state = connectivity.Idle b.cc.UpdateState(balancer.State{ - ConnectivityState: connectivity.Idle, - Picker: &picker{result: balancer.PickResult{SubConn: b.subConn}}, + ConnectivityState: connectivity.Connecting, + Picker: &picker{err: balancer.ErrNoSubConnAvailable}, }) b.subConn.Connect() return nil diff --git a/test/goaway_test.go b/test/goaway_test.go index 92551dbf..7f44937d 100644 --- a/test/goaway_test.go +++ b/test/goaway_test.go @@ -536,7 +536,7 @@ func testConcurrentServerStopAndGoAway(t *testing.T, e env) { // test ensures that the connection is re-created after GO_AWAY and not affected by the // subsequent (old) connection closure. func (s) TestGoAwayThenClose(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() lis1, err := net.Listen("tcp", "localhost:0") @@ -587,12 +587,21 @@ func (s) TestGoAwayThenClose(t *testing.T) { client := testpb.NewTestServiceClient(cc) + t.Log("Waiting for the ClientConn to enter READY state.") + state := cc.GetState() + for ; state != connectivity.Ready && cc.WaitForStateChange(ctx, state); state = cc.GetState() { + } + if ctx.Err() != nil { + t.Fatalf("timed out waiting for READY channel state; last state = %v", state) + } + // We make a streaming RPC and do an one-message-round-trip to make sure // it's created on connection 1. // // We use a long-lived RPC because it will cause GracefulStop to send - // GO_AWAY, but the connection doesn't get closed until the server stops and + // GO_AWAY, but the connection won't get closed until the server stops and // the client receives the error. + t.Log("Creating first streaming RPC to server 1.") stream, err := client.FullDuplexCall(ctx) if err != nil { t.Fatalf("FullDuplexCall(_) = _, %v; want _, nil", err) @@ -603,37 +612,40 @@ func (s) TestGoAwayThenClose(t *testing.T) { go s2.Serve(lis2) - // Send GO_AWAY to connection 1. + t.Log("Gracefully stopping server 1.") go s1.GracefulStop() - // Wait for the ClientConn to enter IDLE state. - state := cc.GetState() + t.Log("Waiting for the ClientConn to enter IDLE state.") for ; state != connectivity.Idle && cc.WaitForStateChange(ctx, state); state = cc.GetState() { } - if state != connectivity.Idle { + if ctx.Err() != nil { t.Fatalf("timed out waiting for IDLE channel state; last state = %v", state) } - // Initiate another RPC to create another connection. + t.Log("Performing another RPC to create a connection to server 2.") if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err) } - // Assert that connection 2 has been established. - <-conn2Established.Done() + t.Log("Waiting for a connection to server 2.") + select { + case <-conn2Established.Done(): + case <-ctx.Done(): + t.Fatalf("timed out waiting for connection 2 to be established") + } // Close the listener for server2 to prevent it from allowing new connections. lis2.Close() - // Close connection 1. + t.Log("Hard closing connection 1.") s1.Stop() - // Wait for client to close. + t.Log("Waiting for the first stream to error.") if _, err = stream.Recv(); err == nil { t.Fatal("expected the stream to die, but got a successful Recv") } - // Do a bunch of RPCs, make sure it stays stable. These should go to connection 2. + t.Log("Ensuring connection 2 is stable.") for i := 0; i < 10; i++ { if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)