1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-25 23:21:54 +08:00

respect verbose option a bit, and show query events for other commands

This commit is contained in:
Jeromy
2015-01-24 00:59:47 +00:00
parent 4607f54254
commit d68efad06c
4 changed files with 137 additions and 32 deletions

View File

@ -14,6 +14,8 @@ import (
u "github.com/jbenet/go-ipfs/util"
)
var ErrNotDHT = errors.New("routing service is not a DHT")
var DhtCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Issue commands directly through the DHT",
@ -87,19 +89,27 @@ var queryDhtCmd = &cmds.Command{
return nil, u.ErrCast()
}
verbose, _, _ := res.Request().Option("v").Bool()
buf := new(bytes.Buffer)
fmt.Fprintf(buf, "%s: ", time.Now().Format("15:04:05.000"))
if verbose {
fmt.Fprintf(buf, "%s: ", time.Now().Format("15:04:05.000"))
}
switch obj.Type {
case notif.FinalPeer:
fmt.Fprintf(buf, "%s\n", obj.ID)
case notif.PeerResponse:
fmt.Fprintf(buf, "* %s says use ", obj.ID)
for _, p := range obj.Responses {
fmt.Fprintf(buf, "%s ", p.ID)
if verbose {
fmt.Fprintf(buf, "* %s says use ", obj.ID)
for _, p := range obj.Responses {
fmt.Fprintf(buf, "%s ", p.ID)
}
fmt.Fprintln(buf)
}
fmt.Fprintln(buf)
case notif.SendingQuery:
fmt.Fprintf(buf, "* querying %s\n", obj.ID)
if verbose {
fmt.Fprintf(buf, "* querying %s\n", obj.ID)
}
case notif.QueryError:
fmt.Fprintf(buf, "error: %s\n", obj.Extra)
default:
@ -139,19 +149,31 @@ FindProviders will return a list of peers who are able to provide the value requ
dht, ok := n.Routing.(*ipdht.IpfsDHT)
if !ok {
return nil, errors.New("Routing service was not a dht")
return nil, ErrNotDHT
}
numProviders := 20
outChan := make(chan interface{})
pchan := dht.FindProvidersAsync(req.Context().Context, u.B58KeyDecode(req.Arguments()[0]), numProviders)
events := make(chan *notif.QueryEvent)
ctx := notif.RegisterForQueryEvents(req.Context().Context, events)
pchan := dht.FindProvidersAsync(ctx, u.B58KeyDecode(req.Arguments()[0]), numProviders)
go func() {
defer close(outChan)
for e := range events {
outChan <- e
}
}()
go func() {
defer close(events)
for p := range pchan {
np := p
outChan <- &np
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.Provider,
Responses: []*peer.PeerInfo{&np},
})
}
}()
return outChan, nil
@ -163,22 +185,41 @@ FindProviders will return a list of peers who are able to provide the value requ
return nil, u.ErrCast()
}
verbose, _, _ := res.Request().Option("v").Bool()
marshal := func(v interface{}) (io.Reader, error) {
obj, ok := v.(*peer.PeerInfo)
obj, ok := v.(*notif.QueryEvent)
if !ok {
return nil, u.ErrCast()
}
verbose, _, err := res.Request().Option("v").Bool()
if err != nil {
return nil, err
}
buf := new(bytes.Buffer)
if verbose {
fmt.Fprintf(buf, "%s: ", time.Now().Format("15:04:05.000"))
}
switch obj.Type {
case notif.FinalPeer:
if verbose {
fmt.Fprintf(buf, "* closest peer %s\n", obj.ID)
}
case notif.Provider:
fmt.Fprintf(buf, "%s\n", obj.ID.Pretty())
} else {
fmt.Fprintf(buf, "%s\n", obj.ID)
case notif.PeerResponse:
if verbose {
fmt.Fprintf(buf, "* %s says use ", obj.ID)
for _, p := range obj.Responses {
fmt.Fprintf(buf, "%s ", p.ID)
}
fmt.Fprintln(buf)
}
case notif.SendingQuery:
if verbose {
fmt.Fprintf(buf, "* querying %s\n", obj.ID)
}
case notif.QueryError:
fmt.Fprintf(buf, "error: %s\n", obj.Extra)
default:
fmt.Fprintf(buf, "unrecognized event type: %d\n", obj.Type)
}
return buf, nil
}
@ -209,7 +250,7 @@ var findPeerDhtCmd = &cmds.Command{
dht, ok := n.Routing.(*ipdht.IpfsDHT)
if !ok {
return nil, errors.New("Routing service was not a dht")
return nil, ErrNotDHT
}
pid, err := peer.IDB58Decode(req.Arguments()[0])
@ -217,29 +258,79 @@ var findPeerDhtCmd = &cmds.Command{
return nil, err
}
pi, err := dht.FindPeer(req.Context().Context, pid)
if err != nil {
return nil, err
}
outChan := make(chan interface{})
events := make(chan *notif.QueryEvent)
ctx := notif.RegisterForQueryEvents(req.Context().Context, events)
return &pi, nil
go func() {
defer close(outChan)
for v := range events {
outChan <- v
}
}()
go func() {
defer close(events)
pi, err := dht.FindPeer(ctx, pid)
if err != nil {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.QueryError,
Extra: err.Error(),
})
return
}
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.FinalPeer,
Responses: []*peer.PeerInfo{&pi},
})
}()
return outChan, nil
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
pinfo, ok := res.Output().(*peer.PeerInfo)
outChan, ok := res.Output().(<-chan interface{})
if !ok {
return nil, u.ErrCast()
}
buf := new(bytes.Buffer)
fmt.Fprintf(buf, "found peer: %s\n", pinfo.ID)
fmt.Fprintf(buf, "reported addresses:\n")
for _, addr := range pinfo.Addrs {
fmt.Fprintf(buf, "\t%s\n", addr)
marshal := func(v interface{}) (io.Reader, error) {
obj, ok := v.(*notif.QueryEvent)
if !ok {
return nil, u.ErrCast()
}
buf := new(bytes.Buffer)
fmt.Fprintf(buf, "%s: ", time.Now().Format("15:04:05.000"))
switch obj.Type {
case notif.FinalPeer:
pi := obj.Responses[0]
fmt.Fprintf(buf, "%s\n", pi.ID)
for _, a := range pi.Addrs {
fmt.Fprintf(buf, "\t%s\n", a)
}
case notif.PeerResponse:
fmt.Fprintf(buf, "* %s says use ", obj.ID)
for _, p := range obj.Responses {
fmt.Fprintf(buf, "%s ", p.ID)
}
fmt.Fprintln(buf)
case notif.SendingQuery:
fmt.Fprintf(buf, "* querying %s\n", obj.ID)
case notif.QueryError:
fmt.Fprintf(buf, "error: %s\n", obj.Extra)
default:
fmt.Fprintf(buf, "unrecognized event type: %d\n", obj.Type)
}
return buf, nil
}
return buf, nil
return &cmds.ChannelMarshaler{
Channel: outChan,
Marshaler: marshal,
}, nil
},
},
Type: peer.PeerInfo{},
Type: notif.QueryEvent{},
}

