1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-29 17:36:38 +08:00

Merge pull request #2296 from noffle/clean_unmount

Mounts detect unmounts and track mount state.
This commit is contained in:
Stephen Whitmore
2016-02-08 13:22:45 -08:00
13 changed files with 296 additions and 243 deletions

View File

@ -21,6 +21,7 @@ import (
corehttp "github.com/ipfs/go-ipfs/core/corehttp" corehttp "github.com/ipfs/go-ipfs/core/corehttp"
corerepo "github.com/ipfs/go-ipfs/core/corerepo" corerepo "github.com/ipfs/go-ipfs/core/corerepo"
"github.com/ipfs/go-ipfs/core/corerouting" "github.com/ipfs/go-ipfs/core/corerouting"
nodeMount "github.com/ipfs/go-ipfs/fuse/node"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo" fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
util "github.com/ipfs/go-ipfs/util" util "github.com/ipfs/go-ipfs/util"
conn "gx/ipfs/QmUBogf4nUefBjmYjn6jfsfPJRkmDGSeMhNj4usRKq69f4/go-libp2p/p2p/net/conn" conn "gx/ipfs/QmUBogf4nUefBjmYjn6jfsfPJRkmDGSeMhNj4usRKq69f4/go-libp2p/p2p/net/conn"
@ -494,7 +495,7 @@ func mountFuse(req cmds.Request) error {
return fmt.Errorf("mountFuse: ConstructNode() failed: %s", err) return fmt.Errorf("mountFuse: ConstructNode() failed: %s", err)
} }
err = commands.Mount(node, fsdir, nsdir) err = nodeMount.Mount(node, fsdir, nsdir)
if err != nil { if err != nil {
return err return err
} }

View File

@ -4,10 +4,7 @@
package commands package commands
import ( import (
"errors"
cmds "github.com/ipfs/go-ipfs/commands" cmds "github.com/ipfs/go-ipfs/commands"
"github.com/ipfs/go-ipfs/core"
) )
var MountCmd = &cmds.Command{ var MountCmd = &cmds.Command{
@ -23,7 +20,3 @@ For the latest instructions, please check the project's repository:
`, `,
}, },
} }
func Mount(node *core.IpfsNode, fsdir, nsdir string) error {
return errors.New("not compiled in")
}

View File

@ -7,32 +7,12 @@ import (
"fmt" "fmt"
"io" "io"
"strings" "strings"
"time"
cmds "github.com/ipfs/go-ipfs/commands" cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core" nodeMount "github.com/ipfs/go-ipfs/fuse/node"
ipns "github.com/ipfs/go-ipfs/fuse/ipns"
mount "github.com/ipfs/go-ipfs/fuse/mount"
rofs "github.com/ipfs/go-ipfs/fuse/readonly"
config "github.com/ipfs/go-ipfs/repo/config" config "github.com/ipfs/go-ipfs/repo/config"
) )
// amount of time to wait for mount errors
// TODO is this non-deterministic?
const mountTimeout = time.Second
// fuseNoDirectory used to check the returning fuse error
const fuseNoDirectory = "fusermount: failed to access mountpoint"
// fuseExitStatus1 used to check the returning fuse error
const fuseExitStatus1 = "fusermount: exit status 1"
// platformFuseChecks can get overridden by arch-specific files
// to run fuse checks (like checking the OSXFUSE version)
var platformFuseChecks = func(*core.IpfsNode) error {
return nil
}
var MountCmd = &cmds.Command{ var MountCmd = &cmds.Command{
Helptext: cmds.HelpText{ Helptext: cmds.HelpText{
Tagline: "Mounts IPFS to the filesystem (read-only).", Tagline: "Mounts IPFS to the filesystem (read-only).",
@ -134,7 +114,7 @@ baz
nsdir = cfg.Mounts.IPNS // NB: be sure to not redeclare! nsdir = cfg.Mounts.IPNS // NB: be sure to not redeclare!
} }
err = Mount(node, fsdir, nsdir) err = nodeMount.Mount(node, fsdir, nsdir)
if err != nil { if err != nil {
res.SetError(err, cmds.ErrNormal) res.SetError(err, cmds.ErrNormal)
return return
@ -155,90 +135,3 @@ baz
}, },
}, },
} }
func Mount(node *core.IpfsNode, fsdir, nsdir string) error {
// check if we already have live mounts.
// if the user said "Mount", then there must be something wrong.
// so, close them and try again.
if node.Mounts.Ipfs != nil {
node.Mounts.Ipfs.Unmount()
}
if node.Mounts.Ipns != nil {
node.Mounts.Ipns.Unmount()
}
if err := platformFuseChecks(node); err != nil {
return err
}
var err error
if err = doMount(node, fsdir, nsdir); err != nil {
return err
}
return nil
}
func doMount(node *core.IpfsNode, fsdir, nsdir string) error {
fmtFuseErr := func(err error, mountpoint string) error {
s := err.Error()
if strings.Contains(s, fuseNoDirectory) {
s = strings.Replace(s, `fusermount: "fusermount:`, "", -1)
s = strings.Replace(s, `\n", exit status 1`, "", -1)
return cmds.ClientError(s)
}
if s == fuseExitStatus1 {
s = fmt.Sprintf("fuse failed to access mountpoint %s", mountpoint)
return cmds.ClientError(s)
}
return err
}
// this sync stuff is so that both can be mounted simultaneously.
var fsmount mount.Mount
var nsmount mount.Mount
var err1 error
var err2 error
done := make(chan struct{})
go func() {
fsmount, err1 = rofs.Mount(node, fsdir)
done <- struct{}{}
}()
go func() {
nsmount, err2 = ipns.Mount(node, nsdir, fsdir)
done <- struct{}{}
}()
<-done
<-done
if err1 != nil {
log.Errorf("error mounting: %s", err1)
}
if err2 != nil {
log.Errorf("error mounting: %s", err2)
}
if err1 != nil || err2 != nil {
if fsmount != nil {
fsmount.Unmount()
}
if nsmount != nil {
nsmount.Unmount()
}
if err1 != nil {
return fmtFuseErr(err1, fsdir)
}
return fmtFuseErr(err2, nsdir)
}
// setup node state, so that it can be cancelled
node.Mounts.Ipfs = fsmount
node.Mounts.Ipns = nsmount
return nil
}

