mirror of
https://github.com/grafana/grafana.git
synced 2025-07-30 05:42:31 +08:00
Alerting: Add migration to clean up rule versions table (#102484)
* add migration to clean up rule versions * drop index right before creating a new one. * fetch only rules which version greater than toKeep
This commit is contained in:
@ -2,6 +2,8 @@ package ualert
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
@ -29,10 +31,13 @@ func AddAlertRuleGuidMigration(mg *migrator.Migrator) {
|
|||||||
Nullable: false,
|
Nullable: false,
|
||||||
Default: "''",
|
Default: "''",
|
||||||
}))
|
}))
|
||||||
mg.AddMigration("drop index in alert_rule_version table on rule_org_id, rule_uid and version columns", migrator.NewDropIndexMigration(alertRuleVersion, alertRuleVersionUDX_OrgIdRuleUIDVersion))
|
|
||||||
|
mg.AddMigration("cleanup alert_rule_version table", &cleanUpRuleVersionsMigration{})
|
||||||
|
|
||||||
mg.AddMigration("populate rule guid in alert rule table", &setRuleGuidMigration{})
|
mg.AddMigration("populate rule guid in alert rule table", &setRuleGuidMigration{})
|
||||||
|
|
||||||
|
mg.AddMigration("drop index in alert_rule_version table on rule_org_id, rule_uid and version columns", migrator.NewDropIndexMigration(alertRuleVersion, alertRuleVersionUDX_OrgIdRuleUIDVersion))
|
||||||
|
|
||||||
mg.AddMigration("add index in alert_rule_version table on rule_org_id, rule_uid, rule_guid and version columns",
|
mg.AddMigration("add index in alert_rule_version table on rule_org_id, rule_uid, rule_guid and version columns",
|
||||||
migrator.NewAddIndexMigration(alertRuleVersion,
|
migrator.NewAddIndexMigration(alertRuleVersion,
|
||||||
&migrator.Index{Cols: []string{"rule_org_id", "rule_uid", "rule_guid", "version"}, Type: migrator.UniqueIndex},
|
&migrator.Index{Cols: []string{"rule_org_id", "rule_uid", "rule_guid", "version"}, Type: migrator.UniqueIndex},
|
||||||
@ -118,3 +123,80 @@ func (c setRuleGuidMigration) Exec(sess *xorm.Session, mg *migrator.Migrator) er
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type cleanUpRuleVersionsMigration struct {
|
||||||
|
migrator.MigrationBase
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ migrator.CodeMigration = (*cleanUpRuleVersionsMigration)(nil)
|
||||||
|
|
||||||
|
func (c cleanUpRuleVersionsMigration) SQL(migrator.Dialect) string {
|
||||||
|
return codeMigration
|
||||||
|
}
|
||||||
|
|
||||||
|
func getBatchSize() int {
|
||||||
|
const defaultBatchSize = 50
|
||||||
|
envvar := os.Getenv("ALERT_RULE_VERSION_CLEANUP_MIGRATION_BATCH_SIZE")
|
||||||
|
if envvar == "" {
|
||||||
|
return defaultBatchSize
|
||||||
|
}
|
||||||
|
batchSize, err := strconv.Atoi(envvar)
|
||||||
|
if err != nil {
|
||||||
|
return defaultBatchSize
|
||||||
|
}
|
||||||
|
return batchSize
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c cleanUpRuleVersionsMigration) Exec(sess *xorm.Session, mg *migrator.Migrator) error {
|
||||||
|
var batchSize = getBatchSize()
|
||||||
|
|
||||||
|
const maxRetention = 100
|
||||||
|
toKeep := mg.Cfg.UnifiedAlerting.RuleVersionRecordLimit
|
||||||
|
if toKeep <= 0 {
|
||||||
|
mg.Logger.Info("Rule version record limit is not set, fallback to 100", "limit", toKeep)
|
||||||
|
toKeep = maxRetention
|
||||||
|
}
|
||||||
|
|
||||||
|
var rules []alertRule
|
||||||
|
err := sess.Table(alertRule{}).Select("uid, version").Where("version > ?", toKeep).Find(&rules)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
mg.Logger.Debug("Got alert rule UIDs with versions greater than retention", "count", len(rules))
|
||||||
|
batches := len(rules) / batchSize
|
||||||
|
if len(rules)%batchSize != 0 {
|
||||||
|
batches++
|
||||||
|
}
|
||||||
|
|
||||||
|
mg.Logger.Info("Cleaning up table `alert_rule_version`", "batchSize", batchSize, "batches", batches, "keepVersions", toKeep)
|
||||||
|
|
||||||
|
for i := 0; i < batches; i++ {
|
||||||
|
end := i*batchSize + batchSize
|
||||||
|
if end > len(rules) {
|
||||||
|
end = len(rules)
|
||||||
|
}
|
||||||
|
bd := strings.Builder{}
|
||||||
|
for idx, r := range rules[i*batchSize : end] {
|
||||||
|
if idx == 0 {
|
||||||
|
bd.WriteString(fmt.Sprintf("SELECT '%s' as uid, %d as version", r.UID, r.Version))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
bd.WriteString(fmt.Sprintf(" UNION ALL SELECT '%s', %d ", r.UID, r.Version))
|
||||||
|
}
|
||||||
|
_, err = sess.Exec(fmt.Sprintf(`
|
||||||
|
DELETE FROM alert_rule_version
|
||||||
|
WHERE EXISTS (
|
||||||
|
SELECT 1
|
||||||
|
FROM (%s) AR
|
||||||
|
WHERE AR.uid = alert_rule_version.rule_uid
|
||||||
|
AND alert_rule_version.version < AR.version - %d
|
||||||
|
)`, bd.String(), toKeep),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
mg.Logger.Debug(fmt.Sprintf("Batch %d of %d processed", i+1, batches))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user