mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-20 02:21:48 +08:00
fix(pin): goroutine leaks
License: MIT Signed-off-by: Overbool <overbool.xu@gmail.com>
This commit is contained in:
46
pin/gc/gc.go
46
pin/gc/gc.go
@ -58,7 +58,10 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, dstor dstore.Datastore, pn
|
||||
|
||||
gcs, err := ColoredSet(ctx, pn, ds, bestEffortRoots, output)
|
||||
if err != nil {
|
||||
output <- Result{Error: err}
|
||||
select {
|
||||
case output <- Result{Error: err}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
emark.Append(logging.LoggableMap{
|
||||
@ -69,7 +72,10 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, dstor dstore.Datastore, pn
|
||||
|
||||
keychan, err := bs.AllKeysChan(ctx)
|
||||
if err != nil {
|
||||
output <- Result{Error: err}
|
||||
select {
|
||||
case output <- Result{Error: err}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@ -108,7 +114,11 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, dstor dstore.Datastore, pn
|
||||
})
|
||||
esweep.Done()
|
||||
if errors {
|
||||
output <- Result{Error: ErrCannotDeleteSomeBlocks}
|
||||
select {
|
||||
case output <- Result{Error: ErrCannotDeleteSomeBlocks}:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
defer log.EventBegin(ctx, "GC.datastore").Done()
|
||||
@ -119,7 +129,10 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, dstor dstore.Datastore, pn
|
||||
|
||||
err = gds.CollectGarbage()
|
||||
if err != nil {
|
||||
output <- Result{Error: err}
|
||||
select {
|
||||
case output <- Result{Error: err}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
}()
|
||||
@ -177,28 +190,40 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo
|
||||
links, err := ipld.GetLinks(ctx, ng, cid)
|
||||
if err != nil {
|
||||
errors = true
|
||||
output <- Result{Error: &CannotFetchLinksError{cid, err}}
|
||||
select {
|
||||
case output <- Result{Error: &CannotFetchLinksError{cid, err}}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
return links, nil
|
||||
}
|
||||
err := Descendants(ctx, getLinks, gcs, pn.RecursiveKeys())
|
||||
if err != nil {
|
||||
errors = true
|
||||
output <- Result{Error: err}
|
||||
select {
|
||||
case output <- Result{Error: err}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
|
||||
bestEffortGetLinks := func(ctx context.Context, cid cid.Cid) ([]*ipld.Link, error) {
|
||||
links, err := ipld.GetLinks(ctx, ng, cid)
|
||||
if err != nil && err != ipld.ErrNotFound {
|
||||
errors = true
|
||||
output <- Result{Error: &CannotFetchLinksError{cid, err}}
|
||||
select {
|
||||
case output <- Result{Error: &CannotFetchLinksError{cid, err}}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
return links, nil
|
||||
}
|
||||
err = Descendants(ctx, bestEffortGetLinks, gcs, bestEffortRoots)
|
||||
if err != nil {
|
||||
errors = true
|
||||
output <- Result{Error: err}
|
||||
select {
|
||||
case output <- Result{Error: err}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
|
||||
for _, k := range pn.DirectKeys() {
|
||||
@ -208,7 +233,10 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo
|
||||
err = Descendants(ctx, getLinks, gcs, pn.InternalPins())
|
||||
if err != nil {
|
||||
errors = true
|
||||
output <- Result{Error: err}
|
||||
select {
|
||||
case output <- Result{Error: err}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
|
||||
if errors {
|
||||
|
Reference in New Issue
Block a user