diff --git a/pkg/services/alerting/engine.go b/pkg/services/alerting/engine.go index 11d0a2f2b9e..4d85d375ab5 100644 --- a/pkg/services/alerting/engine.go +++ b/pkg/services/alerting/engine.go @@ -35,7 +35,7 @@ func NewEngine() *Engine { func (e *Engine) Start() { log.Info("Alerting: Engine.Start()") - go e.schedulerTick() + go e.alertingTicker() go e.execDispatch() go e.resultHandler() } @@ -45,7 +45,7 @@ func (e *Engine) Stop() { close(e.resultQueue) } -func (e *Engine) schedulerTick() { +func (e *Engine) alertingTicker() { tickIndex := 0 for { @@ -57,6 +57,8 @@ func (e *Engine) schedulerTick() { } e.scheduler.Tick(tick, e.execQueue) + + tickIndex++ } } } diff --git a/pkg/services/alerting/executor.go b/pkg/services/alerting/executor.go index 6efe64ff0ee..2fc2d662230 100644 --- a/pkg/services/alerting/executor.go +++ b/pkg/services/alerting/executor.go @@ -13,7 +13,7 @@ import ( ) var ( - resultLogFmt = "%s executor: %s %1.2f %s %1.2f : %v" + resultLogFmt = "Alerting: executor %s %1.2f %s %1.2f : %v" descriptionFmt = "Actual value: %1.2f for %s" ) @@ -77,19 +77,19 @@ var aggregator = map[string]aggregationFn{ }, } -func (executor *ExecutorImpl) Execute(job *AlertJob, resultQueue chan *AlertResult) { - response, err := executor.GetSeries(job) +func (e *ExecutorImpl) Execute(job *AlertJob, resultQueue chan *AlertResult) { + response, err := e.GetSeries(job) if err != nil { resultQueue <- &AlertResult{State: alertstates.Pending, Id: job.Rule.Id, AlertJob: job} } - result := executor.validateRule(job.Rule, response) + result := e.validateRule(job.Rule, response) result.AlertJob = job resultQueue <- result } -func (executor *ExecutorImpl) GetSeries(job *AlertJob) (tsdb.TimeSeriesSlice, error) { +func (e *ExecutorImpl) GetSeries(job *AlertJob) (tsdb.TimeSeriesSlice, error) { query := &m.GetDataSourceByIdQuery{ Id: job.Rule.DatasourceId, OrgId: job.Rule.OrgId, @@ -108,7 +108,7 @@ func (executor *ExecutorImpl) GetSeries(job *AlertJob) (tsdb.TimeSeriesSlice, er return nil, fmt.Errorf("Grafana does not support alerts for %s", query.Result.Type) } -func (executor *ExecutorImpl) validateRule(rule *AlertRule, series tsdb.TimeSeriesSlice) *AlertResult { +func (e *ExecutorImpl) validateRule(rule *AlertRule, series tsdb.TimeSeriesSlice) *AlertResult { for _, serie := range series { if aggregator[rule.Aggregator] == nil { continue diff --git a/pkg/services/alerting/interfaces.go b/pkg/services/alerting/interfaces.go index 9f51c6216d3..d1a0f771b63 100644 --- a/pkg/services/alerting/interfaces.go +++ b/pkg/services/alerting/interfaces.go @@ -8,5 +8,5 @@ type Executor interface { type Scheduler interface { Tick(time time.Time, execQueue chan *AlertJob) - Update(rules []AlertRule) + Update(rules []*AlertRule) } diff --git a/pkg/services/alerting/rule_reader.go b/pkg/services/alerting/rule_reader.go new file mode 100644 index 00000000000..734c7504b5c --- /dev/null +++ b/pkg/services/alerting/rule_reader.go @@ -0,0 +1,91 @@ +package alerting + +import ( + "sync" + "time" + + "github.com/grafana/grafana/pkg/bus" + "github.com/grafana/grafana/pkg/log" + m "github.com/grafana/grafana/pkg/models" +) + +type RuleReader interface { + Fetch() []*AlertRule +} + +type AlertRuleReader struct { + sync.RWMutex + serverID string + serverPosition int + clusterSize int +} + +func NewRuleReader() *AlertRuleReader { + ruleReader := &AlertRuleReader{} + + go ruleReader.initReader() + return ruleReader +} + +func (arr *AlertRuleReader) initReader() { + heartbeat := time.NewTicker(time.Second * 10) + + for { + select { + case <-heartbeat.C: + arr.heartbeat() + } + } +} + +func (arr *AlertRuleReader) Fetch() []*AlertRule { + cmd := &m.GetAllAlertsQuery{} + err := bus.Dispatch(cmd) + + if err != nil { + log.Error(1, "Alerting: ruleReader.fetch(): Could not load alerts", err) + return []*AlertRule{} + } + + res := make([]*AlertRule, len(cmd.Result)) + for i, ruleDef := range cmd.Result { + model := &AlertRule{} + model.Id = ruleDef.Id + model.OrgId = ruleDef.OrgId + model.DatasourceId = ruleDef.DatasourceId + model.Query = ruleDef.Query + model.QueryRefId = ruleDef.QueryRefId + model.WarnLevel = ruleDef.WarnLevel + model.WarnOperator = ruleDef.WarnOperator + model.CritLevel = ruleDef.CritLevel + model.CritOperator = ruleDef.CritOperator + model.Frequency = ruleDef.Frequency + model.Title = ruleDef.Title + model.Description = ruleDef.Description + model.Aggregator = ruleDef.Aggregator + model.State = ruleDef.State + res[i] = model + } + + return res +} + +func (arr *AlertRuleReader) heartbeat() { + + //Lets cheat on this until we focus on clustering + //log.Info("Heartbeat: Sending heartbeat from " + this.serverId) + arr.clusterSize = 1 + arr.serverPosition = 1 + + /* + cmd := &m.HeartBeatCommand{ServerId: this.serverId} + err := bus.Dispatch(cmd) + + if err != nil { + log.Error(1, "Failed to send heartbeat.") + } else { + this.clusterSize = cmd.Result.ClusterSize + this.serverPosition = cmd.Result.UptimePosition + } + */ +} diff --git a/pkg/services/alerting/scheduler.go b/pkg/services/alerting/scheduler.go index 3d2d0cea263..ae94461fe4f 100644 --- a/pkg/services/alerting/scheduler.go +++ b/pkg/services/alerting/scheduler.go @@ -16,7 +16,7 @@ func NewScheduler() Scheduler { } } -func (s *SchedulerImpl) Update(rules []AlertRule) { +func (s *SchedulerImpl) Update(rules []*AlertRule) { log.Debug("Scheduler: Update()") jobs := make(map[int64]*AlertJob, 0) @@ -32,7 +32,7 @@ func (s *SchedulerImpl) Update(rules []AlertRule) { } } - job.Rule = &rule + job.Rule = rule job.Offset = int64(i) jobs[rule.Id] = job @@ -45,8 +45,6 @@ func (s *SchedulerImpl) Update(rules []AlertRule) { func (s *SchedulerImpl) Tick(tickTime time.Time, execQueue chan *AlertJob) { now := tickTime.Unix() - log.Info("Alerting: Scheduler.Tick() %v", len(s.jobs)) - for _, job := range s.jobs { if now%job.Rule.Frequency == 0 && job.Running == false { log.Trace("Scheduler: Putting job on to exec queue: %s", job.Rule.Title)