1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-28 00:39:31 +08:00

Merge pull request #1274 from ipfs/refactor/importer

change pinning to happen in a callback
This commit is contained in:
Juan Batiz-Benet
2015-05-29 17:06:11 -07:00
12 changed files with 105 additions and 48 deletions

View File

@ -7,7 +7,6 @@ import (
"strings" "strings"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
cmds "github.com/ipfs/go-ipfs/commands" cmds "github.com/ipfs/go-ipfs/commands"
files "github.com/ipfs/go-ipfs/commands/files" files "github.com/ipfs/go-ipfs/commands/files"
@ -16,6 +15,7 @@ import (
importer "github.com/ipfs/go-ipfs/importer" importer "github.com/ipfs/go-ipfs/importer"
"github.com/ipfs/go-ipfs/importer/chunk" "github.com/ipfs/go-ipfs/importer/chunk"
dag "github.com/ipfs/go-ipfs/merkledag" dag "github.com/ipfs/go-ipfs/merkledag"
pin "github.com/ipfs/go-ipfs/pin"
ft "github.com/ipfs/go-ipfs/unixfs" ft "github.com/ipfs/go-ipfs/unixfs"
u "github.com/ipfs/go-ipfs/util" u "github.com/ipfs/go-ipfs/util"
) )
@ -113,12 +113,16 @@ remains to be implemented.
return return
} }
err = n.Pinning.Pin(context.Background(), rootnd, true) rnk, err := rootnd.Key()
if err != nil { if err != nil {
res.SetError(err, cmds.ErrNormal) res.SetError(err, cmds.ErrNormal)
return return
} }
mp := n.Pinning.GetManual()
mp.RemovePinWithMode(rnk, pin.Indirect)
mp.PinWithMode(rnk, pin.Recursive)
err = n.Pinning.Flush() err = n.Pinning.Flush()
if err != nil { if err != nil {
res.SetError(err, cmds.ErrNormal) res.SetError(err, cmds.ErrNormal)
@ -214,7 +218,12 @@ remains to be implemented.
} }
func add(n *core.IpfsNode, reader io.Reader) (*dag.Node, error) { func add(n *core.IpfsNode, reader io.Reader) (*dag.Node, error) {
node, err := importer.BuildDagFromReader(reader, n.DAG, nil, chunk.DefaultSplitter) node, err := importer.BuildDagFromReader(
reader,
n.DAG,
chunk.DefaultSplitter,
importer.PinIndirectCB(n.Pinning.GetManual()),
)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -290,11 +299,13 @@ func addDir(n *core.IpfsNode, dir files.File, out chan interface{}, progress boo
return nil, err return nil, err
} }
_, err = n.DAG.Add(tree) k, err := n.DAG.Add(tree)
if err != nil { if err != nil {
return nil, err return nil, err
} }
n.Pinning.GetManual().PinWithMode(k, pin.Indirect)
return tree, nil return tree, nil
} }

View File

@ -72,7 +72,7 @@ func (i *gatewayHandler) newDagFromReader(r io.Reader) (*dag.Node, error) {
// TODO(cryptix): change and remove this helper once PR1136 is merged // TODO(cryptix): change and remove this helper once PR1136 is merged
// return ufs.AddFromReader(i.node, r.Body) // return ufs.AddFromReader(i.node, r.Body)
return importer.BuildDagFromReader( return importer.BuildDagFromReader(
r, i.node.DAG, i.node.Pinning.GetManual(), chunk.DefaultSplitter) r, i.node.DAG, chunk.DefaultSplitter, importer.BasicPinnerCB(i.node.Pinning.GetManual()))
} }
// TODO(btc): break this apart into separate handlers using a more expressive muxer // TODO(btc): break this apart into separate handlers using a more expressive muxer

View File

