mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-30 18:13:54 +08:00
Merge pull request #801 from jbenet/feat/get-put-cmds
Implement dht put and get CLI commands
This commit is contained in:
@ -26,6 +26,8 @@ var DhtCmd = &cmds.Command{
|
||||
"query": queryDhtCmd,
|
||||
"findprovs": findProvidersDhtCmd,
|
||||
"findpeer": findPeerDhtCmd,
|
||||
"get": getValueDhtCmd,
|
||||
"put": putValueDhtCmd,
|
||||
},
|
||||
}
|
||||
|
||||
@ -352,3 +354,220 @@ var findPeerDhtCmd = &cmds.Command{
|
||||
},
|
||||
Type: notif.QueryEvent{},
|
||||
}
|
||||
|
||||
var getValueDhtCmd = &cmds.Command{
|
||||
Helptext: cmds.HelpText{
|
||||
Tagline: "Run a 'GetValue' query through the DHT",
|
||||
ShortDescription: `
|
||||
GetValue will return the value stored in the dht at the given key.
|
||||
`,
|
||||
},
|
||||
|
||||
Arguments: []cmds.Argument{
|
||||
cmds.StringArg("key", true, true, "The key to find a value for"),
|
||||
},
|
||||
Options: []cmds.Option{
|
||||
cmds.BoolOption("verbose", "v", "Write extra information"),
|
||||
},
|
||||
Run: func(req cmds.Request, res cmds.Response) {
|
||||
n, err := req.Context().GetNode()
|
||||
if err != nil {
|
||||
res.SetError(err, cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
|
||||
dht, ok := n.Routing.(*ipdht.IpfsDHT)
|
||||
if !ok {
|
||||
res.SetError(ErrNotDHT, cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
|
||||
outChan := make(chan interface{})
|
||||
res.SetOutput((<-chan interface{})(outChan))
|
||||
|
||||
events := make(chan *notif.QueryEvent)
|
||||
ctx := notif.RegisterForQueryEvents(req.Context().Context, events)
|
||||
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
for e := range events {
|
||||
outChan <- e
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer close(events)
|
||||
val, err := dht.GetValue(ctx, u.B58KeyDecode(req.Arguments()[0]))
|
||||
if err != nil {
|
||||
notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
|
||||
Type: notif.QueryError,
|
||||
Extra: err.Error(),
|
||||
})
|
||||
} else {
|
||||
notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
|
||||
Type: notif.Value,
|
||||
Extra: string(val),
|
||||
})
|
||||
}
|
||||
}()
|
||||
},
|
||||
Marshalers: cmds.MarshalerMap{
|
||||
cmds.Text: func(res cmds.Response) (io.Reader, error) {
|
||||
outChan, ok := res.Output().(<-chan interface{})
|
||||
if !ok {
|
||||
return nil, u.ErrCast()
|
||||
}
|
||||
|
||||
verbose, _, _ := res.Request().Option("v").Bool()
|
||||
|
||||
marshal := func(v interface{}) (io.Reader, error) {
|
||||
obj, ok := v.(*notif.QueryEvent)
|
||||
if !ok {
|
||||
return nil, u.ErrCast()
|
||||
}
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
if verbose {
|
||||
fmt.Fprintf(buf, "%s: ", time.Now().Format("15:04:05.000"))
|
||||
}
|
||||
switch obj.Type {
|
||||
case notif.PeerResponse:
|
||||
if verbose {
|
||||
fmt.Fprintf(buf, "* %s says use ", obj.ID)
|
||||
for _, p := range obj.Responses {
|
||||
fmt.Fprintf(buf, "%s ", p.ID)
|
||||
}
|
||||
fmt.Fprintln(buf)
|
||||
}
|
||||
case notif.SendingQuery:
|
||||
if verbose {
|
||||
fmt.Fprintf(buf, "* querying %s\n", obj.ID)
|
||||
}
|
||||
case notif.Value:
|
||||
fmt.Fprintf(buf, "got value: '%s'\n", obj.Extra)
|
||||
case notif.QueryError:
|
||||
fmt.Fprintf(buf, "error: %s\n", obj.Extra)
|
||||
default:
|
||||
fmt.Fprintf(buf, "unrecognized event type: %d\n", obj.Type)
|
||||
}
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
return &cmds.ChannelMarshaler{
|
||||
Channel: outChan,
|
||||
Marshaler: marshal,
|
||||
}, nil
|
||||
},
|
||||
},
|
||||
Type: notif.QueryEvent{},
|
||||
}
|
||||
|
||||
var putValueDhtCmd = &cmds.Command{
|
||||
Helptext: cmds.HelpText{
|
||||
Tagline: "Run a 'PutValue' query through the DHT",
|
||||
ShortDescription: `
|
||||
PutValue will store the given key value pair in the dht.
|
||||
`,
|
||||
},
|
||||
|
||||
Arguments: []cmds.Argument{
|
||||
cmds.StringArg("key", true, false, "The key to store the value at"),
|
||||
cmds.StringArg("value", true, false, "The value to store").EnableStdin(),
|
||||
},
|
||||
Options: []cmds.Option{
|
||||
cmds.BoolOption("verbose", "v", "Write extra information"),
|
||||
},
|
||||
Run: func(req cmds.Request, res cmds.Response) {
|
||||
n, err := req.Context().GetNode()
|
||||
if err != nil {
|
||||
res.SetError(err, cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
|
||||
dht, ok := n.Routing.(*ipdht.IpfsDHT)
|
||||
if !ok {
|
||||
res.SetError(ErrNotDHT, cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
|
||||
outChan := make(chan interface{})
|
||||
res.SetOutput((<-chan interface{})(outChan))
|
||||
|
||||
events := make(chan *notif.QueryEvent)
|
||||
ctx := notif.RegisterForQueryEvents(req.Context().Context, events)
|
||||
|
||||
key := u.B58KeyDecode(req.Arguments()[0])
|
||||
data := req.Arguments()[1]
|
||||
|
||||
go func() {
|
||||
defer close(outChan)
|
||||
for e := range events {
|
||||
outChan <- e
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer close(events)
|
||||
err := dht.PutValue(ctx, key, []byte(data))
|
||||
if err != nil {
|
||||
notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
|
||||
Type: notif.QueryError,
|
||||
Extra: err.Error(),
|
||||
})
|
||||
}
|
||||
}()
|
||||
},
|
||||
Marshalers: cmds.MarshalerMap{
|
||||
cmds.Text: func(res cmds.Response) (io.Reader, error) {
|
||||
outChan, ok := res.Output().(<-chan interface{})
|
||||
if !ok {
|
||||
return nil, u.ErrCast()
|
||||
}
|
||||
|
||||
verbose, _, _ := res.Request().Option("v").Bool()
|
||||
|
||||
marshal := func(v interface{}) (io.Reader, error) {
|
||||
obj, ok := v.(*notif.QueryEvent)
|
||||
if !ok {
|
||||
return nil, u.ErrCast()
|
||||
}
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
if verbose {
|
||||
fmt.Fprintf(buf, "%s: ", time.Now().Format("15:04:05.000"))
|
||||
}
|
||||
switch obj.Type {
|
||||
case notif.FinalPeer:
|
||||
if verbose {
|
||||
fmt.Fprintf(buf, "* closest peer %s\n", obj.ID)
|
||||
}
|
||||
case notif.PeerResponse:
|
||||
if verbose {
|
||||
fmt.Fprintf(buf, "* %s says use ", obj.ID)
|
||||
for _, p := range obj.Responses {
|
||||
fmt.Fprintf(buf, "%s ", p.ID)
|
||||
}
|
||||
fmt.Fprintln(buf)
|
||||
}
|
||||
case notif.SendingQuery:
|
||||
if verbose {
|
||||
fmt.Fprintf(buf, "* querying %s\n", obj.ID)
|
||||
}
|
||||
case notif.QueryError:
|
||||
fmt.Fprintf(buf, "error: %s\n", obj.Extra)
|
||||
case notif.Value:
|
||||
fmt.Fprintf(buf, "storing value at %s\n", obj.ID)
|
||||
default:
|
||||
fmt.Fprintf(buf, "unrecognized event type: %d\n", obj.Type)
|
||||
}
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
return &cmds.ChannelMarshaler{
|
||||
Channel: outChan,
|
||||
Marshaler: marshal,
|
||||
}, nil
|
||||
},
|
||||
},
|
||||
Type: notif.QueryEvent{},
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ const (
|
||||
FinalPeer
|
||||
QueryError
|
||||
Provider
|
||||
Value
|
||||
)
|
||||
|
||||
type QueryEvent struct {
|
||||
|
@ -322,7 +322,7 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) [
|
||||
// == to self? thats bad
|
||||
for _, p := range closer {
|
||||
if p == dht.self {
|
||||
log.Debug("Attempted to return self! this shouldnt happen...")
|
||||
log.Info("Attempted to return self! this shouldnt happen...")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
@ -83,7 +83,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
|
||||
|
||||
// Find closest peer on given cluster to desired key and reply with that info
|
||||
closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
|
||||
if closer != nil {
|
||||
if len(closer) > 0 {
|
||||
closerinfos := peer.PeerInfos(dht.peerstore, closer)
|
||||
for _, pi := range closerinfos {
|
||||
log.Debugf("handleGetValue returning closer peer: '%s'", pi.ID)
|
||||
|
@ -1,7 +1,6 @@
|
||||
package dht
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -59,6 +58,11 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error
|
||||
wg.Add(1)
|
||||
go func(p peer.ID) {
|
||||
defer wg.Done()
|
||||
notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
|
||||
Type: notif.Value,
|
||||
ID: p,
|
||||
})
|
||||
|
||||
err := dht.putValueToPeer(ctx, p, key, rec)
|
||||
if err != nil {
|
||||
log.Debugf("failed putting value to peer: %s", err)
|
||||
@ -92,6 +96,11 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
|
||||
|
||||
// setup the Query
|
||||
query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
|
||||
notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
|
||||
Type: notif.SendingQuery,
|
||||
ID: p,
|
||||
})
|
||||
|
||||
val, peers, err := dht.getValueOrPeers(ctx, p, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -102,6 +111,12 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
|
||||
res.success = true
|
||||
}
|
||||
|
||||
notif.PublishQueryEvent(ctx, ¬if.QueryEvent{
|
||||
Type: notif.PeerResponse,
|
||||
ID: p,
|
||||
Responses: pointerizePeerInfos(peers),
|
||||
})
|
||||
|
||||
return res, nil
|
||||
})
|
||||
|
||||
@ -153,7 +168,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
|
||||
// FindProviders searches until the context expires.
|
||||
func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerInfo, error) {
|
||||
var providers []peer.PeerInfo
|
||||
for p := range dht.FindProvidersAsync(ctx, key, math.MaxInt32) {
|
||||
for p := range dht.FindProvidersAsync(ctx, key, KValue) {
|
||||
providers = append(providers, p)
|
||||
}
|
||||
return providers, nil
|
||||
|
Reference in New Issue
Block a user