diff --git a/core/core.go b/core/core.go index 45767728b..5d5eb3df4 100644 --- a/core/core.go +++ b/core/core.go @@ -37,6 +37,7 @@ import ( rp "github.com/jbenet/go-ipfs/exchange/reprovide" mount "github.com/jbenet/go-ipfs/fuse/mount" + ipnsfs "github.com/jbenet/go-ipfs/ipnsfs" merkledag "github.com/jbenet/go-ipfs/merkledag" namesys "github.com/jbenet/go-ipfs/namesys" path "github.com/jbenet/go-ipfs/path" @@ -89,6 +90,8 @@ type IpfsNode struct { Diagnostics *diag.Diagnostics // the diagnostics service Reprovider *rp.Reprovider // the value reprovider system + IpnsFs *ipnsfs.Filesystem + ctxgroup.ContextGroup mode mode @@ -138,6 +141,16 @@ func NewIPFSNode(parent context.Context, option ConfigOption) (*IpfsNode, error) node.Pinning = pin.NewPinner(node.Repo.Datastore(), node.DAG) } node.Resolver = &path.Resolver{DAG: node.DAG} + + // Setup the mutable ipns filesystem structure + if node.OnlineMode() { + fs, err := ipnsfs.NewFilesystem(ctx, node.DAG, node.Namesys, node.Pinning, node.PrivateKey) + if err != nil { + return nil, debugerror.Wrap(err) + } + node.IpnsFs = fs + } + success = true return node, nil } @@ -268,6 +281,7 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost // setup name system n.Namesys = namesys.NewNameSystem(n.Routing) + return nil } @@ -278,7 +292,6 @@ func (n *IpfsNode) teardown() error { // owned objects are closed in this teardown to ensure that they're closed // regardless of which constructor was used to add them to the node. closers := []io.Closer{ - n.Blocks, n.Exchange, n.Repo, } @@ -288,6 +301,13 @@ func (n *IpfsNode) teardown() error { } } + if n.Blocks != nil { + addCloser(n.Blocks) + } + if n.IpnsFs != nil { + addCloser(n.IpnsFs) + } + addCloser(n.Bootstrapper) if dht, ok := n.Routing.(*dht.IpfsDHT); ok { addCloser(dht) diff --git a/core/mock.go b/core/mock.go index 987ba02b0..149795cef 100644 --- a/core/mock.go +++ b/core/mock.go @@ -13,8 +13,9 @@ import ( mocknet "github.com/jbenet/go-ipfs/p2p/net/mock" peer "github.com/jbenet/go-ipfs/p2p/peer" path "github.com/jbenet/go-ipfs/path" + pin "github.com/jbenet/go-ipfs/pin" "github.com/jbenet/go-ipfs/repo" - mockrouting "github.com/jbenet/go-ipfs/routing/mock" + offrt "github.com/jbenet/go-ipfs/routing/offline" ds2 "github.com/jbenet/go-ipfs/util/datastore2" testutil "github.com/jbenet/go-ipfs/util/testutil" ) @@ -54,7 +55,7 @@ func NewMockNode() (*IpfsNode, error) { } // Routing - nd.Routing = mockrouting.NewServer().Client(ident) + nd.Routing = offrt.NewOfflineRouter(nd.Repo.Datastore(), nd.PrivateKey) // Bitswap bstore := blockstore.NewBlockstore(nd.Repo.Datastore()) @@ -65,6 +66,8 @@ func NewMockNode() (*IpfsNode, error) { nd.DAG = mdag.NewDAGService(bserv) + nd.Pinning = pin.NewPinner(nd.Repo.Datastore(), nd.DAG) + // Namespace resolver nd.Namesys = nsys.NewNameSystem(nd.Routing) diff --git a/ipnsfs/dir.go b/ipnsfs/dir.go new file mode 100644 index 000000000..979ace875 --- /dev/null +++ b/ipnsfs/dir.go @@ -0,0 +1,340 @@ +package ipnsfs + +import ( + "errors" + "fmt" + "os" + "sync" + + dag "github.com/jbenet/go-ipfs/merkledag" + ft "github.com/jbenet/go-ipfs/unixfs" + ufspb "github.com/jbenet/go-ipfs/unixfs/pb" +) + +type Directory struct { + fs *Filesystem + parent childCloser + childDirs map[string]*Directory + files map[string]*file + + node *dag.Node + name string + lock sync.Mutex + + ref int + refLock sync.Mutex +} + +func NewDirectory(name string, node *dag.Node, parent childCloser, fs *Filesystem) *Directory { + return &Directory{ + fs: fs, + name: name, + node: node, + parent: parent, + childDirs: make(map[string]*Directory), + files: make(map[string]*file), + } +} + +func (d *Directory) Open(tpath []string, mode int) (File, error) { + if len(tpath) == 0 { + return nil, ErrIsDirectory + } + if len(tpath) == 1 { + fi, err := d.childFile(tpath[0]) + if err == nil { + return fi.withMode(mode), nil + } + + if mode|os.O_CREATE != 0 { + fnode := new(dag.Node) + fnode.Data = ft.FilePBData(nil, 0) + nfi, err := NewFile(tpath[0], fnode, d, d.fs) + if err != nil { + return nil, err + } + d.files[tpath[0]] = nfi + return nfi.withMode(mode), nil + } + + return nil, ErrNoSuch + } + + dir, err := d.childDir(tpath[0]) + if err != nil { + return nil, err + } + return dir.Open(tpath[1:], mode) +} + +// consider combining into a single method... +type childCloser interface { + closeChild(string, *dag.Node) error +} + +func (d *Directory) closeChild(name string, nd *dag.Node) error { + _, err := d.fs.dserv.Add(nd) + if err != nil { + return err + } + + d.lock.Lock() + err = d.node.RemoveNodeLink(name) + if err != nil && err != dag.ErrNotFound { + d.lock.Unlock() + return err + } + + err = d.node.AddNodeLinkClean(name, nd) + if err != nil { + d.lock.Unlock() + return err + } + d.lock.Unlock() + + return d.parent.closeChild(d.name, d.node) +} + +func (d *Directory) Type() NodeType { + return TDir +} + +func (d *Directory) childFile(name string) (*file, error) { + fi, ok := d.files[name] + if ok { + return fi, nil + } + + // search dag + for _, lnk := range d.node.Links { + if lnk.Name == name { + nd, err := lnk.GetNode(d.fs.dserv) + if err != nil { + return nil, err + } + i, err := ft.FromBytes(nd.Data) + if err != nil { + return nil, err + } + + switch i.GetType() { + case ufspb.Data_Directory: + return nil, ErrIsDirectory + case ufspb.Data_File: + nfi, err := NewFile(name, nd, d, d.fs) + if err != nil { + return nil, err + } + d.files[name] = nfi + return nfi, nil + case ufspb.Data_Metadata: + panic("NOT YET IMPLEMENTED") + default: + panic("NO!") + } + } + } + return nil, ErrNoSuch +} + +func (d *Directory) childDir(name string) (*Directory, error) { + dir, ok := d.childDirs[name] + if ok { + return dir, nil + } + + for _, lnk := range d.node.Links { + if lnk.Name == name { + nd, err := lnk.GetNode(d.fs.dserv) + if err != nil { + return nil, err + } + i, err := ft.FromBytes(nd.Data) + if err != nil { + return nil, err + } + + switch i.GetType() { + case ufspb.Data_Directory: + ndir := NewDirectory(name, nd, d, d.fs) + d.childDirs[name] = ndir + return ndir, nil + case ufspb.Data_File: + return nil, fmt.Errorf("%s is not a directory", name) + case ufspb.Data_Metadata: + panic("NOT YET IMPLEMENTED") + default: + panic("NO!") + } + } + + } + + return nil, ErrNoSuch +} + +func (d *Directory) Child(name string) (FSNode, error) { + d.lock.Lock() + defer d.lock.Unlock() + dir, err := d.childDir(name) + if err == nil { + return dir, nil + } + fi, err := d.childFile(name) + if err == nil { + return fi, nil + } + + return nil, ErrNoSuch +} + +func (d *Directory) List() []string { + d.lock.Lock() + defer d.lock.Unlock() + + var out []string + for _, lnk := range d.node.Links { + out = append(out, lnk.Name) + } + return out +} + +func (d *Directory) Mkdir(name string) (*Directory, error) { + d.lock.Lock() + + _, err := d.childDir(name) + if err == nil { + d.lock.Unlock() + return nil, errors.New("directory by that name already exists") + } + _, err = d.childFile(name) + if err == nil { + d.lock.Unlock() + return nil, errors.New("file by that name already exists") + } + + ndir := &dag.Node{Data: ft.FolderPBData()} + err = d.node.AddNodeLinkClean(name, ndir) + if err != nil { + d.lock.Unlock() + return nil, err + } + d.lock.Unlock() + + err = d.parent.closeChild(d.name, d.node) + if err != nil { + return nil, err + } + + d.lock.Lock() + defer d.lock.Unlock() + + return d.childDir(name) +} + +func (d *Directory) Unlink(name string) error { + d.lock.Lock() + delete(d.childDirs, name) + delete(d.files, name) + + err := d.node.RemoveNodeLink(name) + if err != nil { + d.lock.Unlock() + return err + } + d.lock.Unlock() + + return d.parent.closeChild(d.name, d.node) +} + +func (d *Directory) RenameEntry(oldname, newname string) error { + dir, err := d.childDir(oldname) + if err == nil { + dir.name = newname + + err := d.node.RemoveNodeLink(oldname) + if err != nil { + return err + } + err = d.node.AddNodeLinkClean(newname, dir.node) + if err != nil { + return err + } + + delete(d.childDirs, oldname) + d.childDirs[newname] = dir + return d.parent.closeChild(d.name, d.node) + } + + fi, err := d.childFile(oldname) + if err == nil { + fi.name = newname + + err := d.node.RemoveNodeLink(oldname) + if err != nil { + return err + } + + nd, err := fi.GetNode() + if err != nil { + return err + } + + err = d.node.AddNodeLinkClean(newname, nd) + if err != nil { + return err + } + + delete(d.childDirs, oldname) + d.files[newname] = fi + return d.parent.closeChild(d.name, d.node) + } + return ErrNoSuch +} + +func (d *Directory) AddChild(name string, nd *dag.Node) error { + pbn, err := ft.FromBytes(nd.Data) + if err != nil { + return err + } + + _, err = d.Child(name) + if err == nil { + return errors.New("directory already has entry by that name") + } + + err = d.node.AddNodeLinkClean(name, nd) + if err != nil { + return err + } + + switch pbn.GetType() { + case ft.TDirectory: + d.childDirs[name] = NewDirectory(name, nd, d, d.fs) + case ft.TFile, ft.TMetadata, ft.TRaw: + nfi, err := NewFile(name, nd, d, d.fs) + if err != nil { + return err + } + d.files[name] = nfi + default: + panic("invalid unixfs node") + } + return d.parent.closeChild(d.name, d.node) +} + +func (d *Directory) GetNode() (*dag.Node, error) { + return d.node, nil +} + +func (d *Directory) Upref() { + d.refLock.Lock() + d.ref++ + d.refLock.Unlock() +} + +func (d *Directory) Deref() { + d.refLock.Lock() + d.ref-- + d.refLock.Unlock() +} diff --git a/ipnsfs/file.go b/ipnsfs/file.go new file mode 100644 index 000000000..d5c951690 --- /dev/null +++ b/ipnsfs/file.go @@ -0,0 +1,145 @@ +package ipnsfs + +import ( + "errors" + "io" + "os" + "sync" + + chunk "github.com/jbenet/go-ipfs/importer/chunk" + dag "github.com/jbenet/go-ipfs/merkledag" + mod "github.com/jbenet/go-ipfs/unixfs/mod" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" +) + +type File interface { + io.ReadWriteCloser + io.WriterAt + Seek(int64, int) (int64, error) + Size() (int64, error) + Flush() error + Truncate(int64) error + FSNode +} + +type file struct { + parent childCloser + fs *Filesystem + + name string + hasChanges bool + + // TODO: determine whether or not locking here is actually required... + lk sync.Mutex + mod *mod.DagModifier +} + +func NewFile(name string, node *dag.Node, parent childCloser, fs *Filesystem) (*file, error) { + dmod, err := mod.NewDagModifier(context.TODO(), node, fs.dserv, fs.pins.GetManual(), chunk.DefaultSplitter) + if err != nil { + return nil, err + } + + return &file{ + fs: fs, + parent: parent, + name: name, + mod: dmod, + }, nil +} + +func (fi *file) Write(b []byte) (int, error) { + fi.lk.Lock() + defer fi.lk.Unlock() + fi.hasChanges = true + return fi.mod.Write(b) +} + +func (fi *file) Read(b []byte) (int, error) { + fi.lk.Lock() + defer fi.lk.Unlock() + return fi.mod.Read(b) +} + +func (fi *file) Close() error { + fi.lk.Lock() + defer fi.lk.Unlock() + if fi.hasChanges { + err := fi.mod.Flush() + if err != nil { + return err + } + + nd, err := fi.mod.GetNode() + if err != nil { + return err + } + + err = fi.parent.closeChild(fi.name, nd) + if err != nil { + return err + } + + fi.hasChanges = false + } + + return nil +} + +func (fi *file) Flush() error { + fi.lk.Lock() + defer fi.lk.Unlock() + return fi.mod.Flush() +} + +func (fi *file) withMode(mode int) File { + if mode == os.O_RDONLY { + return &readOnlyFile{fi} + } + return fi +} + +func (fi *file) Seek(offset int64, whence int) (int64, error) { + fi.lk.Lock() + defer fi.lk.Unlock() + return fi.mod.Seek(offset, whence) +} + +func (fi *file) WriteAt(b []byte, at int64) (int, error) { + fi.lk.Lock() + defer fi.lk.Unlock() + fi.hasChanges = true + return fi.mod.WriteAt(b, at) +} + +func (fi *file) Size() (int64, error) { + fi.lk.Lock() + defer fi.lk.Unlock() + return fi.mod.Size() +} + +func (fi *file) GetNode() (*dag.Node, error) { + fi.lk.Lock() + defer fi.lk.Unlock() + return fi.mod.GetNode() +} + +func (fi *file) Truncate(size int64) error { + fi.lk.Lock() + defer fi.lk.Unlock() + fi.hasChanges = true + return fi.mod.Truncate(size) +} + +func (fi *file) Type() NodeType { + return TFile +} + +type readOnlyFile struct { + *file +} + +func (ro *readOnlyFile) Write([]byte) (int, error) { + return 0, errors.New("permission denied: file readonly") +} diff --git a/ipnsfs/system.go b/ipnsfs/system.go new file mode 100644 index 000000000..e3f25fcc6 --- /dev/null +++ b/ipnsfs/system.go @@ -0,0 +1,285 @@ +package ipnsfs + +import ( + "errors" + "fmt" + "strings" + "time" + + dag "github.com/jbenet/go-ipfs/merkledag" + namesys "github.com/jbenet/go-ipfs/namesys" + ci "github.com/jbenet/go-ipfs/p2p/crypto" + pin "github.com/jbenet/go-ipfs/pin" + ft "github.com/jbenet/go-ipfs/unixfs" + u "github.com/jbenet/go-ipfs/util" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog" +) + +var log = eventlog.Logger("ipnsfs") + +var ErrIsDirectory = errors.New("error: is a directory") + +var ErrNoSuch = errors.New("no such file or directory") + +// Filesystem is the writeable fuse filesystem structure +type Filesystem struct { + dserv dag.DAGService + + nsys namesys.NameSystem + + pins pin.Pinner + + roots map[string]*KeyRoot +} + +func NewFilesystem(ctx context.Context, ds dag.DAGService, nsys namesys.NameSystem, pins pin.Pinner, keys ...ci.PrivKey) (*Filesystem, error) { + roots := make(map[string]*KeyRoot) + fs := &Filesystem{ + roots: roots, + nsys: nsys, + dserv: ds, + pins: pins, + } + for _, k := range keys { + pkh, err := k.GetPublic().Hash() + if err != nil { + return nil, err + } + + root, err := fs.NewKeyRoot(ctx, k) + if err != nil { + return nil, err + } + roots[u.Key(pkh).Pretty()] = root + } + + return fs, nil +} + +func (fs *Filesystem) Open(tpath string, mode int) (File, error) { + pathelem := strings.Split(tpath, "/") + r, ok := fs.roots[pathelem[0]] + if !ok { + return nil, ErrNoSuch + } + + return r.Open(pathelem[1:], mode) +} + +func (fs *Filesystem) Close() error { + for _, r := range fs.roots { + err := r.Publish(context.TODO()) + if err != nil { + return err + } + } + return nil +} + +func (fs *Filesystem) GetRoot(name string) (*KeyRoot, error) { + r, ok := fs.roots[name] + if ok { + return r, nil + } + return nil, ErrNoSuch +} + +type NodeType int + +const ( + TFile NodeType = iota + TDir +) + +type FSNode interface { + GetNode() (*dag.Node, error) + Type() NodeType +} + +// KeyRoot represents the root of a filesystem tree pointed to by a given keypair +type KeyRoot struct { + key ci.PrivKey + + // node is the merkledag node pointed to by this keypair + node *dag.Node + + // A pointer to the filesystem to access components + fs *Filesystem + + // val represents the node pointed to by this key. It can either be a File or a Directory + val FSNode + + repub *Republisher +} + +func (fs *Filesystem) NewKeyRoot(parent context.Context, k ci.PrivKey) (*KeyRoot, error) { + hash, err := k.GetPublic().Hash() + if err != nil { + return nil, err + } + + name := u.Key(hash).Pretty() + + root := new(KeyRoot) + root.key = k + root.fs = fs + + ctx, cancel := context.WithCancel(parent) + defer cancel() + + pointsTo, err := fs.nsys.Resolve(ctx, name) + if err != nil { + err = namesys.InitializeKeyspace(ctx, fs.dserv, fs.nsys, fs.pins, k) + if err != nil { + return nil, err + } + + pointsTo, err = fs.nsys.Resolve(ctx, name) + if err != nil { + return nil, err + } + } + + mnode, err := fs.dserv.Get(pointsTo) + if err != nil { + return nil, err + } + + root.node = mnode + + root.repub = NewRepublisher(root, time.Millisecond*300, time.Second*3) + go root.repub.Run(parent) + + pbn, err := ft.FromBytes(mnode.Data) + if err != nil { + log.Error("IPNS pointer was not unixfs node") + return nil, err + } + + switch pbn.GetType() { + case ft.TDirectory: + root.val = NewDirectory(pointsTo.B58String(), mnode, root, fs) + case ft.TFile, ft.TMetadata, ft.TRaw: + fi, err := NewFile(pointsTo.B58String(), mnode, root, fs) + if err != nil { + return nil, err + } + root.val = fi + default: + panic("unrecognized! (NYI)") + } + return root, nil +} + +func (kr *KeyRoot) GetValue() FSNode { + return kr.val +} + +func (kr *KeyRoot) Open(tpath []string, mode int) (File, error) { + if kr.val == nil { + // No entry here... what should we do? + panic("nyi") + } + if len(tpath) > 0 { + // Make sure our root is a directory + dir, ok := kr.val.(*Directory) + if !ok { + return nil, fmt.Errorf("no such file or directory: %s", tpath[0]) + } + + return dir.Open(tpath, mode) + } + + switch t := kr.val.(type) { + case *Directory: + return nil, ErrIsDirectory + case File: + return t, nil + default: + panic("unrecognized type, should not happen") + } +} + +// closeChild implements the childCloser interface, and signals to the publisher that +// there are changes ready to be published +func (kr *KeyRoot) closeChild(name string, nd *dag.Node) error { + kr.repub.Touch() + return nil +} + +// Publish publishes the ipns entry associated with this key +func (kr *KeyRoot) Publish(ctx context.Context) error { + child, ok := kr.val.(FSNode) + if !ok { + return errors.New("child of key root not valid type") + } + + nd, err := child.GetNode() + if err != nil { + return err + } + + k, err := kr.fs.dserv.Add(nd) + if err != nil { + return err + } + + fmt.Println("Publishing!") + return kr.fs.nsys.Publish(ctx, kr.key, k) +} + +// Republisher manages when to publish the ipns entry associated with a given key +type Republisher struct { + TimeoutLong time.Duration + TimeoutShort time.Duration + Publish chan struct{} + root *KeyRoot +} + +func NewRepublisher(root *KeyRoot, tshort, tlong time.Duration) *Republisher { + return &Republisher{ + TimeoutShort: tshort, + TimeoutLong: tlong, + Publish: make(chan struct{}, 1), + root: root, + } +} + +func (np *Republisher) Touch() { + select { + case np.Publish <- struct{}{}: + default: + } +} + +func (np *Republisher) Run(ctx context.Context) { + for { + select { + case <-np.Publish: + quick := time.After(np.TimeoutShort) + longer := time.After(np.TimeoutLong) + + wait: + select { + case <-quick: + case <-longer: + case <-ctx.Done(): + return + case <-np.Publish: + quick = time.After(np.TimeoutShort) + goto wait + } + + log.Info("Publishing Changes!") + err := np.root.Publish(ctx) + if err != nil { + log.Critical("republishRoot error: %s", err) + } + + case <-ctx.Done(): + return + } + } +} diff --git a/ipnsfs/system_test.go b/ipnsfs/system_test.go new file mode 100644 index 000000000..355cd3f87 --- /dev/null +++ b/ipnsfs/system_test.go @@ -0,0 +1,92 @@ +package ipnsfs_test + +import ( + "bytes" + "io/ioutil" + "os" + "path" + "testing" + + core "github.com/jbenet/go-ipfs/core" + . "github.com/jbenet/go-ipfs/ipnsfs" + u "github.com/jbenet/go-ipfs/util" +) + +func testFS(t *testing.T, nd *core.IpfsNode) *Filesystem { + fs, err := NewFilesystem(nd.Context(), nd.DAG, nd.Namesys, nd.Pinning, nd.PrivateKey) + if err != nil { + t.Fatal(err) + } + + return fs +} + +func TestBasic(t *testing.T) { + mock, err := core.NewMockNode() + if err != nil { + t.Fatal(err) + } + + fs := testFS(t, mock) + + k := u.Key(mock.Identity) + p := path.Join(k.B58String(), "file") + fi, err := fs.Open(p, os.O_CREATE) + if err != nil { + t.Fatal(err) + } + + data := []byte("Hello World") + n, err := fi.Write(data) + if err != nil { + t.Fatal(err) + } + + if n != len(data) { + t.Fatal("wrote incorrect amount") + } + + err = fi.Close() + if err != nil { + t.Fatal(err) + } + + nfi, err := fs.Open(p, os.O_RDONLY) + if err != nil { + t.Fatal(err) + } + + out, err := ioutil.ReadAll(nfi) + if err != nil { + t.Fatal(err) + } + + err = nfi.Close() + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(out, data) { + t.Fatal("Write failed.") + } + + err = fs.Close() + if err != nil { + t.Fatal(err) + } + + // Open the filesystem again, and try to read our file + nfs := testFS(t, mock) + + fi, err = nfs.Open(p, os.O_RDONLY) + nb, err := ioutil.ReadAll(fi) + if err != nil { + t.Fatal(err) + } + + t.Log(nb) + + if !bytes.Equal(nb, data) { + t.Fatal("data not the same after closing down fs") + } +}