From 80c73f26b851853acfd8f75780852bf5992cf4bc Mon Sep 17 00:00:00 2001
From: Jeromy
Date: Wed, 4 Nov 2015 21:49:20 -0800
Subject: [PATCH 1/3] Add in some more notifications to help profile queries

License: MIT
Signed-off-by: Jeromy
---
 notifications/query.go |  2 ++
 routing/dht/query.go   | 18 ++++++++++++++++--
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/notifications/query.go b/notifications/query.go
index 29a0bc0fb..6a9d08da3 100644
--- a/notifications/query.go
+++ b/notifications/query.go
@@ -18,6 +18,8 @@ const (
 	QueryError
 	Provider
 	Value
+	AddingPeer
+	DialingPeer
 )
 
 type QueryEvent struct {
diff --git a/routing/dht/query.go b/routing/dht/query.go
index 5318897ee..8afaaa7e5 100644
--- a/routing/dht/query.go
+++ b/routing/dht/query.go
@@ -79,6 +79,8 @@ type dhtQueryRunner struct {
 	rateLimit chan struct{} // processing semaphore
 	log       logging.EventLogger
 
+	runCtx context.Context
+
 	proc process.Process
 	sync.RWMutex
 }
@@ -98,6 +100,7 @@ func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
 
 func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
 	r.log = log
+	r.runCtx = ctx
 
 	if len(peers) == 0 {
 		log.Warning("Running query with no peers!")
@@ -167,6 +170,11 @@ func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) {
 		return
 	}
 
+	notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
+		Type: notif.AddingPeer,
+		ID:   next,
+	})
+
 	r.peersRemaining.Increment(1)
 	select {
 	case r.peersToQuery.EnqChan <- next:
@@ -221,7 +229,12 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
 	// make sure we're connected to the peer.
 	// FIXME abstract away into the network layer
 	if conns := r.query.dht.host.Network().ConnsToPeer(p); len(conns) == 0 {
-		log.Infof("not connected. dialing.")
+		log.Error("not connected. dialing.")
+
+		notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
+			Type: notif.DialingPeer,
+			ID:   p,
+		})
 		// while we dial, we do not take up a rate limit. this is to allow
 		// forward progress during potentially very high latency dials.
 		r.rateLimit <- struct{}{}
@@ -231,9 +244,10 @@
 		if err := r.query.dht.host.Connect(ctx, pi); err != nil {
 			log.Debugf("Error connecting: %s", err)
 
-			notif.PublishQueryEvent(ctx, &notif.QueryEvent{
+			notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
 				Type:  notif.QueryError,
 				Extra: err.Error(),
+				ID:    p,
 			})
 
 			r.Lock()
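Note on consuming these events: the two event types added above (AddingPeer and DialingPeer) only help profiling if something tallies them on the receiving side. The sketch below is a minimal, self-contained illustration of such a consumer. The QueryEventType and QueryEvent definitions mirror notifications/query.go (earlier enum values are omitted), while the events channel and the profileQuery helper are hypothetical stand-ins, not the package's actual subscription API.

```go
package main

import "fmt"

// QueryEventType and QueryEvent mirror the shapes in notifications/query.go;
// earlier enum values are omitted here, so the numeric values are illustrative.
type QueryEventType int

const (
	QueryError QueryEventType = iota
	Provider
	Value
	AddingPeer
	DialingPeer
)

type QueryEvent struct {
	Type QueryEventType
	ID   string // peer ID, simplified to a plain string here
}

// profileQuery tallies events per type until the channel closes, giving a
// rough picture of how much of a query went to adding vs. dialing peers.
func profileQuery(events <-chan QueryEvent) map[QueryEventType]int {
	counts := make(map[QueryEventType]int)
	for ev := range events {
		counts[ev.Type]++
	}
	return counts
}

func main() {
	// Hypothetical event stream; in the DHT these would come from the
	// notif.PublishQueryEvent calls added in this patch.
	events := make(chan QueryEvent, 4)
	events <- QueryEvent{Type: AddingPeer, ID: "QmPeerA"}
	events <- QueryEvent{Type: DialingPeer, ID: "QmPeerA"}
	events <- QueryEvent{Type: QueryError, ID: "QmPeerA"}
	close(events)

	fmt.Println(profileQuery(events))
}
```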
From 193d73080e4e9ec21d1f3795460e05ba330835ce Mon Sep 17 00:00:00 2001
From: Jeromy
Date: Wed, 4 Nov 2015 21:49:54 -0800
Subject: [PATCH 2/3] fix dial backoff

License: MIT
Signed-off-by: Jeromy
---
 p2p/net/swarm/swarm_dial.go | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go
index 1e2e34143..a2c135126 100644
--- a/p2p/net/swarm/swarm_dial.go
+++ b/p2p/net/swarm/swarm_dial.go
@@ -227,14 +227,20 @@ func (s *Swarm) gatedDialAttempt(ctx context.Context, p peer.ID) (*Conn, error)
 
 	// check if there's an ongoing dial to this peer
 	if ok, wait := s.dsync.Lock(p); ok {
+		defer s.dsync.Unlock(p)
+
+		// if this peer has been backed off, lets get out of here
+		if s.backf.Backoff(p) {
+			log.Event(ctx, "swarmDialBackoff", logdial)
+			return nil, ErrDialBackoff
+		}
+
 		// ok, we have been charged to dial! let's do it.
 		// if it succeeds, dial will add the conn to the swarm itself.
-
 		defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done()
 		ctxT, cancel := context.WithTimeout(ctx, s.dialT)
 		conn, err := s.dial(ctxT, p)
 		cancel()
-		s.dsync.Unlock(p)
 		log.Debugf("dial end %s", conn)
 		if err != nil {
 			log.Event(ctx, "swarmDialBackoffAdd", logdial)

From 097d40ba74e5794c75f02fb3b2ca52fc5408cb35 Mon Sep 17 00:00:00 2001
From: Jeromy
Date: Thu, 5 Nov 2015 13:36:02 -0800
Subject: [PATCH 3/3] rewrite of backoff mechanism

License: MIT
Signed-off-by: Jeromy
---
 p2p/net/swarm/dial_test.go    | 12 ++++----
 p2p/net/swarm/swarm_dial.go   | 47 +++++++++++++++++++++++++++--------
 p2p/net/swarm/swarm_listen.go |  3 +++
 3 files changed, 46 insertions(+), 16 deletions(-)

diff --git a/p2p/net/swarm/dial_test.go b/p2p/net/swarm/dial_test.go
index 6c3c4f920..d0e0cdbb0 100644
--- a/p2p/net/swarm/dial_test.go
+++ b/p2p/net/swarm/dial_test.go
@@ -420,18 +420,18 @@ func TestDialBackoffClears(t *testing.T) {
 	}
 	s1.peers.AddAddrs(s2.local, ifaceAddrs1, peer.PermanentAddrTTL)
 
-	before = time.Now()
+	if _, err := s1.Dial(ctx, s2.local); err == nil {
+		t.Fatal("should have failed to dial backed off peer")
+	}
+
+	time.Sleep(baseBackoffTime)
+
 	if c, err := s1.Dial(ctx, s2.local); err != nil {
 		t.Fatal(err)
 	} else {
 		c.Close()
 		t.Log("correctly connected")
 	}
-	duration = time.Now().Sub(before)
-
-	if duration >= dt {
-		// t.Error("took too long", duration, dt)
-	}
 
 	if s1.backf.Backoff(s2.local) {
 		t.Error("s2 should no longer be on backoff")
diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go
index a2c135126..0bcdda1a6 100644
--- a/p2p/net/swarm/swarm_dial.go
+++ b/p2p/net/swarm/swarm_dial.go
@@ -147,44 +147,71 @@ func (ds *dialsync) Unlock(dst peer.ID) {
 // 	dialbackoff.Clear(p)
 // }
 //
+
 type dialbackoff struct {
-	entries map[peer.ID]struct{}
+	entries map[peer.ID]*backoffPeer
 	lock    sync.RWMutex
 }
 
+type backoffPeer struct {
+	tries int
+	until time.Time
+}
+
 func (db *dialbackoff) init() {
 	if db.entries == nil {
-		db.entries = make(map[peer.ID]struct{})
+		db.entries = make(map[peer.ID]*backoffPeer)
 	}
 }
 
 // Backoff returns whether the client should backoff from dialing
-// peeer p
-func (db *dialbackoff) Backoff(p peer.ID) bool {
+// peer p
+func (db *dialbackoff) Backoff(p peer.ID) (backoff bool) {
 	db.lock.Lock()
+	defer db.lock.Unlock()
 	db.init()
-	_, found := db.entries[p]
-	db.lock.Unlock()
-	return found
+	bp, found := db.entries[p]
+	if found && time.Now().Before(bp.until) {
+		return true
+	}
+
+	return false
 }
 
+const baseBackoffTime = time.Second * 5
+const maxBackoffTime = time.Minute * 5
+
 // AddBackoff lets other nodes know that we've entered backoff with
 // peer p, so dialers should not wait unnecessarily. We still will
 // attempt to dial with one goroutine, in case we get through.
 func (db *dialbackoff) AddBackoff(p peer.ID) {
 	db.lock.Lock()
+	defer db.lock.Unlock()
 	db.init()
-	db.entries[p] = struct{}{}
-	db.lock.Unlock()
+	bp, ok := db.entries[p]
+	if !ok {
+		db.entries[p] = &backoffPeer{
+			tries: 1,
+			until: time.Now().Add(baseBackoffTime),
+		}
+		return
+	}
+
+	expTimeAdd := time.Second * time.Duration(bp.tries*bp.tries)
+	if expTimeAdd > maxBackoffTime {
+		expTimeAdd = maxBackoffTime
+	}
+	bp.until = time.Now().Add(baseBackoffTime + expTimeAdd)
+	bp.tries++
 }
 
 // Clear removes a backoff record. Clients should call this after a
 // successful Dial.
 func (db *dialbackoff) Clear(p peer.ID) {
 	db.lock.Lock()
+	defer db.lock.Unlock()
 	db.init()
 	delete(db.entries, p)
-	db.lock.Unlock()
 }
 
 // Dial connects to a peer.
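To make the new backoff schedule concrete, here is a small, self-contained sketch that reproduces the arithmetic of AddBackoff above (a fixed base window plus tries squared seconds, capped at maxBackoffTime) and prints the resulting window after each consecutive failed dial. The constants and the backoffPeer shape are taken from the diff; the free addBackoff function and the loop are illustrative only.

```go
package main

import (
	"fmt"
	"time"
)

// Constants and per-peer state mirroring the patch above.
const (
	baseBackoffTime = time.Second * 5
	maxBackoffTime  = time.Minute * 5
)

type backoffPeer struct {
	tries int
	until time.Time
}

// addBackoff applies the same update rule as dialbackoff.AddBackoff:
// a fixed base window plus tries^2 seconds, capped at maxBackoffTime.
func addBackoff(bp *backoffPeer, now time.Time) *backoffPeer {
	if bp == nil {
		return &backoffPeer{tries: 1, until: now.Add(baseBackoffTime)}
	}
	expTimeAdd := time.Second * time.Duration(bp.tries*bp.tries)
	if expTimeAdd > maxBackoffTime {
		expTimeAdd = maxBackoffTime
	}
	bp.until = now.Add(baseBackoffTime + expTimeAdd)
	bp.tries++
	return bp
}

func main() {
	now := time.Now()
	var bp *backoffPeer
	// Print how long a peer stays backed off after each consecutive failure.
	for i := 1; i <= 20; i++ {
		bp = addBackoff(bp, now)
		fmt.Printf("failure %2d: backed off for %s\n", i, bp.until.Sub(now))
	}
	// failure 1: 5s, failure 2: 6s, failure 3: 9s, ...
	// failure 19 onward: 5m5s, because the tries^2 term is capped at 5m.
}
```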
diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go
index d1bcb0752..b32e75a18 100644
--- a/p2p/net/swarm/swarm_listen.go
+++ b/p2p/net/swarm/swarm_listen.go
@@ -138,5 +138,8 @@ func (s *Swarm) connHandler(c *ps.Conn) *Conn {
 		return nil
 	}
 
+	// if a peer dials us, remove from dial backoff.
+	s.backf.Clear(sc.RemotePeer())
+
 	return sc
 }
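Taken together, the three patches gate dialing as follows: a peer that is currently backed off is refused immediately with ErrDialBackoff, a failed dial records (and lengthens) the backoff, and a successful dial, or an inbound connection from that peer as in the connHandler hunk above, clears it. The sketch below is a condensed, self-contained illustration of that flow under simplified assumptions: the backoff type, string peer IDs, fixed 5-second window, and dialPeer helper are stand-ins, not the swarm's real API.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var ErrDialBackoff = errors.New("dial backoff")

// backoff is a simplified stand-in for the dialbackoff table in swarm_dial.go,
// keyed by a string peer ID and using a single fixed window instead of the
// exponential schedule.
type backoff struct {
	until map[string]time.Time
}

func (b *backoff) Backoff(p string) bool { return time.Now().Before(b.until[p]) }
func (b *backoff) AddBackoff(p string)   { b.until[p] = time.Now().Add(5 * time.Second) }
func (b *backoff) Clear(p string)        { delete(b.until, p) }

// dialPeer shows the shape of the gating in gatedDialAttempt: refuse quickly
// while a peer is backed off, record a backoff on failure, and clear it on
// success (or, per the swarm_listen.go hunk, when the peer dials us).
func dialPeer(b *backoff, p string, dial func(string) error) error {
	if b.Backoff(p) {
		return ErrDialBackoff
	}
	if err := dial(p); err != nil {
		b.AddBackoff(p)
		return err
	}
	b.Clear(p)
	return nil
}

func main() {
	b := &backoff{until: make(map[string]time.Time)}
	failing := func(string) error { return errors.New("connection refused") }

	fmt.Println(dialPeer(b, "QmPeerA", failing)) // real dial attempt fails, backoff recorded
	fmt.Println(dialPeer(b, "QmPeerA", failing)) // refused immediately: dial backoff

	// An inbound connection from the peer clears the backoff, as in connHandler.
	b.Clear("QmPeerA")
	fmt.Println(b.Backoff("QmPeerA")) // false
}
```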