1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-09-10 14:34:24 +08:00

refactor importer package with trickle and balanced dag generation

This commit is contained in:
Jeromy
2015-02-04 03:53:10 +00:00
parent b3e74fabb5
commit bc79ae17a1
11 changed files with 1321 additions and 871 deletions

View File

@ -0,0 +1,443 @@
package balanced
import (
"bytes"
"crypto/rand"
"fmt"
"io"
"io/ioutil"
mrand "math/rand"
"os"
"testing"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
h "github.com/jbenet/go-ipfs/importer/helpers"
merkledag "github.com/jbenet/go-ipfs/merkledag"
mdtest "github.com/jbenet/go-ipfs/merkledag/test"
pin "github.com/jbenet/go-ipfs/pin"
uio "github.com/jbenet/go-ipfs/unixfs/io"
u "github.com/jbenet/go-ipfs/util"
)
// buildTestDag chunks r with spl and assembles the chunks into a
// balanced-layout unixfs DAG stored in ds, returning the root node.
func buildTestDag(r io.Reader, ds merkledag.DAGService, spl chunk.BlockSplitter) (*merkledag.Node, error) {
	// Start the splitter
	blkch := spl.Split(r)

	dbp := h.DagBuilderParams{
		Dagserv:  ds,
		Maxlinks: h.DefaultLinksPerBlock,
		// no Pinner is set, so nothing built here gets pinned
	}

	return BalancedLayout(dbp.New(blkch))
}
// Test where calls to read are smaller than the chunk size.
// Exercises fixed-size splitting at 512B and 4096B chunk sizes,
// including a length (31*4095) that is not a multiple of the chunk size.
func TestSizeBasedSplit(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
	bs := &chunk.SizeSplitter{Size: 512}
	testFileConsistency(t, bs, 32*512)
	bs = &chunk.SizeSplitter{Size: 4096}
	testFileConsistency(t, bs, 32*4096)

	// Uneven offset
	testFileConsistency(t, bs, 31*4095)
}
// dup returns a fresh copy of b that shares no storage with it.
// For a nil input it returns an empty, non-nil slice.
func dup(b []byte) []byte {
	clone := make([]byte, len(b))
	for i, v := range b {
		clone[i] = v
	}
	return clone
}
// testFileConsistency builds a DAG over nbytes of random data using the
// given splitter, reads it back through a DagReader, and fails the test
// if the round-tripped bytes differ from the source.
func testFileConsistency(t *testing.T, bs chunk.BlockSplitter, nbytes int) {
	should := make([]byte, nbytes)
	u.NewTimeSeededRand().Read(should)
	read := bytes.NewReader(should)

	ds := mdtest.Mock(t)
	nd, err := buildTestDag(read, ds, bs)
	if err != nil {
		t.Fatal(err)
	}

	r, err := uio.NewDagReader(context.Background(), nd, ds)
	if err != nil {
		t.Fatal(err)
	}

	out, err := ioutil.ReadAll(r)
	if err != nil {
		t.Fatal(err)
	}

	err = arrComp(out, should)
	if err != nil {
		t.Fatal(err)
	}
}
// TestBuilderConsistency round-trips 100000 bytes of random data through
// the default splitter and the balanced builder, verifying the DagReader
// output matches a copy of the input taken before it was consumed.
func TestBuilderConsistency(t *testing.T) {
	nbytes := 100000
	buf := new(bytes.Buffer)
	io.CopyN(buf, u.NewTimeSeededRand(), int64(nbytes))
	// buf is drained by the builder below, so keep our own copy to compare
	should := dup(buf.Bytes())
	dagserv := mdtest.Mock(t)
	nd, err := buildTestDag(buf, dagserv, chunk.DefaultSplitter)
	if err != nil {
		t.Fatal(err)
	}
	r, err := uio.NewDagReader(context.Background(), nd, dagserv)
	if err != nil {
		t.Fatal(err)
	}

	out, err := ioutil.ReadAll(r)
	if err != nil {
		t.Fatal(err)
	}

	err = arrComp(out, should)
	if err != nil {
		t.Fatal(err)
	}
}
// arrComp compares two byte slices and reports the first point of
// divergence. It returns nil iff a and b have identical length and
// contents.
func arrComp(a, b []byte) error {
	if len(a) != len(b) {
		// error strings are lowercase without punctuation, per Go convention
		return fmt.Errorf("arrays differ in length: %d != %d", len(a), len(b))
	}
	for i, v := range a {
		if v != b[i] {
			return fmt.Errorf("arrays differ at index: %d", i)
		}
	}
	return nil
}
// TestMaybeRabinConsistency round-trips ~1MiB of data through the
// rabin-fingerprint splitter. Skipped under -short.
func TestMaybeRabinConsistency(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
	testFileConsistency(t, chunk.NewMaybeRabin(4096), 256*4096)
}
// TestRabinBlockSize splits 1MiB of random data with the rabin
// fingerprint splitter and logs the resulting average block size.
// Skipped under -short.
func TestRabinBlockSize(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
	buf := new(bytes.Buffer)
	nbytes := 1024 * 1024
	// check the copy error instead of silently ignoring it
	if _, err := io.CopyN(buf, rand.Reader, int64(nbytes)); err != nil {
		t.Fatal(err)
	}
	rab := chunk.NewMaybeRabin(4096)
	blkch := rab.Split(buf)

	var blocks [][]byte
	for b := range blkch {
		blocks = append(blocks, b)
	}

	// guard the division below against a splitter that yields nothing
	if len(blocks) == 0 {
		t.Fatal("splitter returned no blocks")
	}

	// use the test logger rather than writing straight to stdout
	t.Logf("Avg block size: %d\n", nbytes/len(blocks))
}
// dagservAndPinner bundles a DAGService with its ManualPinner.
// NOTE(review): nothing in this file references it — it looks like a
// leftover from the pre-refactor test helpers; consider removing.
type dagservAndPinner struct {
	ds merkledag.DAGService
	mp pin.ManualPinner
}
// TestIndirectBlocks builds a DAG large enough (1MiB at 512-byte chunks)
// to require indirect blocks and verifies a full read round-trips.
func TestIndirectBlocks(t *testing.T) {
	// keyed literal so `go vet` doesn't flag the composite
	splitter := &chunk.SizeSplitter{Size: 512}
	nbytes := 1024 * 1024
	buf := make([]byte, nbytes)
	u.NewTimeSeededRand().Read(buf)

	read := bytes.NewReader(buf)

	ds := mdtest.Mock(t)
	dag, err := buildTestDag(read, ds, splitter)
	if err != nil {
		t.Fatal(err)
	}

	reader, err := uio.NewDagReader(context.Background(), dag, ds)
	if err != nil {
		t.Fatal(err)
	}

	out, err := ioutil.ReadAll(reader)
	if err != nil {
		t.Fatal(err)
	}

	if !bytes.Equal(out, buf) {
		t.Fatal("Not equal!")
	}
}
// TestSeekingBasic seeks into the middle of a multi-block file and
// checks that everything from the seek point reads back correctly.
func TestSeekingBasic(t *testing.T) {
	nbytes := int64(10 * 1024)
	should := make([]byte, nbytes)
	u.NewTimeSeededRand().Read(should)

	read := bytes.NewReader(should)
	ds := mdtest.Mock(t)
	// keyed literal so `go vet` doesn't flag the composite
	nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{Size: 500})
	if err != nil {
		t.Fatal(err)
	}

	rs, err := uio.NewDagReader(context.Background(), nd, ds)
	if err != nil {
		t.Fatal(err)
	}

	start := int64(4000)
	n, err := rs.Seek(start, os.SEEK_SET)
	if err != nil {
		t.Fatal(err)
	}
	if n != start {
		t.Fatal("Failed to seek to correct offset")
	}

	out, err := ioutil.ReadAll(rs)
	if err != nil {
		t.Fatal(err)
	}

	err = arrComp(out, should[start:])
	if err != nil {
		t.Fatal(err)
	}
}
// TestSeekToBegin reads part of the file, seeks back to offset 0, and
// verifies a full re-read matches the original data.
func TestSeekToBegin(t *testing.T) {
	nbytes := int64(10 * 1024)
	should := make([]byte, nbytes)
	u.NewTimeSeededRand().Read(should)

	read := bytes.NewReader(should)
	ds := mdtest.Mock(t)
	// keyed literal so `go vet` doesn't flag the composite
	nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{Size: 500})
	if err != nil {
		t.Fatal(err)
	}

	rs, err := uio.NewDagReader(context.Background(), nd, ds)
	if err != nil {
		t.Fatal(err)
	}

	// consume the first 4KiB so the reader is mid-file before the seek
	n, err := io.CopyN(ioutil.Discard, rs, 1024*4)
	if err != nil {
		t.Fatal(err)
	}
	if n != 4096 {
		t.Fatal("Copy didnt copy enough bytes")
	}

	seeked, err := rs.Seek(0, os.SEEK_SET)
	if err != nil {
		t.Fatal(err)
	}
	if seeked != 0 {
		t.Fatal("Failed to seek to beginning")
	}

	out, err := ioutil.ReadAll(rs)
	if err != nil {
		t.Fatal(err)
	}

	err = arrComp(out, should)
	if err != nil {
		t.Fatal(err)
	}
}
// TestSeekToAlmostBegin reads part of the file, seeks to offset 1, and
// verifies the remaining read matches the source from byte 1 onward.
func TestSeekToAlmostBegin(t *testing.T) {
	nbytes := int64(10 * 1024)
	should := make([]byte, nbytes)
	u.NewTimeSeededRand().Read(should)

	read := bytes.NewReader(should)
	ds := mdtest.Mock(t)
	// keyed literal so `go vet` doesn't flag the composite
	nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{Size: 500})
	if err != nil {
		t.Fatal(err)
	}

	rs, err := uio.NewDagReader(context.Background(), nd, ds)
	if err != nil {
		t.Fatal(err)
	}

	// consume the first 4KiB so the reader is mid-file before the seek
	n, err := io.CopyN(ioutil.Discard, rs, 1024*4)
	if err != nil {
		t.Fatal(err)
	}
	if n != 4096 {
		t.Fatal("Copy didnt copy enough bytes")
	}

	seeked, err := rs.Seek(1, os.SEEK_SET)
	if err != nil {
		t.Fatal(err)
	}
	if seeked != 1 {
		t.Fatal("Failed to seek to almost beginning")
	}

	out, err := ioutil.ReadAll(rs)
	if err != nil {
		t.Fatal(err)
	}

	err = arrComp(out, should[1:])
	if err != nil {
		t.Fatal(err)
	}
}
// TestSeekEnd verifies that seeking to SEEK_END on a multi-block file
// reports the file's full length as the resulting offset.
func TestSeekEnd(t *testing.T) {
	nbytes := int64(50 * 1024)
	should := make([]byte, nbytes)
	u.NewTimeSeededRand().Read(should)

	read := bytes.NewReader(should)
	ds := mdtest.Mock(t)
	// keyed literal so `go vet` doesn't flag the composite
	nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{Size: 500})
	if err != nil {
		t.Fatal(err)
	}

	rs, err := uio.NewDagReader(context.Background(), nd, ds)
	if err != nil {
		t.Fatal(err)
	}

	seeked, err := rs.Seek(0, os.SEEK_END)
	if err != nil {
		t.Fatal(err)
	}
	if seeked != nbytes {
		t.Fatal("Failed to seek to end")
	}
}
// TestSeekEndSingleBlockFile is the SEEK_END check for a file small
// enough (100 bytes with a 5000-byte chunk size) to fit in one block.
func TestSeekEndSingleBlockFile(t *testing.T) {
	nbytes := int64(100)
	should := make([]byte, nbytes)
	u.NewTimeSeededRand().Read(should)

	read := bytes.NewReader(should)
	ds := mdtest.Mock(t)
	// keyed literal so `go vet` doesn't flag the composite
	nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{Size: 5000})
	if err != nil {
		t.Fatal(err)
	}

	rs, err := uio.NewDagReader(context.Background(), nd, ds)
	if err != nil {
		t.Fatal(err)
	}

	seeked, err := rs.Seek(0, os.SEEK_END)
	if err != nil {
		t.Fatal(err)
	}
	if seeked != nbytes {
		t.Fatal("Failed to seek to end")
	}
}
// TestSeekingStress performs 50 random seeks into a 1MiB file and, for
// each, reads to the end and compares against the source data.
func TestSeekingStress(t *testing.T) {
	nbytes := int64(1024 * 1024)
	should := make([]byte, nbytes)
	u.NewTimeSeededRand().Read(should)

	read := bytes.NewReader(should)
	ds := mdtest.Mock(t)
	// keyed literal so `go vet` doesn't flag the composite
	nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{Size: 1000})
	if err != nil {
		t.Fatal(err)
	}

	rs, err := uio.NewDagReader(context.Background(), nd, ds)
	if err != nil {
		t.Fatal(err)
	}

	testbuf := make([]byte, nbytes)
	for i := 0; i < 50; i++ {
		offset := mrand.Intn(int(nbytes))
		l := int(nbytes) - offset
		n, err := rs.Seek(int64(offset), os.SEEK_SET)
		if err != nil {
			t.Fatal(err)
		}
		if n != int64(offset) {
			t.Fatal("Seek failed to move to correct position")
		}
		// io.ReadFull retries short reads; a single rs.Read is allowed by
		// the io.Reader contract to return fewer than l bytes, which would
		// make this test flaky.
		if _, err := io.ReadFull(rs, testbuf[:l]); err != nil {
			t.Fatal(err)
		}
		err = arrComp(testbuf[:l], should[offset:offset+l])
		if err != nil {
			t.Fatal(err)
		}
	}
}
// TestSeekingConsistency reads a 128KiB file back-to-front in 4KiB
// windows, seeking before each read, then compares the reassembled
// buffer against the source.
func TestSeekingConsistency(t *testing.T) {
	nbytes := int64(128 * 1024)
	should := make([]byte, nbytes)
	u.NewTimeSeededRand().Read(should)

	read := bytes.NewReader(should)
	ds := mdtest.Mock(t)
	// keyed literal so `go vet` doesn't flag the composite
	nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{Size: 500})
	if err != nil {
		t.Fatal(err)
	}

	rs, err := uio.NewDagReader(context.Background(), nd, ds)
	if err != nil {
		t.Fatal(err)
	}

	out := make([]byte, nbytes)
	for coff := nbytes - 4096; coff >= 0; coff -= 4096 {
		t.Log(coff)
		n, err := rs.Seek(coff, os.SEEK_SET)
		if err != nil {
			t.Fatal(err)
		}
		if n != coff {
			t.Fatal("wasnt able to seek to the right position")
		}
		// io.ReadFull guarantees the full 4096-byte window is read; a bare
		// rs.Read may legally return short and flake the test.
		if _, err := io.ReadFull(rs, out[coff:coff+4096]); err != nil {
			t.Fatal(err)
		}
	}

	err = arrComp(out, should)
	if err != nil {
		t.Fatal(err)
	}
}

