mirror of
https://github.com/grafana/loki.git
synced 2026-03-13 09:33:58 +08:00
59 lines
1.4 KiB
Go
59 lines
1.4 KiB
Go
package executor
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/apache/arrow-go/v18/arrow"
|
|
)
|
|
|
|
func NewLimitPipeline(input Pipeline, skip, fetch uint32) *GenericPipeline {
|
|
// We gradually reduce offsetRemaining and limitRemaining as we process more records, as the
|
|
// offsetRemaining and limitRemaining may cross record boundaries.
|
|
var (
|
|
offsetRemaining = int64(skip)
|
|
limitRemaining = int64(fetch)
|
|
)
|
|
|
|
return newGenericPipeline(func(ctx context.Context, inputs []Pipeline) (arrow.Record, error) {
|
|
var length int64
|
|
var start, end int64
|
|
var batch arrow.Record
|
|
var err error
|
|
|
|
// We skip yielding zero-length batches while offsetRemainig > 0
|
|
for length == 0 {
|
|
// Stop once we reached the limit
|
|
if limitRemaining <= 0 {
|
|
return nil, EOF
|
|
}
|
|
|
|
// Pull the next item from input
|
|
input := inputs[0]
|
|
batch, err = input.Read(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// We want to slice batch so it only contains the rows we're looking for
|
|
// accounting for both the limit and offset.
|
|
// We constrain the start and end to be within the bounds of the record.
|
|
start = min(offsetRemaining, batch.NumRows())
|
|
end = min(start+limitRemaining, batch.NumRows())
|
|
length = end - start
|
|
|
|
offsetRemaining -= start
|
|
limitRemaining -= length
|
|
}
|
|
|
|
if length <= 0 && offsetRemaining <= 0 {
|
|
return nil, EOF
|
|
}
|
|
|
|
if batch.NumRows() == 0 {
|
|
return batch, nil
|
|
}
|
|
|
|
return batch.NewSlice(start, end), nil
|
|
}, input)
|
|
}
|