diff --git a/health/grpc_health_v1alpha/health.pb.go b/health/grpc_health_v1alpha/health.pb.go new file mode 100644 index 00000000..14356f27 --- /dev/null +++ b/health/grpc_health_v1alpha/health.pb.go @@ -0,0 +1,130 @@ +// Code generated by protoc-gen-go. +// source: health.proto +// DO NOT EDIT! + +/* +Package grpc_health_v1alpha is a generated protocol buffer package. + +It is generated from these files: + health.proto + +It has these top-level messages: + HealthCheckRequest + HealthCheckResponse +*/ +package grpc_health_v1alpha + +import proto "github.com/golang/protobuf/proto" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal + +type HealthCheckResponse_ServingStatus int32 + +const ( + HealthCheckResponse_UNKNOWN HealthCheckResponse_ServingStatus = 0 + HealthCheckResponse_SERVING HealthCheckResponse_ServingStatus = 1 + HealthCheckResponse_NOT_SERVING HealthCheckResponse_ServingStatus = 2 +) + +var HealthCheckResponse_ServingStatus_name = map[int32]string{ + 0: "UNKNOWN", + 1: "SERVING", + 2: "NOT_SERVING", +} +var HealthCheckResponse_ServingStatus_value = map[string]int32{ + "UNKNOWN": 0, + "SERVING": 1, + "NOT_SERVING": 2, +} + +func (x HealthCheckResponse_ServingStatus) String() string { + return proto.EnumName(HealthCheckResponse_ServingStatus_name, int32(x)) +} + +type HealthCheckRequest struct { + Host string `protobuf:"bytes,1,opt,name=host" json:"host,omitempty"` + Service string `protobuf:"bytes,2,opt,name=service" json:"service,omitempty"` +} + +func (m *HealthCheckRequest) Reset() { *m = HealthCheckRequest{} } +func (m *HealthCheckRequest) String() string { return proto.CompactTextString(m) } +func (*HealthCheckRequest) ProtoMessage() {} + +type HealthCheckResponse struct { + Status HealthCheckResponse_ServingStatus `protobuf:"varint,1,opt,name=status,enum=grpc.health.v1alpha.HealthCheckResponse_ServingStatus" json:"status,omitempty"` +} + +func (m *HealthCheckResponse) Reset() { *m = HealthCheckResponse{} } +func (m *HealthCheckResponse) String() string { return proto.CompactTextString(m) } +func (*HealthCheckResponse) ProtoMessage() {} + +func init() { + proto.RegisterEnum("grpc.health.v1alpha.HealthCheckResponse_ServingStatus", HealthCheckResponse_ServingStatus_name, HealthCheckResponse_ServingStatus_value) +} + +// Client API for HealthCheck service + +type HealthCheckClient interface { + Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) +} + +type healthCheckClient struct { + cc *grpc.ClientConn +} + +func NewHealthCheckClient(cc *grpc.ClientConn) HealthCheckClient { + return &healthCheckClient{cc} +} + +func (c *healthCheckClient) Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) { + out := new(HealthCheckResponse) + err := grpc.Invoke(ctx, "/grpc.health.v1alpha.HealthCheck/Check", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for HealthCheck service + +type HealthCheckServer interface { + Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) +} + +func RegisterHealthCheckServer(s *grpc.Server, srv HealthCheckServer) { + s.RegisterService(&_HealthCheck_serviceDesc, srv) +} + +func _HealthCheck_Check_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { + in := new(HealthCheckRequest) + if err := codec.Unmarshal(buf, in); err != nil { + return nil, err + } + out, err := srv.(HealthCheckServer).Check(ctx, in) + if err != nil { + return nil, err + } + return out, nil +} + +var _HealthCheck_serviceDesc = grpc.ServiceDesc{ + ServiceName: "grpc.health.v1alpha.HealthCheck", + HandlerType: (*HealthCheckServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Check", + Handler: _HealthCheck_Check_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, +} diff --git a/health/grpc_health_v1alpha/health.proto b/health/grpc_health_v1alpha/health.proto new file mode 100644 index 00000000..f2c7efcf --- /dev/null +++ b/health/grpc_health_v1alpha/health.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; +package grpc.health.v1alpha; + +message HealthCheckRequest { + string host = 1; + string service = 2; + } + + message HealthCheckResponse { + enum ServingStatus { + UNKNOWN = 0; + SERVING = 1; + NOT_SERVING = 2; + } + ServingStatus status = 1; + } + +service HealthCheck{ + rpc Check( HealthCheckRequest) returns ( HealthCheckResponse); +} diff --git a/health/health.go b/health/health.go index 3089f3e4..66081d60 100644 --- a/health/health.go +++ b/health/health.go @@ -7,22 +7,39 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" - healthpb "google.golang.org/grpc/health/grpc_health" + "google.golang.org/grpc/codes" + healthpb "google.golang.org/grpc/health/grpc_health_v1alpha" ) // HealthCheck is the client side function to health-check a server -func HealthCheck(t time.Duration, cc *grpc.ClientConn) error { +func HealthCheck(t time.Duration, cc *grpc.ClientConn, service_name string) (*healthpb.HealthCheckResponse, error) { ctx, _ := context.WithTimeout(context.Background(), t) hc := healthpb.NewHealthCheckClient(cc) req := new(healthpb.HealthCheckRequest) - _, err := hc.Check(ctx, req) - return err + req.Host = "" + req.Service = service_name + out, err := hc.Check(ctx, req) + return out, err } type HealthServer struct { + StatusMap map[string]int32 } -func (s *HealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) { - out := new(healthpb.HealthCheckResponse) - return out, nil +func (s *HealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (out *healthpb.HealthCheckResponse, err error) { + service := ":" + in.Service + out = new(healthpb.HealthCheckResponse) + status, ok := s.StatusMap[service] + out.Status = healthpb.HealthCheckResponse_ServingStatus(status) + if !ok { + err = grpc.Errorf(codes.NotFound, "unknown service") + } else { + err = nil + } + return out, err +} + +func (s *HealthServer) SetServingStatus(service string, status int32) { + service = ":" + service + s.StatusMap[service] = status } diff --git a/test/end2end_test.go b/test/end2end_test.go index b00a487f..65dee899 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -52,7 +52,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/health" - healthpb "google.golang.org/grpc/health/grpc_health" + healthpb "google.golang.org/grpc/health/grpc_health_v1alpha" "google.golang.org/grpc/metadata" testpb "google.golang.org/grpc/test/grpc_testing" ) @@ -285,7 +285,7 @@ func listTestEnv() []env { return []env{env{"tcp", nil, ""}, env{"tcp", nil, "tls"}, env{"unix", unixDialer, ""}, env{"unix", unixDialer, "tls"}} } -func setUp(healthCheck bool, maxStream uint32, e env) (s *grpc.Server, cc *grpc.ClientConn) { +func setUp(hs *health.HealthServer, maxStream uint32, e env) (s *grpc.Server, cc *grpc.ClientConn) { sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(maxStream)} la := ":0" switch e.network { @@ -305,8 +305,8 @@ func setUp(healthCheck bool, maxStream uint32, e env) (s *grpc.Server, cc *grpc. sopts = append(sopts, grpc.Creds(creds)) } s = grpc.NewServer(sopts...) - if healthCheck { - healthpb.RegisterHealthCheckServer(s, &health.HealthServer{}) + if hs != nil { + healthpb.RegisterHealthCheckServer(s, hs) } testpb.RegisterTestServiceServer(s, &testServer{}) go s.Serve(lis) @@ -347,7 +347,7 @@ func TestTimeoutOnDeadServer(t *testing.T) { } func testTimeoutOnDeadServer(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) s.Stop() // Set -1 as the timeout to make sure if transportMonitor gets error @@ -367,9 +367,14 @@ func TestHealthCheckOnSuccess(t *testing.T) { } func testHealthCheckOnSuccess(t *testing.T, e env) { - s, cc := setUp(true, math.MaxUint32, e) + + hs := &health.HealthServer{ + StatusMap: make(map[string]int32), + } + hs.SetServingStatus("grpc.health.v1alpha.HealthCheck", 1) + s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) - if err := health.HealthCheck(1*time.Second, cc); err != nil { + if _, err := health.HealthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != nil { t.Fatalf("HealthCheck(_)=_, %v, want ", err) } } @@ -381,9 +386,14 @@ func TestHealthCheckOnFailure(t *testing.T) { } func testHealthCheckOnFailure(t *testing.T, e env) { - s, cc := setUp(true, math.MaxUint32, e) + + hs := &health.HealthServer{ + StatusMap: make(map[string]int32), + } + hs.SetServingStatus("grpc.health.v1alpha.HealthCheck", 1) + s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) - if err := health.HealthCheck(0*time.Second, cc); err != grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded") { + if _, err := health.HealthCheck(0*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded") { t.Fatalf("HealthCheck(_)=_, %v, want error code %d", err, codes.DeadlineExceeded) } } @@ -395,11 +405,75 @@ func TestHealthCheckOff(t *testing.T) { } func testHealthCheckOff(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) defer tearDown(s, cc) - err := health.HealthCheck(1*time.Second, cc) - if err != grpc.Errorf(codes.Unimplemented, "unknown service grpc.health.HealthCheck") { - t.Fatalf("HealthCheck(_)=_, %v, want error code %d", err, codes.DeadlineExceeded) + if _, err := health.HealthCheck(1*time.Second, cc, ""); err != grpc.Errorf(codes.Unimplemented, "unknown service grpc.health.v1alpha.HealthCheck") { + t.Fatalf("HealthCheck(_)=_, %v, want error code %d", err, codes.Unimplemented) + } +} + +func TestHealthCheckNotFound(t *testing.T) { + for _, e := range listTestEnv() { + testHealthCheckNotFound(t, e) + } +} + +func testHealthCheckNotFound(t *testing.T, e env) { + + hs := &health.HealthServer{ + StatusMap: make(map[string]int32), + } + + s, cc := setUp(hs, math.MaxUint32, e) + defer tearDown(s, cc) + if _, err := health.HealthCheck(1*time.Second, cc, "unregister_service"); err != grpc.Errorf(codes.NotFound, "unknown service") { + t.Fatalf("HealthCheck(_)=_, %v, want error code %d", err, codes.NotFound) + } +} + +func TestHealthCheckServing(t *testing.T) { + for _, e := range listTestEnv() { + testHealthCheckServing(t, e) + } +} + +func testHealthCheckServing(t *testing.T, e env) { + + hs := &health.HealthServer{ + StatusMap: make(map[string]int32), + } + hs.SetServingStatus("grpc.health.v1alpha.HealthCheck", 1) + s, cc := setUp(hs, math.MaxUint32, e) + defer tearDown(s, cc) + out, err := health.HealthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") + if err != nil { + t.Fatalf("HealthCheck(_)=_, %v, want _,", err) + } + if out.Status != healthpb.HealthCheckResponse_SERVING { + t.Fatalf("Got the serving status %v, want SERVING", out.Status) + } +} + +func TestHealthCheckNotServing(t *testing.T) { + for _, e := range listTestEnv() { + testHealthCheckNotServing(t, e) + } +} + +func testHealthCheckNotServing(t *testing.T, e env) { + + hs := &health.HealthServer{ + StatusMap: make(map[string]int32), + } + hs.SetServingStatus("grpc.health.v1alpha.HealthCheck", 2) + s, cc := setUp(hs, math.MaxUint32, e) + defer tearDown(s, cc) + out, err := health.HealthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") + if err != nil { + t.Fatalf("HealthCheck(_)=_, %v, want _,", err) + } + if out.Status != healthpb.HealthCheckResponse_NOT_SERVING { + t.Fatalf("Got the serving status %v, want NOT_SERVING ") } } @@ -410,7 +484,7 @@ func TestEmptyUnary(t *testing.T) { } func testEmptyUnary(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}) @@ -426,7 +500,7 @@ func TestFailedEmptyUnary(t *testing.T) { } func testFailedEmptyUnary(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) ctx := metadata.NewContext(context.Background(), testMetadata) @@ -442,7 +516,7 @@ func TestLargeUnary(t *testing.T) { } func testLargeUnary(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) argSize := 271828 @@ -470,7 +544,7 @@ func TestMetadataUnaryRPC(t *testing.T) { } func testMetadataUnaryRPC(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) argSize := 2718 @@ -524,7 +598,7 @@ func TestRetry(t *testing.T) { // TODO(zhaoq): Refactor to make this clearer and add more cases to test racy // and error-prone paths. func testRetry(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) var wg sync.WaitGroup @@ -554,7 +628,7 @@ func TestRPCTimeout(t *testing.T) { // TODO(zhaoq): Have a better test coverage of timeout and cancellation mechanism. func testRPCTimeout(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) argSize := 2718 @@ -580,7 +654,7 @@ func TestCancel(t *testing.T) { } func testCancel(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) argSize := 2718 @@ -612,7 +686,7 @@ func TestPingPong(t *testing.T) { } func testPingPong(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) stream, err := tc.FullDuplexCall(context.Background()) @@ -663,7 +737,7 @@ func TestMetadataStreamingRPC(t *testing.T) { } func testMetadataStreamingRPC(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) ctx := metadata.NewContext(context.Background(), testMetadata) @@ -720,7 +794,7 @@ func TestServerStreaming(t *testing.T) { } func testServerStreaming(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) respParam := make([]*testpb.ResponseParameters, len(respSizes)) @@ -772,7 +846,7 @@ func TestFailedServerStreaming(t *testing.T) { } func testFailedServerStreaming(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) respParam := make([]*testpb.ResponseParameters, len(respSizes)) @@ -802,7 +876,7 @@ func TestClientStreaming(t *testing.T) { } func testClientStreaming(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) stream, err := tc.StreamingInputCall(context.Background()) @@ -837,7 +911,7 @@ func TestExceedMaxStreamsLimit(t *testing.T) { func testExceedMaxStreamsLimit(t *testing.T, e env) { // Only allows 1 live stream per server transport. - s, cc := setUp(false, 1, e) + s, cc := setUp(nil, 1, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) // Perform a unary RPC to make sure the new settings were propagated to the client.