From cc65dd8bcfdadf274d3b90cbb5b0aa48fef3a753 Mon Sep 17 00:00:00 2001 From: bergquist Date: Fri, 3 Jun 2016 08:33:04 +0200 Subject: [PATCH] tech(alerting): use pointers for updating alertjobs --- pkg/models/alerts.go | 2 +- pkg/services/alerting/alerting.go | 47 +++++++++++-------- pkg/services/alerting/datasources/backends.go | 2 +- pkg/services/alerting/datasources/graphite.go | 8 ++-- pkg/services/alerting/executor.go | 15 +++--- pkg/services/alerting/executor_test.go | 16 +++---- 6 files changed, 50 insertions(+), 40 deletions(-) diff --git a/pkg/models/alerts.go b/pkg/models/alerts.go index e7ef0c178e2..a986fb0037f 100644 --- a/pkg/models/alerts.go +++ b/pkg/models/alerts.go @@ -122,5 +122,5 @@ type AlertResult struct { ActualValue float64 Duration float64 Description string - Rule AlertRule + AlertJob *AlertJob } diff --git a/pkg/services/alerting/alerting.go b/pkg/services/alerting/alerting.go index 19fce39b8ee..8871e132f39 100644 --- a/pkg/services/alerting/alerting.go +++ b/pkg/services/alerting/alerting.go @@ -20,8 +20,8 @@ func Init() { reader := NewRuleReader() go scheduler.dispatch(reader) - go scheduler.Executor(&ExecutorImpl{}) - go scheduler.HandleResponses() + go scheduler.executor(&ExecutorImpl{}) + go scheduler.handleResponses() } @@ -65,11 +65,22 @@ func (scheduler *Scheduler) updateJobs(alertRuleFn func() []m.AlertRule) { for i := 0; i < len(rules); i++ { rule := rules[i] - jobs[rule.Id] = &m.AlertJob{ - Rule: rule, - Offset: int64(i), - Running: false, + /* + jobs[rule.Id] = &m.AlertJob{ + Offset: int64(i), + Running: false, + Rule: rule, + } + */ + + job := &m.AlertJob{} + if scheduler.jobs[rule.Id] != nil { + job = scheduler.jobs[rule.Id] } + + job.Rule = rule + job.Offset = int64(i) + jobs[rule.Id] = job } log.Debug("Scheduler: Selected %d jobs", len(jobs)) @@ -86,35 +97,33 @@ func (scheduler *Scheduler) queueJobs() { } } -func (scheduler *Scheduler) Executor(executor Executor) { +func (scheduler *Scheduler) executor(executor Executor) { for job := range scheduler.runQueue { //log.Info("Executor: queue length %d", len(this.runQueue)) log.Info("Executor: executing %s", job.Rule.Title) - scheduler.jobs[job.Rule.Id].Running = true - scheduler.MeasureAndExecute(executor, job) + job.Running = true + scheduler.measureAndExecute(executor, job) } } -func (scheduler *Scheduler) HandleResponses() { +func (scheduler *Scheduler) handleResponses() { for response := range scheduler.responseQueue { - log.Info("Response: alert(%d) status(%s) actual(%v)", response.Id, response.State, response.ActualValue) - if scheduler.jobs[response.Id] != nil { - scheduler.jobs[response.Id].Running = false - } + log.Info("Response: alert(%d) status(%s) actual(%v) running(%v)", response.Id, response.State, response.ActualValue, response.AlertJob.Running) + response.AlertJob.Running = false - cmd := m.UpdateAlertStateCommand{ + cmd := &m.UpdateAlertStateCommand{ AlertId: response.Id, NewState: response.State, Info: response.Description, } - if err := bus.Dispatch(&cmd); err != nil { - log.Error(1, "failed to save state", err) + if err := bus.Dispatch(cmd); err != nil { + log.Error(2, "failed to save state %v", err) } } } -func (scheduler *Scheduler) MeasureAndExecute(exec Executor, job *m.AlertJob) { +func (scheduler *Scheduler) measureAndExecute(exec Executor, job *m.AlertJob) { now := time.Now() responseChan := make(chan *m.AlertResult, 1) @@ -126,7 +135,7 @@ func (scheduler *Scheduler) MeasureAndExecute(exec Executor, job *m.AlertJob) { Id: job.Rule.Id, State: "timed out", Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000), - Rule: job.Rule, + AlertJob: job, } case result := <-responseChan: result.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000) diff --git a/pkg/services/alerting/datasources/backends.go b/pkg/services/alerting/datasources/backends.go index 0a74c15804b..5b570ab61b5 100644 --- a/pkg/services/alerting/datasources/backends.go +++ b/pkg/services/alerting/datasources/backends.go @@ -26,7 +26,7 @@ func GetSeries(job *m.AlertJob) (m.TimeSeriesSlice, error) { } if query.Result.Type == m.DS_GRAPHITE { - return GraphiteClient{}.GetSeries(job, query.Result) + return GraphiteClient{}.GetSeries(*job, query.Result) } return nil, fmt.Errorf("Grafana does not support alerts for %s", query.Result.Type) diff --git a/pkg/services/alerting/datasources/graphite.go b/pkg/services/alerting/datasources/graphite.go index 767e2ec03de..bc01f3056e2 100644 --- a/pkg/services/alerting/datasources/graphite.go +++ b/pkg/services/alerting/datasources/graphite.go @@ -22,7 +22,7 @@ type GraphiteSerie struct { type GraphiteResponse []GraphiteSerie -func (this GraphiteClient) GetSeries(rule *m.AlertJob, datasource m.DataSource) (m.TimeSeriesSlice, error) { +func (this GraphiteClient) GetSeries(rule m.AlertJob, datasource m.DataSource) (m.TimeSeriesSlice, error) { v := url.Values{ "format": []string{"json"}, "target": []string{getTargetFromRule(rule.Rule)}, @@ -39,9 +39,6 @@ func (this GraphiteClient) GetSeries(rule *m.AlertJob, datasource m.DataSource) Timeout: 5 * time.Second, }.Do() - response := GraphiteResponse{} - res.Body.FromJsonTo(&response) - if err != nil { return nil, err } @@ -50,6 +47,9 @@ func (this GraphiteClient) GetSeries(rule *m.AlertJob, datasource m.DataSource) return nil, fmt.Errorf("expected httpstatus 200, found %d", res.StatusCode) } + response := GraphiteResponse{} + res.Body.FromJsonTo(&response) + timeSeries := make([]*m.TimeSeries, 0) for _, v := range response { diff --git a/pkg/services/alerting/executor.go b/pkg/services/alerting/executor.go index f300f67d632..67fbdd38d07 100644 --- a/pkg/services/alerting/executor.go +++ b/pkg/services/alerting/executor.go @@ -3,10 +3,11 @@ package alerting import ( "fmt" + "math" + "github.com/grafana/grafana/pkg/log" m "github.com/grafana/grafana/pkg/models" b "github.com/grafana/grafana/pkg/services/alerting/datasources" - "math" ) type Executor interface { @@ -80,13 +81,15 @@ func (this *ExecutorImpl) Execute(job *m.AlertJob, responseQueue chan *m.AlertRe response, err := b.GetSeries(job) if err != nil { - responseQueue <- &m.AlertResult{State: "PENDING", Id: job.Rule.Id, Rule: job.Rule} + responseQueue <- &m.AlertResult{State: "PENDING", Id: job.Rule.Id, AlertJob: job} } - responseQueue <- this.ValidateRule(job.Rule, response) + result := this.validateRule(job.Rule, response) + result.AlertJob = job + responseQueue <- result } -func (this *ExecutorImpl) ValidateRule(rule m.AlertRule, series m.TimeSeriesSlice) *m.AlertResult { +func (this *ExecutorImpl) validateRule(rule m.AlertRule, series m.TimeSeriesSlice) *m.AlertResult { for _, serie := range series { if aggregator[rule.Aggregator] == nil { continue @@ -103,7 +106,6 @@ func (this *ExecutorImpl) ValidateRule(rule m.AlertRule, series m.TimeSeriesSlic Id: rule.Id, ActualValue: aggValue, Description: fmt.Sprintf("Actual value: %1.2f for %s", aggValue, serie.Name), - Rule: rule, } } @@ -116,10 +118,9 @@ func (this *ExecutorImpl) ValidateRule(rule m.AlertRule, series m.TimeSeriesSlic Id: rule.Id, Description: fmt.Sprintf("Actual value: %1.2f for %s", aggValue, serie.Name), ActualValue: aggValue, - Rule: rule, } } } - return &m.AlertResult{State: m.AlertStateOk, Id: rule.Id, Rule: rule, Description: "Alert is OK!"} + return &m.AlertResult{State: m.AlertStateOk, Id: rule.Id, Description: "Alert is OK!"} } diff --git a/pkg/services/alerting/executor_test.go b/pkg/services/alerting/executor_test.go index 03e0164fa70..284d5c2d64d 100644 --- a/pkg/services/alerting/executor_test.go +++ b/pkg/services/alerting/executor_test.go @@ -18,7 +18,7 @@ func TestAlertingExecutor(t *testing.T) { m.NewTimeSeries("test1", [][2]float64{{2, 0}}), } - result := executor.ValidateRule(rule, timeSeries) + result := executor.validateRule(rule, timeSeries) So(result.State, ShouldEqual, m.AlertStateOk) }) @@ -29,7 +29,7 @@ func TestAlertingExecutor(t *testing.T) { m.NewTimeSeries("test1", [][2]float64{{2, 0}}), } - result := executor.ValidateRule(rule, timeSeries) + result := executor.validateRule(rule, timeSeries) So(result.State, ShouldEqual, m.AlertStateCritical) }) @@ -40,7 +40,7 @@ func TestAlertingExecutor(t *testing.T) { m.NewTimeSeries("test1", [][2]float64{{9, 0}, {9, 0}}), } - result := executor.ValidateRule(rule, timeSeries) + result := executor.validateRule(rule, timeSeries) So(result.State, ShouldEqual, m.AlertStateCritical) }) @@ -51,7 +51,7 @@ func TestAlertingExecutor(t *testing.T) { m.NewTimeSeries("test1", [][2]float64{{9, 0}, {9, 0}}), } - result := executor.ValidateRule(rule, timeSeries) + result := executor.validateRule(rule, timeSeries) So(result.State, ShouldEqual, m.AlertStateOk) }) @@ -62,7 +62,7 @@ func TestAlertingExecutor(t *testing.T) { m.NewTimeSeries("test1", [][2]float64{{11, 0}, {9, 0}}), } - result := executor.ValidateRule(rule, timeSeries) + result := executor.validateRule(rule, timeSeries) So(result.State, ShouldEqual, m.AlertStateOk) }) @@ -73,7 +73,7 @@ func TestAlertingExecutor(t *testing.T) { m.NewTimeSeries("test1", [][2]float64{{1, 0}, {11, 0}}), } - result := executor.ValidateRule(rule, timeSeries) + result := executor.validateRule(rule, timeSeries) So(result.State, ShouldEqual, m.AlertStateCritical) }) }) @@ -87,7 +87,7 @@ func TestAlertingExecutor(t *testing.T) { m.NewTimeSeries("test1", [][2]float64{{2, 0}}), } - result := executor.ValidateRule(rule, timeSeries) + result := executor.validateRule(rule, timeSeries) So(result.State, ShouldEqual, m.AlertStateOk) }) @@ -99,7 +99,7 @@ func TestAlertingExecutor(t *testing.T) { m.NewTimeSeries("test1", [][2]float64{{11, 0}}), } - result := executor.ValidateRule(rule, timeSeries) + result := executor.validateRule(rule, timeSeries) So(result.State, ShouldEqual, m.AlertStateCritical) }) })