From 5806ac00c84bcb3301826aaad1f2dacafa54abf6 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 20 Feb 2016 11:04:21 -0800 Subject: [PATCH] rework FetchGraph to be less of a memory hog License: MIT Signed-off-by: Jeromy --- merkledag/merkledag.go | 39 ++++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index a311b396c..dc02b92f8 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -149,6 +149,8 @@ func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) (<-chan *Node out := make(chan *Node) errs := make(chan error, 1) blocks := ds.Blocks.GetBlocks(ctx, keys) + var count int + go func() { defer close(out) defer close(errs) @@ -156,6 +158,9 @@ func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) (<-chan *Node select { case b, ok := <-blocks: if !ok { + if count != len(keys) { + errs <- fmt.Errorf("failed to fetch all nodes") + } return } nd, err := Decoded(b.Data) @@ -165,6 +170,7 @@ func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) (<-chan *Node } select { case out <- nd: + count++ case <-ctx.Done(): return } @@ -404,28 +410,27 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) { defer close(out) - get := func(g NodeGetter) { - nd, err := g.Get(ctx) - if err != nil { + get := func(ks []key.Key) { + nodes, errch := ds.GetMany(ctx, ks) + for { select { - case errs <- err: - case <-ctx.Done(): + case nd, ok := <-nodes: + if !ok { + return + } + select { + case out <- nd: + case <-ctx.Done(): + return + } + case err := <-errch: + errs <- err + return } - return - } - - select { - case out <- nd: - case <-ctx.Done(): - return } } for ks := range in { - ng := GetNodes(ctx, ds, ks) - for _, g := range ng { - go get(g) - } + go get(ks) } - }