1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-09-10 05:52:20 +08:00

change pinning to happen in a callback

This commit is contained in:
Jeromy
2015-05-19 08:52:30 -07:00
parent b3e73c62fe
commit dd928a2b1d
12 changed files with 94 additions and 46 deletions

View File

@ -3,8 +3,13 @@ package helpers
import (
dag "github.com/ipfs/go-ipfs/merkledag"
"github.com/ipfs/go-ipfs/pin"
u "github.com/ipfs/go-ipfs/util"
)
type BlockCB func(u.Key, bool) error
var nilFunc BlockCB = func(_ u.Key, _ bool) error { return nil }
// DagBuilderHelper wraps together a bunch of objects needed to
// efficiently create unixfs dag trees
type DagBuilderHelper struct {
@ -13,6 +18,7 @@ type DagBuilderHelper struct {
in <-chan []byte
nextData []byte // the next item to return.
maxlinks int
bcb BlockCB
}
type DagBuilderParams struct {
@ -22,18 +28,23 @@ type DagBuilderParams struct {
// DAGService to write blocks to (required)
Dagserv dag.DAGService
// Pinner to use for pinning files (optionally nil)
Pinner pin.ManualPinner
// Callback for each block added
BlockCB BlockCB
}
// Generate a new DagBuilderHelper from the given params, using 'in' as a
// data source
func (dbp *DagBuilderParams) New(in <-chan []byte) *DagBuilderHelper {
bcb := dbp.BlockCB
if bcb == nil {
bcb = nilFunc
}
return &DagBuilderHelper{
dserv: dbp.Dagserv,
mp: dbp.Pinner,
in: in,
maxlinks: dbp.Maxlinks,
bcb: bcb,
}
}
@ -130,12 +141,10 @@ func (db *DagBuilderHelper) Add(node *UnixfsNode) (*dag.Node, error) {
return nil, err
}
if db.mp != nil {
db.mp.PinWithMode(key, pin.Recursive)
err := db.mp.Flush()
if err != nil {
return nil, err
}
// block callback
err = db.bcb(key, true)
if err != nil {
return nil, err
}
return dn, nil

View File

@ -113,8 +113,9 @@ func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error {
}
// Pin the child node indirectly
if db.mp != nil {
db.mp.PinWithMode(childkey, pin.Indirect)
err = db.bcb(childkey, false)
if err != nil {
return err
}
return nil

View File

@ -13,10 +13,10 @@ import (
trickle "github.com/ipfs/go-ipfs/importer/trickle"
dag "github.com/ipfs/go-ipfs/merkledag"
"github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/util"
u "github.com/ipfs/go-ipfs/util"
)
var log = util.Logger("importer")
var log = u.Logger("importer")
// Builds a DAG from the given file, writing created blocks to disk as they are
// created
@ -36,31 +36,50 @@ func BuildDagFromFile(fpath string, ds dag.DAGService, mp pin.ManualPinner) (*da
}
defer f.Close()
return BuildDagFromReader(f, ds, mp, chunk.DefaultSplitter)
return BuildDagFromReader(f, ds, chunk.DefaultSplitter, BasicPinnerCB(mp))
}
func BuildDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) {
func BuildDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSplitter, bcb h.BlockCB) (*dag.Node, error) {
// Start the splitter
blkch := spl.Split(r)
dbp := h.DagBuilderParams{
Dagserv: ds,
Maxlinks: h.DefaultLinksPerBlock,
Pinner: mp,
BlockCB: bcb,
}
return bal.BalancedLayout(dbp.New(blkch))
}
func BuildTrickleDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) {
func BuildTrickleDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSplitter, bcb h.BlockCB) (*dag.Node, error) {
// Start the splitter
blkch := spl.Split(r)
dbp := h.DagBuilderParams{
Dagserv: ds,
Maxlinks: h.DefaultLinksPerBlock,
Pinner: mp,
BlockCB: bcb,
}
return trickle.TrickleLayout(dbp.New(blkch))
}
func BasicPinnerCB(p pin.ManualPinner) h.BlockCB {
return func(k u.Key, root bool) error {
if root {
p.PinWithMode(k, pin.Recursive)
return p.Flush()
} else {
p.PinWithMode(k, pin.Indirect)
return nil
}
}
}
func PinIndirectCB(p pin.ManualPinner) h.BlockCB {
return func(k u.Key, root bool) error {
p.PinWithMode(k, pin.Indirect)
return nil
}
}

View File

@ -17,7 +17,7 @@ import (
func getBalancedDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGService) {
ds := mdtest.Mock(t)
r := io.LimitReader(u.NewTimeSeededRand(), size)
nd, err := BuildDagFromReader(r, ds, nil, &chunk.SizeSplitter{blksize})
nd, err := BuildDagFromReader(r, ds, &chunk.SizeSplitter{blksize}, nil)
if err != nil {
t.Fatal(err)
}
@ -27,7 +27,7 @@ func getBalancedDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGSe
func getTrickleDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGService) {
ds := mdtest.Mock(t)
r := io.LimitReader(u.NewTimeSeededRand(), size)
nd, err := BuildTrickleDagFromReader(r, ds, nil, &chunk.SizeSplitter{blksize})
nd, err := BuildTrickleDagFromReader(r, ds, &chunk.SizeSplitter{blksize}, nil)
if err != nil {
t.Fatal(err)
}
@ -40,7 +40,7 @@ func TestBalancedDag(t *testing.T) {
u.NewTimeSeededRand().Read(buf)
r := bytes.NewReader(buf)
nd, err := BuildDagFromReader(r, ds, nil, chunk.DefaultSplitter)
nd, err := BuildDagFromReader(r, ds, chunk.DefaultSplitter, nil)
if err != nil {
t.Fatal(err)
}