View File

@ -0,0 +1,66 @@
package balanced
import (
"errors"
h "github.com/jbenet/go-ipfs/importer/helpers"
dag "github.com/jbenet/go-ipfs/merkledag"
)
// BalancedLayout builds a balanced DAG from the chunks supplied by db.
// Each iteration adds one level of depth: the previous root becomes the
// first child of a fresh root, which is then filled with subtrees of the
// current level until it is full or the input is exhausted. The finished
// root is written (and, if a pinner is configured, pinned) via db.Add.
func BalancedLayout(db *h.DagBuilderHelper) (*dag.Node, error) {
	var root *h.UnixfsNode
	for level := 0; !db.Done(); level++ {

		nroot := h.NewUnixfsNode()

		// add our old root as a child of the new root.
		if root != nil { // nil if it's the first node.
			if err := nroot.AddChild(root, db); err != nil {
				return nil, err
			}
		}

		// fill it up.
		if err := fillNodeRec(db, nroot, level); err != nil {
			return nil, err
		}

		root = nroot
	}
	if root == nil {
		// zero-length input: emit a single empty node
		root = h.NewUnixfsNode()
	}

	return db.Add(root)
}
// fillNodeRec will fill the given node with data from the dagBuilders input
// source down to an indirection depth as specified by 'depth'.
// At depth 0 the node is a leaf and receives raw data; at depth > 0 it is
// filled with up to Maxlinks children, each built recursively at depth-1.
// It returns a potential error.
//
// warning: **children** pinned indirectly, but input node IS NOT pinned.
func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error {
	if depth < 0 {
		return errors.New("attempt to fillNode at depth < 0")
	}

	// Base case
	if depth <= 0 { // catch accidental -1's in case error above is removed.
		return db.FillNodeWithData(node)
	}

	// while we have room AND we're not done
	for node.NumChildren() < db.Maxlinks() && !db.Done() {
		child := h.NewUnixfsNode()

		if err := fillNodeRec(db, child, depth-1); err != nil {
			return err
		}

		if err := node.AddChild(child, db); err != nil {
			return err
		}
	}

	return nil
}

View File

