1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-08-05 19:02:21 +08:00

Merge pull request #4047 from vyzo/ipns-pubsub

namesys/pubsub: pubsub Publisher and Resolver
This commit is contained in:
Whyrusleeping
2017-11-30 13:41:38 +01:00
committed by GitHub
12 changed files with 1008 additions and 26 deletions

View File

@ -46,6 +46,7 @@ const (
unrestrictedApiAccessKwd = "unrestricted-api"
writableKwd = "writable"
enableFloodSubKwd = "enable-pubsub-experiment"
enableIPNSPubSubKwd = "enable-namesys-pubsub"
enableMultiplexKwd = "enable-mplex-experiment"
// apiAddrKwd = "address-api"
// swarmAddrKwd = "address-swarm"
@ -157,6 +158,7 @@ Headers.
cmdkit.BoolOption(offlineKwd, "Run offline. Do not connect to the rest of the network but provide local API."),
cmdkit.BoolOption(migrateKwd, "If true, assume yes at the migrate prompt. If false, assume no."),
cmdkit.BoolOption(enableFloodSubKwd, "Instantiate the ipfs daemon with the experimental pubsub feature enabled."),
cmdkit.BoolOption(enableIPNSPubSubKwd, "Enable IPNS record distribution through pubsub; enables pubsub."),
cmdkit.BoolOption(enableMultiplexKwd, "Add the experimental 'go-multiplex' stream muxer to libp2p on construction.").WithDefault(true),
// TODO: add way to override addresses. tricky part: updating the config if also --init.
@ -283,6 +285,7 @@ func daemonFunc(req cmds.Request, re cmds.ResponseEmitter) {
offline, _, _ := req.Option(offlineKwd).Bool()
pubsub, _, _ := req.Option(enableFloodSubKwd).Bool()
ipnsps, _, _ := req.Option(enableIPNSPubSubKwd).Bool()
mplex, _, _ := req.Option(enableMultiplexKwd).Bool()
// Start assembling node config
@ -292,6 +295,7 @@ func daemonFunc(req cmds.Request, re cmds.ResponseEmitter) {
Online: !offline,
ExtraOpts: map[string]bool{
"pubsub": pubsub,
"ipnsps": ipnsps,
"mplex": mplex,
},
//TODO(Kubuxu): refactor Online vs Offline by adding Permanent vs Ephemeral

View File

@ -210,7 +210,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
if cfg.Online {
do := setupDiscoveryOption(rcfg.Discovery)
if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub"), cfg.getOpt("mplex")); err != nil {
if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub"), cfg.getOpt("ipnsps"), cfg.getOpt("mplex")); err != nil {
return err
}
} else {

163
core/commands/ipnsps.go Normal file
View File

@ -0,0 +1,163 @@
package commands
import (
"errors"
"fmt"
"io"
"strings"
cmds "github.com/ipfs/go-ipfs/commands"
e "github.com/ipfs/go-ipfs/core/commands/e"
ns "github.com/ipfs/go-ipfs/namesys"
cmdkit "gx/ipfs/QmUyfy4QSr3NXym4etEiRyxBLqqAeKHJuRdi8AACxg63fZ/go-ipfs-cmdkit"
)
type ipnsPubsubState struct {
Enabled bool
}
type ipnsPubsubCancel struct {
Canceled bool
}
// IpnsPubsubCmd is the subcommand that allows us to manage the IPNS pubsub system
var IpnsPubsubCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "IPNS pubsub management",
ShortDescription: `
Manage and inspect the state of the IPNS pubsub resolver.
Note: this command is experimental and subject to change as the system is refined
`,
},
Subcommands: map[string]*cmds.Command{
"state": ipnspsStateCmd,
"subs": ipnspsSubsCmd,
"cancel": ipnspsCancelCmd,
},
}
var ipnspsStateCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Query the state of IPNS pubsub",
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
_, ok := n.Namesys.GetResolver("pubsub")
res.SetOutput(&ipnsPubsubState{ok})
},
Type: ipnsPubsubState{},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
}
output, ok := v.(*ipnsPubsubState)
if !ok {
return nil, e.TypeErr(output, v)
}
var state string
if output.Enabled {
state = "enabled"
} else {
state = "disabled"
}
return strings.NewReader(state + "\n"), nil
},
},
}
var ipnspsSubsCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Show current name subscriptions",
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
r, ok := n.Namesys.GetResolver("pubsub")
if !ok {
res.SetError(errors.New("IPNS pubsub subsystem is not enabled"), cmdkit.ErrClient)
return
}
psr, ok := r.(*ns.PubsubResolver)
if !ok {
res.SetError(fmt.Errorf("unexpected resolver type: %v", r), cmdkit.ErrNormal)
return
}
res.SetOutput(&stringList{psr.GetSubscriptions()})
},
Type: stringList{},
Marshalers: cmds.MarshalerMap{
cmds.Text: stringListMarshaler,
},
}
var ipnspsCancelCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Cancel a name subscription",
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
r, ok := n.Namesys.GetResolver("pubsub")
if !ok {
res.SetError(errors.New("IPNS pubsub subsystem is not enabled"), cmdkit.ErrClient)
return
}
psr, ok := r.(*ns.PubsubResolver)
if !ok {
res.SetError(fmt.Errorf("unexpected resolver type: %v", r), cmdkit.ErrNormal)
return
}
ok = psr.Cancel(req.Arguments()[0])
res.SetOutput(&ipnsPubsubCancel{ok})
},
Arguments: []cmdkit.Argument{
cmdkit.StringArg("name", true, false, "Name to cancel the subscription for."),
},
Type: ipnsPubsubCancel{},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
}
output, ok := v.(*ipnsPubsubCancel)
if !ok {
return nil, e.TypeErr(output, v)
}
var state string
if output.Canceled {
state = "canceled"
} else {
state = "no subscription"
}
return strings.NewReader(state + "\n"), nil
},
},
}

