diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json
index 8031ce054..6aa0e7ad0 100644
--- a/Godeps/Godeps.json
+++ b/Godeps/Godeps.json
@@ -136,7 +136,7 @@
 		},
 		{
 			"ImportPath": "github.com/jbenet/go-peerstream",
-			"Rev": "eab3056e47ecbd1bb32b8c8512fe46fc856f0387"
+			"Rev": "55792f89d00cf62166668ded3288536cbe6a72cc"
 		},
 		{
 			"ImportPath": "github.com/jbenet/go-random",
diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go
index 05e6a339a..183ab4e71 100644
--- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go
+++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go
@@ -111,8 +111,8 @@ func (c *Conn) Close() error {
 	}
 
 	// close underlying connection
-	c.netConn.Close()
-	return c.swarm.removeConn(c)
+	c.swarm.removeConn(c)
+	return c.pstConn.Close()
 }
 
 // ConnsWithGroup narrows down a set of connections to those in a given group.
@@ -234,10 +234,9 @@ func (s *Swarm) removeStream(stream *Stream) error {
 	return stream.pstStream.Close()
 }
 
-func (s *Swarm) removeConn(conn *Conn) error {
+func (s *Swarm) removeConn(conn *Conn) {
 	// remove from our maps
 	s.connLock.Lock()
 	delete(s.conns, conn)
 	s.connLock.Unlock()
-	return nil
 }
diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go
index b075e6db4..4138043bb 100644
--- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go
+++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go
@@ -4,6 +4,7 @@ import (
 	"errors"
 	"net"
 	"sync"
+	"time"
 
 	pst "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
 )
@@ -11,6 +12,9 @@ import (
 // fd is a (file) descriptor, unix style
 type fd uint32
 
+// GarbageCollectTimeout governs the periodic connection closer.
+var GarbageCollectTimeout = 5 * time.Second
+
 type Swarm struct {
 	// the transport we'll use.
 	transport pst.Transport
@@ -33,10 +37,12 @@ type Swarm struct {
 	connHandler   ConnHandler   // receives Conns intiated remotely
 	streamHandler StreamHandler // receives Streams initiated remotely
 	selectConn    SelectConn    // default SelectConn function
+
+	closed chan struct{}
 }
 
 func NewSwarm(t pst.Transport) *Swarm {
-	return &Swarm{
+	s := &Swarm{
 		transport:     t,
 		streams:       make(map[*Stream]struct{}),
 		conns:         make(map[*Conn]struct{}),
@@ -44,7 +50,10 @@ func NewSwarm(t pst.Transport) *Swarm {
 		selectConn:    SelectRandomConn,
 		streamHandler: NoOpStreamHandler,
 		connHandler:   NoOpConnHandler,
+		closed:        make(chan struct{}),
 	}
+	go s.connGarbageCollect()
+	return s
 }
 
 // SetStreamHandler assigns the stream handler in the swarm.
@@ -122,7 +131,16 @@ func (s *Swarm) Conns() []*Conn {
 		conns = append(conns, c)
 	}
 	s.connLock.RUnlock()
-	return conns
+
+	open := make([]*Conn, 0, len(conns))
+	for _, c := range conns {
+		if c.pstConn.IsClosed() {
+			c.Close()
+		} else {
+			open = append(open, c)
+		}
+	}
+	return open
 }
 
 // Listeners returns all the listeners associated with this Swarm.
@@ -225,6 +243,11 @@ func (s *Swarm) NewStreamWithConn(conn *Conn) (*Stream, error) {
 		return nil, errors.New("connection not associated with swarm")
 	}
 
+	if conn.pstConn.IsClosed() {
+		go conn.Close()
+		return nil, errors.New("conn is closed")
+	}
+
 	s.connLock.RLock()
 	if _, found := s.conns[conn]; !found {
 		s.connLock.RUnlock()
@@ -251,6 +274,46 @@ func (s *Swarm) StreamsWithGroup(g Group) []*Stream {
 
 // Close shuts down the Swarm, and it's listeners.
 func (s *Swarm) Close() error {
-	// shut down TODO
+	// automatically close everything new we get.
+	s.SetConnHandler(func(c *Conn) { c.Close() })
+	s.SetStreamHandler(func(s *Stream) { s.Close() })
+
+	var wgl sync.WaitGroup
+	for _, l := range s.Listeners() {
+		wgl.Add(1)
+		go func(l *Listener) {
+			l.Close()
+			wgl.Done()
+		}(l)
+	}
+	wgl.Wait()
+
+	var wgc sync.WaitGroup
+	for _, c := range s.Conns() {
+		wgc.Add(1)
+		go func(c *Conn) {
+			c.Close()
+			wgc.Done()
+		}(c)
+	}
+	wgc.Wait()
 	return nil
 }
+
+// connGarbageCollect periodically sweeps conns to make sure
+// they're still alive. if any are closed, removes them.
+func (s *Swarm) connGarbageCollect() {
+	for {
+		select {
+		case <-s.closed:
+			return
+		case <-time.After(GarbageCollectTimeout):
+		}
+
+		for _, c := range s.Conns() {
+			if c.pstConn.IsClosed() {
+				go c.Close()
+			}
+		}
+	}
+}
diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/muxado/muxado.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/muxado/muxado.go
index 3cf18ed08..d0d61f8ee 100644
--- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/muxado/muxado.go
+++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/muxado/muxado.go
@@ -31,6 +31,8 @@ func (s *stream) Close() error {
 // Conn is a connection to a remote peer.
 type conn struct {
 	ms muxado.Session
+
+	closed chan struct{}
 }
 
 func (c *conn) muxadoSession() muxado.Session {
@@ -41,6 +43,15 @@ func (c *conn) Close() error {
 	return c.ms.Close()
 }
 
+func (c *conn) IsClosed() bool {
+	select {
+	case <-c.closed:
+		return true
+	default:
+		return false
+	}
+}
+
 // OpenStream creates a new stream.
 func (c *conn) OpenStream() (pst.Stream, error) {
 	s, err := c.ms.Open()
@@ -76,5 +87,10 @@ func (t transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) {
 	} else {
 		s = muxado.Client(nc)
 	}
-	return &conn{ms: s}, nil
+	cl := make(chan struct{})
+	go func() {
+		s.Wait()
+		close(cl)
+	}()
+	return &conn{ms: s, closed: cl}, nil
 }
diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/transport.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/transport.go
index 239181072..6bace1855 100644
--- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/transport.go
+++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/transport.go
@@ -20,6 +20,10 @@ type StreamHandler func(Stream)
 type Conn interface {
 	io.Closer
 
+	// IsClosed returns whether a connection is fully closed, so it can
+	// be garbage collected.
+	IsClosed() bool
+
 	// OpenStream creates a new stream.
 	OpenStream() (Stream, error)
 
diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux/yamux.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux/yamux.go
index d329b7bdd..f76d316ee 100644
--- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux/yamux.go
+++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux/yamux.go
@@ -39,6 +39,10 @@ func (c *conn) Close() error {
 	return c.yamuxSession().Close()
 }
 
+func (c *conn) IsClosed() bool {
+	return c.yamuxSession().IsClosed()
+}
+
 // OpenStream creates a new stream.
 func (c *conn) OpenStream() (pst.Stream, error) {
 	s, err := c.yamuxSession().OpenStream()
diff --git a/p2p/net/conn/secure_conn.go b/p2p/net/conn/secure_conn.go
index 6d8cca6d5..d2c16e0ac 100644
--- a/p2p/net/conn/secure_conn.go
+++ b/p2p/net/conn/secure_conn.go
@@ -58,11 +58,7 @@ func newSecureConn(ctx context.Context, sk ic.PrivKey, insecure Conn) (Conn, err
 }
 
 func (c *secureConn) Close() error {
-	if err := c.secure.Close(); err != nil {
-		c.insecure.Close()
-		return err
-	}
-	return c.insecure.Close()
+	return c.secure.Close()
 }
 
 // ID is an identifier unique to this connection.
diff --git a/p2p/test/reconnects/reconnect.go b/p2p/test/reconnects/reconnect.go
new file mode 100644
index 000000000..a0f356d2e
--- /dev/null
+++ b/p2p/test/reconnects/reconnect.go
@@ -0,0 +1,2 @@
+// Package reconnect tests that connect -> disconnect -> reconnect works
+package reconnect
diff --git a/p2p/test/reconnects/reconnect_test.go b/p2p/test/reconnects/reconnect_test.go
new file mode 100644
index 000000000..fee269bbd
--- /dev/null
+++ b/p2p/test/reconnects/reconnect_test.go
@@ -0,0 +1,234 @@
+package reconnect
+
+import (
+	crand "crypto/rand"
+	"io"
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+
+	host "github.com/jbenet/go-ipfs/p2p/host"
+	inet "github.com/jbenet/go-ipfs/p2p/net"
+	swarm "github.com/jbenet/go-ipfs/p2p/net/swarm"
+	protocol "github.com/jbenet/go-ipfs/p2p/protocol"
+	testutil "github.com/jbenet/go-ipfs/p2p/test/util"
+	eventlog "github.com/jbenet/go-ipfs/util/eventlog"
+
+	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+	ps "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
+)
+
+func init() {
+	// change the garbage collect timeout for testing.
+	ps.GarbageCollectTimeout = 10 * time.Millisecond
+}
+
+var log = eventlog.Logger("reconnect")
+
+func EchoStreamHandler(stream inet.Stream) {
+	c := stream.Conn()
+	log.Debugf("%s echoing %s", c.LocalPeer(), c.RemotePeer())
+	go func() {
+		defer stream.Close()
+		io.Copy(stream, stream)
+	}()
+}
+
+type sendChans struct {
+	send   chan struct{}
+	sent   chan struct{}
+	read   chan struct{}
+	close_ chan struct{}
+	closed chan struct{}
+}
+
+func newSendChans() sendChans {
+	return sendChans{
+		send:   make(chan struct{}),
+		sent:   make(chan struct{}),
+		read:   make(chan struct{}),
+		close_: make(chan struct{}),
+		closed: make(chan struct{}),
+	}
+}
+
+func newSender() (chan sendChans, func(s inet.Stream)) {
+	scc := make(chan sendChans)
+	return scc, func(s inet.Stream) {
+		sc := newSendChans()
+		scc <- sc
+
+		defer func() {
+			s.Close()
+			sc.closed <- struct{}{}
+		}()
+
+		buf := make([]byte, 65536)
+		buf2 := make([]byte, 65536)
+		crand.Read(buf)
+
+		for {
+			select {
+			case <-sc.close_:
+				return
+			case <-sc.send:
+			}
+
+			// send a randomly sized subchunk
+			from := rand.Intn(len(buf) / 2)
+			to := rand.Intn(len(buf) / 2)
+			sendbuf := buf[from : from+to]
+
+			log.Debugf("sender sending %d bytes", len(sendbuf))
+			n, err := s.Write(sendbuf)
+			if err != nil {
+				log.Debug("sender error. exiting:", err)
+				return
+			}
+
+			log.Debugf("sender wrote %d bytes", n)
+			sc.sent <- struct{}{}
+
+			if n, err = io.ReadFull(s, buf2[:len(sendbuf)]); err != nil {
+				log.Debug("sender error. failed to read:", err)
+				return
+			}
+
+			log.Debugf("sender read %d bytes", n)
+			sc.read <- struct{}{}
+		}
+	}
+}
+
+// TestReconnect2 tests whether hosts are able to disconnect and reconnect.
+func TestReconnect2(t *testing.T) {
+	ctx := context.Background()
+	h1 := testutil.GenHostSwarm(t, ctx)
+	h2 := testutil.GenHostSwarm(t, ctx)
+	hosts := []host.Host{h1, h2}
+
+	h1.SetStreamHandler(protocol.TestingID, EchoStreamHandler)
+	h2.SetStreamHandler(protocol.TestingID, EchoStreamHandler)
+
+	rounds := 10
+	if testing.Short() {
+		rounds = 4
+	}
+	for i := 0; i < rounds; i++ {
+		log.Debugf("TestReconnect: %d/%d\n", i, rounds)
+		SubtestConnSendDisc(t, hosts)
+	}
+}
+
+// TestReconnect5 tests whether hosts are able to disconnect and reconnect.
+func TestReconnect5(t *testing.T) {
+	ctx := context.Background()
+	h1 := testutil.GenHostSwarm(t, ctx)
+	h2 := testutil.GenHostSwarm(t, ctx)
+	h3 := testutil.GenHostSwarm(t, ctx)
+	h4 := testutil.GenHostSwarm(t, ctx)
+	h5 := testutil.GenHostSwarm(t, ctx)
+	hosts := []host.Host{h1, h2, h3, h4, h5}
+
+	h1.SetStreamHandler(protocol.TestingID, EchoStreamHandler)
+	h2.SetStreamHandler(protocol.TestingID, EchoStreamHandler)
+	h3.SetStreamHandler(protocol.TestingID, EchoStreamHandler)
+	h4.SetStreamHandler(protocol.TestingID, EchoStreamHandler)
+	h5.SetStreamHandler(protocol.TestingID, EchoStreamHandler)
+
+	rounds := 10
+	if testing.Short() {
+		rounds = 4
+	}
+	for i := 0; i < rounds; i++ {
+		log.Debugf("TestReconnect: %d/%d\n", i, rounds)
+		SubtestConnSendDisc(t, hosts)
+	}
+}
+
+func SubtestConnSendDisc(t *testing.T, hosts []host.Host) {
+
+	ctx := context.Background()
+	numStreams := 3 * len(hosts)
+	numMsgs := 10
+
+	if testing.Short() {
+		numStreams = 5 * len(hosts)
+		numMsgs = 4
+	}
+
+	ss, sF := newSender()
+
+	for _, h1 := range hosts {
+		for _, h2 := range hosts {
+			if h1.ID() >= h2.ID() {
+				continue
+			}
+
+			h2pi := h2.Peerstore().PeerInfo(h2.ID())
+			log.Debugf("dialing %s", h2pi.Addrs)
+			if err := h1.Connect(ctx, h2pi); err != nil {
+				t.Fatalf("Failed to connect: %s", err)
+			}
+		}
+	}
+
+	var wg sync.WaitGroup
+	for i := 0; i < numStreams; i++ {
+		h1 := hosts[i%len(hosts)]
+		h2 := hosts[(i+1)%len(hosts)]
+		s, err := h1.NewStream(protocol.TestingID, h2.ID())
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		wg.Add(1)
+		go func(j int) {
+			defer wg.Done()
+
+			go sF(s)
+			log.Debugf("getting handle %d", j)
+			sc := <-ss // wait to get handle.
+			log.Debugf("spawning worker %d", j)
+
+			for i := 0; i < numMsgs; i++ {
+				sc.send <- struct{}{}
+				<-sc.sent
+				log.Debugf("%d sent %d", j, i)
+				<-sc.read
+				log.Debugf("%d read %d", j, i)
+			}
+			sc.close_ <- struct{}{}
+			<-sc.closed
+			log.Debugf("closed %d", j)
+		}(i)
+	}
+	wg.Wait()
+
+	for i, h1 := range hosts {
+		log.Debugf("host %d has %d conns", i, len(h1.Network().Conns()))
+	}
+
+	for _, h1 := range hosts {
+		// close connection
+		cs := h1.Network().Conns()
+		for _, c := range cs {
+			sc := c.(*swarm.Conn)
+			if sc.LocalPeer() > sc.RemotePeer() {
+				continue // only close it on one side.
+			}
+
+			log.Debugf("closing: %s", sc.RawConn())
+			sc.Close()
+		}
+	}
+
+	<-time.After(20 * time.Millisecond)
+
+	for i, h := range hosts {
+		if len(h.Network().Conns()) > 0 {
+			t.Fatalf("host %d %s has %d conns! not zero.", i, h.ID(), len(h.Network().Conns()))
+		}
+	}
+}
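
For reference, here is a minimal, self-contained sketch of the closed-channel pattern that the muxado transport adopts above and that the swarm garbage collector depends on: a background goroutine waits for the session to end and then closes a channel, so IsClosed can answer without blocking. The fakeSession type, the newConn constructor, and the main function below are hypothetical stand-ins for illustration only; only the IsClosed select mirrors code added by this patch.

package main

import (
	"fmt"
	"sync"
	"time"
)

// fakeSession stands in for a stream-muxer session (hypothetical).
// Wait blocks until the session has fully shut down.
type fakeSession struct {
	done      chan struct{}
	closeOnce sync.Once
}

func newFakeSession() *fakeSession {
	return &fakeSession{done: make(chan struct{})}
}

func (s *fakeSession) Close() error {
	s.closeOnce.Do(func() { close(s.done) })
	return nil
}

func (s *fakeSession) Wait() {
	<-s.done
}

// conn mirrors the shape of the patched transport conn: closure is recorded
// by closing a channel once the session's Wait returns.
type conn struct {
	sess   *fakeSession
	closed chan struct{}
}

func newConn(s *fakeSession) *conn {
	cl := make(chan struct{})
	go func() {
		s.Wait() // returns once the underlying session is done
		close(cl)
	}()
	return &conn{sess: s, closed: cl}
}

// IsClosed answers without blocking, in the same way as the transport.Conn
// method introduced by this change.
func (c *conn) IsClosed() bool {
	select {
	case <-c.closed:
		return true
	default:
		return false
	}
}

func main() {
	c := newConn(newFakeSession())
	fmt.Println(c.IsClosed()) // false: session still open

	c.sess.Close()
	time.Sleep(10 * time.Millisecond) // give the waiting goroutine a moment
	fmt.Println(c.IsClosed())         // true: Wait returned, channel closed
}

Because IsClosed is cheap and non-blocking, a swarm-level sweeper can poll it on a timer, which is what connGarbageCollect does once every GarbageCollectTimeout.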