mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-28 17:03:58 +08:00
reuseport tcp is now a dial creation option
And other assorted PR feedback License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
@ -54,7 +54,7 @@ func setupSingleConn(t *testing.T, ctx context.Context) (a, b Conn, p1, p2 tu.Pe
|
|||||||
}
|
}
|
||||||
|
|
||||||
func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey) (Listener, error) {
|
func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey) (Listener, error) {
|
||||||
list, err := transport.NewTCPTransport().Listener(addr)
|
list, err := transport.NewTCPTransport().Listen(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -114,7 +114,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
|
|||||||
prom.MustRegisterOrGet(peersTotal)
|
prom.MustRegisterOrGet(peersTotal)
|
||||||
s.Notify((*metricsNotifiee)(s))
|
s.Notify((*metricsNotifiee)(s))
|
||||||
|
|
||||||
return s, s.setupAddresses(listenAddrs)
|
return s, s.setupInterfaces(listenAddrs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Swarm) teardown() error {
|
func (s *Swarm) teardown() error {
|
||||||
@ -147,7 +147,7 @@ func (s *Swarm) Listen(addrs ...ma.Multiaddr) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.setupAddresses(addrs)
|
return s.setupInterfaces(addrs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process returns the Process of the swarm
|
// Process returns the Process of the swarm
|
||||||
|
@ -15,21 +15,21 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Open listeners and reuse-dialers for the given addresses
|
// Open listeners and reuse-dialers for the given addresses
|
||||||
func (s *Swarm) setupAddresses(addrs []ma.Multiaddr) error {
|
func (s *Swarm) setupInterfaces(addrs []ma.Multiaddr) error {
|
||||||
for _, a := range addrs {
|
for _, a := range addrs {
|
||||||
tpt := s.transportForAddr(a)
|
tpt := s.transportForAddr(a)
|
||||||
if tpt == nil {
|
if tpt == nil {
|
||||||
return fmt.Errorf("no transport for address: %s", a)
|
return fmt.Errorf("no transport for address: %s", a)
|
||||||
}
|
}
|
||||||
|
|
||||||
d, err := tpt.Dialer(a, transport.TimeoutOpt(DialTimeout))
|
d, err := tpt.Dialer(a, transport.TimeoutOpt(DialTimeout), transport.ReusePorts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.dialer.AddDialer(d)
|
s.dialer.AddDialer(d)
|
||||||
|
|
||||||
list, err := tpt.Listener(a)
|
list, err := tpt.Listen(a)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -38,16 +38,19 @@ func (t *TcpTransport) Dialer(laddr ma.Multiaddr, opts ...DialOpt) (Dialer, erro
|
|||||||
}
|
}
|
||||||
var base manet.Dialer
|
var base manet.Dialer
|
||||||
|
|
||||||
|
var doReuse bool
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
switch o := o.(type) {
|
switch o := o.(type) {
|
||||||
case TimeoutOpt:
|
case TimeoutOpt:
|
||||||
base.Timeout = o.(time.Duration)
|
base.Timeout = time.Duration(o)
|
||||||
|
case ReuseportOpt:
|
||||||
|
doReuse = bool(o)
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unrecognized option: %#v", o)
|
return nil, fmt.Errorf("unrecognized option: %#v", o)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tcpd, err := t.newTcpDialer(base, laddr)
|
tcpd, err := t.newTcpDialer(base, laddr, doReuse)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -56,7 +59,7 @@ func (t *TcpTransport) Dialer(laddr ma.Multiaddr, opts ...DialOpt) (Dialer, erro
|
|||||||
return tcpd, nil
|
return tcpd, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TcpTransport) Listener(laddr ma.Multiaddr) (Listener, error) {
|
func (t *TcpTransport) Listen(laddr ma.Multiaddr) (Listener, error) {
|
||||||
t.llock.Lock()
|
t.llock.Lock()
|
||||||
defer t.llock.Unlock()
|
defer t.llock.Unlock()
|
||||||
s := laddr.String()
|
s := laddr.String()
|
||||||
@ -114,33 +117,33 @@ type tcpDialer struct {
|
|||||||
transport Transport
|
transport Transport
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TcpTransport) newTcpDialer(base manet.Dialer, laddr ma.Multiaddr) (*tcpDialer, error) {
|
func (t *TcpTransport) newTcpDialer(base manet.Dialer, laddr ma.Multiaddr, doReuse bool) (*tcpDialer, error) {
|
||||||
// get the local net.Addr manually
|
// get the local net.Addr manually
|
||||||
la, err := manet.ToNetAddr(laddr)
|
la, err := manet.ToNetAddr(laddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err // something wrong with laddr.
|
return nil, err // something wrong with laddr.
|
||||||
}
|
}
|
||||||
|
|
||||||
if !ReuseportIsAvailable() {
|
if doReuse && ReuseportIsAvailable() {
|
||||||
|
rd := reuseport.Dialer{
|
||||||
|
D: net.Dialer{
|
||||||
|
LocalAddr: la,
|
||||||
|
Timeout: base.Timeout,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
return &tcpDialer{
|
return &tcpDialer{
|
||||||
doReuse: false,
|
doReuse: true,
|
||||||
laddr: laddr,
|
laddr: laddr,
|
||||||
|
rd: rd,
|
||||||
madialer: base,
|
madialer: base,
|
||||||
transport: t,
|
transport: t,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
rd := reuseport.Dialer{
|
|
||||||
D: net.Dialer{
|
|
||||||
LocalAddr: la,
|
|
||||||
Timeout: base.Timeout,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
return &tcpDialer{
|
return &tcpDialer{
|
||||||
doReuse: true,
|
doReuse: false,
|
||||||
laddr: laddr,
|
laddr: laddr,
|
||||||
rd: rd,
|
|
||||||
madialer: base,
|
madialer: base,
|
||||||
transport: t,
|
transport: t,
|
||||||
}, nil
|
}, nil
|
||||||
|
@ -2,6 +2,7 @@ package transport
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"net"
|
"net"
|
||||||
|
"time"
|
||||||
|
|
||||||
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||||
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
||||||
@ -18,7 +19,7 @@ type Conn interface {
|
|||||||
|
|
||||||
type Transport interface {
|
type Transport interface {
|
||||||
Dialer(laddr ma.Multiaddr, opts ...DialOpt) (Dialer, error)
|
Dialer(laddr ma.Multiaddr, opts ...DialOpt) (Dialer, error)
|
||||||
Listener(laddr ma.Multiaddr) (Listener, error)
|
Listen(laddr ma.Multiaddr) (Listener, error)
|
||||||
Matches(ma.Multiaddr) bool
|
Matches(ma.Multiaddr) bool
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -44,7 +45,10 @@ func (cw *connWrap) Transport() Transport {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type DialOpt interface{}
|
type DialOpt interface{}
|
||||||
type TimeoutOpt interface{}
|
type TimeoutOpt time.Duration
|
||||||
|
type ReuseportOpt bool
|
||||||
|
|
||||||
|
var ReusePorts ReuseportOpt = true
|
||||||
|
|
||||||
func IsTcpMultiaddr(a ma.Multiaddr) bool {
|
func IsTcpMultiaddr(a ma.Multiaddr) bool {
|
||||||
p := a.Protocols()
|
p := a.Protocols()
|
||||||
|
Reference in New Issue
Block a user