mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-24 14:08:13 +08:00
Merge pull request #1937 from ipfs/refactor/transport
refactor net code to use transports, in rough accordance with libp2p
This commit is contained in:
@ -1,22 +1,22 @@
|
|||||||
package meterconn
|
package meterconn
|
||||||
|
|
||||||
import (
|
import (
|
||||||
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
|
||||||
metrics "github.com/ipfs/go-ipfs/metrics"
|
metrics "github.com/ipfs/go-ipfs/metrics"
|
||||||
|
transport "github.com/ipfs/go-ipfs/p2p/net/transport"
|
||||||
)
|
)
|
||||||
|
|
||||||
type MeteredConn struct {
|
type MeteredConn struct {
|
||||||
mesRecv metrics.MeterCallback
|
mesRecv metrics.MeterCallback
|
||||||
mesSent metrics.MeterCallback
|
mesSent metrics.MeterCallback
|
||||||
|
|
||||||
manet.Conn
|
transport.Conn
|
||||||
}
|
}
|
||||||
|
|
||||||
func WrapConn(bwc metrics.Reporter, c manet.Conn) manet.Conn {
|
func WrapConn(bwc metrics.Reporter, c transport.Conn) transport.Conn {
|
||||||
return newMeteredConn(c, bwc.LogRecvMessage, bwc.LogSentMessage)
|
return newMeteredConn(c, bwc.LogRecvMessage, bwc.LogSentMessage)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMeteredConn(base manet.Conn, rcb metrics.MeterCallback, scb metrics.MeterCallback) manet.Conn {
|
func newMeteredConn(base transport.Conn, rcb metrics.MeterCallback, scb metrics.MeterCallback) transport.Conn {
|
||||||
return &MeteredConn{
|
return &MeteredConn{
|
||||||
Conn: base,
|
Conn: base,
|
||||||
mesRecv: rcb,
|
mesRecv: rcb,
|
||||||
|
@ -3,23 +3,32 @@ package conn
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net"
|
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
|
||||||
|
|
||||||
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||||
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
||||||
reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport"
|
|
||||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
|
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
|
||||||
|
|
||||||
|
ci "github.com/ipfs/go-ipfs/p2p/crypto"
|
||||||
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
|
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
|
||||||
|
transport "github.com/ipfs/go-ipfs/p2p/net/transport"
|
||||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type WrapFunc func(transport.Conn) transport.Conn
|
||||||
|
|
||||||
|
func NewDialer(p peer.ID, pk ci.PrivKey, wrap WrapFunc) *Dialer {
|
||||||
|
return &Dialer{
|
||||||
|
LocalPeer: p,
|
||||||
|
PrivateKey: pk,
|
||||||
|
Wrapper: wrap,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// String returns the string rep of d.
|
// String returns the string rep of d.
|
||||||
func (d *Dialer) String() string {
|
func (d *Dialer) String() string {
|
||||||
return fmt.Sprintf("<Dialer %s %s ...>", d.LocalPeer, d.LocalAddrs[0])
|
return fmt.Sprintf("<Dialer %s ...>", d.LocalPeer)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dial connects to a peer over a particular address
|
// Dial connects to a peer over a particular address
|
||||||
@ -95,112 +104,34 @@ func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (
|
|||||||
return connOut, nil
|
return connOut, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// rawConnDial dials the underlying net.Conn + manet.Conns
|
func (d *Dialer) AddDialer(pd transport.Dialer) {
|
||||||
func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (manet.Conn, error) {
|
d.Dialers = append(d.Dialers, pd)
|
||||||
|
}
|
||||||
|
|
||||||
// before doing anything, check we're going to be able to dial.
|
// returns dialer that can dial the given address
|
||||||
// we may not support the given address.
|
func (d *Dialer) subDialerForAddr(raddr ma.Multiaddr) transport.Dialer {
|
||||||
if _, _, err := manet.DialArgs(raddr); err != nil {
|
for _, pd := range d.Dialers {
|
||||||
return nil, err
|
if pd.Matches(raddr) {
|
||||||
|
return pd
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// rawConnDial dials the underlying net.Conn + manet.Conns
|
||||||
|
func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (transport.Conn, error) {
|
||||||
if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") {
|
if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") {
|
||||||
log.Event(ctx, "connDialZeroAddr", lgbl.Dial("conn", d.LocalPeer, remote, nil, raddr))
|
log.Event(ctx, "connDialZeroAddr", lgbl.Dial("conn", d.LocalPeer, remote, nil, raddr))
|
||||||
return nil, fmt.Errorf("Attempted to connect to zero address: %s", raddr)
|
return nil, fmt.Errorf("Attempted to connect to zero address: %s", raddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// get local addr to use.
|
sd := d.subDialerForAddr(raddr)
|
||||||
laddr := pickLocalAddr(d.LocalAddrs, raddr)
|
if sd == nil {
|
||||||
logdial := lgbl.Dial("conn", d.LocalPeer, remote, laddr, raddr)
|
return nil, fmt.Errorf("no dialer for %s", raddr)
|
||||||
defer log.EventBegin(ctx, "connDialRawConn", logdial).Done()
|
|
||||||
|
|
||||||
// make a copy of the manet.Dialer, we may need to change its timeout.
|
|
||||||
madialer := d.Dialer
|
|
||||||
|
|
||||||
if laddr != nil && reuseportIsAvailable() {
|
|
||||||
// we're perhaps going to dial twice. half the timeout, so we can afford to.
|
|
||||||
// otherwise our context would expire right after the first dial.
|
|
||||||
madialer.Dialer.Timeout = (madialer.Dialer.Timeout / 2)
|
|
||||||
|
|
||||||
// dial using reuseport.Dialer, because we're probably reusing addrs.
|
|
||||||
// this is optimistic, as the reuseDial may fail to bind the port.
|
|
||||||
rpev := log.EventBegin(ctx, "connDialReusePort", logdial)
|
|
||||||
if nconn, retry, reuseErr := reuseDial(madialer.Dialer, laddr, raddr); reuseErr == nil {
|
|
||||||
// if it worked, wrap the raw net.Conn with our manet.Conn
|
|
||||||
logdial["reuseport"] = "success"
|
|
||||||
rpev.Done()
|
|
||||||
return manet.WrapNetConn(nconn)
|
|
||||||
} else if !retry {
|
|
||||||
// reuseDial is sure this is a legitimate dial failure, not a reuseport failure.
|
|
||||||
logdial["reuseport"] = "failure"
|
|
||||||
logdial["error"] = reuseErr
|
|
||||||
rpev.Done()
|
|
||||||
return nil, reuseErr
|
|
||||||
} else {
|
|
||||||
// this is a failure to reuse port. log it.
|
|
||||||
logdial["reuseport"] = "retry"
|
|
||||||
logdial["error"] = reuseErr
|
|
||||||
rpev.Done()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
defer log.EventBegin(ctx, "connDialManet", logdial).Done()
|
return sd.Dial(raddr)
|
||||||
return madialer.Dial(raddr)
|
|
||||||
}
|
|
||||||
|
|
||||||
func reuseDial(dialer net.Dialer, laddr, raddr ma.Multiaddr) (conn net.Conn, retry bool, err error) {
|
|
||||||
if laddr == nil {
|
|
||||||
// if we're given no local address no sense in using reuseport to dial, dial out as usual.
|
|
||||||
return nil, true, reuseport.ErrReuseFailed
|
|
||||||
}
|
|
||||||
|
|
||||||
// give reuse.Dialer the manet.Dialer's Dialer.
|
|
||||||
// (wow, Dialer should've so been an interface...)
|
|
||||||
rd := reuseport.Dialer{dialer}
|
|
||||||
|
|
||||||
// get the local net.Addr manually
|
|
||||||
rd.D.LocalAddr, err = manet.ToNetAddr(laddr)
|
|
||||||
if err != nil {
|
|
||||||
return nil, true, err // something wrong with laddr. retry without.
|
|
||||||
}
|
|
||||||
|
|
||||||
// get the raddr dial args for rd.dial
|
|
||||||
network, netraddr, err := manet.DialArgs(raddr)
|
|
||||||
if err != nil {
|
|
||||||
return nil, true, err // something wrong with laddr. retry without.
|
|
||||||
}
|
|
||||||
|
|
||||||
// rd.Dial gets us a net.Conn with SO_REUSEPORT and SO_REUSEADDR set.
|
|
||||||
conn, err = rd.Dial(network, netraddr)
|
|
||||||
return conn, reuseErrShouldRetry(err), err // hey! it worked!
|
|
||||||
}
|
|
||||||
|
|
||||||
// reuseErrShouldRetry diagnoses whether to retry after a reuse error.
|
|
||||||
// if we failed to bind, we should retry. if bind worked and this is a
|
|
||||||
// real dial error (remote end didnt answer) then we should not retry.
|
|
||||||
func reuseErrShouldRetry(err error) bool {
|
|
||||||
if err == nil {
|
|
||||||
return false // hey, it worked! no need to retry.
|
|
||||||
}
|
|
||||||
|
|
||||||
// if it's a network timeout error, it's a legitimate failure.
|
|
||||||
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
errno, ok := err.(syscall.Errno)
|
|
||||||
if !ok { // not an errno? who knows what this is. retry.
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
switch errno {
|
|
||||||
case syscall.EADDRINUSE, syscall.EADDRNOTAVAIL:
|
|
||||||
return true // failure to bind. retry.
|
|
||||||
case syscall.ECONNREFUSED:
|
|
||||||
return false // real dial error
|
|
||||||
default:
|
|
||||||
return true // optimistically default to retry.
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func pickLocalAddr(laddrs []ma.Multiaddr, raddr ma.Multiaddr) (laddr ma.Multiaddr) {
|
func pickLocalAddr(laddrs []ma.Multiaddr, raddr ma.Multiaddr) (laddr ma.Multiaddr) {
|
||||||
|
@ -8,8 +8,12 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
ic "github.com/ipfs/go-ipfs/p2p/crypto"
|
||||||
|
transport "github.com/ipfs/go-ipfs/p2p/net/transport"
|
||||||
|
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||||
tu "github.com/ipfs/go-ipfs/util/testutil"
|
tu "github.com/ipfs/go-ipfs/util/testutil"
|
||||||
|
|
||||||
|
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -49,6 +53,25 @@ func setupSingleConn(t *testing.T, ctx context.Context) (a, b Conn, p1, p2 tu.Pe
|
|||||||
return setupConn(t, ctx, false)
|
return setupConn(t, ctx, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey) (Listener, error) {
|
||||||
|
list, err := transport.NewTCPTransport().Listen(addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return WrapTransportListener(ctx, list, local, sk)
|
||||||
|
}
|
||||||
|
|
||||||
|
func dialer(t *testing.T, a ma.Multiaddr) transport.Dialer {
|
||||||
|
tpt := transport.NewTCPTransport()
|
||||||
|
tptd, err := tpt.Dialer(a)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return tptd
|
||||||
|
}
|
||||||
|
|
||||||
func setupConn(t *testing.T, ctx context.Context, secure bool) (a, b Conn, p1, p2 tu.PeerNetParams) {
|
func setupConn(t *testing.T, ctx context.Context, secure bool) (a, b Conn, p1, p2 tu.PeerNetParams) {
|
||||||
|
|
||||||
p1 = tu.RandPeerNetParamsOrFatal(t)
|
p1 = tu.RandPeerNetParamsOrFatal(t)
|
||||||
@ -71,6 +94,8 @@ func setupConn(t *testing.T, ctx context.Context, secure bool) (a, b Conn, p1, p
|
|||||||
PrivateKey: key2,
|
PrivateKey: key2,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
d2.AddDialer(dialer(t, p2.Addr))
|
||||||
|
|
||||||
var c2 Conn
|
var c2 Conn
|
||||||
|
|
||||||
done := make(chan error)
|
done := make(chan error)
|
||||||
@ -152,6 +177,7 @@ func testDialer(t *testing.T, secure bool) {
|
|||||||
LocalPeer: p2.ID,
|
LocalPeer: p2.ID,
|
||||||
PrivateKey: key2,
|
PrivateKey: key2,
|
||||||
}
|
}
|
||||||
|
d2.AddDialer(dialer(t, p2.Addr))
|
||||||
|
|
||||||
go echoListen(ctx, l1)
|
go echoListen(ctx, l1)
|
||||||
|
|
||||||
@ -227,6 +253,7 @@ func testDialerCloseEarly(t *testing.T, secure bool) {
|
|||||||
LocalPeer: p2.ID,
|
LocalPeer: p2.ID,
|
||||||
// PrivateKey: key2, -- dont give it key. we'll just close the conn.
|
// PrivateKey: key2, -- dont give it key. we'll just close the conn.
|
||||||
}
|
}
|
||||||
|
d2.AddDialer(dialer(t, p2.Addr))
|
||||||
|
|
||||||
errs := make(chan error, 100)
|
errs := make(chan error, 100)
|
||||||
done := make(chan struct{}, 1)
|
done := make(chan struct{}, 1)
|
||||||
@ -253,7 +280,7 @@ func testDialerCloseEarly(t *testing.T, secure bool) {
|
|||||||
|
|
||||||
c, err := d2.Dial(ctx, p1.Addr, p1.ID)
|
c, err := d2.Dial(ctx, p1.Addr, p1.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errs <- err
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
c.Close() // close it early.
|
c.Close() // close it early.
|
||||||
|
|
||||||
|
@ -8,11 +8,11 @@ import (
|
|||||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||||
ic "github.com/ipfs/go-ipfs/p2p/crypto"
|
ic "github.com/ipfs/go-ipfs/p2p/crypto"
|
||||||
filter "github.com/ipfs/go-ipfs/p2p/net/filter"
|
filter "github.com/ipfs/go-ipfs/p2p/net/filter"
|
||||||
|
transport "github.com/ipfs/go-ipfs/p2p/net/transport"
|
||||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||||
|
|
||||||
msgio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
|
msgio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
|
||||||
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||||
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Map maps Keys (Peer.IDs) to Connections.
|
// Map maps Keys (Peer.IDs) to Connections.
|
||||||
@ -54,22 +54,22 @@ type Conn interface {
|
|||||||
// Dial function as before, but it would have many arguments, as dialing is
|
// Dial function as before, but it would have many arguments, as dialing is
|
||||||
// no longer simple (need a peerstore, a local peer, a context, a network, etc)
|
// no longer simple (need a peerstore, a local peer, a context, a network, etc)
|
||||||
type Dialer struct {
|
type Dialer struct {
|
||||||
|
|
||||||
// Dialer is an optional manet.Dialer to use.
|
|
||||||
Dialer manet.Dialer
|
|
||||||
|
|
||||||
// LocalPeer is the identity of the local Peer.
|
// LocalPeer is the identity of the local Peer.
|
||||||
LocalPeer peer.ID
|
LocalPeer peer.ID
|
||||||
|
|
||||||
// LocalAddrs is a set of local addresses to use.
|
// LocalAddrs is a set of local addresses to use.
|
||||||
LocalAddrs []ma.Multiaddr
|
//LocalAddrs []ma.Multiaddr
|
||||||
|
|
||||||
|
// Dialers are the sub-dialers usable by this dialer
|
||||||
|
// selected in order based on the address being dialed
|
||||||
|
Dialers []transport.Dialer
|
||||||
|
|
||||||
// PrivateKey used to initialize a secure connection.
|
// PrivateKey used to initialize a secure connection.
|
||||||
// Warning: if PrivateKey is nil, connection will not be secured.
|
// Warning: if PrivateKey is nil, connection will not be secured.
|
||||||
PrivateKey ic.PrivKey
|
PrivateKey ic.PrivKey
|
||||||
|
|
||||||
// Wrapper to wrap the raw connection (optional)
|
// Wrapper to wrap the raw connection (optional)
|
||||||
Wrapper func(manet.Conn) manet.Conn
|
Wrapper WrapFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// Listener is an object that can accept connections. It matches net.Listener
|
// Listener is an object that can accept connections. It matches net.Listener
|
||||||
|
@ -6,8 +6,6 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
|
|
||||||
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||||
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
|
||||||
reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport"
|
|
||||||
tec "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-temp-err-catcher"
|
tec "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-temp-err-catcher"
|
||||||
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
|
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
|
||||||
goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
|
goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
|
||||||
@ -15,15 +13,16 @@ import (
|
|||||||
|
|
||||||
ic "github.com/ipfs/go-ipfs/p2p/crypto"
|
ic "github.com/ipfs/go-ipfs/p2p/crypto"
|
||||||
filter "github.com/ipfs/go-ipfs/p2p/net/filter"
|
filter "github.com/ipfs/go-ipfs/p2p/net/filter"
|
||||||
|
transport "github.com/ipfs/go-ipfs/p2p/net/transport"
|
||||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ConnWrapper is any function that wraps a raw multiaddr connection
|
// ConnWrapper is any function that wraps a raw multiaddr connection
|
||||||
type ConnWrapper func(manet.Conn) manet.Conn
|
type ConnWrapper func(transport.Conn) transport.Conn
|
||||||
|
|
||||||
// listener is an object that can accept connections. It implements Listener
|
// listener is an object that can accept connections. It implements Listener
|
||||||
type listener struct {
|
type listener struct {
|
||||||
manet.Listener
|
transport.Listener
|
||||||
|
|
||||||
local peer.ID // LocalPeer is the identity of the local Peer
|
local peer.ID // LocalPeer is the identity of the local Peer
|
||||||
privk ic.PrivKey // private key to use to initialize secure conns
|
privk ic.PrivKey // private key to use to initialize secure conns
|
||||||
@ -147,13 +146,7 @@ func (l *listener) Loggable() map[string]interface{} {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Listen listens on the particular multiaddr, with given peer and peerstore.
|
func WrapTransportListener(ctx context.Context, ml transport.Listener, local peer.ID, sk ic.PrivKey) (Listener, error) {
|
||||||
func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey) (Listener, error) {
|
|
||||||
ml, err := manetListen(addr)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
l := &listener{
|
l := &listener{
|
||||||
Listener: ml,
|
Listener: ml,
|
||||||
local: local,
|
local: local,
|
||||||
@ -175,23 +168,3 @@ type ListenerConnWrapper interface {
|
|||||||
func (l *listener) SetConnWrapper(cw ConnWrapper) {
|
func (l *listener) SetConnWrapper(cw ConnWrapper) {
|
||||||
l.wrapper = cw
|
l.wrapper = cw
|
||||||
}
|
}
|
||||||
|
|
||||||
func manetListen(addr ma.Multiaddr) (manet.Listener, error) {
|
|
||||||
network, naddr, err := manet.DialArgs(addr)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if reuseportIsAvailable() {
|
|
||||||
nl, err := reuseport.Listen(network, naddr)
|
|
||||||
if err == nil {
|
|
||||||
// hey, it worked!
|
|
||||||
return manet.WrapNetListener(nl)
|
|
||||||
}
|
|
||||||
// reuseport is available, but we failed to listen. log debug, and retry normally.
|
|
||||||
log.Debugf("reuseport available, but failed to listen: %s %s, %s", network, naddr, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// either reuseport not available, or it failed. try normally.
|
|
||||||
return manet.Listen(addr)
|
|
||||||
}
|
|
||||||
|
@ -8,9 +8,12 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
metrics "github.com/ipfs/go-ipfs/metrics"
|
metrics "github.com/ipfs/go-ipfs/metrics"
|
||||||
|
mconn "github.com/ipfs/go-ipfs/metrics/conn"
|
||||||
inet "github.com/ipfs/go-ipfs/p2p/net"
|
inet "github.com/ipfs/go-ipfs/p2p/net"
|
||||||
|
conn "github.com/ipfs/go-ipfs/p2p/net/conn"
|
||||||
filter "github.com/ipfs/go-ipfs/p2p/net/filter"
|
filter "github.com/ipfs/go-ipfs/p2p/net/filter"
|
||||||
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
|
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
|
||||||
|
transport "github.com/ipfs/go-ipfs/p2p/net/transport"
|
||||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||||
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
|
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
|
||||||
|
|
||||||
@ -58,9 +61,13 @@ type Swarm struct {
|
|||||||
backf dialbackoff
|
backf dialbackoff
|
||||||
dialT time.Duration // mainly for tests
|
dialT time.Duration // mainly for tests
|
||||||
|
|
||||||
|
dialer *conn.Dialer
|
||||||
|
|
||||||
notifmu sync.RWMutex
|
notifmu sync.RWMutex
|
||||||
notifs map[inet.Notifiee]ps.Notifiee
|
notifs map[inet.Notifiee]ps.Notifiee
|
||||||
|
|
||||||
|
transports []transport.Transport
|
||||||
|
|
||||||
// filters for addresses that shouldnt be dialed
|
// filters for addresses that shouldnt be dialed
|
||||||
Filters *filter.Filters
|
Filters *filter.Filters
|
||||||
|
|
||||||
@ -81,6 +88,10 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
wrap := func(c transport.Conn) transport.Conn {
|
||||||
|
return mconn.WrapConn(bwc, c)
|
||||||
|
}
|
||||||
|
|
||||||
s := &Swarm{
|
s := &Swarm{
|
||||||
swarm: ps.NewSwarm(PSTransport),
|
swarm: ps.NewSwarm(PSTransport),
|
||||||
local: local,
|
local: local,
|
||||||
@ -88,9 +99,11 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
|
|||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
dialT: DialTimeout,
|
dialT: DialTimeout,
|
||||||
notifs: make(map[inet.Notifiee]ps.Notifiee),
|
notifs: make(map[inet.Notifiee]ps.Notifiee),
|
||||||
|
transports: []transport.Transport{transport.NewTCPTransport()},
|
||||||
bwc: bwc,
|
bwc: bwc,
|
||||||
fdRateLimit: make(chan struct{}, concurrentFdDials),
|
fdRateLimit: make(chan struct{}, concurrentFdDials),
|
||||||
Filters: filter.NewFilters(),
|
Filters: filter.NewFilters(),
|
||||||
|
dialer: conn.NewDialer(local, peers.PrivKey(local), wrap),
|
||||||
}
|
}
|
||||||
|
|
||||||
// configure Swarm
|
// configure Swarm
|
||||||
@ -101,7 +114,12 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
|
|||||||
prom.MustRegisterOrGet(peersTotal)
|
prom.MustRegisterOrGet(peersTotal)
|
||||||
s.Notify((*metricsNotifiee)(s))
|
s.Notify((*metricsNotifiee)(s))
|
||||||
|
|
||||||
return s, s.listen(listenAddrs)
|
err = s.setupInterfaces(listenAddrs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Swarm) teardown() error {
|
func (s *Swarm) teardown() error {
|
||||||
@ -134,7 +152,7 @@ func (s *Swarm) Listen(addrs ...ma.Multiaddr) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.listen(addrs)
|
return s.setupInterfaces(addrs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process returns the Process of the swarm
|
// Process returns the Process of the swarm
|
||||||
|
@ -4,19 +4,17 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
mconn "github.com/ipfs/go-ipfs/metrics/conn"
|
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
||||||
conn "github.com/ipfs/go-ipfs/p2p/net/conn"
|
conn "github.com/ipfs/go-ipfs/p2p/net/conn"
|
||||||
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
|
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
|
||||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||||
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
|
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
|
||||||
|
|
||||||
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||||
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
|
||||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -289,14 +287,6 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
|
|||||||
log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.")
|
log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.")
|
||||||
}
|
}
|
||||||
|
|
||||||
// get our own addrs. try dialing out from our listener addresses (reusing ports)
|
|
||||||
// Note that using our peerstore's addresses here is incorrect, as that would
|
|
||||||
// include observed addresses. TODO: make peerstore's address book smarter.
|
|
||||||
localAddrs := s.ListenAddresses()
|
|
||||||
if len(localAddrs) == 0 {
|
|
||||||
log.Debug("Dialing out with no local addresses.")
|
|
||||||
}
|
|
||||||
|
|
||||||
// get remote peer addrs
|
// get remote peer addrs
|
||||||
remoteAddrs := s.peers.Addrs(p)
|
remoteAddrs := s.peers.Addrs(p)
|
||||||
// make sure we can use the addresses.
|
// make sure we can use the addresses.
|
||||||
@ -321,23 +311,8 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// open connection to peer
|
|
||||||
d := &conn.Dialer{
|
|
||||||
Dialer: manet.Dialer{
|
|
||||||
Dialer: net.Dialer{
|
|
||||||
Timeout: s.dialT,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
LocalPeer: s.local,
|
|
||||||
LocalAddrs: localAddrs,
|
|
||||||
PrivateKey: sk,
|
|
||||||
Wrapper: func(c manet.Conn) manet.Conn {
|
|
||||||
return mconn.WrapConn(s.bwc, c)
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
// try to get a connection to any addr
|
// try to get a connection to any addr
|
||||||
connC, err := s.dialAddrs(ctx, d, p, remoteAddrs)
|
connC, err := s.dialAddrs(ctx, p, remoteAddrs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logdial["error"] = err
|
logdial["error"] = err
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -357,7 +332,7 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
|
|||||||
return swarmC, nil
|
return swarmC, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) {
|
func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) {
|
||||||
|
|
||||||
// sort addresses so preferred addresses are dialed sooner
|
// sort addresses so preferred addresses are dialed sooner
|
||||||
sort.Sort(AddrList(remoteAddrs))
|
sort.Sort(AddrList(remoteAddrs))
|
||||||
@ -381,7 +356,7 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote
|
|||||||
connsout := conns
|
connsout := conns
|
||||||
errsout := errs
|
errsout := errs
|
||||||
|
|
||||||
connC, err := s.dialAddr(ctx, d, p, addr)
|
connC, err := s.dialAddr(ctx, p, addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
connsout = nil
|
connsout = nil
|
||||||
} else if connC == nil {
|
} else if connC == nil {
|
||||||
@ -451,10 +426,10 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote
|
|||||||
return nil, exitErr
|
return nil, exitErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Swarm) dialAddr(ctx context.Context, d *conn.Dialer, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) {
|
func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) {
|
||||||
log.Debugf("%s swarm dialing %s %s", s.local, p, addr)
|
log.Debugf("%s swarm dialing %s %s", s.local, p, addr)
|
||||||
|
|
||||||
connC, err := d.Dial(ctx, addr, p)
|
connC, err := s.dialer.Dial(ctx, addr, p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("%s --> %s dial attempt failed: %s", s.local, p, err)
|
return nil, fmt.Errorf("%s --> %s dial attempt failed: %s", s.local, p, err)
|
||||||
}
|
}
|
||||||
|
@ -6,65 +6,78 @@ import (
|
|||||||
mconn "github.com/ipfs/go-ipfs/metrics/conn"
|
mconn "github.com/ipfs/go-ipfs/metrics/conn"
|
||||||
inet "github.com/ipfs/go-ipfs/p2p/net"
|
inet "github.com/ipfs/go-ipfs/p2p/net"
|
||||||
conn "github.com/ipfs/go-ipfs/p2p/net/conn"
|
conn "github.com/ipfs/go-ipfs/p2p/net/conn"
|
||||||
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
|
transport "github.com/ipfs/go-ipfs/p2p/net/transport"
|
||||||
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
|
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
|
||||||
|
|
||||||
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||||
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
|
||||||
ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
|
ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
|
||||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
multierr "github.com/ipfs/go-ipfs/thirdparty/multierr"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Open listeners for each network the swarm should listen on
|
// Open listeners and reuse-dialers for the given addresses
|
||||||
func (s *Swarm) listen(addrs []ma.Multiaddr) error {
|
func (s *Swarm) setupInterfaces(addrs []ma.Multiaddr) error {
|
||||||
|
errs := make([]error, len(addrs))
|
||||||
for _, addr := range addrs {
|
var succeeded int
|
||||||
if !addrutil.AddrUsable(addr, true) {
|
for i, a := range addrs {
|
||||||
return fmt.Errorf("cannot use addr: %s", addr)
|
tpt := s.transportForAddr(a)
|
||||||
|
if tpt == nil {
|
||||||
|
errs[i] = fmt.Errorf("no transport for address: %s", a)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
retErr := multierr.New()
|
d, err := tpt.Dialer(a, transport.TimeoutOpt(DialTimeout), transport.ReusePorts)
|
||||||
|
|
||||||
// listen on every address
|
|
||||||
for i, addr := range addrs {
|
|
||||||
err := s.setupListener(addr)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if retErr.Errors == nil {
|
errs[i] = err
|
||||||
retErr.Errors = make([]error, len(addrs))
|
continue
|
||||||
}
|
|
||||||
retErr.Errors[i] = err
|
|
||||||
log.Debugf("Failed to listen on: %s - %s", addr, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.dialer.AddDialer(d)
|
||||||
|
|
||||||
|
list, err := tpt.Listen(a)
|
||||||
|
if err != nil {
|
||||||
|
errs[i] = err
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.addListener(list)
|
||||||
|
if err != nil {
|
||||||
|
errs[i] = err
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
succeeded++
|
||||||
}
|
}
|
||||||
|
|
||||||
if retErr.Errors != nil {
|
for i, e := range errs {
|
||||||
return retErr
|
if e != nil {
|
||||||
|
log.Warning("listen on %s failed: %s", addrs[i], errs[i])
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
if succeeded == 0 && len(addrs) > 0 {
|
||||||
|
return fmt.Errorf("failed to listen on any addresses: %s", errs)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Listen for new connections on the given multiaddr
|
func (s *Swarm) transportForAddr(a ma.Multiaddr) transport.Transport {
|
||||||
func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
|
for _, t := range s.transports {
|
||||||
|
if t.Matches(a) {
|
||||||
|
return t
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TODO rethink how this has to work. (jbenet)
|
return nil
|
||||||
//
|
}
|
||||||
// resolved, err := resolveUnspecifiedAddresses([]ma.Multiaddr{maddr})
|
|
||||||
// if err != nil {
|
func (s *Swarm) addListener(tptlist transport.Listener) error {
|
||||||
// return err
|
|
||||||
// }
|
|
||||||
// for _, a := range resolved {
|
|
||||||
// s.peers.AddAddr(s.local, a)
|
|
||||||
// }
|
|
||||||
|
|
||||||
sk := s.peers.PrivKey(s.local)
|
sk := s.peers.PrivKey(s.local)
|
||||||
if sk == nil {
|
if sk == nil {
|
||||||
// may be fine for sk to be nil, just log a warning.
|
// may be fine for sk to be nil, just log a warning.
|
||||||
log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.")
|
log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.")
|
||||||
}
|
}
|
||||||
log.Debugf("Swarm Listening at %s", maddr)
|
|
||||||
list, err := conn.Listen(s.Context(), maddr, s.local, sk)
|
list, err := conn.WrapTransportListener(s.Context(), tptlist, s.local, sk)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -72,11 +85,15 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
|
|||||||
list.SetAddrFilters(s.Filters)
|
list.SetAddrFilters(s.Filters)
|
||||||
|
|
||||||
if cw, ok := list.(conn.ListenerConnWrapper); ok {
|
if cw, ok := list.(conn.ListenerConnWrapper); ok {
|
||||||
cw.SetConnWrapper(func(c manet.Conn) manet.Conn {
|
cw.SetConnWrapper(func(c transport.Conn) transport.Conn {
|
||||||
return mconn.WrapConn(s.bwc, c)
|
return mconn.WrapConn(s.bwc, c)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return s.addConnListener(list)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Swarm) addConnListener(list conn.Listener) error {
|
||||||
// AddListener to the peerstream Listener. this will begin accepting connections
|
// AddListener to the peerstream Listener. this will begin accepting connections
|
||||||
// and streams!
|
// and streams!
|
||||||
sl, err := s.swarm.AddListener(list)
|
sl, err := s.swarm.AddListener(list)
|
||||||
@ -85,6 +102,8 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
|
|||||||
}
|
}
|
||||||
log.Debugf("Swarm Listeners at %s", s.ListenAddresses())
|
log.Debugf("Swarm Listeners at %s", s.ListenAddresses())
|
||||||
|
|
||||||
|
maddr := list.Multiaddr()
|
||||||
|
|
||||||
// signal to our notifiees on successful conn.
|
// signal to our notifiees on successful conn.
|
||||||
s.notifyAll(func(n inet.Notifiee) {
|
s.notifyAll(func(n inet.Notifiee) {
|
||||||
n.Listen((*Network)(s), maddr)
|
n.Listen((*Network)(s), maddr)
|
||||||
@ -107,7 +126,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
|
|||||||
if !more {
|
if !more {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Warningf("swarm listener accept error: %s", err)
|
log.Errorf("swarm listener accept error: %s", err)
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -1,8 +1,10 @@
|
|||||||
package conn
|
package transport
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport"
|
reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport"
|
||||||
)
|
)
|
||||||
@ -30,6 +32,34 @@ func init() {
|
|||||||
//
|
//
|
||||||
// If this becomes a sought after feature, we could add this to the config.
|
// If this becomes a sought after feature, we could add this to the config.
|
||||||
// In the end, reuseport is a stop-gap.
|
// In the end, reuseport is a stop-gap.
|
||||||
func reuseportIsAvailable() bool {
|
func ReuseportIsAvailable() bool {
|
||||||
return envReuseportVal && reuseport.Available()
|
return envReuseportVal && reuseport.Available()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReuseErrShouldRetry diagnoses whether to retry after a reuse error.
|
||||||
|
// if we failed to bind, we should retry. if bind worked and this is a
|
||||||
|
// real dial error (remote end didnt answer) then we should not retry.
|
||||||
|
func ReuseErrShouldRetry(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false // hey, it worked! no need to retry.
|
||||||
|
}
|
||||||
|
|
||||||
|
// if it's a network timeout error, it's a legitimate failure.
|
||||||
|
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
errno, ok := err.(syscall.Errno)
|
||||||
|
if !ok { // not an errno? who knows what this is. retry.
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
switch errno {
|
||||||
|
case syscall.EADDRINUSE, syscall.EADDRNOTAVAIL:
|
||||||
|
return true // failure to bind. retry.
|
||||||
|
case syscall.ECONNREFUSED:
|
||||||
|
return false // real dial error
|
||||||
|
default:
|
||||||
|
return true // optimistically default to retry.
|
||||||
|
}
|
||||||
|
}
|
236
p2p/net/transport/tcp.go
Normal file
236
p2p/net/transport/tcp.go
Normal file
@ -0,0 +1,236 @@
|
|||||||
|
package transport
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||||
|
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
||||||
|
reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport"
|
||||||
|
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
|
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TcpTransport struct {
|
||||||
|
dlock sync.Mutex
|
||||||
|
dialers map[string]Dialer
|
||||||
|
|
||||||
|
llock sync.Mutex
|
||||||
|
listeners map[string]Listener
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTCPTransport() *TcpTransport {
|
||||||
|
return &TcpTransport{
|
||||||
|
dialers: make(map[string]Dialer),
|
||||||
|
listeners: make(map[string]Listener),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TcpTransport) Dialer(laddr ma.Multiaddr, opts ...DialOpt) (Dialer, error) {
|
||||||
|
t.dlock.Lock()
|
||||||
|
defer t.dlock.Unlock()
|
||||||
|
s := laddr.String()
|
||||||
|
d, found := t.dialers[s]
|
||||||
|
if found {
|
||||||
|
return d, nil
|
||||||
|
}
|
||||||
|
var base manet.Dialer
|
||||||
|
|
||||||
|
var doReuse bool
|
||||||
|
for _, o := range opts {
|
||||||
|
switch o := o.(type) {
|
||||||
|
case TimeoutOpt:
|
||||||
|
base.Timeout = time.Duration(o)
|
||||||
|
case ReuseportOpt:
|
||||||
|
doReuse = bool(o)
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unrecognized option: %#v", o)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tcpd, err := t.newTcpDialer(base, laddr, doReuse)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
t.dialers[s] = tcpd
|
||||||
|
return tcpd, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TcpTransport) Listen(laddr ma.Multiaddr) (Listener, error) {
|
||||||
|
t.llock.Lock()
|
||||||
|
defer t.llock.Unlock()
|
||||||
|
s := laddr.String()
|
||||||
|
l, found := t.listeners[s]
|
||||||
|
if found {
|
||||||
|
return l, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
list, err := manetListen(laddr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
tlist := &tcpListener{
|
||||||
|
list: list,
|
||||||
|
transport: t,
|
||||||
|
}
|
||||||
|
|
||||||
|
t.listeners[s] = tlist
|
||||||
|
return tlist, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func manetListen(addr ma.Multiaddr) (manet.Listener, error) {
|
||||||
|
network, naddr, err := manet.DialArgs(addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if ReuseportIsAvailable() {
|
||||||
|
nl, err := reuseport.Listen(network, naddr)
|
||||||
|
if err == nil {
|
||||||
|
// hey, it worked!
|
||||||
|
return manet.WrapNetListener(nl)
|
||||||
|
}
|
||||||
|
// reuseport is available, but we failed to listen. log debug, and retry normally.
|
||||||
|
log.Debugf("reuseport available, but failed to listen: %s %s, %s", network, naddr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// either reuseport not available, or it failed. try normally.
|
||||||
|
return manet.Listen(addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TcpTransport) Matches(a ma.Multiaddr) bool {
|
||||||
|
return IsTcpMultiaddr(a)
|
||||||
|
}
|
||||||
|
|
||||||
|
type tcpDialer struct {
|
||||||
|
laddr ma.Multiaddr
|
||||||
|
|
||||||
|
doReuse bool
|
||||||
|
|
||||||
|
rd reuseport.Dialer
|
||||||
|
madialer manet.Dialer
|
||||||
|
|
||||||
|
transport Transport
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TcpTransport) newTcpDialer(base manet.Dialer, laddr ma.Multiaddr, doReuse bool) (*tcpDialer, error) {
|
||||||
|
// get the local net.Addr manually
|
||||||
|
la, err := manet.ToNetAddr(laddr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err // something wrong with laddr.
|
||||||
|
}
|
||||||
|
|
||||||
|
if doReuse && ReuseportIsAvailable() {
|
||||||
|
rd := reuseport.Dialer{
|
||||||
|
D: net.Dialer{
|
||||||
|
LocalAddr: la,
|
||||||
|
Timeout: base.Timeout,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
return &tcpDialer{
|
||||||
|
doReuse: true,
|
||||||
|
laddr: laddr,
|
||||||
|
rd: rd,
|
||||||
|
madialer: base,
|
||||||
|
transport: t,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &tcpDialer{
|
||||||
|
doReuse: false,
|
||||||
|
laddr: laddr,
|
||||||
|
madialer: base,
|
||||||
|
transport: t,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *tcpDialer) Dial(raddr ma.Multiaddr) (Conn, error) {
|
||||||
|
var c manet.Conn
|
||||||
|
var err error
|
||||||
|
if d.doReuse {
|
||||||
|
c, err = d.reuseDial(raddr)
|
||||||
|
} else {
|
||||||
|
c, err = d.madialer.Dial(raddr)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &connWrap{
|
||||||
|
Conn: c,
|
||||||
|
transport: d.transport,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *tcpDialer) reuseDial(raddr ma.Multiaddr) (manet.Conn, error) {
|
||||||
|
logdial := lgbl.Dial("conn", "", "", d.laddr, raddr)
|
||||||
|
rpev := log.EventBegin(context.TODO(), "tptDialReusePort", logdial)
|
||||||
|
|
||||||
|
network, netraddr, err := manet.DialArgs(raddr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
con, err := d.rd.Dial(network, netraddr)
|
||||||
|
if err == nil {
|
||||||
|
logdial["reuseport"] = "success"
|
||||||
|
rpev.Done()
|
||||||
|
return manet.WrapNetConn(con)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ReuseErrShouldRetry(err) {
|
||||||
|
logdial["reuseport"] = "failure"
|
||||||
|
logdial["error"] = err
|
||||||
|
rpev.Done()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
logdial["reuseport"] = "retry"
|
||||||
|
logdial["error"] = err
|
||||||
|
rpev.Done()
|
||||||
|
|
||||||
|
return d.madialer.Dial(raddr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *tcpDialer) Matches(a ma.Multiaddr) bool {
|
||||||
|
return IsTcpMultiaddr(a)
|
||||||
|
}
|
||||||
|
|
||||||
|
type tcpListener struct {
|
||||||
|
list manet.Listener
|
||||||
|
transport Transport
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *tcpListener) Accept() (Conn, error) {
|
||||||
|
c, err := d.list.Accept()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &connWrap{
|
||||||
|
Conn: c,
|
||||||
|
transport: d.transport,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *tcpListener) Addr() net.Addr {
|
||||||
|
return d.list.Addr()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *tcpListener) Multiaddr() ma.Multiaddr {
|
||||||
|
return t.list.Multiaddr()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *tcpListener) NetListener() net.Listener {
|
||||||
|
return t.list.NetListener()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *tcpListener) Close() error {
|
||||||
|
return d.list.Close()
|
||||||
|
}
|
61
p2p/net/transport/transport.go
Normal file
61
p2p/net/transport/transport.go
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
package transport
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||||
|
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
||||||
|
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
|
||||||
|
)
|
||||||
|
|
||||||
|
var log = logging.Logger("transport")
|
||||||
|
|
||||||
|
type Conn interface {
|
||||||
|
manet.Conn
|
||||||
|
|
||||||
|
Transport() Transport
|
||||||
|
}
|
||||||
|
|
||||||
|
type Transport interface {
|
||||||
|
Dialer(laddr ma.Multiaddr, opts ...DialOpt) (Dialer, error)
|
||||||
|
Listen(laddr ma.Multiaddr) (Listener, error)
|
||||||
|
Matches(ma.Multiaddr) bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type Dialer interface {
|
||||||
|
Dial(raddr ma.Multiaddr) (Conn, error)
|
||||||
|
Matches(ma.Multiaddr) bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type Listener interface {
|
||||||
|
Accept() (Conn, error)
|
||||||
|
Close() error
|
||||||
|
Addr() net.Addr
|
||||||
|
Multiaddr() ma.Multiaddr
|
||||||
|
}
|
||||||
|
|
||||||
|
type connWrap struct {
|
||||||
|
manet.Conn
|
||||||
|
transport Transport
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cw *connWrap) Transport() Transport {
|
||||||
|
return cw.transport
|
||||||
|
}
|
||||||
|
|
||||||
|
type DialOpt interface{}
|
||||||
|
type TimeoutOpt time.Duration
|
||||||
|
type ReuseportOpt bool
|
||||||
|
|
||||||
|
var ReusePorts ReuseportOpt = true
|
||||||
|
|
||||||
|
func IsTcpMultiaddr(a ma.Multiaddr) bool {
|
||||||
|
p := a.Protocols()
|
||||||
|
return len(p) == 2 && (p[0].Name == "ip4" || p[0].Name == "ip6") && p[1].Name == "tcp"
|
||||||
|
}
|
||||||
|
|
||||||
|
func IsUtpMultiaddr(a ma.Multiaddr) bool {
|
||||||
|
p := a.Protocols()
|
||||||
|
return len(p) == 3 && p[2].Name == "utp"
|
||||||
|
}
|
Reference in New Issue
Block a user