From fdc807e6dfa1177ead4512113ec4886ccb39de53 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Tue, 24 Feb 2026 14:33:03 -0500 Subject: [PATCH] chore(dataobj): prefetch during open for rowReaderDownloader (#20942) Signed-off-by: Robert Fratto --- pkg/dataobj/internal/dataset/row_reader.go | 10 +++++++ .../internal/dataset/row_reader_downloader.go | 26 ++++++++++++++++--- .../internal/dataset/row_reader_test.go | 19 ++++++++++++++ 3 files changed, 51 insertions(+), 4 deletions(-) diff --git a/pkg/dataobj/internal/dataset/row_reader.go b/pkg/dataobj/internal/dataset/row_reader.go index b1dda755e5..4935567df3 100644 --- a/pkg/dataobj/internal/dataset/row_reader.go +++ b/pkg/dataobj/internal/dataset/row_reader.go @@ -458,6 +458,9 @@ func (r *RowReader) init(ctx context.Context) error { if err := r.initDownloader(ctx); err != nil { return err } + if err := r.prefetchPages(ctx); err != nil { + return fmt.Errorf("prefetching pages: %w", err) + } if r.inner == nil { r.inner = newBasicRowReader(r.allColumns()) @@ -469,6 +472,13 @@ func (r *RowReader) init(ctx context.Context) error { return nil } +func (r *RowReader) prefetchPages(ctx context.Context) error { + if !r.opts.Prefetch { + return nil + } + return r.dl.Prefetch(ctx) +} + // allColumns returns the full set of column to read. If r was configured with // prefetching, wrapped columns from [rowReaderDownloader] are returned. Otherwise, // the columns of the original dataset are returned. diff --git a/pkg/dataobj/internal/dataset/row_reader_downloader.go b/pkg/dataobj/internal/dataset/row_reader_downloader.go index 0c593a5d8e..8c4c3870fc 100644 --- a/pkg/dataobj/internal/dataset/row_reader_downloader.go +++ b/pkg/dataobj/internal/dataset/row_reader_downloader.go @@ -2,6 +2,7 @@ package dataset import ( "context" + "math" "github.com/grafana/loki/v3/pkg/dataobj/internal/result" "github.com/grafana/loki/v3/pkg/dataobj/internal/util/rangeset" @@ -130,6 +131,16 @@ func newRowReaderDownloader(dset Dataset) *rowReaderDownloader { return &rd } +// Prefetch will download an initial batch of pages. +func (dl *rowReaderDownloader) Prefetch(ctx context.Context) error { + oldReadRange := dl.readRange + defer func() { dl.readRange = oldReadRange }() + + // Temporarily set the read range to the entire dataset to ensure that we download everything. + dl.readRange = rangeset.Range{Start: 0, End: math.MaxUint64} + return dl.downloadBatch(ctx, nil) +} + // AddColumn adds a column to the rowReaderDownloader. This should be called // before the downloader is used. // @@ -276,13 +287,20 @@ func (dl *rowReaderDownloader) buildDownloadBatch(ctx context.Context, requestor } // Always add the requestor page to the batch if it's uncached. - if len(requestor.data) == 0 { + if requestor != nil && len(requestor.data) == 0 { pageBatch = append(pageBatch, requestor) } + // If we're not calling buildDownloadBatch due to a page read, we'll assume + // it's a primary page so we can fill as much data as possible. + isPrimary := true + if requestor != nil { + isPrimary = requestor.column.primary + } + // Add uncached P1 pages to the batch. We add all P1 pages, even if it would // exceed the target size. - for result := range dl.iterP1Pages(ctx, requestor.column.primary) { + for result := range dl.iterP1Pages(ctx, isPrimary) { page, err := result.Value() if err != nil { return nil, err @@ -306,7 +324,7 @@ func (dl *rowReaderDownloader) buildDownloadBatch(ctx context.Context, requestor var targetReached bool - for result := range dl.iterP2Pages(ctx, requestor.column.primary) { + for result := range dl.iterP2Pages(ctx, isPrimary) { page, err := result.Value() if err != nil { return nil, err @@ -322,7 +340,7 @@ func (dl *rowReaderDownloader) buildDownloadBatch(ctx context.Context, requestor return pageBatch, nil } - for result := range dl.iterP3Pages(ctx, requestor.column.primary) { + for result := range dl.iterP3Pages(ctx, isPrimary) { page, err := result.Value() if err != nil { return nil, err diff --git a/pkg/dataobj/internal/dataset/row_reader_test.go b/pkg/dataobj/internal/dataset/row_reader_test.go index b3864e9759..e115f32c20 100644 --- a/pkg/dataobj/internal/dataset/row_reader_test.go +++ b/pkg/dataobj/internal/dataset/row_reader_test.go @@ -12,6 +12,7 @@ import ( "testing" "github.com/dustin/go-humanize" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" @@ -20,6 +21,24 @@ import ( "github.com/grafana/loki/v3/pkg/xcap" ) +func Test_RowReader_Open_Prefetch(t *testing.T) { + dset, columns := buildTestDataset(t) + r := NewRowReader(RowReaderOptions{Dataset: dset, Columns: columns, Prefetch: true}) + defer r.Close() + + require.NoError(t, r.Open(t.Context())) + + require.NotEqual(t, 0, len(r.dl.allColumns), "expected at least one column") + for i, col := range r.dl.allColumns { + rc := col.(*readerColumn) + + assert.NotEqual(t, 0, len(rc.pages), "expected at least one page in column %d", i) + for j, page := range rc.pages { + assert.NotNil(t, page.data, "Expected column %d page %d to be prefetched", i, j) + } + } +} + func Test_Reader_ReadAll(t *testing.T) { dset, columns := buildTestDataset(t) r := NewRowReader(RowReaderOptions{Dataset: dset, Columns: columns})