From 58e5eda3f46f1fce0fd6b6d1d02b5cf34d4adb63 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Wed, 22 Jul 2015 10:32:39 -0700 Subject: [PATCH 01/17] update health check service and corresponding end2end test --- health/grpc_health_v1alpha/health.pb.go | 130 ++++++++++++++++++++++++ health/grpc_health_v1alpha/health.proto | 20 ++++ health/health.go | 31 ++++-- test/end2end_test.go | 126 ++++++++++++++++++----- 4 files changed, 274 insertions(+), 33 deletions(-) create mode 100644 health/grpc_health_v1alpha/health.pb.go create mode 100644 health/grpc_health_v1alpha/health.proto diff --git a/health/grpc_health_v1alpha/health.pb.go b/health/grpc_health_v1alpha/health.pb.go new file mode 100644 index 00000000..14356f27 --- /dev/null +++ b/health/grpc_health_v1alpha/health.pb.go @@ -0,0 +1,130 @@ +// Code generated by protoc-gen-go. +// source: health.proto +// DO NOT EDIT! + +/* +Package grpc_health_v1alpha is a generated protocol buffer package. + +It is generated from these files: + health.proto + +It has these top-level messages: + HealthCheckRequest + HealthCheckResponse +*/ +package grpc_health_v1alpha + +import proto "github.com/golang/protobuf/proto" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal + +type HealthCheckResponse_ServingStatus int32 + +const ( + HealthCheckResponse_UNKNOWN HealthCheckResponse_ServingStatus = 0 + HealthCheckResponse_SERVING HealthCheckResponse_ServingStatus = 1 + HealthCheckResponse_NOT_SERVING HealthCheckResponse_ServingStatus = 2 +) + +var HealthCheckResponse_ServingStatus_name = map[int32]string{ + 0: "UNKNOWN", + 1: "SERVING", + 2: "NOT_SERVING", +} +var HealthCheckResponse_ServingStatus_value = map[string]int32{ + "UNKNOWN": 0, + "SERVING": 1, + "NOT_SERVING": 2, +} + +func (x HealthCheckResponse_ServingStatus) String() string { + return proto.EnumName(HealthCheckResponse_ServingStatus_name, int32(x)) +} + +type HealthCheckRequest struct { + Host string `protobuf:"bytes,1,opt,name=host" json:"host,omitempty"` + Service string `protobuf:"bytes,2,opt,name=service" json:"service,omitempty"` +} + +func (m *HealthCheckRequest) Reset() { *m = HealthCheckRequest{} } +func (m *HealthCheckRequest) String() string { return proto.CompactTextString(m) } +func (*HealthCheckRequest) ProtoMessage() {} + +type HealthCheckResponse struct { + Status HealthCheckResponse_ServingStatus `protobuf:"varint,1,opt,name=status,enum=grpc.health.v1alpha.HealthCheckResponse_ServingStatus" json:"status,omitempty"` +} + +func (m *HealthCheckResponse) Reset() { *m = HealthCheckResponse{} } +func (m *HealthCheckResponse) String() string { return proto.CompactTextString(m) } +func (*HealthCheckResponse) ProtoMessage() {} + +func init() { + proto.RegisterEnum("grpc.health.v1alpha.HealthCheckResponse_ServingStatus", HealthCheckResponse_ServingStatus_name, HealthCheckResponse_ServingStatus_value) +} + +// Client API for HealthCheck service + +type HealthCheckClient interface { + Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) +} + +type healthCheckClient struct { + cc *grpc.ClientConn +} + +func NewHealthCheckClient(cc *grpc.ClientConn) HealthCheckClient { + return &healthCheckClient{cc} +} + +func (c *healthCheckClient) Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) { + out := new(HealthCheckResponse) + err := grpc.Invoke(ctx, "/grpc.health.v1alpha.HealthCheck/Check", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for HealthCheck service + +type HealthCheckServer interface { + Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) +} + +func RegisterHealthCheckServer(s *grpc.Server, srv HealthCheckServer) { + s.RegisterService(&_HealthCheck_serviceDesc, srv) +} + +func _HealthCheck_Check_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { + in := new(HealthCheckRequest) + if err := codec.Unmarshal(buf, in); err != nil { + return nil, err + } + out, err := srv.(HealthCheckServer).Check(ctx, in) + if err != nil { + return nil, err + } + return out, nil +} + +var _HealthCheck_serviceDesc = grpc.ServiceDesc{ + ServiceName: "grpc.health.v1alpha.HealthCheck", + HandlerType: (*HealthCheckServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Check", + Handler: _HealthCheck_Check_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, +} diff --git a/health/grpc_health_v1alpha/health.proto b/health/grpc_health_v1alpha/health.proto new file mode 100644 index 00000000..f2c7efcf --- /dev/null +++ b/health/grpc_health_v1alpha/health.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; +package grpc.health.v1alpha; + +message HealthCheckRequest { + string host = 1; + string service = 2; + } + + message HealthCheckResponse { + enum ServingStatus { + UNKNOWN = 0; + SERVING = 1; + NOT_SERVING = 2; + } + ServingStatus status = 1; + } + +service HealthCheck{ + rpc Check( HealthCheckRequest) returns ( HealthCheckResponse); +} diff --git a/health/health.go b/health/health.go index 3089f3e4..66081d60 100644 --- a/health/health.go +++ b/health/health.go @@ -7,22 +7,39 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" - healthpb "google.golang.org/grpc/health/grpc_health" + "google.golang.org/grpc/codes" + healthpb "google.golang.org/grpc/health/grpc_health_v1alpha" ) // HealthCheck is the client side function to health-check a server -func HealthCheck(t time.Duration, cc *grpc.ClientConn) error { +func HealthCheck(t time.Duration, cc *grpc.ClientConn, service_name string) (*healthpb.HealthCheckResponse, error) { ctx, _ := context.WithTimeout(context.Background(), t) hc := healthpb.NewHealthCheckClient(cc) req := new(healthpb.HealthCheckRequest) - _, err := hc.Check(ctx, req) - return err + req.Host = "" + req.Service = service_name + out, err := hc.Check(ctx, req) + return out, err } type HealthServer struct { + StatusMap map[string]int32 } -func (s *HealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) { - out := new(healthpb.HealthCheckResponse) - return out, nil +func (s *HealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (out *healthpb.HealthCheckResponse, err error) { + service := ":" + in.Service + out = new(healthpb.HealthCheckResponse) + status, ok := s.StatusMap[service] + out.Status = healthpb.HealthCheckResponse_ServingStatus(status) + if !ok { + err = grpc.Errorf(codes.NotFound, "unknown service") + } else { + err = nil + } + return out, err +} + +func (s *HealthServer) SetServingStatus(service string, status int32) { + service = ":" + service + s.StatusMap[service] = status } diff --git a/test/end2end_test.go b/test/end2end_test.go index b00a487f..65dee899 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -52,7 +52,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/health" - healthpb "google.golang.org/grpc/health/grpc_health" + healthpb "google.golang.org/grpc/health/grpc_health_v1alpha" "google.golang.org/grpc/metadata" testpb "google.golang.org/grpc/test/grpc_testing" ) @@ -285,7 +285,7 @@ func listTestEnv() []env { return []env{env{"tcp", nil, ""}, env{"tcp", nil, "tls"}, env{"unix", unixDialer, ""}, env{"unix", unixDialer, "tls"}} } -func setUp(healthCheck bool, maxStream uint32, e env) (s *grpc.Server, cc *grpc.ClientConn) { +func setUp(hs *health.HealthServer, maxStream uint32, e env) (s *grpc.Server, cc *grpc.ClientConn) { sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(maxStream)} la := ":0" switch e.network { @@ -305,8 +305,8 @@ func setUp(healthCheck bool, maxStream uint32, e env) (s *grpc.Server, cc *grpc. sopts = append(sopts, grpc.Creds(creds)) } s = grpc.NewServer(sopts...) - if healthCheck { - healthpb.RegisterHealthCheckServer(s, &health.HealthServer{}) + if hs != nil { + healthpb.RegisterHealthCheckServer(s, hs) } testpb.RegisterTestServiceServer(s, &testServer{}) go s.Serve(lis) @@ -347,7 +347,7 @@ func TestTimeoutOnDeadServer(t *testing.T) { } func testTimeoutOnDeadServer(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) s.Stop() // Set -1 as the timeout to make sure if transportMonitor gets error @@ -367,9 +367,14 @@ func TestHealthCheckOnSuccess(t *testing.T) { } func testHealthCheckOnSuccess(t *testing.T, e env) { - s, cc := setUp(true, math.MaxUint32, e) + + hs := &health.HealthServer{ + StatusMap: make(map[string]int32), + } + hs.SetServingStatus("grpc.health.v1alpha.HealthCheck", 1) + s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) - if err := health.HealthCheck(1*time.Second, cc); err != nil { + if _, err := health.HealthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != nil { t.Fatalf("HealthCheck(_)=_, %v, want ", err) } } @@ -381,9 +386,14 @@ func TestHealthCheckOnFailure(t *testing.T) { } func testHealthCheckOnFailure(t *testing.T, e env) { - s, cc := setUp(true, math.MaxUint32, e) + + hs := &health.HealthServer{ + StatusMap: make(map[string]int32), + } + hs.SetServingStatus("grpc.health.v1alpha.HealthCheck", 1) + s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) - if err := health.HealthCheck(0*time.Second, cc); err != grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded") { + if _, err := health.HealthCheck(0*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded") { t.Fatalf("HealthCheck(_)=_, %v, want error code %d", err, codes.DeadlineExceeded) } } @@ -395,11 +405,75 @@ func TestHealthCheckOff(t *testing.T) { } func testHealthCheckOff(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) defer tearDown(s, cc) - err := health.HealthCheck(1*time.Second, cc) - if err != grpc.Errorf(codes.Unimplemented, "unknown service grpc.health.HealthCheck") { - t.Fatalf("HealthCheck(_)=_, %v, want error code %d", err, codes.DeadlineExceeded) + if _, err := health.HealthCheck(1*time.Second, cc, ""); err != grpc.Errorf(codes.Unimplemented, "unknown service grpc.health.v1alpha.HealthCheck") { + t.Fatalf("HealthCheck(_)=_, %v, want error code %d", err, codes.Unimplemented) + } +} + +func TestHealthCheckNotFound(t *testing.T) { + for _, e := range listTestEnv() { + testHealthCheckNotFound(t, e) + } +} + +func testHealthCheckNotFound(t *testing.T, e env) { + + hs := &health.HealthServer{ + StatusMap: make(map[string]int32), + } + + s, cc := setUp(hs, math.MaxUint32, e) + defer tearDown(s, cc) + if _, err := health.HealthCheck(1*time.Second, cc, "unregister_service"); err != grpc.Errorf(codes.NotFound, "unknown service") { + t.Fatalf("HealthCheck(_)=_, %v, want error code %d", err, codes.NotFound) + } +} + +func TestHealthCheckServing(t *testing.T) { + for _, e := range listTestEnv() { + testHealthCheckServing(t, e) + } +} + +func testHealthCheckServing(t *testing.T, e env) { + + hs := &health.HealthServer{ + StatusMap: make(map[string]int32), + } + hs.SetServingStatus("grpc.health.v1alpha.HealthCheck", 1) + s, cc := setUp(hs, math.MaxUint32, e) + defer tearDown(s, cc) + out, err := health.HealthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") + if err != nil { + t.Fatalf("HealthCheck(_)=_, %v, want _,", err) + } + if out.Status != healthpb.HealthCheckResponse_SERVING { + t.Fatalf("Got the serving status %v, want SERVING", out.Status) + } +} + +func TestHealthCheckNotServing(t *testing.T) { + for _, e := range listTestEnv() { + testHealthCheckNotServing(t, e) + } +} + +func testHealthCheckNotServing(t *testing.T, e env) { + + hs := &health.HealthServer{ + StatusMap: make(map[string]int32), + } + hs.SetServingStatus("grpc.health.v1alpha.HealthCheck", 2) + s, cc := setUp(hs, math.MaxUint32, e) + defer tearDown(s, cc) + out, err := health.HealthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") + if err != nil { + t.Fatalf("HealthCheck(_)=_, %v, want _,", err) + } + if out.Status != healthpb.HealthCheckResponse_NOT_SERVING { + t.Fatalf("Got the serving status %v, want NOT_SERVING ") } } @@ -410,7 +484,7 @@ func TestEmptyUnary(t *testing.T) { } func testEmptyUnary(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}) @@ -426,7 +500,7 @@ func TestFailedEmptyUnary(t *testing.T) { } func testFailedEmptyUnary(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) ctx := metadata.NewContext(context.Background(), testMetadata) @@ -442,7 +516,7 @@ func TestLargeUnary(t *testing.T) { } func testLargeUnary(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) argSize := 271828 @@ -470,7 +544,7 @@ func TestMetadataUnaryRPC(t *testing.T) { } func testMetadataUnaryRPC(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) argSize := 2718 @@ -524,7 +598,7 @@ func TestRetry(t *testing.T) { // TODO(zhaoq): Refactor to make this clearer and add more cases to test racy // and error-prone paths. func testRetry(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) var wg sync.WaitGroup @@ -554,7 +628,7 @@ func TestRPCTimeout(t *testing.T) { // TODO(zhaoq): Have a better test coverage of timeout and cancellation mechanism. func testRPCTimeout(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) argSize := 2718 @@ -580,7 +654,7 @@ func TestCancel(t *testing.T) { } func testCancel(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) argSize := 2718 @@ -612,7 +686,7 @@ func TestPingPong(t *testing.T) { } func testPingPong(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) stream, err := tc.FullDuplexCall(context.Background()) @@ -663,7 +737,7 @@ func TestMetadataStreamingRPC(t *testing.T) { } func testMetadataStreamingRPC(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) ctx := metadata.NewContext(context.Background(), testMetadata) @@ -720,7 +794,7 @@ func TestServerStreaming(t *testing.T) { } func testServerStreaming(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) respParam := make([]*testpb.ResponseParameters, len(respSizes)) @@ -772,7 +846,7 @@ func TestFailedServerStreaming(t *testing.T) { } func testFailedServerStreaming(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) respParam := make([]*testpb.ResponseParameters, len(respSizes)) @@ -802,7 +876,7 @@ func TestClientStreaming(t *testing.T) { } func testClientStreaming(t *testing.T, e env) { - s, cc := setUp(false, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) stream, err := tc.StreamingInputCall(context.Background()) @@ -837,7 +911,7 @@ func TestExceedMaxStreamsLimit(t *testing.T) { func testExceedMaxStreamsLimit(t *testing.T, e env) { // Only allows 1 live stream per server transport. - s, cc := setUp(false, 1, e) + s, cc := setUp(nil, 1, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) // Perform a unary RPC to make sure the new settings were propagated to the client. From 2fdea48f1f7136966773d996de55c7ecc1251039 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Wed, 22 Jul 2015 10:46:58 -0700 Subject: [PATCH 02/17] remove the old health.proto file and health.pb.go --- health/grpc_health/health.pb.go | 103 -------------------------------- health/grpc_health/health.proto | 13 ---- 2 files changed, 116 deletions(-) delete mode 100644 health/grpc_health/health.pb.go delete mode 100644 health/grpc_health/health.proto diff --git a/health/grpc_health/health.pb.go b/health/grpc_health/health.pb.go deleted file mode 100644 index 068597f8..00000000 --- a/health/grpc_health/health.pb.go +++ /dev/null @@ -1,103 +0,0 @@ -// Code generated by protoc-gen-go. -// source: health.proto -// DO NOT EDIT! - -/* -Package grpc_health is a generated protocol buffer package. - -It is generated from these files: - health.proto - -It has these top-level messages: - HealthCheckRequest - HealthCheckResponse -*/ -package grpc_health - -import proto "github.com/golang/protobuf/proto" - -import ( - context "golang.org/x/net/context" - grpc "google.golang.org/grpc" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal - -type HealthCheckRequest struct { -} - -func (m *HealthCheckRequest) Reset() { *m = HealthCheckRequest{} } -func (m *HealthCheckRequest) String() string { return proto.CompactTextString(m) } -func (*HealthCheckRequest) ProtoMessage() {} - -type HealthCheckResponse struct { -} - -func (m *HealthCheckResponse) Reset() { *m = HealthCheckResponse{} } -func (m *HealthCheckResponse) String() string { return proto.CompactTextString(m) } -func (*HealthCheckResponse) ProtoMessage() {} - -func init() { -} - -// Client API for HealthCheck service - -type HealthCheckClient interface { - Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) -} - -type healthCheckClient struct { - cc *grpc.ClientConn -} - -func NewHealthCheckClient(cc *grpc.ClientConn) HealthCheckClient { - return &healthCheckClient{cc} -} - -func (c *healthCheckClient) Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) { - out := new(HealthCheckResponse) - err := grpc.Invoke(ctx, "/grpc.health.HealthCheck/Check", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// Server API for HealthCheck service - -type HealthCheckServer interface { - Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) -} - -func RegisterHealthCheckServer(s *grpc.Server, srv HealthCheckServer) { - s.RegisterService(&_HealthCheck_serviceDesc, srv) -} - -func _HealthCheck_Check_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { - in := new(HealthCheckRequest) - if err := codec.Unmarshal(buf, in); err != nil { - return nil, err - } - out, err := srv.(HealthCheckServer).Check(ctx, in) - if err != nil { - return nil, err - } - return out, nil -} - -var _HealthCheck_serviceDesc = grpc.ServiceDesc{ - ServiceName: "grpc.health.HealthCheck", - HandlerType: (*HealthCheckServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "Check", - Handler: _HealthCheck_Check_Handler, - }, - }, - Streams: []grpc.StreamDesc{}, -} diff --git a/health/grpc_health/health.proto b/health/grpc_health/health.proto deleted file mode 100644 index 40ae67d9..00000000 --- a/health/grpc_health/health.proto +++ /dev/null @@ -1,13 +0,0 @@ -syntax = "proto3"; - -package grpc.health; - -message HealthCheckRequest{ -} - -message HealthCheckResponse{ -} - -service HealthCheck{ - rpc Check( HealthCheckRequest) returns ( HealthCheckResponse); -} From 34f97a03ae1dce6cf69fab301578c22f937c865a Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Wed, 22 Jul 2015 16:04:53 -0700 Subject: [PATCH 03/17] minor changes --- health/grpc_health_v1alpha/health.proto | 23 ++++++------ health/health.go | 22 ++++-------- test/end2end_test.go | 47 ++++++++++++++----------- 3 files changed, 44 insertions(+), 48 deletions(-) diff --git a/health/grpc_health_v1alpha/health.proto b/health/grpc_health_v1alpha/health.proto index f2c7efcf..de7c0458 100644 --- a/health/grpc_health_v1alpha/health.proto +++ b/health/grpc_health_v1alpha/health.proto @@ -1,20 +1,19 @@ syntax = "proto3"; + package grpc.health.v1alpha; message HealthCheckRequest { string host = 1; string service = 2; } - - message HealthCheckResponse { - enum ServingStatus { - UNKNOWN = 0; - SERVING = 1; - NOT_SERVING = 2; - } - ServingStatus status = 1; - } - +message HealthCheckResponse { + enum ServingStatus { + UNKNOWN = 0; + SERVING = 1; + NOT_SERVING = 2; + } + ServingStatus status = 1; + } service HealthCheck{ - rpc Check( HealthCheckRequest) returns ( HealthCheckResponse); -} + rpc Check( HealthCheckRequest) returns ( HealthCheckResponse); + } diff --git a/health/health.go b/health/health.go index 66081d60..00007a30 100644 --- a/health/health.go +++ b/health/health.go @@ -3,31 +3,19 @@ package health import ( - "time" - "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" healthpb "google.golang.org/grpc/health/grpc_health_v1alpha" ) -// HealthCheck is the client side function to health-check a server -func HealthCheck(t time.Duration, cc *grpc.ClientConn, service_name string) (*healthpb.HealthCheckResponse, error) { - ctx, _ := context.WithTimeout(context.Background(), t) - hc := healthpb.NewHealthCheckClient(cc) - req := new(healthpb.HealthCheckRequest) - req.Host = "" - req.Service = service_name - out, err := hc.Check(ctx, req) - return out, err -} - type HealthServer struct { + // StatusMap stores the serving status of a service StatusMap map[string]int32 } func (s *HealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (out *healthpb.HealthCheckResponse, err error) { - service := ":" + in.Service + service := in.Host + ":" + in.Service out = new(healthpb.HealthCheckResponse) status, ok := s.StatusMap[service] out.Status = healthpb.HealthCheckResponse_ServingStatus(status) @@ -39,7 +27,9 @@ func (s *HealthServer) Check(ctx context.Context, in *healthpb.HealthCheckReques return out, err } -func (s *HealthServer) SetServingStatus(service string, status int32) { - service = ":" + service +// SetServingStatus is called when need to reset the serving status of a service +// or insert a new service entry into the statusMap +func (s *HealthServer) SetServingStatus(host string, service string, status int32) { + service = host + ":" + service s.StatusMap[service] = status } diff --git a/test/end2end_test.go b/test/end2end_test.go index 65dee899..b4f67957 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -93,6 +93,17 @@ func newPayload(t testpb.PayloadType, size int32) *testpb.Payload { } } +func healthCheck(t time.Duration, cc *grpc.ClientConn, serviceName string) (*healthpb.HealthCheckResponse, error) { + ctx, _ := context.WithTimeout(context.Background(), t) + hc := healthpb.NewHealthCheckClient(cc) + req := &healthpb.HealthCheckRequest{ + Host: "", + Service: serviceName, + } + out, err := hc.Check(ctx, req) + return out, err +} + func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { md, ok := metadata.FromContext(ctx) if ok { @@ -371,11 +382,11 @@ func testHealthCheckOnSuccess(t *testing.T, e env) { hs := &health.HealthServer{ StatusMap: make(map[string]int32), } - hs.SetServingStatus("grpc.health.v1alpha.HealthCheck", 1) + hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 1) s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) - if _, err := health.HealthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != nil { - t.Fatalf("HealthCheck(_)=_, %v, want ", err) + if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != nil { + t.Fatalf("TestHealthCheck(_)=_, %v, want ", err) } } @@ -390,11 +401,11 @@ func testHealthCheckOnFailure(t *testing.T, e env) { hs := &health.HealthServer{ StatusMap: make(map[string]int32), } - hs.SetServingStatus("grpc.health.v1alpha.HealthCheck", 1) + hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 1) s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) - if _, err := health.HealthCheck(0*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded") { - t.Fatalf("HealthCheck(_)=_, %v, want error code %d", err, codes.DeadlineExceeded) + if _, err := healthCheck(0*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded") { + t.Fatalf("TestHealthCheck(_)=_, %v, want error code %d", err, codes.DeadlineExceeded) } } @@ -407,8 +418,8 @@ func TestHealthCheckOff(t *testing.T) { func testHealthCheckOff(t *testing.T, e env) { s, cc := setUp(nil, math.MaxUint32, e) defer tearDown(s, cc) - if _, err := health.HealthCheck(1*time.Second, cc, ""); err != grpc.Errorf(codes.Unimplemented, "unknown service grpc.health.v1alpha.HealthCheck") { - t.Fatalf("HealthCheck(_)=_, %v, want error code %d", err, codes.Unimplemented) + if _, err := healthCheck(1*time.Second, cc, ""); err != grpc.Errorf(codes.Unimplemented, "unknown service grpc.health.v1alpha.HealthCheck") { + t.Fatalf("TestHealthCheck(_)=_, %v, want error code %d", err, codes.Unimplemented) } } @@ -419,15 +430,13 @@ func TestHealthCheckNotFound(t *testing.T) { } func testHealthCheckNotFound(t *testing.T, e env) { - hs := &health.HealthServer{ StatusMap: make(map[string]int32), } - s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) - if _, err := health.HealthCheck(1*time.Second, cc, "unregister_service"); err != grpc.Errorf(codes.NotFound, "unknown service") { - t.Fatalf("HealthCheck(_)=_, %v, want error code %d", err, codes.NotFound) + if _, err := healthCheck(1*time.Second, cc, "unregister_service"); err != grpc.Errorf(codes.NotFound, "unknown service") { + t.Fatalf("TestHealthCheck(_)=_, %v, want error code %d", err, codes.NotFound) } } @@ -438,16 +447,15 @@ func TestHealthCheckServing(t *testing.T) { } func testHealthCheckServing(t *testing.T, e env) { - hs := &health.HealthServer{ StatusMap: make(map[string]int32), } - hs.SetServingStatus("grpc.health.v1alpha.HealthCheck", 1) + hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 1) s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) - out, err := health.HealthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") + out, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") if err != nil { - t.Fatalf("HealthCheck(_)=_, %v, want _,", err) + t.Fatalf("TestHealthCheck(_)=_, %v, want _,", err) } if out.Status != healthpb.HealthCheckResponse_SERVING { t.Fatalf("Got the serving status %v, want SERVING", out.Status) @@ -461,16 +469,15 @@ func TestHealthCheckNotServing(t *testing.T) { } func testHealthCheckNotServing(t *testing.T, e env) { - hs := &health.HealthServer{ StatusMap: make(map[string]int32), } - hs.SetServingStatus("grpc.health.v1alpha.HealthCheck", 2) + hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 2) s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) - out, err := health.HealthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") + out, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") if err != nil { - t.Fatalf("HealthCheck(_)=_, %v, want _,", err) + t.Fatalf("TestHealthCheck(_)=_, %v, want _,", err) } if out.Status != healthpb.HealthCheckResponse_NOT_SERVING { t.Fatalf("Got the serving status %v, want NOT_SERVING ") From 901c3e574cbb154c350435e1d8d17ac09fbc0c8c Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Wed, 22 Jul 2015 16:06:15 -0700 Subject: [PATCH 04/17] remove redundent space --- test/end2end_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index b4f67957..02950ff1 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -378,7 +378,6 @@ func TestHealthCheckOnSuccess(t *testing.T) { } func testHealthCheckOnSuccess(t *testing.T, e env) { - hs := &health.HealthServer{ StatusMap: make(map[string]int32), } @@ -397,7 +396,6 @@ func TestHealthCheckOnFailure(t *testing.T) { } func testHealthCheckOnFailure(t *testing.T, e env) { - hs := &health.HealthServer{ StatusMap: make(map[string]int32), } From dc36cdbfdd56bcd133e87cb0c901d7720fb6486a Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Wed, 22 Jul 2015 16:26:16 -0700 Subject: [PATCH 05/17] add mutex and create newHealthServer Function --- health/health.go | 17 ++++++++++++++--- test/end2end_test.go | 20 +++++--------------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/health/health.go b/health/health.go index 00007a30..de782dc5 100644 --- a/health/health.go +++ b/health/health.go @@ -3,6 +3,8 @@ package health import ( + "sync" + "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -11,13 +13,19 @@ import ( type HealthServer struct { // StatusMap stores the serving status of a service - StatusMap map[string]int32 + statusMap map[string]int32 +} + +func NewHealthServer() *HealthServer { + return &HealthServer{ + statusMap: make(map[string]int32), + } } func (s *HealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (out *healthpb.HealthCheckResponse, err error) { service := in.Host + ":" + in.Service out = new(healthpb.HealthCheckResponse) - status, ok := s.StatusMap[service] + status, ok := s.statusMap[service] out.Status = healthpb.HealthCheckResponse_ServingStatus(status) if !ok { err = grpc.Errorf(codes.NotFound, "unknown service") @@ -31,5 +39,8 @@ func (s *HealthServer) Check(ctx context.Context, in *healthpb.HealthCheckReques // or insert a new service entry into the statusMap func (s *HealthServer) SetServingStatus(host string, service string, status int32) { service = host + ":" + service - s.StatusMap[service] = status + var mu sync.Mutex + mu.Lock() + s.statusMap[service] = status + mu.Unlock() } diff --git a/test/end2end_test.go b/test/end2end_test.go index 02950ff1..f220ffca 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -378,9 +378,7 @@ func TestHealthCheckOnSuccess(t *testing.T) { } func testHealthCheckOnSuccess(t *testing.T, e env) { - hs := &health.HealthServer{ - StatusMap: make(map[string]int32), - } + hs := health.NewHealthServer() hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 1) s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) @@ -396,9 +394,7 @@ func TestHealthCheckOnFailure(t *testing.T) { } func testHealthCheckOnFailure(t *testing.T, e env) { - hs := &health.HealthServer{ - StatusMap: make(map[string]int32), - } + hs := health.NewHealthServer() hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 1) s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) @@ -428,9 +424,7 @@ func TestHealthCheckNotFound(t *testing.T) { } func testHealthCheckNotFound(t *testing.T, e env) { - hs := &health.HealthServer{ - StatusMap: make(map[string]int32), - } + hs := health.NewHealthServer() s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) if _, err := healthCheck(1*time.Second, cc, "unregister_service"); err != grpc.Errorf(codes.NotFound, "unknown service") { @@ -445,9 +439,7 @@ func TestHealthCheckServing(t *testing.T) { } func testHealthCheckServing(t *testing.T, e env) { - hs := &health.HealthServer{ - StatusMap: make(map[string]int32), - } + hs := health.NewHealthServer() hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 1) s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) @@ -467,9 +459,7 @@ func TestHealthCheckNotServing(t *testing.T) { } func testHealthCheckNotServing(t *testing.T, e env) { - hs := &health.HealthServer{ - StatusMap: make(map[string]int32), - } + hs := health.NewHealthServer() hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 2) s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) From 0252e09f2156f21768e35b23d760fc46eb649d15 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Thu, 23 Jul 2015 10:55:42 -0700 Subject: [PATCH 06/17] add response trace log for client streaming --- call.go | 4 ++-- stream.go | 9 ++++++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/call.go b/call.go index 3b974721..218fc0f6 100644 --- a/call.go +++ b/call.go @@ -165,7 +165,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli return toRPCErr(err) } if EnableTracing { - c.traceInfo.tr.LazyLog(payload{args}, true) + c.traceInfo.tr.LazyLog(&fmtStringer{"sent: %v", []interface{}{payload{args}}}, true) } stream, err = sendRequest(ctx, cc.dopts.codec, callHdr, t, args, topts) if err != nil { @@ -184,7 +184,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli continue } if EnableTracing { - c.traceInfo.tr.LazyLog(payload{reply}, true) + c.traceInfo.tr.LazyLog(&fmtStringer{"received: %v", []interface{}{payload{reply}}}, true) } t.CloseStream(stream, lastErr) if lastErr != nil { diff --git a/stream.go b/stream.go index c902034d..348fe9a5 100644 --- a/stream.go +++ b/stream.go @@ -166,7 +166,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { if cs.tracing { cs.mu.Lock() if cs.traceInfo.tr != nil { - cs.traceInfo.tr.LazyLog(payload{m}, true) + cs.traceInfo.tr.LazyLog(&fmtStringer{"sent: %v", []interface{}{payload{m}}}, true) } cs.mu.Unlock() } @@ -187,6 +187,13 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { } func (cs *clientStream) RecvMsg(m interface{}) (err error) { + if cs.tracing { + cs.mu.Lock() + if cs.traceInfo.tr != nil { + cs.traceInfo.tr.LazyLog(&fmtStringer{"received: %v", []interface{}{payload{m}}}, true) + } + cs.mu.Unlock() + } err = recv(cs.p, cs.codec, m) defer func() { // err != nil indicates the termination of the stream. From ee67f58505d590485fd1012b6c6a941957b512bf Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Thu, 23 Jul 2015 11:01:18 -0700 Subject: [PATCH 07/17] fix the indent --- health/grpc_health_v1alpha/health.proto | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/health/grpc_health_v1alpha/health.proto b/health/grpc_health_v1alpha/health.proto index de7c0458..07615ba5 100644 --- a/health/grpc_health_v1alpha/health.proto +++ b/health/grpc_health_v1alpha/health.proto @@ -5,15 +5,17 @@ package grpc.health.v1alpha; message HealthCheckRequest { string host = 1; string service = 2; - } +} + message HealthCheckResponse { enum ServingStatus { UNKNOWN = 0; SERVING = 1; NOT_SERVING = 2; - } - ServingStatus status = 1; } + ServingStatus status = 1; +} + service HealthCheck{ - rpc Check( HealthCheckRequest) returns ( HealthCheckResponse); - } + rpc Check(HealthCheckRequest) returns (HealthCheckResponse); +} From 9d4ac0cac5b7593fdff902fc8c544b09dbcf9bfa Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Thu, 23 Jul 2015 14:45:27 -0700 Subject: [PATCH 08/17] fix mutex problems and merge multiple servingstatus to one single test --- call.go | 4 ++-- health/health.go | 23 +++++++++--------- stream.go | 9 +------ test/end2end_test.go | 57 +++++++++++++------------------------------- 4 files changed, 32 insertions(+), 61 deletions(-) diff --git a/call.go b/call.go index 218fc0f6..3b974721 100644 --- a/call.go +++ b/call.go @@ -165,7 +165,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli return toRPCErr(err) } if EnableTracing { - c.traceInfo.tr.LazyLog(&fmtStringer{"sent: %v", []interface{}{payload{args}}}, true) + c.traceInfo.tr.LazyLog(payload{args}, true) } stream, err = sendRequest(ctx, cc.dopts.codec, callHdr, t, args, topts) if err != nil { @@ -184,7 +184,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli continue } if EnableTracing { - c.traceInfo.tr.LazyLog(&fmtStringer{"received: %v", []interface{}{payload{reply}}}, true) + c.traceInfo.tr.LazyLog(payload{reply}, true) } t.CloseStream(stream, lastErr) if lastErr != nil { diff --git a/health/health.go b/health/health.go index de782dc5..819ace4c 100644 --- a/health/health.go +++ b/health/health.go @@ -12,8 +12,9 @@ import ( ) type HealthServer struct { - // StatusMap stores the serving status of a service + // statusMap stores the serving status of the services this HealthServer monitors statusMap map[string]int32 + mu sync.Mutex } func NewHealthServer() *HealthServer { @@ -25,22 +26,22 @@ func NewHealthServer() *HealthServer { func (s *HealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (out *healthpb.HealthCheckResponse, err error) { service := in.Host + ":" + in.Service out = new(healthpb.HealthCheckResponse) - status, ok := s.statusMap[service] - out.Status = healthpb.HealthCheckResponse_ServingStatus(status) - if !ok { - err = grpc.Errorf(codes.NotFound, "unknown service") - } else { - err = nil + s.mu.Lock() + if status, ok := s.statusMap[service]; ok { + s.mu.Unlock() + return &healthpb.HealthCheckResponse{ + Status: healthpb.HealthCheckResponse_ServingStatus(status), + }, nil } - return out, err + s.mu.Unlock() + return nil, grpc.Errorf(codes.NotFound, "unknown service") } // SetServingStatus is called when need to reset the serving status of a service // or insert a new service entry into the statusMap func (s *HealthServer) SetServingStatus(host string, service string, status int32) { service = host + ":" + service - var mu sync.Mutex - mu.Lock() + s.mu.Lock() s.statusMap[service] = status - mu.Unlock() + s.mu.Unlock() } diff --git a/stream.go b/stream.go index 348fe9a5..c902034d 100644 --- a/stream.go +++ b/stream.go @@ -166,7 +166,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { if cs.tracing { cs.mu.Lock() if cs.traceInfo.tr != nil { - cs.traceInfo.tr.LazyLog(&fmtStringer{"sent: %v", []interface{}{payload{m}}}, true) + cs.traceInfo.tr.LazyLog(payload{m}, true) } cs.mu.Unlock() } @@ -187,13 +187,6 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { } func (cs *clientStream) RecvMsg(m interface{}) (err error) { - if cs.tracing { - cs.mu.Lock() - if cs.traceInfo.tr != nil { - cs.traceInfo.tr.LazyLog(&fmtStringer{"received: %v", []interface{}{payload{m}}}, true) - } - cs.mu.Unlock() - } err = recv(cs.p, cs.codec, m) defer func() { // err != nil indicates the termination of the stream. diff --git a/test/end2end_test.go b/test/end2end_test.go index f220ffca..ba674bf5 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -93,17 +93,6 @@ func newPayload(t testpb.PayloadType, size int32) *testpb.Payload { } } -func healthCheck(t time.Duration, cc *grpc.ClientConn, serviceName string) (*healthpb.HealthCheckResponse, error) { - ctx, _ := context.WithTimeout(context.Background(), t) - hc := healthpb.NewHealthCheckClient(cc) - req := &healthpb.HealthCheckRequest{ - Host: "", - Service: serviceName, - } - out, err := hc.Check(ctx, req) - return out, err -} - func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { md, ok := metadata.FromContext(ctx) if ok { @@ -371,6 +360,17 @@ func testTimeoutOnDeadServer(t *testing.T, e env) { cc.Close() } +func healthCheck(t time.Duration, cc *grpc.ClientConn, serviceName string) (*healthpb.HealthCheckResponse, error) { + ctx, _ := context.WithTimeout(context.Background(), t) + hc := healthpb.NewHealthCheckClient(cc) + req := &healthpb.HealthCheckRequest{ + Host: "", + Service: serviceName, + } + out, err := hc.Check(ctx, req) + return out, err +} + func TestHealthCheckOnSuccess(t *testing.T) { for _, e := range listTestEnv() { testHealthCheckOnSuccess(t, e) @@ -417,32 +417,20 @@ func testHealthCheckOff(t *testing.T, e env) { } } -func TestHealthCheckNotFound(t *testing.T) { +func TestHealthCheckServingStatus(t *testing.T) { for _, e := range listTestEnv() { - testHealthCheckNotFound(t, e) + testHealthCheckServingStatus(t, e) } } -func testHealthCheckNotFound(t *testing.T, e env) { +func testHealthCheckServingStatus(t *testing.T, e env) { hs := health.NewHealthServer() s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) - if _, err := healthCheck(1*time.Second, cc, "unregister_service"); err != grpc.Errorf(codes.NotFound, "unknown service") { + if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != grpc.Errorf(codes.NotFound, "unknown service") { t.Fatalf("TestHealthCheck(_)=_, %v, want error code %d", err, codes.NotFound) } -} - -func TestHealthCheckServing(t *testing.T) { - for _, e := range listTestEnv() { - testHealthCheckServing(t, e) - } -} - -func testHealthCheckServing(t *testing.T, e env) { - hs := health.NewHealthServer() hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 1) - s, cc := setUp(hs, math.MaxUint32, e) - defer tearDown(s, cc) out, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") if err != nil { t.Fatalf("TestHealthCheck(_)=_, %v, want _,", err) @@ -450,26 +438,15 @@ func testHealthCheckServing(t *testing.T, e env) { if out.Status != healthpb.HealthCheckResponse_SERVING { t.Fatalf("Got the serving status %v, want SERVING", out.Status) } -} - -func TestHealthCheckNotServing(t *testing.T) { - for _, e := range listTestEnv() { - testHealthCheckNotServing(t, e) - } -} - -func testHealthCheckNotServing(t *testing.T, e env) { - hs := health.NewHealthServer() hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 2) - s, cc := setUp(hs, math.MaxUint32, e) - defer tearDown(s, cc) - out, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") + out, err = healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") if err != nil { t.Fatalf("TestHealthCheck(_)=_, %v, want _,", err) } if out.Status != healthpb.HealthCheckResponse_NOT_SERVING { t.Fatalf("Got the serving status %v, want NOT_SERVING ") } + } func TestEmptyUnary(t *testing.T) { From ec53eba4e7e63f191c53be3692a6ced9f338c919 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Thu, 23 Jul 2015 16:10:35 -0700 Subject: [PATCH 09/17] add derfer to unlock --- health/health.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/health/health.go b/health/health.go index 819ace4c..7670acb7 100644 --- a/health/health.go +++ b/health/health.go @@ -27,13 +27,12 @@ func (s *HealthServer) Check(ctx context.Context, in *healthpb.HealthCheckReques service := in.Host + ":" + in.Service out = new(healthpb.HealthCheckResponse) s.mu.Lock() + defer s.mu.Unlock() if status, ok := s.statusMap[service]; ok { - s.mu.Unlock() return &healthpb.HealthCheckResponse{ Status: healthpb.HealthCheckResponse_ServingStatus(status), }, nil } - s.mu.Unlock() return nil, grpc.Errorf(codes.NotFound, "unknown service") } From e9dfeb65b7c6daa37580036f7e2debff722e5e4f Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Thu, 23 Jul 2015 16:52:40 -0700 Subject: [PATCH 10/17] fix indent --- health/grpc_health_v1alpha/health.proto | 2 +- health/health.go | 9 ++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/health/grpc_health_v1alpha/health.proto b/health/grpc_health_v1alpha/health.proto index 07615ba5..747a4f03 100644 --- a/health/grpc_health_v1alpha/health.proto +++ b/health/grpc_health_v1alpha/health.proto @@ -9,7 +9,7 @@ message HealthCheckRequest { message HealthCheckResponse { enum ServingStatus { - UNKNOWN = 0; + UNKNOWN = 0; SERVING = 1; NOT_SERVING = 2; } diff --git a/health/health.go b/health/health.go index 7670acb7..3f689fed 100644 --- a/health/health.go +++ b/health/health.go @@ -12,9 +12,9 @@ import ( ) type HealthServer struct { - // statusMap stores the serving status of the services this HealthServer monitors + mu sync.Mutex + // statusMap stores the serving status of the services this HealthServer monitors. statusMap map[string]int32 - mu sync.Mutex } func NewHealthServer() *HealthServer { @@ -23,9 +23,8 @@ func NewHealthServer() *HealthServer { } } -func (s *HealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (out *healthpb.HealthCheckResponse, err error) { +func (s *HealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) { service := in.Host + ":" + in.Service - out = new(healthpb.HealthCheckResponse) s.mu.Lock() defer s.mu.Unlock() if status, ok := s.statusMap[service]; ok { @@ -37,7 +36,7 @@ func (s *HealthServer) Check(ctx context.Context, in *healthpb.HealthCheckReques } // SetServingStatus is called when need to reset the serving status of a service -// or insert a new service entry into the statusMap +// or insert a new service entry into the statusMap. func (s *HealthServer) SetServingStatus(host string, service string, status int32) { service = host + ":" + service s.mu.Lock() From a5348d9bde61ba938e44025441dc4498679968c5 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Thu, 23 Jul 2015 17:14:27 -0700 Subject: [PATCH 11/17] change error information --- test/end2end_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index ba674bf5..dd6141a4 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -383,7 +383,7 @@ func testHealthCheckOnSuccess(t *testing.T, e env) { s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != nil { - t.Fatalf("TestHealthCheck(_)=_, %v, want ", err) + t.Fatalf("HealthCheck/Check(,_)= _, %v, want ", err) } } @@ -399,7 +399,7 @@ func testHealthCheckOnFailure(t *testing.T, e env) { s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) if _, err := healthCheck(0*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded") { - t.Fatalf("TestHealthCheck(_)=_, %v, want error code %d", err, codes.DeadlineExceeded) + t.Fatalf("HealthCheck/Check(,_)= _, %v, want error code %d", err, codes.DeadlineExceeded) } } @@ -413,7 +413,7 @@ func testHealthCheckOff(t *testing.T, e env) { s, cc := setUp(nil, math.MaxUint32, e) defer tearDown(s, cc) if _, err := healthCheck(1*time.Second, cc, ""); err != grpc.Errorf(codes.Unimplemented, "unknown service grpc.health.v1alpha.HealthCheck") { - t.Fatalf("TestHealthCheck(_)=_, %v, want error code %d", err, codes.Unimplemented) + t.Fatalf("HealthCheck/Check(,_) = _, %v, want error code %d", err, codes.Unimplemented) } } @@ -428,20 +428,20 @@ func testHealthCheckServingStatus(t *testing.T, e env) { s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != grpc.Errorf(codes.NotFound, "unknown service") { - t.Fatalf("TestHealthCheck(_)=_, %v, want error code %d", err, codes.NotFound) + t.Fatalf("HealthCheck/Check(,_)= _, %v, want error code %d", err, codes.NotFound) } - hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 1) + hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", int32(healthpb.HealthCheckResponse_SERVING)) out, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") if err != nil { - t.Fatalf("TestHealthCheck(_)=_, %v, want _,", err) + t.Fatalf("HealthCheck/Check(,_)= _, %v, want _,", err) } if out.Status != healthpb.HealthCheckResponse_SERVING { t.Fatalf("Got the serving status %v, want SERVING", out.Status) } - hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 2) + hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", int32(healthpb.HealthCheckResponse_NOT_SERVING)) out, err = healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") if err != nil { - t.Fatalf("TestHealthCheck(_)=_, %v, want _,", err) + t.Fatalf("HealthCheck/Check(,_)= _, %v, want _,", err) } if out.Status != healthpb.HealthCheckResponse_NOT_SERVING { t.Fatalf("Got the serving status %v, want NOT_SERVING ") From 8e88badc725bd0f18a2adc968f7d8cc5a9babc4d Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Thu, 23 Jul 2015 17:35:44 -0700 Subject: [PATCH 12/17] modify the fatal message --- test/end2end_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index dd6141a4..3a2e2acd 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -383,7 +383,7 @@ func testHealthCheckOnSuccess(t *testing.T, e env) { s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != nil { - t.Fatalf("HealthCheck/Check(,_)= _, %v, want ", err) + t.Fatalf("HealthCheck/Check(_,_)= _, %v, want _, ", err) } } @@ -399,7 +399,7 @@ func testHealthCheckOnFailure(t *testing.T, e env) { s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) if _, err := healthCheck(0*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded") { - t.Fatalf("HealthCheck/Check(,_)= _, %v, want error code %d", err, codes.DeadlineExceeded) + t.Fatalf("HealthCheck/Check(_,_)= _, %v, want _, error code %d", err, codes.DeadlineExceeded) } } @@ -413,7 +413,7 @@ func testHealthCheckOff(t *testing.T, e env) { s, cc := setUp(nil, math.MaxUint32, e) defer tearDown(s, cc) if _, err := healthCheck(1*time.Second, cc, ""); err != grpc.Errorf(codes.Unimplemented, "unknown service grpc.health.v1alpha.HealthCheck") { - t.Fatalf("HealthCheck/Check(,_) = _, %v, want error code %d", err, codes.Unimplemented) + t.Fatalf("HealthCheck/Check(_,_) = _, %v, want _, error code %d", err, codes.Unimplemented) } } @@ -428,12 +428,12 @@ func testHealthCheckServingStatus(t *testing.T, e env) { s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != grpc.Errorf(codes.NotFound, "unknown service") { - t.Fatalf("HealthCheck/Check(,_)= _, %v, want error code %d", err, codes.NotFound) + t.Fatalf("HealthCheck/Check(_,_)= _, %v, want _, error code %d", err, codes.NotFound) } hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", int32(healthpb.HealthCheckResponse_SERVING)) out, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") if err != nil { - t.Fatalf("HealthCheck/Check(,_)= _, %v, want _,", err) + t.Fatalf("HealthCheck/Check(_,_)= _, %v, want _, ", err) } if out.Status != healthpb.HealthCheckResponse_SERVING { t.Fatalf("Got the serving status %v, want SERVING", out.Status) @@ -441,7 +441,7 @@ func testHealthCheckServingStatus(t *testing.T, e env) { hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", int32(healthpb.HealthCheckResponse_NOT_SERVING)) out, err = healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") if err != nil { - t.Fatalf("HealthCheck/Check(,_)= _, %v, want _,", err) + t.Fatalf("HealthCheck/Check(,_)= _, %v, want _, ", err) } if out.Status != healthpb.HealthCheckResponse_NOT_SERVING { t.Fatalf("Got the serving status %v, want NOT_SERVING ") From cdea3603c347b149064495abe7699b95120669d4 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Thu, 23 Jul 2015 17:40:49 -0700 Subject: [PATCH 13/17] add space --- test/end2end_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 3a2e2acd..e01f0268 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -383,7 +383,7 @@ func testHealthCheckOnSuccess(t *testing.T, e env) { s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != nil { - t.Fatalf("HealthCheck/Check(_,_)= _, %v, want _, ", err) + t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, ", err) } } @@ -399,7 +399,7 @@ func testHealthCheckOnFailure(t *testing.T, e env) { s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) if _, err := healthCheck(0*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded") { - t.Fatalf("HealthCheck/Check(_,_)= _, %v, want _, error code %d", err, codes.DeadlineExceeded) + t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, error code %d", err, codes.DeadlineExceeded) } } @@ -413,7 +413,7 @@ func testHealthCheckOff(t *testing.T, e env) { s, cc := setUp(nil, math.MaxUint32, e) defer tearDown(s, cc) if _, err := healthCheck(1*time.Second, cc, ""); err != grpc.Errorf(codes.Unimplemented, "unknown service grpc.health.v1alpha.HealthCheck") { - t.Fatalf("HealthCheck/Check(_,_) = _, %v, want _, error code %d", err, codes.Unimplemented) + t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, error code %d", err, codes.Unimplemented) } } @@ -428,12 +428,12 @@ func testHealthCheckServingStatus(t *testing.T, e env) { s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != grpc.Errorf(codes.NotFound, "unknown service") { - t.Fatalf("HealthCheck/Check(_,_)= _, %v, want _, error code %d", err, codes.NotFound) + t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, error code %d", err, codes.NotFound) } hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", int32(healthpb.HealthCheckResponse_SERVING)) out, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") if err != nil { - t.Fatalf("HealthCheck/Check(_,_)= _, %v, want _, ", err) + t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, ", err) } if out.Status != healthpb.HealthCheckResponse_SERVING { t.Fatalf("Got the serving status %v, want SERVING", out.Status) @@ -441,7 +441,7 @@ func testHealthCheckServingStatus(t *testing.T, e env) { hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", int32(healthpb.HealthCheckResponse_NOT_SERVING)) out, err = healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") if err != nil { - t.Fatalf("HealthCheck/Check(,_)= _, %v, want _, ", err) + t.Fatalf("HealthCheck/Check(, _) = _, %v, want _, ", err) } if out.Status != healthpb.HealthCheckResponse_NOT_SERVING { t.Fatalf("Got the serving status %v, want NOT_SERVING ") From 89139c920f742bb860587ce99d8ec05823eca6db Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Thu, 23 Jul 2015 17:41:57 -0700 Subject: [PATCH 14/17] format --- test/end2end_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index e01f0268..ac5c4171 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -441,7 +441,7 @@ func testHealthCheckServingStatus(t *testing.T, e env) { hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", int32(healthpb.HealthCheckResponse_NOT_SERVING)) out, err = healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") if err != nil { - t.Fatalf("HealthCheck/Check(, _) = _, %v, want _, ", err) + t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, ", err) } if out.Status != healthpb.HealthCheckResponse_NOT_SERVING { t.Fatalf("Got the serving status %v, want NOT_SERVING ") From a09d93d3162b2b1ba1264d10f0e2262bb1227033 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Thu, 23 Jul 2015 17:47:32 -0700 Subject: [PATCH 15/17] fix fatal message --- test/end2end_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index ac5c4171..327f977c 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -444,7 +444,7 @@ func testHealthCheckServingStatus(t *testing.T, e env) { t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, ", err) } if out.Status != healthpb.HealthCheckResponse_NOT_SERVING { - t.Fatalf("Got the serving status %v, want NOT_SERVING ") + t.Fatalf("Got the serving status %v, want NOT_SERVING", out.Status) } } From e6a0e51ad1af1d11e06bd9c7bd20db797e6d1740 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Thu, 23 Jul 2015 18:01:20 -0700 Subject: [PATCH 16/17] modify the status map type --- health/health.go | 8 ++++---- test/end2end_test.go | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/health/health.go b/health/health.go index 3f689fed..e7b63853 100644 --- a/health/health.go +++ b/health/health.go @@ -14,12 +14,12 @@ import ( type HealthServer struct { mu sync.Mutex // statusMap stores the serving status of the services this HealthServer monitors. - statusMap map[string]int32 + statusMap map[string]healthpb.HealthCheckResponse_ServingStatus } func NewHealthServer() *HealthServer { return &HealthServer{ - statusMap: make(map[string]int32), + statusMap: make(map[string]healthpb.HealthCheckResponse_ServingStatus), } } @@ -29,7 +29,7 @@ func (s *HealthServer) Check(ctx context.Context, in *healthpb.HealthCheckReques defer s.mu.Unlock() if status, ok := s.statusMap[service]; ok { return &healthpb.HealthCheckResponse{ - Status: healthpb.HealthCheckResponse_ServingStatus(status), + Status: status, }, nil } return nil, grpc.Errorf(codes.NotFound, "unknown service") @@ -37,7 +37,7 @@ func (s *HealthServer) Check(ctx context.Context, in *healthpb.HealthCheckReques // SetServingStatus is called when need to reset the serving status of a service // or insert a new service entry into the statusMap. -func (s *HealthServer) SetServingStatus(host string, service string, status int32) { +func (s *HealthServer) SetServingStatus(host string, service string, status healthpb.HealthCheckResponse_ServingStatus) { service = host + ":" + service s.mu.Lock() s.statusMap[service] = status diff --git a/test/end2end_test.go b/test/end2end_test.go index 327f977c..e7859b9b 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -430,7 +430,7 @@ func testHealthCheckServingStatus(t *testing.T, e env) { if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != grpc.Errorf(codes.NotFound, "unknown service") { t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, error code %d", err, codes.NotFound) } - hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", int32(healthpb.HealthCheckResponse_SERVING)) + hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", healthpb.HealthCheckResponse_SERVING) out, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") if err != nil { t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, ", err) @@ -438,7 +438,7 @@ func testHealthCheckServingStatus(t *testing.T, e env) { if out.Status != healthpb.HealthCheckResponse_SERVING { t.Fatalf("Got the serving status %v, want SERVING", out.Status) } - hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", int32(healthpb.HealthCheckResponse_NOT_SERVING)) + hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", healthpb.HealthCheckResponse_NOT_SERVING) out, err = healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") if err != nil { t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, ", err) From b022c50ebee44926c8f6e7238d2eb0d462d3f927 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Thu, 23 Jul 2015 18:06:59 -0700 Subject: [PATCH 17/17] modify something --- test/end2end_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index e7859b9b..8430ed28 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -367,8 +367,7 @@ func healthCheck(t time.Duration, cc *grpc.ClientConn, serviceName string) (*hea Host: "", Service: serviceName, } - out, err := hc.Check(ctx, req) - return out, err + return hc.Check(ctx, req) } func TestHealthCheckOnSuccess(t *testing.T) {