View File

@ -63,5 +63,6 @@ Resolve the value of a dnslink:
Subcommands: map[string]*cmds.Command{
"publish": PublishCmd,
"resolve": IpnsCmd,
"pubsub": IpnsPubsubCmd,
},
}

View File

@ -152,7 +152,7 @@ type Mounts struct {
Ipns mount.Mount
}
func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub, mplex bool) error {
func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub, ipnsps, mplex bool) error {
if n.PeerHost != nil { // already online.
return errors.New("node already online")
@ -249,10 +249,17 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
return err
}
if pubsub {
if pubsub || ipnsps {
n.Floodsub = floodsub.NewFloodSub(ctx, peerhost)
}
if ipnsps {
err = namesys.AddPubsubNameSystem(ctx, n.Namesys, n.PeerHost, n.Routing, n.Repo.Datastore(), n.Floodsub)
if err != nil {
return err
}
}
n.P2P = p2p.NewP2P(n.Identity, n.PeerHost, n.Peerstore)
// setup local discovery

View File

@ -48,6 +48,10 @@ func (m mockNamesys) PublishWithEOL(ctx context.Context, name ci.PrivKey, value
return errors.New("not implemented for mockNamesys")
}
func (m mockNamesys) GetResolver(subs string) (namesys.Resolver, bool) {
return nil, false
}
func newNodeWithMockNamesys(ns mockNamesys) (*core.IpfsNode, error) {
c := config.Config{
Identity: config.Identity{

View File

@ -70,6 +70,7 @@ var ErrPublishFailed = errors.New("Could not publish name.")
type NameSystem interface {
Resolver
Publisher
ResolverLookup
}
// Resolver is an object capable of resolving names.
@ -112,3 +113,10 @@ type Publisher interface {
// call once the records spec is implemented
PublishWithEOL(ctx context.Context, name ci.PrivKey, value path.Path, eol time.Time) error
}
// ResolverLookup is an object capable of finding resolvers for a subsystem
type ResolverLookup interface {
// GetResolver retrieves a resolver associated with a subsystem
GetResolver(subs string) (Resolver, bool)
}

View File

@ -2,14 +2,20 @@ package namesys
import (
"context"
"errors"
"strings"
"sync"
"time"
path "github.com/ipfs/go-ipfs/path"
routing "gx/ipfs/QmPR2JzfKd9poHx9XBhzoFeBBC31ZM3W5iUPKJZWyaoZZm/go-libp2p-routing"
p2phost "gx/ipfs/QmRS46AyqtpJBsf1zmQdeizSDEzo1qkWR7rdEuPFAv8237/go-libp2p-host"
mh "gx/ipfs/QmU9a9NV9RdPNwZQDYd5uKsm6N6LJLSvLbywDDYFbaaC6P/go-multihash"
floodsub "gx/ipfs/QmVNv1WV6XxzQV4MBuiLX5729wMazaf8TNzm2Sq6ejyHh7/go-libp2p-floodsub"
ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
isd "gx/ipfs/QmZmmuAXgX73UQmX1jRKjTGmjzq24Jinqkq8vzkBtno4uX/go-is-domain"
ci "gx/ipfs/QmaPbCnUMBohSGo3KnxEa2bHqyJVVeEEcwtqJAYxerieBo/go-libp2p-crypto"
)
@ -36,11 +42,28 @@ func NewNameSystem(r routing.ValueStore, ds ds.Datastore, cachesize int) NameSys
"dht": NewRoutingResolver(r, cachesize),
},
publishers: map[string]Publisher{
"/ipns/": NewRoutingPublisher(r, ds),
"dht": NewRoutingPublisher(r, ds),
},
}
}
// AddPubsubNameSystem adds the pubsub publisher and resolver to the namesystem
func AddPubsubNameSystem(ctx context.Context, ns NameSystem, host p2phost.Host, r routing.IpfsRouting, ds ds.Datastore, ps *floodsub.PubSub) error {
mpns, ok := ns.(*mpns)
if !ok {
return errors.New("unexpected NameSystem; not an mpns instance")
}
pkf, ok := r.(routing.PubKeyFetcher)
if !ok {
return errors.New("unexpected IpfsRouting; not a PubKeyFetcher instance")
}
mpns.resolvers["pubsub"] = NewPubsubResolver(ctx, host, r, pkf, ps)
mpns.publishers["pubsub"] = NewPubsubPublisher(ctx, host, ds, r, ps)
return nil
}
const DefaultResolverCacheTTL = time.Minute
// Resolve implements Resolver.
@ -72,38 +95,100 @@ func (ns *mpns) resolveOnce(ctx context.Context, name string) (path.Path, error)
return "", ErrResolveFailed
}
for protocol, resolver := range ns.resolvers {
log.Debugf("Attempting to resolve %s with %s", segments[2], protocol)
p, err := resolver.resolveOnce(ctx, segments[2])
if err == nil {
if len(segments) > 3 {
return path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3])
} else {
return p, err
}
makePath := func(p path.Path) (path.Path, error) {
if len(segments) > 3 {
return path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3])
} else {
return p, nil
}
}
// Resolver selection:
// 1. if it is a multihash resolve through "pubsub" (if available),
// with fallback to "dht"
// 2. if it is a domain name, resolve through "dns"
// 3. otherwise resolve through the "proquint" resolver
key := segments[2]
_, err := mh.FromB58String(key)
if err == nil {
res, ok := ns.resolvers["pubsub"]
if ok {
p, err := res.resolveOnce(ctx, key)
if err == nil {
return makePath(p)
}
}
res, ok = ns.resolvers["dht"]
if ok {
p, err := res.resolveOnce(ctx, key)
if err == nil {
return makePath(p)
}
}
return "", ErrResolveFailed
}
if isd.IsDomain(key) {
res, ok := ns.resolvers["dns"]
if ok {
p, err := res.resolveOnce(ctx, key)
if err == nil {
return makePath(p)
}
}
return "", ErrResolveFailed
}
res, ok := ns.resolvers["proquint"]
if ok {
p, err := res.resolveOnce(ctx, key)
if err == nil {
return makePath(p)
}
return "", ErrResolveFailed
}
log.Warningf("No resolver found for %s", name)
return "", ErrResolveFailed
}
// Publish implements Publisher
func (ns *mpns) Publish(ctx context.Context, name ci.PrivKey, value path.Path) error {
err := ns.publishers["/ipns/"].Publish(ctx, name, value)
if err != nil {
return err
}
ns.addToDHTCache(name, value, time.Now().Add(DefaultRecordTTL))
return nil
return ns.PublishWithEOL(ctx, name, value, time.Now().Add(DefaultRecordTTL))
}
func (ns *mpns) PublishWithEOL(ctx context.Context, name ci.PrivKey, value path.Path, eol time.Time) error {
err := ns.publishers["/ipns/"].PublishWithEOL(ctx, name, value, eol)
if err != nil {
return err
var dhtErr error
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
dhtErr = ns.publishers["dht"].PublishWithEOL(ctx, name, value, eol)
if dhtErr == nil {
ns.addToDHTCache(name, value, eol)
}
wg.Done()
}()
pub, ok := ns.publishers["pubsub"]
if ok {
wg.Add(1)
go func() {
err := pub.PublishWithEOL(ctx, name, value, eol)
if err != nil {
log.Warningf("error publishing %s with pubsub: %s", name, err.Error())
}
wg.Done()
}()
}
ns.addToDHTCache(name, value, eol)
return nil
wg.Wait()
return dhtErr
}
func (ns *mpns) addToDHTCache(key ci.PrivKey, value path.Path, eol time.Time) {
@ -138,3 +223,16 @@ func (ns *mpns) addToDHTCache(key ci.PrivKey, value path.Path, eol time.Time) {
eol: eol,
})
}
// GetResolver implements ResolverLookup
func (ns *mpns) GetResolver(subs string) (Resolver, bool) {
res, ok := ns.resolvers[subs]
if ok {
ires, ok := res.(Resolver)
if ok {
return ires, true
}
}
return nil, false
}

