mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-24 22:38:27 +08:00
Merge pull request #5189 from schomatis/feat/pbdagreader/use-fsnode
pbdagreader: use FSNode instead of protobuf structure
This commit is contained in:
@ -77,7 +77,7 @@ func (n *UnixfsNode) Set(other *UnixfsNode) {
|
||||
n.raw = other.raw
|
||||
n.rawnode = other.rawnode
|
||||
if other.ufmt != nil {
|
||||
n.ufmt.SetData(other.ufmt.GetData())
|
||||
n.ufmt.SetData(other.ufmt.Data())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -60,7 +60,7 @@ func (fi *File) Open(flags int, sync bool) (FileDescriptor, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch fsn.GetType() {
|
||||
switch fsn.Type() {
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported fsnode type for 'file'")
|
||||
case ft.TSymlink:
|
||||
|
@ -852,7 +852,7 @@ func TestFlushing(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if fsnode.GetType() != ft.TDirectory {
|
||||
if fsnode.Type() != ft.TDirectory {
|
||||
t.Fatal("root wasnt a directory")
|
||||
}
|
||||
|
||||
|
@ -16,7 +16,6 @@ import (
|
||||
upb "github.com/ipfs/go-ipfs/unixfs/pb"
|
||||
|
||||
ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format"
|
||||
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
|
||||
)
|
||||
|
||||
// Writer is a utility structure that helps to write
|
||||
@ -57,12 +56,12 @@ func (w *Writer) writeDir(nd *mdag.ProtoNode, fpath string) error {
|
||||
})
|
||||
}
|
||||
|
||||
func (w *Writer) writeFile(nd *mdag.ProtoNode, pb *upb.Data, fpath string) error {
|
||||
if err := writeFileHeader(w.TarW, fpath, pb.GetFilesize()); err != nil {
|
||||
func (w *Writer) writeFile(nd *mdag.ProtoNode, fsNode *ft.FSNode, fpath string) error {
|
||||
if err := writeFileHeader(w.TarW, fpath, fsNode.FileSize()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dagr := uio.NewPBFileReader(w.ctx, nd, pb, w.Dag)
|
||||
dagr := uio.NewPBFileReader(w.ctx, nd, fsNode, w.Dag)
|
||||
if _, err := dagr.WriteTo(w.TarW); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -74,12 +73,12 @@ func (w *Writer) writeFile(nd *mdag.ProtoNode, pb *upb.Data, fpath string) error
|
||||
func (w *Writer) WriteNode(nd ipld.Node, fpath string) error {
|
||||
switch nd := nd.(type) {
|
||||
case *mdag.ProtoNode:
|
||||
pb := new(upb.Data)
|
||||
if err := proto.Unmarshal(nd.Data(), pb); err != nil {
|
||||
fsNode, err := ft.FSNodeFromBytes(nd.Data())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch pb.GetType() {
|
||||
switch fsNode.Type() {
|
||||
case upb.Data_Metadata:
|
||||
fallthrough
|
||||
case upb.Data_Directory, upb.Data_HAMTShard:
|
||||
@ -87,9 +86,9 @@ func (w *Writer) WriteNode(nd ipld.Node, fpath string) error {
|
||||
case upb.Data_Raw:
|
||||
fallthrough
|
||||
case upb.Data_File:
|
||||
return w.writeFile(nd, pb, fpath)
|
||||
return w.writeFile(nd, fsNode, fpath)
|
||||
case upb.Data_Symlink:
|
||||
return writeSymlinkHeader(w.TarW, string(pb.GetData()), fpath)
|
||||
return writeSymlinkHeader(w.TarW, string(fsNode.Data()), fpath)
|
||||
default:
|
||||
return ft.ErrUnrecognizedType
|
||||
}
|
||||
|
@ -11,7 +11,6 @@ import (
|
||||
ftpb "github.com/ipfs/go-ipfs/unixfs/pb"
|
||||
|
||||
ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format"
|
||||
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
|
||||
)
|
||||
|
||||
// Common errors
|
||||
@ -44,17 +43,17 @@ func NewDagReader(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) (DagRe
|
||||
case *mdag.RawNode:
|
||||
return NewBufDagReader(n.RawData()), nil
|
||||
case *mdag.ProtoNode:
|
||||
pb := new(ftpb.Data)
|
||||
if err := proto.Unmarshal(n.Data(), pb); err != nil {
|
||||
fsNode, err := ft.FSNodeFromBytes(n.Data())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch pb.GetType() {
|
||||
switch fsNode.Type() {
|
||||
case ftpb.Data_Directory, ftpb.Data_HAMTShard:
|
||||
// Dont allow reading directories
|
||||
return nil, ErrIsDir
|
||||
case ftpb.Data_File, ftpb.Data_Raw:
|
||||
return NewPBFileReader(ctx, n, pb, serv), nil
|
||||
return NewPBFileReader(ctx, n, fsNode, serv), nil
|
||||
case ftpb.Data_Metadata:
|
||||
if len(n.Links()) == 0 {
|
||||
return nil, errors.New("incorrectly formatted metadata object")
|
||||
|
@ -11,7 +11,6 @@ import (
|
||||
ftpb "github.com/ipfs/go-ipfs/unixfs/pb"
|
||||
|
||||
ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format"
|
||||
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
|
||||
cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
|
||||
)
|
||||
|
||||
@ -19,11 +18,8 @@ import (
|
||||
type PBDagReader struct {
|
||||
serv ipld.NodeGetter
|
||||
|
||||
// the node being read
|
||||
node *mdag.ProtoNode
|
||||
|
||||
// cached protobuf structure from node.Data
|
||||
pbdata *ftpb.Data
|
||||
// UnixFS file (it should be of type `Data_File` or `Data_Raw` only).
|
||||
file *ft.FSNode
|
||||
|
||||
// the current data buffer to be read from
|
||||
// will either be a bytes.Reader or a child DagReader
|
||||
@ -51,18 +47,17 @@ type PBDagReader struct {
|
||||
var _ DagReader = (*PBDagReader)(nil)
|
||||
|
||||
// NewPBFileReader constructs a new PBFileReader.
|
||||
func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv ipld.NodeGetter) *PBDagReader {
|
||||
func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, file *ft.FSNode, serv ipld.NodeGetter) *PBDagReader {
|
||||
fctx, cancel := context.WithCancel(ctx)
|
||||
curLinks := getLinkCids(n)
|
||||
return &PBDagReader{
|
||||
node: n,
|
||||
serv: serv,
|
||||
buf: NewBufDagReader(pb.GetData()),
|
||||
buf: NewBufDagReader(file.Data()),
|
||||
promises: make([]*ipld.NodePromise, len(curLinks)),
|
||||
links: curLinks,
|
||||
ctx: fctx,
|
||||
cancel: cancel,
|
||||
pbdata: pb,
|
||||
file: file,
|
||||
}
|
||||
}
|
||||
|
||||
@ -105,21 +100,20 @@ func (dr *PBDagReader) precalcNextBuf(ctx context.Context) error {
|
||||
|
||||
switch nxt := nxt.(type) {
|
||||
case *mdag.ProtoNode:
|
||||
pb := new(ftpb.Data)
|
||||
err = proto.Unmarshal(nxt.Data(), pb)
|
||||
fsNode, err := ft.FSNodeFromBytes(nxt.Data())
|
||||
if err != nil {
|
||||
return fmt.Errorf("incorrectly formatted protobuf: %s", err)
|
||||
}
|
||||
|
||||
switch pb.GetType() {
|
||||
switch fsNode.Type() {
|
||||
case ftpb.Data_Directory, ftpb.Data_HAMTShard:
|
||||
// A directory should not exist within a file
|
||||
return ft.ErrInvalidDirLocation
|
||||
case ftpb.Data_File:
|
||||
dr.buf = NewPBFileReader(dr.ctx, nxt, pb, dr.serv)
|
||||
dr.buf = NewPBFileReader(dr.ctx, nxt, fsNode, dr.serv)
|
||||
return nil
|
||||
case ftpb.Data_Raw:
|
||||
dr.buf = NewBufDagReader(pb.GetData())
|
||||
dr.buf = NewBufDagReader(fsNode.Data())
|
||||
return nil
|
||||
case ftpb.Data_Metadata:
|
||||
return errors.New("shouldnt have had metadata object inside file")
|
||||
@ -146,7 +140,7 @@ func getLinkCids(n ipld.Node) []*cid.Cid {
|
||||
|
||||
// Size return the total length of the data from the DAG structured file.
|
||||
func (dr *PBDagReader) Size() uint64 {
|
||||
return dr.pbdata.GetFilesize()
|
||||
return dr.file.FileSize()
|
||||
}
|
||||
|
||||
// Read reads data from the DAG structured file
|
||||
@ -239,17 +233,14 @@ func (dr *PBDagReader) Seek(offset int64, whence int) (int64, error) {
|
||||
return offset, nil
|
||||
}
|
||||
|
||||
// Grab cached protobuf object (solely to make code look cleaner)
|
||||
pb := dr.pbdata
|
||||
|
||||
// left represents the number of bytes remaining to seek to (from beginning)
|
||||
left := offset
|
||||
if int64(len(pb.Data)) >= offset {
|
||||
if int64(len(dr.file.Data())) >= offset {
|
||||
// Close current buf to close potential child dagreader
|
||||
if dr.buf != nil {
|
||||
dr.buf.Close()
|
||||
}
|
||||
dr.buf = NewBufDagReader(pb.GetData()[offset:])
|
||||
dr.buf = NewBufDagReader(dr.file.Data()[offset:])
|
||||
|
||||
// start reading links from the beginning
|
||||
dr.linkPosition = 0
|
||||
@ -258,15 +249,15 @@ func (dr *PBDagReader) Seek(offset int64, whence int) (int64, error) {
|
||||
}
|
||||
|
||||
// skip past root block data
|
||||
left -= int64(len(pb.Data))
|
||||
left -= int64(len(dr.file.Data()))
|
||||
|
||||
// iterate through links and find where we need to be
|
||||
for i := 0; i < len(pb.Blocksizes); i++ {
|
||||
if pb.Blocksizes[i] > uint64(left) {
|
||||
for i := 0; i < dr.file.NumChildren(); i++ {
|
||||
if dr.file.BlockSize(i) > uint64(left) {
|
||||
dr.linkPosition = i
|
||||
break
|
||||
} else {
|
||||
left -= int64(pb.Blocksizes[i])
|
||||
left -= int64(dr.file.BlockSize(i))
|
||||
}
|
||||
}
|
||||
|
||||
@ -298,14 +289,14 @@ func (dr *PBDagReader) Seek(offset int64, whence int) (int64, error) {
|
||||
noffset := dr.offset + offset
|
||||
return dr.Seek(noffset, io.SeekStart)
|
||||
case io.SeekEnd:
|
||||
noffset := int64(dr.pbdata.GetFilesize()) - offset
|
||||
noffset := int64(dr.file.FileSize()) - offset
|
||||
n, err := dr.Seek(noffset, io.SeekStart)
|
||||
|
||||
// Return negative number if we can't figure out the file size. Using io.EOF
|
||||
// for this seems to be good(-enough) solution as it's only returned by
|
||||
// precalcNextBuf when we step out of file range.
|
||||
// This is needed for gateway to function properly
|
||||
if err == io.EOF && *dr.pbdata.Type == ftpb.Data_File {
|
||||
if err == io.EOF && dr.file.Type() == ftpb.Data_File {
|
||||
return -1, nil
|
||||
}
|
||||
return n, err
|
||||
|
@ -195,6 +195,12 @@ func (n *FSNode) RemoveBlockSize(i int) {
|
||||
n.format.Blocksizes = append(n.format.Blocksizes[:i], n.format.Blocksizes[i+1:]...)
|
||||
}
|
||||
|
||||
// BlockSize returns the block size indexed by `i`.
|
||||
// TODO: Evaluate if this function should be bounds checking.
|
||||
func (n *FSNode) BlockSize(i int) uint64 {
|
||||
return n.format.Blocksizes[i]
|
||||
}
|
||||
|
||||
// GetBytes marshals this node as a protobuf message.
|
||||
func (n *FSNode) GetBytes() ([]byte, error) {
|
||||
return proto.Marshal(&n.format)
|
||||
@ -211,15 +217,15 @@ func (n *FSNode) NumChildren() int {
|
||||
return len(n.format.Blocksizes)
|
||||
}
|
||||
|
||||
// GetData retrieves the `Data` field from the internal `format`.
|
||||
func (n *FSNode) GetData() []byte {
|
||||
// Data retrieves the `Data` field from the internal `format`.
|
||||
func (n *FSNode) Data() []byte {
|
||||
return n.format.GetData()
|
||||
}
|
||||
|
||||
// SetData sets the `Data` field from the internal `format`
|
||||
// updating its `Filesize`.
|
||||
func (n *FSNode) SetData(newData []byte) {
|
||||
n.UpdateFilesize(int64(len(newData) - len(n.GetData())))
|
||||
n.UpdateFilesize(int64(len(newData) - len(n.Data())))
|
||||
n.format.Data = newData
|
||||
}
|
||||
|
||||
@ -231,8 +237,8 @@ func (n *FSNode) UpdateFilesize(filesizeDiff int64) {
|
||||
int64(n.format.GetFilesize()) + filesizeDiff))
|
||||
}
|
||||
|
||||
// GetType retrieves the `Type` field from the internal `format`.
|
||||
func (n *FSNode) GetType() pb.Data_DataType {
|
||||
// Type retrieves the `Type` field from the internal `format`.
|
||||
func (n *FSNode) Type() pb.Data_DataType {
|
||||
return n.format.GetType()
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user