From a29cfe5d460865b1f41724c61dba5f10e4a9801b Mon Sep 17 00:00:00 2001 From: Michael Mandrus <41969079+mmandrus@users.noreply.github.com> Date: Fri, 21 Apr 2023 13:03:49 -0400 Subject: [PATCH] Caching: Consolidate resource cache checking and updating in plugin middleware (#67002) * Update the HandleResourceRequest function to mimic the HandleQueryRequest function * Remove CacheResourceResponse function from interface * revert additional thing I missed --- pkg/api/http_server.go | 5 +-- pkg/api/plugin_resource.go | 13 +----- pkg/api/plugin_resource_test.go | 1 - pkg/api/plugins_test.go | 22 +++------- .../manager/client/clienttest/clienttest.go | 17 +++++++ pkg/services/caching/fake_caching_service.go | 8 +--- pkg/services/caching/service.go | 22 ++++++---- .../clientmiddleware/caching_middleware.go | 24 +++++++--- .../caching_middleware_test.go | 44 ++++++++++++++----- 9 files changed, 92 insertions(+), 64 deletions(-) diff --git a/pkg/api/http_server.go b/pkg/api/http_server.go index b9b84af22ed..093eac80840 100644 --- a/pkg/api/http_server.go +++ b/pkg/api/http_server.go @@ -41,7 +41,6 @@ import ( "github.com/grafana/grafana/pkg/services/apikey" "github.com/grafana/grafana/pkg/services/auth" "github.com/grafana/grafana/pkg/services/authn" - "github.com/grafana/grafana/pkg/services/caching" "github.com/grafana/grafana/pkg/services/cleanup" "github.com/grafana/grafana/pkg/services/contexthandler" "github.com/grafana/grafana/pkg/services/correlations" @@ -208,7 +207,6 @@ type HTTPServer struct { statsService stats.Service authnService authn.Service starApi *starApi.API - cachingService caching.CachingService } type ServerOptions struct { @@ -250,7 +248,7 @@ func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routi accesscontrolService accesscontrol.Service, navTreeService navtree.Service, annotationRepo annotations.Repository, tagService tag.Service, searchv2HTTPService searchV2.SearchHTTPService, oauthTokenService oauthtoken.OAuthTokenService, statsService stats.Service, authnService authn.Service, pluginsCDNService *pluginscdn.Service, - starApi *starApi.API, cachingService caching.CachingService, + starApi *starApi.API, ) (*HTTPServer, error) { web.Env = cfg.Env @@ -355,7 +353,6 @@ func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routi authnService: authnService, pluginsCDNService: pluginsCDNService, starApi: starApi, - cachingService: cachingService, } if hs.Listener != nil { hs.log.Debug("Using provided listener") diff --git a/pkg/api/plugin_resource.go b/pkg/api/plugin_resource.go index 5ebe91cb86e..2b69379b920 100644 --- a/pkg/api/plugin_resource.go +++ b/pkg/api/plugin_resource.go @@ -15,7 +15,6 @@ import ( "github.com/grafana/grafana/pkg/plugins/backendplugin" contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model" "github.com/grafana/grafana/pkg/services/datasources" - "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/util/proxyutil" "github.com/grafana/grafana/pkg/web" ) @@ -130,7 +129,7 @@ func (hs *HTTPServer) makePluginResourceRequest(w http.ResponseWriter, req *http var flushStreamErr error go func() { - flushStreamErr = hs.flushStream(req.Context(), crReq, stream, w) + flushStreamErr = hs.flushStream(stream, w) wg.Done() }() @@ -141,10 +140,8 @@ func (hs *HTTPServer) makePluginResourceRequest(w http.ResponseWriter, req *http return flushStreamErr } -func (hs *HTTPServer) flushStream(ctx context.Context, req *backend.CallResourceRequest, stream callResourceClientResponseStream, w http.ResponseWriter) error { +func (hs *HTTPServer) flushStream(stream callResourceClientResponseStream, w http.ResponseWriter) error { processedStreams := 0 - ctx, cancel := context.WithCancel(ctx) - defer cancel() for { resp, err := stream.Recv() if errors.Is(err, io.EOF) { @@ -200,12 +197,6 @@ func (hs *HTTPServer) flushStream(ctx context.Context, req *backend.CallResource if _, err := w.Write(resp.Body); err != nil { hs.log.Error("Failed to write resource response", "err", err) - } else if hs.Features.IsEnabled(featuremgmt.FlagUseCachingService) { - // Placing the new service implementation behind a feature flag until it is known to be stable - - // The enterprise implementation of this function will use the headers and status of the first response, - // And append the body of any subsequent responses. It waits for the context to be canceled before caching the cumulative result. - hs.cachingService.CacheResourceResponse(ctx, req, resp) } if flusher, ok := w.(http.Flusher); ok { diff --git a/pkg/api/plugin_resource_test.go b/pkg/api/plugin_resource_test.go index ff34565961e..3aeb39a8bd6 100644 --- a/pkg/api/plugin_resource_test.go +++ b/pkg/api/plugin_resource_test.go @@ -78,7 +78,6 @@ func TestCallResource(t *testing.T) { hs.QuotaService = quotatest.New(false, nil) hs.pluginStore = ps hs.pluginClient = pluginClient.ProvideService(reg, pCfg) - hs.cachingService = &caching.OSSCachingService{} }) t.Run("Test successful response is received for valid request", func(t *testing.T) { diff --git a/pkg/api/plugins_test.go b/pkg/api/plugins_test.go index 1ce73b89d67..9de3a41e4cc 100644 --- a/pkg/api/plugins_test.go +++ b/pkg/api/plugins_test.go @@ -30,9 +30,7 @@ import ( "github.com/grafana/grafana/pkg/plugins/manager/store" "github.com/grafana/grafana/pkg/plugins/pluginscdn" ac "github.com/grafana/grafana/pkg/services/accesscontrol" - "github.com/grafana/grafana/pkg/services/caching" contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model" - "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/org" "github.com/grafana/grafana/pkg/services/org/orgtest" "github.com/grafana/grafana/pkg/services/pluginsintegration/pluginaccesscontrol" @@ -373,11 +371,9 @@ func Test_GetPluginAssets(t *testing.T) { func TestMakePluginResourceRequest(t *testing.T) { hs := HTTPServer{ - Cfg: setting.NewCfg(), - log: log.New(), - pluginClient: &fakePluginClient{}, - cachingService: &caching.OSSCachingService{}, - Features: &featuremgmt.FeatureManager{}, + Cfg: setting.NewCfg(), + log: log.New(), + pluginClient: &fakePluginClient{}, } req := httptest.NewRequest(http.MethodGet, "/", nil) @@ -403,8 +399,6 @@ func TestMakePluginResourceRequestSetCookieNotPresent(t *testing.T) { pluginClient: &fakePluginClient{ headers: map[string][]string{"Set-Cookie": {"monster"}}, }, - cachingService: &caching.OSSCachingService{}, - Features: &featuremgmt.FeatureManager{}, } req := httptest.NewRequest(http.MethodGet, "/", nil) resp := httptest.NewRecorder() @@ -439,8 +433,6 @@ func TestMakePluginResourceRequestContentTypeUnique(t *testing.T) { "x-another": {"hello"}, }, }, - cachingService: &caching.OSSCachingService{}, - Features: &featuremgmt.FeatureManager{}, } req := httptest.NewRequest(http.MethodGet, "/", nil) resp := httptest.NewRecorder() @@ -464,11 +456,9 @@ func TestMakePluginResourceRequestContentTypeEmpty(t *testing.T) { statusCode: http.StatusNoContent, } hs := HTTPServer{ - Cfg: setting.NewCfg(), - log: log.New(), - pluginClient: pluginClient, - cachingService: &caching.OSSCachingService{}, - Features: &featuremgmt.FeatureManager{}, + Cfg: setting.NewCfg(), + log: log.New(), + pluginClient: pluginClient, } req := httptest.NewRequest(http.MethodGet, "/", nil) resp := httptest.NewRecorder() diff --git a/pkg/plugins/manager/client/clienttest/clienttest.go b/pkg/plugins/manager/client/clienttest/clienttest.go index 32995bfa38f..6b3be014cdf 100644 --- a/pkg/plugins/manager/client/clienttest/clienttest.go +++ b/pkg/plugins/manager/client/clienttest/clienttest.go @@ -179,6 +179,9 @@ type ClientDecoratorTest struct { SubscribeStreamCtx context.Context PublishStreamReq *backend.PublishStreamRequest PublishStreamCtx context.Context + + // When CallResource is called, the sender will be called with these values + callResourceResponses []*backend.CallResourceResponse } type ClientDecoratorTestOption func(*ClientDecoratorTest) @@ -197,6 +200,13 @@ func NewClientDecoratorTest(t *testing.T, opts ...ClientDecoratorTestOption) *Cl CallResourceFunc: func(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { cdt.CallResourceReq = req cdt.CallResourceCtx = ctx + if cdt.callResourceResponses != nil { + for _, r := range cdt.callResourceResponses { + if err := sender.Send(r); err != nil { + return err + } + } + } return nil }, CheckHealthFunc: func(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { @@ -262,3 +272,10 @@ func WithMiddlewares(middlewares ...plugins.ClientMiddleware) ClientDecoratorTes cdt.Middlewares = append(cdt.Middlewares, middlewares...) }) } + +// WithResourceResponses can be used to make the test client send simulated resource responses back over the sender stream +func WithResourceResponses(responses []*backend.CallResourceResponse) ClientDecoratorTestOption { + return ClientDecoratorTestOption(func(cdt *ClientDecoratorTest) { + cdt.callResourceResponses = responses + }) +} diff --git a/pkg/services/caching/fake_caching_service.go b/pkg/services/caching/fake_caching_service.go index c4cb7434425..522be45f02f 100644 --- a/pkg/services/caching/fake_caching_service.go +++ b/pkg/services/caching/fake_caching_service.go @@ -11,20 +11,16 @@ import ( type FakeOSSCachingService struct { calls map[string]int ReturnHit bool - ReturnResourceResponse *backend.CallResourceResponse + ReturnResourceResponse CachedResourceDataResponse ReturnQueryResponse CachedQueryDataResponse } -func (f *FakeOSSCachingService) CacheResourceResponse(ctx context.Context, req *backend.CallResourceRequest, resp *backend.CallResourceResponse) { - f.calls["CacheResourceResponse"]++ -} - func (f *FakeOSSCachingService) HandleQueryRequest(ctx context.Context, req *backend.QueryDataRequest) (bool, CachedQueryDataResponse) { f.calls["HandleQueryRequest"]++ return f.ReturnHit, f.ReturnQueryResponse } -func (f *FakeOSSCachingService) HandleResourceRequest(ctx context.Context, req *backend.CallResourceRequest) (bool, *backend.CallResourceResponse) { +func (f *FakeOSSCachingService) HandleResourceRequest(ctx context.Context, req *backend.CallResourceRequest) (bool, CachedResourceDataResponse) { f.calls["HandleResourceRequest"]++ return f.ReturnHit, f.ReturnResourceResponse } diff --git a/pkg/services/caching/service.go b/pkg/services/caching/service.go index 57be8735c78..6f37099fe15 100644 --- a/pkg/services/caching/service.go +++ b/pkg/services/caching/service.go @@ -16,6 +16,7 @@ const ( ) type CacheQueryResponseFn func(context.Context, *backend.QueryDataResponse) +type CacheResourceResponseFn func(context.Context, *backend.CallResourceResponse) type CachedQueryDataResponse struct { // The cached data response associated with a query, or nil if no cached data is found @@ -25,6 +26,15 @@ type CachedQueryDataResponse struct { UpdateCacheFn CacheQueryResponseFn } +type CachedResourceDataResponse struct { + // The cached response associated with a resource request, or nil if no cached data is found + Response *backend.CallResourceResponse + // A function that should be used to cache a CallResourceResponse for a given resource request. + // It can be set to nil by the method implementation (if there is an error, for example), so it should be checked before being called. + // Because plugins can send multiple responses asyncronously, the implementation should be able to handle multiple calls to this function for one request. + UpdateCacheFn CacheResourceResponseFn +} + func ProvideCachingService() *OSSCachingService { return &OSSCachingService{} } @@ -36,10 +46,7 @@ type CachingService interface { HandleQueryRequest(context.Context, *backend.QueryDataRequest) (bool, CachedQueryDataResponse) // HandleResourceRequest uses a CallResourceRequest to check the cache for any existing results for that request. If none are found, it should return false. // This function may populate any response headers (accessible through the context) with the cache status using the X-Cache header. - HandleResourceRequest(context.Context, *backend.CallResourceRequest) (bool, *backend.CallResourceResponse) - // CacheResourceResponse is used to cache resource responses for a resource request. - // Because plugins can send multiple responses asyncronously, the implementation should be able to handle multiple calls to this function for one request. - CacheResourceResponse(context.Context, *backend.CallResourceRequest, *backend.CallResourceResponse) + HandleResourceRequest(context.Context, *backend.CallResourceRequest) (bool, CachedResourceDataResponse) } // Implementation of interface - does nothing @@ -50,11 +57,8 @@ func (s *OSSCachingService) HandleQueryRequest(ctx context.Context, req *backend return false, CachedQueryDataResponse{} } -func (s *OSSCachingService) HandleResourceRequest(ctx context.Context, req *backend.CallResourceRequest) (bool, *backend.CallResourceResponse) { - return false, nil -} - -func (s *OSSCachingService) CacheResourceResponse(ctx context.Context, req *backend.CallResourceRequest, resp *backend.CallResourceResponse) { +func (s *OSSCachingService) HandleResourceRequest(ctx context.Context, req *backend.CallResourceRequest) (bool, CachedResourceDataResponse) { + return false, CachedResourceDataResponse{} } var _ CachingService = &OSSCachingService{} diff --git a/pkg/services/pluginsintegration/clientmiddleware/caching_middleware.go b/pkg/services/pluginsintegration/clientmiddleware/caching_middleware.go index a86f5921dce..e9e0b50bc0b 100644 --- a/pkg/services/pluginsintegration/clientmiddleware/caching_middleware.go +++ b/pkg/services/pluginsintegration/clientmiddleware/caching_middleware.go @@ -100,7 +100,7 @@ func (m *CachingMiddleware) CallResource(ctx context.Context, req *backend.CallR start := time.Now() // First look in the resource cache if enabled - hit, resp := m.caching.HandleResourceRequest(ctx, req) + hit, cr := m.caching.HandleResourceRequest(ctx, req) defer func() { // record request duration if caching was used @@ -114,13 +114,21 @@ func (m *CachingMiddleware) CallResource(ctx context.Context, req *backend.CallR // Cache hit; send the response and return if hit { - return sender.Send(resp) + return sender.Send(cr.Response) } // Cache miss; do the actual request - // The call to update the cache happens in /pkg/api/plugin_resource.go in the flushStream() func - // TODO: Implement updating the cache from this method - return m.next.CallResource(ctx, req, sender) + // If there is no update cache func, just pass in the original sender + if cr.UpdateCacheFn == nil { + return m.next.CallResource(ctx, req, sender) + } + // Otherwise, intercept the responses in a wrapped sender so we can cache them first + cacheSender := cachedSenderFunc(func(res *backend.CallResourceResponse) error { + cr.UpdateCacheFn(ctx, res) + return sender.Send(res) + }) + + return m.next.CallResource(ctx, req, cacheSender) } func (m *CachingMiddleware) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { @@ -142,3 +150,9 @@ func (m *CachingMiddleware) PublishStream(ctx context.Context, req *backend.Publ func (m *CachingMiddleware) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error { return m.next.RunStream(ctx, req, sender) } + +type cachedSenderFunc func(res *backend.CallResourceResponse) error + +func (fn cachedSenderFunc) Send(res *backend.CallResourceResponse) error { + return fn(res) +} diff --git a/pkg/services/pluginsintegration/clientmiddleware/caching_middleware_test.go b/pkg/services/pluginsintegration/clientmiddleware/caching_middleware_test.go index 54cf5e2103c..dcfe60ec525 100644 --- a/pkg/services/pluginsintegration/clientmiddleware/caching_middleware_test.go +++ b/pkg/services/pluginsintegration/clientmiddleware/caching_middleware_test.go @@ -97,10 +97,30 @@ func TestCachingMiddleware(t *testing.T) { req, err := http.NewRequest(http.MethodGet, "/resource/blah", nil) require.NoError(t, err) + // This is the response returned by the HandleResourceRequest call + // Track whether the update cache fn was called, depending on what the response headers are in the cache request + var updateCacheCalled bool + dataResponse := caching.CachedResourceDataResponse{ + Response: &backend.CallResourceResponse{ + Status: 200, + Body: []byte("bogus"), + }, + UpdateCacheFn: func(ctx context.Context, rdr *backend.CallResourceResponse) { + updateCacheCalled = true + }, + } + + // This is the response sent via the passed-in sender when there is a cache miss + simulatedPluginResponse := &backend.CallResourceResponse{ + Status: 201, + Body: []byte("bogus"), + } + cs := caching.NewFakeOSSCachingService() cdt := clienttest.NewClientDecoratorTest(t, clienttest.WithReqContext(req, &user.SignedInUser{}), clienttest.WithMiddlewares(NewCachingMiddleware(cs)), + clienttest.WithResourceResponses([]*backend.CallResourceResponse{simulatedPluginResponse}), ) jsonDataMap := map[string]interface{}{} @@ -121,11 +141,6 @@ func TestCachingMiddleware(t *testing.T) { PluginContext: pluginCtx, } - resourceResponse := &backend.CallResourceResponse{ - Status: 200, - Body: []byte("bogus"), - } - var sentResponse *backend.CallResourceResponse var storeOneResponseCallResourceSender = callResourceResponseSenderFunc(func(res *backend.CallResourceResponse) error { sentResponse = res @@ -139,32 +154,37 @@ func TestCachingMiddleware(t *testing.T) { }) cs.ReturnHit = true - cs.ReturnResourceResponse = resourceResponse + cs.ReturnResourceResponse = dataResponse err := cdt.Decorator.CallResource(req.Context(), crr, storeOneResponseCallResourceSender) assert.NoError(t, err) // Cache service is called once cs.AssertCalls(t, "HandleResourceRequest", 1) - // Equals the mocked response was sent + // The mocked cached response was sent assert.NotNil(t, sentResponse) - assert.Equal(t, resourceResponse, sentResponse) + assert.Equal(t, dataResponse.Response, sentResponse) + // Cache was not updated by the middleware + assert.False(t, updateCacheCalled) }) - t.Run("If cache returns a miss, resource call is issued", func(t *testing.T) { + t.Run("If cache returns a miss, resource call is issued and the update cache function is called", func(t *testing.T) { t.Cleanup(func() { sentResponse = nil cs.Reset() }) cs.ReturnHit = false - cs.ReturnResourceResponse = resourceResponse + cs.ReturnResourceResponse = dataResponse err := cdt.Decorator.CallResource(req.Context(), crr, storeOneResponseCallResourceSender) assert.NoError(t, err) // Cache service is called once cs.AssertCalls(t, "HandleResourceRequest", 1) - // Nil response was sent - assert.Nil(t, sentResponse) + // Simulated plugin response was sent + assert.NotNil(t, sentResponse) + assert.Equal(t, simulatedPluginResponse, sentResponse) + // Since it was a miss, the middleware called the update func + assert.True(t, updateCacheCalled) }) })