Files
grafana/pkg/server/instrumentation_service.go
Will Assis 4adebd6058 unified-storage: setup ring to shard requests (#103783)
* Updates the instrumentation_server service to use mux instead of the builtin router, and have it store the router in the module server: this is so we can register the /ring endpoint to check the status of the ring
* Create a new Ring service that depends on the instrumentation server and declares it as a dependency for the storage server
* Create standalone MemberlistKV service for Ring service to use
* Update the storage server Search and GetStats handler to distribute requests if applicable
2025-04-25 13:08:44 -04:00

74 lines
2.0 KiB
Go

package server
import (
"context"
"net"
"net/http"
"time"
"github.com/gorilla/mux"
"github.com/grafana/dskit/services"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/setting"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
type instrumentationService struct {
*services.BasicService
cfg *setting.Cfg
httpServ *http.Server
log log.Logger
errChan chan error
promGatherer prometheus.Gatherer
}
func (ms *ModuleServer) initInstrumentationServer() (*instrumentationService, error) {
s := &instrumentationService{log: ms.log, cfg: ms.cfg, promGatherer: ms.promGatherer}
s.httpServ, ms.httpServerRouter = s.newInstrumentationServer()
s.BasicService = services.NewBasicService(s.start, s.running, s.stop)
return s, nil
}
func (s *instrumentationService) start(ctx context.Context) error {
s.errChan = make(chan error)
go func() {
s.errChan <- s.httpServ.ListenAndServe()
}()
return nil
}
func (s *instrumentationService) running(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
case err := <-s.errChan:
return err
}
}
func (s *instrumentationService) stop(failureReason error) error {
s.log.Info("stopping instrumentation server", "reason", failureReason)
if err := s.httpServ.Shutdown(context.Background()); err != nil {
s.log.Error("failed to shutdown instrumentation server", "error", err)
return err
}
return nil
}
func (s *instrumentationService) newInstrumentationServer() (*http.Server, *mux.Router) {
router := mux.NewRouter()
router.Handle("/metrics", promhttp.HandlerFor(s.promGatherer, promhttp.HandlerOpts{EnableOpenMetrics: true}))
addr := net.JoinHostPort(s.cfg.HTTPAddr, s.cfg.HTTPPort)
srv := &http.Server{
// 5s timeout for header reads to avoid Slowloris attacks (https://thetooth.io/blog/slowloris-attack/)
ReadHeaderTimeout: 5 * time.Second,
Addr: addr,
Handler: router,
}
return srv, router
}