mirror of
https://github.com/ipfs/kubo.git
synced 2025-09-10 09:52:20 +08:00
implement dht findprovs and add error output to dht query
This commit is contained in:
@ -9,6 +9,7 @@ import (
|
||||
|
||||
cmds "github.com/jbenet/go-ipfs/commands"
|
||||
notif "github.com/jbenet/go-ipfs/notifications"
|
||||
peer "github.com/jbenet/go-ipfs/p2p/peer"
|
||||
ipdht "github.com/jbenet/go-ipfs/routing/dht"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
@ -20,7 +21,8 @@ var DhtCmd = &cmds.Command{
|
||||
},
|
||||
|
||||
Subcommands: map[string]*cmds.Command{
|
||||
"query": queryDhtCmd,
|
||||
"query": queryDhtCmd,
|
||||
"findprovs": findProvidersDhtCmd,
|
||||
},
|
||||
}
|
||||
|
||||
@ -97,6 +99,10 @@ var queryDhtCmd = &cmds.Command{
|
||||
fmt.Fprintln(buf)
|
||||
case notif.SendingQuery:
|
||||
fmt.Fprintf(buf, "* querying %s\n", obj.ID)
|
||||
case notif.QueryError:
|
||||
fmt.Fprintf(buf, "error: %s\n", obj.Extra)
|
||||
default:
|
||||
fmt.Fprintf(buf, "unrecognized event type: %d\n", obj.Type)
|
||||
}
|
||||
return buf, nil
|
||||
}
|
||||
@ -109,3 +115,78 @@ var queryDhtCmd = &cmds.Command{
|
||||
},
|
||||
Type: notif.QueryEvent{},
|
||||
}
|
||||
|
||||
var findProvidersDhtCmd = &cmds.Command{
|
||||
Helptext: cmds.HelpText{
|
||||
Tagline: "Run a 'FindProviders' query through the DHT",
|
||||
ShortDescription: `
|
||||
FindProviders will return a list of peers who are able to provide the value requested.
|
||||
`,
|
||||
},
|
||||
|
||||
Arguments: []cmds.Argument{
|
||||
cmds.StringArg("key", true, true, "The key to find providers for"),
|
||||
},
|
||||
Options: []cmds.Option{
|
||||
cmds.BoolOption("verbose", "v", "Write extra information"),
|
||||
},
|
||||
Run: func(req cmds.Request) (interface{}, error) {
|
||||
n, err := req.Context().GetNode()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dht, ok := n.Routing.(*ipdht.IpfsDHT)
|
||||
if !ok {
|
||||
return nil, errors.New("Routing service was not a dht")
|
||||
}
|
||||
|
||||
numProviders := 20
|
||||
|
||||
outChan := make(chan interface{})
|
||||
pchan := dht.FindProvidersAsync(req.Context().Context, u.B58KeyDecode(req.Arguments()[0]), numProviders)
|
||||
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
for p := range pchan {
|
||||
np := p
|
||||
outChan <- &np
|
||||
}
|
||||
}()
|
||||
return outChan, nil
|
||||
},
|
||||
Marshalers: cmds.MarshalerMap{
|
||||
cmds.Text: func(res cmds.Response) (io.Reader, error) {
|
||||
outChan, ok := res.Output().(<-chan interface{})
|
||||
if !ok {
|
||||
return nil, u.ErrCast()
|
||||
}
|
||||
|
||||
marshal := func(v interface{}) (io.Reader, error) {
|
||||
obj, ok := v.(*peer.PeerInfo)
|
||||
if !ok {
|
||||
return nil, u.ErrCast()
|
||||
}
|
||||
|
||||
verbose, _, err := res.Request().Option("v").Bool()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
if verbose {
|
||||
fmt.Fprintf(buf, "%s\n", obj.ID.Pretty())
|
||||
} else {
|
||||
fmt.Fprintf(buf, "%s\n", obj.ID)
|
||||
}
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
return &cmds.ChannelMarshaler{
|
||||
Channel: outChan,
|
||||
Marshaler: marshal,
|
||||
}, nil
|
||||
},
|
||||
},
|
||||
Type: peer.PeerInfo{},
|
||||
}
|
||||
|
@ -15,12 +15,14 @@ const (
|
||||
SendingQuery QueryEventType = iota
|
||||
PeerResponse
|
||||
FinalPeer
|
||||
QueryError
|
||||
)
|
||||
|
||||
type QueryEvent struct {
|
||||
ID peer.ID
|
||||
Type QueryEventType
|
||||
Responses []*peer.PeerInfo
|
||||
Extra string
|
||||
}
|
||||
|
||||
func RegisterForQueryEvents(ctx context.Context, ch chan<- *QueryEvent) context.Context {
|
||||
@ -49,6 +51,7 @@ func (qe *QueryEvent) MarshalJSON() ([]byte, error) {
|
||||
out["ID"] = peer.IDB58Encode(qe.ID)
|
||||
out["Type"] = int(qe.Type)
|
||||
out["Responses"] = qe.Responses
|
||||
out["Extra"] = qe.Extra
|
||||
return json.Marshal(out)
|
||||
}
|
||||
|
||||
@ -57,17 +60,21 @@ func (qe *QueryEvent) UnmarshalJSON(b []byte) error {
|
||||
ID string
|
||||
Type int
|
||||
Responses []*peer.PeerInfo
|
||||
Extra string
|
||||
}{}
|
||||
err := json.Unmarshal(b, &temp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pid, err := peer.IDB58Decode(temp.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
if len(temp.ID) > 0 {
|
||||
pid, err := peer.IDB58Decode(temp.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
qe.ID = pid
|
||||
}
|
||||
qe.ID = pid
|
||||
qe.Type = QueryEventType(temp.Type)
|
||||
qe.Responses = temp.Responses
|
||||
qe.Extra = temp.Extra
|
||||
return nil
|
||||
}
|
||||
|
@ -67,7 +67,6 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key u.Key) (<-chan peer
|
||||
filtered = append(filtered, dht.peerstore.PeerInfo(clp))
|
||||
}
|
||||
}
|
||||
log.Errorf("filtered: %v", filtered)
|
||||
|
||||
// For DHT query command
|
||||
notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
|
||||
|
@ -3,6 +3,7 @@ package dht
|
||||
import (
|
||||
"sync"
|
||||
|
||||
notif "github.com/jbenet/go-ipfs/notifications"
|
||||
peer "github.com/jbenet/go-ipfs/p2p/peer"
|
||||
queue "github.com/jbenet/go-ipfs/p2p/peer/queue"
|
||||
"github.com/jbenet/go-ipfs/routing"
|
||||
@ -230,6 +231,12 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
|
||||
pi := peer.PeerInfo{ID: p}
|
||||
if err := r.query.dht.host.Connect(cg.Context(), pi); err != nil {
|
||||
log.Debugf("Error connecting: %s", err)
|
||||
|
||||
notif.PublishQueryEvent(cg.Context(), ¬if.QueryEvent{
|
||||
Type: notif.QueryError,
|
||||
Extra: err.Error(),
|
||||
})
|
||||
|
||||
r.Lock()
|
||||
r.errs = append(r.errs, err)
|
||||
r.Unlock()
|
||||
|
Reference in New Issue
Block a user