diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 5234aefc9..cbc9bcf4f 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -39,8 +39,9 @@ const ( // kMaxPriority is the max priority as defined by the bitswap protocol kMaxPriority = math.MaxInt32 - HasBlockBufferSize = 256 - provideWorkers = 4 + HasBlockBufferSize = 256 + provideKeysBufferSize = 2048 + provideWorkerMax = 512 ) var rebroadcastDelay = delay.Fixed(time.Second * 10) @@ -85,7 +86,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, findKeys: make(chan *blockRequest, sizeBatchRequestChan), process: px, newBlocks: make(chan *blocks.Block, HasBlockBufferSize), - provideKeys: make(chan key.Key), + provideKeys: make(chan key.Key, provideKeysBufferSize), wm: NewWantManager(ctx, network), } go bs.wm.Run() diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index edd05bfb3..e19cf2fbc 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -1,12 +1,12 @@ package bitswap import ( - "os" - "strconv" "time" process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + waitable "github.com/ipfs/go-ipfs/thirdparty/waitable" key "github.com/ipfs/go-ipfs/blocks/key" eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog" @@ -14,22 +14,6 @@ import ( var TaskWorkerCount = 8 -func init() { - twc := os.Getenv("IPFS_BITSWAP_TASK_WORKERS") - if twc != "" { - n, err := strconv.Atoi(twc) - if err != nil { - log.Error(err) - return - } - if n > 0 { - TaskWorkerCount = n - } else { - log.Errorf("Invalid value of '%d' for IPFS_BITSWAP_TASK_WORKERS", n) - } - } -} - func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { // Start up a worker to handle block requests this node is making px.Go(func(px process.Process) { @@ -57,12 +41,7 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { // Spawn up multiple workers to handle incoming blocks // consider increasing number if providing blocks bottlenecks // file transfers - for i := 0; i < provideWorkers; i++ { - i := i - px.Go(func(px process.Process) { - bs.provideWorker(ctx, i) - }) - } + px.Go(bs.provideWorker) } func (bs *Bitswap) taskWorker(ctx context.Context, id int) { @@ -77,7 +56,11 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { if !ok { continue } - log.Event(ctx, "Bitswap.TaskWorker.Work", eventlog.LoggableMap{"ID": id, "Target": envelope.Peer.Pretty(), "Block": envelope.Block.Multihash.B58String()}) + log.Event(ctx, "Bitswap.TaskWorker.Work", eventlog.LoggableMap{ + "ID": id, + "Target": envelope.Peer.Pretty(), + "Block": envelope.Block.Multihash.B58String(), + }) bs.wm.SendBlock(ctx, envelope) case <-ctx.Done(): @@ -89,27 +72,45 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { } } -func (bs *Bitswap) provideWorker(ctx context.Context, id int) { - idmap := eventlog.LoggableMap{"ID": id} - for { - log.Event(ctx, "Bitswap.ProvideWorker.Loop", idmap) - select { - case k, ok := <-bs.provideKeys: - log.Event(ctx, "Bitswap.ProvideWorker.Work", idmap, &k) - if !ok { - log.Debug("provideKeys channel closed") - return - } - ctx, cancel := context.WithTimeout(ctx, provideTimeout) - err := bs.network.Provide(ctx, k) - if err != nil { +func (bs *Bitswap) provideWorker(px process.Process) { + + limiter := ratelimit.NewRateLimiter(px, provideWorkerMax) + + limitedGoProvide := func(k key.Key, wid int) { + ev := eventlog.LoggableMap{"ID": wid} + limiter.LimitedGo(func(px process.Process) { + + ctx := waitable.Context(px) // derive ctx from px + defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done() + + ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx + defer cancel() + + if err := bs.network.Provide(ctx, k); err != nil { log.Error(err) } - cancel() - case <-ctx.Done(): - return - } + }) } + + // worker spawner, reads from bs.provideKeys until it closes, spawning a + // _ratelimited_ number of workers to handle each key. + limiter.Go(func(px process.Process) { + for wid := 2; ; wid++ { + ev := eventlog.LoggableMap{"ID": 1} + log.Event(waitable.Context(px), "Bitswap.ProvideWorker.Loop", ev) + + select { + case <-px.Closing(): + return + case k, ok := <-bs.provideKeys: + if !ok { + log.Debug("provideKeys channel closed") + return + } + limitedGoProvide(k, wid) + } + } + }) } func (bs *Bitswap) provideCollector(ctx context.Context) {