mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-30 09:59:13 +08:00
Merge pull request #2841 from ipfs/feat/efficient-dht
a few small changes to make the dht more efficient
This commit is contained in:
@ -117,34 +117,6 @@ func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID,
|
||||
return nil
|
||||
}
|
||||
|
||||
// putProvider sends a message to peer 'p' saying that the local node
|
||||
// can provide the value of 'key'
|
||||
func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.ID, skey string) error {
|
||||
|
||||
// add self as the provider
|
||||
pi := pstore.PeerInfo{
|
||||
ID: dht.self,
|
||||
Addrs: dht.host.Addrs(),
|
||||
}
|
||||
|
||||
// // only share WAN-friendly addresses ??
|
||||
// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
|
||||
if len(pi.Addrs) < 1 {
|
||||
// log.Infof("%s putProvider: %s for %s error: no wan-friendly addresses", dht.self, p, key.Key(key), pi.Addrs)
|
||||
return fmt.Errorf("no known addresses for self. cannot put provider.")
|
||||
}
|
||||
|
||||
pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, skey, 0)
|
||||
pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]pstore.PeerInfo{pi})
|
||||
err := dht.sendMessage(ctx, p, pmes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debugf("%s putProvider: %s for %s (%s)", dht.self, p, key.Key(skey), pi.Addrs)
|
||||
return nil
|
||||
}
|
||||
|
||||
var errInvalidRecord = errors.New("received invalid record")
|
||||
|
||||
// getValueOrPeers queries a particular peer p for the value for
|
||||
|
@ -2,6 +2,7 @@ package dht
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -243,13 +244,18 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key key.Key) error {
|
||||
return err
|
||||
}
|
||||
|
||||
mes, err := dht.makeProvRecord(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
for p := range peers {
|
||||
wg.Add(1)
|
||||
go func(p peer.ID) {
|
||||
defer wg.Done()
|
||||
log.Debugf("putProvider(%s, %s)", key, p)
|
||||
err := dht.putProvider(ctx, p, string(key))
|
||||
err := dht.sendMessage(ctx, p, mes)
|
||||
if err != nil {
|
||||
log.Debug(err)
|
||||
}
|
||||
@ -258,6 +264,22 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key key.Key) error {
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
||||
func (dht *IpfsDHT) makeProvRecord(skey key.Key) (*pb.Message, error) {
|
||||
pi := pstore.PeerInfo{
|
||||
ID: dht.self,
|
||||
Addrs: dht.host.Addrs(),
|
||||
}
|
||||
|
||||
// // only share WAN-friendly addresses ??
|
||||
// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
|
||||
if len(pi.Addrs) < 1 {
|
||||
return nil, fmt.Errorf("no known addresses for self. cannot put provider.")
|
||||
}
|
||||
|
||||
pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(skey), 0)
|
||||
pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]pstore.PeerInfo{pi})
|
||||
return pmes, nil
|
||||
}
|
||||
|
||||
// FindProviders searches until the context expires.
|
||||
func (dht *IpfsDHT) FindProviders(ctx context.Context, key key.Key) ([]pstore.PeerInfo, error) {
|
||||
|
@ -48,11 +48,11 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m pstore
|
||||
// Update adds or moves the given peer to the front of its respective bucket
|
||||
// If a peer gets removed from a bucket, it is returned
|
||||
func (rt *RoutingTable) Update(p peer.ID) {
|
||||
rt.tabLock.Lock()
|
||||
defer rt.tabLock.Unlock()
|
||||
peerID := ConvertPeerID(p)
|
||||
cpl := commonPrefixLen(peerID, rt.local)
|
||||
|
||||
rt.tabLock.Lock()
|
||||
defer rt.tabLock.Unlock()
|
||||
bucketID := cpl
|
||||
if bucketID >= len(rt.Buckets) {
|
||||
bucketID = len(rt.Buckets) - 1
|
||||
@ -144,10 +144,10 @@ func (rt *RoutingTable) NearestPeer(id ID) peer.ID {
|
||||
|
||||
// NearestPeers returns a list of the 'count' closest peers to the given ID
|
||||
func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID {
|
||||
rt.tabLock.RLock()
|
||||
defer rt.tabLock.RUnlock()
|
||||
cpl := commonPrefixLen(id, rt.local)
|
||||
|
||||
rt.tabLock.RLock()
|
||||
|
||||
// Get bucket at cpl index or last bucket
|
||||
var bucket *Bucket
|
||||
if cpl >= len(rt.Buckets) {
|
||||
@ -170,6 +170,7 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID {
|
||||
peerArr = copyPeersFromList(id, peerArr, plist)
|
||||
}
|
||||
}
|
||||
rt.tabLock.RUnlock()
|
||||
|
||||
// Sort by distance to local peer
|
||||
sort.Sort(peerArr)
|
||||
|
Reference in New Issue
Block a user