@ -1,50 +0,0 @@
package importer
import (
"math"
"testing"
humanize "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/dustin/go-humanize"
)
// TestCalculateSizes logs, for a grid of data-block and link-block
// sizes, how much file data a DAG of each depth can address. It only
// prints numbers — there are no assertions.
func TestCalculateSizes(t *testing.T) {

	// d := ((lbs/271) ^ layer) * dbs

	// increments returns the powers-of-two progression from a up to b.
	increments := func(a, b int) []int {
		ints := []int{}
		for ; a <= b; a *= 2 {
			ints = append(ints, a)
		}
		return ints
	}

	layers := 7
	roughLinkSize := roughLinkSize // from importer pkg
	dataBlockSizes := increments(1<<12, 1<<18)
	linkBlockSizes := increments(1<<12, 1<<14)
	t.Logf("rough link size: %d", roughLinkSize)
	t.Logf("data block sizes: %v", dataBlockSizes)
	t.Logf("link block sizes: %v", linkBlockSizes)
	for _, dbs := range dataBlockSizes {
		t.Logf("")
		t.Logf("with data block size: %d", dbs)
		for _, lbs := range linkBlockSizes {
			t.Logf("")
			// NOTE(review): this repeats the data-block-size line already
			// logged by the outer loop; possibly meant to be dropped.
			t.Logf("\twith data block size: %d", dbs)
			t.Logf("\twith link block size: %d", lbs)
			lpb := lbs / roughLinkSize
			t.Logf("\tlinks per block: %d", lpb)
			for l := 1; l < layers; l++ {
				total := int(math.Pow(float64(lpb), float64(l))) * dbs
				htotal := humanize.Bytes(uint64(total))
				t.Logf("\t\t\tlayer %d: %s\t%d", l, htotal, total)
			}
		}
	}
}

View File

@ -0,0 +1,141 @@
package helpers
import (
dag "github.com/jbenet/go-ipfs/merkledag"
"github.com/jbenet/go-ipfs/pin"
)
// DagBuilderHelper wraps together a bunch of objects needed to
// efficiently create unixfs dag trees
type DagBuilderHelper struct {
	dserv    dag.DAGService
	mp       pin.ManualPinner // optional; nil disables pinning
	in       <-chan []byte    // chunk stream from the splitter
	nextData []byte           // the next item to return.
	maxlinks int              // cap on children per intermediate node
}
// DagBuilderParams collects the configuration used to construct a
// DagBuilderHelper via New.
type DagBuilderParams struct {
	// Maximum number of links per intermediate node
	Maxlinks int

	// DAGService to write blocks to (required)
	Dagserv dag.DAGService

	// Pinner to use for pinning files (optionally nil)
	Pinner pin.ManualPinner
}
// Generate a new DagBuilderHelper from the given params, using 'in' as a
// data source. A nil Pinner is allowed and simply disables pinning.
func (dbp *DagBuilderParams) New(in <-chan []byte) *DagBuilderHelper {
	return &DagBuilderHelper{
		dserv:    dbp.Dagserv,
		mp:       dbp.Pinner,
		in:       in,
		maxlinks: dbp.Maxlinks,
	}
}
// prepareNext consumes the next item from the channel and puts it
// in the nextData field. it is idempotent-- if nextData is full
// it will do nothing.
//
// i realized that building the dag becomes _a lot_ easier if we can
// "peek" the "are we done yet?" (i.e. not consume it from the channel)
func (db *DagBuilderHelper) prepareNext() {
	if db.in == nil {
		// if our input is nil, there is "nothing to do". we're done.
		// as if there was no data at all. (a sort of zero-value)
		return
	}

	// if we already have data waiting to be consumed, we're ready.
	if db.nextData != nil {
		return
	}

	// if it's closed, nextData will be correctly set to nil, signaling
	// that we're done consuming from the channel.
	db.nextData = <-db.in
}
// Done returns whether or not we're done consuming the incoming data.
func (db *DagBuilderHelper) Done() bool {
	// ensure we have an accurate perspective on data,
	// as Done may be called before Next.
	db.prepareNext() // idempotent
	return db.nextData == nil
}
// Next returns the next chunk of data to be inserted into the dag.
// if it returns nil, that signifies that the stream is at an end, and
// that the current building operation should finish.
func (db *DagBuilderHelper) Next() []byte {
	db.prepareNext() // idempotent
	d := db.nextData
	db.nextData = nil // signal we've consumed it
	return d
}
// FillNodeLayer will add data nodes as children to the given node until
// at most db.maxlinks nodes are added, or until the input is exhausted.
//
// warning: **children** pinned indirectly, but input node IS NOT pinned.
func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error {

	// while we have room AND we're not done
	for node.NumChildren() < db.maxlinks && !db.Done() {
		child := NewUnixfsNode()

		if err := db.FillNodeWithData(child); err != nil {
			return err
		}

		if err := node.AddChild(child, db); err != nil {
			return err
		}
	}

	return nil
}
// FillNodeWithData sets the node's payload to the next chunk from the
// input stream. A nil chunk (end of input) is a no-op; a chunk larger
// than BlockSizeLimit is rejected with ErrSizeLimitExceeded.
func (db *DagBuilderHelper) FillNodeWithData(node *UnixfsNode) error {
	data := db.Next()
	if data == nil { // we're done!
		return nil
	}

	if len(data) > BlockSizeLimit {
		return ErrSizeLimitExceeded
	}

	node.setData(data)
	return nil
}
// Add serializes the given UnixfsNode, writes it to the DAGService and,
// when a pinner is configured, pins it recursively and flushes the
// pinner. It returns the stored dag node.
func (db *DagBuilderHelper) Add(node *UnixfsNode) (*dag.Node, error) {
	dn, err := node.GetDagNode()
	if err != nil {
		return nil, err
	}

	key, err := db.dserv.Add(dn)
	if err != nil {
		return nil, err
	}

	if db.mp != nil {
		db.mp.PinWithMode(key, pin.Recursive)
		err := db.mp.Flush()
		if err != nil {
			return nil, err
		}
	}

	return dn, nil
}
// Maxlinks returns the configured maximum number of links per node.
func (db *DagBuilderHelper) Maxlinks() int {
	return db.maxlinks
}

View File

@ -0,0 +1,99 @@
package helpers
import (
"fmt"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
dag "github.com/jbenet/go-ipfs/merkledag"
"github.com/jbenet/go-ipfs/pin"
ft "github.com/jbenet/go-ipfs/unixfs"
)
// BlockSizeLimit specifies the maximum size an imported block can have.
var BlockSizeLimit = 1048576 // 1 MB
// rough estimates on expected sizes
var roughDataBlockSize = chunk.DefaultBlockSize
var roughLinkBlockSize = 1 << 13 // 8KB
var roughLinkSize = 258 + 8 + 5 // sha256 multihash + size + no name + protobuf framing
// DefaultLinksPerBlock governs how the importer decides how many links there
// will be per block. This calculation is based on expected distributions of:
// * the expected distribution of block sizes
// * the expected distribution of link sizes
// * desired access speed
// For now, we use:
//
// var roughLinkBlockSize = 1 << 13 // 8KB
// var roughLinkSize = 288 // sha256 + framing + name
// var DefaultLinksPerBlock = (roughLinkBlockSize / roughLinkSize)
//
// See calc_test.go
var DefaultLinksPerBlock = (roughLinkBlockSize / roughLinkSize)
// ErrSizeLimitExceeded signals that a block is larger than BlockSizeLimit.
var ErrSizeLimitExceeded = fmt.Errorf("object size limit exceeded")
// UnixfsNode is a struct created to aid in the generation
// of unixfs DAG trees
type UnixfsNode struct {
	node *dag.Node      // underlying merkledag node (links live here)
	ufmt *ft.MultiBlock // unixfs metadata: data payload + child block sizes
}
// NewUnixfsNode returns an empty UnixfsNode with fresh, zero-valued
// dag node and unixfs format structures.
func NewUnixfsNode() *UnixfsNode {
	return &UnixfsNode{
		node: new(dag.Node),
		ufmt: new(ft.MultiBlock),
	}
}
// NumChildren returns the number of child blocks recorded on this node.
func (n *UnixfsNode) NumChildren() int {
	return n.ufmt.NumChildren()
}
// AddChild will add the given UnixfsNode as a child of the receiver.
// the passed in DagBuilderHelper is used to store the child node and
// pin it locally so it doesn't get lost
func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error {
	n.ufmt.AddBlockSize(child.ufmt.FileSize())

	childnode, err := child.GetDagNode()
	if err != nil {
		return err
	}

	// Add a link to this node without storing a reference to the memory
	// This way, we avoid nodes building up and consuming all of our RAM
	err = n.node.AddNodeLinkClean("", childnode)
	if err != nil {
		return err
	}

	childkey, err := db.dserv.Add(childnode)
	if err != nil {
		return err
	}

	// Pin the child node indirectly (skipped when no pinner is configured)
	if db.mp != nil {
		db.mp.PinWithMode(childkey, pin.Indirect)
	}

	return nil
}
// setData stores the raw file data in this node's unixfs format struct.
func (n *UnixfsNode) setData(data []byte) {
	n.ufmt.Data = data
}
// GetDagNode fills out the proper formatting for the unixfs node
// inside of a DAG node and returns the dag node
func (n *UnixfsNode) GetDagNode() (*dag.Node, error) {
	data, err := n.ufmt.GetBytes()
	if err != nil {
		return nil, err
	}
	n.node.Data = data
	return n.node, nil
}

