From 51bb9d6843f12507db23d809caab163862ad2cc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 10 Mar 2018 19:28:22 +0100 Subject: [PATCH] coreapi: implement pubsub api MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Ɓukasz Magiera --- core/coreapi/coreapi.go | 5 ++ core/coreapi/interface/coreapi.go | 3 + core/coreapi/interface/errors.go | 2 +- core/coreapi/interface/options/pubsub.go | 8 +- core/coreapi/interface/pubsub.go | 16 +--- core/coreapi/pubsub.go | 105 +++++++++++++++++++++++ 6 files changed, 123 insertions(+), 16 deletions(-) create mode 100644 core/coreapi/pubsub.go diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index 29a8b78fb..449d8440f 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -72,3 +72,8 @@ func (api *CoreAPI) Dht() coreiface.DhtAPI { func (api *CoreAPI) Swarm() coreiface.SwarmAPI { return (*SwarmAPI)(api) } + +// PubSub returns the PubSubAPI interface implementation backed by the go-ipfs node +func (api *CoreAPI) PubSub() coreiface.PubSubAPI { + return (*PubSubAPI)(api) +} diff --git a/core/coreapi/interface/coreapi.go b/core/coreapi/interface/coreapi.go index 0b153b6f9..bc889237b 100644 --- a/core/coreapi/interface/coreapi.go +++ b/core/coreapi/interface/coreapi.go @@ -37,6 +37,9 @@ type CoreAPI interface { // Swarm returns an implementation of Swarm API Swarm() SwarmAPI + // PubSub returns an implementation of PubSub API + PubSub() PubSubAPI + // ResolvePath resolves the path using Unixfs resolver ResolvePath(context.Context, Path) (ResolvedPath, error) diff --git a/core/coreapi/interface/errors.go b/core/coreapi/interface/errors.go index 81f978971..072275409 100644 --- a/core/coreapi/interface/errors.go +++ b/core/coreapi/interface/errors.go @@ -4,5 +4,5 @@ import "errors" var ( ErrIsDir = errors.New("object is a directory") - ErrOffline = errors.New("can't resolve, ipfs node is offline") + ErrOffline = errors.New("this action must be run in online mode, try running 'ipfs daemon' first") ) diff --git a/core/coreapi/interface/options/pubsub.go b/core/coreapi/interface/options/pubsub.go index e276d7e4a..f0a614d58 100644 --- a/core/coreapi/interface/options/pubsub.go +++ b/core/coreapi/interface/options/pubsub.go @@ -39,16 +39,18 @@ func PubSubSubscribeOptions(opts ...PubSubSubscribeOption) (*PubSubSubscribeSett return options, nil } -type PubSubOptions struct{} +type pubsubOpts struct{} -func (api *PubSubOptions) WithTopic(topic string) PubSubPeersOption { +var PubBub nameOpts + +func (pubsubOpts) Topic(topic string) PubSubPeersOption { return func(settings *PubSubPeersSettings) error { settings.Topic = topic return nil } } -func (api *PubSubOptions) WithDiscover(discover bool) PubSubSubscribeOption { +func (pubsubOpts) Discover(discover bool) PubSubSubscribeOption { return func(settings *PubSubSubscribeSettings) error { settings.Discover = discover return nil diff --git a/core/coreapi/interface/pubsub.go b/core/coreapi/interface/pubsub.go index f78734a09..4b52ed6d6 100644 --- a/core/coreapi/interface/pubsub.go +++ b/core/coreapi/interface/pubsub.go @@ -6,15 +6,15 @@ import ( options "github.com/ipfs/go-ipfs/core/coreapi/interface/options" - peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer" + peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" ) // PubSubSubscription is an active PubSub subscription type PubSubSubscription interface { io.Closer - // Chan return incoming message channel - Chan(context.Context) <-chan PubSubMessage + // Next return the next incoming message + Next(context.Context) (PubSubMessage, error) } // PubSubMessage is a single PubSub message @@ -35,17 +35,9 @@ type PubSubAPI interface { // TODO: WithTopic Peers(context.Context, ...options.PubSubPeersOption) ([]peer.ID, error) - // WithTopic is an option for peers which specifies a topic filter for the - // function - WithTopic(topic string) options.PubSubPeersOption - // Publish a message to a given pubsub topic Publish(context.Context, string, []byte) error // Subscribe to messages on a given topic - Subscribe(context.Context, string) (PubSubSubscription, error) - - // WithDiscover is an option for Subscribe which specifies whether to try to - // discover other peers subscribed to the same topic - WithDiscover(discover bool) options.PubSubSubscribeOption + Subscribe(context.Context, string, ...options.PubSubSubscribeOption) (PubSubSubscription, error) } diff --git a/core/coreapi/pubsub.go b/core/coreapi/pubsub.go new file mode 100644 index 000000000..66edd6907 --- /dev/null +++ b/core/coreapi/pubsub.go @@ -0,0 +1,105 @@ +package coreapi + +import ( + "context" + "errors" + + coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" + caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options" + + peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" + floodsub "gx/ipfs/QmY1L5krVk8dv8d74uESmJTXGpoigVYqBVxXXz1aS8aFSb/go-libp2p-floodsub" +) + +type PubSubAPI CoreAPI + +type pubSubSubscription struct { + subscription *floodsub.Subscription +} + +type pubSubMessage struct { + msg *floodsub.Message +} + +func (api *PubSubAPI) Ls(ctx context.Context) ([]string, error) { + if err := api.checkNode(); err != nil { + return nil, err + } + + return api.node.Floodsub.GetTopics(), nil +} + +func (api *PubSubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOption) ([]peer.ID, error) { + if err := api.checkNode(); err != nil { + return nil, err + } + + settings, err := caopts.PubSubPeersOptions(opts...) + if err != nil { + return nil, err + } + + peers := api.node.Floodsub.ListPeers(settings.Topic) + out := make([]peer.ID, len(peers)) + + for i, peer := range peers { + out[i] = peer + } + + return out, nil +} + +func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) error { + if err := api.checkNode(); err != nil { + return err + } + + return api.node.Floodsub.Publish(topic, data) +} + +func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (coreiface.PubSubSubscription, error) { + if err := api.checkNode(); err != nil { + return nil, err + } + + sub, err := api.node.Floodsub.Subscribe(topic) + if err != nil { + return nil, err + } + + return &pubSubSubscription{sub}, nil +} + +func (api *PubSubAPI) checkNode() error { + if !api.node.OnlineMode() { + return coreiface.ErrOffline + } + + if api.node.Floodsub == nil { + return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.") + } + + return nil +} + +func (sub *pubSubSubscription) Close() error { + sub.subscription.Cancel() + return nil +} + +func (sub *pubSubSubscription) Next(ctx context.Context) (coreiface.PubSubMessage, error) { + msg, err := sub.subscription.Next(ctx) + if err != nil { + return nil, err + } + + return &pubSubMessage{msg}, nil +} + +func (msg *pubSubMessage) From() peer.ID { + return peer.ID(msg.msg.From) +} + +func (msg *pubSubMessage) Data() []byte { + return msg.msg.Data +}