mirror of
https://github.com/ipfs/kubo.git
synced 2025-09-10 05:52:20 +08:00
Implement ipns republisher
This commit adds a very basic process that will periodically go through a list of given ids and republish the values for their ipns entries. License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
@ -14,6 +14,7 @@ import (
|
||||
dag "github.com/ipfs/go-ipfs/merkledag"
|
||||
pb "github.com/ipfs/go-ipfs/namesys/pb"
|
||||
ci "github.com/ipfs/go-ipfs/p2p/crypto"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
path "github.com/ipfs/go-ipfs/path"
|
||||
pin "github.com/ipfs/go-ipfs/pin"
|
||||
routing "github.com/ipfs/go-ipfs/routing"
|
||||
@ -30,6 +31,8 @@ var ErrExpiredRecord = errors.New("expired record")
|
||||
// unknown validity type.
|
||||
var ErrUnrecognizedValidity = errors.New("unrecognized validity type")
|
||||
|
||||
var PublishPutValTimeout = time.Minute
|
||||
|
||||
// ipnsPublisher is capable of publishing and resolving names to the IPFS
|
||||
// routing system.
|
||||
type ipnsPublisher struct {
|
||||
@ -37,7 +40,7 @@ type ipnsPublisher struct {
|
||||
}
|
||||
|
||||
// NewRoutingPublisher constructs a publisher for the IPFS Routing name system.
|
||||
func NewRoutingPublisher(route routing.IpfsRouting) Publisher {
|
||||
func NewRoutingPublisher(route routing.IpfsRouting) *ipnsPublisher {
|
||||
return &ipnsPublisher{routing: route}
|
||||
}
|
||||
|
||||
@ -45,16 +48,19 @@ func NewRoutingPublisher(route routing.IpfsRouting) Publisher {
|
||||
// and publishes it out to the routing system
|
||||
func (p *ipnsPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Path) error {
|
||||
log.Debugf("Publish %s", value)
|
||||
return p.PublishWithEOL(ctx, k, value, time.Now().Add(time.Hour*24))
|
||||
}
|
||||
|
||||
pubkey := k.GetPublic()
|
||||
pkbytes, err := pubkey.Bytes()
|
||||
// PublishWithEOL is a temporary stand in for the ipns records implementation
|
||||
// see here for more details: https://github.com/ipfs/specs/tree/master/records
|
||||
func (p *ipnsPublisher) PublishWithEOL(ctx context.Context, k ci.PrivKey, value path.Path, eol time.Time) error {
|
||||
|
||||
id, err := peer.IDFromPrivateKey(k)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
nameb := u.Hash(pkbytes)
|
||||
namekey := key.Key("/pk/" + string(nameb))
|
||||
ipnskey := key.Key("/ipns/" + string(nameb))
|
||||
namekey, ipnskey := IpnsKeysForID(id)
|
||||
|
||||
// get previous records sequence number, and add one to it
|
||||
var seqnum uint64
|
||||
@ -71,46 +77,75 @@ func (p *ipnsPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Pa
|
||||
return err
|
||||
}
|
||||
|
||||
data, err := createRoutingEntryData(k, value, seqnum)
|
||||
entry, err := CreateRoutingEntryData(k, value, seqnum, eol)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debugf("Storing pubkey at: %s", namekey)
|
||||
// Store associated public key
|
||||
timectx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second*10))
|
||||
defer cancel()
|
||||
err = p.routing.PutValue(timectx, namekey, pkbytes)
|
||||
err = PublishEntry(ctx, p.routing, ipnskey, entry)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debugf("Storing ipns entry at: %s", ipnskey)
|
||||
// Store ipns entry at "/ipns/"+b58(h(pubkey))
|
||||
timectx, cancel = context.WithDeadline(ctx, time.Now().Add(time.Second*10))
|
||||
defer cancel()
|
||||
if err := p.routing.PutValue(timectx, ipnskey, data); err != nil {
|
||||
err = PublishPublicKey(ctx, p.routing, namekey, k.GetPublic())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func createRoutingEntryData(pk ci.PrivKey, val path.Path, seq uint64) ([]byte, error) {
|
||||
func PublishPublicKey(ctx context.Context, r routing.IpfsRouting, k key.Key, pubk ci.PubKey) error {
|
||||
log.Debugf("Storing pubkey at: %s", k)
|
||||
pkbytes, err := pubk.Bytes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Store associated public key
|
||||
timectx, cancel := context.WithTimeout(ctx, PublishPutValTimeout)
|
||||
defer cancel()
|
||||
err = r.PutValue(timectx, k, pkbytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func PublishEntry(ctx context.Context, r routing.IpfsRouting, ipnskey key.Key, rec *pb.IpnsEntry) error {
|
||||
timectx, cancel := context.WithTimeout(ctx, PublishPutValTimeout)
|
||||
defer cancel()
|
||||
|
||||
data, err := proto.Marshal(rec)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debugf("Storing ipns entry at: %s", ipnskey)
|
||||
// Store ipns entry at "/ipns/"+b58(h(pubkey))
|
||||
if err := r.PutValue(timectx, ipnskey, data); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func CreateRoutingEntryData(pk ci.PrivKey, val path.Path, seq uint64, eol time.Time) (*pb.IpnsEntry, error) {
|
||||
entry := new(pb.IpnsEntry)
|
||||
|
||||
entry.Value = []byte(val)
|
||||
typ := pb.IpnsEntry_EOL
|
||||
entry.ValidityType = &typ
|
||||
entry.Sequence = proto.Uint64(seq)
|
||||
entry.Validity = []byte(u.FormatRFC3339(time.Now().Add(time.Hour * 24)))
|
||||
entry.Validity = []byte(u.FormatRFC3339(eol))
|
||||
|
||||
sig, err := pk.Sign(ipnsEntryDataForSig(entry))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
entry.Signature = sig
|
||||
return proto.Marshal(entry)
|
||||
return entry, nil
|
||||
}
|
||||
|
||||
func ipnsEntryDataForSig(e *pb.IpnsEntry) []byte {
|
||||
@ -226,3 +261,10 @@ func InitializeKeyspace(ctx context.Context, ds dag.DAGService, pub Publisher, p
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func IpnsKeysForID(id peer.ID) (name, ipns key.Key) {
|
||||
namekey := key.Key("/pk/" + id)
|
||||
ipnskey := key.Key("/ipns/" + id)
|
||||
|
||||
return namekey, ipnskey
|
||||
}
|
||||
|
142
namesys/republisher/repub.go
Normal file
142
namesys/republisher/repub.go
Normal file
@ -0,0 +1,142 @@
|
||||
package republisher
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
namesys "github.com/ipfs/go-ipfs/namesys"
|
||||
pb "github.com/ipfs/go-ipfs/namesys/pb"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
path "github.com/ipfs/go-ipfs/path"
|
||||
"github.com/ipfs/go-ipfs/routing"
|
||||
dhtpb "github.com/ipfs/go-ipfs/routing/dht/pb"
|
||||
|
||||
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
|
||||
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||
goprocess "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
|
||||
gpctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
logging "github.com/ipfs/go-ipfs/vendor/go-log-v1.0.0"
|
||||
)
|
||||
|
||||
var errNoEntry = errors.New("no previous entry")
|
||||
|
||||
var log = logging.Logger("ipns-repub")
|
||||
|
||||
var DefaultRebroadcastInterval = time.Hour * 4
|
||||
|
||||
const DefaultRecordLifetime = time.Hour * 24
|
||||
|
||||
type Republisher struct {
|
||||
r routing.IpfsRouting
|
||||
ds ds.Datastore
|
||||
ps peer.Peerstore
|
||||
|
||||
Interval time.Duration
|
||||
|
||||
// how long records that are republished should be valid for
|
||||
RecordLifetime time.Duration
|
||||
|
||||
entrylock sync.Mutex
|
||||
entries map[peer.ID]struct{}
|
||||
}
|
||||
|
||||
func NewRepublisher(r routing.IpfsRouting, ds ds.Datastore, ps peer.Peerstore) *Republisher {
|
||||
return &Republisher{
|
||||
r: r,
|
||||
ps: ps,
|
||||
ds: ds,
|
||||
entries: make(map[peer.ID]struct{}),
|
||||
Interval: DefaultRebroadcastInterval,
|
||||
RecordLifetime: DefaultRecordLifetime,
|
||||
}
|
||||
}
|
||||
|
||||
func (rp *Republisher) AddName(id peer.ID) {
|
||||
rp.entrylock.Lock()
|
||||
defer rp.entrylock.Unlock()
|
||||
rp.entries[id] = struct{}{}
|
||||
}
|
||||
|
||||
func (rp *Republisher) Run(proc goprocess.Process) {
|
||||
tick := time.NewTicker(rp.Interval)
|
||||
defer tick.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-tick.C:
|
||||
err := rp.republishEntries(proc)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
case <-proc.Closing():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rp *Republisher) republishEntries(p goprocess.Process) error {
|
||||
ctx, cancel := context.WithCancel(gpctx.OnClosingContext(p))
|
||||
defer cancel()
|
||||
|
||||
for id, _ := range rp.entries {
|
||||
log.Debugf("republishing ipns entry for %s", id)
|
||||
priv := rp.ps.PrivKey(id)
|
||||
|
||||
// Look for it locally only
|
||||
namekey, ipnskey := namesys.IpnsKeysForID(id)
|
||||
p, seq, err := rp.getLastVal(ipnskey)
|
||||
if err != nil {
|
||||
if err == errNoEntry {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// update record with same sequence number
|
||||
eol := time.Now().Add(rp.RecordLifetime)
|
||||
entry, err := namesys.CreateRoutingEntryData(priv, p, seq, eol)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// republish public key
|
||||
err = namesys.PublishPublicKey(ctx, rp.r, namekey, priv.GetPublic())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// republish ipns entry
|
||||
err = namesys.PublishEntry(ctx, rp.r, ipnskey, entry)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rp *Republisher) getLastVal(k key.Key) (path.Path, uint64, error) {
|
||||
ival, err := rp.ds.Get(k.DsKey())
|
||||
if err != nil {
|
||||
// not found means we dont have a previously published entry
|
||||
return "", 0, errNoEntry
|
||||
}
|
||||
|
||||
val := ival.([]byte)
|
||||
dhtrec := new(dhtpb.Record)
|
||||
err = proto.Unmarshal(val, dhtrec)
|
||||
if err != nil {
|
||||
return "", 0, err
|
||||
}
|
||||
|
||||
// extract published data from record
|
||||
e := new(pb.IpnsEntry)
|
||||
err = proto.Unmarshal(dhtrec.GetValue(), e)
|
||||
if err != nil {
|
||||
return "", 0, err
|
||||
}
|
||||
return path.Path(e.Value), e.GetSequence(), nil
|
||||
}
|
120
namesys/republisher/repub_test.go
Normal file
120
namesys/republisher/repub_test.go
Normal file
@ -0,0 +1,120 @@
|
||||
package republisher_test
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
goprocess "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
|
||||
"github.com/ipfs/go-ipfs/core"
|
||||
mock "github.com/ipfs/go-ipfs/core/mock"
|
||||
namesys "github.com/ipfs/go-ipfs/namesys"
|
||||
. "github.com/ipfs/go-ipfs/namesys/republisher"
|
||||
mocknet "github.com/ipfs/go-ipfs/p2p/net/mock"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
path "github.com/ipfs/go-ipfs/path"
|
||||
)
|
||||
|
||||
func TestRepublish(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// create network
|
||||
mn := mocknet.New(ctx)
|
||||
|
||||
var nodes []*core.IpfsNode
|
||||
for i := 0; i < 10; i++ {
|
||||
nd, err := core.NewNode(ctx, &core.BuildCfg{
|
||||
Online: true,
|
||||
Host: mock.MockHostOption(mn),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
nodes = append(nodes, nd)
|
||||
}
|
||||
|
||||
mn.LinkAll()
|
||||
|
||||
bsinf := core.BootstrapConfigWithPeers(
|
||||
[]peer.PeerInfo{
|
||||
nodes[0].Peerstore.PeerInfo(nodes[0].Identity),
|
||||
},
|
||||
)
|
||||
|
||||
for _, n := range nodes[1:] {
|
||||
if err := n.Bootstrap(bsinf); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// have one node publish a record that is valid for 1 second
|
||||
publisher := nodes[3]
|
||||
p := path.FromString("/ipfs/QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn") // does not need to be valid
|
||||
rp := namesys.NewRoutingPublisher(publisher.Routing)
|
||||
err := rp.PublishWithEOL(ctx, publisher.PrivateKey, p, time.Now().Add(time.Second))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
name := "/ipns/" + publisher.Identity.Pretty()
|
||||
if err := verifyResolution(nodes, name, p); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Now wait a second, the records will be invalid and we should fail to resolve
|
||||
time.Sleep(time.Second)
|
||||
if err := verifyResolutionFails(nodes, name); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// The republishers that are contained within the nodes have their timeout set
|
||||
// to 12 hours. Instead of trying to tweak those, we're just going to pretend
|
||||
// they dont exist and make our own.
|
||||
repub := NewRepublisher(publisher.Routing, publisher.Repo.Datastore(), publisher.Peerstore)
|
||||
repub.Interval = time.Second
|
||||
repub.RecordLifetime = time.Second * 5
|
||||
repub.AddName(publisher.Identity)
|
||||
|
||||
proc := goprocess.Go(repub.Run)
|
||||
defer proc.Close()
|
||||
|
||||
// now wait a couple seconds for it to fire
|
||||
time.Sleep(time.Second * 2)
|
||||
|
||||
// we should be able to resolve them now
|
||||
if err := verifyResolution(nodes, name, p); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func verifyResolution(nodes []*core.IpfsNode, key string, exp path.Path) error {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
for _, n := range nodes {
|
||||
val, err := n.Namesys.Resolve(ctx, key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if val != exp {
|
||||
return errors.New("resolved wrong record")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func verifyResolutionFails(nodes []*core.IpfsNode, key string) error {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
for _, n := range nodes {
|
||||
_, err := n.Namesys.Resolve(ctx, key)
|
||||
if err == nil {
|
||||
return errors.New("expected resolution to fail")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
Reference in New Issue
Block a user