View File

@ -3,70 +3,21 @@
package importer
import (
"errors"
"fmt"
"io"
"os"
bal "github.com/jbenet/go-ipfs/importer/balanced"
"github.com/jbenet/go-ipfs/importer/chunk"
h "github.com/jbenet/go-ipfs/importer/helpers"
trickle "github.com/jbenet/go-ipfs/importer/trickle"
dag "github.com/jbenet/go-ipfs/merkledag"
"github.com/jbenet/go-ipfs/pin"
ft "github.com/jbenet/go-ipfs/unixfs"
"github.com/jbenet/go-ipfs/util"
)
var log = util.Logger("importer")
// BlockSizeLimit specifies the maximum size an imported block can have.
var BlockSizeLimit = 1048576 // 1 MB
// rough estimates on expected sizes
var roughDataBlockSize = chunk.DefaultBlockSize
var roughLinkBlockSize = 1 << 13 // 8KB
var roughLinkSize = 258 + 8 + 5 // sha256 multihash + size + no name + protobuf framing
// DefaultLinksPerBlock governs how the importer decides how many links there
// will be per block. This calculation is based on expected distributions of:
// * the expected distribution of block sizes
// * the expected distribution of link sizes
// * desired access speed
// For now, we use:
//
// var roughLinkBlockSize = 1 << 13 // 8KB
// var roughLinkSize = 288 // sha256 + framing + name
// var DefaultLinksPerBlock = (roughLinkBlockSize / roughLinkSize)
//
// See calc_test.go
var DefaultLinksPerBlock = (roughLinkBlockSize / roughLinkSize)
// ErrSizeLimitExceeded signals that a block is larger than BlockSizeLimit.
var ErrSizeLimitExceeded = fmt.Errorf("object size limit exceeded")
// IndirectBlocksCopyData governs whether indirect blocks should copy over
// data from their first child, and how much. If this is 0, indirect blocks
// have no data, only links. If this is larger, Indirect blocks will copy
// as much as (maybe less than) this many bytes.
//
// This number should be <= (BlockSizeLimit - (DefaultLinksPerBlock * LinkSize))
// Note that it is not known here what the LinkSize is, because the hash function
// could vary wildly in size. Exercise caution when setting this option. For
// safety, it will be clipped to (BlockSizeLimit - (DefaultLinksPerBlock * 256))
var IndirectBlockDataSize = 0
// this check is here to ensure the conditions on IndirectBlockDataSize hold.
// returns int because it will be used as an input to `make()` later on. if
// `int` will flip over to negative, better know here.
func defaultIndirectBlockDataSize() int {
max := BlockSizeLimit - (DefaultLinksPerBlock * 256)
if IndirectBlockDataSize < max {
max = IndirectBlockDataSize
}
if max < 0 {
return 0
}
return max
}
// Builds a DAG from the given file, writing created blocks to disk as they are
// created
func BuildDagFromFile(fpath string, ds dag.DAGService, mp pin.ManualPinner) (*dag.Node, error) {
@ -88,228 +39,28 @@ func BuildDagFromFile(fpath string, ds dag.DAGService, mp pin.ManualPinner) (*da
return BuildDagFromReader(f, ds, mp, chunk.DefaultSplitter)
}
// unixfsNode is a struct created to aid in the generation
// of unixfs DAG trees
type unixfsNode struct {
node *dag.Node
ufmt *ft.MultiBlock
}
func newUnixfsNode() *unixfsNode {
return &unixfsNode{
node: new(dag.Node),
ufmt: new(ft.MultiBlock),
}
}
func (n *unixfsNode) numChildren() int {
return n.ufmt.NumChildren()
}
// addChild will add the given unixfsNode as a child of the receiver.
// the passed in dagBuilderHelper is used to store the child node an
// pin it locally so it doesnt get lost
func (n *unixfsNode) addChild(child *unixfsNode, db *dagBuilderHelper) error {
n.ufmt.AddBlockSize(child.ufmt.FileSize())
childnode, err := child.getDagNode()
if err != nil {
return err
}
// Add a link to this node without storing a reference to the memory
// This way, we avoid nodes building up and consuming all of our RAM
err = n.node.AddNodeLinkClean("", childnode)
if err != nil {
return err
}
childkey, err := db.dserv.Add(childnode)
if err != nil {
return err
}
// Pin the child node indirectly
if db.mp != nil {
db.mp.PinWithMode(childkey, pin.Indirect)
}
return nil
}
func (n *unixfsNode) setData(data []byte) {
n.ufmt.Data = data
}
// getDagNode fills out the proper formatting for the unixfs node
// inside of a DAG node and returns the dag node
func (n *unixfsNode) getDagNode() (*dag.Node, error) {
data, err := n.ufmt.GetBytes()
if err != nil {
return nil, err
}
n.node.Data = data
return n.node, nil
}
func BuildDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) {
// Start the splitter
blkch := spl.Split(r)
// Create our builder helper
db := &dagBuilderHelper{
dserv: ds,
mp: mp,
in: blkch,
maxlinks: DefaultLinksPerBlock,
indrSize: defaultIndirectBlockDataSize(),
dbp := h.DagBuilderParams{
Dagserv: ds,
Maxlinks: h.DefaultLinksPerBlock,
Pinner: mp,
}
var root *unixfsNode
for level := 0; !db.done(); level++ {
nroot := newUnixfsNode()
// add our old root as a child of the new root.
if root != nil { // nil if it's the first node.
if err := nroot.addChild(root, db); err != nil {
return nil, err
}
}
// fill it up.
if err := db.fillNodeRec(nroot, level); err != nil {
return nil, err
}
root = nroot
}
if root == nil {
root = newUnixfsNode()
}
rootnode, err := root.getDagNode()
if err != nil {
return nil, err
}
rootkey, err := ds.Add(rootnode)
if err != nil {
return nil, err
}
if mp != nil {
mp.PinWithMode(rootkey, pin.Recursive)
err := mp.Flush()
if err != nil {
return nil, err
}
}
return root.getDagNode()
return bal.BalancedLayout(dbp.New(blkch))
}
// dagBuilderHelper wraps together a bunch of objects needed to
// efficiently create unixfs dag trees
type dagBuilderHelper struct {
dserv dag.DAGService
mp pin.ManualPinner
in <-chan []byte
nextData []byte // the next item to return.
maxlinks int
indrSize int // see IndirectBlockData
}
// prepareNext consumes the next item from the channel and puts it
// in the nextData field. it is idempotent-- if nextData is full
// it will do nothing.
//
// i realized that building the dag becomes _a lot_ easier if we can
// "peek" the "are done yet?" (i.e. not consume it from the channel)
func (db *dagBuilderHelper) prepareNext() {
if db.in == nil {
// if our input is nil, there is "nothing to do". we're done.
// as if there was no data at all. (a sort of zero-value)
return
}
// if we already have data waiting to be consumed, we're ready.
if db.nextData != nil {
return
}
// if it's closed, nextData will be correctly set to nil, signaling
// that we're done consuming from the channel.
db.nextData = <-db.in
}
// done returns whether or not we're done consuming the incoming data.
func (db *dagBuilderHelper) done() bool {
// ensure we have an accurate perspective on data
// as `done` this may be called before `next`.
db.prepareNext() // idempotent
return db.nextData == nil
}
// next returns the next chunk of data to be inserted into the dag
// if it returns nil, that signifies that the stream is at an end, and
// that the current building operation should finish
func (db *dagBuilderHelper) next() []byte {
db.prepareNext() // idempotent
d := db.nextData
db.nextData = nil // signal we've consumed it
return d
}
// fillNodeRec will fill the given node with data from the dagBuilders input
// source down to an indirection depth as specified by 'depth'
// it returns the total dataSize of the node, and a potential error
//
// warning: **children** pinned indirectly, but input node IS NOT pinned.
func (db *dagBuilderHelper) fillNodeRec(node *unixfsNode, depth int) error {
if depth < 0 {
return errors.New("attempt to fillNode at depth < 0")
}
// Base case
if depth <= 0 { // catch accidental -1's in case error above is removed.
return db.fillNodeWithData(node)
}
// while we have room AND we're not done
for node.numChildren() < db.maxlinks && !db.done() {
child := newUnixfsNode()
if err := db.fillNodeRec(child, depth-1); err != nil {
return err
}
if err := node.addChild(child, db); err != nil {
return err
}
}
return nil
}
func (db *dagBuilderHelper) fillNodeWithData(node *unixfsNode) error {
data := db.next()
if data == nil { // we're done!
return nil
}
if len(data) > BlockSizeLimit {
return ErrSizeLimitExceeded
}
node.setData(data)
return nil
}
// why is intmin not in math?
func min(a, b int) int {
if a > b {
return a
}
return b
func BuildTrickleDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) {
// Start the splitter
blkch := spl.Split(r)
dbp := h.DagBuilderParams{
Dagserv: ds,
Maxlinks: h.DefaultLinksPerBlock,
Pinner: mp,
}
return trickle.TrickleLayout(dbp.New(blkch))
}

View File

@ -2,511 +2,100 @@ package importer
import (
"bytes"
"crypto/rand"
"fmt"
"io"
"io/ioutil"
mrand "math/rand"
"os"
"testing"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
bserv "github.com/jbenet/go-ipfs/blockservice"
offline "github.com/jbenet/go-ipfs/exchange/offline"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
merkledag "github.com/jbenet/go-ipfs/merkledag"
pin "github.com/jbenet/go-ipfs/pin"
dag "github.com/jbenet/go-ipfs/merkledag"
mdtest "github.com/jbenet/go-ipfs/merkledag/test"
uio "github.com/jbenet/go-ipfs/unixfs/io"
u "github.com/jbenet/go-ipfs/util"
)
//Test where calls to read are smaller than the chunk size
func TestSizeBasedSplit(t *testing.T) {
if testing.Short() {
t.SkipNow()
func getBalancedDag(t testing.TB, size int64) (*dag.Node, dag.DAGService) {
ds := mdtest.Mock(t)
r := io.LimitReader(u.NewTimeSeededRand(), size)
nd, err := BuildDagFromReader(r, ds, nil, chunk.DefaultSplitter)
if err != nil {
t.Fatal(err)
}
bs := &chunk.SizeSplitter{Size: 512}
testFileConsistency(t, bs, 32*512)
bs = &chunk.SizeSplitter{Size: 4096}
testFileConsistency(t, bs, 32*4096)
// Uneven offset
testFileConsistency(t, bs, 31*4095)
return nd, ds
}
func dup(b []byte) []byte {
o := make([]byte, len(b))
copy(o, b)
return o
func getTrickleDag(t testing.TB, size int64) (*dag.Node, dag.DAGService) {
ds := mdtest.Mock(t)
r := io.LimitReader(u.NewTimeSeededRand(), size)
nd, err := BuildTrickleDagFromReader(r, ds, nil, chunk.DefaultSplitter)
if err != nil {
t.Fatal(err)
}
return nd, ds
}
func testFileConsistency(t *testing.T, bs chunk.BlockSplitter, nbytes int) {
should := make([]byte, nbytes)
u.NewTimeSeededRand().Read(should)
read := bytes.NewReader(should)
dnp := getDagservAndPinner(t)
nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, bs)
if err != nil {
t.Fatal(err)
}
r, err := uio.NewDagReader(context.Background(), nd, dnp.ds)
if err != nil {
t.Fatal(err)
}
out, err := ioutil.ReadAll(r)
if err != nil {
t.Fatal(err)
}
err = arrComp(out, should)
if err != nil {
t.Fatal(err)
}
}
func TestBuilderConsistency(t *testing.T) {
nbytes := 100000
buf := new(bytes.Buffer)
io.CopyN(buf, u.NewTimeSeededRand(), int64(nbytes))
should := dup(buf.Bytes())
dagserv := merkledag.Mock(t)
nd, err := BuildDagFromReader(buf, dagserv, nil, chunk.DefaultSplitter)
if err != nil {
t.Fatal(err)
}
r, err := uio.NewDagReader(context.Background(), nd, dagserv)
if err != nil {
t.Fatal(err)
}
out, err := ioutil.ReadAll(r)
if err != nil {
t.Fatal(err)
}
err = arrComp(out, should)
if err != nil {
t.Fatal(err)
}
}
func TestTrickleBuilderConsistency(t *testing.T) {
nbytes := 100000
buf := new(bytes.Buffer)
io.CopyN(buf, u.NewTimeSeededRand(), int64(nbytes))
should := dup(buf.Bytes())
dagserv := merkledag.Mock(t)
nd, err := BuildTrickleDagFromReader(buf, dagserv, nil, chunk.DefaultSplitter)
if err != nil {
t.Fatal(err)
}
r, err := uio.NewDagReader(context.Background(), nd, dagserv)
if err != nil {
t.Fatal(err)
}
out, err := ioutil.ReadAll(r)
if err != nil {
t.Fatal(err)
}
err = arrComp(out, should)
if err != nil {
t.Fatal(err)
}
}
func arrComp(a, b []byte) error {
if len(a) != len(b) {
return fmt.Errorf("Arrays differ in length. %d != %d", len(a), len(b))
}
for i, v := range a {
if v != b[i] {
return fmt.Errorf("Arrays differ at index: %d", i)
}
}
return nil
}
func TestMaybeRabinConsistency(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
testFileConsistency(t, chunk.NewMaybeRabin(4096), 256*4096)
}
func TestRabinBlockSize(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
buf := new(bytes.Buffer)
nbytes := 1024 * 1024
io.CopyN(buf, rand.Reader, int64(nbytes))
rab := chunk.NewMaybeRabin(4096)
blkch := rab.Split(buf)
var blocks [][]byte
for b := range blkch {
blocks = append(blocks, b)
}
fmt.Printf("Avg block size: %d\n", nbytes/len(blocks))
}
type dagservAndPinner struct {
ds merkledag.DAGService
mp pin.ManualPinner
}
func getDagservAndPinner(t *testing.T) dagservAndPinner {
db := dssync.MutexWrap(ds.NewMapDatastore())
bs := bstore.NewBlockstore(db)
blockserv, err := bserv.New(bs, offline.Exchange(bs))
if err != nil {
t.Fatal(err)
}
dserv := merkledag.NewDAGService(blockserv)
mpin := pin.NewPinner(db, dserv).GetManual()
return dagservAndPinner{
ds: dserv,
mp: mpin,
}
}
func TestIndirectBlocks(t *testing.T) {
splitter := &chunk.SizeSplitter{512}
nbytes := 1024 * 1024
buf := make([]byte, nbytes)
func TestBalancedDag(t *testing.T) {
ds := mdtest.Mock(t)
buf := make([]byte, 10000)
u.NewTimeSeededRand().Read(buf)
r := bytes.NewReader(buf)
read := bytes.NewReader(buf)
dnp := getDagservAndPinner(t)
dag, err := BuildDagFromReader(read, dnp.ds, dnp.mp, splitter)
nd, err := BuildDagFromReader(r, ds, nil, chunk.DefaultSplitter)
if err != nil {
t.Fatal(err)
}
reader, err := uio.NewDagReader(context.Background(), dag, dnp.ds)
dr, err := uio.NewDagReader(context.TODO(), nd, ds)
if err != nil {
t.Fatal(err)
}
out, err := ioutil.ReadAll(reader)
out, err := ioutil.ReadAll(dr)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(out, buf) {
t.Fatal("Not equal!")
t.Fatal("bad read")
}
}
func TestSeekingBasic(t *testing.T) {
nbytes := int64(10 * 1024)
should := make([]byte, nbytes)
u.NewTimeSeededRand().Read(should)
func BenchmarkBalancedRead(b *testing.B) {
b.StopTimer()
nd, ds := getBalancedDag(b, int64(b.N))
read := bytes.NewReader(should)
dnp := getDagservAndPinner(t)
nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, &chunk.SizeSplitter{500})
read, err := uio.NewDagReader(context.TODO(), nd, ds)
if err != nil {
t.Fatal(err)
b.Fatal(err)
}
rs, err := uio.NewDagReader(context.Background(), nd, dnp.ds)
b.StartTimer()
b.SetBytes(int64(b.N))
n, err := io.Copy(ioutil.Discard, read)
if err != nil {
t.Fatal(err)
b.Fatal(err)
}
start := int64(4000)
n, err := rs.Seek(start, os.SEEK_SET)
if err != nil {
t.Fatal(err)
}
if n != start {
t.Fatal("Failed to seek to correct offset")
}
out, err := ioutil.ReadAll(rs)
if err != nil {
t.Fatal(err)
}
err = arrComp(out, should[start:])
if err != nil {
t.Fatal(err)
if n != int64(b.N) {
b.Fatal("Failed to read correct amount")
}
}
func TestTrickleSeekingBasic(t *testing.T) {
nbytes := int64(10 * 1024)
should := make([]byte, nbytes)
u.NewTimeSeededRand().Read(should)
func BenchmarkTrickleRead(b *testing.B) {
b.StopTimer()
nd, ds := getTrickleDag(b, int64(b.N))
read := bytes.NewReader(should)
dnp := getDagservAndPinner(t)
nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, &chunk.SizeSplitter{500})
read, err := uio.NewDagReader(context.TODO(), nd, ds)
if err != nil {
t.Fatal(err)
b.Fatal(err)
}
rs, err := uio.NewDagReader(context.Background(), nd, dnp.ds)
b.StartTimer()
b.SetBytes(int64(b.N))
n, err := io.Copy(new(bytes.Buffer), read)
if err != nil {
t.Fatal(err)
b.Fatal(err)
}
start := int64(4000)
n, err := rs.Seek(start, os.SEEK_SET)
if err != nil {
t.Fatal(err)
}
if n != start {
t.Fatal("Failed to seek to correct offset")
}
out, err := ioutil.ReadAll(rs)
if err != nil {
t.Fatal(err)
}
err = arrComp(out, should[start:])
if err != nil {
t.Fatal(err)
}
}
func TestSeekToBegin(t *testing.T) {
nbytes := int64(10 * 1024)
should := make([]byte, nbytes)
u.NewTimeSeededRand().Read(should)
read := bytes.NewReader(should)
dnp := getDagservAndPinner(t)
nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, &chunk.SizeSplitter{500})
if err != nil {
t.Fatal(err)
}
rs, err := uio.NewDagReader(context.Background(), nd, dnp.ds)
if err != nil {
t.Fatal(err)
}
n, err := io.CopyN(ioutil.Discard, rs, 1024*4)
if err != nil {
t.Fatal(err)
}
if n != 4096 {
t.Fatal("Copy didnt copy enough bytes")
}
seeked, err := rs.Seek(0, os.SEEK_SET)
if err != nil {
t.Fatal(err)
}
if seeked != 0 {
t.Fatal("Failed to seek to beginning")
}
out, err := ioutil.ReadAll(rs)
if err != nil {
t.Fatal(err)
}
err = arrComp(out, should)
if err != nil {
t.Fatal(err)
}
}
func TestSeekToAlmostBegin(t *testing.T) {
nbytes := int64(10 * 1024)
should := make([]byte, nbytes)
u.NewTimeSeededRand().Read(should)
read := bytes.NewReader(should)
dnp := getDagservAndPinner(t)
nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, &chunk.SizeSplitter{500})
if err != nil {
t.Fatal(err)
}
rs, err := uio.NewDagReader(context.Background(), nd, dnp.ds)
if err != nil {
t.Fatal(err)
}
n, err := io.CopyN(ioutil.Discard, rs, 1024*4)
if err != nil {
t.Fatal(err)
}
if n != 4096 {
t.Fatal("Copy didnt copy enough bytes")
}
seeked, err := rs.Seek(1, os.SEEK_SET)
if err != nil {
t.Fatal(err)
}
if seeked != 1 {
t.Fatal("Failed to seek to almost beginning")
}
out, err := ioutil.ReadAll(rs)
if err != nil {
t.Fatal(err)
}
err = arrComp(out, should[1:])
if err != nil {
t.Fatal(err)
}
}
func TestSeekEnd(t *testing.T) {
nbytes := int64(50 * 1024)
should := make([]byte, nbytes)
u.NewTimeSeededRand().Read(should)
read := bytes.NewReader(should)
dnp := getDagservAndPinner(t)
nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, &chunk.SizeSplitter{500})
if err != nil {
t.Fatal(err)
}
rs, err := uio.NewDagReader(context.Background(), nd, dnp.ds)
if err != nil {
t.Fatal(err)
}
seeked, err := rs.Seek(0, os.SEEK_END)
if err != nil {
t.Fatal(err)
}
if seeked != nbytes {
t.Fatal("Failed to seek to end")
}
}
func TestSeekEndSingleBlockFile(t *testing.T) {
nbytes := int64(100)
should := make([]byte, nbytes)
u.NewTimeSeededRand().Read(should)
read := bytes.NewReader(should)
dnp := getDagservAndPinner(t)
nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, &chunk.SizeSplitter{5000})
if err != nil {
t.Fatal(err)
}
rs, err := uio.NewDagReader(context.Background(), nd, dnp.ds)
if err != nil {
t.Fatal(err)
}
seeked, err := rs.Seek(0, os.SEEK_END)
if err != nil {
t.Fatal(err)
}
if seeked != nbytes {
t.Fatal("Failed to seek to end")
}
}
func TestSeekingStress(t *testing.T) {
nbytes := int64(1024 * 1024)
should := make([]byte, nbytes)
u.NewTimeSeededRand().Read(should)
read := bytes.NewReader(should)
dnp := getDagservAndPinner(t)
nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, &chunk.SizeSplitter{1000})
if err != nil {
t.Fatal(err)
}
rs, err := uio.NewDagReader(context.Background(), nd, dnp.ds)
if err != nil {
t.Fatal(err)
}
testbuf := make([]byte, nbytes)
for i := 0; i < 50; i++ {
offset := mrand.Intn(int(nbytes))
l := int(nbytes) - offset
n, err := rs.Seek(int64(offset), os.SEEK_SET)
if err != nil {
t.Fatal(err)
}
if n != int64(offset) {
t.Fatal("Seek failed to move to correct position")
}
nread, err := rs.Read(testbuf[:l])
if err != nil {
t.Fatal(err)
}
if nread != l {
t.Fatal("Failed to read enough bytes")
}
err = arrComp(testbuf[:l], should[offset:offset+l])
if err != nil {
t.Fatal(err)
}
}
}
func TestSeekingConsistency(t *testing.T) {
nbytes := int64(128 * 1024)
should := make([]byte, nbytes)
u.NewTimeSeededRand().Read(should)
read := bytes.NewReader(should)
dnp := getDagservAndPinner(t)
nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, &chunk.SizeSplitter{500})
if err != nil {
t.Fatal(err)
}
rs, err := uio.NewDagReader(context.Background(), nd, dnp.ds)
if err != nil {
t.Fatal(err)
}
out := make([]byte, nbytes)
for coff := nbytes - 4096; coff >= 0; coff -= 4096 {
t.Log(coff)
n, err := rs.Seek(coff, os.SEEK_SET)
if err != nil {
t.Fatal(err)
}
if n != coff {
t.Fatal("wasnt able to seek to the right position")
}
nread, err := rs.Read(out[coff : coff+4096])
if err != nil {
t.Fatal(err)
}
if nread != 4096 {
t.Fatal("didnt read the correct number of bytes")
}
}
err = arrComp(out, should)
if err != nil {
t.Fatal(err)
if n != int64(b.N) {
b.Fatal("Failed to read correct amount")
}
}