View File

@ -16,6 +16,7 @@ const (
PeerResponse
FinalPeer
QueryError
Provider
)
type QueryEvent struct {

View File

@ -60,7 +60,6 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key u.Key) (<-chan peer
if kb.Closer(clp, dht.self, key) && peerset.TryAdd(clp) {
select {
case out <- clp:
log.Error("Sending out peer: %s", clp.Pretty())
case <-ctx.Done():
return nil, ctx.Err()
}

View File

@ -7,6 +7,7 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
notif "github.com/jbenet/go-ipfs/notifications"
inet "github.com/jbenet/go-ipfs/p2p/net"
peer "github.com/jbenet/go-ipfs/p2p/peer"
"github.com/jbenet/go-ipfs/routing"
@ -242,6 +243,10 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
_, err := query.Run(ctx, peers)
if err != nil {
log.Errorf("Query error: %s", err)
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.QueryError,
Extra: err.Error(),
})
}
}
@ -269,6 +274,10 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, er
// setup the Query
query := dht.newQuery(u.Key(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.SendingQuery,
ID: p,
})
pmes, err := dht.findPeerSingle(ctx, p, id)
if err != nil {
@ -288,6 +297,11 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, er
}
}
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.PeerResponse,
Responses: pointerizePeerInfos(clpeerInfos),
})
return &dhtQueryResult{closerPeers: clpeerInfos}, nil
})