Swarm dial backoff with expiry, plus DHT query notifications.

* notifications: add the AddingPeer and DialingPeer query event types.
* swarm: a dial-backoff entry now records a failure count and an expiry
  time (5s base; on repeated failures an extra tries^2 seconds, capped at
  5m, is added). gatedDialAttempt returns ErrDialBackoff while the entry
  is still in force and releases the dial lock with defer. An inbound
  connection from a peer clears that peer's backoff entry.
* dht: the query runner keeps the context passed to Run and publishes
  AddingPeer / DialingPeer events on it; QueryError events now carry the
  peer ID.

Text above the first "diff --git" header is commentary and is skipped by
patch tools.

diff --git a/notifications/query.go b/notifications/query.go
index 29a0bc0fb..6a9d08da3 100644
--- a/notifications/query.go
+++ b/notifications/query.go
@@ -18,6 +18,8 @@ const (
 	QueryError
 	Provider
 	Value
+	AddingPeer
+	DialingPeer
 )
 
 type QueryEvent struct {
diff --git a/p2p/net/swarm/dial_test.go b/p2p/net/swarm/dial_test.go
index 6c3c4f920..d0e0cdbb0 100644
--- a/p2p/net/swarm/dial_test.go
+++ b/p2p/net/swarm/dial_test.go
@@ -420,18 +420,18 @@ func TestDialBackoffClears(t *testing.T) {
 	}
 	s1.peers.AddAddrs(s2.local, ifaceAddrs1, peer.PermanentAddrTTL)
 
-	before = time.Now()
+	if _, err := s1.Dial(ctx, s2.local); err == nil {
+		t.Fatal("should have failed to dial backed off peer")
+	}
+
+	time.Sleep(baseBackoffTime)
+
 	if c, err := s1.Dial(ctx, s2.local); err != nil {
 		t.Fatal(err)
 	} else {
 		c.Close()
 		t.Log("correctly connected")
 	}
-	duration = time.Now().Sub(before)
-
-	if duration >= dt {
-		// t.Error("took too long", duration, dt)
-	}
 
 	if s1.backf.Backoff(s2.local) {
 		t.Error("s2 should no longer be on backoff")
diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go
index 77c768391..6d4e0344e 100644
--- a/p2p/net/swarm/swarm_dial.go
+++ b/p2p/net/swarm/swarm_dial.go
@@ -145,44 +145,71 @@ func (ds *dialsync) Unlock(dst peer.ID) {
 //  	dialbackoff.Clear(p)
 //  }
 //
+
 type dialbackoff struct {
-	entries map[peer.ID]struct{}
+	entries map[peer.ID]*backoffPeer
 	lock    sync.RWMutex
 }
 
+type backoffPeer struct {
+	tries int
+	until time.Time
+}
+
 func (db *dialbackoff) init() {
 	if db.entries == nil {
-		db.entries = make(map[peer.ID]struct{})
+		db.entries = make(map[peer.ID]*backoffPeer)
 	}
 }
 
 // Backoff returns whether the client should backoff from dialing
-// peeer p
-func (db *dialbackoff) Backoff(p peer.ID) bool {
+// peer p
+func (db *dialbackoff) Backoff(p peer.ID) (backoff bool) {
 	db.lock.Lock()
+	defer db.lock.Unlock()
 	db.init()
-	_, found := db.entries[p]
-	db.lock.Unlock()
-	return found
+	bp, found := db.entries[p]
+	if found && time.Now().Before(bp.until) {
+		return true
+	}
+
+	return false
 }
 
+const baseBackoffTime = time.Second * 5
+const maxBackoffTime = time.Minute * 5
+
 // AddBackoff lets other nodes know that we've entered backoff with
 // peer p, so dialers should not wait unnecessarily. We still will
 // attempt to dial with one goroutine, in case we get through.
 func (db *dialbackoff) AddBackoff(p peer.ID) {
 	db.lock.Lock()
+	defer db.lock.Unlock()
 	db.init()
-	db.entries[p] = struct{}{}
-	db.lock.Unlock()
+	bp, ok := db.entries[p]
+	if !ok {
+		db.entries[p] = &backoffPeer{
+			tries: 1,
+			until: time.Now().Add(baseBackoffTime),
+		}
+		return
+	}
+
+	expTimeAdd := time.Second * time.Duration(bp.tries*bp.tries)
+	if expTimeAdd > maxBackoffTime {
+		expTimeAdd = maxBackoffTime
+	}
+	bp.until = time.Now().Add(baseBackoffTime + expTimeAdd)
+	bp.tries++
 }
 
 // Clear removes a backoff record. Clients should call this after a
 // successful Dial.
 func (db *dialbackoff) Clear(p peer.ID) {
 	db.lock.Lock()
+	defer db.lock.Unlock()
 	db.init()
 	delete(db.entries, p)
-	db.lock.Unlock()
 }
 
 // Dial connects to a peer.
@@ -225,14 +252,20 @@ func (s *Swarm) gatedDialAttempt(ctx context.Context, p peer.ID) (*Conn, error)
 
 	// check if there's an ongoing dial to this peer
 	if ok, wait := s.dsync.Lock(p); ok {
+		defer s.dsync.Unlock(p)
+
+		// if this peer has been backed off, let's get out of here
+		if s.backf.Backoff(p) {
+			log.Event(ctx, "swarmDialBackoff", logdial)
+			return nil, ErrDialBackoff
+		}
+
 		// ok, we have been charged to dial! let's do it.
 		// if it succeeds, dial will add the conn to the swarm itself.
-
 		defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done()
 		ctxT, cancel := context.WithTimeout(ctx, s.dialT)
 		conn, err := s.dial(ctxT, p)
 		cancel()
-		s.dsync.Unlock(p)
 		log.Debugf("dial end %s", conn)
 		if err != nil {
 			log.Event(ctx, "swarmDialBackoffAdd", logdial)
diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go
index 640450399..c19faccb2 100644
--- a/p2p/net/swarm/swarm_listen.go
+++ b/p2p/net/swarm/swarm_listen.go
@@ -157,5 +157,8 @@ func (s *Swarm) connHandler(c *ps.Conn) *Conn {
 		return nil
 	}
 
+	// if a peer dials us, remove from dial backoff.
+	s.backf.Clear(sc.RemotePeer())
+
 	return sc
 }
diff --git a/routing/dht/query.go b/routing/dht/query.go
index 86b33a8db..666a95878 100644
--- a/routing/dht/query.go
+++ b/routing/dht/query.go
@@ -79,6 +79,8 @@ type dhtQueryRunner struct {
 	rateLimit chan struct{} // processing semaphore
 	log       logging.EventLogger
 
+	runCtx context.Context
+
 	proc process.Process
 	sync.RWMutex
 }
@@ -98,6 +100,7 @@ func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
 
 func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
 	r.log = log
+	r.runCtx = ctx
 
 	if len(peers) == 0 {
 		log.Warning("Running query with no peers!")
@@ -167,6 +170,11 @@ func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) {
 		return
 	}
 
+	notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
+		Type: notif.AddingPeer,
+		ID:   next,
+	})
+
 	r.peersRemaining.Increment(1)
 	select {
 	case r.peersToQuery.EnqChan <- next:
@@ -221,7 +229,12 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
 	// make sure we're connected to the peer.
 	// FIXME abstract away into the network layer
 	if conns := r.query.dht.host.Network().ConnsToPeer(p); len(conns) == 0 {
 		log.Infof("not connected. dialing.")
+
+		notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
+			Type: notif.DialingPeer,
+			ID:   p,
+		})
 		// while we dial, we do not take up a rate limit. this is to allow
 		// forward progress during potentially very high latency dials.
 		r.rateLimit <- struct{}{}
@@ -231,9 +244,10 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
 		if err := r.query.dht.host.Connect(ctx, pi); err != nil {
 			log.Debugf("Error connecting: %s", err)
 
-			notif.PublishQueryEvent(ctx, &notif.QueryEvent{
+			notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
 				Type:  notif.QueryError,
 				Extra: err.Error(),
+				ID:    p,
 			})
 
 			r.Lock()