
Merge pull request #1461 from ipfs/update-peerstream

Update go-peerstream
Juan Batiz-Benet
2015-07-09 14:50:58 -07:00
14 changed files with 259 additions and 49 deletions

Godeps/Godeps.json (generated)

@@ -172,7 +172,7 @@
 	},
 	{
 		"ImportPath": "github.com/jbenet/go-peerstream",
-		"Rev": "8d52ed2801410a2af995b4e87660272d11c8a9a4"
+		"Rev": "62fe5ede12f9d9cd9406750160122525b3d6b694"
 	},
 	{
 		"ImportPath": "github.com/jbenet/go-random",


@@ -7,4 +7,5 @@ go:
 - tip
 script:
-- go test -race -cpu=5 ./...
+- go test ./...
+# - go test -race -cpu=5 ./...
 


@@ -0,0 +1,33 @@
{
"ImportPath": "github.com/jbenet/go-peerstream",
"GoVersion": "go1.4.2",
"Packages": [
"./..."
],
"Deps": [
{
"ImportPath": "github.com/docker/spdystream",
"Rev": "b2c3287865f3ad6aa22821ddb7b4692b896ac207"
},
{
"ImportPath": "github.com/hashicorp/yamux",
"Rev": "b2e55852ddaf823a85c67f798080eb7d08acd71d"
},
{
"ImportPath": "github.com/inconshreveable/muxado",
"Rev": "f693c7e88ba316d1a0ae3e205e22a01aa3ec2848"
},
{
"ImportPath": "github.com/jbenet/go-temp-err-catcher",
"Rev": "aac704a3f4f27190b4ccc05f303a4931fd1241ff"
},
{
"ImportPath": "github.com/whyrusleeping/go-multiplex",
"Rev": "ce5baa716247510379cb7640a14da857afd3b622"
},
{
"ImportPath": "github.com/whyrusleeping/go-multistream",
"Rev": "08e8f9c9f5665ed0c63ffde4fa5ef1d5fb3d516d"
}
]
}


@@ -0,0 +1,5 @@
This directory tree is generated automatically by godep.
Please do not edit.
See https://github.com/tools/godep for more information.


@@ -0,0 +1,15 @@
godep:
	go get github.com/tools/godep

vendor: godep
	godep save -r ./...

build:
	go build ./...

test:
	go test ./...

test_race:
	go test -race -cpu 5 ./...


@@ -158,37 +158,7 @@ func ConnInConns(c1 *Conn, conns []*Conn) bool {
 // addConn is the internal version of AddConn. we need the server bool
 // as spdystream requires it.
 func (s *Swarm) addConn(netConn net.Conn, isServer bool) (*Conn, error) {
-	if netConn == nil {
-		return nil, errors.New("nil conn")
-	}
-
-	// this function is so we can defer our lock, which needs to be
-	// unlocked **before** the Handler is called (which needs to be
-	// sequential). This was the simplest thing :)
-	setupConn := func() (*Conn, error) {
-		s.connLock.Lock()
-		defer s.connLock.Unlock()
-
-		// first, check if we already have it...
-		for c := range s.conns {
-			if c.netConn == netConn {
-				return c, nil
-			}
-		}
-
-		// create a new spdystream connection
-		ssConn, err := s.transport.NewConn(netConn, isServer)
-		if err != nil {
-			return nil, err
-		}
-
-		// add the connection
-		c := newConn(netConn, ssConn, s)
-		s.conns[c] = struct{}{}
-		return c, nil
-	}
-
-	c, err := setupConn()
+	c, err := s.setupConn(netConn, isServer)
 	if err != nil {
 		return nil, err
 	}
@@ -208,6 +178,49 @@ func (s *Swarm) addConn(netConn net.Conn, isServer bool) (*Conn, error) {
 	return c, nil
 }
 
+// setupConn adds the relevant connection to the map, first checking if it
+// was already there.
+func (s *Swarm) setupConn(netConn net.Conn, isServer bool) (*Conn, error) {
+	if netConn == nil {
+		return nil, errors.New("nil conn")
+	}
+
+	// first, check if we already have it, to avoid constructing it
+	// if it is already there
+	s.connLock.Lock()
+	for c := range s.conns {
+		if c.netConn == netConn {
+			s.connLock.Unlock()
+			return c, nil
+		}
+	}
+	s.connLock.Unlock()
+
+	// construct the connection without hanging onto the lock
+	// (as there could be deadlock if so.)
+	// create a new spdystream connection
+	ssConn, err := s.transport.NewConn(netConn, isServer)
+	if err != nil {
+		return nil, err
+	}
+
+	// take the lock to add it to the map.
+	s.connLock.Lock()
+	defer s.connLock.Unlock()
+
+	// check for it again as it may have been added already. (TOCTTOU)
+	for c := range s.conns {
+		if c.netConn == netConn {
+			return c, nil
+		}
+	}
+
+	// add the connection
+	c := newConn(netConn, ssConn, s)
+	s.conns[c] = struct{}{}
+	return c, nil
+}
+
 // createStream is the internal function that creates a new stream. assumes
 // all validation has happened.
 func (s *Swarm) createStream(c *Conn) (*Stream, error) {

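The addConn/setupConn split above is a check-construct-recheck pattern: look up the connection under the lock, release the lock for the slow construction (holding it across transport.NewConn risks deadlock, since the handshake can call back into the swarm), then re-take the lock and check once more before inserting, because another goroutine may have won the race (the TOCTTOU comment). A standalone sketch of the same pattern; registry, resource, and newExpensive are illustrative names, not go-peerstream API:

package main

import (
	"fmt"
	"sync"
)

type resource struct{ id int }

type registry struct {
	mu    sync.Mutex
	items map[string]*resource
}

// newExpensive stands in for s.transport.NewConn: slow, and it must
// not be called while holding the lock.
func newExpensive(key string) *resource {
	return &resource{id: len(key)}
}

func (r *registry) get(key string) *resource {
	// first check: skip construction if it is already there
	r.mu.Lock()
	if res, ok := r.items[key]; ok {
		r.mu.Unlock()
		return res
	}
	r.mu.Unlock()

	// construct without the lock (holding it here risks deadlock)
	res := newExpensive(key)

	// re-take the lock and check again: another goroutine may have
	// inserted the same key while we were constructing (TOCTTOU)
	r.mu.Lock()
	defer r.mu.Unlock()
	if existing, ok := r.items[key]; ok {
		return existing // lost the race; discard our copy
	}
	r.items[key] = res
	return res
}

func main() {
	r := &registry{items: make(map[string]*resource)}
	fmt.Println(r.get("conn-1").id) // 6
}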

@@ -4,12 +4,13 @@ import (
 	"errors"
 	"fmt"
 	"net"
+	"sync"
 
 	tec "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-temp-err-catcher"
 )
 
 // AcceptConcurrency is how many connections can simultaneously be
-// in process of being accepted. Handshakes can sometimes occurr as
+// in process of being accepted. Handshakes can sometimes occur as
 // part of this process, so it may take some time. It is imporant to
 // rate limit lest a malicious influx of connections would cause our
 // node to consume all its resources accepting new connections.
@@ -73,7 +74,11 @@ func ListenersWithGroup(g Group, ls []*Listener) []*Listener {
 // run in a goroutine.
 // TODO: add rate limiting
 func (l *Listener) accept() {
-	defer l.teardown()
+	var wg sync.WaitGroup
+	defer func() {
+		wg.Wait() // must happen before teardown
+		l.teardown()
+	}()
 
 	// catching the error here is odd. doing what net/http does:
 	// http://golang.org/src/net/http/server.go?s=51504:51550#L1728
@@ -98,12 +103,15 @@ func (l *Listener) accept() {
 		// do this in a goroutine to avoid blocking the Accept loop.
 		// note that this does not rate limit accepts.
 		limit <- struct{}{} // sema down
+		wg.Add(1)
 		go func(conn net.Conn) {
 			defer func() { <-limit }() // sema up
+			defer wg.Done()
 
 			conn2, err := l.swarm.addConn(conn, true)
 			if err != nil {
 				l.acceptErr <- err
+				return
 			}
 			conn2.groups.AddSet(&l.groups) // add out groups
 		}(conn)

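The accept loop above combines two primitives: a buffered channel used as a counting semaphore to bound in-flight handshakes, and a sync.WaitGroup so teardown cannot run while handlers are still using the listener. A minimal sketch of that shape, assuming a stand-in handleConn callback and an acceptConcurrency of 10 rather than the package's real constant:

package main

import (
	"fmt"
	"net"
	"sync"
)

const acceptConcurrency = 10 // stand-in for the package constant

func acceptLoop(l net.Listener, handleConn func(net.Conn) error) {
	var wg sync.WaitGroup
	defer func() {
		wg.Wait() // must happen before teardown
		fmt.Println("teardown: all handlers done")
	}()

	limit := make(chan struct{}, acceptConcurrency)
	for {
		conn, err := l.Accept()
		if err != nil {
			return // listener closed
		}

		limit <- struct{}{} // sema down: blocks once limit is full
		wg.Add(1)
		go func(conn net.Conn) {
			defer func() { <-limit }() // sema up
			defer wg.Done()
			if err := handleConn(conn); err != nil {
				conn.Close()
				return // don't touch a half-set-up conn
			}
		}(conn)
	}
}

func main() {
	ln, _ := net.Listen("tcp", "127.0.0.1:0")
	go acceptLoop(ln, func(c net.Conn) error { return c.Close() })
	if c, err := net.Dial("tcp", ln.Addr().String()); err == nil {
		c.Close()
	}
	ln.Close()
}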

@@ -0,0 +1,45 @@
package peerstream_multiplex

import (
	"net"

	pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
	mp "github.com/whyrusleeping/go-multiplex"
)

// Conn is a connection to a remote peer.
type conn struct {
	*mp.Multiplex
}

func (c *conn) Close() error {
	return c.Multiplex.Close()
}

func (c *conn) IsClosed() bool {
	return c.Multiplex.IsClosed()
}

// OpenStream creates a new stream.
func (c *conn) OpenStream() (pst.Stream, error) {
	return c.Multiplex.NewStream(), nil
}

// Serve starts listening for incoming requests and handles them
// using given StreamHandler
func (c *conn) Serve(handler pst.StreamHandler) {
	c.Multiplex.Serve(func(s *mp.Stream) {
		handler(s)
	})
}

// Transport is a go-peerstream transport that constructs
// multiplex-backed connections.
type Transport struct{}

// DefaultTransport has default settings for multiplex
var DefaultTransport = &Transport{}

func (t *Transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) {
	return &conn{mp.NewMultiplex(nc, isServer)}, nil
}

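The file above is a thin adapter: it embeds the concrete muxer (*mp.Multiplex) and forwards or renames methods until the type satisfies the small pst.Conn interface. A generic sketch of that embedding pattern; every name here (Closer, muxer, adapted) is hypothetical, not go-peerstream or go-multiplex API:

package main

import "fmt"

// the narrow interface the consumer wants (stands in for pst.Conn)
type Closer interface {
	Close() error
	IsClosed() bool
}

// the concrete third-party muxer being wrapped
type muxer struct{ closed bool }

func (m *muxer) Shutdown() error { m.closed = true; return nil }
func (m *muxer) Dead() bool      { return m.closed }

// the adapter: embed the muxer, forward under the expected names
type adapted struct{ *muxer }

func (a *adapted) Close() error   { return a.muxer.Shutdown() }
func (a *adapted) IsClosed() bool { return a.muxer.Dead() }

func main() {
	var c Closer = &adapted{&muxer{}}
	c.Close()
	fmt.Println(c.IsClosed()) // true
}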

@@ -0,0 +1,11 @@
package peerstream_multiplex

import (
	"testing"

	psttest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test"
)

func TestMultiplexTransport(t *testing.T) {
	psttest.SubtestAll(t, DefaultTransport)
}


@@ -0,0 +1,59 @@
// package multistream implements a peerstream transport using
// go-multistream to select the underlying stream muxer
package multistream

import (
	"net"

	pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
	multiplex "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex"
	spdy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream"
	yamux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux"

	mss "github.com/whyrusleeping/go-multistream"
)

type transport struct {
	mux *mss.MultistreamMuxer

	tpts map[string]pst.Transport
}

func NewTransport() pst.Transport {
	mux := mss.NewMultistreamMuxer()
	mux.AddHandler("/multiplex", nil)
	mux.AddHandler("/spdystream", nil)
	mux.AddHandler("/yamux", nil)

	tpts := map[string]pst.Transport{
		"/multiplex":  multiplex.DefaultTransport,
		"/spdystream": spdy.Transport,
		"/yamux":      yamux.DefaultTransport,
	}

	return &transport{
		mux:  mux,
		tpts: tpts,
	}
}

func (t *transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) {
	var proto string
	if isServer {
		selected, _, err := t.mux.Negotiate(nc)
		if err != nil {
			return nil, err
		}
		proto = selected
	} else {
		// prefer yamux
		selected, err := mss.SelectOneOf([]string{"/yamux", "/spdystream", "/multiplex"}, nc)
		if err != nil {
			return nil, err
		}
		proto = selected
	}

	tpt := t.tpts[proto]
	return tpt.NewConn(nc, isServer)
}

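NewConn above is asymmetric: the listening side lets its MultistreamMuxer drive the handshake via Negotiate, while the dialing side proposes protocols in preference order via SelectOneOf. A sketch of that handshake over an in-memory pipe, using only the two go-multistream calls the code above uses (their three- and two-value return shapes are taken from that code; everything else is illustrative):

package main

import (
	"fmt"
	"net"

	mss "github.com/whyrusleeping/go-multistream"
)

func main() {
	srv, cli := net.Pipe()

	mux := mss.NewMultistreamMuxer()
	mux.AddHandler("/yamux", nil)
	mux.AddHandler("/multiplex", nil)

	got := make(chan string)
	go func() {
		// listening side: the muxer drives the handshake
		selected, _, err := mux.Negotiate(srv)
		if err != nil {
			panic(err)
		}
		got <- selected
	}()

	// dialing side: propose protocols in preference order
	selected, err := mss.SelectOneOf([]string{"/yamux", "/multiplex"}, cli)
	if err != nil {
		panic(err)
	}
	fmt.Println("client:", selected, "server:", <-got)
}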

@@ -0,0 +1,11 @@
package multistream

import (
	"testing"

	psttest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test"
)

func TestMultiStreamTransport(t *testing.T) {
	psttest.SubtestAll(t, NewTransport())
}


@@ -4,8 +4,8 @@ import (
 	"net"
 	"net/http"
 
+	ss "github.com/docker/spdystream"
 	pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
-	ss "github.com/jbenet/spdystream"
 )
 
 // stream implements pst.Stream using a ss.Stream
@@ -55,6 +55,8 @@ func (c *conn) IsClosed() bool {
 	select {
 	case <-c.closed:
 		return true
+	case <-c.sc.CloseChan():
+		return true
 	default:
 		return false
 	}
@@ -62,7 +64,10 @@ func (c *conn) IsClosed() bool {
 // OpenStream creates a new stream.
 func (c *conn) OpenStream() (pst.Stream, error) {
-	s, err := c.spdyConn().CreateStream(http.Header{}, nil, false)
+	s, err := c.spdyConn().CreateStream(http.Header{
+		":method": []string{"GET"}, // this is here for HTTP/SPDY interop
+		":path":   []string{"/"},   // this is here for HTTP/SPDY interop
+	}, nil, false)
 	if err != nil {
 		return nil, err
 	}
 
@@ -87,10 +92,14 @@ func (c *conn) Serve(handler pst.StreamHandler) {
 		// -- at this moment -- not the solution. Either spdystream must
 		// change, or we must throttle another way. go-peerstream handles
 		// every new stream in its own goroutine.
-		go func() {
-			s.SendReply(http.Header{}, false)
-			handler((*stream)(s))
-		}()
+		err := s.SendReply(http.Header{}, false)
+		if err != nil {
+			// this _could_ error out. not sure how to handle this failure.
+			// don't return, and let the caller handle a broken stream.
+			// better than _hiding_ an error.
+			// return
+		}
+		go handler((*stream)(s))
 	})
 }
 

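The IsClosed change above ORs two close signals together: the conn's own closed channel and the channel spdystream exposes via CloseChan(). A non-blocking select over several channels is the usual way to poll that. Generic sketch, where both channels are stand-ins:

package main

import "fmt"

// isClosed reports true if either side has signalled close; the
// default branch keeps the check non-blocking, as in IsClosed above.
func isClosed(local, remote <-chan struct{}) bool {
	select {
	case <-local: // we closed the conn ourselves
		return true
	case <-remote: // the library noticed the peer went away
		return true
	default:
		return false
	}
}

func main() {
	local := make(chan struct{})
	remote := make(chan struct{})
	fmt.Println(isClosed(local, remote)) // false

	close(remote) // simulate sc.CloseChan() firing
	fmt.Println(isClosed(local, remote)) // true
}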

@@ -7,6 +7,5 @@ import (
 )
 
 func TestSpdyStreamTransport(t *testing.T) {
-	t.Skip("spdystream is known to be broken")
 	psttest.SubtestAll(t, Transport)
 }


@@ -194,7 +194,7 @@ func SubtestSimpleWrite100msgs(t *testing.T, tr pst.Transport) {
 			bufs <- buf
 			log("writing %d bytes (message %d/%d #%x)", len(buf), i, msgs, buf[:3])
 			if _, err := stream.Write(buf); err != nil {
-				errs <- err
+				errs <- fmt.Errorf("stream.Write(buf): %s", err)
 				continue
 			}
 		}
@@ -212,7 +212,7 @@ func SubtestSimpleWrite100msgs(t *testing.T, tr pst.Transport) {
 			i++
 			if _, err := io.ReadFull(stream, buf2); err != nil {
-				errs <- err
+				errs <- fmt.Errorf("readFull(stream, buf2): %s", err)
 				continue
 			}
 			if !bytes.Equal(buf1, buf2) {
@@ -253,7 +253,7 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm,
 			bufs <- buf
 			log("%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, nMsg, buf[:3])
 			if _, err := s.Write(buf); err != nil {
-				errs <- err
+				errs <- fmt.Errorf("s.Write(buf): %s", err)
 				continue
 			}
 		}
@@ -265,11 +265,12 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm,
 		buf2 := make([]byte, msgsize)
 		i := 0
 		for buf1 := range bufs {
-			log("%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i, nMsg, buf1[:3])
 			i++
+			log("%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, nMsg, buf1[:3])
 			if _, err := io.ReadFull(s, buf2); err != nil {
-				errs <- err
+				errs <- fmt.Errorf("io.ReadFull(s, buf2): %s", err)
+				log("%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, nMsg, buf1[:3])
 				continue
 			}
 			if !bytes.Equal(buf1, buf2) {
@@ -307,13 +308,13 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm,
 		nc, err := net.Dial(nla.Network(), nla.String())
 		if err != nil {
-			errs <- err
+			errs <- fmt.Errorf("net.Dial(%s, %s): %s", nla.Network(), nla.String(), err)
 			return
 		}
 
 		c, err := a.AddConn(nc)
 		if err != nil {
-			errs <- err
+			errs <- fmt.Errorf("a.AddConn(%s <--> %s): %s", nc.LocalAddr(), nc.RemoteAddr(), err)
 			return
 		}
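
The test changes above all follow one rule: wrap a raw error with the call that produced it before sending it down the shared errs channel, so a failure in a stress test points at a specific operation. A tiny sketch of the difference; errTimeout is a made-up error for illustration:

package main

import (
	"errors"
	"fmt"
)

var errTimeout = errors.New("i/o timeout") // made-up error for the sketch

func main() {
	errs := make(chan error, 2)

	// bare: every failing operation in the test looks the same
	errs <- errTimeout

	// wrapped: the failing call travels with the error
	errs <- fmt.Errorf("stream.Write(buf): %s", errTimeout)

	close(errs)
	for err := range errs {
		fmt.Println(err)
	}
}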