Alerting: Select remote write path dependent on metrics backend type. (#101891)

The remote write path differs based on whether the data source is actually
Prometheus, Mimir, Cortex, or an older version of Cortex. We do not want
users to have to specify the path, so this change determines the path as
best it can.

It may be in the future we have to make this configurable per-datasource
to cater for setups where it's impossible to determine the correct path.
This commit is contained in:
Steve Simpson
2025-03-11 13:45:16 +01:00
committed by GitHub
parent 062a0e7212
commit bbab62ce39
9 changed files with 199 additions and 57 deletions

View File

@ -1527,9 +1527,6 @@ timeout = 10s
# Only has effect if the grafanaManagedRecordRulesDatasources feature toggle is enabled.
default_datasource_uid =
# Suffix to apply to the data source URL for remote write requests.
remote_write_path_suffix = /push
# Optional custom headers to include in recording rule write requests.
[recording_rules.custom_headers]
# exampleHeader = exampleValue

View File

@ -1509,9 +1509,6 @@ timeout = 30s
# Only has effect if the grafanaManagedRecordRulesDatasources feature toggle is enabled.
default_datasource_uid =
# Suffix to apply to the data source URL for remote write requests.
remote_write_path_suffix = /push
# Optional custom headers to include in recording rule write requests.
[recording_rules.custom_headers]
# exampleHeader = exampleValue

View File

@ -74,11 +74,12 @@ func (s *FakeDataSourceService) AddDataSource(ctx context.Context, cmd *datasour
s.lastID = int64(len(s.DataSources) - 1)
}
dataSource := &datasources.DataSource{
ID: s.lastID + 1,
Name: cmd.Name,
Type: cmd.Type,
UID: cmd.UID,
OrgID: cmd.OrgID,
ID: s.lastID + 1,
Name: cmd.Name,
Type: cmd.Type,
UID: cmd.UID,
OrgID: cmd.OrgID,
JsonData: cmd.JsonData,
}
s.DataSources = append(s.DataSources, dataSource)
return dataSource, nil

View File

@ -760,14 +760,12 @@ func createRecordingWriter(featureToggles featuremgmt.FeatureToggles, settings s
if settings.Enabled {
if featureToggles.IsEnabledGlobally(featuremgmt.FlagGrafanaManagedRecordingRulesDatasources) {
cfg := writer.DatasourceWriterConfig{
Timeout: settings.Timeout,
DefaultDatasourceUID: settings.DefaultDatasourceUID,
RemoteWritePathSuffix: settings.RemoteWritePathSuffix,
Timeout: settings.Timeout,
DefaultDatasourceUID: settings.DefaultDatasourceUID,
}
logger.Info("Setting up remote write using data sources",
"timeout", cfg.Timeout, "default_datasource_uid", cfg.DefaultDatasourceUID,
"remote_write_path_suffix", cfg.RemoteWritePathSuffix)
"timeout", cfg.Timeout, "default_datasource_uid", cfg.DefaultDatasourceUID)
return writer.NewDatasourceWriter(cfg, datasourceService, httpClientProvider, clock, logger, m), nil
} else {

View File

@ -18,6 +18,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/datasources"
@ -574,15 +575,15 @@ func setupDatasourceWriter(t *testing.T, target *writer.TestRemoteWriteTarget, r
dss := &dsfakes.FakeDataSourceService{}
p1, _ := dss.AddDataSource(context.Background(), &datasources.AddDataSourceCommand{
UID: dsUID,
Type: datasources.DS_PROMETHEUS,
UID: dsUID,
Type: datasources.DS_PROMETHEUS,
JsonData: simplejson.MustJson([]byte(`{"prometheusType":"Prometheus"}`)),
})
p1.URL = target.DatasourceURL()
cfg := writer.DatasourceWriterConfig{
Timeout: time.Second * 5,
DefaultDatasourceUID: "",
RemoteWritePathSuffix: writer.RemoteWriteSuffix,
Timeout: time.Second * 5,
DefaultDatasourceUID: "",
}
return writer.NewDatasourceWriter(cfg, dss, provider, clock.NewMock(),

View File

@ -5,6 +5,8 @@ import (
"errors"
"fmt"
"net/url"
"path"
"strings"
"time"
"github.com/benbjohnson/clock"
@ -35,9 +37,6 @@ type DatasourceWriterConfig struct {
// This exists to cater for upgrading from old versions of Grafana, where rule
// definitions may not have a target data source specified.
DefaultDatasourceUID string
// RemoteWritePathSuffix is the path suffix for remote write, normally /push.
RemoteWritePathSuffix string
}
type DatasourceWriter struct {
@ -78,6 +77,73 @@ func (w *DatasourceWriter) decrypt(ds *datasources.DataSource) (map[string]strin
return decryptedJsonData, err
}
func getPrometheusType(ds *datasources.DataSource) string {
if ds.JsonData == nil {
return ""
}
jsonData := ds.JsonData.Get("prometheusType")
if jsonData == nil {
return ""
}
str, err := jsonData.String()
if err != nil {
return ""
}
return str
}
func getRemoteWriteURL(ds *datasources.DataSource) (*url.URL, error) {
u, err := url.Parse(ds.URL)
if err != nil {
return nil, err
}
if getPrometheusType(ds) == "Prometheus" {
return u.JoinPath("/api/v1/write"), nil
}
// All other cases assume Mimir/Cortex, as these systems are much more likely to be
// used as a remote write target, where as Prometheus does not recommend it.
// Mimir/Cortex are more complicated, as Grafana has to be configured with the
// base URL for where the Prometheus API is located, e.g. /api/prom or /prometheus.
//
// - For "legacy" routes, /push is located on the same level as /api/v1/query.
//
// For example:
// Grafana will be configured with <host>/api/prom
// The query API is at /api/prom/api/v1/query
// The push API is at /api/prom/push
//
// - For "new" routes, /push is located at the Mimir root, not Prometheus root.
//
// For example:
// Grafana will be configured with e.g. <host>/prometheus
// The query API is at /prometheus/api/v1/query
// But push API is at /push
//
// Unfortunately, the prefixes can also be configured,
cleanPath := path.Clean(u.Path)
// If the suffix is /api/prom, assume Mimir/Cortex with legacy routes.
if strings.HasSuffix(cleanPath, "/api/prom") {
u.Path = path.Join(u.Path, "/push")
return u, nil
}
// If the suffix is /prometheus, assume Mimir/Cortex with new routes.
if strings.HasSuffix(cleanPath, "/prometheus") {
u.Path = path.Join(path.Dir(u.Path), "/api/v1/push")
return u, nil
}
// The user has configured an unknown prefix, so fall back to taking
// the host as the Mimir root. This is less than ideal.
u.Path = "/api/v1/push"
return u, nil
}
func (w *DatasourceWriter) makeWriter(ctx context.Context, orgID int64, dsUID string) (*PrometheusWriter, error) {
ds, err := w.datasources.GetDataSource(ctx, &datasources.GetDataSourceQuery{
UID: dsUID,
@ -101,13 +167,11 @@ func (w *DatasourceWriter) makeWriter(ctx context.Context, orgID int64, dsUID st
return nil, err
}
u, err := url.Parse(is.URL)
u, err := getRemoteWriteURL(ds)
if err != nil {
return nil, err
}
u = u.JoinPath(w.cfg.RemoteWritePathSuffix)
cfg := PrometheusWriterConfig{
URL: u.String(),
HTTPOptions: httpclient.Options{
@ -121,6 +185,15 @@ func (w *DatasourceWriter) makeWriter(ctx context.Context, orgID int64, dsUID st
return nil, err
}
w.l.Debug("Created Prometheus remote writer",
"datasource_uid", dsUID,
"type", ds.Type,
"prometheusType", getPrometheusType(ds),
"url", cfg.URL,
"tls", cfg.HTTPOptions.TLS != nil,
"basic_auth", cfg.HTTPOptions.BasicAuth != nil,
"timeout", cfg.Timeout)
return NewPrometheusWriter(
cfg,
w.httpClientProvider,

View File

@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/datasources"
@ -43,16 +44,20 @@ func setupDataSources(t *testing.T) *testDataSources {
})
p1, _ := res.AddDataSource(context.Background(), &datasources.AddDataSourceCommand{
UID: "prom-1",
Type: datasources.DS_PROMETHEUS,
UID: "prom-1",
Type: datasources.DS_PROMETHEUS,
JsonData: simplejson.MustJson([]byte(`{"prometheusType":"Prometheus"}`)),
})
p1.URL = res.prom1.srv.URL + "/api/v1"
p1.URL = res.prom1.srv.URL
res.prom1.ExpectedPath = "/api/v1/write"
p2, _ := res.AddDataSource(context.Background(), &datasources.AddDataSourceCommand{
UID: "prom-2",
Type: datasources.DS_PROMETHEUS,
UID: "prom-2",
Type: datasources.DS_PROMETHEUS,
JsonData: simplejson.MustJson([]byte(`{"prometheusType":"Mimir"}`)),
})
p2.URL = res.prom2.srv.URL + "/api/v1"
p2.URL = res.prom2.srv.URL + "/api/prom"
res.prom2.ExpectedPath = "/api/prom/push"
// Add a non-Prometheus datasource.
_, _ = res.AddDataSource(context.Background(), &datasources.AddDataSourceCommand{
@ -70,9 +75,8 @@ func TestDatasourceWriter(t *testing.T) {
datasources := setupDataSources(t)
cfg := DatasourceWriterConfig{
Timeout: time.Second * 5,
DefaultDatasourceUID: "prom-2",
RemoteWritePathSuffix: "/write",
Timeout: time.Second * 5,
DefaultDatasourceUID: "prom-2",
}
met := metrics.NewRemoteWriterMetrics(prometheus.NewRegistry())
@ -117,3 +121,76 @@ func TestDatasourceWriter(t *testing.T) {
require.NoError(t, err)
})
}
func TestDatasourceWriterGetRemoteWriteURL(t *testing.T) {
tc := []struct {
name string
ds datasources.DataSource
url string
}{
{
"prometheus",
datasources.DataSource{
JsonData: simplejson.MustJson([]byte(`{"prometheusType":"Prometheus"}`)),
URL: "http://example.com",
},
"http://example.com/api/v1/write",
},
{
"prometheus with prefix",
datasources.DataSource{
JsonData: simplejson.MustJson([]byte(`{"prometheusType":"Prometheus"}`)),
URL: "http://example.com/myprom",
},
"http://example.com/myprom/api/v1/write",
},
{
"mimir/cortex legacy routes",
datasources.DataSource{
JsonData: simplejson.MustJson([]byte(`{"prometheusType":"Anything"}`)),
URL: "http://example.com/api/prom",
},
"http://example.com/api/prom/push",
},
{
"mimir/cortex legacy routes with prefix",
datasources.DataSource{
JsonData: simplejson.MustJson([]byte(`{"prometheusType":"Anything"}`)),
URL: "http://example.com/myprom/api/prom",
},
"http://example.com/myprom/api/prom/push",
},
{
"mimir/cortex new routes",
datasources.DataSource{
JsonData: simplejson.MustJson([]byte(`{"prometheusType":"Anything"}`)),
URL: "http://example.com/prometheus",
},
"http://example.com/api/v1/push",
},
{
"mimir/cortex new routes with prefix",
datasources.DataSource{
JsonData: simplejson.MustJson([]byte(`{"prometheusType":"Anything"}`)),
URL: "http://example.com/mymimir/prometheus",
},
"http://example.com/mymimir/api/v1/push",
},
{
"mimir/cortex with unknown suffix",
datasources.DataSource{
JsonData: simplejson.MustJson([]byte(`{"prometheusType":"Anything"}`)),
URL: "http://example.com/foo/bar",
},
"http://example.com/api/v1/push",
},
}
for _, tt := range tc {
t.Run(tt.name, func(t *testing.T) {
res, err := getRemoteWriteURL(&tt.ds)
require.NoError(t, err)
require.Equal(t, tt.url, res.String())
})
}
}

View File

@ -12,10 +12,7 @@ import (
"github.com/stretchr/testify/require"
)
const RemoteWritePrefix = "/api/v1"
const RemoteWriteSuffix = "/write"
const RemoteWriteEndpoint = RemoteWritePrefix + RemoteWriteSuffix
const RemoteWriteEndpoint = "/api/v1/write"
type TestRemoteWriteTarget struct {
srv *httptest.Server
@ -23,6 +20,8 @@ type TestRemoteWriteTarget struct {
mtx sync.Mutex
RequestsCount int
LastRequestBody string
ExpectedPath string
}
func NewTestRemoteWriteTarget(t *testing.T) *TestRemoteWriteTarget {
@ -31,10 +30,11 @@ func NewTestRemoteWriteTarget(t *testing.T) *TestRemoteWriteTarget {
target := &TestRemoteWriteTarget{
RequestsCount: 0,
LastRequestBody: "",
ExpectedPath: RemoteWriteEndpoint,
}
handler := func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != RemoteWriteEndpoint {
if r.URL.Path != target.ExpectedPath {
require.Fail(t, "Received unexpected request for endpoint %s", r.URL.Path)
}
@ -63,7 +63,7 @@ func (s *TestRemoteWriteTarget) Close() {
}
func (s *TestRemoteWriteTarget) DatasourceURL() string {
return s.srv.URL + RemoteWritePrefix
return s.srv.URL
}
func (s *TestRemoteWriteTarget) ClientSettings() setting.RecordingRuleSettings {

View File

@ -132,14 +132,13 @@ type UnifiedAlertingSettings struct {
}
type RecordingRuleSettings struct {
Enabled bool
URL string
BasicAuthUsername string
BasicAuthPassword string
CustomHeaders map[string]string
Timeout time.Duration
DefaultDatasourceUID string
RemoteWritePathSuffix string
Enabled bool
URL string
BasicAuthUsername string
BasicAuthPassword string
CustomHeaders map[string]string
Timeout time.Duration
DefaultDatasourceUID string
}
// RemoteAlertmanagerSettings contains the configuration needed
@ -437,13 +436,12 @@ func (cfg *Cfg) ReadUnifiedAlertingSettings(iniFile *ini.File) error {
rr := iniFile.Section("recording_rules")
uaCfgRecordingRules := RecordingRuleSettings{
Enabled: rr.Key("enabled").MustBool(false),
URL: rr.Key("url").MustString(""),
BasicAuthUsername: rr.Key("basic_auth_username").MustString(""),
BasicAuthPassword: rr.Key("basic_auth_password").MustString(""),
Timeout: rr.Key("timeout").MustDuration(defaultRecordingRequestTimeout),
DefaultDatasourceUID: rr.Key("default_datasource_uid").MustString(""),
RemoteWritePathSuffix: rr.Key("remote_write_path_suffix").MustString("/push"),
Enabled: rr.Key("enabled").MustBool(false),
URL: rr.Key("url").MustString(""),
BasicAuthUsername: rr.Key("basic_auth_username").MustString(""),
BasicAuthPassword: rr.Key("basic_auth_password").MustString(""),
Timeout: rr.Key("timeout").MustDuration(defaultRecordingRequestTimeout),
DefaultDatasourceUID: rr.Key("default_datasource_uid").MustString(""),
}
rrHeaders := iniFile.Section("recording_rules.custom_headers")