mirror of
https://github.com/ipfs/kubo.git
synced 2025-09-10 22:49:13 +08:00
implement seeking in the dagreader
This commit is contained in:
@ -73,7 +73,7 @@ func cat(node *core.IpfsNode, paths []string) ([]io.Reader, uint64, error) {
|
||||
}
|
||||
length += nodeLength
|
||||
|
||||
read, err := uio.NewDagReader(dagnode, node.DAG)
|
||||
read, err := uio.NewDagReader(node.Context(), dagnode, node.DAG)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
@ -77,7 +77,7 @@ func (i *gatewayHandler) AddNodeToDAG(nd *dag.Node) (u.Key, error) {
|
||||
}
|
||||
|
||||
func (i *gatewayHandler) NewDagReader(nd *dag.Node) (io.Reader, error) {
|
||||
return uio.NewDagReader(nd, i.node.DAG)
|
||||
return uio.NewDagReader(i.node.Context(), nd, i.node.DAG)
|
||||
}
|
||||
|
||||
func (i *gatewayHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -12,5 +12,5 @@ func Cat(n *core.IpfsNode, path string) (io.Reader, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return uio.NewDagReader(dagNode, n.DAG)
|
||||
return uio.NewDagReader(n.ContextGroup.Context(), dagNode, n.DAG)
|
||||
}
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
|
||||
fuse "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
|
||||
fs "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs"
|
||||
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
|
||||
|
||||
core "github.com/jbenet/go-ipfs/core"
|
||||
@ -337,7 +338,7 @@ func (s *Node) ReadDir(intr fs.Intr) ([]fuse.Dirent, fuse.Error) {
|
||||
// ReadAll reads the object data as file data
|
||||
func (s *Node) ReadAll(intr fs.Intr) ([]byte, fuse.Error) {
|
||||
log.Debugf("ipns: ReadAll [%s]", s.name)
|
||||
r, err := uio.NewDagReader(s.Nd, s.Ipfs.DAG)
|
||||
r, err := uio.NewDagReader(context.TODO(), s.Nd, s.Ipfs.DAG)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
|
||||
fuse "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
|
||||
fs "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs"
|
||||
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
|
||||
|
||||
core "github.com/jbenet/go-ipfs/core"
|
||||
@ -145,7 +146,7 @@ func (s *Node) ReadDir(intr fs.Intr) ([]fuse.Dirent, fuse.Error) {
|
||||
// ReadAll reads the object data as file data
|
||||
func (s *Node) ReadAll(intr fs.Intr) ([]byte, fuse.Error) {
|
||||
log.Debug("Read node.")
|
||||
r, err := uio.NewDagReader(s.Nd, s.Ipfs.DAG)
|
||||
r, err := uio.NewDagReader(context.TODO(), s.Nd, s.Ipfs.DAG)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -6,8 +6,10 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||
dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
|
||||
bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
|
||||
@ -51,7 +53,7 @@ func testFileConsistency(t *testing.T, bs chunk.BlockSplitter, nbytes int) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r, err := uio.NewDagReader(nd, dnp.ds)
|
||||
r, err := uio.NewDagReader(context.TODO(), nd, dnp.ds)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -77,7 +79,7 @@ func TestBuilderConsistency(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r, err := uio.NewDagReader(nd, dagserv)
|
||||
r, err := uio.NewDagReader(context.TODO(), nd, dagserv)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -165,7 +167,7 @@ func TestIndirectBlocks(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
reader, err := uio.NewDagReader(dag, dnp.ds)
|
||||
reader, err := uio.NewDagReader(context.TODO(), dag, dnp.ds)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -179,3 +181,127 @@ func TestIndirectBlocks(t *testing.T) {
|
||||
t.Fatal("Not equal!")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSeekingBasic(t *testing.T) {
|
||||
nbytes := int64(10 * 1024)
|
||||
should := make([]byte, nbytes)
|
||||
u.NewTimeSeededRand().Read(should)
|
||||
|
||||
read := bytes.NewReader(should)
|
||||
dnp := getDagservAndPinner(t)
|
||||
nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, &chunk.SizeSplitter{500})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
rs, err := uio.NewDagReader(context.TODO(), nd, dnp.ds)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
start := int64(4000)
|
||||
n, err := rs.Seek(start, os.SEEK_SET)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if n != start {
|
||||
t.Fatal("Failed to seek to correct offset")
|
||||
}
|
||||
|
||||
out, err := ioutil.ReadAll(rs)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = arrComp(out, should[start:])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSeekToBegin(t *testing.T) {
|
||||
nbytes := int64(10 * 1024)
|
||||
should := make([]byte, nbytes)
|
||||
u.NewTimeSeededRand().Read(should)
|
||||
|
||||
read := bytes.NewReader(should)
|
||||
dnp := getDagservAndPinner(t)
|
||||
nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, &chunk.SizeSplitter{500})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
rs, err := uio.NewDagReader(context.TODO(), nd, dnp.ds)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
n, err := io.CopyN(ioutil.Discard, rs, 1024*4)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if n != 4096 {
|
||||
t.Fatal("Copy didnt copy enough bytes")
|
||||
}
|
||||
|
||||
seeked, err := rs.Seek(0, os.SEEK_SET)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if seeked != 0 {
|
||||
t.Fatal("Failed to seek to beginning")
|
||||
}
|
||||
|
||||
out, err := ioutil.ReadAll(rs)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = arrComp(out, should)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSeekingConsistency(t *testing.T) {
|
||||
nbytes := int64(128 * 1024)
|
||||
should := make([]byte, nbytes)
|
||||
u.NewTimeSeededRand().Read(should)
|
||||
|
||||
read := bytes.NewReader(should)
|
||||
dnp := getDagservAndPinner(t)
|
||||
nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, &chunk.SizeSplitter{500})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
rs, err := uio.NewDagReader(context.TODO(), nd, dnp.ds)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
out := make([]byte, nbytes)
|
||||
|
||||
for coff := nbytes - 4096; coff >= 0; coff -= 4096 {
|
||||
t.Log(coff)
|
||||
n, err := rs.Seek(coff, os.SEEK_SET)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if n != coff {
|
||||
t.Fatal("wasnt able to seek to the right position")
|
||||
}
|
||||
nread, err := rs.Read(out[coff : coff+4096])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if nread != 4096 {
|
||||
t.Fatal("didnt read the correct number of bytes")
|
||||
}
|
||||
}
|
||||
|
||||
err = arrComp(out, should)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
@ -2,7 +2,6 @@
|
||||
package merkledag
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
@ -27,6 +26,7 @@ type DAGService interface {
|
||||
// GetDAG returns, in order, all the single leve child
|
||||
// nodes of the passed in node.
|
||||
GetDAG(context.Context, *Node) <-chan *Node
|
||||
GetNodes(context.Context, []u.Key) <-chan *Node
|
||||
}
|
||||
|
||||
func NewDAGService(bs *bserv.BlockService) DAGService {
|
||||
@ -155,11 +155,10 @@ func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{}
|
||||
|
||||
// FindLinks searches this nodes links for the given key,
|
||||
// returns the indexes of any links pointing to it
|
||||
func FindLinks(n *Node, k u.Key, start int) []int {
|
||||
func FindLinks(links []u.Key, k u.Key, start int) []int {
|
||||
var out []int
|
||||
keybytes := []byte(k)
|
||||
for i, lnk := range n.Links[start:] {
|
||||
if bytes.Equal([]byte(lnk.Hash), keybytes) {
|
||||
for i, lnk_k := range links[start:] {
|
||||
if k == lnk_k {
|
||||
out = append(out, i+start)
|
||||
}
|
||||
}
|
||||
@ -170,19 +169,31 @@ func FindLinks(n *Node, k u.Key, start int) []int {
|
||||
// It returns a channel of nodes, which the caller can receive
|
||||
// all the child nodes of 'root' on, in proper order.
|
||||
func (ds *dagService) GetDAG(ctx context.Context, root *Node) <-chan *Node {
|
||||
sig := make(chan *Node)
|
||||
go func() {
|
||||
defer close(sig)
|
||||
|
||||
var keys []u.Key
|
||||
for _, lnk := range root.Links {
|
||||
keys = append(keys, u.Key(lnk.Hash))
|
||||
}
|
||||
|
||||
return ds.GetNodes(ctx, keys)
|
||||
}
|
||||
|
||||
func (ds *dagService) GetNodes(ctx context.Context, keys []u.Key) <-chan *Node {
|
||||
sig := make(chan *Node)
|
||||
go func() {
|
||||
defer close(sig)
|
||||
blkchan := ds.Blocks.GetBlocks(ctx, keys)
|
||||
|
||||
nodes := make([]*Node, len(root.Links))
|
||||
nodes := make([]*Node, len(keys))
|
||||
next := 0
|
||||
for blk := range blkchan {
|
||||
for {
|
||||
select {
|
||||
case blk, ok := <-blkchan:
|
||||
if !ok {
|
||||
if next < len(nodes) {
|
||||
log.Errorf("Did not receive correct number of nodes!")
|
||||
}
|
||||
return
|
||||
}
|
||||
nd, err := Decoded(blk.Data)
|
||||
if err != nil {
|
||||
// NB: can occur in normal situations, with improperly formatted
|
||||
@ -190,20 +201,22 @@ func (ds *dagService) GetDAG(ctx context.Context, root *Node) <-chan *Node {
|
||||
log.Error("Got back bad block!")
|
||||
break
|
||||
}
|
||||
is := FindLinks(root, blk.Key(), next)
|
||||
is := FindLinks(keys, blk.Key(), next)
|
||||
for _, i := range is {
|
||||
nodes[i] = nd
|
||||
}
|
||||
|
||||
for ; next < len(nodes) && nodes[next] != nil; next++ {
|
||||
sig <- nodes[next]
|
||||
select {
|
||||
case sig <- nodes[next]:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
if next < len(nodes) {
|
||||
// TODO: bubble errors back up.
|
||||
log.Errorf("Did not receive correct number of nodes!")
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return sig
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||
dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
|
||||
bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
|
||||
@ -162,7 +163,7 @@ func runBatchFetchTest(t *testing.T, read io.Reader) {
|
||||
|
||||
t.Log("finished setup.")
|
||||
|
||||
dagr, err := uio.NewDagReader(root, dagservs[0])
|
||||
dagr, err := uio.NewDagReader(context.TODO(), root, dagservs[0])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -195,7 +196,7 @@ func runBatchFetchTest(t *testing.T, read io.Reader) {
|
||||
}
|
||||
fmt.Println("Got first node back.")
|
||||
|
||||
read, err := uio.NewDagReader(first, dagservs[i])
|
||||
read, err := uio.NewDagReader(context.TODO(), first, dagservs[i])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package http
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
core "github.com/jbenet/go-ipfs/core"
|
||||
"github.com/jbenet/go-ipfs/importer"
|
||||
chunk "github.com/jbenet/go-ipfs/importer/chunk"
|
||||
@ -36,5 +37,5 @@ func (i *ipfsHandler) AddNodeToDAG(nd *dag.Node) (u.Key, error) {
|
||||
}
|
||||
|
||||
func (i *ipfsHandler) NewDagReader(nd *dag.Node) (io.Reader, error) {
|
||||
return uio.NewDagReader(nd, i.node.DAG)
|
||||
return uio.NewDagReader(context.TODO(), nd, i.node.DAG)
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
|
||||
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
|
||||
"github.com/jbenet/go-ipfs/blocks/blockstore"
|
||||
bs "github.com/jbenet/go-ipfs/blockservice"
|
||||
@ -38,7 +39,7 @@ func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Nod
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
dr, err := NewDagReader(node, dserv)
|
||||
dr, err := NewDagReader(context.TODO(), node, dserv)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -75,7 +76,7 @@ func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
rd, err := NewDagReader(nd, dm.dagserv)
|
||||
rd, err := NewDagReader(context.TODO(), nd, dm.dagserv)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -173,7 +174,7 @@ func TestMultiWrite(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
read, err := NewDagReader(nd, dserv)
|
||||
read, err := NewDagReader(context.TODO(), nd, dserv)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -215,7 +216,7 @@ func TestMultiWriteCoal(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
read, err := NewDagReader(nd, dserv)
|
||||
read, err := NewDagReader(context.TODO(), nd, dserv)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -3,7 +3,10 @@ package io
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
|
||||
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
|
||||
@ -11,6 +14,7 @@ import (
|
||||
mdag "github.com/jbenet/go-ipfs/merkledag"
|
||||
ft "github.com/jbenet/go-ipfs/unixfs"
|
||||
ftpb "github.com/jbenet/go-ipfs/unixfs/pb"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
|
||||
var ErrIsDir = errors.New("this dag node is a directory")
|
||||
@ -19,14 +23,28 @@ var ErrIsDir = errors.New("this dag node is a directory")
|
||||
type DagReader struct {
|
||||
serv mdag.DAGService
|
||||
node *mdag.Node
|
||||
buf io.Reader
|
||||
buf ReadSeekCloser
|
||||
fetchChan <-chan *mdag.Node
|
||||
linkPosition int
|
||||
offset int64
|
||||
|
||||
// Our context
|
||||
ctx context.Context
|
||||
|
||||
// Context for children
|
||||
fctx context.Context
|
||||
cancel func()
|
||||
}
|
||||
|
||||
type ReadSeekCloser interface {
|
||||
io.Reader
|
||||
io.Seeker
|
||||
io.Closer
|
||||
}
|
||||
|
||||
// NewDagReader creates a new reader object that reads the data represented by the given
|
||||
// node, using the passed in DAGService for data retreival
|
||||
func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
|
||||
func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (ReadSeekCloser, error) {
|
||||
pb := new(ftpb.Data)
|
||||
err := proto.Unmarshal(n.Data, pb)
|
||||
if err != nil {
|
||||
@ -38,16 +56,20 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
|
||||
// Dont allow reading directories
|
||||
return nil, ErrIsDir
|
||||
case ftpb.Data_File:
|
||||
fetchChan := serv.GetDAG(context.TODO(), n)
|
||||
fctx, cancel := context.WithCancel(ctx)
|
||||
fetchChan := serv.GetDAG(fctx, n)
|
||||
return &DagReader{
|
||||
node: n,
|
||||
serv: serv,
|
||||
buf: bytes.NewBuffer(pb.GetData()),
|
||||
buf: NewRSNCFromBytes(pb.GetData()),
|
||||
fetchChan: fetchChan,
|
||||
ctx: ctx,
|
||||
fctx: fctx,
|
||||
cancel: cancel,
|
||||
}, nil
|
||||
case ftpb.Data_Raw:
|
||||
// Raw block will just be a single level, return a byte buffer
|
||||
return bytes.NewBuffer(pb.GetData()), nil
|
||||
return NewRSNCFromBytes(pb.GetData()), nil
|
||||
default:
|
||||
return nil, ft.ErrUnrecognizedType
|
||||
}
|
||||
@ -70,6 +92,8 @@ func (dr *DagReader) precalcNextBuf() error {
|
||||
if !ok {
|
||||
return io.EOF
|
||||
}
|
||||
case <-dr.ctx.Done():
|
||||
return dr.ctx.Err()
|
||||
}
|
||||
|
||||
pb := new(ftpb.Data)
|
||||
@ -85,20 +109,37 @@ func (dr *DagReader) precalcNextBuf() error {
|
||||
case ftpb.Data_File:
|
||||
//TODO: this *should* work, needs testing first
|
||||
log.Warning("Running untested code for multilayered indirect FS reads.")
|
||||
subr, err := NewDagReader(nxt, dr.serv)
|
||||
subr, err := NewDagReader(dr.fctx, nxt, dr.serv)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dr.buf = subr
|
||||
return nil
|
||||
case ftpb.Data_Raw:
|
||||
dr.buf = bytes.NewBuffer(pb.GetData())
|
||||
dr.buf = NewRSNCFromBytes(pb.GetData())
|
||||
return nil
|
||||
default:
|
||||
return ft.ErrUnrecognizedType
|
||||
}
|
||||
}
|
||||
|
||||
func (dr *DagReader) resetBlockFetch(nlinkpos int) {
|
||||
dr.cancel()
|
||||
dr.fetchChan = nil
|
||||
dr.linkPosition = nlinkpos
|
||||
|
||||
var keys []u.Key
|
||||
for _, lnk := range dr.node.Links[dr.linkPosition:] {
|
||||
keys = append(keys, u.Key(lnk.Hash))
|
||||
}
|
||||
|
||||
fctx, cancel := context.WithCancel(dr.ctx)
|
||||
dr.cancel = cancel
|
||||
dr.fctx = fctx
|
||||
fch := dr.serv.GetNodes(fctx, keys)
|
||||
dr.fetchChan = fch
|
||||
}
|
||||
|
||||
// Read reads data from the DAG structured file
|
||||
func (dr *DagReader) Read(b []byte) (int, error) {
|
||||
// If no cached buffer, load one
|
||||
@ -113,6 +154,7 @@ func (dr *DagReader) Read(b []byte) (int, error) {
|
||||
// Attempt to fill bytes from cached buffer
|
||||
n, err := dr.buf.Read(b[total:])
|
||||
total += n
|
||||
dr.offset += int64(n)
|
||||
if err != nil {
|
||||
// EOF is expected
|
||||
if err != io.EOF {
|
||||
@ -133,28 +175,90 @@ func (dr *DagReader) Read(b []byte) (int, error) {
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
func (dr *DagReader) Close() error {
|
||||
if dr.fctx != nil {
|
||||
dr.cancel()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
|
||||
switch whence {
|
||||
case os.SEEK_SET:
|
||||
for i := 0; i < len(dr.node.Links); i++ {
|
||||
nsize := dr.node.Links[i].Size - 8
|
||||
if offset > nsize {
|
||||
offset -= nsize
|
||||
if offset < 0 {
|
||||
return -1, errors.New("Invalid offset")
|
||||
}
|
||||
//TODO: this pb should be cached
|
||||
pb := new(ftpb.Data)
|
||||
err := proto.Unmarshal(dr.node.Data, pb)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
if offset == 0 {
|
||||
dr.resetBlockFetch(0)
|
||||
dr.buf = NewRSNCFromBytes(pb.GetData())
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
left := offset
|
||||
if int64(len(pb.Data)) > offset {
|
||||
dr.buf = NewRSNCFromBytes(pb.GetData()[offset:])
|
||||
dr.linkPosition = 0
|
||||
dr.offset = offset
|
||||
return offset, nil
|
||||
} else {
|
||||
left -= int64(len(pb.Data))
|
||||
}
|
||||
|
||||
i := 0
|
||||
for ; i < len(pb.Blocksizes); i++ {
|
||||
if pb.Blocksizes[i] > uint64(left) {
|
||||
break
|
||||
} else {
|
||||
left -= int64(pb.Blocksizes[i])
|
||||
}
|
||||
}
|
||||
dr.position = i
|
||||
err := dr.precalcNextBuf()
|
||||
dr.resetBlockFetch(i)
|
||||
err = dr.precalcNextBuf()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
n, err := io.CopyN(ioutil.Discard, dr.buf, left)
|
||||
if err != nil {
|
||||
fmt.Printf("the copy failed: %s - [%d]\n", err, n)
|
||||
return -1, err
|
||||
}
|
||||
left -= n
|
||||
if left != 0 {
|
||||
return -1, errors.New("failed to seek properly")
|
||||
}
|
||||
dr.offset = offset
|
||||
return offset, nil
|
||||
case os.SEEK_CUR:
|
||||
noffset := dr.offset + offset
|
||||
return dr.Seek(noffset, os.SEEK_SET)
|
||||
case os.SEEK_END:
|
||||
pb := new(ftpb.Data)
|
||||
err := proto.Unmarshal(dr.node.Data, pb)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
noffset := int64(pb.GetFilesize()) - offset
|
||||
return dr.Seek(noffset, os.SEEK_SET)
|
||||
default:
|
||||
return 0, errors.New("invalid whence")
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
*/
|
||||
|
||||
type readSeekNopCloser struct {
|
||||
*bytes.Reader
|
||||
}
|
||||
|
||||
func NewRSNCFromBytes(b []byte) ReadSeekCloser {
|
||||
return &readSeekNopCloser{bytes.NewReader(b)}
|
||||
}
|
||||
|
||||
func (r *readSeekNopCloser) Close() error { return nil }
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"archive/tar"
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
"io"
|
||||
gopath "path"
|
||||
"strings"
|
||||
@ -114,7 +115,7 @@ func (i *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
|
||||
}
|
||||
i.flush()
|
||||
|
||||
reader, err := uio.NewDagReader(dagnode, i.dag)
|
||||
reader, err := uio.NewDagReader(context.TODO(), dagnode, i.dag)
|
||||
if err != nil {
|
||||
i.emitError(err)
|
||||
return
|
||||
|
Reference in New Issue
Block a user