mirror of
https://github.com/ipfs/kubo.git
synced 2025-10-25 02:16:56 +08:00
use rabin fingerprinting for a chunker
License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com> implement rabin fingerprinting as a chunker for ipfs License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com> vendor correctly License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com> refactor chunking interface a little License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com> work chunking interface changes up into importer License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com> move chunker type parsing into its own file in chunk License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
@ -40,7 +40,7 @@ type DagModifier struct {
|
||||
curNode *mdag.Node
|
||||
mp pin.ManualPinner
|
||||
|
||||
splitter chunk.BlockSplitter
|
||||
splitter chunk.SplitterGen
|
||||
ctx context.Context
|
||||
readCancel func()
|
||||
|
||||
@ -51,7 +51,7 @@ type DagModifier struct {
|
||||
read *uio.DagReader
|
||||
}
|
||||
|
||||
func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*DagModifier, error) {
|
||||
func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, mp pin.ManualPinner, spl chunk.SplitterGen) (*DagModifier, error) {
|
||||
return &DagModifier{
|
||||
curNode: from.Copy(),
|
||||
dagserv: serv,
|
||||
@ -106,10 +106,10 @@ func (zr zeroReader) Read(b []byte) (int, error) {
|
||||
// expandSparse grows the file with zero blocks of 4096
|
||||
// A small blocksize is chosen to aid in deduplication
|
||||
func (dm *DagModifier) expandSparse(size int64) error {
|
||||
spl := chunk.SizeSplitter{4096}
|
||||
r := io.LimitReader(zeroReader{}, size)
|
||||
blks := spl.Split(r)
|
||||
nnode, err := dm.appendData(dm.curNode, blks)
|
||||
spl := chunk.NewSizeSplitter(r, 4096)
|
||||
blks, errs := chunk.Chan(spl)
|
||||
nnode, err := dm.appendData(dm.curNode, blks, errs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -196,8 +196,8 @@ func (dm *DagModifier) Sync() error {
|
||||
|
||||
// need to write past end of current dag
|
||||
if !done {
|
||||
blks := dm.splitter.Split(dm.wrBuf)
|
||||
nd, err = dm.appendData(dm.curNode, blks)
|
||||
blks, errs := chunk.Chan(dm.splitter(dm.wrBuf))
|
||||
nd, err = dm.appendData(dm.curNode, blks, errs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -306,14 +306,14 @@ func (dm *DagModifier) modifyDag(node *mdag.Node, offset uint64, data io.Reader)
|
||||
}
|
||||
|
||||
// appendData appends the blocks from the given chan to the end of this dag
|
||||
func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte) (*mdag.Node, error) {
|
||||
func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte, errs <-chan error) (*mdag.Node, error) {
|
||||
dbp := &help.DagBuilderParams{
|
||||
Dagserv: dm.dagserv,
|
||||
Maxlinks: help.DefaultLinksPerBlock,
|
||||
NodeCB: imp.BasicPinnerCB(dm.mp),
|
||||
}
|
||||
|
||||
return trickle.TrickleAppend(node, dbp.New(blks))
|
||||
return trickle.TrickleAppend(node, dbp.New(blks, errs))
|
||||
}
|
||||
|
||||
// Read data from this dag starting at the current offset
|
||||
|
||||
@ -53,7 +53,7 @@ func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.Blocksto
|
||||
|
||||
func getNode(t testing.TB, dserv mdag.DAGService, size int64, pinner pin.ManualPinner) ([]byte, *mdag.Node) {
|
||||
in := io.LimitReader(u.NewTimeSeededRand(), size)
|
||||
node, err := imp.BuildTrickleDagFromReader(in, dserv, &chunk.SizeSplitter{500}, imp.BasicPinnerCB(pinner))
|
||||
node, err := imp.BuildTrickleDagFromReader(dserv, sizeSplitterGen(500)(in), imp.BasicPinnerCB(pinner))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -117,13 +117,19 @@ func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier)
|
||||
return orig
|
||||
}
|
||||
|
||||
func sizeSplitterGen(size int64) chunk.SplitterGen {
|
||||
return func(r io.Reader) chunk.Splitter {
|
||||
return chunk.NewSizeSplitter(r, size)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDagModifierBasic(t *testing.T) {
|
||||
dserv, pin := getMockDagServ(t)
|
||||
b, n := getNode(t, dserv, 50000, pin)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
dagmod, err := NewDagModifier(ctx, n, dserv, pin, &chunk.SizeSplitter{Size: 512})
|
||||
dagmod, err := NewDagModifier(ctx, n, dserv, pin, sizeSplitterGen(512))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -178,7 +184,7 @@ func TestMultiWrite(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512})
|
||||
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -231,7 +237,7 @@ func TestMultiWriteAndFlush(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512})
|
||||
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -279,7 +285,7 @@ func TestWriteNewFile(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512})
|
||||
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -322,7 +328,7 @@ func TestMultiWriteCoal(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512})
|
||||
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -368,7 +374,7 @@ func TestLargeWriteChunks(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512})
|
||||
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -406,7 +412,7 @@ func TestDagTruncate(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512})
|
||||
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -437,7 +443,7 @@ func TestSparseWrite(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512})
|
||||
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -491,7 +497,7 @@ func TestCorrectPinning(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512})
|
||||
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -598,7 +604,7 @@ func BenchmarkDagmodWrite(b *testing.B) {
|
||||
|
||||
wrsize := 4096
|
||||
|
||||
dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512})
|
||||
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user