fix(deps): update module github.com/parquet-go/parquet-go to v0.28.0 (main) (#20973)

Signed-off-by: renovate-sh-app[bot] <219655108+renovate-sh-app[bot]@users.noreply.github.com>
Co-authored-by: renovate-sh-app[bot] <219655108+renovate-sh-app[bot]@users.noreply.github.com>
This commit is contained in:
renovate-sh-app[bot]
2026-02-26 11:55:03 -05:00
committed by GitHub
parent 92a45f135f
commit 0edcd606ee
30 changed files with 1038 additions and 138 deletions

2
go.mod
View File

@@ -140,7 +140,7 @@ require (
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db
github.com/ncw/swift/v2 v2.0.5
github.com/oklog/ulid/v2 v2.1.1
github.com/parquet-go/parquet-go v0.27.0
github.com/parquet-go/parquet-go v0.28.0
github.com/pressly/goose/v3 v3.26.0
github.com/prometheus/alertmanager v0.31.1
github.com/prometheus/common/sigv4 v0.1.0

4
go.sum
View File

@@ -965,8 +965,8 @@ github.com/parquet-go/bitpack v1.0.0 h1:AUqzlKzPPXf2bCdjfj4sTeacrUwsT7NlcYDMUQxP
github.com/parquet-go/bitpack v1.0.0/go.mod h1:XnVk9TH+O40eOOmvpAVZ7K2ocQFrQwysLMnc6M/8lgs=
github.com/parquet-go/jsonlite v1.0.0 h1:87QNdi56wOfsE5bdgas0vRzHPxfJgzrXGml1zZdd7VU=
github.com/parquet-go/jsonlite v1.0.0/go.mod h1:nDjpkpL4EOtqs6NQugUsi0Rleq9sW/OtC1NnZEnxzF0=
github.com/parquet-go/parquet-go v0.27.0 h1:vHWK2xaHbj+v1DYps03yDRpEsdtOeKbhiXUaixoPb3g=
github.com/parquet-go/parquet-go v0.27.0/go.mod h1:navtkAYr2LGoJVp141oXPlO/sxLvaOe3la2JEoD8+rg=
github.com/parquet-go/parquet-go v0.28.0 h1:ECyksyv8T2pOrlLsN7aWJIoQakyk/HtxQ2lchgS4els=
github.com/parquet-go/parquet-go v0.28.0/go.mod h1:navtkAYr2LGoJVp141oXPlO/sxLvaOe3la2JEoD8+rg=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pb33f/jsonpath v0.7.1 h1:dEp6oIZuJbpDSyuHAl9m7GonoDW4M20BcD5vT0tPYRE=

160
vendor/github.com/parquet-go/parquet-go/.CLAUDE.md generated vendored Normal file
View File

@@ -0,0 +1,160 @@
# parquet-go Project Context
## Overview
High-performance Go library for reading and writing Apache Parquet files. Originally developed by Twilio Segment, now community-maintained at github.com/parquet-go/parquet-go.
- **Go Version**: 1.22+
- **Current Version**: v0.26.0+ (pre-v1, breaking changes possible)
- **Codebase Size**: ~71,700 lines across 210 Go files
## Project Structure
```
parquet-go/
├── Root Package # Core API: Reader/Writer/File/Schema
├── encoding/ # 7 encoding formats (plain, rle, delta, etc.)
├── compress/ # 6 compression codecs (snappy, gzip, zstd, etc.)
├── bloom/ # Split-block Bloom filter (SIMD-optimized)
├── sparse/ # Sparse array utilities with gather ops
├── hashprobe/ # Hash-based dictionary operations
├── format/ # Generated Thrift definitions (parquet spec)
└── internal/ # Memory mgmt, byte algorithms, unsafe casts
```
## Key Files by Function
| Purpose | Files |
|---------|-------|
| Core Types | `parquet.go`, `file.go`, `schema.go`, `node.go`, `type.go` |
| Reading | `reader.go`, `column.go`, `page.go` |
| Writing | `writer.go`, `column_buffer.go`, `buffer.go` |
| Logical Types | `type_boolean.go` through `type_variant.go` (40+ files) |
| Column Buffers | `column_buffer_*.go` (20+ files for each physical type) |
| Page Impl | `page_*.go` (15+ files) |
| Operations | `convert.go`, `merge.go`, `sorting.go` |
| Configuration | `config.go` (900+ lines) |
## Core Types & Interfaces
### Schema System
- `Schema` - Immutable, thread-safe parquet schema
- `Node` - Interface for schema nodes
- `Field` - Named schema node interface
- `Column` - Concrete column representation
- `Type` - Logical type interface
- `Kind` - Physical type enum (Boolean, Int32, Int64, Float, Double, ByteArray, FixedLenByteArray)
### Reader/Writer System
- `GenericReader[T]` - Type-safe reader (preferred)
- `GenericWriter[T]` - Type-safe writer (preferred)
- `SortingWriter[T]` - Writer with integrated sorting
- `GenericBuffer[T]` - In-memory row group buffer (implements sort.Interface)
### Row Group Abstractions
- `RowGroup` - Interface for row group collections
- `ColumnChunk` - Interface for column data
- `Page` - Interface for page data with stats
- `Pages` - Sequential page reader
## Common Patterns
### Writing Data
```go
// One-shot
parquet.WriteFile[T](path, rows, options...)
// Streaming
writer := parquet.NewGenericWriter[T](w, options...)
writer.Write(rows)
writer.Close()
// With sorting
sortWriter := parquet.NewSortingWriter[T](w, rowCount, options...)
```
### Reading Data
```go
// One-shot
rows, _ := parquet.ReadFile[T](path)
// Streaming
reader := parquet.NewGenericReader[T](r, options...)
n, _ := reader.Read(rows)
// Low-level
file, _ := parquet.OpenFile(r, size)
for _, rg := range file.RowGroups() { ... }
```
## Build Commands
```bash
make test # Run tests with -race and coverage
make format # Go fmt + modernize tool
make tools # Install development tools
```
## Testing Patterns
- Unit tests: `*_test.go` throughout (100+ files)
- Examples: `example_test.go`
- Property tests: `internal/quick` package
- Key test files:
  - `parquet_test.go` - Core functionality
  - `writer_test.go` - Writer specifics (91KB)
  - `reader_test.go` - Reader specifics
  - `merge_test.go` - Merge operations
  - `convert_test.go` - Schema conversion
## Debugging
Set environment variable:
```bash
PARQUETGODEBUG=1
```
## Important Implementation Details
### Performance Optimizations
- SIMD assembly for AMD64 (dictionary ops, page bounds, ordering)
- Zero-copy via interface-based design
- Memory pooling with `BufferPool`
- Async reading with `ReadModeAsync`
### Schema Tags
Struct field tags control parquet behavior:
```go
type Record struct {
ID int64 `parquet:"id"`
Name string `parquet:"name,optional"`
Data []byte `parquet:"data,snappy"`
}
```
### Encoding Options
- plain, rle, delta (binary-packed, length byte array, byte array)
- bytestreamsplit (float optimization)
- dictionary encoding
### Compression Codecs
snappy, gzip, brotli, zstd, lz4, uncompressed
## Recent Development Focus
- Bug fixes: panic in Group.GoType(), json.RawMessage handling, repetition levels
- New features: Geometry/Geography types, VARIANT logical type
- Performance: GenericWriter optimizations
- Stability: SortingWriter improvements
## Common Bug Areas
- Schema conversion edge cases (`convert.go`)
- Nested structure handling (repetition/definition levels)
- Memory management in streaming writes
- Page statistics and index handling
- Dictionary encoding with nulls
- Interface type handling in dynamic value mapping (`value.go`, `row.go`)
## Dependencies (Runtime)
- Compression: brotli, gzip, lz4, zstd libraries
- Encoding: bitpack, jsonlite
- Types: google/uuid, go-geom (geometry)
- Serialization: protobuf

View File

@@ -238,12 +238,15 @@ func (c *Column) setLevels(depth, repetition, definition, index int) (int, error
return -1, fmt.Errorf("cannot represent parquet columns with more than %d definition levels: %s", MaxDefinitionLevel, c.path)
}
switch schemaRepetitionTypeOf(c.schema) {
case format.Optional:
definition++
case format.Repeated:
repetition++
definition++
// Only non-root columns have a well defined repetition_type
if depth > 0 {
switch schemaRepetitionTypeOf(c.schema) {
case format.Optional:
definition++
case format.Repeated:
repetition++
definition++
}
}
c.depth = int8(depth)

View File

@@ -1,10 +1,14 @@
package parquet
import (
"bytes"
"cmp"
"encoding/binary"
"encoding/json"
"fmt"
"maps"
"math"
"math/big"
"math/bits"
"reflect"
"sort"
@@ -37,11 +41,10 @@ import (
// - nil *structpb.Struct, *structpb.ListValue, *structpb.Value
// - Zero values for value types
func isNullValue(value reflect.Value) bool {
if !value.IsValid() {
return true
}
switch value.Kind() {
case reflect.Invalid:
return true
case reflect.Pointer, reflect.Interface:
if value.IsNil() {
return true
@@ -705,12 +708,11 @@ func writeValueFuncOfLeaf(columnIndex int16, node Node) (int16, writeValueFunc)
return columnIndex + 1, func(columns []ColumnBuffer, levels columnLevels, value reflect.Value) {
col := columns[columnIndex]
writeValue:
if !value.IsValid() {
switch value.Kind() {
case reflect.Invalid:
col.writeNull(levels)
return
}
switch value.Kind() {
case reflect.Pointer, reflect.Interface:
if value.IsNil() {
col.writeNull(levels)
@@ -755,6 +757,8 @@ func writeValueFuncOfLeaf(columnIndex int16, node Node) (int16, writeValueFunc)
writeProtoAny(col, levels, msg, node)
case geom.T:
writeGeometry(col, levels, msg, node)
case *big.Float:
writeBigFloat(col, levels, msg, node)
default:
value = value.Elem()
goto writeValue
@@ -790,10 +794,23 @@ func writeValueFuncOfLeaf(columnIndex int16, node Node) (int16, writeValueFunc)
return
case reflect.Float32:
typ := node.Type()
logicalType := typ.LogicalType()
if logicalType != nil && logicalType.Decimal != nil {
decimalValue(col, levels, typ, value, logicalType.Decimal.Scale)
return
}
col.writeFloat(levels, float32(value.Float()))
return
case reflect.Float64:
typ := node.Type()
logicalType := typ.LogicalType()
if logicalType != nil && logicalType.Decimal != nil {
decimalValue(col, levels, typ, value, logicalType.Decimal.Scale)
return
}
col.writeDouble(levels, value.Float())
return
@@ -907,3 +924,108 @@ func writeUUID(col ColumnBuffer, levels columnLevels, str string, typ Type) {
col.writeByteArray(levels, buf.Slice())
buf.Reset()
}
// decimalValue writes a floating-point value to a DECIMAL column as an
// unscaled integer: the float is multiplied by 10^scale and rounded with
// math.Round before being converted to int64.
//
// Int32 columns receive the integer converted to int32, Int64 columns receive
// it as is, and ByteArray columns receive the bytes produced by
// numberToByteArray. For any other physical kind nothing is written.
func decimalValue(col ColumnBuffer, levels columnLevels, typ Type, value reflect.Value, scale int32) {
	unscaled := int64(math.Round(value.Float() * math.Pow10(int(scale))))

	kind := typ.Kind()
	if kind == Int32 {
		col.writeInt32(levels, int32(unscaled))
		return
	}
	if kind == Int64 {
		col.writeInt64(levels, unscaled)
		return
	}
	if kind == ByteArray {
		col.writeByteArray(levels, numberToByteArray(unscaled))
	}
}
// numberToByteArray returns the big-endian binary encoding of data as produced
// by binary.Write. It panics with the error from binary.Write when data cannot
// be encoded.
func numberToByteArray(data any) []byte {
	encoded := new(bytes.Buffer)
	if err := binary.Write(encoded, binary.BigEndian, data); err != nil {
		panic(err)
	}
	return encoded.Bytes()
}
// writeBigFloat writes f to col as the unscaled integer of a DECIMAL value:
// f is multiplied by 10^scale, rounded half away from zero, and encoded with
// bigIntToByteArray. When the column's physical kind is FixedLenByteArray the
// bytes are sign-extended to typ.Length() with padToFixedLen. The result is
// always emitted through col.writeByteArray.
//
// It panics when node's type has no decimal logical type.
func writeBigFloat(col ColumnBuffer, levels columnLevels, f *big.Float, node Node) {
	typ := node.Type()
	logicalType := typ.LogicalType()
	if logicalType == nil || logicalType.Decimal == nil {
		panic("writeBigFloat requires a decimal logical type")
	}
	scale := int(logicalType.Decimal.Scale)
	// Compute minimum precision needed: decimal precision * log2(10) ≈ precision * 3.32
	// We use precision * 4 + 64 for safety margin
	minPrec := uint(logicalType.Decimal.Precision)*4 + 64
	// Never work with less mantissa precision than the input already carries.
	prec := max(f.Prec(), minPrec)
	// scaleFactor = 10^scale, computed exactly as an integer first.
	scaleFactor := new(big.Float).SetPrec(prec)
	scaleFactor.SetInt(new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(scale)), nil))
	scaled := new(big.Float).SetPrec(prec).Mul(f, scaleFactor)
	// Round to nearest integer (add 0.5 and truncate for positive, subtract 0.5 for negative)
	half := new(big.Float).SetPrec(prec).SetFloat64(0.5)
	if scaled.Sign() >= 0 {
		scaled.Add(scaled, half)
	} else {
		scaled.Sub(scaled, half)
	}
	// Int truncates toward zero; the accuracy result is deliberately ignored
	// because the ±0.5 adjustment above already performed the rounding.
	unscaled, _ := scaled.Int(nil)
	b := bigIntToByteArray(unscaled)
	if typ.Kind() == FixedLenByteArray {
		// padToFixedLen panics if the value needs more than typ.Length() bytes.
		b = padToFixedLen(b, typ.Length(), unscaled.Sign() < 0)
	}
	col.writeByteArray(levels, b)
}
// bigIntToByteArray returns the minimal big-endian two's complement encoding
// of i. Zero is encoded as an empty slice. The argument is not modified.
//
// Non-negative values get a leading 0x00 byte when the top bit of their
// magnitude is set, so that they are not read back as negative.
//
// Negative values are encoded in the fewest bytes that still carry the sign
// bit. In particular -128 is {0x80} and -32768 is {0x80, 0x00}; an extra
// leading 0xFF byte would make values that exactly fit a fixed-length column
// appear too large for it.
func bigIntToByteArray(i *big.Int) []byte {
	if i.Sign() >= 0 {
		b := i.Bytes()
		// Add leading zero byte if high bit is set to avoid being interpreted as negative
		if len(b) > 0 && b[0]&0x80 != 0 {
			b = append([]byte{0}, b...)
		}
		return b
	}

	// For negative i, the two's complement bit pattern is the bitwise
	// complement of the non-negative number ^i (which equals -i-1).
	complement := new(big.Int).Not(i)

	// One extra bit is needed for the sign, hence BitLen/8 + 1 bytes.
	b := make([]byte, complement.BitLen()/8+1)
	complement.FillBytes(b)
	for j := range b {
		b[j] = ^b[j]
	}
	return b
}
// padToFixedLen left-pads b to exactly length bytes, using 0xFF as the fill
// byte when negative is true and 0x00 otherwise. A slice that already has the
// requested length is returned as is; a slice longer than length causes a
// panic.
func padToFixedLen(b []byte, length int, negative bool) []byte {
	switch {
	case len(b) == length:
		return b
	case len(b) > length:
		panic(fmt.Sprintf("decimal value requires %d bytes but fixed length is %d", len(b), length))
	}

	out := make([]byte, length)
	lead := out[:length-len(b)]
	if negative {
		// Sign-extend: the freshly allocated slice is already zero-filled,
		// so only the negative case needs an explicit fill.
		for i := range lead {
			lead[i] = 0xFF
		}
	}
	copy(out[len(lead):], b)
	return out
}

View File

@@ -307,8 +307,26 @@ func writeRowsFuncOfPointer(t reflect.Type, schema *Schema, path columnPath, tag
func writeRowsFuncOfSlice(t reflect.Type, schema *Schema, path columnPath, tagReplacements []StructTagOption) writeRowsFunc {
elemType := t.Elem()
elemSize := uintptr(elemType.Size())
// If the current node is a LIST, we need to drill down to the element
// to find the schema node for the slice elements.
if node := findByPath(schema, path); node != nil && isList(node) {
path = path.append("list", "element")
}
writeRows := writeRowsFuncOf(elemType, schema, path, tagReplacements)
// Check if the element schema node is optional.
// This handles the case of `parquet-element:",optional"` tag where
// the list elements themselves are optional (non-pointer basic types).
// For pointer types (like []*string), the pointer handling already
// takes care of optionality via writeRowsFuncOfPointer.
if elemType.Kind() != reflect.Ptr {
if node := findByPath(schema, path); node != nil && node.Optional() {
writeRows = writeRowsFuncOfOptional(elemType, schema, path, writeRows)
}
}
return func(columns []ColumnBuffer, levels columnLevels, rows sparse.Array) {
type sliceHeader struct {
base unsafe.Pointer
@@ -638,14 +656,18 @@ func writeRowsFuncOfMap(t reflect.Type, schema *Schema, path columnPath, tagRepl
}
func writeRowsFuncOfJSON(t reflect.Type, schema *Schema, path columnPath) writeRowsFunc {
// If this is a string or a byte array write directly.
switch t.Kind() {
case reflect.String:
return writeRowsFuncOfRequired(t, schema, path)
case reflect.Slice:
if t.Elem().Kind() == reflect.Uint8 {
return writeRowsFuncOfRequired(t, schema, path)
// If this is a string or a byte array, write directly.
if t.Kind() == reflect.String || (t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8) {
column, _ := schema.Lookup(path...)
isOptional := column.Node.Optional()
// This fast-path avoids the optional checks, so check here and wrap the writer
// if required.
writer := writeRowsFuncOfRequired(t, schema, path)
if isOptional {
writer = writeRowsFuncOfOptional(t, schema, path, writer)
}
return writer
}
columnIndex := findColumnIndex(schema, schema, path)

View File

@@ -283,7 +283,7 @@ func (c *conversion) Convert(rows []Row) (int, error) {
// Fix: If we have a zero Value{}, convert it to a properly typed value
// For optional fields, keep as null (kind = 0)
// For required fields, convert to typed zero value
if columnValues[i].Kind() == Kind(0) && !conv.isOptional {
if columnValues[i].IsNull() && !conv.isOptional {
columnValues[i] = ZeroValue(conv.targetKind)
}

View File

@@ -46,7 +46,8 @@ func (r *binaryReader) Reader() io.Reader {
func (r *binaryReader) ReadBool() (bool, error) {
v, err := r.ReadByte()
return v != 0, err
// Thrift protocol treats both 0 and 2 as false.
return v != 0 && v != 2, err
}
func (r *binaryReader) ReadInt8() (int8, error) {

View File

@@ -1038,6 +1038,17 @@ type RowGroup struct {
Ordinal int16 `thrift:"7,optional,writezero"`
}
// Reset zeroes the scalar fields of the row group and truncates its Columns
// and SortingColumns slices to length zero, keeping their backing arrays.
//
// The truncated elements are not cleared here.
func (r *RowGroup) Reset() {
	r.Columns = r.Columns[:0]
	r.TotalByteSize = 0
	r.NumRows = 0
	r.SortingColumns = r.SortingColumns[:0]
	r.FileOffset = 0
	r.TotalCompressedSize = 0
	r.Ordinal = 0
}
// Empty struct to signal the order defined by the physical or logical type.
type TypeDefinedOrder struct{}
@@ -1125,6 +1136,16 @@ type OffsetIndex struct {
UnencodedByteArrayDataBytes []int64 `thrift:"2,optional"`
}
// Reset zeroes the Offset, CompressedPageSize and FirstRowIndex of every page
// location, then truncates PageLocations and UnencodedByteArrayDataBytes to
// length zero while keeping their backing arrays.
func (o *OffsetIndex) Reset() {
	for i := 0; i < len(o.PageLocations); i++ {
		location := &o.PageLocations[i]
		location.Offset = 0
		location.CompressedPageSize = 0
		location.FirstRowIndex = 0
	}
	o.PageLocations = o.PageLocations[:0]
	o.UnencodedByteArrayDataBytes = o.UnencodedByteArrayDataBytes[:0]
}
// Description for ColumnIndex.
// Each <array-field>[i] refers to the page at OffsetIndex.PageLocations[i]
type ColumnIndex struct {
@@ -1174,6 +1195,22 @@ type ColumnIndex struct {
DefinitionLevelHistogram []int64 `thrift:"7,optional"`
}
// Reset empties the column index. Every slice field is truncated to length
// zero with its backing array kept; each individual MinValues/MaxValues entry
// is truncated as well before the outer slice is. BoundaryOrder is set to zero.
func (c *ColumnIndex) Reset() {
	c.NullPages = c.NullPages[:0]

	for i := 0; i < len(c.MinValues); i++ {
		c.MinValues[i] = c.MinValues[i][:0]
	}
	c.MinValues = c.MinValues[:0]

	for i := 0; i < len(c.MaxValues); i++ {
		c.MaxValues[i] = c.MaxValues[i][:0]
	}
	c.MaxValues = c.MaxValues[:0]

	c.BoundaryOrder = 0
	c.NullCounts = c.NullCounts[:0]
	c.RepetitionLevelHistogram = c.RepetitionLevelHistogram[:0]
	c.DefinitionLevelHistogram = c.DefinitionLevelHistogram[:0]
}
type AesGcmV1 struct {
// AAD prefix.
AadPrefix []byte `thrift:"1,optional"`

View File

@@ -698,7 +698,7 @@ func mergeTwoNodes(a, b Node) Node {
if !isPlainEncoding(encoding2) {
encoding = encoding2
}
if encoding != nil {
if encoding != nil && canEncode(encoding, merged.Type().Kind()) {
merged = Encoded(merged, encoding)
}
} else {

View File

@@ -1,8 +1,10 @@
package parquet
import (
"fmt"
"reflect"
"slices"
"strconv"
"strings"
"unicode"
"unicode/utf8"
@@ -381,18 +383,70 @@ func goTypeOfLeaf(node Node) reflect.Type {
func goTypeOfGroup(node Node) reflect.Type {
fields := node.Fields()
structFields := make([]reflect.StructField, len(fields))
names := make(map[string]struct{}, len(fields))
for i, field := range fields {
structFields[i].Name = exportedStructFieldName(field.Name())
if strings.IndexByte(field.Name(), ',') != -1 {
// Ruh-roh! We cannot create a valid Go identifier, but neither can we
// create a valid Go struct tag for mapping a synthetic field name.
panic(fmt.Sprintf("schema node contains an invalid name %q: fields must not have commas in their name", field.Name()))
}
name := exportedStructFieldName(field.Name())
for {
if _, alreadyUsed := names[name]; !alreadyUsed {
break
}
name += "_" // add suffix to fix collision
}
names[name] = struct{}{}
structFields[i].Name = name
structFields[i].Type = field.GoType()
// TODO: can we reconstruct a struct tag that would be valid if a value
// of this type were passed to SchemaOf?
structFields[i].Tag = reflect.StructTag(`parquet:` + strconv.Quote(field.Name()))
}
return reflect.StructOf(structFields)
}
func exportedStructFieldName(name string) string {
firstRune, size := utf8.DecodeRuneInString(name)
return string([]rune{unicode.ToUpper(firstRune)}) + name[size:]
if unicode.IsUpper(firstRune) {
return sanitize(name)
}
upperFirstRune := unicode.ToUpper(firstRune)
if upperFirstRune == firstRune {
// First character was not a letter, so just trying to upper-case the first
// character won't export the field. We need to add an upper-case prefix instead.
return "X" + sanitize(name)
}
return string([]rune{upperFirstRune}) + sanitize(name[size:])
}
// sanitize returns name with every rune that is not valid in a Go identifier
// replaced by "_". A name that needs no replacement is returned unchanged.
func sanitize(name string) string {
	if isSanitized(name) {
		// Nothing to replace: hand back the input without building a copy.
		return name
	}
	return strings.Map(func(r rune) rune {
		if isValidInGoIdent(r) {
			return r
		}
		return '_'
	}, name)
}

// isSanitized reports whether every rune of name is valid in a Go identifier.
func isSanitized(name string) bool {
	firstInvalid := strings.IndexFunc(name, func(r rune) bool {
		return !isValidInGoIdent(r)
	})
	return firstInvalid < 0
}

// isValidInGoIdent reports whether r is an underscore, a Unicode letter or a
// Unicode digit.
func isValidInGoIdent(r rune) bool {
	switch {
	case r == '_':
		return true
	case unicode.IsLetter(r):
		return true
	default:
		return unicode.IsDigit(r)
	}
}
func isList(node Node) bool {

View File

@@ -18,10 +18,16 @@ type byteArrayPage struct {
func newByteArrayPage(typ Type, columnIndex int16, numValues int32, values encoding.Values) *byteArrayPage {
data, offsets := values.ByteArray()
if len(offsets) != int(numValues)+1 {
println("parquet: warning: column", columnIndex, "byte array page has", len(offsets), "offsets but numValues is", numValues, "(expected", numValues+1, "offsets)")
}
if int(numValues)+1 <= len(offsets) {
offsets = offsets[:numValues+1]
}
return &byteArrayPage{
typ: typ,
values: memory.SliceBufferFrom(data),
offsets: memory.SliceBufferFrom(offsets[:numValues+1]),
offsets: memory.SliceBufferFrom(offsets),
columnIndex: ^columnIndex,
}
}

View File

@@ -2,6 +2,7 @@ package parquet
import (
"io"
"math"
"github.com/parquet-go/bitpack/unsafecast"
"github.com/parquet-go/parquet-go/encoding"
@@ -50,11 +51,39 @@ func (page *doublePage) max() float64 { return maxFloat64(page.values.Slice()) }
func (page *doublePage) bounds() (min, max float64) { return boundsFloat64(page.values.Slice()) }
// Bounds returns the min and max values in the page. NaN values are excluded
// from the result when non-NaN values exist so that query engines can rely on
// min/max for predicate pushdown and row-group/page skipping. This matches the
// behavior of Apache parquet-mr (PARQUET-1246), Apache Arrow, and the Apache
// Iceberg spec (which states lower/upper bounds apply to non-null, non-NaN
// values only). If all values are NaN, the bounds are reported as NaN so that
// readers know the page had data.
func (page *doublePage) Bounds() (min, max Value, ok bool) {
if ok = page.values.Len() > 0; ok {
minFloat, maxFloat := page.bounds()
min = page.makeValue(minFloat)
max = page.makeValue(maxFloat)
data := page.values.Slice()
i := 0
for i < len(data) && math.IsNaN(data[i]) {
i++
}
if i >= len(data) {
min = page.makeValue(data[0])
max = page.makeValue(data[0])
return min, max, ok
}
lo, hi := data[i], data[i]
for _, v := range data[i+1:] {
if math.IsNaN(v) {
continue
}
if v < lo {
lo = v
}
if v > hi {
hi = v
}
}
min = page.makeValue(lo)
max = page.makeValue(hi)
}
return min, max, ok
}

View File

@@ -2,6 +2,7 @@ package parquet
import (
"io"
"math"
"github.com/parquet-go/bitpack/unsafecast"
"github.com/parquet-go/parquet-go/encoding"
@@ -50,11 +51,40 @@ func (page *floatPage) max() float32 { return maxFloat32(page.values.Slice()) }
func (page *floatPage) bounds() (min, max float32) { return boundsFloat32(page.values.Slice()) }
// Bounds returns the min and max values in the page. NaN values are excluded
// from the result when non-NaN values exist so that query engines can rely on
// min/max for predicate pushdown and row-group/page skipping. This matches the
// behavior of Apache parquet-mr (PARQUET-1246), Apache Arrow, and the Apache
// Iceberg spec (which states lower/upper bounds apply to non-null, non-NaN
// values only). If all values are NaN, the bounds are reported as NaN so that
// readers know the page had data.
func (page *floatPage) Bounds() (min, max Value, ok bool) {
if ok = page.values.Len() > 0; ok {
minFloat32, maxFloat32 := page.bounds()
min = page.makeValue(minFloat32)
max = page.makeValue(maxFloat32)
data := page.values.Slice()
i := 0
for i < len(data) && math.IsNaN(float64(data[i])) {
i++
}
if i >= len(data) {
// All values are NaN.
min = page.makeValue(data[0])
max = page.makeValue(data[0])
return min, max, ok
}
lo, hi := data[i], data[i]
for _, v := range data[i+1:] {
if math.IsNaN(float64(v)) {
continue
}
if v < lo {
lo = v
}
if v > hi {
hi = v
}
}
min = page.makeValue(lo)
max = page.makeValue(hi)
}
return min, max, ok
}

View File

@@ -92,7 +92,12 @@ func (r *optionalPageValues) ReadValues(values []Value) (n int, err error) {
}
if n < i {
for j, err = r.values.ReadValues(values[n:i]); j > 0; j-- {
j, err := r.values.ReadValues(values[n:i])
if j == 0 && err == io.EOF {
// Underlying page exhausted before definitionLevels - corrupted file
return n, io.EOF
}
for ; j > 0; j-- {
values[n].definitionLevel = maxDefinitionLevel
r.offset++
n++

View File

@@ -152,7 +152,12 @@ func (r *repeatedPageValues) ReadValues(values []Value) (n int, err error) {
// Copy all the non-zero values in this run.
if n < i {
for j, err = r.values.ReadValues(values[n:i]); j > 0; j-- {
j, err := r.values.ReadValues(values[n:i])
if j == 0 && err == io.EOF {
// Underlying page exhausted before definitionLevels - corrupted file
return n, io.EOF
}
for ; j > 0; j-- {
values[n].repetitionLevel = repetitionLevels[r.offset]
values[n].definitionLevel = maxDefinitionLevel
r.offset++

View File

@@ -705,7 +705,99 @@ func reconstructFuncOfRequired(columnIndex int16, node Node) (int16, reconstruct
}
func reconstructFuncOfList(columnIndex int16, node Node) (int16, reconstructFunc) {
return reconstructFuncOf(columnIndex, Repeated(listElementOf(node)))
elem := listElementOf(node)
// If the list element is optional (e.g., from `parquet-element:",optional"`),
// we need to handle it specially because the normal path through
// reconstructFuncOfRepeated would wrap the node with Required() which
// hides the Optional property.
if elem.Optional() {
return reconstructFuncOfRepeatedOptional(columnIndex, elem)
}
return reconstructFuncOf(columnIndex, Repeated(elem))
}
// reconstructFuncOfRepeatedOptional handles the case where list elements are optional.
// This is needed because reconstructFuncOfRepeated uses Required() which hides
// the Optional property of the inner node.
//
// It returns the next column index reported by reconstructFuncOf together with
// a function that rebuilds a slice in value from the column values: one slice
// element per run of values delimited by repetition level, with elements
// whose definition level is below the maximum set to their zero value.
//
//go:noinline
func reconstructFuncOfRepeatedOptional(columnIndex int16, node Node) (int16, reconstructFunc) {
	// node is Optional(X), get the inner reconstruction for the required version
	nextColumnIndex, reconstruct := reconstructFuncOf(columnIndex, Required(node))
	return nextColumnIndex, func(value reflect.Value, levels columnLevels, columns [][]Value) error {
		// Increment both for the repeated and optional levels
		levels.repetitionDepth++
		levels.definitionLevel += 2 // +1 for repeated, +1 for optional
		// Handle empty groups (no columns)
		if len(columns) == 0 || len(columns[0]) == 0 {
			setMakeSlice(value, 0)
			return nil
		}
		// Check if the list itself is null (definition level less than the repeated level)
		// We need to check against (levels.definitionLevel - 1) because that's the repeated level
		if columns[0][0].definitionLevel < levels.definitionLevel-1 {
			setMakeSlice(value, 0)
			return nil
		}
		values := make([][]Value, len(columns))
		column := columns[0]
		n := 0
		// Start every per-column window empty, with capacity spanning the
		// whole column so it can be re-extended for each element below.
		for i, column := range columns {
			values[i] = column[0:0:len(column)]
		}
		// Count list elements using the first column: each element is one
		// value followed by any values whose repetition level is deeper than
		// this list's depth.
		for i := 0; i < len(column); {
			i++
			n++
			for i < len(column) && column[i].repetitionLevel > levels.repetitionDepth {
				i++
			}
		}
		value = setMakeSlice(value, n)
		for i := range n {
			// Grow each window to cover exactly the values of element i.
			for j, column := range values {
				column = column[:cap(column)]
				if len(column) == 0 {
					continue
				}
				k := 1
				for k < len(column) && column[k].repetitionLevel > levels.repetitionDepth {
					k++
				}
				values[j] = column[:k]
			}
			// Check if this element is null (definition level indicates null)
			// An element is null if its definition level is less than the max (which includes optional)
			elemValue := value.Index(i)
			if len(values) > 0 && len(values[0]) > 0 && values[0][0].definitionLevel < levels.definitionLevel {
				// Element is null, leave as zero value
				elemValue.SetZero()
			} else {
				if err := reconstruct(elemValue, levels, values); err != nil {
					return err
				}
			}
			// Advance each window past the values just consumed, leaving it
			// empty with the remaining tail of the column as capacity.
			for j, column := range values {
				values[j] = column[len(column):len(column):cap(column)]
			}
			levels.repetitionLevel = levels.repetitionDepth
		}
		return nil
	}
}
//go:noinline

View File

@@ -2,6 +2,7 @@ package parquet
import (
"io"
"math"
"sort"
"github.com/parquet-go/parquet-go/deprecated"
@@ -225,21 +226,48 @@ func (p *rowBufferPage) NumNulls() int64 {
}
func (p *rowBufferPage) Bounds() (min, max Value, ok bool) {
var nanValue Value
hasNaN := false
p.scan(func(value Value) {
if !value.IsNull() {
switch {
case !ok:
min, max, ok = value, value, true
case p.typ.Compare(value, min) < 0:
min = value
case p.typ.Compare(value, max) > 0:
max = value
}
if value.IsNull() {
return
}
// NaN is excluded from min/max so that query engines can rely on
// statistics for predicate pushdown. See floatPage.Bounds for the
// full rationale.
if isNaN(value) {
nanValue = value
hasNaN = true
return
}
switch {
case !ok:
min, max, ok = value, value, true
case p.typ.Compare(value, min) < 0:
min = value
case p.typ.Compare(value, max) > 0:
max = value
}
})
if !ok && hasNaN {
min, max, ok = nanValue, nanValue, true
}
return min, max, ok
}
// isNaN reports whether v is a Float or Double value holding NaN. Values of
// any other kind are never NaN.
func isNaN(v Value) bool {
	kind := v.Kind()
	if kind == Float {
		return math.IsNaN(float64(v.Float()))
	}
	if kind == Double {
		return math.IsNaN(v.Double())
	}
	return false
}
func (p *rowBufferPage) Size() int64 { return 0 }
func (p *rowBufferPage) Values() ValueReader {

View File

@@ -1152,6 +1152,21 @@ func makeNodeOf(path []string, t reflect.Type, name string, tags parquetTags, ta
throwInvalidTag(t, name, option+args)
}
setNode(TimeAdjusted(timeUnit, adjusted))
case reflect.Ptr:
// Support *time.Duration with time tag
if t.Elem() == reflect.TypeFor[time.Duration]() {
timeUnit, adjusted, err := parseTimestampArgs(args)
if err != nil {
throwInvalidTag(t, name, option+args)
}
if args == "()" {
timeUnit = Nanosecond
adjusted = true
}
setNode(Optional(TimeAdjusted(timeUnit, adjusted)))
} else {
throwInvalidTag(t, name, option)
}
default:
throwInvalidTag(t, name, option)
}

View File

@@ -73,18 +73,11 @@ func (w *SortingWriter[T]) Close() error {
if err := w.Flush(); err != nil {
return err
}
return w.output.Close()
}
func (w *SortingWriter[T]) Flush() error {
defer w.resetSortingBuffer()
if err := w.sortAndWriteBufferedRows(); err != nil {
return err
}
if w.numRows == 0 {
return nil
return w.output.Close()
}
if err := w.writer.Close(); err != nil {
@@ -129,7 +122,18 @@ func (w *SortingWriter[T]) Flush() error {
return err
}
return w.output.Flush()
if err := w.output.Flush(); err != nil {
return err
}
return w.output.Close()
}
// Flush sorts any buffered rows and writes them to temporary storage.
// This can be called multiple times to manage memory usage.
// The actual merge and write to output happens on Close.
func (w *SortingWriter[T]) Flush() error {
return w.sortAndWriteBufferedRows()
}
func (w *SortingWriter[T]) Reset(output io.Writer) {

View File

@@ -1,6 +1,12 @@
package parquet
import (
"log"
"math"
"math/big"
"reflect"
"strconv"
"github.com/parquet-go/parquet-go/deprecated"
"github.com/parquet-go/parquet-go/format"
)
@@ -11,7 +17,18 @@ import (
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
func Decimal(scale, precision int, typ Type) Node {
switch typ.Kind() {
case Int32, Int64, ByteArray, FixedLenByteArray:
case Int32:
if precision < 1 || precision > 9 {
panic("DECIMAL annotated with Int32 must have precision >= 1 and <= 9, got " + strconv.Itoa(precision))
}
case Int64:
if precision < 1 || precision > 18 {
panic("DECIMAL annotated with Int32 must have precision >= 1 and <= 9, got " + strconv.Itoa(precision))
}
if precision < 10 {
log.Printf("WARNING: DECIMAL annotated with Int64 should have a precision >= 10, got %d", precision)
}
case ByteArray, FixedLenByteArray:
default:
panic("DECIMAL node must annotate Int32, Int64, ByteArray or FixedLenByteArray but got " + typ.String())
}
@@ -38,3 +55,50 @@ func (t *decimalType) LogicalType() *format.LogicalType {
func (t *decimalType) ConvertedType() *deprecated.ConvertedType {
return &convertedTypes[deprecated.Decimal]
}
// AssignValue stores the decimal value src into dst. It always returns nil.
//
//   - Int32 storage: an int32-kind dst receives the raw unscaled integer; any
//     other dst is set to a float32 equal to unscaled / 10^scale.
//   - Int64 storage: an int64-kind dst receives the raw unscaled integer; any
//     other dst is set to a float64 equal to unscaled / 10^scale.
//   - ByteArray / FixedLenByteArray storage: the bytes are decoded as a
//     big-endian two's complement integer and dst is set to a *big.Float equal
//     to unscaled / 10^scale.
//
// NOTE(review): dst.Set is called without checking dst's type, so this
// presumably relies on callers passing a dst assignable from float32, float64
// or *big.Float respectively — confirm against callers.
func (t *decimalType) AssignValue(dst reflect.Value, src Value) error {
	switch t.Type {
	case Int32Type:
		switch dst.Kind() {
		case reflect.Int32:
			// Integer destination: keep the unscaled representation.
			dst.SetInt(int64(src.int32()))
		default:
			dst.Set(reflect.ValueOf(float32(src.int32()) / float32(math.Pow10(int(t.decimal.Scale)))))
		}
	case Int64Type:
		switch dst.Kind() {
		case reflect.Int64:
			// Integer destination: keep the unscaled representation.
			dst.SetInt(src.int64())
		default:
			dst.Set(reflect.ValueOf(float64(src.int64()) / math.Pow10(int(t.decimal.Scale))))
		}
	default:
		// ByteArray and FixedLenByteArray
		if t.Type.Kind() != ByteArray && t.Type.Kind() != FixedLenByteArray {
			// Any other storage type is left untouched.
			return nil
		}
		data := src.ByteArray()
		val := new(big.Int)
		if len(data) > 0 && data[0]&0x80 != 0 {
			// Negative number: convert from two's complement
			// (invert all bits, add one, then negate the magnitude).
			tmp := make([]byte, len(data))
			for i, b := range data {
				tmp[i] = ^b
			}
			val.SetBytes(tmp)
			val.Add(val, big.NewInt(1))
			val.Neg(val)
		} else {
			val.SetBytes(data)
		}
		// Use enough precision to represent the decimal value accurately
		// precision * log2(10) ≈ precision * 3.32, round up generously
		prec := max(uint(t.decimal.Precision)*4+64, 192)
		f := new(big.Float).SetPrec(prec).SetInt(val)
		// scaleFactor = 10^scale, computed exactly as an integer first.
		scaleFactor := new(big.Float).SetPrec(prec)
		scaleFactor.SetInt(new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(t.decimal.Scale)), nil))
		f.Quo(f, scaleFactor)
		dst.Set(reflect.ValueOf(f))
	}
	return nil
}

View File

@@ -10,19 +10,19 @@ import (
type int32Type struct{}
func (t int32Type) String() string { return "INT32" }
func (t int32Type) Kind() Kind { return Int32 }
func (t int32Type) Length() int { return 32 }
func (t int32Type) EstimateSize(n int) int { return 4 * n }
func (t int32Type) EstimateNumValues(n int) int { return n / 4 }
func (t int32Type) Compare(a, b Value) int { return compareInt32(a.int32(), b.int32()) }
func (t int32Type) ColumnOrder() *format.ColumnOrder { return &typeDefinedColumnOrder }
func (t int32Type) LogicalType() *format.LogicalType {
return &format.LogicalType{Integer: &format.IntType{
BitWidth: 32,
IsSigned: true,
}}
}
var int32LogicalType = format.LogicalType{Integer: &format.IntType{
BitWidth: 32,
IsSigned: true,
}}
func (t int32Type) String() string { return "INT32" }
func (t int32Type) Kind() Kind { return Int32 }
func (t int32Type) Length() int { return 32 }
func (t int32Type) EstimateSize(n int) int { return 4 * n }
func (t int32Type) EstimateNumValues(n int) int { return n / 4 }
func (t int32Type) Compare(a, b Value) int { return compareInt32(a.int32(), b.int32()) }
func (t int32Type) ColumnOrder() *format.ColumnOrder { return &typeDefinedColumnOrder }
func (t int32Type) LogicalType() *format.LogicalType { return &int32LogicalType }
func (t int32Type) ConvertedType() *deprecated.ConvertedType { return nil }
func (t int32Type) PhysicalType() *format.Type { return &physicalTypes[Int32] }

View File

@@ -10,19 +10,19 @@ import (
type int64Type struct{}
func (t int64Type) String() string { return "INT64" }
func (t int64Type) Kind() Kind { return Int64 }
func (t int64Type) Length() int { return 64 }
func (t int64Type) EstimateSize(n int) int { return 8 * n }
func (t int64Type) EstimateNumValues(n int) int { return n / 8 }
func (t int64Type) Compare(a, b Value) int { return compareInt64(a.int64(), b.int64()) }
func (t int64Type) ColumnOrder() *format.ColumnOrder { return &typeDefinedColumnOrder }
func (t int64Type) LogicalType() *format.LogicalType {
return &format.LogicalType{Integer: &format.IntType{
BitWidth: 64,
IsSigned: true,
}}
}
var int64LogicalType = format.LogicalType{Integer: &format.IntType{
BitWidth: 64,
IsSigned: true,
}}
func (t int64Type) String() string { return "INT64" }
func (t int64Type) Kind() Kind { return Int64 }
func (t int64Type) Length() int { return 64 }
func (t int64Type) EstimateSize(n int) int { return 8 * n }
func (t int64Type) EstimateNumValues(n int) int { return n / 8 }
func (t int64Type) Compare(a, b Value) int { return compareInt64(a.int64(), b.int64()) }
func (t int64Type) ColumnOrder() *format.ColumnOrder { return &typeDefinedColumnOrder }
func (t int64Type) LogicalType() *format.LogicalType { return &int64LogicalType }
func (t int64Type) ConvertedType() *deprecated.ConvertedType { return nil }
func (t int64Type) PhysicalType() *format.Type { return &physicalTypes[Int64] }

View File

@@ -21,6 +21,8 @@ func (listNode) Type() Type { return &listType{} }
type listType format.ListType
var listLogicalType = format.LogicalType{List: new(format.ListType)}
func (t *listType) String() string { return (*format.ListType)(t).String() }
func (t *listType) Kind() Kind { panic("cannot call Kind on parquet LIST type") }
@@ -37,9 +39,7 @@ func (t *listType) ColumnOrder() *format.ColumnOrder { return nil }
func (t *listType) PhysicalType() *format.Type { return nil }
func (t *listType) LogicalType() *format.LogicalType {
return &format.LogicalType{List: (*format.ListType)(t)}
}
func (t *listType) LogicalType() *format.LogicalType { return &listLogicalType }
func (t *listType) ConvertedType() *deprecated.ConvertedType {
return &convertedTypes[deprecated.List]

View File

@@ -26,6 +26,8 @@ func (mapNode) Type() Type { return &mapType{} }
type mapType format.MapType
var mapLogicalType = format.LogicalType{Map: new(format.MapType)}
func (t *mapType) String() string { return (*format.MapType)(t).String() }
func (t *mapType) Kind() Kind { panic("cannot call Kind on parquet MAP type") }
@@ -42,9 +44,7 @@ func (t *mapType) ColumnOrder() *format.ColumnOrder { return nil }
func (t *mapType) PhysicalType() *format.Type { return nil }
func (t *mapType) LogicalType() *format.LogicalType {
return &format.LogicalType{Map: (*format.MapType)(t)}
}
func (t *mapType) LogicalType() *format.LogicalType { return &mapLogicalType }
func (t *mapType) ConvertedType() *deprecated.ConvertedType {
return &convertedTypes[deprecated.Map]

View File

@@ -10,6 +10,8 @@ import (
type nullType format.NullType
var nullLogicalType = format.LogicalType{Unknown: new(format.NullType)}
func (t *nullType) String() string { return (*format.NullType)(t).String() }
func (t *nullType) Kind() Kind { return -1 }
@@ -26,9 +28,7 @@ func (t *nullType) ColumnOrder() *format.ColumnOrder { return nil }
func (t *nullType) PhysicalType() *format.Type { return nil }
func (t *nullType) LogicalType() *format.LogicalType {
return &format.LogicalType{Unknown: (*format.NullType)(t)}
}
func (t *nullType) LogicalType() *format.LogicalType { return &nullLogicalType }
func (t *nullType) ConvertedType() *deprecated.ConvertedType { return nil }

View File

@@ -1,30 +1,160 @@
package parquet
import (
"errors"
"fmt"
"reflect"
"strings"
"github.com/parquet-go/parquet-go/deprecated"
"github.com/parquet-go/parquet-go/encoding"
"github.com/parquet-go/parquet-go/format"
)
// Variant constructs a node of unshredded VARIANT logical type. It is a group with
// Variant constructs a node of unshredded [VARIANT logical type]. It is a group with
// two required fields, "metadata" and "value", both byte arrays.
//
// Experimental: The specification for variants is still being developed and the type
// is not fully adopted. Support for this type is subject to change.
// *Experimental*: Support for the VARIANT type is still being developed and subject to
// change.
//
// Initial support does not attempt to process the variant data. So reading and writing
// data of this type behaves as if it were just a group with two byte array fields, as
// if the logical type annotation were absent. This may change in the future.
//
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#variant
// [VARIANT logical type]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#variant
func Variant() Node {
	// An unshredded variant is a group of two required byte array columns.
	fields := Group{
		"metadata": Required(Leaf(ByteArrayType)),
		"value":    Required(Leaf(ByteArrayType)),
	}
	return variantNode{fields}
}
// TODO: add ShreddedVariant(Node) function, to create a shredded variant
// where the argument defines the type/structure of the shredded value(s).
// ShreddedVariant constructs a node of shredded [VARIANT logical type]. It is a group
// with a required byte array "metadata" field, an optional byte array "value" field
// (for any unshredded values), and an optional "typed_value" field whose type is that
// of the shredded value. If the given node is a group or list (or contains a list),
// the resulting "typed_value" field will have some additional structure to allow
// each group or element in a list to have a mix of shredded and unshredded data.
//
// The given node may only contain types that map to valid [variant value types].
// Therefore, it may not contain ENUM, FLOAT16, INTERVAL, JSON, BSON, VARIANT,
// GEOMETRY, GEOGRAPHY, MAP, or UNKNOWN logical types. It may only use signed INT
// logical types. It may only use DECIMAL logical types whose precision is less than
// or equal to 38. It may not contain any repeated fields unless they are the middle
// level of a 3-level LIST logical type. Any "required" settings on fields will be
// ignored: shredded fields must always be optional to represent values that may not
// conform to the shredded type. It also may not contain empty groups: any groups
// must have at least one field.
//
// More information on shredded variants can be found in the [Parquet documentation].
//
// *Experimental*: Support for the VARIANT type is still being developed and subject to
// change.
//
// Initial support does not attempt to process the variant data. So reading and writing
// data of this type behaves as if it were just a group with the three described fields,
// as if the logical type annotation were absent. This may change in the future.
//
// [VARIANT logical type]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#variant
// [variant value types]: https://github.com/apache/parquet-format/blob/master/VariantShredding.md#shredded-value-types
// [Parquet documentation]: https://github.com/apache/parquet-format/blob/master/VariantShredding.md
func ShreddedVariant(shreddedType Node) (Node, error) {
	// Validate the requested shape and convert it into its "typed_value"
	// form before assembling the variant group around it.
	typedValue, err := variantTypedValueNode(shreddedType)
	if err != nil {
		return nil, err
	}

	fields := Group{
		"metadata":    Required(Leaf(ByteArrayType)),
		"value":       Optional(Leaf(ByteArrayType)),
		"typed_value": Optional(typedValue),
	}
	return variantNode{fields}, nil
}
// variantTypedValueNode validates node and converts it into the form it takes
// as the "typed_value" of a shredded variant.
//
// Leaf nodes that pass validation are returned unchanged. Every field of a
// group is replaced by a required group holding an optional byte array
// "value" and an optional "typed_value" (the recursively converted field).
// A LIST node is rebuilt so that its element is such a value/typed_value
// group. A non-zero field ID on a list or group node is carried over to the
// rebuilt node.
//
// An error is returned when node is repeated, is an empty group, is a LIST
// that does not have exactly one child with exactly one grandchild, is an
// unsigned INT, is a DECIMAL with precision above 38, or carries any logical
// type other than LIST, INT, DECIMAL, DATE, TIME, TIMESTAMP, STRING or UUID.
//
// fieldPath holds the field names leading to node. When it is non-empty, any
// returned error is prefixed with the dotted path; each recursion level with
// a non-empty path adds its own prefix.
func variantTypedValueNode(node Node, fieldPath ...string) (typed Node, err error) {
	defer func() {
		if err != nil && len(fieldPath) > 0 {
			err = fmt.Errorf("field %s: %w", strings.Join(fieldPath, "."), err)
		}
	}()

	if node.Repeated() {
		return nil, errors.New("repeated types are not allowed unless they are part of a 3-level LIST logical type")
	}

	if lt := node.Type().LogicalType(); lt != nil {
		switch lt := getLogicalType(lt).(type) {
		case *format.ListType:
			// We must first extract the inner list.element field of the 3-level LIST type.
			children := node.Fields()
			if len(children) != 1 {
				return nil, fmt.Errorf("invalid LIST logical type: expecting a single 'list' field but instead found %d", len(children))
			}
			grandchildren := children[0].Fields()
			if len(grandchildren) != 1 {
				return nil, fmt.Errorf("invalid LIST logical type: expecting a single 'element' field but instead found %d", len(grandchildren))
			}
			elementNode, err := variantTypedValueNode(grandchildren[0], fieldPath...)
			if err != nil {
				return nil, err
			}
			list := List(Required(Group{
				"value":       Optional(Leaf(ByteArrayType)),
				"typed_value": Optional(elementNode),
			}))
			if node.ID() != 0 {
				return FieldID(list, node.ID()), nil
			}
			return list, nil
		case *format.IntType:
			// Only signed integers are accepted, so the rejected case is the
			// unsigned one and the message must say so.
			if !lt.IsSigned {
				return nil, errors.New("unsigned INT logical types are not allowed")
			}
		case *format.DecimalType:
			if lt.Precision > 38 {
				return nil, fmt.Errorf("DECIMAL logical types with precision >38 are not allowed (got %d)", lt.Precision)
			}
		case *format.DateType:
		case *format.TimeType:
		case *format.TimestampType:
		case *format.StringType:
		case *format.UUIDType:
		default:
			// No other logical types are allowed.
			return nil, fmt.Errorf("%s logical types are not allowed", lt)
		}
	}

	if node.Leaf() {
		return node, nil
	}

	children := node.Fields()
	if len(children) == 0 {
		return nil, errors.New("empty groups are not allowed")
	}
	group := make(Group, len(children))
	for _, child := range children {
		childNode, err := variantTypedValueNode(child, append(fieldPath, child.Name())...)
		if err != nil {
			return nil, err
		}
		// Each field gets its own value/typed_value pair.
		group[child.Name()] = Required(Group{
			"value":       Optional(Leaf(ByteArrayType)),
			"typed_value": Optional(childNode),
		})
	}
	if node.ID() != 0 {
		return FieldID(group, node.ID()), nil
	}
	return group, nil
}
// getLogicalType returns the first field of *lt that is exported and holds a
// non-zero value, or nil when there is no such field.
func getLogicalType(lt *format.LogicalType) any {
	// Walking the struct reflectively means a newly added logical type is
	// still found here; callers only need updating if that new type should
	// also be accepted as a variant value.
	union := reflect.Indirect(reflect.ValueOf(lt))
	numFields := union.NumField()
	for idx := 0; idx < numFields; idx++ {
		candidate := union.Field(idx)
		if !candidate.CanInterface() || candidate.IsZero() {
			continue
		}
		return candidate.Interface()
	}
	return nil
}
type variantNode struct{ Group }
@@ -32,6 +162,8 @@ func (variantNode) Type() Type { return &variantType{} }
type variantType format.VariantType
var variantLogicalType = format.LogicalType{Variant: new(format.VariantType)}
func (t *variantType) String() string { return (*format.VariantType)(t).String() }
func (t *variantType) Kind() Kind { panic("cannot call Kind on parquet VARIANT type") }
@@ -50,9 +182,7 @@ func (t *variantType) ColumnOrder() *format.ColumnOrder { return nil }
func (t *variantType) PhysicalType() *format.Type { return nil }
func (t *variantType) LogicalType() *format.LogicalType {
return &format.LogicalType{Variant: (*format.VariantType)(t)}
}
func (t *variantType) LogicalType() *format.LogicalType { return &variantLogicalType }
func (t *variantType) ConvertedType() *deprecated.ConvertedType { return nil }

View File

@@ -1026,13 +1026,19 @@ func (w *writer) reset(writer io.Writer) {
}
w.currentRowGroup.reset()
for i := range w.rowGroups {
w.rowGroups[i] = format.RowGroup{}
w.rowGroups[i].Reset()
}
for i := range w.columnIndexes {
w.columnIndexes[i] = nil
for j := range w.columnIndexes[i] {
w.columnIndexes[i][j].Reset()
}
w.columnIndexes[i] = w.columnIndexes[i][:0]
}
for i := range w.offsetIndexes {
w.offsetIndexes[i] = nil
for j := range w.offsetIndexes[i] {
w.offsetIndexes[i][j].Reset()
}
w.offsetIndexes[i] = w.offsetIndexes[i][:0]
}
w.rowGroups = w.rowGroups[:0]
w.columnIndexes = w.columnIndexes[:0]
@@ -1155,6 +1161,10 @@ func (w *writer) writeRowGroup(rg *ConcurrentRowGroupWriter, rowGroupSchema *Sch
return 0, nil
}
rowGroupIndex := len(w.rowGroups)
columnIndexIndex := len(w.columnIndexes)
offsetIndexIndex := len(w.offsetIndexes)
if len(w.rowGroups) == MaxRowGroups {
return 0, ErrTooManyRowGroups
}
@@ -1178,10 +1188,9 @@ func (w *writer) writeRowGroup(rg *ConcurrentRowGroupWriter, rowGroupSchema *Sch
fileOffset := w.writer.offset
for i, c := range rg.columns {
columnIndex := c.columnIndex.ColumnIndex()
columnIndex.RepetitionLevelHistogram = slices.Clone(c.pageRepetitionLevelHistograms)
columnIndex.DefinitionLevelHistogram = slices.Clone(c.pageDefinitionLevelHistograms)
rg.columnIndex[i] = columnIndex
rg.columnIndex[i] = c.columnIndex.ColumnIndex()
rg.columnIndex[i].RepetitionLevelHistogram = append(rg.columnIndex[i].RepetitionLevelHistogram[:0], c.pageRepetitionLevelHistograms...)
rg.columnIndex[i].DefinitionLevelHistogram = append(rg.columnIndex[i].DefinitionLevelHistogram[:0], c.pageDefinitionLevelHistograms...)
c.columnChunk.MetaData.SizeStatistics = format.SizeStatistics{
UnencodedByteArrayDataBytes: c.totalUnencodedByteArrayBytes,
@@ -1239,9 +1248,20 @@ func (w *writer) writeRowGroup(rg *ConcurrentRowGroupWriter, rowGroupSchema *Sch
totalCompressedSize += int64(c.TotalCompressedSize)
}
var reuseRowGroup *format.RowGroup = nil
if cap(w.rowGroups) > rowGroupIndex {
w.rowGroups = w.rowGroups[:rowGroupIndex+1] // Extend the slice by one element
reuseRowGroup = &w.rowGroups[rowGroupIndex] // Pointer to the last element
}
sortingColumns := w.sortingColumns
if len(sortingColumns) == 0 && len(rowGroupSortingColumns) > 0 {
sortingColumns = make([]format.SortingColumn, 0, len(rowGroupSortingColumns))
scLen := len(rowGroupSortingColumns)
if reuseRowGroup == nil {
sortingColumns = make([]format.SortingColumn, 0, scLen)
} else {
sortingColumns = reuseRowGroup.SortingColumns
}
forEachLeafColumnOf(rowGroupSchema, func(leaf leafColumn) {
if sortingIndex := searchSortingColumn(rowGroupSortingColumns, leaf.path); sortingIndex < len(sortingColumns) {
sortingColumns[sortingIndex] = format.SortingColumn{
@@ -1253,13 +1273,47 @@ func (w *writer) writeRowGroup(rg *ConcurrentRowGroupWriter, rowGroupSchema *Sch
})
}
columns := slices.Clone(rg.columnChunk)
columnIndex := slices.Clone(rg.columnIndex)
offsetIndex := slices.Clone(rg.offsetIndex)
var reuseColumnIndex *[]format.ColumnIndex = nil
if cap(w.columnIndexes) > columnIndexIndex {
// Extend the slice by one element
w.columnIndexes = w.columnIndexes[:columnIndexIndex+1]
reuseColumnIndex = &w.columnIndexes[columnIndexIndex]
}
var columnIndex []format.ColumnIndex
if reuseColumnIndex != nil {
// Copy the slice
columnIndex = append(*reuseColumnIndex, rg.columnIndex...)
w.columnIndexes[columnIndexIndex] = columnIndex
} else {
columnIndex = slices.Clone(rg.columnIndex)
w.columnIndexes = append(w.columnIndexes, columnIndex)
}
var reuseOffsetIndex *[]format.OffsetIndex = nil
if cap(w.offsetIndexes) > offsetIndexIndex {
// Extend the slice by one element
w.offsetIndexes = w.offsetIndexes[:offsetIndexIndex+1]
reuseOffsetIndex = &w.offsetIndexes[offsetIndexIndex]
}
var offsetIndex []format.OffsetIndex
if reuseOffsetIndex != nil {
offsetIndex = append(*reuseOffsetIndex, rg.offsetIndex...)
w.offsetIndexes[offsetIndexIndex] = offsetIndex
} else {
offsetIndex = slices.Clone(rg.offsetIndex)
w.offsetIndexes = append(w.offsetIndexes, offsetIndex)
}
// If the existing row group slice is full, we need to create a new element
var columns []format.ColumnChunk = nil
if reuseRowGroup == nil {
columns = slices.Clone(rg.columnChunk)
} else {
columns = append(reuseRowGroup.Columns, rg.columnChunk...)
}
for i := range columns {
c := &columns[i]
c.MetaData.EncodingStats = slices.Clone(rg.columnChunk[i].MetaData.EncodingStats)
c.MetaData.EncodingStats = append(c.MetaData.EncodingStats[:0], rg.columnChunk[i].MetaData.EncodingStats...)
}
for i := range offsetIndex {
@@ -1267,18 +1321,26 @@ func (w *writer) writeRowGroup(rg *ConcurrentRowGroupWriter, rowGroupSchema *Sch
c.PageLocations = slices.Clone(rg.offsetIndex[i].PageLocations)
}
w.rowGroups = append(w.rowGroups, format.RowGroup{
Columns: columns,
TotalByteSize: totalByteSize,
NumRows: numRows,
SortingColumns: sortingColumns,
FileOffset: fileOffset,
TotalCompressedSize: totalCompressedSize,
Ordinal: int16(len(w.rowGroups)),
})
if reuseRowGroup == nil {
w.rowGroups = append(w.rowGroups, format.RowGroup{
Columns: columns,
TotalByteSize: totalByteSize,
NumRows: numRows,
SortingColumns: sortingColumns,
FileOffset: fileOffset,
TotalCompressedSize: totalCompressedSize,
Ordinal: int16(len(w.rowGroups)),
})
} else {
reuseRowGroup.Columns = columns
reuseRowGroup.TotalByteSize = totalByteSize
reuseRowGroup.NumRows = numRows
reuseRowGroup.SortingColumns = sortingColumns
reuseRowGroup.FileOffset = fileOffset
reuseRowGroup.TotalCompressedSize = totalCompressedSize
reuseRowGroup.Ordinal = int16(len(w.rowGroups) - 1)
}
w.columnIndexes = append(w.columnIndexes, columnIndex)
w.offsetIndexes = append(w.offsetIndexes, offsetIndex)
return numRows, nil
}
@@ -1946,7 +2008,15 @@ func (c *ColumnWriter) fallbackDictionaryToPlain() error {
c.columnType = indexedType.Type
}
if c.plainColumnBuffer == nil {
c.plainColumnBuffer = c.columnType.NewColumnBuffer(int(c.bufferIndex), int(c.bufferSize))
base := c.columnType.NewColumnBuffer(int(c.bufferIndex), int(c.bufferSize))
switch {
case c.maxRepetitionLevel > 0:
c.plainColumnBuffer = newRepeatedColumnBuffer(base, c.maxRepetitionLevel, c.maxDefinitionLevel, nullsGoLast)
case c.maxDefinitionLevel > 0:
c.plainColumnBuffer = newOptionalColumnBuffer(base, c.maxDefinitionLevel, nullsGoLast)
default:
c.plainColumnBuffer = base
}
}
c.columnBuffer = c.plainColumnBuffer
c.encoding = &plain.Encoding{}
@@ -2045,16 +2115,20 @@ func (c *ColumnWriter) recordPageStats(headerSize int32, header *format.PageHead
definitionLevels := page.DefinitionLevels()
if c.maxRepetitionLevel > 0 {
accumulateLevelHistogram(c.repetitionLevelHistogram, repetitionLevels)
c.pageRepetitionLevelHistograms = appendPageLevelHistogram(
c.pageRepetitionLevelHistograms, repetitionLevels, c.maxRepetitionLevel,
c.pageRepetitionLevelHistograms = accumulateAndAppendPageLevelHistogram(
c.repetitionLevelHistogram,
c.pageRepetitionLevelHistograms,
repetitionLevels,
c.maxRepetitionLevel,
)
}
if c.maxDefinitionLevel > 0 {
accumulateLevelHistogram(c.definitionLevelHistogram, definitionLevels)
c.pageDefinitionLevelHistograms = appendPageLevelHistogram(
c.pageDefinitionLevelHistograms, definitionLevels, c.maxDefinitionLevel,
c.pageDefinitionLevelHistograms = accumulateAndAppendPageLevelHistogram(
c.definitionLevelHistogram,
c.pageDefinitionLevelHistograms,
definitionLevels,
c.maxDefinitionLevel,
)
}
}

View File

@@ -29,10 +29,7 @@ func appendPageLevelHistogram(histograms []int64, levels []byte, maxLevel byte)
histSize := int(maxLevel) + 1
startIndex := len(histograms)
histograms = slices.Grow(histograms, histSize)[:startIndex+histSize]
for i := range histSize {
histograms[startIndex+i] = 0
}
clear(histograms[startIndex : startIndex+histSize])
for _, level := range levels {
histograms[startIndex+int(level)]++
@@ -40,3 +37,25 @@ func appendPageLevelHistogram(histograms []int64, levels []byte, maxLevel byte)
return histograms
}
// accumulateAndAppendPageLevelHistogram counts levels in a single pass,
// adding each occurrence both to columnHistogram and to a new per-page
// histogram of maxLevel+1 zero-initialized buckets appended to pageHistograms.
// It returns the extended pageHistograms slice.
func accumulateAndAppendPageLevelHistogram(
	columnHistogram []int64,
	pageHistograms []int64,
	levels []byte,
	maxLevel byte,
) []int64 {
	base := len(pageHistograms)
	width := int(maxLevel) + 1

	pageHistograms = slices.Grow(pageHistograms, width)[:base+width]

	// The new buckets may sit on reused capacity, so zero them before counting.
	page := pageHistograms[base:]
	clear(page)

	for i := range levels {
		lvl := levels[i]
		columnHistogram[lvl]++
		page[lvl]++
	}
	return pageHistograms
}

2
vendor/modules.txt vendored
View File

@@ -1691,7 +1691,7 @@ github.com/parquet-go/bitpack/unsafecast
# github.com/parquet-go/jsonlite v1.0.0
## explicit; go 1.23
github.com/parquet-go/jsonlite
# github.com/parquet-go/parquet-go v0.27.0
# github.com/parquet-go/parquet-go v0.28.0
## explicit; go 1.24.9
github.com/parquet-go/parquet-go
github.com/parquet-go/parquet-go/bloom