mirror of
https://github.com/ipfs/kubo.git
synced 2025-08-06 19:44:01 +08:00
fixing mutability issues in ipns
This commit is contained in:
@ -1,6 +1,7 @@
|
||||
package ipns
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@ -72,7 +73,7 @@ func CreateRoot(n *core.IpfsNode, keys []ci.PrivKey, ipfsroot string) (*Root, er
|
||||
nd := new(Node)
|
||||
nd.Ipfs = n
|
||||
nd.key = k
|
||||
nd.repub = NewRepublisher(nd, time.Millisecond*10)
|
||||
nd.repub = NewRepublisher(nd, time.Millisecond*10, time.Second)
|
||||
|
||||
go nd.repub.Run()
|
||||
|
||||
@ -182,11 +183,15 @@ func (r *Root) ReadDir(intr fs.Intr) ([]fuse.Dirent, fuse.Error) {
|
||||
|
||||
// Node is the core object representing a filesystem tree node.
|
||||
type Node struct {
|
||||
root *Root
|
||||
nsRoot *Node
|
||||
parent *Node
|
||||
|
||||
repub *Republisher
|
||||
|
||||
// Name really only for logging purposes
|
||||
// This nodes name in its parent dir.
|
||||
// NOTE: this strategy wont work well if we allow hard links
|
||||
// (im all for murdering the thought of hard links)
|
||||
name string
|
||||
|
||||
// Private keys held by nodes at the root of a keyspace
|
||||
@ -244,9 +249,10 @@ func (s *Node) Lookup(name string, intr fs.Intr) (fs.Node, fuse.Error) {
|
||||
|
||||
func (n *Node) makeChild(name string, node *mdag.Node) *Node {
|
||||
child := &Node{
|
||||
Ipfs: n.Ipfs,
|
||||
Nd: node,
|
||||
name: n.name + "/" + name,
|
||||
Ipfs: n.Ipfs,
|
||||
Nd: node,
|
||||
name: name,
|
||||
nsRoot: n.nsRoot,
|
||||
}
|
||||
|
||||
if n.nsRoot == nil {
|
||||
@ -289,7 +295,7 @@ func (s *Node) ReadAll(intr fs.Intr) ([]byte, fuse.Error) {
|
||||
}
|
||||
|
||||
func (n *Node) Write(req *fuse.WriteRequest, resp *fuse.WriteResponse, intr fs.Intr) fuse.Error {
|
||||
log.Debug("ipns: Node Write: flags = %s, offset = %d, size = %d", req.Flags.String(), req.Offset, len(req.Data))
|
||||
log.Debug("ipns: Node Write [%s]: flags = %s, offset = %d, size = %d", n.name, req.Flags.String(), req.Offset, len(req.Data))
|
||||
if n.dataBuf == nil {
|
||||
n.dataBuf = new(bytes.Buffer)
|
||||
}
|
||||
@ -312,19 +318,45 @@ func (n *Node) Flush(req *fuse.FlushRequest, intr fs.Intr) fuse.Error {
|
||||
// This operation holds everything in memory,
|
||||
// should be changed to stream the block creation/storage
|
||||
// but for now, since the buf is all in memory anyways...
|
||||
err := imp.NewDagInNode(n.dataBuf, n.Nd)
|
||||
|
||||
//NOTE:
|
||||
// This should only occur on a file object, if this were to be a
|
||||
// folder, bad things would happen.
|
||||
newNode, err := imp.NewDagFromReader(n.dataBuf)
|
||||
if err != nil {
|
||||
log.Error("ipns: Flush error: %s", err)
|
||||
// return fuse.EVERYBAD
|
||||
log.Critical("error creating dag from dataBuf: %s", err)
|
||||
return fuse.ENODATA
|
||||
}
|
||||
if n.parent != nil {
|
||||
err := n.parent.update(n.name, newNode)
|
||||
if err != nil {
|
||||
log.Critical("error in updating ipns dag tree: %s", err)
|
||||
// return fuse.ETHISISPRETTYBAD
|
||||
return fuse.ENOSYS
|
||||
}
|
||||
}
|
||||
n.Nd = newNode
|
||||
|
||||
n.sendPublishSignal()
|
||||
//TEMP
|
||||
dr, err := mdag.NewDagReader(n.Nd, n.Ipfs.DAG)
|
||||
if err != nil {
|
||||
log.Critical("Verification read failed.")
|
||||
}
|
||||
b, err := ioutil.ReadAll(dr)
|
||||
if err != nil {
|
||||
log.Critical("Verification read failed.")
|
||||
}
|
||||
fmt.Println("VERIFICATION READ")
|
||||
fmt.Printf("READ %d BYTES\n", len(b))
|
||||
fmt.Println(string(b))
|
||||
//
|
||||
|
||||
n.wasChanged()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Node) sendPublishSignal() {
|
||||
func (n *Node) wasChanged() {
|
||||
root := n.nsRoot
|
||||
if root == nil {
|
||||
root = n
|
||||
@ -333,7 +365,8 @@ func (n *Node) sendPublishSignal() {
|
||||
root.repub.Publish <- struct{}{}
|
||||
}
|
||||
|
||||
func (n *Node) updateTree() error {
|
||||
func (n *Node) republishRoot() error {
|
||||
log.Debug("Republish root")
|
||||
var root *Node
|
||||
if n.nsRoot != nil {
|
||||
root = n.nsRoot
|
||||
@ -341,19 +374,13 @@ func (n *Node) updateTree() error {
|
||||
root = n
|
||||
}
|
||||
|
||||
err := root.Nd.Update()
|
||||
if err != nil {
|
||||
log.Error("ipns: dag tree update failed: %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
err = n.Ipfs.DAG.AddRecursive(root.Nd)
|
||||
// Add any nodes that may be new to the DAG service
|
||||
err := n.Ipfs.DAG.AddRecursive(root.Nd)
|
||||
if err != nil {
|
||||
log.Critical("ipns: Dag Add Error: %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
n.changed = false
|
||||
n.dataBuf = nil
|
||||
|
||||
ndkey, err := root.Nd.Key()
|
||||
@ -380,11 +407,13 @@ func (n *Node) Fsync(req *fuse.FsyncRequest, intr fs.Intr) fuse.Error {
|
||||
func (n *Node) Mkdir(req *fuse.MkdirRequest, intr fs.Intr) (fs.Node, fuse.Error) {
|
||||
log.Debug("Got mkdir request!")
|
||||
dagnd := &mdag.Node{Data: mdag.FolderPBData()}
|
||||
n.Nd.AddNodeLink(req.Name, dagnd)
|
||||
nnode := n.Nd.Copy()
|
||||
nnode.AddNodeLink(req.Name, dagnd)
|
||||
|
||||
child := &Node{
|
||||
Ipfs: n.Ipfs,
|
||||
Nd: dagnd,
|
||||
name: req.Name,
|
||||
}
|
||||
|
||||
if n.nsRoot == nil {
|
||||
@ -393,8 +422,17 @@ func (n *Node) Mkdir(req *fuse.MkdirRequest, intr fs.Intr) (fs.Node, fuse.Error)
|
||||
child.nsRoot = n.nsRoot
|
||||
}
|
||||
|
||||
n.changed = true
|
||||
n.sendPublishSignal()
|
||||
if n.parent != nil {
|
||||
err := n.parent.update(n.name, nnode)
|
||||
if err != nil {
|
||||
log.Critical("Error updating node: %s", err)
|
||||
// Can we panic, please?
|
||||
return nil, fuse.ENODATA
|
||||
}
|
||||
}
|
||||
n.Nd = nnode
|
||||
|
||||
n.wasChanged()
|
||||
|
||||
return child, nil
|
||||
}
|
||||
@ -405,7 +443,7 @@ func (n *Node) Mknod(req *fuse.MknodRequest, intr fs.Intr) (fs.Node, fuse.Error)
|
||||
}
|
||||
|
||||
func (n *Node) Open(req *fuse.OpenRequest, resp *fuse.OpenResponse, intr fs.Intr) (fs.Handle, fuse.Error) {
|
||||
log.Debug("[%s] Received open request! flags = %s", n.name, req.Flags.String())
|
||||
//log.Debug("[%s] Received open request! flags = %s", n.name, req.Flags.String())
|
||||
//TODO: check open flags and truncate if necessary
|
||||
return n, nil
|
||||
}
|
||||
@ -417,23 +455,46 @@ func (n *Node) Create(req *fuse.CreateRequest, resp *fuse.CreateResponse, intr f
|
||||
nd := &mdag.Node{Data: mdag.FilePBData(nil)}
|
||||
child := n.makeChild(req.Name, nd)
|
||||
|
||||
err := n.Nd.AddNodeLink(req.Name, nd)
|
||||
nnode := n.Nd.Copy()
|
||||
|
||||
err := nnode.AddNodeLink(req.Name, nd)
|
||||
if err != nil {
|
||||
log.Error("Error adding child to node: %s", err)
|
||||
return nil, nil, fuse.ENOENT
|
||||
}
|
||||
if n.parent != nil {
|
||||
err := n.parent.update(n.name, nnode)
|
||||
if err != nil {
|
||||
log.Critical("Error updating node: %s", err)
|
||||
// Can we panic, please?
|
||||
return nil, nil, fuse.ENODATA
|
||||
}
|
||||
}
|
||||
n.Nd = nnode
|
||||
n.wasChanged()
|
||||
|
||||
return child, child, nil
|
||||
}
|
||||
|
||||
func (n *Node) Remove(req *fuse.RemoveRequest, intr fs.Intr) fuse.Error {
|
||||
log.Debug("[%s] Got Remove request: %s", n.name, req.Name)
|
||||
err := n.Nd.RemoveNodeLink(req.Name)
|
||||
nnode := n.Nd.Copy()
|
||||
err := nnode.RemoveNodeLink(req.Name)
|
||||
if err != nil {
|
||||
log.Error("Remove: No such file.")
|
||||
return fuse.ENOENT
|
||||
}
|
||||
n.changed = true
|
||||
n.sendPublishSignal()
|
||||
|
||||
if n.parent != nil {
|
||||
err := n.parent.update(n.name, nnode)
|
||||
if err != nil {
|
||||
log.Critical("Error updating node: %s", err)
|
||||
// Can we panic, please?
|
||||
return fuse.ENODATA
|
||||
}
|
||||
}
|
||||
n.Nd = nnode
|
||||
n.wasChanged()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -464,3 +525,21 @@ func (n *Node) Rename(req *fuse.RenameRequest, newDir fs.Node, intr fs.Intr) fus
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Node) update(name string, newnode *mdag.Node) error {
|
||||
nnode := n.Nd.Copy()
|
||||
err := nnode.RemoveNodeLink(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
nnode.AddNodeLink(name, newnode)
|
||||
|
||||
if n.parent != nil {
|
||||
err := n.parent.update(n.name, newnode)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
n.Nd = nnode
|
||||
return nil
|
||||
}
|
||||
|
@ -3,34 +3,45 @@ package ipns
|
||||
import "time"
|
||||
|
||||
type Republisher struct {
|
||||
Timeout time.Duration
|
||||
Publish chan struct{}
|
||||
node *Node
|
||||
TimeoutLong time.Duration
|
||||
TimeoutShort time.Duration
|
||||
Publish chan struct{}
|
||||
node *Node
|
||||
}
|
||||
|
||||
func NewRepublisher(n *Node, tout time.Duration) *Republisher {
|
||||
func NewRepublisher(n *Node, tshort, tlong time.Duration) *Republisher {
|
||||
return &Republisher{
|
||||
Timeout: tout,
|
||||
Publish: make(chan struct{}),
|
||||
node: n,
|
||||
TimeoutShort: tshort,
|
||||
TimeoutLong: tlong,
|
||||
Publish: make(chan struct{}),
|
||||
node: n,
|
||||
}
|
||||
}
|
||||
|
||||
func (np *Republisher) Run() {
|
||||
for _ = range np.Publish {
|
||||
timer := time.After(np.Timeout)
|
||||
quick := time.After(np.TimeoutShort)
|
||||
longer := time.After(np.TimeoutLong)
|
||||
for {
|
||||
select {
|
||||
case <-timer:
|
||||
case <-quick:
|
||||
//Do the publish!
|
||||
log.Info("Publishing Changes!")
|
||||
err := np.node.updateTree()
|
||||
err := np.node.republishRoot()
|
||||
if err != nil {
|
||||
log.Critical("updateTree error: %s", err)
|
||||
log.Critical("republishRoot error: %s", err)
|
||||
}
|
||||
goto done
|
||||
case <-longer:
|
||||
//Do the publish!
|
||||
log.Info("Publishing Changes!")
|
||||
err := np.node.republishRoot()
|
||||
if err != nil {
|
||||
log.Critical("republishRoot error: %s", err)
|
||||
}
|
||||
goto done
|
||||
case <-np.Publish:
|
||||
timer = time.After(np.Timeout)
|
||||
quick = time.After(np.TimeoutShort)
|
||||
}
|
||||
}
|
||||
done:
|
||||
|
@ -75,6 +75,16 @@ func (n *Node) RemoveNodeLink(name string) error {
|
||||
return u.ErrNotFound
|
||||
}
|
||||
|
||||
func (n *Node) Copy() *Node {
|
||||
nnode := new(Node)
|
||||
nnode.Data = make([]byte, len(n.Data))
|
||||
copy(nnode.Data, n.Data)
|
||||
|
||||
nnode.Links = make([]*Link, len(n.Links))
|
||||
copy(nnode.Links, n.Links)
|
||||
return nnode
|
||||
}
|
||||
|
||||
// Size returns the total size of the data addressed by node,
|
||||
// including the total sizes of references.
|
||||
func (n *Node) Size() (uint64, error) {
|
||||
|
14
util/util.go
14
util/util.go
@ -99,11 +99,15 @@ func DOut(format string, a ...interface{}) {
|
||||
func SetupLogging() {
|
||||
backend := logging.NewLogBackend(os.Stderr, "", 0)
|
||||
logging.SetBackend(backend)
|
||||
if Debug {
|
||||
logging.SetLevel(logging.DEBUG, "")
|
||||
} else {
|
||||
logging.SetLevel(logging.ERROR, "")
|
||||
}
|
||||
/*
|
||||
if Debug {
|
||||
logging.SetLevel(logging.DEBUG, "")
|
||||
} else {
|
||||
logging.SetLevel(logging.ERROR, "")
|
||||
}
|
||||
*/
|
||||
logging.SetLevel(logging.ERROR, "merkledag")
|
||||
logging.SetLevel(logging.ERROR, "blockservice")
|
||||
logging.SetFormatter(logging.MustStringFormatter(LogFormat))
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user