From b9e5cfaa15ea2d105e3dba653eaa05be3a142645 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 7 Jul 2015 09:04:03 -0700 Subject: [PATCH] merkledag FetchGraph and EnumerateChildren This commit improves (fixes) the FetchGraph call for recursively fetching every descendant node of a given merkledag node. This operation should be the simplest way of ensuring that you have replicated a dag locally. This commit also implements a function in the merkledag package called EnumerateChildren, which is used to get a set of the keys of every descendant node of the given node. All keys found are noted in the passed-in KeySet, which may in the future be implemented on disk to avoid excessive memory consumption. License: MIT Signed-off-by: Jeromy --- core/core.go | 2 +- merkledag/merkledag.go | 119 ++++++++++++++++++++++++++++-------- merkledag/merkledag_test.go | 79 +++++++++++++++++++++++- 3 files changed, 170 insertions(+), 30 deletions(-) diff --git a/core/core.go b/core/core.go index 71ff1d33e..6ebb723bc 100644 --- a/core/core.go +++ b/core/core.go @@ -90,7 +90,7 @@ type IpfsNode struct { // Services Peerstore peer.Peerstore // storage for other Peer instances - Blockstore bstore.Blockstore // the block store (lower level) + Blockstore bstore.GCBlockstore // the block store (lower level) Blocks *bserv.BlockService // the block service, get/add blocks. DAG merkledag.DAGService // the merkle dag service, get/add objects. 
Resolver *path.Resolver // the path resolution system diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index da921ed09..5158c42aa 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -3,7 +3,6 @@ package merkledag import ( "fmt" - "sync" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" blocks "github.com/ipfs/go-ipfs/blocks" @@ -121,41 +120,86 @@ func (n *dagService) Remove(nd *Node) error { return n.Blocks.DeleteBlock(k) } -// FetchGraph asynchronously fetches all nodes that are children of the given -// node, and returns a channel that may be waited upon for the fetch to complete -func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} { - log.Warning("Untested.") - var wg sync.WaitGroup - done := make(chan struct{}) +// FetchGraph fetches all nodes that are children of the given node +func FetchGraph(ctx context.Context, root *Node, serv DAGService) error { + toprocess := make(chan []key.Key, 8) + nodes := make(chan *Node, 8) + errs := make(chan error, 1) - for _, l := range root.Links { - wg.Add(1) - go func(lnk *Link) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + defer close(toprocess) - // Signal child is done on way out - defer wg.Done() - select { - case <-ctx.Done(): - return + go fetchNodes(ctx, serv, toprocess, nodes, errs) + + nodes <- root + live := 1 + + for { + select { + case nd, ok := <-nodes: + if !ok { + return nil } - nd, err := lnk.GetNode(ctx, serv) - if err != nil { - log.Debug(err) - return + var keys []key.Key + for _, lnk := range nd.Links { + keys = append(keys, key.Key(lnk.Hash)) + } + keys = dedupeKeys(keys) + + // keep track of open request, when zero, we're done + live += len(keys) - 1 + + if live == 0 { + return nil } - // Wait for children to finish - <-FetchGraph(ctx, nd, serv) - }(l) + if len(keys) > 0 { + select { + case toprocess <- keys: + case <-ctx.Done(): + return ctx.Err() + } + } + case err := <-errs: + return err + case 
<-ctx.Done(): + return ctx.Err() + } } +} - go func() { - wg.Wait() - done <- struct{}{} - }() +func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) { + defer close(out) + for { + select { + case ks, ok := <-in: + if !ok { + return + } - return done + ng := ds.GetNodes(ctx, ks) + for _, g := range ng { + go func(g NodeGetter) { + nd, err := g.Get(ctx) + if err != nil { + select { + case errs <- err: + case <-ctx.Done(): + } + return + } + + select { + case out <- nd: + case <-ctx.Done(): + return + } + }(g) + } + } + } } // FindLinks searches this nodes links for the given key, @@ -318,3 +362,24 @@ func (t *Batch) Commit() error { t.size = 0 return err } + +// EnumerateChildren will walk the dag below the given root node and add all +// unseen children to the passed in set. +// TODO: parallelize to avoid disk latency perf hits? +func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error { + for _, lnk := range root.Links { + k := key.Key(lnk.Hash) + if !set.Has(k) { + set.Add(k) + child, err := ds.Get(ctx, k) + if err != nil { + return err + } + err = EnumerateChildren(ctx, ds, child, set) + if err != nil { + return err + } + } + } + return nil +} diff --git a/merkledag/merkledag_test.go b/merkledag/merkledag_test.go index dda4a976e..3e316b083 100644 --- a/merkledag/merkledag_test.go +++ b/merkledag/merkledag_test.go @@ -130,7 +130,7 @@ func SubtestNodeStat(t *testing.T, n *Node) { } if expected != *actual { - t.Errorf("n.Stat incorrect.\nexpect: %s\nactual: %s", expected, actual) + t.Error("n.Stat incorrect.\nexpect: %s\nactual: %s", expected, actual) } else { fmt.Printf("n.Stat correct: %s\n", actual) } @@ -232,7 +232,6 @@ func runBatchFetchTest(t *testing.T, read io.Reader) { } } } - func TestRecursiveAdd(t *testing.T) { a := &Node{Data: []byte("A")} b := &Node{Data: []byte("B")} @@ -298,3 +297,79 @@ func TestCantGet(t *testing.T) { t.Fatal("expected err not found, got: ", 
err) } } + +func TestFetchGraph(t *testing.T) { + bsi := bstest.Mocks(t, 1)[0] + ds := NewDAGService(bsi) + + read := io.LimitReader(u.NewTimeSeededRand(), 1024*32) + spl := &chunk.SizeSplitter{512} + + root, err := imp.BuildDagFromReader(read, ds, spl, nil) + if err != nil { + t.Fatal(err) + } + + err = FetchGraph(context.TODO(), root, ds) + if err != nil { + t.Fatal(err) + } +} + +func TestFetchGraphOther(t *testing.T) { + var dservs []DAGService + for _, bsi := range bstest.Mocks(t, 2) { + dservs = append(dservs, NewDAGService(bsi)) + } + + read := io.LimitReader(u.NewTimeSeededRand(), 1024*32) + spl := &chunk.SizeSplitter{512} + + root, err := imp.BuildDagFromReader(read, dservs[0], spl, nil) + if err != nil { + t.Fatal(err) + } + + err = FetchGraph(context.TODO(), root, dservs[1]) + if err != nil { + t.Fatal(err) + } +} + +func TestEnumerateChildren(t *testing.T) { + bsi := bstest.Mocks(t, 1) + ds := NewDAGService(bsi[0]) + + spl := &chunk.SizeSplitter{512} + + read := io.LimitReader(u.NewTimeSeededRand(), 1024*1024) + + root, err := imp.BuildDagFromReader(read, ds, spl, nil) + if err != nil { + t.Fatal(err) + } + + ks := key.NewKeySet() + err = EnumerateChildren(context.Background(), ds, root, ks) + if err != nil { + t.Fatal(err) + } + + var traverse func(n *Node) + traverse = func(n *Node) { + // traverse dag and check + for _, lnk := range n.Links { + k := key.Key(lnk.Hash) + if !ks.Has(k) { + t.Fatal("missing key in set!") + } + child, err := ds.Get(context.Background(), k) + if err != nil { + t.Fatal(err) + } + traverse(child) + } + } + + traverse(root) +}