1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-28 00:39:31 +08:00

Merge pull request #4113 from ipfs/feat/reprovider-starts

Reprovider strategies
This commit is contained in:
Jeromy Johnson
2017-08-15 17:13:23 -07:00
committed by GitHub
10 changed files with 393 additions and 41 deletions

View File

@ -231,5 +231,11 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
}
n.Resolver = path.NewBasicResolver(n.DAG)
if cfg.Online {
if err := n.startLateOnlineServices(ctx); err != nil {
return err
}
}
return n.loadFilesRoot()
}

View File

@ -21,10 +21,11 @@ var BitswapCmd = &cmds.Command{
ShortDescription: ``,
},
Subcommands: map[string]*cmds.Command{
"wantlist": showWantlistCmd,
"stat": bitswapStatCmd,
"unwant": unwantCmd,
"ledger": ledgerCmd,
"wantlist": showWantlistCmd,
"stat": bitswapStatCmd,
"unwant": unwantCmd,
"ledger": ledgerCmd,
"reprovide": reprovideCmd,
},
}
@ -242,3 +243,30 @@ prints the ledger associated with a given peer.
},
},
}
var reprovideCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Trigger reprovider.",
ShortDescription: `
Trigger reprovider to announce our data to network.
`,
},
Run: func(req cmds.Request, res cmds.Response) {
nd, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
if !nd.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
return
}
err = nd.Reprovider.Trigger(req.Context())
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
},
}

View File

