1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-29 09:34:03 +08:00

Corenet API: Move more logic away from commands

License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
Łukasz Magiera
2017-06-02 15:57:38 +02:00
parent e66e8bbe96
commit eac71847bd
6 changed files with 214 additions and 149 deletions

View File

@ -9,14 +9,10 @@ import (
"text/tabwriter" "text/tabwriter"
cmds "github.com/ipfs/go-ipfs/commands" cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/core"
corenet "github.com/ipfs/go-ipfs/corenet"
cnet "github.com/ipfs/go-ipfs/corenet/net" cnet "github.com/ipfs/go-ipfs/corenet/net"
net "gx/ipfs/QmRscs8KxrSmSv4iuevHv8JfuUzHBMoqiaHzxfDRiksd6e/go-libp2p-net"
peerstore "gx/ipfs/QmXZSd1qR5BxZkPyuwfT5jpqQFScZccoZvDneXsKzCNHWX/go-libp2p-peerstore"
ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr" ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr"
manet "gx/ipfs/Qmf1Gq7N45Rpuw7ev47uWgH6dLPtdnvcMRNPkVBwqjLJg2/go-multiaddr-net"
) )
// CorenetAppInfoOutput is output type of ls command // CorenetAppInfoOutput is output type of ls command
@ -188,6 +184,12 @@ var corenetStreamsCmd = &cmds.Command{
var corenetListenCmd = &cmds.Command{ var corenetListenCmd = &cmds.Command{
Helptext: cmds.HelpText{ Helptext: cmds.HelpText{
Tagline: "Create application protocol listener and proxy to network multiaddr.", Tagline: "Create application protocol listener and proxy to network multiaddr.",
ShortDescription: `
Register a p2p connection handler and proxies the connections to a specified
address.
Note that the connections originate from the ipfs daemon process.
`,
}, },
Arguments: []cmds.Argument{ Arguments: []cmds.Argument{
cmds.StringArg("Protocol", true, false, "Protocol identifier."), cmds.StringArg("Protocol", true, false, "Protocol identifier."),
@ -212,7 +214,7 @@ var corenetListenCmd = &cmds.Command{
} }
proto := "/app/" + req.Arguments()[0] proto := "/app/" + req.Arguments()[0]
if checkProtoExists(n.PeerHost.Mux().Protocols(), proto) { if cnet.CheckProtoExists(n, proto) {
res.SetError(errors.New("protocol handler already registered"), cmds.ErrNormal) res.SetError(errors.New("protocol handler already registered"), cmds.ErrNormal)
return return
} }
@ -223,25 +225,12 @@ var corenetListenCmd = &cmds.Command{
return return
} }
listener, err := cnet.Listen(n, proto) _, err = cnet.NewListener(n, proto, addr)
if err != nil { if err != nil {
res.SetError(err, cmds.ErrNormal) res.SetError(err, cmds.ErrNormal)
return return
} }
app := corenet.AppInfo{
Identity: n.Identity,
Protocol: proto,
Address: addr,
Closer: listener,
Running: true,
Registry: &n.Corenet.Apps,
}
go acceptStreams(n, &app, listener)
n.Corenet.Apps.Register(&app)
// Successful response. // Successful response.
res.SetOutput(&CorenetAppInfoOutput{ res.SetOutput(&CorenetAppInfoOutput{
Protocol: proto, Protocol: proto,
@ -250,66 +239,17 @@ var corenetListenCmd = &cmds.Command{
}, },
} }
func checkProtoExists(protos []string, proto string) bool {
for _, p := range protos {
if p != proto {
continue
}
return true
}
return false
}
func acceptStreams(n *core.IpfsNode, app *corenet.AppInfo, listener cnet.Listener) {
for app.Running {
remote, err := listener.Accept()
if err != nil {
listener.Close()
break
}
local, err := manet.Dial(app.Address)
if err != nil {
remote.Close()
continue
}
stream := corenet.StreamInfo{
Protocol: app.Protocol,
LocalPeer: app.Identity,
LocalAddr: app.Address,
RemotePeer: remote.Conn().RemotePeer(),
RemoteAddr: remote.Conn().RemoteMultiaddr(),
Local: local,
Remote: remote,
Registry: &n.Corenet.Streams,
}
n.Corenet.Streams.Register(&stream)
startStreaming(&stream)
}
n.Corenet.Apps.Deregister(app.Protocol)
}
func startStreaming(stream *corenet.StreamInfo) {
go func() {
io.Copy(stream.Local, stream.Remote)
stream.Close()
}()
go func() {
io.Copy(stream.Remote, stream.Local)
stream.Close()
}()
}
var corenetDialCmd = &cmds.Command{ var corenetDialCmd = &cmds.Command{
Helptext: cmds.HelpText{ Helptext: cmds.HelpText{
Tagline: "Dial to an application service.", Tagline: "Dial to an application service.",
ShortDescription: `
Establish a new connection to a peer service.
When a connection is made to a peer service the ipfs daemon will setup one time
TCP listener and return it's bind port, this way a dialing application can
transparently connect to a corenet service.
`,
}, },
Arguments: []cmds.Argument{ Arguments: []cmds.Argument{
cmds.StringArg("Peer", true, false, "Remote peer to connect to"), cmds.StringArg("Peer", true, false, "Remote peer to connect to"),
@ -351,47 +291,12 @@ var corenetDialCmd = &cmds.Command{
} }
} }
lnet, _, err := manet.DialArgs(bindAddr) app, err := cnet.Dial(n, addr, peer, proto, bindAddr)
if err != nil { if err != nil {
res.SetError(err, cmds.ErrNormal) res.SetError(err, cmds.ErrNormal)
return return
} }
app := corenet.AppInfo{
Identity: n.Identity,
Protocol: proto,
}
n.Peerstore.AddAddr(peer, addr, peerstore.TempAddrTTL)
remote, err := cnet.Dial(n, peer, proto)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
switch lnet {
case "tcp", "tcp4", "tcp6":
listener, err := manet.Listen(bindAddr)
if err != nil {
res.SetError(err, cmds.ErrNormal)
if err := remote.Close(); err != nil {
res.SetError(err, cmds.ErrNormal)
}
return
}
app.Address = listener.Multiaddr()
app.Closer = listener
app.Running = true
go doAccept(n, &app, remote, listener)
default:
res.SetError(errors.New("unsupported protocol: "+lnet), cmds.ErrNormal)
return
}
output := CorenetAppInfoOutput{ output := CorenetAppInfoOutput{
Protocol: app.Protocol, Protocol: app.Protocol,
Address: app.Address.String(), Address: app.Address.String(),
@ -401,33 +306,6 @@ var corenetDialCmd = &cmds.Command{
}, },
} }
func doAccept(n *core.IpfsNode, app *corenet.AppInfo, remote net.Stream, listener manet.Listener) {
defer listener.Close()
local, err := listener.Accept()
if err != nil {
return
}
stream := corenet.StreamInfo{
Protocol: app.Protocol,
LocalPeer: app.Identity,
LocalAddr: app.Address,
RemotePeer: remote.Conn().RemotePeer(),
RemoteAddr: remote.Conn().RemoteMultiaddr(),
Local: local,
Remote: remote,
Registry: &n.Corenet.Streams,
}
n.Corenet.Streams.Register(&stream)
startStreaming(&stream)
}
var corenetCloseCmd = &cmds.Command{ var corenetCloseCmd = &cmds.Command{
Helptext: cmds.HelpText{ Helptext: cmds.HelpText{
Tagline: "Closes an active stream listener or client.", Tagline: "Closes an active stream listener or client.",
@ -464,15 +342,15 @@ var corenetCloseCmd = &cmds.Command{
useHandlerID := false useHandlerID := false
if !closeAll && len(req.Arguments()) == 0 { if !closeAll {
res.SetError(errors.New("no handlerID nor stream protocol specified"), cmds.ErrNormal) if len(req.Arguments()) == 0 {
return res.SetError(errors.New("no handlerID nor stream protocol specified"), cmds.ErrNormal)
return
}
} else if !closeAll {
handlerID, err = strconv.ParseUint(req.Arguments()[0], 10, 64) handlerID, err = strconv.ParseUint(req.Arguments()[0], 10, 64)
if err != nil { if err != nil {
proto = "/app/" + req.Arguments()[0] proto = "/app/" + req.Arguments()[0]
} else { } else {
useHandlerID = true useHandlerID = true
} }

View File

@ -5,6 +5,7 @@ import (
ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr" ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr"
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer" peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
"fmt"
) )
// AppInfo holds information on a local application protocol listener service. // AppInfo holds information on a local application protocol listener service.
@ -30,9 +31,9 @@ type AppInfo struct {
// Close closes the listener. Does not affect child streams // Close closes the listener. Does not affect child streams
func (c *AppInfo) Close() error { func (c *AppInfo) Close() error {
c.Registry.Deregister(c.Protocol)
c.Closer.Close() c.Closer.Close()
return nil err := c.Registry.Deregister(c.Protocol)
return err
} }
// AppRegistry is a collection of local application protocol listeners. // AppRegistry is a collection of local application protocol listeners.
@ -46,7 +47,7 @@ func (c *AppRegistry) Register(appInfo *AppInfo) {
} }
// Deregister deregisters protocol handler from this registry // Deregister deregisters protocol handler from this registry
func (c *AppRegistry) Deregister(proto string) { func (c *AppRegistry) Deregister(proto string) error {
foundAt := -1 foundAt := -1
for i, a := range c.Apps { for i, a := range c.Apps {
if a.Protocol == proto { if a.Protocol == proto {
@ -57,5 +58,8 @@ func (c *AppRegistry) Deregister(proto string) {
if foundAt != -1 { if foundAt != -1 {
c.Apps = append(c.Apps[:foundAt], c.Apps[foundAt+1:]...) c.Apps = append(c.Apps[:foundAt], c.Apps[foundAt+1:]...)
return nil
} }
return fmt.Errorf("failed to deregister proto %s", proto)
} }

82
corenet/net/dial.go Normal file
View File

@ -0,0 +1,82 @@
package net
import (
"errors"
core "github.com/ipfs/go-ipfs/core"
corenet "github.com/ipfs/go-ipfs/corenet"
net "gx/ipfs/QmRscs8KxrSmSv4iuevHv8JfuUzHBMoqiaHzxfDRiksd6e/go-libp2p-net"
peerstore "gx/ipfs/QmXZSd1qR5BxZkPyuwfT5jpqQFScZccoZvDneXsKzCNHWX/go-libp2p-peerstore"
ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr"
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
manet "gx/ipfs/Qmf1Gq7N45Rpuw7ev47uWgH6dLPtdnvcMRNPkVBwqjLJg2/go-multiaddr-net"
)
func Dial(n *core.IpfsNode, addr ma.Multiaddr, peer peer.ID, proto string, bindAddr ma.Multiaddr) (*corenet.AppInfo, error) {
lnet, _, err := manet.DialArgs(bindAddr)
if err != nil {
return nil, err
}
app := corenet.AppInfo{
Identity: n.Identity,
Protocol: proto,
}
n.Peerstore.AddAddr(peer, addr, peerstore.TempAddrTTL)
remote, err := dial(n, peer, proto)
if err != nil {
return nil, err
}
switch lnet {
case "tcp", "tcp4", "tcp6":
listener, err := manet.Listen(bindAddr)
if err != nil {
if err2 := remote.Close(); err2 != nil {
return nil, err2
}
return nil, err
}
app.Address = listener.Multiaddr()
app.Closer = listener
app.Running = true
go doAccept(n, &app, remote, listener)
default:
return nil, errors.New("unsupported protocol: " + lnet)
}
return &app, nil
}
func doAccept(n *core.IpfsNode, app *corenet.AppInfo, remote net.Stream, listener manet.Listener) {
defer listener.Close()
local, err := listener.Accept()
if err != nil {
return
}
stream := corenet.StreamInfo{
Protocol: app.Protocol,
LocalPeer: app.Identity,
LocalAddr: app.Address,
RemotePeer: remote.Conn().RemotePeer(),
RemoteAddr: remote.Conn().RemoteMultiaddr(),
Local: local,
Remote: remote,
Registry: &n.Corenet.Streams,
}
n.Corenet.Streams.Register(&stream)
startStreaming(&stream)
}

67
corenet/net/listen.go Normal file
View File

@ -0,0 +1,67 @@
package net
import (
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/corenet"
ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr"
manet "gx/ipfs/Qmf1Gq7N45Rpuw7ev47uWgH6dLPtdnvcMRNPkVBwqjLJg2/go-multiaddr-net"
)
// NewListener creates new corenet listener
func NewListener(n *core.IpfsNode, proto string, addr ma.Multiaddr) (*corenet.AppInfo, error) {
listener, err := Listen(n, proto)
if err != nil {
return nil, err
}
app := corenet.AppInfo{
Identity: n.Identity,
Protocol: proto,
Address: addr,
Closer: listener,
Running: true,
Registry: &n.Corenet.Apps,
}
go acceptStreams(n, &app, listener)
n.Corenet.Apps.Register(&app)
return &app, nil
}
func acceptStreams(n *core.IpfsNode, app *corenet.AppInfo, listener Listener) {
for app.Running {
remote, err := listener.Accept()
if err != nil {
listener.Close()
break
}
local, err := manet.Dial(app.Address)
if err != nil {
remote.Close()
continue
}
stream := corenet.StreamInfo{
Protocol: app.Protocol,
LocalPeer: app.Identity,
LocalAddr: app.Address,
RemotePeer: remote.Conn().RemotePeer(),
RemoteAddr: remote.Conn().RemoteMultiaddr(),
Local: local,
Remote: remote,
Registry: &n.Corenet.Streams,
}
n.Corenet.Streams.Register(&stream)
startStreaming(&stream)
}
n.Corenet.Apps.Deregister(app.Protocol)
}

View File

@ -5,6 +5,7 @@ import (
context "context" context "context"
core "github.com/ipfs/go-ipfs/core" core "github.com/ipfs/go-ipfs/core"
net "gx/ipfs/QmRscs8KxrSmSv4iuevHv8JfuUzHBMoqiaHzxfDRiksd6e/go-libp2p-net" net "gx/ipfs/QmRscs8KxrSmSv4iuevHv8JfuUzHBMoqiaHzxfDRiksd6e/go-libp2p-net"
pstore "gx/ipfs/QmXZSd1qR5BxZkPyuwfT5jpqQFScZccoZvDneXsKzCNHWX/go-libp2p-peerstore" pstore "gx/ipfs/QmXZSd1qR5BxZkPyuwfT5jpqQFScZccoZvDneXsKzCNHWX/go-libp2p-peerstore"
pro "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" pro "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
@ -67,7 +68,7 @@ func Listen(nd *core.IpfsNode, protocol string) (*IpfsListener, error) {
} }
// Dial dials to a specified node and protocol // Dial dials to a specified node and protocol
func Dial(nd *core.IpfsNode, p peer.ID, protocol string) (net.Stream, error) { func dial(nd *core.IpfsNode, p peer.ID, protocol string) (net.Stream, error) {
ctx, cancel := context.WithTimeout(nd.Context(), time.Second*30) ctx, cancel := context.WithTimeout(nd.Context(), time.Second*30)
defer cancel() defer cancel()
err := nd.PeerHost.Connect(ctx, pstore.PeerInfo{ID: p}) err := nd.PeerHost.Connect(ctx, pstore.PeerInfo{ID: p})
@ -76,3 +77,17 @@ func Dial(nd *core.IpfsNode, p peer.ID, protocol string) (net.Stream, error) {
} }
return nd.PeerHost.NewStream(nd.Context(), p, pro.ID(protocol)) return nd.PeerHost.NewStream(nd.Context(), p, pro.ID(protocol))
} }
// CheckProtoExists checks whether a protocol handler is registered to
// mux handler
func CheckProtoExists(n *core.IpfsNode, proto string) bool {
protos := n.PeerHost.Mux().Protocols()
for _, p := range protos {
if p != proto {
continue
}
return true
}
return false
}

19
corenet/net/util.go Normal file
View File

@ -0,0 +1,19 @@
package net
import (
"io"
corenet "github.com/ipfs/go-ipfs/corenet"
)
func startStreaming(stream *corenet.StreamInfo) {
go func() {
io.Copy(stream.Local, stream.Remote)
stream.Close()
}()
go func() {
io.Copy(stream.Remote, stream.Local)
stream.Close()
}()
}