From ff2aa05958775030998dbe2f9bccbe2af324adf4 Mon Sep 17 00:00:00 2001 From: Jean de Klerk Date: Fri, 19 Oct 2018 14:11:21 -0700 Subject: [PATCH] internal: fix GO_AWAY deadlock (#2391) internal: fix GO_AWAY deadlock A deadlock can occur when a GO_AWAY is followed by a connection closure. This happens because onClose needlessly closes the current ac.transport: if a GO_AWAY already occured, and the transport was already reset, then the later closure (of the original address) sets ac.transport - which is now healthy - to nil. The manifestation of this problem is that picker_wrapper spins forever trying to use a READY connection whose ac.transport is nil. --- clientconn.go | 3 -- test/end2end_test.go | 109 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+), 3 deletions(-) diff --git a/clientconn.go b/clientconn.go index e74f8e40..d0404313 100644 --- a/clientconn.go +++ b/clientconn.go @@ -1066,9 +1066,6 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts case <-skipReset: // The outer resetTransport loop will handle reconnection. return case <-allowedToReset: // We're in the clear to reset. - ac.mu.Lock() - ac.transport = nil - ac.mu.Unlock() oneReset.Do(func() { ac.resetTransport(false) }) } } diff --git a/test/end2end_test.go b/test/end2end_test.go index cbbeca3c..30d8a8c8 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -57,6 +57,7 @@ import ( healthgrpc "google.golang.org/grpc/health/grpc_health_v1" healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/internal/channelz" + "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/leakcheck" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/keepalive" @@ -6994,3 +6995,111 @@ func testLargeTimeout(t *testing.T, e env) { } } } + +// Proxies typically send GO_AWAY followed by connection closure a minute or so later. This +// test ensures that the connection is re-created after GO_AWAY and not affected by the +// subsequent (old) connection closure. +func TestGoAwayThenClose(t *testing.T) { + defer leakcheck.Check(t) + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + lis1, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Error while listening. Err: %v", err) + } + s1 := grpc.NewServer() + defer s1.Stop() + ts1 := &funcServer{ + unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil + }, + fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { + // Wait forever. + _, err := stream.Recv() + if err == nil { + t.Error("expected to never receive any message") + } + return err + }, + } + testpb.RegisterTestServiceServer(s1, ts1) + go s1.Serve(lis1) + + conn2Established := grpcsync.NewEvent() + lis2, err := listenWithNotifyingListener("tcp", "localhost:0", conn2Established) + if err != nil { + t.Fatalf("Error while listening. Err: %v", err) + } + s2 := grpc.NewServer() + defer s2.Stop() + conn2Ready := grpcsync.NewEvent() + ts2 := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + conn2Ready.Fire() + return &testpb.SimpleResponse{}, nil + }} + testpb.RegisterTestServiceServer(s2, ts2) + go s2.Serve(lis2) + + r, rcleanup := manual.GenerateAndRegisterManualResolver() + defer rcleanup() + r.InitialAddrs([]resolver.Address{ + {Addr: lis1.Addr().String()}, + {Addr: lis2.Addr().String()}, + }) + cc, err := grpc.DialContext(ctx, r.Scheme()+":///", grpc.WithInsecure()) + if err != nil { + t.Fatalf("Error creating client: %v", err) + } + defer cc.Close() + + client := testpb.NewTestServiceClient(cc) + + // Should go on connection 1. We use a long-lived RPC because it will cause GracefulStop to send GO_AWAY, but the + // connection doesn't get closed until the server stops and the client receives. + stream, err := client.FullDuplexCall(ctx) + if err != nil { + t.Fatalf("FullDuplexCall(_) = _, %v; want _, nil", err) + } + + // Send GO_AWAY to connection 1. + go s1.GracefulStop() + + // Wait for connection 2 to be established. + <-conn2Established.Done() + + // Close connection 1. + s1.Stop() + + // Wait for client to close. + _, err = stream.Recv() + if err == nil { + t.Fatal("expected the stream to die, but got a successful Recv") + } + + // Do a bunch of RPCs, make sure it stays stable. These should go to connection 2. + for i := 0; i < 10; i++ { + if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { + t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err) + } + } +} + +func listenWithNotifyingListener(network, address string, event *grpcsync.Event) (net.Listener, error) { + lis, err := net.Listen(network, address) + if err != nil { + return nil, err + } + return notifyingListener{connEstablished: event, Listener: lis}, nil +} + +type notifyingListener struct { + connEstablished *grpcsync.Event + net.Listener +} + +func (lis notifyingListener) Accept() (net.Conn, error) { + defer lis.connEstablished.Fire() + return lis.Listener.Accept() +}