mirror of
https://github.com/grafana/grafana.git
synced 2025-07-29 17:02:20 +08:00

* Cloud migrations: store snapshots in the database * update github.com/grafana/grafana-cloud-migration-snapshot to v1.9.0 * make update-workspace * use new field name in test * return error after call to fmt.Errorf * create methods for readability / fix session deletiong not deleting snapshots * remove debugging changes * update sample.ini * update tests to include OrgID in ListSnapshotsQuery * lint * lint * Update pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go Co-authored-by: Matheus Macabu <macabu@users.noreply.github.com> * remove TODO * Update pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go Co-authored-by: Matheus Macabu <macabu@users.noreply.github.com> * remove one of the debug logs --------- Co-authored-by: Matheus Macabu <macabu@users.noreply.github.com>
702 lines
23 KiB
Go
702 lines
23 KiB
Go
package cloudmigrationimpl
|
|
|
|
import (
|
|
"context"
|
|
"encoding/base64"
|
|
"fmt"
|
|
"slices"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/grafana/grafana/pkg/infra/db"
|
|
"github.com/grafana/grafana/pkg/services/cloudmigration"
|
|
"github.com/grafana/grafana/pkg/services/secrets"
|
|
secretskv "github.com/grafana/grafana/pkg/services/secrets/kvstore"
|
|
"github.com/grafana/grafana/pkg/services/sqlstore"
|
|
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
|
"github.com/grafana/grafana/pkg/util"
|
|
)
|
|
|
|
var _ store = (*sqlStore)(nil)
|
|
|
|
type sqlStore struct {
|
|
db db.DB
|
|
secretsStore secretskv.SecretsKVStore
|
|
secretsService secrets.Service
|
|
}
|
|
|
|
const (
|
|
tableName = "cloud_migration_resource"
|
|
secretType = "cloudmigration-snapshot-encryption-key"
|
|
GetAllSnapshots = -1
|
|
GetSnapshotListSortingLatest = "latest"
|
|
|
|
maxResourceBatchSize = 1000
|
|
)
|
|
|
|
func (ss *sqlStore) GetMigrationSessionByUID(ctx context.Context, orgID int64, uid string) (*cloudmigration.CloudMigrationSession, error) {
|
|
var cm cloudmigration.CloudMigrationSession
|
|
err := ss.db.WithDbSession(ctx, func(sess *db.Session) error {
|
|
exist, err := sess.Where("org_id=? AND uid=?", orgID, uid).Get(&cm)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !exist {
|
|
return cloudmigration.ErrMigrationNotFound
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := ss.decryptToken(ctx, &cm); err != nil {
|
|
return nil, fmt.Errorf("decrypting token: %w", err)
|
|
}
|
|
|
|
return &cm, err
|
|
}
|
|
|
|
func (ss *sqlStore) CreateMigrationSession(ctx context.Context, migration cloudmigration.CloudMigrationSession) (*cloudmigration.CloudMigrationSession, error) {
|
|
if err := ss.encryptToken(ctx, &migration); err != nil {
|
|
return nil, fmt.Errorf("encrypting token: %w", err)
|
|
}
|
|
|
|
err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
|
migration.Created = time.Now()
|
|
migration.Updated = time.Now()
|
|
migration.UID = util.GenerateShortUID()
|
|
|
|
_, err := sess.Insert(&migration)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &migration, nil
|
|
}
|
|
|
|
func (ss *sqlStore) GetCloudMigrationSessionList(ctx context.Context, orgID int64) ([]*cloudmigration.CloudMigrationSession, error) {
|
|
var migrations = make([]*cloudmigration.CloudMigrationSession, 0)
|
|
err := ss.db.WithDbSession(ctx, func(sess *db.Session) error {
|
|
return sess.Where("org_id=?", orgID).OrderBy("created DESC").Find(&migrations)
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for i := 0; i < len(migrations); i++ {
|
|
m := migrations[i]
|
|
|
|
if err := ss.decryptToken(ctx, m); err != nil {
|
|
return nil, fmt.Errorf("decrypting token: %w", err)
|
|
}
|
|
}
|
|
|
|
return migrations, nil
|
|
}
|
|
|
|
// DeleteMigrationSessionByUID deletes the migration session, and all the related snapshot and resources the work is done in a transaction.
|
|
func (ss *sqlStore) DeleteMigrationSessionByUID(ctx context.Context, orgID int64, uid string) (*cloudmigration.CloudMigrationSession, []cloudmigration.CloudMigrationSnapshot, error) {
|
|
var c cloudmigration.CloudMigrationSession
|
|
err := ss.db.WithDbSession(ctx, func(sess *db.Session) error {
|
|
exist, err := sess.Where("org_id=? AND uid=?", orgID, uid).Get(&c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !exist {
|
|
return cloudmigration.ErrMigrationNotFound
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// first we try to delete all the associated information to the session
|
|
q := cloudmigration.ListSnapshotsQuery{
|
|
SessionUID: uid,
|
|
Page: 1,
|
|
Limit: GetAllSnapshots,
|
|
OrgID: orgID,
|
|
}
|
|
snapshots, err := ss.GetSnapshotList(ctx, q)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("getting migration snapshots from db: %w", err)
|
|
}
|
|
|
|
err = ss.db.InTransaction(ctx, func(ctx context.Context) error {
|
|
for _, snapshot := range snapshots {
|
|
if err := ss.deleteSnapshotResources(ctx, snapshot.UID); err != nil {
|
|
return fmt.Errorf("deleting snapshot resource from db: %w", err)
|
|
}
|
|
if err := ss.deleteSnapshotPartitions(ctx, snapshot.UID); err != nil {
|
|
return fmt.Errorf("deleting snapshot partitions: %w", err)
|
|
}
|
|
if err := ss.deleteSnapshot(ctx, snapshot.UID); err != nil {
|
|
return fmt.Errorf("deleting snapshot from db: %w", err)
|
|
}
|
|
}
|
|
// and then we delete the migration sessions
|
|
err := ss.db.WithDbSession(ctx, func(sess *db.Session) error {
|
|
id := c.ID
|
|
affected, err := sess.Delete(&cloudmigration.CloudMigrationSession{
|
|
ID: id,
|
|
})
|
|
if affected == 0 {
|
|
return cloudmigration.ErrMigrationNotDeleted
|
|
}
|
|
return err
|
|
})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("deleting migration from db: %w", err)
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
if err := ss.decryptToken(ctx, &c); err != nil {
|
|
return nil, nil, fmt.Errorf("decrypting token: %w", err)
|
|
}
|
|
|
|
return &c, snapshots, nil
|
|
}
|
|
|
|
func (ss *sqlStore) CreateSnapshot(ctx context.Context, snapshot cloudmigration.CloudMigrationSnapshot) error {
|
|
if snapshot.SessionUID == "" {
|
|
return fmt.Errorf("sessionUID is required")
|
|
}
|
|
if snapshot.UID == "" {
|
|
return fmt.Errorf("snapshot uid is required")
|
|
}
|
|
|
|
if err := ss.secretsStore.Set(ctx, secretskv.AllOrganizations, snapshot.UID, secretType, string(snapshot.GMSPublicKey)); err != nil {
|
|
return err
|
|
}
|
|
|
|
err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
|
snapshot.Created = time.Now()
|
|
snapshot.Updated = time.Now()
|
|
|
|
_, err := sess.InsertOne(&snapshot)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateSnapshot takes a command containing a snapshot uid and any updates to apply to the snapshot.
|
|
// When performing multiple updates at once (e.g. updating the status and local resources), they are executed in separate transactions in order to batch insert large datasets.
|
|
// The status is the last thing updated, as its status ultimately determines the behavior of the API.
|
|
func (ss *sqlStore) UpdateSnapshot(ctx context.Context, update cloudmigration.UpdateSnapshotCmd) error {
|
|
if update.UID == "" {
|
|
return fmt.Errorf("missing snapshot uid")
|
|
}
|
|
if update.SessionID == "" {
|
|
return fmt.Errorf("missing session uid")
|
|
}
|
|
|
|
// If local resources are set, it means we have to create them for the first time
|
|
if len(update.LocalResourcesToCreate) > 0 {
|
|
if err := ss.CreateSnapshotResources(ctx, update.UID, update.LocalResourcesToCreate); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// If cloud resources are set, it means we have to update our resource local state
|
|
if len(update.CloudResourcesToUpdate) > 0 {
|
|
if err := ss.UpdateSnapshotResources(ctx, update.UID, update.CloudResourcesToUpdate); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Update the snapshot status if set
|
|
if update.Status != "" {
|
|
if err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
|
rawSQL := "UPDATE cloud_migration_snapshot SET status=? WHERE session_uid=? AND uid=?"
|
|
if _, err := sess.Exec(rawSQL, update.Status, update.SessionID, update.UID); err != nil {
|
|
return fmt.Errorf("updating snapshot status for uid %s: %w", update.UID, err)
|
|
}
|
|
return nil
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if update.PublicKey != nil {
|
|
if err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
|
rawSQL := "UPDATE cloud_migration_snapshot SET public_key=? WHERE session_uid=? AND uid=?"
|
|
if _, err := sess.Exec(rawSQL, update.PublicKey, update.SessionID, update.UID); err != nil {
|
|
return fmt.Errorf("updating snapshot public key for uid %s: %w", update.UID, err)
|
|
}
|
|
return nil
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ss *sqlStore) StorePartition(ctx context.Context, snapshotUID string, resourceType string, partitionNumber int, data []byte) error {
|
|
return ss.db.InTransaction(ctx, func(ctx context.Context) error {
|
|
return ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
|
_, err := sess.Insert(cloudmigration.CloudMigrationSnapshotPartition{
|
|
SnapshotUID: snapshotUID,
|
|
ResourceType: resourceType,
|
|
PartitionNumber: partitionNumber,
|
|
Data: data,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("inserting snapshot partition into database: %w", err)
|
|
}
|
|
return nil
|
|
})
|
|
})
|
|
}
|
|
|
|
func (ss *sqlStore) GetIndex(ctx context.Context, orgID int64, sessionUID string, snapshotUID string) (cloudmigration.CloudMigrationSnapshotIndex, error) {
|
|
var snap *cloudmigration.CloudMigrationSnapshot
|
|
partitions := make([]cloudmigration.CloudMigrationSnapshotPartition, 0)
|
|
|
|
if err := ss.db.InTransaction(ctx, func(ctx context.Context) error {
|
|
return ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
|
s, err := ss.getSnapshotByUID(ctx, orgID, sessionUID, snapshotUID)
|
|
if err != nil {
|
|
return fmt.Errorf("fetching snapshot from database: %w", err)
|
|
}
|
|
|
|
snap = s
|
|
if err := sess.OrderBy("cloud_migration_snapshot_partition.resource_type,cloud_migration_snapshot_partition.partition_number ASC").Find(&partitions, &cloudmigration.CloudMigrationSnapshotPartition{SnapshotUID: snapshotUID}); err != nil {
|
|
return fmt.Errorf("fetching partition from database: %w", err)
|
|
}
|
|
if secret, found, err := ss.secretsStore.Get(ctx, secretskv.AllOrganizations, snap.UID, secretType); err != nil {
|
|
return err
|
|
} else if !found {
|
|
return fmt.Errorf("encryption key not found for snapshot with UID %s", snap.UID)
|
|
} else {
|
|
snap.GMSPublicKey = []byte(secret)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}); err != nil {
|
|
return cloudmigration.CloudMigrationSnapshotIndex{}, err
|
|
}
|
|
|
|
partitionsByResourceType := make(map[string][]int)
|
|
for _, partition := range partitions {
|
|
partitionsByResourceType[partition.ResourceType] = append(partitionsByResourceType[partition.ResourceType], partition.PartitionNumber)
|
|
}
|
|
|
|
return cloudmigration.CloudMigrationSnapshotIndex{
|
|
EncryptionAlgo: snap.EncryptionAlgo,
|
|
PublicKey: snap.PublicKey,
|
|
Metadata: snap.Metadata,
|
|
Items: partitionsByResourceType,
|
|
}, nil
|
|
}
|
|
|
|
func (ss *sqlStore) GetPartition(ctx context.Context, snapshotUID string, resourceType string, partitionNumber int) (cloudmigration.CloudMigrationSnapshotPartition, error) {
|
|
var partition cloudmigration.CloudMigrationSnapshotPartition
|
|
|
|
err := ss.db.InTransaction(ctx, func(ctx context.Context) error {
|
|
return ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
|
if _, err := sess.Where("snapshot_uid = ? AND resource_type = ? AND partition_number = ?", snapshotUID, resourceType, partitionNumber).Get(&partition); err != nil {
|
|
return fmt.Errorf("fetching partition from database: %w", err)
|
|
}
|
|
return nil
|
|
})
|
|
})
|
|
|
|
return partition, err
|
|
}
|
|
|
|
func (ss *sqlStore) deleteSnapshot(ctx context.Context, snapshotUid string) error {
|
|
return ss.db.InTransaction(ctx, func(ctx context.Context) error {
|
|
return ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
|
if _, err := sess.Delete(cloudmigration.CloudMigrationSnapshot{
|
|
UID: snapshotUid,
|
|
}); err != nil {
|
|
return fmt.Errorf("deleting snapshot: %w", err)
|
|
}
|
|
return nil
|
|
})
|
|
})
|
|
}
|
|
|
|
func (ss *sqlStore) getSnapshotByUID(ctx context.Context, orgID int64, sessionUID string, snapshotUID string) (*cloudmigration.CloudMigrationSnapshot, error) {
|
|
session, err := ss.GetMigrationSessionByUID(ctx, orgID, sessionUID)
|
|
if err != nil || session == nil {
|
|
return nil, err
|
|
}
|
|
|
|
// now we get the snapshot
|
|
var snapshot cloudmigration.CloudMigrationSnapshot
|
|
err = ss.db.WithDbSession(ctx, func(sess *db.Session) error {
|
|
exist, err := sess.Where("session_uid=? AND uid=?", sessionUID, snapshotUID).Get(&snapshot)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !exist {
|
|
return cloudmigration.ErrSnapshotNotFound
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &snapshot, nil
|
|
}
|
|
|
|
func (ss *sqlStore) GetSnapshotByUID(ctx context.Context, orgID int64, sessionUid, uid string, params cloudmigration.SnapshotResultQueryParams) (*cloudmigration.CloudMigrationSnapshot, error) {
|
|
// first we check if the session exists, using orgId and sessionUid
|
|
session, err := ss.GetMigrationSessionByUID(ctx, orgID, sessionUid)
|
|
if err != nil || session == nil {
|
|
return nil, err
|
|
}
|
|
|
|
// now we get the snapshot
|
|
var snapshot cloudmigration.CloudMigrationSnapshot
|
|
err = ss.db.WithDbSession(ctx, func(sess *db.Session) error {
|
|
exist, err := sess.Where("session_uid=? AND uid=?", sessionUid, uid).Get(&snapshot)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !exist {
|
|
return cloudmigration.ErrSnapshotNotFound
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if secret, found, err := ss.secretsStore.Get(ctx, secretskv.AllOrganizations, snapshot.UID, secretType); err != nil {
|
|
return &snapshot, err
|
|
} else if !found {
|
|
return &snapshot, fmt.Errorf("encryption key not found for snapshot with UID %s", snapshot.UID)
|
|
} else {
|
|
snapshot.GMSPublicKey = []byte(secret)
|
|
}
|
|
|
|
resources, err := ss.getSnapshotResources(ctx, uid, params)
|
|
if err == nil {
|
|
snapshot.Resources = resources
|
|
}
|
|
stats, err := ss.getSnapshotResourceStats(ctx, uid)
|
|
if err == nil {
|
|
snapshot.StatsRollup = *stats
|
|
}
|
|
|
|
return &snapshot, err
|
|
}
|
|
|
|
// GetSnapshotList returns snapshots without resources included. Use GetSnapshotByUID to get individual snapshot results.
|
|
// passing GetAllSnapshots will return all the elements regardless of the page
|
|
func (ss *sqlStore) GetSnapshotList(ctx context.Context, query cloudmigration.ListSnapshotsQuery) ([]cloudmigration.CloudMigrationSnapshot, error) {
|
|
if query.OrgID == 0 {
|
|
return nil, fmt.Errorf("org id is required")
|
|
}
|
|
if query.SessionUID == "" {
|
|
return nil, fmt.Errorf("session uid is required")
|
|
}
|
|
var snapshots = make([]cloudmigration.CloudMigrationSnapshot, 0)
|
|
err := ss.db.WithDbSession(ctx, func(sess *db.Session) error {
|
|
sess.Join("INNER", "cloud_migration_session",
|
|
"cloud_migration_session.uid = cloud_migration_snapshot.session_uid AND cloud_migration_session.org_id = ?", query.OrgID,
|
|
)
|
|
if query.Limit != GetAllSnapshots {
|
|
offset := (query.Page - 1) * query.Limit
|
|
sess.Limit(query.Limit, offset)
|
|
}
|
|
if query.Sort == GetSnapshotListSortingLatest {
|
|
sess.OrderBy("cloud_migration_snapshot.created DESC")
|
|
}
|
|
return sess.Find(&snapshots, &cloudmigration.CloudMigrationSnapshot{
|
|
SessionUID: query.SessionUID,
|
|
})
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for i, snapshot := range snapshots {
|
|
if secret, found, err := ss.secretsStore.Get(ctx, secretskv.AllOrganizations, snapshot.UID, secretType); err != nil {
|
|
return nil, err
|
|
} else if !found {
|
|
return nil, fmt.Errorf("encryption key not found for snapshot with UID %s", snapshot.UID)
|
|
} else {
|
|
snapshot.GMSPublicKey = []byte(secret)
|
|
}
|
|
|
|
if stats, err := ss.getSnapshotResourceStats(ctx, snapshot.UID); err != nil {
|
|
return nil, err
|
|
} else {
|
|
snapshot.StatsRollup = *stats
|
|
}
|
|
snapshots[i] = snapshot
|
|
}
|
|
return snapshots, nil
|
|
}
|
|
|
|
// CreateSnapshotResources initializes the local state of a resources belonging to a snapshot
|
|
// Inserting large enough datasets causes SQL errors, so we batch the inserts
|
|
func (ss *sqlStore) CreateSnapshotResources(ctx context.Context, snapshotUid string, resources []cloudmigration.CloudMigrationResource) error {
|
|
for chunk := range slices.Chunk(resources, maxResourceBatchSize) {
|
|
if err := ss.createSnapshotResources(ctx, snapshotUid, chunk); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ss *sqlStore) createSnapshotResources(ctx context.Context, snapshotUid string, resources []cloudmigration.CloudMigrationResource) error {
|
|
for i := 0; i < len(resources); i++ {
|
|
resources[i].UID = util.GenerateShortUID()
|
|
// ensure snapshot_uids are consistent so that we can use in conjunction with refID for lookup later
|
|
resources[i].SnapshotUID = snapshotUid
|
|
}
|
|
|
|
err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
|
_, err := sess.Insert(resources)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("creating resources: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// UpdateSnapshotResources updates a migration resource for a snapshot, using snapshot_uid + resource_uid as a lookup
|
|
// It does preprocessing on the results in order to minimize the sql queries executed.
|
|
// Updating large enough datasets causes SQL errors, so we batch the updates
|
|
func (ss *sqlStore) UpdateSnapshotResources(ctx context.Context, snapshotUid string, resources []cloudmigration.CloudMigrationResource) error {
|
|
for chunk := range slices.Chunk(resources, maxResourceBatchSize) {
|
|
if err := ss.updateSnapshotResources(ctx, snapshotUid, chunk); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ss *sqlStore) updateSnapshotResources(ctx context.Context, snapshotUid string, resources []cloudmigration.CloudMigrationResource) error {
|
|
// refIds of resources that migrated successfully in order to update in bulk
|
|
okIds := make([]any, 0, len(resources))
|
|
|
|
// group any failed resources by errCode and errStr
|
|
type errId struct {
|
|
errCode cloudmigration.ResourceErrorCode
|
|
errStr string
|
|
}
|
|
errorIds := make(map[errId][]any)
|
|
|
|
for _, r := range resources {
|
|
switch r.Status {
|
|
case cloudmigration.ItemStatusPending:
|
|
// Do nothing. A pending item should not be updated, as it is still in progress.
|
|
case cloudmigration.ItemStatusOK:
|
|
okIds = append(okIds, r.RefID)
|
|
case cloudmigration.ItemStatusError:
|
|
key := errId{errCode: r.ErrorCode, errStr: r.Error}
|
|
if ids, ok := errorIds[key]; ok {
|
|
errorIds[key] = append(ids, r.RefID)
|
|
} else {
|
|
errorIds[key] = []any{r.RefID}
|
|
}
|
|
}
|
|
}
|
|
|
|
type statement struct {
|
|
sql string
|
|
args []any
|
|
}
|
|
|
|
// Prepare a sql statement for all of the OK statuses
|
|
var okUpdateStatement *statement
|
|
if len(okIds) > 0 {
|
|
okUpdateStatement = &statement{
|
|
sql: fmt.Sprintf("UPDATE cloud_migration_resource SET status=? WHERE snapshot_uid=? AND resource_uid IN (?%s)", strings.Repeat(", ?", len(okIds)-1)),
|
|
args: append([]any{cloudmigration.ItemStatusOK, snapshotUid}, okIds...),
|
|
}
|
|
}
|
|
|
|
// Prepare however many sql statements are necessary for the error statuses
|
|
errorStatements := make([]statement, 0, len(errorIds))
|
|
for k, ids := range errorIds {
|
|
errorStatements = append(errorStatements, statement{
|
|
sql: fmt.Sprintf("UPDATE cloud_migration_resource SET status=?, error_code=?, error_string=? WHERE snapshot_uid=? AND resource_uid IN (?%s)", strings.Repeat(", ?", len(ids)-1)),
|
|
args: append([]any{cloudmigration.ItemStatusError, k.errCode, k.errStr, snapshotUid}, ids...),
|
|
})
|
|
}
|
|
|
|
// Execute the minimum number of required statements!
|
|
return ss.db.InTransaction(ctx, func(ctx context.Context) error {
|
|
err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
|
if okUpdateStatement != nil {
|
|
if _, err := sess.Exec(append([]any{okUpdateStatement.sql}, okUpdateStatement.args...)...); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for _, q := range errorStatements {
|
|
if _, err := sess.Exec(append([]any{q.sql}, q.args...)...); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("updating resources: %w", err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (ss *sqlStore) getSnapshotResources(ctx context.Context, snapshotUid string, params cloudmigration.SnapshotResultQueryParams) ([]cloudmigration.CloudMigrationResource, error) {
|
|
page, limit, col, dir, errorsOnly := int(params.ResultPage), int(params.ResultLimit), string(params.SortColumn), string(params.SortOrder), params.ErrorsOnly
|
|
|
|
var resources []cloudmigration.CloudMigrationResource
|
|
err := ss.db.WithDbSession(ctx, func(sess *db.Session) error {
|
|
offset := (page - 1) * limit
|
|
sess.Limit(limit, offset)
|
|
if errorsOnly {
|
|
sess.Where("status = ?", cloudmigration.ItemStatusError)
|
|
}
|
|
// TODO: It would be better if the query builder supported a case-insensitive flag for the .OrderBy() method
|
|
orderByClause := fmt.Sprintf("lower(%s) %s", col, dir)
|
|
if ss.db.GetDBType() == migrator.Postgres || // Postgres does not support lower() in ORDER BY -- sorts by case-insensitive by default
|
|
params.SortColumn == cloudmigration.SortColumnID { // Don't apply a string sort to a numeric column
|
|
orderByClause = fmt.Sprintf("%s %s", col, dir)
|
|
}
|
|
return sess.OrderBy(orderByClause).Find(&resources, &cloudmigration.CloudMigrationResource{
|
|
SnapshotUID: snapshotUid,
|
|
})
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resources, nil
|
|
}
|
|
|
|
func (ss *sqlStore) getSnapshotResourceStats(ctx context.Context, snapshotUid string) (*cloudmigration.SnapshotResourceStats, error) {
|
|
typeCounts := make([]struct {
|
|
Count int `json:"count"`
|
|
Type string `json:"type"`
|
|
}, 0)
|
|
statusCounts := make([]struct {
|
|
Count int `json:"count"`
|
|
Status string `json:"status"`
|
|
}, 0)
|
|
total := 0
|
|
err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
|
if t, err := sess.Count(cloudmigration.CloudMigrationResource{SnapshotUID: snapshotUid}); err != nil {
|
|
return err
|
|
} else {
|
|
total = int(t)
|
|
}
|
|
sess.Select("count(uid) as \"count\", resource_type as \"type\"").
|
|
Table(tableName).
|
|
GroupBy("type").
|
|
Where("snapshot_uid = ?", snapshotUid)
|
|
if err := sess.Find(&typeCounts); err != nil {
|
|
return err
|
|
}
|
|
sess.Select("count(uid) as \"count\", status").
|
|
Table(tableName).
|
|
GroupBy("status").
|
|
Where("snapshot_uid = ?", snapshotUid)
|
|
return sess.Find(&statusCounts)
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
stats := &cloudmigration.SnapshotResourceStats{
|
|
CountsByType: make(map[cloudmigration.MigrateDataType]int, len(typeCounts)),
|
|
CountsByStatus: make(map[cloudmigration.ItemStatus]int, len(statusCounts)),
|
|
Total: total,
|
|
}
|
|
for _, c := range typeCounts {
|
|
stats.CountsByType[cloudmigration.MigrateDataType(c.Type)] = c.Count
|
|
}
|
|
for _, c := range statusCounts {
|
|
stats.CountsByStatus[cloudmigration.ItemStatus(c.Status)] = c.Count
|
|
}
|
|
return stats, nil
|
|
}
|
|
|
|
func (ss *sqlStore) deleteSnapshotResources(ctx context.Context, snapshotUid string) error {
|
|
return ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
|
_, err := sess.Delete(cloudmigration.CloudMigrationResource{
|
|
SnapshotUID: snapshotUid,
|
|
})
|
|
return err
|
|
})
|
|
}
|
|
|
|
func (ss *sqlStore) deleteSnapshotPartitions(ctx context.Context, snapshotUid string) error {
|
|
return ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
|
if _, err := sess.Delete(cloudmigration.CloudMigrationSnapshotPartition{
|
|
SnapshotUID: snapshotUid,
|
|
}); err != nil {
|
|
return fmt.Errorf("deleting snapshot partitions: %w", err)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (ss *sqlStore) encryptToken(ctx context.Context, cm *cloudmigration.CloudMigrationSession) error {
|
|
s, err := ss.secretsService.Encrypt(ctx, []byte(cm.AuthToken), secrets.WithoutScope())
|
|
if err != nil {
|
|
return fmt.Errorf("encrypting auth token: %w", err)
|
|
}
|
|
|
|
cm.AuthToken = base64.StdEncoding.EncodeToString(s)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ss *sqlStore) decryptToken(ctx context.Context, cm *cloudmigration.CloudMigrationSession) error {
|
|
if cm == nil {
|
|
return fmt.Errorf("unable to decypt token because migration session was not found: %w", cloudmigration.ErrMigrationNotFound)
|
|
}
|
|
|
|
if len(cm.AuthToken) == 0 {
|
|
return fmt.Errorf("unable to decrypt token because token is empty: %w", cloudmigration.ErrTokenNotFound)
|
|
}
|
|
|
|
decoded, err := base64.StdEncoding.DecodeString(cm.AuthToken)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to base64 decode token: %w", err)
|
|
}
|
|
|
|
t, err := ss.secretsService.Decrypt(ctx, decoded)
|
|
if err != nil {
|
|
return fmt.Errorf("decrypting auth token: %w", err)
|
|
}
|
|
|
|
cm.AuthToken = string(t)
|
|
|
|
return nil
|
|
}
|