diff --git a/pkg/setting/setting.go b/pkg/setting/setting.go
index 3ed1cc48e37..fcb680a0f4f 100644
--- a/pkg/setting/setting.go
+++ b/pkg/setting/setting.go
@@ -551,6 +551,7 @@ type Cfg struct {
 	// Unified Storage
 	UnifiedStorage    map[string]UnifiedStorageConfig
+	MaxPageSizeBytes  int
 	IndexPath         string
 	IndexWorkers      int
 	IndexMaxBatchSize int
diff --git a/pkg/setting/setting_unified_storage.go b/pkg/setting/setting_unified_storage.go
index b77329bd03f..8f07a5cc1eb 100644
--- a/pkg/setting/setting_unified_storage.go
+++ b/pkg/setting/setting_unified_storage.go
@@ -51,6 +51,7 @@ func (cfg *Cfg) setUnifiedStorageConfig() {
 	// Set indexer config for unified storage
 	section := cfg.Raw.Section("unified_storage")
+	cfg.MaxPageSizeBytes = section.Key("max_page_size_bytes").MustInt(0)
 	cfg.IndexPath = section.Key("index_path").String()
 	cfg.IndexWorkers = section.Key("index_workers").MustInt(10)
 	cfg.IndexMaxBatchSize = section.Key("index_max_batch_size").MustInt(100)
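
The zero fallback above matters downstream: MustInt(0) leaves a missing max_page_size_bytes at 0, and NewResourceServer (further down in this patch) promotes any non-positive value to the 2MB default. A minimal sketch of that two-step defaulting; the helper name is illustrative, not code from this patch:

package main

import "fmt"

// resolveMaxPageSizeBytes mirrors the defaulting chain in the diff: the ini
// layer falls back to 0 when the key is absent, and the resource server then
// promotes any non-positive value to 2MB.
func resolveMaxPageSizeBytes(configured int) int {
	if configured <= 0 {
		return 1024 * 1024 * 2 // same 2MB default as NewResourceServer
	}
	return configured
}

func main() {
	fmt.Println(resolveMaxPageSizeBytes(0)) // 2097152: key missing from the ini
	fmt.Println(resolveMaxPageSizeBytes(1)) // 1: forced pagination, as in the tests below
}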
diff --git a/pkg/storage/legacysql/dualwrite/dualwriter.go b/pkg/storage/legacysql/dualwrite/dualwriter.go
index 8cda0f6191d..3eb01bea499 100644
--- a/pkg/storage/legacysql/dualwrite/dualwriter.go
+++ b/pkg/storage/legacysql/dualwrite/dualwriter.go
@@ -67,31 +67,106 @@ func (d *dualWriter) Get(ctx context.Context, name string, options *metav1.GetOp
 }
 
 func (d *dualWriter) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
-	// If we read from unified, we can just do that and return.
-	if d.readUnified {
-		return d.unified.List(ctx, options)
-	}
-	// If legacy is still the main store, lets first read from it.
-	legacyList, err := d.legacy.List(ctx, options)
+	// Always work on *copies* so we never mutate the caller's ListOptions.
+	var (
+		legacyOptions  = options.DeepCopy()
+		unifiedOptions = options.DeepCopy()
+		log            = logging.FromContext(ctx).With("method", "List")
+	)
+
+	legacyToken, unifiedToken, err := parseContinueTokens(options.Continue)
 	if err != nil {
 		return nil, err
 	}
-	// Once we have successfully listed from legacy, we can check if we want to fail on a unified list.
-	// If we allow the unified list to fail, we can do it in the background and return.
-	if d.errorIsOK {
-		go func(ctxBg context.Context, cancel context.CancelFunc) {
-			defer cancel()
-			if _, err := d.unified.List(ctxBg, options); err != nil {
-				log := logging.FromContext(ctxBg).With("method", "List")
-				log.Error("failed background LIST to unified", "err", err)
-			}
-		}(context.WithTimeout(context.WithoutCancel(ctx), backgroundReqTimeout))
-		return legacyList, nil
+
+	legacyOptions.Continue = legacyToken
+	unifiedOptions.Continue = unifiedToken
+
+	// If we read from unified, we can just do that and return.
+	if d.readUnified {
+		unifiedList, err := d.unified.List(ctx, unifiedOptions)
+		if err != nil {
+			return nil, err
+		}
+		unifiedMeta, err := meta.ListAccessor(unifiedList)
+		if err != nil {
+			return nil, fmt.Errorf("failed to access unified List MetaData: %w", err)
+		}
+		unifiedMeta.SetContinue(buildContinueToken("", unifiedMeta.GetContinue()))
+		return unifiedList, nil
 	}
-	// If it's not okay to fail, we have to check it in the foreground.
-	if _, err := d.unified.List(ctx, options); err != nil {
+
+	// In some cases, the unified token might still be present while the legacy token is empty
+	// (i.e. legacy finished iterating). This can happen because unified storage pages not only on
+	// the provided limit, but also on the response size. This check prevents restarting the
+	// iteration from the beginning.
+	if options.Continue != "" && legacyToken == "" {
+		return nil, nil
+	}
+
+	// In some cases, where the stores are not yet in sync, the unified continue token might already
+	// be empty while the legacy one is not, as legacy has more data. In that case we don't want to
+	// issue a new request with an empty continue token, which would return the first page again.
+	// nolint:staticcheck
+	shouldDoUnifiedRequest := true
+	if options.Continue != "" && unifiedToken == "" {
+		shouldDoUnifiedRequest = false
+	}
+
+	// If legacy is still the main store, let's first read from it.
+	legacyList, err := d.legacy.List(ctx, legacyOptions)
+	if err != nil {
 		return nil, err
 	}
+	legacyMeta, err := meta.ListAccessor(legacyList)
+	if err != nil {
+		return nil, fmt.Errorf("failed to access legacy List MetaData: %w", err)
+	}
+	legacyToken = legacyMeta.GetContinue()
+
+	// Once we have successfully listed from legacy, we can check if we want to fail on a unified list.
+	// If we allow the unified list to fail, we can do it in the background and return.
+	if d.errorIsOK && shouldDoUnifiedRequest {
+		// We would like to get the continue token from unified storage, but we don't want to wait
+		// on it for too long, since we're calling unified storage asynchronously.
+		out := make(chan string, 1)
+		go func(ctxBg context.Context, cancel context.CancelFunc) {
+			defer cancel()
+			defer close(out)
+			unifiedList, err := d.unified.List(ctxBg, unifiedOptions)
+			if err != nil {
+				log.Error("failed background LIST to unified", "err", err)
+				return
+			}
+			unifiedMeta, err := meta.ListAccessor(unifiedList)
+			if err != nil {
+				log.Error("failed background LIST to unified", "err",
+					fmt.Errorf("failed to access unified List MetaData: %w", err))
+				return
+			}
+			out <- unifiedMeta.GetContinue()
+		}(context.WithTimeout(context.WithoutCancel(ctx), backgroundReqTimeout))
+		select {
+		case unifiedToken = <-out:
+		case <-time.After(300 * time.Millisecond):
+			log.Warn("timeout while waiting on the unified storage continue token")
+		}
+		legacyMeta.SetContinue(buildContinueToken(legacyToken, unifiedToken))
+		return legacyList, nil
+	}
+	if shouldDoUnifiedRequest {
+		// If it's not okay to fail, we have to check it in the foreground.
+		unifiedList, err := d.unified.List(ctx, unifiedOptions)
+		if err != nil {
+			return nil, err
+		}
+		unifiedMeta, err := meta.ListAccessor(unifiedList)
+		if err != nil {
+			return nil, fmt.Errorf("failed to access unified List MetaData: %w", err)
+		}
+		unifiedToken = unifiedMeta.GetContinue()
+	}
+	legacyMeta.SetContinue(buildContinueToken(legacyToken, unifiedToken))
 	return legacyList, nil
 }
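
The wait-with-budget shape used above (a background goroutine with its own deadline, plus a foreground select that gives up after 300ms) is worth seeing in isolation; a hedged standalone sketch with illustrative names, not the dualwriter code itself:

package main

import (
	"context"
	"fmt"
	"time"
)

// fetchWithBudget runs fn with its own timeout, but the caller only waits
// `budget` for the result; a slow fn keeps running in the background until
// its deadline fires, and the caller falls back to the zero value.
func fetchWithBudget(ctx context.Context, budget, timeout time.Duration, fn func(context.Context) (string, error)) string {
	out := make(chan string, 1) // buffered, so the goroutine never blocks on send
	go func(bgCtx context.Context, cancel context.CancelFunc) {
		defer cancel()
		defer close(out)
		v, err := fn(bgCtx)
		if err != nil {
			return // closing out yields "" to the receiver
		}
		out <- v
	}(context.WithTimeout(context.WithoutCancel(ctx), timeout))

	select {
	case v := <-out:
		return v
	case <-time.After(budget):
		return "" // same effect as the dualwriter's empty unified token on timeout
	}
}

func main() {
	slow := func(context.Context) (string, error) {
		time.Sleep(time.Second)
		return "continue-token", nil
	}
	fmt.Printf("%q\n", fetchWithBudget(context.Background(), 300*time.Millisecond, 5*time.Second, slow)) // ""
}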
diff --git a/pkg/storage/legacysql/dualwrite/dualwriter_continue_token.go b/pkg/storage/legacysql/dualwrite/dualwriter_continue_token.go
new file mode 100644
index 00000000000..31eb8f1bf6e
--- /dev/null
+++ b/pkg/storage/legacysql/dualwrite/dualwriter_continue_token.go
@@ -0,0 +1,33 @@
+package dualwrite
+
+import (
+	"encoding/base64"
+	"fmt"
+	"strings"
+)
+
+// parseContinueTokens splits a dualwriter continue token into its (legacy, unified) parts, if we
+// received one. If we receive a single token not separated by a comma, we return it as-is as the
+// legacy token with an empty unified token, to ensure a smooth transition to the new token format.
+func parseContinueTokens(token string) (string, string, error) {
+	if token == "" {
+		return "", "", nil
+	}
+	decodedToken, err := base64.StdEncoding.DecodeString(token)
+	if err != nil {
+		return "", "", fmt.Errorf("failed to decode dualwriter continue token: %w", err)
+	}
+	decodedTokens := strings.Split(string(decodedToken), ",")
+	if len(decodedTokens) > 1 {
+		return decodedTokens[0], decodedTokens[1], nil
+	}
+	return token, "", nil
+}
+
+func buildContinueToken(legacyToken, unifiedToken string) string {
+	if legacyToken == "" && unifiedToken == "" {
+		return ""
+	}
+	return base64.StdEncoding.EncodeToString([]byte(
+		strings.Join([]string{legacyToken, unifiedToken}, ",")))
+}
diff --git a/pkg/storage/legacysql/dualwrite/dualwriter_continue_token_test.go b/pkg/storage/legacysql/dualwrite/dualwriter_continue_token_test.go
new file mode 100644
index 00000000000..2f79b9167c4
--- /dev/null
+++ b/pkg/storage/legacysql/dualwrite/dualwriter_continue_token_test.go
@@ -0,0 +1,98 @@
+package dualwrite
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestParseContinueTokens(t *testing.T) {
+	tcs := []struct {
+		name         string
+		token        string
+		legacyToken  string
+		unifiedToken string
+	}{
+		{
+			name:         "Should handle empty token",
+			token:        "",
+			legacyToken:  "",
+			unifiedToken: "",
+		},
+		{
+			name:         "Should handle legacy token",
+			token:        "MXwy",
+			legacyToken:  "MXwy",
+			unifiedToken: "",
+		},
+		{
+			name: "Should handle new token format",
+			// both slots taken: 'MXwy,eyJvIjoxLCJ2IjoxNzQ5NTY1NTU4MDc4OTkwLCJzIjpmYWxzZX0='
+			token:        "TVh3eSxleUp2SWpveExDSjJJam94TnpRNU5UWTFOVFU0TURjNE9Ua3dMQ0p6SWpwbVlXeHpaWDA9",
+			legacyToken:  "MXwy",
+			unifiedToken: "eyJvIjoxLCJ2IjoxNzQ5NTY1NTU4MDc4OTkwLCJzIjpmYWxzZX0=",
+		},
+		{
+			name: "Should handle new token with only unified token (mode >= 3)",
+			// first slot empty: ',eyJvIjoxLCJ2IjoxNzQ5NTY1NTU4MDc4OTkwLCJzIjpmYWxzZX0='
+			token:        "LGV5SnZJam94TENKMklqb3hOelE1TlRZMU5UVTRNRGM0T1Rrd0xDSnpJanBtWVd4elpYMD0=",
+			legacyToken:  "",
+			unifiedToken: "eyJvIjoxLCJ2IjoxNzQ5NTY1NTU4MDc4OTkwLCJzIjpmYWxzZX0=",
+		},
+	}
+
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			legacyToken, unifiedToken, err := parseContinueTokens(tc.token)
+			require.NoError(t, err)
+			require.Equal(t, tc.legacyToken, legacyToken)
+			require.Equal(t, tc.unifiedToken, unifiedToken)
+		})
+	}
+}
+
+func TestBuildContinueToken(t *testing.T) {
+	tcs := []struct {
+		name          string
+		legacyToken   string
+		unifiedToken  string
+		shouldBeEmpty bool
+	}{
+		{
+			name:         "Should handle both tokens",
+			legacyToken:  "abc",
+			unifiedToken: "xyz",
+		},
+		{
+			name:        "Should handle legacy token standalone",
+			legacyToken: "abc",
+		},
+		{
+			name:         "Should handle unified token standalone",
+			unifiedToken: "xyz",
+		},
+		{
+			name:          "Should handle both tokens empty",
+			shouldBeEmpty: true,
+		},
+	}
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			token := buildContinueToken(tc.legacyToken, tc.unifiedToken)
+			legacyToken, unifiedToken, err := parseContinueTokens(token)
+			require.NoError(t, err)
+			require.Equal(t, tc.legacyToken, legacyToken)
+			require.Equal(t, tc.unifiedToken, unifiedToken)
+			if tc.shouldBeEmpty {
+				require.Equal(t, "", token)
+			}
+		})
+	}
+}
+
+func TestInvalidToken(t *testing.T) {
+	// nolint: gosec
+	invalidToken := "325232ff4fF->"
+	_, _, err := parseContinueTokens(invalidToken)
+	require.Error(t, err)
+}
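
To make the combined token concrete: it is base64 over "<legacy>,<unified>", and since the base64 alphabet contains no comma, splitting on the comma is unambiguous. A standalone round trip of the same scheme, re-implemented here because the helpers above are unexported; the token values are illustrative:

package main

import (
	"encoding/base64"
	"fmt"
	"strings"
)

func main() {
	legacy, unified := "MXwy", "eyJvIjoxfQ=="

	// Build: join with a comma, then base64-encode the pair.
	combined := base64.StdEncoding.EncodeToString([]byte(legacy + "," + unified))
	fmt.Println(combined)

	// Parse: decode, then split the two per-store tokens back apart.
	raw, err := base64.StdEncoding.DecodeString(combined)
	if err != nil {
		panic(err)
	}
	parts := strings.SplitN(string(raw), ",", 2)
	fmt.Println(parts[0], parts[1]) // MXwy eyJvIjoxfQ==
}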
diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go
index f25f45927b0..3e219956f07 100644
--- a/pkg/storage/unified/resource/server.go
+++ b/pkg/storage/unified/resource/server.go
@@ -198,6 +198,8 @@ type ResourceServerOptions struct {
 	storageMetrics *StorageMetrics
 
 	IndexMetrics *BleveIndexMetrics
+
+	MaxPageSizeBytes int
 }
 
 func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
@@ -222,6 +224,11 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
 		}
 	}
 
+	if opts.MaxPageSizeBytes <= 0 {
+		// By default, we use 2MB for the page size.
+		opts.MaxPageSizeBytes = 1024 * 1024 * 2
+	}
+
 	// Initialize the blob storage
 	blobstore := opts.Blob.Backend
 	if blobstore == nil {
@@ -250,19 +257,20 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
 	// Make this cancelable
 	ctx, cancel := context.WithCancel(context.Background())
 	s := &server{
-		tracer:         opts.Tracer,
-		log:            logger,
-		backend:        opts.Backend,
-		blob:           blobstore,
-		diagnostics:    opts.Diagnostics,
-		access:         opts.AccessClient,
-		writeHooks:     opts.WriteHooks,
-		lifecycle:      opts.Lifecycle,
-		now:            opts.Now,
-		ctx:            ctx,
-		cancel:         cancel,
-		storageMetrics: opts.storageMetrics,
-		indexMetrics:   opts.IndexMetrics,
+		tracer:           opts.Tracer,
+		log:              logger,
+		backend:          opts.Backend,
+		blob:             blobstore,
+		diagnostics:      opts.Diagnostics,
+		access:           opts.AccessClient,
+		writeHooks:       opts.WriteHooks,
+		lifecycle:        opts.Lifecycle,
+		now:              opts.Now,
+		ctx:              ctx,
+		cancel:           cancel,
+		storageMetrics:   opts.storageMetrics,
+		indexMetrics:     opts.IndexMetrics,
+		maxPageSizeBytes: opts.MaxPageSizeBytes,
 	}
 
 	if opts.Search.Resources != nil {
@@ -307,6 +315,8 @@ type server struct {
 	// init checking
 	once    sync.Once
 	initErr error
+
+	maxPageSizeBytes int
 }
 
 // Init implements ResourceServer.
@@ -791,7 +801,7 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
 	if req.Limit < 1 {
 		req.Limit = 50 // default max 50 items in a page
 	}
-	maxPageBytes := 1024 * 1024 * 2 // 2mb/page
+	maxPageBytes := s.maxPageSizeBytes
 	pageBytes := 0
 
 	rsp := &resourcepb.ListResponse{}
diff --git a/pkg/storage/unified/sql/server.go b/pkg/storage/unified/sql/server.go
index e5e1e845a9f..3d6b1d5f248 100644
--- a/pkg/storage/unified/sql/server.go
+++ b/pkg/storage/unified/sql/server.go
@@ -43,6 +43,12 @@ func NewResourceServer(db infraDB.DB, cfg *setting.Cfg,
 		opts.Blob.URL = "file:///" + dir
 	}
 
+	// This is mostly for testing: it lets tests influence when we paginate
+	// based on the page size.
+	unifiedStorageCfg := cfg.SectionWithEnvOverrides("unified_storage")
+	maxPageSizeBytes := unifiedStorageCfg.Key("max_page_size_bytes")
+	opts.MaxPageSizeBytes = maxPageSizeBytes.MustInt(0)
+
 	eDB, err := dbimpl.ProvideResourceDB(db, cfg, tracer)
 	if err != nil {
 		return nil, err
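
What the configurable maxPageSizeBytes buys is that the server cuts a page either at the item limit or once the accumulated response size crosses the threshold, whichever comes first. A sketch of that loop shape, illustrative only and not the actual server implementation:

package main

import "fmt"

// pageBySize closes a page at `limit` items or once the accumulated bytes
// exceed maxPageBytes; next is the index to continue from, or -1 when done.
func pageBySize(items [][]byte, limit, maxPageBytes int) (page [][]byte, next int) {
	pageBytes := 0
	for i, item := range items {
		if len(page) >= limit || pageBytes > maxPageBytes {
			return page, i
		}
		page = append(page, item)
		pageBytes += len(item)
	}
	return page, -1
}

func main() {
	items := [][]byte{[]byte("aaaa"), []byte("bbbb"), []byte("cccc")}
	// With maxPageBytes=1 every page closes after a single item, which is
	// exactly what the integration test below relies on.
	page, next := pageBySize(items, 50, 1)
	fmt.Println(len(page), next) // 1 1
}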
+ t.Run("make sure list works with continue tokens", func(t *testing.T) { + modes := []grafanarest.DualWriterMode{ + grafanarest.Mode1, + grafanarest.Mode2, + grafanarest.Mode3, + grafanarest.Mode4, + grafanarest.Mode5, + } + for _, mode := range modes { + t.Run(fmt.Sprintf("mode %d", mode), func(t *testing.T) { + doListFoldersTest(t, apis.NewK8sTestHelper(t, testinfra.GrafanaOpts{ + AppModeProduction: true, + DisableAnonymous: true, + APIServerStorageType: "unified", + UnifiedStorageConfig: map[string]setting.UnifiedStorageConfig{ + folders.RESOURCEGROUP: { + DualWriterMode: mode, + }, + }, + // We set it to 1 here, so we always get forced pagination based on the response size. + UnifiedStorageMaxPageSizeBytes: 1, + EnableFeatureToggles: []string{ + featuremgmt.FlagKubernetesClientDashboardsFolders, + featuremgmt.FlagNestedFolders, + }, + }), mode) + }) + } + }) + t.Run("when creating a folder it should trim leading and trailing spaces", func(t *testing.T) { doCreateEnsureTitleIsTrimmedTest(t, apis.NewK8sTestHelper(t, testinfra.GrafanaOpts{ AppModeProduction: true, @@ -488,6 +520,65 @@ func doCreateCircularReferenceFolderTest(t *testing.T, helper *apis.K8sTestHelpe require.Equal(t, 400, create.Response.StatusCode) } +func doListFoldersTest(t *testing.T, helper *apis.K8sTestHelper, mode grafanarest.DualWriterMode) { + client := helper.GetResourceClient(apis.ResourceClientArgs{ + User: helper.Org1.Admin, + GVR: gvr, + }) + foldersCount := 3 + for i := 0; i < foldersCount; i++ { + payload, err := json.Marshal(map[string]interface{}{ + "title": fmt.Sprintf("Test-%d", i), + "uid": fmt.Sprintf("uid-%d", i), + }) + require.NoError(t, err) + parentCreate := apis.DoRequest(helper, apis.RequestParams{ + User: client.Args.User, + Method: http.MethodPost, + Path: "/api/folders", + Body: payload, + }, &folder.Folder{}) + require.NotNil(t, parentCreate.Result) + require.Equal(t, http.StatusOK, parentCreate.Response.StatusCode) + } + fetchedFolders, fetchItemsPerCall := checkListRequest(t, 1, client) + require.Equal(t, []string{"uid-0", "uid-1", "uid-2"}, fetchedFolders) + require.Equal(t, []int{1, 1, 1}, fetchItemsPerCall[:3]) + + // Now let's see if the iterator also works when we are limited by the page size, which should be set + // to 1 byte for this test. We only need to check that if we test unified storage as the primary storage, + // as legacy doesn't have such a page size limit. 
diff --git a/pkg/tests/testinfra/testinfra.go b/pkg/tests/testinfra/testinfra.go
index 0d6510df906..5310a1813ad 100644
--- a/pkg/tests/testinfra/testinfra.go
+++ b/pkg/tests/testinfra/testinfra.go
@@ -492,6 +492,12 @@ func CreateGrafDir(t *testing.T, opts GrafanaOpts) (string, string) {
 			require.NoError(t, err)
 		}
 	}
+	if opts.UnifiedStorageMaxPageSizeBytes > 0 {
+		section, err := getOrCreateSection("unified_storage")
+		require.NoError(t, err)
+		_, err = section.NewKey("max_page_size_bytes", fmt.Sprintf("%d", opts.UnifiedStorageMaxPageSizeBytes))
+		require.NoError(t, err)
+	}
 	if opts.PermittedProvisioningPaths != "" {
 		_, err = pathsSect.NewKey("permitted_provisioning_paths", opts.PermittedProvisioningPaths)
 		require.NoError(t, err)
@@ -556,6 +562,7 @@ type GrafanaOpts struct {
 	QueryRetries                   int64
 	GrafanaComAPIURL               string
 	UnifiedStorageConfig           map[string]setting.UnifiedStorageConfig
+	UnifiedStorageMaxPageSizeBytes int
 	PermittedProvisioningPaths     string
 	GrafanaComSSOAPIToken          string
 	LicensePath                    string
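
For reference, the new branch in CreateGrafDir amounts to the go-ini calls below; getOrCreateSection is internal to the test infra, so this sketch stands in with NewSection and is illustrative rather than the actual helper:

package main

import (
	"fmt"
	"os"

	"gopkg.in/ini.v1"
)

func main() {
	cfg := ini.Empty()
	// Ensure the section exists, then write the key, mirroring what happens
	// when UnifiedStorageMaxPageSizeBytes > 0.
	section, err := cfg.NewSection("unified_storage")
	if err != nil {
		panic(err)
	}
	if _, err := section.NewKey("max_page_size_bytes", fmt.Sprintf("%d", 1)); err != nil {
		panic(err)
	}
	// The test instance then boots from an ini containing:
	//   [unified_storage]
	//   max_page_size_bytes = 1
	if _, err := cfg.WriteTo(os.Stdout); err != nil {
		panic(err)
	}
}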