health: Checks if the service's serving status has changed before sending an update (#2424)

This commit is contained in:
Can Guler
2018-10-31 15:45:18 -07:00
committed by GitHub
parent 6572a802a1
commit c86bc5cd5f
2 changed files with 50 additions and 1 deletions

View File

@ -83,10 +83,16 @@ func (s *Server) Watch(in *healthpb.HealthCheckRequest, stream healthpb.Health_W
s.mu.Unlock() s.mu.Unlock()
}() }()
s.mu.Unlock() s.mu.Unlock()
var lastSentStatus healthpb.HealthCheckResponse_ServingStatus = -1
for { for {
select { select {
// Status updated. Sends the up-to-date status to the client. // Status updated. Sends the up-to-date status to the client.
case servingStatus := <-update: case servingStatus := <-update:
if lastSentStatus == servingStatus {
continue
}
lastSentStatus = servingStatus
err := stream.Send(&healthpb.HealthCheckResponse{Status: servingStatus}) err := stream.Send(&healthpb.HealthCheckResponse{Status: servingStatus})
if err != nil { if err != nil {
return status.Error(codes.Canceled, "Stream has ended.") return status.Error(codes.Canceled, "Stream has ended.")
@ -102,6 +108,8 @@ func (s *Server) Watch(in *healthpb.HealthCheckRequest, stream healthpb.Health_W
// or insert a new service entry into the statusMap. // or insert a new service entry into the statusMap.
func (s *Server) SetServingStatus(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) { func (s *Server) SetServingStatus(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock()
s.statusMap[service] = servingStatus s.statusMap[service] = servingStatus
for _, update := range s.updates[service] { for _, update := range s.updates[service] {
// Clears previous updates, that are not sent to the client, from the channel. // Clears previous updates, that are not sent to the client, from the channel.
@ -113,5 +121,4 @@ func (s *Server) SetServingStatus(service string, servingStatus healthpb.HealthC
// Puts the most recent update to the channel. // Puts the most recent update to the channel.
update <- servingStatus update <- servingStatus
} }
s.mu.Unlock()
} }

View File

@ -2481,6 +2481,48 @@ func testHealthWatchMultipleClients(t *testing.T, e env) {
healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_NOT_SERVING) healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_NOT_SERVING)
} }
func TestHealthWatchSameStatus(t *testing.T) {
defer leakcheck.Check(t)
for _, e := range listTestEnv() {
testHealthWatchSameStatus(t, e)
}
}
func testHealthWatchSameStatus(t *testing.T, e env) {
const service = "grpc.health.v1.Health1"
hs := health.NewServer()
te := newTest(t, e)
te.healthServer = hs
te.startServer(&testServer{security: e.security})
defer te.tearDown()
cc := te.clientConn()
hc := healthgrpc.NewHealthClient(cc)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
req := &healthpb.HealthCheckRequest{
Service: service,
}
stream1, err := hc.Watch(ctx, req)
if err != nil {
t.Fatalf("error: %v", err)
}
hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING)
healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_SERVING)
hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING)
hs.SetServingStatus(service, healthpb.HealthCheckResponse_NOT_SERVING)
healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_NOT_SERVING)
}
func TestHealthWatchServiceStatusSetBeforeStartingServer(t *testing.T) { func TestHealthWatchServiceStatusSetBeforeStartingServer(t *testing.T) {
defer leakcheck.Check(t) defer leakcheck.Check(t)
for _, e := range listTestEnv() { for _, e := range listTestEnv() {