package sql

import (
	"context"
	"errors"
	"fmt"
	"net"
	"os"
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"google.golang.org/grpc/health/grpc_health_v1"

	"github.com/grafana/authlib/grpcutils"
	"github.com/grafana/dskit/kv"
	"github.com/grafana/dskit/netutil"
	"github.com/grafana/dskit/ring"
	"github.com/grafana/dskit/services"

	infraDB "github.com/grafana/grafana/pkg/infra/db"
	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/modules"
	"github.com/grafana/grafana/pkg/services/authz"
	"github.com/grafana/grafana/pkg/services/featuremgmt"
	"github.com/grafana/grafana/pkg/services/grpcserver"
	"github.com/grafana/grafana/pkg/services/grpcserver/interceptors"
	"github.com/grafana/grafana/pkg/setting"
	"github.com/grafana/grafana/pkg/storage/unified/resource"
	"github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
	"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
	"github.com/grafana/grafana/pkg/storage/unified/search"
	"github.com/grafana/grafana/pkg/util/scheduler"
)

var (
	_ UnifiedStorageGrpcService = (*service)(nil)
)

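// UnifiedStorageGrpcService is the dskit service that exposes unified storage over gRPC.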
type UnifiedStorageGrpcService interface {
	services.NamedService

	// Return the address where this service is running
	GetAddress() string
}

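// service is the default UnifiedStorageGrpcService implementation. It wires the
// resource server, its gRPC handler, and the optional subservices (ring lifecycler,
// QoS queue and scheduler) into a single dskit BasicService.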
type service struct {
	*services.BasicService

	// Subservices manager
	subservices        *services.Manager
	subservicesWatcher *services.FailureWatcher
	hasSubservices     bool

	cfg       *setting.Cfg
	features  featuremgmt.FeatureToggles
	db        infraDB.DB
	stopCh    chan struct{}
	stoppedCh chan error

	handler grpcserver.Provider

	tracing trace.Tracer

	authenticator func(ctx context.Context) (context.Context, error)

	log            log.Logger
	reg            prometheus.Registerer
	storageMetrics *resource.StorageMetrics
	indexMetrics   *resource.BleveIndexMetrics

	docBuilders resource.DocumentBuilderSupplier

	searchRing     *ring.Ring
	ringLifecycler *ring.BasicLifecycler

	queue     QOSEnqueueDequeuer
	scheduler *scheduler.Scheduler
}

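// ProvideUnifiedStorageGrpcService builds the unified storage gRPC service from
// configuration. Sharding (ring lifecycler) and QoS (queue plus scheduler) are only
// set up as subservices when the corresponding settings are enabled.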
func ProvideUnifiedStorageGrpcService(
	cfg *setting.Cfg,
	features featuremgmt.FeatureToggles,
	db infraDB.DB,
	log log.Logger,
	reg prometheus.Registerer,
	docBuilders resource.DocumentBuilderSupplier,
	storageMetrics *resource.StorageMetrics,
	indexMetrics *resource.BleveIndexMetrics,
	searchRing *ring.Ring,
	memberlistKVConfig kv.Config,
) (UnifiedStorageGrpcService, error) {
	var err error
	tracer := otel.Tracer("unified-storage")

	// FIXME: This is a temporary solution while we are migrating to the new authn interceptor
	// grpcutils.NewGrpcAuthenticator should be used instead.
	authn := NewAuthenticatorWithFallback(cfg, reg, tracer, func(ctx context.Context) (context.Context, error) {
		auth := grpc.Authenticator{Tracer: tracer}
		return auth.Authenticate(ctx)
	})

	s := &service{
		cfg:                cfg,
		features:           features,
		stopCh:             make(chan struct{}),
		stoppedCh:          make(chan error, 1), // buffered so the gRPC server goroutine can report its exit without blocking
		authenticator:      authn,
		tracing:            tracer,
		db:                 db,
		log:                log,
		reg:                reg,
		docBuilders:        docBuilders,
		storageMetrics:     storageMetrics,
		indexMetrics:       indexMetrics,
		searchRing:         searchRing,
		subservicesWatcher: services.NewFailureWatcher(),
	}

	subservices := []services.Service{}
	if cfg.EnableSharding {
		ringStore, err := kv.NewClient(
			memberlistKVConfig,
			ring.GetCodec(),
			kv.RegistererWithKVName(reg, resource.RingName),
			log,
		)
		if err != nil {
			return nil, fmt.Errorf("failed to create KV store client: %s", err)
		}

		lifecyclerCfg, err := toLifecyclerConfig(cfg, log)
		if err != nil {
			return nil, fmt.Errorf("failed to initialize storage-ring lifecycler config: %s", err)
		}

		// Define lifecycler delegates in reverse order (last to be called defined first because they're
		// chained via "next delegate").
		delegate := ring.BasicLifecyclerDelegate(ring.NewInstanceRegisterDelegate(ring.JOINING, resource.RingNumTokens))
		delegate = ring.NewLeaveOnStoppingDelegate(delegate, log)
		delegate = ring.NewAutoForgetDelegate(resource.RingHeartbeatTimeout*2, delegate, log)

		s.ringLifecycler, err = ring.NewBasicLifecycler(
			lifecyclerCfg,
			resource.RingName,
			resource.RingKey,
			ringStore,
			delegate,
			log,
			reg,
		)
		if err != nil {
			return nil, fmt.Errorf("failed to initialize storage-ring lifecycler: %s", err)
		}

		s.ringLifecycler.SetKeepInstanceInTheRingOnShutdown(true)
		subservices = append(subservices, s.ringLifecycler)
	}

	if cfg.QOSEnabled {
		qosReg := prometheus.WrapRegistererWithPrefix("resource_server_qos_", reg)
		queue := scheduler.NewQueue(&scheduler.QueueOptions{
			MaxSizePerTenant: cfg.QOSMaxSizePerTenant,
			Registerer:       qosReg,
		})
		scheduler, err := scheduler.NewScheduler(queue, &scheduler.Config{
			NumWorkers: cfg.QOSNumberWorker,
			Logger:     log,
		})
		if err != nil {
			return nil, fmt.Errorf("failed to create qos scheduler: %s", err)
		}

		s.queue = queue
		s.scheduler = scheduler
		subservices = append(subservices, s.queue, s.scheduler)
	}

	if len(subservices) > 0 {
		s.hasSubservices = true
		s.subservices, err = services.NewManager(subservices...)
		if err != nil {
			return nil, fmt.Errorf("failed to create subservices manager: %w", err)
		}
	}

	// This will be used when running as a dskit service
	s.BasicService = services.NewBasicService(s.starting, s.running, s.stopping).WithName(modules.StorageServer)

	return s, nil
}

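// starting brings up the subservices (if any), builds the resource server and its
// search options, registers all gRPC services on the shared server, optionally waits
// for the instance to become ACTIVE in the ring, and then launches the gRPC server
// in the background.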
func (s *service) starting(ctx context.Context) error {
	if s.hasSubservices {
		s.subservicesWatcher.WatchManager(s.subservices)
		if err := services.StartManagerAndAwaitHealthy(ctx, s.subservices); err != nil {
			return fmt.Errorf("failed to start subservices: %w", err)
		}
	}

	authzClient, err := authz.ProvideStandaloneAuthZClient(s.cfg, s.features, s.tracing)
	if err != nil {
		return err
	}

	searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.tracing, s.docBuilders, s.indexMetrics)
	if err != nil {
		return err
	}

	serverOptions := ServerOptions{
		DB:             s.db,
		Cfg:            s.cfg,
		Tracer:         s.tracing,
		Reg:            s.reg,
		AccessClient:   authzClient,
		SearchOptions:  searchOptions,
		StorageMetrics: s.storageMetrics,
		IndexMetrics:   s.indexMetrics,
		Features:       s.features,
		QOSQueue:       s.queue,
		Ring:           s.searchRing,
		RingLifecycler: s.ringLifecycler,
	}
	server, err := NewResourceServer(serverOptions)
	if err != nil {
		return err
	}
	s.handler, err = grpcserver.ProvideService(s.cfg, s.features, interceptors.AuthenticatorFunc(s.authenticator), s.tracing, prometheus.DefaultRegisterer)
	if err != nil {
		return err
	}

	healthService, err := resource.ProvideHealthService(server)
	if err != nil {
		return err
	}

	srv := s.handler.GetServer()
	resourcepb.RegisterResourceStoreServer(srv, server)
	resourcepb.RegisterBulkStoreServer(srv, server)
	resourcepb.RegisterResourceIndexServer(srv, server)
	resourcepb.RegisterManagedObjectIndexServer(srv, server)
	resourcepb.RegisterBlobStoreServer(srv, server)
	resourcepb.RegisterDiagnosticsServer(srv, server)
	grpc_health_v1.RegisterHealthServer(srv, healthService)

	// register reflection service
	_, err = grpcserver.ProvideReflectionService(s.cfg, s.handler)
	if err != nil {
		return err
	}

	if s.cfg.EnableSharding {
		s.log.Info("waiting until resource server is JOINING in the ring")
		lfcCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
		defer cancel()
		if err := ring.WaitInstanceState(lfcCtx, s.searchRing, s.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil {
			return fmt.Errorf("error switching to JOINING in the ring: %s", err)
		}
		s.log.Info("resource server is JOINING in the ring")

		if err := s.ringLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil {
			return fmt.Errorf("error switching to ACTIVE in the ring: %s", err)
		}
		s.log.Info("resource server is ACTIVE in the ring")
	}

	// start the gRPC server
	go func() {
		err := s.handler.Run(ctx)
		if err != nil {
			s.stoppedCh <- err
		} else {
			s.stoppedCh <- nil
		}
	}()
	return nil
}

// GetAddress returns the address of the gRPC server.
func (s *service) GetAddress() string {
	return s.handler.GetAddress()
}

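// running blocks until the gRPC server stops, a subservice fails, or the context is
// cancelled, and propagates any non-cancellation error.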
func (s *service) running(ctx context.Context) error {
	select {
	case err := <-s.stoppedCh:
		if err != nil && !errors.Is(err, context.Canceled) {
			return err
		}
	case err := <-s.subservicesWatcher.Chan():
		return fmt.Errorf("subservice failure: %w", err)
	case <-ctx.Done():
		close(s.stopCh)
	}
	return nil
}

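// stopping shuts down the subservices, if any were started.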
func (s *service) stopping(_ error) error {
	if s.hasSubservices {
		err := services.StopManagerAndAwaitStopped(context.Background(), s.subservices)
		if err != nil {
			return fmt.Errorf("failed to stop subservices: %w", err)
		}
	}
	return nil
}

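// authenticatorWithFallback tries the new gRPC authenticator first and falls back to
// the legacy one when it fails, recording which path was taken.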
type authenticatorWithFallback struct {
	authenticator func(ctx context.Context) (context.Context, error)
	fallback      func(ctx context.Context) (context.Context, error)
	metrics       *metrics
	tracer        trace.Tracer
}

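// metrics holds the Prometheus counters for the fallback authenticator.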
type metrics struct {
	requestsTotal *prometheus.CounterVec
}

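// Authenticate implements the fallback flow: on success of the primary authenticator the
// request passes through; otherwise the legacy authenticator is used and the resulting
// context is marked as a fallback. The outcome is recorded in both the span and the metrics.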
func (f *authenticatorWithFallback) Authenticate(ctx context.Context) (context.Context, error) {
	ctx, span := f.tracer.Start(ctx, "grpcutils.AuthenticatorWithFallback.Authenticate")
	defer span.End()

	// Try to authenticate with the new authenticator first
	span.SetAttributes(attribute.Bool("fallback_used", false))
	newCtx, err := f.authenticator(ctx)
	if err == nil {
		// fallback not used, authentication successful
		f.metrics.requestsTotal.WithLabelValues("false", "true").Inc()
		return newCtx, nil
	}

	// In case of error, fallback to the legacy authenticator
	span.SetAttributes(attribute.Bool("fallback_used", true))
	newCtx, err = f.fallback(ctx)
	if newCtx != nil {
		newCtx = resource.WithFallback(newCtx)
	}
	f.metrics.requestsTotal.WithLabelValues("true", fmt.Sprintf("%t", err == nil)).Inc()
	return newCtx, err
}

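// newMetrics registers the fallback-authenticator counter with the given registerer.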
func newMetrics(reg prometheus.Registerer) *metrics {
	return &metrics{
		requestsTotal: promauto.With(reg).NewCounterVec(
			prometheus.CounterOpts{
				Name: "grafana_grpc_authenticator_with_fallback_requests_total",
				Help: "Number of requests using the authenticator with fallback",
			}, []string{"fallback_used", "result"}),
	}
}

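// ReadGrpcServerConfig reads the [grpc_server_authentication] section of the Grafana
// configuration into an authlib AuthenticatorConfig. Insecure mode is only allowed when
// running in the dev environment. An illustrative (not authoritative) ini snippet
// matching the keys read below:
//
//	[grpc_server_authentication]
//	signing_keys_url = https://example.com/keys
//	allowed_audiences = aud-one,aud-two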
func ReadGrpcServerConfig(cfg *setting.Cfg) *grpcutils.AuthenticatorConfig {
	section := cfg.SectionWithEnvOverrides("grpc_server_authentication")

	return &grpcutils.AuthenticatorConfig{
		SigningKeysURL:   section.Key("signing_keys_url").MustString(""),
		AllowedAudiences: section.Key("allowed_audiences").Strings(","),
		AllowInsecure:    cfg.Env == setting.Dev,
	}
}

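// NewAuthenticatorWithFallback returns an authenticator function that prefers the
// authlib authenticator built from the gRPC server config and falls back to the
// provided legacy authenticator on failure.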
func NewAuthenticatorWithFallback(cfg *setting.Cfg, reg prometheus.Registerer, tracer trace.Tracer, fallback func(context.Context) (context.Context, error)) func(context.Context) (context.Context, error) {
	authCfg := ReadGrpcServerConfig(cfg)
	authenticator := grpcutils.NewAuthenticator(authCfg, tracer)
	metrics := newMetrics(reg)
	return func(ctx context.Context) (context.Context, error) {
		a := &authenticatorWithFallback{
			authenticator: authenticator,
			fallback:      fallback,
			tracer:        tracer,
			metrics:       metrics,
		}
		return a.Authenticate(ctx)
	}
}

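// toLifecyclerConfig derives the ring lifecycler configuration from the Grafana config:
// the instance address comes from the memberlist bind address (or a private network
// interface), the instance ID defaults to the hostname, and the advertised port is
// taken from the gRPC server address.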
func toLifecyclerConfig(cfg *setting.Cfg, logger log.Logger) (ring.BasicLifecyclerConfig, error) {
	instanceAddr, err := ring.GetInstanceAddr(cfg.MemberlistBindAddr, netutil.PrivateNetworkInterfacesWithFallback([]string{"eth0", "en0"}, logger), logger, true)
	if err != nil {
		return ring.BasicLifecyclerConfig{}, err
	}

	instanceId := cfg.InstanceID
	if instanceId == "" {
		hostname, err := os.Hostname()
		if err != nil {
			return ring.BasicLifecyclerConfig{}, err
		}

		instanceId = hostname
	}

	_, grpcPortStr, err := net.SplitHostPort(cfg.GRPCServer.Address)
	if err != nil {
		return ring.BasicLifecyclerConfig{}, fmt.Errorf("could not get grpc port from grpc server address: %s", err)
	}

	grpcPort, err := strconv.Atoi(grpcPortStr)
	if err != nil {
		return ring.BasicLifecyclerConfig{}, fmt.Errorf("error converting grpc address port to int: %s", err)
	}

	return ring.BasicLifecyclerConfig{
		Addr:                fmt.Sprintf("%s:%d", instanceAddr, grpcPort),
		ID:                  instanceId,
		HeartbeatPeriod:     15 * time.Second,
		HeartbeatTimeout:    resource.RingHeartbeatTimeout,
		TokensObservePeriod: 0,
		NumTokens:           resource.RingNumTokens,
	}, nil
}