Mirror of https://github.com/ipfs/kubo.git

commit 5c802ae852 (parent 3591e10b2e)
Author: Jeromy
Date: 2014-10-07 05:55:28 +00:00

add more tests and rework a lot of utility structures

10 changed files with 175 additions and 128 deletions

View File

@ -1,6 +1,7 @@
package dagwriter
import (
"bytes"
"errors"
"code.google.com/p/goprotobuf/proto"
@ -18,19 +19,21 @@ type DagModifier struct {
dagserv *mdag.DAGService
curNode *mdag.Node
pbdata *ft.PBData
pbdata *ft.PBData
splitter imp.BlockSplitter
}
func NewDagModifier(from *mdag.Node, serv *mdag.DAGService) (*DagModifier, error) {
func NewDagModifier(from *mdag.Node, serv *mdag.DAGService, spl imp.BlockSplitter) (*DagModifier, error) {
pbd, err := ft.FromBytes(from.Data)
if err != nil {
return nil, err
}
return &DagModifier{
curNode: from.Copy(),
dagserv: serv,
pbdata: pbd,
curNode: from.Copy(),
dagserv: serv,
pbdata: pbd,
splitter: spl,
}, nil
}
@ -136,8 +139,7 @@ func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) {
b = append(b, data[midoff:]...)
}
// TODO: dont assume a splitting func here
subblocks := splitBytes(b, &imp.SizeSplitter2{512})
subblocks := splitBytes(b, dm.splitter)
var links []*mdag.Link
var sizes []uint64
for _, sb := range subblocks {
@ -168,11 +170,8 @@ func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) {
return origlen, nil
}
func splitBytes(b []byte, spl imp.StreamSplitter) [][]byte {
ch := make(chan []byte)
out := spl.Split(ch)
ch <- b
close(ch)
func splitBytes(b []byte, spl imp.BlockSplitter) [][]byte {
out := spl.Split(bytes.NewReader(b))
var arr [][]byte
for blk := range out {
arr = append(arr, blk)
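The hunk above is cut off mid-loop; under the new BlockSplitter interface the whole helper plausibly reads as the short sketch below, the only assumption being that the loop simply collects every emitted chunk before returning:

func splitBytes(b []byte, spl imp.BlockSplitter) [][]byte {
	// Feed the buffer to the splitter as an io.Reader and gather the chunks.
	out := spl.Split(bytes.NewReader(b))
	var arr [][]byte
	for blk := range out {
		arr = append(arr, blk)
	}
	return arr
}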

View File

@ -4,46 +4,18 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"testing"
"time"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/op/go-logging"
bs "github.com/jbenet/go-ipfs/blockservice"
imp "github.com/jbenet/go-ipfs/importer"
ft "github.com/jbenet/go-ipfs/importer/format"
mdag "github.com/jbenet/go-ipfs/merkledag"
u "github.com/jbenet/go-ipfs/util"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
)
type randGen struct {
src rand.Source
}
func newRand() *randGen {
return &randGen{rand.NewSource(time.Now().UnixNano())}
}
func (r *randGen) Read(p []byte) (n int, err error) {
todo := len(p)
offset := 0
for {
val := int64(r.src.Int63())
for i := 0; i < 8; i++ {
p[offset] = byte(val & 0xff)
todo--
if todo == 0 {
return len(p), nil
}
offset++
val >>= 8
}
}
panic("unreachable")
}
func getMockDagServ(t *testing.T) *mdag.DAGService {
dstore := ds.NewMapDatastore()
bserv, err := bs.NewBlockService(dstore, nil)
@ -54,9 +26,9 @@ func getMockDagServ(t *testing.T) *mdag.DAGService {
}
func getNode(t *testing.T, dserv *mdag.DAGService, size int64) ([]byte, *mdag.Node) {
dw := NewDagWriter(dserv, &imp.SizeSplitter2{500})
dw := NewDagWriter(dserv, &imp.SizeSplitter{500})
n, err := io.CopyN(dw, newRand(), size)
n, err := io.CopyN(dw, u.NewFastRand(), size)
if err != nil {
t.Fatal(err)
}
@ -82,7 +54,7 @@ func getNode(t *testing.T, dserv *mdag.DAGService, size int64) ([]byte, *mdag.No
func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier) []byte {
newdata := make([]byte, size)
r := newRand()
r := u.NewFastRand()
r.Read(newdata)
if size+beg > uint64(len(orig)) {
@ -127,7 +99,7 @@ func TestDagModifierBasic(t *testing.T) {
dserv := getMockDagServ(t)
b, n := getNode(t, dserv, 50000)
dagmod, err := NewDagModifier(n, dserv)
dagmod, err := NewDagModifier(n, dserv, &imp.SizeSplitter{512})
if err != nil {
t.Fatal(err)
}
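Caller-side, the change means the test (and any other user) now chooses the chunking strategy up front. A hypothetical fragment, with node, dserv, and newdata standing in for values a real caller would already have:

// The modifier no longer hard-codes a 512-byte splitter; the caller passes one in.
dagmod, err := NewDagModifier(node, dserv, &imp.SizeSplitter{Size: 512})
if err != nil {
	t.Fatal(err)
}
if _, err := dagmod.WriteAt(newdata, 0); err != nil {
	t.Fatal(err)
}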

View File

@ -15,11 +15,11 @@ type DagWriter struct {
totalSize int64
splChan chan []byte
done chan struct{}
splitter imp.StreamSplitter
splitter imp.BlockSplitter
seterr error
}
func NewDagWriter(ds *dag.DAGService, splitter imp.StreamSplitter) *DagWriter {
func NewDagWriter(ds *dag.DAGService, splitter imp.BlockSplitter) *DagWriter {
dw := new(DagWriter)
dw.dagserv = ds
dw.splChan = make(chan []byte, 8)
@ -30,7 +30,8 @@ func NewDagWriter(ds *dag.DAGService, splitter imp.StreamSplitter) *DagWriter {
}
func (dw *DagWriter) startSplitter() {
blkchan := dw.splitter.Split(dw.splChan)
r := util.NewByteChanReader(dw.splChan)
blkchan := dw.splitter.Split(r)
first := <-blkchan
mbf := new(ft.MultiBlock)
root := new(dag.Node)
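The substance of this change is that the splitter now consumes an io.Reader instead of a channel, so startSplitter bridges its internal chunk channel with the new util.NewByteChanReader. A hypothetical, self-contained demo of that bridge, reusing the import aliases this commit uses elsewhere:

package main

import (
	"fmt"

	imp "github.com/jbenet/go-ipfs/importer"
	u "github.com/jbenet/go-ipfs/util"
)

func main() {
	ch := make(chan []byte, 4)
	go func() {
		// A producer pushes arbitrary-sized slices, much like DagWriter.Write does.
		ch <- []byte("hello, ")
		ch <- []byte("splitter!")
		close(ch)
	}()

	r := u.NewByteChanReader(ch)                 // chan []byte -> io.Reader
	out := (&imp.SizeSplitter{Size: 8}).Split(r) // re-chunk into 8-byte blocks
	for blk := range out {
		fmt.Printf("chunk: %q\n", blk) // 16 input bytes -> two 8-byte chunks
	}
}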

View File

@ -54,7 +54,7 @@ func TestDagWriter(t *testing.T) {
t.Fatal(err)
}
dag := &mdag.DAGService{bserv}
dw := NewDagWriter(dag, &imp.SizeSplitter2{4096})
dw := NewDagWriter(dag, &imp.SizeSplitter{4096})
nbytes := int64(1024 * 1024 * 2)
n, err := io.CopyN(dw, &datasource{}, nbytes)
@ -88,7 +88,7 @@ func TestMassiveWrite(t *testing.T) {
t.Fatal(err)
}
dag := &mdag.DAGService{bserv}
dw := NewDagWriter(dag, &imp.SizeSplitter2{4096})
dw := NewDagWriter(dag, &imp.SizeSplitter{4096})
nbytes := int64(1024 * 1024 * 1024 * 16)
n, err := io.CopyN(dw, &datasource{}, nbytes)
@ -113,7 +113,7 @@ func BenchmarkDagWriter(b *testing.B) {
nbytes := int64(b.N)
for i := 0; i < b.N; i++ {
b.SetBytes(nbytes)
dw := NewDagWriter(dag, &imp.SizeSplitter2{4096})
dw := NewDagWriter(dag, &imp.SizeSplitter{4096})
n, err := io.CopyN(dw, &datasource{}, nbytes)
if err != nil {
b.Fatal(err)

View File

@ -0,0 +1,36 @@
package format
import (
"testing"
"code.google.com/p/goprotobuf/proto"
)
func TestMultiBlock(t *testing.T) {
mbf := new(MultiBlock)
for i := 0; i < 15; i++ {
mbf.AddBlockSize(100)
}
mbf.Data = make([]byte, 128)
b, err := mbf.GetBytes()
if err != nil {
t.Fatal(err)
}
pbn := new(PBData)
err = proto.Unmarshal(b, pbn)
if err != nil {
t.Fatal(err)
}
ds, err := DataSize(b)
if err != nil {
t.Fatal(err)
}
if ds != (100*15)+128 {
t.Fatal("Datasize calculations incorrect!")
}
}
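For reference, the size the new test expects is 15 blocks of 100 bytes plus 128 bytes of inline data, i.e. 1500 + 128 = 1628 bytes.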

View File

@ -7,8 +7,11 @@ import (
ft "github.com/jbenet/go-ipfs/importer/format"
dag "github.com/jbenet/go-ipfs/merkledag"
"github.com/jbenet/go-ipfs/util"
)
var log = util.Logger("importer")
// BlockSizeLimit specifies the maximum size an imported block can have.
var BlockSizeLimit = int64(1048576) // 1 MB

View File

@ -92,43 +92,3 @@ func (mr *MaybeRabin) Split(r io.Reader) chan []byte {
}()
return out
}
/*
func WhyrusleepingCantImplementRabin(r io.Reader) chan []byte {
out := make(chan []byte, 4)
go func() {
buf := bufio.NewReader(r)
blkbuf := new(bytes.Buffer)
window := make([]byte, 16)
var val uint64
prime := uint64(61)
get := func(i int) uint64 {
return uint64(window[i%len(window)])
}
set := func(i int, val byte) {
window[i%len(window)] = val
}
for i := 0; ; i++ {
curb, err := buf.ReadByte()
if err != nil {
break
}
set(i, curb)
blkbuf.WriteByte(curb)
hash := md5.Sum(window)
if hash[0] == 0 && hash[1] == 0 {
out <- blkbuf.Bytes()
blkbuf.Reset()
}
}
out <- blkbuf.Bytes()
close(out)
}()
return out
}
*/

View File

@ -1,19 +1,9 @@
package importer
import (
"io"
import "io"
u "github.com/jbenet/go-ipfs/util"
)
// OLD
type BlockSplitter interface {
Split(io.Reader) chan []byte
}
// NEW
type StreamSplitter interface {
Split(chan []byte) chan []byte
Split(r io.Reader) chan []byte
}
type SizeSplitter struct {
@ -34,7 +24,7 @@ func (ss *SizeSplitter) Split(r io.Reader) chan []byte {
}
return
}
u.PErr("block split error: %v\n", err)
log.Error("Block split error: %s", err)
return
}
if nread < ss.Size {
@ -45,26 +35,3 @@ func (ss *SizeSplitter) Split(r io.Reader) chan []byte {
}()
return out
}
type SizeSplitter2 struct {
Size int
}
func (ss *SizeSplitter2) Split(in chan []byte) chan []byte {
out := make(chan []byte)
go func() {
defer close(out)
var buf []byte
for b := range in {
buf = append(buf, b...)
for len(buf) > ss.Size {
out <- buf[:ss.Size]
buf = buf[ss.Size:]
}
}
if len(buf) > 0 {
out <- buf
}
}()
return out
}
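With StreamSplitter and SizeSplitter2 gone, Split(r io.Reader) chan []byte on BlockSplitter is the one chunking contract left. As an illustration only (not part of this commit, and assuming an extra bufio import), a custom splitter that chunks on line boundaries needs nothing more than:

type lineSplitter struct{}

func (ls *lineSplitter) Split(r io.Reader) chan []byte {
	out := make(chan []byte)
	go func() {
		defer close(out)
		scan := bufio.NewScanner(r)
		for scan.Scan() {
			// The scanner reuses its buffer, so copy each line before sending.
			out <- append([]byte(nil), scan.Bytes()...)
		}
		if err := scan.Err(); err != nil {
			log.Error("line split error: %s", err)
		}
	}()
	return out
}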

View File

@ -3,10 +3,13 @@ package util
import (
"errors"
"fmt"
"io"
"math/rand"
"os"
"os/user"
"path/filepath"
"strings"
"time"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
b58 "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58"
@ -154,3 +157,76 @@ func ExpandPathnames(paths []string) ([]string, error) {
}
return out, nil
}
// byteChanReader wraps a byte chan in a reader
type byteChanReader struct {
in chan []byte
buf []byte
}
func NewByteChanReader(in chan []byte) io.Reader {
return &byteChanReader{in: in}
}
func (bcr *byteChanReader) Read(b []byte) (int, error) {
if len(bcr.buf) == 0 {
data, ok := <-bcr.in
if !ok {
return 0, io.EOF
}
bcr.buf = data
}
if len(bcr.buf) >= len(b) {
copy(b, bcr.buf)
bcr.buf = bcr.buf[len(b):]
return len(b), nil
}
copy(b, bcr.buf)
b = b[len(bcr.buf):]
totread := len(bcr.buf)
for data := range bcr.in {
if len(data) > len(b) {
totread += len(b)
copy(b, data[:len(b)])
bcr.buf = data[len(b):]
return totread, nil
}
copy(b, data)
totread += len(data)
b = b[len(data):]
if len(b) == 0 {
return totread, nil
}
}
return totread, io.EOF
}
type randGen struct {
src rand.Source
}
func NewFastRand() io.Reader {
return &randGen{rand.NewSource(time.Now().UnixNano())}
}
func (r *randGen) Read(p []byte) (n int, err error) {
todo := len(p)
offset := 0
for {
val := int64(r.src.Int63())
for i := 0; i < 8; i++ {
p[offset] = byte(val & 0xff)
todo--
if todo == 0 {
return len(p), nil
}
offset++
val >>= 8
}
}
panic("unreachable")
}
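A minimal round-trip sketch of the two new helpers as seen from inside package util (the function name is hypothetical, and it assumes bytes and io/ioutil are imported):

func exampleFastRandChanReader() error {
	payload := make([]byte, 256)
	NewFastRand().Read(payload) // fills the whole slice; never returns an error

	// Chop the payload into chunks and stitch it back together via the reader.
	ch := make(chan []byte, 2)
	ch <- payload[:100]
	ch <- payload[100:]
	close(ch)

	got, err := ioutil.ReadAll(NewByteChanReader(ch))
	if err != nil {
		return err
	}
	if !bytes.Equal(got, payload) {
		return errors.New("byteChanReader returned different bytes")
	}
	return nil
}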

View File

@ -2,8 +2,11 @@ package util
import (
"bytes"
mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
"io/ioutil"
"math/rand"
"testing"
mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
)
func TestKey(t *testing.T) {
@ -25,3 +28,33 @@ func TestKey(t *testing.T) {
t.Error("Keys not equal.")
}
}
func TestByteChanReader(t *testing.T) {
data := make([]byte, 1024*1024)
r := NewFastRand()
r.Read(data)
dch := make(chan []byte, 8)
go func() {
beg := 0
for i := 0; i < len(data); {
i += rand.Intn(100) + 1
if i > len(data) {
i = len(data)
}
dch <- data[beg:i]
beg = i
}
close(dch)
}()
read := NewByteChanReader(dch)
out, err := ioutil.ReadAll(read)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(out, data) {
t.Fatal("Reader failed to stream correct bytes")
}
}