Backend Plugins: Collect and expose metrics and plugin process health check (#21481)

Adds support for collecting metrics from backend plugins and 
exposing them thru Grafana's Prometheus metrics endpoint. 
Enables to check health of backend plugin by using the route
`/api/plugins/<plugin id>/health`.
Uses sdk v0.6.0.

Closes #20984
This commit is contained in:
Marcus Efraimsson
2020-01-15 13:10:48 +01:00
committed by GitHub
parent f56f54b1a3
commit 5c711bfb79
34 changed files with 2406 additions and 623 deletions

View File

@ -1,14 +1,25 @@
package backendplugin
import (
"bytes"
"context"
"errors"
"fmt"
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
datasourceV1 "github.com/grafana/grafana-plugin-model/go/datasource"
rendererV1 "github.com/grafana/grafana-plugin-model/go/renderer"
backend "github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/plugins/backendplugin/collector"
"github.com/grafana/grafana/pkg/util/errutil"
plugin "github.com/hashicorp/go-plugin"
dto "github.com/prometheus/client_model/go"
)
// BackendPlugin a registered backend plugin.
@ -20,6 +31,7 @@ type BackendPlugin struct {
client *plugin.Client
logger log.Logger
startFns PluginStartFuncs
diagnostics backend.DiagnosticsPlugin
}
func (p *BackendPlugin) start(ctx context.Context) error {
@ -33,6 +45,11 @@ func (p *BackendPlugin) start(ctx context.Context) error {
var client *Client
if p.client.NegotiatedVersion() > 1 {
rawDiagnostics, err := rpcClient.Dispense("diagnostics")
if err != nil {
return err
}
rawBackend, err := rpcClient.Dispense("backend")
if err != nil {
return err
@ -43,6 +60,12 @@ func (p *BackendPlugin) start(ctx context.Context) error {
return err
}
if rawDiagnostics != nil {
if plugin, ok := rawDiagnostics.(backend.DiagnosticsPlugin); ok {
p.diagnostics = plugin
}
}
client = &Client{}
if rawBackend != nil {
if plugin, ok := rawBackend.(backend.BackendPlugin); ok {
@ -96,3 +119,186 @@ func (p *BackendPlugin) stop() error {
}
return nil
}
// supportsDiagnostics return whether backend plugin supports diagnostics like metrics and health check.
func (p *BackendPlugin) supportsDiagnostics() bool {
return p.diagnostics != nil
}
// CollectMetrics implements the collector.Collector interface.
func (p *BackendPlugin) CollectMetrics(ctx context.Context, ch chan<- prometheus.Metric) error {
if p.diagnostics == nil {
return nil
}
if p.client == nil || p.client.Exited() {
return nil
}
res, err := p.diagnostics.CollectMetrics(ctx, &pluginv2.CollectMetrics_Request{})
if err != nil {
if st, ok := status.FromError(err); ok {
if st.Code() == codes.Unimplemented {
return nil
}
}
return err
}
if res == nil || res.Metrics == nil || res.Metrics.Prometheus == nil {
return nil
}
reader := bytes.NewReader(res.Metrics.Prometheus)
var parser expfmt.TextParser
families, err := parser.TextToMetricFamilies(reader)
if err != nil {
return errutil.Wrap("failed to parse collected metrics", err)
}
for _, mf := range families {
if mf.Help == nil {
help := fmt.Sprintf("Metric read from %s plugin", p.id)
mf.Help = &help
}
}
for _, mf := range families {
convertMetricFamily(p.id, mf, ch, p.logger)
}
return nil
}
func (p *BackendPlugin) checkHealth(ctx context.Context) (*pluginv2.CheckHealth_Response, error) {
if p.diagnostics == nil || p.client == nil || p.client.Exited() {
return &pluginv2.CheckHealth_Response{
Status: pluginv2.CheckHealth_Response_UNKNOWN,
}, nil
}
res, err := p.diagnostics.CheckHealth(ctx, &pluginv2.CheckHealth_Request{})
if err != nil {
if st, ok := status.FromError(err); ok {
if st.Code() == codes.Unimplemented {
return &pluginv2.CheckHealth_Response{
Status: pluginv2.CheckHealth_Response_UNKNOWN,
Info: "Health check not implemented",
}, nil
}
}
return nil, err
}
return res, nil
}
// convertMetricFamily converts metric family to prometheus.Metric.
// Copied from https://github.com/prometheus/node_exporter/blob/3ddc82c2d8d11eec53ed5faa8db969a1bb81f8bb/collector/textfile.go#L66-L165
func convertMetricFamily(pluginID string, metricFamily *dto.MetricFamily, ch chan<- prometheus.Metric, logger log.Logger) {
var valType prometheus.ValueType
var val float64
allLabelNames := map[string]struct{}{}
for _, metric := range metricFamily.Metric {
labels := metric.GetLabel()
for _, label := range labels {
if _, ok := allLabelNames[label.GetName()]; !ok {
allLabelNames[label.GetName()] = struct{}{}
}
}
}
for _, metric := range metricFamily.Metric {
if metric.TimestampMs != nil {
logger.Warn("Ignoring unsupported custom timestamp on metric", "metric", metric)
}
labels := metric.GetLabel()
var names []string
var values []string
for _, label := range labels {
names = append(names, label.GetName())
values = append(values, label.GetValue())
}
names = append(names, "plugin_id")
values = append(values, pluginID)
for k := range allLabelNames {
present := false
for _, name := range names {
if k == name {
present = true
break
}
}
if !present {
names = append(names, k)
values = append(values, "")
}
}
metricName := prometheus.BuildFQName(collector.Namespace, "", *metricFamily.Name)
metricType := metricFamily.GetType()
switch metricType {
case dto.MetricType_COUNTER:
valType = prometheus.CounterValue
val = metric.Counter.GetValue()
case dto.MetricType_GAUGE:
valType = prometheus.GaugeValue
val = metric.Gauge.GetValue()
case dto.MetricType_UNTYPED:
valType = prometheus.UntypedValue
val = metric.Untyped.GetValue()
case dto.MetricType_SUMMARY:
quantiles := map[float64]float64{}
for _, q := range metric.Summary.Quantile {
quantiles[q.GetQuantile()] = q.GetValue()
}
ch <- prometheus.MustNewConstSummary(
prometheus.NewDesc(
metricName,
metricFamily.GetHelp(),
names, nil,
),
metric.Summary.GetSampleCount(),
metric.Summary.GetSampleSum(),
quantiles, values...,
)
case dto.MetricType_HISTOGRAM:
buckets := map[float64]uint64{}
for _, b := range metric.Histogram.Bucket {
buckets[b.GetUpperBound()] = b.GetCumulativeCount()
}
ch <- prometheus.MustNewConstHistogram(
prometheus.NewDesc(
metricName,
metricFamily.GetHelp(),
names, nil,
),
metric.Histogram.GetSampleCount(),
metric.Histogram.GetSampleSum(),
buckets, values...,
)
default:
logger.Error("unknown metric type", "type", metricType)
continue
}
if metricType == dto.MetricType_GAUGE || metricType == dto.MetricType_COUNTER || metricType == dto.MetricType_UNTYPED {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
metricName,
metricFamily.GetHelp(),
names, nil,
),
valType, val, values...,
)
}
}
}

View File

@ -8,7 +8,6 @@ import (
datasourceV1 "github.com/grafana/grafana-plugin-model/go/datasource"
rendererV1 "github.com/grafana/grafana-plugin-model/go/renderer"
backend "github.com/grafana/grafana-plugin-sdk-go/backend"
sdk "github.com/grafana/grafana-plugin-sdk-go/common"
"github.com/hashicorp/go-plugin"
)
@ -27,8 +26,8 @@ var handshake = plugin.HandshakeConfig{
ProtocolVersion: DefaultProtocolVersion,
// The magic cookie values should NEVER be changed.
MagicCookieKey: sdk.MagicCookieKey,
MagicCookieValue: sdk.MagicCookieValue,
MagicCookieKey: backend.MagicCookieKey,
MagicCookieValue: backend.MagicCookieValue,
}
func newClientConfig(executablePath string, logger log.Logger, versionedPlugins map[int]plugin.PluginSet) *plugin.ClientConfig {
@ -73,9 +72,10 @@ func NewBackendPluginDescriptor(pluginID, executablePath string, startFns Plugin
DefaultProtocolVersion: {
pluginID: &datasourceV1.DatasourcePluginImpl{},
},
sdk.ProtocolVersion: {
"backend": &backend.CoreGRPCPlugin{},
"transform": &backend.TransformGRPCPlugin{},
backend.ProtocolVersion: {
"diagnostics": &backend.DiagnosticsGRPCPlugin{},
"backend": &backend.CoreGRPCPlugin{},
"transform": &backend.TransformGRPCPlugin{},
},
},
startFns: startFns,
@ -106,6 +106,7 @@ type LegacyClient struct {
// Client client for communicating with a plugin using the current plugin protocol.
type Client struct {
BackendPlugin backend.BackendPlugin
TransformPlugin backend.TransformPlugin
DiagnosticsPlugin backend.DiagnosticsPlugin
BackendPlugin backend.BackendPlugin
TransformPlugin backend.TransformPlugin
}

View File

@ -0,0 +1,89 @@
package collector
import (
"context"
"sync"
"time"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/prometheus/client_golang/prometheus"
)
// Namespace collector metric namespace
const Namespace = "grafana_plugin"
var (
scrapeDurationDesc = prometheus.NewDesc(
prometheus.BuildFQName(Namespace, "scrape", "duration_seconds"),
"grafana_plugin: Duration of a plugin collector scrape.",
[]string{"plugin_id"},
nil,
)
scrapeSuccessDesc = prometheus.NewDesc(
prometheus.BuildFQName(Namespace, "scrape", "success"),
"grafana_plugin: Whether a plugin collector succeeded.",
[]string{"plugin_id"},
nil,
)
)
// Collector is the interface a plugin collector has to implement.
type Collector interface {
// Get new metrics and expose them via prometheus registry.
CollectMetrics(ctx context.Context, ch chan<- prometheus.Metric) error
}
// PluginCollector implements the prometheus.Collector interface.
type PluginCollector struct {
collectors map[string]Collector
logger log.Logger
}
// NewPluginCollector creates a new PluginCollector..
func NewPluginCollector() PluginCollector {
return PluginCollector{
collectors: make(map[string]Collector),
logger: log.New("plugins.backend.collector"),
}
}
func (pc PluginCollector) Register(pluginID string, c Collector) {
pc.collectors[pluginID] = c
}
// Describe implements the prometheus.Collector interface.
func (pc PluginCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- scrapeDurationDesc
ch <- scrapeSuccessDesc
}
// Collect implements the prometheus.Collector interface.
func (pc PluginCollector) Collect(ch chan<- prometheus.Metric) {
ctx := context.Background()
wg := sync.WaitGroup{}
wg.Add(len(pc.collectors))
for name, c := range pc.collectors {
go func(name string, c Collector) {
execute(ctx, name, c, ch, pc.logger)
wg.Done()
}(name, c)
}
wg.Wait()
}
func execute(ctx context.Context, pluginID string, c Collector, ch chan<- prometheus.Metric, logger log.Logger) {
begin := time.Now()
err := c.CollectMetrics(ctx, ch)
duration := time.Since(begin)
var success float64
if err != nil {
logger.Error("collector failed", "pluginId", pluginID, "took", duration, "error", err)
success = 0
} else {
logger.Debug("collector succeeded", "pluginId", pluginID, "took", duration)
success = 1
}
ch <- prometheus.MustNewConstMetric(scrapeDurationDesc, prometheus.GaugeValue, duration.Seconds(), pluginID)
ch <- prometheus.MustNewConstMetric(scrapeSuccessDesc, prometheus.GaugeValue, success, pluginID)
}

View File

@ -6,7 +6,14 @@ import (
"sync"
"time"
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins/backendplugin/collector"
"github.com/grafana/grafana/pkg/registry"
plugin "github.com/hashicorp/go-plugin"
"golang.org/x/xerrors"
@ -29,14 +36,21 @@ type Manager interface {
}
type manager struct {
pluginsMu sync.RWMutex
plugins map[string]*BackendPlugin
logger log.Logger
RouteRegister routing.RouteRegister `inject:""`
pluginsMu sync.RWMutex
plugins map[string]*BackendPlugin
pluginCollector collector.PluginCollector
logger log.Logger
}
func (m *manager) Init() error {
m.plugins = make(map[string]*BackendPlugin)
m.logger = log.New("plugins.backend")
m.pluginCollector = collector.NewPluginCollector()
prometheus.MustRegister(m.pluginCollector)
m.RouteRegister.Get("/api/plugins/:pluginId/health", m.checkHealth)
return nil
}
@ -85,6 +99,12 @@ func (m *manager) start(ctx context.Context) {
if err := startPluginAndRestartKilledProcesses(ctx, p); err != nil {
p.logger.Error("Failed to start plugin", "error", err)
continue
}
if p.supportsDiagnostics() {
p.logger.Debug("Registering metrics collector")
m.pluginCollector.Register(p.id, p)
}
}
}
@ -120,6 +140,40 @@ func (m *manager) stop() {
}
}
// checkHealth http handler for checking plugin health.
func (m *manager) checkHealth(c *models.ReqContext) {
pluginID := c.Params("pluginId")
m.pluginsMu.RLock()
p, registered := m.plugins[pluginID]
m.pluginsMu.RUnlock()
if !registered || !p.supportsDiagnostics() {
c.JsonApiErr(404, "Plugin not found", nil)
return
}
res, err := p.checkHealth(c.Req.Context())
if err != nil {
p.logger.Error("Failed to check plugin health", "error", err)
c.JSON(503, map[string]interface{}{
"status": pluginv2.CheckHealth_Response_ERROR.String(),
})
return
}
payload := map[string]interface{}{
"status": res.Status.String(),
"info": res.Info,
}
if res.Status != pluginv2.CheckHealth_Response_OK {
c.JSON(503, payload)
return
}
c.JSON(200, payload)
}
func startPluginAndRestartKilledProcesses(ctx context.Context, p *BackendPlugin) error {
if err := p.start(ctx); err != nil {
return err