1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-07-01 02:30:39 +08:00

Merge pull request #395 from jbenet/misc/2014-11-11th-hour

miscellaneous +/- (commands, dht, net)
This commit is contained in:
Brian Tiger Chow
2014-12-05 21:00:40 -08:00
25 changed files with 113 additions and 81 deletions

View File

@ -5,7 +5,7 @@ import (
"errors"
"fmt"
"os"
fp "path/filepath"
fp "path"
"runtime"
"strings"

View File

@ -48,14 +48,21 @@ func (c *client) Send(req cmds.Request) (cmds.Response, error) {
}
var fileReader *MultiFileReader
var reader io.Reader
if req.Files() != nil {
fileReader = NewMultiFileReader(req.Files(), true)
reader = fileReader
} else {
// if we have no file data, use an empty Reader
// (http.NewRequest panics when a nil Reader is used)
reader = strings.NewReader("")
}
path := strings.Join(req.Path(), "/")
url := fmt.Sprintf(ApiUrlFormat, c.serverAddress, ApiPath, path, query)
httpReq, err := http.NewRequest("POST", url, fileReader)
httpReq, err := http.NewRequest("POST", url, reader)
if err != nil {
return nil, err
}
@ -100,15 +107,21 @@ func getQuery(req cmds.Request) (string, error) {
args := req.Arguments()
argDefs := req.Command().Arguments
var argDef cmds.Argument
for i, arg := range args {
if i < len(argDefs) {
argDef = argDefs[i]
argDefIndex := 0
for _, arg := range args {
argDef := argDefs[argDefIndex]
// skip ArgFiles
for argDef.Type == cmds.ArgFile {
argDefIndex++
argDef = argDefs[argDefIndex]
}
if argDef.Type == cmds.ArgString {
query.Add("arg", arg)
query.Add("arg", arg)
if len(argDefs) > argDefIndex+1 {
argDefIndex++
}
}

View File

@ -26,7 +26,7 @@ type AddOutput struct {
Quiet bool
}
var addCmd = &cmds.Command{
var AddCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Add an object to ipfs.",
ShortDescription: `

View File

@ -18,7 +18,7 @@ type Block struct {
Length int
}
var blockCmd = &cmds.Command{
var BlockCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Manipulate raw IPFS blocks",
ShortDescription: `

View File

@ -19,13 +19,13 @@ type BootstrapOutput struct {
var peerOptionDesc = "A peer to add to the bootstrap list (in the format '<multiaddr>/<peerID>')"
var bootstrapCmd = &cmds.Command{
var BootstrapCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Show or edit the list of bootstrap peers",
Synopsis: `
ipfs bootstrap list - Show peers in the bootstrap list
ipfs bootstrap add <peer>... - Add peers to the bootstrap list
ipfs bootstrap remove <peer>... - Removes peers from the bootstrap list
ipfs bootstrap rm <peer>... - Removes peers from the bootstrap list
`,
ShortDescription: `
Running 'ipfs bootstrap' with no arguments will run 'ipfs bootstrap list'.

View File

@ -8,7 +8,7 @@ import (
uio "github.com/jbenet/go-ipfs/unixfs/io"
)
var catCmd = &cmds.Command{
var CatCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Show IPFS object data",
ShortDescription: `

View File

@ -19,7 +19,7 @@ type ConfigField struct {
Value interface{}
}
var configCmd = &cmds.Command{
var ConfigCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "get and set IPFS config values",
Synopsis: `

View File

@ -31,7 +31,7 @@ type IdOutput struct {
ProtocolVersion string
}
var idCmd = &cmds.Command{
var IDCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Show IPFS Node ID info",
ShortDescription: `
@ -60,7 +60,8 @@ if no peer is specified, prints out local peers info.
}
ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
if node.Routing == nil {
// TODO handle offline mode with polymorphism instead of conditionals
if !node.OnlineMode() {
return nil, errors.New(offlineIdErrorMessage)
}

View File

@ -21,7 +21,7 @@ type LsOutput struct {
Objects []Object
}
var lsCmd = &cmds.Command{
var LsCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "List links from an object.",
ShortDescription: `

View File

@ -22,7 +22,7 @@ const mountTimeout = time.Second
// fuseNoDirectory used to check the returning fuse error
const fuseNoDirectory = "fusermount: failed to access mountpoint"
var mountCmd = &cmds.Command{
var MountCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Mounts IPFS to the filesystem (read-only)",
Synopsis: `

View File

@ -7,7 +7,7 @@ import (
"github.com/jbenet/go-ipfs/core"
)
var mountCmd = &cmds.Command{
var MountCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Not yet implemented on Windows",
ShortDescription: "Not yet implemented on Windows. :(",

View File

@ -7,7 +7,7 @@ type IpnsEntry struct {
Value string
}
var nameCmd = &cmds.Command{
var NameCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "IPFS namespace (IPNS) tool",
Synopsis: `

View File

@ -24,7 +24,7 @@ type Node struct {
Data []byte
}
var objectCmd = &cmds.Command{
var ObjectCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Interact with ipfs objects",
ShortDescription: `

View File

@ -9,7 +9,7 @@ import (
u "github.com/jbenet/go-ipfs/util"
)
var pinCmd = &cmds.Command{
var PinCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Pin (and unpin) objects to local storage",
},

View File

@ -25,7 +25,7 @@ func KeyListTextMarshaler(res cmds.Response) ([]byte, error) {
return []byte(s), nil
}
var refsCmd = &cmds.Command{
var RefsCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Lists link hashes from an object",
ShortDescription: `

View File

@ -62,24 +62,24 @@ Use 'ipfs <command> --help' to learn more about each command.
var CommandsDaemonCmd = CommandsCmd(Root)
var rootSubcommands = map[string]*cmds.Command{
"cat": catCmd,
"ls": lsCmd,
"add": AddCmd,
"block": BlockCmd,
"bootstrap": BootstrapCmd,
"cat": CatCmd,
"commands": CommandsDaemonCmd,
"name": nameCmd,
"add": addCmd,
"log": LogCmd,
"config": ConfigCmd,
"diag": DiagCmd,
"pin": pinCmd,
"version": VersionCmd,
"config": configCmd,
"bootstrap": bootstrapCmd,
"mount": mountCmd,
"block": blockCmd,
"id": IDCmd,
"log": LogCmd,
"ls": LsCmd,
"mount": MountCmd,
"name": NameCmd,
"object": ObjectCmd,
"pin": PinCmd,
"refs": RefsCmd,
"swarm": SwarmCmd,
"update": UpdateCmd,
"object": objectCmd,
"refs": refsCmd,
"id": idCmd,
"swarm": swarmCmd,
"version": VersionCmd,
}
func init() {

View File

@ -17,7 +17,7 @@ type stringList struct {
Strings []string
}
var swarmCmd = &cmds.Command{
var SwarmCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "swarm inspection tool",
Synopsis: `

View File

@ -262,6 +262,16 @@ func (c *MultiConn) ID() string {
return string(ids)
}
func (c *MultiConn) getConns() []Conn {
c.RLock()
defer c.RUnlock()
var conns []Conn
for _, c := range c.conns {
conns = append(conns, c)
}
return conns
}
func (c *MultiConn) String() string {
return String(c, "MultiConn")
}

View File

@ -307,11 +307,11 @@ func TestMulticonnClose(t *testing.T) {
ctx := context.Background()
c1, c2 := setupMultiConns(t, ctx)
for _, c := range c1.conns {
for _, c := range c1.getConns() {
c.Close()
}
for _, c := range c2.conns {
for _, c := range c2.getConns() {
c.Close()
}

View File

@ -174,8 +174,7 @@ func (m *Muxer) handleOutgoingMessages(pid pb.ProtocolID, proto Protocol) {
if !more {
return
}
m.Children().Add(1)
go m.handleOutgoingMessage(pid, msg)
m.handleOutgoingMessage(pid, msg)
case <-m.Closing():
return
@ -185,7 +184,6 @@ func (m *Muxer) handleOutgoingMessages(pid pb.ProtocolID, proto Protocol) {
// handleOutgoingMessage wraps out a message and sends it out the
func (m *Muxer) handleOutgoingMessage(pid pb.ProtocolID, m1 msg.NetMessage) {
defer m.Children().Done()
data, err := wrapData(m1.Data(), pid)
if err != nil {

View File

@ -161,7 +161,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
}
// dispatch handler.
rpmes, err := handler(mPeer, pmes)
rpmes, err := handler(ctx, mPeer, pmes)
if err != nil {
log.Errorf("handle message error: %s", err)
return nil
@ -194,24 +194,21 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Messa
start := time.Now()
log.Event(ctx, "sentMessage", dht.self, p, pmes)
rmes, err := dht.sender.SendRequest(ctx, mes)
rmes, err := dht.sender.SendRequest(ctx, mes) // respect?
if err != nil {
return nil, err
}
if rmes == nil {
return nil, errors.New("no response to request")
}
log.Event(ctx, "sentMessage", dht.self, p, pmes)
rtt := time.Since(start)
rmes.Peer().SetLatency(rtt)
rmes.Peer().SetLatency(time.Since(start))
rpmes := new(pb.Message)
if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
return nil, err
}
return rpmes, nil
}

View File

@ -5,20 +5,19 @@ import (
"fmt"
"time"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
peer "github.com/jbenet/go-ipfs/peer"
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
u "github.com/jbenet/go-ipfs/util"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
)
// The number of closer peers to send on requests.
var CloserPeerCount = 4
// dhthandler specifies the signature of functions that handle DHT messages.
type dhtHandler func(peer.Peer, *pb.Message) (*pb.Message, error)
type dhtHandler func(context.Context, peer.Peer, *pb.Message) (*pb.Message, error)
func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
switch t {
@ -39,7 +38,7 @@ func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
}
}
func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
log.Debugf("%s handleGetValue for key: %s\n", dht.self, pmes.GetKey())
// setup response
@ -85,7 +84,7 @@ func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *pb.Message) (*pb.Message,
}
// if we know any providers for the requested value, return those.
provs := dht.providers.GetProviders(u.Key(pmes.GetKey()))
provs := dht.providers.GetProviders(ctx, u.Key(pmes.GetKey()))
if len(provs) > 0 {
log.Debugf("handleGetValue returning %d provider[s]", len(provs))
resp.ProviderPeers = pb.PeersToPBPeers(provs)
@ -107,7 +106,7 @@ func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *pb.Message) (*pb.Message,
}
// Store a value in this peer local storage
func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
dht.dslock.Lock()
defer dht.dslock.Unlock()
dskey := u.Key(pmes.GetKey()).DsKey()
@ -129,12 +128,12 @@ func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *pb.Message) (*pb.Message,
return pmes, err
}
func (dht *IpfsDHT) handlePing(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
func (dht *IpfsDHT) handlePing(_ context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
log.Debugf("%s Responding to ping from %s!\n", dht.self, p)
return pmes, nil
}
func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel())
var closest []peer.Peer
@ -164,7 +163,7 @@ func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *pb.Message) (*pb.Message,
return resp, nil
}
func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
// check if we have this value, to add ourselves as provider.
@ -177,7 +176,7 @@ func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *pb.Message) (*pb.Messa
}
// setup providers
providers := dht.providers.GetProviders(u.Key(pmes.GetKey()))
providers := dht.providers.GetProviders(ctx, u.Key(pmes.GetKey()))
if has {
providers = append(providers, dht.self)
}
@ -201,7 +200,7 @@ type providerInfo struct {
Value peer.Peer
}
func (dht *IpfsDHT) handleAddProvider(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
key := u.Key(pmes.GetKey())
log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, peer.ID(key))

View File

@ -101,12 +101,17 @@ func (pm *ProviderManager) AddProvider(k u.Key, val peer.Peer) {
}
}
func (pm *ProviderManager) GetProviders(k u.Key) []peer.Peer {
gp := new(getProv)
gp.k = k
gp.resp = make(chan []peer.Peer)
pm.getprovs <- gp
return <-gp.resp
func (pm *ProviderManager) GetProviders(ctx context.Context, k u.Key) []peer.Peer {
gp := &getProv{
k: k,
resp: make(chan []peer.Peer, 1), // buffered to prevent sender from blocking
}
select {
case pm.getprovs <- gp:
return <-gp.resp
case <-ctx.Done():
return nil
}
}
func (pm *ProviderManager) GetLocal() []u.Key {

View File

@ -15,7 +15,7 @@ func TestProviderManager(t *testing.T) {
p := NewProviderManager(ctx, mid)
a := u.Key("test")
p.AddProvider(a, peer.WithIDString("testingprovider"))
resp := p.GetProviders(a)
resp := p.GetProviders(ctx, a)
if len(resp) != 1 {
t.Fatal("Could not retrieve provider.")
}

View File

@ -129,19 +129,26 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
log.Event(ctx, "findProviders", &key)
peerOut := make(chan peer.Peer, count)
go func() {
defer close(peerOut)
ps := newPeerSet()
provs := dht.providers.GetProviders(key)
// TODO may want to make this function async to hide latency
provs := dht.providers.GetProviders(ctx, key)
for _, p := range provs {
count--
// NOTE: assuming that this list of peers is unique
ps.Add(p)
peerOut <- p
select {
case peerOut <- p:
case <-ctx.Done():
return
}
if count <= 0 {
return
}
}
wg := new(sync.WaitGroup)
var wg sync.WaitGroup
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue)
for _, pp := range peers {
wg.Add(1)
@ -156,16 +163,16 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
}(pp)
}
wg.Wait()
close(peerOut)
}()
return peerOut
}
func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *peerSet, count int, out chan peer.Peer) {
done := make(chan struct{})
var wg sync.WaitGroup
for _, pbp := range peers {
wg.Add(1)
go func(mp *pb.Message_Peer) {
defer func() { done <- struct{}{} }()
defer wg.Done()
// construct new peer
p, err := dht.ensureConnectedToPeer(ctx, mp)
if err != nil {
@ -179,15 +186,17 @@ func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.M
dht.providers.AddProvider(k, p)
if ps.AddIfSmallerThan(p, count) {
out <- p
select {
case out <- p:
case <-ctx.Done():
return
}
} else if ps.Size() >= count {
return
}
}(pbp)
}
for _ = range peers {
<-done
}
wg.Wait()
}
// Find specific Peer