mirror of
https://github.com/ipfs/kubo.git
synced 2025-09-09 23:42:20 +08:00
dht: kick off all the queries wit every node in our rt
s/kademlia calls for makign sure to query all peers we have in our routing table, not just those closest. this helps ensure most queries resolve properly.
This commit is contained in:
@ -88,9 +88,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
|
|||||||
// get closest peers in the routing table
|
// get closest peers in the routing table
|
||||||
rtp := dht.routingTable.ListPeers()
|
rtp := dht.routingTable.ListPeers()
|
||||||
log.Debugf("peers in rt: %s", len(rtp), rtp)
|
log.Debugf("peers in rt: %s", len(rtp), rtp)
|
||||||
|
if len(rtp) == 0 {
|
||||||
closest := dht.routingTable.NearestPeers(kb.ConvertKey(key), PoolSize)
|
|
||||||
if closest == nil || len(closest) == 0 {
|
|
||||||
log.Warning("No peers from routing table!")
|
log.Warning("No peers from routing table!")
|
||||||
return nil, errors.Wrap(kb.ErrLookupFailure)
|
return nil, errors.Wrap(kb.ErrLookupFailure)
|
||||||
}
|
}
|
||||||
@ -111,7 +109,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
// run it!
|
// run it!
|
||||||
result, err := query.Run(ctx, closest)
|
result, err := query.Run(ctx, rtp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -170,7 +168,7 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerIn
|
|||||||
// to the given key
|
// to the given key
|
||||||
func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key) (<-chan peer.ID, error) {
|
func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key) (<-chan peer.ID, error) {
|
||||||
e := log.EventBegin(ctx, "getClosestPeers", &key)
|
e := log.EventBegin(ctx, "getClosestPeers", &key)
|
||||||
tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
|
tablepeers := dht.routingTable.ListPeers()
|
||||||
if len(tablepeers) == 0 {
|
if len(tablepeers) == 0 {
|
||||||
return nil, errors.Wrap(kb.ErrLookupFailure)
|
return nil, errors.Wrap(kb.ErrLookupFailure)
|
||||||
}
|
}
|
||||||
@ -313,7 +311,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
|
|||||||
return &dhtQueryResult{closerPeers: clpeers}, nil
|
return &dhtQueryResult{closerPeers: clpeers}, nil
|
||||||
})
|
})
|
||||||
|
|
||||||
peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
|
peers := dht.routingTable.ListPeers()
|
||||||
_, err := query.Run(ctx, peers)
|
_, err := query.Run(ctx, peers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Query error: %s", err)
|
log.Errorf("Query error: %s", err)
|
||||||
@ -329,13 +327,13 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, er
|
|||||||
return pi, nil
|
return pi, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
closest := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
|
peers := dht.routingTable.ListPeers()
|
||||||
if closest == nil || len(closest) == 0 {
|
if len(peers) == 0 {
|
||||||
return peer.PeerInfo{}, errors.Wrap(kb.ErrLookupFailure)
|
return peer.PeerInfo{}, errors.Wrap(kb.ErrLookupFailure)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sanity...
|
// Sanity...
|
||||||
for _, p := range closest {
|
for _, p := range peers {
|
||||||
if p == id {
|
if p == id {
|
||||||
log.Error("Found target peer in list of closest peers...")
|
log.Error("Found target peer in list of closest peers...")
|
||||||
return dht.peerstore.PeerInfo(p), nil
|
return dht.peerstore.PeerInfo(p), nil
|
||||||
@ -367,7 +365,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, er
|
|||||||
})
|
})
|
||||||
|
|
||||||
// run it!
|
// run it!
|
||||||
result, err := query.Run(ctx, closest)
|
result, err := query.Run(ctx, peers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return peer.PeerInfo{}, err
|
return peer.PeerInfo{}, err
|
||||||
}
|
}
|
||||||
@ -386,8 +384,8 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
|
|||||||
peerchan := make(chan peer.PeerInfo, asyncQueryBuffer)
|
peerchan := make(chan peer.PeerInfo, asyncQueryBuffer)
|
||||||
peersSeen := peer.Set{}
|
peersSeen := peer.Set{}
|
||||||
|
|
||||||
closest := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
|
peers := dht.routingTable.ListPeers()
|
||||||
if closest == nil || len(closest) == 0 {
|
if len(peers) == 0 {
|
||||||
return nil, errors.Wrap(kb.ErrLookupFailure)
|
return nil, errors.Wrap(kb.ErrLookupFailure)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -432,7 +430,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
|
|||||||
// run it! run it asynchronously to gen peers as results are found.
|
// run it! run it asynchronously to gen peers as results are found.
|
||||||
// this does no error checking
|
// this does no error checking
|
||||||
go func() {
|
go func() {
|
||||||
if _, err := query.Run(ctx, closest); err != nil {
|
if _, err := query.Run(ctx, peers); err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user