diff --git a/pkg/api/api.go b/pkg/api/api.go index ce9ac95fbe2..4416c097015 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -265,10 +265,10 @@ func (hs *HTTPServer) registerRoutes() { apiRoute.Group("/datasources", func(datasourceRoute routing.RouteRegister) { datasourceRoute.Get("/", routing.Wrap(hs.GetDataSources)) datasourceRoute.Post("/", quota("data_source"), bind(models.AddDataSourceCommand{}), routing.Wrap(AddDataSource)) - datasourceRoute.Put("/:id", bind(models.UpdateDataSourceCommand{}), routing.Wrap(UpdateDataSource)) - datasourceRoute.Delete("/:id", routing.Wrap(DeleteDataSourceById)) - datasourceRoute.Delete("/uid/:uid", routing.Wrap(DeleteDataSourceByUID)) - datasourceRoute.Delete("/name/:name", routing.Wrap(DeleteDataSourceByName)) + datasourceRoute.Put("/:id", bind(models.UpdateDataSourceCommand{}), routing.Wrap(hs.UpdateDataSource)) + datasourceRoute.Delete("/:id", routing.Wrap(hs.DeleteDataSourceById)) + datasourceRoute.Delete("/uid/:uid", routing.Wrap(hs.DeleteDataSourceByUID)) + datasourceRoute.Delete("/name/:name", routing.Wrap(hs.DeleteDataSourceByName)) datasourceRoute.Get("/:id", routing.Wrap(GetDataSourceById)) datasourceRoute.Get("/uid/:uid", routing.Wrap(GetDataSourceByUID)) datasourceRoute.Get("/name/:name", routing.Wrap(GetDataSourceByName)) diff --git a/pkg/api/datasources.go b/pkg/api/datasources.go index e1da1718d30..d3ed473f28a 100644 --- a/pkg/api/datasources.go +++ b/pkg/api/datasources.go @@ -6,7 +6,6 @@ import ( "fmt" "sort" - "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana/pkg/api/datasource" "github.com/grafana/grafana/pkg/api/dtos" "github.com/grafana/grafana/pkg/api/response" @@ -15,6 +14,8 @@ import ( "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/plugins/adapters" "github.com/grafana/grafana/pkg/util" + + "github.com/grafana/grafana-plugin-sdk-go/backend" ) var datasourcesLogger = log.New("datasources") @@ -83,7 +84,7 @@ func GetDataSourceById(c *models.ReqContext) response.Response { return response.JSON(200, &dtos) } -func DeleteDataSourceById(c *models.ReqContext) response.Response { +func (hs *HTTPServer) DeleteDataSourceById(c *models.ReqContext) response.Response { id := c.ParamsInt64(":id") if id <= 0 { @@ -109,6 +110,8 @@ func DeleteDataSourceById(c *models.ReqContext) response.Response { return response.Error(500, "Failed to delete datasource", err) } + hs.Live.HandleDatasourceDelete(c.OrgId, ds.Uid) + return response.Success("Data source deleted") } @@ -128,7 +131,7 @@ func GetDataSourceByUID(c *models.ReqContext) response.Response { } // DELETE /api/datasources/uid/:uid -func DeleteDataSourceByUID(c *models.ReqContext) response.Response { +func (hs *HTTPServer) DeleteDataSourceByUID(c *models.ReqContext) response.Response { uid := c.Params(":uid") if uid == "" { @@ -154,10 +157,12 @@ func DeleteDataSourceByUID(c *models.ReqContext) response.Response { return response.Error(500, "Failed to delete datasource", err) } + hs.Live.HandleDatasourceDelete(c.OrgId, ds.Uid) + return response.Success("Data source deleted") } -func DeleteDataSourceByName(c *models.ReqContext) response.Response { +func (hs *HTTPServer) DeleteDataSourceByName(c *models.ReqContext) response.Response { name := c.Params(":name") if name == "" { @@ -182,6 +187,8 @@ func DeleteDataSourceByName(c *models.ReqContext) response.Response { return response.Error(500, "Failed to delete datasource", err) } + hs.Live.HandleDatasourceDelete(c.OrgId, getCmd.Result.Uid) + return response.JSON(200, util.DynMap{ "message": "Data source deleted", "id": getCmd.Result.Id, @@ -224,7 +231,7 @@ func AddDataSource(c *models.ReqContext, cmd models.AddDataSourceCommand) respon }) } -func UpdateDataSource(c *models.ReqContext, cmd models.UpdateDataSourceCommand) response.Response { +func (hs *HTTPServer) UpdateDataSource(c *models.ReqContext, cmd models.UpdateDataSourceCommand) response.Response { datasourcesLogger.Debug("Received command to update data source", "url", cmd.Url) cmd.OrgId = c.OrgId cmd.Id = c.ParamsInt64(":id") @@ -254,16 +261,18 @@ func UpdateDataSource(c *models.ReqContext, cmd models.UpdateDataSourceCommand) if errors.Is(err, models.ErrDataSourceNotFound) { return response.Error(404, "Data source not found", nil) } - return response.Error(500, "Failed to query datasources", err) + return response.Error(500, "Failed to query datasource", err) } - dtos := convertModelToDtos(query.Result) + datasourceDTO := convertModelToDtos(query.Result) + + hs.Live.HandleDatasourceUpdate(c.OrgId, datasourceDTO.UID) return response.JSON(200, util.DynMap{ "message": "Datasource updated", "id": cmd.Id, "name": cmd.Name, - "datasource": dtos, + "datasource": datasourceDTO, }) } diff --git a/pkg/api/datasources_test.go b/pkg/api/datasources_test.go index 3348857d488..a4b8311c71a 100644 --- a/pkg/api/datasources_test.go +++ b/pkg/api/datasources_test.go @@ -55,7 +55,13 @@ func TestDataSourcesProxy_userLoggedIn(t *testing.T) { loggedInUserScenario(t, "Should be able to save a data source when calling DELETE on non-existing", "/api/datasources/name/12345", func(sc *scenarioContext) { - sc.handlerFunc = DeleteDataSourceByName + // handler func being tested + hs := &HTTPServer{ + Bus: bus.GetBus(), + Cfg: setting.NewCfg(), + PluginManager: &fakePluginManager{}, + } + sc.handlerFunc = hs.DeleteDataSourceByName sc.fakeReqWithParams("DELETE", sc.url, map[string]string{}).exec() assert.Equal(t, 404, sc.resp.Code) }) diff --git a/pkg/api/plugins.go b/pkg/api/plugins.go index 76406c6833f..31f1cbd2c11 100644 --- a/pkg/api/plugins.go +++ b/pkg/api/plugins.go @@ -313,7 +313,7 @@ func (hs *HTTPServer) GetPluginAssets(c *models.ReqContext) { func (hs *HTTPServer) CheckHealth(c *models.ReqContext) response.Response { pluginID := c.Params("pluginId") - pCtx, found, err := hs.PluginContextProvider.Get(pluginID, "", c.SignedInUser) + pCtx, found, err := hs.PluginContextProvider.Get(pluginID, "", c.SignedInUser, false) if err != nil { return response.Error(500, "Failed to get plugin settings", err) } @@ -355,7 +355,7 @@ func (hs *HTTPServer) CheckHealth(c *models.ReqContext) response.Response { func (hs *HTTPServer) CallResource(c *models.ReqContext) { pluginID := c.Params("pluginId") - pCtx, found, err := hs.PluginContextProvider.Get(pluginID, "", c.SignedInUser) + pCtx, found, err := hs.PluginContextProvider.Get(pluginID, "", c.SignedInUser, false) if err != nil { c.JsonApiErr(500, "Failed to get plugin settings", err) return diff --git a/pkg/plugins/adapters/adapters.go b/pkg/plugins/adapters/adapters.go index f3ff23d89c5..e91e0dedac0 100644 --- a/pkg/plugins/adapters/adapters.go +++ b/pkg/plugins/adapters/adapters.go @@ -17,6 +17,7 @@ func ModelToInstanceSettings(ds *models.DataSource) (*backend.DataSourceInstance ID: ds.Id, Name: ds.Name, URL: ds.Url, + UID: ds.Uid, Database: ds.Database, User: ds.User, BasicAuthEnabled: ds.BasicAuth, diff --git a/pkg/plugins/backendplugin/grpcplugin/client_v2.go b/pkg/plugins/backendplugin/grpcplugin/client_v2.go index 172d6a2a01b..ba70c84c0d1 100644 --- a/pkg/plugins/backendplugin/grpcplugin/client_v2.go +++ b/pkg/plugins/backendplugin/grpcplugin/client_v2.go @@ -3,6 +3,7 @@ package grpcplugin import ( "context" "errors" + "fmt" "io" "github.com/grafana/grafana-plugin-sdk-go/backend" @@ -216,7 +217,7 @@ func (c *clientV2) RunStream(ctx context.Context, req *backend.RunStreamRequest, if errors.Is(err, io.EOF) { return nil } - return errutil.Wrap("failed to receive call resource response", err) + return fmt.Errorf("error running stream: %w", err) } if err := sender.Send(backend.FromProto().StreamPacket(protoResp)); err != nil { return err diff --git a/pkg/plugins/plugincontext/plugincontext.go b/pkg/plugins/plugincontext/plugincontext.go index cf2c2857b99..0a284af147e 100644 --- a/pkg/plugins/plugincontext/plugincontext.go +++ b/pkg/plugins/plugincontext/plugincontext.go @@ -42,7 +42,7 @@ func (p *Provider) Init() error { // Get allows getting plugin context by its id. If datasourceUID is not empty string // then PluginContext.DataSourceInstanceSettings will be resolved and appended to // returned context. -func (p *Provider) Get(pluginID string, datasourceUID string, user *models.SignedInUser) (backend.PluginContext, bool, error) { +func (p *Provider) Get(pluginID string, datasourceUID string, user *models.SignedInUser, skipCache bool) (backend.PluginContext, bool, error) { pc := backend.PluginContext{} plugin := p.PluginManager.GetPlugin(pluginID) if plugin == nil { @@ -81,7 +81,7 @@ func (p *Provider) Get(pluginID string, datasourceUID string, user *models.Signe } if datasourceUID != "" { - ds, err := p.DatasourceCache.GetDatasourceByUID(datasourceUID, user, false) + ds, err := p.DatasourceCache.GetDatasourceByUID(datasourceUID, user, skipCache) if err != nil { return pc, false, errutil.Wrap("Failed to get datasource", err) } diff --git a/pkg/services/live/features/plugin.go b/pkg/services/live/features/plugin.go index 5206c73ded9..5dc260904b5 100644 --- a/pkg/services/live/features/plugin.go +++ b/pkg/services/live/features/plugin.go @@ -3,17 +3,18 @@ package features import ( "context" + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/services/live/orgchannel" "github.com/grafana/grafana/pkg/services/live/runstream" "github.com/centrifugal/centrifuge" "github.com/grafana/grafana-plugin-sdk-go/backend" - "github.com/grafana/grafana/pkg/models" ) //go:generate mockgen -destination=plugin_mock.go -package=features github.com/grafana/grafana/pkg/services/live/features PluginContextGetter type PluginContextGetter interface { - GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string) (backend.PluginContext, bool, error) + GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) } // PluginRunner can handle streaming operations for channels belonging to plugins. @@ -60,7 +61,7 @@ type PluginPathRunner struct { // OnSubscribe passes control to a plugin. func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) { - pCtx, found, err := r.pluginContextGetter.GetPluginContext(user, r.pluginID, r.datasourceUID) + pCtx, found, err := r.pluginContextGetter.GetPluginContext(user, r.pluginID, r.datasourceUID, false) if err != nil { logger.Error("Get plugin context error", "error", err, "path", r.path) return models.SubscribeReply{}, 0, err @@ -81,7 +82,7 @@ func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *models.SignedI return models.SubscribeReply{}, resp.Status, nil } - submitResult, err := r.runStreamManager.SubmitStream(ctx, user.OrgId, e.Channel, r.path, pCtx, r.handler) + submitResult, err := r.runStreamManager.SubmitStream(ctx, user, orgchannel.PrependOrgID(user.OrgId, e.Channel), r.path, pCtx, r.handler, false) if err != nil { logger.Error("Error submitting stream to manager", "error", err, "path", r.path) return models.SubscribeReply{}, 0, centrifuge.ErrorInternal @@ -89,7 +90,7 @@ func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *models.SignedI if submitResult.StreamExists { logger.Debug("Skip running new stream (already exists)", "path", r.path) } else { - logger.Debug("Running a new keepalive stream", "path", r.path) + logger.Debug("Running a new unidirectional stream", "path", r.path) } reply := models.SubscribeReply{ @@ -101,7 +102,7 @@ func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *models.SignedI // OnPublish passes control to a plugin. func (r *PluginPathRunner) OnPublish(ctx context.Context, user *models.SignedInUser, e models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) { - pCtx, found, err := r.pluginContextGetter.GetPluginContext(user, r.pluginID, r.datasourceUID) + pCtx, found, err := r.pluginContextGetter.GetPluginContext(user, r.pluginID, r.datasourceUID, false) if err != nil { logger.Error("Get plugin context error", "error", err, "path", r.path) return models.PublishReply{}, 0, err diff --git a/pkg/services/live/live.go b/pkg/services/live/live.go index 44d57ff2b7b..e5a163c3f28 100644 --- a/pkg/services/live/live.go +++ b/pkg/services/live/live.go @@ -156,7 +156,7 @@ func (g *GrafanaLive) Init() error { g.contextGetter = newPluginContextGetter(g.PluginContextProvider) packetSender := newPluginPacketSender(node) presenceGetter := newPluginPresenceGetter(node) - g.runStreamManager = runstream.NewManager(packetSender, presenceGetter) + g.runStreamManager = runstream.NewManager(packetSender, presenceGetter, g.contextGetter) // Initialize the main features dash := &features.DashboardHandler{ @@ -283,6 +283,26 @@ func runConcurrentlyIfNeeded(ctx context.Context, semaphore chan struct{}, fn fu return nil } +func (g *GrafanaLive) HandleDatasourceDelete(orgID int64, dsUID string) { + if g.runStreamManager == nil { + return + } + err := g.runStreamManager.HandleDatasourceDelete(orgID, dsUID) + if err != nil { + logger.Error("Error handling datasource delete", "error", err) + } +} + +func (g *GrafanaLive) HandleDatasourceUpdate(orgID int64, dsUID string) { + if g.runStreamManager == nil { + return + } + err := g.runStreamManager.HandleDatasourceUpdate(orgID, dsUID) + if err != nil { + logger.Error("Error handling datasource update", "error", err) + } +} + func (g *GrafanaLive) handleOnSubscribe(client *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) { logger.Debug("Client wants to subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel) diff --git a/pkg/services/live/plugin_helpers.go b/pkg/services/live/plugin_helpers.go index 69a23b79bb3..223d8877f6d 100644 --- a/pkg/services/live/plugin_helpers.go +++ b/pkg/services/live/plugin_helpers.go @@ -52,6 +52,6 @@ func newPluginContextGetter(pluginContextProvider *plugincontext.Provider) *plug } } -func (g *pluginContextGetter) GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string) (backend.PluginContext, bool, error) { - return g.PluginContextProvider.Get(pluginID, datasourceUID, user) +func (g *pluginContextGetter) GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) { + return g.PluginContextProvider.Get(pluginID, datasourceUID, user, skipCache) } diff --git a/pkg/services/live/runstream/manager.go b/pkg/services/live/runstream/manager.go index ef4ed98f208..e0959521f46 100644 --- a/pkg/services/live/runstream/manager.go +++ b/pkg/services/live/runstream/manager.go @@ -3,12 +3,13 @@ package runstream import ( "context" "errors" + "fmt" "math" "sync" "time" "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/services/live/orgchannel" + "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana-plugin-sdk-go/backend" ) @@ -17,12 +18,16 @@ var ( logger = log.New("live.runstream") ) -//go:generate mockgen -destination=mock.go -package=runstream github.com/grafana/grafana/pkg/services/live/runstream StreamPacketSender,PresenceGetter,StreamRunner +//go:generate mockgen -destination=mock.go -package=runstream github.com/grafana/grafana/pkg/services/live/runstream StreamPacketSender,PresenceGetter,StreamRunner,PluginContextGetter type StreamPacketSender interface { Send(channel string, packet *backend.StreamPacket) error } +type PluginContextGetter interface { + GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) +} + type PresenceGetter interface { GetNumSubscribers(channel string) (int, error) } @@ -49,14 +54,18 @@ func (p *streamSender) Send(packet *backend.StreamPacket) error { // Manager manages streams from Grafana to plugins (i.e. RunStream method). type Manager struct { - mu sync.RWMutex - streams map[string]chan struct{} - presenceGetter PresenceGetter - packetSender StreamPacketSender - registerCh chan submitRequest - closedCh chan struct{} - checkInterval time.Duration - maxChecks int + mu sync.RWMutex + baseCtx context.Context + streams map[string]streamContext + datasourceStreams map[string]map[string]struct{} + presenceGetter PresenceGetter + pluginContextGetter PluginContextGetter + packetSender StreamPacketSender + registerCh chan submitRequest + closedCh chan struct{} + checkInterval time.Duration + maxChecks int + datasourceCheckInterval time.Duration } // ManagerOption modifies Manager behavior (used for tests for example). @@ -71,20 +80,24 @@ func WithCheckConfig(interval time.Duration, maxChecks int) ManagerOption { } const ( - defaultCheckInterval = 5 * time.Second - defaultMaxChecks = 3 + defaultCheckInterval = 5 * time.Second + defaultDatasourceCheckInterval = 60 * time.Second + defaultMaxChecks = 3 ) // NewManager creates new Manager. -func NewManager(packetSender StreamPacketSender, presenceGetter PresenceGetter, opts ...ManagerOption) *Manager { +func NewManager(packetSender StreamPacketSender, presenceGetter PresenceGetter, pluginContextGetter PluginContextGetter, opts ...ManagerOption) *Manager { sm := &Manager{ - streams: make(map[string]chan struct{}), - packetSender: packetSender, - presenceGetter: presenceGetter, - registerCh: make(chan submitRequest), - closedCh: make(chan struct{}), - checkInterval: defaultCheckInterval, - maxChecks: defaultMaxChecks, + streams: make(map[string]streamContext), + datasourceStreams: map[string]map[string]struct{}{}, + packetSender: packetSender, + presenceGetter: presenceGetter, + pluginContextGetter: pluginContextGetter, + registerCh: make(chan submitRequest), + closedCh: make(chan struct{}), + checkInterval: defaultCheckInterval, + maxChecks: defaultMaxChecks, + datasourceCheckInterval: defaultDatasourceCheckInterval, } for _, opt := range opts { opt(sm) @@ -92,25 +105,109 @@ func NewManager(packetSender StreamPacketSender, presenceGetter PresenceGetter, return sm } +func (s *Manager) HandleDatasourceDelete(orgID int64, dsUID string) error { + return s.handleDatasourceEvent(orgID, dsUID, false) +} + +func (s *Manager) HandleDatasourceUpdate(orgID int64, dsUID string) error { + return s.handleDatasourceEvent(orgID, dsUID, true) +} + +func (s *Manager) handleDatasourceEvent(orgID int64, dsUID string, resubmit bool) error { + dsKey := datasourceKey(orgID, dsUID) + s.mu.RLock() + dsStreams, ok := s.datasourceStreams[dsKey] + if !ok { + s.mu.RUnlock() + return nil + } + var resubmitRequests []streamRequest + var waitChannels []chan struct{} + for channel := range dsStreams { + streamCtx, ok := s.streams[channel] + if !ok { + continue + } + streamCtx.cancelFn() + waitChannels = append(waitChannels, streamCtx.CloseCh) + resubmitRequests = append(resubmitRequests, streamCtx.streamRequest) + } + s.mu.RUnlock() + + // Wait for all streams to stop. + for _, ch := range waitChannels { + <-ch + } + + if resubmit { + // Re-submit streams. + for _, sr := range resubmitRequests { + _, err := s.SubmitStream(s.baseCtx, sr.user, sr.Channel, sr.Path, sr.PluginContext, sr.StreamRunner, true) + if err != nil { + // Log error but do not prevent execution of caller routine. + logger.Error("Error re-submitting stream", "path", sr.Path, "error", err) + } + } + } + + return nil +} + +func datasourceKey(orgID int64, dsUID string) string { + return fmt.Sprintf("%d_%s", orgID, dsUID) +} + func (s *Manager) stopStream(sr streamRequest, cancelFn func()) { s.mu.Lock() defer s.mu.Unlock() - closeCh, ok := s.streams[sr.Channel] + streamCtx, ok := s.streams[sr.Channel] if !ok { return } + closeCh := streamCtx.CloseCh delete(s.streams, sr.Channel) + if sr.PluginContext.DataSourceInstanceSettings != nil { + dsUID := sr.PluginContext.DataSourceInstanceSettings.UID + dsKey := datasourceKey(sr.PluginContext.OrgID, dsUID) + delete(s.datasourceStreams[dsKey], sr.Channel) + } cancelFn() close(closeCh) } func (s *Manager) watchStream(ctx context.Context, cancelFn func(), sr streamRequest) { numNoSubscribersChecks := 0 + presenceTicker := time.NewTicker(s.checkInterval) + defer presenceTicker.Stop() + datasourceTicker := time.NewTicker(s.datasourceCheckInterval) + defer datasourceTicker.Stop() for { select { case <-ctx.Done(): return - case <-time.After(s.checkInterval): + case <-datasourceTicker.C: + if sr.PluginContext.DataSourceInstanceSettings != nil { + dsUID := sr.PluginContext.DataSourceInstanceSettings.UID + pCtx, ok, err := s.pluginContextGetter.GetPluginContext(sr.user, sr.PluginContext.PluginID, dsUID, false) + if err != nil { + logger.Error("Error getting datasource context", "channel", sr.Channel, "path", sr.Path, "error", err) + continue + } + if !ok { + logger.Debug("Datasource not found, stop stream", "channel", sr.Channel, "path", sr.Path) + return + } + if pCtx.DataSourceInstanceSettings.Updated != sr.PluginContext.DataSourceInstanceSettings.Updated { + logger.Debug("Datasource changed, re-establish stream", "channel", sr.Channel, "path", sr.Path) + err := s.HandleDatasourceUpdate(pCtx.OrgID, dsUID) + if err != nil { + logger.Error("Error re-establishing stream", "channel", sr.Channel, "path", sr.Path, "error", err) + continue + } + return + } + } + case <-presenceTicker.C: numSubscribers, err := s.presenceGetter.GetNumSubscribers(sr.Channel) if err != nil { logger.Error("Error checking num subscribers", "channel", sr.Channel, "path", sr.Path) @@ -148,31 +245,24 @@ func getDelay(numErrors int) time.Duration { // run stream until context canceled or stream finished without an error. func (s *Manager) runStream(ctx context.Context, cancelFn func(), sr streamRequest) { + defer func() { s.stopStream(sr, cancelFn) }() var numFastErrors int var delay time.Duration + var isReconnect bool + startTime := time.Now() for { select { case <-ctx.Done(): return default: } - startTime := time.Now() - err := sr.StreamRunner.RunStream( - ctx, - &backend.RunStreamRequest{ - PluginContext: sr.PluginContext, - Path: sr.Path, - }, - newStreamSender(sr.Channel, s.packetSender), - ) - if err != nil { - if errors.Is(ctx.Err(), context.Canceled) { - logger.Debug("Stream cleanly finished", "path", sr.Path) - return - } + + pluginCtx := sr.PluginContext + + if isReconnect { // Best effort to cool down re-establishment process. We don't have a // nice way to understand whether we really need to wait here - so relying - // on RunStream duration time. + // on duration time of running a stream. if time.Since(startTime) < streamDurationThreshold { if delay < maxDelay { // Due to not calling getDelay after we have delay larger than maxDelay @@ -186,29 +276,85 @@ func (s *Manager) runStream(ctx context.Context, cancelFn func(), sr streamReque delay = 0 numFastErrors = 0 } + select { + case <-ctx.Done(): + return + case <-time.After(delay): + } + startTime = time.Now() + + // Resolve new plugin context as it could be modified since last call. + // We are using the same user here which initiated stream originally. + var datasourceUID string + if pluginCtx.DataSourceInstanceSettings != nil { + datasourceUID = pluginCtx.DataSourceInstanceSettings.UID + } + newPluginCtx, ok, err := s.pluginContextGetter.GetPluginContext(sr.user, pluginCtx.PluginID, datasourceUID, false) + if err != nil { + logger.Error("Error getting plugin context", "path", sr.Path, "error", err) + isReconnect = true + continue + } + if !ok { + logger.Info("No plugin context found, stopping stream", "path", sr.Path) + return + } + pluginCtx = newPluginCtx + } + + err := sr.StreamRunner.RunStream( + ctx, + &backend.RunStreamRequest{ + PluginContext: pluginCtx, + Path: sr.Path, + }, + newStreamSender(sr.Channel, s.packetSender), + ) + if err != nil { + if errors.Is(ctx.Err(), context.Canceled) { + logger.Debug("Stream cleanly finished", "path", sr.Path) + return + } logger.Error("Error running stream, re-establishing", "path", sr.Path, "error", err, "wait", delay) - time.Sleep(delay) + isReconnect = true continue } logger.Debug("Stream finished without error, stopping it", "path", sr.Path) - s.stopStream(sr, cancelFn) return } } var errClosed = errors.New("stream manager closed") +type streamContext struct { + CloseCh chan struct{} + cancelFn func() + streamRequest streamRequest +} + func (s *Manager) registerStream(ctx context.Context, sr submitRequest) { s.mu.Lock() - if closeCh, ok := s.streams[sr.streamRequest.Channel]; ok { + if streamCtx, ok := s.streams[sr.streamRequest.Channel]; ok { s.mu.Unlock() - sr.responseCh <- submitResponse{Result: submitResult{StreamExists: true, CloseNotify: closeCh}} + sr.responseCh <- submitResponse{Result: submitResult{StreamExists: true, CloseNotify: streamCtx.CloseCh}} return } ctx, cancel := context.WithCancel(ctx) defer cancel() closeCh := make(chan struct{}) - s.streams[sr.streamRequest.Channel] = closeCh + s.streams[sr.streamRequest.Channel] = streamContext{ + CloseCh: closeCh, + cancelFn: cancel, + streamRequest: sr.streamRequest, + } + if sr.streamRequest.PluginContext.DataSourceInstanceSettings != nil { + dsUID := sr.streamRequest.PluginContext.DataSourceInstanceSettings.UID + dsKey := datasourceKey(sr.streamRequest.PluginContext.OrgID, dsUID) + if _, ok := s.datasourceStreams[dsKey]; !ok { + s.datasourceStreams[dsKey] = map[string]struct{}{} + } + s.datasourceStreams[dsKey][sr.streamRequest.Channel] = struct{}{} + } s.mu.Unlock() sr.responseCh <- submitResponse{Result: submitResult{StreamExists: false, CloseNotify: closeCh}} go s.watchStream(ctx, cancel, sr.streamRequest) @@ -217,6 +363,7 @@ func (s *Manager) registerStream(ctx context.Context, sr submitRequest) { // Run Manager till context canceled. func (s *Manager) Run(ctx context.Context) error { + s.baseCtx = ctx for { select { case sr := <-s.registerCh: @@ -231,6 +378,7 @@ func (s *Manager) Run(ctx context.Context) error { type streamRequest struct { Channel string Path string + user *models.SignedInUser PluginContext backend.PluginContext StreamRunner StreamRunner } @@ -252,13 +400,32 @@ type submitResponse struct { Result submitResult } +var errDatasourceNotFound = errors.New("datasource not found") + // SubmitStream submits stream handler in Manager to manage. // The stream will be opened and kept till channel has active subscribers. -func (s *Manager) SubmitStream(ctx context.Context, orgID int64, channel string, path string, pCtx backend.PluginContext, streamRunner StreamRunner) (*submitResult, error) { +func (s *Manager) SubmitStream(ctx context.Context, user *models.SignedInUser, channel string, path string, pCtx backend.PluginContext, streamRunner StreamRunner, isResubmit bool) (*submitResult, error) { + if isResubmit { + // Resolve new plugin context as it could be modified since last call. + var datasourceUID string + if pCtx.DataSourceInstanceSettings != nil { + datasourceUID = pCtx.DataSourceInstanceSettings.UID + } + newPluginCtx, ok, err := s.pluginContextGetter.GetPluginContext(user, pCtx.PluginID, datasourceUID, false) + if err != nil { + return nil, err + } + if !ok { + return nil, errDatasourceNotFound + } + pCtx = newPluginCtx + } + req := submitRequest{ responseCh: make(chan submitResponse, 1), streamRequest: streamRequest{ - Channel: orgchannel.PrependOrgID(orgID, channel), + user: user, + Channel: channel, Path: path, PluginContext: pCtx, StreamRunner: streamRunner, diff --git a/pkg/services/live/runstream/manager_test.go b/pkg/services/live/runstream/manager_test.go index 7ffe1319dbe..f8ac522ca7e 100644 --- a/pkg/services/live/runstream/manager_test.go +++ b/pkg/services/live/runstream/manager_test.go @@ -6,14 +6,16 @@ import ( "testing" "time" - "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/models" "github.com/golang/mock/gomock" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/stretchr/testify/require" ) // wait until channel closed with timeout. func waitWithTimeout(tb testing.TB, ch chan struct{}, timeout time.Duration) { + tb.Helper() select { case <-ch: case <-time.After(timeout): @@ -27,8 +29,9 @@ func TestStreamManager_Run(t *testing.T) { mockPacketSender := NewMockStreamPacketSender(mockCtrl) mockPresenceGetter := NewMockPresenceGetter(mockCtrl) + mockContextGetter := NewMockPluginContextGetter(mockCtrl) - manager := NewManager(mockPacketSender, mockPresenceGetter) + manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -47,8 +50,9 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) { mockPacketSender := NewMockStreamPacketSender(mockCtrl) mockPresenceGetter := NewMockPresenceGetter(mockCtrl) + mockContextGetter := NewMockPluginContextGetter(mockCtrl) - manager := NewManager(mockPacketSender, mockPresenceGetter) + manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -59,6 +63,22 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) { startedCh := make(chan struct{}) doneCh := make(chan struct{}) + testPluginContext := backend.PluginContext{ + OrgID: 1, + PluginID: "test-plugin", + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{ + UID: "xyz", + }, + } + + mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) { + require.Equal(t, int64(2), user.UserId) + require.Equal(t, int64(1), user.OrgId) + require.Equal(t, testPluginContext.PluginID, pluginID) + require.Equal(t, testPluginContext.DataSourceInstanceSettings.UID, datasourceUID) + return testPluginContext, true, nil + }).Times(0) + mockPacketSender.EXPECT().Send("1/test", gomock.Any()).Times(1) mockStreamRunner := NewMockStreamRunner(mockCtrl) @@ -76,12 +96,12 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) { return ctx.Err() }).Times(1) - result, err := manager.SubmitStream(context.Background(), 1, "test", "test", backend.PluginContext{}, mockStreamRunner) + result, err := manager.SubmitStream(context.Background(), &models.SignedInUser{UserId: 2, OrgId: 1}, "1/test", "test", testPluginContext, mockStreamRunner, false) require.NoError(t, err) require.False(t, result.StreamExists) // try submit the same. - result, err = manager.SubmitStream(context.Background(), 1, "test", "test", backend.PluginContext{}, mockStreamRunner) + result, err = manager.SubmitStream(context.Background(), &models.SignedInUser{UserId: 2, OrgId: 1}, "1/test", "test", backend.PluginContext{}, mockStreamRunner, false) require.NoError(t, err) require.True(t, result.StreamExists) @@ -97,8 +117,9 @@ func TestStreamManager_SubmitStream_DifferentOrgID(t *testing.T) { mockPacketSender := NewMockStreamPacketSender(mockCtrl) mockPresenceGetter := NewMockPresenceGetter(mockCtrl) + mockContextGetter := NewMockPluginContextGetter(mockCtrl) - manager := NewManager(mockPacketSender, mockPresenceGetter) + manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -114,6 +135,10 @@ func TestStreamManager_SubmitStream_DifferentOrgID(t *testing.T) { mockPacketSender.EXPECT().Send("1/test", gomock.Any()).Times(1) mockPacketSender.EXPECT().Send("2/test", gomock.Any()).Times(1) + mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) { + return backend.PluginContext{}, true, nil + }).Times(0) + mockStreamRunner1 := NewMockStreamRunner(mockCtrl) mockStreamRunner1.EXPECT().RunStream( gomock.Any(), gomock.Any(), gomock.Any(), @@ -144,12 +169,12 @@ func TestStreamManager_SubmitStream_DifferentOrgID(t *testing.T) { return ctx.Err() }).Times(1) - result, err := manager.SubmitStream(context.Background(), 1, "test", "test", backend.PluginContext{}, mockStreamRunner1) + result, err := manager.SubmitStream(context.Background(), &models.SignedInUser{UserId: 2, OrgId: 1}, "1/test", "test", backend.PluginContext{}, mockStreamRunner1, false) require.NoError(t, err) require.False(t, result.StreamExists) // try submit the same channel but different orgID. - result, err = manager.SubmitStream(context.Background(), 2, "test", "test", backend.PluginContext{}, mockStreamRunner2) + result, err = manager.SubmitStream(context.Background(), &models.SignedInUser{UserId: 2, OrgId: 2}, "2/test", "test", backend.PluginContext{}, mockStreamRunner2, false) require.NoError(t, err) require.False(t, result.StreamExists) @@ -167,11 +192,13 @@ func TestStreamManager_SubmitStream_CloseNoSubscribers(t *testing.T) { mockPacketSender := NewMockStreamPacketSender(mockCtrl) mockPresenceGetter := NewMockPresenceGetter(mockCtrl) + mockContextGetter := NewMockPluginContextGetter(mockCtrl) // Create manager with very fast num subscribers checks. manager := NewManager( mockPacketSender, mockPresenceGetter, + mockContextGetter, WithCheckConfig(10*time.Millisecond, 3), ) @@ -184,6 +211,10 @@ func TestStreamManager_SubmitStream_CloseNoSubscribers(t *testing.T) { startedCh := make(chan struct{}) doneCh := make(chan struct{}) + mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) { + return backend.PluginContext{}, true, nil + }).Times(0) + mockPresenceGetter.EXPECT().GetNumSubscribers("1/test").Return(0, nil).Times(3) mockStreamRunner := NewMockStreamRunner(mockCtrl) @@ -194,7 +225,7 @@ func TestStreamManager_SubmitStream_CloseNoSubscribers(t *testing.T) { return ctx.Err() }).Times(1) - _, err := manager.SubmitStream(context.Background(), 1, "test", "test", backend.PluginContext{}, mockStreamRunner) + _, err := manager.SubmitStream(context.Background(), &models.SignedInUser{UserId: 2, OrgId: 1}, "1/test", "test", backend.PluginContext{}, mockStreamRunner, false) require.NoError(t, err) waitWithTimeout(t, startedCh, time.Second) @@ -208,8 +239,9 @@ func TestStreamManager_SubmitStream_ErrorRestartsRunStream(t *testing.T) { mockPacketSender := NewMockStreamPacketSender(mockCtrl) mockPresenceGetter := NewMockPresenceGetter(mockCtrl) + mockContextGetter := NewMockPluginContextGetter(mockCtrl) - manager := NewManager(mockPacketSender, mockPresenceGetter) + manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -220,6 +252,22 @@ func TestStreamManager_SubmitStream_ErrorRestartsRunStream(t *testing.T) { numErrors := 3 currentErrors := 0 + testPluginContext := backend.PluginContext{ + OrgID: 1, + PluginID: "test-plugin", + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{ + UID: "xyz", + }, + } + + mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) { + require.Equal(t, int64(2), user.UserId) + require.Equal(t, int64(1), user.OrgId) + require.Equal(t, testPluginContext.PluginID, pluginID) + require.Equal(t, testPluginContext.DataSourceInstanceSettings.UID, datasourceUID) + return testPluginContext, true, nil + }).Times(numErrors) + mockStreamRunner := NewMockStreamRunner(mockCtrl) mockStreamRunner.EXPECT().RunStream( gomock.Any(), gomock.Any(), gomock.Any(), @@ -231,7 +279,7 @@ func TestStreamManager_SubmitStream_ErrorRestartsRunStream(t *testing.T) { return errors.New("boom") }).Times(numErrors + 1) - result, err := manager.SubmitStream(context.Background(), 1, "test", "test", backend.PluginContext{}, mockStreamRunner) + result, err := manager.SubmitStream(context.Background(), &models.SignedInUser{UserId: 2, OrgId: 1}, "test", "test", testPluginContext, mockStreamRunner, false) require.NoError(t, err) require.False(t, result.StreamExists) @@ -244,8 +292,9 @@ func TestStreamManager_SubmitStream_NilErrorStopsRunStream(t *testing.T) { mockPacketSender := NewMockStreamPacketSender(mockCtrl) mockPresenceGetter := NewMockPresenceGetter(mockCtrl) + mockContextGetter := NewMockPluginContextGetter(mockCtrl) - manager := NewManager(mockPacketSender, mockPresenceGetter) + manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -253,6 +302,10 @@ func TestStreamManager_SubmitStream_NilErrorStopsRunStream(t *testing.T) { _ = manager.Run(ctx) }() + mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) { + return backend.PluginContext{}, true, nil + }).Times(0) + mockStreamRunner := NewMockStreamRunner(mockCtrl) mockStreamRunner.EXPECT().RunStream( gomock.Any(), gomock.Any(), gomock.Any(), @@ -260,8 +313,127 @@ func TestStreamManager_SubmitStream_NilErrorStopsRunStream(t *testing.T) { return nil }).Times(1) - result, err := manager.SubmitStream(context.Background(), 1, "test", "test", backend.PluginContext{}, mockStreamRunner) + result, err := manager.SubmitStream(context.Background(), &models.SignedInUser{UserId: 2, OrgId: 1}, "test", "test", backend.PluginContext{}, mockStreamRunner, false) require.NoError(t, err) require.False(t, result.StreamExists) waitWithTimeout(t, result.CloseNotify, time.Second) } + +func TestStreamManager_HandleDatasourceUpdate(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mockPacketSender := NewMockStreamPacketSender(mockCtrl) + mockPresenceGetter := NewMockPresenceGetter(mockCtrl) + mockContextGetter := NewMockPluginContextGetter(mockCtrl) + + manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + _ = manager.Run(ctx) + }() + + testPluginContext := backend.PluginContext{ + OrgID: 1, + PluginID: "test-plugin", + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{ + UID: "xyz", + }, + } + + mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) { + require.Equal(t, int64(2), user.UserId) + require.Equal(t, int64(1), user.OrgId) + require.Equal(t, testPluginContext.PluginID, pluginID) + require.Equal(t, testPluginContext.DataSourceInstanceSettings.UID, datasourceUID) + return testPluginContext, true, nil + }).Times(1) + + isFirstCall := true + + doneCh1 := make(chan struct{}) + doneCh2 := make(chan struct{}) + + mockStreamRunner := NewMockStreamRunner(mockCtrl) + mockStreamRunner.EXPECT().RunStream( + gomock.Any(), gomock.Any(), gomock.Any(), + ).DoAndReturn(func(ctx context.Context, req *backend.RunStreamRequest, sender backend.StreamPacketSender) error { + if isFirstCall { + // first RunStream will wait till context done. + isFirstCall = false + close(doneCh1) + <-ctx.Done() + return ctx.Err() + } + // second RunStream finishes immediately since we are waiting for it below. + close(doneCh2) + return nil + }).Times(2) + + result, err := manager.SubmitStream(context.Background(), &models.SignedInUser{UserId: 2, OrgId: 1}, "test", "test", testPluginContext, mockStreamRunner, false) + require.NoError(t, err) + require.False(t, result.StreamExists) + + waitWithTimeout(t, doneCh1, time.Second) + + err = manager.HandleDatasourceUpdate(1, "xyz") + require.NoError(t, err) + + waitWithTimeout(t, result.CloseNotify, time.Second) + waitWithTimeout(t, doneCh2, time.Second) +} + +func TestStreamManager_HandleDatasourceDelete(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mockPacketSender := NewMockStreamPacketSender(mockCtrl) + mockPresenceGetter := NewMockPresenceGetter(mockCtrl) + mockContextGetter := NewMockPluginContextGetter(mockCtrl) + + manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + _ = manager.Run(ctx) + }() + + testPluginContext := backend.PluginContext{ + OrgID: 1, + PluginID: "test-plugin", + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{ + UID: "xyz", + }, + } + + mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) { + require.Equal(t, int64(2), user.UserId) + require.Equal(t, int64(1), user.OrgId) + require.Equal(t, testPluginContext.PluginID, pluginID) + require.Equal(t, testPluginContext.DataSourceInstanceSettings.UID, datasourceUID) + return testPluginContext, true, nil + }).Times(0) + + doneCh := make(chan struct{}) + + mockStreamRunner := NewMockStreamRunner(mockCtrl) + mockStreamRunner.EXPECT().RunStream( + gomock.Any(), gomock.Any(), gomock.Any(), + ).DoAndReturn(func(ctx context.Context, req *backend.RunStreamRequest, sender backend.StreamPacketSender) error { + close(doneCh) + <-ctx.Done() + return ctx.Err() + }).Times(1) + + result, err := manager.SubmitStream(context.Background(), &models.SignedInUser{UserId: 2, OrgId: 1}, "test", "test", testPluginContext, mockStreamRunner, false) + require.NoError(t, err) + require.False(t, result.StreamExists) + + waitWithTimeout(t, doneCh, time.Second) + err = manager.HandleDatasourceDelete(1, "xyz") + require.NoError(t, err) + waitWithTimeout(t, result.CloseNotify, time.Second) +} diff --git a/pkg/services/live/runstream/mock.go b/pkg/services/live/runstream/mock.go index d4d1a2a2092..e3528153bc6 100644 --- a/pkg/services/live/runstream/mock.go +++ b/pkg/services/live/runstream/mock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/grafana/grafana/pkg/services/live/runstream (interfaces: StreamPacketSender,PresenceGetter,StreamRunner) +// Source: github.com/grafana/grafana/pkg/services/live/runstream (interfaces: StreamPacketSender,PresenceGetter,StreamRunner,PluginContextGetter) // Package runstream is a generated GoMock package. package runstream @@ -10,6 +10,7 @@ import ( gomock "github.com/golang/mock/gomock" backend "github.com/grafana/grafana-plugin-sdk-go/backend" + models "github.com/grafana/grafana/pkg/models" ) // MockStreamPacketSender is a mock of StreamPacketSender interface. @@ -123,3 +124,42 @@ func (mr *MockStreamRunnerMockRecorder) RunStream(arg0, arg1, arg2 interface{}) mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunStream", reflect.TypeOf((*MockStreamRunner)(nil).RunStream), arg0, arg1, arg2) } + +// MockPluginContextGetter is a mock of PluginContextGetter interface. +type MockPluginContextGetter struct { + ctrl *gomock.Controller + recorder *MockPluginContextGetterMockRecorder +} + +// MockPluginContextGetterMockRecorder is the mock recorder for MockPluginContextGetter. +type MockPluginContextGetterMockRecorder struct { + mock *MockPluginContextGetter +} + +// NewMockPluginContextGetter creates a new mock instance. +func NewMockPluginContextGetter(ctrl *gomock.Controller) *MockPluginContextGetter { + mock := &MockPluginContextGetter{ctrl: ctrl} + mock.recorder = &MockPluginContextGetterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPluginContextGetter) EXPECT() *MockPluginContextGetterMockRecorder { + return m.recorder +} + +// GetPluginContext mocks base method. +func (m *MockPluginContextGetter) GetPluginContext(arg0 *models.SignedInUser, arg1, arg2 string, arg3 bool) (backend.PluginContext, bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPluginContext", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(backend.PluginContext) + ret1, _ := ret[1].(bool) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// GetPluginContext indicates an expected call of GetPluginContext. +func (mr *MockPluginContextGetterMockRecorder) GetPluginContext(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPluginContext", reflect.TypeOf((*MockPluginContextGetter)(nil).GetPluginContext), arg0, arg1, arg2, arg3) +}