mirror of
https://github.com/ipfs/kubo.git
synced 2025-09-10 11:52:21 +08:00

The Travis docs say the env flag is TRAVIS=true, but I want to make absolutely sure. Explicitness leaves less room for error.
440 lines
10 KiB
Go
440 lines
10 KiB
Go
package swarm
|
|
|
|
import (
|
|
"net"
|
|
"os"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
|
|
peer "github.com/jbenet/go-ipfs/p2p/peer"
|
|
testutil "github.com/jbenet/go-ipfs/util/testutil"
|
|
|
|
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
|
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
|
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
|
)
|
|
|
|
// acceptAndHang accepts every connection that arrives on l and holds it open
// without ever reading from or writing to it. The tests use it to stand up a
// peer that accepts TCP connections but never answers.
//
// It returns once l.Accept reports an error (for example because l was
// closed), and closes every connection it accepted before returning.
func acceptAndHang(l net.Listener) {
	var held []net.Conn
	defer func() {
		for _, conn := range held {
			conn.Close()
		}
	}()

	for {
		conn, err := l.Accept()
		if err != nil {
			return
		}
		if conn == nil {
			continue
		}
		held = append(held, conn)
	}
}
|
|
|
|
func TestSimultDials(t *testing.T) {
|
|
// t.Skip("skipping for another test")
|
|
t.Parallel()
|
|
|
|
ctx := context.Background()
|
|
swarms := makeSwarms(ctx, t, 2)
|
|
|
|
// connect everyone
|
|
{
|
|
var wg sync.WaitGroup
|
|
connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
|
|
// copy for other peer
|
|
log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.local, dst, addr)
|
|
s.peers.AddAddress(dst, addr)
|
|
if _, err := s.Dial(ctx, dst); err != nil {
|
|
t.Fatal("error swarm dialing to peer", err)
|
|
}
|
|
wg.Done()
|
|
}
|
|
|
|
ifaceAddrs0, err := swarms[0].InterfaceListenAddresses()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
ifaceAddrs1, err := swarms[1].InterfaceListenAddresses()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
log.Info("Connecting swarms simultaneously.")
|
|
for i := 0; i < 10; i++ { // connect 10x for each.
|
|
wg.Add(2)
|
|
go connect(swarms[0], swarms[1].local, ifaceAddrs1[0])
|
|
go connect(swarms[1], swarms[0].local, ifaceAddrs0[0])
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
// should still just have 1, at most 2 connections :)
|
|
c01l := len(swarms[0].ConnectionsToPeer(swarms[1].local))
|
|
if c01l > 2 {
|
|
t.Error("0->1 has", c01l)
|
|
}
|
|
c10l := len(swarms[1].ConnectionsToPeer(swarms[0].local))
|
|
if c10l > 2 {
|
|
t.Error("1->0 has", c10l)
|
|
}
|
|
|
|
for _, s := range swarms {
|
|
s.Close()
|
|
}
|
|
}
|
|
|
|
func newSilentPeer(t *testing.T) (peer.ID, ma.Multiaddr, net.Listener) {
|
|
dst := testutil.RandPeerIDFatal(t)
|
|
lst, err := net.Listen("tcp", ":0")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
addr, err := manet.FromNetAddr(lst.Addr())
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
addrs := []ma.Multiaddr{addr}
|
|
addrs, err = addrutil.ResolveUnspecifiedAddresses(addrs, nil)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Log("new silent peer:", dst, addrs[0])
|
|
return dst, addrs[0], lst
|
|
}
|
|
|
|
func TestDialWait(t *testing.T) {
|
|
// t.Skip("skipping for another test")
|
|
t.Parallel()
|
|
|
|
ctx := context.Background()
|
|
swarms := makeSwarms(ctx, t, 1)
|
|
s1 := swarms[0]
|
|
defer s1.Close()
|
|
|
|
s1.dialT = time.Millisecond * 300 // lower timeout for tests.
|
|
if os.Getenv("TRAVIS") == "true" {
|
|
s1.dialT = time.Second
|
|
}
|
|
|
|
// dial to a non-existent peer.
|
|
s2p, s2addr, s2l := newSilentPeer(t)
|
|
go acceptAndHang(s2l)
|
|
defer s2l.Close()
|
|
s1.peers.AddAddress(s2p, s2addr)
|
|
|
|
before := time.Now()
|
|
if c, err := s1.Dial(ctx, s2p); err == nil {
|
|
defer c.Close()
|
|
t.Fatal("error swarm dialing to unknown peer worked...", err)
|
|
} else {
|
|
t.Log("correctly got error:", err)
|
|
}
|
|
duration := time.Now().Sub(before)
|
|
|
|
dt := s1.dialT
|
|
if duration < dt*dialAttempts {
|
|
t.Error("< DialTimeout * dialAttempts not being respected", duration, dt*dialAttempts)
|
|
}
|
|
if duration > 2*dt*dialAttempts {
|
|
t.Error("> 2*DialTimeout * dialAttempts not being respected", duration, 2*dt*dialAttempts)
|
|
}
|
|
|
|
if !s1.backf.Backoff(s2p) {
|
|
t.Error("s2 should now be on backoff")
|
|
}
|
|
}
|
|
|
|
func TestDialBackoff(t *testing.T) {
|
|
// t.Skip("skipping for another test")
|
|
if os.Getenv("TRAVIS") == "true" {
|
|
t.Skip("travis will never have fun with this test")
|
|
}
|
|
|
|
t.Parallel()
|
|
|
|
ctx := context.Background()
|
|
swarms := makeSwarms(ctx, t, 2)
|
|
s1 := swarms[0]
|
|
s2 := swarms[1]
|
|
defer s1.Close()
|
|
defer s2.Close()
|
|
|
|
s1.dialT = time.Millisecond * 500 // lower timeout for tests.
|
|
s2.dialT = time.Millisecond * 500 // lower timeout for tests.
|
|
|
|
s2addrs, err := s2.InterfaceListenAddresses()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
s1.peers.AddAddresses(s2.local, s2addrs)
|
|
|
|
// dial to a non-existent peer.
|
|
s3p, s3addr, s3l := newSilentPeer(t)
|
|
go acceptAndHang(s3l)
|
|
defer s3l.Close()
|
|
s1.peers.AddAddress(s3p, s3addr)
|
|
|
|
// in this test we will:
|
|
// 1) dial 10x to each node.
|
|
// 2) all dials should hang
|
|
// 3) s1->s2 should succeed.
|
|
// 4) s1->s3 should not (and should place s3 on backoff)
|
|
// 5) disconnect entirely
|
|
// 6) dial 10x to each node again
|
|
// 7) s3 dials should all return immediately (except 1)
|
|
// 8) s2 dials should all hang, and succeed
|
|
// 9) last s3 dial ends, unsuccessful
|
|
|
|
dialOnlineNode := func(dst peer.ID, times int) <-chan bool {
|
|
ch := make(chan bool)
|
|
for i := 0; i < times; i++ {
|
|
go func() {
|
|
if _, err := s1.Dial(ctx, dst); err != nil {
|
|
t.Error("error dialing", dst, err)
|
|
ch <- false
|
|
} else {
|
|
ch <- true
|
|
}
|
|
}()
|
|
}
|
|
return ch
|
|
}
|
|
|
|
dialOfflineNode := func(dst peer.ID, times int) <-chan bool {
|
|
ch := make(chan bool)
|
|
for i := 0; i < times; i++ {
|
|
go func() {
|
|
if c, err := s1.Dial(ctx, dst); err != nil {
|
|
ch <- false
|
|
} else {
|
|
t.Error("succeeded in dialing", dst)
|
|
ch <- true
|
|
c.Close()
|
|
}
|
|
}()
|
|
}
|
|
return ch
|
|
}
|
|
|
|
{
|
|
// 1) dial 10x to each node.
|
|
N := 10
|
|
s2done := dialOnlineNode(s2.local, N)
|
|
s3done := dialOfflineNode(s3p, N)
|
|
|
|
// when all dials should be done by:
|
|
dialTimeout1x := time.After(s1.dialT)
|
|
dialTimeout1Ax := time.After(s1.dialT * dialAttempts)
|
|
dialTimeout10Ax := time.After(s1.dialT * dialAttempts * 10)
|
|
|
|
// 2) all dials should hang
|
|
select {
|
|
case <-s2done:
|
|
t.Error("s2 should not happen immediately")
|
|
case <-s3done:
|
|
t.Error("s3 should not happen yet")
|
|
case <-time.After(time.Millisecond):
|
|
// s2 may finish very quickly, so let's get out.
|
|
}
|
|
|
|
// 3) s1->s2 should succeed.
|
|
for i := 0; i < N; i++ {
|
|
select {
|
|
case r := <-s2done:
|
|
if !r {
|
|
t.Error("s2 should not fail")
|
|
}
|
|
case <-s3done:
|
|
t.Error("s3 should not happen yet")
|
|
case <-dialTimeout1x:
|
|
t.Error("s2 took too long")
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-s2done:
|
|
t.Error("s2 should have no more")
|
|
case <-s3done:
|
|
t.Error("s3 should not happen yet")
|
|
case <-dialTimeout1x: // let it pass
|
|
}
|
|
|
|
// 4) s1->s3 should not (and should place s3 on backoff)
|
|
// N-1 should finish before dialTimeout1Ax
|
|
for i := 0; i < N; i++ {
|
|
select {
|
|
case <-s2done:
|
|
t.Error("s2 should have no more")
|
|
case r := <-s3done:
|
|
if r {
|
|
t.Error("s3 should not succeed")
|
|
}
|
|
case <-dialTimeout1Ax:
|
|
if i < (N - 1) {
|
|
t.Fatal("s3 took too long")
|
|
}
|
|
t.Log("dialTimeout1Ax hit for last peer")
|
|
case <-dialTimeout10Ax:
|
|
t.Fatal("s3 took too long")
|
|
}
|
|
}
|
|
|
|
// check backoff state
|
|
if s1.backf.Backoff(s2.local) {
|
|
t.Error("s2 should not be on backoff")
|
|
}
|
|
if !s1.backf.Backoff(s3p) {
|
|
t.Error("s3 should be on backoff")
|
|
}
|
|
|
|
// 5) disconnect entirely
|
|
|
|
for _, c := range s1.Connections() {
|
|
c.Close()
|
|
}
|
|
for i := 0; i < 100 && len(s1.Connections()) > 0; i++ {
|
|
<-time.After(time.Millisecond)
|
|
}
|
|
if len(s1.Connections()) > 0 {
|
|
t.Fatal("s1 conns must exit")
|
|
}
|
|
}
|
|
|
|
{
|
|
// 6) dial 10x to each node again
|
|
N := 10
|
|
s2done := dialOnlineNode(s2.local, N)
|
|
s3done := dialOfflineNode(s3p, N)
|
|
|
|
// when all dials should be done by:
|
|
dialTimeout1x := time.After(s1.dialT)
|
|
dialTimeout1Ax := time.After(s1.dialT * dialAttempts)
|
|
dialTimeout10Ax := time.After(s1.dialT * dialAttempts * 10)
|
|
|
|
// 7) s3 dials should all return immediately (except 1)
|
|
for i := 0; i < N-1; i++ {
|
|
select {
|
|
case <-s2done:
|
|
t.Error("s2 should not succeed yet")
|
|
case r := <-s3done:
|
|
if r {
|
|
t.Error("s3 should not succeed")
|
|
}
|
|
case <-dialTimeout1x:
|
|
t.Fatal("s3 took too long")
|
|
}
|
|
}
|
|
|
|
// 8) s2 dials should all hang, and succeed
|
|
for i := 0; i < N; i++ {
|
|
select {
|
|
case r := <-s2done:
|
|
if !r {
|
|
t.Error("s2 should succeed")
|
|
}
|
|
// case <-s3done:
|
|
case <-dialTimeout1Ax:
|
|
t.Fatal("s3 took too long")
|
|
}
|
|
}
|
|
|
|
// 9) the last s3 should return, failed.
|
|
select {
|
|
case <-s2done:
|
|
t.Error("s2 should have no more")
|
|
case r := <-s3done:
|
|
if r {
|
|
t.Error("s3 should not succeed")
|
|
}
|
|
case <-dialTimeout10Ax:
|
|
t.Fatal("s3 took too long")
|
|
}
|
|
|
|
// check backoff state (the same)
|
|
if s1.backf.Backoff(s2.local) {
|
|
t.Error("s2 should not be on backoff")
|
|
}
|
|
if !s1.backf.Backoff(s3p) {
|
|
t.Error("s3 should be on backoff")
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
func TestDialBackoffClears(t *testing.T) {
|
|
// t.Skip("skipping for another test")
|
|
t.Parallel()
|
|
|
|
ctx := context.Background()
|
|
swarms := makeSwarms(ctx, t, 2)
|
|
s1 := swarms[0]
|
|
s2 := swarms[1]
|
|
defer s1.Close()
|
|
defer s2.Close()
|
|
s1.dialT = time.Millisecond * 300 // lower timeout for tests.
|
|
s2.dialT = time.Millisecond * 300 // lower timeout for tests.
|
|
if os.Getenv("TRAVIS") == "true" {
|
|
s1.dialT = time.Second
|
|
s2.dialT = time.Second
|
|
}
|
|
|
|
// use another address first, that accept and hang on conns
|
|
_, s2bad, s2l := newSilentPeer(t)
|
|
go acceptAndHang(s2l)
|
|
defer s2l.Close()
|
|
|
|
// phase 1 -- dial to non-operational addresses
|
|
s1.peers.AddAddress(s2.local, s2bad)
|
|
|
|
before := time.Now()
|
|
if c, err := s1.Dial(ctx, s2.local); err == nil {
|
|
t.Fatal("dialing to broken addr worked...", err)
|
|
defer c.Close()
|
|
} else {
|
|
t.Log("correctly got error:", err)
|
|
}
|
|
duration := time.Now().Sub(before)
|
|
|
|
dt := s1.dialT
|
|
if duration < dt*dialAttempts {
|
|
t.Error("< DialTimeout * dialAttempts not being respected", duration, dt*dialAttempts)
|
|
}
|
|
if duration > 2*dt*dialAttempts {
|
|
t.Error("> 2*DialTimeout * dialAttempts not being respected", duration, 2*dt*dialAttempts)
|
|
}
|
|
|
|
if !s1.backf.Backoff(s2.local) {
|
|
t.Error("s2 should now be on backoff")
|
|
} else {
|
|
t.Log("correctly added to backoff")
|
|
}
|
|
|
|
// phase 2 -- add the working address. dial should succeed.
|
|
ifaceAddrs1, err := swarms[1].InterfaceListenAddresses()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
s1.peers.AddAddresses(s2.local, ifaceAddrs1)
|
|
|
|
before = time.Now()
|
|
if c, err := s1.Dial(ctx, s2.local); err != nil {
|
|
t.Fatal(err)
|
|
} else {
|
|
c.Close()
|
|
t.Log("correctly connected")
|
|
}
|
|
duration = time.Now().Sub(before)
|
|
|
|
if duration >= dt {
|
|
// t.Error("took too long", duration, dt)
|
|
}
|
|
|
|
if s1.backf.Backoff(s2.local) {
|
|
t.Error("s2 should no longer be on backoff")
|
|
} else {
|
|
t.Log("correctly cleared backoff")
|
|
}
|
|
}
|