diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json
index 3bbcd876e..9b5342910 100644
--- a/Godeps/Godeps.json
+++ b/Godeps/Godeps.json
@@ -1,6 +1,6 @@
 {
 	"ImportPath": "github.com/jbenet/go-ipfs",
-	"GoVersion": "go1.3",
+	"GoVersion": "go1.3.3",
 	"Packages": [
 		"./..."
 	],
diff --git a/core/commands/add.go b/core/commands/add.go
index 7bcc5119f..57c9666bb 100644
--- a/core/commands/add.go
+++ b/core/commands/add.go
@@ -10,11 +10,9 @@ import (
 
 	"github.com/jbenet/go-ipfs/core"
 	"github.com/jbenet/go-ipfs/importer"
-	"github.com/jbenet/go-ipfs/importer/chunk"
 	dag "github.com/jbenet/go-ipfs/merkledag"
 	"github.com/jbenet/go-ipfs/pin"
 	ft "github.com/jbenet/go-ipfs/unixfs"
-	uio "github.com/jbenet/go-ipfs/unixfs/io"
 )
 
 // Error indicating the max depth has been exceded.
@@ -90,14 +88,12 @@ func addDir(n *core.IpfsNode, fpath string, depth int, out io.Writer) (*dag.Node
 }
 
 func addFile(n *core.IpfsNode, fpath string, depth int, out io.Writer) (*dag.Node, error) {
-	dw := uio.NewDagWriter(n.DAG, chunk.DefaultSplitter)
 	mp, ok := n.Pinning.(pin.ManualPinner)
 	if !ok {
 		return nil, errors.New("invalid pinner type! expected manual pinner")
 	}
-	dw.Pinner = mp
 
-	root, err := importer.ImportFileDag(fpath, dw)
+	root, err := importer.BuildDagFromFile(fpath, n.DAG, mp)
 	if err != nil {
 		return nil, err
 	}
diff --git a/importer/importer.go b/importer/importer.go
index 214c94d69..ac8fe03ca 100644
--- a/importer/importer.go
+++ b/importer/importer.go
@@ -7,8 +7,8 @@ import (
 
 	"github.com/jbenet/go-ipfs/importer/chunk"
 	dag "github.com/jbenet/go-ipfs/merkledag"
+	"github.com/jbenet/go-ipfs/pin"
 	ft "github.com/jbenet/go-ipfs/unixfs"
-	uio "github.com/jbenet/go-ipfs/unixfs/io"
 	"github.com/jbenet/go-ipfs/util"
 )
 
@@ -74,7 +74,7 @@ func NewDagFromFile(fpath string) (*dag.Node, error) {
 	return NewDagFromReader(f)
 }
 
-func ImportFileDag(fpath string, dw *uio.DagWriter) (*dag.Node, error) {
+func BuildDagFromFile(fpath string, ds dag.DAGService, mp pin.ManualPinner) (*dag.Node, error) {
 	stat, err := os.Stat(fpath)
 	if err != nil {
 		return nil, err
 	}
@@ -90,17 +90,50 @@ func ImportFileDag(fpath string, dw *uio.DagWriter) (*dag.Node, error) {
 	}
 	defer f.Close()
 
-	return ImportReaderDag(f, dw)
+	return BuildDagFromReader(f, ds, mp, chunk.DefaultSplitter)
 }
 
-func ImportReaderDag(r io.Reader, dw *uio.DagWriter) (*dag.Node, error) {
-	_, err := io.Copy(dw, r)
+func BuildDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) {
+	blkChan := spl.Split(r)
+	first := <-blkChan
+	root := &dag.Node{}
+
+	mbf := new(ft.MultiBlock)
+	for blk := range blkChan {
+		// Store the block size in the root node
+		mbf.AddBlockSize(uint64(len(blk)))
+		node := &dag.Node{Data: ft.WrapData(blk)}
+		nk, err := ds.Add(node)
+		if mp != nil {
+			mp.PinWithMode(nk, pin.Indirect)
+		}
+		if err != nil {
+			return nil, err
+		}
+
+		// Add a link to this node without storing a reference to the memory
+		err = root.AddNodeLinkClean("", node)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	// Generate the root node data
+	mbf.Data = first
+	data, err := mbf.GetBytes()
 	if err != nil {
 		return nil, err
 	}
-	err = dw.Close()
+	root.Data = data
+
+	// Add root node to the dagservice
+	rootk, err := ds.Add(root)
 	if err != nil {
 		return nil, err
 	}
-	return dw.GetNode(), nil
+	if mp != nil {
+		mp.PinWithMode(rootk, pin.Recursive)
+	}
+
+	return root, nil
 }
diff --git a/importer/importer_test.go b/importer/importer_test.go
index fcf872383..638ce9b8b 100644
--- a/importer/importer_test.go
+++ b/importer/importer_test.go
@@ -9,7 +9,10 @@ import (
 	"os"
 	"testing"
 
+	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
+	bsrv "github.com/jbenet/go-ipfs/blockservice"
 	"github.com/jbenet/go-ipfs/importer/chunk"
+	dag "github.com/jbenet/go-ipfs/merkledag"
 	uio "github.com/jbenet/go-ipfs/unixfs/io"
 )
 
@@ -77,6 +80,37 @@ func testFileConsistency(t *testing.T, bs chunk.BlockSplitter, nbytes int) {
 	}
 }
 
+func TestBuilderConsistency(t *testing.T) {
+	nbytes := 100000
+	dstore := ds.NewMapDatastore()
+	bserv, err := bsrv.NewBlockService(dstore, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	dagserv := dag.NewDAGService(bserv)
+	buf := new(bytes.Buffer)
+	io.CopyN(buf, rand.Reader, int64(nbytes))
+	should := dup(buf.Bytes())
+	nd, err := BuildDagFromReader(buf, dagserv, nil, chunk.DefaultSplitter)
+	if err != nil {
+		t.Fatal(err)
+	}
+	r, err := uio.NewDagReader(nd, dagserv)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	out, err := ioutil.ReadAll(r)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = arrComp(out, should)
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
 func arrComp(a, b []byte) error {
 	if len(a) != len(b) {
 		return fmt.Errorf("Arrays differ in length. %d != %d", len(a), len(b))
diff --git a/routing/dht/query.go b/routing/dht/query.go
index cd9fae98c..557af095c 100644
--- a/routing/dht/query.go
+++ b/routing/dht/query.go
@@ -207,7 +207,7 @@ func (r *dhtQueryRunner) spawnWorkers() {
 }
 
 func (r *dhtQueryRunner) queryPeer(p peer.Peer) {
-	log.Debugf("spawned worker for: %v\n", p)
+	log.Debugf("spawned worker for: %v", p)
 
 	// make sure we rate limit concurrency.
 	select {
@@ -256,7 +256,7 @@ func (r *dhtQueryRunner) queryPeer(p peer.Peer) {
 		r.cancel() // signal to everyone that we're done.
 
 	} else if res.closerPeers != nil {
-		log.Debugf("PEERS CLOSER -- worker for: %v\n", p)
+		log.Debugf("PEERS CLOSER -- worker for: %v", p)
 		for _, next := range res.closerPeers {
 			r.addPeerToQuery(next, p)
 		}
diff --git a/unixfs/io/dagmodifier.go b/unixfs/io/dagmodifier.go
index ebec24cfc..a05b9d6ed 100644
--- a/unixfs/io/dagmodifier.go
+++ b/unixfs/io/dagmodifier.go
@@ -13,6 +13,8 @@ import (
 	u "github.com/jbenet/go-ipfs/util"
 )
 
+var log = u.Logger("dagio")
+
 // DagModifier is the only struct licensed and able to correctly
 // perform surgery on a DAG 'file'
 // Dear god, please rename this to something more pleasant
diff --git a/unixfs/io/dagmodifier_test.go b/unixfs/io/dagmodifier_test.go
index edb4d6f76..822c87471 100644
--- a/unixfs/io/dagmodifier_test.go
+++ b/unixfs/io/dagmodifier_test.go
@@ -7,6 +7,7 @@ import (
 	"testing"
 
 	bs "github.com/jbenet/go-ipfs/blockservice"
+	imp "github.com/jbenet/go-ipfs/importer"
 	"github.com/jbenet/go-ipfs/importer/chunk"
 	mdag "github.com/jbenet/go-ipfs/merkledag"
 	ft "github.com/jbenet/go-ipfs/unixfs"
@@ -26,22 +27,11 @@ func getMockDagServ(t *testing.T) mdag.DAGService {
 }
 
 func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Node) {
-	dw := NewDagWriter(dserv, &chunk.SizeSplitter{500})
-
-	n, err := io.CopyN(dw, u.NewTimeSeededRand(), size)
+	in := io.LimitReader(u.NewTimeSeededRand(), size)
+	node, err := imp.BuildDagFromReader(in, dserv, nil, &chunk.SizeSplitter{500})
 	if err != nil {
 		t.Fatal(err)
 	}
-	if n != size {
-		t.Fatal("Incorrect copy amount!")
-	}
-
-	err = dw.Close()
-	if err != nil {
-		t.Fatal("DagWriter failed to close,", err)
-	}
-
-	node := dw.GetNode()
 
 	dr, err := NewDagReader(node, dserv)
 	if err != nil {
diff --git a/unixfs/io/dagwriter.go b/unixfs/io/dagwriter.go
deleted file mode 100644
index 6575b1edf..000000000
--- a/unixfs/io/dagwriter.go
+++ /dev/null
@@ -1,115 +0,0 @@
-package io
-
-import (
-	"github.com/jbenet/go-ipfs/importer/chunk"
-	dag "github.com/jbenet/go-ipfs/merkledag"
-	"github.com/jbenet/go-ipfs/pin"
-	ft "github.com/jbenet/go-ipfs/unixfs"
-	"github.com/jbenet/go-ipfs/util"
-)
-
-var log = util.Logger("dagwriter")
-
-type DagWriter struct {
-	dagserv   dag.DAGService
-	node      *dag.Node
-	totalSize int64
-	splChan   chan []byte
-	done      chan struct{}
-	splitter  chunk.BlockSplitter
-	seterr    error
-	Pinner    pin.ManualPinner
-}
-
-func NewDagWriter(ds dag.DAGService, splitter chunk.BlockSplitter) *DagWriter {
-	dw := new(DagWriter)
-	dw.dagserv = ds
-	dw.splChan = make(chan []byte, 8)
-	dw.splitter = splitter
-	dw.done = make(chan struct{})
-	go dw.startSplitter()
-	return dw
-}
-
-// startSplitter manages splitting incoming bytes and
-// creating dag nodes from them. Created nodes are stored
-// in the DAGService and then released to the GC.
-func (dw *DagWriter) startSplitter() {
-
-	// Since the splitter functions take a reader (and should!)
-	// we wrap our byte chan input in a reader
-	r := util.NewByteChanReader(dw.splChan)
-	blkchan := dw.splitter.Split(r)
-
-	// First data block is reserved for storage in the root node
-	first := <-blkchan
-	mbf := new(ft.MultiBlock)
-	root := new(dag.Node)
-
-	for blkData := range blkchan {
-		// Store the block size in the root node
-		mbf.AddBlockSize(uint64(len(blkData)))
-		node := &dag.Node{Data: ft.WrapData(blkData)}
-		nk, err := dw.dagserv.Add(node)
-		if dw.Pinner != nil {
-			dw.Pinner.PinWithMode(nk, pin.Indirect)
-		}
-		if err != nil {
-			dw.seterr = err
-			log.Critical("Got error adding created node to dagservice: %s", err)
-			return
-		}
-
-		// Add a link to this node without storing a reference to the memory
-		err = root.AddNodeLinkClean("", node)
-		if err != nil {
-			dw.seterr = err
-			log.Critical("Got error adding created node to root node: %s", err)
-			return
-		}
-	}
-
-	// Generate the root node data
-	mbf.Data = first
-	data, err := mbf.GetBytes()
-	if err != nil {
-		dw.seterr = err
-		log.Critical("Failed generating bytes for multiblock file: %s", err)
-		return
-	}
-	root.Data = data
-
-	// Add root node to the dagservice
-	rootk, err := dw.dagserv.Add(root)
-	if err != nil {
-		dw.seterr = err
-		log.Critical("Got error adding created node to dagservice: %s", err)
-		return
-	}
-	if dw.Pinner != nil {
-		dw.Pinner.PinWithMode(rootk, pin.Recursive)
-	}
-	dw.node = root
-	dw.done <- struct{}{}
-}
-
-func (dw *DagWriter) Write(b []byte) (int, error) {
-	if dw.seterr != nil {
-		return 0, dw.seterr
-	}
-	dw.splChan <- b
-	return len(b), nil
-}
-
-// Close the splitters input channel and wait for it to finish
-// Must be called to finish up splitting, otherwise split method
-// will never halt
-func (dw *DagWriter) Close() error {
-	close(dw.splChan)
-	<-dw.done
-	return nil
-}
-
-func (dw *DagWriter) GetNode() *dag.Node {
-	return dw.node
-}
diff --git a/unixfs/io/dagwriter_test.go b/unixfs/io/dagwriter_test.go
deleted file mode 100644
index edb4a0a70..000000000
--- a/unixfs/io/dagwriter_test.go
+++ /dev/null
@@ -1,171 +0,0 @@
-package io_test
-
-import (
-	"testing"
-
-	"io"
-
-	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
-	bs "github.com/jbenet/go-ipfs/blockservice"
-	importer "github.com/jbenet/go-ipfs/importer"
-	chunk "github.com/jbenet/go-ipfs/importer/chunk"
-	mdag "github.com/jbenet/go-ipfs/merkledag"
-	dagio "github.com/jbenet/go-ipfs/unixfs/io"
-)
-
-type datasource struct {
-	i int
-}
-
-func (d *datasource) Read(b []byte) (int, error) {
-	for i, _ := range b {
-		b[i] = byte(d.i % 256)
-		d.i++
-	}
-	return len(b), nil
-}
-
-func (d *datasource) Matches(t *testing.T, r io.Reader, length int) bool {
-	b := make([]byte, 100)
-	i := 0
-	for {
-		n, err := r.Read(b)
-		if err != nil && err != io.EOF {
-			t.Fatal(err)
-		}
-		for _, v := range b[:n] {
-			if v != byte(i%256) {
-				t.Fatalf("Buffers differed at byte: %d (%d != %d)", i, v, (i % 256))
-			}
-			i++
-		}
-		if err == io.EOF {
-			break
-		}
-	}
-	if i != length {
-		t.Fatalf("Incorrect length. (%d != %d)", i, length)
-	}
-	return true
-}
-
-func TestDagWriter(t *testing.T) {
-	dstore := ds.NewMapDatastore()
-	bserv, err := bs.NewBlockService(dstore, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-	dag := mdag.NewDAGService(bserv)
-	dw := dagio.NewDagWriter(dag, &chunk.SizeSplitter{Size: 4096})
-
-	nbytes := int64(1024 * 1024 * 2)
-	n, err := io.CopyN(dw, &datasource{}, nbytes)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	if n != nbytes {
-		t.Fatal("Copied incorrect amount of bytes!")
-	}
-
-	dw.Close()
-
-	node := dw.GetNode()
-	read, err := dagio.NewDagReader(node, dag)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	d := &datasource{}
-	if !d.Matches(t, read, int(nbytes)) {
-		t.Fatal("Failed to validate!")
-	}
-}
-
-func TestMassiveWrite(t *testing.T) {
-	t.SkipNow()
-	dstore := ds.NewNullDatastore()
-	bserv, err := bs.NewBlockService(dstore, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-	dag := mdag.NewDAGService(bserv)
-	dw := dagio.NewDagWriter(dag, &chunk.SizeSplitter{Size: 4096})
-
-	nbytes := int64(1024 * 1024 * 1024 * 16)
-	n, err := io.CopyN(dw, &datasource{}, nbytes)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if n != nbytes {
-		t.Fatal("Incorrect copy size.")
-	}
-	dw.Close()
-}
-
-func BenchmarkDagWriter(b *testing.B) {
-	dstore := ds.NewNullDatastore()
-	bserv, err := bs.NewBlockService(dstore, nil)
-	if err != nil {
-		b.Fatal(err)
-	}
-	dag := mdag.NewDAGService(bserv)
-
-	b.ResetTimer()
-	nbytes := int64(100000)
-	for i := 0; i < b.N; i++ {
-		b.SetBytes(nbytes)
-		dw := dagio.NewDagWriter(dag, &chunk.SizeSplitter{Size: 4096})
-		n, err := io.CopyN(dw, &datasource{}, nbytes)
-		if err != nil {
-			b.Fatal(err)
-		}
-		if n != nbytes {
-			b.Fatal("Incorrect copy size.")
-		}
-		dw.Close()
-	}
-
-}
-
-func TestAgainstImporter(t *testing.T) {
-	dstore := ds.NewMapDatastore()
-	bserv, err := bs.NewBlockService(dstore, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-	dag := mdag.NewDAGService(bserv)
-
-	nbytes := int64(1024 * 1024 * 2)
-
-	// DagWriter
-	dw := dagio.NewDagWriter(dag, &chunk.SizeSplitter{4096})
-	n, err := io.CopyN(dw, &datasource{}, nbytes)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if n != nbytes {
-		t.Fatal("Copied incorrect amount of bytes!")
-	}
-
-	dw.Close()
-	dwNode := dw.GetNode()
-	dwKey, err := dwNode.Key()
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// DagFromFile
-	rl := &io.LimitedReader{&datasource{}, nbytes}
-
-	dffNode, err := importer.NewDagFromReaderWithSplitter(rl, &chunk.SizeSplitter{4096})
-	dffKey, err := dffNode.Key()
-	if err != nil {
-		t.Fatal(err)
-	}
-	if dwKey.String() != dffKey.String() {
-		t.Errorf("\nDagWriter produced %s\n"+
-			"DagFromReader produced %s",
-			dwKey, dffKey)
-	}
-}
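
For call sites migrating off the removed `DagWriter`, the whole build now happens in one synchronous call. A minimal sketch of the new usage, assuming a `dag.DAGService` and a `pin.ManualPinner` are already in hand (`buildFile` and its package are illustrative, not part of this change):

```go
package example

import (
	"os"

	imp "github.com/jbenet/go-ipfs/importer"
	"github.com/jbenet/go-ipfs/importer/chunk"
	dag "github.com/jbenet/go-ipfs/merkledag"
	"github.com/jbenet/go-ipfs/pin"
)

// buildFile replaces the old NewDagWriter / io.Copy / Close / GetNode
// sequence with a single call. Chunking, leaf-node creation, and pinning
// (Indirect for leaf blocks, Recursive for the root) all happen inside
// BuildDagFromReader.
func buildFile(path string, ds dag.DAGService, mp pin.ManualPinner) (*dag.Node, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	// A nil pinner is accepted; the builder then skips pinning entirely.
	return imp.BuildDagFromReader(f, ds, mp, chunk.DefaultSplitter)
}
```

`importer.BuildDagFromFile` wraps the same call in the open/stat boilerplate, which is what `core/commands/add.go` now uses.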
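The updated tests all settle on one pattern for exercising the builder: wire an in-memory `DAGService`, build, then read back through `NewDagReader`. A condensed, self-contained version of that round trip (the test name and sizes here are illustrative; the constructors are the ones used by `TestBuilderConsistency` above):

```go
package io_test

import (
	"bytes"
	"io"
	"io/ioutil"
	"testing"

	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
	bsrv "github.com/jbenet/go-ipfs/blockservice"
	imp "github.com/jbenet/go-ipfs/importer"
	"github.com/jbenet/go-ipfs/importer/chunk"
	dag "github.com/jbenet/go-ipfs/merkledag"
	uio "github.com/jbenet/go-ipfs/unixfs/io"
	u "github.com/jbenet/go-ipfs/util"
)

// TestRoundTripSketch mirrors TestBuilderConsistency; it is a sketch,
// not part of this diff.
func TestRoundTripSketch(t *testing.T) {
	// In-memory stack: map datastore -> block service -> DAG service.
	dstore := ds.NewMapDatastore()
	bserv, err := bsrv.NewBlockService(dstore, nil)
	if err != nil {
		t.Fatal(err)
	}
	dserv := dag.NewDAGService(bserv)

	// Buffer some random bytes and keep a copy to compare against,
	// since BuildDagFromReader drains the buffer.
	buf := new(bytes.Buffer)
	io.CopyN(buf, u.NewTimeSeededRand(), 100000)
	want := append([]byte(nil), buf.Bytes()...)

	nd, err := imp.BuildDagFromReader(buf, dserv, nil, &chunk.SizeSplitter{Size: 500})
	if err != nil {
		t.Fatal(err)
	}

	// Read the DAG back and verify it matches the input.
	r, err := uio.NewDagReader(nd, dserv)
	if err != nil {
		t.Fatal(err)
	}
	got, err := ioutil.ReadAll(r)
	if err != nil {
		t.Fatal(err)
	}
	if !bytes.Equal(got, want) {
		t.Fatal("round trip produced different bytes")
	}
}
```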