From 5c802ae8524004bf37db96e98ae518bf1cf90252 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 7 Oct 2014 05:55:28 +0000 Subject: [PATCH] add more tests and rework a lot of utility structures --- importer/dagwriter/dagmodifier.go | 23 ++++---- importer/dagwriter/dagmodifier_test.go | 38 ++----------- importer/dagwriter/dagwriter.go | 7 ++- importer/dagwriter/dagwriter_test.go | 6 +- importer/format/format_test.go | 36 ++++++++++++ importer/importer.go | 3 + importer/rabin.go | 40 -------------- importer/splitting.go | 39 +------------ util/util.go | 76 ++++++++++++++++++++++++++ util/util_test.go | 35 +++++++++++- 10 files changed, 175 insertions(+), 128 deletions(-) create mode 100644 importer/format/format_test.go diff --git a/importer/dagwriter/dagmodifier.go b/importer/dagwriter/dagmodifier.go index f1fa5b348..b749da22e 100644 --- a/importer/dagwriter/dagmodifier.go +++ b/importer/dagwriter/dagmodifier.go @@ -1,6 +1,7 @@ package dagwriter import ( + "bytes" "errors" "code.google.com/p/goprotobuf/proto" @@ -18,19 +19,21 @@ type DagModifier struct { dagserv *mdag.DAGService curNode *mdag.Node - pbdata *ft.PBData + pbdata *ft.PBData + splitter imp.BlockSplitter } -func NewDagModifier(from *mdag.Node, serv *mdag.DAGService) (*DagModifier, error) { +func NewDagModifier(from *mdag.Node, serv *mdag.DAGService, spl imp.BlockSplitter) (*DagModifier, error) { pbd, err := ft.FromBytes(from.Data) if err != nil { return nil, err } return &DagModifier{ - curNode: from.Copy(), - dagserv: serv, - pbdata: pbd, + curNode: from.Copy(), + dagserv: serv, + pbdata: pbd, + splitter: spl, }, nil } @@ -136,8 +139,7 @@ func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) { b = append(b, data[midoff:]...) } - // TODO: dont assume a splitting func here - subblocks := splitBytes(b, &imp.SizeSplitter2{512}) + subblocks := splitBytes(b, dm.splitter) var links []*mdag.Link var sizes []uint64 for _, sb := range subblocks { @@ -168,11 +170,8 @@ func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) { return origlen, nil } -func splitBytes(b []byte, spl imp.StreamSplitter) [][]byte { - ch := make(chan []byte) - out := spl.Split(ch) - ch <- b - close(ch) +func splitBytes(b []byte, spl imp.BlockSplitter) [][]byte { + out := spl.Split(bytes.NewReader(b)) var arr [][]byte for blk := range out { arr = append(arr, blk) diff --git a/importer/dagwriter/dagmodifier_test.go b/importer/dagwriter/dagmodifier_test.go index 7d5497258..129d836d5 100644 --- a/importer/dagwriter/dagmodifier_test.go +++ b/importer/dagwriter/dagmodifier_test.go @@ -4,46 +4,18 @@ import ( "fmt" "io" "io/ioutil" - "math/rand" "testing" - "time" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/op/go-logging" bs "github.com/jbenet/go-ipfs/blockservice" imp "github.com/jbenet/go-ipfs/importer" ft "github.com/jbenet/go-ipfs/importer/format" mdag "github.com/jbenet/go-ipfs/merkledag" + u "github.com/jbenet/go-ipfs/util" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" ) -type randGen struct { - src rand.Source -} - -func newRand() *randGen { - return &randGen{rand.NewSource(time.Now().UnixNano())} -} - -func (r *randGen) Read(p []byte) (n int, err error) { - todo := len(p) - offset := 0 - for { - val := int64(r.src.Int63()) - for i := 0; i < 8; i++ { - p[offset] = byte(val & 0xff) - todo-- - if todo == 0 { - return len(p), nil - } - offset++ - val >>= 8 - } - } - - panic("unreachable") -} - func getMockDagServ(t *testing.T) *mdag.DAGService { dstore := ds.NewMapDatastore() bserv, err := bs.NewBlockService(dstore, nil) @@ -54,9 +26,9 @@ func getMockDagServ(t *testing.T) *mdag.DAGService { } func getNode(t *testing.T, dserv *mdag.DAGService, size int64) ([]byte, *mdag.Node) { - dw := NewDagWriter(dserv, &imp.SizeSplitter2{500}) + dw := NewDagWriter(dserv, &imp.SizeSplitter{500}) - n, err := io.CopyN(dw, newRand(), size) + n, err := io.CopyN(dw, u.NewFastRand(), size) if err != nil { t.Fatal(err) } @@ -82,7 +54,7 @@ func getNode(t *testing.T, dserv *mdag.DAGService, size int64) ([]byte, *mdag.No func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier) []byte { newdata := make([]byte, size) - r := newRand() + r := u.NewFastRand() r.Read(newdata) if size+beg > uint64(len(orig)) { @@ -127,7 +99,7 @@ func TestDagModifierBasic(t *testing.T) { dserv := getMockDagServ(t) b, n := getNode(t, dserv, 50000) - dagmod, err := NewDagModifier(n, dserv) + dagmod, err := NewDagModifier(n, dserv, &imp.SizeSplitter{512}) if err != nil { t.Fatal(err) } diff --git a/importer/dagwriter/dagwriter.go b/importer/dagwriter/dagwriter.go index 23c33a144..239618136 100644 --- a/importer/dagwriter/dagwriter.go +++ b/importer/dagwriter/dagwriter.go @@ -15,11 +15,11 @@ type DagWriter struct { totalSize int64 splChan chan []byte done chan struct{} - splitter imp.StreamSplitter + splitter imp.BlockSplitter seterr error } -func NewDagWriter(ds *dag.DAGService, splitter imp.StreamSplitter) *DagWriter { +func NewDagWriter(ds *dag.DAGService, splitter imp.BlockSplitter) *DagWriter { dw := new(DagWriter) dw.dagserv = ds dw.splChan = make(chan []byte, 8) @@ -30,7 +30,8 @@ func NewDagWriter(ds *dag.DAGService, splitter imp.StreamSplitter) *DagWriter { } func (dw *DagWriter) startSplitter() { - blkchan := dw.splitter.Split(dw.splChan) + r := util.NewByteChanReader(dw.splChan) + blkchan := dw.splitter.Split(r) first := <-blkchan mbf := new(ft.MultiBlock) root := new(dag.Node) diff --git a/importer/dagwriter/dagwriter_test.go b/importer/dagwriter/dagwriter_test.go index b7b6cc21b..ec6d56449 100644 --- a/importer/dagwriter/dagwriter_test.go +++ b/importer/dagwriter/dagwriter_test.go @@ -54,7 +54,7 @@ func TestDagWriter(t *testing.T) { t.Fatal(err) } dag := &mdag.DAGService{bserv} - dw := NewDagWriter(dag, &imp.SizeSplitter2{4096}) + dw := NewDagWriter(dag, &imp.SizeSplitter{4096}) nbytes := int64(1024 * 1024 * 2) n, err := io.CopyN(dw, &datasource{}, nbytes) @@ -88,7 +88,7 @@ func TestMassiveWrite(t *testing.T) { t.Fatal(err) } dag := &mdag.DAGService{bserv} - dw := NewDagWriter(dag, &imp.SizeSplitter2{4096}) + dw := NewDagWriter(dag, &imp.SizeSplitter{4096}) nbytes := int64(1024 * 1024 * 1024 * 16) n, err := io.CopyN(dw, &datasource{}, nbytes) @@ -113,7 +113,7 @@ func BenchmarkDagWriter(b *testing.B) { nbytes := int64(b.N) for i := 0; i < b.N; i++ { b.SetBytes(nbytes) - dw := NewDagWriter(dag, &imp.SizeSplitter2{4096}) + dw := NewDagWriter(dag, &imp.SizeSplitter{4096}) n, err := io.CopyN(dw, &datasource{}, nbytes) if err != nil { b.Fatal(err) diff --git a/importer/format/format_test.go b/importer/format/format_test.go new file mode 100644 index 000000000..06194aefc --- /dev/null +++ b/importer/format/format_test.go @@ -0,0 +1,36 @@ +package format + +import ( + "testing" + + "code.google.com/p/goprotobuf/proto" +) + +func TestMultiBlock(t *testing.T) { + mbf := new(MultiBlock) + for i := 0; i < 15; i++ { + mbf.AddBlockSize(100) + } + + mbf.Data = make([]byte, 128) + + b, err := mbf.GetBytes() + if err != nil { + t.Fatal(err) + } + + pbn := new(PBData) + err = proto.Unmarshal(b, pbn) + if err != nil { + t.Fatal(err) + } + + ds, err := DataSize(b) + if err != nil { + t.Fatal(err) + } + + if ds != (100*15)+128 { + t.Fatal("Datasize calculations incorrect!") + } +} diff --git a/importer/importer.go b/importer/importer.go index e64f440b5..3c92eb3ab 100644 --- a/importer/importer.go +++ b/importer/importer.go @@ -7,8 +7,11 @@ import ( ft "github.com/jbenet/go-ipfs/importer/format" dag "github.com/jbenet/go-ipfs/merkledag" + "github.com/jbenet/go-ipfs/util" ) +var log = util.Logger("importer") + // BlockSizeLimit specifies the maximum size an imported block can have. var BlockSizeLimit = int64(1048576) // 1 MB diff --git a/importer/rabin.go b/importer/rabin.go index 3eab5bc9c..64b6f8d3f 100644 --- a/importer/rabin.go +++ b/importer/rabin.go @@ -92,43 +92,3 @@ func (mr *MaybeRabin) Split(r io.Reader) chan []byte { }() return out } - -/* -func WhyrusleepingCantImplementRabin(r io.Reader) chan []byte { - out := make(chan []byte, 4) - go func() { - buf := bufio.NewReader(r) - blkbuf := new(bytes.Buffer) - window := make([]byte, 16) - var val uint64 - prime := uint64(61) - - get := func(i int) uint64 { - return uint64(window[i%len(window)]) - } - - set := func(i int, val byte) { - window[i%len(window)] = val - } - - for i := 0; ; i++ { - curb, err := buf.ReadByte() - if err != nil { - break - } - set(i, curb) - blkbuf.WriteByte(curb) - - hash := md5.Sum(window) - if hash[0] == 0 && hash[1] == 0 { - out <- blkbuf.Bytes() - blkbuf.Reset() - } - } - out <- blkbuf.Bytes() - close(out) - }() - - return out -} -*/ diff --git a/importer/splitting.go b/importer/splitting.go index 65b672486..7b6e56cee 100644 --- a/importer/splitting.go +++ b/importer/splitting.go @@ -1,19 +1,9 @@ package importer -import ( - "io" +import "io" - u "github.com/jbenet/go-ipfs/util" -) - -// OLD type BlockSplitter interface { - Split(io.Reader) chan []byte -} - -// NEW -type StreamSplitter interface { - Split(chan []byte) chan []byte + Split(r io.Reader) chan []byte } type SizeSplitter struct { @@ -34,7 +24,7 @@ func (ss *SizeSplitter) Split(r io.Reader) chan []byte { } return } - u.PErr("block split error: %v\n", err) + log.Error("Block split error: %s", err) return } if nread < ss.Size { @@ -45,26 +35,3 @@ func (ss *SizeSplitter) Split(r io.Reader) chan []byte { }() return out } - -type SizeSplitter2 struct { - Size int -} - -func (ss *SizeSplitter2) Split(in chan []byte) chan []byte { - out := make(chan []byte) - go func() { - defer close(out) - var buf []byte - for b := range in { - buf = append(buf, b...) - for len(buf) > ss.Size { - out <- buf[:ss.Size] - buf = buf[ss.Size:] - } - } - if len(buf) > 0 { - out <- buf - } - }() - return out -} diff --git a/util/util.go b/util/util.go index 3a55188dc..5a9d21e81 100644 --- a/util/util.go +++ b/util/util.go @@ -3,10 +3,13 @@ package util import ( "errors" "fmt" + "io" + "math/rand" "os" "os/user" "path/filepath" "strings" + "time" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" b58 "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58" @@ -154,3 +157,76 @@ func ExpandPathnames(paths []string) ([]string, error) { } return out, nil } + +// byteChanReader wraps a byte chan in a reader +type byteChanReader struct { + in chan []byte + buf []byte +} + +func NewByteChanReader(in chan []byte) io.Reader { + return &byteChanReader{in: in} +} + +func (bcr *byteChanReader) Read(b []byte) (int, error) { + if len(bcr.buf) == 0 { + data, ok := <-bcr.in + if !ok { + return 0, io.EOF + } + bcr.buf = data + } + + if len(bcr.buf) >= len(b) { + copy(b, bcr.buf) + bcr.buf = bcr.buf[len(b):] + return len(b), nil + } + + copy(b, bcr.buf) + b = b[len(bcr.buf):] + totread := len(bcr.buf) + + for data := range bcr.in { + if len(data) > len(b) { + totread += len(b) + copy(b, data[:len(b)]) + bcr.buf = data[len(b):] + return totread, nil + } + copy(b, data) + totread += len(data) + b = b[len(data):] + if len(b) == 0 { + return totread, nil + } + } + return totread, io.EOF +} + +type randGen struct { + src rand.Source +} + +func NewFastRand() io.Reader { + return &randGen{rand.NewSource(time.Now().UnixNano())} +} + +func (r *randGen) Read(p []byte) (n int, err error) { + todo := len(p) + offset := 0 + for { + val := int64(r.src.Int63()) + for i := 0; i < 8; i++ { + p[offset] = byte(val & 0xff) + todo-- + if todo == 0 { + return len(p), nil + } + offset++ + val >>= 8 + } + } + + panic("unreachable") +} diff --git a/util/util_test.go b/util/util_test.go index 821a97e66..a85c492fe 100644 --- a/util/util_test.go +++ b/util/util_test.go @@ -2,8 +2,11 @@ package util import ( "bytes" - mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" + "io/ioutil" + "math/rand" "testing" + + mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" ) func TestKey(t *testing.T) { @@ -25,3 +28,33 @@ func TestKey(t *testing.T) { t.Error("Keys not equal.") } } + +func TestByteChanReader(t *testing.T) { + data := make([]byte, 1024*1024) + r := NewFastRand() + r.Read(data) + dch := make(chan []byte, 8) + + go func() { + beg := 0 + for i := 0; i < len(data); { + i += rand.Intn(100) + 1 + if i > len(data) { + i = len(data) + } + dch <- data[beg:i] + beg = i + } + close(dch) + }() + + read := NewByteChanReader(dch) + out, err := ioutil.ReadAll(read) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(out, data) { + t.Fatal("Reader failed to stream correct bytes") + } +}