View File

@ -4,7 +4,6 @@ import (
"errors" "errors"
cmds "github.com/ipfs/go-ipfs/commands" cmds "github.com/ipfs/go-ipfs/commands"
"github.com/ipfs/go-ipfs/core"
) )
var MountCmd = &cmds.Command{ var MountCmd = &cmds.Command{
@ -17,9 +16,3 @@ var MountCmd = &cmds.Command{
res.SetError(errors.New("Mount isn't compatible with Windows yet"), cmds.ErrNormal) res.SetError(errors.New("Mount isn't compatible with Windows yet"), cmds.ErrNormal)
}, },
} }
func Mount(node *core.IpfsNode, fsdir, nsdir string) error {
// TODO
// currently a no-op, but we don't want to return an error
return nil
}

View File

@ -335,10 +335,10 @@ func (n *IpfsNode) teardown() error {
closers = append(closers, n.Exchange) closers = append(closers, n.Exchange)
} }
if n.Mounts.Ipfs != nil { if n.Mounts.Ipfs != nil && !n.Mounts.Ipfs.IsActive() {
closers = append(closers, mount.Closer(n.Mounts.Ipfs)) closers = append(closers, mount.Closer(n.Mounts.Ipfs))
} }
if n.Mounts.Ipns != nil { if n.Mounts.Ipns != nil && !n.Mounts.Ipns.IsActive() {
closers = append(closers, mount.Closer(n.Mounts.Ipns)) closers = append(closers, mount.Closer(n.Mounts.Ipns))
} }

View File

