
Merge pull request #1553 from ipfs/fix-providing-speed

bitswap/provide: improved rate limiting
Juan Benet committed on 2015-08-05 09:36:05 +02:00
2 changed files with 48 additions and 46 deletions
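
What changed, in short: the old code ran a fixed pool of four long-lived provide workers (provideWorkers = 4) reading from an unbuffered provideKeys channel, so bursts of new blocks backed up behind slow DHT provide calls. The new code buffers up to 2048 pending keys (provideKeysBufferSize) and spawns one short-lived goroutine per key, rate-limited to 512 in flight (provideWorkerMax) via the goprocess ratelimit package. The standalone sketch below reproduces that pattern with small stand-in values; it uses the upstream github.com/jbenet/goprocess import paths rather than the vendored Godeps paths seen in the diff, and fake string keys, so read it as an illustration of the mechanism, not the committed code.

package main

import (
	"fmt"
	"time"

	goprocess "github.com/jbenet/goprocess"
	ratelimit "github.com/jbenet/goprocess/ratelimit"
)

func main() {
	keys := make(chan string, 16) // stand-in for bs.provideKeys (2048 in the commit)

	// allow at most 3 provides in flight (the commit uses provideWorkerMax = 512)
	limiter := ratelimit.NewRateLimiter(goprocess.Background(), 3)

	done := make(chan struct{})

	// spawner: read keys until the channel closes, launching one
	// rate-limited worker per key, as the new provideWorker does
	limiter.Go(func(px goprocess.Process) {
		defer close(done)
		for {
			select {
			case <-px.Closing():
				return
			case k, ok := <-keys:
				if !ok {
					return
				}
				k := k // capture for the closure
				limiter.LimitedGo(func(px goprocess.Process) {
					fmt.Println("providing", k) // stand-in for bs.network.Provide(ctx, k)
					time.Sleep(10 * time.Millisecond)
				})
			}
		}
	})

	for i := 0; i < 10; i++ {
		keys <- fmt.Sprintf("key-%d", i)
	}
	close(keys)
	<-done          // spawner has drained the channel
	limiter.Close() // blocks until in-flight provides finish
}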

File 1 of 2:

@@ -39,8 +39,9 @@ const (
 	// kMaxPriority is the max priority as defined by the bitswap protocol
 	kMaxPriority = math.MaxInt32

 	HasBlockBufferSize = 256
-	provideWorkers = 4
+	provideKeysBufferSize = 2048
+	provideWorkerMax = 512
 )

 var rebroadcastDelay = delay.Fixed(time.Second * 10)
@@ -85,7 +86,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
 		findKeys:    make(chan *blockRequest, sizeBatchRequestChan),
 		process:     px,
 		newBlocks:   make(chan *blocks.Block, HasBlockBufferSize),
-		provideKeys: make(chan key.Key),
+		provideKeys: make(chan key.Key, provideKeysBufferSize),
 		wm:          NewWantManager(ctx, network),
 	}
 	go bs.wm.Run()
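
Why the buffer matters: with provideKeys unbuffered, the sender blocks on every key until a provide worker is ready to receive, so slow DHT provides backpressure straight into block handling; the 2048-slot buffer absorbs bursts instead. A tiny standalone illustration of the difference (hypothetical values, not go-ipfs code):

package main

import "fmt"

func main() {
	unbuffered := make(chan string)
	buffered := make(chan string, 2048) // mirrors provideKeysBufferSize

	// A send on an unbuffered channel blocks until a receiver is ready;
	// probe it without blocking:
	select {
	case unbuffered <- "some-key":
		fmt.Println("sent: a worker happened to be ready")
	default:
		fmt.Println("send would block: no worker ready")
	}

	// A buffered channel accepts the key immediately while capacity
	// remains, even if every worker is busy:
	select {
	case buffered <- "some-key":
		fmt.Println("buffered: key queued without blocking")
	default:
		fmt.Println("buffer full: backpressure kicks in")
	}
}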

File 2 of 2:

@@ -1,12 +1,12 @@
 package bitswap

 import (
-	"os"
-	"strconv"
 	"time"

 	process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
+	ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
 	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
+	waitable "github.com/ipfs/go-ipfs/thirdparty/waitable"

 	key "github.com/ipfs/go-ipfs/blocks/key"
 	eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
@@ -14,22 +14,6 @@ import (

 var TaskWorkerCount = 8

-func init() {
-	twc := os.Getenv("IPFS_BITSWAP_TASK_WORKERS")
-	if twc != "" {
-		n, err := strconv.Atoi(twc)
-		if err != nil {
-			log.Error(err)
-			return
-		}
-		if n > 0 {
-			TaskWorkerCount = n
-		} else {
-			log.Errorf("Invalid value of '%d' for IPFS_BITSWAP_TASK_WORKERS", n)
-		}
-	}
-}
-
 func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
 	// Start up a worker to handle block requests this node is making
 	px.Go(func(px process.Process) {
@@ -57,12 +41,7 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
 	// Spawn up multiple workers to handle incoming blocks
 	// consider increasing number if providing blocks bottlenecks
 	// file transfers
-	for i := 0; i < provideWorkers; i++ {
-		i := i
-		px.Go(func(px process.Process) {
-			bs.provideWorker(ctx, i)
-		})
-	}
+	px.Go(bs.provideWorker)
 }

 func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
@@ -77,7 +56,11 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
 			if !ok {
 				continue
 			}
-			log.Event(ctx, "Bitswap.TaskWorker.Work", eventlog.LoggableMap{"ID": id, "Target": envelope.Peer.Pretty(), "Block": envelope.Block.Multihash.B58String()})
+			log.Event(ctx, "Bitswap.TaskWorker.Work", eventlog.LoggableMap{
+				"ID":     id,
+				"Target": envelope.Peer.Pretty(),
+				"Block":  envelope.Block.Multihash.B58String(),
+			})

 			bs.wm.SendBlock(ctx, envelope)
 		case <-ctx.Done():
@@ -89,27 +72,45 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
 	}
 }

-func (bs *Bitswap) provideWorker(ctx context.Context, id int) {
-	idmap := eventlog.LoggableMap{"ID": id}
-	for {
-		log.Event(ctx, "Bitswap.ProvideWorker.Loop", idmap)
-		select {
-		case k, ok := <-bs.provideKeys:
-			log.Event(ctx, "Bitswap.ProvideWorker.Work", idmap, &k)
-			if !ok {
-				log.Debug("provideKeys channel closed")
-				return
-			}
-			ctx, cancel := context.WithTimeout(ctx, provideTimeout)
-			err := bs.network.Provide(ctx, k)
-			if err != nil {
-				log.Error(err)
-			}
-			cancel()
-		case <-ctx.Done():
-			return
-		}
-	}
-}
+func (bs *Bitswap) provideWorker(px process.Process) {
+
+	limiter := ratelimit.NewRateLimiter(px, provideWorkerMax)
+
+	limitedGoProvide := func(k key.Key, wid int) {
+		ev := eventlog.LoggableMap{"ID": wid}
+		limiter.LimitedGo(func(px process.Process) {
+
+			ctx := waitable.Context(px) // derive ctx from px
+			defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done()
+
+			ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
+			defer cancel()
+
+			if err := bs.network.Provide(ctx, k); err != nil {
+				log.Error(err)
+			}
+		})
+	}
+
+	// worker spawner, reads from bs.provideKeys until it closes, spawning a
+	// _ratelimited_ number of workers to handle each key.
+	limiter.Go(func(px process.Process) {
+		for wid := 2; ; wid++ {
+			ev := eventlog.LoggableMap{"ID": 1}
+			log.Event(waitable.Context(px), "Bitswap.ProvideWorker.Loop", ev)
+
+			select {
+			case <-px.Closing():
+				return
+			case k, ok := <-bs.provideKeys:
+				if !ok {
+					log.Debug("provideKeys channel closed")
+					return
+				}
+				limitedGoProvide(k, wid)
+			}
+		}
+	})
+}

 func (bs *Bitswap) provideCollector(ctx context.Context) {
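
A note on shutdown semantics: the old worker selected on ctx.Done(), while the rewrite hangs everything off the goprocess tree instead. Each spawned provide derives its context from its process via waitable.Context(px), so closing the process both cancels in-flight provides and waits for them, with the per-call provideTimeout layered on top. The sketch below shows the idea; processContext is a hypothetical stand-in for go-ipfs's thirdparty/waitable.Context, again using the upstream goprocess import path.

package main

import (
	"context"
	"fmt"
	"time"

	goprocess "github.com/jbenet/goprocess"
)

// processContext returns a context that is cancelled once the process
// begins closing; an illustrative stand-in for waitable.Context.
func processContext(px goprocess.Process) context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-px.Closing() // fires when the process is told to shut down
		cancel()
	}()
	return ctx
}

func main() {
	proc := goprocess.WithParent(goprocess.Background())

	proc.Go(func(px goprocess.Process) {
		ctx := processContext(px)
		// layer a per-operation timeout on top, as the new provideWorker does
		ctx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
		defer cancel()

		<-ctx.Done() // a real worker would call the network here
		fmt.Println("provide stopped:", ctx.Err())
	})

	time.Sleep(100 * time.Millisecond)
	proc.Close() // cancels remaining work and waits for the worker
}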