mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-05 07:29:02 +08:00
783 lines
18 KiB
Go
783 lines
18 KiB
Go
package commands
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"time"
|
|
|
|
cmds "github.com/ipfs/go-ipfs/commands"
|
|
dag "github.com/ipfs/go-ipfs/merkledag"
|
|
path "github.com/ipfs/go-ipfs/path"
|
|
|
|
routing "gx/ipfs/QmNdaQ8itUU9jEZUwTsG4gHMaPmRfi6FEe89QjQAFbep3M/go-libp2p-routing"
|
|
notif "gx/ipfs/QmNdaQ8itUU9jEZUwTsG4gHMaPmRfi6FEe89QjQAFbep3M/go-libp2p-routing/notifications"
|
|
ipdht "gx/ipfs/QmRmroYSdievxnjiuy99C8BzShNstdEWcEF3LQHF7fUbez/go-libp2p-kad-dht"
|
|
b58 "gx/ipfs/QmT8rehPR3F6bmwL6zjUN8XpiDBFFpMP2myPdC6ApsWfJf/go-base58"
|
|
u "gx/ipfs/QmWbjfz3u6HkAdPh34dgPchGbQjob6LXLhAeCGii2TX69n/go-ipfs-util"
|
|
pstore "gx/ipfs/QmXZSd1qR5BxZkPyuwfT5jpqQFScZccoZvDneXsKzCNHWX/go-libp2p-peerstore"
|
|
cid "gx/ipfs/QmYhQaCYEcaPPjxJX7YcPcVKkQfRy6sJ7B3XmGFk82XYdQ/go-cid"
|
|
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
|
|
)
|
|
|
|
// ErrNotDHT is the error the DHT commands in this file report when the node's
// routing service cannot be type-asserted to *ipdht.IpfsDHT.
var ErrNotDHT = errors.New("routing service is not a DHT")
|
|
|
|
var DhtCmd = &cmds.Command{
|
|
Helptext: cmds.HelpText{
|
|
Tagline: "Issue commands directly through the DHT.",
|
|
ShortDescription: ``,
|
|
},
|
|
|
|
Subcommands: map[string]*cmds.Command{
|
|
"query": queryDhtCmd,
|
|
"findprovs": findProvidersDhtCmd,
|
|
"findpeer": findPeerDhtCmd,
|
|
"get": getValueDhtCmd,
|
|
"put": putValueDhtCmd,
|
|
"provide": provideRefDhtCmd,
|
|
},
|
|
}
|
|
|
|
var queryDhtCmd = &cmds.Command{
|
|
Helptext: cmds.HelpText{
|
|
Tagline: "Find the closest Peer IDs to a given Peer ID by querying the DHT.",
|
|
ShortDescription: "Outputs a list of newline-delimited Peer IDs.",
|
|
},
|
|
|
|
Arguments: []cmds.Argument{
|
|
cmds.StringArg("peerID", true, true, "The peerID to run the query against."),
|
|
},
|
|
Options: []cmds.Option{
|
|
cmds.BoolOption("verbose", "v", "Print extra information.").Default(false),
|
|
},
|
|
Run: func(req cmds.Request, res cmds.Response) {
|
|
n, err := req.InvocContext().GetNode()
|
|
if err != nil {
|
|
res.SetError(err, cmds.ErrNormal)
|
|
return
|
|
}
|
|
|
|
dht, ok := n.Routing.(*ipdht.IpfsDHT)
|
|
if !ok {
|
|
res.SetError(ErrNotDHT, cmds.ErrNormal)
|
|
return
|
|
}
|
|
|
|
events := make(chan *notif.QueryEvent)
|
|
ctx := notif.RegisterForQueryEvents(req.Context(), events)
|
|
|
|
k := string(b58.Decode(req.Arguments()[0]))
|
|
|
|
closestPeers, err := dht.GetClosestPeers(ctx, k)
|
|
if err != nil {
|
|
res.SetError(err, cmds.ErrNormal)
|
|
return
|
|
}
|
|
|
|
go func() {
|
|
defer close(events)
|
|
for p := range closestPeers {
|
|
notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
|
|
ID: p,
|
|
Type: notif.FinalPeer,
|
|
})
|
|
}
|
|
}()
|
|
|
|
outChan := make(chan interface{})
|
|
res.SetOutput((<-chan interface{})(outChan))
|
|
|
|
go func() {
|
|
defer close(outChan)
|
|
for e := range events {
|
|
outChan <- e
|
|
}
|
|
}()
|
|
},
|
|
Marshalers: cmds.MarshalerMap{
|
|
cmds.Text: func(res cmds.Response) (io.Reader, error) {
|
|
outChan, ok := res.Output().(<-chan interface{})
|
|
if !ok {
|
|
return nil, u.ErrCast()
|
|
}
|
|
|
|
pfm := pfuncMap{
|
|
notif.PeerResponse: func(obj *notif.QueryEvent, out io.Writer, verbose bool) {
|
|
for _, p := range obj.Responses {
|
|
fmt.Fprintf(out, "%s\n", p.ID.Pretty())
|
|
}
|
|
},
|
|
}
|
|
|
|
marshal := func(v interface{}) (io.Reader, error) {
|
|
obj, ok := v.(*notif.QueryEvent)
|
|
if !ok {
|
|
return nil, u.ErrCast()
|
|
}
|
|
|
|
verbose, _, _ := res.Request().Option("v").Bool()
|
|
|
|
buf := new(bytes.Buffer)
|
|
printEvent(obj, buf, verbose, pfm)
|
|
return buf, nil
|
|
}
|
|
|
|
return &cmds.ChannelMarshaler{
|
|
Channel: outChan,
|
|
Marshaler: marshal,
|
|
Res: res,
|
|
}, nil
|
|
},
|
|
},
|
|
Type: notif.QueryEvent{},
|
|
}
|
|
|
|
var findProvidersDhtCmd = &cmds.Command{
|
|
Helptext: cmds.HelpText{
|
|
Tagline: "Find peers in the DHT that can provide a specific value, given a key.",
|
|
ShortDescription: "Outputs a list of newline-delimited provider Peer IDs.",
|
|
},
|
|
|
|
Arguments: []cmds.Argument{
|
|
cmds.StringArg("key", true, true, "The key to find providers for."),
|
|
},
|
|
Options: []cmds.Option{
|
|
cmds.BoolOption("verbose", "v", "Print extra information.").Default(false),
|
|
},
|
|
Run: func(req cmds.Request, res cmds.Response) {
|
|
n, err := req.InvocContext().GetNode()
|
|
if err != nil {
|
|
res.SetError(err, cmds.ErrNormal)
|
|
return
|
|
}
|
|
|
|
dht, ok := n.Routing.(*ipdht.IpfsDHT)
|
|
if !ok {
|
|
res.SetError(ErrNotDHT, cmds.ErrNormal)
|
|
return
|
|
}
|
|
|
|
numProviders := 20
|
|
|
|
outChan := make(chan interface{})
|
|
res.SetOutput((<-chan interface{})(outChan))
|
|
|
|
events := make(chan *notif.QueryEvent)
|
|
ctx := notif.RegisterForQueryEvents(req.Context(), events)
|
|
|
|
c, err := cid.Decode(req.Arguments()[0])
|
|
if err != nil {
|
|
res.SetError(err, cmds.ErrNormal)
|
|
return
|
|
}
|
|
|
|
pchan := dht.FindProvidersAsync(ctx, c, numProviders)
|
|
go func() {
|
|
defer close(outChan)
|
|
for e := range events {
|
|
outChan <- e
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
defer close(events)
|
|
for p := range pchan {
|
|
np := p
|
|
notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
|
|
Type: notif.Provider,
|
|
Responses: []*pstore.PeerInfo{&np},
|
|
})
|
|
}
|
|
}()
|
|
},
|
|
Marshalers: cmds.MarshalerMap{
|
|
cmds.Text: func(res cmds.Response) (io.Reader, error) {
|
|
outChan, ok := res.Output().(<-chan interface{})
|
|
if !ok {
|
|
return nil, u.ErrCast()
|
|
}
|
|
|
|
verbose, _, _ := res.Request().Option("v").Bool()
|
|
pfm := pfuncMap{
|
|
notif.FinalPeer: func(obj *notif.QueryEvent, out io.Writer, verbose bool) {
|
|
if verbose {
|
|
fmt.Fprintf(out, "* closest peer %s\n", obj.ID)
|
|
}
|
|
},
|
|
notif.Provider: func(obj *notif.QueryEvent, out io.Writer, verbose bool) {
|
|
prov := obj.Responses[0]
|
|
if verbose {
|
|
fmt.Fprintf(out, "provider: ")
|
|
}
|
|
fmt.Fprintf(out, "%s\n", prov.ID.Pretty())
|
|
if verbose {
|
|
for _, a := range prov.Addrs {
|
|
fmt.Fprintf(out, "\t%s\n", a)
|
|
}
|
|
}
|
|
},
|
|
}
|
|
|
|
marshal := func(v interface{}) (io.Reader, error) {
|
|
obj, ok := v.(*notif.QueryEvent)
|
|
if !ok {
|
|
return nil, u.ErrCast()
|
|
}
|
|
|
|
buf := new(bytes.Buffer)
|
|
printEvent(obj, buf, verbose, pfm)
|
|
return buf, nil
|
|
}
|
|
|
|
return &cmds.ChannelMarshaler{
|
|
Channel: outChan,
|
|
Marshaler: marshal,
|
|
Res: res,
|
|
}, nil
|
|
},
|
|
},
|
|
Type: notif.QueryEvent{},
|
|
}
|
|
|
|
var provideRefDhtCmd = &cmds.Command{
|
|
Helptext: cmds.HelpText{
|
|
Tagline: "Announce to the network that you are providing given values.",
|
|
},
|
|
|
|
Arguments: []cmds.Argument{
|
|
cmds.StringArg("key", true, true, "The key[s] to send provide records for.").EnableStdin(),
|
|
},
|
|
Options: []cmds.Option{
|
|
cmds.BoolOption("verbose", "v", "Print extra information.").Default(false),
|
|
cmds.BoolOption("recursive", "r", "Recursively provide entire graph.").Default(false),
|
|
},
|
|
Run: func(req cmds.Request, res cmds.Response) {
|
|
n, err := req.InvocContext().GetNode()
|
|
if err != nil {
|
|
res.SetError(err, cmds.ErrNormal)
|
|
return
|
|
}
|
|
|
|
if n.Routing == nil {
|
|
res.SetError(errNotOnline, cmds.ErrNormal)
|
|
return
|
|
}
|
|
|
|
rec, _, _ := req.Option("recursive").Bool()
|
|
|
|
var cids []*cid.Cid
|
|
for _, arg := range req.Arguments() {
|
|
c, err := cid.Decode(arg)
|
|
if err != nil {
|
|
res.SetError(err, cmds.ErrNormal)
|
|
return
|
|
}
|
|
|
|
has, err := n.Blockstore.Has(c)
|
|
if err != nil {
|
|
res.SetError(err, cmds.ErrNormal)
|
|
return
|
|
}
|
|
|
|
if !has {
|
|
res.SetError(fmt.Errorf("block %s not found locally, cannot provide", c), cmds.ErrNormal)
|
|
return
|
|
}
|
|
|
|
cids = append(cids, c)
|
|
}
|
|
|
|
outChan := make(chan interface{})
|
|
res.SetOutput((<-chan interface{})(outChan))
|
|
|
|
events := make(chan *notif.QueryEvent)
|
|
ctx := notif.RegisterForQueryEvents(req.Context(), events)
|
|
|
|
go func() {
|
|
defer close(outChan)
|
|
for e := range events {
|
|
outChan <- e
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
defer close(events)
|
|
var err error
|
|
if rec {
|
|
err = provideKeysRec(ctx, n.Routing, n.DAG, cids)
|
|
} else {
|
|
err = provideKeys(ctx, n.Routing, cids)
|
|
}
|
|
if err != nil {
|
|
notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
|
|
Type: notif.QueryError,
|
|
Extra: err.Error(),
|
|
})
|
|
}
|
|
}()
|
|
},
|
|
Marshalers: cmds.MarshalerMap{
|
|
cmds.Text: func(res cmds.Response) (io.Reader, error) {
|
|
outChan, ok := res.Output().(<-chan interface{})
|
|
if !ok {
|
|
return nil, u.ErrCast()
|
|
}
|
|
|
|
verbose, _, _ := res.Request().Option("v").Bool()
|
|
pfm := pfuncMap{
|
|
notif.FinalPeer: func(obj *notif.QueryEvent, out io.Writer, verbose bool) {
|
|
if verbose {
|
|
fmt.Fprintf(out, "sending provider record to peer %s\n", obj.ID)
|
|
}
|
|
},
|
|
}
|
|
|
|
marshal := func(v interface{}) (io.Reader, error) {
|
|
obj, ok := v.(*notif.QueryEvent)
|
|
if !ok {
|
|
return nil, u.ErrCast()
|
|
}
|
|
|
|
buf := new(bytes.Buffer)
|
|
printEvent(obj, buf, verbose, pfm)
|
|
return buf, nil
|
|
}
|
|
|
|
return &cmds.ChannelMarshaler{
|
|
Channel: outChan,
|
|
Marshaler: marshal,
|
|
Res: res,
|
|
}, nil
|
|
},
|
|
},
|
|
Type: notif.QueryEvent{},
|
|
}
|
|
|
|
func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) error {
|
|
for _, c := range cids {
|
|
err := r.Provide(ctx, c, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv dag.DAGService, cids []*cid.Cid) error {
|
|
provided := cid.NewSet()
|
|
for _, c := range cids {
|
|
kset := cid.NewSet()
|
|
|
|
err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, kset.Visit)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, k := range kset.Keys() {
|
|
if provided.Has(k) {
|
|
continue
|
|
}
|
|
|
|
err = r.Provide(ctx, k, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
provided.Add(k)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
var findPeerDhtCmd = &cmds.Command{
|
|
Helptext: cmds.HelpText{
|
|
Tagline: "Query the DHT for all of the multiaddresses associated with a Peer ID.",
|
|
ShortDescription: "Outputs a list of newline-delimited multiaddresses.",
|
|
},
|
|
|
|
Arguments: []cmds.Argument{
|
|
cmds.StringArg("peerID", true, true, "The ID of the peer to search for."),
|
|
},
|
|
Options: []cmds.Option{
|
|
cmds.BoolOption("verbose", "v", "Print extra information.").Default(false),
|
|
},
|
|
Run: func(req cmds.Request, res cmds.Response) {
|
|
n, err := req.InvocContext().GetNode()
|
|
if err != nil {
|
|
res.SetError(err, cmds.ErrNormal)
|
|
return
|
|
}
|
|
|
|
dht, ok := n.Routing.(*ipdht.IpfsDHT)
|
|
if !ok {
|
|
res.SetError(ErrNotDHT, cmds.ErrNormal)
|
|
return
|
|
}
|
|
|
|
pid, err := peer.IDB58Decode(req.Arguments()[0])
|
|
if err != nil {
|
|
res.SetError(err, cmds.ErrNormal)
|
|
return
|
|
}
|
|
|
|
outChan := make(chan interface{})
|
|
res.SetOutput((<-chan interface{})(outChan))
|
|
|
|
events := make(chan *notif.QueryEvent)
|
|
ctx := notif.RegisterForQueryEvents(req.Context(), events)
|
|
|
|
go func() {
|
|
defer close(outChan)
|
|
for v := range events {
|
|
outChan <- v
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
defer close(events)
|
|
pi, err := dht.FindPeer(ctx, pid)
|
|
if err != nil {
|
|
notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
|
|
Type: notif.QueryError,
|
|
Extra: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
|
|
Type: notif.FinalPeer,
|
|
Responses: []*pstore.PeerInfo{&pi},
|
|
})
|
|
}()
|
|
},
|
|
Marshalers: cmds.MarshalerMap{
|
|
cmds.Text: func(res cmds.Response) (io.Reader, error) {
|
|
outChan, ok := res.Output().(<-chan interface{})
|
|
if !ok {
|
|
return nil, u.ErrCast()
|
|
}
|
|
|
|
verbose, _, _ := res.Request().Option("v").Bool()
|
|
|
|
pfm := pfuncMap{
|
|
notif.FinalPeer: func(obj *notif.QueryEvent, out io.Writer, verbose bool) {
|
|
pi := obj.Responses[0]
|
|
for _, a := range pi.Addrs {
|
|
fmt.Fprintf(out, "%s\n", a)
|
|
}
|
|
},
|
|
}
|
|
marshal := func(v interface{}) (io.Reader, error) {
|
|
obj, ok := v.(*notif.QueryEvent)
|
|
if !ok {
|
|
return nil, u.ErrCast()
|
|
}
|
|
|
|
buf := new(bytes.Buffer)
|
|
printEvent(obj, buf, verbose, pfm)
|
|
return buf, nil
|
|
}
|
|
|
|
return &cmds.ChannelMarshaler{
|
|
Channel: outChan,
|
|
Marshaler: marshal,
|
|
Res: res,
|
|
}, nil
|
|
},
|
|
},
|
|
Type: notif.QueryEvent{},
|
|
}
|
|
|
|
var getValueDhtCmd = &cmds.Command{
|
|
Helptext: cmds.HelpText{
|
|
Tagline: "Given a key, query the DHT for its best value.",
|
|
ShortDescription: `
|
|
Outputs the best value for the given key.
|
|
|
|
There may be several different values for a given key stored in the DHT; in
|
|
this context 'best' means the record that is most desirable. There is no one
|
|
metric for 'best': it depends entirely on the key type. For IPNS, 'best' is
|
|
the record that is both valid and has the highest sequence number (freshest).
|
|
Different key types can specify other 'best' rules.
|
|
`,
|
|
},
|
|
|
|
Arguments: []cmds.Argument{
|
|
cmds.StringArg("key", true, true, "The key to find a value for."),
|
|
},
|
|
Options: []cmds.Option{
|
|
cmds.BoolOption("verbose", "v", "Print extra information.").Default(false),
|
|
},
|
|
Run: func(req cmds.Request, res cmds.Response) {
|
|
n, err := req.InvocContext().GetNode()
|
|
if err != nil {
|
|
res.SetError(err, cmds.ErrNormal)
|
|
return
|
|
}
|
|
|
|
dht, ok := n.Routing.(*ipdht.IpfsDHT)
|
|
if !ok {
|
|
res.SetError(ErrNotDHT, cmds.ErrNormal)
|
|
return
|
|
}
|
|
|
|
outChan := make(chan interface{})
|
|
res.SetOutput((<-chan interface{})(outChan))
|
|
|
|
events := make(chan *notif.QueryEvent)
|
|
ctx := notif.RegisterForQueryEvents(req.Context(), events)
|
|
|
|
dhtkey, err := escapeDhtKey(req.Arguments()[0])
|
|
if err != nil {
|
|
res.SetError(err, cmds.ErrNormal)
|
|
return
|
|
}
|
|
|
|
go func() {
|
|
defer close(outChan)
|
|
for e := range events {
|
|
outChan <- e
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
defer close(events)
|
|
val, err := dht.GetValue(ctx, dhtkey)
|
|
if err != nil {
|
|
notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
|
|
Type: notif.QueryError,
|
|
Extra: err.Error(),
|
|
})
|
|
} else {
|
|
notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
|
|
Type: notif.Value,
|
|
Extra: string(val),
|
|
})
|
|
}
|
|
}()
|
|
},
|
|
Marshalers: cmds.MarshalerMap{
|
|
cmds.Text: func(res cmds.Response) (io.Reader, error) {
|
|
outChan, ok := res.Output().(<-chan interface{})
|
|
if !ok {
|
|
return nil, u.ErrCast()
|
|
}
|
|
|
|
verbose, _, _ := res.Request().Option("v").Bool()
|
|
|
|
pfm := pfuncMap{
|
|
notif.Value: func(obj *notif.QueryEvent, out io.Writer, verbose bool) {
|
|
if verbose {
|
|
fmt.Fprintf(out, "got value: '%s'\n", obj.Extra)
|
|
} else {
|
|
fmt.Fprintln(out, obj.Extra)
|
|
}
|
|
},
|
|
}
|
|
marshal := func(v interface{}) (io.Reader, error) {
|
|
obj, ok := v.(*notif.QueryEvent)
|
|
if !ok {
|
|
return nil, u.ErrCast()
|
|
}
|
|
|
|
buf := new(bytes.Buffer)
|
|
|
|
printEvent(obj, buf, verbose, pfm)
|
|
|
|
return buf, nil
|
|
}
|
|
|
|
return &cmds.ChannelMarshaler{
|
|
Channel: outChan,
|
|
Marshaler: marshal,
|
|
Res: res,
|
|
}, nil
|
|
},
|
|
},
|
|
Type: notif.QueryEvent{},
|
|
}
|
|
|
|
var putValueDhtCmd = &cmds.Command{
|
|
Helptext: cmds.HelpText{
|
|
Tagline: "Write a key/value pair to the DHT.",
|
|
ShortDescription: `
|
|
Given a key of the form /foo/bar and a value of any form, this will write that
|
|
value to the DHT with that key.
|
|
|
|
Keys have two parts: a keytype (foo) and the key name (bar). IPNS uses the
|
|
/ipns keytype, and expects the key name to be a Peer ID. IPNS entries are
|
|
specifically formatted (protocol buffer).
|
|
|
|
You may only use keytypes that are supported in your ipfs binary: currently
|
|
this is only /ipns. Unless you have a relatively deep understanding of the
|
|
go-ipfs DHT internals, you likely want to be using 'ipfs name publish' instead
|
|
of this.
|
|
|
|
Value is arbitrary text. Standard input can be used to provide value.
|
|
|
|
NOTE: A value may not exceed 2048 bytes.
|
|
`,
|
|
},
|
|
|
|
Arguments: []cmds.Argument{
|
|
cmds.StringArg("key", true, false, "The key to store the value at."),
|
|
cmds.StringArg("value", true, false, "The value to store.").EnableStdin(),
|
|
},
|
|
Options: []cmds.Option{
|
|
cmds.BoolOption("verbose", "v", "Print extra information.").Default(false),
|
|
},
|
|
Run: func(req cmds.Request, res cmds.Response) {
|
|
n, err := req.InvocContext().GetNode()
|
|
if err != nil {
|
|
res.SetError(err, cmds.ErrNormal)
|
|
return
|
|
}
|
|
|
|
dht, ok := n.Routing.(*ipdht.IpfsDHT)
|
|
if !ok {
|
|
res.SetError(ErrNotDHT, cmds.ErrNormal)
|
|
return
|
|
}
|
|
|
|
outChan := make(chan interface{})
|
|
res.SetOutput((<-chan interface{})(outChan))
|
|
|
|
events := make(chan *notif.QueryEvent)
|
|
ctx := notif.RegisterForQueryEvents(req.Context(), events)
|
|
|
|
key, err := escapeDhtKey(req.Arguments()[0])
|
|
if err != nil {
|
|
res.SetError(err, cmds.ErrNormal)
|
|
return
|
|
}
|
|
|
|
data := req.Arguments()[1]
|
|
|
|
go func() {
|
|
defer close(outChan)
|
|
for e := range events {
|
|
outChan <- e
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
defer close(events)
|
|
err := dht.PutValue(ctx, key, []byte(data))
|
|
if err != nil {
|
|
notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
|
|
Type: notif.QueryError,
|
|
Extra: err.Error(),
|
|
})
|
|
}
|
|
}()
|
|
},
|
|
Marshalers: cmds.MarshalerMap{
|
|
cmds.Text: func(res cmds.Response) (io.Reader, error) {
|
|
outChan, ok := res.Output().(<-chan interface{})
|
|
if !ok {
|
|
return nil, u.ErrCast()
|
|
}
|
|
|
|
verbose, _, _ := res.Request().Option("v").Bool()
|
|
pfm := pfuncMap{
|
|
notif.FinalPeer: func(obj *notif.QueryEvent, out io.Writer, verbose bool) {
|
|
if verbose {
|
|
fmt.Fprintf(out, "* closest peer %s\n", obj.ID)
|
|
}
|
|
},
|
|
notif.Value: func(obj *notif.QueryEvent, out io.Writer, verbose bool) {
|
|
fmt.Fprintf(out, "%s\n", obj.ID.Pretty())
|
|
},
|
|
}
|
|
|
|
marshal := func(v interface{}) (io.Reader, error) {
|
|
obj, ok := v.(*notif.QueryEvent)
|
|
if !ok {
|
|
return nil, u.ErrCast()
|
|
}
|
|
|
|
buf := new(bytes.Buffer)
|
|
printEvent(obj, buf, verbose, pfm)
|
|
|
|
return buf, nil
|
|
}
|
|
|
|
return &cmds.ChannelMarshaler{
|
|
Channel: outChan,
|
|
Marshaler: marshal,
|
|
Res: res,
|
|
}, nil
|
|
},
|
|
},
|
|
Type: notif.QueryEvent{},
|
|
}
|
|
|
|
type printFunc func(obj *notif.QueryEvent, out io.Writer, verbose bool)
|
|
type pfuncMap map[notif.QueryEventType]printFunc
|
|
|
|
func printEvent(obj *notif.QueryEvent, out io.Writer, verbose bool, override pfuncMap) {
|
|
if verbose {
|
|
fmt.Fprintf(out, "%s: ", time.Now().Format("15:04:05.000"))
|
|
}
|
|
|
|
if override != nil {
|
|
if pf, ok := override[obj.Type]; ok {
|
|
pf(obj, out, verbose)
|
|
return
|
|
}
|
|
}
|
|
|
|
switch obj.Type {
|
|
case notif.SendingQuery:
|
|
if verbose {
|
|
fmt.Fprintf(out, "* querying %s\n", obj.ID)
|
|
}
|
|
case notif.Value:
|
|
if verbose {
|
|
fmt.Fprintf(out, "got value: '%s'\n", obj.Extra)
|
|
} else {
|
|
fmt.Fprint(out, obj.Extra)
|
|
}
|
|
case notif.PeerResponse:
|
|
if verbose {
|
|
fmt.Fprintf(out, "* %s says use ", obj.ID)
|
|
for _, p := range obj.Responses {
|
|
fmt.Fprintf(out, "%s ", p.ID)
|
|
}
|
|
fmt.Fprintln(out)
|
|
}
|
|
case notif.QueryError:
|
|
if verbose {
|
|
fmt.Fprintf(out, "error: %s\n", obj.Extra)
|
|
}
|
|
case notif.DialingPeer:
|
|
if verbose {
|
|
fmt.Fprintf(out, "dialing peer: %s\n", obj.ID)
|
|
}
|
|
case notif.AddingPeer:
|
|
if verbose {
|
|
fmt.Fprintf(out, "adding peer to query: %s\n", obj.ID)
|
|
}
|
|
case notif.FinalPeer:
|
|
default:
|
|
if verbose {
|
|
fmt.Fprintf(out, "unrecognized event type: %d\n", obj.Type)
|
|
}
|
|
}
|
|
}
|
|
|
|
func escapeDhtKey(s string) (string, error) {
|
|
parts := path.SplitList(s)
|
|
switch len(parts) {
|
|
case 1:
|
|
return string(b58.Decode(s)), nil
|
|
case 3:
|
|
k := b58.Decode(parts[2])
|
|
return path.Join(append(parts[:2], string(k))), nil
|
|
default:
|
|
return "", errors.New("invalid key")
|
|
}
|
|
}
|