View File

@ -0,0 +1,443 @@
package trickle
import (
"bytes"
"crypto/rand"
"fmt"
"io"
"io/ioutil"
mrand "math/rand"
"os"
"testing"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
h "github.com/jbenet/go-ipfs/importer/helpers"
merkledag "github.com/jbenet/go-ipfs/merkledag"
mdtest "github.com/jbenet/go-ipfs/merkledag/test"
pin "github.com/jbenet/go-ipfs/pin"
uio "github.com/jbenet/go-ipfs/unixfs/io"
u "github.com/jbenet/go-ipfs/util"
)
// buildTestDag chunks r with spl and assembles the resulting blocks
// into a trickle-layout dag stored in ds, returning the root node.
func buildTestDag(r io.Reader, ds merkledag.DAGService, spl chunk.BlockSplitter) (*merkledag.Node, error) {
	// Feed the splitter's chunk stream straight into a dag builder.
	params := h.DagBuilderParams{
		Dagserv:  ds,
		Maxlinks: h.DefaultLinksPerBlock,
	}
	return TrickleLayout(params.New(spl.Split(r)))
}
//Test where calls to read are smaller than the chunk size
func TestSizeBasedSplit(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
bs := &chunk.SizeSplitter{Size: 512}
testFileConsistency(t, bs, 32*512)
bs = &chunk.SizeSplitter{Size: 4096}
testFileConsistency(t, bs, 32*4096)
// Uneven offset
testFileConsistency(t, bs, 31*4095)
}
// dup returns an independent copy of b so later mutation of the
// source slice cannot affect the returned data.
func dup(b []byte) []byte {
	return append([]byte(nil), b...)
}
// testFileConsistency builds a dag from nbytes of random data using bs
// and verifies that reading the dag back yields the identical bytes.
func testFileConsistency(t *testing.T, bs chunk.BlockSplitter, nbytes int) {
	want := make([]byte, nbytes)
	u.NewTimeSeededRand().Read(want)
	dagserv := mdtest.Mock(t)
	nd, err := buildTestDag(bytes.NewReader(want), dagserv, bs)
	if err != nil {
		t.Fatal(err)
	}
	dr, err := uio.NewDagReader(context.Background(), nd, dagserv)
	if err != nil {
		t.Fatal(err)
	}
	got, err := ioutil.ReadAll(dr)
	if err != nil {
		t.Fatal(err)
	}
	if err := arrComp(got, want); err != nil {
		t.Fatal(err)
	}
}
// TestBuilderConsistency round-trips 100k of random data through the
// default splitter and checks the dag reader returns it unchanged.
func TestBuilderConsistency(t *testing.T) {
	nbytes := 100000
	data := new(bytes.Buffer)
	io.CopyN(data, u.NewTimeSeededRand(), int64(nbytes))
	want := dup(data.Bytes())
	ds := mdtest.Mock(t)
	root, err := buildTestDag(data, ds, chunk.DefaultSplitter)
	if err != nil {
		t.Fatal(err)
	}
	reader, err := uio.NewDagReader(context.Background(), root, ds)
	if err != nil {
		t.Fatal(err)
	}
	got, err := ioutil.ReadAll(reader)
	if err != nil {
		t.Fatal(err)
	}
	if err := arrComp(got, want); err != nil {
		t.Fatal(err)
	}
}
// arrComp returns a descriptive error when a and b differ in length or
// content; it returns nil only when the two byte slices are identical.
func arrComp(a, b []byte) error {
	if len(a) != len(b) {
		return fmt.Errorf("Arrays differ in length. %d != %d", len(a), len(b))
	}
	for i := 0; i < len(a); i++ {
		if a[i] != b[i] {
			return fmt.Errorf("Arrays differ at index: %d", i)
		}
	}
	return nil
}
// TestMaybeRabinConsistency checks file round-trip consistency when
// chunking with the rabin fingerprint splitter.
func TestMaybeRabinConsistency(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
	const avgBlock = 4096
	testFileConsistency(t, chunk.NewMaybeRabin(avgBlock), 256*avgBlock)
}
// TestRabinBlockSize splits 1MiB of random data with the rabin
// splitter and reports the resulting average block size.
func TestRabinBlockSize(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
	buf := new(bytes.Buffer)
	nbytes := 1024 * 1024
	// Check the copy error: a short random read would silently skew
	// the average below.
	if _, err := io.CopyN(buf, rand.Reader, int64(nbytes)); err != nil {
		t.Fatal(err)
	}
	rab := chunk.NewMaybeRabin(4096)
	blkch := rab.Split(buf)
	var blocks [][]byte
	for b := range blkch {
		blocks = append(blocks, b)
	}
	// Guard the division: a broken splitter must not crash the test
	// with a divide-by-zero.
	if len(blocks) == 0 {
		t.Fatal("splitter produced no blocks")
	}
	// t.Logf keeps the diagnostic in the test framework's output
	// instead of writing straight to stdout.
	t.Logf("Avg block size: %d\n", nbytes/len(blocks))
}
// dagservAndPinner bundles a DAGService with its manual pinner so test
// helpers can hand both back from a single constructor.
type dagservAndPinner struct {
	ds merkledag.DAGService
	mp pin.ManualPinner
}
// TestIndirectBlocks builds a dag large enough (1MiB at 512-byte
// chunks) to exercise indirect blocks and verifies a full read-back.
func TestIndirectBlocks(t *testing.T) {
	spl := &chunk.SizeSplitter{512}
	nbytes := 1024 * 1024
	want := make([]byte, nbytes)
	u.NewTimeSeededRand().Read(want)
	ds := mdtest.Mock(t)
	root, err := buildTestDag(bytes.NewReader(want), ds, spl)
	if err != nil {
		t.Fatal(err)
	}
	dr, err := uio.NewDagReader(context.Background(), root, ds)
	if err != nil {
		t.Fatal(err)
	}
	got, err := ioutil.ReadAll(dr)
	if err != nil {
		t.Fatal(err)
	}
	if !bytes.Equal(got, want) {
		t.Fatal("Not equal!")
	}
}
// TestSeekingBasic seeks to a mid-file offset and verifies the
// remainder of the stream matches the source data.
func TestSeekingBasic(t *testing.T) {
	nbytes := int64(10 * 1024)
	data := make([]byte, nbytes)
	u.NewTimeSeededRand().Read(data)
	ds := mdtest.Mock(t)
	nd, err := buildTestDag(bytes.NewReader(data), ds, &chunk.SizeSplitter{500})
	if err != nil {
		t.Fatal(err)
	}
	rs, err := uio.NewDagReader(context.Background(), nd, ds)
	if err != nil {
		t.Fatal(err)
	}
	const start = int64(4000)
	pos, err := rs.Seek(start, os.SEEK_SET)
	if err != nil {
		t.Fatal(err)
	}
	if pos != start {
		t.Fatal("Failed to seek to correct offset")
	}
	rest, err := ioutil.ReadAll(rs)
	if err != nil {
		t.Fatal(err)
	}
	if err := arrComp(rest, data[start:]); err != nil {
		t.Fatal(err)
	}
}
// TestSeekToBegin consumes part of the stream, rewinds to offset zero,
// and checks that a subsequent full read returns every byte.
func TestSeekToBegin(t *testing.T) {
	nbytes := int64(10 * 1024)
	data := make([]byte, nbytes)
	u.NewTimeSeededRand().Read(data)
	ds := mdtest.Mock(t)
	nd, err := buildTestDag(bytes.NewReader(data), ds, &chunk.SizeSplitter{500})
	if err != nil {
		t.Fatal(err)
	}
	rs, err := uio.NewDagReader(context.Background(), nd, ds)
	if err != nil {
		t.Fatal(err)
	}
	copied, err := io.CopyN(ioutil.Discard, rs, 1024*4)
	if err != nil {
		t.Fatal(err)
	}
	if copied != 4096 {
		t.Fatal("Copy didnt copy enough bytes")
	}
	pos, err := rs.Seek(0, os.SEEK_SET)
	if err != nil {
		t.Fatal(err)
	}
	if pos != 0 {
		t.Fatal("Failed to seek to beginning")
	}
	got, err := ioutil.ReadAll(rs)
	if err != nil {
		t.Fatal(err)
	}
	if err := arrComp(got, data); err != nil {
		t.Fatal(err)
	}
}
// TestSeekToAlmostBegin consumes part of the stream, seeks back to
// offset one, and checks the rest of the read skips exactly one byte.
func TestSeekToAlmostBegin(t *testing.T) {
	nbytes := int64(10 * 1024)
	data := make([]byte, nbytes)
	u.NewTimeSeededRand().Read(data)
	ds := mdtest.Mock(t)
	nd, err := buildTestDag(bytes.NewReader(data), ds, &chunk.SizeSplitter{500})
	if err != nil {
		t.Fatal(err)
	}
	rs, err := uio.NewDagReader(context.Background(), nd, ds)
	if err != nil {
		t.Fatal(err)
	}
	copied, err := io.CopyN(ioutil.Discard, rs, 1024*4)
	if err != nil {
		t.Fatal(err)
	}
	if copied != 4096 {
		t.Fatal("Copy didnt copy enough bytes")
	}
	pos, err := rs.Seek(1, os.SEEK_SET)
	if err != nil {
		t.Fatal(err)
	}
	if pos != 1 {
		t.Fatal("Failed to seek to almost beginning")
	}
	got, err := ioutil.ReadAll(rs)
	if err != nil {
		t.Fatal(err)
	}
	if err := arrComp(got, data[1:]); err != nil {
		t.Fatal(err)
	}
}
// TestSeekEnd verifies that SEEK_END reports the full file size for a
// multi-block file.
func TestSeekEnd(t *testing.T) {
	nbytes := int64(50 * 1024)
	data := make([]byte, nbytes)
	u.NewTimeSeededRand().Read(data)
	ds := mdtest.Mock(t)
	nd, err := buildTestDag(bytes.NewReader(data), ds, &chunk.SizeSplitter{500})
	if err != nil {
		t.Fatal(err)
	}
	rs, err := uio.NewDagReader(context.Background(), nd, ds)
	if err != nil {
		t.Fatal(err)
	}
	offset, err := rs.Seek(0, os.SEEK_END)
	if err != nil {
		t.Fatal(err)
	}
	if offset != nbytes {
		t.Fatal("Failed to seek to end")
	}
}
// TestSeekEndSingleBlockFile verifies SEEK_END on a file small enough
// to fit in one block (100 bytes under a 5000-byte chunk size).
func TestSeekEndSingleBlockFile(t *testing.T) {
	nbytes := int64(100)
	data := make([]byte, nbytes)
	u.NewTimeSeededRand().Read(data)
	ds := mdtest.Mock(t)
	nd, err := buildTestDag(bytes.NewReader(data), ds, &chunk.SizeSplitter{5000})
	if err != nil {
		t.Fatal(err)
	}
	rs, err := uio.NewDagReader(context.Background(), nd, ds)
	if err != nil {
		t.Fatal(err)
	}
	offset, err := rs.Seek(0, os.SEEK_END)
	if err != nil {
		t.Fatal(err)
	}
	if offset != nbytes {
		t.Fatal("Failed to seek to end")
	}
}
// TestSeekingStress performs 50 random seeks into a 1MiB dag and
// verifies the data read from each offset through EOF.
func TestSeekingStress(t *testing.T) {
	nbytes := int64(1024 * 1024)
	should := make([]byte, nbytes)
	u.NewTimeSeededRand().Read(should)
	read := bytes.NewReader(should)
	ds := mdtest.Mock(t)
	nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{1000})
	if err != nil {
		t.Fatal(err)
	}
	rs, err := uio.NewDagReader(context.Background(), nd, ds)
	if err != nil {
		t.Fatal(err)
	}
	testbuf := make([]byte, nbytes)
	for i := 0; i < 50; i++ {
		offset := mrand.Intn(int(nbytes))
		l := int(nbytes) - offset
		n, err := rs.Seek(int64(offset), os.SEEK_SET)
		if err != nil {
			t.Fatal(err)
		}
		if n != int64(offset) {
			t.Fatal("Seek failed to move to correct position")
		}
		// io.ReadFull retries short reads; a bare rs.Read may legally
		// return fewer than l bytes without an error, which previously
		// made this check flaky.
		nread, err := io.ReadFull(rs, testbuf[:l])
		if err != nil {
			t.Fatal(err)
		}
		if nread != l {
			t.Fatal("Failed to read enough bytes")
		}
		err = arrComp(testbuf[:l], should[offset:offset+l])
		if err != nil {
			t.Fatal(err)
		}
	}
}
// TestSeekingConsistency reads a 128KiB dag backwards in 4KiB windows,
// seeking before each read, then compares the reassembled buffer.
func TestSeekingConsistency(t *testing.T) {
	nbytes := int64(128 * 1024)
	should := make([]byte, nbytes)
	u.NewTimeSeededRand().Read(should)
	read := bytes.NewReader(should)
	ds := mdtest.Mock(t)
	nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500})
	if err != nil {
		t.Fatal(err)
	}
	rs, err := uio.NewDagReader(context.Background(), nd, ds)
	if err != nil {
		t.Fatal(err)
	}
	out := make([]byte, nbytes)
	for coff := nbytes - 4096; coff >= 0; coff -= 4096 {
		t.Log(coff)
		n, err := rs.Seek(coff, os.SEEK_SET)
		if err != nil {
			t.Fatal(err)
		}
		if n != coff {
			t.Fatal("wasnt able to seek to the right position")
		}
		// io.ReadFull guards against legal short reads from rs.Read,
		// which would otherwise trip the length check spuriously.
		nread, err := io.ReadFull(rs, out[coff:coff+4096])
		if err != nil {
			t.Fatal(err)
		}
		if nread != 4096 {
			t.Fatal("didnt read the correct number of bytes")
		}
	}
	err = arrComp(out, should)
	if err != nil {
		t.Fatal(err)
	}
}

