diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 959cb3580..666d918e0 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -141,7 +141,7 @@ }, { "ImportPath": "github.com/jbenet/go-peerstream", - "Rev": "5023d0d6b3efeb50c2c30535d011bdcb2351e212" + "Rev": "1c71a3e04eeef9297a12ecdff75a0b28ffa8bf90" }, { "ImportPath": "github.com/jbenet/go-random", diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go index 63c0a5e1e..e5325e331 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go @@ -159,7 +159,6 @@ func (s *Swarm) addConn(netConn net.Conn, server bool) (*Conn, error) { // first, check if we already have it... for c := range s.conns { if c.netConn == netConn { - s.connLock.Unlock() return c, nil } } @@ -167,7 +166,6 @@ func (s *Swarm) addConn(netConn net.Conn, server bool) (*Conn, error) { // create a new spdystream connection ssConn, err := ss.NewConnection(netConn, server) if err != nil { - s.connLock.Unlock() return nil, err } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/example/blockhandler/blockhandler.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/example/blockhandler/blockhandler.go new file mode 100644 index 000000000..391aa0fbf --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/example/blockhandler/blockhandler.go @@ -0,0 +1,77 @@ +package main + +import ( + "bufio" + "fmt" + "net" + "os" + "time" + + ps "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream" +) + +func die(err error) { + fmt.Fprintf(os.Stderr, "error: %s\n") + os.Exit(1) +} + +func main() { + // create a new Swarm + swarm := ps.NewSwarm() + defer swarm.Close() + + // tell swarm what to do with a new incoming streams. + // EchoHandler just echos back anything they write. + swarm.SetStreamHandler(ps.EchoHandler) + + l, err := net.Listen("tcp", "localhost:8001") + if err != nil { + die(err) + } + + if _, err := swarm.AddListener(l); err != nil { + die(err) + } + + nc, err := net.Dial("tcp", "localhost:8001") + if err != nil { + die(err) + } + + c, err := swarm.AddConn(nc) + if err != nil { + die(err) + } + + nRcvStream := 0 + bio := bufio.NewReader(os.Stdin) + swarm.SetStreamHandler(func(s *ps.Stream) { + log("handling new stream %d", nRcvStream) + nRcvStream++ + + line, err := bio.ReadString('\n') + if err != nil { + die(err) + } + _ = line + // line = "read: " + line + // s.Write([]byte(line)) + s.Close() + }) + + nSndStream := 0 + for { + <-time.After(200 * time.Millisecond) + s, err := swarm.NewStreamWithConn(c) + if err != nil { + die(err) + } + log("sender got new stream %d", nSndStream) + nSndStream++ + s.Wait() + } +} + +func log(s string, ifs ...interface{}) { + fmt.Fprintf(os.Stderr, s+"\n", ifs...) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go index 21f1313da..b6e8b4146 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go @@ -110,28 +110,34 @@ func (s *Swarm) SelectConn() SelectConn { // Conns returns all the connections associated with this Swarm. func (s *Swarm) Conns() []*Conn { + s.connLock.RLock() conns := make([]*Conn, 0, len(s.conns)) for c := range s.conns { conns = append(conns, c) } + s.connLock.RUnlock() return conns } // Listeners returns all the listeners associated with this Swarm. func (s *Swarm) Listeners() []*Listener { + s.listenerLock.RLock() out := make([]*Listener, 0, len(s.listeners)) for c := range s.listeners { out = append(out, c) } + s.listenerLock.RUnlock() return out } // Streams returns all the streams associated with this Swarm. func (s *Swarm) Streams() []*Stream { + s.streamLock.RLock() out := make([]*Stream, 0, len(s.streams)) for c := range s.streams { out = append(out, c) } + s.streamLock.RUnlock() return out }