diff --git a/metrics/conn/conn.go b/metrics/conn/conn.go index 57ca6c80d..d20974592 100644 --- a/metrics/conn/conn.go +++ b/metrics/conn/conn.go @@ -1,22 +1,22 @@ package meterconn import ( - manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" metrics "github.com/ipfs/go-ipfs/metrics" + transport "github.com/ipfs/go-ipfs/p2p/net/transport" ) type MeteredConn struct { mesRecv metrics.MeterCallback mesSent metrics.MeterCallback - manet.Conn + transport.Conn } -func WrapConn(bwc metrics.Reporter, c manet.Conn) manet.Conn { +func WrapConn(bwc metrics.Reporter, c transport.Conn) transport.Conn { return newMeteredConn(c, bwc.LogRecvMessage, bwc.LogSentMessage) } -func newMeteredConn(base manet.Conn, rcb metrics.MeterCallback, scb metrics.MeterCallback) manet.Conn { +func newMeteredConn(base transport.Conn, rcb metrics.MeterCallback, scb metrics.MeterCallback) transport.Conn { return &MeteredConn{ Conn: base, mesRecv: rcb, diff --git a/p2p/net/conn/dial.go b/p2p/net/conn/dial.go index a9a1a7aaf..76f4b35f6 100644 --- a/p2p/net/conn/dial.go +++ b/p2p/net/conn/dial.go @@ -3,23 +3,32 @@ package conn import ( "fmt" "math/rand" - "net" "strings" - "syscall" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" - reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables" + ci "github.com/ipfs/go-ipfs/p2p/crypto" addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr" + transport "github.com/ipfs/go-ipfs/p2p/net/transport" peer "github.com/ipfs/go-ipfs/p2p/peer" ) +type WrapFunc func(transport.Conn) transport.Conn + +func NewDialer(p peer.ID, pk ci.PrivKey, wrap WrapFunc) *Dialer { + return &Dialer{ + LocalPeer: p, + PrivateKey: pk, + Wrapper: wrap, + } +} + // String returns the string rep of d. func (d *Dialer) String() string { - return fmt.Sprintf("", d.LocalPeer, d.LocalAddrs[0]) + return fmt.Sprintf("", d.LocalPeer) } // Dial connects to a peer over a particular address @@ -95,112 +104,34 @@ func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) ( return connOut, nil } -// rawConnDial dials the underlying net.Conn + manet.Conns -func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (manet.Conn, error) { +func (d *Dialer) AddDialer(pd transport.Dialer) { + d.Dialers = append(d.Dialers, pd) +} - // before doing anything, check we're going to be able to dial. - // we may not support the given address. - if _, _, err := manet.DialArgs(raddr); err != nil { - return nil, err +// returns dialer that can dial the given address +func (d *Dialer) subDialerForAddr(raddr ma.Multiaddr) transport.Dialer { + for _, pd := range d.Dialers { + if pd.Matches(raddr) { + return pd + } } + return nil +} + +// rawConnDial dials the underlying net.Conn + manet.Conns +func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (transport.Conn, error) { if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") { log.Event(ctx, "connDialZeroAddr", lgbl.Dial("conn", d.LocalPeer, remote, nil, raddr)) return nil, fmt.Errorf("Attempted to connect to zero address: %s", raddr) } - // get local addr to use. - laddr := pickLocalAddr(d.LocalAddrs, raddr) - logdial := lgbl.Dial("conn", d.LocalPeer, remote, laddr, raddr) - defer log.EventBegin(ctx, "connDialRawConn", logdial).Done() - - // make a copy of the manet.Dialer, we may need to change its timeout. - madialer := d.Dialer - - if laddr != nil && reuseportIsAvailable() { - // we're perhaps going to dial twice. half the timeout, so we can afford to. - // otherwise our context would expire right after the first dial. - madialer.Dialer.Timeout = (madialer.Dialer.Timeout / 2) - - // dial using reuseport.Dialer, because we're probably reusing addrs. - // this is optimistic, as the reuseDial may fail to bind the port. - rpev := log.EventBegin(ctx, "connDialReusePort", logdial) - if nconn, retry, reuseErr := reuseDial(madialer.Dialer, laddr, raddr); reuseErr == nil { - // if it worked, wrap the raw net.Conn with our manet.Conn - logdial["reuseport"] = "success" - rpev.Done() - return manet.WrapNetConn(nconn) - } else if !retry { - // reuseDial is sure this is a legitimate dial failure, not a reuseport failure. - logdial["reuseport"] = "failure" - logdial["error"] = reuseErr - rpev.Done() - return nil, reuseErr - } else { - // this is a failure to reuse port. log it. - logdial["reuseport"] = "retry" - logdial["error"] = reuseErr - rpev.Done() - } + sd := d.subDialerForAddr(raddr) + if sd == nil { + return nil, fmt.Errorf("no dialer for %s", raddr) } - defer log.EventBegin(ctx, "connDialManet", logdial).Done() - return madialer.Dial(raddr) -} - -func reuseDial(dialer net.Dialer, laddr, raddr ma.Multiaddr) (conn net.Conn, retry bool, err error) { - if laddr == nil { - // if we're given no local address no sense in using reuseport to dial, dial out as usual. - return nil, true, reuseport.ErrReuseFailed - } - - // give reuse.Dialer the manet.Dialer's Dialer. - // (wow, Dialer should've so been an interface...) - rd := reuseport.Dialer{dialer} - - // get the local net.Addr manually - rd.D.LocalAddr, err = manet.ToNetAddr(laddr) - if err != nil { - return nil, true, err // something wrong with laddr. retry without. - } - - // get the raddr dial args for rd.dial - network, netraddr, err := manet.DialArgs(raddr) - if err != nil { - return nil, true, err // something wrong with laddr. retry without. - } - - // rd.Dial gets us a net.Conn with SO_REUSEPORT and SO_REUSEADDR set. - conn, err = rd.Dial(network, netraddr) - return conn, reuseErrShouldRetry(err), err // hey! it worked! -} - -// reuseErrShouldRetry diagnoses whether to retry after a reuse error. -// if we failed to bind, we should retry. if bind worked and this is a -// real dial error (remote end didnt answer) then we should not retry. -func reuseErrShouldRetry(err error) bool { - if err == nil { - return false // hey, it worked! no need to retry. - } - - // if it's a network timeout error, it's a legitimate failure. - if nerr, ok := err.(net.Error); ok && nerr.Timeout() { - return false - } - - errno, ok := err.(syscall.Errno) - if !ok { // not an errno? who knows what this is. retry. - return true - } - - switch errno { - case syscall.EADDRINUSE, syscall.EADDRNOTAVAIL: - return true // failure to bind. retry. - case syscall.ECONNREFUSED: - return false // real dial error - default: - return true // optimistically default to retry. - } + return sd.Dial(raddr) } func pickLocalAddr(laddrs []ma.Multiaddr, raddr ma.Multiaddr) (laddr ma.Multiaddr) { diff --git a/p2p/net/conn/dial_test.go b/p2p/net/conn/dial_test.go index 4c5b584bc..78c9d1d12 100644 --- a/p2p/net/conn/dial_test.go +++ b/p2p/net/conn/dial_test.go @@ -8,8 +8,12 @@ import ( "testing" "time" + ic "github.com/ipfs/go-ipfs/p2p/crypto" + transport "github.com/ipfs/go-ipfs/p2p/net/transport" + peer "github.com/ipfs/go-ipfs/p2p/peer" tu "github.com/ipfs/go-ipfs/util/testutil" + ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" ) @@ -49,6 +53,25 @@ func setupSingleConn(t *testing.T, ctx context.Context) (a, b Conn, p1, p2 tu.Pe return setupConn(t, ctx, false) } +func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey) (Listener, error) { + list, err := transport.NewTCPTransport().Listen(addr) + if err != nil { + return nil, err + } + + return WrapTransportListener(ctx, list, local, sk) +} + +func dialer(t *testing.T, a ma.Multiaddr) transport.Dialer { + tpt := transport.NewTCPTransport() + tptd, err := tpt.Dialer(a) + if err != nil { + t.Fatal(err) + } + + return tptd +} + func setupConn(t *testing.T, ctx context.Context, secure bool) (a, b Conn, p1, p2 tu.PeerNetParams) { p1 = tu.RandPeerNetParamsOrFatal(t) @@ -71,6 +94,8 @@ func setupConn(t *testing.T, ctx context.Context, secure bool) (a, b Conn, p1, p PrivateKey: key2, } + d2.AddDialer(dialer(t, p2.Addr)) + var c2 Conn done := make(chan error) @@ -152,6 +177,7 @@ func testDialer(t *testing.T, secure bool) { LocalPeer: p2.ID, PrivateKey: key2, } + d2.AddDialer(dialer(t, p2.Addr)) go echoListen(ctx, l1) @@ -227,6 +253,7 @@ func testDialerCloseEarly(t *testing.T, secure bool) { LocalPeer: p2.ID, // PrivateKey: key2, -- dont give it key. we'll just close the conn. } + d2.AddDialer(dialer(t, p2.Addr)) errs := make(chan error, 100) done := make(chan struct{}, 1) @@ -253,7 +280,7 @@ func testDialerCloseEarly(t *testing.T, secure bool) { c, err := d2.Dial(ctx, p1.Addr, p1.ID) if err != nil { - errs <- err + t.Fatal(err) } c.Close() // close it early. diff --git a/p2p/net/conn/interface.go b/p2p/net/conn/interface.go index 820085930..bbd13bdf7 100644 --- a/p2p/net/conn/interface.go +++ b/p2p/net/conn/interface.go @@ -8,11 +8,11 @@ import ( key "github.com/ipfs/go-ipfs/blocks/key" ic "github.com/ipfs/go-ipfs/p2p/crypto" filter "github.com/ipfs/go-ipfs/p2p/net/filter" + transport "github.com/ipfs/go-ipfs/p2p/net/transport" peer "github.com/ipfs/go-ipfs/p2p/peer" msgio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" - manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" ) // Map maps Keys (Peer.IDs) to Connections. @@ -54,22 +54,22 @@ type Conn interface { // Dial function as before, but it would have many arguments, as dialing is // no longer simple (need a peerstore, a local peer, a context, a network, etc) type Dialer struct { - - // Dialer is an optional manet.Dialer to use. - Dialer manet.Dialer - // LocalPeer is the identity of the local Peer. LocalPeer peer.ID // LocalAddrs is a set of local addresses to use. - LocalAddrs []ma.Multiaddr + //LocalAddrs []ma.Multiaddr + + // Dialers are the sub-dialers usable by this dialer + // selected in order based on the address being dialed + Dialers []transport.Dialer // PrivateKey used to initialize a secure connection. // Warning: if PrivateKey is nil, connection will not be secured. PrivateKey ic.PrivKey // Wrapper to wrap the raw connection (optional) - Wrapper func(manet.Conn) manet.Conn + Wrapper WrapFunc } // Listener is an object that can accept connections. It matches net.Listener diff --git a/p2p/net/conn/listen.go b/p2p/net/conn/listen.go index 1cd6aa2c3..eeb5662a0 100644 --- a/p2p/net/conn/listen.go +++ b/p2p/net/conn/listen.go @@ -6,8 +6,6 @@ import ( "net" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" - manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" - reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport" tec "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-temp-err-catcher" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" @@ -15,15 +13,16 @@ import ( ic "github.com/ipfs/go-ipfs/p2p/crypto" filter "github.com/ipfs/go-ipfs/p2p/net/filter" + transport "github.com/ipfs/go-ipfs/p2p/net/transport" peer "github.com/ipfs/go-ipfs/p2p/peer" ) // ConnWrapper is any function that wraps a raw multiaddr connection -type ConnWrapper func(manet.Conn) manet.Conn +type ConnWrapper func(transport.Conn) transport.Conn // listener is an object that can accept connections. It implements Listener type listener struct { - manet.Listener + transport.Listener local peer.ID // LocalPeer is the identity of the local Peer privk ic.PrivKey // private key to use to initialize secure conns @@ -147,13 +146,7 @@ func (l *listener) Loggable() map[string]interface{} { } } -// Listen listens on the particular multiaddr, with given peer and peerstore. -func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey) (Listener, error) { - ml, err := manetListen(addr) - if err != nil { - return nil, err - } - +func WrapTransportListener(ctx context.Context, ml transport.Listener, local peer.ID, sk ic.PrivKey) (Listener, error) { l := &listener{ Listener: ml, local: local, @@ -175,23 +168,3 @@ type ListenerConnWrapper interface { func (l *listener) SetConnWrapper(cw ConnWrapper) { l.wrapper = cw } - -func manetListen(addr ma.Multiaddr) (manet.Listener, error) { - network, naddr, err := manet.DialArgs(addr) - if err != nil { - return nil, err - } - - if reuseportIsAvailable() { - nl, err := reuseport.Listen(network, naddr) - if err == nil { - // hey, it worked! - return manet.WrapNetListener(nl) - } - // reuseport is available, but we failed to listen. log debug, and retry normally. - log.Debugf("reuseport available, but failed to listen: %s %s, %s", network, naddr, err) - } - - // either reuseport not available, or it failed. try normally. - return manet.Listen(addr) -} diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 673062e00..dabcf5368 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -8,9 +8,12 @@ import ( "time" metrics "github.com/ipfs/go-ipfs/metrics" + mconn "github.com/ipfs/go-ipfs/metrics/conn" inet "github.com/ipfs/go-ipfs/p2p/net" + conn "github.com/ipfs/go-ipfs/p2p/net/conn" filter "github.com/ipfs/go-ipfs/p2p/net/filter" addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr" + transport "github.com/ipfs/go-ipfs/p2p/net/transport" peer "github.com/ipfs/go-ipfs/p2p/peer" logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log" @@ -58,9 +61,13 @@ type Swarm struct { backf dialbackoff dialT time.Duration // mainly for tests + dialer *conn.Dialer + notifmu sync.RWMutex notifs map[inet.Notifiee]ps.Notifiee + transports []transport.Transport + // filters for addresses that shouldnt be dialed Filters *filter.Filters @@ -81,6 +88,10 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, return nil, err } + wrap := func(c transport.Conn) transport.Conn { + return mconn.WrapConn(bwc, c) + } + s := &Swarm{ swarm: ps.NewSwarm(PSTransport), local: local, @@ -88,9 +99,11 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, ctx: ctx, dialT: DialTimeout, notifs: make(map[inet.Notifiee]ps.Notifiee), + transports: []transport.Transport{transport.NewTCPTransport()}, bwc: bwc, fdRateLimit: make(chan struct{}, concurrentFdDials), Filters: filter.NewFilters(), + dialer: conn.NewDialer(local, peers.PrivKey(local), wrap), } // configure Swarm @@ -101,7 +114,12 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, prom.MustRegisterOrGet(peersTotal) s.Notify((*metricsNotifiee)(s)) - return s, s.listen(listenAddrs) + err = s.setupInterfaces(listenAddrs) + if err != nil { + return nil, err + } + + return s, nil } func (s *Swarm) teardown() error { @@ -134,7 +152,7 @@ func (s *Swarm) Listen(addrs ...ma.Multiaddr) error { return err } - return s.listen(addrs) + return s.setupInterfaces(addrs) } // Process returns the Process of the swarm diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 1e2e34143..77c768391 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -4,19 +4,17 @@ import ( "bytes" "errors" "fmt" - "net" "sort" "sync" "time" - mconn "github.com/ipfs/go-ipfs/metrics/conn" + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" conn "github.com/ipfs/go-ipfs/p2p/net/conn" addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr" peer "github.com/ipfs/go-ipfs/p2p/peer" lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" - manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" ) @@ -289,14 +287,6 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.") } - // get our own addrs. try dialing out from our listener addresses (reusing ports) - // Note that using our peerstore's addresses here is incorrect, as that would - // include observed addresses. TODO: make peerstore's address book smarter. - localAddrs := s.ListenAddresses() - if len(localAddrs) == 0 { - log.Debug("Dialing out with no local addresses.") - } - // get remote peer addrs remoteAddrs := s.peers.Addrs(p) // make sure we can use the addresses. @@ -321,23 +311,8 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { return nil, err } - // open connection to peer - d := &conn.Dialer{ - Dialer: manet.Dialer{ - Dialer: net.Dialer{ - Timeout: s.dialT, - }, - }, - LocalPeer: s.local, - LocalAddrs: localAddrs, - PrivateKey: sk, - Wrapper: func(c manet.Conn) manet.Conn { - return mconn.WrapConn(s.bwc, c) - }, - } - // try to get a connection to any addr - connC, err := s.dialAddrs(ctx, d, p, remoteAddrs) + connC, err := s.dialAddrs(ctx, p, remoteAddrs) if err != nil { logdial["error"] = err return nil, err @@ -357,7 +332,7 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { return swarmC, nil } -func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) { +func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) { // sort addresses so preferred addresses are dialed sooner sort.Sort(AddrList(remoteAddrs)) @@ -381,7 +356,7 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote connsout := conns errsout := errs - connC, err := s.dialAddr(ctx, d, p, addr) + connC, err := s.dialAddr(ctx, p, addr) if err != nil { connsout = nil } else if connC == nil { @@ -451,10 +426,10 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote return nil, exitErr } -func (s *Swarm) dialAddr(ctx context.Context, d *conn.Dialer, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) { +func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) { log.Debugf("%s swarm dialing %s %s", s.local, p, addr) - connC, err := d.Dial(ctx, addr, p) + connC, err := s.dialer.Dial(ctx, addr, p) if err != nil { return nil, fmt.Errorf("%s --> %s dial attempt failed: %s", s.local, p, err) } diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go index d1bcb0752..640450399 100644 --- a/p2p/net/swarm/swarm_listen.go +++ b/p2p/net/swarm/swarm_listen.go @@ -6,65 +6,78 @@ import ( mconn "github.com/ipfs/go-ipfs/metrics/conn" inet "github.com/ipfs/go-ipfs/p2p/net" conn "github.com/ipfs/go-ipfs/p2p/net/conn" - addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr" + transport "github.com/ipfs/go-ipfs/p2p/net/transport" lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" - manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" - multierr "github.com/ipfs/go-ipfs/thirdparty/multierr" ) -// Open listeners for each network the swarm should listen on -func (s *Swarm) listen(addrs []ma.Multiaddr) error { - - for _, addr := range addrs { - if !addrutil.AddrUsable(addr, true) { - return fmt.Errorf("cannot use addr: %s", addr) +// Open listeners and reuse-dialers for the given addresses +func (s *Swarm) setupInterfaces(addrs []ma.Multiaddr) error { + errs := make([]error, len(addrs)) + var succeeded int + for i, a := range addrs { + tpt := s.transportForAddr(a) + if tpt == nil { + errs[i] = fmt.Errorf("no transport for address: %s", a) + continue } - } - retErr := multierr.New() - - // listen on every address - for i, addr := range addrs { - err := s.setupListener(addr) + d, err := tpt.Dialer(a, transport.TimeoutOpt(DialTimeout), transport.ReusePorts) if err != nil { - if retErr.Errors == nil { - retErr.Errors = make([]error, len(addrs)) - } - retErr.Errors[i] = err - log.Debugf("Failed to listen on: %s - %s", addr, err) + errs[i] = err + continue } + + s.dialer.AddDialer(d) + + list, err := tpt.Listen(a) + if err != nil { + errs[i] = err + continue + } + + err = s.addListener(list) + if err != nil { + errs[i] = err + continue + } + succeeded++ } - if retErr.Errors != nil { - return retErr + for i, e := range errs { + if e != nil { + log.Warning("listen on %s failed: %s", addrs[i], errs[i]) + } } + if succeeded == 0 && len(addrs) > 0 { + return fmt.Errorf("failed to listen on any addresses: %s", errs) + } + return nil } -// Listen for new connections on the given multiaddr -func (s *Swarm) setupListener(maddr ma.Multiaddr) error { +func (s *Swarm) transportForAddr(a ma.Multiaddr) transport.Transport { + for _, t := range s.transports { + if t.Matches(a) { + return t + } + } - // TODO rethink how this has to work. (jbenet) - // - // resolved, err := resolveUnspecifiedAddresses([]ma.Multiaddr{maddr}) - // if err != nil { - // return err - // } - // for _, a := range resolved { - // s.peers.AddAddr(s.local, a) - // } + return nil +} + +func (s *Swarm) addListener(tptlist transport.Listener) error { sk := s.peers.PrivKey(s.local) if sk == nil { // may be fine for sk to be nil, just log a warning. log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.") } - log.Debugf("Swarm Listening at %s", maddr) - list, err := conn.Listen(s.Context(), maddr, s.local, sk) + + list, err := conn.WrapTransportListener(s.Context(), tptlist, s.local, sk) if err != nil { return err } @@ -72,11 +85,15 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { list.SetAddrFilters(s.Filters) if cw, ok := list.(conn.ListenerConnWrapper); ok { - cw.SetConnWrapper(func(c manet.Conn) manet.Conn { + cw.SetConnWrapper(func(c transport.Conn) transport.Conn { return mconn.WrapConn(s.bwc, c) }) } + return s.addConnListener(list) +} + +func (s *Swarm) addConnListener(list conn.Listener) error { // AddListener to the peerstream Listener. this will begin accepting connections // and streams! sl, err := s.swarm.AddListener(list) @@ -85,6 +102,8 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { } log.Debugf("Swarm Listeners at %s", s.ListenAddresses()) + maddr := list.Multiaddr() + // signal to our notifiees on successful conn. s.notifyAll(func(n inet.Notifiee) { n.Listen((*Network)(s), maddr) @@ -107,7 +126,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { if !more { return } - log.Warningf("swarm listener accept error: %s", err) + log.Errorf("swarm listener accept error: %s", err) case <-ctx.Done(): return } diff --git a/p2p/net/conn/reuseport.go b/p2p/net/transport/reuseport.go similarity index 52% rename from p2p/net/conn/reuseport.go rename to p2p/net/transport/reuseport.go index 359c52315..e940a486d 100644 --- a/p2p/net/conn/reuseport.go +++ b/p2p/net/transport/reuseport.go @@ -1,8 +1,10 @@ -package conn +package transport import ( + "net" "os" "strings" + "syscall" reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport" ) @@ -30,6 +32,34 @@ func init() { // // If this becomes a sought after feature, we could add this to the config. // In the end, reuseport is a stop-gap. -func reuseportIsAvailable() bool { +func ReuseportIsAvailable() bool { return envReuseportVal && reuseport.Available() } + +// ReuseErrShouldRetry diagnoses whether to retry after a reuse error. +// if we failed to bind, we should retry. if bind worked and this is a +// real dial error (remote end didnt answer) then we should not retry. +func ReuseErrShouldRetry(err error) bool { + if err == nil { + return false // hey, it worked! no need to retry. + } + + // if it's a network timeout error, it's a legitimate failure. + if nerr, ok := err.(net.Error); ok && nerr.Timeout() { + return false + } + + errno, ok := err.(syscall.Errno) + if !ok { // not an errno? who knows what this is. retry. + return true + } + + switch errno { + case syscall.EADDRINUSE, syscall.EADDRNOTAVAIL: + return true // failure to bind. retry. + case syscall.ECONNREFUSED: + return false // real dial error + default: + return true // optimistically default to retry. + } +} diff --git a/p2p/net/transport/tcp.go b/p2p/net/transport/tcp.go new file mode 100644 index 000000000..8b200bbd0 --- /dev/null +++ b/p2p/net/transport/tcp.go @@ -0,0 +1,236 @@ +package transport + +import ( + "fmt" + "net" + "sync" + "time" + + ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" + reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport" + context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables" +) + +type TcpTransport struct { + dlock sync.Mutex + dialers map[string]Dialer + + llock sync.Mutex + listeners map[string]Listener +} + +func NewTCPTransport() *TcpTransport { + return &TcpTransport{ + dialers: make(map[string]Dialer), + listeners: make(map[string]Listener), + } +} + +func (t *TcpTransport) Dialer(laddr ma.Multiaddr, opts ...DialOpt) (Dialer, error) { + t.dlock.Lock() + defer t.dlock.Unlock() + s := laddr.String() + d, found := t.dialers[s] + if found { + return d, nil + } + var base manet.Dialer + + var doReuse bool + for _, o := range opts { + switch o := o.(type) { + case TimeoutOpt: + base.Timeout = time.Duration(o) + case ReuseportOpt: + doReuse = bool(o) + default: + return nil, fmt.Errorf("unrecognized option: %#v", o) + } + } + + tcpd, err := t.newTcpDialer(base, laddr, doReuse) + if err != nil { + return nil, err + } + + t.dialers[s] = tcpd + return tcpd, nil +} + +func (t *TcpTransport) Listen(laddr ma.Multiaddr) (Listener, error) { + t.llock.Lock() + defer t.llock.Unlock() + s := laddr.String() + l, found := t.listeners[s] + if found { + return l, nil + } + + list, err := manetListen(laddr) + if err != nil { + return nil, err + } + + tlist := &tcpListener{ + list: list, + transport: t, + } + + t.listeners[s] = tlist + return tlist, nil +} + +func manetListen(addr ma.Multiaddr) (manet.Listener, error) { + network, naddr, err := manet.DialArgs(addr) + if err != nil { + return nil, err + } + + if ReuseportIsAvailable() { + nl, err := reuseport.Listen(network, naddr) + if err == nil { + // hey, it worked! + return manet.WrapNetListener(nl) + } + // reuseport is available, but we failed to listen. log debug, and retry normally. + log.Debugf("reuseport available, but failed to listen: %s %s, %s", network, naddr, err) + } + + // either reuseport not available, or it failed. try normally. + return manet.Listen(addr) +} + +func (t *TcpTransport) Matches(a ma.Multiaddr) bool { + return IsTcpMultiaddr(a) +} + +type tcpDialer struct { + laddr ma.Multiaddr + + doReuse bool + + rd reuseport.Dialer + madialer manet.Dialer + + transport Transport +} + +func (t *TcpTransport) newTcpDialer(base manet.Dialer, laddr ma.Multiaddr, doReuse bool) (*tcpDialer, error) { + // get the local net.Addr manually + la, err := manet.ToNetAddr(laddr) + if err != nil { + return nil, err // something wrong with laddr. + } + + if doReuse && ReuseportIsAvailable() { + rd := reuseport.Dialer{ + D: net.Dialer{ + LocalAddr: la, + Timeout: base.Timeout, + }, + } + + return &tcpDialer{ + doReuse: true, + laddr: laddr, + rd: rd, + madialer: base, + transport: t, + }, nil + } + + return &tcpDialer{ + doReuse: false, + laddr: laddr, + madialer: base, + transport: t, + }, nil +} + +func (d *tcpDialer) Dial(raddr ma.Multiaddr) (Conn, error) { + var c manet.Conn + var err error + if d.doReuse { + c, err = d.reuseDial(raddr) + } else { + c, err = d.madialer.Dial(raddr) + } + + if err != nil { + return nil, err + } + + return &connWrap{ + Conn: c, + transport: d.transport, + }, nil +} + +func (d *tcpDialer) reuseDial(raddr ma.Multiaddr) (manet.Conn, error) { + logdial := lgbl.Dial("conn", "", "", d.laddr, raddr) + rpev := log.EventBegin(context.TODO(), "tptDialReusePort", logdial) + + network, netraddr, err := manet.DialArgs(raddr) + if err != nil { + return nil, err + } + + con, err := d.rd.Dial(network, netraddr) + if err == nil { + logdial["reuseport"] = "success" + rpev.Done() + return manet.WrapNetConn(con) + } + + if !ReuseErrShouldRetry(err) { + logdial["reuseport"] = "failure" + logdial["error"] = err + rpev.Done() + return nil, err + } + + logdial["reuseport"] = "retry" + logdial["error"] = err + rpev.Done() + + return d.madialer.Dial(raddr) +} + +func (d *tcpDialer) Matches(a ma.Multiaddr) bool { + return IsTcpMultiaddr(a) +} + +type tcpListener struct { + list manet.Listener + transport Transport +} + +func (d *tcpListener) Accept() (Conn, error) { + c, err := d.list.Accept() + if err != nil { + return nil, err + } + + return &connWrap{ + Conn: c, + transport: d.transport, + }, nil +} + +func (d *tcpListener) Addr() net.Addr { + return d.list.Addr() +} + +func (t *tcpListener) Multiaddr() ma.Multiaddr { + return t.list.Multiaddr() +} + +func (t *tcpListener) NetListener() net.Listener { + return t.list.NetListener() +} + +func (d *tcpListener) Close() error { + return d.list.Close() +} diff --git a/p2p/net/transport/transport.go b/p2p/net/transport/transport.go new file mode 100644 index 000000000..e75f43029 --- /dev/null +++ b/p2p/net/transport/transport.go @@ -0,0 +1,61 @@ +package transport + +import ( + "net" + "time" + + ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" + logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log" +) + +var log = logging.Logger("transport") + +type Conn interface { + manet.Conn + + Transport() Transport +} + +type Transport interface { + Dialer(laddr ma.Multiaddr, opts ...DialOpt) (Dialer, error) + Listen(laddr ma.Multiaddr) (Listener, error) + Matches(ma.Multiaddr) bool +} + +type Dialer interface { + Dial(raddr ma.Multiaddr) (Conn, error) + Matches(ma.Multiaddr) bool +} + +type Listener interface { + Accept() (Conn, error) + Close() error + Addr() net.Addr + Multiaddr() ma.Multiaddr +} + +type connWrap struct { + manet.Conn + transport Transport +} + +func (cw *connWrap) Transport() Transport { + return cw.transport +} + +type DialOpt interface{} +type TimeoutOpt time.Duration +type ReuseportOpt bool + +var ReusePorts ReuseportOpt = true + +func IsTcpMultiaddr(a ma.Multiaddr) bool { + p := a.Protocols() + return len(p) == 2 && (p[0].Name == "ip4" || p[0].Name == "ip6") && p[1].Name == "tcp" +} + +func IsUtpMultiaddr(a ma.Multiaddr) bool { + p := a.Protocols() + return len(p) == 3 && p[2].Name == "utp" +}