diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go new file mode 100644 index 000000000..2f08b157a --- /dev/null +++ b/p2p/host/basic/basic_host.go @@ -0,0 +1,149 @@ +package basichost + +import ( + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + + eventlog "github.com/jbenet/go-ipfs/util/eventlog" + + inet "github.com/jbenet/go-ipfs/p2p/net2" + peer "github.com/jbenet/go-ipfs/p2p/peer" + protocol "github.com/jbenet/go-ipfs/p2p/protocol" + identify "github.com/jbenet/go-ipfs/p2p/protocol/identify" + relay "github.com/jbenet/go-ipfs/p2p/protocol/relay" +) + +var log = eventlog.Logger("p2p/host/basic") + +type BasicHost struct { + network inet.Network + mux protocol.Mux + ids *identify.IDService + relay *relay.RelayService +} + +// New constructs and sets up a new *BasicHost with given Network +func New(net inet.Network) *BasicHost { + h := &BasicHost{ + network: net, + mux: protocol.Mux{Handlers: protocol.StreamHandlerMap{}}, + } + + // setup host services + h.ids = identify.NewIDService(h) + h.relay = relay.NewRelayService(h, h.Mux().HandleSync) + + net.SetConnHandler(h.newConnHandler) + net.SetStreamHandler(h.newStreamHandler) + + return h +} + +// newConnHandler is the remote-opened conn handler for inet.Network +func (h *BasicHost) newConnHandler(c inet.Conn) { + h.ids.IdentifyConn(c) +} + +// newStreamHandler is the remote-opened stream handler for inet.Network +func (h *BasicHost) newStreamHandler(s inet.Stream) { + h.Mux().Handle(s) +} + +// ID returns the (local) peer.ID associated with this Host +func (h *BasicHost) ID() peer.ID { + return h.Network().LocalPeer() +} + +// Peerstore returns the Host's repository of Peer Addresses and Keys. +func (h *BasicHost) Peerstore() peer.Peerstore { + return h.Network().Peerstore() +} + +// Networks returns the Network interface of the Host +func (h *BasicHost) Network() inet.Network { + return h.network +} + +// Mux returns the Mux multiplexing incoming streams to protocol handlers +func (h *BasicHost) Mux() *protocol.Mux { + return &h.mux +} + +func (h *BasicHost) IDService() *identify.IDService { + return h.ids +} + +// SetStreamHandler sets the protocol handler on the Host's Mux. +// This is equivalent to: +// host.Mux().SetHandler(proto, handler) +// (Threadsafe) +func (h *BasicHost) SetStreamHandler(pid protocol.ID, handler inet.StreamHandler) { + h.Mux().SetHandler(pid, handler) +} + +// NewStream opens a new stream to given peer p, and writes a p2p/protocol +// header with given protocol.ID. If there is no connection to p, attempts +// to create one. If ProtocolID is "", writes no header. +// (Threadsafe) +func (h *BasicHost) NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error) { + s, err := h.Network().NewStream(p) + if err != nil { + return nil, err + } + + if err := protocol.WriteHeader(s, pid); err != nil { + s.Close() + return nil, err + } + + return s, nil +} + +// Connect ensures there is a connection between this host and the peer with +// given peer.ID. Connect will absorb the addresses in pi into its internal +// peerstore. If there is not an active connection, Connect will issue a +// h.Network.Dial, and block until a connection is open, or an error is +// returned. // TODO: Relay + NAT. +func (h *BasicHost) Connect(ctx context.Context, pi peer.PeerInfo) error { + + // absorb addresses into peerstore + h.Peerstore().AddPeerInfo(pi) + + cs := h.Network().ConnsToPeer(pi.ID) + if len(cs) > 0 { + return nil + } + + return h.dialPeer(ctx, pi.ID) +} + +// dialPeer opens a connection to peer, and makes sure to identify +// the connection once it has been opened. +func (h *BasicHost) dialPeer(ctx context.Context, p peer.ID) error { + log.Debugf("host %s dialing %s", h.ID, p) + c, err := h.Network().DialPeer(ctx, p) + if err != nil { + return err + } + + // identify the connection before returning. + done := make(chan struct{}) + go func() { + h.ids.IdentifyConn(c) + close(done) + }() + + // respect don contexteone + select { + case <-done: + case <-ctx.Done(): + return ctx.Err() + } + + log.Debugf("host %s finished dialing %s", h.ID, p) + return nil +} + +// Close shuts down the Host's services (network, etc). +func (h *BasicHost) Close() error { + return h.Network().Close() +} diff --git a/p2p/host/basic/basic_host_test.go b/p2p/host/basic/basic_host_test.go new file mode 100644 index 000000000..992991bec --- /dev/null +++ b/p2p/host/basic/basic_host_test.go @@ -0,0 +1,63 @@ +package basichost_test + +import ( + "bytes" + "io" + "testing" + + inet "github.com/jbenet/go-ipfs/p2p/net2" + protocol "github.com/jbenet/go-ipfs/p2p/protocol" + testutil "github.com/jbenet/go-ipfs/p2p/test/util" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" +) + +func TestHostSimple(t *testing.T) { + + ctx := context.Background() + h1 := testutil.GenHostSwarm(t, ctx) + h2 := testutil.GenHostSwarm(t, ctx) + defer h1.Close() + defer h2.Close() + + h2pi := h2.Peerstore().PeerInfo(h2.ID()) + if err := h1.Connect(ctx, h2pi); err != nil { + t.Fatal(err) + } + + piper, pipew := io.Pipe() + h2.SetStreamHandler(protocol.TestingID, func(s inet.Stream) { + defer s.Close() + w := io.MultiWriter(s, pipew) + io.Copy(w, s) // mirror everything + }) + + s, err := h1.NewStream(protocol.TestingID, h2pi.ID) + if err != nil { + t.Fatal(err) + } + + // write to the stream + buf1 := []byte("abcdefghijkl") + if _, err := s.Write(buf1); err != nil { + t.Fatal(err) + } + + // get it from the stream (echoed) + buf2 := make([]byte, len(buf1)) + if _, err := io.ReadFull(s, buf2); err != nil { + t.Fatal(err) + } + if !bytes.Equal(buf1, buf2) { + t.Fatal("buf1 != buf2 -- %x != %x", buf1, buf2) + } + + // get it from the pipe (tee) + buf3 := make([]byte, len(buf1)) + if _, err := io.ReadFull(piper, buf3); err != nil { + t.Fatal(err) + } + if !bytes.Equal(buf1, buf3) { + t.Fatal("buf1 != buf3 -- %x != %x", buf1, buf3) + } +} diff --git a/p2p/host/host.go b/p2p/host/host.go new file mode 100644 index 000000000..944cb1b44 --- /dev/null +++ b/p2p/host/host.go @@ -0,0 +1,54 @@ +package host + +import ( + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + + eventlog "github.com/jbenet/go-ipfs/util/eventlog" + + inet "github.com/jbenet/go-ipfs/p2p/net2" + peer "github.com/jbenet/go-ipfs/p2p/peer" + protocol "github.com/jbenet/go-ipfs/p2p/protocol" +) + +var log = eventlog.Logger("p2p/host") + +// Host is an object participating in a p2p network, which +// implements protocols or provides services. It handles +// requests like a Server, and issues requests like a Client. +// It is called Host because it is both Server and Client (and Peer +// may be confusing). +type Host interface { + // ID returns the (local) peer.ID associated with this Host + ID() peer.ID + + // Peerstore returns the Host's repository of Peer Addresses and Keys. + Peerstore() peer.Peerstore + + // Networks returns the Network interface of the Host + Network() inet.Network + + // Mux returns the Mux multiplexing incoming streams to protocol handlers + Mux() *protocol.Mux + + // Connect ensures there is a connection between this host and the peer with + // given peer.ID. Connect will absorb the addresses in pi into its internal + // peerstore. If there is not an active connection, Connect will issue a + // h.Network.Dial, and block until a connection is open, or an error is + // returned. // TODO: Relay + NAT. + Connect(ctx context.Context, pi peer.PeerInfo) error + + // SetStreamHandler sets the protocol handler on the Host's Mux. + // This is equivalent to: + // host.Mux().SetHandler(proto, handler) + // (Threadsafe) + SetStreamHandler(pid protocol.ID, handler inet.StreamHandler) + + // NewStream opens a new stream to given peer p, and writes a p2p/protocol + // header with given protocol.ID. If there is no connection to p, attempts + // to create one. If ProtocolID is "", writes no header. + // (Threadsafe) + NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error) + + // Close shuts down the host, its Network, and services. + Close() error +} diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go new file mode 100644 index 000000000..266bcc347 --- /dev/null +++ b/p2p/protocol/identify/id.go @@ -0,0 +1,212 @@ +package identify + +import ( + "fmt" + "sync" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io" + semver "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/coreos/go-semver/semver" + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + + config "github.com/jbenet/go-ipfs/config" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" + + host "github.com/jbenet/go-ipfs/p2p/host" + inet "github.com/jbenet/go-ipfs/p2p/net2" + protocol "github.com/jbenet/go-ipfs/p2p/protocol" + + pb "github.com/jbenet/go-ipfs/p2p/protocol/identify/pb" +) + +var log = eventlog.Logger("net/identify") + +// ID is the protocol.ID of the Identify Service. +const ID protocol.ID = "/ipfs/identify" + +// IpfsVersion holds the current protocol version for a client running this code +var IpfsVersion *semver.Version +var ClientVersion = "go-ipfs/" + config.CurrentVersionNumber + +func init() { + var err error + IpfsVersion, err = semver.NewVersion("0.0.1") + if err != nil { + panic(fmt.Errorf("invalid protocol version: %v", err)) + } +} + +// IDService is a structure that implements ProtocolIdentify. +// It is a trivial service that gives the other peer some +// useful information about the local peer. A sort of hello. +// +// The IDService sends: +// * Our IPFS Protocol Version +// * Our IPFS Agent Version +// * Our public Listen Addresses +type IDService struct { + Host host.Host + + // connections undergoing identification + // for wait purposes + currid map[inet.Conn]chan struct{} + currmu sync.RWMutex +} + +func NewIDService(h host.Host) *IDService { + s := &IDService{ + Host: h, + currid: make(map[inet.Conn]chan struct{}), + } + h.SetStreamHandler(ID, s.RequestHandler) + return s +} + +func (ids *IDService) IdentifyConn(c inet.Conn) { + ids.currmu.Lock() + if wait, found := ids.currid[c]; found { + ids.currmu.Unlock() + log.Debugf("IdentifyConn called twice on: %s", c) + <-wait // already identifying it. wait for it. + return + } + ids.currid[c] = make(chan struct{}) + ids.currmu.Unlock() + + s, err := c.NewStream() + if err != nil { + log.Error("error opening initial stream for %s", ID) + log.Event(context.TODO(), "IdentifyOpenFailed", c.RemotePeer()) + } else { + + // ok give the response to our handler. + if err := protocol.WriteHeader(s, ID); err != nil { + log.Error("error writing stream header for %s", ID) + log.Event(context.TODO(), "IdentifyOpenFailed", c.RemotePeer()) + } + ids.ResponseHandler(s) + } + + ids.currmu.Lock() + ch, found := ids.currid[c] + delete(ids.currid, c) + ids.currmu.Unlock() + + if !found { + log.Errorf("IdentifyConn failed to find channel (programmer error) for %s", c) + return + } + + close(ch) // release everyone waiting. +} + +func (ids *IDService) RequestHandler(s inet.Stream) { + defer s.Close() + c := s.Conn() + + w := ggio.NewDelimitedWriter(s) + mes := pb.Identify{} + ids.populateMessage(&mes, s.Conn()) + w.WriteMsg(&mes) + + log.Debugf("%s sent message to %s %s", ID, + c.RemotePeer(), c.RemoteMultiaddr()) +} + +func (ids *IDService) ResponseHandler(s inet.Stream) { + defer s.Close() + c := s.Conn() + + r := ggio.NewDelimitedReader(s, 2048) + mes := pb.Identify{} + if err := r.ReadMsg(&mes); err != nil { + log.Errorf("%s error receiving message from %s %s", ID, + c.RemotePeer(), c.RemoteMultiaddr()) + return + } + ids.consumeMessage(&mes, c) + + log.Debugf("%s received message from %s %s", ID, + c.RemotePeer(), c.RemoteMultiaddr()) +} + +func (ids *IDService) populateMessage(mes *pb.Identify, c inet.Conn) { + + // set protocols this node is currently handling + protos := ids.Host.Mux().Protocols() + mes.Protocols = make([]string, len(protos)) + for i, p := range protos { + mes.Protocols[i] = string(p) + } + + // observed address so other side is informed of their + // "public" address, at least in relation to us. + mes.ObservedAddr = c.RemoteMultiaddr().Bytes() + + // set listen addrs + laddrs, err := ids.Host.Network().InterfaceListenAddresses() + if err != nil { + log.Error(err) + } else { + mes.ListenAddrs = make([][]byte, len(laddrs)) + for i, addr := range laddrs { + mes.ListenAddrs[i] = addr.Bytes() + } + log.Debugf("%s sent listen addrs to %s: %s", c.LocalPeer(), c.RemotePeer(), laddrs) + } + + // set protocol versions + s := IpfsVersion.String() + mes.ProtocolVersion = &s + mes.AgentVersion = &ClientVersion +} + +func (ids *IDService) consumeMessage(mes *pb.Identify, c inet.Conn) { + p := c.RemotePeer() + + // mes.Protocols + // mes.ObservedAddr + + // mes.ListenAddrs + laddrs := mes.GetListenAddrs() + lmaddrs := make([]ma.Multiaddr, 0, len(laddrs)) + for _, addr := range laddrs { + maddr, err := ma.NewMultiaddrBytes(addr) + if err != nil { + log.Errorf("%s failed to parse multiaddr from %s %s", ID, + p, c.RemoteMultiaddr()) + continue + } + lmaddrs = append(lmaddrs, maddr) + } + + // update our peerstore with the addresses. + ids.Host.Peerstore().AddAddresses(p, lmaddrs) + log.Debugf("%s received listen addrs for %s: %s", c.LocalPeer(), c.RemotePeer(), lmaddrs) + + // get protocol versions + pv := *mes.ProtocolVersion + av := *mes.AgentVersion + ids.Host.Peerstore().Put(p, "ProtocolVersion", pv) + ids.Host.Peerstore().Put(p, "AgentVersion", av) +} + +// IdentifyWait returns a channel which will be closed once +// "ProtocolIdentify" (handshake3) finishes on given conn. +// This happens async so the connection can start to be used +// even if handshake3 knowledge is not necesary. +// Users **MUST** call IdentifyWait _after_ IdentifyConn +func (ids *IDService) IdentifyWait(c inet.Conn) <-chan struct{} { + ids.currmu.Lock() + ch, found := ids.currid[c] + ids.currmu.Unlock() + if found { + return ch + } + + // if not found, it means we are already done identifying it, or + // haven't even started. either way, return a new channel closed. + ch = make(chan struct{}) + close(ch) + return ch +} diff --git a/p2p/protocol/identify/id_test.go b/p2p/protocol/identify/id_test.go new file mode 100644 index 000000000..c6707b0ac --- /dev/null +++ b/p2p/protocol/identify/id_test.go @@ -0,0 +1,106 @@ +package identify_test + +import ( + "testing" + "time" + + host "github.com/jbenet/go-ipfs/p2p/host" + peer "github.com/jbenet/go-ipfs/p2p/peer" + identify "github.com/jbenet/go-ipfs/p2p/protocol/identify" + testutil "github.com/jbenet/go-ipfs/p2p/test/util" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" +) + +func subtestIDService(t *testing.T, postDialWait time.Duration) { + + // the generated networks should have the id service wired in. + ctx := context.Background() + h1 := testutil.GenHostSwarm(t, ctx) + h2 := testutil.GenHostSwarm(t, ctx) + + h1p := h1.ID() + h2p := h2.ID() + + testKnowsAddrs(t, h1, h2p, []ma.Multiaddr{}) // nothing + testKnowsAddrs(t, h2, h1p, []ma.Multiaddr{}) // nothing + + h2pi := h2.Peerstore().PeerInfo(h2p) + if err := h1.Connect(ctx, h2pi); err != nil { + t.Fatal(err) + } + + // we need to wait here if Dial returns before ID service is finished. + if postDialWait > 0 { + <-time.After(postDialWait) + } + + // the IDService should be opened automatically, by the network. + // what we should see now is that both peers know about each others listen addresses. + testKnowsAddrs(t, h1, h2p, h2.Peerstore().Addresses(h2p)) // has them + testHasProtocolVersions(t, h1, h2p) + + // now, this wait we do have to do. it's the wait for the Listening side + // to be done identifying the connection. + c := h2.Network().ConnsToPeer(h1.ID()) + if len(c) < 1 { + t.Fatal("should have connection by now at least.") + } + <-h2.IDService().IdentifyWait(c[0]) + + // and the protocol versions. + testKnowsAddrs(t, h2, h1p, h1.Peerstore().Addresses(h1p)) // has them + testHasProtocolVersions(t, h2, h1p) +} + +func testKnowsAddrs(t *testing.T, h host.Host, p peer.ID, expected []ma.Multiaddr) { + actual := h.Peerstore().Addresses(p) + + if len(actual) != len(expected) { + t.Error("dont have the same addresses") + } + + have := map[string]struct{}{} + for _, addr := range actual { + have[addr.String()] = struct{}{} + } + for _, addr := range expected { + if _, found := have[addr.String()]; !found { + t.Errorf("%s did not have addr for %s: %s", h.ID(), p, addr) + // panic("ahhhhhhh") + } + } +} + +func testHasProtocolVersions(t *testing.T, h host.Host, p peer.ID) { + v, err := h.Peerstore().Get(p, "ProtocolVersion") + if v == nil { + t.Error("no protocol version") + return + } + if v.(string) != identify.IpfsVersion.String() { + t.Error("protocol mismatch", err) + } + v, err = h.Peerstore().Get(p, "AgentVersion") + if v.(string) != identify.ClientVersion { + t.Error("agent version mismatch", err) + } +} + +// TestIDServiceWait gives the ID service 100ms to finish after dialing +// this is becasue it used to be concurrent. Now, Dial wait till the +// id service is done. +func TestIDServiceWait(t *testing.T) { + N := 3 + for i := 0; i < N; i++ { + subtestIDService(t, 100*time.Millisecond) + } +} + +func TestIDServiceNoWait(t *testing.T) { + N := 3 + for i := 0; i < N; i++ { + subtestIDService(t, 0) + } +} diff --git a/p2p/protocol/identify/pb/Makefile b/p2p/protocol/identify/pb/Makefile new file mode 100644 index 000000000..d08f1c3eb --- /dev/null +++ b/p2p/protocol/identify/pb/Makefile @@ -0,0 +1,11 @@ + +PB = $(wildcard *.proto) +GO = $(PB:.proto=.pb.go) + +all: $(GO) + +%.pb.go: %.proto + protoc --gogo_out=. --proto_path=../../../../../../:/usr/local/opt/protobuf/include:. $< + +clean: + rm *.pb.go diff --git a/p2p/protocol/identify/pb/identify.pb.go b/p2p/protocol/identify/pb/identify.pb.go new file mode 100644 index 000000000..1f0458648 --- /dev/null +++ b/p2p/protocol/identify/pb/identify.pb.go @@ -0,0 +1,93 @@ +// Code generated by protoc-gen-gogo. +// source: identify.proto +// DO NOT EDIT! + +/* +Package identify_pb is a generated protocol buffer package. + +It is generated from these files: + identify.proto + +It has these top-level messages: + Identify +*/ +package identify_pb + +import proto "code.google.com/p/gogoprotobuf/proto" +import json "encoding/json" +import math "math" + +// Reference proto, json, and math imports to suppress error if they are not otherwise used. +var _ = proto.Marshal +var _ = &json.SyntaxError{} +var _ = math.Inf + +type Identify struct { + // protocolVersion determines compatibility between peers + ProtocolVersion *string `protobuf:"bytes,5,opt,name=protocolVersion" json:"protocolVersion,omitempty"` + // agentVersion is like a UserAgent string in browsers, or client version in bittorrent + // includes the client name and client. + AgentVersion *string `protobuf:"bytes,6,opt,name=agentVersion" json:"agentVersion,omitempty"` + // publicKey is this node's public key (which also gives its node.ID) + // - may not need to be sent, as secure channel implies it has been sent. + // - then again, if we change / disable secure channel, may still want it. + PublicKey []byte `protobuf:"bytes,1,opt,name=publicKey" json:"publicKey,omitempty"` + // listenAddrs are the multiaddrs the sender node listens for open connections on + ListenAddrs [][]byte `protobuf:"bytes,2,rep,name=listenAddrs" json:"listenAddrs,omitempty"` + // oservedAddr is the multiaddr of the remote endpoint that the sender node perceives + // this is useful information to convey to the other side, as it helps the remote endpoint + // determine whether its connection to the local peer goes through NAT. + ObservedAddr []byte `protobuf:"bytes,4,opt,name=observedAddr" json:"observedAddr,omitempty"` + // protocols are the services this node is running + Protocols []string `protobuf:"bytes,3,rep,name=protocols" json:"protocols,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Identify) Reset() { *m = Identify{} } +func (m *Identify) String() string { return proto.CompactTextString(m) } +func (*Identify) ProtoMessage() {} + +func (m *Identify) GetProtocolVersion() string { + if m != nil && m.ProtocolVersion != nil { + return *m.ProtocolVersion + } + return "" +} + +func (m *Identify) GetAgentVersion() string { + if m != nil && m.AgentVersion != nil { + return *m.AgentVersion + } + return "" +} + +func (m *Identify) GetPublicKey() []byte { + if m != nil { + return m.PublicKey + } + return nil +} + +func (m *Identify) GetListenAddrs() [][]byte { + if m != nil { + return m.ListenAddrs + } + return nil +} + +func (m *Identify) GetObservedAddr() []byte { + if m != nil { + return m.ObservedAddr + } + return nil +} + +func (m *Identify) GetProtocols() []string { + if m != nil { + return m.Protocols + } + return nil +} + +func init() { +} diff --git a/p2p/protocol/identify/pb/identify.proto b/p2p/protocol/identify/pb/identify.proto new file mode 100644 index 000000000..7d31e0474 --- /dev/null +++ b/p2p/protocol/identify/pb/identify.proto @@ -0,0 +1,27 @@ +package identify.pb; + +message Identify { + + // protocolVersion determines compatibility between peers + optional string protocolVersion = 5; // e.g. ipfs/1.0.0 + + // agentVersion is like a UserAgent string in browsers, or client version in bittorrent + // includes the client name and client. + optional string agentVersion = 6; // e.g. go-ipfs/0.1.0 + + // publicKey is this node's public key (which also gives its node.ID) + // - may not need to be sent, as secure channel implies it has been sent. + // - then again, if we change / disable secure channel, may still want it. + optional bytes publicKey = 1; + + // listenAddrs are the multiaddrs the sender node listens for open connections on + repeated bytes listenAddrs = 2; + + // oservedAddr is the multiaddr of the remote endpoint that the sender node perceives + // this is useful information to convey to the other side, as it helps the remote endpoint + // determine whether its connection to the local peer goes through NAT. + optional bytes observedAddr = 4; + + // protocols are the services this node is running + repeated string protocols = 3; +} diff --git a/p2p/protocol/relay/relay.go b/p2p/protocol/relay/relay.go new file mode 100644 index 000000000..12e0d204a --- /dev/null +++ b/p2p/protocol/relay/relay.go @@ -0,0 +1,156 @@ +package relay + +import ( + "fmt" + "io" + + mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" + + host "github.com/jbenet/go-ipfs/p2p/host" + inet "github.com/jbenet/go-ipfs/p2p/net2" + peer "github.com/jbenet/go-ipfs/p2p/peer" + protocol "github.com/jbenet/go-ipfs/p2p/protocol" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" +) + +var log = eventlog.Logger("p2p/protocol/relay") + +// ID is the protocol.ID of the Relay Service. +const ID protocol.ID = "/ipfs/relay" + +// Relay is a structure that implements ProtocolRelay. +// It is a simple relay service which forwards traffic +// between two directly connected peers. +// +// the protocol is very simple: +// +// /ipfs/relay\n +// +// +// +// +type RelayService struct { + host host.Host + handler inet.StreamHandler // for streams sent to us locally. +} + +func NewRelayService(h host.Host, sh inet.StreamHandler) *RelayService { + s := &RelayService{ + host: h, + handler: sh, + } + h.SetStreamHandler(ID, s.requestHandler) + return s +} + +// requestHandler is the function called by clients +func (rs *RelayService) requestHandler(s inet.Stream) { + if err := rs.handleStream(s); err != nil { + log.Error("RelayService error:", err) + } +} + +// handleStream is our own handler, which returns an error for simplicity. +func (rs *RelayService) handleStream(s inet.Stream) error { + defer s.Close() + + // read the header (src and dst peer.IDs) + src, dst, err := ReadHeader(s) + if err != nil { + return fmt.Errorf("stream with bad header: %s", err) + } + + local := rs.host.ID() + + switch { + case src == local: + return fmt.Errorf("relaying from self") + case dst == local: // it's for us! yaaay. + log.Debugf("%s consuming stream from %s", local, src) + return rs.consumeStream(s) + default: // src and dst are not local. relay it. + log.Debugf("%s relaying stream %s <--> %s", local, src, dst) + return rs.pipeStream(src, dst, s) + } +} + +// consumeStream connects streams directed to the local peer +// to our handler, with the header now stripped (read). +func (rs *RelayService) consumeStream(s inet.Stream) error { + rs.handler(s) // boom. + return nil +} + +// pipeStream relays over a stream to a remote peer. It's like `cat` +func (rs *RelayService) pipeStream(src, dst peer.ID, s inet.Stream) error { + s2, err := rs.openStreamToPeer(dst) + if err != nil { + return fmt.Errorf("failed to open stream to peer: %s -- %s", dst, err) + } + + if err := WriteHeader(s2, src, dst); err != nil { + return err + } + + // connect the series of tubes. + done := make(chan retio, 2) + go func() { + n, err := io.Copy(s2, s) + done <- retio{n, err} + }() + go func() { + n, err := io.Copy(s, s2) + done <- retio{n, err} + }() + + r1 := <-done + r2 := <-done + log.Infof("%s relayed %d/%d bytes between %s and %s", rs.host.ID(), r1.n, r2.n, src, dst) + + if r1.err != nil { + return r1.err + } + return r2.err +} + +// openStreamToPeer opens a pipe to a remote endpoint +// for now, can only open streams to directly connected peers. +// maybe we can do some routing later on. +func (rs *RelayService) openStreamToPeer(p peer.ID) (inet.Stream, error) { + return rs.host.NewStream(ID, p) +} + +func ReadHeader(r io.Reader) (src, dst peer.ID, err error) { + + mhr := mh.NewReader(r) + + s, err := mhr.ReadMultihash() + if err != nil { + return "", "", err + } + + d, err := mhr.ReadMultihash() + if err != nil { + return "", "", err + } + + return peer.ID(s), peer.ID(d), nil +} + +func WriteHeader(w io.Writer, src, dst peer.ID) error { + // write header to w. + mhw := mh.NewWriter(w) + if err := mhw.WriteMultihash(mh.Multihash(src)); err != nil { + return fmt.Errorf("failed to write relay header: %s -- %s", dst, err) + } + if err := mhw.WriteMultihash(mh.Multihash(dst)); err != nil { + return fmt.Errorf("failed to write relay header: %s -- %s", dst, err) + } + + return nil +} + +type retio struct { + n int64 + err error +} diff --git a/p2p/protocol/relay/relay_test.go b/p2p/protocol/relay/relay_test.go new file mode 100644 index 000000000..9aaedfd3d --- /dev/null +++ b/p2p/protocol/relay/relay_test.go @@ -0,0 +1,303 @@ +package relay_test + +import ( + "io" + "testing" + + inet "github.com/jbenet/go-ipfs/p2p/net2" + protocol "github.com/jbenet/go-ipfs/p2p/protocol" + relay "github.com/jbenet/go-ipfs/p2p/protocol/relay" + testutil "github.com/jbenet/go-ipfs/p2p/test/util" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" +) + +var log = eventlog.Logger("relay_test") + +func TestRelaySimple(t *testing.T) { + + ctx := context.Background() + + // these networks have the relay service wired in already. + n1 := testutil.GenHostSwarm(t, ctx) + n2 := testutil.GenHostSwarm(t, ctx) + n3 := testutil.GenHostSwarm(t, ctx) + + n1p := n1.ID() + n2p := n2.ID() + n3p := n3.ID() + + n2pi := n2.Peerstore().PeerInfo(n2p) + if err := n1.Connect(ctx, n2pi); err != nil { + t.Fatal("Failed to connect:", err) + } + if err := n3.Connect(ctx, n2pi); err != nil { + t.Fatal("Failed to connect:", err) + } + + // setup handler on n3 to copy everything over to the pipe. + piper, pipew := io.Pipe() + n3.SetStreamHandler(protocol.TestingID, func(s inet.Stream) { + log.Debug("relay stream opened to n3!") + log.Debug("piping and echoing everything") + w := io.MultiWriter(s, pipew) + io.Copy(w, s) + log.Debug("closing stream") + s.Close() + }) + + // ok, now we can try to relay n1--->n2--->n3. + log.Debug("open relay stream") + s, err := n1.NewStream(relay.ID, n2p) + if err != nil { + t.Fatal(err) + } + + // ok first thing we write the relay header n1->n3 + log.Debug("write relay header") + if err := relay.WriteHeader(s, n1p, n3p); err != nil { + t.Fatal(err) + } + + // ok now the header's there, we can write the next protocol header. + log.Debug("write testing header") + if err := protocol.WriteHeader(s, protocol.TestingID); err != nil { + t.Fatal(err) + } + + // okay, now we should be able to write text, and read it out. + buf1 := []byte("abcdefghij") + buf2 := make([]byte, 10) + buf3 := make([]byte, 10) + log.Debug("write in some text.") + if _, err := s.Write(buf1); err != nil { + t.Fatal(err) + } + + // read it out from the pipe. + log.Debug("read it out from the pipe.") + if _, err := io.ReadFull(piper, buf2); err != nil { + t.Fatal(err) + } + if string(buf1) != string(buf2) { + t.Fatal("should've gotten that text out of the pipe") + } + + // read it out from the stream (echoed) + log.Debug("read it out from the stream (echoed).") + if _, err := io.ReadFull(s, buf3); err != nil { + t.Fatal(err) + } + if string(buf1) != string(buf3) { + t.Fatal("should've gotten that text out of the stream") + } + + // sweet. relay works. + log.Debug("sweet, relay works.") + s.Close() +} + +func TestRelayAcrossFour(t *testing.T) { + + ctx := context.Background() + + // these networks have the relay service wired in already. + n1 := testutil.GenHostSwarm(t, ctx) + n2 := testutil.GenHostSwarm(t, ctx) + n3 := testutil.GenHostSwarm(t, ctx) + n4 := testutil.GenHostSwarm(t, ctx) + n5 := testutil.GenHostSwarm(t, ctx) + + n1p := n1.ID() + n2p := n2.ID() + n3p := n3.ID() + n4p := n4.ID() + n5p := n5.ID() + + n2pi := n2.Peerstore().PeerInfo(n2p) + n4pi := n4.Peerstore().PeerInfo(n4p) + + if err := n1.Connect(ctx, n2pi); err != nil { + t.Fatalf("Failed to dial:", err) + } + if err := n3.Connect(ctx, n2pi); err != nil { + t.Fatalf("Failed to dial:", err) + } + if err := n3.Connect(ctx, n4pi); err != nil { + t.Fatalf("Failed to dial:", err) + } + if err := n5.Connect(ctx, n4pi); err != nil { + t.Fatalf("Failed to dial:", err) + } + + // setup handler on n5 to copy everything over to the pipe. + piper, pipew := io.Pipe() + n5.SetStreamHandler(protocol.TestingID, func(s inet.Stream) { + log.Debug("relay stream opened to n5!") + log.Debug("piping and echoing everything") + w := io.MultiWriter(s, pipew) + io.Copy(w, s) + log.Debug("closing stream") + s.Close() + }) + + // ok, now we can try to relay n1--->n2--->n3--->n4--->n5 + log.Debug("open relay stream") + s, err := n1.NewStream(relay.ID, n2p) + if err != nil { + t.Fatal(err) + } + + log.Debugf("write relay header n1->n3 (%s -> %s)", n1p, n3p) + if err := relay.WriteHeader(s, n1p, n3p); err != nil { + t.Fatal(err) + } + + log.Debugf("write relay header n1->n4 (%s -> %s)", n1p, n4p) + if err := protocol.WriteHeader(s, relay.ID); err != nil { + t.Fatal(err) + } + if err := relay.WriteHeader(s, n1p, n4p); err != nil { + t.Fatal(err) + } + + log.Debugf("write relay header n1->n5 (%s -> %s)", n1p, n5p) + if err := protocol.WriteHeader(s, relay.ID); err != nil { + t.Fatal(err) + } + if err := relay.WriteHeader(s, n1p, n5p); err != nil { + t.Fatal(err) + } + + // ok now the header's there, we can write the next protocol header. + log.Debug("write testing header") + if err := protocol.WriteHeader(s, protocol.TestingID); err != nil { + t.Fatal(err) + } + + // okay, now we should be able to write text, and read it out. + buf1 := []byte("abcdefghij") + buf2 := make([]byte, 10) + buf3 := make([]byte, 10) + log.Debug("write in some text.") + if _, err := s.Write(buf1); err != nil { + t.Fatal(err) + } + + // read it out from the pipe. + log.Debug("read it out from the pipe.") + if _, err := io.ReadFull(piper, buf2); err != nil { + t.Fatal(err) + } + if string(buf1) != string(buf2) { + t.Fatal("should've gotten that text out of the pipe") + } + + // read it out from the stream (echoed) + log.Debug("read it out from the stream (echoed).") + if _, err := io.ReadFull(s, buf3); err != nil { + t.Fatal(err) + } + if string(buf1) != string(buf3) { + t.Fatal("should've gotten that text out of the stream") + } + + // sweet. relay works. + log.Debug("sweet, relaying across 4 works.") + s.Close() +} + +func TestRelayStress(t *testing.T) { + buflen := 1 << 18 + iterations := 10 + + ctx := context.Background() + + // these networks have the relay service wired in already. + n1 := testutil.GenHostSwarm(t, ctx) + n2 := testutil.GenHostSwarm(t, ctx) + n3 := testutil.GenHostSwarm(t, ctx) + + n1p := n1.ID() + n2p := n2.ID() + n3p := n3.ID() + + n2pi := n2.Peerstore().PeerInfo(n2p) + if err := n1.Connect(ctx, n2pi); err != nil { + t.Fatalf("Failed to dial:", err) + } + if err := n3.Connect(ctx, n2pi); err != nil { + t.Fatalf("Failed to dial:", err) + } + + // setup handler on n3 to copy everything over to the pipe. + piper, pipew := io.Pipe() + n3.SetStreamHandler(protocol.TestingID, func(s inet.Stream) { + log.Debug("relay stream opened to n3!") + log.Debug("piping and echoing everything") + w := io.MultiWriter(s, pipew) + io.Copy(w, s) + log.Debug("closing stream") + s.Close() + }) + + // ok, now we can try to relay n1--->n2--->n3. + log.Debug("open relay stream") + s, err := n1.NewStream(relay.ID, n2p) + if err != nil { + t.Fatal(err) + } + + // ok first thing we write the relay header n1->n3 + log.Debug("write relay header") + if err := relay.WriteHeader(s, n1p, n3p); err != nil { + t.Fatal(err) + } + + // ok now the header's there, we can write the next protocol header. + log.Debug("write testing header") + if err := protocol.WriteHeader(s, protocol.TestingID); err != nil { + t.Fatal(err) + } + + // okay, now write lots of text and read it back out from both + // the pipe and the stream. + buf1 := make([]byte, buflen) + buf2 := make([]byte, len(buf1)) + buf3 := make([]byte, len(buf1)) + + fillbuf := func(buf []byte, b byte) { + for i := range buf { + buf[i] = b + } + } + + for i := 0; i < iterations; i++ { + fillbuf(buf1, byte(int('a')+i)) + log.Debugf("writing %d bytes (%d/%d)", len(buf1), i, iterations) + if _, err := s.Write(buf1); err != nil { + t.Fatal(err) + } + + log.Debug("read it out from the pipe.") + if _, err := io.ReadFull(piper, buf2); err != nil { + t.Fatal(err) + } + if string(buf1) != string(buf2) { + t.Fatal("should've gotten that text out of the pipe") + } + + // read it out from the stream (echoed) + log.Debug("read it out from the stream (echoed).") + if _, err := io.ReadFull(s, buf3); err != nil { + t.Fatal(err) + } + if string(buf1) != string(buf3) { + t.Fatal("should've gotten that text out of the stream") + } + } + + log.Debug("sweet, relay works under stress.") + s.Close() +} diff --git a/p2p/test/backpressure/backpressure.go b/p2p/test/backpressure/backpressure.go new file mode 100644 index 000000000..19950a728 --- /dev/null +++ b/p2p/test/backpressure/backpressure.go @@ -0,0 +1 @@ +package backpressure_tests diff --git a/p2p/test/backpressure/backpressure_test.go b/p2p/test/backpressure/backpressure_test.go new file mode 100644 index 000000000..729856e4c --- /dev/null +++ b/p2p/test/backpressure/backpressure_test.go @@ -0,0 +1,374 @@ +package backpressure_tests + +import ( + crand "crypto/rand" + "io" + "math/rand" + "testing" + "time" + + host "github.com/jbenet/go-ipfs/p2p/host" + inet "github.com/jbenet/go-ipfs/p2p/net2" + peer "github.com/jbenet/go-ipfs/p2p/peer" + protocol "github.com/jbenet/go-ipfs/p2p/protocol" + testutil "github.com/jbenet/go-ipfs/p2p/test/util" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" +) + +var log = eventlog.Logger("backpressure") + +// TestBackpressureStreamHandler tests whether mux handler +// ratelimiting works. Meaning, since the handler is sequential +// it should block senders. +// +// Important note: spdystream (which peerstream uses) has a set +// of n workers (n=spdsystream.FRAME_WORKERS) which handle new +// frames, including those starting new streams. So all of them +// can be in the handler at one time. Also, the sending side +// does not rate limit unless we call stream.Wait() +// +// +// Note: right now, this happens muxer-wide. the muxer should +// learn to flow control, so handlers cant block each other. +func TestBackpressureStreamHandler(t *testing.T) { + t.Skip(`Sadly, as cool as this test is, it doesn't work +Because spdystream doesnt handle stream open backpressure +well IMO. I'll see about rewriting that part when it becomes +a problem. +`) + + // a number of concurrent request handlers + limit := 10 + + // our way to signal that we're done with 1 request + requestHandled := make(chan struct{}) + + // handler rate limiting + receiverRatelimit := make(chan struct{}, limit) + for i := 0; i < limit; i++ { + receiverRatelimit <- struct{}{} + } + + // sender counter of successfully opened streams + senderOpened := make(chan struct{}, limit*100) + + // sender signals it's done (errored out) + senderDone := make(chan struct{}) + + // the receiver handles requests with some rate limiting + receiver := func(s inet.Stream) { + log.Debug("receiver received a stream") + + <-receiverRatelimit // acquire + go func() { + // our request handler. can do stuff here. we + // simulate something taking time by waiting + // on requestHandled + log.Error("request worker handling...") + <-requestHandled + log.Error("request worker done!") + receiverRatelimit <- struct{}{} // release + }() + } + + // the sender opens streams as fast as possible + sender := func(host host.Host, remote peer.ID) { + var s inet.Stream + var err error + defer func() { + t.Error(err) + log.Debug("sender error. exiting.") + senderDone <- struct{}{} + }() + + for { + s, err = host.NewStream(protocol.TestingID, remote) + if err != nil { + return + } + + _ = s + // if err = s.SwarmStream().Stream().Wait(); err != nil { + // return + // } + + // "count" another successfully opened stream + // (large buffer so shouldn't block in normal operation) + log.Debug("sender opened another stream!") + senderOpened <- struct{}{} + } + } + + // count our senderOpened events + countStreamsOpenedBySender := func(min int) int { + opened := 0 + for opened < min { + log.Debugf("countStreamsOpenedBySender got %d (min %d)", opened, min) + select { + case <-senderOpened: + opened++ + case <-time.After(10 * time.Millisecond): + } + } + return opened + } + + // count our received events + // waitForNReceivedStreams := func(n int) { + // for n > 0 { + // log.Debugf("waiting for %d received streams...", n) + // select { + // case <-receiverRatelimit: + // n-- + // } + // } + // } + + testStreamsOpened := func(expected int) { + log.Debugf("testing rate limited to %d streams", expected) + if n := countStreamsOpenedBySender(expected); n != expected { + t.Fatalf("rate limiting did not work :( -- %d != %d", expected, n) + } + } + + // ok that's enough setup. let's do it! + + ctx := context.Background() + h1 := testutil.GenHostSwarm(t, ctx) + h2 := testutil.GenHostSwarm(t, ctx) + + // setup receiver handler + h1.SetStreamHandler(protocol.TestingID, receiver) + + h2pi := h2.Peerstore().PeerInfo(h2.ID()) + log.Debugf("dialing %s", h2pi.Addrs) + if err := h1.Connect(ctx, h2pi); err != nil { + t.Fatalf("Failed to connect:", err) + } + + // launch sender! + go sender(h2, h1.ID()) + + // ok, what do we expect to happen? the receiver should + // receive 10 requests and stop receiving, blocking the sender. + // we can test this by counting 10x senderOpened requests + + <-senderOpened // wait for the sender to successfully open some. + testStreamsOpened(limit - 1) + + // let's "handle" 3 requests. + <-requestHandled + <-requestHandled + <-requestHandled + // the sender should've now been able to open exactly 3 more. + + testStreamsOpened(3) + + // shouldn't have opened anything more + testStreamsOpened(0) + + // let's "handle" 100 requests in batches of 5 + for i := 0; i < 20; i++ { + <-requestHandled + <-requestHandled + <-requestHandled + <-requestHandled + <-requestHandled + testStreamsOpened(5) + } + + // success! + + // now for the sugar on top: let's tear down the receiver. it should + // exit the sender. + h1.Close() + + // shouldn't have opened anything more + testStreamsOpened(0) + + select { + case <-time.After(100 * time.Millisecond): + t.Error("receiver shutdown failed to exit sender") + case <-senderDone: + log.Info("handler backpressure works!") + } +} + +// TestStBackpressureStreamWrite tests whether streams see proper +// backpressure when writing data over the network streams. +func TestStBackpressureStreamWrite(t *testing.T) { + + // senderWrote signals that the sender wrote bytes to remote. + // the value is the count of bytes written. + senderWrote := make(chan int, 10000) + + // sender signals it's done (errored out) + senderDone := make(chan struct{}) + + // writeStats lets us listen to all the writes and return + // how many happened and how much was written + writeStats := func() (int, int) { + writes := 0 + bytes := 0 + for { + select { + case n := <-senderWrote: + writes++ + bytes = bytes + n + default: + log.Debugf("stats: sender wrote %d bytes, %d writes", bytes, writes) + return bytes, writes + } + } + } + + // sender attempts to write as fast as possible, signaling on the + // completion of every write. This makes it possible to see how + // fast it's actually writing. We pair this with a receiver + // that waits for a signal to read. + sender := func(s inet.Stream) { + defer func() { + s.Close() + senderDone <- struct{}{} + }() + + // ready a buffer of random data + buf := make([]byte, 65536) + crand.Read(buf) + + for { + // send a randomly sized subchunk + from := rand.Intn(len(buf) / 2) + to := rand.Intn(len(buf) / 2) + sendbuf := buf[from : from+to] + + n, err := s.Write(sendbuf) + if err != nil { + log.Debug("sender error. exiting:", err) + return + } + + log.Debugf("sender wrote %d bytes", n) + senderWrote <- n + } + } + + // receive a number of bytes from a stream. + // returns the number of bytes written. + receive := func(s inet.Stream, expect int) { + log.Debugf("receiver to read %d bytes", expect) + rbuf := make([]byte, expect) + n, err := io.ReadFull(s, rbuf) + if err != nil { + t.Error("read failed:", err) + } + if expect != n { + t.Error("read len differs: %d != %d", expect, n) + } + } + + // ok let's do it! + + // setup the networks + ctx := context.Background() + h1 := testutil.GenHostSwarm(t, ctx) + h2 := testutil.GenHostSwarm(t, ctx) + + // setup sender handler on 1 + h1.SetStreamHandler(protocol.TestingID, sender) + + h2pi := h2.Peerstore().PeerInfo(h2.ID()) + log.Debugf("dialing %s", h2pi.Addrs) + if err := h1.Connect(ctx, h2pi); err != nil { + t.Fatalf("Failed to connect:", err) + } + + // open a stream, from 2->1, this is our reader + s, err := h2.NewStream(protocol.TestingID, h1.ID()) + if err != nil { + t.Fatal(err) + } + + // let's make sure r/w works. + testSenderWrote := func(bytesE int) { + bytesA, writesA := writeStats() + if bytesA != bytesE { + t.Errorf("numbers failed: %d =?= %d bytes, via %d writes", bytesA, bytesE, writesA) + } + } + + // 500ms rounds of lockstep write + drain + roundsStart := time.Now() + roundsTotal := 0 + for roundsTotal < (2 << 20) { + // let the sender fill its buffers, it will stop sending. + <-time.After(300 * time.Millisecond) + b, _ := writeStats() + testSenderWrote(0) + testSenderWrote(0) + + // drain it all, wait again + receive(s, b) + roundsTotal = roundsTotal + b + } + roundsTime := time.Now().Sub(roundsStart) + + // now read continously, while we measure stats. + stop := make(chan struct{}) + contStart := time.Now() + + go func() { + for { + select { + case <-stop: + return + default: + receive(s, 2<<15) + } + } + }() + + contTotal := 0 + for contTotal < (2 << 20) { + n := <-senderWrote + contTotal += n + } + stop <- struct{}{} + contTime := time.Now().Sub(contStart) + + // now compare! continuous should've been faster AND larger + if roundsTime < contTime { + t.Error("continuous should have been faster") + } + + if roundsTotal < contTotal { + t.Error("continuous should have been larger, too!") + } + + // and a couple rounds more for good measure ;) + for i := 0; i < 3; i++ { + // let the sender fill its buffers, it will stop sending. + <-time.After(300 * time.Millisecond) + b, _ := writeStats() + testSenderWrote(0) + testSenderWrote(0) + + // drain it all, wait again + receive(s, b) + } + + // this doesn't work :(: + // // now for the sugar on top: let's tear down the receiver. it should + // // exit the sender. + // n1.Close() + // testSenderWrote(0) + // testSenderWrote(0) + // select { + // case <-time.After(2 * time.Second): + // t.Error("receiver shutdown failed to exit sender") + // case <-senderDone: + // log.Info("handler backpressure works!") + // } +} diff --git a/p2p/test/util/util.go b/p2p/test/util/util.go new file mode 100644 index 000000000..3c831df67 --- /dev/null +++ b/p2p/test/util/util.go @@ -0,0 +1,37 @@ +package testutil + +import ( + "testing" + + bhost "github.com/jbenet/go-ipfs/p2p/host/basic" + inet "github.com/jbenet/go-ipfs/p2p/net2" + swarm "github.com/jbenet/go-ipfs/p2p/net2/swarm" + peer "github.com/jbenet/go-ipfs/p2p/peer" + tu "github.com/jbenet/go-ipfs/util/testutil" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" +) + +func GenSwarmNetwork(t *testing.T, ctx context.Context) *swarm.Network { + p := tu.RandPeerNetParamsOrFatal(t) + ps := peer.NewPeerstore() + ps.AddAddress(p.ID, p.Addr) + ps.AddPubKey(p.ID, p.PubKey) + ps.AddPrivKey(p.ID, p.PrivKey) + n, err := swarm.NewNetwork(ctx, ps.Addresses(p.ID), p.ID, ps) + if err != nil { + t.Fatal(err) + } + return n +} + +func DivulgeAddresses(a, b inet.Network) { + id := a.LocalPeer() + addrs := a.Peerstore().Addresses(id) + b.Peerstore().AddAddresses(id, addrs) +} + +func GenHostSwarm(t *testing.T, ctx context.Context) *bhost.BasicHost { + n := GenSwarmNetwork(t, ctx) + return bhost.New(n) +}