View File

@ -0,0 +1,58 @@
package trickle
import (
h "github.com/jbenet/go-ipfs/importer/helpers"
dag "github.com/jbenet/go-ipfs/merkledag"
)
// layerRepeat specifies how many times to append a child tree of a
// given depth. Higher values increase the width of a given node, which
// improves seek speeds.
const layerRepeat = 4
// TrickleLayout builds a trickle-format dag from the chunks supplied
// by db: the root first receives a direct layer of leaf blocks, then
// layerRepeat subtrees of each increasing depth are appended until the
// input is exhausted. The finished root is persisted via db.Add.
func TrickleLayout(db *h.DagBuilderHelper) (*dag.Node, error) {
	root := h.NewUnixfsNode()
	// The root always carries a full leaf layer before any subtrees,
	// keeping the start of the file shallow for fast access.
	err := db.FillNodeLayer(root)
	if err != nil {
		return nil, err
	}
	// Each level appends up to layerRepeat subtrees of depth `level`,
	// stopping as soon as the chunk stream runs dry.
	for level := 1; !db.Done(); level++ {
		for i := 0; i < layerRepeat && !db.Done(); i++ {
			next := h.NewUnixfsNode()
			err := fillTrickleRec(db, next, level)
			if err != nil {
				return nil, err
			}
			err = root.AddChild(next, db)
			if err != nil {
				return nil, err
			}
		}
	}
	// Persist the assembled root node into the dag service.
	return db.Add(root)
}
// fillTrickleRec recursively populates node as a trickle subtree of
// the given depth: a direct leaf layer first, then layerRepeat
// subtrees for every depth below it, stopping early when the input
// runs out.
func fillTrickleRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error {
	// Always do this, even in the base case
	err := db.FillNodeLayer(node)
	if err != nil {
		return err
	}
	for i := 1; i < depth && !db.Done(); i++ {
		for j := 0; j < layerRepeat; j++ {
			next := h.NewUnixfsNode()
			err := fillTrickleRec(db, next, i)
			if err != nil {
				return err
			}
			err = node.AddChild(next, db)
			if err != nil {
				return err
			}
		}
	}
	return nil
}

