diff --git a/go.mod b/go.mod index df66aa5738..0f61e68fa7 100644 --- a/go.mod +++ b/go.mod @@ -140,7 +140,7 @@ require ( github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db github.com/ncw/swift/v2 v2.0.5 github.com/oklog/ulid/v2 v2.1.1 - github.com/parquet-go/parquet-go v0.27.0 + github.com/parquet-go/parquet-go v0.28.0 github.com/pressly/goose/v3 v3.26.0 github.com/prometheus/alertmanager v0.31.1 github.com/prometheus/common/sigv4 v0.1.0 diff --git a/go.sum b/go.sum index 9a46e17b45..47c2628ce3 100644 --- a/go.sum +++ b/go.sum @@ -965,8 +965,8 @@ github.com/parquet-go/bitpack v1.0.0 h1:AUqzlKzPPXf2bCdjfj4sTeacrUwsT7NlcYDMUQxP github.com/parquet-go/bitpack v1.0.0/go.mod h1:XnVk9TH+O40eOOmvpAVZ7K2ocQFrQwysLMnc6M/8lgs= github.com/parquet-go/jsonlite v1.0.0 h1:87QNdi56wOfsE5bdgas0vRzHPxfJgzrXGml1zZdd7VU= github.com/parquet-go/jsonlite v1.0.0/go.mod h1:nDjpkpL4EOtqs6NQugUsi0Rleq9sW/OtC1NnZEnxzF0= -github.com/parquet-go/parquet-go v0.27.0 h1:vHWK2xaHbj+v1DYps03yDRpEsdtOeKbhiXUaixoPb3g= -github.com/parquet-go/parquet-go v0.27.0/go.mod h1:navtkAYr2LGoJVp141oXPlO/sxLvaOe3la2JEoD8+rg= +github.com/parquet-go/parquet-go v0.28.0 h1:ECyksyv8T2pOrlLsN7aWJIoQakyk/HtxQ2lchgS4els= +github.com/parquet-go/parquet-go v0.28.0/go.mod h1:navtkAYr2LGoJVp141oXPlO/sxLvaOe3la2JEoD8+rg= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pb33f/jsonpath v0.7.1 h1:dEp6oIZuJbpDSyuHAl9m7GonoDW4M20BcD5vT0tPYRE= diff --git a/vendor/github.com/parquet-go/parquet-go/.CLAUDE.md b/vendor/github.com/parquet-go/parquet-go/.CLAUDE.md new file mode 100644 index 0000000000..37709f4df8 --- /dev/null +++ b/vendor/github.com/parquet-go/parquet-go/.CLAUDE.md @@ -0,0 +1,160 @@ +# parquet-go Project Context + +## Overview +High-performance Go library for reading and writing Apache Parquet files. Originally developed by Twilio Segment, now community-maintained at github.com/parquet-go/parquet-go. + +- **Go Version**: 1.22+ +- **Current Version**: v0.26.0+ (pre-v1, breaking changes possible) +- **Codebase Size**: ~71,700 lines across 210 Go files + +## Project Structure + +``` +parquet-go/ +├── Root Package # Core API: Reader/Writer/File/Schema +├── encoding/ # 7 encoding formats (plain, rle, delta, etc.) +├── compress/ # 6 compression codecs (snappy, gzip, zstd, etc.) +├── bloom/ # Split-block Bloom filter (SIMD-optimized) +├── sparse/ # Sparse array utilities with gather ops +├── hashprobe/ # Hash-based dictionary operations +├── format/ # Generated Thrift definitions (parquet spec) +└── internal/ # Memory mgmt, byte algorithms, unsafe casts +``` + +## Key Files by Function + +| Purpose | Files | +|---------|-------| +| Core Types | `parquet.go`, `file.go`, `schema.go`, `node.go`, `type.go` | +| Reading | `reader.go`, `column.go`, `page.go` | +| Writing | `writer.go`, `column_buffer.go`, `buffer.go` | +| Logical Types | `type_boolean.go` through `type_variant.go` (40+ files) | +| Column Buffers | `column_buffer_*.go` (20+ files for each physical type) | +| Page Impl | `page_*.go` (15+ files) | +| Operations | `convert.go`, `merge.go`, `sorting.go` | +| Configuration | `config.go` (900+ lines) | + +## Core Types & Interfaces + +### Schema System +- `Schema` - Immutable, thread-safe parquet schema +- `Node` - Interface for schema nodes +- `Field` - Named schema node interface +- `Column` - Concrete column representation +- `Type` - Logical type interface +- `Kind` - Physical type enum (Boolean, Int32, Int64, Float, Double, ByteArray, FixedLenByteArray) + +### Reader/Writer System +- `GenericReader[T]` - Type-safe reader (preferred) +- `GenericWriter[T]` - Type-safe writer (preferred) +- `SortingWriter[T]` - Writer with integrated sorting +- `GenericBuffer[T]` - In-memory row group buffer (implements sort.Interface) + +### Row Group Abstractions +- `RowGroup` - Interface for row group collections +- `ColumnChunk` - Interface for column data +- `Page` - Interface for page data with stats +- `Pages` - Sequential page reader + +## Common Patterns + +### Writing Data +```go +// One-shot +parquet.WriteFile[T](path, rows, options...) + +// Streaming +writer := parquet.NewGenericWriter[T](w, options...) +writer.Write(rows) +writer.Close() + +// With sorting +sortWriter := parquet.NewSortingWriter[T](w, rowCount, options...) +``` + +### Reading Data +```go +// One-shot +rows, _ := parquet.ReadFile[T](path) + +// Streaming +reader := parquet.NewGenericReader[T](r, options...) +n, _ := reader.Read(rows) + +// Low-level +file, _ := parquet.OpenFile(r, size) +for _, rg := range file.RowGroups() { ... } +``` + +## Build Commands + +```bash +make test # Run tests with -race and coverage +make format # Go fmt + modernize tool +make tools # Install development tools +``` + +## Testing Patterns + +- Unit tests: `*_test.go` throughout (100+ files) +- Examples: `example_test.go` +- Property tests: `internal/quick` package +- Key test files: + - `parquet_test.go` - Core functionality + - `writer_test.go` - Writer specifics (91KB) + - `reader_test.go` - Reader specifics + - `merge_test.go` - Merge operations + - `convert_test.go` - Schema conversion + +## Debugging + +Set environment variable: +```bash +PARQUETGODEBUG=1 +``` + +## Important Implementation Details + +### Performance Optimizations +- SIMD assembly for AMD64 (dictionary ops, page bounds, ordering) +- Zero-copy via interface-based design +- Memory pooling with `BufferPool` +- Async reading with `ReadModeAsync` + +### Schema Tags +Struct field tags control parquet behavior: +```go +type Record struct { + ID int64 `parquet:"id"` + Name string `parquet:"name,optional"` + Data []byte `parquet:"data,snappy"` +} +``` + +### Encoding Options +- plain, rle, delta (binary-packed, length byte array, byte array) +- bytestreamsplit (float optimization) +- dictionary encoding + +### Compression Codecs +snappy, gzip, brotli, zstd, lz4, uncompressed + +## Recent Development Focus +- Bug fixes: panic in Group.GoType(), json.RawMessage handling, repetition levels +- New features: Geometry/Geography types, VARIANT logical type +- Performance: GenericWriter optimizations +- Stability: SortingWriter improvements + +## Common Bug Areas +- Schema conversion edge cases (`convert.go`) +- Nested structure handling (repetition/definition levels) +- Memory management in streaming writes +- Page statistics and index handling +- Dictionary encoding with nulls +- Interface type handling in dynamic value mapping (`value.go`, `row.go`) + +## Dependencies (Runtime) +- Compression: brotli, gzip, lz4, zstd libraries +- Encoding: bitpack, jsonlite +- Types: google/uuid, go-geom (geometry) +- Serialization: protobuf diff --git a/vendor/github.com/parquet-go/parquet-go/column.go b/vendor/github.com/parquet-go/parquet-go/column.go index fb819cebc3..0566c8589a 100644 --- a/vendor/github.com/parquet-go/parquet-go/column.go +++ b/vendor/github.com/parquet-go/parquet-go/column.go @@ -238,12 +238,15 @@ func (c *Column) setLevels(depth, repetition, definition, index int) (int, error return -1, fmt.Errorf("cannot represent parquet columns with more than %d definition levels: %s", MaxDefinitionLevel, c.path) } - switch schemaRepetitionTypeOf(c.schema) { - case format.Optional: - definition++ - case format.Repeated: - repetition++ - definition++ + // Only non-root columns have a well defined repetition_type + if depth > 0 { + switch schemaRepetitionTypeOf(c.schema) { + case format.Optional: + definition++ + case format.Repeated: + repetition++ + definition++ + } } c.depth = int8(depth) diff --git a/vendor/github.com/parquet-go/parquet-go/column_buffer_reflect.go b/vendor/github.com/parquet-go/parquet-go/column_buffer_reflect.go index cfc630fd23..1564605736 100644 --- a/vendor/github.com/parquet-go/parquet-go/column_buffer_reflect.go +++ b/vendor/github.com/parquet-go/parquet-go/column_buffer_reflect.go @@ -1,10 +1,14 @@ package parquet import ( + "bytes" "cmp" + "encoding/binary" "encoding/json" "fmt" "maps" + "math" + "math/big" "math/bits" "reflect" "sort" @@ -37,11 +41,10 @@ import ( // - nil *structpb.Struct, *structpb.ListValue, *structpb.Value // - Zero values for value types func isNullValue(value reflect.Value) bool { - if !value.IsValid() { - return true - } - switch value.Kind() { + case reflect.Invalid: + return true + case reflect.Pointer, reflect.Interface: if value.IsNil() { return true @@ -705,12 +708,11 @@ func writeValueFuncOfLeaf(columnIndex int16, node Node) (int16, writeValueFunc) return columnIndex + 1, func(columns []ColumnBuffer, levels columnLevels, value reflect.Value) { col := columns[columnIndex] writeValue: - if !value.IsValid() { + switch value.Kind() { + case reflect.Invalid: col.writeNull(levels) return - } - switch value.Kind() { case reflect.Pointer, reflect.Interface: if value.IsNil() { col.writeNull(levels) @@ -755,6 +757,8 @@ func writeValueFuncOfLeaf(columnIndex int16, node Node) (int16, writeValueFunc) writeProtoAny(col, levels, msg, node) case geom.T: writeGeometry(col, levels, msg, node) + case *big.Float: + writeBigFloat(col, levels, msg, node) default: value = value.Elem() goto writeValue @@ -790,10 +794,23 @@ func writeValueFuncOfLeaf(columnIndex int16, node Node) (int16, writeValueFunc) return case reflect.Float32: + typ := node.Type() + logicalType := typ.LogicalType() + if logicalType != nil && logicalType.Decimal != nil { + decimalValue(col, levels, typ, value, logicalType.Decimal.Scale) + return + } col.writeFloat(levels, float32(value.Float())) return case reflect.Float64: + typ := node.Type() + logicalType := typ.LogicalType() + if logicalType != nil && logicalType.Decimal != nil { + decimalValue(col, levels, typ, value, logicalType.Decimal.Scale) + return + } + col.writeDouble(levels, value.Float()) return @@ -907,3 +924,108 @@ func writeUUID(col ColumnBuffer, levels columnLevels, str string, typ Type) { col.writeByteArray(levels, buf.Slice()) buf.Reset() } + +func decimalValue(col ColumnBuffer, levels columnLevels, typ Type, value reflect.Value, scale int32) { + val := int64(math.Round(value.Float() * math.Pow10(int(scale)))) + switch typ.Kind() { + case Int32: + col.writeInt32(levels, int32(val)) + case Int64: + col.writeInt64(levels, val) + case ByteArray: + col.writeByteArray(levels, numberToByteArray(val)) + } +} + +func numberToByteArray(data any) []byte { + var buf bytes.Buffer + err := binary.Write(&buf, binary.BigEndian, data) + if err != nil { + panic(err) + } + return buf.Bytes() +} + +func writeBigFloat(col ColumnBuffer, levels columnLevels, f *big.Float, node Node) { + typ := node.Type() + logicalType := typ.LogicalType() + if logicalType == nil || logicalType.Decimal == nil { + panic("writeBigFloat requires a decimal logical type") + } + + scale := int(logicalType.Decimal.Scale) + // Compute minimum precision needed: decimal precision * log2(10) ≈ precision * 3.32 + // We use precision * 4 + 64 for safety margin + minPrec := uint(logicalType.Decimal.Precision)*4 + 64 + prec := max(f.Prec(), minPrec) + scaleFactor := new(big.Float).SetPrec(prec) + scaleFactor.SetInt(new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(scale)), nil)) + scaled := new(big.Float).SetPrec(prec).Mul(f, scaleFactor) + // Round to nearest integer (add 0.5 and truncate for positive, subtract 0.5 for negative) + half := new(big.Float).SetPrec(prec).SetFloat64(0.5) + if scaled.Sign() >= 0 { + scaled.Add(scaled, half) + } else { + scaled.Sub(scaled, half) + } + unscaled, _ := scaled.Int(nil) + + b := bigIntToByteArray(unscaled) + if typ.Kind() == FixedLenByteArray { + b = padToFixedLen(b, typ.Length(), unscaled.Sign() < 0) + } + col.writeByteArray(levels, b) +} + +func bigIntToByteArray(i *big.Int) []byte { + if i.Sign() >= 0 { + b := i.Bytes() + // Add leading zero byte if high bit is set to avoid being interpreted as negative + if len(b) > 0 && b[0]&0x80 != 0 { + b = append([]byte{0}, b...) + } + return b + } + // Negative: convert to two's complement + // Get the absolute value bytes + abs := new(big.Int).Abs(i) + b := abs.Bytes() + // Add a leading zero byte to ensure we have room for sign + if len(b) == 0 || b[0]&0x80 != 0 { + b = append([]byte{0}, b...) + } + // Invert all bits + for j := range b { + b[j] = ^b[j] + } + // Add 1 + carry := byte(1) + for j := len(b) - 1; j >= 0 && carry > 0; j-- { + sum := b[j] + carry + b[j] = sum + if sum != 0 { + carry = 0 + } + } + return b +} + +func padToFixedLen(b []byte, length int, negative bool) []byte { + if len(b) == length { + return b + } + if len(b) > length { + panic(fmt.Sprintf("decimal value requires %d bytes but fixed length is %d", len(b), length)) + } + padByte := byte(0x00) + if negative { + padByte = 0xFF + } + result := make([]byte, length) + padding := length - len(b) + for i := range padding { + result[i] = padByte + } + copy(result[padding:], b) + return result +} diff --git a/vendor/github.com/parquet-go/parquet-go/column_buffer_write.go b/vendor/github.com/parquet-go/parquet-go/column_buffer_write.go index 34ad076931..cc46e5f22c 100644 --- a/vendor/github.com/parquet-go/parquet-go/column_buffer_write.go +++ b/vendor/github.com/parquet-go/parquet-go/column_buffer_write.go @@ -307,8 +307,26 @@ func writeRowsFuncOfPointer(t reflect.Type, schema *Schema, path columnPath, tag func writeRowsFuncOfSlice(t reflect.Type, schema *Schema, path columnPath, tagReplacements []StructTagOption) writeRowsFunc { elemType := t.Elem() elemSize := uintptr(elemType.Size()) + + // If the current node is a LIST, we need to drill down to the element + // to find the schema node for the slice elements. + if node := findByPath(schema, path); node != nil && isList(node) { + path = path.append("list", "element") + } + writeRows := writeRowsFuncOf(elemType, schema, path, tagReplacements) + // Check if the element schema node is optional. + // This handles the case of `parquet-element:",optional"` tag where + // the list elements themselves are optional (non-pointer basic types). + // For pointer types (like []*string), the pointer handling already + // takes care of optionality via writeRowsFuncOfPointer. + if elemType.Kind() != reflect.Ptr { + if node := findByPath(schema, path); node != nil && node.Optional() { + writeRows = writeRowsFuncOfOptional(elemType, schema, path, writeRows) + } + } + return func(columns []ColumnBuffer, levels columnLevels, rows sparse.Array) { type sliceHeader struct { base unsafe.Pointer @@ -638,14 +656,18 @@ func writeRowsFuncOfMap(t reflect.Type, schema *Schema, path columnPath, tagRepl } func writeRowsFuncOfJSON(t reflect.Type, schema *Schema, path columnPath) writeRowsFunc { - // If this is a string or a byte array write directly. - switch t.Kind() { - case reflect.String: - return writeRowsFuncOfRequired(t, schema, path) - case reflect.Slice: - if t.Elem().Kind() == reflect.Uint8 { - return writeRowsFuncOfRequired(t, schema, path) + // If this is a string or a byte array, write directly. + if t.Kind() == reflect.String || (t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8) { + column, _ := schema.Lookup(path...) + isOptional := column.Node.Optional() + + // This fast-path avoids the optional checks, so check here and wrap the writer + // if required. + writer := writeRowsFuncOfRequired(t, schema, path) + if isOptional { + writer = writeRowsFuncOfOptional(t, schema, path, writer) } + return writer } columnIndex := findColumnIndex(schema, schema, path) diff --git a/vendor/github.com/parquet-go/parquet-go/convert.go b/vendor/github.com/parquet-go/parquet-go/convert.go index 8cbab51d25..2b58865d46 100644 --- a/vendor/github.com/parquet-go/parquet-go/convert.go +++ b/vendor/github.com/parquet-go/parquet-go/convert.go @@ -283,7 +283,7 @@ func (c *conversion) Convert(rows []Row) (int, error) { // Fix: If we have a zero Value{}, convert it to a properly typed value // For optional fields, keep as null (kind = 0) // For required fields, convert to typed zero value - if columnValues[i].Kind() == Kind(0) && !conv.isOptional { + if columnValues[i].IsNull() && !conv.isOptional { columnValues[i] = ZeroValue(conv.targetKind) } diff --git a/vendor/github.com/parquet-go/parquet-go/encoding/thrift/binary.go b/vendor/github.com/parquet-go/parquet-go/encoding/thrift/binary.go index 82d7fe610f..7c01ccbdae 100644 --- a/vendor/github.com/parquet-go/parquet-go/encoding/thrift/binary.go +++ b/vendor/github.com/parquet-go/parquet-go/encoding/thrift/binary.go @@ -46,7 +46,8 @@ func (r *binaryReader) Reader() io.Reader { func (r *binaryReader) ReadBool() (bool, error) { v, err := r.ReadByte() - return v != 0, err + // Thrift protocol treats both 0 and 2 as false. + return v != 0 && v != 2, err } func (r *binaryReader) ReadInt8() (int8, error) { diff --git a/vendor/github.com/parquet-go/parquet-go/format/parquet.go b/vendor/github.com/parquet-go/parquet-go/format/parquet.go index 90c135eca6..256a803e61 100644 --- a/vendor/github.com/parquet-go/parquet-go/format/parquet.go +++ b/vendor/github.com/parquet-go/parquet-go/format/parquet.go @@ -1038,6 +1038,17 @@ type RowGroup struct { Ordinal int16 `thrift:"7,optional,writezero"` } +func (r *RowGroup) Reset() { + r.FileOffset = 0 + r.NumRows = 0 + r.TotalByteSize = 0 + r.SortingColumns = r.SortingColumns[:0] + r.Columns = r.Columns[:0] + r.Ordinal = 0 + r.TotalByteSize = 0 + r.TotalCompressedSize = 0 +} + // Empty struct to signal the order defined by the physical or logical type. type TypeDefinedOrder struct{} @@ -1125,6 +1136,16 @@ type OffsetIndex struct { UnencodedByteArrayDataBytes []int64 `thrift:"2,optional"` } +func (o *OffsetIndex) Reset() { + for k := range o.PageLocations { + o.PageLocations[k].Offset = 0 + o.PageLocations[k].CompressedPageSize = 0 + o.PageLocations[k].FirstRowIndex = 0 + } + o.PageLocations = o.PageLocations[:0] + o.UnencodedByteArrayDataBytes = o.UnencodedByteArrayDataBytes[:0] +} + // Description for ColumnIndex. // Each [i] refers to the page at OffsetIndex.PageLocations[i] type ColumnIndex struct { @@ -1174,6 +1195,22 @@ type ColumnIndex struct { DefinitionLevelHistogram []int64 `thrift:"7,optional"` } +func (c *ColumnIndex) Reset() { + c.DefinitionLevelHistogram = c.DefinitionLevelHistogram[:0] + c.RepetitionLevelHistogram = c.RepetitionLevelHistogram[:0] + c.NullCounts = c.NullCounts[:0] + c.NullPages = c.NullPages[:0] + c.BoundaryOrder = 0 + for k := range c.MaxValues { + c.MaxValues[k] = c.MaxValues[k][:0] + } + c.MaxValues = c.MaxValues[:0] + for k := range c.MinValues { + c.MinValues[k] = c.MinValues[k][:0] + } + c.MinValues = c.MinValues[:0] +} + type AesGcmV1 struct { // AAD prefix. AadPrefix []byte `thrift:"1,optional"` diff --git a/vendor/github.com/parquet-go/parquet-go/merge.go b/vendor/github.com/parquet-go/parquet-go/merge.go index fc7b7cd20f..8650b66a9a 100644 --- a/vendor/github.com/parquet-go/parquet-go/merge.go +++ b/vendor/github.com/parquet-go/parquet-go/merge.go @@ -698,7 +698,7 @@ func mergeTwoNodes(a, b Node) Node { if !isPlainEncoding(encoding2) { encoding = encoding2 } - if encoding != nil { + if encoding != nil && canEncode(encoding, merged.Type().Kind()) { merged = Encoded(merged, encoding) } } else { diff --git a/vendor/github.com/parquet-go/parquet-go/node.go b/vendor/github.com/parquet-go/parquet-go/node.go index d2460c9d56..6429bb803f 100644 --- a/vendor/github.com/parquet-go/parquet-go/node.go +++ b/vendor/github.com/parquet-go/parquet-go/node.go @@ -1,8 +1,10 @@ package parquet import ( + "fmt" "reflect" "slices" + "strconv" "strings" "unicode" "unicode/utf8" @@ -381,18 +383,70 @@ func goTypeOfLeaf(node Node) reflect.Type { func goTypeOfGroup(node Node) reflect.Type { fields := node.Fields() structFields := make([]reflect.StructField, len(fields)) + names := make(map[string]struct{}, len(fields)) for i, field := range fields { - structFields[i].Name = exportedStructFieldName(field.Name()) + if strings.IndexByte(field.Name(), ',') != -1 { + // Ruh-roh! We cannot create a valid Go identifier, but neither can we + // create a valid Go struct tag for mapping a synthetic field name. + panic(fmt.Sprintf("schema node contains an invalid name %q: fields must not have commas in their name", field.Name())) + } + name := exportedStructFieldName(field.Name()) + for { + if _, alreadyUsed := names[name]; !alreadyUsed { + break + } + name += "_" // add suffix to fix collision + } + names[name] = struct{}{} + structFields[i].Name = name structFields[i].Type = field.GoType() - // TODO: can we reconstruct a struct tag that would be valid if a value - // of this type were passed to SchemaOf? + structFields[i].Tag = reflect.StructTag(`parquet:` + strconv.Quote(field.Name())) } return reflect.StructOf(structFields) } func exportedStructFieldName(name string) string { firstRune, size := utf8.DecodeRuneInString(name) - return string([]rune{unicode.ToUpper(firstRune)}) + name[size:] + if unicode.IsUpper(firstRune) { + return sanitize(name) + } + upperFirstRune := unicode.ToUpper(firstRune) + if upperFirstRune == firstRune { + // First character was not a letter, so just trying to upper-case the first + // character won't export the field. We need to add an upper-case prefix instead. + return "X" + sanitize(name) + } + return string([]rune{upperFirstRune}) + sanitize(name[size:]) +} + +// sanitize replaces any character that are invalid in a Go identifier with "_". +func sanitize(name string) string { + if isSanitized(name) { + // Fast path: name is fine, no need to allocate or compute anything. + return name + } + var newName strings.Builder + for _, r := range name { + if isValidInGoIdent(r) { + newName.WriteRune(r) + } else { + newName.WriteByte('_') + } + } + return newName.String() +} + +func isSanitized(name string) bool { + for _, r := range name { + if !isValidInGoIdent(r) { + return false + } + } + return true +} + +func isValidInGoIdent(r rune) bool { + return r == '_' || unicode.IsLetter(r) || unicode.IsDigit(r) } func isList(node Node) bool { diff --git a/vendor/github.com/parquet-go/parquet-go/page_byte_array.go b/vendor/github.com/parquet-go/parquet-go/page_byte_array.go index dbe5407326..08d69b9160 100644 --- a/vendor/github.com/parquet-go/parquet-go/page_byte_array.go +++ b/vendor/github.com/parquet-go/parquet-go/page_byte_array.go @@ -18,10 +18,16 @@ type byteArrayPage struct { func newByteArrayPage(typ Type, columnIndex int16, numValues int32, values encoding.Values) *byteArrayPage { data, offsets := values.ByteArray() + if len(offsets) != int(numValues)+1 { + println("parquet: warning: column", columnIndex, "byte array page has", len(offsets), "offsets but numValues is", numValues, "(expected", numValues+1, "offsets)") + } + if int(numValues)+1 <= len(offsets) { + offsets = offsets[:numValues+1] + } return &byteArrayPage{ typ: typ, values: memory.SliceBufferFrom(data), - offsets: memory.SliceBufferFrom(offsets[:numValues+1]), + offsets: memory.SliceBufferFrom(offsets), columnIndex: ^columnIndex, } } diff --git a/vendor/github.com/parquet-go/parquet-go/page_double.go b/vendor/github.com/parquet-go/parquet-go/page_double.go index 92f6343843..c93903a969 100644 --- a/vendor/github.com/parquet-go/parquet-go/page_double.go +++ b/vendor/github.com/parquet-go/parquet-go/page_double.go @@ -2,6 +2,7 @@ package parquet import ( "io" + "math" "github.com/parquet-go/bitpack/unsafecast" "github.com/parquet-go/parquet-go/encoding" @@ -50,11 +51,39 @@ func (page *doublePage) max() float64 { return maxFloat64(page.values.Slice()) } func (page *doublePage) bounds() (min, max float64) { return boundsFloat64(page.values.Slice()) } +// Bounds returns the min and max values in the page. NaN values are excluded +// from the result when non-NaN values exist so that query engines can rely on +// min/max for predicate pushdown and row-group/page skipping. This matches the +// behavior of Apache parquet-mr (PARQUET-1246), Apache Arrow, and the Apache +// Iceberg spec (which states lower/upper bounds apply to non-null, non-NaN +// values only). If all values are NaN, the bounds are reported as NaN so that +// readers know the page had data. func (page *doublePage) Bounds() (min, max Value, ok bool) { if ok = page.values.Len() > 0; ok { - minFloat, maxFloat := page.bounds() - min = page.makeValue(minFloat) - max = page.makeValue(maxFloat) + data := page.values.Slice() + i := 0 + for i < len(data) && math.IsNaN(data[i]) { + i++ + } + if i >= len(data) { + min = page.makeValue(data[0]) + max = page.makeValue(data[0]) + return min, max, ok + } + lo, hi := data[i], data[i] + for _, v := range data[i+1:] { + if math.IsNaN(v) { + continue + } + if v < lo { + lo = v + } + if v > hi { + hi = v + } + } + min = page.makeValue(lo) + max = page.makeValue(hi) } return min, max, ok } diff --git a/vendor/github.com/parquet-go/parquet-go/page_float.go b/vendor/github.com/parquet-go/parquet-go/page_float.go index 3c6c092dd8..d4002000ca 100644 --- a/vendor/github.com/parquet-go/parquet-go/page_float.go +++ b/vendor/github.com/parquet-go/parquet-go/page_float.go @@ -2,6 +2,7 @@ package parquet import ( "io" + "math" "github.com/parquet-go/bitpack/unsafecast" "github.com/parquet-go/parquet-go/encoding" @@ -50,11 +51,40 @@ func (page *floatPage) max() float32 { return maxFloat32(page.values.Slice()) } func (page *floatPage) bounds() (min, max float32) { return boundsFloat32(page.values.Slice()) } +// Bounds returns the min and max values in the page. NaN values are excluded +// from the result when non-NaN values exist so that query engines can rely on +// min/max for predicate pushdown and row-group/page skipping. This matches the +// behavior of Apache parquet-mr (PARQUET-1246), Apache Arrow, and the Apache +// Iceberg spec (which states lower/upper bounds apply to non-null, non-NaN +// values only). If all values are NaN, the bounds are reported as NaN so that +// readers know the page had data. func (page *floatPage) Bounds() (min, max Value, ok bool) { if ok = page.values.Len() > 0; ok { - minFloat32, maxFloat32 := page.bounds() - min = page.makeValue(minFloat32) - max = page.makeValue(maxFloat32) + data := page.values.Slice() + i := 0 + for i < len(data) && math.IsNaN(float64(data[i])) { + i++ + } + if i >= len(data) { + // All values are NaN. + min = page.makeValue(data[0]) + max = page.makeValue(data[0]) + return min, max, ok + } + lo, hi := data[i], data[i] + for _, v := range data[i+1:] { + if math.IsNaN(float64(v)) { + continue + } + if v < lo { + lo = v + } + if v > hi { + hi = v + } + } + min = page.makeValue(lo) + max = page.makeValue(hi) } return min, max, ok } diff --git a/vendor/github.com/parquet-go/parquet-go/page_optional.go b/vendor/github.com/parquet-go/parquet-go/page_optional.go index 9c51d1978c..0a2f11a7d5 100644 --- a/vendor/github.com/parquet-go/parquet-go/page_optional.go +++ b/vendor/github.com/parquet-go/parquet-go/page_optional.go @@ -92,7 +92,12 @@ func (r *optionalPageValues) ReadValues(values []Value) (n int, err error) { } if n < i { - for j, err = r.values.ReadValues(values[n:i]); j > 0; j-- { + j, err := r.values.ReadValues(values[n:i]) + if j == 0 && err == io.EOF { + // Underlying page exhausted before definitionLevels - corrupted file + return n, io.EOF + } + for ; j > 0; j-- { values[n].definitionLevel = maxDefinitionLevel r.offset++ n++ diff --git a/vendor/github.com/parquet-go/parquet-go/page_repeated.go b/vendor/github.com/parquet-go/parquet-go/page_repeated.go index 2d850a66e2..0cd2856a14 100644 --- a/vendor/github.com/parquet-go/parquet-go/page_repeated.go +++ b/vendor/github.com/parquet-go/parquet-go/page_repeated.go @@ -152,7 +152,12 @@ func (r *repeatedPageValues) ReadValues(values []Value) (n int, err error) { // Copy all the non-zero values in this run. if n < i { - for j, err = r.values.ReadValues(values[n:i]); j > 0; j-- { + j, err := r.values.ReadValues(values[n:i]) + if j == 0 && err == io.EOF { + // Underlying page exhausted before definitionLevels - corrupted file + return n, io.EOF + } + for ; j > 0; j-- { values[n].repetitionLevel = repetitionLevels[r.offset] values[n].definitionLevel = maxDefinitionLevel r.offset++ diff --git a/vendor/github.com/parquet-go/parquet-go/row.go b/vendor/github.com/parquet-go/parquet-go/row.go index a662d1fb12..180a704f57 100644 --- a/vendor/github.com/parquet-go/parquet-go/row.go +++ b/vendor/github.com/parquet-go/parquet-go/row.go @@ -705,7 +705,99 @@ func reconstructFuncOfRequired(columnIndex int16, node Node) (int16, reconstruct } func reconstructFuncOfList(columnIndex int16, node Node) (int16, reconstructFunc) { - return reconstructFuncOf(columnIndex, Repeated(listElementOf(node))) + elem := listElementOf(node) + // If the list element is optional (e.g., from `parquet-element:",optional"`), + // we need to handle it specially because the normal path through + // reconstructFuncOfRepeated would wrap the node with Required() which + // hides the Optional property. + if elem.Optional() { + return reconstructFuncOfRepeatedOptional(columnIndex, elem) + } + return reconstructFuncOf(columnIndex, Repeated(elem)) +} + +// reconstructFuncOfRepeatedOptional handles the case where list elements are optional. +// This is needed because reconstructFuncOfRepeated uses Required() which hides +// the Optional property of the inner node. +// +//go:noinline +func reconstructFuncOfRepeatedOptional(columnIndex int16, node Node) (int16, reconstructFunc) { + // node is Optional(X), get the inner reconstruction for the required version + nextColumnIndex, reconstruct := reconstructFuncOf(columnIndex, Required(node)) + + return nextColumnIndex, func(value reflect.Value, levels columnLevels, columns [][]Value) error { + // Increment both for the repeated and optional levels + levels.repetitionDepth++ + levels.definitionLevel += 2 // +1 for repeated, +1 for optional + + // Handle empty groups (no columns) + if len(columns) == 0 || len(columns[0]) == 0 { + setMakeSlice(value, 0) + return nil + } + + // Check if the list itself is null (definition level less than the repeated level) + // We need to check against (levels.definitionLevel - 1) because that's the repeated level + if columns[0][0].definitionLevel < levels.definitionLevel-1 { + setMakeSlice(value, 0) + return nil + } + + values := make([][]Value, len(columns)) + column := columns[0] + n := 0 + + for i, column := range columns { + values[i] = column[0:0:len(column)] + } + + for i := 0; i < len(column); { + i++ + n++ + + for i < len(column) && column[i].repetitionLevel > levels.repetitionDepth { + i++ + } + } + + value = setMakeSlice(value, n) + + for i := range n { + for j, column := range values { + column = column[:cap(column)] + if len(column) == 0 { + continue + } + + k := 1 + for k < len(column) && column[k].repetitionLevel > levels.repetitionDepth { + k++ + } + + values[j] = column[:k] + } + + // Check if this element is null (definition level indicates null) + // An element is null if its definition level is less than the max (which includes optional) + elemValue := value.Index(i) + if len(values) > 0 && len(values[0]) > 0 && values[0][0].definitionLevel < levels.definitionLevel { + // Element is null, leave as zero value + elemValue.SetZero() + } else { + if err := reconstruct(elemValue, levels, values); err != nil { + return err + } + } + + for j, column := range values { + values[j] = column[len(column):len(column):cap(column)] + } + + levels.repetitionLevel = levels.repetitionDepth + } + + return nil + } } //go:noinline diff --git a/vendor/github.com/parquet-go/parquet-go/row_buffer.go b/vendor/github.com/parquet-go/parquet-go/row_buffer.go index 9f98e4d057..1ce453f3c3 100644 --- a/vendor/github.com/parquet-go/parquet-go/row_buffer.go +++ b/vendor/github.com/parquet-go/parquet-go/row_buffer.go @@ -2,6 +2,7 @@ package parquet import ( "io" + "math" "sort" "github.com/parquet-go/parquet-go/deprecated" @@ -225,21 +226,48 @@ func (p *rowBufferPage) NumNulls() int64 { } func (p *rowBufferPage) Bounds() (min, max Value, ok bool) { + var nanValue Value + hasNaN := false + p.scan(func(value Value) { - if !value.IsNull() { - switch { - case !ok: - min, max, ok = value, value, true - case p.typ.Compare(value, min) < 0: - min = value - case p.typ.Compare(value, max) > 0: - max = value - } + if value.IsNull() { + return + } + // NaN is excluded from min/max so that query engines can rely on + // statistics for predicate pushdown. See floatPage.Bounds for the + // full rationale. + if isNaN(value) { + nanValue = value + hasNaN = true + return + } + switch { + case !ok: + min, max, ok = value, value, true + case p.typ.Compare(value, min) < 0: + min = value + case p.typ.Compare(value, max) > 0: + max = value } }) + + if !ok && hasNaN { + min, max, ok = nanValue, nanValue, true + } return min, max, ok } +func isNaN(v Value) bool { + switch v.Kind() { + case Float: + return math.IsNaN(float64(v.Float())) + case Double: + return math.IsNaN(v.Double()) + default: + return false + } +} + func (p *rowBufferPage) Size() int64 { return 0 } func (p *rowBufferPage) Values() ValueReader { diff --git a/vendor/github.com/parquet-go/parquet-go/schema.go b/vendor/github.com/parquet-go/parquet-go/schema.go index 248de8ad58..7f86acc1f2 100644 --- a/vendor/github.com/parquet-go/parquet-go/schema.go +++ b/vendor/github.com/parquet-go/parquet-go/schema.go @@ -1152,6 +1152,21 @@ func makeNodeOf(path []string, t reflect.Type, name string, tags parquetTags, ta throwInvalidTag(t, name, option+args) } setNode(TimeAdjusted(timeUnit, adjusted)) + case reflect.Ptr: + // Support *time.Duration with time tag + if t.Elem() == reflect.TypeFor[time.Duration]() { + timeUnit, adjusted, err := parseTimestampArgs(args) + if err != nil { + throwInvalidTag(t, name, option+args) + } + if args == "()" { + timeUnit = Nanosecond + adjusted = true + } + setNode(Optional(TimeAdjusted(timeUnit, adjusted))) + } else { + throwInvalidTag(t, name, option) + } default: throwInvalidTag(t, name, option) } diff --git a/vendor/github.com/parquet-go/parquet-go/sorting.go b/vendor/github.com/parquet-go/parquet-go/sorting.go index e4087b6d31..cdb5974cc9 100644 --- a/vendor/github.com/parquet-go/parquet-go/sorting.go +++ b/vendor/github.com/parquet-go/parquet-go/sorting.go @@ -73,18 +73,11 @@ func (w *SortingWriter[T]) Close() error { if err := w.Flush(); err != nil { return err } - return w.output.Close() -} -func (w *SortingWriter[T]) Flush() error { defer w.resetSortingBuffer() - if err := w.sortAndWriteBufferedRows(); err != nil { - return err - } - if w.numRows == 0 { - return nil + return w.output.Close() } if err := w.writer.Close(); err != nil { @@ -129,7 +122,18 @@ func (w *SortingWriter[T]) Flush() error { return err } - return w.output.Flush() + if err := w.output.Flush(); err != nil { + return err + } + + return w.output.Close() +} + +// Flush sorts any buffered rows and writes them to temporary storage. +// This can be called multiple times to manage memory usage. +// The actual merge and write to output happens on Close. +func (w *SortingWriter[T]) Flush() error { + return w.sortAndWriteBufferedRows() } func (w *SortingWriter[T]) Reset(output io.Writer) { diff --git a/vendor/github.com/parquet-go/parquet-go/type_decimal.go b/vendor/github.com/parquet-go/parquet-go/type_decimal.go index 5f57557aa2..0bb1e53cc0 100644 --- a/vendor/github.com/parquet-go/parquet-go/type_decimal.go +++ b/vendor/github.com/parquet-go/parquet-go/type_decimal.go @@ -1,6 +1,12 @@ package parquet import ( + "log" + "math" + "math/big" + "reflect" + "strconv" + "github.com/parquet-go/parquet-go/deprecated" "github.com/parquet-go/parquet-go/format" ) @@ -11,7 +17,18 @@ import ( // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal func Decimal(scale, precision int, typ Type) Node { switch typ.Kind() { - case Int32, Int64, ByteArray, FixedLenByteArray: + case Int32: + if precision < 1 || precision > 9 { + panic("DECIMAL annotated with Int32 must have precision >= 1 and <= 9, got " + strconv.Itoa(precision)) + } + case Int64: + if precision < 1 || precision > 18 { + panic("DECIMAL annotated with Int32 must have precision >= 1 and <= 9, got " + strconv.Itoa(precision)) + } + if precision < 10 { + log.Printf("WARNING: DECIMAL annotated with Int64 should have a precision >= 10, got %d", precision) + } + case ByteArray, FixedLenByteArray: default: panic("DECIMAL node must annotate Int32, Int64, ByteArray or FixedLenByteArray but got " + typ.String()) } @@ -38,3 +55,50 @@ func (t *decimalType) LogicalType() *format.LogicalType { func (t *decimalType) ConvertedType() *deprecated.ConvertedType { return &convertedTypes[deprecated.Decimal] } + +func (t *decimalType) AssignValue(dst reflect.Value, src Value) error { + switch t.Type { + case Int32Type: + switch dst.Kind() { + case reflect.Int32: + dst.SetInt(int64(src.int32())) + default: + dst.Set(reflect.ValueOf(float32(src.int32()) / float32(math.Pow10(int(t.decimal.Scale))))) + } + case Int64Type: + switch dst.Kind() { + case reflect.Int64: + dst.SetInt(src.int64()) + default: + dst.Set(reflect.ValueOf(float64(src.int64()) / math.Pow10(int(t.decimal.Scale)))) + } + default: + // ByteArray and FixedLenByteArray + if t.Type.Kind() != ByteArray && t.Type.Kind() != FixedLenByteArray { + return nil + } + data := src.ByteArray() + val := new(big.Int) + if len(data) > 0 && data[0]&0x80 != 0 { + // Negative number: convert from two's complement + tmp := make([]byte, len(data)) + for i, b := range data { + tmp[i] = ^b + } + val.SetBytes(tmp) + val.Add(val, big.NewInt(1)) + val.Neg(val) + } else { + val.SetBytes(data) + } + // Use enough precision to represent the decimal value accurately + // precision * log2(10) ≈ precision * 3.32, round up generously + prec := max(uint(t.decimal.Precision)*4+64, 192) + f := new(big.Float).SetPrec(prec).SetInt(val) + scaleFactor := new(big.Float).SetPrec(prec) + scaleFactor.SetInt(new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(t.decimal.Scale)), nil)) + f.Quo(f, scaleFactor) + dst.Set(reflect.ValueOf(f)) + } + return nil +} diff --git a/vendor/github.com/parquet-go/parquet-go/type_int32.go b/vendor/github.com/parquet-go/parquet-go/type_int32.go index f6497c76d0..d46113f287 100644 --- a/vendor/github.com/parquet-go/parquet-go/type_int32.go +++ b/vendor/github.com/parquet-go/parquet-go/type_int32.go @@ -10,19 +10,19 @@ import ( type int32Type struct{} -func (t int32Type) String() string { return "INT32" } -func (t int32Type) Kind() Kind { return Int32 } -func (t int32Type) Length() int { return 32 } -func (t int32Type) EstimateSize(n int) int { return 4 * n } -func (t int32Type) EstimateNumValues(n int) int { return n / 4 } -func (t int32Type) Compare(a, b Value) int { return compareInt32(a.int32(), b.int32()) } -func (t int32Type) ColumnOrder() *format.ColumnOrder { return &typeDefinedColumnOrder } -func (t int32Type) LogicalType() *format.LogicalType { - return &format.LogicalType{Integer: &format.IntType{ - BitWidth: 32, - IsSigned: true, - }} -} +var int32LogicalType = format.LogicalType{Integer: &format.IntType{ + BitWidth: 32, + IsSigned: true, +}} + +func (t int32Type) String() string { return "INT32" } +func (t int32Type) Kind() Kind { return Int32 } +func (t int32Type) Length() int { return 32 } +func (t int32Type) EstimateSize(n int) int { return 4 * n } +func (t int32Type) EstimateNumValues(n int) int { return n / 4 } +func (t int32Type) Compare(a, b Value) int { return compareInt32(a.int32(), b.int32()) } +func (t int32Type) ColumnOrder() *format.ColumnOrder { return &typeDefinedColumnOrder } +func (t int32Type) LogicalType() *format.LogicalType { return &int32LogicalType } func (t int32Type) ConvertedType() *deprecated.ConvertedType { return nil } func (t int32Type) PhysicalType() *format.Type { return &physicalTypes[Int32] } diff --git a/vendor/github.com/parquet-go/parquet-go/type_int64.go b/vendor/github.com/parquet-go/parquet-go/type_int64.go index 9742a7f14b..db1287b694 100644 --- a/vendor/github.com/parquet-go/parquet-go/type_int64.go +++ b/vendor/github.com/parquet-go/parquet-go/type_int64.go @@ -10,19 +10,19 @@ import ( type int64Type struct{} -func (t int64Type) String() string { return "INT64" } -func (t int64Type) Kind() Kind { return Int64 } -func (t int64Type) Length() int { return 64 } -func (t int64Type) EstimateSize(n int) int { return 8 * n } -func (t int64Type) EstimateNumValues(n int) int { return n / 8 } -func (t int64Type) Compare(a, b Value) int { return compareInt64(a.int64(), b.int64()) } -func (t int64Type) ColumnOrder() *format.ColumnOrder { return &typeDefinedColumnOrder } -func (t int64Type) LogicalType() *format.LogicalType { - return &format.LogicalType{Integer: &format.IntType{ - BitWidth: 64, - IsSigned: true, - }} -} +var int64LogicalType = format.LogicalType{Integer: &format.IntType{ + BitWidth: 64, + IsSigned: true, +}} + +func (t int64Type) String() string { return "INT64" } +func (t int64Type) Kind() Kind { return Int64 } +func (t int64Type) Length() int { return 64 } +func (t int64Type) EstimateSize(n int) int { return 8 * n } +func (t int64Type) EstimateNumValues(n int) int { return n / 8 } +func (t int64Type) Compare(a, b Value) int { return compareInt64(a.int64(), b.int64()) } +func (t int64Type) ColumnOrder() *format.ColumnOrder { return &typeDefinedColumnOrder } +func (t int64Type) LogicalType() *format.LogicalType { return &int64LogicalType } func (t int64Type) ConvertedType() *deprecated.ConvertedType { return nil } func (t int64Type) PhysicalType() *format.Type { return &physicalTypes[Int64] } diff --git a/vendor/github.com/parquet-go/parquet-go/type_list.go b/vendor/github.com/parquet-go/parquet-go/type_list.go index 20a1bbf866..3f17fde492 100644 --- a/vendor/github.com/parquet-go/parquet-go/type_list.go +++ b/vendor/github.com/parquet-go/parquet-go/type_list.go @@ -21,6 +21,8 @@ func (listNode) Type() Type { return &listType{} } type listType format.ListType +var listLogicalType = format.LogicalType{List: new(format.ListType)} + func (t *listType) String() string { return (*format.ListType)(t).String() } func (t *listType) Kind() Kind { panic("cannot call Kind on parquet LIST type") } @@ -37,9 +39,7 @@ func (t *listType) ColumnOrder() *format.ColumnOrder { return nil } func (t *listType) PhysicalType() *format.Type { return nil } -func (t *listType) LogicalType() *format.LogicalType { - return &format.LogicalType{List: (*format.ListType)(t)} -} +func (t *listType) LogicalType() *format.LogicalType { return &listLogicalType } func (t *listType) ConvertedType() *deprecated.ConvertedType { return &convertedTypes[deprecated.List] diff --git a/vendor/github.com/parquet-go/parquet-go/type_map.go b/vendor/github.com/parquet-go/parquet-go/type_map.go index d5f6a95cc5..66835cda12 100644 --- a/vendor/github.com/parquet-go/parquet-go/type_map.go +++ b/vendor/github.com/parquet-go/parquet-go/type_map.go @@ -26,6 +26,8 @@ func (mapNode) Type() Type { return &mapType{} } type mapType format.MapType +var mapLogicalType = format.LogicalType{Map: new(format.MapType)} + func (t *mapType) String() string { return (*format.MapType)(t).String() } func (t *mapType) Kind() Kind { panic("cannot call Kind on parquet MAP type") } @@ -42,9 +44,7 @@ func (t *mapType) ColumnOrder() *format.ColumnOrder { return nil } func (t *mapType) PhysicalType() *format.Type { return nil } -func (t *mapType) LogicalType() *format.LogicalType { - return &format.LogicalType{Map: (*format.MapType)(t)} -} +func (t *mapType) LogicalType() *format.LogicalType { return &mapLogicalType } func (t *mapType) ConvertedType() *deprecated.ConvertedType { return &convertedTypes[deprecated.Map] diff --git a/vendor/github.com/parquet-go/parquet-go/type_null.go b/vendor/github.com/parquet-go/parquet-go/type_null.go index ca1947d9ce..546bbe98cc 100644 --- a/vendor/github.com/parquet-go/parquet-go/type_null.go +++ b/vendor/github.com/parquet-go/parquet-go/type_null.go @@ -10,6 +10,8 @@ import ( type nullType format.NullType +var nullLogicalType = format.LogicalType{Unknown: new(format.NullType)} + func (t *nullType) String() string { return (*format.NullType)(t).String() } func (t *nullType) Kind() Kind { return -1 } @@ -26,9 +28,7 @@ func (t *nullType) ColumnOrder() *format.ColumnOrder { return nil } func (t *nullType) PhysicalType() *format.Type { return nil } -func (t *nullType) LogicalType() *format.LogicalType { - return &format.LogicalType{Unknown: (*format.NullType)(t)} -} +func (t *nullType) LogicalType() *format.LogicalType { return &nullLogicalType } func (t *nullType) ConvertedType() *deprecated.ConvertedType { return nil } diff --git a/vendor/github.com/parquet-go/parquet-go/type_variant.go b/vendor/github.com/parquet-go/parquet-go/type_variant.go index fd6cc06c97..8d692e0e0c 100644 --- a/vendor/github.com/parquet-go/parquet-go/type_variant.go +++ b/vendor/github.com/parquet-go/parquet-go/type_variant.go @@ -1,30 +1,160 @@ package parquet import ( + "errors" + "fmt" "reflect" + "strings" "github.com/parquet-go/parquet-go/deprecated" "github.com/parquet-go/parquet-go/encoding" "github.com/parquet-go/parquet-go/format" ) -// Variant constructs a node of unshredded VARIANT logical type. It is a group with +// Variant constructs a node of unshredded [VARIANT logical type]. It is a group with // two required fields, "metadata" and "value", both byte arrays. // -// Experimental: The specification for variants is still being developed and the type -// is not fully adopted. Support for this type is subject to change. +// *Experimental*: Support for the VARIANT type is still being developed and subject to +// change. // // Initial support does not attempt to process the variant data. So reading and writing // data of this type behaves as if it were just a group with two byte array fields, as // if the logical type annotation were absent. This may change in the future. // -// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#variant +// [VARIANT logical type]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#variant func Variant() Node { return variantNode{Group{"metadata": Required(Leaf(ByteArrayType)), "value": Required(Leaf(ByteArrayType))}} } -// TODO: add ShreddedVariant(Node) function, to create a shredded variant -// where the argument defines the type/structure of the shredded value(s). +// ShreddedVariant constructs a node of shredded [VARIANT logical type]. It is a group +// with a required byte array "metadata" field, an optional byte array "value" field +// (for any unshredded values), and an optional "typed_value" field whose type is that +// of the shredded value. If the given node is a group or list (or contains a list), +// the resulting "typed_value" field will have some additional structure to allow +// each group or element in a list to have a mix of shredded and unshredded data. +// +// The given node may only contain types that map to valid [variant value types]. +// Therefore, it may not contain ENUM, FLOAT16, INTERVAL, JSON, BSON, VARIANT, +// GEOMETRY, GEOGRAPHY, MAP, or UNKNOWN logical types. It may only use signed INT +// logical types. It may only use DECIMAL logical types whose precision is less than +// or equal to 38. It may not contain any repeated fields unless they are the middle +// level of a 3-level LIST logical type. Any "required" settings on fields will be +// ignored: shredded fields must always be optional to represent values that may not +// conform to the shredded type. It also may not contain empty groups: any groups +// must have at least one field. +// +// More information on shredded variants can be found in the [Parquet documentation]. +// +// *Experimental*: Support for the VARIANT type is still being developed and subject to +// change. +// +// Initial support does not attempt to process the variant data. So reading and writing +// data of this type behaves as if it were just a group with the three described fields, +// as if the logical type annotation were absent. This may change in the future. +// +// [VARIANT logical type]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#variant +// [variant value types]: https://github.com/apache/parquet-format/blob/master/VariantShredding.md#shredded-value-types +// [Parquet documentation]: https://github.com/apache/parquet-format/blob/master/VariantShredding.md +func ShreddedVariant(shreddedType Node) (Node, error) { + typedNode, err := variantTypedValueNode(shreddedType) + if err != nil { + return nil, err + } + return variantNode{Group{ + "metadata": Required(Leaf(ByteArrayType)), + "value": Optional(Leaf(ByteArrayType)), + "typed_value": Optional(typedNode), + }}, nil +} + +func variantTypedValueNode(node Node, fieldPath ...string) (typed Node, err error) { + defer func() { + if err != nil && len(fieldPath) > 0 { + err = fmt.Errorf("field %s: %w", strings.Join(fieldPath, "."), err) + } + }() + if node.Repeated() { + return nil, errors.New("repeated types are not allowed unless they are part of a 3-level LIST logical type") + } + if lt := node.Type().LogicalType(); lt != nil { + switch lt := getLogicalType(lt).(type) { + case *format.ListType: + // We must first extract the inner list.element field of the 3-level LIST type. + children := node.Fields() + if len(children) != 1 { + return nil, fmt.Errorf("invalid LIST logical type: expecting a single 'list' field but instead found %d", len(children)) + } + grandchildren := children[0].Fields() + if len(grandchildren) != 1 { + return nil, fmt.Errorf("invalid LIST logical type: expecting a single 'element' field but instead found %d", len(grandchildren)) + } + elementNode, err := variantTypedValueNode(grandchildren[0], fieldPath...) + if err != nil { + return nil, err + } + list := List(Required(Group{ + "value": Optional(Leaf(ByteArrayType)), + "typed_value": Optional(elementNode), + })) + if node.ID() != 0 { + return FieldID(list, node.ID()), nil + } + return list, nil + case *format.IntType: + if !lt.IsSigned { + return nil, errors.New("signed INT logical types are not allowed") + } + case *format.DecimalType: + if lt.Precision > 38 { + return nil, fmt.Errorf("DECIMAL logical types with precision >38 are not allowed (got %d)", lt.Precision) + } + case *format.DateType: + case *format.TimeType: + case *format.TimestampType: + case *format.StringType: + case *format.UUIDType: + default: + // No other logical types are allowed. + return nil, fmt.Errorf("%s logical types are not allowed", lt) + } + } + if node.Leaf() { + return node, nil + } + children := node.Fields() + if len(children) == 0 { + return nil, errors.New("empty groups are not allowed") + } + group := make(Group, len(children)) + for _, child := range children { + childNode, err := variantTypedValueNode(child, append(fieldPath, child.Name())...) + if err != nil { + return nil, err + } + group[child.Name()] = Required(Group{ + "value": Optional(Leaf(ByteArrayType)), + "typed_value": Optional(childNode), + }) + } + if node.ID() != 0 { + return FieldID(group, node.ID()), nil + } + return group, nil +} + +func getLogicalType(lt *format.LogicalType) any { + // We use reflection so we can always catch a logical type annotation. If a + // new one is added, we don't have to remember to update the switch above (unless + // a new one is added that is also supported as a variant value). + refVal := reflect.Indirect(reflect.ValueOf(lt)) + for i := range refVal.NumField() { + field := refVal.Field(i) + if field.CanInterface() && !field.IsZero() { + return field.Interface() + } + } + return nil +} type variantNode struct{ Group } @@ -32,6 +162,8 @@ func (variantNode) Type() Type { return &variantType{} } type variantType format.VariantType +var variantLogicalType = format.LogicalType{Variant: new(format.VariantType)} + func (t *variantType) String() string { return (*format.VariantType)(t).String() } func (t *variantType) Kind() Kind { panic("cannot call Kind on parquet VARIANT type") } @@ -50,9 +182,7 @@ func (t *variantType) ColumnOrder() *format.ColumnOrder { return nil } func (t *variantType) PhysicalType() *format.Type { return nil } -func (t *variantType) LogicalType() *format.LogicalType { - return &format.LogicalType{Variant: (*format.VariantType)(t)} -} +func (t *variantType) LogicalType() *format.LogicalType { return &variantLogicalType } func (t *variantType) ConvertedType() *deprecated.ConvertedType { return nil } diff --git a/vendor/github.com/parquet-go/parquet-go/writer.go b/vendor/github.com/parquet-go/parquet-go/writer.go index 76b5002704..beb54e9efb 100644 --- a/vendor/github.com/parquet-go/parquet-go/writer.go +++ b/vendor/github.com/parquet-go/parquet-go/writer.go @@ -1026,13 +1026,19 @@ func (w *writer) reset(writer io.Writer) { } w.currentRowGroup.reset() for i := range w.rowGroups { - w.rowGroups[i] = format.RowGroup{} + w.rowGroups[i].Reset() } for i := range w.columnIndexes { - w.columnIndexes[i] = nil + for j := range w.columnIndexes[i] { + w.columnIndexes[i][j].Reset() + } + w.columnIndexes[i] = w.columnIndexes[i][:0] } for i := range w.offsetIndexes { - w.offsetIndexes[i] = nil + for j := range w.offsetIndexes[i] { + w.offsetIndexes[i][j].Reset() + } + w.offsetIndexes[i] = w.offsetIndexes[i][:0] } w.rowGroups = w.rowGroups[:0] w.columnIndexes = w.columnIndexes[:0] @@ -1155,6 +1161,10 @@ func (w *writer) writeRowGroup(rg *ConcurrentRowGroupWriter, rowGroupSchema *Sch return 0, nil } + rowGroupIndex := len(w.rowGroups) + columnIndexIndex := len(w.columnIndexes) + offsetIndexIndex := len(w.offsetIndexes) + if len(w.rowGroups) == MaxRowGroups { return 0, ErrTooManyRowGroups } @@ -1178,10 +1188,9 @@ func (w *writer) writeRowGroup(rg *ConcurrentRowGroupWriter, rowGroupSchema *Sch fileOffset := w.writer.offset for i, c := range rg.columns { - columnIndex := c.columnIndex.ColumnIndex() - columnIndex.RepetitionLevelHistogram = slices.Clone(c.pageRepetitionLevelHistograms) - columnIndex.DefinitionLevelHistogram = slices.Clone(c.pageDefinitionLevelHistograms) - rg.columnIndex[i] = columnIndex + rg.columnIndex[i] = c.columnIndex.ColumnIndex() + rg.columnIndex[i].RepetitionLevelHistogram = append(rg.columnIndex[i].RepetitionLevelHistogram[:0], c.pageRepetitionLevelHistograms...) + rg.columnIndex[i].DefinitionLevelHistogram = append(rg.columnIndex[i].DefinitionLevelHistogram[:0], c.pageDefinitionLevelHistograms...) c.columnChunk.MetaData.SizeStatistics = format.SizeStatistics{ UnencodedByteArrayDataBytes: c.totalUnencodedByteArrayBytes, @@ -1239,9 +1248,20 @@ func (w *writer) writeRowGroup(rg *ConcurrentRowGroupWriter, rowGroupSchema *Sch totalCompressedSize += int64(c.TotalCompressedSize) } + var reuseRowGroup *format.RowGroup = nil + if cap(w.rowGroups) > rowGroupIndex { + w.rowGroups = w.rowGroups[:rowGroupIndex+1] // Extend the slice by one element + reuseRowGroup = &w.rowGroups[rowGroupIndex] // Pointer to the last element + } + sortingColumns := w.sortingColumns if len(sortingColumns) == 0 && len(rowGroupSortingColumns) > 0 { - sortingColumns = make([]format.SortingColumn, 0, len(rowGroupSortingColumns)) + scLen := len(rowGroupSortingColumns) + if reuseRowGroup == nil { + sortingColumns = make([]format.SortingColumn, 0, scLen) + } else { + sortingColumns = reuseRowGroup.SortingColumns + } forEachLeafColumnOf(rowGroupSchema, func(leaf leafColumn) { if sortingIndex := searchSortingColumn(rowGroupSortingColumns, leaf.path); sortingIndex < len(sortingColumns) { sortingColumns[sortingIndex] = format.SortingColumn{ @@ -1253,13 +1273,47 @@ func (w *writer) writeRowGroup(rg *ConcurrentRowGroupWriter, rowGroupSchema *Sch }) } - columns := slices.Clone(rg.columnChunk) - columnIndex := slices.Clone(rg.columnIndex) - offsetIndex := slices.Clone(rg.offsetIndex) + var reuseColumnIndex *[]format.ColumnIndex = nil + if cap(w.columnIndexes) > columnIndexIndex { + // Extend the slice by one element + w.columnIndexes = w.columnIndexes[:columnIndexIndex+1] + reuseColumnIndex = &w.columnIndexes[columnIndexIndex] + } + var columnIndex []format.ColumnIndex + if reuseColumnIndex != nil { + // Copy the slice + columnIndex = append(*reuseColumnIndex, rg.columnIndex...) + w.columnIndexes[columnIndexIndex] = columnIndex + } else { + columnIndex = slices.Clone(rg.columnIndex) + w.columnIndexes = append(w.columnIndexes, columnIndex) + } + var reuseOffsetIndex *[]format.OffsetIndex = nil + if cap(w.offsetIndexes) > offsetIndexIndex { + // Extend the slice by one element + w.offsetIndexes = w.offsetIndexes[:offsetIndexIndex+1] + reuseOffsetIndex = &w.offsetIndexes[offsetIndexIndex] + } + var offsetIndex []format.OffsetIndex + if reuseOffsetIndex != nil { + offsetIndex = append(*reuseOffsetIndex, rg.offsetIndex...) + w.offsetIndexes[offsetIndexIndex] = offsetIndex + } else { + offsetIndex = slices.Clone(rg.offsetIndex) + w.offsetIndexes = append(w.offsetIndexes, offsetIndex) + } + + // If the existing row group slice is full, we need to create a new element + var columns []format.ColumnChunk = nil + if reuseRowGroup == nil { + columns = slices.Clone(rg.columnChunk) + } else { + columns = append(reuseRowGroup.Columns, rg.columnChunk...) + } for i := range columns { c := &columns[i] - c.MetaData.EncodingStats = slices.Clone(rg.columnChunk[i].MetaData.EncodingStats) + c.MetaData.EncodingStats = append(c.MetaData.EncodingStats[:0], rg.columnChunk[i].MetaData.EncodingStats...) } for i := range offsetIndex { @@ -1267,18 +1321,26 @@ func (w *writer) writeRowGroup(rg *ConcurrentRowGroupWriter, rowGroupSchema *Sch c.PageLocations = slices.Clone(rg.offsetIndex[i].PageLocations) } - w.rowGroups = append(w.rowGroups, format.RowGroup{ - Columns: columns, - TotalByteSize: totalByteSize, - NumRows: numRows, - SortingColumns: sortingColumns, - FileOffset: fileOffset, - TotalCompressedSize: totalCompressedSize, - Ordinal: int16(len(w.rowGroups)), - }) + if reuseRowGroup == nil { + w.rowGroups = append(w.rowGroups, format.RowGroup{ + Columns: columns, + TotalByteSize: totalByteSize, + NumRows: numRows, + SortingColumns: sortingColumns, + FileOffset: fileOffset, + TotalCompressedSize: totalCompressedSize, + Ordinal: int16(len(w.rowGroups)), + }) + } else { + reuseRowGroup.Columns = columns + reuseRowGroup.TotalByteSize = totalByteSize + reuseRowGroup.NumRows = numRows + reuseRowGroup.SortingColumns = sortingColumns + reuseRowGroup.FileOffset = fileOffset + reuseRowGroup.TotalCompressedSize = totalCompressedSize + reuseRowGroup.Ordinal = int16(len(w.rowGroups) - 1) + } - w.columnIndexes = append(w.columnIndexes, columnIndex) - w.offsetIndexes = append(w.offsetIndexes, offsetIndex) return numRows, nil } @@ -1946,7 +2008,15 @@ func (c *ColumnWriter) fallbackDictionaryToPlain() error { c.columnType = indexedType.Type } if c.plainColumnBuffer == nil { - c.plainColumnBuffer = c.columnType.NewColumnBuffer(int(c.bufferIndex), int(c.bufferSize)) + base := c.columnType.NewColumnBuffer(int(c.bufferIndex), int(c.bufferSize)) + switch { + case c.maxRepetitionLevel > 0: + c.plainColumnBuffer = newRepeatedColumnBuffer(base, c.maxRepetitionLevel, c.maxDefinitionLevel, nullsGoLast) + case c.maxDefinitionLevel > 0: + c.plainColumnBuffer = newOptionalColumnBuffer(base, c.maxDefinitionLevel, nullsGoLast) + default: + c.plainColumnBuffer = base + } } c.columnBuffer = c.plainColumnBuffer c.encoding = &plain.Encoding{} @@ -2045,16 +2115,20 @@ func (c *ColumnWriter) recordPageStats(headerSize int32, header *format.PageHead definitionLevels := page.DefinitionLevels() if c.maxRepetitionLevel > 0 { - accumulateLevelHistogram(c.repetitionLevelHistogram, repetitionLevels) - c.pageRepetitionLevelHistograms = appendPageLevelHistogram( - c.pageRepetitionLevelHistograms, repetitionLevels, c.maxRepetitionLevel, + c.pageRepetitionLevelHistograms = accumulateAndAppendPageLevelHistogram( + c.repetitionLevelHistogram, + c.pageRepetitionLevelHistograms, + repetitionLevels, + c.maxRepetitionLevel, ) } if c.maxDefinitionLevel > 0 { - accumulateLevelHistogram(c.definitionLevelHistogram, definitionLevels) - c.pageDefinitionLevelHistograms = appendPageLevelHistogram( - c.pageDefinitionLevelHistograms, definitionLevels, c.maxDefinitionLevel, + c.pageDefinitionLevelHistograms = accumulateAndAppendPageLevelHistogram( + c.definitionLevelHistogram, + c.pageDefinitionLevelHistograms, + definitionLevels, + c.maxDefinitionLevel, ) } } diff --git a/vendor/github.com/parquet-go/parquet-go/writer_statistics.go b/vendor/github.com/parquet-go/parquet-go/writer_statistics.go index dd5aa07e45..72bcd3f44b 100644 --- a/vendor/github.com/parquet-go/parquet-go/writer_statistics.go +++ b/vendor/github.com/parquet-go/parquet-go/writer_statistics.go @@ -29,10 +29,7 @@ func appendPageLevelHistogram(histograms []int64, levels []byte, maxLevel byte) histSize := int(maxLevel) + 1 startIndex := len(histograms) histograms = slices.Grow(histograms, histSize)[:startIndex+histSize] - - for i := range histSize { - histograms[startIndex+i] = 0 - } + clear(histograms[startIndex : startIndex+histSize]) for _, level := range levels { histograms[startIndex+int(level)]++ @@ -40,3 +37,25 @@ func appendPageLevelHistogram(histograms []int64, levels []byte, maxLevel byte) return histograms } + +// accumulateAndAppendPageLevelHistogram combines accumulateLevelHistogram and +// appendPageLevelHistogram into a single pass through the levels array. +// It updates both the column-level histogram and creates a per-page histogram. +func accumulateAndAppendPageLevelHistogram( + columnHistogram []int64, + pageHistograms []int64, + levels []byte, + maxLevel byte, +) []int64 { + histSize := int(maxLevel) + 1 + startIndex := len(pageHistograms) + pageHistograms = slices.Grow(pageHistograms, histSize)[:startIndex+histSize] + clear(pageHistograms[startIndex : startIndex+histSize]) + + for _, level := range levels { + columnHistogram[level]++ + pageHistograms[startIndex+int(level)]++ + } + + return pageHistograms +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 4c34179556..0153cb6efd 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1691,7 +1691,7 @@ github.com/parquet-go/bitpack/unsafecast # github.com/parquet-go/jsonlite v1.0.0 ## explicit; go 1.23 github.com/parquet-go/jsonlite -# github.com/parquet-go/parquet-go v0.27.0 +# github.com/parquet-go/parquet-go v0.28.0 ## explicit; go 1.24.9 github.com/parquet-go/parquet-go github.com/parquet-go/parquet-go/bloom