diff --git a/health/server.go b/health/server.go index eebecccf..2afeb3b5 100644 --- a/health/server.go +++ b/health/server.go @@ -83,10 +83,16 @@ func (s *Server) Watch(in *healthpb.HealthCheckRequest, stream healthpb.Health_W s.mu.Unlock() }() s.mu.Unlock() + + var lastSentStatus healthpb.HealthCheckResponse_ServingStatus = -1 for { select { // Status updated. Sends the up-to-date status to the client. case servingStatus := <-update: + if lastSentStatus == servingStatus { + continue + } + lastSentStatus = servingStatus err := stream.Send(&healthpb.HealthCheckResponse{Status: servingStatus}) if err != nil { return status.Error(codes.Canceled, "Stream has ended.") @@ -102,6 +108,8 @@ func (s *Server) Watch(in *healthpb.HealthCheckRequest, stream healthpb.Health_W // or insert a new service entry into the statusMap. func (s *Server) SetServingStatus(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) { s.mu.Lock() + defer s.mu.Unlock() + s.statusMap[service] = servingStatus for _, update := range s.updates[service] { // Clears previous updates, that are not sent to the client, from the channel. @@ -113,5 +121,4 @@ func (s *Server) SetServingStatus(service string, servingStatus healthpb.HealthC // Puts the most recent update to the channel. update <- servingStatus } - s.mu.Unlock() } diff --git a/test/end2end_test.go b/test/end2end_test.go index 30d8a8c8..b8c2bee9 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -2481,6 +2481,48 @@ func testHealthWatchMultipleClients(t *testing.T, e env) { healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_NOT_SERVING) } +func TestHealthWatchSameStatus(t *testing.T) { + defer leakcheck.Check(t) + for _, e := range listTestEnv() { + testHealthWatchSameStatus(t, e) + } +} + +func testHealthWatchSameStatus(t *testing.T, e env) { + const service = "grpc.health.v1.Health1" + + hs := health.NewServer() + + te := newTest(t, e) + te.healthServer = hs + te.startServer(&testServer{security: e.security}) + defer te.tearDown() + + cc := te.clientConn() + hc := healthgrpc.NewHealthClient(cc) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + req := &healthpb.HealthCheckRequest{ + Service: service, + } + + stream1, err := hc.Watch(ctx, req) + if err != nil { + t.Fatalf("error: %v", err) + } + + hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING) + + healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_SERVING) + + hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING) + hs.SetServingStatus(service, healthpb.HealthCheckResponse_NOT_SERVING) + + healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_NOT_SERVING) +} + func TestHealthWatchServiceStatusSetBeforeStartingServer(t *testing.T) { defer leakcheck.Check(t) for _, e := range listTestEnv() {