diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 64f28c74e..2f9a6ebe6 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -312,11 +312,7 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) [ continue } - // must all be closer than self - key := key.Key(pmes.GetKey()) - if !kb.Closer(dht.self, clp, key) { - filtered = append(filtered, clp) - } + filtered = append(filtered, clp) } // ok seems like closer nodes diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index 1c5d2b1c1..137995bed 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -14,7 +14,7 @@ import ( ) // The number of closer peers to send on requests. -var CloserPeerCount = 4 +var CloserPeerCount = KValue // dhthandler specifies the signature of functions that handle DHT messages. type dhtHandler func(context.Context, peer.ID, *pb.Message) (*pb.Message, error) diff --git a/routing/dht/lookup.go b/routing/dht/lookup.go index 76173a615..dc377e8b7 100644 --- a/routing/dht/lookup.go +++ b/routing/dht/lookup.go @@ -23,7 +23,7 @@ func pointerizePeerInfos(pis []peer.PeerInfo) []*peer.PeerInfo { // to the given key func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key key.Key) (<-chan peer.ID, error) { e := log.EventBegin(ctx, "getClosestPeers", &key) - tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) + tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue) if len(tablepeers) == 0 { return nil, kb.ErrLookupFailure } diff --git a/routing/dht/query.go b/routing/dht/query.go index ab4f28492..b1bec20bb 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -184,29 +184,28 @@ func (r *dhtQueryRunner) spawnWorkers(proc process.Process) { case <-r.proc.Closing(): return - case p, more := <-r.peersToQuery.DeqChan: - if !more { - return // channel closed. - } + case <-r.rateLimit: + select { + case p, more := <-r.peersToQuery.DeqChan: + if !more { + return // channel closed. + } - // do it as a child func to make sure Run exits - // ONLY AFTER spawn workers has exited. - proc.Go(func(proc process.Process) { - r.queryPeer(proc, p) - }) + // do it as a child func to make sure Run exits + // ONLY AFTER spawn workers has exited. + proc.Go(func(proc process.Process) { + r.queryPeer(proc, p) + }) + case <-r.proc.Closing(): + return + case <-r.peersRemaining.Done(): + return + } } } } func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) { - // make sure we rate limit concurrency. - select { - case <-r.rateLimit: - case <-proc.Closing(): - r.peersRemaining.Decrement(1) - return - } - // ok let's do this! // create a context from our proc. diff --git a/routing/dht/routing.go b/routing/dht/routing.go index d5854155f..57341e69c 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -145,7 +145,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key key.Key, nvals int) ([]ro } // get closest peers in the routing table - rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) + rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue) log.Debugf("peers in rt: %s", len(rtp), rtp) if len(rtp) == 0 { log.Warning("No peers from routing table!") @@ -322,7 +322,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key key.Key, return &dhtQueryResult{closerPeers: clpeers}, nil }) - peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) + peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue) _, err := query.Run(ctx, peers) if err != nil { log.Debugf("Query error: %s", err) @@ -342,7 +342,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, er return pi, nil } - peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue) + peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), KValue) if len(peers) == 0 { return peer.PeerInfo{}, kb.ErrLookupFailure } @@ -409,7 +409,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (< peerchan := make(chan peer.PeerInfo, asyncQueryBuffer) peersSeen := peer.Set{} - peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue) + peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), KValue) if len(peers) == 0 { return nil, kb.ErrLookupFailure }