Merge pull request #21797 from mtrmac/sparse0

Reformulate sparseWriter
This commit is contained in:
openshift-merge-bot[bot]
2024-02-25 14:36:08 +00:00
committed by GitHub
2 changed files with 129 additions and 106 deletions

View File

@ -1,19 +1,12 @@
package compression package compression
import ( import (
"bytes"
"errors" "errors"
"fmt"
"io" "io"
) )
type state int const zerosThreshold = 1024
const (
zerosThreshold = 1024
stateData = iota
stateZeros
)
type WriteSeekCloser interface { type WriteSeekCloser interface {
io.Closer io.Closer
@ -21,108 +14,120 @@ type WriteSeekCloser interface {
} }
type sparseWriter struct { type sparseWriter struct {
state state
file WriteSeekCloser file WriteSeekCloser
zeros int64 // Invariant between method calls:
lastIsZero bool // The contents of the file match the contents passed to Write, except that pendingZeroes trailing zeroes have not been written.
// Also, the data that _has_ been written does not end with a zero byte (i.e. pendingZeroes is the largest possible value.
pendingZeroes int64
} }
func NewSparseWriter(file WriteSeekCloser) *sparseWriter { func NewSparseWriter(file WriteSeekCloser) *sparseWriter {
return &sparseWriter{ return &sparseWriter{
file: file, file: file,
state: stateData, pendingZeroes: 0,
zeros: 0,
lastIsZero: false,
} }
} }
func (sw *sparseWriter) createHole() error { func (sw *sparseWriter) createHole(size int64) error {
zeros := sw.zeros _, err := sw.file.Seek(size, io.SeekCurrent)
if zeros == 0 {
return nil
}
sw.zeros = 0
sw.lastIsZero = true
_, err := sw.file.Seek(zeros, io.SeekCurrent)
return err return err
} }
func findFirstNotZero(b []byte) int { func zeroSpanEnd(b []byte, i int) int {
for i, v := range b { for i < len(b) && b[i] == 0 {
if v != 0 { i++
}
return i return i
} }
func nonzeroSpanEnd(b []byte, i int) int {
for i < len(b) && b[i] != 0 {
i++
} }
return -1 return i
} }
// Write writes data to the file, creating holes for long sequences of zeros. // Write writes data to the file, creating holes for long sequences of zeros.
func (sw *sparseWriter) Write(data []byte) (int, error) { func (sw *sparseWriter) Write(data []byte) (int, error) {
written, current := 0, 0 initialZeroSpanLength := zeroSpanEnd(data, 0)
totalLen := len(data) if initialZeroSpanLength == len(data) {
for current < len(data) { sw.pendingZeroes += int64(initialZeroSpanLength)
switch sw.state { return initialZeroSpanLength, nil
case stateData:
nextZero := bytes.IndexByte(data[current:], 0)
if nextZero < 0 {
_, err := sw.file.Write(data[written:])
sw.lastIsZero = false
return totalLen, err
} else {
current += nextZero
sw.state = stateZeros
} }
case stateZeros:
nextNonZero := findFirstNotZero(data[current:]) // We have _some_ non-zero data to write.
if nextNonZero < 0 { // Think of the input as an alternating sequence of spans of zeroes / non-zeroes 0a0b…c0,
// finish with a zero, flush any data and keep track of the zeros // where the starting/ending span of zeroes may be empty.
if written != current {
if _, err := sw.file.Write(data[written:current]); err != nil { pendingWriteOffset := 0
// The expected condition for creating a hole would be sw.pendingZeroes + initialZeroSpanLength >= zerosThreshold; but
// if sw.pendingZeroes != 0, we are going to spend a syscall to deal with sw.pendingZeroes either way.
// We might just as well make it a createHole(), even if the hole size is below zeroThreshold.
if sw.pendingZeroes != 0 || initialZeroSpanLength >= zerosThreshold {
if err := sw.createHole(sw.pendingZeroes + int64(initialZeroSpanLength)); err != nil {
return -1, err return -1, err
} }
sw.lastIsZero = false // We could set sw.pendingZeroes = 0 now; it would always be overwritten on successful return from this function.
pendingWriteOffset = initialZeroSpanLength
} }
sw.zeros += int64(len(data) - current)
return totalLen, nil current := initialZeroSpanLength
for {
// Invariant at this point of this loop:
// - pendingWriteOffset <= current < len(data)
// - data[current] != 0
// - data[pendingWriteOffset:current] has not yet been written
if pendingWriteOffset > current || current >= len(data) {
return -1, fmt.Errorf("internal error: sparseWriter invariant violation: %d <= %d < %d", pendingWriteOffset, current, len(data))
} }
// do not bother with too short sequences if b := data[current]; b == 0 {
if sw.zeros == 0 && nextNonZero < zerosThreshold { return -1, fmt.Errorf("internal error: sparseWriter invariant violation: %d@%d", b, current)
sw.state = stateData }
current += nextNonZero
nonzeroSpanEnd := nonzeroSpanEnd(data, current)
if nonzeroSpanEnd == current {
return -1, fmt.Errorf("internal error: sparseWriters nonzeroSpanEnd didnt advance")
}
zeroSpanEnd := zeroSpanEnd(data, nonzeroSpanEnd) // possibly == nonzeroSpanEnd
zeroSpanLength := zeroSpanEnd - nonzeroSpanEnd
if zeroSpanEnd < len(data) && zeroSpanLength < zerosThreshold {
// Too small a hole, keep going
current = zeroSpanEnd
continue continue
} }
if written != current {
if _, err := sw.file.Write(data[written:current]); err != nil { // We have either reached the end, or found an interesting hole. Issue a write.
if _, err := sw.file.Write(data[pendingWriteOffset:nonzeroSpanEnd]); err != nil {
return -1, err return -1, err
} }
sw.lastIsZero = false if zeroSpanEnd == len(data) {
sw.pendingZeroes = int64(zeroSpanLength)
return zeroSpanEnd, nil
} }
sw.zeros += int64(nextNonZero)
current += nextNonZero if err := sw.createHole(int64(zeroSpanLength)); err != nil {
if err := sw.createHole(); err != nil {
return -1, err return -1, err
} }
written = current pendingWriteOffset = zeroSpanEnd
current = zeroSpanEnd
} }
} }
return totalLen, nil
}
// Close closes the SparseWriter's underlying file. // Close closes the SparseWriter's underlying file.
func (sw *sparseWriter) Close() error { func (sw *sparseWriter) Close() error {
if sw.file == nil { if sw.file == nil {
return errors.New("file is already closed") return errors.New("file is already closed")
} }
if err := sw.createHole(); err != nil { if sw.pendingZeroes != 0 {
if holeSize := sw.pendingZeroes - 1; holeSize >= zerosThreshold {
if err := sw.createHole(holeSize); err != nil {
sw.file.Close() sw.file.Close()
return err return err
} }
if sw.lastIsZero { sw.pendingZeroes -= holeSize
if _, err := sw.file.Seek(-1, io.SeekCurrent); err != nil {
sw.file.Close()
return err
} }
if _, err := sw.file.Write([]byte{0}); err != nil { var zeroArray [zerosThreshold]byte
if _, err := sw.file.Write(zeroArray[:sw.pendingZeroes]); err != nil {
sw.file.Close() sw.file.Close()
return err return err
} }

View File

@ -3,22 +3,32 @@ package compression
import ( import (
"bytes" "bytes"
"errors" "errors"
"fmt"
"io" "io"
"testing" "testing"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
) )
type memorySparseFile struct { type memorySparseFile struct {
buffer bytes.Buffer buffer bytes.Buffer
pos int64 pos int64
sparse int64
} }
func (m *memorySparseFile) Seek(offset int64, whence int) (int64, error) { func (m *memorySparseFile) Seek(offset int64, whence int) (int64, error) {
logrus.Debugf("Seek %d %d", offset, whence)
var newPos int64 var newPos int64
switch whence { switch whence {
case io.SeekStart: case io.SeekStart:
newPos = offset panic("unexpected")
case io.SeekCurrent: case io.SeekCurrent:
newPos = m.pos + offset newPos = m.pos + offset
if offset < -1 {
panic("unexpected")
}
m.sparse += offset
case io.SeekEnd: case io.SeekEnd:
newPos = int64(m.buffer.Len()) + offset newPos = int64(m.buffer.Len()) + offset
default: default:
@ -34,6 +44,7 @@ func (m *memorySparseFile) Seek(offset int64, whence int) (int64, error) {
} }
func (m *memorySparseFile) Write(b []byte) (n int, err error) { func (m *memorySparseFile) Write(b []byte) (n int, err error) {
logrus.Debugf("Write %d", len(b))
if int64(m.buffer.Len()) < m.pos { if int64(m.buffer.Len()) < m.pos {
padding := make([]byte, m.pos-int64(m.buffer.Len())) padding := make([]byte, m.pos-int64(m.buffer.Len()))
_, err := m.buffer.Write(padding) _, err := m.buffer.Write(padding)
@ -42,8 +53,6 @@ func (m *memorySparseFile) Write(b []byte) (n int, err error) {
} }
} }
m.buffer.Next(int(m.pos) - m.buffer.Len())
n, err = m.buffer.Write(b) n, err = m.buffer.Write(b)
m.pos += int64(n) m.pos += int64(n)
return n, err return n, err
@ -53,7 +62,7 @@ func (m *memorySparseFile) Close() error {
return nil return nil
} }
func testInputWithWriteLen(t *testing.T, input []byte, chunkSize int) { func testInputWithWriteLen(t *testing.T, input []byte, minSparse int64, chunkSize int) {
m := &memorySparseFile{} m := &memorySparseFile{}
sparseWriter := NewSparseWriter(m) sparseWriter := NewSparseWriter(m)
@ -71,15 +80,16 @@ func testInputWithWriteLen(t *testing.T, input []byte, chunkSize int) {
if err != nil { if err != nil {
t.Fatalf("Expected no error, got %v", err) t.Fatalf("Expected no error, got %v", err)
} }
if !bytes.Equal(input, m.buffer.Bytes()) { assert.Equal(t, string(input), m.buffer.String())
t.Fatalf("Incorrect output") assert.GreaterOrEqual(t, m.sparse, minSparse)
}
} }
func testInput(t *testing.T, inputBytes []byte) { func testInput(t *testing.T, name string, inputBytes []byte, minSparse int64) {
currentLen := 1 currentLen := 1
for { for {
testInputWithWriteLen(t, inputBytes, currentLen) t.Run(fmt.Sprintf("%s@%d", name, currentLen), func(t *testing.T) {
testInputWithWriteLen(t, inputBytes, minSparse, currentLen)
})
currentLen <<= 1 currentLen <<= 1
if currentLen > len(inputBytes) { if currentLen > len(inputBytes) {
break break
@ -88,22 +98,30 @@ func testInput(t *testing.T, inputBytes []byte) {
} }
func TestSparseWriter(t *testing.T) { func TestSparseWriter(t *testing.T) {
testInput(t, []byte("hello")) testInput(t, "small contents", []byte("hello"), 0)
testInput(t, append(make([]byte, 100), []byte("hello")...)) testInput(t, "small zeroes", append(make([]byte, 100), []byte("hello")...), 0)
testInput(t, []byte("")) testInput(t, "empty", []byte(""), 0)
testInput(t, "small iterated", []byte{'a', 0, 'a', 0, 'a', 0}, 0)
testInput(t, "small iterated2", []byte{0, 'a', 0, 'a', 0, 'a'}, 0)
// add "hello" at the beginning // add "hello" at the beginning
largeInput := make([]byte, 1024*1024) const largeSize = 1024 * 1024
largeInput := make([]byte, largeSize)
copy(largeInput, []byte("hello")) copy(largeInput, []byte("hello"))
testInput(t, largeInput) testInput(t, "sparse end", largeInput, largeSize-5-1) // -1 for the final byte establishing file size
// add "hello" at the end // add "hello" at the end
largeInput = make([]byte, 1024*1024) largeInput = make([]byte, largeSize)
copy(largeInput[1024*1024-5:], []byte("hello")) copy(largeInput[largeSize-5:], []byte("hello"))
testInput(t, largeInput) testInput(t, "sparse beginning", largeInput, largeSize-5)
// add "hello" in the middle // add "hello" in the middle
largeInput = make([]byte, 1024*1024) largeInput = make([]byte, largeSize)
copy(largeInput[len(largeInput)/2:], []byte("hello")) copy(largeInput[len(largeInput)/2:], []byte("hello"))
testInput(t, largeInput) testInput(t, "sparse both ends", largeInput, largeSize-5-1) // -1 for the final byte establishing file size
largeInput = make([]byte, largeSize)
copy(largeInput[0:5], []byte("hello"))
copy(largeInput[largeSize-5:], []byte("HELLO"))
testInput(t, "sparse middle", largeInput, largeSize-10)
} }