@ -1,7 +1,6 @@
package coreunix package coreunix
import ( import (
"errors"
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
@ -29,15 +28,12 @@ func Add(n *core.IpfsNode, r io.Reader) (string, error) {
dagNode, err := importer.BuildDagFromReader( dagNode, err := importer.BuildDagFromReader(
r, r,
n.DAG, n.DAG,
n.Pinning.GetManual(), // Fix this interface
chunk.DefaultSplitter, chunk.DefaultSplitter,
importer.BasicPinnerCB(n.Pinning.GetManual()),
) )
if err != nil { if err != nil {
return "", err return "", err
} }
if err := n.Pinning.Flush(); err != nil {
return "", err
}
k, err := dagNode.Key() k, err := dagNode.Key()
if err != nil { if err != nil {
return "", err return "", err
@ -53,18 +49,28 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) {
return "", err return "", err
} }
defer f.Close() defer f.Close()
ff, err := files.NewSerialFile(root, f) ff, err := files.NewSerialFile(root, f)
if err != nil { if err != nil {
return "", err return "", err
} }
dagnode, err := addFile(n, ff) dagnode, err := addFile(n, ff)
if err != nil { if err != nil {
return "", err return "", err
} }
k, err := dagnode.Key() k, err := dagnode.Key()
if err != nil { if err != nil {
return "", err return "", err
} }
n.Pinning.GetManual().RemovePinWithMode(k, pin.Indirect)
err = n.Pinning.Flush()
if err != nil {
return "", err
}
return k.String(), nil return k.String(), nil
} }
@ -87,17 +93,14 @@ func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *merkle
} }
func add(n *core.IpfsNode, reader io.Reader) (*merkledag.Node, error) { func add(n *core.IpfsNode, reader io.Reader) (*merkledag.Node, error) {
mp, ok := n.Pinning.(pin.ManualPinner) mp := n.Pinning.GetManual()
if !ok {
return nil, errors.New("invalid pinner type! expected manual pinner")
}
node, err := importer.BuildDagFromReader(reader, n.DAG, mp, chunk.DefaultSplitter) node, err := importer.BuildDagFromReader(
if err != nil { reader,
return nil, err n.DAG,
} chunk.DefaultSplitter,
importer.PinIndirectCB(mp),
err = n.Pinning.Flush() )
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -37,7 +37,7 @@ func TestMetadata(t *testing.T) {
data := make([]byte, 1000) data := make([]byte, 1000)
u.NewTimeSeededRand().Read(data) u.NewTimeSeededRand().Read(data)
r := bytes.NewReader(data) r := bytes.NewReader(data)
nd, err := importer.BuildDagFromReader(r, ds, nil, chunk.DefaultSplitter) nd, err := importer.BuildDagFromReader(r, ds, chunk.DefaultSplitter, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -34,7 +34,7 @@ func randObj(t *testing.T, nd *core.IpfsNode, size int64) (*dag.Node, []byte) {
buf := make([]byte, size) buf := make([]byte, size)
u.NewTimeSeededRand().Read(buf) u.NewTimeSeededRand().Read(buf)
read := bytes.NewReader(buf) read := bytes.NewReader(buf)
obj, err := importer.BuildTrickleDagFromReader(read, nd.DAG, nil, chunk.DefaultSplitter) obj, err := importer.BuildTrickleDagFromReader(read, nd.DAG, chunk.DefaultSplitter, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -5,6 +5,14 @@ import (
"github.com/ipfs/go-ipfs/pin" "github.com/ipfs/go-ipfs/pin"
) )
// NodeCB is callback function for dag generation
// the `last` flag signifies whether or not this is the last
// (top-most root) node being added. useful for things like
// only pinning the first node recursively.
type NodeCB func(node *dag.Node, last bool) error
var nilFunc NodeCB = func(_ *dag.Node, _ bool) error { return nil }
// DagBuilderHelper wraps together a bunch of objects needed to // DagBuilderHelper wraps together a bunch of objects needed to
// efficiently create unixfs dag trees // efficiently create unixfs dag trees
type DagBuilderHelper struct { type DagBuilderHelper struct {
@ -13,6 +21,7 @@ type DagBuilderHelper struct {
in <-chan []byte in <-chan []byte
nextData []byte // the next item to return. nextData []byte // the next item to return.
maxlinks int maxlinks int
ncb NodeCB
} }
type DagBuilderParams struct { type DagBuilderParams struct {
@ -22,18 +31,23 @@ type DagBuilderParams struct {
// DAGService to write blocks to (required) // DAGService to write blocks to (required)
Dagserv dag.DAGService Dagserv dag.DAGService
// Pinner to use for pinning files (optionally nil) // Callback for each block added
Pinner pin.ManualPinner NodeCB NodeCB
} }
// Generate a new DagBuilderHelper from the given params, using 'in' as a // Generate a new DagBuilderHelper from the given params, using 'in' as a
// data source // data source
func (dbp *DagBuilderParams) New(in <-chan []byte) *DagBuilderHelper { func (dbp *DagBuilderParams) New(in <-chan []byte) *DagBuilderHelper {
ncb := dbp.NodeCB
if ncb == nil {
ncb = nilFunc
}
return &DagBuilderHelper{ return &DagBuilderHelper{
dserv: dbp.Dagserv, dserv: dbp.Dagserv,
mp: dbp.Pinner,
in: in, in: in,
maxlinks: dbp.Maxlinks, maxlinks: dbp.Maxlinks,
ncb: ncb,
} }
} }
@ -125,17 +139,15 @@ func (db *DagBuilderHelper) Add(node *UnixfsNode) (*dag.Node, error) {
return nil, err return nil, err
} }
key, err := db.dserv.Add(dn) _, err = db.dserv.Add(dn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if db.mp != nil { // node callback
db.mp.PinWithMode(key, pin.Recursive) err = db.ncb(dn, true)
err := db.mp.Flush() if err != nil {
if err != nil { return nil, err
return nil, err
}
} }
return dn, nil return dn, nil

View File

@ -107,14 +107,15 @@ func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error {
return err return err
} }
childkey, err := db.dserv.Add(childnode) _, err = db.dserv.Add(childnode)
if err != nil { if err != nil {
return err return err
} }
// Pin the child node indirectly // Pin the child node indirectly
if db.mp != nil { err = db.ncb(childnode, false)
db.mp.PinWithMode(childkey, pin.Indirect) if err != nil {
return err
} }
return nil return nil

View File

@ -13,10 +13,10 @@ import (
trickle "github.com/ipfs/go-ipfs/importer/trickle" trickle "github.com/ipfs/go-ipfs/importer/trickle"
dag "github.com/ipfs/go-ipfs/merkledag" dag "github.com/ipfs/go-ipfs/merkledag"
"github.com/ipfs/go-ipfs/pin" "github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/util" u "github.com/ipfs/go-ipfs/util"
) )
var log = util.Logger("importer") var log = u.Logger("importer")
// Builds a DAG from the given file, writing created blocks to disk as they are // Builds a DAG from the given file, writing created blocks to disk as they are
// created // created
@ -36,31 +36,60 @@ func BuildDagFromFile(fpath string, ds dag.DAGService, mp pin.ManualPinner) (*da
} }
defer f.Close() defer f.Close()
return BuildDagFromReader(f, ds, mp, chunk.DefaultSplitter) return BuildDagFromReader(f, ds, chunk.DefaultSplitter, BasicPinnerCB(mp))
} }
func BuildDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) { func BuildDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSplitter, ncb h.NodeCB) (*dag.Node, error) {
// Start the splitter // Start the splitter
blkch := spl.Split(r) blkch := spl.Split(r)
dbp := h.DagBuilderParams{ dbp := h.DagBuilderParams{
Dagserv: ds, Dagserv: ds,
Maxlinks: h.DefaultLinksPerBlock, Maxlinks: h.DefaultLinksPerBlock,
Pinner: mp, NodeCB: ncb,
} }
return bal.BalancedLayout(dbp.New(blkch)) return bal.BalancedLayout(dbp.New(blkch))
} }
func BuildTrickleDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) { func BuildTrickleDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSplitter, ncb h.NodeCB) (*dag.Node, error) {
// Start the splitter // Start the splitter
blkch := spl.Split(r) blkch := spl.Split(r)
dbp := h.DagBuilderParams{ dbp := h.DagBuilderParams{
Dagserv: ds, Dagserv: ds,
Maxlinks: h.DefaultLinksPerBlock, Maxlinks: h.DefaultLinksPerBlock,
Pinner: mp, NodeCB: ncb,
} }
return trickle.TrickleLayout(dbp.New(blkch)) return trickle.TrickleLayout(dbp.New(blkch))
} }
func BasicPinnerCB(p pin.ManualPinner) h.NodeCB {
return func(n *dag.Node, last bool) error {
k, err := n.Key()
if err != nil {
return err
}
if last {
p.PinWithMode(k, pin.Recursive)
return p.Flush()
} else {
p.PinWithMode(k, pin.Indirect)
return nil
}
}
}
func PinIndirectCB(p pin.ManualPinner) h.NodeCB {
return func(n *dag.Node, last bool) error {
k, err := n.Key()
if err != nil {
return err
}
p.PinWithMode(k, pin.Indirect)
return nil
}
}

