From f679127d835563a1dd29f787fc58a063314f88c6 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 15 Mar 2015 11:13:59 -0700 Subject: [PATCH] comments! and cleanup --- core/core.go | 9 +++++--- ipnsfs/dir.go | 6 ++--- ipnsfs/file.go | 14 ++++++++++++ ipnsfs/system.go | 59 +++++++++++++++++++++++++++++++++++++++--------- 4 files changed, 70 insertions(+), 18 deletions(-) diff --git a/core/core.go b/core/core.go index eba3c4843..f7bfb7757 100644 --- a/core/core.go +++ b/core/core.go @@ -296,13 +296,16 @@ func (n *IpfsNode) teardown() error { n.Repo, } - if n.Blocks != nil { - closers = append(closers, n.Blocks) - } + // Filesystem needs to be closed before network, dht, and blockservice + // so it can use them as its shutting down if n.IpnsFs != nil { closers = append(closers, n.IpnsFs) } + if n.Blocks != nil { + closers = append(closers, n.Blocks) + } + if n.Bootstrapper != nil { closers = append(closers, n.Bootstrapper) } diff --git a/ipnsfs/dir.go b/ipnsfs/dir.go index 168d25a0f..edc36db28 100644 --- a/ipnsfs/dir.go +++ b/ipnsfs/dir.go @@ -70,10 +70,8 @@ func (d *Directory) Open(tpath []string, mode int) (*File, error) { return dir.Open(tpath[1:], mode) } -type childCloser interface { - closeChild(string, *dag.Node) error -} - +// closeChild updates the child by the given name to the dag node 'nd' +// and changes its own dag node, then propogates the changes upward func (d *Directory) closeChild(name string, nd *dag.Node) error { _, err := d.fs.dserv.Add(nd) if err != nil { diff --git a/ipnsfs/file.go b/ipnsfs/file.go index aa5a89b47..d820d5b45 100644 --- a/ipnsfs/file.go +++ b/ipnsfs/file.go @@ -21,6 +21,7 @@ type File struct { lock sync.Mutex } +// NewFile returns a NewFile object with the given parameters func NewFile(name string, node *dag.Node, parent childCloser, fs *Filesystem) (*File, error) { dmod, err := mod.NewDagModifier(context.Background(), node, fs.dserv, fs.pins.GetManual(), chunk.DefaultSplitter) if err != nil { @@ -35,6 +36,7 @@ func NewFile(name string, node *dag.Node, parent childCloser, fs *Filesystem) (* }, nil } +// Write writes the given data to the file at its current offset func (fi *File) Write(b []byte) (int, error) { fi.Lock() defer fi.Unlock() @@ -42,12 +44,15 @@ func (fi *File) Write(b []byte) (int, error) { return fi.mod.Write(b) } +// Read reads into the given buffer from the current offset func (fi *File) Read(b []byte) (int, error) { fi.Lock() defer fi.Unlock() return fi.mod.Read(b) } +// Close flushes, then propogates the modified dag node up the directory structure +// and signals a republish to occur func (fi *File) Close() error { fi.Lock() defer fi.Unlock() @@ -75,18 +80,21 @@ func (fi *File) Close() error { return nil } +// Flush flushes the changes in the file to disk func (fi *File) Flush() error { fi.Lock() defer fi.Unlock() return fi.mod.Flush() } +// Seek implements io.Seeker func (fi *File) Seek(offset int64, whence int) (int64, error) { fi.Lock() defer fi.Unlock() return fi.mod.Seek(offset, whence) } +// Write At writes the given bytes at the offset 'at' func (fi *File) WriteAt(b []byte, at int64) (int, error) { fi.Lock() defer fi.Unlock() @@ -94,18 +102,21 @@ func (fi *File) WriteAt(b []byte, at int64) (int, error) { return fi.mod.WriteAt(b, at) } +// Size returns the size of this file func (fi *File) Size() (int64, error) { fi.Lock() defer fi.Unlock() return fi.mod.Size() } +// GetNode returns the dag node associated with this file func (fi *File) GetNode() (*dag.Node, error) { fi.Lock() defer fi.Unlock() return fi.mod.GetNode() } +// Truncate truncates the file to size func (fi *File) Truncate(size int64) error { fi.Lock() defer fi.Unlock() @@ -113,14 +124,17 @@ func (fi *File) Truncate(size int64) error { return fi.mod.Truncate(size) } +// Type returns the type FSNode this is func (fi *File) Type() NodeType { return TFile } +// Lock the file func (fi *File) Lock() { fi.lock.Lock() } +// Unlock the file func (fi *File) Unlock() { fi.lock.Unlock() } diff --git a/ipnsfs/system.go b/ipnsfs/system.go index f14aa94d8..4e7395b06 100644 --- a/ipnsfs/system.go +++ b/ipnsfs/system.go @@ -1,3 +1,13 @@ +// package ipnsfs implements an in memory model of a mutable ipns filesystem, +// to be used by the fuse filesystem. +// +// It consists of four main structs: +// 1) The Filesystem +// The filesystem serves as a container and entry point for the ipns filesystem +// 2) KeyRoots +// KeyRoots represent the root of the keyspace controlled by a given keypair +// 3) Directories +// 4) Files package ipnsfs import ( @@ -5,6 +15,7 @@ import ( "fmt" "os" "strings" + "sync" "time" dag "github.com/jbenet/go-ipfs/merkledag" @@ -33,6 +44,7 @@ type Filesystem struct { roots map[string]*KeyRoot } +// NewFilesystem instantiates an ipns filesystem using the given parameters and locally owned keys func NewFilesystem(ctx context.Context, ds dag.DAGService, nsys namesys.NameSystem, pins pin.Pinner, keys ...ci.PrivKey) (*Filesystem, error) { roots := make(map[string]*KeyRoot) fs := &Filesystem{ @@ -47,7 +59,7 @@ func NewFilesystem(ctx context.Context, ds dag.DAGService, nsys namesys.NameSyst return nil, err } - root, err := fs.NewKeyRoot(ctx, k) + root, err := fs.newKeyRoot(ctx, k) if err != nil { return nil, err } @@ -57,6 +69,7 @@ func NewFilesystem(ctx context.Context, ds dag.DAGService, nsys namesys.NameSyst return fs, nil } +// Open opens a file at the given path func (fs *Filesystem) Open(tpath string, mode int) (*File, error) { pathelem := strings.Split(tpath, "/") r, ok := fs.roots[pathelem[0]] @@ -68,15 +81,23 @@ func (fs *Filesystem) Open(tpath string, mode int) (*File, error) { } func (fs *Filesystem) Close() error { + wg := sync.WaitGroup{} for _, r := range fs.roots { - err := r.Publish(context.TODO()) - if err != nil { - return err - } + wg.Add(1) + go func(r *KeyRoot) { + defer wg.Done() + err := r.Publish(context.TODO()) + if err != nil { + log.Error(err) + return + } + }(r) } + wg.Wait() return nil } +// GetRoot returns the KeyRoot of the given name func (fs *Filesystem) GetRoot(name string) (*KeyRoot, error) { r, ok := fs.roots[name] if ok { @@ -85,6 +106,10 @@ func (fs *Filesystem) GetRoot(name string) (*KeyRoot, error) { return nil, os.ErrNotExist } +type childCloser interface { + closeChild(string, *dag.Node) error +} + type NodeType int const ( @@ -92,6 +117,7 @@ const ( TDir ) +// FSNode represents any node (directory, root, or file) in the ipns filesystem type FSNode interface { GetNode() (*dag.Node, error) Type() NodeType @@ -115,7 +141,9 @@ type KeyRoot struct { repub *Republisher } -func (fs *Filesystem) NewKeyRoot(parent context.Context, k ci.PrivKey) (*KeyRoot, error) { +// newKeyRoot creates a new KeyRoot for the given key, and starts up a republisher routine +// for it +func (fs *Filesystem) newKeyRoot(parent context.Context, k ci.PrivKey) (*KeyRoot, error) { hash, err := k.GetPublic().Hash() if err != nil { return nil, err @@ -180,14 +208,14 @@ func (kr *KeyRoot) GetValue() FSNode { func (kr *KeyRoot) Open(tpath []string, mode int) (*File, error) { if kr.val == nil { - // No entry here... what should we do? - panic("nyi") + // No entry here. KeyRoot was created incorrectly + panic("nil keyroot.val, improperly constructed keyroot") } if len(tpath) > 0 { // Make sure our root is a directory dir, ok := kr.val.(*Directory) if !ok { - return nil, fmt.Errorf("no such file or directory: %s", tpath[0]) + return nil, os.ErrNotExist } return dir.Open(tpath, mode) @@ -222,6 +250,7 @@ func (kr *KeyRoot) Publish(ctx context.Context) error { return err } + // Holding this lock so our child doesnt change out from under us child.Lock() k, err := kr.fs.dserv.Add(nd) if err != nil { @@ -230,6 +259,8 @@ func (kr *KeyRoot) Publish(ctx context.Context) error { } child.Unlock() // Dont want to hold the lock while we publish + // otherwise we are holding the lock through a costly + // network operation fmt.Println("Publishing!") return kr.fs.nsys.Publish(ctx, kr.key, k) @@ -243,6 +274,8 @@ type Republisher struct { root *KeyRoot } +// NewRepublisher creates a new Republisher object to republish the given keyroot +// using the given short and long time intervals func NewRepublisher(root *KeyRoot, tshort, tlong time.Duration) *Republisher { return &Republisher{ TimeoutShort: tshort, @@ -252,6 +285,9 @@ func NewRepublisher(root *KeyRoot, tshort, tlong time.Duration) *Republisher { } } +// Touch signals that an update has occurred since the last publish. +// Multiple consecutive touches may extend the time period before +// the next Publish occurs in order to more efficiently batch updates func (np *Republisher) Touch() { select { case np.Publish <- struct{}{}: @@ -259,6 +295,7 @@ func (np *Republisher) Touch() { } } +// Run is the main republisher loop func (np *Republisher) Run(ctx context.Context) { for { select { @@ -268,13 +305,13 @@ func (np *Republisher) Run(ctx context.Context) { wait: select { - case <-quick: - case <-longer: case <-ctx.Done(): return case <-np.Publish: quick = time.After(np.TimeoutShort) goto wait + case <-quick: + case <-longer: } log.Info("Publishing Changes!")