mirror of https://github.com/ipfs/kubo.git (synced 2025-06-29 17:36:38 +08:00)
reconnect p2p/ tests
p2p/test/reconnects/reconnect.go (new file, +2)

@@ -0,0 +1,2 @@
// Package reconnect tests connect -> disconnect -> reconnect works
package reconnect
p2p/test/reconnects/reconnect_test.go (new file, +234)

@@ -0,0 +1,234 @@
package reconnect

import (
	crand "crypto/rand"
	"io"
	"math/rand"
	"sync"
	"testing"
	"time"

	host "github.com/jbenet/go-ipfs/p2p/host"
	inet "github.com/jbenet/go-ipfs/p2p/net"
	swarm "github.com/jbenet/go-ipfs/p2p/net/swarm"
	protocol "github.com/jbenet/go-ipfs/p2p/protocol"
	testutil "github.com/jbenet/go-ipfs/p2p/test/util"
	eventlog "github.com/jbenet/go-ipfs/util/eventlog"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	ps "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
)

func init() {
	// change the garbage collect timeout for testing.
	ps.GarbageCollectTimeout = 10 * time.Millisecond
}

var log = eventlog.Logger("reconnect")
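A note on the init hook above: go-peerstream only reaps closed connections on its garbage-collection tick, so shrinking GarbageCollectTimeout to 10ms is what allows SubtestConnSendDisc (further down) to wait only 20ms after closing connections before asserting that every host is back to zero connections.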
// EchoStreamHandler spawns a goroutine that copies the stream back onto
// itself, echoing until the remote side closes the stream or errors.
func EchoStreamHandler(stream inet.Stream) {
	c := stream.Conn()
	log.Debugf("%s echoing %s", c.LocalPeer(), c.RemotePeer())
	go func() {
		defer stream.Close()
		io.Copy(stream, stream)
	}()
}
// sendChans coordinates a sender goroutine with the test driving it:
// each write is requested on send, acknowledged on sent, and its echo
// acknowledged on read; close_/closed handle shutdown.
type sendChans struct {
	send   chan struct{}
	sent   chan struct{}
	read   chan struct{}
	close_ chan struct{}
	closed chan struct{}
}

func newSendChans() sendChans {
	return sendChans{
		send:   make(chan struct{}),
		sent:   make(chan struct{}),
		read:   make(chan struct{}),
		close_: make(chan struct{}),
		closed: make(chan struct{}),
	}
}

// newSender returns a stream handler that, for each stream it is given,
// hands the caller a sendChans over scc, then loops: on every send request
// it writes a randomly sized chunk of random data, signals sent, reads the
// echo back, and signals read.
func newSender() (chan sendChans, func(s inet.Stream)) {
	scc := make(chan sendChans)
	return scc, func(s inet.Stream) {
		sc := newSendChans()
		scc <- sc

		defer func() {
			s.Close()
			sc.closed <- struct{}{}
		}()

		buf := make([]byte, 65536)
		buf2 := make([]byte, 65536)
		crand.Read(buf)

		for {
			select {
			case <-sc.close_:
				return
			case <-sc.send:
			}

			// send a randomly sized subchunk
			from := rand.Intn(len(buf) / 2)
			to := rand.Intn(len(buf) / 2)
			sendbuf := buf[from : from+to]

			log.Debugf("sender sending %d bytes", len(sendbuf))
			n, err := s.Write(sendbuf)
			if err != nil {
				log.Debug("sender error. exiting:", err)
				return
			}

			log.Debugf("sender wrote %d bytes", n)
			sc.sent <- struct{}{}

			if n, err = io.ReadFull(s, buf2[:len(sendbuf)]); err != nil {
				log.Debug("sender error. failed to read:", err)
				return
			}

			log.Debugf("sender read %d bytes", n)
			sc.read <- struct{}{}
		}
	}
}
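For readers new to the send/sent/read handshake above, here is a minimal, self-contained sketch of the same coordination pattern. It is not part of this commit: net.Pipe stands in for a libp2p stream, and an io.Copy goroutine plays the echo handler's role.

package main

import (
	"fmt"
	"io"
	"net"
)

type sendChans struct {
	send, sent, read, close_, closed chan struct{}
}

func newSendChans() sendChans {
	return sendChans{
		send:   make(chan struct{}),
		sent:   make(chan struct{}),
		read:   make(chan struct{}),
		close_: make(chan struct{}),
		closed: make(chan struct{}),
	}
}

func main() {
	a, b := net.Pipe() // in-memory duplex connection
	go io.Copy(b, b)   // echo: whatever is written on a comes back on a

	sc := newSendChans()
	go func() { // worker: same shape as the closure newSender returns
		defer func() { a.Close(); sc.closed <- struct{}{} }()
		msg := []byte("hello")
		echo := make([]byte, len(msg))
		for {
			select {
			case <-sc.close_:
				return
			case <-sc.send:
			}
			if _, err := a.Write(msg); err != nil {
				return
			}
			sc.sent <- struct{}{} // write landed
			if _, err := io.ReadFull(a, echo); err != nil {
				return
			}
			sc.read <- struct{}{} // echo came back
		}
	}()

	for i := 0; i < 3; i++ {
		sc.send <- struct{}{} // request a send
		<-sc.sent             // wait for the write
		<-sc.read             // wait for the echo
		fmt.Println("round", i, "echoed ok")
	}
	sc.close_ <- struct{}{}
	<-sc.closed
}

Because every channel is unbuffered, each step is a rendezvous: the test body knows exactly how far a worker has progressed, which is what lets the real test interleave traffic with connects and disconnects at precise points.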
// TestReconnect2 tests whether two hosts are able to disconnect and reconnect.
func TestReconnect2(t *testing.T) {
	ctx := context.Background()
	h1 := testutil.GenHostSwarm(t, ctx)
	h2 := testutil.GenHostSwarm(t, ctx)
	hosts := []host.Host{h1, h2}

	h1.SetStreamHandler(protocol.TestingID, EchoStreamHandler)
	h2.SetStreamHandler(protocol.TestingID, EchoStreamHandler)

	rounds := 10
	if testing.Short() {
		rounds = 4
	}
	for i := 0; i < rounds; i++ {
		log.Debugf("TestReconnect: %d/%d\n", i, rounds)
		SubtestConnSendDisc(t, hosts)
	}
}

// TestReconnect5 tests whether five hosts are able to disconnect and reconnect.
func TestReconnect5(t *testing.T) {
	ctx := context.Background()
	h1 := testutil.GenHostSwarm(t, ctx)
	h2 := testutil.GenHostSwarm(t, ctx)
	h3 := testutil.GenHostSwarm(t, ctx)
	h4 := testutil.GenHostSwarm(t, ctx)
	h5 := testutil.GenHostSwarm(t, ctx)
	hosts := []host.Host{h1, h2, h3, h4, h5}

	h1.SetStreamHandler(protocol.TestingID, EchoStreamHandler)
	h2.SetStreamHandler(protocol.TestingID, EchoStreamHandler)
	h3.SetStreamHandler(protocol.TestingID, EchoStreamHandler)
	h4.SetStreamHandler(protocol.TestingID, EchoStreamHandler)
	h5.SetStreamHandler(protocol.TestingID, EchoStreamHandler)

	rounds := 10
	if testing.Short() {
		rounds = 4
	}
	for i := 0; i < rounds; i++ {
		log.Debugf("TestReconnect: %d/%d\n", i, rounds)
		SubtestConnSendDisc(t, hosts)
	}
}
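Each call to SubtestConnSendDisc below is one full round: connect every pair of hosts, push traffic over fresh streams, then tear every connection down and verify each host reports zero connections. Running it repeatedly is what makes each subsequent round a genuine reconnect rather than reuse of a live connection.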
func SubtestConnSendDisc(t *testing.T, hosts []host.Host) {

	ctx := context.Background()
	numStreams := 3 * len(hosts)
	numMsgs := 10

	if testing.Short() {
		numStreams = 5 * len(hosts)
		numMsgs = 4
	}

	ss, sF := newSender()

	for _, h1 := range hosts {
		for _, h2 := range hosts {
			if h1.ID() >= h2.ID() {
				continue // dial each pair only once, from the lower ID.
			}

			h2pi := h2.Peerstore().PeerInfo(h2.ID())
			log.Debugf("dialing %s", h2pi.Addrs)
			if err := h1.Connect(ctx, h2pi); err != nil {
				t.Fatalf("Failed to connect: %s", err)
			}
		}
	}

	var wg sync.WaitGroup
	for i := 0; i < numStreams; i++ {
		h1 := hosts[i%len(hosts)]
		h2 := hosts[(i+1)%len(hosts)]
		s, err := h1.NewStream(protocol.TestingID, h2.ID())
		if err != nil {
			t.Fatal(err)
		}

		wg.Add(1)
		go func(j int) {
			defer wg.Done()

			go sF(s)
			log.Debugf("getting handle %d", j)
			sc := <-ss // wait to get handle.
			log.Debugf("spawning worker %d", j)

			for i := 0; i < numMsgs; i++ {
				sc.send <- struct{}{}
				<-sc.sent
				log.Debugf("%d sent %d", j, i)
				<-sc.read
				log.Debugf("%d read %d", j, i)
			}
			sc.close_ <- struct{}{}
			<-sc.closed
			log.Debugf("closed %d", j)
		}(i)
	}
	wg.Wait()

	for i, h1 := range hosts {
		log.Debugf("host %d has %d conns", i, len(h1.Network().Conns()))
	}

	for _, h1 := range hosts {
		// close connections
		cs := h1.Network().Conns()
		for _, c := range cs {
			sc := c.(*swarm.Conn)
			if sc.LocalPeer() > sc.RemotePeer() {
				continue // only close it on one side.
			}

			log.Debugf("closing: %s", sc.RawConn())
			sc.Close()
		}
	}

	<-time.After(20 * time.Millisecond)

	for i, h := range hosts {
		if len(h.Network().Conns()) > 0 {
			t.Fatalf("host %d %s has %d conns! not zero.", i, h.ID(), len(h.Network().Conns()))
		}
	}
}
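With this commit applied, the suite should be runnable from the repository root with something like go test ./p2p/test/reconnects -v, adding -short to use the reduced round and message counts.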