1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-07-01 02:30:39 +08:00

Corenet API: Apply suggestions, cleanups

License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
Łukasz Magiera
2017-05-28 23:36:37 +02:00
parent 69d3e08776
commit fc08d5331b
3 changed files with 95 additions and 39 deletions

View File

@ -9,6 +9,7 @@ import (
"text/tabwriter"
cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
corenet "github.com/ipfs/go-ipfs/core/corenet"
peerstore "gx/ipfs/QmNUVzEjq3XWJ89hegahPvyfJbTXgTaom48pLb7YBD9gHQ/go-libp2p-peerstore"
@ -18,14 +19,13 @@ import (
manet "gx/ipfs/Qmf1Gq7N45Rpuw7ev47uWgH6dLPtdnvcMRNPkVBwqjLJg2/go-multiaddr-net"
)
// Command output types.
type AppInfoOutput struct {
type CorenetAppInfoOutput struct {
Protocol string
Address string
}
type StreamInfoOutput struct {
HandlerId string
type CorenetStreamInfoOutput struct {
HandlerID string
Protocol string
LocalPeer string
LocalAddress string
@ -34,11 +34,11 @@ type StreamInfoOutput struct {
}
type CorenetLsOutput struct {
Apps []AppInfoOutput
Apps []CorenetAppInfoOutput
}
type CorenetStreamsOutput struct {
Streams []StreamInfoOutput
Streams []CorenetStreamInfoOutput
}
// cnAppInfo holds information on a local application protocol listener service.
@ -91,7 +91,7 @@ func (c *cnAppRegistry) Deregister(proto string) {
// cnStreamInfo holds information on active incoming and outgoing protocol app streams.
type cnStreamInfo struct {
handlerId uint64
handlerID uint64
protocol string
@ -108,7 +108,7 @@ type cnStreamInfo struct {
func (c *cnStreamInfo) Close() error {
c.local.Close()
c.remote.Close()
streams.Deregister(c.handlerId)
streams.Deregister(c.handlerID)
return nil
}
@ -116,19 +116,19 @@ func (c *cnStreamInfo) Close() error {
type cnStreamRegistry struct {
streams []*cnStreamInfo
nextId uint64
nextID uint64
}
func (c *cnStreamRegistry) Register(streamInfo *cnStreamInfo) {
streamInfo.handlerId = c.nextId
streamInfo.handlerID = c.nextID
c.streams = append(c.streams, streamInfo)
c.nextId += 1
c.nextID++
}
func (c *cnStreamRegistry) Deregister(handlerId uint64) {
func (c *cnStreamRegistry) Deregister(handlerID uint64) {
foundAt := -1
for i, s := range c.streams {
if s.handlerId == handlerId {
if s.handlerID == handlerID {
foundAt = i
break
}
@ -145,7 +145,11 @@ var streams cnStreamRegistry
var CorenetCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Application network streams.",
Tagline: "Libp2p stream mounting.",
ShortDescription: `
Expose a local application to remote peers over libp2p
Note: this command is experimental and subject to change as usecases and APIs are refined`,
},
Subcommands: map[string]*cmds.Command{
@ -162,7 +166,7 @@ var CorenetLsCmd = &cmds.Command{
Tagline: "List active application protocol listeners.",
},
Options: []cmds.Option{
cmds.BoolOption("headers", "v", "Print table headers (HandlerId, Protocol, Local, Remote).").Default(false),
cmds.BoolOption("headers", "v", "Print table headers (HandlerID, Protocol, Local, Remote).").Default(false),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
@ -171,6 +175,12 @@ var CorenetLsCmd = &cmds.Command{
return
}
err = checkEnabled(n)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
if !n.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
return
@ -179,7 +189,7 @@ var CorenetLsCmd = &cmds.Command{
output := &CorenetLsOutput{}
for _, a := range apps.apps {
output.Apps = append(output.Apps, AppInfoOutput{
output.Apps = append(output.Apps, CorenetAppInfoOutput{
Protocol: a.protocol,
Address: a.address.String(),
})
@ -210,10 +220,10 @@ var CorenetLsCmd = &cmds.Command{
var CorenetStreamsCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "List active application protocol connections.",
Tagline: "List active application protocol streams.",
},
Options: []cmds.Option{
cmds.BoolOption("headers", "v", "Print table headers (HandlerId, Protocol, Local, Remote).").Default(false),
cmds.BoolOption("headers", "v", "Print table headers (HandlerID, Protocol, Local, Remote).").Default(false),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
@ -222,6 +232,12 @@ var CorenetStreamsCmd = &cmds.Command{
return
}
err = checkEnabled(n)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
if !n.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
return
@ -230,8 +246,8 @@ var CorenetStreamsCmd = &cmds.Command{
output := &CorenetStreamsOutput{}
for _, s := range streams.streams {
output.Streams = append(output.Streams, StreamInfoOutput{
HandlerId: strconv.FormatUint(s.handlerId, 10),
output.Streams = append(output.Streams, CorenetStreamInfoOutput{
HandlerID: strconv.FormatUint(s.handlerID, 10),
Protocol: s.protocol,
@ -254,10 +270,10 @@ var CorenetStreamsCmd = &cmds.Command{
w := tabwriter.NewWriter(buf, 1, 2, 1, ' ', 0)
for _, stream := range list.Streams {
if headers {
fmt.Fprintln(w, "HandlerId\tProtocol\tLocal\tRemote")
fmt.Fprintln(w, "HandlerID\tProtocol\tLocal\tRemote")
}
fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", stream.HandlerId, stream.Protocol, stream.LocalAddress, stream.RemotePeer)
fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", stream.HandlerID, stream.Protocol, stream.LocalAddress, stream.RemotePeer)
}
w.Flush()
@ -281,6 +297,12 @@ var CorenetListenCmd = &cmds.Command{
return
}
err = checkEnabled(n)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
if !n.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
return
@ -288,7 +310,7 @@ var CorenetListenCmd = &cmds.Command{
proto := "/app/" + req.Arguments()[0]
if checkProtoExists(n.PeerHost.Mux().Protocols(), proto) {
res.SetError(errors.New("Protocol handler already registered."), cmds.ErrNormal)
res.SetError(errors.New("protocol handler already registered"), cmds.ErrNormal)
return
}
@ -317,7 +339,7 @@ var CorenetListenCmd = &cmds.Command{
apps.Register(&app)
// Successful response.
res.SetOutput(&AppInfoOutput{
res.SetOutput(&CorenetAppInfoOutput{
Protocol: proto,
Address: addr.String(),
})
@ -395,6 +417,12 @@ var CorenetDialCmd = &cmds.Command{
return
}
err = checkEnabled(n)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
if !n.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
return
@ -454,11 +482,11 @@ var CorenetDialCmd = &cmds.Command{
go doAccept(&app, remote, listener)
default:
res.SetError(errors.New("Unsupported protocol: "+lnet), cmds.ErrNormal)
res.SetError(errors.New("unsupported protocol: "+lnet), cmds.ErrNormal)
return
}
output := AppInfoOutput{
output := CorenetAppInfoOutput{
Protocol: app.protocol,
Address: app.address.String(),
}
@ -497,8 +525,8 @@ var CorenetCloseCmd = &cmds.Command{
Tagline: "Closes an active stream listener or client.",
},
Arguments: []cmds.Argument{
cmds.StringArg("HandlerId", false, false, "Application listener or client HandlerId"),
cmds.StringArg("Protocol", false, false, "Application listener or client HandlerId"),
cmds.StringArg("HandlerID", false, false, "Application listener or client HandlerID"),
cmds.StringArg("Protocol", false, false, "Application listener or client HandlerID"),
},
Options: []cmds.Option{
cmds.BoolOption("all", "a", "Close all streams and listeners.").Default(false),
@ -510,6 +538,12 @@ var CorenetCloseCmd = &cmds.Command{
return
}
err = checkEnabled(n)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
if !n.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
return
@ -518,27 +552,27 @@ var CorenetCloseCmd = &cmds.Command{
closeAll, _, _ := req.Option("all").Bool()
var proto string
var handlerId uint64
var handlerID uint64
useHandlerId := false
useHandlerID := false
if !closeAll && len(req.Arguments()) == 0 {
res.SetError(errors.New("You must supply a handlerId or stream protocol."), cmds.ErrNormal)
res.SetError(errors.New(" handlerID nor stream protocol"), cmds.ErrNormal)
return
} else if !closeAll {
handlerId, err = strconv.ParseUint(req.Arguments()[0], 10, 64)
handlerID, err = strconv.ParseUint(req.Arguments()[0], 10, 64)
if err != nil {
proto = "/app/" + req.Arguments()[0]
} else {
useHandlerId = true
useHandlerID = true
}
}
if closeAll || useHandlerId {
if closeAll || useHandlerID {
for _, s := range streams.streams {
if !closeAll && handlerId != s.handlerId {
if !closeAll && handlerID != s.handlerID {
continue
}
s.Close()
@ -548,7 +582,7 @@ var CorenetCloseCmd = &cmds.Command{
}
}
if closeAll || !useHandlerId {
if closeAll || !useHandlerID {
for _, a := range apps.apps {
if !closeAll && a.protocol != proto {
continue
@ -564,3 +598,15 @@ var CorenetCloseCmd = &cmds.Command{
}
},
}
func checkEnabled(n *core.IpfsNode) error {
config, err := n.Repo.Config()
if err != nil {
return err
}
if !config.Experimental.Libp2pStreamMounting {
return errors.New("libp2p stream mounting not enabled")
}
return nil
}

View File

@ -1,6 +1,7 @@
package config
type Experiments struct {
FilestoreEnabled bool
ShardingEnabled bool
FilestoreEnabled bool
ShardingEnabled bool
Libp2pStreamMounting bool
}

View File

@ -26,7 +26,16 @@ test_expect_success "test ports are closed" '
(! (netstat -ln | grep "LISTEN" | grep ":10102 "))
'
test_expect_success 'start ipfs listener' '
test_must_fail 'fail without config option being enabled' '
ipfsi 0 exp corenet ls
'
test_expect_success "enable filestore config setting" '
ipfsi 0 config --json Experimental.Libp2pStreamMounting true
ipfsi 1 config --json Experimental.Libp2pStreamMounting true
'
test_expect_success 'start corenet listener' '
ipfsi 0 exp corenet listen corenet-test /ip4/127.0.0.1/tcp/10101 2>&1 > listener-stdouterr.log
'