1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-07-25 11:58:12 +08:00

Merge pull request from ipfs/dht/speed

fix swarm dial backoff
This commit is contained in:
Juan Benet
2015-11-10 06:41:54 +00:00
5 changed files with 72 additions and 20 deletions

@ -18,6 +18,8 @@ const (
QueryError QueryError
Provider Provider
Value Value
AddingPeer
DialingPeer
) )
type QueryEvent struct { type QueryEvent struct {

@ -420,18 +420,18 @@ func TestDialBackoffClears(t *testing.T) {
} }
s1.peers.AddAddrs(s2.local, ifaceAddrs1, peer.PermanentAddrTTL) s1.peers.AddAddrs(s2.local, ifaceAddrs1, peer.PermanentAddrTTL)
before = time.Now() if _, err := s1.Dial(ctx, s2.local); err == nil {
t.Fatal("should have failed to dial backed off peer")
}
time.Sleep(baseBackoffTime)
if c, err := s1.Dial(ctx, s2.local); err != nil { if c, err := s1.Dial(ctx, s2.local); err != nil {
t.Fatal(err) t.Fatal(err)
} else { } else {
c.Close() c.Close()
t.Log("correctly connected") t.Log("correctly connected")
} }
duration = time.Now().Sub(before)
if duration >= dt {
// t.Error("took too long", duration, dt)
}
if s1.backf.Backoff(s2.local) { if s1.backf.Backoff(s2.local) {
t.Error("s2 should no longer be on backoff") t.Error("s2 should no longer be on backoff")

@ -145,44 +145,71 @@ func (ds *dialsync) Unlock(dst peer.ID) {
// dialbackoff.Clear(p) // dialbackoff.Clear(p)
// } // }
// //
type dialbackoff struct { type dialbackoff struct {
entries map[peer.ID]struct{} entries map[peer.ID]*backoffPeer
lock sync.RWMutex lock sync.RWMutex
} }
type backoffPeer struct {
tries int
until time.Time
}
func (db *dialbackoff) init() { func (db *dialbackoff) init() {
if db.entries == nil { if db.entries == nil {
db.entries = make(map[peer.ID]struct{}) db.entries = make(map[peer.ID]*backoffPeer)
} }
} }
// Backoff returns whether the client should backoff from dialing // Backoff returns whether the client should backoff from dialing
// peeer p // peer p
func (db *dialbackoff) Backoff(p peer.ID) bool { func (db *dialbackoff) Backoff(p peer.ID) (backoff bool) {
db.lock.Lock() db.lock.Lock()
defer db.lock.Unlock()
db.init() db.init()
_, found := db.entries[p] bp, found := db.entries[p]
db.lock.Unlock() if found && time.Now().Before(bp.until) {
return found return true
}
return false
} }
const baseBackoffTime = time.Second * 5
const maxBackoffTime = time.Minute * 5
// AddBackoff lets other nodes know that we've entered backoff with // AddBackoff lets other nodes know that we've entered backoff with
// peer p, so dialers should not wait unnecessarily. We still will // peer p, so dialers should not wait unnecessarily. We still will
// attempt to dial with one goroutine, in case we get through. // attempt to dial with one goroutine, in case we get through.
func (db *dialbackoff) AddBackoff(p peer.ID) { func (db *dialbackoff) AddBackoff(p peer.ID) {
db.lock.Lock() db.lock.Lock()
defer db.lock.Unlock()
db.init() db.init()
db.entries[p] = struct{}{} bp, ok := db.entries[p]
db.lock.Unlock() if !ok {
db.entries[p] = &backoffPeer{
tries: 1,
until: time.Now().Add(baseBackoffTime),
}
return
}
expTimeAdd := time.Second * time.Duration(bp.tries*bp.tries)
if expTimeAdd > maxBackoffTime {
expTimeAdd = maxBackoffTime
}
bp.until = time.Now().Add(baseBackoffTime + expTimeAdd)
bp.tries++
} }
// Clear removes a backoff record. Clients should call this after a // Clear removes a backoff record. Clients should call this after a
// successful Dial. // successful Dial.
func (db *dialbackoff) Clear(p peer.ID) { func (db *dialbackoff) Clear(p peer.ID) {
db.lock.Lock() db.lock.Lock()
defer db.lock.Unlock()
db.init() db.init()
delete(db.entries, p) delete(db.entries, p)
db.lock.Unlock()
} }
// Dial connects to a peer. // Dial connects to a peer.
@ -225,14 +252,20 @@ func (s *Swarm) gatedDialAttempt(ctx context.Context, p peer.ID) (*Conn, error)
// check if there's an ongoing dial to this peer // check if there's an ongoing dial to this peer
if ok, wait := s.dsync.Lock(p); ok { if ok, wait := s.dsync.Lock(p); ok {
defer s.dsync.Unlock(p)
// if this peer has been backed off, lets get out of here
if s.backf.Backoff(p) {
log.Event(ctx, "swarmDialBackoff", logdial)
return nil, ErrDialBackoff
}
// ok, we have been charged to dial! let's do it. // ok, we have been charged to dial! let's do it.
// if it succeeds, dial will add the conn to the swarm itself. // if it succeeds, dial will add the conn to the swarm itself.
defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done() defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done()
ctxT, cancel := context.WithTimeout(ctx, s.dialT) ctxT, cancel := context.WithTimeout(ctx, s.dialT)
conn, err := s.dial(ctxT, p) conn, err := s.dial(ctxT, p)
cancel() cancel()
s.dsync.Unlock(p)
log.Debugf("dial end %s", conn) log.Debugf("dial end %s", conn)
if err != nil { if err != nil {
log.Event(ctx, "swarmDialBackoffAdd", logdial) log.Event(ctx, "swarmDialBackoffAdd", logdial)

@ -157,5 +157,8 @@ func (s *Swarm) connHandler(c *ps.Conn) *Conn {
return nil return nil
} }
// if a peer dials us, remove from dial backoff.
s.backf.Clear(sc.RemotePeer())
return sc return sc
} }

@ -79,6 +79,8 @@ type dhtQueryRunner struct {
rateLimit chan struct{} // processing semaphore rateLimit chan struct{} // processing semaphore
log logging.EventLogger log logging.EventLogger
runCtx context.Context
proc process.Process proc process.Process
sync.RWMutex sync.RWMutex
} }
@ -98,6 +100,7 @@ func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) { func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
r.log = log r.log = log
r.runCtx = ctx
if len(peers) == 0 { if len(peers) == 0 {
log.Warning("Running query with no peers!") log.Warning("Running query with no peers!")
@ -167,6 +170,11 @@ func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) {
return return
} }
notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
Type: notif.AddingPeer,
ID: next,
})
r.peersRemaining.Increment(1) r.peersRemaining.Increment(1)
select { select {
case r.peersToQuery.EnqChan <- next: case r.peersToQuery.EnqChan <- next:
@ -221,7 +229,12 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
// make sure we're connected to the peer. // make sure we're connected to the peer.
// FIXME abstract away into the network layer // FIXME abstract away into the network layer
if conns := r.query.dht.host.Network().ConnsToPeer(p); len(conns) == 0 { if conns := r.query.dht.host.Network().ConnsToPeer(p); len(conns) == 0 {
log.Infof("not connected. dialing.") log.Error("not connected. dialing.")
notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
Type: notif.DialingPeer,
ID: p,
})
// while we dial, we do not take up a rate limit. this is to allow // while we dial, we do not take up a rate limit. this is to allow
// forward progress during potentially very high latency dials. // forward progress during potentially very high latency dials.
r.rateLimit <- struct{}{} r.rateLimit <- struct{}{}
@ -231,9 +244,10 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
if err := r.query.dht.host.Connect(ctx, pi); err != nil { if err := r.query.dht.host.Connect(ctx, pi); err != nil {
log.Debugf("Error connecting: %s", err) log.Debugf("Error connecting: %s", err)
notif.PublishQueryEvent(ctx, &notif.QueryEvent{ notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
Type: notif.QueryError, Type: notif.QueryError,
Extra: err.Error(), Extra: err.Error(),
ID: p,
}) })
r.Lock() r.Lock()