1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-05-17 23:16:11 +08:00

unixfs: allow use of raw merkledag nodes for unixfs files

License: MIT
Signed-off-by: Jeromy <why@ipfs.io>
This commit is contained in:
Jeromy
2016-10-15 09:06:44 -07:00
parent 9cb6ac17a7
commit ded60a7356
24 changed files with 390 additions and 150 deletions

View File

@ -37,8 +37,11 @@ func NewBlock(data []byte) *BasicBlock {
// we are able to be confident that the data is correct
func NewBlockWithCid(data []byte, c *cid.Cid) (*BasicBlock, error) {
if u.Debug {
// TODO: fix assumptions
chkc := cid.NewCidV0(u.Hash(data))
chkc, err := c.Prefix().Sum(data)
if err != nil {
return nil, err
}
if !chkc.Equals(c) {
return nil, ErrWrongHash
}

View File

@ -23,15 +23,16 @@ import (
var ErrDepthLimitExceeded = fmt.Errorf("depth limit exceeded")
const (
quietOptionName = "quiet"
silentOptionName = "silent"
progressOptionName = "progress"
trickleOptionName = "trickle"
wrapOptionName = "wrap-with-directory"
hiddenOptionName = "hidden"
onlyHashOptionName = "only-hash"
chunkerOptionName = "chunker"
pinOptionName = "pin"
quietOptionName = "quiet"
silentOptionName = "silent"
progressOptionName = "progress"
trickleOptionName = "trickle"
wrapOptionName = "wrap-with-directory"
hiddenOptionName = "hidden"
onlyHashOptionName = "only-hash"
chunkerOptionName = "chunker"
pinOptionName = "pin"
rawLeavesOptionName = "raw-leaves"
)
var AddCmd = &cmds.Command{
@ -78,6 +79,7 @@ You can now refer to the added file in a gateway, like so:
cmds.BoolOption(hiddenOptionName, "H", "Include files that are hidden. Only takes effect on recursive add.").Default(false),
cmds.StringOption(chunkerOptionName, "s", "Chunking algorithm to use."),
cmds.BoolOption(pinOptionName, "Pin this object when adding.").Default(true),
cmds.BoolOption(rawLeavesOptionName, "Use raw blocks for leaf nodes. (experimental)"),
},
PreRun: func(req cmds.Request) error {
if quiet, _, _ := req.Option(quietOptionName).Bool(); quiet {
@ -135,6 +137,7 @@ You can now refer to the added file in a gateway, like so:
silent, _, _ := req.Option(silentOptionName).Bool()
chunker, _, _ := req.Option(chunkerOptionName).String()
dopin, _, _ := req.Option(pinOptionName).Bool()
rawblks, _, _ := req.Option(rawLeavesOptionName).Bool()
if hash {
nilnode, err := core.NewNode(n.Context(), &core.BuildCfg{
@ -174,6 +177,7 @@ You can now refer to the added file in a gateway, like so:
fileAdder.Wrap = wrap
fileAdder.Pin = dopin
fileAdder.Silent = silent
fileAdder.RawLeaves = rawblks
if hash {
md := dagtest.Mock()

View File

@ -1,6 +1,7 @@
package corehttp
import (
"context"
"errors"
"fmt"
"io"
@ -18,10 +19,10 @@ import (
path "github.com/ipfs/go-ipfs/path"
uio "github.com/ipfs/go-ipfs/unixfs/io"
"context"
routing "gx/ipfs/QmNUgVQTYnXQVrGT2rajZYsuKV8GYdiL91cdZSQDKNPNgE/go-libp2p-routing"
humanize "gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize"
cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid"
node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
)
const (
@ -45,7 +46,7 @@ func newGatewayHandler(node *core.IpfsNode, conf GatewayConfig) *gatewayHandler
}
// TODO(cryptix): find these helpers somewhere else
func (i *gatewayHandler) newDagFromReader(r io.Reader) (*dag.ProtoNode, error) {
func (i *gatewayHandler) newDagFromReader(r io.Reader) (node.Node, error) {
// TODO(cryptix): change and remove this helper once PR1136 is merged
// return ufs.AddFromReader(i.node, r.Body)
return importer.BuildDagFromReader(
@ -353,7 +354,7 @@ func (i *gatewayHandler) putHandler(w http.ResponseWriter, r *http.Request) {
return
}
var newnode *dag.ProtoNode
var newnode node.Node
if rsegs[len(rsegs)-1] == "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn" {
newnode = uio.NewEmptyDirectory()
} else {
@ -417,8 +418,14 @@ func (i *gatewayHandler) putHandler(w http.ResponseWriter, r *http.Request) {
return
}
pbnewnode, ok := newnode.(*dag.ProtoNode)
if !ok {
webError(w, "Cannot read non protobuf nodes through gateway", dag.ErrNotProtobuf, http.StatusBadRequest)
return
}
// object set-data case
pbnd.SetData(newnode.Data())
pbnd.SetData(pbnewnode.Data())
newcid, err = i.node.DAG.Add(pbnd)
if err != nil {

View File

@ -1,6 +1,7 @@
package coreunix
import (
"context"
"fmt"
"io"
"io/ioutil"
@ -13,16 +14,18 @@ import (
"github.com/ipfs/go-ipfs/commands/files"
core "github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/exchange/offline"
importer "github.com/ipfs/go-ipfs/importer"
balanced "github.com/ipfs/go-ipfs/importer/balanced"
"github.com/ipfs/go-ipfs/importer/chunk"
ihelper "github.com/ipfs/go-ipfs/importer/helpers"
trickle "github.com/ipfs/go-ipfs/importer/trickle"
dag "github.com/ipfs/go-ipfs/merkledag"
mfs "github.com/ipfs/go-ipfs/mfs"
"github.com/ipfs/go-ipfs/pin"
unixfs "github.com/ipfs/go-ipfs/unixfs"
context "context"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid"
node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
syncds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync"
)
@ -97,6 +100,7 @@ type Adder struct {
Hidden bool
Pin bool
Trickle bool
RawLeaves bool
Silent bool
Wrap bool
Chunker string
@ -111,22 +115,22 @@ func (adder *Adder) SetMfsRoot(r *mfs.Root) {
}
// Perform the actual add & pin locally, outputting results to reader
func (adder Adder) add(reader io.Reader) (*dag.ProtoNode, error) {
func (adder Adder) add(reader io.Reader) (node.Node, error) {
chnk, err := chunk.FromString(reader, adder.Chunker)
if err != nil {
return nil, err
}
params := ihelper.DagBuilderParams{
Dagserv: adder.dagService,
RawLeaves: adder.RawLeaves,
Maxlinks: ihelper.DefaultLinksPerBlock,
}
if adder.Trickle {
return importer.BuildTrickleDagFromReader(
adder.dagService,
chnk,
)
return trickle.TrickleLayout(params.New(chnk))
}
return importer.BuildDagFromReader(
adder.dagService,
chnk,
)
return balanced.BalancedLayout(params.New(chnk))
}
func (adder *Adder) RootNode() (*dag.ProtoNode, error) {
@ -331,7 +335,7 @@ func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *dag.Pr
return gopath.Join(c.String(), filename), dagnode, nil
}
func (adder *Adder) addNode(node *dag.ProtoNode, path string) error {
func (adder *Adder) addNode(node node.Node, path string) error {
// patch it into the root
if path == "" {
path = node.Cid().String()
@ -456,7 +460,7 @@ func (adder *Adder) maybePauseForGC() error {
}
// outputDagnode sends dagnode info over the output channel
func outputDagnode(out chan interface{}, name string, dn *dag.ProtoNode) error {
func outputDagnode(out chan interface{}, name string, dn node.Node) error {
if out == nil {
return nil
}
@ -482,7 +486,7 @@ func NewMemoryDagService() dag.DAGService {
}
// from core/commands/object.go
func getOutput(dagnode *dag.ProtoNode) (*Object, error) {
func getOutput(dagnode node.Node) (*Object, error) {
c := dagnode.Cid()
output := &Object{

View File

@ -24,6 +24,7 @@ import (
fstest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs/fstestutil"
cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid"
node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
)
@ -33,7 +34,7 @@ func maybeSkipFuseTests(t *testing.T) {
}
}
func randObj(t *testing.T, nd *core.IpfsNode, size int64) (*dag.ProtoNode, []byte) {
func randObj(t *testing.T, nd *core.IpfsNode, size int64) (node.Node, []byte) {
buf := make([]byte, size)
u.NewTimeSeededRand().Read(buf)
read := bytes.NewReader(buf)
@ -74,7 +75,7 @@ func TestIpfsBasicRead(t *testing.T) {
defer mnt.Close()
fi, data := randObj(t, nd, 10000)
k := fi.Key()
k := fi.Cid()
fname := path.Join(mnt.Dir, k.String())
rbuf, err := ioutil.ReadFile(fname)
if err != nil {
@ -254,7 +255,7 @@ func TestFileSizeReporting(t *testing.T) {
defer mnt.Close()
fi, data := randObj(t, nd, 10000)
k := fi.Key()
k := fi.Cid()
fname := path.Join(mnt.Dir, k.String())

View File

@ -28,7 +28,12 @@ func buildTestDag(ds dag.DAGService, spl chunk.Splitter) (*dag.ProtoNode, error)
Maxlinks: h.DefaultLinksPerBlock,
}
return BalancedLayout(dbp.New(spl))
nd, err := BalancedLayout(dbp.New(spl))
if err != nil {
return nil, err
}
return nd.(*dag.ProtoNode), nil
}
func getTestDag(t *testing.T, ds dag.DAGService, size int64, blksize int64) (*dag.ProtoNode, []byte) {

View File

@ -4,10 +4,11 @@ import (
"errors"
h "github.com/ipfs/go-ipfs/importer/helpers"
dag "github.com/ipfs/go-ipfs/merkledag"
node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
)
func BalancedLayout(db *h.DagBuilderHelper) (*dag.ProtoNode, error) {
func BalancedLayout(db *h.DagBuilderHelper) (node.Node, error) {
var root *h.UnixfsNode
for level := 0; !db.Done(); level++ {
@ -56,14 +57,21 @@ func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error {
// Base case
if depth <= 0 { // catch accidental -1's in case error above is removed.
return db.FillNodeWithData(node)
child, err := db.GetNextDataNode()
if err != nil {
return err
}
node.Set(child)
return nil
}
// while we have room AND we're not done
for node.NumChildren() < db.Maxlinks() && !db.Done() {
child := h.NewUnixfsNode()
if err := fillNodeRec(db, child, depth-1); err != nil {
err := fillNodeRec(db, child, depth-1)
if err != nil {
return err
}

View File

@ -3,23 +3,30 @@ package helpers
import (
"github.com/ipfs/go-ipfs/importer/chunk"
dag "github.com/ipfs/go-ipfs/merkledag"
node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
)
// DagBuilderHelper wraps together a bunch of objects needed to
// efficiently create unixfs dag trees
type DagBuilderHelper struct {
dserv dag.DAGService
spl chunk.Splitter
recvdErr error
nextData []byte // the next item to return.
maxlinks int
batch *dag.Batch
dserv dag.DAGService
spl chunk.Splitter
recvdErr error
rawLeaves bool
nextData []byte // the next item to return.
maxlinks int
batch *dag.Batch
}
type DagBuilderParams struct {
// Maximum number of links per intermediate node
Maxlinks int
// RawLeaves signifies that the importer should use raw ipld nodes as leaves
// instead of using the unixfs TRaw type
RawLeaves bool
// DAGService to write blocks to (required)
Dagserv dag.DAGService
}
@ -28,10 +35,11 @@ type DagBuilderParams struct {
// from chunks object
func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper {
return &DagBuilderHelper{
dserv: dbp.Dagserv,
spl: spl,
maxlinks: dbp.Maxlinks,
batch: dbp.Dagserv.Batch(),
dserv: dbp.Dagserv,
spl: spl,
rawLeaves: dbp.RawLeaves,
maxlinks: dbp.Maxlinks,
batch: dbp.Dagserv.Batch(),
}
}
@ -78,9 +86,8 @@ func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error {
// while we have room AND we're not done
for node.NumChildren() < db.maxlinks && !db.Done() {
child := NewUnixfsBlock()
if err := db.FillNodeWithData(child); err != nil {
child, err := db.GetNextDataNode()
if err != nil {
return err
}
@ -92,21 +99,29 @@ func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error {
return nil
}
func (db *DagBuilderHelper) FillNodeWithData(node *UnixfsNode) error {
func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) {
data := db.Next()
if data == nil { // we're done!
return nil
return nil, nil
}
if len(data) > BlockSizeLimit {
return ErrSizeLimitExceeded
return nil, ErrSizeLimitExceeded
}
node.SetData(data)
return nil
if db.rawLeaves {
return &UnixfsNode{
rawnode: dag.NewRawNode(data),
raw: true,
}, nil
} else {
blk := NewUnixfsBlock()
blk.SetData(data)
return blk, nil
}
}
func (db *DagBuilderHelper) Add(node *UnixfsNode) (*dag.ProtoNode, error) {
func (db *DagBuilderHelper) Add(node *UnixfsNode) (node.Node, error) {
dn, err := node.GetDagNode()
if err != nil {
return nil, err

View File

@ -1,12 +1,14 @@
package helpers
import (
"context"
"fmt"
"context"
chunk "github.com/ipfs/go-ipfs/importer/chunk"
dag "github.com/ipfs/go-ipfs/merkledag"
ft "github.com/ipfs/go-ipfs/unixfs"
node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
)
// BlockSizeLimit specifies the maximum size an imported block can have.
@ -37,8 +39,10 @@ var ErrSizeLimitExceeded = fmt.Errorf("object size limit exceeded")
// UnixfsNode is a struct created to aid in the generation
// of unixfs DAG trees
type UnixfsNode struct {
node *dag.ProtoNode
ufmt *ft.FSNode
raw bool
rawnode *dag.RawNode
node *dag.ProtoNode
ufmt *ft.FSNode
}
// NewUnixfsNode creates a new Unixfs node to represent a file
@ -74,6 +78,15 @@ func (n *UnixfsNode) NumChildren() int {
return n.ufmt.NumChildren()
}
func (n *UnixfsNode) Set(other *UnixfsNode) {
n.node = other.node
n.raw = other.raw
n.rawnode = other.rawnode
if other.ufmt != nil {
n.ufmt.Data = other.ufmt.Data
}
}
func (n *UnixfsNode) GetChild(ctx context.Context, i int, ds dag.DAGService) (*UnixfsNode, error) {
nd, err := n.node.Links()[i].GetNode(ctx, ds)
if err != nil {
@ -126,7 +139,11 @@ func (n *UnixfsNode) SetData(data []byte) {
// getDagNode fills out the proper formatting for the unixfs node
// inside of a DAG node and returns the dag node
func (n *UnixfsNode) GetDagNode() (*dag.ProtoNode, error) {
func (n *UnixfsNode) GetDagNode() (node.Node, error) {
if n.raw {
return n.rawnode, nil
}
data, err := n.ufmt.GetBytes()
if err != nil {
return nil, err

View File

@ -12,14 +12,16 @@ import (
h "github.com/ipfs/go-ipfs/importer/helpers"
trickle "github.com/ipfs/go-ipfs/importer/trickle"
dag "github.com/ipfs/go-ipfs/merkledag"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
)
var log = logging.Logger("importer")
// Builds a DAG from the given file, writing created blocks to disk as they are
// created
func BuildDagFromFile(fpath string, ds dag.DAGService) (*dag.ProtoNode, error) {
func BuildDagFromFile(fpath string, ds dag.DAGService) (node.Node, error) {
stat, err := os.Lstat(fpath)
if err != nil {
return nil, err
@ -38,7 +40,7 @@ func BuildDagFromFile(fpath string, ds dag.DAGService) (*dag.ProtoNode, error) {
return BuildDagFromReader(ds, chunk.NewSizeSplitter(f, chunk.DefaultBlockSize))
}
func BuildDagFromReader(ds dag.DAGService, spl chunk.Splitter) (*dag.ProtoNode, error) {
func BuildDagFromReader(ds dag.DAGService, spl chunk.Splitter) (node.Node, error) {
dbp := h.DagBuilderParams{
Dagserv: ds,
Maxlinks: h.DefaultLinksPerBlock,
@ -47,7 +49,7 @@ func BuildDagFromReader(ds dag.DAGService, spl chunk.Splitter) (*dag.ProtoNode,
return bal.BalancedLayout(dbp.New(spl))
}
func BuildTrickleDagFromReader(ds dag.DAGService, spl chunk.Splitter) (*dag.ProtoNode, error) {
func BuildTrickleDagFromReader(ds dag.DAGService, spl chunk.Splitter) (node.Node, error) {
dbp := h.DagBuilderParams{
Dagserv: ds,
Maxlinks: h.DefaultLinksPerBlock,

View File

@ -2,19 +2,21 @@ package importer
import (
"bytes"
"context"
"io"
"io/ioutil"
"testing"
context "context"
chunk "github.com/ipfs/go-ipfs/importer/chunk"
dag "github.com/ipfs/go-ipfs/merkledag"
mdtest "github.com/ipfs/go-ipfs/merkledag/test"
uio "github.com/ipfs/go-ipfs/unixfs/io"
node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
)
func getBalancedDag(t testing.TB, size int64, blksize int64) (*dag.ProtoNode, dag.DAGService) {
func getBalancedDag(t testing.TB, size int64, blksize int64) (node.Node, dag.DAGService) {
ds := mdtest.Mock()
r := io.LimitReader(u.NewTimeSeededRand(), size)
nd, err := BuildDagFromReader(ds, chunk.NewSizeSplitter(r, blksize))
@ -24,7 +26,7 @@ func getBalancedDag(t testing.TB, size int64, blksize int64) (*dag.ProtoNode, da
return nd, ds
}
func getTrickleDag(t testing.TB, size int64, blksize int64) (*dag.ProtoNode, dag.DAGService) {
func getTrickleDag(t testing.TB, size int64, blksize int64) (node.Node, dag.DAGService) {
ds := mdtest.Mock()
r := io.LimitReader(u.NewTimeSeededRand(), size)
nd, err := BuildTrickleDagFromReader(ds, chunk.NewSizeSplitter(r, blksize))
@ -100,7 +102,7 @@ func BenchmarkTrickleReadFull(b *testing.B) {
runReadBench(b, nd, ds)
}
func runReadBench(b *testing.B, nd *dag.ProtoNode, ds dag.DAGService) {
func runReadBench(b *testing.B, nd node.Node, ds dag.DAGService) {
for i := 0; i < b.N; i++ {
ctx, cancel := context.WithCancel(context.Background())
read, err := uio.NewDagReader(ctx, nd, ds)

View File

@ -2,6 +2,7 @@ package trickle
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
@ -9,7 +10,6 @@ import (
"os"
"testing"
"context"
chunk "github.com/ipfs/go-ipfs/importer/chunk"
h "github.com/ipfs/go-ipfs/importer/helpers"
merkledag "github.com/ipfs/go-ipfs/merkledag"
@ -17,6 +17,7 @@ import (
pin "github.com/ipfs/go-ipfs/pin"
ft "github.com/ipfs/go-ipfs/unixfs"
uio "github.com/ipfs/go-ipfs/unixfs/io"
u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
)
@ -31,7 +32,12 @@ func buildTestDag(ds merkledag.DAGService, spl chunk.Splitter) (*merkledag.Proto
return nil, err
}
return nd, VerifyTrickleDagStructure(nd, ds, dbp.Maxlinks, layerRepeat)
pbnd, ok := nd.(*merkledag.ProtoNode)
if !ok {
return nil, merkledag.ErrNotProtobuf
}
return pbnd, VerifyTrickleDagStructure(pbnd, ds, dbp.Maxlinks, layerRepeat)
}
//Test where calls to read are smaller than the chunk size

View File

@ -8,6 +8,8 @@ import (
h "github.com/ipfs/go-ipfs/importer/helpers"
dag "github.com/ipfs/go-ipfs/merkledag"
ft "github.com/ipfs/go-ipfs/unixfs"
node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
)
// layerRepeat specifies how many times to append a child tree of a
@ -15,7 +17,7 @@ import (
// improves seek speeds.
const layerRepeat = 4
func TrickleLayout(db *h.DagBuilderHelper) (*dag.ProtoNode, error) {
func TrickleLayout(db *h.DagBuilderHelper) (node.Node, error) {
root := h.NewUnixfsNode()
if err := db.FillNodeLayer(root); err != nil {
return nil, err
@ -66,7 +68,12 @@ func fillTrickleRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error
}
// TrickleAppend appends the data in `db` to the dag, using the Trickledag format
func TrickleAppend(ctx context.Context, base *dag.ProtoNode, db *h.DagBuilderHelper) (out *dag.ProtoNode, err_out error) {
func TrickleAppend(ctx context.Context, basen node.Node, db *h.DagBuilderHelper) (out node.Node, err_out error) {
base, ok := basen.(*dag.ProtoNode)
if !ok {
return nil, dag.ErrNotProtobuf
}
defer func() {
if err_out == nil {
if err := db.Close(); err != nil {
@ -229,8 +236,13 @@ func trickleDepthInfo(node *h.UnixfsNode, maxlinks int) (int, int) {
// VerifyTrickleDagStructure checks that the given dag matches exactly the trickle dag datastructure
// layout
func VerifyTrickleDagStructure(nd *dag.ProtoNode, ds dag.DAGService, direct int, layerRepeat int) error {
return verifyTDagRec(nd, -1, direct, layerRepeat, ds)
func VerifyTrickleDagStructure(nd node.Node, ds dag.DAGService, direct int, layerRepeat int) error {
pbnd, ok := nd.(*dag.ProtoNode)
if !ok {
return dag.ErrNotProtobuf
}
return verifyTDagRec(pbnd, -1, direct, layerRepeat, ds)
}
// Recursive call for verifying the structure of a trickledag

View File

@ -85,23 +85,29 @@ func (n *dagService) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
}
var res node.Node
return decodeBlock(b)
}
func decodeBlock(b blocks.Block) (node.Node, error) {
c := b.Cid()
switch c.Type() {
case cid.Protobuf:
out, err := DecodeProtobuf(b.RawData())
decnd, err := DecodeProtobuf(b.RawData())
if err != nil {
if strings.Contains(err.Error(), "Unmarshal failed") {
return nil, fmt.Errorf("The block referred to by '%s' was not a valid merkledag node", c)
}
return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err)
}
out.cached = c
res = out
default:
return nil, fmt.Errorf("unrecognized formatting type")
}
return res, nil
decnd.cached = b.Cid()
return decnd, nil
case cid.Raw:
return NewRawNode(b.RawData()), nil
default:
return nil, fmt.Errorf("unrecognized object type: %s", c.Type())
}
}
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
@ -164,24 +170,12 @@ func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *Node
return
}
c := b.Cid()
var nd node.Node
switch c.Type() {
case cid.Protobuf:
decnd, err := DecodeProtobuf(b.RawData())
if err != nil {
out <- &NodeOption{Err: err}
return
}
decnd.cached = b.Cid()
nd = decnd
default:
out <- &NodeOption{Err: fmt.Errorf("unrecognized object type: %s", c.Type())}
nd, err := decodeBlock(b)
if err != nil {
out <- &NodeOption{Err: err}
return
}
// buffered, no need to select
out <- &NodeOption{Node: nd}
count++

View File

@ -373,3 +373,81 @@ func TestBasicAddGet(t *testing.T) {
t.Fatal("output didnt match input")
}
}
func TestGetRawNodes(t *testing.T) {
rn := NewRawNode([]byte("test"))
ds := dstest.Mock()
c, err := ds.Add(rn)
if err != nil {
t.Fatal(err)
}
if !c.Equals(rn.Cid()) {
t.Fatal("output cids didnt match")
}
out, err := ds.Get(context.TODO(), c)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(out.RawData(), []byte("test")) {
t.Fatal("raw block should match input data")
}
if out.Links() != nil {
t.Fatal("raw blocks shouldnt have links")
}
if out.Tree() != nil {
t.Fatal("tree should return no paths in a raw block")
}
size, err := out.Size()
if err != nil {
t.Fatal(err)
}
if size != 4 {
t.Fatal("expected size to be 4")
}
ns, err := out.Stat()
if err != nil {
t.Fatal(err)
}
if ns.DataSize != 4 {
t.Fatal("expected size to be 4, got: ", ns.DataSize)
}
_, _, err = out.Resolve([]string{"foo"})
if err != ErrLinkNotFound {
t.Fatal("shouldnt find links under raw blocks")
}
}
func TestProtoNodeResolve(t *testing.T) {
nd := new(ProtoNode)
nd.SetLinks([]*node.Link{{Name: "foo"}})
lnk, left, err := nd.Resolve([]string{"foo", "bar"})
if err != nil {
t.Fatal(err)
}
if len(left) != 1 || left[0] != "bar" {
t.Fatal("expected the single path element 'bar' to remain")
}
if lnk.Name != "foo" {
t.Fatal("how did we get anything else?")
}
tvals := nd.Tree()
if len(tvals) != 1 || tvals[0] != "foo" {
t.Fatal("expected tree to return []{\"foo\"}")
}
}

View File

@ -36,7 +36,7 @@ func NodeWithData(d []byte) *ProtoNode {
}
// AddNodeLink adds a link to another node.
func (n *ProtoNode) AddNodeLink(name string, that *ProtoNode) error {
func (n *ProtoNode) AddNodeLink(name string, that node.Node) error {
n.encoded = nil
lnk, err := node.MakeLink(that)

46
merkledag/raw.go Normal file
View File

@ -0,0 +1,46 @@
package merkledag
import (
"github.com/ipfs/go-ipfs/blocks"
cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid"
node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
)
type RawNode struct {
blocks.Block
}
func NewRawNode(data []byte) *RawNode {
h := u.Hash(data)
c := cid.NewCidV1(cid.Raw, h)
blk, _ := blocks.NewBlockWithCid(data, c)
return &RawNode{blk}
}
func (rn *RawNode) Links() []*node.Link {
return nil
}
func (rn *RawNode) Resolve(path []string) (*node.Link, []string, error) {
return nil, nil, ErrLinkNotFound
}
func (rn *RawNode) Tree() []string {
return nil
}
func (rn *RawNode) Size() (uint64, error) {
return uint64(len(rn.RawData())), nil
}
func (rn *RawNode) Stat() (*node.NodeStat, error) {
return &node.NodeStat{
CumulativeSize: len(rn.RawData()),
DataSize: len(rn.RawData()),
}, nil
}
var _ node.Node = (*RawNode)(nil)

View File

@ -1,17 +1,18 @@
package dagutils
import (
"context"
"errors"
context "context"
ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
syncds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync"
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
bserv "github.com/ipfs/go-ipfs/blockservice"
offline "github.com/ipfs/go-ipfs/exchange/offline"
dag "github.com/ipfs/go-ipfs/merkledag"
path "github.com/ipfs/go-ipfs/path"
node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
syncds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync"
)
type Editor struct {
@ -50,7 +51,7 @@ func (e *Editor) GetDagService() dag.DAGService {
return e.tmp
}
func addLink(ctx context.Context, ds dag.DAGService, root *dag.ProtoNode, childname string, childnd *dag.ProtoNode) (*dag.ProtoNode, error) {
func addLink(ctx context.Context, ds dag.DAGService, root *dag.ProtoNode, childname string, childnd node.Node) (*dag.ProtoNode, error) {
if childname == "" {
return nil, errors.New("cannot create link with no name!")
}
@ -76,7 +77,7 @@ func addLink(ctx context.Context, ds dag.DAGService, root *dag.ProtoNode, childn
return root, nil
}
func (e *Editor) InsertNodeAtPath(ctx context.Context, pth string, toinsert *dag.ProtoNode, create func() *dag.ProtoNode) error {
func (e *Editor) InsertNodeAtPath(ctx context.Context, pth string, toinsert node.Node, create func() *dag.ProtoNode) error {
splpath := path.SplitList(pth)
nd, err := e.insertNodeAtPath(ctx, e.root, splpath, toinsert, create)
if err != nil {
@ -86,7 +87,7 @@ func (e *Editor) InsertNodeAtPath(ctx context.Context, pth string, toinsert *dag
return nil
}
func (e *Editor) insertNodeAtPath(ctx context.Context, root *dag.ProtoNode, path []string, toinsert *dag.ProtoNode, create func() *dag.ProtoNode) (*dag.ProtoNode, error) {
func (e *Editor) insertNodeAtPath(ctx context.Context, root *dag.ProtoNode, path []string, toinsert node.Node, create func() *dag.ProtoNode) (*dag.ProtoNode, error) {
if len(path) == 1 {
return addLink(ctx, e.tmp, root, path[0], toinsert)
}

View File

@ -1,6 +1,7 @@
package mfs
import (
"context"
"errors"
"fmt"
"os"
@ -9,11 +10,11 @@ import (
"sync"
"time"
context "context"
dag "github.com/ipfs/go-ipfs/merkledag"
ft "github.com/ipfs/go-ipfs/unixfs"
ufspb "github.com/ipfs/go-ipfs/unixfs/pb"
node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
)
var ErrNotYetImplemented = errors.New("not yet implemented")
@ -323,7 +324,7 @@ func (d *Directory) Flush() error {
}
// AddChild adds the node 'nd' under this directory giving it the name 'name'
func (d *Directory) AddChild(name string, nd *dag.ProtoNode) error {
func (d *Directory) AddChild(name string, nd node.Node) error {
d.lock.Lock()
defer d.lock.Unlock()

View File

@ -42,12 +42,12 @@ func getDagserv(t *testing.T) dag.DAGService {
return dag.NewDAGService(blockserv)
}
func getRandFile(t *testing.T, ds dag.DAGService, size int64) *dag.ProtoNode {
func getRandFile(t *testing.T, ds dag.DAGService, size int64) node.Node {
r := io.LimitReader(u.NewTimeSeededRand(), size)
return fileNodeFromReader(t, ds, r)
}
func fileNodeFromReader(t *testing.T, ds dag.DAGService, r io.Reader) *dag.ProtoNode {
func fileNodeFromReader(t *testing.T, ds dag.DAGService, r io.Reader) node.Node {
nd, err := importer.BuildDagFromReader(ds, chunk.DefaultSplitter(r))
if err != nil {
t.Fatal(err)
@ -125,7 +125,12 @@ func compStrArrs(a, b []string) bool {
return true
}
func assertFileAtPath(ds dag.DAGService, root *Directory, exp *dag.ProtoNode, pth string) error {
func assertFileAtPath(ds dag.DAGService, root *Directory, expn node.Node, pth string) error {
exp, ok := expn.(*dag.ProtoNode)
if !ok {
return dag.ErrNotProtobuf
}
parts := path.SplitList(pth)
cur := root
for i, d := range parts[:len(parts)-1] {

View File

@ -7,8 +7,9 @@ import (
gopath "path"
"strings"
dag "github.com/ipfs/go-ipfs/merkledag"
path "github.com/ipfs/go-ipfs/path"
node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
)
// Mv moves the file or directory at 'src' to 'dst'
@ -87,7 +88,7 @@ func lookupDir(r *Root, path string) (*Directory, error) {
}
// PutNode inserts 'nd' at 'path' in the given mfs
func PutNode(r *Root, path string, nd *dag.ProtoNode) error {
func PutNode(r *Root, path string, nd node.Node) error {
dirp, filename := gopath.Split(path)
if filename == "" {
return fmt.Errorf("cannot create file with empty name")

View File

@ -2,17 +2,18 @@ package io
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"context"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
mdag "github.com/ipfs/go-ipfs/merkledag"
ft "github.com/ipfs/go-ipfs/unixfs"
ftpb "github.com/ipfs/go-ipfs/unixfs/pb"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
)
var ErrIsDir = errors.New("this dag node is a directory")
@ -58,36 +59,45 @@ type ReadSeekCloser interface {
// NewDagReader creates a new reader object that reads the data represented by
// the given node, using the passed in DAGService for data retreival
func NewDagReader(ctx context.Context, n *mdag.ProtoNode, serv mdag.DAGService) (*DagReader, error) {
pb := new(ftpb.Data)
if err := proto.Unmarshal(n.Data(), pb); err != nil {
return nil, err
}
switch pb.GetType() {
case ftpb.Data_Directory:
// Dont allow reading directories
return nil, ErrIsDir
case ftpb.Data_File, ftpb.Data_Raw:
return NewDataFileReader(ctx, n, pb, serv), nil
case ftpb.Data_Metadata:
if len(n.Links()) == 0 {
return nil, errors.New("incorrectly formatted metadata object")
}
child, err := n.Links()[0].GetNode(ctx, serv)
if err != nil {
func NewDagReader(ctx context.Context, n node.Node, serv mdag.DAGService) (*DagReader, error) {
switch n := n.(type) {
case *mdag.RawNode:
return &DagReader{
buf: NewRSNCFromBytes(n.RawData()),
}, nil
case *mdag.ProtoNode:
pb := new(ftpb.Data)
if err := proto.Unmarshal(n.Data(), pb); err != nil {
return nil, err
}
childpb, ok := child.(*mdag.ProtoNode)
if !ok {
return nil, mdag.ErrNotProtobuf
switch pb.GetType() {
case ftpb.Data_Directory:
// Dont allow reading directories
return nil, ErrIsDir
case ftpb.Data_File, ftpb.Data_Raw:
return NewDataFileReader(ctx, n, pb, serv), nil
case ftpb.Data_Metadata:
if len(n.Links()) == 0 {
return nil, errors.New("incorrectly formatted metadata object")
}
child, err := n.Links()[0].GetNode(ctx, serv)
if err != nil {
return nil, err
}
childpb, ok := child.(*mdag.ProtoNode)
if !ok {
return nil, mdag.ErrNotProtobuf
}
return NewDagReader(ctx, childpb, serv)
case ftpb.Data_Symlink:
return nil, ErrCantReadSymlinks
default:
return nil, ft.ErrUnrecognizedType
}
return NewDagReader(ctx, childpb, serv)
case ftpb.Data_Symlink:
return nil, ErrCantReadSymlinks
default:
return nil, ft.ErrUnrecognizedType
return nil, fmt.Errorf("unrecognized node type")
}
}

View File

@ -2,6 +2,7 @@ package mod
import (
"bytes"
"context"
"errors"
"io"
"os"
@ -13,10 +14,10 @@ import (
ft "github.com/ipfs/go-ipfs/unixfs"
uio "github.com/ipfs/go-ipfs/unixfs/io"
context "context"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
)
var ErrSeekFail = errors.New("failed to seek properly")
@ -45,9 +46,14 @@ type DagModifier struct {
read *uio.DagReader
}
func NewDagModifier(ctx context.Context, from *mdag.ProtoNode, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
func NewDagModifier(ctx context.Context, from node.Node, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
pbn, ok := from.(*mdag.ProtoNode)
if !ok {
return nil, mdag.ErrNotProtobuf
}
return &DagModifier{
curNode: from.Copy(),
curNode: pbn.Copy(),
dagserv: serv,
splitter: spl,
ctx: ctx,
@ -109,7 +115,13 @@ func (dm *DagModifier) expandSparse(size int64) error {
if err != nil {
return err
}
dm.curNode = nnode
pbnnode, ok := nnode.(*mdag.ProtoNode)
if !ok {
return mdag.ErrNotProtobuf
}
dm.curNode = pbnnode
return nil
}
@ -197,7 +209,12 @@ func (dm *DagModifier) Sync() error {
return err
}
dm.curNode = nd
pbnode, ok := nd.(*mdag.ProtoNode)
if !ok {
return mdag.ErrNotProtobuf
}
dm.curNode = pbnode
}
dm.writeStart += uint64(buflen)
@ -288,7 +305,7 @@ func (dm *DagModifier) modifyDag(node *mdag.ProtoNode, offset uint64, data io.Re
}
// appendData appends the blocks from the given chan to the end of this dag
func (dm *DagModifier) appendData(node *mdag.ProtoNode, spl chunk.Splitter) (*mdag.ProtoNode, error) {
func (dm *DagModifier) appendData(node *mdag.ProtoNode, spl chunk.Splitter) (node.Node, error) {
dbp := &help.DagBuilderParams{
Dagserv: dm.dagserv,
Maxlinks: help.DefaultLinksPerBlock,

View File

@ -2,6 +2,7 @@ package testu
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
@ -13,7 +14,7 @@ import (
mdagmock "github.com/ipfs/go-ipfs/merkledag/test"
ft "github.com/ipfs/go-ipfs/unixfs"
context "context"
node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
)
@ -27,7 +28,7 @@ func GetDAGServ() mdag.DAGService {
return mdagmock.Mock()
}
func GetNode(t testing.TB, dserv mdag.DAGService, data []byte) *mdag.ProtoNode {
func GetNode(t testing.TB, dserv mdag.DAGService, data []byte) node.Node {
in := bytes.NewReader(data)
node, err := imp.BuildTrickleDagFromReader(dserv, SizeSplitterGen(500)(in))
if err != nil {
@ -37,11 +38,11 @@ func GetNode(t testing.TB, dserv mdag.DAGService, data []byte) *mdag.ProtoNode {
return node
}
func GetEmptyNode(t testing.TB, dserv mdag.DAGService) *mdag.ProtoNode {
func GetEmptyNode(t testing.TB, dserv mdag.DAGService) node.Node {
return GetNode(t, dserv, []byte{})
}
func GetRandomNode(t testing.TB, dserv mdag.DAGService, size int64) ([]byte, *mdag.ProtoNode) {
func GetRandomNode(t testing.TB, dserv mdag.DAGService, size int64) ([]byte, node.Node) {
in := io.LimitReader(u.NewTimeSeededRand(), size)
buf, err := ioutil.ReadAll(in)
if err != nil {