View File

@ -58,8 +58,8 @@ func mockResolverTwo() *mockResolver {
func TestNamesysResolution(t *testing.T) {
r := &mpns{
resolvers: map[string]resolver{
"one": mockResolverOne(),
"two": mockResolverTwo(),
"dht": mockResolverOne(),
"dns": mockResolverTwo(),
},
}

430
namesys/pubsub.go Normal file
View File

@ -0,0 +1,430 @@
package namesys
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
pb "github.com/ipfs/go-ipfs/namesys/pb"
path "github.com/ipfs/go-ipfs/path"
dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help"
cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
routing "gx/ipfs/QmPR2JzfKd9poHx9XBhzoFeBBC31ZM3W5iUPKJZWyaoZZm/go-libp2p-routing"
pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore"
p2phost "gx/ipfs/QmRS46AyqtpJBsf1zmQdeizSDEzo1qkWR7rdEuPFAv8237/go-libp2p-host"
u "gx/ipfs/QmSU6eubNdhXjFBJBSksTp8kv8YRub8mGAPv8tVJHmL2EU/go-ipfs-util"
mh "gx/ipfs/QmU9a9NV9RdPNwZQDYd5uKsm6N6LJLSvLbywDDYFbaaC6P/go-multihash"
floodsub "gx/ipfs/QmVNv1WV6XxzQV4MBuiLX5729wMazaf8TNzm2Sq6ejyHh7/go-libp2p-floodsub"
ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore"
dssync "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore/sync"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
ci "gx/ipfs/QmaPbCnUMBohSGo3KnxEa2bHqyJVVeEEcwtqJAYxerieBo/go-libp2p-crypto"
record "gx/ipfs/QmbxkgUceEcuSZ4ZdBA3x74VUDSSYjHYmmeEqkjxbtZ6Jg/go-libp2p-record"
dhtpb "gx/ipfs/QmbxkgUceEcuSZ4ZdBA3x74VUDSSYjHYmmeEqkjxbtZ6Jg/go-libp2p-record/pb"
)
// PubsubPublisher is a publisher that distributes IPNS records through pubsub
type PubsubPublisher struct {
ctx context.Context
ds ds.Datastore
host p2phost.Host
cr routing.ContentRouting
ps *floodsub.PubSub
mx sync.Mutex
subs map[string]struct{}
}
// PubsubResolver is a resolver that receives IPNS records through pubsub
type PubsubResolver struct {
ctx context.Context
ds ds.Datastore
host p2phost.Host
cr routing.ContentRouting
pkf routing.PubKeyFetcher
ps *floodsub.PubSub
mx sync.Mutex
subs map[string]*floodsub.Subscription
}
// NewPubsubPublisher constructs a new Publisher that publishes IPNS records through pubsub.
// The constructor interface is complicated by the need to bootstrap the pubsub topic.
// This could be greatly simplified if the pubsub implementation handled bootstrap itself
func NewPubsubPublisher(ctx context.Context, host p2phost.Host, ds ds.Datastore, cr routing.ContentRouting, ps *floodsub.PubSub) *PubsubPublisher {
return &PubsubPublisher{
ctx: ctx,
ds: ds,
host: host, // needed for pubsub bootstrap
cr: cr, // needed for pubsub bootstrap
ps: ps,
subs: make(map[string]struct{}),
}
}
// NewPubsubResolver constructs a new Resolver that resolves IPNS records through pubsub.
// same as above for pubsub bootstrap dependencies
func NewPubsubResolver(ctx context.Context, host p2phost.Host, cr routing.ContentRouting, pkf routing.PubKeyFetcher, ps *floodsub.PubSub) *PubsubResolver {
return &PubsubResolver{
ctx: ctx,
ds: dssync.MutexWrap(ds.NewMapDatastore()),
host: host, // needed for pubsub bootstrap
cr: cr, // needed for pubsub bootstrap
pkf: pkf,
ps: ps,
subs: make(map[string]*floodsub.Subscription),
}
}
// Publish publishes an IPNS record through pubsub with default TTL
func (p *PubsubPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Path) error {
return p.PublishWithEOL(ctx, k, value, time.Now().Add(DefaultRecordTTL))
}
// PublishWithEOL publishes an IPNS record through pubsub
func (p *PubsubPublisher) PublishWithEOL(ctx context.Context, k ci.PrivKey, value path.Path, eol time.Time) error {
id, err := peer.IDFromPrivateKey(k)
if err != nil {
return err
}
_, ipnskey := IpnsKeysForID(id)
seqno, err := p.getPreviousSeqNo(ctx, ipnskey)
if err != nil {
return err
}
seqno++
return p.publishRecord(ctx, k, value, seqno, eol, ipnskey, id)
}
func (p *PubsubPublisher) getPreviousSeqNo(ctx context.Context, ipnskey string) (uint64, error) {
// the datastore is shared with the routing publisher to properly increment and persist
// ipns record sequence numbers.
prevrec, err := p.ds.Get(dshelp.NewKeyFromBinary([]byte(ipnskey)))
if err != nil {
if err == ds.ErrNotFound {
// None found, lets start at zero!
return 0, nil
}
return 0, err
}
prbytes, ok := prevrec.([]byte)
if !ok {
return 0, fmt.Errorf("unexpected type returned from datastore: %#v", prevrec)
}
var dsrec dhtpb.Record
err = proto.Unmarshal(prbytes, &dsrec)
if err != nil {
return 0, err
}
var entry pb.IpnsEntry
err = proto.Unmarshal(dsrec.GetValue(), &entry)
if err != nil {
return 0, err
}
return entry.GetSequence(), nil
}
func (p *PubsubPublisher) publishRecord(ctx context.Context, k ci.PrivKey, value path.Path, seqno uint64, eol time.Time, ipnskey string, ID peer.ID) error {
entry, err := CreateRoutingEntryData(k, value, seqno, eol)
if err != nil {
return err
}
data, err := proto.Marshal(entry)
if err != nil {
return err
}
// the datastore is shared with the routing publisher to properly increment and persist
// ipns record sequence numbers; so we need to Record our new entry in the datastore
dsrec, err := record.MakePutRecord(k, ipnskey, data, true)
if err != nil {
return err
}
dsdata, err := proto.Marshal(dsrec)
if err != nil {
return err
}
err = p.ds.Put(dshelp.NewKeyFromBinary([]byte(ipnskey)), dsdata)
if err != nil {
return err
}
// now we publish, but we also need to bootstrap pubsub for our messages to propagate
topic := "/ipns/" + ID.Pretty()
p.mx.Lock()
_, ok := p.subs[topic]
if !ok {
p.subs[topic] = struct{}{}
p.mx.Unlock()
bootstrapPubsub(p.ctx, p.cr, p.host, topic)
} else {
p.mx.Unlock()
}
log.Debugf("PubsubPublish: publish IPNS record for %s (%d)", topic, seqno)
return p.ps.Publish(topic, data)
}
// Resolve resolves a name through pubsub and default depth limit
func (r *PubsubResolver) Resolve(ctx context.Context, name string) (path.Path, error) {
return r.ResolveN(ctx, name, DefaultDepthLimit)
}
// ResolveN resolves a name through pubsub with the specified depth limit
func (r *PubsubResolver) ResolveN(ctx context.Context, name string, depth int) (path.Path, error) {
return resolve(ctx, r, name, depth, "/ipns/")
}
func (r *PubsubResolver) resolveOnce(ctx context.Context, name string) (path.Path, error) {
log.Debugf("PubsubResolve: resolve '%s'", name)
// retrieve the public key once (for verifying messages)
xname := strings.TrimPrefix(name, "/ipns/")
hash, err := mh.FromB58String(xname)
if err != nil {
log.Warningf("PubsubResolve: bad input hash: [%s]", xname)
return "", err
}
id := peer.ID(hash)
if r.host.Peerstore().PrivKey(id) != nil {
return "", errors.New("Cannot resolve own name through pubsub")
}
pubk := id.ExtractPublicKey()
if pubk == nil {
pubk, err = r.pkf.GetPublicKey(ctx, id)
if err != nil {
log.Warningf("PubsubResolve: error fetching public key: %s [%s]", err.Error(), xname)
return "", err
}
}
// the topic is /ipns/Qmhash
if !strings.HasPrefix(name, "/ipns/") {
name = "/ipns/" + name
}
r.mx.Lock()
// see if we already have a pubsub subscription; if not, subscribe
sub, ok := r.subs[name]
if !ok {
sub, err = r.ps.Subscribe(name)
if err != nil {
r.mx.Unlock()
return "", err
}
log.Debugf("PubsubResolve: subscribed to %s", name)
r.subs[name] = sub
ctx, cancel := context.WithCancel(r.ctx)
go r.handleSubscription(sub, name, pubk, cancel)
go bootstrapPubsub(ctx, r.cr, r.host, name)
}
r.mx.Unlock()
// resolve to what we may already have in the datastore
dsval, err := r.ds.Get(dshelp.NewKeyFromBinary([]byte(name)))
if err != nil {
if err == ds.ErrNotFound {
return "", ErrResolveFailed
}
return "", err
}
data := dsval.([]byte)
entry := new(pb.IpnsEntry)
err = proto.Unmarshal(data, entry)
if err != nil {
return "", err
}
// check EOL; if the entry has expired, delete from datastore and return ds.ErrNotFound
eol, ok := checkEOL(entry)
if ok && eol.Before(time.Now()) {
err = r.ds.Delete(dshelp.NewKeyFromBinary([]byte(name)))
if err != nil {
log.Warningf("PubsubResolve: error deleting stale value for %s: %s", name, err.Error())
}
return "", ErrResolveFailed
}
value, err := path.ParsePath(string(entry.GetValue()))
return value, err
}
// GetSubscriptions retrieves a list of active topic subscriptions
func (r *PubsubResolver) GetSubscriptions() []string {
r.mx.Lock()
defer r.mx.Unlock()
var res []string
for sub := range r.subs {
res = append(res, sub)
}
return res
}
// Cancel cancels a topic subscription; returns true if an active
// subscription was canceled
func (r *PubsubResolver) Cancel(name string) bool {
r.mx.Lock()
defer r.mx.Unlock()
sub, ok := r.subs[name]
if ok {
sub.Cancel()
delete(r.subs, name)
}
return ok
}
func (r *PubsubResolver) handleSubscription(sub *floodsub.Subscription, name string, pubk ci.PubKey, cancel func()) {
defer sub.Cancel()
defer cancel()
for {
msg, err := sub.Next(r.ctx)
if err != nil {
if err != context.Canceled {
log.Warningf("PubsubResolve: subscription error in %s: %s", name, err.Error())
}
return
}
err = r.receive(msg, name, pubk)
if err != nil {
log.Warningf("PubsubResolve: error proessing update for %s: %s", name, err.Error())
}
}
}
func (r *PubsubResolver) receive(msg *floodsub.Message, name string, pubk ci.PubKey) error {
data := msg.GetData()
if data == nil {
return errors.New("empty message")
}
entry := new(pb.IpnsEntry)
err := proto.Unmarshal(data, entry)
if err != nil {
return err
}
ok, err := pubk.Verify(ipnsEntryDataForSig(entry), entry.GetSignature())
if err != nil || !ok {
return errors.New("signature verification failed")
}
_, err = path.ParsePath(string(entry.GetValue()))
if err != nil {
return err
}
eol, ok := checkEOL(entry)
if ok && eol.Before(time.Now()) {
return errors.New("stale update; EOL exceeded")
}
// check the sequence number against what we may already have in our datastore
oval, err := r.ds.Get(dshelp.NewKeyFromBinary([]byte(name)))
if err == nil {
odata := oval.([]byte)
oentry := new(pb.IpnsEntry)
err = proto.Unmarshal(odata, oentry)
if err != nil {
return err
}
if entry.GetSequence() <= oentry.GetSequence() {
return errors.New("stale update; sequence number too small")
}
}
log.Debugf("PubsubResolve: receive IPNS record for %s", name)
return r.ds.Put(dshelp.NewKeyFromBinary([]byte(name)), data)
}
// rendezvous with peers in the name topic through provider records
// Note: rendezbous/boostrap should really be handled by the pubsub implementation itself!
func bootstrapPubsub(ctx context.Context, cr routing.ContentRouting, host p2phost.Host, name string) {
topic := "floodsub:" + name
hash := u.Hash([]byte(topic))
rz := cid.NewCidV1(cid.Raw, hash)
err := cr.Provide(ctx, rz, true)
if err != nil {
log.Warningf("bootstrapPubsub: error providing rendezvous for %s: %s", topic, err.Error())
}
go func() {
for {
select {
case <-time.After(8 * time.Hour):
err := cr.Provide(ctx, rz, true)
if err != nil {
log.Warningf("bootstrapPubsub: error providing rendezvous for %s: %s", topic, err.Error())
}
case <-ctx.Done():
return
}
}
}()
rzctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
wg := &sync.WaitGroup{}
for pi := range cr.FindProvidersAsync(rzctx, rz, 10) {
if pi.ID == host.ID() {
continue
}
wg.Add(1)
go func(pi pstore.PeerInfo) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
err := host.Connect(ctx, pi)
if err != nil {
log.Debugf("Error connecting to pubsub peer %s: %s", pi.ID, err.Error())
return
}
// delay to let pubsub perform its handshake
time.Sleep(time.Millisecond * 250)
log.Debugf("Connected to pubsub peer %s", pi.ID)
}(pi)
}
wg.Wait()
}

