diff --git a/core/core.go b/core/core.go
index 71ff1d33e..6ebb723bc 100644
--- a/core/core.go
+++ b/core/core.go
@@ -90,7 +90,7 @@ type IpfsNode struct {
 
 	// Services
 	Peerstore  peer.Peerstore       // storage for other Peer instances
-	Blockstore bstore.Blockstore    // the block store (lower level)
+	Blockstore bstore.GCBlockstore  // the block store (lower level)
 	Blocks     *bserv.BlockService  // the block service, get/add blocks.
 	DAG        merkledag.DAGService // the merkle dag service, get/add objects.
 	Resolver   *path.Resolver       // the path resolution system
diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go
index da921ed09..5158c42aa 100644
--- a/merkledag/merkledag.go
+++ b/merkledag/merkledag.go
@@ -3,7 +3,6 @@ package merkledag
 
 import (
 	"fmt"
-	"sync"
 
 	"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
 	blocks "github.com/ipfs/go-ipfs/blocks"
@@ -121,41 +120,86 @@ func (n *dagService) Remove(nd *Node) error {
 	return n.Blocks.DeleteBlock(k)
 }
 
-// FetchGraph asynchronously fetches all nodes that are children of the given
-// node, and returns a channel that may be waited upon for the fetch to complete
-func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} {
-	log.Warning("Untested.")
-	var wg sync.WaitGroup
-	done := make(chan struct{})
+// FetchGraph fetches all nodes that are children of the given node
+func FetchGraph(ctx context.Context, root *Node, serv DAGService) error {
+	toprocess := make(chan []key.Key, 8)
+	nodes := make(chan *Node, 8)
+	errs := make(chan error, 1)
 
-	for _, l := range root.Links {
-		wg.Add(1)
-		go func(lnk *Link) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	defer close(toprocess)
 
-			// Signal child is done on way out
-			defer wg.Done()
-			select {
-			case <-ctx.Done():
-				return
+	go fetchNodes(ctx, serv, toprocess, nodes, errs)
+
+	nodes <- root
+	live := 1
+
+	for {
+		select {
+		case nd, ok := <-nodes:
+			if !ok {
+				return nil
 			}
 
-			nd, err := lnk.GetNode(ctx, serv)
-			if err != nil {
-				log.Debug(err)
-				return
+			var keys []key.Key
+			for _, lnk := range nd.Links {
+				keys = append(keys, key.Key(lnk.Hash))
+			}
+			keys = dedupeKeys(keys)
+
+			// keep track of open request, when zero, we're done
+			live += len(keys) - 1
+
+			if live == 0 {
+				return nil
 			}
 
-			// Wait for children to finish
-			<-FetchGraph(ctx, nd, serv)
-		}(l)
+			if len(keys) > 0 {
+				select {
+				case toprocess <- keys:
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+			}
+		case err := <-errs:
+			return err
+		case <-ctx.Done():
+			return ctx.Err()
+		}
 	}
+}
 
-	go func() {
-		wg.Wait()
-		done <- struct{}{}
-	}()
+func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) {
+	defer close(out)
+	for {
+		select {
+		case ks, ok := <-in:
+			if !ok {
+				return
+			}
 
-	return done
+			ng := ds.GetNodes(ctx, ks)
+			for _, g := range ng {
+				go func(g NodeGetter) {
+					nd, err := g.Get(ctx)
+					if err != nil {
+						select {
+						case errs <- err:
+						case <-ctx.Done():
+						}
+						return
+					}
+
+					select {
+					case out <- nd:
+					case <-ctx.Done():
+						return
+					}
+				}(g)
+			}
+		}
+	}
 }
 
 // FindLinks searches this nodes links for the given key,
@@ -318,3 +362,24 @@ func (t *Batch) Commit() error {
 	t.size = 0
 	return err
 }
+
+// EnumerateChildren will walk the dag below the given root node and add all
+// unseen children to the passed in set.
+// TODO: parallelize to avoid disk latency perf hits?
+func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
+	for _, lnk := range root.Links {
+		k := key.Key(lnk.Hash)
+		if !set.Has(k) {
+			set.Add(k)
+			child, err := ds.Get(ctx, k)
+			if err != nil {
+				return err
+			}
+			err = EnumerateChildren(ctx, ds, child, set)
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
diff --git a/merkledag/merkledag_test.go b/merkledag/merkledag_test.go
index dda4a976e..3e316b083 100644
--- a/merkledag/merkledag_test.go
+++ b/merkledag/merkledag_test.go
@@ -130,7 +130,7 @@ func SubtestNodeStat(t *testing.T, n *Node) {
 	}
 
 	if expected != *actual {
-		t.Errorf("n.Stat incorrect.\nexpect: %s\nactual: %s", expected, actual)
+		t.Error("n.Stat incorrect.\nexpect: %s\nactual: %s", expected, actual)
 	} else {
 		fmt.Printf("n.Stat correct: %s\n", actual)
 	}
@@ -232,7 +232,6 @@ func runBatchFetchTest(t *testing.T, read io.Reader) {
 		}
 	}
 }
-
 func TestRecursiveAdd(t *testing.T) {
 	a := &Node{Data: []byte("A")}
 	b := &Node{Data: []byte("B")}
@@ -298,3 +297,79 @@ func TestCantGet(t *testing.T) {
 		t.Fatal("expected err not found, got: ", err)
 	}
 }
+
+func TestFetchGraph(t *testing.T) {
+	bsi := bstest.Mocks(t, 1)[0]
+	ds := NewDAGService(bsi)
+
+	read := io.LimitReader(u.NewTimeSeededRand(), 1024*32)
+	spl := &chunk.SizeSplitter{512}
+
+	root, err := imp.BuildDagFromReader(read, ds, spl, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = FetchGraph(context.TODO(), root, ds)
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestFetchGraphOther(t *testing.T) {
+	var dservs []DAGService
+	for _, bsi := range bstest.Mocks(t, 2) {
+		dservs = append(dservs, NewDAGService(bsi))
+	}
+
+	read := io.LimitReader(u.NewTimeSeededRand(), 1024*32)
+	spl := &chunk.SizeSplitter{512}
+
+	root, err := imp.BuildDagFromReader(read, dservs[0], spl, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = FetchGraph(context.TODO(), root, dservs[1])
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestEnumerateChildren(t *testing.T) {
+	bsi := bstest.Mocks(t, 1)
+	ds := NewDAGService(bsi[0])
+
+	spl := &chunk.SizeSplitter{512}
+
+	read := io.LimitReader(u.NewTimeSeededRand(), 1024*1024)
+
+	root, err := imp.BuildDagFromReader(read, ds, spl, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ks := key.NewKeySet()
+	err = EnumerateChildren(context.Background(), ds, root, ks)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var traverse func(n *Node)
+	traverse = func(n *Node) {
+		// traverse dag and check
+		for _, lnk := range n.Links {
+			k := key.Key(lnk.Hash)
+			if !ks.Has(k) {
+				t.Fatal("missing key in set!")
+			}
+			child, err := ds.Get(context.Background(), k)
+			if err != nil {
+				t.Fatal(err)
+			}
+			traverse(child)
+		}
+	}
+
+	traverse(root)
+}