1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-09-10 14:34:24 +08:00

use mfs for adds

License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
Jeromy
2015-12-04 14:25:13 -08:00
parent 4fdfbc7d32
commit 1efbc79223
5 changed files with 135 additions and 81 deletions

View File

@ -18,6 +18,7 @@ var ErrDepthLimitExceeded = fmt.Errorf("depth limit exceeded")
const ( const (
quietOptionName = "quiet" quietOptionName = "quiet"
silentOptionName = "silent"
progressOptionName = "progress" progressOptionName = "progress"
trickleOptionName = "trickle" trickleOptionName = "trickle"
wrapOptionName = "wrap-with-directory" wrapOptionName = "wrap-with-directory"
@ -44,6 +45,7 @@ remains to be implemented.
Options: []cmds.Option{ Options: []cmds.Option{
cmds.OptionRecursivePath, // a builtin option that allows recursive paths (-r, --recursive) cmds.OptionRecursivePath, // a builtin option that allows recursive paths (-r, --recursive)
cmds.BoolOption(quietOptionName, "q", "Write minimal output"), cmds.BoolOption(quietOptionName, "q", "Write minimal output"),
cmds.BoolOption(silentOptionName, "x", "Write no output"),
cmds.BoolOption(progressOptionName, "p", "Stream progress data"), cmds.BoolOption(progressOptionName, "p", "Stream progress data"),
cmds.BoolOption(trickleOptionName, "t", "Use trickle-dag format for dag generation"), cmds.BoolOption(trickleOptionName, "t", "Use trickle-dag format for dag generation"),
cmds.BoolOption(onlyHashOptionName, "n", "Only chunk and hash - do not write to disk"), cmds.BoolOption(onlyHashOptionName, "n", "Only chunk and hash - do not write to disk"),
@ -59,6 +61,9 @@ remains to be implemented.
req.SetOption(progressOptionName, true) req.SetOption(progressOptionName, true)
log.Error("SKIPPING SIZE")
return nil
sizeFile, ok := req.Files().(files.SizeFile) sizeFile, ok := req.Files().(files.SizeFile)
if !ok { if !ok {
// we don't need to error, the progress bar just won't know how big the files are // we don't need to error, the progress bar just won't know how big the files are
@ -100,6 +105,7 @@ remains to be implemented.
wrap, _, _ := req.Option(wrapOptionName).Bool() wrap, _, _ := req.Option(wrapOptionName).Bool()
hash, _, _ := req.Option(onlyHashOptionName).Bool() hash, _, _ := req.Option(onlyHashOptionName).Bool()
hidden, _, _ := req.Option(hiddenOptionName).Bool() hidden, _, _ := req.Option(hiddenOptionName).Bool()
silent, _, _ := req.Option(silentOptionName).Bool()
chunker, _, _ := req.Option(chunkerOptionName).String() chunker, _, _ := req.Option(chunkerOptionName).String()
dopin, pin_found, _ := req.Option(pinOptionName).Bool() dopin, pin_found, _ := req.Option(pinOptionName).Bool()
@ -123,13 +129,18 @@ remains to be implemented.
outChan := make(chan interface{}, 8) outChan := make(chan interface{}, 8)
res.SetOutput((<-chan interface{})(outChan)) res.SetOutput((<-chan interface{})(outChan))
fileAdder := coreunix.NewAdder(req.Context(), n, outChan) fileAdder, err := coreunix.NewAdder(req.Context(), n, outChan)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
fileAdder.Chunker = chunker fileAdder.Chunker = chunker
fileAdder.Progress = progress fileAdder.Progress = progress
fileAdder.Hidden = hidden fileAdder.Hidden = hidden
fileAdder.Trickle = trickle fileAdder.Trickle = trickle
fileAdder.Wrap = wrap fileAdder.Wrap = wrap
fileAdder.Pin = dopin fileAdder.Pin = dopin
fileAdder.Silent = silent
// addAllFiles loops over a convenience slice file to // addAllFiles loops over a convenience slice file to
// add each file individually. e.g. 'ipfs add a b c' // add each file individually. e.g. 'ipfs add a b c'
@ -143,7 +154,7 @@ remains to be implemented.
return nil // done return nil // done
} }
if _, err := fileAdder.AddFile(file); err != nil { if err := fileAdder.AddFile(file); err != nil {
return err return err
} }
} }
@ -159,9 +170,8 @@ remains to be implemented.
} }
// copy intermediary nodes from editor to our actual dagservice // copy intermediary nodes from editor to our actual dagservice
_, err := fileAdder.Finalize(n.DAG) _, err := fileAdder.Finalize()
if err != nil { if err != nil {
log.Error("WRITE OUT: ", err)
return err return err
} }
@ -194,7 +204,13 @@ remains to be implemented.
return return
} }
showProgressBar := !quiet progress, _, err := req.Option(progressOptionName).Bool()
if err != nil {
res.SetError(u.ErrCast(), cmds.ErrNormal)
return
}
showProgressBar := !quiet || progress
var bar *pb.ProgressBar var bar *pb.ProgressBar
var terminalWidth int var terminalWidth int

