Mirror of https://github.com/ipfs/kubo.git (synced 2025-06-29 01:12:24 +08:00)

ping + find peer

committed by Brian Tiger Chow
parent 2522625bc6
commit 9eb41e7237
@@ -3,9 +3,10 @@ package dht
 import (
 	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
 	peer "github.com/jbenet/go-ipfs/peer"
+	u "github.com/jbenet/go-ipfs/util"
 )
 
-func peerInfo(p *peer.Peer) *Message_Peer {
+func peerToPBPeer(p *peer.Peer) *Message_Peer {
 	pbp := new(Message_Peer)
 	if len(p.Addresses) == 0 || p.Addresses[0] == nil {
 		pbp.Addr = proto.String("")
@@ -22,11 +23,24 @@ func peerInfo(p *peer.Peer) *Message_Peer {
 	return pbp
 }
 
+func peersToPBPeers(peers []*peer.Peer) []*Message_Peer {
+	pbpeers := make([]*Message_Peer, len(peers))
+	for i, p := range peers {
+		pbpeers[i] = peerToPBPeer(p)
+	}
+	return pbpeers
+}
+
 // GetClusterLevel gets and adjusts the cluster level on the message.
 // a +/- 1 adjustment is needed to distinguish a valid first level (1) and
 // default "no value" protobuf behavior (0)
 func (m *Message) GetClusterLevel() int32 {
-	return m.GetClusterLevelRaw() - 1
+	level := m.GetClusterLevelRaw() - 1
+	if level < 0 {
+		u.PErr("handleGetValue: no routing level specified, assuming 0\n")
+		level = 0
+	}
+	return level
 }
 
 // SetClusterLevel adjusts and sets the cluster level on the message.
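Note: peersToPBPeers centralizes the *peer.Peer to *Message_Peer conversion that the later hunks apply at each response site. A minimal usage sketch follows; the buildProvidersResponse helper is hypothetical and not part of this commit, it only reuses fields and helpers visible elsewhere in the diff.

// Hypothetical sketch, not in this commit: build a response whose provider
// list is converted to the protobuf peer type before it goes on the wire.
func buildProvidersResponse(pmes *Message, provs []*peer.Peer) *Message {
	resp := &Message{Type: pmes.Type}
	resp.ProviderPeers = peersToPBPeers(provs) // []*peer.Peer -> []*Message_Peer
	return resp
}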
@@ -169,14 +169,14 @@ func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler {
 		return dht.handleGetValue
 	// case Message_PUT_VALUE:
 	// 	return dht.handlePutValue
-	// case Message_FIND_NODE:
-	// 	return dht.handleFindPeer
+	case Message_FIND_NODE:
+		return dht.handleFindPeer
 	// case Message_ADD_PROVIDER:
 	// 	return dht.handleAddProvider
 	// case Message_GET_PROVIDERS:
 	// 	return dht.handleGetProviders
-	// case Message_PING:
-	// 	return dht.handlePing
+	case Message_PING:
+		return dht.handlePing
 	// case Message_DIAGNOSTIC:
 	// 	return dht.handleDiagnostic
 	default:
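Note: the dhtHandler type itself does not appear in this diff; judging from the new handlePing and handleFindPeer signatures below, it is presumably a function from (remote peer, request) to (response, error). A sketch under that assumption; the dispatch helper is hypothetical.

// Assumed shape of dhtHandler (its definition is not part of this diff),
// inferred from the updated handler signatures further down.
type dhtHandler func(p *peer.Peer, pmes *Message) (*Message, error)

// Hypothetical dispatch sketch: look up the handler for an incoming message
// type and run it; message types still commented out would yield no handler.
func dispatch(dht *IpfsDHT, p *peer.Peer, pmes *Message) (*Message, error) {
	handler := dht.handlerForMsgType(pmes.GetType())
	if handler == nil {
		return nil, nil
	}
	return handler(p, pmes)
}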
@@ -240,7 +240,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error
 	provs := dht.providers.GetProviders(u.Key(pmes.GetKey()))
 	if len(provs) > 0 {
 		u.DOut("handleGetValue returning %d provider[s]\n", len(provs))
-		resp.ProviderPeers = provs
+		resp.ProviderPeers = peersToPBPeers(provs)
 		return resp, nil
 	}
 
@@ -249,11 +249,6 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error
 
 	// stored levels are > 1, to distinguish missing levels.
 	level := pmes.GetClusterLevel()
-	if level < 0 {
-		// TODO: maybe return an error? Defaulting isnt a good idea IMO
-		u.PErr("handleGetValue: no routing level specified, assuming 0\n")
-		level = 0
-	}
 	u.DOut("handleGetValue searching level %d clusters\n", level)
 
 	ck := kb.ConvertKey(u.Key(pmes.GetKey()))
@@ -275,7 +270,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error
 
 	// we got a closer peer, it seems. return it.
 	u.DOut("handleGetValue returning a closer peer: '%s'\n", closer.ID.Pretty())
-	resp.CloserPeers = []*peer.Peer{closer}
+	resp.CloserPeers = peersToPBPeers([]*peer.Peer{closer})
 	return resp, nil
 }
 
@@ -291,48 +286,37 @@ func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *Message) {
 	}
 }
 
-func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *Message) {
+func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *Message) (*Message, error) {
 	u.DOut("[%s] Responding to ping from [%s]!\n", dht.self.ID.Pretty(), p.ID.Pretty())
-	resp := Message{
-		Type:     pmes.GetType(),
-		Response: true,
-		ID:       pmes.GetId(),
-	}
-
-	dht.netChan.Outgoing <- swarm.NewMessage(p, resp.ToProtobuf())
+	return &Message{Type: pmes.Type}, nil
 }
 
-func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) {
-	resp := Message{
-		Type:     pmes.GetType(),
-		ID:       pmes.GetId(),
-		Response: true,
-	}
-	defer func() {
-		mes := swarm.NewMessage(p, resp.ToProtobuf())
-		dht.netChan.Outgoing <- mes
-	}()
-	level := pmes.GetValue()[0]
+func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error) {
+	resp := &Message{Type: pmes.Type}
+
+	level := pmes.GetClusterLevel()
+
 	u.DOut("handleFindPeer: searching for '%s'\n", peer.ID(pmes.GetKey()).Pretty())
-	closest := dht.routingTables[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
+	ck := kb.ConvertKey(u.Key(pmes.GetKey()))
+	closest := dht.routingTables[level].NearestPeer(ck)
 	if closest == nil {
 		u.PErr("handleFindPeer: could not find anything.\n")
-		return
+		return resp, nil
 	}
 
 	if len(closest.Addresses) == 0 {
 		u.PErr("handleFindPeer: no addresses for connected peer...\n")
-		return
+		return resp, nil
 	}
 
 	// If the found peer further away than this peer...
 	if kb.Closer(dht.self.ID, closest.ID, u.Key(pmes.GetKey())) {
-		return
+		return resp, nil
 	}
 
 	u.DOut("handleFindPeer: sending back '%s'\n", closest.ID.Pretty())
-	resp.Peers = []*peer.Peer{closest}
-	resp.Success = true
+	resp.CloserPeers = peersToPBPeers([]*peer.Peer{closest})
+	return resp, nil
 }
 
 func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *Message) {
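Note: handlePing and handleFindPeer no longer push responses onto dht.netChan themselves; they now return a *Message and leave serialization and sending to the caller. A sketch of that caller-side send, reusing swarm.NewMessage and ToProtobuf from the removed code; the respond helper and the dhtHandler type are assumptions, not part of this commit.

// Hypothetical caller-side send, not in this commit: run a handler, then
// serialize and queue its response the way the removed per-handler code did.
func respond(dht *IpfsDHT, p *peer.Peer, pmes *Message, handler dhtHandler) error {
	resp, err := handler(p, pmes)
	if err != nil || resp == nil {
		return err
	}
	dht.netChan.Outgoing <- swarm.NewMessage(p, resp.ToProtobuf())
	return nil
}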