health check test: prevent double close of hcEnterChan (#2441)

This commit is contained in:
lyuxuan
2018-12-13 16:44:36 -08:00
committed by GitHub
parent b74673af89
commit 5da252b6a6
5 changed files with 297 additions and 286 deletions

View File

@ -36,7 +36,6 @@ import (
"google.golang.org/grpc/connectivity" "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/envconfig"
@ -1220,7 +1219,7 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts
// 3. a service config with non-empty healthCheckConfig field is provided, // 3. a service config with non-empty healthCheckConfig field is provided,
// 4. the current load balancer allows it. // 4. the current load balancer allows it.
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled { if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
if internal.HealthCheckFunc != nil { if ac.cc.dopts.healthCheckFunc != nil {
go ac.startHealthCheck(hcCtx, newTr, addr, healthCheckConfig.ServiceName) go ac.startHealthCheck(hcCtx, newTr, addr, healthCheckConfig.ServiceName)
close(allowedToReset) close(allowedToReset)
return nil return nil
@ -1281,7 +1280,7 @@ func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.Client
} }
} }
err := internal.HealthCheckFunc(ctx, newStream, reportHealth, serviceName) err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName)
if err != nil { if err != nil {
if status.Code(err) == codes.Unimplemented { if status.Code(err) == codes.Unimplemented {
if channelz.IsOn() { if channelz.IsOn() {

View File

@ -60,6 +60,7 @@ type dialOptions struct {
disableServiceConfig bool disableServiceConfig bool
disableRetry bool disableRetry bool
disableHealthCheck bool disableHealthCheck bool
healthCheckFunc internal.HealthChecker
} }
// DialOption configures how we set up the connection. // DialOption configures how we set up the connection.
@ -338,6 +339,7 @@ func withContextDialer(f func(context.Context, string) (net.Conn, error)) DialOp
func init() { func init() {
internal.WithContextDialer = withContextDialer internal.WithContextDialer = withContextDialer
internal.WithResolverBuilder = withResolverBuilder internal.WithResolverBuilder = withResolverBuilder
internal.WithHealthCheckFunc = withHealthCheckFunc
} }
// WithDialer returns a DialOption that specifies a function to use for dialing // WithDialer returns a DialOption that specifies a function to use for dialing
@ -468,10 +470,22 @@ func WithDisableHealthCheck() DialOption {
o.disableHealthCheck = true o.disableHealthCheck = true
}) })
} }
// withHealthCheckFunc replaces the default health check function with the provided one. It makes
// tests easier to change the health check function.
//
// For testing purpose only.
func withHealthCheckFunc(f internal.HealthChecker) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.healthCheckFunc = f
})
}
func defaultDialOptions() dialOptions { func defaultDialOptions() dialOptions {
return dialOptions{ return dialOptions{
disableRetry: !envconfig.Retry, disableRetry: !envconfig.Retry,
reqHandshake: envconfig.RequireHandshake, reqHandshake: envconfig.RequireHandshake,
healthCheckFunc: internal.HealthCheckFunc,
copts: transport.ConnectOptions{ copts: transport.ConnectOptions{
WriteBufferSize: defaultWriteBufSize, WriteBufferSize: defaultWriteBufSize,
ReadBufferSize: defaultReadBufSize, ReadBufferSize: defaultReadBufSize,

View File

@ -23,16 +23,21 @@ package internal
import "context" import "context"
var ( var (
// WithContextDialer is exported by clientconn.go // WithContextDialer is exported by dialoptions.go
WithContextDialer interface{} // func(context.Context, string) (net.Conn, error) grpc.DialOption WithContextDialer interface{} // func(context.Context, string) (net.Conn, error) grpc.DialOption
// WithResolverBuilder is exported by clientconn.go // WithResolverBuilder is exported by dialoptions.go
WithResolverBuilder interface{} // func (resolver.Builder) grpc.DialOption WithResolverBuilder interface{} // func (resolver.Builder) grpc.DialOption
// WithHealthCheckFunc is not exported by dialoptions.go
WithHealthCheckFunc interface{} // func (HealthChecker) DialOption
// HealthCheckFunc is used to provide client-side LB channel health checking // HealthCheckFunc is used to provide client-side LB channel health checking
HealthCheckFunc func(ctx context.Context, newStream func() (interface{}, error), reportHealth func(bool), serviceName string) error HealthCheckFunc HealthChecker
// BalancerUnregister is exported by package balancer to unregister a balancer. // BalancerUnregister is exported by package balancer to unregister a balancer.
BalancerUnregister func(name string) BalancerUnregister func(name string)
) )
// HealthChecker defines the signature of the client-side LB channel health checking function.
type HealthChecker func(ctx context.Context, newStream func() (interface{}, error), reportHealth func(bool), serviceName string) error
const ( const (
// CredsBundleModeFallback switches GoogleDefaultCreds to fallback mode. // CredsBundleModeFallback switches GoogleDefaultCreds to fallback mode.
CredsBundleModeFallback = "fallback" CredsBundleModeFallback = "fallback"

View File

@ -44,14 +44,6 @@ import (
var testHealthCheckFunc = internal.HealthCheckFunc var testHealthCheckFunc = internal.HealthCheckFunc
func replaceHealthCheckFunc(f func(context.Context, func() (interface{}, error), func(bool), string) error) func() {
oldHcFunc := internal.HealthCheckFunc
internal.HealthCheckFunc = f
return func() {
internal.HealthCheckFunc = oldHcFunc
}
}
func newTestHealthServer() *testHealthServer { func newTestHealthServer() *testHealthServer {
return newTestHealthServerWithWatchFunc(defaultWatchFunc) return newTestHealthServerWithWatchFunc(defaultWatchFunc)
} }
@ -120,17 +112,69 @@ func (s *testHealthServer) SetServingStatus(service string, status healthpb.Heal
s.mu.Unlock() s.mu.Unlock()
} }
func setupHealthCheckWrapper() (hcEnterChan chan struct{}, hcExitChan chan struct{}, wrapper func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error) {
hcEnterChan = make(chan struct{})
hcExitChan = make(chan struct{})
wrapper = func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error {
close(hcEnterChan)
defer close(hcExitChan)
return testHealthCheckFunc(ctx, newStream, update, service)
}
return
}
type svrConfig struct {
specialWatchFunc func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error
}
func setupServer(sc *svrConfig) (s *grpc.Server, lis net.Listener, ts *testHealthServer, deferFunc func(), err error) {
s = grpc.NewServer()
lis, err = net.Listen("tcp", "localhost:0")
if err != nil {
return nil, nil, nil, func() {}, fmt.Errorf("failed to listen due to err %v", err)
}
if sc.specialWatchFunc != nil {
ts = newTestHealthServerWithWatchFunc(sc.specialWatchFunc)
} else {
ts = newTestHealthServer()
}
healthgrpc.RegisterHealthServer(s, ts)
testpb.RegisterTestServiceServer(s, &testServer{})
go s.Serve(lis)
return s, lis, ts, s.Stop, nil
}
type clientConfig struct {
balancerName string
testHealthCheckFuncWrapper func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error
extraDialOption []grpc.DialOption
}
func setupClient(c *clientConfig) (cc *grpc.ClientConn, r *manual.Resolver, deferFunc func(), err error) {
r, rcleanup := manual.GenerateAndRegisterManualResolver()
var opts []grpc.DialOption
opts = append(opts, grpc.WithInsecure(), grpc.WithBalancerName(c.balancerName))
if c.testHealthCheckFuncWrapper != nil {
opts = append(opts, internal.WithHealthCheckFunc.(func(internal.HealthChecker) grpc.DialOption)(c.testHealthCheckFuncWrapper))
}
for _, dopt := range c.extraDialOption {
opts = append(opts, dopt)
}
cc, err = grpc.Dial(r.Scheme()+":///test.server", opts...)
if err != nil {
rcleanup()
return nil, nil, nil, fmt.Errorf("dial failed due to err: %v", err)
}
return cc, r, func() { cc.Close(); rcleanup() }, nil
}
func TestHealthCheckWatchStateChange(t *testing.T) { func TestHealthCheckWatchStateChange(t *testing.T) {
defer leakcheck.Check(t) defer leakcheck.Check(t)
s := grpc.NewServer() _, lis, ts, deferFunc, err := setupServer(&svrConfig{})
lis, err := net.Listen("tcp", "localhost:0") defer deferFunc()
if err != nil { if err != nil {
t.Fatalf("failed to listen due to err: %v", err) t.Fatal(err)
} }
ts := newTestHealthServer()
healthgrpc.RegisterHealthServer(s, ts)
go s.Serve(lis)
defer s.Stop()
// The table below shows the expected series of addrConn connectivity transitions when server // The table below shows the expected series of addrConn connectivity transitions when server
// updates its health status. As there's only one addrConn corresponds with the ClientConn in this // updates its health status. As there's only one addrConn corresponds with the ClientConn in this
@ -145,13 +189,12 @@ func TestHealthCheckWatchStateChange(t *testing.T) {
//| UNKNOWN | ->TRANSIENT FAILURE | //| UNKNOWN | ->TRANSIENT FAILURE |
//+------------------------------+-------------------------------------------+ //+------------------------------+-------------------------------------------+
ts.SetServingStatus("foo", healthpb.HealthCheckResponse_NOT_SERVING) ts.SetServingStatus("foo", healthpb.HealthCheckResponse_NOT_SERVING)
r, rcleanup := manual.GenerateAndRegisterManualResolver()
defer rcleanup() cc, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"})
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin"))
if err != nil { if err != nil {
t.Fatalf("dial failed due to err: %v", err) t.Fatal(err)
} }
defer cc.Close() defer deferFunc()
r.NewServiceConfig(`{ r.NewServiceConfig(`{
"healthCheckConfig": { "healthCheckConfig": {
@ -215,13 +258,11 @@ func TestHealthCheckHealthServerNotRegistered(t *testing.T) {
go s.Serve(lis) go s.Serve(lis)
defer s.Stop() defer s.Stop()
r, rcleanup := manual.GenerateAndRegisterManualResolver() cc, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"})
defer rcleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin"))
if err != nil { if err != nil {
t.Fatalf("dial failed due to err: %v", err) t.Fatal(err)
} }
defer cc.Close() defer deferFunc()
r.NewServiceConfig(`{ r.NewServiceConfig(`{
"healthCheckConfig": { "healthCheckConfig": {
@ -247,33 +288,25 @@ func TestHealthCheckHealthServerNotRegistered(t *testing.T) {
// function should exit. // function should exit.
func TestHealthCheckWithGoAway(t *testing.T) { func TestHealthCheckWithGoAway(t *testing.T) {
defer leakcheck.Check(t) defer leakcheck.Check(t)
s := grpc.NewServer()
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("failed to listen due to err: %v", err)
}
ts := newTestHealthServer()
healthgrpc.RegisterHealthServer(s, ts)
testpb.RegisterTestServiceServer(s, &testServer{})
go s.Serve(lis)
defer s.Stop()
ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
hcExitChan := make(chan struct{})
testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error {
err := testHealthCheckFunc(ctx, newStream, update, service)
close(hcExitChan)
return err
}
replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper)
defer replace()
r, rcleanup := manual.GenerateAndRegisterManualResolver()
defer rcleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin")) hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
s, lis, ts, deferFunc, err := setupServer(&svrConfig{})
defer deferFunc()
if err != nil { if err != nil {
t.Fatalf("dial failed due to err: %v", err) t.Fatal(err)
} }
defer cc.Close()
ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
cc, r, deferFunc, err := setupClient(&clientConfig{
balancerName: "round_robin",
testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
})
if err != nil {
t.Fatal(err)
}
defer deferFunc()
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
r.NewServiceConfig(`{ r.NewServiceConfig(`{
@ -328,6 +361,11 @@ func TestHealthCheckWithGoAway(t *testing.T) {
select { select {
case <-hcExitChan: case <-hcExitChan:
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
select {
case <-hcEnterChan:
default:
t.Fatal("Health check function has not entered after 5s.")
}
t.Fatal("Health check function has not exited after 5s.") t.Fatal("Health check function has not exited after 5s.")
} }
@ -342,33 +380,27 @@ func TestHealthCheckWithGoAway(t *testing.T) {
func TestHealthCheckWithConnClose(t *testing.T) { func TestHealthCheckWithConnClose(t *testing.T) {
defer leakcheck.Check(t) defer leakcheck.Check(t)
s := grpc.NewServer()
lis, err := net.Listen("tcp", "localhost:0") hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
s, lis, ts, deferFunc, err := setupServer(&svrConfig{})
defer deferFunc()
if err != nil { if err != nil {
t.Fatalf("failed to listen due to err: %v", err) t.Fatal(err)
}
ts := newTestHealthServer()
healthgrpc.RegisterHealthServer(s, ts)
testpb.RegisterTestServiceServer(s, &testServer{})
go s.Serve(lis)
defer s.Stop()
ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
hcExitChan := make(chan struct{})
testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error {
err := testHealthCheckFunc(ctx, newStream, update, service)
close(hcExitChan)
return err
} }
replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper) ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
defer replace()
r, rcleanup := manual.GenerateAndRegisterManualResolver() cc, r, deferFunc, err := setupClient(&clientConfig{
defer rcleanup() balancerName: "round_robin",
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin")) testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
extraDialOption: []grpc.DialOption{grpc.WithWaitForHandshake()},
})
if err != nil { if err != nil {
t.Fatalf("dial failed due to err: %v", err) t.Fatal(err)
} }
defer cc.Close() defer deferFunc()
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
r.NewServiceConfig(`{ r.NewServiceConfig(`{
@ -399,6 +431,11 @@ func TestHealthCheckWithConnClose(t *testing.T) {
select { select {
case <-hcExitChan: case <-hcExitChan:
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
select {
case <-hcEnterChan:
default:
t.Fatal("Health check function has not entered after 5s.")
}
t.Fatal("Health check function has not exited after 5s.") t.Fatal("Health check function has not exited after 5s.")
} }
} }
@ -407,33 +444,25 @@ func TestHealthCheckWithConnClose(t *testing.T) {
// address list returned by the resolver. // address list returned by the resolver.
func TestHealthCheckWithAddrConnDrain(t *testing.T) { func TestHealthCheckWithAddrConnDrain(t *testing.T) {
defer leakcheck.Check(t) defer leakcheck.Check(t)
s := grpc.NewServer()
lis, err := net.Listen("tcp", "localhost:0") hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
_, lis, ts, deferFunc, err := setupServer(&svrConfig{})
defer deferFunc()
if err != nil { if err != nil {
t.Fatalf("failed to listen due to err: %v", err) t.Fatal(err)
}
ts := newTestHealthServer()
healthgrpc.RegisterHealthServer(s, ts)
testpb.RegisterTestServiceServer(s, &testServer{})
go s.Serve(lis)
defer s.Stop()
ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
hcExitChan := make(chan struct{})
testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error {
err := testHealthCheckFunc(ctx, newStream, update, service)
close(hcExitChan)
return err
} }
replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper) ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
defer replace()
r, rcleanup := manual.GenerateAndRegisterManualResolver() cc, r, deferFunc, err := setupClient(&clientConfig{
defer rcleanup() balancerName: "round_robin",
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin")) testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
})
if err != nil { if err != nil {
t.Fatalf("dial failed due to err: %v", err) t.Fatal(err)
} }
defer cc.Close() defer deferFunc()
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
r.NewServiceConfig(`{ r.NewServiceConfig(`{
@ -487,6 +516,11 @@ func TestHealthCheckWithAddrConnDrain(t *testing.T) {
select { select {
case <-hcExitChan: case <-hcExitChan:
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
select {
case <-hcEnterChan:
default:
t.Fatal("Health check function has not entered after 5s.")
}
t.Fatal("Health check function has not exited after 5s.") t.Fatal("Health check function has not exited after 5s.")
} }
@ -502,33 +536,25 @@ func TestHealthCheckWithAddrConnDrain(t *testing.T) {
// ClientConn close will lead to its addrConns being torn down. // ClientConn close will lead to its addrConns being torn down.
func TestHealthCheckWithClientConnClose(t *testing.T) { func TestHealthCheckWithClientConnClose(t *testing.T) {
defer leakcheck.Check(t) defer leakcheck.Check(t)
s := grpc.NewServer()
lis, err := net.Listen("tcp", "localhost:0") hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
_, lis, ts, deferFunc, err := setupServer(&svrConfig{})
defer deferFunc()
if err != nil { if err != nil {
t.Fatalf("failed to listen due to err: %v", err) t.Fatal(err)
}
ts := newTestHealthServer()
healthgrpc.RegisterHealthServer(s, ts)
testpb.RegisterTestServiceServer(s, &testServer{})
go s.Serve(lis)
defer s.Stop()
ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
hcExitChan := make(chan struct{})
testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error {
err := testHealthCheckFunc(ctx, newStream, update, service)
close(hcExitChan)
return err
} }
replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper) ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
defer replace()
r, rcleanup := manual.GenerateAndRegisterManualResolver() cc, r, deferFunc, err := setupClient(&clientConfig{
defer rcleanup() balancerName: "round_robin",
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin")) testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
})
if err != nil { if err != nil {
t.Fatalf("dial failed due to err: %v", err) t.Fatal(err)
} }
defer cc.Close() defer deferFunc()
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
r.NewServiceConfig(`{ r.NewServiceConfig(`{
@ -560,6 +586,11 @@ func TestHealthCheckWithClientConnClose(t *testing.T) {
select { select {
case <-hcExitChan: case <-hcExitChan:
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
select {
case <-hcEnterChan:
default:
t.Fatal("Health check function has not entered after 5s.")
}
t.Fatal("Health check function has not exited after 5s.") t.Fatal("Health check function has not exited after 5s.")
} }
} }
@ -569,49 +600,40 @@ func TestHealthCheckWithClientConnClose(t *testing.T) {
// onGoAway/onClose goroutine. // onGoAway/onClose goroutine.
func TestHealthCheckWithoutReportHealthCalledAddrConnShutDown(t *testing.T) { func TestHealthCheckWithoutReportHealthCalledAddrConnShutDown(t *testing.T) {
defer leakcheck.Check(t) defer leakcheck.Check(t)
s := grpc.NewServer()
lis, err := net.Listen("tcp", "localhost:0") hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
if err != nil {
t.Fatalf("failed to listen due to err %v", err) _, lis, ts, deferFunc, err := setupServer(&svrConfig{
} specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
ts := newTestHealthServerWithWatchFunc(func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { if in.Service != "delay" {
if in.Service != "delay" { return status.Error(codes.FailedPrecondition,
return status.Error(codes.FailedPrecondition, "this special Watch function only handles request with service name to be \"delay\"")
"this special Watch function only handles request with service name to be \"delay\"") }
} // Do nothing to mock a delay of health check response from server side.
// Do nothing to mock a delay of health check response from server side. // This case is to help with the test that covers the condition that reportHealth is not
// This case is to help with the test that covers the condition that reportHealth is not // called inside HealthCheckFunc before the func returns.
// called inside HealthCheckFunc before the func returns. select {
select { case <-stream.Context().Done():
case <-stream.Context().Done(): case <-time.After(5 * time.Second):
case <-time.After(5 * time.Second): }
} return nil
return nil },
}) })
healthgrpc.RegisterHealthServer(s, ts) defer deferFunc()
testpb.RegisterTestServiceServer(s, &testServer{}) if err != nil {
go s.Serve(lis) t.Fatal(err)
defer s.Stop() }
ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING) ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING)
hcEnterChan := make(chan struct{}) _, r, deferFunc, err := setupClient(&clientConfig{
hcExitChan := make(chan struct{}) balancerName: "round_robin",
testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error { testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
close(hcEnterChan) })
err := testHealthCheckFunc(ctx, newStream, update, service)
close(hcExitChan)
return err
}
replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper)
defer replace()
r, rcleanup := manual.GenerateAndRegisterManualResolver()
defer rcleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin"))
if err != nil { if err != nil {
t.Fatalf("dial failed due to err: %v", err) t.Fatal(err)
} }
defer cc.Close() defer deferFunc()
// The serviceName "delay" is specially handled at server side, where response will not be sent // The serviceName "delay" is specially handled at server side, where response will not be sent
// back to client immediately upon receiving the request (client should receive no response until // back to client immediately upon receiving the request (client should receive no response until
@ -653,49 +675,41 @@ func TestHealthCheckWithoutReportHealthCalledAddrConnShutDown(t *testing.T) {
// onGoAway/onClose goroutine. // onGoAway/onClose goroutine.
func TestHealthCheckWithoutReportHealthCalled(t *testing.T) { func TestHealthCheckWithoutReportHealthCalled(t *testing.T) {
defer leakcheck.Check(t) defer leakcheck.Check(t)
s := grpc.NewServer()
lis, err := net.Listen("tcp", "localhost:0") hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
if err != nil {
t.Fatalf("failed to listen due to err: %v", err) s, lis, ts, deferFunc, err := setupServer(&svrConfig{
} specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
ts := newTestHealthServerWithWatchFunc(func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { if in.Service != "delay" {
if in.Service != "delay" { return status.Error(codes.FailedPrecondition,
return status.Error(codes.FailedPrecondition, "this special Watch function only handles request with service name to be \"delay\"")
"this special Watch function only handles request with service name to be \"delay\"") }
} // Do nothing to mock a delay of health check response from server side.
// Do nothing to mock a delay of health check response from server side. // This case is to help with the test that covers the condition that reportHealth is not
// This case is to help with the test that covers the condition that reportHealth is not // called inside HealthCheckFunc before the func returns.
// called inside HealthCheckFunc before the func returns. select {
select { case <-stream.Context().Done():
case <-stream.Context().Done(): case <-time.After(5 * time.Second):
case <-time.After(5 * time.Second): }
} return nil
return nil },
}) })
healthgrpc.RegisterHealthServer(s, ts) defer deferFunc()
testpb.RegisterTestServiceServer(s, &testServer{}) if err != nil {
go s.Serve(lis) t.Fatal(err)
defer s.Stop() }
ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING) ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING)
hcEnterChan := make(chan struct{}) _, r, deferFunc, err := setupClient(&clientConfig{
hcExitChan := make(chan struct{}) balancerName: "round_robin",
testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error { testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
close(hcEnterChan) extraDialOption: []grpc.DialOption{grpc.WithWaitForHandshake()},
err := testHealthCheckFunc(ctx, newStream, update, service) })
close(hcExitChan)
return err
}
replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper)
defer replace()
r, rcleanup := manual.GenerateAndRegisterManualResolver()
defer rcleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin"))
if err != nil { if err != nil {
t.Fatalf("dial failed due to err: %v", err) t.Fatal(err)
} }
defer cc.Close() defer deferFunc()
// The serviceName "delay" is specially handled at server side, where response will not be sent // The serviceName "delay" is specially handled at server side, where response will not be sent
// back to client immediately upon receiving the request (client should receive no response until // back to client immediately upon receiving the request (client should receive no response until
@ -733,22 +747,20 @@ func TestHealthCheckWithoutReportHealthCalled(t *testing.T) {
} }
func testHealthCheckDisableWithDialOption(t *testing.T, addr string) { func testHealthCheckDisableWithDialOption(t *testing.T, addr string) {
hcEnterChan := make(chan struct{}) hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error {
close(hcEnterChan)
return nil
}
replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper) cc, r, deferFunc, err := setupClient(&clientConfig{
defer replace() balancerName: "round_robin",
r, rcleanup := manual.GenerateAndRegisterManualResolver() testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
defer rcleanup() extraDialOption: []grpc.DialOption{grpc.WithDisableHealthCheck()},
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin"), grpc.WithDisableHealthCheck()) })
if err != nil { if err != nil {
t.Fatalf("dial failed due to err: %v", err) t.Fatal(err)
} }
defer deferFunc()
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
defer cc.Close()
r.NewServiceConfig(`{ r.NewServiceConfig(`{
"healthCheckConfig": { "healthCheckConfig": {
"serviceName": "foo" "serviceName": "foo"
@ -774,22 +786,19 @@ func testHealthCheckDisableWithDialOption(t *testing.T, addr string) {
} }
func testHealthCheckDisableWithBalancer(t *testing.T, addr string) { func testHealthCheckDisableWithBalancer(t *testing.T, addr string) {
hcEnterChan := make(chan struct{}) hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error {
close(hcEnterChan)
return nil
}
replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper) cc, r, deferFunc, err := setupClient(&clientConfig{
defer replace() balancerName: "pick_first",
r, rcleanup := manual.GenerateAndRegisterManualResolver() testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
defer rcleanup() })
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("pick_first"))
if err != nil { if err != nil {
t.Fatalf("dial failed due to err: %v", err) t.Fatal(err)
} }
defer deferFunc()
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
defer cc.Close()
r.NewServiceConfig(`{ r.NewServiceConfig(`{
"healthCheckConfig": { "healthCheckConfig": {
"serviceName": "foo" "serviceName": "foo"
@ -815,22 +824,18 @@ func testHealthCheckDisableWithBalancer(t *testing.T, addr string) {
} }
func testHealthCheckDisableWithServiceConfig(t *testing.T, addr string) { func testHealthCheckDisableWithServiceConfig(t *testing.T, addr string) {
hcEnterChan := make(chan struct{}) hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error {
close(hcEnterChan)
return nil
}
replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper) cc, r, deferFunc, err := setupClient(&clientConfig{
defer replace() balancerName: "round_robin",
r, rcleanup := manual.GenerateAndRegisterManualResolver() testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
defer rcleanup() })
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin"))
if err != nil { if err != nil {
t.Fatalf("dial failed due to err: %v", err) t.Fatal(err)
} }
defer deferFunc()
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
defer cc.Close()
r.NewAddress([]resolver.Address{{Addr: addr}}) r.NewAddress([]resolver.Address{{Addr: addr}})
@ -853,17 +858,12 @@ func testHealthCheckDisableWithServiceConfig(t *testing.T, addr string) {
func TestHealthCheckDisable(t *testing.T) { func TestHealthCheckDisable(t *testing.T) {
defer leakcheck.Check(t) defer leakcheck.Check(t)
// set up server side
s := grpc.NewServer() _, lis, ts, deferFunc, err := setupServer(&svrConfig{})
lis, err := net.Listen("tcp", "localhost:0") defer deferFunc()
if err != nil { if err != nil {
t.Fatalf("failed to listen due to err: %v", err) t.Fatal(err)
} }
ts := newTestHealthServer()
healthgrpc.RegisterHealthServer(s, ts)
testpb.RegisterTestServiceServer(s, &testServer{})
go s.Serve(lis)
defer s.Stop()
ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING) ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)
// test client side disabling configuration. // test client side disabling configuration.
@ -874,30 +874,26 @@ func TestHealthCheckDisable(t *testing.T) {
func TestHealthCheckChannelzCountingCallSuccess(t *testing.T) { func TestHealthCheckChannelzCountingCallSuccess(t *testing.T) {
defer leakcheck.Check(t) defer leakcheck.Check(t)
s := grpc.NewServer()
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("failed to listen due to err: %v", err)
}
ts := newTestHealthServerWithWatchFunc(func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
if in.Service != "channelzSuccess" {
return status.Error(codes.FailedPrecondition,
"this special Watch function only handles request with service name to be \"channelzSuccess\"")
}
return status.Error(codes.OK, "fake success")
})
healthgrpc.RegisterHealthServer(s, ts)
testpb.RegisterTestServiceServer(s, &testServer{})
go s.Serve(lis)
defer s.Stop()
r, rcleanup := manual.GenerateAndRegisterManualResolver() _, lis, _, deferFunc, err := setupServer(&svrConfig{
defer rcleanup() specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin")) if in.Service != "channelzSuccess" {
return status.Error(codes.FailedPrecondition,
"this special Watch function only handles request with service name to be \"channelzSuccess\"")
}
return status.Error(codes.OK, "fake success")
},
})
defer deferFunc()
if err != nil { if err != nil {
t.Fatalf("dial failed due to err: %v", err) t.Fatal(err)
} }
defer cc.Close()
_, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"})
if err != nil {
t.Fatal(err)
}
defer deferFunc()
r.NewServiceConfig(`{ r.NewServiceConfig(`{
"healthCheckConfig": { "healthCheckConfig": {
@ -935,30 +931,26 @@ func TestHealthCheckChannelzCountingCallSuccess(t *testing.T) {
func TestHealthCheckChannelzCountingCallFailure(t *testing.T) { func TestHealthCheckChannelzCountingCallFailure(t *testing.T) {
defer leakcheck.Check(t) defer leakcheck.Check(t)
s := grpc.NewServer()
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("failed to listen due to err: %v", err)
}
ts := newTestHealthServerWithWatchFunc(func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
if in.Service != "channelzFailure" {
return status.Error(codes.FailedPrecondition,
"this special Watch function only handles request with service name to be \"channelzFailure\"")
}
return status.Error(codes.Internal, "fake failure")
})
healthgrpc.RegisterHealthServer(s, ts)
testpb.RegisterTestServiceServer(s, &testServer{})
go s.Serve(lis)
defer s.Stop()
r, rcleanup := manual.GenerateAndRegisterManualResolver() _, lis, _, deferFunc, err := setupServer(&svrConfig{
defer rcleanup() specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin")) if in.Service != "channelzFailure" {
return status.Error(codes.FailedPrecondition,
"this special Watch function only handles request with service name to be \"channelzFailure\"")
}
return status.Error(codes.Internal, "fake failure")
},
})
if err != nil { if err != nil {
t.Fatalf("dial failed due to err: %v", err) t.Fatal(err)
} }
defer cc.Close() defer deferFunc()
_, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"})
if err != nil {
t.Fatal(err)
}
defer deferFunc()
r.NewServiceConfig(`{ r.NewServiceConfig(`{
"healthCheckConfig": { "healthCheckConfig": {

1
vet.sh
View File

@ -132,5 +132,6 @@ internal/transport/transport_test.go:SA2002
stats/stats_test.go:SA1019 stats/stats_test.go:SA1019
test/channelz_test.go:SA1019 test/channelz_test.go:SA1019
test/end2end_test.go:SA1019 test/end2end_test.go:SA1019
test/healthcheck_test.go:SA1019
' ./... ' ./...
misspell -error . misspell -error .