diff --git a/pkg/api/api.go b/pkg/api/api.go index 42fb81ab4c9..05b88052829 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -258,6 +258,7 @@ func (hs *HTTPServer) registerRoutes() { apiRoute.Group("/plugins", func(pluginRoute routing.RouteRegister) { pluginRoute.Get("/:pluginId/dashboards/", Wrap(GetPluginDashboards)) pluginRoute.Post("/:pluginId/settings", bind(models.UpdatePluginSettingCmd{}), Wrap(UpdatePluginSetting)) + pluginRoute.Get("/:pluginId/metrics", Wrap(hs.CollectPluginMetrics)) }, reqOrgAdmin) apiRoute.Get("/frontend/settings/", hs.GetFrontendSettings) diff --git a/pkg/api/plugins.go b/pkg/api/plugins.go index 950cdadf55e..3f856db667b 100644 --- a/pkg/api/plugins.go +++ b/pkg/api/plugins.go @@ -2,6 +2,7 @@ package api import ( "errors" + "net/http" "sort" "time" @@ -247,6 +248,39 @@ func ImportDashboard(c *models.ReqContext, apiCmd dtos.ImportDashboardCommand) R return JSON(200, cmd.Result) } +// CollectPluginMetrics collect metrics from a plugin. +// +// /api/plugins/:pluginId/metrics +func (hs *HTTPServer) CollectPluginMetrics(c *models.ReqContext) Response { + pluginID := c.Params("pluginId") + plugin, exists := plugins.Plugins[pluginID] + if !exists { + return Error(404, "Plugin not found, no installed plugin with that id", nil) + } + + resp, err := hs.BackendPluginManager.CollectMetrics(c.Req.Context(), plugin.Id) + if err != nil { + if err == backendplugin.ErrPluginNotRegistered { + return Error(404, "Plugin not found", err) + } + + if err == backendplugin.ErrDiagnosticsNotSupported { + return Error(404, "Health check not implemented", err) + } + + return Error(500, "Collect plugin metrics failed", err) + } + + headers := make(http.Header) + headers.Set("Content-Type", "text/plain") + + return &NormalResponse{ + header: headers, + body: resp.PrometheusMetrics, + status: http.StatusOK, + } +} + // CheckHealth returns the health of a plugin. // /api/plugins/:pluginId/health func (hs *HTTPServer) CheckHealth(c *models.ReqContext) Response { diff --git a/pkg/plugins/backendplugin/backend_plugin.go b/pkg/plugins/backendplugin/backend_plugin.go index 78707f7e443..c2d48bb177b 100644 --- a/pkg/plugins/backendplugin/backend_plugin.go +++ b/pkg/plugins/backendplugin/backend_plugin.go @@ -1,26 +1,20 @@ package backendplugin import ( - "bytes" "context" "errors" - "fmt" "net/http" "time" "github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/expfmt" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" datasourceV1 "github.com/grafana/grafana-plugin-model/go/datasource" rendererV1 "github.com/grafana/grafana-plugin-model/go/renderer" "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/plugins/backendplugin/collector" "github.com/grafana/grafana/pkg/util/errutil" plugin "github.com/hashicorp/go-plugin" - dto "github.com/prometheus/client_model/go" ) // BackendPlugin a registered backend plugin. @@ -140,49 +134,27 @@ func (p *BackendPlugin) supportsDiagnostics() bool { } // CollectMetrics implements the collector.Collector interface. -func (p *BackendPlugin) CollectMetrics(ctx context.Context, ch chan<- prometheus.Metric) error { - if p.diagnostics == nil { - return nil - } - - if p.client == nil || p.client.Exited() { - return nil +func (p *BackendPlugin) CollectMetrics(ctx context.Context) (*pluginv2.CollectMetricsResponse, error) { + if p.diagnostics == nil || p.client == nil || p.client.Exited() { + return &pluginv2.CollectMetricsResponse{ + Metrics: &pluginv2.CollectMetricsResponse_Payload{}, + }, nil } res, err := p.diagnostics.CollectMetrics(ctx, &pluginv2.CollectMetricsRequest{}) if err != nil { if st, ok := status.FromError(err); ok { if st.Code() == codes.Unimplemented { - return nil + return &pluginv2.CollectMetricsResponse{ + Metrics: &pluginv2.CollectMetricsResponse_Payload{}, + }, nil } } - return err + return nil, err } - if res == nil || res.Metrics == nil || res.Metrics.Prometheus == nil { - return nil - } - - reader := bytes.NewReader(res.Metrics.Prometheus) - var parser expfmt.TextParser - families, err := parser.TextToMetricFamilies(reader) - if err != nil { - return errutil.Wrap("failed to parse collected metrics", err) - } - - for _, mf := range families { - if mf.Help == nil { - help := fmt.Sprintf("Metric read from %s plugin", p.id) - mf.Help = &help - } - } - - for _, mf := range families { - convertMetricFamily(p.id, mf, ch, p.logger) - } - - return nil + return res, nil } func (p *BackendPlugin) checkHealth(ctx context.Context, config *PluginConfig) (*pluginv2.CheckHealthResponse, error) { @@ -321,112 +293,3 @@ func (p *BackendPlugin) callResource(ctx context.Context, req CallResourceReques stream: protoStream, }, nil } - -// convertMetricFamily converts metric family to prometheus.Metric. -// Copied from https://github.com/prometheus/node_exporter/blob/3ddc82c2d8d11eec53ed5faa8db969a1bb81f8bb/collector/textfile.go#L66-L165 -func convertMetricFamily(pluginID string, metricFamily *dto.MetricFamily, ch chan<- prometheus.Metric, logger log.Logger) { - var valType prometheus.ValueType - var val float64 - - allLabelNames := map[string]struct{}{} - for _, metric := range metricFamily.Metric { - labels := metric.GetLabel() - for _, label := range labels { - if _, ok := allLabelNames[label.GetName()]; !ok { - allLabelNames[label.GetName()] = struct{}{} - } - } - } - - for _, metric := range metricFamily.Metric { - if metric.TimestampMs != nil { - logger.Warn("Ignoring unsupported custom timestamp on metric", "metric", metric) - } - - labels := metric.GetLabel() - var names []string - var values []string - for _, label := range labels { - names = append(names, label.GetName()) - values = append(values, label.GetValue()) - } - names = append(names, "plugin_id") - values = append(values, pluginID) - - for k := range allLabelNames { - present := false - for _, name := range names { - if k == name { - present = true - break - } - } - if !present { - names = append(names, k) - values = append(values, "") - } - } - - metricName := prometheus.BuildFQName(collector.Namespace, "", *metricFamily.Name) - - metricType := metricFamily.GetType() - switch metricType { - case dto.MetricType_COUNTER: - valType = prometheus.CounterValue - val = metric.Counter.GetValue() - - case dto.MetricType_GAUGE: - valType = prometheus.GaugeValue - val = metric.Gauge.GetValue() - - case dto.MetricType_UNTYPED: - valType = prometheus.UntypedValue - val = metric.Untyped.GetValue() - - case dto.MetricType_SUMMARY: - quantiles := map[float64]float64{} - for _, q := range metric.Summary.Quantile { - quantiles[q.GetQuantile()] = q.GetValue() - } - ch <- prometheus.MustNewConstSummary( - prometheus.NewDesc( - metricName, - metricFamily.GetHelp(), - names, nil, - ), - metric.Summary.GetSampleCount(), - metric.Summary.GetSampleSum(), - quantiles, values..., - ) - case dto.MetricType_HISTOGRAM: - buckets := map[float64]uint64{} - for _, b := range metric.Histogram.Bucket { - buckets[b.GetUpperBound()] = b.GetCumulativeCount() - } - ch <- prometheus.MustNewConstHistogram( - prometheus.NewDesc( - metricName, - metricFamily.GetHelp(), - names, nil, - ), - metric.Histogram.GetSampleCount(), - metric.Histogram.GetSampleSum(), - buckets, values..., - ) - default: - logger.Error("unknown metric type", "type", metricType) - continue - } - - if metricType == dto.MetricType_GAUGE || metricType == dto.MetricType_COUNTER || metricType == dto.MetricType_UNTYPED { - ch <- prometheus.MustNewConstMetric( - prometheus.NewDesc( - metricName, - metricFamily.GetHelp(), - names, nil, - ), - valType, val, values..., - ) - } - } -} diff --git a/pkg/plugins/backendplugin/collector/collector.go b/pkg/plugins/backendplugin/collector/collector.go deleted file mode 100644 index dd9cfceaccb..00000000000 --- a/pkg/plugins/backendplugin/collector/collector.go +++ /dev/null @@ -1,89 +0,0 @@ -package collector - -import ( - "context" - "sync" - "time" - - "github.com/grafana/grafana/pkg/infra/log" - "github.com/prometheus/client_golang/prometheus" -) - -// Namespace collector metric namespace -const Namespace = "grafana_plugin" - -var ( - scrapeDurationDesc = prometheus.NewDesc( - prometheus.BuildFQName(Namespace, "scrape", "duration_seconds"), - "grafana_plugin: Duration of a plugin collector scrape.", - []string{"plugin_id"}, - nil, - ) - scrapeSuccessDesc = prometheus.NewDesc( - prometheus.BuildFQName(Namespace, "scrape", "success"), - "grafana_plugin: Whether a plugin collector succeeded.", - []string{"plugin_id"}, - nil, - ) -) - -// Collector is the interface a plugin collector has to implement. -type Collector interface { - // Get new metrics and expose them via prometheus registry. - CollectMetrics(ctx context.Context, ch chan<- prometheus.Metric) error -} - -// PluginCollector implements the prometheus.Collector interface. -type PluginCollector struct { - collectors map[string]Collector - logger log.Logger -} - -// NewPluginCollector creates a new PluginCollector.. -func NewPluginCollector() PluginCollector { - return PluginCollector{ - collectors: make(map[string]Collector), - logger: log.New("plugins.backend.collector"), - } -} - -func (pc PluginCollector) Register(pluginID string, c Collector) { - pc.collectors[pluginID] = c -} - -// Describe implements the prometheus.Collector interface. -func (pc PluginCollector) Describe(ch chan<- *prometheus.Desc) { - ch <- scrapeDurationDesc - ch <- scrapeSuccessDesc -} - -// Collect implements the prometheus.Collector interface. -func (pc PluginCollector) Collect(ch chan<- prometheus.Metric) { - ctx := context.Background() - wg := sync.WaitGroup{} - wg.Add(len(pc.collectors)) - for name, c := range pc.collectors { - go func(name string, c Collector) { - execute(ctx, name, c, ch, pc.logger) - wg.Done() - }(name, c) - } - wg.Wait() -} - -func execute(ctx context.Context, pluginID string, c Collector, ch chan<- prometheus.Metric, logger log.Logger) { - begin := time.Now() - err := c.CollectMetrics(ctx, ch) - duration := time.Since(begin) - var success float64 - - if err != nil { - logger.Error("collector failed", "pluginId", pluginID, "took", duration, "error", err) - success = 0 - } else { - logger.Debug("collector succeeded", "pluginId", pluginID, "took", duration) - success = 1 - } - ch <- prometheus.MustNewConstMetric(scrapeDurationDesc, prometheus.GaugeValue, duration.Seconds(), pluginID) - ch <- prometheus.MustNewConstMetric(scrapeSuccessDesc, prometheus.GaugeValue, success, pluginID) -} diff --git a/pkg/plugins/backendplugin/contracts.go b/pkg/plugins/backendplugin/contracts.go index de9c8ec303d..0ca66d14dce 100644 --- a/pkg/plugins/backendplugin/contracts.go +++ b/pkg/plugins/backendplugin/contracts.go @@ -59,6 +59,23 @@ func checkHealthResultFromProto(protoResp *pluginv2.CheckHealthResponse) *CheckH } } +func collectMetricsResultFromProto(protoResp *pluginv2.CollectMetricsResponse) *CollectMetricsResult { + var prometheusMetrics []byte + + if protoResp.Metrics != nil { + prometheusMetrics = protoResp.Metrics.Prometheus + } + + return &CollectMetricsResult{ + PrometheusMetrics: prometheusMetrics, + } +} + +// CollectMetricsResult collect metrics result. +type CollectMetricsResult struct { + PrometheusMetrics []byte +} + type DataSourceConfig struct { ID int64 Name string diff --git a/pkg/plugins/backendplugin/manager.go b/pkg/plugins/backendplugin/manager.go index 34d4876ac90..f6a81405073 100644 --- a/pkg/plugins/backendplugin/manager.go +++ b/pkg/plugins/backendplugin/manager.go @@ -10,10 +10,7 @@ import ( "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/util/proxyutil" - "github.com/prometheus/client_golang/prometheus" - "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/plugins/backendplugin/collector" "github.com/grafana/grafana/pkg/registry" plugin "github.com/hashicorp/go-plugin" "golang.org/x/xerrors" @@ -42,6 +39,8 @@ type Manager interface { Register(descriptor PluginDescriptor) error // StartPlugin starts a non-managed backend plugin StartPlugin(ctx context.Context, pluginID string) error + // CollectMetrics collects metrics from a registered backend plugin. + CollectMetrics(ctx context.Context, pluginID string) (*CollectMetricsResult, error) // CheckHealth checks the health of a registered backend plugin. CheckHealth(ctx context.Context, pluginConfig *PluginConfig) (*CheckHealthResult, error) // CallResource calls a plugin resource. @@ -49,17 +48,14 @@ type Manager interface { } type manager struct { - pluginsMu sync.RWMutex - plugins map[string]*BackendPlugin - pluginCollector collector.PluginCollector - logger log.Logger + pluginsMu sync.RWMutex + plugins map[string]*BackendPlugin + logger log.Logger } func (m *manager) Init() error { m.plugins = make(map[string]*BackendPlugin) m.logger = log.New("plugins.backend") - m.pluginCollector = collector.NewPluginCollector() - prometheus.MustRegister(m.pluginCollector) return nil } @@ -111,11 +107,6 @@ func (m *manager) start(ctx context.Context) { p.logger.Error("Failed to start plugin", "error", err) continue } - - if p.supportsDiagnostics() { - p.logger.Debug("Registering metrics collector") - m.pluginCollector.Register(p.id, p) - } } } @@ -150,6 +141,28 @@ func (m *manager) stop() { } } +// CollectMetrics collects metrics from a registered backend plugin. +func (m *manager) CollectMetrics(ctx context.Context, pluginID string) (*CollectMetricsResult, error) { + m.pluginsMu.RLock() + p, registered := m.plugins[pluginID] + m.pluginsMu.RUnlock() + + if !registered { + return nil, ErrPluginNotRegistered + } + + if !p.supportsDiagnostics() { + return nil, ErrDiagnosticsNotSupported + } + + res, err := p.CollectMetrics(ctx) + if err != nil { + return nil, err + } + + return collectMetricsResultFromProto(res), nil +} + // CheckHealth checks the health of a registered backend plugin. func (m *manager) CheckHealth(ctx context.Context, pluginConfig *PluginConfig) (*CheckHealthResult, error) { m.pluginsMu.RLock()