mirror of https://github.com/ipfs/kubo.git
working on making importer not break on large files
@@ -3,7 +3,6 @@ package importer
 import (
 	"fmt"
 	"io"
-	"io/ioutil"
 	"os"
 
 	dag "github.com/jbenet/go-ipfs/merkledag"
@@ -20,32 +19,17 @@ var ErrSizeLimitExceeded = fmt.Errorf("object size limit exceeded")
 
 // NewDagFromReader constructs a Merkle DAG from the given io.Reader.
-// size required for block construction.
-func NewDagFromReader(r io.Reader, size int64) (*dag.Node, error) {
+// todo: block-splitting based on rabin fingerprinting
+// todo: block-splitting with user-defined function
+// todo: block-splitting at all. :P
+// todo: write more todos
+func NewDagFromReader(r io.Reader) (*dag.Node, error) {
+	blkChan := SplitterBySize(1024 * 512)(r)
+	root := &dag.Node{}
+
-	// totally just trusts the reported size. fix later.
-	if size > BlockSizeLimit { // 1 MB limit for now.
-		return nil, ErrSizeLimitExceeded
+	for blk := range blkChan {
+		child := &dag.Node{Data: blk}
+		err := root.AddNodeLink("", child)
+		if err != nil {
+			return nil, err
+		}
 	}
 
-	// Ensure that we dont get stuck reading way too much data
-	r = io.LimitReader(r, BlockSizeLimit)
-
-	// we're doing it live!
-	buf, err := ioutil.ReadAll(r)
-	if err != nil {
-		return nil, err
-	}
-
-	if int64(len(buf)) > BlockSizeLimit {
-		return nil, ErrSizeLimitExceeded // lying punk.
-	}
-
-	root := &dag.Node{Data: buf}
-	// no children for now because not block splitting yet
 	return root, nil
 }
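For context, a minimal usage sketch of the new single-argument signature (not part of this commit; the import path, the 3 MB input, and the Links count are illustrative assumptions):

package main

import (
	"bytes"
	"fmt"
	"log"

	importer "github.com/jbenet/go-ipfs/importer"
)

func main() {
	// 3 MB of zeroes stands in for a large file. With 512 KiB blocks the
	// importer should hang six child links off the root node instead of
	// rejecting the input for exceeding BlockSizeLimit.
	r := bytes.NewReader(make([]byte, 3*1024*1024))
	root, err := importer.NewDagFromReader(r)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(len(root.Links))
}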
@@ -66,5 +50,5 @@ func NewDagFromFile(fpath string) (*dag.Node, error) {
 	}
 	defer f.Close()
 
-	return NewDagFromReader(f, stat.Size())
+	return NewDagFromReader(f)
 }
@@ -1,15 +1,14 @@
 package importer
 
 import (
-	"testing"
-	"crypto/rand"
 	"bytes"
+	"crypto/rand"
+	"testing"
 )
 
 func TestDataSplitting(t *testing.T) {
 	buf := make([]byte, 16*1024*1024)
 	rand.Read(buf)
 
 	split := Rabin(buf)
 
 	if len(split) == 1 {
@@ -47,4 +46,3 @@ func TestDataSplitting(t *testing.T) {
 	t.Log(len(split))
 	t.Log(min, max, mxcount)
 }
-
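A round-trip check would complement this test; the sketch below is not part of the commit. It assumes the splitter trims short reads to nread, and it relies on readers like bytes.Reader that report io.EOF on a separate call, since the splitter discards data returned alongside an error.

func TestSplitterBySizeRoundTrip(t *testing.T) {
	buf := make([]byte, 16*1024*1024)
	rand.Read(buf)

	// Reassemble the emitted chunks and make sure nothing was lost or reordered.
	var joined []byte
	for chunk := range SplitterBySize(1024 * 512)(bytes.NewReader(buf)) {
		joined = append(joined, chunk...)
	}

	if !bytes.Equal(buf, joined) {
		t.Fatal("reassembled data does not match input")
	}
}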
@@ -1,6 +1,37 @@
 package importer
 
-type BlockSplitter func([]byte) [][]byte
+import (
+	"io"
+
+	u "github.com/jbenet/go-ipfs/util"
+)
+
+type BlockSplitter func(io.Reader) chan []byte
+
+func SplitterBySize(n int) BlockSplitter {
+	return func(r io.Reader) chan []byte {
+		out := make(chan []byte)
+		go func(n int) {
+			defer close(out)
+			for {
+				chunk := make([]byte, n)
+				nread, err := r.Read(chunk)
+				if err != nil {
+					if err == io.EOF {
+						return
+					}
+					u.PErr("block split error: %v\n", err)
+					return
+				}
+				if nread < n {
+					chunk = chunk[:nread]
+				}
+				out <- chunk
+			}
+		}(n)
+		return out
+	}
+}
 
 // TODO: this should take a reader, not a byte array. what if we're splitting a 3TB file?
 func Rabin(b []byte) [][]byte {
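One caveat in SplitterBySize: a bare r.Read may return fewer than n bytes even when more data is still available, so chunks are not guaranteed to be full-sized. A variant sketch using io.ReadFull (hypothetical name, not part of this commit) fills every chunk except possibly the last:

func SplitterByFullSize(n int) BlockSplitter {
	return func(r io.Reader) chan []byte {
		out := make(chan []byte)
		go func() {
			defer close(out)
			for {
				chunk := make([]byte, n)
				// ReadFull keeps reading until chunk is full, returning io.EOF
				// if nothing was read or io.ErrUnexpectedEOF on a short final chunk.
				nread, err := io.ReadFull(r, chunk)
				if nread > 0 {
					out <- chunk[:nread]
				}
				if err != nil {
					if err != io.EOF && err != io.ErrUnexpectedEOF {
						u.PErr("block split error: %v\n", err)
					}
					return
				}
			}
		}()
		return out
	}
}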
@@ -39,7 +70,7 @@ func Rabin(b []byte) [][]byte {
 	}
 
 	// first 13 bits of polynomial are 0
-	if poly % 8192 == 0 && i-blk_beg_i >= min_blk_size {
+	if poly%8192 == 0 && i-blk_beg_i >= min_blk_size {
 		// push block
 		out = append(out, b[blk_beg_i:i])
 		blk_beg_i = i
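On the boundary condition: 8192 is 2^13, so poly%8192 == 0 fires exactly when the low 13 bits of the rolling hash are zero, which for a roughly uniform hash gives an expected block size of about 8 KiB (bounded below by min_blk_size). An equivalent bitmask form, assuming poly is an unsigned integer:

// isBoundary reports whether the low 13 bits of the rolling hash are zero;
// for unsigned values this is the same test as poly%8192 == 0.
func isBoundary(poly uint64) bool {
	return poly&(1<<13-1) == 0
}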