grpclb: drop only when at least one SubConn is ready (#2630)

Menghan Li
2019-02-28 13:11:23 -08:00
committed by GitHub
parent 5878d965b2
commit 9ecb6116ff
2 changed files with 45 additions and 35 deletions

View File

@@ -258,11 +258,14 @@ func (lb *lbBalancer) regeneratePicker() {
         }
     }
 
+    if len(readySCs) <= 0 {
+        // If there's no ready SubConns, always re-pick. This is to avoid drops
+        // unless at least one SubConn is ready. Otherwise we may drop more
+        // often than want because of drops + re-picks(which become re-drops).
+        lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
+        return
+    }
     if len(lb.fullServerList) <= 0 {
-        if len(readySCs) <= 0 {
-            lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
-            return
-        }
         lb.picker = &rrPicker{subConns: readySCs}
         return
     }
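The pick-time behavior this hunk produces can be sketched as follows. This is an illustrative reduction, not the actual grpclb lbPicker: the real change lives in regeneratePicker (installing errPicker when no SubConn is READY), and the names pickerSketch, serverEntry, pick and the drop error text below are invented for the example. The point it shows is that with the early ErrNoSubConnAvailable return, gRPC queues the RPC and re-picks, so a drop entry in the server list only ever takes effect once at least one SubConn is READY.

    package grpclbsketch

    import (
        "sync"

        "google.golang.org/grpc/balancer"
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
    )

    // serverEntry is a stand-in for the balancer-provided server list entry
    // (the generated lbpb.Server message); only the Drop flag matters here.
    type serverEntry struct {
        Drop bool
    }

    // pickerSketch is a hypothetical, stripped-down stand-in for grpclb's
    // lbPicker: serverList mirrors the full list from the balancer (which may
    // contain drop entries) and subConns holds only READY connections.
    type pickerSketch struct {
        mu         sync.Mutex
        serverList []serverEntry
        subConns   []balancer.SubConn
        serverIdx  int
        scIdx      int
    }

    func (p *pickerSketch) pick() (balancer.SubConn, error) {
        p.mu.Lock()
        defer p.mu.Unlock()

        // After this commit, the drop list is never consulted while no SubConn
        // is READY; the balancer installs an errPicker returning
        // ErrNoSubConnAvailable, which makes gRPC block and re-pick instead of
        // counting a drop.
        if len(p.subConns) == 0 {
            return nil, balancer.ErrNoSubConnAvailable
        }

        // Walk the server list round-robin; entries flagged Drop fail the RPC
        // immediately and are reported as dropped.
        s := p.serverList[p.serverIdx]
        p.serverIdx = (p.serverIdx + 1) % len(p.serverList)
        if s.Drop {
            return nil, status.Error(codes.Unavailable, "request dropped by the balancer")
        }

        // Otherwise serve the RPC on one of the READY SubConns, round-robin.
        sc := p.subConns[p.scIdx]
        p.scIdx = (p.scIdx + 1) % len(p.subConns)
        return sc, nil
    }

The real picker also reports per-token drop counts through the client stats (the numCallsDropped map checked in the tests below), which the sketch omits.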

View File

@@ -744,14 +744,18 @@ func runAndGetStats(t *testing.T, drop bool, runRPCs func(*grpc.ClientConn)) *rpcStats {
         t.Fatalf("failed to create new load balancer: %v", err)
     }
     defer cleanup()
-    tss.ls.sls <- &lbpb.ServerList{
-        Servers: []*lbpb.Server{{
-            IpAddress:        tss.beIPs[0],
-            Port:             int32(tss.bePorts[0]),
-            LoadBalanceToken: lbToken,
-            Drop:             drop,
-        }},
-    }
+    servers := []*lbpb.Server{{
+        IpAddress:        tss.beIPs[0],
+        Port:             int32(tss.bePorts[0]),
+        LoadBalanceToken: lbToken,
+    }}
+    if drop {
+        servers = append(servers, &lbpb.Server{
+            LoadBalanceToken: lbToken,
+            Drop:             drop,
+        })
+    }
+    tss.ls.sls <- &lbpb.ServerList{Servers: servers}
     tss.ls.statsDura = 100 * time.Millisecond
     creds := serverNameCheckCreds{expected: beServerName}
@@ -781,7 +785,6 @@ func runAndGetStats(t *testing.T, drop bool, runRPCs func(*grpc.ClientConn)) *rpcStats {
 const (
     countRPC      = 40
     failtosendURI = "failtosend"
-    dropErrDesc   = "request dropped by grpclb"
 )
 
 func TestGRPCLBStatsUnarySuccess(t *testing.T) {
@@ -808,27 +811,22 @@ func TestGRPCLBStatsUnarySuccess(t *testing.T) {
 func TestGRPCLBStatsUnaryDrop(t *testing.T) {
     defer leakcheck.Check(t)
-    c := 0
     stats := runAndGetStats(t, true, func(cc *grpc.ClientConn) {
         testC := testpb.NewTestServiceClient(cc)
-        for {
-            c++
-            if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
-                if strings.Contains(err.Error(), dropErrDesc) {
-                    break
-                }
-            }
-        }
-        for i := 0; i < countRPC; i++ {
+        // The first non-failfast RPC succeeds, all connections are up.
+        if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
+            t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
+        }
+        for i := 0; i < countRPC-1; i++ {
             testC.EmptyCall(context.Background(), &testpb.Empty{})
         }
     })
 
     if err := checkStats(stats, &rpcStats{
-        numCallsStarted:                        int64(countRPC + c),
-        numCallsFinished:                       int64(countRPC + c),
-        numCallsFinishedWithClientFailedToSend: int64(c - 1),
-        numCallsDropped:                        map[string]int64{lbToken: int64(countRPC + 1)},
+        numCallsStarted:               int64(countRPC),
+        numCallsFinished:              int64(countRPC),
+        numCallsFinishedKnownReceived: int64(countRPC) / 2,
+        numCallsDropped:               map[string]int64{lbToken: int64(countRPC) / 2},
     }); err != nil {
         t.Fatal(err)
     }
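The revised expectations follow from the server list that runAndGetStats now builds when drop is true: one real backend plus one drop-only entry, cycled through round robin, so roughly every other pick is dropped. With one wait-for-ready RPC that lands on the backend and countRPC-1 fail-fast RPCs, all countRPC = 40 calls are started and finished, 40/2 = 20 are dropped under lbToken and 20 are known-received; the streaming test in the next hunk asserts the same numbers. A rough check of that alternation (a hypothetical helper, not part of the test):

    // Assumes picks alternate drop, backend, drop, ... after the initial
    // wait-for-ready call, which is what the countRPC/2 expectations encode.
    func expectedDropStats(countRPC int) (received, dropped int) {
        received = 1 // the first, wait-for-ready RPC reaches the backend
        for i := 0; i < countRPC-1; i++ {
            if i%2 == 0 {
                dropped++ // this pick hits the drop-only server entry
            } else {
                received++ // this pick hits the real backend
            }
        }
        return received, dropped // 20, 20 for countRPC = 40
    }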
@@ -895,27 +893,36 @@ func TestGRPCLBStatsStreamingSuccess(t *testing.T) {
 func TestGRPCLBStatsStreamingDrop(t *testing.T) {
     defer leakcheck.Check(t)
-    c := 0
     stats := runAndGetStats(t, true, func(cc *grpc.ClientConn) {
         testC := testpb.NewTestServiceClient(cc)
-        for {
-            c++
-            if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
-                if strings.Contains(err.Error(), dropErrDesc) {
-                    break
-                }
-            }
-        }
-        for i := 0; i < countRPC; i++ {
-            testC.FullDuplexCall(context.Background())
+        // The first non-failfast RPC succeeds, all connections are up.
+        stream, err := testC.FullDuplexCall(context.Background(), grpc.WaitForReady(true))
+        if err != nil {
+            t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, <nil>", testC, err)
+        }
+        for {
+            if _, err = stream.Recv(); err == io.EOF {
+                break
+            }
+        }
+        for i := 0; i < countRPC-1; i++ {
+            stream, err = testC.FullDuplexCall(context.Background())
+            if err == nil {
+                // Wait for stream to end if err is nil.
+                for {
+                    if _, err = stream.Recv(); err == io.EOF {
+                        break
+                    }
+                }
+            }
         }
     })
 
     if err := checkStats(stats, &rpcStats{
-        numCallsStarted:                        int64(countRPC + c),
-        numCallsFinished:                       int64(countRPC + c),
-        numCallsFinishedWithClientFailedToSend: int64(c - 1),
-        numCallsDropped:                        map[string]int64{lbToken: int64(countRPC + 1)},
+        numCallsStarted:               int64(countRPC),
+        numCallsFinished:              int64(countRPC),
+        numCallsFinishedKnownReceived: int64(countRPC) / 2,
+        numCallsDropped:               map[string]int64{lbToken: int64(countRPC) / 2},
     }); err != nil {
         t.Fatal(err)
     }