From eac71847bdbcd2e44f4eee7f8fc926f47e609085 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 2 Jun 2017 15:57:38 +0200 Subject: [PATCH] Corenet API: Move more logic away from commands MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Ɓukasz Magiera --- core/commands/corenet.go | 168 ++++++--------------------------------- corenet/apps.go | 10 ++- corenet/net/dial.go | 82 +++++++++++++++++++ corenet/net/listen.go | 67 ++++++++++++++++ corenet/net/net.go | 17 +++- corenet/net/util.go | 19 +++++ 6 files changed, 214 insertions(+), 149 deletions(-) create mode 100644 corenet/net/dial.go create mode 100644 corenet/net/listen.go create mode 100644 corenet/net/util.go diff --git a/core/commands/corenet.go b/core/commands/corenet.go index e33c27369..625b3ca29 100644 --- a/core/commands/corenet.go +++ b/core/commands/corenet.go @@ -9,14 +9,10 @@ import ( "text/tabwriter" cmds "github.com/ipfs/go-ipfs/commands" - core "github.com/ipfs/go-ipfs/core" - corenet "github.com/ipfs/go-ipfs/corenet" + "github.com/ipfs/go-ipfs/core" cnet "github.com/ipfs/go-ipfs/corenet/net" - net "gx/ipfs/QmRscs8KxrSmSv4iuevHv8JfuUzHBMoqiaHzxfDRiksd6e/go-libp2p-net" - peerstore "gx/ipfs/QmXZSd1qR5BxZkPyuwfT5jpqQFScZccoZvDneXsKzCNHWX/go-libp2p-peerstore" ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr" - manet "gx/ipfs/Qmf1Gq7N45Rpuw7ev47uWgH6dLPtdnvcMRNPkVBwqjLJg2/go-multiaddr-net" ) // CorenetAppInfoOutput is output type of ls command @@ -188,6 +184,12 @@ var corenetStreamsCmd = &cmds.Command{ var corenetListenCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Create application protocol listener and proxy to network multiaddr.", + ShortDescription: ` +Register a p2p connection handler and proxies the connections to a specified +address. + +Note that the connections originate from the ipfs daemon process. + `, }, Arguments: []cmds.Argument{ cmds.StringArg("Protocol", true, false, "Protocol identifier."), @@ -212,7 +214,7 @@ var corenetListenCmd = &cmds.Command{ } proto := "/app/" + req.Arguments()[0] - if checkProtoExists(n.PeerHost.Mux().Protocols(), proto) { + if cnet.CheckProtoExists(n, proto) { res.SetError(errors.New("protocol handler already registered"), cmds.ErrNormal) return } @@ -223,25 +225,12 @@ var corenetListenCmd = &cmds.Command{ return } - listener, err := cnet.Listen(n, proto) + _, err = cnet.NewListener(n, proto, addr) if err != nil { res.SetError(err, cmds.ErrNormal) return } - app := corenet.AppInfo{ - Identity: n.Identity, - Protocol: proto, - Address: addr, - Closer: listener, - Running: true, - Registry: &n.Corenet.Apps, - } - - go acceptStreams(n, &app, listener) - - n.Corenet.Apps.Register(&app) - // Successful response. res.SetOutput(&CorenetAppInfoOutput{ Protocol: proto, @@ -250,66 +239,17 @@ var corenetListenCmd = &cmds.Command{ }, } -func checkProtoExists(protos []string, proto string) bool { - for _, p := range protos { - if p != proto { - continue - } - return true - } - return false -} - -func acceptStreams(n *core.IpfsNode, app *corenet.AppInfo, listener cnet.Listener) { - for app.Running { - remote, err := listener.Accept() - if err != nil { - listener.Close() - break - } - - local, err := manet.Dial(app.Address) - if err != nil { - remote.Close() - continue - } - - stream := corenet.StreamInfo{ - Protocol: app.Protocol, - - LocalPeer: app.Identity, - LocalAddr: app.Address, - - RemotePeer: remote.Conn().RemotePeer(), - RemoteAddr: remote.Conn().RemoteMultiaddr(), - - Local: local, - Remote: remote, - - Registry: &n.Corenet.Streams, - } - - n.Corenet.Streams.Register(&stream) - startStreaming(&stream) - } - n.Corenet.Apps.Deregister(app.Protocol) -} - -func startStreaming(stream *corenet.StreamInfo) { - go func() { - io.Copy(stream.Local, stream.Remote) - stream.Close() - }() - - go func() { - io.Copy(stream.Remote, stream.Local) - stream.Close() - }() -} - var corenetDialCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Dial to an application service.", + + ShortDescription: ` +Establish a new connection to a peer service. + +When a connection is made to a peer service the ipfs daemon will setup one time +TCP listener and return it's bind port, this way a dialing application can +transparently connect to a corenet service. + `, }, Arguments: []cmds.Argument{ cmds.StringArg("Peer", true, false, "Remote peer to connect to"), @@ -351,47 +291,12 @@ var corenetDialCmd = &cmds.Command{ } } - lnet, _, err := manet.DialArgs(bindAddr) + app, err := cnet.Dial(n, addr, peer, proto, bindAddr) if err != nil { res.SetError(err, cmds.ErrNormal) return } - app := corenet.AppInfo{ - Identity: n.Identity, - Protocol: proto, - } - - n.Peerstore.AddAddr(peer, addr, peerstore.TempAddrTTL) - - remote, err := cnet.Dial(n, peer, proto) - if err != nil { - res.SetError(err, cmds.ErrNormal) - return - } - - switch lnet { - case "tcp", "tcp4", "tcp6": - listener, err := manet.Listen(bindAddr) - if err != nil { - res.SetError(err, cmds.ErrNormal) - if err := remote.Close(); err != nil { - res.SetError(err, cmds.ErrNormal) - } - return - } - - app.Address = listener.Multiaddr() - app.Closer = listener - app.Running = true - - go doAccept(n, &app, remote, listener) - - default: - res.SetError(errors.New("unsupported protocol: "+lnet), cmds.ErrNormal) - return - } - output := CorenetAppInfoOutput{ Protocol: app.Protocol, Address: app.Address.String(), @@ -401,33 +306,6 @@ var corenetDialCmd = &cmds.Command{ }, } -func doAccept(n *core.IpfsNode, app *corenet.AppInfo, remote net.Stream, listener manet.Listener) { - defer listener.Close() - - local, err := listener.Accept() - if err != nil { - return - } - - stream := corenet.StreamInfo{ - Protocol: app.Protocol, - - LocalPeer: app.Identity, - LocalAddr: app.Address, - - RemotePeer: remote.Conn().RemotePeer(), - RemoteAddr: remote.Conn().RemoteMultiaddr(), - - Local: local, - Remote: remote, - - Registry: &n.Corenet.Streams, - } - - n.Corenet.Streams.Register(&stream) - startStreaming(&stream) -} - var corenetCloseCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Closes an active stream listener or client.", @@ -464,15 +342,15 @@ var corenetCloseCmd = &cmds.Command{ useHandlerID := false - if !closeAll && len(req.Arguments()) == 0 { - res.SetError(errors.New("no handlerID nor stream protocol specified"), cmds.ErrNormal) - return + if !closeAll { + if len(req.Arguments()) == 0 { + res.SetError(errors.New("no handlerID nor stream protocol specified"), cmds.ErrNormal) + return + } - } else if !closeAll { handlerID, err = strconv.ParseUint(req.Arguments()[0], 10, 64) if err != nil { proto = "/app/" + req.Arguments()[0] - } else { useHandlerID = true } diff --git a/corenet/apps.go b/corenet/apps.go index 5ad297c51..a807b1fbe 100644 --- a/corenet/apps.go +++ b/corenet/apps.go @@ -5,6 +5,7 @@ import ( ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr" peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer" + "fmt" ) // AppInfo holds information on a local application protocol listener service. @@ -30,9 +31,9 @@ type AppInfo struct { // Close closes the listener. Does not affect child streams func (c *AppInfo) Close() error { - c.Registry.Deregister(c.Protocol) c.Closer.Close() - return nil + err := c.Registry.Deregister(c.Protocol) + return err } // AppRegistry is a collection of local application protocol listeners. @@ -46,7 +47,7 @@ func (c *AppRegistry) Register(appInfo *AppInfo) { } // Deregister deregisters protocol handler from this registry -func (c *AppRegistry) Deregister(proto string) { +func (c *AppRegistry) Deregister(proto string) error { foundAt := -1 for i, a := range c.Apps { if a.Protocol == proto { @@ -57,5 +58,8 @@ func (c *AppRegistry) Deregister(proto string) { if foundAt != -1 { c.Apps = append(c.Apps[:foundAt], c.Apps[foundAt+1:]...) + return nil } + + return fmt.Errorf("failed to deregister proto %s", proto) } diff --git a/corenet/net/dial.go b/corenet/net/dial.go new file mode 100644 index 000000000..80d7d1581 --- /dev/null +++ b/corenet/net/dial.go @@ -0,0 +1,82 @@ +package net + +import ( + "errors" + + core "github.com/ipfs/go-ipfs/core" + corenet "github.com/ipfs/go-ipfs/corenet" + + net "gx/ipfs/QmRscs8KxrSmSv4iuevHv8JfuUzHBMoqiaHzxfDRiksd6e/go-libp2p-net" + peerstore "gx/ipfs/QmXZSd1qR5BxZkPyuwfT5jpqQFScZccoZvDneXsKzCNHWX/go-libp2p-peerstore" + ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr" + peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer" + manet "gx/ipfs/Qmf1Gq7N45Rpuw7ev47uWgH6dLPtdnvcMRNPkVBwqjLJg2/go-multiaddr-net" +) + +func Dial(n *core.IpfsNode, addr ma.Multiaddr, peer peer.ID, proto string, bindAddr ma.Multiaddr) (*corenet.AppInfo, error) { + lnet, _, err := manet.DialArgs(bindAddr) + if err != nil { + return nil, err + } + + app := corenet.AppInfo{ + Identity: n.Identity, + Protocol: proto, + } + + n.Peerstore.AddAddr(peer, addr, peerstore.TempAddrTTL) + + remote, err := dial(n, peer, proto) + if err != nil { + return nil, err + } + + switch lnet { + case "tcp", "tcp4", "tcp6": + listener, err := manet.Listen(bindAddr) + if err != nil { + if err2 := remote.Close(); err2 != nil { + return nil, err2 + } + return nil, err + } + + app.Address = listener.Multiaddr() + app.Closer = listener + app.Running = true + + go doAccept(n, &app, remote, listener) + + default: + return nil, errors.New("unsupported protocol: " + lnet) + } + + return &app, nil +} + +func doAccept(n *core.IpfsNode, app *corenet.AppInfo, remote net.Stream, listener manet.Listener) { + defer listener.Close() + + local, err := listener.Accept() + if err != nil { + return + } + + stream := corenet.StreamInfo{ + Protocol: app.Protocol, + + LocalPeer: app.Identity, + LocalAddr: app.Address, + + RemotePeer: remote.Conn().RemotePeer(), + RemoteAddr: remote.Conn().RemoteMultiaddr(), + + Local: local, + Remote: remote, + + Registry: &n.Corenet.Streams, + } + + n.Corenet.Streams.Register(&stream) + startStreaming(&stream) +} diff --git a/corenet/net/listen.go b/corenet/net/listen.go new file mode 100644 index 000000000..bdc54bb3a --- /dev/null +++ b/corenet/net/listen.go @@ -0,0 +1,67 @@ +package net + +import ( + "github.com/ipfs/go-ipfs/core" + "github.com/ipfs/go-ipfs/corenet" + + ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr" + manet "gx/ipfs/Qmf1Gq7N45Rpuw7ev47uWgH6dLPtdnvcMRNPkVBwqjLJg2/go-multiaddr-net" +) + +// NewListener creates new corenet listener +func NewListener(n *core.IpfsNode, proto string, addr ma.Multiaddr) (*corenet.AppInfo, error) { + listener, err := Listen(n, proto) + if err != nil { + return nil, err + } + + app := corenet.AppInfo{ + Identity: n.Identity, + Protocol: proto, + Address: addr, + Closer: listener, + Running: true, + Registry: &n.Corenet.Apps, + } + + go acceptStreams(n, &app, listener) + + n.Corenet.Apps.Register(&app) + + return &app, nil +} + +func acceptStreams(n *core.IpfsNode, app *corenet.AppInfo, listener Listener) { + for app.Running { + remote, err := listener.Accept() + if err != nil { + listener.Close() + break + } + + local, err := manet.Dial(app.Address) + if err != nil { + remote.Close() + continue + } + + stream := corenet.StreamInfo{ + Protocol: app.Protocol, + + LocalPeer: app.Identity, + LocalAddr: app.Address, + + RemotePeer: remote.Conn().RemotePeer(), + RemoteAddr: remote.Conn().RemoteMultiaddr(), + + Local: local, + Remote: remote, + + Registry: &n.Corenet.Streams, + } + + n.Corenet.Streams.Register(&stream) + startStreaming(&stream) + } + n.Corenet.Apps.Deregister(app.Protocol) +} diff --git a/corenet/net/net.go b/corenet/net/net.go index ee509b2f1..5323c98f2 100644 --- a/corenet/net/net.go +++ b/corenet/net/net.go @@ -5,6 +5,7 @@ import ( context "context" core "github.com/ipfs/go-ipfs/core" + net "gx/ipfs/QmRscs8KxrSmSv4iuevHv8JfuUzHBMoqiaHzxfDRiksd6e/go-libp2p-net" pstore "gx/ipfs/QmXZSd1qR5BxZkPyuwfT5jpqQFScZccoZvDneXsKzCNHWX/go-libp2p-peerstore" pro "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" @@ -67,7 +68,7 @@ func Listen(nd *core.IpfsNode, protocol string) (*IpfsListener, error) { } // Dial dials to a specified node and protocol -func Dial(nd *core.IpfsNode, p peer.ID, protocol string) (net.Stream, error) { +func dial(nd *core.IpfsNode, p peer.ID, protocol string) (net.Stream, error) { ctx, cancel := context.WithTimeout(nd.Context(), time.Second*30) defer cancel() err := nd.PeerHost.Connect(ctx, pstore.PeerInfo{ID: p}) @@ -76,3 +77,17 @@ func Dial(nd *core.IpfsNode, p peer.ID, protocol string) (net.Stream, error) { } return nd.PeerHost.NewStream(nd.Context(), p, pro.ID(protocol)) } + +// CheckProtoExists checks whether a protocol handler is registered to +// mux handler +func CheckProtoExists(n *core.IpfsNode, proto string) bool { + protos := n.PeerHost.Mux().Protocols() + + for _, p := range protos { + if p != proto { + continue + } + return true + } + return false +} diff --git a/corenet/net/util.go b/corenet/net/util.go new file mode 100644 index 000000000..0b35b77f4 --- /dev/null +++ b/corenet/net/util.go @@ -0,0 +1,19 @@ +package net + +import ( + "io" + + corenet "github.com/ipfs/go-ipfs/corenet" +) + +func startStreaming(stream *corenet.StreamInfo) { + go func() { + io.Copy(stream.Local, stream.Remote) + stream.Close() + }() + + go func() { + io.Copy(stream.Remote, stream.Local) + stream.Close() + }() +}