1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-26 23:53:19 +08:00

Merge pull request #1802 from ipfs/fix/too-many-fd

rate limit concurrent peer dials
This commit is contained in:
Juan Benet
2015-10-09 13:06:40 -07:00
3 changed files with 60 additions and 12 deletions

View File

@ -1,6 +1,7 @@
package swarm package swarm
import ( import (
"runtime"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -49,7 +50,8 @@ func TestSimultOpenMany(t *testing.T) {
addrs := 20 addrs := 20
rounds := 10 rounds := 10
if ci.IsRunning() { if ci.IsRunning() || runtime.GOOS == "darwin" {
// osx has a limit of 256 file descriptors
addrs = 10 addrs = 10
rounds = 5 rounds = 5
} }

View File

@ -64,6 +64,9 @@ type Swarm struct {
// filters for addresses that shouldnt be dialed // filters for addresses that shouldnt be dialed
Filters *filter.Filters Filters *filter.Filters
// file descriptor rate limited
fdRateLimit chan struct{}
proc goprocess.Process proc goprocess.Process
ctx context.Context ctx context.Context
bwc metrics.Reporter bwc metrics.Reporter
@ -86,6 +89,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
dialT: DialTimeout, dialT: DialTimeout,
notifs: make(map[inet.Notifiee]ps.Notifiee), notifs: make(map[inet.Notifiee]ps.Notifiee),
bwc: bwc, bwc: bwc,
fdRateLimit: make(chan struct{}, concurrentFdDials),
Filters: filter.NewFilters(), Filters: filter.NewFilters(),
} }

View File

@ -44,6 +44,9 @@ var (
// add loop back in Dial(.) // add loop back in Dial(.)
const dialAttempts = 1 const dialAttempts = 1
// number of concurrent outbound dials over transports that consume file descriptors
const concurrentFdDials = 160
// DialTimeout is the amount of time each dial attempt has. We can think about making // DialTimeout is the amount of time each dial attempt has. We can think about making
// this larger down the road, or putting more granular timeouts (i.e. within each // this larger down the road, or putting more granular timeouts (i.e. within each
// subcomponent of Dial) // subcomponent of Dial)
@ -115,6 +118,7 @@ func (ds *dialsync) Unlock(dst peer.ID) {
if !found { if !found {
panic("called dialDone with no ongoing dials to peer: " + dst.Pretty()) panic("called dialDone with no ongoing dials to peer: " + dst.Pretty())
} }
delete(ds.ongoing, dst) // remove ongoing dial delete(ds.ongoing, dst) // remove ongoing dial
close(wait) // release everyone else close(wait) // release everyone else
ds.lock.Unlock() ds.lock.Unlock()
@ -398,7 +402,7 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote
// to end early. // to end early.
go func() { go func() {
// rate limiting just in case. at most 10 addrs at once. // rate limiting just in case. at most 10 addrs at once.
limiter := ratelimit.NewRateLimiter(process.Background(), 10) limiter := ratelimit.NewRateLimiter(process.Background(), 8)
limiter.Go(func(worker process.Process) { limiter.Go(func(worker process.Process) {
// permute addrs so we try different sets first each time. // permute addrs so we try different sets first each time.
for _, i := range rand.Perm(len(remoteAddrs)) { for _, i := range rand.Perm(len(remoteAddrs)) {
@ -411,9 +415,27 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote
} }
workerAddr := remoteAddrs[i] // shadow variable to avoid race workerAddr := remoteAddrs[i] // shadow variable to avoid race
// we have to do the waiting concurrently because there are addrs
// that SHOULD NOT be rate limited (utp), nor blocked by other
// rate limited addrs (tcp).
//
// (and we need to call `limiter.Go`, instead of `go` as required
// by goproc/limiter semantics. note: limiter.Go is not LimitedGo.)
limiter.Go(func(p process.Process) {
// returns whatever ratelimiting is acceptable for workerAddr.
// may not rate limit at all.
rl := s.addrDialRateLimit(workerAddr)
rl <- struct{}{}
limiter.LimitedGo(func(worker process.Process) { limiter.LimitedGo(func(worker process.Process) {
dialSingleAddr(workerAddr) dialSingleAddr(workerAddr)
}) })
<-rl
})
} }
}) })
@ -491,3 +513,23 @@ func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error
return swarmC, err return swarmC, err
} }
// addrDialRateLimit returns a ratelimiting channel for dialing transport
// addrs like a. for example, tcp is fd-ratelimited. utp is not ratelimited.
func (s *Swarm) addrDialRateLimit(a ma.Multiaddr) chan struct{} {
if isFDCostlyTransport(a) {
return s.fdRateLimit
}
// do not rate limit it at all
return make(chan struct{}, 1)
}
func isFDCostlyTransport(a ma.Multiaddr) bool {
return isTCPMultiaddr(a)
}
func isTCPMultiaddr(a ma.Multiaddr) bool {
p := a.Protocols()
return len(p) == 2 && (p[0].Name == "ip4" || p[0].Name == "ip6") && p[1].Name == "tcp"
}