pickfirst: do not return initial subconn while connecting (#5825)
Fixes https://github.com/grpc/grpc-go/issues/5293
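Previously, pick_first published an IDLE picker that handed out the newly created SubConn right away, so RPCs could be routed to a connection that was still being established. The balancer now reports CONNECTING together with balancer.ErrNoSubConnAvailable, which makes the channel queue RPCs until the SubConn actually reports READY. The clientconn change adds verbosity-gated logging when connect() is ignored, and TestGoAwayThenClose is made deterministic by explicitly waiting for the READY and IDLE states and bounding every wait with the test context.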
@@ -788,10 +788,16 @@ func (cc *ClientConn) incrCallsFailed() {
 func (ac *addrConn) connect() error {
 	ac.mu.Lock()
 	if ac.state == connectivity.Shutdown {
+		if logger.V(2) {
+			logger.Infof("connect called on shutdown addrConn; ignoring.")
+		}
 		ac.mu.Unlock()
 		return errConnClosing
 	}
 	if ac.state != connectivity.Idle {
+		if logger.V(2) {
+			logger.Infof("connect called on addrConn in non-idle state (%v); ignoring.", ac.state)
+		}
 		ac.mu.Unlock()
 		return nil
 	}
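The logger.V(2) guard above is grpc-go's verbosity gating: the Infof call is skipped entirely unless the process opts into higher verbosity. A minimal sketch of the same pattern, assuming an arbitrary component name (the real code uses the grpc package's own logger):

package example

import "google.golang.org/grpc/grpclog"

// Component-scoped logger; "example" is an illustrative name, not the one
// grpc-go itself uses.
var logger = grpclog.Component("example")

func ignoredConnect(state string) {
	// V(2) is true only when GRPC_GO_LOG_VERBOSITY_LEVEL is 2 or higher,
	// so this log line costs nothing at the default verbosity.
	if logger.V(2) {
		logger.Infof("connect called on addrConn in non-idle state (%v); ignoring.", state)
	}
}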
@@ -102,8 +102,8 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
 	b.subConn = subConn
 	b.state = connectivity.Idle
 	b.cc.UpdateState(balancer.State{
-		ConnectivityState: connectivity.Idle,
-		Picker:            &picker{result: balancer.PickResult{SubConn: b.subConn}},
+		ConnectivityState: connectivity.Connecting,
+		Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
 	})
 	b.subConn.Connect()
 	return nil
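This hunk is the core of the fix: instead of a picker that immediately returns the just-created SubConn, the balancer publishes one that returns balancer.ErrNoSubConnAvailable, which tells the channel to queue the pick and retry once a fresh picker arrives. A rough sketch of the two behaviors, using illustrative types rather than grpc-go's unexported picker:

package example

import "google.golang.org/grpc/balancer"

// connectingPicker mirrors the new behavior: while the SubConn is still
// connecting, Pick reports ErrNoSubConnAvailable and the channel queues
// the RPC until the balancer publishes a fresh picker.
type connectingPicker struct{}

func (connectingPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}

// readyPicker mirrors what pick_first publishes once the SubConn reports
// READY: every RPC is sent on the single established connection.
type readyPicker struct{ sc balancer.SubConn }

func (p readyPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	return balancer.PickResult{SubConn: p.sc}, nil
}

Returning ErrNoSubConnAvailable (rather than a SubConn that is not yet READY) is what keeps RPCs from being attempted on a transport that does not exist yet.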
@@ -536,7 +536,7 @@ func testConcurrentServerStopAndGoAway(t *testing.T, e env) {
 // test ensures that the connection is re-created after GO_AWAY and not affected by the
 // subsequent (old) connection closure.
 func (s) TestGoAwayThenClose(t *testing.T) {
-	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
 
 	lis1, err := net.Listen("tcp", "localhost:0")
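defaultTestTimeout is a constant shared across grpc-go's end-to-end tests; replacing the hard-coded 20*time.Second keeps this test's deadline consistent with the rest of the suite. A sketch of the idiom, noting that the constant's actual value is defined elsewhere in the test package (10*time.Second here is only an assumption):

package example

import (
	"context"
	"time"
)

// Assumed value; the real constant is declared once in grpc-go's test package.
const defaultTestTimeout = 10 * time.Second

// newTestContext is a hypothetical helper showing the idiom: every test
// derives a bounded context so no wait can hang the suite indefinitely.
func newTestContext() (context.Context, context.CancelFunc) {
	return context.WithTimeout(context.Background(), defaultTestTimeout)
}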
@@ -587,12 +587,21 @@ func (s) TestGoAwayThenClose(t *testing.T) {
 
 	client := testpb.NewTestServiceClient(cc)
 
+	t.Log("Waiting for the ClientConn to enter READY state.")
+	state := cc.GetState()
+	for ; state != connectivity.Ready && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
+	}
+	if ctx.Err() != nil {
+		t.Fatalf("timed out waiting for READY channel state; last state = %v", state)
+	}
+
 	// We make a streaming RPC and do an one-message-round-trip to make sure
 	// it's created on connection 1.
 	//
 	// We use a long-lived RPC because it will cause GracefulStop to send
-	// GO_AWAY, but the connection doesn't get closed until the server stops and
+	// GO_AWAY, but the connection won't get closed until the server stops and
 	// the client receives the error.
+	t.Log("Creating first streaming RPC to server 1.")
 	stream, err := client.FullDuplexCall(ctx)
 	if err != nil {
 		t.Fatalf("FullDuplexCall(_) = _, %v; want _, nil", err)
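The added loop is the standard way to wait for a target connectivity state with the (experimental) ClientConn state API. The same logic pulled into a hypothetical helper for clarity (awaitState is not part of the test; cc.GetState and cc.WaitForStateChange are real APIs):

package example

import (
	"context"
	"testing"

	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
)

// awaitState blocks until cc reaches want or ctx expires. WaitForStateChange
// returns false once ctx is done, which is what terminates the loop on
// timeout; checking ctx.Err afterwards distinguishes success from timeout.
func awaitState(ctx context.Context, t *testing.T, cc *grpc.ClientConn, want connectivity.State) {
	t.Helper()
	state := cc.GetState()
	for ; state != want && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
	}
	if ctx.Err() != nil {
		t.Fatalf("timed out waiting for %v channel state; last state = %v", want, state)
	}
}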
@@ -603,37 +612,40 @@ func (s) TestGoAwayThenClose(t *testing.T) {
 
 	go s2.Serve(lis2)
 
-	// Send GO_AWAY to connection 1.
+	t.Log("Gracefully stopping server 1.")
 	go s1.GracefulStop()
 
-	// Wait for the ClientConn to enter IDLE state.
-	state := cc.GetState()
+	t.Log("Waiting for the ClientConn to enter IDLE state.")
 	for ; state != connectivity.Idle && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
 	}
-	if state != connectivity.Idle {
+	if ctx.Err() != nil {
 		t.Fatalf("timed out waiting for IDLE channel state; last state = %v", state)
 	}
 
-	// Initiate another RPC to create another connection.
+	t.Log("Performing another RPC to create a connection to server 2.")
 	if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
 		t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
 	}
 
-	// Assert that connection 2 has been established.
-	<-conn2Established.Done()
+	t.Log("Waiting for a connection to server 2.")
+	select {
+	case <-conn2Established.Done():
+	case <-ctx.Done():
+		t.Fatalf("timed out waiting for connection 2 to be established")
+	}
 
 	// Close the listener for server2 to prevent it from allowing new connections.
 	lis2.Close()
 
-	// Close connection 1.
+	t.Log("Hard closing connection 1.")
 	s1.Stop()
 
-	// Wait for client to close.
+	t.Log("Waiting for the first stream to error.")
 	if _, err = stream.Recv(); err == nil {
 		t.Fatal("expected the stream to die, but got a successful Recv")
 	}
 
-	// Do a bunch of RPCs, make sure it stays stable. These should go to connection 2.
+	t.Log("Ensuring connection 2 is stable.")
 	for i := 0; i < 10; i++ {
 		if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
 			t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
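conn2Established is created earlier in the test (outside this diff); it appears to be a grpcsync.Event-style object whose Done() channel is closed once server 2 accepts a connection. The new select bounds that wait with the test context instead of potentially blocking forever. The same pattern in isolation, with a plain channel standing in for the event:

package example

import (
	"context"
	"fmt"
)

// waitForEvent returns nil once established is closed, or an error if ctx
// expires first. A bare "<-established" would hang forever when the event
// never fires; the select converts that into a bounded, reportable failure.
func waitForEvent(ctx context.Context, established <-chan struct{}) error {
	select {
	case <-established:
		return nil
	case <-ctx.Done():
		return fmt.Errorf("timed out waiting for event: %w", ctx.Err())
	}
}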