diff --git a/identify/identify.go b/identify/identify.go index 2a0ddcd37..b6c67f2c5 100644 --- a/identify/identify.go +++ b/identify/identify.go @@ -10,13 +10,7 @@ import ( // Perform initial communication with this peer to share node ID's and // initiate communication func Handshake(self, remote *peer.Peer, in, out chan []byte) error { - - // temporary: - // put your own id in a 16byte buffer and send that over to - // the peer as your ID, then wait for them to send their ID. - // Once that trade is finished, the handshake is complete and - // both sides should 'trust' each other - + // TODO: make this more... secure. out <- self.ID resp := <-in remote.ID = peer.ID(resp) diff --git a/routing/dht/dht.go b/routing/dht/dht.go index c12e35190..c2c1b63be 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -73,6 +73,7 @@ func (dht *IpfsDHT) Start() { } // Connect to a new peer at the given address +// TODO: move this into swarm func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) { if addr == nil { panic("addr was nil!") @@ -90,9 +91,21 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) { return nil, err } + // Send node an address that you can be reached on + myaddr := dht.self.NetAddress("tcp") + mastr,err := myaddr.String() + if err != nil { + panic("No local address to send") + } + + conn.Outgoing.MsgChan <- []byte(mastr) + dht.network.StartConn(conn) - dht.routes.Update(peer) + removed := dht.routes.Update(peer) + if removed != nil { + panic("need to remove this peer.") + } return peer, nil } @@ -115,7 +128,10 @@ func (dht *IpfsDHT) handleMessages() { } // Update peers latest visit in routing table - dht.routes.Update(mes.Peer) + removed := dht.routes.Update(mes.Peer) + if removed != nil { + panic("Need to handle removed peer.") + } // Note: not sure if this is the correct place for this if pmes.GetResponse() { @@ -140,7 +156,7 @@ func (dht *IpfsDHT) handleMessages() { case DHTMessage_PUT_VALUE: dht.handlePutValue(mes.Peer, pmes) case DHTMessage_FIND_NODE: - dht.handleFindNode(mes.Peer, pmes) + dht.handleFindPeer(mes.Peer, pmes) case DHTMessage_ADD_PROVIDER: dht.handleAddProvider(mes.Peer, pmes) case DHTMessage_GET_PROVIDERS: @@ -171,14 +187,14 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) { mes := swarm.NewMessage(p, resp.ToProtobuf()) dht.network.Chan.Outgoing <- mes } else if err == ds.ErrNotFound { - // Find closest node(s) to desired key and reply with that info + // Find closest peer(s) to desired key and reply with that info // TODO: this will need some other metadata in the protobuf message - // to signal to the querying node that the data its receiving - // is actually a list of other nodes + // to signal to the querying peer that the data its receiving + // is actually a list of other peer } } -// Store a value in this nodes local storage +// Store a value in this peer local storage func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) { dskey := ds.NewKey(pmes.GetKey()) err := dht.datastore.Put(dskey, pmes.GetValue()) @@ -189,7 +205,7 @@ func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) { } func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) { - resp := &pDHTMessage{ + resp := pDHTMessage{ Type: pmes.GetType(), Response: true, Id: pmes.GetId(), @@ -198,8 +214,29 @@ func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) { dht.network.Chan.Outgoing <-swarm.NewMessage(p, resp.ToProtobuf()) } -func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) { - panic("Not implemented.") +func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *DHTMessage) { + closest := dht.routes.NearestPeer(convertKey(u.Key(pmes.GetKey()))) + if closest == nil { + } + + if len(closest.Addresses) == 0 { + panic("no addresses for connected peer...") + } + + addr,err := closest.Addresses[0].String() + if err != nil { + panic(err) + } + + resp := pDHTMessage{ + Type: pmes.GetType(), + Response: true, + Id: pmes.GetId(), + Value: []byte(addr), + } + + mes := swarm.NewMessage(p, resp.ToProtobuf()) + dht.network.Chan.Outgoing <-mes } func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) { @@ -269,13 +306,13 @@ func (dht *IpfsDHT) Unlisten(mesid uint64) { close(ch) } -// Stop all communications from this node and shut down +// Stop all communications from this peer and shut down func (dht *IpfsDHT) Halt() { dht.shutdown <- struct{}{} dht.network.Close() } -// Ping a node, log the time it took +// Ping a peer, log the time it took func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error { // Thoughts: maybe this should accept an ID and do a peer lookup? u.DOut("Enter Ping.") @@ -294,8 +331,8 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error { u.POut("Ping took %s.", roundtrip.String()) return nil case <-tout: - // Timed out, think about removing node from network - u.DOut("Ping node timed out.") + // Timed out, think about removing peer from network + u.DOut("Ping peer timed out.") return u.ErrTimeout } } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index cbe0e9246..138a0ee92 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -188,6 +188,12 @@ func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error if err != nil { return nil, err } - panic("Not yet implemented.") + addr := string(pmes_out.GetValue()) + maddr, err := ma.NewMultiaddr(addr) + if err != nil { + return nil, err + } + + return s.Connect(maddr) } } diff --git a/swarm/swarm.go b/swarm/swarm.go index 451de76ed..882c0a05d 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -163,6 +163,14 @@ func (s *Swarm) handleNewConn(nconn net.Conn) { panic(err) } + // Get address to contact remote peer from + addr := <-conn.Incoming.MsgChan + maddr, err := ma.NewMultiaddr(string(addr)) + if err != nil { + u.PErr("Got invalid address from peer.") + } + p.AddAddress(maddr) + s.StartConn(conn) }