grpclb: do not send redundant empty load reports to remote LB (#3249)

Small refactor of tests:

- Add a channel to remoteBalancer on which to send received stats messages
- Change runAndGetStats to runAndCheckStats to allow for faster successful test
  runs (no longer need to sleep for one whole second per test).
- Remove redundant leak check from runAndCheckStats; otherwise, excess load
  report messages not read from the channel would result in an infinite loop.
- Add String method to rpcStats to avoid data race between merging and printing.
Doug Fawley, 2019-12-12 14:10:47 -08:00 (committed by GitHub)
parent df18b5433b
commit 085c980048
3 changed files with 103 additions and 44 deletions
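The balancer-side change is small: remember whether the previous load report was empty, and skip the send when the next one is also empty, so an idle client stops streaming identical zero reports every interval. The sketch below is a condensed, standalone restatement of the logic the diff adds to sendLoadReport; the collect and send callbacks are hypothetical stand-ins for toClientStats and the stream's Send call, not the real method.

package main

import (
    "fmt"
    "time"

    lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
)

// isZeroStats mirrors the helper added in this commit: a report is empty when
// it carries no per-token drops and all counters are zero.
func isZeroStats(stats *lbpb.ClientStats) bool {
    return len(stats.CallsFinishedWithDrop) == 0 &&
        stats.NumCallsStarted == 0 &&
        stats.NumCallsFinished == 0 &&
        stats.NumCallsFinishedWithClientFailedToSend == 0 &&
        stats.NumCallsFinishedKnownReceived == 0
}

// reportLoop sends at most one empty report in a row; further consecutive
// empty reports are skipped until a non-empty one resets the state.
func reportLoop(interval time.Duration, collect func() *lbpb.ClientStats, send func(*lbpb.ClientStats)) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    lastZero := false
    for range ticker.C {
        stats := collect()
        zero := isZeroStats(stats)
        if zero && lastZero {
            continue // quash redundant empty load reports
        }
        lastZero = zero
        send(stats)
    }
}

func main() {
    // Simulate an idle client: only the first of the empty reports is sent.
    go reportLoop(10*time.Millisecond,
        func() *lbpb.ClientStats { return &lbpb.ClientStats{} },
        func(s *lbpb.ClientStats) { fmt.Println("sent an empty load report") },
    )
    time.Sleep(100 * time.Millisecond)
}

In the real sendLoadReport the timestamp is filled in after this check, so the check only has to look at the counters and the drop list.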

View File

@@ -49,6 +49,14 @@ func newRPCStats() *rpcStats {
     }
 }
 
+func isZeroStats(stats *lbpb.ClientStats) bool {
+    return len(stats.CallsFinishedWithDrop) == 0 &&
+        stats.NumCallsStarted == 0 &&
+        stats.NumCallsFinished == 0 &&
+        stats.NumCallsFinishedWithClientFailedToSend == 0 &&
+        stats.NumCallsFinishedKnownReceived == 0
+}
+
 // toClientStats converts rpcStats to lbpb.ClientStats, and clears rpcStats.
 func (s *rpcStats) toClientStats() *lbpb.ClientStats {
     stats := &lbpb.ClientStats{

View File

@@ -286,6 +286,7 @@ func (ccw *remoteBalancerCCWrapper) readServerList(s *balanceLoadClientStream) e
 func (ccw *remoteBalancerCCWrapper) sendLoadReport(s *balanceLoadClientStream, interval time.Duration) {
     ticker := time.NewTicker(interval)
     defer ticker.Stop()
+    lastZero := false
     for {
         select {
         case <-ticker.C:
@@ -293,6 +294,12 @@ func (ccw *remoteBalancerCCWrapper) sendLoadReport(s *balanceLoadClientStream, i
             return
         }
         stats := ccw.lb.clientStats.toClientStats()
+        zero := isZeroStats(stats)
+        if zero && lastZero {
+            // Quash redundant empty load reports.
+            continue
+        }
+        lastZero = zero
         t := time.Now()
         stats.Timestamp = &timestamppb.Timestamp{
             Seconds: t.Unix(),

View File

@@ -166,18 +166,31 @@ func (s *rpcStats) equal(o *rpcStats) bool {
     return mapsEqual(s.numCallsDropped, o.numCallsDropped)
 }
 
+func (s *rpcStats) String() string {
+    s.mu.Lock()
+    defer s.mu.Unlock()
+    return fmt.Sprintf("Started: %v, Finished: %v, FinishedWithClientFailedToSend: %v, FinishedKnownReceived: %v, Dropped: %v",
+        atomic.LoadInt64(&s.numCallsStarted),
+        atomic.LoadInt64(&s.numCallsFinished),
+        atomic.LoadInt64(&s.numCallsFinishedWithClientFailedToSend),
+        atomic.LoadInt64(&s.numCallsFinishedKnownReceived),
+        s.numCallsDropped)
+}
+
 type remoteBalancer struct {
     sls       chan *lbpb.ServerList
     statsDura time.Duration
     done      chan struct{}
     stats     *rpcStats
+    statsChan chan *lbpb.ClientStats
 }
 
-func newRemoteBalancer(intervals []time.Duration) *remoteBalancer {
+func newRemoteBalancer(intervals []time.Duration, statsChan chan *lbpb.ClientStats) *remoteBalancer {
     return &remoteBalancer{
         sls:       make(chan *lbpb.ServerList, 1),
         done:      make(chan struct{}),
         stats:     newRPCStats(),
+        statsChan: statsChan,
     }
 }
@@ -218,6 +231,9 @@ func (b *remoteBalancer) BalanceLoad(stream lbgrpc.LoadBalancer_BalanceLoadServe
                 return
             }
             b.stats.merge(req.GetClientStats())
+            if b.statsChan != nil && req.GetClientStats() != nil {
+                b.statsChan <- req.GetClientStats()
+            }
         }
     }()
     for {
@@ -295,7 +311,7 @@ type testServers struct {
     beListeners []net.Listener
 }
 
-func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), err error) {
+func newLoadBalancer(numberOfBackends int, statsChan chan *lbpb.ClientStats) (tss *testServers, cleanup func(), err error) {
     var (
         beListeners []net.Listener
         ls          *remoteBalancer
@@ -328,7 +344,7 @@ func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), er
         sn: lbServerName,
     }
     lb = grpc.NewServer(grpc.Creds(lbCreds))
-    ls = newRemoteBalancer(nil)
+    ls = newRemoteBalancer(nil, statsChan)
     lbgrpc.RegisterLoadBalancerServer(lb, ls)
     go func() {
         lb.Serve(lbLis)
@@ -361,7 +377,7 @@ func TestGRPCLB(t *testing.T) {
     r, cleanup := manual.GenerateAndRegisterManualResolver()
     defer cleanup()
 
-    tss, cleanup, err := newLoadBalancer(1)
+    tss, cleanup, err := newLoadBalancer(1, nil)
     if err != nil {
         t.Fatalf("failed to create new load balancer: %v", err)
     }
@@ -407,7 +423,7 @@ func TestGRPCLBWeighted(t *testing.T) {
     r, cleanup := manual.GenerateAndRegisterManualResolver()
     defer cleanup()
 
-    tss, cleanup, err := newLoadBalancer(2)
+    tss, cleanup, err := newLoadBalancer(2, nil)
     if err != nil {
         t.Fatalf("failed to create new load balancer: %v", err)
     }
@@ -475,7 +491,7 @@ func TestDropRequest(t *testing.T) {
     r, cleanup := manual.GenerateAndRegisterManualResolver()
     defer cleanup()
 
-    tss, cleanup, err := newLoadBalancer(2)
+    tss, cleanup, err := newLoadBalancer(2, nil)
     if err != nil {
         t.Fatalf("failed to create new load balancer: %v", err)
     }
@@ -633,7 +649,7 @@ func TestBalancerDisconnects(t *testing.T) {
         lbs []*grpc.Server
     )
     for i := 0; i < 2; i++ {
-        tss, cleanup, err := newLoadBalancer(1)
+        tss, cleanup, err := newLoadBalancer(1, nil)
         if err != nil {
             t.Fatalf("failed to create new load balancer: %v", err)
         }
@@ -708,7 +724,7 @@ func TestFallback(t *testing.T) {
     r, cleanup := manual.GenerateAndRegisterManualResolver()
     defer cleanup()
 
-    tss, cleanup, err := newLoadBalancer(1)
+    tss, cleanup, err := newLoadBalancer(1, nil)
     if err != nil {
         t.Fatalf("failed to create new load balancer: %v", err)
     }
@@ -843,7 +859,7 @@ func TestFallBackWithNoServerAddress(t *testing.T) {
     }
     defer cleanup()
 
-    tss, cleanup, err := newLoadBalancer(1)
+    tss, cleanup, err := newLoadBalancer(1, nil)
     if err != nil {
         t.Fatalf("failed to create new load balancer: %v", err)
     }
@@ -957,7 +973,7 @@ func TestGRPCLBPickFirst(t *testing.T) {
     r, cleanup := manual.GenerateAndRegisterManualResolver()
     defer cleanup()
 
-    tss, cleanup, err := newLoadBalancer(3)
+    tss, cleanup, err := newLoadBalancer(3, nil)
     if err != nil {
         t.Fatalf("failed to create new load balancer: %v", err)
     }
@@ -1108,13 +1124,11 @@ func checkStats(stats, expected *rpcStats) error {
     return nil
 }
 
-func runAndGetStats(t *testing.T, drop bool, runRPCs func(*grpc.ClientConn)) *rpcStats {
-    defer leakcheck.Check(t)
-
+func runAndCheckStats(t *testing.T, drop bool, statsChan chan *lbpb.ClientStats, runRPCs func(*grpc.ClientConn), statsWant *rpcStats) error {
     r, cleanup := manual.GenerateAndRegisterManualResolver()
     defer cleanup()
 
-    tss, cleanup, err := newLoadBalancer(1)
+    tss, cleanup, err := newLoadBalancer(1, statsChan)
     if err != nil {
         t.Fatalf("failed to create new load balancer: %v", err)
     }
@@ -1152,9 +1166,14 @@ func runAndGetStats(t *testing.T, drop bool, runRPCs func(*grpc.ClientConn)) *rp
     }}})
 
     runRPCs(cc)
-    time.Sleep(1 * time.Second)
-    stats := tss.ls.stats
-    return stats
+    end := time.Now().Add(time.Second)
+    for time.Now().Before(end) {
+        if err := checkStats(tss.ls.stats, statsWant); err == nil {
+            time.Sleep(200 * time.Millisecond) // sleep for two intervals to make sure no new stats are reported.
+            break
+        }
+    }
+    return checkStats(tss.ls.stats, statsWant)
 }
 
 const (
@@ -1164,7 +1183,7 @@ const (
 
 func TestGRPCLBStatsUnarySuccess(t *testing.T) {
     defer leakcheck.Check(t)
-    stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
+    if err := runAndCheckStats(t, false, nil, func(cc *grpc.ClientConn) {
         testC := testpb.NewTestServiceClient(cc)
         // The first non-failfast RPC succeeds, all connections are up.
         if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
@@ -1173,9 +1192,7 @@ func TestGRPCLBStatsUnarySuccess(t *testing.T) {
         for i := 0; i < countRPC-1; i++ {
             testC.EmptyCall(context.Background(), &testpb.Empty{})
         }
-    })
-
-    if err := checkStats(stats, &rpcStats{
+    }, &rpcStats{
         numCallsStarted:               int64(countRPC),
         numCallsFinished:              int64(countRPC),
         numCallsFinishedKnownReceived: int64(countRPC),
@@ -1186,7 +1203,7 @@ func TestGRPCLBStatsUnarySuccess(t *testing.T) {
 
 func TestGRPCLBStatsUnaryDrop(t *testing.T) {
     defer leakcheck.Check(t)
-    stats := runAndGetStats(t, true, func(cc *grpc.ClientConn) {
+    if err := runAndCheckStats(t, true, nil, func(cc *grpc.ClientConn) {
         testC := testpb.NewTestServiceClient(cc)
         // The first non-failfast RPC succeeds, all connections are up.
         if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
@@ -1195,9 +1212,7 @@ func TestGRPCLBStatsUnaryDrop(t *testing.T) {
         for i := 0; i < countRPC-1; i++ {
             testC.EmptyCall(context.Background(), &testpb.Empty{})
         }
-    })
-
-    if err := checkStats(stats, &rpcStats{
+    }, &rpcStats{
         numCallsStarted:               int64(countRPC),
         numCallsFinished:              int64(countRPC),
         numCallsFinishedKnownReceived: int64(countRPC) / 2,
@@ -1209,7 +1224,7 @@ func TestGRPCLBStatsUnaryDrop(t *testing.T) {
 
 func TestGRPCLBStatsUnaryFailedToSend(t *testing.T) {
     defer leakcheck.Check(t)
-    stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
+    if err := runAndCheckStats(t, false, nil, func(cc *grpc.ClientConn) {
         testC := testpb.NewTestServiceClient(cc)
         // The first non-failfast RPC succeeds, all connections are up.
         if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
@@ -1218,9 +1233,7 @@ func TestGRPCLBStatsUnaryFailedToSend(t *testing.T) {
         for i := 0; i < countRPC-1; i++ {
             cc.Invoke(context.Background(), failtosendURI, &testpb.Empty{}, nil)
         }
-    })
-
-    if err := checkStats(stats, &rpcStats{
+    }, &rpcStats{
         numCallsStarted:                        int64(countRPC)*2 - 1,
         numCallsFinished:                       int64(countRPC)*2 - 1,
         numCallsFinishedWithClientFailedToSend: int64(countRPC-1) * 2,
@@ -1232,7 +1245,7 @@ func TestGRPCLBStatsUnaryFailedToSend(t *testing.T) {
 
 func TestGRPCLBStatsStreamingSuccess(t *testing.T) {
     defer leakcheck.Check(t)
-    stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
+    if err := runAndCheckStats(t, false, nil, func(cc *grpc.ClientConn) {
         testC := testpb.NewTestServiceClient(cc)
         // The first non-failfast RPC succeeds, all connections are up.
         stream, err := testC.FullDuplexCall(context.Background(), grpc.WaitForReady(true))
@@ -1255,9 +1268,7 @@ func TestGRPCLBStatsStreamingSuccess(t *testing.T) {
                 }
             }
         }
-    })
-
-    if err := checkStats(stats, &rpcStats{
+    }, &rpcStats{
         numCallsStarted:               int64(countRPC),
         numCallsFinished:              int64(countRPC),
         numCallsFinishedKnownReceived: int64(countRPC),
@@ -1268,7 +1279,7 @@ func TestGRPCLBStatsStreamingSuccess(t *testing.T) {
 
 func TestGRPCLBStatsStreamingDrop(t *testing.T) {
     defer leakcheck.Check(t)
-    stats := runAndGetStats(t, true, func(cc *grpc.ClientConn) {
+    if err := runAndCheckStats(t, true, nil, func(cc *grpc.ClientConn) {
         testC := testpb.NewTestServiceClient(cc)
         // The first non-failfast RPC succeeds, all connections are up.
         stream, err := testC.FullDuplexCall(context.Background(), grpc.WaitForReady(true))
@@ -1291,9 +1302,7 @@ func TestGRPCLBStatsStreamingDrop(t *testing.T) {
                 }
             }
         }
-    })
-
-    if err := checkStats(stats, &rpcStats{
+    }, &rpcStats{
         numCallsStarted:               int64(countRPC),
         numCallsFinished:              int64(countRPC),
         numCallsFinishedKnownReceived: int64(countRPC) / 2,
@@ -1305,7 +1314,7 @@ func TestGRPCLBStatsStreamingDrop(t *testing.T) {
 
 func TestGRPCLBStatsStreamingFailedToSend(t *testing.T) {
     defer leakcheck.Check(t)
-    stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
+    if err := runAndCheckStats(t, false, nil, func(cc *grpc.ClientConn) {
         testC := testpb.NewTestServiceClient(cc)
         // The first non-failfast RPC succeeds, all connections are up.
         stream, err := testC.FullDuplexCall(context.Background(), grpc.WaitForReady(true))
@@ -1320,9 +1329,7 @@ func TestGRPCLBStatsStreamingFailedToSend(t *testing.T) {
         for i := 0; i < countRPC-1; i++ {
             cc.NewStream(context.Background(), &grpc.StreamDesc{}, failtosendURI)
         }
-    })
-
-    if err := checkStats(stats, &rpcStats{
+    }, &rpcStats{
         numCallsStarted:                        int64(countRPC)*2 - 1,
         numCallsFinished:                       int64(countRPC)*2 - 1,
         numCallsFinishedWithClientFailedToSend: int64(countRPC-1) * 2,
@@ -1331,3 +1338,40 @@ func TestGRPCLBStatsStreamingFailedToSend(t *testing.T) {
         t.Fatal(err)
     }
 }
+
+func TestGRPCLBStatsQuashEmpty(t *testing.T) {
+    defer leakcheck.Check(t)
+    ch := make(chan *lbpb.ClientStats)
+    defer close(ch)
+    if err := runAndCheckStats(t, false, ch, func(cc *grpc.ClientConn) {
+        // Perform no RPCs; wait for load reports to start, which should be
+        // zero, then expect no other load report within 5x the update
+        // interval.
+        select {
+        case st := <-ch:
+            if !isZeroStats(st) {
+                t.Errorf("got stats %v; want all zero", st)
+            }
+        case <-time.After(5 * time.Second):
+            t.Errorf("did not get initial stats report after 5 seconds")
+            return
+        }
+
+        select {
+        case st := <-ch:
+            t.Errorf("got unexpected stats report: %v", st)
+        case <-time.After(500 * time.Millisecond):
+            // Success.
+        }
+        go func() {
+            for range ch { // Drain statsChan until it is closed.
+            }
+        }()
+    }, &rpcStats{
+        numCallsStarted:               0,
+        numCallsFinished:              0,
+        numCallsFinishedKnownReceived: 0,
+    }); err != nil {
+        t.Fatal(err)
+    }
+}
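For reference, the runAndCheckStats refactor described in the commit message replaces a fixed one-second sleep with a poll loop: keep re-checking the merged stats until they match the expected stats or a deadline passes, then sleep briefly so any late, unexpected reports still fail the final check. Below is a minimal standalone sketch of that poll-until-match pattern; pollUntilMatch and the check callback are illustrative names standing in for the test helper and checkStats, not part of the change itself.

package main

import (
    "errors"
    "fmt"
    "time"
)

// pollUntilMatch re-runs check until it succeeds or the timeout expires. On
// the first success it waits one settle period before returning the result of
// a final check, so stats that keep changing afterwards still surface as an
// error.
func pollUntilMatch(timeout, settle time.Duration, check func() error) error {
    deadline := time.Now().Add(timeout)
    for time.Now().Before(deadline) {
        if err := check(); err == nil {
            time.Sleep(settle)
            break
        }
    }
    return check()
}

func main() {
    // Toy stand-in for stats that only reach the expected values after ~300ms.
    start := time.Now()
    err := pollUntilMatch(time.Second, 200*time.Millisecond, func() error {
        if time.Since(start) < 300*time.Millisecond {
            return errors.New("stats not yet merged")
        }
        return nil
    })
    fmt.Println("result:", err) // prints "result: <nil>" once the stats settle
}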