From 84a2a591556e8fe724649e882832581d0b5d53cb Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Wed, 7 Jan 2015 18:56:12 -0800 Subject: [PATCH] reconnect p2p/ tests --- p2p/test/reconnects/reconnect.go | 2 + p2p/test/reconnects/reconnect_test.go | 234 ++++++++++++++++++++++++++ 2 files changed, 236 insertions(+) create mode 100644 p2p/test/reconnects/reconnect.go create mode 100644 p2p/test/reconnects/reconnect_test.go diff --git a/p2p/test/reconnects/reconnect.go b/p2p/test/reconnects/reconnect.go new file mode 100644 index 000000000..a0f356d2e --- /dev/null +++ b/p2p/test/reconnects/reconnect.go @@ -0,0 +1,2 @@ +// Package reconnect tests connect -> disconnect -> reconnect works +package reconnect diff --git a/p2p/test/reconnects/reconnect_test.go b/p2p/test/reconnects/reconnect_test.go new file mode 100644 index 000000000..fee269bbd --- /dev/null +++ b/p2p/test/reconnects/reconnect_test.go @@ -0,0 +1,234 @@ +package reconnect + +import ( + crand "crypto/rand" + "io" + "math/rand" + "sync" + "testing" + "time" + + host "github.com/jbenet/go-ipfs/p2p/host" + inet "github.com/jbenet/go-ipfs/p2p/net" + swarm "github.com/jbenet/go-ipfs/p2p/net/swarm" + protocol "github.com/jbenet/go-ipfs/p2p/protocol" + testutil "github.com/jbenet/go-ipfs/p2p/test/util" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ps "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream" +) + +func init() { + // change the garbage collect timeout for testing. + ps.GarbageCollectTimeout = 10 * time.Millisecond +} + +var log = eventlog.Logger("reconnect") + +func EchoStreamHandler(stream inet.Stream) { + c := stream.Conn() + log.Debugf("%s echoing %s", c.LocalPeer(), c.RemotePeer()) + go func() { + defer stream.Close() + io.Copy(stream, stream) + }() +} + +type sendChans struct { + send chan struct{} + sent chan struct{} + read chan struct{} + close_ chan struct{} + closed chan struct{} +} + +func newSendChans() sendChans { + return sendChans{ + send: make(chan struct{}), + sent: make(chan struct{}), + read: make(chan struct{}), + close_: make(chan struct{}), + closed: make(chan struct{}), + } +} + +func newSender() (chan sendChans, func(s inet.Stream)) { + scc := make(chan sendChans) + return scc, func(s inet.Stream) { + sc := newSendChans() + scc <- sc + + defer func() { + s.Close() + sc.closed <- struct{}{} + }() + + buf := make([]byte, 65536) + buf2 := make([]byte, 65536) + crand.Read(buf) + + for { + select { + case <-sc.close_: + return + case <-sc.send: + } + + // send a randomly sized subchunk + from := rand.Intn(len(buf) / 2) + to := rand.Intn(len(buf) / 2) + sendbuf := buf[from : from+to] + + log.Debugf("sender sending %d bytes", len(sendbuf)) + n, err := s.Write(sendbuf) + if err != nil { + log.Debug("sender error. exiting:", err) + return + } + + log.Debugf("sender wrote %d bytes", n) + sc.sent <- struct{}{} + + if n, err = io.ReadFull(s, buf2[:len(sendbuf)]); err != nil { + log.Debug("sender error. failed to read:", err) + return + } + + log.Debugf("sender read %d bytes", n) + sc.read <- struct{}{} + } + } +} + +// TestReconnect tests whether hosts are able to disconnect and reconnect. +func TestReconnect2(t *testing.T) { + ctx := context.Background() + h1 := testutil.GenHostSwarm(t, ctx) + h2 := testutil.GenHostSwarm(t, ctx) + hosts := []host.Host{h1, h2} + + h1.SetStreamHandler(protocol.TestingID, EchoStreamHandler) + h2.SetStreamHandler(protocol.TestingID, EchoStreamHandler) + + rounds := 10 + if testing.Short() { + rounds = 4 + } + for i := 0; i < rounds; i++ { + log.Debugf("TestReconnect: %d/%d\n", i, rounds) + SubtestConnSendDisc(t, hosts) + } +} + +// TestReconnect tests whether hosts are able to disconnect and reconnect. +func TestReconnect5(t *testing.T) { + ctx := context.Background() + h1 := testutil.GenHostSwarm(t, ctx) + h2 := testutil.GenHostSwarm(t, ctx) + h3 := testutil.GenHostSwarm(t, ctx) + h4 := testutil.GenHostSwarm(t, ctx) + h5 := testutil.GenHostSwarm(t, ctx) + hosts := []host.Host{h1, h2, h3, h4, h5} + + h1.SetStreamHandler(protocol.TestingID, EchoStreamHandler) + h2.SetStreamHandler(protocol.TestingID, EchoStreamHandler) + h3.SetStreamHandler(protocol.TestingID, EchoStreamHandler) + h4.SetStreamHandler(protocol.TestingID, EchoStreamHandler) + h5.SetStreamHandler(protocol.TestingID, EchoStreamHandler) + + rounds := 10 + if testing.Short() { + rounds = 4 + } + for i := 0; i < rounds; i++ { + log.Debugf("TestReconnect: %d/%d\n", i, rounds) + SubtestConnSendDisc(t, hosts) + } +} + +func SubtestConnSendDisc(t *testing.T, hosts []host.Host) { + + ctx := context.Background() + numStreams := 3 * len(hosts) + numMsgs := 10 + + if testing.Short() { + numStreams = 5 * len(hosts) + numMsgs = 4 + } + + ss, sF := newSender() + + for _, h1 := range hosts { + for _, h2 := range hosts { + if h1.ID() >= h2.ID() { + continue + } + + h2pi := h2.Peerstore().PeerInfo(h2.ID()) + log.Debugf("dialing %s", h2pi.Addrs) + if err := h1.Connect(ctx, h2pi); err != nil { + t.Fatalf("Failed to connect:", err) + } + } + } + + var wg sync.WaitGroup + for i := 0; i < numStreams; i++ { + h1 := hosts[i%len(hosts)] + h2 := hosts[(i+1)%len(hosts)] + s, err := h1.NewStream(protocol.TestingID, h2.ID()) + if err != nil { + t.Error(err) + } + + wg.Add(1) + go func(j int) { + defer wg.Done() + + go sF(s) + log.Debugf("getting handle %d", i) + sc := <-ss // wait to get handle. + log.Debugf("spawning worker %d", i) + + for i := 0; i < numMsgs; i++ { + sc.send <- struct{}{} + <-sc.sent + log.Debugf("%d sent %d", j, i) + <-sc.read + log.Debugf("%d read %d", j, i) + } + sc.close_ <- struct{}{} + <-sc.closed + log.Debugf("closed %d", j) + }(i) + } + wg.Wait() + + for i, h1 := range hosts { + log.Debugf("host %d has %d conns", i, len(h1.Network().Conns())) + } + + for _, h1 := range hosts { + // close connection + cs := h1.Network().Conns() + for _, c := range cs { + sc := c.(*swarm.Conn) + if sc.LocalPeer() > sc.RemotePeer() { + continue // only close it on one side. + } + + log.Debugf("closing: %s", sc.RawConn()) + sc.Close() + } + } + + <-time.After(20 * time.Millisecond) + + for i, h := range hosts { + if len(h.Network().Conns()) > 0 { + t.Fatalf("host %d %s has %d conns! not zero.", i, h.ID(), len(h.Network().Conns())) + } + } +}