mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-29 01:12:24 +08:00
host interface + services
The separation of work in the p2p pkg is as follows: - net implements the Swarm and connectivity - protocol has muxer and header protocols - host implements protocol muxing + services - identify took over handshake completely! yay. - p2p package works as a whole
This commit is contained in:
149
p2p/host/basic/basic_host.go
Normal file
149
p2p/host/basic/basic_host.go
Normal file
@ -0,0 +1,149 @@
|
||||
package basichost
|
||||
|
||||
import (
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
|
||||
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
|
||||
|
||||
inet "github.com/jbenet/go-ipfs/p2p/net2"
|
||||
peer "github.com/jbenet/go-ipfs/p2p/peer"
|
||||
protocol "github.com/jbenet/go-ipfs/p2p/protocol"
|
||||
identify "github.com/jbenet/go-ipfs/p2p/protocol/identify"
|
||||
relay "github.com/jbenet/go-ipfs/p2p/protocol/relay"
|
||||
)
|
||||
|
||||
var log = eventlog.Logger("p2p/host/basic")
|
||||
|
||||
type BasicHost struct {
|
||||
network inet.Network
|
||||
mux protocol.Mux
|
||||
ids *identify.IDService
|
||||
relay *relay.RelayService
|
||||
}
|
||||
|
||||
// New constructs and sets up a new *BasicHost with given Network
|
||||
func New(net inet.Network) *BasicHost {
|
||||
h := &BasicHost{
|
||||
network: net,
|
||||
mux: protocol.Mux{Handlers: protocol.StreamHandlerMap{}},
|
||||
}
|
||||
|
||||
// setup host services
|
||||
h.ids = identify.NewIDService(h)
|
||||
h.relay = relay.NewRelayService(h, h.Mux().HandleSync)
|
||||
|
||||
net.SetConnHandler(h.newConnHandler)
|
||||
net.SetStreamHandler(h.newStreamHandler)
|
||||
|
||||
return h
|
||||
}
|
||||
|
||||
// newConnHandler is the remote-opened conn handler for inet.Network
|
||||
func (h *BasicHost) newConnHandler(c inet.Conn) {
|
||||
h.ids.IdentifyConn(c)
|
||||
}
|
||||
|
||||
// newStreamHandler is the remote-opened stream handler for inet.Network
|
||||
func (h *BasicHost) newStreamHandler(s inet.Stream) {
|
||||
h.Mux().Handle(s)
|
||||
}
|
||||
|
||||
// ID returns the (local) peer.ID associated with this Host
|
||||
func (h *BasicHost) ID() peer.ID {
|
||||
return h.Network().LocalPeer()
|
||||
}
|
||||
|
||||
// Peerstore returns the Host's repository of Peer Addresses and Keys.
|
||||
func (h *BasicHost) Peerstore() peer.Peerstore {
|
||||
return h.Network().Peerstore()
|
||||
}
|
||||
|
||||
// Networks returns the Network interface of the Host
|
||||
func (h *BasicHost) Network() inet.Network {
|
||||
return h.network
|
||||
}
|
||||
|
||||
// Mux returns the Mux multiplexing incoming streams to protocol handlers
|
||||
func (h *BasicHost) Mux() *protocol.Mux {
|
||||
return &h.mux
|
||||
}
|
||||
|
||||
func (h *BasicHost) IDService() *identify.IDService {
|
||||
return h.ids
|
||||
}
|
||||
|
||||
// SetStreamHandler sets the protocol handler on the Host's Mux.
|
||||
// This is equivalent to:
|
||||
// host.Mux().SetHandler(proto, handler)
|
||||
// (Threadsafe)
|
||||
func (h *BasicHost) SetStreamHandler(pid protocol.ID, handler inet.StreamHandler) {
|
||||
h.Mux().SetHandler(pid, handler)
|
||||
}
|
||||
|
||||
// NewStream opens a new stream to given peer p, and writes a p2p/protocol
|
||||
// header with given protocol.ID. If there is no connection to p, attempts
|
||||
// to create one. If ProtocolID is "", writes no header.
|
||||
// (Threadsafe)
|
||||
func (h *BasicHost) NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error) {
|
||||
s, err := h.Network().NewStream(p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := protocol.WriteHeader(s, pid); err != nil {
|
||||
s.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Connect ensures there is a connection between this host and the peer with
|
||||
// given peer.ID. Connect will absorb the addresses in pi into its internal
|
||||
// peerstore. If there is not an active connection, Connect will issue a
|
||||
// h.Network.Dial, and block until a connection is open, or an error is
|
||||
// returned. // TODO: Relay + NAT.
|
||||
func (h *BasicHost) Connect(ctx context.Context, pi peer.PeerInfo) error {
|
||||
|
||||
// absorb addresses into peerstore
|
||||
h.Peerstore().AddPeerInfo(pi)
|
||||
|
||||
cs := h.Network().ConnsToPeer(pi.ID)
|
||||
if len(cs) > 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return h.dialPeer(ctx, pi.ID)
|
||||
}
|
||||
|
||||
// dialPeer opens a connection to peer, and makes sure to identify
|
||||
// the connection once it has been opened.
|
||||
func (h *BasicHost) dialPeer(ctx context.Context, p peer.ID) error {
|
||||
log.Debugf("host %s dialing %s", h.ID, p)
|
||||
c, err := h.Network().DialPeer(ctx, p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// identify the connection before returning.
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
h.ids.IdentifyConn(c)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// respect don contexteone
|
||||
select {
|
||||
case <-done:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
log.Debugf("host %s finished dialing %s", h.ID, p)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close shuts down the Host's services (network, etc).
|
||||
func (h *BasicHost) Close() error {
|
||||
return h.Network().Close()
|
||||
}
|
63
p2p/host/basic/basic_host_test.go
Normal file
63
p2p/host/basic/basic_host_test.go
Normal file
@ -0,0 +1,63 @@
|
||||
package basichost_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
inet "github.com/jbenet/go-ipfs/p2p/net2"
|
||||
protocol "github.com/jbenet/go-ipfs/p2p/protocol"
|
||||
testutil "github.com/jbenet/go-ipfs/p2p/test/util"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
)
|
||||
|
||||
func TestHostSimple(t *testing.T) {
|
||||
|
||||
ctx := context.Background()
|
||||
h1 := testutil.GenHostSwarm(t, ctx)
|
||||
h2 := testutil.GenHostSwarm(t, ctx)
|
||||
defer h1.Close()
|
||||
defer h2.Close()
|
||||
|
||||
h2pi := h2.Peerstore().PeerInfo(h2.ID())
|
||||
if err := h1.Connect(ctx, h2pi); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
piper, pipew := io.Pipe()
|
||||
h2.SetStreamHandler(protocol.TestingID, func(s inet.Stream) {
|
||||
defer s.Close()
|
||||
w := io.MultiWriter(s, pipew)
|
||||
io.Copy(w, s) // mirror everything
|
||||
})
|
||||
|
||||
s, err := h1.NewStream(protocol.TestingID, h2pi.ID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// write to the stream
|
||||
buf1 := []byte("abcdefghijkl")
|
||||
if _, err := s.Write(buf1); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// get it from the stream (echoed)
|
||||
buf2 := make([]byte, len(buf1))
|
||||
if _, err := io.ReadFull(s, buf2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !bytes.Equal(buf1, buf2) {
|
||||
t.Fatal("buf1 != buf2 -- %x != %x", buf1, buf2)
|
||||
}
|
||||
|
||||
// get it from the pipe (tee)
|
||||
buf3 := make([]byte, len(buf1))
|
||||
if _, err := io.ReadFull(piper, buf3); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !bytes.Equal(buf1, buf3) {
|
||||
t.Fatal("buf1 != buf3 -- %x != %x", buf1, buf3)
|
||||
}
|
||||
}
|
54
p2p/host/host.go
Normal file
54
p2p/host/host.go
Normal file
@ -0,0 +1,54 @@
|
||||
package host
|
||||
|
||||
import (
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
|
||||
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
|
||||
|
||||
inet "github.com/jbenet/go-ipfs/p2p/net2"
|
||||
peer "github.com/jbenet/go-ipfs/p2p/peer"
|
||||
protocol "github.com/jbenet/go-ipfs/p2p/protocol"
|
||||
)
|
||||
|
||||
var log = eventlog.Logger("p2p/host")
|
||||
|
||||
// Host is an object participating in a p2p network, which
|
||||
// implements protocols or provides services. It handles
|
||||
// requests like a Server, and issues requests like a Client.
|
||||
// It is called Host because it is both Server and Client (and Peer
|
||||
// may be confusing).
|
||||
type Host interface {
|
||||
// ID returns the (local) peer.ID associated with this Host
|
||||
ID() peer.ID
|
||||
|
||||
// Peerstore returns the Host's repository of Peer Addresses and Keys.
|
||||
Peerstore() peer.Peerstore
|
||||
|
||||
// Networks returns the Network interface of the Host
|
||||
Network() inet.Network
|
||||
|
||||
// Mux returns the Mux multiplexing incoming streams to protocol handlers
|
||||
Mux() *protocol.Mux
|
||||
|
||||
// Connect ensures there is a connection between this host and the peer with
|
||||
// given peer.ID. Connect will absorb the addresses in pi into its internal
|
||||
// peerstore. If there is not an active connection, Connect will issue a
|
||||
// h.Network.Dial, and block until a connection is open, or an error is
|
||||
// returned. // TODO: Relay + NAT.
|
||||
Connect(ctx context.Context, pi peer.PeerInfo) error
|
||||
|
||||
// SetStreamHandler sets the protocol handler on the Host's Mux.
|
||||
// This is equivalent to:
|
||||
// host.Mux().SetHandler(proto, handler)
|
||||
// (Threadsafe)
|
||||
SetStreamHandler(pid protocol.ID, handler inet.StreamHandler)
|
||||
|
||||
// NewStream opens a new stream to given peer p, and writes a p2p/protocol
|
||||
// header with given protocol.ID. If there is no connection to p, attempts
|
||||
// to create one. If ProtocolID is "", writes no header.
|
||||
// (Threadsafe)
|
||||
NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error)
|
||||
|
||||
// Close shuts down the host, its Network, and services.
|
||||
Close() error
|
||||
}
|
212
p2p/protocol/identify/id.go
Normal file
212
p2p/protocol/identify/id.go
Normal file
@ -0,0 +1,212 @@
|
||||
package identify
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io"
|
||||
semver "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
|
||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
|
||||
config "github.com/jbenet/go-ipfs/config"
|
||||
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
|
||||
|
||||
host "github.com/jbenet/go-ipfs/p2p/host"
|
||||
inet "github.com/jbenet/go-ipfs/p2p/net2"
|
||||
protocol "github.com/jbenet/go-ipfs/p2p/protocol"
|
||||
|
||||
pb "github.com/jbenet/go-ipfs/p2p/protocol/identify/pb"
|
||||
)
|
||||
|
||||
var log = eventlog.Logger("net/identify")
|
||||
|
||||
// ID is the protocol.ID of the Identify Service.
|
||||
const ID protocol.ID = "/ipfs/identify"
|
||||
|
||||
// IpfsVersion holds the current protocol version for a client running this code
|
||||
var IpfsVersion *semver.Version
|
||||
var ClientVersion = "go-ipfs/" + config.CurrentVersionNumber
|
||||
|
||||
func init() {
|
||||
var err error
|
||||
IpfsVersion, err = semver.NewVersion("0.0.1")
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("invalid protocol version: %v", err))
|
||||
}
|
||||
}
|
||||
|
||||
// IDService is a structure that implements ProtocolIdentify.
|
||||
// It is a trivial service that gives the other peer some
|
||||
// useful information about the local peer. A sort of hello.
|
||||
//
|
||||
// The IDService sends:
|
||||
// * Our IPFS Protocol Version
|
||||
// * Our IPFS Agent Version
|
||||
// * Our public Listen Addresses
|
||||
type IDService struct {
|
||||
Host host.Host
|
||||
|
||||
// connections undergoing identification
|
||||
// for wait purposes
|
||||
currid map[inet.Conn]chan struct{}
|
||||
currmu sync.RWMutex
|
||||
}
|
||||
|
||||
func NewIDService(h host.Host) *IDService {
|
||||
s := &IDService{
|
||||
Host: h,
|
||||
currid: make(map[inet.Conn]chan struct{}),
|
||||
}
|
||||
h.SetStreamHandler(ID, s.RequestHandler)
|
||||
return s
|
||||
}
|
||||
|
||||
func (ids *IDService) IdentifyConn(c inet.Conn) {
|
||||
ids.currmu.Lock()
|
||||
if wait, found := ids.currid[c]; found {
|
||||
ids.currmu.Unlock()
|
||||
log.Debugf("IdentifyConn called twice on: %s", c)
|
||||
<-wait // already identifying it. wait for it.
|
||||
return
|
||||
}
|
||||
ids.currid[c] = make(chan struct{})
|
||||
ids.currmu.Unlock()
|
||||
|
||||
s, err := c.NewStream()
|
||||
if err != nil {
|
||||
log.Error("error opening initial stream for %s", ID)
|
||||
log.Event(context.TODO(), "IdentifyOpenFailed", c.RemotePeer())
|
||||
} else {
|
||||
|
||||
// ok give the response to our handler.
|
||||
if err := protocol.WriteHeader(s, ID); err != nil {
|
||||
log.Error("error writing stream header for %s", ID)
|
||||
log.Event(context.TODO(), "IdentifyOpenFailed", c.RemotePeer())
|
||||
}
|
||||
ids.ResponseHandler(s)
|
||||
}
|
||||
|
||||
ids.currmu.Lock()
|
||||
ch, found := ids.currid[c]
|
||||
delete(ids.currid, c)
|
||||
ids.currmu.Unlock()
|
||||
|
||||
if !found {
|
||||
log.Errorf("IdentifyConn failed to find channel (programmer error) for %s", c)
|
||||
return
|
||||
}
|
||||
|
||||
close(ch) // release everyone waiting.
|
||||
}
|
||||
|
||||
func (ids *IDService) RequestHandler(s inet.Stream) {
|
||||
defer s.Close()
|
||||
c := s.Conn()
|
||||
|
||||
w := ggio.NewDelimitedWriter(s)
|
||||
mes := pb.Identify{}
|
||||
ids.populateMessage(&mes, s.Conn())
|
||||
w.WriteMsg(&mes)
|
||||
|
||||
log.Debugf("%s sent message to %s %s", ID,
|
||||
c.RemotePeer(), c.RemoteMultiaddr())
|
||||
}
|
||||
|
||||
func (ids *IDService) ResponseHandler(s inet.Stream) {
|
||||
defer s.Close()
|
||||
c := s.Conn()
|
||||
|
||||
r := ggio.NewDelimitedReader(s, 2048)
|
||||
mes := pb.Identify{}
|
||||
if err := r.ReadMsg(&mes); err != nil {
|
||||
log.Errorf("%s error receiving message from %s %s", ID,
|
||||
c.RemotePeer(), c.RemoteMultiaddr())
|
||||
return
|
||||
}
|
||||
ids.consumeMessage(&mes, c)
|
||||
|
||||
log.Debugf("%s received message from %s %s", ID,
|
||||
c.RemotePeer(), c.RemoteMultiaddr())
|
||||
}
|
||||
|
||||
func (ids *IDService) populateMessage(mes *pb.Identify, c inet.Conn) {
|
||||
|
||||
// set protocols this node is currently handling
|
||||
protos := ids.Host.Mux().Protocols()
|
||||
mes.Protocols = make([]string, len(protos))
|
||||
for i, p := range protos {
|
||||
mes.Protocols[i] = string(p)
|
||||
}
|
||||
|
||||
// observed address so other side is informed of their
|
||||
// "public" address, at least in relation to us.
|
||||
mes.ObservedAddr = c.RemoteMultiaddr().Bytes()
|
||||
|
||||
// set listen addrs
|
||||
laddrs, err := ids.Host.Network().InterfaceListenAddresses()
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
} else {
|
||||
mes.ListenAddrs = make([][]byte, len(laddrs))
|
||||
for i, addr := range laddrs {
|
||||
mes.ListenAddrs[i] = addr.Bytes()
|
||||
}
|
||||
log.Debugf("%s sent listen addrs to %s: %s", c.LocalPeer(), c.RemotePeer(), laddrs)
|
||||
}
|
||||
|
||||
// set protocol versions
|
||||
s := IpfsVersion.String()
|
||||
mes.ProtocolVersion = &s
|
||||
mes.AgentVersion = &ClientVersion
|
||||
}
|
||||
|
||||
func (ids *IDService) consumeMessage(mes *pb.Identify, c inet.Conn) {
|
||||
p := c.RemotePeer()
|
||||
|
||||
// mes.Protocols
|
||||
// mes.ObservedAddr
|
||||
|
||||
// mes.ListenAddrs
|
||||
laddrs := mes.GetListenAddrs()
|
||||
lmaddrs := make([]ma.Multiaddr, 0, len(laddrs))
|
||||
for _, addr := range laddrs {
|
||||
maddr, err := ma.NewMultiaddrBytes(addr)
|
||||
if err != nil {
|
||||
log.Errorf("%s failed to parse multiaddr from %s %s", ID,
|
||||
p, c.RemoteMultiaddr())
|
||||
continue
|
||||
}
|
||||
lmaddrs = append(lmaddrs, maddr)
|
||||
}
|
||||
|
||||
// update our peerstore with the addresses.
|
||||
ids.Host.Peerstore().AddAddresses(p, lmaddrs)
|
||||
log.Debugf("%s received listen addrs for %s: %s", c.LocalPeer(), c.RemotePeer(), lmaddrs)
|
||||
|
||||
// get protocol versions
|
||||
pv := *mes.ProtocolVersion
|
||||
av := *mes.AgentVersion
|
||||
ids.Host.Peerstore().Put(p, "ProtocolVersion", pv)
|
||||
ids.Host.Peerstore().Put(p, "AgentVersion", av)
|
||||
}
|
||||
|
||||
// IdentifyWait returns a channel which will be closed once
|
||||
// "ProtocolIdentify" (handshake3) finishes on given conn.
|
||||
// This happens async so the connection can start to be used
|
||||
// even if handshake3 knowledge is not necesary.
|
||||
// Users **MUST** call IdentifyWait _after_ IdentifyConn
|
||||
func (ids *IDService) IdentifyWait(c inet.Conn) <-chan struct{} {
|
||||
ids.currmu.Lock()
|
||||
ch, found := ids.currid[c]
|
||||
ids.currmu.Unlock()
|
||||
if found {
|
||||
return ch
|
||||
}
|
||||
|
||||
// if not found, it means we are already done identifying it, or
|
||||
// haven't even started. either way, return a new channel closed.
|
||||
ch = make(chan struct{})
|
||||
close(ch)
|
||||
return ch
|
||||
}
|
106
p2p/protocol/identify/id_test.go
Normal file
106
p2p/protocol/identify/id_test.go
Normal file
@ -0,0 +1,106 @@
|
||||
package identify_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
host "github.com/jbenet/go-ipfs/p2p/host"
|
||||
peer "github.com/jbenet/go-ipfs/p2p/peer"
|
||||
identify "github.com/jbenet/go-ipfs/p2p/protocol/identify"
|
||||
testutil "github.com/jbenet/go-ipfs/p2p/test/util"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
)
|
||||
|
||||
func subtestIDService(t *testing.T, postDialWait time.Duration) {
|
||||
|
||||
// the generated networks should have the id service wired in.
|
||||
ctx := context.Background()
|
||||
h1 := testutil.GenHostSwarm(t, ctx)
|
||||
h2 := testutil.GenHostSwarm(t, ctx)
|
||||
|
||||
h1p := h1.ID()
|
||||
h2p := h2.ID()
|
||||
|
||||
testKnowsAddrs(t, h1, h2p, []ma.Multiaddr{}) // nothing
|
||||
testKnowsAddrs(t, h2, h1p, []ma.Multiaddr{}) // nothing
|
||||
|
||||
h2pi := h2.Peerstore().PeerInfo(h2p)
|
||||
if err := h1.Connect(ctx, h2pi); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// we need to wait here if Dial returns before ID service is finished.
|
||||
if postDialWait > 0 {
|
||||
<-time.After(postDialWait)
|
||||
}
|
||||
|
||||
// the IDService should be opened automatically, by the network.
|
||||
// what we should see now is that both peers know about each others listen addresses.
|
||||
testKnowsAddrs(t, h1, h2p, h2.Peerstore().Addresses(h2p)) // has them
|
||||
testHasProtocolVersions(t, h1, h2p)
|
||||
|
||||
// now, this wait we do have to do. it's the wait for the Listening side
|
||||
// to be done identifying the connection.
|
||||
c := h2.Network().ConnsToPeer(h1.ID())
|
||||
if len(c) < 1 {
|
||||
t.Fatal("should have connection by now at least.")
|
||||
}
|
||||
<-h2.IDService().IdentifyWait(c[0])
|
||||
|
||||
// and the protocol versions.
|
||||
testKnowsAddrs(t, h2, h1p, h1.Peerstore().Addresses(h1p)) // has them
|
||||
testHasProtocolVersions(t, h2, h1p)
|
||||
}
|
||||
|
||||
func testKnowsAddrs(t *testing.T, h host.Host, p peer.ID, expected []ma.Multiaddr) {
|
||||
actual := h.Peerstore().Addresses(p)
|
||||
|
||||
if len(actual) != len(expected) {
|
||||
t.Error("dont have the same addresses")
|
||||
}
|
||||
|
||||
have := map[string]struct{}{}
|
||||
for _, addr := range actual {
|
||||
have[addr.String()] = struct{}{}
|
||||
}
|
||||
for _, addr := range expected {
|
||||
if _, found := have[addr.String()]; !found {
|
||||
t.Errorf("%s did not have addr for %s: %s", h.ID(), p, addr)
|
||||
// panic("ahhhhhhh")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testHasProtocolVersions(t *testing.T, h host.Host, p peer.ID) {
|
||||
v, err := h.Peerstore().Get(p, "ProtocolVersion")
|
||||
if v == nil {
|
||||
t.Error("no protocol version")
|
||||
return
|
||||
}
|
||||
if v.(string) != identify.IpfsVersion.String() {
|
||||
t.Error("protocol mismatch", err)
|
||||
}
|
||||
v, err = h.Peerstore().Get(p, "AgentVersion")
|
||||
if v.(string) != identify.ClientVersion {
|
||||
t.Error("agent version mismatch", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestIDServiceWait gives the ID service 100ms to finish after dialing
|
||||
// this is becasue it used to be concurrent. Now, Dial wait till the
|
||||
// id service is done.
|
||||
func TestIDServiceWait(t *testing.T) {
|
||||
N := 3
|
||||
for i := 0; i < N; i++ {
|
||||
subtestIDService(t, 100*time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIDServiceNoWait(t *testing.T) {
|
||||
N := 3
|
||||
for i := 0; i < N; i++ {
|
||||
subtestIDService(t, 0)
|
||||
}
|
||||
}
|
11
p2p/protocol/identify/pb/Makefile
Normal file
11
p2p/protocol/identify/pb/Makefile
Normal file
@ -0,0 +1,11 @@
|
||||
|
||||
PB = $(wildcard *.proto)
|
||||
GO = $(PB:.proto=.pb.go)
|
||||
|
||||
all: $(GO)
|
||||
|
||||
%.pb.go: %.proto
|
||||
protoc --gogo_out=. --proto_path=../../../../../../:/usr/local/opt/protobuf/include:. $<
|
||||
|
||||
clean:
|
||||
rm *.pb.go
|
93
p2p/protocol/identify/pb/identify.pb.go
Normal file
93
p2p/protocol/identify/pb/identify.pb.go
Normal file
@ -0,0 +1,93 @@
|
||||
// Code generated by protoc-gen-gogo.
|
||||
// source: identify.proto
|
||||
// DO NOT EDIT!
|
||||
|
||||
/*
|
||||
Package identify_pb is a generated protocol buffer package.
|
||||
|
||||
It is generated from these files:
|
||||
identify.proto
|
||||
|
||||
It has these top-level messages:
|
||||
Identify
|
||||
*/
|
||||
package identify_pb
|
||||
|
||||
import proto "code.google.com/p/gogoprotobuf/proto"
|
||||
import json "encoding/json"
|
||||
import math "math"
|
||||
|
||||
// Reference proto, json, and math imports to suppress error if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = &json.SyntaxError{}
|
||||
var _ = math.Inf
|
||||
|
||||
type Identify struct {
|
||||
// protocolVersion determines compatibility between peers
|
||||
ProtocolVersion *string `protobuf:"bytes,5,opt,name=protocolVersion" json:"protocolVersion,omitempty"`
|
||||
// agentVersion is like a UserAgent string in browsers, or client version in bittorrent
|
||||
// includes the client name and client.
|
||||
AgentVersion *string `protobuf:"bytes,6,opt,name=agentVersion" json:"agentVersion,omitempty"`
|
||||
// publicKey is this node's public key (which also gives its node.ID)
|
||||
// - may not need to be sent, as secure channel implies it has been sent.
|
||||
// - then again, if we change / disable secure channel, may still want it.
|
||||
PublicKey []byte `protobuf:"bytes,1,opt,name=publicKey" json:"publicKey,omitempty"`
|
||||
// listenAddrs are the multiaddrs the sender node listens for open connections on
|
||||
ListenAddrs [][]byte `protobuf:"bytes,2,rep,name=listenAddrs" json:"listenAddrs,omitempty"`
|
||||
// oservedAddr is the multiaddr of the remote endpoint that the sender node perceives
|
||||
// this is useful information to convey to the other side, as it helps the remote endpoint
|
||||
// determine whether its connection to the local peer goes through NAT.
|
||||
ObservedAddr []byte `protobuf:"bytes,4,opt,name=observedAddr" json:"observedAddr,omitempty"`
|
||||
// protocols are the services this node is running
|
||||
Protocols []string `protobuf:"bytes,3,rep,name=protocols" json:"protocols,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Identify) Reset() { *m = Identify{} }
|
||||
func (m *Identify) String() string { return proto.CompactTextString(m) }
|
||||
func (*Identify) ProtoMessage() {}
|
||||
|
||||
func (m *Identify) GetProtocolVersion() string {
|
||||
if m != nil && m.ProtocolVersion != nil {
|
||||
return *m.ProtocolVersion
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *Identify) GetAgentVersion() string {
|
||||
if m != nil && m.AgentVersion != nil {
|
||||
return *m.AgentVersion
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *Identify) GetPublicKey() []byte {
|
||||
if m != nil {
|
||||
return m.PublicKey
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Identify) GetListenAddrs() [][]byte {
|
||||
if m != nil {
|
||||
return m.ListenAddrs
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Identify) GetObservedAddr() []byte {
|
||||
if m != nil {
|
||||
return m.ObservedAddr
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Identify) GetProtocols() []string {
|
||||
if m != nil {
|
||||
return m.Protocols
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
}
|
27
p2p/protocol/identify/pb/identify.proto
Normal file
27
p2p/protocol/identify/pb/identify.proto
Normal file
@ -0,0 +1,27 @@
|
||||
package identify.pb;
|
||||
|
||||
message Identify {
|
||||
|
||||
// protocolVersion determines compatibility between peers
|
||||
optional string protocolVersion = 5; // e.g. ipfs/1.0.0
|
||||
|
||||
// agentVersion is like a UserAgent string in browsers, or client version in bittorrent
|
||||
// includes the client name and client.
|
||||
optional string agentVersion = 6; // e.g. go-ipfs/0.1.0
|
||||
|
||||
// publicKey is this node's public key (which also gives its node.ID)
|
||||
// - may not need to be sent, as secure channel implies it has been sent.
|
||||
// - then again, if we change / disable secure channel, may still want it.
|
||||
optional bytes publicKey = 1;
|
||||
|
||||
// listenAddrs are the multiaddrs the sender node listens for open connections on
|
||||
repeated bytes listenAddrs = 2;
|
||||
|
||||
// oservedAddr is the multiaddr of the remote endpoint that the sender node perceives
|
||||
// this is useful information to convey to the other side, as it helps the remote endpoint
|
||||
// determine whether its connection to the local peer goes through NAT.
|
||||
optional bytes observedAddr = 4;
|
||||
|
||||
// protocols are the services this node is running
|
||||
repeated string protocols = 3;
|
||||
}
|
156
p2p/protocol/relay/relay.go
Normal file
156
p2p/protocol/relay/relay.go
Normal file
@ -0,0 +1,156 @@
|
||||
package relay
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
|
||||
|
||||
host "github.com/jbenet/go-ipfs/p2p/host"
|
||||
inet "github.com/jbenet/go-ipfs/p2p/net2"
|
||||
peer "github.com/jbenet/go-ipfs/p2p/peer"
|
||||
protocol "github.com/jbenet/go-ipfs/p2p/protocol"
|
||||
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
|
||||
)
|
||||
|
||||
var log = eventlog.Logger("p2p/protocol/relay")
|
||||
|
||||
// ID is the protocol.ID of the Relay Service.
|
||||
const ID protocol.ID = "/ipfs/relay"
|
||||
|
||||
// Relay is a structure that implements ProtocolRelay.
|
||||
// It is a simple relay service which forwards traffic
|
||||
// between two directly connected peers.
|
||||
//
|
||||
// the protocol is very simple:
|
||||
//
|
||||
// /ipfs/relay\n
|
||||
// <multihash src id>
|
||||
// <multihash dst id>
|
||||
// <data stream>
|
||||
//
|
||||
type RelayService struct {
|
||||
host host.Host
|
||||
handler inet.StreamHandler // for streams sent to us locally.
|
||||
}
|
||||
|
||||
func NewRelayService(h host.Host, sh inet.StreamHandler) *RelayService {
|
||||
s := &RelayService{
|
||||
host: h,
|
||||
handler: sh,
|
||||
}
|
||||
h.SetStreamHandler(ID, s.requestHandler)
|
||||
return s
|
||||
}
|
||||
|
||||
// requestHandler is the function called by clients
|
||||
func (rs *RelayService) requestHandler(s inet.Stream) {
|
||||
if err := rs.handleStream(s); err != nil {
|
||||
log.Error("RelayService error:", err)
|
||||
}
|
||||
}
|
||||
|
||||
// handleStream is our own handler, which returns an error for simplicity.
|
||||
func (rs *RelayService) handleStream(s inet.Stream) error {
|
||||
defer s.Close()
|
||||
|
||||
// read the header (src and dst peer.IDs)
|
||||
src, dst, err := ReadHeader(s)
|
||||
if err != nil {
|
||||
return fmt.Errorf("stream with bad header: %s", err)
|
||||
}
|
||||
|
||||
local := rs.host.ID()
|
||||
|
||||
switch {
|
||||
case src == local:
|
||||
return fmt.Errorf("relaying from self")
|
||||
case dst == local: // it's for us! yaaay.
|
||||
log.Debugf("%s consuming stream from %s", local, src)
|
||||
return rs.consumeStream(s)
|
||||
default: // src and dst are not local. relay it.
|
||||
log.Debugf("%s relaying stream %s <--> %s", local, src, dst)
|
||||
return rs.pipeStream(src, dst, s)
|
||||
}
|
||||
}
|
||||
|
||||
// consumeStream connects streams directed to the local peer
|
||||
// to our handler, with the header now stripped (read).
|
||||
func (rs *RelayService) consumeStream(s inet.Stream) error {
|
||||
rs.handler(s) // boom.
|
||||
return nil
|
||||
}
|
||||
|
||||
// pipeStream relays over a stream to a remote peer. It's like `cat`
|
||||
func (rs *RelayService) pipeStream(src, dst peer.ID, s inet.Stream) error {
|
||||
s2, err := rs.openStreamToPeer(dst)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open stream to peer: %s -- %s", dst, err)
|
||||
}
|
||||
|
||||
if err := WriteHeader(s2, src, dst); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// connect the series of tubes.
|
||||
done := make(chan retio, 2)
|
||||
go func() {
|
||||
n, err := io.Copy(s2, s)
|
||||
done <- retio{n, err}
|
||||
}()
|
||||
go func() {
|
||||
n, err := io.Copy(s, s2)
|
||||
done <- retio{n, err}
|
||||
}()
|
||||
|
||||
r1 := <-done
|
||||
r2 := <-done
|
||||
log.Infof("%s relayed %d/%d bytes between %s and %s", rs.host.ID(), r1.n, r2.n, src, dst)
|
||||
|
||||
if r1.err != nil {
|
||||
return r1.err
|
||||
}
|
||||
return r2.err
|
||||
}
|
||||
|
||||
// openStreamToPeer opens a pipe to a remote endpoint
|
||||
// for now, can only open streams to directly connected peers.
|
||||
// maybe we can do some routing later on.
|
||||
func (rs *RelayService) openStreamToPeer(p peer.ID) (inet.Stream, error) {
|
||||
return rs.host.NewStream(ID, p)
|
||||
}
|
||||
|
||||
func ReadHeader(r io.Reader) (src, dst peer.ID, err error) {
|
||||
|
||||
mhr := mh.NewReader(r)
|
||||
|
||||
s, err := mhr.ReadMultihash()
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
d, err := mhr.ReadMultihash()
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
return peer.ID(s), peer.ID(d), nil
|
||||
}
|
||||
|
||||
func WriteHeader(w io.Writer, src, dst peer.ID) error {
|
||||
// write header to w.
|
||||
mhw := mh.NewWriter(w)
|
||||
if err := mhw.WriteMultihash(mh.Multihash(src)); err != nil {
|
||||
return fmt.Errorf("failed to write relay header: %s -- %s", dst, err)
|
||||
}
|
||||
if err := mhw.WriteMultihash(mh.Multihash(dst)); err != nil {
|
||||
return fmt.Errorf("failed to write relay header: %s -- %s", dst, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type retio struct {
|
||||
n int64
|
||||
err error
|
||||
}
|
303
p2p/protocol/relay/relay_test.go
Normal file
303
p2p/protocol/relay/relay_test.go
Normal file
@ -0,0 +1,303 @@
|
||||
package relay_test
|
||||
|
||||
import (
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
inet "github.com/jbenet/go-ipfs/p2p/net2"
|
||||
protocol "github.com/jbenet/go-ipfs/p2p/protocol"
|
||||
relay "github.com/jbenet/go-ipfs/p2p/protocol/relay"
|
||||
testutil "github.com/jbenet/go-ipfs/p2p/test/util"
|
||||
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
)
|
||||
|
||||
var log = eventlog.Logger("relay_test")
|
||||
|
||||
func TestRelaySimple(t *testing.T) {
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// these networks have the relay service wired in already.
|
||||
n1 := testutil.GenHostSwarm(t, ctx)
|
||||
n2 := testutil.GenHostSwarm(t, ctx)
|
||||
n3 := testutil.GenHostSwarm(t, ctx)
|
||||
|
||||
n1p := n1.ID()
|
||||
n2p := n2.ID()
|
||||
n3p := n3.ID()
|
||||
|
||||
n2pi := n2.Peerstore().PeerInfo(n2p)
|
||||
if err := n1.Connect(ctx, n2pi); err != nil {
|
||||
t.Fatal("Failed to connect:", err)
|
||||
}
|
||||
if err := n3.Connect(ctx, n2pi); err != nil {
|
||||
t.Fatal("Failed to connect:", err)
|
||||
}
|
||||
|
||||
// setup handler on n3 to copy everything over to the pipe.
|
||||
piper, pipew := io.Pipe()
|
||||
n3.SetStreamHandler(protocol.TestingID, func(s inet.Stream) {
|
||||
log.Debug("relay stream opened to n3!")
|
||||
log.Debug("piping and echoing everything")
|
||||
w := io.MultiWriter(s, pipew)
|
||||
io.Copy(w, s)
|
||||
log.Debug("closing stream")
|
||||
s.Close()
|
||||
})
|
||||
|
||||
// ok, now we can try to relay n1--->n2--->n3.
|
||||
log.Debug("open relay stream")
|
||||
s, err := n1.NewStream(relay.ID, n2p)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// ok first thing we write the relay header n1->n3
|
||||
log.Debug("write relay header")
|
||||
if err := relay.WriteHeader(s, n1p, n3p); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// ok now the header's there, we can write the next protocol header.
|
||||
log.Debug("write testing header")
|
||||
if err := protocol.WriteHeader(s, protocol.TestingID); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// okay, now we should be able to write text, and read it out.
|
||||
buf1 := []byte("abcdefghij")
|
||||
buf2 := make([]byte, 10)
|
||||
buf3 := make([]byte, 10)
|
||||
log.Debug("write in some text.")
|
||||
if _, err := s.Write(buf1); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// read it out from the pipe.
|
||||
log.Debug("read it out from the pipe.")
|
||||
if _, err := io.ReadFull(piper, buf2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(buf1) != string(buf2) {
|
||||
t.Fatal("should've gotten that text out of the pipe")
|
||||
}
|
||||
|
||||
// read it out from the stream (echoed)
|
||||
log.Debug("read it out from the stream (echoed).")
|
||||
if _, err := io.ReadFull(s, buf3); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(buf1) != string(buf3) {
|
||||
t.Fatal("should've gotten that text out of the stream")
|
||||
}
|
||||
|
||||
// sweet. relay works.
|
||||
log.Debug("sweet, relay works.")
|
||||
s.Close()
|
||||
}
|
||||
|
||||
func TestRelayAcrossFour(t *testing.T) {
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// these networks have the relay service wired in already.
|
||||
n1 := testutil.GenHostSwarm(t, ctx)
|
||||
n2 := testutil.GenHostSwarm(t, ctx)
|
||||
n3 := testutil.GenHostSwarm(t, ctx)
|
||||
n4 := testutil.GenHostSwarm(t, ctx)
|
||||
n5 := testutil.GenHostSwarm(t, ctx)
|
||||
|
||||
n1p := n1.ID()
|
||||
n2p := n2.ID()
|
||||
n3p := n3.ID()
|
||||
n4p := n4.ID()
|
||||
n5p := n5.ID()
|
||||
|
||||
n2pi := n2.Peerstore().PeerInfo(n2p)
|
||||
n4pi := n4.Peerstore().PeerInfo(n4p)
|
||||
|
||||
if err := n1.Connect(ctx, n2pi); err != nil {
|
||||
t.Fatalf("Failed to dial:", err)
|
||||
}
|
||||
if err := n3.Connect(ctx, n2pi); err != nil {
|
||||
t.Fatalf("Failed to dial:", err)
|
||||
}
|
||||
if err := n3.Connect(ctx, n4pi); err != nil {
|
||||
t.Fatalf("Failed to dial:", err)
|
||||
}
|
||||
if err := n5.Connect(ctx, n4pi); err != nil {
|
||||
t.Fatalf("Failed to dial:", err)
|
||||
}
|
||||
|
||||
// setup handler on n5 to copy everything over to the pipe.
|
||||
piper, pipew := io.Pipe()
|
||||
n5.SetStreamHandler(protocol.TestingID, func(s inet.Stream) {
|
||||
log.Debug("relay stream opened to n5!")
|
||||
log.Debug("piping and echoing everything")
|
||||
w := io.MultiWriter(s, pipew)
|
||||
io.Copy(w, s)
|
||||
log.Debug("closing stream")
|
||||
s.Close()
|
||||
})
|
||||
|
||||
// ok, now we can try to relay n1--->n2--->n3--->n4--->n5
|
||||
log.Debug("open relay stream")
|
||||
s, err := n1.NewStream(relay.ID, n2p)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
log.Debugf("write relay header n1->n3 (%s -> %s)", n1p, n3p)
|
||||
if err := relay.WriteHeader(s, n1p, n3p); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
log.Debugf("write relay header n1->n4 (%s -> %s)", n1p, n4p)
|
||||
if err := protocol.WriteHeader(s, relay.ID); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := relay.WriteHeader(s, n1p, n4p); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
log.Debugf("write relay header n1->n5 (%s -> %s)", n1p, n5p)
|
||||
if err := protocol.WriteHeader(s, relay.ID); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := relay.WriteHeader(s, n1p, n5p); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// ok now the header's there, we can write the next protocol header.
|
||||
log.Debug("write testing header")
|
||||
if err := protocol.WriteHeader(s, protocol.TestingID); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// okay, now we should be able to write text, and read it out.
|
||||
buf1 := []byte("abcdefghij")
|
||||
buf2 := make([]byte, 10)
|
||||
buf3 := make([]byte, 10)
|
||||
log.Debug("write in some text.")
|
||||
if _, err := s.Write(buf1); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// read it out from the pipe.
|
||||
log.Debug("read it out from the pipe.")
|
||||
if _, err := io.ReadFull(piper, buf2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(buf1) != string(buf2) {
|
||||
t.Fatal("should've gotten that text out of the pipe")
|
||||
}
|
||||
|
||||
// read it out from the stream (echoed)
|
||||
log.Debug("read it out from the stream (echoed).")
|
||||
if _, err := io.ReadFull(s, buf3); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(buf1) != string(buf3) {
|
||||
t.Fatal("should've gotten that text out of the stream")
|
||||
}
|
||||
|
||||
// sweet. relay works.
|
||||
log.Debug("sweet, relaying across 4 works.")
|
||||
s.Close()
|
||||
}
|
||||
|
||||
func TestRelayStress(t *testing.T) {
|
||||
buflen := 1 << 18
|
||||
iterations := 10
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// these networks have the relay service wired in already.
|
||||
n1 := testutil.GenHostSwarm(t, ctx)
|
||||
n2 := testutil.GenHostSwarm(t, ctx)
|
||||
n3 := testutil.GenHostSwarm(t, ctx)
|
||||
|
||||
n1p := n1.ID()
|
||||
n2p := n2.ID()
|
||||
n3p := n3.ID()
|
||||
|
||||
n2pi := n2.Peerstore().PeerInfo(n2p)
|
||||
if err := n1.Connect(ctx, n2pi); err != nil {
|
||||
t.Fatalf("Failed to dial:", err)
|
||||
}
|
||||
if err := n3.Connect(ctx, n2pi); err != nil {
|
||||
t.Fatalf("Failed to dial:", err)
|
||||
}
|
||||
|
||||
// setup handler on n3 to copy everything over to the pipe.
|
||||
piper, pipew := io.Pipe()
|
||||
n3.SetStreamHandler(protocol.TestingID, func(s inet.Stream) {
|
||||
log.Debug("relay stream opened to n3!")
|
||||
log.Debug("piping and echoing everything")
|
||||
w := io.MultiWriter(s, pipew)
|
||||
io.Copy(w, s)
|
||||
log.Debug("closing stream")
|
||||
s.Close()
|
||||
})
|
||||
|
||||
// ok, now we can try to relay n1--->n2--->n3.
|
||||
log.Debug("open relay stream")
|
||||
s, err := n1.NewStream(relay.ID, n2p)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// ok first thing we write the relay header n1->n3
|
||||
log.Debug("write relay header")
|
||||
if err := relay.WriteHeader(s, n1p, n3p); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// ok now the header's there, we can write the next protocol header.
|
||||
log.Debug("write testing header")
|
||||
if err := protocol.WriteHeader(s, protocol.TestingID); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// okay, now write lots of text and read it back out from both
|
||||
// the pipe and the stream.
|
||||
buf1 := make([]byte, buflen)
|
||||
buf2 := make([]byte, len(buf1))
|
||||
buf3 := make([]byte, len(buf1))
|
||||
|
||||
fillbuf := func(buf []byte, b byte) {
|
||||
for i := range buf {
|
||||
buf[i] = b
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < iterations; i++ {
|
||||
fillbuf(buf1, byte(int('a')+i))
|
||||
log.Debugf("writing %d bytes (%d/%d)", len(buf1), i, iterations)
|
||||
if _, err := s.Write(buf1); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
log.Debug("read it out from the pipe.")
|
||||
if _, err := io.ReadFull(piper, buf2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(buf1) != string(buf2) {
|
||||
t.Fatal("should've gotten that text out of the pipe")
|
||||
}
|
||||
|
||||
// read it out from the stream (echoed)
|
||||
log.Debug("read it out from the stream (echoed).")
|
||||
if _, err := io.ReadFull(s, buf3); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(buf1) != string(buf3) {
|
||||
t.Fatal("should've gotten that text out of the stream")
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug("sweet, relay works under stress.")
|
||||
s.Close()
|
||||
}
|
1
p2p/test/backpressure/backpressure.go
Normal file
1
p2p/test/backpressure/backpressure.go
Normal file
@ -0,0 +1 @@
|
||||
package backpressure_tests
|
374
p2p/test/backpressure/backpressure_test.go
Normal file
374
p2p/test/backpressure/backpressure_test.go
Normal file
@ -0,0 +1,374 @@
|
||||
package backpressure_tests
|
||||
|
||||
import (
|
||||
crand "crypto/rand"
|
||||
"io"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
host "github.com/jbenet/go-ipfs/p2p/host"
|
||||
inet "github.com/jbenet/go-ipfs/p2p/net2"
|
||||
peer "github.com/jbenet/go-ipfs/p2p/peer"
|
||||
protocol "github.com/jbenet/go-ipfs/p2p/protocol"
|
||||
testutil "github.com/jbenet/go-ipfs/p2p/test/util"
|
||||
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
)
|
||||
|
||||
var log = eventlog.Logger("backpressure")
|
||||
|
||||
// TestBackpressureStreamHandler tests whether mux handler
|
||||
// ratelimiting works. Meaning, since the handler is sequential
|
||||
// it should block senders.
|
||||
//
|
||||
// Important note: spdystream (which peerstream uses) has a set
|
||||
// of n workers (n=spdsystream.FRAME_WORKERS) which handle new
|
||||
// frames, including those starting new streams. So all of them
|
||||
// can be in the handler at one time. Also, the sending side
|
||||
// does not rate limit unless we call stream.Wait()
|
||||
//
|
||||
//
|
||||
// Note: right now, this happens muxer-wide. the muxer should
|
||||
// learn to flow control, so handlers cant block each other.
|
||||
func TestBackpressureStreamHandler(t *testing.T) {
|
||||
t.Skip(`Sadly, as cool as this test is, it doesn't work
|
||||
Because spdystream doesnt handle stream open backpressure
|
||||
well IMO. I'll see about rewriting that part when it becomes
|
||||
a problem.
|
||||
`)
|
||||
|
||||
// a number of concurrent request handlers
|
||||
limit := 10
|
||||
|
||||
// our way to signal that we're done with 1 request
|
||||
requestHandled := make(chan struct{})
|
||||
|
||||
// handler rate limiting
|
||||
receiverRatelimit := make(chan struct{}, limit)
|
||||
for i := 0; i < limit; i++ {
|
||||
receiverRatelimit <- struct{}{}
|
||||
}
|
||||
|
||||
// sender counter of successfully opened streams
|
||||
senderOpened := make(chan struct{}, limit*100)
|
||||
|
||||
// sender signals it's done (errored out)
|
||||
senderDone := make(chan struct{})
|
||||
|
||||
// the receiver handles requests with some rate limiting
|
||||
receiver := func(s inet.Stream) {
|
||||
log.Debug("receiver received a stream")
|
||||
|
||||
<-receiverRatelimit // acquire
|
||||
go func() {
|
||||
// our request handler. can do stuff here. we
|
||||
// simulate something taking time by waiting
|
||||
// on requestHandled
|
||||
log.Error("request worker handling...")
|
||||
<-requestHandled
|
||||
log.Error("request worker done!")
|
||||
receiverRatelimit <- struct{}{} // release
|
||||
}()
|
||||
}
|
||||
|
||||
// the sender opens streams as fast as possible
|
||||
sender := func(host host.Host, remote peer.ID) {
|
||||
var s inet.Stream
|
||||
var err error
|
||||
defer func() {
|
||||
t.Error(err)
|
||||
log.Debug("sender error. exiting.")
|
||||
senderDone <- struct{}{}
|
||||
}()
|
||||
|
||||
for {
|
||||
s, err = host.NewStream(protocol.TestingID, remote)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
_ = s
|
||||
// if err = s.SwarmStream().Stream().Wait(); err != nil {
|
||||
// return
|
||||
// }
|
||||
|
||||
// "count" another successfully opened stream
|
||||
// (large buffer so shouldn't block in normal operation)
|
||||
log.Debug("sender opened another stream!")
|
||||
senderOpened <- struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// count our senderOpened events
|
||||
countStreamsOpenedBySender := func(min int) int {
|
||||
opened := 0
|
||||
for opened < min {
|
||||
log.Debugf("countStreamsOpenedBySender got %d (min %d)", opened, min)
|
||||
select {
|
||||
case <-senderOpened:
|
||||
opened++
|
||||
case <-time.After(10 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
return opened
|
||||
}
|
||||
|
||||
// count our received events
|
||||
// waitForNReceivedStreams := func(n int) {
|
||||
// for n > 0 {
|
||||
// log.Debugf("waiting for %d received streams...", n)
|
||||
// select {
|
||||
// case <-receiverRatelimit:
|
||||
// n--
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
testStreamsOpened := func(expected int) {
|
||||
log.Debugf("testing rate limited to %d streams", expected)
|
||||
if n := countStreamsOpenedBySender(expected); n != expected {
|
||||
t.Fatalf("rate limiting did not work :( -- %d != %d", expected, n)
|
||||
}
|
||||
}
|
||||
|
||||
// ok that's enough setup. let's do it!
|
||||
|
||||
ctx := context.Background()
|
||||
h1 := testutil.GenHostSwarm(t, ctx)
|
||||
h2 := testutil.GenHostSwarm(t, ctx)
|
||||
|
||||
// setup receiver handler
|
||||
h1.SetStreamHandler(protocol.TestingID, receiver)
|
||||
|
||||
h2pi := h2.Peerstore().PeerInfo(h2.ID())
|
||||
log.Debugf("dialing %s", h2pi.Addrs)
|
||||
if err := h1.Connect(ctx, h2pi); err != nil {
|
||||
t.Fatalf("Failed to connect:", err)
|
||||
}
|
||||
|
||||
// launch sender!
|
||||
go sender(h2, h1.ID())
|
||||
|
||||
// ok, what do we expect to happen? the receiver should
|
||||
// receive 10 requests and stop receiving, blocking the sender.
|
||||
// we can test this by counting 10x senderOpened requests
|
||||
|
||||
<-senderOpened // wait for the sender to successfully open some.
|
||||
testStreamsOpened(limit - 1)
|
||||
|
||||
// let's "handle" 3 requests.
|
||||
<-requestHandled
|
||||
<-requestHandled
|
||||
<-requestHandled
|
||||
// the sender should've now been able to open exactly 3 more.
|
||||
|
||||
testStreamsOpened(3)
|
||||
|
||||
// shouldn't have opened anything more
|
||||
testStreamsOpened(0)
|
||||
|
||||
// let's "handle" 100 requests in batches of 5
|
||||
for i := 0; i < 20; i++ {
|
||||
<-requestHandled
|
||||
<-requestHandled
|
||||
<-requestHandled
|
||||
<-requestHandled
|
||||
<-requestHandled
|
||||
testStreamsOpened(5)
|
||||
}
|
||||
|
||||
// success!
|
||||
|
||||
// now for the sugar on top: let's tear down the receiver. it should
|
||||
// exit the sender.
|
||||
h1.Close()
|
||||
|
||||
// shouldn't have opened anything more
|
||||
testStreamsOpened(0)
|
||||
|
||||
select {
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Error("receiver shutdown failed to exit sender")
|
||||
case <-senderDone:
|
||||
log.Info("handler backpressure works!")
|
||||
}
|
||||
}
|
||||
|
||||
// TestStBackpressureStreamWrite tests whether streams see proper
|
||||
// backpressure when writing data over the network streams.
|
||||
func TestStBackpressureStreamWrite(t *testing.T) {
|
||||
|
||||
// senderWrote signals that the sender wrote bytes to remote.
|
||||
// the value is the count of bytes written.
|
||||
senderWrote := make(chan int, 10000)
|
||||
|
||||
// sender signals it's done (errored out)
|
||||
senderDone := make(chan struct{})
|
||||
|
||||
// writeStats lets us listen to all the writes and return
|
||||
// how many happened and how much was written
|
||||
writeStats := func() (int, int) {
|
||||
writes := 0
|
||||
bytes := 0
|
||||
for {
|
||||
select {
|
||||
case n := <-senderWrote:
|
||||
writes++
|
||||
bytes = bytes + n
|
||||
default:
|
||||
log.Debugf("stats: sender wrote %d bytes, %d writes", bytes, writes)
|
||||
return bytes, writes
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sender attempts to write as fast as possible, signaling on the
|
||||
// completion of every write. This makes it possible to see how
|
||||
// fast it's actually writing. We pair this with a receiver
|
||||
// that waits for a signal to read.
|
||||
sender := func(s inet.Stream) {
|
||||
defer func() {
|
||||
s.Close()
|
||||
senderDone <- struct{}{}
|
||||
}()
|
||||
|
||||
// ready a buffer of random data
|
||||
buf := make([]byte, 65536)
|
||||
crand.Read(buf)
|
||||
|
||||
for {
|
||||
// send a randomly sized subchunk
|
||||
from := rand.Intn(len(buf) / 2)
|
||||
to := rand.Intn(len(buf) / 2)
|
||||
sendbuf := buf[from : from+to]
|
||||
|
||||
n, err := s.Write(sendbuf)
|
||||
if err != nil {
|
||||
log.Debug("sender error. exiting:", err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debugf("sender wrote %d bytes", n)
|
||||
senderWrote <- n
|
||||
}
|
||||
}
|
||||
|
||||
// receive a number of bytes from a stream.
|
||||
// returns the number of bytes written.
|
||||
receive := func(s inet.Stream, expect int) {
|
||||
log.Debugf("receiver to read %d bytes", expect)
|
||||
rbuf := make([]byte, expect)
|
||||
n, err := io.ReadFull(s, rbuf)
|
||||
if err != nil {
|
||||
t.Error("read failed:", err)
|
||||
}
|
||||
if expect != n {
|
||||
t.Error("read len differs: %d != %d", expect, n)
|
||||
}
|
||||
}
|
||||
|
||||
// ok let's do it!
|
||||
|
||||
// setup the networks
|
||||
ctx := context.Background()
|
||||
h1 := testutil.GenHostSwarm(t, ctx)
|
||||
h2 := testutil.GenHostSwarm(t, ctx)
|
||||
|
||||
// setup sender handler on 1
|
||||
h1.SetStreamHandler(protocol.TestingID, sender)
|
||||
|
||||
h2pi := h2.Peerstore().PeerInfo(h2.ID())
|
||||
log.Debugf("dialing %s", h2pi.Addrs)
|
||||
if err := h1.Connect(ctx, h2pi); err != nil {
|
||||
t.Fatalf("Failed to connect:", err)
|
||||
}
|
||||
|
||||
// open a stream, from 2->1, this is our reader
|
||||
s, err := h2.NewStream(protocol.TestingID, h1.ID())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// let's make sure r/w works.
|
||||
testSenderWrote := func(bytesE int) {
|
||||
bytesA, writesA := writeStats()
|
||||
if bytesA != bytesE {
|
||||
t.Errorf("numbers failed: %d =?= %d bytes, via %d writes", bytesA, bytesE, writesA)
|
||||
}
|
||||
}
|
||||
|
||||
// 500ms rounds of lockstep write + drain
|
||||
roundsStart := time.Now()
|
||||
roundsTotal := 0
|
||||
for roundsTotal < (2 << 20) {
|
||||
// let the sender fill its buffers, it will stop sending.
|
||||
<-time.After(300 * time.Millisecond)
|
||||
b, _ := writeStats()
|
||||
testSenderWrote(0)
|
||||
testSenderWrote(0)
|
||||
|
||||
// drain it all, wait again
|
||||
receive(s, b)
|
||||
roundsTotal = roundsTotal + b
|
||||
}
|
||||
roundsTime := time.Now().Sub(roundsStart)
|
||||
|
||||
// now read continously, while we measure stats.
|
||||
stop := make(chan struct{})
|
||||
contStart := time.Now()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
default:
|
||||
receive(s, 2<<15)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
contTotal := 0
|
||||
for contTotal < (2 << 20) {
|
||||
n := <-senderWrote
|
||||
contTotal += n
|
||||
}
|
||||
stop <- struct{}{}
|
||||
contTime := time.Now().Sub(contStart)
|
||||
|
||||
// now compare! continuous should've been faster AND larger
|
||||
if roundsTime < contTime {
|
||||
t.Error("continuous should have been faster")
|
||||
}
|
||||
|
||||
if roundsTotal < contTotal {
|
||||
t.Error("continuous should have been larger, too!")
|
||||
}
|
||||
|
||||
// and a couple rounds more for good measure ;)
|
||||
for i := 0; i < 3; i++ {
|
||||
// let the sender fill its buffers, it will stop sending.
|
||||
<-time.After(300 * time.Millisecond)
|
||||
b, _ := writeStats()
|
||||
testSenderWrote(0)
|
||||
testSenderWrote(0)
|
||||
|
||||
// drain it all, wait again
|
||||
receive(s, b)
|
||||
}
|
||||
|
||||
// this doesn't work :(:
|
||||
// // now for the sugar on top: let's tear down the receiver. it should
|
||||
// // exit the sender.
|
||||
// n1.Close()
|
||||
// testSenderWrote(0)
|
||||
// testSenderWrote(0)
|
||||
// select {
|
||||
// case <-time.After(2 * time.Second):
|
||||
// t.Error("receiver shutdown failed to exit sender")
|
||||
// case <-senderDone:
|
||||
// log.Info("handler backpressure works!")
|
||||
// }
|
||||
}
|
37
p2p/test/util/util.go
Normal file
37
p2p/test/util/util.go
Normal file
@ -0,0 +1,37 @@
|
||||
package testutil
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
bhost "github.com/jbenet/go-ipfs/p2p/host/basic"
|
||||
inet "github.com/jbenet/go-ipfs/p2p/net2"
|
||||
swarm "github.com/jbenet/go-ipfs/p2p/net2/swarm"
|
||||
peer "github.com/jbenet/go-ipfs/p2p/peer"
|
||||
tu "github.com/jbenet/go-ipfs/util/testutil"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
)
|
||||
|
||||
func GenSwarmNetwork(t *testing.T, ctx context.Context) *swarm.Network {
|
||||
p := tu.RandPeerNetParamsOrFatal(t)
|
||||
ps := peer.NewPeerstore()
|
||||
ps.AddAddress(p.ID, p.Addr)
|
||||
ps.AddPubKey(p.ID, p.PubKey)
|
||||
ps.AddPrivKey(p.ID, p.PrivKey)
|
||||
n, err := swarm.NewNetwork(ctx, ps.Addresses(p.ID), p.ID, ps)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func DivulgeAddresses(a, b inet.Network) {
|
||||
id := a.LocalPeer()
|
||||
addrs := a.Peerstore().Addresses(id)
|
||||
b.Peerstore().AddAddresses(id, addrs)
|
||||
}
|
||||
|
||||
func GenHostSwarm(t *testing.T, ctx context.Context) *bhost.BasicHost {
|
||||
n := GenSwarmNetwork(t, ctx)
|
||||
return bhost.New(n)
|
||||
}
|
Reference in New Issue
Block a user