View File

@ -15,7 +15,7 @@ import (
"github.com/ipfs/go-ipfs/exchange/offline" "github.com/ipfs/go-ipfs/exchange/offline"
importer "github.com/ipfs/go-ipfs/importer" importer "github.com/ipfs/go-ipfs/importer"
"github.com/ipfs/go-ipfs/importer/chunk" "github.com/ipfs/go-ipfs/importer/chunk"
dagutils "github.com/ipfs/go-ipfs/merkledag/utils" mfs "github.com/ipfs/go-ipfs/mfs"
"github.com/ipfs/go-ipfs/pin" "github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/commands/files" "github.com/ipfs/go-ipfs/commands/files"
@ -62,12 +62,16 @@ type AddedObject struct {
Bytes int64 `json:",omitempty"` Bytes int64 `json:",omitempty"`
} }
func NewAdder(ctx context.Context, n *core.IpfsNode, out chan interface{}) *Adder { func NewAdder(ctx context.Context, n *core.IpfsNode, out chan interface{}) (*Adder, error) {
e := dagutils.NewDagEditor(newDirNode(), nil) mr, err := mfs.NewRoot(ctx, n.DAG, newDirNode(), nil)
if err != nil {
return nil, err
}
return &Adder{ return &Adder{
mr: mr,
ctx: ctx, ctx: ctx,
node: n, node: n,
editor: e,
out: out, out: out,
Progress: false, Progress: false,
Hidden: true, Hidden: true,
@ -75,22 +79,23 @@ func NewAdder(ctx context.Context, n *core.IpfsNode, out chan interface{}) *Adde
Trickle: false, Trickle: false,
Wrap: false, Wrap: false,
Chunker: "", Chunker: "",
} }, nil
} }
// Internal structure for holding the switches passed to the `add` call // Internal structure for holding the switches passed to the `add` call
type Adder struct { type Adder struct {
ctx context.Context ctx context.Context
node *core.IpfsNode node *core.IpfsNode
editor *dagutils.Editor
out chan interface{} out chan interface{}
Progress bool Progress bool
Hidden bool Hidden bool
Pin bool Pin bool
Trickle bool Trickle bool
Silent bool
Wrap bool Wrap bool
Chunker string Chunker string
root *dag.Node root *dag.Node
mr *mfs.Root
} }
// Perform the actual add & pin locally, outputting results to reader // Perform the actual add & pin locally, outputting results to reader
@ -113,26 +118,29 @@ func (params Adder) add(reader io.Reader) (*dag.Node, error) {
} }
func (params *Adder) RootNode() (*dag.Node, error) { func (params *Adder) RootNode() (*dag.Node, error) {
// for memoizing return params.mr.GetValue().GetNode()
if params.root != nil { /*
return params.root, nil // for memoizing
} if params.root != nil {
return params.root, nil
}
root := params.editor.GetNode() root := params.editor.GetNode()
// if not wrapping, AND one root file, use that hash as root. // if not wrapping, AND one root file, use that hash as root.
if !params.Wrap && len(root.Links) == 1 { if !params.Wrap && len(root.Links) == 1 {
var err error var err error
root, err = root.Links[0].GetNode(params.ctx, params.editor.GetDagService()) root, err = root.Links[0].GetNode(params.ctx, params.editor.GetDagService())
params.root = root
// no need to output, as we've already done so.
return root, err
}
// otherwise need to output, as we have not.
err := outputDagnode(params.out, "", root)
params.root = root params.root = root
// no need to output, as we've already done so.
return root, err return root, err
} */
// otherwise need to output, as we have not.
err := outputDagnode(params.out, "", root)
params.root = root
return root, err
} }
func (params *Adder) PinRoot() error { func (params *Adder) PinRoot() error {
@ -153,8 +161,8 @@ func (params *Adder) PinRoot() error {
return params.node.Pinning.Flush() return params.node.Pinning.Flush()
} }
func (params *Adder) Finalize(DAG dag.DAGService) (*dag.Node, error) { func (params *Adder) Finalize() (*dag.Node, error) {
return params.editor.Finalize(DAG) return params.mr.GetValue().GetNode()
} }
// Add builds a merkledag from the a reader, pinning all objects to the local // Add builds a merkledag from the a reader, pinning all objects to the local
@ -163,7 +171,10 @@ func Add(n *core.IpfsNode, r io.Reader) (string, error) {
unlock := n.Blockstore.PinLock() unlock := n.Blockstore.PinLock()
defer unlock() defer unlock()
fileAdder := NewAdder(n.Context(), n, nil) fileAdder, err := NewAdder(n.Context(), n, nil)
if err != nil {
return "", err
}
node, err := fileAdder.add(r) node, err := fileAdder.add(r)
if err != nil { if err != nil {
@ -193,14 +204,22 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) {
} }
defer f.Close() defer f.Close()
fileAdder := NewAdder(n.Context(), n, nil) fileAdder, err := NewAdder(n.Context(), n, nil)
dagnode, err := fileAdder.AddFile(f)
if err != nil { if err != nil {
return "", err return "", err
} }
k, err := dagnode.Key() err = fileAdder.AddFile(f)
if err != nil {
return "", err
}
nd, err := fileAdder.Finalize()
if err != nil {
return "", err
}
k, err := nd.Key()
if err != nil { if err != nil {
return "", err return "", err
} }
@ -215,18 +234,29 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) {
func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *dag.Node, error) { func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *dag.Node, error) {
file := files.NewReaderFile(filename, filename, ioutil.NopCloser(r), nil) file := files.NewReaderFile(filename, filename, ioutil.NopCloser(r), nil)
dir := files.NewSliceFile("", "", []files.File{file}) dir := files.NewSliceFile("", "", []files.File{file})
fileAdder := NewAdder(n.Context(), n, nil) fileAdder, err := NewAdder(n.Context(), n, nil)
unlock := n.Blockstore.PinLock()
defer unlock()
dagnode, err := fileAdder.addDir(dir)
if err != nil { if err != nil {
return "", nil, err return "", nil, err
} }
unlock := n.Blockstore.PinLock()
defer unlock()
err = fileAdder.addDir(dir)
if err != nil {
return "", nil, err
}
dagnode, err := fileAdder.Finalize()
if err != nil {
return "", nil, err
}
k, err := dagnode.Key() k, err := dagnode.Key()
if err != nil { if err != nil {
return "", nil, err return "", nil, err
} }
return gopath.Join(k.String(), filename), dagnode, nil return gopath.Join(k.String(), filename), dagnode, nil
} }
@ -241,19 +271,22 @@ func (params *Adder) addNode(node *dag.Node, path string) error {
path = key.Pretty() path = key.Pretty()
} }
if err := params.editor.InsertNodeAtPath(params.ctx, path, node, newDirNode); err != nil { if err := mfs.PutNode(params.mr, path, node); err != nil {
return err return err
} }
return outputDagnode(params.out, path, node) if !params.Silent {
return outputDagnode(params.out, path, node)
}
return nil
} }
// Add the given file while respecting the params. // Add the given file while respecting the params.
func (params *Adder) AddFile(file files.File) (*dag.Node, error) { func (params *Adder) AddFile(file files.File) error {
switch { switch {
case files.IsHidden(file) && !params.Hidden: case files.IsHidden(file) && !params.Hidden:
log.Debugf("%s is hidden, skipping", file.FileName()) log.Debugf("%s is hidden, skipping", file.FileName())
return nil, &hiddenFileError{file.FileName()} return &hiddenFileError{file.FileName()}
case file.IsDirectory(): case file.IsDirectory():
return params.addDir(file) return params.addDir(file)
} }
@ -262,17 +295,16 @@ func (params *Adder) AddFile(file files.File) (*dag.Node, error) {
if s, ok := file.(*files.Symlink); ok { if s, ok := file.(*files.Symlink); ok {
sdata, err := unixfs.SymlinkData(s.Target) sdata, err := unixfs.SymlinkData(s.Target)
if err != nil { if err != nil {
return nil, err return err
} }
dagnode := &dag.Node{Data: sdata} dagnode := &dag.Node{Data: sdata}
_, err = params.node.DAG.Add(dagnode) _, err = params.node.DAG.Add(dagnode)
if err != nil { if err != nil {
return nil, err return err
} }
err = params.addNode(dagnode, s.FileName()) return params.addNode(dagnode, s.FileName())
return dagnode, err
} }
// case for regular file // case for regular file
@ -285,52 +317,40 @@ func (params *Adder) AddFile(file files.File) (*dag.Node, error) {
dagnode, err := params.add(reader) dagnode, err := params.add(reader)
if err != nil { if err != nil {
return nil, err return err
} }
// patch it into the root // patch it into the root
log.Infof("adding file: %s", file.FileName()) return params.addNode(dagnode, file.FileName())
err = params.addNode(dagnode, file.FileName())
return dagnode, err
} }
func (params *Adder) addDir(dir files.File) (*dag.Node, error) { func (params *Adder) addDir(dir files.File) error {
tree := newDirNode()
log.Infof("adding directory: %s", dir.FileName()) log.Infof("adding directory: %s", dir.FileName())
err := mfs.Mkdir(params.mr, dir.FileName(), true)
if err != nil {
return err
}
for { for {
file, err := dir.NextFile() file, err := dir.NextFile()
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
return nil, err return err
} }
if file == nil { if file == nil {
break break
} }
node, err := params.AddFile(file) err = params.AddFile(file)
if _, ok := err.(*hiddenFileError); ok { if _, ok := err.(*hiddenFileError); ok {
// hidden file error, skip file // hidden file error, skip file
continue continue
} else if err != nil { } else if err != nil {
return nil, err return err
}
name := gopath.Base(file.FileName())
if err := tree.AddNodeLinkClean(name, node); err != nil {
return nil, err
} }
} }
if err := params.addNode(tree, dir.FileName()); err != nil { return nil
return nil, err
}
if _, err := params.node.DAG.Add(tree); err != nil {
return nil, err
}
return tree, nil
} }
// outputDagnode sends dagnode info over the output channel // outputDagnode sends dagnode info over the output channel
@ -379,7 +399,7 @@ func getOutput(dagnode *dag.Node) (*Object, error) {
for i, link := range dagnode.Links { for i, link := range dagnode.Links {
output.Links[i] = Link{ output.Links[i] = Link{
Name: link.Name, Name: link.Name,
Hash: link.Hash.B58String(), //Hash: link.Hash.B58String(),
Size: link.Size, Size: link.Size,
} }
} }

View File

@ -89,7 +89,7 @@ func (bs *Bitswap) provideWorker(px process.Process) {
defer cancel() defer cancel()
if err := bs.network.Provide(ctx, k); err != nil { if err := bs.network.Provide(ctx, k); err != nil {
log.Error(err) //log.Error(err)
} }
} }

