diff --git a/importer/helpers/helpers.go b/importer/helpers/helpers.go index be0415db0..494fb87f2 100644 --- a/importer/helpers/helpers.go +++ b/importer/helpers/helpers.go @@ -7,6 +7,7 @@ import ( dag "github.com/jbenet/go-ipfs/merkledag" "github.com/jbenet/go-ipfs/pin" ft "github.com/jbenet/go-ipfs/unixfs" + u "github.com/jbenet/go-ipfs/util" ) // BlockSizeLimit specifies the maximum size an imported block can have. @@ -115,7 +116,11 @@ func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error { } // Removes the child node at the given index -func (n *UnixfsNode) RemoveChild(index int) { +func (n *UnixfsNode) RemoveChild(index int, dbh *DagBuilderHelper) { + k := u.Key(n.node.Links[index].Hash) + if dbh.mp != nil { + dbh.mp.RemovePinWithMode(k, pin.Indirect) + } n.ufmt.RemoveBlockSize(index) n.node.Links = append(n.node.Links[:index], n.node.Links[index+1:]...) } diff --git a/importer/trickle/trickledag.go b/importer/trickle/trickledag.go index 76a7110b6..fa217bb4c 100644 --- a/importer/trickle/trickledag.go +++ b/importer/trickle/trickledag.go @@ -134,7 +134,7 @@ func appendFillLastChild(ufsn *h.UnixfsNode, depth int, layerFill int, db *h.Dag } // Update changed child in parent node - ufsn.RemoveChild(last) + ufsn.RemoveChild(last, db) err = ufsn.AddChild(nchild, db) if err != nil { return err diff --git a/pin/pin.go b/pin/pin.go index 68e627c1e..7ae36f607 100644 --- a/pin/pin.go +++ b/pin/pin.go @@ -47,6 +47,7 @@ type Pinner interface { // may not be successful type ManualPinner interface { PinWithMode(util.Key, PinMode) + RemovePinWithMode(util.Key, PinMode) Pinner } @@ -198,6 +199,20 @@ func (p *pinner) IsPinned(key util.Key) bool { p.indirPin.HasKey(key) } +func (p *pinner) RemovePinWithMode(key util.Key, mode PinMode) { + switch mode { + case Direct: + p.directPin.RemoveBlock(key) + case Indirect: + p.indirPin.Decrement(key) + case Recursive: + p.recursePin.RemoveBlock(key) + default: + // programmer error, panic OK + panic("unrecognized pin type") + } +} + // LoadPinner loads a pinner and its keysets from the given datastore func LoadPinner(d ds.ThreadSafeDatastore, dserv mdag.DAGService) (Pinner, error) { p := new(pinner) diff --git a/unixfs/mod/dagmodifier.go b/unixfs/mod/dagmodifier.go index 61f480e9e..e08c3bf86 100644 --- a/unixfs/mod/dagmodifier.go +++ b/unixfs/mod/dagmodifier.go @@ -172,13 +172,19 @@ func (dm *DagModifier) Flush() error { // Number of bytes we're going to write buflen := dm.wrBuf.Len() - // overwrite existing dag nodes - k, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf) + // Grab key for unpinning after mod operation + curk, err := dm.curNode.Key() if err != nil { return err } - nd, err := dm.dagserv.Get(k) + // overwrite existing dag nodes + thisk, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf) + if err != nil { + return err + } + + nd, err := dm.dagserv.Get(thisk) if err != nil { return err } @@ -193,7 +199,7 @@ func (dm *DagModifier) Flush() error { return err } - _, err := dm.dagserv.Add(nd) + thisk, err = dm.dagserv.Add(nd) if err != nil { return err } @@ -201,6 +207,14 @@ func (dm *DagModifier) Flush() error { dm.curNode = nd } + // Finalize correct pinning, and flush pinner + dm.mp.PinWithMode(thisk, pin.Recursive) + dm.mp.RemovePinWithMode(curk, pin.Recursive) + err = dm.mp.Flush() + if err != nil { + return err + } + dm.writeStart += uint64(buflen) dm.wrBuf = nil @@ -237,7 +251,7 @@ func (dm *DagModifier) modifyDag(node *mdag.Node, offset uint64, data io.Reader) // Hey look! we're done! var done bool - if n < len(f.Data) { + if n < len(f.Data[offset:]) { done = true } @@ -249,6 +263,10 @@ func (dm *DagModifier) modifyDag(node *mdag.Node, offset uint64, data io.Reader) for i, bs := range f.GetBlocksizes() { // We found the correct child to write into if cur+bs > offset { + // Unpin block + ckey := u.Key(node.Links[i].Hash) + dm.mp.RemovePinWithMode(ckey, pin.Indirect) + child, err := node.Links[i].GetNode(dm.dagserv) if err != nil { return "", false, err @@ -258,14 +276,24 @@ func (dm *DagModifier) modifyDag(node *mdag.Node, offset uint64, data io.Reader) return "", false, err } + // pin the new node + dm.mp.PinWithMode(k, pin.Indirect) + offset += bs node.Links[i].Hash = mh.Multihash(k) + // Recache serialized node + _, err = node.Encoded(true) + if err != nil { + return "", false, err + } + if sdone { // No more bytes to write! done = true break } + offset = cur + bs } cur += bs } @@ -293,7 +321,8 @@ func (dm *DagModifier) Read(b []byte) (int, error) { } if dm.read == nil { - dr, err := uio.NewDagReader(dm.ctx, dm.curNode, dm.dagserv) + ctx, cancel := context.WithCancel(dm.ctx) + dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv) if err != nil { return 0, err } @@ -307,6 +336,7 @@ func (dm *DagModifier) Read(b []byte) (int, error) { return 0, ErrSeekFail } + dm.readCancel = cancel dm.read = dr } @@ -451,5 +481,11 @@ func dagTruncate(nd *mdag.Node, size uint64, ds mdag.DAGService) (*mdag.Node, er nd.Data = d + // invalidate cache and recompute serialized data + _, err = nd.Encoded(true) + if err != nil { + return nil, err + } + return nd, nil } diff --git a/unixfs/mod/dagmodifier_test.go b/unixfs/mod/dagmodifier_test.go index 90c6d99e5..d5ae29d7d 100644 --- a/unixfs/mod/dagmodifier_test.go +++ b/unixfs/mod/dagmodifier_test.go @@ -4,6 +4,8 @@ import ( "fmt" "io" "io/ioutil" + "math/rand" + "os" "testing" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" @@ -15,6 +17,7 @@ import ( h "github.com/jbenet/go-ipfs/importer/helpers" trickle "github.com/jbenet/go-ipfs/importer/trickle" mdag "github.com/jbenet/go-ipfs/merkledag" + pin "github.com/jbenet/go-ipfs/pin" ft "github.com/jbenet/go-ipfs/unixfs" uio "github.com/jbenet/go-ipfs/unixfs/io" u "github.com/jbenet/go-ipfs/util" @@ -23,7 +26,7 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" ) -func getMockDagServ(t testing.TB) mdag.DAGService { +func getMockDagServ(t testing.TB) (mdag.DAGService, pin.ManualPinner) { dstore := ds.NewMapDatastore() tsds := sync.MutexWrap(dstore) bstore := blockstore.NewBlockstore(tsds) @@ -31,12 +34,25 @@ func getMockDagServ(t testing.TB) mdag.DAGService { if err != nil { t.Fatal(err) } - return mdag.NewDAGService(bserv) + dserv := mdag.NewDAGService(bserv) + return dserv, pin.NewPinner(tsds, dserv).GetManual() } -func getNode(t testing.TB, dserv mdag.DAGService, size int64) ([]byte, *mdag.Node) { +func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.Blockstore, pin.ManualPinner) { + dstore := ds.NewMapDatastore() + tsds := sync.MutexWrap(dstore) + bstore := blockstore.NewBlockstore(tsds) + bserv, err := bs.New(bstore, offline.Exchange(bstore)) + if err != nil { + t.Fatal(err) + } + dserv := mdag.NewDAGService(bserv) + return dserv, bstore, pin.NewPinner(tsds, dserv).GetManual() +} + +func getNode(t testing.TB, dserv mdag.DAGService, size int64, pinner pin.ManualPinner) ([]byte, *mdag.Node) { in := io.LimitReader(u.NewTimeSeededRand(), size) - node, err := imp.BuildTrickleDagFromReader(in, dserv, nil, &chunk.SizeSplitter{500}) + node, err := imp.BuildTrickleDagFromReader(in, dserv, pinner, &chunk.SizeSplitter{500}) if err != nil { t.Fatal(err) } @@ -101,12 +117,12 @@ func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier) } func TestDagModifierBasic(t *testing.T) { - dserv := getMockDagServ(t) - b, n := getNode(t, dserv, 50000) + dserv, pin := getMockDagServ(t) + b, n := getNode(t, dserv, 50000, pin) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, nil, &chunk.SizeSplitter{Size: 512}) + dagmod, err := NewDagModifier(ctx, n, dserv, pin, &chunk.SizeSplitter{Size: 512}) if err != nil { t.Fatal(err) } @@ -155,13 +171,13 @@ func TestDagModifierBasic(t *testing.T) { } func TestMultiWrite(t *testing.T) { - dserv := getMockDagServ(t) - _, n := getNode(t, dserv, 0) + dserv, pins := getMockDagServ(t) + _, n := getNode(t, dserv, 0, pins) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, nil, &chunk.SizeSplitter{Size: 512}) + dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512}) if err != nil { t.Fatal(err) } @@ -208,13 +224,13 @@ func TestMultiWrite(t *testing.T) { } func TestMultiWriteAndFlush(t *testing.T) { - dserv := getMockDagServ(t) - _, n := getNode(t, dserv, 0) + dserv, pins := getMockDagServ(t) + _, n := getNode(t, dserv, 0, pins) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, nil, &chunk.SizeSplitter{Size: 512}) + dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512}) if err != nil { t.Fatal(err) } @@ -256,13 +272,13 @@ func TestMultiWriteAndFlush(t *testing.T) { } func TestWriteNewFile(t *testing.T) { - dserv := getMockDagServ(t) - _, n := getNode(t, dserv, 0) + dserv, pins := getMockDagServ(t) + _, n := getNode(t, dserv, 0, pins) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, nil, &chunk.SizeSplitter{Size: 512}) + dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512}) if err != nil { t.Fatal(err) } @@ -299,13 +315,13 @@ func TestWriteNewFile(t *testing.T) { } func TestMultiWriteCoal(t *testing.T) { - dserv := getMockDagServ(t) - _, n := getNode(t, dserv, 0) + dserv, pins := getMockDagServ(t) + _, n := getNode(t, dserv, 0, pins) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, nil, &chunk.SizeSplitter{Size: 512}) + dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512}) if err != nil { t.Fatal(err) } @@ -345,13 +361,13 @@ func TestMultiWriteCoal(t *testing.T) { } func TestLargeWriteChunks(t *testing.T) { - dserv := getMockDagServ(t) - _, n := getNode(t, dserv, 0) + dserv, pins := getMockDagServ(t) + _, n := getNode(t, dserv, 0, pins) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, nil, &chunk.SizeSplitter{Size: 512}) + dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512}) if err != nil { t.Fatal(err) } @@ -384,12 +400,12 @@ func TestLargeWriteChunks(t *testing.T) { } func TestDagTruncate(t *testing.T) { - dserv := getMockDagServ(t) - b, n := getNode(t, dserv, 50000) + dserv, pins := getMockDagServ(t) + b, n := getNode(t, dserv, 50000, pins) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, nil, &chunk.SizeSplitter{Size: 512}) + dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512}) if err != nil { t.Fatal(err) } @@ -399,6 +415,11 @@ func TestDagTruncate(t *testing.T) { t.Fatal(err) } + _, err = dagmod.Seek(0, os.SEEK_SET) + if err != nil { + t.Fatal(err) + } + out, err := ioutil.ReadAll(dagmod) if err != nil { t.Fatal(err) @@ -409,16 +430,174 @@ func TestDagTruncate(t *testing.T) { } } +func TestSparseWrite(t *testing.T) { + dserv, pins := getMockDagServ(t) + _, n := getNode(t, dserv, 0, pins) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512}) + if err != nil { + t.Fatal(err) + } + + buf := make([]byte, 5000) + u.NewTimeSeededRand().Read(buf[2500:]) + + wrote, err := dagmod.WriteAt(buf[2500:], 2500) + if err != nil { + t.Fatal(err) + } + + if wrote != 2500 { + t.Fatal("incorrect write amount") + } + + _, err = dagmod.Seek(0, os.SEEK_SET) + if err != nil { + t.Fatal(err) + } + + out, err := ioutil.ReadAll(dagmod) + if err != nil { + t.Fatal(err) + } + + if err = arrComp(out, buf); err != nil { + t.Fatal(err) + } +} + +func basicGC(t *testing.T, bs blockstore.Blockstore, pins pin.ManualPinner) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // in case error occurs during operation + keychan, err := bs.AllKeysChan(ctx) + if err != nil { + t.Fatal(err) + } + for k := range keychan { // rely on AllKeysChan to close chan + if !pins.IsPinned(k) { + err := bs.DeleteBlock(k) + if err != nil { + t.Fatal(err) + } + } + } +} +func TestCorrectPinning(t *testing.T) { + dserv, bstore, pins := getMockDagServAndBstore(t) + b, n := getNode(t, dserv, 50000, pins) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512}) + if err != nil { + t.Fatal(err) + } + + buf := make([]byte, 1024) + for i := 0; i < 100; i++ { + size, err := dagmod.Size() + if err != nil { + t.Fatal(err) + } + offset := rand.Intn(int(size)) + u.NewTimeSeededRand().Read(buf) + + if offset+len(buf) > int(size) { + b = append(b[:offset], buf...) + } else { + copy(b[offset:], buf) + } + + n, err := dagmod.WriteAt(buf, int64(offset)) + if err != nil { + t.Fatal(err) + } + if n != len(buf) { + t.Fatal("wrote incorrect number of bytes") + } + } + + fisize, err := dagmod.Size() + if err != nil { + t.Fatal(err) + } + + if int(fisize) != len(b) { + t.Fatal("reported filesize incorrect", fisize, len(b)) + } + + // Run a GC, then ensure we can still read the file correctly + basicGC(t, bstore, pins) + + nd, err := dagmod.GetNode() + if err != nil { + t.Fatal(err) + } + read, err := uio.NewDagReader(context.Background(), nd, dserv) + if err != nil { + t.Fatal(err) + } + + out, err := ioutil.ReadAll(read) + if err != nil { + t.Fatal(err) + } + + if err = arrComp(out, b); err != nil { + t.Fatal(err) + } + + rootk, err := nd.Key() + if err != nil { + t.Fatal(err) + } + + // Verify only one recursive pin + recpins := pins.RecursiveKeys() + if len(recpins) != 1 { + t.Fatal("Incorrect number of pinned entries") + } + + // verify the correct node is pinned + if recpins[0] != rootk { + t.Fatal("Incorrect node recursively pinned") + } + + indirpins := pins.IndirectKeys() + children := enumerateChildren(t, nd, dserv) + if len(indirpins) != len(children) { + t.Log(len(indirpins), len(children)) + t.Fatal("Incorrect number of indirectly pinned blocks") + } + +} + +func enumerateChildren(t *testing.T, nd *mdag.Node, ds mdag.DAGService) []u.Key { + var out []u.Key + for _, lnk := range nd.Links { + out = append(out, u.Key(lnk.Hash)) + child, err := lnk.GetNode(ds) + if err != nil { + t.Fatal(err) + } + children := enumerateChildren(t, child, ds) + out = append(out, children...) + } + return out +} + func BenchmarkDagmodWrite(b *testing.B) { b.StopTimer() - dserv := getMockDagServ(b) - _, n := getNode(b, dserv, 0) + dserv, pins := getMockDagServ(b) + _, n := getNode(b, dserv, 0, pins) ctx, cancel := context.WithCancel(context.Background()) defer cancel() wrsize := 4096 - dagmod, err := NewDagModifier(ctx, n, dserv, nil, &chunk.SizeSplitter{Size: 512}) + dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512}) if err != nil { b.Fatal(err) }