mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-29 01:12:24 +08:00
filter incoming connections and add a test of functionality
- add extra check to dialblock test - move filter to separate package - also improved tests - sunk filters down into p2p/net/conn/listener License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com> Signed-off-by: Juan Batiz-Benet <juan@benet.ai>
This commit is contained in:
@ -7,6 +7,7 @@ import (
|
||||
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
ic "github.com/ipfs/go-ipfs/p2p/crypto"
|
||||
filter "github.com/ipfs/go-ipfs/p2p/net/filter"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
|
||||
msgio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
|
||||
@ -86,6 +87,8 @@ type Listener interface {
|
||||
// LocalPeer is the identity of the local Peer.
|
||||
LocalPeer() peer.ID
|
||||
|
||||
SetAddrFilters(*filter.Filters)
|
||||
|
||||
// Close closes the listener.
|
||||
// Any blocked Accept operations will be unblocked and return errors.
|
||||
Close() error
|
||||
|
@ -13,6 +13,7 @@ import (
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
|
||||
ic "github.com/ipfs/go-ipfs/p2p/crypto"
|
||||
filter "github.com/ipfs/go-ipfs/p2p/net/filter"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
)
|
||||
|
||||
@ -26,6 +27,8 @@ type listener struct {
|
||||
local peer.ID // LocalPeer is the identity of the local Peer
|
||||
privk ic.PrivKey // private key to use to initialize secure conns
|
||||
|
||||
filters *filter.Filters
|
||||
|
||||
wrapper ConnWrapper
|
||||
|
||||
cg ctxgroup.ContextGroup
|
||||
@ -45,6 +48,10 @@ func (l *listener) String() string {
|
||||
return fmt.Sprintf("<Listener %s %s>", l.local, l.Multiaddr())
|
||||
}
|
||||
|
||||
func (l *listener) SetAddrFilters(fs *filter.Filters) {
|
||||
l.filters = fs
|
||||
}
|
||||
|
||||
// Accept waits for and returns the next connection to the listener.
|
||||
// Note that unfortunately this
|
||||
func (l *listener) Accept() (net.Conn, error) {
|
||||
@ -81,6 +88,12 @@ func (l *listener) Accept() (net.Conn, error) {
|
||||
}
|
||||
|
||||
log.Debugf("listener %s got connection: %s <---> %s", l, maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())
|
||||
|
||||
if l.filters != nil && l.filters.AddrBlocked(maconn.RemoteMultiaddr()) {
|
||||
log.Debugf("blocked connection from %s", maconn.RemoteMultiaddr())
|
||||
maconn.Close()
|
||||
continue
|
||||
}
|
||||
// If we have a wrapper func, wrap this conn
|
||||
if l.wrapper != nil {
|
||||
maconn = l.wrapper(maconn)
|
||||
|
34
p2p/net/filter/filter.go
Normal file
34
p2p/net/filter/filter.go
Normal file
@ -0,0 +1,34 @@
|
||||
package filter
|
||||
|
||||
import (
|
||||
"net"
|
||||
"strings"
|
||||
|
||||
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
||||
)
|
||||
|
||||
type Filters struct {
|
||||
filters []*net.IPNet
|
||||
}
|
||||
|
||||
func (fs *Filters) AddDialFilter(f *net.IPNet) {
|
||||
fs.filters = append(fs.filters, f)
|
||||
}
|
||||
|
||||
func (f *Filters) AddrBlocked(a ma.Multiaddr) bool {
|
||||
_, addr, err := manet.DialArgs(a)
|
||||
if err != nil {
|
||||
// if we cant parse it, its probably not blocked
|
||||
return false
|
||||
}
|
||||
|
||||
ipstr := strings.Split(addr, ":")[0]
|
||||
ip := net.ParseIP(ipstr)
|
||||
for _, ft := range f.filters {
|
||||
if ft.Contains(ip) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
@ -4,19 +4,18 @@ package swarm
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
metrics "github.com/ipfs/go-ipfs/metrics"
|
||||
inet "github.com/ipfs/go-ipfs/p2p/net"
|
||||
filter "github.com/ipfs/go-ipfs/p2p/net/filter"
|
||||
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
|
||||
|
||||
ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
|
||||
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
||||
ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
|
||||
pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
|
||||
psy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux"
|
||||
@ -53,7 +52,7 @@ type Swarm struct {
|
||||
notifs map[inet.Notifiee]ps.Notifiee
|
||||
|
||||
// filters for addresses that shouldnt be dialed
|
||||
Filters *Filters
|
||||
Filters *filter.Filters
|
||||
|
||||
cg ctxgroup.ContextGroup
|
||||
bwc metrics.Reporter
|
||||
@ -76,7 +75,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
|
||||
dialT: DialTimeout,
|
||||
notifs: make(map[inet.Notifiee]ps.Notifiee),
|
||||
bwc: bwc,
|
||||
Filters: new(Filters),
|
||||
Filters: new(filter.Filters),
|
||||
}
|
||||
|
||||
// configure Swarm
|
||||
@ -90,30 +89,6 @@ func (s *Swarm) teardown() error {
|
||||
return s.swarm.Close()
|
||||
}
|
||||
|
||||
type Filters struct {
|
||||
filters []*net.IPNet
|
||||
}
|
||||
|
||||
func (fs *Filters) AddDialFilter(f *net.IPNet) {
|
||||
fs.filters = append(fs.filters, f)
|
||||
}
|
||||
|
||||
func (f *Filters) AddrBlocked(a ma.Multiaddr) bool {
|
||||
_, addr, err := manet.DialArgs(a)
|
||||
if err != nil {
|
||||
// if we cant parse it, its probably not blocked
|
||||
return false
|
||||
}
|
||||
|
||||
ip := net.ParseIP(addr)
|
||||
for _, ft := range f.filters {
|
||||
if ft.Contains(ip) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// CtxGroup returns the Context Group of the swarm
|
||||
func filterAddrs(listenAddrs []ma.Multiaddr) ([]ma.Multiaddr, error) {
|
||||
if len(listenAddrs) > 0 {
|
||||
|
@ -303,7 +303,6 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
|
||||
ila, _ := s.InterfaceListenAddresses()
|
||||
remoteAddrs = addrutil.Subtract(remoteAddrs, ila)
|
||||
remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addrs(s.local))
|
||||
remoteAddrs = s.filterAddrs(remoteAddrs)
|
||||
|
||||
log.Debugf("%s swarm dialing %s -- local:%s remote:%s", s.local, p, s.ListenAddresses(), remoteAddrs)
|
||||
if len(remoteAddrs) == 0 {
|
||||
@ -312,6 +311,13 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
remoteAddrs = s.filterAddrs(remoteAddrs)
|
||||
if len(remoteAddrs) == 0 {
|
||||
err := errors.New("all adresses for peer have been filtered out")
|
||||
logdial["error"] = err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// open connection to peer
|
||||
d := &conn.Dialer{
|
||||
Dialer: manet.Dialer{
|
||||
|
@ -69,6 +69,8 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
|
||||
return err
|
||||
}
|
||||
|
||||
list.SetAddrFilters(s.Filters)
|
||||
|
||||
if cw, ok := list.(conn.ListenerConnWrapper); ok {
|
||||
cw.SetConnWrapper(func(c manet.Conn) manet.Conn {
|
||||
return mconn.WrapConn(s.bwc, c)
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@ -270,3 +271,60 @@ func TestConnHandler(t *testing.T) {
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddrBlocking(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
swarms := makeSwarms(ctx, t, 2)
|
||||
|
||||
swarms[0].SetConnHandler(func(conn *Conn) {
|
||||
t.Fatal("no connections should happen!")
|
||||
})
|
||||
|
||||
_, block, err := net.ParseCIDR("127.0.0.1/8")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
swarms[1].Filters.AddDialFilter(block)
|
||||
|
||||
swarms[1].peers.AddAddr(swarms[0].LocalPeer(), swarms[0].ListenAddresses()[0], peer.PermanentAddrTTL)
|
||||
_, err = swarms[1].Dial(context.TODO(), swarms[0].LocalPeer())
|
||||
if err == nil {
|
||||
t.Fatal("dial should have failed")
|
||||
}
|
||||
|
||||
swarms[0].peers.AddAddr(swarms[1].LocalPeer(), swarms[1].ListenAddresses()[0], peer.PermanentAddrTTL)
|
||||
_, err = swarms[0].Dial(context.TODO(), swarms[1].LocalPeer())
|
||||
if err == nil {
|
||||
t.Fatal("dial should have failed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilterBounds(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
swarms := makeSwarms(ctx, t, 2)
|
||||
|
||||
conns := make(chan struct{}, 8)
|
||||
swarms[0].SetConnHandler(func(conn *Conn) {
|
||||
conns <- struct{}{}
|
||||
})
|
||||
|
||||
// Address that we wont be dialing from
|
||||
_, block, err := net.ParseCIDR("192.0.0.1/8")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// set filter on both sides, shouldnt matter
|
||||
swarms[1].Filters.AddDialFilter(block)
|
||||
swarms[0].Filters.AddDialFilter(block)
|
||||
|
||||
connectSwarms(t, ctx, swarms)
|
||||
|
||||
select {
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("should have gotten connection")
|
||||
case <-conns:
|
||||
fmt.Println("got connect")
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user