diff --git a/net/swarm/conn.go b/net/swarm/conn.go index 655a46c99..7b73e6e3c 100644 --- a/net/swarm/conn.go +++ b/net/swarm/conn.go @@ -7,10 +7,72 @@ import ( ident "github.com/jbenet/go-ipfs/identify" conn "github.com/jbenet/go-ipfs/net/conn" - + msg "github.com/jbenet/go-ipfs/net/message" u "github.com/jbenet/go-ipfs/util" + + ma "github.com/jbenet/go-multiaddr" ) +// Open listeners for each network the swarm should listen on +func (s *Swarm) listen() error { + hasErr := false + retErr := &ListenErr{ + Errors: make([]error, len(s.local.Addresses)), + } + + // listen on every address + for i, addr := range s.local.Addresses { + err := s.connListen(addr) + if err != nil { + hasErr = true + retErr.Errors[i] = err + u.PErr("Failed to listen on: %s [%s]", addr, err) + } + } + + if hasErr { + return retErr + } + return nil +} + +// Listen for new connections on the given multiaddr +func (s *Swarm) connListen(maddr *ma.Multiaddr) error { + netstr, addr, err := maddr.DialArgs() + if err != nil { + return err + } + + list, err := net.Listen(netstr, addr) + if err != nil { + return err + } + + // NOTE: this may require a lock around it later. currently, only run on setup + s.listeners = append(s.listeners, list) + + // Accept and handle new connections on this listener until it errors + go func() { + for { + nconn, err := list.Accept() + if err != nil { + e := fmt.Errorf("Failed to accept connection: %s - %s [%s]", + netstr, addr, err) + s.errChan <- e + + // if cancel is nil, we're closed. + if s.cancel == nil { + return + } + } else { + go s.handleIncomingConn(nconn) + } + } + }() + + return nil +} + // Handle getting ID from this peer, handshake, and adding it into the map func (s *Swarm) handleIncomingConn(nconn net.Conn) { @@ -67,3 +129,63 @@ func (s *Swarm) connHandshake(c *conn.Conn) error { // needs cleanup. needs context. use msg.Pipe. return ident.Handshake(s.local, c.Peer, c.Incoming.MsgChan, c.Outgoing.MsgChan) } + +// Handles the unwrapping + sending of messages to the right connection. +func (s *Swarm) fanOut() { + for { + select { + case <-s.ctx.Done(): + return // told to close. + + case msg, ok := <-s.Outgoing: + if !ok { + return + } + + s.connsLock.RLock() + conn, found := s.conns[msg.Peer.Key()] + s.connsLock.RUnlock() + + if !found { + e := fmt.Errorf("Sent msg to peer without open conn: %v", + msg.Peer) + s.errChan <- e + continue + } + + // queue it in the connection's buffer + conn.Outgoing.MsgChan <- msg.Data + } + } +} + +// Handles the receiving + wrapping of messages, per conn. +// Consider using reflect.Select with one goroutine instead of n. +func (s *Swarm) fanIn(c *conn.Conn) { + for { + select { + case <-s.ctx.Done(): + // close Conn. + c.Close() + goto out + + case <-c.Closed: + goto out + + case data, ok := <-c.Incoming.MsgChan: + if !ok { + e := fmt.Errorf("Error retrieving from conn: %v", c.Peer.Key().Pretty()) + s.errChan <- e + goto out + } + + msg := &msg.Message{Peer: c.Peer, Data: data} + s.Incoming <- msg + } + } + +out: + s.connsLock.Lock() + delete(s.conns, c.Peer.Key()) + s.connsLock.Unlock() +} diff --git a/net/swarm/swarm.go b/net/swarm/swarm.go index 104d51a2f..27c1a8770 100644 --- a/net/swarm/swarm.go +++ b/net/swarm/swarm.go @@ -78,66 +78,6 @@ func NewSwarm(ctx context.Context, local *peer.Peer) (*Swarm, error) { return s, s.listen() } -// Open listeners for each network the swarm should listen on -func (s *Swarm) listen() error { - hasErr := false - retErr := &ListenErr{ - Errors: make([]error, len(s.local.Addresses)), - } - - // listen on every address - for i, addr := range s.local.Addresses { - err := s.connListen(addr) - if err != nil { - hasErr = true - retErr.Errors[i] = err - u.PErr("Failed to listen on: %s [%s]", addr, err) - } - } - - if hasErr { - return retErr - } - return nil -} - -// Listen for new connections on the given multiaddr -func (s *Swarm) connListen(maddr *ma.Multiaddr) error { - netstr, addr, err := maddr.DialArgs() - if err != nil { - return err - } - - list, err := net.Listen(netstr, addr) - if err != nil { - return err - } - - // NOTE: this may require a lock around it later. currently, only run on setup - s.listeners = append(s.listeners, list) - - // Accept and handle new connections on this listener until it errors - go func() { - for { - nconn, err := list.Accept() - if err != nil { - e := fmt.Errorf("Failed to accept connection: %s - %s [%s]", - netstr, addr, err) - s.errChan <- e - - // if cancel is nil, we're closed. - if s.cancel == nil { - return - } - } else { - go s.handleIncomingConn(nconn) - } - } - }() - - return nil -} - // Close stops a swarm. func (s *Swarm) Close() error { if s.cancel == nil { @@ -218,66 +158,6 @@ func (s *Swarm) DialAddr(addr *ma.Multiaddr) (*conn.Conn, error) { return c, err } -// Handles the unwrapping + sending of messages to the right connection. -func (s *Swarm) fanOut() { - for { - select { - case <-s.ctx.Done(): - return // told to close. - - case msg, ok := <-s.Outgoing: - if !ok { - return - } - - s.connsLock.RLock() - conn, found := s.conns[msg.Peer.Key()] - s.connsLock.RUnlock() - - if !found { - e := fmt.Errorf("Sent msg to peer without open conn: %v", - msg.Peer) - s.errChan <- e - continue - } - - // queue it in the connection's buffer - conn.Outgoing.MsgChan <- msg.Data - } - } -} - -// Handles the receiving + wrapping of messages, per conn. -// Consider using reflect.Select with one goroutine instead of n. -func (s *Swarm) fanIn(c *conn.Conn) { - for { - select { - case <-s.ctx.Done(): - // close Conn. - c.Close() - goto out - - case <-c.Closed: - goto out - - case data, ok := <-c.Incoming.MsgChan: - if !ok { - e := fmt.Errorf("Error retrieving from conn: %v", c.Peer.Key().Pretty()) - s.errChan <- e - goto out - } - - msg := &msg.Message{Peer: c.Peer, Data: data} - s.Incoming <- msg - } - } - -out: - s.connsLock.Lock() - delete(s.conns, c.Peer.Key()) - s.connsLock.Unlock() -} - // GetPeer returns the peer in the swarm with given key id. func (s *Swarm) GetPeer(key u.Key) *peer.Peer { s.connsLock.RLock()