mirror of https://github.com/ipfs/kubo.git
rewrite of provides to better select peers to send RPCs to
refactor test peer creation to be deterministic and reliable
a bit of cleanup trying to figure out TestGetFailure
add test to verify deterministic peer creation
switch put RPC over to use getClosestPeers
rm 0xDEADC0DE
fix queries not searching peer if it's not actually closer
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"crypto/rand"
 	"encoding/base64"
 	"fmt"
 	"os"
@@ -252,7 +253,7 @@ func identityConfig(nbits int) (config.Identity, error) {
 	}
 
 	fmt.Printf("generating key pair...")
-	sk, pk, err := ci.GenerateKeyPair(ci.RSA, nbits)
+	sk, pk, err := ci.GenerateKeyPair(ci.RSA, nbits, rand.Reader)
 	if err != nil {
 		return ident, err
 	}
@@ -115,7 +115,7 @@ func setupPeer(a args) (peer.ID, peer.Peerstore, error) {
 	}
 
 	out("generating key pair...")
-	sk, pk, err := ci.GenerateKeyPair(ci.RSA, a.keybits)
+	sk, pk, err := ci.GenerateKeyPair(ci.RSA, a.keybits, u.NewTimeSeededRand())
 	if err != nil {
 		return "", nil, err
 	}
@@ -1,7 +1,9 @@
 package core
 
 import (
+	"crypto/rand"
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+
 	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
 	syncds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
 	"github.com/jbenet/go-ipfs/blocks/blockstore"
@@ -28,7 +30,7 @@ func NewMockNode() (*IpfsNode, error) {
 	nd := new(IpfsNode)
 
 	// Generate Identity
-	sk, pk, err := ci.GenerateKeyPair(ci.RSA, 1024)
+	sk, pk, err := ci.GenerateKeyPair(ci.RSA, 1024, rand.Reader)
 	if err != nil {
 		return nil, err
 	}
@@ -8,6 +8,7 @@ import (
 	"encoding/base64"
 	"errors"
 	"fmt"
+	"io"
 
 	"crypto/elliptic"
 	"crypto/hmac"
@@ -75,10 +76,10 @@ type PubKey interface {
 type GenSharedKey func([]byte) ([]byte, error)
 
 // Generates a keypair of the given type and bitsize
-func GenerateKeyPair(typ, bits int) (PrivKey, PubKey, error) {
+func GenerateKeyPair(typ, bits int, src io.Reader) (PrivKey, PubKey, error) {
 	switch typ {
 	case RSA:
-		priv, err := rsa.GenerateKey(rand.Reader, bits)
+		priv, err := rsa.GenerateKey(src, bits)
 		if err != nil {
 			return nil, nil, err
 		}
@@ -2,11 +2,12 @@ package crypto
 
 import (
 	"bytes"
+	u "github.com/jbenet/go-ipfs/util"
 	"testing"
 )
 
 func TestRsaKeys(t *testing.T) {
-	sk, pk, err := GenerateKeyPair(RSA, 512)
+	sk, pk, err := GenerateKeyPair(RSA, 512, u.NewTimeSeededRand())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -90,7 +91,7 @@ func testKeyEquals(t *testing.T, k Key) {
 		t.Fatal("Key not equal to key with same bytes.")
 	}
 
-	sk, pk, err := GenerateKeyPair(RSA, 512)
+	sk, pk, err := GenerateKeyPair(RSA, 512, u.NewTimeSeededRand())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -15,7 +15,7 @@ func TestRoutingResolve(t *testing.T) {
 	resolver := NewRoutingResolver(d)
 	publisher := NewRoutingPublisher(d)
 
-	privk, pubk, err := ci.GenerateKeyPair(ci.RSA, 512)
+	privk, pubk, err := ci.GenerateKeyPair(ci.RSA, 512, u.NewTimeSeededRand())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -41,7 +41,7 @@ func New(ctx context.Context) Mocknet {
 }
 
 func (mn *mocknet) GenPeer() (inet.Network, error) {
-	sk, _, err := testutil.RandKeyPair(512)
+	sk, _, err := testutil.SeededKeyPair(512, int64(len(mn.nets)))
 	if err != nil {
 		return nil, err
 	}
@@ -103,9 +103,8 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error {
 }
 
 // putValueToNetwork stores the given key/value pair at the peer 'p'
-// meaning: it sends a PUT_VALUE message to p
 func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.ID,
-	key string, rec *pb.Record) error {
+	key u.Key, rec *pb.Record) error {
 
 	pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
 	pmes.Record = rec
@@ -285,7 +284,7 @@ func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
 }
 
 // betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
-func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.ID {
+func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID {
 	closer := dht.nearestPeersToQuery(pmes, count)
 
 	// no node? nil
@@ -302,11 +301,16 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.ID {
 	}
 
 	var filtered []peer.ID
-	for _, p := range closer {
+	for _, clp := range closer {
+		// Dont send a peer back themselves
+		if p == clp {
+			continue
+		}
+
 		// must all be closer than self
 		key := u.Key(pmes.GetKey())
-		if !kb.Closer(dht.self, p, key) {
-			filtered = append(filtered, p)
+		if !kb.Closer(dht.self, clp, key) {
+			filtered = append(filtered, clp)
 		}
 	}
 
@@ -323,23 +327,6 @@ func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, p peer.ID) error
 	return dht.network.DialPeer(ctx, p)
 }
 
-//TODO: this should be smarter about which keys it selects.
-func (dht *IpfsDHT) loadProvidableKeys() error {
-	kl, err := dht.datastore.KeyList()
-	if err != nil {
-		return err
-	}
-	for _, dsk := range kl {
-		k := u.KeyFromDsKey(dsk)
-		if len(k) == 0 {
-			log.Errorf("loadProvidableKeys error: %v", dsk)
-		}
-
-		dht.providers.AddProvider(k, dht.self)
-	}
-	return nil
-}
-
 // PingRoutine periodically pings nearest neighbors.
 func (dht *IpfsDHT) PingRoutine(t time.Duration) {
 	defer dht.Children().Done()
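betterPeersToQuery now receives the requesting peer p, so it can drop the requester from the reply and keep only candidates that are no farther from the key than this node (the "fix queries not searching peer if it's not actually closer" item above). A toy illustration of that filter using byte-wise XOR distance on two-byte IDs; it stands in for kb.Closer and real peer IDs, not the kbucket implementation:

```go
package main

import (
	"bytes"
	"fmt"
)

type id [2]byte

// closer reports whether a is nearer to key than b under XOR distance,
// mirroring the role of kb.Closer in the diff.
func closer(a, b, key id) bool {
	var da, db id
	for i := range key {
		da[i] = a[i] ^ key[i]
		db[i] = b[i] ^ key[i]
	}
	return bytes.Compare(da[:], db[:]) < 0
}

// betterPeers keeps candidates that are not the requester and that are not
// farther from the key than self.
func betterPeers(self, requester, key id, candidates []id) []id {
	var filtered []id
	for _, clp := range candidates {
		if clp == requester { // don't send a peer back itself
			continue
		}
		if !closer(self, clp, key) { // self must not be closer than clp
			filtered = append(filtered, clp)
		}
	}
	return filtered
}

func main() {
	self, key, requester := id{0xf0, 0x00}, id{0x01, 0x00}, id{0x03, 0x00}
	candidates := []id{{0x03, 0x00}, {0x02, 0x00}, {0xff, 0x00}}
	fmt.Println(betterPeers(self, requester, key, candidates)) // only [2 0] survives
}
```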
@@ -14,7 +14,6 @@ import (
 	dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
 	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
 
-	// ci "github.com/jbenet/go-ipfs/crypto"
 	inet "github.com/jbenet/go-ipfs/net"
 	peer "github.com/jbenet/go-ipfs/peer"
 	routing "github.com/jbenet/go-ipfs/routing"
@@ -33,9 +32,9 @@ func init() {
 	}
 }
 
-func setupDHT(ctx context.Context, t *testing.T, addr ma.Multiaddr) *IpfsDHT {
+func setupDHT(ctx context.Context, t *testing.T, addr ma.Multiaddr, seed int64) *IpfsDHT {
 
-	sk, pk, err := testutil.RandKeyPair(512)
+	sk, pk, err := testutil.SeededKeyPair(512, seed)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -71,7 +70,7 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer
 
 	for i := 0; i < n; i++ {
 		addrs[i] = testutil.RandLocalTCPAddress()
-		dhts[i] = setupDHT(ctx, t, addrs[i])
+		dhts[i] = setupDHT(ctx, t, addrs[i], int64(i))
 		peers[i] = dhts[i].self
 	}
 
@@ -120,8 +119,8 @@ func TestPing(t *testing.T) {
 	addrA := testutil.RandLocalTCPAddress()
 	addrB := testutil.RandLocalTCPAddress()
 
-	dhtA := setupDHT(ctx, t, addrA)
-	dhtB := setupDHT(ctx, t, addrB)
+	dhtA := setupDHT(ctx, t, addrA, 1)
+	dhtB := setupDHT(ctx, t, addrB, 2)
 
 	peerA := dhtA.self
 	peerB := dhtB.self
@@ -153,8 +152,8 @@ func TestValueGetSet(t *testing.T) {
 	addrA := testutil.RandLocalTCPAddress()
 	addrB := testutil.RandLocalTCPAddress()
 
-	dhtA := setupDHT(ctx, t, addrA)
-	dhtB := setupDHT(ctx, t, addrB)
+	dhtA := setupDHT(ctx, t, addrA, 1)
+	dhtB := setupDHT(ctx, t, addrB, 2)
 
 	defer dhtA.Close()
 	defer dhtB.Close()
@@ -642,8 +641,8 @@ func TestConnectCollision(t *testing.T) {
 	addrA := testutil.RandLocalTCPAddress()
 	addrB := testutil.RandLocalTCPAddress()
 
-	dhtA := setupDHT(ctx, t, addrA)
-	dhtB := setupDHT(ctx, t, addrB)
+	dhtA := setupDHT(ctx, t, addrA, int64((rtime*2)+1))
+	dhtB := setupDHT(ctx, t, addrB, int64((rtime*2)+2))
 
 	peerA := dhtA.self
 	peerB := dhtB.self
@@ -47,9 +47,8 @@ func TestGetFailures(t *testing.T) {
 		t.Fatal("Did not get expected error!")
 	}
 
-	msgs := make(chan *pb.Message, 100)
+	t.Log("Timeout test passed.")
 
-	// u.POut("NotFound Test\n")
 	// Reply with failures to every message
 	nets[1].SetHandler(inet.ProtocolDHT, func(s inet.Stream) {
 		defer s.Close()
@@ -68,8 +67,6 @@ func TestGetFailures(t *testing.T) {
 		if err := pbw.WriteMsg(resp); err != nil {
 			panic(err)
 		}
-
-		msgs <- resp
 	})
 
 	// This one should fail with NotFound
@@ -83,6 +80,8 @@ func TestGetFailures(t *testing.T) {
 		t.Fatal("expected error, got none.")
 	}
 
+	t.Log("ErrNotFound check passed!")
+
 	// Now we test this DHT's handleGetValue failure
 	{
 		typ := pb.Message_GET_VALUE
@@ -93,7 +93,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
 	}
 
 	// Find closest peer on given cluster to desired key and reply with that info
-	closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
+	closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
 	closerinfos := peer.PeerInfos(dht.peerstore, closer)
 	if closer != nil {
 		for _, pi := range closerinfos {
@@ -137,6 +137,9 @@ func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) (
 }
 
 func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
+	log.Errorf("handle find peer %s start", p)
+	defer log.Errorf("handle find peer %s end", p)
+
 	resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel())
 	var closest []peer.ID
 
@@ -144,11 +147,11 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess
 	if peer.ID(pmes.GetKey()) == dht.self {
 		closest = []peer.ID{dht.self}
 	} else {
-		closest = dht.betterPeersToQuery(pmes, CloserPeerCount)
+		closest = dht.betterPeersToQuery(pmes, p, CloserPeerCount)
 	}
 
 	if closest == nil {
-		log.Debugf("handleFindPeer: could not find anything.")
+		log.Warningf("handleFindPeer: could not find anything.")
 		return resp, nil
 	}
 
@@ -189,7 +192,7 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
 	}
 
 	// Also send closer peers.
-	closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
+	closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
 	if closer != nil {
 		infos := peer.PeerInfos(dht.peerstore, providers)
 		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.network, infos)
@@ -7,8 +7,8 @@ import (
 	peer "github.com/jbenet/go-ipfs/peer"
 	queue "github.com/jbenet/go-ipfs/peer/queue"
 	"github.com/jbenet/go-ipfs/routing"
-	kb "github.com/jbenet/go-ipfs/routing/kbucket"
 	u "github.com/jbenet/go-ipfs/util"
+	pset "github.com/jbenet/go-ipfs/util/peerset"
 	todoctr "github.com/jbenet/go-ipfs/util/todocounter"
 
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
@@ -71,7 +71,7 @@ type dhtQueryRunner struct {
 	peersToQuery *queue.ChanQueue
 
 	// peersSeen are all the peers queried. used to prevent querying same peer 2x
-	peersSeen peer.Set
+	peersSeen *pset.PeerSet
 
 	// rateLimit is a channel used to rate limit our processing (semaphore)
 	rateLimit chan struct{}
@@ -97,7 +97,7 @@ func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
 		query:          q,
 		peersToQuery:   queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)),
 		peersRemaining: todoctr.NewSyncCounter(),
-		peersSeen:      peer.Set{},
+		peersSeen:      pset.New(),
 		rateLimit:      make(chan struct{}, q.concurrency),
 		cg:             ctxgroup.WithContext(ctx),
 	}
@@ -117,7 +117,7 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
 
 	// add all the peers we got first.
 	for _, p := range peers {
-		r.addPeerToQuery(r.cg.Context(), p, "") // don't have access to self here...
+		r.addPeerToQuery(r.cg.Context(), p)
 	}
 
 	// go do this thing.
@@ -153,32 +153,17 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
 	return nil, err
 }
 
-func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID, benchmark peer.ID) {
+func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) {
 	// if new peer is ourselves...
 	if next == r.query.dialer.LocalPeer() {
 		return
 	}
 
-	// if new peer further away than whom we got it from, don't bother (loops)
-	// TODO----------- this benchmark should be replaced by a heap:
-	// we should be doing the s/kademlia "continue to search"
-	// (i.e. put all of them in a heap sorted by dht distance and then just
-	// pull from the the top until a) you exhaust all peers you get,
-	// b) you succeed, c) your context expires.
-	if benchmark != "" && kb.Closer(benchmark, next, r.query.key) {
+	if !r.peersSeen.TryAdd(next) {
+		log.Debug("query peer was already seen")
		return
	}
 
-	// if already seen, no need.
-	r.Lock()
-	_, found := r.peersSeen[next]
-	if found {
-		r.Unlock()
-		return
-	}
-	r.peersSeen[next] = struct{}{}
-	r.Unlock()
-
 	log.Debugf("adding peer to query: %v", next)
 
 	// do this after unlocking to prevent possible deadlocks.
@@ -278,7 +263,7 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
 		}
 
 			r.query.dialer.Peerstore().AddAddresses(next.ID, next.Addrs)
-			r.addPeerToQuery(cg.Context(), next.ID, p)
+			r.addPeerToQuery(cg.Context(), next.ID)
 			log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
 		}
 	} else {
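The switch above from a bare peer.Set guarded by the runner's mutex to pset.PeerSet is what lets addPeerToQuery shrink: TryAdd atomically answers "was this peer new?", replacing the lock/check/insert/unlock sequence. A minimal stand-in for such a set, assuming nothing about the real util/peerset package:

```go
package main

import (
	"fmt"
	"sync"
)

// peerSet is a thread-safe "seen" set keyed by peer ID strings.
type peerSet struct {
	mu sync.Mutex
	m  map[string]struct{}
}

func newPeerSet() *peerSet { return &peerSet{m: make(map[string]struct{})} }

// TryAdd inserts p and reports true only if p was not already present.
func (s *peerSet) TryAdd(p string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.m[p]; ok {
		return false
	}
	s.m[p] = struct{}{}
	return true
}

func main() {
	seen := newPeerSet()
	fmt.Println(seen.TryAdd("QmPeerA")) // true: first time, go query it
	fmt.Println(seen.TryAdd("QmPeerA")) // false: already queried, skip
}
```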
@@ -40,19 +40,24 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error
 		return err
 	}
 
-	peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue)
+	pchan, err := dht.getClosestPeers(ctx, key, KValue)
+	if err != nil {
+		return err
+	}
 
-	query := newQuery(key, dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
-		log.Debugf("%s PutValue qry part %v", dht.self, p)
-		err := dht.putValueToNetwork(ctx, p, string(key), rec)
-		if err != nil {
-			return nil, err
-		}
-		return &dhtQueryResult{success: true}, nil
-	})
-
-	_, err = query.Run(ctx, peers)
-	return err
+	wg := sync.WaitGroup{}
+	for p := range pchan {
+		wg.Add(1)
+		go func(p peer.ID) {
+			defer wg.Done()
+			err := dht.putValueToNetwork(ctx, p, key, rec)
+			if err != nil {
+				log.Errorf("failed putting value to peer: %s", err)
+			}
+		}(p)
+	}
+	wg.Wait()
+	return nil
 }
 
 // GetValue searches for the value corresponding to given Key.
@@ -111,18 +116,19 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
 // Provide makes this node announce that it can provide a value for the given key
 func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
 
+	log.Event(ctx, "Provide Value start", &key)
+	defer log.Event(ctx, "Provide Value end", &key)
 	dht.providers.AddProvider(key, dht.self)
-	peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), PoolSize)
-	if len(peers) == 0 {
-		return nil
+
+	peers, err := dht.getClosestPeers(ctx, key, KValue)
+	if err != nil {
+		return err
 	}
 
-	//TODO FIX: this doesn't work! it needs to be sent to the actual nearest peers.
-	// `peers` are the closest peers we have, not the ones that should get the value.
-	for _, p := range peers {
+	for p := range peers {
 		err := dht.putProvider(ctx, p, string(key))
 		if err != nil {
-			return err
+			log.Error(err)
 		}
 	}
 	return nil
@@ -137,6 +143,87 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerIn
 	return providers, nil
 }
 
+func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key, count int) (<-chan peer.ID, error) {
+	log.Error("Get Closest Peers")
+	tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
+	if len(tablepeers) == 0 {
+		return nil, kb.ErrLookupFailure
+	}
+
+	out := make(chan peer.ID, count)
+	peerset := pset.NewLimited(count)
+
+	for _, p := range tablepeers {
+		out <- p
+		peerset.Add(p)
+	}
+
+	wg := sync.WaitGroup{}
+	for _, p := range tablepeers {
+		wg.Add(1)
+		go func(p peer.ID) {
+			dht.getClosestPeersRecurse(ctx, key, p, peerset, out)
+			wg.Done()
+		}(p)
+	}
+
+	go func() {
+		wg.Wait()
+		close(out)
+		log.Error("Closing closest peer chan")
+	}()
+
+	return out, nil
+}
+
+func (dht *IpfsDHT) getClosestPeersRecurse(ctx context.Context, key u.Key, p peer.ID, peers *pset.PeerSet, peerOut chan<- peer.ID) {
+	log.Error("closest peers recurse")
+	defer log.Error("closest peers recurse end")
+	closer, err := dht.closerPeersSingle(ctx, key, p)
+	if err != nil {
+		log.Errorf("error getting closer peers: %s", err)
+		return
+	}
+
+	wg := sync.WaitGroup{}
+	for _, p := range closer {
+		if kb.Closer(p, dht.self, key) && peers.TryAdd(p) {
+			select {
+			case peerOut <- p:
+			case <-ctx.Done():
+				return
+			}
+			wg.Add(1)
+			go func(p peer.ID) {
+				dht.getClosestPeersRecurse(ctx, key, p, peers, peerOut)
+				wg.Done()
+			}(p)
+		}
+	}
+	wg.Wait()
+}
+
+func (dht *IpfsDHT) closerPeersSingle(ctx context.Context, key u.Key, p peer.ID) ([]peer.ID, error) {
+	log.Errorf("closest peers single %s %s", p, key)
+	defer log.Errorf("closest peers single end %s %s", p, key)
+	pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key))
+	if err != nil {
+		return nil, err
+	}
+
+	var out []peer.ID
+	for _, pbp := range pmes.GetCloserPeers() {
+		pid := peer.ID(pbp.GetId())
+		dht.peerstore.AddAddresses(pid, pbp.Addresses())
+		err := dht.ensureConnectedToPeer(ctx, pid)
+		if err != nil {
+			return nil, err
+		}
+		out = append(out, pid)
+	}
+	return out, nil
+}
+
 // FindProvidersAsync is the same thing as FindProviders, but returns a channel.
 // Peers will be returned on the channel as soon as they are found, even before
 // the search query completes.
@@ -182,6 +269,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
 	// Add unique providers from request, up to 'count'
 	for _, prov := range provs {
 		if ps.TryAdd(prov.ID) {
+			dht.peerstore.AddAddresses(prov.ID, prov.Addrs)
 			select {
 			case peerOut <- prov:
 			case <-ctx.Done():
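getClosestPeers, added above, streams results over a channel: it seeds the channel from the routing table, fans out a recursive worker per peer, deduplicates with a size-limited peer set, and closes the channel only after every worker has finished. A stripped-down sketch of that fan-out/close shape, with strings standing in for peer IDs and the recursion omitted:

```go
package main

import (
	"fmt"
	"sync"
)

// closestPeers returns a channel that is closed once all workers are done,
// so callers can simply range over it (as the new PutValue and Provide do).
func closestPeers(seed []string) <-chan string {
	out := make(chan string, len(seed))
	var wg sync.WaitGroup
	for _, p := range seed {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			out <- p // the real code would also recurse on peers learned from p
		}(p)
	}
	go func() {
		wg.Wait()  // wait for every worker,
		close(out) // then close so the consumer's range loop terminates
	}()
	return out
}

func main() {
	for p := range closestPeers([]string{"QmA", "QmB", "QmC"}) {
		fmt.Println("got", p)
	}
}
```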
routing/kbucket/sorting.go (new file, 59 lines)
@@ -0,0 +1,59 @@
+package kbucket
+
+import (
+	"container/list"
+	peer "github.com/jbenet/go-ipfs/peer"
+	"sort"
+)
+
+// A helper struct to sort peers by their distance to the local node
+type peerDistance struct {
+	p        peer.ID
+	distance ID
+}
+
+// peerSorterArr implements sort.Interface to sort peers by xor distance
+type peerSorterArr []*peerDistance
+
+func (p peerSorterArr) Len() int      { return len(p) }
+func (p peerSorterArr) Swap(a, b int) { p[a], p[b] = p[b], p[a] }
+func (p peerSorterArr) Less(a, b int) bool {
+	return p[a].distance.less(p[b].distance)
+}
+
+//
+
+func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr {
+	for e := peerList.Front(); e != nil; e = e.Next() {
+		p := e.Value.(peer.ID)
+		pID := ConvertPeerID(p)
+		pd := peerDistance{
+			p:        p,
+			distance: xor(target, pID),
+		}
+		peerArr = append(peerArr, &pd)
+		if e == nil {
+			log.Debug("list element was nil")
+			return peerArr
+		}
+	}
+	return peerArr
+}
+
+func SortClosestPeers(peers []peer.ID, target ID) []peer.ID {
+	var psarr peerSorterArr
+	for _, p := range peers {
+		pID := ConvertPeerID(p)
+		pd := &peerDistance{
+			p:        p,
+			distance: xor(target, pID),
+		}
+		psarr = append(psarr, pd)
+	}
+	sort.Sort(psarr)
+	var out []peer.ID
+	for _, p := range psarr {
+		out = append(out, p.p)
+	}
+	return out
+}
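The new sorting.go boils down to: compute xor(target, ConvertPeerID(p)) for every peer and sort ascending. A compact equivalent using sort.Slice over toy one-byte IDs, for illustration only (the real code sorts kbucket IDs, not raw byte slices):

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// xor returns the byte-wise XOR of two equal-length IDs.
func xor(a, b []byte) []byte {
	out := make([]byte, len(a))
	for i := range a {
		out[i] = a[i] ^ b[i]
	}
	return out
}

// sortClosest orders peers by XOR distance to target, nearest first.
func sortClosest(peers [][]byte, target []byte) {
	sort.Slice(peers, func(i, j int) bool {
		return bytes.Compare(xor(peers[i], target), xor(peers[j], target)) < 0
	})
}

func main() {
	target := []byte{0x10}
	peers := [][]byte{{0xff}, {0x11}, {0x30}}
	sortClosest(peers, target)
	fmt.Printf("% x\n", peers) // 11 (dist 01), 30 (dist 20), ff (dist ef)
}
```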
@@ -2,7 +2,6 @@
 package kbucket
 
 import (
 	"container/list"
 	"fmt"
-	"sort"
 	"sync"
@@ -103,40 +102,6 @@ func (rt *RoutingTable) nextBucket() peer.ID {
 	return ""
 }
 
-// A helper struct to sort peers by their distance to the local node
-type peerDistance struct {
-	p        peer.ID
-	distance ID
-}
-
-// peerSorterArr implements sort.Interface to sort peers by xor distance
-type peerSorterArr []*peerDistance
-
-func (p peerSorterArr) Len() int      { return len(p) }
-func (p peerSorterArr) Swap(a, b int) { p[a], p[b] = p[b], p[a] }
-func (p peerSorterArr) Less(a, b int) bool {
-	return p[a].distance.less(p[b].distance)
-}
-
-//
-
-func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr {
-	for e := peerList.Front(); e != nil; e = e.Next() {
-		p := e.Value.(peer.ID)
-		pID := ConvertPeerID(p)
-		pd := peerDistance{
-			p:        p,
-			distance: xor(target, pID),
-		}
-		peerArr = append(peerArr, &pd)
-		if e == nil {
-			log.Debug("list element was nil")
-			return peerArr
-		}
-	}
-	return peerArr
-}
-
 // Find a specific peer by ID or return nil
 func (rt *RoutingTable) Find(id peer.ID) peer.ID {
 	srch := rt.NearestPeers(ConvertPeerID(id), 1)
@@ -17,7 +17,11 @@ import (
 )
 
 func RandKeyPair(bits int) (ci.PrivKey, ci.PubKey, error) {
-	return ci.GenerateKeyPair(ci.RSA, bits)
+	return ci.GenerateKeyPair(ci.RSA, bits, crand.Reader)
 }
 
+func SeededKeyPair(bits int, seed int64) (ci.PrivKey, ci.PubKey, error) {
+	return ci.GenerateKeyPair(ci.RSA, bits, u.NewSeededRand(seed))
+}
+
 // RandPeerID generates random "valid" peer IDs. it does not NEED to generate
@@ -120,7 +124,7 @@ func RandPeerNetParams() (*PeerNetParams, error) {
 	var p PeerNetParams
 	var err error
 	p.Addr = RandLocalTCPAddress()
-	p.PrivKey, p.PubKey, err = ci.GenerateKeyPair(ci.RSA, 512)
+	p.PrivKey, p.PubKey, err = ci.GenerateKeyPair(ci.RSA, 512, u.NewTimeSeededRand())
 	if err != nil {
 		return nil, err
 	}
@@ -107,6 +107,13 @@ func NewTimeSeededRand() io.Reader {
 	}
 }
 
+func NewSeededRand(seed int64) io.Reader {
+	src := rand.NewSource(seed)
+	return &randGen{
+		Rand: *rand.New(src),
+	}
+}
+
 func (r *randGen) Read(p []byte) (n int, err error) {
 	for i := 0; i < len(p); i++ {
 		p[i] = byte(r.Rand.Intn(255))