grpclb: keep drop index unless a new serverlist is received ()

Keep the drop index from the previous picker when the picker is regenerated due to
a subchannel state change. Only reset the drop index when the picker is regenerated
because of a serverlist update.

fixes 
Author: Menghan Li
Date: 2019-03-11 10:09:16 -07:00
Committed by: GitHub
Commit: 69e99ca520 (parent 29adad9d71)
4 changed files with 96 additions and 28 deletions
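
Below is a minimal, standalone Go sketch of the behavior described in the commit message. It is not the grpc-go API: the picker type, field names, and helpers are invented for illustration. It models the two-layer round-robin pick (first over the balancer's server list, which may contain drop entries, then over the ready backends) and shows that a subchannel state change only swaps the ready backends while keeping the server-list cursor, i.e. the drop index.

package main

import (
	"errors"
	"fmt"
)

// server is a simplified stand-in for one entry in the balancer's server list.
type server struct {
	addr string
	drop bool // a drop entry tells the picker to fail the RPC locally
}

var errDrop = errors.New("rpc dropped by load balancer")

// picker does a two-layer round-robin pick: layer one walks the server list
// (deciding drops), layer two walks the currently ready backends.
type picker struct {
	serverList     []server
	serverListNext int // the "drop index" this change preserves
	ready          []string
	readyNext      int
}

func newPicker(list []server, ready []string) *picker {
	return &picker{serverList: list, ready: ready}
}

func (p *picker) pick() (string, error) {
	s := p.serverList[p.serverListNext]
	p.serverListNext = (p.serverListNext + 1) % len(p.serverList)
	if s.drop {
		return "", errDrop
	}
	addr := p.ready[p.readyNext]
	p.readyNext = (p.readyNext + 1) % len(p.ready)
	return addr, nil
}

// updateReady plays the role of lbPicker.updateReadySCs in the diff: it swaps
// the ready backends but keeps serverListNext, wrapping readyNext into range.
func (p *picker) updateReady(ready []string) {
	p.ready = ready
	p.readyNext = p.readyNext % len(ready)
}

func main() {
	list := []server{{addr: "backend0"}, {addr: "backend1"}, {drop: true}}
	p := newPicker(list, []string{"backend0", "backend1"})

	fmt.Println(p.pick()) // backend0, no error
	fmt.Println(p.pick()) // backend1, no error

	// A subchannel state change (backend0 goes down) only swaps the ready
	// backends; the drop index is kept, so the next picks are
	// (drop, backend1) instead of restarting at the top of the server list.
	p.updateReady([]string{"backend1"})
	fmt.Println(p.pick()) // dropped: non-nil error
	fmt.Println(p.pick()) // backend1, no error
}

This mirrors the sequence the updated TestDropRequest below checks after stopping backend 0: (backend1, drop, backend1), rather than a sequence that restarts from the top of the server list.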

@@ -244,7 +244,7 @@ type lbBalancer struct {
// - always returns ErrTransientFailure if the balancer is in TransientFailure,
// - does two layer roundrobin pick otherwise.
// Caller must hold lb.mu.
-func (lb *lbBalancer) regeneratePicker() {
+func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
if lb.state == connectivity.TransientFailure {
lb.picker = &errPicker{err: balancer.ErrTransientFailure}
return
@@ -266,14 +266,19 @@ func (lb *lbBalancer) regeneratePicker() {
return
}
if len(lb.fullServerList) <= 0 {
-lb.picker = &rrPicker{subConns: readySCs}
+lb.picker = newRRPicker(readySCs)
return
}
-lb.picker = &lbPicker{
-serverList: lb.fullServerList,
-subConns: readySCs,
-stats: lb.clientStats,
+if resetDrop {
+lb.picker = newLBPicker(lb.fullServerList, readySCs, lb.clientStats)
+return
}
+prevLBPicker, ok := lb.picker.(*lbPicker)
+if !ok {
+lb.picker = newLBPicker(lb.fullServerList, readySCs, lb.clientStats)
+return
+}
+prevLBPicker.updateReadySCs(readySCs)
}
func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
@@ -306,7 +311,7 @@ func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivi
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
if (oldS == connectivity.Ready) != (s == connectivity.Ready) ||
(lb.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
-lb.regeneratePicker()
+lb.regeneratePicker(false)
}
lb.cc.UpdateBalancerState(lb.state, lb.picker)

@@ -26,6 +26,7 @@ import (
"google.golang.org/grpc/balancer"
lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/status"
)
@@ -110,6 +111,13 @@ type rrPicker struct {
subConnsNext int
}
+func newRRPicker(readySCs []balancer.SubConn) *rrPicker {
+return &rrPicker{
+subConns: readySCs,
+subConnsNext: grpcrand.Intn(len(readySCs)),
+}
+}
func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
p.mu.Lock()
defer p.mu.Unlock()
@@ -137,6 +145,15 @@ type lbPicker struct {
stats *rpcStats
}
+func newLBPicker(serverList []*lbpb.Server, readySCs []balancer.SubConn, stats *rpcStats) *lbPicker {
+return &lbPicker{
+serverList: serverList,
+subConns: readySCs,
+subConnsNext: grpcrand.Intn(len(readySCs)),
+stats: stats,
+}
+}
func (p *lbPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
p.mu.Lock()
defer p.mu.Unlock()
@@ -168,3 +185,11 @@ func (p *lbPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balance
}
return sc, done, nil
}
+func (p *lbPicker) updateReadySCs(readySCs []balancer.SubConn) {
+p.mu.Lock()
+defer p.mu.Unlock()
+p.subConns = readySCs
+p.subConnsNext = p.subConnsNext % len(readySCs)
+}

@@ -88,7 +88,7 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
//
// Now with cache, even if SubConn was newed/removed, there might be no
// state changes.
-lb.regeneratePicker()
+lb.regeneratePicker(true)
lb.cc.UpdateBalancerState(lb.state, lb.picker)
}

@@ -291,11 +291,12 @@ func stopBackends(servers []*grpc.Server) {
}
type testServers struct {
-lbAddr string
-ls *remoteBalancer
-lb *grpc.Server
-beIPs []net.IP
-bePorts []int
+lbAddr string
+ls *remoteBalancer
+lb *grpc.Server
+backends []*grpc.Server
+beIPs []net.IP
+bePorts []int
}
func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), err error) {
@@ -337,11 +338,12 @@ func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), er
}()
tss = &testServers{
-lbAddr: fakeName + ":" + strconv.Itoa(lbLis.Addr().(*net.TCPAddr).Port),
-ls: ls,
-lb: lb,
-beIPs: beIPs,
-bePorts: bePorts,
+lbAddr: net.JoinHostPort(fakeName, strconv.Itoa(lbLis.Addr().(*net.TCPAddr).Port)),
+ls: ls,
+lb: lb,
+backends: backends,
+beIPs: beIPs,
+bePorts: bePorts,
}
cleanup = func() {
defer stopBackends(backends)
@@ -477,7 +479,7 @@ func TestDropRequest(t *testing.T) {
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
-tss, cleanup, err := newLoadBalancer(1)
+tss, cleanup, err := newLoadBalancer(2)
if err != nil {
t.Fatalf("failed to create new load balancer: %v", err)
}
@@ -488,6 +490,11 @@ func TestDropRequest(t *testing.T) {
Port: int32(tss.bePorts[0]),
LoadBalanceToken: lbToken,
Drop: false,
+}, {
+IpAddress: tss.beIPs[1],
+Port: int32(tss.bePorts[1]),
+LoadBalanceToken: lbToken,
+Drop: false,
}, {
Drop: true,
}},
@@ -512,8 +519,7 @@ func TestDropRequest(t *testing.T) {
}})
// Wait for the 1st, non-fail-fast RPC to succeed. This ensures both server
-// connections are made, because the first one has DropForLoadBalancing set
-// to true.
+// connections are made, because the first one has Drop set to true.
var i int
for i = 0; i < 1000; i++ {
if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err == nil {
@@ -531,16 +537,48 @@ func TestDropRequest(t *testing.T) {
}
for _, failfast := range []bool{true, false} {
for i := 0; i < 3; i++ {
-// Even RPCs should fail, because the 2st backend has
-// DropForLoadBalancing set to true.
-if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(!failfast)); status.Code(err) != codes.Unavailable {
-t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, %s", testC, err, codes.Unavailable)
-}
-// Odd RPCs should succeed since they choose the non-drop-request
-// backend according to the round robin policy.
+// 1st RPCs pick the second item in server list. They should succeed
+// since they choose the non-drop-request backend according to the
+// round robin policy.
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(!failfast)); err != nil {
t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
+// 2nd RPCs should fail, because they pick the last item in server list,
+// with Drop set to true.
+if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(!failfast)); status.Code(err) != codes.Unavailable {
+t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, %s", testC, err, codes.Unavailable)
+}
+// 3rd RPCs pick the first item in server list. They should succeed
+// since they choose the non-drop-request backend according to the
+// round robin policy.
+if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(!failfast)); err != nil {
+t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
+}
}
}
+tss.backends[0].Stop()
+// This last pick was backend 0. Closing backend 0 doesn't reset drop index
+// (for level 1 picking), so the following picks will be (backend1, drop,
+// backend1), instead of (backend, backend, drop) if drop index was reset.
+time.Sleep(time.Second)
+for i := 0; i < 3; i++ {
+var p peer.Peer
+if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
+t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
+}
+if want := tss.bePorts[1]; p.Addr.(*net.TCPAddr).Port != want {
+t.Errorf("got peer: %v, want peer port: %v", p.Addr, want)
+}
+if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.Unavailable {
+t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, %s", testC, err, codes.Unavailable)
+}
+if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
+t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
+}
+if want := tss.bePorts[1]; p.Addr.(*net.TCPAddr).Port != want {
+t.Errorf("got peer: %v, want peer port: %v", p.Addr, want)
+}
+}
}