mirror of
https://github.com/grafana/loki.git
synced 2026-03-13 09:33:58 +08:00
199 lines
5.2 KiB
Go
199 lines
5.2 KiB
Go
package columnar
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/grafana/loki/v3/pkg/memory"
|
|
)
|
|
|
|
// Concat concatenates the input arrays into a single, combined array using the
|
|
// provided allocator. Concat returns an error if not all arrays are of the same
|
|
// kind.
|
|
func Concat(alloc *memory.Allocator, in []Array) (Array, error) {
|
|
if len(in) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
kind := in[0].Kind()
|
|
for _, arr := range in[1:] {
|
|
if want, got := kind, arr.Kind(); want != got {
|
|
return nil, fmt.Errorf("concat expected array of %s, but got %s", want, got)
|
|
}
|
|
}
|
|
|
|
switch kind {
|
|
case KindNull:
|
|
return concatNull(alloc, in)
|
|
case KindBool:
|
|
return concatBool(alloc, in)
|
|
case KindInt64:
|
|
return concatNumber[int64](alloc, in)
|
|
case KindUint64:
|
|
return concatNumber[uint64](alloc, in)
|
|
case KindUTF8:
|
|
return concatUTF8(alloc, in)
|
|
default:
|
|
return nil, fmt.Errorf("unsupported array kind %s", kind)
|
|
}
|
|
}
|
|
|
|
func concatNull(alloc *memory.Allocator, in []Array) (Array, error) {
|
|
totalLen := getTotalLen(in)
|
|
validity := memory.NewBitmap(alloc, totalLen)
|
|
validity.AppendCount(false, totalLen)
|
|
return NewNull(validity), nil
|
|
}
|
|
|
|
func getTotalLen(in []Array) int {
|
|
var total int
|
|
for _, arr := range in {
|
|
total += arr.Len()
|
|
}
|
|
return total
|
|
}
|
|
|
|
func concatBool(alloc *memory.Allocator, in []Array) (Array, error) {
|
|
totalLen := getTotalLen(in)
|
|
|
|
var (
|
|
validity = memory.NewBitmap(alloc, totalLen)
|
|
values = memory.NewBitmap(alloc, totalLen)
|
|
)
|
|
|
|
for _, arr := range in {
|
|
if arr.Len() == 0 {
|
|
continue
|
|
}
|
|
|
|
appendValidityBitmap(&validity, arr)
|
|
values.AppendBitmap(arr.(*Bool).Values())
|
|
}
|
|
|
|
cleanValidity(&validity)
|
|
return NewBool(values, validity), nil
|
|
}
|
|
|
|
// appendValidityBitmap appends the validity bitmap of srcArray into dst. If
|
|
// srcArray lacks a validity bitmap (because it has no nulls), srcArray.Len()
|
|
// true values are appended into dst.
|
|
func appendValidityBitmap(dst *memory.Bitmap, srcArray Array) {
|
|
srcValidity := srcArray.Validity()
|
|
if srcValidity.Len() == 0 {
|
|
// The validity bitmap does not exist; append all true values up to the
|
|
// size of the source array.
|
|
dst.AppendCount(true, srcArray.Len())
|
|
}
|
|
dst.AppendBitmap(srcValidity)
|
|
}
|
|
|
|
func cleanValidity(validity *memory.Bitmap) {
|
|
if validity.ClearCount() == 0 {
|
|
// Drop the bitmap; it has no nulls.
|
|
*validity = memory.Bitmap{}
|
|
}
|
|
}
|
|
|
|
func concatNumber[T Numeric](alloc *memory.Allocator, in []Array) (Array, error) {
|
|
totalLen := getTotalLen(in)
|
|
|
|
var (
|
|
validity = memory.NewBitmap(alloc, totalLen)
|
|
values = memory.NewBuffer[T](alloc, totalLen)
|
|
)
|
|
|
|
for _, arr := range in {
|
|
if arr.Len() == 0 {
|
|
continue
|
|
}
|
|
|
|
appendValidityBitmap(&validity, arr)
|
|
values.Append(arr.(*Number[T]).Values()...)
|
|
}
|
|
|
|
cleanValidity(&validity)
|
|
return NewNumber[T](values.Data(), validity), nil
|
|
}
|
|
|
|
// concatUTF8 concatenates UTF8 arrays, normalizing offsets to be zero-based.
|
|
func concatUTF8(alloc *memory.Allocator, in []Array) (Array, error) {
|
|
totalLen := getTotalLen(in)
|
|
|
|
var totalDataLen int
|
|
for _, arr := range in {
|
|
// NOTE(rfratto): This is more accurate than len(arr.(*UTF8).Data()),
|
|
// which doesn't account arr being a slice.
|
|
totalDataLen += arr.(*UTF8).DataLen()
|
|
}
|
|
|
|
var (
|
|
validity = memory.NewBitmap(alloc, totalLen)
|
|
offsetsBuf = memory.NewBuffer[int32](alloc, totalLen+1) // +1 for first offset
|
|
data = memory.NewBuffer[byte](alloc, totalDataLen)
|
|
)
|
|
|
|
offsetsBuf.Resize(totalLen + 1)
|
|
|
|
var lastOffset int32
|
|
var numOffsets int
|
|
|
|
offsets := offsetsBuf.Data()
|
|
offsets[numOffsets] = 0
|
|
numOffsets++
|
|
|
|
for _, arr := range in {
|
|
if arr.Len() == 0 {
|
|
continue
|
|
}
|
|
|
|
appendValidityBitmap(&validity, arr)
|
|
|
|
var (
|
|
utf8Array = arr.(*UTF8)
|
|
fullArrData = utf8Array.Data()
|
|
arrOffsets = utf8Array.Offsets()
|
|
|
|
// arrData holds the subset of data used by arrOffsets.
|
|
//
|
|
// This is smaller than fullArrData when arr is a slice, where
|
|
// arrOffsets points to a subset of data.
|
|
//
|
|
// This reduces the amount of data we write, but is also needed for
|
|
// correctness, as we need everything in the final data buffer to be
|
|
// contiguous with no gaps.
|
|
arrData = fullArrData[arrOffsets[0]:arrOffsets[len(arrOffsets)-1]]
|
|
)
|
|
|
|
// We're going to normalize offsets and data as we write, so we need to
|
|
// shrink arrData to what's actually used.
|
|
data.Append(arrData...)
|
|
|
|
// We don't want to append the first offset from an array; it's always
|
|
// the last offset we already added. Appending the first offset would
|
|
// cause an incorrect total offset count.
|
|
//
|
|
// We do need to at least temporarily store the first offset to
|
|
// determine relative offsets for the concatenated offsets, though.
|
|
arrStartOffset := arrOffsets[0]
|
|
|
|
for _, off := range arrOffsets[1:] {
|
|
// Determine relative offset to start, then shift by the
|
|
// accumulative offset.
|
|
actualOff := lastOffset + (off - arrStartOffset)
|
|
|
|
// NOTE(rfratto): There's a 4% increase in read speed by manually
|
|
// modifying the slice rather than appending via the buffer.
|
|
//
|
|
// TODO(rfratto): Consider updating memory.Buffer to make appends
|
|
// faster to avoid needing to do this hack all over the hot path (like
|
|
// dataset page decoders).
|
|
offsets[numOffsets] = actualOff
|
|
numOffsets++
|
|
}
|
|
|
|
lastOffset = offsets[numOffsets-1]
|
|
}
|
|
|
|
cleanValidity(&validity)
|
|
return NewUTF8(data.Data(), offsets, validity), nil
|
|
}
|