From 40a49c8399b8e952c6852909f55ba5eea8d4d591 Mon Sep 17 00:00:00 2001 From: Stephen Whitmore Date: Thu, 4 Feb 2016 12:55:41 -0800 Subject: [PATCH] Mounts detect unmounts and track mount state. This lets FUSE mounts to track whether they are active or not by tracking when fs.Serve terminates. License: MIT Signed-off-by: Stephen Whitmore --- cmd/ipfs/daemon.go | 3 +- core/commands/mount_nofuse.go | 7 -- core/commands/mount_unix.go | 111 +---------------- core/commands/mount_windows.go | 7 -- core/core.go | 4 +- fuse/ipns/ipns_test.go | 108 +--------------- fuse/mount/fuse.go | 49 ++++++-- fuse/mount/mount.go | 3 + {core/commands => fuse/node}/mount_darwin.go | 2 +- fuse/node/mount_nofuse.go | 14 +++ fuse/node/mount_test.go | 98 +++++++++++++++ fuse/node/mount_unix.go | 122 +++++++++++++++++++ fuse/node/mount_windows.go | 11 ++ 13 files changed, 296 insertions(+), 243 deletions(-) rename {core/commands => fuse/node}/mount_darwin.go (99%) create mode 100644 fuse/node/mount_nofuse.go create mode 100644 fuse/node/mount_test.go create mode 100644 fuse/node/mount_unix.go create mode 100644 fuse/node/mount_windows.go diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go index 3906bec86..f3c3a9203 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/daemon.go @@ -21,6 +21,7 @@ import ( corehttp "github.com/ipfs/go-ipfs/core/corehttp" corerepo "github.com/ipfs/go-ipfs/core/corerepo" "github.com/ipfs/go-ipfs/core/corerouting" + nodeMount "github.com/ipfs/go-ipfs/fuse/node" fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo" util "github.com/ipfs/go-ipfs/util" conn "gx/ipfs/QmUBogf4nUefBjmYjn6jfsfPJRkmDGSeMhNj4usRKq69f4/go-libp2p/p2p/net/conn" @@ -494,7 +495,7 @@ func mountFuse(req cmds.Request) error { return fmt.Errorf("mountFuse: ConstructNode() failed: %s", err) } - err = commands.Mount(node, fsdir, nsdir) + err = nodeMount.Mount(node, fsdir, nsdir) if err != nil { return err } diff --git a/core/commands/mount_nofuse.go b/core/commands/mount_nofuse.go index c6ea92693..932f1839e 100644 --- a/core/commands/mount_nofuse.go +++ b/core/commands/mount_nofuse.go @@ -4,10 +4,7 @@ package commands import ( - "errors" - cmds "github.com/ipfs/go-ipfs/commands" - "github.com/ipfs/go-ipfs/core" ) var MountCmd = &cmds.Command{ @@ -23,7 +20,3 @@ For the latest instructions, please check the project's repository: `, }, } - -func Mount(node *core.IpfsNode, fsdir, nsdir string) error { - return errors.New("not compiled in") -} diff --git a/core/commands/mount_unix.go b/core/commands/mount_unix.go index 5c3046af4..27e1fd0d2 100644 --- a/core/commands/mount_unix.go +++ b/core/commands/mount_unix.go @@ -7,32 +7,12 @@ import ( "fmt" "io" "strings" - "time" cmds "github.com/ipfs/go-ipfs/commands" - core "github.com/ipfs/go-ipfs/core" - ipns "github.com/ipfs/go-ipfs/fuse/ipns" - mount "github.com/ipfs/go-ipfs/fuse/mount" - rofs "github.com/ipfs/go-ipfs/fuse/readonly" + nodeMount "github.com/ipfs/go-ipfs/fuse/node" config "github.com/ipfs/go-ipfs/repo/config" ) -// amount of time to wait for mount errors -// TODO is this non-deterministic? -const mountTimeout = time.Second - -// fuseNoDirectory used to check the returning fuse error -const fuseNoDirectory = "fusermount: failed to access mountpoint" - -// fuseExitStatus1 used to check the returning fuse error -const fuseExitStatus1 = "fusermount: exit status 1" - -// platformFuseChecks can get overridden by arch-specific files -// to run fuse checks (like checking the OSXFUSE version) -var platformFuseChecks = func(*core.IpfsNode) error { - return nil -} - var MountCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Mounts IPFS to the filesystem (read-only).", @@ -134,7 +114,7 @@ baz nsdir = cfg.Mounts.IPNS // NB: be sure to not redeclare! } - err = Mount(node, fsdir, nsdir) + err = nodeMount.Mount(node, fsdir, nsdir) if err != nil { res.SetError(err, cmds.ErrNormal) return @@ -155,90 +135,3 @@ baz }, }, } - -func Mount(node *core.IpfsNode, fsdir, nsdir string) error { - // check if we already have live mounts. - // if the user said "Mount", then there must be something wrong. - // so, close them and try again. - if node.Mounts.Ipfs != nil { - node.Mounts.Ipfs.Unmount() - } - if node.Mounts.Ipns != nil { - node.Mounts.Ipns.Unmount() - } - - if err := platformFuseChecks(node); err != nil { - return err - } - - var err error - if err = doMount(node, fsdir, nsdir); err != nil { - return err - } - - return nil -} - -func doMount(node *core.IpfsNode, fsdir, nsdir string) error { - fmtFuseErr := func(err error, mountpoint string) error { - s := err.Error() - if strings.Contains(s, fuseNoDirectory) { - s = strings.Replace(s, `fusermount: "fusermount:`, "", -1) - s = strings.Replace(s, `\n", exit status 1`, "", -1) - return cmds.ClientError(s) - } - if s == fuseExitStatus1 { - s = fmt.Sprintf("fuse failed to access mountpoint %s", mountpoint) - return cmds.ClientError(s) - } - return err - } - - // this sync stuff is so that both can be mounted simultaneously. - var fsmount mount.Mount - var nsmount mount.Mount - var err1 error - var err2 error - - done := make(chan struct{}) - - go func() { - fsmount, err1 = rofs.Mount(node, fsdir) - done <- struct{}{} - }() - - go func() { - nsmount, err2 = ipns.Mount(node, nsdir, fsdir) - done <- struct{}{} - }() - - <-done - <-done - - if err1 != nil { - log.Errorf("error mounting: %s", err1) - } - - if err2 != nil { - log.Errorf("error mounting: %s", err2) - } - - if err1 != nil || err2 != nil { - if fsmount != nil { - fsmount.Unmount() - } - if nsmount != nil { - nsmount.Unmount() - } - - if err1 != nil { - return fmtFuseErr(err1, fsdir) - } - return fmtFuseErr(err2, nsdir) - } - - // setup node state, so that it can be cancelled - node.Mounts.Ipfs = fsmount - node.Mounts.Ipns = nsmount - return nil -} diff --git a/core/commands/mount_windows.go b/core/commands/mount_windows.go index b06e2a830..8eeb6bfbb 100644 --- a/core/commands/mount_windows.go +++ b/core/commands/mount_windows.go @@ -4,7 +4,6 @@ import ( "errors" cmds "github.com/ipfs/go-ipfs/commands" - "github.com/ipfs/go-ipfs/core" ) var MountCmd = &cmds.Command{ @@ -17,9 +16,3 @@ var MountCmd = &cmds.Command{ res.SetError(errors.New("Mount isn't compatible with Windows yet"), cmds.ErrNormal) }, } - -func Mount(node *core.IpfsNode, fsdir, nsdir string) error { - // TODO - // currently a no-op, but we don't want to return an error - return nil -} diff --git a/core/core.go b/core/core.go index ccad484cf..761a17389 100644 --- a/core/core.go +++ b/core/core.go @@ -335,10 +335,10 @@ func (n *IpfsNode) teardown() error { closers = append(closers, n.Exchange) } - if n.Mounts.Ipfs != nil { + if n.Mounts.Ipfs != nil && !n.Mounts.Ipfs.IsActive() { closers = append(closers, mount.Closer(n.Mounts.Ipfs)) } - if n.Mounts.Ipns != nil { + if n.Mounts.Ipns != nil && !n.Mounts.Ipns.IsActive() { closers = append(closers, mount.Closer(n.Mounts.Ipns)) } diff --git a/fuse/ipns/ipns_test.go b/fuse/ipns/ipns_test.go index df11624ce..321376d75 100644 --- a/fuse/ipns/ipns_test.go +++ b/fuse/ipns/ipns_test.go @@ -15,12 +15,11 @@ import ( racedet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race" core "github.com/ipfs/go-ipfs/core" - context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" - //mfs "github.com/ipfs/go-ipfs/mfs" namesys "github.com/ipfs/go-ipfs/namesys" offroute "github.com/ipfs/go-ipfs/routing/offline" u "github.com/ipfs/go-ipfs/util" ci "github.com/ipfs/go-ipfs/util/testutil/ci" + context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" ) func maybeSkipFuseTests(t *testing.T) { @@ -437,111 +436,6 @@ func TestFSThrash(t *testing.T) { } } -/* -func TestFastRepublish(t *testing.T) { - if testing.Short() { - t.SkipNow() - } - - // make timeout noticeable. - osrt := shortRepublishTimeout - shortRepublishTimeout = time.Millisecond * 100 - - olrt := longRepublishTimeout - longRepublishTimeout = time.Second - - node, mnt := setupIpnsTest(t, nil) - - h, err := node.PrivateKey.GetPublic().Hash() - if err != nil { - t.Fatal(err) - } - pubkeyPath := "/ipns/" + u.Key(h).String() - - // set them back - defer func() { - shortRepublishTimeout = osrt - longRepublishTimeout = olrt - mnt.Close() - }() - - closed := make(chan struct{}) - dataA := []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") - dataB := []byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") - - fname := mnt.Dir + "/local/file" - - // get first resolved hash - log.Debug("publishing first hash") - writeFileData(t, dataA, fname) // random - <-time.After(shortRepublishTimeout * 2) - log.Debug("resolving first hash") - resolvedHash, err := node.Namesys.Resolve(context.Background(), pubkeyPath) - if err != nil { - t.Fatal("resolve err:", pubkeyPath, err) - } - - // constantly keep writing to the file - go func(timeout time.Duration) { - for { - select { - case <-closed: - return - - case <-time.After(timeout * 8 / 10): - writeFileData(t, dataB, fname) - } - } - }(shortRepublishTimeout) - - hasPublished := func() bool { - res, err := node.Namesys.Resolve(context.Background(), pubkeyPath) - if err != nil { - t.Fatalf("resolve err: %v", err) - } - return res != resolvedHash - } - - // test things - - // at this point, should not have written dataA and not have written dataB - rbuf, err := ioutil.ReadFile(fname) - if err != nil || !bytes.Equal(rbuf, dataA) { - t.Fatalf("Data inconsistent! %v %v", err, string(rbuf)) - } - - if hasPublished() { - t.Fatal("published (wrote)") - } - - <-time.After(shortRepublishTimeout * 11 / 10) - - // at this point, should have written written dataB, but not published it - rbuf, err = ioutil.ReadFile(fname) - if err != nil || !bytes.Equal(rbuf, dataB) { - t.Fatalf("Data inconsistent! %v %v", err, string(rbuf)) - } - - if hasPublished() { - t.Fatal("published (wrote)") - } - - <-time.After(longRepublishTimeout * 11 / 10) - - // at this point, should have written written dataB, and published it - rbuf, err = ioutil.ReadFile(fname) - if err != nil || !bytes.Equal(rbuf, dataB) { - t.Fatalf("Data inconsistent! %v %v", err, string(rbuf)) - } - - if !hasPublished() { - t.Fatal("not published") - } - - close(closed) -} -*/ - // Test writing a medium sized file one byte at a time func TestMultiWrite(t *testing.T) { diff --git a/fuse/mount/fuse.go b/fuse/mount/fuse.go index e4c1c18f5..0351b7f16 100644 --- a/fuse/mount/fuse.go +++ b/fuse/mount/fuse.go @@ -4,7 +4,9 @@ package mount import ( + "errors" "fmt" + "sync" "time" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/bazil.org/fuse" @@ -12,12 +14,16 @@ import ( "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" ) +var ErrNotMounted = errors.New("not mounted") + // mount implements go-ipfs/fuse/mount type mount struct { mpoint string filesys fs.FS fuseConn *fuse.Conn - // closeErr error + + active bool + activeLock *sync.RWMutex proc goprocess.Process } @@ -39,10 +45,12 @@ func NewMount(p goprocess.Process, fsys fs.FS, mountpoint string, allow_other bo } m := &mount{ - mpoint: mountpoint, - fuseConn: conn, - filesys: fsys, - proc: goprocess.WithParent(p), // link it to parent. + mpoint: mountpoint, + fuseConn: conn, + filesys: fsys, + active: false, + activeLock: &sync.RWMutex{}, + proc: goprocess.WithParent(p), // link it to parent. } m.proc.SetTeardown(m.unmount) @@ -60,11 +68,14 @@ func (m *mount) mount() error { errs := make(chan error, 1) go func() { + // fs.Serve blocks until the filesystem is unmounted. err := fs.Serve(m.fuseConn, m.filesys) - log.Debugf("Mounting %s -- fs.Serve returned (%s)", err) + log.Debugf("%s is unmounted", m.MountPoint()) if err != nil { + log.Debugf("fs.Serve returned (%s)", err) errs <- err } + m.setActive(false) }() // wait for the mount process to be done, or timed out. @@ -81,6 +92,8 @@ func (m *mount) mount() error { return err } + m.setActive(true) + log.Infof("Mounted %s", m.MountPoint()) return nil } @@ -95,6 +108,7 @@ func (m *mount) unmount() error { // try unmounting with fuse lib err := fuse.Unmount(m.MountPoint()) if err == nil { + m.setActive(false) return nil } log.Warningf("fuse unmount err: %s", err) @@ -102,11 +116,10 @@ func (m *mount) unmount() error { // try closing the fuseConn err = m.fuseConn.Close() if err == nil { + m.setActive(false) return nil } - if err != nil { - log.Warningf("fuse conn error: %s", err) - } + log.Warningf("fuse conn error: %s", err) // try mount.ForceUnmountManyTimes if err := ForceUnmountManyTimes(m, 10); err != nil { @@ -114,6 +127,7 @@ func (m *mount) unmount() error { } log.Infof("Seemingly unmounted %s", m.MountPoint()) + m.setActive(false) return nil } @@ -126,6 +140,23 @@ func (m *mount) MountPoint() string { } func (m *mount) Unmount() error { + if !m.IsActive() { + return ErrNotMounted + } + // call Process Close(), which calls unmount() exactly once. return m.proc.Close() } + +func (m *mount) IsActive() bool { + m.activeLock.RLock() + defer m.activeLock.RUnlock() + + return m.active +} + +func (m *mount) setActive(a bool) { + m.activeLock.Lock() + m.active = a + m.activeLock.Unlock() +} diff --git a/fuse/mount/mount.go b/fuse/mount/mount.go index 4404fa6e7..cb7d25ffa 100644 --- a/fuse/mount/mount.go +++ b/fuse/mount/mount.go @@ -25,6 +25,9 @@ type Mount interface { // Unmounts the mount Unmount() error + // Checks if the mount is still active. + IsActive() bool + // Process returns the mount's Process to be able to link it // to other processes. Unmount upon closing. Process() goprocess.Process diff --git a/core/commands/mount_darwin.go b/fuse/node/mount_darwin.go similarity index 99% rename from core/commands/mount_darwin.go rename to fuse/node/mount_darwin.go index db75a53f7..ebefdd806 100644 --- a/core/commands/mount_darwin.go +++ b/fuse/node/mount_darwin.go @@ -1,6 +1,6 @@ // +build !nofuse -package commands +package node import ( "bytes" diff --git a/fuse/node/mount_nofuse.go b/fuse/node/mount_nofuse.go new file mode 100644 index 000000000..2a26c88d9 --- /dev/null +++ b/fuse/node/mount_nofuse.go @@ -0,0 +1,14 @@ +// +build linux darwin freebsd +// +build nofuse + +package node + +import ( + "errors" + + core "github.com/ipfs/go-ipfs/core" +) + +func Mount(node *core.IpfsNode, fsdir, nsdir string) error { + return errors.New("not compiled in") +} diff --git a/fuse/node/mount_test.go b/fuse/node/mount_test.go new file mode 100644 index 000000000..6b50ea294 --- /dev/null +++ b/fuse/node/mount_test.go @@ -0,0 +1,98 @@ +// +build !nofuse + +package node + +import ( + "io/ioutil" + "os" + "os/exec" + "testing" + "time" + + context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + core "github.com/ipfs/go-ipfs/core" + ipns "github.com/ipfs/go-ipfs/fuse/ipns" + mount "github.com/ipfs/go-ipfs/fuse/mount" + namesys "github.com/ipfs/go-ipfs/namesys" + offroute "github.com/ipfs/go-ipfs/routing/offline" + ci "github.com/ipfs/go-ipfs/util/testutil/ci" +) + +func maybeSkipFuseTests(t *testing.T) { + if ci.NoFuse() { + t.Skip("Skipping FUSE tests") + } +} + +func mkdir(t *testing.T, path string) { + err := os.Mkdir(path, os.ModeDir|os.ModePerm) + if err != nil { + t.Fatal(err) + } +} + +// Test externally unmounting, then trying to unmount in code +func TestExternalUnmount(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + + // TODO: needed? + maybeSkipFuseTests(t) + + node, err := core.NewNode(context.Background(), nil) + if err != nil { + t.Fatal(err) + } + + err = node.LoadPrivateKey() + if err != nil { + t.Fatal(err) + } + + node.Routing = offroute.NewOfflineRouter(node.Repo.Datastore(), node.PrivateKey) + node.Namesys = namesys.NewNameSystem(node.Routing, node.Repo.Datastore(), 0) + + err = ipns.InitializeKeyspace(node, node.PrivateKey) + if err != nil { + t.Fatal(err) + } + + // get the test dir paths (/tmp/fusetestXXXX) + dir, err := ioutil.TempDir("", "fusetest") + if err != nil { + t.Fatal(err) + } + + ipfsDir := dir + "/ipfs" + ipnsDir := dir + "/ipns" + mkdir(t, ipfsDir) + mkdir(t, ipnsDir) + + err = Mount(node, ipfsDir, ipnsDir) + if err != nil { + t.Fatal(err) + } + + // Run shell command to externally unmount the directory + cmd := "fusermount" + args := []string{"-u", ipnsDir} + if err := exec.Command(cmd, args...).Run(); err != nil { + t.Fatal(err) + } + + // TODO(noffle): it takes a moment for the goroutine that's running fs.Serve to be notified and do its cleanup. + time.Sleep(time.Millisecond * 100) + + // Attempt to unmount IPNS; check that it was already unmounted. + err = node.Mounts.Ipns.Unmount() + if err != mount.ErrNotMounted { + t.Fatal("Unmount should have failed") + } + + // Attempt to unmount IPFS; it should unmount successfully. + err = node.Mounts.Ipfs.Unmount() + if err != nil { + t.Fatal(err) + } +} diff --git a/fuse/node/mount_unix.go b/fuse/node/mount_unix.go new file mode 100644 index 000000000..63c172f0e --- /dev/null +++ b/fuse/node/mount_unix.go @@ -0,0 +1,122 @@ +// +build linux darwin freebsd +// +build !nofuse + +package node + +import ( + "errors" + "fmt" + "strings" + "time" + + core "github.com/ipfs/go-ipfs/core" + ipns "github.com/ipfs/go-ipfs/fuse/ipns" + mount "github.com/ipfs/go-ipfs/fuse/mount" + rofs "github.com/ipfs/go-ipfs/fuse/readonly" + logging "gx/ipfs/Qmazh5oNUVsDZTs2g59rq8aYQqwpss8tcUWQzor5sCCEuH/go-log" +) + +var log = logging.Logger("node") + +// amount of time to wait for mount errors +// TODO is this non-deterministic? +const mountTimeout = time.Second + +// fuseNoDirectory used to check the returning fuse error +const fuseNoDirectory = "fusermount: failed to access mountpoint" + +// fuseExitStatus1 used to check the returning fuse error +const fuseExitStatus1 = "fusermount: exit status 1" + +// platformFuseChecks can get overridden by arch-specific files +// to run fuse checks (like checking the OSXFUSE version) +var platformFuseChecks = func(*core.IpfsNode) error { + return nil +} + +func Mount(node *core.IpfsNode, fsdir, nsdir string) error { + // check if we already have live mounts. + // if the user said "Mount", then there must be something wrong. + // so, close them and try again. + if node.Mounts.Ipfs != nil && node.Mounts.Ipfs.IsActive() { + node.Mounts.Ipfs.Unmount() + } + if node.Mounts.Ipns != nil && node.Mounts.Ipns.IsActive() { + node.Mounts.Ipns.Unmount() + } + + if err := platformFuseChecks(node); err != nil { + return err + } + + var err error + if err = doMount(node, fsdir, nsdir); err != nil { + return err + } + + return nil +} + +func doMount(node *core.IpfsNode, fsdir, nsdir string) error { + fmtFuseErr := func(err error, mountpoint string) error { + s := err.Error() + if strings.Contains(s, fuseNoDirectory) { + s = strings.Replace(s, `fusermount: "fusermount:`, "", -1) + s = strings.Replace(s, `\n", exit status 1`, "", -1) + return errors.New(s) + } + if s == fuseExitStatus1 { + s = fmt.Sprintf("fuse failed to access mountpoint %s", mountpoint) + return errors.New(s) + } + return err + } + + // this sync stuff is so that both can be mounted simultaneously. + var fsmount mount.Mount + var nsmount mount.Mount + var err1 error + var err2 error + + done := make(chan struct{}) + + go func() { + fsmount, err1 = rofs.Mount(node, fsdir) + done <- struct{}{} + }() + + go func() { + nsmount, err2 = ipns.Mount(node, nsdir, fsdir) + done <- struct{}{} + }() + + <-done + <-done + + if err1 != nil { + log.Errorf("error mounting: %s", err1) + } + + if err2 != nil { + log.Errorf("error mounting: %s", err2) + } + + if err1 != nil || err2 != nil { + if fsmount != nil { + fsmount.Unmount() + } + if nsmount != nil { + nsmount.Unmount() + } + + if err1 != nil { + return fmtFuseErr(err1, fsdir) + } + return fmtFuseErr(err2, nsdir) + } + + // setup node state, so that it can be cancelled + node.Mounts.Ipfs = fsmount + node.Mounts.Ipns = nsmount + return nil +} diff --git a/fuse/node/mount_windows.go b/fuse/node/mount_windows.go new file mode 100644 index 000000000..ce89deddb --- /dev/null +++ b/fuse/node/mount_windows.go @@ -0,0 +1,11 @@ +package node + +import ( + "github.com/ipfs/go-ipfs/core" +) + +func Mount(node *core.IpfsNode, fsdir, nsdir string) error { + // TODO + // currently a no-op, but we don't want to return an error + return nil +}