
clean up benchmarks, implement WriterTo on DAGReader, and optimize DagReader

Author: Jeromy
Date: 2015-02-04 19:43:49 +00:00
parent 414bdc78c0
commit 1e93ee00c0
2 changed files with 60 additions and 66 deletions


@@ -65,21 +65,9 @@ func BenchmarkBalancedReadSmallBlock(b *testing.B) {
     nbytes := int64(10000000)
     nd, ds := getBalancedDag(b, nbytes, 4096)
 
-    b.StartTimer()
-    for i := 0; i < b.N; i++ {
-        read, err := uio.NewDagReader(context.TODO(), nd, ds)
-        if err != nil {
-            b.Fatal(err)
-        }
-        n, err := io.Copy(ioutil.Discard, read)
-        if err != nil {
-            b.Fatal(err)
-        }
-        if n != nbytes {
-            b.Fatal("Failed to read correct amount")
-        }
-    }
     b.SetBytes(nbytes)
+    b.StartTimer()
+    runReadBench(b, nd, ds)
 }
 
 func BenchmarkTrickleReadSmallBlock(b *testing.B) {
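
Note on the hunk above (the same reordering repeats in the three hunks that follow): b.SetBytes is now recorded before the timer starts, and the per-iteration read loop moves into the shared runReadBench helper introduced at the end of this file. A minimal, hypothetical Go benchmark illustrating the timer/throughput pattern; the DAG setup is replaced by a plain byte slice and is not code from this repo:

    package bench

    import (
        "bytes"
        "io"
        "io/ioutil"
        "testing"
    )

    // Illustration only: setup runs with the timer stopped, SetBytes makes
    // `go test -bench` report MB/s, and only the read loop is measured.
    func BenchmarkReadSketch(b *testing.B) {
        b.StopTimer()
        data := make([]byte, 10000000) // stand-in for building the test DAG
        b.SetBytes(int64(len(data)))
        b.StartTimer()
        for i := 0; i < b.N; i++ {
            if _, err := io.Copy(ioutil.Discard, bytes.NewReader(data)); err != nil {
                b.Fatal(err)
            }
        }
    }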
@@ -87,22 +75,9 @@ func BenchmarkTrickleReadSmallBlock(b *testing.B) {
     nbytes := int64(10000000)
     nd, ds := getTrickleDag(b, nbytes, 4096)
 
-    b.StartTimer()
-    for i := 0; i < b.N; i++ {
-        read, err := uio.NewDagReader(context.TODO(), nd, ds)
-        if err != nil {
-            b.Fatal(err)
-        }
-        n, err := io.Copy(ioutil.Discard, read)
-        if err != nil {
-            b.Fatal(err)
-        }
-        if n != nbytes {
-            b.Fatal("Failed to read correct amount")
-        }
-    }
     b.SetBytes(nbytes)
+    b.StartTimer()
+    runReadBench(b, nd, ds)
 }
 
 func BenchmarkBalancedReadFull(b *testing.B) {
@@ -110,21 +85,9 @@ func BenchmarkBalancedReadFull(b *testing.B) {
     nbytes := int64(10000000)
     nd, ds := getBalancedDag(b, nbytes, chunk.DefaultBlockSize)
 
-    b.StartTimer()
-    for i := 0; i < b.N; i++ {
-        read, err := uio.NewDagReader(context.TODO(), nd, ds)
-        if err != nil {
-            b.Fatal(err)
-        }
-        n, err := io.Copy(ioutil.Discard, read)
-        if err != nil {
-            b.Fatal(err)
-        }
-        if n != nbytes {
-            b.Fatal("Failed to read correct amount")
-        }
-    }
     b.SetBytes(nbytes)
+    b.StartTimer()
+    runReadBench(b, nd, ds)
 }
 
 func BenchmarkTrickleReadFull(b *testing.B) {
@@ -132,20 +95,23 @@ func BenchmarkTrickleReadFull(b *testing.B) {
     nbytes := int64(10000000)
     nd, ds := getTrickleDag(b, nbytes, chunk.DefaultBlockSize)
 
+    b.SetBytes(nbytes)
     b.StartTimer()
+    runReadBench(b, nd, ds)
+}
+
+func runReadBench(b *testing.B, nd *dag.Node, ds dag.DAGService) {
     for i := 0; i < b.N; i++ {
-        read, err := uio.NewDagReader(context.TODO(), nd, ds)
+        ctx, cancel := context.WithCancel(context.TODO())
+        read, err := uio.NewDagReader(ctx, nd, ds)
         if err != nil {
             b.Fatal(err)
         }
-        n, err := io.Copy(ioutil.Discard, read)
-        if err != nil {
+        _, err = read.WriteTo(ioutil.Discard)
+        if err != nil && err != io.EOF {
             b.Fatal(err)
         }
-        if n != nbytes {
-            b.Fatal("Failed to read correct amount")
-        }
+        cancel()
     }
-    b.SetBytes(nbytes)
 }


@@ -50,6 +50,7 @@ type ReadSeekCloser interface {
     io.Reader
     io.Seeker
     io.Closer
+    io.WriterTo
 }
 
 // NewDagReader creates a new reader object that reads the data represented by the given
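
For reference, io.WriterTo is the standard library interface being added to ReadSeekCloser above; io.Copy checks for it on the source and, when present, delegates to WriteTo instead of looping over Read with an intermediate buffer:

    // From Go's io package (shown for context, not part of this change):
    type WriterTo interface {
        WriteTo(w io.Writer) (n int64, err error)
    }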
@@ -68,22 +69,26 @@ func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*Dag
     case ftpb.Data_Raw:
         fallthrough
     case ftpb.Data_File:
-        fctx, cancel := context.WithCancel(ctx)
-        promises := serv.GetDAG(fctx, n)
-        return &DagReader{
-            node: n,
-            serv: serv,
-            buf: NewRSNCFromBytes(pb.GetData()),
-            promises: promises,
-            ctx: fctx,
-            cancel: cancel,
-            pbdata: pb,
-        }, nil
+        return newDataFileReader(ctx, n, pb, serv), nil
     default:
         return nil, ft.ErrUnrecognizedType
     }
 }
 
+func newDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv mdag.DAGService) *DagReader {
+    fctx, cancel := context.WithCancel(ctx)
+    promises := serv.GetDAG(fctx, n)
+    return &DagReader{
+        node: n,
+        serv: serv,
+        buf: NewRSNCFromBytes(pb.GetData()),
+        promises: promises,
+        ctx: fctx,
+        cancel: cancel,
+        pbdata: pb,
+    }
+}
+
 // precalcNextBuf follows the next link in line and loads it from the DAGService,
 // setting the next buffer to read from
 func (dr *DagReader) precalcNextBuf() error {
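
The unexported constructor above lets internal callers that already hold a decoded ftpb.Data (see the precalcNextBuf hunk below) build a child reader directly, rather than going back through NewDagReader and re-examining the node type. A rough sketch of the same constructor split under assumed names (Node, Reader, NewReader are placeholders, not identifiers from this repo):

    package sketch

    import (
        "encoding/json"
        "fmt"
    )

    type Node struct {
        Data []byte `json:"data"`
    }

    type Reader struct {
        node *Node
    }

    // NewReader is the exported entry point: it decodes and validates once.
    func NewReader(raw []byte) (*Reader, error) {
        var n Node
        if err := json.Unmarshal(raw, &n); err != nil {
            return nil, fmt.Errorf("decode node: %v", err)
        }
        return newReader(&n), nil
    }

    // newReader trusts already-decoded input; internal callers skip the re-decode.
    func newReader(n *Node) *Reader {
        return &Reader{node: n}
    }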
@@ -108,11 +113,7 @@ func (dr *DagReader) precalcNextBuf() error {
         // A directory should not exist within a file
         return ft.ErrInvalidDirLocation
     case ftpb.Data_File:
-        subr, err := NewDagReader(dr.ctx, nxt, dr.serv)
-        if err != nil {
-            return err
-        }
-        dr.buf = subr
+        dr.buf = newDataFileReader(dr.ctx, nxt, pb, dr.serv)
         return nil
     case ftpb.Data_Raw:
         dr.buf = NewRSNCFromBytes(pb.GetData())
@@ -156,6 +157,31 @@ func (dr *DagReader) Read(b []byte) (int, error) {
     }
 }
 
+func (dr *DagReader) WriteTo(w io.Writer) (int64, error) {
+    // If no cached buffer, load one
+    total := int64(0)
+    for {
+        // Attempt to write bytes from cached buffer
+        n, err := dr.buf.WriteTo(w)
+        total += n
+        dr.offset += n
+        if err != nil {
+            if err != io.EOF {
+                return total, err
+            }
+        }
+
+        // Otherwise, load up the next block
+        err = dr.precalcNextBuf()
+        if err != nil {
+            if err == io.EOF {
+                return total, nil
+            }
+            return total, err
+        }
+    }
+}
+
 func (dr *DagReader) Close() error {
     dr.cancel()
     return nil
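
With WriteTo implemented above, any io.Copy-based consumer of the reader takes the buffer-free path automatically; the benchmark change earlier calls WriteTo directly, but plain io.Copy reaches the same code. A hedged usage sketch (the names here are placeholders, not code from this repo):

    package sketch

    import "io"

    // drain streams everything from src into w. io.Copy notices when src
    // implements io.WriterTo and delegates to src.WriteTo(w), avoiding its
    // own intermediate copy buffer.
    func drain(w io.Writer, src io.Reader) (int64, error) {
        return io.Copy(w, src)
    }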
@@ -163,6 +189,8 @@ func (dr *DagReader) Close() error {
 
 // Seek implements io.Seeker, and will seek to a given offset in the file
 // interface matches standard unix seek
+// TODO: check if we can do relative seeks, to reduce the amount of dagreader
+// recreations that need to happen.
 func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
     switch whence {
     case os.SEEK_SET: