mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-28 00:39:31 +08:00
coreapi: implement pubsub api
License: MIT Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:

committed by
Steven Allen

parent
3d3bedcf41
commit
51bb9d6843
@ -72,3 +72,8 @@ func (api *CoreAPI) Dht() coreiface.DhtAPI {
|
|||||||
func (api *CoreAPI) Swarm() coreiface.SwarmAPI {
|
func (api *CoreAPI) Swarm() coreiface.SwarmAPI {
|
||||||
return (*SwarmAPI)(api)
|
return (*SwarmAPI)(api)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PubSub returns the PubSubAPI interface implementation backed by the go-ipfs node
|
||||||
|
func (api *CoreAPI) PubSub() coreiface.PubSubAPI {
|
||||||
|
return (*PubSubAPI)(api)
|
||||||
|
}
|
||||||
|
@ -37,6 +37,9 @@ type CoreAPI interface {
|
|||||||
// Swarm returns an implementation of Swarm API
|
// Swarm returns an implementation of Swarm API
|
||||||
Swarm() SwarmAPI
|
Swarm() SwarmAPI
|
||||||
|
|
||||||
|
// PubSub returns an implementation of PubSub API
|
||||||
|
PubSub() PubSubAPI
|
||||||
|
|
||||||
// ResolvePath resolves the path using Unixfs resolver
|
// ResolvePath resolves the path using Unixfs resolver
|
||||||
ResolvePath(context.Context, Path) (ResolvedPath, error)
|
ResolvePath(context.Context, Path) (ResolvedPath, error)
|
||||||
|
|
||||||
|
@ -4,5 +4,5 @@ import "errors"
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
ErrIsDir = errors.New("object is a directory")
|
ErrIsDir = errors.New("object is a directory")
|
||||||
ErrOffline = errors.New("can't resolve, ipfs node is offline")
|
ErrOffline = errors.New("this action must be run in online mode, try running 'ipfs daemon' first")
|
||||||
)
|
)
|
||||||
|
@ -39,16 +39,18 @@ func PubSubSubscribeOptions(opts ...PubSubSubscribeOption) (*PubSubSubscribeSett
|
|||||||
return options, nil
|
return options, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type PubSubOptions struct{}
|
type pubsubOpts struct{}
|
||||||
|
|
||||||
func (api *PubSubOptions) WithTopic(topic string) PubSubPeersOption {
|
var PubBub nameOpts
|
||||||
|
|
||||||
|
func (pubsubOpts) Topic(topic string) PubSubPeersOption {
|
||||||
return func(settings *PubSubPeersSettings) error {
|
return func(settings *PubSubPeersSettings) error {
|
||||||
settings.Topic = topic
|
settings.Topic = topic
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *PubSubOptions) WithDiscover(discover bool) PubSubSubscribeOption {
|
func (pubsubOpts) Discover(discover bool) PubSubSubscribeOption {
|
||||||
return func(settings *PubSubSubscribeSettings) error {
|
return func(settings *PubSubSubscribeSettings) error {
|
||||||
settings.Discover = discover
|
settings.Discover = discover
|
||||||
return nil
|
return nil
|
||||||
|
@ -6,15 +6,15 @@ import (
|
|||||||
|
|
||||||
options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
|
options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
|
||||||
|
|
||||||
peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer"
|
peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PubSubSubscription is an active PubSub subscription
|
// PubSubSubscription is an active PubSub subscription
|
||||||
type PubSubSubscription interface {
|
type PubSubSubscription interface {
|
||||||
io.Closer
|
io.Closer
|
||||||
|
|
||||||
// Chan return incoming message channel
|
// Next return the next incoming message
|
||||||
Chan(context.Context) <-chan PubSubMessage
|
Next(context.Context) (PubSubMessage, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PubSubMessage is a single PubSub message
|
// PubSubMessage is a single PubSub message
|
||||||
@ -35,17 +35,9 @@ type PubSubAPI interface {
|
|||||||
// TODO: WithTopic
|
// TODO: WithTopic
|
||||||
Peers(context.Context, ...options.PubSubPeersOption) ([]peer.ID, error)
|
Peers(context.Context, ...options.PubSubPeersOption) ([]peer.ID, error)
|
||||||
|
|
||||||
// WithTopic is an option for peers which specifies a topic filter for the
|
|
||||||
// function
|
|
||||||
WithTopic(topic string) options.PubSubPeersOption
|
|
||||||
|
|
||||||
// Publish a message to a given pubsub topic
|
// Publish a message to a given pubsub topic
|
||||||
Publish(context.Context, string, []byte) error
|
Publish(context.Context, string, []byte) error
|
||||||
|
|
||||||
// Subscribe to messages on a given topic
|
// Subscribe to messages on a given topic
|
||||||
Subscribe(context.Context, string) (PubSubSubscription, error)
|
Subscribe(context.Context, string, ...options.PubSubSubscribeOption) (PubSubSubscription, error)
|
||||||
|
|
||||||
// WithDiscover is an option for Subscribe which specifies whether to try to
|
|
||||||
// discover other peers subscribed to the same topic
|
|
||||||
WithDiscover(discover bool) options.PubSubSubscribeOption
|
|
||||||
}
|
}
|
||||||
|
105
core/coreapi/pubsub.go
Normal file
105
core/coreapi/pubsub.go
Normal file
@ -0,0 +1,105 @@
|
|||||||
|
package coreapi
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
|
||||||
|
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
|
||||||
|
|
||||||
|
peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
|
||||||
|
floodsub "gx/ipfs/QmY1L5krVk8dv8d74uESmJTXGpoigVYqBVxXXz1aS8aFSb/go-libp2p-floodsub"
|
||||||
|
)
|
||||||
|
|
||||||
|
type PubSubAPI CoreAPI
|
||||||
|
|
||||||
|
type pubSubSubscription struct {
|
||||||
|
subscription *floodsub.Subscription
|
||||||
|
}
|
||||||
|
|
||||||
|
type pubSubMessage struct {
|
||||||
|
msg *floodsub.Message
|
||||||
|
}
|
||||||
|
|
||||||
|
func (api *PubSubAPI) Ls(ctx context.Context) ([]string, error) {
|
||||||
|
if err := api.checkNode(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return api.node.Floodsub.GetTopics(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (api *PubSubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOption) ([]peer.ID, error) {
|
||||||
|
if err := api.checkNode(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
settings, err := caopts.PubSubPeersOptions(opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
peers := api.node.Floodsub.ListPeers(settings.Topic)
|
||||||
|
out := make([]peer.ID, len(peers))
|
||||||
|
|
||||||
|
for i, peer := range peers {
|
||||||
|
out[i] = peer
|
||||||
|
}
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) error {
|
||||||
|
if err := api.checkNode(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return api.node.Floodsub.Publish(topic, data)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (coreiface.PubSubSubscription, error) {
|
||||||
|
if err := api.checkNode(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
sub, err := api.node.Floodsub.Subscribe(topic)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &pubSubSubscription{sub}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (api *PubSubAPI) checkNode() error {
|
||||||
|
if !api.node.OnlineMode() {
|
||||||
|
return coreiface.ErrOffline
|
||||||
|
}
|
||||||
|
|
||||||
|
if api.node.Floodsub == nil {
|
||||||
|
return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sub *pubSubSubscription) Close() error {
|
||||||
|
sub.subscription.Cancel()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sub *pubSubSubscription) Next(ctx context.Context) (coreiface.PubSubMessage, error) {
|
||||||
|
msg, err := sub.subscription.Next(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &pubSubMessage{msg}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (msg *pubSubMessage) From() peer.ID {
|
||||||
|
return peer.ID(msg.msg.From)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (msg *pubSubMessage) Data() []byte {
|
||||||
|
return msg.msg.Data
|
||||||
|
}
|
Reference in New Issue
Block a user