From 7fcb5d3a4bb01d7098081c4ea0dfd3088eee80b2 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Fri, 12 Sep 2014 20:51:57 -0700 Subject: [PATCH] feat(bs:net) impl service wrapper --- bitswap/bitswap.go | 26 +++++++--- bitswap/message/Makefile | 8 ++++ bitswap/message/message.go | 7 +++ bitswap/network/interface.go | 20 ++++++++ bitswap/network/service_wrapper.go | 76 ++++++++++++++++++++++++++++++ bitswap/receiver.go | 29 ++++++++++++ bitswap/receiver_test.go | 13 +++++ 7 files changed, 173 insertions(+), 6 deletions(-) create mode 100644 bitswap/message/Makefile create mode 100644 bitswap/network/interface.go create mode 100644 bitswap/network/service_wrapper.go create mode 100644 bitswap/receiver.go create mode 100644 bitswap/receiver_test.go diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index 56b3db955..5882d7c0b 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -1,12 +1,14 @@ package bitswap import ( + "errors" "time" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" bsmsg "github.com/jbenet/go-ipfs/bitswap/message" + bsnet "github.com/jbenet/go-ipfs/bitswap/network" notifications "github.com/jbenet/go-ipfs/bitswap/notifications" blocks "github.com/jbenet/go-ipfs/blocks" swarm "github.com/jbenet/go-ipfs/net/swarm" @@ -31,6 +33,7 @@ type BitSwap struct { peer *peer.Peer // net holds the connections to all peers. + sender bsnet.Sender net swarm.Network meschan *swarm.Chan @@ -61,17 +64,22 @@ type BitSwap struct { // NewBitSwap creates a new BitSwap instance. It does not check its parameters. func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsRouting) *BitSwap { + receiver := receiver{} + sender := bsnet.NewBSNetService(context.Background(), &receiver) bs := &BitSwap{ - peer: p, - net: net, - datastore: d, - partners: LedgerMap{}, - wantList: KeySet{}, - routing: r.(*dht.IpfsDHT), + peer: p, + net: net, + datastore: d, + partners: LedgerMap{}, + wantList: KeySet{}, + routing: r.(*dht.IpfsDHT), + // TODO(brian): replace |meschan| with |sender| in BitSwap impl meschan: net.GetChannel(swarm.PBWrapper_BITSWAP), + sender: sender, haltChan: make(chan struct{}), notifications: notifications.New(), } + receiver.Delegate(bs) go bs.handleMessages() return bs @@ -274,3 +282,9 @@ func (bs *BitSwap) SetStrategy(sf StrategyFunc) { ledger.Strategy = sf } } + +func (r *BitSwap) ReceiveMessage( + ctx context.Context, incoming bsmsg.BitSwapMessage) ( + bsmsg.BitSwapMessage, *peer.Peer, error) { + return nil, nil, errors.New("TODO implement") +} diff --git a/bitswap/message/Makefile b/bitswap/message/Makefile new file mode 100644 index 000000000..5bbebea07 --- /dev/null +++ b/bitswap/message/Makefile @@ -0,0 +1,8 @@ +# TODO(brian): add proto tasks +all: message.pb.go + +message.pb.go: message.proto + protoc --gogo_out=. --proto_path=../../../../../:/usr/local/opt/protobuf/include:. $< + +clean: + rm message.pb.go diff --git a/bitswap/message/message.go b/bitswap/message/message.go index d5ad94643..91bf957af 100644 --- a/bitswap/message/message.go +++ b/bitswap/message/message.go @@ -1,7 +1,10 @@ package message import ( + "errors" + proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" + netmsg "github.com/jbenet/go-ipfs/net/message" blocks "github.com/jbenet/go-ipfs/blocks" nm "github.com/jbenet/go-ipfs/net/message" @@ -55,6 +58,10 @@ func (m *message) AppendBlock(b *blocks.Block) { m.pb.Blocks = append(m.pb.Blocks, b.Data) } +func FromNet(nmsg netmsg.NetMessage) (BitSwapMessage, error) { + return nil, errors.New("TODO implement") +} + func FromSwarm(sms swarm.Message) (BitSwapMessage, error) { var protoMsg PBMessage err := proto.Unmarshal(sms.Data, &protoMsg) diff --git a/bitswap/network/interface.go b/bitswap/network/interface.go new file mode 100644 index 000000000..99f63e29b --- /dev/null +++ b/bitswap/network/interface.go @@ -0,0 +1,20 @@ +package network + +import ( + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + + bsmsg "github.com/jbenet/go-ipfs/bitswap/message" + peer "github.com/jbenet/go-ipfs/peer" +) + +type Sender interface { + SendMessage(ctx context.Context, destination *peer.Peer, message bsmsg.Exportable) error + SendRequest(ctx context.Context, destination *peer.Peer, outgoing bsmsg.Exportable) ( + incoming bsmsg.BitSwapMessage, err error) +} + +// TODO(brian): consider returning a NetMessage +type Receiver interface { + ReceiveMessage(ctx context.Context, incoming bsmsg.BitSwapMessage) ( + outgoing bsmsg.BitSwapMessage, destination *peer.Peer, err error) +} diff --git a/bitswap/network/service_wrapper.go b/bitswap/network/service_wrapper.go new file mode 100644 index 000000000..a9e1c3684 --- /dev/null +++ b/bitswap/network/service_wrapper.go @@ -0,0 +1,76 @@ +package network + +import ( + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + + bsmsg "github.com/jbenet/go-ipfs/bitswap/message" + netmsg "github.com/jbenet/go-ipfs/net/message" + netservice "github.com/jbenet/go-ipfs/net/service" + peer "github.com/jbenet/go-ipfs/peer" +) + +func NewBSNetService(ctx context.Context, r Receiver) Sender { + h := &handlerWrapper{r} + s := netservice.NewService(ctx, h) + return &serviceWrapper{*s} +} + +// handlerWrapper is responsible for marshaling/unmarshaling NetMessages. It +// delegates calls to the BitSwap delegate. +type handlerWrapper struct { + bitswapDelegate Receiver +} + +// HandleMessage marshals and unmarshals net messages, forwarding them to the +// BitSwapMessage receiver +func (wrapper *handlerWrapper) HandleMessage( + ctx context.Context, incoming netmsg.NetMessage) (netmsg.NetMessage, error) { + + received, err := bsmsg.FromNet(incoming) + if err != nil { + return nil, err + } + + bsmsg, p, err := wrapper.bitswapDelegate.ReceiveMessage(ctx, received) + if err != nil { + return nil, err + } + if bsmsg == nil { + return nil, nil + } + + outgoing, err := bsmsg.ToNet(p) + if err != nil { + return nil, err + } + + return outgoing, nil +} + +type serviceWrapper struct { + serviceDelegate netservice.Service +} + +func (wrapper *serviceWrapper) SendMessage( + ctx context.Context, p *peer.Peer, outgoing bsmsg.Exportable) error { + nmsg, err := outgoing.ToNet(p) + if err != nil { + return err + } + req, err := netservice.NewRequest(p.ID) + return wrapper.serviceDelegate.SendMessage(ctx, nmsg, req.ID) +} + +func (wrapper *serviceWrapper) SendRequest(ctx context.Context, + p *peer.Peer, outgoing bsmsg.Exportable) (bsmsg.BitSwapMessage, error) { + + outgoingMsg, err := outgoing.ToNet(p) + if err != nil { + return nil, err + } + incomingMsg, err := wrapper.serviceDelegate.SendRequest(ctx, outgoingMsg) + if err != nil { + return nil, err + } + return bsmsg.FromNet(incomingMsg) +} diff --git a/bitswap/receiver.go b/bitswap/receiver.go new file mode 100644 index 000000000..94a53dd76 --- /dev/null +++ b/bitswap/receiver.go @@ -0,0 +1,29 @@ +package bitswap + +import ( + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + bsmsg "github.com/jbenet/go-ipfs/bitswap/message" + bsnet "github.com/jbenet/go-ipfs/bitswap/network" + peer "github.com/jbenet/go-ipfs/peer" +) + +// receiver breaks the circular dependency between bitswap and its sender +// NB: A sender is instantiated with a handler and this sender is then passed +// as a constructor argument to BitSwap. However, the handler is BitSwap! +// Hence, this receiver. +type receiver struct { + delegate bsnet.Receiver +} + +func (r *receiver) ReceiveMessage( + ctx context.Context, incoming bsmsg.BitSwapMessage) ( + bsmsg.BitSwapMessage, *peer.Peer, error) { + if r.delegate == nil { + return nil, nil, nil + } + return r.delegate.ReceiveMessage(ctx, incoming) +} + +func (r *receiver) Delegate(delegate bsnet.Receiver) { + r.delegate = delegate +} diff --git a/bitswap/receiver_test.go b/bitswap/receiver_test.go new file mode 100644 index 000000000..c82eacd51 --- /dev/null +++ b/bitswap/receiver_test.go @@ -0,0 +1,13 @@ +package bitswap + +import ( + "testing" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + bsmsg "github.com/jbenet/go-ipfs/bitswap/message" +) + +func TestDoesntPanicIfDelegateNotPresent(t *testing.T) { + r := receiver{} + r.ReceiveMessage(context.Background(), bsmsg.New()) +}