mirror of
https://github.com/ipfs/kubo.git
synced 2025-09-10 05:52:20 +08:00
refactor ipns fuse to utilize ipnsfs
This commit is contained in:
@ -5,16 +5,19 @@ package ipns
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
mrand "math/rand"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
fstest "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs/fstestutil"
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
racedet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
core "github.com/jbenet/go-ipfs/core"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
nsfs "github.com/jbenet/go-ipfs/ipnsfs"
|
||||
ci "github.com/jbenet/go-ipfs/util/testutil/ci"
|
||||
)
|
||||
|
||||
@ -30,6 +33,13 @@ func randBytes(size int) []byte {
|
||||
return b
|
||||
}
|
||||
|
||||
func mkdir(t *testing.T, path string) {
|
||||
err := os.Mkdir(path, os.ModeDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func writeFile(t *testing.T, size int, path string) []byte {
|
||||
return writeFileData(t, randBytes(size), path)
|
||||
}
|
||||
@ -57,6 +67,38 @@ func writeFileData(t *testing.T, data []byte, path string) []byte {
|
||||
return data
|
||||
}
|
||||
|
||||
func verifyFile(t *testing.T, path string, data []byte) {
|
||||
fi, err := os.Open(path)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer fi.Close()
|
||||
|
||||
out, err := ioutil.ReadAll(fi)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !bytes.Equal(out, data) {
|
||||
t.Fatal("Data not equal")
|
||||
}
|
||||
}
|
||||
|
||||
func checkExists(t *testing.T, path string) {
|
||||
_, err := os.Stat(path)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func closeMount(mnt *fstest.Mount) {
|
||||
if err := recover(); err != nil {
|
||||
log.Error("Recovered panic")
|
||||
log.Error(err)
|
||||
}
|
||||
mnt.Close()
|
||||
}
|
||||
|
||||
func setupIpnsTest(t *testing.T, node *core.IpfsNode) (*core.IpfsNode, *fstest.Mount) {
|
||||
maybeSkipFuseTests(t)
|
||||
|
||||
@ -66,6 +108,13 @@ func setupIpnsTest(t *testing.T, node *core.IpfsNode) (*core.IpfsNode, *fstest.M
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ipnsfs, err := nsfs.NewFilesystem(context.TODO(), node.DAG, node.Namesys, node.Pinning, node.PrivateKey)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
node.IpnsFs = ipnsfs
|
||||
}
|
||||
|
||||
fs, err := NewFileSystem(node, node.PrivateKey, "")
|
||||
@ -80,17 +129,29 @@ func setupIpnsTest(t *testing.T, node *core.IpfsNode) (*core.IpfsNode, *fstest.M
|
||||
return node, mnt
|
||||
}
|
||||
|
||||
func TestIpnsLocalLink(t *testing.T) {
|
||||
_, mnt := setupIpnsTest(t, nil)
|
||||
defer mnt.Close()
|
||||
name := mnt.Dir + "/local"
|
||||
|
||||
finfo, err := os.Stat(name)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Log(finfo.Name())
|
||||
}
|
||||
|
||||
// Test writing a file and reading it back
|
||||
func TestIpnsBasicIO(t *testing.T) {
|
||||
t.Skip("Skipping until DAGModifier can be fixed.")
|
||||
if testing.Short() {
|
||||
t.SkipNow()
|
||||
}
|
||||
_, mnt := setupIpnsTest(t, nil)
|
||||
defer mnt.Close()
|
||||
defer closeMount(mnt)
|
||||
|
||||
fname := mnt.Dir + "/local/testfile"
|
||||
data := writeFile(t, 12345, fname)
|
||||
data := writeFile(t, 10, fname)
|
||||
|
||||
rbuf, err := ioutil.ReadFile(fname)
|
||||
if err != nil {
|
||||
@ -104,7 +165,6 @@ func TestIpnsBasicIO(t *testing.T) {
|
||||
|
||||
// Test to make sure file changes persist over mounts of ipns
|
||||
func TestFilePersistence(t *testing.T) {
|
||||
t.Skip("Skipping until DAGModifier can be fixed.")
|
||||
if testing.Short() {
|
||||
t.SkipNow()
|
||||
}
|
||||
@ -113,11 +173,9 @@ func TestFilePersistence(t *testing.T) {
|
||||
fname := "/local/atestfile"
|
||||
data := writeFile(t, 127, mnt.Dir+fname)
|
||||
|
||||
// Wait for publish: TODO: make publish happen faster in tests
|
||||
time.Sleep(time.Millisecond * 40)
|
||||
|
||||
mnt.Close()
|
||||
|
||||
t.Log("Closed, opening new fs")
|
||||
node, mnt = setupIpnsTest(t, node)
|
||||
defer mnt.Close()
|
||||
|
||||
@ -131,9 +189,45 @@ func TestFilePersistence(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeeperDirs(t *testing.T) {
|
||||
node, mnt := setupIpnsTest(t, nil)
|
||||
|
||||
t.Log("make a top level dir")
|
||||
dir1 := "/local/test1"
|
||||
mkdir(t, mnt.Dir+dir1)
|
||||
|
||||
checkExists(t, mnt.Dir+dir1)
|
||||
|
||||
t.Log("write a file in it")
|
||||
data1 := writeFile(t, 4000, mnt.Dir+dir1+"/file1")
|
||||
|
||||
verifyFile(t, mnt.Dir+dir1+"/file1", data1)
|
||||
|
||||
t.Log("sub directory")
|
||||
mkdir(t, mnt.Dir+dir1+"/dir2")
|
||||
|
||||
checkExists(t, mnt.Dir+dir1+"/dir2")
|
||||
|
||||
t.Log("file in that subdirectory")
|
||||
data2 := writeFile(t, 5000, mnt.Dir+dir1+"/dir2/file2")
|
||||
|
||||
verifyFile(t, mnt.Dir+dir1+"/dir2/file2", data2)
|
||||
|
||||
mnt.Close()
|
||||
t.Log("closing mount, then restarting")
|
||||
|
||||
_, mnt = setupIpnsTest(t, node)
|
||||
|
||||
checkExists(t, mnt.Dir+dir1)
|
||||
|
||||
verifyFile(t, mnt.Dir+dir1+"/file1", data1)
|
||||
|
||||
verifyFile(t, mnt.Dir+dir1+"/dir2/file2", data2)
|
||||
mnt.Close()
|
||||
}
|
||||
|
||||
// Test to make sure the filesystem reports file sizes correctly
|
||||
func TestFileSizeReporting(t *testing.T) {
|
||||
t.Skip("Skipping until DAGModifier can be fixed.")
|
||||
if testing.Short() {
|
||||
t.SkipNow()
|
||||
}
|
||||
@ -155,7 +249,6 @@ func TestFileSizeReporting(t *testing.T) {
|
||||
|
||||
// Test to make sure you cant create multiple entries with the same name
|
||||
func TestDoubleEntryFailure(t *testing.T) {
|
||||
t.Skip("Skipping until DAGModifier can be fixed.")
|
||||
if testing.Short() {
|
||||
t.SkipNow()
|
||||
}
|
||||
@ -175,7 +268,6 @@ func TestDoubleEntryFailure(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAppendFile(t *testing.T) {
|
||||
t.Skip("Skipping until DAGModifier can be fixed.")
|
||||
if testing.Short() {
|
||||
t.SkipNow()
|
||||
}
|
||||
@ -216,8 +308,126 @@ func TestAppendFile(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestConcurrentWrites(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.SkipNow()
|
||||
}
|
||||
_, mnt := setupIpnsTest(t, nil)
|
||||
defer mnt.Close()
|
||||
|
||||
nactors := 4
|
||||
filesPerActor := 400
|
||||
fileSize := 2000
|
||||
|
||||
data := make([][][]byte, nactors)
|
||||
|
||||
if racedet.WithRace() {
|
||||
nactors = 2
|
||||
filesPerActor = 50
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
for i := 0; i < nactors; i++ {
|
||||
data[i] = make([][]byte, filesPerActor)
|
||||
wg.Add(1)
|
||||
go func(n int) {
|
||||
defer wg.Done()
|
||||
for j := 0; j < filesPerActor; j++ {
|
||||
out := writeFile(t, fileSize, mnt.Dir+fmt.Sprintf("/local/%dFILE%d", n, j))
|
||||
data[n][j] = out
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
for i := 0; i < nactors; i++ {
|
||||
for j := 0; j < filesPerActor; j++ {
|
||||
verifyFile(t, mnt.Dir+fmt.Sprintf("/local/%dFILE%d", i, j), data[i][j])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFSThrash(t *testing.T) {
|
||||
files := make(map[string][]byte)
|
||||
|
||||
if testing.Short() {
|
||||
t.SkipNow()
|
||||
}
|
||||
_, mnt := setupIpnsTest(t, nil)
|
||||
defer mnt.Close()
|
||||
|
||||
base := mnt.Dir + "/local"
|
||||
dirs := []string{base}
|
||||
dirlock := sync.RWMutex{}
|
||||
filelock := sync.Mutex{}
|
||||
|
||||
ndirWorkers := 2
|
||||
nfileWorkers := 2
|
||||
|
||||
ndirs := 100
|
||||
nfiles := 200
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
// Spawn off workers to make directories
|
||||
for i := 0; i < ndirWorkers; i++ {
|
||||
wg.Add(1)
|
||||
go func(worker int) {
|
||||
defer wg.Done()
|
||||
for j := 0; j < ndirs; j++ {
|
||||
dirlock.RLock()
|
||||
n := mrand.Intn(len(dirs))
|
||||
dir := dirs[n]
|
||||
dirlock.RUnlock()
|
||||
|
||||
newDir := fmt.Sprintf("%s/dir%d-%d", dir, worker, j)
|
||||
err := os.Mkdir(newDir, os.ModeDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
dirlock.Lock()
|
||||
dirs = append(dirs, newDir)
|
||||
dirlock.Unlock()
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
// Spawn off workers to make files
|
||||
for i := 0; i < nfileWorkers; i++ {
|
||||
wg.Add(1)
|
||||
go func(worker int) {
|
||||
defer wg.Done()
|
||||
for j := 0; j < nfiles; j++ {
|
||||
dirlock.RLock()
|
||||
n := mrand.Intn(len(dirs))
|
||||
dir := dirs[n]
|
||||
dirlock.RUnlock()
|
||||
|
||||
newFileName := fmt.Sprintf("%s/file%d-%d", dir, worker, j)
|
||||
|
||||
data := writeFile(t, 2000+mrand.Intn(5000), newFileName)
|
||||
filelock.Lock()
|
||||
files[newFileName] = data
|
||||
filelock.Unlock()
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
for name, data := range files {
|
||||
out, err := ioutil.ReadFile(name)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !bytes.Equal(data, out) {
|
||||
t.Fatal("Data didnt match")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
func TestFastRepublish(t *testing.T) {
|
||||
t.Skip("Skipping until DAGModifier can be fixed.")
|
||||
if testing.Short() {
|
||||
t.SkipNow()
|
||||
}
|
||||
@ -319,10 +529,11 @@ func TestFastRepublish(t *testing.T) {
|
||||
|
||||
close(closed)
|
||||
}
|
||||
*/
|
||||
|
||||
// Test writing a medium sized file one byte at a time
|
||||
func TestMultiWrite(t *testing.T) {
|
||||
t.Skip("Skipping until DAGModifier can be fixed.")
|
||||
|
||||
if testing.Short() {
|
||||
t.SkipNow()
|
||||
}
|
||||
|
@ -8,37 +8,22 @@ import (
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
fuse "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
|
||||
fs "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs"
|
||||
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
|
||||
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
|
||||
|
||||
core "github.com/jbenet/go-ipfs/core"
|
||||
chunk "github.com/jbenet/go-ipfs/importer/chunk"
|
||||
mdag "github.com/jbenet/go-ipfs/merkledag"
|
||||
nsfs "github.com/jbenet/go-ipfs/ipnsfs"
|
||||
dag "github.com/jbenet/go-ipfs/merkledag"
|
||||
ci "github.com/jbenet/go-ipfs/p2p/crypto"
|
||||
path "github.com/jbenet/go-ipfs/path"
|
||||
ft "github.com/jbenet/go-ipfs/unixfs"
|
||||
uio "github.com/jbenet/go-ipfs/unixfs/io"
|
||||
mod "github.com/jbenet/go-ipfs/unixfs/mod"
|
||||
ftpb "github.com/jbenet/go-ipfs/unixfs/pb"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
|
||||
)
|
||||
|
||||
const IpnsReadonly = true
|
||||
|
||||
var log = eventlog.Logger("fuse/ipns")
|
||||
|
||||
var (
|
||||
shortRepublishTimeout = time.Millisecond * 5
|
||||
longRepublishTimeout = time.Millisecond * 500
|
||||
)
|
||||
|
||||
// FileSystem is the readwrite IPNS Fuse Filesystem.
|
||||
type FileSystem struct {
|
||||
Ipfs *core.IpfsNode
|
||||
@ -54,73 +39,17 @@ func NewFileSystem(ipfs *core.IpfsNode, sk ci.PrivKey, ipfspath string) (*FileSy
|
||||
return &FileSystem{Ipfs: ipfs, RootNode: root}, nil
|
||||
}
|
||||
|
||||
func CreateRoot(n *core.IpfsNode, keys []ci.PrivKey, ipfsroot string) (*Root, error) {
|
||||
root := new(Root)
|
||||
root.LocalDirs = make(map[string]*Node)
|
||||
root.Ipfs = n
|
||||
abspath, err := filepath.Abs(ipfsroot)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
root.IpfsRoot = abspath
|
||||
|
||||
root.Keys = keys
|
||||
|
||||
if len(keys) == 0 {
|
||||
log.Warning("No keys given for ipns root creation")
|
||||
} else {
|
||||
k := keys[0]
|
||||
pub := k.GetPublic()
|
||||
hash, err := pub.Hash()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
root.LocalLink = &Link{u.Key(hash).Pretty()}
|
||||
}
|
||||
|
||||
for _, k := range keys {
|
||||
hash, err := k.GetPublic().Hash()
|
||||
if err != nil {
|
||||
log.Debug("failed to hash public key.")
|
||||
continue
|
||||
}
|
||||
name := u.Key(hash).Pretty()
|
||||
nd := new(Node)
|
||||
nd.Ipfs = n
|
||||
nd.key = k
|
||||
nd.repub = NewRepublisher(nd, shortRepublishTimeout, longRepublishTimeout)
|
||||
|
||||
go nd.repub.Run()
|
||||
|
||||
pointsTo, err := n.Namesys.Resolve(n.Context(), name)
|
||||
if err != nil {
|
||||
log.Warning("Could not resolve value for local ipns entry, providing empty dir")
|
||||
nd.Nd = &mdag.Node{Data: ft.FolderPBData()}
|
||||
root.LocalDirs[name] = nd
|
||||
continue
|
||||
}
|
||||
|
||||
if !u.IsValidHash(pointsTo.B58String()) {
|
||||
log.Criticalf("Got back bad data from namesys resolve! [%s]", pointsTo)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
node, err := n.Resolver.ResolvePath(path.Path(pointsTo.B58String()))
|
||||
if err != nil {
|
||||
log.Warning("Failed to resolve value from ipns entry in ipfs")
|
||||
continue
|
||||
}
|
||||
|
||||
nd.Nd = node
|
||||
root.LocalDirs[name] = nd
|
||||
}
|
||||
|
||||
return root, nil
|
||||
// Root constructs the Root of the filesystem, a Root object.
|
||||
func (f *FileSystem) Root() (fs.Node, error) {
|
||||
log.Debug("Filesystem, get root")
|
||||
return f.RootNode, nil
|
||||
}
|
||||
|
||||
// Root constructs the Root of the filesystem, a Root object.
|
||||
func (f FileSystem) Root() (fs.Node, error) {
|
||||
return f.RootNode, nil
|
||||
func (f *FileSystem) Destroy() {
|
||||
err := f.RootNode.Close()
|
||||
if err != nil {
|
||||
log.Errorf("Error Shutting Down Filesystem: %s\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Root is the root object of the filesystem tree.
|
||||
@ -130,13 +59,53 @@ type Root struct {
|
||||
|
||||
// Used for symlinking into ipfs
|
||||
IpfsRoot string
|
||||
LocalDirs map[string]*Node
|
||||
LocalDirs map[string]fs.Node
|
||||
Roots map[string]*nsfs.KeyRoot
|
||||
|
||||
fs *nsfs.Filesystem
|
||||
LocalLink *Link
|
||||
}
|
||||
|
||||
func CreateRoot(ipfs *core.IpfsNode, keys []ci.PrivKey, ipfspath string) (*Root, error) {
|
||||
ldirs := make(map[string]fs.Node)
|
||||
roots := make(map[string]*nsfs.KeyRoot)
|
||||
for _, k := range keys {
|
||||
pkh, err := k.GetPublic().Hash()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
name := u.Key(pkh).B58String()
|
||||
root, err := ipfs.IpnsFs.GetRoot(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
roots[name] = root
|
||||
|
||||
switch val := root.GetValue().(type) {
|
||||
case *nsfs.Directory:
|
||||
ldirs[name] = &Directory{dir: val}
|
||||
case *nsfs.File:
|
||||
ldirs[name] = &File{fi: val}
|
||||
default:
|
||||
return nil, errors.New("unrecognized type")
|
||||
}
|
||||
}
|
||||
|
||||
return &Root{
|
||||
fs: ipfs.IpnsFs,
|
||||
Ipfs: ipfs,
|
||||
IpfsRoot: ipfspath,
|
||||
Keys: keys,
|
||||
LocalDirs: ldirs,
|
||||
LocalLink: &Link{ipfs.Identity.Pretty()},
|
||||
Roots: roots,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Attr returns file attributes.
|
||||
func (*Root) Attr() fuse.Attr {
|
||||
log.Debug("Root Attr")
|
||||
return fuse.Attr{Mode: os.ModeDir | 0111} // -rw+x
|
||||
}
|
||||
|
||||
@ -148,6 +117,7 @@ func (s *Root) Lookup(ctx context.Context, name string) (fs.Node, error) {
|
||||
return nil, fuse.ENOENT
|
||||
}
|
||||
|
||||
// Local symlink to the node ID keyspace
|
||||
if name == "local" {
|
||||
if s.LocalLink == nil {
|
||||
return nil, fuse.ENOENT
|
||||
@ -157,9 +127,17 @@ func (s *Root) Lookup(ctx context.Context, name string) (fs.Node, error) {
|
||||
|
||||
nd, ok := s.LocalDirs[name]
|
||||
if ok {
|
||||
return nd, nil
|
||||
switch nd := nd.(type) {
|
||||
case *Directory:
|
||||
return nd, nil
|
||||
case *File:
|
||||
return nd, nil
|
||||
default:
|
||||
return nil, fuse.EIO
|
||||
}
|
||||
}
|
||||
|
||||
// other links go through ipns resolution and are symlinked into the ipfs mountpoint
|
||||
resolved, err := s.Ipfs.Namesys.Resolve(s.Ipfs.Context(), name)
|
||||
if err != nil {
|
||||
log.Warningf("ipns: namesys resolve error: %s", err)
|
||||
@ -169,8 +147,29 @@ func (s *Root) Lookup(ctx context.Context, name string) (fs.Node, error) {
|
||||
return &Link{s.IpfsRoot + "/" + resolved.B58String()}, nil
|
||||
}
|
||||
|
||||
// ReadDirAll reads a particular directory. Disallowed for root.
|
||||
func (r *Root) Close() error {
|
||||
for _, kr := range r.Roots {
|
||||
err := kr.Publish(r.Ipfs.Context())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Forget is called when the filesystem is unmounted. probably.
|
||||
// see comments here: http://godoc.org/bazil.org/fuse/fs#FSDestroyer
|
||||
func (r *Root) Forget() {
|
||||
err := r.Close()
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
// ReadDirAll reads a particular directory. Will show locally available keys
|
||||
// as well as a symlink to the peerID key
|
||||
func (r *Root) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
|
||||
log.Debug("Root ReadDirAll")
|
||||
listing := []fuse.Dirent{
|
||||
fuse.Dirent{
|
||||
Name: "local",
|
||||
@ -192,115 +191,78 @@ func (r *Root) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
|
||||
return listing, nil
|
||||
}
|
||||
|
||||
// Node is the core object representing a filesystem tree node.
|
||||
type Node struct {
|
||||
root *Root
|
||||
nsRoot *Node
|
||||
parent *Node
|
||||
// Directory is wrapper over an ipnsfs directory to satisfy the fuse fs interface
|
||||
type Directory struct {
|
||||
dir *nsfs.Directory
|
||||
|
||||
repub *Republisher
|
||||
|
||||
// This nodes name in its parent dir.
|
||||
// NOTE: this strategy wont work well if we allow hard links
|
||||
// (im all for murdering the thought of hard links)
|
||||
name string
|
||||
|
||||
// Private keys held by nodes at the root of a keyspace
|
||||
// WARNING(security): the PrivKey interface is currently insecure
|
||||
// (holds the raw key). It will be secured later.
|
||||
key ci.PrivKey
|
||||
|
||||
Ipfs *core.IpfsNode
|
||||
Nd *mdag.Node
|
||||
dagMod *mod.DagModifier
|
||||
cached *ftpb.Data
|
||||
fs.NodeRef
|
||||
}
|
||||
|
||||
func (s *Node) loadData() error {
|
||||
s.cached = new(ftpb.Data)
|
||||
return proto.Unmarshal(s.Nd.Data, s.cached)
|
||||
// File is wrapper over an ipnsfs file to satisfy the fuse fs interface
|
||||
type File struct {
|
||||
fi *nsfs.File
|
||||
|
||||
fs.NodeRef
|
||||
}
|
||||
|
||||
// Attr returns the attributes of a given node.
|
||||
func (s *Node) Attr() fuse.Attr {
|
||||
if s.cached == nil {
|
||||
err := s.loadData()
|
||||
if err != nil {
|
||||
log.Debugf("Error loading PBData for file: '%s'", s.name)
|
||||
}
|
||||
func (d *Directory) Attr() fuse.Attr {
|
||||
log.Debug("Directory Attr")
|
||||
return fuse.Attr{Mode: os.ModeDir | 0555}
|
||||
}
|
||||
|
||||
// Attr returns the attributes of a given node.
|
||||
func (fi *File) Attr() fuse.Attr {
|
||||
log.Debug("File Attr")
|
||||
size, err := fi.fi.Size()
|
||||
if err != nil {
|
||||
// In this case, the dag node in question may not be unixfs
|
||||
log.Critical("Failed to get file size: %s", err)
|
||||
}
|
||||
switch s.cached.GetType() {
|
||||
case ftpb.Data_Directory:
|
||||
return fuse.Attr{Mode: os.ModeDir | 0555}
|
||||
case ftpb.Data_File, ftpb.Data_Raw:
|
||||
size, err := ft.DataSize(s.Nd.Data)
|
||||
if err != nil {
|
||||
log.Debugf("Error getting size of file: %s", err)
|
||||
size = 0
|
||||
}
|
||||
if size == 0 {
|
||||
dmsize, err := s.dagMod.Size()
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
size = uint64(dmsize)
|
||||
}
|
||||
|
||||
mode := os.FileMode(0666)
|
||||
if IpnsReadonly {
|
||||
mode = 0444
|
||||
}
|
||||
|
||||
return fuse.Attr{
|
||||
Mode: mode,
|
||||
Size: size,
|
||||
Blocks: uint64(len(s.Nd.Links)),
|
||||
}
|
||||
default:
|
||||
log.Debug("Invalid data type.")
|
||||
return fuse.Attr{}
|
||||
return fuse.Attr{
|
||||
Mode: os.FileMode(0666),
|
||||
Size: uint64(size),
|
||||
}
|
||||
}
|
||||
|
||||
// Lookup performs a lookup under this node.
|
||||
func (s *Node) Lookup(ctx context.Context, name string) (fs.Node, error) {
|
||||
nodes, err := s.Ipfs.Resolver.ResolveLinks(s.Nd, []string{name})
|
||||
func (s *Directory) Lookup(ctx context.Context, name string) (fs.Node, error) {
|
||||
child, err := s.dir.Child(name)
|
||||
if err != nil {
|
||||
// todo: make this error more versatile.
|
||||
return nil, fuse.ENOENT
|
||||
}
|
||||
|
||||
return s.makeChild(name, nodes[len(nodes)-1]), nil
|
||||
}
|
||||
|
||||
func (n *Node) makeChild(name string, node *mdag.Node) *Node {
|
||||
child := &Node{
|
||||
Ipfs: n.Ipfs,
|
||||
Nd: node,
|
||||
name: name,
|
||||
nsRoot: n.nsRoot,
|
||||
parent: n,
|
||||
switch child := child.(type) {
|
||||
case *nsfs.Directory:
|
||||
return &Directory{dir: child}, nil
|
||||
case *nsfs.File:
|
||||
return &File{fi: child}, nil
|
||||
default:
|
||||
panic("system has proven to be insane")
|
||||
}
|
||||
|
||||
// Always ensure that each child knows where the root is
|
||||
if n.nsRoot == nil {
|
||||
child.nsRoot = n
|
||||
} else {
|
||||
child.nsRoot = n.nsRoot
|
||||
}
|
||||
|
||||
return child
|
||||
}
|
||||
|
||||
// ReadDirAll reads the link structure as directory entries
|
||||
func (s *Node) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
|
||||
entries := make([]fuse.Dirent, len(s.Nd.Links))
|
||||
for i, link := range s.Nd.Links {
|
||||
n := link.Name
|
||||
if len(n) == 0 {
|
||||
n = link.Hash.B58String()
|
||||
func (dir *Directory) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
|
||||
var entries []fuse.Dirent
|
||||
for _, name := range dir.dir.List() {
|
||||
dirent := fuse.Dirent{Name: name}
|
||||
|
||||
// TODO: make dir.dir.List() return dirinfos
|
||||
child, err := dir.dir.Child(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
entries[i] = fuse.Dirent{Name: n, Type: fuse.DT_File}
|
||||
|
||||
switch child.Type() {
|
||||
case nsfs.TDir:
|
||||
dirent.Type = fuse.DT_Dir
|
||||
case nsfs.TFile:
|
||||
dirent.Type = fuse.DT_File
|
||||
}
|
||||
|
||||
entries = append(entries, dirent)
|
||||
}
|
||||
|
||||
if len(entries) > 0 {
|
||||
@ -309,279 +271,130 @@ func (s *Node) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
|
||||
return nil, fuse.ENOENT
|
||||
}
|
||||
|
||||
func (s *Node) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
|
||||
k, err := s.Nd.Key()
|
||||
func (fi *File) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
|
||||
_, err := fi.fi.Seek(req.Offset, os.SEEK_SET)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// setup our logging event
|
||||
lm := make(lgbl.DeferredMap)
|
||||
lm["fs"] = "ipns"
|
||||
lm["key"] = func() interface{} { return k.Pretty() }
|
||||
lm["req_offset"] = req.Offset
|
||||
lm["req_size"] = req.Size
|
||||
defer log.EventBegin(ctx, "fuseRead", lm).Done()
|
||||
|
||||
r, err := uio.NewDagReader(ctx, s.Nd, s.Ipfs.DAG)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
o, err := r.Seek(req.Offset, os.SEEK_SET)
|
||||
lm["res_offset"] = o
|
||||
fisize, err := fi.fi.Size()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
buf := resp.Data[:min(req.Size, int(r.Size()))]
|
||||
n, err := io.ReadFull(r, buf)
|
||||
readsize := min(req.Size, int(fisize-req.Offset))
|
||||
n, err := io.ReadFull(fi.fi, resp.Data[:readsize])
|
||||
resp.Data = resp.Data[:n]
|
||||
lm["res_size"] = n
|
||||
return err // may be non-nil / not succeeded
|
||||
}
|
||||
|
||||
func (n *Node) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
|
||||
// log.Debugf("ipns: Node Write [%s]: flags = %s, offset = %d, size = %d", n.name, req.Flags.String(), req.Offset, len(req.Data))
|
||||
if IpnsReadonly {
|
||||
log.Debug("Attempted to write on readonly ipns filesystem.")
|
||||
return fuse.EPERM
|
||||
}
|
||||
|
||||
if n.dagMod == nil {
|
||||
// Create a DagModifier to allow us to change the existing dag node
|
||||
dmod, err := mod.NewDagModifier(ctx, n.Nd, n.Ipfs.DAG, n.Ipfs.Pinning.GetManual(), chunk.DefaultSplitter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n.dagMod = dmod
|
||||
}
|
||||
wrote, err := n.dagMod.WriteAt(req.Data, int64(req.Offset))
|
||||
func (fi *File) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
|
||||
wrote, err := fi.fi.WriteAt(req.Data, req.Offset)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp.Size = wrote
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Node) Flush(ctx context.Context, req *fuse.FlushRequest) error {
|
||||
if IpnsReadonly {
|
||||
return nil
|
||||
}
|
||||
|
||||
// If a write has happened
|
||||
if n.dagMod != nil {
|
||||
newNode, err := n.dagMod.GetNode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if n.parent != nil {
|
||||
log.Error("updating self in parent!")
|
||||
err := n.parent.update(n.name, newNode)
|
||||
if err != nil {
|
||||
log.Criticalf("error in updating ipns dag tree: %s", err)
|
||||
// return fuse.ETHISISPRETTYBAD
|
||||
return err
|
||||
}
|
||||
}
|
||||
n.Nd = newNode
|
||||
|
||||
/*/TEMP
|
||||
dr, err := mdag.NewDagReader(n.Nd, n.Ipfs.DAG)
|
||||
if err != nil {
|
||||
log.Critical("Verification read failed.")
|
||||
}
|
||||
b, err := ioutil.ReadAll(dr)
|
||||
if err != nil {
|
||||
log.Critical("Verification read failed.")
|
||||
}
|
||||
fmt.Println("VERIFICATION READ")
|
||||
fmt.Printf("READ %d BYTES\n", len(b))
|
||||
fmt.Println(string(b))
|
||||
fmt.Println(b)
|
||||
//*/
|
||||
|
||||
n.dagMod = nil
|
||||
|
||||
n.wasChanged()
|
||||
}
|
||||
return nil
|
||||
func (fi *File) Flush(ctx context.Context, req *fuse.FlushRequest) error {
|
||||
return fi.fi.Close()
|
||||
}
|
||||
|
||||
// Signal that a node in this tree was changed so the root can republish
|
||||
func (n *Node) wasChanged() {
|
||||
if IpnsReadonly {
|
||||
return
|
||||
}
|
||||
root := n.nsRoot
|
||||
if root == nil {
|
||||
root = n
|
||||
}
|
||||
|
||||
root.repub.Publish <- struct{}{}
|
||||
func (fi *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
|
||||
return fi.fi.Flush()
|
||||
}
|
||||
|
||||
func (n *Node) republishRoot() error {
|
||||
|
||||
// We should already be the root, this is just a sanity check
|
||||
var root *Node
|
||||
if n.nsRoot != nil {
|
||||
root = n.nsRoot
|
||||
} else {
|
||||
root = n
|
||||
}
|
||||
|
||||
// Add any nodes that may be new to the DAG service
|
||||
err := n.Ipfs.DAG.AddRecursive(root.Nd)
|
||||
func (fi *File) Forget() {
|
||||
err := fi.fi.Flush()
|
||||
if err != nil {
|
||||
log.Criticalf("ipns: Dag Add Error: %s", err)
|
||||
return err
|
||||
log.Debug("Forget file error: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
ndkey, err := root.Nd.Key()
|
||||
func (dir *Directory) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) {
|
||||
child, err := dir.dir.Mkdir(req.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = n.Ipfs.Namesys.Publish(n.Ipfs.Context(), root.key, ndkey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return &Directory{dir: child}, nil
|
||||
}
|
||||
|
||||
func (n *Node) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Node) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) {
|
||||
if IpnsReadonly {
|
||||
return nil, fuse.EPERM
|
||||
}
|
||||
dagnd := &mdag.Node{Data: ft.FolderPBData()}
|
||||
nnode := n.Nd.Copy()
|
||||
nnode.AddNodeLink(req.Name, dagnd)
|
||||
|
||||
child := &Node{
|
||||
Ipfs: n.Ipfs,
|
||||
Nd: dagnd,
|
||||
name: req.Name,
|
||||
}
|
||||
|
||||
if n.nsRoot == nil {
|
||||
child.nsRoot = n
|
||||
} else {
|
||||
child.nsRoot = n.nsRoot
|
||||
}
|
||||
|
||||
if n.parent != nil {
|
||||
err := n.parent.update(n.name, nnode)
|
||||
if err != nil {
|
||||
log.Criticalf("Error updating node: %s", err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
n.Nd = nnode
|
||||
|
||||
n.wasChanged()
|
||||
|
||||
return child, nil
|
||||
}
|
||||
|
||||
func (n *Node) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
|
||||
//log.Debug("[%s] Received open request! flags = %s", n.name, req.Flags.String())
|
||||
//TODO: check open flags and truncate if necessary
|
||||
func (fi *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
|
||||
if req.Flags&fuse.OpenTruncate != 0 {
|
||||
log.Warning("Need to truncate file!")
|
||||
n.cached = nil
|
||||
n.Nd = &mdag.Node{Data: ft.FilePBData(nil, 0)}
|
||||
err := fi.fi.Truncate(0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else if req.Flags&fuse.OpenAppend != 0 {
|
||||
log.Warning("Need to append to file!")
|
||||
}
|
||||
return n, nil
|
||||
return fi, nil
|
||||
}
|
||||
|
||||
func (n *Node) Mknod(ctx context.Context, req *fuse.MknodRequest) (fs.Node, error) {
|
||||
return nil, nil
|
||||
func (fi *File) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
|
||||
return fi.fi.Close()
|
||||
}
|
||||
|
||||
func (n *Node) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) {
|
||||
if IpnsReadonly {
|
||||
log.Debug("Attempted to call Create on a readonly filesystem.")
|
||||
return nil, nil, fuse.EPERM
|
||||
}
|
||||
|
||||
func (dir *Directory) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) {
|
||||
// New 'empty' file
|
||||
nd := &mdag.Node{Data: ft.FilePBData(nil, 0)}
|
||||
child := n.makeChild(req.Name, nd)
|
||||
|
||||
nnode := n.Nd.Copy()
|
||||
|
||||
err := nnode.AddNodeLink(req.Name, nd)
|
||||
nd := &dag.Node{Data: ft.FilePBData(nil, 0)}
|
||||
err := dir.dir.AddChild(req.Name, nd)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if n.parent != nil {
|
||||
err := n.parent.update(n.name, nnode)
|
||||
if err != nil {
|
||||
log.Criticalf("Error updating node: %s", err)
|
||||
// Can we panic, please?
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
n.Nd = nnode
|
||||
n.wasChanged()
|
||||
|
||||
return child, child, nil
|
||||
child, err := dir.dir.Child(req.Name)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
fi, ok := child.(*nsfs.File)
|
||||
if !ok {
|
||||
return nil, nil, errors.New("child creation failed")
|
||||
}
|
||||
|
||||
nodechild := &File{fi: fi}
|
||||
return nodechild, nodechild, nil
|
||||
}
|
||||
|
||||
func (n *Node) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
|
||||
if IpnsReadonly {
|
||||
return fuse.EPERM
|
||||
}
|
||||
|
||||
nnode := n.Nd.Copy()
|
||||
err := nnode.RemoveNodeLink(req.Name)
|
||||
func (dir *Directory) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
|
||||
err := dir.dir.Unlink(req.Name)
|
||||
if err != nil {
|
||||
return fuse.ENOENT
|
||||
}
|
||||
|
||||
if n.parent != nil {
|
||||
err := n.parent.update(n.name, nnode)
|
||||
if err != nil {
|
||||
log.Criticalf("Error updating node: %s", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
n.Nd = nnode
|
||||
n.wasChanged()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Node) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fs.Node) error {
|
||||
if IpnsReadonly {
|
||||
log.Debug("Attempted to call Rename on a readonly filesystem.")
|
||||
return fuse.EPERM
|
||||
// Rename implements NodeRenamer
|
||||
func (dir *Directory) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fs.Node) error {
|
||||
cur, err := dir.dir.Child(req.OldName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var mdn *mdag.Node
|
||||
for _, l := range n.Nd.Links {
|
||||
if l.Name == req.OldName {
|
||||
mdn = l.Node
|
||||
}
|
||||
err = dir.dir.Unlink(req.OldName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if mdn == nil {
|
||||
log.Critical("nil Link found on rename!")
|
||||
return fuse.ENOENT
|
||||
}
|
||||
n.Nd.RemoveNodeLink(req.OldName)
|
||||
|
||||
switch newDir := newDir.(type) {
|
||||
case *Node:
|
||||
err := newDir.Nd.AddNodeLink(req.NewName, mdn)
|
||||
case *Directory:
|
||||
nd, err := cur.GetNode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = newDir.dir.AddChild(req.NewName, nd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case *File:
|
||||
log.Critical("Cannot move node into a file!")
|
||||
return fuse.EPERM
|
||||
default:
|
||||
log.Critical("Unknown node type for rename target dir!")
|
||||
return errors.New("Unknown fs node type!")
|
||||
@ -589,21 +402,11 @@ func (n *Node) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fs.No
|
||||
return nil
|
||||
}
|
||||
|
||||
// Updates the child of this node, specified by name to the given newnode
|
||||
func (n *Node) update(name string, newnode *mdag.Node) error {
|
||||
nnode, err := n.Nd.UpdateNodeLink(name, newnode)
|
||||
if err != nil {
|
||||
return err
|
||||
func min(a, b int) int {
|
||||
if a < b {
|
||||
return a
|
||||
}
|
||||
|
||||
if n.parent != nil {
|
||||
err := n.parent.update(n.name, nnode)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
n.Nd = nnode
|
||||
return nil
|
||||
return b
|
||||
}
|
||||
|
||||
// to check that out Node implements all the interfaces we want
|
||||
@ -615,27 +418,26 @@ type ipnsRoot interface {
|
||||
|
||||
var _ ipnsRoot = (*Root)(nil)
|
||||
|
||||
type ipnsNode interface {
|
||||
fs.HandleFlusher
|
||||
type ipnsDirectory interface {
|
||||
fs.HandleReadDirAller
|
||||
fs.HandleReader
|
||||
fs.HandleWriter
|
||||
fs.Node
|
||||
fs.NodeCreater
|
||||
fs.NodeFsyncer
|
||||
fs.NodeMkdirer
|
||||
fs.NodeMknoder
|
||||
fs.NodeOpener
|
||||
fs.NodeRemover
|
||||
fs.NodeRenamer
|
||||
fs.NodeStringLookuper
|
||||
}
|
||||
|
||||
var _ ipnsNode = (*Node)(nil)
|
||||
var _ ipnsDirectory = (*Directory)(nil)
|
||||
|
||||
func min(a, b int) int {
|
||||
if a < b {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
type ipnsFile interface {
|
||||
fs.HandleFlusher
|
||||
fs.HandleReader
|
||||
fs.HandleWriter
|
||||
fs.HandleReleaser
|
||||
fs.Node
|
||||
fs.NodeFsyncer
|
||||
fs.NodeOpener
|
||||
}
|
||||
|
||||
var _ ipnsFile = (*File)(nil)
|
||||
|
@ -1,44 +0,0 @@
|
||||
// +build !nofuse
|
||||
|
||||
package ipns
|
||||
|
||||
import "time"
|
||||
|
||||
type Republisher struct {
|
||||
TimeoutLong time.Duration
|
||||
TimeoutShort time.Duration
|
||||
Publish chan struct{}
|
||||
node *Node
|
||||
}
|
||||
|
||||
func NewRepublisher(n *Node, tshort, tlong time.Duration) *Republisher {
|
||||
return &Republisher{
|
||||
TimeoutShort: tshort,
|
||||
TimeoutLong: tlong,
|
||||
Publish: make(chan struct{}),
|
||||
node: n,
|
||||
}
|
||||
}
|
||||
|
||||
func (np *Republisher) Run() {
|
||||
for _ = range np.Publish {
|
||||
quick := time.After(np.TimeoutShort)
|
||||
longer := time.After(np.TimeoutLong)
|
||||
|
||||
wait:
|
||||
select {
|
||||
case <-quick:
|
||||
case <-longer:
|
||||
case <-np.Publish:
|
||||
quick = time.After(np.TimeoutShort)
|
||||
goto wait
|
||||
}
|
||||
|
||||
log.Info("Publishing Changes!")
|
||||
err := np.node.republishRoot()
|
||||
if err != nil {
|
||||
log.Critical("republishRoot error: %s", err)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user