Health server watch method (#2365)

Implements the Watch method of Health server and it's end2end tests.
This commit is contained in:
Can Guler
2018-10-16 14:39:16 -07:00
committed by GitHub
parent cfb9600851
commit 23d111fa0d
2 changed files with 260 additions and 10 deletions

View File

@ -36,12 +36,14 @@ type Server struct {
mu sync.Mutex mu sync.Mutex
// statusMap stores the serving status of the services this Server monitors. // statusMap stores the serving status of the services this Server monitors.
statusMap map[string]healthpb.HealthCheckResponse_ServingStatus statusMap map[string]healthpb.HealthCheckResponse_ServingStatus
updates map[string]map[healthpb.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus
} }
// NewServer returns a new Server. // NewServer returns a new Server.
func NewServer() *Server { func NewServer() *Server {
return &Server{ return &Server{
statusMap: make(map[string]healthpb.HealthCheckResponse_ServingStatus), statusMap: map[string]healthpb.HealthCheckResponse_ServingStatus{"": healthpb.HealthCheckResponse_SERVING},
updates: make(map[string]map[healthpb.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus),
} }
} }
@ -49,12 +51,6 @@ func NewServer() *Server {
func (s *Server) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) { func (s *Server) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
if in.Service == "" {
// check the server overall health status.
return &healthpb.HealthCheckResponse{
Status: healthpb.HealthCheckResponse_SERVING,
}, nil
}
if servingStatus, ok := s.statusMap[in.Service]; ok { if servingStatus, ok := s.statusMap[in.Service]; ok {
return &healthpb.HealthCheckResponse{ return &healthpb.HealthCheckResponse{
Status: servingStatus, Status: servingStatus,
@ -65,13 +61,57 @@ func (s *Server) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*h
// Watch implements `service Health`. // Watch implements `service Health`.
func (s *Server) Watch(in *healthpb.HealthCheckRequest, stream healthpb.Health_WatchServer) error { func (s *Server) Watch(in *healthpb.HealthCheckRequest, stream healthpb.Health_WatchServer) error {
return status.Error(codes.Unimplemented, "Watching is not supported") service := in.Service
// update channel is used for getting service status updates.
update := make(chan healthpb.HealthCheckResponse_ServingStatus, 1)
s.mu.Lock()
// Puts the initial status to the channel.
if servingStatus, ok := s.statusMap[service]; ok {
update <- servingStatus
} else {
update <- healthpb.HealthCheckResponse_SERVICE_UNKNOWN
}
// Registers the update channel to the correct place in the updates map.
if _, ok := s.updates[service]; !ok {
s.updates[service] = make(map[healthpb.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus)
}
s.updates[service][stream] = update
defer func() {
s.mu.Lock()
delete(s.updates[service], stream)
s.mu.Unlock()
}()
s.mu.Unlock()
for {
select {
// Status updated. Sends the up-to-date status to the client.
case servingStatus := <-update:
err := stream.Send(&healthpb.HealthCheckResponse{Status: servingStatus})
if err != nil {
return status.Error(codes.Canceled, "Stream has ended.")
}
// Context done. Removes the update channel from the updates map.
case <-stream.Context().Done():
return status.Error(codes.Canceled, "Stream has ended.")
}
}
} }
// SetServingStatus is called when need to reset the serving status of a service // SetServingStatus is called when need to reset the serving status of a service
// or insert a new service entry into the statusMap. // or insert a new service entry into the statusMap.
func (s *Server) SetServingStatus(service string, status healthpb.HealthCheckResponse_ServingStatus) { func (s *Server) SetServingStatus(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) {
s.mu.Lock() s.mu.Lock()
s.statusMap[service] = status s.statusMap[service] = servingStatus
for _, update := range s.updates[service] {
// Clears previous updates, that are not sent to the client, from the channel.
// This can happen if the client is not reading and the server gets flow control limited.
select {
case <-update:
default:
}
// Puts the most recent update to the channel.
update <- servingStatus
}
s.mu.Unlock() s.mu.Unlock()
} }

View File

@ -2432,6 +2432,216 @@ func testHealthCheckOff(t *testing.T, e env) {
} }
} }
func TestHealthWatchMultipleClients(t *testing.T) {
defer leakcheck.Check(t)
for _, e := range listTestEnv() {
testHealthWatchMultipleClients(t, e)
}
}
func testHealthWatchMultipleClients(t *testing.T, e env) {
const service = "grpc.health.v1.Health1"
hs := health.NewServer()
te := newTest(t, e)
te.healthServer = hs
te.startServer(&testServer{security: e.security})
defer te.tearDown()
cc := te.clientConn()
hc := healthgrpc.NewHealthClient(cc)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
req := &healthpb.HealthCheckRequest{
Service: service,
}
stream1, err := hc.Watch(ctx, req)
if err != nil {
t.Fatalf("error: %v", err)
}
healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
stream2, err := hc.Watch(ctx, req)
if err != nil {
t.Fatalf("error: %v", err)
}
healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
hs.SetServingStatus(service, healthpb.HealthCheckResponse_NOT_SERVING)
healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_NOT_SERVING)
healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_NOT_SERVING)
}
func TestHealthWatchServiceStatusSetBeforeStartingServer(t *testing.T) {
defer leakcheck.Check(t)
for _, e := range listTestEnv() {
testHealthWatchSetServiceStatusBeforeStartingServer(t, e)
}
}
func testHealthWatchSetServiceStatusBeforeStartingServer(t *testing.T, e env) {
const service = "grpc.health.v1.Health1"
hs := health.NewServer()
te := newTest(t, e)
te.healthServer = hs
hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING)
te.startServer(&testServer{security: e.security})
defer te.tearDown()
cc := te.clientConn()
hc := healthgrpc.NewHealthClient(cc)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
req := &healthpb.HealthCheckRequest{
Service: service,
}
stream, err := hc.Watch(ctx, req)
if err != nil {
t.Fatalf("error: %v", err)
}
healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
}
func TestHealthWatchDefaultStatusChange(t *testing.T) {
defer leakcheck.Check(t)
for _, e := range listTestEnv() {
testHealthWatchDefaultStatusChange(t, e)
}
}
func testHealthWatchDefaultStatusChange(t *testing.T, e env) {
const service = "grpc.health.v1.Health1"
hs := health.NewServer()
te := newTest(t, e)
te.healthServer = hs
te.startServer(&testServer{security: e.security})
defer te.tearDown()
cc := te.clientConn()
hc := healthgrpc.NewHealthClient(cc)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
req := &healthpb.HealthCheckRequest{
Service: service,
}
stream, err := hc.Watch(ctx, req)
if err != nil {
t.Fatalf("error: %v", err)
}
healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING)
healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
}
func TestHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T) {
defer leakcheck.Check(t)
for _, e := range listTestEnv() {
testHealthWatchSetServiceStatusBeforeClientCallsWatch(t, e)
}
}
func testHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T, e env) {
const service = "grpc.health.v1.Health1"
hs := health.NewServer()
te := newTest(t, e)
te.healthServer = hs
te.startServer(&testServer{security: e.security})
defer te.tearDown()
cc := te.clientConn()
hc := healthgrpc.NewHealthClient(cc)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
req := &healthpb.HealthCheckRequest{
Service: service,
}
hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING)
stream, err := hc.Watch(ctx, req)
if err != nil {
t.Fatalf("error: %v", err)
}
healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
}
func TestHealthWatchOverallServerHealthChange(t *testing.T) {
defer leakcheck.Check(t)
for _, e := range listTestEnv() {
testHealthWatchOverallServerHealthChange(t, e)
}
}
func testHealthWatchOverallServerHealthChange(t *testing.T, e env) {
const service = ""
hs := health.NewServer()
te := newTest(t, e)
te.healthServer = hs
te.startServer(&testServer{security: e.security})
defer te.tearDown()
cc := te.clientConn()
hc := healthgrpc.NewHealthClient(cc)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
req := &healthpb.HealthCheckRequest{
Service: service,
}
stream, err := hc.Watch(ctx, req)
if err != nil {
t.Fatalf("error: %v", err)
}
healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
hs.SetServingStatus(service, healthpb.HealthCheckResponse_NOT_SERVING)
healthWatchChecker(t, stream, healthpb.HealthCheckResponse_NOT_SERVING)
}
func healthWatchChecker(t *testing.T, stream healthpb.Health_WatchClient, expectedServingStatus healthpb.HealthCheckResponse_ServingStatus) {
response, err := stream.Recv()
if err != nil {
t.Fatalf("error on %v.Recv(): %v", stream, err)
}
if response.Status != expectedServingStatus {
t.Fatalf("response.Status is %v (%v expected)", response.Status, expectedServingStatus)
}
}
func TestUnknownHandler(t *testing.T) { func TestUnknownHandler(t *testing.T) {
defer leakcheck.Check(t) defer leakcheck.Check(t)
// An example unknownHandler that returns a different code and a different method, making sure that we do not // An example unknownHandler that returns a different code and a different method, making sure that we do not