balancer: change roundrobin to accept empty address list (#3491)

Roundrobin will remove all SubConns. The ClientConn will set SubConn state change to shutdown, and the overall state will turn transient failure.
This commit is contained in:
Menghan Li
2020-04-03 09:47:59 -07:00
committed by GitHub
parent fe1d8e7181
commit aedb1362d5
3 changed files with 136 additions and 26 deletions

View File

@ -113,10 +113,6 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
if grpclog.V(2) {
grpclog.Infoln("base.baseBalancer: got new ClientConn state: ", s)
}
if len(s.ResolverState.Addresses) == 0 {
b.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
// Successful resolution; clear resolver error and ensure we return nil.
b.resolverErr = nil
// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
@ -144,6 +140,14 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// The entry will be deleted in HandleSubConnStateChange.
}
}
// If resolver state contains no addresses, return an error so ClientConn
// will trigger re-resolve. Also records this as an resolver error, so when
// the overall state turns transient failure, the error message will have
// the zero address information.
if len(s.ResolverState.Addresses) == 0 {
b.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
return nil
}

View File

@ -216,31 +216,20 @@ func (s) TestAddressesRemoved(t *testing.T) {
}
r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
// Removing addresses results in an error reported to the clientconn, but
// the existing connections remain. RPCs should still succeed.
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
}
// Stop the server to bring the channel state into transient failure.
test.cleanup()
// Wait for not-ready.
for src := cc.GetState(); src == connectivity.Ready; src = cc.GetState() {
if !cc.WaitForStateChange(ctx, src) {
t.Fatalf("timed out waiting for state change. got %v; want !%v", src, connectivity.Ready)
ctx2, cancel2 := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel2()
// Wait for state to change to transient failure.
for src := cc.GetState(); src != connectivity.TransientFailure; src = cc.GetState() {
if !cc.WaitForStateChange(ctx2, src) {
t.Fatalf("timed out waiting for state change. got %v; want %v", src, connectivity.TransientFailure)
}
}
// Report an empty server list again; because the state is not ready, the
// empty address list error should surface to the user.
r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
const msgWant = "produced zero addresses"
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || !strings.Contains(status.Convert(err).Message(), msgWant) {
if _, err := testc.EmptyCall(ctx2, &testpb.Empty{}); err == nil || !strings.Contains(status.Convert(err).Message(), msgWant) {
t.Fatalf("EmptyCall() = _, %v, want _, Contains(Message(), %q)", err, msgWant)
}
}
func (s) TestCloseWithPendingRPC(t *testing.T) {

View File

@ -21,6 +21,7 @@ import (
"testing"
"time"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
@ -56,7 +57,6 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
p1 := <-cc.newPickerCh
want := []balancer.SubConn{sc1}
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
// t.Fatalf("want %v, got %v", want, err)
t.Fatalf("want %v, got %v", want, err)
}
@ -123,7 +123,6 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
p0 := <-cc.newPickerCh
want := []balancer.SubConn{sc0}
if err := isRoundRobin(want, subConnFromPicker(p0)); err != nil {
// t.Fatalf("want %v, got %v", want, err)
t.Fatalf("want %v, got %v", want, err)
}
@ -426,7 +425,6 @@ func (s) TestEDSPriority_MultipleLocalities(t *testing.T) {
p0 := <-cc.newPickerCh
want := []balancer.SubConn{sc0}
if err := isRoundRobin(want, subConnFromPicker(p0)); err != nil {
// t.Fatalf("want %v, got %v", want, err)
t.Fatalf("want %v, got %v", want, err)
}
@ -445,7 +443,6 @@ func (s) TestEDSPriority_MultipleLocalities(t *testing.T) {
p1 := <-cc.newPickerCh
want = []balancer.SubConn{sc1}
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
// t.Fatalf("want %v, got %v", want, err)
t.Fatalf("want %v, got %v", want, err)
}
@ -657,3 +654,123 @@ func (s) TestPriorityType(t *testing.T) {
t.Errorf("want p1 to be equal to priority with value 1, got p1==1: %v", got)
}
}
// Test the case where the high priority contains no backends. The low priority
// will be used.
func (s) TestEDSPriority_HighPriorityNoEndpoints(t *testing.T) {
cc := newTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
// Two localities, with priorities [0, 1], each with one backend.
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
addrs1 := <-cc.newSubConnAddrsCh
if got, want := addrs1[0].Addr, testEndpointAddrs[0]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc1 := <-cc.newSubConnCh
// p0 is ready.
edsb.HandleSubConnStateChange(sc1, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc1, connectivity.Ready)
// Test roundrobin with only p0 subconns.
p1 := <-cc.newPickerCh
want := []balancer.SubConn{sc1}
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Remove addresses from priority 0, should use p1.
clab2 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, nil, nil)
clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build()))
// p0 will remove the subconn, and ClientConn will send a sc update to
// shutdown.
scToRemove := <-cc.removeSubConnCh
edsb.HandleSubConnStateChange(scToRemove, connectivity.Shutdown)
addrs2 := <-cc.newSubConnAddrsCh
if got, want := addrs2[0].Addr, testEndpointAddrs[1]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc2 := <-cc.newSubConnCh
// p1 is ready.
edsb.HandleSubConnStateChange(sc2, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc2, connectivity.Ready)
// Test roundrobin with only p1 subconns.
p2 := <-cc.newPickerCh
want = []balancer.SubConn{sc2}
if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
// Test the case where the high priority contains no healthy backends. The low
// priority will be used.
func (s) TestEDSPriority_HighPriorityAllUnhealthy(t *testing.T) {
cc := newTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
// Two localities, with priorities [0, 1], each with one backend.
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
addrs1 := <-cc.newSubConnAddrsCh
if got, want := addrs1[0].Addr, testEndpointAddrs[0]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc1 := <-cc.newSubConnCh
// p0 is ready.
edsb.HandleSubConnStateChange(sc1, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc1, connectivity.Ready)
// Test roundrobin with only p0 subconns.
p1 := <-cc.newPickerCh
want := []balancer.SubConn{sc1}
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Set priority 0 endpoints to all unhealthy, should use p1.
clab2 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], &xdsclient.AddLocalityOptions{
Health: []corepb.HealthStatus{corepb.HealthStatus_UNHEALTHY},
})
clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build()))
// p0 will remove the subconn, and ClientConn will send a sc update to
// transient failure.
scToRemove := <-cc.removeSubConnCh
edsb.HandleSubConnStateChange(scToRemove, connectivity.Shutdown)
addrs2 := <-cc.newSubConnAddrsCh
if got, want := addrs2[0].Addr, testEndpointAddrs[1]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc2 := <-cc.newSubConnCh
// p1 is ready.
edsb.HandleSubConnStateChange(sc2, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc2, connectivity.Ready)
// Test roundrobin with only p1 subconns.
p2 := <-cc.newPickerCh
want = []balancer.SubConn{sc2}
if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}