mirror of
https://github.com/grafana/loki.git
synced 2026-03-13 09:33:58 +08:00
chore(dataobj): prefetch during open for rowReaderDownloader (#20942)
Signed-off-by: Robert Fratto <robertfratto@gmail.com>
This commit is contained in:
@@ -458,6 +458,9 @@ func (r *RowReader) init(ctx context.Context) error {
|
||||
if err := r.initDownloader(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := r.prefetchPages(ctx); err != nil {
|
||||
return fmt.Errorf("prefetching pages: %w", err)
|
||||
}
|
||||
|
||||
if r.inner == nil {
|
||||
r.inner = newBasicRowReader(r.allColumns())
|
||||
@@ -469,6 +472,13 @@ func (r *RowReader) init(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *RowReader) prefetchPages(ctx context.Context) error {
|
||||
if !r.opts.Prefetch {
|
||||
return nil
|
||||
}
|
||||
return r.dl.Prefetch(ctx)
|
||||
}
|
||||
|
||||
// allColumns returns the full set of column to read. If r was configured with
|
||||
// prefetching, wrapped columns from [rowReaderDownloader] are returned. Otherwise,
|
||||
// the columns of the original dataset are returned.
|
||||
|
||||
@@ -2,6 +2,7 @@ package dataset
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
|
||||
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
|
||||
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/rangeset"
|
||||
@@ -130,6 +131,16 @@ func newRowReaderDownloader(dset Dataset) *rowReaderDownloader {
|
||||
return &rd
|
||||
}
|
||||
|
||||
// Prefetch will download an initial batch of pages.
|
||||
func (dl *rowReaderDownloader) Prefetch(ctx context.Context) error {
|
||||
oldReadRange := dl.readRange
|
||||
defer func() { dl.readRange = oldReadRange }()
|
||||
|
||||
// Temporarily set the read range to the entire dataset to ensure that we download everything.
|
||||
dl.readRange = rangeset.Range{Start: 0, End: math.MaxUint64}
|
||||
return dl.downloadBatch(ctx, nil)
|
||||
}
|
||||
|
||||
// AddColumn adds a column to the rowReaderDownloader. This should be called
|
||||
// before the downloader is used.
|
||||
//
|
||||
@@ -276,13 +287,20 @@ func (dl *rowReaderDownloader) buildDownloadBatch(ctx context.Context, requestor
|
||||
}
|
||||
|
||||
// Always add the requestor page to the batch if it's uncached.
|
||||
if len(requestor.data) == 0 {
|
||||
if requestor != nil && len(requestor.data) == 0 {
|
||||
pageBatch = append(pageBatch, requestor)
|
||||
}
|
||||
|
||||
// If we're not calling buildDownloadBatch due to a page read, we'll assume
|
||||
// it's a primary page so we can fill as much data as possible.
|
||||
isPrimary := true
|
||||
if requestor != nil {
|
||||
isPrimary = requestor.column.primary
|
||||
}
|
||||
|
||||
// Add uncached P1 pages to the batch. We add all P1 pages, even if it would
|
||||
// exceed the target size.
|
||||
for result := range dl.iterP1Pages(ctx, requestor.column.primary) {
|
||||
for result := range dl.iterP1Pages(ctx, isPrimary) {
|
||||
page, err := result.Value()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -306,7 +324,7 @@ func (dl *rowReaderDownloader) buildDownloadBatch(ctx context.Context, requestor
|
||||
|
||||
var targetReached bool
|
||||
|
||||
for result := range dl.iterP2Pages(ctx, requestor.column.primary) {
|
||||
for result := range dl.iterP2Pages(ctx, isPrimary) {
|
||||
page, err := result.Value()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -322,7 +340,7 @@ func (dl *rowReaderDownloader) buildDownloadBatch(ctx context.Context, requestor
|
||||
return pageBatch, nil
|
||||
}
|
||||
|
||||
for result := range dl.iterP3Pages(ctx, requestor.column.primary) {
|
||||
for result := range dl.iterP3Pages(ctx, isPrimary) {
|
||||
page, err := result.Value()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/dustin/go-humanize"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
|
||||
@@ -20,6 +21,24 @@ import (
|
||||
"github.com/grafana/loki/v3/pkg/xcap"
|
||||
)
|
||||
|
||||
func Test_RowReader_Open_Prefetch(t *testing.T) {
|
||||
dset, columns := buildTestDataset(t)
|
||||
r := NewRowReader(RowReaderOptions{Dataset: dset, Columns: columns, Prefetch: true})
|
||||
defer r.Close()
|
||||
|
||||
require.NoError(t, r.Open(t.Context()))
|
||||
|
||||
require.NotEqual(t, 0, len(r.dl.allColumns), "expected at least one column")
|
||||
for i, col := range r.dl.allColumns {
|
||||
rc := col.(*readerColumn)
|
||||
|
||||
assert.NotEqual(t, 0, len(rc.pages), "expected at least one page in column %d", i)
|
||||
for j, page := range rc.pages {
|
||||
assert.NotNil(t, page.data, "Expected column %d page %d to be prefetched", i, j)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func Test_Reader_ReadAll(t *testing.T) {
|
||||
dset, columns := buildTestDataset(t)
|
||||
r := NewRowReader(RowReaderOptions{Dataset: dset, Columns: columns})
|
||||
|
||||
Reference in New Issue
Block a user