mirror of
https://github.com/grafana/grafana.git
synced 2025-07-29 15:52:28 +08:00

* Add unified_alerting.notification_history to ini files * Parse notification history settings * Move Loki client to a separate package * Loki client: add params for metrics and traces * add NotificationHistorian * rm writeDuration * remove RangeQuery stuff * wip * wip * wip * wip * pass notification historian in tests * unify loki settings * unify loki settings * add test * update grafana/alerting * make update-workspace * add feature toggle * fix configureNotificationHistorian * Revert "add feature toggle" This reverts commit de7af8f7 * add feature toggle * more tests * RuleUID * fix metrics test * met.Info.Set(0)
399 lines
14 KiB
Go
399 lines
14 KiB
Go
package ngalert
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"math/rand"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/testutil"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/grafana/grafana/pkg/bus"
|
|
"github.com/grafana/grafana/pkg/events"
|
|
"github.com/grafana/grafana/pkg/infra/db"
|
|
"github.com/grafana/grafana/pkg/infra/log"
|
|
"github.com/grafana/grafana/pkg/infra/tracing"
|
|
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
|
"github.com/grafana/grafana/pkg/services/folder"
|
|
acfakes "github.com/grafana/grafana/pkg/services/ngalert/accesscontrol/fakes"
|
|
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
|
"github.com/grafana/grafana/pkg/services/ngalert/models"
|
|
"github.com/grafana/grafana/pkg/services/ngalert/state"
|
|
"github.com/grafana/grafana/pkg/services/ngalert/store"
|
|
"github.com/grafana/grafana/pkg/services/ngalert/tests/fakes"
|
|
"github.com/grafana/grafana/pkg/setting"
|
|
"github.com/grafana/grafana/pkg/util"
|
|
)
|
|
|
|
func Test_subscribeToFolderChanges(t *testing.T) {
|
|
getRecordedCommand := func(ruleStore *fakes.RuleStore) []fakes.GenericRecordedQuery {
|
|
results := ruleStore.GetRecordedCommands(func(cmd any) (any, bool) {
|
|
c, ok := cmd.(fakes.GenericRecordedQuery)
|
|
if !ok || c.Name != "IncreaseVersionForAllRulesInNamespaces" {
|
|
return nil, false
|
|
}
|
|
return c, ok
|
|
})
|
|
var result []fakes.GenericRecordedQuery
|
|
for _, cmd := range results {
|
|
result = append(result, cmd.(fakes.GenericRecordedQuery))
|
|
}
|
|
return result
|
|
}
|
|
|
|
orgID := rand.Int63()
|
|
folder1 := &folder.Folder{
|
|
UID: util.GenerateShortUID(),
|
|
Title: "Folder" + util.GenerateShortUID(),
|
|
}
|
|
folder2 := &folder.Folder{
|
|
UID: util.GenerateShortUID(),
|
|
Title: "Folder" + util.GenerateShortUID(),
|
|
}
|
|
gen := models.RuleGen
|
|
rules := gen.With(gen.WithOrgID(orgID), gen.WithNamespace(folder1.ToFolderReference())).GenerateManyRef(5)
|
|
|
|
bus := bus.ProvideBus(tracing.InitializeTracerForTest())
|
|
db := fakes.NewRuleStore(t)
|
|
db.Folders[orgID] = append(db.Folders[orgID], folder1)
|
|
db.PutRule(context.Background(), rules...)
|
|
|
|
subscribeToFolderChanges(log.New("test"), bus, db)
|
|
|
|
err := bus.Publish(context.Background(), &events.FolderFullPathUpdated{
|
|
Timestamp: time.Now(),
|
|
UIDs: []string{folder1.UID, folder2.UID},
|
|
OrgID: orgID,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
require.EventuallyWithT(t, func(c *assert.CollectT) {
|
|
recordedCommands := getRecordedCommand(db)
|
|
require.Len(c, recordedCommands, 1)
|
|
require.Equal(c, recordedCommands[0].Params[0].(int64), orgID)
|
|
require.ElementsMatch(c, recordedCommands[0].Params[1].([]string), []string{folder1.UID, folder2.UID})
|
|
}, time.Second, 10*time.Millisecond, "expected to call db store method but nothing was called")
|
|
}
|
|
|
|
func TestConfigureHistorianBackend(t *testing.T) {
|
|
t.Run("fail initialization if invalid backend", func(t *testing.T) {
|
|
met := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
|
|
logger := log.NewNopLogger()
|
|
tracer := tracing.InitializeTracerForTest()
|
|
cfg := setting.UnifiedAlertingStateHistorySettings{
|
|
Enabled: true,
|
|
Backend: "invalid-backend",
|
|
}
|
|
ac := &acfakes.FakeRuleService{}
|
|
|
|
_, err := configureHistorianBackend(context.Background(), cfg, nil, nil, nil, met, logger, tracer, ac, nil, nil, nil, nil, nil)
|
|
|
|
require.ErrorContains(t, err, "unrecognized")
|
|
})
|
|
|
|
t.Run("fail initialization if invalid multi-backend primary", func(t *testing.T) {
|
|
met := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
|
|
logger := log.NewNopLogger()
|
|
tracer := tracing.InitializeTracerForTest()
|
|
cfg := setting.UnifiedAlertingStateHistorySettings{
|
|
Enabled: true,
|
|
Backend: "multiple",
|
|
MultiPrimary: "invalid-backend",
|
|
}
|
|
ac := &acfakes.FakeRuleService{}
|
|
|
|
_, err := configureHistorianBackend(context.Background(), cfg, nil, nil, nil, met, logger, tracer, ac, nil, nil, nil, nil, nil)
|
|
|
|
require.ErrorContains(t, err, "multi-backend target")
|
|
require.ErrorContains(t, err, "unrecognized")
|
|
})
|
|
|
|
t.Run("fail initialization if invalid multi-backend secondary", func(t *testing.T) {
|
|
met := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
|
|
logger := log.NewNopLogger()
|
|
tracer := tracing.InitializeTracerForTest()
|
|
cfg := setting.UnifiedAlertingStateHistorySettings{
|
|
Enabled: true,
|
|
Backend: "multiple",
|
|
MultiPrimary: "annotations",
|
|
MultiSecondaries: []string{"annotations", "invalid-backend"},
|
|
}
|
|
ac := &acfakes.FakeRuleService{}
|
|
|
|
_, err := configureHistorianBackend(context.Background(), cfg, nil, nil, nil, met, logger, tracer, ac, nil, nil, nil, nil, nil)
|
|
|
|
require.ErrorContains(t, err, "multi-backend target")
|
|
require.ErrorContains(t, err, "unrecognized")
|
|
})
|
|
|
|
t.Run("do not fail initialization if pinging Loki fails", func(t *testing.T) {
|
|
met := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
|
|
logger := log.NewNopLogger()
|
|
tracer := tracing.InitializeTracerForTest()
|
|
cfg := setting.UnifiedAlertingStateHistorySettings{
|
|
Enabled: true,
|
|
Backend: "loki",
|
|
LokiSettings: setting.UnifiedAlertingLokiSettings{
|
|
// Should never resolve at the DNS level: https://www.rfc-editor.org/rfc/rfc6761#section-6.4
|
|
LokiReadURL: "http://gone.invalid",
|
|
LokiWriteURL: "http://gone.invalid",
|
|
},
|
|
}
|
|
ac := &acfakes.FakeRuleService{}
|
|
|
|
h, err := configureHistorianBackend(context.Background(), cfg, nil, nil, nil, met, logger, tracer, ac, nil, nil, nil, nil, nil)
|
|
|
|
require.NotNil(t, h)
|
|
require.NoError(t, err)
|
|
})
|
|
|
|
t.Run("fail initialization if prometheus backend missing datasource UID", func(t *testing.T) {
|
|
met := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
|
|
logger := log.NewNopLogger()
|
|
tracer := tracing.InitializeTracerForTest()
|
|
cfg := setting.UnifiedAlertingStateHistorySettings{
|
|
Enabled: true,
|
|
Backend: "prometheus",
|
|
// Missing PrometheusTargetDatasourceUID
|
|
}
|
|
ac := &acfakes.FakeRuleService{}
|
|
|
|
_, err := configureHistorianBackend(context.Background(), cfg, nil, nil, nil, met, logger, tracer, ac, nil, nil, nil, nil, nil)
|
|
|
|
require.Error(t, err)
|
|
require.ErrorContains(t, err, "datasource UID must not be empty")
|
|
})
|
|
|
|
t.Run("successful initialization of prometheus backend", func(t *testing.T) {
|
|
met := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
|
|
logger := log.NewNopLogger()
|
|
tracer := tracing.InitializeTracerForTest()
|
|
cfg := setting.UnifiedAlertingStateHistorySettings{
|
|
Enabled: true,
|
|
Backend: "prometheus",
|
|
PrometheusMetricName: "test_metric",
|
|
PrometheusTargetDatasourceUID: "test-prometheus-uid",
|
|
}
|
|
ac := &acfakes.FakeRuleService{}
|
|
|
|
h, err := configureHistorianBackend(context.Background(), cfg, nil, nil, nil, met, logger, tracer, ac, nil, nil, nil, nil, nil)
|
|
|
|
require.NotNil(t, h)
|
|
require.NoError(t, err)
|
|
})
|
|
|
|
t.Run("emit metric describing chosen backend", func(t *testing.T) {
|
|
reg := prometheus.NewRegistry()
|
|
met := metrics.NewHistorianMetrics(reg, metrics.Subsystem)
|
|
logger := log.NewNopLogger()
|
|
tracer := tracing.InitializeTracerForTest()
|
|
cfg := setting.UnifiedAlertingStateHistorySettings{
|
|
Enabled: true,
|
|
Backend: "annotations",
|
|
}
|
|
ac := &acfakes.FakeRuleService{}
|
|
|
|
h, err := configureHistorianBackend(context.Background(), cfg, nil, nil, nil, met, logger, tracer, ac, nil, nil, nil, nil, nil)
|
|
|
|
require.NotNil(t, h)
|
|
require.NoError(t, err)
|
|
exp := bytes.NewBufferString(`
|
|
# HELP grafana_alerting_state_history_info Information about the state history store.
|
|
# TYPE grafana_alerting_state_history_info gauge
|
|
grafana_alerting_state_history_info{backend="annotations"} 1
|
|
`)
|
|
err = testutil.GatherAndCompare(reg, exp, "grafana_alerting_state_history_info")
|
|
require.NoError(t, err)
|
|
})
|
|
|
|
t.Run("emit special zero metric if state history disabled", func(t *testing.T) {
|
|
reg := prometheus.NewRegistry()
|
|
met := metrics.NewHistorianMetrics(reg, metrics.Subsystem)
|
|
logger := log.NewNopLogger()
|
|
tracer := tracing.InitializeTracerForTest()
|
|
cfg := setting.UnifiedAlertingStateHistorySettings{
|
|
Enabled: false,
|
|
}
|
|
ac := &acfakes.FakeRuleService{}
|
|
|
|
h, err := configureHistorianBackend(context.Background(), cfg, nil, nil, nil, met, logger, tracer, ac, nil, nil, nil, nil, nil)
|
|
|
|
require.NotNil(t, h)
|
|
require.NoError(t, err)
|
|
exp := bytes.NewBufferString(`
|
|
# HELP grafana_alerting_state_history_info Information about the state history store.
|
|
# TYPE grafana_alerting_state_history_info gauge
|
|
grafana_alerting_state_history_info{backend="noop"} 0
|
|
`)
|
|
err = testutil.GatherAndCompare(reg, exp, "grafana_alerting_state_history_info")
|
|
require.NoError(t, err)
|
|
})
|
|
}
|
|
|
|
func TestConfigureNotificationHistorian(t *testing.T) {
|
|
t.Run("do not fail initialization if pinging Loki fails", func(t *testing.T) {
|
|
reg := prometheus.NewRegistry()
|
|
met := metrics.NewNotificationHistorianMetrics(reg)
|
|
logger := log.NewNopLogger()
|
|
tracer := tracing.InitializeTracerForTest()
|
|
ft := featuremgmt.WithFeatures(featuremgmt.FlagAlertingNotificationHistory)
|
|
cfg := setting.UnifiedAlertingNotificationHistorySettings{
|
|
Enabled: true,
|
|
LokiSettings: setting.UnifiedAlertingLokiSettings{
|
|
// Should never resolve at the DNS level: https://www.rfc-editor.org/rfc/rfc6761#section-6.4
|
|
LokiRemoteURL: "http://gone.invalid",
|
|
},
|
|
}
|
|
|
|
h, err := configureNotificationHistorian(context.Background(), ft, cfg, met, logger, tracer)
|
|
require.NotNil(t, h)
|
|
require.NoError(t, err)
|
|
|
|
// Verify that the metric value is set to 1, indicating that notification history is enabled.
|
|
exp := bytes.NewBufferString(`
|
|
# HELP grafana_alerting_notification_history_info Information about the notification history store.
|
|
# TYPE grafana_alerting_notification_history_info gauge
|
|
grafana_alerting_notification_history_info 1
|
|
`)
|
|
err = testutil.GatherAndCompare(reg, exp, "grafana_alerting_notification_history_info")
|
|
require.NoError(t, err)
|
|
})
|
|
|
|
t.Run("emit special zero metric if notification history disabled", func(t *testing.T) {
|
|
testCases := []struct {
|
|
name string
|
|
ft featuremgmt.FeatureToggles
|
|
cfg setting.UnifiedAlertingNotificationHistorySettings
|
|
}{
|
|
{
|
|
"disabled via config",
|
|
featuremgmt.WithFeatures(featuremgmt.FlagAlertingNotificationHistory),
|
|
setting.UnifiedAlertingNotificationHistorySettings{Enabled: false},
|
|
},
|
|
{
|
|
"disabled via feature toggle",
|
|
featuremgmt.WithFeatures(),
|
|
setting.UnifiedAlertingNotificationHistorySettings{Enabled: true},
|
|
},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
reg := prometheus.NewRegistry()
|
|
met := metrics.NewNotificationHistorianMetrics(reg)
|
|
logger := log.NewNopLogger()
|
|
tracer := tracing.InitializeTracerForTest()
|
|
h, err := configureNotificationHistorian(context.Background(), tc.ft, tc.cfg, met, logger, tracer)
|
|
require.Nil(t, h)
|
|
require.NoError(t, err)
|
|
|
|
exp := bytes.NewBufferString(`
|
|
# HELP grafana_alerting_notification_history_info Information about the notification history store.
|
|
# TYPE grafana_alerting_notification_history_info gauge
|
|
grafana_alerting_notification_history_info 0
|
|
`)
|
|
err = testutil.GatherAndCompare(reg, exp, "grafana_alerting_notification_history_info")
|
|
require.NoError(t, err)
|
|
})
|
|
}
|
|
})
|
|
}
|
|
|
|
type mockDB struct {
|
|
db.DB
|
|
}
|
|
|
|
func TestInitInstanceStore(t *testing.T) {
|
|
sqlStore := &mockDB{}
|
|
logger := log.New()
|
|
|
|
tests := []struct {
|
|
name string
|
|
ft featuremgmt.FeatureToggles
|
|
expectedInstanceStoreType interface{}
|
|
}{
|
|
{
|
|
name: "Compressed flag enabled, no periodic flag",
|
|
ft: featuremgmt.WithFeatures(
|
|
featuremgmt.FlagAlertingSaveStateCompressed,
|
|
),
|
|
expectedInstanceStoreType: store.ProtoInstanceDBStore{},
|
|
},
|
|
{
|
|
name: "Compressed flag enabled with periodic flag",
|
|
ft: featuremgmt.WithFeatures(
|
|
featuremgmt.FlagAlertingSaveStateCompressed,
|
|
featuremgmt.FlagAlertingSaveStatePeriodic,
|
|
),
|
|
expectedInstanceStoreType: store.ProtoInstanceDBStore{},
|
|
},
|
|
{
|
|
name: "Compressed flag disabled",
|
|
ft: featuremgmt.WithFeatures(),
|
|
expectedInstanceStoreType: store.InstanceDBStore{},
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
instanceStore, instanceReader := initInstanceStore(sqlStore, logger, tt.ft)
|
|
assert.IsType(t, tt.expectedInstanceStoreType, instanceStore)
|
|
assert.IsType(t, &state.MultiInstanceReader{}, instanceReader)
|
|
assert.IsType(t, store.ProtoInstanceDBStore{}, instanceReader.(*state.MultiInstanceReader).ProtoDBReader)
|
|
assert.IsType(t, store.InstanceDBStore{}, instanceReader.(*state.MultiInstanceReader).DBReader)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestInitStatePersister(t *testing.T) {
|
|
ua := setting.UnifiedAlertingSettings{
|
|
StatePeriodicSaveInterval: 1 * time.Minute,
|
|
}
|
|
cfg := state.ManagerCfg{}
|
|
|
|
tests := []struct {
|
|
name string
|
|
ft featuremgmt.FeatureToggles
|
|
expectedStatePersisterType state.StatePersister
|
|
}{
|
|
{
|
|
name: "Compressed flag enabled",
|
|
ft: featuremgmt.WithFeatures(
|
|
featuremgmt.FlagAlertingSaveStateCompressed,
|
|
),
|
|
expectedStatePersisterType: &state.SyncRuleStatePersister{},
|
|
},
|
|
{
|
|
name: "Periodic flag enabled",
|
|
ft: featuremgmt.WithFeatures(
|
|
featuremgmt.FlagAlertingSaveStatePeriodic,
|
|
),
|
|
expectedStatePersisterType: &state.AsyncStatePersister{},
|
|
},
|
|
{
|
|
name: "No flags enabled",
|
|
ft: featuremgmt.WithFeatures(),
|
|
expectedStatePersisterType: &state.SyncStatePersister{},
|
|
},
|
|
{
|
|
name: "Both flags enabled - compressed takes precedence",
|
|
ft: featuremgmt.WithFeatures(
|
|
featuremgmt.FlagAlertingSaveStateCompressed,
|
|
featuremgmt.FlagAlertingSaveStatePeriodic,
|
|
),
|
|
expectedStatePersisterType: &state.SyncRuleStatePersister{},
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
statePersister := initStatePersister(ua, cfg, tt.ft)
|
|
assert.IsType(t, tt.expectedStatePersisterType, statePersister)
|
|
})
|
|
}
|
|
}
|