mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-30 18:13:54 +08:00
Merge pull request #3951 from ipfs/kevina/repub
Make sure non-self keys also get republished
This commit is contained in:
@ -370,8 +370,7 @@ func (n *IpfsNode) setupIpnsRepublisher() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
n.IpnsRepub = ipnsrp.NewRepublisher(n.Routing, n.Repo.Datastore(), n.Peerstore)
|
n.IpnsRepub = ipnsrp.NewRepublisher(n.Routing, n.Repo.Datastore(), n.PrivateKey, n.Repo.Keystore())
|
||||||
n.IpnsRepub.AddName(n.Identity)
|
|
||||||
|
|
||||||
if cfg.Ipns.RepublishPeriod != "" {
|
if cfg.Ipns.RepublishPeriod != "" {
|
||||||
d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod)
|
d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod)
|
||||||
|
@ -3,21 +3,21 @@ package republisher
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
keystore "github.com/ipfs/go-ipfs/keystore"
|
||||||
namesys "github.com/ipfs/go-ipfs/namesys"
|
namesys "github.com/ipfs/go-ipfs/namesys"
|
||||||
pb "github.com/ipfs/go-ipfs/namesys/pb"
|
pb "github.com/ipfs/go-ipfs/namesys/pb"
|
||||||
path "github.com/ipfs/go-ipfs/path"
|
path "github.com/ipfs/go-ipfs/path"
|
||||||
dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help"
|
dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help"
|
||||||
|
|
||||||
routing "gx/ipfs/QmNdaQ8itUU9jEZUwTsG4gHMaPmRfi6FEe89QjQAFbep3M/go-libp2p-routing"
|
routing "gx/ipfs/QmNdaQ8itUU9jEZUwTsG4gHMaPmRfi6FEe89QjQAFbep3M/go-libp2p-routing"
|
||||||
|
ic "gx/ipfs/QmP1DfoUjiWH2ZBo1PBH6FupdBucbDepx3HpWmEY6JMUpY/go-libp2p-crypto"
|
||||||
ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
|
ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
|
||||||
goprocess "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
|
goprocess "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
|
||||||
gpctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
|
gpctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
|
||||||
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
|
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
|
||||||
recpb "gx/ipfs/QmWYCqr6UDqqD1bfRybaAPtbAqcN3TSJpveaBXMwbQ3ePZ/go-libp2p-record/pb"
|
recpb "gx/ipfs/QmWYCqr6UDqqD1bfRybaAPtbAqcN3TSJpveaBXMwbQ3ePZ/go-libp2p-record/pb"
|
||||||
pstore "gx/ipfs/QmXZSd1qR5BxZkPyuwfT5jpqQFScZccoZvDneXsKzCNHWX/go-libp2p-peerstore"
|
|
||||||
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
|
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
|
||||||
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
|
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
|
||||||
)
|
)
|
||||||
@ -31,36 +31,29 @@ var DefaultRebroadcastInterval = time.Hour * 4
|
|||||||
const DefaultRecordLifetime = time.Hour * 24
|
const DefaultRecordLifetime = time.Hour * 24
|
||||||
|
|
||||||
type Republisher struct {
|
type Republisher struct {
|
||||||
r routing.ValueStore
|
r routing.ValueStore
|
||||||
ds ds.Datastore
|
ds ds.Datastore
|
||||||
ps pstore.Peerstore
|
self ic.PrivKey
|
||||||
|
ks keystore.Keystore
|
||||||
|
|
||||||
Interval time.Duration
|
Interval time.Duration
|
||||||
|
|
||||||
// how long records that are republished should be valid for
|
// how long records that are republished should be valid for
|
||||||
RecordLifetime time.Duration
|
RecordLifetime time.Duration
|
||||||
|
|
||||||
entrylock sync.Mutex
|
|
||||||
entries map[peer.ID]struct{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRepublisher(r routing.ValueStore, ds ds.Datastore, ps pstore.Peerstore) *Republisher {
|
// NewRepublisher creates a new Republisher
|
||||||
|
func NewRepublisher(r routing.ValueStore, ds ds.Datastore, self ic.PrivKey, ks keystore.Keystore) *Republisher {
|
||||||
return &Republisher{
|
return &Republisher{
|
||||||
r: r,
|
r: r,
|
||||||
ps: ps,
|
|
||||||
ds: ds,
|
ds: ds,
|
||||||
entries: make(map[peer.ID]struct{}),
|
self: self,
|
||||||
|
ks: ks,
|
||||||
Interval: DefaultRebroadcastInterval,
|
Interval: DefaultRebroadcastInterval,
|
||||||
RecordLifetime: DefaultRecordLifetime,
|
RecordLifetime: DefaultRecordLifetime,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *Republisher) AddName(id peer.ID) {
|
|
||||||
rp.entrylock.Lock()
|
|
||||||
defer rp.entrylock.Unlock()
|
|
||||||
rp.entries[id] = struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rp *Republisher) Run(proc goprocess.Process) {
|
func (rp *Republisher) Run(proc goprocess.Process) {
|
||||||
tick := time.NewTicker(rp.Interval)
|
tick := time.NewTicker(rp.Interval)
|
||||||
defer tick.Stop()
|
defer tick.Stop()
|
||||||
@ -82,31 +75,61 @@ func (rp *Republisher) republishEntries(p goprocess.Process) error {
|
|||||||
ctx, cancel := context.WithCancel(gpctx.OnClosingContext(p))
|
ctx, cancel := context.WithCancel(gpctx.OnClosingContext(p))
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
for id, _ := range rp.entries {
|
err := rp.republishEntry(ctx, rp.self)
|
||||||
log.Debugf("republishing ipns entry for %s", id)
|
if err != nil {
|
||||||
priv := rp.ps.PrivKey(id)
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Look for it locally only
|
if rp.ks != nil {
|
||||||
_, ipnskey := namesys.IpnsKeysForID(id)
|
keyNames, err := rp.ks.List()
|
||||||
p, seq, err := rp.getLastVal(ipnskey)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == errNoEntry {
|
return err
|
||||||
continue
|
}
|
||||||
|
for _, name := range keyNames {
|
||||||
|
priv, err := rp.ks.Get(name)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = rp.republishEntry(ctx, priv)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// update record with same sequence number
|
|
||||||
eol := time.Now().Add(rp.RecordLifetime)
|
|
||||||
err = namesys.PutRecordToRouting(ctx, priv, p, seq, eol, rp.r, id)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (rp *Republisher) republishEntry(ctx context.Context, priv ic.PrivKey) error {
|
||||||
|
id, err := peer.IDFromPrivateKey(priv)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("republishing ipns entry for %s", id)
|
||||||
|
|
||||||
|
// Look for it locally only
|
||||||
|
_, ipnskey := namesys.IpnsKeysForID(id)
|
||||||
|
p, seq, err := rp.getLastVal(ipnskey)
|
||||||
|
if err != nil {
|
||||||
|
if err == errNoEntry {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// update record with same sequence number
|
||||||
|
eol := time.Now().Add(rp.RecordLifetime)
|
||||||
|
err = namesys.PutRecordToRouting(ctx, priv, p, seq, eol, rp.r, id)
|
||||||
|
if err != nil {
|
||||||
|
println("put record to routing error: " + err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (rp *Republisher) getLastVal(k string) (path.Path, uint64, error) {
|
func (rp *Republisher) getLastVal(k string) (path.Path, uint64, error) {
|
||||||
ival, err := rp.ds.Get(dshelp.NewKeyFromBinary([]byte(k)))
|
ival, err := rp.ds.Get(dshelp.NewKeyFromBinary([]byte(k)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -78,10 +78,9 @@ func TestRepublish(t *testing.T) {
|
|||||||
// The republishers that are contained within the nodes have their timeout set
|
// The republishers that are contained within the nodes have their timeout set
|
||||||
// to 12 hours. Instead of trying to tweak those, we're just going to pretend
|
// to 12 hours. Instead of trying to tweak those, we're just going to pretend
|
||||||
// they dont exist and make our own.
|
// they dont exist and make our own.
|
||||||
repub := NewRepublisher(publisher.Routing, publisher.Repo.Datastore(), publisher.Peerstore)
|
repub := NewRepublisher(publisher.Routing, publisher.Repo.Datastore(), publisher.PrivateKey, publisher.Repo.Keystore())
|
||||||
repub.Interval = time.Second
|
repub.Interval = time.Second
|
||||||
repub.RecordLifetime = time.Second * 5
|
repub.RecordLifetime = time.Second * 5
|
||||||
repub.AddName(publisher.Identity)
|
|
||||||
|
|
||||||
proc := goprocess.Go(repub.Run)
|
proc := goprocess.Go(repub.Run)
|
||||||
defer proc.Close()
|
defer proc.Close()
|
||||||
|
@ -94,6 +94,29 @@ go-sleep 15s
|
|||||||
|
|
||||||
verify_can_resolve "$num_test_nodes" "$id" "$HASH" "republisher fires after twenty seconds"
|
verify_can_resolve "$num_test_nodes" "$id" "$HASH" "republisher fires after twenty seconds"
|
||||||
|
|
||||||
|
#
|
||||||
|
|
||||||
|
test_expect_success "generate new key" '
|
||||||
|
KEY2=`ipfsi 1 key gen beepboop --type ed25519`
|
||||||
|
'
|
||||||
|
|
||||||
|
test_expect_success "publish with new key succeeds" '
|
||||||
|
HASH=$(echo "barfoo" | ipfsi 1 add -q) &&
|
||||||
|
ipfsi 1 name publish -t 5s -k "$KEY2" $HASH
|
||||||
|
'
|
||||||
|
|
||||||
|
verify_can_resolve "$num_test_nodes" "$KEY2" "$HASH" "new key just after publishing"
|
||||||
|
|
||||||
|
go-sleep 5s
|
||||||
|
|
||||||
|
verify_cannot_resolve "$num_test_nodes" "$KEY2" "new key cannot resolve after 5 seconds"
|
||||||
|
|
||||||
|
go-sleep 15s
|
||||||
|
|
||||||
|
verify_can_resolve "$num_test_nodes" "$KEY2" "$HASH" "new key can resolve again after republish"
|
||||||
|
|
||||||
|
#
|
||||||
|
|
||||||
teardown_iptb
|
teardown_iptb
|
||||||
|
|
||||||
test_done
|
test_done
|
||||||
|
Reference in New Issue
Block a user