mirror of
https://github.com/grafana/grafana.git
synced 2025-07-31 06:52:42 +08:00
Storage: Add ascending order support for NotOlderThan queries and introduce ResourceVersionMatch_Unset as default (#102505)
* Add support for ASC ordering and introduce ResourceVersionMatch_Unset as default Add SortAscending to continue token and add integration test for pagination. * Change protobuf order * Make backwards compatible * Update pkg/storage/unified/sql/backend.go Co-authored-by: Jean-Philippe Quéméner <JohnnyQQQQ@users.noreply.github.com> --------- Co-authored-by: Marco de Abreu <18629099+marcoabreu@users.noreply.github.com> Co-authored-by: Jean-Philippe Quéméner <JohnnyQQQQ@users.noreply.github.com>
This commit is contained in:
@ -443,6 +443,7 @@ func runTestIntegrationBackendList(t *testing.T, backend resource.StorageBackend
|
||||
continueToken := &resource.ContinueToken{
|
||||
ResourceVersion: rv8,
|
||||
StartOffset: 2,
|
||||
SortAscending: false,
|
||||
}
|
||||
res, err := server.List(ctx, &resource.ListRequest{
|
||||
NextPageToken: continueToken.String(),
|
||||
@ -492,42 +493,142 @@ func runTestIntegrationBackendListHistory(t *testing.T, backend resource.Storage
|
||||
require.NoError(t, err)
|
||||
require.Greater(t, rvHistory5, rvHistory4)
|
||||
|
||||
t.Run("fetch first history page at revision with limit", func(t *testing.T) {
|
||||
res, err := server.List(ctx, &resource.ListRequest{
|
||||
Limit: 3,
|
||||
Source: resource.ListRequest_HISTORY,
|
||||
Options: &resource.ListOptions{
|
||||
Key: &resource.ResourceKey{
|
||||
Namespace: ns,
|
||||
Group: "group",
|
||||
Resource: "resource",
|
||||
Name: "item1",
|
||||
},
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, res.Error)
|
||||
require.Len(t, res.Items, 3)
|
||||
t.Log(res.Items)
|
||||
// should be in desc order, so the newest RVs are returned first
|
||||
require.Equal(t, "item1 MODIFIED", string(res.Items[0].Value))
|
||||
require.Equal(t, rvHistory5, res.Items[0].ResourceVersion)
|
||||
require.Equal(t, "item1 MODIFIED", string(res.Items[1].Value))
|
||||
require.Equal(t, rvHistory4, res.Items[1].ResourceVersion)
|
||||
require.Equal(t, "item1 MODIFIED", string(res.Items[2].Value))
|
||||
require.Equal(t, rvHistory3, res.Items[2].ResourceVersion)
|
||||
t.Run("fetch history with different version matching", func(t *testing.T) {
|
||||
baseKey := &resource.ResourceKey{
|
||||
Namespace: ns,
|
||||
Group: "group",
|
||||
Resource: "resource",
|
||||
Name: "item1",
|
||||
}
|
||||
|
||||
continueToken, err := resource.GetContinueToken(res.NextPageToken)
|
||||
require.NoError(t, err)
|
||||
// should return the furthest back RV as the next page token
|
||||
require.Equal(t, rvHistory3, continueToken.ResourceVersion)
|
||||
tests := []struct {
|
||||
name string
|
||||
request *resource.ListRequest
|
||||
expectedVersions []int64
|
||||
expectedValues []string
|
||||
minExpectedHeadRV int64
|
||||
expectedContinueRV int64
|
||||
expectedSortAsc bool
|
||||
}{
|
||||
{
|
||||
name: "NotOlderThan with rv1 (ASC order)",
|
||||
request: &resource.ListRequest{
|
||||
Limit: 3,
|
||||
Source: resource.ListRequest_HISTORY,
|
||||
ResourceVersion: rv1,
|
||||
VersionMatchV2: resource.ResourceVersionMatchV2_NotOlderThan,
|
||||
Options: &resource.ListOptions{
|
||||
Key: baseKey,
|
||||
},
|
||||
},
|
||||
expectedVersions: []int64{rv1, rvHistory1, rvHistory2},
|
||||
expectedValues: []string{"item1 ADDED", "item1 MODIFIED", "item1 MODIFIED"},
|
||||
minExpectedHeadRV: rvHistory2,
|
||||
expectedContinueRV: rvHistory2,
|
||||
expectedSortAsc: true,
|
||||
},
|
||||
{
|
||||
name: "NotOlderThan with rv=0 (ASC order)",
|
||||
request: &resource.ListRequest{
|
||||
Limit: 3,
|
||||
Source: resource.ListRequest_HISTORY,
|
||||
ResourceVersion: 0,
|
||||
VersionMatchV2: resource.ResourceVersionMatchV2_NotOlderThan,
|
||||
Options: &resource.ListOptions{
|
||||
Key: baseKey,
|
||||
},
|
||||
},
|
||||
expectedVersions: []int64{rv1, rvHistory1, rvHistory2},
|
||||
expectedValues: []string{"item1 ADDED", "item1 MODIFIED", "item1 MODIFIED"},
|
||||
minExpectedHeadRV: rvHistory2,
|
||||
expectedContinueRV: rvHistory2,
|
||||
expectedSortAsc: true,
|
||||
},
|
||||
{
|
||||
name: "ResourceVersionMatch_Unset (DESC order)",
|
||||
request: &resource.ListRequest{
|
||||
Limit: 3,
|
||||
Source: resource.ListRequest_HISTORY,
|
||||
Options: &resource.ListOptions{
|
||||
Key: baseKey,
|
||||
},
|
||||
},
|
||||
expectedVersions: []int64{rvHistory5, rvHistory4, rvHistory3},
|
||||
expectedValues: []string{"item1 MODIFIED", "item1 MODIFIED", "item1 MODIFIED"},
|
||||
minExpectedHeadRV: rvHistory5,
|
||||
expectedContinueRV: rvHistory3,
|
||||
expectedSortAsc: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
res, err := server.List(ctx, tc.request)
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, res.Error)
|
||||
require.Len(t, res.Items, 3)
|
||||
|
||||
// Check versions and values match expectations
|
||||
for i := 0; i < 3; i++ {
|
||||
require.Equal(t, tc.expectedVersions[i], res.Items[i].ResourceVersion)
|
||||
require.Equal(t, tc.expectedValues[i], string(res.Items[i].Value))
|
||||
}
|
||||
|
||||
// Check resource version in response
|
||||
require.GreaterOrEqual(t, res.ResourceVersion, tc.minExpectedHeadRV)
|
||||
|
||||
// Check continue token
|
||||
continueToken, err := resource.GetContinueToken(res.NextPageToken)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expectedContinueRV, continueToken.ResourceVersion)
|
||||
require.Equal(t, tc.expectedSortAsc, continueToken.SortAscending)
|
||||
})
|
||||
}
|
||||
|
||||
// Test pagination for NotOlderThan (second page)
|
||||
t.Run("second page with NotOlderThan", func(t *testing.T) {
|
||||
// Get first page
|
||||
firstRequest := &resource.ListRequest{
|
||||
Limit: 3,
|
||||
Source: resource.ListRequest_HISTORY,
|
||||
ResourceVersion: rv1,
|
||||
VersionMatchV2: resource.ResourceVersionMatchV2_NotOlderThan,
|
||||
Options: &resource.ListOptions{Key: baseKey},
|
||||
}
|
||||
firstPageRes, err := server.List(ctx, firstRequest)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Get continue token for second page
|
||||
continueToken, err := resource.GetContinueToken(firstPageRes.NextPageToken)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Get second page
|
||||
secondPageRes, err := server.List(ctx, &resource.ListRequest{
|
||||
Limit: 3,
|
||||
Source: resource.ListRequest_HISTORY,
|
||||
ResourceVersion: rv1,
|
||||
VersionMatchV2: resource.ResourceVersionMatchV2_NotOlderThan,
|
||||
NextPageToken: continueToken.String(),
|
||||
Options: &resource.ListOptions{Key: baseKey},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, secondPageRes.Error)
|
||||
require.Len(t, secondPageRes.Items, 3)
|
||||
|
||||
// Second page should continue in ascending order
|
||||
expectedRVs := []int64{rvHistory3, rvHistory4, rvHistory5}
|
||||
for i, expectedRV := range expectedRVs {
|
||||
require.Equal(t, expectedRV, secondPageRes.Items[i].ResourceVersion)
|
||||
require.Equal(t, "item1 MODIFIED", string(secondPageRes.Items[i].Value))
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("fetch second page of history at revision", func(t *testing.T) {
|
||||
continueToken := &resource.ContinueToken{
|
||||
ResourceVersion: rvHistory3,
|
||||
StartOffset: 2,
|
||||
SortAscending: false,
|
||||
}
|
||||
res, err := server.List(ctx, &resource.ListRequest{
|
||||
NextPageToken: continueToken.String(),
|
||||
@ -551,6 +652,123 @@ func runTestIntegrationBackendListHistory(t *testing.T, backend resource.Storage
|
||||
require.Equal(t, "item1 MODIFIED", string(res.Items[1].Value))
|
||||
require.Equal(t, rvHistory1, res.Items[1].ResourceVersion)
|
||||
})
|
||||
|
||||
t.Run("paginated history with NotOlderThan returns items in ascending order", func(t *testing.T) {
|
||||
// Create 10 versions of a resource to test pagination
|
||||
ns2 := nsPrefix + "-ns2"
|
||||
resourceKey := &resource.ResourceKey{
|
||||
Namespace: ns2,
|
||||
Group: "group",
|
||||
Resource: "resource",
|
||||
Name: "paged-item",
|
||||
}
|
||||
|
||||
var resourceVersions []int64
|
||||
|
||||
// First create the initial resource
|
||||
initialRV, err := writeEvent(ctx, backend, "paged-item", resource.WatchEvent_ADDED, WithNamespace(ns2))
|
||||
require.NoError(t, err)
|
||||
resourceVersions = append(resourceVersions, initialRV)
|
||||
|
||||
// Create 9 more versions with modifications
|
||||
for i := 0; i < 9; i++ {
|
||||
rv, err := writeEvent(ctx, backend, "paged-item", resource.WatchEvent_MODIFIED, WithNamespace(ns2))
|
||||
require.NoError(t, err)
|
||||
resourceVersions = append(resourceVersions, rv)
|
||||
}
|
||||
|
||||
// Now we should have 10 versions total (1 ADDED + 9 MODIFIED)
|
||||
require.Len(t, resourceVersions, 10)
|
||||
|
||||
// Test pagination with limit of 3 and NotOlderThan, starting from the beginning
|
||||
pages := []struct {
|
||||
pageNumber int
|
||||
pageSize int
|
||||
startToken string
|
||||
}{
|
||||
{pageNumber: 1, pageSize: 3, startToken: ""},
|
||||
{pageNumber: 2, pageSize: 3, startToken: ""}, // Will be set in the test
|
||||
{pageNumber: 3, pageSize: 3, startToken: ""}, // Will be set in the test
|
||||
{pageNumber: 4, pageSize: 1, startToken: ""}, // Will be set in the test - last page with remaining item
|
||||
}
|
||||
|
||||
var allItems []*resource.ResourceWrapper
|
||||
|
||||
// Request first page with NotOlderThan and ResourceVersion=0 (should start from oldest)
|
||||
for i, page := range pages {
|
||||
req := &resource.ListRequest{
|
||||
Limit: int64(page.pageSize),
|
||||
Source: resource.ListRequest_HISTORY,
|
||||
ResourceVersion: 0,
|
||||
VersionMatchV2: resource.ResourceVersionMatchV2_NotOlderThan,
|
||||
Options: &resource.ListOptions{
|
||||
Key: resourceKey,
|
||||
},
|
||||
}
|
||||
|
||||
if i > 0 {
|
||||
// For subsequent pages, use the continue token from the previous page
|
||||
req.NextPageToken = pages[i].startToken
|
||||
}
|
||||
|
||||
res, err := server.List(ctx, req)
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, res.Error)
|
||||
|
||||
// First 3 pages should have exactly pageSize items
|
||||
if i < 3 {
|
||||
require.Len(t, res.Items, page.pageSize, "Page %d should have %d items", i+1, page.pageSize)
|
||||
} else {
|
||||
// Last page should have 1 item (10 items total with 3+3+3+1 distribution)
|
||||
require.Len(t, res.Items, 1, "Last page should have 1 item")
|
||||
}
|
||||
|
||||
// Save continue token for next page if not the last page
|
||||
if i < len(pages)-1 {
|
||||
pages[i+1].startToken = res.NextPageToken
|
||||
require.NotEmpty(t, res.NextPageToken, "Should have continue token for page %d", i+1)
|
||||
} else {
|
||||
// Last page should not have a continue token
|
||||
require.Empty(t, res.NextPageToken, "Last page should not have continue token")
|
||||
}
|
||||
|
||||
// Add items to our collection
|
||||
allItems = append(allItems, res.Items...)
|
||||
|
||||
// Verify all items in current page are in ascending order
|
||||
for j := 1; j < len(res.Items); j++ {
|
||||
require.Less(t, res.Items[j-1].ResourceVersion, res.Items[j].ResourceVersion,
|
||||
"Items within page %d should be in ascending order", i+1)
|
||||
}
|
||||
|
||||
// For pages after the first, verify first item of current page is greater than last item of previous page
|
||||
if i > 0 && len(allItems) > page.pageSize {
|
||||
prevPageLastIdx := len(allItems) - len(res.Items) - 1
|
||||
currentPageFirstIdx := len(allItems) - len(res.Items)
|
||||
require.Greater(t, allItems[currentPageFirstIdx].ResourceVersion, allItems[prevPageLastIdx].ResourceVersion,
|
||||
"First item of page %d should have higher RV than last item of page %d", i+1, i)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify we got all 10 items
|
||||
require.Len(t, allItems, 10, "Should have retrieved all 10 items")
|
||||
|
||||
// Verify all items are in ascending order of resource version
|
||||
for i := 1; i < len(allItems); i++ {
|
||||
require.Less(t, allItems[i-1].ResourceVersion, allItems[i].ResourceVersion,
|
||||
"All items should be in ascending order of resource version")
|
||||
}
|
||||
|
||||
// Verify the first item is the initial ADDED event
|
||||
require.Equal(t, initialRV, allItems[0].ResourceVersion, "First item should be the initial ADDED event")
|
||||
require.Equal(t, "paged-item ADDED", string(allItems[0].Value))
|
||||
|
||||
// Verify all other items are MODIFIED events and correspond to our recorded resource versions
|
||||
for i := 1; i < len(allItems); i++ {
|
||||
require.Equal(t, "paged-item MODIFIED", string(allItems[i].Value))
|
||||
require.Equal(t, resourceVersions[i], allItems[i].ResourceVersion)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func runTestIntegrationBlobSupport(t *testing.T, backend resource.StorageBackend, nsPrefix string) {
|
||||
|
Reference in New Issue
Block a user