diff --git a/blocks/blockstore/blockstore.go b/blocks/blockstore/blockstore.go
index 8221ec4a5..51979e653 100644
--- a/blocks/blockstore/blockstore.go
+++ b/blocks/blockstore/blockstore.go
@@ -43,13 +43,13 @@ type GCBlockstore interface {
 	// GCLock locks the blockstore for garbage collection. No operations
 	// that expect to finish with a pin should ocurr simultaneously.
 	// Reading during GC is safe, and requires no lock.
-	GCLock() func()
+	GCLock() Unlocker
 
 	// PinLock locks the blockstore for sequences of puts expected to finish
 	// with a pin (before GC). Multiple put->pin sequences can write through
 	// at the same time, but no GC should not happen simulatenously.
 	// Reading during Pinning is safe, and requires no lock.
-	PinLock() func()
+	PinLock() Unlocker
 
 	// GcRequested returns true if GCLock has been called and is waiting to
 	// take the lock
@@ -198,16 +198,29 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
 	return output, nil
 }
 
-func (bs *blockstore) GCLock() func() {
+type Unlocker interface {
+	Unlock()
+}
+
+type unlocker struct {
+	unlock func()
+}
+
+func (u *unlocker) Unlock() {
+	u.unlock()
+	u.unlock = nil // ensure its not called twice
+}
+
+func (bs *blockstore) GCLock() Unlocker {
 	atomic.AddInt32(&bs.gcreq, 1)
 	bs.lk.Lock()
 	atomic.AddInt32(&bs.gcreq, -1)
-	return bs.lk.Unlock
+	return &unlocker{bs.lk.Unlock}
 }
 
-func (bs *blockstore) PinLock() func() {
+func (bs *blockstore) PinLock() Unlocker {
 	bs.lk.RLock()
-	return bs.lk.RUnlock
+	return &unlocker{bs.lk.RUnlock}
 }
 
 func (bs *blockstore) GCRequested() bool {
diff --git a/blocks/blockstore/write_cache.go b/blocks/blockstore/write_cache.go
index 90109e8a2..2567a7216 100644
--- a/blocks/blockstore/write_cache.go
+++ b/blocks/blockstore/write_cache.go
@@ -59,11 +59,11 @@ func (w *writecache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
 	return w.blockstore.AllKeysChan(ctx)
 }
 
-func (w *writecache) GCLock() func() {
+func (w *writecache) GCLock() Unlocker {
 	return w.blockstore.(GCBlockstore).GCLock()
 }
 
-func (w *writecache) PinLock() func() {
+func (w *writecache) PinLock() Unlocker {
 	return w.blockstore.(GCBlockstore).PinLock()
 }
 
diff --git a/core/commands/files/files.go b/core/commands/files/files.go
index 6d20de2d6..d31fa7668 100644
--- a/core/commands/files/files.go
+++ b/core/commands/files/files.go
@@ -109,8 +109,7 @@ func statNode(ds dag.DAGService, fsn mfs.FSNode) (*Object, error) {
 		return nil, err
 	}
 
-	// add to dagserv to ensure its available
-	k, err := ds.Add(nd)
+	k, err := nd.Key()
 	if err != nil {
 		return nil, err
 	}
@@ -159,6 +158,11 @@ var FilesCpCmd = &cmds.Command{
 			return
 		}
 
+		flush, found, _ := req.Option("flush").Bool()
+		if !found {
+			flush = true
+		}
+
 		src, err := checkPath(req.Arguments()[0])
 		if err != nil {
 			res.SetError(err, cmds.ErrNormal)
 			return
 		}
@@ -181,6 +185,14 @@ var FilesCpCmd = &cmds.Command{
 			res.SetError(err, cmds.ErrNormal)
 			return
 		}
+
+		if flush {
+			err := mfs.FlushPath(node.FilesRoot, dst)
+			if err != nil {
+				res.SetError(err, cmds.ErrNormal)
+				return
+			}
+		}
 	},
 }
 
@@ -501,8 +513,8 @@ Warning:
 		create, _, _ := req.Option("create").Bool()
 		trunc, _, _ := req.Option("truncate").Bool()
-		flush, set, _ := req.Option("flush").Bool()
-		if !set {
+		flush, fset, _ := req.Option("flush").Bool()
+		if !fset {
 			flush = true
 		}
@@ -529,14 +541,7 @@ Warning:
 		}
 
 		if flush {
-			defer func() {
-				fi.Close()
-				err := mfs.FlushPath(nd.FilesRoot, path)
-				if err != nil {
-					res.SetError(err, cmds.ErrNormal)
-					return
-				}
-			}()
+			defer fi.Close()
 		} else {
 			defer fi.Sync()
 		}
@@ -653,9 +658,6 @@ are run with the '--flush=false'.
 			return
 		}
 
-		// take the lock and defer the unlock
-		defer nd.Blockstore.PinLock()()
-
 		path := "/"
 		if len(req.Arguments()) > 0 {
 			path = req.Arguments()[0]
diff --git a/core/commands/pin.go b/core/commands/pin.go
index 55262e371..cc3333328 100644
--- a/core/commands/pin.go
+++ b/core/commands/pin.go
@@ -54,8 +54,7 @@ on disk.
 			return
 		}
 
-		unlock := n.Blockstore.PinLock()
-		defer unlock()
+		defer n.Blockstore.PinLock().Unlock()
 
 		// set recursive flag
 		recursive, found, err := req.Option("recursive").Bool()
diff --git a/core/core.go b/core/core.go
index 5b42545e8..761a17389 100644
--- a/core/core.go
+++ b/core/core.go
@@ -474,23 +474,6 @@ func (n *IpfsNode) loadBootstrapPeers() ([]peer.PeerInfo, error) {
 
 func (n *IpfsNode) loadFilesRoot() error {
 	dsk := ds.NewKey("/local/filesroot")
 	pf := func(ctx context.Context, k key.Key) error {
-		ds := n.Repo.Datastore()
-		if old, err := ds.Get(dsk); err == nil && old != nil {
-			_ = n.Pinning.Unpin(ctx, key.Key(old.([]byte)), true)
-		}
-		nnd, err := n.DAG.Get(ctx, k)
-		if err != nil {
-			return err
-		}
-
-		if err := n.Pinning.Pin(ctx, nnd, true); err != nil {
-			return err
-		}
-
-		if err := n.Pinning.Flush(); err != nil {
-			return err
-		}
-
 		return n.Repo.Datastore().Put(dsk, []byte(k))
 	}
diff --git a/core/coreunix/add.go b/core/coreunix/add.go
index a7f695116..c8a61c306 100644
--- a/core/coreunix/add.go
+++ b/core/coreunix/add.go
@@ -20,6 +20,7 @@ import (
 	"github.com/ipfs/go-ipfs/pin"
 	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
 
+	bs "github.com/ipfs/go-ipfs/blocks/blockstore"
 	"github.com/ipfs/go-ipfs/commands/files"
 	core "github.com/ipfs/go-ipfs/core"
 	dag "github.com/ipfs/go-ipfs/merkledag"
@@ -100,7 +101,7 @@ type Adder struct {
 	Chunker  string
 	root     *dag.Node
 	mr       *mfs.Root
-	unlock   func()
+	unlocker bs.Unlocker
 	tempRoot key.Key
 }
 
@@ -225,8 +226,7 @@ func (adder *Adder) outputDirs(path string, nd *dag.Node) error {
 // Add builds a merkledag from the a reader, pinning all objects to the local
 // datastore. Returns a key representing the root node.
 func Add(n *core.IpfsNode, r io.Reader) (string, error) {
-	unlock := n.Blockstore.PinLock()
-	defer unlock()
+	defer n.Blockstore.PinLock().Unlock()
 
 	fileAdder, err := NewAdder(n.Context(), n, nil)
 	if err != nil {
@@ -247,8 +247,7 @@ func Add(n *core.IpfsNode, r io.Reader) (string, error) {
 
 // AddR recursively adds files in |path|.
 func AddR(n *core.IpfsNode, root string) (key string, err error) {
-	unlock := n.Blockstore.PinLock()
-	defer unlock()
+	n.Blockstore.PinLock().Unlock()
 
 	stat, err := os.Lstat(root)
 	if err != nil {
@@ -296,8 +295,7 @@ func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *dag.No
 	}
 	fileAdder.Wrap = true
 
-	unlock := n.Blockstore.PinLock()
-	defer unlock()
+	defer n.Blockstore.PinLock().Unlock()
 
 	err = fileAdder.addFile(file)
 	if err != nil {
@@ -347,8 +345,10 @@ func (adder *Adder) addNode(node *dag.Node, path string) error {
 
 // Add the given file while respecting the adder.
 func (adder *Adder) AddFile(file files.File) error {
-	adder.unlock = adder.node.Blockstore.PinLock()
-	defer adder.unlock()
+	adder.unlocker = adder.node.Blockstore.PinLock()
+	defer func() {
+		adder.unlocker.Unlock()
+	}()
 
 	return adder.addFile(file)
 }
@@ -434,8 +434,8 @@ func (adder *Adder) maybePauseForGC() error {
 			return err
 		}
 
-		adder.unlock()
-		adder.unlock = adder.node.Blockstore.PinLock()
+		adder.unlocker.Unlock()
+		adder.unlocker = adder.node.Blockstore.PinLock()
 	}
 	return nil
 }
diff --git a/mfs/dir.go b/mfs/dir.go
index c70555bb7..d437b28d7 100644
--- a/mfs/dir.go
+++ b/mfs/dir.go
@@ -51,17 +51,20 @@ func NewDirectory(ctx context.Context, name string, node *dag.Node, parent child
 
 // closeChild updates the child by the given name to the dag node 'nd'
 // and changes its own dag node
-func (d *Directory) closeChild(name string, nd *dag.Node) error {
-	mynd, err := d.closeChildUpdate(name, nd)
+func (d *Directory) closeChild(name string, nd *dag.Node, sync bool) error {
+	mynd, err := d.closeChildUpdate(name, nd, sync)
 	if err != nil {
 		return err
 	}
 
-	return d.parent.closeChild(d.name, mynd)
+	if sync {
+		return d.parent.closeChild(d.name, mynd, true)
+	}
+	return nil
 }
 
 // closeChildUpdate is the portion of closeChild that needs to be locked around
-func (d *Directory) closeChildUpdate(name string, nd *dag.Node) (*dag.Node, error) {
+func (d *Directory) closeChildUpdate(name string, nd *dag.Node, sync bool) (*dag.Node, error) {
 	d.lock.Lock()
 	defer d.lock.Unlock()
 
@@ -70,7 +73,10 @@ func (d *Directory) closeChildUpdate(name string, nd *dag.Node) (*dag.Node, erro
 		return nil, err
 	}
 
-	return d.flushCurrentNode()
+	if sync {
+		return d.flushCurrentNode()
+	}
+	return nil, nil
 }
 
 func (d *Directory) flushCurrentNode() (*dag.Node, error) {
@@ -295,12 +301,15 @@ func (d *Directory) Unlink(name string) error {
 }
 
 func (d *Directory) Flush() error {
+	d.lock.Lock()
 	nd, err := d.flushCurrentNode()
 	if err != nil {
+		d.lock.Unlock()
 		return err
 	}
+	d.lock.Unlock()
 
-	return d.parent.closeChild(d.name, nd)
+	return d.parent.closeChild(d.name, nd, true)
 }
 
 // AddChild adds the node 'nd' under this directory giving it the name 'name'
@@ -335,11 +344,6 @@ func (d *Directory) sync() error {
 			return err
 		}
 
-		_, err = d.dserv.Add(nd)
-		if err != nil {
-			return err
-		}
-
 		err = d.updateChild(name, nd)
 		if err != nil {
 			return err
@@ -352,11 +356,6 @@ func (d *Directory) sync() error {
 			return err
 		}
 
-		_, err = d.dserv.Add(nd)
-		if err != nil {
-			return err
-		}
-
 		err = d.updateChild(name, nd)
 		if err != nil {
 			return err
@@ -385,6 +384,11 @@ func (d *Directory) GetNode() (*dag.Node, error) {
 		return nil, err
 	}
 
+	_, err = d.dserv.Add(d.node)
+	if err != nil {
+		return nil, err
+	}
+
 	return d.node.Copy(), nil
 }
diff --git a/mfs/file.go b/mfs/file.go
index da4737140..99dfff0df 100644
--- a/mfs/file.go
+++ b/mfs/file.go
@@ -73,7 +73,7 @@ func (fi *File) Close() error {
 
 		// explicitly stay locked for flushUp call,
 		// it will manage the lock for us
-		return fi.flushUp()
+		return fi.flushUp(true)
 	}
 	fi.Unlock()
 
@@ -82,7 +82,7 @@ func (fi *File) Close() error {
 
 // flushUp syncs the file and adds it to the dagservice
 // it *must* be called with the File's lock taken
-func (fi *File) flushUp() error {
+func (fi *File) flushUp(fullsync bool) error {
 	nd, err := fi.mod.GetNode()
 	if err != nil {
 		fi.Unlock()
@@ -95,20 +95,18 @@ func (fi *File) flushUp() error {
 		return err
 	}
 
-	//name := fi.name
-	//parent := fi.parent
+	name := fi.name
+	parent := fi.parent
 
 	// explicit unlock *only* before closeChild call
 	fi.Unlock()
-	return nil
-	//return parent.closeChild(name, nd)
+	return parent.closeChild(name, nd, fullsync)
 }
 
 // Sync flushes the changes in the file to disk
 func (fi *File) Sync() error {
 	fi.Lock()
-	defer fi.Unlock()
-	return fi.mod.Sync()
+	return fi.flushUp(false)
 }
 
 // Seek implements io.Seeker
diff --git a/mfs/mfs_test.go b/mfs/mfs_test.go
index 161a8945a..38237c218 100644
--- a/mfs/mfs_test.go
+++ b/mfs/mfs_test.go
@@ -576,15 +576,15 @@ func actorRemoveFile(d *Directory) error {
 	return d.Unlink(re.Name)
 }
 
-func actorReadFile(d *Directory) error {
+func randomFile(d *Directory) (*File, error) {
 	d, err := randomWalk(d, rand.Intn(6))
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	ents, err := d.List()
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	var files []string
@@ -595,18 +595,61 @@ func actorReadFile(d *Directory) error {
 	}
 
 	if len(files) == 0 {
-		return nil
+		return nil, nil
 	}
 
 	fname := files[rand.Intn(len(files))]
 	fsn, err := d.Child(fname)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	fi, ok := fsn.(*File)
 	if !ok {
-		return errors.New("file wasnt a file, race?")
+		return nil, errors.New("file wasnt a file, race?")
+	}
+
+	return fi, nil
+}
+
+func actorWriteFile(d *Directory) error {
+	fi, err := randomFile(d)
+	if err != nil {
+		return err
+	}
+	if fi == nil {
+		return nil
+	}
+
+	size := rand.Intn(1024)
+	buf := make([]byte, size)
+	randbo.New().Read(buf)
+
+	s, err := fi.Size()
+	if err != nil {
+		return err
+	}
+
+	offset := rand.Int63n(s)
+
+	n, err := fi.WriteAt(buf, offset)
+	if err != nil {
+		return err
+	}
+	if n != size {
+		return fmt.Errorf("didnt write enough")
+	}
+
+	return fi.Close()
+}
+
+func actorReadFile(d *Directory) error {
+	fi, err := randomFile(d)
+	if err != nil {
+		return err
+	}
+	if fi == nil {
+		return nil
 	}
 
 	_, err = fi.Size()
@@ -637,12 +680,7 @@ func testActor(rt *Root, iterations int, errs chan error) {
 				return
 			}
 		case 3:
-			continue
-			// randomly deleting things
-			// doesnt really give us any sort of useful test results.
-			// you will never have this in a real environment where
-			// you expect anything productive to happen...
-			if err := actorRemoveFile(d); err != nil {
+			if err := actorWriteFile(d); err != nil {
 				errs <- err
 				return
 			}
@@ -698,6 +736,9 @@ func TestFlushing(t *testing.T) {
 	if err := e.AddChild("TEST", nd1); err != nil {
 		t.Fatal(err)
 	}
+	if err := dir.AddChild("FILE", nd1); err != nil {
+		t.Fatal(err)
+	}
 
 	if err := FlushPath(rt, "/a/b/c/TEST"); err != nil {
 		t.Fatal(err)
@@ -711,17 +752,31 @@ func TestFlushing(t *testing.T) {
 		t.Fatal(err)
 	}
 
+	if err := FlushPath(rt, "/FILE"); err != nil {
+		t.Fatal(err)
+	}
+
 	rnd, err := dir.GetNode()
 	if err != nil {
 		t.Fatal(err)
 	}
 
+	fsnode, err := ft.FSNodeFromBytes(rnd.Data)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if fsnode.Type != ft.TDirectory {
+		t.Fatal("root wasnt a directory")
+	}
+
 	rnk, err := rnd.Key()
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	if rnk.B58String() != "QmWcvrHUFk7LQRrA4WqKjqy7ZyRGFLVagtgNxbEodTEzQ4" {
-		t.Fatal("dag looks wrong")
+	exp := "QmWMVyhTuyxUrXX3ynz171jq76yY3PktfY9Bxiph7b9ikr"
+	if rnk.B58String() != exp {
+		t.Fatalf("dag looks wrong, expected %s, but got %s", exp, rnk.B58String())
 	}
 }
diff --git a/mfs/ops.go b/mfs/ops.go
index f12dfa743..9bc59994c 100644
--- a/mfs/ops.go
+++ b/mfs/ops.go
@@ -230,11 +230,6 @@ func flushPathRec(d *Directory, parts []string) (*dag.Node, error) {
 			return nil, err
 		}
 
-		_, err = d.dserv.Add(nd)
-		if err != nil {
-			return nil, err
-		}
-
 		return nd, nil
 	}
 
@@ -247,6 +242,7 @@ func flushPathRec(d *Directory, parts []string) (*dag.Node, error) {
 		return nil, err
 	}
 
+	var ndagnode *dag.Node
 	switch next := next.(type) {
 	case *Directory:
 		nd, err := flushPathRec(next, parts[1:])
@@ -254,25 +250,33 @@ func flushPathRec(d *Directory, parts []string) (*dag.Node, error) {
 			return nil, err
 		}
 
-		newnode, err := d.node.UpdateNodeLink(parts[0], nd)
-		if err != nil {
-			return nil, err
-		}
+		ndagnode = nd
 
-		_, err = d.dserv.Add(newnode)
-		if err != nil {
-			return nil, err
-		}
-
-		d.node = newnode
-		return newnode, nil
 	case *File:
 		if len(parts) > 1 {
 			return nil, fmt.Errorf("%s is a file, not a directory", parts[0])
 		}
 
-		return next.GetNode()
+		child, err := next.GetNode()
+		if err != nil {
+			return nil, err
+		}
+
+		ndagnode = child
 	default:
 		return nil, fmt.Errorf("unrecognized FSNode type: %#v", next)
 	}
+
+	newnode, err := d.node.UpdateNodeLink(parts[0], ndagnode)
+	if err != nil {
+		return nil, err
+	}
+
+	_, err = d.dserv.Add(newnode)
+	if err != nil {
+		return nil, err
+	}
+
+	d.node = newnode
+	return newnode, nil
 }
diff --git a/mfs/system.go b/mfs/system.go
index b5fe38768..454577c45 100644
--- a/mfs/system.go
+++ b/mfs/system.go
@@ -29,7 +29,7 @@ var log = logging.Logger("mfs")
 var ErrIsDirectory = errors.New("error: is a directory")
 
 type childCloser interface {
-	closeChild(string, *dag.Node) error
+	closeChild(string, *dag.Node, bool) error
 }
 
 type NodeType int
@@ -115,7 +115,7 @@ func (kr *Root) Flush() error {
 		return err
 	}
 
-	k, err := kr.dserv.Add(nd)
+	k, err := nd.Key()
 	if err != nil {
 		return err
 	}
@@ -128,7 +128,7 @@ func (kr *Root) Flush() error {
 
 // closeChild implements the childCloser interface, and signals to the publisher that
 // there are changes ready to be published
-func (kr *Root) closeChild(name string, nd *dag.Node) error {
+func (kr *Root) closeChild(name string, nd *dag.Node, sync bool) error {
 	k, err := kr.dserv.Add(nd)
 	if err != nil {
 		return err
@@ -146,7 +146,7 @@ func (kr *Root) Close() error {
 		return err
 	}
 
-	k, err := kr.dserv.Add(nd)
+	k, err := nd.Key()
 	if err != nil {
 		return err
 	}
diff --git a/pin/gc/gc.go b/pin/gc/gc.go
index 695da62ec..0aad6c03f 100644
--- a/pin/gc/gc.go
+++ b/pin/gc/gc.go
@@ -23,7 +23,7 @@ var log = logging.Logger("gc")
 // The routine then iterates over every block in the blockstore and
 // deletes any block that is not found in the marked set.
 func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner) (<-chan key.Key, error) {
-	unlock := bs.GCLock()
+	unlocker := bs.GCLock()
 
 	bsrv := bserv.New(bs, offline.Exchange(bs))
 	ds := dag.NewDAGService(bsrv)
@@ -41,7 +41,7 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner) (<-chan key.
 	output := make(chan key.Key)
 	go func() {
 		defer close(output)
-		defer unlock()
+		defer unlocker.Unlock()
 		for {
 			select {
 			case k, ok := <-keychan:
diff --git a/pin/pin.go b/pin/pin.go
index fb6269d3e..a7f62417f 100644
--- a/pin/pin.go
+++ b/pin/pin.go
@@ -123,6 +123,8 @@ func (p *pinner) Pin(ctx context.Context, node *mdag.Node, recurse bool) error {
 	return nil
 }
 
+var ErrNotPinned = fmt.Errorf("not pinned")
+
 // Unpin a given key
 func (p *pinner) Unpin(ctx context.Context, k key.Key, recursive bool) error {
 	p.lock.Lock()
@@ -132,7 +134,7 @@ func (p *pinner) Unpin(ctx context.Context, k key.Key, recursive bool) error {
 		return err
 	}
 	if !pinned {
-		return fmt.Errorf("%s is not pinned", k)
+		return ErrNotPinned
 	}
 	switch reason {
 	case "recursive":
diff --git a/test/sharness/t0250-files-api.sh b/test/sharness/t0250-files-api.sh
index 8732cda59..4544da473 100755
--- a/test/sharness/t0250-files-api.sh
+++ b/test/sharness/t0250-files-api.sh
@@ -340,11 +340,6 @@ test_files_api() {
 		ipfs files flush /
 	'
 
-	test_expect_success "root hash is pinned after flush" '
-		ipfs pin ls > pins &&
-		grep $EXP_ROOT_HASH pins || (cat pins && exit 1)
-	'
-
 	# test mv
 	test_expect_success "can mv dir" '
 		ipfs files mv /cats/this/is /cats/
diff --git a/test/sharness/t0251-files-flushing.sh b/test/sharness/t0251-files-flushing.sh
new file mode 100755
index 000000000..629a93016
--- /dev/null
+++ b/test/sharness/t0251-files-flushing.sh
@@ -0,0 +1,53 @@
+#!/bin/sh
+#
+# Copyright (c) 2016 Jeromy Johnson
+# MIT Licensed; see the LICENSE file in this repository.
+#
+
+test_description="test the unix files api flushing"
+
+. lib/test-lib.sh
+
+test_init_ipfs
+
+verify_path_exists() {
+	# simply running ls on a file should be a good 'check'
+	ipfs files ls $1
+}
+
+verify_dir_contents() {
+	dir=$1
+	shift
+	rm -f expected
+	touch expected
+	for e in $@
+	do
+		echo $e >> expected
+	done
+
+	test_expect_success "can list dir" '
+		ipfs files ls $dir > output
+	'
+
+	test_expect_success "dir entries look good" '
+		test_sort_cmp output expected
+	'
+}
+
+test_launch_ipfs_daemon
+
+test_expect_success "can copy a file in" '
+	HASH=$(echo "foo" | ipfs add -q) &&
+	ipfs files cp /ipfs/$HASH /file
+'
+
+test_kill_ipfs_daemon
+test_launch_ipfs_daemon
+
+test_expect_success "file is still there" '
+	verify_path_exists /file
+'
+
+test_kill_ipfs_daemon
+
+test_done
diff --git a/unixfs/mod/dagmodifier.go b/unixfs/mod/dagmodifier.go
index e9dbe40a0..5306399f6 100644
--- a/unixfs/mod/dagmodifier.go
+++ b/unixfs/mod/dagmodifier.go
@@ -169,12 +169,6 @@ func (dm *DagModifier) Sync() error {
 	// Number of bytes we're going to write
 	buflen := dm.wrBuf.Len()
 
-	// Grab key for unpinning after mod operation
-	_, err := dm.curNode.Key()
-	if err != nil {
-		return err
-	}
-
 	// overwrite existing dag nodes
 	thisk, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
 	if err != nil {
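
For readers skimming the patch, the central blockstore change is that GCLock and PinLock now return an Unlocker instead of a bare func(). Below is a small, self-contained sketch of that pattern. It mirrors the types added in blocks/blockstore/blockstore.go, but the toyBlockstore type and the main function are illustrative assumptions, not part of the patch.

// unlocker_sketch.go — standalone illustration only; not part of this patch.
package main

import (
	"fmt"
	"sync"
)

// Unlocker replaces the bare func() previously returned by GCLock/PinLock.
type Unlocker interface {
	Unlock()
}

type unlocker struct {
	unlock func()
}

func (u *unlocker) Unlock() {
	u.unlock()
	u.unlock = nil // a second call now fails loudly instead of double-unlocking
}

// toyBlockstore stands in for the real blockstore; only the locking
// behaviour relevant to GCLock/PinLock is reproduced here.
type toyBlockstore struct {
	lk sync.RWMutex
}

// GCLock takes the write lock, excluding concurrent put->pin sequences.
func (bs *toyBlockstore) GCLock() Unlocker {
	bs.lk.Lock()
	return &unlocker{bs.lk.Unlock}
}

// PinLock takes a read lock, so several put->pin sequences may overlap
// while GC is held off.
func (bs *toyBlockstore) PinLock() Unlocker {
	bs.lk.RLock()
	return &unlocker{bs.lk.RUnlock}
}

func main() {
	bs := new(toyBlockstore)

	// Call-site shape used throughout the patch (e.g. coreunix.Add):
	defer bs.PinLock().Unlock()

	fmt.Println("pin lock held; GC is excluded until the deferred Unlock runs")
}

Returning an Unlocker keeps call sites to a single line (defer n.Blockstore.PinLock().Unlock()), and nil-ing the stored func means an accidental second Unlock panics immediately rather than silently releasing the lock twice.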