1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-27 07:57:30 +08:00

comments! and cleanup

This commit is contained in:
Jeromy
2015-03-15 11:13:59 -07:00
parent 0ee7091cbc
commit f679127d83
4 changed files with 70 additions and 18 deletions

View File

@ -296,13 +296,16 @@ func (n *IpfsNode) teardown() error {
n.Repo, n.Repo,
} }
if n.Blocks != nil { // Filesystem needs to be closed before network, dht, and blockservice
closers = append(closers, n.Blocks) // so it can use them as its shutting down
}
if n.IpnsFs != nil { if n.IpnsFs != nil {
closers = append(closers, n.IpnsFs) closers = append(closers, n.IpnsFs)
} }
if n.Blocks != nil {
closers = append(closers, n.Blocks)
}
if n.Bootstrapper != nil { if n.Bootstrapper != nil {
closers = append(closers, n.Bootstrapper) closers = append(closers, n.Bootstrapper)
} }

View File

@ -70,10 +70,8 @@ func (d *Directory) Open(tpath []string, mode int) (*File, error) {
return dir.Open(tpath[1:], mode) return dir.Open(tpath[1:], mode)
} }
type childCloser interface { // closeChild updates the child by the given name to the dag node 'nd'
closeChild(string, *dag.Node) error // and changes its own dag node, then propogates the changes upward
}
func (d *Directory) closeChild(name string, nd *dag.Node) error { func (d *Directory) closeChild(name string, nd *dag.Node) error {
_, err := d.fs.dserv.Add(nd) _, err := d.fs.dserv.Add(nd)
if err != nil { if err != nil {

View File

@ -21,6 +21,7 @@ type File struct {
lock sync.Mutex lock sync.Mutex
} }
// NewFile returns a NewFile object with the given parameters
func NewFile(name string, node *dag.Node, parent childCloser, fs *Filesystem) (*File, error) { func NewFile(name string, node *dag.Node, parent childCloser, fs *Filesystem) (*File, error) {
dmod, err := mod.NewDagModifier(context.Background(), node, fs.dserv, fs.pins.GetManual(), chunk.DefaultSplitter) dmod, err := mod.NewDagModifier(context.Background(), node, fs.dserv, fs.pins.GetManual(), chunk.DefaultSplitter)
if err != nil { if err != nil {
@ -35,6 +36,7 @@ func NewFile(name string, node *dag.Node, parent childCloser, fs *Filesystem) (*
}, nil }, nil
} }
// Write writes the given data to the file at its current offset
func (fi *File) Write(b []byte) (int, error) { func (fi *File) Write(b []byte) (int, error) {
fi.Lock() fi.Lock()
defer fi.Unlock() defer fi.Unlock()
@ -42,12 +44,15 @@ func (fi *File) Write(b []byte) (int, error) {
return fi.mod.Write(b) return fi.mod.Write(b)
} }
// Read reads into the given buffer from the current offset
func (fi *File) Read(b []byte) (int, error) { func (fi *File) Read(b []byte) (int, error) {
fi.Lock() fi.Lock()
defer fi.Unlock() defer fi.Unlock()
return fi.mod.Read(b) return fi.mod.Read(b)
} }
// Close flushes, then propogates the modified dag node up the directory structure
// and signals a republish to occur
func (fi *File) Close() error { func (fi *File) Close() error {
fi.Lock() fi.Lock()
defer fi.Unlock() defer fi.Unlock()
@ -75,18 +80,21 @@ func (fi *File) Close() error {
return nil return nil
} }
// Flush flushes the changes in the file to disk
func (fi *File) Flush() error { func (fi *File) Flush() error {
fi.Lock() fi.Lock()
defer fi.Unlock() defer fi.Unlock()
return fi.mod.Flush() return fi.mod.Flush()
} }
// Seek implements io.Seeker
func (fi *File) Seek(offset int64, whence int) (int64, error) { func (fi *File) Seek(offset int64, whence int) (int64, error) {
fi.Lock() fi.Lock()
defer fi.Unlock() defer fi.Unlock()
return fi.mod.Seek(offset, whence) return fi.mod.Seek(offset, whence)
} }
// Write At writes the given bytes at the offset 'at'
func (fi *File) WriteAt(b []byte, at int64) (int, error) { func (fi *File) WriteAt(b []byte, at int64) (int, error) {
fi.Lock() fi.Lock()
defer fi.Unlock() defer fi.Unlock()
@ -94,18 +102,21 @@ func (fi *File) WriteAt(b []byte, at int64) (int, error) {
return fi.mod.WriteAt(b, at) return fi.mod.WriteAt(b, at)
} }
// Size returns the size of this file
func (fi *File) Size() (int64, error) { func (fi *File) Size() (int64, error) {
fi.Lock() fi.Lock()
defer fi.Unlock() defer fi.Unlock()
return fi.mod.Size() return fi.mod.Size()
} }
// GetNode returns the dag node associated with this file
func (fi *File) GetNode() (*dag.Node, error) { func (fi *File) GetNode() (*dag.Node, error) {
fi.Lock() fi.Lock()
defer fi.Unlock() defer fi.Unlock()
return fi.mod.GetNode() return fi.mod.GetNode()
} }
// Truncate truncates the file to size
func (fi *File) Truncate(size int64) error { func (fi *File) Truncate(size int64) error {
fi.Lock() fi.Lock()
defer fi.Unlock() defer fi.Unlock()
@ -113,14 +124,17 @@ func (fi *File) Truncate(size int64) error {
return fi.mod.Truncate(size) return fi.mod.Truncate(size)
} }
// Type returns the type FSNode this is
func (fi *File) Type() NodeType { func (fi *File) Type() NodeType {
return TFile return TFile
} }
// Lock the file
func (fi *File) Lock() { func (fi *File) Lock() {
fi.lock.Lock() fi.lock.Lock()
} }
// Unlock the file
func (fi *File) Unlock() { func (fi *File) Unlock() {
fi.lock.Unlock() fi.lock.Unlock()
} }

View File

@ -1,3 +1,13 @@
// package ipnsfs implements an in memory model of a mutable ipns filesystem,
// to be used by the fuse filesystem.
//
// It consists of four main structs:
// 1) The Filesystem
// The filesystem serves as a container and entry point for the ipns filesystem
// 2) KeyRoots
// KeyRoots represent the root of the keyspace controlled by a given keypair
// 3) Directories
// 4) Files
package ipnsfs package ipnsfs
import ( import (
@ -5,6 +15,7 @@ import (
"fmt" "fmt"
"os" "os"
"strings" "strings"
"sync"
"time" "time"
dag "github.com/jbenet/go-ipfs/merkledag" dag "github.com/jbenet/go-ipfs/merkledag"
@ -33,6 +44,7 @@ type Filesystem struct {
roots map[string]*KeyRoot roots map[string]*KeyRoot
} }
// NewFilesystem instantiates an ipns filesystem using the given parameters and locally owned keys
func NewFilesystem(ctx context.Context, ds dag.DAGService, nsys namesys.NameSystem, pins pin.Pinner, keys ...ci.PrivKey) (*Filesystem, error) { func NewFilesystem(ctx context.Context, ds dag.DAGService, nsys namesys.NameSystem, pins pin.Pinner, keys ...ci.PrivKey) (*Filesystem, error) {
roots := make(map[string]*KeyRoot) roots := make(map[string]*KeyRoot)
fs := &Filesystem{ fs := &Filesystem{
@ -47,7 +59,7 @@ func NewFilesystem(ctx context.Context, ds dag.DAGService, nsys namesys.NameSyst
return nil, err return nil, err
} }
root, err := fs.NewKeyRoot(ctx, k) root, err := fs.newKeyRoot(ctx, k)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -57,6 +69,7 @@ func NewFilesystem(ctx context.Context, ds dag.DAGService, nsys namesys.NameSyst
return fs, nil return fs, nil
} }
// Open opens a file at the given path
func (fs *Filesystem) Open(tpath string, mode int) (*File, error) { func (fs *Filesystem) Open(tpath string, mode int) (*File, error) {
pathelem := strings.Split(tpath, "/") pathelem := strings.Split(tpath, "/")
r, ok := fs.roots[pathelem[0]] r, ok := fs.roots[pathelem[0]]
@ -68,15 +81,23 @@ func (fs *Filesystem) Open(tpath string, mode int) (*File, error) {
} }
func (fs *Filesystem) Close() error { func (fs *Filesystem) Close() error {
wg := sync.WaitGroup{}
for _, r := range fs.roots { for _, r := range fs.roots {
err := r.Publish(context.TODO()) wg.Add(1)
if err != nil { go func(r *KeyRoot) {
return err defer wg.Done()
} err := r.Publish(context.TODO())
if err != nil {
log.Error(err)
return
}
}(r)
} }
wg.Wait()
return nil return nil
} }
// GetRoot returns the KeyRoot of the given name
func (fs *Filesystem) GetRoot(name string) (*KeyRoot, error) { func (fs *Filesystem) GetRoot(name string) (*KeyRoot, error) {
r, ok := fs.roots[name] r, ok := fs.roots[name]
if ok { if ok {
@ -85,6 +106,10 @@ func (fs *Filesystem) GetRoot(name string) (*KeyRoot, error) {
return nil, os.ErrNotExist return nil, os.ErrNotExist
} }
type childCloser interface {
closeChild(string, *dag.Node) error
}
type NodeType int type NodeType int
const ( const (
@ -92,6 +117,7 @@ const (
TDir TDir
) )
// FSNode represents any node (directory, root, or file) in the ipns filesystem
type FSNode interface { type FSNode interface {
GetNode() (*dag.Node, error) GetNode() (*dag.Node, error)
Type() NodeType Type() NodeType
@ -115,7 +141,9 @@ type KeyRoot struct {
repub *Republisher repub *Republisher
} }
func (fs *Filesystem) NewKeyRoot(parent context.Context, k ci.PrivKey) (*KeyRoot, error) { // newKeyRoot creates a new KeyRoot for the given key, and starts up a republisher routine
// for it
func (fs *Filesystem) newKeyRoot(parent context.Context, k ci.PrivKey) (*KeyRoot, error) {
hash, err := k.GetPublic().Hash() hash, err := k.GetPublic().Hash()
if err != nil { if err != nil {
return nil, err return nil, err
@ -180,14 +208,14 @@ func (kr *KeyRoot) GetValue() FSNode {
func (kr *KeyRoot) Open(tpath []string, mode int) (*File, error) { func (kr *KeyRoot) Open(tpath []string, mode int) (*File, error) {
if kr.val == nil { if kr.val == nil {
// No entry here... what should we do? // No entry here. KeyRoot was created incorrectly
panic("nyi") panic("nil keyroot.val, improperly constructed keyroot")
} }
if len(tpath) > 0 { if len(tpath) > 0 {
// Make sure our root is a directory // Make sure our root is a directory
dir, ok := kr.val.(*Directory) dir, ok := kr.val.(*Directory)
if !ok { if !ok {
return nil, fmt.Errorf("no such file or directory: %s", tpath[0]) return nil, os.ErrNotExist
} }
return dir.Open(tpath, mode) return dir.Open(tpath, mode)
@ -222,6 +250,7 @@ func (kr *KeyRoot) Publish(ctx context.Context) error {
return err return err
} }
// Holding this lock so our child doesnt change out from under us
child.Lock() child.Lock()
k, err := kr.fs.dserv.Add(nd) k, err := kr.fs.dserv.Add(nd)
if err != nil { if err != nil {
@ -230,6 +259,8 @@ func (kr *KeyRoot) Publish(ctx context.Context) error {
} }
child.Unlock() child.Unlock()
// Dont want to hold the lock while we publish // Dont want to hold the lock while we publish
// otherwise we are holding the lock through a costly
// network operation
fmt.Println("Publishing!") fmt.Println("Publishing!")
return kr.fs.nsys.Publish(ctx, kr.key, k) return kr.fs.nsys.Publish(ctx, kr.key, k)
@ -243,6 +274,8 @@ type Republisher struct {
root *KeyRoot root *KeyRoot
} }
// NewRepublisher creates a new Republisher object to republish the given keyroot
// using the given short and long time intervals
func NewRepublisher(root *KeyRoot, tshort, tlong time.Duration) *Republisher { func NewRepublisher(root *KeyRoot, tshort, tlong time.Duration) *Republisher {
return &Republisher{ return &Republisher{
TimeoutShort: tshort, TimeoutShort: tshort,
@ -252,6 +285,9 @@ func NewRepublisher(root *KeyRoot, tshort, tlong time.Duration) *Republisher {
} }
} }
// Touch signals that an update has occurred since the last publish.
// Multiple consecutive touches may extend the time period before
// the next Publish occurs in order to more efficiently batch updates
func (np *Republisher) Touch() { func (np *Republisher) Touch() {
select { select {
case np.Publish <- struct{}{}: case np.Publish <- struct{}{}:
@ -259,6 +295,7 @@ func (np *Republisher) Touch() {
} }
} }
// Run is the main republisher loop
func (np *Republisher) Run(ctx context.Context) { func (np *Republisher) Run(ctx context.Context) {
for { for {
select { select {
@ -268,13 +305,13 @@ func (np *Republisher) Run(ctx context.Context) {
wait: wait:
select { select {
case <-quick:
case <-longer:
case <-ctx.Done(): case <-ctx.Done():
return return
case <-np.Publish: case <-np.Publish:
quick = time.After(np.TimeoutShort) quick = time.After(np.TimeoutShort)
goto wait goto wait
case <-quick:
case <-longer:
} }
log.Info("Publishing Changes!") log.Info("Publishing Changes!")