mirror of
https://github.com/grafana/loki.git
synced 2026-03-13 09:33:58 +08:00
137 lines
4.0 KiB
Go
137 lines
4.0 KiB
Go
package executor
|
|
|
|
import (
|
|
"errors"
|
|
"testing"
|
|
|
|
"github.com/apache/arrow-go/v18/arrow"
|
|
"github.com/apache/arrow-go/v18/arrow/memory"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/grafana/loki/v3/pkg/util/arrowtest"
|
|
)
|
|
|
|
// recordInput pairs a schema with rows to build one record for pipeline tests.
|
|
type recordInput struct {
|
|
Schema *arrow.Schema
|
|
Rows arrowtest.Rows
|
|
}
|
|
|
|
func TestBatchingPipeline(t *testing.T) {
|
|
schemaAB := arrow.NewSchema([]arrow.Field{
|
|
{Name: "a", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
|
|
{Name: "b", Type: arrow.BinaryTypes.String, Nullable: true},
|
|
}, nil)
|
|
|
|
tests := []struct {
|
|
name string
|
|
batchSize int64
|
|
inputs []recordInput
|
|
wantBatches int
|
|
wantTotalRows int64
|
|
checkBatches func(t *testing.T, batches []arrow.RecordBatch)
|
|
}{
|
|
{
|
|
name: "batchSize=0 passes records through unchanged",
|
|
batchSize: 0,
|
|
inputs: []recordInput{
|
|
{schemaAB, arrowtest.Rows{{"a": int64(1), "b": "x"}}},
|
|
{schemaAB, arrowtest.Rows{{"a": int64(2), "b": "x"}}},
|
|
{schemaAB, arrowtest.Rows{{"a": int64(3), "b": "x"}}},
|
|
},
|
|
wantBatches: 3,
|
|
wantTotalRows: 3,
|
|
},
|
|
{
|
|
name: "batchSize=2 fits exactly two records per batch",
|
|
batchSize: 2,
|
|
inputs: []recordInput{
|
|
{schemaAB, arrowtest.Rows{{"a": int64(1), "b": "x"}}},
|
|
{schemaAB, arrowtest.Rows{{"a": int64(2), "b": "x"}}},
|
|
{schemaAB, arrowtest.Rows{{"a": int64(3), "b": "x"}}},
|
|
{schemaAB, arrowtest.Rows{{"a": int64(4), "b": "x"}}},
|
|
{schemaAB, arrowtest.Rows{{"a": int64(5), "b": "x"}}},
|
|
},
|
|
wantBatches: 3,
|
|
wantTotalRows: 5,
|
|
checkBatches: func(t *testing.T, batches []arrow.RecordBatch) {
|
|
require.Equal(t, int64(2), batches[0].NumRows(), "first batch: 2 rows")
|
|
require.Equal(t, int64(2), batches[1].NumRows(), "second batch: 2 rows")
|
|
require.Equal(t, int64(1), batches[2].NumRows(), "third batch: 1 row")
|
|
},
|
|
},
|
|
{
|
|
name: "batchSize=1 each record sent alone",
|
|
batchSize: 1,
|
|
inputs: []recordInput{
|
|
{schemaAB, arrowtest.Rows{{"a": int64(1), "b": "x"}}},
|
|
{schemaAB, arrowtest.Rows{{"a": int64(2), "b": "x"}}},
|
|
{schemaAB, arrowtest.Rows{{"a": int64(3), "b": "x"}}},
|
|
},
|
|
wantBatches: 3,
|
|
wantTotalRows: 3,
|
|
},
|
|
{
|
|
name: "records with different schemas are batched with union schema",
|
|
batchSize: 10, // large enough to batch both records
|
|
inputs: []recordInput{
|
|
{
|
|
arrow.NewSchema([]arrow.Field{
|
|
{Name: "a", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
|
|
{Name: "b", Type: arrow.BinaryTypes.String, Nullable: true},
|
|
}, nil),
|
|
arrowtest.Rows{{"a": int64(1), "b": "x"}},
|
|
},
|
|
{
|
|
arrow.NewSchema([]arrow.Field{
|
|
{Name: "b", Type: arrow.BinaryTypes.String, Nullable: true},
|
|
{Name: "c", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
|
|
}, nil),
|
|
arrowtest.Rows{{"b": "y", "c": int64(2)}},
|
|
},
|
|
},
|
|
wantBatches: 1,
|
|
wantTotalRows: 2,
|
|
checkBatches: func(t *testing.T, batches []arrow.RecordBatch) {
|
|
fields := make([]string, batches[0].Schema().NumFields())
|
|
for i := range fields {
|
|
fields[i] = batches[0].Schema().Field(i).Name
|
|
}
|
|
require.Equal(t, []string{"a", "b", "c"}, fields, "union schema: fields in order first seen")
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
ctx := t.Context()
|
|
records := make([]arrow.RecordBatch, len(tt.inputs))
|
|
for i, in := range tt.inputs {
|
|
records[i] = in.Rows.Record(memory.DefaultAllocator, in.Schema)
|
|
}
|
|
|
|
p := NewBatchingPipeline(NewBufferedPipeline(records...), tt.batchSize)
|
|
defer p.Close()
|
|
|
|
require.NoError(t, p.Open(ctx))
|
|
var batches []arrow.RecordBatch
|
|
var totalRows int64
|
|
for {
|
|
rec, err := p.Read(ctx)
|
|
if errors.Is(err, EOF) {
|
|
break
|
|
}
|
|
require.NoError(t, err)
|
|
batches = append(batches, rec)
|
|
totalRows += rec.NumRows()
|
|
}
|
|
|
|
require.Len(t, batches, tt.wantBatches)
|
|
require.Equal(t, tt.wantTotalRows, totalRows)
|
|
if tt.checkBatches != nil {
|
|
tt.checkBatches(t, batches)
|
|
}
|
|
})
|
|
}
|
|
}
|