From 07b064010e584f046234681de6f47d79d09b9689 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 14 Dec 2014 00:50:49 +0000 Subject: [PATCH 1/8] rewrite of provides to better select peers to send RPCs to refactor test peer creation to be deterministic and reliable a bit of cleanup trying to figure out TestGetFailure add test to verify deterministic peer creation switch put RPC over to use getClosestPeers rm 0xDEADC0DE fix queries not searching peer if its not actually closer --- cmd/ipfs/init.go | 3 +- cmd/seccat/seccat.go | 2 +- core/mock.go | 4 +- crypto/key.go | 5 +- crypto/key_test.go | 5 +- namesys/resolve_test.go | 2 +- net/mock/mock_net.go | 2 +- routing/dht/dht.go | 33 +++------- routing/dht/dht_test.go | 19 +++--- routing/dht/ext_test.go | 7 +-- routing/dht/handlers.go | 11 ++-- routing/dht/query.go | 31 +++------ routing/dht/routing.go | 126 +++++++++++++++++++++++++++++++------ routing/kbucket/sorting.go | 59 +++++++++++++++++ routing/kbucket/table.go | 35 ----------- util/testutil/gen.go | 8 ++- util/util.go | 7 +++ 17 files changed, 230 insertions(+), 129 deletions(-) create mode 100644 routing/kbucket/sorting.go diff --git a/cmd/ipfs/init.go b/cmd/ipfs/init.go index bc609659f..62415f3a7 100644 --- a/cmd/ipfs/init.go +++ b/cmd/ipfs/init.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "crypto/rand" "encoding/base64" "fmt" "os" @@ -252,7 +253,7 @@ func identityConfig(nbits int) (config.Identity, error) { } fmt.Printf("generating key pair...") - sk, pk, err := ci.GenerateKeyPair(ci.RSA, nbits) + sk, pk, err := ci.GenerateKeyPair(ci.RSA, nbits, rand.Reader) if err != nil { return ident, err } diff --git a/cmd/seccat/seccat.go b/cmd/seccat/seccat.go index 625f64ec0..a858593e2 100644 --- a/cmd/seccat/seccat.go +++ b/cmd/seccat/seccat.go @@ -115,7 +115,7 @@ func setupPeer(a args) (peer.ID, peer.Peerstore, error) { } out("generating key pair...") - sk, pk, err := ci.GenerateKeyPair(ci.RSA, a.keybits) + sk, pk, err := ci.GenerateKeyPair(ci.RSA, a.keybits, u.NewTimeSeededRand()) if err != nil { return "", nil, err } diff --git a/core/mock.go b/core/mock.go index a0ce6a460..92946276c 100644 --- a/core/mock.go +++ b/core/mock.go @@ -1,7 +1,9 @@ package core import ( + "crypto/rand" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" syncds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" "github.com/jbenet/go-ipfs/blocks/blockstore" @@ -28,7 +30,7 @@ func NewMockNode() (*IpfsNode, error) { nd := new(IpfsNode) // Generate Identity - sk, pk, err := ci.GenerateKeyPair(ci.RSA, 1024) + sk, pk, err := ci.GenerateKeyPair(ci.RSA, 1024, rand.Reader) if err != nil { return nil, err } diff --git a/crypto/key.go b/crypto/key.go index 67a316527..fe687691e 100644 --- a/crypto/key.go +++ b/crypto/key.go @@ -8,6 +8,7 @@ import ( "encoding/base64" "errors" "fmt" + "io" "crypto/elliptic" "crypto/hmac" @@ -75,10 +76,10 @@ type PubKey interface { type GenSharedKey func([]byte) ([]byte, error) // Generates a keypair of the given type and bitsize -func GenerateKeyPair(typ, bits int) (PrivKey, PubKey, error) { +func GenerateKeyPair(typ, bits int, src io.Reader) (PrivKey, PubKey, error) { switch typ { case RSA: - priv, err := rsa.GenerateKey(rand.Reader, bits) + priv, err := rsa.GenerateKey(src, bits) if err != nil { return nil, nil, err } diff --git a/crypto/key_test.go b/crypto/key_test.go index 1300be8f8..d771ea648 100644 --- a/crypto/key_test.go 
+++ b/crypto/key_test.go @@ -2,11 +2,12 @@ package crypto import ( "bytes" + u "github.com/jbenet/go-ipfs/util" "testing" ) func TestRsaKeys(t *testing.T) { - sk, pk, err := GenerateKeyPair(RSA, 512) + sk, pk, err := GenerateKeyPair(RSA, 512, u.NewTimeSeededRand()) if err != nil { t.Fatal(err) } @@ -90,7 +91,7 @@ func testKeyEquals(t *testing.T, k Key) { t.Fatal("Key not equal to key with same bytes.") } - sk, pk, err := GenerateKeyPair(RSA, 512) + sk, pk, err := GenerateKeyPair(RSA, 512, u.NewTimeSeededRand()) if err != nil { t.Fatal(err) } diff --git a/namesys/resolve_test.go b/namesys/resolve_test.go index 84e4f1cb6..592c344d7 100644 --- a/namesys/resolve_test.go +++ b/namesys/resolve_test.go @@ -15,7 +15,7 @@ func TestRoutingResolve(t *testing.T) { resolver := NewRoutingResolver(d) publisher := NewRoutingPublisher(d) - privk, pubk, err := ci.GenerateKeyPair(ci.RSA, 512) + privk, pubk, err := ci.GenerateKeyPair(ci.RSA, 512, u.NewTimeSeededRand()) if err != nil { t.Fatal(err) } diff --git a/net/mock/mock_net.go b/net/mock/mock_net.go index ffed78b33..ab3359237 100644 --- a/net/mock/mock_net.go +++ b/net/mock/mock_net.go @@ -41,7 +41,7 @@ func New(ctx context.Context) Mocknet { } func (mn *mocknet) GenPeer() (inet.Network, error) { - sk, _, err := testutil.RandKeyPair(512) + sk, _, err := testutil.SeededKeyPair(512, int64(len(mn.nets))) if err != nil { return nil, err } diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 4cbf68e43..1573f3477 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -103,9 +103,8 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error { } // putValueToNetwork stores the given key/value pair at the peer 'p' -// meaning: it sends a PUT_VALUE message to p func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.ID, - key string, rec *pb.Record) error { + key u.Key, rec *pb.Record) error { pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0) pmes.Record = rec @@ -285,7 +284,7 @@ func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID { } // betterPeerToQuery returns nearestPeersToQuery, but iff closer than self. -func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.ID { +func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID { closer := dht.nearestPeersToQuery(pmes, count) // no node? nil @@ -302,11 +301,16 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.ID { } var filtered []peer.ID - for _, p := range closer { + for _, clp := range closer { + // Dont send a peer back themselves + if p == clp { + continue + } + // must all be closer than self key := u.Key(pmes.GetKey()) - if !kb.Closer(dht.self, p, key) { - filtered = append(filtered, p) + if !kb.Closer(dht.self, clp, key) { + filtered = append(filtered, clp) } } @@ -323,23 +327,6 @@ func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, p peer.ID) error return dht.network.DialPeer(ctx, p) } -//TODO: this should be smarter about which keys it selects. -func (dht *IpfsDHT) loadProvidableKeys() error { - kl, err := dht.datastore.KeyList() - if err != nil { - return err - } - for _, dsk := range kl { - k := u.KeyFromDsKey(dsk) - if len(k) == 0 { - log.Errorf("loadProvidableKeys error: %v", dsk) - } - - dht.providers.AddProvider(k, dht.self) - } - return nil -} - // PingRoutine periodically pings nearest neighbors. 
func (dht *IpfsDHT) PingRoutine(t time.Duration) { defer dht.Children().Done() diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index 5603c4d5c..bbc7b9692 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -14,7 +14,6 @@ import ( dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" - // ci "github.com/jbenet/go-ipfs/crypto" inet "github.com/jbenet/go-ipfs/net" peer "github.com/jbenet/go-ipfs/peer" routing "github.com/jbenet/go-ipfs/routing" @@ -33,9 +32,9 @@ func init() { } } -func setupDHT(ctx context.Context, t *testing.T, addr ma.Multiaddr) *IpfsDHT { +func setupDHT(ctx context.Context, t *testing.T, addr ma.Multiaddr, seed int64) *IpfsDHT { - sk, pk, err := testutil.RandKeyPair(512) + sk, pk, err := testutil.SeededKeyPair(512, seed) if err != nil { t.Fatal(err) } @@ -71,7 +70,7 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer for i := 0; i < n; i++ { addrs[i] = testutil.RandLocalTCPAddress() - dhts[i] = setupDHT(ctx, t, addrs[i]) + dhts[i] = setupDHT(ctx, t, addrs[i], int64(i)) peers[i] = dhts[i].self } @@ -120,8 +119,8 @@ func TestPing(t *testing.T) { addrA := testutil.RandLocalTCPAddress() addrB := testutil.RandLocalTCPAddress() - dhtA := setupDHT(ctx, t, addrA) - dhtB := setupDHT(ctx, t, addrB) + dhtA := setupDHT(ctx, t, addrA, 1) + dhtB := setupDHT(ctx, t, addrB, 2) peerA := dhtA.self peerB := dhtB.self @@ -153,8 +152,8 @@ func TestValueGetSet(t *testing.T) { addrA := testutil.RandLocalTCPAddress() addrB := testutil.RandLocalTCPAddress() - dhtA := setupDHT(ctx, t, addrA) - dhtB := setupDHT(ctx, t, addrB) + dhtA := setupDHT(ctx, t, addrA, 1) + dhtB := setupDHT(ctx, t, addrB, 2) defer dhtA.Close() defer dhtB.Close() @@ -642,8 +641,8 @@ func TestConnectCollision(t *testing.T) { addrA := testutil.RandLocalTCPAddress() addrB := testutil.RandLocalTCPAddress() - dhtA := setupDHT(ctx, t, addrA) - dhtB := setupDHT(ctx, t, addrB) + dhtA := setupDHT(ctx, t, addrA, int64((rtime*2)+1)) + dhtB := setupDHT(ctx, t, addrB, int64((rtime*2)+2)) peerA := dhtA.self peerB := dhtB.self diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go index b4b1158d7..8441c1f72 100644 --- a/routing/dht/ext_test.go +++ b/routing/dht/ext_test.go @@ -47,9 +47,8 @@ func TestGetFailures(t *testing.T) { t.Fatal("Did not get expected error!") } - msgs := make(chan *pb.Message, 100) + t.Log("Timeout test passed.") - // u.POut("NotFound Test\n") // Reply with failures to every message nets[1].SetHandler(inet.ProtocolDHT, func(s inet.Stream) { defer s.Close() @@ -68,8 +67,6 @@ func TestGetFailures(t *testing.T) { if err := pbw.WriteMsg(resp); err != nil { panic(err) } - - msgs <- resp }) // This one should fail with NotFound @@ -83,6 +80,8 @@ func TestGetFailures(t *testing.T) { t.Fatal("expected error, got none.") } + t.Log("ErrNotFound check passed!") + // Now we test this DHT's handleGetValue failure { typ := pb.Message_GET_VALUE diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index 5aec6c2ff..e8edaa5eb 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -93,7 +93,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess } // Find closest peer on given cluster to desired key and reply with that info - closer := dht.betterPeersToQuery(pmes, CloserPeerCount) + closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount) closerinfos := peer.PeerInfos(dht.peerstore, 
closer) if closer != nil { for _, pi := range closerinfos { @@ -137,6 +137,9 @@ func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) ( } func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { + log.Errorf("handle find peer %s start", p) + defer log.Errorf("handle find peer %s end", p) + resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel()) var closest []peer.ID @@ -144,11 +147,11 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess if peer.ID(pmes.GetKey()) == dht.self { closest = []peer.ID{dht.self} } else { - closest = dht.betterPeersToQuery(pmes, CloserPeerCount) + closest = dht.betterPeersToQuery(pmes, p, CloserPeerCount) } if closest == nil { - log.Debugf("handleFindPeer: could not find anything.") + log.Warningf("handleFindPeer: could not find anything.") return resp, nil } @@ -189,7 +192,7 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb. } // Also send closer peers. - closer := dht.betterPeersToQuery(pmes, CloserPeerCount) + closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount) if closer != nil { infos := peer.PeerInfos(dht.peerstore, providers) resp.CloserPeers = pb.PeerInfosToPBPeers(dht.network, infos) diff --git a/routing/dht/query.go b/routing/dht/query.go index 6a7bb687d..4790e814c 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -7,8 +7,8 @@ import ( peer "github.com/jbenet/go-ipfs/peer" queue "github.com/jbenet/go-ipfs/peer/queue" "github.com/jbenet/go-ipfs/routing" - kb "github.com/jbenet/go-ipfs/routing/kbucket" u "github.com/jbenet/go-ipfs/util" + pset "github.com/jbenet/go-ipfs/util/peerset" todoctr "github.com/jbenet/go-ipfs/util/todocounter" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" @@ -71,7 +71,7 @@ type dhtQueryRunner struct { peersToQuery *queue.ChanQueue // peersSeen are all the peers queried. used to prevent querying same peer 2x - peersSeen peer.Set + peersSeen *pset.PeerSet // rateLimit is a channel used to rate limit our processing (semaphore) rateLimit chan struct{} @@ -97,7 +97,7 @@ func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner { query: q, peersToQuery: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)), peersRemaining: todoctr.NewSyncCounter(), - peersSeen: peer.Set{}, + peersSeen: pset.New(), rateLimit: make(chan struct{}, q.concurrency), cg: ctxgroup.WithContext(ctx), } @@ -117,7 +117,7 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) { // add all the peers we got first. for _, p := range peers { - r.addPeerToQuery(r.cg.Context(), p, "") // don't have access to self here... + r.addPeerToQuery(r.cg.Context(), p) } // go do this thing. @@ -153,32 +153,17 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) { return nil, err } -func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID, benchmark peer.ID) { +func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) { // if new peer is ourselves... if next == r.query.dialer.LocalPeer() { return } - // if new peer further away than whom we got it from, don't bother (loops) - // TODO----------- this benchmark should be replaced by a heap: - // we should be doing the s/kademlia "continue to search" - // (i.e. put all of them in a heap sorted by dht distance and then just - // pull from the the top until a) you exhaust all peers you get, - // b) you succeed, c) your context expires. 
- if benchmark != "" && kb.Closer(benchmark, next, r.query.key) { + if !r.peersSeen.TryAdd(next) { + log.Debug("query peer was already seen") return } - // if already seen, no need. - r.Lock() - _, found := r.peersSeen[next] - if found { - r.Unlock() - return - } - r.peersSeen[next] = struct{}{} - r.Unlock() - log.Debugf("adding peer to query: %v", next) // do this after unlocking to prevent possible deadlocks. @@ -278,7 +263,7 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) { } r.query.dialer.Peerstore().AddAddresses(next.ID, next.Addrs) - r.addPeerToQuery(cg.Context(), next.ID, p) + r.addPeerToQuery(cg.Context(), next.ID) log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs) } } else { diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 34108f076..0cd5751a1 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -40,19 +40,24 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error return err } - peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue) + pchan, err := dht.getClosestPeers(ctx, key, KValue) + if err != nil { + return err + } - query := newQuery(key, dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { - log.Debugf("%s PutValue qry part %v", dht.self, p) - err := dht.putValueToNetwork(ctx, p, string(key), rec) - if err != nil { - return nil, err - } - return &dhtQueryResult{success: true}, nil - }) - - _, err = query.Run(ctx, peers) - return err + wg := sync.WaitGroup{} + for p := range pchan { + wg.Add(1) + go func(p peer.ID) { + defer wg.Done() + err := dht.putValueToNetwork(ctx, p, key, rec) + if err != nil { + log.Errorf("failed putting value to peer: %s", err) + } + }(p) + } + wg.Wait() + return nil } // GetValue searches for the value corresponding to given Key. @@ -111,18 +116,19 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { // Provide makes this node announce that it can provide a value for the given key func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { + log.Event(ctx, "Provide Value start", &key) + defer log.Event(ctx, "Provide Value end", &key) dht.providers.AddProvider(key, dht.self) - peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), PoolSize) - if len(peers) == 0 { - return nil + + peers, err := dht.getClosestPeers(ctx, key, KValue) + if err != nil { + return err } - //TODO FIX: this doesn't work! it needs to be sent to the actual nearest peers. - // `peers` are the closest peers we have, not the ones that should get the value. 
- for _, p := range peers { + for p := range peers { err := dht.putProvider(ctx, p, string(key)) if err != nil { - return err + log.Error(err) } } return nil @@ -137,6 +143,87 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerIn return providers, nil } +func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key, count int) (<-chan peer.ID, error) { + log.Error("Get Closest Peers") + tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) + if len(tablepeers) == 0 { + return nil, kb.ErrLookupFailure + } + + out := make(chan peer.ID, count) + peerset := pset.NewLimited(count) + + for _, p := range tablepeers { + out <- p + peerset.Add(p) + } + + wg := sync.WaitGroup{} + for _, p := range tablepeers { + wg.Add(1) + go func(p peer.ID) { + dht.getClosestPeersRecurse(ctx, key, p, peerset, out) + wg.Done() + }(p) + } + + go func() { + wg.Wait() + close(out) + log.Error("Closing closest peer chan") + }() + + return out, nil +} + +func (dht *IpfsDHT) getClosestPeersRecurse(ctx context.Context, key u.Key, p peer.ID, peers *pset.PeerSet, peerOut chan<- peer.ID) { + log.Error("closest peers recurse") + defer log.Error("closest peers recurse end") + closer, err := dht.closerPeersSingle(ctx, key, p) + if err != nil { + log.Errorf("error getting closer peers: %s", err) + return + } + + wg := sync.WaitGroup{} + for _, p := range closer { + if kb.Closer(p, dht.self, key) && peers.TryAdd(p) { + select { + case peerOut <- p: + case <-ctx.Done(): + return + } + wg.Add(1) + go func(p peer.ID) { + dht.getClosestPeersRecurse(ctx, key, p, peers, peerOut) + wg.Done() + }(p) + } + } + wg.Wait() +} + +func (dht *IpfsDHT) closerPeersSingle(ctx context.Context, key u.Key, p peer.ID) ([]peer.ID, error) { + log.Errorf("closest peers single %s %s", p, key) + defer log.Errorf("closest peers single end %s %s", p, key) + pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key)) + if err != nil { + return nil, err + } + + var out []peer.ID + for _, pbp := range pmes.GetCloserPeers() { + pid := peer.ID(pbp.GetId()) + dht.peerstore.AddAddresses(pid, pbp.Addresses()) + err := dht.ensureConnectedToPeer(ctx, pid) + if err != nil { + return nil, err + } + out = append(out, pid) + } + return out, nil +} + // FindProvidersAsync is the same thing as FindProviders, but returns a channel. // Peers will be returned on the channel as soon as they are found, even before // the search query completes. 
@@ -182,6 +269,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co // Add unique providers from request, up to 'count' for _, prov := range provs { if ps.TryAdd(prov.ID) { + dht.peerstore.AddAddresses(prov.ID, prov.Addrs) select { case peerOut <- prov: case <-ctx.Done(): diff --git a/routing/kbucket/sorting.go b/routing/kbucket/sorting.go new file mode 100644 index 000000000..a3a68767b --- /dev/null +++ b/routing/kbucket/sorting.go @@ -0,0 +1,59 @@ +package kbucket + +import ( + "container/list" + peer "github.com/jbenet/go-ipfs/peer" + "sort" +) + +// A helper struct to sort peers by their distance to the local node +type peerDistance struct { + p peer.ID + distance ID +} + +// peerSorterArr implements sort.Interface to sort peers by xor distance +type peerSorterArr []*peerDistance + +func (p peerSorterArr) Len() int { return len(p) } +func (p peerSorterArr) Swap(a, b int) { p[a], p[b] = p[b], p[a] } +func (p peerSorterArr) Less(a, b int) bool { + return p[a].distance.less(p[b].distance) +} + +// + +func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr { + for e := peerList.Front(); e != nil; e = e.Next() { + p := e.Value.(peer.ID) + pID := ConvertPeerID(p) + pd := peerDistance{ + p: p, + distance: xor(target, pID), + } + peerArr = append(peerArr, &pd) + if e == nil { + log.Debug("list element was nil") + return peerArr + } + } + return peerArr +} + +func SortClosestPeers(peers []peer.ID, target ID) []peer.ID { + var psarr peerSorterArr + for _, p := range peers { + pID := ConvertPeerID(p) + pd := &peerDistance{ + p: p, + distance: xor(target, pID), + } + psarr = append(psarr, pd) + } + sort.Sort(psarr) + var out []peer.ID + for _, p := range psarr { + out = append(out, p.p) + } + return out +} diff --git a/routing/kbucket/table.go b/routing/kbucket/table.go index bed7447a5..90ba65530 100644 --- a/routing/kbucket/table.go +++ b/routing/kbucket/table.go @@ -2,7 +2,6 @@ package kbucket import ( - "container/list" "fmt" "sort" "sync" @@ -103,40 +102,6 @@ func (rt *RoutingTable) nextBucket() peer.ID { return "" } -// A helper struct to sort peers by their distance to the local node -type peerDistance struct { - p peer.ID - distance ID -} - -// peerSorterArr implements sort.Interface to sort peers by xor distance -type peerSorterArr []*peerDistance - -func (p peerSorterArr) Len() int { return len(p) } -func (p peerSorterArr) Swap(a, b int) { p[a], p[b] = p[b], p[a] } -func (p peerSorterArr) Less(a, b int) bool { - return p[a].distance.less(p[b].distance) -} - -// - -func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr { - for e := peerList.Front(); e != nil; e = e.Next() { - p := e.Value.(peer.ID) - pID := ConvertPeerID(p) - pd := peerDistance{ - p: p, - distance: xor(target, pID), - } - peerArr = append(peerArr, &pd) - if e == nil { - log.Debug("list element was nil") - return peerArr - } - } - return peerArr -} - // Find a specific peer by ID or return nil func (rt *RoutingTable) Find(id peer.ID) peer.ID { srch := rt.NearestPeers(ConvertPeerID(id), 1) diff --git a/util/testutil/gen.go b/util/testutil/gen.go index 59dd862a8..aa06047b2 100644 --- a/util/testutil/gen.go +++ b/util/testutil/gen.go @@ -17,7 +17,11 @@ import ( ) func RandKeyPair(bits int) (ci.PrivKey, ci.PubKey, error) { - return ci.GenerateKeyPair(ci.RSA, bits) + return ci.GenerateKeyPair(ci.RSA, bits, crand.Reader) +} + +func SeededKeyPair(bits int, seed int64) (ci.PrivKey, ci.PubKey, error) { + return 
ci.GenerateKeyPair(ci.RSA, bits, u.NewSeededRand(seed)) } // RandPeerID generates random "valid" peer IDs. it does not NEED to generate @@ -120,7 +124,7 @@ func RandPeerNetParams() (*PeerNetParams, error) { var p PeerNetParams var err error p.Addr = RandLocalTCPAddress() - p.PrivKey, p.PubKey, err = ci.GenerateKeyPair(ci.RSA, 512) + p.PrivKey, p.PubKey, err = ci.GenerateKeyPair(ci.RSA, 512, u.NewTimeSeededRand()) if err != nil { return nil, err } diff --git a/util/util.go b/util/util.go index 141cd8cba..32bc314ab 100644 --- a/util/util.go +++ b/util/util.go @@ -107,6 +107,13 @@ func NewTimeSeededRand() io.Reader { } } +func NewSeededRand(seed int64) io.Reader { + src := rand.NewSource(seed) + return &randGen{ + Rand: *rand.New(src), + } +} + func (r *randGen) Read(p []byte) (n int, err error) { for i := 0; i < len(p); i++ { p[i] = byte(r.Rand.Intn(255)) From b4b6fe22141d7344529a1b1d7d50be995a6468e4 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 24 Dec 2014 19:42:37 +0000 Subject: [PATCH 2/8] a couple small fixes --- peer/peer_test.go | 2 +- routing/dht/routing.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/peer/peer_test.go b/peer/peer_test.go index 810df6218..ecf805953 100644 --- a/peer/peer_test.go +++ b/peer/peer_test.go @@ -39,7 +39,7 @@ type keyset struct { func (ks *keyset) generate() error { var err error - ks.sk, ks.pk, err = ic.GenerateKeyPair(ic.RSA, 1024) + ks.sk, ks.pk, err = ic.GenerateKeyPair(ic.RSA, 1024, u.NewTimeSeededRand()) if err != nil { return err } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 0cd5751a1..a0334451f 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -269,7 +269,6 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co // Add unique providers from request, up to 'count' for _, prov := range provs { if ps.TryAdd(prov.ID) { - dht.peerstore.AddAddresses(prov.ID, prov.Addrs) select { case peerOut <- prov: case <-ctx.Done(): From 2ad23f05797195d434baa2188075b346922739d0 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sun, 28 Dec 2014 15:46:41 -0800 Subject: [PATCH 3/8] dht: fix TestLayeredGet The test was occasionally passing because: - it called `putLocal(key, val)` - GetValue calls `getLocal(key)` optimistically. 
cc @whyrusleeping --- routing/dht/dht_test.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index bbc7b9692..547e88a9c 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -486,12 +486,7 @@ func TestLayeredGet(t *testing.T) { connect(t, ctx, dhts[1], dhts[2]) connect(t, ctx, dhts[1], dhts[3]) - err := dhts[3].putLocal(u.Key("/v/hello"), []byte("world")) - if err != nil { - t.Fatal(err) - } - - err = dhts[3].Provide(ctx, u.Key("/v/hello")) + err := dhts[3].Provide(ctx, u.Key("/v/hello")) if err != nil { t.Fatal(err) } From 46aa22e949c0884dd67f255ae7fbcfa6781e5cf1 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 28 Dec 2014 23:46:25 +0000 Subject: [PATCH 4/8] some better logging and cleanup --- routing/dht/dht.go | 30 ++++------------------- routing/dht/handlers.go | 7 ++---- routing/dht/routing.go | 54 ++++++++++------------------------------- 3 files changed, 20 insertions(+), 71 deletions(-) diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 1573f3477..fb7c5a49b 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -102,8 +102,8 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error { return nil } -// putValueToNetwork stores the given key/value pair at the peer 'p' -func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.ID, +// putValueToPeer stores the given key/value pair at the peer 'p' +func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, key u.Key, rec *pb.Record) error { pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0) @@ -237,12 +237,12 @@ func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) { } // FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in. -func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.PeerInfo, *kb.RoutingTable) { +func (dht *IpfsDHT) FindLocal(id peer.ID) peer.PeerInfo { p := dht.routingTable.Find(id) if p != "" { - return dht.peerstore.PeerInfo(p), dht.routingTable + return dht.peerstore.PeerInfo(p) } - return peer.PeerInfo{}, nil + return peer.PeerInfo{} } // findPeerSingle asks peer 'p' if they know where the peer with id 'id' is @@ -256,26 +256,6 @@ func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key u.Ke return dht.sendRequest(ctx, p, pmes) } -func (dht *IpfsDHT) addProviders(key u.Key, pbps []*pb.Message_Peer) []peer.ID { - peers := pb.PBPeersToPeerInfos(pbps) - - var provArr []peer.ID - for _, pi := range peers { - p := pi.ID - - // Dont add outselves to the list - if p == dht.self { - continue - } - - log.Debugf("%s adding provider: %s for %s", dht.self, p, key) - // TODO(jbenet) ensure providers is idempotent - dht.providers.AddProvider(key, p) - provArr = append(provArr, p) - } - return provArr -} - // nearestPeersToQuery returns the routing tables closest peers. 
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID { key := u.Key(pmes.GetKey()) diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index e8edaa5eb..acb052248 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -39,7 +39,7 @@ func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler { } func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { - log.Debugf("%s handleGetValue for key: %s\n", dht.self, pmes.GetKey()) + log.Debugf("%s handleGetValue for key: %s", dht.self, pmes.GetKey()) // setup response resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) @@ -127,7 +127,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess } err = dht.datastore.Put(dskey, data) - log.Debugf("%s handlePutValue %v\n", dht.self, dskey) + log.Debugf("%s handlePutValue %v", dht.self, dskey) return pmes, err } @@ -137,9 +137,6 @@ func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) ( } func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { - log.Errorf("handle find peer %s start", p) - defer log.Errorf("handle find peer %s end", p) - resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel()) var closest []peer.ID diff --git a/routing/dht/routing.go b/routing/dht/routing.go index a0334451f..8b0bb1670 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -50,7 +50,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error wg.Add(1) go func(p peer.ID) { defer wg.Done() - err := dht.putValueToNetwork(ctx, p, key, rec) + err := dht.putValueToPeer(ctx, p, key, rec) if err != nil { log.Errorf("failed putting value to peer: %s", err) } @@ -125,12 +125,18 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { return err } + wg := sync.WaitGroup{} for p := range peers { - err := dht.putProvider(ctx, p, string(key)) - if err != nil { - log.Error(err) - } + wg.Add(1) + go func(p peer.ID) { + defer wg.Done() + err := dht.putProvider(ctx, p, string(key)) + if err != nil { + log.Error(err) + } + }(p) } + wg.Wait() return nil } @@ -144,7 +150,6 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerIn } func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key, count int) (<-chan peer.ID, error) { - log.Error("Get Closest Peers") tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) if len(tablepeers) == 0 { return nil, kb.ErrLookupFailure @@ -170,15 +175,12 @@ func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key, count int) ( go func() { wg.Wait() close(out) - log.Error("Closing closest peer chan") }() return out, nil } func (dht *IpfsDHT) getClosestPeersRecurse(ctx context.Context, key u.Key, p peer.ID, peers *pset.PeerSet, peerOut chan<- peer.ID) { - log.Error("closest peers recurse") - defer log.Error("closest peers recurse end") closer, err := dht.closerPeersSingle(ctx, key, p) if err != nil { log.Errorf("error getting closer peers: %s", err) @@ -204,8 +206,6 @@ func (dht *IpfsDHT) getClosestPeersRecurse(ctx context.Context, key u.Key, p pee } func (dht *IpfsDHT) closerPeersSingle(ctx context.Context, key u.Key, p peer.ID) ([]peer.ID, error) { - log.Errorf("closest peers single %s %s", p, key) - defer log.Errorf("closest peers single end %s %s", p, key) pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key)) if err != nil { return nil, 
err @@ -236,6 +236,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.PeerInfo) { defer close(peerOut) + defer log.Event(ctx, "findProviders end", &key) log.Debugf("%s FindProviders %s", dht.self, key) ps := pset.NewLimited(count) @@ -294,40 +295,11 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co } } -func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *pset.PeerSet, count int, out chan peer.PeerInfo) { - var wg sync.WaitGroup - peerInfos := pb.PBPeersToPeerInfos(peers) - for _, pi := range peerInfos { - wg.Add(1) - go func(pi peer.PeerInfo) { - defer wg.Done() - - p := pi.ID - if err := dht.ensureConnectedToPeer(ctx, p); err != nil { - log.Errorf("%s", err) - return - } - - dht.providers.AddProvider(k, p) - if ps.TryAdd(p) { - select { - case out <- pi: - case <-ctx.Done(): - return - } - } else if ps.Size() >= count { - return - } - }(pi) - } - wg.Wait() -} - // FindPeer searches for a peer with given ID. func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, error) { // Check if were already connected to them - if pi, _ := dht.FindLocal(id); pi.ID != "" { + if pi := dht.FindLocal(id); pi.ID != "" { return pi, nil } From 14fc4188be4a98be5526b9254e46cf5806caf834 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 29 Dec 2014 02:29:55 +0000 Subject: [PATCH 5/8] address comments from PR --- cmd/ipfs/init.go | 3 +-- cmd/seccat/seccat.go | 2 +- core/mock.go | 3 +-- crypto/key.go | 6 +++++- crypto/key_test.go | 10 ++++++---- namesys/resolve_test.go | 3 +-- peer/peer_test.go | 6 ++++-- util/testutil/gen.go | 6 +++--- 8 files changed, 22 insertions(+), 17 deletions(-) diff --git a/cmd/ipfs/init.go b/cmd/ipfs/init.go index 62415f3a7..bc609659f 100644 --- a/cmd/ipfs/init.go +++ b/cmd/ipfs/init.go @@ -2,7 +2,6 @@ package main import ( "bytes" - "crypto/rand" "encoding/base64" "fmt" "os" @@ -253,7 +252,7 @@ func identityConfig(nbits int) (config.Identity, error) { } fmt.Printf("generating key pair...") - sk, pk, err := ci.GenerateKeyPair(ci.RSA, nbits, rand.Reader) + sk, pk, err := ci.GenerateKeyPair(ci.RSA, nbits) if err != nil { return ident, err } diff --git a/cmd/seccat/seccat.go b/cmd/seccat/seccat.go index a858593e2..625f64ec0 100644 --- a/cmd/seccat/seccat.go +++ b/cmd/seccat/seccat.go @@ -115,7 +115,7 @@ func setupPeer(a args) (peer.ID, peer.Peerstore, error) { } out("generating key pair...") - sk, pk, err := ci.GenerateKeyPair(ci.RSA, a.keybits, u.NewTimeSeededRand()) + sk, pk, err := ci.GenerateKeyPair(ci.RSA, a.keybits) if err != nil { return "", nil, err } diff --git a/core/mock.go b/core/mock.go index 92946276c..758b30588 100644 --- a/core/mock.go +++ b/core/mock.go @@ -1,7 +1,6 @@ package core import ( - "crypto/rand" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" @@ -30,7 +29,7 @@ func NewMockNode() (*IpfsNode, error) { nd := new(IpfsNode) // Generate Identity - sk, pk, err := ci.GenerateKeyPair(ci.RSA, 1024, rand.Reader) + sk, pk, err := ci.GenerateKeyPair(ci.RSA, 1024) if err != nil { return nil, err } diff --git a/crypto/key.go b/crypto/key.go index fe687691e..df9a8b512 100644 --- a/crypto/key.go +++ b/crypto/key.go @@ -75,8 +75,12 @@ type PubKey interface { // Given a public key, generates the shared key. 
type GenSharedKey func([]byte) ([]byte, error) +func GenerateKeyPair(typ, bits int) (PrivKey, PubKey, error) { + return GenerateKeyPairWithReader(typ, bits, rand.Reader) +} + // Generates a keypair of the given type and bitsize -func GenerateKeyPair(typ, bits int, src io.Reader) (PrivKey, PubKey, error) { +func GenerateKeyPairWithReader(typ, bits int, src io.Reader) (PrivKey, PubKey, error) { switch typ { case RSA: priv, err := rsa.GenerateKey(src, bits) diff --git a/crypto/key_test.go b/crypto/key_test.go index d771ea648..16b13a2f8 100644 --- a/crypto/key_test.go +++ b/crypto/key_test.go @@ -1,13 +1,15 @@ -package crypto +package crypto_test import ( + . "github.com/jbenet/go-ipfs/crypto" + "bytes" - u "github.com/jbenet/go-ipfs/util" + tu "github.com/jbenet/go-ipfs/util/testutil" "testing" ) func TestRsaKeys(t *testing.T) { - sk, pk, err := GenerateKeyPair(RSA, 512, u.NewTimeSeededRand()) + sk, pk, err := tu.RandKeyPair(512) if err != nil { t.Fatal(err) } @@ -91,7 +93,7 @@ func testKeyEquals(t *testing.T, k Key) { t.Fatal("Key not equal to key with same bytes.") } - sk, pk, err := GenerateKeyPair(RSA, 512, u.NewTimeSeededRand()) + sk, pk, err := tu.RandKeyPair(512) if err != nil { t.Fatal(err) } diff --git a/namesys/resolve_test.go b/namesys/resolve_test.go index 592c344d7..35851fc32 100644 --- a/namesys/resolve_test.go +++ b/namesys/resolve_test.go @@ -3,7 +3,6 @@ package namesys import ( "testing" - ci "github.com/jbenet/go-ipfs/crypto" mockrouting "github.com/jbenet/go-ipfs/routing/mock" u "github.com/jbenet/go-ipfs/util" testutil "github.com/jbenet/go-ipfs/util/testutil" @@ -15,7 +14,7 @@ func TestRoutingResolve(t *testing.T) { resolver := NewRoutingResolver(d) publisher := NewRoutingPublisher(d) - privk, pubk, err := ci.GenerateKeyPair(ci.RSA, 512, u.NewTimeSeededRand()) + privk, pubk, err := testutil.RandKeyPair(512) if err != nil { t.Fatal(err) } diff --git a/peer/peer_test.go b/peer/peer_test.go index ecf805953..e8d1d29f3 100644 --- a/peer/peer_test.go +++ b/peer/peer_test.go @@ -1,4 +1,4 @@ -package peer +package peer_test import ( "encoding/base64" @@ -7,7 +7,9 @@ import ( "testing" ic "github.com/jbenet/go-ipfs/crypto" + . "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" + tu "github.com/jbenet/go-ipfs/util/testutil" b58 "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58" ) @@ -39,7 +41,7 @@ type keyset struct { func (ks *keyset) generate() error { var err error - ks.sk, ks.pk, err = ic.GenerateKeyPair(ic.RSA, 1024, u.NewTimeSeededRand()) + ks.sk, ks.pk, err = tu.RandKeyPair(512) if err != nil { return err } diff --git a/util/testutil/gen.go b/util/testutil/gen.go index aa06047b2..9e060fd87 100644 --- a/util/testutil/gen.go +++ b/util/testutil/gen.go @@ -17,11 +17,11 @@ import ( ) func RandKeyPair(bits int) (ci.PrivKey, ci.PubKey, error) { - return ci.GenerateKeyPair(ci.RSA, bits, crand.Reader) + return ci.GenerateKeyPairWithReader(ci.RSA, bits, u.NewTimeSeededRand()) } func SeededKeyPair(bits int, seed int64) (ci.PrivKey, ci.PubKey, error) { - return ci.GenerateKeyPair(ci.RSA, bits, u.NewSeededRand(seed)) + return ci.GenerateKeyPairWithReader(ci.RSA, bits, u.NewSeededRand(seed)) } // RandPeerID generates random "valid" peer IDs. 
it does not NEED to generate @@ -124,7 +124,7 @@ func RandPeerNetParams() (*PeerNetParams, error) { var p PeerNetParams var err error p.Addr = RandLocalTCPAddress() - p.PrivKey, p.PubKey, err = ci.GenerateKeyPair(ci.RSA, 512, u.NewTimeSeededRand()) + p.PrivKey, p.PubKey, err = RandKeyPair(512) if err != nil { return nil, err } From 5af5625805a968b2e3524090d41e517971b8883c Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 29 Dec 2014 06:32:27 +0000 Subject: [PATCH 6/8] use query for getClosestPeers --- routing/dht/routing.go | 68 ++++++++++++++++++++---------------------- 1 file changed, 32 insertions(+), 36 deletions(-) diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 8b0bb1670..98c4ce3d4 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -159,52 +159,48 @@ func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key, count int) ( peerset := pset.NewLimited(count) for _, p := range tablepeers { - out <- p + select { + case out <- p: + case <-ctx.Done(): + return nil, ctx.Err() + } peerset.Add(p) } - wg := sync.WaitGroup{} - for _, p := range tablepeers { - wg.Add(1) - go func(p peer.ID) { - dht.getClosestPeersRecurse(ctx, key, p, peerset, out) - wg.Done() - }(p) - } + query := newQuery(key, dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { + closer, err := dht.closerPeersSingle(ctx, key, p) + if err != nil { + log.Errorf("error getting closer peers: %s", err) + return nil, err + } + + var filtered []peer.PeerInfo + for _, p := range closer { + if kb.Closer(p, dht.self, key) && peerset.TryAdd(p) { + select { + case out <- p: + case <-ctx.Done(): + return nil, ctx.Err() + } + filtered = append(filtered, dht.peerstore.PeerInfo(p)) + } + } + + return &dhtQueryResult{closerPeers: filtered}, nil + }) go func() { - wg.Wait() - close(out) + defer close(out) + // run it! + _, err := query.Run(ctx, tablepeers) + if err != nil { + log.Errorf("closestPeers query run error: %s", err) + } }() return out, nil } -func (dht *IpfsDHT) getClosestPeersRecurse(ctx context.Context, key u.Key, p peer.ID, peers *pset.PeerSet, peerOut chan<- peer.ID) { - closer, err := dht.closerPeersSingle(ctx, key, p) - if err != nil { - log.Errorf("error getting closer peers: %s", err) - return - } - - wg := sync.WaitGroup{} - for _, p := range closer { - if kb.Closer(p, dht.self, key) && peers.TryAdd(p) { - select { - case peerOut <- p: - case <-ctx.Done(): - return - } - wg.Add(1) - go func(p peer.ID) { - dht.getClosestPeersRecurse(ctx, key, p, peers, peerOut) - wg.Done() - }(p) - } - } - wg.Wait() -} - func (dht *IpfsDHT) closerPeersSingle(ctx context.Context, key u.Key, p peer.ID) ([]peer.ID, error) { pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key)) if err != nil { From b4c6c874229ada3131ccdd4cc70f30e2cd404658 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 29 Dec 2014 18:22:16 +0000 Subject: [PATCH 7/8] Improve readability of getClosestPeers method. Also remove older useless code. 
--- routing/dht/routing.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 98c4ce3d4..36e281cc9 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -40,7 +40,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error return err } - pchan, err := dht.getClosestPeers(ctx, key, KValue) + pchan, err := dht.getClosestPeers(ctx, key) if err != nil { return err } @@ -116,11 +116,11 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { // Provide makes this node announce that it can provide a value for the given key func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { - log.Event(ctx, "Provide Value start", &key) - defer log.Event(ctx, "Provide Value end", &key) + log.Event(ctx, "provideBegin", &key) + defer log.Event(ctx, "provideEnd", &key) dht.providers.AddProvider(key, dht.self) - peers, err := dht.getClosestPeers(ctx, key, KValue) + peers, err := dht.getClosestPeers(ctx, key) if err != nil { return err } @@ -149,14 +149,16 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerIn return providers, nil } -func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key, count int) (<-chan peer.ID, error) { +// Kademlia 'node lookup' operation. Returns a channel of the K closest peers +// to the given key +func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key) (<-chan peer.ID, error) { tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) if len(tablepeers) == 0 { return nil, kb.ErrLookupFailure } - out := make(chan peer.ID, count) - peerset := pset.NewLimited(count) + out := make(chan peer.ID, KValue) + peerset := pset.NewLimited(KValue) for _, p := range tablepeers { select { @@ -211,10 +213,6 @@ func (dht *IpfsDHT) closerPeersSingle(ctx context.Context, key u.Key, p peer.ID) for _, pbp := range pmes.GetCloserPeers() { pid := peer.ID(pbp.GetId()) dht.peerstore.AddAddresses(pid, pbp.Addresses()) - err := dht.ensureConnectedToPeer(ctx, pid) - if err != nil { - return nil, err - } out = append(out, pid) } return out, nil From da04d267796560c37d4a36b33d50c82338c85ff2 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 2 Jan 2015 08:33:42 +0000 Subject: [PATCH 8/8] clean up test setup interface --- net/mock/mock_net.go | 2 +- routing/dht/dht_test.go | 18 +++++++++--------- util/testutil/gen.go | 4 ++-- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/net/mock/mock_net.go b/net/mock/mock_net.go index ab3359237..2be03f836 100644 --- a/net/mock/mock_net.go +++ b/net/mock/mock_net.go @@ -41,7 +41,7 @@ func New(ctx context.Context) Mocknet { } func (mn *mocknet) GenPeer() (inet.Network, error) { - sk, _, err := testutil.SeededKeyPair(512, int64(len(mn.nets))) + sk, _, err := testutil.SeededKeyPair(int64(len(mn.nets))) if err != nil { return nil, err } diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index 547e88a9c..5eeb3a2bc 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -32,9 +32,9 @@ func init() { } } -func setupDHT(ctx context.Context, t *testing.T, addr ma.Multiaddr, seed int64) *IpfsDHT { +func setupDHT(ctx context.Context, t *testing.T, addr ma.Multiaddr) *IpfsDHT { - sk, pk, err := testutil.SeededKeyPair(512, seed) + sk, pk, err := testutil.SeededKeyPair(time.Now().UnixNano()) if err != nil { t.Fatal(err) } @@ -70,7 +70,7 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer for 
i := 0; i < n; i++ { addrs[i] = testutil.RandLocalTCPAddress() - dhts[i] = setupDHT(ctx, t, addrs[i], int64(i)) + dhts[i] = setupDHT(ctx, t, addrs[i]) peers[i] = dhts[i].self } @@ -119,8 +119,8 @@ func TestPing(t *testing.T) { addrA := testutil.RandLocalTCPAddress() addrB := testutil.RandLocalTCPAddress() - dhtA := setupDHT(ctx, t, addrA, 1) - dhtB := setupDHT(ctx, t, addrB, 2) + dhtA := setupDHT(ctx, t, addrA) + dhtB := setupDHT(ctx, t, addrB) peerA := dhtA.self peerB := dhtB.self @@ -152,8 +152,8 @@ func TestValueGetSet(t *testing.T) { addrA := testutil.RandLocalTCPAddress() addrB := testutil.RandLocalTCPAddress() - dhtA := setupDHT(ctx, t, addrA, 1) - dhtB := setupDHT(ctx, t, addrB, 2) + dhtA := setupDHT(ctx, t, addrA) + dhtB := setupDHT(ctx, t, addrB) defer dhtA.Close() defer dhtB.Close() @@ -636,8 +636,8 @@ func TestConnectCollision(t *testing.T) { addrA := testutil.RandLocalTCPAddress() addrB := testutil.RandLocalTCPAddress() - dhtA := setupDHT(ctx, t, addrA, int64((rtime*2)+1)) - dhtB := setupDHT(ctx, t, addrB, int64((rtime*2)+2)) + dhtA := setupDHT(ctx, t, addrA) + dhtB := setupDHT(ctx, t, addrB) peerA := dhtA.self peerB := dhtB.self diff --git a/util/testutil/gen.go b/util/testutil/gen.go index 9e060fd87..bcf1f283e 100644 --- a/util/testutil/gen.go +++ b/util/testutil/gen.go @@ -20,8 +20,8 @@ func RandKeyPair(bits int) (ci.PrivKey, ci.PubKey, error) { return ci.GenerateKeyPairWithReader(ci.RSA, bits, u.NewTimeSeededRand()) } -func SeededKeyPair(bits int, seed int64) (ci.PrivKey, ci.PubKey, error) { - return ci.GenerateKeyPairWithReader(ci.RSA, bits, u.NewSeededRand(seed)) +func SeededKeyPair(seed int64) (ci.PrivKey, ci.PubKey, error) { + return ci.GenerateKeyPairWithReader(ci.RSA, 512, u.NewSeededRand(seed)) } // RandPeerID generates random "valid" peer IDs. it does not NEED to generate
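
Usage sketch (not part of the patches above): a minimal example of the key-generation API as it stands after patches 1/8 and 5/8, assuming the go-ipfs import paths shown in the diffs; the main wrapper and the printed message are illustrative only, not code from the series. GenerateKeyPair keeps its original two-argument signature for production callers, while GenerateKeyPairWithReader lets tests inject a seeded reader so peer identities are reproducible across runs.

package main

import (
	"fmt"

	ci "github.com/jbenet/go-ipfs/crypto"
	u "github.com/jbenet/go-ipfs/util"
)

func main() {
	// Production callers keep the old form; after patch 5/8 it delegates to
	// GenerateKeyPairWithReader with crypto/rand as the entropy source.
	if _, _, err := ci.GenerateKeyPair(ci.RSA, 1024); err != nil {
		panic(err)
	}

	// Tests supply their own entropy. A seeded reader yields the same key
	// material on every run, which the deterministic peer-creation tests
	// in this series rely on.
	sk, pk, err := ci.GenerateKeyPairWithReader(ci.RSA, 512, u.NewSeededRand(42))
	if err != nil {
		panic(err)
	}
	fmt.Printf("seeded keypair: %T / %T\n", sk, pk)
}

In test code the same behaviour is reachable through the wrappers added in util/testutil: RandKeyPair(bits) for time-seeded keys and SeededKeyPair(seed) for deterministic ones (fixed at 512 bits after patch 8/8); mocknet's GenPeer seeds with the current network count to get reproducible peers.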