View File

@ -17,7 +17,7 @@ import (
func getBalancedDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGService) { func getBalancedDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGService) {
ds := mdtest.Mock(t) ds := mdtest.Mock(t)
r := io.LimitReader(u.NewTimeSeededRand(), size) r := io.LimitReader(u.NewTimeSeededRand(), size)
nd, err := BuildDagFromReader(r, ds, nil, &chunk.SizeSplitter{blksize}) nd, err := BuildDagFromReader(r, ds, &chunk.SizeSplitter{blksize}, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -27,7 +27,7 @@ func getBalancedDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGSe
func getTrickleDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGService) { func getTrickleDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGService) {
ds := mdtest.Mock(t) ds := mdtest.Mock(t)
r := io.LimitReader(u.NewTimeSeededRand(), size) r := io.LimitReader(u.NewTimeSeededRand(), size)
nd, err := BuildTrickleDagFromReader(r, ds, nil, &chunk.SizeSplitter{blksize}) nd, err := BuildTrickleDagFromReader(r, ds, &chunk.SizeSplitter{blksize}, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -40,7 +40,7 @@ func TestBalancedDag(t *testing.T) {
u.NewTimeSeededRand().Read(buf) u.NewTimeSeededRand().Read(buf)
r := bytes.NewReader(buf) r := bytes.NewReader(buf)
nd, err := BuildDagFromReader(r, ds, nil, chunk.DefaultSplitter) nd, err := BuildDagFromReader(r, ds, chunk.DefaultSplitter, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -156,7 +156,7 @@ func runBatchFetchTest(t *testing.T, read io.Reader) {
spl := &chunk.SizeSplitter{512} spl := &chunk.SizeSplitter{512}
root, err := imp.BuildDagFromReader(read, dagservs[0], nil, spl) root, err := imp.BuildDagFromReader(read, dagservs[0], spl, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -11,6 +11,7 @@ import (
mh "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" mh "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
imp "github.com/ipfs/go-ipfs/importer"
chunk "github.com/ipfs/go-ipfs/importer/chunk" chunk "github.com/ipfs/go-ipfs/importer/chunk"
help "github.com/ipfs/go-ipfs/importer/helpers" help "github.com/ipfs/go-ipfs/importer/helpers"
trickle "github.com/ipfs/go-ipfs/importer/trickle" trickle "github.com/ipfs/go-ipfs/importer/trickle"
@ -308,7 +309,7 @@ func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte) (*mdag.No
dbp := &help.DagBuilderParams{ dbp := &help.DagBuilderParams{
Dagserv: dm.dagserv, Dagserv: dm.dagserv,
Maxlinks: help.DefaultLinksPerBlock, Maxlinks: help.DefaultLinksPerBlock,
Pinner: dm.mp, NodeCB: imp.BasicPinnerCB(dm.mp),
} }
return trickle.TrickleAppend(node, dbp.New(blks)) return trickle.TrickleAppend(node, dbp.New(blks))

View File

@ -52,7 +52,7 @@ func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.Blocksto
func getNode(t testing.TB, dserv mdag.DAGService, size int64, pinner pin.ManualPinner) ([]byte, *mdag.Node) { func getNode(t testing.TB, dserv mdag.DAGService, size int64, pinner pin.ManualPinner) ([]byte, *mdag.Node) {
in := io.LimitReader(u.NewTimeSeededRand(), size) in := io.LimitReader(u.NewTimeSeededRand(), size)
node, err := imp.BuildTrickleDagFromReader(in, dserv, pinner, &chunk.SizeSplitter{500}) node, err := imp.BuildTrickleDagFromReader(in, dserv, &chunk.SizeSplitter{500}, imp.BasicPinnerCB(pinner))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }