mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-01 19:24:14 +08:00
Merge pull request #3598 from Voker57/feat/frugal-enumerate
Pass cids instead of nodes around in EnumerateChildrenAsync
This commit is contained in:
@ -140,7 +140,7 @@ func (n *dagService) Remove(nd node.Node) error {
|
|||||||
|
|
||||||
// FetchGraph fetches all nodes that are children of the given node
|
// FetchGraph fetches all nodes that are children of the given node
|
||||||
func FetchGraph(ctx context.Context, c *cid.Cid, serv DAGService) error {
|
func FetchGraph(ctx context.Context, c *cid.Cid, serv DAGService) error {
|
||||||
return EnumerateChildren(ctx, serv, c, cid.NewSet().Visit, false)
|
return EnumerateChildrenAsync(ctx, serv, c, cid.NewSet().Visit)
|
||||||
}
|
}
|
||||||
|
|
||||||
// FindLinks searches this nodes links for the given key,
|
// FindLinks searches this nodes links for the given key,
|
||||||
@ -394,45 +394,40 @@ func EnumerateChildren(ctx context.Context, ds LinkService, root *cid.Cid, visit
|
|||||||
var FetchGraphConcurrency = 8
|
var FetchGraphConcurrency = 8
|
||||||
|
|
||||||
func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visit func(*cid.Cid) bool) error {
|
func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visit func(*cid.Cid) bool) error {
|
||||||
if !visit(c) {
|
feed := make(chan *cid.Cid)
|
||||||
return nil
|
out := make(chan node.Node)
|
||||||
}
|
|
||||||
|
|
||||||
root, err := ds.Get(ctx, c)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
feed := make(chan node.Node)
|
|
||||||
out := make(chan *NodeOption)
|
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
|
|
||||||
var setlk sync.Mutex
|
var setlk sync.Mutex
|
||||||
|
|
||||||
|
errChan := make(chan error)
|
||||||
|
fetchersCtx, cancel := context.WithCancel(ctx)
|
||||||
|
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
for i := 0; i < FetchGraphConcurrency; i++ {
|
for i := 0; i < FetchGraphConcurrency; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
for n := range feed {
|
for ic := range feed {
|
||||||
links := n.Links()
|
n, err := ds.Get(ctx, ic)
|
||||||
cids := make([]*cid.Cid, 0, len(links))
|
if err != nil {
|
||||||
for _, l := range links {
|
errChan <- err
|
||||||
setlk.Lock()
|
return
|
||||||
unseen := visit(l.Cid)
|
|
||||||
setlk.Unlock()
|
|
||||||
if unseen {
|
|
||||||
cids = append(cids, l.Cid)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for nopt := range ds.GetMany(ctx, cids) {
|
setlk.Lock()
|
||||||
|
unseen := visit(ic)
|
||||||
|
setlk.Unlock()
|
||||||
|
|
||||||
|
if unseen {
|
||||||
select {
|
select {
|
||||||
case out <- nopt:
|
case out <- n:
|
||||||
case <-ctx.Done():
|
case <-fetchersCtx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case done <- struct{}{}:
|
case done <- struct{}{}:
|
||||||
case <-ctx.Done():
|
case <-fetchersCtx.Done():
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@ -440,10 +435,10 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visi
|
|||||||
defer close(feed)
|
defer close(feed)
|
||||||
|
|
||||||
send := feed
|
send := feed
|
||||||
var todobuffer []node.Node
|
var todobuffer []*cid.Cid
|
||||||
var inProgress int
|
var inProgress int
|
||||||
|
|
||||||
next := root
|
next := c
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case send <- next:
|
case send <- next:
|
||||||
@ -460,17 +455,17 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visi
|
|||||||
if inProgress == 0 && next == nil {
|
if inProgress == 0 && next == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
case nc := <-out:
|
case nd := <-out:
|
||||||
if nc.Err != nil {
|
for _, lnk := range nd.Links() {
|
||||||
return nc.Err
|
if next == nil {
|
||||||
}
|
next = lnk.Cid
|
||||||
|
send = feed
|
||||||
if next == nil {
|
} else {
|
||||||
next = nc.Node
|
todobuffer = append(todobuffer, lnk.Cid)
|
||||||
send = feed
|
}
|
||||||
} else {
|
|
||||||
todobuffer = append(todobuffer, nc.Node)
|
|
||||||
}
|
}
|
||||||
|
case err := <-errChan:
|
||||||
|
return err
|
||||||
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
|
Reference in New Issue
Block a user