From 78454884db316d58c37e0a6addd3a7ed155a4c69 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 15 Sep 2014 04:17:29 +0000 Subject: [PATCH] clean up code, make it have a nicer interface --- importer/importer.go | 4 +- importer/importer_test.go | 43 ++----------------- importer/rabin.go | 88 ++++++++++++++++----------------------- importer/splitting.go | 46 ++++++++++---------- 4 files changed, 67 insertions(+), 114 deletions(-) diff --git a/importer/importer.go b/importer/importer.go index bdc5a30e0..607a989a6 100644 --- a/importer/importer.go +++ b/importer/importer.go @@ -20,11 +20,11 @@ var ErrSizeLimitExceeded = fmt.Errorf("object size limit exceeded") // NewDagFromReader constructs a Merkle DAG from the given io.Reader. // size required for block construction. func NewDagFromReader(r io.Reader) (*dag.Node, error) { - return NewDagFromReaderWithSplitter(r, SplitterBySize(1024*512)) + return NewDagFromReaderWithSplitter(r, &SizeSplitter{1024 * 512}) } func NewDagFromReaderWithSplitter(r io.Reader, spl BlockSplitter) (*dag.Node, error) { - blkChan := spl(r) + blkChan := spl.Split(r) root := &dag.Node{Data: dag.FilePBData()} for blk := range blkChan { diff --git a/importer/importer_test.go b/importer/importer_test.go index dbd917708..cefe4a9b4 100644 --- a/importer/importer_test.go +++ b/importer/importer_test.go @@ -34,30 +34,15 @@ func TestBuildDag(t *testing.T) { //Test where calls to read are smaller than the chunk size func TestSizeBasedSplit(t *testing.T) { - bs := SplitterBySize(512) + bs := &SizeSplitter{512} testFileConsistency(t, bs, 32*512) - bs = SplitterBySize(4096) + bs = &SizeSplitter{4096} testFileConsistency(t, bs, 32*4096) // Uneven offset testFileConsistency(t, bs, 31*4095) } -func TestOtherSplit(t *testing.T) { - //split := WhyrusleepingCantImplementRabin - //testFileConsistency(t, split, 4096*64) -} - -type testData struct{ n uint64 } - -func (t *testData) Read(b []byte) (int, error) { - for i, _ := range b { - b[i] = byte(t.n % 256) - t.n++ - } - return len(b), nil -} - func testFileConsistency(t *testing.T, bs BlockSplitter, nbytes int) { buf := new(bytes.Buffer) io.CopyN(buf, rand.Reader, int64(nbytes)) @@ -95,27 +80,5 @@ func arrComp(a, b []byte) error { } func TestMaybeRabinConsistency(t *testing.T) { - testFileConsistency(t, ThisMightBeRabin, 256*4096) -} - -func TestRabinSplit(t *testing.T) { - - //Generate some random data - nbytes := 256 * 4096 - buf := new(bytes.Buffer) - io.CopyN(buf, rand.Reader, int64(nbytes)) - good := buf.Bytes() - - // Get block generator for random data - ch := ThisMightBeRabin(buf) - - i := 0 - var blocks [][]byte - for blk := range ch { - if !bytes.Equal(blk, good[i:len(blk)+i]) { - t.Fatalf("bad block! %v", blk[:32]) - } - i += len(blk) - blocks = append(blocks, blk) - } + testFileConsistency(t, NewMaybeRabin(4096), 256*4096) } diff --git a/importer/rabin.go b/importer/rabin.go index 4348c93b3..4671239ac 100644 --- a/importer/rabin.go +++ b/importer/rabin.go @@ -5,93 +5,79 @@ import ( "bytes" "fmt" "io" + "math" ) -//pseudocode stolen from the internet -func rollhash(S []byte) { - a := 10 - mask := 0xfff - MOD := 33554383 //randomly chosen - windowSize := 16 - an := 1 - rollingHash := 0 - for i := 0; i < windowSize; i++ { - rollingHash = (rollingHash*a + int(S[i])) % MOD - an = (an * a) % MOD - } - if rollingHash&mask == mask { - // "match" - fmt.Println("match") - } - for i := 1; i < len(S)-windowSize; i++ { - rollingHash = (rollingHash*a + int(S[i+windowSize-1]) - an*int(S[i-1])) % MOD - if rollingHash&mask == mask { - //print "match" - fmt.Println("match") - } - } +type MaybeRabin struct { + mask int + windowSize int } -func ThisMightBeRabin(r io.Reader) chan []byte { - out := make(chan []byte) +func NewMaybeRabin(avgBlkSize int) *MaybeRabin { + blkbits := uint(math.Log2(float64(avgBlkSize))) + rb := new(MaybeRabin) + rb.mask = (1 << blkbits) - 1 + rb.windowSize = 16 // probably a good number... + return rb +} + +func (mr *MaybeRabin) Split(r io.Reader) chan []byte { + out := make(chan []byte, 16) go func() { inbuf := bufio.NewReader(r) blkbuf := new(bytes.Buffer) - // some bullshit numbers - a := 10 - mask := 0xfff //make this smaller for smaller blocks - MOD := 33554383 //randomly chosen - windowSize := 16 + // some bullshit numbers i made up + a := 10 // honestly, no idea what this is + MOD := 33554383 // randomly chosen (seriously) an := 1 rollingHash := 0 - window := make([]byte, windowSize) - get := func(i int) int { return int(window[i%len(window)]) } - set := func(i int, val byte) { window[i%len(window)] = val } + // Window is a circular buffer + window := make([]byte, mr.windowSize) + push := func(i int, val byte) (outval int) { + outval = int(window[i%len(window)]) + window[i%len(window)] = val + return + } + + // Duplicate byte slice dup := func(b []byte) []byte { d := make([]byte, len(b)) copy(d, b) return d } + // Fill up the window i := 0 - for ; i < windowSize; i++ { + for ; i < mr.windowSize; i++ { b, err := inbuf.ReadByte() if err != nil { fmt.Println(err) return } blkbuf.WriteByte(b) - window[i] = b + push(i, b) rollingHash = (rollingHash*a + int(b)) % MOD an = (an * a) % MOD } - /* This is too short for a block - if rollingHash&mask == mask { - // "match" - fmt.Println("match") - } - */ + for ; true; i++ { b, err := inbuf.ReadByte() if err != nil { break } - outval := get(i) - set(i, b) + outval := push(i, b) blkbuf.WriteByte(b) - rollingHash = (rollingHash*a + get(i) - an*outval) % MOD - if rollingHash&mask == mask { - //print "match" + rollingHash = (rollingHash*a + int(b) - an*outval) % MOD + if rollingHash&mr.mask == mr.mask { out <- dup(blkbuf.Bytes()) blkbuf.Reset() } - peek, err := inbuf.Peek(windowSize) - if err != nil { - break - } - if len(peek) != windowSize { + + // Check if there are enough remaining + peek, err := inbuf.Peek(mr.windowSize) + if err != nil || len(peek) != mr.windowSize { break } } diff --git a/importer/splitting.go b/importer/splitting.go index a4c19bf20..30ecc1e3e 100644 --- a/importer/splitting.go +++ b/importer/splitting.go @@ -6,29 +6,33 @@ import ( u "github.com/jbenet/go-ipfs/util" ) -type BlockSplitter func(io.Reader) chan []byte +type BlockSplitter interface { + Split(io.Reader) chan []byte +} -func SplitterBySize(n int) BlockSplitter { - return func(r io.Reader) chan []byte { - out := make(chan []byte) - go func(n int) { - defer close(out) - for { - chunk := make([]byte, n) - nread, err := r.Read(chunk) - if err != nil { - if err == io.EOF { - return - } - u.PErr("block split error: %v\n", err) +type SizeSplitter struct { + Size int +} + +func (ss *SizeSplitter) Split(r io.Reader) chan []byte { + out := make(chan []byte) + go func() { + defer close(out) + for { + chunk := make([]byte, ss.Size) + nread, err := r.Read(chunk) + if err != nil { + if err == io.EOF { return } - if nread < n { - chunk = chunk[:nread] - } - out <- chunk + u.PErr("block split error: %v\n", err) + return } - }(n) - return out - } + if nread < ss.Size { + chunk = chunk[:nread] + } + out <- chunk + } + }() + return out }