From 014157cac6ab3cc249e0a39c156a27b7a2e08612 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Tue, 16 Sep 2014 02:05:24 -0700 Subject: [PATCH] refac(bitswap) simply network interfaces --- bitswap/bitswap.go | 19 ++++------ bitswap/network/forwarder.go | 12 +++--- bitswap/network/forwarder_test.go | 14 ++++++- bitswap/network/interface.go | 29 +++++++++++---- bitswap/network/network_adapter.go | 60 +++++++++++++++++++----------- 5 files changed, 85 insertions(+), 49 deletions(-) diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index ead1d3283..6424f97ac 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -8,10 +8,9 @@ import ( ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" bsmsg "github.com/jbenet/go-ipfs/bitswap/message" + bsnet "github.com/jbenet/go-ipfs/bitswap/network" notifications "github.com/jbenet/go-ipfs/bitswap/notifications" - tx "github.com/jbenet/go-ipfs/bitswap/transmission" blocks "github.com/jbenet/go-ipfs/blocks" - net "github.com/jbenet/go-ipfs/net" peer "github.com/jbenet/go-ipfs/peer" routing "github.com/jbenet/go-ipfs/routing" u "github.com/jbenet/go-ipfs/util" @@ -34,7 +33,7 @@ type BitSwap struct { peer *peer.Peer // sender delivers messages on behalf of the session - sender tx.Sender + sender bsnet.NetworkAdapter // datastore is the local database // Ledgers of known datastore ds.Datastore @@ -62,21 +61,16 @@ type BitSwap struct { } // NewSession initializes a bitswap session. -func NewSession(parent context.Context, s net.Sender, p *peer.Peer, d ds.Datastore, r routing.IpfsRouting) *BitSwap { +func NewSession(parent context.Context, s bsnet.NetworkService, p *peer.Peer, d ds.Datastore, r routing.IpfsRouting) *BitSwap { - // TODO(brian): define a contract for management of async operations that - // fall under bitswap's purview - // ctx, _ := context.WithCancel(parent) - - receiver := tx.Forwarder{} - sender := tx.NewSender(s) + receiver := bsnet.Forwarder{} bs := &BitSwap{ peer: p, datastore: d, partners: LedgerMap{}, wantList: KeySet{}, routing: r, - sender: sender, + sender: bsnet.NewNetworkAdapter(s, &receiver), haltChan: make(chan struct{}), notifications: notifications.New(), strategy: YesManStrategy, @@ -246,7 +240,7 @@ func (bs *BitSwap) Halt() { func (bs *BitSwap) ReceiveMessage( ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) ( - bsmsg.BitSwapMessage, *peer.Peer, error) { + *peer.Peer, bsmsg.BitSwapMessage, error) { if incoming.Blocks() != nil { for _, block := range incoming.Blocks() { go bs.blockReceive(sender, block) @@ -255,6 +249,7 @@ func (bs *BitSwap) ReceiveMessage( if incoming.Wantlist() != nil { for _, want := range incoming.Wantlist() { + // TODO(brian): return the block synchronously go bs.peerWantsBlock(sender, want) } } diff --git a/bitswap/network/forwarder.go b/bitswap/network/forwarder.go index ab2fc6a08..f4eba0c14 100644 --- a/bitswap/network/forwarder.go +++ b/bitswap/network/forwarder.go @@ -1,4 +1,4 @@ -package transmission +package network import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" @@ -6,17 +6,17 @@ import ( peer "github.com/jbenet/go-ipfs/peer" ) -// Forwarder breaks the circular dependency between bitswap and its sender -// NB: A sender is instantiated with a handler and this sender is then passed -// as a constructor argument to BitSwap. However, the handler is BitSwap! -// Hence, this receiver. +// Forwarder receives messages and forwards them to the delegate. +// +// Forwarder breaks the circular dependency between the BitSwap Session and the +// Network Service. type Forwarder struct { delegate Receiver } func (r *Forwarder) ReceiveMessage( ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) ( - bsmsg.BitSwapMessage, *peer.Peer, error) { + *peer.Peer, bsmsg.BitSwapMessage, error) { if r.delegate == nil { return nil, nil, nil } diff --git a/bitswap/network/forwarder_test.go b/bitswap/network/forwarder_test.go index f17ebb147..accc2c781 100644 --- a/bitswap/network/forwarder_test.go +++ b/bitswap/network/forwarder_test.go @@ -1,4 +1,4 @@ -package transmission +package network import ( "testing" @@ -13,4 +13,14 @@ func TestDoesntPanicIfDelegateNotPresent(t *testing.T) { fwdr.ReceiveMessage(context.Background(), &peer.Peer{}, bsmsg.New()) } -// TODO(brian): func TestForwardsMessageToDelegate(t *testing.T) +func TestForwardsMessageToDelegate(t *testing.T) { + fwdr := Forwarder{delegate: &EchoDelegate{}} + fwdr.ReceiveMessage(context.Background(), &peer.Peer{}, bsmsg.New()) +} + +type EchoDelegate struct{} + +func (d *EchoDelegate) ReceiveMessage(ctx context.Context, p *peer.Peer, + incoming bsmsg.BitSwapMessage) (*peer.Peer, bsmsg.BitSwapMessage, error) { + return p, incoming, nil +} diff --git a/bitswap/network/interface.go b/bitswap/network/interface.go index 5bd333ae4..89157b7a8 100644 --- a/bitswap/network/interface.go +++ b/bitswap/network/interface.go @@ -1,23 +1,38 @@ -package transmission +package network import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + netservice "github.com/jbenet/go-ipfs/net/service" bsmsg "github.com/jbenet/go-ipfs/bitswap/message" + netmsg "github.com/jbenet/go-ipfs/net/message" peer "github.com/jbenet/go-ipfs/peer" ) -type Sender interface { - SendMessage(ctx context.Context, destination *peer.Peer, message bsmsg.Exportable) error - SendRequest(ctx context.Context, destination *peer.Peer, outgoing bsmsg.Exportable) ( - incoming bsmsg.BitSwapMessage, err error) +// NetworkAdapter mediates the exchange's communication with the network. +type NetworkAdapter interface { + + // SendMessage sends a BitSwap message to a peer. + SendMessage( + context.Context, + *peer.Peer, + bsmsg.BitSwapMessage) error + + // SendRequest sends a BitSwap message to a peer and waits for a response. + SendRequest( + context.Context, + *peer.Peer, + bsmsg.BitSwapMessage) (incoming bsmsg.BitSwapMessage, err error) + + // SetDelegate registers the Reciver to handle messages received from the + // network. + SetDelegate(Receiver) } -// TODO(brian): consider returning a NetMessage type Receiver interface { ReceiveMessage( ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) ( - outgoing bsmsg.BitSwapMessage, destination *peer.Peer, err error) + destination *peer.Peer, outgoing bsmsg.BitSwapMessage, err error) } // TODO(brian): move this to go-ipfs/net package diff --git a/bitswap/network/network_adapter.go b/bitswap/network/network_adapter.go index 64ca9f275..f4b0a1937 100644 --- a/bitswap/network/network_adapter.go +++ b/bitswap/network/network_adapter.go @@ -1,43 +1,54 @@ -package transmission +package network import ( + "errors" + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" bsmsg "github.com/jbenet/go-ipfs/bitswap/message" - net "github.com/jbenet/go-ipfs/net" netmsg "github.com/jbenet/go-ipfs/net/message" peer "github.com/jbenet/go-ipfs/peer" ) -// NewSender wraps the net.service.Sender to perform translation between +// NewSender wraps a network Service to perform translation between // BitSwapMessage and NetMessage formats. This allows the BitSwap session to // ignore these details. -func NewSender(s net.Sender) Sender { - return &senderWrapper{s} +func NewNetworkAdapter(s NetworkService, r Receiver) NetworkAdapter { + adapter := networkAdapter{ + networkService: s, + receiver: r, + } + s.SetHandler(&adapter) + return &adapter } -// handlerWrapper implements the net.service.Handler interface. It is -// responsible for converting between -// delegates calls to the BitSwap delegate. -type handlerWrapper struct { - bitswapDelegate Receiver +// networkAdapter implements NetworkAdapter +type networkAdapter struct { + networkService NetworkService + receiver Receiver } // HandleMessage marshals and unmarshals net messages, forwarding them to the // BitSwapMessage receiver -func (wrapper *handlerWrapper) HandleMessage( +func (adapter *networkAdapter) HandleMessage( ctx context.Context, incoming netmsg.NetMessage) (netmsg.NetMessage, error) { + if adapter.receiver == nil { + return nil, errors.New("No receiver. NetMessage dropped") + } + received, err := bsmsg.FromNet(incoming) if err != nil { return nil, err } - bsmsg, p, err := wrapper.bitswapDelegate.ReceiveMessage(ctx, incoming.Peer(), received) + p, bsmsg, err := adapter.receiver.ReceiveMessage(ctx, incoming.Peer(), received) if err != nil { return nil, err } - if bsmsg == nil { + + // TODO(brian): put this in a helper function + if bsmsg == nil || p == nil { return nil, nil } @@ -49,29 +60,34 @@ func (wrapper *handlerWrapper) HandleMessage( return outgoing, nil } -type senderWrapper struct { - serviceDelegate net.Sender -} +func (adapter *networkAdapter) SendMessage( + ctx context.Context, + p *peer.Peer, + outgoing bsmsg.BitSwapMessage) error { -func (wrapper *senderWrapper) SendMessage( - ctx context.Context, p *peer.Peer, outgoing bsmsg.Exportable) error { nmsg, err := outgoing.ToNet(p) if err != nil { return err } - return wrapper.serviceDelegate.SendMessage(ctx, nmsg) + return adapter.networkService.SendMessage(ctx, nmsg) } -func (wrapper *senderWrapper) SendRequest(ctx context.Context, - p *peer.Peer, outgoing bsmsg.Exportable) (bsmsg.BitSwapMessage, error) { +func (adapter *networkAdapter) SendRequest( + ctx context.Context, + p *peer.Peer, + outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) { outgoingMsg, err := outgoing.ToNet(p) if err != nil { return nil, err } - incomingMsg, err := wrapper.serviceDelegate.SendRequest(ctx, outgoingMsg) + incomingMsg, err := adapter.networkService.SendRequest(ctx, outgoingMsg) if err != nil { return nil, err } return bsmsg.FromNet(incomingMsg) } + +func (adapter *networkAdapter) SetDelegate(r Receiver) { + adapter.receiver = r +}