From 6e705e1ef061309d3bca69ef3a636e64c057138f Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Tue, 4 Aug 2015 19:53:39 +0200 Subject: [PATCH] bitswap/provide: improved rate limiting this PR greatly speeds up providing and add. (1) Instead of idling workers, we move to a ratelimiter-based worker. We put this max at 512, so that means _up to_ 512 goroutines. This is very small load on the node, as each worker is providing to the dht, which means mostly waiting. It DOES put a large load on the DHT. but i want to try this out for a while and see if it's a problem. We can decide later if it is a problem for the network (nothing stops anyone from re-compiling, but the defaults of course matter). (2) We add a buffer size for provideKeys, which means that we block the add process much less. this is a very cheap buffer, as it only stores keys (it may be even cheaper with a lock + ring buffer instead of a channel...). This makes add blazing fast-- it was being rate limited by providing. Add should not be ratelimited by providing (much, if any) as the user wants to just store the stuff in the local node's repo. This buffer is initially set to 4096, which means: 4096 * keysize (~258 bytes + go overhead) ~ 1-1.5MB this buffer only last a few sec to mins, and is an ok thing to do for the sake of very fast adds. (this could be a configurable paramter, certainly for low-mem footprint use cases). At the moment this is not much, compared to block sizes. (3) We make the providing EventBegin() + Done(), so that we can track how long a provide takes, and we can remove workers as they finish in bsdash and similar tools. License: MIT Signed-off-by: Juan Batiz-Benet --- exchange/bitswap/bitswap.go | 7 +-- exchange/bitswap/workers.go | 87 +++++++++++++++++++------------------ 2 files changed, 48 insertions(+), 46 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 5234aefc9..cbc9bcf4f 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -39,8 +39,9 @@ const ( // kMaxPriority is the max priority as defined by the bitswap protocol kMaxPriority = math.MaxInt32 - HasBlockBufferSize = 256 - provideWorkers = 4 + HasBlockBufferSize = 256 + provideKeysBufferSize = 2048 + provideWorkerMax = 512 ) var rebroadcastDelay = delay.Fixed(time.Second * 10) @@ -85,7 +86,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, findKeys: make(chan *blockRequest, sizeBatchRequestChan), process: px, newBlocks: make(chan *blocks.Block, HasBlockBufferSize), - provideKeys: make(chan key.Key), + provideKeys: make(chan key.Key, provideKeysBufferSize), wm: NewWantManager(ctx, network), } go bs.wm.Run() diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index edd05bfb3..e19cf2fbc 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -1,12 +1,12 @@ package bitswap import ( - "os" - "strconv" "time" process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + waitable "github.com/ipfs/go-ipfs/thirdparty/waitable" key "github.com/ipfs/go-ipfs/blocks/key" eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog" @@ -14,22 +14,6 @@ import ( var TaskWorkerCount = 8 -func init() { - twc := os.Getenv("IPFS_BITSWAP_TASK_WORKERS") - if twc != "" { - n, err := strconv.Atoi(twc) - if err != nil { - log.Error(err) - return - } - if n > 0 { - TaskWorkerCount = n - } else { - log.Errorf("Invalid value of '%d' for IPFS_BITSWAP_TASK_WORKERS", n) - } - } -} - func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { // Start up a worker to handle block requests this node is making px.Go(func(px process.Process) { @@ -57,12 +41,7 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { // Spawn up multiple workers to handle incoming blocks // consider increasing number if providing blocks bottlenecks // file transfers - for i := 0; i < provideWorkers; i++ { - i := i - px.Go(func(px process.Process) { - bs.provideWorker(ctx, i) - }) - } + px.Go(bs.provideWorker) } func (bs *Bitswap) taskWorker(ctx context.Context, id int) { @@ -77,7 +56,11 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { if !ok { continue } - log.Event(ctx, "Bitswap.TaskWorker.Work", eventlog.LoggableMap{"ID": id, "Target": envelope.Peer.Pretty(), "Block": envelope.Block.Multihash.B58String()}) + log.Event(ctx, "Bitswap.TaskWorker.Work", eventlog.LoggableMap{ + "ID": id, + "Target": envelope.Peer.Pretty(), + "Block": envelope.Block.Multihash.B58String(), + }) bs.wm.SendBlock(ctx, envelope) case <-ctx.Done(): @@ -89,27 +72,45 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { } } -func (bs *Bitswap) provideWorker(ctx context.Context, id int) { - idmap := eventlog.LoggableMap{"ID": id} - for { - log.Event(ctx, "Bitswap.ProvideWorker.Loop", idmap) - select { - case k, ok := <-bs.provideKeys: - log.Event(ctx, "Bitswap.ProvideWorker.Work", idmap, &k) - if !ok { - log.Debug("provideKeys channel closed") - return - } - ctx, cancel := context.WithTimeout(ctx, provideTimeout) - err := bs.network.Provide(ctx, k) - if err != nil { +func (bs *Bitswap) provideWorker(px process.Process) { + + limiter := ratelimit.NewRateLimiter(px, provideWorkerMax) + + limitedGoProvide := func(k key.Key, wid int) { + ev := eventlog.LoggableMap{"ID": wid} + limiter.LimitedGo(func(px process.Process) { + + ctx := waitable.Context(px) // derive ctx from px + defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done() + + ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx + defer cancel() + + if err := bs.network.Provide(ctx, k); err != nil { log.Error(err) } - cancel() - case <-ctx.Done(): - return - } + }) } + + // worker spawner, reads from bs.provideKeys until it closes, spawning a + // _ratelimited_ number of workers to handle each key. + limiter.Go(func(px process.Process) { + for wid := 2; ; wid++ { + ev := eventlog.LoggableMap{"ID": 1} + log.Event(waitable.Context(px), "Bitswap.ProvideWorker.Loop", ev) + + select { + case <-px.Closing(): + return + case k, ok := <-bs.provideKeys: + if !ok { + log.Debug("provideKeys channel closed") + return + } + limitedGoProvide(k, wid) + } + } + }) } func (bs *Bitswap) provideCollector(ctx context.Context) {