From 23d111fa0dab0e9b31490819a23c2ef227f4b3dc Mon Sep 17 00:00:00 2001 From: Can Guler Date: Tue, 16 Oct 2018 14:39:16 -0700 Subject: [PATCH] Health server watch method (#2365) Implements the Watch method of Health server and its end2end tests. --- health/health.go | 60 ++++++++++--- test/end2end_test.go | 210 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 260 insertions(+), 10 deletions(-) diff --git a/health/health.go b/health/health.go index 6a349b2f..10666f2d 100644 --- a/health/health.go +++ b/health/health.go @@ -36,12 +36,14 @@ type Server struct { mu sync.Mutex // statusMap stores the serving status of the services this Server monitors. statusMap map[string]healthpb.HealthCheckResponse_ServingStatus + updates map[string]map[healthpb.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus } // NewServer returns a new Server. func NewServer() *Server { return &Server{ - statusMap: make(map[string]healthpb.HealthCheckResponse_ServingStatus), + statusMap: map[string]healthpb.HealthCheckResponse_ServingStatus{"": healthpb.HealthCheckResponse_SERVING}, + updates: make(map[string]map[healthpb.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus), } } @@ -49,12 +51,6 @@ func NewServer() *Server { func (s *Server) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) { s.mu.Lock() defer s.mu.Unlock() - if in.Service == "" { - // check the server overall health status. - return &healthpb.HealthCheckResponse{ - Status: healthpb.HealthCheckResponse_SERVING, - }, nil - } if servingStatus, ok := s.statusMap[in.Service]; ok { return &healthpb.HealthCheckResponse{ Status: servingStatus, @@ -65,13 +61,57 @@ func (s *Server) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*h // Watch implements `service Health`. 
func (s *Server) Watch(in *healthpb.HealthCheckRequest, stream healthpb.Health_WatchServer) error { - return status.Error(codes.Unimplemented, "Watching is not supported") + service := in.Service + // update channel is used for getting service status updates. + update := make(chan healthpb.HealthCheckResponse_ServingStatus, 1) + s.mu.Lock() + // Puts the initial status to the channel. + if servingStatus, ok := s.statusMap[service]; ok { + update <- servingStatus + } else { + update <- healthpb.HealthCheckResponse_SERVICE_UNKNOWN + } + + // Registers the update channel to the correct place in the updates map. + if _, ok := s.updates[service]; !ok { + s.updates[service] = make(map[healthpb.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus) + } + s.updates[service][stream] = update + defer func() { + s.mu.Lock() + delete(s.updates[service], stream) + s.mu.Unlock() + }() + s.mu.Unlock() + for { + select { + // Status updated. Sends the up-to-date status to the client. + case servingStatus := <-update: + err := stream.Send(&healthpb.HealthCheckResponse{Status: servingStatus}) + if err != nil { + return status.Error(codes.Canceled, "Stream has ended.") + } + // Context done. Removes the update channel from the updates map. + case <-stream.Context().Done(): + return status.Error(codes.Canceled, "Stream has ended.") + } + } } // SetServingStatus is called when need to reset the serving status of a service // or insert a new service entry into the statusMap. -func (s *Server) SetServingStatus(service string, status healthpb.HealthCheckResponse_ServingStatus) { +func (s *Server) SetServingStatus(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) { s.mu.Lock() - s.statusMap[service] = status + s.statusMap[service] = servingStatus + for _, update := range s.updates[service] { + // Clears previous updates, that are not sent to the client, from the channel. 
+ // This can happen if the client is not reading and the server gets flow control limited. + select { + case <-update: + default: + } + // Puts the most recent update to the channel. + update <- servingStatus + } s.mu.Unlock() } diff --git a/test/end2end_test.go b/test/end2end_test.go index 22741ab5..11b3bc8b 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -2432,6 +2432,216 @@ func testHealthCheckOff(t *testing.T, e env) { } } +func TestHealthWatchMultipleClients(t *testing.T) { + defer leakcheck.Check(t) + for _, e := range listTestEnv() { + testHealthWatchMultipleClients(t, e) + } +} + +func testHealthWatchMultipleClients(t *testing.T, e env) { + const service = "grpc.health.v1.Health1" + + hs := health.NewServer() + + te := newTest(t, e) + te.healthServer = hs + te.startServer(&testServer{security: e.security}) + defer te.tearDown() + + cc := te.clientConn() + hc := healthgrpc.NewHealthClient(cc) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + req := &healthpb.HealthCheckRequest{ + Service: service, + } + + stream1, err := hc.Watch(ctx, req) + if err != nil { + t.Fatalf("error: %v", err) + } + + healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_SERVICE_UNKNOWN) + + stream2, err := hc.Watch(ctx, req) + if err != nil { + t.Fatalf("error: %v", err) + } + + healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_SERVICE_UNKNOWN) + + hs.SetServingStatus(service, healthpb.HealthCheckResponse_NOT_SERVING) + + healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_NOT_SERVING) + healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_NOT_SERVING) +} + +func TestHealthWatchServiceStatusSetBeforeStartingServer(t *testing.T) { + defer leakcheck.Check(t) + for _, e := range listTestEnv() { + testHealthWatchSetServiceStatusBeforeStartingServer(t, e) + } +} + +func testHealthWatchSetServiceStatusBeforeStartingServer(t *testing.T, e env) { + const service = "grpc.health.v1.Health1" + + hs := 
health.NewServer() + + te := newTest(t, e) + te.healthServer = hs + + hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING) + + te.startServer(&testServer{security: e.security}) + defer te.tearDown() + + cc := te.clientConn() + hc := healthgrpc.NewHealthClient(cc) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + req := &healthpb.HealthCheckRequest{ + Service: service, + } + + stream, err := hc.Watch(ctx, req) + if err != nil { + t.Fatalf("error: %v", err) + } + + healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING) +} + +func TestHealthWatchDefaultStatusChange(t *testing.T) { + defer leakcheck.Check(t) + for _, e := range listTestEnv() { + testHealthWatchDefaultStatusChange(t, e) + } +} + +func testHealthWatchDefaultStatusChange(t *testing.T, e env) { + const service = "grpc.health.v1.Health1" + + hs := health.NewServer() + + te := newTest(t, e) + te.healthServer = hs + te.startServer(&testServer{security: e.security}) + defer te.tearDown() + + cc := te.clientConn() + hc := healthgrpc.NewHealthClient(cc) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + req := &healthpb.HealthCheckRequest{ + Service: service, + } + + stream, err := hc.Watch(ctx, req) + if err != nil { + t.Fatalf("error: %v", err) + } + + healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVICE_UNKNOWN) + + hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING) + + healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING) +} + +func TestHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T) { + defer leakcheck.Check(t) + for _, e := range listTestEnv() { + testHealthWatchSetServiceStatusBeforeClientCallsWatch(t, e) + } +} + +func testHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T, e env) { + const service = "grpc.health.v1.Health1" + + hs := health.NewServer() + + te := newTest(t, e) + te.healthServer = hs + te.startServer(&testServer{security: 
e.security}) + defer te.tearDown() + + cc := te.clientConn() + hc := healthgrpc.NewHealthClient(cc) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + req := &healthpb.HealthCheckRequest{ + Service: service, + } + + hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING) + + stream, err := hc.Watch(ctx, req) + if err != nil { + t.Fatalf("error: %v", err) + } + + healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING) +} + +func TestHealthWatchOverallServerHealthChange(t *testing.T) { + defer leakcheck.Check(t) + for _, e := range listTestEnv() { + testHealthWatchOverallServerHealthChange(t, e) + } +} + +func testHealthWatchOverallServerHealthChange(t *testing.T, e env) { + const service = "" + + hs := health.NewServer() + + te := newTest(t, e) + te.healthServer = hs + te.startServer(&testServer{security: e.security}) + defer te.tearDown() + + cc := te.clientConn() + hc := healthgrpc.NewHealthClient(cc) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + req := &healthpb.HealthCheckRequest{ + Service: service, + } + + stream, err := hc.Watch(ctx, req) + if err != nil { + t.Fatalf("error: %v", err) + } + + healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING) + + hs.SetServingStatus(service, healthpb.HealthCheckResponse_NOT_SERVING) + + healthWatchChecker(t, stream, healthpb.HealthCheckResponse_NOT_SERVING) +} + +func healthWatchChecker(t *testing.T, stream healthpb.Health_WatchClient, expectedServingStatus healthpb.HealthCheckResponse_ServingStatus) { + response, err := stream.Recv() + if err != nil { + t.Fatalf("error on %v.Recv(): %v", stream, err) + } + if response.Status != expectedServingStatus { + t.Fatalf("response.Status is %v (%v expected)", response.Status, expectedServingStatus) + } +} + func TestUnknownHandler(t *testing.T) { defer leakcheck.Check(t) // An example unknownHandler that returns a different code and a different method, making sure that we do not