1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-30 09:59:13 +08:00

p2p/net: notify on listens

Network now signals when it successfully listens on some address
or when an address shuts down. This will be used to establish and
close nat port mappings. It could also be used to notify peers
of address changes.
This commit is contained in:
Juan Batiz-Benet
2015-01-30 20:17:55 -08:00
parent 75a2975b85
commit 98f2b0779f
8 changed files with 64 additions and 12 deletions

View File

@ -2,6 +2,7 @@ package network
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
host "github.com/jbenet/go-ipfs/p2p/host"
@ -171,3 +172,5 @@ func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {}
func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) {}
func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) {}
func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) {}

View File

@ -142,6 +142,8 @@ const (
// Notifiee is an interface for an object wishing to receive
// notifications from a Network.
type Notifiee interface {
Listen(Network, ma.Multiaddr) // called when network starts listening on an addr
ListenClose(Network, ma.Multiaddr) // called when network starts listening on an addr
Connected(Network, Conn) // called when a connection opened
Disconnected(Network, Conn) // called when a connection closed
OpenedStream(Network, Stream) // called when a stream opened

View File

@ -4,9 +4,10 @@ import (
"testing"
"time"
inet "github.com/jbenet/go-ipfs/p2p/net"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
inet "github.com/jbenet/go-ipfs/p2p/net"
)
func TestNotifications(t *testing.T) {
@ -169,6 +170,8 @@ func TestNotifications(t *testing.T) {
}
type netNotifiee struct {
listen chan ma.Multiaddr
listenClose chan ma.Multiaddr
connected chan inet.Conn
disconnected chan inet.Conn
openedStream chan inet.Stream
@ -177,6 +180,8 @@ type netNotifiee struct {
func newNetNotifiee() *netNotifiee {
return &netNotifiee{
listen: make(chan ma.Multiaddr),
listenClose: make(chan ma.Multiaddr),
connected: make(chan inet.Conn),
disconnected: make(chan inet.Conn),
openedStream: make(chan inet.Stream),
@ -184,6 +189,12 @@ func newNetNotifiee() *netNotifiee {
}
}
func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) {
nn.listen <- a
}
func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) {
nn.listenClose <- a
}
func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
nn.connected <- v
}

View File

@ -202,6 +202,15 @@ func (s *Swarm) LocalPeer() peer.ID {
return s.local
}
// notifyAll sends a signal to all Notifiees
func (s *Swarm) notifyAll(notify func(inet.Notifiee)) {
s.notifmu.RLock()
for f := range s.notifs {
go notify(f)
}
s.notifmu.RUnlock()
}
// Notify signs up Notifiee to receive signals when events happen
func (s *Swarm) Notify(f inet.Notifiee) {
// wrap with our notifiee, to translate function calls

View File

@ -382,7 +382,7 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote
for i := 0; i < len(remoteAddrs); i++ {
select {
case err = <-errs:
log.Info(err)
log.Debug(err)
case connC := <-conns:
// take the first + return asap
close(foundConn)

View File

@ -3,6 +3,7 @@ package swarm
import (
"fmt"
inet "github.com/jbenet/go-ipfs/p2p/net"
conn "github.com/jbenet/go-ipfs/p2p/net/conn"
addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
@ -60,7 +61,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
// may be fine for sk to be nil, just log a warning.
log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.")
}
log.Infof("Swarm Listening at %s", maddr)
log.Debugf("Swarm Listening at %s", maddr)
list, err := conn.Listen(s.cg.Context(), maddr, s.local, sk)
if err != nil {
return err
@ -72,20 +73,31 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
if err != nil {
return err
}
log.Infof("Swarm Listeners at %s", s.ListenAddresses())
log.Debugf("Swarm Listeners at %s", s.ListenAddresses())
// signal to our notifiees on successful conn.
s.notifyAll(func(n inet.Notifiee) {
n.Listen((*Network)(s), maddr)
})
// go consume peerstream's listen accept errors. note, these ARE errors.
// they may be killing the listener, and if we get _any_ we should be
// fixing this in our conn.Listener (to ignore them or handle them
// differently.)
go func(ctx context.Context, sl *ps.Listener) {
// signal to our notifiees closing
defer s.notifyAll(func(n inet.Notifiee) {
n.ListenClose((*Network)(s), maddr)
})
for {
select {
case err, more := <-sl.AcceptErrors():
if !more {
return
}
log.Info(err)
log.Debugf("swarm listener accept error: %s", err)
case <-ctx.Done():
return
}

View File

@ -4,9 +4,10 @@ import (
"testing"
"time"
inet "github.com/jbenet/go-ipfs/p2p/net"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
inet "github.com/jbenet/go-ipfs/p2p/net"
)
func TestNotifications(t *testing.T) {
@ -157,6 +158,8 @@ func TestNotifications(t *testing.T) {
}
type netNotifiee struct {
listen chan ma.Multiaddr
listenClose chan ma.Multiaddr
connected chan inet.Conn
disconnected chan inet.Conn
openedStream chan inet.Stream
@ -165,6 +168,8 @@ type netNotifiee struct {
func newNetNotifiee() *netNotifiee {
return &netNotifiee{
listen: make(chan ma.Multiaddr),
listenClose: make(chan ma.Multiaddr),
connected: make(chan inet.Conn),
disconnected: make(chan inet.Conn),
openedStream: make(chan inet.Stream),
@ -172,6 +177,12 @@ func newNetNotifiee() *netNotifiee {
}
}
func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) {
nn.listen <- a
}
func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) {
nn.listenClose <- a
}
func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
nn.connected <- v
}

View File

@ -1,6 +1,8 @@
package dht
import (
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
inet "github.com/jbenet/go-ipfs/p2p/net"
)
@ -31,3 +33,5 @@ func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {}
func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) {}
func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) {}
func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) {}