1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-08-06 11:31:54 +08:00

PTP API: Make code more object oriented, use less node

License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
Łukasz Magiera
2017-06-03 23:28:26 +02:00
parent ad9ae350bf
commit 104268f1b6
11 changed files with 328 additions and 361 deletions

View File

@ -10,13 +10,12 @@ import (
cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
ptpnet "github.com/ipfs/go-ipfs/ptp/net"
ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr"
)
// PTPAppInfoOutput is output type of ls command
type PTPAppInfoOutput struct {
// PTPListenerInfoOutput is output type of ls command
type PTPListenerInfoOutput struct {
Protocol string
Address string
}
@ -33,7 +32,7 @@ type PTPStreamInfoOutput struct {
// PTPLsOutput is output type of ls command
type PTPLsOutput struct {
Apps []PTPAppInfoOutput
Listeners []PTPListenerInfoOutput
}
// PTPStreamsOutput is output type of streams command
@ -87,10 +86,10 @@ var ptpLsCmd = &cmds.Command{
output := &PTPLsOutput{}
for _, app := range n.PTP.Apps.Apps {
output.Apps = append(output.Apps, PTPAppInfoOutput{
Protocol: app.Protocol,
Address: app.Address.String(),
for _, listener := range n.PTP.Listeners.Listeners {
output.Listeners = append(output.Listeners, PTPListenerInfoOutput{
Protocol: listener.Protocol,
Address: listener.Address.String(),
})
}
@ -103,12 +102,12 @@ var ptpLsCmd = &cmds.Command{
list, _ := res.Output().(*PTPLsOutput)
buf := new(bytes.Buffer)
w := tabwriter.NewWriter(buf, 1, 2, 1, ' ', 0)
for _, app := range list.Apps {
for _, listener := range list.Listeners {
if headers {
fmt.Fprintln(w, "Address\tProtocol")
}
fmt.Fprintf(w, "%s\t%s\n", app.Address, app.Protocol)
fmt.Fprintf(w, "%s\t%s\n", listener.Address, listener.Protocol)
}
w.Flush()
@ -183,9 +182,9 @@ var ptpStreamsCmd = &cmds.Command{
var ptpListenCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Create application protocol listener and proxy to network multiaddr.",
Tagline: "Forward p2p connections to a network multiaddr.",
ShortDescription: `
Register a p2p connection handler and proxies the connections to a specified address.
Register a p2p connection handler and forward the connections to a specified address.
Note that the connections originate from the ipfs daemon process.
`,
@ -212,8 +211,8 @@ Note that the connections originate from the ipfs daemon process.
return
}
proto := "/app/" + req.Arguments()[0]
if ptpnet.CheckProtoExists(n, proto) {
proto := "/ptp/" + req.Arguments()[0]
if n.PTP.CheckProtoExists(proto) {
res.SetError(errors.New("protocol handler already registered"), cmds.ErrNormal)
return
}
@ -224,14 +223,14 @@ Note that the connections originate from the ipfs daemon process.
return
}
_, err = ptpnet.NewListener(n, proto, addr)
_, err = n.PTP.NewListener(n.Context(), proto, addr)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
// Successful response.
res.SetOutput(&PTPAppInfoOutput{
res.SetOutput(&PTPListenerInfoOutput{
Protocol: proto,
Address: addr.String(),
})
@ -253,7 +252,7 @@ transparently connect to a p2p service.
Arguments: []cmds.Argument{
cmds.StringArg("Peer", true, false, "Remote peer to connect to"),
cmds.StringArg("Protocol", true, false, "Protocol identifier."),
cmds.StringArg("BindAddress", false, false, "Address to listen for application/s (default: /ip4/127.0.0.1/tcp/0)."),
cmds.StringArg("BindAddress", false, false, "Address to listen for connection/s (default: /ip4/127.0.0.1/tcp/0)."),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
@ -279,7 +278,7 @@ transparently connect to a p2p service.
return
}
proto := "/app/" + req.Arguments()[1]
proto := "/ptp/" + req.Arguments()[1]
bindAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0")
if len(req.Arguments()) == 3 {
@ -290,15 +289,15 @@ transparently connect to a p2p service.
}
}
app, err := ptpnet.Dial(n, addr, peer, proto, bindAddr)
listenerInfo, err := n.PTP.Dial(n.Context(), addr, peer, proto, bindAddr)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
output := PTPAppInfoOutput{
Protocol: app.Protocol,
Address: app.Address.String(),
output := PTPListenerInfoOutput{
Protocol: listenerInfo.Protocol,
Address: listenerInfo.Address.String(),
}
res.SetOutput(&output)
@ -348,7 +347,7 @@ var ptpCloseCmd = &cmds.Command{
handlerID, err = strconv.ParseUint(req.Arguments()[0], 10, 64)
if err != nil {
proto = "/app/" + req.Arguments()[0]
proto = "/ptp/" + req.Arguments()[0]
} else {
useHandlerID = true
}
@ -367,11 +366,11 @@ var ptpCloseCmd = &cmds.Command{
}
if closeAll || !useHandlerID {
for _, app := range n.PTP.Apps.Apps {
if !closeAll && app.Protocol != proto {
for _, listener := range n.PTP.Listeners.Listeners {
if !closeAll && listener.Protocol != proto {
continue
}
app.Close()
listener.Close()
if !closeAll {
break
}

View File

@ -47,7 +47,7 @@ ADVANCED COMMANDS
pin Pin objects to local storage
repo Manipulate the IPFS repository
stats Various operational stats
ptp Libp2p stream mounting
ptp Libp2p stream mounting
filestore Manage the filestore (experimental)
NETWORK COMMANDS

View File

@ -23,7 +23,6 @@ import (
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
bserv "github.com/ipfs/go-ipfs/blockservice"
ptp "github.com/ipfs/go-ipfs/ptp"
exchange "github.com/ipfs/go-ipfs/exchange"
bitswap "github.com/ipfs/go-ipfs/exchange/bitswap"
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
@ -36,6 +35,7 @@ import (
ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher"
path "github.com/ipfs/go-ipfs/path"
pin "github.com/ipfs/go-ipfs/pin"
ptp "github.com/ipfs/go-ipfs/ptp"
repo "github.com/ipfs/go-ipfs/repo"
config "github.com/ipfs/go-ipfs/repo/config"
nilrouting "github.com/ipfs/go-ipfs/routing/none"
@ -248,7 +248,7 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
n.Floodsub = floodsub.NewFloodSub(ctx, peerhost)
}
n.PTP = ptp.NewPTP()
n.PTP = ptp.NewPTP(n.Identity, n.PeerHost, n.Peerstore)
// setup local discovery
if do != nil {

View File

@ -1,82 +0,0 @@
package net
import (
"errors"
core "github.com/ipfs/go-ipfs/core"
ptp "github.com/ipfs/go-ipfs/ptp"
net "gx/ipfs/QmRscs8KxrSmSv4iuevHv8JfuUzHBMoqiaHzxfDRiksd6e/go-libp2p-net"
peerstore "gx/ipfs/QmXZSd1qR5BxZkPyuwfT5jpqQFScZccoZvDneXsKzCNHWX/go-libp2p-peerstore"
ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr"
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
manet "gx/ipfs/Qmf1Gq7N45Rpuw7ev47uWgH6dLPtdnvcMRNPkVBwqjLJg2/go-multiaddr-net"
)
func Dial(n *core.IpfsNode, addr ma.Multiaddr, peer peer.ID, proto string, bindAddr ma.Multiaddr) (*ptp.ListenerInfo, error) {
lnet, _, err := manet.DialArgs(bindAddr)
if err != nil {
return nil, err
}
app := ptp.ListenerInfo{
Identity: n.Identity,
Protocol: proto,
}
n.Peerstore.AddAddr(peer, addr, peerstore.TempAddrTTL)
remote, err := dial(n, peer, proto)
if err != nil {
return nil, err
}
switch lnet {
case "tcp", "tcp4", "tcp6":
listener, err := manet.Listen(bindAddr)
if err != nil {
if err2 := remote.Close(); err2 != nil {
return nil, err2
}
return nil, err
}
app.Address = listener.Multiaddr()
app.Closer = listener
app.Running = true
go doAccept(n, &app, remote, listener)
default:
return nil, errors.New("unsupported protocol: " + lnet)
}
return &app, nil
}
func doAccept(n *core.IpfsNode, app *ptp.ListenerInfo, remote net.Stream, listener manet.Listener) {
defer listener.Close()
local, err := listener.Accept()
if err != nil {
return
}
stream := ptp.StreamInfo{
Protocol: app.Protocol,
LocalPeer: app.Identity,
LocalAddr: app.Address,
RemotePeer: remote.Conn().RemotePeer(),
RemoteAddr: remote.Conn().RemoteMultiaddr(),
Local: local,
Remote: remote,
Registry: &n.PTP.Streams,
}
n.PTP.Streams.Register(&stream)
startStreaming(&stream)
}

View File

@ -1,67 +0,0 @@
package net
import (
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/ptp"
ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr"
manet "gx/ipfs/Qmf1Gq7N45Rpuw7ev47uWgH6dLPtdnvcMRNPkVBwqjLJg2/go-multiaddr-net"
)
// NewListener creates new ptp listener
func NewListener(n *core.IpfsNode, proto string, addr ma.Multiaddr) (*ptp.ListenerInfo, error) {
listener, err := Listen(n, proto)
if err != nil {
return nil, err
}
listenerInfo := ptp.ListenerInfo{
Identity: n.Identity,
Protocol: proto,
Address: addr,
Closer: listener,
Running: true,
Registry: &n.PTP.Listeners,
}
go acceptStreams(n, &listenerInfo, listener)
n.PTP.Listeners.Register(&listenerInfo)
return &listenerInfo, nil
}
func acceptStreams(n *core.IpfsNode, listenerInfo *ptp.ListenerInfo, listener Listener) {
for listenerInfo.Running {
remote, err := listener.Accept()
if err != nil {
listener.Close()
break
}
local, err := manet.Dial(listenerInfo.Address)
if err != nil {
remote.Close()
continue
}
stream := ptp.StreamInfo{
Protocol: listenerInfo.Protocol,
LocalPeer: listenerInfo.Identity,
LocalAddr: listenerInfo.Address,
RemotePeer: remote.Conn().RemotePeer(),
RemoteAddr: remote.Conn().RemoteMultiaddr(),
Local: local,
Remote: remote,
Registry: &n.PTP.Streams,
}
n.PTP.Streams.Register(&stream)
startStreaming(&stream)
}
n.PTP.Listeners.Deregister(listenerInfo.Protocol)
}

View File

@ -1,93 +0,0 @@
package net
import (
"time"
context "context"
core "github.com/ipfs/go-ipfs/core"
net "gx/ipfs/QmRscs8KxrSmSv4iuevHv8JfuUzHBMoqiaHzxfDRiksd6e/go-libp2p-net"
pstore "gx/ipfs/QmXZSd1qR5BxZkPyuwfT5jpqQFScZccoZvDneXsKzCNHWX/go-libp2p-peerstore"
pro "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
)
// Listener wraps stream handler into a listener
type Listener interface {
Accept() (net.Stream, error)
Close() error
}
// IpfsListener holds information on a listener
type IpfsListener struct {
node *core.IpfsNode
conCh chan net.Stream
proto pro.ID
ctx context.Context
cancel func()
}
// Accept waits for a connection from the listener
func (il *IpfsListener) Accept() (net.Stream, error) {
select {
case c := <-il.conCh:
return c, nil
case <-il.ctx.Done():
return nil, il.ctx.Err()
}
}
// Close closes the listener and removes stream handler
func (il *IpfsListener) Close() error {
il.cancel()
il.node.PeerHost.RemoveStreamHandler(il.proto)
return nil
}
// Listen creates new IpfsListener
func Listen(nd *core.IpfsNode, protocol string) (*IpfsListener, error) {
ctx, cancel := context.WithCancel(nd.Context())
list := &IpfsListener{
node: nd,
proto: pro.ID(protocol),
conCh: make(chan net.Stream),
ctx: ctx,
cancel: cancel,
}
nd.PeerHost.SetStreamHandler(list.proto, func(s net.Stream) {
select {
case list.conCh <- s:
case <-ctx.Done():
s.Close()
}
})
return list, nil
}
// Dial dials to a specified node and protocol
func dial(nd *core.IpfsNode, p peer.ID, protocol string) (net.Stream, error) {
ctx, cancel := context.WithTimeout(nd.Context(), time.Second*30)
defer cancel()
err := nd.PeerHost.Connect(ctx, pstore.PeerInfo{ID: p})
if err != nil {
return nil, err
}
return nd.PeerHost.NewStream(nd.Context(), p, pro.ID(protocol))
}
// CheckProtoExists checks whether a protocol handler is registered to
// mux handler
func CheckProtoExists(n *core.IpfsNode, proto string) bool {
protos := n.PeerHost.Mux().Protocols()
for _, p := range protos {
if p != proto {
continue
}
return true
}
return false
}

View File

@ -1,19 +0,0 @@
package net
import (
"io"
ptp "github.com/ipfs/go-ipfs/ptp"
)
func startStreaming(stream *ptp.StreamInfo) {
go func() {
io.Copy(stream.Local, stream.Remote)
stream.Close()
}()
go func() {
io.Copy(stream.Remote, stream.Local)
stream.Close()
}()
}

View File

@ -1,12 +1,237 @@
package ptp
// PTP structure holds information on currently running streams/apps
import (
"context"
"errors"
"time"
net "gx/ipfs/QmRscs8KxrSmSv4iuevHv8JfuUzHBMoqiaHzxfDRiksd6e/go-libp2p-net"
p2phost "gx/ipfs/QmUywuGNZoUKV8B9iyvup9bPkLiMrhTsyVMkeSXW5VxAfC/go-libp2p-host"
pstore "gx/ipfs/QmXZSd1qR5BxZkPyuwfT5jpqQFScZccoZvDneXsKzCNHWX/go-libp2p-peerstore"
pro "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr"
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
manet "gx/ipfs/Qmf1Gq7N45Rpuw7ev47uWgH6dLPtdnvcMRNPkVBwqjLJg2/go-multiaddr-net"
)
// PTP structure holds information on currently running streams/listeners
type PTP struct {
Listeners ListenerRegistry
Streams StreamRegistry
identity peer.ID
peerHost p2phost.Host
peerstore pstore.Peerstore
}
// NewPTP creates new PTP struct
func NewPTP() *PTP {
return &PTP{}
func NewPTP(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore) *PTP {
return &PTP{
identity: identity,
peerHost: peerHost,
peerstore: peerstore,
}
}
func (ptp *PTP) newStreamTo(ctx2 context.Context, p peer.ID, protocol string) (net.Stream, error) {
ctx, cancel := context.WithTimeout(ctx2, time.Second*30) //TODO: configurable?
defer cancel()
err := ptp.peerHost.Connect(ctx, pstore.PeerInfo{ID: p})
if err != nil {
return nil, err
}
return ptp.peerHost.NewStream(ctx2, p, pro.ID(protocol))
}
func (ptp *PTP) Dial(ctx context.Context, addr ma.Multiaddr, peer peer.ID, proto string, bindAddr ma.Multiaddr) (*ListenerInfo, error) {
lnet, _, err := manet.DialArgs(bindAddr)
if err != nil {
return nil, err
}
listenerInfo := ListenerInfo{
Identity: ptp.identity,
Protocol: proto,
}
remote, err := ptp.newStreamTo(ctx, peer, proto)
if err != nil {
return nil, err
}
switch lnet {
case "tcp", "tcp4", "tcp6":
listener, err := manet.Listen(bindAddr)
if err != nil {
if err2 := remote.Close(); err2 != nil {
return nil, err2
}
return nil, err
}
listenerInfo.Address = listener.Multiaddr()
listenerInfo.Closer = listener
listenerInfo.Running = true
go ptp.doAccept(&listenerInfo, remote, listener)
default:
return nil, errors.New("unsupported protocol: " + lnet)
}
return &listenerInfo, nil
}
func (ptp *PTP) doAccept(listenerInfo *ListenerInfo, remote net.Stream, listener manet.Listener) {
defer listener.Close()
local, err := listener.Accept()
if err != nil {
return
}
stream := StreamInfo{
Protocol: listenerInfo.Protocol,
LocalPeer: listenerInfo.Identity,
LocalAddr: listenerInfo.Address,
RemotePeer: remote.Conn().RemotePeer(),
RemoteAddr: remote.Conn().RemoteMultiaddr(),
Local: local,
Remote: remote,
Registry: &ptp.Streams,
}
ptp.Streams.Register(&stream)
stream.startStreaming()
}
// Listener wraps stream handler into a listener
type Listener interface {
Accept() (net.Stream, error)
Close() error
}
// P2PListener holds information on a listener
type P2PListener struct {
peerHost p2phost.Host
conCh chan net.Stream
proto pro.ID
ctx context.Context
cancel func()
}
// Accept waits for a connection from the listener
func (il *P2PListener) Accept() (net.Stream, error) {
select {
case c := <-il.conCh:
return c, nil
case <-il.ctx.Done():
return nil, il.ctx.Err()
}
}
// Close closes the listener and removes stream handler
func (il *P2PListener) Close() error {
il.cancel()
il.peerHost.RemoveStreamHandler(il.proto)
return nil
}
// Listen creates new P2PListener
func (ptp *PTP) registerStreamHandler(ctx2 context.Context, protocol string) (*P2PListener, error) {
ctx, cancel := context.WithCancel(ctx2)
list := &P2PListener{
peerHost: ptp.peerHost,
proto: pro.ID(protocol),
conCh: make(chan net.Stream),
ctx: ctx,
cancel: cancel,
}
ptp.peerHost.SetStreamHandler(list.proto, func(s net.Stream) {
select {
case list.conCh <- s:
case <-ctx.Done():
s.Close()
}
})
return list, nil
}
// NewListener creates new ptp listener
func (ptp *PTP) NewListener(ctx context.Context, proto string, addr ma.Multiaddr) (*ListenerInfo, error) {
listener, err := ptp.registerStreamHandler(ctx, proto)
if err != nil {
return nil, err
}
listenerInfo := ListenerInfo{
Identity: ptp.identity,
Protocol: proto,
Address: addr,
Closer: listener,
Running: true,
Registry: &ptp.Listeners,
}
go ptp.acceptStreams(&listenerInfo, listener)
ptp.Listeners.Register(&listenerInfo)
return &listenerInfo, nil
}
func (ptp *PTP) acceptStreams(listenerInfo *ListenerInfo, listener Listener) {
for listenerInfo.Running {
remote, err := listener.Accept()
if err != nil {
listener.Close()
break
}
local, err := manet.Dial(listenerInfo.Address)
if err != nil {
remote.Close()
continue
}
stream := StreamInfo{
Protocol: listenerInfo.Protocol,
LocalPeer: listenerInfo.Identity,
LocalAddr: listenerInfo.Address,
RemotePeer: remote.Conn().RemotePeer(),
RemoteAddr: remote.Conn().RemoteMultiaddr(),
Local: local,
Remote: remote,
Registry: &ptp.Streams,
}
ptp.Streams.Register(&stream)
stream.startStreaming()
}
ptp.Listeners.Deregister(listenerInfo.Protocol)
}
// CheckProtoExists checks whether a protocol handler is registered to
// mux handler
func (ptp *PTP) CheckProtoExists(proto string) bool {
protos := ptp.peerHost.Mux().Protocols()
for _, p := range protos {
if p != proto {
continue
}
return true
}
return false
}

View File

@ -1,11 +1,11 @@
package ptp
import (
"fmt"
"io"
ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr"
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
"fmt"
)
// ListenerInfo holds information on a p2p listener.
@ -41,7 +41,7 @@ type ListenerRegistry struct {
Listeners []*ListenerInfo
}
// Register registers listenerInfo in this registry
// Register registers listenerInfo2 in this registry
func (c *ListenerRegistry) Register(listenerInfo *ListenerInfo) {
c.Listeners = append(c.Listeners, listenerInfo)
}
@ -63,3 +63,70 @@ func (c *ListenerRegistry) Deregister(proto string) error {
return fmt.Errorf("failed to deregister proto %s", proto)
}
// StreamInfo holds information on active incoming and outgoing p2p streams.
type StreamInfo struct {
HandlerID uint64
Protocol string
LocalPeer peer.ID
LocalAddr ma.Multiaddr
RemotePeer peer.ID
RemoteAddr ma.Multiaddr
Local io.ReadWriteCloser
Remote io.ReadWriteCloser
Registry *StreamRegistry
}
// Close closes stream endpoints and deregisters it
func (c *StreamInfo) Close() error {
c.Local.Close()
c.Remote.Close()
c.Registry.Deregister(c.HandlerID)
return nil
}
func (s *StreamInfo) startStreaming() {
go func() {
io.Copy(s.Local, s.Remote)
s.Close()
}()
go func() {
io.Copy(s.Remote, s.Local)
s.Close()
}()
}
// StreamRegistry is a collection of active incoming and outgoing protocol app streams.
type StreamRegistry struct {
Streams []*StreamInfo
nextID uint64
}
// Register registers a stream to the registry
func (c *StreamRegistry) Register(streamInfo *StreamInfo) {
streamInfo.HandlerID = c.nextID
c.Streams = append(c.Streams, streamInfo)
c.nextID++
}
// Deregister deregisters stream from the registry
func (c *StreamRegistry) Deregister(handlerID uint64) {
foundAt := -1
for i, s := range c.Streams {
if s.HandlerID == handlerID {
foundAt = i
break
}
}
if foundAt != -1 {
c.Streams = append(c.Streams[:foundAt], c.Streams[foundAt+1:]...)
}
}

View File

@ -1,63 +0,0 @@
package ptp
import (
"io"
ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr"
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
)
// StreamInfo holds information on active incoming and outgoing p2p streams.
type StreamInfo struct {
HandlerID uint64
Protocol string
LocalPeer peer.ID
LocalAddr ma.Multiaddr
RemotePeer peer.ID
RemoteAddr ma.Multiaddr
Local io.ReadWriteCloser
Remote io.ReadWriteCloser
Registry *StreamRegistry
}
// Close closes stream endpoints and deregisters it
func (c *StreamInfo) Close() error {
c.Local.Close()
c.Remote.Close()
c.Registry.Deregister(c.HandlerID)
return nil
}
// StreamRegistry is a collection of active incoming and outgoing protocol app streams.
type StreamRegistry struct {
Streams []*StreamInfo
nextID uint64
}
// Register registers a stream to the registry
func (c *StreamRegistry) Register(streamInfo *StreamInfo) {
streamInfo.HandlerID = c.nextID
c.Streams = append(c.Streams, streamInfo)
c.nextID++
}
// Deregister deregisters stream from the registry
func (c *StreamRegistry) Deregister(handlerID uint64) {
foundAt := -1
for i, s := range c.Streams {
if s.HandlerID == handlerID {
foundAt = i
break
}
}
if foundAt != -1 {
c.Streams = append(c.Streams[:foundAt], c.Streams[foundAt+1:]...)
}
}

View File

@ -66,7 +66,7 @@ test_expect_success 'client to server output looks good' '
'
test_expect_success "'ipfs ptp ls' succeeds" '
echo "/ip4/127.0.0.1/tcp/10101 /app/ptp-test" > expected &&
echo "/ip4/127.0.0.1/tcp/10101 /ptp/ptp-test" > expected &&
ipfsi 0 ptp ls > actual
'
@ -94,7 +94,7 @@ test_expect_success "Setup: Idle stream" '
'
test_expect_success "'ipfs ptp streams' succeeds" '
echo "2 /app/ptp-test /ip4/127.0.0.1/tcp/10101 $PEERID_1" > expected
echo "2 /ptp/ptp-test /ip4/127.0.0.1/tcp/10101 $PEERID_1" > expected
ipfsi 0 ptp streams > actual
'
@ -127,7 +127,7 @@ test_expect_success "Setup: Idle stream(2)" '
'
test_expect_success "'ipfs ptp streams' succeeds(2)" '
echo "3 /app/ptp-test2 /ip4/127.0.0.1/tcp/10101 $PEERID_1" > expected
echo "3 /ptp/ptp-test2 /ip4/127.0.0.1/tcp/10101 $PEERID_1" > expected
ipfsi 0 ptp streams > actual
test_cmp expected actual
'