diff --git a/pkg/plugins/backendplugin/grpcplugin/client.go b/pkg/plugins/backendplugin/grpcplugin/client.go index 013dd2d0419..2905a1462cf 100644 --- a/pkg/plugins/backendplugin/grpcplugin/client.go +++ b/pkg/plugins/backendplugin/grpcplugin/client.go @@ -27,6 +27,18 @@ var handshake = goplugin.HandshakeConfig{ MagicCookieValue: grpcplugin.MagicCookieValue, } +// pluginSet is list of plugins supported on v2. +var pluginSet = map[int]goplugin.PluginSet{ + grpcplugin.ProtocolVersion: { + "diagnostics": &grpcplugin.DiagnosticsGRPCPlugin{}, + "resource": &grpcplugin.ResourceGRPCPlugin{}, + "data": &grpcplugin.DataGRPCPlugin{}, + "stream": &grpcplugin.StreamGRPCPlugin{}, + "renderer": &pluginextensionv2.RendererGRPCPlugin{}, + "secretsmanager": &secretsmanagerplugin.SecretsManagerGRPCPlugin{}, + }, +} + func newClientConfig(executablePath string, args []string, env []string, logger log.Logger, versionedPlugins map[int]goplugin.PluginSet) *goplugin.ClientConfig { // We can ignore gosec G201 here, since the dynamic part of executablePath comes from the plugin definition @@ -70,18 +82,6 @@ type PluginDescriptor struct { startSecretsManagerFn StartSecretsManagerFunc } -// getV2PluginSet returns list of plugins supported on v2. -func getV2PluginSet() goplugin.PluginSet { - return goplugin.PluginSet{ - "diagnostics": &grpcplugin.DiagnosticsGRPCPlugin{}, - "resource": &grpcplugin.ResourceGRPCPlugin{}, - "data": &grpcplugin.DataGRPCPlugin{}, - "stream": &grpcplugin.StreamGRPCPlugin{}, - "renderer": &pluginextensionv2.RendererGRPCPlugin{}, - "secretsmanager": &secretsmanagerplugin.SecretsManagerGRPCPlugin{}, - } -} - // NewBackendPlugin creates a new backend plugin factory used for registering a backend plugin. func NewBackendPlugin(pluginID, executablePath string, executableArgs ...string) backendplugin.PluginFactoryFunc { return newBackendPlugin(pluginID, executablePath, true, executableArgs...) @@ -95,38 +95,32 @@ func NewUnmanagedBackendPlugin(pluginID, executablePath string, executableArgs . // NewBackendPlugin creates a new backend plugin factory used for registering a backend plugin. func newBackendPlugin(pluginID, executablePath string, managed bool, executableArgs ...string) backendplugin.PluginFactoryFunc { return newPlugin(PluginDescriptor{ - pluginID: pluginID, - executablePath: executablePath, - executableArgs: executableArgs, - managed: managed, - versionedPlugins: map[int]goplugin.PluginSet{ - grpcplugin.ProtocolVersion: getV2PluginSet(), - }, + pluginID: pluginID, + executablePath: executablePath, + executableArgs: executableArgs, + managed: managed, + versionedPlugins: pluginSet, }) } // NewRendererPlugin creates a new renderer plugin factory used for registering a backend renderer plugin. func NewRendererPlugin(pluginID, executablePath string, startFn StartRendererFunc) backendplugin.PluginFactoryFunc { return newPlugin(PluginDescriptor{ - pluginID: pluginID, - executablePath: executablePath, - managed: false, - versionedPlugins: map[int]goplugin.PluginSet{ - grpcplugin.ProtocolVersion: getV2PluginSet(), - }, - startRendererFn: startFn, + pluginID: pluginID, + executablePath: executablePath, + managed: false, + versionedPlugins: pluginSet, + startRendererFn: startFn, }) } // NewSecretsManagerPlugin creates a new secrets manager plugin factory used for registering a backend secrets manager plugin. func NewSecretsManagerPlugin(pluginID, executablePath string, startFn StartSecretsManagerFunc) backendplugin.PluginFactoryFunc { return newPlugin(PluginDescriptor{ - pluginID: pluginID, - executablePath: executablePath, - managed: false, - versionedPlugins: map[int]goplugin.PluginSet{ - grpcplugin.ProtocolVersion: getV2PluginSet(), - }, + pluginID: pluginID, + executablePath: executablePath, + managed: false, + versionedPlugins: pluginSet, startSecretsManagerFn: startFn, }) } diff --git a/pkg/plugins/backendplugin/grpcplugin/client_proto.go b/pkg/plugins/backendplugin/grpcplugin/client_proto.go new file mode 100644 index 00000000000..cd392b4c693 --- /dev/null +++ b/pkg/plugins/backendplugin/grpcplugin/client_proto.go @@ -0,0 +1,123 @@ +package grpcplugin + +import ( + "context" + "errors" + + "google.golang.org/grpc" + + "github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2" + + "github.com/grafana/grafana/pkg/plugins/log" +) + +var ( + errClientNotStarted = errors.New("plugin client has not been started") +) + +var _ ProtoClient = (*protoClient)(nil) + +type ProtoClient interface { + pluginv2.DataClient + pluginv2.ResourceClient + pluginv2.DiagnosticsClient + pluginv2.StreamClient + + PluginID() string + Logger() log.Logger + Start(context.Context) error + Stop(context.Context) error +} + +type protoClient struct { + plugin *grpcPlugin +} + +type ProtoClientOpts struct { + PluginID string + ExecutablePath string + ExecutableArgs []string + Env []string + Logger log.Logger +} + +func NewProtoClient(opts ProtoClientOpts) (ProtoClient, error) { + p := newGrpcPlugin( + PluginDescriptor{ + pluginID: opts.PluginID, + managed: true, + executablePath: opts.ExecutablePath, + executableArgs: opts.ExecutableArgs, + versionedPlugins: pluginSet, + }, + opts.Logger, + func() []string { return opts.Env }, + ) + + return &protoClient{plugin: p}, nil +} + +func (r *protoClient) PluginID() string { + return r.plugin.descriptor.pluginID +} + +func (r *protoClient) Logger() log.Logger { + return r.plugin.logger +} + +func (r *protoClient) Start(ctx context.Context) error { + return r.plugin.Start(ctx) +} + +func (r *protoClient) Stop(ctx context.Context) error { + return r.plugin.Stop(ctx) +} + +func (r *protoClient) QueryData(ctx context.Context, in *pluginv2.QueryDataRequest, opts ...grpc.CallOption) (*pluginv2.QueryDataResponse, error) { + if r.plugin.pluginClient == nil { + return nil, errClientNotStarted + } + return r.plugin.pluginClient.DataClient.QueryData(ctx, in, opts...) +} + +func (r *protoClient) CallResource(ctx context.Context, in *pluginv2.CallResourceRequest, opts ...grpc.CallOption) (pluginv2.Resource_CallResourceClient, error) { + if r.plugin.pluginClient == nil { + return nil, errClientNotStarted + } + return r.plugin.pluginClient.ResourceClient.CallResource(ctx, in, opts...) +} + +func (r *protoClient) CheckHealth(ctx context.Context, in *pluginv2.CheckHealthRequest, opts ...grpc.CallOption) (*pluginv2.CheckHealthResponse, error) { + if r.plugin.pluginClient == nil { + return nil, errClientNotStarted + } + return r.plugin.pluginClient.DiagnosticsClient.CheckHealth(ctx, in, opts...) +} + +func (r *protoClient) CollectMetrics(ctx context.Context, in *pluginv2.CollectMetricsRequest, opts ...grpc.CallOption) (*pluginv2.CollectMetricsResponse, error) { + if r.plugin.pluginClient == nil { + return nil, errClientNotStarted + } + return r.plugin.pluginClient.DiagnosticsClient.CollectMetrics(ctx, in, opts...) +} + +func (r *protoClient) SubscribeStream(ctx context.Context, in *pluginv2.SubscribeStreamRequest, opts ...grpc.CallOption) (*pluginv2.SubscribeStreamResponse, error) { + if r.plugin.pluginClient == nil { + return nil, errClientNotStarted + } + return r.plugin.pluginClient.StreamClient.SubscribeStream(ctx, in, opts...) +} + +func (r *protoClient) RunStream(ctx context.Context, in *pluginv2.RunStreamRequest, opts ...grpc.CallOption) (pluginv2.Stream_RunStreamClient, error) { + if r.plugin.pluginClient == nil { + return nil, errClientNotStarted + } + return r.plugin.pluginClient.StreamClient.RunStream(ctx, in, opts...) +} + +func (r *protoClient) PublishStream(ctx context.Context, in *pluginv2.PublishStreamRequest, opts ...grpc.CallOption) (*pluginv2.PublishStreamResponse, error) { + if r.plugin.pluginClient == nil { + return nil, errClientNotStarted + } + return r.plugin.pluginClient.StreamClient.PublishStream(ctx, in, opts...) +} diff --git a/pkg/plugins/backendplugin/grpcplugin/client_v2.go b/pkg/plugins/backendplugin/grpcplugin/client_v2.go index b6334e9d904..804fc78f25e 100644 --- a/pkg/plugins/backendplugin/grpcplugin/client_v2.go +++ b/pkg/plugins/backendplugin/grpcplugin/client_v2.go @@ -28,7 +28,7 @@ type ClientV2 struct { secretsmanagerplugin.SecretsManagerPlugin } -func newClientV2(descriptor PluginDescriptor, logger log.Logger, rpcClient plugin.ClientProtocol) (pluginClient, error) { +func newClientV2(descriptor PluginDescriptor, logger log.Logger, rpcClient plugin.ClientProtocol) (*ClientV2, error) { rawDiagnostics, err := rpcClient.Dispense("diagnostics") if err != nil { return nil, err @@ -59,7 +59,7 @@ func newClientV2(descriptor PluginDescriptor, logger log.Logger, rpcClient plugi return nil, err } - c := ClientV2{} + c := &ClientV2{} if rawDiagnostics != nil { if diagnosticsClient, ok := rawDiagnostics.(grpcplugin.DiagnosticsClient); ok { c.DiagnosticsClient = diagnosticsClient @@ -108,7 +108,7 @@ func newClientV2(descriptor PluginDescriptor, logger log.Logger, rpcClient plugi } } - return &c, nil + return c, nil } func (c *ClientV2) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) { diff --git a/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go b/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go index 4a6c452bf52..ffd723690f5 100644 --- a/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go +++ b/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go @@ -26,7 +26,7 @@ type grpcPlugin struct { descriptor PluginDescriptor clientFactory func() *plugin.Client client *plugin.Client - pluginClient pluginClient + pluginClient *ClientV2 logger log.Logger mutex sync.RWMutex decommissioned bool @@ -35,13 +35,17 @@ type grpcPlugin struct { // newPlugin allocates and returns a new gRPC (external) backendplugin.Plugin. func newPlugin(descriptor PluginDescriptor) backendplugin.PluginFactoryFunc { return func(pluginID string, logger log.Logger, env func() []string) (backendplugin.Plugin, error) { - return &grpcPlugin{ - descriptor: descriptor, - logger: logger, - clientFactory: func() *plugin.Client { - return plugin.NewClient(newClientConfig(descriptor.executablePath, descriptor.executableArgs, env(), logger, descriptor.versionedPlugins)) - }, - }, nil + return newGrpcPlugin(descriptor, logger, env), nil + } +} + +func newGrpcPlugin(descriptor PluginDescriptor, logger log.Logger, env func() []string) *grpcPlugin { + return &grpcPlugin{ + descriptor: descriptor, + logger: logger, + clientFactory: func() *plugin.Client { + return plugin.NewClient(newClientConfig(descriptor.executablePath, descriptor.executableArgs, env(), logger, descriptor.versionedPlugins)) + }, } }