@ -15,12 +15,11 @@ import (
racedet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race" racedet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race"
core "github.com/ipfs/go-ipfs/core" core "github.com/ipfs/go-ipfs/core"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
//mfs "github.com/ipfs/go-ipfs/mfs"
namesys "github.com/ipfs/go-ipfs/namesys" namesys "github.com/ipfs/go-ipfs/namesys"
offroute "github.com/ipfs/go-ipfs/routing/offline" offroute "github.com/ipfs/go-ipfs/routing/offline"
u "github.com/ipfs/go-ipfs/util" u "github.com/ipfs/go-ipfs/util"
ci "github.com/ipfs/go-ipfs/util/testutil/ci" ci "github.com/ipfs/go-ipfs/util/testutil/ci"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
) )
func maybeSkipFuseTests(t *testing.T) { func maybeSkipFuseTests(t *testing.T) {
@ -437,111 +436,6 @@ func TestFSThrash(t *testing.T) {
} }
} }
/*
func TestFastRepublish(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
// make timeout noticeable.
osrt := shortRepublishTimeout
shortRepublishTimeout = time.Millisecond * 100
olrt := longRepublishTimeout
longRepublishTimeout = time.Second
node, mnt := setupIpnsTest(t, nil)
h, err := node.PrivateKey.GetPublic().Hash()
if err != nil {
t.Fatal(err)
}
pubkeyPath := "/ipns/" + u.Key(h).String()
// set them back
defer func() {
shortRepublishTimeout = osrt
longRepublishTimeout = olrt
mnt.Close()
}()
closed := make(chan struct{})
dataA := []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
dataB := []byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
fname := mnt.Dir + "/local/file"
// get first resolved hash
log.Debug("publishing first hash")
writeFileData(t, dataA, fname) // random
<-time.After(shortRepublishTimeout * 2)
log.Debug("resolving first hash")
resolvedHash, err := node.Namesys.Resolve(context.Background(), pubkeyPath)
if err != nil {
t.Fatal("resolve err:", pubkeyPath, err)
}
// constantly keep writing to the file
go func(timeout time.Duration) {
for {
select {
case <-closed:
return
case <-time.After(timeout * 8 / 10):
writeFileData(t, dataB, fname)
}
}
}(shortRepublishTimeout)
hasPublished := func() bool {
res, err := node.Namesys.Resolve(context.Background(), pubkeyPath)
if err != nil {
t.Fatalf("resolve err: %v", err)
}
return res != resolvedHash
}
// test things
// at this point, should not have written dataA and not have written dataB
rbuf, err := ioutil.ReadFile(fname)
if err != nil || !bytes.Equal(rbuf, dataA) {
t.Fatalf("Data inconsistent! %v %v", err, string(rbuf))
}
if hasPublished() {
t.Fatal("published (wrote)")
}
<-time.After(shortRepublishTimeout * 11 / 10)
// at this point, should have written written dataB, but not published it
rbuf, err = ioutil.ReadFile(fname)
if err != nil || !bytes.Equal(rbuf, dataB) {
t.Fatalf("Data inconsistent! %v %v", err, string(rbuf))
}
if hasPublished() {
t.Fatal("published (wrote)")
}
<-time.After(longRepublishTimeout * 11 / 10)
// at this point, should have written written dataB, and published it
rbuf, err = ioutil.ReadFile(fname)
if err != nil || !bytes.Equal(rbuf, dataB) {
t.Fatalf("Data inconsistent! %v %v", err, string(rbuf))
}
if !hasPublished() {
t.Fatal("not published")
}
close(closed)
}
*/
// Test writing a medium sized file one byte at a time // Test writing a medium sized file one byte at a time
func TestMultiWrite(t *testing.T) { func TestMultiWrite(t *testing.T) {

View File

@ -4,7 +4,9 @@
package mount package mount
import ( import (
"errors"
"fmt" "fmt"
"sync"
"time" "time"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/bazil.org/fuse" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
@ -12,12 +14,16 @@ import (
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
) )
var ErrNotMounted = errors.New("not mounted")
// mount implements go-ipfs/fuse/mount // mount implements go-ipfs/fuse/mount
type mount struct { type mount struct {
mpoint string mpoint string
filesys fs.FS filesys fs.FS
fuseConn *fuse.Conn fuseConn *fuse.Conn
// closeErr error
active bool
activeLock *sync.RWMutex
proc goprocess.Process proc goprocess.Process
} }
@ -39,10 +45,12 @@ func NewMount(p goprocess.Process, fsys fs.FS, mountpoint string, allow_other bo
} }
m := &mount{ m := &mount{
mpoint: mountpoint, mpoint: mountpoint,
fuseConn: conn, fuseConn: conn,
filesys: fsys, filesys: fsys,
proc: goprocess.WithParent(p), // link it to parent. active: false,
activeLock: &sync.RWMutex{},
proc: goprocess.WithParent(p), // link it to parent.
} }
m.proc.SetTeardown(m.unmount) m.proc.SetTeardown(m.unmount)
@ -60,11 +68,14 @@ func (m *mount) mount() error {
errs := make(chan error, 1) errs := make(chan error, 1)
go func() { go func() {
// fs.Serve blocks until the filesystem is unmounted.
err := fs.Serve(m.fuseConn, m.filesys) err := fs.Serve(m.fuseConn, m.filesys)
log.Debugf("Mounting %s -- fs.Serve returned (%s)", err) log.Debugf("%s is unmounted", m.MountPoint())
if err != nil { if err != nil {
log.Debugf("fs.Serve returned (%s)", err)
errs <- err errs <- err
} }
m.setActive(false)
}() }()
// wait for the mount process to be done, or timed out. // wait for the mount process to be done, or timed out.
@ -81,6 +92,8 @@ func (m *mount) mount() error {
return err return err
} }
m.setActive(true)
log.Infof("Mounted %s", m.MountPoint()) log.Infof("Mounted %s", m.MountPoint())
return nil return nil
} }
@ -95,6 +108,7 @@ func (m *mount) unmount() error {
// try unmounting with fuse lib // try unmounting with fuse lib
err := fuse.Unmount(m.MountPoint()) err := fuse.Unmount(m.MountPoint())
if err == nil { if err == nil {
m.setActive(false)
return nil return nil
} }
log.Warningf("fuse unmount err: %s", err) log.Warningf("fuse unmount err: %s", err)
@ -102,11 +116,10 @@ func (m *mount) unmount() error {
// try closing the fuseConn // try closing the fuseConn
err = m.fuseConn.Close() err = m.fuseConn.Close()
if err == nil { if err == nil {
m.setActive(false)
return nil return nil
} }
if err != nil { log.Warningf("fuse conn error: %s", err)
log.Warningf("fuse conn error: %s", err)
}
// try mount.ForceUnmountManyTimes // try mount.ForceUnmountManyTimes
if err := ForceUnmountManyTimes(m, 10); err != nil { if err := ForceUnmountManyTimes(m, 10); err != nil {
@ -114,6 +127,7 @@ func (m *mount) unmount() error {
} }
log.Infof("Seemingly unmounted %s", m.MountPoint()) log.Infof("Seemingly unmounted %s", m.MountPoint())
m.setActive(false)
return nil return nil
} }
@ -126,6 +140,23 @@ func (m *mount) MountPoint() string {
} }
func (m *mount) Unmount() error { func (m *mount) Unmount() error {
if !m.IsActive() {
return ErrNotMounted
}
// call Process Close(), which calls unmount() exactly once. // call Process Close(), which calls unmount() exactly once.
return m.proc.Close() return m.proc.Close()
} }
func (m *mount) IsActive() bool {
m.activeLock.RLock()
defer m.activeLock.RUnlock()
return m.active
}
func (m *mount) setActive(a bool) {
m.activeLock.Lock()
m.active = a
m.activeLock.Unlock()
}

