mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-01 19:24:14 +08:00
implement utp transport
License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
@ -18,8 +18,8 @@ var log = logging.Logger("p2p/net/swarm/addr")
|
||||
var SupportedTransportStrings = []string{
|
||||
"/ip4/tcp",
|
||||
"/ip6/tcp",
|
||||
// "/ip4/udp/utp", disabled because the lib is broken
|
||||
// "/ip6/udp/utp", disabled because the lib is broken
|
||||
"/ip4/udp/utp",
|
||||
"/ip6/udp/utp",
|
||||
// "/ip4/udp/udt", disabled because the lib doesnt work on arm
|
||||
// "/ip6/udp/udt", disabled because the lib doesnt work on arm
|
||||
}
|
||||
|
@ -20,7 +20,6 @@ func TestFilterAddrs(t *testing.T) {
|
||||
bad := []ma.Multiaddr{
|
||||
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234"), // unreliable
|
||||
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/sctp/1234"), // not in manet
|
||||
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/utp"), // utp is broken
|
||||
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/udt"), // udt is broken on arm
|
||||
newMultiaddr(t, "/ip6/fe80::1/tcp/1234"), // link local
|
||||
newMultiaddr(t, "/ip6/fe80::100/tcp/1234"), // link local
|
||||
@ -29,6 +28,7 @@ func TestFilterAddrs(t *testing.T) {
|
||||
good := []ma.Multiaddr{
|
||||
newMultiaddr(t, "/ip4/127.0.0.1/tcp/1234"),
|
||||
newMultiaddr(t, "/ip6/::1/tcp/1234"),
|
||||
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/utp"),
|
||||
}
|
||||
|
||||
goodAndBad := append(good, bad...)
|
||||
@ -39,18 +39,12 @@ func TestFilterAddrs(t *testing.T) {
|
||||
if AddrUsable(a, false) {
|
||||
t.Errorf("addr %s should be unusable", a)
|
||||
}
|
||||
if AddrUsable(a, true) {
|
||||
t.Errorf("addr %s should be unusable", a)
|
||||
}
|
||||
}
|
||||
|
||||
for _, a := range good {
|
||||
if !AddrUsable(a, false) {
|
||||
t.Errorf("addr %s should be usable", a)
|
||||
}
|
||||
if !AddrUsable(a, true) {
|
||||
t.Errorf("addr %s should be usable", a)
|
||||
}
|
||||
}
|
||||
|
||||
subtestAddrsEqual(t, FilterUsableAddrs(bad), []ma.Multiaddr{})
|
||||
|
@ -91,13 +91,16 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
|
||||
}
|
||||
|
||||
s := &Swarm{
|
||||
swarm: ps.NewSwarm(PSTransport),
|
||||
local: local,
|
||||
peers: peers,
|
||||
ctx: ctx,
|
||||
dialT: DialTimeout,
|
||||
notifs: make(map[inet.Notifiee]ps.Notifiee),
|
||||
transports: []transport.Transport{transport.NewTCPTransport()},
|
||||
swarm: ps.NewSwarm(PSTransport),
|
||||
local: local,
|
||||
peers: peers,
|
||||
ctx: ctx,
|
||||
dialT: DialTimeout,
|
||||
notifs: make(map[inet.Notifiee]ps.Notifiee),
|
||||
transports: []transport.Transport{
|
||||
transport.NewTCPTransport(),
|
||||
transport.NewUtpTransport(),
|
||||
},
|
||||
bwc: bwc,
|
||||
fdRateLimit: make(chan struct{}, concurrentFdDials),
|
||||
Filters: filter.NewFilters(),
|
||||
|
@ -25,7 +25,6 @@ func TestFilterAddrs(t *testing.T) {
|
||||
bad := []ma.Multiaddr{
|
||||
m("/ip4/1.2.3.4/udp/1234"), // unreliable
|
||||
m("/ip4/1.2.3.4/udp/1234/sctp/1234"), // not in manet
|
||||
m("/ip4/1.2.3.4/udp/1234/utp"), // utp is broken
|
||||
m("/ip4/1.2.3.4/udp/1234/udt"), // udt is broken on arm
|
||||
m("/ip6/fe80::1/tcp/0"), // link local
|
||||
m("/ip6/fe80::100/tcp/1234"), // link local
|
||||
@ -34,6 +33,7 @@ func TestFilterAddrs(t *testing.T) {
|
||||
good := []ma.Multiaddr{
|
||||
m("/ip4/127.0.0.1/tcp/0"),
|
||||
m("/ip6/::1/tcp/0"),
|
||||
m("/ip4/1.2.3.4/udp/1234/utp"),
|
||||
}
|
||||
|
||||
goodAndBad := append(good, bad...)
|
||||
@ -41,13 +41,13 @@ func TestFilterAddrs(t *testing.T) {
|
||||
// test filters
|
||||
|
||||
for _, a := range bad {
|
||||
if addrutil.AddrUsable(a, true) {
|
||||
if addrutil.AddrUsable(a, false) {
|
||||
t.Errorf("addr %s should be unusable", a)
|
||||
}
|
||||
}
|
||||
|
||||
for _, a := range good {
|
||||
if !addrutil.AddrUsable(a, true) {
|
||||
if !addrutil.AddrUsable(a, false) {
|
||||
t.Errorf("addr %s should be usable", a)
|
||||
}
|
||||
}
|
||||
|
148
p2p/net/transport/utp.go
Normal file
148
p2p/net/transport/utp.go
Normal file
@ -0,0 +1,148 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
utp "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/anacrolix/utp"
|
||||
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
||||
mautp "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/utp"
|
||||
)
|
||||
|
||||
type UtpTransport struct {
|
||||
sockLock sync.Mutex
|
||||
sockets map[string]*UtpSocket
|
||||
}
|
||||
|
||||
func NewUtpTransport() *UtpTransport {
|
||||
return &UtpTransport{
|
||||
sockets: make(map[string]*UtpSocket),
|
||||
}
|
||||
}
|
||||
|
||||
func (d *UtpTransport) Matches(a ma.Multiaddr) bool {
|
||||
p := a.Protocols()
|
||||
return len(p) == 3 && p[2].Name == "utp"
|
||||
}
|
||||
|
||||
type UtpSocket struct {
|
||||
s *utp.Socket
|
||||
laddr ma.Multiaddr
|
||||
transport Transport
|
||||
}
|
||||
|
||||
func (t *UtpTransport) Listen(laddr ma.Multiaddr) (Listener, error) {
|
||||
t.sockLock.Lock()
|
||||
defer t.sockLock.Unlock()
|
||||
s, ok := t.sockets[laddr.String()]
|
||||
if ok {
|
||||
return s, nil
|
||||
}
|
||||
|
||||
ns, err := t.newConn(laddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t.sockets[laddr.String()] = ns
|
||||
return ns, nil
|
||||
}
|
||||
|
||||
func (t *UtpTransport) Dialer(laddr ma.Multiaddr, opts ...DialOpt) (Dialer, error) {
|
||||
t.sockLock.Lock()
|
||||
defer t.sockLock.Unlock()
|
||||
s, ok := t.sockets[laddr.String()]
|
||||
if ok {
|
||||
return s, nil
|
||||
}
|
||||
|
||||
ns, err := t.newConn(laddr, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t.sockets[laddr.String()] = ns
|
||||
return ns, nil
|
||||
}
|
||||
|
||||
func (t *UtpTransport) newConn(addr ma.Multiaddr, opts ...DialOpt) (*UtpSocket, error) {
|
||||
network, netaddr, err := manet.DialArgs(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s, err := utp.NewSocket("udp"+network[3:], netaddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
laddr, err := manet.FromNetAddr(mautp.MakeAddr(s.LocalAddr()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &UtpSocket{
|
||||
s: s,
|
||||
laddr: laddr,
|
||||
transport: t,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *UtpSocket) Dial(raddr ma.Multiaddr) (Conn, error) {
|
||||
_, addr, err := manet.DialArgs(raddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
con, err := s.s.Dial(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mnc, err := manet.WrapNetConn(&mautp.Conn{Conn: con})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &connWrap{
|
||||
Conn: mnc,
|
||||
transport: s.transport,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *UtpSocket) Accept() (Conn, error) {
|
||||
c, err := s.s.Accept()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mnc, err := manet.WrapNetConn(&mautp.Conn{Conn: c})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &connWrap{
|
||||
Conn: mnc,
|
||||
transport: s.transport,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *UtpSocket) Matches(a ma.Multiaddr) bool {
|
||||
p := a.Protocols()
|
||||
return len(p) == 3 && p[2].Name == "utp"
|
||||
}
|
||||
|
||||
func (t *UtpSocket) Close() error {
|
||||
return t.s.Close()
|
||||
}
|
||||
|
||||
func (t *UtpSocket) Addr() net.Addr {
|
||||
return t.s.Addr()
|
||||
}
|
||||
|
||||
func (t *UtpSocket) Multiaddr() ma.Multiaddr {
|
||||
return t.laddr
|
||||
}
|
||||
|
||||
var _ Transport = (*UtpTransport)(nil)
|
@ -78,4 +78,10 @@ test_expect_success "set up tcp testbed" '
|
||||
|
||||
run_basic_test
|
||||
|
||||
test_expect_success "set up utp testbed" '
|
||||
iptb init -n 5 -p 0 -f --bootstrap=none --utp
|
||||
'
|
||||
|
||||
run_basic_test
|
||||
|
||||
test_done
|
||||
|
Reference in New Issue
Block a user