1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-29 17:36:38 +08:00

clear out memory after reads from the dagreader

License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
Jeromy
2017-12-26 14:30:22 -08:00
parent 64ae9344fc
commit 092df586d1

View File

@ -10,7 +10,9 @@ import (
ft "github.com/ipfs/go-ipfs/unixfs"
ftpb "github.com/ipfs/go-ipfs/unixfs/pb"
node "gx/ipfs/QmNwUEK7QbwSqyKBu3mMtToo8SUc6wQJ7gdZq4gGGJqfnf/go-ipld-format"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
cid "gx/ipfs/QmeSrf6pzut73u6zLQkRFQ3ygt3k6XFT2kjdYP8Tnkwwyg/go-cid"
)
// DagReader provides a way to easily read the data contained in a dag.
@ -30,6 +32,9 @@ type pbDagReader struct {
// NodeGetters for each of 'nodes' child links
promises []mdag.NodeGetter
// the cid of each child of the current node
links []*cid.Cid
// the index of the child link currently being read from
linkPosition int
@ -47,30 +52,54 @@ var _ DagReader = (*pbDagReader)(nil)
// NewPBFileReader builds a pbDagReader over the protobuf file node n.
//
// ctx is wrapped in a cancellable child context, and both the child context
// and its cancel function are stored on the reader. pb supplies the node's
// inline data, which becomes the initial read buffer. serv is kept so child
// nodes can be requested later.
//
// Child nodes are not requested here. The promises slice is allocated with
// one empty (nil) slot per child link, and the slots are filled on demand by
// preloadNextNodes. The previous text also called mdag.GetDAG up front and
// listed the `promises` field twice in the literal, which does not compile
// and would have requested every child at construction time.
func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv mdag.DAGService) *pbDagReader {
	fctx, cancel := context.WithCancel(ctx)
	curLinks := getLinkCids(n)
	return &pbDagReader{
		node:     n,
		serv:     serv,
		buf:      NewBufDagReader(pb.GetData()),
		promises: make([]mdag.NodeGetter, len(curLinks)),
		links:    curLinks,
		ctx:      fctx,
		cancel:   cancel,
		pbdata:   pb,
	}
}
// preloadSize is the maximum number of child links that preloadNextNodes
// requests in a single call.
const preloadSize = 10
// preloadNextNodes requests the child nodes for the window of links that
// starts at the current link position and spans at most preloadSize entries
// (clamped to the end of dr.links). Each NodeGetter returned by mdag.GetNodes
// is stored in the dr.promises slot matching its link index.
func (dr *pbDagReader) preloadNextNodes(ctx context.Context) {
	first := dr.linkPosition
	limit := first + preloadSize
	if total := len(dr.links); limit >= total {
		limit = total
	}

	window := dr.links[first:limit]
	getters := mdag.GetNodes(ctx, dr.serv, window)
	for offset, getter := range getters {
		dr.promises[first+offset] = getter
	}
}
// precalcNextBuf follows the next link in line and loads it from the
// DAGService, setting the next buffer to read from
func (dr *pbDagReader) precalcNextBuf(ctx context.Context) error {
dr.buf.Close() // Just to make sure
if dr.buf != nil {
dr.buf.Close() // Just to make sure
dr.buf = nil
}
if dr.linkPosition >= len(dr.promises) {
return io.EOF
}
if dr.promises[dr.linkPosition] == nil {
dr.preloadNextNodes(ctx)
}
nxt, err := dr.promises[dr.linkPosition].Get(ctx)
if err != nil {
return err
}
dr.promises[dr.linkPosition] = nil
dr.linkPosition++
switch nxt := nxt.(type) {
@ -105,6 +134,15 @@ func (dr *pbDagReader) precalcNextBuf(ctx context.Context) error {
}
}
// getLinkCids returns the Cid of every link reported by n.Links(), in the
// same order as the links themselves.
func getLinkCids(n node.Node) []*cid.Cid {
	nodeLinks := n.Links()
	cids := make([]*cid.Cid, len(nodeLinks))
	for idx := range nodeLinks {
		cids[idx] = nodeLinks[idx].Cid
	}
	return cids
}
// Size returns the total length of the data from the DAG structured file.
func (dr *pbDagReader) Size() uint64 {
return dr.pbdata.GetFilesize()
@ -117,6 +155,12 @@ func (dr *pbDagReader) Read(b []byte) (int, error) {
// CtxReadFull reads data from the DAG structured file
func (dr *pbDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
if dr.buf == nil {
if err := dr.precalcNextBuf(ctx); err != nil {
return 0, err
}
}
// If no cached buffer, load one
total := 0
for {
@ -145,6 +189,12 @@ func (dr *pbDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
}
func (dr *pbDagReader) WriteTo(w io.Writer) (int64, error) {
if dr.buf == nil {
if err := dr.precalcNextBuf(dr.ctx); err != nil {
return 0, err
}
}
// If no cached buffer, load one
total := int64(0)
for {
@ -199,7 +249,9 @@ func (dr *pbDagReader) Seek(offset int64, whence int) (int64, error) {
left := offset
if int64(len(pb.Data)) >= offset {
// Close current buf to close potential child dagreader
dr.buf.Close()
if dr.buf != nil {
dr.buf.Close()
}
dr.buf = NewBufDagReader(pb.GetData()[offset:])
// start reading links from the beginning