From 735c3de7faab7b1b7f347850dfb74300655a560d Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Wed, 24 Dec 2014 11:16:17 -0800 Subject: [PATCH] relay service -- streams across peers this is the leadup into NAT traversal. note: doesn't work yet. hangs the test. --- net/ipfsnet/net.go | 11 ++- net/services/mux/mux.go | 11 ++- net/services/relay/relay.go | 159 +++++++++++++++++++++++++++++++ net/services/relay/relay_test.go | 101 ++++++++++++++++++++ 4 files changed, 277 insertions(+), 5 deletions(-) create mode 100644 net/services/relay/relay.go create mode 100644 net/services/relay/relay_test.go diff --git a/net/ipfsnet/net.go b/net/ipfsnet/net.go index bbd356ff9..cffe2c984 100644 --- a/net/ipfsnet/net.go +++ b/net/ipfsnet/net.go @@ -5,11 +5,13 @@ import ( "fmt" ic "github.com/jbenet/go-ipfs/crypto" + peer "github.com/jbenet/go-ipfs/peer" + inet "github.com/jbenet/go-ipfs/net" ids "github.com/jbenet/go-ipfs/net/services/identify" mux "github.com/jbenet/go-ipfs/net/services/mux" + relay "github.com/jbenet/go-ipfs/net/services/relay" swarm "github.com/jbenet/go-ipfs/net/swarm" - peer "github.com/jbenet/go-ipfs/peer" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup" @@ -105,6 +107,7 @@ type Network struct { swarm *swarm.Swarm // peer connection multiplexing mux mux.Mux // protocol multiplexing ids *ids.IDService + relay *relay.RelayService cg ctxgroup.ContextGroup // for Context closing } @@ -133,11 +136,13 @@ func NewNetwork(ctx context.Context, listen []ma.Multiaddr, local peer.ID, n.mux.Handle((*stream)(s)) }) - // setup a conn handler that immediately "asks the other side about them" - // this is ProtocolIdentify. + // setup ProtocolIdentify to immediately "asks the other side about them" n.ids = ids.NewIDService(n) s.SetConnHandler(n.newConnHandler) + // setup ProtocolRelay to allow traffic relaying. + // Feed things we get for ourselves into the muxer. + n.relay = relay.NewRelayService(n.cg.Context(), n, n.mux.HandleSync) return n, nil } diff --git a/net/services/mux/mux.go b/net/services/mux/mux.go index a89c32678..52913a56e 100644 --- a/net/services/mux/mux.go +++ b/net/services/mux/mux.go @@ -89,9 +89,16 @@ func (m *Mux) SetHandler(p inet.ProtocolID, h inet.StreamHandler) { m.Unlock() } -// Handle reads the next name off the Stream, and calls a function +// Handle reads the next name off the Stream, and calls a handler function +// This is done in its own goroutine, to avoid blocking the caller. func (m *Mux) Handle(s inet.Stream) { + go m.HandleSync(s) +} +// HandleSync reads the next name off the Stream, and calls a handler function +// This is done synchronously. The handler function will return before +// HandleSync returns. +func (m *Mux) HandleSync(s inet.Stream) { ctx := context.Background() name, handler, err := m.ReadProtocolHeader(s) @@ -102,7 +109,7 @@ func (m *Mux) Handle(s inet.Stream) { return } - log.Info("muxer handle protocol: %s", name) + log.Infof("muxer handle protocol: %s", name) log.Event(ctx, "muxHandle", eventlog.Metadata{"protocol": name}) handler(s) } diff --git a/net/services/relay/relay.go b/net/services/relay/relay.go new file mode 100644 index 000000000..c23647832 --- /dev/null +++ b/net/services/relay/relay.go @@ -0,0 +1,159 @@ +package relay + +import ( + "fmt" + "io" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup" + mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" + + inet "github.com/jbenet/go-ipfs/net" + peer "github.com/jbenet/go-ipfs/peer" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" +) + +var log = eventlog.Logger("relay") + +// ProtocolRelay is the ProtocolID of the Relay Service. +const ProtocolRelay inet.ProtocolID = "/ipfs/relay" + +// Relay is a structure that implements ProtocolRelay. +// It is a simple relay service which forwards traffic +// between two directly connected peers. +// +// the protocol is very simple: +// +// /ipfs/relay\n +// +// +// +// +type RelayService struct { + Network inet.Network + handler inet.StreamHandler // for streams sent to us locally. + + cg ctxgroup.ContextGroup +} + +func NewRelayService(ctx context.Context, n inet.Network, sh inet.StreamHandler) *RelayService { + s := &RelayService{ + Network: n, + handler: sh, + cg: ctxgroup.WithContext(ctx), + } + n.SetHandler(inet.ProtocolRelay, s.requestHandler) + return s +} + +// requestHandler is the function called by clients +func (rs *RelayService) requestHandler(s inet.Stream) { + if err := rs.handleStream(s); err != nil { + log.Error("RelayService error:", err) + } +} + +// handleStream is our own handler, which returns an error for simplicity. +func (rs *RelayService) handleStream(s inet.Stream) error { + defer s.Close() + + // read the header (src and dst peer.IDs) + src, dst, err := ReadHeader(s) + if err != nil { + return fmt.Errorf("stream with bad header: %s", err) + } + + local := rs.Network.LocalPeer() + + switch { + case src == local: + return fmt.Errorf("relaying from self") + case dst == local: // it's for us! yaaay. + log.Debugf("%s consuming stream from %s", rs.Network.LocalPeer(), src) + return rs.consumeStream(s) + default: // src and dst are not local. relay it. + log.Debugf("%s relaying stream %s <--> %s", rs.Network.LocalPeer(), src, dst) + return rs.pipeStream(src, dst, s) + } +} + +// consumeStream connects streams directed to the local peer +// to our handler, with the header now stripped (read). +func (rs *RelayService) consumeStream(s inet.Stream) error { + rs.handler(s) // boom. + return nil +} + +// pipeStream relays over a stream to a remote peer. It's like `cat` +func (rs *RelayService) pipeStream(src, dst peer.ID, s inet.Stream) error { + s2, err := rs.openStreamToPeer(dst) + if err != nil { + return fmt.Errorf("failed to open stream to peer: %s -- %s", dst, err) + } + + if err := WriteHeader(s2, src, dst); err != nil { + return err + } + + // connect the series of tubes. + done := make(chan retio, 2) + go func() { + n, err := io.Copy(s2, s) + done <- retio{n, err} + }() + go func() { + n, err := io.Copy(s, s2) + done <- retio{n, err} + }() + + r1 := <-done + r2 := <-done + log.Infof("relayed %d/%d bytes between %s and %s", r1.n, r2.n, src, dst) + + if r1.err != nil { + return r1.err + } + return r2.err +} + +// openStreamToPeer opens a pipe to a remote endpoint +// for now, can only open streams to directly connected peers. +// maybe we can do some routing later on. +func (rs *RelayService) openStreamToPeer(p peer.ID) (inet.Stream, error) { + return rs.Network.NewStream(ProtocolRelay, p) +} + +func ReadHeader(r io.Reader) (src, dst peer.ID, err error) { + + mhr := mh.NewReader(r) + + s, err := mhr.ReadMultihash() + if err != nil { + return "", "", err + } + + d, err := mhr.ReadMultihash() + if err != nil { + return "", "", err + } + + return peer.ID(s), peer.ID(d), nil +} + +func WriteHeader(w io.Writer, src, dst peer.ID) error { + // write header to w. + mhw := mh.NewWriter(w) + if err := mhw.WriteMultihash(mh.Multihash(src)); err != nil { + return fmt.Errorf("failed to write relay header: %s -- %s", dst, err) + } + if err := mhw.WriteMultihash(mh.Multihash(dst)); err != nil { + return fmt.Errorf("failed to write relay header: %s -- %s", dst, err) + } + + return nil +} + +type retio struct { + n int64 + err error +} diff --git a/net/services/relay/relay_test.go b/net/services/relay/relay_test.go new file mode 100644 index 000000000..57eee6db1 --- /dev/null +++ b/net/services/relay/relay_test.go @@ -0,0 +1,101 @@ +package relay_test + +import ( + "io" + "testing" + + inet "github.com/jbenet/go-ipfs/net" + netutil "github.com/jbenet/go-ipfs/net/ipfsnet/util" + mux "github.com/jbenet/go-ipfs/net/services/mux" + relay "github.com/jbenet/go-ipfs/net/services/relay" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" +) + +var log = eventlog.Logger("relay_test") + +func TestRelaySimple(t *testing.T) { + + ctx := context.Background() + + // these networks have the relay service wired in already. + n1 := netutil.GenNetwork(t, ctx) + n2 := netutil.GenNetwork(t, ctx) + n3 := netutil.GenNetwork(t, ctx) + + n1p := n1.LocalPeer() + n2p := n2.LocalPeer() + n3p := n3.LocalPeer() + + netutil.DivulgeAddresses(n2, n1) + netutil.DivulgeAddresses(n2, n3) + + if err := n1.DialPeer(ctx, n2p); err != nil { + t.Fatalf("Failed to dial:", err) + } + if err := n3.DialPeer(ctx, n2p); err != nil { + t.Fatalf("Failed to dial:", err) + } + + // setup handler on n3 to copy everything over to the pipe. + piper, pipew := io.Pipe() + n3.SetHandler(inet.ProtocolTesting, func(s inet.Stream) { + log.Debug("relay stream opened to n3!") + log.Debug("piping and echoing everything") + w := io.MultiWriter(s, pipew) + io.Copy(w, s) + log.Debug("closing stream") + s.Close() + }) + + // ok, now we can try to relay n1--->n2--->n3. + log.Debug("open relay stream") + s, err := n1.NewStream(relay.ProtocolRelay, n2p) + if err != nil { + t.Fatal(err) + } + + // ok first thing we write the relay header n1->n3 + log.Debug("write relay header") + if err := relay.WriteHeader(s, n1p, n3p); err != nil { + t.Fatal(err) + } + + // ok now the header's there, we can write the next protocol header. + log.Debug("write testing header") + if err := mux.WriteProtocolHeader(inet.ProtocolTesting, s); err != nil { + t.Fatal(err) + } + + // okay, now we should be able to write text, and read it out. + buf1 := []byte("abcdefghij") + buf2 := make([]byte, 10) + buf3 := make([]byte, 10) + log.Debug("write in some text.") + if _, err := s.Write(buf1); err != nil { + t.Fatal(err) + } + + // read it out from the pipe. + log.Debug("read it out from the pipe.") + if _, err := io.ReadFull(piper, buf2); err != nil { + t.Fatal(err) + } + if string(buf1) != string(buf2) { + t.Fatal("should've gotten that text out of the pipe") + } + + // read it out from the stream (echoed) + log.Debug("read it out from the stream (echoed).") + if _, err := io.ReadFull(s, buf3); err != nil { + t.Fatal(err) + } + if string(buf1) != string(buf3) { + t.Fatal("should've gotten that text out of the stream") + } + + // sweet. relay works. + log.Debug("sweet, relay works.") + s.Close() +}