grpc: move to TransientFailure in pick_first LB policy when all addresses are removed (#5274)

This commit is contained in:
Easwar Swaminathan
2022-03-29 15:06:28 -07:00
committed by GitHub
parent 474948a978
commit b6873c006d
4 changed files with 154 additions and 134 deletions

View File

@ -97,7 +97,7 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
bp := newPickerWrapper() bp := newPickerWrapper()
var finishedCount uint64 var finishedCount uint64
bp.updatePicker(&testingPicker{err: balancer.ErrNoSubConnAvailable, maxCalled: goroutineCount}) bp.updatePicker(&testingPicker{err: balancer.ErrNoSubConnAvailable, maxCalled: goroutineCount})
// All goroutines should block because picker returns no sc available. // All goroutines should block because picker returns no subConn available.
for i := goroutineCount; i > 0; i-- { for i := goroutineCount; i > 0; i-- {
go func() { go func() {
if tr, _, err := bp.pick(context.Background(), true, balancer.PickInfo{}); err != nil || tr != testT { if tr, _, err := bp.pick(context.Background(), true, balancer.PickInfo{}); err != nil || tr != testT {
@ -138,7 +138,7 @@ func (s) TestBlockingPickSCNotReady(t *testing.T) {
bp := newPickerWrapper() bp := newPickerWrapper()
bp.updatePicker(&testingPicker{sc: testSCNotReady, maxCalled: goroutineCount}) bp.updatePicker(&testingPicker{sc: testSCNotReady, maxCalled: goroutineCount})
var finishedCount uint64 var finishedCount uint64
// All goroutines should block because sc is not ready. // All goroutines should block because subConn is not ready.
for i := goroutineCount; i > 0; i-- { for i := goroutineCount; i > 0; i-- {
go func() { go func() {
if tr, _, err := bp.pick(context.Background(), true, balancer.PickInfo{}); err != nil || tr != testT { if tr, _, err := bp.pick(context.Background(), true, balancer.PickInfo{}); err != nil || tr != testT {

View File

@ -44,79 +44,107 @@ func (*pickfirstBuilder) Name() string {
} }
type pickfirstBalancer struct { type pickfirstBalancer struct {
state connectivity.State state connectivity.State
cc balancer.ClientConn cc balancer.ClientConn
sc balancer.SubConn subConn balancer.SubConn
} }
func (b *pickfirstBalancer) ResolverError(err error) { func (b *pickfirstBalancer) ResolverError(err error) {
switch b.state {
case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting:
// Set a failing picker if we don't have a good picker.
b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
})
}
if logger.V(2) { if logger.V(2) {
logger.Infof("pickfirstBalancer: ResolverError called with error %v", err) logger.Infof("pickfirstBalancer: ResolverError called with error %v", err)
} }
if b.subConn == nil {
b.state = connectivity.TransientFailure
}
if b.state != connectivity.TransientFailure {
// The picker will not change since the balancer does not currently
// report an error.
return
}
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
})
} }
func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) error { func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
if len(cs.ResolverState.Addresses) == 0 { if len(state.ResolverState.Addresses) == 0 {
// The resolver reported an empty address list. Treat it like an error by
// calling b.ResolverError.
if b.subConn != nil {
// Remove the old subConn. All addresses were removed, so it is no longer
// valid.
b.cc.RemoveSubConn(b.subConn)
b.subConn = nil
}
b.ResolverError(errors.New("produced zero addresses")) b.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState return balancer.ErrBadResolverState
} }
if b.sc == nil {
var err error if b.subConn != nil {
b.sc, err = b.cc.NewSubConn(cs.ResolverState.Addresses, balancer.NewSubConnOptions{}) b.cc.UpdateAddresses(b.subConn, state.ResolverState.Addresses)
if err != nil { return nil
if logger.V(2) {
logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
}
b.state = connectivity.TransientFailure
b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("error creating connection: %v", err)},
})
return balancer.ErrBadResolverState
}
b.state = connectivity.Idle
b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &picker{result: balancer.PickResult{SubConn: b.sc}}})
b.sc.Connect()
} else {
b.cc.UpdateAddresses(b.sc, cs.ResolverState.Addresses)
b.sc.Connect()
} }
subConn, err := b.cc.NewSubConn(state.ResolverState.Addresses, balancer.NewSubConnOptions{})
if err != nil {
if logger.V(2) {
logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
}
b.state = connectivity.TransientFailure
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("error creating connection: %v", err)},
})
return balancer.ErrBadResolverState
}
b.subConn = subConn
b.state = connectivity.Idle
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Idle,
Picker: &picker{result: balancer.PickResult{SubConn: b.subConn}},
})
b.subConn.Connect()
return nil return nil
} }
func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
if logger.V(2) { if logger.V(2) {
logger.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", sc, s) logger.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", subConn, state)
} }
if b.sc != sc { if b.subConn != subConn {
if logger.V(2) { if logger.V(2) {
logger.Infof("pickfirstBalancer: ignored state change because sc is not recognized") logger.Infof("pickfirstBalancer: ignored state change because subConn is not recognized")
} }
return return
} }
b.state = s.ConnectivityState b.state = state.ConnectivityState
if s.ConnectivityState == connectivity.Shutdown { if state.ConnectivityState == connectivity.Shutdown {
b.sc = nil b.subConn = nil
return return
} }
switch s.ConnectivityState { switch state.ConnectivityState {
case connectivity.Ready: case connectivity.Ready:
b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{result: balancer.PickResult{SubConn: sc}}}) b.cc.UpdateState(balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &picker{result: balancer.PickResult{SubConn: subConn}},
})
case connectivity.Connecting: case connectivity.Connecting:
b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{err: balancer.ErrNoSubConnAvailable}}) b.cc.UpdateState(balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
case connectivity.Idle: case connectivity.Idle:
b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &idlePicker{sc: sc}}) b.cc.UpdateState(balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &idlePicker{subConn: subConn},
})
case connectivity.TransientFailure: case connectivity.TransientFailure:
b.cc.UpdateState(balancer.State{ b.cc.UpdateState(balancer.State{
ConnectivityState: s.ConnectivityState, ConnectivityState: state.ConnectivityState,
Picker: &picker{err: s.ConnectionError}, Picker: &picker{err: state.ConnectionError},
}) })
} }
} }
@ -125,8 +153,8 @@ func (b *pickfirstBalancer) Close() {
} }
func (b *pickfirstBalancer) ExitIdle() { func (b *pickfirstBalancer) ExitIdle() {
if b.sc != nil && b.state == connectivity.Idle { if b.subConn != nil && b.state == connectivity.Idle {
b.sc.Connect() b.subConn.Connect()
} }
} }
@ -135,18 +163,18 @@ type picker struct {
err error err error
} }
func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
return p.result, p.err return p.result, p.err
} }
// idlePicker is used when the SubConn is IDLE and kicks the SubConn into // idlePicker is used when the SubConn is IDLE and kicks the SubConn into
// CONNECTING when Pick is called. // CONNECTING when Pick is called.
type idlePicker struct { type idlePicker struct {
sc balancer.SubConn subConn balancer.SubConn
} }
func (i *idlePicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
i.sc.Connect() i.subConn.Connect()
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
} }

