From 56a5e72760ed3fca5684ec18cc9318a253d1e9e0 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 22 Jan 2015 22:18:36 +0000 Subject: [PATCH] implement dht findprovs and add error output to dht query --- core/commands/dht.go | 83 +++++++++++++++++++++++++++++++++++++++++- notifications/query.go | 15 ++++++-- routing/dht/lookup.go | 1 - routing/dht/query.go | 7 ++++ 4 files changed, 100 insertions(+), 6 deletions(-) diff --git a/core/commands/dht.go b/core/commands/dht.go index 7fa0e9a57..12f5b09b0 100644 --- a/core/commands/dht.go +++ b/core/commands/dht.go @@ -9,6 +9,7 @@ import ( cmds "github.com/jbenet/go-ipfs/commands" notif "github.com/jbenet/go-ipfs/notifications" + peer "github.com/jbenet/go-ipfs/p2p/peer" ipdht "github.com/jbenet/go-ipfs/routing/dht" u "github.com/jbenet/go-ipfs/util" ) @@ -20,7 +21,8 @@ var DhtCmd = &cmds.Command{ }, Subcommands: map[string]*cmds.Command{ - "query": queryDhtCmd, + "query": queryDhtCmd, + "findprovs": findProvidersDhtCmd, }, } @@ -97,6 +99,10 @@ var queryDhtCmd = &cmds.Command{ fmt.Fprintln(buf) case notif.SendingQuery: fmt.Fprintf(buf, "* querying %s\n", obj.ID) + case notif.QueryError: + fmt.Fprintf(buf, "error: %s\n", obj.Extra) + default: + fmt.Fprintf(buf, "unrecognized event type: %d\n", obj.Type) } return buf, nil } @@ -109,3 +115,78 @@ var queryDhtCmd = &cmds.Command{ }, Type: notif.QueryEvent{}, } + +var findProvidersDhtCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Run a 'FindProviders' query through the DHT", + ShortDescription: ` +FindProviders will return a list of peers who are able to provide the value requested. +`, + }, + + Arguments: []cmds.Argument{ + cmds.StringArg("key", true, true, "The key to find providers for"), + }, + Options: []cmds.Option{ + cmds.BoolOption("verbose", "v", "Write extra information"), + }, + Run: func(req cmds.Request) (interface{}, error) { + n, err := req.Context().GetNode() + if err != nil { + return nil, err + } + + dht, ok := n.Routing.(*ipdht.IpfsDHT) + if !ok { + return nil, errors.New("Routing service was not a dht") + } + + numProviders := 20 + + outChan := make(chan interface{}) + pchan := dht.FindProvidersAsync(req.Context().Context, u.B58KeyDecode(req.Arguments()[0]), numProviders) + + go func() { + defer close(outChan) + for p := range pchan { + np := p + outChan <- &np + } + }() + return outChan, nil + }, + Marshalers: cmds.MarshalerMap{ + cmds.Text: func(res cmds.Response) (io.Reader, error) { + outChan, ok := res.Output().(<-chan interface{}) + if !ok { + return nil, u.ErrCast() + } + + marshal := func(v interface{}) (io.Reader, error) { + obj, ok := v.(*peer.PeerInfo) + if !ok { + return nil, u.ErrCast() + } + + verbose, _, err := res.Request().Option("v").Bool() + if err != nil { + return nil, err + } + + buf := new(bytes.Buffer) + if verbose { + fmt.Fprintf(buf, "%s\n", obj.ID.Pretty()) + } else { + fmt.Fprintf(buf, "%s\n", obj.ID) + } + return buf, nil + } + + return &cmds.ChannelMarshaler{ + Channel: outChan, + Marshaler: marshal, + }, nil + }, + }, + Type: peer.PeerInfo{}, +} diff --git a/notifications/query.go b/notifications/query.go index 7713d6747..85c0600e0 100644 --- a/notifications/query.go +++ b/notifications/query.go @@ -15,12 +15,14 @@ const ( SendingQuery QueryEventType = iota PeerResponse FinalPeer + QueryError ) type QueryEvent struct { ID peer.ID Type QueryEventType Responses []*peer.PeerInfo + Extra string } func RegisterForQueryEvents(ctx context.Context, ch chan<- *QueryEvent) context.Context { @@ -49,6 +51,7 @@ func (qe *QueryEvent) MarshalJSON() ([]byte, error) { out["ID"] = peer.IDB58Encode(qe.ID) out["Type"] = int(qe.Type) out["Responses"] = qe.Responses + out["Extra"] = qe.Extra return json.Marshal(out) } @@ -57,17 +60,21 @@ func (qe *QueryEvent) UnmarshalJSON(b []byte) error { ID string Type int Responses []*peer.PeerInfo + Extra string }{} err := json.Unmarshal(b, &temp) if err != nil { return err } - pid, err := peer.IDB58Decode(temp.ID) - if err != nil { - return err + if len(temp.ID) > 0 { + pid, err := peer.IDB58Decode(temp.ID) + if err != nil { + return err + } + qe.ID = pid } - qe.ID = pid qe.Type = QueryEventType(temp.Type) qe.Responses = temp.Responses + qe.Extra = temp.Extra return nil } diff --git a/routing/dht/lookup.go b/routing/dht/lookup.go index c0be519b2..c97e70fb1 100644 --- a/routing/dht/lookup.go +++ b/routing/dht/lookup.go @@ -67,7 +67,6 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key u.Key) (<-chan peer filtered = append(filtered, dht.peerstore.PeerInfo(clp)) } } - log.Errorf("filtered: %v", filtered) // For DHT query command notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ diff --git a/routing/dht/query.go b/routing/dht/query.go index f4150d82e..dfaecef98 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -3,6 +3,7 @@ package dht import ( "sync" + notif "github.com/jbenet/go-ipfs/notifications" peer "github.com/jbenet/go-ipfs/p2p/peer" queue "github.com/jbenet/go-ipfs/p2p/peer/queue" "github.com/jbenet/go-ipfs/routing" @@ -230,6 +231,12 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) { pi := peer.PeerInfo{ID: p} if err := r.query.dht.host.Connect(cg.Context(), pi); err != nil { log.Debugf("Error connecting: %s", err) + + notif.PublishQueryEvent(cg.Context(), ¬if.QueryEvent{ + Type: notif.QueryError, + Extra: err.Error(), + }) + r.Lock() r.errs = append(r.errs, err) r.Unlock()