Make unixio.DagReader an interface
License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protonmail.ch>
@@ -8,7 +8,7 @@ import (
 	uio "github.com/ipfs/go-ipfs/unixfs/io"
 )
 
-func Cat(ctx context.Context, n *core.IpfsNode, pstr string) (*uio.DagReader, error) {
+func Cat(ctx context.Context, n *core.IpfsNode, pstr string) (uio.DagReader, error) {
 	dagNode, err := core.Resolve(ctx, n.Namesys, n.Resolver, path.Path(pstr))
 	if err != nil {
 		return nil, err
@@ -20,8 +20,15 @@ var ErrIsDir = errors.New("this dag node is a directory")
 
 var ErrCantReadSymlinks = errors.New("cannot currently read symlinks")
 
+type DagReader interface {
+	ReadSeekCloser
+	Size() uint64
+	CtxReadFull(context.Context, []byte) (int, error)
+	Offset() int64
+}
+
 // DagReader provides a way to easily read the data contained in a dag.
-type DagReader struct {
+type pbDagReader struct {
 	serv mdag.DAGService
 
 	// the node being read
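Not part of the commit, but a common companion to a rename like this is a compile-time assertion in the same package, so the unexported implementation can never drift away from the exported interface. A minimal sketch:

// Fails to build if *pbDagReader stops satisfying DagReader.
var _ DagReader = (*pbDagReader)(nil)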
@@ -59,10 +66,10 @@ type ReadSeekCloser interface {
 
 // NewDagReader creates a new reader object that reads the data represented by
 // the given node, using the passed in DAGService for data retreival
-func NewDagReader(ctx context.Context, n node.Node, serv mdag.DAGService) (*DagReader, error) {
+func NewDagReader(ctx context.Context, n node.Node, serv mdag.DAGService) (DagReader, error) {
 	switch n := n.(type) {
 	case *mdag.RawNode:
-		return &DagReader{
+		return &pbDagReader{
 			buf: NewRSNCFromBytes(n.RawData()),
 		}, nil
 	case *mdag.ProtoNode:
@@ -101,10 +108,10 @@ func NewDagReader(ctx context.Context, n node.Node, serv mdag.DAGService) (*DagReader, error) {
 	}
 }
 
-func NewDataFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv mdag.DAGService) *DagReader {
+func NewDataFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv mdag.DAGService) *pbDagReader {
 	fctx, cancel := context.WithCancel(ctx)
 	promises := mdag.GetDAG(fctx, serv, n)
-	return &DagReader{
+	return &pbDagReader{
 		node: n,
 		serv: serv,
 		buf:  NewRSNCFromBytes(pb.GetData()),
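For illustration, a caller of the constructor now works purely against the interface. The sketch below is a hypothetical helper (not part of this commit) written as if it lived in the unixfs/io package, so the node and mdag aliases used in the diff are in scope; it also assumes ReadSeekCloser includes io.Closer, as its name suggests.

// readAll loads the whole DAG-backed file into memory, sizing the buffer
// from the reader's own metadata.
func readAll(ctx context.Context, nd node.Node, dserv mdag.DAGService) ([]byte, error) {
	dr, err := NewDagReader(ctx, nd, dserv)
	if err != nil {
		return nil, err
	}
	defer dr.Close()

	buf := make([]byte, dr.Size())
	if _, err := io.ReadFull(dr, buf); err != nil {
		return nil, err
	}
	return buf, nil
}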
@@ -117,7 +124,7 @@ func NewDataFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv mdag.DAGService) *DagReader {
 
 // precalcNextBuf follows the next link in line and loads it from the
 // DAGService, setting the next buffer to read from
-func (dr *DagReader) precalcNextBuf(ctx context.Context) error {
+func (dr *pbDagReader) precalcNextBuf(ctx context.Context) error {
 	dr.buf.Close() // Just to make sure
 	if dr.linkPosition >= len(dr.promises) {
 		return io.EOF
@@ -158,22 +165,22 @@ func (dr *DagReader) precalcNextBuf(ctx context.Context) error {
 		dr.buf = NewRSNCFromBytes(nxt.RawData())
 		return nil
 	default:
-		return errors.New("unrecognized node type in DagReader")
+		return errors.New("unrecognized node type in pbDagReader")
 	}
 }
 
 // Size return the total length of the data from the DAG structured file.
-func (dr *DagReader) Size() uint64 {
+func (dr *pbDagReader) Size() uint64 {
 	return dr.pbdata.GetFilesize()
 }
 
 // Read reads data from the DAG structured file
-func (dr *DagReader) Read(b []byte) (int, error) {
+func (dr *pbDagReader) Read(b []byte) (int, error) {
 	return dr.CtxReadFull(dr.ctx, b)
 }
 
 // CtxReadFull reads data from the DAG structured file
-func (dr *DagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
+func (dr *pbDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
 	// If no cached buffer, load one
 	total := 0
 	for {
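CtxReadFull takes its own context, so a caller can bound a single read independently of the context the reader was created with. A sketch of a hypothetical helper (not part of this commit):

// readPrefix reads up to the first 4 KiB of the file, giving up after two
// seconds even if the reader's own context is still alive. A file shorter
// than the buffer surfaces io.EOF, which this sketch treats as a normal end.
func readPrefix(dr DagReader) ([]byte, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	buf := make([]byte, 4096)
	n, err := dr.CtxReadFull(ctx, buf)
	if err != nil && err != io.EOF {
		return nil, err
	}
	return buf[:n], nil
}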
@@ -201,7 +208,7 @@ func (dr *DagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
 	}
 }
 
-func (dr *DagReader) WriteTo(w io.Writer) (int64, error) {
+func (dr *pbDagReader) WriteTo(w io.Writer) (int64, error) {
 	// If no cached buffer, load one
 	total := int64(0)
 	for {
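The diff does not show the members of ReadSeekCloser, but even if WriteTo is not part of the interface's method set, io.Copy checks for io.WriterTo on the dynamic type at runtime, so streaming through a DagReader backed by pbDagReader still avoids the intermediate copy buffer. A sketch (the helper name is made up):

// stream copies the whole file into w; io.Copy uses pbDagReader.WriteTo when
// the dynamic type provides it, falling back to plain Read otherwise.
func stream(w io.Writer, dr DagReader) (int64, error) {
	return io.Copy(w, dr)
}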
@@ -226,12 +233,12 @@ func (dr *DagReader) WriteTo(w io.Writer) (int64, error) {
 	}
 }
 
-func (dr *DagReader) Close() error {
+func (dr *pbDagReader) Close() error {
 	dr.cancel()
 	return nil
 }
 
-func (dr *DagReader) Offset() int64 {
+func (dr *pbDagReader) Offset() int64 {
 	return dr.offset
 }
 
@@ -239,7 +246,7 @@ func (dr *DagReader) Offset() int64 {
 // interface matches standard unix seek
 // TODO: check if we can do relative seeks, to reduce the amount of dagreader
 // recreations that need to happen.
-func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
+func (dr *pbDagReader) Seek(offset int64, whence int) (int64, error) {
 	switch whence {
 	case os.SEEK_SET:
 		if offset < 0 {
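Seek makes random access through the interface possible, assuming ReadSeekCloser covers io.Seeker as its name suggests. A hypothetical helper, using os.SEEK_SET to match the code above (io.SeekStart is the modern spelling of the same whence value):

// readAt reads n bytes starting at offset off by seeking from the start of
// the file and then filling the buffer.
func readAt(dr DagReader, off int64, n int) ([]byte, error) {
	if _, err := dr.Seek(off, os.SEEK_SET); err != nil {
		return nil, err
	}
	buf := make([]byte, n)
	if _, err := io.ReadFull(dr, buf); err != nil {
		return nil, err
	}
	return buf, nil
}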
@@ -236,7 +236,7 @@ func TestReaderSzie(t *testing.T) {
 	}
 }
 
-func readByte(t testing.TB, reader *DagReader) byte {
+func readByte(t testing.TB, reader DagReader) byte {
 	out := make([]byte, 1)
 	c, err := reader.Read(out)
@@ -43,7 +43,7 @@ type DagModifier struct {
 	curWrOff uint64
 	wrBuf    *bytes.Buffer
 
-	read *uio.DagReader
+	read uio.DagReader
 }
 
 func NewDagModifier(ctx context.Context, from node.Node, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {