diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 62a31855b..544fa32d2 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -278,6 +278,10 @@ "ImportPath": "github.com/texttheater/golang-levenshtein/levenshtein", "Rev": "dfd657628c58d3eeaa26391097853b2473c8b94e" }, + { + "ImportPath": "github.com/whyrusleeping/chunker", + "Rev": "537e901819164627ca4bb5ce4e3faa8ce7956564" + }, { "ImportPath": "github.com/whyrusleeping/go-metrics", "Rev": "1cd8009604ec2238b5a71305a0ecd974066e0e16" diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/chunker/.travis.yml b/Godeps/_workspace/src/github.com/whyrusleeping/chunker/.travis.yml new file mode 100644 index 000000000..01ccb409e --- /dev/null +++ b/Godeps/_workspace/src/github.com/whyrusleeping/chunker/.travis.yml @@ -0,0 +1,10 @@ +language: go +sudo: false + +go: + - 1.3.3 + - 1.4.2 + +os: + - linux + - osx diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/chunker/LICENSE b/Godeps/_workspace/src/github.com/whyrusleeping/chunker/LICENSE new file mode 100644 index 000000000..04f854350 --- /dev/null +++ b/Godeps/_workspace/src/github.com/whyrusleeping/chunker/LICENSE @@ -0,0 +1,23 @@ +Copyright (c) 2014, Alexander Neumann +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/chunker/README.md b/Godeps/_workspace/src/github.com/whyrusleeping/chunker/README.md new file mode 100644 index 000000000..a4e33a381 --- /dev/null +++ b/Godeps/_workspace/src/github.com/whyrusleeping/chunker/README.md @@ -0,0 +1,7 @@ +[![Build Status](https://travis-ci.org/restic/chunker.svg?branch=master)](https://travis-ci.org/restic/chunker) + +Content Defined Chunking (CDC) based on a rolling Rabin Checksum. + +Part of https://github.com/restic/restic. + +Better README will follow soon. diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/chunker/chunker.go b/Godeps/_workspace/src/github.com/whyrusleeping/chunker/chunker.go new file mode 100644 index 000000000..0cc8dfd69 --- /dev/null +++ b/Godeps/_workspace/src/github.com/whyrusleeping/chunker/chunker.go @@ -0,0 +1,370 @@ +package chunker + +import ( + "errors" + "hash" + "io" + "math" + "sync" +) + +const ( + KiB = 1024 + MiB = 1024 * KiB + + // WindowSize is the size of the sliding window. 
+ windowSize = 16 + + chunkerBufSize = 512 * KiB +) + +var bufPool = sync.Pool{ + New: func() interface{} { return make([]byte, chunkerBufSize) }, +} + +type tables struct { + out [256]Pol + mod [256]Pol +} + +// cache precomputed tables, these are read-only anyway +var cache struct { + entries map[Pol]*tables + sync.Mutex +} + +func init() { + cache.entries = make(map[Pol]*tables) +} + +// Chunk is one content-dependent chunk of bytes whose end was cut when the +// Rabin Fingerprint had the value stored in Cut. +type Chunk struct { + Start uint64 + Length uint64 + Cut uint64 + Digest []byte + Data []byte +} + +func (c Chunk) Reader(r io.ReaderAt) io.Reader { + return io.NewSectionReader(r, int64(c.Start), int64(c.Length)) +} + +// Chunker splits content with Rabin Fingerprints. +type Chunker struct { + pol Pol + polShift uint64 + tables *tables + + rd io.Reader + closed bool + + chunkbuf []byte + + window [windowSize]byte + wpos int + + buf []byte + bpos uint64 + bmax uint64 + + start uint64 + count uint64 + pos uint64 + + pre uint64 // wait for this many bytes before start calculating an new chunk + + digest uint64 + h hash.Hash + + sizeMask uint64 + + // minimal and maximal size of the outputted blocks + MinSize uint64 + MaxSize uint64 +} + +// New returns a new Chunker based on polynomial p that reads from rd +// with bufsize and pass all data to hash along the way. +func New(rd io.Reader, pol Pol, h hash.Hash, avSize, min, max uint64) *Chunker { + + sizepow := uint(math.Log2(float64(avSize))) + + c := &Chunker{ + buf: bufPool.Get().([]byte), + h: h, + pol: pol, + rd: rd, + chunkbuf: make([]byte, 0, max), + sizeMask: (1 << sizepow) - 1, + + MinSize: min, + MaxSize: max, + } + + c.reset() + + return c +} + +func (c *Chunker) reset() { + c.polShift = uint64(c.pol.Deg() - 8) + c.fillTables() + + for i := 0; i < windowSize; i++ { + c.window[i] = 0 + } + + c.closed = false + c.digest = 0 + c.wpos = 0 + c.count = 0 + c.slide(1) + c.start = c.pos + + if c.h != nil { + c.h.Reset() + } + + // do not start a new chunk unless at least MinSize bytes have been read + c.pre = c.MinSize - windowSize +} + +// Calculate out_table and mod_table for optimization. Must be called only +// once. This implementation uses a cache in the global variable cache. +func (c *Chunker) fillTables() { + // if polynomial hasn't been specified, do not compute anything for now + if c.pol == 0 { + return + } + + // test if the tables are cached for this polynomial + cache.Lock() + defer cache.Unlock() + if t, ok := cache.entries[c.pol]; ok { + c.tables = t + return + } + + // else create a new entry + c.tables = &tables{} + cache.entries[c.pol] = c.tables + + // calculate table for sliding out bytes. The byte to slide out is used as + // the index for the table, the value contains the following: + // out_table[b] = Hash(b || 0 || ... || 0) + // \ windowsize-1 zero bytes / + // To slide out byte b_0 for window size w with known hash + // H := H(b_0 || ... || b_w), it is sufficient to add out_table[b_0]: + // H(b_0 || ... || b_w) + H(b_0 || 0 || ... || 0) + // = H(b_0 + b_0 || b_1 + 0 || ... || b_w + 0) + // = H( 0 || b_1 || ... || b_w) + // + // Afterwards a new byte can be shifted in. 
+ for b := 0; b < 256; b++ { + var h Pol + + h = appendByte(h, byte(b), c.pol) + for i := 0; i < windowSize-1; i++ { + h = appendByte(h, 0, c.pol) + } + c.tables.out[b] = h + } + + // calculate table for reduction mod Polynomial + k := c.pol.Deg() + for b := 0; b < 256; b++ { + // mod_table[b] = A | B, where A = (b(x) * x^k mod pol) and B = b(x) * x^k + // + // The 8 bits above deg(Polynomial) determine what happens next and so + // these bits are used as a lookup to this table. The value is split in + // two parts: Part A contains the result of the modulus operation, part + // B is used to cancel out the 8 top bits so that one XOR operation is + // enough to reduce modulo Polynomial + c.tables.mod[b] = Pol(uint64(b)<= c.bmax { + n, err := io.ReadFull(c.rd, c.buf[:]) + c.chunkbuf = append(c.chunkbuf, c.buf[:n]...) + + if err == io.ErrUnexpectedEOF { + err = nil + } + + // io.ReadFull only returns io.EOF when no bytes could be read. If + // this is the case and we're in this branch, there are no more + // bytes to buffer, so this was the last chunk. If a different + // error has occurred, return that error and abandon the current + // chunk. + if err == io.EOF && !c.closed { + c.closed = true + + // return the buffer to the pool + bufPool.Put(c.buf) + + data := c.nextBytes() + + // return current chunk, if any bytes have been processed + if c.count > 0 { + return &Chunk{ + Start: c.start, + Length: c.count, + Cut: c.digest, + Digest: c.hashDigest(), + Data: data, + }, nil + } + } + + if err != nil { + return nil, err + } + + c.bpos = 0 + c.bmax = uint64(n) + } + + // check if bytes have to be dismissed before starting a new chunk + if c.pre > 0 { + n := c.bmax - c.bpos + if c.pre > uint64(n) { + c.pre -= uint64(n) + c.updateHash(c.buf[c.bpos:c.bmax]) + + c.count += uint64(n) + c.pos += uint64(n) + c.bpos = c.bmax + + continue + } + + c.updateHash(c.buf[c.bpos : c.bpos+c.pre]) + + c.bpos += c.pre + c.count += c.pre + c.pos += c.pre + c.pre = 0 + } + + add := c.count + for _, b := range c.buf[c.bpos:c.bmax] { + // inline c.slide(b) and append(b) to increase performance + out := c.window[c.wpos] + c.window[c.wpos] = b + c.digest ^= uint64(c.tables.out[out]) + c.wpos = (c.wpos + 1) % windowSize + + // c.append(b) + index := c.digest >> c.polShift + c.digest <<= 8 + c.digest |= uint64(b) + + c.digest ^= uint64(c.tables.mod[index]) + // end inline + + add++ + if add < c.MinSize { + continue + } + + if (c.digest&c.sizeMask) == 0 || add >= c.MaxSize { + i := add - c.count - 1 + c.updateHash(c.buf[c.bpos : c.bpos+uint64(i)+1]) + c.count = add + c.pos += uint64(i) + 1 + c.bpos += uint64(i) + 1 + + data := c.nextBytes() + + chunk := &Chunk{ + Start: c.start, + Length: c.count, + Cut: c.digest, + Digest: c.hashDigest(), + Data: data, + } + + c.reset() + + return chunk, nil + } + } + + steps := c.bmax - c.bpos + if steps > 0 { + c.updateHash(c.buf[c.bpos : c.bpos+steps]) + } + c.count += steps + c.pos += steps + c.bpos = c.bmax + } +} + +func dupBytes(b []byte) []byte { + out := make([]byte, len(b)) + copy(out, b) + return out +} + +func (c *Chunker) updateHash(data []byte) { + if c.h != nil { + // the hashes from crypto/sha* do not return an error + _, err := c.h.Write(data) + if err != nil { + panic(err) + } + } +} + +func (c *Chunker) hashDigest() []byte { + if c.h == nil { + return nil + } + + return c.h.Sum(nil) +} + +func (c *Chunker) append(b byte) { + index := c.digest >> c.polShift + c.digest <<= 8 + c.digest |= uint64(b) + + c.digest ^= uint64(c.tables.mod[index]) +} + +func (c *Chunker) 
slide(b byte) { + out := c.window[c.wpos] + c.window[c.wpos] = b + c.digest ^= uint64(c.tables.out[out]) + c.wpos = (c.wpos + 1) % windowSize + + c.append(b) +} + +func appendByte(hash Pol, b byte, pol Pol) Pol { + hash <<= 8 + hash |= Pol(b) + + return hash.Mod(pol) +} diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/chunker/chunker_test.go b/Godeps/_workspace/src/github.com/whyrusleeping/chunker/chunker_test.go new file mode 100644 index 000000000..9fa54f233 --- /dev/null +++ b/Godeps/_workspace/src/github.com/whyrusleeping/chunker/chunker_test.go @@ -0,0 +1,298 @@ +package chunker_test + +import ( + "bytes" + "crypto/md5" + "crypto/sha256" + "encoding/hex" + "hash" + "io" + "io/ioutil" + "math/rand" + "testing" + "time" + + "github.com/restic/chunker" + . "github.com/restic/restic/test" +) + +func parseDigest(s string) []byte { + d, err := hex.DecodeString(s) + if err != nil { + panic(err) + } + + return d +} + +type chunk struct { + Length uint + CutFP uint64 + Digest []byte +} + +// polynomial used for all the tests below +const testPol = chunker.Pol(0x3DA3358B4DC173) + +// created for 32MB of random data out of math/rand's Uint32() seeded by +// constant 23 +// +// chunking configuration: +// window size 64, avg chunksize 1<<20, min chunksize 1<<19, max chunksize 1<<23 +// polynom 0x3DA3358B4DC173 +var chunks1 = []chunk{ + chunk{2163460, 0x000b98d4cdf00000, parseDigest("4b94cb2cf293855ea43bf766731c74969b91aa6bf3c078719aabdd19860d590d")}, + chunk{643703, 0x000d4e8364d00000, parseDigest("5727a63c0964f365ab8ed2ccf604912f2ea7be29759a2b53ede4d6841e397407")}, + chunk{1528956, 0x0015a25c2ef00000, parseDigest("a73759636a1e7a2758767791c69e81b69fb49236c6929e5d1b654e06e37674ba")}, + chunk{1955808, 0x00102a8242e00000, parseDigest("c955fb059409b25f07e5ae09defbbc2aadf117c97a3724e06ad4abd2787e6824")}, + chunk{2222372, 0x00045da878000000, parseDigest("6ba5e9f7e1b310722be3627716cf469be941f7f3e39a4c3bcefea492ec31ee56")}, + chunk{2538687, 0x00198a8179900000, parseDigest("8687937412f654b5cfe4a82b08f28393a0c040f77c6f95e26742c2fc4254bfde")}, + chunk{609606, 0x001d4e8d17100000, parseDigest("5da820742ff5feb3369112938d3095785487456f65a8efc4b96dac4be7ebb259")}, + chunk{1205738, 0x000a7204dd600000, parseDigest("cc70d8fad5472beb031b1aca356bcab86c7368f40faa24fe5f8922c6c268c299")}, + chunk{959742, 0x00183e71e1400000, parseDigest("4065bdd778f95676c92b38ac265d361f81bff17d76e5d9452cf985a2ea5a4e39")}, + chunk{4036109, 0x001fec043c700000, parseDigest("b9cf166e75200eb4993fc9b6e22300a6790c75e6b0fc8f3f29b68a752d42f275")}, + chunk{1525894, 0x000b1574b1500000, parseDigest("2f238180e4ca1f7520a05f3d6059233926341090f9236ce677690c1823eccab3")}, + chunk{1352720, 0x00018965f2e00000, parseDigest("afd12f13286a3901430de816e62b85cc62468c059295ce5888b76b3af9028d84")}, + chunk{811884, 0x00155628aa100000, parseDigest("42d0cdb1ee7c48e552705d18e061abb70ae7957027db8ae8db37ec756472a70a")}, + chunk{1282314, 0x001909a0a1400000, parseDigest("819721c2457426eb4f4c7565050c44c32076a56fa9b4515a1c7796441730eb58")}, + chunk{1318021, 0x001cceb980000000, parseDigest("842eb53543db55bacac5e25cb91e43cc2e310fe5f9acc1aee86bdf5e91389374")}, + chunk{948640, 0x0011f7a470a00000, parseDigest("b8e36bf7019bb96ac3fb7867659d2167d9d3b3148c09fe0de45850b8fe577185")}, + chunk{645464, 0x00030ce2d9400000, parseDigest("5584bd27982191c3329f01ed846bfd266e96548dfa87018f745c33cfc240211d")}, + chunk{533758, 0x0004435c53c00000, parseDigest("4da778a25b72a9a0d53529eccfe2e5865a789116cb1800f470d8df685a8ab05d")}, + chunk{1128303, 0x0000c48517800000, 
parseDigest("08c6b0b38095b348d80300f0be4c5184d2744a17147c2cba5cc4315abf4c048f")}, + chunk{800374, 0x000968473f900000, parseDigest("820284d2c8fd243429674c996d8eb8d3450cbc32421f43113e980f516282c7bf")}, + chunk{2453512, 0x001e197c92600000, parseDigest("5fa870ed107c67704258e5e50abe67509fb73562caf77caa843b5f243425d853")}, + chunk{2651975, 0x000ae6c868000000, parseDigest("181347d2bbec32bef77ad5e9001e6af80f6abcf3576549384d334ee00c1988d8")}, + chunk{237392, 0x0000000000000001, parseDigest("fcd567f5d866357a8e299fd5b2359bb2c8157c30395229c4e9b0a353944a7978")}, +} + +// test if nullbytes are correctly split, even if length is a multiple of MinSize. +var chunks2 = []chunk{ + chunk{chunker.MinSize, 0, parseDigest("07854d2fef297a06ba81685e660c332de36d5d18d546927d30daad6d7fda1541")}, + chunk{chunker.MinSize, 0, parseDigest("07854d2fef297a06ba81685e660c332de36d5d18d546927d30daad6d7fda1541")}, + chunk{chunker.MinSize, 0, parseDigest("07854d2fef297a06ba81685e660c332de36d5d18d546927d30daad6d7fda1541")}, + chunk{chunker.MinSize, 0, parseDigest("07854d2fef297a06ba81685e660c332de36d5d18d546927d30daad6d7fda1541")}, +} + +func testWithData(t *testing.T, chnker *chunker.Chunker, testChunks []chunk) []*chunker.Chunk { + chunks := []*chunker.Chunk{} + + pos := uint(0) + for i, chunk := range testChunks { + c, err := chnker.Next() + + if err != nil { + t.Fatalf("Error returned with chunk %d: %v", i, err) + } + + if c == nil { + t.Fatalf("Nil chunk returned") + } + + if c != nil { + if c.Start != pos { + t.Fatalf("Start for chunk %d does not match: expected %d, got %d", + i, pos, c.Start) + } + + if c.Length != chunk.Length { + t.Fatalf("Length for chunk %d does not match: expected %d, got %d", + i, chunk.Length, c.Length) + } + + if c.Cut != chunk.CutFP { + t.Fatalf("Cut fingerprint for chunk %d/%d does not match: expected %016x, got %016x", + i, len(chunks)-1, chunk.CutFP, c.Cut) + } + + if c.Digest != nil && !bytes.Equal(c.Digest, chunk.Digest) { + t.Fatalf("Digest fingerprint for chunk %d/%d does not match: expected %02x, got %02x", + i, len(chunks)-1, chunk.Digest, c.Digest) + } + + pos += c.Length + chunks = append(chunks, c) + } + } + + c, err := chnker.Next() + + if c != nil { + t.Fatal("additional non-nil chunk returned") + } + + if err != io.EOF { + t.Fatal("wrong error returned after last chunk") + } + + return chunks +} + +func getRandom(seed, count int) []byte { + buf := make([]byte, count) + + rnd := rand.New(rand.NewSource(23)) + for i := 0; i < count; i += 4 { + r := rnd.Uint32() + buf[i] = byte(r) + buf[i+1] = byte(r >> 8) + buf[i+2] = byte(r >> 16) + buf[i+3] = byte(r >> 24) + } + + return buf +} + +func TestChunker(t *testing.T) { + // setup data source + buf := getRandom(23, 32*1024*1024) + ch := chunker.New(bytes.NewReader(buf), testPol, sha256.New()) + chunks := testWithData(t, ch, chunks1) + + // test reader + for i, c := range chunks { + rd := c.Reader(bytes.NewReader(buf)) + + h := sha256.New() + n, err := io.Copy(h, rd) + if err != nil { + t.Fatalf("io.Copy(): %v", err) + } + + if uint(n) != chunks1[i].Length { + t.Fatalf("reader returned wrong number of bytes: expected %d, got %d", + chunks1[i].Length, n) + } + + d := h.Sum(nil) + if !bytes.Equal(d, chunks1[i].Digest) { + t.Fatalf("wrong hash returned: expected %02x, got %02x", + chunks1[i].Digest, d) + } + } + + // setup nullbyte data source + buf = bytes.Repeat([]byte{0}, len(chunks2)*chunker.MinSize) + ch = chunker.New(bytes.NewReader(buf), testPol, sha256.New()) + + testWithData(t, ch, chunks2) +} + +func TestChunkerWithRandomPolynomial(t 
*testing.T) { + // setup data source + buf := getRandom(23, 32*1024*1024) + + // generate a new random polynomial + start := time.Now() + p, err := chunker.RandomPolynomial() + OK(t, err) + t.Logf("generating random polynomial took %v", time.Since(start)) + + start = time.Now() + ch := chunker.New(bytes.NewReader(buf), p, sha256.New()) + t.Logf("creating chunker took %v", time.Since(start)) + + // make sure that first chunk is different + c, err := ch.Next() + + Assert(t, c.Cut != chunks1[0].CutFP, + "Cut point is the same") + Assert(t, c.Length != chunks1[0].Length, + "Length is the same") + Assert(t, !bytes.Equal(c.Digest, chunks1[0].Digest), + "Digest is the same") +} + +func TestChunkerWithoutHash(t *testing.T) { + // setup data source + buf := getRandom(23, 32*1024*1024) + + ch := chunker.New(bytes.NewReader(buf), testPol, nil) + chunks := testWithData(t, ch, chunks1) + + // test reader + for i, c := range chunks { + rd := c.Reader(bytes.NewReader(buf)) + + buf2, err := ioutil.ReadAll(rd) + if err != nil { + t.Fatalf("io.Copy(): %v", err) + } + + if uint(len(buf2)) != chunks1[i].Length { + t.Fatalf("reader returned wrong number of bytes: expected %d, got %d", + chunks1[i].Length, uint(len(buf2))) + } + + if uint(len(buf2)) != chunks1[i].Length { + t.Fatalf("wrong number of bytes returned: expected %02x, got %02x", + chunks[i].Length, len(buf2)) + } + + if !bytes.Equal(buf[c.Start:c.Start+c.Length], buf2) { + t.Fatalf("invalid data for chunk returned: expected %02x, got %02x", + buf[c.Start:c.Start+c.Length], buf2) + } + } + + // setup nullbyte data source + buf = bytes.Repeat([]byte{0}, len(chunks2)*chunker.MinSize) + ch = chunker.New(bytes.NewReader(buf), testPol, sha256.New()) + + testWithData(t, ch, chunks2) +} + +func benchmarkChunker(b *testing.B, hash hash.Hash) { + size := 10 * 1024 * 1024 + rd := bytes.NewReader(getRandom(23, size)) + + b.ResetTimer() + b.SetBytes(int64(size)) + + var chunks int + for i := 0; i < b.N; i++ { + chunks = 0 + + rd.Seek(0, 0) + ch := chunker.New(rd, testPol, hash) + + for { + _, err := ch.Next() + + if err == io.EOF { + break + } + + if err != nil { + b.Fatalf("Unexpected error occurred: %v", err) + } + + chunks++ + } + } + + b.Logf("%d chunks, average chunk size: %d bytes", chunks, size/chunks) +} + +func BenchmarkChunkerWithSHA256(b *testing.B) { + benchmarkChunker(b, sha256.New()) +} + +func BenchmarkChunkerWithMD5(b *testing.B) { + benchmarkChunker(b, md5.New()) +} + +func BenchmarkChunker(b *testing.B) { + benchmarkChunker(b, nil) +} + +func BenchmarkNewChunker(b *testing.B) { + p, err := chunker.RandomPolynomial() + OK(b, err) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + chunker.New(bytes.NewBuffer(nil), p, nil) + } +} diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/chunker/doc.go b/Godeps/_workspace/src/github.com/whyrusleeping/chunker/doc.go new file mode 100644 index 000000000..5537c172c --- /dev/null +++ b/Godeps/_workspace/src/github.com/whyrusleeping/chunker/doc.go @@ -0,0 +1,82 @@ +// Copyright 2014 Alexander Neumann. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +/* +Package chunker implements Content Defined Chunking (CDC) based on a rolling +Rabin Checksum. + +Choosing a Random Irreducible Polynomial + +The function RandomPolynomial() returns a new random polynomial of degree 53 +for use with the chunker. 
The degree 53 is chosen because it is the largest +prime below 64-8 = 56, so that the top 8 bits of an uint64 can be used for +optimising calculations in the chunker. + +A random polynomial is chosen selecting 64 random bits, masking away bits +64..54 and setting bit 53 to one (otherwise the polynomial is not of the +desired degree) and bit 0 to one (otherwise the polynomial is trivially +reducible), so that 51 bits are chosen at random. + +This process is repeated until Irreducible() returns true, then this +polynomials is returned. If this doesn't happen after 1 million tries, the +function returns an error. The probability for selecting an irreducible +polynomial at random is about 7.5% ( (2^53-2)/53 / 2^51), so the probability +that no irreducible polynomial has been found after 100 tries is lower than +0.04%. + +Verifying Irreducible Polynomials + +During development the results have been verified using the computational +discrete algebra system GAP, which can be obtained from the website at +http://www.gap-system.org/. + +For filtering a given list of polynomials in hexadecimal coefficient notation, +the following script can be used: + + # create x over F_2 = GF(2) + x := Indeterminate(GF(2), "x"); + + # test if polynomial is irreducible, i.e. the number of factors is one + IrredPoly := function (poly) + return (Length(Factors(poly)) = 1); + end;; + + # create a polynomial in x from the hexadecimal representation of the + # coefficients + Hex2Poly := function (s) + return ValuePol(CoefficientsQadic(IntHexString(s), 2), x); + end;; + + # list of candidates, in hex + candidates := [ "3DA3358B4DC173" ]; + + # create real polynomials + L := List(candidates, Hex2Poly); + + # filter and display the list of irreducible polynomials contained in L + Display(Filtered(L, x -> (IrredPoly(x)))); + +All irreducible polynomials from the list are written to the output. + +Background Literature + +An introduction to Rabin Fingerprints/Checksums can be found in the following articles: + +Michael O. Rabin (1981): "Fingerprinting by Random Polynomials" +http://www.xmailserver.org/rabin.pdf + +Ross N. Williams (1993): "A Painless Guide to CRC Error Detection Algorithms" +http://www.zlib.net/crc_v3.txt + +Andrei Z. Broder (1993): "Some Applications of Rabin's Fingerprinting Method" +http://www.xmailserver.org/rabin_apps.pdf + +Shuhong Gao and Daniel Panario (1997): "Tests and Constructions of Irreducible Polynomials over Finite Fields" +http://www.math.clemson.edu/~sgao/papers/GP97a.pdf + +Andrew Kadatch, Bob Jenkins (2007): "Everything we know about CRC but afraid to forget" +http://crcutil.googlecode.com/files/crc-doc.1.0.pdf + +*/ +package chunker diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/chunker/polynomials.go b/Godeps/_workspace/src/github.com/whyrusleeping/chunker/polynomials.go new file mode 100644 index 000000000..355da1095 --- /dev/null +++ b/Godeps/_workspace/src/github.com/whyrusleeping/chunker/polynomials.go @@ -0,0 +1,278 @@ +package chunker + +import ( + "crypto/rand" + "encoding/binary" + "errors" + "fmt" + "strconv" +) + +// Pol is a polynomial from F_2[X]. +type Pol uint64 + +// Add returns x+y. +func (x Pol) Add(y Pol) Pol { + r := Pol(uint64(x) ^ uint64(y)) + return r +} + +// mulOverflows returns true if the multiplication would overflow uint64. 
+// Code by Rob Pike, see +// https://groups.google.com/d/msg/golang-nuts/h5oSN5t3Au4/KaNQREhZh0QJ +func mulOverflows(a, b Pol) bool { + if a <= 1 || b <= 1 { + return false + } + c := a.mul(b) + d := c.Div(b) + if d != a { + return true + } + + return false +} + +func (x Pol) mul(y Pol) Pol { + if x == 0 || y == 0 { + return 0 + } + + var res Pol + for i := 0; i <= y.Deg(); i++ { + if (y & (1 << uint(i))) > 0 { + res = res.Add(x << uint(i)) + } + } + + return res +} + +// Mul returns x*y. When an overflow occurs, Mul panics. +func (x Pol) Mul(y Pol) Pol { + if mulOverflows(x, y) { + panic("multiplication would overflow uint64") + } + + return x.mul(y) +} + +// Deg returns the degree of the polynomial x. If x is zero, -1 is returned. +func (x Pol) Deg() int { + // the degree of 0 is -1 + if x == 0 { + return -1 + } + + var mask Pol = (1 << 63) + for i := 63; i >= 0; i-- { + // test if bit i is set + if x&mask > 0 { + // this is the degree of x + return i + } + mask >>= 1 + } + + // fall-through, return -1 + return -1 +} + +// String returns the coefficients in hex. +func (x Pol) String() string { + return "0x" + strconv.FormatUint(uint64(x), 16) +} + +// Expand returns the string representation of the polynomial x. +func (x Pol) Expand() string { + if x == 0 { + return "0" + } + + s := "" + for i := x.Deg(); i > 1; i-- { + if x&(1< 0 { + s += fmt.Sprintf("+x^%d", i) + } + } + + if x&2 > 0 { + s += "+x" + } + + if x&1 > 0 { + s += "+1" + } + + return s[1:] +} + +// DivMod returns x / d = q, and remainder r, +// see https://en.wikipedia.org/wiki/Division_algorithm +func (x Pol) DivMod(d Pol) (Pol, Pol) { + if x == 0 { + return 0, 0 + } + + if d == 0 { + panic("division by zero") + } + + D := d.Deg() + diff := x.Deg() - D + if diff < 0 { + return 0, x + } + + var q Pol + for diff >= 0 { + m := d << uint(diff) + q |= (1 << uint(diff)) + x = x.Add(m) + + diff = x.Deg() - D + } + + return q, x +} + +// Div returns the integer division result x / d. +func (x Pol) Div(d Pol) Pol { + q, _ := x.DivMod(d) + return q +} + +// Mod returns the remainder of x / d +func (x Pol) Mod(d Pol) Pol { + _, r := x.DivMod(d) + return r +} + +// I really dislike having a function that does not terminate, so specify a +// really large upper bound for finding a new irreducible polynomial, and +// return an error when no irreducible polynomial has been found within +// randPolMaxTries. +const randPolMaxTries = 1e6 + +// RandomPolynomial returns a new random irreducible polynomial of degree 53 +// (largest prime number below 64-8). There are (2^53-2/53) irreducible +// polynomials of degree 53 in F_2[X], c.f. Michael O. Rabin (1981): +// "Fingerprinting by Random Polynomials", page 4. If no polynomial could be +// found in one million tries, an error is returned. +func RandomPolynomial() (Pol, error) { + for i := 0; i < randPolMaxTries; i++ { + var f Pol + + // choose polynomial at random + err := binary.Read(rand.Reader, binary.LittleEndian, &f) + if err != nil { + return 0, err + } + + // mask away bits above bit 53 + f &= Pol((1 << 54) - 1) + + // set highest and lowest bit so that the degree is 53 and the + // polynomial is not trivially reducible + f |= (1 << 53) | 1 + + // test if f is irreducible + if f.Irreducible() { + return f, nil + } + } + + // If this is reached, we haven't found an irreducible polynomial in + // randPolMaxTries. This error is very unlikely to occur. + return 0, errors.New("unable to find new random irreducible polynomial") +} + +// GCD computes the Greatest Common Divisor x and f. 
+func (x Pol) GCD(f Pol) Pol { + if f == 0 { + return x + } + + if x == 0 { + return f + } + + if x.Deg() < f.Deg() { + x, f = f, x + } + + return f.GCD(x.Mod(f)) +} + +// Irreducible returns true iff x is irreducible over F_2. This function +// uses Ben Or's reducibility test. +// +// For details see "Tests and Constructions of Irreducible Polynomials over +// Finite Fields". +func (x Pol) Irreducible() bool { + for i := 1; i <= x.Deg()/2; i++ { + if x.GCD(qp(uint(i), x)) != 1 { + return false + } + } + + return true +} + +// MulMod computes x*f mod g +func (x Pol) MulMod(f, g Pol) Pol { + if x == 0 || f == 0 { + return 0 + } + + var res Pol + for i := 0; i <= f.Deg(); i++ { + if (f & (1 << uint(i))) > 0 { + a := x + for j := 0; j < i; j++ { + a = a.Mul(2).Mod(g) + } + res = res.Add(a).Mod(g) + } + } + + return res +} + +// qp computes the polynomial (x^(2^p)-x) mod g. This is needed for the +// reducibility test. +func qp(p uint, g Pol) Pol { + num := (1 << p) + i := 1 + + // start with x + res := Pol(2) + + for i < num { + // repeatedly square res + res = res.MulMod(res, g) + i *= 2 + } + + // add x + return res.Add(2).Mod(g) +} + +func (p Pol) MarshalJSON() ([]byte, error) { + buf := strconv.AppendUint([]byte{'"'}, uint64(p), 16) + buf = append(buf, '"') + return buf, nil +} + +func (p *Pol) UnmarshalJSON(data []byte) error { + if len(data) < 2 { + return errors.New("invalid string for polynomial") + } + n, err := strconv.ParseUint(string(data[1:len(data)-1]), 16, 64) + if err != nil { + return err + } + *p = Pol(n) + + return nil +} diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/chunker/polynomials_test.go b/Godeps/_workspace/src/github.com/whyrusleeping/chunker/polynomials_test.go new file mode 100644 index 000000000..bdfadd671 --- /dev/null +++ b/Godeps/_workspace/src/github.com/whyrusleeping/chunker/polynomials_test.go @@ -0,0 +1,385 @@ +package chunker_test + +import ( + "strconv" + "testing" + + "github.com/restic/chunker" + . 
"github.com/restic/restic/test" +) + +var polAddTests = []struct { + x, y chunker.Pol + sum chunker.Pol +}{ + {23, 16, 23 ^ 16}, + {0x9a7e30d1e855e0a0, 0x670102a1f4bcd414, 0xfd7f32701ce934b4}, + {0x9a7e30d1e855e0a0, 0x9a7e30d1e855e0a0, 0}, +} + +func TestPolAdd(t *testing.T) { + for _, test := range polAddTests { + Equals(t, test.sum, test.x.Add(test.y)) + Equals(t, test.sum, test.y.Add(test.x)) + } +} + +func parseBin(s string) chunker.Pol { + i, err := strconv.ParseUint(s, 2, 64) + if err != nil { + panic(err) + } + + return chunker.Pol(i) +} + +var polMulTests = []struct { + x, y chunker.Pol + res chunker.Pol +}{ + {1, 2, 2}, + { + parseBin("1101"), + parseBin("10"), + parseBin("11010"), + }, + { + parseBin("1101"), + parseBin("11"), + parseBin("10111"), + }, + { + 0x40000000, + 0x40000000, + 0x1000000000000000, + }, + { + parseBin("1010"), + parseBin("100100"), + parseBin("101101000"), + }, + { + parseBin("100"), + parseBin("11"), + parseBin("1100"), + }, + { + parseBin("11"), + parseBin("110101"), + parseBin("1011111"), + }, + { + parseBin("10011"), + parseBin("110101"), + parseBin("1100001111"), + }, +} + +func TestPolMul(t *testing.T) { + for i, test := range polMulTests { + m := test.x.Mul(test.y) + Assert(t, test.res == m, + "TestPolMul failed for test %d: %v * %v: want %v, got %v", + i, test.x, test.y, test.res, m) + m = test.y.Mul(test.x) + Assert(t, test.res == test.y.Mul(test.x), + "TestPolMul failed for %d: %v * %v: want %v, got %v", + i, test.x, test.y, test.res, m) + } +} + +func TestPolMulOverflow(t *testing.T) { + defer func() { + // try to recover overflow error + err := recover() + + if e, ok := err.(string); ok && e == "multiplication would overflow uint64" { + return + } else { + t.Logf("invalid error raised: %v", err) + // re-raise error if not overflow + panic(err) + } + }() + + x := chunker.Pol(1 << 63) + x.Mul(2) + t.Fatal("overflow test did not panic") +} + +var polDivTests = []struct { + x, y chunker.Pol + res chunker.Pol +}{ + {10, 50, 0}, + {0, 1, 0}, + { + parseBin("101101000"), // 0x168 + parseBin("1010"), // 0xa + parseBin("100100"), // 0x24 + }, + {2, 2, 1}, + { + 0x8000000000000000, + 0x8000000000000000, + 1, + }, + { + parseBin("1100"), + parseBin("100"), + parseBin("11"), + }, + { + parseBin("1100001111"), + parseBin("10011"), + parseBin("110101"), + }, +} + +func TestPolDiv(t *testing.T) { + for i, test := range polDivTests { + m := test.x.Div(test.y) + Assert(t, test.res == m, + "TestPolDiv failed for test %d: %v * %v: want %v, got %v", + i, test.x, test.y, test.res, m) + } +} + +var polModTests = []struct { + x, y chunker.Pol + res chunker.Pol +}{ + {10, 50, 10}, + {0, 1, 0}, + { + parseBin("101101001"), + parseBin("1010"), + parseBin("1"), + }, + {2, 2, 0}, + { + 0x8000000000000000, + 0x8000000000000000, + 0, + }, + { + parseBin("1100"), + parseBin("100"), + parseBin("0"), + }, + { + parseBin("1100001111"), + parseBin("10011"), + parseBin("0"), + }, +} + +func TestPolModt(t *testing.T) { + for _, test := range polModTests { + Equals(t, test.res, test.x.Mod(test.y)) + } +} + +func BenchmarkPolDivMod(t *testing.B) { + f := chunker.Pol(0x2482734cacca49) + g := chunker.Pol(0x3af4b284899) + + for i := 0; i < t.N; i++ { + g.DivMod(f) + } +} + +func BenchmarkPolDiv(t *testing.B) { + f := chunker.Pol(0x2482734cacca49) + g := chunker.Pol(0x3af4b284899) + + for i := 0; i < t.N; i++ { + g.Div(f) + } +} + +func BenchmarkPolMod(t *testing.B) { + f := chunker.Pol(0x2482734cacca49) + g := chunker.Pol(0x3af4b284899) + + for i := 0; i < t.N; i++ { + g.Mod(f) + } +} 
+ +func BenchmarkPolDeg(t *testing.B) { + f := chunker.Pol(0x3af4b284899) + d := f.Deg() + if d != 41 { + t.Fatalf("BenchmalPolDeg: Wrong degree %d returned, expected %d", + d, 41) + } + + for i := 0; i < t.N; i++ { + f.Deg() + } +} + +func TestRandomPolynomial(t *testing.T) { + _, err := chunker.RandomPolynomial() + OK(t, err) +} + +func BenchmarkRandomPolynomial(t *testing.B) { + for i := 0; i < t.N; i++ { + _, err := chunker.RandomPolynomial() + OK(t, err) + } +} + +func TestExpandPolynomial(t *testing.T) { + pol := chunker.Pol(0x3DA3358B4DC173) + s := pol.Expand() + Equals(t, "x^53+x^52+x^51+x^50+x^48+x^47+x^45+x^41+x^40+x^37+x^36+x^34+x^32+x^31+x^27+x^25+x^24+x^22+x^19+x^18+x^16+x^15+x^14+x^8+x^6+x^5+x^4+x+1", s) +} + +var polIrredTests = []struct { + f chunker.Pol + irred bool +}{ + {0x38f1e565e288df, false}, + {0x3DA3358B4DC173, true}, + {0x30a8295b9d5c91, false}, + {0x255f4350b962cb, false}, + {0x267f776110a235, false}, + {0x2f4dae10d41227, false}, + {0x2482734cacca49, true}, + {0x312daf4b284899, false}, + {0x29dfb6553d01d1, false}, + {0x3548245eb26257, false}, + {0x3199e7ef4211b3, false}, + {0x362f39017dae8b, false}, + {0x200d57aa6fdacb, false}, + {0x35e0a4efa1d275, false}, + {0x2ced55b026577f, false}, + {0x260b012010893d, false}, + {0x2df29cbcd59e9d, false}, + {0x3f2ac7488bd429, false}, + {0x3e5cb1711669fb, false}, + {0x226d8de57a9959, false}, + {0x3c8de80aaf5835, false}, + {0x2026a59efb219b, false}, + {0x39dfa4d13fb231, false}, + {0x3143d0464b3299, false}, +} + +func TestPolIrreducible(t *testing.T) { + for _, test := range polIrredTests { + Assert(t, test.f.Irreducible() == test.irred, + "Irreducibility test for Polynomial %v failed: got %v, wanted %v", + test.f, test.f.Irreducible(), test.irred) + } +} + +func BenchmarkPolIrreducible(b *testing.B) { + // find first irreducible polynomial + var pol chunker.Pol + for _, test := range polIrredTests { + if test.irred { + pol = test.f + break + } + } + + for i := 0; i < b.N; i++ { + Assert(b, pol.Irreducible(), + "Irreducibility test for Polynomial %v failed", pol) + } +} + +var polGCDTests = []struct { + f1 chunker.Pol + f2 chunker.Pol + gcd chunker.Pol +}{ + {10, 50, 2}, + {0, 1, 1}, + { + parseBin("101101001"), + parseBin("1010"), + parseBin("1"), + }, + {2, 2, 2}, + { + parseBin("1010"), + parseBin("11"), + parseBin("11"), + }, + { + 0x8000000000000000, + 0x8000000000000000, + 0x8000000000000000, + }, + { + parseBin("1100"), + parseBin("101"), + parseBin("11"), + }, + { + parseBin("1100001111"), + parseBin("10011"), + parseBin("10011"), + }, + { + 0x3DA3358B4DC173, + 0x3DA3358B4DC173, + 0x3DA3358B4DC173, + }, + { + 0x3DA3358B4DC173, + 0x230d2259defd, + 1, + }, + { + 0x230d2259defd, + 0x51b492b3eff2, + parseBin("10011"), + }, +} + +func TestPolGCD(t *testing.T) { + for i, test := range polGCDTests { + gcd := test.f1.GCD(test.f2) + Assert(t, test.gcd == gcd, + "GCD test %d (%+v) failed: got %v, wanted %v", + i, test, gcd, test.gcd) + gcd = test.f2.GCD(test.f1) + Assert(t, test.gcd == gcd, + "GCD test %d (%+v) failed: got %v, wanted %v", + i, test, gcd, test.gcd) + } +} + +var polMulModTests = []struct { + f1 chunker.Pol + f2 chunker.Pol + g chunker.Pol + mod chunker.Pol +}{ + { + 0x1230, + 0x230, + 0x55, + 0x22, + }, + { + 0x0eae8c07dbbb3026, + 0xd5d6db9de04771de, + 0xdd2bda3b77c9, + 0x425ae8595b7a, + }, +} + +func TestPolMulMod(t *testing.T) { + for i, test := range polMulModTests { + mod := test.f1.MulMod(test.f2, test.g) + Assert(t, mod == test.mod, + "MulMod test %d (%+v) failed: got %v, wanted %v", + i, test, mod, test.mod) 
+ } +} diff --git a/core/commands/add.go b/core/commands/add.go index de5cc4d85..614305126 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -31,6 +31,7 @@ const ( wrapOptionName = "wrap-with-directory" hiddenOptionName = "hidden" onlyHashOptionName = "only-hash" + chunkerOptionName = "chunker" ) type AddedObject struct { @@ -61,6 +62,7 @@ remains to be implemented. cmds.BoolOption(onlyHashOptionName, "n", "Only chunk and hash - do not write to disk"), cmds.BoolOption(wrapOptionName, "w", "Wrap files with a directory object"), cmds.BoolOption(hiddenOptionName, "Include files that are hidden"), + cmds.StringOption(chunkerOptionName, "s", "chunking algorithm to use"), }, PreRun: func(req cmds.Request) error { if quiet, _, _ := req.Option(quietOptionName).Bool(); quiet { @@ -97,6 +99,7 @@ remains to be implemented. wrap, _, _ := req.Option(wrapOptionName).Bool() hash, _, _ := req.Option(onlyHashOptionName).Bool() hidden, _, _ := req.Option(hiddenOptionName).Bool() + chunker, _, _ := req.Option(chunkerOptionName).String() if hash { nilnode, err := core.NewNodeBuilder().NilRepo().Build(n.Context()) @@ -118,6 +121,7 @@ remains to be implemented. progress: progress, hidden: hidden, trickle: trickle, + chunker: chunker, } rootnd, err := addParams.addFile(file) @@ -265,24 +269,27 @@ type adder struct { progress bool hidden bool trickle bool + chunker string } // Perform the actual add & pin locally, outputting results to reader -func add(n *core.IpfsNode, reader io.Reader, useTrickle bool) (*dag.Node, error) { +func add(n *core.IpfsNode, reader io.Reader, useTrickle bool, chunker string) (*dag.Node, error) { + chnk, err := chunk.FromString(reader, chunker) + if err != nil { + return nil, err + } + var node *dag.Node - var err error if useTrickle { node, err = importer.BuildTrickleDagFromReader( - reader, n.DAG, - chunk.DefaultSplitter, + chnk, importer.PinIndirectCB(n.Pinning.GetManual()), ) } else { node, err = importer.BuildDagFromReader( - reader, n.DAG, - chunk.DefaultSplitter, + chnk, importer.PinIndirectCB(n.Pinning.GetManual()), ) } @@ -314,7 +321,7 @@ func (params *adder) addFile(file files.File) (*dag.Node, error) { reader = &progressReader{file: file, out: params.out} } - dagnode, err := add(params.node, reader, params.trickle) + dagnode, err := add(params.node, reader, params.trickle, params.chunker) if err != nil { return nil, err } diff --git a/core/corehttp/gateway_handler.go b/core/corehttp/gateway_handler.go index 671a1c5e8..550943675 100644 --- a/core/corehttp/gateway_handler.go +++ b/core/corehttp/gateway_handler.go @@ -46,7 +46,9 @@ func (i *gatewayHandler) newDagFromReader(r io.Reader) (*dag.Node, error) { // TODO(cryptix): change and remove this helper once PR1136 is merged // return ufs.AddFromReader(i.node, r.Body) return importer.BuildDagFromReader( - r, i.node.DAG, chunk.DefaultSplitter, importer.BasicPinnerCB(i.node.Pinning.GetManual())) + i.node.DAG, + chunk.DefaultSplitter(r), + importer.BasicPinnerCB(i.node.Pinning.GetManual())) } // TODO(btc): break this apart into separate handlers using a more expressive muxer diff --git a/core/coreunix/add.go b/core/coreunix/add.go index c1b9586c7..17864c9f7 100644 --- a/core/coreunix/add.go +++ b/core/coreunix/add.go @@ -25,10 +25,10 @@ var log = eventlog.Logger("coreunix") // datastore. Returns a key representing the root node. 
func Add(n *core.IpfsNode, r io.Reader) (string, error) { // TODO more attractive function signature importer.BuildDagFromReader + dagNode, err := importer.BuildDagFromReader( - r, n.DAG, - chunk.DefaultSplitter, + chunk.NewSizeSplitter(r, chunk.DefaultBlockSize), importer.BasicPinnerCB(n.Pinning.GetManual()), ) if err != nil { @@ -96,9 +96,8 @@ func add(n *core.IpfsNode, reader io.Reader) (*merkledag.Node, error) { mp := n.Pinning.GetManual() node, err := importer.BuildDagFromReader( - reader, n.DAG, - chunk.DefaultSplitter, + chunk.DefaultSplitter(reader), importer.PinIndirectCB(mp), ) if err != nil { diff --git a/core/coreunix/metadata_test.go b/core/coreunix/metadata_test.go index d2cb57915..d6f7635a9 100644 --- a/core/coreunix/metadata_test.go +++ b/core/coreunix/metadata_test.go @@ -38,7 +38,7 @@ func TestMetadata(t *testing.T) { data := make([]byte, 1000) u.NewTimeSeededRand().Read(data) r := bytes.NewReader(data) - nd, err := importer.BuildDagFromReader(r, ds, chunk.DefaultSplitter, nil) + nd, err := importer.BuildDagFromReader(ds, chunk.DefaultSplitter(r), nil) if err != nil { t.Fatal(err) } diff --git a/fuse/readonly/ipfs_test.go b/fuse/readonly/ipfs_test.go index 40946f567..b2a8f3a2c 100644 --- a/fuse/readonly/ipfs_test.go +++ b/fuse/readonly/ipfs_test.go @@ -36,7 +36,7 @@ func randObj(t *testing.T, nd *core.IpfsNode, size int64) (*dag.Node, []byte) { buf := make([]byte, size) u.NewTimeSeededRand().Read(buf) read := bytes.NewReader(buf) - obj, err := importer.BuildTrickleDagFromReader(read, nd.DAG, chunk.DefaultSplitter, nil) + obj, err := importer.BuildTrickleDagFromReader(nd.DAG, chunk.DefaultSplitter(read), nil) if err != nil { t.Fatal(err) } diff --git a/importer/balanced/balanced_test.go b/importer/balanced/balanced_test.go index ee36383e7..446577075 100644 --- a/importer/balanced/balanced_test.go +++ b/importer/balanced/balanced_test.go @@ -12,23 +12,38 @@ import ( "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" chunk "github.com/ipfs/go-ipfs/importer/chunk" h "github.com/ipfs/go-ipfs/importer/helpers" - merkledag "github.com/ipfs/go-ipfs/merkledag" + dag "github.com/ipfs/go-ipfs/merkledag" mdtest "github.com/ipfs/go-ipfs/merkledag/test" pin "github.com/ipfs/go-ipfs/pin" uio "github.com/ipfs/go-ipfs/unixfs/io" u "github.com/ipfs/go-ipfs/util" ) -func buildTestDag(r io.Reader, ds merkledag.DAGService, spl chunk.BlockSplitter) (*merkledag.Node, error) { +// TODO: extract these tests and more as a generic layout test suite + +func buildTestDag(ds dag.DAGService, spl chunk.Splitter) (*dag.Node, error) { // Start the splitter - blkch := spl.Split(r) + blkch, errs := chunk.Chan(spl) dbp := h.DagBuilderParams{ Dagserv: ds, Maxlinks: h.DefaultLinksPerBlock, } - return BalancedLayout(dbp.New(blkch)) + return BalancedLayout(dbp.New(blkch, errs)) +} + +func getTestDag(t *testing.T, ds dag.DAGService, size int64, blksize int64) (*dag.Node, []byte) { + data := make([]byte, size) + u.NewTimeSeededRand().Read(data) + r := bytes.NewReader(data) + + nd, err := buildTestDag(ds, chunk.NewSizeSplitter(r, blksize)) + if err != nil { + t.Fatal(err) + } + + return nd, data } //Test where calls to read are smaller than the chunk size @@ -36,9 +51,10 @@ func TestSizeBasedSplit(t *testing.T) { if testing.Short() { t.SkipNow() } - bs := &chunk.SizeSplitter{Size: 512} + + bs := chunk.SizeSplitterGen(512) testFileConsistency(t, bs, 32*512) - bs = &chunk.SizeSplitter{Size: 4096} + bs = chunk.SizeSplitterGen(4096) testFileConsistency(t, bs, 32*4096) // Uneven offset @@ -51,13 
+67,13 @@ func dup(b []byte) []byte { return o } -func testFileConsistency(t *testing.T, bs chunk.BlockSplitter, nbytes int) { +func testFileConsistency(t *testing.T, bs chunk.SplitterGen, nbytes int) { should := make([]byte, nbytes) u.NewTimeSeededRand().Read(should) read := bytes.NewReader(should) ds := mdtest.Mock(t) - nd, err := buildTestDag(read, ds, bs) + nd, err := buildTestDag(ds, bs(read)) if err != nil { t.Fatal(err) } @@ -79,15 +95,9 @@ func testFileConsistency(t *testing.T, bs chunk.BlockSplitter, nbytes int) { } func TestBuilderConsistency(t *testing.T) { - nbytes := 100000 - buf := new(bytes.Buffer) - io.CopyN(buf, u.NewTimeSeededRand(), int64(nbytes)) - should := dup(buf.Bytes()) dagserv := mdtest.Mock(t) - nd, err := buildTestDag(buf, dagserv, chunk.DefaultSplitter) - if err != nil { - t.Fatal(err) - } + nd, should := getTestDag(t, dagserv, 100000, chunk.DefaultBlockSize) + r, err := uio.NewDagReader(context.Background(), nd, dagserv) if err != nil { t.Fatal(err) @@ -116,50 +126,14 @@ func arrComp(a, b []byte) error { return nil } -func TestMaybeRabinConsistency(t *testing.T) { - if testing.Short() { - t.SkipNow() - } - testFileConsistency(t, chunk.NewMaybeRabin(4096), 256*4096) -} - -func TestRabinBlockSize(t *testing.T) { - if testing.Short() { - t.SkipNow() - } - buf := new(bytes.Buffer) - nbytes := 1024 * 1024 - io.CopyN(buf, u.NewTimeSeededRand(), int64(nbytes)) - rab := chunk.NewMaybeRabin(4096) - blkch := rab.Split(buf) - - var blocks [][]byte - for b := range blkch { - blocks = append(blocks, b) - } - - fmt.Printf("Avg block size: %d\n", nbytes/len(blocks)) - -} - type dagservAndPinner struct { - ds merkledag.DAGService + ds dag.DAGService mp pin.ManualPinner } func TestIndirectBlocks(t *testing.T) { - splitter := &chunk.SizeSplitter{512} - nbytes := 1024 * 1024 - buf := make([]byte, nbytes) - u.NewTimeSeededRand().Read(buf) - - read := bytes.NewReader(buf) - ds := mdtest.Mock(t) - dag, err := buildTestDag(read, ds, splitter) - if err != nil { - t.Fatal(err) - } + dag, buf := getTestDag(t, ds, 1024*1024, 512) reader, err := uio.NewDagReader(context.Background(), dag, ds) if err != nil { @@ -178,15 +152,8 @@ func TestIndirectBlocks(t *testing.T) { func TestSeekingBasic(t *testing.T) { nbytes := int64(10 * 1024) - should := make([]byte, nbytes) - u.NewTimeSeededRand().Read(should) - - read := bytes.NewReader(should) ds := mdtest.Mock(t) - nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500}) - if err != nil { - t.Fatal(err) - } + nd, should := getTestDag(t, ds, nbytes, 500) rs, err := uio.NewDagReader(context.Background(), nd, ds) if err != nil { @@ -214,16 +181,8 @@ func TestSeekingBasic(t *testing.T) { } func TestSeekToBegin(t *testing.T) { - nbytes := int64(10 * 1024) - should := make([]byte, nbytes) - u.NewTimeSeededRand().Read(should) - - read := bytes.NewReader(should) ds := mdtest.Mock(t) - nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500}) - if err != nil { - t.Fatal(err) - } + nd, should := getTestDag(t, ds, 10*1024, 500) rs, err := uio.NewDagReader(context.Background(), nd, ds) if err != nil { @@ -258,16 +217,8 @@ func TestSeekToBegin(t *testing.T) { } func TestSeekToAlmostBegin(t *testing.T) { - nbytes := int64(10 * 1024) - should := make([]byte, nbytes) - u.NewTimeSeededRand().Read(should) - - read := bytes.NewReader(should) ds := mdtest.Mock(t) - nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500}) - if err != nil { - t.Fatal(err) - } + nd, should := getTestDag(t, ds, 10*1024, 500) rs, err := uio.NewDagReader(context.Background(), 
nd, ds) if err != nil { @@ -303,15 +254,8 @@ func TestSeekToAlmostBegin(t *testing.T) { func TestSeekEnd(t *testing.T) { nbytes := int64(50 * 1024) - should := make([]byte, nbytes) - u.NewTimeSeededRand().Read(should) - - read := bytes.NewReader(should) ds := mdtest.Mock(t) - nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500}) - if err != nil { - t.Fatal(err) - } + nd, _ := getTestDag(t, ds, nbytes, 500) rs, err := uio.NewDagReader(context.Background(), nd, ds) if err != nil { @@ -329,15 +273,8 @@ func TestSeekEnd(t *testing.T) { func TestSeekEndSingleBlockFile(t *testing.T) { nbytes := int64(100) - should := make([]byte, nbytes) - u.NewTimeSeededRand().Read(should) - - read := bytes.NewReader(should) ds := mdtest.Mock(t) - nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{5000}) - if err != nil { - t.Fatal(err) - } + nd, _ := getTestDag(t, ds, nbytes, 5000) rs, err := uio.NewDagReader(context.Background(), nd, ds) if err != nil { @@ -355,15 +292,8 @@ func TestSeekEndSingleBlockFile(t *testing.T) { func TestSeekingStress(t *testing.T) { nbytes := int64(1024 * 1024) - should := make([]byte, nbytes) - u.NewTimeSeededRand().Read(should) - - read := bytes.NewReader(should) ds := mdtest.Mock(t) - nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{1000}) - if err != nil { - t.Fatal(err) - } + nd, should := getTestDag(t, ds, nbytes, 1000) rs, err := uio.NewDagReader(context.Background(), nd, ds) if err != nil { @@ -400,15 +330,8 @@ func TestSeekingStress(t *testing.T) { func TestSeekingConsistency(t *testing.T) { nbytes := int64(128 * 1024) - should := make([]byte, nbytes) - u.NewTimeSeededRand().Read(should) - - read := bytes.NewReader(should) ds := mdtest.Mock(t) - nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500}) - if err != nil { - t.Fatal(err) - } + nd, should := getTestDag(t, ds, nbytes, 500) rs, err := uio.NewDagReader(context.Background(), nd, ds) if err != nil { diff --git a/importer/chunk/parse.go b/importer/chunk/parse.go new file mode 100644 index 000000000..55e96cc04 --- /dev/null +++ b/importer/chunk/parse.go @@ -0,0 +1,76 @@ +package chunk + +import ( + "errors" + "fmt" + "io" + "strconv" + "strings" +) + +func FromString(r io.Reader, chunker string) (Splitter, error) { + switch { + case chunker == "" || chunker == "default": + return NewSizeSplitter(r, DefaultBlockSize), nil + + case strings.HasPrefix(chunker, "size-"): + sizeStr := strings.Split(chunker, "-")[1] + size, err := strconv.Atoi(sizeStr) + if err != nil { + return nil, err + } + return NewSizeSplitter(r, int64(size)), nil + + case strings.HasPrefix(chunker, "rabin"): + return parseRabinString(r, chunker) + + default: + return nil, fmt.Errorf("unrecognized chunker option: %s", chunker) + } +} + +func parseRabinString(r io.Reader, chunker string) (Splitter, error) { + parts := strings.Split(chunker, "-") + switch len(parts) { + case 1: + return NewRabin(r, uint64(DefaultBlockSize)), nil + case 2: + size, err := strconv.Atoi(parts[1]) + if err != nil { + return nil, err + } + return NewRabin(r, uint64(size)), nil + case 4: + sub := strings.Split(parts[1], ":") + if len(sub) > 1 && sub[0] != "min" { + return nil, errors.New("first label must be min") + } + min, err := strconv.Atoi(sub[len(sub)-1]) + if err != nil { + return nil, err + } + + sub = strings.Split(parts[2], ":") + if len(sub) > 1 && sub[0] != "avg" { + log.Error("sub == ", sub) + return nil, errors.New("second label must be avg") + } + avg, err := strconv.Atoi(sub[len(sub)-1]) + if err != nil { + return nil, err + } + + sub = 
strings.Split(parts[3], ":") + if len(sub) > 1 && sub[0] != "max" { + return nil, errors.New("final label must be max") + } + max, err := strconv.Atoi(sub[len(sub)-1]) + if err != nil { + return nil, err + } + + return NewRabinMinMax(r, uint64(min), uint64(avg), uint64(max)), nil + default: + return nil, errors.New("incorrect format (expected 'rabin' 'rabin-[avg]' or 'rabin-[min]-[avg]-[max]'") + } +} diff --git a/importer/chunk/rabin.go b/importer/chunk/rabin.go index fbfb4cec4..de68ae079 100644 --- a/importer/chunk/rabin.go +++ b/importer/chunk/rabin.go @@ -1,94 +1,39 @@ package chunk import ( - "bufio" - "bytes" - "fmt" + "hash/fnv" "io" - "math" + + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/whyrusleeping/chunker" ) -type MaybeRabin struct { - mask int - windowSize int - MinBlockSize int - MaxBlockSize int +var IpfsRabinPoly = chunker.Pol(17437180132763653) + +type Rabin struct { + r *chunker.Chunker } -func NewMaybeRabin(avgBlkSize int) *MaybeRabin { - blkbits := uint(math.Log2(float64(avgBlkSize))) - rb := new(MaybeRabin) - rb.mask = (1 << blkbits) - 1 - rb.windowSize = 16 // probably a good number... - rb.MinBlockSize = avgBlkSize / 2 - rb.MaxBlockSize = (avgBlkSize / 2) * 3 - return rb +func NewRabin(r io.Reader, avgBlkSize uint64) *Rabin { + min := avgBlkSize / 3 + max := avgBlkSize + (avgBlkSize / 2) + + return NewRabinMinMax(r, avgBlkSize, min, max) } -func (mr *MaybeRabin) Split(r io.Reader) chan []byte { - out := make(chan []byte, 16) - go func() { - inbuf := bufio.NewReader(r) - blkbuf := new(bytes.Buffer) +func NewRabinMinMax(r io.Reader, min, avg, max uint64) *Rabin { + h := fnv.New32a() + ch := chunker.New(r, IpfsRabinPoly, h, avg, min, max) - // some bullshit numbers i made up - a := 10 // honestly, no idea what this is - MOD := 33554383 // randomly chosen (seriously) - an := 1 - rollingHash := 0 - - // Window is a circular buffer - window := make([]byte, mr.windowSize) - push := func(i int, val byte) (outval int) { - outval = int(window[i%len(window)]) - window[i%len(window)] = val - return - } - - // Duplicate byte slice - dup := func(b []byte) []byte { - d := make([]byte, len(b)) - copy(d, b) - return d - } - - // Fill up the window - i := 0 - for ; i < mr.windowSize; i++ { - b, err := inbuf.ReadByte() - if err != nil { - fmt.Println(err) - return - } - blkbuf.WriteByte(b) - push(i, b) - rollingHash = (rollingHash*a + int(b)) % MOD - an = (an * a) % MOD - } - - for ; true; i++ { - b, err := inbuf.ReadByte() - if err != nil { - break - } - outval := push(i, b) - blkbuf.WriteByte(b) - rollingHash = (rollingHash*a + int(b) - an*outval) % MOD - if (rollingHash&mr.mask == mr.mask && blkbuf.Len() > mr.MinBlockSize) || - blkbuf.Len() >= mr.MaxBlockSize { - out <- dup(blkbuf.Bytes()) - blkbuf.Reset() - } - - // Check if there are enough remaining - peek, err := inbuf.Peek(mr.windowSize) - if err != nil || len(peek) != mr.windowSize { - break - } - } - io.Copy(blkbuf, inbuf) - out <- blkbuf.Bytes() - close(out) - }() - return out + return &Rabin{ + r: ch, + } +} + +func (r *Rabin) NextBytes() ([]byte, error) { + ch, err := r.r.Next() + if err != nil { + return nil, err + } + + return ch.Data, nil } diff --git a/importer/chunk/rabin_test.go b/importer/chunk/rabin_test.go new file mode 100644 index 000000000..596f2f63e --- /dev/null +++ b/importer/chunk/rabin_test.go @@ -0,0 +1,84 @@ +package chunk + +import ( + "bytes" + "fmt" + "github.com/ipfs/go-ipfs/blocks" + "github.com/ipfs/go-ipfs/blocks/key" + "github.com/ipfs/go-ipfs/util" + "io" + "testing" +) + +func 
TestRabinChunking(t *testing.T) { + data := make([]byte, 1024*1024*16) + util.NewTimeSeededRand().Read(data) + + r := NewRabin(bytes.NewReader(data), 1024*256) + + var chunks [][]byte + + for { + chunk, err := r.NextBytes() + if err != nil { + if err == io.EOF { + break + } + t.Fatal(err) + } + + chunks = append(chunks, chunk) + } + + fmt.Printf("average block size: %d\n", len(data)/len(chunks)) + + unchunked := bytes.Join(chunks, nil) + if !bytes.Equal(unchunked, data) { + fmt.Printf("%d %d\n", len(unchunked), len(data)) + t.Fatal("data was chunked incorrectly") + } +} + +func chunkData(t *testing.T, data []byte) map[key.Key]*blocks.Block { + r := NewRabin(bytes.NewReader(data), 1024*256) + + blkmap := make(map[key.Key]*blocks.Block) + + for { + blk, err := r.NextBytes() + if err != nil { + if err == io.EOF { + break + } + t.Fatal(err) + } + + b := blocks.NewBlock(blk) + blkmap[b.Key()] = b + } + + return blkmap +} + +func TestRabinChunkReuse(t *testing.T) { + data := make([]byte, 1024*1024*16) + util.NewTimeSeededRand().Read(data) + + ch1 := chunkData(t, data[1000:]) + ch2 := chunkData(t, data) + + var extra int + for k, _ := range ch2 { + _, ok := ch1[k] + if !ok { + extra++ + } + } + + if extra > 2 { + t.Fatal("too many spare chunks made") + } + if extra == 2 { + t.Log("why did we get two extra blocks?") + } +} diff --git a/importer/chunk/splitting.go b/importer/chunk/splitting.go index 999ed367f..960947245 100644 --- a/importer/chunk/splitting.go +++ b/importer/chunk/splitting.go @@ -9,39 +9,71 @@ import ( var log = util.Logger("chunk") -var DefaultBlockSize = 1024 * 256 -var DefaultSplitter = &SizeSplitter{Size: DefaultBlockSize} +var DefaultBlockSize int64 = 1024 * 256 -type BlockSplitter interface { - Split(r io.Reader) chan []byte +type Splitter interface { + NextBytes() ([]byte, error) } -type SizeSplitter struct { - Size int +type SplitterGen func(r io.Reader) Splitter + +func DefaultSplitter(r io.Reader) Splitter { + return NewSizeSplitter(r, DefaultBlockSize) } -func (ss *SizeSplitter) Split(r io.Reader) chan []byte { +func SizeSplitterGen(size int64) SplitterGen { + return func(r io.Reader) Splitter { + return NewSizeSplitter(r, size) + } +} + +func Chan(s Splitter) (<-chan []byte, <-chan error) { out := make(chan []byte) + errs := make(chan error, 1) go func() { defer close(out) + defer close(errs) // all-chunks loop (keep creating chunks) for { - // log.Infof("making chunk with size: %d", ss.Size) - chunk := make([]byte, ss.Size) - nread, err := io.ReadFull(r, chunk) - if nread > 0 { - // log.Infof("sending out chunk with size: %d", sofar) - out <- chunk[:nread] - } - if err == io.EOF || err == io.ErrUnexpectedEOF { - return - } + b, err := s.NextBytes() if err != nil { - log.Debugf("Block split error: %s", err) + errs <- err return } + + out <- b } }() - return out + return out, errs +} + +type sizeSplitterv2 struct { + r io.Reader + size int64 + err error +} + +func NewSizeSplitter(r io.Reader, size int64) Splitter { + return &sizeSplitterv2{ + r: r, + size: size, + } +} + +func (ss *sizeSplitterv2) NextBytes() ([]byte, error) { + if ss.err != nil { + return nil, ss.err + } + buf := make([]byte, ss.size) + n, err := io.ReadFull(ss.r, buf) + if err == io.ErrUnexpectedEOF { + ss.err = io.EOF + err = nil + } + if err != nil { + return nil, err + } + + return buf[:n], nil } diff --git a/importer/chunk/splitting_test.go b/importer/chunk/splitting_test.go index 232b4fde9..27b2a7b7a 100644 --- a/importer/chunk/splitting_test.go +++ b/importer/chunk/splitting_test.go @@ -32,8 
+32,8 @@ func TestSizeSplitterIsDeterministic(t *testing.T) { bufA := copyBuf(bufR) bufB := copyBuf(bufR) - chunksA := DefaultSplitter.Split(bytes.NewReader(bufA)) - chunksB := DefaultSplitter.Split(bytes.NewReader(bufB)) + chunksA, _ := Chan(DefaultSplitter(bytes.NewReader(bufA))) + chunksB, _ := Chan(DefaultSplitter(bytes.NewReader(bufB))) for n := 0; ; n++ { a, moreA := <-chunksA @@ -65,8 +65,8 @@ func TestSizeSplitterFillsChunks(t *testing.T) { max := 10000000 b := randBuf(t, max) r := &clipReader{r: bytes.NewReader(b), size: 4000} - s := SizeSplitter{Size: 1024 * 256} - c := s.Split(r) + chunksize := int64(1024 * 256) + c, _ := Chan(NewSizeSplitter(r, chunksize)) sofar := 0 whole := make([]byte, max) @@ -80,7 +80,7 @@ func TestSizeSplitterFillsChunks(t *testing.T) { copy(whole[sofar:], chunk) sofar += len(chunk) - if sofar != max && len(chunk) < s.Size { + if sofar != max && len(chunk) < int(chunksize) { t.Fatal("sizesplitter split at a smaller size") } } diff --git a/importer/helpers/dagbuilder.go b/importer/helpers/dagbuilder.go index a1b00c423..40617fdc2 100644 --- a/importer/helpers/dagbuilder.go +++ b/importer/helpers/dagbuilder.go @@ -19,6 +19,8 @@ type DagBuilderHelper struct { dserv dag.DAGService mp pin.ManualPinner in <-chan []byte + errs <-chan error + recvdErr error nextData []byte // the next item to return. maxlinks int ncb NodeCB @@ -39,7 +41,7 @@ type DagBuilderParams struct { // Generate a new DagBuilderHelper from the given params, using 'in' as a // data source -func (dbp *DagBuilderParams) New(in <-chan []byte) *DagBuilderHelper { +func (dbp *DagBuilderParams) New(in <-chan []byte, errs <-chan error) *DagBuilderHelper { ncb := dbp.NodeCB if ncb == nil { ncb = nilFunc @@ -48,6 +50,7 @@ func (dbp *DagBuilderParams) New(in <-chan []byte) *DagBuilderHelper { return &DagBuilderHelper{ dserv: dbp.Dagserv, in: in, + errs: errs, maxlinks: dbp.Maxlinks, ncb: ncb, batch: dbp.Dagserv.Batch(), diff --git a/importer/importer.go b/importer/importer.go index f499b190a..cc5940d68 100644 --- a/importer/importer.go +++ b/importer/importer.go @@ -4,7 +4,6 @@ package importer import ( "fmt" - "io" "os" bal "github.com/ipfs/go-ipfs/importer/balanced" @@ -36,12 +35,12 @@ func BuildDagFromFile(fpath string, ds dag.DAGService, mp pin.ManualPinner) (*da } defer f.Close() - return BuildDagFromReader(f, ds, chunk.DefaultSplitter, BasicPinnerCB(mp)) + return BuildDagFromReader(ds, chunk.NewSizeSplitter(f, chunk.DefaultBlockSize), BasicPinnerCB(mp)) } -func BuildDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSplitter, ncb h.NodeCB) (*dag.Node, error) { +func BuildDagFromReader(ds dag.DAGService, spl chunk.Splitter, ncb h.NodeCB) (*dag.Node, error) { // Start the splitter - blkch := spl.Split(r) + blkch, errch := chunk.Chan(spl) dbp := h.DagBuilderParams{ Dagserv: ds, @@ -49,12 +48,12 @@ func BuildDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSplitter, NodeCB: ncb, } - return bal.BalancedLayout(dbp.New(blkch)) + return bal.BalancedLayout(dbp.New(blkch, errch)) } -func BuildTrickleDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSplitter, ncb h.NodeCB) (*dag.Node, error) { +func BuildTrickleDagFromReader(ds dag.DAGService, spl chunk.Splitter, ncb h.NodeCB) (*dag.Node, error) { // Start the splitter - blkch := spl.Split(r) + blkch, errch := chunk.Chan(spl) dbp := h.DagBuilderParams{ Dagserv: ds, @@ -62,7 +61,7 @@ func BuildTrickleDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSp NodeCB: ncb, } - return trickle.TrickleLayout(dbp.New(blkch)) + 
return trickle.TrickleLayout(dbp.New(blkch, errch)) } func BasicPinnerCB(p pin.ManualPinner) h.NodeCB { diff --git a/importer/importer_test.go b/importer/importer_test.go index 3641fb1b0..445aba0d1 100644 --- a/importer/importer_test.go +++ b/importer/importer_test.go @@ -14,20 +14,20 @@ import ( u "github.com/ipfs/go-ipfs/util" ) -func getBalancedDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGService) { +func getBalancedDag(t testing.TB, size int64, blksize int64) (*dag.Node, dag.DAGService) { ds := mdtest.Mock(t) r := io.LimitReader(u.NewTimeSeededRand(), size) - nd, err := BuildDagFromReader(r, ds, &chunk.SizeSplitter{blksize}, nil) + nd, err := BuildDagFromReader(ds, chunk.NewSizeSplitter(r, blksize), nil) if err != nil { t.Fatal(err) } return nd, ds } -func getTrickleDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGService) { +func getTrickleDag(t testing.TB, size int64, blksize int64) (*dag.Node, dag.DAGService) { ds := mdtest.Mock(t) r := io.LimitReader(u.NewTimeSeededRand(), size) - nd, err := BuildTrickleDagFromReader(r, ds, &chunk.SizeSplitter{blksize}, nil) + nd, err := BuildTrickleDagFromReader(ds, chunk.NewSizeSplitter(r, blksize), nil) if err != nil { t.Fatal(err) } @@ -40,7 +40,7 @@ func TestBalancedDag(t *testing.T) { u.NewTimeSeededRand().Read(buf) r := bytes.NewReader(buf) - nd, err := BuildDagFromReader(r, ds, chunk.DefaultSplitter, nil) + nd, err := BuildDagFromReader(ds, chunk.DefaultSplitter(r), nil) if err != nil { t.Fatal(err) } diff --git a/importer/trickle/trickle_test.go b/importer/trickle/trickle_test.go index 1af5d222f..0eb440b6e 100644 --- a/importer/trickle/trickle_test.go +++ b/importer/trickle/trickle_test.go @@ -20,16 +20,16 @@ import ( u "github.com/ipfs/go-ipfs/util" ) -func buildTestDag(r io.Reader, ds merkledag.DAGService, spl chunk.BlockSplitter) (*merkledag.Node, error) { +func buildTestDag(ds merkledag.DAGService, spl chunk.Splitter) (*merkledag.Node, error) { // Start the splitter - blkch := spl.Split(r) + blkch, errs := chunk.Chan(spl) dbp := h.DagBuilderParams{ Dagserv: ds, Maxlinks: h.DefaultLinksPerBlock, } - nd, err := TrickleLayout(dbp.New(blkch)) + nd, err := TrickleLayout(dbp.New(blkch, errs)) if err != nil { return nil, err } @@ -42,9 +42,10 @@ func TestSizeBasedSplit(t *testing.T) { if testing.Short() { t.SkipNow() } - bs := &chunk.SizeSplitter{Size: 512} + bs := chunk.SizeSplitterGen(512) testFileConsistency(t, bs, 32*512) - bs = &chunk.SizeSplitter{Size: 4096} + + bs = chunk.SizeSplitterGen(4096) testFileConsistency(t, bs, 32*4096) // Uneven offset @@ -57,13 +58,13 @@ func dup(b []byte) []byte { return o } -func testFileConsistency(t *testing.T, bs chunk.BlockSplitter, nbytes int) { +func testFileConsistency(t *testing.T, bs chunk.SplitterGen, nbytes int) { should := make([]byte, nbytes) u.NewTimeSeededRand().Read(should) read := bytes.NewReader(should) ds := mdtest.Mock(t) - nd, err := buildTestDag(read, ds, bs) + nd, err := buildTestDag(ds, bs(read)) if err != nil { t.Fatal(err) } @@ -90,7 +91,7 @@ func TestBuilderConsistency(t *testing.T) { io.CopyN(buf, u.NewTimeSeededRand(), int64(nbytes)) should := dup(buf.Bytes()) dagserv := mdtest.Mock(t) - nd, err := buildTestDag(buf, dagserv, chunk.DefaultSplitter) + nd, err := buildTestDag(dagserv, chunk.DefaultSplitter(buf)) if err != nil { t.Fatal(err) } @@ -122,39 +123,13 @@ func arrComp(a, b []byte) error { return nil } -func TestMaybeRabinConsistency(t *testing.T) { - if testing.Short() { - t.SkipNow() - } - testFileConsistency(t, chunk.NewMaybeRabin(4096), 
256*4096) -} - -func TestRabinBlockSize(t *testing.T) { - if testing.Short() { - t.SkipNow() - } - buf := new(bytes.Buffer) - nbytes := 1024 * 1024 - io.CopyN(buf, u.NewTimeSeededRand(), int64(nbytes)) - rab := chunk.NewMaybeRabin(4096) - blkch := rab.Split(buf) - - var blocks [][]byte - for b := range blkch { - blocks = append(blocks, b) - } - - fmt.Printf("Avg block size: %d\n", nbytes/len(blocks)) - -} - type dagservAndPinner struct { ds merkledag.DAGService mp pin.ManualPinner } func TestIndirectBlocks(t *testing.T) { - splitter := &chunk.SizeSplitter{512} + splitter := chunk.SizeSplitterGen(512) nbytes := 1024 * 1024 buf := make([]byte, nbytes) u.NewTimeSeededRand().Read(buf) @@ -162,7 +137,7 @@ func TestIndirectBlocks(t *testing.T) { read := bytes.NewReader(buf) ds := mdtest.Mock(t) - dag, err := buildTestDag(read, ds, splitter) + dag, err := buildTestDag(ds, splitter(read)) if err != nil { t.Fatal(err) } @@ -189,7 +164,7 @@ func TestSeekingBasic(t *testing.T) { read := bytes.NewReader(should) ds := mdtest.Mock(t) - nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500}) + nd, err := buildTestDag(ds, chunk.NewSizeSplitter(read, 512)) if err != nil { t.Fatal(err) } @@ -226,7 +201,7 @@ func TestSeekToBegin(t *testing.T) { read := bytes.NewReader(should) ds := mdtest.Mock(t) - nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500}) + nd, err := buildTestDag(ds, chunk.NewSizeSplitter(read, 500)) if err != nil { t.Fatal(err) } @@ -270,7 +245,7 @@ func TestSeekToAlmostBegin(t *testing.T) { read := bytes.NewReader(should) ds := mdtest.Mock(t) - nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500}) + nd, err := buildTestDag(ds, chunk.NewSizeSplitter(read, 500)) if err != nil { t.Fatal(err) } @@ -314,7 +289,7 @@ func TestSeekEnd(t *testing.T) { read := bytes.NewReader(should) ds := mdtest.Mock(t) - nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500}) + nd, err := buildTestDag(ds, chunk.NewSizeSplitter(read, 500)) if err != nil { t.Fatal(err) } @@ -340,7 +315,7 @@ func TestSeekEndSingleBlockFile(t *testing.T) { read := bytes.NewReader(should) ds := mdtest.Mock(t) - nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{5000}) + nd, err := buildTestDag(ds, chunk.NewSizeSplitter(read, 5000)) if err != nil { t.Fatal(err) } @@ -366,7 +341,7 @@ func TestSeekingStress(t *testing.T) { read := bytes.NewReader(should) ds := mdtest.Mock(t) - nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{1000}) + nd, err := buildTestDag(ds, chunk.NewSizeSplitter(read, 1000)) if err != nil { t.Fatal(err) } @@ -411,7 +386,7 @@ func TestSeekingConsistency(t *testing.T) { read := bytes.NewReader(should) ds := mdtest.Mock(t) - nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500}) + nd, err := buildTestDag(ds, chunk.NewSizeSplitter(read, 500)) if err != nil { t.Fatal(err) } @@ -455,7 +430,7 @@ func TestAppend(t *testing.T) { // Reader for half the bytes read := bytes.NewReader(should[:nbytes/2]) ds := mdtest.Mock(t) - nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500}) + nd, err := buildTestDag(ds, chunk.NewSizeSplitter(read, 500)) if err != nil { t.Fatal(err) } @@ -465,10 +440,10 @@ func TestAppend(t *testing.T) { Maxlinks: h.DefaultLinksPerBlock, } - spl := &chunk.SizeSplitter{500} - blks := spl.Split(bytes.NewReader(should[nbytes/2:])) + r := bytes.NewReader(should[nbytes/2:]) + blks, errs := chunk.Chan(chunk.NewSizeSplitter(r, 500)) - nnode, err := TrickleAppend(nd, dbp.New(blks)) + nnode, err := TrickleAppend(nd, dbp.New(blks, errs)) if err != nil { t.Fatal(err) } @@ -504,7 +479,7 
@@ func TestMultipleAppends(t *testing.T) { u.NewTimeSeededRand().Read(should) read := bytes.NewReader(nil) - nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500}) + nd, err := buildTestDag(ds, chunk.NewSizeSplitter(read, 500)) if err != nil { t.Fatal(err) } @@ -514,12 +489,12 @@ func TestMultipleAppends(t *testing.T) { Maxlinks: 4, } - spl := &chunk.SizeSplitter{500} + spl := chunk.SizeSplitterGen(500) for i := 0; i < len(should); i++ { - blks := spl.Split(bytes.NewReader(should[i : i+1])) + blks, errs := chunk.Chan(spl(bytes.NewReader(should[i : i+1]))) - nnode, err := TrickleAppend(nd, dbp.New(blks)) + nnode, err := TrickleAppend(nd, dbp.New(blks, errs)) if err != nil { t.Fatal(err) } @@ -559,18 +534,18 @@ func TestAppendSingleBytesToEmpty(t *testing.T) { Maxlinks: 4, } - spl := &chunk.SizeSplitter{500} + spl := chunk.SizeSplitterGen(500) - blks := spl.Split(bytes.NewReader(data[:1])) + blks, errs := chunk.Chan(spl(bytes.NewReader(data[:1]))) - nnode, err := TrickleAppend(nd, dbp.New(blks)) + nnode, err := TrickleAppend(nd, dbp.New(blks, errs)) if err != nil { t.Fatal(err) } - blks = spl.Split(bytes.NewReader(data[1:])) + blks, errs = chunk.Chan(spl(bytes.NewReader(data[1:]))) - nnode, err = TrickleAppend(nnode, dbp.New(blks)) + nnode, err = TrickleAppend(nnode, dbp.New(blks, errs)) if err != nil { t.Fatal(err) } diff --git a/merkledag/merkledag_test.go b/merkledag/merkledag_test.go index fc110bfd7..4a87f6d84 100644 --- a/merkledag/merkledag_test.go +++ b/merkledag/merkledag_test.go @@ -163,9 +163,9 @@ func runBatchFetchTest(t *testing.T, read io.Reader) { dagservs = append(dagservs, NewDAGService(bsi)) } - spl := &chunk.SizeSplitter{512} + spl := chunk.NewSizeSplitter(read, 512) - root, err := imp.BuildDagFromReader(read, dagservs[0], spl, nil) + root, err := imp.BuildDagFromReader(dagservs[0], spl, nil) if err != nil { t.Fatal(err) } diff --git a/unixfs/mod/dagmodifier.go b/unixfs/mod/dagmodifier.go index 48374c10b..be7d92248 100644 --- a/unixfs/mod/dagmodifier.go +++ b/unixfs/mod/dagmodifier.go @@ -40,7 +40,7 @@ type DagModifier struct { curNode *mdag.Node mp pin.ManualPinner - splitter chunk.BlockSplitter + splitter chunk.SplitterGen ctx context.Context readCancel func() @@ -51,7 +51,7 @@ type DagModifier struct { read *uio.DagReader } -func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*DagModifier, error) { +func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, mp pin.ManualPinner, spl chunk.SplitterGen) (*DagModifier, error) { return &DagModifier{ curNode: from.Copy(), dagserv: serv, @@ -106,10 +106,10 @@ func (zr zeroReader) Read(b []byte) (int, error) { // expandSparse grows the file with zero blocks of 4096 // A small blocksize is chosen to aid in deduplication func (dm *DagModifier) expandSparse(size int64) error { - spl := chunk.SizeSplitter{4096} r := io.LimitReader(zeroReader{}, size) - blks := spl.Split(r) - nnode, err := dm.appendData(dm.curNode, blks) + spl := chunk.NewSizeSplitter(r, 4096) + blks, errs := chunk.Chan(spl) + nnode, err := dm.appendData(dm.curNode, blks, errs) if err != nil { return err } @@ -196,8 +196,8 @@ func (dm *DagModifier) Sync() error { // need to write past end of current dag if !done { - blks := dm.splitter.Split(dm.wrBuf) - nd, err = dm.appendData(dm.curNode, blks) + blks, errs := chunk.Chan(dm.splitter(dm.wrBuf)) + nd, err = dm.appendData(dm.curNode, blks, errs) if err != nil { return err } @@ -306,14 +306,14 @@ func (dm *DagModifier) 
modifyDag(node *mdag.Node, offset uint64, data io.Reader) } // appendData appends the blocks from the given chan to the end of this dag -func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte) (*mdag.Node, error) { +func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte, errs <-chan error) (*mdag.Node, error) { dbp := &help.DagBuilderParams{ Dagserv: dm.dagserv, Maxlinks: help.DefaultLinksPerBlock, NodeCB: imp.BasicPinnerCB(dm.mp), } - return trickle.TrickleAppend(node, dbp.New(blks)) + return trickle.TrickleAppend(node, dbp.New(blks, errs)) } // Read data from this dag starting at the current offset diff --git a/unixfs/mod/dagmodifier_test.go b/unixfs/mod/dagmodifier_test.go index e7db0d97d..b4a501dd4 100644 --- a/unixfs/mod/dagmodifier_test.go +++ b/unixfs/mod/dagmodifier_test.go @@ -53,7 +53,7 @@ func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.Blocksto func getNode(t testing.TB, dserv mdag.DAGService, size int64, pinner pin.ManualPinner) ([]byte, *mdag.Node) { in := io.LimitReader(u.NewTimeSeededRand(), size) - node, err := imp.BuildTrickleDagFromReader(in, dserv, &chunk.SizeSplitter{500}, imp.BasicPinnerCB(pinner)) + node, err := imp.BuildTrickleDagFromReader(dserv, sizeSplitterGen(500)(in), imp.BasicPinnerCB(pinner)) if err != nil { t.Fatal(err) } @@ -117,13 +117,19 @@ func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier) return orig } +func sizeSplitterGen(size int64) chunk.SplitterGen { + return func(r io.Reader) chunk.Splitter { + return chunk.NewSizeSplitter(r, size) + } +} + func TestDagModifierBasic(t *testing.T) { dserv, pin := getMockDagServ(t) b, n := getNode(t, dserv, 50000, pin) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, pin, &chunk.SizeSplitter{Size: 512}) + dagmod, err := NewDagModifier(ctx, n, dserv, pin, sizeSplitterGen(512)) if err != nil { t.Fatal(err) } @@ -178,7 +184,7 @@ func TestMultiWrite(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512}) + dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512)) if err != nil { t.Fatal(err) } @@ -231,7 +237,7 @@ func TestMultiWriteAndFlush(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512}) + dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512)) if err != nil { t.Fatal(err) } @@ -279,7 +285,7 @@ func TestWriteNewFile(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512}) + dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512)) if err != nil { t.Fatal(err) } @@ -322,7 +328,7 @@ func TestMultiWriteCoal(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512}) + dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512)) if err != nil { t.Fatal(err) } @@ -368,7 +374,7 @@ func TestLargeWriteChunks(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512}) + dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512)) if err != nil 
{ t.Fatal(err) } @@ -406,7 +412,7 @@ func TestDagTruncate(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512}) + dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512)) if err != nil { t.Fatal(err) } @@ -437,7 +443,7 @@ func TestSparseWrite(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512}) + dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512)) if err != nil { t.Fatal(err) } @@ -491,7 +497,7 @@ func TestCorrectPinning(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512}) + dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512)) if err != nil { t.Fatal(err) } @@ -598,7 +604,7 @@ func BenchmarkDagmodWrite(b *testing.B) { wrsize := 4096 - dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512}) + dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512)) if err != nil { b.Fatal(err) }
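
For reference, a minimal sketch (not part of the patch) of how a caller drives the Splitter interface this changeset introduces: pull chunks with NextBytes until io.EOF, or adapt a Splitter to channels with chunk.Chan the way the DAG builder now does. The sample program and its variable names are illustrative, and the import path github.com/ipfs/go-ipfs/importer/chunk is assumed from the file layout above; only the chunk.NewRabin, chunk.NewSizeSplitter, NextBytes and Chan calls come from the diff itself.

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/ipfs/go-ipfs/importer/chunk"
)

func main() {
	data := bytes.Repeat([]byte("ipfs "), 1<<20)

	// Pull-style consumption: keep calling NextBytes until io.EOF.
	rab := chunk.NewRabin(bytes.NewReader(data), 1024*256)
	for {
		b, err := rab.NextBytes()
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
		fmt.Printf("rabin chunk: %d bytes\n", len(b))
	}

	// Channel-style consumption, as dbp.New(blkch, errch) expects:
	// chunk.Chan adapts any Splitter into a block channel plus an error channel.
	blkch, errch := chunk.Chan(chunk.NewSizeSplitter(bytes.NewReader(data), chunk.DefaultBlockSize))
	for b := range blkch {
		fmt.Printf("fixed-size chunk: %d bytes\n", len(b))
	}
	if err := <-errch; err != nil && err != io.EOF {
		panic(err)
	}
}

Note that Chan forwards the splitter's final io.EOF on the error channel before closing it, so consumers should treat io.EOF there as normal termination rather than a failure.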