diff --git a/go.mod b/go.mod index 2fbae33f9b6..905bcae70d4 100644 --- a/go.mod +++ b/go.mod @@ -36,14 +36,13 @@ require ( github.com/go-stack/stack v1.8.0 github.com/gobwas/glob v0.2.3 github.com/golang/mock v1.5.0 - github.com/golang/protobuf v1.5.1 // indirect github.com/google/go-cmp v0.5.5 github.com/google/uuid v1.2.0 github.com/gosimple/slug v1.9.0 github.com/grafana/alerting-api v0.0.0-20210323142651-d6515052e2f0 github.com/grafana/grafana-aws-sdk v0.2.0 github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4 - github.com/grafana/grafana-plugin-sdk-go v0.88.0 + github.com/grafana/grafana-plugin-sdk-go v0.89.0 github.com/grafana/loki v1.6.2-0.20201026154740-6978ee5d7387 github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 github.com/hashicorp/go-hclog v0.15.0 @@ -65,7 +64,7 @@ require ( github.com/prometheus/alertmanager v0.21.1-0.20210315141118-bf9c43b57df6 github.com/prometheus/client_golang v1.10.0 github.com/prometheus/client_model v0.2.0 - github.com/prometheus/common v0.18.1-0.20210305175002-2a23014b3b39 + github.com/prometheus/common v0.19.0 github.com/robfig/cron v0.0.0-20180505203441-b41be1df6967 github.com/robfig/cron/v3 v3.0.1 github.com/russellhaering/goxmldsig v1.1.0 diff --git a/go.sum b/go.sum index afcd9ff4b00..a5fb85b3304 100644 --- a/go.sum +++ b/go.sum @@ -165,8 +165,9 @@ github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQY github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/antonmedv/expr v1.8.9/go.mod h1:5qsM3oLGDND7sDmQGDXHkYfkjYMUX14qsgqmHhwGEk8= github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0= -github.com/apache/arrow/go/arrow v0.0.0-20200629181129-68b1273cbbf7 h1:dgL2mSOuj63SXOyojjWKq2ni3FQpQ+KrLKD7Pbq6t/4= github.com/apache/arrow/go/arrow v0.0.0-20200629181129-68b1273cbbf7/go.mod h1:QNYViu/X0HXDHw7m3KXzWSVXIbfUvJqBFe6Gj8/pYA0= +github.com/apache/arrow/go/arrow v0.0.0-20210223225224-5bea62493d91 h1:rbe942bXzd2vnds4y9fYQL8X4yFltXoZsKW7KtG+TFM= +github.com/apache/arrow/go/arrow v0.0.0-20210223225224-5bea62493d91/go.mod h1:c9sxoIT3YgLxH4UhLOCKaBlEojuMhVYpk4Ntv3opUTQ= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= @@ -245,6 +246,7 @@ github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA github.com/centrifugal/centrifuge v0.15.0 h1:6B8BGSXOJ7S6RCz29L19hJv3MZN6szRX5nVVnlJ7P4A= github.com/centrifugal/centrifuge v0.15.0/go.mod h1:HZ8joq0C1KJYQgmS9ElLLmB01DzdD7/UeBWLOdlIBJs= github.com/centrifugal/centrifuge-go v0.6.5/go.mod h1:YW9BKhnEMOBPU7C/wfqdqzhIiF0xRd0R4sHW82a7sf8= +github.com/centrifugal/centrifuge-go v0.7.1/go.mod h1:G8cXpoTVd8l6CMHh9LWyUJOEfu6cjrm4SGdT36E15Hc= github.com/centrifugal/protocol v0.3.4/go.mod h1:2YbBCaDwQHl37ErRdMrKSj18X2yVvpkQYtSX6aVbe5A= github.com/centrifugal/protocol v0.3.5 h1:3Tu1iNoKfEw5xkJotKm2or0vhyOl4HtfPbGSDnEWoGA= github.com/centrifugal/protocol v0.3.5/go.mod h1:2YbBCaDwQHl37ErRdMrKSj18X2yVvpkQYtSX6aVbe5A= @@ -807,8 +809,9 @@ github.com/grafana/grafana-aws-sdk v0.2.0/go.mod h1:+pPo5U+pX0zWimR7YBc7ASeSQfbR github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4 h1:SPdxCL9BChFTlyi0Khv64vdCW4TMna8+sxL7+Chx+Ag= github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4/go.mod h1:nc0XxBzjeGcrMltCDw269LoWF9S8ibhgxolCdA1R8To= github.com/grafana/grafana-plugin-sdk-go v0.79.0/go.mod h1:NvxLzGkVhnoBKwzkst6CFfpMFKwAdIUZ1q8ssuLeF60= -github.com/grafana/grafana-plugin-sdk-go v0.88.0 h1:Up4HGwlzHTb3XGljteirrqY7g0UNbMC46/eYNqVhHdw= github.com/grafana/grafana-plugin-sdk-go v0.88.0/go.mod h1:PTALh0lz+Y7k0+OMczAABTpeocL63aw6FVOBptp5GVo= +github.com/grafana/grafana-plugin-sdk-go v0.89.0 h1:TjwqMG9gS4wUbmSI8gO1NVGPUte6uw1D7Dua9I1LbZY= +github.com/grafana/grafana-plugin-sdk-go v0.89.0/go.mod h1:WACdtafPRErZbjnGqMPbmOXXQu6sWNJFzkVDmlWBIhM= github.com/grafana/loki v1.6.2-0.20201026154740-6978ee5d7387 h1:iwcM8lkYJ3EhytGLJ2BvRSwutb0QWoI7EWbYv3yJRsY= github.com/grafana/loki v1.6.2-0.20201026154740-6978ee5d7387/go.mod h1:jHA1OHnPsuj3LLgMXmFopsKDt4ARHHUhrmT3JrGf71g= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= @@ -1237,8 +1240,9 @@ github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/olekukonko/tablewriter v0.0.2/go.mod h1:rSAaSIOAGT9odnlyGlUfAJaoc5w2fSBUmeGDbRWPxyQ= -github.com/olekukonko/tablewriter v0.0.4 h1:vHD/YYe1Wolo78koG299f7V/VAS08c6IpCLn+Ejf/w8= github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA= +github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= +github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/olivere/elastic v6.2.27+incompatible/go.mod h1:J+q1zQJTgAz9woqsbVRqGeB5G1iqDKVBWLNSYW8yfJ8= github.com/olivere/elastic v6.2.35+incompatible/go.mod h1:J+q1zQJTgAz9woqsbVRqGeB5G1iqDKVBWLNSYW8yfJ8= github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -1378,8 +1382,9 @@ github.com/prometheus/common v0.14.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16 github.com/prometheus/common v0.15.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= github.com/prometheus/common v0.17.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= github.com/prometheus/common v0.18.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= -github.com/prometheus/common v0.18.1-0.20210305175002-2a23014b3b39 h1:2B8F94QxZhfNPFQ1jLMjnuOigpr/hVXL7rHFPurXmY8= github.com/prometheus/common v0.18.1-0.20210305175002-2a23014b3b39/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= +github.com/prometheus/common v0.19.0 h1:Itb4+NjG9wRdkAWgVucbM/adyIXxEhbw0866e0uZE6A= +github.com/prometheus/common v0.19.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= github.com/prometheus/exporter-toolkit v0.5.0/go.mod h1:OCkM4805mmisBhLmVFw858QYi3v0wKdY6/UxrT0pZVg= github.com/prometheus/exporter-toolkit v0.5.1/go.mod h1:OCkM4805mmisBhLmVFw858QYi3v0wKdY6/UxrT0pZVg= github.com/prometheus/node_exporter v1.0.0-rc.0.0.20200428091818-01054558c289 h1:dTUS1vaLWq+Y6XKOTnrFpoVsQKLCbCp1OLj24TDi7oM= @@ -1827,6 +1832,7 @@ golang.org/x/net v0.0.0-20200602114024-627f9648deb9/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= @@ -1953,6 +1959,7 @@ golang.org/x/sys v0.0.0-20200728102440-3e129f6d46b1/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200831180312-196b9ba8737a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200909081042-eff7692f9009/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200918174421-af09f7315aff/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201008064518-c1f3e3309c71/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -2182,6 +2189,7 @@ google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20200815001618-f69a88009b70/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20200911024640-645f7a48b24f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20201109203340-2640f1f9cdfb/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20201201144952-b05cb90ed32e/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20201210142538-e3217bee35cc/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= @@ -2224,6 +2232,7 @@ google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA5 google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.36.0 h1:o1bcQ6imQMIOpdrO3SWf2z5RV72WbDwdXuK0MDlc8As= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= +google.golang.org/grpc/cmd/protoc-gen-go-grpc v0.0.0-20200910201057-6591123024b3/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/grpc/examples v0.0.0-20200728065043-dfc0c05b2da9/go.mod h1:5j1uub0jRGhRiSghIlrThmBUgcgLXOVJQ/l1getT4uo= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= diff --git a/pkg/api/http_server.go b/pkg/api/http_server.go index 188bfc2c5dd..0af211d19f9 100644 --- a/pkg/api/http_server.go +++ b/pkg/api/http_server.go @@ -13,10 +13,6 @@ import ( "strings" "sync" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - macaron "gopkg.in/macaron.v1" - "github.com/grafana/grafana/pkg/api/routing" httpstatic "github.com/grafana/grafana/pkg/api/static" "github.com/grafana/grafana/pkg/bus" @@ -29,6 +25,7 @@ import ( "github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/plugins/backendplugin" _ "github.com/grafana/grafana/pkg/plugins/backendplugin/manager" + "github.com/grafana/grafana/pkg/plugins/plugincontext" "github.com/grafana/grafana/pkg/plugins/plugindashboards" "github.com/grafana/grafana/pkg/registry" "github.com/grafana/grafana/pkg/services/alerting" @@ -48,6 +45,10 @@ import ( "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/tsdb" "github.com/grafana/grafana/pkg/util/errutil" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + macaron "gopkg.in/macaron.v1" ) func init() { @@ -65,6 +66,7 @@ type HTTPServer struct { httpSrv *http.Server middlewares []macaron.Handler + PluginContextProvider *plugincontext.Provider `inject:""` RouteRegister routing.RouteRegister `inject:""` Bus bus.Bus `inject:""` RenderService rendering.Service `inject:""` diff --git a/pkg/api/plugins.go b/pkg/api/plugins.go index 7b48eddfc7e..1b0a73d89d0 100644 --- a/pkg/api/plugins.go +++ b/pkg/api/plugins.go @@ -5,7 +5,6 @@ import ( "errors" "net/http" "sort" - "time" "github.com/grafana/grafana-plugin-sdk-go/backend" @@ -14,53 +13,9 @@ import ( "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/plugins" - "github.com/grafana/grafana/pkg/plugins/adapters" "github.com/grafana/grafana/pkg/plugins/backendplugin" - "github.com/grafana/grafana/pkg/util/errutil" ) -// ErrPluginNotFound is returned when an requested plugin is not installed. -var ErrPluginNotFound error = errors.New("plugin not found, no installed plugin with that id") - -func (hs *HTTPServer) getPluginContext(pluginID string, user *models.SignedInUser) (backend.PluginContext, error) { - pc := backend.PluginContext{} - plugin := hs.PluginManager.GetPlugin(pluginID) - if plugin == nil { - return pc, ErrPluginNotFound - } - - jsonData := json.RawMessage{} - decryptedSecureJSONData := map[string]string{} - var updated time.Time - - ps, err := hs.getCachedPluginSettings(pluginID, user) - if err != nil { - // models.ErrPluginSettingNotFound is expected if there's no row found for plugin setting in database (if non-app plugin). - // If it's not this expected error something is wrong with cache or database and we return the error to the client. - if !errors.Is(err, models.ErrPluginSettingNotFound) { - return pc, errutil.Wrap("Failed to get plugin settings", err) - } - } else { - jsonData, err = json.Marshal(ps.JsonData) - if err != nil { - return pc, errutil.Wrap("Failed to unmarshal plugin json data", err) - } - decryptedSecureJSONData = ps.DecryptedValues() - updated = ps.Updated - } - - return backend.PluginContext{ - OrgID: user.OrgId, - PluginID: plugin.Id, - User: adapters.BackendUserFromSignedInUser(user), - AppInstanceSettings: &backend.AppInstanceSettings{ - JSONData: jsonData, - DecryptedSecureJSONData: decryptedSecureJSONData, - Updated: updated, - }, - }, nil -} - func (hs *HTTPServer) GetPluginList(c *models.ReqContext) response.Response { typeFilter := c.Query("type") enabledFilter := c.Query("enabled") @@ -285,14 +240,13 @@ func (hs *HTTPServer) CollectPluginMetrics(c *models.ReqContext) response.Respon func (hs *HTTPServer) CheckHealth(c *models.ReqContext) response.Response { pluginID := c.Params("pluginId") - pCtx, err := hs.getPluginContext(pluginID, c.SignedInUser) + pCtx, found, err := hs.PluginContextProvider.Get(pluginID, "", c.SignedInUser) if err != nil { - if errors.Is(err, ErrPluginNotFound) { - return response.Error(404, "Plugin not found", nil) - } - return response.Error(500, "Failed to get plugin settings", err) } + if !found { + return response.Error(404, "Plugin not found", nil) + } resp, err := hs.BackendPluginManager.CheckHealth(c.Req.Context(), pCtx) if err != nil { @@ -328,39 +282,19 @@ func (hs *HTTPServer) CheckHealth(c *models.ReqContext) response.Response { func (hs *HTTPServer) CallResource(c *models.ReqContext) { pluginID := c.Params("pluginId") - pCtx, err := hs.getPluginContext(pluginID, c.SignedInUser) + pCtx, found, err := hs.PluginContextProvider.Get(pluginID, "", c.SignedInUser) if err != nil { - if errors.Is(err, ErrPluginNotFound) { - c.JsonApiErr(404, "Plugin not found", nil) - return - } - c.JsonApiErr(500, "Failed to get plugin settings", err) return } + if !found { + c.JsonApiErr(404, "Plugin not found", nil) + return + } hs.BackendPluginManager.CallResource(pCtx, c, c.Params("*")) } -func (hs *HTTPServer) getCachedPluginSettings(pluginID string, user *models.SignedInUser) (*models.PluginSetting, error) { - cacheKey := "plugin-setting-" + pluginID - - if cached, found := hs.CacheService.Get(cacheKey); found { - ps := cached.(*models.PluginSetting) - if ps.OrgId == user.OrgId { - return ps, nil - } - } - - query := models.GetPluginSettingByIdQuery{PluginId: pluginID, OrgId: user.OrgId} - if err := hs.Bus.Dispatch(&query); err != nil { - return nil, err - } - - hs.CacheService.Set(cacheKey, query.Result, time.Second*5) - return query.Result, nil -} - -func (hs *HTTPServer) GetPluginErrorsList(c *models.ReqContext) response.Response { +func (hs *HTTPServer) GetPluginErrorsList(_ *models.ReqContext) response.Response { return response.JSON(200, hs.PluginManager.ScanningErrors()) } diff --git a/pkg/plugins/backendplugin/coreplugin/core_plugin.go b/pkg/plugins/backendplugin/coreplugin/core_plugin.go index bfe28e35f75..0c2af2bd572 100644 --- a/pkg/plugins/backendplugin/coreplugin/core_plugin.go +++ b/pkg/plugins/backendplugin/coreplugin/core_plugin.go @@ -19,6 +19,7 @@ type corePlugin struct { backend.CheckHealthHandler backend.CallResourceHandler backend.QueryDataHandler + backend.StreamHandler } // New returns a new backendplugin.PluginFactoryFunc for creating a core (built-in) backendplugin.Plugin. @@ -30,6 +31,7 @@ func New(opts backend.ServeOpts) backendplugin.PluginFactoryFunc { CheckHealthHandler: opts.CheckHealthHandler, CallResourceHandler: opts.CallResourceHandler, QueryDataHandler: opts.QueryDataHandler, + StreamHandler: opts.StreamHandler, }, nil } } @@ -55,9 +57,7 @@ func (cp *corePlugin) DataQuery(ctx context.Context, dsInfo *models.DataSource, } func (cp *corePlugin) Start(ctx context.Context) error { - if cp.QueryDataHandler != nil { - cp.isDataPlugin = true - } + cp.isDataPlugin = cp.QueryDataHandler != nil return nil } @@ -92,3 +92,17 @@ func (cp *corePlugin) CallResource(ctx context.Context, req *backend.CallResourc return backendplugin.ErrMethodNotImplemented } + +func (cp *corePlugin) CanSubscribeToStream(ctx context.Context, req *backend.SubscribeToStreamRequest) (*backend.SubscribeToStreamResponse, error) { + if cp.StreamHandler != nil { + return cp.StreamHandler.CanSubscribeToStream(ctx, req) + } + return nil, backendplugin.ErrMethodNotImplemented +} + +func (cp *corePlugin) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender backend.StreamPacketSender) error { + if cp.StreamHandler != nil { + return cp.StreamHandler.RunStream(ctx, req, sender) + } + return backendplugin.ErrMethodNotImplemented +} diff --git a/pkg/plugins/backendplugin/grpcplugin/client.go b/pkg/plugins/backendplugin/grpcplugin/client.go index 2b495ac8624..81538cab34a 100644 --- a/pkg/plugins/backendplugin/grpcplugin/client.go +++ b/pkg/plugins/backendplugin/grpcplugin/client.go @@ -74,6 +74,7 @@ func getV2PluginSet() goplugin.PluginSet { "diagnostics": &grpcplugin.DiagnosticsGRPCPlugin{}, "resource": &grpcplugin.ResourceGRPCPlugin{}, "data": &grpcplugin.DataGRPCPlugin{}, + "stream": &grpcplugin.StreamGRPCPlugin{}, "renderer": &pluginextensionv2.RendererGRPCPlugin{}, } } @@ -120,4 +121,5 @@ type LegacyClient struct { type Client struct { DataPlugin grpcplugin.DataClient RendererPlugin pluginextensionv2.RendererPlugin + StreamClient grpcplugin.StreamClient } diff --git a/pkg/plugins/backendplugin/grpcplugin/client_v1.go b/pkg/plugins/backendplugin/grpcplugin/client_v1.go index 7fe1fe2573c..bedf3cd30d6 100644 --- a/pkg/plugins/backendplugin/grpcplugin/client_v1.go +++ b/pkg/plugins/backendplugin/grpcplugin/client_v1.go @@ -63,6 +63,14 @@ func (c *clientV1) CallResource(ctx context.Context, req *backend.CallResourceRe return backendplugin.ErrMethodNotImplemented } +func (c *clientV1) CanSubscribeToStream(ctx context.Context, request *backend.SubscribeToStreamRequest) (*backend.SubscribeToStreamResponse, error) { + return nil, backendplugin.ErrMethodNotImplemented +} + +func (c *clientV1) RunStream(ctx context.Context, request *backend.RunStreamRequest, sender backend.StreamPacketSender) error { + return backendplugin.ErrMethodNotImplemented +} + type datasourceV1QueryFunc func(ctx context.Context, req *datasourceV1.DatasourceRequest) (*datasourceV1.DatasourceResponse, error) func (fn datasourceV1QueryFunc) Query(ctx context.Context, req *datasourceV1.DatasourceRequest) (*datasourceV1.DatasourceResponse, error) { diff --git a/pkg/plugins/backendplugin/grpcplugin/client_v2.go b/pkg/plugins/backendplugin/grpcplugin/client_v2.go index 68a175f492d..17501184cd1 100644 --- a/pkg/plugins/backendplugin/grpcplugin/client_v2.go +++ b/pkg/plugins/backendplugin/grpcplugin/client_v2.go @@ -23,6 +23,7 @@ type clientV2 struct { grpcplugin.DiagnosticsClient grpcplugin.ResourceClient grpcplugin.DataClient + grpcplugin.StreamClient pluginextensionv2.RendererPlugin } @@ -42,6 +43,11 @@ func newClientV2(descriptor PluginDescriptor, logger log.Logger, rpcClient plugi return nil, err } + rawStream, err := rpcClient.Dispense("stream") + if err != nil { + return nil, err + } + rawRenderer, err := rpcClient.Dispense("renderer") if err != nil { return nil, err @@ -66,6 +72,12 @@ func newClientV2(descriptor PluginDescriptor, logger log.Logger, rpcClient plugi } } + if rawStream != nil { + if plugin, ok := rawStream.(grpcplugin.StreamClient); ok { + c.StreamClient = plugin + } + } + if rawRenderer != nil { if plugin, ok := rawRenderer.(pluginextensionv2.RendererPlugin); ok { c.RendererPlugin = plugin @@ -76,6 +88,7 @@ func newClientV2(descriptor PluginDescriptor, logger log.Logger, rpcClient plugi client := &Client{ DataPlugin: c.DataClient, RendererPlugin: c.RendererPlugin, + StreamClient: c.StreamClient, } if err := descriptor.startFns.OnStart(descriptor.pluginID, client, logger); err != nil { return nil, err @@ -158,6 +171,48 @@ func (c *clientV2) CallResource(ctx context.Context, req *backend.CallResourceRe } } +func (c *clientV2) CanSubscribeToStream(ctx context.Context, req *backend.SubscribeToStreamRequest) (*backend.SubscribeToStreamResponse, error) { + if c.StreamClient == nil { + return nil, backendplugin.ErrMethodNotImplemented + } + protoResp, err := c.StreamClient.CanSubscribeToStream(ctx, backend.ToProto().SubscribeToStreamRequest(req)) + if err != nil { + return nil, err + } + return backend.FromProto().SubscribeToStreamResponse(protoResp), nil +} + +func (c *clientV2) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender backend.StreamPacketSender) error { + if c.StreamClient == nil { + return backendplugin.ErrMethodNotImplemented + } + + protoReq := backend.ToProto().RunStreamRequest(req) + protoStream, err := c.StreamClient.RunStream(ctx, protoReq) + if err != nil { + if status.Code(err) == codes.Unimplemented { + return backendplugin.ErrMethodNotImplemented + } + return errutil.Wrap("Failed to call resource", err) + } + + for { + protoResp, err := protoStream.Recv() + if err != nil { + if status.Code(err) == codes.Unimplemented { + return backendplugin.ErrMethodNotImplemented + } + if errors.Is(err, io.EOF) { + return nil + } + return errutil.Wrap("failed to receive call resource response", err) + } + if err := sender.Send(backend.FromProto().StreamPacket(protoResp)); err != nil { + return err + } + } +} + type dataClientQueryDataFunc func(ctx context.Context, req *pluginv2.QueryDataRequest, opts ...grpc.CallOption) (*pluginv2.QueryDataResponse, error) func (fn dataClientQueryDataFunc) QueryData(ctx context.Context, req *pluginv2.QueryDataRequest, opts ...grpc.CallOption) (*pluginv2.QueryDataResponse, error) { diff --git a/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go b/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go index f724b07f9ad..1aa0e3a82bd 100644 --- a/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go +++ b/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go @@ -15,6 +15,7 @@ type pluginClient interface { backend.CollectMetricsHandler backend.CheckHealthHandler backend.CallResourceHandler + backend.StreamHandler } type grpcPlugin struct { @@ -138,3 +139,27 @@ func (p *grpcPlugin) CallResource(ctx context.Context, req *backend.CallResource return pluginClient.CallResource(ctx, req, sender) } + +func (p *grpcPlugin) CanSubscribeToStream(ctx context.Context, request *backend.SubscribeToStreamRequest) (*backend.SubscribeToStreamResponse, error) { + p.mutex.RLock() + if p.client == nil || p.client.Exited() || p.pluginClient == nil { + p.mutex.RUnlock() + return nil, backendplugin.ErrPluginUnavailable + } + pluginClient := p.pluginClient + p.mutex.RUnlock() + + return pluginClient.CanSubscribeToStream(ctx, request) +} + +func (p *grpcPlugin) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender backend.StreamPacketSender) error { + p.mutex.RLock() + if p.client == nil || p.client.Exited() || p.pluginClient == nil { + p.mutex.RUnlock() + return backendplugin.ErrPluginUnavailable + } + pluginClient := p.pluginClient + p.mutex.RUnlock() + + return pluginClient.RunStream(ctx, req, sender) +} diff --git a/pkg/plugins/backendplugin/ifaces.go b/pkg/plugins/backendplugin/ifaces.go index 832d0aab8c0..54e39f98a15 100644 --- a/pkg/plugins/backendplugin/ifaces.go +++ b/pkg/plugins/backendplugin/ifaces.go @@ -20,6 +20,8 @@ type Manager interface { CheckHealth(ctx context.Context, pCtx backend.PluginContext) (*backend.CheckHealthResult, error) // CallResource calls a plugin resource. CallResource(pluginConfig backend.PluginContext, ctx *models.ReqContext, path string) + // Get plugin by its ID. + Get(pluginID string) (Plugin, bool) // GetDataPlugin gets a DataPlugin with a certain ID or nil if it doesn't exist. // TODO: interface{} is the return type in order to break a dependency cycle. Should be plugins.DataPlugin. GetDataPlugin(pluginID string) interface{} @@ -37,4 +39,5 @@ type Plugin interface { backend.CollectMetricsHandler backend.CheckHealthHandler backend.CallResourceHandler + backend.StreamHandler } diff --git a/pkg/plugins/backendplugin/manager/manager.go b/pkg/plugins/backendplugin/manager/manager.go index a903d4bb2f5..bcf9149584f 100644 --- a/pkg/plugins/backendplugin/manager/manager.go +++ b/pkg/plugins/backendplugin/manager/manager.go @@ -96,6 +96,11 @@ func (m *manager) Register(pluginID string, factory backendplugin.PluginFactoryF return nil } +func (m *manager) Get(pluginID string) (backendplugin.Plugin, bool) { + p, ok := m.plugins[pluginID] + return p, ok +} + func (m *manager) getAWSEnvironmentVariables() []string { variables := []string{} if m.Cfg.AWSAssumeRoleEnabled { diff --git a/pkg/plugins/backendplugin/manager/manager_test.go b/pkg/plugins/backendplugin/manager/manager_test.go index 2e354880d49..e95e22c5317 100644 --- a/pkg/plugins/backendplugin/manager/manager_test.go +++ b/pkg/plugins/backendplugin/manager/manager_test.go @@ -396,6 +396,14 @@ func (tp *testPlugin) CallResource(ctx context.Context, req *backend.CallResourc return backendplugin.ErrMethodNotImplemented } +func (tp *testPlugin) CanSubscribeToStream(ctx context.Context, request *backend.SubscribeToStreamRequest) (*backend.SubscribeToStreamResponse, error) { + return nil, backendplugin.ErrMethodNotImplemented +} + +func (tp *testPlugin) RunStream(ctx context.Context, request *backend.RunStreamRequest, sender backend.StreamPacketSender) error { + return backendplugin.ErrMethodNotImplemented +} + type testLicensingService struct { edition string hasLicense bool diff --git a/pkg/plugins/datasource_plugin_wrapper_v2.go b/pkg/plugins/datasource_plugin_wrapper_v2.go index 33b3fb68585..3455c6fc4d1 100644 --- a/pkg/plugins/datasource_plugin_wrapper_v2.go +++ b/pkg/plugins/datasource_plugin_wrapper_v2.go @@ -14,8 +14,8 @@ import ( "github.com/grafana/grafana/pkg/services/oauthtoken" ) -func newDataSourcePluginWrapperV2(log log.Logger, pluginId, pluginType string, client grpcplugin.DataClient) *DatasourcePluginWrapperV2 { - return &DatasourcePluginWrapperV2{DataClient: client, logger: log, pluginId: pluginId, pluginType: pluginType} +func newDataSourcePluginWrapperV2(log log.Logger, pluginId, pluginType string, dataClient grpcplugin.DataClient) *DatasourcePluginWrapperV2 { + return &DatasourcePluginWrapperV2{DataClient: dataClient, logger: log, pluginId: pluginId, pluginType: pluginType} } type DatasourcePluginWrapperV2 struct { diff --git a/pkg/plugins/plugincontext/plugincontext.go b/pkg/plugins/plugincontext/plugincontext.go new file mode 100644 index 00000000000..cf2c2857b99 --- /dev/null +++ b/pkg/plugins/plugincontext/plugincontext.go @@ -0,0 +1,118 @@ +package plugincontext + +import ( + "encoding/json" + "errors" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + + "github.com/grafana/grafana/pkg/bus" + "github.com/grafana/grafana/pkg/infra/localcache" + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana/pkg/plugins/adapters" + "github.com/grafana/grafana/pkg/registry" + "github.com/grafana/grafana/pkg/services/datasources" + "github.com/grafana/grafana/pkg/util/errutil" +) + +func init() { + registry.Register(®istry.Descriptor{ + Name: "PluginContextProvider", + Instance: newProvider(), + }) +} + +func newProvider() *Provider { + return &Provider{} +} + +type Provider struct { + Bus bus.Bus `inject:""` + CacheService *localcache.CacheService `inject:""` + PluginManager plugins.Manager `inject:""` + DatasourceCache datasources.CacheService `inject:""` +} + +func (p *Provider) Init() error { + return nil +} + +// Get allows getting plugin context by its id. If datasourceUID is not empty string +// then PluginContext.DataSourceInstanceSettings will be resolved and appended to +// returned context. +func (p *Provider) Get(pluginID string, datasourceUID string, user *models.SignedInUser) (backend.PluginContext, bool, error) { + pc := backend.PluginContext{} + plugin := p.PluginManager.GetPlugin(pluginID) + if plugin == nil { + return pc, false, nil + } + + jsonData := json.RawMessage{} + decryptedSecureJSONData := map[string]string{} + var updated time.Time + + ps, err := p.getCachedPluginSettings(pluginID, user) + if err != nil { + // models.ErrPluginSettingNotFound is expected if there's no row found for plugin setting in database (if non-app plugin). + // If it's not this expected error something is wrong with cache or database and we return the error to the client. + if !errors.Is(err, models.ErrPluginSettingNotFound) { + return pc, false, errutil.Wrap("Failed to get plugin settings", err) + } + } else { + jsonData, err = json.Marshal(ps.JsonData) + if err != nil { + return pc, false, errutil.Wrap("Failed to unmarshal plugin json data", err) + } + decryptedSecureJSONData = ps.DecryptedValues() + updated = ps.Updated + } + + pCtx := backend.PluginContext{ + OrgID: user.OrgId, + PluginID: plugin.Id, + User: adapters.BackendUserFromSignedInUser(user), + AppInstanceSettings: &backend.AppInstanceSettings{ + JSONData: jsonData, + DecryptedSecureJSONData: decryptedSecureJSONData, + Updated: updated, + }, + } + + if datasourceUID != "" { + ds, err := p.DatasourceCache.GetDatasourceByUID(datasourceUID, user, false) + if err != nil { + return pc, false, errutil.Wrap("Failed to get datasource", err) + } + datasourceSettings, err := adapters.ModelToInstanceSettings(ds) + if err != nil { + return pc, false, errutil.Wrap("Failed to convert datasource", err) + } + pCtx.DataSourceInstanceSettings = datasourceSettings + } + + return pCtx, true, nil +} + +const pluginSettingsCacheTTL = 5 * time.Second +const pluginSettingsCachePrefix = "plugin-setting-" + +func (p *Provider) getCachedPluginSettings(pluginID string, user *models.SignedInUser) (*models.PluginSetting, error) { + cacheKey := pluginSettingsCachePrefix + pluginID + + if cached, found := p.CacheService.Get(cacheKey); found { + ps := cached.(*models.PluginSetting) + if ps.OrgId == user.OrgId { + return ps, nil + } + } + + query := models.GetPluginSettingByIdQuery{PluginId: pluginID, OrgId: user.OrgId} + if err := p.Bus.Dispatch(&query); err != nil { + return nil, err + } + + p.CacheService.Set(cacheKey, query.Result, pluginSettingsCacheTTL) + return query.Result, nil +} diff --git a/pkg/services/live/channel.go b/pkg/services/live/channel.go index d5374a5ee5e..962411104f5 100644 --- a/pkg/services/live/channel.go +++ b/pkg/services/live/channel.go @@ -6,13 +6,14 @@ import ( // ChannelAddress is the channel ID split by parts. type ChannelAddress struct { - // Scope is "grafana", "ds", or "plugin". + // Scope is one of available channel scopes: + // like ScopeGrafana, ScopePlugin, ScopeDatasource. Scope string `json:"scope,omitempty"` // Namespace meaning depends on the scope. - // * when grafana, namespace is a "feature" - // * when ds, namespace is the datasource id - // * when plugin, namespace is the plugin name + // * when ScopeGrafana, namespace is a "feature" + // * when ScopePlugin, namespace is the plugin name + // * when ScopeDatasource, namespace is the datasource uid Namespace string `json:"namespace,omitempty"` // Within each namespace, the handler can process the path as needed. diff --git a/pkg/services/live/context.go b/pkg/services/live/context.go new file mode 100644 index 00000000000..4d3163293c0 --- /dev/null +++ b/pkg/services/live/context.go @@ -0,0 +1,24 @@ +package live + +import ( + "context" + + "github.com/grafana/grafana/pkg/models" +) + +type signedUserContextKeyType int + +var signedUserContextKey signedUserContextKeyType + +func setContextSignedUser(ctx context.Context, user *models.SignedInUser) context.Context { + ctx = context.WithValue(ctx, signedUserContextKey, user) + return ctx +} + +func getContextSignedUser(ctx context.Context) (*models.SignedInUser, bool) { + if val := ctx.Value(signedUserContextKey); val != nil { + user, ok := val.(*models.SignedInUser) + return user, ok + } + return nil, false +} diff --git a/pkg/services/live/features/mock.go b/pkg/services/live/features/mock.go new file mode 100644 index 00000000000..d8d4244673f --- /dev/null +++ b/pkg/services/live/features/mock.go @@ -0,0 +1,164 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/grafana/grafana/pkg/services/live/features (interfaces: ChannelPublisher,PresenceGetter,PluginContextGetter,StreamRunner) + +// Package features is a generated GoMock package. +package features + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + backend "github.com/grafana/grafana-plugin-sdk-go/backend" +) + +// MockChannelPublisher is a mock of ChannelPublisher interface. +type MockChannelPublisher struct { + ctrl *gomock.Controller + recorder *MockChannelPublisherMockRecorder +} + +// MockChannelPublisherMockRecorder is the mock recorder for MockChannelPublisher. +type MockChannelPublisherMockRecorder struct { + mock *MockChannelPublisher +} + +// NewMockChannelPublisher creates a new mock instance. +func NewMockChannelPublisher(ctrl *gomock.Controller) *MockChannelPublisher { + mock := &MockChannelPublisher{ctrl: ctrl} + mock.recorder = &MockChannelPublisherMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockChannelPublisher) EXPECT() *MockChannelPublisherMockRecorder { + return m.recorder +} + +// Publish mocks base method. +func (m *MockChannelPublisher) Publish(arg0 string, arg1 []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Publish", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Publish indicates an expected call of Publish. +func (mr *MockChannelPublisherMockRecorder) Publish(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*MockChannelPublisher)(nil).Publish), arg0, arg1) +} + +// MockPresenceGetter is a mock of PresenceGetter interface. +type MockPresenceGetter struct { + ctrl *gomock.Controller + recorder *MockPresenceGetterMockRecorder +} + +// MockPresenceGetterMockRecorder is the mock recorder for MockPresenceGetter. +type MockPresenceGetterMockRecorder struct { + mock *MockPresenceGetter +} + +// NewMockPresenceGetter creates a new mock instance. +func NewMockPresenceGetter(ctrl *gomock.Controller) *MockPresenceGetter { + mock := &MockPresenceGetter{ctrl: ctrl} + mock.recorder = &MockPresenceGetterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPresenceGetter) EXPECT() *MockPresenceGetterMockRecorder { + return m.recorder +} + +// GetNumSubscribers mocks base method. +func (m *MockPresenceGetter) GetNumSubscribers(arg0 string) (int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetNumSubscribers", arg0) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetNumSubscribers indicates an expected call of GetNumSubscribers. +func (mr *MockPresenceGetterMockRecorder) GetNumSubscribers(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNumSubscribers", reflect.TypeOf((*MockPresenceGetter)(nil).GetNumSubscribers), arg0) +} + +// MockPluginContextGetter is a mock of PluginContextGetter interface. +type MockPluginContextGetter struct { + ctrl *gomock.Controller + recorder *MockPluginContextGetterMockRecorder +} + +// MockPluginContextGetterMockRecorder is the mock recorder for MockPluginContextGetter. +type MockPluginContextGetterMockRecorder struct { + mock *MockPluginContextGetter +} + +// NewMockPluginContextGetter creates a new mock instance. +func NewMockPluginContextGetter(ctrl *gomock.Controller) *MockPluginContextGetter { + mock := &MockPluginContextGetter{ctrl: ctrl} + mock.recorder = &MockPluginContextGetterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPluginContextGetter) EXPECT() *MockPluginContextGetterMockRecorder { + return m.recorder +} + +// GetPluginContext mocks base method. +func (m *MockPluginContextGetter) GetPluginContext(arg0 context.Context, arg1, arg2 string) (backend.PluginContext, bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPluginContext", arg0, arg1, arg2) + ret0, _ := ret[0].(backend.PluginContext) + ret1, _ := ret[1].(bool) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// GetPluginContext indicates an expected call of GetPluginContext. +func (mr *MockPluginContextGetterMockRecorder) GetPluginContext(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPluginContext", reflect.TypeOf((*MockPluginContextGetter)(nil).GetPluginContext), arg0, arg1, arg2) +} + +// MockStreamRunner is a mock of StreamRunner interface. +type MockStreamRunner struct { + ctrl *gomock.Controller + recorder *MockStreamRunnerMockRecorder +} + +// MockStreamRunnerMockRecorder is the mock recorder for MockStreamRunner. +type MockStreamRunnerMockRecorder struct { + mock *MockStreamRunner +} + +// NewMockStreamRunner creates a new mock instance. +func NewMockStreamRunner(ctrl *gomock.Controller) *MockStreamRunner { + mock := &MockStreamRunner{ctrl: ctrl} + mock.recorder = &MockStreamRunnerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStreamRunner) EXPECT() *MockStreamRunnerMockRecorder { + return m.recorder +} + +// RunStream mocks base method. +func (m *MockStreamRunner) RunStream(arg0 context.Context, arg1 *backend.RunStreamRequest, arg2 backend.StreamPacketSender) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RunStream", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// RunStream indicates an expected call of RunStream. +func (mr *MockStreamRunnerMockRecorder) RunStream(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunStream", reflect.TypeOf((*MockStreamRunner)(nil).RunStream), arg0, arg1, arg2) +} diff --git a/pkg/services/live/features/plugin.go b/pkg/services/live/features/plugin.go new file mode 100644 index 00000000000..900b3dffc6f --- /dev/null +++ b/pkg/services/live/features/plugin.go @@ -0,0 +1,122 @@ +package features + +import ( + "context" + "fmt" + + "github.com/centrifugal/centrifuge" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/models" +) + +//go:generate mockgen -destination=mock.go -package=features github.com/grafana/grafana/pkg/services/live/features ChannelPublisher,PresenceGetter,PluginContextGetter,StreamRunner + +type ChannelPublisher interface { + Publish(channel string, data []byte) error +} + +type PresenceGetter interface { + GetNumSubscribers(channel string) (int, error) +} + +type PluginContextGetter interface { + GetPluginContext(ctx context.Context, pluginID string, datasourceUID string) (backend.PluginContext, bool, error) +} + +type StreamRunner interface { + RunStream(ctx context.Context, request *backend.RunStreamRequest, sender backend.StreamPacketSender) error +} + +type streamSender struct { + channel string + channelPublisher ChannelPublisher +} + +func newStreamSender(channel string, publisher ChannelPublisher) *streamSender { + return &streamSender{channel: channel, channelPublisher: publisher} +} + +func (p *streamSender) Send(packet *backend.StreamPacket) error { + return p.channelPublisher.Publish(p.channel, packet.Payload) +} + +// PluginRunner can handle streaming operations for channels belonging to plugins. +type PluginRunner struct { + pluginID string + datasourceUID string + pluginContextGetter PluginContextGetter + handler backend.StreamHandler + streamManager *StreamManager +} + +// NewPluginRunner creates new PluginRunner. +func NewPluginRunner(pluginID string, datasourceUID string, streamManager *StreamManager, pluginContextGetter PluginContextGetter, handler backend.StreamHandler) *PluginRunner { + return &PluginRunner{ + pluginID: pluginID, + datasourceUID: datasourceUID, + pluginContextGetter: pluginContextGetter, + handler: handler, + streamManager: streamManager, + } +} + +// GetHandlerForPath gets the handler for a path. +func (m *PluginRunner) GetHandlerForPath(path string) (models.ChannelHandler, error) { + return &PluginPathRunner{ + path: path, + pluginID: m.pluginID, + datasourceUID: m.datasourceUID, + streamManager: m.streamManager, + handler: m.handler, + pluginContextGetter: m.pluginContextGetter, + }, nil +} + +// PluginPathRunner can handle streaming operations for channels belonging to plugin specific path. +type PluginPathRunner struct { + path string + pluginID string + datasourceUID string + streamManager *StreamManager + handler backend.StreamHandler + pluginContextGetter PluginContextGetter +} + +// OnSubscribe passes control to a plugin. +func (r *PluginPathRunner) OnSubscribe(client *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) { + pCtx, found, err := r.pluginContextGetter.GetPluginContext(client.Context(), r.pluginID, r.datasourceUID) + if err != nil { + logger.Error("Get plugin context error", "error", err, "path", r.path) + return centrifuge.SubscribeReply{}, err + } + if !found { + logger.Error("Plugin context not found", "path", r.path) + return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal + } + resp, err := r.handler.CanSubscribeToStream(client.Context(), &backend.SubscribeToStreamRequest{ + PluginContext: pCtx, + Path: r.path, + }) + if err != nil { + logger.Error("Plugin CanSubscribeToStream call error", "error", err, "path", r.path) + return centrifuge.SubscribeReply{}, err + } + if !resp.OK { + return centrifuge.SubscribeReply{}, centrifuge.ErrorPermissionDenied + } + err = r.streamManager.SubmitStream(e.Channel, r.path, pCtx, r.handler) + if err != nil { + logger.Error("Error submitting stream to manager", "error", err, "path", r.path) + return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal + } + return centrifuge.SubscribeReply{ + Options: centrifuge.SubscribeOptions{ + Presence: true, + }, + }, nil +} + +// OnPublish passes control to a plugin. +func (r *PluginPathRunner) OnPublish(_ *centrifuge.Client, _ centrifuge.PublishEvent) (centrifuge.PublishReply, error) { + return centrifuge.PublishReply{}, fmt.Errorf("not implemented yet") +} diff --git a/pkg/services/live/features/stream.go b/pkg/services/live/features/stream.go new file mode 100644 index 00000000000..8ef23f08e28 --- /dev/null +++ b/pkg/services/live/features/stream.go @@ -0,0 +1,173 @@ +package features + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend" +) + +// StreamManager manages streams from Grafana to plugins. +type StreamManager struct { + mu sync.RWMutex + streams map[string]struct{} + presenceGetter PresenceGetter + channelPublisher ChannelPublisher + registerCh chan streamRequest + closedCh chan struct{} + checkInterval time.Duration + maxChecks int +} + +// StreamManagerOption modifies StreamManager behavior (used for tests for example). +type StreamManagerOption func(*StreamManager) + +// WithCheckConfig allows setting custom check rules. +func WithCheckConfig(interval time.Duration, maxChecks int) StreamManagerOption { + return func(sm *StreamManager) { + sm.checkInterval = interval + sm.maxChecks = maxChecks + } +} + +const ( + defaultCheckInterval = 5 * time.Second + defaultMaxChecks = 3 +) + +// NewStreamManager creates new StreamManager. +func NewStreamManager(chPublisher ChannelPublisher, presenceGetter PresenceGetter, opts ...StreamManagerOption) *StreamManager { + sm := &StreamManager{ + streams: make(map[string]struct{}), + channelPublisher: chPublisher, + presenceGetter: presenceGetter, + registerCh: make(chan streamRequest), + closedCh: make(chan struct{}), + checkInterval: defaultCheckInterval, + maxChecks: defaultMaxChecks, + } + for _, opt := range opts { + opt(sm) + } + return sm +} + +func (s *StreamManager) stopStream(sr streamRequest, cancelFn func()) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.streams, sr.Channel) + cancelFn() +} + +func (s *StreamManager) watchStream(ctx context.Context, cancelFn func(), sr streamRequest) { + numNoSubscribersChecks := 0 + for { + select { + case <-ctx.Done(): + return + case <-time.After(s.checkInterval): + numSubscribers, err := s.presenceGetter.GetNumSubscribers(sr.Channel) + if err != nil { + logger.Error("Error checking num subscribers", "channel", sr.Channel, "path", sr.Path) + continue + } + if numSubscribers > 0 { + // reset counter since channel has active subscribers. + numNoSubscribersChecks = 0 + continue + } + numNoSubscribersChecks++ + if numNoSubscribersChecks >= s.maxChecks { + logger.Info("Stop stream since no active subscribers", "channel", sr.Channel, "path", sr.Path) + s.stopStream(sr, cancelFn) + return + } + } + } +} + +// run stream until context canceled. +func (s *StreamManager) runStream(ctx context.Context, sr streamRequest) { + for { + select { + case <-ctx.Done(): + return + default: + } + err := sr.StreamRunner.RunStream( + ctx, + &backend.RunStreamRequest{ + PluginContext: sr.PluginContext, + Path: sr.Path, + }, + newStreamSender(sr.Channel, s.channelPublisher), + ) + if err != nil { + if errors.Is(ctx.Err(), context.Canceled) { + logger.Info("Stream cleanly finished", "path", sr.Path) + return + } + logger.Error("Error running stream, retrying", "path", sr.Path, "error", err) + continue + } + logger.Warn("Stream finished without error?", "path", sr.Path) + return + } +} + +func (s *StreamManager) registerStream(ctx context.Context, sr streamRequest) { + s.mu.Lock() + if _, ok := s.streams[sr.Channel]; ok { + logger.Debug("Skip running new stream (already exists)", "path", sr.Path) + s.mu.Unlock() + return + } + ctx, cancel := context.WithCancel(ctx) + defer cancel() + s.streams[sr.Channel] = struct{}{} + s.mu.Unlock() + + go s.watchStream(ctx, cancel, sr) + s.runStream(ctx, sr) +} + +// Run StreamManager till context canceled. +func (s *StreamManager) Run(ctx context.Context) error { + for { + select { + case sr := <-s.registerCh: + go s.registerStream(ctx, sr) + case <-ctx.Done(): + close(s.closedCh) + return ctx.Err() + } + } +} + +type streamRequest struct { + Channel string + Path string + PluginContext backend.PluginContext + StreamRunner StreamRunner +} + +// SubmitStream submits stream handler in StreamManager to manage. +// The stream will be opened and kept till channel has active subscribers. +func (s *StreamManager) SubmitStream(channel string, path string, pCtx backend.PluginContext, streamRunner StreamRunner) error { + select { + case <-s.closedCh: + close(s.registerCh) + return nil + case s.registerCh <- streamRequest{ + Channel: channel, + Path: path, + PluginContext: pCtx, + StreamRunner: streamRunner, + }: + case <-time.After(time.Second): + return errors.New("timeout") + } + return nil +} diff --git a/pkg/services/live/features/stream_test.go b/pkg/services/live/features/stream_test.go new file mode 100644 index 00000000000..5d42ff8ed05 --- /dev/null +++ b/pkg/services/live/features/stream_test.go @@ -0,0 +1,129 @@ +package features + +import ( + "context" + "testing" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" +) + +// wait until channel closed with timeout. +func waitWithTimeout(tb testing.TB, ch chan struct{}, timeout time.Duration) { + select { + case <-ch: + case <-time.After(timeout): + tb.Fatal("timeout") + } +} + +func TestStreamManager_Run(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mockChannelPublisher := NewMockChannelPublisher(mockCtrl) + mockPresenceGetter := NewMockPresenceGetter(mockCtrl) + + manager := NewStreamManager(mockChannelPublisher, mockPresenceGetter) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + cancel() + }() + + err := manager.Run(ctx) + require.ErrorIs(t, err, context.Canceled) +} + +func TestStreamManager_SubmitStream_Send(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mockChannelPublisher := NewMockChannelPublisher(mockCtrl) + mockPresenceGetter := NewMockPresenceGetter(mockCtrl) + + manager := NewStreamManager(mockChannelPublisher, mockPresenceGetter) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + _ = manager.Run(ctx) + }() + + startedCh := make(chan struct{}) + doneCh := make(chan struct{}) + + mockChannelPublisher.EXPECT().Publish("test", []byte("test")).Times(1) + + mockStreamRunner := NewMockStreamRunner(mockCtrl) + mockStreamRunner.EXPECT().RunStream( + gomock.Any(), gomock.Any(), gomock.Any(), + ).Do(func(ctx context.Context, req *backend.RunStreamRequest, sender backend.StreamPacketSender) error { + require.Equal(t, "test", req.Path) + close(startedCh) + err := sender.Send(&backend.StreamPacket{ + Payload: []byte("test"), + }) + require.NoError(t, err) + <-ctx.Done() + close(doneCh) + return ctx.Err() + }).Times(1) + + err := manager.SubmitStream("test", "test", backend.PluginContext{}, mockStreamRunner) + require.NoError(t, err) + + // try submit the same. + err = manager.SubmitStream("test", "test", backend.PluginContext{}, mockStreamRunner) + require.NoError(t, err) + + waitWithTimeout(t, startedCh, time.Second) + require.Len(t, manager.streams, 1) + cancel() + waitWithTimeout(t, doneCh, time.Second) +} + +func TestStreamManager_SubmitStream_CloseNoSubscribers(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mockChannelPublisher := NewMockChannelPublisher(mockCtrl) + mockPresenceGetter := NewMockPresenceGetter(mockCtrl) + + manager := NewStreamManager( + mockChannelPublisher, + mockPresenceGetter, + WithCheckConfig(10*time.Millisecond, 3), + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + _ = manager.Run(ctx) + }() + + startedCh := make(chan struct{}) + doneCh := make(chan struct{}) + + mockPresenceGetter.EXPECT().GetNumSubscribers("test").Return(0, nil).Times(3) + + mockStreamRunner := NewMockStreamRunner(mockCtrl) + mockStreamRunner.EXPECT().RunStream(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(ctx context.Context, req *backend.RunStreamRequest, sender backend.StreamPacketSender) error { + close(startedCh) + <-ctx.Done() + close(doneCh) + return ctx.Err() + }).Times(1) + + err := manager.SubmitStream("test", "test", backend.PluginContext{}, mockStreamRunner) + require.NoError(t, err) + + waitWithTimeout(t, startedCh, time.Second) + waitWithTimeout(t, doneCh, time.Second) + require.Len(t, manager.streams, 0) +} diff --git a/pkg/services/live/features/testdata.go b/pkg/services/live/features/testdata.go deleted file mode 100644 index ee41aebd9f1..00000000000 --- a/pkg/services/live/features/testdata.go +++ /dev/null @@ -1,113 +0,0 @@ -package features - -import ( - "encoding/json" - "fmt" - "math/rand" - "time" - - "github.com/centrifugal/centrifuge" - "github.com/grafana/grafana/pkg/models" -) - -// testDataRunner manages all the `grafana/dashboard/*` channels. -type testDataRunner struct { - publisher models.ChannelPublisher - running bool - speedMillis int - dropPercent float64 - channel string - name string -} - -// TestDataSupplier manages all the `grafana/testdata/*` channels. -type TestDataSupplier struct { - Publisher models.ChannelPublisher -} - -// GetHandlerForPath gets the channel handler for a path. -// Called on init. -func (s *TestDataSupplier) GetHandlerForPath(path string) (models.ChannelHandler, error) { - channel := "grafana/testdata/" + path - - if path == "random-2s-stream" { - return &testDataRunner{ - publisher: s.Publisher, - running: false, - speedMillis: 2000, - dropPercent: 0, - channel: channel, - name: path, - }, nil - } - - if path == "random-flakey-stream" { - return &testDataRunner{ - publisher: s.Publisher, - running: false, - speedMillis: 400, - dropPercent: .6, - channel: channel, - }, nil - } - - return nil, fmt.Errorf("unknown channel") -} - -// OnSubscribe will let anyone connect to the path -func (r *testDataRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) { - if !r.running { - r.running = true - - // Run in the background - go r.runRandomCSV() - } - - return centrifuge.SubscribeReply{}, nil -} - -// OnPublish checks if a message from the websocket can be broadcast on this channel -func (r *testDataRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) { - return centrifuge.PublishReply{}, fmt.Errorf("can not publish to testdata") -} - -// runRandomCSV is just for an example. -func (r *testDataRunner) runRandomCSV() { - spread := 50.0 - - walker := rand.Float64() * 100 - ticker := time.NewTicker(time.Duration(r.speedMillis) * time.Millisecond) - - measurement := models.Measurement{ - Name: r.name, - Time: 0, - Values: make(map[string]interface{}, 5), - } - msg := models.MeasurementBatch{ - Measurements: []models.Measurement{measurement}, // always a single measurement - } - - for t := range ticker.C { - if rand.Float64() <= r.dropPercent { - continue - } - delta := rand.Float64() - 0.5 - walker += delta - - measurement.Time = t.UnixNano() / int64(time.Millisecond) - measurement.Values["value"] = walker - measurement.Values["min"] = walker - ((rand.Float64() * spread) + 0.01) - measurement.Values["max"] = walker + ((rand.Float64() * spread) + 0.01) - - bytes, err := json.Marshal(&msg) - if err != nil { - logger.Warn("unable to marshal line", "error", err) - continue - } - - err = r.publisher(r.channel, bytes) - if err != nil { - logger.Warn("write", "channel", r.channel, "measurement", measurement) - } - } -} diff --git a/pkg/services/live/live.go b/pkg/services/live/live.go index 01a42776985..d1e1c72f71c 100644 --- a/pkg/services/live/live.go +++ b/pkg/services/live/live.go @@ -1,15 +1,21 @@ package live import ( + "context" "fmt" "sync" + "time" "github.com/centrifugal/centrifuge" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/api/routing" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/models" - "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana/pkg/plugins/manager" + "github.com/grafana/grafana/pkg/plugins/plugincontext" "github.com/grafana/grafana/pkg/registry" + "github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/live/features" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/tsdb/cloudwatch" @@ -21,13 +27,13 @@ var ( ) func init() { - registry.RegisterService(&GrafanaLive{ + registry.RegisterServiceWithPriority(&GrafanaLive{ channels: make(map[string]models.ChannelHandler), channelsMu: sync.RWMutex{}, GrafanaScope: CoreGrafanaScope{ Features: make(map[string]models.ChannelHandlerFactory), }, - }) + }, registry.Low) } // CoreGrafanaScope list of core features @@ -40,11 +46,14 @@ type CoreGrafanaScope struct { // GrafanaLive pretends to be the server type GrafanaLive struct { - Cfg *setting.Cfg `inject:""` - RouteRegister routing.RouteRegister `inject:""` - LogsService *cloudwatch.LogsService `inject:""` - PluginManager plugins.Manager `inject:""` - node *centrifuge.Node + PluginContextProvider *plugincontext.Provider `inject:""` + Cfg *setting.Cfg `inject:""` + RouteRegister routing.RouteRegister `inject:""` + LogsService *cloudwatch.LogsService `inject:""` + PluginManager *manager.PluginManager `inject:""` + DatasourceCache datasources.CacheService `inject:""` + + node *centrifuge.Node // The websocket handler WebsocketHandler interface{} @@ -55,9 +64,32 @@ type GrafanaLive struct { // The core internal features GrafanaScope CoreGrafanaScope + + contextGetter *pluginContextGetter + streamManager *features.StreamManager } -// Init initializes the instance. +func (g *GrafanaLive) getStreamPlugin(pluginID string) (backend.StreamHandler, error) { + plugin, ok := g.PluginManager.BackendPluginManager.Get(pluginID) + if !ok { + return nil, fmt.Errorf("plugin not found: %s", pluginID) + } + streamHandler, ok := plugin.(backend.StreamHandler) + if !ok { + return nil, fmt.Errorf("%s plugin does not implement StreamHandler: %#v", pluginID, plugin) + } + return streamHandler, nil +} + +func (g *GrafanaLive) Run(ctx context.Context) error { + if g.streamManager != nil { + // Only run stream manager if GrafanaLive properly initialized. + return g.streamManager.Run(ctx) + } + return nil +} + +// Init initializes Live service. // Required to implement the registry.Service interface. func (g *GrafanaLive) Init() error { logger.Debug("GrafanaLive initialization") @@ -76,24 +108,25 @@ func (g *GrafanaLive) Init() error { // Node is the core object in Centrifuge library responsible for many useful // things. For example Node allows to publish messages to channels from server - // side with its Publish method, but in this example we will publish messages - // only from client side. + // side with its Publish method. node, err := centrifuge.New(cfg) if err != nil { return err } g.node = node + g.contextGetter = newPluginContextGetter(g.PluginContextProvider) + + channelPublisher := newPluginChannelPublisher(node) + presenceGetter := newPluginPresenceGetter(node) + g.streamManager = features.NewStreamManager(channelPublisher, presenceGetter) + // Initialize the main features dash := &features.DashboardHandler{ Publisher: g.Publish, } - g.GrafanaScope.Dashboards = dash g.GrafanaScope.Features["dashboard"] = dash - g.GrafanaScope.Features["testdata"] = &features.TestDataSupplier{ - Publisher: g.Publish, - } g.GrafanaScope.Features["broadcast"] = &features.BroadcastRunner{} g.GrafanaScope.Features["measurements"] = &features.MeasurementsRunner{} @@ -102,11 +135,14 @@ func (g *GrafanaLive) Init() error { // different goroutines (belonging to different client connections). This is also // true for other event handlers. node.OnConnect(func(client *centrifuge.Client) { - logger.Debug("Client connected", "user", client.UserID()) + logger.Debug("Client connected", "user", client.UserID(), "client", client.ID()) + connectedAt := time.Now() client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) { - handler, err := g.GetChannelHandler(e.Channel) + logger.Debug("Client wants to subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel) + handler, err := g.GetChannelHandler(client.Context(), e.Channel) if err != nil { + logger.Error("Error getting channel handler", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) cb(centrifuge.SubscribeReply{}, err) } else { cb(handler.OnSubscribe(client, e)) @@ -117,13 +153,19 @@ func (g *GrafanaLive) Init() error { // In general, we should prefer writing to the HTTP API, but this // allows some simple prototypes to work quickly. client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) { - handler, err := g.GetChannelHandler(e.Channel) + logger.Debug("Client wants to publish", "user", client.UserID(), "client", client.ID(), "channel", e.Channel) + handler, err := g.GetChannelHandler(client.Context(), e.Channel) if err != nil { + logger.Error("Error getting channel handler", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) cb(centrifuge.PublishReply{}, err) } else { cb(handler.OnPublish(client, e)) } }) + + client.OnDisconnect(func(_ centrifuge.DisconnectEvent) { + logger.Debug("Client disconnected", "user", client.UserID(), "client", client.ID(), "elapsed", time.Since(connectedAt)) + }) }) // Run node. This method does not block. @@ -149,6 +191,7 @@ func (g *GrafanaLive) Init() error { UserID: fmt.Sprintf("%d", user.UserId), } newCtx := centrifuge.SetCredentials(ctx.Req.Context(), cred) + newCtx = setContextSignedUser(newCtx, user) r := ctx.Req.Request r = r.WithContext(newCtx) // Set a user ID. @@ -161,12 +204,13 @@ func (g *GrafanaLive) Init() error { return nil } -// GetChannelHandler gives threadsafe access to the channel -func (g *GrafanaLive) GetChannelHandler(channel string) (models.ChannelHandler, error) { +// GetChannelHandler gives thread-safe access to the channel. +func (g *GrafanaLive) GetChannelHandler(ctx context.Context, channel string) (models.ChannelHandler, error) { g.channelsMu.RLock() c, ok := g.channels[channel] g.channelsMu.RUnlock() // defer? but then you can't lock further down if ok { + logger.Debug("Found cached channel handler", "channel", channel) return c, nil } @@ -175,65 +219,94 @@ func (g *GrafanaLive) GetChannelHandler(channel string) (models.ChannelHandler, if !addr.IsValid() { return nil, fmt.Errorf("invalid channel: %q", channel) } - logger.Info("initChannel", "channel", channel, "address", addr) g.channelsMu.Lock() defer g.channelsMu.Unlock() c, ok = g.channels[channel] // may have filled in while locked if ok { + logger.Debug("Found cached channel handler", "channel", channel) return c, nil } - getter, err := g.GetChannelHandlerFactory(addr.Scope, addr.Namespace) + getter, err := g.GetChannelHandlerFactory(ctx, addr.Scope, addr.Namespace) if err != nil { - return nil, err + return nil, fmt.Errorf("error getting channel handler factory: %w", err) } - // First access will initialize + // First access will initialize. c, err = getter.GetHandlerForPath(addr.Path) if err != nil { - return nil, err + return nil, fmt.Errorf("error getting handler for path: %w", err) } + logger.Info("Initialized channel handler", "channel", channel, "address", addr) g.channels[channel] = c return c, nil } // GetChannelHandlerFactory gets a ChannelHandlerFactory for a namespace. -// It gives threadsafe access to the channel. -func (g *GrafanaLive) GetChannelHandlerFactory(scope string, name string) (models.ChannelHandlerFactory, error) { - if scope == "grafana" { - p, ok := g.GrafanaScope.Features[name] - if ok { - return p, nil - } - return nil, fmt.Errorf("unknown feature: %q", name) +// It gives thread-safe access to the channel. +func (g *GrafanaLive) GetChannelHandlerFactory(ctx context.Context, scope string, namespace string) (models.ChannelHandlerFactory, error) { + switch scope { + case ScopeGrafana: + return g.handleGrafanaScope(ctx, namespace) + case ScopePlugin: + return g.handlePluginScope(ctx, namespace) + case ScopeDatasource: + return g.handleDatasourceScope(ctx, namespace) + default: + return nil, fmt.Errorf("invalid scope: %q", scope) } +} - if scope == "ds" { - return nil, fmt.Errorf("todo... look up datasource: %q", name) +func (g *GrafanaLive) handleGrafanaScope(_ context.Context, namespace string) (models.ChannelHandlerFactory, error) { + if p, ok := g.GrafanaScope.Features[namespace]; ok { + return p, nil } + return nil, fmt.Errorf("unknown feature: %q", namespace) +} - if scope == "plugin" { - // Temporary hack until we have a more generic solution later on - if name == "cloudwatch" { - return &cloudwatch.LogQueryRunnerSupplier{ - Publisher: g.Publish, - Service: g.LogsService, - }, nil - } - - p := g.PluginManager.GetPlugin(name) - if p != nil { - h := &PluginHandler{ - Plugin: p, - } - return h, nil - } - return nil, fmt.Errorf("unknown plugin: %q", name) +func (g *GrafanaLive) handlePluginScope(_ context.Context, namespace string) (models.ChannelHandlerFactory, error) { + // Temporary hack until we have a more generic solution later on + if namespace == "cloudwatch" { + return &cloudwatch.LogQueryRunnerSupplier{ + Publisher: g.Publish, + Service: g.LogsService, + }, nil } + streamHandler, err := g.getStreamPlugin(namespace) + if err != nil { + return nil, fmt.Errorf("can't find stream plugin: %s", namespace) + } + return features.NewPluginRunner( + namespace, + "", + g.streamManager, + g.contextGetter, + streamHandler, + ), nil +} - return nil, fmt.Errorf("invalid scope: %q", scope) +func (g *GrafanaLive) handleDatasourceScope(ctx context.Context, namespace string) (models.ChannelHandlerFactory, error) { + user, ok := getContextSignedUser(ctx) + if !ok { + return nil, fmt.Errorf("no signed user found in context") + } + ds, err := g.DatasourceCache.GetDatasourceByUID(namespace, user, false) + if err != nil { + return nil, fmt.Errorf("error getting datasource: %w", err) + } + streamHandler, err := g.getStreamPlugin(ds.Name) + if err != nil { + return nil, fmt.Errorf("can't find stream plugin: %s", namespace) + } + return features.NewPluginRunner( + ds.Type, + ds.Uid, + g.streamManager, + g.contextGetter, + streamHandler, + ), nil } // Publish sends the data to the channel without checking permissions etc diff --git a/pkg/services/live/pluginHandler.go b/pkg/services/live/pluginHandler.go deleted file mode 100644 index 9e0319369f2..00000000000 --- a/pkg/services/live/pluginHandler.go +++ /dev/null @@ -1,27 +0,0 @@ -package live - -import ( - "github.com/centrifugal/centrifuge" - "github.com/grafana/grafana/pkg/models" - "github.com/grafana/grafana/pkg/plugins" -) - -// PluginHandler manages all the `grafana/dashboard/*` channels -type PluginHandler struct { - Plugin *plugins.PluginBase -} - -// GetHandlerForPath called on init -func (h *PluginHandler) GetHandlerForPath(path string) (models.ChannelHandler, error) { - return h, nil // all dashboards share the same handler -} - -// OnSubscribe for now allows anyone to subscribe -func (h *PluginHandler) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) { - return centrifuge.SubscribeReply{}, nil -} - -// OnPublish checks if a message from the websocket can be broadcast on this channel -func (h *PluginHandler) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) { - return centrifuge.PublishReply{}, nil // broadcast any event -} diff --git a/pkg/services/live/plugin_helpers.go b/pkg/services/live/plugin_helpers.go new file mode 100644 index 00000000000..a66acdade08 --- /dev/null +++ b/pkg/services/live/plugin_helpers.go @@ -0,0 +1,57 @@ +package live + +import ( + "context" + "fmt" + + "github.com/centrifugal/centrifuge" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/plugins/plugincontext" +) + +type pluginChannelPublisher struct { + node *centrifuge.Node +} + +func newPluginChannelPublisher(node *centrifuge.Node) *pluginChannelPublisher { + return &pluginChannelPublisher{node: node} +} + +func (p *pluginChannelPublisher) Publish(channel string, data []byte) error { + _, err := p.node.Publish(channel, data) + return err +} + +type pluginPresenceGetter struct { + node *centrifuge.Node +} + +func newPluginPresenceGetter(node *centrifuge.Node) *pluginPresenceGetter { + return &pluginPresenceGetter{node: node} +} + +func (p *pluginPresenceGetter) GetNumSubscribers(channel string) (int, error) { + res, err := p.node.PresenceStats(channel) + if err != nil { + return 0, err + } + return res.NumClients, nil +} + +type pluginContextGetter struct { + PluginContextProvider *plugincontext.Provider +} + +func newPluginContextGetter(pluginContextProvider *plugincontext.Provider) *pluginContextGetter { + return &pluginContextGetter{ + PluginContextProvider: pluginContextProvider, + } +} + +func (g *pluginContextGetter) GetPluginContext(ctx context.Context, pluginID string, datasourceUID string) (backend.PluginContext, bool, error) { + user, ok := getContextSignedUser(ctx) + if !ok { + return backend.PluginContext{}, false, fmt.Errorf("no signed user found in context") + } + return g.PluginContextProvider.Get(pluginID, datasourceUID, user) +} diff --git a/pkg/services/live/scope.go b/pkg/services/live/scope.go new file mode 100644 index 00000000000..e56577fa58c --- /dev/null +++ b/pkg/services/live/scope.go @@ -0,0 +1,10 @@ +package live + +const ( + // ScopeGrafana contains builtin features of Grafana Core. + ScopeGrafana = "grafana" + // ScopePlugin passes control to a plugin. + ScopePlugin = "plugin" + // ScopeDatasource passes control to a datasource plugin. + ScopeDatasource = "ds" +) diff --git a/pkg/tsdb/testdatasource/stream_handler.go b/pkg/tsdb/testdatasource/stream_handler.go new file mode 100644 index 00000000000..73f67dcf9c8 --- /dev/null +++ b/pkg/tsdb/testdatasource/stream_handler.go @@ -0,0 +1,104 @@ +package testdatasource + +import ( + "context" + "fmt" + "math/rand" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/data" + + "github.com/grafana/grafana/pkg/cmd/grafana-cli/logger" + "github.com/grafana/grafana/pkg/infra/log" +) + +type testStreamHandler struct { + logger log.Logger +} + +func newTestStreamHandler(logger log.Logger) *testStreamHandler { + return &testStreamHandler{ + logger: logger, + } +} + +func (p *testStreamHandler) CanSubscribeToStream(_ context.Context, req *backend.SubscribeToStreamRequest) (*backend.SubscribeToStreamResponse, error) { + p.logger.Debug("Allowing access to stream", "path", req.Path, "user", req.PluginContext.User) + return &backend.SubscribeToStreamResponse{OK: true}, nil +} + +func (p *testStreamHandler) RunStream(ctx context.Context, request *backend.RunStreamRequest, sender backend.StreamPacketSender) error { + p.logger.Debug("New stream call", "path", request.Path) + var conf testStreamConfig + switch request.Path { + case "random-2s-stream": + conf = testStreamConfig{ + Interval: 200 * time.Millisecond, + } + case "random-flakey-stream": + conf = testStreamConfig{ + Interval: 200 * time.Millisecond, + Drop: 0.6, + } + case "random-20Hz-stream": + conf = testStreamConfig{ + Interval: 50 * time.Millisecond, + } + default: + return fmt.Errorf("testdata plugin does not support path: %s", request.Path) + } + return p.runTestStream(ctx, request.Path, conf, sender) +} + +type testStreamConfig struct { + Interval time.Duration + Drop float64 +} + +func (p *testStreamHandler) runTestStream(ctx context.Context, path string, conf testStreamConfig, sender backend.StreamPacketSender) error { + spread := 50.0 + walker := rand.Float64() * 100 + + ticker := time.NewTicker(conf.Interval) + defer ticker.Stop() + + frame := data.NewFrame("testdata", + data.NewField("Time", nil, make([]time.Time, 1)), + data.NewField("Value", nil, make([]float64, 1)), + data.NewField("Min", nil, make([]float64, 1)), + data.NewField("Max", nil, make([]float64, 1)), + ) + + for { + select { + case <-ctx.Done(): + p.logger.Debug("Stop streaming data for path", "path", path) + return ctx.Err() + case t := <-ticker.C: + if rand.Float64() < conf.Drop { + continue + } + delta := rand.Float64() - 0.5 + walker += delta + + frame.Fields[0].Set(0, t) + frame.Fields[1].Set(0, walker) // Value + frame.Fields[2].Set(0, walker-((rand.Float64()*spread)+0.01)) // Min + frame.Fields[3].Set(0, walker+((rand.Float64()*spread)+0.01)) // Max + + bytes, err := data.FrameToJSON(frame, true, true) + if err != nil { + logger.Warn("unable to marshal line", "error", err) + continue + } + + packet := &backend.StreamPacket{ + Payload: bytes, + } + if err := sender.Send(packet); err != nil { + return err + } + } + } +} diff --git a/pkg/tsdb/testdatasource/testdata.go b/pkg/tsdb/testdatasource/testdata.go index 72d722c78bb..de009cdd1a7 100644 --- a/pkg/tsdb/testdatasource/testdata.go +++ b/pkg/tsdb/testdatasource/testdata.go @@ -33,6 +33,7 @@ func (p *testDataPlugin) Init() error { factory := coreplugin.New(backend.ServeOpts{ QueryDataHandler: p.queryMux, CallResourceHandler: httpadapter.New(resourceMux), + StreamHandler: newTestStreamHandler(p.logger), }) err := p.BackendPluginManager.Register("testdata", factory) if err != nil { diff --git a/public/app/features/live/features.ts b/public/app/features/live/features.ts index 0dd28283481..79a40951a13 100644 --- a/public/app/features/live/features.ts +++ b/public/app/features/live/features.ts @@ -7,6 +7,7 @@ import { grafanaLiveCoreFeatures } from './scopes'; export function registerLiveFeatures() { const random2s = new MeasurementCollector(); const randomFlakey = new MeasurementCollector(); + const random20Hz = new MeasurementCollector(); const channels: LiveChannelConfig[] = [ { path: 'random-2s-stream', @@ -20,6 +21,12 @@ export function registerLiveFeatures() { getController: () => randomFlakey, processMessage: randomFlakey.addBatch, }, + { + path: 'random-20Hz-stream', + description: 'Random stream with points in 20Hz', + getController: () => random20Hz, + processMessage: random20Hz.addBatch, + }, ]; grafanaLiveCoreFeatures.register({ diff --git a/public/app/plugins/datasource/testdata/components/GrafanaLiveEditor.tsx b/public/app/plugins/datasource/testdata/components/GrafanaLiveEditor.tsx index c30da6eddd8..a0c150e2b22 100644 --- a/public/app/plugins/datasource/testdata/components/GrafanaLiveEditor.tsx +++ b/public/app/plugins/datasource/testdata/components/GrafanaLiveEditor.tsx @@ -14,6 +14,11 @@ const liveTestDataChannels = [ value: 'random-flakey-stream', description: 'Stream that returns data in random intervals', }, + { + label: 'random-20Hz-stream', + value: 'random-20Hz-stream', + description: 'Random stream with points in 20Hz', + }, ]; export const GrafanaLiveEditor = ({ onChange, query }: EditorProps) => { diff --git a/public/app/plugins/datasource/testdata/datasource.ts b/public/app/plugins/datasource/testdata/datasource.ts index 61814701afb..1677ce0f239 100644 --- a/public/app/plugins/datasource/testdata/datasource.ts +++ b/public/app/plugins/datasource/testdata/datasource.ts @@ -220,7 +220,7 @@ function runGrafanaLiveQuery( } return getLiveMeasurementsObserver( { - scope: LiveChannelScope.Grafana, + scope: LiveChannelScope.Plugin, namespace: 'testdata', path: target.channel, }, diff --git a/public/app/plugins/datasource/testdata/module.tsx b/public/app/plugins/datasource/testdata/module.tsx index 060e1d08699..7a01feb1ca7 100644 --- a/public/app/plugins/datasource/testdata/module.tsx +++ b/public/app/plugins/datasource/testdata/module.tsx @@ -3,6 +3,7 @@ import { TestDataDataSource } from './datasource'; import { TestInfoTab } from './TestInfoTab'; import { ConfigEditor } from './ConfigEditor'; import { QueryEditor } from './QueryEditor'; +import { LiveMeasurementsSupport } from 'app/features/live/measurements/measurementsSupport'; class TestDataAnnotationsQueryCtrl { annotation: any; @@ -13,6 +14,7 @@ class TestDataAnnotationsQueryCtrl { export const plugin = new DataSourcePlugin(TestDataDataSource) .setConfigEditor(ConfigEditor) .setQueryEditor(QueryEditor) + .setChannelSupport(new LiveMeasurementsSupport()) .setAnnotationQueryCtrl(TestDataAnnotationsQueryCtrl) .addConfigPage({ title: 'Setup',