From bad4f28d0d18007e3d5c597ef815bb0bb46ff83c Mon Sep 17 00:00:00 2001 From: Yuri Tseretyan Date: Wed, 9 Nov 2022 15:08:57 -0500 Subject: [PATCH] Alerting: update test TestAlertingTicker to not rely on clock (#58544) * extract method processTick * make processTick return scheduled rules * move state manager tests to state manager * update test * move all tests into one file * remove unused fields --- pkg/services/ngalert/models/testing.go | 12 + pkg/services/ngalert/schedule/schedule.go | 219 +++++++------ .../ngalert/schedule/schedule_test.go | 299 ------------------ .../ngalert/schedule/schedule_unit_test.go | 215 +++++++++++++ pkg/services/ngalert/schedule/testing.go | 6 + pkg/services/ngalert/state/manager_test.go | 88 ++++++ 6 files changed, 430 insertions(+), 409 deletions(-) delete mode 100644 pkg/services/ngalert/schedule/schedule_test.go diff --git a/pkg/services/ngalert/models/testing.go b/pkg/services/ngalert/models/testing.go index 1ca0156578e..782f76d9b58 100644 --- a/pkg/services/ngalert/models/testing.go +++ b/pkg/services/ngalert/models/testing.go @@ -146,6 +146,18 @@ func WithNamespace(namespace *models2.Folder) AlertRuleMutator { } } +func WithInterval(interval time.Duration) AlertRuleMutator { + return func(rule *AlertRule) { + rule.IntervalSeconds = int64(interval.Seconds()) + } +} + +func WithTitle(title string) AlertRuleMutator { + return func(rule *AlertRule) { + rule.Title = title + } +} + func GenerateAlertLabels(count int, prefix string) data.Labels { labels := make(data.Labels, count) for i := 0; i < count; i++ { diff --git a/pkg/services/ngalert/schedule/schedule.go b/pkg/services/ngalert/schedule/schedule.go index b33c0de0d6d..5a841653eaf 100644 --- a/pkg/services/ngalert/schedule/schedule.go +++ b/pkg/services/ngalert/schedule/schedule.go @@ -37,9 +37,6 @@ type ScheduleService interface { UpdateAlertRule(key ngmodels.AlertRuleKey, lastVersion int64) // DeleteAlertRule notifies scheduler that rules have been deleted DeleteAlertRule(keys ...ngmodels.AlertRuleKey) - // the following are used by tests only used for tests - evalApplied(ngmodels.AlertRuleKey, time.Time) - stopApplied(ngmodels.AlertRuleKey) } // AlertsSender is an interface for a service that is responsible for sending notifications to the end-user. 
@@ -103,8 +100,6 @@ type schedule struct { type SchedulerCfg struct { Cfg setting.UnifiedAlertingSettings C clock.Clock - EvalAppliedFunc func(ngmodels.AlertRuleKey, time.Time) - StopAppliedFunc func(ngmodels.AlertRuleKey) EvaluatorFactory eval.EvaluatorFactory RuleStore RulesStore Metrics *metrics.Scheduler @@ -119,8 +114,6 @@ func NewScheduler(cfg SchedulerCfg, appURL *url.URL, stateManager *state.Manager clock: cfg.C, baseInterval: cfg.Cfg.BaseInterval, log: log.New("ngalert.scheduler"), - evalAppliedFunc: cfg.EvalAppliedFunc, - stopAppliedFunc: cfg.StopAppliedFunc, evaluatorFactory: cfg.EvaluatorFactory, ruleStore: cfg.RuleStore, metrics: cfg.Metrics, @@ -190,109 +183,7 @@ func (sch *schedule) schedulePeriodic(ctx context.Context, t *ticker.T) error { start := time.Now().Round(0) sch.metrics.BehindSeconds.Set(start.Sub(tick).Seconds()) - tickNum := tick.Unix() / int64(sch.baseInterval.Seconds()) - - if err := sch.updateSchedulableAlertRules(ctx); err != nil { - sch.log.Error("Failed to update alert rules", "error", err) - } - alertRules, folderTitles := sch.schedulableAlertRules.all() - - // registeredDefinitions is a map used for finding deleted alert rules - // initially it is assigned to all known alert rules from the previous cycle - // each alert rule found also in this cycle is removed - // so, at the end, the remaining registered alert rules are the deleted ones - registeredDefinitions := sch.registry.keyMap() - - // While these are the rules that we iterate over, at the moment there's no 100% guarantee that they'll be - // scheduled as rules could be removed before we get a chance to evaluate them. - sch.metrics.SchedulableAlertRules.Set(float64(len(alertRules))) - sch.metrics.SchedulableAlertRulesHash.Set(float64(hashUIDs(alertRules))) - - type readyToRunItem struct { - ruleInfo *alertRuleInfo - evaluation - } - - readyToRun := make([]readyToRunItem, 0) - missingFolder := make(map[string][]string) - for _, item := range alertRules { - key := item.GetKey() - ruleInfo, newRoutine := sch.registry.getOrCreateInfo(ctx, key) - - // enforce minimum evaluation interval - if item.IntervalSeconds < int64(sch.minRuleInterval.Seconds()) { - sch.log.Debug("Interval adjusted", append(key.LogContext(), "originalInterval", item.IntervalSeconds, "adjustedInterval", sch.minRuleInterval.Seconds())...) - item.IntervalSeconds = int64(sch.minRuleInterval.Seconds()) - } - - invalidInterval := item.IntervalSeconds%int64(sch.baseInterval.Seconds()) != 0 - - if newRoutine && !invalidInterval { - dispatcherGroup.Go(func() error { - return sch.ruleRoutine(ruleInfo.ctx, key, ruleInfo.evalCh, ruleInfo.updateCh) - }) - } - - if invalidInterval { - // this is expected to be always false - // given that we validate interval during alert rule updates - sch.log.Warn("Rule has an invalid interval and will be ignored. Interval should be divided exactly by scheduler interval", append(key.LogContext(), "ruleInterval", time.Duration(item.IntervalSeconds)*time.Second, "schedulerInterval", sch.baseInterval)...) 
- continue - } - - itemFrequency := item.IntervalSeconds / int64(sch.baseInterval.Seconds()) - if item.IntervalSeconds != 0 && tickNum%itemFrequency == 0 { - var folderTitle string - if !sch.disableGrafanaFolder { - title, ok := folderTitles[item.NamespaceUID] - if ok { - folderTitle = title - } else { - missingFolder[item.NamespaceUID] = append(missingFolder[item.NamespaceUID], item.UID) - } - } - readyToRun = append(readyToRun, readyToRunItem{ruleInfo: ruleInfo, evaluation: evaluation{ - scheduledAt: tick, - rule: item, - folderTitle: folderTitle, - }}) - } - - // remove the alert rule from the registered alert rules - delete(registeredDefinitions, key) - } - - if len(missingFolder) > 0 { // if this happens then there can be problems with fetching folders from the database. - sch.log.Warn("Unable to obtain folder titles for some rules", "missingFolderUIDToRuleUID", missingFolder) - } - - var step int64 = 0 - if len(readyToRun) > 0 { - step = sch.baseInterval.Nanoseconds() / int64(len(readyToRun)) - } - - for i := range readyToRun { - item := readyToRun[i] - - time.AfterFunc(time.Duration(int64(i)*step), func() { - key := item.rule.GetKey() - success, dropped := item.ruleInfo.eval(&item.evaluation) - if !success { - sch.log.Debug("Scheduled evaluation was canceled because evaluation routine was stopped", append(key.LogContext(), "time", tick)...) - return - } - if dropped != nil { - sch.log.Warn("Tick dropped because alert rule evaluation is too slow", append(key.LogContext(), "time", tick)...) - orgID := fmt.Sprint(key.OrgID) - sch.metrics.EvaluationMissed.WithLabelValues(orgID, item.rule.Title).Inc() - } - }) - } - - // unregister and stop routines of the deleted alert rules - for key := range registeredDefinitions { - sch.DeleteAlertRule(key) - } + sch.processTick(ctx, dispatcherGroup, tick) sch.metrics.SchedulePeriodicDuration.Observe(time.Since(start).Seconds()) case <-ctx.Done(): @@ -303,6 +194,114 @@ func (sch *schedule) schedulePeriodic(ctx context.Context, t *ticker.T) error { } } +type readyToRunItem struct { + ruleInfo *alertRuleInfo + evaluation +} + +func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.Group, tick time.Time) ([]readyToRunItem, map[ngmodels.AlertRuleKey]struct{}) { + tickNum := tick.Unix() / int64(sch.baseInterval.Seconds()) + + if err := sch.updateSchedulableAlertRules(ctx); err != nil { + sch.log.Error("Failed to update alert rules", "error", err) + } + alertRules, folderTitles := sch.schedulableAlertRules.all() + + // registeredDefinitions is a map used for finding deleted alert rules + // initially it is assigned to all known alert rules from the previous cycle + // each alert rule found also in this cycle is removed + // so, at the end, the remaining registered alert rules are the deleted ones + registeredDefinitions := sch.registry.keyMap() + + // While these are the rules that we iterate over, at the moment there's no 100% guarantee that they'll be + // scheduled as rules could be removed before we get a chance to evaluate them. 
+ sch.metrics.SchedulableAlertRules.Set(float64(len(alertRules))) + sch.metrics.SchedulableAlertRulesHash.Set(float64(hashUIDs(alertRules))) + + readyToRun := make([]readyToRunItem, 0) + missingFolder := make(map[string][]string) + for _, item := range alertRules { + key := item.GetKey() + ruleInfo, newRoutine := sch.registry.getOrCreateInfo(ctx, key) + + // enforce minimum evaluation interval + if item.IntervalSeconds < int64(sch.minRuleInterval.Seconds()) { + sch.log.Debug("Interval adjusted", append(key.LogContext(), "originalInterval", item.IntervalSeconds, "adjustedInterval", sch.minRuleInterval.Seconds())...) + item.IntervalSeconds = int64(sch.minRuleInterval.Seconds()) + } + + invalidInterval := item.IntervalSeconds%int64(sch.baseInterval.Seconds()) != 0 + + if newRoutine && !invalidInterval { + dispatcherGroup.Go(func() error { + return sch.ruleRoutine(ruleInfo.ctx, key, ruleInfo.evalCh, ruleInfo.updateCh) + }) + } + + if invalidInterval { + // this is expected to be always false + // given that we validate interval during alert rule updates + sch.log.Warn("Rule has an invalid interval and will be ignored. Interval should be divided exactly by scheduler interval", append(key.LogContext(), "ruleInterval", time.Duration(item.IntervalSeconds)*time.Second, "schedulerInterval", sch.baseInterval)...) + continue + } + + itemFrequency := item.IntervalSeconds / int64(sch.baseInterval.Seconds()) + if item.IntervalSeconds != 0 && tickNum%itemFrequency == 0 { + var folderTitle string + if !sch.disableGrafanaFolder { + title, ok := folderTitles[item.NamespaceUID] + if ok { + folderTitle = title + } else { + missingFolder[item.NamespaceUID] = append(missingFolder[item.NamespaceUID], item.UID) + } + } + readyToRun = append(readyToRun, readyToRunItem{ruleInfo: ruleInfo, evaluation: evaluation{ + scheduledAt: tick, + rule: item, + folderTitle: folderTitle, + }}) + } + + // remove the alert rule from the registered alert rules + delete(registeredDefinitions, key) + } + + if len(missingFolder) > 0 { // if this happens then there can be problems with fetching folders from the database. + sch.log.Warn("Unable to obtain folder titles for some rules", "missingFolderUIDToRuleUID", missingFolder) + } + + var step int64 = 0 + if len(readyToRun) > 0 { + step = sch.baseInterval.Nanoseconds() / int64(len(readyToRun)) + } + + for i := range readyToRun { + item := readyToRun[i] + + time.AfterFunc(time.Duration(int64(i)*step), func() { + key := item.rule.GetKey() + success, dropped := item.ruleInfo.eval(&item.evaluation) + if !success { + sch.log.Debug("Scheduled evaluation was canceled because evaluation routine was stopped", append(key.LogContext(), "time", tick)...) + return + } + if dropped != nil { + sch.log.Warn("Tick dropped because alert rule evaluation is too slow", append(key.LogContext(), "time", tick)...) 
+ orgID := fmt.Sprint(key.OrgID) + sch.metrics.EvaluationMissed.WithLabelValues(orgID, item.rule.Title).Inc() + } + }) + } + + // unregister and stop routines of the deleted alert rules + for key := range registeredDefinitions { + sch.DeleteAlertRule(key) + } + + return readyToRun, registeredDefinitions +} + func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertRuleKey, evalCh <-chan *evaluation, updateCh <-chan ruleVersion) error { grafanaCtx = ngmodels.WithRuleKey(grafanaCtx, key) logger := sch.log.FromContext(grafanaCtx) diff --git a/pkg/services/ngalert/schedule/schedule_test.go b/pkg/services/ngalert/schedule/schedule_test.go deleted file mode 100644 index f741db6aa3b..00000000000 --- a/pkg/services/ngalert/schedule/schedule_test.go +++ /dev/null @@ -1,299 +0,0 @@ -package schedule_test - -import ( - "context" - "fmt" - "net/url" - "runtime" - "strings" - "testing" - "time" - - "github.com/benbjohnson/clock" - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/grafana/grafana-plugin-sdk-go/data" - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - - "github.com/grafana/grafana/pkg/services/ngalert/eval" - "github.com/grafana/grafana/pkg/services/ngalert/image" - "github.com/grafana/grafana/pkg/services/ngalert/metrics" - "github.com/grafana/grafana/pkg/services/ngalert/models" - "github.com/grafana/grafana/pkg/services/ngalert/schedule" - "github.com/grafana/grafana/pkg/services/ngalert/state" - "github.com/grafana/grafana/pkg/services/ngalert/tests" - "github.com/grafana/grafana/pkg/setting" -) - -var testMetrics = metrics.NewNGAlert(prometheus.NewPedanticRegistry()) - -type evalAppliedInfo struct { - alertDefKey models.AlertRuleKey - now time.Time -} - -func TestWarmStateCache(t *testing.T) { - evaluationTime, err := time.Parse("2006-01-02", "2021-03-25") - require.NoError(t, err) - ctx := context.Background() - _, dbstore := tests.SetupTestEnv(t, 1) - - const mainOrgID int64 = 1 - rule := tests.CreateTestAlertRule(t, ctx, dbstore, 600, mainOrgID) - - expectedEntries := []*state.State{ - { - AlertRuleUID: rule.UID, - OrgID: rule.OrgID, - CacheID: `[["test1","testValue1"]]`, - Labels: data.Labels{"test1": "testValue1"}, - State: eval.Normal, - Results: []state.Evaluation{ - {EvaluationTime: evaluationTime, EvaluationState: eval.Normal}, - }, - StartsAt: evaluationTime.Add(-1 * time.Minute), - EndsAt: evaluationTime.Add(1 * time.Minute), - LastEvaluationTime: evaluationTime, - Annotations: map[string]string{"testAnnoKey": "testAnnoValue"}, - }, { - AlertRuleUID: rule.UID, - OrgID: rule.OrgID, - CacheID: `[["test2","testValue2"]]`, - Labels: data.Labels{"test2": "testValue2"}, - State: eval.Alerting, - Results: []state.Evaluation{ - {EvaluationTime: evaluationTime, EvaluationState: eval.Alerting}, - }, - StartsAt: evaluationTime.Add(-1 * time.Minute), - EndsAt: evaluationTime.Add(1 * time.Minute), - LastEvaluationTime: evaluationTime, - Annotations: map[string]string{"testAnnoKey": "testAnnoValue"}, - }, - } - - labels := models.InstanceLabels{"test1": "testValue1"} - _, hash, _ := labels.StringAndHash() - instance1 := models.AlertInstance{ - AlertInstanceKey: models.AlertInstanceKey{ - RuleOrgID: rule.OrgID, - RuleUID: rule.UID, - LabelsHash: hash, - }, - CurrentState: models.InstanceStateNormal, - LastEvalTime: evaluationTime, - CurrentStateSince: evaluationTime.Add(-1 * time.Minute), - CurrentStateEnd: 
evaluationTime.Add(1 * time.Minute), - Labels: labels, - } - - _ = dbstore.SaveAlertInstances(ctx, instance1) - - labels = models.InstanceLabels{"test2": "testValue2"} - _, hash, _ = labels.StringAndHash() - instance2 := models.AlertInstance{ - AlertInstanceKey: models.AlertInstanceKey{ - RuleOrgID: rule.OrgID, - RuleUID: rule.UID, - LabelsHash: hash, - }, - CurrentState: models.InstanceStateFiring, - LastEvalTime: evaluationTime, - CurrentStateSince: evaluationTime.Add(-1 * time.Minute), - CurrentStateEnd: evaluationTime.Add(1 * time.Minute), - Labels: labels, - } - _ = dbstore.SaveAlertInstances(ctx, instance2) - st := state.NewManager(testMetrics.GetStateMetrics(), nil, dbstore, &image.NoopImageService{}, clock.NewMock(), &state.FakeHistorian{}) - st.Warm(ctx, dbstore) - - t.Run("instance cache has expected entries", func(t *testing.T) { - for _, entry := range expectedEntries { - cacheEntry := st.Get(entry.OrgID, entry.AlertRuleUID, entry.CacheID) - - if diff := cmp.Diff(entry, cacheEntry, cmpopts.IgnoreFields(state.State{}, "Results")); diff != "" { - t.Errorf("Result mismatch (-want +got):\n%s", diff) - t.FailNow() - } - } - }) -} - -func TestAlertingTicker(t *testing.T) { - ctx := context.Background() - _, dbstore := tests.SetupTestEnv(t, 1) - - alerts := make([]*models.AlertRule, 0) - - const mainOrgID int64 = 1 - // create alert rule under main org with one second interval - alerts = append(alerts, tests.CreateTestAlertRule(t, ctx, dbstore, 1, mainOrgID)) - - evalAppliedCh := make(chan evalAppliedInfo, len(alerts)) - stopAppliedCh := make(chan models.AlertRuleKey, len(alerts)) - - mockedClock := clock.NewMock() - - cfg := setting.UnifiedAlertingSettings{ - BaseInterval: time.Second, - AdminConfigPollInterval: 10 * time.Minute, // do not poll in unit tests. - } - - notifier := &schedule.AlertsSenderMock{} - notifier.EXPECT().Send(mock.Anything, mock.Anything).Return() - - schedCfg := schedule.SchedulerCfg{ - Cfg: cfg, - C: mockedClock, - EvalAppliedFunc: func(alertDefKey models.AlertRuleKey, now time.Time) { - evalAppliedCh <- evalAppliedInfo{alertDefKey: alertDefKey, now: now} - }, - StopAppliedFunc: func(alertDefKey models.AlertRuleKey) { - stopAppliedCh <- alertDefKey - }, - RuleStore: dbstore, - Metrics: testMetrics.GetSchedulerMetrics(), - AlertSender: notifier, - } - st := state.NewManager(testMetrics.GetStateMetrics(), nil, dbstore, &image.NoopImageService{}, clock.NewMock(), &state.FakeHistorian{}) - appUrl := &url.URL{ - Scheme: "http", - Host: "localhost", - } - sched := schedule.NewScheduler(schedCfg, appUrl, st) - - go func() { - err := sched.Run(ctx) - require.NoError(t, err) - }() - runtime.Gosched() - - expectedAlertRulesEvaluated := []models.AlertRuleKey{alerts[0].GetKey()} - t.Run(fmt.Sprintf("on 1st tick alert rules: %s should be evaluated", concatenate(expectedAlertRulesEvaluated)), func(t *testing.T) { - tick := advanceClock(t, mockedClock) - assertEvalRun(t, evalAppliedCh, tick, expectedAlertRulesEvaluated...) 
- }) - - // add alert rule under main org with three seconds interval - var threeSecInterval int64 = 3 - alerts = append(alerts, tests.CreateTestAlertRule(t, ctx, dbstore, threeSecInterval, mainOrgID)) - t.Logf("alert rule: %v added with interval: %d", alerts[1].GetKey(), threeSecInterval) - - expectedAlertRulesEvaluated = []models.AlertRuleKey{alerts[0].GetKey()} - t.Run(fmt.Sprintf("on 2nd tick alert rule: %s should be evaluated", concatenate(expectedAlertRulesEvaluated)), func(t *testing.T) { - tick := advanceClock(t, mockedClock) - assertEvalRun(t, evalAppliedCh, tick, expectedAlertRulesEvaluated...) - }) - - expectedAlertRulesEvaluated = []models.AlertRuleKey{alerts[1].GetKey(), alerts[0].GetKey()} - t.Run(fmt.Sprintf("on 3rd tick alert rules: %s should be evaluated", concatenate(expectedAlertRulesEvaluated)), func(t *testing.T) { - tick := advanceClock(t, mockedClock) - assertEvalRun(t, evalAppliedCh, tick, expectedAlertRulesEvaluated...) - }) - - expectedAlertRulesEvaluated = []models.AlertRuleKey{alerts[0].GetKey()} - t.Run(fmt.Sprintf("on 4th tick alert rules: %s should be evaluated", concatenate(expectedAlertRulesEvaluated)), func(t *testing.T) { - tick := advanceClock(t, mockedClock) - assertEvalRun(t, evalAppliedCh, tick, expectedAlertRulesEvaluated...) - }) - - key := alerts[0].GetKey() - err := dbstore.DeleteAlertRulesByUID(ctx, alerts[0].OrgID, alerts[0].UID) - require.NoError(t, err) - t.Logf("alert rule: %v deleted", key) - - expectedAlertRulesEvaluated = []models.AlertRuleKey{} - t.Run(fmt.Sprintf("on 5th tick alert rules: %s should be evaluated", concatenate(expectedAlertRulesEvaluated)), func(t *testing.T) { - tick := advanceClock(t, mockedClock) - assertEvalRun(t, evalAppliedCh, tick, expectedAlertRulesEvaluated...) - }) - expectedAlertRulesStopped := []models.AlertRuleKey{alerts[0].GetKey()} - t.Run(fmt.Sprintf("on 5th tick alert rules: %s should be stopped", concatenate(expectedAlertRulesStopped)), func(t *testing.T) { - assertStopRun(t, stopAppliedCh, expectedAlertRulesStopped...) - }) - - expectedAlertRulesEvaluated = []models.AlertRuleKey{alerts[1].GetKey()} - t.Run(fmt.Sprintf("on 6th tick alert rules: %s should be evaluated", concatenate(expectedAlertRulesEvaluated)), func(t *testing.T) { - tick := advanceClock(t, mockedClock) - assertEvalRun(t, evalAppliedCh, tick, expectedAlertRulesEvaluated...) - }) - - // create alert rule with one second interval - alerts = append(alerts, tests.CreateTestAlertRule(t, ctx, dbstore, 1, mainOrgID)) - - expectedAlertRulesEvaluated = []models.AlertRuleKey{alerts[2].GetKey()} - t.Run(fmt.Sprintf("on 7th tick alert rules: %s should be evaluated", concatenate(expectedAlertRulesEvaluated)), func(t *testing.T) { - tick := advanceClock(t, mockedClock) - assertEvalRun(t, evalAppliedCh, tick, expectedAlertRulesEvaluated...) 
- }) -} - -func assertEvalRun(t *testing.T, ch <-chan evalAppliedInfo, tick time.Time, keys ...models.AlertRuleKey) { - timeout := time.After(time.Second) - - expected := make(map[models.AlertRuleKey]struct{}, len(keys)) - for _, k := range keys { - expected[k] = struct{}{} - } - - for { - select { - case info := <-ch: - _, ok := expected[info.alertDefKey] - if !ok { - t.Fatalf("alert rule: %v should not have been evaluated at: %v", info.alertDefKey, info.now) - } - t.Logf("alert rule: %v evaluated at: %v", info.alertDefKey, info.now) - assert.Equal(t, tick, info.now) - delete(expected, info.alertDefKey) - case <-timeout: - if len(expected) == 0 { - return - } - t.Fatal("cycle has expired") - } - } -} - -func assertStopRun(t *testing.T, ch <-chan models.AlertRuleKey, keys ...models.AlertRuleKey) { - timeout := time.After(time.Second) - - expected := make(map[models.AlertRuleKey]struct{}, len(keys)) - for _, k := range keys { - expected[k] = struct{}{} - } - - for { - select { - case alertDefKey := <-ch: - _, ok := expected[alertDefKey] - t.Logf("alert rule: %v stopped", alertDefKey) - assert.True(t, ok) - delete(expected, alertDefKey) - if len(expected) == 0 { - return - } - case <-timeout: - if len(expected) == 0 { - return - } - t.Fatal("cycle has expired") - } - } -} - -func advanceClock(t *testing.T, mockedClock *clock.Mock) time.Time { - mockedClock.Add(time.Second) - return mockedClock.Now() - // t.Logf("Tick: %v", mockedClock.Now()) -} - -func concatenate(keys []models.AlertRuleKey) string { - s := make([]string, len(keys)) - for _, k := range keys { - s = append(s, k.String()) - } - return fmt.Sprintf("[%s]", strings.Join(s, ",")) -} diff --git a/pkg/services/ngalert/schedule/schedule_unit_test.go b/pkg/services/ngalert/schedule/schedule_unit_test.go index 15dfec3190e..d5706111ce9 100644 --- a/pkg/services/ngalert/schedule/schedule_unit_test.go +++ b/pkg/services/ngalert/schedule/schedule_unit_test.go @@ -18,6 +18,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "github.com/grafana/grafana/pkg/expr" "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions" @@ -30,6 +31,163 @@ import ( "github.com/grafana/grafana/pkg/util" ) +type evalAppliedInfo struct { + alertDefKey models.AlertRuleKey + now time.Time +} + +func TestProcessTicks(t *testing.T) { + testMetrics := metrics.NewNGAlert(prometheus.NewPedanticRegistry()) + ctx := context.Background() + dispatcherGroup, ctx := errgroup.WithContext(ctx) + + ruleStore := newFakeRulesStore() + + cfg := setting.UnifiedAlertingSettings{ + BaseInterval: 1 * time.Second, + AdminConfigPollInterval: 10 * time.Minute, // do not poll in unit tests. 
+ } + + const mainOrgID int64 = 1 + + mockedClock := clock.NewMock() + + notifier := &AlertsSenderMock{} + notifier.EXPECT().Send(mock.Anything, mock.Anything).Return() + + schedCfg := SchedulerCfg{ + Cfg: cfg, + C: mockedClock, + RuleStore: ruleStore, + Metrics: testMetrics.GetSchedulerMetrics(), + AlertSender: notifier, + } + st := state.NewManager(testMetrics.GetStateMetrics(), nil, nil, &image.NoopImageService{}, mockedClock, &state.FakeHistorian{}) + + appUrl := &url.URL{ + Scheme: "http", + Host: "localhost", + } + sched := NewScheduler(schedCfg, appUrl, st) + + evalAppliedCh := make(chan evalAppliedInfo, 1) + stopAppliedCh := make(chan models.AlertRuleKey, 1) + + sched.evalAppliedFunc = func(alertDefKey models.AlertRuleKey, now time.Time) { + evalAppliedCh <- evalAppliedInfo{alertDefKey: alertDefKey, now: now} + } + sched.stopAppliedFunc = func(alertDefKey models.AlertRuleKey) { + stopAppliedCh <- alertDefKey + } + + tick := time.Time{} + + // create alert rule under main org with one second interval + alertRule1 := models.AlertRuleGen(models.WithOrgID(mainOrgID), models.WithInterval(cfg.BaseInterval), models.WithTitle("rule-1"))() + ruleStore.PutRule(ctx, alertRule1) + + t.Run("on 1st tick alert rule should be evaluated", func(t *testing.T) { + tick = tick.Add(cfg.BaseInterval) + + scheduled, stopped := sched.processTick(ctx, dispatcherGroup, tick) + + require.Len(t, scheduled, 1) + require.Equal(t, alertRule1, scheduled[0].rule) + require.Equal(t, tick, scheduled[0].scheduledAt) + require.Emptyf(t, stopped, "None rules are expected to be stopped") + + assertEvalRun(t, evalAppliedCh, tick, alertRule1.GetKey()) + }) + + // add alert rule under main org with three base intervals + alertRule2 := models.AlertRuleGen(models.WithOrgID(mainOrgID), models.WithInterval(3*cfg.BaseInterval), models.WithTitle("rule-2"))() + ruleStore.PutRule(ctx, alertRule2) + + t.Run("on 2nd tick first alert rule should be evaluated", func(t *testing.T) { + tick = tick.Add(cfg.BaseInterval) + scheduled, stopped := sched.processTick(ctx, dispatcherGroup, tick) + + require.Len(t, scheduled, 1) + require.Equal(t, alertRule1, scheduled[0].rule) + require.Equal(t, tick, scheduled[0].scheduledAt) + require.Emptyf(t, stopped, "None rules are expected to be stopped") + assertEvalRun(t, evalAppliedCh, tick, alertRule1.GetKey()) + }) + + t.Run("on 3rd tick two alert rules should be evaluated", func(t *testing.T) { + tick = tick.Add(cfg.BaseInterval) + scheduled, stopped := sched.processTick(ctx, dispatcherGroup, tick) + require.Len(t, scheduled, 2) + var keys []models.AlertRuleKey + for _, item := range scheduled { + keys = append(keys, item.rule.GetKey()) + require.Equal(t, tick, item.scheduledAt) + } + require.Contains(t, keys, alertRule1.GetKey()) + require.Contains(t, keys, alertRule2.GetKey()) + + require.Emptyf(t, stopped, "None rules are expected to be stopped") + + assertEvalRun(t, evalAppliedCh, tick, keys...) 
+ }) + + t.Run("on 4th tick only one alert rule should be evaluated", func(t *testing.T) { + tick = tick.Add(cfg.BaseInterval) + scheduled, stopped := sched.processTick(ctx, dispatcherGroup, tick) + + require.Len(t, scheduled, 1) + require.Equal(t, alertRule1, scheduled[0].rule) + require.Equal(t, tick, scheduled[0].scheduledAt) + require.Emptyf(t, stopped, "None rules are expected to be stopped") + + assertEvalRun(t, evalAppliedCh, tick, alertRule1.GetKey()) + }) + + t.Run("on 5th tick deleted rule should not be evaluated but stopped", func(t *testing.T) { + tick = tick.Add(cfg.BaseInterval) + + ruleStore.DeleteRule(alertRule1) + + scheduled, stopped := sched.processTick(ctx, dispatcherGroup, tick) + + require.Empty(t, scheduled) + require.Len(t, stopped, 1) + + require.Contains(t, stopped, alertRule1.GetKey()) + + assertStopRun(t, stopAppliedCh, alertRule1.GetKey()) + }) + + t.Run("on 6th tick one alert rule should be evaluated", func(t *testing.T) { + tick = tick.Add(cfg.BaseInterval) + + scheduled, stopped := sched.processTick(ctx, dispatcherGroup, tick) + + require.Len(t, scheduled, 1) + require.Equal(t, alertRule2, scheduled[0].rule) + require.Equal(t, tick, scheduled[0].scheduledAt) + require.Emptyf(t, stopped, "None rules are expected to be stopped") + + assertEvalRun(t, evalAppliedCh, tick, alertRule2.GetKey()) + }) + + t.Run("on 7th tick a new alert rule should be evaluated", func(t *testing.T) { + // create alert rule with one base interval + alertRule3 := models.AlertRuleGen(models.WithOrgID(mainOrgID), models.WithInterval(cfg.BaseInterval), models.WithTitle("rule-3"))() + ruleStore.PutRule(ctx, alertRule3) + tick = tick.Add(cfg.BaseInterval) + + scheduled, stopped := sched.processTick(ctx, dispatcherGroup, tick) + + require.Len(t, scheduled, 1) + require.Equal(t, alertRule3, scheduled[0].rule) + require.Equal(t, tick, scheduled[0].scheduledAt) + require.Emptyf(t, stopped, "None rules are expected to be stopped") + + assertEvalRun(t, evalAppliedCh, tick, alertRule3.GetKey()) + }) +} + func TestSchedule_ruleRoutine(t *testing.T) { createSchedule := func( evalAppliedChan chan time.Time, @@ -573,3 +731,60 @@ func withQueryForState(t *testing.T, evalResult eval.State) models.AlertRuleMuta rule.For = time.Duration(rule.IntervalSeconds*forMultimplier) * time.Second } } + +func assertEvalRun(t *testing.T, ch <-chan evalAppliedInfo, tick time.Time, keys ...models.AlertRuleKey) { + timeout := time.After(time.Second) + + expected := make(map[models.AlertRuleKey]struct{}, len(keys)) + for _, k := range keys { + expected[k] = struct{}{} + } + + for { + select { + case info := <-ch: + _, ok := expected[info.alertDefKey] + if !ok { + t.Fatalf("alert rule: %v should not have been evaluated at: %v", info.alertDefKey, info.now) + } + t.Logf("alert rule: %v evaluated at: %v", info.alertDefKey, info.now) + assert.Equal(t, tick, info.now) + delete(expected, info.alertDefKey) + if len(expected) == 0 { + return + } + case <-timeout: + if len(expected) == 0 { + return + } + t.Fatal("cycle has expired") + } + } +} + +func assertStopRun(t *testing.T, ch <-chan models.AlertRuleKey, keys ...models.AlertRuleKey) { + timeout := time.After(time.Second) + + expected := make(map[models.AlertRuleKey]struct{}, len(keys)) + for _, k := range keys { + expected[k] = struct{}{} + } + + for { + select { + case alertDefKey := <-ch: + _, ok := expected[alertDefKey] + t.Logf("alert rule: %v stopped", alertDefKey) + assert.True(t, ok) + delete(expected, alertDefKey) + if len(expected) == 0 { + return + } + case 
<-timeout: + if len(expected) == 0 { + return + } + t.Fatal("cycle has expired") + } + } +} diff --git a/pkg/services/ngalert/schedule/testing.go b/pkg/services/ngalert/schedule/testing.go index ef2c46b8292..803f084825c 100644 --- a/pkg/services/ngalert/schedule/testing.go +++ b/pkg/services/ngalert/schedule/testing.go @@ -71,6 +71,12 @@ func (f *fakeRulesStore) PutRule(_ context.Context, rules ...*models.AlertRule) } } +func (f *fakeRulesStore) DeleteRule(rules ...*models.AlertRule) { + for _, r := range rules { + delete(f.rules, r.UID) + } +} + func (f *fakeRulesStore) getNamespaceTitle(uid string) string { return "TEST-FOLDER-" + uid } diff --git a/pkg/services/ngalert/state/manager_test.go b/pkg/services/ngalert/state/manager_test.go index 58a9497fc70..7905d291e4d 100644 --- a/pkg/services/ngalert/state/manager_test.go +++ b/pkg/services/ngalert/state/manager_test.go @@ -10,6 +10,8 @@ import ( "time" "github.com/benbjohnson/clock" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" @@ -30,6 +32,92 @@ import ( var testMetrics = metrics.NewNGAlert(prometheus.NewPedanticRegistry()) +func TestWarmStateCache(t *testing.T) { + evaluationTime, err := time.Parse("2006-01-02", "2021-03-25") + require.NoError(t, err) + ctx := context.Background() + _, dbstore := tests.SetupTestEnv(t, 1) + + const mainOrgID int64 = 1 + rule := tests.CreateTestAlertRule(t, ctx, dbstore, 600, mainOrgID) + + expectedEntries := []*state.State{ + { + AlertRuleUID: rule.UID, + OrgID: rule.OrgID, + CacheID: `[["test1","testValue1"]]`, + Labels: data.Labels{"test1": "testValue1"}, + State: eval.Normal, + Results: []state.Evaluation{ + {EvaluationTime: evaluationTime, EvaluationState: eval.Normal}, + }, + StartsAt: evaluationTime.Add(-1 * time.Minute), + EndsAt: evaluationTime.Add(1 * time.Minute), + LastEvaluationTime: evaluationTime, + Annotations: map[string]string{"testAnnoKey": "testAnnoValue"}, + }, { + AlertRuleUID: rule.UID, + OrgID: rule.OrgID, + CacheID: `[["test2","testValue2"]]`, + Labels: data.Labels{"test2": "testValue2"}, + State: eval.Alerting, + Results: []state.Evaluation{ + {EvaluationTime: evaluationTime, EvaluationState: eval.Alerting}, + }, + StartsAt: evaluationTime.Add(-1 * time.Minute), + EndsAt: evaluationTime.Add(1 * time.Minute), + LastEvaluationTime: evaluationTime, + Annotations: map[string]string{"testAnnoKey": "testAnnoValue"}, + }, + } + + labels := models.InstanceLabels{"test1": "testValue1"} + _, hash, _ := labels.StringAndHash() + instance1 := models.AlertInstance{ + AlertInstanceKey: models.AlertInstanceKey{ + RuleOrgID: rule.OrgID, + RuleUID: rule.UID, + LabelsHash: hash, + }, + CurrentState: models.InstanceStateNormal, + LastEvalTime: evaluationTime, + CurrentStateSince: evaluationTime.Add(-1 * time.Minute), + CurrentStateEnd: evaluationTime.Add(1 * time.Minute), + Labels: labels, + } + + _ = dbstore.SaveAlertInstances(ctx, instance1) + + labels = models.InstanceLabels{"test2": "testValue2"} + _, hash, _ = labels.StringAndHash() + instance2 := models.AlertInstance{ + AlertInstanceKey: models.AlertInstanceKey{ + RuleOrgID: rule.OrgID, + RuleUID: rule.UID, + LabelsHash: hash, + }, + CurrentState: models.InstanceStateFiring, + LastEvalTime: evaluationTime, + CurrentStateSince: evaluationTime.Add(-1 * time.Minute), + CurrentStateEnd: evaluationTime.Add(1 * time.Minute), + Labels: labels, + } + _ = dbstore.SaveAlertInstances(ctx, 
instance2) + st := state.NewManager(testMetrics.GetStateMetrics(), nil, dbstore, &image.NoopImageService{}, clock.NewMock(), &state.FakeHistorian{}) + st.Warm(ctx, dbstore) + + t.Run("instance cache has expected entries", func(t *testing.T) { + for _, entry := range expectedEntries { + cacheEntry := st.Get(entry.OrgID, entry.AlertRuleUID, entry.CacheID) + + if diff := cmp.Diff(entry, cacheEntry, cmpopts.IgnoreFields(state.State{}, "Results")); diff != "" { + t.Errorf("Result mismatch (-want +got):\n%s", diff) + t.FailNow() + } + } + }) +} + func TestDashboardAnnotations(t *testing.T) { evaluationTime, err := time.Parse("2006-01-02", "2022-01-01") require.NoError(t, err)
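
The gist of the refactor: the per-tick work of schedulePeriodic was extracted into processTick, which now returns the evaluations it scheduled and the keys of the rules it stopped, and the EvalAppliedFunc/StopAppliedFunc hooks were removed from SchedulerCfg in favor of setting the scheduler's unexported callbacks directly in the white-box test. A minimal sketch of the resulting test flow, assuming the fakes and helpers defined in the patch above (fakeRulesStore, AlertsSenderMock, models.AlertRuleGen); this is illustrative only and not part of the commit:

    // Advance a purely logical tick instead of a mocked wall clock.
    tick := time.Time{}
    tick = tick.Add(cfg.BaseInterval)

    // processTick reports exactly what was scheduled and what was stopped,
    // so the test can assert on the return values synchronously.
    scheduled, stopped := sched.processTick(ctx, dispatcherGroup, tick)

    require.Len(t, scheduled, 1)                     // rule-1 runs on every base interval
    require.Equal(t, alertRule1, scheduled[0].rule)
    require.Equal(t, tick, scheduled[0].scheduledAt)
    require.Empty(t, stopped)                        // nothing was deleted, so nothing is stopped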