Add ClientConn state

This commit is contained in:
iamqizhao
2015-07-30 15:30:26 -07:00
parent 689884fbd0
commit 76ef365255

View File

@ -146,6 +146,7 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
} }
if cc.dopts.block { if cc.dopts.block {
if err := cc.resetTransport(false); err != nil { if err := cc.resetTransport(false); err != nil {
cc.Close()
return nil, err return nil, err
} }
// Start to monitor the error status of transport. // Start to monitor the error status of transport.
@ -155,6 +156,7 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
go func() { go func() {
if err := cc.resetTransport(false); err != nil { if err := cc.resetTransport(false); err != nil {
grpclog.Printf("Failed to dial %s: %v; please retry.", target, err) grpclog.Printf("Failed to dial %s: %v; please retry.", target, err)
cc.Close()
return return
} }
go cc.transportMonitor() go cc.transportMonitor()
@ -163,6 +165,22 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return cc, nil return cc, nil
} }
// ConnectivityState indicates the state of a client connection.
type ConnectivityState int
const (
// Idle indicates the ClientConn is idle.
Idle ConnectivityState = iota
// Connecting indicates the ClienConn is connecting.
Connecting
// Ready indicates the ClientConn is ready for work.
Ready
// TransientFailure indicates the ClientConn has seen a failure but expects to recover.
TransientFailure
// Shutdown indicates the CLientConn has stated shutting down.
Shutdown
)
// ClientConn represents a client connection to an RPC service. // ClientConn represents a client connection to an RPC service.
type ClientConn struct { type ClientConn struct {
target string target string
@ -170,12 +188,11 @@ type ClientConn struct {
dopts dialOptions dopts dialOptions
shutdownChan chan struct{} shutdownChan chan struct{}
mu sync.Mutex mu sync.Mutex
state ConnectivityState
// ready is closed and becomes nil when a new transport is up or failed // ready is closed and becomes nil when a new transport is up or failed
// due to timeout. // due to timeout.
ready chan struct{} ready chan struct{}
// Indicates the ClientConn is under destruction.
closing bool
// Every time a new transport is created, this is incremented by 1. Used // Every time a new transport is created, this is incremented by 1. Used
// to avoid trying to recreate a transport while the new one is already // to avoid trying to recreate a transport while the new one is already
// under construction. // under construction.
@ -188,11 +205,12 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
start := time.Now() start := time.Now()
for { for {
cc.mu.Lock() cc.mu.Lock()
cc.state = Connecting
t := cc.transport t := cc.transport
ts := cc.transportSeq ts := cc.transportSeq
// Avoid wait() picking up a dying transport unnecessarily. // Avoid wait() picking up a dying transport unnecessarily.
cc.transportSeq = 0 cc.transportSeq = 0
if cc.closing { if cc.state == Shutdown {
cc.mu.Unlock() cc.mu.Unlock()
return ErrClientConnClosing return ErrClientConnClosing
} }
@ -224,6 +242,9 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
connectTime := time.Now() connectTime := time.Now()
newTransport, err := transport.NewClientTransport(cc.target, &copts) newTransport, err := transport.NewClientTransport(cc.target, &copts)
if err != nil { if err != nil {
cc.mu.Lock()
cc.state = TransientFailure
cc.mu.Unlock()
sleepTime -= time.Since(connectTime) sleepTime -= time.Since(connectTime)
if sleepTime < 0 { if sleepTime < 0 {
sleepTime = 0 sleepTime = 0
@ -240,12 +261,13 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
continue continue
} }
cc.mu.Lock() cc.mu.Lock()
if cc.closing { if cc.state == Shutdown {
// cc.Close() has been invoked. // cc.Close() has been invoked.
cc.mu.Unlock() cc.mu.Unlock()
newTransport.Close() newTransport.Close()
return ErrClientConnClosing return ErrClientConnClosing
} }
cc.state = Ready
cc.transport = newTransport cc.transport = newTransport
cc.transportSeq = ts + 1 cc.transportSeq = ts + 1
if cc.ready != nil { if cc.ready != nil {
@ -262,13 +284,13 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
func (cc *ClientConn) transportMonitor() { func (cc *ClientConn) transportMonitor() {
for { for {
select { select {
// shutdownChan is needed to detect the channel teardown when // shutdownChan is needed to detect the teardown when
// the ClientConn is idle (i.e., no RPC in flight). // the ClientConn is idle (i.e., no RPC in flight).
case <-cc.shutdownChan: case <-cc.shutdownChan:
return return
case <-cc.transport.Error(): case <-cc.transport.Error():
if err := cc.resetTransport(true); err != nil { if err := cc.resetTransport(true); err != nil {
// The channel is closing. // The ClientConn is closing.
grpclog.Printf("grpc: ClientConn.transportMonitor exits due to: %v", err) grpclog.Printf("grpc: ClientConn.transportMonitor exits due to: %v", err)
return return
} }
@ -284,7 +306,7 @@ func (cc *ClientConn) wait(ctx context.Context, ts int) (transport.ClientTranspo
for { for {
cc.mu.Lock() cc.mu.Lock()
switch { switch {
case cc.closing: case cc.state == Shutdown:
cc.mu.Unlock() cc.mu.Unlock()
return nil, 0, ErrClientConnClosing return nil, 0, ErrClientConnClosing
case ts < cc.transportSeq: case ts < cc.transportSeq:
@ -316,10 +338,10 @@ func (cc *ClientConn) wait(ctx context.Context, ts int) (transport.ClientTranspo
func (cc *ClientConn) Close() error { func (cc *ClientConn) Close() error {
cc.mu.Lock() cc.mu.Lock()
defer cc.mu.Unlock() defer cc.mu.Unlock()
if cc.closing { if cc.state == Shutdown {
return ErrClientConnClosing return ErrClientConnClosing
} }
cc.closing = true cc.state = Shutdown
if cc.ready != nil { if cc.ready != nil {
close(cc.ready) close(cc.ready)
cc.ready = nil cc.ready = nil