From 9ecb6116ff8b19e6f9627f339177681034a6c568 Mon Sep 17 00:00:00 2001
From: Menghan Li
Date: Thu, 28 Feb 2019 13:11:23 -0800
Subject: [PATCH] grpclb: drop only when at least one SubConn is ready (#2630)

---
 balancer/grpclb/grpclb.go      | 11 ++++--
 balancer/grpclb/grpclb_test.go | 69 +++++++++++++++++++---------------
 2 files changed, 45 insertions(+), 35 deletions(-)

diff --git a/balancer/grpclb/grpclb.go b/balancer/grpclb/grpclb.go
index 735c1841..abc2e211 100644
--- a/balancer/grpclb/grpclb.go
+++ b/balancer/grpclb/grpclb.go
@@ -258,11 +258,14 @@ func (lb *lbBalancer) regeneratePicker() {
 		}
 	}
 
+	if len(readySCs) <= 0 {
+		// If there's no ready SubConns, always re-pick. This is to avoid drops
+		// unless at least one SubConn is ready. Otherwise we may drop more
+		// often than want because of drops + re-picks(which become re-drops).
+		lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
+		return
+	}
 	if len(lb.fullServerList) <= 0 {
-		if len(readySCs) <= 0 {
-			lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
-			return
-		}
 		lb.picker = &rrPicker{subConns: readySCs}
 		return
 	}
diff --git a/balancer/grpclb/grpclb_test.go b/balancer/grpclb/grpclb_test.go
index 04c5b2f7..33e5f2ce 100644
--- a/balancer/grpclb/grpclb_test.go
+++ b/balancer/grpclb/grpclb_test.go
@@ -744,14 +744,18 @@ func runAndGetStats(t *testing.T, drop bool, runRPCs func(*grpc.ClientConn)) *rp
 		t.Fatalf("failed to create new load balancer: %v", err)
 	}
 	defer cleanup()
-	tss.ls.sls <- &lbpb.ServerList{
-		Servers: []*lbpb.Server{{
-			IpAddress:        tss.beIPs[0],
-			Port:             int32(tss.bePorts[0]),
+	servers := []*lbpb.Server{{
+		IpAddress:        tss.beIPs[0],
+		Port:             int32(tss.bePorts[0]),
+		LoadBalanceToken: lbToken,
+	}}
+	if drop {
+		servers = append(servers, &lbpb.Server{
 			LoadBalanceToken: lbToken,
 			Drop:             drop,
-		}},
+		})
 	}
+	tss.ls.sls <- &lbpb.ServerList{Servers: servers}
 	tss.ls.statsDura = 100 * time.Millisecond
 	creds := serverNameCheckCreds{expected: beServerName}
 
@@ -781,7 +785,6 @@ func runAndGetStats(t *testing.T, drop bool, runRPCs func(*grpc.ClientConn)) *rp
 const (
 	countRPC      = 40
 	failtosendURI = "failtosend"
-	dropErrDesc   = "request dropped by grpclb"
 )
 
 func TestGRPCLBStatsUnarySuccess(t *testing.T) {
@@ -808,27 +811,22 @@ func TestGRPCLBStatsUnarySuccess(t *testing.T) {
 
 func TestGRPCLBStatsUnaryDrop(t *testing.T) {
 	defer leakcheck.Check(t)
-	c := 0
 	stats := runAndGetStats(t, true, func(cc *grpc.ClientConn) {
 		testC := testpb.NewTestServiceClient(cc)
-		for {
-			c++
-			if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
-				if strings.Contains(err.Error(), dropErrDesc) {
-					break
-				}
-			}
+		// The first non-failfast RPC succeeds, all connections are up.
+		if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
+			t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
 		}
-		for i := 0; i < countRPC; i++ {
+		for i := 0; i < countRPC-1; i++ {
 			testC.EmptyCall(context.Background(), &testpb.Empty{})
 		}
 	})
 
 	if err := checkStats(stats, &rpcStats{
-		numCallsStarted:                        int64(countRPC + c),
-		numCallsFinished:                       int64(countRPC + c),
-		numCallsFinishedWithClientFailedToSend: int64(c - 1),
-		numCallsDropped:                        map[string]int64{lbToken: int64(countRPC + 1)},
+		numCallsStarted:               int64(countRPC),
+		numCallsFinished:              int64(countRPC),
+		numCallsFinishedKnownReceived: int64(countRPC) / 2,
+		numCallsDropped:               map[string]int64{lbToken: int64(countRPC) / 2},
 	}); err != nil {
 		t.Fatal(err)
 	}
@@ -895,27 +893,36 @@ func TestGRPCLBStatsStreamingSuccess(t *testing.T) {
 
 func TestGRPCLBStatsStreamingDrop(t *testing.T) {
 	defer leakcheck.Check(t)
-	c := 0
 	stats := runAndGetStats(t, true, func(cc *grpc.ClientConn) {
 		testC := testpb.NewTestServiceClient(cc)
+		// The first non-failfast RPC succeeds, all connections are up.
+		stream, err := testC.FullDuplexCall(context.Background(), grpc.WaitForReady(true))
+		if err != nil {
+			t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, <nil>", testC, err)
+		}
 		for {
-			c++
-			if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
-				if strings.Contains(err.Error(), dropErrDesc) {
-					break
-				}
+			if _, err = stream.Recv(); err == io.EOF {
+				break
 			}
 		}
-		for i := 0; i < countRPC; i++ {
-			testC.FullDuplexCall(context.Background())
+		for i := 0; i < countRPC-1; i++ {
+			stream, err = testC.FullDuplexCall(context.Background())
+			if err == nil {
+				// Wait for stream to end if err is nil.
+				for {
+					if _, err = stream.Recv(); err == io.EOF {
+						break
+					}
+				}
+			}
 		}
 	})
 
 	if err := checkStats(stats, &rpcStats{
-		numCallsStarted:                        int64(countRPC + c),
-		numCallsFinished:                       int64(countRPC + c),
-		numCallsFinishedWithClientFailedToSend: int64(c - 1),
-		numCallsDropped:                        map[string]int64{lbToken: int64(countRPC + 1)},
+		numCallsStarted:               int64(countRPC),
+		numCallsFinished:              int64(countRPC),
+		numCallsFinishedKnownReceived: int64(countRPC) / 2,
+		numCallsDropped:               map[string]int64{lbToken: int64(countRPC) / 2},
 	}); err != nil {
 		t.Fatal(err)
 	}
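
Note: the sketch below is a simplified, self-contained model of the picker ordering this patch establishes in regeneratePicker; it is not grpc-go code. The subConn, serverListEntry, and errNoSubConnAvailable names are stand-ins for balancer.SubConn, the grpclb server list, and balancer.ErrNoSubConnAvailable, and the drop-aware branch only approximates grpclb's real picker. It illustrates why readySCs is now checked before lb.fullServerList: with no ready SubConn the RPC gets a re-pick error and waits, instead of repeatedly hitting drop entries (drops + re-picks becoming re-drops).

package main

import (
	"errors"
	"fmt"
)

// Stand-in for balancer.ErrNoSubConnAvailable: the ClientConn queues the RPC
// and re-picks once the balancer sends a new picker.
var errNoSubConnAvailable = errors.New("no SubConn is available")

// subConn is an illustrative stand-in for balancer.SubConn.
type subConn string

// serverListEntry models one grpclb server-list entry: a backend or a drop.
type serverListEntry struct {
	addr string
	drop bool
}

// regeneratePicker mirrors the post-patch ordering: check readySCs first, and
// only fall through to the round-robin / drop-aware pickers when at least one
// SubConn is ready. The returned function is a toy picker indexed by call count.
func regeneratePicker(readySCs []subConn, fullServerList []serverListEntry) func(i int) (subConn, error) {
	if len(readySCs) == 0 {
		// No ready SubConns: always signal a re-pick so the RPC waits
		// instead of being handed to a drop entry.
		return func(int) (subConn, error) { return "", errNoSubConnAvailable }
	}
	if len(fullServerList) == 0 {
		// No server list yet: plain round robin over the ready SubConns.
		return func(i int) (subConn, error) { return readySCs[i%len(readySCs)], nil }
	}
	// Drop-aware picker (approximation): walk the full server list round
	// robin; drop entries fail the RPC, other entries use a ready SubConn.
	return func(i int) (subConn, error) {
		e := fullServerList[i%len(fullServerList)]
		if e.drop {
			return "", errors.New("rpc dropped by grpclb")
		}
		return readySCs[i%len(readySCs)], nil
	}
}

func main() {
	// Before any SubConn is ready, even a list containing drop entries
	// produces re-picks, not drops.
	pick := regeneratePicker(nil, []serverListEntry{{drop: true}})
	if _, err := pick(0); err != nil {
		fmt.Println("no ready SubConn:", err)
	}

	// Once a SubConn is ready, drop entries take effect again (roughly half
	// the calls are dropped with this two-entry list, matching the revised
	// test expectations of countRPC/2 drops).
	pick = regeneratePicker([]subConn{"backend-1"}, []serverListEntry{{addr: "backend-1"}, {drop: true}})
	for i := 0; i < 4; i++ {
		sc, err := pick(i)
		fmt.Println("with a ready SubConn:", sc, err)
	}
}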