diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index ae8d71cfe..f6bc8f9f5 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -394,45 +394,40 @@ func EnumerateChildren(ctx context.Context, ds LinkService, root *cid.Cid, visit var FetchGraphConcurrency = 8 func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visit func(*cid.Cid) bool) error { - if !visit(c) { - return nil - } - - root, err := ds.Get(ctx, c) - if err != nil { - return err - } - - feed := make(chan node.Node) - out := make(chan *NodeOption) + feed := make(chan *cid.Cid) + out := make(chan node.Node) done := make(chan struct{}) var setlk sync.Mutex - + + errChan := make(chan error) + fetchersCtx, cancel := context.WithCancel(ctx) + + defer cancel() + for i := 0; i < FetchGraphConcurrency; i++ { go func() { - for n := range feed { - links := n.Links() - cids := make([]*cid.Cid, 0, len(links)) - for _, l := range links { - setlk.Lock() - unseen := visit(l.Cid) - setlk.Unlock() - if unseen { - cids = append(cids, l.Cid) - } + for ic := range feed { + n, err := ds.Get(ctx, ic) + if err != nil { + errChan <- err + return } - - for nopt := range ds.GetMany(ctx, cids) { + + setlk.Lock() + unseen := visit(ic) + setlk.Unlock() + + if unseen { select { - case out <- nopt: - case <-ctx.Done(): + case out <- n: + case <-fetchersCtx.Done(): return } } select { case done <- struct{}{}: - case <-ctx.Done(): + case <-fetchersCtx.Done(): } } }() @@ -440,10 +435,10 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visi defer close(feed) send := feed - var todobuffer []node.Node + var todobuffer []*cid.Cid var inProgress int - next := root + next := c for { select { case send <- next: @@ -460,18 +455,18 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visi if inProgress == 0 && next == nil { return nil } - case nc := <-out: - if nc.Err != nil { - return nc.Err + case nd := <-out: + for _, lnk := range nd.Links() { + if next == nil { + next = lnk.Cid + send = feed + } else { + todobuffer = append(todobuffer, lnk.Cid) + } } - - if next == nil { - next = nc.Node - send = feed - } else { - todobuffer = append(todobuffer, nc.Node) - } - + case err := <-errChan: + return err + case <-ctx.Done(): return ctx.Err() }