1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-08-06 03:19:47 +08:00

introduce concept of filedescriptors to mfs, adjust fuse code to use them

License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
Jeromy
2016-02-03 20:45:12 -08:00
parent 5474e15e93
commit 4b7e3282f1
10 changed files with 576 additions and 254 deletions

View File

@ -368,6 +368,14 @@ Examples:
return
}
rfd, err := fi.Open(mfs.OpenReadOnly, false)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
defer rfd.Close()
offset, _, err := req.Option("offset").Int()
if err != nil {
res.SetError(err, cmds.ErrNormal)
@ -378,7 +386,7 @@ Examples:
return
}
filen, err := fi.Size()
filen, err := rfd.Size()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
@ -389,12 +397,13 @@ Examples:
return
}
_, err = fi.Seek(int64(offset), os.SEEK_SET)
_, err = rfd.Seek(int64(offset), os.SEEK_SET)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
var r io.Reader = &contextReaderWrapper{R: fi, ctx: req.Context()}
var r io.Reader = &contextReaderWrapper{R: rfd, ctx: req.Context()}
count, found, err := req.Option("count").Int()
if err != nil {
res.SetError(err, cmds.ErrNormal)
@ -405,7 +414,7 @@ Examples:
res.SetError(fmt.Errorf("cannot specify negative 'count'"), cmds.ErrNormal)
return
}
r = io.LimitReader(fi, int64(count))
r = io.LimitReader(r, int64(count))
}
res.SetOutput(r)
@ -540,14 +549,16 @@ Warning:
return
}
if flush {
defer fi.Close()
} else {
defer fi.Sync()
wfd, err := fi.Open(mfs.OpenWriteOnly, flush)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
defer wfd.Close()
if trunc {
if err := fi.Truncate(0); err != nil {
if err := wfd.Truncate(0); err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
@ -563,7 +574,7 @@ Warning:
return
}
_, err = fi.Seek(int64(offset), os.SEEK_SET)
_, err = wfd.Seek(int64(offset), os.SEEK_SET)
if err != nil {
log.Error("seekfail: ", err)
res.SetError(err, cmds.ErrNormal)
@ -581,7 +592,7 @@ Warning:
r = io.LimitReader(r, int64(count))
}
n, err := io.Copy(fi, input)
n, err := io.Copy(wfd, input)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return

View File

@ -88,7 +88,7 @@ func checkExists(t *testing.T, path string) {
}
}
func closeMount(mnt *fstest.Mount) {
func closeMount(mnt *mountWrap) {
if err := recover(); err != nil {
log.Error("Recovered panic")
log.Error(err)
@ -96,7 +96,18 @@ func closeMount(mnt *fstest.Mount) {
mnt.Close()
}
func setupIpnsTest(t *testing.T, node *core.IpfsNode) (*core.IpfsNode, *fstest.Mount) {
// mountWrap pairs a fuse test mount with the FileSystem instance serving
// it, so tests can tear both down together.
type mountWrap struct {
	*fstest.Mount
	Fs *FileSystem
}

// Close destroys the wrapped FileSystem before unmounting, ensuring the
// filesystem is shut down prior to the fuse connection going away.
// NOTE(review): any error from the underlying Mount.Close is not
// propagated — confirm fstest.Mount.Close has nothing to report.
func (m *mountWrap) Close() error {
	m.Fs.Destroy()
	m.Mount.Close()
	return nil
}
func setupIpnsTest(t *testing.T, node *core.IpfsNode) (*core.IpfsNode, *mountWrap) {
maybeSkipFuseTests(t)
var err error
@ -129,7 +140,10 @@ func setupIpnsTest(t *testing.T, node *core.IpfsNode) (*core.IpfsNode, *fstest.M
t.Fatal(err)
}
return node, mnt
return node, &mountWrap{
Mount: mnt,
Fs: fs,
}
}
func TestIpnsLocalLink(t *testing.T) {

View File

@ -23,6 +23,14 @@ import (
ci "gx/ipfs/QmUBogf4nUefBjmYjn6jfsfPJRkmDGSeMhNj4usRKq69f4/go-libp2p/p2p/crypto"
)
// init enables verbose fuse protocol logging when the IPFS_FUSE_DEBUG
// environment variable is set to any non-empty value.
func init() {
	if os.Getenv("IPFS_FUSE_DEBUG") != "" {
		// fuse.Debug is invoked once per fuse protocol message.
		fuse.Debug = func(msg interface{}) {
			fmt.Println(msg)
		}
	}
}
var log = logging.Logger("fuse/ipns")
// FileSystem is the readwrite IPNS Fuse Filesystem.
@ -102,7 +110,7 @@ func loadRoot(ctx context.Context, rt *keyRoot, ipfs *core.IpfsNode, name string
case *mfs.Directory:
return &Directory{dir: val}, nil
case *mfs.File:
return &File{fi: val}, nil
return &FileNode{fi: val}, nil
default:
return nil, errors.New("unrecognized type")
}
@ -177,7 +185,7 @@ func (s *Root) Lookup(ctx context.Context, name string) (fs.Node, error) {
switch nd := nd.(type) {
case *Directory:
return nd, nil
case *File:
case *FileNode:
return nd, nil
default:
return nil, fuse.EIO
@ -248,15 +256,15 @@ func (r *Root) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
// Directory is wrapper over an mfs directory to satisfy the fuse fs interface
type Directory struct {
dir *mfs.Directory
}
fs.NodeRef
type FileNode struct {
fi *mfs.File
}
// File is wrapper over an mfs file to satisfy the fuse fs interface
type File struct {
fi *mfs.File
fs.NodeRef
fi mfs.FileDescriptor
}
// Attr returns the attributes of a given node.
@ -269,7 +277,7 @@ func (d *Directory) Attr(ctx context.Context, a *fuse.Attr) error {
}
// Attr returns the attributes of a given node.
func (fi *File) Attr(ctx context.Context, a *fuse.Attr) error {
func (fi *FileNode) Attr(ctx context.Context, a *fuse.Attr) error {
log.Debug("File Attr")
size, err := fi.fi.Size()
if err != nil {
@ -295,7 +303,7 @@ func (s *Directory) Lookup(ctx context.Context, name string) (fs.Node, error) {
case *mfs.Directory:
return &Directory{dir: child}, nil
case *mfs.File:
return &File{fi: child}, nil
return &FileNode{fi: child}, nil
default:
// NB: if this happens, we do not want to continue, unpredictable behaviour
// may occur.
@ -365,7 +373,7 @@ func (fi *File) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.Wr
func (fi *File) Flush(ctx context.Context, req *fuse.FlushRequest) error {
errs := make(chan error, 1)
go func() {
errs <- fi.fi.Close()
errs <- fi.fi.Flush()
}()
select {
case err := <-errs:
@ -393,7 +401,7 @@ func (fi *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus
// Fsync flushes the content in the file to disk, but does not
// update the dag tree internally
func (fi *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
func (fi *FileNode) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
errs := make(chan error, 1)
go func() {
errs <- fi.fi.Sync()
@ -422,25 +430,49 @@ func (dir *Directory) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Nod
return &Directory{dir: child}, nil
}
func (fi *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
func (fi *FileNode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
var mfsflag int
switch {
case req.Flags.IsReadOnly():
mfsflag = mfs.OpenReadOnly
case req.Flags.IsWriteOnly():
mfsflag = mfs.OpenWriteOnly
case req.Flags.IsReadWrite():
mfsflag = mfs.OpenReadWrite
default:
return nil, errors.New("unsupported flag type")
}
fd, err := fi.fi.Open(mfsflag, true)
if err != nil {
return nil, err
}
if req.Flags&fuse.OpenTruncate != 0 {
if req.Flags.IsReadOnly() {
log.Error("tried to open a readonly file with truncate")
return nil, fuse.ENOTSUP
}
log.Info("Need to truncate file!")
err := fi.fi.Truncate(0)
err := fd.Truncate(0)
if err != nil {
return nil, err
}
} else if req.Flags&fuse.OpenAppend != 0 {
log.Info("Need to append to file!")
if req.Flags.IsReadOnly() {
log.Error("tried to open a readonly file with append")
return nil, fuse.ENOTSUP
}
// seek(0) essentially resets the file object, this is required for appends to work
// properly
_, err := fi.fi.Seek(0, os.SEEK_SET)
_, err := fd.Seek(0, os.SEEK_END)
if err != nil {
log.Error("seek reset failed: ", err)
return nil, err
}
}
return fi, nil
return &File{fi: fd}, nil
}
func (fi *File) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
@ -465,8 +497,26 @@ func (dir *Directory) Create(ctx context.Context, req *fuse.CreateRequest, resp
return nil, nil, errors.New("child creation failed")
}
nodechild := &File{fi: fi}
return nodechild, nodechild, nil
nodechild := &FileNode{fi: fi}
var openflag int
switch {
case req.Flags.IsReadOnly():
openflag = mfs.OpenReadOnly
case req.Flags.IsWriteOnly():
openflag = mfs.OpenWriteOnly
case req.Flags.IsReadWrite():
openflag = mfs.OpenReadWrite
default:
return nil, nil, errors.New("unsupported open mode")
}
fd, err := fi.Open(openflag, true)
if err != nil {
return nil, nil, err
}
return nodechild, &File{fi: fd}, nil
}
func (dir *Directory) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
@ -500,7 +550,7 @@ func (dir *Directory) Rename(ctx context.Context, req *fuse.RenameRequest, newDi
if err != nil {
return err
}
case *File:
case *FileNode:
log.Error("Cannot move node into a file!")
return fuse.EPERM
default:
@ -543,9 +593,13 @@ type ipnsFile interface {
fs.HandleReader
fs.HandleWriter
fs.HandleReleaser
}
type ipnsFileNode interface {
fs.Node
fs.NodeFsyncer
fs.NodeOpener
}
var _ ipnsFileNode = (*FileNode)(nil)
var _ ipnsFile = (*File)(nil)

View File

@ -183,8 +183,8 @@ type NodeListing struct {
}
func (d *Directory) ListNames() []string {
d.Lock()
defer d.Unlock()
d.lock.Lock()
defer d.lock.Unlock()
names := make(map[string]struct{})
for n, _ := range d.childDirs {
@ -391,11 +391,3 @@ func (d *Directory) GetNode() (*dag.Node, error) {
return d.node.Copy(), nil
}
func (d *Directory) Lock() {
d.lock.Lock()
}
func (d *Directory) Unlock() {
d.lock.Unlock()
}

151
mfs/fd.go Normal file
View File

@ -0,0 +1,151 @@
package mfs
import (
"fmt"
"io"
mod "github.com/ipfs/go-ipfs/unixfs/mod"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)
// FileDescriptor is the handle through which reads and writes to an mfs
// File are performed. Descriptors are obtained from File.Open and must be
// Closed exactly once when no longer needed; Close releases the file's
// descriptor lock and flushes any pending changes.
type FileDescriptor interface {
	io.Reader
	// CtxReadFull reads exactly len(buf) bytes, honoring context cancellation.
	CtxReadFull(context.Context, []byte) (int, error)

	io.Writer
	io.WriterAt

	io.Closer
	io.Seeker

	// Truncate resizes the file to the given size.
	Truncate(int64) error
	// Size reports the current size of the file in bytes.
	Size() (int64, error)
	// Sync flushes state up the directory tree without a full flush.
	Sync() error
	// Flush flushes state up the directory tree with a full flush.
	Flush() error
}
// fileDescriptor is the concrete FileDescriptor implementation, backed by
// a DagModifier over the file's dag node.
type fileDescriptor struct {
	inode *File            // the mfs file this descriptor was opened on
	mod   *mod.DagModifier // mutable view of the file's dag node
	perms int              // OpenReadOnly / OpenWriteOnly / OpenReadWrite

	sync       bool // if true, Close performs a full flush up the tree
	hasChanges bool // set on any mutation; gates the flush in Close
	closed     bool // guards against closing the descriptor twice
}
// Size returns the size (in bytes) of the file referred to by this
// descriptor, as reported by the underlying dag modifier.
func (fi *fileDescriptor) Size() (int64, error) {
	return fi.mod.Size()
}
// Truncate resizes the file referred to by this descriptor to size bytes.
// It is an error to truncate through a descriptor opened read-only.
func (fi *fileDescriptor) Truncate(size int64) error {
	if fi.perms != OpenReadOnly {
		fi.hasChanges = true
		return fi.mod.Truncate(size)
	}
	return fmt.Errorf("cannot call truncate on readonly file descriptor")
}
// Write writes the given data to the file at its current offset, marking
// the descriptor as dirty. Writing through a read-only descriptor fails.
func (fi *fileDescriptor) Write(data []byte) (int, error) {
	switch fi.perms {
	case OpenReadOnly:
		return 0, fmt.Errorf("cannot write on not writeable descriptor")
	default:
		fi.hasChanges = true
		return fi.mod.Write(data)
	}
}
// Read reads into the given buffer from the descriptor's current offset.
// Reading through a write-only descriptor fails.
func (fi *fileDescriptor) Read(b []byte) (int, error) {
	if fi.perms == OpenWriteOnly {
		return 0, fmt.Errorf("cannot read on write-only descriptor")
	}
	return fi.mod.Read(b)
}
// CtxReadFull reads into the given buffer from the current offset,
// honoring cancellation of the supplied context. Reading through a
// write-only descriptor fails.
func (fi *fileDescriptor) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	if fi.perms == OpenWriteOnly {
		return 0, fmt.Errorf("cannot read on write-only descriptor")
	}
	return fi.mod.CtxReadFull(ctx, b)
}
// Close flushes any outstanding changes, then propagates the modified dag
// node up the directory structure and signals a republish to occur.
// Closing also releases the descriptor lock taken in File.Open; a
// descriptor must be closed exactly once.
func (fi *fileDescriptor) Close() error {
	// Catch double-close before registering the unlock defer, so a second
	// Close panics with this message rather than on the redundant unlock.
	// (The original checked fi.closed but never set it, so the guard could
	// never fire.)
	if fi.closed {
		panic("attempted to close file descriptor twice!")
	}
	fi.closed = true

	defer func() {
		// Release the lock acquired in File.Open, matching the mode it
		// was taken in.
		switch fi.perms {
		case OpenReadOnly:
			fi.inode.desclock.RUnlock()
		case OpenWriteOnly, OpenReadWrite:
			fi.inode.desclock.Unlock()
		}
	}()

	if fi.hasChanges {
		err := fi.mod.Sync()
		if err != nil {
			return err
		}

		fi.hasChanges = false

		// stay locked for the flushUp call; the deferred unlock above
		// releases the lock afterwards
		return fi.flushUp(fi.sync)
	}
	return nil
}
// Sync flushes the file's current state up to its parent directory
// without requesting a full flush (fullsync=false).
func (fi *fileDescriptor) Sync() error {
	return fi.flushUp(false)
}
// Flush flushes the file's current state up to its parent directory,
// requesting a full flush (fullsync=true).
func (fi *fileDescriptor) Flush() error {
	return fi.flushUp(true)
}
// flushUp gets the current dag node from the modifier, adds it to the
// dagservice, swaps it into the inode's cached node, and hands it to the
// parent directory via closeChild (passing fullsync through).
// NOTE(review): the comment in the original said it "must be called with
// the File's lock taken" — in this descriptor-based design the caller
// holds the File's desclock via the open descriptor; nodelk is taken here
// only around the node swap. Confirm no other locking is expected.
func (fi *fileDescriptor) flushUp(fullsync bool) error {
	nd, err := fi.mod.GetNode()
	if err != nil {
		return err
	}

	_, err = fi.inode.dserv.Add(nd)
	if err != nil {
		return err
	}

	// update the cached node under nodelk, then release before calling
	// into the parent
	fi.inode.nodelk.Lock()
	fi.inode.node = nd
	name := fi.inode.name
	parent := fi.inode.parent
	fi.inode.nodelk.Unlock()
	return parent.closeChild(name, nd, fullsync)
}
// Seek implements io.Seeker, delegating to the underlying dag modifier.
func (fi *fileDescriptor) Seek(offset int64, whence int) (int64, error) {
	return fi.mod.Seek(offset, whence)
}
// WriteAt writes the given bytes at the offset 'at', marking the
// descriptor as dirty. Writing through a read-only descriptor fails.
func (fi *fileDescriptor) WriteAt(b []byte, at int64) (int, error) {
	if fi.perms == OpenReadOnly {
		return 0, fmt.Errorf("cannot write on not writeable descriptor")
	}
	fi.hasChanges = true
	return fi.mod.WriteAt(b, at)
}

View File

@ -1,10 +1,12 @@
package mfs
import (
"fmt"
"sync"
chunk "github.com/ipfs/go-ipfs/importer/chunk"
dag "github.com/ipfs/go-ipfs/merkledag"
ft "github.com/ipfs/go-ipfs/unixfs"
mod "github.com/ipfs/go-ipfs/unixfs/mod"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
@ -13,150 +15,98 @@ import (
type File struct {
parent childCloser
name string
hasChanges bool
name string
dserv dag.DAGService
mod *mod.DagModifier
lock sync.Mutex
desclock sync.RWMutex
dserv dag.DAGService
node *dag.Node
nodelk sync.Mutex
}
// NewFile returns a NewFile object with the given parameters
func NewFile(name string, node *dag.Node, parent childCloser, dserv dag.DAGService) (*File, error) {
dmod, err := mod.NewDagModifier(context.Background(), node, dserv, chunk.DefaultSplitter)
if err != nil {
return nil, err
}
return &File{
dserv: dserv,
parent: parent,
name: name,
mod: dmod,
node: node,
}, nil
}
// Write writes the given data to the file at its current offset
func (fi *File) Write(b []byte) (int, error) {
fi.Lock()
defer fi.Unlock()
fi.hasChanges = true
return fi.mod.Write(b)
}
const (
OpenReadOnly = iota
OpenWriteOnly
OpenReadWrite
)
// Read reads into the given buffer from the current offset
func (fi *File) Read(b []byte) (int, error) {
fi.Lock()
defer fi.Unlock()
return fi.mod.Read(b)
}
func (fi *File) Open(flags int, sync bool) (FileDescriptor, error) {
fi.nodelk.Lock()
node := fi.node
fi.nodelk.Unlock()
// Read reads into the given buffer from the current offset
func (fi *File) CtxReadFull(ctx context.Context, b []byte) (int, error) {
fi.Lock()
defer fi.Unlock()
return fi.mod.CtxReadFull(ctx, b)
}
// Close flushes, then propogates the modified dag node up the directory structure
// and signals a republish to occur
func (fi *File) Close() error {
fi.Lock()
if fi.hasChanges {
err := fi.mod.Sync()
if err != nil {
fi.Unlock()
return err
}
fi.hasChanges = false
// explicitly stay locked for flushUp call,
// it will manage the lock for us
return fi.flushUp(true)
switch flags {
case OpenReadOnly:
fi.desclock.RLock()
case OpenWriteOnly, OpenReadWrite:
fi.desclock.Lock()
default:
// TODO: support other modes
return nil, fmt.Errorf("mode not supported")
}
fi.Unlock()
return nil
}
// flushUp syncs the file and adds it to the dagservice
// it *must* be called with the File's lock taken
func (fi *File) flushUp(fullsync bool) error {
nd, err := fi.mod.GetNode()
dmod, err := mod.NewDagModifier(context.TODO(), node, fi.dserv, chunk.DefaultSplitter)
if err != nil {
fi.Unlock()
return err
return nil, err
}
_, err = fi.dserv.Add(nd)
if err != nil {
fi.Unlock()
return err
}
name := fi.name
parent := fi.parent
// explicit unlock *only* before closeChild call
fi.Unlock()
return parent.closeChild(name, nd, fullsync)
}
// Sync flushes the changes in the file to disk
func (fi *File) Sync() error {
fi.Lock()
return fi.flushUp(false)
}
// Seek implements io.Seeker
func (fi *File) Seek(offset int64, whence int) (int64, error) {
fi.Lock()
defer fi.Unlock()
return fi.mod.Seek(offset, whence)
}
// Write At writes the given bytes at the offset 'at'
func (fi *File) WriteAt(b []byte, at int64) (int, error) {
fi.Lock()
defer fi.Unlock()
fi.hasChanges = true
return fi.mod.WriteAt(b, at)
return &fileDescriptor{
inode: fi,
perms: flags,
sync: sync,
mod: dmod,
}, nil
}
// Size returns the size of this file
func (fi *File) Size() (int64, error) {
fi.Lock()
defer fi.Unlock()
return fi.mod.Size()
fi.nodelk.Lock()
defer fi.nodelk.Unlock()
pbd, err := ft.FromBytes(fi.node.Data)
if err != nil {
return 0, err
}
return int64(pbd.GetFilesize()), nil
}
// GetNode returns the dag node associated with this file
func (fi *File) GetNode() (*dag.Node, error) {
fi.Lock()
defer fi.Unlock()
return fi.mod.GetNode()
fi.nodelk.Lock()
defer fi.nodelk.Unlock()
return fi.node, nil
}
// Truncate truncates the file to size
func (fi *File) Truncate(size int64) error {
fi.Lock()
defer fi.Unlock()
fi.hasChanges = true
return fi.mod.Truncate(size)
func (fi *File) Flush() error {
// open the file in fullsync mode
fd, err := fi.Open(OpenWriteOnly, true)
if err != nil {
return err
}
defer fd.Close()
return fd.Flush()
}
func (fi *File) Sync() error {
// just being able to take the writelock means the descriptor is synced
fi.desclock.Lock()
fi.desclock.Unlock()
return nil
}
// Type returns the type FSNode this is
func (fi *File) Type() NodeType {
return TFile
}
// Lock the file
func (fi *File) Lock() {
fi.lock.Lock()
}
// Unlock the file
func (fi *File) Unlock() {
fi.lock.Unlock()
}

View File

@ -9,7 +9,9 @@ import (
"math/rand"
"os"
"sort"
"sync"
"testing"
"time"
randbo "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/dustin/randbo"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore"
@ -38,6 +40,10 @@ func getDagserv(t *testing.T) dag.DAGService {
// getRandFile builds a dag node containing size bytes of time-seeded
// random data, for use as test file content.
func getRandFile(t *testing.T, ds dag.DAGService, size int64) *dag.Node {
	r := io.LimitReader(u.NewTimeSeededRand(), size)
	return fileNodeFromReader(t, ds, r)
}
func fileNodeFromReader(t *testing.T, ds dag.DAGService, r io.Reader) *dag.Node {
nd, err := importer.BuildDagFromReader(ds, chunk.DefaultSplitter(r))
if err != nil {
t.Fatal(err)
@ -143,7 +149,12 @@ func assertFileAtPath(ds dag.DAGService, root *Directory, exp *dag.Node, pth str
return fmt.Errorf("%s was not a file!", pth)
}
out, err := ioutil.ReadAll(file)
rfd, err := file.Open(OpenReadOnly, false)
if err != nil {
return err
}
out, err := ioutil.ReadAll(rfd)
if err != nil {
return err
}
@ -374,6 +385,11 @@ func TestMfsFile(t *testing.T) {
t.Fatal("some is seriously wrong here")
}
wfd, err := fi.Open(OpenReadWrite, true)
if err != nil {
t.Fatal(err)
}
// assert size is as expected
size, err := fi.Size()
if size != int64(fisize) {
@ -382,7 +398,7 @@ func TestMfsFile(t *testing.T) {
// write to beginning of file
b := []byte("THIS IS A TEST")
n, err := fi.Write(b)
n, err := wfd.Write(b)
if err != nil {
t.Fatal(err)
}
@ -392,19 +408,19 @@ func TestMfsFile(t *testing.T) {
}
// sync file
err = fi.Sync()
err = wfd.Sync()
if err != nil {
t.Fatal(err)
}
// make sure size hasnt changed
size, err = fi.Size()
size, err = wfd.Size()
if size != int64(fisize) {
t.Fatal("size isnt correct")
}
// seek back to beginning
ns, err := fi.Seek(0, os.SEEK_SET)
ns, err := wfd.Seek(0, os.SEEK_SET)
if err != nil {
t.Fatal(err)
}
@ -415,7 +431,7 @@ func TestMfsFile(t *testing.T) {
// read back bytes we wrote
buf := make([]byte, len(b))
n, err = fi.Read(buf)
n, err = wfd.Read(buf)
if err != nil {
t.Fatal(err)
}
@ -429,12 +445,12 @@ func TestMfsFile(t *testing.T) {
}
// truncate file to ten bytes
err = fi.Truncate(10)
err = wfd.Truncate(10)
if err != nil {
t.Fatal(err)
}
size, err = fi.Size()
size, err = wfd.Size()
if err != nil {
t.Fatal(err)
}
@ -445,7 +461,7 @@ func TestMfsFile(t *testing.T) {
// 'writeAt' to extend it
data := []byte("this is a test foo foo foo")
nwa, err := fi.WriteAt(data, 5)
nwa, err := wfd.WriteAt(data, 5)
if err != nil {
t.Fatal(err)
}
@ -455,7 +471,7 @@ func TestMfsFile(t *testing.T) {
}
// assert size once more
size, err = fi.Size()
size, err = wfd.Size()
if err != nil {
t.Fatal(err)
}
@ -464,14 +480,14 @@ func TestMfsFile(t *testing.T) {
t.Fatal("size was incorrect")
}
// make sure we can get node. TODO: verify it later
_, err = fi.GetNode()
// close it out!
err = wfd.Close()
if err != nil {
t.Fatal(err)
}
// close it out!
err = fi.Close()
// make sure we can get node. TODO: verify it later
_, err = fi.GetNode()
if err != nil {
t.Fatal(err)
}
@ -529,13 +545,18 @@ func actorMakeFile(d *Directory) error {
return err
}
r := io.LimitReader(randbo.New(), int64(77*rand.Intn(123)))
_, err = io.Copy(f, r)
wfd, err := f.Open(OpenWriteOnly, true)
if err != nil {
return err
}
err = f.Close()
r := io.LimitReader(randbo.New(), int64(77*rand.Intn(123)))
_, err = io.Copy(wfd, r)
if err != nil {
return err
}
err = wfd.Close()
if err != nil {
return err
}
@ -630,9 +651,14 @@ func actorWriteFile(d *Directory) error {
return err
}
wfd, err := fi.Open(OpenWriteOnly, true)
if err != nil {
return err
}
offset := rand.Int63n(s)
n, err := fi.WriteAt(buf, offset)
n, err := wfd.WriteAt(buf, offset)
if err != nil {
return err
}
@ -640,7 +666,7 @@ func actorWriteFile(d *Directory) error {
return fmt.Errorf("didnt write enough")
}
return fi.Close()
return wfd.Close()
}
func actorReadFile(d *Directory) error {
@ -657,12 +683,17 @@ func actorReadFile(d *Directory) error {
return err
}
_, err = ioutil.ReadAll(fi)
rfd, err := fi.Open(OpenReadOnly, false)
if err != nil {
return err
}
return fi.Close()
_, err = ioutil.ReadAll(rfd)
if err != nil {
return err
}
return rfd.Close()
}
func testActor(rt *Root, iterations int, errs chan error) {
@ -780,3 +811,187 @@ func TestFlushing(t *testing.T) {
t.Fatalf("dag looks wrong, expected %s, but got %s", exp, rnk.B58String())
}
}
// readFile looks up path in rt, opens it read-only, seeks to offset, and
// reads exactly len(buf) bytes into buf.
//
// The descriptor is now closed on every path via defer — the original
// leaked it (and the file's read lock) when Seek or Read failed. The
// Close error is still propagated when nothing failed earlier.
func readFile(rt *Root, path string, offset int64, buf []byte) (err error) {
	n, err := Lookup(rt, path)
	if err != nil {
		return err
	}

	fi, ok := n.(*File)
	if !ok {
		return fmt.Errorf("%s was not a file", path)
	}

	fd, err := fi.Open(OpenReadOnly, false)
	if err != nil {
		return err
	}
	defer func() {
		// surface the Close error only if no earlier error occurred
		if cerr := fd.Close(); err == nil {
			err = cerr
		}
	}()

	if _, err = fd.Seek(offset, os.SEEK_SET); err != nil {
		return err
	}

	nread, err := fd.Read(buf)
	if err != nil {
		return err
	}
	if nread != len(buf) {
		return fmt.Errorf("didnt read enough!")
	}

	return nil
}
// TestConcurrentReads verifies that many goroutines can hold read-only
// descriptors on the same mfs file simultaneously, and that reads at
// random offsets always observe the originally written content.
func TestConcurrentReads(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ds, rt := setupRoot(ctx, t)

	rootdir := rt.GetValue().(*Directory)

	path := "a/b/c"
	d := mkdirP(t, rootdir, path)

	// random reference content the concurrent readers compare against
	buf := make([]byte, 2048)
	randbo.New().Read(buf)
	fi := fileNodeFromReader(t, ds, bytes.NewReader(buf))
	err := d.AddChild("afile", fi)
	if err != nil {
		t.Fatal(err)
	}

	var wg sync.WaitGroup
	nloops := 100
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(me int) {
			defer wg.Done()
			mybuf := make([]byte, len(buf))
			for j := 0; j < nloops; j++ {
				// choose a random window [offset, offset+length) within the file
				offset := rand.Intn(len(buf))
				length := rand.Intn(len(buf) - offset)

				err := readFile(rt, "/a/b/c/afile", int64(offset), mybuf[:length])
				if err != nil {
					t.Error("readfile failed: ", err)
					return
				}

				if !bytes.Equal(mybuf[:length], buf[offset:offset+length]) {
					t.Error("incorrect read!")
				}
			}
		}(i)
	}
	wg.Wait()
}
// TestFileDescriptors exercises the descriptor locking semantics of mfs
// files: read-only descriptors reject writes/truncates, multiple readers
// may coexist, a writer blocks until all readers close, and write-only
// descriptors reject reads.
func TestFileDescriptors(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ds, rt := setupRoot(ctx, t)
	dir := rt.GetValue().(*Directory)

	// an empty unixfs file to open descriptors on
	nd := &dag.Node{Data: ft.FilePBData(nil, 0)}
	fi, err := NewFile("test", nd, dir, ds)
	if err != nil {
		t.Fatal(err)
	}

	// test read only
	rfd1, err := fi.Open(OpenReadOnly, false)
	if err != nil {
		t.Fatal(err)
	}

	err = rfd1.Truncate(0)
	if err == nil {
		t.Fatal("shouldnt be able to truncate readonly fd")
	}

	_, err = rfd1.Write([]byte{})
	if err == nil {
		t.Fatal("shouldnt be able to write to readonly fd")
	}

	_, err = rfd1.Read([]byte{})
	if err != nil {
		t.Fatalf("expected to be able to read from file: %s", err)
	}

	done := make(chan struct{})
	go func() {
		defer close(done)
		// can open second readonly file descriptor
		rfd2, err := fi.Open(OpenReadOnly, false)
		if err != nil {
			t.Error(err)
			return
		}

		rfd2.Close()
	}()

	// the second reader must not block behind the first
	select {
	case <-time.After(time.Second):
		t.Fatal("open second file descriptor failed")
	case <-done:
	}

	if t.Failed() {
		return
	}

	// test not being able to open for write until reader are closed
	done = make(chan struct{})
	go func() {
		defer close(done)
		wfd1, err := fi.Open(OpenWriteOnly, true)
		if err != nil {
			t.Error(err)
		}

		wfd1.Close()
	}()

	// the writer should still be blocked while rfd1 is open
	select {
	case <-time.After(time.Millisecond * 200):
	case <-done:
		if t.Failed() {
			return
		}

		t.Fatal("shouldnt have been able to open file for writing")
	}

	err = rfd1.Close()
	if err != nil {
		t.Fatal(err)
	}

	// once the reader is closed, the pending writer should proceed
	select {
	case <-time.After(time.Second):
		t.Fatal("should have been able to open write fd after closing read fd")
	case <-done:
	}

	wfd, err := fi.Open(OpenWriteOnly, true)
	if err != nil {
		t.Fatal(err)
	}

	_, err = wfd.Read([]byte{})
	if err == nil {
		t.Fatal("shouldnt have been able to read from write only filedescriptor")
	}

	_, err = wfd.Write([]byte{})
	if err != nil {
		t.Fatal(err)
	}
}

View File

@ -196,87 +196,17 @@ func DirLookup(d *Directory, pth string) (FSNode, error) {
return cur, nil
}
func FlushPath(r *Root, pth string) error {
parts := path.SplitList(strings.Trim(pth, "/"))
if len(parts) == 1 && parts[0] == "" {
parts = nil
}
d, ok := r.GetValue().(*Directory)
if !ok {
return errors.New("mfs root somehow didnt point to a directory")
}
nd, err := flushPathRec(d, parts)
func FlushPath(rt *Root, pth string) error {
nd, err := Lookup(rt, pth)
if err != nil {
return err
}
k, err := nd.Key()
err = nd.Flush()
if err != nil {
return err
}
r.repub.Update(k)
r.repub.WaitPub()
rt.repub.WaitPub()
return nil
}
func flushPathRec(d *Directory, parts []string) (*dag.Node, error) {
if len(parts) == 0 {
nd, err := d.GetNode()
if err != nil {
return nil, err
}
return nd, nil
}
d.Lock()
defer d.Unlock()
next, err := d.childUnsync(parts[0])
if err != nil {
log.Errorf("childnode: %q %q", parts[0], err)
return nil, err
}
var ndagnode *dag.Node
switch next := next.(type) {
case *Directory:
nd, err := flushPathRec(next, parts[1:])
if err != nil {
return nil, err
}
ndagnode = nd
case *File:
if len(parts) > 1 {
return nil, fmt.Errorf("%s is a file, not a directory", parts[0])
}
child, err := next.GetNode()
if err != nil {
return nil, err
}
ndagnode = child
default:
return nil, fmt.Errorf("unrecognized FSNode type: %#v", next)
}
newnode, err := d.node.UpdateNodeLink(parts[0], ndagnode)
if err != nil {
return nil, err
}
_, err = d.dserv.Add(newnode)
if err != nil {
return nil, err
}
d.node = newnode
return newnode, nil
}

View File

@ -42,9 +42,8 @@ const (
// FSNode represents any node (directory, root, or file) in the mfs filesystem
type FSNode interface {
GetNode() (*dag.Node, error)
Flush() error
Type() NodeType
Lock()
Unlock()
}
// Root represents the root of a filesystem tree
@ -210,6 +209,13 @@ func (p *Republisher) pubNow() {
}
func (p *Republisher) WaitPub() {
p.lk.Lock()
consistent := p.lastpub == p.val
p.lk.Unlock()
if consistent {
return
}
wait := make(chan struct{})
p.pubnowch <- wait
<-wait
@ -273,7 +279,6 @@ func (np *Republisher) publish(ctx context.Context) error {
topub := np.val
np.lk.Unlock()
log.Info("Publishing Changes!")
err := np.pubfunc(ctx, topub)
if err != nil {
return err

View File

@ -372,7 +372,7 @@ func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
case os.SEEK_SET:
newoffset = uint64(offset)
case os.SEEK_END:
return 0, ErrSeekEndNotImpl
newoffset = uint64(fisize) - uint64(offset)
default:
return 0, ErrUnrecognizedWhence
}