Change errors returned by ac.wait()

Menghan Li
2016-08-16 14:16:42 -07:00
parent cd5721c02f
commit c72b08a774
3 changed files with 33 additions and 29 deletions

call.go
@@ -170,9 +170,9 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
         if _, ok := err.(*rpcError); ok {
             return err
         }
-        if err == errConnClosing {
+        if err == errConnClosing || err == errConnUnavailable {
             if c.failFast {
-                return Errorf(codes.Unavailable, "%v", errConnClosing)
+                return Errorf(codes.Unavailable, "%v", err)
             }
             continue
         }
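Note: seen from a caller, the hunk above means a fail-fast unary RPC that hits either connectivity sentinel now fails with codes.Unavailable wrapping the specific underlying error, while a non-fail-fast RPC keeps retrying the loop. A minimal sketch of that behavior (the target address, method path, and message stand-ins are hypothetical; Dial, Invoke, FailFast, and Code are the package's existing exported helpers):

package main

import (
	"log"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
)

func main() {
	// Hypothetical target; nothing needs to be listening for this sketch.
	cc, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer cc.Close()

	// Stand-ins for generated request/reply messages.
	var req, reply struct{}
	// With FailFast(true), an addrConn that is closing or unavailable no
	// longer blocks the call: it surfaces as codes.Unavailable instead.
	err = grpc.Invoke(context.Background(), "/pkg.Service/Method", &req, &reply, cc, grpc.FailFast(true))
	if grpc.Code(err) == codes.Unavailable {
		log.Println("transient connectivity failure, safe to retry:", err)
	}
}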

clientconn.go
@@ -73,6 +73,8 @@ var (
     errConnDrain = errors.New("grpc: the connection is drained")
     // errConnClosing indicates that the connection is closing.
     errConnClosing = errors.New("grpc: the connection is closing")
+    // errConnUnavailable indicates that the connection is unavailable.
+    errConnUnavailable = errors.New("grpc: the connection is unavailable")
     errNoAddr = errors.New("grpc: there is no address available to dial")
     // minimum time to give a connection to complete
     minConnectTimeout = 20 * time.Second
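Note: the new sentinel is a distinct value rather than a reworded errConnClosing, so call sites can separate "this addrConn is going away" from "this addrConn is temporarily down" by identity comparison. A small self-contained sketch of that distinction (local copies of the sentinels, for illustration only):

package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the two sentinels declared above.
var (
	errConnClosing     = errors.New("grpc: the connection is closing")
	errConnUnavailable = errors.New("grpc: the connection is unavailable")
)

func describe(err error) string {
	switch err {
	case errConnClosing:
		return "addrConn is shutting down; get a new transport"
	case errConnUnavailable:
		return "addrConn is in TransientFailure; retry or fail fast"
	default:
		return "unrelated error"
	}
}

func main() {
	fmt.Println(describe(errConnUnavailable))
	fmt.Println(describe(errConnClosing))
}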
@@ -501,11 +503,7 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
         }
         return nil, nil, errConnClosing
     }
-    // ac.wait should block on transient failure only if balancer is nil and RPC is non-failfast.
-    // - If RPC is failfast, ac.wait should not block.
-    // - If balancer is not nil, ac.wait should return errConnClosing on transient failure
-    //   so that non-failfast RPCs will try to get a new transport instead of waiting on ac.
-    t, err := ac.wait(ctx, cc.dopts.balancer == nil && opts.BlockingWait)
+    t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
     if err != nil {
         if put != nil {
             put()
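Note: my reading of the new call site is that the single blocking flag is split into two independent inputs: hasBalancer (cc.dopts.balancer != nil) and failfast (!opts.BlockingWait, since the callers in call.go and stream.go build the options with BlockingWait: !c.failFast). A sketch restating the old flag in terms of the new ones (not code from this commit):

package main

import "fmt"

// oldBlocking reproduces the boolean the old one-flag call passed to
// ac.wait; newArgs shows the two flags the new call passes instead.
func oldBlocking(hasBalancer, blockingWait bool) bool {
	return !hasBalancer && blockingWait // cc.dopts.balancer == nil && opts.BlockingWait
}

func newArgs(hasBalancer, blockingWait bool) (bool, bool) {
	return hasBalancer, !blockingWait // ac.wait(ctx, hasBalancer, failfast)
}

func main() {
	for _, hasBalancer := range []bool{false, true} {
		for _, blockingWait := range []bool{false, true} {
			hb, ff := newArgs(hasBalancer, blockingWait)
			fmt.Printf("hasBalancer=%v blockingWait=%v -> old blocking=%v, new (hasBalancer=%v, failfast=%v)\n",
				hasBalancer, blockingWait, oldBlocking(hasBalancer, blockingWait), hb, ff)
		}
	}
}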
@@ -757,23 +755,30 @@ func (ac *addrConn) transportMonitor() {
     }
 
 // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
-// iv) transport is in TransientFailure and blocking is false.
-func (ac *addrConn) wait(ctx context.Context, blocking bool) (transport.ClientTransport, error) {
+// iv) transport is in TransientFailure and there's no balancer/failfast is true.
+func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
     for {
         ac.mu.Lock()
         switch {
         case ac.state == Shutdown:
+            if failfast || !hasBalancer {
+                // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
+                err := ac.tearDownErr
+                ac.mu.Unlock()
+                return nil, err
+            }
             ac.mu.Unlock()
             return nil, errConnClosing
         case ac.state == Ready:
             ct := ac.transport
             ac.mu.Unlock()
             return ct, nil
-        case ac.state == TransientFailure && !blocking:
-            ac.mu.Unlock()
-            return nil, errConnClosing
-        default:
+        case ac.state == TransientFailure:
+            if failfast || hasBalancer {
+                ac.mu.Unlock()
+                return nil, errConnUnavailable
+            }
         }
         ready := ac.ready
         if ready == nil {
             ready = make(chan struct{})
@@ -788,7 +793,6 @@ func (ac *addrConn) wait(ctx context.Context, blocking bool) (transport.ClientTr
-            }
         }
     }
 }
 
 // tearDown starts to tear down the addrConn.
 // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
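Note: putting the two hunks above together, the rewritten wait() resolves each connectivity state as follows. A runnable sketch of the decision table (state names as strings here, purely for illustration; the sentinel names refer to the vars declared earlier in clientconn.go):

package main

import "fmt"

// waitOutcome restates what the new ac.wait does for each state and flag
// combination, per the diff above.
func waitOutcome(state string, hasBalancer, failfast bool) string {
	switch state {
	case "Shutdown":
		if failfast || !hasBalancer {
			return "fail the RPC with ac.tearDownErr"
		}
		return "return errConnClosing (caller picks a new addrConn)"
	case "Ready":
		return "return the live transport"
	case "TransientFailure":
		if failfast || hasBalancer {
			return "return errConnUnavailable"
		}
		return "block on ac.ready and re-check"
	default:
		return "block on ac.ready and re-check"
	}
}

func main() {
	for _, state := range []string{"Shutdown", "Ready", "TransientFailure", "Connecting"} {
		for _, hasBalancer := range []bool{false, true} {
			for _, failfast := range []bool{false, true} {
				fmt.Printf("%-16s hasBalancer=%-5v failfast=%-5v -> %s\n",
					state, hasBalancer, failfast, waitOutcome(state, hasBalancer, failfast))
			}
		}
	}
}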

stream.go
@@ -146,9 +146,9 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
         if _, ok := err.(*rpcError); ok {
             return nil, err
         }
-        if err == errConnClosing {
+        if err == errConnClosing || err == errConnUnavailable {
             if c.failFast {
-                return nil, Errorf(codes.Unavailable, "%v", errConnClosing)
+                return nil, Errorf(codes.Unavailable, "%v", err)
             }
             continue
         }
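Note: the streaming path mirrors the unary one. A minimal sketch (hypothetical target, stream descriptor, and method path) of fail-fast stream creation surfacing the same codes.Unavailable:

package main

import (
	"log"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
)

func main() {
	cc, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer cc.Close()

	// Hypothetical stream descriptor and method path.
	desc := &grpc.StreamDesc{StreamName: "Method", ServerStreams: true}
	_, err = grpc.NewClientStream(context.Background(), desc, cc, "/pkg.Service/Method", grpc.FailFast(true))
	if grpc.Code(err) == codes.Unavailable {
		// As in the unary path, closing/unavailable connections now fail
		// fast with codes.Unavailable rather than blocking the caller.
		log.Println("stream creation failed fast:", err)
	}
}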