From a081764fd88f40e31c115bf32e8835a2b09a79f7 Mon Sep 17 00:00:00 2001
From: Yuriy Tseretyan
Date: Tue, 26 Jul 2022 09:40:06 -0400
Subject: [PATCH] Alerting: Scheduler to use AlertRule (#52354)

* update GetAlertRulesForSchedulingQuery to have result AlertRule
* update fetcher utils and registry to support AlertRule
* alertRuleInfo to use alert rule instead of version
* update updateCh handler of ruleRoutine to just clean up the state. The updated rule will be provided at the next evaluation
* update evalCh handler of ruleRoutine to use rule from the message and clear state as well as update extra labels
* remove unused function in ruleRoutine
* remove unused model SchedulableAlertRule
* store rule version in ruleRoutine instead of rule
* do not call the sender if nothing to send
---
 pkg/services/ngalert/models/alert_rule.go     |  18 +-
 pkg/services/ngalert/schedule/fetcher.go      |   4 +-
 pkg/services/ngalert/schedule/fetcher_test.go |  10 +-
 pkg/services/ngalert/schedule/registry.go     |  26 +-
 .../ngalert/schedule/registry_test.go         |  40 +-
 pkg/services/ngalert/schedule/schedule.go     |  96 ++-
 .../ngalert/schedule/schedule_unit_test.go    | 569 +++++++-----------
 pkg/services/ngalert/store/alert_rule.go      |   4 +-
 pkg/services/ngalert/store/testing.go         |   9 +-
 9 files changed, 287 insertions(+), 489 deletions(-)

diff --git a/pkg/services/ngalert/models/alert_rule.go b/pkg/services/ngalert/models/alert_rule.go
index 7c2d26b5b87..1f639b8ad8b 100644
--- a/pkg/services/ngalert/models/alert_rule.go
+++ b/pkg/services/ngalert/models/alert_rule.go
@@ -137,17 +137,6 @@ type AlertRule struct {
 	Labels map[string]string
 }
 
-type SchedulableAlertRule struct {
-	Title           string
-	UID             string `xorm:"uid"`
-	OrgID           int64  `xorm:"org_id"`
-	IntervalSeconds int64
-	Version         int64
-	NamespaceUID    string `xorm:"namespace_uid"`
-	RuleGroup       string
-	RuleGroupIndex  int `xorm:"rule_group_idx"`
-}
-
 type LabelOption func(map[string]string)
 
 func WithoutInternalLabels() LabelOption {
@@ -228,11 +217,6 @@ func (alertRule *AlertRule) GetGroupKey() AlertRuleGroupKey {
 	return AlertRuleGroupKey{OrgID: alertRule.OrgID, NamespaceUID: alertRule.NamespaceUID, RuleGroup: alertRule.RuleGroup}
 }
 
-// GetKey returns the alert definitions identifier
-func (alertRule *SchedulableAlertRule) GetKey() AlertRuleKey {
-	return AlertRuleKey{OrgID: alertRule.OrgID, UID: alertRule.UID}
-}
-
 // PreSave sets default values and loads the updated model for each alert query.
 func (alertRule *AlertRule) PreSave(timeNow func() time.Time) error {
 	for i, q := range alertRule.Data {
@@ -316,7 +300,7 @@ type ListAlertRulesQuery struct {
 }
 
 type GetAlertRulesForSchedulingQuery struct {
-	Result []*SchedulableAlertRule
+	Result []*AlertRule
 }
 
 // ListNamespaceAlertRulesQuery is the query for listing namespace alert rules
diff --git a/pkg/services/ngalert/schedule/fetcher.go b/pkg/services/ngalert/schedule/fetcher.go
index 0c9b2b9b608..02f8aa07e39 100644
--- a/pkg/services/ngalert/schedule/fetcher.go
+++ b/pkg/services/ngalert/schedule/fetcher.go
@@ -13,7 +13,7 @@ import (
 // hashUIDs returns a fnv64 hash of the UIDs for all alert rules.
 // The order of the alert rules does not matter as hashUIDs sorts
 // the UIDs in increasing order.
-func hashUIDs(alertRules []*models.SchedulableAlertRule) uint64 { +func hashUIDs(alertRules []*models.AlertRule) uint64 { h := fnv.New64() for _, uid := range sortedUIDs(alertRules) { // We can ignore err as fnv64 does not return an error @@ -24,7 +24,7 @@ func hashUIDs(alertRules []*models.SchedulableAlertRule) uint64 { } // sortedUIDs returns a slice of sorted UIDs. -func sortedUIDs(alertRules []*models.SchedulableAlertRule) []string { +func sortedUIDs(alertRules []*models.AlertRule) []string { uids := make([]string, 0, len(alertRules)) for _, alertRule := range alertRules { uids = append(uids, alertRule.UID) diff --git a/pkg/services/ngalert/schedule/fetcher_test.go b/pkg/services/ngalert/schedule/fetcher_test.go index 1a9e597702b..6d93c4237af 100644 --- a/pkg/services/ngalert/schedule/fetcher_test.go +++ b/pkg/services/ngalert/schedule/fetcher_test.go @@ -9,18 +9,18 @@ import ( ) func TestHashUIDs(t *testing.T) { - r := []*models.SchedulableAlertRule{{UID: "foo"}, {UID: "bar"}} + r := []*models.AlertRule{{UID: "foo"}, {UID: "bar"}} assert.Equal(t, uint64(0xade76f55c76a1c48), hashUIDs(r)) // expect the same hash irrespective of order - r = []*models.SchedulableAlertRule{{UID: "bar"}, {UID: "foo"}} + r = []*models.AlertRule{{UID: "bar"}, {UID: "foo"}} assert.Equal(t, uint64(0xade76f55c76a1c48), hashUIDs(r)) // expect a different hash - r = []*models.SchedulableAlertRule{{UID: "bar"}} + r = []*models.AlertRule{{UID: "bar"}} assert.Equal(t, uint64(0xd8d9a5186bad3880), hashUIDs(r)) // slice with no items - r = []*models.SchedulableAlertRule{} + r = []*models.AlertRule{} assert.Equal(t, uint64(0xcbf29ce484222325), hashUIDs(r)) // a different slice with no items should have the same hash - r = []*models.SchedulableAlertRule{} + r = []*models.AlertRule{} assert.Equal(t, uint64(0xcbf29ce484222325), hashUIDs(r)) } diff --git a/pkg/services/ngalert/schedule/registry.go b/pkg/services/ngalert/schedule/registry.go index e06d47b3110..61e14511baf 100644 --- a/pkg/services/ngalert/schedule/registry.go +++ b/pkg/services/ngalert/schedule/registry.go @@ -92,7 +92,7 @@ func newAlertRuleInfo(parent context.Context) *alertRuleInfo { // - true when message was sent // - false when the send operation is stopped // the second element contains a dropped message that was sent by a concurrent sender. -func (a *alertRuleInfo) eval(t time.Time, version int64) (bool, *evaluation) { +func (a *alertRuleInfo) eval(t time.Time, rule *models.AlertRule) (bool, *evaluation) { // read the channel in unblocking manner to make sure that there is no concurrent send operation. var droppedMsg *evaluation select { @@ -103,7 +103,7 @@ func (a *alertRuleInfo) eval(t time.Time, version int64) (bool, *evaluation) { select { case a.evalCh <- &evaluation{ scheduledAt: t, - version: version, + rule: rule, }: return true, droppedMsg case <-a.ctx.Done(): @@ -136,52 +136,52 @@ func (a *alertRuleInfo) update(lastVersion ruleVersion) bool { type evaluation struct { scheduledAt time.Time - version int64 + rule *models.AlertRule } -type schedulableAlertRulesRegistry struct { - rules map[models.AlertRuleKey]*models.SchedulableAlertRule +type alertRulesRegistry struct { + rules map[models.AlertRuleKey]*models.AlertRule mu sync.Mutex } // all returns all rules in the registry. 
-func (r *schedulableAlertRulesRegistry) all() []*models.SchedulableAlertRule {
+func (r *alertRulesRegistry) all() []*models.AlertRule {
 	r.mu.Lock()
 	defer r.mu.Unlock()
-	result := make([]*models.SchedulableAlertRule, 0, len(r.rules))
+	result := make([]*models.AlertRule, 0, len(r.rules))
 	for _, rule := range r.rules {
 		result = append(result, rule)
 	}
 	return result
 }
 
-func (r *schedulableAlertRulesRegistry) get(k models.AlertRuleKey) *models.SchedulableAlertRule {
+func (r *alertRulesRegistry) get(k models.AlertRuleKey) *models.AlertRule {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 	return r.rules[k]
 }
 
 // set replaces all rules in the registry.
-func (r *schedulableAlertRulesRegistry) set(rules []*models.SchedulableAlertRule) {
+func (r *alertRulesRegistry) set(rules []*models.AlertRule) {
 	r.mu.Lock()
 	defer r.mu.Unlock()
-	r.rules = make(map[models.AlertRuleKey]*models.SchedulableAlertRule)
+	r.rules = make(map[models.AlertRuleKey]*models.AlertRule)
 	for _, rule := range rules {
 		r.rules[rule.GetKey()] = rule
 	}
 }
 
 // update inserts or replaces a rule in the registry.
-func (r *schedulableAlertRulesRegistry) update(rule *models.SchedulableAlertRule) {
+func (r *alertRulesRegistry) update(rule *models.AlertRule) {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 	r.rules[rule.GetKey()] = rule
 }
 
-// del removes pair that has specific key from schedulableAlertRulesRegistry.
+// del removes the pair with the specified key from alertRulesRegistry.
 // Returns 2-tuple where the first element is value of the removed pair
 // and the second element indicates whether element with the specified key existed.
-func (r *schedulableAlertRulesRegistry) del(k models.AlertRuleKey) (*models.SchedulableAlertRule, bool) {
+func (r *alertRulesRegistry) del(k models.AlertRuleKey) (*models.AlertRule, bool) {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 	rule, ok := r.rules[k]
diff --git a/pkg/services/ngalert/schedule/registry_test.go b/pkg/services/ngalert/schedule/registry_test.go
index b29d5269706..f0fbf91c045 100644
--- a/pkg/services/ngalert/schedule/registry_test.go
+++ b/pkg/services/ngalert/schedule/registry_test.go
@@ -91,14 +91,14 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
 		r := newAlertRuleInfo(context.Background())
 		expected := time.Now()
 		resultCh := make(chan evalResponse)
-		version := rand.Int63()
+		rule := models.AlertRuleGen()()
 		go func() {
-			result, dropped := r.eval(expected, version)
+			result, dropped := r.eval(expected, rule)
 			resultCh <- evalResponse{result, dropped}
 		}()
 		select {
 		case ctx := <-r.evalCh:
-			require.Equal(t, version, ctx.version)
+			require.Equal(t, rule, ctx.rule)
 			require.Equal(t, expected, ctx.scheduledAt)
 			result := <-resultCh
 			require.True(t, result.success)
@@ -113,12 +113,12 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
 		time2 := time.UnixMilli(rand.Int63n(math.MaxInt64))
 		resultCh1 := make(chan evalResponse)
 		resultCh2 := make(chan evalResponse)
-		version := rand.Int63()
+		rule := models.AlertRuleGen()()
 		wg := sync.WaitGroup{}
 		wg.Add(1)
 		go func() {
 			wg.Done()
-			result, dropped := r.eval(time1, version)
+			result, dropped := r.eval(time1, rule)
 			wg.Done()
 			resultCh1 <- evalResponse{result, dropped}
 		}()
@@ -126,7 +126,7 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
 		wg.Add(2) // one when time1 is sent, another when go-routine for time2 has started
 		go func() {
 			wg.Done()
-			result, dropped := r.eval(time2, version)
+			result, dropped := r.eval(time2, rule)
 			resultCh2 <- evalResponse{result, dropped}
 		}()
 		wg.Wait() // at this point tick 1 has already been dropped
@@ -147,8 +147,9 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
t.Run("eval should exit when context is cancelled", func(t *testing.T) { r := newAlertRuleInfo(context.Background()) resultCh := make(chan evalResponse) + rule := models.AlertRuleGen()() go func() { - result, dropped := r.eval(time.Now(), rand.Int63()) + result, dropped := r.eval(time.Now(), rule) resultCh <- evalResponse{result, dropped} }() runtime.Gosched() @@ -171,7 +172,8 @@ func TestSchedule_alertRuleInfo(t *testing.T) { t.Run("eval should do nothing", func(t *testing.T) { r := newAlertRuleInfo(context.Background()) r.stop() - success, dropped := r.eval(time.Now(), rand.Int63()) + rule := models.AlertRuleGen()() + success, dropped := r.eval(time.Now(), rule) require.False(t, success) require.Nilf(t, dropped, "expected no dropped evaluations but got one") }) @@ -209,7 +211,7 @@ func TestSchedule_alertRuleInfo(t *testing.T) { case 1: r.update(ruleVersion(rand.Int63())) case 2: - r.eval(time.Now(), rand.Int63()) + r.eval(time.Now(), models.AlertRuleGen()()) case 3: r.stop() } @@ -223,39 +225,39 @@ func TestSchedule_alertRuleInfo(t *testing.T) { } func TestSchedulableAlertRulesRegistry(t *testing.T) { - r := schedulableAlertRulesRegistry{rules: make(map[models.AlertRuleKey]*models.SchedulableAlertRule)} + r := alertRulesRegistry{rules: make(map[models.AlertRuleKey]*models.AlertRule)} assert.Len(t, r.all(), 0) // replace all rules in the registry with foo - r.set([]*models.SchedulableAlertRule{{OrgID: 1, UID: "foo", Version: 1}}) + r.set([]*models.AlertRule{{OrgID: 1, UID: "foo", Version: 1}}) assert.Len(t, r.all(), 1) foo := r.get(models.AlertRuleKey{OrgID: 1, UID: "foo"}) require.NotNil(t, foo) - assert.Equal(t, models.SchedulableAlertRule{OrgID: 1, UID: "foo", Version: 1}, *foo) + assert.Equal(t, models.AlertRule{OrgID: 1, UID: "foo", Version: 1}, *foo) // update foo to a newer version - r.update(&models.SchedulableAlertRule{OrgID: 1, UID: "foo", Version: 2}) + r.update(&models.AlertRule{OrgID: 1, UID: "foo", Version: 2}) assert.Len(t, r.all(), 1) foo = r.get(models.AlertRuleKey{OrgID: 1, UID: "foo"}) require.NotNil(t, foo) - assert.Equal(t, models.SchedulableAlertRule{OrgID: 1, UID: "foo", Version: 2}, *foo) + assert.Equal(t, models.AlertRule{OrgID: 1, UID: "foo", Version: 2}, *foo) // update bar which does not exist in the registry - r.update(&models.SchedulableAlertRule{OrgID: 1, UID: "bar", Version: 1}) + r.update(&models.AlertRule{OrgID: 1, UID: "bar", Version: 1}) assert.Len(t, r.all(), 2) foo = r.get(models.AlertRuleKey{OrgID: 1, UID: "foo"}) require.NotNil(t, foo) - assert.Equal(t, models.SchedulableAlertRule{OrgID: 1, UID: "foo", Version: 2}, *foo) + assert.Equal(t, models.AlertRule{OrgID: 1, UID: "foo", Version: 2}, *foo) bar := r.get(models.AlertRuleKey{OrgID: 1, UID: "bar"}) require.NotNil(t, foo) - assert.Equal(t, models.SchedulableAlertRule{OrgID: 1, UID: "bar", Version: 1}, *bar) + assert.Equal(t, models.AlertRule{OrgID: 1, UID: "bar", Version: 1}, *bar) // replace all rules in the registry with baz - r.set([]*models.SchedulableAlertRule{{OrgID: 1, UID: "baz", Version: 1}}) + r.set([]*models.AlertRule{{OrgID: 1, UID: "baz", Version: 1}}) assert.Len(t, r.all(), 1) baz := r.get(models.AlertRuleKey{OrgID: 1, UID: "baz"}) require.NotNil(t, baz) - assert.Equal(t, models.SchedulableAlertRule{OrgID: 1, UID: "baz", Version: 1}, *baz) + assert.Equal(t, models.AlertRule{OrgID: 1, UID: "baz", Version: 1}, *baz) assert.Nil(t, r.get(models.AlertRuleKey{OrgID: 1, UID: "foo"})) assert.Nil(t, r.get(models.AlertRuleKey{OrgID: 1, UID: "bar"})) diff --git 
a/pkg/services/ngalert/schedule/schedule.go b/pkg/services/ngalert/schedule/schedule.go index 07467ac45a5..a92f99131e7 100644 --- a/pkg/services/ngalert/schedule/schedule.go +++ b/pkg/services/ngalert/schedule/schedule.go @@ -96,7 +96,7 @@ type schedule struct { // evaluation in the current tick. The evaluation of an alert rule in the // current tick depends on its evaluation interval and when it was // last evaluated. - schedulableAlertRules schedulableAlertRulesRegistry + schedulableAlertRules alertRulesRegistry // bus is used to hook into events that should cause rule updates. bus bus.Bus @@ -137,7 +137,7 @@ func NewScheduler(cfg SchedulerCfg, appURL *url.URL, stateManager *state.Manager disableGrafanaFolder: cfg.Cfg.ReservedLabels.IsReservedLabelDisabled(ngmodels.FolderTitleLabel), stateManager: stateManager, minRuleInterval: cfg.Cfg.MinInterval, - schedulableAlertRules: schedulableAlertRulesRegistry{rules: make(map[ngmodels.AlertRuleKey]*ngmodels.SchedulableAlertRule)}, + schedulableAlertRules: alertRulesRegistry{rules: make(map[ngmodels.AlertRuleKey]*ngmodels.AlertRule)}, bus: bus, alertsSender: cfg.AlertSender, } @@ -240,16 +240,13 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error { sch.metrics.SchedulableAlertRulesHash.Set(float64(hashUIDs(alertRules))) type readyToRunItem struct { - key ngmodels.AlertRuleKey - ruleName string ruleInfo *alertRuleInfo - version int64 + rule *ngmodels.AlertRule } readyToRun := make([]readyToRunItem, 0) for _, item := range alertRules { key := item.GetKey() - itemVersion := item.Version ruleInfo, newRoutine := sch.registry.getOrCreateInfo(ctx, key) // enforce minimum evaluation interval @@ -275,7 +272,7 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error { itemFrequency := item.IntervalSeconds / int64(sch.baseInterval.Seconds()) if item.IntervalSeconds != 0 && tickNum%itemFrequency == 0 { - readyToRun = append(readyToRun, readyToRunItem{key: key, ruleName: item.Title, ruleInfo: ruleInfo, version: itemVersion}) + readyToRun = append(readyToRun, readyToRunItem{ruleInfo: ruleInfo, rule: item}) } // remove the alert rule from the registered alert rules @@ -291,15 +288,16 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error { item := readyToRun[i] time.AfterFunc(time.Duration(int64(i)*step), func() { - success, dropped := item.ruleInfo.eval(tick, item.version) + key := item.rule.GetKey() + success, dropped := item.ruleInfo.eval(tick, item.rule) if !success { - sch.log.Debug("scheduled evaluation was canceled because evaluation routine was stopped", "uid", item.key.UID, "org", item.key.OrgID, "time", tick) + sch.log.Debug("scheduled evaluation was canceled because evaluation routine was stopped", "uid", key.UID, "org", key.OrgID, "time", tick) return } if dropped != nil { - sch.log.Warn("Alert rule evaluation is too slow - dropped tick", "uid", item.key.UID, "org", item.key.OrgID, "time", tick) - orgID := fmt.Sprint(item.key.OrgID) - sch.metrics.EvaluationMissed.WithLabelValues(orgID, item.ruleName).Inc() + sch.log.Warn("Alert rule evaluation is too slow - dropped tick", "uid", key.UID, "org", key.OrgID, "time", tick) + orgID := fmt.Sprint(key.OrgID) + sch.metrics.EvaluationMissed.WithLabelValues(orgID, item.rule.Title).Inc() } }) } @@ -341,31 +339,16 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR states := sch.stateManager.GetStatesForRuleUID(key.OrgID, key.UID) expiredAlerts := FromAlertsStateToStoppedAlert(states, sch.appURL, sch.clock) 
sch.stateManager.RemoveByRuleUID(key.OrgID, key.UID) - sch.alertsSender.Send(key, expiredAlerts) + if len(expiredAlerts.PostableAlerts) > 0 { + sch.alertsSender.Send(key, expiredAlerts) + } } - updateRule := func(ctx context.Context, oldRule *ngmodels.AlertRule) (*ngmodels.AlertRule, map[string]string, error) { - q := ngmodels.GetAlertRuleByUIDQuery{OrgID: key.OrgID, UID: key.UID} - err := sch.ruleStore.GetAlertRuleByUID(ctx, &q) - if err != nil { - logger.Error("failed to fetch alert rule", "err", err) - return nil, nil, err - } - if oldRule != nil && oldRule.Version < q.Result.Version { - clearState() - } - newLabels, err := sch.getRuleExtraLabels(ctx, q.Result) - if err != nil { - return nil, nil, err - } - return q.Result, newLabels, nil - } - - evaluate := func(ctx context.Context, r *ngmodels.AlertRule, extraLabels map[string]string, attempt int64, e *evaluation) { - logger := logger.New("version", r.Version, "attempt", attempt, "now", e.scheduledAt) + evaluate := func(ctx context.Context, extraLabels map[string]string, attempt int64, e *evaluation) { + logger := logger.New("version", e.rule.Version, "attempt", attempt, "now", e.scheduledAt) start := sch.clock.Now() - results := sch.evaluator.ConditionEval(ctx, r.GetEvalCondition(), e.scheduledAt) + results := sch.evaluator.ConditionEval(ctx, e.rule.GetEvalCondition(), e.scheduledAt) dur := sch.clock.Now().Sub(start) evalTotal.Inc() evalDuration.Observe(dur.Seconds()) @@ -376,10 +359,12 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR logger.Debug("alert rule evaluated", "results", results, "duration", dur) } - processedStates := sch.stateManager.ProcessEvalResults(ctx, e.scheduledAt, r, results, extraLabels) + processedStates := sch.stateManager.ProcessEvalResults(ctx, e.scheduledAt, e.rule, results, extraLabels) sch.saveAlertStates(ctx, processedStates) alerts := FromAlertStateToPostableAlerts(processedStates, sch.stateManager, sch.appURL) - sch.alertsSender.Send(key, alerts) + if len(alerts.PostableAlerts) > 0 { + sch.alertsSender.Send(key, alerts) + } } retryIfError := func(f func(attempt int64) error) error { @@ -395,35 +380,24 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR } evalRunning := false - var currentRule *ngmodels.AlertRule + var currentRuleVersion int64 = 0 var extraLabels map[string]string defer sch.stopApplied(key) for { select { // used by external services (API) to notify that rule is updated. - case version := <-updateCh: + case lastVersion := <-updateCh: // sometimes it can happen when, for example, the rule evaluation took so long, // and there were two concurrent messages in updateCh and evalCh, and the eval's one got processed first. // therefore, at the time when message from updateCh is processed the current rule will have // at least the same version (or greater) and the state created for the new version of the rule. 
-			if currentRule != nil && int64(version) <= currentRule.Version {
-				logger.Info("skip updating rule because its current version is actual", "current_version", currentRule.Version, "new_version", version)
+			if currentRuleVersion >= int64(lastVersion) {
+				logger.Info("skip updating rule because its current version is already up to date", "version", currentRuleVersion, "new_version", lastVersion)
 				continue
 			}
-			logger.Info("fetching new version of the rule")
-			err := retryIfError(func(attempt int64) error {
-				newRule, newExtraLabels, err := updateRule(grafanaCtx, currentRule)
-				if err != nil {
-					return err
-				}
-				logger.Debug("new alert rule version fetched", "title", newRule.Title, "version", newRule.Version)
-				currentRule = newRule
-				extraLabels = newExtraLabels
-				return nil
-			})
-			if err != nil {
-				logger.Error("updating rule failed after all retries", "err", err)
-			}
+			logger.Info("clearing the state of the rule because version has changed", "version", currentRuleVersion, "new_version", lastVersion)
+			// clear the state so that the next evaluation starts from scratch
+			clearState()
 		// evalCh - used by the scheduler to signal that evaluation is needed.
 		case ctx, ok := <-evalCh:
 			if !ok {
@@ -442,17 +416,21 @@
 				}()
 				err := retryIfError(func(attempt int64) error {
+					newVersion := ctx.rule.Version
 					// fetch latest alert rule version
-					if currentRule == nil || currentRule.Version < ctx.version {
-						newRule, newExtraLabels, err := updateRule(grafanaCtx, currentRule)
+					if currentRuleVersion != newVersion {
+						if currentRuleVersion > 0 { // do not clean up state if the eval loop has just started.
+							logger.Debug("got a new version of alert rule. Clearing the state and refreshing extra labels", "version", currentRuleVersion, "new_version", newVersion)
+							clearState()
+						}
+						newLabels, err := sch.getRuleExtraLabels(grafanaCtx, ctx.rule)
 						if err != nil {
 							return err
 						}
-						currentRule = newRule
-						extraLabels = newExtraLabels
-						logger.Debug("new alert rule version fetched", "title", newRule.Title, "version", newRule.Version)
+						currentRuleVersion = newVersion
+						extraLabels = newLabels
 					}
-					evaluate(grafanaCtx, currentRule, extraLabels, attempt, ctx)
+					evaluate(grafanaCtx, extraLabels, attempt, ctx)
 					return nil
 				})
 				if err != nil {
diff --git a/pkg/services/ngalert/schedule/schedule_unit_test.go b/pkg/services/ngalert/schedule/schedule_unit_test.go
index f54f6700371..2de997e6e2c 100644
--- a/pkg/services/ngalert/schedule/schedule_unit_test.go
+++ b/pkg/services/ngalert/schedule/schedule_unit_test.go
@@ -4,11 +4,9 @@ import (
 	"bytes"
 	"context"
 	"encoding/json"
-	"errors"
 	"fmt"
 	"math/rand"
 	"net/url"
-	"sync"
 	"testing"
 	"time"
@@ -48,7 +46,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
 		instanceStore := &store.FakeInstanceStore{}
 		registry := prometheus.NewPedanticRegistry()
-		sch, _ := setupScheduler(t, ruleStore, instanceStore, registry, senderMock)
+		sch := setupScheduler(t, ruleStore, instanceStore, registry, senderMock, nil)
 		sch.evalAppliedFunc = func(key models.AlertRuleKey, t time.Time) {
 			evalAppliedChan <- t
 		}
@@ -58,10 +56,6 @@
 	// normal states do not include NoData and Error because currently it is not possible to perform any sensible test
 	normalStates := []eval.State{eval.Normal, eval.Alerting, eval.Pending}
 	allStates := [...]eval.State{eval.Normal, eval.Alerting, eval.Pending, eval.NoData, eval.Error}
-	randomNormalState := func() eval.State {
-		// pick only supported cases
-		return
normalStates[rand.Intn(3)] - } for _, evalState := range normalStates { // TODO rewrite when we are able to mock/fake state manager @@ -70,7 +64,8 @@ func TestSchedule_ruleRoutine(t *testing.T) { evalAppliedChan := make(chan time.Time) sch, ruleStore, instanceStore, reg := createSchedule(evalAppliedChan, nil) - rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), evalState) + rule := models.AlertRuleGen(withQueryForState(t, evalState))() + ruleStore.PutRule(context.Background(), rule) go func() { ctx, cancel := context.WithCancel(context.Background()) @@ -82,38 +77,28 @@ func TestSchedule_ruleRoutine(t *testing.T) { evalChan <- &evaluation{ scheduledAt: expectedTime, - version: rule.Version, + rule: rule, } actualTime := waitForTimeChannel(t, evalAppliedChan) require.Equal(t, expectedTime, actualTime) - t.Run("it should get rule from database when run the first time", func(t *testing.T) { - queries := make([]models.GetAlertRuleByUIDQuery, 0) - for _, op := range ruleStore.RecordedOps { - switch q := op.(type) { - case models.GetAlertRuleByUIDQuery: - queries = append(queries, q) - } - } - require.NotEmptyf(t, queries, "Expected a %T request to rule store but nothing was recorded", models.GetAlertRuleByUIDQuery{}) - require.Len(t, queries, 1, "Expected exactly one request of %T but got %d", models.GetAlertRuleByUIDQuery{}, len(queries)) - require.Equal(t, rule.UID, queries[0].UID) - require.Equal(t, rule.OrgID, queries[0].OrgID) - }) - t.Run("it should get rule folder title from database and attach as label", func(t *testing.T) { + t.Run("it should add extra labels", func(t *testing.T) { states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID) + folder, _ := ruleStore.GetNamespaceByUID(context.Background(), rule.NamespaceUID, rule.OrgID, nil) for _, s := range states { - require.NotEmptyf(t, s.Labels[models.FolderTitleLabel], "Expected a non-empty title in label %s", models.FolderTitleLabel) - require.Equal(t, s.Labels[models.FolderTitleLabel], ruleStore.Folders[rule.OrgID][0].Title) + assert.Equal(t, rule.UID, s.Labels[models.RuleUIDLabel]) + assert.Equal(t, rule.NamespaceUID, s.Labels[models.NamespaceUIDLabel]) + assert.Equal(t, rule.Title, s.Labels[prometheusModel.AlertNameLabel]) + assert.Equal(t, folder.Title, s.Labels[models.FolderTitleLabel]) } }) + t.Run("it should process evaluation results via state manager", func(t *testing.T) { // TODO rewrite when we are able to mock/fake state manager states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID) require.Len(t, states, 1) s := states[0] - t.Logf("State: %v", s) require.Equal(t, rule.UID, s.AlertRuleUID) require.Len(t, s.Results, 1) var expectedStatus = evalState @@ -148,6 +133,7 @@ func TestSchedule_ruleRoutine(t *testing.T) { require.Equal(t, evalState.String(), string(cmd.State)) require.Equal(t, s.Labels, data.Labels(cmd.Labels)) }) + t.Run("it reports metrics", func(t *testing.T) { // duration metric has 0 values because of mocked clock that do not advance expectedMetric := fmt.Sprintf( @@ -201,265 +187,80 @@ func TestSchedule_ruleRoutine(t *testing.T) { }) }) - t.Run("should fetch rule from database only if new version is greater than current", func(t *testing.T) { + t.Run("when a message is sent to update channel", func(t *testing.T) { + rule := models.AlertRuleGen(withQueryForState(t, eval.Normal))() + evalChan := make(chan *evaluation) evalAppliedChan := make(chan time.Time) + updateChan := make(chan ruleVersion) - ctx := context.Background() - sch, ruleStore, _, _ := 
createSchedule(evalAppliedChan, nil)
+			sender := AlertsSenderMock{}
+			sender.EXPECT().Send(rule.GetKey(), mock.Anything).Return()
-			rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState())
+			sch, ruleStore, _, _ := createSchedule(evalAppliedChan, &sender)
+			ruleStore.PutRule(context.Background(), rule)
 
 			go func() {
 				ctx, cancel := context.WithCancel(context.Background())
 				t.Cleanup(cancel)
-				_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersion))
+				_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, updateChan)
 			}()
 
-			expectedTime := time.UnixMicro(rand.Int63())
+			// init the evaluation loop so it gets the rule version
 			evalChan <- &evaluation{
-				scheduledAt: expectedTime,
-				version:     rule.Version,
+				scheduledAt: sch.clock.Now(),
+				rule:        rule,
 			}
-			actualTime := waitForTimeChannel(t, evalAppliedChan)
-			require.Equal(t, expectedTime, actualTime)
+			waitForTimeChannel(t, evalAppliedChan)
 
-			// Now update the rule
-			newRule := *rule
-			newRule.Version++
-			ruleStore.PutRule(ctx, &newRule)
-
-			// and call with new version
-			expectedTime = expectedTime.Add(time.Duration(rand.Intn(10)) * time.Second)
-			evalChan <- &evaluation{
-				scheduledAt: expectedTime,
-				version:     newRule.Version,
-			}
-
-			actualTime = waitForTimeChannel(t, evalAppliedChan)
-			require.Equal(t, expectedTime, actualTime)
-
-			queries := make([]models.GetAlertRuleByUIDQuery, 0)
-			for _, op := range ruleStore.RecordedOps {
-				switch q := op.(type) {
-				case models.GetAlertRuleByUIDQuery:
-					queries = append(queries, q)
-				}
-			}
-			require.Len(t, queries, 2, "Expected exactly two request of %T", models.GetAlertRuleByUIDQuery{})
-			require.Equal(t, rule.UID, queries[0].UID)
-			require.Equal(t, rule.OrgID, queries[0].OrgID)
-			require.Equal(t, rule.UID, queries[1].UID)
-			require.Equal(t, rule.OrgID, queries[1].OrgID)
-		})
-
-		t.Run("should not fetch rule if version is equal or less than current", func(t *testing.T) {
-			evalChan := make(chan *evaluation)
-			evalAppliedChan := make(chan time.Time)
-
-			sch, ruleStore, _, _ := createSchedule(evalAppliedChan, nil)
-
-			rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState())
-
-			go func() {
-				ctx, cancel := context.WithCancel(context.Background())
-				t.Cleanup(cancel)
-				_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersion))
-			}()
-
-			expectedTime := time.UnixMicro(rand.Int63())
-			evalChan <- &evaluation{
-				scheduledAt: expectedTime,
-				version:     rule.Version,
-			}
-
-			actualTime := waitForTimeChannel(t, evalAppliedChan)
-			require.Equal(t, expectedTime, actualTime)
-
-			// try again with the same version
-			expectedTime = expectedTime.Add(time.Duration(rand.Intn(10)) * time.Second)
-			evalChan <- &evaluation{
-				scheduledAt: expectedTime,
-				version:     rule.Version,
-			}
-			actualTime = waitForTimeChannel(t, evalAppliedChan)
-			require.Equal(t, expectedTime, actualTime)
-
-			expectedTime = expectedTime.Add(time.Duration(rand.Intn(10)) * time.Second)
-			evalChan <- &evaluation{
-				scheduledAt: expectedTime,
-				version:     rule.Version - 1,
-			}
-			actualTime = waitForTimeChannel(t, evalAppliedChan)
-			require.Equal(t, expectedTime, actualTime)
-
-			queries := make([]models.GetAlertRuleByUIDQuery, 0)
-			for _, op := range ruleStore.RecordedOps {
-				switch q := op.(type) {
-				case models.GetAlertRuleByUIDQuery:
-					queries = append(queries, q)
-				}
-			}
-			require.Len(t, queries, 1, "Expected exactly one request of %T", models.GetAlertRuleByUIDQuery{})
-		})
-
-		t.Run("when update channel is not empty", func(t *testing.T) {
-			t.Run("should fetch the alert rule from database", func(t *testing.T) {
database", func(t *testing.T) { - evalChan := make(chan *evaluation) - evalAppliedChan := make(chan time.Time) - updateChan := make(chan ruleVersion) - - sch, ruleStore, _, _ := createSchedule(evalAppliedChan, nil) - - rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), eval.Alerting) // we want the alert to fire - - go func() { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, updateChan) - }() - updateChan <- ruleVersion(rule.Version) - - // wait for command to be executed - var queries []interface{} - require.Eventuallyf(t, func() bool { - queries = ruleStore.GetRecordedCommands(func(cmd interface{}) (interface{}, bool) { - c, ok := cmd.(models.GetAlertRuleByUIDQuery) - return c, ok + // define some state + states := make([]*state.State, 0, len(allStates)) + for _, s := range allStates { + for i := 0; i < 2; i++ { + states = append(states, &state.State{ + AlertRuleUID: rule.UID, + CacheId: util.GenerateShortUID(), + OrgID: rule.OrgID, + State: s, + StartsAt: sch.clock.Now(), + EndsAt: sch.clock.Now().Add(time.Duration(rand.Intn(25)+5) * time.Second), + Labels: rule.Labels, }) - return len(queries) == 1 - }, 5*time.Second, 100*time.Millisecond, "Expected command a single %T to be recorded. All recordings: %#v", models.GetAlertRuleByUIDQuery{}, ruleStore.RecordedOps) - - m := queries[0].(models.GetAlertRuleByUIDQuery) - require.Equal(t, rule.UID, m.UID) - require.Equal(t, rule.OrgID, m.OrgID) - - // now call evaluation loop to make sure that the rule was persisted - evalChan <- &evaluation{ - scheduledAt: time.UnixMicro(rand.Int63()), - version: rule.Version, } - waitForTimeChannel(t, evalAppliedChan) + } + sch.stateManager.Put(states) - queries = ruleStore.GetRecordedCommands(func(cmd interface{}) (interface{}, bool) { - c, ok := cmd.(models.GetAlertRuleByUIDQuery) - return c, ok - }) - require.Lenf(t, queries, 1, "evaluation loop requested a rule from database but it should not be") + states = sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID) + expectedToBeSent := 0 + for _, s := range states { + if s.State == eval.Normal || s.State == eval.Pending { + continue + } + expectedToBeSent++ + } + require.Greaterf(t, expectedToBeSent, 0, "State manger was expected to return at least one state that can be expired") + + t.Run("should do nothing if version in channel is the same", func(t *testing.T) { + updateChan <- ruleVersion(rule.Version - 1) + updateChan <- ruleVersion(rule.Version) + updateChan <- ruleVersion(rule.Version) // second time just to make sure that previous messages were handled + + actualStates := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID) + require.Len(t, actualStates, len(states)) + + sender.AssertNotCalled(t, "Send", mock.Anything, mock.Anything) }) - t.Run("should retry when database fails", func(t *testing.T) { - evalAppliedChan := make(chan time.Time) - updateChan := make(chan ruleVersion) - - sch, ruleStore, _, _ := createSchedule(evalAppliedChan, nil) - sch.maxAttempts = rand.Int63n(4) + 1 - - rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState()) - - go func() { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - _ = sch.ruleRoutine(ctx, rule.GetKey(), make(chan *evaluation), updateChan) - }() - - ruleStore.Hook = func(cmd interface{}) error { - if _, ok := cmd.(models.GetAlertRuleByUIDQuery); !ok { - return nil - } - return errors.New("TEST") - } - updateChan <- ruleVersion(rule.Version) - - var queries 
[]interface{} - require.Eventuallyf(t, func() bool { - queries = ruleStore.GetRecordedCommands(func(cmd interface{}) (interface{}, bool) { - c, ok := cmd.(models.GetAlertRuleByUIDQuery) - return c, ok - }) - return int64(len(queries)) == sch.maxAttempts - }, 5*time.Second, 100*time.Millisecond, "Expected exactly two request of %T. All recordings: %#v", models.GetAlertRuleByUIDQuery{}, ruleStore.RecordedOps) - }) - }) - - t.Run("when rule version is updated", func(t *testing.T) { - t.Run("should clear the state and expire firing alerts", func(t *testing.T) { - orgID := rand.Int63() - - evalChan := make(chan *evaluation) - evalAppliedChan := make(chan time.Time) - updateChan := make(chan ruleVersion) - - sender := AlertsSenderMock{} - - ctx := context.Background() - sch, ruleStore, _, _ := createSchedule(evalAppliedChan, &sender) - - var rule = CreateTestAlertRule(t, ruleStore, 10, orgID, eval.Alerting) // we want the alert to fire - - sender.EXPECT().Send(rule.GetKey(), mock.Anything) - - // define some state - states := make([]*state.State, 0, len(allStates)) - for _, s := range allStates { - for i := 0; i < 2; i++ { - states = append(states, &state.State{ - AlertRuleUID: rule.UID, - CacheId: util.GenerateShortUID(), - OrgID: rule.OrgID, - State: s, - StartsAt: sch.clock.Now(), - EndsAt: sch.clock.Now().Add(time.Duration(rand.Intn(25)+5) * time.Second), - Labels: rule.Labels, - }) - } - } - sch.stateManager.Put(states) - states = sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID) - - expectedToBeSent := 0 - for _, s := range states { - if s.State == eval.Normal || s.State == eval.Pending { - continue - } - expectedToBeSent++ - } - require.Greaterf(t, expectedToBeSent, 0, "State manger was expected to return at least one state that can be expired") - - go func() { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, updateChan) - }() - - wg := sync.WaitGroup{} - wg.Add(1) - ruleStore.Hook = func(cmd interface{}) error { - _, ok := cmd.(models.GetAlertRuleByUIDQuery) - if ok { - wg.Done() // add synchronization. 
-					}
-					return nil
-				}
-
-				updateChan <- ruleVersion(rule.Version)
-
-				wg.Wait()
-				newRule := models.CopyRule(rule)
-				newRule.Version++
-				ruleStore.PutRule(ctx, newRule)
-				wg.Add(1)
-				updateChan <- ruleVersion(newRule.Version)
-				wg.Wait()
+			t.Run("should clear the state and expire firing alerts if version in channel is greater", func(t *testing.T) {
+				updateChan <- ruleVersion(rule.Version + rand.Int63n(1000) + 1)
 
 				require.Eventually(t, func() bool {
 					return len(sender.Calls) > 0
 				}, 5*time.Second, 100*time.Millisecond)
 
 				require.Empty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
-
-			sender.AssertExpectations(t)
+				sender.AssertNumberOfCalls(t, "Send", 1)
 				args, ok := sender.Calls[0].Arguments[1].(definitions.PostableAlerts)
 				require.Truef(t, ok, fmt.Sprintf("expected argument of function was supposed to be 'definitions.PostableAlerts' but got %T", sender.Calls[0].Arguments[1]))
 				require.Len(t, args.PostableAlerts, expectedToBeSent)
@@ -467,32 +268,87 @@
 	})
 
 	t.Run("when evaluation fails", func(t *testing.T) {
+		rule := models.AlertRuleGen(withQueryForState(t, eval.Error))()
+		rule.ExecErrState = models.ErrorErrState
+
+		evalChan := make(chan *evaluation)
+		evalAppliedChan := make(chan time.Time)
+
+		sender := AlertsSenderMock{}
+		sender.EXPECT().Send(rule.GetKey(), mock.Anything).Return()
+
+		sch, ruleStore, _, reg := createSchedule(evalAppliedChan, &sender)
+		ruleStore.PutRule(context.Background(), rule)
+
+		go func() {
+			ctx, cancel := context.WithCancel(context.Background())
+			t.Cleanup(cancel)
+			_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersion))
+		}()
+
+		evalChan <- &evaluation{
+			scheduledAt: sch.clock.Now(),
+			rule:        rule,
+		}
+
+		waitForTimeChannel(t, evalAppliedChan)
+
 		t.Run("it should increase failure counter", func(t *testing.T) {
-			t.Skip()
-			// TODO implement check for counter
+			// duration metric has 0 values because the mocked clock does not advance
+			expectedMetric := fmt.Sprintf(
+				`# HELP grafana_alerting_rule_evaluation_duration_seconds The duration for a rule to execute.
+ # TYPE grafana_alerting_rule_evaluation_duration_seconds histogram + grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.005"} 1 + grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 1 + grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.025"} 1 + grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.05"} 1 + grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.1"} 1 + grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.25"} 1 + grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.5"} 1 + grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="1"} 1 + grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="2.5"} 1 + grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="5"} 1 + grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="10"} 1 + grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="25"} 1 + grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="50"} 1 + grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="100"} 1 + grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="+Inf"} 1 + grafana_alerting_rule_evaluation_duration_seconds_sum{org="%[1]d"} 0 + grafana_alerting_rule_evaluation_duration_seconds_count{org="%[1]d"} 1 + # HELP grafana_alerting_rule_evaluation_failures_total The total number of rule evaluation failures. + # TYPE grafana_alerting_rule_evaluation_failures_total counter + grafana_alerting_rule_evaluation_failures_total{org="%[1]d"} 1 + # HELP grafana_alerting_rule_evaluations_total The total number of rule evaluations. 
+				# TYPE grafana_alerting_rule_evaluations_total counter
+				grafana_alerting_rule_evaluations_total{org="%[1]d"} 1
+				`, rule.OrgID)
+
+			err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expectedMetric), "grafana_alerting_rule_evaluation_duration_seconds", "grafana_alerting_rule_evaluations_total", "grafana_alerting_rule_evaluation_failures_total")
+			require.NoError(t, err)
 		})
-		t.Run("it should retry up to configured times", func(t *testing.T) {
-			// TODO figure out how to simulate failure
-			t.Skip()
+
+		t.Run("it should send special alert DatasourceError", func(t *testing.T) {
+			sender.AssertNumberOfCalls(t, "Send", 1)
+			args, ok := sender.Calls[0].Arguments[1].(definitions.PostableAlerts)
+			require.Truef(t, ok, fmt.Sprintf("expected argument of function was supposed to be 'definitions.PostableAlerts' but got %T", sender.Calls[0].Arguments[1]))
+			assert.Len(t, args.PostableAlerts, 1)
+			assert.Equal(t, ErrorAlertName, args.PostableAlerts[0].Labels[prometheusModel.AlertNameLabel])
 		})
 	})
 
 	t.Run("when there are alerts that should be firing", func(t *testing.T) {
 		t.Run("it should call sender", func(t *testing.T) {
-			orgID := rand.Int63()
+			// eval.Alerting makes the state manager create notifications for alertmanagers
+			rule := models.AlertRuleGen(withQueryForState(t, eval.Alerting))()
 
 			evalChan := make(chan *evaluation)
 			evalAppliedChan := make(chan time.Time)
 
 			sender := AlertsSenderMock{}
+			sender.EXPECT().Send(rule.GetKey(), mock.Anything).Return()
 
 			sch, ruleStore, _, _ := createSchedule(evalAppliedChan, &sender)
-
-			// eval.Alerting makes state manager to create notifications for alertmanagers
-			rule := CreateTestAlertRule(t, ruleStore, 10, orgID, eval.Alerting)
-			folder, _ := ruleStore.GetNamespaceByUID(context.Background(), rule.NamespaceUID, orgID, nil)
-
-			sender.EXPECT().Send(rule.GetKey(), mock.Anything).Return()
+			ruleStore.PutRule(context.Background(), rule)
 
 			go func() {
 				ctx, cancel :=
context.WithCancel(context.Background()) + t.Cleanup(cancel) + _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersion)) + }() + + evalChan <- &evaluation{ + scheduledAt: sch.clock.Now(), + rule: rule, + } + + waitForTimeChannel(t, evalAppliedChan) + + sender.AssertNotCalled(t, "Send", mock.Anything, mock.Anything) + + require.NotEmpty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)) }) } func TestSchedule_UpdateAlertRule(t *testing.T) { t.Run("when rule exists", func(t *testing.T) { t.Run("it should call Update", func(t *testing.T) { - sch := setupSchedulerWithFakeStores(t) - key := generateRuleKey() + sch := setupScheduler(t, nil, nil, nil, nil, nil) + key := models.GenerateRuleKey(rand.Int63()) info, _ := sch.registry.getOrCreateInfo(context.Background(), key) version := rand.Int63() go func() { @@ -548,8 +421,8 @@ func TestSchedule_UpdateAlertRule(t *testing.T) { } }) t.Run("should exit if it is closed", func(t *testing.T) { - sch := setupSchedulerWithFakeStores(t) - key := generateRuleKey() + sch := setupScheduler(t, nil, nil, nil, nil, nil) + key := models.GenerateRuleKey(rand.Int63()) info, _ := sch.registry.getOrCreateInfo(context.Background(), key) info.stop() sch.UpdateAlertRule(key, rand.Int63()) @@ -557,8 +430,8 @@ func TestSchedule_UpdateAlertRule(t *testing.T) { }) t.Run("when rule does not exist", func(t *testing.T) { t.Run("should exit", func(t *testing.T) { - sch := setupSchedulerWithFakeStores(t) - key := generateRuleKey() + sch := setupScheduler(t, nil, nil, nil, nil, nil) + key := models.GenerateRuleKey(rand.Int63()) sch.UpdateAlertRule(key, rand.Int63()) }) }) @@ -567,24 +440,26 @@ func TestSchedule_UpdateAlertRule(t *testing.T) { func TestSchedule_DeleteAlertRule(t *testing.T) { t.Run("when rule exists", func(t *testing.T) { t.Run("it should stop evaluation loop and remove the controller from registry", func(t *testing.T) { - sch := setupSchedulerWithFakeStores(t) - key := generateRuleKey() + sch := setupScheduler(t, nil, nil, nil, nil, nil) + rule := models.AlertRuleGen()() + key := rule.GetKey() info, _ := sch.registry.getOrCreateInfo(context.Background(), key) sch.DeleteAlertRule(key) require.False(t, info.update(ruleVersion(rand.Int63()))) - success, dropped := info.eval(time.Now(), 1) + success, dropped := info.eval(time.Now(), rule) require.False(t, success) require.Nilf(t, dropped, "expected no dropped evaluations but got one") require.False(t, sch.registry.exists(key)) }) t.Run("should remove controller from registry", func(t *testing.T) { - sch := setupSchedulerWithFakeStores(t) - key := generateRuleKey() + sch := setupScheduler(t, nil, nil, nil, nil, nil) + rule := models.AlertRuleGen()() + key := rule.GetKey() info, _ := sch.registry.getOrCreateInfo(context.Background(), key) info.stop() sch.DeleteAlertRule(key) require.False(t, info.update(ruleVersion(rand.Int63()))) - success, dropped := info.eval(time.Now(), 1) + success, dropped := info.eval(time.Now(), rule) require.False(t, success) require.Nilf(t, dropped, "expected no dropped evaluations but got one") require.False(t, sch.registry.exists(key)) @@ -592,40 +467,39 @@ func TestSchedule_DeleteAlertRule(t *testing.T) { }) t.Run("when rule does not exist", func(t *testing.T) { t.Run("should exit", func(t *testing.T) { - sch := setupSchedulerWithFakeStores(t) - key := generateRuleKey() + sch := setupScheduler(t, nil, nil, nil, nil, nil) + key := models.GenerateRuleKey(rand.Int63()) sch.DeleteAlertRule(key) }) }) } -func generateRuleKey() models.AlertRuleKey { - return 
models.AlertRuleKey{ - OrgID: rand.Int63(), - UID: util.GenerateShortUID(), - } -} - -func setupSchedulerWithFakeStores(t *testing.T) *schedule { - t.Helper() - ruleStore := store.NewFakeRuleStore(t) - instanceStore := &store.FakeInstanceStore{} - sch, _ := setupScheduler(t, ruleStore, instanceStore, nil, nil) - return sch -} - -func setupScheduler(t *testing.T, rs store.RuleStore, is store.InstanceStore, registry *prometheus.Registry, senderMock *AlertsSenderMock) (*schedule, *clock.Mock) { +func setupScheduler(t *testing.T, rs *store.FakeRuleStore, is *store.FakeInstanceStore, registry *prometheus.Registry, senderMock *AlertsSenderMock, evalMock *eval.FakeEvaluator) *schedule { t.Helper() fakeAnnoRepo := store.NewFakeAnnotationsRepo() annotations.SetRepository(fakeAnnoRepo) mockedClock := clock.NewMock() logger := log.New("ngalert schedule test") + + if rs == nil { + rs = store.NewFakeRuleStore(t) + } + + if is == nil { + is = &store.FakeInstanceStore{} + } + + var evaluator eval.Evaluator = evalMock + if evalMock == nil { + secretsService := secretsManager.SetupTestService(t, fakes.NewFakeSecretsStore()) + evaluator = eval.NewEvaluator(&setting.Cfg{ExpressionsEnabled: true}, logger, nil, secretsService, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil)) + } + if registry == nil { registry = prometheus.NewPedanticRegistry() } m := metrics.NewNGAlert(registry) - secretsService := secretsManager.SetupTestService(t, fakes.NewFakeSecretsStore()) appUrl := &url.URL{ Scheme: "http", @@ -638,41 +512,27 @@ func setupScheduler(t *testing.T, rs store.RuleStore, is store.InstanceStore, re } cfg := setting.UnifiedAlertingSettings{ - BaseInterval: time.Second, - MaxAttempts: 1, - AdminConfigPollInterval: 10 * time.Minute, // do not poll in unit tests. + BaseInterval: time.Second, + MaxAttempts: 1, } schedCfg := SchedulerCfg{ Cfg: cfg, C: mockedClock, - Evaluator: eval.NewEvaluator(&setting.Cfg{ExpressionsEnabled: true}, logger, nil, secretsService, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil)), + Evaluator: evaluator, RuleStore: rs, InstanceStore: is, Logger: logger, Metrics: m.GetSchedulerMetrics(), AlertSender: senderMock, } - st := state.NewManager(schedCfg.Logger, m.GetStateMetrics(), nil, rs, is, &dashboards.FakeDashboardService{}, &image.NoopImageService{}, clock.NewMock()) - return NewScheduler(schedCfg, appUrl, st, busmock.New()), mockedClock + st := state.NewManager(schedCfg.Logger, m.GetStateMetrics(), nil, rs, is, &dashboards.FakeDashboardService{}, &image.NoopImageService{}, mockedClock) + return NewScheduler(schedCfg, appUrl, st, busmock.New()) } -// createTestAlertRule creates a dummy alert definition to be used by the tests. 
-func CreateTestAlertRule(t *testing.T, dbstore *store.FakeRuleStore, intervalSeconds int64, orgID int64, evalResult eval.State) *models.AlertRule {
-	ctx := context.Background()
-
-	t.Helper()
-	records := make([]interface{}, 0, len(dbstore.RecordedOps))
-	copy(records, dbstore.RecordedOps)
-	defer func() {
-		// erase queries that were made by the testing suite
-		dbstore.RecordedOps = records
-	}()
-	d := rand.Intn(1000)
-	ruleGroup := fmt.Sprintf("ruleGroup-%d", d)
-
+func withQueryForState(t *testing.T, evalResult eval.State) models.AlertRuleMutator {
 	var expression string
-	var forDuration time.Duration
+	var forMultiplier int64 = 0
 	switch evalResult {
 	case eval.Normal:
 		expression = `{
@@ -687,7 +547,7 @@ func CreateTestAlertRule(t *testing.T, dbstore *store.FakeRuleStore, intervalSec
 			"expression":"2 + 2 > 1"
 		}`
 		if evalResult == eval.Pending {
-			forDuration = 100 * time.Second
+			forMultiplier = rand.Int63n(9) + 1
 		}
 	case eval.Error:
 		expression = `{
@@ -695,17 +555,13 @@ func CreateTestAlertRule(t *testing.T, dbstore *store.FakeRuleStore, intervalSec
 			"type":"math",
 			"expression":"$A"
 		}`
-	case eval.NoData:
-		// TODO Implement support for NoData
-		require.Fail(t, "Alert rule with desired evaluation result NoData is not supported yet")
+	default:
+		require.Fail(t, fmt.Sprintf("Alert rule with desired evaluation result '%s' is not supported yet", evalResult))
 	}
 
-	rule := &models.AlertRule{
-		ID:        1,
-		OrgID:     orgID,
-		Title:     fmt.Sprintf("an alert definition %d", d),
-		Condition: "A",
-		Data: []models.AlertQuery{
+	return func(rule *models.AlertRule) {
+		rule.Condition = "A"
+		rule.Data = []models.AlertQuery{
 			{
 				DatasourceUID: "-100",
 				Model:         json.RawMessage(expression),
 				RelativeTimeRange: models.RelativeTimeRange{
 					From: models.Duration(5 * time.Hour),
 					To:   models.Duration(3 * time.Hour),
 				},
 				RefID: "A",
 			},
-		},
-		Updated:         time.Now(),
-		IntervalSeconds: intervalSeconds,
-		Version:         1,
-		UID:             util.GenerateShortUID(),
-		NamespaceUID:    "namespace",
-		RuleGroup:       ruleGroup,
-		NoDataState:     models.NoData,
-		ExecErrState:    models.AlertingErrState,
-		For:             forDuration,
-		Annotations:     map[string]string{"testAnnoKey": "testAnnoValue"},
-		Labels:          make(map[string]string),
+		}
+		rule.For = time.Duration(rule.IntervalSeconds*forMultiplier) * time.Second
 	}
-
-	dbstore.PutRule(ctx, rule)
-
-	t.Logf("alert definition: %v with interval: %d created", rule.GetKey(), rule.IntervalSeconds)
-	return rule
 }
diff --git a/pkg/services/ngalert/store/alert_rule.go b/pkg/services/ngalert/store/alert_rule.go
index d22a2bd59ca..fa66ba28f30 100644
--- a/pkg/services/ngalert/store/alert_rule.go
+++ b/pkg/services/ngalert/store/alert_rule.go
@@ -401,8 +401,8 @@ func (st DBstore) GetNamespaceByUID(ctx context.Context, uid string, orgID int64
 // GetAlertRulesForScheduling returns a short version of all alert rules except those that belong to an excluded list of organizations
 func (st DBstore) GetAlertRulesForScheduling(ctx context.Context, query *ngmodels.GetAlertRulesForSchedulingQuery) error {
 	return st.SQLStore.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
-		alerts := make([]*ngmodels.SchedulableAlertRule, 0)
-		q := sess.Table("alert_rule")
+		alerts := make([]*ngmodels.AlertRule, 0)
+		q := sess.Table(ngmodels.AlertRule{})
 		if len(st.Cfg.DisabledOrgs) > 0 {
 			excludeOrgs := make([]interface{}, 0, len(st.Cfg.DisabledOrgs))
 			for orgID := range st.Cfg.DisabledOrgs {
diff --git a/pkg/services/ngalert/store/testing.go b/pkg/services/ngalert/store/testing.go
index c6cc5670060..0a92561cfae 100644
--- a/pkg/services/ngalert/store/testing.go
+++
b/pkg/services/ngalert/store/testing.go @@ -183,14 +183,7 @@ func (f *FakeRuleStore) GetAlertRulesForScheduling(_ context.Context, q *models. return err } for _, rules := range f.Rules { - for _, rule := range rules { - q.Result = append(q.Result, &models.SchedulableAlertRule{ - UID: rule.UID, - OrgID: rule.OrgID, - IntervalSeconds: rule.IntervalSeconds, - Version: rule.Version, - }) - } + q.Result = append(q.Result, rules...) } return nil }
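
The sketches below are a reading aid appended after the patch; they are not part of the change. The first one illustrates the send half of alertRuleInfo.eval shown in registry.go above: the evaluation message now carries the whole rule, and the send is deliberately non-blocking, draining a tick the rule routine never consumed and reporting it as dropped. Names are simplified, and a buffered channel stands in for the unbuffered channel plus concurrent senders that the real registry uses.

package main

import (
	"context"
	"fmt"
	"time"
)

// AlertRule is a cut-down stand-in for ngmodels.AlertRule.
type AlertRule struct {
	UID     string
	Version int64
}

// evaluation mirrors the message type the patch sends over evalCh:
// the rule travels with the tick, so the routine needs no database fetch.
type evaluation struct {
	scheduledAt time.Time
	rule        *AlertRule
}

type alertRuleInfo struct {
	evalCh chan *evaluation
	ctx    context.Context
}

// eval first drains a previously sent, never-consumed tick (reported as
// dropped), then tries to send the new one unless the routine was stopped.
func (a *alertRuleInfo) eval(t time.Time, rule *AlertRule) (bool, *evaluation) {
	var dropped *evaluation
	select {
	case dropped = <-a.evalCh: // a stale tick from a slow consumer
	default:
	}
	select {
	case a.evalCh <- &evaluation{scheduledAt: t, rule: rule}:
		return true, dropped
	case <-a.ctx.Done():
		return false, dropped
	}
}

func main() {
	info := &alertRuleInfo{evalCh: make(chan *evaluation, 1), ctx: context.Background()}
	rule := &AlertRule{UID: "foo", Version: 1}

	info.eval(time.Now(), rule)                  // nobody reads this tick...
	sent, dropped := info.eval(time.Now(), rule) // ...so the next send drops it
	fmt.Printf("sent=%v droppedTick=%v\n", sent, dropped != nil) // sent=true droppedTick=true
}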
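
Continuing the same sketch package, the next function condenses the new control flow of ruleRoutine from the schedule.go hunks: a message on the update channel only clears state (the fresh rule arrives with the next evaluation), while the eval path clears state and refreshes extra labels itself whenever the version carried in the message differs from the last one it evaluated. Treating version 0 as "not evaluated yet" follows the patch; everything else here is a simplification.

// ruleRoutineSketch reuses the AlertRule and evaluation types from the
// previous sketch. It is an approximation of ruleRoutine, not the real code.
func ruleRoutineSketch(ctx context.Context, evalCh <-chan *evaluation, updateCh <-chan int64) {
	var currentRuleVersion int64 // 0 means the loop has not evaluated yet
	clearState := func() { fmt.Println("state cleared, firing alerts expired") }

	for {
		select {
		case lastVersion := <-updateCh:
			// The eval path may already have seen this (or a newer) version,
			// e.g. when update and eval messages raced during a slow evaluation.
			if currentRuleVersion >= lastVersion {
				continue
			}
			clearState() // the next evaluation starts from scratch
		case e, ok := <-evalCh:
			if !ok {
				return
			}
			if newVersion := e.rule.Version; currentRuleVersion != newVersion {
				if currentRuleVersion > 0 {
					clearState() // the rule changed between ticks
				}
				// The real routine also refreshes the extra labels here.
				currentRuleVersion = newVersion
			}
			fmt.Println("evaluating", e.rule.UID, "scheduled at", e.scheduledAt)
		case <-ctx.Done():
			return
		}
	}
}

One consequence of this shape is visible throughout the test changes above: with the updateRule helper deleted, the routine never issues GetAlertRuleByUIDQuery, so the tests that asserted on recorded store queries were removed rather than rewritten.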
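
Finally, in the same sketch package (and assuming "sync" is added to its imports), the renamed alertRulesRegistry is still just a mutex-guarded map; it now stores full *AlertRule values because the SchedulableAlertRule projection was deleted. Keying by UID is a simplification here; the patch keys by models.AlertRuleKey, which also carries the org ID.

// alertRulesRegistrySketch mirrors alertRulesRegistry: a snapshot of the rules
// the scheduler knows about, replaced wholesale on every fetch tick via set.
type alertRulesRegistrySketch struct {
	mu    sync.Mutex
	rules map[string]*AlertRule
}

// set replaces all rules in the registry, like set() in registry.go.
func (r *alertRulesRegistrySketch) set(rules []*AlertRule) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.rules = make(map[string]*AlertRule, len(rules))
	for _, rule := range rules {
		r.rules[rule.UID] = rule
	}
}

// del removes the rule with the given key and reports whether it existed,
// matching the 2-tuple contract documented on del() in the diff above.
func (r *alertRulesRegistrySketch) del(uid string) (*AlertRule, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	rule, ok := r.rules[uid]
	if ok {
		delete(r.rules, uid)
	}
	return rule, ok
}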