View File

@ -3,6 +3,7 @@ package merkledag
import ( import (
"fmt" "fmt"
"time"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
blocks "github.com/ipfs/go-ipfs/blocks" blocks "github.com/ipfs/go-ipfs/blocks"
@ -48,6 +49,14 @@ func (n *dagService) Add(nd *Node) (key.Key, error) {
if n == nil { // FIXME remove this assertion. protect with constructor invariant if n == nil { // FIXME remove this assertion. protect with constructor invariant
return "", fmt.Errorf("dagService is nil") return "", fmt.Errorf("dagService is nil")
} }
/*
start := time.Now()
defer func() {
took := time.Now().Sub(start)
log.Error("add took: %s", took)
}()
*/
_ = time.Saturday
d, err := nd.Encoded(false) d, err := nd.Encoded(false)
if err != nil { if err != nil {

View File

@ -71,14 +71,18 @@ func NewRoot(parent context.Context, ds dag.DAGService, node *dag.Node, pf PubFu
return nil, err return nil, err
} }
root := &Root{ var repub *Republisher
node: node, if pf != nil {
repub: NewRepublisher(parent, pf, time.Millisecond*300, time.Second*3), repub = NewRepublisher(parent, pf, time.Millisecond*300, time.Second*3)
dserv: ds, repub.setVal(ndk)
go repub.Run()
} }
root.repub.setVal(ndk) root := &Root{
go root.repub.Run() node: node,
repub: repub,
dserv: ds,
}
pbn, err := ft.FromBytes(node.Data) pbn, err := ft.FromBytes(node.Data)
if err != nil { if err != nil {
@ -113,12 +117,17 @@ func (kr *Root) closeChild(name string, nd *dag.Node) error {
return err return err
} }
kr.repub.Update(k) if kr.repub != nil {
kr.repub.Update(k)
}
return nil return nil
} }
func (kr *Root) Close() error { func (kr *Root) Close() error {
return kr.repub.Close() if kr.repub != nil {
return kr.repub.Close()
}
return nil
} }
// Republisher manages when to publish a given entry // Republisher manages when to publish a given entry