diff --git a/clientconn.go b/clientconn.go index d9f7077b..610eb84a 100644 --- a/clientconn.go +++ b/clientconn.go @@ -146,6 +146,7 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) { } if cc.dopts.block { if err := cc.resetTransport(false); err != nil { + cc.Close() return nil, err } // Start to monitor the error status of transport. @@ -155,6 +156,7 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) { go func() { if err := cc.resetTransport(false); err != nil { grpclog.Printf("Failed to dial %s: %v; please retry.", target, err) + cc.Close() return } go cc.transportMonitor() @@ -163,6 +165,22 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) { return cc, nil } +// ConnectivityState indicates the state of a client connection. +type ConnectivityState int + +const ( + // Idle indicates the ClientConn is idle. + Idle ConnectivityState = iota + // Connecting indicates the ClientConn is connecting. + Connecting + // Ready indicates the ClientConn is ready for work. + Ready + // TransientFailure indicates the ClientConn has seen a failure but expects to recover. + TransientFailure + // Shutdown indicates the ClientConn has started shutting down. + Shutdown +) + // ClientConn represents a client connection to an RPC service. type ClientConn struct { target string @@ -170,12 +188,11 @@ type ClientConn struct { dopts dialOptions shutdownChan chan struct{} - mu sync.Mutex + mu sync.Mutex + state ConnectivityState // ready is closed and becomes nil when a new transport is up or failed // due to timeout. ready chan struct{} - // Indicates the ClientConn is under destruction. - closing bool // Every time a new transport is created, this is incremented by 1. Used // to avoid trying to recreate a transport while the new one is already // under construction. 
@@ -188,11 +205,12 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { start := time.Now() for { cc.mu.Lock() t := cc.transport ts := cc.transportSeq // Avoid wait() picking up a dying transport unnecessarily. cc.transportSeq = 0 - if cc.closing { + if cc.state == Shutdown { cc.mu.Unlock() return ErrClientConnClosing } + cc.state = Connecting @@ -224,6 +242,9 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { connectTime := time.Now() newTransport, err := transport.NewClientTransport(cc.target, &copts) if err != nil { + cc.mu.Lock() + cc.state = TransientFailure + cc.mu.Unlock() sleepTime -= time.Since(connectTime) if sleepTime < 0 { sleepTime = 0 @@ -240,12 +261,13 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { continue } cc.mu.Lock() - if cc.closing { + if cc.state == Shutdown { // cc.Close() has been invoked. cc.mu.Unlock() newTransport.Close() return ErrClientConnClosing } + cc.state = Ready cc.transport = newTransport cc.transportSeq = ts + 1 if cc.ready != nil { @@ -262,13 +284,13 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { func (cc *ClientConn) transportMonitor() { for { select { - // shutdownChan is needed to detect the channel teardown when + // shutdownChan is needed to detect the teardown when // the ClientConn is idle (i.e., no RPC in flight). case <-cc.shutdownChan: return case <-cc.transport.Error(): if err := cc.resetTransport(true); err != nil { - // The channel is closing. + // The ClientConn is closing. 
grpclog.Printf("grpc: ClientConn.transportMonitor exits due to: %v", err) return } @@ -284,7 +306,7 @@ func (cc *ClientConn) wait(ctx context.Context, ts int) (transport.ClientTranspo for { cc.mu.Lock() switch { - case cc.closing: + case cc.state == Shutdown: cc.mu.Unlock() return nil, 0, ErrClientConnClosing case ts < cc.transportSeq: @@ -316,10 +338,10 @@ func (cc *ClientConn) wait(ctx context.Context, ts int) (transport.ClientTranspo func (cc *ClientConn) Close() error { cc.mu.Lock() defer cc.mu.Unlock() - if cc.closing { + if cc.state == Shutdown { return ErrClientConnClosing } - cc.closing = true + cc.state = Shutdown if cc.ready != nil { close(cc.ready) cc.ready = nil