diff --git a/balancer/grpclb/grpclb_picker.go b/balancer/grpclb/grpclb_picker.go index 421dbfa1..22213cc0 100644 --- a/balancer/grpclb/grpclb_picker.go +++ b/balancer/grpclb/grpclb_picker.go @@ -49,6 +49,14 @@ func newRPCStats() *rpcStats { } } +func isZeroStats(stats *lbpb.ClientStats) bool { + return len(stats.CallsFinishedWithDrop) == 0 && + stats.NumCallsStarted == 0 && + stats.NumCallsFinished == 0 && + stats.NumCallsFinishedWithClientFailedToSend == 0 && + stats.NumCallsFinishedKnownReceived == 0 +} + // toClientStats converts rpcStats to lbpb.ClientStats, and clears rpcStats. func (s *rpcStats) toClientStats() *lbpb.ClientStats { stats := &lbpb.ClientStats{ diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go index aa0bb516..ecaa1e78 100644 --- a/balancer/grpclb/grpclb_remote_balancer.go +++ b/balancer/grpclb/grpclb_remote_balancer.go @@ -286,6 +286,7 @@ func (ccw *remoteBalancerCCWrapper) readServerList(s *balanceLoadClientStream) e func (ccw *remoteBalancerCCWrapper) sendLoadReport(s *balanceLoadClientStream, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() + lastZero := false for { select { case <-ticker.C: @@ -293,6 +294,12 @@ func (ccw *remoteBalancerCCWrapper) sendLoadReport(s *balanceLoadClientStream, i return } stats := ccw.lb.clientStats.toClientStats() + zero := isZeroStats(stats) + if zero && lastZero { + // Quash redundant empty load reports. 
+ continue + } + lastZero = zero t := time.Now() stats.Timestamp = &timestamppb.Timestamp{ Seconds: t.Unix(), diff --git a/balancer/grpclb/grpclb_test.go b/balancer/grpclb/grpclb_test.go index dc2665be..128daa9f 100644 --- a/balancer/grpclb/grpclb_test.go +++ b/balancer/grpclb/grpclb_test.go @@ -166,18 +166,31 @@ func (s *rpcStats) equal(o *rpcStats) bool { return mapsEqual(s.numCallsDropped, o.numCallsDropped) } +func (s *rpcStats) String() string { + s.mu.Lock() + defer s.mu.Unlock() + return fmt.Sprintf("Started: %v, Finished: %v, FinishedWithClientFailedToSend: %v, FinishedKnownReceived: %v, Dropped: %v", + atomic.LoadInt64(&s.numCallsStarted), + atomic.LoadInt64(&s.numCallsFinished), + atomic.LoadInt64(&s.numCallsFinishedWithClientFailedToSend), + atomic.LoadInt64(&s.numCallsFinishedKnownReceived), + s.numCallsDropped) +} + type remoteBalancer struct { sls chan *lbpb.ServerList statsDura time.Duration done chan struct{} stats *rpcStats + statsChan chan *lbpb.ClientStats } -func newRemoteBalancer(intervals []time.Duration) *remoteBalancer { +func newRemoteBalancer(intervals []time.Duration, statsChan chan *lbpb.ClientStats) *remoteBalancer { return &remoteBalancer{ - sls: make(chan *lbpb.ServerList, 1), - done: make(chan struct{}), - stats: newRPCStats(), + sls: make(chan *lbpb.ServerList, 1), + done: make(chan struct{}), + stats: newRPCStats(), + statsChan: statsChan, } } @@ -218,6 +231,9 @@ func (b *remoteBalancer) BalanceLoad(stream lbgrpc.LoadBalancer_BalanceLoadServe return } b.stats.merge(req.GetClientStats()) + if b.statsChan != nil && req.GetClientStats() != nil { + b.statsChan <- req.GetClientStats() + } } }() for { @@ -295,7 +311,7 @@ type testServers struct { beListeners []net.Listener } -func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), err error) { +func newLoadBalancer(numberOfBackends int, statsChan chan *lbpb.ClientStats) (tss *testServers, cleanup func(), err error) { var ( beListeners []net.Listener ls *remoteBalancer @@ 
-328,7 +344,7 @@ func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), er sn: lbServerName, } lb = grpc.NewServer(grpc.Creds(lbCreds)) - ls = newRemoteBalancer(nil) + ls = newRemoteBalancer(nil, statsChan) lbgrpc.RegisterLoadBalancerServer(lb, ls) go func() { lb.Serve(lbLis) @@ -361,7 +377,7 @@ func TestGRPCLB(t *testing.T) { r, cleanup := manual.GenerateAndRegisterManualResolver() defer cleanup() - tss, cleanup, err := newLoadBalancer(1) + tss, cleanup, err := newLoadBalancer(1, nil) if err != nil { t.Fatalf("failed to create new load balancer: %v", err) } @@ -407,7 +423,7 @@ func TestGRPCLBWeighted(t *testing.T) { r, cleanup := manual.GenerateAndRegisterManualResolver() defer cleanup() - tss, cleanup, err := newLoadBalancer(2) + tss, cleanup, err := newLoadBalancer(2, nil) if err != nil { t.Fatalf("failed to create new load balancer: %v", err) } @@ -475,7 +491,7 @@ func TestDropRequest(t *testing.T) { r, cleanup := manual.GenerateAndRegisterManualResolver() defer cleanup() - tss, cleanup, err := newLoadBalancer(2) + tss, cleanup, err := newLoadBalancer(2, nil) if err != nil { t.Fatalf("failed to create new load balancer: %v", err) } @@ -633,7 +649,7 @@ func TestBalancerDisconnects(t *testing.T) { lbs []*grpc.Server ) for i := 0; i < 2; i++ { - tss, cleanup, err := newLoadBalancer(1) + tss, cleanup, err := newLoadBalancer(1, nil) if err != nil { t.Fatalf("failed to create new load balancer: %v", err) } @@ -708,7 +724,7 @@ func TestFallback(t *testing.T) { r, cleanup := manual.GenerateAndRegisterManualResolver() defer cleanup() - tss, cleanup, err := newLoadBalancer(1) + tss, cleanup, err := newLoadBalancer(1, nil) if err != nil { t.Fatalf("failed to create new load balancer: %v", err) } @@ -843,7 +859,7 @@ func TestFallBackWithNoServerAddress(t *testing.T) { } defer cleanup() - tss, cleanup, err := newLoadBalancer(1) + tss, cleanup, err := newLoadBalancer(1, nil) if err != nil { t.Fatalf("failed to create new load balancer: %v", err) } @@ 
-957,7 +973,7 @@ func TestGRPCLBPickFirst(t *testing.T) { r, cleanup := manual.GenerateAndRegisterManualResolver() defer cleanup() - tss, cleanup, err := newLoadBalancer(3) + tss, cleanup, err := newLoadBalancer(3, nil) if err != nil { t.Fatalf("failed to create new load balancer: %v", err) } @@ -1108,13 +1124,11 @@ func checkStats(stats, expected *rpcStats) error { return nil } -func runAndGetStats(t *testing.T, drop bool, runRPCs func(*grpc.ClientConn)) *rpcStats { - defer leakcheck.Check(t) - +func runAndCheckStats(t *testing.T, drop bool, statsChan chan *lbpb.ClientStats, runRPCs func(*grpc.ClientConn), statsWant *rpcStats) error { r, cleanup := manual.GenerateAndRegisterManualResolver() defer cleanup() - tss, cleanup, err := newLoadBalancer(1) + tss, cleanup, err := newLoadBalancer(1, statsChan) if err != nil { t.Fatalf("failed to create new load balancer: %v", err) } @@ -1152,9 +1166,14 @@ func runAndGetStats(t *testing.T, drop bool, runRPCs func(*grpc.ClientConn)) *rp }}}) runRPCs(cc) - time.Sleep(1 * time.Second) - stats := tss.ls.stats - return stats + end := time.Now().Add(time.Second) + for time.Now().Before(end) { + if err := checkStats(tss.ls.stats, statsWant); err == nil { + time.Sleep(200 * time.Millisecond) // sleep for two intervals to make sure no new stats are reported. + break + } + } + return checkStats(tss.ls.stats, statsWant) } const ( @@ -1164,7 +1183,7 @@ const ( func TestGRPCLBStatsUnarySuccess(t *testing.T) { defer leakcheck.Check(t) - stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) { + if err := runAndCheckStats(t, false, nil, func(cc *grpc.ClientConn) { testC := testpb.NewTestServiceClient(cc) // The first non-failfast RPC succeeds, all connections are up. 
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { @@ -1173,9 +1192,7 @@ func TestGRPCLBStatsUnarySuccess(t *testing.T) { for i := 0; i < countRPC-1; i++ { testC.EmptyCall(context.Background(), &testpb.Empty{}) } - }) - - if err := checkStats(stats, &rpcStats{ + }, &rpcStats{ numCallsStarted: int64(countRPC), numCallsFinished: int64(countRPC), numCallsFinishedKnownReceived: int64(countRPC), @@ -1186,7 +1203,7 @@ func TestGRPCLBStatsUnarySuccess(t *testing.T) { func TestGRPCLBStatsUnaryDrop(t *testing.T) { defer leakcheck.Check(t) - stats := runAndGetStats(t, true, func(cc *grpc.ClientConn) { + if err := runAndCheckStats(t, true, nil, func(cc *grpc.ClientConn) { testC := testpb.NewTestServiceClient(cc) // The first non-failfast RPC succeeds, all connections are up. if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { @@ -1195,9 +1212,7 @@ func TestGRPCLBStatsUnaryDrop(t *testing.T) { for i := 0; i < countRPC-1; i++ { testC.EmptyCall(context.Background(), &testpb.Empty{}) } - }) - - if err := checkStats(stats, &rpcStats{ + }, &rpcStats{ numCallsStarted: int64(countRPC), numCallsFinished: int64(countRPC), numCallsFinishedKnownReceived: int64(countRPC) / 2, @@ -1209,7 +1224,7 @@ func TestGRPCLBStatsUnaryDrop(t *testing.T) { func TestGRPCLBStatsUnaryFailedToSend(t *testing.T) { defer leakcheck.Check(t) - stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) { + if err := runAndCheckStats(t, false, nil, func(cc *grpc.ClientConn) { testC := testpb.NewTestServiceClient(cc) // The first non-failfast RPC succeeds, all connections are up. 
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { @@ -1218,9 +1233,7 @@ func TestGRPCLBStatsUnaryFailedToSend(t *testing.T) { for i := 0; i < countRPC-1; i++ { cc.Invoke(context.Background(), failtosendURI, &testpb.Empty{}, nil) } - }) - - if err := checkStats(stats, &rpcStats{ + }, &rpcStats{ numCallsStarted: int64(countRPC)*2 - 1, numCallsFinished: int64(countRPC)*2 - 1, numCallsFinishedWithClientFailedToSend: int64(countRPC-1) * 2, @@ -1232,7 +1245,7 @@ func TestGRPCLBStatsUnaryFailedToSend(t *testing.T) { func TestGRPCLBStatsStreamingSuccess(t *testing.T) { defer leakcheck.Check(t) - stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) { + if err := runAndCheckStats(t, false, nil, func(cc *grpc.ClientConn) { testC := testpb.NewTestServiceClient(cc) // The first non-failfast RPC succeeds, all connections are up. stream, err := testC.FullDuplexCall(context.Background(), grpc.WaitForReady(true)) @@ -1255,9 +1268,7 @@ func TestGRPCLBStatsStreamingSuccess(t *testing.T) { } } } - }) - - if err := checkStats(stats, &rpcStats{ + }, &rpcStats{ numCallsStarted: int64(countRPC), numCallsFinished: int64(countRPC), numCallsFinishedKnownReceived: int64(countRPC), @@ -1268,7 +1279,7 @@ func TestGRPCLBStatsStreamingSuccess(t *testing.T) { func TestGRPCLBStatsStreamingDrop(t *testing.T) { defer leakcheck.Check(t) - stats := runAndGetStats(t, true, func(cc *grpc.ClientConn) { + if err := runAndCheckStats(t, true, nil, func(cc *grpc.ClientConn) { testC := testpb.NewTestServiceClient(cc) // The first non-failfast RPC succeeds, all connections are up. 
stream, err := testC.FullDuplexCall(context.Background(), grpc.WaitForReady(true)) @@ -1291,9 +1302,7 @@ func TestGRPCLBStatsStreamingDrop(t *testing.T) { } } } - }) - - if err := checkStats(stats, &rpcStats{ + }, &rpcStats{ numCallsStarted: int64(countRPC), numCallsFinished: int64(countRPC), numCallsFinishedKnownReceived: int64(countRPC) / 2, @@ -1305,7 +1314,7 @@ func TestGRPCLBStatsStreamingDrop(t *testing.T) { func TestGRPCLBStatsStreamingFailedToSend(t *testing.T) { defer leakcheck.Check(t) - stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) { + if err := runAndCheckStats(t, false, nil, func(cc *grpc.ClientConn) { testC := testpb.NewTestServiceClient(cc) // The first non-failfast RPC succeeds, all connections are up. stream, err := testC.FullDuplexCall(context.Background(), grpc.WaitForReady(true)) @@ -1320,9 +1329,7 @@ func TestGRPCLBStatsStreamingFailedToSend(t *testing.T) { for i := 0; i < countRPC-1; i++ { cc.NewStream(context.Background(), &grpc.StreamDesc{}, failtosendURI) } - }) - - if err := checkStats(stats, &rpcStats{ + }, &rpcStats{ numCallsStarted: int64(countRPC)*2 - 1, numCallsFinished: int64(countRPC)*2 - 1, numCallsFinishedWithClientFailedToSend: int64(countRPC-1) * 2, @@ -1331,3 +1338,40 @@ func TestGRPCLBStatsStreamingFailedToSend(t *testing.T) { t.Fatal(err) } } + +func TestGRPCLBStatsQuashEmpty(t *testing.T) { + defer leakcheck.Check(t) + ch := make(chan *lbpb.ClientStats) + defer close(ch) + if err := runAndCheckStats(t, false, ch, func(cc *grpc.ClientConn) { + // Perform no RPCs; wait for load reports to start, which should be + // zero, then expect no other load report within 5x the update + // interval. 
+ select { + case st := <-ch: + if !isZeroStats(st) { + t.Errorf("got stats %v; want all zero", st) + } + case <-time.After(5 * time.Second): + t.Errorf("did not get initial stats report after 5 seconds") + return + } + + select { + case st := <-ch: + t.Errorf("got unexpected stats report: %v", st) + case <-time.After(500 * time.Millisecond): + // Success. + } + go func() { + for range ch { // Drain statsChan until it is closed. + } + }() + }, &rpcStats{ + numCallsStarted: 0, + numCallsFinished: 0, + numCallsFinishedKnownReceived: 0, + }); err != nil { + t.Fatal(err) + } +}