mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-01 19:24:14 +08:00
do not hold locks for multiple filesystem nodes at the same time
License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
39
mfs/dir.go
39
mfs/dir.go
@ -50,19 +50,34 @@ func NewDirectory(ctx context.Context, name string, node *dag.Node, parent child
|
|||||||
// closeChild updates the child by the given name to the dag node 'nd'
|
// closeChild updates the child by the given name to the dag node 'nd'
|
||||||
// and changes its own dag node, then propogates the changes upward
|
// and changes its own dag node, then propogates the changes upward
|
||||||
func (d *Directory) closeChild(name string, nd *dag.Node) error {
|
func (d *Directory) closeChild(name string, nd *dag.Node) error {
|
||||||
_, err := d.dserv.Add(nd)
|
mynd, err := d.closeChildUpdate(name, nd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return d.parent.closeChild(d.name, mynd)
|
||||||
|
}
|
||||||
|
|
||||||
|
// closeChildUpdate is the portion of closeChild that needs to be locked around
|
||||||
|
func (d *Directory) closeChildUpdate(name string, nd *dag.Node) (*dag.Node, error) {
|
||||||
d.lock.Lock()
|
d.lock.Lock()
|
||||||
defer d.lock.Unlock()
|
defer d.lock.Unlock()
|
||||||
err = d.updateChild(name, nd)
|
|
||||||
|
err := d.updateChild(name, nd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return d.parent.closeChild(d.name, d.node)
|
return d.flushCurrentNode()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Directory) flushCurrentNode() (*dag.Node, error) {
|
||||||
|
_, err := d.dserv.Add(d.node)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return d.node.Copy(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Directory) updateChild(name string, nd *dag.Node) error {
|
func (d *Directory) updateChild(name string, nd *dag.Node) error {
|
||||||
@ -263,7 +278,7 @@ func (d *Directory) Mkdir(name string) (*Directory, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = d.parent.closeChild(d.name, d.node)
|
err = d.flushUp()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -285,13 +300,18 @@ func (d *Directory) Unlink(name string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return d.flushUp()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Directory) flushUp() error {
|
||||||
|
|
||||||
return d.parent.closeChild(d.name, d.node)
|
return d.parent.closeChild(d.name, d.node)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddChild adds the node 'nd' under this directory giving it the name 'name'
|
// AddChild adds the node 'nd' under this directory giving it the name 'name'
|
||||||
func (d *Directory) AddChild(name string, nd *dag.Node) error {
|
func (d *Directory) AddChild(name string, nd *dag.Node) error {
|
||||||
d.Lock()
|
d.lock.Lock()
|
||||||
defer d.Unlock()
|
defer d.lock.Unlock()
|
||||||
|
|
||||||
_, err := d.childUnsync(name)
|
_, err := d.childUnsync(name)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
@ -310,7 +330,6 @@ func (d *Directory) AddChild(name string, nd *dag.Node) error {
|
|||||||
|
|
||||||
d.modTime = time.Now()
|
d.modTime = time.Now()
|
||||||
|
|
||||||
//return d.parent.closeChild(d.name, d.node)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -353,8 +372,8 @@ func (d *Directory) sync() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *Directory) GetNode() (*dag.Node, error) {
|
func (d *Directory) GetNode() (*dag.Node, error) {
|
||||||
d.Lock()
|
d.lock.Lock()
|
||||||
defer d.Unlock()
|
defer d.lock.Unlock()
|
||||||
|
|
||||||
err := d.sync()
|
err := d.sync()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
46
mfs/file.go
46
mfs/file.go
@ -16,8 +16,9 @@ type File struct {
|
|||||||
name string
|
name string
|
||||||
hasChanges bool
|
hasChanges bool
|
||||||
|
|
||||||
mod *mod.DagModifier
|
dserv dag.DAGService
|
||||||
lock sync.Mutex
|
mod *mod.DagModifier
|
||||||
|
lock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFile returns a NewFile object with the given parameters
|
// NewFile returns a NewFile object with the given parameters
|
||||||
@ -28,6 +29,7 @@ func NewFile(name string, node *dag.Node, parent childCloser, dserv dag.DAGServi
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &File{
|
return &File{
|
||||||
|
dserv: dserv,
|
||||||
parent: parent,
|
parent: parent,
|
||||||
name: name,
|
name: name,
|
||||||
mod: dmod,
|
mod: dmod,
|
||||||
@ -60,31 +62,45 @@ func (fi *File) CtxReadFull(ctx context.Context, b []byte) (int, error) {
|
|||||||
// and signals a republish to occur
|
// and signals a republish to occur
|
||||||
func (fi *File) Close() error {
|
func (fi *File) Close() error {
|
||||||
fi.Lock()
|
fi.Lock()
|
||||||
defer fi.Unlock()
|
|
||||||
if fi.hasChanges {
|
if fi.hasChanges {
|
||||||
err := fi.mod.Sync()
|
err := fi.mod.Sync()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
nd, err := fi.mod.GetNode()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
fi.Unlock()
|
|
||||||
err = fi.parent.closeChild(fi.name, nd)
|
|
||||||
fi.Lock()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
fi.hasChanges = false
|
fi.hasChanges = false
|
||||||
|
|
||||||
|
// explicitly stay locked for flushUp call,
|
||||||
|
// it will manage the lock for us
|
||||||
|
return fi.flushUp()
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// flushUp syncs the file and adds it to the dagservice
|
||||||
|
// it *must* be called with the File's lock taken
|
||||||
|
func (fi *File) flushUp() error {
|
||||||
|
nd, err := fi.mod.GetNode()
|
||||||
|
if err != nil {
|
||||||
|
fi.Unlock()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = fi.dserv.Add(nd)
|
||||||
|
if err != nil {
|
||||||
|
fi.Unlock()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
name := fi.name
|
||||||
|
parent := fi.parent
|
||||||
|
|
||||||
|
// explicit unlock *only* before closeChild call
|
||||||
|
fi.Unlock()
|
||||||
|
return parent.closeChild(name, nd)
|
||||||
|
}
|
||||||
|
|
||||||
// Sync flushes the changes in the file to disk
|
// Sync flushes the changes in the file to disk
|
||||||
func (fi *File) Sync() error {
|
func (fi *File) Sync() error {
|
||||||
fi.Lock()
|
fi.Lock()
|
||||||
|
@ -109,6 +109,23 @@ func (kr *Root) GetValue() FSNode {
|
|||||||
return kr.val
|
return kr.val
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (kr *Root) Flush() error {
|
||||||
|
nd, err := kr.GetValue().GetNode()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
k, err := kr.dserv.Add(nd)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if kr.repub != nil {
|
||||||
|
kr.repub.Update(k)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// closeChild implements the childCloser interface, and signals to the publisher that
|
// closeChild implements the childCloser interface, and signals to the publisher that
|
||||||
// there are changes ready to be published
|
// there are changes ready to be published
|
||||||
func (kr *Root) closeChild(name string, nd *dag.Node) error {
|
func (kr *Root) closeChild(name string, nd *dag.Node) error {
|
||||||
|
Reference in New Issue
Block a user