This commit is contained in:
iamqizhao
2015-07-23 18:52:14 -07:00
5 changed files with 154 additions and 57 deletions

View File

@ -1,13 +0,0 @@
syntax = "proto3";
package grpc.health;
message HealthCheckRequest{
}
message HealthCheckResponse{
}
service HealthCheck{
rpc Check( HealthCheckRequest) returns ( HealthCheckResponse);
}

View File

@ -3,7 +3,7 @@
// DO NOT EDIT! // DO NOT EDIT!
/* /*
Package grpc_health is a generated protocol buffer package. Package grpc_health_v1alpha is a generated protocol buffer package.
It is generated from these files: It is generated from these files:
health.proto health.proto
@ -12,7 +12,7 @@ It has these top-level messages:
HealthCheckRequest HealthCheckRequest
HealthCheckResponse HealthCheckResponse
*/ */
package grpc_health package grpc_health_v1alpha
import proto "github.com/golang/protobuf/proto" import proto "github.com/golang/protobuf/proto"
@ -28,7 +28,32 @@ var _ grpc.ClientConn
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal var _ = proto.Marshal
type HealthCheckResponse_ServingStatus int32
const (
HealthCheckResponse_UNKNOWN HealthCheckResponse_ServingStatus = 0
HealthCheckResponse_SERVING HealthCheckResponse_ServingStatus = 1
HealthCheckResponse_NOT_SERVING HealthCheckResponse_ServingStatus = 2
)
var HealthCheckResponse_ServingStatus_name = map[int32]string{
0: "UNKNOWN",
1: "SERVING",
2: "NOT_SERVING",
}
var HealthCheckResponse_ServingStatus_value = map[string]int32{
"UNKNOWN": 0,
"SERVING": 1,
"NOT_SERVING": 2,
}
func (x HealthCheckResponse_ServingStatus) String() string {
return proto.EnumName(HealthCheckResponse_ServingStatus_name, int32(x))
}
type HealthCheckRequest struct { type HealthCheckRequest struct {
Host string `protobuf:"bytes,1,opt,name=host" json:"host,omitempty"`
Service string `protobuf:"bytes,2,opt,name=service" json:"service,omitempty"`
} }
func (m *HealthCheckRequest) Reset() { *m = HealthCheckRequest{} } func (m *HealthCheckRequest) Reset() { *m = HealthCheckRequest{} }
@ -36,6 +61,7 @@ func (m *HealthCheckRequest) String() string { return proto.CompactTextString(m)
func (*HealthCheckRequest) ProtoMessage() {} func (*HealthCheckRequest) ProtoMessage() {}
type HealthCheckResponse struct { type HealthCheckResponse struct {
Status HealthCheckResponse_ServingStatus `protobuf:"varint,1,opt,name=status,enum=grpc.health.v1alpha.HealthCheckResponse_ServingStatus" json:"status,omitempty"`
} }
func (m *HealthCheckResponse) Reset() { *m = HealthCheckResponse{} } func (m *HealthCheckResponse) Reset() { *m = HealthCheckResponse{} }
@ -43,6 +69,7 @@ func (m *HealthCheckResponse) String() string { return proto.CompactTextString(m
func (*HealthCheckResponse) ProtoMessage() {} func (*HealthCheckResponse) ProtoMessage() {}
func init() { func init() {
proto.RegisterEnum("grpc.health.v1alpha.HealthCheckResponse_ServingStatus", HealthCheckResponse_ServingStatus_name, HealthCheckResponse_ServingStatus_value)
} }
// Client API for HealthCheck service // Client API for HealthCheck service
@ -61,7 +88,7 @@ func NewHealthCheckClient(cc *grpc.ClientConn) HealthCheckClient {
func (c *healthCheckClient) Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) { func (c *healthCheckClient) Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) {
out := new(HealthCheckResponse) out := new(HealthCheckResponse)
err := grpc.Invoke(ctx, "/grpc.health.HealthCheck/Check", in, out, c.cc, opts...) err := grpc.Invoke(ctx, "/grpc.health.v1alpha.HealthCheck/Check", in, out, c.cc, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -91,7 +118,7 @@ func _HealthCheck_Check_Handler(srv interface{}, ctx context.Context, codec grpc
} }
var _HealthCheck_serviceDesc = grpc.ServiceDesc{ var _HealthCheck_serviceDesc = grpc.ServiceDesc{
ServiceName: "grpc.health.HealthCheck", ServiceName: "grpc.health.v1alpha.HealthCheck",
HandlerType: (*HealthCheckServer)(nil), HandlerType: (*HealthCheckServer)(nil),
Methods: []grpc.MethodDesc{ Methods: []grpc.MethodDesc{
{ {

View File

@ -0,0 +1,21 @@
syntax = "proto3";
package grpc.health.v1alpha;
message HealthCheckRequest {
string host = 1;
string service = 2;
}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
}
ServingStatus status = 1;
}
service HealthCheck{
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
}

View File

@ -3,26 +3,43 @@
package health package health
import ( import (
"time" "sync"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc" "google.golang.org/grpc"
healthpb "google.golang.org/grpc/health/grpc_health" "google.golang.org/grpc/codes"
healthpb "google.golang.org/grpc/health/grpc_health_v1alpha"
) )
// HealthCheck is the client side function to health-check a server type HealthServer struct {
func HealthCheck(t time.Duration, cc *grpc.ClientConn) error { mu sync.Mutex
ctx, _ := context.WithTimeout(context.Background(), t) // statusMap stores the serving status of the services this HealthServer monitors.
hc := healthpb.NewHealthCheckClient(cc) statusMap map[string]healthpb.HealthCheckResponse_ServingStatus
req := new(healthpb.HealthCheckRequest)
_, err := hc.Check(ctx, req)
return err
} }
type HealthServer struct { func NewHealthServer() *HealthServer {
return &HealthServer{
statusMap: make(map[string]healthpb.HealthCheckResponse_ServingStatus),
}
} }
func (s *HealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) { func (s *HealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
out := new(healthpb.HealthCheckResponse) service := in.Host + ":" + in.Service
return out, nil s.mu.Lock()
defer s.mu.Unlock()
if status, ok := s.statusMap[service]; ok {
return &healthpb.HealthCheckResponse{
Status: status,
}, nil
}
return nil, grpc.Errorf(codes.NotFound, "unknown service")
}
// SetServingStatus is called when need to reset the serving status of a service
// or insert a new service entry into the statusMap.
func (s *HealthServer) SetServingStatus(host string, service string, status healthpb.HealthCheckResponse_ServingStatus) {
service = host + ":" + service
s.mu.Lock()
s.statusMap[service] = status
s.mu.Unlock()
} }

View File

@ -52,7 +52,7 @@ import (
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
"google.golang.org/grpc/health" "google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health" healthpb "google.golang.org/grpc/health/grpc_health_v1alpha"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
testpb "google.golang.org/grpc/test/grpc_testing" testpb "google.golang.org/grpc/test/grpc_testing"
) )
@ -285,7 +285,7 @@ func listTestEnv() []env {
return []env{env{"tcp", nil, ""}, env{"tcp", nil, "tls"}, env{"unix", unixDialer, ""}, env{"unix", unixDialer, "tls"}} return []env{env{"tcp", nil, ""}, env{"tcp", nil, "tls"}, env{"unix", unixDialer, ""}, env{"unix", unixDialer, "tls"}}
} }
func setUp(healthCheck bool, maxStream uint32, e env) (s *grpc.Server, cc *grpc.ClientConn) { func setUp(hs *health.HealthServer, maxStream uint32, e env) (s *grpc.Server, cc *grpc.ClientConn) {
sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(maxStream)} sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(maxStream)}
la := ":0" la := ":0"
switch e.network { switch e.network {
@ -305,8 +305,8 @@ func setUp(healthCheck bool, maxStream uint32, e env) (s *grpc.Server, cc *grpc.
sopts = append(sopts, grpc.Creds(creds)) sopts = append(sopts, grpc.Creds(creds))
} }
s = grpc.NewServer(sopts...) s = grpc.NewServer(sopts...)
if healthCheck { if hs != nil {
healthpb.RegisterHealthCheckServer(s, &health.HealthServer{}) healthpb.RegisterHealthCheckServer(s, hs)
} }
testpb.RegisterTestServiceServer(s, &testServer{}) testpb.RegisterTestServiceServer(s, &testServer{})
go s.Serve(lis) go s.Serve(lis)
@ -347,7 +347,7 @@ func TestTimeoutOnDeadServer(t *testing.T) {
} }
func testTimeoutOnDeadServer(t *testing.T, e env) { func testTimeoutOnDeadServer(t *testing.T, e env) {
s, cc := setUp(false, math.MaxUint32, e) s, cc := setUp(nil, math.MaxUint32, e)
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
s.Stop() s.Stop()
// Set -1 as the timeout to make sure if transportMonitor gets error // Set -1 as the timeout to make sure if transportMonitor gets error
@ -360,6 +360,16 @@ func testTimeoutOnDeadServer(t *testing.T, e env) {
cc.Close() cc.Close()
} }
func healthCheck(t time.Duration, cc *grpc.ClientConn, serviceName string) (*healthpb.HealthCheckResponse, error) {
ctx, _ := context.WithTimeout(context.Background(), t)
hc := healthpb.NewHealthCheckClient(cc)
req := &healthpb.HealthCheckRequest{
Host: "",
Service: serviceName,
}
return hc.Check(ctx, req)
}
func TestHealthCheckOnSuccess(t *testing.T) { func TestHealthCheckOnSuccess(t *testing.T) {
for _, e := range listTestEnv() { for _, e := range listTestEnv() {
testHealthCheckOnSuccess(t, e) testHealthCheckOnSuccess(t, e)
@ -367,10 +377,12 @@ func TestHealthCheckOnSuccess(t *testing.T) {
} }
func testHealthCheckOnSuccess(t *testing.T, e env) { func testHealthCheckOnSuccess(t *testing.T, e env) {
s, cc := setUp(true, math.MaxUint32, e) hs := health.NewHealthServer()
hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 1)
s, cc := setUp(hs, math.MaxUint32, e)
defer tearDown(s, cc) defer tearDown(s, cc)
if err := health.HealthCheck(1*time.Second, cc); err != nil { if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != nil {
t.Fatalf("HealthCheck(_)=_, %v, want <nil>", err) t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, <nil>", err)
} }
} }
@ -381,10 +393,12 @@ func TestHealthCheckOnFailure(t *testing.T) {
} }
func testHealthCheckOnFailure(t *testing.T, e env) { func testHealthCheckOnFailure(t *testing.T, e env) {
s, cc := setUp(true, math.MaxUint32, e) hs := health.NewHealthServer()
hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 1)
s, cc := setUp(hs, math.MaxUint32, e)
defer tearDown(s, cc) defer tearDown(s, cc)
if err := health.HealthCheck(0*time.Second, cc); err != grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded") { if _, err := healthCheck(0*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded") {
t.Fatalf("HealthCheck(_)=_, %v, want error code %d", err, codes.DeadlineExceeded) t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, error code %d", err, codes.DeadlineExceeded)
} }
} }
@ -395,14 +409,45 @@ func TestHealthCheckOff(t *testing.T) {
} }
func testHealthCheckOff(t *testing.T, e env) { func testHealthCheckOff(t *testing.T, e env) {
s, cc := setUp(false, math.MaxUint32, e) s, cc := setUp(nil, math.MaxUint32, e)
defer tearDown(s, cc) defer tearDown(s, cc)
err := health.HealthCheck(1*time.Second, cc) if _, err := healthCheck(1*time.Second, cc, ""); err != grpc.Errorf(codes.Unimplemented, "unknown service grpc.health.v1alpha.HealthCheck") {
if err != grpc.Errorf(codes.Unimplemented, "unknown service grpc.health.HealthCheck") { t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, error code %d", err, codes.Unimplemented)
t.Fatalf("HealthCheck(_)=_, %v, want error code %d", err, codes.DeadlineExceeded)
} }
} }
func TestHealthCheckServingStatus(t *testing.T) {
for _, e := range listTestEnv() {
testHealthCheckServingStatus(t, e)
}
}
func testHealthCheckServingStatus(t *testing.T, e env) {
hs := health.NewHealthServer()
s, cc := setUp(hs, math.MaxUint32, e)
defer tearDown(s, cc)
if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != grpc.Errorf(codes.NotFound, "unknown service") {
t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, error code %d", err, codes.NotFound)
}
hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", healthpb.HealthCheckResponse_SERVING)
out, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck")
if err != nil {
t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, <nil>", err)
}
if out.Status != healthpb.HealthCheckResponse_SERVING {
t.Fatalf("Got the serving status %v, want SERVING", out.Status)
}
hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", healthpb.HealthCheckResponse_NOT_SERVING)
out, err = healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck")
if err != nil {
t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, <nil>", err)
}
if out.Status != healthpb.HealthCheckResponse_NOT_SERVING {
t.Fatalf("Got the serving status %v, want NOT_SERVING", out.Status)
}
}
func TestEmptyUnary(t *testing.T) { func TestEmptyUnary(t *testing.T) {
for _, e := range listTestEnv() { for _, e := range listTestEnv() {
testEmptyUnary(t, e) testEmptyUnary(t, e)
@ -410,7 +455,7 @@ func TestEmptyUnary(t *testing.T) {
} }
func testEmptyUnary(t *testing.T, e env) { func testEmptyUnary(t *testing.T, e env) {
s, cc := setUp(false, math.MaxUint32, e) s, cc := setUp(nil, math.MaxUint32, e)
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc) defer tearDown(s, cc)
reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}) reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{})
@ -426,7 +471,7 @@ func TestFailedEmptyUnary(t *testing.T) {
} }
func testFailedEmptyUnary(t *testing.T, e env) { func testFailedEmptyUnary(t *testing.T, e env) {
s, cc := setUp(false, math.MaxUint32, e) s, cc := setUp(nil, math.MaxUint32, e)
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc) defer tearDown(s, cc)
ctx := metadata.NewContext(context.Background(), testMetadata) ctx := metadata.NewContext(context.Background(), testMetadata)
@ -442,7 +487,7 @@ func TestLargeUnary(t *testing.T) {
} }
func testLargeUnary(t *testing.T, e env) { func testLargeUnary(t *testing.T, e env) {
s, cc := setUp(false, math.MaxUint32, e) s, cc := setUp(nil, math.MaxUint32, e)
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc) defer tearDown(s, cc)
argSize := 271828 argSize := 271828
@ -470,7 +515,7 @@ func TestMetadataUnaryRPC(t *testing.T) {
} }
func testMetadataUnaryRPC(t *testing.T, e env) { func testMetadataUnaryRPC(t *testing.T, e env) {
s, cc := setUp(false, math.MaxUint32, e) s, cc := setUp(nil, math.MaxUint32, e)
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc) defer tearDown(s, cc)
argSize := 2718 argSize := 2718
@ -524,7 +569,7 @@ func TestRetry(t *testing.T) {
// TODO(zhaoq): Refactor to make this clearer and add more cases to test racy // TODO(zhaoq): Refactor to make this clearer and add more cases to test racy
// and error-prone paths. // and error-prone paths.
func testRetry(t *testing.T, e env) { func testRetry(t *testing.T, e env) {
s, cc := setUp(false, math.MaxUint32, e) s, cc := setUp(nil, math.MaxUint32, e)
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc) defer tearDown(s, cc)
var wg sync.WaitGroup var wg sync.WaitGroup
@ -554,7 +599,7 @@ func TestRPCTimeout(t *testing.T) {
// TODO(zhaoq): Have a better test coverage of timeout and cancellation mechanism. // TODO(zhaoq): Have a better test coverage of timeout and cancellation mechanism.
func testRPCTimeout(t *testing.T, e env) { func testRPCTimeout(t *testing.T, e env) {
s, cc := setUp(false, math.MaxUint32, e) s, cc := setUp(nil, math.MaxUint32, e)
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc) defer tearDown(s, cc)
argSize := 2718 argSize := 2718
@ -580,7 +625,7 @@ func TestCancel(t *testing.T) {
} }
func testCancel(t *testing.T, e env) { func testCancel(t *testing.T, e env) {
s, cc := setUp(false, math.MaxUint32, e) s, cc := setUp(nil, math.MaxUint32, e)
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc) defer tearDown(s, cc)
argSize := 2718 argSize := 2718
@ -612,7 +657,7 @@ func TestPingPong(t *testing.T) {
} }
func testPingPong(t *testing.T, e env) { func testPingPong(t *testing.T, e env) {
s, cc := setUp(false, math.MaxUint32, e) s, cc := setUp(nil, math.MaxUint32, e)
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc) defer tearDown(s, cc)
stream, err := tc.FullDuplexCall(context.Background()) stream, err := tc.FullDuplexCall(context.Background())
@ -663,7 +708,7 @@ func TestMetadataStreamingRPC(t *testing.T) {
} }
func testMetadataStreamingRPC(t *testing.T, e env) { func testMetadataStreamingRPC(t *testing.T, e env) {
s, cc := setUp(false, math.MaxUint32, e) s, cc := setUp(nil, math.MaxUint32, e)
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc) defer tearDown(s, cc)
ctx := metadata.NewContext(context.Background(), testMetadata) ctx := metadata.NewContext(context.Background(), testMetadata)
@ -720,7 +765,7 @@ func TestServerStreaming(t *testing.T) {
} }
func testServerStreaming(t *testing.T, e env) { func testServerStreaming(t *testing.T, e env) {
s, cc := setUp(false, math.MaxUint32, e) s, cc := setUp(nil, math.MaxUint32, e)
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc) defer tearDown(s, cc)
respParam := make([]*testpb.ResponseParameters, len(respSizes)) respParam := make([]*testpb.ResponseParameters, len(respSizes))
@ -772,7 +817,7 @@ func TestFailedServerStreaming(t *testing.T) {
} }
func testFailedServerStreaming(t *testing.T, e env) { func testFailedServerStreaming(t *testing.T, e env) {
s, cc := setUp(false, math.MaxUint32, e) s, cc := setUp(nil, math.MaxUint32, e)
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc) defer tearDown(s, cc)
respParam := make([]*testpb.ResponseParameters, len(respSizes)) respParam := make([]*testpb.ResponseParameters, len(respSizes))
@ -802,7 +847,7 @@ func TestClientStreaming(t *testing.T) {
} }
func testClientStreaming(t *testing.T, e env) { func testClientStreaming(t *testing.T, e env) {
s, cc := setUp(false, math.MaxUint32, e) s, cc := setUp(nil, math.MaxUint32, e)
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc) defer tearDown(s, cc)
stream, err := tc.StreamingInputCall(context.Background()) stream, err := tc.StreamingInputCall(context.Background())
@ -837,7 +882,7 @@ func TestExceedMaxStreamsLimit(t *testing.T) {
func testExceedMaxStreamsLimit(t *testing.T, e env) { func testExceedMaxStreamsLimit(t *testing.T, e env) {
// Only allows 1 live stream per server transport. // Only allows 1 live stream per server transport.
s, cc := setUp(false, 1, e) s, cc := setUp(nil, 1, e)
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc) defer tearDown(s, cc)
// Perform a unary RPC to make sure the new settings were propagated to the client. // Perform a unary RPC to make sure the new settings were propagated to the client.