Provisioning: introduce interface for git clones (#103175)

* Delegate clone to export in migrate from API server

* Clonable interface

* Root from register.go

* Call option push on write

* Fix linting
This commit is contained in:
Roberto Jiménez Sánchez
2025-04-01 12:42:52 +02:00
committed by GitHub
parent f91f739ee2
commit 29f395e1dd
7 changed files with 136 additions and 131 deletions

View File

@ -11,41 +11,29 @@ import (
provisioning "github.com/grafana/grafana/pkg/apis/provisioning/v0alpha1"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/repository"
gogit "github.com/grafana/grafana/pkg/registry/apis/provisioning/repository/go-git"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/secrets"
"github.com/grafana/grafana/pkg/storage/legacysql/dualwrite"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic"
)
type ExportWorker struct {
// Tempdir for repo clones
clonedir string
// required to create clients
clientFactory *resources.ClientFactory
// Check where values are currently saved
storageStatus dualwrite.Service
// Decrypt secrets in config
secrets secrets.Service
parsers *resources.ParserFactory
}
func NewExportWorker(clientFactory *resources.ClientFactory,
storageStatus dualwrite.Service,
secrets secrets.Service,
clonedir string,
parsers *resources.ParserFactory,
) *ExportWorker {
return &ExportWorker{
clonedir,
clientFactory,
storageStatus,
secrets,
parsers,
}
}
@ -67,27 +55,25 @@ func (r *ExportWorker) Process(ctx context.Context, repo repository.Repository,
return err
}
// Use the existing clone if already checked out
buffered, ok := repo.(*gogit.GoGitRepo)
if !ok && repo.Config().Spec.GitHub != nil {
var clone repository.ClonedRepository
if clonable, ok := repo.(repository.ClonableRepository); ok {
progress.SetMessage(ctx, "clone target")
buffered, err = gogit.Clone(ctx, repo.Config(), gogit.GoGitCloneOptions{
Root: r.clonedir,
SingleCommitBeforePush: true,
clone, err = clonable.Clone(ctx, repository.CloneOptions{
PushOnWrites: false,
// TODO: make this configurable
Timeout: 10 * time.Minute,
}, r.secrets, os.Stdout)
})
if err != nil {
return fmt.Errorf("unable to clone target: %w", err)
}
repo = buffered // send all writes to the buffered repo
defer func() {
if err := buffered.Remove(ctx); err != nil {
if err := clone.Remove(ctx); err != nil {
logging.FromContext(ctx).Error("failed to remove cloned repository after export", "err", err)
}
}()
// Use the cloned repo for all operations
repo = clone
options.Branch = "" // :( the branch is now baked into the repo
}
@ -180,12 +166,13 @@ func (r *ExportWorker) Process(ctx context.Context, repo repository.Repository,
}
}
if buffered != nil {
if clone != nil {
progress.SetMessage(ctx, "push changes")
if err := buffered.Push(ctx, gogit.GoGitPushOptions{
if err := clone.Push(ctx, repository.PushOptions{
// TODO: make this configurable
Timeout: 10 * time.Minute,
}, os.Stdout); err != nil {
Timeout: 10 * time.Minute,
Progress: os.Stdout,
}); err != nil {
return fmt.Errorf("error pushing changes: %w", err)
}
}

View File

@ -15,9 +15,7 @@ import (
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/export"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/sync"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/repository"
gogit "github.com/grafana/grafana/pkg/registry/apis/provisioning/repository/go-git"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/secrets"
"github.com/grafana/grafana/pkg/storage/legacysql/dualwrite"
"github.com/grafana/grafana/pkg/storage/unified/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -26,9 +24,6 @@ import (
)
type MigrationWorker struct {
// Tempdir for repo clones
clonedir string
// temporary... while we still do an import
parsers *resources.ParserFactory
@ -41,9 +36,6 @@ type MigrationWorker struct {
// Direct access to unified storage... use carefully!
bulk resource.BulkStoreClient
// Decrypt secret from config object
secrets secrets.Service
// Delegate the export to the export worker
exportWorker *export.ExportWorker
@ -56,18 +48,14 @@ func NewMigrationWorker(
parsers *resources.ParserFactory, // should not be necessary!
storageStatus dualwrite.Service,
batch resource.BulkStoreClient,
secrets secrets.Service,
exportWorker *export.ExportWorker,
syncWorker *sync.SyncWorker,
clonedir string,
) *MigrationWorker {
return &MigrationWorker{
clonedir,
parsers,
storageStatus,
legacyMigrator,
batch,
secrets,
exportWorker,
syncWorker,
}
@ -84,17 +72,33 @@ func (w *MigrationWorker) Process(ctx context.Context, repo repository.Repositor
return errors.New("missing migrate settings")
}
progress.SetTotal(ctx, 10) // will show a progress bar
rw, ok := repo.(repository.ReaderWriter)
if !ok {
return errors.New("migration job submitted targeting repository that is not a ReaderWriter")
}
parser, err := w.parsers.GetParser(ctx, rw)
if err != nil {
return fmt.Errorf("error getting parser: %w", err)
}
if dualwrite.IsReadingLegacyDashboardsAndFolders(ctx, w.storageStatus) {
return w.migrateFromLegacy(ctx, rw, parser, *options, progress)
}
return w.migrateFromAPIServer(ctx, rw, parser, *options, progress)
}
// migrateFromLegacy will export the resources from legacy storage and import them into the target repository
func (w *MigrationWorker) migrateFromLegacy(ctx context.Context, rw repository.ReaderWriter, parser *resources.Parser, options provisioning.MigrateJobOptions, progress jobs.JobProgressRecorder) error {
var (
err error
buffered *gogit.GoGitRepo
err error
clone repository.ClonedRepository
)
isFromLegacy := dualwrite.IsReadingLegacyDashboardsAndFolders(ctx, w.storageStatus)
progress.SetTotal(ctx, 10) // will show a progress bar
// TODO: we should fail fast if migration is not possible and not always clone the repository.
if repo.Config().Spec.GitHub != nil {
progress.SetMessage(ctx, "clone "+repo.Config().Spec.GitHub.URL)
clonable, ok := rw.(repository.ClonableRepository)
if ok {
progress.SetMessage(ctx, "clone "+rw.Config().Spec.GitHub.URL)
reader, writer := io.Pipe()
go func() {
scanner := bufio.NewScanner(reader)
@ -103,43 +107,24 @@ func (w *MigrationWorker) Process(ctx context.Context, repo repository.Repositor
}
}()
buffered, err = gogit.Clone(ctx, repo.Config(), gogit.GoGitCloneOptions{
Root: w.clonedir,
SingleCommitBeforePush: !(options.History && isFromLegacy),
clone, err = clonable.Clone(ctx, repository.CloneOptions{
PushOnWrites: options.History,
// TODO: make this configurable
Timeout: 10 * time.Minute,
}, w.secrets, writer)
Timeout: 10 * time.Minute,
Progress: writer,
})
if err != nil {
return fmt.Errorf("unable to clone target: %w", err)
}
repo = buffered // send all writes to the buffered repo
rw = clone // send all writes to the buffered repo
defer func() {
if err := buffered.Remove(ctx); err != nil {
if err := clone.Remove(ctx); err != nil {
logging.FromContext(ctx).Error("failed to remove cloned repository after migrate", "err", err)
}
}()
}
rw, ok := repo.(repository.ReaderWriter)
if !ok {
return errors.New("migration job submitted targeting repository that is not a ReaderWriter")
}
if isFromLegacy {
return w.migrateFromLegacy(ctx, rw, buffered, *options, progress)
}
return w.migrateFromAPIServer(ctx, rw, *options, progress)
}
// migrateFromLegacy will export the resources from legacy storage and import them into the target repository
func (w *MigrationWorker) migrateFromLegacy(ctx context.Context, rw repository.ReaderWriter, buffered *gogit.GoGitRepo, options provisioning.MigrateJobOptions, progress jobs.JobProgressRecorder) error {
parser, err := w.parsers.GetParser(ctx, rw)
if err != nil {
return fmt.Errorf("error getting parser: %w", err)
}
var userInfo map[string]repository.CommitSignature
if options.History {
progress.SetMessage(ctx, "loading users")
@ -197,7 +182,7 @@ func (w *MigrationWorker) migrateFromLegacy(ctx context.Context, rw repository.R
}
}
if buffered != nil {
if clone != nil {
progress.SetMessage(ctx, "pushing changes")
reader, writer := io.Pipe()
go func() {
@ -207,10 +192,11 @@ func (w *MigrationWorker) migrateFromLegacy(ctx context.Context, rw repository.R
}
}()
if err := buffered.Push(ctx, gogit.GoGitPushOptions{
if err := clone.Push(ctx, repository.PushOptions{
// TODO: make this configurable
Timeout: 10 * time.Minute,
}, writer); err != nil {
Timeout: 10 * time.Minute,
Progress: writer,
}); err != nil {
return fmt.Errorf("error pushing changes: %w", err)
}
}
@ -244,7 +230,7 @@ func (w *MigrationWorker) migrateFromLegacy(ctx context.Context, rw repository.R
}
// migrateFromAPIServer will export the resources from unified storage and import them into the target repository
func (w *MigrationWorker) migrateFromAPIServer(ctx context.Context, repo repository.ReaderWriter, options provisioning.MigrateJobOptions, progress jobs.JobProgressRecorder) error {
func (w *MigrationWorker) migrateFromAPIServer(ctx context.Context, repo repository.ReaderWriter, parser *resources.Parser, options provisioning.MigrateJobOptions, progress jobs.JobProgressRecorder) error {
progress.SetMessage(ctx, "exporting unified storage resources")
exportJob := provisioning.Job{
Spec: provisioning.JobSpec{
@ -259,6 +245,7 @@ func (w *MigrationWorker) migrateFromAPIServer(ctx context.Context, repo reposit
// Reset the results after the export as pull will operate on the same resources
progress.ResetResults()
progress.SetMessage(ctx, "pulling resources")
syncJob := provisioning.Job{
Spec: provisioning.JobSpec{
@ -273,11 +260,6 @@ func (w *MigrationWorker) migrateFromAPIServer(ctx context.Context, repo reposit
}
progress.SetMessage(ctx, "removing unprovisioned resources")
parser, err := w.parsers.GetParser(ctx, repo)
if err != nil {
return fmt.Errorf("error getting parser: %w", err)
}
return parser.Clients().ForEachUnmanagedResource(ctx, func(client dynamic.ResourceInterface, item *unstructured.Unstructured) error {
result := jobs.JobResourceResult{
Name: item.GetName(),
@ -286,7 +268,7 @@ func (w *MigrationWorker) migrateFromAPIServer(ctx context.Context, repo reposit
Action: repository.FileActionDeleted,
}
if err = client.Delete(ctx, item.GetName(), metav1.DeleteOptions{}); err != nil {
if err := client.Delete(ctx, item.GetName(), metav1.DeleteOptions{}); err != nil {
result.Error = fmt.Errorf("failed to delete folder: %w", err)
progress.Record(ctx, result)
return result.Error

View File

@ -45,6 +45,7 @@ import (
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/sync"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/repository"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/repository/github"
gogit "github.com/grafana/grafana/pkg/registry/apis/provisioning/repository/go-git"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/safepath"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/secrets"
@ -557,8 +558,6 @@ func (b *APIBuilder) GetPostStartHooks() (map[string]genericapiserver.PostStartH
exportWorker := export.NewExportWorker(
b.parsers.ClientFactory,
b.storageStatus,
b.secrets,
b.clonedir,
b.parsers,
)
syncWorker := sync.NewSyncWorker(
@ -572,10 +571,8 @@ func (b *APIBuilder) GetPostStartHooks() (map[string]genericapiserver.PostStartH
b.parsers,
b.storageStatus,
b.unified,
b.secrets,
exportWorker,
syncWorker,
b.clonedir,
)
// Pull request worker
@ -1134,7 +1131,11 @@ func (b *APIBuilder) AsRepository(ctx context.Context, r *provisioning.Repositor
r.GetName(),
)
}
return repository.NewGitHub(ctx, r, b.ghFactory, b.secrets, webhookURL)
cloneFn := func(ctx context.Context, opts repository.CloneOptions) (repository.ClonedRepository, error) {
return gogit.Clone(ctx, b.clonedir, r, opts, b.secrets)
}
return repository.NewGitHub(ctx, r, b.ghFactory, b.secrets, webhookURL, cloneFn)
default:
return nil, fmt.Errorf("unknown repository type (%s)", r.Spec.Type)
}

View File

@ -36,6 +36,8 @@ type githubRepository struct {
owner string
repo string
cloneFn CloneFn
}
var (
@ -45,6 +47,7 @@ var (
_ Writer = (*githubRepository)(nil)
_ Reader = (*githubRepository)(nil)
_ RepositoryWithURLs = (*githubRepository)(nil)
_ ClonableRepository = (*githubRepository)(nil)
)
func NewGitHub(
@ -53,6 +56,7 @@ func NewGitHub(
factory *pgh.Factory,
secrets secrets.Service,
webhookURL string,
cloneFn CloneFn,
) (*githubRepository, error) {
owner, repo, err := parseOwnerRepo(config.Spec.GitHub.URL)
if err != nil {
@ -73,6 +77,7 @@ func NewGitHub(
webhookURL: webhookURL,
owner: owner,
repo: repo,
cloneFn: cloneFn,
}, nil
}
@ -896,6 +901,10 @@ func (r *githubRepository) OnDelete(ctx context.Context) error {
return r.deleteWebhook(ctx)
}
func (r *githubRepository) Clone(ctx context.Context, opts CloneOptions) (ClonedRepository, error) {
return r.cloneFn(ctx, opts)
}
func (r *githubRepository) logger(ctx context.Context, ref string) (context.Context, logging.Logger) {
logger := logging.FromContext(ctx)

View File

@ -45,30 +45,10 @@ func init() {
var _ repository.Repository = (*GoGitRepo)(nil)
type GoGitCloneOptions struct {
Root string // tempdir (when empty, memory??)
// If the branch does not exist, create it
CreateIfNotExists bool
// Skip intermediate commits and commit all before push
SingleCommitBeforePush bool
// Maximum allowed size for repository clone in bytes (0 means no limit)
MaxSize int64
// Maximum time allowed for clone operation in seconds (0 means no limit)
Timeout time.Duration
}
type GoGitPushOptions struct {
Timeout time.Duration
}
type GoGitRepo struct {
config *provisioning.Repository
opts GoGitCloneOptions
decryptedPassword string
opts repository.CloneOptions
repo *git.Repository
tree *git.Worktree
@ -79,12 +59,12 @@ type GoGitRepo struct {
// As structured, it is valid for one context and should not be shared across multiple requests
func Clone(
ctx context.Context,
root string,
config *provisioning.Repository,
opts GoGitCloneOptions,
opts repository.CloneOptions,
secrets secrets.Service,
progress io.Writer, // os.Stdout
) (*GoGitRepo, error) {
if opts.Root == "" {
) (repository.ClonedRepository, error) {
if root == "" {
return nil, fmt.Errorf("missing root config")
}
@ -101,15 +81,20 @@ func Clone(
return nil, fmt.Errorf("error decrypting token: %w", err)
}
if err := os.MkdirAll(opts.Root, 0700); err != nil {
if err := os.MkdirAll(root, 0700); err != nil {
return nil, fmt.Errorf("create root dir: %w", err)
}
dir, err := mkdirTempClone(opts.Root, config)
dir, err := mkdirTempClone(root, config)
if err != nil {
return nil, fmt.Errorf("create temp clone dir: %w", err)
}
progress := opts.Progress
if progress == nil {
progress = io.Discard
}
repo, worktree, err := clone(ctx, config, opts, decrypted, dir, progress)
if err != nil {
if err := os.RemoveAll(dir); err != nil {
@ -121,15 +106,15 @@ func Clone(
return &GoGitRepo{
config: config,
opts: opts,
tree: worktree,
opts: opts,
decryptedPassword: string(decrypted),
repo: repo,
dir: dir,
}, nil
}
func clone(ctx context.Context, config *provisioning.Repository, opts GoGitCloneOptions, decrypted []byte, dir string, progress io.Writer) (*git.Repository, *git.Worktree, error) {
func clone(ctx context.Context, config *provisioning.Repository, opts repository.CloneOptions, decrypted []byte, dir string, progress io.Writer) (*git.Repository, *git.Worktree, error) {
gitcfg := config.Spec.GitHub
url := fmt.Sprintf("%s.git", gitcfg.URL)
@ -198,17 +183,22 @@ func mkdirTempClone(root string, config *provisioning.Repository) (string, error
return os.MkdirTemp(root, fmt.Sprintf("clone-%s-%s-", config.Namespace, config.Name))
}
// Affer making changes to the worktree, push changes
func (g *GoGitRepo) Push(ctx context.Context, opts GoGitPushOptions, progress io.Writer) error {
// After making changes to the worktree, push changes
func (g *GoGitRepo) Push(ctx context.Context, opts repository.PushOptions) error {
timeout := maxOperationTimeout
if opts.Timeout > 0 {
timeout = opts.Timeout
}
progress := opts.Progress
if progress == nil {
progress = io.Discard
}
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
if g.opts.SingleCommitBeforePush {
if !g.opts.PushOnWrites {
_, err := g.tree.Commit("exported from grafana", &git.CommitOptions{
All: true, // Add everything that changed
})
@ -336,7 +326,7 @@ func (g *GoGitRepo) Write(ctx context.Context, fpath string, ref string, data []
}
// Skip commit for each file
if g.opts.SingleCommitBeforePush {
if !g.opts.PushOnWrites {
return nil
}

View File

@ -45,7 +45,7 @@ func TestGoGitWrapper(t *testing.T) {
}
ctx := context.Background()
wrap, err := Clone(ctx, &v0alpha1.Repository{
wrap, err := Clone(ctx, "testdata/clone", &v0alpha1.Repository{
ObjectMeta: v1.ObjectMeta{
Namespace: "ns",
Name: "unit-tester",
@ -57,14 +57,13 @@ func TestGoGitWrapper(t *testing.T) {
},
},
},
GoGitCloneOptions{
Root: "testdata/clone", // where things are cloned,
// one commit (not 11)
SingleCommitBeforePush: true,
CreateIfNotExists: true,
repository.CloneOptions{
PushOnWrites: false,
CreateIfNotExists: true,
Progress: os.Stdout,
},
&dummySecret{},
os.Stdout)
)
require.NoError(t, err)
tree, err := wrap.ReadTree(ctx, "")
@ -89,9 +88,10 @@ func TestGoGitWrapper(t *testing.T) {
}
fmt.Printf("push...\n")
err = wrap.Push(ctx, GoGitPushOptions{
Timeout: 10,
}, os.Stdout)
err = wrap.Push(ctx, repository.PushOptions{
Timeout: 10,
Progress: os.Stdout,
})
require.NoError(t, err)
}

View File

@ -4,8 +4,10 @@ import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation/field"
@ -44,6 +46,40 @@ type FileInfo struct {
Modified *metav1.Time
}
type CloneFn func(ctx context.Context, opts CloneOptions) (ClonedRepository, error)
type CloneOptions struct {
// If the branch does not exist, create it
CreateIfNotExists bool
// Push on every write
PushOnWrites bool
// Maximum allowed size for repository clone in bytes (0 means no limit)
MaxSize int64
// Maximum time allowed for clone operation in seconds (0 means no limit)
Timeout time.Duration
// Progress is the writer to report progress to
Progress io.Writer
}
type ClonableRepository interface {
Clone(ctx context.Context, opts CloneOptions) (ClonedRepository, error)
}
type PushOptions struct {
Timeout time.Duration
Progress io.Writer
}
type ClonedRepository interface {
ReaderWriter
Push(ctx context.Context, opts PushOptions) error
Remove(ctx context.Context) error
}
// An entry in the file tree, as returned by 'ReadFileTree'. Like FileInfo, but contains less information.
type FileTreeEntry struct {
// The path to the file from the base path given (if any).