
Merge pull request #5118 from schomatis/feat/importer/remove-unixfsnode

importer: remove `UnixfsNode` from the balanced builder
Whyrusleeping
2018-07-16 18:34:03 +02:00
committed by GitHub
3 changed files with 446 additions and 105 deletions

View File

@@ -1,113 +1,255 @@
// Package balanced provides methods to build balanced DAGs, which are generalistic
// DAGs in which all leaves (nodes representing chunks of data) are at the same
// distance from the root. Nodes can have only a maximum number of children; to be
// able to store more leaf data nodes, balanced DAGs are extended by increasing
// their depth (and adding more intermediary nodes).
//
// Internal nodes are always represented by UnixFS nodes (of type `File`) encoded
// inside DAG nodes (see the `go-ipfs/unixfs` package for details of UnixFS). In
// contrast, leaf nodes with data have multiple possible representations: UnixFS
// nodes as above, raw nodes with just the file data (no format), and Filestore
// nodes (that directly link to the file on disk using a format stored on a raw
// node; see the `go-ipfs/filestore` package for details of Filestore).
//
// In the case that the entire file fits into just one node, it will be formatted
// as a (single) leaf node (without a parent) using one of the representations
// already mentioned. This is the only scenario where the root can be of a type
// different from the UnixFS node.
//
// +-------------+
// | Root 4 |
// +-------------+
// |
// +--------------------------+----------------------------+
// | |
// +-------------+ +-------------+
// | Node 2 | | Node 5 |
// +-------------+ +-------------+
// | |
// +-------------+-------------+ +-------------+
// | | |
// +-------------+ +-------------+ +-------------+
// | Node 1 | | Node 3 | | Node 6 |
// +-------------+ +-------------+ +-------------+
// | | |
// +------+------+ +------+------+ +------+
// | | | | |
// +=========+ +=========+ +=========+ +=========+ +=========+
// | Chunk 1 | | Chunk 2 | | Chunk 3 | | Chunk 4 | | Chunk 5 |
// +=========+ +=========+ +=========+ +=========+ +=========+
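//
// (The diagram above is an illustrative sketch that assumes a maximum of two
// links per node; the actual limit is `db.Maxlinks()`, which is much larger
// by default.)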
//
package balanced
import (
"errors"
h "github.com/ipfs/go-ipfs/importer/helpers"
ft "github.com/ipfs/go-ipfs/unixfs"
ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format"
)
// Layout builds a balanced DAG layout. In a balanced DAG of depth 1, leaf nodes
// with data are added to a single `root` until the maximum number of links is
// reached. Then, to continue adding more data leaf nodes, a `newRoot` is created
// pointing to the old `root` (which will now become an intermediary node),
// increasing the depth of the DAG to 2. This will increase the maximum number of
// data leaf nodes the DAG can have (`Maxlinks() ^ depth`). The `fillNodeRec`
// function will add more intermediary child nodes to `newRoot` (which already has
// `root` as child) that in turn will have leaf nodes with data added to them.
// After that process is completed (the maximum number of links is reached),
// `fillNodeRec` will return and the loop will be repeated: the `newRoot` created
// will become the old `root` and a new root will be created again to increase the
// depth of the DAG. The process is repeated until there is no more data to add
// (i.e. the `DagBuilderHelper`'s `Done()` function returns true).
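//
// For example (a sketch assuming the package defaults of 174 links per node,
// `h.DefaultLinksPerBlock`, and 256 KiB chunks): a DAG of depth 1 can hold up
// to 174 chunks (~43.5 MiB of file data), while extending it to depth 2 raises
// that capacity to 174^2 = 30,276 chunks (~7.4 GiB).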
//
// The nodes are filled recursively, so the DAG is built from the bottom up. Leaf
// nodes are created first using the chunked file data and its size. The size is
// then bubbled up to the parent (internal) node, which aggregates all the sizes of
// its children and bubbles that combined size up to its parent, and so on up to
// the root. This way, a balanced DAG acts like a B-tree when seeking to a byte
// offset in the file the graph represents: each internal node uses the file size
// of its children as an index when seeking.
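//
// A minimal sketch of that descent (hypothetical helper names; the actual
// readers live in the `go-ipfs/unixfs/io` package): to reach byte `offset`,
// subtract each child's file size until the offset falls inside a child:
//
//   for i, size := range childFileSizes {
//       if offset < size {
//           return descendInto(child(i), offset)
//       }
//       offset -= size
//   }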
//
// `Layout` creates a root and hands it off to be filled:
//
// +-------------+
// | Root 1 |
// +-------------+
// |
// ( fillNodeRec fills in the )
// ( chunks on the root. )
// |
// +------+------+
// | |
// + - - - - + + - - - - +
// | Chunk 1 | | Chunk 2 |
// + - - - - + + - - - - +
//
// ↓
// When the root is full but there's more data...
// ↓
//
// +-------------+
// | Root 1 |
// +-------------+
// |
// +------+------+
// | |
// +=========+ +=========+ + - - - - +
// | Chunk 1 | | Chunk 2 | | Chunk 3 |
// +=========+ +=========+ + - - - - +
//
// ↓
// ...Layout's job is to create a new root.
// ↓
//
// +-------------+
// | Root 2 |
// +-------------+
// |
// +-------------+ - - - - - - - - +
// | |
// +-------------+ ( fillNodeRec creates the )
// | Node 1 | ( branch that connects )
// +-------------+ ( "Root 2" to "Chunk 3." )
// | |
// +------+------+ + - - - - -+
// | | |
// +=========+ +=========+ + - - - - +
// | Chunk 1 | | Chunk 2 | | Chunk 3 |
// +=========+ +=========+ + - - - - +
//
func Layout(db *h.DagBuilderHelper) (ipld.Node, error) {
	if db.Done() {
		// No data, return just an empty node.
		root, err := db.NewLeafNode(nil)
		if err != nil {
			return nil, err
		}
		// This works without Filestore support (`ProcessFileStore`).
		// TODO: Why? Is there a test case missing?

		return db.AddNodeAndClose(root)
	}

	// The first `root` will be a single leaf node with data
	// (corner case); after that, subsequent `root` nodes will
	// always be internal nodes (with a depth > 0) that can
	// be handled by the loop.
	root, fileSize, err := db.NewLeafDataNode()
	if err != nil {
		return nil, err
	}

	// Each time a DAG of a certain `depth` is filled (because it
	// has reached its maximum capacity of `db.Maxlinks()` per node),
	// extend it by making it a sub-DAG of a bigger DAG with `depth+1`.
	for depth := 1; !db.Done(); depth++ {

		// Add the old `root` as a child of the `newRoot`.
		newRoot := db.NewFSNodeOverDag(ft.TFile)
		newRoot.AddChild(root, fileSize, db)

		// Fill the `newRoot` (that already has the old `root` as a child)
		// and make it the current `root` for the next iteration (when
		// it will become the "old" root).
		root, fileSize, err = fillNodeRec(db, newRoot, depth)
		if err != nil {
			return nil, err
		}
	}

	return db.AddNodeAndClose(root)
}
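
// A hypothetical usage sketch (not part of this package): this is roughly how
// the `go-ipfs/importer` package drives `Layout`, given an `ipld.DAGService`
// to store the nodes and a `chunker.Splitter` over the file data (the
// `chunker` import is assumed here):
//
//   func buildBalancedDag(ds ipld.DAGService, spl chunker.Splitter) (ipld.Node, error) {
//       dbp := h.DagBuilderParams{
//           Dagserv:  ds,
//           Maxlinks: h.DefaultLinksPerBlock,
//       }
//       return Layout(dbp.New(spl))
//   }
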
// fillNodeRec will "fill" the given internal (non-leaf) `node` with data by
// adding child nodes to it, either leaf data nodes (if `depth` is 1) or more
// internal nodes with higher depth (and calling itself recursively on them
// until *they* are filled with data). The data to fill the node with is
// provided by DagBuilderHelper.
//
// `node` represents a (sub-)DAG root that is being filled. If called recursively
// it is `nil`, and a new node is created. If it has been called from `Layout`
// (see diagram below) it points to the new root (which increases the depth of
// the DAG) and already has a child (the old root). New children will be added
// to this new root, and those children will in turn be filled (calling
// `fillNodeRec` recursively).
//
// +-------------+
// | `node` |
// | (new root) |
// +-------------+
// |
// +-------------+ - - - - - - + - - - - - - - - - - - +
// | | |
// +--------------+ + - - - - - + + - - - - - +
// | (old root) | | new child | | |
// +--------------+ + - - - - - + + - - - - - +
// | | |
// +------+------+ + - - + - - - +
// | | | |
// +=========+ +=========+ + - - - - + + - - - - +
// | Chunk 1 | | Chunk 2 | | Chunk 3 | | Chunk 4 |
// +=========+ +=========+ + - - - - + + - - - - +
//
// The `node` to be filled uses the `FSNodeOverDag` abstraction that allows adding
// child nodes without packing/unpacking the UnixFS layer node (having an internal
// `ft.FSNode` cache).
//
// It returns the `ipld.Node` representation of the passed `node` filled with
// children and the `nodeFileSize` with the total size of the file chunk (leaf)
// nodes stored under this node (parent nodes store this to enable efficient
// seeking through the DAG when reading data later).
//
// warning: **children** pinned indirectly, but input node IS NOT pinned.
func fillNodeRec(db *h.DagBuilderHelper, node *h.FSNodeOverDag, depth int) (filledNode ipld.Node, nodeFileSize uint64, err error) {
	if depth < 1 {
		return nil, 0, errors.New("attempt to fillNode at depth < 1")
	}

	if node == nil {
		node = db.NewFSNodeOverDag(ft.TFile)
	}

	// Child node created on every iteration to add to parent `node`.
	// It can be a leaf node or another internal node.
	var childNode ipld.Node
	// File size from the child node needed to update the `FSNode`
	// in `node` when adding the child.
	var childFileSize uint64

	// While we have room and there is data available to be added.
	for node.NumChildren() < db.Maxlinks() && !db.Done() {

		if depth == 1 {
			// Base case: add leaf node with data.
			childNode, childFileSize, err = db.NewLeafDataNode()
			if err != nil {
				return nil, 0, err
			}
		} else {
			// Recursion case: create an internal node to in turn keep
			// descending in the DAG and adding child nodes to it.
			childNode, childFileSize, err = fillNodeRec(db, nil, depth-1)
			if err != nil {
				return nil, 0, err
			}
		}

		err = node.AddChild(childNode, childFileSize, db)
		if err != nil {
			return nil, 0, err
		}
	}

	nodeFileSize = node.FileSize()

	// Get the final `dag.ProtoNode` with the `FSNode` data encoded inside.
	filledNode, err = node.Commit()
	if err != nil {
		return nil, 0, err
	}

	return filledNode, nodeFileSize, nil
}
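
// A sketch of the resulting call pattern when building a DAG of depth 2
// (hypothetical trace, leaf calls abbreviated):
//
//   Layout
//   ├── NewLeafDataNode()           // first chunk, corner-case root
//   ├── fillNodeRec(depth=1)        // fills the new root with leaf nodes
//   └── fillNodeRec(depth=2)        // fills the next root...
//       ├── fillNodeRec(depth=1)    // ...with internal nodes...
//       │   └── NewLeafDataNode()   // ...that hold the leaf chunks
//       └── ...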

View File

@@ -7,7 +7,9 @@ import (
dag "github.com/ipfs/go-ipfs/merkledag"
ft "github.com/ipfs/go-ipfs/unixfs"
pb "github.com/ipfs/go-ipfs/unixfs/pb"
pi "gx/ipfs/QmUWsXLvYYDAaoAt9TPZpFX4ffHHMg46AHrz1ZLTN5ABbe/go-ipfs-posinfo"
ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format"
chunker "gx/ipfs/QmXnzH7wowyLZy8XJxxaQCVTgLMcDXdMBznmsrmQWCyiQV/go-ipfs-chunker"
cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
@@ -24,9 +26,21 @@ type DagBuilderHelper struct {
	nextData []byte // the next item to return.
	maxlinks int
	batch    *ipld.Batch
	prefix   *cid.Prefix

	// Filestore support variables.
	// ----------------------------
	// TODO: Encapsulate in `FilestoreNode` (which is basically what they are).
	//
	// Besides carrying the path, this variable (if set) is used as a flag
	// to indicate that Filestore should be used.
	fullPath string
	stat     os.FileInfo

	// Keeps track of the current file size added to the DAG (used in
	// the balanced builder). It is assumed that the `DagBuilderHelper`
	// is not reused to construct another DAG; a new one (with a zero
	// `offset`) is created instead.
	offset uint64
}
// DagBuilderParams wraps configuration options to create a DagBuilderHelper
@@ -131,6 +145,11 @@ func (db *DagBuilderHelper) NewUnixfsNode() *UnixfsNode {
	return n
}
// GetPrefix returns the internal `cid.Prefix` set in the builder.
func (db *DagBuilderHelper) GetPrefix() *cid.Prefix {
	return db.prefix
}
// NewLeaf creates a leaf node filled with data. If rawLeaves is
// defined then a raw leaf will be returned. Otherwise, if data is
// nil the type field will be TRaw (for backwards compatibility), if
@@ -166,6 +185,44 @@ func (db *DagBuilderHelper) NewLeaf(data []byte) (*UnixfsNode, error) {
	return blk, nil
}
// NewLeafNode is a variation from `NewLeaf` (see its description) that
// returns an `ipld.Node` instead.
func (db *DagBuilderHelper) NewLeafNode(data []byte) (ipld.Node, error) {
	if len(data) > BlockSizeLimit {
		return nil, ErrSizeLimitExceeded
	}

	if db.rawLeaves {
		// Encapsulate the data in a raw node.
		if db.prefix == nil {
			return dag.NewRawNode(data), nil
		}
		rawnode, err := dag.NewRawNodeWPrefix(data, *db.prefix)
		if err != nil {
			return nil, err
		}
		return rawnode, nil
	}

	// Encapsulate the data in a UnixFS node (instead of a raw node).
	fsNodeOverDag := db.NewFSNodeOverDag(ft.TFile)
	fsNodeOverDag.SetFileData(data)
	node, err := fsNodeOverDag.Commit()
	if err != nil {
		return nil, err
	}
	// TODO: Encapsulate this sequence of calls into a function that
	// just returns the final `ipld.Node` and avoids going through
	// `FSNodeOverDag`.
	// TODO: Using `TFile` for backwards-compatibility: a bug in the
	// balanced builder was causing leaf nodes to be generated with
	// this type instead of `TRaw`, the type that should be used
	// (as the trickle builder does).
	// (See https://github.com/ipfs/go-ipfs/pull/5120.)

	return node, nil
}
// newUnixfsBlock creates a new Unixfs node to represent a raw data block
func (db *DagBuilderHelper) newUnixfsBlock() *UnixfsNode {
	n := &UnixfsNode{
@@ -211,12 +268,63 @@ func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) {
	return db.NewLeaf(data)
}
// NewLeafDataNode is a variation of `GetNextDataNode` that returns
// an `ipld.Node` instead. It builds the `node` with the data obtained
// from the Splitter and returns it with the `dataSize` (that will be
// used to keep track of the DAG file size). The size of the data is
// computed here because after that it will be hidden by `NewLeafNode`
// inside a generic `ipld.Node` representation.
func (db *DagBuilderHelper) NewLeafDataNode() (node ipld.Node, dataSize uint64, err error) {
	fileData, err := db.Next()
	if err != nil {
		return nil, 0, err
	}
	dataSize = uint64(len(fileData))

	// Create a new leaf node containing the file chunk data.
	node, err = db.NewLeafNode(fileData)
	if err != nil {
		return nil, 0, err
	}

	// Convert this leaf to a `FilestoreNode` if needed.
	node = db.ProcessFileStore(node, dataSize)

	return node, dataSize, nil
}
// ProcessFileStore generates, if Filestore is being used, the
// `FilestoreNode` representation of the `ipld.Node` that
// contains the file data. If Filestore is not being used just
// return the same node to continue with its addition to the DAG.
//
// The `db.offset` is updated at this point (instead of in
// `NewLeafDataNode`; both work in tandem, but the offset is
// more closely related to this function).
func (db *DagBuilderHelper) ProcessFileStore(node ipld.Node, dataSize uint64) ipld.Node {
	// Check if Filestore is being used.
	if db.fullPath != "" {
		// Check if the node is actually a raw node (needed for
		// Filestore support).
		if _, ok := node.(*dag.RawNode); ok {
			fn := &pi.FilestoreNode{
				Node: node,
				PosInfo: &pi.PosInfo{
					Offset:   db.offset,
					FullPath: db.fullPath,
					Stat:     db.stat,
				},
			}

			// Update `offset` with the size of the data generated by `db.Next`.
			db.offset += dataSize

			return fn
		}
	}

	// Filestore is not used, return the same `node` argument.
	return node
}
// Add sends a node to the DAGService, and returns it.
@@ -246,3 +354,105 @@ func (db *DagBuilderHelper) Maxlinks() int {
func (db *DagBuilderHelper) Close() error {
	return db.batch.Commit()
}
// AddNodeAndClose adds the last `ipld.Node` of the DAG (its root) and
// closes the builder. It returns the same `node` passed as argument.
func (db *DagBuilderHelper) AddNodeAndClose(node ipld.Node) (ipld.Node, error) {
	err := db.batch.Add(node)
	if err != nil {
		return nil, err
	}

	err = db.Close()
	if err != nil {
		return nil, err
	}

	return node, nil
}
// FSNodeOverDag encapsulates an `unixfs.FSNode` that will be stored in a
// `dag.ProtoNode`. Instead of just having a single `ipld.Node` that
// would need to be constantly (un)packed to access and modify its
// internal `FSNode` in the process of creating a UnixFS DAG, this
// structure stores an `FSNode` cache to manipulate it (add child nodes)
// directly, and only when the node has reached its final (immutable) state
// (signaled by calling `Commit()`) is it committed to a single (indivisible)
// `ipld.Node`.
//
// It is used mainly for internal (non-leaf) nodes, and for some
// representations of data leaf nodes (that don't use raw nodes or
// Filestore).
//
// It aims to replace the `UnixfsNode` structure which encapsulated too
// many possible node state combinations.
//
// TODO: Revisit the name.
type FSNodeOverDag struct {
	dag  *dag.ProtoNode
	file *ft.FSNode
}
// NewFSNodeOverDag creates a new `dag.ProtoNode` and `ft.FSNode`
// decoupled from one another (and they will remain that way until
// `Commit` is called), with `fsNodeType` specifying the type of
// the UnixFS layer node (either `File` or `Raw`).
func (db *DagBuilderHelper) NewFSNodeOverDag(fsNodeType pb.Data_DataType) *FSNodeOverDag {
	node := new(FSNodeOverDag)
	node.dag = new(dag.ProtoNode)
	node.dag.SetPrefix(db.GetPrefix())

	node.file = ft.NewFSNode(fsNodeType)

	return node
}
// AddChild adds a `child` `ipld.Node` to both node layers. The
// `dag.ProtoNode` creates a link to the child node while the
// `ft.FSNode` stores its file size (that is, not the size of the
// node but the size of the file data that it is storing at the
// UnixFS layer). The child is also stored in the `DAGService`.
func (n *FSNodeOverDag) AddChild(child ipld.Node, fileSize uint64, db *DagBuilderHelper) error {
	err := n.dag.AddNodeLink("", child)
	if err != nil {
		return err
	}
	n.file.AddBlockSize(fileSize)

	return db.batch.Add(child)
}
// Commit unifies (resolves) the cache nodes into a single `ipld.Node`
// that represents them: the `ft.FSNode` is encoded inside the
// `dag.ProtoNode`.
//
// TODO: Evaluate making it read-only after committing.
func (n *FSNodeOverDag) Commit() (ipld.Node, error) {
	fileData, err := n.file.GetBytes()
	if err != nil {
		return nil, err
	}
	n.dag.SetData(fileData)

	return n.dag, nil
}
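
// A typical lifecycle of `FSNodeOverDag`, as a sketch mirroring its use in
// the balanced builder:
//
//   node := db.NewFSNodeOverDag(ft.TFile)
//   for /* there is room and more children to add */ {
//       node.AddChild(child, childFileSize, db)
//   }
//   filledNode, err := node.Commit() // final (immutable) `ipld.Node`
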
// NumChildren returns the number of children of the `ft.FSNode`.
func (n *FSNodeOverDag) NumChildren() int {
	return n.file.NumChildren()
}
// FileSize returns the `Filesize` attribute from the underlying
// representation of the `ft.FSNode`.
func (n *FSNodeOverDag) FileSize() uint64 {
	return n.file.FileSize()
}
// SetFileData stores the `fileData` in the `ft.FSNode`. It
// should be used only when `FSNodeOverDag` represents a leaf
// node (internal nodes don't carry data, just file sizes).
func (n *FSNodeOverDag) SetFileData(fileData []byte) {
	n.file.SetData(fileData)
}

View File

@@ -70,17 +70,6 @@ func (n *UnixfsNode) NumChildren() int {
	return n.ufmt.NumChildren()
}
// Set replaces the current UnixfsNode with another one. It performs
// a shallow copy.
func (n *UnixfsNode) Set(other *UnixfsNode) {
	n.node = other.node
	n.raw = other.raw
	n.rawnode = other.rawnode
	if other.ufmt != nil {
		n.ufmt.SetData(other.ufmt.Data())
	}
}
// GetChild gets the ith child of this node from the given DAGService.
func (n *UnixfsNode) GetChild(ctx context.Context, i int, ds ipld.DAGService) (*UnixfsNode, error) {
	nd, err := n.node.Links()[i].GetNode(ctx, ds)