diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 5810e9613..0b08c1316 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -196,7 +196,7 @@ }, { "ImportPath": "github.com/jbenet/go-peerstream", - "Rev": "cfdc29a19c1a209d548670f5c33c5cda2e040143" + "Rev": "f90119e97e8be7b2bdd5e598067b0dc44df63381" }, { "ImportPath": "github.com/jbenet/go-random", diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go index 181ae4687..2e066c2f4 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go @@ -44,6 +44,9 @@ type Conn struct { streams map[*Stream]struct{} streamLock sync.RWMutex + + closed bool + closeLock sync.Mutex } func newConn(nconn net.Conn, tconn smux.Conn, s *Swarm) *Conn { @@ -114,6 +117,14 @@ func (c *Conn) Streams() []*Stream { // Close closes this connection func (c *Conn) Close() error { + c.closeLock.Lock() + defer c.closeLock.Unlock() + + if c.closed { + return nil + } + c.closed = true + // close streams streams := c.Streams() for _, s := range streams { diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index 3b4626a4d..2fae23515 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -56,6 +56,8 @@ type msgQueue struct { out bsmsg.BitSwapMessage network bsnet.BitSwapNetwork + refcnt int + work chan struct{} done chan struct{} } @@ -101,13 +103,13 @@ func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) { } func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue { - _, ok := pm.peers[p] + mq, ok := pm.peers[p] if ok { - // TODO: log an error? + mq.refcnt++ return nil } - mq := pm.newMsgQueue(p) + mq = pm.newMsgQueue(p) // new peer, we will want to give them our full wantlist fullwantlist := bsmsg.New(true) @@ -129,6 +131,11 @@ func (pm *WantManager) stopPeerHandler(p peer.ID) { return } + pq.refcnt-- + if pq.refcnt > 0 { + return + } + close(pq.done) delete(pm.peers, p) } @@ -247,6 +254,7 @@ func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue { mq.work = make(chan struct{}, 1) mq.network = wm.network mq.p = p + mq.refcnt = 1 return mq } diff --git a/test/dependencies/iptb/main.go b/test/dependencies/iptb/main.go index 013c46f8e..fd0bd9063 100644 --- a/test/dependencies/iptb/main.go +++ b/test/dependencies/iptb/main.go @@ -73,13 +73,19 @@ type initCfg struct { Bootstrap string PortStart int Mdns bool + Utp bool } func (c *initCfg) swarmAddrForPeer(i int) string { - if c.PortStart == 0 { - return "/ip4/0.0.0.0/tcp/0" + str := "/ip4/0.0.0.0/tcp/%d" + if c.Utp { + str = "/ip4/0.0.0.0/udp/%d/utp" } - return fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", c.PortStart+i) + + if c.PortStart == 0 { + return fmt.Sprintf(str, 0) + } + return fmt.Sprintf(str, c.PortStart+i) } func (c *initCfg) apiAddrForPeer(i int) string { @@ -250,6 +256,20 @@ func IpfsKillAll() error { return nil } +func envForDaemon(n int) []string { + envs := os.Environ() + npath := "IPFS_PATH=" + IpfsDirN(n) + for i, e := range envs { + p := strings.Split(e, "=") + if p[0] == "IPFS_PATH" { + envs[i] = npath + return envs + } + } + + return append(envs, npath) +} + func IpfsStart(waitall bool) error { var addrs []string n := GetNumNodes() @@ -257,7 +277,7 @@ func IpfsStart(waitall bool) error { dir := IpfsDirN(i) cmd := exec.Command("ipfs", "daemon") cmd.Dir = dir - cmd.Env = append(os.Environ(), "IPFS_PATH="+dir) + cmd.Env = envForDaemon(i) cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true} @@ -441,6 +461,11 @@ func IpfsShell(n int) error { } func ConnectNodes(from, to int) error { + if from == to { + // skip connecting to self.. + return nil + } + fmt.Printf("connecting %d -> %d\n", from, to) cmd := exec.Command("ipfs", "id", "-f", "") cmd.Env = []string{"IPFS_PATH=" + IpfsDirN(to)} out, err := cmd.Output() @@ -449,7 +474,6 @@ func ConnectNodes(from, to int) error { return err } addr := strings.Split(string(out), "\n")[0] - fmt.Println("ADDR: ", addr) connectcmd := exec.Command("ipfs", "swarm", "connect", addr) connectcmd.Env = []string{"IPFS_PATH=" + IpfsDirN(from)} @@ -461,6 +485,55 @@ func ConnectNodes(from, to int) error { return nil } +func parseRange(s string) ([]int, error) { + if strings.HasPrefix(s, "[") && strings.HasSuffix(s, "]") { + ranges := strings.Split(s[1:len(s)-1], ",") + var out []int + for _, r := range ranges { + rng, err := expandDashRange(r) + if err != nil { + return nil, err + } + + out = append(out, rng...) + } + return out, nil + } else { + i, err := strconv.Atoi(s) + if err != nil { + return nil, err + } + + return []int{i}, nil + } +} + +func expandDashRange(s string) ([]int, error) { + parts := strings.Split(s, "-") + if len(parts) == 0 { + i, err := strconv.Atoi(s) + if err != nil { + return nil, err + } + return []int{i}, nil + } + low, err := strconv.Atoi(parts[0]) + if err != nil { + return nil, err + } + + hi, err := strconv.Atoi(parts[1]) + if err != nil { + return nil, err + } + + var out []int + for i := low; i <= hi; i++ { + out = append(out, i) + } + return out, nil +} + func GetAttr(attr string, node int) (string, error) { switch attr { case "id": @@ -518,9 +591,10 @@ func main() { cfg := new(initCfg) kingpin.Flag("n", "number of ipfs nodes to initialize").Short('n').IntVar(&cfg.Count) kingpin.Flag("port", "port to start allocations from").Default("4002").Short('p').IntVar(&cfg.PortStart) - kingpin.Flag("f", "force initialization (overwrite existing configs)").BoolVar(&cfg.Force) + kingpin.Flag("force", "force initialization (overwrite existing configs)").Short('f').BoolVar(&cfg.Force) kingpin.Flag("mdns", "turn on mdns for nodes").BoolVar(&cfg.Mdns) kingpin.Flag("bootstrap", "select bootstrapping style for cluster").Default("star").StringVar(&cfg.Bootstrap) + kingpin.Flag("utp", "use utp for addresses").BoolVar(&cfg.Utp) wait := kingpin.Flag("wait", "wait for nodes to come fully online before exiting").Bool() @@ -576,22 +650,26 @@ func main() { os.Exit(1) } - from, err := strconv.Atoi(args[1]) + from, err := parseRange(args[1]) if err != nil { fmt.Printf("failed to parse: %s\n", err) return } - to, err := strconv.Atoi(args[2]) + to, err := parseRange(args[2]) if err != nil { fmt.Printf("failed to parse: %s\n", err) return } - err = ConnectNodes(from, to) - if err != nil { - fmt.Printf("failed to connect: %s\n", err) - return + for _, f := range from { + for _, t := range to { + err = ConnectNodes(f, t) + if err != nil { + fmt.Printf("failed to connect: %s\n", err) + return + } + } } case "get": diff --git a/test/sharness/t0130-multinode.sh b/test/sharness/t0130-multinode.sh new file mode 100755 index 000000000..7ba364ec5 --- /dev/null +++ b/test/sharness/t0130-multinode.sh @@ -0,0 +1,81 @@ +#!/bin/sh +# +# Copyright (c) 2015 Jeromy Johnson +# MIT Licensed; see the LICENSE file in this repository. +# + +test_description="Test multiple ipfs nodes" + +. lib/test-lib.sh + +export IPTB_ROOT="`pwd`/.iptb" + +ipfsi() { + dir="$1" + shift + IPFS_PATH="$IPTB_ROOT/$dir" ipfs $@ +} + +check_has_connection() { + node=$1 + ipfsi $node swarm peers | grep ipfs > /dev/null +} + +startup_cluster() { + test_expect_success "start up nodes" ' + iptb start + ' + + test_expect_success "connect nodes to eachother" ' + iptb connect [1-4] 0 + ' + + test_expect_success "nodes are connected" ' + check_has_connection 0 && + check_has_connection 1 && + check_has_connection 2 && + check_has_connection 3 && + check_has_connection 4 + ' +} + +check_file_fetch() { + node=$1 + fhash=$2 + fname=$3 + + test_expect_success "can fetch file" ' + ipfsi $node cat $fhash > fetch_out + ' + + test_expect_success "file looks good" ' + test_cmp $fname fetch_out + ' +} + +run_basic_test() { + startup_cluster + + test_expect_success "add a file on node1" ' + random 1000000 > filea && + FILEA_HASH=$(ipfsi 1 add -q filea) + ' + + check_file_fetch 4 $FILEA_HASH filea + check_file_fetch 3 $FILEA_HASH filea + check_file_fetch 2 $FILEA_HASH filea + check_file_fetch 1 $FILEA_HASH filea + check_file_fetch 0 $FILEA_HASH filea + + test_expect_success "shut down nodes" ' + iptb stop + ' +} + +test_expect_success "set up tcp testbed" ' + iptb init -n 5 -p 0 -f --bootstrap=none +' + +run_basic_test + +test_done