From bb344fcd836c7effe79a3bf03794e8cbb61c013a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roberto=20Jim=C3=A9nez=20S=C3=A1nchez?= Date: Mon, 31 Mar 2025 14:27:46 +0200 Subject: [PATCH] Remote provisioning: consolidate resource operations (#102972) * Move to new repository * Rename it to dual writer * Rename the function * Rename the methods * Rename to exportResource * Clean up logic in migrate and add TODOs * Add TODOs * Use generic client for unprovisioned * ForEachResource * More consolidation * Refactor more around client * Consolidate constants * ForEachFolder * More use of constants * Add FIXME notes * Use more constant * Remove Dashboard * Pass tree to folder manager * Replicate tree * Reduce export complexity * More refactoring * Use the ForEach for loading users * Limit in-memory folders * Isolate the object * Improve the export function * Move resources to resources package * Move delete operation * Move more logic * More consolidation * More renaming * Fix more issues * Ensure path exists when created a resource * Simply append error * Fix receiver lint issue * Fix cyclomatic complexity * Fix linting * Remove folder path creation --- pkg/registry/apis/provisioning/files.go | 314 +++++------------- .../apis/provisioning/jobs/export/folders.go | 96 ------ .../apis/provisioning/jobs/export/job.go | 45 --- .../provisioning/jobs/export/resources.go | 153 --------- .../apis/provisioning/jobs/export/worker.go | 84 ++++- .../apis/provisioning/jobs/migrate/folders.go | 108 ++---- .../provisioning/jobs/migrate/resources.go | 262 +++++---------- .../jobs/migrate/{import.go => storage.go} | 36 +- .../apis/provisioning/jobs/migrate/users.go | 43 +-- .../apis/provisioning/jobs/migrate/worker.go | 186 +++++------ .../apis/provisioning/jobs/sync/changes.go | 11 +- .../apis/provisioning/jobs/sync/worker.go | 203 ++++------- pkg/registry/apis/provisioning/register.go | 1 + pkg/registry/apis/provisioning/request.go | 1 + .../apis/provisioning/resources/client.go | 115 ++++++- .../apis/provisioning/resources/dualwriter.go | 276 +++++++++++++++ .../apis/provisioning/resources/folders.go | 62 +++- .../apis/provisioning/resources/parser.go | 2 + .../apis/provisioning/resources/resources.go | 229 +++++++++++++ .../apis/provisioning/resources/tree.go | 12 +- pkg/tests/apis/provisioning/client_test.go | 14 +- 21 files changed, 1117 insertions(+), 1136 deletions(-) delete mode 100644 pkg/registry/apis/provisioning/jobs/export/folders.go delete mode 100644 pkg/registry/apis/provisioning/jobs/export/job.go delete mode 100644 pkg/registry/apis/provisioning/jobs/export/resources.go rename pkg/registry/apis/provisioning/jobs/migrate/{import.go => storage.go} (65%) create mode 100644 pkg/registry/apis/provisioning/resources/dualwriter.go create mode 100644 pkg/registry/apis/provisioning/resources/resources.go diff --git a/pkg/registry/apis/provisioning/files.go b/pkg/registry/apis/provisioning/files.go index ffd6d678e06..8dc2cafb378 100644 --- a/pkg/registry/apis/provisioning/files.go +++ b/pkg/registry/apis/provisioning/files.go @@ -2,18 +2,15 @@ package provisioning import ( "context" - "errors" "fmt" "net/http" "time" apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apiserver/pkg/registry/rest" "github.com/grafana/grafana-app-sdk/logging" - "github.com/grafana/grafana/pkg/apimachinery/apis/common/v0alpha1" provisioning "github.com/grafana/grafana/pkg/apis/provisioning/v0alpha1" "github.com/grafana/grafana/pkg/registry/apis/provisioning/repository" "github.com/grafana/grafana/pkg/registry/apis/provisioning/resources" @@ -54,7 +51,6 @@ func (*filesConnector) NewConnectOptions() (runtime.Object, bool, string) { } // TODO: document the synchronous write and delete on the API Spec -// TODO: Move dual write logic to `resources` package and keep this connector simple func (s *filesConnector) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) { logger := logging.FromContext(ctx).With("logger", "files-connector", "repository_name", name) ctx = logging.Context(ctx, logger) @@ -64,11 +60,23 @@ func (s *filesConnector) Connect(ctx context.Context, name string, opts runtime. return nil, err } - reader, ok := repo.(repository.Reader) + readWriter, ok := repo.(repository.ReaderWriter) if !ok { - return nil, apierrors.NewBadRequest("repository does not support read") + return nil, apierrors.NewBadRequest("repository does not support read-writing") } + parser, err := s.parsers.GetParser(ctx, readWriter) + if err != nil { + return nil, fmt.Errorf("failed to get parser: %w", err) + } + + folderClient, err := parser.Clients().Folder() + if err != nil { + return nil, fmt.Errorf("failed to get folder client: %w", err) + } + folders := resources.NewFolderManager(readWriter, folderClient, resources.NewEmptyFolderTree()) + dualReadWriter := resources.NewDualReadWriter(readWriter, parser, folders) + return withTimeout(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() ref := query.Get("ref") @@ -89,30 +97,11 @@ func (s *filesConnector) Connect(ctx context.Context, name string, opts runtime. isDir := safepath.IsDir(filePath) if r.Method == http.MethodGet && isDir { - // TODO: Implement folder navigation - if len(filePath) > 0 { - responder.Error(apierrors.NewBadRequest("folder navigation not yet supported")) - return - } - - // TODO: Add pagination - rsp, err := reader.ReadTree(ctx, ref) + files, err := s.listFolderFiles(ctx, filePath, ref, readWriter) if err != nil { responder.Error(err) - return } - files := &provisioning.FileList{} - for _, v := range rsp { - if !v.Blob { - continue // folder item - } - files.Items = append(files.Items, provisioning.FileItem{ - Path: v.Path, - Size: v.Size, - Hash: v.Hash, - }) - } responder.Object(http.StatusOK, files) return } @@ -132,19 +121,60 @@ func (s *filesConnector) Connect(ctx context.Context, name string, opts runtime. code := http.StatusOK switch r.Method { case http.MethodGet: - code, obj, err = s.doRead(ctx, reader, filePath, ref) + resource, err := dualReadWriter.Read(ctx, filePath, ref) + if err != nil { + responder.Error(err) + return + } + + obj = resource.AsResourceWrapper() + code = http.StatusOK + if len(resource.Errors) > 0 { + code = http.StatusNotAcceptable + } case http.MethodPost: - obj, err = s.doWrite(ctx, false, repo, filePath, ref, message, r) + if isDir { + obj, err = dualReadWriter.CreateFolder(ctx, filePath, ref, message) + } else { + data, err := readBody(r, filesMaxBodySize) + if err != nil { + responder.Error(err) + return + } + + resource, err := dualReadWriter.CreateResource(ctx, filePath, ref, message, data) + if err != nil { + responder.Error(err) + return + } + obj = resource.AsResourceWrapper() + } case http.MethodPut: // TODO: document in API specification if isDir { err = apierrors.NewMethodNotSupported(provisioning.RepositoryResourceInfo.GroupResource(), r.Method) } else { - obj, err = s.doWrite(ctx, true, repo, filePath, ref, message, r) + data, err := readBody(r, filesMaxBodySize) + if err != nil { + responder.Error(err) + return + } + + resource, err := dualReadWriter.UpdateResource(ctx, filePath, ref, message, data) + if err != nil { + responder.Error(err) + return + } + obj = resource.AsResourceWrapper() } case http.MethodDelete: - // TODO: limit file size - obj, err = s.doDelete(ctx, repo, filePath, ref, message) + resource, err := dualReadWriter.Delete(ctx, filePath, ref, message) + if err != nil { + responder.Error(err) + return + } + + obj = resource.AsResourceWrapper() default: err = apierrors.NewMethodNotSupported(provisioning.RepositoryResourceInfo.GroupResource(), r.Method) } @@ -165,218 +195,32 @@ func (s *filesConnector) Connect(ctx context.Context, name string, opts runtime. }), 30*time.Second), nil } -func (s *filesConnector) doRead(ctx context.Context, repo repository.Reader, path string, ref string) (int, *provisioning.ResourceWrapper, error) { - info, err := repo.Read(ctx, path, ref) - if err != nil { - return 0, nil, err +// listFolderFiles returns a list of files in a folder +func (s *filesConnector) listFolderFiles(ctx context.Context, filePath string, ref string, readWriter repository.ReaderWriter) (*provisioning.FileList, error) { + // TODO: Implement folder navigation + if len(filePath) > 0 { + return nil, apierrors.NewBadRequest("folder navigation not yet supported") } - parser, err := s.parsers.GetParser(ctx, repo) + // TODO: Add pagination + rsp, err := readWriter.ReadTree(ctx, ref) if err != nil { - return 0, nil, err + return nil, err } - parsed, err := parser.Parse(ctx, info, true) - if err != nil { - return 0, nil, err - } - - // GVR will exist for anything we can actually save - // TODO: Add known error in parser for unsupported resource - if parsed.GVR == nil { - if parsed.GVK != nil { - //nolint:govet - parsed.Errors = append(parsed.Errors, fmt.Errorf("unknown resource for Kind: %s", parsed.GVK.Kind)) - } else { - parsed.Errors = append(parsed.Errors, fmt.Errorf("unknown resource")) + files := &provisioning.FileList{} + for _, v := range rsp { + if !v.Blob { + continue // folder item } + files.Items = append(files.Items, provisioning.FileItem{ + Path: v.Path, + Size: v.Size, + Hash: v.Hash, + }) } - code := http.StatusOK - if len(parsed.Errors) > 0 { - code = http.StatusNotAcceptable - } - return code, parsed.AsResourceWrapper(), nil -} - -func (s *filesConnector) doWrite(ctx context.Context, update bool, repo repository.Repository, path string, ref string, message string, req *http.Request) (*provisioning.ResourceWrapper, error) { - if err := repository.IsWriteAllowed(repo.Config(), ref); err != nil { - return nil, err - } - - writer, ok := repo.(repository.ReaderWriter) - if !ok { - return nil, apierrors.NewBadRequest("repository does not support read-writing") - } - - parser, err := s.parsers.GetParser(ctx, writer) - if err != nil { - return nil, err - } - - defer func() { _ = req.Body.Close() }() - if safepath.IsDir(path) { - return s.doCreateFolder(ctx, writer, path, ref, message, parser) - } - - data, err := readBody(req, filesMaxBodySize) - if err != nil { - return nil, err - } - - info := &repository.FileInfo{ - Data: data, - Path: path, - Ref: ref, - } - - // TODO: improve parser to parse out of reader - parsed, err := parser.Parse(ctx, info, true) - if err != nil { - if errors.Is(err, resources.ErrUnableToReadResourceBytes) { - return nil, apierrors.NewBadRequest("unable to read the request as a resource") - } - return nil, err - } - - // GVR will exist for anything we can actually save - // TODO: Add known error in parser for unsupported resource - if parsed.GVR == nil { - return nil, apierrors.NewBadRequest("The payload does not map to a known resource") - } - - // Do not write if any errors exist - if len(parsed.Errors) > 0 { - return parsed.AsResourceWrapper(), err - } - - data, err = parsed.ToSaveBytes() - if err != nil { - return nil, err - } - - if update { - err = writer.Update(ctx, path, ref, data, message) - } else { - err = writer.Create(ctx, path, ref, data, message) - } - if err != nil { - return nil, err - } - - // Directly update the grafana database - // Behaves the same running sync after writing - if ref == "" { - if parsed.Existing == nil { - parsed.Upsert, err = parsed.Client.Create(ctx, parsed.Obj, metav1.CreateOptions{}) - if err != nil { - parsed.Errors = append(parsed.Errors, err) - } - } else { - parsed.Upsert, err = parsed.Client.Update(ctx, parsed.Obj, metav1.UpdateOptions{}) - if err != nil { - parsed.Errors = append(parsed.Errors, err) - } - } - } - - return parsed.AsResourceWrapper(), err -} - -func (s *filesConnector) doCreateFolder(ctx context.Context, repo repository.Writer, path string, ref string, message string, parser *resources.Parser) (*provisioning.ResourceWrapper, error) { - client, err := parser.Clients().Folder() - if err != nil { - return nil, err - } - manager := resources.NewFolderManager(repo, client) - - // Now actually create the folder - if err := repo.Create(ctx, path, ref, nil, message); err != nil { - return nil, fmt.Errorf("failed to create folder: %w", err) - } - - cfg := repo.Config() - wrap := &provisioning.ResourceWrapper{ - Path: path, - Ref: ref, - Repository: provisioning.ResourceRepositoryInfo{ - Type: cfg.Spec.Type, - Namespace: cfg.Namespace, - Name: cfg.Name, - Title: cfg.Spec.Title, - }, - Resource: provisioning.ResourceObjects{ - Action: provisioning.ResourceActionCreate, - }, - } - - if ref == "" { - folderName, err := manager.EnsureFolderPathExist(ctx, path) - if err != nil { - return nil, err - } - - current, err := manager.GetFolder(ctx, folderName) - if err != nil && !apierrors.IsNotFound(err) { - return nil, err // unable to check if the folder exists - } - wrap.Resource.Upsert = v0alpha1.Unstructured{ - Object: current.Object, - } - } - - return wrap, nil -} - -// Deletes a file from the repository and the Grafana database. -// If the path is a folder, it will return an error. -// If the file is not parsable, it will return an error. -func (s *filesConnector) doDelete(ctx context.Context, repo repository.Repository, path string, ref string, message string) (*provisioning.ResourceWrapper, error) { - if err := repository.IsWriteAllowed(repo.Config(), ref); err != nil { - return nil, err - } - - // Read the existing value - access, ok := repo.(repository.ReaderWriter) - if !ok { - return nil, fmt.Errorf("repository is not read+writeable") - } - - file, err := access.Read(ctx, path, ref) - if err != nil { - return nil, err // unable to read value - } - - parser, err := s.parsers.GetParser(ctx, access) - if err != nil { - return nil, err // unable to read value - } - - // TODO: document in API specification - // We can only delete parsable things - parsed, err := parser.Parse(ctx, file, false) - if err != nil { - return nil, err // unable to read value - } - - parsed.Action = provisioning.ResourceActionDelete - wrap := parsed.AsResourceWrapper() - - // Now delete the file - err = access.Delete(ctx, path, ref, message) - if err != nil { - return nil, err - } - - // Delete the file in the grafana database - if ref == "" { - err = parsed.Client.Delete(ctx, parsed.Obj.GetName(), metav1.DeleteOptions{}) - if apierrors.IsNotFound(err) { - err = nil // ignorable - } - } - - return wrap, err + return files, nil } var ( diff --git a/pkg/registry/apis/provisioning/jobs/export/folders.go b/pkg/registry/apis/provisioning/jobs/export/folders.go deleted file mode 100644 index d8fe746cff0..00000000000 --- a/pkg/registry/apis/provisioning/jobs/export/folders.go +++ /dev/null @@ -1,96 +0,0 @@ -package export - -import ( - "context" - "errors" - "fmt" - - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - folders "github.com/grafana/grafana/pkg/apis/folder/v0alpha1" - "github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs" - "github.com/grafana/grafana/pkg/registry/apis/provisioning/repository" - "github.com/grafana/grafana/pkg/registry/apis/provisioning/resources" - "github.com/grafana/grafana/pkg/registry/apis/provisioning/safepath" -) - -// FIXME: revise logging in this method -func (r *exportJob) loadFolders(ctx context.Context) error { - logger := r.logger - r.progress.SetMessage(ctx, "reading folder tree") - - repoName := r.target.Config().Name - - // TODO: should this be logging or message or both? - r.progress.SetMessage(ctx, "read folder tree from unified storage") - client, err := r.client.Folder() - if err != nil { - return err - } - - rawList, err := client.List(ctx, metav1.ListOptions{Limit: 10000}) - if err != nil { - return fmt.Errorf("failed to list folders: %w", err) - } - if rawList.GetContinue() != "" { - return fmt.Errorf("unable to list all folders in one request: %s", rawList.GetContinue()) - } - - for _, item := range rawList.Items { - err = r.folderTree.AddUnstructured(&item, repoName) - if err != nil { - r.progress.Record(ctx, jobs.JobResourceResult{ - Name: item.GetName(), - Resource: folders.RESOURCE, - Group: folders.GROUP, - Error: err, - }) - } - } - - // create folders first is required so that empty folders exist when finished - r.progress.SetMessage(ctx, "write folders") - - err = r.folderTree.Walk(ctx, func(ctx context.Context, folder resources.Folder) error { - p := folder.Path - if r.path != "" { - p = safepath.Join(r.path, p) - } - logger := logger.With("path", p) - - result := jobs.JobResourceResult{ - Name: folder.ID, - Resource: folders.RESOURCE, - Group: folders.GROUP, - Path: p, - } - - _, err := r.target.Read(ctx, p, r.ref) - if err != nil && !(errors.Is(err, repository.ErrFileNotFound) || apierrors.IsNotFound(err)) { - result.Error = fmt.Errorf("failed to check if folder exists before writing: %w", err) - return result.Error - } else if err == nil { - logger.Info("folder already exists") - result.Action = repository.FileActionIgnored - r.progress.Record(ctx, result) - return nil - } - - result.Action = repository.FileActionCreated - msg := fmt.Sprintf("export folder %s", p) - // Create with an empty body will make a folder (or .keep file if unsupported) - if err := r.target.Create(ctx, p, r.ref, nil, msg); err != nil { - result.Error = fmt.Errorf("failed to write folder in repo: %w", err) - r.progress.Record(ctx, result) - return result.Error - } - - r.progress.Record(ctx, result) - return nil - }) - if err != nil { - return fmt.Errorf("failed to write folders: %w", err) - } - return nil -} diff --git a/pkg/registry/apis/provisioning/jobs/export/job.go b/pkg/registry/apis/provisioning/jobs/export/job.go deleted file mode 100644 index eb7f229d42d..00000000000 --- a/pkg/registry/apis/provisioning/jobs/export/job.go +++ /dev/null @@ -1,45 +0,0 @@ -package export - -import ( - "context" - - "github.com/grafana/grafana-app-sdk/logging" - provisioning "github.com/grafana/grafana/pkg/apis/provisioning/v0alpha1" - "github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs" - "github.com/grafana/grafana/pkg/registry/apis/provisioning/repository" - "github.com/grafana/grafana/pkg/registry/apis/provisioning/resources" -) - -// ExportJob holds all context for a running job -type exportJob struct { - logger logging.Logger - client *resources.ResourceClients // Read from - target repository.ReaderWriter // Write to - namespace string - - progress jobs.JobProgressRecorder - folderTree *resources.FolderTree - - path string // from options (now clean+safe) - ref string // from options (only git) - keepIdentifier bool -} - -func newExportJob(ctx context.Context, - target repository.ReaderWriter, - options provisioning.ExportJobOptions, - clients *resources.ResourceClients, - progress jobs.JobProgressRecorder, -) *exportJob { - return &exportJob{ - namespace: target.Config().Namespace, - target: target, - client: clients, - logger: logging.FromContext(ctx), - progress: progress, - path: options.Path, - ref: options.Branch, - keepIdentifier: options.Identifier, - folderTree: resources.NewEmptyFolderTree(), - } -} diff --git a/pkg/registry/apis/provisioning/jobs/export/resources.go b/pkg/registry/apis/provisioning/jobs/export/resources.go deleted file mode 100644 index 6a42a6593ff..00000000000 --- a/pkg/registry/apis/provisioning/jobs/export/resources.go +++ /dev/null @@ -1,153 +0,0 @@ -package export - -import ( - "context" - "encoding/json" - "fmt" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime/schema" - - dashboard "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v0alpha1" - "github.com/grafana/grafana/pkg/apimachinery/utils" - "github.com/grafana/grafana/pkg/infra/slugify" - "github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs" - "github.com/grafana/grafana/pkg/registry/apis/provisioning/repository" - "github.com/grafana/grafana/pkg/registry/apis/provisioning/resources" - "github.com/grafana/grafana/pkg/registry/apis/provisioning/safepath" -) - -func (r *exportJob) loadResources(ctx context.Context) error { - kinds := []schema.GroupVersionResource{{ - Group: dashboard.GROUP, - Resource: dashboard.DASHBOARD_RESOURCE, - Version: "v1alpha1", - }} - - for _, kind := range kinds { - r.progress.SetMessage(ctx, fmt.Sprintf("reading %s resource", kind.Resource)) - if err := r.loadResourcesFromAPIServer(ctx, kind); err != nil { - return fmt.Errorf("error loading %s %w", kind.Resource, err) - } - } - return nil -} - -func (r *exportJob) loadResourcesFromAPIServer(ctx context.Context, kind schema.GroupVersionResource) error { - client, _, err := r.client.ForResource(kind) - if err != nil { - return err - } - - var continueToken string - for ctx.Err() == nil { - list, err := client.List(ctx, metav1.ListOptions{Limit: 100, Continue: continueToken}) - if err != nil { - return fmt.Errorf("error executing list: %w", err) - } - - for _, item := range list.Items { - if ctx.Err() != nil { - return ctx.Err() - } - - r.progress.Record(ctx, r.write(ctx, &item)) - if err := r.progress.TooManyErrors(); err != nil { - return err - } - } - - continueToken = list.GetContinue() - if continueToken == "" { - break - } - } - - return ctx.Err() -} - -func (r *exportJob) write(ctx context.Context, obj *unstructured.Unstructured) jobs.JobResourceResult { - gvk := obj.GroupVersionKind() - result := jobs.JobResourceResult{ - Name: obj.GetName(), - Resource: gvk.Kind, - Group: gvk.Group, - Action: repository.FileActionCreated, - } - - if err := ctx.Err(); err != nil { - result.Error = fmt.Errorf("context error: %w", err) - return result - } - - meta, err := utils.MetaAccessor(obj) - if err != nil { - result.Error = fmt.Errorf("extract meta accessor: %w", err) - return result - } - - // Message from annotations - commitMessage := meta.GetMessage() - if commitMessage == "" { - g := meta.GetGeneration() - if g > 0 { - commitMessage = fmt.Sprintf("Generation: %d", g) - } else { - commitMessage = "exported from grafana" - } - } - - name := meta.GetName() - manager, _ := meta.GetManagerProperties() - if manager.Identity == r.target.Config().GetName() { - result.Action = repository.FileActionIgnored - return result - } - - title := meta.FindTitle("") - if title == "" { - title = name - } - folder := meta.GetFolder() - - // Get the absolute path of the folder - fid, ok := r.folderTree.DirPath(folder, "") - if !ok { - // FIXME: Shouldn't this fail instead? - fid = resources.Folder{ - Path: "__folder_not_found/" + slugify.Slugify(folder), - } - r.logger.Error("folder of item was not in tree of repository") - } - - result.Path = fid.Path - - // Clear the metadata - delete(obj.Object, "metadata") - - if r.keepIdentifier { - meta.SetName(name) // keep the identifier in the metadata - } - - body, err := json.MarshalIndent(obj.Object, "", " ") - if err != nil { - result.Error = fmt.Errorf("failed to marshal dashboard: %w", err) - return result - } - - fileName := slugify.Slugify(title) + ".json" - if fid.Path != "" { - fileName = safepath.Join(fid.Path, fileName) - } - if r.path != "" { - fileName = safepath.Join(r.path, fileName) - } - - err = r.target.Write(ctx, fileName, r.ref, body, commitMessage) - if err != nil { - result.Error = fmt.Errorf("failed to write file: %w", err) - } - - return result -} diff --git a/pkg/registry/apis/provisioning/jobs/export/worker.go b/pkg/registry/apis/provisioning/jobs/export/worker.go index 65b03f40da4..4fed24c37af 100644 --- a/pkg/registry/apis/provisioning/jobs/export/worker.go +++ b/pkg/registry/apis/provisioning/jobs/export/worker.go @@ -15,6 +15,8 @@ import ( "github.com/grafana/grafana/pkg/registry/apis/provisioning/resources" "github.com/grafana/grafana/pkg/registry/apis/provisioning/secrets" "github.com/grafana/grafana/pkg/storage/legacysql/dualwrite" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/dynamic" ) type ExportWorker struct { @@ -29,18 +31,22 @@ type ExportWorker struct { // Decrypt secrets in config secrets secrets.Service + + parsers *resources.ParserFactory } func NewExportWorker(clientFactory *resources.ClientFactory, storageStatus dualwrite.Service, secrets secrets.Service, clonedir string, + parsers *resources.ParserFactory, ) *ExportWorker { return &ExportWorker{ clonedir, clientFactory, storageStatus, secrets, + parsers, } } @@ -95,19 +101,83 @@ func (r *ExportWorker) Process(ctx context.Context, repo repository.Repository, return err } - worker := newExportJob(ctx, rw, *options, clients, progress) - // Load and write all folders - progress.SetMessage(ctx, "start folder export") - err = worker.loadFolders(ctx) + // FIXME: we load the entire tree in memory + progress.SetMessage(ctx, "read folder tree from API server") + client, err := clients.Folder() if err != nil { - return err + return fmt.Errorf("failed to get folder client: %w", err) + } + + folders := resources.NewFolderManager(rw, client, resources.NewEmptyFolderTree()) + if err := folders.LoadFromServer(ctx); err != nil { + return fmt.Errorf("failed to load folders from API server: %w", err) + } + + progress.SetMessage(ctx, "write folders to repository") + err = folders.EnsureTreeExists(ctx, options.Branch, options.Path, func(folder resources.Folder, created bool, err error) error { + result := jobs.JobResourceResult{ + Action: repository.FileActionCreated, + Name: folder.ID, + Resource: resources.FolderResource.Resource, + Group: resources.FolderResource.Group, + Path: folder.Path, + Error: err, + } + + if !created { + result.Action = repository.FileActionIgnored + } + + progress.Record(ctx, result) + return nil + }) + if err != nil { + return fmt.Errorf("write folders to repository: %w", err) } progress.SetMessage(ctx, "start resource export") - err = worker.loadResources(ctx) + parser, err := r.parsers.GetParser(ctx, rw) if err != nil { - return err + return fmt.Errorf("failed to get parser: %w", err) + } + + resourceManager := resources.NewResourcesManager(rw, folders, parser, clients, nil) + for _, kind := range resources.SupportedResources { + // skip from folders as we do them first + if kind == resources.FolderResource { + continue + } + + progress.SetMessage(ctx, fmt.Sprintf("reading %s resource", kind.Resource)) + if err := clients.ForEachResource(ctx, kind, func(_ dynamic.ResourceInterface, item *unstructured.Unstructured) error { + result := jobs.JobResourceResult{ + Name: item.GetName(), + Resource: kind.Resource, + Group: kind.Group, + Action: repository.FileActionCreated, + } + + fileName, err := resourceManager.CreateResourceFileFromObject(ctx, item, resources.WriteOptions{ + Path: options.Path, + Ref: options.Branch, + Identifier: options.Identifier, + }) + if errors.Is(err, resources.ErrAlreadyInRepository) { + result.Action = repository.FileActionIgnored + } else if err != nil { + result.Error = fmt.Errorf("export resource: %w", err) + } + result.Path = fileName + progress.Record(ctx, result) + + if err := progress.TooManyErrors(); err != nil { + return err + } + return nil + }); err != nil { + return fmt.Errorf("error exporting %s %w", kind.Resource, err) + } } if buffered != nil { diff --git a/pkg/registry/apis/provisioning/jobs/migrate/folders.go b/pkg/registry/apis/provisioning/jobs/migrate/folders.go index 0f9090e4f8c..96a2dfc32ac 100644 --- a/pkg/registry/apis/provisioning/jobs/migrate/folders.go +++ b/pkg/registry/apis/provisioning/jobs/migrate/folders.go @@ -5,109 +5,69 @@ import ( "errors" "fmt" - apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" - folders "github.com/grafana/grafana/pkg/apis/folder/v0alpha1" "github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy" - "github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs" - "github.com/grafana/grafana/pkg/registry/apis/provisioning/repository" "github.com/grafana/grafana/pkg/registry/apis/provisioning/resources" "github.com/grafana/grafana/pkg/storage/unified/parquet" "github.com/grafana/grafana/pkg/storage/unified/resource" ) -var _ resource.BulkResourceWriter = (*folderReader)(nil) +const maxFolders = 10000 -type folderReader struct { +var _ resource.BulkResourceWriter = (*legacyFolderReader)(nil) + +type legacyFolderReader struct { tree *resources.FolderTree - targetRepoName string + repoName string + legacyMigrator legacy.LegacyMigrator + namespace string +} + +func NewLegacyFolderReader(legacyMigrator legacy.LegacyMigrator, repoName, namespace string) *legacyFolderReader { + return &legacyFolderReader{ + legacyMigrator: legacyMigrator, + repoName: repoName, + namespace: namespace, + tree: resources.NewEmptyFolderTree(), + } } // Close implements resource.BulkResourceWrite. -func (f *folderReader) Close() error { +func (f *legacyFolderReader) Close() error { return nil } // CloseWithResults implements resource.BulkResourceWrite. -func (f *folderReader) CloseWithResults() (*resource.BulkResponse, error) { +func (f *legacyFolderReader) CloseWithResults() (*resource.BulkResponse, error) { return &resource.BulkResponse{}, nil } // Write implements resource.BulkResourceWrite. -func (f *folderReader) Write(ctx context.Context, key *resource.ResourceKey, value []byte) error { +func (f *legacyFolderReader) Write(ctx context.Context, key *resource.ResourceKey, value []byte) error { item := &unstructured.Unstructured{} err := item.UnmarshalJSON(value) if err != nil { return fmt.Errorf("unmarshal unstructured to JSON: %w", err) } - return f.tree.AddUnstructured(item, f.targetRepoName) + if f.tree.Count() > maxFolders { + return errors.New("too many folders") + } + + return f.tree.AddUnstructured(item, f.repoName) } -func (j *migrationJob) migrateLegacyFolders(ctx context.Context) error { - logger := j.logger - j.progress.SetMessage(ctx, "reading folder tree") - - repoName := j.target.Config().Name - - j.progress.SetMessage(ctx, "migrate folder tree from legacy") - reader := &folderReader{ - tree: j.folderTree, - targetRepoName: repoName, - } - _, err := j.legacy.Migrate(ctx, legacy.MigrateOptions{ - Namespace: j.namespace, - Resources: []schema.GroupResource{{ - Group: folders.GROUP, - Resource: folders.RESOURCE, - }}, - Store: parquet.NewBulkResourceWriterClient(reader), +func (f *legacyFolderReader) Read(ctx context.Context, legacyMigrator legacy.LegacyMigrator, name, namespace string) error { + _, err := legacyMigrator.Migrate(ctx, legacy.MigrateOptions{ + Namespace: namespace, + Resources: []schema.GroupResource{resources.FolderResource.GroupResource()}, + Store: parquet.NewBulkResourceWriterClient(f), }) - if err != nil { - return fmt.Errorf("unable to read folders from legacy storage %w", err) - } - - // create folders first is required so that empty folders exist when finished - j.progress.SetMessage(ctx, "write folders") - - err = j.folderTree.Walk(ctx, func(ctx context.Context, folder resources.Folder) error { - p := folder.Path - logger = logger.With("path", p) - - result := jobs.JobResourceResult{ - Name: folder.ID, - Resource: folders.RESOURCE, - Group: folders.GROUP, - Path: p, - } - - _, err := j.target.Read(ctx, p, "") - if err != nil && !(errors.Is(err, repository.ErrFileNotFound) || apierrors.IsNotFound(err)) { - result.Error = fmt.Errorf("failed to check if folder exists before writing: %w", err) - return result.Error - } else if err == nil { - logger.Info("folder already exists") - result.Action = repository.FileActionIgnored - j.progress.Record(ctx, result) - return nil - } - - result.Action = repository.FileActionCreated - msg := fmt.Sprintf("export folder %s", p) - // Create with an empty body will make a folder (or .keep file if unsupported) - if err := j.target.Create(ctx, p, "", nil, msg); err != nil { - result.Error = fmt.Errorf("failed to write folder in repo: %w", err) - j.progress.Record(ctx, result) - return result.Error - } - - return nil - }) - if err != nil { - return fmt.Errorf("failed to write folders: %w", err) - } - - return nil + return err +} + +func (f *legacyFolderReader) Tree() *resources.FolderTree { + return f.tree } diff --git a/pkg/registry/apis/provisioning/jobs/migrate/resources.go b/pkg/registry/apis/provisioning/jobs/migrate/resources.go index 320fb7bc5cb..5bd0b203659 100644 --- a/pkg/registry/apis/provisioning/jobs/migrate/resources.go +++ b/pkg/registry/apis/provisioning/jobs/migrate/resources.go @@ -2,51 +2,62 @@ package migrate import ( "context" - "encoding/json" "fmt" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/dynamic" - - dashboard "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v0alpha1" "github.com/grafana/grafana/pkg/apimachinery/utils" - "github.com/grafana/grafana/pkg/infra/slugify" + provisioning "github.com/grafana/grafana/pkg/apis/provisioning/v0alpha1" "github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy" "github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs" "github.com/grafana/grafana/pkg/registry/apis/provisioning/repository" "github.com/grafana/grafana/pkg/registry/apis/provisioning/resources" - "github.com/grafana/grafana/pkg/registry/apis/provisioning/safepath" "github.com/grafana/grafana/pkg/storage/unified/parquet" "github.com/grafana/grafana/pkg/storage/unified/resource" + "k8s.io/apimachinery/pkg/runtime/schema" ) -var _ resource.BulkResourceWriter = (*resourceReader)(nil) +var _ resource.BulkResourceWriter = (*legacyResourceResourceMigrator)(nil) -type resourceReader struct { - job *migrationJob +// TODO: can we use the same migrator for folders? +type legacyResourceResourceMigrator struct { + legacy legacy.LegacyMigrator + parser *resources.Parser + progress jobs.JobProgressRecorder + namespace string + kind schema.GroupResource + options provisioning.MigrateJobOptions + resources *resources.ResourcesManager +} + +func NewLegacyResourceMigrator(legacy legacy.LegacyMigrator, parser *resources.Parser, resources *resources.ResourcesManager, progress jobs.JobProgressRecorder, options provisioning.MigrateJobOptions, namespace string, kind schema.GroupResource) *legacyResourceResourceMigrator { + return &legacyResourceResourceMigrator{ + legacy: legacy, + parser: parser, + progress: progress, + options: options, + namespace: namespace, + kind: kind, + resources: resources, + } } // Close implements resource.BulkResourceWriter. -func (r *resourceReader) Close() error { +func (r *legacyResourceResourceMigrator) Close() error { return nil } // CloseWithResults implements resource.BulkResourceWriter. -func (r *resourceReader) CloseWithResults() (*resource.BulkResponse, error) { +func (r *legacyResourceResourceMigrator) CloseWithResults() (*resource.BulkResponse, error) { return &resource.BulkResponse{}, nil } // Write implements resource.BulkResourceWriter. -func (r *resourceReader) Write(ctx context.Context, key *resource.ResourceKey, value []byte) error { +func (r *legacyResourceResourceMigrator) Write(ctx context.Context, key *resource.ResourceKey, value []byte) error { // Reuse the same parse+cleanup logic - parsed, err := r.job.parser.Parse(ctx, &repository.FileInfo{ + parsed, err := r.parser.Parse(ctx, &repository.FileInfo{ Path: "", // empty path to ignore file system Data: value, }, false) if err != nil { - // TODO: should we fail the entire execution? return fmt.Errorf("failed to unmarshal unstructured: %w", err) } @@ -54,180 +65,59 @@ func (r *resourceReader) Write(ctx context.Context, key *resource.ResourceKey, v parsed.Meta.SetManagerProperties(utils.ManagerProperties{}) parsed.Meta.SetSourceProperties(utils.SourceProperties{}) - if result := r.job.write(ctx, parsed.Obj); result.Error != nil { - r.job.progress.Record(ctx, result) - if err := r.job.progress.TooManyErrors(); err != nil { - return err - } - } + // TODO: this seems to be same logic as the export job + // TODO: we should use a kind safe manager here + fileName, err := r.resources.CreateResourceFileFromObject(ctx, parsed.Obj, resources.WriteOptions{ + Path: "", + Ref: "", + Identifier: r.options.Identifier, + }) - return nil -} - -func (j *migrationJob) migrateLegacyResources(ctx context.Context) error { - kinds := []schema.GroupVersionResource{{ - Group: dashboard.GROUP, - Resource: dashboard.DASHBOARD_RESOURCE, - Version: "v1alpha1", - }} - - for _, kind := range kinds { - j.progress.SetMessage(ctx, fmt.Sprintf("migrate %s resource", kind.Resource)) - gr := kind.GroupResource() - opts := legacy.MigrateOptions{ - Namespace: j.namespace, - WithHistory: j.options.History, - Resources: []schema.GroupResource{gr}, - Store: parquet.NewBulkResourceWriterClient(&resourceReader{job: j}), - OnlyCount: true, // first get the count - } - stats, err := j.legacy.Migrate(ctx, opts) - if err != nil { - return fmt.Errorf("unable to count legacy items %w", err) - } - - // FIXME: explain why we calculate it in this way - if len(stats.Summary) > 0 { - count := stats.Summary[0].Count // - history := stats.Summary[0].History - if history > count { - count = history // the number of items we will process - } - j.progress.SetTotal(ctx, int(count)) - } - - opts.OnlyCount = false // this time actually write - _, err = j.legacy.Migrate(ctx, opts) - if err != nil { - return fmt.Errorf("error running legacy migrate %s %w", kind.Resource, err) - } - } - return nil -} - -func (j *migrationJob) write(ctx context.Context, obj *unstructured.Unstructured) jobs.JobResourceResult { - gvk := obj.GroupVersionKind() result := jobs.JobResourceResult{ - Name: obj.GetName(), - Resource: gvk.Kind, - Group: gvk.Group, + Name: parsed.Meta.GetName(), + Resource: r.kind.Resource, + Group: r.kind.Group, Action: repository.FileActionCreated, + Error: err, + Path: fileName, } - if err := ctx.Err(); err != nil { - result.Error = fmt.Errorf("context error: %w", err) - return result - } - - meta, err := utils.MetaAccessor(obj) - if err != nil { - result.Error = fmt.Errorf("extract meta accessor: %w", err) - return result - } - - // Message from annotations - commitMessage := meta.GetMessage() - if commitMessage == "" { - g := meta.GetGeneration() - if g > 0 { - commitMessage = fmt.Sprintf("Generation: %d", g) - } else { - commitMessage = "exported from grafana" - } - } - - name := meta.GetName() - manager, _ := meta.GetManagerProperties() - if manager.Identity == j.target.Config().GetName() { - result.Action = repository.FileActionIgnored - return result - } - - title := meta.FindTitle("") - if title == "" { - title = name - } - folder := meta.GetFolder() - - // Add the author in context (if available) - ctx = j.withAuthorSignature(ctx, meta) - - // Get the absolute path of the folder - fid, ok := j.folderTree.DirPath(folder, "") - if !ok { - // FIXME: Shouldn't this fail instead? - fid = resources.Folder{ - Path: "__folder_not_found/" + slugify.Slugify(folder), - } - j.logger.Error("folder of item was not in tree of repository") - } - - result.Path = fid.Path - - // Clear the metadata - delete(obj.Object, "metadata") - - if j.options.Identifier { - meta.SetName(name) // keep the identifier in the metadata - } - - body, err := json.MarshalIndent(obj.Object, "", " ") - if err != nil { - result.Error = fmt.Errorf("failed to marshal dashboard: %w", err) - return result - } - - fileName := slugify.Slugify(title) + ".json" - if fid.Path != "" { - fileName = safepath.Join(fid.Path, fileName) - } - - err = j.target.Write(ctx, fileName, "", body, commitMessage) - if err != nil { - result.Error = fmt.Errorf("failed to write file: %w", err) - } - - return result -} - -func removeUnprovisioned(ctx context.Context, client dynamic.ResourceInterface, progress jobs.JobProgressRecorder) error { - rawList, err := client.List(ctx, metav1.ListOptions{Limit: 10000}) - if err != nil { - return fmt.Errorf("failed to list resources: %w", err) - } - - if rawList.GetContinue() != "" { - return fmt.Errorf("unable to list all resources in one request: %s", rawList.GetContinue()) - } - - for _, item := range rawList.Items { - // Create a pointer to the item since MetaAccessor requires a pointer - itemPtr := &item - meta, err := utils.MetaAccessor(itemPtr) - if err != nil { - return fmt.Errorf("extract meta accessor: %w", err) - } - - // Skip if managed - _, ok := meta.GetManagerProperties() - if ok { - continue - } - - result := jobs.JobResourceResult{ - Name: item.GetName(), - Resource: item.GetKind(), - Group: item.GroupVersionKind().Group, - Action: repository.FileActionDeleted, - } - - if err = client.Delete(ctx, item.GetName(), metav1.DeleteOptions{}); err != nil { - result.Error = fmt.Errorf("failed to delete folder: %w", err) - progress.Record(ctx, result) - return result.Error - } - - progress.Record(ctx, result) + r.progress.Record(ctx, result) + if err := r.progress.TooManyErrors(); err != nil { + return err + } + + return nil +} + +func (r *legacyResourceResourceMigrator) Migrate(ctx context.Context) error { + r.progress.SetMessage(ctx, fmt.Sprintf("migrate %s resource", r.kind.Resource)) + opts := legacy.MigrateOptions{ + Namespace: r.namespace, + WithHistory: r.options.History, + Resources: []schema.GroupResource{r.kind}, + Store: parquet.NewBulkResourceWriterClient(r), + OnlyCount: true, // first get the count + } + stats, err := r.legacy.Migrate(ctx, opts) + if err != nil { + return fmt.Errorf("unable to count legacy items %w", err) + } + + // FIXME: explain why we calculate it in this way + if len(stats.Summary) > 0 { + count := stats.Summary[0].Count // + history := stats.Summary[0].History + if history > count { + count = history // the number of items we will process + } + r.progress.SetTotal(ctx, int(count)) + } + + opts.OnlyCount = false // this time actually write + _, err = r.legacy.Migrate(ctx, opts) + if err != nil { + return fmt.Errorf("error running legacy migrate %s %w", r.kind.Resource, err) } return nil diff --git a/pkg/registry/apis/provisioning/jobs/migrate/import.go b/pkg/registry/apis/provisioning/jobs/migrate/storage.go similarity index 65% rename from pkg/registry/apis/provisioning/jobs/migrate/import.go rename to pkg/registry/apis/provisioning/jobs/migrate/storage.go index 3a4fa42a180..798ce384a14 100644 --- a/pkg/registry/apis/provisioning/jobs/migrate/import.go +++ b/pkg/registry/apis/provisioning/jobs/migrate/storage.go @@ -6,27 +6,16 @@ import ( "time" "google.golang.org/grpc/metadata" - "k8s.io/apimachinery/pkg/runtime/schema" "github.com/grafana/grafana-app-sdk/logging" - dashboard "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v0alpha1" - folders "github.com/grafana/grafana/pkg/apis/folder/v0alpha1" + "github.com/grafana/grafana/pkg/registry/apis/provisioning/resources" "github.com/grafana/grafana/pkg/storage/legacysql/dualwrite" "github.com/grafana/grafana/pkg/storage/unified/resource" ) -// called when an error exists func stopReadingUnifiedStorage(ctx context.Context, dual dualwrite.Service) error { - kinds := []schema.GroupResource{{ - Group: folders.GROUP, - Resource: folders.RESOURCE, - }, { - Group: dashboard.GROUP, - Resource: dashboard.DASHBOARD_RESOURCE, - }} - - for _, gr := range kinds { - status, _ := dual.Status(ctx, gr) + for _, gr := range resources.SupportedResources { + status, _ := dual.Status(ctx, gr.GroupResource()) status.ReadUnified = false status.Migrated = 0 status.Migrating = 0 @@ -35,20 +24,13 @@ func stopReadingUnifiedStorage(ctx context.Context, dual dualwrite.Service) erro return err } } + return nil } -func (j *migrationJob) wipeUnifiedAndSetMigratedFlag(ctx context.Context, dual dualwrite.Service) error { - kinds := []schema.GroupResource{{ - Group: folders.GROUP, - Resource: folders.RESOURCE, - }, { - Group: dashboard.GROUP, - Resource: dashboard.DASHBOARD_RESOURCE, - }} - - for _, gr := range kinds { - status, _ := dual.Status(ctx, gr) +func wipeUnifiedAndSetMigratedFlag(ctx context.Context, dual dualwrite.Service, namespace string, batch resource.BulkStoreClient) error { + for _, gr := range resources.SupportedResources { + status, _ := dual.Status(ctx, gr.GroupResource()) if status.ReadUnified { return fmt.Errorf("unexpected state - already using unified storage for: %s", gr) } @@ -60,13 +42,13 @@ func (j *migrationJob) wipeUnifiedAndSetMigratedFlag(ctx context.Context, dual d settings := resource.BulkSettings{ RebuildCollection: true, // wipes everything in the collection Collection: []*resource.ResourceKey{{ - Namespace: j.namespace, + Namespace: namespace, Group: gr.Group, Resource: gr.Resource, }}, } ctx = metadata.NewOutgoingContext(ctx, settings.ToMD()) - stream, err := j.batch.BulkProcess(ctx) + stream, err := batch.BulkProcess(ctx) if err != nil { return fmt.Errorf("error clearing unified %s / %w", gr, err) } diff --git a/pkg/registry/apis/provisioning/jobs/migrate/users.go b/pkg/registry/apis/provisioning/jobs/migrate/users.go index 30d52faf770..2b7b4d09cb2 100644 --- a/pkg/registry/apis/provisioning/jobs/migrate/users.go +++ b/pkg/registry/apis/provisioning/jobs/migrate/users.go @@ -2,41 +2,41 @@ package migrate import ( "context" - "fmt" + "errors" "strings" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "github.com/grafana/grafana/pkg/registry/apis/provisioning/repository" + "github.com/grafana/grafana/pkg/registry/apis/provisioning/resources" ) -func (j *migrationJob) loadUsers(ctx context.Context) error { - client, err := j.parser.Clients().User() +const maxUsers = 10000 + +func loadUsers(ctx context.Context, parser *resources.Parser) (map[string]repository.CommitSignature, error) { + client, err := parser.Clients().User() if err != nil { - return err + return nil, err } - rawList, err := client.List(ctx, metav1.ListOptions{Limit: 10000}) - if err != nil { - return fmt.Errorf("failed to list users: %w", err) - } - if rawList.GetContinue() != "" { - return fmt.Errorf("unable to list all users in one request: %s", rawList.GetContinue()) - } + userInfo := make(map[string]repository.CommitSignature) + var count int + err = resources.ForEachResource(ctx, client, func(item *unstructured.Unstructured) error { + count++ + if count > maxUsers { + return errors.New("too many users") + } - var ok bool - j.userInfo = make(map[string]repository.CommitSignature) - for _, item := range rawList.Items { sig := repository.CommitSignature{} // FIXME: should we improve logging here? + var ok bool sig.Name, ok, err = unstructured.NestedString(item.Object, "spec", "login") if !ok || err != nil { - continue + return nil } sig.Email, ok, err = unstructured.NestedString(item.Object, "spec", "email") if !ok || err != nil { - continue + return nil } if sig.Name == sig.Email { @@ -47,7 +47,12 @@ func (j *migrationJob) loadUsers(ctx context.Context) error { } } - j.userInfo["user:"+item.GetName()] = sig + userInfo["user:"+item.GetName()] = sig + return nil + }) + if err != nil { + return nil, err } - return nil + + return userInfo, nil } diff --git a/pkg/registry/apis/provisioning/jobs/migrate/worker.go b/pkg/registry/apis/provisioning/jobs/migrate/worker.go index 1a66066b2db..6962942eedb 100644 --- a/pkg/registry/apis/provisioning/jobs/migrate/worker.go +++ b/pkg/registry/apis/provisioning/jobs/migrate/worker.go @@ -9,7 +9,6 @@ import ( "time" "github.com/grafana/grafana-app-sdk/logging" - "github.com/grafana/grafana/pkg/apimachinery/utils" provisioning "github.com/grafana/grafana/pkg/apis/provisioning/v0alpha1" "github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy" "github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs" @@ -21,6 +20,9 @@ import ( "github.com/grafana/grafana/pkg/registry/apis/provisioning/secrets" "github.com/grafana/grafana/pkg/storage/legacysql/dualwrite" "github.com/grafana/grafana/pkg/storage/unified/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/dynamic" ) type MigrationWorker struct { @@ -128,7 +130,7 @@ func (w *MigrationWorker) Process(ctx context.Context, repo repository.Repositor return w.migrateFromLegacy(ctx, rw, buffered, *options, progress) } - return w.migrateFromUnifiedStorage(ctx, rw, *options, progress) + return w.migrateFromAPIServer(ctx, rw, *options, progress) } // migrateFromLegacy will export the resources from legacy storage and import them into the target repository @@ -138,29 +140,61 @@ func (w *MigrationWorker) migrateFromLegacy(ctx context.Context, rw repository.R return fmt.Errorf("error getting parser: %w", err) } - worker, err := newMigrationJob(ctx, rw, options, parser, w.bulk, w.legacyMigrator, progress) - if err != nil { - return fmt.Errorf("error creating job: %w", err) - } - + var userInfo map[string]repository.CommitSignature if options.History { progress.SetMessage(ctx, "loading users") - err = worker.loadUsers(ctx) + userInfo, err = loadUsers(ctx, parser) if err != nil { return fmt.Errorf("error loading users: %w", err) } } + namespace := rw.Config().Namespace - progress.SetMessage(ctx, "exporting legacy folders") - err = worker.migrateLegacyFolders(ctx) + progress.SetMessage(ctx, "loading legacy folders") + reader := NewLegacyFolderReader(w.legacyMigrator, rw.Config().Name, namespace) + if err = reader.Read(ctx, w.legacyMigrator, rw.Config().Name, namespace); err != nil { + return fmt.Errorf("error loading folder tree: %w", err) + } + + folderClient, err := parser.Clients().Folder() if err != nil { - return err + return fmt.Errorf("error getting folder client: %w", err) + } + + folders := resources.NewFolderManager(rw, folderClient, reader.Tree()) + progress.SetMessage(ctx, "exporting legacy folders") + err = folders.EnsureTreeExists(ctx, "", "", func(folder resources.Folder, created bool, err error) error { + result := jobs.JobResourceResult{ + Action: repository.FileActionCreated, + Name: folder.ID, + Resource: resources.FolderResource.Resource, + Group: resources.FolderResource.Group, + Path: folder.Path, + Error: err, + } + + if !created { + result.Action = repository.FileActionIgnored + } + + progress.Record(ctx, result) + return nil + }) + if err != nil { + return fmt.Errorf("error exporting legacy folders: %w", err) } progress.SetMessage(ctx, "exporting legacy resources") - err = worker.migrateLegacyResources(ctx) - if err != nil { - return err + resourceManager := resources.NewResourcesManager(rw, folders, parser, parser.Clients(), userInfo) + for _, kind := range resources.SupportedResources { + if kind == resources.FolderResource { + continue + } + + reader := NewLegacyResourceMigrator(w.legacyMigrator, parser, resourceManager, progress, options, namespace, kind.GroupResource()) + if err := reader.Migrate(ctx); err != nil { + return fmt.Errorf("error migrating resource %s: %w", kind, err) + } } if buffered != nil { @@ -182,7 +216,7 @@ func (w *MigrationWorker) migrateFromLegacy(ctx context.Context, rw repository.R } progress.SetMessage(ctx, "resetting unified storage") - if err = worker.wipeUnifiedAndSetMigratedFlag(ctx, w.storageStatus); err != nil { + if err = wipeUnifiedAndSetMigratedFlag(ctx, w.storageStatus, namespace, w.bulk); err != nil { return fmt.Errorf("unable to reset unified storage %w", err) } @@ -209,125 +243,57 @@ func (w *MigrationWorker) migrateFromLegacy(ctx context.Context, rw repository.R return err } -// migrateFromUnifiedStorage will export the resources from unified storage and import them into the target repository -func (w *MigrationWorker) migrateFromUnifiedStorage(ctx context.Context, repo repository.ReaderWriter, options provisioning.MigrateJobOptions, progress jobs.JobProgressRecorder) error { - parser, err := w.parsers.GetParser(ctx, repo) - if err != nil { - return fmt.Errorf("error getting parser: %w", err) - } - +// migrateFromAPIServer will export the resources from unified storage and import them into the target repository +func (w *MigrationWorker) migrateFromAPIServer(ctx context.Context, repo repository.ReaderWriter, options provisioning.MigrateJobOptions, progress jobs.JobProgressRecorder) error { progress.SetMessage(ctx, "exporting unified storage resources") - if err := w.exportWorker.Process(ctx, repo, provisioning.Job{ + exportJob := provisioning.Job{ Spec: provisioning.JobSpec{ Push: &provisioning.ExportJobOptions{ Identifier: options.Identifier, }, }, - }, progress); err != nil { + } + if err := w.exportWorker.Process(ctx, repo, exportJob, progress); err != nil { return fmt.Errorf("export resources: %w", err) } // Reset the results after the export as pull will operate on the same resources progress.ResetResults() - progress.SetMessage(ctx, "pulling resources") - err = w.syncWorker.Process(ctx, repo, provisioning.Job{ + syncJob := provisioning.Job{ Spec: provisioning.JobSpec{ Pull: &provisioning.SyncJobOptions{ Incremental: false, }, }, - }, progress) - if err != nil { + } + + if err := w.syncWorker.Process(ctx, repo, syncJob, progress); err != nil { return fmt.Errorf("pull resources: %w", err) } - folderClient, err := parser.Clients().Folder() + progress.SetMessage(ctx, "removing unprovisioned resources") + parser, err := w.parsers.GetParser(ctx, repo) if err != nil { - return fmt.Errorf("unable to get folder client: %w", err) + return fmt.Errorf("error getting parser: %w", err) } - dashboardClient, err := parser.Clients().Dashboard() - if err != nil { - return fmt.Errorf("unable to get dashboard client: %w", err) - } + return parser.Clients().ForEachUnmanagedResource(ctx, func(client dynamic.ResourceInterface, item *unstructured.Unstructured) error { + result := jobs.JobResourceResult{ + Name: item.GetName(), + Resource: item.GetKind(), + Group: item.GroupVersionKind().Group, + Action: repository.FileActionDeleted, + } - progress.SetMessage(ctx, "removing unprovisioned folders") - err = removeUnprovisioned(ctx, folderClient, progress) - if err != nil { - return fmt.Errorf("remove unprovisioned folders: %w", err) - } + if err = client.Delete(ctx, item.GetName(), metav1.DeleteOptions{}); err != nil { + result.Error = fmt.Errorf("failed to delete folder: %w", err) + progress.Record(ctx, result) + return result.Error + } - progress.SetMessage(ctx, "removing unprovisioned dashboards") - err = removeUnprovisioned(ctx, dashboardClient, progress) - if err != nil { - return fmt.Errorf("remove unprovisioned dashboards: %w", err) - } + progress.Record(ctx, result) - return nil -} - -// MigrationJob holds all context for a running job -type migrationJob struct { - logger logging.Logger - target repository.ReaderWriter - legacy legacy.LegacyMigrator - parser *resources.Parser - batch resource.BulkStoreClient - - namespace string - - progress jobs.JobProgressRecorder - - userInfo map[string]repository.CommitSignature - folderTree *resources.FolderTree - - options provisioning.MigrateJobOptions -} - -func newMigrationJob(ctx context.Context, - target repository.ReaderWriter, - options provisioning.MigrateJobOptions, - parser *resources.Parser, - batch resource.BulkStoreClient, - legacyMigrator legacy.LegacyMigrator, - progress jobs.JobProgressRecorder, -) (*migrationJob, error) { - return &migrationJob{ - namespace: target.Config().Namespace, - target: target, - logger: logging.FromContext(ctx), - progress: progress, - options: options, - parser: parser, - batch: batch, - legacy: legacyMigrator, - folderTree: resources.NewEmptyFolderTree(), - }, nil -} - -func (j *migrationJob) withAuthorSignature(ctx context.Context, item utils.GrafanaMetaAccessor) context.Context { - if j.userInfo == nil { - return ctx - } - id := item.GetUpdatedBy() - if id == "" { - id = item.GetCreatedBy() - } - if id == "" { - id = "grafana" - } - - sig := j.userInfo[id] // lookup - if sig.Name == "" && sig.Email == "" { - sig.Name = id - } - t, err := item.GetUpdatedTimestamp() - if err == nil && t != nil { - sig.When = *t - } else { - sig.When = item.GetCreationTimestamp().Time - } - - return repository.WithAuthorSignature(ctx, sig) + return nil + }) } diff --git a/pkg/registry/apis/provisioning/jobs/sync/changes.go b/pkg/registry/apis/provisioning/jobs/sync/changes.go index 719fd2cd02d..bbc46a93e3d 100644 --- a/pkg/registry/apis/provisioning/jobs/sync/changes.go +++ b/pkg/registry/apis/provisioning/jobs/sync/changes.go @@ -5,7 +5,6 @@ import ( "sort" "strings" - folders "github.com/grafana/grafana/pkg/apis/folder/v0alpha1" provisioning "github.com/grafana/grafana/pkg/apis/provisioning/v0alpha1" "github.com/grafana/grafana/pkg/registry/apis/provisioning/repository" "github.com/grafana/grafana/pkg/registry/apis/provisioning/resources" @@ -25,14 +24,14 @@ func Changes(source []repository.FileTreeEntry, target *provisioning.ResourceLis lookup := make(map[string]*provisioning.ResourceListItem, len(target.Items)) for _, item := range target.Items { if item.Path == "" { - if item.Group != folders.GROUP { + if item.Group != resources.FolderResource.Group { return nil, fmt.Errorf("empty path on a non folder") } continue } // TODO: why do we have to do this here? - if item.Group == folders.GROUP && !strings.HasSuffix(item.Path, "/") { + if item.Group == resources.FolderResource.Group && !strings.HasSuffix(item.Path, "/") { item.Path = item.Path + "/" } @@ -44,7 +43,7 @@ func Changes(source []repository.FileTreeEntry, target *provisioning.ResourceLis for _, file := range source { check, ok := lookup[file.Path] if ok { - if check.Hash != file.Hash && check.Resource != folders.RESOURCE { + if check.Hash != file.Hash && check.Resource != resources.FolderResource.Resource { changes = append(changes, ResourceFileChange{ Action: repository.FileActionUpdated, Path: check.Path, @@ -56,7 +55,7 @@ func Changes(source []repository.FileTreeEntry, target *provisioning.ResourceLis return nil, fmt.Errorf("failed to add path to keep trie: %w", err) } - if check.Resource != folders.RESOURCE { + if check.Resource != resources.FolderResource.Resource { delete(lookup, file.Path) } @@ -101,7 +100,7 @@ func Changes(source []repository.FileTreeEntry, target *provisioning.ResourceLis // Paths found in grafana, without a matching path in the repository for _, v := range lookup { - if v.Resource == folders.RESOURCE && keep.Exists(v.Path) { + if v.Resource == resources.FolderResource.Resource && keep.Exists(v.Path) { continue } diff --git a/pkg/registry/apis/provisioning/jobs/sync/worker.go b/pkg/registry/apis/provisioning/jobs/sync/worker.go index 329285a9493..2def2faa177 100644 --- a/pkg/registry/apis/provisioning/jobs/sync/worker.go +++ b/pkg/registry/apis/provisioning/jobs/sync/worker.go @@ -1,7 +1,6 @@ package sync import ( - "bytes" "context" "encoding/json" "errors" @@ -12,7 +11,6 @@ import ( "k8s.io/apimachinery/pkg/types" "github.com/grafana/grafana-app-sdk/logging" - folders "github.com/grafana/grafana/pkg/apis/folder/v0alpha1" provisioning "github.com/grafana/grafana/pkg/apis/provisioning/v0alpha1" client "github.com/grafana/grafana/pkg/generated/clientset/versioned/typed/provisioning/v0alpha1" "github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs" @@ -64,7 +62,7 @@ func (r *SyncWorker) Process(ctx context.Context, repo repository.Repository, jo return fmt.Errorf("sync not supported until storage has migrated") } - rw, ok := repo.(repository.Reader) + rw, ok := repo.(repository.ReaderWriter) if !ok { return fmt.Errorf("sync job submitted for repository that does not support read-write -- this is a bug") } @@ -132,7 +130,7 @@ func (r *SyncWorker) Process(ctx context.Context, repo repository.Repository, jo } // start a job and run it -func (r *SyncWorker) createJob(ctx context.Context, repo repository.Reader, progress jobs.JobProgressRecorder) (*syncJob, error) { +func (r *SyncWorker) createJob(ctx context.Context, repo repository.ReaderWriter, progress jobs.JobProgressRecorder) (*syncJob, error) { cfg := repo.Config() parser, err := r.parsers.GetParser(ctx, repo) if err != nil { @@ -144,13 +142,14 @@ func (r *SyncWorker) createJob(ctx context.Context, repo repository.Reader, prog return nil, fmt.Errorf("unable to get folder client: %w", err) } + folders := resources.NewFolderManager(repo, folderClient, resources.NewEmptyFolderTree()) job := &syncJob{ repository: repo, progress: progress, - parser: parser, lister: r.lister, - folders: resources.NewFolderManager(repo, folderClient), - resourcesLookup: map[resourceID]string{}, + folders: folders, + clients: parser.Clients(), + resourceManager: resources.NewResourcesManager(repo, folders, parser, parser.Clients(), nil), } return job, nil @@ -171,21 +170,14 @@ func (r *SyncWorker) patchStatus(ctx context.Context, repo *provisioning.Reposit return nil } -type resourceID struct { - Name string - Resource string - Group string -} - // created once for each sync execution type syncJob struct { repository repository.Reader progress jobs.JobProgressRecorder - parser *resources.Parser lister resources.ResourceLister folders *resources.FolderManager - folderLookup *resources.FolderTree - resourcesLookup map[resourceID]string // the path with this k8s name + clients *resources.ResourceClients + resourceManager *resources.ResourcesManager } func (r *syncJob) run(ctx context.Context, options provisioning.SyncJobOptions) error { @@ -242,18 +234,13 @@ func (r *syncJob) run(ctx context.Context, options provisioning.SyncJobOptions) return nil } - // Load any existing folder information - r.folderLookup = resources.NewFolderTreeFromResourceList(target) + r.folders.SetTree(resources.NewFolderTreeFromResourceList(target)) // Now apply the changes return r.applyChanges(ctx, changes) } func (r *syncJob) applyChanges(ctx context.Context, changes []ResourceFileChange) error { - if len(r.resourcesLookup) > 0 { - return fmt.Errorf("this should be empty") - } - r.progress.SetTotal(ctx, len(changes)) r.progress.SetMessage(ctx, "replicating changes") @@ -285,7 +272,8 @@ func (r *syncJob) applyChanges(ctx context.Context, changes []ResourceFileChange Resource: change.Existing.Resource, } - client, _, err := r.parser.Clients().ForResource(versionlessGVR) + // TODO: should we use the clients or the resource manager instead? + client, _, err := r.clients.ForResource(versionlessGVR) if err != nil { result.Error = fmt.Errorf("unable to get client for deleted object: %w", err) r.progress.Record(ctx, result) @@ -312,15 +300,25 @@ func (r *syncJob) applyChanges(ctx context.Context, changes []ResourceFileChange } result.Name = folder - result.Resource = folders.RESOURCE - result.Group = folders.GROUP + result.Resource = resources.FolderResource.Resource + result.Group = resources.FolderResource.Group r.progress.Record(ctx, result) continue } - // Write the resource file - r.progress.Record(ctx, r.writeResourceFromFile(ctx, change.Path, "", change.Action)) + name, gvk, err := r.resourceManager.WriteResourceFromFile(ctx, change.Path, "") + result := jobs.JobResourceResult{ + Path: change.Path, + Action: change.Action, + Name: name, + Error: err, + } + if gvk != nil { + result.Resource = gvk.Kind + result.Group = gvk.Group + } + r.progress.Record(ctx, result) } r.progress.SetMessage(ctx, "changes replicated") @@ -367,8 +365,8 @@ func (r *syncJob) applyVersionedChanges(ctx context.Context, repo repository.Ver r.progress.Record(ctx, jobs.JobResourceResult{ Path: safeSegment, Action: repository.FileActionCreated, - Resource: folders.RESOURCE, - Group: folders.GROUP, + Resource: resources.FolderResource.Resource, + Group: resources.FolderResource.Group, Name: folder, }) @@ -382,126 +380,49 @@ func (r *syncJob) applyVersionedChanges(ctx context.Context, repo repository.Ver continue } + result := jobs.JobResourceResult{ + Path: change.Path, + Action: change.Action, + } + switch change.Action { case repository.FileActionCreated, repository.FileActionUpdated: - r.progress.Record(ctx, r.writeResourceFromFile(ctx, change.Path, change.Ref, change.Action)) - case repository.FileActionDeleted: - r.progress.Record(ctx, r.deleteObject(ctx, change.Path, change.PreviousRef)) - case repository.FileActionRenamed: - // 1. Delete - result := r.deleteObject(ctx, change.Path, change.PreviousRef) - if result.Error != nil { - r.progress.Record(ctx, result) - continue + name, gvk, err := r.resourceManager.WriteResourceFromFile(ctx, change.Path, change.Ref) + if err != nil { + result.Error = fmt.Errorf("write resource: %w", err) + } + result.Name = name + if gvk != nil { + result.Resource = gvk.Kind + result.Group = gvk.Group + } + case repository.FileActionDeleted: + name, gvk, err := r.resourceManager.RemoveResourceFromFile(ctx, change.Path, change.PreviousRef) + if err != nil { + result.Error = fmt.Errorf("delete resource: %w", err) + } + result.Name = name + if gvk != nil { + result.Resource = gvk.Kind + result.Group = gvk.Group + } + case repository.FileActionRenamed: + name, gvk, err := r.resourceManager.RenameResourceFile(ctx, change.Path, change.PreviousRef, change.Path, change.Ref) + if err != nil { + result.Error = fmt.Errorf("rename resource: %w", err) + } + result.Name = name + if gvk != nil { + result.Resource = gvk.Kind + result.Group = gvk.Group } - - // 2. Create - r.progress.Record(ctx, r.writeResourceFromFile(ctx, change.Path, change.Ref, repository.FileActionCreated)) case repository.FileActionIgnored: - r.progress.Record(ctx, jobs.JobResourceResult{ - Path: change.Path, - Action: repository.FileActionIgnored, - }) + // do nothing } + r.progress.Record(ctx, result) } r.progress.SetMessage(ctx, "versioned changes replicated") return nil } - -func (r *syncJob) deleteObject(ctx context.Context, path string, ref string) jobs.JobResourceResult { - info, err := r.repository.Read(ctx, path, ref) - result := jobs.JobResourceResult{ - Path: path, - Action: repository.FileActionDeleted, - } - - if err != nil { - result.Error = fmt.Errorf("failed to read file: %w", err) - return result - } - - obj, gvk, _ := resources.DecodeYAMLObject(bytes.NewBuffer(info.Data)) - if obj == nil { - result.Error = errors.New("no object found") - return result - } - - objName := obj.GetName() - if objName == "" { - // Find the referenced file - objName, _ = resources.NamesFromHashedRepoPath(r.repository.Config().Name, path) - } - - result.Name = objName - result.Resource = gvk.Kind - result.Group = gvk.Group - - client, _, err := r.parser.Clients().ForKind(*gvk) - if err != nil { - result.Error = fmt.Errorf("unable to get client for deleted object: %w", err) - return result - } - - err = client.Delete(ctx, objName, metav1.DeleteOptions{}) - if err != nil { - result.Error = fmt.Errorf("failed to delete: %w", err) - return result - } - - return result -} - -func (r *syncJob) writeResourceFromFile(ctx context.Context, path string, ref string, action repository.FileAction) jobs.JobResourceResult { - result := jobs.JobResourceResult{ - Path: path, - Action: action, - } - - // Read the referenced file - fileInfo, err := r.repository.Read(ctx, path, ref) - if err != nil { - result.Error = fmt.Errorf("failed to read file: %w", err) - return result - } - - parsed, err := r.parser.Parse(ctx, fileInfo, false) // no validation - if err != nil { - result.Error = fmt.Errorf("failed to parse file: %w", err) - return result - } - - // Check if the resource already exists - id := resourceID{ - Name: parsed.Obj.GetName(), - Resource: parsed.GVR.Resource, - Group: parsed.GVK.Group, - } - existing, found := r.resourcesLookup[id] - if found { - result.Error = fmt.Errorf("duplicate resource name: %s, %s and %s", parsed.Obj.GetName(), path, existing) - return result - } - r.resourcesLookup[id] = path - - // Make sure the parent folders exist - folder, err := r.folders.EnsureFolderPathExist(ctx, path) - if err != nil { - result.Error = fmt.Errorf("failed to ensure folder path exists: %w", err) - return result - } - - parsed.Meta.SetFolder(folder) - parsed.Meta.SetUID("") // clear identifiers - parsed.Meta.SetResourceVersion("") // clear identifiers - - result.Name = parsed.Obj.GetName() - result.Resource = parsed.GVR.Resource - result.Group = parsed.GVK.Group - - // Update will also create (for resources we care about) - _, err = parsed.Client.Update(ctx, parsed.Obj, metav1.UpdateOptions{}) - result.Error = err - return result -} diff --git a/pkg/registry/apis/provisioning/register.go b/pkg/registry/apis/provisioning/register.go index 1abb955522c..146b1951978 100644 --- a/pkg/registry/apis/provisioning/register.go +++ b/pkg/registry/apis/provisioning/register.go @@ -562,6 +562,7 @@ func (b *APIBuilder) GetPostStartHooks() (map[string]genericapiserver.PostStartH b.storageStatus, b.secrets, b.clonedir, + b.parsers, ) syncWorker := sync.NewSyncWorker( c.ProvisioningV0alpha1(), diff --git a/pkg/registry/apis/provisioning/request.go b/pkg/registry/apis/provisioning/request.go index 616c86538f2..7eb8def59bb 100644 --- a/pkg/registry/apis/provisioning/request.go +++ b/pkg/registry/apis/provisioning/request.go @@ -29,6 +29,7 @@ func readBody(r *http.Request, maxSize int64) ([]byte, error) { } return nil, fmt.Errorf("error reading request body: %w", err) } + defer func() { _ = limitedBody.Close() }() return body, nil } diff --git a/pkg/registry/apis/provisioning/resources/client.go b/pkg/registry/apis/provisioning/resources/client.go index 8222146cdfa..d7d9f2c6918 100644 --- a/pkg/registry/apis/provisioning/resources/client.go +++ b/pkg/registry/apis/provisioning/resources/client.go @@ -5,10 +5,13 @@ import ( "fmt" "sync" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" dashboard "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v1alpha1" + "github.com/grafana/grafana/pkg/apimachinery/utils" folders "github.com/grafana/grafana/pkg/apis/folder/v0alpha1" iam "github.com/grafana/grafana/pkg/apis/iam/v0alpha1" "github.com/grafana/grafana/pkg/services/apiserver" @@ -142,29 +145,105 @@ func (c *ResourceClients) ForResource(gvr schema.GroupVersionResource) (dynamic. return info.client, info.gvk, nil } -func (c *ResourceClients) Folder() (dynamic.ResourceInterface, error) { - v, _, err := c.ForResource(schema.GroupVersionResource{ - Group: folders.GROUP, - Version: folders.VERSION, - Resource: folders.RESOURCE, +// ForEachResource applies the function to each resource in the discovery client +func (c *ResourceClients) ForEachResource(ctx context.Context, kind schema.GroupVersionResource, fn func(client dynamic.ResourceInterface, item *unstructured.Unstructured) error) error { + client, _, err := c.ForResource(kind) + if err != nil { + return err + } + + return ForEachResource(ctx, client, func(item *unstructured.Unstructured) error { + return fn(client, item) }) - return v, err +} + +// ForEachResource applies the function to each resource in the discovery client +func ForEachResource(ctx context.Context, client dynamic.ResourceInterface, fn func(item *unstructured.Unstructured) error) error { + var continueToken string + for ctx.Err() == nil { + list, err := client.List(ctx, metav1.ListOptions{Limit: 100, Continue: continueToken}) + if err != nil { + return fmt.Errorf("error executing list: %w", err) + } + + for _, item := range list.Items { + if ctx.Err() != nil { + return ctx.Err() + } + + if err := fn(&item); err != nil { + return err + } + } + + continueToken = list.GetContinue() + if continueToken == "" { + break + } + } + + return nil +} + +// ForEachUnmanagedResource applies the function to each unprovisioned supported resource +func (c *ResourceClients) ForEachUnmanagedResource(ctx context.Context, fn func(client dynamic.ResourceInterface, item *unstructured.Unstructured) error) error { + return c.ForEachSupportedResource(ctx, func(client dynamic.ResourceInterface, item *unstructured.Unstructured) error { + meta, err := utils.MetaAccessor(item) + if err != nil { + return fmt.Errorf("extract meta accessor: %w", err) + } + + // Skip if managed + _, ok := meta.GetManagerProperties() + if ok { + return nil + } + + return fn(client, item) + }) +} + +// ForEachSupportedResource applies the function to each supported resource +func (c *ResourceClients) ForEachSupportedResource(ctx context.Context, fn func(client dynamic.ResourceInterface, item *unstructured.Unstructured) error) error { + for _, kind := range SupportedResources { + if err := c.ForEachResource(ctx, kind, fn); err != nil { + return err + } + } + return nil +} + +func (c *ResourceClients) Folder() (dynamic.ResourceInterface, error) { + client, _, err := c.ForResource(FolderResource) + return client, err +} + +func (c *ResourceClients) ForEachFolder(ctx context.Context, fn func(client dynamic.ResourceInterface, item *unstructured.Unstructured) error) error { + return c.ForEachResource(ctx, FolderResource, fn) } func (c *ResourceClients) User() (dynamic.ResourceInterface, error) { - v, _, err := c.ForResource(schema.GroupVersionResource{ - Group: iam.GROUP, - Version: iam.VERSION, - Resource: iam.UserResourceInfo.GroupResource().Resource, - }) + v, _, err := c.ForResource(UserResource) return v, err } -func (c *ResourceClients) Dashboard() (dynamic.ResourceInterface, error) { - v, _, err := c.ForResource(schema.GroupVersionResource{ - Group: dashboard.GROUP, - Version: dashboard.VERSION, - Resource: dashboard.DASHBOARD_RESOURCE, - }) - return v, err +var UserResource = schema.GroupVersionResource{ + Group: iam.GROUP, + Version: iam.VERSION, + Resource: iam.UserResourceInfo.GroupResource().Resource, } + +var FolderResource = schema.GroupVersionResource{ + Group: folders.GROUP, + Version: folders.VERSION, + Resource: folders.RESOURCE, +} + +var DashboardResource = schema.GroupVersionResource{ + Group: dashboard.GROUP, + Version: dashboard.VERSION, + Resource: dashboard.DASHBOARD_RESOURCE, +} + +// SupportedResources is the list of resources that are supported by provisioning +var SupportedResources = []schema.GroupVersionResource{FolderResource, DashboardResource} diff --git a/pkg/registry/apis/provisioning/resources/dualwriter.go b/pkg/registry/apis/provisioning/resources/dualwriter.go new file mode 100644 index 00000000000..2a841187080 --- /dev/null +++ b/pkg/registry/apis/provisioning/resources/dualwriter.go @@ -0,0 +1,276 @@ +package resources + +import ( + "context" + "errors" + "fmt" + + "github.com/grafana/grafana/pkg/apimachinery/apis/common/v0alpha1" + provisioning "github.com/grafana/grafana/pkg/apis/provisioning/v0alpha1" + "github.com/grafana/grafana/pkg/registry/apis/provisioning/repository" + "github.com/grafana/grafana/pkg/registry/apis/provisioning/safepath" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// DualReadWriter is a wrapper around a repository that can read and write resources +// TODO: it does not support folders yet +type DualReadWriter struct { + repo repository.ReaderWriter + parser *Parser + folders *FolderManager +} + +func NewDualReadWriter(repo repository.ReaderWriter, parser *Parser, folders *FolderManager) *DualReadWriter { + return &DualReadWriter{repo: repo, parser: parser, folders: folders} +} + +func (r *DualReadWriter) Read(ctx context.Context, path string, ref string) (*ParsedResource, error) { + // TODO: implement this + if safepath.IsDir(path) { + return nil, fmt.Errorf("folder read not supported") + } + + info, err := r.repo.Read(ctx, path, ref) + if err != nil { + return nil, err + } + + parsed, err := r.parser.Parse(ctx, info, false) + if err != nil { + return nil, err + } + + // GVR will exist for anything we can actually save + // TODO: Add known error in parser for unsupported resource + if parsed.GVR == nil { + if parsed.GVK != nil { + //nolint:govet + parsed.Errors = append(parsed.Errors, fmt.Errorf("unknown resource for Kind: %s", parsed.GVK.Kind)) + } else { + parsed.Errors = append(parsed.Errors, fmt.Errorf("unknown resource")) + } + } + + return parsed, nil +} + +func (r *DualReadWriter) Delete(ctx context.Context, path string, ref string, message string) (*ParsedResource, error) { + if err := repository.IsWriteAllowed(r.repo.Config(), ref); err != nil { + return nil, err + } + + // TODO: implement this + if safepath.IsDir(path) { + return nil, fmt.Errorf("folder delete not supported") + } + + file, err := r.repo.Read(ctx, path, ref) + if err != nil { + return nil, err // unable to read value + } + + // TODO: document in API specification + // We can only delete parsable things + parsed, err := r.parser.Parse(ctx, file, false) + if err != nil { + return nil, err // unable to read value + } + + parsed.Action = provisioning.ResourceActionDelete + err = r.repo.Delete(ctx, path, ref, message) + if err != nil { + return nil, fmt.Errorf("delete file from repository: %w", err) + } + + // Delete the file in the grafana database + if ref == "" { + err = parsed.Client.Delete(ctx, parsed.Obj.GetName(), metav1.DeleteOptions{}) + if apierrors.IsNotFound(err) { + err = nil // ignorable + } + + if err != nil { + return nil, fmt.Errorf("delete resource from storage: %w", err) + } + } + + return parsed, err +} + +// CreateFolder creates a new folder in the repository +// FIXME: fix signature to return ParsedResource +func (r *DualReadWriter) CreateFolder(ctx context.Context, path string, ref string, message string) (*provisioning.ResourceWrapper, error) { + if err := repository.IsWriteAllowed(r.repo.Config(), ref); err != nil { + return nil, err + } + + if !safepath.IsDir(path) { + return nil, fmt.Errorf("not a folder path") + } + + // Now actually create the folder + if err := r.repo.Create(ctx, path, ref, nil, message); err != nil { + return nil, fmt.Errorf("failed to create folder: %w", err) + } + + cfg := r.repo.Config() + wrap := &provisioning.ResourceWrapper{ + Path: path, + Ref: ref, + Repository: provisioning.ResourceRepositoryInfo{ + Type: cfg.Spec.Type, + Namespace: cfg.Namespace, + Name: cfg.Name, + Title: cfg.Spec.Title, + }, + Resource: provisioning.ResourceObjects{ + Action: provisioning.ResourceActionCreate, + }, + } + + if ref == "" { + folderName, err := r.folders.EnsureFolderPathExist(ctx, path) + if err != nil { + return nil, err + } + + current, err := r.folders.GetFolder(ctx, folderName) + if err != nil && !apierrors.IsNotFound(err) { + return nil, err // unable to check if the folder exists + } + wrap.Resource.Upsert = v0alpha1.Unstructured{ + Object: current.Object, + } + } + + return wrap, nil +} + +// CreateResource creates a new resource in the repository +func (r *DualReadWriter) CreateResource(ctx context.Context, path string, ref string, message string, data []byte) (*ParsedResource, error) { + if err := repository.IsWriteAllowed(r.repo.Config(), ref); err != nil { + return nil, err + } + + info := &repository.FileInfo{ + Data: data, + Path: path, + Ref: ref, + } + + // TODO: improve parser to parse out of reader + parsed, err := r.parser.Parse(ctx, info, true) + if err != nil { + if errors.Is(err, ErrUnableToReadResourceBytes) { + return nil, apierrors.NewBadRequest("unable to read the request as a resource") + } + + return nil, err + } + + // GVR will exist for anything we can actually save + // TODO: Add known error in parser for unsupported resource + if parsed.GVR == nil { + return nil, apierrors.NewBadRequest("The payload does not map to a known resource") + } + + // Do not write if any errors exist + if len(parsed.Errors) > 0 { + return parsed, err + } + + data, err = parsed.ToSaveBytes() + if err != nil { + return nil, err + } + + err = r.repo.Create(ctx, path, ref, data, message) + if err != nil { + return nil, fmt.Errorf("create resource in repository: %w", err) + } + + // Directly update the grafana database + // Behaves the same running sync after writing + if ref == "" { + // FIXME: we are not creating the folder path + // TODO: will existing also be present here? for update? + if parsed.Existing == nil { + parsed.Upsert, err = parsed.Client.Create(ctx, parsed.Obj, metav1.CreateOptions{}) + if err != nil { + parsed.Errors = append(parsed.Errors, err) + } + } else { + parsed.Upsert, err = parsed.Client.Update(ctx, parsed.Obj, metav1.UpdateOptions{}) + if err != nil { + parsed.Errors = append(parsed.Errors, err) + } + } + } + + return parsed, err +} + +// UpdateResource updates a resource in the repository +func (r *DualReadWriter) UpdateResource(ctx context.Context, path string, ref string, message string, data []byte) (*ParsedResource, error) { + if err := repository.IsWriteAllowed(r.repo.Config(), ref); err != nil { + return nil, err + } + + info := &repository.FileInfo{ + Data: data, + Path: path, + Ref: ref, + } + + // TODO: improve parser to parse out of reader + parsed, err := r.parser.Parse(ctx, info, true) + if err != nil { + if errors.Is(err, ErrUnableToReadResourceBytes) { + return nil, apierrors.NewBadRequest("unable to read the request as a resource") + } + + return nil, err + } + + // GVR will exist for anything we can actually save + // TODO: Add known error in parser for unsupported resource + if parsed.GVR == nil { + return nil, apierrors.NewBadRequest("The payload does not map to a known resource") + } + + // Do not write if any errors exist + if len(parsed.Errors) > 0 { + return parsed, err + } + + data, err = parsed.ToSaveBytes() + if err != nil { + return nil, err + } + + err = r.repo.Update(ctx, path, ref, data, message) + if err != nil { + return nil, fmt.Errorf("update resource in repository: %w", err) + } + + // Directly update the grafana database + // Behaves the same running sync after writing + if ref == "" { + // FIXME: we are not creating the folder path + // FIXME: I don't like this parsed strategy here + if parsed.Existing == nil { + parsed.Upsert, err = parsed.Client.Create(ctx, parsed.Obj, metav1.CreateOptions{}) + if err != nil { + parsed.Errors = append(parsed.Errors, err) + } + } else { + parsed.Upsert, err = parsed.Client.Update(ctx, parsed.Obj, metav1.UpdateOptions{}) + if err != nil { + parsed.Errors = append(parsed.Errors, err) + } + } + } + + return parsed, err +} diff --git a/pkg/registry/apis/provisioning/resources/folders.go b/pkg/registry/apis/provisioning/resources/folders.go index c301d676720..12150403e3c 100644 --- a/pkg/registry/apis/provisioning/resources/folders.go +++ b/pkg/registry/apis/provisioning/resources/folders.go @@ -2,6 +2,7 @@ package resources import ( "context" + "errors" "fmt" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -15,16 +16,18 @@ import ( "github.com/grafana/grafana/pkg/registry/apis/provisioning/safepath" ) +const maxFolders = 10000 + type FolderManager struct { - repo repository.Repository - lookup *FolderTree + repo repository.ReaderWriter + tree *FolderTree client dynamic.ResourceInterface } -func NewFolderManager(repo repository.Repository, client dynamic.ResourceInterface) *FolderManager { +func NewFolderManager(repo repository.ReaderWriter, client dynamic.ResourceInterface, lookup *FolderTree) *FolderManager { return &FolderManager{ repo: repo, - lookup: NewEmptyFolderTree(), + tree: lookup, client: client, } } @@ -33,6 +36,14 @@ func (fm *FolderManager) Client() dynamic.ResourceInterface { return fm.client } +func (fm *FolderManager) Tree() *FolderTree { + return fm.tree +} + +func (fm *FolderManager) SetTree(tree *FolderTree) { + fm.tree = tree +} + // EnsureFoldersExist creates the folder structure in the cluster. func (fm *FolderManager) EnsureFolderPathExist(ctx context.Context, filePath string) (parent string, err error) { cfg := fm.repo.Config() @@ -48,13 +59,13 @@ func (fm *FolderManager) EnsureFolderPathExist(ctx context.Context, filePath str } f := ParseFolder(dir, cfg.Name) - if fm.lookup.In(f.ID) { + if fm.tree.In(f.ID) { return f.ID, nil } err = safepath.Walk(ctx, f.Path, func(ctx context.Context, traverse string) error { f := ParseFolder(traverse, cfg.GetName()) - if fm.lookup.In(f.ID) { + if fm.tree.In(f.ID) { parent = f.ID return nil } @@ -63,7 +74,7 @@ func (fm *FolderManager) EnsureFolderPathExist(ctx context.Context, filePath str return fmt.Errorf("ensure folder exists: %w", err) } - fm.lookup.Add(f, parent) + fm.tree.Add(f, parent) parent = f.ID return nil }) @@ -131,3 +142,40 @@ func (fm *FolderManager) EnsureFolderExists(ctx context.Context, folder Folder, func (fm *FolderManager) GetFolder(ctx context.Context, name string) (*unstructured.Unstructured, error) { return fm.client.Get(ctx, name, metav1.GetOptions{}) } + +// ReplicateTree replicates the folder tree to the repository. +// The function fn is called for each folder. +// If the folder already exists, the function is called with created set to false. +// If the folder is created, the function is called with created set to true. +func (fm *FolderManager) EnsureTreeExists(ctx context.Context, ref, path string, fn func(folder Folder, created bool, err error) error) error { + return fm.tree.Walk(ctx, func(ctx context.Context, folder Folder) error { + p := folder.Path + if path != "" { + p = safepath.Join(path, p) + } + + _, err := fm.repo.Read(ctx, p, ref) + if err != nil && !(errors.Is(err, repository.ErrFileNotFound) || apierrors.IsNotFound(err)) { + return fn(folder, false, fmt.Errorf("check if folder exists before writing: %w", err)) + } else if err == nil { + return fn(folder, false, nil) + } + + msg := fmt.Sprintf("Add folder %s", p) + if err := fm.repo.Create(ctx, p, ref, nil, msg); err != nil { + return fn(folder, true, fmt.Errorf("write folder in repo: %w", err)) + } + + return fn(folder, true, nil) + }) +} + +func (fm *FolderManager) LoadFromServer(ctx context.Context) error { + return ForEachResource(ctx, fm.client, func(item *unstructured.Unstructured) error { + if fm.tree.Count() > maxFolders { + return errors.New("too many folders") + } + + return fm.tree.AddUnstructured(item, fm.repo.Config().Name) + }) +} diff --git a/pkg/registry/apis/provisioning/resources/parser.go b/pkg/registry/apis/provisioning/resources/parser.go index a4a3f581025..a81dc9ccf65 100644 --- a/pkg/registry/apis/provisioning/resources/parser.go +++ b/pkg/registry/apis/provisioning/resources/parser.go @@ -104,6 +104,8 @@ type ParsedResource struct { Errors []error } +// FIXME: eliminate clients from parser + func (r *Parser) Clients() *ResourceClients { return r.clients } diff --git a/pkg/registry/apis/provisioning/resources/resources.go b/pkg/registry/apis/provisioning/resources/resources.go new file mode 100644 index 00000000000..6a7660deebe --- /dev/null +++ b/pkg/registry/apis/provisioning/resources/resources.go @@ -0,0 +1,229 @@ +package resources + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + + "github.com/grafana/grafana/pkg/apimachinery/utils" + "github.com/grafana/grafana/pkg/infra/slugify" + "github.com/grafana/grafana/pkg/registry/apis/provisioning/repository" + "github.com/grafana/grafana/pkg/registry/apis/provisioning/safepath" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +var ErrAlreadyInRepository = errors.New("already in repository") + +type WriteOptions struct { + Identifier bool + Path string + Ref string +} + +type resourceID struct { + Name string + Resource string + Group string +} + +type ResourcesManager struct { + repo repository.ReaderWriter + folders *FolderManager + parser *Parser + clients *ResourceClients + userInfo map[string]repository.CommitSignature + resourcesLookup map[resourceID]string // the path with this k8s name +} + +func NewResourcesManager(repo repository.ReaderWriter, folders *FolderManager, parser *Parser, clients *ResourceClients, userInfo map[string]repository.CommitSignature) *ResourcesManager { + return &ResourcesManager{ + repo: repo, + folders: folders, + parser: parser, + clients: clients, + userInfo: userInfo, + resourcesLookup: map[resourceID]string{}, + } +} + +// CreateResource writes an object to the repository +func (r *ResourcesManager) CreateResourceFileFromObject(ctx context.Context, obj *unstructured.Unstructured, options WriteOptions) (string, error) { + if err := ctx.Err(); err != nil { + return "", fmt.Errorf("context error: %w", err) + } + + meta, err := utils.MetaAccessor(obj) + if err != nil { + return "", fmt.Errorf("extract meta accessor: %w", err) + } + + // Message from annotations + commitMessage := meta.GetMessage() + if commitMessage == "" { + g := meta.GetGeneration() + if g > 0 { + commitMessage = fmt.Sprintf("Generation: %d", g) + } else { + commitMessage = "exported from grafana" + } + } + + ctx = r.withAuthorSignature(ctx, meta) + + name := meta.GetName() + manager, _ := meta.GetManagerProperties() + // TODO: how we should handle this? + if manager.Identity == r.repo.Config().GetName() { + // If it's already in the repository, we don't need to write it + return "", ErrAlreadyInRepository + } + + title := meta.FindTitle("") + if title == "" { + title = name + } + folder := meta.GetFolder() + + // Get the absolute path of the folder + fid, ok := r.folders.Tree().DirPath(folder, "") + if !ok { + // FIXME: Shouldn't this fail instead? + fid = Folder{ + Path: "__folder_not_found/" + slugify.Slugify(folder), + } + // r.logger.Error("folder of item was not in tree of repository") + } + + // Clear the metadata + delete(obj.Object, "metadata") + + if options.Identifier { + meta.SetName(name) // keep the identifier in the metadata + } + + body, err := json.MarshalIndent(obj.Object, "", " ") + if err != nil { + return "", fmt.Errorf("failed to marshal dashboard: %w", err) + } + + fileName := slugify.Slugify(title) + ".json" + if fid.Path != "" { + fileName = safepath.Join(fid.Path, fileName) + } + if options.Path != "" { + fileName = safepath.Join(options.Path, fileName) + } + + err = r.repo.Write(ctx, fileName, options.Ref, body, commitMessage) + if err != nil { + return "", fmt.Errorf("failed to write file: %w", err) + } + + return fileName, nil +} + +func (r *ResourcesManager) WriteResourceFromFile(ctx context.Context, path string, ref string) (string, *schema.GroupVersionKind, error) { + // Read the referenced file + fileInfo, err := r.repo.Read(ctx, path, ref) + if err != nil { + return "", nil, fmt.Errorf("failed to read file: %w", err) + } + + parsed, err := r.parser.Parse(ctx, fileInfo, false) // no validation + if err != nil { + return "", nil, fmt.Errorf("failed to parse file: %w", err) + } + + // Check if the resource already exists + id := resourceID{ + Name: parsed.Obj.GetName(), + Resource: parsed.GVR.Resource, + Group: parsed.GVK.Group, + } + existing, found := r.resourcesLookup[id] + if found { + return "", parsed.GVK, fmt.Errorf("duplicate resource name: %s, %s and %s", parsed.Obj.GetName(), path, existing) + } + r.resourcesLookup[id] = path + + // Make sure the parent folders exist + folder, err := r.folders.EnsureFolderPathExist(ctx, path) + if err != nil { + return "", parsed.GVK, fmt.Errorf("failed to ensure folder path exists: %w", err) + } + + parsed.Meta.SetFolder(folder) + parsed.Meta.SetUID("") // clear identifiers + parsed.Meta.SetResourceVersion("") // clear identifiers + + // Update will also create (for resources we care about) + _, err = parsed.Client.Update(ctx, parsed.Obj, metav1.UpdateOptions{}) + + return parsed.Obj.GetName(), parsed.GVK, err +} + +func (r *ResourcesManager) RenameResourceFile(ctx context.Context, previousPath, previousRef, newPath, newRef string) (string, *schema.GroupVersionKind, error) { + name, gvk, err := r.RemoveResourceFromFile(ctx, previousPath, previousRef) + if err != nil { + return name, gvk, fmt.Errorf("failed to remove resource: %w", err) + } + + return r.WriteResourceFromFile(ctx, newPath, newRef) +} + +func (r *ResourcesManager) RemoveResourceFromFile(ctx context.Context, path string, ref string) (string, *schema.GroupVersionKind, error) { + info, err := r.repo.Read(ctx, path, ref) + if err != nil { + return "", nil, fmt.Errorf("failed to read file: %w", err) + } + + obj, gvk, _ := DecodeYAMLObject(bytes.NewBuffer(info.Data)) + if obj == nil { + return "", nil, fmt.Errorf("no object found") + } + + objName := obj.GetName() + if objName == "" { + // Find the referenced file + objName, _ = NamesFromHashedRepoPath(r.repo.Config().Name, path) + } + + client, _, err := r.clients.ForKind(*gvk) + if err != nil { + return "", nil, fmt.Errorf("unable to get client for deleted object: %w", err) + } + + err = client.Delete(ctx, objName, metav1.DeleteOptions{}) + if err != nil { + return "", nil, fmt.Errorf("failed to delete: %w", err) + } + + return objName, gvk, nil +} + +func (r *ResourcesManager) withAuthorSignature(ctx context.Context, item utils.GrafanaMetaAccessor) context.Context { + id := item.GetUpdatedBy() + if id == "" { + id = item.GetCreatedBy() + } + if id == "" { + id = "grafana" + } + + sig := r.userInfo[id] // lookup + if sig.Name == "" && sig.Email == "" { + sig.Name = id + } + t, err := item.GetUpdatedTimestamp() + if err == nil && t != nil { + sig.When = *t + } else { + sig.When = item.GetCreationTimestamp().Time + } + + return repository.WithAuthorSignature(ctx, sig) +} diff --git a/pkg/registry/apis/provisioning/resources/tree.go b/pkg/registry/apis/provisioning/resources/tree.go index b11ac5a3cb7..3f570f37e23 100644 --- a/pkg/registry/apis/provisioning/resources/tree.go +++ b/pkg/registry/apis/provisioning/resources/tree.go @@ -19,6 +19,7 @@ import ( type FolderTree struct { tree map[string]string folders map[string]Folder + count int } // In determines if the given folder is in the tree at all. That is, it answers "does the folder even exist in the Grafana instance?" @@ -66,6 +67,11 @@ func (t *FolderTree) DirPath(folder, baseFolder string) (fid Folder, ok bool) { func (t *FolderTree) Add(folder Folder, parent string) { t.tree[folder.ID] = parent t.folders[folder.ID] = folder + t.count++ +} + +func (t *FolderTree) Count() int { + return t.count } type WalkFunc func(ctx context.Context, folder Folder) error @@ -113,6 +119,7 @@ func (t *FolderTree) AddUnstructured(item *unstructured.Unstructured, skipRepo s } t.tree[folder.ID] = meta.GetFolder() t.folders[folder.ID] = folder + t.count++ return nil } @@ -133,7 +140,8 @@ func NewFolderTreeFromResourceList(resources *provisioning.ResourceList) *Folder } return &FolderTree{ - tree, - folderIDs, + tree: tree, + folders: folderIDs, + count: len(resources.Items), } } diff --git a/pkg/tests/apis/provisioning/client_test.go b/pkg/tests/apis/provisioning/client_test.go index 71cb162b6ea..af46419d983 100644 --- a/pkg/tests/apis/provisioning/client_test.go +++ b/pkg/tests/apis/provisioning/client_test.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/grafana/pkg/registry/apis/provisioning/resources" ) +// FIXME: do this tests make sense in their current form? func TestIntegrationProvisioning_Client(t *testing.T) { if testing.Short() { t.Skip("skipping integration test") @@ -22,32 +23,25 @@ func TestIntegrationProvisioning_Client(t *testing.T) { require.NoError(t, err) t.Run("dashboard client support", func(t *testing.T) { - dash, err := clients.Dashboard() - require.NoError(t, err) - require.NotNil(t, dash) - - client, _, err := clients.ForResource(schema.GroupVersionResource{ + _, _, err := clients.ForResource(schema.GroupVersionResource{ Group: "dashboard.grafana.app", Resource: "dashboards", Version: "v1alpha1", }) require.NoError(t, err) - require.Equal(t, dash, client, "expecting the default dashboard to be version1") // With empty version, we should get the preferred version (v1alpha1) - client, _, err = clients.ForResource(schema.GroupVersionResource{ + _, _, err = clients.ForResource(schema.GroupVersionResource{ Group: "dashboard.grafana.app", Resource: "dashboards", }) require.NoError(t, err) - require.Equal(t, dash, client, "expecting the default dashboard to be version0") - client, _, err = clients.ForKind(schema.GroupVersionKind{ + _, _, err = clients.ForKind(schema.GroupVersionKind{ Group: "dashboard.grafana.app", Version: "v1alpha1", Kind: "Dashboard", }) require.NoError(t, err) - require.Equal(t, dash, client, "same client when requested by kind") }) }