Live: stream resubmit on ds change, fix old ds settings in RunStream (#34130)

This commit is contained in:
Alexander Emelin
2021-05-18 21:39:56 +03:00
committed by GitHub
parent 18954aaa7b
commit e799257637
13 changed files with 502 additions and 85 deletions

View File

@ -265,10 +265,10 @@ func (hs *HTTPServer) registerRoutes() {
apiRoute.Group("/datasources", func(datasourceRoute routing.RouteRegister) {
datasourceRoute.Get("/", routing.Wrap(hs.GetDataSources))
datasourceRoute.Post("/", quota("data_source"), bind(models.AddDataSourceCommand{}), routing.Wrap(AddDataSource))
datasourceRoute.Put("/:id", bind(models.UpdateDataSourceCommand{}), routing.Wrap(UpdateDataSource))
datasourceRoute.Delete("/:id", routing.Wrap(DeleteDataSourceById))
datasourceRoute.Delete("/uid/:uid", routing.Wrap(DeleteDataSourceByUID))
datasourceRoute.Delete("/name/:name", routing.Wrap(DeleteDataSourceByName))
datasourceRoute.Put("/:id", bind(models.UpdateDataSourceCommand{}), routing.Wrap(hs.UpdateDataSource))
datasourceRoute.Delete("/:id", routing.Wrap(hs.DeleteDataSourceById))
datasourceRoute.Delete("/uid/:uid", routing.Wrap(hs.DeleteDataSourceByUID))
datasourceRoute.Delete("/name/:name", routing.Wrap(hs.DeleteDataSourceByName))
datasourceRoute.Get("/:id", routing.Wrap(GetDataSourceById))
datasourceRoute.Get("/uid/:uid", routing.Wrap(GetDataSourceByUID))
datasourceRoute.Get("/name/:name", routing.Wrap(GetDataSourceByName))

View File

@ -6,7 +6,6 @@ import (
"fmt"
"sort"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/api/datasource"
"github.com/grafana/grafana/pkg/api/dtos"
"github.com/grafana/grafana/pkg/api/response"
@ -15,6 +14,8 @@ import (
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins/adapters"
"github.com/grafana/grafana/pkg/util"
"github.com/grafana/grafana-plugin-sdk-go/backend"
)
var datasourcesLogger = log.New("datasources")
@ -83,7 +84,7 @@ func GetDataSourceById(c *models.ReqContext) response.Response {
return response.JSON(200, &dtos)
}
func DeleteDataSourceById(c *models.ReqContext) response.Response {
func (hs *HTTPServer) DeleteDataSourceById(c *models.ReqContext) response.Response {
id := c.ParamsInt64(":id")
if id <= 0 {
@ -109,6 +110,8 @@ func DeleteDataSourceById(c *models.ReqContext) response.Response {
return response.Error(500, "Failed to delete datasource", err)
}
hs.Live.HandleDatasourceDelete(c.OrgId, ds.Uid)
return response.Success("Data source deleted")
}
@ -128,7 +131,7 @@ func GetDataSourceByUID(c *models.ReqContext) response.Response {
}
// DELETE /api/datasources/uid/:uid
func DeleteDataSourceByUID(c *models.ReqContext) response.Response {
func (hs *HTTPServer) DeleteDataSourceByUID(c *models.ReqContext) response.Response {
uid := c.Params(":uid")
if uid == "" {
@ -154,10 +157,12 @@ func DeleteDataSourceByUID(c *models.ReqContext) response.Response {
return response.Error(500, "Failed to delete datasource", err)
}
hs.Live.HandleDatasourceDelete(c.OrgId, ds.Uid)
return response.Success("Data source deleted")
}
func DeleteDataSourceByName(c *models.ReqContext) response.Response {
func (hs *HTTPServer) DeleteDataSourceByName(c *models.ReqContext) response.Response {
name := c.Params(":name")
if name == "" {
@ -182,6 +187,8 @@ func DeleteDataSourceByName(c *models.ReqContext) response.Response {
return response.Error(500, "Failed to delete datasource", err)
}
hs.Live.HandleDatasourceDelete(c.OrgId, getCmd.Result.Uid)
return response.JSON(200, util.DynMap{
"message": "Data source deleted",
"id": getCmd.Result.Id,
@ -224,7 +231,7 @@ func AddDataSource(c *models.ReqContext, cmd models.AddDataSourceCommand) respon
})
}
func UpdateDataSource(c *models.ReqContext, cmd models.UpdateDataSourceCommand) response.Response {
func (hs *HTTPServer) UpdateDataSource(c *models.ReqContext, cmd models.UpdateDataSourceCommand) response.Response {
datasourcesLogger.Debug("Received command to update data source", "url", cmd.Url)
cmd.OrgId = c.OrgId
cmd.Id = c.ParamsInt64(":id")
@ -254,16 +261,18 @@ func UpdateDataSource(c *models.ReqContext, cmd models.UpdateDataSourceCommand)
if errors.Is(err, models.ErrDataSourceNotFound) {
return response.Error(404, "Data source not found", nil)
}
return response.Error(500, "Failed to query datasources", err)
return response.Error(500, "Failed to query datasource", err)
}
dtos := convertModelToDtos(query.Result)
datasourceDTO := convertModelToDtos(query.Result)
hs.Live.HandleDatasourceUpdate(c.OrgId, datasourceDTO.UID)
return response.JSON(200, util.DynMap{
"message": "Datasource updated",
"id": cmd.Id,
"name": cmd.Name,
"datasource": dtos,
"datasource": datasourceDTO,
})
}

View File

@ -55,7 +55,13 @@ func TestDataSourcesProxy_userLoggedIn(t *testing.T) {
loggedInUserScenario(t, "Should be able to save a data source when calling DELETE on non-existing",
"/api/datasources/name/12345", func(sc *scenarioContext) {
sc.handlerFunc = DeleteDataSourceByName
// handler func being tested
hs := &HTTPServer{
Bus: bus.GetBus(),
Cfg: setting.NewCfg(),
PluginManager: &fakePluginManager{},
}
sc.handlerFunc = hs.DeleteDataSourceByName
sc.fakeReqWithParams("DELETE", sc.url, map[string]string{}).exec()
assert.Equal(t, 404, sc.resp.Code)
})

View File

@ -313,7 +313,7 @@ func (hs *HTTPServer) GetPluginAssets(c *models.ReqContext) {
func (hs *HTTPServer) CheckHealth(c *models.ReqContext) response.Response {
pluginID := c.Params("pluginId")
pCtx, found, err := hs.PluginContextProvider.Get(pluginID, "", c.SignedInUser)
pCtx, found, err := hs.PluginContextProvider.Get(pluginID, "", c.SignedInUser, false)
if err != nil {
return response.Error(500, "Failed to get plugin settings", err)
}
@ -355,7 +355,7 @@ func (hs *HTTPServer) CheckHealth(c *models.ReqContext) response.Response {
func (hs *HTTPServer) CallResource(c *models.ReqContext) {
pluginID := c.Params("pluginId")
pCtx, found, err := hs.PluginContextProvider.Get(pluginID, "", c.SignedInUser)
pCtx, found, err := hs.PluginContextProvider.Get(pluginID, "", c.SignedInUser, false)
if err != nil {
c.JsonApiErr(500, "Failed to get plugin settings", err)
return

View File

@ -17,6 +17,7 @@ func ModelToInstanceSettings(ds *models.DataSource) (*backend.DataSourceInstance
ID: ds.Id,
Name: ds.Name,
URL: ds.Url,
UID: ds.Uid,
Database: ds.Database,
User: ds.User,
BasicAuthEnabled: ds.BasicAuth,

View File

@ -3,6 +3,7 @@ package grpcplugin
import (
"context"
"errors"
"fmt"
"io"
"github.com/grafana/grafana-plugin-sdk-go/backend"
@ -216,7 +217,7 @@ func (c *clientV2) RunStream(ctx context.Context, req *backend.RunStreamRequest,
if errors.Is(err, io.EOF) {
return nil
}
return errutil.Wrap("failed to receive call resource response", err)
return fmt.Errorf("error running stream: %w", err)
}
if err := sender.Send(backend.FromProto().StreamPacket(protoResp)); err != nil {
return err

View File

@ -42,7 +42,7 @@ func (p *Provider) Init() error {
// Get allows getting plugin context by its id. If datasourceUID is not empty string
// then PluginContext.DataSourceInstanceSettings will be resolved and appended to
// returned context.
func (p *Provider) Get(pluginID string, datasourceUID string, user *models.SignedInUser) (backend.PluginContext, bool, error) {
func (p *Provider) Get(pluginID string, datasourceUID string, user *models.SignedInUser, skipCache bool) (backend.PluginContext, bool, error) {
pc := backend.PluginContext{}
plugin := p.PluginManager.GetPlugin(pluginID)
if plugin == nil {
@ -81,7 +81,7 @@ func (p *Provider) Get(pluginID string, datasourceUID string, user *models.Signe
}
if datasourceUID != "" {
ds, err := p.DatasourceCache.GetDatasourceByUID(datasourceUID, user, false)
ds, err := p.DatasourceCache.GetDatasourceByUID(datasourceUID, user, skipCache)
if err != nil {
return pc, false, errutil.Wrap("Failed to get datasource", err)
}

View File

@ -3,17 +3,18 @@ package features
import (
"context"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/live/orgchannel"
"github.com/grafana/grafana/pkg/services/live/runstream"
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/models"
)
//go:generate mockgen -destination=plugin_mock.go -package=features github.com/grafana/grafana/pkg/services/live/features PluginContextGetter
type PluginContextGetter interface {
GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string) (backend.PluginContext, bool, error)
GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error)
}
// PluginRunner can handle streaming operations for channels belonging to plugins.
@ -60,7 +61,7 @@ type PluginPathRunner struct {
// OnSubscribe passes control to a plugin.
func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
pCtx, found, err := r.pluginContextGetter.GetPluginContext(user, r.pluginID, r.datasourceUID)
pCtx, found, err := r.pluginContextGetter.GetPluginContext(user, r.pluginID, r.datasourceUID, false)
if err != nil {
logger.Error("Get plugin context error", "error", err, "path", r.path)
return models.SubscribeReply{}, 0, err
@ -81,7 +82,7 @@ func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *models.SignedI
return models.SubscribeReply{}, resp.Status, nil
}
submitResult, err := r.runStreamManager.SubmitStream(ctx, user.OrgId, e.Channel, r.path, pCtx, r.handler)
submitResult, err := r.runStreamManager.SubmitStream(ctx, user, orgchannel.PrependOrgID(user.OrgId, e.Channel), r.path, pCtx, r.handler, false)
if err != nil {
logger.Error("Error submitting stream to manager", "error", err, "path", r.path)
return models.SubscribeReply{}, 0, centrifuge.ErrorInternal
@ -89,7 +90,7 @@ func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *models.SignedI
if submitResult.StreamExists {
logger.Debug("Skip running new stream (already exists)", "path", r.path)
} else {
logger.Debug("Running a new keepalive stream", "path", r.path)
logger.Debug("Running a new unidirectional stream", "path", r.path)
}
reply := models.SubscribeReply{
@ -101,7 +102,7 @@ func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *models.SignedI
// OnPublish passes control to a plugin.
func (r *PluginPathRunner) OnPublish(ctx context.Context, user *models.SignedInUser, e models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
pCtx, found, err := r.pluginContextGetter.GetPluginContext(user, r.pluginID, r.datasourceUID)
pCtx, found, err := r.pluginContextGetter.GetPluginContext(user, r.pluginID, r.datasourceUID, false)
if err != nil {
logger.Error("Get plugin context error", "error", err, "path", r.path)
return models.PublishReply{}, 0, err

View File

@ -156,7 +156,7 @@ func (g *GrafanaLive) Init() error {
g.contextGetter = newPluginContextGetter(g.PluginContextProvider)
packetSender := newPluginPacketSender(node)
presenceGetter := newPluginPresenceGetter(node)
g.runStreamManager = runstream.NewManager(packetSender, presenceGetter)
g.runStreamManager = runstream.NewManager(packetSender, presenceGetter, g.contextGetter)
// Initialize the main features
dash := &features.DashboardHandler{
@ -283,6 +283,26 @@ func runConcurrentlyIfNeeded(ctx context.Context, semaphore chan struct{}, fn fu
return nil
}
func (g *GrafanaLive) HandleDatasourceDelete(orgID int64, dsUID string) {
if g.runStreamManager == nil {
return
}
err := g.runStreamManager.HandleDatasourceDelete(orgID, dsUID)
if err != nil {
logger.Error("Error handling datasource delete", "error", err)
}
}
func (g *GrafanaLive) HandleDatasourceUpdate(orgID int64, dsUID string) {
if g.runStreamManager == nil {
return
}
err := g.runStreamManager.HandleDatasourceUpdate(orgID, dsUID)
if err != nil {
logger.Error("Error handling datasource update", "error", err)
}
}
func (g *GrafanaLive) handleOnSubscribe(client *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) {
logger.Debug("Client wants to subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)

View File

@ -52,6 +52,6 @@ func newPluginContextGetter(pluginContextProvider *plugincontext.Provider) *plug
}
}
func (g *pluginContextGetter) GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string) (backend.PluginContext, bool, error) {
return g.PluginContextProvider.Get(pluginID, datasourceUID, user)
func (g *pluginContextGetter) GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
return g.PluginContextProvider.Get(pluginID, datasourceUID, user, skipCache)
}

View File

@ -3,12 +3,13 @@ package runstream
import (
"context"
"errors"
"fmt"
"math"
"sync"
"time"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/live/orgchannel"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana-plugin-sdk-go/backend"
)
@ -17,12 +18,16 @@ var (
logger = log.New("live.runstream")
)
//go:generate mockgen -destination=mock.go -package=runstream github.com/grafana/grafana/pkg/services/live/runstream StreamPacketSender,PresenceGetter,StreamRunner
//go:generate mockgen -destination=mock.go -package=runstream github.com/grafana/grafana/pkg/services/live/runstream StreamPacketSender,PresenceGetter,StreamRunner,PluginContextGetter
type StreamPacketSender interface {
Send(channel string, packet *backend.StreamPacket) error
}
type PluginContextGetter interface {
GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error)
}
type PresenceGetter interface {
GetNumSubscribers(channel string) (int, error)
}
@ -50,13 +55,17 @@ func (p *streamSender) Send(packet *backend.StreamPacket) error {
// Manager manages streams from Grafana to plugins (i.e. RunStream method).
type Manager struct {
mu sync.RWMutex
streams map[string]chan struct{}
baseCtx context.Context
streams map[string]streamContext
datasourceStreams map[string]map[string]struct{}
presenceGetter PresenceGetter
pluginContextGetter PluginContextGetter
packetSender StreamPacketSender
registerCh chan submitRequest
closedCh chan struct{}
checkInterval time.Duration
maxChecks int
datasourceCheckInterval time.Duration
}
// ManagerOption modifies Manager behavior (used for tests for example).
@ -72,19 +81,23 @@ func WithCheckConfig(interval time.Duration, maxChecks int) ManagerOption {
const (
defaultCheckInterval = 5 * time.Second
defaultDatasourceCheckInterval = 60 * time.Second
defaultMaxChecks = 3
)
// NewManager creates new Manager.
func NewManager(packetSender StreamPacketSender, presenceGetter PresenceGetter, opts ...ManagerOption) *Manager {
func NewManager(packetSender StreamPacketSender, presenceGetter PresenceGetter, pluginContextGetter PluginContextGetter, opts ...ManagerOption) *Manager {
sm := &Manager{
streams: make(map[string]chan struct{}),
streams: make(map[string]streamContext),
datasourceStreams: map[string]map[string]struct{}{},
packetSender: packetSender,
presenceGetter: presenceGetter,
pluginContextGetter: pluginContextGetter,
registerCh: make(chan submitRequest),
closedCh: make(chan struct{}),
checkInterval: defaultCheckInterval,
maxChecks: defaultMaxChecks,
datasourceCheckInterval: defaultDatasourceCheckInterval,
}
for _, opt := range opts {
opt(sm)
@ -92,25 +105,109 @@ func NewManager(packetSender StreamPacketSender, presenceGetter PresenceGetter,
return sm
}
func (s *Manager) HandleDatasourceDelete(orgID int64, dsUID string) error {
return s.handleDatasourceEvent(orgID, dsUID, false)
}
func (s *Manager) HandleDatasourceUpdate(orgID int64, dsUID string) error {
return s.handleDatasourceEvent(orgID, dsUID, true)
}
func (s *Manager) handleDatasourceEvent(orgID int64, dsUID string, resubmit bool) error {
dsKey := datasourceKey(orgID, dsUID)
s.mu.RLock()
dsStreams, ok := s.datasourceStreams[dsKey]
if !ok {
s.mu.RUnlock()
return nil
}
var resubmitRequests []streamRequest
var waitChannels []chan struct{}
for channel := range dsStreams {
streamCtx, ok := s.streams[channel]
if !ok {
continue
}
streamCtx.cancelFn()
waitChannels = append(waitChannels, streamCtx.CloseCh)
resubmitRequests = append(resubmitRequests, streamCtx.streamRequest)
}
s.mu.RUnlock()
// Wait for all streams to stop.
for _, ch := range waitChannels {
<-ch
}
if resubmit {
// Re-submit streams.
for _, sr := range resubmitRequests {
_, err := s.SubmitStream(s.baseCtx, sr.user, sr.Channel, sr.Path, sr.PluginContext, sr.StreamRunner, true)
if err != nil {
// Log error but do not prevent execution of caller routine.
logger.Error("Error re-submitting stream", "path", sr.Path, "error", err)
}
}
}
return nil
}
func datasourceKey(orgID int64, dsUID string) string {
return fmt.Sprintf("%d_%s", orgID, dsUID)
}
func (s *Manager) stopStream(sr streamRequest, cancelFn func()) {
s.mu.Lock()
defer s.mu.Unlock()
closeCh, ok := s.streams[sr.Channel]
streamCtx, ok := s.streams[sr.Channel]
if !ok {
return
}
closeCh := streamCtx.CloseCh
delete(s.streams, sr.Channel)
if sr.PluginContext.DataSourceInstanceSettings != nil {
dsUID := sr.PluginContext.DataSourceInstanceSettings.UID
dsKey := datasourceKey(sr.PluginContext.OrgID, dsUID)
delete(s.datasourceStreams[dsKey], sr.Channel)
}
cancelFn()
close(closeCh)
}
func (s *Manager) watchStream(ctx context.Context, cancelFn func(), sr streamRequest) {
numNoSubscribersChecks := 0
presenceTicker := time.NewTicker(s.checkInterval)
defer presenceTicker.Stop()
datasourceTicker := time.NewTicker(s.datasourceCheckInterval)
defer datasourceTicker.Stop()
for {
select {
case <-ctx.Done():
return
case <-time.After(s.checkInterval):
case <-datasourceTicker.C:
if sr.PluginContext.DataSourceInstanceSettings != nil {
dsUID := sr.PluginContext.DataSourceInstanceSettings.UID
pCtx, ok, err := s.pluginContextGetter.GetPluginContext(sr.user, sr.PluginContext.PluginID, dsUID, false)
if err != nil {
logger.Error("Error getting datasource context", "channel", sr.Channel, "path", sr.Path, "error", err)
continue
}
if !ok {
logger.Debug("Datasource not found, stop stream", "channel", sr.Channel, "path", sr.Path)
return
}
if pCtx.DataSourceInstanceSettings.Updated != sr.PluginContext.DataSourceInstanceSettings.Updated {
logger.Debug("Datasource changed, re-establish stream", "channel", sr.Channel, "path", sr.Path)
err := s.HandleDatasourceUpdate(pCtx.OrgID, dsUID)
if err != nil {
logger.Error("Error re-establishing stream", "channel", sr.Channel, "path", sr.Path, "error", err)
continue
}
return
}
}
case <-presenceTicker.C:
numSubscribers, err := s.presenceGetter.GetNumSubscribers(sr.Channel)
if err != nil {
logger.Error("Error checking num subscribers", "channel", sr.Channel, "path", sr.Path)
@ -148,31 +245,24 @@ func getDelay(numErrors int) time.Duration {
// run stream until context canceled or stream finished without an error.
func (s *Manager) runStream(ctx context.Context, cancelFn func(), sr streamRequest) {
defer func() { s.stopStream(sr, cancelFn) }()
var numFastErrors int
var delay time.Duration
var isReconnect bool
startTime := time.Now()
for {
select {
case <-ctx.Done():
return
default:
}
startTime := time.Now()
err := sr.StreamRunner.RunStream(
ctx,
&backend.RunStreamRequest{
PluginContext: sr.PluginContext,
Path: sr.Path,
},
newStreamSender(sr.Channel, s.packetSender),
)
if err != nil {
if errors.Is(ctx.Err(), context.Canceled) {
logger.Debug("Stream cleanly finished", "path", sr.Path)
return
}
pluginCtx := sr.PluginContext
if isReconnect {
// Best effort to cool down re-establishment process. We don't have a
// nice way to understand whether we really need to wait here - so relying
// on RunStream duration time.
// on duration time of running a stream.
if time.Since(startTime) < streamDurationThreshold {
if delay < maxDelay {
// Due to not calling getDelay after we have delay larger than maxDelay
@ -186,29 +276,85 @@ func (s *Manager) runStream(ctx context.Context, cancelFn func(), sr streamReque
delay = 0
numFastErrors = 0
}
select {
case <-ctx.Done():
return
case <-time.After(delay):
}
startTime = time.Now()
// Resolve new plugin context as it could be modified since last call.
// We are using the same user here which initiated stream originally.
var datasourceUID string
if pluginCtx.DataSourceInstanceSettings != nil {
datasourceUID = pluginCtx.DataSourceInstanceSettings.UID
}
newPluginCtx, ok, err := s.pluginContextGetter.GetPluginContext(sr.user, pluginCtx.PluginID, datasourceUID, false)
if err != nil {
logger.Error("Error getting plugin context", "path", sr.Path, "error", err)
isReconnect = true
continue
}
if !ok {
logger.Info("No plugin context found, stopping stream", "path", sr.Path)
return
}
pluginCtx = newPluginCtx
}
err := sr.StreamRunner.RunStream(
ctx,
&backend.RunStreamRequest{
PluginContext: pluginCtx,
Path: sr.Path,
},
newStreamSender(sr.Channel, s.packetSender),
)
if err != nil {
if errors.Is(ctx.Err(), context.Canceled) {
logger.Debug("Stream cleanly finished", "path", sr.Path)
return
}
logger.Error("Error running stream, re-establishing", "path", sr.Path, "error", err, "wait", delay)
time.Sleep(delay)
isReconnect = true
continue
}
logger.Debug("Stream finished without error, stopping it", "path", sr.Path)
s.stopStream(sr, cancelFn)
return
}
}
var errClosed = errors.New("stream manager closed")
type streamContext struct {
CloseCh chan struct{}
cancelFn func()
streamRequest streamRequest
}
func (s *Manager) registerStream(ctx context.Context, sr submitRequest) {
s.mu.Lock()
if closeCh, ok := s.streams[sr.streamRequest.Channel]; ok {
if streamCtx, ok := s.streams[sr.streamRequest.Channel]; ok {
s.mu.Unlock()
sr.responseCh <- submitResponse{Result: submitResult{StreamExists: true, CloseNotify: closeCh}}
sr.responseCh <- submitResponse{Result: submitResult{StreamExists: true, CloseNotify: streamCtx.CloseCh}}
return
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
closeCh := make(chan struct{})
s.streams[sr.streamRequest.Channel] = closeCh
s.streams[sr.streamRequest.Channel] = streamContext{
CloseCh: closeCh,
cancelFn: cancel,
streamRequest: sr.streamRequest,
}
if sr.streamRequest.PluginContext.DataSourceInstanceSettings != nil {
dsUID := sr.streamRequest.PluginContext.DataSourceInstanceSettings.UID
dsKey := datasourceKey(sr.streamRequest.PluginContext.OrgID, dsUID)
if _, ok := s.datasourceStreams[dsKey]; !ok {
s.datasourceStreams[dsKey] = map[string]struct{}{}
}
s.datasourceStreams[dsKey][sr.streamRequest.Channel] = struct{}{}
}
s.mu.Unlock()
sr.responseCh <- submitResponse{Result: submitResult{StreamExists: false, CloseNotify: closeCh}}
go s.watchStream(ctx, cancel, sr.streamRequest)
@ -217,6 +363,7 @@ func (s *Manager) registerStream(ctx context.Context, sr submitRequest) {
// Run Manager till context canceled.
func (s *Manager) Run(ctx context.Context) error {
s.baseCtx = ctx
for {
select {
case sr := <-s.registerCh:
@ -231,6 +378,7 @@ func (s *Manager) Run(ctx context.Context) error {
type streamRequest struct {
Channel string
Path string
user *models.SignedInUser
PluginContext backend.PluginContext
StreamRunner StreamRunner
}
@ -252,13 +400,32 @@ type submitResponse struct {
Result submitResult
}
var errDatasourceNotFound = errors.New("datasource not found")
// SubmitStream submits stream handler in Manager to manage.
// The stream will be opened and kept till channel has active subscribers.
func (s *Manager) SubmitStream(ctx context.Context, orgID int64, channel string, path string, pCtx backend.PluginContext, streamRunner StreamRunner) (*submitResult, error) {
func (s *Manager) SubmitStream(ctx context.Context, user *models.SignedInUser, channel string, path string, pCtx backend.PluginContext, streamRunner StreamRunner, isResubmit bool) (*submitResult, error) {
if isResubmit {
// Resolve new plugin context as it could be modified since last call.
var datasourceUID string
if pCtx.DataSourceInstanceSettings != nil {
datasourceUID = pCtx.DataSourceInstanceSettings.UID
}
newPluginCtx, ok, err := s.pluginContextGetter.GetPluginContext(user, pCtx.PluginID, datasourceUID, false)
if err != nil {
return nil, err
}
if !ok {
return nil, errDatasourceNotFound
}
pCtx = newPluginCtx
}
req := submitRequest{
responseCh: make(chan submitResponse, 1),
streamRequest: streamRequest{
Channel: orgchannel.PrependOrgID(orgID, channel),
user: user,
Channel: channel,
Path: path,
PluginContext: pCtx,
StreamRunner: streamRunner,

View File

@ -6,14 +6,16 @@ import (
"testing"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/models"
"github.com/golang/mock/gomock"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/stretchr/testify/require"
)
// wait until channel closed with timeout.
func waitWithTimeout(tb testing.TB, ch chan struct{}, timeout time.Duration) {
tb.Helper()
select {
case <-ch:
case <-time.After(timeout):
@ -27,8 +29,9 @@ func TestStreamManager_Run(t *testing.T) {
mockPacketSender := NewMockStreamPacketSender(mockCtrl)
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
mockContextGetter := NewMockPluginContextGetter(mockCtrl)
manager := NewManager(mockPacketSender, mockPresenceGetter)
manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -47,8 +50,9 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) {
mockPacketSender := NewMockStreamPacketSender(mockCtrl)
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
mockContextGetter := NewMockPluginContextGetter(mockCtrl)
manager := NewManager(mockPacketSender, mockPresenceGetter)
manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -59,6 +63,22 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) {
startedCh := make(chan struct{})
doneCh := make(chan struct{})
testPluginContext := backend.PluginContext{
OrgID: 1,
PluginID: "test-plugin",
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
UID: "xyz",
},
}
mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
require.Equal(t, int64(2), user.UserId)
require.Equal(t, int64(1), user.OrgId)
require.Equal(t, testPluginContext.PluginID, pluginID)
require.Equal(t, testPluginContext.DataSourceInstanceSettings.UID, datasourceUID)
return testPluginContext, true, nil
}).Times(0)
mockPacketSender.EXPECT().Send("1/test", gomock.Any()).Times(1)
mockStreamRunner := NewMockStreamRunner(mockCtrl)
@ -76,12 +96,12 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) {
return ctx.Err()
}).Times(1)
result, err := manager.SubmitStream(context.Background(), 1, "test", "test", backend.PluginContext{}, mockStreamRunner)
result, err := manager.SubmitStream(context.Background(), &models.SignedInUser{UserId: 2, OrgId: 1}, "1/test", "test", testPluginContext, mockStreamRunner, false)
require.NoError(t, err)
require.False(t, result.StreamExists)
// try submit the same.
result, err = manager.SubmitStream(context.Background(), 1, "test", "test", backend.PluginContext{}, mockStreamRunner)
result, err = manager.SubmitStream(context.Background(), &models.SignedInUser{UserId: 2, OrgId: 1}, "1/test", "test", backend.PluginContext{}, mockStreamRunner, false)
require.NoError(t, err)
require.True(t, result.StreamExists)
@ -97,8 +117,9 @@ func TestStreamManager_SubmitStream_DifferentOrgID(t *testing.T) {
mockPacketSender := NewMockStreamPacketSender(mockCtrl)
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
mockContextGetter := NewMockPluginContextGetter(mockCtrl)
manager := NewManager(mockPacketSender, mockPresenceGetter)
manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -114,6 +135,10 @@ func TestStreamManager_SubmitStream_DifferentOrgID(t *testing.T) {
mockPacketSender.EXPECT().Send("1/test", gomock.Any()).Times(1)
mockPacketSender.EXPECT().Send("2/test", gomock.Any()).Times(1)
mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
return backend.PluginContext{}, true, nil
}).Times(0)
mockStreamRunner1 := NewMockStreamRunner(mockCtrl)
mockStreamRunner1.EXPECT().RunStream(
gomock.Any(), gomock.Any(), gomock.Any(),
@ -144,12 +169,12 @@ func TestStreamManager_SubmitStream_DifferentOrgID(t *testing.T) {
return ctx.Err()
}).Times(1)
result, err := manager.SubmitStream(context.Background(), 1, "test", "test", backend.PluginContext{}, mockStreamRunner1)
result, err := manager.SubmitStream(context.Background(), &models.SignedInUser{UserId: 2, OrgId: 1}, "1/test", "test", backend.PluginContext{}, mockStreamRunner1, false)
require.NoError(t, err)
require.False(t, result.StreamExists)
// try submit the same channel but different orgID.
result, err = manager.SubmitStream(context.Background(), 2, "test", "test", backend.PluginContext{}, mockStreamRunner2)
result, err = manager.SubmitStream(context.Background(), &models.SignedInUser{UserId: 2, OrgId: 2}, "2/test", "test", backend.PluginContext{}, mockStreamRunner2, false)
require.NoError(t, err)
require.False(t, result.StreamExists)
@ -167,11 +192,13 @@ func TestStreamManager_SubmitStream_CloseNoSubscribers(t *testing.T) {
mockPacketSender := NewMockStreamPacketSender(mockCtrl)
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
mockContextGetter := NewMockPluginContextGetter(mockCtrl)
// Create manager with very fast num subscribers checks.
manager := NewManager(
mockPacketSender,
mockPresenceGetter,
mockContextGetter,
WithCheckConfig(10*time.Millisecond, 3),
)
@ -184,6 +211,10 @@ func TestStreamManager_SubmitStream_CloseNoSubscribers(t *testing.T) {
startedCh := make(chan struct{})
doneCh := make(chan struct{})
mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
return backend.PluginContext{}, true, nil
}).Times(0)
mockPresenceGetter.EXPECT().GetNumSubscribers("1/test").Return(0, nil).Times(3)
mockStreamRunner := NewMockStreamRunner(mockCtrl)
@ -194,7 +225,7 @@ func TestStreamManager_SubmitStream_CloseNoSubscribers(t *testing.T) {
return ctx.Err()
}).Times(1)
_, err := manager.SubmitStream(context.Background(), 1, "test", "test", backend.PluginContext{}, mockStreamRunner)
_, err := manager.SubmitStream(context.Background(), &models.SignedInUser{UserId: 2, OrgId: 1}, "1/test", "test", backend.PluginContext{}, mockStreamRunner, false)
require.NoError(t, err)
waitWithTimeout(t, startedCh, time.Second)
@ -208,8 +239,9 @@ func TestStreamManager_SubmitStream_ErrorRestartsRunStream(t *testing.T) {
mockPacketSender := NewMockStreamPacketSender(mockCtrl)
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
mockContextGetter := NewMockPluginContextGetter(mockCtrl)
manager := NewManager(mockPacketSender, mockPresenceGetter)
manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -220,6 +252,22 @@ func TestStreamManager_SubmitStream_ErrorRestartsRunStream(t *testing.T) {
numErrors := 3
currentErrors := 0
testPluginContext := backend.PluginContext{
OrgID: 1,
PluginID: "test-plugin",
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
UID: "xyz",
},
}
mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
require.Equal(t, int64(2), user.UserId)
require.Equal(t, int64(1), user.OrgId)
require.Equal(t, testPluginContext.PluginID, pluginID)
require.Equal(t, testPluginContext.DataSourceInstanceSettings.UID, datasourceUID)
return testPluginContext, true, nil
}).Times(numErrors)
mockStreamRunner := NewMockStreamRunner(mockCtrl)
mockStreamRunner.EXPECT().RunStream(
gomock.Any(), gomock.Any(), gomock.Any(),
@ -231,7 +279,7 @@ func TestStreamManager_SubmitStream_ErrorRestartsRunStream(t *testing.T) {
return errors.New("boom")
}).Times(numErrors + 1)
result, err := manager.SubmitStream(context.Background(), 1, "test", "test", backend.PluginContext{}, mockStreamRunner)
result, err := manager.SubmitStream(context.Background(), &models.SignedInUser{UserId: 2, OrgId: 1}, "test", "test", testPluginContext, mockStreamRunner, false)
require.NoError(t, err)
require.False(t, result.StreamExists)
@ -244,8 +292,9 @@ func TestStreamManager_SubmitStream_NilErrorStopsRunStream(t *testing.T) {
mockPacketSender := NewMockStreamPacketSender(mockCtrl)
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
mockContextGetter := NewMockPluginContextGetter(mockCtrl)
manager := NewManager(mockPacketSender, mockPresenceGetter)
manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -253,6 +302,10 @@ func TestStreamManager_SubmitStream_NilErrorStopsRunStream(t *testing.T) {
_ = manager.Run(ctx)
}()
mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
return backend.PluginContext{}, true, nil
}).Times(0)
mockStreamRunner := NewMockStreamRunner(mockCtrl)
mockStreamRunner.EXPECT().RunStream(
gomock.Any(), gomock.Any(), gomock.Any(),
@ -260,8 +313,127 @@ func TestStreamManager_SubmitStream_NilErrorStopsRunStream(t *testing.T) {
return nil
}).Times(1)
result, err := manager.SubmitStream(context.Background(), 1, "test", "test", backend.PluginContext{}, mockStreamRunner)
result, err := manager.SubmitStream(context.Background(), &models.SignedInUser{UserId: 2, OrgId: 1}, "test", "test", backend.PluginContext{}, mockStreamRunner, false)
require.NoError(t, err)
require.False(t, result.StreamExists)
waitWithTimeout(t, result.CloseNotify, time.Second)
}
func TestStreamManager_HandleDatasourceUpdate(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mockPacketSender := NewMockStreamPacketSender(mockCtrl)
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
mockContextGetter := NewMockPluginContextGetter(mockCtrl)
manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
_ = manager.Run(ctx)
}()
testPluginContext := backend.PluginContext{
OrgID: 1,
PluginID: "test-plugin",
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
UID: "xyz",
},
}
mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
require.Equal(t, int64(2), user.UserId)
require.Equal(t, int64(1), user.OrgId)
require.Equal(t, testPluginContext.PluginID, pluginID)
require.Equal(t, testPluginContext.DataSourceInstanceSettings.UID, datasourceUID)
return testPluginContext, true, nil
}).Times(1)
isFirstCall := true
doneCh1 := make(chan struct{})
doneCh2 := make(chan struct{})
mockStreamRunner := NewMockStreamRunner(mockCtrl)
mockStreamRunner.EXPECT().RunStream(
gomock.Any(), gomock.Any(), gomock.Any(),
).DoAndReturn(func(ctx context.Context, req *backend.RunStreamRequest, sender backend.StreamPacketSender) error {
if isFirstCall {
// first RunStream will wait till context done.
isFirstCall = false
close(doneCh1)
<-ctx.Done()
return ctx.Err()
}
// second RunStream finishes immediately since we are waiting for it below.
close(doneCh2)
return nil
}).Times(2)
result, err := manager.SubmitStream(context.Background(), &models.SignedInUser{UserId: 2, OrgId: 1}, "test", "test", testPluginContext, mockStreamRunner, false)
require.NoError(t, err)
require.False(t, result.StreamExists)
waitWithTimeout(t, doneCh1, time.Second)
err = manager.HandleDatasourceUpdate(1, "xyz")
require.NoError(t, err)
waitWithTimeout(t, result.CloseNotify, time.Second)
waitWithTimeout(t, doneCh2, time.Second)
}
func TestStreamManager_HandleDatasourceDelete(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mockPacketSender := NewMockStreamPacketSender(mockCtrl)
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
mockContextGetter := NewMockPluginContextGetter(mockCtrl)
manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
_ = manager.Run(ctx)
}()
testPluginContext := backend.PluginContext{
OrgID: 1,
PluginID: "test-plugin",
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
UID: "xyz",
},
}
mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
require.Equal(t, int64(2), user.UserId)
require.Equal(t, int64(1), user.OrgId)
require.Equal(t, testPluginContext.PluginID, pluginID)
require.Equal(t, testPluginContext.DataSourceInstanceSettings.UID, datasourceUID)
return testPluginContext, true, nil
}).Times(0)
doneCh := make(chan struct{})
mockStreamRunner := NewMockStreamRunner(mockCtrl)
mockStreamRunner.EXPECT().RunStream(
gomock.Any(), gomock.Any(), gomock.Any(),
).DoAndReturn(func(ctx context.Context, req *backend.RunStreamRequest, sender backend.StreamPacketSender) error {
close(doneCh)
<-ctx.Done()
return ctx.Err()
}).Times(1)
result, err := manager.SubmitStream(context.Background(), &models.SignedInUser{UserId: 2, OrgId: 1}, "test", "test", testPluginContext, mockStreamRunner, false)
require.NoError(t, err)
require.False(t, result.StreamExists)
waitWithTimeout(t, doneCh, time.Second)
err = manager.HandleDatasourceDelete(1, "xyz")
require.NoError(t, err)
waitWithTimeout(t, result.CloseNotify, time.Second)
}

View File

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/grafana/grafana/pkg/services/live/runstream (interfaces: StreamPacketSender,PresenceGetter,StreamRunner)
// Source: github.com/grafana/grafana/pkg/services/live/runstream (interfaces: StreamPacketSender,PresenceGetter,StreamRunner,PluginContextGetter)
// Package runstream is a generated GoMock package.
package runstream
@ -10,6 +10,7 @@ import (
gomock "github.com/golang/mock/gomock"
backend "github.com/grafana/grafana-plugin-sdk-go/backend"
models "github.com/grafana/grafana/pkg/models"
)
// MockStreamPacketSender is a mock of StreamPacketSender interface.
@ -123,3 +124,42 @@ func (mr *MockStreamRunnerMockRecorder) RunStream(arg0, arg1, arg2 interface{})
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunStream", reflect.TypeOf((*MockStreamRunner)(nil).RunStream), arg0, arg1, arg2)
}
// MockPluginContextGetter is a mock of PluginContextGetter interface.
type MockPluginContextGetter struct {
ctrl *gomock.Controller
recorder *MockPluginContextGetterMockRecorder
}
// MockPluginContextGetterMockRecorder is the mock recorder for MockPluginContextGetter.
type MockPluginContextGetterMockRecorder struct {
mock *MockPluginContextGetter
}
// NewMockPluginContextGetter creates a new mock instance.
func NewMockPluginContextGetter(ctrl *gomock.Controller) *MockPluginContextGetter {
mock := &MockPluginContextGetter{ctrl: ctrl}
mock.recorder = &MockPluginContextGetterMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockPluginContextGetter) EXPECT() *MockPluginContextGetterMockRecorder {
return m.recorder
}
// GetPluginContext mocks base method.
func (m *MockPluginContextGetter) GetPluginContext(arg0 *models.SignedInUser, arg1, arg2 string, arg3 bool) (backend.PluginContext, bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetPluginContext", arg0, arg1, arg2, arg3)
ret0, _ := ret[0].(backend.PluginContext)
ret1, _ := ret[1].(bool)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
}
// GetPluginContext indicates an expected call of GetPluginContext.
func (mr *MockPluginContextGetterMockRecorder) GetPluginContext(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPluginContext", reflect.TypeOf((*MockPluginContextGetter)(nil).GetPluginContext), arg0, arg1, arg2, arg3)
}