client: clean up v1 balancer wrapper error handling (#2511)
This commit is contained in:
@ -227,9 +227,10 @@ type Picker interface {
|
||||
// - Else (error is other non-nil error):
|
||||
// - The RPC will fail with unavailable error.
|
||||
//
|
||||
// The returned done() function will be called once the rpc has finished, with the
|
||||
// final status of that RPC.
|
||||
// done may be nil if balancer doesn't care about the RPC status.
|
||||
// The returned done() function will be called once the rpc has finished,
|
||||
// with the final status of that RPC. If the SubConn returned is not a
|
||||
// valid SubConn type, done may not be called. done may be nil if balancer
|
||||
// doesn't care about the RPC status.
|
||||
Pick(ctx context.Context, opts PickOptions) (conn SubConn, done func(DoneInfo), err error)
|
||||
}
|
||||
|
||||
|
@ -281,9 +281,8 @@ func (bw *balancerWrapper) Close() {
|
||||
}
|
||||
|
||||
// The picker is the balancerWrapper itself.
|
||||
// Pick should never return ErrNoSubConnAvailable.
|
||||
// It either blocks or returns error, consistent with v1 balancer Get().
|
||||
func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
|
||||
func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) (sc balancer.SubConn, done func(balancer.DoneInfo), err error) {
|
||||
failfast := true // Default failfast is true.
|
||||
if ss, ok := rpcInfoFromContext(ctx); ok {
|
||||
failfast = ss.failfast
|
||||
@ -292,35 +291,51 @@ func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
var done func(balancer.DoneInfo)
|
||||
if p != nil {
|
||||
done = func(i balancer.DoneInfo) { p() }
|
||||
done = func(balancer.DoneInfo) { p() }
|
||||
defer func() {
|
||||
if err != nil {
|
||||
p()
|
||||
}
|
||||
var sc balancer.SubConn
|
||||
}()
|
||||
}
|
||||
|
||||
bw.mu.Lock()
|
||||
defer bw.mu.Unlock()
|
||||
if bw.pickfirst {
|
||||
// Get the first sc in conns.
|
||||
for _, sc = range bw.conns {
|
||||
break
|
||||
for _, sc := range bw.conns {
|
||||
return sc, done, nil
|
||||
}
|
||||
} else {
|
||||
var ok bool
|
||||
sc, ok = bw.conns[resolver.Address{
|
||||
return nil, nil, balancer.ErrNoSubConnAvailable
|
||||
}
|
||||
sc, ok1 := bw.conns[resolver.Address{
|
||||
Addr: a.Addr,
|
||||
Type: resolver.Backend,
|
||||
ServerName: "",
|
||||
Metadata: a.Metadata,
|
||||
}]
|
||||
if !ok && failfast {
|
||||
return nil, nil, balancer.ErrTransientFailure
|
||||
s, ok2 := bw.connSt[sc]
|
||||
if !ok1 || !ok2 {
|
||||
// This can only happen due to a race where Get() returned an address
|
||||
// that was subsequently removed by Notify. In this case we should
|
||||
// retry always.
|
||||
return nil, nil, balancer.ErrNoSubConnAvailable
|
||||
}
|
||||
if s, ok := bw.connSt[sc]; failfast && (!ok || s.s != connectivity.Ready) {
|
||||
// If the returned sc is not ready and RPC is failfast,
|
||||
// return error, and this RPC will fail.
|
||||
return nil, nil, balancer.ErrTransientFailure
|
||||
}
|
||||
}
|
||||
|
||||
switch s.s {
|
||||
case connectivity.Ready, connectivity.Idle:
|
||||
return sc, done, nil
|
||||
case connectivity.Shutdown, connectivity.TransientFailure:
|
||||
// If the returned sc has been shut down or is in transient failure,
|
||||
// return error, and this RPC will fail or wait for another picker (if
|
||||
// non-failfast).
|
||||
return nil, nil, balancer.ErrTransientFailure
|
||||
default:
|
||||
// For other states (connecting or unknown), the v1 balancer would
|
||||
// traditionally wait until ready and then issue the RPC. Returning
|
||||
// ErrNoSubConnAvailable will be a slight improvement in that it will
|
||||
// allow the balancer to choose another address in case others are
|
||||
// connected.
|
||||
return nil, nil, balancer.ErrNoSubConnAvailable
|
||||
}
|
||||
}
|
||||
|
@ -101,10 +101,7 @@ func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) f
|
||||
// - the subConn returned by the current picker is not READY
|
||||
// When one of these situations happens, pick blocks until the picker gets updated.
|
||||
func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) {
|
||||
var (
|
||||
p balancer.Picker
|
||||
ch chan struct{}
|
||||
)
|
||||
var ch chan struct{}
|
||||
|
||||
for {
|
||||
bp.mu.Lock()
|
||||
@ -130,7 +127,7 @@ func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.
|
||||
}
|
||||
|
||||
ch = bp.blockingCh
|
||||
p = bp.picker
|
||||
p := bp.picker
|
||||
bp.mu.Unlock()
|
||||
|
||||
subConn, done, err := p.Pick(ctx, opts)
|
||||
@ -152,7 +149,7 @@ func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.
|
||||
|
||||
acw, ok := subConn.(*acBalancerWrapper)
|
||||
if !ok {
|
||||
grpclog.Infof("subconn returned from pick is not *acBalancerWrapper")
|
||||
grpclog.Error("subconn returned from pick is not *acBalancerWrapper")
|
||||
continue
|
||||
}
|
||||
if t, ok := acw.getAddrConn().getReadyTransport(); ok {
|
||||
|
@ -1060,7 +1060,9 @@ func testServerGoAway(t *testing.T, e env) {
|
||||
cc := te.clientConn()
|
||||
tc := testpb.NewTestServiceClient(cc)
|
||||
// Finish an RPC to make sure the connection is good.
|
||||
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
|
||||
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
|
||||
}
|
||||
ch := make(chan struct{})
|
||||
@ -1078,7 +1080,9 @@ func testServerGoAway(t *testing.T, e env) {
|
||||
cancel()
|
||||
}
|
||||
// A new RPC should fail.
|
||||
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable && status.Code(err) != codes.Internal {
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable && status.Code(err) != codes.Internal {
|
||||
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s or %s", err, codes.Unavailable, codes.Internal)
|
||||
}
|
||||
<-ch
|
||||
|
Reference in New Issue
Block a user