mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-25 23:21:54 +08:00
fix locking and race conditions throughout ipnsfs and the pinner
This commit is contained in:
@ -15,17 +15,16 @@ var ErrNotYetImplemented = errors.New("not yet implemented")
|
|||||||
var ErrInvalidChild = errors.New("invalid child node")
|
var ErrInvalidChild = errors.New("invalid child node")
|
||||||
|
|
||||||
type Directory struct {
|
type Directory struct {
|
||||||
fs *Filesystem
|
fs *Filesystem
|
||||||
parent childCloser
|
parent childCloser
|
||||||
|
|
||||||
childDirs map[string]*Directory
|
childDirs map[string]*Directory
|
||||||
files map[string]*file
|
files map[string]*File
|
||||||
|
|
||||||
node *dag.Node
|
|
||||||
name string
|
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
|
node *dag.Node
|
||||||
|
|
||||||
ref int
|
name string
|
||||||
refLock sync.Mutex
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDirectory(name string, node *dag.Node, parent childCloser, fs *Filesystem) *Directory {
|
func NewDirectory(name string, node *dag.Node, parent childCloser, fs *Filesystem) *Directory {
|
||||||
@ -35,18 +34,19 @@ func NewDirectory(name string, node *dag.Node, parent childCloser, fs *Filesyste
|
|||||||
node: node,
|
node: node,
|
||||||
parent: parent,
|
parent: parent,
|
||||||
childDirs: make(map[string]*Directory),
|
childDirs: make(map[string]*Directory),
|
||||||
files: make(map[string]*file),
|
files: make(map[string]*File),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Directory) Open(tpath []string, mode int) (File, error) {
|
// Open opens a file at the given path 'tpath'
|
||||||
|
func (d *Directory) Open(tpath []string, mode int) (*File, error) {
|
||||||
if len(tpath) == 0 {
|
if len(tpath) == 0 {
|
||||||
return nil, ErrIsDirectory
|
return nil, ErrIsDirectory
|
||||||
}
|
}
|
||||||
if len(tpath) == 1 {
|
if len(tpath) == 1 {
|
||||||
fi, err := d.childFile(tpath[0])
|
fi, err := d.childFile(tpath[0])
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return fi.withMode(mode), nil
|
return fi, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if mode|os.O_CREATE != 0 {
|
if mode|os.O_CREATE != 0 {
|
||||||
@ -57,7 +57,7 @@ func (d *Directory) Open(tpath []string, mode int) (File, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d.files[tpath[0]] = nfi
|
d.files[tpath[0]] = nfi
|
||||||
return nfi.withMode(mode), nil
|
return nfi, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, ErrNoSuch
|
return nil, ErrNoSuch
|
||||||
@ -102,7 +102,7 @@ func (d *Directory) Type() NodeType {
|
|||||||
return TDir
|
return TDir
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Directory) childFile(name string) (*file, error) {
|
func (d *Directory) childFile(name string) (*File, error) {
|
||||||
fi, ok := d.files[name]
|
fi, ok := d.files[name]
|
||||||
if ok {
|
if ok {
|
||||||
return fi, nil
|
return fi, nil
|
||||||
@ -334,14 +334,10 @@ func (d *Directory) GetNode() (*dag.Node, error) {
|
|||||||
return d.node, nil
|
return d.node, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Directory) Upref() {
|
func (d *Directory) Lock() {
|
||||||
d.refLock.Lock()
|
d.lock.Lock()
|
||||||
d.ref++
|
|
||||||
d.refLock.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Directory) Deref() {
|
func (d *Directory) Unlock() {
|
||||||
d.refLock.Lock()
|
d.lock.Unlock()
|
||||||
d.ref--
|
|
||||||
d.refLock.Unlock()
|
|
||||||
}
|
}
|
||||||
|
@ -1,9 +1,7 @@
|
|||||||
package ipnsfs
|
package ipnsfs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"sync"
|
||||||
"io"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
chunk "github.com/jbenet/go-ipfs/importer/chunk"
|
chunk "github.com/jbenet/go-ipfs/importer/chunk"
|
||||||
dag "github.com/jbenet/go-ipfs/merkledag"
|
dag "github.com/jbenet/go-ipfs/merkledag"
|
||||||
@ -12,33 +10,24 @@ import (
|
|||||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
type File interface {
|
type File struct {
|
||||||
io.ReadWriteCloser
|
|
||||||
io.WriterAt
|
|
||||||
Seek(int64, int) (int64, error)
|
|
||||||
Size() (int64, error)
|
|
||||||
Flush() error
|
|
||||||
Truncate(int64) error
|
|
||||||
FSNode
|
|
||||||
}
|
|
||||||
|
|
||||||
type file struct {
|
|
||||||
parent childCloser
|
parent childCloser
|
||||||
fs *Filesystem
|
fs *Filesystem
|
||||||
|
|
||||||
name string
|
name string
|
||||||
hasChanges bool
|
hasChanges bool
|
||||||
|
|
||||||
mod *mod.DagModifier
|
mod *mod.DagModifier
|
||||||
|
lock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFile(name string, node *dag.Node, parent childCloser, fs *Filesystem) (*file, error) {
|
func NewFile(name string, node *dag.Node, parent childCloser, fs *Filesystem) (*File, error) {
|
||||||
dmod, err := mod.NewDagModifier(context.Background(), node, fs.dserv, fs.pins.GetManual(), chunk.DefaultSplitter)
|
dmod, err := mod.NewDagModifier(context.Background(), node, fs.dserv, fs.pins.GetManual(), chunk.DefaultSplitter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &file{
|
return &File{
|
||||||
fs: fs,
|
fs: fs,
|
||||||
parent: parent,
|
parent: parent,
|
||||||
name: name,
|
name: name,
|
||||||
@ -46,16 +35,20 @@ func NewFile(name string, node *dag.Node, parent childCloser, fs *Filesystem) (*
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fi *file) Write(b []byte) (int, error) {
|
func (fi *File) Write(b []byte) (int, error) {
|
||||||
fi.hasChanges = true
|
fi.hasChanges = true
|
||||||
return fi.mod.Write(b)
|
return fi.mod.Write(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fi *file) Read(b []byte) (int, error) {
|
func (fi *File) Read(b []byte) (int, error) {
|
||||||
|
fi.lock.Lock()
|
||||||
|
defer fi.lock.Unlock()
|
||||||
return fi.mod.Read(b)
|
return fi.mod.Read(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fi *file) Close() error {
|
func (fi *File) Close() error {
|
||||||
|
fi.lock.Lock()
|
||||||
|
defer fi.lock.Unlock()
|
||||||
if fi.hasChanges {
|
if fi.hasChanges {
|
||||||
err := fi.mod.Flush()
|
err := fi.mod.Flush()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -67,7 +60,9 @@ func (fi *file) Close() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fi.lock.Unlock()
|
||||||
err = fi.parent.closeChild(fi.name, nd)
|
err = fi.parent.closeChild(fi.name, nd)
|
||||||
|
fi.lock.Lock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -78,47 +73,52 @@ func (fi *file) Close() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fi *file) Flush() error {
|
func (fi *File) Flush() error {
|
||||||
|
fi.lock.Lock()
|
||||||
|
defer fi.lock.Unlock()
|
||||||
return fi.mod.Flush()
|
return fi.mod.Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fi *file) withMode(mode int) File {
|
func (fi *File) Seek(offset int64, whence int) (int64, error) {
|
||||||
if mode == os.O_RDONLY {
|
fi.lock.Lock()
|
||||||
return &readOnlyFile{fi}
|
defer fi.lock.Unlock()
|
||||||
}
|
|
||||||
return fi
|
|
||||||
}
|
|
||||||
|
|
||||||
func (fi *file) Seek(offset int64, whence int) (int64, error) {
|
|
||||||
return fi.mod.Seek(offset, whence)
|
return fi.mod.Seek(offset, whence)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fi *file) WriteAt(b []byte, at int64) (int, error) {
|
func (fi *File) WriteAt(b []byte, at int64) (int, error) {
|
||||||
|
fi.lock.Lock()
|
||||||
|
defer fi.lock.Unlock()
|
||||||
fi.hasChanges = true
|
fi.hasChanges = true
|
||||||
return fi.mod.WriteAt(b, at)
|
return fi.mod.WriteAt(b, at)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fi *file) Size() (int64, error) {
|
func (fi *File) Size() (int64, error) {
|
||||||
|
fi.lock.Lock()
|
||||||
|
defer fi.lock.Unlock()
|
||||||
return fi.mod.Size()
|
return fi.mod.Size()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fi *file) GetNode() (*dag.Node, error) {
|
func (fi *File) GetNode() (*dag.Node, error) {
|
||||||
|
fi.lock.Lock()
|
||||||
|
defer fi.lock.Unlock()
|
||||||
return fi.mod.GetNode()
|
return fi.mod.GetNode()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fi *file) Truncate(size int64) error {
|
func (fi *File) Truncate(size int64) error {
|
||||||
|
fi.lock.Lock()
|
||||||
|
defer fi.lock.Unlock()
|
||||||
fi.hasChanges = true
|
fi.hasChanges = true
|
||||||
return fi.mod.Truncate(size)
|
return fi.mod.Truncate(size)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fi *file) Type() NodeType {
|
func (fi *File) Type() NodeType {
|
||||||
return TFile
|
return TFile
|
||||||
}
|
}
|
||||||
|
|
||||||
type readOnlyFile struct {
|
func (fi *File) Lock() {
|
||||||
*file
|
fi.lock.Lock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ro *readOnlyFile) Write([]byte) (int, error) {
|
func (fi *File) Unlock() {
|
||||||
return 0, errors.New("permission denied: file readonly")
|
fi.lock.Unlock()
|
||||||
}
|
}
|
||||||
|
@ -58,7 +58,7 @@ func NewFilesystem(ctx context.Context, ds dag.DAGService, nsys namesys.NameSyst
|
|||||||
return fs, nil
|
return fs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *Filesystem) Open(tpath string, mode int) (File, error) {
|
func (fs *Filesystem) Open(tpath string, mode int) (*File, error) {
|
||||||
pathelem := strings.Split(tpath, "/")
|
pathelem := strings.Split(tpath, "/")
|
||||||
r, ok := fs.roots[pathelem[0]]
|
r, ok := fs.roots[pathelem[0]]
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -96,6 +96,8 @@ const (
|
|||||||
type FSNode interface {
|
type FSNode interface {
|
||||||
GetNode() (*dag.Node, error)
|
GetNode() (*dag.Node, error)
|
||||||
Type() NodeType
|
Type() NodeType
|
||||||
|
Lock()
|
||||||
|
Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// KeyRoot represents the root of a filesystem tree pointed to by a given keypair
|
// KeyRoot represents the root of a filesystem tree pointed to by a given keypair
|
||||||
@ -177,7 +179,7 @@ func (kr *KeyRoot) GetValue() FSNode {
|
|||||||
return kr.val
|
return kr.val
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kr *KeyRoot) Open(tpath []string, mode int) (File, error) {
|
func (kr *KeyRoot) Open(tpath []string, mode int) (*File, error) {
|
||||||
if kr.val == nil {
|
if kr.val == nil {
|
||||||
// No entry here... what should we do?
|
// No entry here... what should we do?
|
||||||
panic("nyi")
|
panic("nyi")
|
||||||
@ -195,7 +197,7 @@ func (kr *KeyRoot) Open(tpath []string, mode int) (File, error) {
|
|||||||
switch t := kr.val.(type) {
|
switch t := kr.val.(type) {
|
||||||
case *Directory:
|
case *Directory:
|
||||||
return nil, ErrIsDirectory
|
return nil, ErrIsDirectory
|
||||||
case File:
|
case *File:
|
||||||
return t, nil
|
return t, nil
|
||||||
default:
|
default:
|
||||||
panic("unrecognized type, should not happen")
|
panic("unrecognized type, should not happen")
|
||||||
@ -221,10 +223,14 @@ func (kr *KeyRoot) Publish(ctx context.Context) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
child.Lock()
|
||||||
k, err := kr.fs.dserv.Add(nd)
|
k, err := kr.fs.dserv.Add(nd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
child.Unlock()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
child.Unlock()
|
||||||
|
// Dont want to hold the lock while we publish
|
||||||
|
|
||||||
fmt.Println("Publishing!")
|
fmt.Println("Publishing!")
|
||||||
return kr.fs.nsys.Publish(ctx, kr.key, k)
|
return kr.fs.nsys.Publish(ctx, kr.key, k)
|
||||||
|
Reference in New Issue
Block a user