mirror of
https://github.com/grafana/grafana.git
synced 2025-08-02 16:12:27 +08:00

Removes request/response connection and hop-by-hop headers for call resource, in a manner similar to Go's reverse proxy functions. Also removes the Prometheus datasource's custom call resource header manipulation regarding hop-by-hop headers. Fixes #60076 Ref #58646 Co-authored-by: Will Browne <wbrowne@users.noreply.github.com>
281 lines
7.7 KiB
Go
281 lines
7.7 KiB
Go
package client
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net/textproto"
|
|
"strings"
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
|
|
"github.com/grafana/grafana/pkg/plugins"
|
|
"github.com/grafana/grafana/pkg/plugins/backendplugin"
|
|
"github.com/grafana/grafana/pkg/plugins/backendplugin/instrumentation"
|
|
"github.com/grafana/grafana/pkg/plugins/config"
|
|
"github.com/grafana/grafana/pkg/plugins/manager/registry"
|
|
)
|
|
|
|
var _ plugins.Client = (*Service)(nil)
|
|
|
|
type Service struct {
|
|
pluginRegistry registry.Service
|
|
cfg *config.Cfg
|
|
}
|
|
|
|
func ProvideService(pluginRegistry registry.Service, cfg *config.Cfg) *Service {
|
|
return &Service{
|
|
pluginRegistry: pluginRegistry,
|
|
cfg: cfg,
|
|
}
|
|
}
|
|
|
|
func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
|
if req == nil {
|
|
return nil, fmt.Errorf("req cannot be nil")
|
|
}
|
|
|
|
plugin, exists := s.plugin(ctx, req.PluginContext.PluginID)
|
|
if !exists {
|
|
return nil, plugins.ErrPluginNotRegistered.Errorf("%w", backendplugin.ErrPluginNotRegistered)
|
|
}
|
|
|
|
var resp *backend.QueryDataResponse
|
|
err := instrumentation.InstrumentQueryDataRequest(ctx, &req.PluginContext, s.cfg, func() (innerErr error) {
|
|
resp, innerErr = plugin.QueryData(ctx, req)
|
|
return
|
|
})
|
|
|
|
if err != nil {
|
|
if errors.Is(err, backendplugin.ErrMethodNotImplemented) {
|
|
return nil, plugins.ErrMethodNotImplemented.Errorf("%w", backendplugin.ErrMethodNotImplemented)
|
|
}
|
|
|
|
if errors.Is(err, backendplugin.ErrPluginUnavailable) {
|
|
return nil, plugins.ErrPluginUnavailable.Errorf("%w", backendplugin.ErrPluginUnavailable)
|
|
}
|
|
|
|
return nil, plugins.ErrPluginDownstreamError.Errorf("%v: %w", "failed to query data", err)
|
|
}
|
|
|
|
for refID, res := range resp.Responses {
|
|
// set frame ref ID based on response ref ID
|
|
for _, f := range res.Frames {
|
|
if f.RefID == "" {
|
|
f.RefID = refID
|
|
}
|
|
}
|
|
}
|
|
|
|
return resp, err
|
|
}
|
|
|
|
func (s *Service) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
|
|
if req == nil {
|
|
return fmt.Errorf("req cannot be nil")
|
|
}
|
|
|
|
if sender == nil {
|
|
return fmt.Errorf("sender cannot be nil")
|
|
}
|
|
|
|
p, exists := s.plugin(ctx, req.PluginContext.PluginID)
|
|
if !exists {
|
|
return backendplugin.ErrPluginNotRegistered
|
|
}
|
|
err := instrumentation.InstrumentCallResourceRequest(ctx, &req.PluginContext, s.cfg, func() error {
|
|
removeConnectionHeaders(req.Headers)
|
|
removeHopByHopHeaders(req.Headers)
|
|
|
|
wrappedSender := callResourceResponseSenderFunc(func(res *backend.CallResourceResponse) error {
|
|
if res != nil && len(res.Headers) > 0 {
|
|
removeConnectionHeaders(res.Headers)
|
|
removeHopByHopHeaders(res.Headers)
|
|
}
|
|
|
|
return sender.Send(res)
|
|
})
|
|
|
|
if err := p.CallResource(ctx, req, wrappedSender); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) {
|
|
if req == nil {
|
|
return nil, fmt.Errorf("req cannot be nil")
|
|
}
|
|
|
|
p, exists := s.plugin(ctx, req.PluginContext.PluginID)
|
|
if !exists {
|
|
return nil, backendplugin.ErrPluginNotRegistered
|
|
}
|
|
|
|
var resp *backend.CollectMetricsResult
|
|
err := instrumentation.InstrumentCollectMetrics(ctx, &req.PluginContext, s.cfg, func() (innerErr error) {
|
|
resp, innerErr = p.CollectMetrics(ctx, req)
|
|
return
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
|
|
if req == nil {
|
|
return nil, fmt.Errorf("req cannot be nil")
|
|
}
|
|
|
|
p, exists := s.plugin(ctx, req.PluginContext.PluginID)
|
|
if !exists {
|
|
return nil, backendplugin.ErrPluginNotRegistered
|
|
}
|
|
|
|
var resp *backend.CheckHealthResult
|
|
err := instrumentation.InstrumentCheckHealthRequest(ctx, &req.PluginContext, s.cfg, func() (innerErr error) {
|
|
resp, innerErr = p.CheckHealth(ctx, req)
|
|
return
|
|
})
|
|
|
|
if err != nil {
|
|
if errors.Is(err, backendplugin.ErrMethodNotImplemented) {
|
|
return nil, err
|
|
}
|
|
|
|
if errors.Is(err, backendplugin.ErrPluginUnavailable) {
|
|
return nil, err
|
|
}
|
|
|
|
return nil, fmt.Errorf("%v: %w", "failed to check plugin health", backendplugin.ErrHealthCheckFailed)
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
func (s *Service) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
|
|
if req == nil {
|
|
return nil, fmt.Errorf("req cannot be nil")
|
|
}
|
|
|
|
plugin, exists := s.plugin(ctx, req.PluginContext.PluginID)
|
|
if !exists {
|
|
return nil, backendplugin.ErrPluginNotRegistered
|
|
}
|
|
|
|
return plugin.SubscribeStream(ctx, req)
|
|
}
|
|
|
|
func (s *Service) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
|
|
if req == nil {
|
|
return nil, fmt.Errorf("req cannot be nil")
|
|
}
|
|
|
|
plugin, exists := s.plugin(ctx, req.PluginContext.PluginID)
|
|
if !exists {
|
|
return nil, backendplugin.ErrPluginNotRegistered
|
|
}
|
|
|
|
return plugin.PublishStream(ctx, req)
|
|
}
|
|
|
|
func (s *Service) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
|
|
if req == nil {
|
|
return fmt.Errorf("req cannot be nil")
|
|
}
|
|
|
|
if sender == nil {
|
|
return fmt.Errorf("sender cannot be nil")
|
|
}
|
|
|
|
plugin, exists := s.plugin(ctx, req.PluginContext.PluginID)
|
|
if !exists {
|
|
return backendplugin.ErrPluginNotRegistered
|
|
}
|
|
|
|
return plugin.RunStream(ctx, req, sender)
|
|
}
|
|
|
|
// plugin finds a plugin with `pluginID` from the registry that is not decommissioned
|
|
func (s *Service) plugin(ctx context.Context, pluginID string) (*plugins.Plugin, bool) {
|
|
p, exists := s.pluginRegistry.Plugin(ctx, pluginID)
|
|
if !exists {
|
|
return nil, false
|
|
}
|
|
|
|
if p.IsDecommissioned() {
|
|
return nil, false
|
|
}
|
|
|
|
return p, true
|
|
}
|
|
|
|
// removeConnectionHeaders removes hop-by-hop headers listed in the "Connection" header of h.
// See RFC 7230, section 6.1
//
// Unlike http.Header, h is a plain map whose keys are not guaranteed to be in
// canonical form. Both the "Connection" key itself and the header names it
// lists are therefore matched case-insensitively, and every key spelling of a
// listed header is removed. The "Connection" entry itself is left in place
// unless it lists itself.
//
// Based on https://github.com/golang/go/blob/dc04f3ba1f25313bc9c97e728620206c235db9ee/src/net/http/httputil/reverseproxy.go#L411-L421
func removeConnectionHeaders(h map[string][]string) {
	// First pass: gather the canonical names listed by any spelling of the
	// "Connection" header.
	var listed map[string]struct{}
	for k, values := range h {
		if textproto.CanonicalMIMEHeaderKey(k) != "Connection" {
			continue
		}

		for _, v := range values {
			for _, name := range strings.Split(v, ",") {
				name = textproto.TrimString(name)
				if name == "" {
					continue
				}

				if listed == nil {
					listed = make(map[string]struct{})
				}
				listed[textproto.CanonicalMIMEHeaderKey(name)] = struct{}{}
			}
		}
	}

	if len(listed) == 0 {
		return
	}

	// Second pass: drop every key whose canonical form was listed. Deleting
	// from a map while ranging over it is safe in Go.
	for k := range h {
		if _, ok := listed[textproto.CanonicalMIMEHeaderKey(k)]; ok {
			delete(h, k)
		}
	}
}
|
|
|
|
// Hop-by-hop headers. These are removed when sent to the backend.
// As of RFC 7230, hop-by-hop headers are required to appear in the
// Connection header field. These are the headers defined by the
// obsoleted RFC 2616 (section 13.5.1) and are used for backward
// compatibility.
//
// Every entry is in canonical MIME header form, which is what
// removeHopByHopHeaders compares against.
//
// Copied from https://github.com/golang/go/blob/dc04f3ba1f25313bc9c97e728620206c235db9ee/src/net/http/httputil/reverseproxy.go#L171-L186
var hopHeaders = []string{
	"Connection",
	"Proxy-Connection", // non-standard but still sent by libcurl and rejected by e.g. google
	"Keep-Alive",
	"Proxy-Authenticate",
	"Proxy-Authorization",
	"Te",      // canonicalized version of "TE"
	"Trailer", // not Trailers per URL above; https://www.rfc-editor.org/errata_search.php?eid=4522
	"Transfer-Encoding",
	"Upgrade",
}

// removeHopByHopHeaders removes hop-by-hop headers. Especially
// important is "Connection" because we want a persistent
// connection, regardless of what the client sent to us.
//
// h is a plain map whose keys may not be canonical, so each key is
// canonicalized before being compared with hopHeaders, and every key
// spelling of a hop-by-hop header (e.g. both "Connection" and "connection")
// is deleted.
//
// Based on https://github.com/golang/go/blob/dc04f3ba1f25313bc9c97e728620206c235db9ee/src/net/http/httputil/reverseproxy.go#L276-L281
func removeHopByHopHeaders(h map[string][]string) {
	// Deleting from a map while ranging over it is safe in Go.
	for k := range h {
		canonical := textproto.CanonicalMIMEHeaderKey(k)
		for _, hh := range hopHeaders {
			if hh == canonical {
				delete(h, k)
				break // k is gone; move on to the next key
			}
		}
	}
}
|
|
|
|
type callResourceResponseSenderFunc func(res *backend.CallResourceResponse) error
|
|
|
|
func (fn callResourceResponseSenderFunc) Send(res *backend.CallResourceResponse) error {
|
|
return fn(res)
|
|
}
|