1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-29 09:34:03 +08:00

relay service -- streams across peers

this is the leadup into NAT traversal.
note: doesn't work yet. hangs the test.
This commit is contained in:
Juan Batiz-Benet
2014-12-24 11:16:17 -08:00
parent 4d08eb0140
commit 735c3de7fa
4 changed files with 277 additions and 5 deletions

View File

@ -5,11 +5,13 @@ import (
"fmt"
ic "github.com/jbenet/go-ipfs/crypto"
peer "github.com/jbenet/go-ipfs/peer"
inet "github.com/jbenet/go-ipfs/net"
ids "github.com/jbenet/go-ipfs/net/services/identify"
mux "github.com/jbenet/go-ipfs/net/services/mux"
relay "github.com/jbenet/go-ipfs/net/services/relay"
swarm "github.com/jbenet/go-ipfs/net/swarm"
peer "github.com/jbenet/go-ipfs/peer"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
@ -105,6 +107,7 @@ type Network struct {
swarm *swarm.Swarm // peer connection multiplexing
mux mux.Mux // protocol multiplexing
ids *ids.IDService
relay *relay.RelayService
cg ctxgroup.ContextGroup // for Context closing
}
@ -133,11 +136,13 @@ func NewNetwork(ctx context.Context, listen []ma.Multiaddr, local peer.ID,
n.mux.Handle((*stream)(s))
})
// setup a conn handler that immediately "asks the other side about them"
// this is ProtocolIdentify.
// setup ProtocolIdentify to immediately "asks the other side about them"
n.ids = ids.NewIDService(n)
s.SetConnHandler(n.newConnHandler)
// setup ProtocolRelay to allow traffic relaying.
// Feed things we get for ourselves into the muxer.
n.relay = relay.NewRelayService(n.cg.Context(), n, n.mux.HandleSync)
return n, nil
}

View File

@ -89,9 +89,16 @@ func (m *Mux) SetHandler(p inet.ProtocolID, h inet.StreamHandler) {
m.Unlock()
}
// Handle reads the next name off the Stream, and calls a function
// Handle reads the next name off the Stream, and calls a handler function
// This is done in its own goroutine, to avoid blocking the caller.
func (m *Mux) Handle(s inet.Stream) {
go m.HandleSync(s)
}
// HandleSync reads the next name off the Stream, and calls a handler function
// This is done synchronously. The handler function will return before
// HandleSync returns.
func (m *Mux) HandleSync(s inet.Stream) {
ctx := context.Background()
name, handler, err := m.ReadProtocolHeader(s)
@ -102,7 +109,7 @@ func (m *Mux) Handle(s inet.Stream) {
return
}
log.Info("muxer handle protocol: %s", name)
log.Infof("muxer handle protocol: %s", name)
log.Event(ctx, "muxHandle", eventlog.Metadata{"protocol": name})
handler(s)
}

159
net/services/relay/relay.go Normal file
View File

@ -0,0 +1,159 @@
package relay
import (
"fmt"
"io"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
inet "github.com/jbenet/go-ipfs/net"
peer "github.com/jbenet/go-ipfs/peer"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
)
var log = eventlog.Logger("relay")
// ProtocolRelay is the ProtocolID of the Relay Service.
const ProtocolRelay inet.ProtocolID = "/ipfs/relay"
// Relay is a structure that implements ProtocolRelay.
// It is a simple relay service which forwards traffic
// between two directly connected peers.
//
// the protocol is very simple:
//
// /ipfs/relay\n
// <multihash src id>
// <multihash dst id>
// <data stream>
//
type RelayService struct {
Network inet.Network
handler inet.StreamHandler // for streams sent to us locally.
cg ctxgroup.ContextGroup
}
func NewRelayService(ctx context.Context, n inet.Network, sh inet.StreamHandler) *RelayService {
s := &RelayService{
Network: n,
handler: sh,
cg: ctxgroup.WithContext(ctx),
}
n.SetHandler(inet.ProtocolRelay, s.requestHandler)
return s
}
// requestHandler is the function called by clients
func (rs *RelayService) requestHandler(s inet.Stream) {
if err := rs.handleStream(s); err != nil {
log.Error("RelayService error:", err)
}
}
// handleStream is our own handler, which returns an error for simplicity.
func (rs *RelayService) handleStream(s inet.Stream) error {
defer s.Close()
// read the header (src and dst peer.IDs)
src, dst, err := ReadHeader(s)
if err != nil {
return fmt.Errorf("stream with bad header: %s", err)
}
local := rs.Network.LocalPeer()
switch {
case src == local:
return fmt.Errorf("relaying from self")
case dst == local: // it's for us! yaaay.
log.Debugf("%s consuming stream from %s", rs.Network.LocalPeer(), src)
return rs.consumeStream(s)
default: // src and dst are not local. relay it.
log.Debugf("%s relaying stream %s <--> %s", rs.Network.LocalPeer(), src, dst)
return rs.pipeStream(src, dst, s)
}
}
// consumeStream connects streams directed to the local peer
// to our handler, with the header now stripped (read).
func (rs *RelayService) consumeStream(s inet.Stream) error {
rs.handler(s) // boom.
return nil
}
// pipeStream relays over a stream to a remote peer. It's like `cat`
func (rs *RelayService) pipeStream(src, dst peer.ID, s inet.Stream) error {
s2, err := rs.openStreamToPeer(dst)
if err != nil {
return fmt.Errorf("failed to open stream to peer: %s -- %s", dst, err)
}
if err := WriteHeader(s2, src, dst); err != nil {
return err
}
// connect the series of tubes.
done := make(chan retio, 2)
go func() {
n, err := io.Copy(s2, s)
done <- retio{n, err}
}()
go func() {
n, err := io.Copy(s, s2)
done <- retio{n, err}
}()
r1 := <-done
r2 := <-done
log.Infof("relayed %d/%d bytes between %s and %s", r1.n, r2.n, src, dst)
if r1.err != nil {
return r1.err
}
return r2.err
}
// openStreamToPeer opens a pipe to a remote endpoint
// for now, can only open streams to directly connected peers.
// maybe we can do some routing later on.
func (rs *RelayService) openStreamToPeer(p peer.ID) (inet.Stream, error) {
return rs.Network.NewStream(ProtocolRelay, p)
}
func ReadHeader(r io.Reader) (src, dst peer.ID, err error) {
mhr := mh.NewReader(r)
s, err := mhr.ReadMultihash()
if err != nil {
return "", "", err
}
d, err := mhr.ReadMultihash()
if err != nil {
return "", "", err
}
return peer.ID(s), peer.ID(d), nil
}
func WriteHeader(w io.Writer, src, dst peer.ID) error {
// write header to w.
mhw := mh.NewWriter(w)
if err := mhw.WriteMultihash(mh.Multihash(src)); err != nil {
return fmt.Errorf("failed to write relay header: %s -- %s", dst, err)
}
if err := mhw.WriteMultihash(mh.Multihash(dst)); err != nil {
return fmt.Errorf("failed to write relay header: %s -- %s", dst, err)
}
return nil
}
type retio struct {
n int64
err error
}

