diff --git a/health/grpc_health/health.proto b/health/grpc_health/health.proto deleted file mode 100644 index 40ae67d9..00000000 --- a/health/grpc_health/health.proto +++ /dev/null @@ -1,13 +0,0 @@ -syntax = "proto3"; - -package grpc.health; - -message HealthCheckRequest{ -} - -message HealthCheckResponse{ -} - -service HealthCheck{ - rpc Check( HealthCheckRequest) returns ( HealthCheckResponse); -} diff --git a/health/grpc_health/health.pb.go b/health/grpc_health_v1alpha/health.pb.go similarity index 64% rename from health/grpc_health/health.pb.go rename to health/grpc_health_v1alpha/health.pb.go index 068597f8..14356f27 100644 --- a/health/grpc_health/health.pb.go +++ b/health/grpc_health_v1alpha/health.pb.go @@ -3,7 +3,7 @@ // DO NOT EDIT! /* -Package grpc_health is a generated protocol buffer package. +Package grpc_health_v1alpha is a generated protocol buffer package. It is generated from these files: health.proto @@ -12,7 +12,7 @@ It has these top-level messages: HealthCheckRequest HealthCheckResponse */ -package grpc_health +package grpc_health_v1alpha import proto "github.com/golang/protobuf/proto" @@ -28,7 +28,32 @@ var _ grpc.ClientConn // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal +type HealthCheckResponse_ServingStatus int32 + +const ( + HealthCheckResponse_UNKNOWN HealthCheckResponse_ServingStatus = 0 + HealthCheckResponse_SERVING HealthCheckResponse_ServingStatus = 1 + HealthCheckResponse_NOT_SERVING HealthCheckResponse_ServingStatus = 2 +) + +var HealthCheckResponse_ServingStatus_name = map[int32]string{ + 0: "UNKNOWN", + 1: "SERVING", + 2: "NOT_SERVING", +} +var HealthCheckResponse_ServingStatus_value = map[string]int32{ + "UNKNOWN": 0, + "SERVING": 1, + "NOT_SERVING": 2, +} + +func (x HealthCheckResponse_ServingStatus) String() string { + return proto.EnumName(HealthCheckResponse_ServingStatus_name, int32(x)) +} + type HealthCheckRequest struct { + Host string `protobuf:"bytes,1,opt,name=host" json:"host,omitempty"` + Service string `protobuf:"bytes,2,opt,name=service" json:"service,omitempty"` } func (m *HealthCheckRequest) Reset() { *m = HealthCheckRequest{} } @@ -36,6 +61,7 @@ func (m *HealthCheckRequest) String() string { return proto.CompactTextString(m) func (*HealthCheckRequest) ProtoMessage() {} type HealthCheckResponse struct { + Status HealthCheckResponse_ServingStatus `protobuf:"varint,1,opt,name=status,enum=grpc.health.v1alpha.HealthCheckResponse_ServingStatus" json:"status,omitempty"` } func (m *HealthCheckResponse) Reset() { *m = HealthCheckResponse{} } @@ -43,6 +69,7 @@ func (m *HealthCheckResponse) String() string { return proto.CompactTextString(m func (*HealthCheckResponse) ProtoMessage() {} func init() { + proto.RegisterEnum("grpc.health.v1alpha.HealthCheckResponse_ServingStatus", HealthCheckResponse_ServingStatus_name, HealthCheckResponse_ServingStatus_value) } // Client API for HealthCheck service @@ -61,7 +88,7 @@ func NewHealthCheckClient(cc *grpc.ClientConn) HealthCheckClient { func (c *healthCheckClient) Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) { out := new(HealthCheckResponse) - err := grpc.Invoke(ctx, "/grpc.health.HealthCheck/Check", in, out, c.cc, opts...) + err := grpc.Invoke(ctx, "/grpc.health.v1alpha.HealthCheck/Check", in, out, c.cc, opts...) if err != nil { return nil, err } @@ -91,7 +118,7 @@ func _HealthCheck_Check_Handler(srv interface{}, ctx context.Context, codec grpc } var _HealthCheck_serviceDesc = grpc.ServiceDesc{ - ServiceName: "grpc.health.HealthCheck", + ServiceName: "grpc.health.v1alpha.HealthCheck", HandlerType: (*HealthCheckServer)(nil), Methods: []grpc.MethodDesc{ { diff --git a/health/grpc_health_v1alpha/health.proto b/health/grpc_health_v1alpha/health.proto new file mode 100644 index 00000000..747a4f03 --- /dev/null +++ b/health/grpc_health_v1alpha/health.proto @@ -0,0 +1,21 @@ +syntax = "proto3"; + +package grpc.health.v1alpha; + +message HealthCheckRequest { + string host = 1; + string service = 2; +} + +message HealthCheckResponse { + enum ServingStatus { + UNKNOWN = 0; + SERVING = 1; + NOT_SERVING = 2; + } + ServingStatus status = 1; +} + +service HealthCheck{ + rpc Check(HealthCheckRequest) returns (HealthCheckResponse); +} diff --git a/health/health.go b/health/health.go index 3089f3e4..e7b63853 100644 --- a/health/health.go +++ b/health/health.go @@ -3,26 +3,43 @@ package health import ( - "time" + "sync" "golang.org/x/net/context" "google.golang.org/grpc" - healthpb "google.golang.org/grpc/health/grpc_health" + "google.golang.org/grpc/codes" + healthpb "google.golang.org/grpc/health/grpc_health_v1alpha" ) -// HealthCheck is the client side function to health-check a server -func HealthCheck(t time.Duration, cc *grpc.ClientConn) error { - ctx, _ := context.WithTimeout(context.Background(), t) - hc := healthpb.NewHealthCheckClient(cc) - req := new(healthpb.HealthCheckRequest) - _, err := hc.Check(ctx, req) - return err +type HealthServer struct { + mu sync.Mutex + // statusMap stores the serving status of the services this HealthServer monitors. + statusMap map[string]healthpb.HealthCheckResponse_ServingStatus } -type HealthServer struct { +func NewHealthServer() *HealthServer { + return &HealthServer{ + statusMap: make(map[string]healthpb.HealthCheckResponse_ServingStatus), + } } func (s *HealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) { - out := new(healthpb.HealthCheckResponse) - return out, nil + service := in.Host + ":" + in.Service + s.mu.Lock() + defer s.mu.Unlock() + if status, ok := s.statusMap[service]; ok { + return &healthpb.HealthCheckResponse{ + Status: status, + }, nil + } + return nil, grpc.Errorf(codes.NotFound, "unknown service") +} + +// SetServingStatus is called when need to reset the serving status of a service +// or insert a new service entry into the statusMap. +func (s *HealthServer) SetServingStatus(host string, service string, status healthpb.HealthCheckResponse_ServingStatus) { + service = host + ":" + service + s.mu.Lock() + s.statusMap[service] = status + s.mu.Unlock() } diff --git a/test/end2end_test.go b/test/end2end_test.go index b00a487f..8430ed28 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -52,7 +52,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/health" - healthpb "google.golang.org/grpc/health/grpc_health" + healthpb "google.golang.org/grpc/health/grpc_health_v1alpha" "google.golang.org/grpc/metadata" testpb "google.golang.org/grpc/test/grpc_testing" ) @@ -285,7 +285,7 @@ func listTestEnv() []env { return []env{env{"tcp", nil, ""}, env{"tcp", nil, "tls"}, env{"unix", unixDialer, ""}, env{"unix", unixDialer, "tls"}} } -func setUp(healthCheck bool, maxStream uint32, e env) (s *grpc.Server, cc *grpc.ClientConn) { +func setUp(hs *health.HealthServer, maxStream uint32, e env) (s *grpc.Server, cc *grpc.ClientConn) { sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(maxStream)} la := ":0" switch e.network { @@ -305,8 +305,8 @@ func setUp(healthCheck bool, maxStream uint32, e env) (s *grpc.Server, cc *grpc. sopts = append(sopts, grpc.Creds(creds)) } s = grpc.NewServer(sopts...) - if healthCheck { - healthpb.RegisterHealthCheckServer(s, &health.HealthServer{}) + if hs != nil { + healthpb.RegisterHealthCheckServer(s, hs) } testpb.RegisterTestServiceServer(s, &testServer{}) go s.Serve(lis) @@ -347,7 +347,7 @@ func TestTimeoutOnDeadServer(t *testing.T) { } func testTimeoutOnDeadServer(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) s.Stop() // Set -1 as the timeout to make sure if transportMonitor gets error @@ -360,6 +360,16 @@ func testTimeoutOnDeadServer(t *testing.T, e env) { cc.Close() } +func healthCheck(t time.Duration, cc *grpc.ClientConn, serviceName string) (*healthpb.HealthCheckResponse, error) { + ctx, _ := context.WithTimeout(context.Background(), t) + hc := healthpb.NewHealthCheckClient(cc) + req := &healthpb.HealthCheckRequest{ + Host: "", + Service: serviceName, + } + return hc.Check(ctx, req) +} + func TestHealthCheckOnSuccess(t *testing.T) { for _, e := range listTestEnv() { testHealthCheckOnSuccess(t, e) @@ -367,10 +377,12 @@ func TestHealthCheckOnSuccess(t *testing.T) { } func testHealthCheckOnSuccess(t *testing.T, e env) { - s, cc := setUp(true, math.MaxUint32, e) + hs := health.NewHealthServer() + hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 1) + s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) - if err := health.HealthCheck(1*time.Second, cc); err != nil { - t.Fatalf("HealthCheck(_)=_, %v, want ", err) + if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != nil { + t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, ", err) } } @@ -381,10 +393,12 @@ func TestHealthCheckOnFailure(t *testing.T) { } func testHealthCheckOnFailure(t *testing.T, e env) { - s, cc := setUp(true, math.MaxUint32, e) + hs := health.NewHealthServer() + hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 1) + s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) - if err := health.HealthCheck(0*time.Second, cc); err != grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded") { - t.Fatalf("HealthCheck(_)=_, %v, want error code %d", err, codes.DeadlineExceeded) + if _, err := healthCheck(0*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded") { + t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, error code %d", err, codes.DeadlineExceeded) } } @@ -395,14 +409,45 @@ func TestHealthCheckOff(t *testing.T) { } func testHealthCheckOff(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) defer tearDown(s, cc) - err := health.HealthCheck(1*time.Second, cc) - if err != grpc.Errorf(codes.Unimplemented, "unknown service grpc.health.HealthCheck") { - t.Fatalf("HealthCheck(_)=_, %v, want error code %d", err, codes.DeadlineExceeded) + if _, err := healthCheck(1*time.Second, cc, ""); err != grpc.Errorf(codes.Unimplemented, "unknown service grpc.health.v1alpha.HealthCheck") { + t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, error code %d", err, codes.Unimplemented) } } +func TestHealthCheckServingStatus(t *testing.T) { + for _, e := range listTestEnv() { + testHealthCheckServingStatus(t, e) + } +} + +func testHealthCheckServingStatus(t *testing.T, e env) { + hs := health.NewHealthServer() + s, cc := setUp(hs, math.MaxUint32, e) + defer tearDown(s, cc) + if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != grpc.Errorf(codes.NotFound, "unknown service") { + t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, error code %d", err, codes.NotFound) + } + hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", healthpb.HealthCheckResponse_SERVING) + out, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") + if err != nil { + t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, ", err) + } + if out.Status != healthpb.HealthCheckResponse_SERVING { + t.Fatalf("Got the serving status %v, want SERVING", out.Status) + } + hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", healthpb.HealthCheckResponse_NOT_SERVING) + out, err = healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") + if err != nil { + t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, ", err) + } + if out.Status != healthpb.HealthCheckResponse_NOT_SERVING { + t.Fatalf("Got the serving status %v, want NOT_SERVING", out.Status) + } + +} + func TestEmptyUnary(t *testing.T) { for _, e := range listTestEnv() { testEmptyUnary(t, e) @@ -410,7 +455,7 @@ func TestEmptyUnary(t *testing.T) { } func testEmptyUnary(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}) @@ -426,7 +471,7 @@ func TestFailedEmptyUnary(t *testing.T) { } func testFailedEmptyUnary(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) ctx := metadata.NewContext(context.Background(), testMetadata) @@ -442,7 +487,7 @@ func TestLargeUnary(t *testing.T) { } func testLargeUnary(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) argSize := 271828 @@ -470,7 +515,7 @@ func TestMetadataUnaryRPC(t *testing.T) { } func testMetadataUnaryRPC(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) argSize := 2718 @@ -524,7 +569,7 @@ func TestRetry(t *testing.T) { // TODO(zhaoq): Refactor to make this clearer and add more cases to test racy // and error-prone paths. func testRetry(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) var wg sync.WaitGroup @@ -554,7 +599,7 @@ func TestRPCTimeout(t *testing.T) { // TODO(zhaoq): Have a better test coverage of timeout and cancellation mechanism. func testRPCTimeout(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) argSize := 2718 @@ -580,7 +625,7 @@ func TestCancel(t *testing.T) { } func testCancel(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) argSize := 2718 @@ -612,7 +657,7 @@ func TestPingPong(t *testing.T) { } func testPingPong(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) stream, err := tc.FullDuplexCall(context.Background()) @@ -663,7 +708,7 @@ func TestMetadataStreamingRPC(t *testing.T) { } func testMetadataStreamingRPC(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) ctx := metadata.NewContext(context.Background(), testMetadata) @@ -720,7 +765,7 @@ func TestServerStreaming(t *testing.T) { } func testServerStreaming(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) respParam := make([]*testpb.ResponseParameters, len(respSizes)) @@ -772,7 +817,7 @@ func TestFailedServerStreaming(t *testing.T) { } func testFailedServerStreaming(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) respParam := make([]*testpb.ResponseParameters, len(respSizes)) @@ -802,7 +847,7 @@ func TestClientStreaming(t *testing.T) { } func testClientStreaming(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) stream, err := tc.StreamingInputCall(context.Background()) @@ -837,7 +882,7 @@ func TestExceedMaxStreamsLimit(t *testing.T) { func testExceedMaxStreamsLimit(t *testing.T, e env) { // Only allows 1 live stream per server transport. - s, cc := setUp(false, 1, e) + s, cc := setUp(nil, 1, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) // Perform a unary RPC to make sure the new settings were propagated to the client.