@ -146,6 +146,7 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
|
|||||||
}
|
}
|
||||||
if cc.dopts.block {
|
if cc.dopts.block {
|
||||||
if err := cc.resetTransport(false); err != nil {
|
if err := cc.resetTransport(false); err != nil {
|
||||||
|
cc.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// Start to monitor the error status of transport.
|
// Start to monitor the error status of transport.
|
||||||
@ -155,6 +156,7 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
|
|||||||
go func() {
|
go func() {
|
||||||
if err := cc.resetTransport(false); err != nil {
|
if err := cc.resetTransport(false); err != nil {
|
||||||
grpclog.Printf("Failed to dial %s: %v; please retry.", target, err)
|
grpclog.Printf("Failed to dial %s: %v; please retry.", target, err)
|
||||||
|
cc.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
go cc.transportMonitor()
|
go cc.transportMonitor()
|
||||||
@ -163,6 +165,22 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
|
|||||||
return cc, nil
|
return cc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ConnectivityState indicates the state of a client connection.
|
||||||
|
type ConnectivityState int
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Idle indicates the ClientConn is idle.
|
||||||
|
Idle ConnectivityState = iota
|
||||||
|
// Connecting indicates the ClienConn is connecting.
|
||||||
|
Connecting
|
||||||
|
// Ready indicates the ClientConn is ready for work.
|
||||||
|
Ready
|
||||||
|
// TransientFailure indicates the ClientConn has seen a failure but expects to recover.
|
||||||
|
TransientFailure
|
||||||
|
// Shutdown indicates the ClientConn has stated shutting down.
|
||||||
|
Shutdown
|
||||||
|
)
|
||||||
|
|
||||||
// ClientConn represents a client connection to an RPC service.
|
// ClientConn represents a client connection to an RPC service.
|
||||||
type ClientConn struct {
|
type ClientConn struct {
|
||||||
target string
|
target string
|
||||||
@ -170,12 +188,11 @@ type ClientConn struct {
|
|||||||
dopts dialOptions
|
dopts dialOptions
|
||||||
shutdownChan chan struct{}
|
shutdownChan chan struct{}
|
||||||
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
|
state ConnectivityState
|
||||||
// ready is closed and becomes nil when a new transport is up or failed
|
// ready is closed and becomes nil when a new transport is up or failed
|
||||||
// due to timeout.
|
// due to timeout.
|
||||||
ready chan struct{}
|
ready chan struct{}
|
||||||
// Indicates the ClientConn is under destruction.
|
|
||||||
closing bool
|
|
||||||
// Every time a new transport is created, this is incremented by 1. Used
|
// Every time a new transport is created, this is incremented by 1. Used
|
||||||
// to avoid trying to recreate a transport while the new one is already
|
// to avoid trying to recreate a transport while the new one is already
|
||||||
// under construction.
|
// under construction.
|
||||||
@ -188,11 +205,12 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
|
|||||||
start := time.Now()
|
start := time.Now()
|
||||||
for {
|
for {
|
||||||
cc.mu.Lock()
|
cc.mu.Lock()
|
||||||
|
cc.state = Connecting
|
||||||
t := cc.transport
|
t := cc.transport
|
||||||
ts := cc.transportSeq
|
ts := cc.transportSeq
|
||||||
// Avoid wait() picking up a dying transport unnecessarily.
|
// Avoid wait() picking up a dying transport unnecessarily.
|
||||||
cc.transportSeq = 0
|
cc.transportSeq = 0
|
||||||
if cc.closing {
|
if cc.state == Shutdown {
|
||||||
cc.mu.Unlock()
|
cc.mu.Unlock()
|
||||||
return ErrClientConnClosing
|
return ErrClientConnClosing
|
||||||
}
|
}
|
||||||
@ -224,6 +242,9 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
|
|||||||
connectTime := time.Now()
|
connectTime := time.Now()
|
||||||
newTransport, err := transport.NewClientTransport(cc.target, &copts)
|
newTransport, err := transport.NewClientTransport(cc.target, &copts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
cc.mu.Lock()
|
||||||
|
cc.state = TransientFailure
|
||||||
|
cc.mu.Unlock()
|
||||||
sleepTime -= time.Since(connectTime)
|
sleepTime -= time.Since(connectTime)
|
||||||
if sleepTime < 0 {
|
if sleepTime < 0 {
|
||||||
sleepTime = 0
|
sleepTime = 0
|
||||||
@ -240,12 +261,13 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
cc.mu.Lock()
|
cc.mu.Lock()
|
||||||
if cc.closing {
|
if cc.state == Shutdown {
|
||||||
// cc.Close() has been invoked.
|
// cc.Close() has been invoked.
|
||||||
cc.mu.Unlock()
|
cc.mu.Unlock()
|
||||||
newTransport.Close()
|
newTransport.Close()
|
||||||
return ErrClientConnClosing
|
return ErrClientConnClosing
|
||||||
}
|
}
|
||||||
|
cc.state = Ready
|
||||||
cc.transport = newTransport
|
cc.transport = newTransport
|
||||||
cc.transportSeq = ts + 1
|
cc.transportSeq = ts + 1
|
||||||
if cc.ready != nil {
|
if cc.ready != nil {
|
||||||
@ -262,13 +284,13 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
|
|||||||
func (cc *ClientConn) transportMonitor() {
|
func (cc *ClientConn) transportMonitor() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
// shutdownChan is needed to detect the channel teardown when
|
// shutdownChan is needed to detect the teardown when
|
||||||
// the ClientConn is idle (i.e., no RPC in flight).
|
// the ClientConn is idle (i.e., no RPC in flight).
|
||||||
case <-cc.shutdownChan:
|
case <-cc.shutdownChan:
|
||||||
return
|
return
|
||||||
case <-cc.transport.Error():
|
case <-cc.transport.Error():
|
||||||
if err := cc.resetTransport(true); err != nil {
|
if err := cc.resetTransport(true); err != nil {
|
||||||
// The channel is closing.
|
// The ClientConn is closing.
|
||||||
grpclog.Printf("grpc: ClientConn.transportMonitor exits due to: %v", err)
|
grpclog.Printf("grpc: ClientConn.transportMonitor exits due to: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -284,7 +306,7 @@ func (cc *ClientConn) wait(ctx context.Context, ts int) (transport.ClientTranspo
|
|||||||
for {
|
for {
|
||||||
cc.mu.Lock()
|
cc.mu.Lock()
|
||||||
switch {
|
switch {
|
||||||
case cc.closing:
|
case cc.state == Shutdown:
|
||||||
cc.mu.Unlock()
|
cc.mu.Unlock()
|
||||||
return nil, 0, ErrClientConnClosing
|
return nil, 0, ErrClientConnClosing
|
||||||
case ts < cc.transportSeq:
|
case ts < cc.transportSeq:
|
||||||
@ -316,10 +338,10 @@ func (cc *ClientConn) wait(ctx context.Context, ts int) (transport.ClientTranspo
|
|||||||
func (cc *ClientConn) Close() error {
|
func (cc *ClientConn) Close() error {
|
||||||
cc.mu.Lock()
|
cc.mu.Lock()
|
||||||
defer cc.mu.Unlock()
|
defer cc.mu.Unlock()
|
||||||
if cc.closing {
|
if cc.state == Shutdown {
|
||||||
return ErrClientConnClosing
|
return ErrClientConnClosing
|
||||||
}
|
}
|
||||||
cc.closing = true
|
cc.state = Shutdown
|
||||||
if cc.ready != nil {
|
if cc.ready != nil {
|
||||||
close(cc.ready)
|
close(cc.ready)
|
||||||
cc.ready = nil
|
cc.ready = nil
|
||||||
|
Reference in New Issue
Block a user