Alerting: Use org store to read organization IDs (#99938)

This commit is contained in:
Alexander Akhmetov
2025-02-03 15:38:16 +01:00
committed by GitHub
parent a18fa4af8f
commit d6c1e3bb45
15 changed files with 104 additions and 173 deletions

View File

@ -616,7 +616,7 @@ func (ng *AlertNG) Run(ctx context.Context) error {
// Also note that this runs synchronously to ensure state is loaded
// before rule evaluation begins, hence we use ctx and not subCtx.
//
ng.stateManager.Warm(ctx, ng.store, ng.StartupInstanceReader)
ng.stateManager.Warm(ctx, ng.store, ng.store, ng.StartupInstanceReader)
children.Go(func() error {
return ng.schedule.Run(subCtx)

View File

@ -254,7 +254,7 @@ func (moa *MultiOrgAlertmanager) Run(ctx context.Context) error {
func (moa *MultiOrgAlertmanager) LoadAndSyncAlertmanagersForOrgs(ctx context.Context) error {
moa.logger.Debug("Synchronizing Alertmanagers for orgs")
// First, load all the organizations from the database.
orgIDs, err := moa.orgStore.GetOrgs(ctx)
orgIDs, err := moa.orgStore.FetchOrgIds(ctx)
if err != nil {
return err
}

View File

@ -127,7 +127,7 @@ func TestMultiOrgAlertmanager_SyncAlertmanagersForOrgsWithFailures(t *testing.T)
2: {AlertmanagerConfiguration: brokenConfig, OrgID: orgWithBadConfig},
})
orgs, err := mam.orgStore.GetOrgs(ctx)
orgs, err := mam.orgStore.FetchOrgIds(ctx)
require.NoError(t, err)
// No successfully applied configurations should be found at first.
{

View File

@ -224,7 +224,7 @@ func NewFakeOrgStore(t *testing.T, orgs []int64) *FakeOrgStore {
}
}
func (f *FakeOrgStore) GetOrgs(_ context.Context) ([]int64, error) {
func (f *FakeOrgStore) FetchOrgIds(_ context.Context) ([]int64, error) {
return f.orgs, nil
}

View File

@ -128,20 +128,21 @@ func (st *Manager) Run(ctx context.Context) error {
return nil
}
func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader, instanceReader InstanceReader) {
func (st *Manager) Warm(ctx context.Context, orgReader OrgReader, rulesReader RuleReader, instanceReader InstanceReader) {
logger := st.log.FromContext(ctx)
if st.instanceStore == nil {
logger.Info("Skip warming the state because instance store is not configured")
if orgReader == nil || rulesReader == nil || instanceReader == nil {
logger.Error("Unable to warm state cache, missing required store readers")
return
}
startTime := time.Now()
logger.Info("Warming state cache for startup")
orgIds, err := instanceReader.FetchOrgIds(ctx)
orgIds, err := orgReader.FetchOrgIds(ctx)
if err != nil {
logger.Error("Unable to fetch orgIds", "error", err)
logger.Error("Unable to warm state cache, failed to fetch org IDs", "error", err)
return
}
statesCount := 0
@ -203,7 +204,7 @@ func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader, instanceRea
if entry.ResultFingerprint != "" {
fp, err := strconv.ParseUint(entry.ResultFingerprint, 16, 64)
if err != nil {
logger.Error("Failed to parse result fingerprint of alert instance", "error", err, "ruleUID", entry.RuleUID)
logger.Error("Failed to parse result fingerprint of alert instance", "error", err, "rule_uid", entry.RuleUID)
}
resultFp = data.Fingerprint(fp)
}

View File

@ -35,6 +35,9 @@ import (
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/services/ngalert/state/historian"
"github.com/grafana/grafana/pkg/services/ngalert/tests"
alertTestUtil "github.com/grafana/grafana/pkg/services/ngalert/testutil"
"github.com/grafana/grafana/pkg/services/org"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tests/testsuite"
"github.com/grafana/grafana/pkg/util"
)
@ -49,8 +52,12 @@ func TestWarmStateCache(t *testing.T) {
ctx := context.Background()
ng, dbstore := tests.SetupTestEnv(t, 1)
const mainOrgID int64 = 1
rule := tests.CreateTestAlertRule(t, ctx, dbstore, 600, mainOrgID)
orgService, err := alertTestUtil.SetupOrgService(t, dbstore.SQLStore, setting.NewCfg())
require.NoError(t, err)
mainOrg, err := orgService.CreateWithMember(ctx, &org.CreateOrgCommand{})
require.NoError(t, err)
rule := tests.CreateTestAlertRule(t, ctx, dbstore, 600, mainOrg.ID)
expectedEntries := []*state.State{
{
@ -230,7 +237,7 @@ func TestWarmStateCache(t *testing.T) {
Log: log.New("ngalert.state.manager"),
}
st := state.NewManager(cfg, state.NewNoopPersister())
st.Warm(ctx, dbstore, ng.InstanceStore)
st.Warm(ctx, dbstore, dbstore, ng.InstanceStore)
t.Run("instance cache has expected entries", func(t *testing.T) {
for _, entry := range expectedEntries {
@ -277,7 +284,7 @@ func TestDashboardAnnotations(t *testing.T) {
"test2": "{{ $labels.instance_label }}",
})
st.Warm(ctx, dbstore, ng.InstanceStore)
st.Warm(ctx, dbstore, dbstore, ng.InstanceStore)
bValue := float64(42)
cValue := float64(1)
_ = st.ProcessEvalResults(ctx, evaluationTime, rule, eval.Results{{
@ -1699,8 +1706,12 @@ func TestStaleResultsHandler(t *testing.T) {
ctx := context.Background()
ng, dbstore := tests.SetupTestEnv(t, 1)
const mainOrgID int64 = 1
rule := tests.CreateTestAlertRule(t, ctx, dbstore, int64(interval.Seconds()), mainOrgID)
orgService, err := alertTestUtil.SetupOrgService(t, dbstore.SQLStore, setting.NewCfg())
require.NoError(t, err)
mainOrg, err := orgService.CreateWithMember(ctx, &org.CreateOrgCommand{})
require.NoError(t, err)
rule := tests.CreateTestAlertRule(t, ctx, dbstore, int64(interval.Seconds()), mainOrg.ID)
lastEval := evaluationTime.Add(-2 * interval)
labels1 := models.InstanceLabels{
@ -1813,7 +1824,7 @@ func TestStaleResultsHandler(t *testing.T) {
Log: log.New("ngalert.state.manager"),
}
st := state.NewManager(cfg, state.NewNoopPersister())
st.Warm(ctx, dbstore, ng.InstanceStore)
st.Warm(ctx, dbstore, dbstore, ng.InstanceStore)
existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID)
// We have loaded the expected number of entries from the db
@ -1980,8 +1991,12 @@ func TestDeleteStateByRuleUID(t *testing.T) {
ctx := context.Background()
ng, dbstore := tests.SetupTestEnv(t, 1)
const mainOrgID int64 = 1
rule := tests.CreateTestAlertRule(t, ctx, dbstore, int64(interval.Seconds()), mainOrgID)
orgService, err := alertTestUtil.SetupOrgService(t, dbstore.SQLStore, setting.NewCfg())
require.NoError(t, err)
mainOrg, err := orgService.CreateWithMember(ctx, &org.CreateOrgCommand{})
require.NoError(t, err)
rule := tests.CreateTestAlertRule(t, ctx, dbstore, int64(interval.Seconds()), mainOrg.ID)
labels1 := models.InstanceLabels{"test1": "testValue1"}
_, hash1, _ := labels1.StringAndHash()
@ -2073,7 +2088,7 @@ func TestDeleteStateByRuleUID(t *testing.T) {
Log: log.New("ngalert.state.manager"),
}
st := state.NewManager(cfg, state.NewNoopPersister())
st.Warm(ctx, dbstore, ng.InstanceStore)
st.Warm(ctx, dbstore, dbstore, ng.InstanceStore)
q := &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID}
alerts, _ := ng.InstanceStore.ListAlertInstances(ctx, q)
existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID)
@ -2122,8 +2137,12 @@ func TestResetStateByRuleUID(t *testing.T) {
ctx := context.Background()
ng, dbstore := tests.SetupTestEnv(t, 1)
const mainOrgID int64 = 1
rule := tests.CreateTestAlertRule(t, ctx, dbstore, int64(interval.Seconds()), mainOrgID)
orgService, err := alertTestUtil.SetupOrgService(t, dbstore.SQLStore, setting.NewCfg())
require.NoError(t, err)
mainOrg, err := orgService.CreateWithMember(ctx, &org.CreateOrgCommand{})
require.NoError(t, err)
rule := tests.CreateTestAlertRule(t, ctx, dbstore, int64(interval.Seconds()), mainOrg.ID)
labels1 := models.InstanceLabels{"test1": "testValue1"}
_, hash1, _ := labels1.StringAndHash()
@ -2214,7 +2233,7 @@ func TestResetStateByRuleUID(t *testing.T) {
Log: log.New("ngalert.state.manager"),
}
st := state.NewManager(cfg, state.NewNoopPersister())
st.Warm(ctx, dbstore, ng.InstanceStore)
st.Warm(ctx, dbstore, dbstore, ng.InstanceStore)
q := &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID}
alerts, _ := ng.InstanceStore.ListAlertInstances(ctx, q)
existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID)

View File

@ -3,8 +3,6 @@ package state
import (
"context"
"fmt"
"maps"
"slices"
"time"
"github.com/grafana/grafana/pkg/infra/log"
@ -28,30 +26,6 @@ func NewMultiInstanceReader(logger log.Logger, r1, r2 InstanceReader) *MultiInst
}
}
// FetchOrgIds merges org IDs from both readers.
func (m *MultiInstanceReader) FetchOrgIds(ctx context.Context) ([]int64, error) {
orgsOne, err := m.ProtoDBReader.FetchOrgIds(ctx)
if err != nil {
return nil, fmt.Errorf("failed to fetch org IDs from ProtoDBReader: %w", err)
}
orgsTwo, err := m.DBReader.FetchOrgIds(ctx)
if err != nil {
return nil, fmt.Errorf("failed to fetch org IDs from DBReader: %w", err)
}
orgsSet := make(map[int64]struct{})
for _, orgID := range orgsOne {
orgsSet[orgID] = struct{}{}
}
for _, orgID := range orgsTwo {
orgsSet[orgID] = struct{}{}
}
return slices.Collect(maps.Keys(orgsSet)), nil
}
// ListAlertInstances fetches alert instances for a query from both readers,
// groups them by rule UID, and returns the newest instances for each rule as a
// single slice.

View File

@ -17,84 +17,11 @@ type mockInstanceReader struct {
mock.Mock
}
func (m *mockInstanceReader) FetchOrgIds(ctx context.Context) ([]int64, error) {
args := m.Called(ctx)
return args.Get(0).([]int64), args.Error(1)
}
func (m *mockInstanceReader) ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) ([]*models.AlertInstance, error) {
args := m.Called(ctx, cmd)
return args.Get(0).([]*models.AlertInstance), args.Error(1)
}
func TestMultiInstanceReader_FetchOrgIds(t *testing.T) {
tests := []struct {
name string
mockAOrgIDs []int64
mockBOrgIDs []int64
mockAError error
mockBError error
expectedOrgIDs []int64
expectError bool
}{
{
name: "both readers empty, no errors",
mockAOrgIDs: []int64{},
mockBOrgIDs: []int64{},
expectedOrgIDs: []int64{},
},
{
name: "simple union, no errors",
mockAOrgIDs: []int64{1, 2},
mockBOrgIDs: []int64{2, 3},
expectedOrgIDs: []int64{1, 2, 3},
},
{
name: "error in readerA",
mockAError: errors.New("some error"),
expectError: true,
},
{
name: "error in readerB",
mockBError: errors.New("another error"),
expectError: true,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
readerA := &mockInstanceReader{}
if tc.mockAError != nil {
readerA.On("FetchOrgIds", mock.Anything).Return([]int64(nil), tc.mockAError).Once()
} else {
readerA.On("FetchOrgIds", mock.Anything).Return(tc.mockAOrgIDs, nil).Once()
}
readerB := &mockInstanceReader{}
if tc.mockBError != nil {
readerB.On("FetchOrgIds", mock.Anything).Return([]int64(nil), tc.mockBError).Once()
} else {
readerB.On("FetchOrgIds", mock.Anything).Return(tc.mockBOrgIDs, nil).Once()
}
multi := NewMultiInstanceReader(&logtest.Fake{}, readerA, readerB)
orgIDs, err := multi.FetchOrgIds(ctx)
if tc.expectError {
require.Error(t, err)
return
}
require.NoError(t, err)
require.ElementsMatch(t, tc.expectedOrgIDs, orgIDs)
readerA.AssertExpectations(t)
readerB.AssertExpectations(t)
})
}
}
func TestMultiInstanceReader_ListAlertInstances(t *testing.T) {
t1 := time.Unix(100, 0)
t2 := time.Unix(200, 0)

View File

@ -15,7 +15,6 @@ type InstanceStore interface {
// InstanceReader provides methods to fetch alert instances.
type InstanceReader interface {
FetchOrgIds(ctx context.Context) ([]int64, error)
ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) ([]*models.AlertInstance, error)
}
@ -29,6 +28,10 @@ type InstanceWriter interface {
FullSync(ctx context.Context, instances []models.AlertInstance, batchSize int) error
}
type OrgReader interface {
FetchOrgIds(ctx context.Context) ([]int64, error)
}
// RuleReader represents the ability to fetch alert rules.
type RuleReader interface {
ListAlertRules(ctx context.Context, query *models.ListAlertRulesQuery) (models.RulesGroup, error)

View File

@ -44,8 +44,6 @@ func (f *FakeInstanceStore) SaveAlertInstance(_ context.Context, q models.AlertI
return nil
}
func (f *FakeInstanceStore) FetchOrgIds(_ context.Context) ([]int64, error) { return []int64{}, nil }
func (f *FakeInstanceStore) DeleteAlertInstances(ctx context.Context, q ...models.AlertInstanceKey) error {
f.mtx.Lock()
defer f.mtx.Unlock()

View File

@ -96,29 +96,6 @@ func (st InstanceDBStore) SaveAlertInstance(ctx context.Context, alertInstance m
})
}
func (st InstanceDBStore) FetchOrgIds(ctx context.Context) ([]int64, error) {
orgIds := []int64{}
err := st.SQLStore.WithDbSession(ctx, func(sess *db.Session) error {
s := strings.Builder{}
params := make([]any, 0)
addToQuery := func(stmt string, p ...any) {
s.WriteString(stmt)
params = append(params, p...)
}
addToQuery("SELECT DISTINCT rule_org_id FROM alert_instance")
if err := sess.SQL(s.String(), params...).Find(&orgIds); err != nil {
return err
}
return nil
})
return orgIds, err
}
// DeleteAlertInstances deletes instances with the provided keys in a single transaction.
func (st InstanceDBStore) DeleteAlertInstances(ctx context.Context, keys ...models.AlertInstanceKey) error {
if len(keys) == 0 {

View File

@ -7,10 +7,10 @@ import (
)
type OrgStore interface {
GetOrgs(ctx context.Context) ([]int64, error)
FetchOrgIds(ctx context.Context) ([]int64, error)
}
func (st DBstore) GetOrgs(ctx context.Context) ([]int64, error) {
func (st DBstore) FetchOrgIds(ctx context.Context) ([]int64, error) {
orgs := make([]int64, 0)
err := st.SQLStore.WithDbSession(ctx, func(sess *db.Session) error {
q := "SELECT id FROM org"

View File

@ -0,0 +1,47 @@
package store
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/services/ngalert/testutil"
"github.com/grafana/grafana/pkg/services/org"
"github.com/grafana/grafana/pkg/setting"
)
func TestFetchOrgIds(t *testing.T) {
ctx := context.Background()
t.Run("returns empty result when no orgs exist", func(t *testing.T) {
sqlStore := db.InitTestDB(t)
store := &DBstore{SQLStore: sqlStore}
orgIDs, err := store.FetchOrgIds(ctx)
require.NoError(t, err)
require.Empty(t, orgIDs)
})
t.Run("returns all org IDs", func(t *testing.T) {
sqlStore := db.InitTestDB(t)
store := &DBstore{SQLStore: sqlStore}
orgService, err := testutil.SetupOrgService(t, sqlStore, setting.NewCfg())
require.NoError(t, err)
createdOrgIDs := make([]int64, 3)
for i := range 3 {
require.NoError(t, err)
newOrg, err := orgService.CreateWithMember(ctx, &org.CreateOrgCommand{Name: fmt.Sprintf("org-%d", i)})
require.NoError(t, err)
createdOrgIDs[i] = newOrg.ID
}
orgIDs, err := store.FetchOrgIds(ctx)
require.NoError(t, err)
require.ElementsMatch(t, createdOrgIDs, orgIDs)
})
}

View File

@ -93,29 +93,6 @@ func (st ProtoInstanceDBStore) SaveAlertInstance(ctx context.Context, alertInsta
return errors.New("save alert instance is not implemented for proto instance database store")
}
func (st ProtoInstanceDBStore) FetchOrgIds(ctx context.Context) ([]int64, error) {
orgIds := []int64{}
err := st.SQLStore.WithDbSession(ctx, func(sess *db.Session) error {
s := strings.Builder{}
params := make([]any, 0)
addToQuery := func(stmt string, p ...any) {
s.WriteString(stmt)
params = append(params, p...)
}
addToQuery("SELECT DISTINCT org_id FROM alert_rule_state")
if err := sess.SQL(s.String(), params...).Find(&orgIds); err != nil {
return err
}
return nil
})
return orgIds, err
}
func (st ProtoInstanceDBStore) DeleteAlertInstances(ctx context.Context, keys ...models.AlertInstanceKey) error {
logger := st.Logger.FromContext(ctx)
logger.Error("DeleteAlertInstances called and not implemented")

View File

@ -19,6 +19,8 @@ import (
"github.com/grafana/grafana/pkg/services/folder/folderimpl"
"github.com/grafana/grafana/pkg/services/folder/foldertest"
"github.com/grafana/grafana/pkg/services/guardian"
"github.com/grafana/grafana/pkg/services/org"
"github.com/grafana/grafana/pkg/services/org/orgimpl"
"github.com/grafana/grafana/pkg/services/quota/quotatest"
"github.com/grafana/grafana/pkg/services/supportbundles/supportbundlestest"
"github.com/grafana/grafana/pkg/services/tag/tagimpl"
@ -67,3 +69,9 @@ func SetupDashboardService(tb testing.TB, sqlStore db.DB, fs *folderimpl.Dashboa
return dashboardService, dashboardStore
}
func SetupOrgService(tb testing.TB, sqlStore db.DB, cfg *setting.Cfg) (org.Service, error) {
tb.Helper()
quotaService := quotatest.New(false, nil)
return orgimpl.ProvideService(sqlStore, cfg, quotaService)
}