mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-28 00:39:31 +08:00
Merge pull request #1858 from ipfs/fix/bitswap-limiter
fix panic in bitswap working limit spawning
This commit is contained in:
@ -5,7 +5,6 @@ import (
|
|||||||
|
|
||||||
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
|
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
|
||||||
procctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
|
procctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
|
||||||
ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
|
|
||||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
|
|
||||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||||
@ -74,11 +73,14 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
|
|||||||
|
|
||||||
func (bs *Bitswap) provideWorker(px process.Process) {
|
func (bs *Bitswap) provideWorker(px process.Process) {
|
||||||
|
|
||||||
limiter := ratelimit.NewRateLimiter(px, provideWorkerMax)
|
limit := make(chan struct{}, provideWorkerMax)
|
||||||
|
|
||||||
limitedGoProvide := func(k key.Key, wid int) {
|
limitedGoProvide := func(k key.Key, wid int) {
|
||||||
|
defer func() {
|
||||||
|
// replace token when done
|
||||||
|
<-limit
|
||||||
|
}()
|
||||||
ev := logging.LoggableMap{"ID": wid}
|
ev := logging.LoggableMap{"ID": wid}
|
||||||
limiter.LimitedGo(func(px process.Process) {
|
|
||||||
|
|
||||||
ctx := procctx.OnClosingContext(px) // derive ctx from px
|
ctx := procctx.OnClosingContext(px) // derive ctx from px
|
||||||
defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done()
|
defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done()
|
||||||
@ -89,12 +91,10 @@ func (bs *Bitswap) provideWorker(px process.Process) {
|
|||||||
if err := bs.network.Provide(ctx, k); err != nil {
|
if err := bs.network.Provide(ctx, k); err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
}
|
}
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// worker spawner, reads from bs.provideKeys until it closes, spawning a
|
// worker spawner, reads from bs.provideKeys until it closes, spawning a
|
||||||
// _ratelimited_ number of workers to handle each key.
|
// _ratelimited_ number of workers to handle each key.
|
||||||
limiter.Go(func(px process.Process) {
|
|
||||||
for wid := 2; ; wid++ {
|
for wid := 2; ; wid++ {
|
||||||
ev := logging.LoggableMap{"ID": 1}
|
ev := logging.LoggableMap{"ID": 1}
|
||||||
log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev)
|
log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev)
|
||||||
@ -107,10 +107,14 @@ func (bs *Bitswap) provideWorker(px process.Process) {
|
|||||||
log.Debug("provideKeys channel closed")
|
log.Debug("provideKeys channel closed")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
limitedGoProvide(k, wid)
|
select {
|
||||||
|
case <-px.Closing():
|
||||||
|
return
|
||||||
|
case limit <- struct{}{}:
|
||||||
|
go limitedGoProvide(k, wid)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *Bitswap) provideCollector(ctx context.Context) {
|
func (bs *Bitswap) provideCollector(ctx context.Context) {
|
||||||
|
Reference in New Issue
Block a user