1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-29 01:12:24 +08:00

Merge pull request #5200 from ipfs/fix/5197

fix dht commands when pubsub routing is enabled
This commit is contained in:
Whyrusleeping
2018-07-09 01:41:31 -07:00
committed by GitHub
5 changed files with 139 additions and 118 deletions

View File

@ -13,7 +13,6 @@ import (
dag "github.com/ipfs/go-ipfs/merkledag" dag "github.com/ipfs/go-ipfs/merkledag"
path "github.com/ipfs/go-ipfs/path" path "github.com/ipfs/go-ipfs/path"
ipdht "gx/ipfs/QmNg6M98bwS97SL9ArvrRxKujFps3eV6XvmKgduiYga8Bn/go-libp2p-kad-dht"
routing "gx/ipfs/QmPpdpS9fknTBM3qHDcpayU6nYPZQeVjia2fbNrD8YWDe6/go-libp2p-routing" routing "gx/ipfs/QmPpdpS9fknTBM3qHDcpayU6nYPZQeVjia2fbNrD8YWDe6/go-libp2p-routing"
notif "gx/ipfs/QmPpdpS9fknTBM3qHDcpayU6nYPZQeVjia2fbNrD8YWDe6/go-libp2p-routing/notifications" notif "gx/ipfs/QmPpdpS9fknTBM3qHDcpayU6nYPZQeVjia2fbNrD8YWDe6/go-libp2p-routing/notifications"
b58 "gx/ipfs/QmWFAMPqsEyUX7gDUsRVmMWz59FxSpJ1b2v6bJ1yYzo7jY/go-base58-fast/base58" b58 "gx/ipfs/QmWFAMPqsEyUX7gDUsRVmMWz59FxSpJ1b2v6bJ1yYzo7jY/go-base58-fast/base58"
@ -26,6 +25,9 @@ import (
var ErrNotDHT = errors.New("routing service is not a DHT") var ErrNotDHT = errors.New("routing service is not a DHT")
// TODO: Factor into `ipfs dht` and `ipfs routing`.
// Everything *except `query` goes into `ipfs routing`.
var DhtCmd = &cmds.Command{ var DhtCmd = &cmds.Command{
Helptext: cmdkit.HelpText{ Helptext: cmdkit.HelpText{
Tagline: "Issue commands directly through the DHT.", Tagline: "Issue commands directly through the DHT.",
@ -61,8 +63,7 @@ var queryDhtCmd = &cmds.Command{
return return
} }
dht, ok := n.Routing.(*ipdht.IpfsDHT) if n.DHT == nil {
if !ok {
res.SetError(ErrNotDHT, cmdkit.ErrNormal) res.SetError(ErrNotDHT, cmdkit.ErrNormal)
return return
} }
@ -76,7 +77,7 @@ var queryDhtCmd = &cmds.Command{
return return
} }
closestPeers, err := dht.GetClosestPeers(ctx, string(id)) closestPeers, err := n.DHT.GetClosestPeers(ctx, string(id))
if err != nil { if err != nil {
res.SetError(err, cmdkit.ErrNormal) res.SetError(err, cmdkit.ErrNormal)
return return
@ -140,7 +141,7 @@ var queryDhtCmd = &cmds.Command{
var findProvidersDhtCmd = &cmds.Command{ var findProvidersDhtCmd = &cmds.Command{
Helptext: cmdkit.HelpText{ Helptext: cmdkit.HelpText{
Tagline: "Find peers in the DHT that can provide a specific value, given a key.", Tagline: "Find peers that can provide a specific value, given a key.",
ShortDescription: "Outputs a list of newline-delimited provider Peer IDs.", ShortDescription: "Outputs a list of newline-delimited provider Peer IDs.",
}, },
@ -158,9 +159,8 @@ var findProvidersDhtCmd = &cmds.Command{
return return
} }
dht, ok := n.Routing.(*ipdht.IpfsDHT) if n.Routing == nil {
if !ok { res.SetError(errNotOnline, cmdkit.ErrNormal)
res.SetError(ErrNotDHT, cmdkit.ErrNormal)
return return
} }
@ -186,7 +186,7 @@ var findProvidersDhtCmd = &cmds.Command{
outChan := make(chan interface{}) outChan := make(chan interface{})
res.SetOutput((<-chan interface{})(outChan)) res.SetOutput((<-chan interface{})(outChan))
pchan := dht.FindProvidersAsync(ctx, c, numProviders) pchan := n.Routing.FindProvidersAsync(ctx, c, numProviders)
go func() { go func() {
defer close(outChan) defer close(outChan)
for e := range events { for e := range events {
@ -406,7 +406,7 @@ func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv ipld.DAGSe
var findPeerDhtCmd = &cmds.Command{ var findPeerDhtCmd = &cmds.Command{
Helptext: cmdkit.HelpText{ Helptext: cmdkit.HelpText{
Tagline: "Query the DHT for all of the multiaddresses associated with a Peer ID.", Tagline: "Find the multiaddresses associated with a Peer ID.",
ShortDescription: "Outputs a list of newline-delimited multiaddresses.", ShortDescription: "Outputs a list of newline-delimited multiaddresses.",
}, },
@ -423,9 +423,8 @@ var findPeerDhtCmd = &cmds.Command{
return return
} }
dht, ok := n.Routing.(*ipdht.IpfsDHT) if n.Routing == nil {
if !ok { res.SetError(errNotOnline, cmdkit.ErrNormal)
res.SetError(ErrNotDHT, cmdkit.ErrNormal)
return return
} }
@ -454,7 +453,7 @@ var findPeerDhtCmd = &cmds.Command{
go func() { go func() {
defer close(events) defer close(events)
pi, err := dht.FindPeer(ctx, pid) pi, err := n.Routing.FindPeer(ctx, pid)
if err != nil { if err != nil {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{ notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.QueryError, Type: notif.QueryError,
@ -504,14 +503,14 @@ var findPeerDhtCmd = &cmds.Command{
var getValueDhtCmd = &cmds.Command{ var getValueDhtCmd = &cmds.Command{
Helptext: cmdkit.HelpText{ Helptext: cmdkit.HelpText{
Tagline: "Given a key, query the DHT for its best value.", Tagline: "Given a key, query the routing system for its best value.",
ShortDescription: ` ShortDescription: `
Outputs the best value for the given key. Outputs the best value for the given key.
There may be several different values for a given key stored in the DHT; in There may be several different values for a given key stored in the routing
this context 'best' means the record that is most desirable. There is no one system; in this context 'best' means the record that is most desirable. There is
metric for 'best': it depends entirely on the key type. For IPNS, 'best' is no one metric for 'best': it depends entirely on the key type. For IPNS, 'best'
the record that is both valid and has the highest sequence number (freshest). is the record that is both valid and has the highest sequence number (freshest).
Different key types can specify other 'best' rules. Different key types can specify other 'best' rules.
`, `,
}, },
@ -529,9 +528,8 @@ Different key types can specify other 'best' rules.
return return
} }
dht, ok := n.Routing.(*ipdht.IpfsDHT) if n.Routing == nil {
if !ok { res.SetError(errNotOnline, cmdkit.ErrNormal)
res.SetError(ErrNotDHT, cmdkit.ErrNormal)
return return
} }
@ -559,7 +557,7 @@ Different key types can specify other 'best' rules.
go func() { go func() {
defer close(events) defer close(events)
val, err := dht.GetValue(ctx, dhtkey) val, err := n.Routing.GetValue(ctx, dhtkey)
if err != nil { if err != nil {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{ notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.QueryError, Type: notif.QueryError,
@ -610,10 +608,10 @@ Different key types can specify other 'best' rules.
var putValueDhtCmd = &cmds.Command{ var putValueDhtCmd = &cmds.Command{
Helptext: cmdkit.HelpText{ Helptext: cmdkit.HelpText{
Tagline: "Write a key/value pair to the DHT.", Tagline: "Write a key/value pair to the routing system.",
ShortDescription: ` ShortDescription: `
Given a key of the form /foo/bar and a value of any form, this will write that Given a key of the form /foo/bar and a value of any form, this will write that
value to the DHT with that key. value to the routing system with that key.
Keys have two parts: a keytype (foo) and the key name (bar). IPNS uses the Keys have two parts: a keytype (foo) and the key name (bar). IPNS uses the
/ipns keytype, and expects the key name to be a Peer ID. IPNS entries are /ipns keytype, and expects the key name to be a Peer ID. IPNS entries are
@ -621,7 +619,7 @@ specifically formatted (protocol buffer).
You may only use keytypes that are supported in your ipfs binary: currently You may only use keytypes that are supported in your ipfs binary: currently
this is only /ipns. Unless you have a relatively deep understanding of the this is only /ipns. Unless you have a relatively deep understanding of the
go-ipfs DHT internals, you likely want to be using 'ipfs name publish' instead go-ipfs routing internals, you likely want to be using 'ipfs name publish' instead
of this. of this.
Value is arbitrary text. Standard input can be used to provide value. Value is arbitrary text. Standard input can be used to provide value.
@ -644,9 +642,8 @@ NOTE: A value may not exceed 2048 bytes.
return return
} }
dht, ok := n.Routing.(*ipdht.IpfsDHT) if n.Routing == nil {
if !ok { res.SetError(errNotOnline, cmdkit.ErrNormal)
res.SetError(ErrNotDHT, cmdkit.ErrNormal)
return return
} }
@ -677,7 +674,7 @@ NOTE: A value may not exceed 2048 bytes.
go func() { go func() {
defer close(events) defer close(events)
err := dht.PutValue(ctx, key, []byte(data)) err := n.Routing.PutValue(ctx, key, []byte(data))
if err != nil { if err != nil {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{ notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.QueryError, Type: notif.QueryError,

View File

@ -41,7 +41,7 @@ import (
dhtopts "gx/ipfs/QmNg6M98bwS97SL9ArvrRxKujFps3eV6XvmKgduiYga8Bn/go-libp2p-kad-dht/opts" dhtopts "gx/ipfs/QmNg6M98bwS97SL9ArvrRxKujFps3eV6XvmKgduiYga8Bn/go-libp2p-kad-dht/opts"
u "gx/ipfs/QmPdKqUcHGFdeSpvjVoaTRPPstGif9GBZb5Q56RVw9o69A/go-ipfs-util" u "gx/ipfs/QmPdKqUcHGFdeSpvjVoaTRPPstGif9GBZb5Q56RVw9o69A/go-ipfs-util"
routing "gx/ipfs/QmPpdpS9fknTBM3qHDcpayU6nYPZQeVjia2fbNrD8YWDe6/go-libp2p-routing" routing "gx/ipfs/QmPpdpS9fknTBM3qHDcpayU6nYPZQeVjia2fbNrD8YWDe6/go-libp2p-routing"
psrouter "gx/ipfs/QmPxCZ99jTHMxD93qQV4pN3WJbBHrBLCQiumWDgfJjFQJy/go-libp2p-pubsub-router" psrouter "gx/ipfs/QmR2hqcem4qjd4DkuyiwSFjfUiCP5eXHdPoM7o7dWKwct9/go-libp2p-pubsub-router"
pnet "gx/ipfs/QmRGvSwDpN4eunxgDNfmQhayZ6Z9F5a2v31V2D7y77osLg/go-libp2p-pnet" pnet "gx/ipfs/QmRGvSwDpN4eunxgDNfmQhayZ6Z9F5a2v31V2D7y77osLg/go-libp2p-pnet"
goprocess "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" goprocess "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter" mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter"
@ -65,7 +65,7 @@ import (
ping "gx/ipfs/QmZ86eLPtXkQ1Dfa992Q8NpXArUoWWh3y728JDcWvzRrvC/go-libp2p/p2p/protocol/ping" ping "gx/ipfs/QmZ86eLPtXkQ1Dfa992Q8NpXArUoWWh3y728JDcWvzRrvC/go-libp2p/p2p/protocol/ping"
mplex "gx/ipfs/QmZHiqdRuNXujvSPNu1ZWxxzV6a2WhoZpfYkesdgyaKF9f/go-smux-multiplex" mplex "gx/ipfs/QmZHiqdRuNXujvSPNu1ZWxxzV6a2WhoZpfYkesdgyaKF9f/go-smux-multiplex"
pstore "gx/ipfs/QmZR2XWVVBCtbgBWnQhWk2xcQfaR3W8faQPriAiaaj7rsr/go-libp2p-peerstore" pstore "gx/ipfs/QmZR2XWVVBCtbgBWnQhWk2xcQfaR3W8faQPriAiaaj7rsr/go-libp2p-peerstore"
rhelpers "gx/ipfs/QmZw5m4ioaoNmATBtP3o7qC1UERubJgz84RzccT3UEHZKr/go-libp2p-routing-helpers" rhelpers "gx/ipfs/Qmafsgr3GSDKyGHW8SU9dbe6Vtv4rEgcgJ3WRnS72qtAzv/go-libp2p-routing-helpers"
cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid" cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
p2phost "gx/ipfs/Qmb8T6YBBsjYsVGfrihQLfCJveczZnneSBqBKkYEBWDjge/go-libp2p-host" p2phost "gx/ipfs/Qmb8T6YBBsjYsVGfrihQLfCJveczZnneSBqBKkYEBWDjge/go-libp2p-host"
logging "gx/ipfs/QmcVVHfdyv15GVPk7NrxdWjh2hLVccXnoD8j2tyQShiXJb/go-log" logging "gx/ipfs/QmcVVHfdyv15GVPk7NrxdWjh2hLVccXnoD8j2tyQShiXJb/go-log"
@ -137,6 +137,7 @@ type IpfsNode struct {
Floodsub *floodsub.PubSub Floodsub *floodsub.PubSub
PSRouter *psrouter.PubsubValueStore PSRouter *psrouter.PubsubValueStore
DHT *dht.IpfsDHT
P2P *p2p.P2P P2P *p2p.P2P
proc goprocess.Process proc goprocess.Process
@ -465,6 +466,23 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost
} }
n.Routing = r n.Routing = r
// TODO: I'm not a fan of type assertions like this but the
// `RoutingOption` system doesn't currently provide access to the
// IpfsNode.
//
// Ideally, we'd do something like:
//
// 1. Add some fancy method to introspect into tiered routers to extract
// things like the pubsub router or the DHT (complicated, messy,
// probably not worth it).
// 2. Pass the IpfsNode into the RoutingOption (would also remove the
// PSRouter case below.
// 3. Introduce some kind of service manager? (my personal favorite but
// that requires a fair amount of work).
if dht, ok := r.(*dht.IpfsDHT); ok {
n.DHT = dht
}
if ipnsps { if ipnsps {
n.PSRouter = psrouter.NewPubsubValueStore( n.PSRouter = psrouter.NewPubsubValueStore(
ctx, ctx,
@ -601,8 +619,8 @@ func (n *IpfsNode) teardown() error {
closers = append(closers, mount.Closer(n.Mounts.Ipns)) closers = append(closers, mount.Closer(n.Mounts.Ipns))
} }
if dht, ok := n.Routing.(*dht.IpfsDHT); ok { if n.DHT != nil {
closers = append(closers, dht.Process()) closers = append(closers, n.DHT.Process())
} }
if n.Blocks != nil { if n.Blocks != nil {

View File

@ -524,15 +524,15 @@
}, },
{ {
"author": "stebalien", "author": "stebalien",
"hash": "QmPxCZ99jTHMxD93qQV4pN3WJbBHrBLCQiumWDgfJjFQJy", "hash": "QmR2hqcem4qjd4DkuyiwSFjfUiCP5eXHdPoM7o7dWKwct9",
"name": "go-libp2p-pubsub-router", "name": "go-libp2p-pubsub-router",
"version": "0.3.5" "version": "0.3.6"
}, },
{ {
"author": "Stebalien", "author": "Stebalien",
"hash": "QmZw5m4ioaoNmATBtP3o7qC1UERubJgz84RzccT3UEHZKr", "hash": "Qmafsgr3GSDKyGHW8SU9dbe6Vtv4rEgcgJ3WRnS72qtAzv",
"name": "go-libp2p-routing-helpers", "name": "go-libp2p-routing-helpers",
"version": "0.2.4" "version": "0.2.5"
}, },
{ {
"author": "fsnotify", "author": "fsnotify",

View File

@ -35,9 +35,9 @@ startup_cluster() {
bound=$(expr "$num_nodes" - 1) bound=$(expr "$num_nodes" - 1)
if test -n "$other_args"; then if test -n "$other_args"; then
test_expect_success "start up nodes with additional args" ' test_expect_success "start up nodes with additional args" "
iptb start --args $other_args iptb start --args \"${other_args[@]}\"
' "
else else
test_expect_success "start up nodes" ' test_expect_success "start up nodes" '
iptb start iptb start

View File

@ -7,87 +7,93 @@ test_description="Test dht command"
TEST_DHT_VALUE="foobar" TEST_DHT_VALUE="foobar"
TEST_DHT_PATH="/pk/QmbWTwYGcmdyK9CYfNBcfs9nhZs17a6FQ4Y8oea278xx41" TEST_DHT_PATH="/pk/QmbWTwYGcmdyK9CYfNBcfs9nhZs17a6FQ4Y8oea278xx41"
# start iptb + wait for peering test_dht() {
NUM_NODES=5 NUM_NODES=5
test_expect_success 'init iptb' '
iptb init -n $NUM_NODES --bootstrap=none --port=0
'
startup_cluster $NUM_NODES test_expect_success 'init iptb' '
rm -rf .iptb/ &&
iptb init -n $NUM_NODES --bootstrap=none --port=0
'
test_expect_success 'peer ids' ' startup_cluster $NUM_NODES "$@"
PEERID_0=$(iptb get id 0) &&
PEERID_2=$(iptb get id 2)
'
# ipfs dht findpeer <peerID> test_expect_success 'peer ids' '
test_expect_success 'findpeer' ' PEERID_0=$(iptb get id 0) &&
ipfsi 1 dht findpeer $PEERID_0 | sort >actual && PEERID_2=$(iptb get id 2)
ipfsi 0 id -f "<addrs>" | cut -d / -f 1-5 | sort >expected && '
test_cmp actual expected
' # ipfs dht findpeer <peerID>
test_expect_success 'findpeer' '
ipfsi 1 dht findpeer $PEERID_0 | sort >actual &&
ipfsi 0 id -f "<addrs>" | cut -d / -f 1-5 | sort >expected &&
test_cmp actual expected
'
# ipfs dht put <key> <value>
test_expect_success 'put with good keys' '
ipfsi 0 dht put "$TEST_DHT_PATH" "$TEST_DHT_VALUE" | sort >putted &&
[ -s putted ] ||
test_fsh cat putted
'
# ipfs dht get <key>
test_expect_success 'get with good keys' '
HASH="$(echo "hello world" | ipfsi 2 add -q)" &&
ipfsi 2 name publish "/ipfs/$HASH" &&
ipfsi 1 dht get "/ipns/$PEERID_2" | grep -aq "/ipfs/$HASH"
'
test_expect_success 'put with bad keys fails (issue #5113)' '
ipfsi 0 dht put "foo" "bar" >putted
ipfsi 0 dht put "/pk/foo" "bar" >>putted
ipfsi 0 dht put "/ipns/foo" "bar" >>putted
[ ! -s putted ] ||
test_fsh cat putted
'
test_expect_failure 'put with bad keys returns error (issue #4611)' '
! ipfsi 0 dht put "foo" "bar" &&
! ipfsi 0 dht put "/pk/foo" "bar" &&
! ipfsi 0 dht put "/ipns/foo" "bar"
'
test_expect_failure 'get with bad keys (issue #4611)' '
! ipfsi 0 dht get "foo" &&
! ipfsi 0 dht get "/pk/foo"
'
test_expect_success "add a ref so we can find providers for it" '
echo "some stuff" > afile &&
HASH=$(ipfsi 3 add -q afile)
'
# ipfs dht findprovs <key>
test_expect_success 'findprovs' '
ipfsi 4 dht findprovs $HASH > provs &&
iptb get id 3 > expected &&
test_cmp provs expected
'
# ipfs dht query <peerID>
## We query 3 different keys, to statisically lower the chance that the queryer
## turns out to be the closest to what a key hashes to.
# TODO: flaky. tracked by https://github.com/ipfs/go-ipfs/issues/2620
test_expect_success 'query' '
ipfsi 3 dht query "$(echo banana | ipfsi 3 add -q)" >actual &&
ipfsi 3 dht query "$(echo apple | ipfsi 3 add -q)" >>actual &&
ipfsi 3 dht query "$(echo pear | ipfsi 3 add -q)" >>actual &&
PEERS=$(wc -l actual | cut -d '"'"' '"'"' -f 1) &&
[ -s actual ] ||
test_might_fail test_fsh cat actual
'
# ipfs dht put <key> <value> test_expect_success 'stop iptb' '
test_expect_success 'put with good keys' ' iptb stop
ipfsi 0 dht put "$TEST_DHT_PATH" "$TEST_DHT_VALUE" | sort >putted && '
[ -s putted ] || }
test_fsh cat putted
'
# ipfs dht get <key> test_dht
test_expect_success 'get with good keys' ' test_dht --enable-pubsub-experiment --enable-namesys-pubsub
HASH="$(echo "hello world" | ipfsi 2 add -q)" &&
ipfsi 2 name publish "/ipfs/$HASH" &&
ipfsi 1 dht get "/ipns/$PEERID_2" | grep -aq "/ipfs/$HASH"
'
test_expect_success 'put with bad keys fails (issue #5113)' '
ipfsi 0 dht put "foo" "bar" >putted
ipfsi 0 dht put "/pk/foo" "bar" >>putted
ipfsi 0 dht put "/ipns/foo" "bar" >>putted
[ ! -s putted ] ||
test_fsh cat putted
'
test_expect_failure 'put with bad keys returns error (issue #4611)' '
! ipfsi 0 dht put "foo" "bar" &&
! ipfsi 0 dht put "/pk/foo" "bar" &&
! ipfsi 0 dht put "/ipns/foo" "bar"
'
test_expect_failure 'get with bad keys (issue #4611)' '
! ipfsi 0 dht get "foo" &&
! ipfsi 0 dht get "/pk/foo"
'
test_expect_success "add a ref so we can find providers for it" '
echo "some stuff" > afile &&
HASH=$(ipfsi 3 add -q afile)
'
# ipfs dht findprovs <key>
test_expect_success 'findprovs' '
ipfsi 4 dht findprovs $HASH > provs &&
iptb get id 3 > expected &&
test_cmp provs expected
'
# ipfs dht query <peerID>
## We query 3 different keys, to statisically lower the chance that the queryer
## turns out to be the closest to what a key hashes to.
# TODO: flaky. tracked by https://github.com/ipfs/go-ipfs/issues/2620
test_expect_success 'query' '
ipfsi 3 dht query "$(echo banana | ipfsi 3 add -q)" >actual &&
ipfsi 3 dht query "$(echo apple | ipfsi 3 add -q)" >>actual &&
ipfsi 3 dht query "$(echo pear | ipfsi 3 add -q)" >>actual &&
PEERS=$(wc -l actual | cut -d '"'"' '"'"' -f 1) &&
[ -s actual ] ||
test_might_fail test_fsh cat actual
'
test_expect_success 'stop iptb' '
iptb stop
'
test_done test_done