1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-25 23:21:54 +08:00

implement find peer rpc

This commit is contained in:
Jeromy
2014-08-05 20:31:48 -07:00
committed by Juan Batiz-Benet
parent 71c7c5844a
commit dc451fba2d
4 changed files with 67 additions and 22 deletions

View File

@ -10,13 +10,7 @@ import (
// Perform initial communication with this peer to share node ID's and
// initiate communication
func Handshake(self, remote *peer.Peer, in, out chan []byte) error {
// temporary:
// put your own id in a 16byte buffer and send that over to
// the peer as your ID, then wait for them to send their ID.
// Once that trade is finished, the handshake is complete and
// both sides should 'trust' each other
// TODO: make this more... secure.
out <- self.ID
resp := <-in
remote.ID = peer.ID(resp)

View File

@ -73,6 +73,7 @@ func (dht *IpfsDHT) Start() {
}
// Connect to a new peer at the given address
// TODO: move this into swarm
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
if addr == nil {
panic("addr was nil!")
@ -90,9 +91,21 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
return nil, err
}
// Send node an address that you can be reached on
myaddr := dht.self.NetAddress("tcp")
mastr,err := myaddr.String()
if err != nil {
panic("No local address to send")
}
conn.Outgoing.MsgChan <- []byte(mastr)
dht.network.StartConn(conn)
dht.routes.Update(peer)
removed := dht.routes.Update(peer)
if removed != nil {
panic("need to remove this peer.")
}
return peer, nil
}
@ -115,7 +128,10 @@ func (dht *IpfsDHT) handleMessages() {
}
// Update peers latest visit in routing table
dht.routes.Update(mes.Peer)
removed := dht.routes.Update(mes.Peer)
if removed != nil {
panic("Need to handle removed peer.")
}
// Note: not sure if this is the correct place for this
if pmes.GetResponse() {
@ -140,7 +156,7 @@ func (dht *IpfsDHT) handleMessages() {
case DHTMessage_PUT_VALUE:
dht.handlePutValue(mes.Peer, pmes)
case DHTMessage_FIND_NODE:
dht.handleFindNode(mes.Peer, pmes)
dht.handleFindPeer(mes.Peer, pmes)
case DHTMessage_ADD_PROVIDER:
dht.handleAddProvider(mes.Peer, pmes)
case DHTMessage_GET_PROVIDERS:
@ -171,14 +187,14 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Chan.Outgoing <- mes
} else if err == ds.ErrNotFound {
// Find closest node(s) to desired key and reply with that info
// Find closest peer(s) to desired key and reply with that info
// TODO: this will need some other metadata in the protobuf message
// to signal to the querying node that the data its receiving
// is actually a list of other nodes
// to signal to the querying peer that the data its receiving
// is actually a list of other peer
}
}
// Store a value in this nodes local storage
// Store a value in this peer local storage
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
dskey := ds.NewKey(pmes.GetKey())
err := dht.datastore.Put(dskey, pmes.GetValue())
@ -189,7 +205,7 @@ func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
}
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {
resp := &pDHTMessage{
resp := pDHTMessage{
Type: pmes.GetType(),
Response: true,
Id: pmes.GetId(),
@ -198,8 +214,29 @@ func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {
dht.network.Chan.Outgoing <-swarm.NewMessage(p, resp.ToProtobuf())
}
func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) {
panic("Not implemented.")
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *DHTMessage) {
closest := dht.routes.NearestPeer(convertKey(u.Key(pmes.GetKey())))
if closest == nil {
}
if len(closest.Addresses) == 0 {
panic("no addresses for connected peer...")
}
addr,err := closest.Addresses[0].String()
if err != nil {
panic(err)
}
resp := pDHTMessage{
Type: pmes.GetType(),
Response: true,
Id: pmes.GetId(),
Value: []byte(addr),
}
mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Chan.Outgoing <-mes
}
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
@ -269,13 +306,13 @@ func (dht *IpfsDHT) Unlisten(mesid uint64) {
close(ch)
}
// Stop all communications from this node and shut down
// Stop all communications from this peer and shut down
func (dht *IpfsDHT) Halt() {
dht.shutdown <- struct{}{}
dht.network.Close()
}
// Ping a node, log the time it took
// Ping a peer, log the time it took
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
// Thoughts: maybe this should accept an ID and do a peer lookup?
u.DOut("Enter Ping.")
@ -294,8 +331,8 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
u.POut("Ping took %s.", roundtrip.String())
return nil
case <-tout:
// Timed out, think about removing node from network
u.DOut("Ping node timed out.")
// Timed out, think about removing peer from network
u.DOut("Ping peer timed out.")
return u.ErrTimeout
}
}

View File

@ -188,6 +188,12 @@ func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error
if err != nil {
return nil, err
}
panic("Not yet implemented.")
addr := string(pmes_out.GetValue())
maddr, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
}
return s.Connect(maddr)
}
}

View File

@ -163,6 +163,14 @@ func (s *Swarm) handleNewConn(nconn net.Conn) {
panic(err)
}
// Get address to contact remote peer from
addr := <-conn.Incoming.MsgChan
maddr, err := ma.NewMultiaddr(string(addr))
if err != nil {
u.PErr("Got invalid address from peer.")
}
p.AddAddress(maddr)
s.StartConn(conn)
}