mirror of
https://github.com/grafana/grafana.git
synced 2025-07-30 17:42:35 +08:00
105 lines
2.7 KiB
Go
105 lines
2.7 KiB
Go
package resource
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"time"
|
|
|
|
grpcAuth "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth"
|
|
"google.golang.org/grpc/health/grpc_health_v1"
|
|
|
|
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
|
)
|
|
|
|
// Compile-time assertion
|
|
var _ HealthService = &healthServer{}
|
|
|
|
type HealthService interface {
|
|
grpc_health_v1.HealthServer
|
|
grpcAuth.ServiceAuthFuncOverride
|
|
}
|
|
|
|
func ProvideHealthService(server resourcepb.DiagnosticsServer) (grpc_health_v1.HealthServer, error) {
|
|
h := &healthServer{srv: server}
|
|
return h, nil
|
|
}
|
|
|
|
type healthServer struct {
|
|
srv resourcepb.DiagnosticsServer
|
|
}
|
|
|
|
// AuthFuncOverride for no auth for health service.
|
|
func (s *healthServer) AuthFuncOverride(ctx context.Context, _ string) (context.Context, error) {
|
|
return ctx, nil
|
|
}
|
|
|
|
func (s *healthServer) List(ctx context.Context, req *grpc_health_v1.HealthListRequest) (*grpc_health_v1.HealthListResponse, error) {
|
|
h, err := s.Check(ctx, &grpc_health_v1.HealthCheckRequest{
|
|
Service: "all", // not used for anything
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &grpc_health_v1.HealthListResponse{
|
|
Statuses: map[string]*grpc_health_v1.HealthCheckResponse{
|
|
"all": h,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func (s *healthServer) Check(ctx context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
|
|
r, err := s.srv.IsHealthy(ctx, &resourcepb.HealthCheckRequest{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &grpc_health_v1.HealthCheckResponse{
|
|
Status: grpc_health_v1.HealthCheckResponse_ServingStatus(r.Status.Number()),
|
|
}, nil
|
|
}
|
|
|
|
func (s *healthServer) Watch(_ *grpc_health_v1.HealthCheckRequest, stream grpc_health_v1.Health_WatchServer) error {
|
|
h, err := s.srv.IsHealthy(stream.Context(), &resourcepb.HealthCheckRequest{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// send initial health status
|
|
err = stream.Send(&grpc_health_v1.HealthCheckResponse{
|
|
Status: grpc_health_v1.HealthCheckResponse_ServingStatus(h.Status.Number()),
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
currHealth := h.Status.Number()
|
|
ticker := time.NewTicker(5 * time.Second)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
// get current health status
|
|
h, err := s.srv.IsHealthy(stream.Context(), &resourcepb.HealthCheckRequest{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// if health status has not changed, continue
|
|
if h.Status.Number() == currHealth {
|
|
continue
|
|
}
|
|
|
|
// send the new health status
|
|
currHealth = h.Status.Number()
|
|
err = stream.Send(&grpc_health_v1.HealthCheckResponse{
|
|
Status: grpc_health_v1.HealthCheckResponse_ServingStatus(h.Status.Number()),
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
case <-stream.Context().Done():
|
|
return errors.New("stream closed, context cancelled")
|
|
}
|
|
}
|
|
}
|