From 76ef3652552545e83d64a2ce03da2c6e15fb521e Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 30 Jul 2015 15:30:26 -0700 Subject: [PATCH 1/2] Add ClientConn state --- clientconn.go | 42 ++++++++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/clientconn.go b/clientconn.go index d9f7077b..610eb84a 100644 --- a/clientconn.go +++ b/clientconn.go @@ -146,6 +146,7 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) { } if cc.dopts.block { if err := cc.resetTransport(false); err != nil { + cc.Close() return nil, err } // Start to monitor the error status of transport. @@ -155,6 +156,7 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) { go func() { if err := cc.resetTransport(false); err != nil { grpclog.Printf("Failed to dial %s: %v; please retry.", target, err) + cc.Close() return } go cc.transportMonitor() @@ -163,6 +165,22 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) { return cc, nil } +// ConnectivityState indicates the state of a client connection. +type ConnectivityState int + +const ( + // Idle indicates the ClientConn is idle. + Idle ConnectivityState = iota + // Connecting indicates the ClienConn is connecting. + Connecting + // Ready indicates the ClientConn is ready for work. + Ready + // TransientFailure indicates the ClientConn has seen a failure but expects to recover. + TransientFailure + // Shutdown indicates the CLientConn has stated shutting down. + Shutdown +) + // ClientConn represents a client connection to an RPC service. type ClientConn struct { target string @@ -170,12 +188,11 @@ type ClientConn struct { dopts dialOptions shutdownChan chan struct{} - mu sync.Mutex + mu sync.Mutex + state ConnectivityState // ready is closed and becomes nil when a new transport is up or failed // due to timeout. ready chan struct{} - // Indicates the ClientConn is under destruction. - closing bool // Every time a new transport is created, this is incremented by 1. Used // to avoid trying to recreate a transport while the new one is already // under construction. @@ -188,11 +205,12 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { start := time.Now() for { cc.mu.Lock() + cc.state = Connecting t := cc.transport ts := cc.transportSeq // Avoid wait() picking up a dying transport unnecessarily. cc.transportSeq = 0 - if cc.closing { + if cc.state == Shutdown { cc.mu.Unlock() return ErrClientConnClosing } @@ -224,6 +242,9 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { connectTime := time.Now() newTransport, err := transport.NewClientTransport(cc.target, &copts) if err != nil { + cc.mu.Lock() + cc.state = TransientFailure + cc.mu.Unlock() sleepTime -= time.Since(connectTime) if sleepTime < 0 { sleepTime = 0 @@ -240,12 +261,13 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { continue } cc.mu.Lock() - if cc.closing { + if cc.state == Shutdown { // cc.Close() has been invoked. cc.mu.Unlock() newTransport.Close() return ErrClientConnClosing } + cc.state = Ready cc.transport = newTransport cc.transportSeq = ts + 1 if cc.ready != nil { @@ -262,13 +284,13 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { func (cc *ClientConn) transportMonitor() { for { select { - // shutdownChan is needed to detect the channel teardown when + // shutdownChan is needed to detect the teardown when // the ClientConn is idle (i.e., no RPC in flight). case <-cc.shutdownChan: return case <-cc.transport.Error(): if err := cc.resetTransport(true); err != nil { - // The channel is closing. + // The ClientConn is closing. grpclog.Printf("grpc: ClientConn.transportMonitor exits due to: %v", err) return } @@ -284,7 +306,7 @@ func (cc *ClientConn) wait(ctx context.Context, ts int) (transport.ClientTranspo for { cc.mu.Lock() switch { - case cc.closing: + case cc.state == Shutdown: cc.mu.Unlock() return nil, 0, ErrClientConnClosing case ts < cc.transportSeq: @@ -316,10 +338,10 @@ func (cc *ClientConn) wait(ctx context.Context, ts int) (transport.ClientTranspo func (cc *ClientConn) Close() error { cc.mu.Lock() defer cc.mu.Unlock() - if cc.closing { + if cc.state == Shutdown { return ErrClientConnClosing } - cc.closing = true + cc.state = Shutdown if cc.ready != nil { close(cc.ready) cc.ready = nil From f680d0b77dd48906b3357f130d9565fa7cc79705 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 30 Jul 2015 15:36:59 -0700 Subject: [PATCH 2/2] fix a typo --- clientconn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clientconn.go b/clientconn.go index 610eb84a..6d77a033 100644 --- a/clientconn.go +++ b/clientconn.go @@ -177,7 +177,7 @@ const ( Ready // TransientFailure indicates the ClientConn has seen a failure but expects to recover. TransientFailure - // Shutdown indicates the CLientConn has stated shutting down. + // Shutdown indicates the ClientConn has stated shutting down. Shutdown )