View File

@ -32,7 +32,6 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/attributes" "google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity" "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
@ -683,87 +682,6 @@ func (s) TestServersSwap(t *testing.T) {
} }
} }
// TestEmptyAddrs verifies client behavior when a working connection is
// removed. In pick first and round-robin, both will continue using the old
// connections.
func (s) TestEmptyAddrs(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Initialize server
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
s := grpc.NewServer()
defer s.Stop()
const one = "1"
ts := &funcServer{
unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{Username: one}, nil
},
}
testpb.RegisterTestServiceServer(s, ts)
go s.Serve(lis)
// Initialize pickfirst client
pfr := manual.NewBuilderWithScheme("whatever")
pfr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})
pfcc, err := grpc.DialContext(ctx, pfr.Scheme()+":///", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(pfr))
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer pfcc.Close()
pfclient := testpb.NewTestServiceClient(pfcc)
// Confirm we are connected to the server
if res, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil || res.Username != one {
t.Fatalf("UnaryCall(_) = %v, %v; want {Username: %q}, nil", res, err, one)
}
// Remove all addresses.
pfr.UpdateState(resolver.State{})
// Initialize roundrobin client
rrr := manual.NewBuilderWithScheme("whatever")
rrr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})
rrcc, err := grpc.DialContext(ctx, rrr.Scheme()+":///", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(rrr),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, roundrobin.Name)))
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer rrcc.Close()
rrclient := testpb.NewTestServiceClient(rrcc)
// Confirm we are connected to the server
if res, err := rrclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil || res.Username != one {
t.Fatalf("UnaryCall(_) = %v, %v; want {Username: %q}, nil", res, err, one)
}
// Remove all addresses.
rrr.UpdateState(resolver.State{})
// Confirm several new RPCs succeed on pick first.
for i := 0; i < 10; i++ {
if _, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
}
time.Sleep(5 * time.Millisecond)
}
// Confirm several new RPCs succeed on round robin.
for i := 0; i < 10; i++ {
if _, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
}
time.Sleep(5 * time.Millisecond)
}
}
func (s) TestWaitForReady(t *testing.T) { func (s) TestWaitForReady(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()

View File

@ -26,7 +26,9 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/peer" "google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
@ -43,6 +45,11 @@ const pickFirstServiceConfig = `{"loadBalancingConfig": [{"pick_first":{}}]}`
// with service config specifying the use of the pick_first LB policy. // with service config specifying the use of the pick_first LB policy.
func setupPickFirst(t *testing.T, backendCount int, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, []*stubserver.StubServer) { func setupPickFirst(t *testing.T, backendCount int, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, []*stubserver.StubServer) {
t.Helper() t.Helper()
// Initialize channelz. Used to determine pending RPC count.
czCleanup := channelz.NewChannelzStorageForTesting()
t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })
r := manual.NewBuilderWithScheme("whatever") r := manual.NewBuilderWithScheme("whatever")
backends := make([]*stubserver.StubServer, backendCount) backends := make([]*stubserver.StubServer, backendCount)
@ -258,3 +265,70 @@ func (s) TestPickFirst_AddressesRemoved(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
} }
// TestPickFirst_NewAddressWhileBlocking tests the case where pick_first is
// configured on a channel, things are working as expected and then a resolver
// updates removes all addresses. An RPC attempted at this point in time will be
// blocked because there are no valid backends. This test verifies that when new
// backends are added, the RPC is able to complete.
func (s) TestPickFirst_NewAddressWhileBlocking(t *testing.T) {
cc, r, backends := setupPickFirst(t, 2)
addrs := backendsToAddrs(backends)
r.UpdateState(resolver.State{Addresses: addrs})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil {
t.Fatal(err)
}
// Send a resolver update with no addresses. This should push the channel into
// TransientFailure.
r.UpdateState(resolver.State{})
for state := cc.GetState(); state != connectivity.TransientFailure; state = cc.GetState() {
if !cc.WaitForStateChange(ctx, state) {
t.Fatalf("timeout waiting for state change. got %v; want %v", state, connectivity.TransientFailure)
}
}
doneCh := make(chan struct{})
client := testpb.NewTestServiceClient(cc)
go func() {
// The channel is currently in TransientFailure and this RPC will block
// until the channel becomes Ready, which will only happen when we push a
// resolver update with a valid backend address.
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Errorf("EmptyCall() = %v, want <nil>", err)
}
close(doneCh)
}()
// Make sure that there is one pending RPC on the ClientConn before attempting
// to push new addresses through the name resolver. If we don't do this, the
// resolver update can happen before the above goroutine gets to make the RPC.
for {
if err := ctx.Err(); err != nil {
t.Fatal(err)
}
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
t.Fatalf("there should only be one top channel, not %d", len(tcs))
}
started := tcs[0].ChannelData.CallsStarted
completed := tcs[0].ChannelData.CallsSucceeded + tcs[0].ChannelData.CallsFailed
if (started - completed) == 1 {
break
}
time.Sleep(defaultTestShortTimeout)
}
// Send a resolver update with a valid backend to push the channel to Ready
// and unblock the above RPC.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0].Address}}})
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for blocked RPC to complete")
case <-doneCh:
}
}