diff --git a/core/commands/ptp.go b/core/commands/ptp.go index f1c610c7b..daeecae71 100644 --- a/core/commands/ptp.go +++ b/core/commands/ptp.go @@ -51,15 +51,40 @@ Note: this command is experimental and subject to change as usecases and APIs ar }, Subcommands: map[string]*cmds.Command{ - "ls": ptpLsCmd, - "streams": ptpStreamsCmd, - "dial": ptpDialCmd, - "listen": ptpListenCmd, - "close": ptpCloseCmd, + "listener": ptpListenerCmd, + "stream": ptpStreamCmd, }, } -var ptpLsCmd = &cmds.Command{ +// ptpListenerCmd is the 'ipfs ptp listener' command +var ptpListenerCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "P2P listener management.", + ShortDescription: "Create and manage listener p2p endpoints", + }, + + Subcommands: map[string]*cmds.Command{ + "ls": ptpListenerLsCmd, + "open": ptpListenerListenCmd, + "close": ptpListenerCloseCmd, + }, +} + +// ptpStreamCmd is the 'ipfs ptp stream' command +var ptpStreamCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "P2P stream management.", + ShortDescription: "Create and manage p2p streams", + }, + + Subcommands: map[string]*cmds.Command{ + "ls": ptpStreamLsCmd, + "dial": ptpStreamDialCmd, + "close": ptpStreamCloseCmd, + }, +} + +var ptpListenerLsCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "List active p2p listeners.", }, @@ -67,23 +92,13 @@ var ptpLsCmd = &cmds.Command{ cmds.BoolOption("headers", "v", "Print table headers (HandlerID, Protocol, Local, Remote).").Default(false), }, Run: func(req cmds.Request, res cmds.Response) { - n, err := req.InvocContext().GetNode() + + n, err := getNode(req) if err != nil { res.SetError(err, cmds.ErrNormal) return } - err = checkEnabled(n) - if err != nil { - res.SetError(err, cmds.ErrNormal) - return - } - - if !n.OnlineMode() { - res.SetError(errNotOnline, cmds.ErrClient) - return - } - output := &PTPLsOutput{} for _, listener := range n.PTP.Listeners.Listeners { @@ -116,7 +131,7 @@ var ptpLsCmd = &cmds.Command{ }, } -var ptpStreamsCmd = &cmds.Command{ +var ptpStreamLsCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "List active p2p streams.", }, @@ -124,23 +139,12 @@ var ptpStreamsCmd = &cmds.Command{ cmds.BoolOption("headers", "v", "Print table headers (HagndlerID, Protocol, Local, Remote).").Default(false), }, Run: func(req cmds.Request, res cmds.Response) { - n, err := req.InvocContext().GetNode() + n, err := getNode(req) if err != nil { res.SetError(err, cmds.ErrNormal) return } - err = checkEnabled(n) - if err != nil { - res.SetError(err, cmds.ErrNormal) - return - } - - if !n.OnlineMode() { - res.SetError(errNotOnline, cmds.ErrClient) - return - } - output := &PTPStreamsOutput{} for _, s := range n.PTP.Streams.Streams { @@ -180,7 +184,7 @@ var ptpStreamsCmd = &cmds.Command{ }, } -var ptpListenCmd = &cmds.Command{ +var ptpListenerListenCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Forward p2p connections to a network multiaddr.", ShortDescription: ` @@ -194,23 +198,12 @@ Note that the connections originate from the ipfs daemon process. cmds.StringArg("Address", true, false, "Request handling application address."), }, Run: func(req cmds.Request, res cmds.Response) { - n, err := req.InvocContext().GetNode() + n, err := getNode(req) if err != nil { res.SetError(err, cmds.ErrNormal) return } - err = checkEnabled(n) - if err != nil { - res.SetError(err, cmds.ErrNormal) - return - } - - if !n.OnlineMode() { - res.SetError(errNotOnline, cmds.ErrClient) - return - } - proto := "/ptp/" + req.Arguments()[0] if n.PTP.CheckProtoExists(proto) { res.SetError(errors.New("protocol handler already registered"), cmds.ErrNormal) @@ -237,7 +230,7 @@ Note that the connections originate from the ipfs daemon process. }, } -var ptpDialCmd = &cmds.Command{ +var ptpStreamDialCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Dial to a p2p listener.", @@ -255,23 +248,12 @@ transparently connect to a p2p service. cmds.StringArg("BindAddress", false, false, "Address to listen for connection/s (default: /ip4/127.0.0.1/tcp/0)."), }, Run: func(req cmds.Request, res cmds.Response) { - n, err := req.InvocContext().GetNode() + n, err := getNode(req) if err != nil { res.SetError(err, cmds.ErrNormal) return } - err = checkEnabled(n) - if err != nil { - res.SetError(err, cmds.ErrNormal) - return - } - - if !n.OnlineMode() { - res.SetError(errNotOnline, cmds.ErrClient) - return - } - addr, peer, err := ParsePeerParam(req.Arguments()[0]) if err != nil { res.SetError(err, cmds.ErrNormal) @@ -304,89 +286,110 @@ transparently connect to a p2p service. }, } -var ptpCloseCmd = &cmds.Command{ +var ptpListenerCloseCmd = &cmds.Command{ Helptext: cmds.HelpText{ - Tagline: "Closes an active p2p stream or listener.", + Tagline: "Close active p2p listener.", }, Arguments: []cmds.Argument{ - cmds.StringArg("Identifier", false, false, "Stream HandlerID or p2p listener protocol"), + cmds.StringArg("Protocol", false, false, "P2P listener protocol"), }, Options: []cmds.Option{ - cmds.BoolOption("all", "a", "Close all streams and listeners.").Default(false), + cmds.BoolOption("all", "a", "Close all listeners.").Default(false), }, Run: func(req cmds.Request, res cmds.Response) { - n, err := req.InvocContext().GetNode() + n, err := getNode(req) if err != nil { res.SetError(err, cmds.ErrNormal) return } - err = checkEnabled(n) - if err != nil { - res.SetError(err, cmds.ErrNormal) - return - } - - if !n.OnlineMode() { - res.SetError(errNotOnline, cmds.ErrClient) - return - } - closeAll, _, _ := req.Option("all").Bool() - var proto string - var handlerID uint64 - - useHandlerID := false if !closeAll { if len(req.Arguments()) == 0 { - res.SetError(errors.New("no handlerID nor listener protocol specified"), cmds.ErrNormal) + res.SetError(errors.New("no protocol name specified"), cmds.ErrNormal) + return + } + + proto = "/ptp/" + req.Arguments()[0] + } + + for _, listener := range n.PTP.Listeners.Listeners { + if !closeAll && listener.Protocol != proto { + continue + } + listener.Close() + if !closeAll { + break + } + } + }, +} + +var ptpStreamCloseCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Close active p2p stream.", + }, + Arguments: []cmds.Argument{ + cmds.StringArg("HandlerID", false, false, "Stream HandlerID"), + }, + Options: []cmds.Option{ + cmds.BoolOption("all", "a", "Close all streams.").Default(false), + }, + Run: func(req cmds.Request, res cmds.Response) { + n, err := getNode(req) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + closeAll, _, _ := req.Option("all").Bool() + var handlerID uint64 + + if !closeAll { + if len(req.Arguments()) == 0 { + res.SetError(errors.New("no HandlerID specified"), cmds.ErrNormal) return } handlerID, err = strconv.ParseUint(req.Arguments()[0], 10, 64) if err != nil { - proto = "/ptp/" + req.Arguments()[0] - } else { - useHandlerID = true + res.SetError(err, cmds.ErrNormal) + return } } - if closeAll || useHandlerID { - for _, stream := range n.PTP.Streams.Streams { - if !closeAll && handlerID != stream.HandlerID { - continue - } - stream.Close() - if !closeAll { - break - } + for _, stream := range n.PTP.Streams.Streams { + if !closeAll && handlerID != stream.HandlerID { + continue } - } - - if closeAll || !useHandlerID { - for _, listener := range n.PTP.Listeners.Listeners { - if !closeAll && listener.Protocol != proto { - continue - } - listener.Close() - if !closeAll { - break - } + stream.Close() + if !closeAll { + break } } }, } -func checkEnabled(n *core.IpfsNode) error { +func getNode(req cmds.Request) (*core.IpfsNode, error) { + n, err := req.InvocContext().GetNode() + if err != nil { + return nil, err + } + config, err := n.Repo.Config() if err != nil { - return err + return nil, err } if !config.Experimental.Libp2pStreamMounting { - return errors.New("libp2p stream mounting not enabled") + return nil, errors.New("libp2p stream mounting not enabled") } - return nil + + if !n.OnlineMode() { + return nil, errNotOnline + } + + return n, nil } diff --git a/ptp/ptp.go b/ptp/ptp.go index ea8500ddc..e7842838b 100644 --- a/ptp/ptp.go +++ b/ptp/ptp.go @@ -43,6 +43,7 @@ func (ptp *PTP) newStreamTo(ctx2 context.Context, p peer.ID, protocol string) (n return ptp.peerHost.NewStream(ctx2, p, pro.ID(protocol)) } +// Dial creates new P2P stream to a remote listener func (ptp *PTP) Dial(ctx context.Context, addr ma.Multiaddr, peer peer.ID, proto string, bindAddr ma.Multiaddr) (*ListenerInfo, error) { lnet, _, err := manet.DialArgs(bindAddr) if err != nil { diff --git a/ptp/registry.go b/ptp/registry.go index b92a767f9..586719eba 100644 --- a/ptp/registry.go +++ b/ptp/registry.go @@ -83,10 +83,10 @@ type StreamInfo struct { } // Close closes stream endpoints and deregisters it -func (c *StreamInfo) Close() error { - c.Local.Close() - c.Remote.Close() - c.Registry.Deregister(c.HandlerID) +func (s *StreamInfo) Close() error { + s.Local.Close() + s.Remote.Close() + s.Registry.Deregister(s.HandlerID) return nil } diff --git a/test/dependencies/ma-pipe-unidir/main.go b/test/dependencies/ma-pipe-unidir/main.go index 1f215786a..24412d000 100644 --- a/test/dependencies/ma-pipe-unidir/main.go +++ b/test/dependencies/ma-pipe-unidir/main.go @@ -38,6 +38,11 @@ func app() int { mode := args[0] addr := args[1] + if mode != "send" && mode != "recv" { + fmt.Print(USAGE) + return 1 + } + if len(opts.PidFile) > 0 { data := []byte(strconv.Itoa(os.Getpid())) err := ioutil.WriteFile(opts.PidFile, data, 0644) @@ -80,8 +85,6 @@ func app() int { case "send": io.Copy(conn, os.Stdin) default: - //TODO: a bit late - fmt.Print(USAGE) return 1 } return 0 diff --git a/test/sharness/t0180-ptp.sh b/test/sharness/t0180-ptp.sh index 398030169..b321bad8e 100755 --- a/test/sharness/t0180-ptp.sh +++ b/test/sharness/t0180-ptp.sh @@ -27,7 +27,7 @@ test_expect_success "test ports are closed" ' ' test_must_fail 'fail without config option being enabled' ' - ipfsi 0 ptp ls + ipfsi 0 ptp stream ls ' test_expect_success "enable filestore config setting" ' @@ -36,14 +36,14 @@ test_expect_success "enable filestore config setting" ' ' test_expect_success 'start ptp listener' ' - ipfsi 0 ptp listen ptp-test /ip4/127.0.0.1/tcp/10101 2>&1 > listener-stdouterr.log + ipfsi 0 ptp listener open ptp-test /ip4/127.0.0.1/tcp/10101 2>&1 > listener-stdouterr.log ' test_expect_success 'Test server to client communications' ' ma-pipe-unidir --listen send /ip4/127.0.0.1/tcp/10101 < test0.bin & SERVER_PID=$! - ipfsi 1 ptp dial $PEERID_0 ptp-test /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log && + ipfsi 1 ptp stream dial $PEERID_0 ptp-test /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log && ma-pipe-unidir recv /ip4/127.0.0.1/tcp/10102 > client.out && wait $SERVER_PID ' @@ -52,7 +52,7 @@ test_expect_success 'Test client to server communications' ' ma-pipe-unidir --listen recv /ip4/127.0.0.1/tcp/10101 > server.out & SERVER_PID=$! - ipfsi 1 ptp dial $PEERID_0 ptp-test /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log && + ipfsi 1 ptp stream dial $PEERID_0 ptp-test /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log && ma-pipe-unidir send /ip4/127.0.0.1/tcp/10102 < test1.bin wait $SERVER_PID ' @@ -65,79 +65,90 @@ test_expect_success 'client to server output looks good' ' test_cmp server.out test1.bin ' -test_expect_success "'ipfs ptp ls' succeeds" ' +test_expect_success "'ipfs listener ptp ls' succeeds" ' echo "/ip4/127.0.0.1/tcp/10101 /ptp/ptp-test" > expected && - ipfsi 0 ptp ls > actual + ipfsi 0 ptp listener ls > actual ' -test_expect_success "'ipfs ptp ls' output looks good" ' +test_expect_success "'ipfs ptp listener ls' output looks good" ' test_cmp expected actual ' test_expect_success "Cannot re-register app handler" ' - (! ipfsi 0 ptp listen ptp-test /ip4/127.0.0.1/tcp/10101) + (! ipfsi 0 ptp listener open ptp-test /ip4/127.0.0.1/tcp/10101) ' -test_expect_success "'ipfs ptp streams' output is empty" ' - ipfsi 0 ptp streams > actual && +test_expect_success "'ipfs ptp stream ls' output is empty" ' + ipfsi 0 ptp stream ls > actual && test_must_be_empty actual ' test_expect_success "Setup: Idle stream" ' ma-pipe-unidir --listen --pidFile=listener.pid recv /ip4/127.0.0.1/tcp/10101 & - ipfsi 1 ptp dial $PEERID_0 ptp-test /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log && + ipfsi 1 ptp stream dial $PEERID_0 ptp-test /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log && ma-pipe-unidir --pidFile=client.pid recv /ip4/127.0.0.1/tcp/10102 & go-sleep 500ms && kill -0 $(cat listener.pid) && kill -0 $(cat client.pid) ' -test_expect_success "'ipfs ptp streams' succeeds" ' +test_expect_success "'ipfs ptp stream ls' succeeds" ' echo "2 /ptp/ptp-test /ip4/127.0.0.1/tcp/10101 $PEERID_1" > expected - ipfsi 0 ptp streams > actual + ipfsi 0 ptp stream ls > actual ' -test_expect_success "'ipfs ptp streams' output looks good" ' +test_expect_success "'ipfs ptp stream ls' output looks good" ' test_cmp expected actual ' -test_expect_success "'ipfs ptp close' closes stream" ' - ipfsi 0 ptp close 2 && - ipfsi 0 ptp streams > actual && +test_expect_success "'ipfs ptp stream close' closes stream" ' + ipfsi 0 ptp stream close 2 && + ipfsi 0 ptp stream ls > actual && [ ! -f listener.pid ] && [ ! -f client.pid ] && test_must_be_empty actual ' -test_expect_success "'ipfs ptp close' closes app handler" ' - ipfsi 0 ptp close ptp-test && - ipfsi 0 ptp ls > actual && +test_expect_success "'ipfs ptp listener close' closes app handler" ' + ipfsi 0 ptp listener close ptp-test && + ipfsi 0 ptp listener ls > actual && test_must_be_empty actual ' test_expect_success "Setup: Idle stream(2)" ' ma-pipe-unidir --listen --pidFile=listener.pid recv /ip4/127.0.0.1/tcp/10101 & - ipfsi 0 ptp listen ptp-test2 /ip4/127.0.0.1/tcp/10101 2>&1 > listener-stdouterr.log && - ipfsi 1 ptp dial $PEERID_0 ptp-test2 /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log && + ipfsi 0 ptp listener open ptp-test2 /ip4/127.0.0.1/tcp/10101 2>&1 > listener-stdouterr.log && + ipfsi 1 ptp stream dial $PEERID_0 ptp-test2 /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log && ma-pipe-unidir --pidFile=client.pid recv /ip4/127.0.0.1/tcp/10102 & go-sleep 500ms && kill -0 $(cat listener.pid) && kill -0 $(cat client.pid) ' -test_expect_success "'ipfs ptp streams' succeeds(2)" ' +test_expect_success "'ipfs ptp stream ls' succeeds(2)" ' echo "3 /ptp/ptp-test2 /ip4/127.0.0.1/tcp/10101 $PEERID_1" > expected - ipfsi 0 ptp streams > actual + ipfsi 0 ptp stream ls > actual test_cmp expected actual ' -test_expect_success "'ipfs ptp close -a' closes streams and app handlers" ' - ipfsi 0 ptp close -a && - ipfsi 0 ptp streams > actual && +test_expect_success "'ipfs ptp listener close -a' closes app handlers" ' + ipfsi 0 ptp listener close -a && + ipfsi 0 ptp listener ls > actual && + test_must_be_empty actual +' + +test_expect_success "'ipfs ptp stream close -a' closes streams" ' + ipfsi 0 ptp stream close -a && + ipfsi 0 ptp stream ls > actual && [ ! -f listener.pid ] && [ ! -f client.pid ] && - test_must_be_empty actual && - ipfsi 0 ptp ls > actual && + test_must_be_empty actual +' + +test_expect_success "'ipfs ptp listener close' closes app numeric handlers" ' + ipfsi 0 ptp listener open 1234 /ip4/127.0.0.1/tcp/10101 && + ipfsi 0 ptp listener close 1234 && + ipfsi 0 ptp listener ls > actual && test_must_be_empty actual '