mirror of
https://github.com/grafana/grafana.git
synced 2025-08-01 12:32:22 +08:00
Plugins: Make Installer responsible for removing plugins from file system (#73323)
* installer is responsible for removing from file system * take plugin as arg * remove resolve step * return plugin in test
This commit is contained in:
@ -486,7 +486,7 @@ func pluginAssetScenario(t *testing.T, desc string, url string, urlPattern strin
|
||||
cfg.IsFeatureToggleEnabled = func(_ string) bool { return false }
|
||||
hs := HTTPServer{
|
||||
Cfg: cfg,
|
||||
pluginStore: store.New(pluginRegistry),
|
||||
pluginStore: store.New(pluginRegistry, &fakes.FakeLoader{}),
|
||||
pluginFileStore: filestore.ProvideService(pluginRegistry),
|
||||
log: log.NewNopLogger(),
|
||||
pluginsCDNService: pluginscdn.ProvideService(&config.Cfg{
|
||||
@ -598,7 +598,7 @@ func Test_PluginsList_AccessControl(t *testing.T) {
|
||||
server := SetupAPITestServer(t, func(hs *HTTPServer) {
|
||||
hs.Cfg = setting.NewCfg()
|
||||
hs.PluginSettings = &pluginSettings
|
||||
hs.pluginStore = store.New(pluginRegistry)
|
||||
hs.pluginStore = store.New(pluginRegistry, &fakes.FakeLoader{})
|
||||
hs.pluginFileStore = filestore.ProvideService(pluginRegistry)
|
||||
var err error
|
||||
hs.pluginsUpdateChecker, err = updatechecker.ProvidePluginsService(hs.Cfg, nil, tracing.InitializeTracerForTest())
|
||||
|
@ -40,7 +40,7 @@ func (i *FakePluginInstaller) Remove(ctx context.Context, pluginID string) error
|
||||
|
||||
type FakeLoader struct {
|
||||
LoadFunc func(_ context.Context, _ plugins.PluginSource) ([]*plugins.Plugin, error)
|
||||
UnloadFunc func(_ context.Context, _ string) error
|
||||
UnloadFunc func(_ context.Context, _ *plugins.Plugin) (*plugins.Plugin, error)
|
||||
}
|
||||
|
||||
func (l *FakeLoader) Load(ctx context.Context, src plugins.PluginSource) ([]*plugins.Plugin, error) {
|
||||
@ -50,11 +50,11 @@ func (l *FakeLoader) Load(ctx context.Context, src plugins.PluginSource) ([]*plu
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (l *FakeLoader) Unload(ctx context.Context, pluginID string) error {
|
||||
func (l *FakeLoader) Unload(ctx context.Context, p *plugins.Plugin) (*plugins.Plugin, error) {
|
||||
if l.UnloadFunc != nil {
|
||||
return l.UnloadFunc(ctx, pluginID)
|
||||
return l.UnloadFunc(ctx, p)
|
||||
}
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type FakePluginClient struct {
|
||||
@ -509,14 +509,14 @@ func (f *FakeInitializer) Initialize(ctx context.Context, ps []*plugins.Plugin)
|
||||
}
|
||||
|
||||
type FakeTerminator struct {
|
||||
TerminateFunc func(ctx context.Context, pluginID string) error
|
||||
TerminateFunc func(ctx context.Context, p *plugins.Plugin) (*plugins.Plugin, error)
|
||||
}
|
||||
|
||||
func (f *FakeTerminator) Terminate(ctx context.Context, pluginID string) error {
|
||||
func (f *FakeTerminator) Terminate(ctx context.Context, p *plugins.Plugin) (*plugins.Plugin, error) {
|
||||
if f.TerminateFunc != nil {
|
||||
return f.TerminateFunc(ctx, pluginID)
|
||||
return f.TerminateFunc(ctx, p)
|
||||
}
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type FakeBackendPlugin struct {
|
||||
|
@ -145,9 +145,17 @@ func (m *PluginInstaller) Remove(ctx context.Context, pluginID string) error {
|
||||
return plugins.ErrUninstallCorePlugin
|
||||
}
|
||||
|
||||
if err := m.pluginLoader.Unload(ctx, plugin.ID); err != nil {
|
||||
p, err := m.pluginLoader.Unload(ctx, plugin)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if remover, ok := p.FS.(plugins.FSRemover); ok {
|
||||
if err = remover.Remove(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -40,6 +40,9 @@ func TestPluginManager_Add_Remove(t *testing.T) {
|
||||
require.Equal(t, []string{zipNameV1}, src.PluginURIs(ctx))
|
||||
return []*plugins.Plugin{pluginV1}, nil
|
||||
},
|
||||
UnloadFunc: func(_ context.Context, p *plugins.Plugin) (*plugins.Plugin, error) {
|
||||
return p, nil
|
||||
},
|
||||
}
|
||||
|
||||
pluginRepo := &fakes.FakePluginRepo{
|
||||
@ -129,9 +132,9 @@ func TestPluginManager_Add_Remove(t *testing.T) {
|
||||
|
||||
var unloadedPlugins []string
|
||||
inst.pluginLoader = &fakes.FakeLoader{
|
||||
UnloadFunc: func(_ context.Context, id string) error {
|
||||
unloadedPlugins = append(unloadedPlugins, id)
|
||||
return nil
|
||||
UnloadFunc: func(_ context.Context, p *plugins.Plugin) (*plugins.Plugin, error) {
|
||||
unloadedPlugins = append(unloadedPlugins, p.ID)
|
||||
return p, nil
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -11,5 +11,5 @@ type Service interface {
|
||||
// Load will return a list of plugins found in the provided file system paths.
|
||||
Load(ctx context.Context, src plugins.PluginSource) ([]*plugins.Plugin, error)
|
||||
// Unload will unload a specified plugin from the file system.
|
||||
Unload(ctx context.Context, pluginID string) error
|
||||
Unload(ctx context.Context, p *plugins.Plugin) (*plugins.Plugin, error)
|
||||
}
|
||||
|
@ -57,6 +57,6 @@ func (l *Loader) Load(ctx context.Context, src plugins.PluginSource) ([]*plugins
|
||||
return initializedPlugins, nil
|
||||
}
|
||||
|
||||
func (l *Loader) Unload(ctx context.Context, pluginID string) error {
|
||||
return l.termination.Terminate(ctx, pluginID)
|
||||
func (l *Loader) Unload(ctx context.Context, p *plugins.Plugin) (*plugins.Plugin, error) {
|
||||
return l.termination.Terminate(ctx, p)
|
||||
}
|
||||
|
@ -478,7 +478,9 @@ func TestLoader_Load(t *testing.T) {
|
||||
|
||||
func TestLoader_Unload(t *testing.T) {
|
||||
t.Run("Termination stage error is returned from Unload", func(t *testing.T) {
|
||||
pluginID := "grafana-test-panel"
|
||||
plugin := &plugins.Plugin{
|
||||
JSONData: plugins.JSONData{ID: "test-datasource", Type: plugins.TypeDataSource, Info: plugins.Info{Version: "1.0.0"}},
|
||||
}
|
||||
tcs := []struct {
|
||||
expectedErr error
|
||||
}{
|
||||
@ -496,13 +498,13 @@ func TestLoader_Unload(t *testing.T) {
|
||||
&fakes.FakeValidator{},
|
||||
&fakes.FakeInitializer{},
|
||||
&fakes.FakeTerminator{
|
||||
TerminateFunc: func(ctx context.Context, pID string) error {
|
||||
require.Equal(t, pluginID, pID)
|
||||
return tc.expectedErr
|
||||
TerminateFunc: func(ctx context.Context, p *plugins.Plugin) (*plugins.Plugin, error) {
|
||||
require.Equal(t, plugin, p)
|
||||
return p, tc.expectedErr
|
||||
},
|
||||
})
|
||||
|
||||
err := l.Unload(context.Background(), pluginID)
|
||||
_, err := l.Unload(context.Background(), plugin)
|
||||
require.ErrorIs(t, err, tc.expectedErr)
|
||||
}
|
||||
})
|
||||
|
@ -9,39 +9,6 @@ import (
|
||||
"github.com/grafana/grafana/pkg/plugins/manager/registry"
|
||||
)
|
||||
|
||||
// TerminablePluginResolver implements a ResolveFunc for resolving a plugin that can be terminated.
|
||||
type TerminablePluginResolver struct {
|
||||
pluginRegistry registry.Service
|
||||
log log.Logger
|
||||
}
|
||||
|
||||
// TerminablePluginResolverStep returns a new ResolveFunc for resolving a plugin that can be terminated.
|
||||
func TerminablePluginResolverStep(pluginRegistry registry.Service) ResolveFunc {
|
||||
return newTerminablePluginResolver(pluginRegistry).Resolve
|
||||
}
|
||||
|
||||
func newTerminablePluginResolver(pluginRegistry registry.Service) *TerminablePluginResolver {
|
||||
return &TerminablePluginResolver{
|
||||
pluginRegistry: pluginRegistry,
|
||||
log: log.New("plugins.resolver"),
|
||||
}
|
||||
}
|
||||
|
||||
// Resolve returns a plugin that can be terminated.
|
||||
func (r *TerminablePluginResolver) Resolve(ctx context.Context, pluginID string) (*plugins.Plugin, error) {
|
||||
p, exists := r.pluginRegistry.Plugin(ctx, pluginID)
|
||||
if !exists {
|
||||
return nil, plugins.ErrPluginNotInstalled
|
||||
}
|
||||
|
||||
// core plugins and bundled plugins cannot be terminated
|
||||
if p.IsCorePlugin() || p.IsBundledPlugin() {
|
||||
return nil, plugins.ErrUninstallCorePlugin
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// BackendProcessTerminator implements a TerminateFunc for stopping a backend plugin process.
|
||||
//
|
||||
// It uses the process.Manager to stop the backend plugin process.
|
||||
@ -93,13 +60,3 @@ func (d *Deregister) Deregister(ctx context.Context, p *plugins.Plugin) error {
|
||||
d.log.Debug("Plugin unregistered", "pluginId", p.ID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// FSRemoval implements a TerminateFunc for removing plugin files from the filesystem.
|
||||
func FSRemoval(_ context.Context, p *plugins.Plugin) error {
|
||||
if remover, ok := p.FS.(plugins.FSRemover); ok {
|
||||
if err := remover.Remove(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -2,7 +2,6 @@ package termination
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/plugins/config"
|
||||
@ -11,57 +10,41 @@ import (
|
||||
|
||||
// Terminator is responsible for the Termination stage of the plugin loader pipeline.
|
||||
type Terminator interface {
|
||||
Terminate(ctx context.Context, pluginID string) error
|
||||
Terminate(ctx context.Context, p *plugins.Plugin) (*plugins.Plugin, error)
|
||||
}
|
||||
|
||||
// ResolveFunc is the function used for the Resolve step of the Termination stage.
|
||||
type ResolveFunc func(ctx context.Context, pluginID string) (*plugins.Plugin, error)
|
||||
|
||||
// TerminateFunc is the function used for the Terminate step of the Termination stage.
|
||||
type TerminateFunc func(ctx context.Context, p *plugins.Plugin) error
|
||||
|
||||
type Terminate struct {
|
||||
cfg *config.Cfg
|
||||
resolveStep ResolveFunc
|
||||
terminateSteps []TerminateFunc
|
||||
log log.Logger
|
||||
}
|
||||
|
||||
type Opts struct {
|
||||
ResolveFunc ResolveFunc
|
||||
TerminateFuncs []TerminateFunc
|
||||
}
|
||||
|
||||
// New returns a new Termination stage.
|
||||
func New(cfg *config.Cfg, opts Opts) (*Terminate, error) {
|
||||
// without a resolve function, we can't do anything so return an error
|
||||
if opts.ResolveFunc == nil && opts.TerminateFuncs != nil {
|
||||
return nil, errors.New("resolve function is required")
|
||||
}
|
||||
|
||||
if opts.TerminateFuncs == nil {
|
||||
opts.TerminateFuncs = []TerminateFunc{}
|
||||
}
|
||||
|
||||
return &Terminate{
|
||||
cfg: cfg,
|
||||
resolveStep: opts.ResolveFunc,
|
||||
terminateSteps: opts.TerminateFuncs,
|
||||
log: log.New("plugins.termination"),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Terminate will execute the Terminate steps of the Termination stage.
|
||||
func (t *Terminate) Terminate(ctx context.Context, pluginID string) error {
|
||||
p, err := t.resolveStep(ctx, pluginID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
func (t *Terminate) Terminate(ctx context.Context, p *plugins.Plugin) (*plugins.Plugin, error) {
|
||||
for _, terminate := range t.terminateSteps {
|
||||
if err = terminate(ctx, p); err != nil {
|
||||
return err
|
||||
if err := terminate(ctx, p); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return p, nil
|
||||
}
|
||||
|
@ -6,7 +6,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/plugins/backendplugin"
|
||||
"github.com/grafana/grafana/pkg/plugins/manager/loader"
|
||||
"github.com/grafana/grafana/pkg/plugins/manager/registry"
|
||||
"github.com/grafana/grafana/pkg/plugins/manager/sources"
|
||||
@ -16,6 +15,7 @@ var _ plugins.Store = (*Service)(nil)
|
||||
|
||||
type Service struct {
|
||||
pluginRegistry registry.Service
|
||||
pluginLoader loader.Service
|
||||
}
|
||||
|
||||
func ProvideService(pluginRegistry registry.Service, pluginSources sources.Registry,
|
||||
@ -26,7 +26,7 @@ func ProvideService(pluginRegistry registry.Service, pluginSources sources.Regis
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return New(pluginRegistry), nil
|
||||
return New(pluginRegistry, pluginLoader), nil
|
||||
}
|
||||
|
||||
func (s *Service) Run(ctx context.Context) error {
|
||||
@ -35,9 +35,10 @@ func (s *Service) Run(ctx context.Context) error {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
func New(pluginRegistry registry.Service) *Service {
|
||||
func New(pluginRegistry registry.Service, pluginLoader loader.Service) *Service {
|
||||
return &Service{
|
||||
pluginRegistry: pluginRegistry,
|
||||
pluginLoader: pluginLoader,
|
||||
}
|
||||
}
|
||||
|
||||
@ -131,16 +132,16 @@ func (s *Service) Routes(ctx context.Context) []*plugins.StaticRoute {
|
||||
|
||||
func (s *Service) shutdown(ctx context.Context) {
|
||||
var wg sync.WaitGroup
|
||||
for _, p := range s.pluginRegistry.Plugins(ctx) {
|
||||
for _, plugin := range s.pluginRegistry.Plugins(ctx) {
|
||||
wg.Add(1)
|
||||
go func(p backendplugin.Plugin, ctx context.Context) {
|
||||
go func(ctx context.Context, p *plugins.Plugin) {
|
||||
defer wg.Done()
|
||||
p.Logger().Debug("Stopping plugin")
|
||||
if err := p.Stop(ctx); err != nil {
|
||||
if _, err := s.pluginLoader.Unload(ctx, p); err != nil {
|
||||
p.Logger().Error("Failed to stop plugin", "error", err)
|
||||
}
|
||||
p.Logger().Debug("Plugin stopped")
|
||||
}(p, ctx)
|
||||
}(ctx, plugin)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
@ -61,7 +61,7 @@ func TestStore_Plugin(t *testing.T) {
|
||||
p1.ID: p1,
|
||||
p2.ID: p2,
|
||||
},
|
||||
})
|
||||
}, &fakes.FakeLoader{})
|
||||
|
||||
p, exists := ps.Plugin(context.Background(), p1.ID)
|
||||
require.False(t, exists)
|
||||
@ -90,7 +90,7 @@ func TestStore_Plugins(t *testing.T) {
|
||||
p4.ID: p4,
|
||||
p5.ID: p5,
|
||||
},
|
||||
})
|
||||
}, &fakes.FakeLoader{})
|
||||
|
||||
pss := ps.Plugins(context.Background())
|
||||
require.Equal(t, pss, []plugins.PluginDTO{p1.ToDTO(), p2.ToDTO(), p3.ToDTO(), p4.ToDTO()})
|
||||
@ -128,7 +128,7 @@ func TestStore_Routes(t *testing.T) {
|
||||
p5.ID: p5,
|
||||
p6.ID: p6,
|
||||
},
|
||||
})
|
||||
}, &fakes.FakeLoader{})
|
||||
|
||||
sr := func(p *plugins.Plugin) *plugins.StaticRoute {
|
||||
return &plugins.StaticRoute{PluginID: p.ID, Directory: p.FS.Base()}
|
||||
@ -151,7 +151,7 @@ func TestStore_Renderer(t *testing.T) {
|
||||
p2.ID: p2,
|
||||
p3.ID: p3,
|
||||
},
|
||||
})
|
||||
}, &fakes.FakeLoader{})
|
||||
|
||||
r := ps.Renderer(context.Background())
|
||||
require.Equal(t, p1, r)
|
||||
@ -172,7 +172,7 @@ func TestStore_SecretsManager(t *testing.T) {
|
||||
p3.ID: p3,
|
||||
p4.ID: p4,
|
||||
},
|
||||
})
|
||||
}, &fakes.FakeLoader{})
|
||||
|
||||
r := ps.SecretsManager(context.Background())
|
||||
require.Equal(t, p3, r)
|
||||
@ -185,10 +185,17 @@ func TestProcessManager_shutdown(t *testing.T) {
|
||||
p.RegisterClient(backend)
|
||||
p.SetLogger(log.NewTestLogger())
|
||||
|
||||
unloaded := false
|
||||
ps := New(&fakes.FakePluginRegistry{
|
||||
Store: map[string]*plugins.Plugin{
|
||||
p.ID: p,
|
||||
},
|
||||
}, &fakes.FakeLoader{
|
||||
UnloadFunc: func(_ context.Context, plugin *plugins.Plugin) (*plugins.Plugin, error) {
|
||||
require.Equal(t, p, plugin)
|
||||
unloaded = true
|
||||
return nil, nil
|
||||
},
|
||||
})
|
||||
|
||||
pCtx := context.Background()
|
||||
@ -205,8 +212,7 @@ func TestProcessManager_shutdown(t *testing.T) {
|
||||
cancel()
|
||||
wgRun.Wait()
|
||||
require.ErrorIs(t, runErr, context.Canceled)
|
||||
require.True(t, p.Exited())
|
||||
require.Equal(t, 1, backend.StopCount)
|
||||
require.True(t, unloaded)
|
||||
})
|
||||
}
|
||||
|
||||
@ -221,7 +227,7 @@ func TestStore_availablePlugins(t *testing.T) {
|
||||
p1.ID: p1,
|
||||
p2.ID: p2,
|
||||
},
|
||||
})
|
||||
}, &fakes.FakeLoader{})
|
||||
|
||||
aps := ps.availablePlugins(context.Background())
|
||||
require.Len(t, aps, 1)
|
||||
|
@ -30,6 +30,6 @@ func (l *Loader) Load(ctx context.Context, src plugins.PluginSource) ([]*plugins
|
||||
return l.loader.Load(ctx, src)
|
||||
}
|
||||
|
||||
func (l *Loader) Unload(ctx context.Context, pluginID string) error {
|
||||
return l.loader.Unload(ctx, pluginID)
|
||||
func (l *Loader) Unload(ctx context.Context, p *plugins.Plugin) (*plugins.Plugin, error) {
|
||||
return l.loader.Unload(ctx, p)
|
||||
}
|
||||
|
@ -69,11 +69,9 @@ func ProvideInitializationStage(cfg *config.Cfg, pr registry.Service, l plugins.
|
||||
|
||||
func ProvideTerminationStage(cfg *config.Cfg, pr registry.Service, pm process.Manager) (*termination.Terminate, error) {
|
||||
return termination.New(cfg, termination.Opts{
|
||||
ResolveFunc: termination.TerminablePluginResolverStep(pr),
|
||||
TerminateFuncs: []termination.TerminateFunc{
|
||||
termination.BackendProcessTerminatorStep(pm),
|
||||
termination.DeregisterStep(pr),
|
||||
termination.FSRemoval,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
Reference in New Issue
Block a user