mirror of
https://github.com/ipfs/kubo.git
synced 2025-10-27 19:45:26 +08:00
address concerns from PR
This commit is contained in:
@ -6,6 +6,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
mrand "math/rand"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
@ -307,6 +308,51 @@ func TestSeekToAlmostBegin(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSeekingStress(t *testing.T) {
|
||||||
|
nbytes := int64(1024 * 1024)
|
||||||
|
should := make([]byte, nbytes)
|
||||||
|
u.NewTimeSeededRand().Read(should)
|
||||||
|
|
||||||
|
read := bytes.NewReader(should)
|
||||||
|
dnp := getDagservAndPinner(t)
|
||||||
|
nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, &chunk.SizeSplitter{1000})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rs, err := uio.NewDagReader(context.TODO(), nd, dnp.ds)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
testbuf := make([]byte, nbytes)
|
||||||
|
for i := 0; i < 50; i++ {
|
||||||
|
offset := mrand.Intn(int(nbytes))
|
||||||
|
l := int(nbytes) - offset
|
||||||
|
n, err := rs.Seek(int64(offset), os.SEEK_SET)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if n != int64(offset) {
|
||||||
|
t.Fatal("Seek failed to move to correct position")
|
||||||
|
}
|
||||||
|
|
||||||
|
nread, err := rs.Read(testbuf[:l])
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if nread != l {
|
||||||
|
t.Fatal("Failed to read enough bytes")
|
||||||
|
}
|
||||||
|
|
||||||
|
err = arrComp(testbuf[:l], should[offset:offset+l])
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func TestSeekingConsistency(t *testing.T) {
|
func TestSeekingConsistency(t *testing.T) {
|
||||||
nbytes := int64(128 * 1024)
|
nbytes := int64(128 * 1024)
|
||||||
should := make([]byte, nbytes)
|
should := make([]byte, nbytes)
|
||||||
|
|||||||
@ -177,6 +177,8 @@ func (ds *dagService) GetDAG(ctx context.Context, root *Node) []NodeGetter {
|
|||||||
return ds.GetNodes(ctx, keys)
|
return ds.GetNodes(ctx, keys)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
|
||||||
|
// to the key with the same index as the passed in keys
|
||||||
func (ds *dagService) GetNodes(ctx context.Context, keys []u.Key) []NodeGetter {
|
func (ds *dagService) GetNodes(ctx context.Context, keys []u.Key) []NodeGetter {
|
||||||
promises := make([]NodeGetter, len(keys))
|
promises := make([]NodeGetter, len(keys))
|
||||||
sendChans := make([]chan<- *Node, len(keys))
|
sendChans := make([]chan<- *Node, len(keys))
|
||||||
|
|||||||
@ -39,7 +39,7 @@ func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Nod
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
dr, err := NewDagReader(context.TODO(), node, dserv)
|
dr, err := NewDagReader(context.Background(), node, dserv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -76,7 +76,7 @@ func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier)
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
rd, err := NewDagReader(context.TODO(), nd, dm.dagserv)
|
rd, err := NewDagReader(context.Background(), nd, dm.dagserv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -174,7 +174,7 @@ func TestMultiWrite(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
read, err := NewDagReader(context.TODO(), nd, dserv)
|
read, err := NewDagReader(context.Background(), nd, dserv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -216,7 +216,7 @@ func TestMultiWriteCoal(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
read, err := NewDagReader(context.TODO(), nd, dserv)
|
read, err := NewDagReader(context.Background(), nd, dserv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -18,19 +18,31 @@ var ErrIsDir = errors.New("this dag node is a directory")
|
|||||||
|
|
||||||
// DagReader provides a way to easily read the data contained in a dag.
|
// DagReader provides a way to easily read the data contained in a dag.
|
||||||
type DagReader struct {
|
type DagReader struct {
|
||||||
serv mdag.DAGService
|
serv mdag.DAGService
|
||||||
node *mdag.Node
|
|
||||||
pbdata *ftpb.Data
|
// the node being read
|
||||||
buf ReadSeekCloser
|
node *mdag.Node
|
||||||
promises []mdag.NodeGetter
|
|
||||||
|
// cached protobuf structure from node.Data
|
||||||
|
pbdata *ftpb.Data
|
||||||
|
|
||||||
|
// the current data buffer to be read from
|
||||||
|
// will either be a bytes.Reader or a child DagReader
|
||||||
|
buf ReadSeekCloser
|
||||||
|
|
||||||
|
// NodeGetters for each of 'nodes' child links
|
||||||
|
promises []mdag.NodeGetter
|
||||||
|
|
||||||
|
// the index of the child link currently being read from
|
||||||
linkPosition int
|
linkPosition int
|
||||||
offset int64
|
|
||||||
|
// current offset for the read head within the 'file'
|
||||||
|
offset int64
|
||||||
|
|
||||||
// Our context
|
// Our context
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
|
||||||
// Context for children
|
// context cancel for children
|
||||||
fctx context.Context
|
|
||||||
cancel func()
|
cancel func()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -145,6 +157,8 @@ func (dr *DagReader) Close() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Seek implements io.Seeker, and will seek to a given offset in the file
|
||||||
|
// interface matches standard unix seek
|
||||||
func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
|
func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
|
||||||
switch whence {
|
switch whence {
|
||||||
case os.SEEK_SET:
|
case os.SEEK_SET:
|
||||||
@ -152,18 +166,26 @@ func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
|
|||||||
return -1, errors.New("Invalid offset")
|
return -1, errors.New("Invalid offset")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Grab cached protobuf object (solely to make code look cleaner)
|
||||||
pb := dr.pbdata
|
pb := dr.pbdata
|
||||||
|
|
||||||
|
// left represents the number of bytes remaining to seek to (from beginning)
|
||||||
left := offset
|
left := offset
|
||||||
if int64(len(pb.Data)) > offset {
|
if int64(len(pb.Data)) > offset {
|
||||||
|
// Close current buf to close potential child dagreader
|
||||||
dr.buf.Close()
|
dr.buf.Close()
|
||||||
dr.buf = NewRSNCFromBytes(pb.GetData()[offset:])
|
dr.buf = NewRSNCFromBytes(pb.GetData()[offset:])
|
||||||
|
|
||||||
|
// start reading links from the beginning
|
||||||
dr.linkPosition = 0
|
dr.linkPosition = 0
|
||||||
dr.offset = offset
|
dr.offset = offset
|
||||||
return offset, nil
|
return offset, nil
|
||||||
} else {
|
} else {
|
||||||
|
// skip past root block data
|
||||||
left -= int64(len(pb.Data))
|
left -= int64(len(pb.Data))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// iterate through links and find where we need to be
|
||||||
for i := 0; i < len(pb.Blocksizes); i++ {
|
for i := 0; i < len(pb.Blocksizes); i++ {
|
||||||
if pb.Blocksizes[i] > uint64(left) {
|
if pb.Blocksizes[i] > uint64(left) {
|
||||||
dr.linkPosition = i
|
dr.linkPosition = i
|
||||||
@ -173,15 +195,19 @@ func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// start sub-block request
|
||||||
err := dr.precalcNextBuf()
|
err := dr.precalcNextBuf()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set proper offset within child readseeker
|
||||||
n, err := dr.buf.Seek(left, os.SEEK_SET)
|
n, err := dr.buf.Seek(left, os.SEEK_SET)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return -1, err
|
return -1, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sanity
|
||||||
left -= n
|
left -= n
|
||||||
if left != 0 {
|
if left != 0 {
|
||||||
return -1, errors.New("failed to seek properly")
|
return -1, errors.New("failed to seek properly")
|
||||||
@ -201,6 +227,7 @@ func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
|
|||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// readSeekNopCloser wraps a bytes.Reader to implement ReadSeekCloser
|
||||||
type readSeekNopCloser struct {
|
type readSeekNopCloser struct {
|
||||||
*bytes.Reader
|
*bytes.Reader
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user