1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-09-10 09:52:20 +08:00

mount: fixed mount init + teardown

This commit adds a Mount abstraction (which is really just
a wrapped context closer). It makes sure to bind the mount
to the fate of the Node (i.e. close it if the node ends).
This fixes #350
This commit is contained in:
Juan Batiz-Benet
2014-11-16 01:42:38 -08:00
parent 5a372f6996
commit 99f2378bac
6 changed files with 230 additions and 119 deletions

View File

@ -297,6 +297,7 @@ func callCommand(req cmds.Request, root *cmds.Command) (cmds.Response, error) {
// this is gross, and should be changed when we extract out the exec Context.
node := req.Context().NodeWithoutConstructing()
if node != nil {
log.Info("Shutting down node...")
node.Close()
}
}

View File

@ -11,6 +11,7 @@ import (
config "github.com/jbenet/go-ipfs/config"
core "github.com/jbenet/go-ipfs/core"
ipns "github.com/jbenet/go-ipfs/fuse/ipns"
mount "github.com/jbenet/go-ipfs/fuse/mount"
rofs "github.com/jbenet/go-ipfs/fuse/readonly"
)
@ -113,7 +114,6 @@ baz
if !found {
fsdir = cfg.Mounts.IPFS // use default value
}
fsdone := mountIpfs(node, fsdir)
// get default mount points
nsdir, found, err := req.Option("n").String()
@ -124,30 +124,14 @@ baz
nsdir = cfg.Mounts.IPNS // NB: be sure to not redeclare!
}
nsdone := mountIpns(node, nsdir, fsdir)
fmtFuseErr := func(err error) error {
s := err.Error()
if strings.Contains(s, fuseNoDirectory) {
s = strings.Replace(s, `fusermount: "fusermount:`, "", -1)
s = strings.Replace(s, `\n", exit status 1`, "", -1)
return cmds.ClientError(s)
}
return err
if err := doMount(node, fsdir, nsdir); err != nil {
return nil, err
}
// wait until mounts return an error (or timeout if successful)
select {
case err := <-fsdone:
return nil, fmtFuseErr(err)
case err := <-nsdone:
return nil, fmtFuseErr(err)
// mounted successfully, we timed out with no errors
case <-time.After(mountTimeout):
output := cfg.Mounts
return &output, nil
}
var output config.Mounts
output.IPFS = fsdir
output.IPNS = nsdir
return &output, nil
},
Type: &config.Mounts{},
Marshalers: cmds.MarshalerMap{
@ -160,33 +144,52 @@ baz
},
}
func mountIpfs(node *core.IpfsNode, fsdir string) <-chan error {
done := make(chan error)
log.Info("Mounting IPFS at ", fsdir)
go func() {
err := rofs.Mount(node, fsdir)
done <- err
close(done)
}()
return done
}
func mountIpns(node *core.IpfsNode, nsdir, fsdir string) <-chan error {
if nsdir == "" {
return nil
func doMount(node *core.IpfsNode, fsdir, nsdir string) error {
fmtFuseErr := func(err error) error {
s := err.Error()
if strings.Contains(s, fuseNoDirectory) {
s = strings.Replace(s, `fusermount: "fusermount:`, "", -1)
s = strings.Replace(s, `\n", exit status 1`, "", -1)
return cmds.ClientError(s)
}
return err
}
done := make(chan error)
log.Info("Mounting IPNS at ", nsdir)
// this sync stuff is so that both can be mounted simultaneously.
var fsmount mount.Mount
var nsmount mount.Mount
var err1 error
var err2 error
done := make(chan struct{})
go func() {
err := ipns.Mount(node, nsdir, fsdir)
done <- err
close(done)
fsmount, err1 = rofs.Mount(node, fsdir)
done <- struct{}{}
}()
return done
go func() {
nsmount, err2 = ipns.Mount(node, nsdir, fsdir)
done <- struct{}{}
}()
<-done
<-done
if err1 != nil || err2 != nil {
fsmount.Close()
nsmount.Close()
if err1 != nil {
return fmtFuseErr(err1)
} else {
return fmtFuseErr(err2)
}
}
// setup node state, so that it can be cancelled
node.Mounts.Ipfs = fsmount
node.Mounts.Ipns = nsmount
return nil
}
var platformFuseChecks = func() error {

View File

@ -27,6 +27,7 @@ import (
routing "github.com/jbenet/go-ipfs/routing"
dht "github.com/jbenet/go-ipfs/routing/dht"
u "github.com/jbenet/go-ipfs/util"
mount "github.com/jbenet/go-ipfs/fuse/mount"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
)
@ -74,11 +75,22 @@ type IpfsNode struct {
// the pinning manager
Pinning pin.Pinner
// current mount state, if any.
Mounts Mounts
ctxc.ContextCloser
onlineMode bool // alternatively, offline
}
// Mounts defines what the node's mount state is. This should
// perhaps be moved to the daemon or mount. It's here because
// it needs to be accessible across daemon requests.
type Mounts struct {
Ipfs mount.Mount
Ipns mount.Mount
}
// NewIpfsNode constructs a new IpfsNode based on the given config.
func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) {
success := false // flip to true after all sub-system inits succeed

View File

@ -2,66 +2,69 @@ package ipns
import (
"fmt"
"os"
"os/exec"
"os/signal"
"runtime"
"syscall"
"time"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs"
"github.com/jbenet/go-ipfs/core"
fuse "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
fs "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs"
core "github.com/jbenet/go-ipfs/core"
mount "github.com/jbenet/go-ipfs/fuse/mount"
)
// Mount mounts an IpfsNode instance at a particular path. It
// serves until the process receives exit signals (to Unmount).
func Mount(ipfs *core.IpfsNode, fpath string, ipfspath string) error {
func Mount(ipfs *core.IpfsNode, fpath string, ipfspath string) (mount.Mount, error) {
log.Infof("Mounting ipns at %s...", fpath)
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT,
syscall.SIGTERM, syscall.SIGQUIT)
// setup the Mount abstraction.
m := mount.New(ipfs.Context(), fpath, unmount)
go func() {
defer ipfs.Network.Close()
<-sigc
for {
err := Unmount(fpath)
if err == nil {
return
}
time.Sleep(time.Millisecond * 100)
// go serve the mount
mount.ServeMount(m, func(m mount.Mount) error {
c, err := fuse.Mount(fpath)
if err != nil {
return err
}
}()
defer c.Close()
c, err := fuse.Mount(fpath)
if err != nil {
return err
}
defer c.Close()
fsys, err := NewIpns(ipfs, ipfspath)
if err != nil {
return err
}
fsys, err := NewIpns(ipfs, ipfspath)
if err != nil {
return err
log.Infof("Mounted ipns at %s.", fpath)
if err := fs.Serve(c, fsys); err != nil {
return err
}
// check if the mount process has an error to report
<-c.Ready
if err := c.MountError; err != nil {
return err
}
return nil
})
select {
case <-m.Closed():
return nil, fmt.Errorf("failed to mount")
case <-time.After(time.Second):
// assume it worked...
}
err = fs.Serve(c, fsys)
if err != nil {
return err
}
// check if the mount process has an error to report
<-c.Ready
if err := c.MountError; err != nil {
return err
}
return nil
// bind the mount (ContextCloser) to the node, so that when the node exits
// the fsclosers are automatically closed.
ipfs.AddCloserChild(m)
return m, nil
}
// Unmount attempts to unmount the provided FUSE mount point, forcibly
// unmount attempts to unmount the provided FUSE mount point, forcibly
// if necessary.
func Unmount(point string) error {
fmt.Printf("Unmounting %s...\n", point)
func unmount(point string) error {
log.Infof("Unmounting ipns at %s...", point)
var cmd *exec.Cmd
switch runtime.GOOS {

86
fuse/mount/mount.go Normal file
View File

@ -0,0 +1,86 @@
// package mount provides a simple abstraction around a mount point
package mount
import (
"fmt"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
u "github.com/jbenet/go-ipfs/util"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
)
var log = u.Logger("mount")
// Mount represents a filesystem mount
type Mount interface {
// MountPoint is the path at which this mount is mounted
MountPoint() string
// Unmount calls Close.
Unmount() error
ctxc.ContextCloser
}
// UnmountFunc is a function used to unmount a mount
type UnmountFunc func(mountpoint string) error
// New constructs a new Mount instance. ctx is a context to wait upon,
// the mountpoint is the directory that the mount was mounted at, and unmount
// in an UnmountFunc to perform the unmounting logic.
func New(ctx context.Context, mountpoint string, unmount UnmountFunc) Mount {
m := &mount{
mpoint: mountpoint,
unmount: unmount,
}
m.ContextCloser = ctxc.NewContextCloser(ctx, m.persistentUnmount)
return m
}
type mount struct {
ctxc.ContextCloser
unmount UnmountFunc
mpoint string
}
// umount is called after the mount is closed.
// TODO this is hacky, make it better.
func (m *mount) persistentUnmount() error {
// ok try to unmount a whole bunch of times...
for i := 0; i < 34; i++ {
err := m.unmount(m.mpoint)
if err == nil {
return nil
}
time.Sleep(time.Millisecond * 300)
}
// didnt work.
return fmt.Errorf("Unmount %s failed after 10 seconds of trying.")
}
func (m *mount) MountPoint() string {
return m.mpoint
}
func (m *mount) Unmount() error {
return m.Close()
}
func ServeMount(m Mount, mount func(Mount) error) {
m.Children().Add(1)
// go serve the mount
go func() {
if err := mount(m); err != nil {
log.Error("%s mount: %s", m.MountPoint(), err)
}
m.Children().Done()
m.Unmount()
}()
}

View File

@ -9,9 +9,7 @@ import (
"io/ioutil"
"os"
"os/exec"
"os/signal"
"runtime"
"syscall"
"time"
fuse "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
@ -19,6 +17,7 @@ import (
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
core "github.com/jbenet/go-ipfs/core"
mount "github.com/jbenet/go-ipfs/fuse/mount"
mdag "github.com/jbenet/go-ipfs/merkledag"
uio "github.com/jbenet/go-ipfs/unixfs/io"
ftpb "github.com/jbenet/go-ipfs/unixfs/pb"
@ -162,47 +161,54 @@ func (s *Node) ReadAll(intr fs.Intr) ([]byte, fuse.Error) {
// Mount mounts an IpfsNode instance at a particular path. It
// serves until the process receives exit signals (to Unmount).
func Mount(ipfs *core.IpfsNode, fpath string) error {
func Mount(ipfs *core.IpfsNode, fpath string) (mount.Mount, error) {
log.Infof("Mounting ipfs at %s...", fpath)
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT,
syscall.SIGTERM, syscall.SIGQUIT)
// setup the Mount abstraction.
m := mount.New(ipfs.Context(), fpath, unmount)
go func() {
defer ipfs.Network.Close()
<-sigc
for {
err := Unmount(fpath)
if err == nil {
return
}
time.Sleep(time.Millisecond * 10)
// go serve the mount
mount.ServeMount(m, func(m mount.Mount) error {
c, err := fuse.Mount(m.MountPoint())
if err != nil {
return err
}
}()
defer c.Close()
c, err := fuse.Mount(fpath)
if err != nil {
return err
}
defer c.Close()
fsys := FileSystem{Ipfs: ipfs}
err = fs.Serve(c, FileSystem{Ipfs: ipfs})
if err != nil {
return err
log.Infof("Mounted ipfs at %s.", fpath)
if err := fs.Serve(c, fsys); err != nil {
return err
}
// check if the mount process has an error to report
<-c.Ready
if err := c.MountError; err != nil {
m.Unmount()
return err
}
return nil
})
select {
case <-m.Closed():
return nil, fmt.Errorf("failed to mount")
case <-time.After(time.Second):
// assume it worked...
}
// check if the mount process has an error to report
<-c.Ready
if err := c.MountError; err != nil {
return err
}
return nil
// bind the mount (ContextCloser) to the node, so that when the node exits
// the fsclosers are automatically closed.
ipfs.AddCloserChild(m)
return m, nil
}
// Unmount attempts to unmount the provided FUSE mount point, forcibly
// if necessary.
func Unmount(point string) error {
log.Info("Unmounting %s...", point)
func unmount(point string) error {
log.Infof("Unmounting ipfs at %s...", point)
var cmd *exec.Cmd
switch runtime.GOOS {