mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-30 18:13:54 +08:00
rework FetchGraph to be less of a memory hog
License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
@ -149,6 +149,8 @@ func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) (<-chan *Node
|
|||||||
out := make(chan *Node)
|
out := make(chan *Node)
|
||||||
errs := make(chan error, 1)
|
errs := make(chan error, 1)
|
||||||
blocks := ds.Blocks.GetBlocks(ctx, keys)
|
blocks := ds.Blocks.GetBlocks(ctx, keys)
|
||||||
|
var count int
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
defer close(errs)
|
defer close(errs)
|
||||||
@ -156,6 +158,9 @@ func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) (<-chan *Node
|
|||||||
select {
|
select {
|
||||||
case b, ok := <-blocks:
|
case b, ok := <-blocks:
|
||||||
if !ok {
|
if !ok {
|
||||||
|
if count != len(keys) {
|
||||||
|
errs <- fmt.Errorf("failed to fetch all nodes")
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
nd, err := Decoded(b.Data)
|
nd, err := Decoded(b.Data)
|
||||||
@ -165,6 +170,7 @@ func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) (<-chan *Node
|
|||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case out <- nd:
|
case out <- nd:
|
||||||
|
count++
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -404,28 +410,27 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set
|
|||||||
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) {
|
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
|
|
||||||
get := func(g NodeGetter) {
|
get := func(ks []key.Key) {
|
||||||
nd, err := g.Get(ctx)
|
nodes, errch := ds.GetMany(ctx, ks)
|
||||||
if err != nil {
|
for {
|
||||||
select {
|
select {
|
||||||
case errs <- err:
|
case nd, ok := <-nodes:
|
||||||
case <-ctx.Done():
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case out <- nd:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case err := <-errch:
|
||||||
|
errs <- err
|
||||||
|
return
|
||||||
}
|
}
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case out <- nd:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for ks := range in {
|
for ks := range in {
|
||||||
ng := GetNodes(ctx, ds, ks)
|
go get(ks)
|
||||||
for _, g := range ng {
|
|
||||||
go get(g)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user