diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index 60f8ffc22..2873f8c67 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -5,7 +5,6 @@ import ( process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" procctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" - ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" key "github.com/ipfs/go-ipfs/blocks/key" @@ -74,43 +73,48 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { func (bs *Bitswap) provideWorker(px process.Process) { - limiter := ratelimit.NewRateLimiter(px, provideWorkerMax) + limit := make(chan struct{}, provideWorkerMax) limitedGoProvide := func(k key.Key, wid int) { + defer func() { + // replace token when done + <-limit + }() ev := logging.LoggableMap{"ID": wid} - limiter.LimitedGo(func(px process.Process) { - ctx := procctx.OnClosingContext(px) // derive ctx from px - defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done() + ctx := procctx.OnClosingContext(px) // derive ctx from px + defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done() - ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx - defer cancel() + ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx + defer cancel() - if err := bs.network.Provide(ctx, k); err != nil { - log.Error(err) - } - }) + if err := bs.network.Provide(ctx, k); err != nil { + log.Error(err) + } } // worker spawner, reads from bs.provideKeys until it closes, spawning a // _ratelimited_ number of workers to handle each key. - limiter.Go(func(px process.Process) { - for wid := 2; ; wid++ { - ev := logging.LoggableMap{"ID": 1} - log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev) + for wid := 2; ; wid++ { + ev := logging.LoggableMap{"ID": 1} + log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev) + select { + case <-px.Closing(): + return + case k, ok := <-bs.provideKeys: + if !ok { + log.Debug("provideKeys channel closed") + return + } select { case <-px.Closing(): return - case k, ok := <-bs.provideKeys: - if !ok { - log.Debug("provideKeys channel closed") - return - } - limitedGoProvide(k, wid) + case limit <- struct{}{}: + go limitedGoProvide(k, wid) } } - }) + } } func (bs *Bitswap) provideCollector(ctx context.Context) {