diff --git a/core/mock.go b/core/mock.go index a0ce6a460..758b30588 100644 --- a/core/mock.go +++ b/core/mock.go @@ -2,6 +2,7 @@ package core import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" syncds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" "github.com/jbenet/go-ipfs/blocks/blockstore" diff --git a/crypto/key.go b/crypto/key.go index 67a316527..df9a8b512 100644 --- a/crypto/key.go +++ b/crypto/key.go @@ -8,6 +8,7 @@ import ( "encoding/base64" "errors" "fmt" + "io" "crypto/elliptic" "crypto/hmac" @@ -74,11 +75,15 @@ type PubKey interface { // Given a public key, generates the shared key. type GenSharedKey func([]byte) ([]byte, error) -// Generates a keypair of the given type and bitsize func GenerateKeyPair(typ, bits int) (PrivKey, PubKey, error) { + return GenerateKeyPairWithReader(typ, bits, rand.Reader) +} + +// Generates a keypair of the given type and bitsize +func GenerateKeyPairWithReader(typ, bits int, src io.Reader) (PrivKey, PubKey, error) { switch typ { case RSA: - priv, err := rsa.GenerateKey(rand.Reader, bits) + priv, err := rsa.GenerateKey(src, bits) if err != nil { return nil, nil, err } diff --git a/crypto/key_test.go b/crypto/key_test.go index 1300be8f8..16b13a2f8 100644 --- a/crypto/key_test.go +++ b/crypto/key_test.go @@ -1,12 +1,15 @@ -package crypto +package crypto_test import ( + . "github.com/jbenet/go-ipfs/crypto" + "bytes" + tu "github.com/jbenet/go-ipfs/util/testutil" "testing" ) func TestRsaKeys(t *testing.T) { - sk, pk, err := GenerateKeyPair(RSA, 512) + sk, pk, err := tu.RandKeyPair(512) if err != nil { t.Fatal(err) } @@ -90,7 +93,7 @@ func testKeyEquals(t *testing.T, k Key) { t.Fatal("Key not equal to key with same bytes.") } - sk, pk, err := GenerateKeyPair(RSA, 512) + sk, pk, err := tu.RandKeyPair(512) if err != nil { t.Fatal(err) } diff --git a/namesys/resolve_test.go b/namesys/resolve_test.go index 84e4f1cb6..35851fc32 100644 --- a/namesys/resolve_test.go +++ b/namesys/resolve_test.go @@ -3,7 +3,6 @@ package namesys import ( "testing" - ci "github.com/jbenet/go-ipfs/crypto" mockrouting "github.com/jbenet/go-ipfs/routing/mock" u "github.com/jbenet/go-ipfs/util" testutil "github.com/jbenet/go-ipfs/util/testutil" @@ -15,7 +14,7 @@ func TestRoutingResolve(t *testing.T) { resolver := NewRoutingResolver(d) publisher := NewRoutingPublisher(d) - privk, pubk, err := ci.GenerateKeyPair(ci.RSA, 512) + privk, pubk, err := testutil.RandKeyPair(512) if err != nil { t.Fatal(err) } diff --git a/net/mock/mock_net.go b/net/mock/mock_net.go index ffed78b33..2be03f836 100644 --- a/net/mock/mock_net.go +++ b/net/mock/mock_net.go @@ -41,7 +41,7 @@ func New(ctx context.Context) Mocknet { } func (mn *mocknet) GenPeer() (inet.Network, error) { - sk, _, err := testutil.RandKeyPair(512) + sk, _, err := testutil.SeededKeyPair(int64(len(mn.nets))) if err != nil { return nil, err } diff --git a/peer/peer_test.go b/peer/peer_test.go index 810df6218..e8d1d29f3 100644 --- a/peer/peer_test.go +++ b/peer/peer_test.go @@ -1,4 +1,4 @@ -package peer +package peer_test import ( "encoding/base64" @@ -7,7 +7,9 @@ import ( "testing" ic "github.com/jbenet/go-ipfs/crypto" + . "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" + tu "github.com/jbenet/go-ipfs/util/testutil" b58 "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58" ) @@ -39,7 +41,7 @@ type keyset struct { func (ks *keyset) generate() error { var err error - ks.sk, ks.pk, err = ic.GenerateKeyPair(ic.RSA, 1024) + ks.sk, ks.pk, err = tu.RandKeyPair(512) if err != nil { return err } diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 4cbf68e43..fb7c5a49b 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -102,10 +102,9 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error { return nil } -// putValueToNetwork stores the given key/value pair at the peer 'p' -// meaning: it sends a PUT_VALUE message to p -func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.ID, - key string, rec *pb.Record) error { +// putValueToPeer stores the given key/value pair at the peer 'p' +func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, + key u.Key, rec *pb.Record) error { pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0) pmes.Record = rec @@ -238,12 +237,12 @@ func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) { } // FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in. -func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.PeerInfo, *kb.RoutingTable) { +func (dht *IpfsDHT) FindLocal(id peer.ID) peer.PeerInfo { p := dht.routingTable.Find(id) if p != "" { - return dht.peerstore.PeerInfo(p), dht.routingTable + return dht.peerstore.PeerInfo(p) } - return peer.PeerInfo{}, nil + return peer.PeerInfo{} } // findPeerSingle asks peer 'p' if they know where the peer with id 'id' is @@ -257,26 +256,6 @@ func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key u.Ke return dht.sendRequest(ctx, p, pmes) } -func (dht *IpfsDHT) addProviders(key u.Key, pbps []*pb.Message_Peer) []peer.ID { - peers := pb.PBPeersToPeerInfos(pbps) - - var provArr []peer.ID - for _, pi := range peers { - p := pi.ID - - // Dont add outselves to the list - if p == dht.self { - continue - } - - log.Debugf("%s adding provider: %s for %s", dht.self, p, key) - // TODO(jbenet) ensure providers is idempotent - dht.providers.AddProvider(key, p) - provArr = append(provArr, p) - } - return provArr -} - // nearestPeersToQuery returns the routing tables closest peers. func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID { key := u.Key(pmes.GetKey()) @@ -285,7 +264,7 @@ func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID { } // betterPeerToQuery returns nearestPeersToQuery, but iff closer than self. -func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.ID { +func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID { closer := dht.nearestPeersToQuery(pmes, count) // no node? nil @@ -302,11 +281,16 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.ID { } var filtered []peer.ID - for _, p := range closer { + for _, clp := range closer { + // Dont send a peer back themselves + if p == clp { + continue + } + // must all be closer than self key := u.Key(pmes.GetKey()) - if !kb.Closer(dht.self, p, key) { - filtered = append(filtered, p) + if !kb.Closer(dht.self, clp, key) { + filtered = append(filtered, clp) } } @@ -323,23 +307,6 @@ func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, p peer.ID) error return dht.network.DialPeer(ctx, p) } -//TODO: this should be smarter about which keys it selects. -func (dht *IpfsDHT) loadProvidableKeys() error { - kl, err := dht.datastore.KeyList() - if err != nil { - return err - } - for _, dsk := range kl { - k := u.KeyFromDsKey(dsk) - if len(k) == 0 { - log.Errorf("loadProvidableKeys error: %v", dsk) - } - - dht.providers.AddProvider(k, dht.self) - } - return nil -} - // PingRoutine periodically pings nearest neighbors. func (dht *IpfsDHT) PingRoutine(t time.Duration) { defer dht.Children().Done() diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index 5603c4d5c..5eeb3a2bc 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -14,7 +14,6 @@ import ( dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" - // ci "github.com/jbenet/go-ipfs/crypto" inet "github.com/jbenet/go-ipfs/net" peer "github.com/jbenet/go-ipfs/peer" routing "github.com/jbenet/go-ipfs/routing" @@ -35,7 +34,7 @@ func init() { func setupDHT(ctx context.Context, t *testing.T, addr ma.Multiaddr) *IpfsDHT { - sk, pk, err := testutil.RandKeyPair(512) + sk, pk, err := testutil.SeededKeyPair(time.Now().UnixNano()) if err != nil { t.Fatal(err) } @@ -487,12 +486,7 @@ func TestLayeredGet(t *testing.T) { connect(t, ctx, dhts[1], dhts[2]) connect(t, ctx, dhts[1], dhts[3]) - err := dhts[3].putLocal(u.Key("/v/hello"), []byte("world")) - if err != nil { - t.Fatal(err) - } - - err = dhts[3].Provide(ctx, u.Key("/v/hello")) + err := dhts[3].Provide(ctx, u.Key("/v/hello")) if err != nil { t.Fatal(err) } diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go index b4b1158d7..8441c1f72 100644 --- a/routing/dht/ext_test.go +++ b/routing/dht/ext_test.go @@ -47,9 +47,8 @@ func TestGetFailures(t *testing.T) { t.Fatal("Did not get expected error!") } - msgs := make(chan *pb.Message, 100) + t.Log("Timeout test passed.") - // u.POut("NotFound Test\n") // Reply with failures to every message nets[1].SetHandler(inet.ProtocolDHT, func(s inet.Stream) { defer s.Close() @@ -68,8 +67,6 @@ func TestGetFailures(t *testing.T) { if err := pbw.WriteMsg(resp); err != nil { panic(err) } - - msgs <- resp }) // This one should fail with NotFound @@ -83,6 +80,8 @@ func TestGetFailures(t *testing.T) { t.Fatal("expected error, got none.") } + t.Log("ErrNotFound check passed!") + // Now we test this DHT's handleGetValue failure { typ := pb.Message_GET_VALUE diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index 5aec6c2ff..acb052248 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -39,7 +39,7 @@ func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler { } func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { - log.Debugf("%s handleGetValue for key: %s\n", dht.self, pmes.GetKey()) + log.Debugf("%s handleGetValue for key: %s", dht.self, pmes.GetKey()) // setup response resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) @@ -93,7 +93,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess } // Find closest peer on given cluster to desired key and reply with that info - closer := dht.betterPeersToQuery(pmes, CloserPeerCount) + closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount) closerinfos := peer.PeerInfos(dht.peerstore, closer) if closer != nil { for _, pi := range closerinfos { @@ -127,7 +127,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess } err = dht.datastore.Put(dskey, data) - log.Debugf("%s handlePutValue %v\n", dht.self, dskey) + log.Debugf("%s handlePutValue %v", dht.self, dskey) return pmes, err } @@ -144,11 +144,11 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess if peer.ID(pmes.GetKey()) == dht.self { closest = []peer.ID{dht.self} } else { - closest = dht.betterPeersToQuery(pmes, CloserPeerCount) + closest = dht.betterPeersToQuery(pmes, p, CloserPeerCount) } if closest == nil { - log.Debugf("handleFindPeer: could not find anything.") + log.Warningf("handleFindPeer: could not find anything.") return resp, nil } @@ -189,7 +189,7 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb. } // Also send closer peers. - closer := dht.betterPeersToQuery(pmes, CloserPeerCount) + closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount) if closer != nil { infos := peer.PeerInfos(dht.peerstore, providers) resp.CloserPeers = pb.PeerInfosToPBPeers(dht.network, infos) diff --git a/routing/dht/query.go b/routing/dht/query.go index 6a7bb687d..4790e814c 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -7,8 +7,8 @@ import ( peer "github.com/jbenet/go-ipfs/peer" queue "github.com/jbenet/go-ipfs/peer/queue" "github.com/jbenet/go-ipfs/routing" - kb "github.com/jbenet/go-ipfs/routing/kbucket" u "github.com/jbenet/go-ipfs/util" + pset "github.com/jbenet/go-ipfs/util/peerset" todoctr "github.com/jbenet/go-ipfs/util/todocounter" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" @@ -71,7 +71,7 @@ type dhtQueryRunner struct { peersToQuery *queue.ChanQueue // peersSeen are all the peers queried. used to prevent querying same peer 2x - peersSeen peer.Set + peersSeen *pset.PeerSet // rateLimit is a channel used to rate limit our processing (semaphore) rateLimit chan struct{} @@ -97,7 +97,7 @@ func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner { query: q, peersToQuery: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)), peersRemaining: todoctr.NewSyncCounter(), - peersSeen: peer.Set{}, + peersSeen: pset.New(), rateLimit: make(chan struct{}, q.concurrency), cg: ctxgroup.WithContext(ctx), } @@ -117,7 +117,7 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) { // add all the peers we got first. for _, p := range peers { - r.addPeerToQuery(r.cg.Context(), p, "") // don't have access to self here... + r.addPeerToQuery(r.cg.Context(), p) } // go do this thing. @@ -153,32 +153,17 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) { return nil, err } -func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID, benchmark peer.ID) { +func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) { // if new peer is ourselves... if next == r.query.dialer.LocalPeer() { return } - // if new peer further away than whom we got it from, don't bother (loops) - // TODO----------- this benchmark should be replaced by a heap: - // we should be doing the s/kademlia "continue to search" - // (i.e. put all of them in a heap sorted by dht distance and then just - // pull from the the top until a) you exhaust all peers you get, - // b) you succeed, c) your context expires. - if benchmark != "" && kb.Closer(benchmark, next, r.query.key) { + if !r.peersSeen.TryAdd(next) { + log.Debug("query peer was already seen") return } - // if already seen, no need. - r.Lock() - _, found := r.peersSeen[next] - if found { - r.Unlock() - return - } - r.peersSeen[next] = struct{}{} - r.Unlock() - log.Debugf("adding peer to query: %v", next) // do this after unlocking to prevent possible deadlocks. @@ -278,7 +263,7 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) { } r.query.dialer.Peerstore().AddAddresses(next.ID, next.Addrs) - r.addPeerToQuery(cg.Context(), next.ID, p) + r.addPeerToQuery(cg.Context(), next.ID) log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs) } } else { diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 34108f076..36e281cc9 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -40,19 +40,24 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error return err } - peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue) + pchan, err := dht.getClosestPeers(ctx, key) + if err != nil { + return err + } - query := newQuery(key, dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { - log.Debugf("%s PutValue qry part %v", dht.self, p) - err := dht.putValueToNetwork(ctx, p, string(key), rec) - if err != nil { - return nil, err - } - return &dhtQueryResult{success: true}, nil - }) - - _, err = query.Run(ctx, peers) - return err + wg := sync.WaitGroup{} + for p := range pchan { + wg.Add(1) + go func(p peer.ID) { + defer wg.Done() + err := dht.putValueToPeer(ctx, p, key, rec) + if err != nil { + log.Errorf("failed putting value to peer: %s", err) + } + }(p) + } + wg.Wait() + return nil } // GetValue searches for the value corresponding to given Key. @@ -111,20 +116,27 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { // Provide makes this node announce that it can provide a value for the given key func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { + log.Event(ctx, "provideBegin", &key) + defer log.Event(ctx, "provideEnd", &key) dht.providers.AddProvider(key, dht.self) - peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), PoolSize) - if len(peers) == 0 { - return nil + + peers, err := dht.getClosestPeers(ctx, key) + if err != nil { + return err } - //TODO FIX: this doesn't work! it needs to be sent to the actual nearest peers. - // `peers` are the closest peers we have, not the ones that should get the value. - for _, p := range peers { - err := dht.putProvider(ctx, p, string(key)) - if err != nil { - return err - } + wg := sync.WaitGroup{} + for p := range peers { + wg.Add(1) + go func(p peer.ID) { + defer wg.Done() + err := dht.putProvider(ctx, p, string(key)) + if err != nil { + log.Error(err) + } + }(p) } + wg.Wait() return nil } @@ -137,6 +149,75 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerIn return providers, nil } +// Kademlia 'node lookup' operation. Returns a channel of the K closest peers +// to the given key +func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key) (<-chan peer.ID, error) { + tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) + if len(tablepeers) == 0 { + return nil, kb.ErrLookupFailure + } + + out := make(chan peer.ID, KValue) + peerset := pset.NewLimited(KValue) + + for _, p := range tablepeers { + select { + case out <- p: + case <-ctx.Done(): + return nil, ctx.Err() + } + peerset.Add(p) + } + + query := newQuery(key, dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { + closer, err := dht.closerPeersSingle(ctx, key, p) + if err != nil { + log.Errorf("error getting closer peers: %s", err) + return nil, err + } + + var filtered []peer.PeerInfo + for _, p := range closer { + if kb.Closer(p, dht.self, key) && peerset.TryAdd(p) { + select { + case out <- p: + case <-ctx.Done(): + return nil, ctx.Err() + } + filtered = append(filtered, dht.peerstore.PeerInfo(p)) + } + } + + return &dhtQueryResult{closerPeers: filtered}, nil + }) + + go func() { + defer close(out) + // run it! + _, err := query.Run(ctx, tablepeers) + if err != nil { + log.Errorf("closestPeers query run error: %s", err) + } + }() + + return out, nil +} + +func (dht *IpfsDHT) closerPeersSingle(ctx context.Context, key u.Key, p peer.ID) ([]peer.ID, error) { + pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key)) + if err != nil { + return nil, err + } + + var out []peer.ID + for _, pbp := range pmes.GetCloserPeers() { + pid := peer.ID(pbp.GetId()) + dht.peerstore.AddAddresses(pid, pbp.Addresses()) + out = append(out, pid) + } + return out, nil +} + // FindProvidersAsync is the same thing as FindProviders, but returns a channel. // Peers will be returned on the channel as soon as they are found, even before // the search query completes. @@ -149,6 +230,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.PeerInfo) { defer close(peerOut) + defer log.Event(ctx, "findProviders end", &key) log.Debugf("%s FindProviders %s", dht.self, key) ps := pset.NewLimited(count) @@ -207,40 +289,11 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co } } -func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *pset.PeerSet, count int, out chan peer.PeerInfo) { - var wg sync.WaitGroup - peerInfos := pb.PBPeersToPeerInfos(peers) - for _, pi := range peerInfos { - wg.Add(1) - go func(pi peer.PeerInfo) { - defer wg.Done() - - p := pi.ID - if err := dht.ensureConnectedToPeer(ctx, p); err != nil { - log.Errorf("%s", err) - return - } - - dht.providers.AddProvider(k, p) - if ps.TryAdd(p) { - select { - case out <- pi: - case <-ctx.Done(): - return - } - } else if ps.Size() >= count { - return - } - }(pi) - } - wg.Wait() -} - // FindPeer searches for a peer with given ID. func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, error) { // Check if were already connected to them - if pi, _ := dht.FindLocal(id); pi.ID != "" { + if pi := dht.FindLocal(id); pi.ID != "" { return pi, nil } diff --git a/routing/kbucket/sorting.go b/routing/kbucket/sorting.go new file mode 100644 index 000000000..a3a68767b --- /dev/null +++ b/routing/kbucket/sorting.go @@ -0,0 +1,59 @@ +package kbucket + +import ( + "container/list" + peer "github.com/jbenet/go-ipfs/peer" + "sort" +) + +// A helper struct to sort peers by their distance to the local node +type peerDistance struct { + p peer.ID + distance ID +} + +// peerSorterArr implements sort.Interface to sort peers by xor distance +type peerSorterArr []*peerDistance + +func (p peerSorterArr) Len() int { return len(p) } +func (p peerSorterArr) Swap(a, b int) { p[a], p[b] = p[b], p[a] } +func (p peerSorterArr) Less(a, b int) bool { + return p[a].distance.less(p[b].distance) +} + +// + +func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr { + for e := peerList.Front(); e != nil; e = e.Next() { + p := e.Value.(peer.ID) + pID := ConvertPeerID(p) + pd := peerDistance{ + p: p, + distance: xor(target, pID), + } + peerArr = append(peerArr, &pd) + if e == nil { + log.Debug("list element was nil") + return peerArr + } + } + return peerArr +} + +func SortClosestPeers(peers []peer.ID, target ID) []peer.ID { + var psarr peerSorterArr + for _, p := range peers { + pID := ConvertPeerID(p) + pd := &peerDistance{ + p: p, + distance: xor(target, pID), + } + psarr = append(psarr, pd) + } + sort.Sort(psarr) + var out []peer.ID + for _, p := range psarr { + out = append(out, p.p) + } + return out +} diff --git a/routing/kbucket/table.go b/routing/kbucket/table.go index bed7447a5..90ba65530 100644 --- a/routing/kbucket/table.go +++ b/routing/kbucket/table.go @@ -2,7 +2,6 @@ package kbucket import ( - "container/list" "fmt" "sort" "sync" @@ -103,40 +102,6 @@ func (rt *RoutingTable) nextBucket() peer.ID { return "" } -// A helper struct to sort peers by their distance to the local node -type peerDistance struct { - p peer.ID - distance ID -} - -// peerSorterArr implements sort.Interface to sort peers by xor distance -type peerSorterArr []*peerDistance - -func (p peerSorterArr) Len() int { return len(p) } -func (p peerSorterArr) Swap(a, b int) { p[a], p[b] = p[b], p[a] } -func (p peerSorterArr) Less(a, b int) bool { - return p[a].distance.less(p[b].distance) -} - -// - -func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr { - for e := peerList.Front(); e != nil; e = e.Next() { - p := e.Value.(peer.ID) - pID := ConvertPeerID(p) - pd := peerDistance{ - p: p, - distance: xor(target, pID), - } - peerArr = append(peerArr, &pd) - if e == nil { - log.Debug("list element was nil") - return peerArr - } - } - return peerArr -} - // Find a specific peer by ID or return nil func (rt *RoutingTable) Find(id peer.ID) peer.ID { srch := rt.NearestPeers(ConvertPeerID(id), 1) diff --git a/util/testutil/gen.go b/util/testutil/gen.go index 59dd862a8..bcf1f283e 100644 --- a/util/testutil/gen.go +++ b/util/testutil/gen.go @@ -17,7 +17,11 @@ import ( ) func RandKeyPair(bits int) (ci.PrivKey, ci.PubKey, error) { - return ci.GenerateKeyPair(ci.RSA, bits) + return ci.GenerateKeyPairWithReader(ci.RSA, bits, u.NewTimeSeededRand()) +} + +func SeededKeyPair(seed int64) (ci.PrivKey, ci.PubKey, error) { + return ci.GenerateKeyPairWithReader(ci.RSA, 512, u.NewSeededRand(seed)) } // RandPeerID generates random "valid" peer IDs. it does not NEED to generate @@ -120,7 +124,7 @@ func RandPeerNetParams() (*PeerNetParams, error) { var p PeerNetParams var err error p.Addr = RandLocalTCPAddress() - p.PrivKey, p.PubKey, err = ci.GenerateKeyPair(ci.RSA, 512) + p.PrivKey, p.PubKey, err = RandKeyPair(512) if err != nil { return nil, err } diff --git a/util/util.go b/util/util.go index 141cd8cba..32bc314ab 100644 --- a/util/util.go +++ b/util/util.go @@ -107,6 +107,13 @@ func NewTimeSeededRand() io.Reader { } } +func NewSeededRand(seed int64) io.Reader { + src := rand.NewSource(seed) + return &randGen{ + Rand: *rand.New(src), + } +} + func (r *randGen) Read(p []byte) (n int, err error) { for i := 0; i < len(p); i++ { p[i] = byte(r.Rand.Intn(255))