From deb7dd487286a80f1d4a9aa738159d660589f4f2 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 12 Jul 2014 01:51:32 -0700 Subject: [PATCH] added swarm (fanout/in with peers) --- swarm/conn.go | 96 +++++++++++++++--------------- swarm/conn_test.go | 135 +++++++++++++++++++++---------------------- swarm/swarm.go | 138 +++++++++++++++++++++++++++++++++++++++++++- swarm/swarm_test.go | 109 ++++++++++++++++++++++++++++++++++ 4 files changed, 360 insertions(+), 118 deletions(-) create mode 100644 swarm/swarm_test.go diff --git a/swarm/conn.go b/swarm/conn.go index 4241d6298..73116d38b 100644 --- a/swarm/conn.go +++ b/swarm/conn.go @@ -1,72 +1,74 @@ package swarm import ( - "fmt" - "net" - ma "github.com/jbenet/go-multiaddr" - peer "github.com/jbenet/go-ipfs/peer" - msgio "github.com/jbenet/go-msgio" + "fmt" + peer "github.com/jbenet/go-ipfs/peer" + u "github.com/jbenet/go-ipfs/util" + msgio "github.com/jbenet/go-msgio" + ma "github.com/jbenet/go-multiaddr" + "net" ) const ChanBuffer = 10 type Conn struct { - Peer *peer.Peer - Addr *ma.Multiaddr - Conn net.Conn + Peer *peer.Peer + Addr *ma.Multiaddr + Conn net.Conn - Closed chan bool - Outgoing *msgio.Chan - Incoming *msgio.Chan + Closed chan bool + Outgoing *msgio.Chan + Incoming *msgio.Chan } +type ConnMap map[u.Key]*Conn func Dial(network string, peer *peer.Peer) (*Conn, error) { - addr := peer.NetAddress(network) - if addr == nil { - return nil, fmt.Errorf("No address for network %s", network) - } + addr := peer.NetAddress(network) + if addr == nil { + return nil, fmt.Errorf("No address for network %s", network) + } - network, host, err := addr.DialArgs() - if err != nil { - return nil, err - } + network, host, err := addr.DialArgs() + if err != nil { + return nil, err + } - nconn, err := net.Dial(network, host) - if err != nil { - return nil, err - } + nconn, err := net.Dial(network, host) + if err != nil { + return nil, err + } - out := msgio.NewChan(10) - inc := msgio.NewChan(10) + out := msgio.NewChan(10) + inc := msgio.NewChan(10) - conn := &Conn{ - Peer: peer, - Addr: addr, - Conn: nconn, + conn := &Conn{ + Peer: peer, + Addr: addr, + Conn: nconn, - Outgoing: out, - Incoming: inc, - Closed: make(chan bool, 1), - } + Outgoing: out, + Incoming: inc, + Closed: make(chan bool, 1), + } - go out.WriteTo(nconn) - go inc.ReadFrom(nconn, 1 << 12) + go out.WriteTo(nconn) + go inc.ReadFrom(nconn, 1<<12) - return conn, nil + return conn, nil } func (s *Conn) Close() error { - if s.Conn == nil { - return fmt.Errorf("Already closed.") // already closed - } + if s.Conn == nil { + return fmt.Errorf("Already closed.") // already closed + } - // closing net connection - err := s.Conn.Close() - s.Conn = nil - // closing channels - s.Incoming.Close() - s.Outgoing.Close() - s.Closed<- true - return err + // closing net connection + err := s.Conn.Close() + s.Conn = nil + // closing channels + s.Incoming.Close() + s.Outgoing.Close() + s.Closed <- true + return err } diff --git a/swarm/conn_test.go b/swarm/conn_test.go index 6f989a502..f53a202ac 100644 --- a/swarm/conn_test.go +++ b/swarm/conn_test.go @@ -1,93 +1,90 @@ package swarm import ( - "fmt" - "net" - "testing" - ma "github.com/jbenet/go-multiaddr" - mh "github.com/jbenet/go-multihash" - peer "github.com/jbenet/go-ipfs/peer" + "fmt" + peer "github.com/jbenet/go-ipfs/peer" + ma "github.com/jbenet/go-multiaddr" + mh "github.com/jbenet/go-multihash" + "net" + "testing" ) func setupPeer(id string, addr string) (*peer.Peer, error) { - tcp, err := ma.NewMultiaddr(addr) - if err != nil { - return nil, err - } + tcp, err := ma.NewMultiaddr(addr) + if err != nil { + return nil, err + } - mh, err := mh.FromHexString(id) - if err != nil { - return nil, err - } + mh, err := mh.FromHexString(id) + if err != nil { + return nil, err + } - p := &peer.Peer{Id: peer.PeerId(mh)} - p.AddAddress(tcp) - return p, nil + p := &peer.Peer{Id: peer.PeerId(mh)} + p.AddAddress(tcp) + return p, nil } func echoListen(listener *net.TCPListener) { - for { - c, err := listener.Accept() - if err == nil { - fmt.Println("accepeted") - go echo(c) - } - } + for { + c, err := listener.Accept() + if err == nil { + fmt.Println("accepeted") + go echo(c) + } + } } func echo(c net.Conn) { - for { - data := make([]byte, 1024) - i, err := c.Read(data) - if err != nil { - fmt.Printf("error %v\n", err) - return - } - _, err = c.Write(data[:i]) - if err != nil { - fmt.Printf("error %v\n", err) - return - } - fmt.Println("echoing", data[:i]) - } + for { + data := make([]byte, 1024) + i, err := c.Read(data) + if err != nil { + fmt.Printf("error %v\n", err) + return + } + _, err = c.Write(data[:i]) + if err != nil { + fmt.Printf("error %v\n", err) + return + } + fmt.Println("echoing", data[:i]) + } } func TestDial(t *testing.T) { - listener, err := net.Listen("tcp", "127.0.0.1:1234") - if err != nil { - t.Fatal("error setting up listener", err) - } - go echoListen(listener.(*net.TCPListener)) + listener, err := net.Listen("tcp", "127.0.0.1:1234") + if err != nil { + t.Fatal("error setting up listener", err) + } + go echoListen(listener.(*net.TCPListener)) - p, err := setupPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33", "/ip4/127.0.0.1/tcp/1234") - if err != nil { - t.Fatal("error setting up peer", err) - } + p, err := setupPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33", "/ip4/127.0.0.1/tcp/1234") + if err != nil { + t.Fatal("error setting up peer", err) + } - c, err := Dial("tcp", p) - if err != nil { - t.Fatal("error dialing peer", err) - } + c, err := Dial("tcp", p) + if err != nil { + t.Fatal("error dialing peer", err) + } + fmt.Println("sending") + c.Outgoing.MsgChan <- []byte("beep") + c.Outgoing.MsgChan <- []byte("boop") + out := <-c.Incoming.MsgChan + fmt.Println("recving", string(out)) + if string(out) != "beep" { + t.Error("unexpected conn output") + } - fmt.Println("sending") - c.Outgoing.MsgChan<- []byte("beep") - c.Outgoing.MsgChan<- []byte("boop") - out := <-c.Incoming.MsgChan - fmt.Println("recving", string(out)) - if string(out) != "beep" { - t.Error("unexpected conn output") - } + out = <-c.Incoming.MsgChan + if string(out) != "boop" { + t.Error("unexpected conn output") + } - out = <-c.Incoming.MsgChan - if string(out) != "boop" { - t.Error("unexpected conn output") - } - - fmt.Println("closing") - c.Close() - listener.Close() + fmt.Println("closing") + c.Close() + listener.Close() } - - diff --git a/swarm/swarm.go b/swarm/swarm.go index 1b3d3a0cd..bcdb35fcd 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -1,9 +1,143 @@ package swarm import ( + "fmt" + peer "github.com/jbenet/go-ipfs/peer" + "sync" ) -type Swarm struct { - Conns map[string]*Conn +type Message struct { + // To or from, depending on direction. + Peer *peer.Peer + + // Opaque data + Data []byte } +type Chan struct { + Outgoing chan Message + Incoming chan Message + Errors chan error + Close chan bool +} + +func NewChan(bufsize int) *Chan { + return &Chan{ + Outgoing: make(chan Message, bufsize), + Incoming: make(chan Message, bufsize), + Errors: make(chan error), + Close: make(chan bool, bufsize), + } +} + +type Swarm struct { + Chan *Chan + conns ConnMap + connsLock sync.RWMutex +} + +func NewSwarm() *Swarm { + s := &Swarm{ + Chan: NewChan(10), + conns: ConnMap{}, + } + go s.fanOut() + return s +} + +func (s *Swarm) Close() { + s.connsLock.RLock() + l := len(s.conns) + s.connsLock.RUnlock() + + for i := 0; i < l; i++ { + s.Chan.Close <- true // fan ins + } + s.Chan.Close <- true // fan out + s.Chan.Close <- true // listener +} + +func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error) { + k := peer.Key() + + // check if we already have an open connection first + s.connsLock.RLock() + conn, found := s.conns[k] + s.connsLock.RUnlock() + if found { + return conn, nil + } + + // open connection to peer + conn, err := Dial("tcp", peer) + if err != nil { + return nil, err + } + + // add to conns + s.connsLock.Lock() + s.conns[k] = conn + s.connsLock.Unlock() + + // kick off reader goroutine + go s.fanIn(conn) + return conn, nil +} + +// Handles the unwrapping + sending of messages to the right connection. +func (s *Swarm) fanOut() { + for { + select { + case <-s.Chan.Close: + return // told to close. + case msg, ok := <-s.Chan.Outgoing: + if !ok { + return + } + + s.connsLock.RLock() + conn, found := s.conns[msg.Peer.Key()] + s.connsLock.RUnlock() + if !found { + e := fmt.Errorf("Sent msg to peer without open conn: %v", msg.Peer) + s.Chan.Errors <- e + } + + // queue it in the connection's buffer + conn.Outgoing.MsgChan <- msg.Data + } + } +} + +// Handles the receiving + wrapping of messages, per conn. +// Consider using reflect.Select with one goroutine instead of n. +func (s *Swarm) fanIn(conn *Conn) { +Loop: + for { + select { + case <-s.Chan.Close: + // close Conn. + conn.Close() + break Loop + + case <-conn.Closed: + break Loop + + case data, ok := <-conn.Incoming.MsgChan: + fmt.Println("got back data", data) + if !ok { + e := fmt.Errorf("Error retrieving from conn: %v", conn) + s.Chan.Errors <- e + break Loop + } + + // wrap it for consumers. + msg := Message{Peer: conn.Peer, Data: data} + s.Chan.Incoming <- msg + } + } + + s.connsLock.Lock() + delete(s.conns, conn.Peer.Key()) + s.connsLock.Unlock() +} diff --git a/swarm/swarm_test.go b/swarm/swarm_test.go new file mode 100644 index 000000000..6fbdc21d9 --- /dev/null +++ b/swarm/swarm_test.go @@ -0,0 +1,109 @@ +package swarm + +import ( + "fmt" + peer "github.com/jbenet/go-ipfs/peer" + u "github.com/jbenet/go-ipfs/util" + msgio "github.com/jbenet/go-msgio" + "net" + "testing" +) + +func pingListen(listener *net.TCPListener, peer *peer.Peer) { + for { + c, err := listener.Accept() + if err == nil { + fmt.Println("accepeted") + go pong(c, peer) + } + } +} + +func pong(c net.Conn, peer *peer.Peer) { + mrw := msgio.NewReadWriter(c) + for { + data := make([]byte, 1024) + n, err := mrw.ReadMsg(data) + if err != nil { + fmt.Printf("error %v\n", err) + return + } + if string(data[:n]) != "ping" { + fmt.Printf("error: didn't receive ping: '%v'\n", data[:n]) + return + } + err = mrw.WriteMsg([]byte("pong")) + if err != nil { + fmt.Printf("error %v\n", err) + return + } + fmt.Println("pong") + } +} + +func TestSwarm(t *testing.T) { + + swarm := NewSwarm() + peers := []*peer.Peer{} + listeners := []*net.Listener{} + peerNames := map[string]string{ + "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a30": "/ip4/127.0.0.1/tcp/1234", + "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31": "/ip4/127.0.0.1/tcp/2345", + "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32": "/ip4/127.0.0.1/tcp/3456", + "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33": "/ip4/127.0.0.1/tcp/4567", + } + + for k, n := range peerNames { + peer, err := setupPeer(k, n) + if err != nil { + t.Fatal("error setting up peer", err) + } + a := peer.NetAddress("tcp") + if a == nil { + t.Fatal("error setting up peer (addr is nil)", peer) + } + n, h, err := a.DialArgs() + if err != nil { + t.Fatal("error getting dial args from addr") + } + listener, err := net.Listen(n, h) + if err != nil { + t.Fatal("error setting up listener", err) + } + go pingListen(listener.(*net.TCPListener), peer) + + _, err = swarm.Dial(peer) + if err != nil { + t.Fatal("error swarm dialing to peer", err) + } + + // ok done, add it. + peers = append(peers, peer) + listeners = append(listeners, &listener) + } + + for i, p := range peers { + swarm.Chan.Outgoing <- Message{Peer: p, Data: []byte("ping")} + fmt.Println("ping", i) + } + + got := map[u.Key]bool{} + for _, _ = range peers { + msg := <-swarm.Chan.Incoming + fmt.Println("recving", string(msg.Data)) + if string(msg.Data) != "pong" { + t.Error("unexpected conn output", msg.Data) + } + got[msg.Peer.Key()] = true + } + + if len(peers) != len(got) { + t.Error("got less messages than sent") + } + + fmt.Println("closing") + swarm.Close() + for _, listener := range listeners { + (*listener).(*net.TCPListener).Close() + } +}