From 1a7c083850d2d34031304cce7286e3bd4eca5ef4 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 15 Sep 2014 02:04:55 +0000 Subject: [PATCH] implement something like rabin fingerprinting --- importer/importer_test.go | 97 +++++++++++++++++++++----- importer/rabin.go | 143 ++++++++++++++++++++++++++++++++++++++ importer/split_test.go | 48 ------------- importer/splitting.go | 50 ------------- 4 files changed, 223 insertions(+), 115 deletions(-) create mode 100644 importer/rabin.go delete mode 100644 importer/split_test.go diff --git a/importer/importer_test.go b/importer/importer_test.go index 534bce262..dbd917708 100644 --- a/importer/importer_test.go +++ b/importer/importer_test.go @@ -3,42 +3,66 @@ package importer import ( "bytes" "crypto/rand" + "fmt" "io" "io/ioutil" + "os" "testing" dag "github.com/jbenet/go-ipfs/merkledag" ) -func TestFileConsistency(t *testing.T) { - buf := new(bytes.Buffer) - io.CopyN(buf, rand.Reader, 512*32) - should := buf.Bytes() - nd, err := NewDagFromReaderWithSplitter(buf, SplitterBySize(512)) - if err != nil { - t.Fatal(err) - } - r, err := dag.NewDagReader(nd, nil) +func TestBuildDag(t *testing.T) { + td := os.TempDir() + fi, err := os.Create(td + "/tmpfi") if err != nil { t.Fatal(err) } - out, err := ioutil.ReadAll(r) + _, err = io.CopyN(fi, rand.Reader, 1024*1024) if err != nil { t.Fatal(err) } - if !bytes.Equal(out, should) { - t.Fatal("Output not the same as input.") + fi.Close() + + _, err = NewDagFromFile(td + "/tmpfi") + if err != nil { + t.Fatal(err) } } //Test where calls to read are smaller than the chunk size -func TestFileConsistencyLargeBlocks(t *testing.T) { +func TestSizeBasedSplit(t *testing.T) { + bs := SplitterBySize(512) + testFileConsistency(t, bs, 32*512) + bs = SplitterBySize(4096) + testFileConsistency(t, bs, 32*4096) + + // Uneven offset + testFileConsistency(t, bs, 31*4095) +} + +func TestOtherSplit(t *testing.T) { + //split := WhyrusleepingCantImplementRabin + //testFileConsistency(t, split, 4096*64) +} + +type testData struct{ n uint64 } + +func (t *testData) Read(b []byte) (int, error) { + for i, _ := range b { + b[i] = byte(t.n % 256) + t.n++ + } + return len(b), nil +} + +func testFileConsistency(t *testing.T, bs BlockSplitter, nbytes int) { buf := new(bytes.Buffer) - io.CopyN(buf, rand.Reader, 4096*32) + io.CopyN(buf, rand.Reader, int64(nbytes)) should := buf.Bytes() - nd, err := NewDagFromReaderWithSplitter(buf, SplitterBySize(4096)) + nd, err := NewDagFromReaderWithSplitter(buf, bs) if err != nil { t.Fatal(err) } @@ -52,7 +76,46 @@ func TestFileConsistencyLargeBlocks(t *testing.T) { t.Fatal(err) } - if !bytes.Equal(out, should) { - t.Fatal("Output not the same as input.") + err = arrComp(out, should) + if err != nil { + t.Fatal(err) + } +} + +func arrComp(a, b []byte) error { + if len(a) != len(b) { + return fmt.Errorf("Arrays differ in length. %d != %d", len(a), len(b)) + } + for i, v := range a { + if v != b[i] { + return fmt.Errorf("Arrays differ at index: %d", i) + } + } + return nil +} + +func TestMaybeRabinConsistency(t *testing.T) { + testFileConsistency(t, ThisMightBeRabin, 256*4096) +} + +func TestRabinSplit(t *testing.T) { + + //Generate some random data + nbytes := 256 * 4096 + buf := new(bytes.Buffer) + io.CopyN(buf, rand.Reader, int64(nbytes)) + good := buf.Bytes() + + // Get block generator for random data + ch := ThisMightBeRabin(buf) + + i := 0 + var blocks [][]byte + for blk := range ch { + if !bytes.Equal(blk, good[i:len(blk)+i]) { + t.Fatalf("bad block! %v", blk[:32]) + } + i += len(blk) + blocks = append(blocks, blk) } } diff --git a/importer/rabin.go b/importer/rabin.go new file mode 100644 index 000000000..4348c93b3 --- /dev/null +++ b/importer/rabin.go @@ -0,0 +1,143 @@ +package importer + +import ( + "bufio" + "bytes" + "fmt" + "io" +) + +//pseudocode stolen from the internet +func rollhash(S []byte) { + a := 10 + mask := 0xfff + MOD := 33554383 //randomly chosen + windowSize := 16 + an := 1 + rollingHash := 0 + for i := 0; i < windowSize; i++ { + rollingHash = (rollingHash*a + int(S[i])) % MOD + an = (an * a) % MOD + } + if rollingHash&mask == mask { + // "match" + fmt.Println("match") + } + for i := 1; i < len(S)-windowSize; i++ { + rollingHash = (rollingHash*a + int(S[i+windowSize-1]) - an*int(S[i-1])) % MOD + if rollingHash&mask == mask { + //print "match" + fmt.Println("match") + } + } +} + +func ThisMightBeRabin(r io.Reader) chan []byte { + out := make(chan []byte) + go func() { + inbuf := bufio.NewReader(r) + blkbuf := new(bytes.Buffer) + + // some bullshit numbers + a := 10 + mask := 0xfff //make this smaller for smaller blocks + MOD := 33554383 //randomly chosen + windowSize := 16 + an := 1 + rollingHash := 0 + + window := make([]byte, windowSize) + get := func(i int) int { return int(window[i%len(window)]) } + set := func(i int, val byte) { window[i%len(window)] = val } + dup := func(b []byte) []byte { + d := make([]byte, len(b)) + copy(d, b) + return d + } + + i := 0 + for ; i < windowSize; i++ { + b, err := inbuf.ReadByte() + if err != nil { + fmt.Println(err) + return + } + blkbuf.WriteByte(b) + window[i] = b + rollingHash = (rollingHash*a + int(b)) % MOD + an = (an * a) % MOD + } + /* This is too short for a block + if rollingHash&mask == mask { + // "match" + fmt.Println("match") + } + */ + for ; true; i++ { + b, err := inbuf.ReadByte() + if err != nil { + break + } + outval := get(i) + set(i, b) + blkbuf.WriteByte(b) + rollingHash = (rollingHash*a + get(i) - an*outval) % MOD + if rollingHash&mask == mask { + //print "match" + out <- dup(blkbuf.Bytes()) + blkbuf.Reset() + } + peek, err := inbuf.Peek(windowSize) + if err != nil { + break + } + if len(peek) != windowSize { + break + } + } + io.Copy(blkbuf, inbuf) + out <- blkbuf.Bytes() + close(out) + }() + return out +} + +/* +func WhyrusleepingCantImplementRabin(r io.Reader) chan []byte { + out := make(chan []byte, 4) + go func() { + buf := bufio.NewReader(r) + blkbuf := new(bytes.Buffer) + window := make([]byte, 16) + var val uint64 + prime := uint64(61) + + get := func(i int) uint64 { + return uint64(window[i%len(window)]) + } + + set := func(i int, val byte) { + window[i%len(window)] = val + } + + for i := 0; ; i++ { + curb, err := buf.ReadByte() + if err != nil { + break + } + set(i, curb) + blkbuf.WriteByte(curb) + + hash := md5.Sum(window) + if hash[0] == 0 && hash[1] == 0 { + out <- blkbuf.Bytes() + blkbuf.Reset() + } + } + out <- blkbuf.Bytes() + close(out) + }() + + return out +} +*/ diff --git a/importer/split_test.go b/importer/split_test.go deleted file mode 100644 index fe3a4d0e5..000000000 --- a/importer/split_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package importer - -import ( - "bytes" - "crypto/rand" - "testing" -) - -func TestDataSplitting(t *testing.T) { - buf := make([]byte, 16*1024*1024) - rand.Read(buf) - split := Rabin(buf) - - if len(split) == 1 { - t.Fatal("No split occurred!") - } - - min := 2 << 15 - max := 0 - - mxcount := 0 - - n := 0 - for _, b := range split { - if !bytes.Equal(b, buf[n:n+len(b)]) { - t.Fatal("Split lost data!") - } - n += len(b) - - if len(b) < min { - min = len(b) - } - - if len(b) > max { - max = len(b) - } - - if len(b) == 16384 { - mxcount++ - } - } - - if n != len(buf) { - t.Fatal("missing some bytes!") - } - t.Log(len(split)) - t.Log(min, max, mxcount) -} diff --git a/importer/splitting.go b/importer/splitting.go index d2690c784..a4c19bf20 100644 --- a/importer/splitting.go +++ b/importer/splitting.go @@ -32,53 +32,3 @@ func SplitterBySize(n int) BlockSplitter { return out } } - -// TODO: this should take a reader, not a byte array. what if we're splitting a 3TB file? -//Rabin Fingerprinting for file chunking -func Rabin(b []byte) [][]byte { - var out [][]byte - windowsize := uint64(48) - chunkMax := 1024 * 16 - minBlkSize := 2048 - blkBegI := 0 - prime := uint64(61) - - var poly uint64 - var curchecksum uint64 - - // Smaller than a window? Get outa here! - if len(b) <= int(windowsize) { - return [][]byte{b} - } - - i := 0 - for n := i; i < n+int(windowsize); i++ { - cur := uint64(b[i]) - curchecksum = (curchecksum * prime) + cur - poly = (poly * prime) + cur - } - - for ; i < len(b); i++ { - cur := uint64(b[i]) - curchecksum = (curchecksum * prime) + cur - poly = (poly * prime) + cur - curchecksum -= (uint64(b[i-1]) * prime) - - if i-blkBegI >= chunkMax { - // push block - out = append(out, b[blkBegI:i]) - blkBegI = i - } - - // first 13 bits of polynomial are 0 - if poly%8192 == 0 && i-blkBegI >= minBlkSize { - // push block - out = append(out, b[blkBegI:i]) - blkBegI = i - } - } - if i > blkBegI { - out = append(out, b[blkBegI:]) - } - return out -}