From e16607685d532f33a7d982b64c2532aae6da39a5 Mon Sep 17 00:00:00 2001 From: Can Guler Date: Fri, 26 Oct 2018 13:00:42 -0700 Subject: [PATCH] healthcheck: clientHealthCheck and its unit test (#2389) Adds clientHealthCheck and a unit test for exponential backoff functionality. --- healthcheck/healthcheck.go | 107 ++++++++++++++++++++++++++++++++ healthcheck/healthcheck_test.go | 58 +++++++++++++++++ 2 files changed, 165 insertions(+) create mode 100644 healthcheck/healthcheck.go create mode 100644 healthcheck/healthcheck_test.go diff --git a/healthcheck/healthcheck.go b/healthcheck/healthcheck.go new file mode 100644 index 00000000..a4d0fc92 --- /dev/null +++ b/healthcheck/healthcheck.go @@ -0,0 +1,107 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *
+ */
+
+package healthcheck
+
+import (
+	"fmt"
+	"io"
+	"time"
+
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	healthpb "google.golang.org/grpc/health/grpc_health_v1"
+	"google.golang.org/grpc/internal"
+	"google.golang.org/grpc/internal/backoff"
+	"google.golang.org/grpc/status"
+)
+
+// maxDelay is the value configured as MaxDelay on backoffStrategy.
+const maxDelay = 120 * time.Second
+
+// backoffStrategy supplies the wait duration used between retries.
+var backoffStrategy = backoff.Exponential{MaxDelay: maxDelay}
+
+// backoffFunc blocks for the delay that backoffStrategy yields for the given
+// retry count. It returns true once that delay has elapsed, or false if ctx is
+// done first (the timer is stopped in that case). It is a variable rather than
+// a plain function so that it can be reassigned, as the test in this package
+// does.
+var backoffFunc = func(ctx context.Context, retries int) bool {
+	d := backoffStrategy.Backoff(retries)
+	timer := time.NewTimer(d)
+	select {
+	case <-timer.C:
+		return true
+	case <-ctx.Done():
+		timer.Stop()
+		return false
+	}
+}
+
+// init installs clientHealthCheck as internal.HealthCheckFunc.
+func init() {
+	internal.HealthCheckFunc = clientHealthCheck
+}
+
+// clientHealthCheck repeatedly opens a stream via newStream, sends a single
+// HealthCheckRequest for service on it, and then passes the outcome of every
+// receive on that stream to reportHealth.
+//
+// reportHealth is called with:
+//   - true when a response has status SERVING, when RecvMsg fails with code
+//     Unimplemented, or when newStream returns a value that is not a
+//     grpc.ClientStream;
+//   - false when a response has any other status, or when RecvMsg fails with
+//     any other error.
+//
+// When newStream, SendMsg (with an error other than io.EOF) or RecvMsg fails,
+// a new stream is attempted. Consecutive attempts made without receiving a
+// message are separated by backoffFunc; receiving a message resets the count.
+//
+// It returns nil when ctx is found to be done before a new attempt (either
+// while backing off or right before calling newStream), the RecvMsg error when
+// its code is Unimplemented, and a descriptive error when newStream returns a
+// value that is not a grpc.ClientStream. It does not return otherwise.
+func clientHealthCheck(ctx context.Context, newStream func() (interface{}, error), reportHealth func(bool), service string) error {
+	// tryCnt counts the attempts made since the last received message; it is
+	// 0 when the next attempt needs no backoff.
+	tryCnt := 0
+
+retryConnection:
+	for {
+		// Backs off if the connection has failed in some way without receiving a message in the previous retry.
+		if tryCnt > 0 && !backoffFunc(ctx, tryCnt-1) {
+			return nil
+		}
+		tryCnt++
+
+		// Do not open a new stream once ctx is done.
+		if ctx.Err() != nil {
+			return nil
+		}
+		rawS, err := newStream()
+		if err != nil {
+			// No stream could be created; try again after backing off.
+			continue retryConnection
+		}
+
+		s, ok := rawS.(grpc.ClientStream)
+		// Ideally, this should never happen. But if it happens, the server is marked as healthy for LBing purposes.
+		if !ok {
+			reportHealth(true)
+			return fmt.Errorf("newStream returned %v (type %T); want grpc.ClientStream", rawS, rawS)
+		}
+
+		// An io.EOF from SendMsg is not treated as a failure here; RecvMsg
+		// below is still called on the stream.
+		if err = s.SendMsg(&healthpb.HealthCheckRequest{Service: service}); err != nil && err != io.EOF {
+			// Stream should have been closed, so we can safely continue to create a new stream.
+			continue retryConnection
+		}
+		// Only one request is sent per stream. The CloseSend result is not checked.
+		s.CloseSend()
+
+		resp := new(healthpb.HealthCheckResponse)
+		for {
+			err = s.RecvMsg(resp)
+
+			// Reports healthy for the LBing purposes if health check is not implemented in the server.
+			if status.Code(err) == codes.Unimplemented {
+				reportHealth(true)
+				return err
+			}
+
+			// Reports unhealthy if server's Watch method gives an error other than UNIMPLEMENTED.
+			if err != nil {
+				reportHealth(false)
+				continue retryConnection
+			}
+
+			// As a message has been received, removes the need for backoff for the next retry by resetting the try count.
+			tryCnt = 0
+			reportHealth(resp.Status == healthpb.HealthCheckResponse_SERVING)
+		}
+	}
+}
diff --git a/healthcheck/healthcheck_test.go b/healthcheck/healthcheck_test.go
new file mode 100644
index 00000000..8fb88b1e
--- /dev/null
+++ b/healthcheck/healthcheck_test.go
@@ -0,0 +1,58 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * + */ + +package healthcheck + +import ( + "errors" + "reflect" + "testing" + "time" + + "golang.org/x/net/context" +) + +func TestClientHealthCheckBackoff(t *testing.T) { + const maxRetries = 5 + + var want []time.Duration + for i := 0; i < maxRetries; i++ { + want = append(want, time.Duration(i+1)*time.Second) + } + + var got []time.Duration + newStream := func() (interface{}, error) { + if len(got) < maxRetries { + return nil, errors.New("backoff") + } + return nil, nil + } + + oldBackoffFunc := backoffFunc + backoffFunc = func(ctx context.Context, retries int) bool { + got = append(got, time.Duration(retries+1)*time.Second) + return true + } + defer func() { backoffFunc = oldBackoffFunc }() + + clientHealthCheck(context.Background(), newStream, func(_ bool) {}, "test") + + if !reflect.DeepEqual(got, want) { + t.Fatalf("Backoff durations for %v retries are %v. (expected: %v)", maxRetries, got, want) + } +}