
remove dagwriter in favor of new importer function

Author: Jeromy
Date:   2014-10-31 03:20:26 +00:00
Parent: 01a583ea7f
Commit: ae339a0320

9 changed files with 83 additions and 314 deletions
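
The change replaces the push-style DagWriter (an io.Writer that had to be closed before GetNode() would return the root) with one-shot importer functions that take the DAGService, an optional pin.ManualPinner, and a chunk.BlockSplitter directly. Below is a minimal before/after sketch for callers, built only from the signatures visible in this diff; the addStreamOld/addStreamNew helper names are illustrative, and the two halves cannot compile against a single revision since the writer is gone after this commit:

    import (
        "io"

        "github.com/jbenet/go-ipfs/importer"
        "github.com/jbenet/go-ipfs/importer/chunk"
        dag "github.com/jbenet/go-ipfs/merkledag"
        "github.com/jbenet/go-ipfs/pin"
        uio "github.com/jbenet/go-ipfs/unixfs/io" // DagWriter lived here; removed by this commit
    )

    // Before: stream into a DagWriter, close it, then fetch the root node.
    func addStreamOld(ds dag.DAGService, mp pin.ManualPinner, r io.Reader) (*dag.Node, error) {
        dw := uio.NewDagWriter(ds, chunk.DefaultSplitter)
        dw.Pinner = mp
        if _, err := io.Copy(dw, r); err != nil {
            return nil, err
        }
        // Close was mandatory: it ends the splitter goroutine and finalizes the root.
        if err := dw.Close(); err != nil {
            return nil, err
        }
        return dw.GetNode(), nil
    }

    // After: a single call splits, stores, pins, and assembles the root.
    func addStreamNew(ds dag.DAGService, mp pin.ManualPinner, r io.Reader) (*dag.Node, error) {
        return importer.BuildDagFromReader(r, ds, mp, chunk.DefaultSplitter)
    }

Files get the same treatment via the new BuildDagFromFile(fpath, ds, mp), which stats and opens the path and then delegates to BuildDagFromReader with the default splitter.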

Godeps/Godeps.json (generated)

@@ -1,6 +1,6 @@
{
"ImportPath": "github.com/jbenet/go-ipfs",
"GoVersion": "go1.3",
"GoVersion": "go1.3.3",
"Packages": [
"./..."
],

core/commands/add.go

@@ -10,11 +10,9 @@ import (
"github.com/jbenet/go-ipfs/core"
"github.com/jbenet/go-ipfs/importer"
"github.com/jbenet/go-ipfs/importer/chunk"
dag "github.com/jbenet/go-ipfs/merkledag"
"github.com/jbenet/go-ipfs/pin"
ft "github.com/jbenet/go-ipfs/unixfs"
uio "github.com/jbenet/go-ipfs/unixfs/io"
)
// Error indicating the max depth has been exceeded.
@@ -90,14 +88,12 @@ func addDir(n *core.IpfsNode, fpath string, depth int, out io.Writer) (*dag.Node
}
func addFile(n *core.IpfsNode, fpath string, depth int, out io.Writer) (*dag.Node, error) {
dw := uio.NewDagWriter(n.DAG, chunk.DefaultSplitter)
mp, ok := n.Pinning.(pin.ManualPinner)
if !ok {
return nil, errors.New("invalid pinner type! expected manual pinner")
}
dw.Pinner = mp
root, err := importer.ImportFileDag(fpath, dw)
root, err := importer.BuildDagFromFile(fpath, n.DAG, mp)
if err != nil {
return nil, err
}

importer/importer.go

@@ -7,8 +7,8 @@ import (
"github.com/jbenet/go-ipfs/importer/chunk"
dag "github.com/jbenet/go-ipfs/merkledag"
"github.com/jbenet/go-ipfs/pin"
ft "github.com/jbenet/go-ipfs/unixfs"
uio "github.com/jbenet/go-ipfs/unixfs/io"
"github.com/jbenet/go-ipfs/util"
)
@@ -74,7 +74,7 @@ func NewDagFromFile(fpath string) (*dag.Node, error) {
return NewDagFromReader(f)
}
func ImportFileDag(fpath string, dw *uio.DagWriter) (*dag.Node, error) {
func BuildDagFromFile(fpath string, ds dag.DAGService, mp pin.ManualPinner) (*dag.Node, error) {
stat, err := os.Stat(fpath)
if err != nil {
return nil, err
@@ -90,17 +90,50 @@ func ImportFileDag(fpath string, dw *uio.DagWriter) (*dag.Node, error) {
}
defer f.Close()
return ImportReaderDag(f, dw)
return BuildDagFromReader(f, ds, mp, chunk.DefaultSplitter)
}
func ImportReaderDag(r io.Reader, dw *uio.DagWriter) (*dag.Node, error) {
_, err := io.Copy(dw, r)
func BuildDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) {
blkChan := spl.Split(r)
first := <-blkChan
root := &dag.Node{}
mbf := new(ft.MultiBlock)
for blk := range blkChan {
// Store the block size in the root node
mbf.AddBlockSize(uint64(len(blk)))
node := &dag.Node{Data: ft.WrapData(blk)}
nk, err := ds.Add(node)
if mp != nil {
mp.PinWithMode(nk, pin.Indirect)
}
if err != nil {
return nil, err
}
err = dw.Close()
// Add a link to this node without storing a reference to the memory
err = root.AddNodeLinkClean("", node)
if err != nil {
return nil, err
}
return dw.GetNode(), nil
}
// Generate the root node data
mbf.Data = first
data, err := mbf.GetBytes()
if err != nil {
return nil, err
}
root.Data = data
// Add root node to the dagservice
rootk, err := ds.Add(root)
if err != nil {
return nil, err
}
if mp != nil {
mp.PinWithMode(rootk, pin.Recursive)
}
return root, nil
}
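
The new loop preserves the DagWriter's on-disk layout: the first chunk from the splitter is held back and embedded in the root node's Data through ft.MultiBlock (alongside the recorded sizes of the remaining chunks), each later chunk becomes a linked child pinned pin.Indirect, and the root is pinned pin.Recursive. Below is a round-trip sketch mirroring TestBuilderConsistency in the next file; the sample string and printout are illustrative, while the import paths and calls are the ones used in this diff:

    package main

    import (
        "bytes"
        "fmt"
        "io/ioutil"

        ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
        bsrv "github.com/jbenet/go-ipfs/blockservice"
        "github.com/jbenet/go-ipfs/importer"
        "github.com/jbenet/go-ipfs/importer/chunk"
        dag "github.com/jbenet/go-ipfs/merkledag"
        uio "github.com/jbenet/go-ipfs/unixfs/io"
    )

    func main() {
        // In-memory block service, as in the new test.
        dstore := ds.NewMapDatastore()
        bserv, err := bsrv.NewBlockService(dstore, nil)
        if err != nil {
            panic(err)
        }
        dagserv := dag.NewDAGService(bserv)

        // A nil pinner is allowed; both PinWithMode calls are guarded by `mp != nil`.
        data := []byte("hello merkledag")
        root, err := importer.BuildDagFromReader(bytes.NewReader(data), dagserv, nil, chunk.DefaultSplitter)
        if err != nil {
            panic(err)
        }

        // Read the bytes back out through a DagReader to verify the layout.
        r, err := uio.NewDagReader(root, dagserv)
        if err != nil {
            panic(err)
        }
        out, err := ioutil.ReadAll(r)
        if err != nil {
            panic(err)
        }
        fmt.Printf("round-tripped %d bytes: %q\n", len(out), out)
    }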

importer/importer_test.go

@@ -9,7 +9,10 @@ import (
"os"
"testing"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
bsrv "github.com/jbenet/go-ipfs/blockservice"
"github.com/jbenet/go-ipfs/importer/chunk"
dag "github.com/jbenet/go-ipfs/merkledag"
uio "github.com/jbenet/go-ipfs/unixfs/io"
)
@@ -77,6 +80,37 @@ func testFileConsistency(t *testing.T, bs chunk.BlockSplitter, nbytes int) {
}
}
func TestBuilderConsistency(t *testing.T) {
nbytes := 100000
dstore := ds.NewMapDatastore()
bserv, err := bsrv.NewBlockService(dstore, nil)
if err != nil {
t.Fatal(err)
}
dagserv := dag.NewDAGService(bserv)
buf := new(bytes.Buffer)
io.CopyN(buf, rand.Reader, int64(nbytes))
should := dup(buf.Bytes())
nd, err := BuildDagFromReader(buf, dagserv, nil, chunk.DefaultSplitter)
if err != nil {
t.Fatal(err)
}
r, err := uio.NewDagReader(nd, dagserv)
if err != nil {
t.Fatal(err)
}
out, err := ioutil.ReadAll(r)
if err != nil {
t.Fatal(err)
}
err = arrComp(out, should)
if err != nil {
t.Fatal(err)
}
}
func arrComp(a, b []byte) error {
if len(a) != len(b) {
return fmt.Errorf("Arrays differ in length. %d != %d", len(a), len(b))

routing/dht/query.go

@@ -207,7 +207,7 @@ func (r *dhtQueryRunner) spawnWorkers() {
}
func (r *dhtQueryRunner) queryPeer(p peer.Peer) {
log.Debugf("spawned worker for: %v\n", p)
log.Debugf("spawned worker for: %v", p)
// make sure we rate limit concurrency.
select {
@@ -256,7 +256,7 @@ func (r *dhtQueryRunner) queryPeer(p peer.Peer) {
r.cancel() // signal to everyone that we're done.
} else if res.closerPeers != nil {
log.Debugf("PEERS CLOSER -- worker for: %v\n", p)
log.Debugf("PEERS CLOSER -- worker for: %v", p)
for _, next := range res.closerPeers {
r.addPeerToQuery(next, p)
}

unixfs/io/dagmodifier.go

@@ -13,6 +13,8 @@ import (
u "github.com/jbenet/go-ipfs/util"
)
var log = u.Logger("dagio")
// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant

unixfs/io/dagmodifier_test.go

@@ -7,6 +7,7 @@ import (
"testing"
bs "github.com/jbenet/go-ipfs/blockservice"
imp "github.com/jbenet/go-ipfs/importer"
"github.com/jbenet/go-ipfs/importer/chunk"
mdag "github.com/jbenet/go-ipfs/merkledag"
ft "github.com/jbenet/go-ipfs/unixfs"
@@ -26,22 +27,11 @@ func getMockDagServ(t *testing.T) mdag.DAGService {
}
func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Node) {
dw := NewDagWriter(dserv, &chunk.SizeSplitter{500})
n, err := io.CopyN(dw, u.NewTimeSeededRand(), size)
in := io.LimitReader(u.NewTimeSeededRand(), size)
node, err := imp.BuildDagFromReader(in, dserv, nil, &chunk.SizeSplitter{500})
if err != nil {
t.Fatal(err)
}
if n != size {
t.Fatal("Incorrect copy amount!")
}
err = dw.Close()
if err != nil {
t.Fatal("DagWriter failed to close,", err)
}
node := dw.GetNode()
dr, err := NewDagReader(node, dserv)
if err != nil {

unixfs/io/dagwriter.go (deleted)

@@ -1,115 +0,0 @@
package io
import (
"github.com/jbenet/go-ipfs/importer/chunk"
dag "github.com/jbenet/go-ipfs/merkledag"
"github.com/jbenet/go-ipfs/pin"
ft "github.com/jbenet/go-ipfs/unixfs"
"github.com/jbenet/go-ipfs/util"
)
var log = util.Logger("dagwriter")
type DagWriter struct {
dagserv dag.DAGService
node *dag.Node
totalSize int64
splChan chan []byte
done chan struct{}
splitter chunk.BlockSplitter
seterr error
Pinner pin.ManualPinner
}
func NewDagWriter(ds dag.DAGService, splitter chunk.BlockSplitter) *DagWriter {
dw := new(DagWriter)
dw.dagserv = ds
dw.splChan = make(chan []byte, 8)
dw.splitter = splitter
dw.done = make(chan struct{})
go dw.startSplitter()
return dw
}
// startSplitter manages splitting incoming bytes and
// creating dag nodes from them. Created nodes are stored
// in the DAGService and then released to the GC.
func (dw *DagWriter) startSplitter() {
// Since the splitter functions take a reader (and should!)
// we wrap our byte chan input in a reader
r := util.NewByteChanReader(dw.splChan)
blkchan := dw.splitter.Split(r)
// First data block is reserved for storage in the root node
first := <-blkchan
mbf := new(ft.MultiBlock)
root := new(dag.Node)
for blkData := range blkchan {
// Store the block size in the root node
mbf.AddBlockSize(uint64(len(blkData)))
node := &dag.Node{Data: ft.WrapData(blkData)}
nk, err := dw.dagserv.Add(node)
if dw.Pinner != nil {
dw.Pinner.PinWithMode(nk, pin.Indirect)
}
if err != nil {
dw.seterr = err
log.Critical("Got error adding created node to dagservice: %s", err)
return
}
// Add a link to this node without storing a reference to the memory
err = root.AddNodeLinkClean("", node)
if err != nil {
dw.seterr = err
log.Critical("Got error adding created node to root node: %s", err)
return
}
}
// Generate the root node data
mbf.Data = first
data, err := mbf.GetBytes()
if err != nil {
dw.seterr = err
log.Critical("Failed generating bytes for multiblock file: %s", err)
return
}
root.Data = data
// Add root node to the dagservice
rootk, err := dw.dagserv.Add(root)
if err != nil {
dw.seterr = err
log.Critical("Got error adding created node to dagservice: %s", err)
return
}
if dw.Pinner != nil {
dw.Pinner.PinWithMode(rootk, pin.Recursive)
}
dw.node = root
dw.done <- struct{}{}
}
func (dw *DagWriter) Write(b []byte) (int, error) {
if dw.seterr != nil {
return 0, dw.seterr
}
dw.splChan <- b
return len(b), nil
}
// Close the splitter's input channel and wait for it to finish.
// Must be called to finish up splitting, otherwise the split method
// will never halt
func (dw *DagWriter) Close() error {
close(dw.splChan)
<-dw.done
return nil
}
func (dw *DagWriter) GetNode() *dag.Node {
return dw.node
}

unixfs/io/dagwriter_test.go (deleted)

@@ -1,171 +0,0 @@
package io_test
import (
"testing"
"io"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
bs "github.com/jbenet/go-ipfs/blockservice"
importer "github.com/jbenet/go-ipfs/importer"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
mdag "github.com/jbenet/go-ipfs/merkledag"
dagio "github.com/jbenet/go-ipfs/unixfs/io"
)
type datasource struct {
i int
}
func (d *datasource) Read(b []byte) (int, error) {
for i, _ := range b {
b[i] = byte(d.i % 256)
d.i++
}
return len(b), nil
}
func (d *datasource) Matches(t *testing.T, r io.Reader, length int) bool {
b := make([]byte, 100)
i := 0
for {
n, err := r.Read(b)
if err != nil && err != io.EOF {
t.Fatal(err)
}
for _, v := range b[:n] {
if v != byte(i%256) {
t.Fatalf("Buffers differed at byte: %d (%d != %d)", i, v, (i % 256))
}
i++
}
if err == io.EOF {
break
}
}
if i != length {
t.Fatalf("Incorrect length. (%d != %d)", i, length)
}
return true
}
func TestDagWriter(t *testing.T) {
dstore := ds.NewMapDatastore()
bserv, err := bs.NewBlockService(dstore, nil)
if err != nil {
t.Fatal(err)
}
dag := mdag.NewDAGService(bserv)
dw := dagio.NewDagWriter(dag, &chunk.SizeSplitter{Size: 4096})
nbytes := int64(1024 * 1024 * 2)
n, err := io.CopyN(dw, &datasource{}, nbytes)
if err != nil {
t.Fatal(err)
}
if n != nbytes {
t.Fatal("Copied incorrect amount of bytes!")
}
dw.Close()
node := dw.GetNode()
read, err := dagio.NewDagReader(node, dag)
if err != nil {
t.Fatal(err)
}
d := &datasource{}
if !d.Matches(t, read, int(nbytes)) {
t.Fatal("Failed to validate!")
}
}
func TestMassiveWrite(t *testing.T) {
t.SkipNow()
dstore := ds.NewNullDatastore()
bserv, err := bs.NewBlockService(dstore, nil)
if err != nil {
t.Fatal(err)
}
dag := mdag.NewDAGService(bserv)
dw := dagio.NewDagWriter(dag, &chunk.SizeSplitter{Size: 4096})
nbytes := int64(1024 * 1024 * 1024 * 16)
n, err := io.CopyN(dw, &datasource{}, nbytes)
if err != nil {
t.Fatal(err)
}
if n != nbytes {
t.Fatal("Incorrect copy size.")
}
dw.Close()
}
func BenchmarkDagWriter(b *testing.B) {
dstore := ds.NewNullDatastore()
bserv, err := bs.NewBlockService(dstore, nil)
if err != nil {
b.Fatal(err)
}
dag := mdag.NewDAGService(bserv)
b.ResetTimer()
nbytes := int64(100000)
for i := 0; i < b.N; i++ {
b.SetBytes(nbytes)
dw := dagio.NewDagWriter(dag, &chunk.SizeSplitter{Size: 4096})
n, err := io.CopyN(dw, &datasource{}, nbytes)
if err != nil {
b.Fatal(err)
}
if n != nbytes {
b.Fatal("Incorrect copy size.")
}
dw.Close()
}
}
func TestAgainstImporter(t *testing.T) {
dstore := ds.NewMapDatastore()
bserv, err := bs.NewBlockService(dstore, nil)
if err != nil {
t.Fatal(err)
}
dag := mdag.NewDAGService(bserv)
nbytes := int64(1024 * 1024 * 2)
// DagWriter
dw := dagio.NewDagWriter(dag, &chunk.SizeSplitter{4096})
n, err := io.CopyN(dw, &datasource{}, nbytes)
if err != nil {
t.Fatal(err)
}
if n != nbytes {
t.Fatal("Copied incorrect amount of bytes!")
}
dw.Close()
dwNode := dw.GetNode()
dwKey, err := dwNode.Key()
if err != nil {
t.Fatal(err)
}
// DagFromFile
rl := &io.LimitedReader{&datasource{}, nbytes}
dffNode, err := importer.NewDagFromReaderWithSplitter(rl, &chunk.SizeSplitter{4096})
dffKey, err := dffNode.Key()
if err != nil {
t.Fatal(err)
}
if dwKey.String() != dffKey.String() {
t.Errorf("\nDagWriter produced %s\n"+
"DagFromReader produced %s",
dwKey, dffKey)
}
}