mirror of
https://github.com/ipfs/kubo.git
synced 2025-09-10 09:52:20 +08:00
implement trickledag for faster unixfs operations
This commit is contained in:
@ -96,6 +96,32 @@ func TestBuilderConsistency(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestTrickleBuilderConsistency(t *testing.T) {
|
||||
nbytes := 100000
|
||||
buf := new(bytes.Buffer)
|
||||
io.CopyN(buf, u.NewTimeSeededRand(), int64(nbytes))
|
||||
should := dup(buf.Bytes())
|
||||
dagserv := merkledag.Mock(t)
|
||||
nd, err := BuildTrickleDagFromReader(buf, dagserv, nil, chunk.DefaultSplitter)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r, err := uio.NewDagReader(context.Background(), nd, dagserv)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
out, err := ioutil.ReadAll(r)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = arrComp(out, should)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func arrComp(a, b []byte) error {
|
||||
if len(a) != len(b) {
|
||||
return fmt.Errorf("Arrays differ in length. %d != %d", len(a), len(b))
|
||||
@ -220,6 +246,43 @@ func TestSeekingBasic(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestTrickleSeekingBasic(t *testing.T) {
|
||||
nbytes := int64(10 * 1024)
|
||||
should := make([]byte, nbytes)
|
||||
u.NewTimeSeededRand().Read(should)
|
||||
|
||||
read := bytes.NewReader(should)
|
||||
dnp := getDagservAndPinner(t)
|
||||
nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, &chunk.SizeSplitter{500})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
rs, err := uio.NewDagReader(context.Background(), nd, dnp.ds)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
start := int64(4000)
|
||||
n, err := rs.Seek(start, os.SEEK_SET)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if n != start {
|
||||
t.Fatal("Failed to seek to correct offset")
|
||||
}
|
||||
|
||||
out, err := ioutil.ReadAll(rs)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = arrComp(out, should[start:])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSeekToBegin(t *testing.T) {
|
||||
nbytes := int64(10 * 1024)
|
||||
should := make([]byte, nbytes)
|
||||
|
91
importer/trickledag.go
Normal file
91
importer/trickledag.go
Normal file
@ -0,0 +1,91 @@
|
||||
package importer
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/jbenet/go-ipfs/importer/chunk"
|
||||
dag "github.com/jbenet/go-ipfs/merkledag"
|
||||
"github.com/jbenet/go-ipfs/pin"
|
||||
)
|
||||
|
||||
// layerRepeat specifies how many times to append a child tree of a
|
||||
// given depth. Higher values increase the width of a given node, which
|
||||
// improves seek speeds.
|
||||
const layerRepeat = 4
|
||||
|
||||
func BuildTrickleDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) {
|
||||
// Start the splitter
|
||||
blkch := spl.Split(r)
|
||||
|
||||
// Create our builder helper
|
||||
db := &dagBuilderHelper{
|
||||
dserv: ds,
|
||||
mp: mp,
|
||||
in: blkch,
|
||||
maxlinks: DefaultLinksPerBlock,
|
||||
indrSize: defaultIndirectBlockDataSize(),
|
||||
}
|
||||
|
||||
root := newUnixfsNode()
|
||||
err := db.fillNodeRec(root, 1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for level := 1; !db.done(); level++ {
|
||||
for i := 0; i < layerRepeat && !db.done(); i++ {
|
||||
next := newUnixfsNode()
|
||||
err := db.fillTrickleRec(next, level)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = root.addChild(next, db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
rootnode, err := root.getDagNode()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rootkey, err := ds.Add(rootnode)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if mp != nil {
|
||||
mp.PinWithMode(rootkey, pin.Recursive)
|
||||
err := mp.Flush()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return root.getDagNode()
|
||||
}
|
||||
|
||||
func (db *dagBuilderHelper) fillTrickleRec(node *unixfsNode, depth int) error {
|
||||
// Always do this, even in the base case
|
||||
err := db.fillNodeRec(node, 1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := 1; i < depth && !db.done(); i++ {
|
||||
for j := 0; j < layerRepeat; j++ {
|
||||
next := newUnixfsNode()
|
||||
err := db.fillTrickleRec(next, i)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = node.addChild(next, db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
Reference in New Issue
Block a user