
Merge pull request #447 from jbenet/dup-block-fix

GetDAG duplicate block fix.
Jeromy Johnson
2014-12-15 05:42:17 +00:00
2 changed files with 51 additions and 30 deletions
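In short: the old GetDAG assumed each block returned by GetBlocks corresponds to exactly one link of the root node. When a root links to the same block more than once (for instance, a file made of byte-identical chunks), the single-index FindLink lookup only ever filled the first matching slot, the remaining duplicate slots stayed empty, and the consumer never received those children. The diff below replaces FindLink with FindLinks, which returns every matching link index, and fills all of them from the one block that comes back.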

@@ -290,15 +290,16 @@ func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{}
 	return done
 }
 
-// Searches this nodes links for one to the given key,
-// returns the index of said link
-func FindLink(n *Node, k u.Key, found []*Node) (int, error) {
+// FindLinks searches this nodes links for the given key,
+// returns the indexes of any links pointing to it
+func FindLinks(n *Node, k u.Key) []int {
+	var out []int
 	for i, lnk := range n.Links {
-		if u.Key(lnk.Hash) == k && found[i] == nil {
-			return i, nil
+		if u.Key(lnk.Hash) == k {
+			out = append(out, i)
 		}
 	}
-	return -1, u.ErrNotFound
+	return out
 }
 
 // GetDAG will fill out all of the links of the given Node.
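As a side note, here is a minimal standalone sketch of the behavioural difference, using simplified stand-in types rather than the real merkledag Node and Link: FindLinks reports every index that points at the key, so a single fetched block can populate several duplicate links at once.

package main

import "fmt"

// Simplified stand-ins for the real merkledag types, only for this sketch.
type Link struct{ Hash string }
type Node struct{ Links []Link }

// findLinks mirrors the new FindLinks: collect every index whose hash matches k.
func findLinks(n *Node, k string) []int {
	var out []int
	for i, lnk := range n.Links {
		if lnk.Hash == k {
			out = append(out, i)
		}
	}
	return out
}

func main() {
	// A root whose first and third links point at the same block.
	root := &Node{Links: []Link{{"QmA"}, {"QmB"}, {"QmA"}}}
	fmt.Println(findLinks(root, "QmA")) // [0 2]: one fetched block fills both slots
}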
@@ -307,45 +308,37 @@ func FindLink(n *Node, k u.Key, found []*Node) (int, error) {
 func (ds *dagService) GetDAG(ctx context.Context, root *Node) <-chan *Node {
 	sig := make(chan *Node)
 	go func() {
-		var keys []u.Key
-		nodes := make([]*Node, len(root.Links))
+		defer close(sig)
+		var keys []u.Key
 		for _, lnk := range root.Links {
 			keys = append(keys, u.Key(lnk.Hash))
 		}
 		blkchan := ds.Blocks.GetBlocks(ctx, keys)
 
+		nodes := make([]*Node, len(root.Links))
 		next := 0
 		for blk := range blkchan {
-			i, err := FindLink(root, blk.Key(), nodes)
-			if err != nil {
-				// NB: can only occur as a result of programmer error
-				panic("Received block that wasnt in this nodes links!")
-			}
-
 			nd, err := Decoded(blk.Data)
 			if err != nil {
 				// NB: can occur in normal situations, with improperly formatted
 				// input data
 				log.Error("Got back bad block!")
 				break
 			}
-			nodes[i] = nd
-
-			if next == i {
-				sig <- nd
-				next++
-				for ; next < len(nodes) && nodes[next] != nil; next++ {
-					sig <- nodes[next]
-				}
+			is := FindLinks(root, blk.Key())
+			for _, i := range is {
+				nodes[i] = nd
+			}
+			for ; next < len(nodes) && nodes[next] != nil; next++ {
+				sig <- nodes[next]
 			}
 		}
 
 		if next < len(nodes) {
 			// TODO: bubble errors back up.
 			log.Errorf("Did not receive correct number of nodes!")
 		}
-		close(sig)
 	}()
 
 	return sig
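The reworked receive loop above no longer keys delivery off a single returned index: each arriving block is decoded once, stored into every link slot it satisfies via FindLinks, and then the contiguous prefix of completed slots is flushed to the channel, so callers still see children in link order even though GetBlocks returns them in arbitrary order (and the deferred close(sig) keeps the channel close unconditional on every return path). A rough standalone sketch of that prefix-flush pattern, with plain strings standing in for decoded nodes:

package main

import "fmt"

func main() {
	// Pretend the root has five links; "blocks" arrive in arbitrary order,
	// and each arrival tells us which link slot it satisfies.
	arrivals := []int{2, 0, 1, 4, 3}
	nodes := make([]string, 5)
	next := 0
	for _, i := range arrivals {
		nodes[i] = fmt.Sprintf("node-%d", i)
		// Flush the contiguous prefix that is now complete, preserving link order.
		for ; next < len(nodes) && nodes[next] != ""; next++ {
			fmt.Println("emit", nodes[next])
		}
	}
}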

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"sync"
 	"testing"
 
 	blockservice "github.com/jbenet/go-ipfs/blockservice"
@@ -75,14 +76,42 @@ func makeTestDag(t *testing.T) *Node {
 	return root
 }
 
-func TestBatchFetch(t *testing.T) {
+type devZero struct{}
+
+func (_ devZero) Read(b []byte) (int, error) {
+	for i, _ := range b {
+		b[i] = 0
+	}
+	return len(b), nil
+}
+
+func makeZeroDag(t *testing.T) *Node {
+	read := io.LimitReader(devZero{}, 1024*32)
+	spl := &chunk.SizeSplitter{512}
+	root, err := imp.NewDagFromReaderWithSplitter(read, spl)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return root
+}
+
+func TestBatchFetch(t *testing.T) {
+	root := makeTestDag(t)
+	runBatchFetchTest(t, root)
+}
+
+func TestBatchFetchDupBlock(t *testing.T) {
+	root := makeZeroDag(t)
+	runBatchFetchTest(t, root)
+}
+
+func runBatchFetchTest(t *testing.T, root *Node) {
 	var dagservs []DAGService
 	for _, bsi := range blockservice.Mocks(t, 5) {
 		dagservs = append(dagservs, NewDAGService(bsi))
 	}
 	t.Log("finished setup.")
 
-	root := makeTestDag(t)
 	read, err := uio.NewDagReader(root, nil)
 	if err != nil {
 		t.Fatal(err)
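The new TestBatchFetchDupBlock exists precisely to hit the duplicate-link case: 32 KiB of zeros split into 512-byte pieces yields 64 byte-identical chunks, identical chunks hash to the same key, and so every link of the resulting root points at one and the same block. A small sketch of that property, using plain SHA-256 purely for illustration (the DAG itself uses multihash keys):

package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
)

func main() {
	// The same stream devZero + io.LimitReader produces: 32 KiB of zero bytes.
	zeros := bytes.Repeat([]byte{0}, 1024*32)
	seen := make(map[[32]byte]int)
	for off := 0; off < len(zeros); off += 512 {
		sum := sha256.Sum256(zeros[off : off+512]) // stand-in for a block's content hash
		seen[sum]++
	}
	// Prints 1: all 64 chunks are identical, so all links would point at one block.
	fmt.Println("distinct chunk hashes:", len(seen))
}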
@@ -104,9 +133,11 @@ func TestBatchFetch(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	done := make(chan struct{})
+	wg := sync.WaitGroup{}
 	for i := 1; i < len(dagservs); i++ {
+		wg.Add(1)
 		go func(i int) {
+			defer wg.Done()
 			first, err := dagservs[i].Get(k)
 			if err != nil {
 				t.Fatal(err)
@@ -125,11 +156,8 @@ func TestBatchFetch(t *testing.T) {
 			if !bytes.Equal(datagot, expected) {
 				t.Fatal("Got bad data back!")
 			}
-			done <- struct{}{}
 		}(i)
 	}
 
-	for i := 1; i < len(dagservs); i++ {
-		<-done
-	}
+	wg.Wait()
 }
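The test's synchronization also changes from a counted done channel to a sync.WaitGroup: each fetcher goroutine is registered with Add(1) before it starts, marks itself finished with a deferred Done(), and the parent goroutine blocks on Wait(). A minimal standalone sketch of that fan-out/fan-in pattern:

package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}
	for i := 1; i < 5; i++ {
		wg.Add(1) // register the worker before launching it
		go func(i int) {
			defer wg.Done() // signals completion even on early return
			fmt.Println("fetched copy", i)
		}(i)
	}
	wg.Wait() // block until every worker has called Done
}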