unified-storage: setup ring to shard requests (#103783)

* Updates the instrumentation_server service to use mux instead of the builtin router, and have it store the router in the module server: this is so we can register the /ring endpoint to check the status of the ring
* Create a new Ring service that depends on the instrumentation server and declares it as a dependency for the storage server
* Create standalone MemberlistKV service for Ring service to use
* Update the storage server Search and GetStats handler to distribute requests if applicable
This commit is contained in:
Will Assis
2025-04-25 14:08:44 -03:00
committed by GitHub
parent ff7b923d33
commit 4adebd6058
19 changed files with 654 additions and 25 deletions

View File

@ -9,6 +9,8 @@ import (
"strconv"
"sync"
"github.com/gorilla/mux"
"github.com/grafana/dskit/kv"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/dskit/services"
@ -32,10 +34,11 @@ func NewModule(opts Options,
cfg *setting.Cfg,
storageMetrics *resource.StorageMetrics,
indexMetrics *resource.BleveIndexMetrics,
reg prometheus.Registerer,
promGatherer prometheus.Gatherer,
license licensing.Licensing,
) (*ModuleServer, error) {
s, err := newModuleServer(opts, apiOpts, features, cfg, storageMetrics, indexMetrics, promGatherer, license)
s, err := newModuleServer(opts, apiOpts, features, cfg, storageMetrics, indexMetrics, reg, promGatherer, license)
if err != nil {
return nil, err
}
@ -47,7 +50,7 @@ func NewModule(opts Options,
return s, nil
}
func newModuleServer(opts Options, apiOpts api.ServerOptions, features featuremgmt.FeatureToggles, cfg *setting.Cfg, storageMetrics *resource.StorageMetrics, indexMetrics *resource.BleveIndexMetrics, promGatherer prometheus.Gatherer, license licensing.Licensing) (*ModuleServer, error) {
func newModuleServer(opts Options, apiOpts api.ServerOptions, features featuremgmt.FeatureToggles, cfg *setting.Cfg, storageMetrics *resource.StorageMetrics, indexMetrics *resource.BleveIndexMetrics, reg prometheus.Registerer, promGatherer prometheus.Gatherer, license licensing.Licensing) (*ModuleServer, error) {
rootCtx, shutdownFn := context.WithCancel(context.Background())
s := &ModuleServer{
@ -66,6 +69,7 @@ func newModuleServer(opts Options, apiOpts api.ServerOptions, features featuremg
storageMetrics: storageMetrics,
indexMetrics: indexMetrics,
promGatherer: promGatherer,
registerer: reg,
license: license,
}
@ -98,6 +102,11 @@ type ModuleServer struct {
buildBranch string
promGatherer prometheus.Gatherer
registerer prometheus.Registerer
MemberlistKVConfig kv.Config
httpServerRouter *mux.Router
distributor *resource.Distributor
}
// init initializes the server and its services.
@ -136,9 +145,12 @@ func (s *ModuleServer) Run() error {
if m.IsModuleEnabled(modules.All) || m.IsModuleEnabled(modules.Core) || m.IsModuleEnabled(modules.FrontendServer) {
return services.NewBasicService(nil, nil, nil).WithName(modules.InstrumentationServer), nil
}
return NewInstrumentationService(s.log, s.cfg, s.promGatherer)
return s.initInstrumentationServer()
})
m.RegisterModule(modules.MemberlistKV, s.initMemberlistKV)
m.RegisterModule(modules.StorageRing, s.initRing)
m.RegisterModule(modules.Core, func() (services.Service, error) {
return NewService(s.cfg, s.opts, s.apiOpts)
})
@ -157,7 +169,7 @@ func (s *ModuleServer) Run() error {
if err != nil {
return nil, err
}
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, nil, docBuilders, s.storageMetrics, s.indexMetrics)
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, nil, docBuilders, s.storageMetrics, s.indexMetrics, s.distributor)
})
m.RegisterModule(modules.ZanzanaServer, func() (services.Service, error) {