187
namesys/pubsub_test.go Normal file
View File

@ -0,0 +1,187 @@
package namesys
import (
"context"
"sync"
"testing"
"time"
path "github.com/ipfs/go-ipfs/path"
mockrouting "github.com/ipfs/go-ipfs/routing/mock"
routing "gx/ipfs/QmPR2JzfKd9poHx9XBhzoFeBBC31ZM3W5iUPKJZWyaoZZm/go-libp2p-routing"
pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore"
testutil "gx/ipfs/QmQgLZP9haZheimMHqqAjJh2LhRmNfEoZDfbtkpeMhi9xK/go-testutil"
p2phost "gx/ipfs/QmRS46AyqtpJBsf1zmQdeizSDEzo1qkWR7rdEuPFAv8237/go-libp2p-host"
netutil "gx/ipfs/QmUUNDRYXgfqdjxTg79ogkciczU5y4WY1tKMU2vEX9CRN7/go-libp2p-netutil"
floodsub "gx/ipfs/QmVNv1WV6XxzQV4MBuiLX5729wMazaf8TNzm2Sq6ejyHh7/go-libp2p-floodsub"
ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
ci "gx/ipfs/QmaPbCnUMBohSGo3KnxEa2bHqyJVVeEEcwtqJAYxerieBo/go-libp2p-crypto"
bhost "gx/ipfs/Qmb37wDRoh9VZMZXmmZktN35szvj9GeBYDtA9giDmXwwd7/go-libp2p-blankhost"
)
func newNetHost(ctx context.Context, t *testing.T) p2phost.Host {
netw := netutil.GenSwarmNetwork(t, ctx)
return bhost.NewBlankHost(netw)
}
func newNetHosts(ctx context.Context, t *testing.T, n int) []p2phost.Host {
var out []p2phost.Host
for i := 0; i < n; i++ {
h := newNetHost(ctx, t)
out = append(out, h)
}
return out
}
// PubKeyFetcher implementation with a global key store
type mockKeyStore struct {
keys map[peer.ID]ci.PubKey
mx sync.Mutex
}
func (m *mockKeyStore) addPubKey(id peer.ID, pkey ci.PubKey) {
m.mx.Lock()
defer m.mx.Unlock()
m.keys[id] = pkey
}
func (m *mockKeyStore) getPubKey(id peer.ID) (ci.PubKey, error) {
m.mx.Lock()
defer m.mx.Unlock()
pkey, ok := m.keys[id]
if ok {
return pkey, nil
}
return nil, routing.ErrNotFound
}
func (m *mockKeyStore) GetPublicKey(ctx context.Context, id peer.ID) (ci.PubKey, error) {
return m.getPubKey(id)
}
func newMockKeyStore() *mockKeyStore {
return &mockKeyStore{
keys: make(map[peer.ID]ci.PubKey),
}
}
// ConentRouting mock
func newMockRouting(ms mockrouting.Server, ks *mockKeyStore, host p2phost.Host) routing.ContentRouting {
id := host.ID()
privk := host.Peerstore().PrivKey(id)
pubk := host.Peerstore().PubKey(id)
pi := host.Peerstore().PeerInfo(id)
ks.addPubKey(id, pubk)
return ms.Client(testutil.NewIdentity(id, pi.Addrs[0], privk, pubk))
}
func newMockRoutingForHosts(ms mockrouting.Server, ks *mockKeyStore, hosts []p2phost.Host) []routing.ContentRouting {
rs := make([]routing.ContentRouting, len(hosts))
for i := 0; i < len(hosts); i++ {
rs[i] = newMockRouting(ms, ks, hosts[i])
}
return rs
}
// tests
func TestPubsubPublishSubscribe(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ms := mockrouting.NewServer()
ks := newMockKeyStore()
pubhost := newNetHost(ctx, t)
pubmr := newMockRouting(ms, ks, pubhost)
pub := NewPubsubPublisher(ctx, pubhost, ds.NewMapDatastore(), pubmr, floodsub.NewFloodSub(ctx, pubhost))
privk := pubhost.Peerstore().PrivKey(pubhost.ID())
pubpinfo := pstore.PeerInfo{ID: pubhost.ID(), Addrs: pubhost.Addrs()}
name := "/ipns/" + pubhost.ID().Pretty()
reshosts := newNetHosts(ctx, t, 5)
resmrs := newMockRoutingForHosts(ms, ks, reshosts)
res := make([]*PubsubResolver, len(reshosts))
for i := 0; i < len(res); i++ {
res[i] = NewPubsubResolver(ctx, reshosts[i], resmrs[i], ks, floodsub.NewFloodSub(ctx, reshosts[i]))
if err := reshosts[i].Connect(ctx, pubpinfo); err != nil {
t.Fatal(err)
}
}
time.Sleep(time.Millisecond * 100)
for i := 0; i < len(res); i++ {
checkResolveNotFound(ctx, t, i, res[i], name)
// delay to avoid connection storms
time.Sleep(time.Millisecond * 100)
}
// let the bootstrap finish
time.Sleep(time.Second * 1)
val := path.Path("/ipfs/QmP1DfoUjiWH2ZBo1PBH6FupdBucbDepx3HpWmEY6JMUpY")
err := pub.Publish(ctx, privk, val)
if err != nil {
t.Fatal(err)
}
// let the flood propagate
time.Sleep(time.Second * 1)
for i := 0; i < len(res); i++ {
checkResolve(ctx, t, i, res[i], name, val)
}
val = path.Path("/ipfs/QmP1wMAqk6aZYRZirbaAwmrNeqFRgQrwBt3orUtvSa1UYD")
err = pub.Publish(ctx, privk, val)
if err != nil {
t.Fatal(err)
}
// let the flood propagate
time.Sleep(time.Second * 1)
for i := 0; i < len(res); i++ {
checkResolve(ctx, t, i, res[i], name, val)
}
// cancel subscriptions
for i := 0; i < len(res); i++ {
res[i].Cancel(name)
}
time.Sleep(time.Millisecond * 100)
nval := path.Path("/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr")
err = pub.Publish(ctx, privk, nval)
if err != nil {
t.Fatal(err)
}
// check we still have the old value in the resolver
time.Sleep(time.Second * 1)
for i := 0; i < len(res); i++ {
checkResolve(ctx, t, i, res[i], name, val)
}
}
func checkResolveNotFound(ctx context.Context, t *testing.T, i int, resolver Resolver, name string) {
_, err := resolver.Resolve(ctx, name)
if err != ErrResolveFailed {
t.Fatalf("[resolver %d] unexpected error: %s", i, err.Error())
}
}
func checkResolve(ctx context.Context, t *testing.T, i int, resolver Resolver, name string, val path.Path) {
xval, err := resolver.Resolve(ctx, name)
if err != nil {
t.Fatalf("[resolver %d] resolve failed: %s", i, err.Error())
}
if xval != val {
t.Fatalf("[resolver %d] unexpected value: %s %s", i, val, xval)
}
}

