From 8e6533ee6e5fa65bf0487bffaec8769f247cf079 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Wed, 30 Jan 2019 10:56:23 -0800 Subject: [PATCH] client: clean up v1 balancer wrapper error handling (#2511) --- balancer/balancer.go | 7 +++-- balancer_v1_wrapper.go | 65 ++++++++++++++++++++++++++---------------- picker_wrapper.go | 9 ++---- test/end2end_test.go | 8 ++++-- 4 files changed, 53 insertions(+), 36 deletions(-) diff --git a/balancer/balancer.go b/balancer/balancer.go index 1c0df149..67518de9 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -227,9 +227,10 @@ type Picker interface { // - Else (error is other non-nil error): // - The RPC will fail with unavailable error. // - // The returned done() function will be called once the rpc has finished, with the - // final status of that RPC. - // done may be nil if balancer doesn't care about the RPC status. + // The returned done() function will be called once the rpc has finished, + // with the final status of that RPC. If the SubConn returned is not a + // valid SubConn type, done may not be called. done may be nil if balancer + // doesn't care about the RPC status. Pick(ctx context.Context, opts PickOptions) (conn SubConn, done func(DoneInfo), err error) } diff --git a/balancer_v1_wrapper.go b/balancer_v1_wrapper.go index 42b60fea..29bda635 100644 --- a/balancer_v1_wrapper.go +++ b/balancer_v1_wrapper.go @@ -281,9 +281,8 @@ func (bw *balancerWrapper) Close() { } // The picker is the balancerWrapper itself. -// Pick should never return ErrNoSubConnAvailable. // It either blocks or returns error, consistent with v1 balancer Get(). -func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { +func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) (sc balancer.SubConn, done func(balancer.DoneInfo), err error) { failfast := true // Default failfast is true. if ss, ok := rpcInfoFromContext(ctx); ok { failfast = ss.failfast @@ -292,35 +291,51 @@ func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) if err != nil { return nil, nil, err } - var done func(balancer.DoneInfo) if p != nil { - done = func(i balancer.DoneInfo) { p() } + done = func(balancer.DoneInfo) { p() } + defer func() { + if err != nil { + p() + } + }() } - var sc balancer.SubConn + bw.mu.Lock() defer bw.mu.Unlock() if bw.pickfirst { // Get the first sc in conns. - for _, sc = range bw.conns { - break - } - } else { - var ok bool - sc, ok = bw.conns[resolver.Address{ - Addr: a.Addr, - Type: resolver.Backend, - ServerName: "", - Metadata: a.Metadata, - }] - if !ok && failfast { - return nil, nil, balancer.ErrTransientFailure - } - if s, ok := bw.connSt[sc]; failfast && (!ok || s.s != connectivity.Ready) { - // If the returned sc is not ready and RPC is failfast, - // return error, and this RPC will fail. - return nil, nil, balancer.ErrTransientFailure + for _, sc := range bw.conns { + return sc, done, nil } + return nil, nil, balancer.ErrNoSubConnAvailable + } + sc, ok1 := bw.conns[resolver.Address{ + Addr: a.Addr, + Type: resolver.Backend, + ServerName: "", + Metadata: a.Metadata, + }] + s, ok2 := bw.connSt[sc] + if !ok1 || !ok2 { + // This can only happen due to a race where Get() returned an address + // that was subsequently removed by Notify. In this case we should + // retry always. + return nil, nil, balancer.ErrNoSubConnAvailable + } + switch s.s { + case connectivity.Ready, connectivity.Idle: + return sc, done, nil + case connectivity.Shutdown, connectivity.TransientFailure: + // If the returned sc has been shut down or is in transient failure, + // return error, and this RPC will fail or wait for another picker (if + // non-failfast). + return nil, nil, balancer.ErrTransientFailure + default: + // For other states (connecting or unknown), the v1 balancer would + // traditionally wait until ready and then issue the RPC. Returning + // ErrNoSubConnAvailable will be a slight improvement in that it will + // allow the balancer to choose another address in case others are + // connected. + return nil, nil, balancer.ErrNoSubConnAvailable } - - return sc, done, nil } diff --git a/picker_wrapper.go b/picker_wrapper.go index 14f915d6..672c14b1 100644 --- a/picker_wrapper.go +++ b/picker_wrapper.go @@ -101,10 +101,7 @@ func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) f // - the subConn returned by the current picker is not READY // When one of these situations happens, pick blocks until the picker gets updated. func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) { - var ( - p balancer.Picker - ch chan struct{} - ) + var ch chan struct{} for { bp.mu.Lock() @@ -130,7 +127,7 @@ func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer. } ch = bp.blockingCh - p = bp.picker + p := bp.picker bp.mu.Unlock() subConn, done, err := p.Pick(ctx, opts) @@ -152,7 +149,7 @@ func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer. acw, ok := subConn.(*acBalancerWrapper) if !ok { - grpclog.Infof("subconn returned from pick is not *acBalancerWrapper") + grpclog.Error("subconn returned from pick is not *acBalancerWrapper") continue } if t, ok := acw.getAddrConn().getReadyTransport(); ok { diff --git a/test/end2end_test.go b/test/end2end_test.go index 98ead4f8..d6509f3d 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -1060,7 +1060,9 @@ func testServerGoAway(t *testing.T, e env) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) // Finish an RPC to make sure the connection is good. - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, ", err) } ch := make(chan struct{}) @@ -1078,7 +1080,9 @@ func testServerGoAway(t *testing.T, e env) { cancel() } // A new RPC should fail. - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable && status.Code(err) != codes.Internal { + ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable && status.Code(err) != codes.Internal { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s or %s", err, codes.Unavailable, codes.Internal) } <-ch