1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-28 08:47:42 +08:00

moved routing table code into its own package

This commit is contained in:
Jeromy
2014-08-08 19:58:42 -07:00
parent 1eaeb3ba29
commit 9f7604378c
7 changed files with 30 additions and 28 deletions

View File

@ -10,6 +10,7 @@ import (
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
swarm "github.com/jbenet/go-ipfs/swarm" swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
kb "github.com/jbenet/go-ipfs/routing/kbucket"
ma "github.com/jbenet/go-multiaddr" ma "github.com/jbenet/go-multiaddr"
@ -25,7 +26,7 @@ import (
type IpfsDHT struct { type IpfsDHT struct {
// Array of routing tables for differently distanced nodes // Array of routing tables for differently distanced nodes
// NOTE: (currently, only a single table is used) // NOTE: (currently, only a single table is used)
routes []*RoutingTable routes []*kb.RoutingTable
network *swarm.Swarm network *swarm.Swarm
@ -84,8 +85,8 @@ func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
dht.listeners = make(map[uint64]*listenInfo) dht.listeners = make(map[uint64]*listenInfo)
dht.providers = make(map[u.Key][]*providerInfo) dht.providers = make(map[u.Key][]*providerInfo)
dht.shutdown = make(chan struct{}) dht.shutdown = make(chan struct{})
dht.routes = make([]*RoutingTable, 1) dht.routes = make([]*kb.RoutingTable, 1)
dht.routes[0] = NewRoutingTable(20, convertPeerID(p.ID)) dht.routes[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID))
dht.birth = time.Now() dht.birth = time.Now()
return dht, nil return dht, nil
} }
@ -253,7 +254,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
} }
} else if err == ds.ErrNotFound { } else if err == ds.ErrNotFound {
// Find closest peer(s) to desired key and reply with that info // Find closest peer(s) to desired key and reply with that info
closer := dht.routes[0].NearestPeer(convertKey(u.Key(pmes.GetKey()))) closer := dht.routes[0].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
resp = &DHTMessage{ resp = &DHTMessage{
Response: true, Response: true,
Id: *pmes.Id, Id: *pmes.Id,
@ -290,7 +291,7 @@ func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *PBDHTMessage) {
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) { func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) {
success := true success := true
u.POut("handleFindPeer: searching for '%s'", peer.ID(pmes.GetKey()).Pretty()) u.POut("handleFindPeer: searching for '%s'", peer.ID(pmes.GetKey()).Pretty())
closest := dht.routes[0].NearestPeer(convertKey(u.Key(pmes.GetKey()))) closest := dht.routes[0].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
if closest == nil { if closest == nil {
u.PErr("handleFindPeer: could not find anything.") u.PErr("handleFindPeer: could not find anything.")
success = false success = false
@ -432,7 +433,7 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) {
} }
dht.diaglock.Unlock() dht.diaglock.Unlock()
seq := dht.routes[0].NearestPeers(convertPeerID(dht.self.ID), 10) seq := dht.routes[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
listen_chan := dht.ListenFor(pmes.GetId(), len(seq), time.Second*30) listen_chan := dht.ListenFor(pmes.GetId(), len(seq), time.Second*30)
for _, ps := range seq { for _, ps := range seq {

View File

@ -37,7 +37,7 @@ func (dht *IpfsDHT) getDiagInfo() *diagInfo {
di.LifeSpan = time.Since(dht.birth) di.LifeSpan = time.Since(dht.birth)
di.Keys = nil // Currently no way to query datastore di.Keys = nil // Currently no way to query datastore
for _,p := range dht.routes[0].listpeers() { for _,p := range dht.routes[0].Listpeers() {
di.Connections = append(di.Connections, connDiagInfo{p.GetLatency(), p.ID}) di.Connections = append(di.Connections, connDiagInfo{p.GetLatency(), p.ID})
} }
return di return di

View File

@ -12,6 +12,7 @@ import (
ma "github.com/jbenet/go-multiaddr" ma "github.com/jbenet/go-multiaddr"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
kb "github.com/jbenet/go-ipfs/routing/kbucket"
swarm "github.com/jbenet/go-ipfs/swarm" swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
) )
@ -33,7 +34,7 @@ func GenerateMessageID() uint64 {
// This is the top level "Store" operation of the DHT // This is the top level "Store" operation of the DHT
func (s *IpfsDHT) PutValue(key u.Key, value []byte) error { func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
var p *peer.Peer var p *peer.Peer
p = s.routes[0].NearestPeer(convertKey(key)) p = s.routes[0].NearestPeer(kb.ConvertKey(key))
if p == nil { if p == nil {
return errors.New("Table returned nil peer!") return errors.New("Table returned nil peer!")
} }
@ -46,7 +47,7 @@ func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
// returned along with util.ErrSearchIncomplete // returned along with util.ErrSearchIncomplete
func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
var p *peer.Peer var p *peer.Peer
p = s.routes[0].NearestPeer(convertKey(key)) p = s.routes[0].NearestPeer(kb.ConvertKey(key))
if p == nil { if p == nil {
return nil, errors.New("Table returned nil peer!") return nil, errors.New("Table returned nil peer!")
} }
@ -90,7 +91,7 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
// Announce that this node can provide value for given key // Announce that this node can provide value for given key
func (s *IpfsDHT) Provide(key u.Key) error { func (s *IpfsDHT) Provide(key u.Key) error {
peers := s.routes[0].NearestPeers(convertKey(key), PoolSize) peers := s.routes[0].NearestPeers(kb.ConvertKey(key), PoolSize)
if len(peers) == 0 { if len(peers) == 0 {
//return an error //return an error
} }
@ -110,7 +111,7 @@ func (s *IpfsDHT) Provide(key u.Key) error {
// FindProviders searches for peers who can provide the value for given key. // FindProviders searches for peers who can provide the value for given key.
func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) { func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
p := s.routes[0].NearestPeer(convertKey(key)) p := s.routes[0].NearestPeer(kb.ConvertKey(key))
pmes := DHTMessage{ pmes := DHTMessage{
Type: PBDHTMessage_GET_PROVIDERS, Type: PBDHTMessage_GET_PROVIDERS,
@ -168,7 +169,7 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,
// FindPeer searches for a peer with given ID. // FindPeer searches for a peer with given ID.
func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) { func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
p := s.routes[0].NearestPeer(convertPeerID(id)) p := s.routes[0].NearestPeer(kb.ConvertPeerID(id))
pmes := DHTMessage{ pmes := DHTMessage{
Type: PBDHTMessage_FIND_NODE, Type: PBDHTMessage_FIND_NODE,
@ -242,7 +243,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) { func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
u.DOut("Begin Diagnostic") u.DOut("Begin Diagnostic")
//Send to N closest peers //Send to N closest peers
targets := dht.routes[0].NearestPeers(convertPeerID(dht.self.ID), 10) targets := dht.routes[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
// TODO: Add timeout to this struct so nodes know when to return // TODO: Add timeout to this struct so nodes know when to return
pmes := DHTMessage{ pmes := DHTMessage{

View File

@ -48,7 +48,7 @@ func (b *Bucket) Split(cpl int, target ID) *Bucket {
out := list.New() out := list.New()
e := bucket_list.Front() e := bucket_list.Front()
for e != nil { for e != nil {
peer_id := convertPeerID(e.Value.(*peer.Peer).ID) peer_id := ConvertPeerID(e.Value.(*peer.Peer).ID)
peer_cpl := prefLen(peer_id, target) peer_cpl := prefLen(peer_id, target)
if peer_cpl > cpl { if peer_cpl > cpl {
cur := e cur := e

View File

@ -36,7 +36,7 @@ func NewRoutingTable(bucketsize int, local_id ID) *RoutingTable {
func (rt *RoutingTable) Update(p *peer.Peer) *peer.Peer { func (rt *RoutingTable) Update(p *peer.Peer) *peer.Peer {
rt.tabLock.Lock() rt.tabLock.Lock()
defer rt.tabLock.Unlock() defer rt.tabLock.Unlock()
peer_id := convertPeerID(p.ID) peer_id := ConvertPeerID(p.ID)
cpl := xor(peer_id, rt.local).commonPrefixLen() cpl := xor(peer_id, rt.local).commonPrefixLen()
b_id := cpl b_id := cpl
@ -97,7 +97,7 @@ func (p peerSorterArr) Less(a, b int) bool {
func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr { func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr {
for e := peerList.Front(); e != nil; e = e.Next() { for e := peerList.Front(); e != nil; e = e.Next() {
p := e.Value.(*peer.Peer) p := e.Value.(*peer.Peer)
p_id := convertPeerID(p.ID) p_id := ConvertPeerID(p.ID)
pd := peerDistance{ pd := peerDistance{
p: p, p: p,
distance: xor(target, p_id), distance: xor(target, p_id),
@ -173,7 +173,7 @@ func (rt *RoutingTable) Size() int {
} }
// NOTE: This is potentially unsafe... use at your own risk // NOTE: This is potentially unsafe... use at your own risk
func (rt *RoutingTable) listpeers() []*peer.Peer { func (rt *RoutingTable) Listpeers() []*peer.Peer {
var peers []*peer.Peer var peers []*peer.Peer
for _,buck := range rt.Buckets { for _,buck := range rt.Buckets {
for e := buck.getIter(); e != nil; e = e.Next() { for e := buck.getIter(); e != nil; e = e.Next() {

View File

@ -36,7 +36,7 @@ func TestBucket(t *testing.T) {
} }
local := _randPeer() local := _randPeer()
local_id := convertPeerID(local.ID) local_id := ConvertPeerID(local.ID)
i := rand.Intn(len(peers)) i := rand.Intn(len(peers))
e := b.Find(peers[i].ID) e := b.Find(peers[i].ID)
@ -44,10 +44,10 @@ func TestBucket(t *testing.T) {
t.Errorf("Failed to find peer: %v", peers[i]) t.Errorf("Failed to find peer: %v", peers[i])
} }
spl := b.Split(0, convertPeerID(local.ID)) spl := b.Split(0, ConvertPeerID(local.ID))
llist := (*list.List)(b) llist := (*list.List)(b)
for e := llist.Front(); e != nil; e = e.Next() { for e := llist.Front(); e != nil; e = e.Next() {
p := convertPeerID(e.Value.(*peer.Peer).ID) p := ConvertPeerID(e.Value.(*peer.Peer).ID)
cpl := xor(p, local_id).commonPrefixLen() cpl := xor(p, local_id).commonPrefixLen()
if cpl > 0 { if cpl > 0 {
t.Fatalf("Split failed. found id with cpl > 0 in 0 bucket") t.Fatalf("Split failed. found id with cpl > 0 in 0 bucket")
@ -56,7 +56,7 @@ func TestBucket(t *testing.T) {
rlist := (*list.List)(spl) rlist := (*list.List)(spl)
for e := rlist.Front(); e != nil; e = e.Next() { for e := rlist.Front(); e != nil; e = e.Next() {
p := convertPeerID(e.Value.(*peer.Peer).ID) p := ConvertPeerID(e.Value.(*peer.Peer).ID)
cpl := xor(p, local_id).commonPrefixLen() cpl := xor(p, local_id).commonPrefixLen()
if cpl == 0 { if cpl == 0 {
t.Fatalf("Split failed. found id with cpl == 0 in non 0 bucket") t.Fatalf("Split failed. found id with cpl == 0 in non 0 bucket")
@ -67,7 +67,7 @@ func TestBucket(t *testing.T) {
// Right now, this just makes sure that it doesnt hang or crash // Right now, this just makes sure that it doesnt hang or crash
func TestTableUpdate(t *testing.T) { func TestTableUpdate(t *testing.T) {
local := _randPeer() local := _randPeer()
rt := NewRoutingTable(10, convertPeerID(local.ID)) rt := NewRoutingTable(10, ConvertPeerID(local.ID))
peers := make([]*peer.Peer, 100) peers := make([]*peer.Peer, 100)
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
@ -93,7 +93,7 @@ func TestTableUpdate(t *testing.T) {
func TestTableFind(t *testing.T) { func TestTableFind(t *testing.T) {
local := _randPeer() local := _randPeer()
rt := NewRoutingTable(10, convertPeerID(local.ID)) rt := NewRoutingTable(10, ConvertPeerID(local.ID))
peers := make([]*peer.Peer, 100) peers := make([]*peer.Peer, 100)
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
@ -102,7 +102,7 @@ func TestTableFind(t *testing.T) {
} }
t.Logf("Searching for peer: '%s'", peers[2].ID.Pretty()) t.Logf("Searching for peer: '%s'", peers[2].ID.Pretty())
found := rt.NearestPeer(convertPeerID(peers[2].ID)) found := rt.NearestPeer(ConvertPeerID(peers[2].ID))
if !found.ID.Equal(peers[2].ID) { if !found.ID.Equal(peers[2].ID) {
t.Fatalf("Failed to lookup known node...") t.Fatalf("Failed to lookup known node...")
} }
@ -110,7 +110,7 @@ func TestTableFind(t *testing.T) {
func TestTableFindMultiple(t *testing.T) { func TestTableFindMultiple(t *testing.T) {
local := _randPeer() local := _randPeer()
rt := NewRoutingTable(20, convertPeerID(local.ID)) rt := NewRoutingTable(20, ConvertPeerID(local.ID))
peers := make([]*peer.Peer, 100) peers := make([]*peer.Peer, 100)
for i := 0; i < 18; i++ { for i := 0; i < 18; i++ {
@ -119,7 +119,7 @@ func TestTableFindMultiple(t *testing.T) {
} }
t.Logf("Searching for peer: '%s'", peers[2].ID.Pretty()) t.Logf("Searching for peer: '%s'", peers[2].ID.Pretty())
found := rt.NearestPeers(convertPeerID(peers[2].ID), 15) found := rt.NearestPeers(ConvertPeerID(peers[2].ID), 15)
if len(found) != 15 { if len(found) != 15 {
t.Fatalf("Got back different number of peers than we expected.") t.Fatalf("Got back different number of peers than we expected.")
} }

View File

@ -71,12 +71,12 @@ func equalizeSizes(a, b ID) (ID, ID) {
return a, b return a, b
} }
func convertPeerID(id peer.ID) ID { func ConvertPeerID(id peer.ID) ID {
hash := sha256.Sum256(id) hash := sha256.Sum256(id)
return hash[:] return hash[:]
} }
func convertKey(id u.Key) ID { func ConvertKey(id u.Key) ID {
hash := sha256.Sum256([]byte(id)) hash := sha256.Sum256([]byte(id))
return hash[:] return hash[:]
} }