View File

@ -25,6 +25,9 @@ type Mount interface {
// Unmounts the mount // Unmounts the mount
Unmount() error Unmount() error
// Checks if the mount is still active.
IsActive() bool
// Process returns the mount's Process to be able to link it // Process returns the mount's Process to be able to link it
// to other processes. Unmount upon closing. // to other processes. Unmount upon closing.
Process() goprocess.Process Process() goprocess.Process

View File

@ -1,6 +1,6 @@
// +build !nofuse // +build !nofuse
package commands package node
import ( import (
"bytes" "bytes"

14
fuse/node/mount_nofuse.go Normal file
View File

@ -0,0 +1,14 @@
// +build linux darwin freebsd
// +build nofuse
package node
import (
"errors"
core "github.com/ipfs/go-ipfs/core"
)
func Mount(node *core.IpfsNode, fsdir, nsdir string) error {
return errors.New("not compiled in")
}

98
fuse/node/mount_test.go Normal file
View File

@ -0,0 +1,98 @@
// +build !nofuse
package node
import (
"io/ioutil"
"os"
"os/exec"
"testing"
"time"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
core "github.com/ipfs/go-ipfs/core"
ipns "github.com/ipfs/go-ipfs/fuse/ipns"
mount "github.com/ipfs/go-ipfs/fuse/mount"
namesys "github.com/ipfs/go-ipfs/namesys"
offroute "github.com/ipfs/go-ipfs/routing/offline"
ci "github.com/ipfs/go-ipfs/util/testutil/ci"
)
func maybeSkipFuseTests(t *testing.T) {
if ci.NoFuse() {
t.Skip("Skipping FUSE tests")
}
}
func mkdir(t *testing.T, path string) {
err := os.Mkdir(path, os.ModeDir|os.ModePerm)
if err != nil {
t.Fatal(err)
}
}
// Test externally unmounting, then trying to unmount in code
func TestExternalUnmount(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
// TODO: needed?
maybeSkipFuseTests(t)
node, err := core.NewNode(context.Background(), nil)
if err != nil {
t.Fatal(err)
}
err = node.LoadPrivateKey()
if err != nil {
t.Fatal(err)
}
node.Routing = offroute.NewOfflineRouter(node.Repo.Datastore(), node.PrivateKey)
node.Namesys = namesys.NewNameSystem(node.Routing, node.Repo.Datastore(), 0)
err = ipns.InitializeKeyspace(node, node.PrivateKey)
if err != nil {
t.Fatal(err)
}
// get the test dir paths (/tmp/fusetestXXXX)
dir, err := ioutil.TempDir("", "fusetest")
if err != nil {
t.Fatal(err)
}
ipfsDir := dir + "/ipfs"
ipnsDir := dir + "/ipns"
mkdir(t, ipfsDir)
mkdir(t, ipnsDir)
err = Mount(node, ipfsDir, ipnsDir)
if err != nil {
t.Fatal(err)
}
// Run shell command to externally unmount the directory
cmd := "fusermount"
args := []string{"-u", ipnsDir}
if err := exec.Command(cmd, args...).Run(); err != nil {
t.Fatal(err)
}
// TODO(noffle): it takes a moment for the goroutine that's running fs.Serve to be notified and do its cleanup.
time.Sleep(time.Millisecond * 100)
// Attempt to unmount IPNS; check that it was already unmounted.
err = node.Mounts.Ipns.Unmount()
if err != mount.ErrNotMounted {
t.Fatal("Unmount should have failed")
}
// Attempt to unmount IPFS; it should unmount successfully.
err = node.Mounts.Ipfs.Unmount()
if err != nil {
t.Fatal(err)
}
}

122
fuse/node/mount_unix.go Normal file
View File

@ -0,0 +1,122 @@
// +build linux darwin freebsd
// +build !nofuse
package node
import (
"errors"
"fmt"
"strings"
"time"
core "github.com/ipfs/go-ipfs/core"
ipns "github.com/ipfs/go-ipfs/fuse/ipns"
mount "github.com/ipfs/go-ipfs/fuse/mount"
rofs "github.com/ipfs/go-ipfs/fuse/readonly"
logging "gx/ipfs/Qmazh5oNUVsDZTs2g59rq8aYQqwpss8tcUWQzor5sCCEuH/go-log"
)
var log = logging.Logger("node")
// amount of time to wait for mount errors
// TODO is this non-deterministic?
const mountTimeout = time.Second
// fuseNoDirectory used to check the returning fuse error
const fuseNoDirectory = "fusermount: failed to access mountpoint"
// fuseExitStatus1 used to check the returning fuse error
const fuseExitStatus1 = "fusermount: exit status 1"
// platformFuseChecks can get overridden by arch-specific files
// to run fuse checks (like checking the OSXFUSE version)
var platformFuseChecks = func(*core.IpfsNode) error {
return nil
}
func Mount(node *core.IpfsNode, fsdir, nsdir string) error {
// check if we already have live mounts.
// if the user said "Mount", then there must be something wrong.
// so, close them and try again.
if node.Mounts.Ipfs != nil && node.Mounts.Ipfs.IsActive() {
node.Mounts.Ipfs.Unmount()
}
if node.Mounts.Ipns != nil && node.Mounts.Ipns.IsActive() {
node.Mounts.Ipns.Unmount()
}
if err := platformFuseChecks(node); err != nil {
return err
}
var err error
if err = doMount(node, fsdir, nsdir); err != nil {
return err
}
return nil
}
func doMount(node *core.IpfsNode, fsdir, nsdir string) error {
fmtFuseErr := func(err error, mountpoint string) error {
s := err.Error()
if strings.Contains(s, fuseNoDirectory) {
s = strings.Replace(s, `fusermount: "fusermount:`, "", -1)
s = strings.Replace(s, `\n", exit status 1`, "", -1)
return errors.New(s)
}
if s == fuseExitStatus1 {
s = fmt.Sprintf("fuse failed to access mountpoint %s", mountpoint)
return errors.New(s)
}
return err
}
// this sync stuff is so that both can be mounted simultaneously.
var fsmount mount.Mount
var nsmount mount.Mount
var err1 error
var err2 error
done := make(chan struct{})
go func() {
fsmount, err1 = rofs.Mount(node, fsdir)
done <- struct{}{}
}()
go func() {
nsmount, err2 = ipns.Mount(node, nsdir, fsdir)
done <- struct{}{}
}()
<-done
<-done
if err1 != nil {
log.Errorf("error mounting: %s", err1)
}
if err2 != nil {
log.Errorf("error mounting: %s", err2)
}
if err1 != nil || err2 != nil {
if fsmount != nil {
fsmount.Unmount()
}
if nsmount != nil {
nsmount.Unmount()
}
if err1 != nil {
return fmtFuseErr(err1, fsdir)
}
return fmtFuseErr(err2, nsdir)
}
// setup node state, so that it can be cancelled
node.Mounts.Ipfs = fsmount
node.Mounts.Ipns = nsmount
return nil
}

View File

@ -0,0 +1,11 @@
package node
import (
"github.com/ipfs/go-ipfs/core"
)
func Mount(node *core.IpfsNode, fsdir, nsdir string) error {
// TODO
// currently a no-op, but we don't want to return an error
return nil
}