From a5ca6e56d2a544cd616d91d9d5162b4f15289daa Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Tue, 3 Mar 2015 17:08:39 -0800 Subject: [PATCH 01/18] Support timeout for grpc.Dial --- clientconn.go | 47 +++++++++++++++++++++++++------------ credentials/credentials.go | 22 ++++++++++++----- test/end2end_test.go | 10 ++++++++ transport/http2_client.go | 21 ++++++++++++----- transport/transport.go | 14 ++++++++--- transport/transport_test.go | 7 ++++-- 6 files changed, 89 insertions(+), 32 deletions(-) diff --git a/clientconn.go b/clientconn.go index d6f1042a..596bb704 100644 --- a/clientconn.go +++ b/clientconn.go @@ -36,6 +36,7 @@ package grpc import ( "errors" "log" + "net" "sync" "time" @@ -50,30 +51,35 @@ var ( // ErrClientConnClosing indicates that the operation is illegal because // the session is closing. ErrClientConnClosing = errors.New("grpc: the client connection is closing") + // ErrClientConnTimeout indicates that the connection could not be + // established within the specified timeout. + ErrClientConnTimeout = errors.New("grpc: timed out trying to connect") ) -type dialOptions struct { - protocol string - authOptions []credentials.Credentials -} - -// DialOption configures how we set up the connection including auth -// credentials. -type DialOption func(*dialOptions) +// DialOption configures how we set up the connection. +type DialOption func(*transport.DialOptions) // WithTransportCredentials returns a DialOption which configures a // connection level security credentials (e.g., TLS/SSL). func WithTransportCredentials(creds credentials.TransportAuthenticator) DialOption { - return func(o *dialOptions) { - o.authOptions = append(o.authOptions, creds) + return func(o *transport.DialOptions) { + o.AuthOptions = append(o.AuthOptions, creds) } } // WithPerRPCCredentials returns a DialOption which sets // credentials which will place auth state on each outbound RPC. func WithPerRPCCredentials(creds credentials.Credentials) DialOption { - return func(o *dialOptions) { - o.authOptions = append(o.authOptions, creds) + return func(o *transport.DialOptions) { + o.AuthOptions = append(o.AuthOptions, creds) + } +} + +// WithTimeout returns a DialOption which configures a timeout for dialing a +// client connection. +func WithTimeout(d time.Duration) DialOption { + return func(o *transport.DialOptions) { + o.Timeout = d } } @@ -102,7 +108,7 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) { // ClientConn represents a client connection to an RPC service. type ClientConn struct { target string - dopts dialOptions + dopts transport.DialOptions shutdownChan chan struct{} mu sync.Mutex @@ -119,6 +125,7 @@ type ClientConn struct { func (cc *ClientConn) resetTransport(closeTransport bool) error { var retries int + start := time.Now() for { cc.mu.Lock() t := cc.transport @@ -133,12 +140,22 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { if closeTransport { t.Close() } - newTransport, err := transport.NewClientTransport(cc.dopts.protocol, cc.target, cc.dopts.authOptions) + // Adjust timeout for the current try. + if cc.dopts.Timeout > 0 { + cc.dopts.Timeout -= time.Since(start) + if cc.dopts.Timeout <= 0 { + return ErrClientConnTimeout + } + } + newTransport, err := transport.NewClientTransport(cc.target, cc.dopts) if err != nil { - // TODO(zhaoq): Record the error with glog.V. + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + return ErrClientConnTimeout + } closeTransport = false time.Sleep(backoff(retries)) retries++ + // TODO(zhaoq): Record the error with glog.V. log.Printf("grpc: ClientConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, cc.target) continue } diff --git a/credentials/credentials.go b/credentials/credentials.go index b55ec752..e7ffc709 100644 --- a/credentials/credentials.go +++ b/credentials/credentials.go @@ -71,9 +71,15 @@ type Credentials interface { // TransportAuthenticator defines the common interface all supported transport // authentication protocols (e.g., TLS, SSL) must implement. type TransportAuthenticator interface { - // Dial connects to the given network address and does the authentication - // handshake specified by the corresponding authentication protocol. - Dial(addr string) (net.Conn, error) + // Dial connects to the given network address using net.Dial and then + // does the authentication handshake specified by the corresponding + // authentication protocol. + Dial(network, addr string) (net.Conn, error) + // DialWithDialer connects to the given network address using + // dialer.Dialand does the authentication handshake specified by the + // corresponding authentication protocol. Any timeout or deadline + // given in the dialer apply to connection and handshake as a whole. + DialWithDialer(dialer *net.Dialer, network, addr string) (net.Conn, error) // NewListener creates a listener which accepts connections with requested // authentication handshake. NewListener(lis net.Listener) net.Listener @@ -103,8 +109,7 @@ func (c *tlsCreds) GetRequestMetadata(ctx context.Context) (map[string]string, e return nil, nil } -// Dial connects to addr and performs TLS handshake. -func (c *tlsCreds) Dial(addr string) (_ net.Conn, err error) { +func (c *tlsCreds) DialWithDialer(dialer *net.Dialer, network, addr string) (_ net.Conn, err error) { name := c.serverName if name == "" { name, _, err = net.SplitHostPort(addr) @@ -112,13 +117,18 @@ func (c *tlsCreds) Dial(addr string) (_ net.Conn, err error) { return nil, fmt.Errorf("credentials: failed to parse server address %v", err) } } - return tls.Dial("tcp", addr, &tls.Config{ + return tls.DialWithDialer(dialer, "tcp", addr, &tls.Config{ RootCAs: c.rootCAs, NextProtos: alpnProtoStr, ServerName: name, }) } +// Dial connects to addr and performs TLS handshake. +func (c *tlsCreds) Dial(network, addr string) (_ net.Conn, err error) { + return c.DialWithDialer(new(net.Dialer), network, addr) +} + // NewListener creates a net.Listener with a TLS configuration constructed // from the information in tlsCreds. func (c *tlsCreds) NewListener(lis net.Listener) net.Listener { diff --git a/test/end2end_test.go b/test/end2end_test.go index 5453b37f..7e59e140 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -193,6 +193,16 @@ func (s *testServer) HalfDuplexCall(stream testpb.TestService_HalfDuplexCallServ const tlsDir = "testdata/" +func TestDialTimeout(t *testing.T) { + conn, err := grpc.Dial("Non-Existent.Server:80", grpc.WithTimeout(time.Millisecond)) + if err == nil { + conn.Close() + } + if err != grpc.ErrClientConnTimeout { + t.Fatalf("grpc.Dial(_, _) = %v, %v, want %v", conn, err, grpc.ErrClientConnTimeout) + } +} + func setUp(useTLS bool, maxStream uint32) (s *grpc.Server, tc testpb.TestServiceClient) { lis, err := net.Listen("tcp", ":0") if err != nil { diff --git a/transport/http2_client.go b/transport/http2_client.go index 6eab9894..791a7cb5 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -96,29 +96,38 @@ type http2Client struct { // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 // and starts to receive messages on it. Non-nil error returns if construction // fails. -func newHTTP2Client(addr string, authOpts []credentials.Credentials) (_ ClientTransport, err error) { +func newHTTP2Client(addr string, opts DialOptions) (_ ClientTransport, err error) { var ( connErr error conn net.Conn ) scheme := "http" // TODO(zhaoq): Use DialTimeout instead. - for _, c := range authOpts { + for _, c := range opts.AuthOptions { if ccreds, ok := c.(credentials.TransportAuthenticator); ok { scheme = "https" // TODO(zhaoq): Now the first TransportAuthenticator is used if there are // multiple ones provided. Revisit this if it is not appropriate. Probably // place the ClientTransport construction into a separate function to make // things clear. - conn, connErr = ccreds.Dial(addr) + if opts.Timeout > 0 { + dialer := &net.Dialer{Timeout: opts.Timeout} + conn, connErr = ccreds.DialWithDialer(dialer, "tcp", addr) + } else { + conn, connErr = ccreds.Dial("tcp", addr) + } break } } if scheme == "http" { - conn, connErr = net.Dial("tcp", addr) + if opts.Timeout > 0 { + conn, connErr = net.DialTimeout("tcp", addr, opts.Timeout) + } else { + conn, connErr = net.Dial("tcp", addr) + } } if connErr != nil { - return nil, ConnectionErrorf("transport: %v", connErr) + return nil, connErr } defer func() { if err != nil { @@ -155,7 +164,7 @@ func newHTTP2Client(addr string, authOpts []credentials.Credentials) (_ ClientTr state: reachable, activeStreams: make(map[uint32]*Stream), maxStreams: math.MaxUint32, - authCreds: authOpts, + authCreds: opts.AuthOptions, } go t.controller() t.writableChan <- 0 diff --git a/transport/transport.go b/transport/transport.go index 406d581f..fced4a82 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -44,6 +44,7 @@ import ( "io" "net" "sync" + "time" "golang.org/x/net/context" "google.golang.org/grpc/codes" @@ -310,10 +311,17 @@ func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32) (Serv return newHTTP2Server(conn, maxStreams) } -// NewClientTransport establishes the transport with the required protocol +// DialOptions covers all relevant options for dial a client connection. +type DialOptions struct { + Protocol string + AuthOptions []credentials.Credentials + Timeout time.Duration +} + +// NewClientTransport establishes the transport with the required DialOptions // and returns it to the caller. -func NewClientTransport(protocol, target string, authOpts []credentials.Credentials) (ClientTransport, error) { - return newHTTP2Client(target, authOpts) +func NewClientTransport(target string, opts DialOptions) (ClientTransport, error) { + return newHTTP2Client(target, opts) } // Options provides additional hints and information for message diff --git a/transport/transport_test.go b/transport/transport_test.go index e768f7a5..31fae8b9 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -181,9 +181,12 @@ func setUp(t *testing.T, useTLS bool, port int, maxStreams uint32, suspend bool) if err != nil { t.Fatalf("Failed to create credentials %v", err) } - ct, connErr = NewClientTransport("http2", addr, []credentials.Credentials{creds}) + dopts := DialOptions { + AuthOptions: []credentials.Credentials{creds}, + } + ct, connErr = NewClientTransport(addr, dopts) } else { - ct, connErr = NewClientTransport("http2", addr, nil) + ct, connErr = NewClientTransport(addr, DialOptions{}) } if connErr != nil { t.Fatalf("failed to create transport: %v", connErr) From fa215c2d4ba5dbff206308d2970f3d7f827a5808 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Tue, 3 Mar 2015 17:10:14 -0800 Subject: [PATCH 02/18] gofmt -w --- transport/transport_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport/transport_test.go b/transport/transport_test.go index 31fae8b9..8a032ee6 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -181,7 +181,7 @@ func setUp(t *testing.T, useTLS bool, port int, maxStreams uint32, suspend bool) if err != nil { t.Fatalf("Failed to create credentials %v", err) } - dopts := DialOptions { + dopts := DialOptions{ AuthOptions: []credentials.Credentials{creds}, } ct, connErr = NewClientTransport(addr, dopts) From 351e2d02977f20df4393a6e5747420361ad59770 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Tue, 3 Mar 2015 19:04:29 -0800 Subject: [PATCH 03/18] add more tests for dial timeout and fix some bugs --- clientconn.go | 44 +++++++++++++++++++++++++++++++------------- test/end2end_test.go | 39 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 69 insertions(+), 14 deletions(-) diff --git a/clientconn.go b/clientconn.go index 596bb704..47383796 100644 --- a/clientconn.go +++ b/clientconn.go @@ -52,7 +52,7 @@ var ( // the session is closing. ErrClientConnClosing = errors.New("grpc: the client connection is closing") // ErrClientConnTimeout indicates that the connection could not be - // established within the specified timeout. + // established or re-established within the specified timeout. ErrClientConnTimeout = errors.New("grpc: timed out trying to connect") ) @@ -112,7 +112,8 @@ type ClientConn struct { shutdownChan chan struct{} mu sync.Mutex - // Is closed and becomes nil when a new transport is up. + // ready is closed and becomes nil when a new transport is up or failed + // due to timeout. ready chan struct{} // Indicates the ClientConn is under destruction. closing bool @@ -141,19 +142,28 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { t.Close() } // Adjust timeout for the current try. - if cc.dopts.Timeout > 0 { - cc.dopts.Timeout -= time.Since(start) - if cc.dopts.Timeout <= 0 { + dopts := cc.dopts + if dopts.Timeout > 0 { + dopts.Timeout -= time.Since(start) + if dopts.Timeout <= 0 { + cc.Close() return ErrClientConnTimeout } } - newTransport, err := transport.NewClientTransport(cc.target, cc.dopts) + newTransport, err := transport.NewClientTransport(cc.target, dopts) if err != nil { if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + cc.Close() + return ErrClientConnTimeout + } + sleepTime := backoff(retries) + // Fail early before falling into sleep. + if dopts.Timeout > 0 && dopts.Timeout < sleepTime + time.Since(start) { + cc.Close() return ErrClientConnTimeout } closeTransport = false - time.Sleep(backoff(retries)) + time.Sleep(sleepTime) retries++ // TODO(zhaoq): Record the error with glog.V. log.Printf("grpc: ClientConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, cc.target) @@ -183,6 +193,8 @@ func (cc *ClientConn) transportMonitor() { case <-cc.transport.Error(): if err := cc.resetTransport(true); err != nil { // The channel is closing. + // TODO(zhaoq): Record the error with glog.V. + log.Printf("grpc: transport exits due to %v", err) return } continue @@ -214,24 +226,30 @@ func (cc *ClientConn) wait(ctx context.Context, ts int) (transport.ClientTranspo select { case <-ctx.Done(): return nil, 0, transport.ContextErr(ctx.Err()) - // Wait until the new transport is ready. + // Wait until the new transport is ready or failed. case <-ready: } } } } -// Close starts to tear down the ClientConn. +// Close starts to tear down the ClientConn. Returns ErrClientConnClosing if +// it has been closed (mostly due to dial time-out). // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in // some edge cases (e.g., the caller opens and closes many ClientConn's in a // tight loop. -func (cc *ClientConn) Close() { +func (cc *ClientConn) Close() error { cc.mu.Lock() defer cc.mu.Unlock() if cc.closing { - return + return ErrClientConnClosing } cc.closing = true - cc.transport.Close() - close(cc.shutdownChan) + if cc.transport != nil { + cc.transport.Close() + } + if cc.shutdownChan != nil { + close(cc.shutdownChan) + } + return nil } diff --git a/test/end2end_test.go b/test/end2end_test.go index 7e59e140..5d9000cc 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -203,6 +203,43 @@ func TestDialTimeout(t *testing.T) { } } +func TestTLSDialTimeout(t *testing.T) { + creds, err := credentials.NewClientTLSFromFile(tlsDir+"ca.pem", "x.test.youtube.com") + if err != nil { + t.Fatalf("Failed to create credentials %v", err) + } + conn, err := grpc.Dial("Non-Existent.Server:80", grpc.WithTransportCredentials(creds), grpc.WithTimeout(time.Millisecond)) + if err == nil { + conn.Close() + } + if err != grpc.ErrClientConnTimeout { + t.Fatalf("grpc.Dial(_, _) = %v, %v, want %v", conn, err, grpc.ErrClientConnTimeout) + } +} + +func TestReconnectTimeout(t *testing.T) { + lis, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatalf("Failed to listen: %v", err) + } + _, port, err := net.SplitHostPort(lis.Addr().String()) + if err != nil { + t.Fatalf("Failed to parse listener address: %v", err) + } + addr := "localhost:" + port + timeOut := time.Second + conn, err := grpc.Dial(addr, grpc.WithTimeout(timeOut)) + if err != nil { + t.Fatalf("Failed to dial to the server %q: %v", addr, err) + } + lis.Close() + // Sleep till reconnect times out. + time.Sleep(2 * timeOut) + if err := conn.Close(); err != grpc.ErrClientConnClosing { + t.Fatalf("%v.Close() = %v, want %v", conn, err, grpc.ErrClientConnClosing) + } +} + func setUp(useTLS bool, maxStream uint32) (s *grpc.Server, tc testpb.TestServiceClient) { lis, err := net.Listen("tcp", ":0") if err != nil { @@ -341,7 +378,7 @@ func TestRetry(t *testing.T) { } // TODO(zhaoq): Have a better test coverage of timeout and cancellation mechanism. -func TestTimeout(t *testing.T) { +func TestRPCTimeout(t *testing.T) { s, tc := setUp(true, math.MaxUint32) defer s.Stop() argSize := 2718 From c73e40b80415718430918539f26c861760fc4d64 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 4 Mar 2015 11:15:10 -0800 Subject: [PATCH 04/18] address some review comments --- clientconn.go | 7 +++---- transport/http2_client.go | 2 +- transport/transport.go | 4 ++-- transport/transport_test.go | 4 ++-- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/clientconn.go b/clientconn.go index 47383796..3a44d432 100644 --- a/clientconn.go +++ b/clientconn.go @@ -75,8 +75,7 @@ func WithPerRPCCredentials(creds credentials.Credentials) DialOption { } } -// WithTimeout returns a DialOption which configures a timeout for dialing a -// client connection. +// WithTimeout returns a DialOption that configures a timeout for dialing a client connection. func WithTimeout(d time.Duration) DialOption { return func(o *transport.DialOptions) { o.Timeout = d @@ -150,7 +149,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { return ErrClientConnTimeout } } - newTransport, err := transport.NewClientTransport(cc.target, dopts) + newTransport, err := transport.NewClientTransport(cc.target, &dopts) if err != nil { if netErr, ok := err.(net.Error); ok && netErr.Timeout() { cc.Close() @@ -194,7 +193,7 @@ func (cc *ClientConn) transportMonitor() { if err := cc.resetTransport(true); err != nil { // The channel is closing. // TODO(zhaoq): Record the error with glog.V. - log.Printf("grpc: transport exits due to %v", err) + log.Printf("grpc: ClientConn.transportMonitor exits due to: %v", err) return } continue diff --git a/transport/http2_client.go b/transport/http2_client.go index 791a7cb5..9740edb5 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -96,7 +96,7 @@ type http2Client struct { // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 // and starts to receive messages on it. Non-nil error returns if construction // fails. -func newHTTP2Client(addr string, opts DialOptions) (_ ClientTransport, err error) { +func newHTTP2Client(addr string, opts *DialOptions) (_ ClientTransport, err error) { var ( connErr error conn net.Conn diff --git a/transport/transport.go b/transport/transport.go index fced4a82..5529a3b7 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -311,7 +311,7 @@ func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32) (Serv return newHTTP2Server(conn, maxStreams) } -// DialOptions covers all relevant options for dial a client connection. +// DialOptions covers all relevant options for dialing a server. type DialOptions struct { Protocol string AuthOptions []credentials.Credentials @@ -320,7 +320,7 @@ type DialOptions struct { // NewClientTransport establishes the transport with the required DialOptions // and returns it to the caller. -func NewClientTransport(target string, opts DialOptions) (ClientTransport, error) { +func NewClientTransport(target string, opts *DialOptions) (ClientTransport, error) { return newHTTP2Client(target, opts) } diff --git a/transport/transport_test.go b/transport/transport_test.go index 8a032ee6..bfd2c54c 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -184,9 +184,9 @@ func setUp(t *testing.T, useTLS bool, port int, maxStreams uint32, suspend bool) dopts := DialOptions{ AuthOptions: []credentials.Credentials{creds}, } - ct, connErr = NewClientTransport(addr, dopts) + ct, connErr = NewClientTransport(addr, &dopts) } else { - ct, connErr = NewClientTransport(addr, DialOptions{}) + ct, connErr = NewClientTransport(addr, &DialOptions{}) } if connErr != nil { t.Fatalf("failed to create transport: %v", connErr) From 28ad38be178ffb27029b21c7b3a1f8f9d3e97e98 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 4 Mar 2015 13:00:47 -0800 Subject: [PATCH 05/18] close ready when ClientConn closes --- clientconn.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/clientconn.go b/clientconn.go index 3a44d432..10efbd73 100644 --- a/clientconn.go +++ b/clientconn.go @@ -244,6 +244,10 @@ func (cc *ClientConn) Close() error { return ErrClientConnClosing } cc.closing = true + if cc.ready != nil { + close(cc.ready) + cc.ready = nil + } if cc.transport != nil { cc.transport.Close() } From 8304bc408ed1cef22f2b13b1a76899f649d1d266 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 4 Mar 2015 13:20:21 -0800 Subject: [PATCH 06/18] addressed some comments --- credentials/credentials.go | 2 +- transport/http2_client.go | 9 ++------- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/credentials/credentials.go b/credentials/credentials.go index e7ffc709..e876bb5e 100644 --- a/credentials/credentials.go +++ b/credentials/credentials.go @@ -76,7 +76,7 @@ type TransportAuthenticator interface { // authentication protocol. Dial(network, addr string) (net.Conn, error) // DialWithDialer connects to the given network address using - // dialer.Dialand does the authentication handshake specified by the + // dialer.Dial does the authentication handshake specified by the // corresponding authentication protocol. Any timeout or deadline // given in the dialer apply to connection and handshake as a whole. DialWithDialer(dialer *net.Dialer, network, addr string) (net.Conn, error) diff --git a/transport/http2_client.go b/transport/http2_client.go index 9740edb5..5a3c1939 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -102,7 +102,6 @@ func newHTTP2Client(addr string, opts *DialOptions) (_ ClientTransport, err erro conn net.Conn ) scheme := "http" - // TODO(zhaoq): Use DialTimeout instead. for _, c := range opts.AuthOptions { if ccreds, ok := c.(credentials.TransportAuthenticator); ok { scheme = "https" @@ -110,12 +109,8 @@ func newHTTP2Client(addr string, opts *DialOptions) (_ ClientTransport, err erro // multiple ones provided. Revisit this if it is not appropriate. Probably // place the ClientTransport construction into a separate function to make // things clear. - if opts.Timeout > 0 { - dialer := &net.Dialer{Timeout: opts.Timeout} - conn, connErr = ccreds.DialWithDialer(dialer, "tcp", addr) - } else { - conn, connErr = ccreds.Dial("tcp", addr) - } + dialer := &net.Dialer{Timeout: opts.Timeout} + conn, connErr = ccreds.DialWithDialer(dialer, "tcp", addr) break } } From 8d7b4ade871fc3346822a50cf2421e822a4bbcfe Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 4 Mar 2015 13:23:39 -0800 Subject: [PATCH 07/18] addressed some comments --- clientconn.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/clientconn.go b/clientconn.go index 10efbd73..85bf35e0 100644 --- a/clientconn.go +++ b/clientconn.go @@ -142,6 +142,10 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { } // Adjust timeout for the current try. dopts := cc.dopts + if dopts.Timeout < 0 { + cc.Close() + return ErrClientConnTimeout + } if dopts.Timeout > 0 { dopts.Timeout -= time.Since(start) if dopts.Timeout <= 0 { From 09fbfbf5539b3dd432ff68dcbbb552846bb9f0d1 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 4 Mar 2015 13:29:15 -0800 Subject: [PATCH 08/18] addressed some comments --- clientconn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clientconn.go b/clientconn.go index 85bf35e0..f10399bc 100644 --- a/clientconn.go +++ b/clientconn.go @@ -161,7 +161,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { } sleepTime := backoff(retries) // Fail early before falling into sleep. - if dopts.Timeout > 0 && dopts.Timeout < sleepTime + time.Since(start) { + if dopts.Timeout > 0 && cc.dopts.Timeout < sleepTime + time.Since(start) { cc.Close() return ErrClientConnTimeout } From 082bf01a4fa70e6fd407a4b064676c313bfe33ab Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 4 Mar 2015 13:32:58 -0800 Subject: [PATCH 09/18] addressed some comments --- transport/http2_client.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/transport/http2_client.go b/transport/http2_client.go index 5a3c1939..58b9ec78 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -109,8 +109,7 @@ func newHTTP2Client(addr string, opts *DialOptions) (_ ClientTransport, err erro // multiple ones provided. Revisit this if it is not appropriate. Probably // place the ClientTransport construction into a separate function to make // things clear. - dialer := &net.Dialer{Timeout: opts.Timeout} - conn, connErr = ccreds.DialWithDialer(dialer, "tcp", addr) + conn, connErr = ccreds.DialWithDialer(&net.Dialer{Timeout: opts.Timeout}, "tcp", addr) break } } From c33e9f7fa4357332bf8793c36fe92adb1a760836 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 4 Mar 2015 14:25:57 -0800 Subject: [PATCH 10/18] addressed some comments --- transport/http2_client.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/transport/http2_client.go b/transport/http2_client.go index 58b9ec78..169ab8c1 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -114,11 +114,7 @@ func newHTTP2Client(addr string, opts *DialOptions) (_ ClientTransport, err erro } } if scheme == "http" { - if opts.Timeout > 0 { - conn, connErr = net.DialTimeout("tcp", addr, opts.Timeout) - } else { - conn, connErr = net.Dial("tcp", addr) - } + conn, connErr = net.DialTimeout("tcp", addr, opts.Timeout) } if connErr != nil { return nil, connErr From 7bf60c1cd438a4ab7542700dae188c9f3a02c4cc Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 4 Mar 2015 14:37:12 -0800 Subject: [PATCH 11/18] addressed some comments --- clientconn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clientconn.go b/clientconn.go index f10399bc..d8d3fe05 100644 --- a/clientconn.go +++ b/clientconn.go @@ -161,7 +161,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { } sleepTime := backoff(retries) // Fail early before falling into sleep. - if dopts.Timeout > 0 && cc.dopts.Timeout < sleepTime + time.Since(start) { + if cc.dopts.Timeout > 0 && cc.dopts.Timeout < sleepTime + time.Since(start) { cc.Close() return ErrClientConnTimeout } From 91a7361187100c4642a3a3d8441fc502086ebf0d Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 4 Mar 2015 15:14:52 -0800 Subject: [PATCH 12/18] address some comments --- clientconn.go | 4 ---- transport/http2_client.go | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/clientconn.go b/clientconn.go index d8d3fe05..8440af4b 100644 --- a/clientconn.go +++ b/clientconn.go @@ -155,10 +155,6 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { } newTransport, err := transport.NewClientTransport(cc.target, &dopts) if err != nil { - if netErr, ok := err.(net.Error); ok && netErr.Timeout() { - cc.Close() - return ErrClientConnTimeout - } sleepTime := backoff(retries) // Fail early before falling into sleep. if cc.dopts.Timeout > 0 && cc.dopts.Timeout < sleepTime + time.Since(start) { diff --git a/transport/http2_client.go b/transport/http2_client.go index 169ab8c1..54008e7c 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -117,7 +117,7 @@ func newHTTP2Client(addr string, opts *DialOptions) (_ ClientTransport, err erro conn, connErr = net.DialTimeout("tcp", addr, opts.Timeout) } if connErr != nil { - return nil, connErr + return nil, ConnectionErrorf("transport: %v", connErr) } defer func() { if err != nil { From 10f29ffc09dd4e5692580bec15f0e6db97597062 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 4 Mar 2015 18:16:53 -0800 Subject: [PATCH 13/18] fix a function comment --- rpc_util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc_util.go b/rpc_util.go index 8a76c8a8..30f9ccb8 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -189,7 +189,7 @@ func Code(err error) codes.Code { } // Errorf returns an error containing an error code and a description; -// CodeOf extracts the Code. +// Code extracts the Code. // Errorf returns nil if c is OK. func Errorf(c codes.Code, format string, a ...interface{}) error { if c == codes.OK { From 59cf05b4f5d7f07b2bcd5c779c1e1b6a995cee4d Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 5 Mar 2015 01:46:30 -0800 Subject: [PATCH 14/18] fix a comment --- rpc_util.go | 1 - 1 file changed, 1 deletion(-) diff --git a/rpc_util.go b/rpc_util.go index 30f9ccb8..5874be21 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -189,7 +189,6 @@ func Code(err error) codes.Code { } // Errorf returns an error containing an error code and a description; -// Code extracts the Code. // Errorf returns nil if c is OK. func Errorf(c codes.Code, format string, a ...interface{}) error { if c == codes.OK { From aae62e60510434a48f98bb04df211ce64715709a Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 5 Mar 2015 01:52:17 -0800 Subject: [PATCH 15/18] remove unused import --- clientconn.go | 1 - 1 file changed, 1 deletion(-) diff --git a/clientconn.go b/clientconn.go index 8440af4b..69b6cd83 100644 --- a/clientconn.go +++ b/clientconn.go @@ -36,7 +36,6 @@ package grpc import ( "errors" "log" - "net" "sync" "time" From 5c27dd6a7a2cdd162926b3bd811ff0ddd934aafb Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 5 Mar 2015 09:45:50 -0800 Subject: [PATCH 16/18] fix a bug --- clientconn.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/clientconn.go b/clientconn.go index 69b6cd83..bfb0ff4b 100644 --- a/clientconn.go +++ b/clientconn.go @@ -168,6 +168,12 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { continue } cc.mu.Lock() + if cc.closing { + // cc.Close() has been invoked. + cc.mu.Unlock() + newTransport.Close() + return ErrClientConnClosing + } cc.transport = newTransport cc.transportSeq = ts + 1 if cc.ready != nil { From 57c1951dc91495d16c3eba719463f4160cb228c3 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 5 Mar 2015 13:56:48 -0800 Subject: [PATCH 17/18] tightened some rpcErr loose ends and revised some tests --- call.go | 6 +++--- test/end2end_test.go | 20 ++++++++++++++++++-- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/call.go b/call.go index 5099846c..10bd8a54 100644 --- a/call.go +++ b/call.go @@ -137,15 +137,15 @@ func Invoke(ctx context.Context, method string, args, reply proto.Message, cc *C ) // TODO(zhaoq): Need a formal spec of retry strategy for non-failfast rpcs. if lastErr != nil && c.failFast { - return lastErr + return toRPCErr(lastErr) } t, ts, err = cc.wait(ctx, ts) if err != nil { if lastErr != nil { // This was a retry; return the error from the last attempt. - return lastErr + return toRPCErr(lastErr) } - return err + return Errorf(codes.Internal, "%v", err) } stream, err = sendRPC(ctx, callHdr, t, args, topts) if err != nil { diff --git a/test/end2end_test.go b/test/end2end_test.go index 5d9000cc..3bbc4e40 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -233,8 +233,24 @@ func TestReconnectTimeout(t *testing.T) { t.Fatalf("Failed to dial to the server %q: %v", addr, err) } lis.Close() - // Sleep till reconnect times out. - time.Sleep(2 * timeOut) + tc := testpb.NewTestServiceClient(conn) + waitC := make(chan struct{}) + go func() { + defer close(waitC) + argSize := 271828 + respSize := 314159 + req := &testpb.SimpleRequest{ + ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(), + ResponseSize: proto.Int32(int32(respSize)), + Payload: newPayload(testpb.PayloadType_COMPRESSABLE, int32(argSize)), + } + _, err := tc.UnaryCall(context.Background(), req) + if err != grpc.Errorf(codes.Internal, "%v", grpc.ErrClientConnClosing) { + t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, %v", err, grpc.Errorf(codes.Internal, "%v", grpc.ErrClientConnClosing)) + } + }() + // Block untill reconnect times out. + <-waitC if err := conn.Close(); err != grpc.ErrClientConnClosing { t.Fatalf("%v.Close() = %v, want %v", conn, err, grpc.ErrClientConnClosing) } From 1182c6e87c67f7cac0b0235d4701f9713beac5ae Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 5 Mar 2015 14:24:32 -0800 Subject: [PATCH 18/18] minor polish --- test/end2end_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 3bbc4e40..006f952d 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -227,8 +227,7 @@ func TestReconnectTimeout(t *testing.T) { t.Fatalf("Failed to parse listener address: %v", err) } addr := "localhost:" + port - timeOut := time.Second - conn, err := grpc.Dial(addr, grpc.WithTimeout(timeOut)) + conn, err := grpc.Dial(addr, grpc.WithTimeout(time.Second)) if err != nil { t.Fatalf("Failed to dial to the server %q: %v", addr, err) }