Mirror of https://github.com/ipfs/kubo.git, synced 2025-06-30 09:59:13 +08:00
Allow dagmodifier to be created (but not used) with raw nodes
License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
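A rough sketch (not part of the commit) of what this change enables: a DagModifier can now be constructed from a raw node, while operations that still require a protobuf node are expected to fail. The helper name demo is hypothetical, the DAGService is assumed to be supplied by the caller, and the availability of mdag.NewRawNode and chunk.DefaultSplitter in this tree is an assumption.

package example

import (
	"context"
	"fmt"

	chunk "github.com/ipfs/go-ipfs/importer/chunk"
	mdag "github.com/ipfs/go-ipfs/merkledag"
	mod "github.com/ipfs/go-ipfs/unixfs/mod"
)

// demo is a hypothetical helper; wiring up the DAGService (blockstore,
// blockservice) is out of scope for this sketch.
func demo(ctx context.Context, serv mdag.DAGService) error {
	// A raw (non-protobuf) node; assumes mdag.NewRawNode exists in this tree.
	raw := mdag.NewRawNode([]byte("some raw data"))

	// Before this commit the constructor rejected anything but *mdag.ProtoNode
	// with mdag.ErrNotProtobuf; now raw nodes are accepted as well.
	dm, err := mod.NewDagModifier(ctx, raw, serv, chunk.DefaultSplitter)
	if err != nil {
		return err
	}

	// "Created (but not used)": protobuf-only operations such as Size are
	// still expected to fail for a raw-backed modifier (e.g. ErrNoRawYet).
	if _, err := dm.Size(); err != nil {
		fmt.Println("size on raw-backed modifier:", err)
	}
	return nil
}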
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"errors"
+	"fmt"
 	"io"
 
 	chunk "github.com/ipfs/go-ipfs/importer/chunk"
@@ -29,7 +30,7 @@ var writebufferSize = 1 << 21
 // Dear god, please rename this to something more pleasant
 type DagModifier struct {
 	dagserv  mdag.DAGService
-	curNode  *mdag.ProtoNode
+	curNode  node.Node
 
 	splitter chunk.SplitterGen
 	ctx      context.Context
@@ -42,14 +43,18 @@ type DagModifier struct {
 	read uio.DagReader
 }
 
+var ErrNotUnixfs = fmt.Errorf("dagmodifier only supports unixfs nodes (proto or raw)")
+
 func NewDagModifier(ctx context.Context, from node.Node, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
-	pbn, ok := from.(*mdag.ProtoNode)
-	if !ok {
-		return nil, mdag.ErrNotProtobuf
+	switch from.(type) {
+	case *mdag.ProtoNode, *mdag.RawNode:
+		// ok
+	default:
+		return nil, ErrNotUnixfs
 	}
 
 	return &DagModifier{
-		curNode:  pbn.Copy().(*mdag.ProtoNode),
+		curNode:  from.Copy(),
 		dagserv:  serv,
 		splitter: spl,
 		ctx:      ctx,
@@ -144,8 +149,15 @@ func (dm *DagModifier) Write(b []byte) (int, error) {
 	return n, nil
 }
 
+var ErrNoRawYet = fmt.Errorf("currently only fully support protonodes in the dagmodifier")
+
 func (dm *DagModifier) Size() (int64, error) {
-	pbn, err := ft.FromBytes(dm.curNode.Data())
+	pbnd, ok := dm.curNode.(*mdag.ProtoNode)
+	if !ok {
+		return 0, ErrNoRawYet
+	}
+
+	pbn, err := ft.FromBytes(pbnd.Data())
 	if err != nil {
 		return 0, err
 	}
@@ -222,7 +234,12 @@ func (dm *DagModifier) Sync() error {
 // modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'
 // returns the new key of the passed in node and whether or not all the data in the reader
 // has been consumed.
-func (dm *DagModifier) modifyDag(node *mdag.ProtoNode, offset uint64, data io.Reader) (*cid.Cid, bool, error) {
+func (dm *DagModifier) modifyDag(n node.Node, offset uint64, data io.Reader) (*cid.Cid, bool, error) {
+	node, ok := n.(*mdag.ProtoNode)
+	if !ok {
+		return nil, false, ErrNoRawYet
+	}
+
 	f, err := ft.FromBytes(node.Data())
 	if err != nil {
 		return nil, false, err
@@ -301,13 +318,26 @@ func (dm *DagModifier) modifyDag(node *mdag.ProtoNode, offset uint64, data io.Reader) (*cid.Cid, bool, error) {
 }
 
 // appendData appends the blocks from the given chan to the end of this dag
-func (dm *DagModifier) appendData(node *mdag.ProtoNode, spl chunk.Splitter) (node.Node, error) {
+func (dm *DagModifier) appendData(nd node.Node, spl chunk.Splitter) (node.Node, error) {
+
+	var root *mdag.ProtoNode
+	switch nd := nd.(type) {
+	case *mdag.ProtoNode:
+		root = nd
+	case *mdag.RawNode:
+		// TODO: be able to append to rawnodes. Probably requires making this
+		// node a child of a unxifs intermediate node and passing it down
+		return nil, fmt.Errorf("appending to raw node types not yet supported")
+	default:
+		return nil, ErrNotUnixfs
+	}
+
 	dbp := &help.DagBuilderParams{
 		Dagserv:  dm.dagserv,
 		Maxlinks: help.DefaultLinksPerBlock,
 	}
 
-	return trickle.TrickleAppend(dm.ctx, node, dbp.New(spl))
+	return trickle.TrickleAppend(dm.ctx, root, dbp.New(spl))
 }
 
 // Read data from this dag starting at the current offset
@@ -452,7 +482,12 @@ func (dm *DagModifier) Truncate(size int64) error {
 }
 
 // dagTruncate truncates the given node to 'size' and returns the modified Node
-func dagTruncate(ctx context.Context, nd *mdag.ProtoNode, size uint64, ds mdag.DAGService) (*mdag.ProtoNode, error) {
+func dagTruncate(ctx context.Context, n node.Node, size uint64, ds mdag.DAGService) (*mdag.ProtoNode, error) {
+	nd, ok := n.(*mdag.ProtoNode)
+	if !ok {
+		return nil, ErrNoRawYet
+	}
+
 	if len(nd.Links()) == 0 {
 		// TODO: this can likely be done without marshaling and remarshaling
 		pbn, err := ft.FromBytes(nd.Data())