diff --git a/importer/importer_test.go b/importer/importer_test.go index e3e22a479..e8b8a03a9 100644 --- a/importer/importer_test.go +++ b/importer/importer_test.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "io/ioutil" + mrand "math/rand" "os" "testing" @@ -307,6 +308,54 @@ func TestSeekToAlmostBegin(t *testing.T) { } } +// TestSeekingStress seeks to 50 random offsets in a 1MiB file and checks that +// the bytes read from each offset to the end of the file match the source. +func TestSeekingStress(t *testing.T) { + nbytes := int64(1024 * 1024) + should := make([]byte, nbytes) + u.NewTimeSeededRand().Read(should) + + read := bytes.NewReader(should) + dnp := getDagservAndPinner(t) + nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, &chunk.SizeSplitter{1000}) + if err != nil { + t.Fatal(err) + } + + rs, err := uio.NewDagReader(context.Background(), nd, dnp.ds) + if err != nil { + t.Fatal(err) + } + + testbuf := make([]byte, nbytes) + for i := 0; i < 50; i++ { + offset := mrand.Intn(int(nbytes)) + l := int(nbytes) - offset + n, err := rs.Seek(int64(offset), os.SEEK_SET) + if err != nil { + t.Fatal(err) + } + if n != int64(offset) { + t.Fatal("Seek failed to move to correct position") + } + + // io.ReadFull: a single Read may legally return fewer than l bytes + nread, err := io.ReadFull(rs, testbuf[:l]) + if err != nil { + t.Fatal(err) + } + if nread != l { + t.Fatal("Failed to read enough bytes") + } + + err = arrComp(testbuf[:l], should[offset:offset+l]) + if err != nil { + t.Fatal(err) + } + } + +} + func TestSeekingConsistency(t *testing.T) { nbytes := int64(128 * 1024) should := make([]byte, nbytes) diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index b05b309b7..5a68b12d2 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -177,6 +177,8 @@ func (ds *dagService) GetDAG(ctx context.Context, root *Node) []NodeGetter { return ds.GetNodes(ctx, keys) } +// GetNodes returns an array of 'NodeGetter' promises, with each corresponding +// to the key with the same index as the passed in keys func (ds *dagService) GetNodes(ctx context.Context, keys []u.Key) []NodeGetter { promises := make([]NodeGetter, len(keys)) sendChans := make([]chan<- *Node, len(keys)) diff --git 
a/unixfs/io/dagmodifier_test.go b/unixfs/io/dagmodifier_test.go index 52ff5853b..23f2a0afa 100644 --- a/unixfs/io/dagmodifier_test.go +++ b/unixfs/io/dagmodifier_test.go @@ -39,7 +39,7 @@ func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Nod t.Fatal(err) } - dr, err := NewDagReader(context.TODO(), node, dserv) + dr, err := NewDagReader(context.Background(), node, dserv) if err != nil { t.Fatal(err) } @@ -76,7 +76,7 @@ func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier) t.Fatal(err) } - rd, err := NewDagReader(context.TODO(), nd, dm.dagserv) + rd, err := NewDagReader(context.Background(), nd, dm.dagserv) if err != nil { t.Fatal(err) } @@ -174,7 +174,7 @@ func TestMultiWrite(t *testing.T) { t.Fatal(err) } - read, err := NewDagReader(context.TODO(), nd, dserv) + read, err := NewDagReader(context.Background(), nd, dserv) if err != nil { t.Fatal(err) } @@ -216,7 +216,7 @@ func TestMultiWriteCoal(t *testing.T) { t.Fatal(err) } - read, err := NewDagReader(context.TODO(), nd, dserv) + read, err := NewDagReader(context.Background(), nd, dserv) if err != nil { t.Fatal(err) } diff --git a/unixfs/io/dagreader.go b/unixfs/io/dagreader.go index 300eb7016..8d1c87507 100644 --- a/unixfs/io/dagreader.go +++ b/unixfs/io/dagreader.go @@ -18,19 +18,31 @@ var ErrIsDir = errors.New("this dag node is a directory") // DagReader provides a way to easily read the data contained in a dag. 
type DagReader struct { - serv mdag.DAGService - node *mdag.Node - pbdata *ftpb.Data - buf ReadSeekCloser - promises []mdag.NodeGetter + serv mdag.DAGService + + // the node being read + node *mdag.Node + + // cached protobuf structure from node.Data + pbdata *ftpb.Data + + // the current data buffer to be read from + // will either be a bytes.Reader or a child DagReader + buf ReadSeekCloser + + // NodeGetters for each of the child links of 'node' + promises []mdag.NodeGetter + + // the index of the child link currently being read from linkPosition int - offset int64 + + // current offset for the read head within the 'file' + offset int64 // Our context ctx context.Context - // Context for children - fctx context.Context + // context cancel for children cancel func() } @@ -145,6 +157,8 @@ func (dr *DagReader) Close() error { return nil } +// Seek implements io.Seeker, moving the read head to the given offset in the file. +// The offset/whence interface matches the standard unix seek. func (dr *DagReader) Seek(offset int64, whence int) (int64, error) { switch whence { case os.SEEK_SET: @@ -152,18 +166,26 @@ func (dr *DagReader) Seek(offset int64, whence int) (int64, error) { return -1, errors.New("Invalid offset") } + // Grab cached protobuf object (solely to make code look cleaner) pb := dr.pbdata + + // left is the number of bytes still to be skipped, counted from the beginning left := offset if int64(len(pb.Data)) > offset { + // Close current buf to close potential child dagreader dr.buf.Close() dr.buf = NewRSNCFromBytes(pb.GetData()[offset:]) + + // start reading links from the beginning dr.linkPosition = 0 dr.offset = offset return offset, nil } else { + // skip past root block data left -= int64(len(pb.Data)) } + // iterate through links and find where we need to be for i := 0; i < len(pb.Blocksizes); i++ { if pb.Blocksizes[i] > uint64(left) { dr.linkPosition = i @@ -173,15 +195,19 @@ func (dr *DagReader) Seek(offset int64, whence int) (int64, error) { } } + // start sub-block request err := 
dr.precalcNextBuf() if err != nil { return 0, err } + // set proper offset within child readseeker n, err := dr.buf.Seek(left, os.SEEK_SET) if err != nil { return -1, err } + + // sanity check: the child seek must have covered all of the remaining bytes left -= n if left != 0 { return -1, errors.New("failed to seek properly") @@ -201,6 +227,7 @@ func (dr *DagReader) Seek(offset int64, whence int) (int64, error) { return 0, nil } +// readSeekNopCloser embeds a *bytes.Reader so that it can serve as a ReadSeekCloser type readSeekNopCloser struct { *bytes.Reader }