From 61f13ea7f7ef41defc9cb65ad5a7e818fc1501af Mon Sep 17 00:00:00 2001 From: Jeromy Johnson Date: Thu, 31 Jul 2014 17:43:48 -0700 Subject: [PATCH] begin planning of identification process --- identify/identify.go | 28 ++++++++++++++++ identify/message.proto | 3 ++ routing/dht/dht.go | 74 +++++++++++++++++++++++++++++++++++++----- swarm/swarm.go | 47 +++++++++++++++++++++++---- 4 files changed, 138 insertions(+), 14 deletions(-) create mode 100644 identify/identify.go create mode 100644 identify/message.proto diff --git a/identify/identify.go b/identify/identify.go new file mode 100644 index 000000000..87a8ec60d --- /dev/null +++ b/identify/identify.go @@ -0,0 +1,28 @@ +// The identify package handles how peers identify with eachother upon +// connection to the network +package identify + +import ( + peer "github.com/jbenet/go-ipfs/peer" + swarm "github.com/jbenet/go-ipfs/swarm" +) + +// Perform initial communication with this peer to share node ID's and +// initiate communication +func Handshake(self *peer.Peer, conn *swarm.Conn) error { + + // temporary: + // put your own id in a 16byte buffer and send that over to + // the peer as your ID, then wait for them to send their ID. + // Once that trade is finished, the handshake is complete and + // both sides should 'trust' each other + + id := make([]byte, 16) + copy(id, self.ID) + + conn.Outgoing.MsgChan <- id + resp := <-conn.Incoming.MsgChan + conn.Peer.ID = peer.ID(resp) + + return nil +} diff --git a/identify/message.proto b/identify/message.proto new file mode 100644 index 000000000..71804c883 --- /dev/null +++ b/identify/message.proto @@ -0,0 +1,3 @@ +message Identify { + required bytes id = 1; +} diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 0f55ba45b..62d4a71cf 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -2,10 +2,14 @@ package dht import ( "sync" + "time" peer "github.com/jbenet/go-ipfs/peer" swarm "github.com/jbenet/go-ipfs/swarm" u "github.com/jbenet/go-ipfs/util" + identify "github.com/jbenet/go-ipfs/identify" + + ma "github.com/jbenet/go-multiaddr" ds "github.com/jbenet/datastore.go" @@ -35,15 +39,44 @@ type IpfsDHT struct { shutdown chan struct{} } -func NewDHT(p *peer.Peer) *IpfsDHT { +// Create a new DHT object with the given peer as the 'local' host +func NewDHT(p *peer.Peer) (*IpfsDHT, error) { dht := new(IpfsDHT) - dht.self = p + dht.network = swarm.NewSwarm(p) + //TODO: should Listen return an error? + dht.network.Listen() + + dht.datastore = ds.NewMapDatastore() + + dht.self = p dht.listeners = make(map[uint64]chan *swarm.Message) dht.shutdown = make(chan struct{}) - return dht + return dht, nil } +// Connect to a new peer at the given address +func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) error { + peer := new(peer.Peer) + peer.AddAddress(addr) + + conn,err := swarm.Dial("tcp", peer) + if err != nil { + return err + } + + err = identify.Handshake(dht.self, conn) + if err != nil { + return err + } + + dht.network.StartConn(conn.Peer.Key(), conn) + + // TODO: Add this peer to our routing table + return nil +} + + // Read in all messages from swarm and handle them appropriately // NOTE: this function is just a quick sketch func (dht *IpfsDHT) handleMessages() { @@ -134,11 +167,9 @@ func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) { resp := new(DHTMessage) resp.Id = pmes.Id resp.Response = &isResponse + resp.Type = pmes.Type - mes := new(swarm.Message) - mes.Peer = p - mes.Data = []byte(resp.String()) - dht.network.Chan.Outgoing <- mes + dht.network.Chan.Outgoing <-swarm.NewMessage(p, []byte(resp.String())) } @@ -162,9 +193,36 @@ func (dht *IpfsDHT) Unlisten(mesid uint64) { close(ch) } - // Stop all communications from this node and shut down func (dht *IpfsDHT) Halt() { dht.shutdown <- struct{}{} dht.network.Close() } + +// Ping a node, log the time it took +func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) { + // Thoughts: maybe this should accept an ID and do a peer lookup? + id := GenerateMessageID() + mes_type := DHTMessage_PING + pmes := new(DHTMessage) + pmes.Id = &id + pmes.Type = &mes_type + + mes := new(swarm.Message) + mes.Peer = p + mes.Data = []byte(pmes.String()) + + before := time.Now() + response_chan := dht.ListenFor(id) + dht.network.Chan.Outgoing <- mes + + tout := time.After(timeout) + select { + case <-response_chan: + roundtrip := time.Since(before) + u.DOut("Ping took %s.", roundtrip.String()) + case <-tout: + // Timed out, think about removing node from network + u.DOut("Ping node timed out.") + } +} diff --git a/swarm/swarm.go b/swarm/swarm.go index fccc74777..8f6676190 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -2,11 +2,12 @@ package swarm import ( "fmt" + "net" + "sync" + peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" ma "github.com/jbenet/go-multiaddr" - "net" - "sync" ) // Message represents a packet of information sent to or received from a @@ -19,6 +20,14 @@ type Message struct { Data []byte } +// Cleaner looking helper function to make a new message struct +func NewMessage(p *peer.Peer, data []byte) *Message { + return &Message{ + Peer: p, + Data: data, + } +} + // Chan is a swam channel, which provides duplex communication and errors. type Chan struct { Outgoing chan *Message @@ -87,7 +96,8 @@ func (s *Swarm) connListen(maddr *ma.Multiaddr) error { for { nconn, err := list.Accept() if err != nil { - u.PErr("Failed to accept connection: %s - %s", netstr, addr) + u.PErr("Failed to accept connection: %s - %s [%s]", netstr, + addr, err) return } go s.handleNewConn(nconn) @@ -99,7 +109,27 @@ func (s *Swarm) connListen(maddr *ma.Multiaddr) error { // Handle getting ID from this peer and adding it into the map func (s *Swarm) handleNewConn(nconn net.Conn) { - panic("Not yet implemented!") + p := MakePeerFromConn(nconn) + + var addr *ma.Multiaddr + + //naddr := nconn.RemoteAddr() + //addr := ma.FromDialArgs(naddr.Network(), naddr.String()) + + conn := &Conn{ + Peer: p, + Addr: addr, + Conn: nconn, + } + + newConnChans(conn) + go s.fanIn(conn) +} + +// Negotiate with peer for its ID and create a peer object +// TODO: this might belong in the peer package +func MakePeerFromConn(conn net.Conn) *peer.Peer { + panic("Not yet implemented.") } // Close closes a swarm. @@ -140,6 +170,11 @@ func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error) { return nil, err } + s.StartConn(k, conn) + return conn, nil +} + +func (s *Swarm) StartConn(k u.Key, conn *Conn) { // add to conns s.connsLock.Lock() s.conns[k] = conn @@ -147,7 +182,6 @@ func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error) { // kick off reader goroutine go s.fanIn(conn) - return conn, nil } // Handles the unwrapping + sending of messages to the right connection. @@ -165,7 +199,8 @@ func (s *Swarm) fanOut() { conn, found := s.conns[msg.Peer.Key()] s.connsLock.RUnlock() if !found { - e := fmt.Errorf("Sent msg to peer without open conn: %v", msg.Peer) + e := fmt.Errorf("Sent msg to peer without open conn: %v", + msg.Peer) s.Chan.Errors <- e }