View File

@ -0,0 +1,80 @@
#!/bin/sh
test_description="Test IPNS pubsub"
. lib/test-lib.sh
# start iptb + wait for peering
NUM_NODES=5
test_expect_success 'init iptb' '
iptb init -n $NUM_NODES --bootstrap=none --port=0
'
startup_cluster $NUM_NODES --enable-namesys-pubsub
test_expect_success 'peer ids' '
PEERID_0=$(iptb get id 0)
'
test_expect_success 'check namesys pubsub state' '
echo enabled > expected &&
ipfsi 0 name pubsub state > state0 &&
ipfsi 1 name pubsub state > state1 &&
ipfsi 2 name pubsub state > state2 &&
test_cmp expected state0 &&
test_cmp expected state1 &&
test_cmp expected state2
'
test_expect_success 'subscribe nodes to the publisher topic' '
ipfsi 1 name resolve /ipns/$PEERID_0 &&
ipfsi 2 name resolve /ipns/$PEERID_0
'
test_expect_success 'check subscriptions' '
echo /ipns/$PEERID_0 > expected &&
ipfsi 1 name pubsub subs > subs1 &&
ipfsi 2 name pubsub subs > subs2 &&
test_cmp expected subs1 &&
test_cmp expected subs2
'
test_expect_success 'add an obect on publisher node' '
echo "ipns is super fun" > file &&
HASH_FILE=$(ipfsi 0 add -q file)
'
test_expect_success 'publish that object as an ipns entry' '
ipfsi 0 name publish $HASH_FILE
'
test_expect_success 'wait for the flood' '
sleep 1
'
test_expect_success 'resolve name in subscriber nodes' '
echo "/ipfs/$HASH_FILE" > expected &&
ipfsi 1 name resolve /ipns/$PEERID_0 > name1 &&
ipfsi 2 name resolve /ipns/$PEERID_0 > name2 &&
test_cmp expected name1 &&
test_cmp expected name2
'
test_expect_success 'cancel subscriptions to the publisher topic' '
ipfsi 1 name pubsub cancel /ipns/$PEERID_0 &&
ipfsi 2 name pubsub cancel /ipns/$PEERID_0
'
test_expect_success 'check subscriptions' '
rm -f expected && touch expected &&
ipfsi 1 name pubsub subs > subs1 &&
ipfsi 2 name pubsub subs > subs2 &&
test_cmp expected subs1 &&
test_cmp expected subs2
'
test_expect_success "shut down iptb" '
iptb stop
'
test_done