From 07b064010e584f046234681de6f47d79d09b9689 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 14 Dec 2014 00:50:49 +0000 Subject: [PATCH] rewrite of provides to better select peers to send RPCs to refactor test peer creation to be deterministic and reliable a bit of cleanup trying to figure out TestGetFailure add test to verify deterministic peer creation switch put RPC over to use getClosestPeers rm 0xDEADC0DE fix queries not searching peer if its not actually closer --- cmd/ipfs/init.go | 3 +- cmd/seccat/seccat.go | 2 +- core/mock.go | 4 +- crypto/key.go | 5 +- crypto/key_test.go | 5 +- namesys/resolve_test.go | 2 +- net/mock/mock_net.go | 2 +- routing/dht/dht.go | 33 +++------- routing/dht/dht_test.go | 19 +++--- routing/dht/ext_test.go | 7 +-- routing/dht/handlers.go | 11 ++-- routing/dht/query.go | 31 +++------ routing/dht/routing.go | 126 +++++++++++++++++++++++++++++++------ routing/kbucket/sorting.go | 59 +++++++++++++++++ routing/kbucket/table.go | 35 ----------- util/testutil/gen.go | 8 ++- util/util.go | 7 +++ 17 files changed, 230 insertions(+), 129 deletions(-) create mode 100644 routing/kbucket/sorting.go diff --git a/cmd/ipfs/init.go b/cmd/ipfs/init.go index bc609659f..62415f3a7 100644 --- a/cmd/ipfs/init.go +++ b/cmd/ipfs/init.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "crypto/rand" "encoding/base64" "fmt" "os" @@ -252,7 +253,7 @@ func identityConfig(nbits int) (config.Identity, error) { } fmt.Printf("generating key pair...") - sk, pk, err := ci.GenerateKeyPair(ci.RSA, nbits) + sk, pk, err := ci.GenerateKeyPair(ci.RSA, nbits, rand.Reader) if err != nil { return ident, err } diff --git a/cmd/seccat/seccat.go b/cmd/seccat/seccat.go index 625f64ec0..a858593e2 100644 --- a/cmd/seccat/seccat.go +++ b/cmd/seccat/seccat.go @@ -115,7 +115,7 @@ func setupPeer(a args) (peer.ID, peer.Peerstore, error) { } out("generating key pair...") - sk, pk, err := ci.GenerateKeyPair(ci.RSA, a.keybits) + sk, pk, err := ci.GenerateKeyPair(ci.RSA, a.keybits, u.NewTimeSeededRand()) if err != nil { return "", nil, err } diff --git a/core/mock.go b/core/mock.go index a0ce6a460..92946276c 100644 --- a/core/mock.go +++ b/core/mock.go @@ -1,7 +1,9 @@ package core import ( + "crypto/rand" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" syncds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" "github.com/jbenet/go-ipfs/blocks/blockstore" @@ -28,7 +30,7 @@ func NewMockNode() (*IpfsNode, error) { nd := new(IpfsNode) // Generate Identity - sk, pk, err := ci.GenerateKeyPair(ci.RSA, 1024) + sk, pk, err := ci.GenerateKeyPair(ci.RSA, 1024, rand.Reader) if err != nil { return nil, err } diff --git a/crypto/key.go b/crypto/key.go index 67a316527..fe687691e 100644 --- a/crypto/key.go +++ b/crypto/key.go @@ -8,6 +8,7 @@ import ( "encoding/base64" "errors" "fmt" + "io" "crypto/elliptic" "crypto/hmac" @@ -75,10 +76,10 @@ type PubKey interface { type GenSharedKey func([]byte) ([]byte, error) // Generates a keypair of the given type and bitsize -func GenerateKeyPair(typ, bits int) (PrivKey, PubKey, error) { +func GenerateKeyPair(typ, bits int, src io.Reader) (PrivKey, PubKey, error) { switch typ { case RSA: - priv, err := rsa.GenerateKey(rand.Reader, bits) + priv, err := rsa.GenerateKey(src, bits) if err != nil { return nil, nil, err } diff --git a/crypto/key_test.go b/crypto/key_test.go index 1300be8f8..d771ea648 100644 --- a/crypto/key_test.go +++ b/crypto/key_test.go @@ -2,11 +2,12 @@ package crypto import ( "bytes" + u "github.com/jbenet/go-ipfs/util" "testing" ) func TestRsaKeys(t *testing.T) { - sk, pk, err := GenerateKeyPair(RSA, 512) + sk, pk, err := GenerateKeyPair(RSA, 512, u.NewTimeSeededRand()) if err != nil { t.Fatal(err) } @@ -90,7 +91,7 @@ func testKeyEquals(t *testing.T, k Key) { t.Fatal("Key not equal to key with same bytes.") } - sk, pk, err := GenerateKeyPair(RSA, 512) + sk, pk, err := GenerateKeyPair(RSA, 512, u.NewTimeSeededRand()) if err != nil { t.Fatal(err) } diff --git a/namesys/resolve_test.go b/namesys/resolve_test.go index 84e4f1cb6..592c344d7 100644 --- a/namesys/resolve_test.go +++ b/namesys/resolve_test.go @@ -15,7 +15,7 @@ func TestRoutingResolve(t *testing.T) { resolver := NewRoutingResolver(d) publisher := NewRoutingPublisher(d) - privk, pubk, err := ci.GenerateKeyPair(ci.RSA, 512) + privk, pubk, err := ci.GenerateKeyPair(ci.RSA, 512, u.NewTimeSeededRand()) if err != nil { t.Fatal(err) } diff --git a/net/mock/mock_net.go b/net/mock/mock_net.go index ffed78b33..ab3359237 100644 --- a/net/mock/mock_net.go +++ b/net/mock/mock_net.go @@ -41,7 +41,7 @@ func New(ctx context.Context) Mocknet { } func (mn *mocknet) GenPeer() (inet.Network, error) { - sk, _, err := testutil.RandKeyPair(512) + sk, _, err := testutil.SeededKeyPair(512, int64(len(mn.nets))) if err != nil { return nil, err } diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 4cbf68e43..1573f3477 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -103,9 +103,8 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error { } // putValueToNetwork stores the given key/value pair at the peer 'p' -// meaning: it sends a PUT_VALUE message to p func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.ID, - key string, rec *pb.Record) error { + key u.Key, rec *pb.Record) error { pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0) pmes.Record = rec @@ -285,7 +284,7 @@ func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID { } // betterPeerToQuery returns nearestPeersToQuery, but iff closer than self. -func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.ID { +func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID { closer := dht.nearestPeersToQuery(pmes, count) // no node? nil @@ -302,11 +301,16 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.ID { } var filtered []peer.ID - for _, p := range closer { + for _, clp := range closer { + // Dont send a peer back themselves + if p == clp { + continue + } + // must all be closer than self key := u.Key(pmes.GetKey()) - if !kb.Closer(dht.self, p, key) { - filtered = append(filtered, p) + if !kb.Closer(dht.self, clp, key) { + filtered = append(filtered, clp) } } @@ -323,23 +327,6 @@ func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, p peer.ID) error return dht.network.DialPeer(ctx, p) } -//TODO: this should be smarter about which keys it selects. -func (dht *IpfsDHT) loadProvidableKeys() error { - kl, err := dht.datastore.KeyList() - if err != nil { - return err - } - for _, dsk := range kl { - k := u.KeyFromDsKey(dsk) - if len(k) == 0 { - log.Errorf("loadProvidableKeys error: %v", dsk) - } - - dht.providers.AddProvider(k, dht.self) - } - return nil -} - // PingRoutine periodically pings nearest neighbors. func (dht *IpfsDHT) PingRoutine(t time.Duration) { defer dht.Children().Done() diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index 5603c4d5c..bbc7b9692 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -14,7 +14,6 @@ import ( dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" - // ci "github.com/jbenet/go-ipfs/crypto" inet "github.com/jbenet/go-ipfs/net" peer "github.com/jbenet/go-ipfs/peer" routing "github.com/jbenet/go-ipfs/routing" @@ -33,9 +32,9 @@ func init() { } } -func setupDHT(ctx context.Context, t *testing.T, addr ma.Multiaddr) *IpfsDHT { +func setupDHT(ctx context.Context, t *testing.T, addr ma.Multiaddr, seed int64) *IpfsDHT { - sk, pk, err := testutil.RandKeyPair(512) + sk, pk, err := testutil.SeededKeyPair(512, seed) if err != nil { t.Fatal(err) } @@ -71,7 +70,7 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer for i := 0; i < n; i++ { addrs[i] = testutil.RandLocalTCPAddress() - dhts[i] = setupDHT(ctx, t, addrs[i]) + dhts[i] = setupDHT(ctx, t, addrs[i], int64(i)) peers[i] = dhts[i].self } @@ -120,8 +119,8 @@ func TestPing(t *testing.T) { addrA := testutil.RandLocalTCPAddress() addrB := testutil.RandLocalTCPAddress() - dhtA := setupDHT(ctx, t, addrA) - dhtB := setupDHT(ctx, t, addrB) + dhtA := setupDHT(ctx, t, addrA, 1) + dhtB := setupDHT(ctx, t, addrB, 2) peerA := dhtA.self peerB := dhtB.self @@ -153,8 +152,8 @@ func TestValueGetSet(t *testing.T) { addrA := testutil.RandLocalTCPAddress() addrB := testutil.RandLocalTCPAddress() - dhtA := setupDHT(ctx, t, addrA) - dhtB := setupDHT(ctx, t, addrB) + dhtA := setupDHT(ctx, t, addrA, 1) + dhtB := setupDHT(ctx, t, addrB, 2) defer dhtA.Close() defer dhtB.Close() @@ -642,8 +641,8 @@ func TestConnectCollision(t *testing.T) { addrA := testutil.RandLocalTCPAddress() addrB := testutil.RandLocalTCPAddress() - dhtA := setupDHT(ctx, t, addrA) - dhtB := setupDHT(ctx, t, addrB) + dhtA := setupDHT(ctx, t, addrA, int64((rtime*2)+1)) + dhtB := setupDHT(ctx, t, addrB, int64((rtime*2)+2)) peerA := dhtA.self peerB := dhtB.self diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go index b4b1158d7..8441c1f72 100644 --- a/routing/dht/ext_test.go +++ b/routing/dht/ext_test.go @@ -47,9 +47,8 @@ func TestGetFailures(t *testing.T) { t.Fatal("Did not get expected error!") } - msgs := make(chan *pb.Message, 100) + t.Log("Timeout test passed.") - // u.POut("NotFound Test\n") // Reply with failures to every message nets[1].SetHandler(inet.ProtocolDHT, func(s inet.Stream) { defer s.Close() @@ -68,8 +67,6 @@ func TestGetFailures(t *testing.T) { if err := pbw.WriteMsg(resp); err != nil { panic(err) } - - msgs <- resp }) // This one should fail with NotFound @@ -83,6 +80,8 @@ func TestGetFailures(t *testing.T) { t.Fatal("expected error, got none.") } + t.Log("ErrNotFound check passed!") + // Now we test this DHT's handleGetValue failure { typ := pb.Message_GET_VALUE diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index 5aec6c2ff..e8edaa5eb 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -93,7 +93,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess } // Find closest peer on given cluster to desired key and reply with that info - closer := dht.betterPeersToQuery(pmes, CloserPeerCount) + closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount) closerinfos := peer.PeerInfos(dht.peerstore, closer) if closer != nil { for _, pi := range closerinfos { @@ -137,6 +137,9 @@ func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) ( } func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { + log.Errorf("handle find peer %s start", p) + defer log.Errorf("handle find peer %s end", p) + resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel()) var closest []peer.ID @@ -144,11 +147,11 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess if peer.ID(pmes.GetKey()) == dht.self { closest = []peer.ID{dht.self} } else { - closest = dht.betterPeersToQuery(pmes, CloserPeerCount) + closest = dht.betterPeersToQuery(pmes, p, CloserPeerCount) } if closest == nil { - log.Debugf("handleFindPeer: could not find anything.") + log.Warningf("handleFindPeer: could not find anything.") return resp, nil } @@ -189,7 +192,7 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb. } // Also send closer peers. - closer := dht.betterPeersToQuery(pmes, CloserPeerCount) + closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount) if closer != nil { infos := peer.PeerInfos(dht.peerstore, providers) resp.CloserPeers = pb.PeerInfosToPBPeers(dht.network, infos) diff --git a/routing/dht/query.go b/routing/dht/query.go index 6a7bb687d..4790e814c 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -7,8 +7,8 @@ import ( peer "github.com/jbenet/go-ipfs/peer" queue "github.com/jbenet/go-ipfs/peer/queue" "github.com/jbenet/go-ipfs/routing" - kb "github.com/jbenet/go-ipfs/routing/kbucket" u "github.com/jbenet/go-ipfs/util" + pset "github.com/jbenet/go-ipfs/util/peerset" todoctr "github.com/jbenet/go-ipfs/util/todocounter" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" @@ -71,7 +71,7 @@ type dhtQueryRunner struct { peersToQuery *queue.ChanQueue // peersSeen are all the peers queried. used to prevent querying same peer 2x - peersSeen peer.Set + peersSeen *pset.PeerSet // rateLimit is a channel used to rate limit our processing (semaphore) rateLimit chan struct{} @@ -97,7 +97,7 @@ func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner { query: q, peersToQuery: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)), peersRemaining: todoctr.NewSyncCounter(), - peersSeen: peer.Set{}, + peersSeen: pset.New(), rateLimit: make(chan struct{}, q.concurrency), cg: ctxgroup.WithContext(ctx), } @@ -117,7 +117,7 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) { // add all the peers we got first. for _, p := range peers { - r.addPeerToQuery(r.cg.Context(), p, "") // don't have access to self here... + r.addPeerToQuery(r.cg.Context(), p) } // go do this thing. @@ -153,32 +153,17 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) { return nil, err } -func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID, benchmark peer.ID) { +func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) { // if new peer is ourselves... if next == r.query.dialer.LocalPeer() { return } - // if new peer further away than whom we got it from, don't bother (loops) - // TODO----------- this benchmark should be replaced by a heap: - // we should be doing the s/kademlia "continue to search" - // (i.e. put all of them in a heap sorted by dht distance and then just - // pull from the the top until a) you exhaust all peers you get, - // b) you succeed, c) your context expires. - if benchmark != "" && kb.Closer(benchmark, next, r.query.key) { + if !r.peersSeen.TryAdd(next) { + log.Debug("query peer was already seen") return } - // if already seen, no need. - r.Lock() - _, found := r.peersSeen[next] - if found { - r.Unlock() - return - } - r.peersSeen[next] = struct{}{} - r.Unlock() - log.Debugf("adding peer to query: %v", next) // do this after unlocking to prevent possible deadlocks. @@ -278,7 +263,7 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) { } r.query.dialer.Peerstore().AddAddresses(next.ID, next.Addrs) - r.addPeerToQuery(cg.Context(), next.ID, p) + r.addPeerToQuery(cg.Context(), next.ID) log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs) } } else { diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 34108f076..0cd5751a1 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -40,19 +40,24 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error return err } - peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue) + pchan, err := dht.getClosestPeers(ctx, key, KValue) + if err != nil { + return err + } - query := newQuery(key, dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { - log.Debugf("%s PutValue qry part %v", dht.self, p) - err := dht.putValueToNetwork(ctx, p, string(key), rec) - if err != nil { - return nil, err - } - return &dhtQueryResult{success: true}, nil - }) - - _, err = query.Run(ctx, peers) - return err + wg := sync.WaitGroup{} + for p := range pchan { + wg.Add(1) + go func(p peer.ID) { + defer wg.Done() + err := dht.putValueToNetwork(ctx, p, key, rec) + if err != nil { + log.Errorf("failed putting value to peer: %s", err) + } + }(p) + } + wg.Wait() + return nil } // GetValue searches for the value corresponding to given Key. @@ -111,18 +116,19 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { // Provide makes this node announce that it can provide a value for the given key func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { + log.Event(ctx, "Provide Value start", &key) + defer log.Event(ctx, "Provide Value end", &key) dht.providers.AddProvider(key, dht.self) - peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), PoolSize) - if len(peers) == 0 { - return nil + + peers, err := dht.getClosestPeers(ctx, key, KValue) + if err != nil { + return err } - //TODO FIX: this doesn't work! it needs to be sent to the actual nearest peers. - // `peers` are the closest peers we have, not the ones that should get the value. - for _, p := range peers { + for p := range peers { err := dht.putProvider(ctx, p, string(key)) if err != nil { - return err + log.Error(err) } } return nil @@ -137,6 +143,87 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerIn return providers, nil } +func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key, count int) (<-chan peer.ID, error) { + log.Error("Get Closest Peers") + tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) + if len(tablepeers) == 0 { + return nil, kb.ErrLookupFailure + } + + out := make(chan peer.ID, count) + peerset := pset.NewLimited(count) + + for _, p := range tablepeers { + out <- p + peerset.Add(p) + } + + wg := sync.WaitGroup{} + for _, p := range tablepeers { + wg.Add(1) + go func(p peer.ID) { + dht.getClosestPeersRecurse(ctx, key, p, peerset, out) + wg.Done() + }(p) + } + + go func() { + wg.Wait() + close(out) + log.Error("Closing closest peer chan") + }() + + return out, nil +} + +func (dht *IpfsDHT) getClosestPeersRecurse(ctx context.Context, key u.Key, p peer.ID, peers *pset.PeerSet, peerOut chan<- peer.ID) { + log.Error("closest peers recurse") + defer log.Error("closest peers recurse end") + closer, err := dht.closerPeersSingle(ctx, key, p) + if err != nil { + log.Errorf("error getting closer peers: %s", err) + return + } + + wg := sync.WaitGroup{} + for _, p := range closer { + if kb.Closer(p, dht.self, key) && peers.TryAdd(p) { + select { + case peerOut <- p: + case <-ctx.Done(): + return + } + wg.Add(1) + go func(p peer.ID) { + dht.getClosestPeersRecurse(ctx, key, p, peers, peerOut) + wg.Done() + }(p) + } + } + wg.Wait() +} + +func (dht *IpfsDHT) closerPeersSingle(ctx context.Context, key u.Key, p peer.ID) ([]peer.ID, error) { + log.Errorf("closest peers single %s %s", p, key) + defer log.Errorf("closest peers single end %s %s", p, key) + pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key)) + if err != nil { + return nil, err + } + + var out []peer.ID + for _, pbp := range pmes.GetCloserPeers() { + pid := peer.ID(pbp.GetId()) + dht.peerstore.AddAddresses(pid, pbp.Addresses()) + err := dht.ensureConnectedToPeer(ctx, pid) + if err != nil { + return nil, err + } + out = append(out, pid) + } + return out, nil +} + // FindProvidersAsync is the same thing as FindProviders, but returns a channel. // Peers will be returned on the channel as soon as they are found, even before // the search query completes. @@ -182,6 +269,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co // Add unique providers from request, up to 'count' for _, prov := range provs { if ps.TryAdd(prov.ID) { + dht.peerstore.AddAddresses(prov.ID, prov.Addrs) select { case peerOut <- prov: case <-ctx.Done(): diff --git a/routing/kbucket/sorting.go b/routing/kbucket/sorting.go new file mode 100644 index 000000000..a3a68767b --- /dev/null +++ b/routing/kbucket/sorting.go @@ -0,0 +1,59 @@ +package kbucket + +import ( + "container/list" + peer "github.com/jbenet/go-ipfs/peer" + "sort" +) + +// A helper struct to sort peers by their distance to the local node +type peerDistance struct { + p peer.ID + distance ID +} + +// peerSorterArr implements sort.Interface to sort peers by xor distance +type peerSorterArr []*peerDistance + +func (p peerSorterArr) Len() int { return len(p) } +func (p peerSorterArr) Swap(a, b int) { p[a], p[b] = p[b], p[a] } +func (p peerSorterArr) Less(a, b int) bool { + return p[a].distance.less(p[b].distance) +} + +// + +func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr { + for e := peerList.Front(); e != nil; e = e.Next() { + p := e.Value.(peer.ID) + pID := ConvertPeerID(p) + pd := peerDistance{ + p: p, + distance: xor(target, pID), + } + peerArr = append(peerArr, &pd) + if e == nil { + log.Debug("list element was nil") + return peerArr + } + } + return peerArr +} + +func SortClosestPeers(peers []peer.ID, target ID) []peer.ID { + var psarr peerSorterArr + for _, p := range peers { + pID := ConvertPeerID(p) + pd := &peerDistance{ + p: p, + distance: xor(target, pID), + } + psarr = append(psarr, pd) + } + sort.Sort(psarr) + var out []peer.ID + for _, p := range psarr { + out = append(out, p.p) + } + return out +} diff --git a/routing/kbucket/table.go b/routing/kbucket/table.go index bed7447a5..90ba65530 100644 --- a/routing/kbucket/table.go +++ b/routing/kbucket/table.go @@ -2,7 +2,6 @@ package kbucket import ( - "container/list" "fmt" "sort" "sync" @@ -103,40 +102,6 @@ func (rt *RoutingTable) nextBucket() peer.ID { return "" } -// A helper struct to sort peers by their distance to the local node -type peerDistance struct { - p peer.ID - distance ID -} - -// peerSorterArr implements sort.Interface to sort peers by xor distance -type peerSorterArr []*peerDistance - -func (p peerSorterArr) Len() int { return len(p) } -func (p peerSorterArr) Swap(a, b int) { p[a], p[b] = p[b], p[a] } -func (p peerSorterArr) Less(a, b int) bool { - return p[a].distance.less(p[b].distance) -} - -// - -func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr { - for e := peerList.Front(); e != nil; e = e.Next() { - p := e.Value.(peer.ID) - pID := ConvertPeerID(p) - pd := peerDistance{ - p: p, - distance: xor(target, pID), - } - peerArr = append(peerArr, &pd) - if e == nil { - log.Debug("list element was nil") - return peerArr - } - } - return peerArr -} - // Find a specific peer by ID or return nil func (rt *RoutingTable) Find(id peer.ID) peer.ID { srch := rt.NearestPeers(ConvertPeerID(id), 1) diff --git a/util/testutil/gen.go b/util/testutil/gen.go index 59dd862a8..aa06047b2 100644 --- a/util/testutil/gen.go +++ b/util/testutil/gen.go @@ -17,7 +17,11 @@ import ( ) func RandKeyPair(bits int) (ci.PrivKey, ci.PubKey, error) { - return ci.GenerateKeyPair(ci.RSA, bits) + return ci.GenerateKeyPair(ci.RSA, bits, crand.Reader) +} + +func SeededKeyPair(bits int, seed int64) (ci.PrivKey, ci.PubKey, error) { + return ci.GenerateKeyPair(ci.RSA, bits, u.NewSeededRand(seed)) } // RandPeerID generates random "valid" peer IDs. it does not NEED to generate @@ -120,7 +124,7 @@ func RandPeerNetParams() (*PeerNetParams, error) { var p PeerNetParams var err error p.Addr = RandLocalTCPAddress() - p.PrivKey, p.PubKey, err = ci.GenerateKeyPair(ci.RSA, 512) + p.PrivKey, p.PubKey, err = ci.GenerateKeyPair(ci.RSA, 512, u.NewTimeSeededRand()) if err != nil { return nil, err } diff --git a/util/util.go b/util/util.go index 141cd8cba..32bc314ab 100644 --- a/util/util.go +++ b/util/util.go @@ -107,6 +107,13 @@ func NewTimeSeededRand() io.Reader { } } +func NewSeededRand(seed int64) io.Reader { + src := rand.NewSource(seed) + return &randGen{ + Rand: *rand.New(src), + } +} + func (r *randGen) Read(p []byte) (n int, err error) { for i := 0; i < len(p); i++ { p[i] = byte(r.Rand.Intn(255))