mirror of https://github.com/ipfs/kubo.git

implement dagmodifier and tests.

Jeromy
2014-10-06 23:49:45 +00:00
parent 77fccaa2f3
commit 3591e10b2e
9 changed files with 489 additions and 37 deletions
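
At a glance, the commit introduces a DagModifier that wraps an existing merkledag file node and overwrites byte ranges in place, together with the MakeLink and MultiBlock helpers it builds on. A minimal sketch of the intended call pattern follows; dserv, root, and buf are assumed setup, not part of this diff:

// Hypothetical usage of the API added below.
// dserv is a *mdag.DAGService, root an existing file node from the importer.
dagmod, err := dagwriter.NewDagModifier(root, dserv)
if err != nil {
	return err
}
// Overwrite len(buf) bytes starting at offset 1000; writes that run past
// the current end of file extend it.
if _, err := dagmod.WriteAt(buf, 1000); err != nil {
	return err
}
updated, err := dagmod.GetNode() // fresh root node reflecting the write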

.gitignore

@@ -3,3 +3,4 @@
 *.out
 *.test
 *.orig
+*~

blockservice

@@ -36,7 +36,7 @@ func NewBlockService(d ds.Datastore, rem exchange.Interface) (*BlockService, err
 // AddBlock adds a particular block to the service, Putting it into the datastore.
 func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
 	k := b.Key()
-	log.Debug("storing [%s] in datastore", k)
+	log.Debug("blockservice: storing [%s] in datastore", k.Pretty())
 	// TODO(brian): define a block datastore with a Put method which accepts a
 	// block parameter
 	err := s.Datastore.Put(k.DsKey(), b.Data)
@@ -53,7 +53,7 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
 // GetBlock retrieves a particular block from the service,
 // Getting it from the datastore using the key (hash).
 func (s *BlockService) GetBlock(k u.Key) (*blocks.Block, error) {
-	log.Debug("BlockService GetBlock: '%s'", k)
+	log.Debug("BlockService GetBlock: '%s'", k.Pretty())
 	datai, err := s.Datastore.Get(k.DsKey())
 	if err == nil {
 		log.Debug("Blockservice: Got data in datastore.")

dagwriter: dagmodifier (new file)

@@ -0,0 +1,190 @@
package dagwriter

import (
	"errors"

	"code.google.com/p/goprotobuf/proto"

	imp "github.com/jbenet/go-ipfs/importer"
	ft "github.com/jbenet/go-ipfs/importer/format"
	mdag "github.com/jbenet/go-ipfs/merkledag"
	u "github.com/jbenet/go-ipfs/util"
)

// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
	dagserv *mdag.DAGService
	curNode *mdag.Node

	pbdata *ft.PBData
}

func NewDagModifier(from *mdag.Node, serv *mdag.DAGService) (*DagModifier, error) {
	pbd, err := ft.FromBytes(from.Data)
	if err != nil {
		return nil, err
	}

	return &DagModifier{
		curNode: from.Copy(),
		dagserv: serv,
		pbdata:  pbd,
	}, nil
}

// WriteAt will modify a dag file in place
// NOTE: it currently assumes only a single level of indirection
func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) {
	// Check bounds
	if dm.pbdata.GetFilesize() < offset {
		return 0, errors.New("Attempted to perform write starting past end of file")
	}

	// This shouldn't be necessary if we do subblock sizes properly
	newsize := dm.pbdata.GetFilesize()
	if uint64(len(b))+offset > dm.pbdata.GetFilesize() {
		newsize = uint64(len(b)) + offset
	}

	// First need to find where we are writing at
	end := uint64(len(b)) + offset
	zeroblocklen := uint64(len(dm.pbdata.Data))
	origlen := len(b)

	if end <= zeroblocklen {
		log.Debug("Writing into zero block.")
		// Replacing zeroth data block (embedded in the root node)
		// TODO: check chunking here
		copy(dm.pbdata.Data[offset:], b)
		return len(b), nil
	}

	// Find where write should start
	var traversed uint64
	startsubblk := len(dm.pbdata.Blocksizes)
	if offset < zeroblocklen {
		// Write begins inside the embedded block: truncate it at offset
		dm.pbdata.Data = dm.pbdata.Data[:offset]
		startsubblk = 0
	} else {
		traversed = uint64(zeroblocklen)
		for i, size := range dm.pbdata.Blocksizes {
			if offset < traversed+size {
				log.Debug("Starting mod at block %d. [%d < %d + %d]", i, offset, traversed, size)
				// Here is where we start
				startsubblk = i
				lnk := dm.curNode.Links[i]
				node, err := dm.dagserv.Get(u.Key(lnk.Hash))
				if err != nil {
					return 0, err
				}
				data, err := ft.UnwrapData(node.Data)
				if err != nil {
					return 0, err
				}

				// Keep the unmodified prefix of this block in front of the write
				b = append(data[:offset-traversed], b...)
				break
			}
			traversed += size
		}
		if startsubblk == len(dm.pbdata.Blocksizes) {
			// TODO: something?
			/*
				if traversed < offset {
					return 0, errors.New("Tried to start write outside bounds of file.")
				}
			*/
		}
	}

	// Find blocks that need to be overwritten
	var changed []int
	mid := -1
	var midoff uint64
	for i, size := range dm.pbdata.Blocksizes[startsubblk:] {
		if end > traversed {
			changed = append(changed, i+startsubblk)
		} else {
			// end <= traversed: nothing further is overwritten
			break
		}
		traversed += size
		if end < traversed {
			// The write ends partway through this block
			mid = i + startsubblk
			midoff = end - (traversed - size)
			break
		}
	}

	var midlnk *mdag.Link
	if mid >= 0 {
		midlnk = dm.curNode.Links[mid]
		midnode, err := dm.dagserv.Get(u.Key(midlnk.Hash))
		if err != nil {
			return 0, err
		}

		// NOTE: this may have to be changed later when we have multiple
		// layers of indirection
		data, err := ft.UnwrapData(midnode.Data)
		if err != nil {
			return 0, err
		}

		// Keep the unmodified tail of the last touched block
		b = append(b, data[midoff:]...)
	}

	// TODO: don't assume a splitting func here
	subblocks := splitBytes(b, &imp.SizeSplitter2{512})
	var links []*mdag.Link
	var sizes []uint64
	for _, sb := range subblocks {
		n := &mdag.Node{Data: ft.WrapData(sb)}
		_, err := dm.dagserv.Add(n)
		if err != nil {
			log.Error("Failed adding node to DAG service: %s", err)
			return 0, err
		}
		lnk, err := mdag.MakeLink(n)
		if err != nil {
			return 0, err
		}
		links = append(links, lnk)
		sizes = append(sizes, uint64(len(sb)))
	}

	// This is disgusting: splice the new links and sizes over the changed range
	if len(changed) > 0 {
		dm.curNode.Links = append(dm.curNode.Links[:changed[0]], append(links, dm.curNode.Links[changed[len(changed)-1]+1:]...)...)
		dm.pbdata.Blocksizes = append(dm.pbdata.Blocksizes[:changed[0]], append(sizes, dm.pbdata.Blocksizes[changed[len(changed)-1]+1:]...)...)
	} else {
		dm.curNode.Links = append(dm.curNode.Links, links...)
		dm.pbdata.Blocksizes = append(dm.pbdata.Blocksizes, sizes...)
	}
	dm.pbdata.Filesize = proto.Uint64(newsize)
	return origlen, nil
}

func splitBytes(b []byte, spl imp.StreamSplitter) [][]byte {
	ch := make(chan []byte)
	out := spl.Split(ch)
	ch <- b
	close(ch)
	var arr [][]byte
	for blk := range out {
		arr = append(arr, blk)
	}
	return arr
}

func (dm *DagModifier) GetNode() (*mdag.Node, error) {
	b, err := proto.Marshal(dm.pbdata)
	if err != nil {
		return nil, err
	}
	dm.curNode.Data = b
	return dm.curNode.Copy(), nil
}
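
The splice at the end of WriteAt (the part commented "This is disgusting") replaces the run of changed links, and their recorded sizes, with the freshly written ones. In isolation the idiom looks like this; a self-contained sketch with plain sizes standing in for links, copying into a fresh slice to sidestep append aliasing:

package main

import "fmt"

// splice replaces old[first..last] (inclusive) with repl, mirroring the
// append-based splice WriteAt performs on Links and Blocksizes.
func splice(old []uint64, first, last int, repl []uint64) []uint64 {
	out := make([]uint64, 0, len(old)-(last-first+1)+len(repl))
	out = append(out, old[:first]...)
	out = append(out, repl...)
	return append(out, old[last+1:]...)
}

func main() {
	sizes := []uint64{512, 512, 512, 512}
	// Blocks 1 and 2 were rewritten into three new 300-byte blocks.
	fmt.Println(splice(sizes, 1, 2, []uint64{300, 300, 300}))
	// Prints: [512 300 300 300 512]
}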

dagwriter: dagmodifier tests (new file)

@@ -0,0 +1,187 @@
package dagwriter

import (
	"fmt"
	"io"
	"io/ioutil"
	"math/rand"
	"testing"
	"time"

	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/op/go-logging"

	bs "github.com/jbenet/go-ipfs/blockservice"
	imp "github.com/jbenet/go-ipfs/importer"
	ft "github.com/jbenet/go-ipfs/importer/format"
	mdag "github.com/jbenet/go-ipfs/merkledag"

	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
)

// randGen is a cheap random byte source for test data.
type randGen struct {
	src rand.Source
}

func newRand() *randGen {
	return &randGen{rand.NewSource(time.Now().UnixNano())}
}

func (r *randGen) Read(p []byte) (n int, err error) {
	todo := len(p)
	offset := 0
	for {
		val := int64(r.src.Int63())
		for i := 0; i < 8; i++ {
			p[offset] = byte(val & 0xff)
			todo--
			if todo == 0 {
				return len(p), nil
			}
			offset++
			val >>= 8
		}
	}
	panic("unreachable")
}

func getMockDagServ(t *testing.T) *mdag.DAGService {
	dstore := ds.NewMapDatastore()
	bserv, err := bs.NewBlockService(dstore, nil)
	if err != nil {
		t.Fatal(err)
	}
	return &mdag.DAGService{bserv}
}

// getNode writes `size` random bytes through a DagWriter and returns both
// the raw bytes and the resulting root node.
func getNode(t *testing.T, dserv *mdag.DAGService, size int64) ([]byte, *mdag.Node) {
	dw := NewDagWriter(dserv, &imp.SizeSplitter2{500})

	n, err := io.CopyN(dw, newRand(), size)
	if err != nil {
		t.Fatal(err)
	}
	if n != size {
		t.Fatal("Incorrect copy amount!")
	}

	dw.Close()
	node := dw.GetNode()

	dr, err := mdag.NewDagReader(node, dserv)
	if err != nil {
		t.Fatal(err)
	}

	b, err := ioutil.ReadAll(dr)
	if err != nil {
		t.Fatal(err)
	}

	return b, node
}

// testModWrite performs a WriteAt through the DagModifier and verifies the
// full file contents against an in-memory copy of the expected bytes.
func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier) []byte {
	newdata := make([]byte, size)
	r := newRand()
	r.Read(newdata)

	if size+beg > uint64(len(orig)) {
		orig = append(orig, make([]byte, (size+beg)-uint64(len(orig)))...)
	}
	copy(orig[beg:], newdata)

	nmod, err := dm.WriteAt(newdata, beg)
	if err != nil {
		t.Fatal(err)
	}
	if nmod != int(size) {
		t.Fatalf("Mod length not correct! %d != %d", nmod, size)
	}

	nd, err := dm.GetNode()
	if err != nil {
		t.Fatal(err)
	}

	rd, err := mdag.NewDagReader(nd, dm.dagserv)
	if err != nil {
		t.Fatal(err)
	}

	after, err := ioutil.ReadAll(rd)
	if err != nil {
		t.Fatal(err)
	}

	err = arrComp(after, orig)
	if err != nil {
		t.Fatal(err)
	}
	return orig
}

func TestDagModifierBasic(t *testing.T) {
	logging.SetLevel(logging.CRITICAL, "blockservice")
	logging.SetLevel(logging.CRITICAL, "merkledag")
	dserv := getMockDagServ(t)
	b, n := getNode(t, dserv, 50000)

	dagmod, err := NewDagModifier(n, dserv)
	if err != nil {
		t.Fatal(err)
	}

	// Within zero block
	beg := uint64(15)
	length := uint64(60)
	t.Log("Testing mod within zero block")
	b = testModWrite(t, beg, length, b, dagmod)

	// Within bounds of existing file
	beg = 1000
	length = 4000
	t.Log("Testing mod within bounds of existing file.")
	b = testModWrite(t, beg, length, b, dagmod)

	// Extend bounds
	beg = 49500
	length = 4000
	t.Log("Testing mod that extends file.")
	b = testModWrite(t, beg, length, b, dagmod)

	// "Append"
	beg = uint64(len(b))
	length = 3000
	b = testModWrite(t, beg, length, b, dagmod)

	// Verify reported length
	node, err := dagmod.GetNode()
	if err != nil {
		t.Fatal(err)
	}

	size, err := ft.DataSize(node.Data)
	if err != nil {
		t.Fatal(err)
	}

	// 50000 original + 3500 from the extending write + 3000 appended
	expected := uint64(50000 + 3500 + 3000)
	if size != expected {
		t.Fatalf("Final reported size is incorrect [%d != %d]", size, expected)
	}
}

func arrComp(a, b []byte) error {
	if len(a) != len(b) {
		return fmt.Errorf("Arrays differ in length. %d != %d", len(a), len(b))
	}
	for i, v := range a {
		if v != b[i] {
			return fmt.Errorf("Arrays differ at index: %d", i)
		}
	}
	return nil
}

dagwriter

@@ -32,10 +32,11 @@ func NewDagWriter(ds *dag.DAGService, splitter imp.StreamSplitter) *DagWriter {
 func (dw *DagWriter) startSplitter() {
 	blkchan := dw.splitter.Split(dw.splChan)
 	first := <-blkchan
+	mbf := new(ft.MultiBlock)
 	root := new(dag.Node)
-	fileSize := uint64(0)
 	for blkData := range blkchan {
-		fileSize += uint64(len(blkData))
+		mbf.AddBlockSize(uint64(len(blkData)))
 		node := &dag.Node{Data: ft.WrapData(blkData)}
 		_, err := dw.dagserv.Add(node)
 		if err != nil {
@@ -50,8 +51,16 @@ func (dw *DagWriter) startSplitter() {
 			return
 		}
 	}
-	root.Data = ft.FilePBData(first, fileSize)
-	_, err := dw.dagserv.Add(root)
+	mbf.Data = first
+	data, err := mbf.GetBytes()
+	if err != nil {
+		dw.seterr = err
+		log.Critical("Failed generating bytes for multiblock file: %s", err)
+		return
+	}
+	root.Data = data
+	_, err = dw.dagserv.Add(root)
 	if err != nil {
 		dw.seterr = err
 		log.Critical("Got error adding created node to dagservice: %s", err)

dagwriter tests

@@ -100,3 +100,28 @@ func TestMassiveWrite(t *testing.T) {
 	}
 	dw.Close()
 }
+
+func BenchmarkDagWriter(b *testing.B) {
+	dstore := ds.NewNullDatastore()
+	bserv, err := bs.NewBlockService(dstore, nil)
+	if err != nil {
+		b.Fatal(err)
+	}
+	dag := &mdag.DAGService{bserv}
+
+	b.ResetTimer()
+	nbytes := int64(b.N)
+	for i := 0; i < b.N; i++ {
+		b.SetBytes(nbytes)
+		dw := NewDagWriter(dag, &imp.SizeSplitter2{4096})
+		n, err := io.CopyN(dw, &datasource{}, nbytes)
+		if err != nil {
+			b.Fatal(err)
+		}
+		if n != nbytes {
+			b.Fatal("Incorrect copy size.")
+		}
+		dw.Close()
+	}
+}

importer/format

@@ -8,6 +8,15 @@ import (
 	"code.google.com/p/goprotobuf/proto"
 )
 
+func FromBytes(data []byte) (*PBData, error) {
+	pbdata := new(PBData)
+	err := proto.Unmarshal(data, pbdata)
+	if err != nil {
+		return nil, err
+	}
+	return pbdata, nil
+}
+
 func FilePBData(data []byte, totalsize uint64) []byte {
 	pbfile := new(PBData)
 	typ := PBData_File
@@ -51,6 +60,15 @@ func WrapData(b []byte) []byte {
 	return out
 }
 
+func UnwrapData(data []byte) ([]byte, error) {
+	pbdata := new(PBData)
+	err := proto.Unmarshal(data, pbdata)
+	if err != nil {
+		return nil, err
+	}
+	return pbdata.GetData(), nil
+}
+
 func DataSize(data []byte) (uint64, error) {
 	pbdata := new(PBData)
 	err := proto.Unmarshal(data, pbdata)
@@ -69,3 +87,24 @@ func DataSize(data []byte) (uint64, error) {
 		return 0, errors.New("Unrecognized node data type!")
 	}
 }
+
+type MultiBlock struct {
+	Data       []byte
+	blocksizes []uint64
+	subtotal   uint64
+}
+
+func (mb *MultiBlock) AddBlockSize(s uint64) {
+	mb.subtotal += s
+	mb.blocksizes = append(mb.blocksizes, s)
+}
+
+func (mb *MultiBlock) GetBytes() ([]byte, error) {
+	pbn := new(PBData)
+	t := PBData_File
+	pbn.Type = &t
+	pbn.Filesize = proto.Uint64(uint64(len(mb.Data)) + mb.subtotal)
+	pbn.Blocksizes = mb.blocksizes
+	pbn.Data = mb.Data
+	return proto.Marshal(pbn)
+}
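
Taken together, the format helpers round-trip file metadata through the PBData envelope: MultiBlock accumulates per-block sizes and produces the root node's Data, FromBytes parses it back, and UnwrapData recovers a child block's payload. A hedged sketch of the flow; firstChunk and childNode are assumed inputs and error handling is elided:

// Root metadata for a file whose first chunk is embedded in the root
// and which has three further 512-byte child blocks.
mbf := new(ft.MultiBlock)
for i := 0; i < 3; i++ {
	mbf.AddBlockSize(512)
}
mbf.Data = firstChunk
rootData, _ := mbf.GetBytes() // Filesize = len(firstChunk) + 3*512

// Reading back: FromBytes parses the root, UnwrapData a child payload.
pbd, _ := ft.FromBytes(rootData)
_ = pbd.GetFilesize()
payload, _ := ft.UnwrapData(childNode.Data)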

importer

@@ -31,19 +31,23 @@ func NewDagFromReaderWithSplitter(r io.Reader, spl BlockSplitter) (*dag.Node, er
 	first := <-blkChan
 	root := &dag.Node{}
 	i := 0
-	totalsize := uint64(len(first))
+	mbf := new(ft.MultiBlock)
 	for blk := range blkChan {
-		totalsize += uint64(len(blk))
+		mbf.AddBlockSize(uint64(len(blk)))
 		child := &dag.Node{Data: ft.WrapData(blk)}
-		err := root.AddNodeLink(fmt.Sprintf("%d", i), child)
+		err := root.AddNodeLink("", child)
 		if err != nil {
 			return nil, err
 		}
 		i++
 	}
-	root.Data = ft.FilePBData(first, totalsize)
+	mbf.Data = first
+	data, err := mbf.GetBytes()
+	if err != nil {
+		return nil, err
+	}
+	root.Data = data
 	return root, nil
 }

merkledag

@@ -34,9 +34,6 @@ type Link struct {
 	// cumulative size of target object
 	Size uint64
 
-	// cumulative size of data stored in object
-	DataSize uint64
-
 	// multihash of the target object
 	Hash mh.Multihash
@@ -44,45 +41,45 @@ type Link struct {
 	Node *Node
 }
 
-// AddNodeLink adds a link to another node.
-func (n *Node) AddNodeLink(name string, that *Node) error {
-	s, err := that.Size()
+func MakeLink(n *Node) (*Link, error) {
+	s, err := n.Size()
 	if err != nil {
-		return err
+		return nil, err
 	}
 
-	h, err := that.Multihash()
+	h, err := n.Multihash()
 	if err != nil {
-		return err
+		return nil, err
 	}
 
-	n.Links = append(n.Links, &Link{
-		Name: name,
+	return &Link{
 		Size: s,
 		Hash: h,
-		Node: that,
-	})
+	}, nil
+}
+
+// AddNodeLink adds a link to another node.
+func (n *Node) AddNodeLink(name string, that *Node) error {
+	lnk, err := MakeLink(that)
+	if err != nil {
+		return err
+	}
+	lnk.Name = name
+	lnk.Node = that
+	n.Links = append(n.Links, lnk)
 	return nil
 }
 
 // AddNodeLinkClean adds a link to another node, without keeping a reference
 // to the child node
 func (n *Node) AddNodeLinkClean(name string, that *Node) error {
-	s, err := that.Size()
+	lnk, err := MakeLink(that)
 	if err != nil {
 		return err
 	}
+	lnk.Name = name
 
-	h, err := that.Multihash()
-	if err != nil {
-		return err
-	}
-	n.Links = append(n.Links, &Link{
-		Name: name,
-		Size: s,
-		Hash: h,
-	})
+	n.Links = append(n.Links, lnk)
 	return nil
 }
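
Factoring MakeLink out of AddNodeLink lets other callers, such as DagModifier above, build links directly; AddNodeLink and AddNodeLinkClean now differ only in whether the Link keeps a reference to the in-memory child. A small sketch, with parent and child assumed to be existing nodes:

lnk, err := mdag.MakeLink(child) // computes cumulative size and multihash
if err != nil {
	return err
}
lnk.Name = "chunk"
// Leaving lnk.Node unset mimics AddNodeLinkClean: the child can be
// collected once it is stored, since only its hash is retained.
parent.Links = append(parent.Links, lnk)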