1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-30 01:52:26 +08:00

Merge pull request #4525 from ipfs/fix/dagreader-mem

clear out memory after reads from the dagreader
This commit is contained in:
Whyrusleeping
2018-01-04 10:11:43 -08:00
committed by GitHub
2 changed files with 106 additions and 4 deletions

View File

@ -4,6 +4,7 @@ import (
"bytes"
"io"
"io/ioutil"
"math/rand"
"strings"
"testing"
@ -72,6 +73,55 @@ func TestSeekAndRead(t *testing.T) {
}
}
// TestSeekAndReadLarge seeks into the middle of a large file, reads a span,
// and then verifies that only the window of promises around the current link
// position was preloaded — every other slot must be nil (never fetched, or
// already cleared after being consumed).
func TestSeekAndReadLarge(t *testing.T) {
	dserv := testu.GetDAGServ()

	data := make([]byte, 20000)
	rand.Read(data)

	nd := testu.GetNode(t, dserv, data, testu.UseProtoBufLeaves)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	reader, err := NewDagReader(ctx, nd, dserv)
	if err != nil {
		t.Fatal(err)
	}

	if _, err = reader.Seek(10000, io.SeekStart); err != nil {
		t.Fatal(err)
	}

	readback := make([]byte, 100)
	if _, err = io.ReadFull(reader, readback); err != nil {
		t.Fatal(err)
	}

	if !bytes.Equal(readback, data[10000:10100]) {
		t.Fatal("seeked read failed")
	}

	pbdr := reader.(*pbDagReader)

	var loaded int
	for idx, promise := range pbdr.promises {
		inWindow := idx > 20 && idx < 30
		switch {
		case inWindow && promise == nil:
			t.Fatal("expected index to be not nil: ", idx)
		case inWindow:
			loaded++
		case promise != nil:
			t.Fatal("expected index to be nil: ", idx)
		}
	}

	// -1 because we read some and it cleared one
	if loaded != preloadSize-1 {
		t.Fatalf("expected %d preloaded promises, got %d", preloadSize-1, loaded)
	}
}
func TestRelativeSeek(t *testing.T) {
dserv := testu.GetDAGServ()
ctx, closer := context.WithCancel(context.Background())

View File

@ -10,7 +10,9 @@ import (
ft "github.com/ipfs/go-ipfs/unixfs"
ftpb "github.com/ipfs/go-ipfs/unixfs/pb"
node "gx/ipfs/QmNwUEK7QbwSqyKBu3mMtToo8SUc6wQJ7gdZq4gGGJqfnf/go-ipld-format"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
cid "gx/ipfs/QmeSrf6pzut73u6zLQkRFQ3ygt3k6XFT2kjdYP8Tnkwwyg/go-cid"
)
// DagReader provides a way to easily read the data contained in a dag.
@ -30,6 +32,9 @@ type pbDagReader struct {
// NodeGetters for each of 'nodes' child links
promises []mdag.NodeGetter
// the cid of each child of the current node
links []*cid.Cid
// the index of the child link currently being read from
linkPosition int
@ -47,30 +52,54 @@ var _ DagReader = (*pbDagReader)(nil)
// NewPBFileReader constructs a new PBFileReader over the given protobuf
// unixfs node. Child nodes are not fetched eagerly: the promises slice is
// allocated empty (one slot per child link) and filled lazily, a window at a
// time, by preloadNextNodes as reads advance. The derived context is
// cancelled via the returned reader's cancel field when the reader is closed.
func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv mdag.DAGService) *pbDagReader {
	fctx, cancel := context.WithCancel(ctx)
	curLinks := getLinkCids(n)
	return &pbDagReader{
		node: n,
		serv: serv,
		// start reading from the data embedded directly in the root node
		buf:      NewBufDagReader(pb.GetData()),
		promises: make([]mdag.NodeGetter, len(curLinks)),
		links:    curLinks,
		ctx:      fctx,
		cancel:   cancel,
		pbdata:   pb,
	}
}
const preloadSize = 10
// preloadNextNodes starts fetches for the next window of up to preloadSize
// child nodes beginning at the current link position, storing the resulting
// promises so subsequent reads can resolve them.
func (dr *pbDagReader) preloadNextNodes(ctx context.Context) {
	start := dr.linkPosition
	stop := start + preloadSize
	if stop > len(dr.links) {
		stop = len(dr.links)
	}

	getters := mdag.GetNodes(ctx, dr.serv, dr.links[start:stop])
	for i, ng := range getters {
		dr.promises[start+i] = ng
	}
}
// precalcNextBuf follows the next link in line and loads it from the
// DAGService, setting the next buffer to read from
func (dr *pbDagReader) precalcNextBuf(ctx context.Context) error {
dr.buf.Close() // Just to make sure
if dr.buf != nil {
dr.buf.Close() // Just to make sure
dr.buf = nil
}
if dr.linkPosition >= len(dr.promises) {
return io.EOF
}
if dr.promises[dr.linkPosition] == nil {
dr.preloadNextNodes(ctx)
}
nxt, err := dr.promises[dr.linkPosition].Get(ctx)
if err != nil {
return err
}
dr.promises[dr.linkPosition] = nil
dr.linkPosition++
switch nxt := nxt.(type) {
@ -105,6 +134,15 @@ func (dr *pbDagReader) precalcNextBuf(ctx context.Context) error {
}
}
// getLinkCids collects the cid of every child link of n, in link order.
func getLinkCids(n node.Node) []*cid.Cid {
	ls := n.Links()
	cids := make([]*cid.Cid, 0, len(ls))
	for _, lnk := range ls {
		cids = append(cids, lnk.Cid)
	}
	return cids
}
// Size returns the total length of the data from the DAG structured file,
// as recorded in the root node's unixfs protobuf metadata.
func (dr *pbDagReader) Size() uint64 {
	return dr.pbdata.GetFilesize()
@ -117,6 +155,12 @@ func (dr *pbDagReader) Read(b []byte) (int, error) {
// CtxReadFull reads data from the DAG structured file
func (dr *pbDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
if dr.buf == nil {
if err := dr.precalcNextBuf(ctx); err != nil {
return 0, err
}
}
// If no cached buffer, load one
total := 0
for {
@ -145,6 +189,12 @@ func (dr *pbDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
}
func (dr *pbDagReader) WriteTo(w io.Writer) (int64, error) {
if dr.buf == nil {
if err := dr.precalcNextBuf(dr.ctx); err != nil {
return 0, err
}
}
// If no cached buffer, load one
total := int64(0)
for {
@ -199,7 +249,9 @@ func (dr *pbDagReader) Seek(offset int64, whence int) (int64, error) {
left := offset
if int64(len(pb.Data)) >= offset {
// Close current buf to close potential child dagreader
dr.buf.Close()
if dr.buf != nil {
dr.buf.Close()
}
dr.buf = NewBufDagReader(pb.GetData()[offset:])
// start reading links from the beginning