diff --git a/commands/cli/parse.go b/commands/cli/parse.go index b8447961c..694951bed 100644 --- a/commands/cli/parse.go +++ b/commands/cli/parse.go @@ -5,7 +5,7 @@ import ( "errors" "fmt" "os" - fp "path/filepath" + fp "path" "runtime" "strings" diff --git a/commands/http/client.go b/commands/http/client.go index 3cbc41153..daaec0994 100644 --- a/commands/http/client.go +++ b/commands/http/client.go @@ -48,14 +48,21 @@ func (c *client) Send(req cmds.Request) (cmds.Response, error) { } var fileReader *MultiFileReader + var reader io.Reader + if req.Files() != nil { fileReader = NewMultiFileReader(req.Files(), true) + reader = fileReader + } else { + // if we have no file data, use an empty Reader + // (http.NewRequest panics when a nil Reader is used) + reader = strings.NewReader("") } path := strings.Join(req.Path(), "/") url := fmt.Sprintf(ApiUrlFormat, c.serverAddress, ApiPath, path, query) - httpReq, err := http.NewRequest("POST", url, fileReader) + httpReq, err := http.NewRequest("POST", url, reader) if err != nil { return nil, err } @@ -100,15 +107,21 @@ func getQuery(req cmds.Request) (string, error) { args := req.Arguments() argDefs := req.Command().Arguments - var argDef cmds.Argument - for i, arg := range args { - if i < len(argDefs) { - argDef = argDefs[i] + argDefIndex := 0 + + for _, arg := range args { + argDef := argDefs[argDefIndex] + // skip ArgFiles + for argDef.Type == cmds.ArgFile { + argDefIndex++ + argDef = argDefs[argDefIndex] } - if argDef.Type == cmds.ArgString { - query.Add("arg", arg) + query.Add("arg", arg) + + if len(argDefs) > argDefIndex+1 { + argDefIndex++ } } diff --git a/core/commands/add.go b/core/commands/add.go index 92ceeb76d..176824a9f 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -26,7 +26,7 @@ type AddOutput struct { Quiet bool } -var addCmd = &cmds.Command{ +var AddCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Add an object to ipfs.", ShortDescription: ` diff --git a/core/commands/block.go b/core/commands/block.go index 3efee084a..326dac2fa 100644 --- a/core/commands/block.go +++ b/core/commands/block.go @@ -18,7 +18,7 @@ type Block struct { Length int } -var blockCmd = &cmds.Command{ +var BlockCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Manipulate raw IPFS blocks", ShortDescription: ` diff --git a/core/commands/bootstrap.go b/core/commands/bootstrap.go index 4b68a67c6..37ff44e9e 100644 --- a/core/commands/bootstrap.go +++ b/core/commands/bootstrap.go @@ -19,13 +19,13 @@ type BootstrapOutput struct { var peerOptionDesc = "A peer to add to the bootstrap list (in the format '/')" -var bootstrapCmd = &cmds.Command{ +var BootstrapCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Show or edit the list of bootstrap peers", Synopsis: ` ipfs bootstrap list - Show peers in the bootstrap list ipfs bootstrap add ... - Add peers to the bootstrap list -ipfs bootstrap remove ... - Removes peers from the bootstrap list +ipfs bootstrap rm ... - Removes peers from the bootstrap list `, ShortDescription: ` Running 'ipfs bootstrap' with no arguments will run 'ipfs bootstrap list'. diff --git a/core/commands/cat.go b/core/commands/cat.go index dd5187a10..01b6142c7 100644 --- a/core/commands/cat.go +++ b/core/commands/cat.go @@ -8,7 +8,7 @@ import ( uio "github.com/jbenet/go-ipfs/unixfs/io" ) -var catCmd = &cmds.Command{ +var CatCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Show IPFS object data", ShortDescription: ` diff --git a/core/commands/config.go b/core/commands/config.go index 2e58622d0..8b15b7550 100644 --- a/core/commands/config.go +++ b/core/commands/config.go @@ -19,7 +19,7 @@ type ConfigField struct { Value interface{} } -var configCmd = &cmds.Command{ +var ConfigCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "get and set IPFS config values", Synopsis: ` diff --git a/core/commands/id.go b/core/commands/id.go index 5ce38b543..25bc9fdc3 100644 --- a/core/commands/id.go +++ b/core/commands/id.go @@ -31,7 +31,7 @@ type IdOutput struct { ProtocolVersion string } -var idCmd = &cmds.Command{ +var IDCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Show IPFS Node ID info", ShortDescription: ` @@ -60,7 +60,8 @@ if no peer is specified, prints out local peers info. } ctx, _ := context.WithTimeout(context.TODO(), time.Second*5) - if node.Routing == nil { + // TODO handle offline mode with polymorphism instead of conditionals + if !node.OnlineMode() { return nil, errors.New(offlineIdErrorMessage) } diff --git a/core/commands/ls.go b/core/commands/ls.go index 7b3db8eb5..87f65b278 100644 --- a/core/commands/ls.go +++ b/core/commands/ls.go @@ -21,7 +21,7 @@ type LsOutput struct { Objects []Object } -var lsCmd = &cmds.Command{ +var LsCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "List links from an object.", ShortDescription: ` diff --git a/core/commands/mount_unix.go b/core/commands/mount_unix.go index 42763768f..a9b816ae0 100644 --- a/core/commands/mount_unix.go +++ b/core/commands/mount_unix.go @@ -22,7 +22,7 @@ const mountTimeout = time.Second // fuseNoDirectory used to check the returning fuse error const fuseNoDirectory = "fusermount: failed to access mountpoint" -var mountCmd = &cmds.Command{ +var MountCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Mounts IPFS to the filesystem (read-only)", Synopsis: ` diff --git a/core/commands/mount_windows.go b/core/commands/mount_windows.go index 27b13381b..01bcc698c 100644 --- a/core/commands/mount_windows.go +++ b/core/commands/mount_windows.go @@ -7,7 +7,7 @@ import ( "github.com/jbenet/go-ipfs/core" ) -var mountCmd = &cmds.Command{ +var MountCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Not yet implemented on Windows", ShortDescription: "Not yet implemented on Windows. :(", diff --git a/core/commands/name.go b/core/commands/name.go index a50d02d43..95d1a876e 100644 --- a/core/commands/name.go +++ b/core/commands/name.go @@ -7,7 +7,7 @@ type IpnsEntry struct { Value string } -var nameCmd = &cmds.Command{ +var NameCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "IPFS namespace (IPNS) tool", Synopsis: ` diff --git a/core/commands/object.go b/core/commands/object.go index b4f34e318..5e85ddc18 100644 --- a/core/commands/object.go +++ b/core/commands/object.go @@ -24,7 +24,7 @@ type Node struct { Data []byte } -var objectCmd = &cmds.Command{ +var ObjectCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Interact with ipfs objects", ShortDescription: ` diff --git a/core/commands/pin.go b/core/commands/pin.go index 341d7dd4a..35e8e62cc 100644 --- a/core/commands/pin.go +++ b/core/commands/pin.go @@ -9,7 +9,7 @@ import ( u "github.com/jbenet/go-ipfs/util" ) -var pinCmd = &cmds.Command{ +var PinCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Pin (and unpin) objects to local storage", }, diff --git a/core/commands/refs.go b/core/commands/refs.go index c977a3df8..e3cbfaab9 100644 --- a/core/commands/refs.go +++ b/core/commands/refs.go @@ -25,7 +25,7 @@ func KeyListTextMarshaler(res cmds.Response) ([]byte, error) { return []byte(s), nil } -var refsCmd = &cmds.Command{ +var RefsCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Lists link hashes from an object", ShortDescription: ` diff --git a/core/commands/root.go b/core/commands/root.go index 450b6b7e8..ae192e5c8 100644 --- a/core/commands/root.go +++ b/core/commands/root.go @@ -62,24 +62,24 @@ Use 'ipfs --help' to learn more about each command. var CommandsDaemonCmd = CommandsCmd(Root) var rootSubcommands = map[string]*cmds.Command{ - "cat": catCmd, - "ls": lsCmd, + "add": AddCmd, + "block": BlockCmd, + "bootstrap": BootstrapCmd, + "cat": CatCmd, "commands": CommandsDaemonCmd, - "name": nameCmd, - "add": addCmd, - "log": LogCmd, + "config": ConfigCmd, "diag": DiagCmd, - "pin": pinCmd, - "version": VersionCmd, - "config": configCmd, - "bootstrap": bootstrapCmd, - "mount": mountCmd, - "block": blockCmd, + "id": IDCmd, + "log": LogCmd, + "ls": LsCmd, + "mount": MountCmd, + "name": NameCmd, + "object": ObjectCmd, + "pin": PinCmd, + "refs": RefsCmd, + "swarm": SwarmCmd, "update": UpdateCmd, - "object": objectCmd, - "refs": refsCmd, - "id": idCmd, - "swarm": swarmCmd, + "version": VersionCmd, } func init() { diff --git a/core/commands/swarm.go b/core/commands/swarm.go index a450a3d94..743f46904 100644 --- a/core/commands/swarm.go +++ b/core/commands/swarm.go @@ -17,7 +17,7 @@ type stringList struct { Strings []string } -var swarmCmd = &cmds.Command{ +var SwarmCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "swarm inspection tool", Synopsis: ` diff --git a/net/conn/multiconn.go b/net/conn/multiconn.go index 6c8752308..01b91c6b1 100644 --- a/net/conn/multiconn.go +++ b/net/conn/multiconn.go @@ -262,6 +262,16 @@ func (c *MultiConn) ID() string { return string(ids) } +func (c *MultiConn) getConns() []Conn { + c.RLock() + defer c.RUnlock() + var conns []Conn + for _, c := range c.conns { + conns = append(conns, c) + } + return conns +} + func (c *MultiConn) String() string { return String(c, "MultiConn") } diff --git a/net/conn/multiconn_test.go b/net/conn/multiconn_test.go index b511d9a3b..03ed2e369 100644 --- a/net/conn/multiconn_test.go +++ b/net/conn/multiconn_test.go @@ -307,11 +307,11 @@ func TestMulticonnClose(t *testing.T) { ctx := context.Background() c1, c2 := setupMultiConns(t, ctx) - for _, c := range c1.conns { + for _, c := range c1.getConns() { c.Close() } - for _, c := range c2.conns { + for _, c := range c2.getConns() { c.Close() } diff --git a/net/mux/mux.go b/net/mux/mux.go index d971e9054..4f54890e3 100644 --- a/net/mux/mux.go +++ b/net/mux/mux.go @@ -174,8 +174,7 @@ func (m *Muxer) handleOutgoingMessages(pid pb.ProtocolID, proto Protocol) { if !more { return } - m.Children().Add(1) - go m.handleOutgoingMessage(pid, msg) + m.handleOutgoingMessage(pid, msg) case <-m.Closing(): return @@ -185,7 +184,6 @@ func (m *Muxer) handleOutgoingMessages(pid pb.ProtocolID, proto Protocol) { // handleOutgoingMessage wraps out a message and sends it out the func (m *Muxer) handleOutgoingMessage(pid pb.ProtocolID, m1 msg.NetMessage) { - defer m.Children().Done() data, err := wrapData(m1.Data(), pid) if err != nil { diff --git a/routing/dht/dht.go b/routing/dht/dht.go index f76ca8f59..0277f644b 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -161,7 +161,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N } // dispatch handler. - rpmes, err := handler(mPeer, pmes) + rpmes, err := handler(ctx, mPeer, pmes) if err != nil { log.Errorf("handle message error: %s", err) return nil @@ -194,24 +194,21 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Messa start := time.Now() - log.Event(ctx, "sentMessage", dht.self, p, pmes) - - rmes, err := dht.sender.SendRequest(ctx, mes) + rmes, err := dht.sender.SendRequest(ctx, mes) // respect? if err != nil { return nil, err } if rmes == nil { return nil, errors.New("no response to request") } + log.Event(ctx, "sentMessage", dht.self, p, pmes) - rtt := time.Since(start) - rmes.Peer().SetLatency(rtt) + rmes.Peer().SetLatency(time.Since(start)) rpmes := new(pb.Message) if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil { return nil, err } - return rpmes, nil } diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index bd4b813ee..07f21f18a 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -5,20 +5,19 @@ import ( "fmt" "time" - "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" - + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" peer "github.com/jbenet/go-ipfs/peer" pb "github.com/jbenet/go-ipfs/routing/dht/pb" u "github.com/jbenet/go-ipfs/util" - - ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ) // The number of closer peers to send on requests. var CloserPeerCount = 4 // dhthandler specifies the signature of functions that handle DHT messages. -type dhtHandler func(peer.Peer, *pb.Message) (*pb.Message, error) +type dhtHandler func(context.Context, peer.Peer, *pb.Message) (*pb.Message, error) func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler { switch t { @@ -39,7 +38,7 @@ func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler { } } -func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *pb.Message) (*pb.Message, error) { +func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) { log.Debugf("%s handleGetValue for key: %s\n", dht.self, pmes.GetKey()) // setup response @@ -85,7 +84,7 @@ func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *pb.Message) (*pb.Message, } // if we know any providers for the requested value, return those. - provs := dht.providers.GetProviders(u.Key(pmes.GetKey())) + provs := dht.providers.GetProviders(ctx, u.Key(pmes.GetKey())) if len(provs) > 0 { log.Debugf("handleGetValue returning %d provider[s]", len(provs)) resp.ProviderPeers = pb.PeersToPBPeers(provs) @@ -107,7 +106,7 @@ func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *pb.Message) (*pb.Message, } // Store a value in this peer local storage -func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *pb.Message) (*pb.Message, error) { +func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) { dht.dslock.Lock() defer dht.dslock.Unlock() dskey := u.Key(pmes.GetKey()).DsKey() @@ -129,12 +128,12 @@ func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *pb.Message) (*pb.Message, return pmes, err } -func (dht *IpfsDHT) handlePing(p peer.Peer, pmes *pb.Message) (*pb.Message, error) { +func (dht *IpfsDHT) handlePing(_ context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) { log.Debugf("%s Responding to ping from %s!\n", dht.self, p) return pmes, nil } -func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *pb.Message) (*pb.Message, error) { +func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) { resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel()) var closest []peer.Peer @@ -164,7 +163,7 @@ func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *pb.Message) (*pb.Message, return resp, nil } -func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *pb.Message) (*pb.Message, error) { +func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) { resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) // check if we have this value, to add ourselves as provider. @@ -177,7 +176,7 @@ func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *pb.Message) (*pb.Messa } // setup providers - providers := dht.providers.GetProviders(u.Key(pmes.GetKey())) + providers := dht.providers.GetProviders(ctx, u.Key(pmes.GetKey())) if has { providers = append(providers, dht.self) } @@ -201,7 +200,7 @@ type providerInfo struct { Value peer.Peer } -func (dht *IpfsDHT) handleAddProvider(p peer.Peer, pmes *pb.Message) (*pb.Message, error) { +func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) { key := u.Key(pmes.GetKey()) log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, peer.ID(key)) diff --git a/routing/dht/providers.go b/routing/dht/providers.go index f7d491d6a..7f70056d3 100644 --- a/routing/dht/providers.go +++ b/routing/dht/providers.go @@ -101,12 +101,17 @@ func (pm *ProviderManager) AddProvider(k u.Key, val peer.Peer) { } } -func (pm *ProviderManager) GetProviders(k u.Key) []peer.Peer { - gp := new(getProv) - gp.k = k - gp.resp = make(chan []peer.Peer) - pm.getprovs <- gp - return <-gp.resp +func (pm *ProviderManager) GetProviders(ctx context.Context, k u.Key) []peer.Peer { + gp := &getProv{ + k: k, + resp: make(chan []peer.Peer, 1), // buffered to prevent sender from blocking + } + select { + case pm.getprovs <- gp: + return <-gp.resp + case <-ctx.Done(): + return nil + } } func (pm *ProviderManager) GetLocal() []u.Key { diff --git a/routing/dht/providers_test.go b/routing/dht/providers_test.go index c4ae53910..1ae85fbc4 100644 --- a/routing/dht/providers_test.go +++ b/routing/dht/providers_test.go @@ -15,7 +15,7 @@ func TestProviderManager(t *testing.T) { p := NewProviderManager(ctx, mid) a := u.Key("test") p.AddProvider(a, peer.WithIDString("testingprovider")) - resp := p.GetProviders(a) + resp := p.GetProviders(ctx, a) if len(resp) != 1 { t.Fatal("Could not retrieve provider.") } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index f0bfbe485..134e54ccb 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -129,19 +129,26 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int log.Event(ctx, "findProviders", &key) peerOut := make(chan peer.Peer, count) go func() { + defer close(peerOut) + ps := newPeerSet() - provs := dht.providers.GetProviders(key) + // TODO may want to make this function async to hide latency + provs := dht.providers.GetProviders(ctx, key) for _, p := range provs { count-- // NOTE: assuming that this list of peers is unique ps.Add(p) - peerOut <- p + select { + case peerOut <- p: + case <-ctx.Done(): + return + } if count <= 0 { return } } - wg := new(sync.WaitGroup) + var wg sync.WaitGroup peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue) for _, pp := range peers { wg.Add(1) @@ -156,16 +163,16 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int }(pp) } wg.Wait() - close(peerOut) }() return peerOut } func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *peerSet, count int, out chan peer.Peer) { - done := make(chan struct{}) + var wg sync.WaitGroup for _, pbp := range peers { + wg.Add(1) go func(mp *pb.Message_Peer) { - defer func() { done <- struct{}{} }() + defer wg.Done() // construct new peer p, err := dht.ensureConnectedToPeer(ctx, mp) if err != nil { @@ -179,15 +186,17 @@ func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.M dht.providers.AddProvider(k, p) if ps.AddIfSmallerThan(p, count) { - out <- p + select { + case out <- p: + case <-ctx.Done(): + return + } } else if ps.Size() >= count { return } }(pbp) } - for _ = range peers { - <-done - } + wg.Wait() } // Find specific Peer