Mirror of https://github.com/ipfs/kubo.git (synced 2025-06-30 09:59:13 +08:00)

Merge pull request #456 from jbenet/provides-rewrite

rewrite of provides to better select peers to send RPCs to
@@ -2,6 +2,7 @@ package core

import (
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
	syncds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
	"github.com/jbenet/go-ipfs/blocks/blockstore"
@@ -8,6 +8,7 @@ import (
	"encoding/base64"
	"errors"
	"fmt"
+	"io"

	"crypto/elliptic"
	"crypto/hmac"
@@ -74,11 +75,15 @@ type PubKey interface {
// Given a public key, generates the shared key.
type GenSharedKey func([]byte) ([]byte, error)

// Generates a keypair of the given type and bitsize
func GenerateKeyPair(typ, bits int) (PrivKey, PubKey, error) {
	return GenerateKeyPairWithReader(typ, bits, rand.Reader)
}

// Generates a keypair of the given type and bitsize
func GenerateKeyPairWithReader(typ, bits int, src io.Reader) (PrivKey, PubKey, error) {
	switch typ {
	case RSA:
-		priv, err := rsa.GenerateKey(rand.Reader, bits)
+		priv, err := rsa.GenerateKey(src, bits)
		if err != nil {
			return nil, nil, err
		}
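As an aside, a minimal sketch of how the split between the two constructors might be used: the existing entry point for ordinary callers, the reader-taking one for callers that need control over the entropy source. Only names visible in the diff above are used; the program itself is illustrative.

package main

import (
	"crypto/rand"
	"fmt"

	ci "github.com/jbenet/go-ipfs/crypto"
)

func main() {
	// GenerateKeyPair keeps its old signature and simply delegates to
	// GenerateKeyPairWithReader with crypto/rand.Reader.
	_, pub, err := ci.GenerateKeyPair(ci.RSA, 512)
	if err != nil {
		panic(err)
	}
	fmt.Println(pub != nil)

	// Callers that need a specific entropy source (e.g. deterministic tests)
	// can now pass their own io.Reader explicitly.
	_, pub2, err := ci.GenerateKeyPairWithReader(ci.RSA, 512, rand.Reader)
	if err != nil {
		panic(err)
	}
	fmt.Println(pub2 != nil)
}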
@@ -1,12 +1,15 @@
-package crypto
+package crypto_test

import (
	. "github.com/jbenet/go-ipfs/crypto"

	"bytes"
	tu "github.com/jbenet/go-ipfs/util/testutil"
	"testing"
)

func TestRsaKeys(t *testing.T) {
-	sk, pk, err := GenerateKeyPair(RSA, 512)
+	sk, pk, err := tu.RandKeyPair(512)
	if err != nil {
		t.Fatal(err)
	}
@@ -90,7 +93,7 @@ func testKeyEquals(t *testing.T, k Key) {
		t.Fatal("Key not equal to key with same bytes.")
	}

-	sk, pk, err := GenerateKeyPair(RSA, 512)
+	sk, pk, err := tu.RandKeyPair(512)
	if err != nil {
		t.Fatal(err)
	}
@@ -3,7 +3,6 @@ package namesys
import (
	"testing"

-	ci "github.com/jbenet/go-ipfs/crypto"
	mockrouting "github.com/jbenet/go-ipfs/routing/mock"
	u "github.com/jbenet/go-ipfs/util"
	testutil "github.com/jbenet/go-ipfs/util/testutil"
@@ -15,7 +14,7 @@ func TestRoutingResolve(t *testing.T) {
	resolver := NewRoutingResolver(d)
	publisher := NewRoutingPublisher(d)

-	privk, pubk, err := ci.GenerateKeyPair(ci.RSA, 512)
+	privk, pubk, err := testutil.RandKeyPair(512)
	if err != nil {
		t.Fatal(err)
	}
@@ -41,7 +41,7 @@ func New(ctx context.Context) Mocknet {
}

func (mn *mocknet) GenPeer() (inet.Network, error) {
-	sk, _, err := testutil.RandKeyPair(512)
+	sk, _, err := testutil.SeededKeyPair(int64(len(mn.nets)))
	if err != nil {
		return nil, err
	}
@@ -1,4 +1,4 @@
-package peer
+package peer_test

import (
	"encoding/base64"
@@ -7,7 +7,9 @@ import (
	"testing"

	ic "github.com/jbenet/go-ipfs/crypto"
	. "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
	tu "github.com/jbenet/go-ipfs/util/testutil"

	b58 "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58"
)
@@ -39,7 +41,7 @@ type keyset struct {

func (ks *keyset) generate() error {
	var err error
-	ks.sk, ks.pk, err = ic.GenerateKeyPair(ic.RSA, 1024)
+	ks.sk, ks.pk, err = tu.RandKeyPair(512)
	if err != nil {
		return err
	}
@@ -102,10 +102,9 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error {
	return nil
}

-// putValueToNetwork stores the given key/value pair at the peer 'p'
-// meaning: it sends a PUT_VALUE message to p
-func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.ID,
-	key string, rec *pb.Record) error {
+// putValueToPeer stores the given key/value pair at the peer 'p'
+func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID,
+	key u.Key, rec *pb.Record) error {

	pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
	pmes.Record = rec
@@ -238,12 +237,12 @@ func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {
}

// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
-func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.PeerInfo, *kb.RoutingTable) {
+func (dht *IpfsDHT) FindLocal(id peer.ID) peer.PeerInfo {
	p := dht.routingTable.Find(id)
	if p != "" {
-		return dht.peerstore.PeerInfo(p), dht.routingTable
+		return dht.peerstore.PeerInfo(p)
	}
-	return peer.PeerInfo{}, nil
+	return peer.PeerInfo{}
}

// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
@@ -257,26 +256,6 @@ func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key u.Ke
	return dht.sendRequest(ctx, p, pmes)
}

-func (dht *IpfsDHT) addProviders(key u.Key, pbps []*pb.Message_Peer) []peer.ID {
-	peers := pb.PBPeersToPeerInfos(pbps)
-
-	var provArr []peer.ID
-	for _, pi := range peers {
-		p := pi.ID
-
-		// Dont add outselves to the list
-		if p == dht.self {
-			continue
-		}
-
-		log.Debugf("%s adding provider: %s for %s", dht.self, p, key)
-		// TODO(jbenet) ensure providers is idempotent
-		dht.providers.AddProvider(key, p)
-		provArr = append(provArr, p)
-	}
-	return provArr
-}
-
// nearestPeersToQuery returns the routing tables closest peers.
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
	key := u.Key(pmes.GetKey())
@@ -285,7 +264,7 @@ func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
}

// betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
-func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.ID {
+func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID {
	closer := dht.nearestPeersToQuery(pmes, count)

	// no node? nil
@@ -302,11 +281,16 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.ID {
	}

	var filtered []peer.ID
-	for _, p := range closer {
+	for _, clp := range closer {
+		// Dont send a peer back themselves
+		if p == clp {
+			continue
+		}
+
		// must all be closer than self
		key := u.Key(pmes.GetKey())
-		if !kb.Closer(dht.self, p, key) {
-			filtered = append(filtered, p)
+		if !kb.Closer(dht.self, clp, key) {
+			filtered = append(filtered, clp)
		}
	}

@@ -323,23 +307,6 @@ func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, p peer.ID) error
	return dht.network.DialPeer(ctx, p)
}

-//TODO: this should be smarter about which keys it selects.
-func (dht *IpfsDHT) loadProvidableKeys() error {
-	kl, err := dht.datastore.KeyList()
-	if err != nil {
-		return err
-	}
-	for _, dsk := range kl {
-		k := u.KeyFromDsKey(dsk)
-		if len(k) == 0 {
-			log.Errorf("loadProvidableKeys error: %v", dsk)
-		}
-
-		dht.providers.AddProvider(k, dht.self)
-	}
-	return nil
-}
-
// PingRoutine periodically pings nearest neighbors.
func (dht *IpfsDHT) PingRoutine(t time.Duration) {
	defer dht.Children().Done()
@@ -14,7 +14,6 @@ import (
	dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"

-	// ci "github.com/jbenet/go-ipfs/crypto"
	inet "github.com/jbenet/go-ipfs/net"
	peer "github.com/jbenet/go-ipfs/peer"
	routing "github.com/jbenet/go-ipfs/routing"
@@ -35,7 +34,7 @@ func init() {

func setupDHT(ctx context.Context, t *testing.T, addr ma.Multiaddr) *IpfsDHT {

-	sk, pk, err := testutil.RandKeyPair(512)
+	sk, pk, err := testutil.SeededKeyPair(time.Now().UnixNano())
	if err != nil {
		t.Fatal(err)
	}
@@ -487,12 +486,7 @@ func TestLayeredGet(t *testing.T) {
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])

-	err := dhts[3].putLocal(u.Key("/v/hello"), []byte("world"))
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	err = dhts[3].Provide(ctx, u.Key("/v/hello"))
+	err := dhts[3].Provide(ctx, u.Key("/v/hello"))
	if err != nil {
		t.Fatal(err)
	}
@@ -47,9 +47,8 @@ func TestGetFailures(t *testing.T) {
		t.Fatal("Did not get expected error!")
	}

-	msgs := make(chan *pb.Message, 100)
+	t.Log("Timeout test passed.")

	// u.POut("NotFound Test\n")
	// Reply with failures to every message
	nets[1].SetHandler(inet.ProtocolDHT, func(s inet.Stream) {
		defer s.Close()
@@ -68,8 +67,6 @@ func TestGetFailures(t *testing.T) {
		if err := pbw.WriteMsg(resp); err != nil {
			panic(err)
		}

-		msgs <- resp
	})

	// This one should fail with NotFound
@@ -83,6 +80,8 @@ func TestGetFailures(t *testing.T) {
		t.Fatal("expected error, got none.")
	}

+	t.Log("ErrNotFound check passed!")

	// Now we test this DHT's handleGetValue failure
	{
		typ := pb.Message_GET_VALUE
@@ -39,7 +39,7 @@ func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
}

func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
-	log.Debugf("%s handleGetValue for key: %s\n", dht.self, pmes.GetKey())
+	log.Debugf("%s handleGetValue for key: %s", dht.self, pmes.GetKey())

	// setup response
	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
@@ -93,7 +93,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
	}

	// Find closest peer on given cluster to desired key and reply with that info
-	closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
+	closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
	closerinfos := peer.PeerInfos(dht.peerstore, closer)
	if closer != nil {
		for _, pi := range closerinfos {
@@ -127,7 +127,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
	}

	err = dht.datastore.Put(dskey, data)
-	log.Debugf("%s handlePutValue %v\n", dht.self, dskey)
+	log.Debugf("%s handlePutValue %v", dht.self, dskey)
	return pmes, err
}

@@ -144,11 +144,11 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess
	if peer.ID(pmes.GetKey()) == dht.self {
		closest = []peer.ID{dht.self}
	} else {
-		closest = dht.betterPeersToQuery(pmes, CloserPeerCount)
+		closest = dht.betterPeersToQuery(pmes, p, CloserPeerCount)
	}

	if closest == nil {
-		log.Debugf("handleFindPeer: could not find anything.")
+		log.Warningf("handleFindPeer: could not find anything.")
		return resp, nil
	}

@@ -189,7 +189,7 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
	}

	// Also send closer peers.
-	closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
+	closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
	if closer != nil {
		infos := peer.PeerInfos(dht.peerstore, providers)
		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.network, infos)
@@ -7,8 +7,8 @@ import (
	peer "github.com/jbenet/go-ipfs/peer"
	queue "github.com/jbenet/go-ipfs/peer/queue"
	"github.com/jbenet/go-ipfs/routing"
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
	u "github.com/jbenet/go-ipfs/util"
	pset "github.com/jbenet/go-ipfs/util/peerset"
	todoctr "github.com/jbenet/go-ipfs/util/todocounter"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
@@ -71,7 +71,7 @@ type dhtQueryRunner struct {
	peersToQuery *queue.ChanQueue

	// peersSeen are all the peers queried. used to prevent querying same peer 2x
-	peersSeen peer.Set
+	peersSeen *pset.PeerSet

	// rateLimit is a channel used to rate limit our processing (semaphore)
	rateLimit chan struct{}
@@ -97,7 +97,7 @@ func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
		query:          q,
		peersToQuery:   queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)),
		peersRemaining: todoctr.NewSyncCounter(),
-		peersSeen:      peer.Set{},
+		peersSeen:      pset.New(),
		rateLimit:      make(chan struct{}, q.concurrency),
		cg:             ctxgroup.WithContext(ctx),
	}
@@ -117,7 +117,7 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {

	// add all the peers we got first.
	for _, p := range peers {
-		r.addPeerToQuery(r.cg.Context(), p, "") // don't have access to self here...
+		r.addPeerToQuery(r.cg.Context(), p)
	}

	// go do this thing.
@@ -153,32 +153,17 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
	return nil, err
}

-func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID, benchmark peer.ID) {
+func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) {
	// if new peer is ourselves...
	if next == r.query.dialer.LocalPeer() {
		return
	}

-	// if new peer further away than whom we got it from, don't bother (loops)
-	// TODO----------- this benchmark should be replaced by a heap:
-	// we should be doing the s/kademlia "continue to search"
-	// (i.e. put all of them in a heap sorted by dht distance and then just
-	// pull from the the top until a) you exhaust all peers you get,
-	// b) you succeed, c) your context expires.
-	if benchmark != "" && kb.Closer(benchmark, next, r.query.key) {
+	if !r.peersSeen.TryAdd(next) {
+		log.Debug("query peer was already seen")
		return
	}

-	// if already seen, no need.
-	r.Lock()
-	_, found := r.peersSeen[next]
-	if found {
-		r.Unlock()
-		return
-	}
-	r.peersSeen[next] = struct{}{}
-	r.Unlock()
-
	log.Debugf("adding peer to query: %v", next)

	// do this after unlocking to prevent possible deadlocks.
@@ -278,7 +263,7 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
	}

	r.query.dialer.Peerstore().AddAddresses(next.ID, next.Addrs)
-	r.addPeerToQuery(cg.Context(), next.ID, p)
+	r.addPeerToQuery(cg.Context(), next.ID)
	log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
		}
	} else {
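For illustration, a small sketch of the TryAdd pattern the query runner now relies on: one atomic check-and-add on a PeerSet instead of a mutex-guarded map. Import paths are the ones shown in the diff; the peer IDs are placeholder strings, assuming peer.ID is the string-based type used throughout.

package main

import (
	"fmt"
	"sync"

	peer "github.com/jbenet/go-ipfs/peer"
	pset "github.com/jbenet/go-ipfs/util/peerset"
)

func main() {
	seen := pset.New()
	ids := []peer.ID{"QmA", "QmB", "QmA", "QmC", "QmB"}

	var wg sync.WaitGroup
	for _, id := range ids {
		wg.Add(1)
		go func(id peer.ID) {
			defer wg.Done()
			// TryAdd reports whether the peer was newly added, so only the
			// first goroutine to see a given ID proceeds; duplicates bail out.
			if seen.TryAdd(id) {
				fmt.Println("querying", id)
			}
		}(id)
	}
	wg.Wait()
}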
@@ -40,19 +40,24 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error
		return err
	}

-	peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue)
+	pchan, err := dht.getClosestPeers(ctx, key)
+	if err != nil {
+		return err
+	}

-	query := newQuery(key, dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
-		log.Debugf("%s PutValue qry part %v", dht.self, p)
-		err := dht.putValueToNetwork(ctx, p, string(key), rec)
-		if err != nil {
-			return nil, err
-		}
-		return &dhtQueryResult{success: true}, nil
-	})
-
-	_, err = query.Run(ctx, peers)
-	return err
+	wg := sync.WaitGroup{}
+	for p := range pchan {
+		wg.Add(1)
+		go func(p peer.ID) {
+			defer wg.Done()
+			err := dht.putValueToPeer(ctx, p, key, rec)
+			if err != nil {
+				log.Errorf("failed putting value to peer: %s", err)
+			}
+		}(p)
+	}
+	wg.Wait()
+	return nil
}

// GetValue searches for the value corresponding to given Key.
@@ -111,20 +116,27 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
// Provide makes this node announce that it can provide a value for the given key
func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {

	log.Event(ctx, "provideBegin", &key)
	defer log.Event(ctx, "provideEnd", &key)
	dht.providers.AddProvider(key, dht.self)
-	peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), PoolSize)
-	if len(peers) == 0 {
-		return nil

+	peers, err := dht.getClosestPeers(ctx, key)
+	if err != nil {
+		return err
	}

-	//TODO FIX: this doesn't work! it needs to be sent to the actual nearest peers.
-	// `peers` are the closest peers we have, not the ones that should get the value.
-	for _, p := range peers {
-		err := dht.putProvider(ctx, p, string(key))
-		if err != nil {
-			return err
-		}
+	wg := sync.WaitGroup{}
+	for p := range peers {
+		wg.Add(1)
+		go func(p peer.ID) {
+			defer wg.Done()
+			err := dht.putProvider(ctx, p, string(key))
+			if err != nil {
+				log.Error(err)
+			}
+		}(p)
	}
+	wg.Wait()
	return nil
}
@@ -137,6 +149,75 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerIn
	return providers, nil
}

// Kademlia 'node lookup' operation. Returns a channel of the K closest peers
// to the given key
func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key) (<-chan peer.ID, error) {
	tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
	if len(tablepeers) == 0 {
		return nil, kb.ErrLookupFailure
	}

	out := make(chan peer.ID, KValue)
	peerset := pset.NewLimited(KValue)

	for _, p := range tablepeers {
		select {
		case out <- p:
		case <-ctx.Done():
			return nil, ctx.Err()
		}
		peerset.Add(p)
	}

	query := newQuery(key, dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
		closer, err := dht.closerPeersSingle(ctx, key, p)
		if err != nil {
			log.Errorf("error getting closer peers: %s", err)
			return nil, err
		}

		var filtered []peer.PeerInfo
		for _, p := range closer {
			if kb.Closer(p, dht.self, key) && peerset.TryAdd(p) {
				select {
				case out <- p:
				case <-ctx.Done():
					return nil, ctx.Err()
				}
				filtered = append(filtered, dht.peerstore.PeerInfo(p))
			}
		}

		return &dhtQueryResult{closerPeers: filtered}, nil
	})

	go func() {
		defer close(out)
		// run it!
		_, err := query.Run(ctx, tablepeers)
		if err != nil {
			log.Errorf("closestPeers query run error: %s", err)
		}
	}()

	return out, nil
}

func (dht *IpfsDHT) closerPeersSingle(ctx context.Context, key u.Key, p peer.ID) ([]peer.ID, error) {
	pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key))
	if err != nil {
		return nil, err
	}

	var out []peer.ID
	for _, pbp := range pmes.GetCloserPeers() {
		pid := peer.ID(pbp.GetId())
		dht.peerstore.AddAddresses(pid, pbp.Addresses())
		out = append(out, pid)
	}
	return out, nil
}

// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.
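A rough sketch of the consumption pattern getClosestPeers enables: callers range over the channel and fan work out per peer, as PutValue and Provide above now do. fanOut is a hypothetical helper, peer.ID is assumed to be string-based, and only the channel-plus-WaitGroup shape is the point here.

package main

import (
	"fmt"
	"sync"

	peer "github.com/jbenet/go-ipfs/peer"
)

// fanOut drains a channel of peers (like the one returned by getClosestPeers)
// and runs work against each peer concurrently, waiting for all of it to finish.
func fanOut(pchan <-chan peer.ID, work func(peer.ID)) {
	var wg sync.WaitGroup
	for p := range pchan { // the producer closes the channel when the query ends
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
			work(p)
		}(p)
	}
	wg.Wait()
}

func main() {
	ch := make(chan peer.ID, 3)
	ch <- "QmA"
	ch <- "QmB"
	ch <- "QmC"
	close(ch)
	fanOut(ch, func(p peer.ID) { fmt.Println("sending provide RPC to", p) })
}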
@@ -149,6 +230,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int

func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.PeerInfo) {
	defer close(peerOut)
+	defer log.Event(ctx, "findProviders end", &key)
	log.Debugf("%s FindProviders %s", dht.self, key)

	ps := pset.NewLimited(count)
@@ -207,40 +289,11 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
	}
}

-func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *pset.PeerSet, count int, out chan peer.PeerInfo) {
-	var wg sync.WaitGroup
-	peerInfos := pb.PBPeersToPeerInfos(peers)
-	for _, pi := range peerInfos {
-		wg.Add(1)
-		go func(pi peer.PeerInfo) {
-			defer wg.Done()
-
-			p := pi.ID
-			if err := dht.ensureConnectedToPeer(ctx, p); err != nil {
-				log.Errorf("%s", err)
-				return
-			}
-
-			dht.providers.AddProvider(k, p)
-			if ps.TryAdd(p) {
-				select {
-				case out <- pi:
-				case <-ctx.Done():
-					return
-				}
-			} else if ps.Size() >= count {
-				return
-			}
-		}(pi)
-	}
-	wg.Wait()
-}
-
// FindPeer searches for a peer with given ID.
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, error) {

	// Check if were already connected to them
-	if pi, _ := dht.FindLocal(id); pi.ID != "" {
+	if pi := dht.FindLocal(id); pi.ID != "" {
		return pi, nil
	}
routing/kbucket/sorting.go (new file, 59 lines)
@@ -0,0 +1,59 @@
package kbucket

import (
	"container/list"
	peer "github.com/jbenet/go-ipfs/peer"
	"sort"
)

// A helper struct to sort peers by their distance to the local node
type peerDistance struct {
	p        peer.ID
	distance ID
}

// peerSorterArr implements sort.Interface to sort peers by xor distance
type peerSorterArr []*peerDistance

func (p peerSorterArr) Len() int      { return len(p) }
func (p peerSorterArr) Swap(a, b int) { p[a], p[b] = p[b], p[a] }
func (p peerSorterArr) Less(a, b int) bool {
	return p[a].distance.less(p[b].distance)
}

//

func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr {
	for e := peerList.Front(); e != nil; e = e.Next() {
		p := e.Value.(peer.ID)
		pID := ConvertPeerID(p)
		pd := peerDistance{
			p:        p,
			distance: xor(target, pID),
		}
		peerArr = append(peerArr, &pd)
		if e == nil {
			log.Debug("list element was nil")
			return peerArr
		}
	}
	return peerArr
}

func SortClosestPeers(peers []peer.ID, target ID) []peer.ID {
	var psarr peerSorterArr
	for _, p := range peers {
		pID := ConvertPeerID(p)
		pd := &peerDistance{
			p:        p,
			distance: xor(target, pID),
		}
		psarr = append(psarr, pd)
	}
	sort.Sort(psarr)
	var out []peer.ID
	for _, p := range psarr {
		out = append(out, p.p)
	}
	return out
}
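A brief usage sketch of the newly exported sorter: the key and peer IDs below are placeholders, while ConvertKey and SortClosestPeers are the functions shown in this pull request. Candidates come back ordered by XOR distance to the target, nearest first, which supports the peer selection the PR title describes.

package main

import (
	"fmt"

	peer "github.com/jbenet/go-ipfs/peer"
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
)

func main() {
	target := kb.ConvertKey("/v/hello")                 // key being looked up (placeholder)
	peers := []peer.ID{"QmPeerA", "QmPeerB", "QmPeerC"} // candidate peers (placeholders)

	// SortClosestPeers orders the candidates by XOR distance to the target,
	// nearest first.
	for _, p := range kb.SortClosestPeers(peers, target) {
		fmt.Println(p)
	}
}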
@@ -2,7 +2,6 @@
package kbucket

import (
	"container/list"
	"fmt"
-	"sort"
	"sync"
@@ -103,40 +102,6 @@ func (rt *RoutingTable) nextBucket() peer.ID {
	return ""
}

-// A helper struct to sort peers by their distance to the local node
-type peerDistance struct {
-	p        peer.ID
-	distance ID
-}
-
-// peerSorterArr implements sort.Interface to sort peers by xor distance
-type peerSorterArr []*peerDistance
-
-func (p peerSorterArr) Len() int      { return len(p) }
-func (p peerSorterArr) Swap(a, b int) { p[a], p[b] = p[b], p[a] }
-func (p peerSorterArr) Less(a, b int) bool {
-	return p[a].distance.less(p[b].distance)
-}
-
-//
-
-func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr {
-	for e := peerList.Front(); e != nil; e = e.Next() {
-		p := e.Value.(peer.ID)
-		pID := ConvertPeerID(p)
-		pd := peerDistance{
-			p:        p,
-			distance: xor(target, pID),
-		}
-		peerArr = append(peerArr, &pd)
-		if e == nil {
-			log.Debug("list element was nil")
-			return peerArr
-		}
-	}
-	return peerArr
-}
-
// Find a specific peer by ID or return nil
func (rt *RoutingTable) Find(id peer.ID) peer.ID {
	srch := rt.NearestPeers(ConvertPeerID(id), 1)
@@ -17,7 +17,11 @@ import (
)

func RandKeyPair(bits int) (ci.PrivKey, ci.PubKey, error) {
-	return ci.GenerateKeyPair(ci.RSA, bits)
+	return ci.GenerateKeyPairWithReader(ci.RSA, bits, u.NewTimeSeededRand())
}

+func SeededKeyPair(seed int64) (ci.PrivKey, ci.PubKey, error) {
+	return ci.GenerateKeyPairWithReader(ci.RSA, 512, u.NewSeededRand(seed))
+}

// RandPeerID generates random "valid" peer IDs. it does not NEED to generate
@@ -120,7 +124,7 @@ func RandPeerNetParams() (*PeerNetParams, error) {
	var p PeerNetParams
	var err error
	p.Addr = RandLocalTCPAddress()
-	p.PrivKey, p.PubKey, err = ci.GenerateKeyPair(ci.RSA, 512)
+	p.PrivKey, p.PubKey, err = RandKeyPair(512)
	if err != nil {
		return nil, err
	}
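To show the intent, a small sketch of how seeded key generation should keep test identities stable across runs. Equals is assumed from the crypto Key interface exercised by testKeyEquals earlier in this diff; everything else uses names shown above.

package main

import (
	"fmt"

	testutil "github.com/jbenet/go-ipfs/util/testutil"
)

func main() {
	// The same seed drives the same byte stream, so both calls should yield
	// the same RSA keypair, which makes test fixtures reproducible.
	sk1, _, err := testutil.SeededKeyPair(42)
	if err != nil {
		panic(err)
	}
	sk2, _, err := testutil.SeededKeyPair(42)
	if err != nil {
		panic(err)
	}
	fmt.Println(sk1.Equals(sk2)) // expected: true (Equals assumed on the Key interface)
}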
@@ -107,6 +107,13 @@ func NewTimeSeededRand() io.Reader {
	}
}

+func NewSeededRand(seed int64) io.Reader {
+	src := rand.NewSource(seed)
+	return &randGen{
+		Rand: *rand.New(src),
+	}
+}

func (r *randGen) Read(p []byte) (n int, err error) {
	for i := 0; i < len(p); i++ {
		p[i] = byte(r.Rand.Intn(255))
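Finally, a sketch of the underlying primitive: two readers built from the same seed should produce the same byte stream, which is what makes SeededKeyPair above deterministic. The util import path is the one used throughout the diff.

package main

import (
	"bytes"
	"fmt"
	"io"

	u "github.com/jbenet/go-ipfs/util"
)

func main() {
	buf1 := make([]byte, 16)
	buf2 := make([]byte, 16)

	// NewSeededRand returns an io.Reader backed by math/rand with a fixed seed.
	io.ReadFull(u.NewSeededRand(7), buf1)
	io.ReadFull(u.NewSeededRand(7), buf2)

	fmt.Println(bytes.Equal(buf1, buf2)) // expected: true
}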