Remote provisioning: consolidate resource operations (#102972)

* Move to new repository

* Rename it to dual writer

* Rename the function

* Rename the methods

* Rename to exportResource

* Clean up logic in migrate and add TODOs

* Add TODOs

* Use generic client for unprovisioned

* ForEachResource

* More consolidation

* Refactor more around client

* Consolidate constants

* ForEachFolder

* More use of constants

* Add FIXME notes

* Use more constant

* Remove Dashboard

* Pass tree to folder manager

* Replicate tree

* Reduce export complexity

* More refactoring

* Use the ForEach for loading users

* Limit in-memory folders

* Isolate the object

* Improve the export function

* Move resources to resources package

* Move delete operation

* Move more logic

* More consolidation

* More renaming

* Fix more issues

* Ensure path exists when created a resource

* Simply append error

* Fix receiver lint issue

* Fix cyclomatic complexity

* Fix linting

* Remove folder path creation
This commit is contained in:
Roberto Jiménez Sánchez
2025-03-31 14:27:46 +02:00
committed by GitHub
parent 5668ab9676
commit bb344fcd83
21 changed files with 1117 additions and 1136 deletions

View File

@ -2,18 +2,15 @@ package provisioning
import (
"context"
"errors"
"fmt"
"net/http"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/rest"
"github.com/grafana/grafana-app-sdk/logging"
"github.com/grafana/grafana/pkg/apimachinery/apis/common/v0alpha1"
provisioning "github.com/grafana/grafana/pkg/apis/provisioning/v0alpha1"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/repository"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
@ -54,7 +51,6 @@ func (*filesConnector) NewConnectOptions() (runtime.Object, bool, string) {
}
// TODO: document the synchronous write and delete on the API Spec
// TODO: Move dual write logic to `resources` package and keep this connector simple
func (s *filesConnector) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
logger := logging.FromContext(ctx).With("logger", "files-connector", "repository_name", name)
ctx = logging.Context(ctx, logger)
@ -64,11 +60,23 @@ func (s *filesConnector) Connect(ctx context.Context, name string, opts runtime.
return nil, err
}
reader, ok := repo.(repository.Reader)
readWriter, ok := repo.(repository.ReaderWriter)
if !ok {
return nil, apierrors.NewBadRequest("repository does not support read")
return nil, apierrors.NewBadRequest("repository does not support read-writing")
}
parser, err := s.parsers.GetParser(ctx, readWriter)
if err != nil {
return nil, fmt.Errorf("failed to get parser: %w", err)
}
folderClient, err := parser.Clients().Folder()
if err != nil {
return nil, fmt.Errorf("failed to get folder client: %w", err)
}
folders := resources.NewFolderManager(readWriter, folderClient, resources.NewEmptyFolderTree())
dualReadWriter := resources.NewDualReadWriter(readWriter, parser, folders)
return withTimeout(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
ref := query.Get("ref")
@ -89,30 +97,11 @@ func (s *filesConnector) Connect(ctx context.Context, name string, opts runtime.
isDir := safepath.IsDir(filePath)
if r.Method == http.MethodGet && isDir {
// TODO: Implement folder navigation
if len(filePath) > 0 {
responder.Error(apierrors.NewBadRequest("folder navigation not yet supported"))
return
}
// TODO: Add pagination
rsp, err := reader.ReadTree(ctx, ref)
files, err := s.listFolderFiles(ctx, filePath, ref, readWriter)
if err != nil {
responder.Error(err)
return
}
files := &provisioning.FileList{}
for _, v := range rsp {
if !v.Blob {
continue // folder item
}
files.Items = append(files.Items, provisioning.FileItem{
Path: v.Path,
Size: v.Size,
Hash: v.Hash,
})
}
responder.Object(http.StatusOK, files)
return
}
@ -132,19 +121,60 @@ func (s *filesConnector) Connect(ctx context.Context, name string, opts runtime.
code := http.StatusOK
switch r.Method {
case http.MethodGet:
code, obj, err = s.doRead(ctx, reader, filePath, ref)
resource, err := dualReadWriter.Read(ctx, filePath, ref)
if err != nil {
responder.Error(err)
return
}
obj = resource.AsResourceWrapper()
code = http.StatusOK
if len(resource.Errors) > 0 {
code = http.StatusNotAcceptable
}
case http.MethodPost:
obj, err = s.doWrite(ctx, false, repo, filePath, ref, message, r)
if isDir {
obj, err = dualReadWriter.CreateFolder(ctx, filePath, ref, message)
} else {
data, err := readBody(r, filesMaxBodySize)
if err != nil {
responder.Error(err)
return
}
resource, err := dualReadWriter.CreateResource(ctx, filePath, ref, message, data)
if err != nil {
responder.Error(err)
return
}
obj = resource.AsResourceWrapper()
}
case http.MethodPut:
// TODO: document in API specification
if isDir {
err = apierrors.NewMethodNotSupported(provisioning.RepositoryResourceInfo.GroupResource(), r.Method)
} else {
obj, err = s.doWrite(ctx, true, repo, filePath, ref, message, r)
data, err := readBody(r, filesMaxBodySize)
if err != nil {
responder.Error(err)
return
}
resource, err := dualReadWriter.UpdateResource(ctx, filePath, ref, message, data)
if err != nil {
responder.Error(err)
return
}
obj = resource.AsResourceWrapper()
}
case http.MethodDelete:
// TODO: limit file size
obj, err = s.doDelete(ctx, repo, filePath, ref, message)
resource, err := dualReadWriter.Delete(ctx, filePath, ref, message)
if err != nil {
responder.Error(err)
return
}
obj = resource.AsResourceWrapper()
default:
err = apierrors.NewMethodNotSupported(provisioning.RepositoryResourceInfo.GroupResource(), r.Method)
}
@ -165,218 +195,32 @@ func (s *filesConnector) Connect(ctx context.Context, name string, opts runtime.
}), 30*time.Second), nil
}
func (s *filesConnector) doRead(ctx context.Context, repo repository.Reader, path string, ref string) (int, *provisioning.ResourceWrapper, error) {
info, err := repo.Read(ctx, path, ref)
if err != nil {
return 0, nil, err
// listFolderFiles returns a list of files in a folder
func (s *filesConnector) listFolderFiles(ctx context.Context, filePath string, ref string, readWriter repository.ReaderWriter) (*provisioning.FileList, error) {
// TODO: Implement folder navigation
if len(filePath) > 0 {
return nil, apierrors.NewBadRequest("folder navigation not yet supported")
}
parser, err := s.parsers.GetParser(ctx, repo)
// TODO: Add pagination
rsp, err := readWriter.ReadTree(ctx, ref)
if err != nil {
return 0, nil, err
return nil, err
}
parsed, err := parser.Parse(ctx, info, true)
if err != nil {
return 0, nil, err
}
// GVR will exist for anything we can actually save
// TODO: Add known error in parser for unsupported resource
if parsed.GVR == nil {
if parsed.GVK != nil {
//nolint:govet
parsed.Errors = append(parsed.Errors, fmt.Errorf("unknown resource for Kind: %s", parsed.GVK.Kind))
} else {
parsed.Errors = append(parsed.Errors, fmt.Errorf("unknown resource"))
files := &provisioning.FileList{}
for _, v := range rsp {
if !v.Blob {
continue // folder item
}
files.Items = append(files.Items, provisioning.FileItem{
Path: v.Path,
Size: v.Size,
Hash: v.Hash,
})
}
code := http.StatusOK
if len(parsed.Errors) > 0 {
code = http.StatusNotAcceptable
}
return code, parsed.AsResourceWrapper(), nil
}
func (s *filesConnector) doWrite(ctx context.Context, update bool, repo repository.Repository, path string, ref string, message string, req *http.Request) (*provisioning.ResourceWrapper, error) {
if err := repository.IsWriteAllowed(repo.Config(), ref); err != nil {
return nil, err
}
writer, ok := repo.(repository.ReaderWriter)
if !ok {
return nil, apierrors.NewBadRequest("repository does not support read-writing")
}
parser, err := s.parsers.GetParser(ctx, writer)
if err != nil {
return nil, err
}
defer func() { _ = req.Body.Close() }()
if safepath.IsDir(path) {
return s.doCreateFolder(ctx, writer, path, ref, message, parser)
}
data, err := readBody(req, filesMaxBodySize)
if err != nil {
return nil, err
}
info := &repository.FileInfo{
Data: data,
Path: path,
Ref: ref,
}
// TODO: improve parser to parse out of reader
parsed, err := parser.Parse(ctx, info, true)
if err != nil {
if errors.Is(err, resources.ErrUnableToReadResourceBytes) {
return nil, apierrors.NewBadRequest("unable to read the request as a resource")
}
return nil, err
}
// GVR will exist for anything we can actually save
// TODO: Add known error in parser for unsupported resource
if parsed.GVR == nil {
return nil, apierrors.NewBadRequest("The payload does not map to a known resource")
}
// Do not write if any errors exist
if len(parsed.Errors) > 0 {
return parsed.AsResourceWrapper(), err
}
data, err = parsed.ToSaveBytes()
if err != nil {
return nil, err
}
if update {
err = writer.Update(ctx, path, ref, data, message)
} else {
err = writer.Create(ctx, path, ref, data, message)
}
if err != nil {
return nil, err
}
// Directly update the grafana database
// Behaves the same running sync after writing
if ref == "" {
if parsed.Existing == nil {
parsed.Upsert, err = parsed.Client.Create(ctx, parsed.Obj, metav1.CreateOptions{})
if err != nil {
parsed.Errors = append(parsed.Errors, err)
}
} else {
parsed.Upsert, err = parsed.Client.Update(ctx, parsed.Obj, metav1.UpdateOptions{})
if err != nil {
parsed.Errors = append(parsed.Errors, err)
}
}
}
return parsed.AsResourceWrapper(), err
}
func (s *filesConnector) doCreateFolder(ctx context.Context, repo repository.Writer, path string, ref string, message string, parser *resources.Parser) (*provisioning.ResourceWrapper, error) {
client, err := parser.Clients().Folder()
if err != nil {
return nil, err
}
manager := resources.NewFolderManager(repo, client)
// Now actually create the folder
if err := repo.Create(ctx, path, ref, nil, message); err != nil {
return nil, fmt.Errorf("failed to create folder: %w", err)
}
cfg := repo.Config()
wrap := &provisioning.ResourceWrapper{
Path: path,
Ref: ref,
Repository: provisioning.ResourceRepositoryInfo{
Type: cfg.Spec.Type,
Namespace: cfg.Namespace,
Name: cfg.Name,
Title: cfg.Spec.Title,
},
Resource: provisioning.ResourceObjects{
Action: provisioning.ResourceActionCreate,
},
}
if ref == "" {
folderName, err := manager.EnsureFolderPathExist(ctx, path)
if err != nil {
return nil, err
}
current, err := manager.GetFolder(ctx, folderName)
if err != nil && !apierrors.IsNotFound(err) {
return nil, err // unable to check if the folder exists
}
wrap.Resource.Upsert = v0alpha1.Unstructured{
Object: current.Object,
}
}
return wrap, nil
}
// Deletes a file from the repository and the Grafana database.
// If the path is a folder, it will return an error.
// If the file is not parsable, it will return an error.
func (s *filesConnector) doDelete(ctx context.Context, repo repository.Repository, path string, ref string, message string) (*provisioning.ResourceWrapper, error) {
if err := repository.IsWriteAllowed(repo.Config(), ref); err != nil {
return nil, err
}
// Read the existing value
access, ok := repo.(repository.ReaderWriter)
if !ok {
return nil, fmt.Errorf("repository is not read+writeable")
}
file, err := access.Read(ctx, path, ref)
if err != nil {
return nil, err // unable to read value
}
parser, err := s.parsers.GetParser(ctx, access)
if err != nil {
return nil, err // unable to read value
}
// TODO: document in API specification
// We can only delete parsable things
parsed, err := parser.Parse(ctx, file, false)
if err != nil {
return nil, err // unable to read value
}
parsed.Action = provisioning.ResourceActionDelete
wrap := parsed.AsResourceWrapper()
// Now delete the file
err = access.Delete(ctx, path, ref, message)
if err != nil {
return nil, err
}
// Delete the file in the grafana database
if ref == "" {
err = parsed.Client.Delete(ctx, parsed.Obj.GetName(), metav1.DeleteOptions{})
if apierrors.IsNotFound(err) {
err = nil // ignorable
}
}
return wrap, err
return files, nil
}
var (

View File

@ -1,96 +0,0 @@
package export
import (
"context"
"errors"
"fmt"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
folders "github.com/grafana/grafana/pkg/apis/folder/v0alpha1"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/repository"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/safepath"
)
// FIXME: revise logging in this method
func (r *exportJob) loadFolders(ctx context.Context) error {
logger := r.logger
r.progress.SetMessage(ctx, "reading folder tree")
repoName := r.target.Config().Name
// TODO: should this be logging or message or both?
r.progress.SetMessage(ctx, "read folder tree from unified storage")
client, err := r.client.Folder()
if err != nil {
return err
}
rawList, err := client.List(ctx, metav1.ListOptions{Limit: 10000})
if err != nil {
return fmt.Errorf("failed to list folders: %w", err)
}
if rawList.GetContinue() != "" {
return fmt.Errorf("unable to list all folders in one request: %s", rawList.GetContinue())
}
for _, item := range rawList.Items {
err = r.folderTree.AddUnstructured(&item, repoName)
if err != nil {
r.progress.Record(ctx, jobs.JobResourceResult{
Name: item.GetName(),
Resource: folders.RESOURCE,
Group: folders.GROUP,
Error: err,
})
}
}
// create folders first is required so that empty folders exist when finished
r.progress.SetMessage(ctx, "write folders")
err = r.folderTree.Walk(ctx, func(ctx context.Context, folder resources.Folder) error {
p := folder.Path
if r.path != "" {
p = safepath.Join(r.path, p)
}
logger := logger.With("path", p)
result := jobs.JobResourceResult{
Name: folder.ID,
Resource: folders.RESOURCE,
Group: folders.GROUP,
Path: p,
}
_, err := r.target.Read(ctx, p, r.ref)
if err != nil && !(errors.Is(err, repository.ErrFileNotFound) || apierrors.IsNotFound(err)) {
result.Error = fmt.Errorf("failed to check if folder exists before writing: %w", err)
return result.Error
} else if err == nil {
logger.Info("folder already exists")
result.Action = repository.FileActionIgnored
r.progress.Record(ctx, result)
return nil
}
result.Action = repository.FileActionCreated
msg := fmt.Sprintf("export folder %s", p)
// Create with an empty body will make a folder (or .keep file if unsupported)
if err := r.target.Create(ctx, p, r.ref, nil, msg); err != nil {
result.Error = fmt.Errorf("failed to write folder in repo: %w", err)
r.progress.Record(ctx, result)
return result.Error
}
r.progress.Record(ctx, result)
return nil
})
if err != nil {
return fmt.Errorf("failed to write folders: %w", err)
}
return nil
}

View File

@ -1,45 +0,0 @@
package export
import (
"context"
"github.com/grafana/grafana-app-sdk/logging"
provisioning "github.com/grafana/grafana/pkg/apis/provisioning/v0alpha1"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/repository"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
)
// ExportJob holds all context for a running job
type exportJob struct {
logger logging.Logger
client *resources.ResourceClients // Read from
target repository.ReaderWriter // Write to
namespace string
progress jobs.JobProgressRecorder
folderTree *resources.FolderTree
path string // from options (now clean+safe)
ref string // from options (only git)
keepIdentifier bool
}
func newExportJob(ctx context.Context,
target repository.ReaderWriter,
options provisioning.ExportJobOptions,
clients *resources.ResourceClients,
progress jobs.JobProgressRecorder,
) *exportJob {
return &exportJob{
namespace: target.Config().Namespace,
target: target,
client: clients,
logger: logging.FromContext(ctx),
progress: progress,
path: options.Path,
ref: options.Branch,
keepIdentifier: options.Identifier,
folderTree: resources.NewEmptyFolderTree(),
}
}

View File

@ -1,153 +0,0 @@
package export
import (
"context"
"encoding/json"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
dashboard "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v0alpha1"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/infra/slugify"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/repository"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/safepath"
)
func (r *exportJob) loadResources(ctx context.Context) error {
kinds := []schema.GroupVersionResource{{
Group: dashboard.GROUP,
Resource: dashboard.DASHBOARD_RESOURCE,
Version: "v1alpha1",
}}
for _, kind := range kinds {
r.progress.SetMessage(ctx, fmt.Sprintf("reading %s resource", kind.Resource))
if err := r.loadResourcesFromAPIServer(ctx, kind); err != nil {
return fmt.Errorf("error loading %s %w", kind.Resource, err)
}
}
return nil
}
func (r *exportJob) loadResourcesFromAPIServer(ctx context.Context, kind schema.GroupVersionResource) error {
client, _, err := r.client.ForResource(kind)
if err != nil {
return err
}
var continueToken string
for ctx.Err() == nil {
list, err := client.List(ctx, metav1.ListOptions{Limit: 100, Continue: continueToken})
if err != nil {
return fmt.Errorf("error executing list: %w", err)
}
for _, item := range list.Items {
if ctx.Err() != nil {
return ctx.Err()
}
r.progress.Record(ctx, r.write(ctx, &item))
if err := r.progress.TooManyErrors(); err != nil {
return err
}
}
continueToken = list.GetContinue()
if continueToken == "" {
break
}
}
return ctx.Err()
}
func (r *exportJob) write(ctx context.Context, obj *unstructured.Unstructured) jobs.JobResourceResult {
gvk := obj.GroupVersionKind()
result := jobs.JobResourceResult{
Name: obj.GetName(),
Resource: gvk.Kind,
Group: gvk.Group,
Action: repository.FileActionCreated,
}
if err := ctx.Err(); err != nil {
result.Error = fmt.Errorf("context error: %w", err)
return result
}
meta, err := utils.MetaAccessor(obj)
if err != nil {
result.Error = fmt.Errorf("extract meta accessor: %w", err)
return result
}
// Message from annotations
commitMessage := meta.GetMessage()
if commitMessage == "" {
g := meta.GetGeneration()
if g > 0 {
commitMessage = fmt.Sprintf("Generation: %d", g)
} else {
commitMessage = "exported from grafana"
}
}
name := meta.GetName()
manager, _ := meta.GetManagerProperties()
if manager.Identity == r.target.Config().GetName() {
result.Action = repository.FileActionIgnored
return result
}
title := meta.FindTitle("")
if title == "" {
title = name
}
folder := meta.GetFolder()
// Get the absolute path of the folder
fid, ok := r.folderTree.DirPath(folder, "")
if !ok {
// FIXME: Shouldn't this fail instead?
fid = resources.Folder{
Path: "__folder_not_found/" + slugify.Slugify(folder),
}
r.logger.Error("folder of item was not in tree of repository")
}
result.Path = fid.Path
// Clear the metadata
delete(obj.Object, "metadata")
if r.keepIdentifier {
meta.SetName(name) // keep the identifier in the metadata
}
body, err := json.MarshalIndent(obj.Object, "", " ")
if err != nil {
result.Error = fmt.Errorf("failed to marshal dashboard: %w", err)
return result
}
fileName := slugify.Slugify(title) + ".json"
if fid.Path != "" {
fileName = safepath.Join(fid.Path, fileName)
}
if r.path != "" {
fileName = safepath.Join(r.path, fileName)
}
err = r.target.Write(ctx, fileName, r.ref, body, commitMessage)
if err != nil {
result.Error = fmt.Errorf("failed to write file: %w", err)
}
return result
}

View File

@ -15,6 +15,8 @@ import (
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/secrets"
"github.com/grafana/grafana/pkg/storage/legacysql/dualwrite"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic"
)
type ExportWorker struct {
@ -29,18 +31,22 @@ type ExportWorker struct {
// Decrypt secrets in config
secrets secrets.Service
parsers *resources.ParserFactory
}
func NewExportWorker(clientFactory *resources.ClientFactory,
storageStatus dualwrite.Service,
secrets secrets.Service,
clonedir string,
parsers *resources.ParserFactory,
) *ExportWorker {
return &ExportWorker{
clonedir,
clientFactory,
storageStatus,
secrets,
parsers,
}
}
@ -95,19 +101,83 @@ func (r *ExportWorker) Process(ctx context.Context, repo repository.Repository,
return err
}
worker := newExportJob(ctx, rw, *options, clients, progress)
// Load and write all folders
progress.SetMessage(ctx, "start folder export")
err = worker.loadFolders(ctx)
// FIXME: we load the entire tree in memory
progress.SetMessage(ctx, "read folder tree from API server")
client, err := clients.Folder()
if err != nil {
return err
return fmt.Errorf("failed to get folder client: %w", err)
}
folders := resources.NewFolderManager(rw, client, resources.NewEmptyFolderTree())
if err := folders.LoadFromServer(ctx); err != nil {
return fmt.Errorf("failed to load folders from API server: %w", err)
}
progress.SetMessage(ctx, "write folders to repository")
err = folders.EnsureTreeExists(ctx, options.Branch, options.Path, func(folder resources.Folder, created bool, err error) error {
result := jobs.JobResourceResult{
Action: repository.FileActionCreated,
Name: folder.ID,
Resource: resources.FolderResource.Resource,
Group: resources.FolderResource.Group,
Path: folder.Path,
Error: err,
}
if !created {
result.Action = repository.FileActionIgnored
}
progress.Record(ctx, result)
return nil
})
if err != nil {
return fmt.Errorf("write folders to repository: %w", err)
}
progress.SetMessage(ctx, "start resource export")
err = worker.loadResources(ctx)
parser, err := r.parsers.GetParser(ctx, rw)
if err != nil {
return err
return fmt.Errorf("failed to get parser: %w", err)
}
resourceManager := resources.NewResourcesManager(rw, folders, parser, clients, nil)
for _, kind := range resources.SupportedResources {
// skip from folders as we do them first
if kind == resources.FolderResource {
continue
}
progress.SetMessage(ctx, fmt.Sprintf("reading %s resource", kind.Resource))
if err := clients.ForEachResource(ctx, kind, func(_ dynamic.ResourceInterface, item *unstructured.Unstructured) error {
result := jobs.JobResourceResult{
Name: item.GetName(),
Resource: kind.Resource,
Group: kind.Group,
Action: repository.FileActionCreated,
}
fileName, err := resourceManager.CreateResourceFileFromObject(ctx, item, resources.WriteOptions{
Path: options.Path,
Ref: options.Branch,
Identifier: options.Identifier,
})
if errors.Is(err, resources.ErrAlreadyInRepository) {
result.Action = repository.FileActionIgnored
} else if err != nil {
result.Error = fmt.Errorf("export resource: %w", err)
}
result.Path = fileName
progress.Record(ctx, result)
if err := progress.TooManyErrors(); err != nil {
return err
}
return nil
}); err != nil {
return fmt.Errorf("error exporting %s %w", kind.Resource, err)
}
}
if buffered != nil {

View File

@ -5,109 +5,69 @@ import (
"errors"
"fmt"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
folders "github.com/grafana/grafana/pkg/apis/folder/v0alpha1"
"github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/repository"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
"github.com/grafana/grafana/pkg/storage/unified/parquet"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
var _ resource.BulkResourceWriter = (*folderReader)(nil)
const maxFolders = 10000
type folderReader struct {
var _ resource.BulkResourceWriter = (*legacyFolderReader)(nil)
type legacyFolderReader struct {
tree *resources.FolderTree
targetRepoName string
repoName string
legacyMigrator legacy.LegacyMigrator
namespace string
}
func NewLegacyFolderReader(legacyMigrator legacy.LegacyMigrator, repoName, namespace string) *legacyFolderReader {
return &legacyFolderReader{
legacyMigrator: legacyMigrator,
repoName: repoName,
namespace: namespace,
tree: resources.NewEmptyFolderTree(),
}
}
// Close implements resource.BulkResourceWrite.
func (f *folderReader) Close() error {
func (f *legacyFolderReader) Close() error {
return nil
}
// CloseWithResults implements resource.BulkResourceWrite.
func (f *folderReader) CloseWithResults() (*resource.BulkResponse, error) {
func (f *legacyFolderReader) CloseWithResults() (*resource.BulkResponse, error) {
return &resource.BulkResponse{}, nil
}
// Write implements resource.BulkResourceWrite.
func (f *folderReader) Write(ctx context.Context, key *resource.ResourceKey, value []byte) error {
func (f *legacyFolderReader) Write(ctx context.Context, key *resource.ResourceKey, value []byte) error {
item := &unstructured.Unstructured{}
err := item.UnmarshalJSON(value)
if err != nil {
return fmt.Errorf("unmarshal unstructured to JSON: %w", err)
}
return f.tree.AddUnstructured(item, f.targetRepoName)
if f.tree.Count() > maxFolders {
return errors.New("too many folders")
}
return f.tree.AddUnstructured(item, f.repoName)
}
func (j *migrationJob) migrateLegacyFolders(ctx context.Context) error {
logger := j.logger
j.progress.SetMessage(ctx, "reading folder tree")
repoName := j.target.Config().Name
j.progress.SetMessage(ctx, "migrate folder tree from legacy")
reader := &folderReader{
tree: j.folderTree,
targetRepoName: repoName,
}
_, err := j.legacy.Migrate(ctx, legacy.MigrateOptions{
Namespace: j.namespace,
Resources: []schema.GroupResource{{
Group: folders.GROUP,
Resource: folders.RESOURCE,
}},
Store: parquet.NewBulkResourceWriterClient(reader),
func (f *legacyFolderReader) Read(ctx context.Context, legacyMigrator legacy.LegacyMigrator, name, namespace string) error {
_, err := legacyMigrator.Migrate(ctx, legacy.MigrateOptions{
Namespace: namespace,
Resources: []schema.GroupResource{resources.FolderResource.GroupResource()},
Store: parquet.NewBulkResourceWriterClient(f),
})
if err != nil {
return fmt.Errorf("unable to read folders from legacy storage %w", err)
}
// create folders first is required so that empty folders exist when finished
j.progress.SetMessage(ctx, "write folders")
err = j.folderTree.Walk(ctx, func(ctx context.Context, folder resources.Folder) error {
p := folder.Path
logger = logger.With("path", p)
result := jobs.JobResourceResult{
Name: folder.ID,
Resource: folders.RESOURCE,
Group: folders.GROUP,
Path: p,
}
_, err := j.target.Read(ctx, p, "")
if err != nil && !(errors.Is(err, repository.ErrFileNotFound) || apierrors.IsNotFound(err)) {
result.Error = fmt.Errorf("failed to check if folder exists before writing: %w", err)
return result.Error
} else if err == nil {
logger.Info("folder already exists")
result.Action = repository.FileActionIgnored
j.progress.Record(ctx, result)
return nil
}
result.Action = repository.FileActionCreated
msg := fmt.Sprintf("export folder %s", p)
// Create with an empty body will make a folder (or .keep file if unsupported)
if err := j.target.Create(ctx, p, "", nil, msg); err != nil {
result.Error = fmt.Errorf("failed to write folder in repo: %w", err)
j.progress.Record(ctx, result)
return result.Error
}
return nil
})
if err != nil {
return fmt.Errorf("failed to write folders: %w", err)
}
return nil
return err
}
func (f *legacyFolderReader) Tree() *resources.FolderTree {
return f.tree
}

View File

@ -2,51 +2,62 @@ package migrate
import (
"context"
"encoding/json"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
dashboard "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v0alpha1"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/infra/slugify"
provisioning "github.com/grafana/grafana/pkg/apis/provisioning/v0alpha1"
"github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/repository"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/safepath"
"github.com/grafana/grafana/pkg/storage/unified/parquet"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"k8s.io/apimachinery/pkg/runtime/schema"
)
var _ resource.BulkResourceWriter = (*resourceReader)(nil)
var _ resource.BulkResourceWriter = (*legacyResourceResourceMigrator)(nil)
type resourceReader struct {
job *migrationJob
// TODO: can we use the same migrator for folders?
type legacyResourceResourceMigrator struct {
legacy legacy.LegacyMigrator
parser *resources.Parser
progress jobs.JobProgressRecorder
namespace string
kind schema.GroupResource
options provisioning.MigrateJobOptions
resources *resources.ResourcesManager
}
func NewLegacyResourceMigrator(legacy legacy.LegacyMigrator, parser *resources.Parser, resources *resources.ResourcesManager, progress jobs.JobProgressRecorder, options provisioning.MigrateJobOptions, namespace string, kind schema.GroupResource) *legacyResourceResourceMigrator {
return &legacyResourceResourceMigrator{
legacy: legacy,
parser: parser,
progress: progress,
options: options,
namespace: namespace,
kind: kind,
resources: resources,
}
}
// Close implements resource.BulkResourceWriter.
func (r *resourceReader) Close() error {
func (r *legacyResourceResourceMigrator) Close() error {
return nil
}
// CloseWithResults implements resource.BulkResourceWriter.
func (r *resourceReader) CloseWithResults() (*resource.BulkResponse, error) {
func (r *legacyResourceResourceMigrator) CloseWithResults() (*resource.BulkResponse, error) {
return &resource.BulkResponse{}, nil
}
// Write implements resource.BulkResourceWriter.
func (r *resourceReader) Write(ctx context.Context, key *resource.ResourceKey, value []byte) error {
func (r *legacyResourceResourceMigrator) Write(ctx context.Context, key *resource.ResourceKey, value []byte) error {
// Reuse the same parse+cleanup logic
parsed, err := r.job.parser.Parse(ctx, &repository.FileInfo{
parsed, err := r.parser.Parse(ctx, &repository.FileInfo{
Path: "", // empty path to ignore file system
Data: value,
}, false)
if err != nil {
// TODO: should we fail the entire execution?
return fmt.Errorf("failed to unmarshal unstructured: %w", err)
}
@ -54,180 +65,59 @@ func (r *resourceReader) Write(ctx context.Context, key *resource.ResourceKey, v
parsed.Meta.SetManagerProperties(utils.ManagerProperties{})
parsed.Meta.SetSourceProperties(utils.SourceProperties{})
if result := r.job.write(ctx, parsed.Obj); result.Error != nil {
r.job.progress.Record(ctx, result)
if err := r.job.progress.TooManyErrors(); err != nil {
return err
}
}
// TODO: this seems to be same logic as the export job
// TODO: we should use a kind safe manager here
fileName, err := r.resources.CreateResourceFileFromObject(ctx, parsed.Obj, resources.WriteOptions{
Path: "",
Ref: "",
Identifier: r.options.Identifier,
})
return nil
}
func (j *migrationJob) migrateLegacyResources(ctx context.Context) error {
kinds := []schema.GroupVersionResource{{
Group: dashboard.GROUP,
Resource: dashboard.DASHBOARD_RESOURCE,
Version: "v1alpha1",
}}
for _, kind := range kinds {
j.progress.SetMessage(ctx, fmt.Sprintf("migrate %s resource", kind.Resource))
gr := kind.GroupResource()
opts := legacy.MigrateOptions{
Namespace: j.namespace,
WithHistory: j.options.History,
Resources: []schema.GroupResource{gr},
Store: parquet.NewBulkResourceWriterClient(&resourceReader{job: j}),
OnlyCount: true, // first get the count
}
stats, err := j.legacy.Migrate(ctx, opts)
if err != nil {
return fmt.Errorf("unable to count legacy items %w", err)
}
// FIXME: explain why we calculate it in this way
if len(stats.Summary) > 0 {
count := stats.Summary[0].Count //
history := stats.Summary[0].History
if history > count {
count = history // the number of items we will process
}
j.progress.SetTotal(ctx, int(count))
}
opts.OnlyCount = false // this time actually write
_, err = j.legacy.Migrate(ctx, opts)
if err != nil {
return fmt.Errorf("error running legacy migrate %s %w", kind.Resource, err)
}
}
return nil
}
func (j *migrationJob) write(ctx context.Context, obj *unstructured.Unstructured) jobs.JobResourceResult {
gvk := obj.GroupVersionKind()
result := jobs.JobResourceResult{
Name: obj.GetName(),
Resource: gvk.Kind,
Group: gvk.Group,
Name: parsed.Meta.GetName(),
Resource: r.kind.Resource,
Group: r.kind.Group,
Action: repository.FileActionCreated,
Error: err,
Path: fileName,
}
if err := ctx.Err(); err != nil {
result.Error = fmt.Errorf("context error: %w", err)
return result
}
meta, err := utils.MetaAccessor(obj)
if err != nil {
result.Error = fmt.Errorf("extract meta accessor: %w", err)
return result
}
// Message from annotations
commitMessage := meta.GetMessage()
if commitMessage == "" {
g := meta.GetGeneration()
if g > 0 {
commitMessage = fmt.Sprintf("Generation: %d", g)
} else {
commitMessage = "exported from grafana"
}
}
name := meta.GetName()
manager, _ := meta.GetManagerProperties()
if manager.Identity == j.target.Config().GetName() {
result.Action = repository.FileActionIgnored
return result
}
title := meta.FindTitle("")
if title == "" {
title = name
}
folder := meta.GetFolder()
// Add the author in context (if available)
ctx = j.withAuthorSignature(ctx, meta)
// Get the absolute path of the folder
fid, ok := j.folderTree.DirPath(folder, "")
if !ok {
// FIXME: Shouldn't this fail instead?
fid = resources.Folder{
Path: "__folder_not_found/" + slugify.Slugify(folder),
}
j.logger.Error("folder of item was not in tree of repository")
}
result.Path = fid.Path
// Clear the metadata
delete(obj.Object, "metadata")
if j.options.Identifier {
meta.SetName(name) // keep the identifier in the metadata
}
body, err := json.MarshalIndent(obj.Object, "", " ")
if err != nil {
result.Error = fmt.Errorf("failed to marshal dashboard: %w", err)
return result
}
fileName := slugify.Slugify(title) + ".json"
if fid.Path != "" {
fileName = safepath.Join(fid.Path, fileName)
}
err = j.target.Write(ctx, fileName, "", body, commitMessage)
if err != nil {
result.Error = fmt.Errorf("failed to write file: %w", err)
}
return result
}
func removeUnprovisioned(ctx context.Context, client dynamic.ResourceInterface, progress jobs.JobProgressRecorder) error {
rawList, err := client.List(ctx, metav1.ListOptions{Limit: 10000})
if err != nil {
return fmt.Errorf("failed to list resources: %w", err)
}
if rawList.GetContinue() != "" {
return fmt.Errorf("unable to list all resources in one request: %s", rawList.GetContinue())
}
for _, item := range rawList.Items {
// Create a pointer to the item since MetaAccessor requires a pointer
itemPtr := &item
meta, err := utils.MetaAccessor(itemPtr)
if err != nil {
return fmt.Errorf("extract meta accessor: %w", err)
}
// Skip if managed
_, ok := meta.GetManagerProperties()
if ok {
continue
}
result := jobs.JobResourceResult{
Name: item.GetName(),
Resource: item.GetKind(),
Group: item.GroupVersionKind().Group,
Action: repository.FileActionDeleted,
}
if err = client.Delete(ctx, item.GetName(), metav1.DeleteOptions{}); err != nil {
result.Error = fmt.Errorf("failed to delete folder: %w", err)
progress.Record(ctx, result)
return result.Error
}
progress.Record(ctx, result)
r.progress.Record(ctx, result)
if err := r.progress.TooManyErrors(); err != nil {
return err
}
return nil
}
func (r *legacyResourceResourceMigrator) Migrate(ctx context.Context) error {
r.progress.SetMessage(ctx, fmt.Sprintf("migrate %s resource", r.kind.Resource))
opts := legacy.MigrateOptions{
Namespace: r.namespace,
WithHistory: r.options.History,
Resources: []schema.GroupResource{r.kind},
Store: parquet.NewBulkResourceWriterClient(r),
OnlyCount: true, // first get the count
}
stats, err := r.legacy.Migrate(ctx, opts)
if err != nil {
return fmt.Errorf("unable to count legacy items %w", err)
}
// FIXME: explain why we calculate it in this way
if len(stats.Summary) > 0 {
count := stats.Summary[0].Count //
history := stats.Summary[0].History
if history > count {
count = history // the number of items we will process
}
r.progress.SetTotal(ctx, int(count))
}
opts.OnlyCount = false // this time actually write
_, err = r.legacy.Migrate(ctx, opts)
if err != nil {
return fmt.Errorf("error running legacy migrate %s %w", r.kind.Resource, err)
}
return nil

View File

@ -6,27 +6,16 @@ import (
"time"
"google.golang.org/grpc/metadata"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/grafana/grafana-app-sdk/logging"
dashboard "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v0alpha1"
folders "github.com/grafana/grafana/pkg/apis/folder/v0alpha1"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
"github.com/grafana/grafana/pkg/storage/legacysql/dualwrite"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
// called when an error exists
func stopReadingUnifiedStorage(ctx context.Context, dual dualwrite.Service) error {
kinds := []schema.GroupResource{{
Group: folders.GROUP,
Resource: folders.RESOURCE,
}, {
Group: dashboard.GROUP,
Resource: dashboard.DASHBOARD_RESOURCE,
}}
for _, gr := range kinds {
status, _ := dual.Status(ctx, gr)
for _, gr := range resources.SupportedResources {
status, _ := dual.Status(ctx, gr.GroupResource())
status.ReadUnified = false
status.Migrated = 0
status.Migrating = 0
@ -35,20 +24,13 @@ func stopReadingUnifiedStorage(ctx context.Context, dual dualwrite.Service) erro
return err
}
}
return nil
}
func (j *migrationJob) wipeUnifiedAndSetMigratedFlag(ctx context.Context, dual dualwrite.Service) error {
kinds := []schema.GroupResource{{
Group: folders.GROUP,
Resource: folders.RESOURCE,
}, {
Group: dashboard.GROUP,
Resource: dashboard.DASHBOARD_RESOURCE,
}}
for _, gr := range kinds {
status, _ := dual.Status(ctx, gr)
func wipeUnifiedAndSetMigratedFlag(ctx context.Context, dual dualwrite.Service, namespace string, batch resource.BulkStoreClient) error {
for _, gr := range resources.SupportedResources {
status, _ := dual.Status(ctx, gr.GroupResource())
if status.ReadUnified {
return fmt.Errorf("unexpected state - already using unified storage for: %s", gr)
}
@ -60,13 +42,13 @@ func (j *migrationJob) wipeUnifiedAndSetMigratedFlag(ctx context.Context, dual d
settings := resource.BulkSettings{
RebuildCollection: true, // wipes everything in the collection
Collection: []*resource.ResourceKey{{
Namespace: j.namespace,
Namespace: namespace,
Group: gr.Group,
Resource: gr.Resource,
}},
}
ctx = metadata.NewOutgoingContext(ctx, settings.ToMD())
stream, err := j.batch.BulkProcess(ctx)
stream, err := batch.BulkProcess(ctx)
if err != nil {
return fmt.Errorf("error clearing unified %s / %w", gr, err)
}

View File

@ -2,41 +2,41 @@ package migrate
import (
"context"
"fmt"
"errors"
"strings"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/repository"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
)
func (j *migrationJob) loadUsers(ctx context.Context) error {
client, err := j.parser.Clients().User()
const maxUsers = 10000
func loadUsers(ctx context.Context, parser *resources.Parser) (map[string]repository.CommitSignature, error) {
client, err := parser.Clients().User()
if err != nil {
return err
return nil, err
}
rawList, err := client.List(ctx, metav1.ListOptions{Limit: 10000})
if err != nil {
return fmt.Errorf("failed to list users: %w", err)
}
if rawList.GetContinue() != "" {
return fmt.Errorf("unable to list all users in one request: %s", rawList.GetContinue())
}
userInfo := make(map[string]repository.CommitSignature)
var count int
err = resources.ForEachResource(ctx, client, func(item *unstructured.Unstructured) error {
count++
if count > maxUsers {
return errors.New("too many users")
}
var ok bool
j.userInfo = make(map[string]repository.CommitSignature)
for _, item := range rawList.Items {
sig := repository.CommitSignature{}
// FIXME: should we improve logging here?
var ok bool
sig.Name, ok, err = unstructured.NestedString(item.Object, "spec", "login")
if !ok || err != nil {
continue
return nil
}
sig.Email, ok, err = unstructured.NestedString(item.Object, "spec", "email")
if !ok || err != nil {
continue
return nil
}
if sig.Name == sig.Email {
@ -47,7 +47,12 @@ func (j *migrationJob) loadUsers(ctx context.Context) error {
}
}
j.userInfo["user:"+item.GetName()] = sig
userInfo["user:"+item.GetName()] = sig
return nil
})
if err != nil {
return nil, err
}
return nil
return userInfo, nil
}

View File

@ -9,7 +9,6 @@ import (
"time"
"github.com/grafana/grafana-app-sdk/logging"
"github.com/grafana/grafana/pkg/apimachinery/utils"
provisioning "github.com/grafana/grafana/pkg/apis/provisioning/v0alpha1"
"github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs"
@ -21,6 +20,9 @@ import (
"github.com/grafana/grafana/pkg/registry/apis/provisioning/secrets"
"github.com/grafana/grafana/pkg/storage/legacysql/dualwrite"
"github.com/grafana/grafana/pkg/storage/unified/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic"
)
type MigrationWorker struct {
@ -128,7 +130,7 @@ func (w *MigrationWorker) Process(ctx context.Context, repo repository.Repositor
return w.migrateFromLegacy(ctx, rw, buffered, *options, progress)
}
return w.migrateFromUnifiedStorage(ctx, rw, *options, progress)
return w.migrateFromAPIServer(ctx, rw, *options, progress)
}
// migrateFromLegacy will export the resources from legacy storage and import them into the target repository
@ -138,29 +140,61 @@ func (w *MigrationWorker) migrateFromLegacy(ctx context.Context, rw repository.R
return fmt.Errorf("error getting parser: %w", err)
}
worker, err := newMigrationJob(ctx, rw, options, parser, w.bulk, w.legacyMigrator, progress)
if err != nil {
return fmt.Errorf("error creating job: %w", err)
}
var userInfo map[string]repository.CommitSignature
if options.History {
progress.SetMessage(ctx, "loading users")
err = worker.loadUsers(ctx)
userInfo, err = loadUsers(ctx, parser)
if err != nil {
return fmt.Errorf("error loading users: %w", err)
}
}
namespace := rw.Config().Namespace
progress.SetMessage(ctx, "exporting legacy folders")
err = worker.migrateLegacyFolders(ctx)
progress.SetMessage(ctx, "loading legacy folders")
reader := NewLegacyFolderReader(w.legacyMigrator, rw.Config().Name, namespace)
if err = reader.Read(ctx, w.legacyMigrator, rw.Config().Name, namespace); err != nil {
return fmt.Errorf("error loading folder tree: %w", err)
}
folderClient, err := parser.Clients().Folder()
if err != nil {
return err
return fmt.Errorf("error getting folder client: %w", err)
}
folders := resources.NewFolderManager(rw, folderClient, reader.Tree())
progress.SetMessage(ctx, "exporting legacy folders")
err = folders.EnsureTreeExists(ctx, "", "", func(folder resources.Folder, created bool, err error) error {
result := jobs.JobResourceResult{
Action: repository.FileActionCreated,
Name: folder.ID,
Resource: resources.FolderResource.Resource,
Group: resources.FolderResource.Group,
Path: folder.Path,
Error: err,
}
if !created {
result.Action = repository.FileActionIgnored
}
progress.Record(ctx, result)
return nil
})
if err != nil {
return fmt.Errorf("error exporting legacy folders: %w", err)
}
progress.SetMessage(ctx, "exporting legacy resources")
err = worker.migrateLegacyResources(ctx)
if err != nil {
return err
resourceManager := resources.NewResourcesManager(rw, folders, parser, parser.Clients(), userInfo)
for _, kind := range resources.SupportedResources {
if kind == resources.FolderResource {
continue
}
reader := NewLegacyResourceMigrator(w.legacyMigrator, parser, resourceManager, progress, options, namespace, kind.GroupResource())
if err := reader.Migrate(ctx); err != nil {
return fmt.Errorf("error migrating resource %s: %w", kind, err)
}
}
if buffered != nil {
@ -182,7 +216,7 @@ func (w *MigrationWorker) migrateFromLegacy(ctx context.Context, rw repository.R
}
progress.SetMessage(ctx, "resetting unified storage")
if err = worker.wipeUnifiedAndSetMigratedFlag(ctx, w.storageStatus); err != nil {
if err = wipeUnifiedAndSetMigratedFlag(ctx, w.storageStatus, namespace, w.bulk); err != nil {
return fmt.Errorf("unable to reset unified storage %w", err)
}
@ -209,125 +243,57 @@ func (w *MigrationWorker) migrateFromLegacy(ctx context.Context, rw repository.R
return err
}
// migrateFromUnifiedStorage will export the resources from unified storage and import them into the target repository
func (w *MigrationWorker) migrateFromUnifiedStorage(ctx context.Context, repo repository.ReaderWriter, options provisioning.MigrateJobOptions, progress jobs.JobProgressRecorder) error {
parser, err := w.parsers.GetParser(ctx, repo)
if err != nil {
return fmt.Errorf("error getting parser: %w", err)
}
// migrateFromAPIServer will export the resources from unified storage and import them into the target repository
func (w *MigrationWorker) migrateFromAPIServer(ctx context.Context, repo repository.ReaderWriter, options provisioning.MigrateJobOptions, progress jobs.JobProgressRecorder) error {
progress.SetMessage(ctx, "exporting unified storage resources")
if err := w.exportWorker.Process(ctx, repo, provisioning.Job{
exportJob := provisioning.Job{
Spec: provisioning.JobSpec{
Push: &provisioning.ExportJobOptions{
Identifier: options.Identifier,
},
},
}, progress); err != nil {
}
if err := w.exportWorker.Process(ctx, repo, exportJob, progress); err != nil {
return fmt.Errorf("export resources: %w", err)
}
// Reset the results after the export as pull will operate on the same resources
progress.ResetResults()
progress.SetMessage(ctx, "pulling resources")
err = w.syncWorker.Process(ctx, repo, provisioning.Job{
syncJob := provisioning.Job{
Spec: provisioning.JobSpec{
Pull: &provisioning.SyncJobOptions{
Incremental: false,
},
},
}, progress)
if err != nil {
}
if err := w.syncWorker.Process(ctx, repo, syncJob, progress); err != nil {
return fmt.Errorf("pull resources: %w", err)
}
folderClient, err := parser.Clients().Folder()
progress.SetMessage(ctx, "removing unprovisioned resources")
parser, err := w.parsers.GetParser(ctx, repo)
if err != nil {
return fmt.Errorf("unable to get folder client: %w", err)
return fmt.Errorf("error getting parser: %w", err)
}
dashboardClient, err := parser.Clients().Dashboard()
if err != nil {
return fmt.Errorf("unable to get dashboard client: %w", err)
}
return parser.Clients().ForEachUnmanagedResource(ctx, func(client dynamic.ResourceInterface, item *unstructured.Unstructured) error {
result := jobs.JobResourceResult{
Name: item.GetName(),
Resource: item.GetKind(),
Group: item.GroupVersionKind().Group,
Action: repository.FileActionDeleted,
}
progress.SetMessage(ctx, "removing unprovisioned folders")
err = removeUnprovisioned(ctx, folderClient, progress)
if err != nil {
return fmt.Errorf("remove unprovisioned folders: %w", err)
}
if err = client.Delete(ctx, item.GetName(), metav1.DeleteOptions{}); err != nil {
result.Error = fmt.Errorf("failed to delete folder: %w", err)
progress.Record(ctx, result)
return result.Error
}
progress.SetMessage(ctx, "removing unprovisioned dashboards")
err = removeUnprovisioned(ctx, dashboardClient, progress)
if err != nil {
return fmt.Errorf("remove unprovisioned dashboards: %w", err)
}
progress.Record(ctx, result)
return nil
}
// MigrationJob holds all context for a running job
type migrationJob struct {
logger logging.Logger
target repository.ReaderWriter
legacy legacy.LegacyMigrator
parser *resources.Parser
batch resource.BulkStoreClient
namespace string
progress jobs.JobProgressRecorder
userInfo map[string]repository.CommitSignature
folderTree *resources.FolderTree
options provisioning.MigrateJobOptions
}
func newMigrationJob(ctx context.Context,
target repository.ReaderWriter,
options provisioning.MigrateJobOptions,
parser *resources.Parser,
batch resource.BulkStoreClient,
legacyMigrator legacy.LegacyMigrator,
progress jobs.JobProgressRecorder,
) (*migrationJob, error) {
return &migrationJob{
namespace: target.Config().Namespace,
target: target,
logger: logging.FromContext(ctx),
progress: progress,
options: options,
parser: parser,
batch: batch,
legacy: legacyMigrator,
folderTree: resources.NewEmptyFolderTree(),
}, nil
}
func (j *migrationJob) withAuthorSignature(ctx context.Context, item utils.GrafanaMetaAccessor) context.Context {
if j.userInfo == nil {
return ctx
}
id := item.GetUpdatedBy()
if id == "" {
id = item.GetCreatedBy()
}
if id == "" {
id = "grafana"
}
sig := j.userInfo[id] // lookup
if sig.Name == "" && sig.Email == "" {
sig.Name = id
}
t, err := item.GetUpdatedTimestamp()
if err == nil && t != nil {
sig.When = *t
} else {
sig.When = item.GetCreationTimestamp().Time
}
return repository.WithAuthorSignature(ctx, sig)
return nil
})
}

View File

@ -5,7 +5,6 @@ import (
"sort"
"strings"
folders "github.com/grafana/grafana/pkg/apis/folder/v0alpha1"
provisioning "github.com/grafana/grafana/pkg/apis/provisioning/v0alpha1"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/repository"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
@ -25,14 +24,14 @@ func Changes(source []repository.FileTreeEntry, target *provisioning.ResourceLis
lookup := make(map[string]*provisioning.ResourceListItem, len(target.Items))
for _, item := range target.Items {
if item.Path == "" {
if item.Group != folders.GROUP {
if item.Group != resources.FolderResource.Group {
return nil, fmt.Errorf("empty path on a non folder")
}
continue
}
// TODO: why do we have to do this here?
if item.Group == folders.GROUP && !strings.HasSuffix(item.Path, "/") {
if item.Group == resources.FolderResource.Group && !strings.HasSuffix(item.Path, "/") {
item.Path = item.Path + "/"
}
@ -44,7 +43,7 @@ func Changes(source []repository.FileTreeEntry, target *provisioning.ResourceLis
for _, file := range source {
check, ok := lookup[file.Path]
if ok {
if check.Hash != file.Hash && check.Resource != folders.RESOURCE {
if check.Hash != file.Hash && check.Resource != resources.FolderResource.Resource {
changes = append(changes, ResourceFileChange{
Action: repository.FileActionUpdated,
Path: check.Path,
@ -56,7 +55,7 @@ func Changes(source []repository.FileTreeEntry, target *provisioning.ResourceLis
return nil, fmt.Errorf("failed to add path to keep trie: %w", err)
}
if check.Resource != folders.RESOURCE {
if check.Resource != resources.FolderResource.Resource {
delete(lookup, file.Path)
}
@ -101,7 +100,7 @@ func Changes(source []repository.FileTreeEntry, target *provisioning.ResourceLis
// Paths found in grafana, without a matching path in the repository
for _, v := range lookup {
if v.Resource == folders.RESOURCE && keep.Exists(v.Path) {
if v.Resource == resources.FolderResource.Resource && keep.Exists(v.Path) {
continue
}

View File

@ -1,7 +1,6 @@
package sync
import (
"bytes"
"context"
"encoding/json"
"errors"
@ -12,7 +11,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"github.com/grafana/grafana-app-sdk/logging"
folders "github.com/grafana/grafana/pkg/apis/folder/v0alpha1"
provisioning "github.com/grafana/grafana/pkg/apis/provisioning/v0alpha1"
client "github.com/grafana/grafana/pkg/generated/clientset/versioned/typed/provisioning/v0alpha1"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs"
@ -64,7 +62,7 @@ func (r *SyncWorker) Process(ctx context.Context, repo repository.Repository, jo
return fmt.Errorf("sync not supported until storage has migrated")
}
rw, ok := repo.(repository.Reader)
rw, ok := repo.(repository.ReaderWriter)
if !ok {
return fmt.Errorf("sync job submitted for repository that does not support read-write -- this is a bug")
}
@ -132,7 +130,7 @@ func (r *SyncWorker) Process(ctx context.Context, repo repository.Repository, jo
}
// start a job and run it
func (r *SyncWorker) createJob(ctx context.Context, repo repository.Reader, progress jobs.JobProgressRecorder) (*syncJob, error) {
func (r *SyncWorker) createJob(ctx context.Context, repo repository.ReaderWriter, progress jobs.JobProgressRecorder) (*syncJob, error) {
cfg := repo.Config()
parser, err := r.parsers.GetParser(ctx, repo)
if err != nil {
@ -144,13 +142,14 @@ func (r *SyncWorker) createJob(ctx context.Context, repo repository.Reader, prog
return nil, fmt.Errorf("unable to get folder client: %w", err)
}
folders := resources.NewFolderManager(repo, folderClient, resources.NewEmptyFolderTree())
job := &syncJob{
repository: repo,
progress: progress,
parser: parser,
lister: r.lister,
folders: resources.NewFolderManager(repo, folderClient),
resourcesLookup: map[resourceID]string{},
folders: folders,
clients: parser.Clients(),
resourceManager: resources.NewResourcesManager(repo, folders, parser, parser.Clients(), nil),
}
return job, nil
@ -171,21 +170,14 @@ func (r *SyncWorker) patchStatus(ctx context.Context, repo *provisioning.Reposit
return nil
}
type resourceID struct {
Name string
Resource string
Group string
}
// created once for each sync execution
type syncJob struct {
repository repository.Reader
progress jobs.JobProgressRecorder
parser *resources.Parser
lister resources.ResourceLister
folders *resources.FolderManager
folderLookup *resources.FolderTree
resourcesLookup map[resourceID]string // the path with this k8s name
clients *resources.ResourceClients
resourceManager *resources.ResourcesManager
}
func (r *syncJob) run(ctx context.Context, options provisioning.SyncJobOptions) error {
@ -242,18 +234,13 @@ func (r *syncJob) run(ctx context.Context, options provisioning.SyncJobOptions)
return nil
}
// Load any existing folder information
r.folderLookup = resources.NewFolderTreeFromResourceList(target)
r.folders.SetTree(resources.NewFolderTreeFromResourceList(target))
// Now apply the changes
return r.applyChanges(ctx, changes)
}
func (r *syncJob) applyChanges(ctx context.Context, changes []ResourceFileChange) error {
if len(r.resourcesLookup) > 0 {
return fmt.Errorf("this should be empty")
}
r.progress.SetTotal(ctx, len(changes))
r.progress.SetMessage(ctx, "replicating changes")
@ -285,7 +272,8 @@ func (r *syncJob) applyChanges(ctx context.Context, changes []ResourceFileChange
Resource: change.Existing.Resource,
}
client, _, err := r.parser.Clients().ForResource(versionlessGVR)
// TODO: should we use the clients or the resource manager instead?
client, _, err := r.clients.ForResource(versionlessGVR)
if err != nil {
result.Error = fmt.Errorf("unable to get client for deleted object: %w", err)
r.progress.Record(ctx, result)
@ -312,15 +300,25 @@ func (r *syncJob) applyChanges(ctx context.Context, changes []ResourceFileChange
}
result.Name = folder
result.Resource = folders.RESOURCE
result.Group = folders.GROUP
result.Resource = resources.FolderResource.Resource
result.Group = resources.FolderResource.Group
r.progress.Record(ctx, result)
continue
}
// Write the resource file
r.progress.Record(ctx, r.writeResourceFromFile(ctx, change.Path, "", change.Action))
name, gvk, err := r.resourceManager.WriteResourceFromFile(ctx, change.Path, "")
result := jobs.JobResourceResult{
Path: change.Path,
Action: change.Action,
Name: name,
Error: err,
}
if gvk != nil {
result.Resource = gvk.Kind
result.Group = gvk.Group
}
r.progress.Record(ctx, result)
}
r.progress.SetMessage(ctx, "changes replicated")
@ -367,8 +365,8 @@ func (r *syncJob) applyVersionedChanges(ctx context.Context, repo repository.Ver
r.progress.Record(ctx, jobs.JobResourceResult{
Path: safeSegment,
Action: repository.FileActionCreated,
Resource: folders.RESOURCE,
Group: folders.GROUP,
Resource: resources.FolderResource.Resource,
Group: resources.FolderResource.Group,
Name: folder,
})
@ -382,126 +380,49 @@ func (r *syncJob) applyVersionedChanges(ctx context.Context, repo repository.Ver
continue
}
result := jobs.JobResourceResult{
Path: change.Path,
Action: change.Action,
}
switch change.Action {
case repository.FileActionCreated, repository.FileActionUpdated:
r.progress.Record(ctx, r.writeResourceFromFile(ctx, change.Path, change.Ref, change.Action))
case repository.FileActionDeleted:
r.progress.Record(ctx, r.deleteObject(ctx, change.Path, change.PreviousRef))
case repository.FileActionRenamed:
// 1. Delete
result := r.deleteObject(ctx, change.Path, change.PreviousRef)
if result.Error != nil {
r.progress.Record(ctx, result)
continue
name, gvk, err := r.resourceManager.WriteResourceFromFile(ctx, change.Path, change.Ref)
if err != nil {
result.Error = fmt.Errorf("write resource: %w", err)
}
result.Name = name
if gvk != nil {
result.Resource = gvk.Kind
result.Group = gvk.Group
}
case repository.FileActionDeleted:
name, gvk, err := r.resourceManager.RemoveResourceFromFile(ctx, change.Path, change.PreviousRef)
if err != nil {
result.Error = fmt.Errorf("delete resource: %w", err)
}
result.Name = name
if gvk != nil {
result.Resource = gvk.Kind
result.Group = gvk.Group
}
case repository.FileActionRenamed:
name, gvk, err := r.resourceManager.RenameResourceFile(ctx, change.Path, change.PreviousRef, change.Path, change.Ref)
if err != nil {
result.Error = fmt.Errorf("rename resource: %w", err)
}
result.Name = name
if gvk != nil {
result.Resource = gvk.Kind
result.Group = gvk.Group
}
// 2. Create
r.progress.Record(ctx, r.writeResourceFromFile(ctx, change.Path, change.Ref, repository.FileActionCreated))
case repository.FileActionIgnored:
r.progress.Record(ctx, jobs.JobResourceResult{
Path: change.Path,
Action: repository.FileActionIgnored,
})
// do nothing
}
r.progress.Record(ctx, result)
}
r.progress.SetMessage(ctx, "versioned changes replicated")
return nil
}
func (r *syncJob) deleteObject(ctx context.Context, path string, ref string) jobs.JobResourceResult {
info, err := r.repository.Read(ctx, path, ref)
result := jobs.JobResourceResult{
Path: path,
Action: repository.FileActionDeleted,
}
if err != nil {
result.Error = fmt.Errorf("failed to read file: %w", err)
return result
}
obj, gvk, _ := resources.DecodeYAMLObject(bytes.NewBuffer(info.Data))
if obj == nil {
result.Error = errors.New("no object found")
return result
}
objName := obj.GetName()
if objName == "" {
// Find the referenced file
objName, _ = resources.NamesFromHashedRepoPath(r.repository.Config().Name, path)
}
result.Name = objName
result.Resource = gvk.Kind
result.Group = gvk.Group
client, _, err := r.parser.Clients().ForKind(*gvk)
if err != nil {
result.Error = fmt.Errorf("unable to get client for deleted object: %w", err)
return result
}
err = client.Delete(ctx, objName, metav1.DeleteOptions{})
if err != nil {
result.Error = fmt.Errorf("failed to delete: %w", err)
return result
}
return result
}
func (r *syncJob) writeResourceFromFile(ctx context.Context, path string, ref string, action repository.FileAction) jobs.JobResourceResult {
result := jobs.JobResourceResult{
Path: path,
Action: action,
}
// Read the referenced file
fileInfo, err := r.repository.Read(ctx, path, ref)
if err != nil {
result.Error = fmt.Errorf("failed to read file: %w", err)
return result
}
parsed, err := r.parser.Parse(ctx, fileInfo, false) // no validation
if err != nil {
result.Error = fmt.Errorf("failed to parse file: %w", err)
return result
}
// Check if the resource already exists
id := resourceID{
Name: parsed.Obj.GetName(),
Resource: parsed.GVR.Resource,
Group: parsed.GVK.Group,
}
existing, found := r.resourcesLookup[id]
if found {
result.Error = fmt.Errorf("duplicate resource name: %s, %s and %s", parsed.Obj.GetName(), path, existing)
return result
}
r.resourcesLookup[id] = path
// Make sure the parent folders exist
folder, err := r.folders.EnsureFolderPathExist(ctx, path)
if err != nil {
result.Error = fmt.Errorf("failed to ensure folder path exists: %w", err)
return result
}
parsed.Meta.SetFolder(folder)
parsed.Meta.SetUID("") // clear identifiers
parsed.Meta.SetResourceVersion("") // clear identifiers
result.Name = parsed.Obj.GetName()
result.Resource = parsed.GVR.Resource
result.Group = parsed.GVK.Group
// Update will also create (for resources we care about)
_, err = parsed.Client.Update(ctx, parsed.Obj, metav1.UpdateOptions{})
result.Error = err
return result
}

View File

@ -562,6 +562,7 @@ func (b *APIBuilder) GetPostStartHooks() (map[string]genericapiserver.PostStartH
b.storageStatus,
b.secrets,
b.clonedir,
b.parsers,
)
syncWorker := sync.NewSyncWorker(
c.ProvisioningV0alpha1(),

View File

@ -29,6 +29,7 @@ func readBody(r *http.Request, maxSize int64) ([]byte, error) {
}
return nil, fmt.Errorf("error reading request body: %w", err)
}
defer func() { _ = limitedBody.Close() }()
return body, nil
}

View File

@ -5,10 +5,13 @@ import (
"fmt"
"sync"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
dashboard "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v1alpha1"
"github.com/grafana/grafana/pkg/apimachinery/utils"
folders "github.com/grafana/grafana/pkg/apis/folder/v0alpha1"
iam "github.com/grafana/grafana/pkg/apis/iam/v0alpha1"
"github.com/grafana/grafana/pkg/services/apiserver"
@ -142,29 +145,105 @@ func (c *ResourceClients) ForResource(gvr schema.GroupVersionResource) (dynamic.
return info.client, info.gvk, nil
}
func (c *ResourceClients) Folder() (dynamic.ResourceInterface, error) {
v, _, err := c.ForResource(schema.GroupVersionResource{
Group: folders.GROUP,
Version: folders.VERSION,
Resource: folders.RESOURCE,
// ForEachResource applies the function to each resource in the discovery client
func (c *ResourceClients) ForEachResource(ctx context.Context, kind schema.GroupVersionResource, fn func(client dynamic.ResourceInterface, item *unstructured.Unstructured) error) error {
client, _, err := c.ForResource(kind)
if err != nil {
return err
}
return ForEachResource(ctx, client, func(item *unstructured.Unstructured) error {
return fn(client, item)
})
return v, err
}
// ForEachResource applies the function to each resource in the discovery client
func ForEachResource(ctx context.Context, client dynamic.ResourceInterface, fn func(item *unstructured.Unstructured) error) error {
var continueToken string
for ctx.Err() == nil {
list, err := client.List(ctx, metav1.ListOptions{Limit: 100, Continue: continueToken})
if err != nil {
return fmt.Errorf("error executing list: %w", err)
}
for _, item := range list.Items {
if ctx.Err() != nil {
return ctx.Err()
}
if err := fn(&item); err != nil {
return err
}
}
continueToken = list.GetContinue()
if continueToken == "" {
break
}
}
return nil
}
// ForEachUnmanagedResource applies the function to each unprovisioned supported resource
func (c *ResourceClients) ForEachUnmanagedResource(ctx context.Context, fn func(client dynamic.ResourceInterface, item *unstructured.Unstructured) error) error {
return c.ForEachSupportedResource(ctx, func(client dynamic.ResourceInterface, item *unstructured.Unstructured) error {
meta, err := utils.MetaAccessor(item)
if err != nil {
return fmt.Errorf("extract meta accessor: %w", err)
}
// Skip if managed
_, ok := meta.GetManagerProperties()
if ok {
return nil
}
return fn(client, item)
})
}
// ForEachSupportedResource applies the function to each supported resource
func (c *ResourceClients) ForEachSupportedResource(ctx context.Context, fn func(client dynamic.ResourceInterface, item *unstructured.Unstructured) error) error {
for _, kind := range SupportedResources {
if err := c.ForEachResource(ctx, kind, fn); err != nil {
return err
}
}
return nil
}
func (c *ResourceClients) Folder() (dynamic.ResourceInterface, error) {
client, _, err := c.ForResource(FolderResource)
return client, err
}
func (c *ResourceClients) ForEachFolder(ctx context.Context, fn func(client dynamic.ResourceInterface, item *unstructured.Unstructured) error) error {
return c.ForEachResource(ctx, FolderResource, fn)
}
func (c *ResourceClients) User() (dynamic.ResourceInterface, error) {
v, _, err := c.ForResource(schema.GroupVersionResource{
Group: iam.GROUP,
Version: iam.VERSION,
Resource: iam.UserResourceInfo.GroupResource().Resource,
})
v, _, err := c.ForResource(UserResource)
return v, err
}
func (c *ResourceClients) Dashboard() (dynamic.ResourceInterface, error) {
v, _, err := c.ForResource(schema.GroupVersionResource{
Group: dashboard.GROUP,
Version: dashboard.VERSION,
Resource: dashboard.DASHBOARD_RESOURCE,
})
return v, err
var UserResource = schema.GroupVersionResource{
Group: iam.GROUP,
Version: iam.VERSION,
Resource: iam.UserResourceInfo.GroupResource().Resource,
}
var FolderResource = schema.GroupVersionResource{
Group: folders.GROUP,
Version: folders.VERSION,
Resource: folders.RESOURCE,
}
var DashboardResource = schema.GroupVersionResource{
Group: dashboard.GROUP,
Version: dashboard.VERSION,
Resource: dashboard.DASHBOARD_RESOURCE,
}
// SupportedResources is the list of resources that are supported by provisioning
var SupportedResources = []schema.GroupVersionResource{FolderResource, DashboardResource}

View File

@ -0,0 +1,276 @@
package resources
import (
"context"
"errors"
"fmt"
"github.com/grafana/grafana/pkg/apimachinery/apis/common/v0alpha1"
provisioning "github.com/grafana/grafana/pkg/apis/provisioning/v0alpha1"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/repository"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/safepath"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// DualReadWriter is a wrapper around a repository that can read and write resources
// TODO: it does not support folders yet
type DualReadWriter struct {
repo repository.ReaderWriter
parser *Parser
folders *FolderManager
}
func NewDualReadWriter(repo repository.ReaderWriter, parser *Parser, folders *FolderManager) *DualReadWriter {
return &DualReadWriter{repo: repo, parser: parser, folders: folders}
}
func (r *DualReadWriter) Read(ctx context.Context, path string, ref string) (*ParsedResource, error) {
// TODO: implement this
if safepath.IsDir(path) {
return nil, fmt.Errorf("folder read not supported")
}
info, err := r.repo.Read(ctx, path, ref)
if err != nil {
return nil, err
}
parsed, err := r.parser.Parse(ctx, info, false)
if err != nil {
return nil, err
}
// GVR will exist for anything we can actually save
// TODO: Add known error in parser for unsupported resource
if parsed.GVR == nil {
if parsed.GVK != nil {
//nolint:govet
parsed.Errors = append(parsed.Errors, fmt.Errorf("unknown resource for Kind: %s", parsed.GVK.Kind))
} else {
parsed.Errors = append(parsed.Errors, fmt.Errorf("unknown resource"))
}
}
return parsed, nil
}
func (r *DualReadWriter) Delete(ctx context.Context, path string, ref string, message string) (*ParsedResource, error) {
if err := repository.IsWriteAllowed(r.repo.Config(), ref); err != nil {
return nil, err
}
// TODO: implement this
if safepath.IsDir(path) {
return nil, fmt.Errorf("folder delete not supported")
}
file, err := r.repo.Read(ctx, path, ref)
if err != nil {
return nil, err // unable to read value
}
// TODO: document in API specification
// We can only delete parsable things
parsed, err := r.parser.Parse(ctx, file, false)
if err != nil {
return nil, err // unable to read value
}
parsed.Action = provisioning.ResourceActionDelete
err = r.repo.Delete(ctx, path, ref, message)
if err != nil {
return nil, fmt.Errorf("delete file from repository: %w", err)
}
// Delete the file in the grafana database
if ref == "" {
err = parsed.Client.Delete(ctx, parsed.Obj.GetName(), metav1.DeleteOptions{})
if apierrors.IsNotFound(err) {
err = nil // ignorable
}
if err != nil {
return nil, fmt.Errorf("delete resource from storage: %w", err)
}
}
return parsed, err
}
// CreateFolder creates a new folder in the repository
// FIXME: fix signature to return ParsedResource
func (r *DualReadWriter) CreateFolder(ctx context.Context, path string, ref string, message string) (*provisioning.ResourceWrapper, error) {
if err := repository.IsWriteAllowed(r.repo.Config(), ref); err != nil {
return nil, err
}
if !safepath.IsDir(path) {
return nil, fmt.Errorf("not a folder path")
}
// Now actually create the folder
if err := r.repo.Create(ctx, path, ref, nil, message); err != nil {
return nil, fmt.Errorf("failed to create folder: %w", err)
}
cfg := r.repo.Config()
wrap := &provisioning.ResourceWrapper{
Path: path,
Ref: ref,
Repository: provisioning.ResourceRepositoryInfo{
Type: cfg.Spec.Type,
Namespace: cfg.Namespace,
Name: cfg.Name,
Title: cfg.Spec.Title,
},
Resource: provisioning.ResourceObjects{
Action: provisioning.ResourceActionCreate,
},
}
if ref == "" {
folderName, err := r.folders.EnsureFolderPathExist(ctx, path)
if err != nil {
return nil, err
}
current, err := r.folders.GetFolder(ctx, folderName)
if err != nil && !apierrors.IsNotFound(err) {
return nil, err // unable to check if the folder exists
}
wrap.Resource.Upsert = v0alpha1.Unstructured{
Object: current.Object,
}
}
return wrap, nil
}
// CreateResource creates a new resource in the repository
func (r *DualReadWriter) CreateResource(ctx context.Context, path string, ref string, message string, data []byte) (*ParsedResource, error) {
if err := repository.IsWriteAllowed(r.repo.Config(), ref); err != nil {
return nil, err
}
info := &repository.FileInfo{
Data: data,
Path: path,
Ref: ref,
}
// TODO: improve parser to parse out of reader
parsed, err := r.parser.Parse(ctx, info, true)
if err != nil {
if errors.Is(err, ErrUnableToReadResourceBytes) {
return nil, apierrors.NewBadRequest("unable to read the request as a resource")
}
return nil, err
}
// GVR will exist for anything we can actually save
// TODO: Add known error in parser for unsupported resource
if parsed.GVR == nil {
return nil, apierrors.NewBadRequest("The payload does not map to a known resource")
}
// Do not write if any errors exist
if len(parsed.Errors) > 0 {
return parsed, err
}
data, err = parsed.ToSaveBytes()
if err != nil {
return nil, err
}
err = r.repo.Create(ctx, path, ref, data, message)
if err != nil {
return nil, fmt.Errorf("create resource in repository: %w", err)
}
// Directly update the grafana database
// Behaves the same running sync after writing
if ref == "" {
// FIXME: we are not creating the folder path
// TODO: will existing also be present here? for update?
if parsed.Existing == nil {
parsed.Upsert, err = parsed.Client.Create(ctx, parsed.Obj, metav1.CreateOptions{})
if err != nil {
parsed.Errors = append(parsed.Errors, err)
}
} else {
parsed.Upsert, err = parsed.Client.Update(ctx, parsed.Obj, metav1.UpdateOptions{})
if err != nil {
parsed.Errors = append(parsed.Errors, err)
}
}
}
return parsed, err
}
// UpdateResource updates a resource in the repository
func (r *DualReadWriter) UpdateResource(ctx context.Context, path string, ref string, message string, data []byte) (*ParsedResource, error) {
if err := repository.IsWriteAllowed(r.repo.Config(), ref); err != nil {
return nil, err
}
info := &repository.FileInfo{
Data: data,
Path: path,
Ref: ref,
}
// TODO: improve parser to parse out of reader
parsed, err := r.parser.Parse(ctx, info, true)
if err != nil {
if errors.Is(err, ErrUnableToReadResourceBytes) {
return nil, apierrors.NewBadRequest("unable to read the request as a resource")
}
return nil, err
}
// GVR will exist for anything we can actually save
// TODO: Add known error in parser for unsupported resource
if parsed.GVR == nil {
return nil, apierrors.NewBadRequest("The payload does not map to a known resource")
}
// Do not write if any errors exist
if len(parsed.Errors) > 0 {
return parsed, err
}
data, err = parsed.ToSaveBytes()
if err != nil {
return nil, err
}
err = r.repo.Update(ctx, path, ref, data, message)
if err != nil {
return nil, fmt.Errorf("update resource in repository: %w", err)
}
// Directly update the grafana database
// Behaves the same running sync after writing
if ref == "" {
// FIXME: we are not creating the folder path
// FIXME: I don't like this parsed strategy here
if parsed.Existing == nil {
parsed.Upsert, err = parsed.Client.Create(ctx, parsed.Obj, metav1.CreateOptions{})
if err != nil {
parsed.Errors = append(parsed.Errors, err)
}
} else {
parsed.Upsert, err = parsed.Client.Update(ctx, parsed.Obj, metav1.UpdateOptions{})
if err != nil {
parsed.Errors = append(parsed.Errors, err)
}
}
}
return parsed, err
}

View File

@ -2,6 +2,7 @@ package resources
import (
"context"
"errors"
"fmt"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -15,16 +16,18 @@ import (
"github.com/grafana/grafana/pkg/registry/apis/provisioning/safepath"
)
const maxFolders = 10000
type FolderManager struct {
repo repository.Repository
lookup *FolderTree
repo repository.ReaderWriter
tree *FolderTree
client dynamic.ResourceInterface
}
func NewFolderManager(repo repository.Repository, client dynamic.ResourceInterface) *FolderManager {
func NewFolderManager(repo repository.ReaderWriter, client dynamic.ResourceInterface, lookup *FolderTree) *FolderManager {
return &FolderManager{
repo: repo,
lookup: NewEmptyFolderTree(),
tree: lookup,
client: client,
}
}
@ -33,6 +36,14 @@ func (fm *FolderManager) Client() dynamic.ResourceInterface {
return fm.client
}
func (fm *FolderManager) Tree() *FolderTree {
return fm.tree
}
func (fm *FolderManager) SetTree(tree *FolderTree) {
fm.tree = tree
}
// EnsureFoldersExist creates the folder structure in the cluster.
func (fm *FolderManager) EnsureFolderPathExist(ctx context.Context, filePath string) (parent string, err error) {
cfg := fm.repo.Config()
@ -48,13 +59,13 @@ func (fm *FolderManager) EnsureFolderPathExist(ctx context.Context, filePath str
}
f := ParseFolder(dir, cfg.Name)
if fm.lookup.In(f.ID) {
if fm.tree.In(f.ID) {
return f.ID, nil
}
err = safepath.Walk(ctx, f.Path, func(ctx context.Context, traverse string) error {
f := ParseFolder(traverse, cfg.GetName())
if fm.lookup.In(f.ID) {
if fm.tree.In(f.ID) {
parent = f.ID
return nil
}
@ -63,7 +74,7 @@ func (fm *FolderManager) EnsureFolderPathExist(ctx context.Context, filePath str
return fmt.Errorf("ensure folder exists: %w", err)
}
fm.lookup.Add(f, parent)
fm.tree.Add(f, parent)
parent = f.ID
return nil
})
@ -131,3 +142,40 @@ func (fm *FolderManager) EnsureFolderExists(ctx context.Context, folder Folder,
func (fm *FolderManager) GetFolder(ctx context.Context, name string) (*unstructured.Unstructured, error) {
return fm.client.Get(ctx, name, metav1.GetOptions{})
}
// ReplicateTree replicates the folder tree to the repository.
// The function fn is called for each folder.
// If the folder already exists, the function is called with created set to false.
// If the folder is created, the function is called with created set to true.
func (fm *FolderManager) EnsureTreeExists(ctx context.Context, ref, path string, fn func(folder Folder, created bool, err error) error) error {
return fm.tree.Walk(ctx, func(ctx context.Context, folder Folder) error {
p := folder.Path
if path != "" {
p = safepath.Join(path, p)
}
_, err := fm.repo.Read(ctx, p, ref)
if err != nil && !(errors.Is(err, repository.ErrFileNotFound) || apierrors.IsNotFound(err)) {
return fn(folder, false, fmt.Errorf("check if folder exists before writing: %w", err))
} else if err == nil {
return fn(folder, false, nil)
}
msg := fmt.Sprintf("Add folder %s", p)
if err := fm.repo.Create(ctx, p, ref, nil, msg); err != nil {
return fn(folder, true, fmt.Errorf("write folder in repo: %w", err))
}
return fn(folder, true, nil)
})
}
func (fm *FolderManager) LoadFromServer(ctx context.Context) error {
return ForEachResource(ctx, fm.client, func(item *unstructured.Unstructured) error {
if fm.tree.Count() > maxFolders {
return errors.New("too many folders")
}
return fm.tree.AddUnstructured(item, fm.repo.Config().Name)
})
}

View File

@ -104,6 +104,8 @@ type ParsedResource struct {
Errors []error
}
// FIXME: eliminate clients from parser
func (r *Parser) Clients() *ResourceClients {
return r.clients
}

View File

@ -0,0 +1,229 @@
package resources
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/infra/slugify"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/repository"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/safepath"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
)
var ErrAlreadyInRepository = errors.New("already in repository")
type WriteOptions struct {
Identifier bool
Path string
Ref string
}
type resourceID struct {
Name string
Resource string
Group string
}
type ResourcesManager struct {
repo repository.ReaderWriter
folders *FolderManager
parser *Parser
clients *ResourceClients
userInfo map[string]repository.CommitSignature
resourcesLookup map[resourceID]string // the path with this k8s name
}
func NewResourcesManager(repo repository.ReaderWriter, folders *FolderManager, parser *Parser, clients *ResourceClients, userInfo map[string]repository.CommitSignature) *ResourcesManager {
return &ResourcesManager{
repo: repo,
folders: folders,
parser: parser,
clients: clients,
userInfo: userInfo,
resourcesLookup: map[resourceID]string{},
}
}
// CreateResource writes an object to the repository
func (r *ResourcesManager) CreateResourceFileFromObject(ctx context.Context, obj *unstructured.Unstructured, options WriteOptions) (string, error) {
if err := ctx.Err(); err != nil {
return "", fmt.Errorf("context error: %w", err)
}
meta, err := utils.MetaAccessor(obj)
if err != nil {
return "", fmt.Errorf("extract meta accessor: %w", err)
}
// Message from annotations
commitMessage := meta.GetMessage()
if commitMessage == "" {
g := meta.GetGeneration()
if g > 0 {
commitMessage = fmt.Sprintf("Generation: %d", g)
} else {
commitMessage = "exported from grafana"
}
}
ctx = r.withAuthorSignature(ctx, meta)
name := meta.GetName()
manager, _ := meta.GetManagerProperties()
// TODO: how we should handle this?
if manager.Identity == r.repo.Config().GetName() {
// If it's already in the repository, we don't need to write it
return "", ErrAlreadyInRepository
}
title := meta.FindTitle("")
if title == "" {
title = name
}
folder := meta.GetFolder()
// Get the absolute path of the folder
fid, ok := r.folders.Tree().DirPath(folder, "")
if !ok {
// FIXME: Shouldn't this fail instead?
fid = Folder{
Path: "__folder_not_found/" + slugify.Slugify(folder),
}
// r.logger.Error("folder of item was not in tree of repository")
}
// Clear the metadata
delete(obj.Object, "metadata")
if options.Identifier {
meta.SetName(name) // keep the identifier in the metadata
}
body, err := json.MarshalIndent(obj.Object, "", " ")
if err != nil {
return "", fmt.Errorf("failed to marshal dashboard: %w", err)
}
fileName := slugify.Slugify(title) + ".json"
if fid.Path != "" {
fileName = safepath.Join(fid.Path, fileName)
}
if options.Path != "" {
fileName = safepath.Join(options.Path, fileName)
}
err = r.repo.Write(ctx, fileName, options.Ref, body, commitMessage)
if err != nil {
return "", fmt.Errorf("failed to write file: %w", err)
}
return fileName, nil
}
func (r *ResourcesManager) WriteResourceFromFile(ctx context.Context, path string, ref string) (string, *schema.GroupVersionKind, error) {
// Read the referenced file
fileInfo, err := r.repo.Read(ctx, path, ref)
if err != nil {
return "", nil, fmt.Errorf("failed to read file: %w", err)
}
parsed, err := r.parser.Parse(ctx, fileInfo, false) // no validation
if err != nil {
return "", nil, fmt.Errorf("failed to parse file: %w", err)
}
// Check if the resource already exists
id := resourceID{
Name: parsed.Obj.GetName(),
Resource: parsed.GVR.Resource,
Group: parsed.GVK.Group,
}
existing, found := r.resourcesLookup[id]
if found {
return "", parsed.GVK, fmt.Errorf("duplicate resource name: %s, %s and %s", parsed.Obj.GetName(), path, existing)
}
r.resourcesLookup[id] = path
// Make sure the parent folders exist
folder, err := r.folders.EnsureFolderPathExist(ctx, path)
if err != nil {
return "", parsed.GVK, fmt.Errorf("failed to ensure folder path exists: %w", err)
}
parsed.Meta.SetFolder(folder)
parsed.Meta.SetUID("") // clear identifiers
parsed.Meta.SetResourceVersion("") // clear identifiers
// Update will also create (for resources we care about)
_, err = parsed.Client.Update(ctx, parsed.Obj, metav1.UpdateOptions{})
return parsed.Obj.GetName(), parsed.GVK, err
}
func (r *ResourcesManager) RenameResourceFile(ctx context.Context, previousPath, previousRef, newPath, newRef string) (string, *schema.GroupVersionKind, error) {
name, gvk, err := r.RemoveResourceFromFile(ctx, previousPath, previousRef)
if err != nil {
return name, gvk, fmt.Errorf("failed to remove resource: %w", err)
}
return r.WriteResourceFromFile(ctx, newPath, newRef)
}
func (r *ResourcesManager) RemoveResourceFromFile(ctx context.Context, path string, ref string) (string, *schema.GroupVersionKind, error) {
info, err := r.repo.Read(ctx, path, ref)
if err != nil {
return "", nil, fmt.Errorf("failed to read file: %w", err)
}
obj, gvk, _ := DecodeYAMLObject(bytes.NewBuffer(info.Data))
if obj == nil {
return "", nil, fmt.Errorf("no object found")
}
objName := obj.GetName()
if objName == "" {
// Find the referenced file
objName, _ = NamesFromHashedRepoPath(r.repo.Config().Name, path)
}
client, _, err := r.clients.ForKind(*gvk)
if err != nil {
return "", nil, fmt.Errorf("unable to get client for deleted object: %w", err)
}
err = client.Delete(ctx, objName, metav1.DeleteOptions{})
if err != nil {
return "", nil, fmt.Errorf("failed to delete: %w", err)
}
return objName, gvk, nil
}
func (r *ResourcesManager) withAuthorSignature(ctx context.Context, item utils.GrafanaMetaAccessor) context.Context {
id := item.GetUpdatedBy()
if id == "" {
id = item.GetCreatedBy()
}
if id == "" {
id = "grafana"
}
sig := r.userInfo[id] // lookup
if sig.Name == "" && sig.Email == "" {
sig.Name = id
}
t, err := item.GetUpdatedTimestamp()
if err == nil && t != nil {
sig.When = *t
} else {
sig.When = item.GetCreationTimestamp().Time
}
return repository.WithAuthorSignature(ctx, sig)
}

View File

@ -19,6 +19,7 @@ import (
type FolderTree struct {
tree map[string]string
folders map[string]Folder
count int
}
// In determines if the given folder is in the tree at all. That is, it answers "does the folder even exist in the Grafana instance?"
@ -66,6 +67,11 @@ func (t *FolderTree) DirPath(folder, baseFolder string) (fid Folder, ok bool) {
func (t *FolderTree) Add(folder Folder, parent string) {
t.tree[folder.ID] = parent
t.folders[folder.ID] = folder
t.count++
}
func (t *FolderTree) Count() int {
return t.count
}
type WalkFunc func(ctx context.Context, folder Folder) error
@ -113,6 +119,7 @@ func (t *FolderTree) AddUnstructured(item *unstructured.Unstructured, skipRepo s
}
t.tree[folder.ID] = meta.GetFolder()
t.folders[folder.ID] = folder
t.count++
return nil
}
@ -133,7 +140,8 @@ func NewFolderTreeFromResourceList(resources *provisioning.ResourceList) *Folder
}
return &FolderTree{
tree,
folderIDs,
tree: tree,
folders: folderIDs,
count: len(resources.Items),
}
}

View File

@ -10,6 +10,7 @@ import (
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
)
// FIXME: do this tests make sense in their current form?
func TestIntegrationProvisioning_Client(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
@ -22,32 +23,25 @@ func TestIntegrationProvisioning_Client(t *testing.T) {
require.NoError(t, err)
t.Run("dashboard client support", func(t *testing.T) {
dash, err := clients.Dashboard()
require.NoError(t, err)
require.NotNil(t, dash)
client, _, err := clients.ForResource(schema.GroupVersionResource{
_, _, err := clients.ForResource(schema.GroupVersionResource{
Group: "dashboard.grafana.app",
Resource: "dashboards",
Version: "v1alpha1",
})
require.NoError(t, err)
require.Equal(t, dash, client, "expecting the default dashboard to be version1")
// With empty version, we should get the preferred version (v1alpha1)
client, _, err = clients.ForResource(schema.GroupVersionResource{
_, _, err = clients.ForResource(schema.GroupVersionResource{
Group: "dashboard.grafana.app",
Resource: "dashboards",
})
require.NoError(t, err)
require.Equal(t, dash, client, "expecting the default dashboard to be version0")
client, _, err = clients.ForKind(schema.GroupVersionKind{
_, _, err = clients.ForKind(schema.GroupVersionKind{
Group: "dashboard.grafana.app",
Version: "v1alpha1",
Kind: "Dashboard",
})
require.NoError(t, err)
require.Equal(t, dash, client, "same client when requested by kind")
})
}