From 1c2223dece434650695c155c97ef211776b65af8 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 5 Oct 2015 23:29:35 -0700 Subject: [PATCH 1/2] rate limit concurrent peer dials License: MIT Signed-off-by: Jeromy --- p2p/net/swarm/swarm.go | 20 +++++++++------- p2p/net/swarm/swarm_dial.go | 48 ++++++++++++++++++++++++++++++++++--- 2 files changed, 57 insertions(+), 11 deletions(-) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index b118b9ece..d2d6d3971 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -64,6 +64,9 @@ type Swarm struct { // filters for addresses that shouldnt be dialed Filters *filter.Filters + // file descriptor rate limited + fdRateLimit chan struct{} + proc goprocess.Process ctx context.Context bwc metrics.Reporter @@ -79,14 +82,15 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, } s := &Swarm{ - swarm: ps.NewSwarm(PSTransport), - local: local, - peers: peers, - ctx: ctx, - dialT: DialTimeout, - notifs: make(map[inet.Notifiee]ps.Notifiee), - bwc: bwc, - Filters: filter.NewFilters(), + swarm: ps.NewSwarm(PSTransport), + local: local, + peers: peers, + ctx: ctx, + dialT: DialTimeout, + notifs: make(map[inet.Notifiee]ps.Notifiee), + bwc: bwc, + fdRateLimit: make(chan struct{}, concurrentFdDials), + Filters: filter.NewFilters(), } // configure Swarm diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index a68d28805..4beb986a3 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -44,6 +44,9 @@ var ( // add loop back in Dial(.) const dialAttempts = 1 +// number of concurrent outbound dials over transports that consume file descriptors +const concurrentFdDials = 160 + // DialTimeout is the amount of time each dial attempt has. We can think about making // this larger down the road, or putting more granular timeouts (i.e. within each // subcomponent of Dial) @@ -115,6 +118,7 @@ func (ds *dialsync) Unlock(dst peer.ID) { if !found { panic("called dialDone with no ongoing dials to peer: " + dst.Pretty()) } + delete(ds.ongoing, dst) // remove ongoing dial close(wait) // release everyone else ds.lock.Unlock() @@ -398,7 +402,7 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote // to end early. go func() { // rate limiting just in case. at most 10 addrs at once. - limiter := ratelimit.NewRateLimiter(process.Background(), 10) + limiter := ratelimit.NewRateLimiter(process.Background(), 8) limiter.Go(func(worker process.Process) { // permute addrs so we try different sets first each time. for _, i := range rand.Perm(len(remoteAddrs)) { @@ -411,9 +415,27 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote } workerAddr := remoteAddrs[i] // shadow variable to avoid race - limiter.LimitedGo(func(worker process.Process) { - dialSingleAddr(workerAddr) + + // we have to do the waiting concurrently because there are addrs + // that SHOULD NOT be rate limited (utp), nor blocked by other + // rate limited addrs (tcp). + // + // (and we need to call `limiter.Go`, instead of `go` as required + // by goproc/limiter semantics. note: limiter.Go is not LimitedGo.) + limiter.Go(func(p process.Process) { + + // returns whatever ratelimiting is acceptable for workerAddr. + // may not rate limit at all. + rl := s.addrDialRateLimit(workerAddr) + rl <- struct{}{} + + limiter.LimitedGo(func(worker process.Process) { + dialSingleAddr(workerAddr) + }) + + <-rl }) + } }) @@ -491,3 +513,23 @@ func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error return swarmC, err } + +// addrDialRateLimit returns a ratelimiting channel for dialing transport +// addrs like a. for example, tcp is fd-ratelimited. utp is not ratelimited. +func (s *Swarm) addrDialRateLimit(a ma.Multiaddr) chan struct{} { + if isFDCostlyTransport(a) { + return s.fdRateLimit + } + + // do not rate limit it at all + return make(chan struct{}, 1) +} + +func isFDCostlyTransport(a ma.Multiaddr) bool { + return isTCPMultiaddr(a) +} + +func isTCPMultiaddr(a ma.Multiaddr) bool { + p := a.Protocols() + return len(p) == 2 && (p[0].Name == "ip4" || p[0].Name == "ip6") && p[1].Name == "tcp" +} From c1c9a74a63ded56ba4081983004b61df5343f867 Mon Sep 17 00:00:00 2001 From: Jeromy Johnson Date: Thu, 8 Oct 2015 11:04:43 -0700 Subject: [PATCH 2/2] fix too many FD error on osx License: MIT Signed-off-by: Jeromy Johnson --- p2p/net/swarm/simul_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/p2p/net/swarm/simul_test.go b/p2p/net/swarm/simul_test.go index dfb775c97..6a5547dc6 100644 --- a/p2p/net/swarm/simul_test.go +++ b/p2p/net/swarm/simul_test.go @@ -1,6 +1,7 @@ package swarm import ( + "runtime" "sync" "testing" "time" @@ -49,7 +50,8 @@ func TestSimultOpenMany(t *testing.T) { addrs := 20 rounds := 10 - if ci.IsRunning() { + if ci.IsRunning() || runtime.GOOS == "darwin" { + // osx has a limit of 256 file descriptors addrs = 10 rounds = 5 }