View File

@ -0,0 +1,101 @@
package relay_test
import (
"io"
"testing"
inet "github.com/jbenet/go-ipfs/net"
netutil "github.com/jbenet/go-ipfs/net/ipfsnet/util"
mux "github.com/jbenet/go-ipfs/net/services/mux"
relay "github.com/jbenet/go-ipfs/net/services/relay"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
var log = eventlog.Logger("relay_test")
func TestRelaySimple(t *testing.T) {
ctx := context.Background()
// these networks have the relay service wired in already.
n1 := netutil.GenNetwork(t, ctx)
n2 := netutil.GenNetwork(t, ctx)
n3 := netutil.GenNetwork(t, ctx)
n1p := n1.LocalPeer()
n2p := n2.LocalPeer()
n3p := n3.LocalPeer()
netutil.DivulgeAddresses(n2, n1)
netutil.DivulgeAddresses(n2, n3)
if err := n1.DialPeer(ctx, n2p); err != nil {
t.Fatalf("Failed to dial:", err)
}
if err := n3.DialPeer(ctx, n2p); err != nil {
t.Fatalf("Failed to dial:", err)
}
// setup handler on n3 to copy everything over to the pipe.
piper, pipew := io.Pipe()
n3.SetHandler(inet.ProtocolTesting, func(s inet.Stream) {
log.Debug("relay stream opened to n3!")
log.Debug("piping and echoing everything")
w := io.MultiWriter(s, pipew)
io.Copy(w, s)
log.Debug("closing stream")
s.Close()
})
// ok, now we can try to relay n1--->n2--->n3.
log.Debug("open relay stream")
s, err := n1.NewStream(relay.ProtocolRelay, n2p)
if err != nil {
t.Fatal(err)
}
// ok first thing we write the relay header n1->n3
log.Debug("write relay header")
if err := relay.WriteHeader(s, n1p, n3p); err != nil {
t.Fatal(err)
}
// ok now the header's there, we can write the next protocol header.
log.Debug("write testing header")
if err := mux.WriteProtocolHeader(inet.ProtocolTesting, s); err != nil {
t.Fatal(err)
}
// okay, now we should be able to write text, and read it out.
buf1 := []byte("abcdefghij")
buf2 := make([]byte, 10)
buf3 := make([]byte, 10)
log.Debug("write in some text.")
if _, err := s.Write(buf1); err != nil {
t.Fatal(err)
}
// read it out from the pipe.
log.Debug("read it out from the pipe.")
if _, err := io.ReadFull(piper, buf2); err != nil {
t.Fatal(err)
}
if string(buf1) != string(buf2) {
t.Fatal("should've gotten that text out of the pipe")
}
// read it out from the stream (echoed)
log.Debug("read it out from the stream (echoed).")
if _, err := io.ReadFull(s, buf3); err != nil {
t.Fatal(err)
}
if string(buf1) != string(buf3) {
t.Fatal("should've gotten that text out of the stream")
}
// sweet. relay works.
log.Debug("sweet, relay works.")
s.Close()
}