View File

@ -1,91 +0,0 @@
package importer
import (
"io"
"github.com/jbenet/go-ipfs/importer/chunk"
dag "github.com/jbenet/go-ipfs/merkledag"
"github.com/jbenet/go-ipfs/pin"
)
// layerRepeat specifies how many times to append a child tree of a
// given depth. Higher values increase the width of a given node, which
// improves seek speeds.
const layerRepeat = 4
// BuildTrickleDagFromReader chunks r with spl and assembles the blocks
// into a trickle-layout dag stored in ds. When mp is non-nil the root
// node is recursively pinned (and the pin state flushed) before the
// call returns.
func BuildTrickleDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) {
	// Start the splitter
	blkch := spl.Split(r)
	// Create our builder helper
	db := &dagBuilderHelper{
		dserv:    ds,
		mp:       mp,
		in:       blkch,
		maxlinks: DefaultLinksPerBlock,
		indrSize: defaultIndirectBlockDataSize(),
	}
	root := newUnixfsNode()
	// Depth-1 fill gives the root a direct layer of leaf blocks.
	err := db.fillNodeRec(root, 1)
	if err != nil {
		return nil, err
	}
	// Append layerRepeat subtrees per level until the input runs dry.
	for level := 1; !db.done(); level++ {
		for i := 0; i < layerRepeat && !db.done(); i++ {
			next := newUnixfsNode()
			err := db.fillTrickleRec(next, level)
			if err != nil {
				return nil, err
			}
			err = root.addChild(next, db)
			if err != nil {
				return nil, err
			}
		}
	}
	rootnode, err := root.getDagNode()
	if err != nil {
		return nil, err
	}
	// Adding the node yields its key, which the pinner needs below.
	rootkey, err := ds.Add(rootnode)
	if err != nil {
		return nil, err
	}
	// Pinning is optional; a nil pinner skips it entirely.
	if mp != nil {
		mp.PinWithMode(rootkey, pin.Recursive)
		err := mp.Flush()
		if err != nil {
			return nil, err
		}
	}
	return root.getDagNode()
}
// fillTrickleRec recursively populates node as a trickle subtree of
// the given depth: a direct leaf layer first, then layerRepeat
// subtrees for every depth below it, stopping early when the input
// runs out.
func (db *dagBuilderHelper) fillTrickleRec(node *unixfsNode, depth int) error {
	// Always do this, even in the base case
	err := db.fillNodeRec(node, 1)
	if err != nil {
		return err
	}
	for i := 1; i < depth && !db.done(); i++ {
		for j := 0; j < layerRepeat; j++ {
			next := newUnixfsNode()
			err := db.fillTrickleRec(next, i)
			if err != nil {
				return err
			}
			err = node.addChild(next, db)
			if err != nil {
				return err
			}
		}
	}
	return nil
}

View File

@ -1,4 +1,4 @@
package merkledag
package mdutils
import (
"testing"
@ -8,13 +8,14 @@ import (
"github.com/jbenet/go-ipfs/blocks/blockstore"
bsrv "github.com/jbenet/go-ipfs/blockservice"
"github.com/jbenet/go-ipfs/exchange/offline"
dag "github.com/jbenet/go-ipfs/merkledag"
)
func Mock(t testing.TB) DAGService {
func Mock(t testing.TB) dag.DAGService {
bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
bserv, err := bsrv.New(bstore, offline.Exchange(bstore))
if err != nil {
t.Fatal(err)
}
return NewDAGService(bserv)
return dag.NewDAGService(bserv)
}