@ -237,22 +237,6 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
return err
}
n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore)
if cfg.Reprovider.Interval != "0" {
interval := kReprovideFrequency
if cfg.Reprovider.Interval != "" {
dur, err := time.ParseDuration(cfg.Reprovider.Interval)
if err != nil {
return err
}
interval = dur
}
go n.Reprovider.ProvideEvery(ctx, interval)
}
if pubsub {
n.Floodsub = floodsub.NewFloodSub(ctx, peerhost)
}
@ -273,6 +257,45 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
return n.Bootstrap(DefaultBootstrapConfig)
}
func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error {
cfg, err := n.Repo.Config()
if err != nil {
return err
}
var keyProvider rp.KeyChanFunc
switch cfg.Reprovider.Strategy {
case "all":
fallthrough
case "":
keyProvider = rp.NewBlockstoreProvider(n.Blockstore)
case "roots":
keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, true)
case "pinned":
keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, false)
default:
return fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy)
}
n.Reprovider = rp.NewReprovider(ctx, n.Routing, keyProvider)
if cfg.Reprovider.Interval != "0" {
interval := kReprovideFrequency
if cfg.Reprovider.Interval != "" {
dur, err := time.ParseDuration(cfg.Reprovider.Interval)
if err != nil {
return err
}
interval = dur
}
go n.Reprovider.ProvideEvery(interval)
}
return nil
}
func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) {
var annAddrs []ma.Multiaddr
for _, addr := range cfg.Announce {

View File

@ -15,7 +15,7 @@ a running daemon do not read the config file at runtime.
- [`Identity`](#identity)
- [`Ipns`](#ipns)
- [`Mounts`](#mounts)
- [`ReproviderInterval`](#reproviderinterval)
- [`Reprovider`](#reprovider)
- [`SupernodeRouting`](#supernoderouting)
- [`Swarm`](#swarm)
@ -192,7 +192,9 @@ Mountpoint for `/ipns/`.
- `FuseAllowOther`
Sets the FUSE allow other option on the mountpoint.
## `ReproviderInterval`
## `Reprovider`
- `Interval`
Sets the time between rounds of reproviding local content to the routing
system. If unset, it defaults to 12 hours. If set to the value `"0"` it will
disable content reproviding.
@ -202,6 +204,12 @@ not being able to discover that you have the objects that you have. If you want
to have this disabled and keep the network aware of what you have, you must
manually announce your content periodically.
- `Strategy`
Tells reprovider what should be announced. Valid strategies are:
- "all" (default) - announce all stored data
- "pinned" - only announce pinned data
- "roots" - only announce directly pinned keys and root keys of recursive pins
## `SupernodeRouting`
Deprecated.

View File

@ -0,0 +1,93 @@
package reprovide
import (
"context"
blocks "github.com/ipfs/go-ipfs/blocks/blockstore"
merkledag "github.com/ipfs/go-ipfs/merkledag"
pin "github.com/ipfs/go-ipfs/pin"
cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid"
)
// NewBlockstoreProvider returns key provider using bstore.AllKeysChan
func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc {
return func(ctx context.Context) (<-chan *cid.Cid, error) {
return bstore.AllKeysChan(ctx)
}
}
// NewPinnedProvider returns provider supplying pinned keys
func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) KeyChanFunc {
return func(ctx context.Context) (<-chan *cid.Cid, error) {
set, err := pinSet(ctx, pinning, dag, onlyRoots)
if err != nil {
return nil, err
}
outCh := make(chan *cid.Cid)
go func() {
defer close(outCh)
for c := range set.new {
select {
case <-ctx.Done():
return
case outCh <- c:
}
}
}()
return outCh, nil
}
}
func pinSet(ctx context.Context, pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) (*streamingSet, error) {
set := newStreamingSet()
go func() {
defer close(set.new)
for _, key := range pinning.DirectKeys() {
set.add(key)
}
for _, key := range pinning.RecursiveKeys() {
set.add(key)
if !onlyRoots {
err := merkledag.EnumerateChildren(ctx, dag.GetLinks, key, set.add)
if err != nil {
log.Errorf("reprovide indirect pins: %s", err)
return
}
}
}
}()
return set, nil
}
type streamingSet struct {
set *cid.Set
new chan *cid.Cid
}
// NewSet initializes and returns a new Set.
func newStreamingSet() *streamingSet {
return &streamingSet{
set: cid.NewSet(),
new: make(chan *cid.Cid),
}
}
// add adds a Cid to the set only if it is
// not in it already.
func (s *streamingSet) add(c *cid.Cid) bool {
if s.set.Visit(c) {
s.new <- c
return true
}
return false
}

View File

@ -5,56 +5,82 @@ import (
"fmt"
"time"
blocks "github.com/ipfs/go-ipfs/blocks/blockstore"
backoff "gx/ipfs/QmPJUtEJsm5YLUWhF6imvyCH8KZXRJa9Wup7FDMwTy5Ufz/backoff"
routing "gx/ipfs/QmPjTrrSfE6TzLv6ya6VWhGcCgPrUAdcgrDcQyRDX2VyW1/go-libp2p-routing"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid"
)
var log = logging.Logger("reprovider")
//KeyChanFunc is function streaming CIDs to pass to content routing
type KeyChanFunc func(context.Context) (<-chan *cid.Cid, error)
type doneFunc func(error)
type Reprovider struct {
ctx context.Context
trigger chan doneFunc
// The routing system to provide values through
rsys routing.ContentRouting
// The backing store for blocks to be provided
bstore blocks.Blockstore
keyProvider KeyChanFunc
}
func NewReprovider(rsys routing.ContentRouting, bstore blocks.Blockstore) *Reprovider {
// NewReprovider creates new Reprovider instance.
func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider {
return &Reprovider{
rsys: rsys,
bstore: bstore,
ctx: ctx,
trigger: make(chan doneFunc),
rsys: rsys,
keyProvider: keyProvider,
}
}
func (rp *Reprovider) ProvideEvery(ctx context.Context, tick time.Duration) {
// ProvideEvery re-provides keys with 'tick' interval
func (rp *Reprovider) ProvideEvery(tick time.Duration) {
// dont reprovide immediately.
// may have just started the daemon and shutting it down immediately.
// probability( up another minute | uptime ) increases with uptime.
after := time.After(time.Minute)
var done doneFunc
for {
select {
case <-ctx.Done():
case <-rp.ctx.Done():
return
case done = <-rp.trigger:
case <-after:
err := rp.Reprovide(ctx)
if err != nil {
log.Debug(err)
}
after = time.After(tick)
}
//'mute' the trigger channel so when `ipfs bitswap reprovide` is called
//a 'reprovider is already running' error is returned
unmute := rp.muteTrigger()
err := rp.Reprovide()
if err != nil {
log.Debug(err)
}
if done != nil {
done(err)
}
unmute()
after = time.After(tick)
}
}
func (rp *Reprovider) Reprovide(ctx context.Context) error {
keychan, err := rp.bstore.AllKeysChan(ctx)
// Reprovide registers all keys given by rp.keyProvider to libp2p content routing
func (rp *Reprovider) Reprovide() error {
keychan, err := rp.keyProvider(rp.ctx)
if err != nil {
return fmt.Errorf("Failed to get key chan from blockstore: %s", err)
return fmt.Errorf("Failed to get key chan: %s", err)
}
for c := range keychan {
op := func() error {
err := rp.rsys.Provide(ctx, c, true)
err := rp.rsys.Provide(rp.ctx, c, true)
if err != nil {
log.Debugf("Failed to provide key: %s", err)
}
@ -71,3 +97,41 @@ func (rp *Reprovider) Reprovide(ctx context.Context) error {
}
return nil
}
// Trigger starts reprovision process in rp.ProvideEvery and waits for it
func (rp *Reprovider) Trigger(ctx context.Context) error {
progressCtx, done := context.WithCancel(ctx)
var err error
df := func(e error) {
err = e
done()
}
select {
case <-rp.ctx.Done():
return context.Canceled
case <-ctx.Done():
return context.Canceled
case rp.trigger <- df:
<-progressCtx.Done()
return err
}
}
func (rp *Reprovider) muteTrigger() context.CancelFunc {
ctx, cf := context.WithCancel(rp.ctx)
go func() {
defer cf()
for {
select {
case <-ctx.Done():
return
case done := <-rp.trigger:
done(fmt.Errorf("reprovider is already running"))
}
}
}()
return cf
}

View File

@ -32,8 +32,9 @@ func TestReprovide(t *testing.T) {
blk := blocks.NewBlock([]byte("this is a test"))
bstore.Put(blk)
reprov := NewReprovider(clA, bstore)
err := reprov.Reprovide(ctx)
keyProvider := NewBlockstoreProvider(bstore)
reprov := NewReprovider(ctx, clA, keyProvider)
err := reprov.Reprovide()
if err != nil {
t.Fatal(err)
}

View File

@ -72,6 +72,7 @@ func Init(out io.Writer, nBitsForKeypair int) (*Config, error) {
},
Reprovider: Reprovider{
Interval: "12h",
Strategy: "all",
},
}

View File

@ -2,4 +2,5 @@ package config
type Reprovider struct {
Interval string // Time period to reprovide locally stored objects to the network
Strategy string // Which keys to announce
}

127
test/sharness/t0175-reprovider.sh Executable file
View File

@ -0,0 +1,127 @@
#!/bin/sh
test_description="Test reprovider"
. lib/test-lib.sh
init_strategy() {
NUM_NODES=6
test_expect_success 'init iptb' '
iptb init -f -n $NUM_NODES --bootstrap=none --port=0
'
test_expect_success 'peer ids' '
PEERID_0=$(iptb get id 0) &&
PEERID_1=$(iptb get id 1)
'
test_expect_success 'use pinning startegy for reprovider' '
ipfsi 0 config Reprovider.Strategy '$1'
'
startup_cluster 6 --debug
}
findprovs_empty() {
test_expect_success 'findprovs '$1' succeeds' '
ipfsi 1 dht findprovs -n 1 '$1' > findprovsOut
'
test_expect_success "findprovs $1 output is empty" '
test_must_be_empty findprovsOut
'
}
findprovs_expect() {
test_expect_success 'findprovs '$1' succeeds' '
ipfsi 1 dht findprovs -n 1 '$1' > findprovsOut &&
echo '$2' > expected
'
test_expect_success "findprovs $1 output looks good" '
test_cmp findprovsOut expected
'
}
reprovide() {
test_expect_success 'reprovide' '
# TODO: this hangs, though only after reprovision was done
ipfsi 0 bitswap reprovide
'
}
test_expect_success 'stop peer 1' '
iptb stop 1
'
# Test 'all' strategy
init_strategy 'all'
test_expect_success 'add test object' '
HASH_0=$(echo "foo" | ipfsi 0 add -q --local)
'
findprovs_empty '$HASH_0'
reprovide
findprovs_expect '$HASH_0' '$PEERID_0'
# Test 'pinned' strategy
init_strategy 'pinned'
test_expect_success 'prepare test files' '
echo foo > f1 &&
echo bar > f2
'
test_expect_success 'add test objects' '
HASH_FOO=$(ipfsi 0 add -q --local --pin=false f1) &&
HASH_BAR=$(ipfsi 0 add -q --local --pin=false f2) &&
HASH_BAR_DIR=$(ipfsi 0 add -q --local -w f2)
'
findprovs_empty '$HASH_FOO'
findprovs_empty '$HASH_BAR'
findprovs_empty '$HASH_BAR_DIR'
reprovide
findprovs_empty '$HASH_FOO'
findprovs_expect '$HASH_BAR' '$PEERID_0'
findprovs_expect '$HASH_BAR_DIR' '$PEERID_0'
test_expect_success 'stop peer 1' '
iptb stop 1
'
# Test 'roots' strategy
init_strategy 'roots'
test_expect_success 'prepare test files' '
echo foo > f1 &&
echo bar > f2 &&
echo baz > f3
'
test_expect_success 'add test objects' '
HASH_FOO=$(ipfsi 0 add -q --local --pin=false f1) &&
HASH_BAR=$(ipfsi 0 add -q --local --pin=false f2) &&
HASH_BAZ=$(ipfsi 0 add -q --local f3) &&
HASH_BAR_DIR=$(ipfsi 0 add -q --local -w f2 | tail -1)
'
findprovs_empty '$HASH_FOO'
findprovs_empty '$HASH_BAR'
findprovs_empty '$HASH_BAR_DIR'
reprovide
findprovs_empty '$HASH_FOO'
findprovs_empty '$HASH_BAR'
findprovs_expect '$HASH_BAZ' '$PEERID_0'
findprovs_expect '$HASH_BAR_DIR' '$PEERID_0'
test_expect_success 'stop peer 1' '
iptb stop 1
'
test_done