Bruno b1592b5e36 Cloud migrations: store snapshots in the database (#108551)
* Cloud migrations: store snapshots in the database

* update github.com/grafana/grafana-cloud-migration-snapshot to v1.9.0

* make update-workspace

* use new field name in test

* return error after call to fmt.Errorf

* create methods for readability / fix session deletion not deleting snapshots

* remove debugging changes

* update sample.ini

* update tests to include OrgID in ListSnapshotsQuery

* lint

* lint

* Update pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go

Co-authored-by: Matheus Macabu <macabu@users.noreply.github.com>

* remove TODO

* Update pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go

Co-authored-by: Matheus Macabu <macabu@users.noreply.github.com>

* remove one of the debug logs

---------

Co-authored-by: Matheus Macabu <macabu@users.noreply.github.com>
2025-07-25 11:41:21 -03:00

package cloudmigrationimpl
import (
"bytes"
"context"
cryptoRand "crypto/rand"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"slices"
"sort"
"time"
"go.opentelemetry.io/otel/codes"
"golang.org/x/crypto/nacl/box"
snapshot "github.com/grafana/grafana-cloud-migration-snapshot/src"
"github.com/grafana/grafana-cloud-migration-snapshot/src/contracts"
"github.com/grafana/grafana-cloud-migration-snapshot/src/infra/crypto"
"github.com/grafana/grafana/pkg/infra/tracing"
plugins "github.com/grafana/grafana/pkg/plugins"
ac "github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/cloudmigration"
"github.com/grafana/grafana/pkg/services/dashboards"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/folder"
libraryelements "github.com/grafana/grafana/pkg/services/libraryelements/model"
"github.com/grafana/grafana/pkg/services/org"
"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginaccesscontrol"
"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginsettings"
"github.com/grafana/grafana/pkg/services/user"
"github.com/grafana/grafana/pkg/util/retryer"
)
var currentMigrationTypes = []cloudmigration.MigrateDataType{
cloudmigration.DatasourceDataType,
cloudmigration.FolderDataType,
cloudmigration.LibraryElementDataType,
cloudmigration.DashboardDataType,
cloudmigration.MuteTimingType,
cloudmigration.NotificationTemplateType,
cloudmigration.ContactPointType,
cloudmigration.NotificationPolicyType,
cloudmigration.AlertRuleGroupType,
cloudmigration.AlertRuleType,
cloudmigration.PluginDataType,
}
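// Note: buildSnapshotWithDBStorage and buildSnapshotWithFSStorage range over
// this slice, so snapshot partitions are always written in this fixed order:
// data sources first, plugins last.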
//nolint:gocyclo
func (s *Service) getMigrationDataJSON(ctx context.Context, signedInUser *user.SignedInUser, resourceTypes cloudmigration.ResourceTypes) (*cloudmigration.MigrateDataRequest, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.getMigrationDataJSON")
defer span.End()
migrationDataSlice := make([]cloudmigration.MigrateDataRequestItem, 0)
folderHierarchy := make(map[cloudmigration.MigrateDataType]map[string]string, 0)
// Plugins
if resourceTypes.Has(cloudmigration.PluginDataType) {
plugins, err := s.getPlugins(ctx, signedInUser)
if err != nil {
s.log.Error("Failed to get plugins", "err", err)
return nil, err
}
for _, plugin := range plugins {
migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItem{
Type: cloudmigration.PluginDataType,
RefID: plugin.ID,
Name: plugin.Name,
Data: plugin.SettingCmd,
})
}
}
// Data sources
if resourceTypes.Has(cloudmigration.DatasourceDataType) {
dataSources, err := s.getDataSourceCommands(ctx, signedInUser)
if err != nil {
s.log.Error("Failed to get datasources", "err", err)
return nil, err
}
for _, ds := range dataSources {
migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItem{
Type: cloudmigration.DatasourceDataType,
RefID: ds.UID,
Name: ds.Name,
Data: ds,
})
}
}
// Dashboards & Folders: linked via the schema, so we need to get both
if resourceTypes.Has(cloudmigration.DashboardDataType) || resourceTypes.Has(cloudmigration.FolderDataType) {
dashs, folders, err := s.getDashboardAndFolderCommands(ctx, signedInUser)
if err != nil {
s.log.Error("Failed to get dashboards and folders", "err", err)
return nil, err
}
if resourceTypes.Has(cloudmigration.DashboardDataType) {
folderHierarchy[cloudmigration.DashboardDataType] = make(map[string]string, 0)
for _, dashboard := range dashs {
dashboard.Data.Del("id")
migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItem{
Type: cloudmigration.DashboardDataType,
RefID: dashboard.UID,
Name: dashboard.Title,
Data: dashboards.SaveDashboardCommand{
Dashboard: dashboard.Data,
Overwrite: true, // currently only intended to be a push, not a sync; revisit during the preview
Message: fmt.Sprintf("Created via the Grafana Cloud Migration Assistant by on-prem user \"%s\"", signedInUser.Login),
IsFolder: false,
FolderUID: dashboard.FolderUID,
},
})
folderHierarchy[cloudmigration.DashboardDataType][dashboard.UID] = dashboard.FolderUID
}
}
if resourceTypes.Has(cloudmigration.FolderDataType) {
folderHierarchy[cloudmigration.FolderDataType] = make(map[string]string, 0)
folders = sortFolders(folders)
for _, f := range folders {
migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItem{
Type: cloudmigration.FolderDataType,
RefID: f.UID,
Name: f.Title,
Data: f,
})
folderHierarchy[cloudmigration.FolderDataType][f.UID] = f.ParentUID
}
}
}
// Library Elements
if resourceTypes.Has(cloudmigration.LibraryElementDataType) {
libraryElements, err := s.getLibraryElementsCommands(ctx, signedInUser)
if err != nil {
s.log.Error("Failed to get library elements", "err", err)
return nil, err
}
folderHierarchy[cloudmigration.LibraryElementDataType] = make(map[string]string, 0)
for _, libraryElement := range libraryElements {
migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItem{
Type: cloudmigration.LibraryElementDataType,
RefID: libraryElement.UID,
Name: libraryElement.Name,
Data: libraryElement,
})
if libraryElement.FolderUID != nil {
folderHierarchy[cloudmigration.LibraryElementDataType][libraryElement.UID] = *libraryElement.FolderUID
}
}
}
// Alerts: Mute Timings
if resourceTypes.Has(cloudmigration.MuteTimingType) {
muteTimings, err := s.getAlertMuteTimings(ctx, signedInUser)
if err != nil {
s.log.Error("Failed to get alert mute timings", "err", err)
return nil, err
}
for _, muteTiming := range muteTimings {
migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItem{
Type: cloudmigration.MuteTimingType,
RefID: muteTiming.UID,
Name: muteTiming.Name,
Data: muteTiming,
})
}
}
// Alerts: Notification Templates
if resourceTypes.Has(cloudmigration.NotificationTemplateType) {
notificationTemplates, err := s.getNotificationTemplates(ctx, signedInUser)
if err != nil {
s.log.Error("Failed to get alert notification templates", "err", err)
return nil, err
}
for _, notificationTemplate := range notificationTemplates {
migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItem{
Type: cloudmigration.NotificationTemplateType,
RefID: notificationTemplate.UID,
Name: notificationTemplate.Name,
Data: notificationTemplate,
})
}
}
// Alerts: Contact Points
if resourceTypes.Has(cloudmigration.ContactPointType) {
contactPoints, err := s.getContactPoints(ctx, signedInUser)
if err != nil {
s.log.Error("Failed to get alert contact points", "err", err)
return nil, err
}
for _, contactPoint := range contactPoints {
migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItem{
Type: cloudmigration.ContactPointType,
RefID: contactPoint.UID,
Name: contactPoint.Name,
Data: contactPoint,
})
}
}
// Alerts: Notification Policies
if resourceTypes.Has(cloudmigration.NotificationPolicyType) {
notificationPolicies, err := s.getNotificationPolicies(ctx, signedInUser)
if err != nil {
s.log.Error("Failed to get alert notification policies", "err", err)
return nil, err
}
if len(notificationPolicies.Name) > 0 {
// Notification Policy can only be managed by updating its entire tree, so we send the whole thing as one item.
migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItem{
Type: cloudmigration.NotificationPolicyType,
RefID: notificationPolicies.Name, // no UID available
Name: notificationPolicies.Name,
Data: notificationPolicies.Routes,
})
}
}
// Alerts: Alert Rule Groups
if resourceTypes.Has(cloudmigration.AlertRuleGroupType) {
alertRuleGroups, err := s.getAlertRuleGroups(ctx, signedInUser)
if err != nil {
s.log.Error("Failed to get alert rule groups", "err", err)
return nil, err
}
for _, alertRuleGroup := range alertRuleGroups {
migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItem{
Type: cloudmigration.AlertRuleGroupType,
RefID: alertRuleGroup.Title, // no UID available
Name: alertRuleGroup.Title,
Data: alertRuleGroup,
})
}
}
// Alerts: Alert Rules
if resourceTypes.Has(cloudmigration.AlertRuleType) {
alertRules, err := s.getAlertRules(ctx, signedInUser)
if err != nil {
s.log.Error("Failed to get alert rules", "err", err)
return nil, err
}
folderHierarchy[cloudmigration.AlertRuleType] = make(map[string]string, 0)
for _, alertRule := range alertRules {
migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItem{
Type: cloudmigration.AlertRuleType,
RefID: alertRule.UID,
Name: alertRule.Title,
Data: alertRule,
})
folderHierarchy[cloudmigration.AlertRuleType][alertRule.UID] = alertRule.FolderUID
}
}
// Obtain the names of parent elements for data types that have folders.
parentNamesByType, err := s.getParentNames(ctx, signedInUser, folderHierarchy)
if err != nil {
s.log.Error("Failed to get parent folder names", "err", err)
}
migrationData := &cloudmigration.MigrateDataRequest{
Items: migrationDataSlice,
ItemParentNames: parentNamesByType,
}
return migrationData, nil
}
func (s *Service) getDataSourceCommands(ctx context.Context, signedInUser *user.SignedInUser) ([]datasources.AddDataSourceCommand, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.getDataSourceCommands")
defer span.End()
dataSources, err := s.dsService.GetDataSources(ctx, &datasources.GetDataSourcesQuery{OrgID: signedInUser.GetOrgID()})
if err != nil {
s.log.Error("Failed to get all datasources", "err", err)
return nil, err
}
result := make([]datasources.AddDataSourceCommand, 0, len(dataSources))
for _, dataSource := range dataSources {
// Decrypt secure json to send raw credentials
decryptedData, err := s.secretsService.DecryptJsonData(ctx, dataSource.SecureJsonData)
if err != nil {
s.log.Error("Failed to decrypt secure json data", "err", err)
return nil, err
}
dataSourceCmd := datasources.AddDataSourceCommand{
OrgID: dataSource.OrgID,
Name: dataSource.Name,
Type: dataSource.Type,
Access: dataSource.Access,
URL: dataSource.URL,
User: dataSource.User,
Database: dataSource.Database,
BasicAuth: dataSource.BasicAuth,
BasicAuthUser: dataSource.BasicAuthUser,
WithCredentials: dataSource.WithCredentials,
IsDefault: dataSource.IsDefault,
JsonData: dataSource.JsonData,
SecureJsonData: decryptedData,
ReadOnly: dataSource.ReadOnly,
UID: dataSource.UID,
}
result = append(result, dataSourceCmd)
}
return result, nil
}
// getDashboardAndFolderCommands returns the json payloads required by the dashboard and folder creation APIs
func (s *Service) getDashboardAndFolderCommands(ctx context.Context, signedInUser *user.SignedInUser) ([]dashboards.Dashboard, []folder.CreateFolderCommand, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.getDashboardAndFolderCommands")
defer span.End()
dashs, err := s.dashboardService.GetAllDashboardsByOrgId(ctx, signedInUser.GetOrgID())
if err != nil {
return nil, nil, err
}
dashboardCmds := make([]dashboards.Dashboard, 0)
folderUids := make([]string, 0)
// Folders need to be fetched by UID in a separate step, so first separate dashboards from folders
// If any result is in the trash bin, don't migrate it
for _, d := range dashs {
if d.IsFolder {
folderUids = append(folderUids, d.UID)
} else {
dashboardCmds = append(dashboardCmds, *d)
}
}
folders, err := s.folderService.GetFolders(ctx, folder.GetFoldersQuery{
UIDs: folderUids,
SignedInUser: signedInUser,
OrgID: signedInUser.GetOrgID(),
WithFullpathUIDs: true,
})
if err != nil {
return nil, nil, err
}
folderCmds := make([]folder.CreateFolderCommand, 0, len(folders))
for _, f := range folders {
folderCmds = append(folderCmds, folder.CreateFolderCommand{
UID: f.UID,
Title: f.Title,
Description: f.Description,
ParentUID: f.ParentUID,
})
}
return dashboardCmds, folderCmds, nil
}
type libraryElement struct {
FolderUID *string `json:"folderUid"`
Name string `json:"name"`
UID string `json:"uid"`
Model json.RawMessage `json:"model"`
Kind int64 `json:"kind"`
}
// getLibraryElementsCommands returns the json payloads required by the library elements creation API
func (s *Service) getLibraryElementsCommands(ctx context.Context, signedInUser *user.SignedInUser) ([]libraryElement, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.getLibraryElementsCommands")
defer span.End()
const perPage = 100
cmds := make([]libraryElement, 0)
page := 1
count := 0
for {
query := libraryelements.SearchLibraryElementsQuery{
PerPage: perPage,
Page: page,
}
libraryElements, err := s.libraryElementsService.GetAllElements(ctx, signedInUser, query)
if err != nil {
return nil, fmt.Errorf("failed to get all library elements: %w", err)
}
for _, element := range libraryElements.Elements {
var folderUID *string
if len(element.FolderUID) > 0 {
folderUID = &element.FolderUID
}
cmds = append(cmds, libraryElement{
FolderUID: folderUID,
Name: element.Name,
Model: element.Model,
Kind: element.Kind,
UID: element.UID,
})
}
page += 1
count += libraryElements.PerPage
if len(libraryElements.Elements) == 0 || count >= int(libraryElements.TotalCount) {
break
}
}
return cmds, nil
}
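// For illustration (hypothetical totals): with perPage = 100 and a
// TotalCount of 250, the loop above fetches pages 1, 2, and 3, incrementing
// count to 100, 200, and 300, then stops once count >= TotalCount. An empty
// page also terminates the loop early as a safety net.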
type PluginCmd struct {
ID string `json:"id"`
Name string `json:"name"`
SettingCmd pluginsettings.UpdatePluginSettingCmd `json:"settingCmd"`
}
// IsPublicSignatureType returns true if plugin signature type is public
func IsPublicSignatureType(signatureType plugins.SignatureType) bool {
switch signatureType {
case plugins.SignatureTypeGrafana, plugins.SignatureTypeCommercial, plugins.SignatureTypeCommunity:
return true
case plugins.SignatureTypePrivate, plugins.SignatureTypePrivateGlob:
return false
}
return false
}
// getPlugins returns the json payloads required by the plugin creation API
func (s *Service) getPlugins(ctx context.Context, signedInUser *user.SignedInUser) ([]PluginCmd, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.getPlugins")
defer span.End()
results := make([]PluginCmd, 0)
plugins := s.pluginStore.Plugins(ctx)
// Obtain plugins from gcom
requestID := tracing.TraceIDFromContext(ctx, false)
gcomPlugins, err := s.gcomService.GetPlugins(ctx, requestID)
if err != nil {
return results, fmt.Errorf("fetching gcom plugins: %w", err)
}
// Permissions for listing plugins, taken from plugins api
userIsOrgAdmin := signedInUser.HasRole(org.RoleAdmin)
hasAccess, _ := s.accessControl.Evaluate(ctx, signedInUser, ac.EvalAny(
ac.EvalPermission(datasources.ActionCreate),
ac.EvalPermission(pluginaccesscontrol.ActionInstall),
))
if !userIsOrgAdmin && !hasAccess {
s.log.Info("user is not allowed to list non-core plugins", "UID", signedInUser.UserUID)
return results, nil
}
for _, plugin := range plugins {
// filter plugins to keep only the ones allowed by gcom
if _, exists := gcomPlugins[plugin.ID]; !exists {
continue
}
// filter plugins to keep only non-core, signed plugins with a public signature type
if plugin.IsCorePlugin() || !plugin.Signature.IsValid() || !IsPublicSignatureType(plugin.SignatureType) {
continue
}
// filter out dependent app plugins
if plugin.IncludedInAppID != "" {
continue
}
// Permissions filtering, taken from plugins api
hasAccess, _ = s.accessControl.Evaluate(ctx, signedInUser, ac.EvalPermission(pluginaccesscontrol.ActionWrite, pluginaccesscontrol.ScopeProvider.GetResourceScope(plugin.ID)))
if !hasAccess {
continue
}
pluginSettingCmd := pluginsettings.UpdatePluginSettingCmd{
Enabled: plugin.AutoEnabled,
Pinned: plugin.Pinned,
PluginVersion: plugin.Info.Version,
PluginId: plugin.ID,
}
// get plugin settings from db if they exist
ps, err := s.pluginSettingsService.GetPluginSettingByPluginID(ctx, &pluginsettings.GetByPluginIDArgs{
PluginID: plugin.ID,
OrgID: signedInUser.OrgID,
})
if err != nil && !errors.Is(err, pluginsettings.ErrPluginSettingNotFound) {
return nil, fmt.Errorf("failed to get plugin settings: %w", err)
} else if ps != nil {
pluginSettingCmd.Enabled = ps.Enabled
pluginSettingCmd.Pinned = ps.Pinned
pluginSettingCmd.JsonData = ps.JSONData
decryptedData, err := s.secretsService.DecryptJsonData(ctx, ps.SecureJSONData)
if err != nil {
return nil, fmt.Errorf("failed to decrypt secure json data: %w", err)
}
pluginSettingCmd.SecureJsonData = decryptedData
}
results = append(results, PluginCmd{
ID: plugin.ID,
Name: plugin.Name,
SettingCmd: pluginSettingCmd,
})
}
return results, nil
}
// asynchronous process for writing the snapshot to the configured storage (file system or database) and updating the snapshot status
func (s *Service) buildSnapshot(
ctx context.Context,
signedInUser *user.SignedInUser,
maxItemsPerPartition uint32,
metadata []byte,
snapshotMeta cloudmigration.CloudMigrationSnapshot,
resourceTypes cloudmigration.ResourceTypes,
) error {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.buildSnapshot")
defer span.End()
// TODO -- make sure we can only build one snapshot at a time
s.buildSnapshotMutex.Lock()
defer s.buildSnapshotMutex.Unlock()
start := time.Now()
defer func() {
s.log.Debug(fmt.Sprintf("buildSnapshot: method completed in %d ms", time.Since(start).Milliseconds()))
}()
publicKey, privateKey, err := box.GenerateKey(cryptoRand.Reader)
if err != nil {
return fmt.Errorf("nacl: generating public and private key: %w", err)
}
s.log.Debug(fmt.Sprintf("buildSnapshot: generated keys in %d ms", time.Since(start).Milliseconds()))
// Use GMS public key + the grafana generated private key to encrypt snapshot files.
snapshotWriter, err := snapshot.NewSnapshotWriter(contracts.AssymetricKeys{
Public: snapshotMeta.GMSPublicKey,
Private: privateKey[:],
},
crypto.NewNacl(),
snapshotMeta.LocalDir,
)
if err != nil {
return fmt.Errorf("instantiating snapshot writer: %w", err)
}
s.log.Debug(fmt.Sprintf("buildSnapshot: created snapshot writing in %d ms", time.Since(start).Milliseconds()))
migrationData, err := s.getMigrationDataJSON(ctx, signedInUser, resourceTypes)
if err != nil {
return fmt.Errorf("fetching migration data: %w", err)
}
s.log.Debug(fmt.Sprintf("buildSnapshot: got migration data json in %d ms", time.Since(start).Milliseconds()))
localSnapshotResource := make([]cloudmigration.CloudMigrationResource, len(migrationData.Items))
resourcesGroupedByType := make(map[cloudmigration.MigrateDataType][]snapshot.MigrateDataRequestItemDTO, 0)
for i, item := range migrationData.Items {
resourcesGroupedByType[item.Type] = append(resourcesGroupedByType[item.Type], snapshot.MigrateDataRequestItemDTO{
Name: item.Name,
Type: snapshot.MigrateDataType(item.Type),
RefID: item.RefID,
Data: item.Data,
})
parentName := ""
if _, exists := migrationData.ItemParentNames[item.Type]; exists {
parentName = migrationData.ItemParentNames[item.Type][item.RefID]
}
localSnapshotResource[i] = cloudmigration.CloudMigrationResource{
Name: item.Name,
Type: item.Type,
RefID: item.RefID,
Status: cloudmigration.ItemStatusPending,
ParentName: parentName,
}
}
switch s.cfg.CloudMigration.ResourceStorageType {
case cloudmigration.ResourceStorageTypeDb:
if err := s.buildSnapshotWithDBStorage(ctx, snapshotMeta.UID, snapshotWriter, resourcesGroupedByType, maxItemsPerPartition); err != nil {
return fmt.Errorf("building snapshot with database storage: %w", err)
}
s.log.Debug(fmt.Sprintf("buildSnapshot: wrote data partitions with database storage in %d ms", time.Since(start).Milliseconds()))
case cloudmigration.ResourceStorageTypeFs:
if err := s.buildSnapshotWithFSStorage(publicKey[:], metadata, snapshotWriter, resourcesGroupedByType, maxItemsPerPartition); err != nil {
return fmt.Errorf("building snapshot with file system storage: %w", err)
}
s.log.Debug(fmt.Sprintf("buildSnapshot: wrote data partitions with file system storage in %d ms", time.Since(start).Milliseconds()))
default:
return fmt.Errorf("unknown resource storage type, check your configuration and try again: %q", s.cfg.CloudMigration.ResourceStorageType)
}
// update snapshot status to pending upload with retries
if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshotMeta.UID,
SessionID: snapshotMeta.SessionUID,
Status: cloudmigration.SnapshotStatusPendingUpload,
LocalResourcesToCreate: localSnapshotResource,
PublicKey: publicKey[:],
}); err != nil {
return err
}
return nil
}
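// The storage backend above is selected via s.cfg.CloudMigration.ResourceStorageType;
// per the error message in uploadSnapshotWithFSStorage below, "db" selects
// database storage, which is suggested for highly-available setups.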
func (s *Service) buildSnapshotWithDBStorage(ctx context.Context, snapshotUID string, snapshotWriter *snapshot.SnapshotWriter, resourcesGroupedByType map[cloudmigration.MigrateDataType][]snapshot.MigrateDataRequestItemDTO, maxItemsPerPartition uint32) error {
for _, resourceType := range currentMigrationTypes {
i := 0
for chunk := range slices.Chunk(resourcesGroupedByType[resourceType], int(maxItemsPerPartition)) {
encoded, err := snapshotWriter.EncodePartition(chunk)
if err != nil {
return fmt.Errorf("encoding snapshot partition: %w", err)
}
if err := s.store.StorePartition(ctx, snapshotUID, string(resourceType), i, encoded); err != nil {
return fmt.Errorf("storing partition into database: %w", err)
}
i += 1
}
}
return nil
}
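// For illustration (hypothetical values): with maxItemsPerPartition = 2 and
// three items [a, b, c] of a given resource type, slices.Chunk yields the
// chunks [a, b] and [c], which are encoded and stored as partitions 0 and 1
// for that (snapshotUID, resourceType) pair.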
func (s *Service) buildSnapshotWithFSStorage(publicKey, metadata []byte, snapshotWriter *snapshot.SnapshotWriter, resourcesGroupedByType map[cloudmigration.MigrateDataType][]snapshot.MigrateDataRequestItemDTO, maxItemsPerPartition uint32) error {
for _, resourceType := range currentMigrationTypes {
for chunk := range slices.Chunk(resourcesGroupedByType[resourceType], int(maxItemsPerPartition)) {
if err := snapshotWriter.Write(string(resourceType), chunk); err != nil {
return fmt.Errorf("writing resources to snapshot writer: resourceType=%s %w", resourceType, err)
}
}
}
// Add the grafana generated public key to the index file so gms can use it to decrypt the snapshot files later.
// This works because the snapshot files are being encrypted with
// the grafana generated private key + the gms public key.
if _, err := snapshotWriter.Finish(snapshot.FinishInput{
SenderPublicKey: publicKey[:],
Metadata: metadata,
}); err != nil {
return fmt.Errorf("finishing writing snapshot files and generating index file: %w", err)
}
return nil
}
// asynchronous process for uploading the snapshot and updating the snapshot status
func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.CloudMigrationSession, snapshotMeta *cloudmigration.CloudMigrationSnapshot, uploadUrl string) (err error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.uploadSnapshot")
defer span.End()
// TODO -- make sure we can only upload one snapshot at a time
s.buildSnapshotMutex.Lock()
defer s.buildSnapshotMutex.Unlock()
start := time.Now()
defer func() {
s.log.Debug(fmt.Sprintf("uploadSnapshot: method completed in %d ms", time.Since(start).Milliseconds()))
}()
switch s.cfg.CloudMigration.ResourceStorageType {
case cloudmigration.ResourceStorageTypeDb:
if err := s.uploadSnapshotWithDBStorage(ctx, session, snapshotMeta, uploadUrl); err != nil {
return fmt.Errorf("uploading snapshot with database storage: %w", err)
}
case cloudmigration.ResourceStorageTypeFs:
if err := s.uploadSnapshotWithFSStorage(ctx, session, snapshotMeta, uploadUrl); err != nil {
return fmt.Errorf("uploading snapshot with file system storage: %w", err)
}
default:
return fmt.Errorf("unknown resource storage type, check your configuration and try again: %q", s.cfg.CloudMigration.ResourceStorageType)
}
s.log.Info("successfully uploaded snapshot", "snapshotUid", snapshotMeta.UID, "cloud_snapshotUid", snapshotMeta.GMSSnapshotUID)
// update snapshot status to processing with retries
if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshotMeta.UID,
SessionID: snapshotMeta.SessionUID,
Status: cloudmigration.SnapshotStatusProcessing,
}); err != nil {
return err
}
return nil
}
func (s *Service) uploadSnapshotWithDBStorage(ctx context.Context, session *cloudmigration.CloudMigrationSession, snapshotMeta *cloudmigration.CloudMigrationSnapshot, uploadUrl string) error {
index, err := s.store.GetIndex(ctx, session.OrgID, snapshotMeta.SessionUID, snapshotMeta.UID)
if err != nil {
return fmt.Errorf("fetching index from database: %w", err)
}
snapshotIndex := snapshot.Index{
Version: 1,
EncryptionAlgo: index.EncryptionAlgo,
PublicKey: index.PublicKey,
Metadata: index.Metadata,
Items: make(map[string][]string),
}
var partitionToFileName = func(resourceType string, partitionNumber int) string {
return fmt.Sprintf("%+v_%+v", resourceType, partitionNumber)
}
for resourceType, partitionsNumbers := range index.Items {
for _, partitionNumber := range partitionsNumbers {
fileName := partitionToFileName(resourceType, partitionNumber)
snapshotIndex.Items[resourceType] = append(snapshotIndex.Items[resourceType], fileName)
key := fmt.Sprintf("%d/snapshots/%s/%+v", session.StackID, snapshotMeta.GMSSnapshotUID, fileName)
partition, err := s.store.GetPartition(ctx, snapshotMeta.UID, resourceType, partitionNumber)
if err != nil {
return fmt.Errorf("fetching partition from database: %w", err)
}
if err = s.objectStorage.PresignedURLUpload(ctx, uploadUrl, key, bytes.NewReader(partition.Data)); err != nil {
return fmt.Errorf("uploading file using presigned url: %w", err)
}
}
}
key := fmt.Sprintf("%d/snapshots/%s/%s", session.StackID, snapshotMeta.GMSSnapshotUID, "index.json")
buffer, err := snapshot.EncodeIndex(snapshotIndex)
if err != nil {
return fmt.Errorf("encoding snapshot index for upload: %w", err)
}
if err = s.objectStorage.PresignedURLUpload(ctx, uploadUrl, key, bytes.NewReader(buffer)); err != nil {
return fmt.Errorf("uploading index file using presigned url: %w", err)
}
return nil
}
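// The resulting object layout mirrors the file system flow. For illustration
// (hypothetical IDs), stack 123 with a single partition of one resource type
// uploads:
//   123/snapshots/<gmsSnapshotUID>/<resourceType>_0
//   123/snapshots/<gmsSnapshotUID>/index.json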
func (s *Service) uploadSnapshotWithFSStorage(ctx context.Context, session *cloudmigration.CloudMigrationSession, snapshotMeta *cloudmigration.CloudMigrationSnapshot, uploadUrl string) error {
indexFilePath := filepath.Join(snapshotMeta.LocalDir, "index.json")
start := time.Now()
// LocalDir can be set in the configuration, therefore the file path can be set to any path.
// nolint:gosec
indexFile, err := os.Open(indexFilePath)
if err != nil {
return fmt.Errorf("opening index files: %w. If you are running Grafana in a highly-available setup, try setting cloud_migration.resource_storage_type to 'db' or scaling down to one replica", err)
}
defer func() {
if closeErr := indexFile.Close(); closeErr != nil {
s.log.Error("closing index file", "err", closeErr.Error())
}
}()
_, readIndexSpan := s.tracer.Start(ctx, "CloudMigrationService.uploadSnapshot.readIndex")
index, err := snapshot.ReadIndex(indexFile)
if err != nil {
readIndexSpan.SetStatus(codes.Error, "reading index from file")
readIndexSpan.RecordError(err)
readIndexSpan.End()
return fmt.Errorf("reading index from file: %w", err)
}
readIndexSpan.End()
uploadCtx, uploadSpan := s.tracer.Start(ctx, "CloudMigrationService.uploadSnapshot.uploadDataFiles")
// Upload the data files.
for _, fileNames := range index.Items {
for _, fileName := range fileNames {
filePath := filepath.Join(snapshotMeta.LocalDir, fileName)
key := fmt.Sprintf("%d/snapshots/%s/%s", session.StackID, snapshotMeta.GMSSnapshotUID, fileName)
if err := s.uploadUsingPresignedURL(uploadCtx, uploadUrl, key, filePath); err != nil {
uploadSpan.SetStatus(codes.Error, "uploading snapshot data file using presigned url")
uploadSpan.RecordError(err)
uploadSpan.End()
return fmt.Errorf("uploading snapshot file using presigned url: %w", err)
}
s.log.Debug(fmt.Sprintf("uploadSnapshot: uploaded %s in %d ms", fileName, time.Since(start).Milliseconds()))
}
}
uploadSpan.End()
s.log.Debug(fmt.Sprintf("uploadSnapshot: uploaded all data files in %d ms", time.Since(start).Milliseconds()))
uploadCtx, uploadSpan = s.tracer.Start(ctx, "CloudMigrationService.uploadSnapshot.uploadIndex")
// Upload the index file. Must be done after uploading the data files.
key := fmt.Sprintf("%d/snapshots/%s/%s", session.StackID, snapshotMeta.GMSSnapshotUID, "index.json")
if _, err := indexFile.Seek(0, 0); err != nil {
uploadSpan.SetStatus(codes.Error, "seeking to beginning of index file")
uploadSpan.RecordError(err)
uploadSpan.End()
return fmt.Errorf("seeking to beginning of index file: %w", err)
}
if err := s.objectStorage.PresignedURLUpload(uploadCtx, uploadUrl, key, indexFile); err != nil {
uploadSpan.SetStatus(codes.Error, "uploading index file using presigned url")
uploadSpan.RecordError(err)
uploadSpan.End()
return fmt.Errorf("uploading file using presigned url: %w", err)
}
uploadSpan.End()
s.log.Debug(fmt.Sprintf("uploadSnapshot: uploaded index file in %d ms", time.Since(start).Milliseconds()))
return nil
}
func (s *Service) uploadUsingPresignedURL(ctx context.Context, uploadURL, key string, filePath string) (err error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.uploadUsingPresignedURL")
defer span.End()
// The directory that contains the file can be set in the configuration, therefore the directory can be any directory.
// nolint:gosec
file, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("opening snapshot file: path=%s %w", filePath, err)
}
defer func() {
if closeErr := file.Close(); closeErr != nil {
s.log.Error("closing file", "path", filePath, "err", closeErr)
}
}()
if err = s.objectStorage.PresignedURLUpload(ctx, uploadURL, key, file); err != nil {
return fmt.Errorf("uploading file using presigned url: %w", err)
}
return nil
}
func (s *Service) updateSnapshotWithRetries(ctx context.Context, cmd cloudmigration.UpdateSnapshotCmd) (err error) {
maxRetries := 10
retries := 0
if err := retryer.Retry(func() (retryer.RetrySignal, error) {
if err := s.store.UpdateSnapshot(ctx, cmd); err != nil {
s.log.Error("updating snapshot in retry loop", "error", err.Error())
retries++
if retries > maxRetries {
return retryer.FuncError, err
}
return retryer.FuncFailure, nil
}
return retryer.FuncComplete, nil
}, maxRetries, time.Millisecond*10, time.Second*5); err != nil {
s.log.Error("failed to update snapshot status", "snapshotUid", cmd.UID, "status", cmd.Status, "num_local_resources", len(cmd.LocalResourcesToCreate), "num_cloud_resources", len(cmd.CloudResourcesToUpdate), "error", err.Error())
return fmt.Errorf("failed to update snapshot status: %w", err)
}
return nil
}
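// With the arguments above, retryer.Retry re-invokes the closure up to
// maxRetries times, with delays bounded by the 10ms minimum and 5s maximum
// passed in; FuncFailure requests another attempt, while FuncError aborts
// immediately with the wrapped error.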
// sortFolders implements a sort such that parent folders always come before their children
// Implementation inspired by ChatGPT, OpenAI's language model.
func sortFolders(input []folder.CreateFolderCommand) []folder.CreateFolderCommand {
// Map from UID to the corresponding folder for quick lookup
folderMap := make(map[string]folder.CreateFolderCommand)
for _, folder := range input {
folderMap[folder.UID] = folder
}
// Dynamic map of folderUID to depth
depthMap := make(map[string]int)
// Function to get the depth of a folder based on its parent hierarchy
var getDepth func(uid string) int
getDepth = func(uid string) int {
if uid == "" {
return 0
}
if d, ok := depthMap[uid]; ok {
return d
}
folder, exists := folderMap[uid]
if !exists || folder.ParentUID == "" {
return 1
}
return 1 + getDepth(folder.ParentUID)
}
// Calculate the depth of each folder
for _, folder := range input {
depthMap[folder.UID] = getDepth(folder.UID)
}
// Sort folders by their depth, ensuring a stable sort
sort.SliceStable(input, func(i, j int) bool {
return depthMap[input[i].UID] < depthMap[input[j].UID]
})
return input
}
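// For illustration (hypothetical UIDs): given the input
//   [{UID: "child", ParentUID: "root"}, {UID: "root", ParentUID: ""}]
// getDepth assigns root a depth of 1 and child a depth of 2, so the stable
// sort returns root before child while siblings keep their relative order.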
// getFolderNamesForFolderUIDs queries the folders service to obtain folder names for a list of folderUIDs
func (s *Service) getFolderNamesForFolderUIDs(ctx context.Context, signedInUser *user.SignedInUser, folderUIDs []string) (map[string]string, error) {
folders, err := s.folderService.GetFolders(ctx, folder.GetFoldersQuery{
UIDs: folderUIDs,
SignedInUser: signedInUser,
OrgID: signedInUser.GetOrgID(),
WithFullpathUIDs: true,
})
if err != nil {
s.log.Error("Failed to obtain folders from folder UIDs", "err", err)
return nil, err
}
folderUIDsToNames := make(map[string]string, len(folderUIDs))
for _, folderUID := range folderUIDs {
folderUIDsToNames[folderUID] = ""
}
for _, f := range folders {
folderUIDsToNames[f.UID] = f.Title
}
return folderUIDsToNames, nil
}
// getParentNames finds the parent names for resources and returns a map of data type: {data UID : parentName}
// For dashboards, folders, library elements, and alert rules, the parent is the containing folder.
func (s *Service) getParentNames(
ctx context.Context,
signedInUser *user.SignedInUser,
folderHierarchy map[cloudmigration.MigrateDataType]map[string]string,
) (map[cloudmigration.MigrateDataType]map[string]string, error) {
parentNamesByType := make(map[cloudmigration.MigrateDataType]map[string]string)
for _, dataType := range currentMigrationTypes {
parentNamesByType[dataType] = make(map[string]string)
}
// Obtain list of unique folderUIDs
parentFolderUIDsSet := make(map[string]struct{})
for _, folderUIDs := range folderHierarchy {
for _, folderUID := range folderUIDs {
// Skip the root folder
if folderUID == "" {
continue
}
parentFolderUIDsSet[folderUID] = struct{}{}
}
}
parentFolderUIDsSlice := make([]string, 0, len(parentFolderUIDsSet))
for parentFolderUID := range parentFolderUIDsSet {
parentFolderUIDsSlice = append(parentFolderUIDsSlice, parentFolderUID)
}
// Obtain folder names given a list of folderUIDs
foldersUIDsToFolderName, err := s.getFolderNamesForFolderUIDs(ctx, signedInUser, parentFolderUIDsSlice)
if err != nil {
s.log.Error("Failed to get parent folder names from folder UIDs", "err", err)
return parentNamesByType, err
}
// Prepare map of {data type: {data UID : parentName}}
for dataType, uidFolderMap := range folderHierarchy {
for uid, folderUID := range uidFolderMap {
parentNamesByType[dataType][uid] = foldersUIDsToFolderName[folderUID]
}
}
return parentNamesByType, nil
}
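// For illustration (hypothetical UIDs): a dashboard with UID "dash1" stored in
// a folder titled "Team A" is returned as
// parentNamesByType[cloudmigration.DashboardDataType]["dash1"] == "Team A".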