mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-30 18:13:54 +08:00
Merge pull request #4413 from ipfs/fix/cmd-goroutine-leaks
Fix some goroutine leaks in commands
This commit is contained in:
@ -93,7 +93,11 @@ var queryDhtCmd = &cmds.Command{
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
for e := range events {
|
||||
outChan <- e
|
||||
select {
|
||||
case outChan <- e:
|
||||
case <-req.Context().Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
},
|
||||
@ -181,7 +185,11 @@ var findProvidersDhtCmd = &cmds.Command{
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
for e := range events {
|
||||
outChan <- e
|
||||
select {
|
||||
case outChan <- e:
|
||||
case <-req.Context().Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@ -301,7 +309,11 @@ var provideRefDhtCmd = &cmds.Command{
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
for e := range events {
|
||||
outChan <- e
|
||||
select {
|
||||
case outChan <- e:
|
||||
case <-req.Context().Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@ -427,7 +439,11 @@ var findPeerDhtCmd = &cmds.Command{
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
for v := range events {
|
||||
outChan <- v
|
||||
select {
|
||||
case outChan <- v:
|
||||
case <-req.Context().Done():
|
||||
}
|
||||
|
||||
}
|
||||
}()
|
||||
|
||||
@ -529,7 +545,10 @@ Different key types can specify other 'best' rules.
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
for e := range events {
|
||||
outChan <- e
|
||||
select {
|
||||
case outChan <- e:
|
||||
case <-req.Context().Done():
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@ -643,7 +662,11 @@ NOTE: A value may not exceed 2048 bytes.
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
for e := range events {
|
||||
outChan <- e
|
||||
select {
|
||||
case outChan <- e:
|
||||
case <-req.Context().Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -85,10 +85,18 @@ order to reclaim hard disk space.
|
||||
errs := false
|
||||
for res := range gcOutChan {
|
||||
if res.Error != nil {
|
||||
outChan <- &GcResult{Error: res.Error.Error()}
|
||||
select {
|
||||
case outChan <- &GcResult{Error: res.Error.Error()}:
|
||||
case <-req.Context().Done():
|
||||
return
|
||||
}
|
||||
errs = true
|
||||
} else {
|
||||
outChan <- &GcResult{Key: res.KeyRemoved}
|
||||
select {
|
||||
case outChan <- &GcResult{Key: res.KeyRemoved}:
|
||||
case <-req.Context().Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
if errs {
|
||||
@ -96,7 +104,10 @@ order to reclaim hard disk space.
|
||||
}
|
||||
} else {
|
||||
err := corerepo.CollectResult(req.Context(), gcOutChan, func(k *cid.Cid) {
|
||||
outChan <- &GcResult{Key: k}
|
||||
select {
|
||||
case outChan <- &GcResult{Key: k}:
|
||||
case <-req.Context().Done():
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
res.SetError(err, cmdkit.ErrNormal)
|
||||
@ -291,17 +302,29 @@ var repoVerifyCmd = &oldcmds.Command{
|
||||
for k := range keys {
|
||||
_, err := bs.Get(k)
|
||||
if err != nil {
|
||||
out <- &VerifyProgress{
|
||||
select {
|
||||
case out <- &VerifyProgress{
|
||||
Msg: fmt.Sprintf("block %s was corrupt (%s)", k, err),
|
||||
}:
|
||||
case <-req.Context().Done():
|
||||
return
|
||||
}
|
||||
fails++
|
||||
}
|
||||
i++
|
||||
out <- &VerifyProgress{Progress: i}
|
||||
select {
|
||||
case out <- &VerifyProgress{Progress: i}:
|
||||
case <-req.Context().Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if fails == 0 {
|
||||
out <- &VerifyProgress{Msg: "verify complete, all blocks validated."}
|
||||
select {
|
||||
case out <- &VerifyProgress{Msg: "verify complete, all blocks validated."}:
|
||||
case <-req.Context().Done():
|
||||
return
|
||||
}
|
||||
} else {
|
||||
res.SetError(fmt.Errorf("verify complete, some blocks were corrupt"), cmdkit.ErrNormal)
|
||||
}
|
||||
|
Reference in New Issue
Block a user