diff --git a/core/core.go b/core/core.go index 7cabef281..41e974c30 100644 --- a/core/core.go +++ b/core/core.go @@ -21,6 +21,7 @@ import ( merkledag "github.com/jbenet/go-ipfs/merkledag" namesys "github.com/jbenet/go-ipfs/namesys" inet "github.com/jbenet/go-ipfs/net" + ipfsnet "github.com/jbenet/go-ipfs/net/ipfsnet" path "github.com/jbenet/go-ipfs/path" peer "github.com/jbenet/go-ipfs/peer" pin "github.com/jbenet/go-ipfs/pin" @@ -121,7 +122,7 @@ func NewIpfsNode(ctx context.Context, cfg *config.Config, online bool) (n *IpfsN return nil, debugerror.Wrap(err) } - n.Network, err = inet.NewNetwork(ctx, listenAddrs, n.Identity, n.Peerstore) + n.Network, err = ipfsnet.NewNetwork(ctx, listenAddrs, n.Identity, n.Peerstore) if err != nil { return nil, debugerror.Wrap(err) } diff --git a/net/backpressure/backpressure_test.go b/net/backpressure/backpressure_test.go index 7a8593ee4..72e0b2634 100644 --- a/net/backpressure/backpressure_test.go +++ b/net/backpressure/backpressure_test.go @@ -8,30 +8,15 @@ import ( "time" inet "github.com/jbenet/go-ipfs/net" + netutil "github.com/jbenet/go-ipfs/net/ipfsnet/util" peer "github.com/jbenet/go-ipfs/peer" eventlog "github.com/jbenet/go-ipfs/util/eventlog" - testutil "github.com/jbenet/go-ipfs/util/testutil" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ) var log = eventlog.Logger("backpressure") -func GenNetwork(t *testing.T, ctx context.Context) (inet.Network, error) { - p := testutil.RandPeerNetParamsOrFatal(t) - ps := peer.NewPeerstore() - ps.AddAddress(p.ID, p.Addr) - ps.AddPubKey(p.ID, p.PubKey) - ps.AddPrivKey(p.ID, p.PrivKey) - return inet.NewNetwork(ctx, ps.Addresses(p.ID), p.ID, ps) -} - -func divulgeAddresses(a, b inet.Network) { - id := a.LocalPeer() - addrs := a.Peerstore().Addresses(id) - b.Peerstore().AddAddresses(id, addrs) -} - // TestBackpressureStreamHandler tests whether mux handler // ratelimiting works. Meaning, since the handler is sequential // it should block senders. @@ -149,14 +134,8 @@ a problem. // ok that's enough setup. let's do it! ctx := context.Background() - n1, err := GenNetwork(t, ctx) - if err != nil { - t.Fatal(err) - } - n2, err := GenNetwork(t, ctx) - if err != nil { - t.Fatal(err) - } + n1 := netutil.GenNetwork(t, ctx) + n2 := netutil.GenNetwork(t, ctx) // setup receiver handler n1.SetHandler(inet.ProtocolTesting, receiver) @@ -291,17 +270,11 @@ func TestStBackpressureStreamWrite(t *testing.T) { // setup the networks ctx := context.Background() - n1, err := GenNetwork(t, ctx) - if err != nil { - t.Fatal(err) - } - n2, err := GenNetwork(t, ctx) - if err != nil { - t.Fatal(err) - } + n1 := netutil.GenNetwork(t, ctx) + n2 := netutil.GenNetwork(t, ctx) - divulgeAddresses(n1, n2) - divulgeAddresses(n2, n1) + netutil.DivulgeAddresses(n1, n2) + netutil.DivulgeAddresses(n2, n1) // setup sender handler on 1 n1.SetHandler(inet.ProtocolTesting, sender) @@ -313,6 +286,9 @@ func TestStBackpressureStreamWrite(t *testing.T) { // open a stream, from 2->1, this is our reader s, err := n2.NewStream(inet.ProtocolTesting, n1.LocalPeer()) + if err != nil { + t.Fatal(err) + } // let's make sure r/w works. testSenderWrote := func(bytesE int) { diff --git a/net/interface.go b/net/interface.go index 7f9f3e617..5f16ec84e 100644 --- a/net/interface.go +++ b/net/interface.go @@ -23,6 +23,7 @@ const ( ProtocolDHT ProtocolID = "/ipfs/dht" ProtocolIdentify ProtocolID = "/ipfs/id" ProtocolDiag ProtocolID = "/ipfs/diagnostics" + ProtocolRelay ProtocolID = "/ipfs/relay" ) // MessageSizeMax is a soft (recommended) maximum for network messages. @@ -96,11 +97,6 @@ type Network interface { // CtxGroup returns the network's contextGroup CtxGroup() ctxgroup.ContextGroup - - // IdentifyProtocol returns the instance of the object running the Identify - // Protocol. This is what runs the ifps handshake-- this should be removed - // if this abstracted out to its own package. - IdentifyProtocol() *IDService } // Dialer represents a service that can dial out to peers diff --git a/net/net.go b/net/ipfsnet/net.go similarity index 71% rename from net/net.go rename to net/ipfsnet/net.go index 39afc6b10..bbd356ff9 100644 --- a/net/net.go +++ b/net/ipfsnet/net.go @@ -5,14 +5,20 @@ import ( "fmt" ic "github.com/jbenet/go-ipfs/crypto" + inet "github.com/jbenet/go-ipfs/net" + ids "github.com/jbenet/go-ipfs/net/services/identify" + mux "github.com/jbenet/go-ipfs/net/services/mux" swarm "github.com/jbenet/go-ipfs/net/swarm" peer "github.com/jbenet/go-ipfs/peer" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" ) +var log = eventlog.Logger("net/mux") + type stream swarm.Stream func (s *stream) SwarmStream() *swarm.Stream { @@ -20,7 +26,7 @@ func (s *stream) SwarmStream() *swarm.Stream { } // Conn returns the connection this stream is part of. -func (s *stream) Conn() Conn { +func (s *stream) Conn() inet.Conn { c := s.SwarmStream().Conn() return (*conn_)(c) } @@ -50,7 +56,7 @@ func (c *conn_) SwarmConn() *swarm.Conn { return (*swarm.Conn)(c) } -func (c *conn_) NewStreamWithProtocol(pr ProtocolID) (Stream, error) { +func (c *conn_) NewStreamWithProtocol(pr inet.ProtocolID) (inet.Stream, error) { s, err := (*swarm.Conn)(c).NewStream() if err != nil { return nil, err @@ -58,7 +64,7 @@ func (c *conn_) NewStreamWithProtocol(pr ProtocolID) (Stream, error) { ss := (*stream)(s) - if err := WriteProtocolHeader(pr, ss); err != nil { + if err := mux.WriteProtocolHeader(pr, ss); err != nil { ss.Close() return nil, err } @@ -90,30 +96,32 @@ func (c *conn_) RemotePublicKey() ic.PubKey { return c.SwarmConn().RemotePublicKey() } -// network implements the Network interface, -type network struct { - local peer.ID // local peer - mux Mux // protocol multiplexing - swarm *swarm.Swarm // peer connection multiplexing +// Network implements the inet.Network interface. +// It uses a swarm to connect to remote hosts. +type Network struct { + local peer.ID // local peer ps peer.Peerstore - ids *IDService + + swarm *swarm.Swarm // peer connection multiplexing + mux mux.Mux // protocol multiplexing + ids *ids.IDService cg ctxgroup.ContextGroup // for Context closing } // NewNetwork constructs a new network and starts listening on given addresses. func NewNetwork(ctx context.Context, listen []ma.Multiaddr, local peer.ID, - peers peer.Peerstore) (Network, error) { + peers peer.Peerstore) (*Network, error) { s, err := swarm.NewSwarm(ctx, listen, local, peers) if err != nil { return nil, err } - n := &network{ + n := &Network{ local: local, swarm: s, - mux: Mux{Handlers: StreamHandlerMap{}}, + mux: mux.Mux{Handlers: inet.StreamHandlerMap{}}, cg: ctxgroup.WithContext(ctx), ps: peers, } @@ -127,20 +135,20 @@ func NewNetwork(ctx context.Context, listen []ma.Multiaddr, local peer.ID, // setup a conn handler that immediately "asks the other side about them" // this is ProtocolIdentify. - n.ids = NewIDService(n) + n.ids = ids.NewIDService(n) s.SetConnHandler(n.newConnHandler) return n, nil } -func (n *network) newConnHandler(c *swarm.Conn) { +func (n *Network) newConnHandler(c *swarm.Conn) { cc := (*conn_)(c) n.ids.IdentifyConn(cc) } // DialPeer attempts to establish a connection to a given peer. // Respects the context. -func (n *network) DialPeer(ctx context.Context, p peer.ID) error { +func (n *Network) DialPeer(ctx context.Context, p peer.ID) error { log.Debugf("[%s] network dialing peer [%s]", n.local, p) sc, err := n.swarm.Dial(ctx, p) if err != nil { @@ -165,39 +173,40 @@ func (n *network) DialPeer(ctx context.Context, p peer.ID) error { return nil } -func (n *network) Protocols() []ProtocolID { +// Protocols returns the ProtocolIDs of all the registered handlers. +func (n *Network) Protocols() []inet.ProtocolID { return n.mux.Protocols() } // CtxGroup returns the network's ContextGroup -func (n *network) CtxGroup() ctxgroup.ContextGroup { +func (n *Network) CtxGroup() ctxgroup.ContextGroup { return n.cg } // Swarm returns the network's peerstream.Swarm -func (n *network) Swarm() *swarm.Swarm { +func (n *Network) Swarm() *swarm.Swarm { return n.Swarm() } // LocalPeer the network's LocalPeer -func (n *network) LocalPeer() peer.ID { +func (n *Network) LocalPeer() peer.ID { return n.swarm.LocalPeer() } // Peers returns the connected peers -func (n *network) Peers() []peer.ID { +func (n *Network) Peers() []peer.ID { return n.swarm.Peers() } // Peers returns the connected peers -func (n *network) Peerstore() peer.Peerstore { +func (n *Network) Peerstore() peer.Peerstore { return n.ps } // Conns returns the connected peers -func (n *network) Conns() []Conn { +func (n *Network) Conns() []inet.Conn { conns1 := n.swarm.Connections() - out := make([]Conn, len(conns1)) + out := make([]inet.Conn, len(conns1)) for i, c := range conns1 { out[i] = (*conn_)(c) } @@ -205,9 +214,9 @@ func (n *network) Conns() []Conn { } // ConnsToPeer returns the connections in this Netowrk for given peer. -func (n *network) ConnsToPeer(p peer.ID) []Conn { +func (n *Network) ConnsToPeer(p peer.ID) []inet.Conn { conns1 := n.swarm.ConnectionsToPeer(p) - out := make([]Conn, len(conns1)) + out := make([]inet.Conn, len(conns1)) for i, c := range conns1 { out[i] = (*conn_)(c) } @@ -215,53 +224,53 @@ func (n *network) ConnsToPeer(p peer.ID) []Conn { } // ClosePeer connection to peer -func (n *network) ClosePeer(p peer.ID) error { +func (n *Network) ClosePeer(p peer.ID) error { return n.swarm.CloseConnection(p) } // close is the real teardown function -func (n *network) close() error { +func (n *Network) close() error { return n.swarm.Close() } // Close calls the ContextCloser func -func (n *network) Close() error { +func (n *Network) Close() error { return n.cg.Close() } // BandwidthTotals returns the total amount of bandwidth transferred -func (n *network) BandwidthTotals() (in uint64, out uint64) { +func (n *Network) BandwidthTotals() (in uint64, out uint64) { // need to implement this. probably best to do it in swarm this time. // need a "metrics" object return 0, 0 } // ListenAddresses returns a list of addresses at which this network listens. -func (n *network) ListenAddresses() []ma.Multiaddr { +func (n *Network) ListenAddresses() []ma.Multiaddr { return n.swarm.ListenAddresses() } // InterfaceListenAddresses returns a list of addresses at which this network // listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to // use the known local interfaces. -func (n *network) InterfaceListenAddresses() ([]ma.Multiaddr, error) { +func (n *Network) InterfaceListenAddresses() ([]ma.Multiaddr, error) { return swarm.InterfaceListenAddresses(n.swarm) } // Connectedness returns a state signaling connection capabilities // For now only returns Connected || NotConnected. Expand into more later. -func (n *network) Connectedness(p peer.ID) Connectedness { +func (n *Network) Connectedness(p peer.ID) inet.Connectedness { c := n.swarm.ConnectionsToPeer(p) if c != nil && len(c) > 0 { - return Connected + return inet.Connected } - return NotConnected + return inet.NotConnected } // NewStream returns a new stream to given peer p. // If there is no connection to p, attempts to create one. // If ProtocolID is "", writes no header. -func (n *network) NewStream(pr ProtocolID, p peer.ID) (Stream, error) { +func (n *Network) NewStream(pr inet.ProtocolID, p peer.ID) (inet.Stream, error) { log.Debugf("[%s] network opening stream to peer [%s]: %s", n.local, p, pr) s, err := n.swarm.NewStreamWithPeer(p) if err != nil { @@ -270,7 +279,7 @@ func (n *network) NewStream(pr ProtocolID, p peer.ID) (Stream, error) { ss := (*stream)(s) - if err := WriteProtocolHeader(pr, ss); err != nil { + if err := mux.WriteProtocolHeader(pr, ss); err != nil { ss.Close() return nil, err } @@ -280,23 +289,16 @@ func (n *network) NewStream(pr ProtocolID, p peer.ID) (Stream, error) { // SetHandler sets the protocol handler on the Network's Muxer. // This operation is threadsafe. -func (n *network) SetHandler(p ProtocolID, h StreamHandler) { +func (n *Network) SetHandler(p inet.ProtocolID, h inet.StreamHandler) { n.mux.SetHandler(p, h) } -func (n *network) String() string { +// String returns a string representation of Network. +func (n *Network) String() string { return fmt.Sprintf("", n.LocalPeer()) } -func (n *network) IdentifyProtocol() *IDService { +// IdentifyProtocol returns the network's IDService +func (n *Network) IdentifyProtocol() *ids.IDService { return n.ids } - -func WriteProtocolHeader(pr ProtocolID, s Stream) error { - if pr != "" { // only write proper protocol headers - if err := WriteLengthPrefix(s, string(pr)); err != nil { - return err - } - } - return nil -} diff --git a/net/net_test.go b/net/ipfsnet/net_test.go similarity index 93% rename from net/net_test.go rename to net/ipfsnet/net_test.go index 0704ec25c..8cdd30f84 100644 --- a/net/net_test.go +++ b/net/ipfsnet/net_test.go @@ -6,7 +6,9 @@ import ( "time" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + inet "github.com/jbenet/go-ipfs/net" + netutil "github.com/jbenet/go-ipfs/net/ipfsnet/util" ) // TestConnectednessCorrect starts a few networks, connects a few @@ -17,13 +19,13 @@ func TestConnectednessCorrect(t *testing.T) { nets := make([]inet.Network, 4) for i := 0; i < 4; i++ { - nets[i] = GenNetwork(t, ctx) + nets[i] = netutil.GenNetwork(t, ctx) } // connect 0-1, 0-2, 0-3, 1-2, 2-3 dial := func(a, b inet.Network) { - DivulgeAddresses(b, a) + netutil.DivulgeAddresses(b, a) if err := a.DialPeer(ctx, b.LocalPeer()); err != nil { t.Fatalf("Failed to dial: %s", err) } diff --git a/net/ipfsnet/util/util.go b/net/ipfsnet/util/util.go new file mode 100644 index 000000000..6260cfc3f --- /dev/null +++ b/net/ipfsnet/util/util.go @@ -0,0 +1,31 @@ +package testutil + +import ( + "testing" + + inet "github.com/jbenet/go-ipfs/net" + in "github.com/jbenet/go-ipfs/net/ipfsnet" + peer "github.com/jbenet/go-ipfs/peer" + tu "github.com/jbenet/go-ipfs/util/testutil" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" +) + +func GenNetwork(t *testing.T, ctx context.Context) *in.Network { + p := tu.RandPeerNetParamsOrFatal(t) + ps := peer.NewPeerstore() + ps.AddAddress(p.ID, p.Addr) + ps.AddPubKey(p.ID, p.PubKey) + ps.AddPrivKey(p.ID, p.PrivKey) + n, err := in.NewNetwork(ctx, ps.Addresses(p.ID), p.ID, ps) + if err != nil { + t.Fatal(err) + } + return n +} + +func DivulgeAddresses(a, b inet.Network) { + id := a.LocalPeer() + addrs := a.Peerstore().Addresses(id) + b.Peerstore().AddAddresses(id, addrs) +} diff --git a/net/mock/mock_conn.go b/net/mock/mock_conn.go index e92ff6fa4..89af0a4a4 100644 --- a/net/mock/mock_conn.go +++ b/net/mock/mock_conn.go @@ -6,6 +6,7 @@ import ( ic "github.com/jbenet/go-ipfs/crypto" inet "github.com/jbenet/go-ipfs/net" + mux "github.com/jbenet/go-ipfs/net/services/mux" peer "github.com/jbenet/go-ipfs/peer" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" @@ -86,7 +87,7 @@ func (c *conn) NewStreamWithProtocol(pr inet.ProtocolID) (inet.Stream, error) { log.Debugf("Conn.NewStreamWithProtocol: %s --> %s", c.local, c.remote) s := c.openStream() - if err := inet.WriteProtocolHeader(pr, s); err != nil { + if err := mux.WriteProtocolHeader(pr, s); err != nil { s.Close() return nil, err } diff --git a/net/mock/mock_peernet.go b/net/mock/mock_peernet.go index 51ffb1e72..675c73442 100644 --- a/net/mock/mock_peernet.go +++ b/net/mock/mock_peernet.go @@ -7,6 +7,8 @@ import ( ic "github.com/jbenet/go-ipfs/crypto" inet "github.com/jbenet/go-ipfs/net" + ids "github.com/jbenet/go-ipfs/net/services/identify" + mux "github.com/jbenet/go-ipfs/net/services/mux" peer "github.com/jbenet/go-ipfs/peer" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" @@ -28,8 +30,8 @@ type peernet struct { connsByLink map[*link]map[*conn]struct{} // needed to implement inet.Network - mux inet.Mux - ids *inet.IDService + mux mux.Mux + ids *ids.IDService cg ctxgroup.ContextGroup sync.RWMutex @@ -54,7 +56,7 @@ func newPeernet(ctx context.Context, m *mocknet, k ic.PrivKey, mocknet: m, peer: p, ps: ps, - mux: inet.Mux{Handlers: inet.StreamHandlerMap{}}, + mux: mux.Mux{Handlers: inet.StreamHandlerMap{}}, cg: ctxgroup.WithContext(ctx), connsByPeer: map[peer.ID]map[*conn]struct{}{}, @@ -65,7 +67,7 @@ func newPeernet(ctx context.Context, m *mocknet, k ic.PrivKey, // setup a conn handler that immediately "asks the other side about them" // this is ProtocolIdentify. - n.ids = inet.NewIDService(n) + n.ids = ids.NewIDService(n) return n, nil } @@ -338,6 +340,6 @@ func (pn *peernet) SetHandler(p inet.ProtocolID, h inet.StreamHandler) { pn.mux.SetHandler(p, h) } -func (pn *peernet) IdentifyProtocol() *inet.IDService { +func (pn *peernet) IdentifyProtocol() *ids.IDService { return pn.ids } diff --git a/net/id.go b/net/services/identify/id.go similarity index 84% rename from net/id.go rename to net/services/identify/id.go index 802d54794..df624187d 100644 --- a/net/id.go +++ b/net/services/identify/id.go @@ -1,15 +1,22 @@ -package net +package identify import ( "sync" - handshake "github.com/jbenet/go-ipfs/net/handshake" - pb "github.com/jbenet/go-ipfs/net/handshake/pb" - ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + + inet "github.com/jbenet/go-ipfs/net" + handshake "github.com/jbenet/go-ipfs/net/handshake" + pb "github.com/jbenet/go-ipfs/net/handshake/pb" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" ) +var log = eventlog.Logger("net/identify") + +// ProtocolIdentify is the ProtocolID of the Identify Service. +const ProtocolIdentify inet.ProtocolID = "/ipfs/identify" + // IDService is a structure that implements ProtocolIdentify. // It is a trivial service that gives the other peer some // useful information about the local peer. A sort of hello. @@ -19,24 +26,24 @@ import ( // * Our IPFS Agent Version // * Our public Listen Addresses type IDService struct { - Network Network + Network inet.Network // connections undergoing identification // for wait purposes - currid map[Conn]chan struct{} + currid map[inet.Conn]chan struct{} currmu sync.RWMutex } -func NewIDService(n Network) *IDService { +func NewIDService(n inet.Network) *IDService { s := &IDService{ Network: n, - currid: make(map[Conn]chan struct{}), + currid: make(map[inet.Conn]chan struct{}), } n.SetHandler(ProtocolIdentify, s.RequestHandler) return s } -func (ids *IDService) IdentifyConn(c Conn) { +func (ids *IDService) IdentifyConn(c inet.Conn) { ids.currmu.Lock() if wait, found := ids.currid[c]; found { ids.currmu.Unlock() @@ -70,7 +77,7 @@ func (ids *IDService) IdentifyConn(c Conn) { close(ch) // release everyone waiting. } -func (ids *IDService) RequestHandler(s Stream) { +func (ids *IDService) RequestHandler(s inet.Stream) { defer s.Close() c := s.Conn() @@ -83,7 +90,7 @@ func (ids *IDService) RequestHandler(s Stream) { c.RemotePeer(), c.RemoteMultiaddr()) } -func (ids *IDService) ResponseHandler(s Stream) { +func (ids *IDService) ResponseHandler(s inet.Stream) { defer s.Close() c := s.Conn() @@ -100,7 +107,7 @@ func (ids *IDService) ResponseHandler(s Stream) { c.RemotePeer(), c.RemoteMultiaddr()) } -func (ids *IDService) populateMessage(mes *pb.Handshake3, c Conn) { +func (ids *IDService) populateMessage(mes *pb.Handshake3, c inet.Conn) { // set protocols this node is currently handling protos := ids.Network.Protocols() @@ -129,7 +136,7 @@ func (ids *IDService) populateMessage(mes *pb.Handshake3, c Conn) { mes.H1 = handshake.NewHandshake1("", "") } -func (ids *IDService) consumeMessage(mes *pb.Handshake3, c Conn) { +func (ids *IDService) consumeMessage(mes *pb.Handshake3, c inet.Conn) { p := c.RemotePeer() // mes.Protocols @@ -164,7 +171,7 @@ func (ids *IDService) consumeMessage(mes *pb.Handshake3, c Conn) { // This happens async so the connection can start to be used // even if handshake3 knowledge is not necesary. // Users **MUST** call IdentifyWait _after_ IdentifyConn -func (ids *IDService) IdentifyWait(c Conn) <-chan struct{} { +func (ids *IDService) IdentifyWait(c inet.Conn) <-chan struct{} { ids.currmu.Lock() ch, found := ids.currid[c] ids.currmu.Unlock() diff --git a/net/id_test.go b/net/services/identify/id_test.go similarity index 82% rename from net/id_test.go rename to net/services/identify/id_test.go index 524616a2d..6583683a4 100644 --- a/net/id_test.go +++ b/net/services/identify/id_test.go @@ -1,4 +1,4 @@ -package net_test +package identify_test import ( "testing" @@ -6,38 +6,19 @@ import ( inet "github.com/jbenet/go-ipfs/net" handshake "github.com/jbenet/go-ipfs/net/handshake" + netutil "github.com/jbenet/go-ipfs/net/ipfsnet/util" peer "github.com/jbenet/go-ipfs/peer" - testutil "github.com/jbenet/go-ipfs/util/testutil" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ) -func GenNetwork(t *testing.T, ctx context.Context) inet.Network { - p := testutil.RandPeerNetParamsOrFatal(t) - ps := peer.NewPeerstore() - ps.AddAddress(p.ID, p.Addr) - ps.AddPubKey(p.ID, p.PubKey) - ps.AddPrivKey(p.ID, p.PrivKey) - n, err := inet.NewNetwork(ctx, ps.Addresses(p.ID), p.ID, ps) - if err != nil { - t.Fatal(err) - } - return n -} - -func DivulgeAddresses(a, b inet.Network) { - id := a.LocalPeer() - addrs := a.Peerstore().Addresses(id) - b.Peerstore().AddAddresses(id, addrs) -} - func subtestIDService(t *testing.T, postDialWait time.Duration) { // the generated networks should have the id service wired in. ctx := context.Background() - n1 := GenNetwork(t, ctx) - n2 := GenNetwork(t, ctx) + n1 := netutil.GenNetwork(t, ctx) + n2 := netutil.GenNetwork(t, ctx) n1p := n1.LocalPeer() n2p := n2.LocalPeer() @@ -46,7 +27,7 @@ func subtestIDService(t *testing.T, postDialWait time.Duration) { testKnowsAddrs(t, n2, n1p, []ma.Multiaddr{}) // nothing // have n2 tell n1, so we can dial... - DivulgeAddresses(n2, n1) + netutil.DivulgeAddresses(n2, n1) testKnowsAddrs(t, n1, n2p, n2.Peerstore().Addresses(n2p)) // has them testKnowsAddrs(t, n2, n1p, []ma.Multiaddr{}) // nothing diff --git a/net/mux.go b/net/services/mux/mux.go similarity index 78% rename from net/mux.go rename to net/services/mux/mux.go index d2bb7dc22..a89c32678 100644 --- a/net/mux.go +++ b/net/services/mux/mux.go @@ -1,4 +1,4 @@ -package net +package mux import ( "fmt" @@ -6,11 +6,13 @@ import ( "sync" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + + inet "github.com/jbenet/go-ipfs/net" eventlog "github.com/jbenet/go-ipfs/util/eventlog" lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables" ) -var log = eventlog.Logger("network") +var log = eventlog.Logger("net/mux") // Mux provides simple stream multixplexing. // It helps you precisely when: @@ -30,16 +32,16 @@ var log = eventlog.Logger("network") // WARNING: this datastructure IS NOT threadsafe. // do not modify it once the network is using it. type Mux struct { - Default StreamHandler // handles unknown protocols. - Handlers StreamHandlerMap + Default inet.StreamHandler // handles unknown protocols. + Handlers inet.StreamHandlerMap sync.RWMutex } // Protocols returns the list of protocols this muxer has handlers for -func (m *Mux) Protocols() []ProtocolID { +func (m *Mux) Protocols() []inet.ProtocolID { m.RLock() - l := make([]ProtocolID, 0, len(m.Handlers)) + l := make([]inet.ProtocolID, 0, len(m.Handlers)) for p := range m.Handlers { l = append(l, p) } @@ -49,7 +51,7 @@ func (m *Mux) Protocols() []ProtocolID { // ReadProtocolHeader reads the stream and returns the next Handler function // according to the muxer encoding. -func (m *Mux) ReadProtocolHeader(s io.Reader) (string, StreamHandler, error) { +func (m *Mux) ReadProtocolHeader(s io.Reader) (string, inet.StreamHandler, error) { // log.Error("ReadProtocolHeader") name, err := ReadLengthPrefix(s) if err != nil { @@ -58,7 +60,7 @@ func (m *Mux) ReadProtocolHeader(s io.Reader) (string, StreamHandler, error) { // log.Debug("ReadProtocolHeader got:", name) m.RLock() - h, found := m.Handlers[ProtocolID(name)] + h, found := m.Handlers[inet.ProtocolID(name)] m.RUnlock() switch { @@ -80,7 +82,7 @@ func (m *Mux) String() string { // SetHandler sets the protocol handler on the Network's Muxer. // This operation is threadsafe. -func (m *Mux) SetHandler(p ProtocolID, h StreamHandler) { +func (m *Mux) SetHandler(p inet.ProtocolID, h inet.StreamHandler) { log.Debugf("%s setting handler for protocol: %s (%d)", m, p, len(p)) m.Lock() m.Handlers[p] = h @@ -88,7 +90,8 @@ func (m *Mux) SetHandler(p ProtocolID, h StreamHandler) { } // Handle reads the next name off the Stream, and calls a function -func (m *Mux) Handle(s Stream) { +func (m *Mux) Handle(s inet.Stream) { + ctx := context.Background() name, handler, err := m.ReadProtocolHeader(s) @@ -133,3 +136,14 @@ func WriteLengthPrefix(w io.Writer, name string) error { _, err := w.Write(s) return err } + +// WriteProtocolHeader defines how a protocol is written into the header of +// a stream. This is so the muxer can multiplex between services. +func WriteProtocolHeader(pr inet.ProtocolID, s inet.Stream) error { + if pr != "" { // only write proper protocol headers + if err := WriteLengthPrefix(s, string(pr)); err != nil { + return err + } + } + return nil +} diff --git a/net/mux_test.go b/net/services/mux/mux_test.go similarity index 86% rename from net/mux_test.go rename to net/services/mux/mux_test.go index 2dff54c33..ac6e54843 100644 --- a/net/mux_test.go +++ b/net/services/mux/mux_test.go @@ -1,8 +1,10 @@ -package net +package mux import ( "bytes" "testing" + + inet "github.com/jbenet/go-ipfs/net" ) var testCases = map[string]string{ @@ -28,13 +30,13 @@ func TestHandler(t *testing.T) { outs := make(chan string, 10) - h := func(n string) func(s Stream) { - return func(s Stream) { + h := func(n string) func(s inet.Stream) { + return func(s inet.Stream) { outs <- n } } - m := Mux{Handlers: StreamHandlerMap{}} + m := Mux{Handlers: inet.StreamHandlerMap{}} m.Default = h("default") m.Handlers["dht"] = h("bitswap") // m.Handlers["ipfs"] = h("bitswap") // default! diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index 5eeb3a2bc..4e49b8f96 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -14,7 +14,7 @@ import ( dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" - inet "github.com/jbenet/go-ipfs/net" + ipfsnet "github.com/jbenet/go-ipfs/net/ipfsnet" peer "github.com/jbenet/go-ipfs/peer" routing "github.com/jbenet/go-ipfs/routing" u "github.com/jbenet/go-ipfs/util" @@ -49,7 +49,7 @@ func setupDHT(ctx context.Context, t *testing.T, addr ma.Multiaddr) *IpfsDHT { peerstore.AddPubKey(p, pk) peerstore.AddAddress(p, addr) - n, err := inet.NewNetwork(ctx, []ma.Multiaddr{addr}, p, peerstore) + n, err := ipfsnet.NewNetwork(ctx, []ma.Multiaddr{addr}, p, peerstore) if err != nil { t.Fatal(err) }