diff --git a/clientconn.go b/clientconn.go index 53a12125..c38110da 100644 --- a/clientconn.go +++ b/clientconn.go @@ -53,6 +53,9 @@ var ( // ErrClientConnClosing indicates that the operation is illegal because // the ClientConn is closing. ErrClientConnClosing = errors.New("grpc: the client connection is closing") + // ErrClientConnTimeout indicates that the ClientConn cannot establish the + // underlying connections within the specified timeout. + ErrClientConnTimeout = errors.New("grpc: timed out when dialing") // errNoTransportSecurity indicates that there is no transport security // being set for ClientConn. Users should either set one or explicitly @@ -62,15 +65,13 @@ var ( // (e.g., oauth2 token) which requires secure connection on an insecure // connection. errCredentialsMisuse = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportAuthenticator() to set)") - // errClientConnTimeout indicates that the connection could not be - // established or re-established within the specified timeout. - errClientConnTimeout = errors.New("grpc: timed out trying to connect") // errNetworkIP indicates that the connection is down due to some network I/O error. errNetworkIO = errors.New("grpc: failed with network I/O error") // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs. errConnDrain = errors.New("grpc: the connection is drained") // errConnClosing indicates that the connection is closing. errConnClosing = errors.New("grpc: the connection is closing") + errNoAddr = errors.New("grpc: there is no address available to dial") // minimum time to give a connection to complete minConnectTimeout = 20 * time.Second ) @@ -85,6 +86,7 @@ type dialOptions struct { balancer Balancer block bool insecure bool + timeout time.Duration copts transport.ConnectOptions } @@ -182,10 +184,11 @@ func WithPerRPCCredentials(creds credentials.Credentials) DialOption { } } -// WithTimeout returns a DialOption that configures a timeout for dialing a client connection. +// WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn +// initially. This is valid if and only if WithBlock() is present. func WithTimeout(d time.Duration) DialOption { return func(o *dialOptions) { - o.copts.Timeout = d + o.timeout = d } } @@ -228,26 +231,47 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) { if err := cc.balancer.Start(target); err != nil { return nil, err } + var ( + ok bool + addrs []Address + ) ch := cc.balancer.Notify() if ch == nil { // There is no name resolver installed. - addr := Address{Addr: target} - if err := cc.newAddrConn(addr, false); err != nil { - return nil, err - } + addrs = append(addrs, Address{Addr: target}) } else { - addrs, ok := <-ch + addrs, ok = <-ch if !ok || len(addrs) == 0 { - return nil, fmt.Errorf("grpc: there is no address available to dial") + return nil, errNoAddr } + } + waitC := make(chan error) + go func() { for _, a := range addrs { if err := cc.newAddrConn(a, false); err != nil { - return nil, err + waitC <- err + return } } + close(waitC) + }() + var timeoutCh <-chan time.Time + if cc.dopts.timeout > 0 { + timeoutCh = time.After(cc.dopts.timeout) + } + select { + case err := <-waitC: + if err != nil { + cc.Close() + return nil, err + } + case <-timeoutCh: + cc.Close() + return nil, ErrClientConnTimeout + } + if ok { go cc.lbWatcher() } - colonPos := strings.LastIndex(target, ":") if colonPos == -1 { colonPos = len(target) @@ -517,7 +541,6 @@ func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState Connecti func (ac *addrConn) resetTransport(closeTransport bool) error { var retries int - start := time.Now() for { ac.mu.Lock() ac.printf("connecting") @@ -537,29 +560,13 @@ func (ac *addrConn) resetTransport(closeTransport bool) error { if closeTransport && t != nil { t.Close() } - // Adjust timeout for the current try. - copts := ac.dopts.copts - if copts.Timeout < 0 { - ac.tearDown(errClientConnTimeout) - return errClientConnTimeout - } - if copts.Timeout > 0 { - copts.Timeout -= time.Since(start) - if copts.Timeout <= 0 { - ac.tearDown(errClientConnTimeout) - return errClientConnTimeout - } - } sleepTime := ac.dopts.bs.backoff(retries) - timeout := sleepTime - if timeout < minConnectTimeout { - timeout = minConnectTimeout - } - if copts.Timeout == 0 || copts.Timeout > timeout { - copts.Timeout = timeout + ac.dopts.copts.Timeout = sleepTime + if sleepTime < minConnectTimeout { + ac.dopts.copts.Timeout = minConnectTimeout } connectTime := time.Now() - newTransport, err := transport.NewClientTransport(ac.addr.Addr, &copts) + newTransport, err := transport.NewClientTransport(ac.addr.Addr, &ac.dopts.copts) if err != nil { ac.mu.Lock() if ac.state == Shutdown { @@ -579,14 +586,6 @@ func (ac *addrConn) resetTransport(closeTransport bool) error { if sleepTime < 0 { sleepTime = 0 } - // Fail early before falling into sleep. - if ac.dopts.copts.Timeout > 0 && ac.dopts.copts.Timeout < sleepTime+time.Since(start) { - ac.mu.Lock() - ac.errorf("connection timeout") - ac.mu.Unlock() - ac.tearDown(errClientConnTimeout) - return errClientConnTimeout - } closeTransport = false select { case <-time.After(sleepTime): diff --git a/clientconn_test.go b/clientconn_test.go index 09d7f110..d60a3aee 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -47,8 +47,8 @@ func TestDialTimeout(t *testing.T) { if err == nil { conn.Close() } - if err != errClientConnTimeout { - t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, errClientConnTimeout) + if err != ErrClientConnTimeout { + t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, ErrClientConnTimeout) } } @@ -61,8 +61,8 @@ func TestTLSDialTimeout(t *testing.T) { if err == nil { conn.Close() } - if err != errClientConnTimeout { - t.Fatalf("grpc.Dial(_, _) = %v, %v, want %v", conn, err, errClientConnTimeout) + if err != ErrClientConnTimeout { + t.Fatalf("grpc.Dial(_, _) = %v, %v, want %v", conn, err, ErrClientConnTimeout) } } diff --git a/test/end2end_test.go b/test/end2end_test.go index 30e5b8dc..0c29e159 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -296,61 +296,6 @@ func (s *testServer) HalfDuplexCall(stream testpb.TestService_HalfDuplexCallServ const tlsDir = "testdata/" -func TestReconnectTimeout(t *testing.T) { - defer leakCheck(t)() - restore := declareLogNoise(t, - "transport: http2Client.notifyError got notified that the client transport was broken", - "grpc: Conn.resetTransport failed to create client transport: connection error: desc = \"transport", - "grpc: Conn.transportMonitor exits due to: grpc: timed out trying to connect", - ) - defer restore() - - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("Failed to listen: %v", err) - } - _, port, err := net.SplitHostPort(lis.Addr().String()) - if err != nil { - t.Fatalf("Failed to parse listener address: %v", err) - } - addr := "localhost:" + port - conn, err := grpc.Dial(addr, grpc.WithTimeout(5*time.Second), grpc.WithBlock(), grpc.WithInsecure()) - if err != nil { - t.Fatalf("Failed to dial to the server %q: %v", addr, err) - } - // Close unaccepted connection (i.e., conn). - lis.Close() - tc := testpb.NewTestServiceClient(conn) - waitC := make(chan struct{}) - go func() { - defer close(waitC) - const argSize = 271828 - const respSize = 314159 - - payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) - if err != nil { - t.Error(err) - return - } - - req := &testpb.SimpleRequest{ - ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(), - ResponseSize: proto.Int32(respSize), - Payload: payload, - } - ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond) - if _, err := tc.UnaryCall(ctx, req); err == nil { - t.Errorf("TestService/UnaryCall(_, _) = _, , want _, non-nil") - return - } - }() - // Block until reconnect times out. - <-waitC - if err := conn.Close(); err != nil { - t.Fatalf("%v.Close() = %v, want ", conn, err) - } -} - func unixDialer(addr string, timeout time.Duration) (net.Conn, error) { return net.DialTimeout("unix", addr, timeout) } diff --git a/transport/transport.go b/transport/transport.go index 1c9af545..1e9d0c01 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -338,7 +338,7 @@ type ConnectOptions struct { Dialer func(string, time.Duration) (net.Conn, error) // AuthOptions stores the credentials required to setup a client connection and/or issue RPCs. AuthOptions []credentials.Credentials - // Timeout specifies the timeout for dialing a client connection. + // Timeout specifies the timeout for dialing a ClientTransport. Timeout time.Duration }