1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-08-06 19:44:01 +08:00

p2p: tag connections in connection manager

License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
Łukasz Magiera
2018-07-30 17:15:01 +02:00
parent 8849193de0
commit 4badcdc340
5 changed files with 29 additions and 9 deletions

View File

@ -14,7 +14,6 @@ import (
core "github.com/ipfs/go-ipfs/core"
p2p "github.com/ipfs/go-ipfs/p2p"
"gx/ipfs/Qme4QgoVPyQqxVc4G1c2L2wc9TDa6o294rtspGMnBNRujm/go-ipfs-addr"
ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
"gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
pstore "gx/ipfs/QmZR2XWVVBCtbgBWnQhWk2xcQfaR3W8faQPriAiaaj7rsr/go-libp2p-peerstore"

View File

@ -4,12 +4,12 @@ import (
"context"
"time"
"gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net"
ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
"gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer"
tec "gx/ipfs/QmWHgLqrghM9zw77nF6gdvT9ExQ2RB9pLxkd8sDHZf1rWb/go-temp-err-catcher"
"gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net"
"gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net"
tec "gx/ipfs/QmWHgLqrghM9zw77nF6gdvT9ExQ2RB9pLxkd8sDHZf1rWb/go-temp-err-catcher"
ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
"gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
"gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer"
)
// localListener manet streams and proxies them to libp2p services
@ -77,6 +77,9 @@ func (l *localListener) setupStream(local manet.Conn) {
return
}
cmgr := l.p2p.peerHost.ConnManager()
cmgr.TagPeer(l.peer, CMGR_TAG, 20)
stream := &Stream{
Protocol: l.proto,
@ -87,6 +90,10 @@ func (l *localListener) setupStream(local manet.Conn) {
Remote: remote,
Registry: l.p2p.Streams,
cleanup: func() {
cmgr.UntagPeer(l.peer, CMGR_TAG)
},
}
l.p2p.Streams.Register(stream)

View File

@ -1,9 +1,9 @@
package p2p
import (
logging "gx/ipfs/QmcVVHfdyv15GVPk7NrxdWjh2hLVccXnoD8j2tyQShiXJb/go-log"
pstore "gx/ipfs/QmZR2XWVVBCtbgBWnQhWk2xcQfaR3W8faQPriAiaaj7rsr/go-libp2p-peerstore"
p2phost "gx/ipfs/Qmb8T6YBBsjYsVGfrihQLfCJveczZnneSBqBKkYEBWDjge/go-libp2p-host"
logging "gx/ipfs/QmcVVHfdyv15GVPk7NrxdWjh2hLVccXnoD8j2tyQShiXJb/go-log"
peer "gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer"
)

View File

@ -3,9 +3,9 @@ package p2p
import (
"context"
net "gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net"
manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net"
ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
net "gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net"
protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
)
@ -47,12 +47,17 @@ func (l *remoteListener) start() error {
return
}
peerMa, err := ma.NewMultiaddr(maPrefix + remote.Conn().RemotePeer().Pretty())
peer := remote.Conn().RemotePeer()
peerMa, err := ma.NewMultiaddr(maPrefix + peer.Pretty())
if err != nil {
remote.Reset()
return
}
cmgr := l.p2p.peerHost.ConnManager()
cmgr.TagPeer(peer, CMGR_TAG, 20)
stream := &Stream{
Protocol: l.proto,
@ -63,6 +68,10 @@ func (l *remoteListener) start() error {
Remote: remote,
Registry: l.p2p.Streams,
cleanup: func() {
cmgr.UntagPeer(peer, CMGR_TAG)
},
}
l.p2p.Streams.Register(stream)

View File

@ -4,12 +4,14 @@ import (
"io"
"sync"
net "gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net"
manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net"
ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
net "gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net"
"gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
)
const CMGR_TAG = "stream-fwd"
// Stream holds information on active incoming and outgoing p2p streams.
type Stream struct {
id uint64
@ -23,12 +25,15 @@ type Stream struct {
Remote net.Stream
Registry *StreamRegistry
cleanup func()
}
// Close closes stream endpoints and deregisters it
func (s *Stream) Close() error {
s.Local.Close()
s.Remote.Close()
s.cleanup()
s.Registry.Deregister(s.id)
return nil
}