1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-29 17:36:38 +08:00

refac(bitswap) simply network interfaces

This commit is contained in:
Brian Tiger Chow
2014-09-16 02:05:24 -07:00
parent c34d4df96d
commit 014157cac6
5 changed files with 85 additions and 49 deletions

View File

@ -8,10 +8,9 @@ import (
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
bsmsg "github.com/jbenet/go-ipfs/bitswap/message" bsmsg "github.com/jbenet/go-ipfs/bitswap/message"
bsnet "github.com/jbenet/go-ipfs/bitswap/network"
notifications "github.com/jbenet/go-ipfs/bitswap/notifications" notifications "github.com/jbenet/go-ipfs/bitswap/notifications"
tx "github.com/jbenet/go-ipfs/bitswap/transmission"
blocks "github.com/jbenet/go-ipfs/blocks" blocks "github.com/jbenet/go-ipfs/blocks"
net "github.com/jbenet/go-ipfs/net"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
routing "github.com/jbenet/go-ipfs/routing" routing "github.com/jbenet/go-ipfs/routing"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
@ -34,7 +33,7 @@ type BitSwap struct {
peer *peer.Peer peer *peer.Peer
// sender delivers messages on behalf of the session // sender delivers messages on behalf of the session
sender tx.Sender sender bsnet.NetworkAdapter
// datastore is the local database // Ledgers of known // datastore is the local database // Ledgers of known
datastore ds.Datastore datastore ds.Datastore
@ -62,21 +61,16 @@ type BitSwap struct {
} }
// NewSession initializes a bitswap session. // NewSession initializes a bitswap session.
func NewSession(parent context.Context, s net.Sender, p *peer.Peer, d ds.Datastore, r routing.IpfsRouting) *BitSwap { func NewSession(parent context.Context, s bsnet.NetworkService, p *peer.Peer, d ds.Datastore, r routing.IpfsRouting) *BitSwap {
// TODO(brian): define a contract for management of async operations that receiver := bsnet.Forwarder{}
// fall under bitswap's purview
// ctx, _ := context.WithCancel(parent)
receiver := tx.Forwarder{}
sender := tx.NewSender(s)
bs := &BitSwap{ bs := &BitSwap{
peer: p, peer: p,
datastore: d, datastore: d,
partners: LedgerMap{}, partners: LedgerMap{},
wantList: KeySet{}, wantList: KeySet{},
routing: r, routing: r,
sender: sender, sender: bsnet.NewNetworkAdapter(s, &receiver),
haltChan: make(chan struct{}), haltChan: make(chan struct{}),
notifications: notifications.New(), notifications: notifications.New(),
strategy: YesManStrategy, strategy: YesManStrategy,
@ -246,7 +240,7 @@ func (bs *BitSwap) Halt() {
func (bs *BitSwap) ReceiveMessage( func (bs *BitSwap) ReceiveMessage(
ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) ( ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) (
bsmsg.BitSwapMessage, *peer.Peer, error) { *peer.Peer, bsmsg.BitSwapMessage, error) {
if incoming.Blocks() != nil { if incoming.Blocks() != nil {
for _, block := range incoming.Blocks() { for _, block := range incoming.Blocks() {
go bs.blockReceive(sender, block) go bs.blockReceive(sender, block)
@ -255,6 +249,7 @@ func (bs *BitSwap) ReceiveMessage(
if incoming.Wantlist() != nil { if incoming.Wantlist() != nil {
for _, want := range incoming.Wantlist() { for _, want := range incoming.Wantlist() {
// TODO(brian): return the block synchronously
go bs.peerWantsBlock(sender, want) go bs.peerWantsBlock(sender, want)
} }
} }

View File

@ -1,4 +1,4 @@
package transmission package network
import ( import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
@ -6,17 +6,17 @@ import (
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
) )
// Forwarder breaks the circular dependency between bitswap and its sender // Forwarder receives messages and forwards them to the delegate.
// NB: A sender is instantiated with a handler and this sender is then passed //
// as a constructor argument to BitSwap. However, the handler is BitSwap! // Forwarder breaks the circular dependency between the BitSwap Session and the
// Hence, this receiver. // Network Service.
type Forwarder struct { type Forwarder struct {
delegate Receiver delegate Receiver
} }
func (r *Forwarder) ReceiveMessage( func (r *Forwarder) ReceiveMessage(
ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) ( ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) (
bsmsg.BitSwapMessage, *peer.Peer, error) { *peer.Peer, bsmsg.BitSwapMessage, error) {
if r.delegate == nil { if r.delegate == nil {
return nil, nil, nil return nil, nil, nil
} }

View File

@ -1,4 +1,4 @@
package transmission package network
import ( import (
"testing" "testing"
@ -13,4 +13,14 @@ func TestDoesntPanicIfDelegateNotPresent(t *testing.T) {
fwdr.ReceiveMessage(context.Background(), &peer.Peer{}, bsmsg.New()) fwdr.ReceiveMessage(context.Background(), &peer.Peer{}, bsmsg.New())
} }
// TODO(brian): func TestForwardsMessageToDelegate(t *testing.T) func TestForwardsMessageToDelegate(t *testing.T) {
fwdr := Forwarder{delegate: &EchoDelegate{}}
fwdr.ReceiveMessage(context.Background(), &peer.Peer{}, bsmsg.New())
}
type EchoDelegate struct{}
func (d *EchoDelegate) ReceiveMessage(ctx context.Context, p *peer.Peer,
incoming bsmsg.BitSwapMessage) (*peer.Peer, bsmsg.BitSwapMessage, error) {
return p, incoming, nil
}

View File

@ -1,23 +1,38 @@
package transmission package network
import ( import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
netservice "github.com/jbenet/go-ipfs/net/service"
bsmsg "github.com/jbenet/go-ipfs/bitswap/message" bsmsg "github.com/jbenet/go-ipfs/bitswap/message"
netmsg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
) )
type Sender interface { // NetworkAdapter mediates the exchange's communication with the network.
SendMessage(ctx context.Context, destination *peer.Peer, message bsmsg.Exportable) error type NetworkAdapter interface {
SendRequest(ctx context.Context, destination *peer.Peer, outgoing bsmsg.Exportable) (
incoming bsmsg.BitSwapMessage, err error) // SendMessage sends a BitSwap message to a peer.
SendMessage(
context.Context,
*peer.Peer,
bsmsg.BitSwapMessage) error
// SendRequest sends a BitSwap message to a peer and waits for a response.
SendRequest(
context.Context,
*peer.Peer,
bsmsg.BitSwapMessage) (incoming bsmsg.BitSwapMessage, err error)
// SetDelegate registers the Reciver to handle messages received from the
// network.
SetDelegate(Receiver)
} }
// TODO(brian): consider returning a NetMessage
type Receiver interface { type Receiver interface {
ReceiveMessage( ReceiveMessage(
ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) ( ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) (
outgoing bsmsg.BitSwapMessage, destination *peer.Peer, err error) destination *peer.Peer, outgoing bsmsg.BitSwapMessage, err error)
} }
// TODO(brian): move this to go-ipfs/net package // TODO(brian): move this to go-ipfs/net package

View File

@ -1,43 +1,54 @@
package transmission package network
import ( import (
"errors"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
bsmsg "github.com/jbenet/go-ipfs/bitswap/message" bsmsg "github.com/jbenet/go-ipfs/bitswap/message"
net "github.com/jbenet/go-ipfs/net"
netmsg "github.com/jbenet/go-ipfs/net/message" netmsg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
) )
// NewSender wraps the net.service.Sender to perform translation between // NewSender wraps a network Service to perform translation between
// BitSwapMessage and NetMessage formats. This allows the BitSwap session to // BitSwapMessage and NetMessage formats. This allows the BitSwap session to
// ignore these details. // ignore these details.
func NewSender(s net.Sender) Sender { func NewNetworkAdapter(s NetworkService, r Receiver) NetworkAdapter {
return &senderWrapper{s} adapter := networkAdapter{
networkService: s,
receiver: r,
}
s.SetHandler(&adapter)
return &adapter
} }
// handlerWrapper implements the net.service.Handler interface. It is // networkAdapter implements NetworkAdapter
// responsible for converting between type networkAdapter struct {
// delegates calls to the BitSwap delegate. networkService NetworkService
type handlerWrapper struct { receiver Receiver
bitswapDelegate Receiver
} }
// HandleMessage marshals and unmarshals net messages, forwarding them to the // HandleMessage marshals and unmarshals net messages, forwarding them to the
// BitSwapMessage receiver // BitSwapMessage receiver
func (wrapper *handlerWrapper) HandleMessage( func (adapter *networkAdapter) HandleMessage(
ctx context.Context, incoming netmsg.NetMessage) (netmsg.NetMessage, error) { ctx context.Context, incoming netmsg.NetMessage) (netmsg.NetMessage, error) {
if adapter.receiver == nil {
return nil, errors.New("No receiver. NetMessage dropped")
}
received, err := bsmsg.FromNet(incoming) received, err := bsmsg.FromNet(incoming)
if err != nil { if err != nil {
return nil, err return nil, err
} }
bsmsg, p, err := wrapper.bitswapDelegate.ReceiveMessage(ctx, incoming.Peer(), received) p, bsmsg, err := adapter.receiver.ReceiveMessage(ctx, incoming.Peer(), received)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if bsmsg == nil {
// TODO(brian): put this in a helper function
if bsmsg == nil || p == nil {
return nil, nil return nil, nil
} }
@ -49,29 +60,34 @@ func (wrapper *handlerWrapper) HandleMessage(
return outgoing, nil return outgoing, nil
} }
type senderWrapper struct { func (adapter *networkAdapter) SendMessage(
serviceDelegate net.Sender ctx context.Context,
} p *peer.Peer,
outgoing bsmsg.BitSwapMessage) error {
func (wrapper *senderWrapper) SendMessage(
ctx context.Context, p *peer.Peer, outgoing bsmsg.Exportable) error {
nmsg, err := outgoing.ToNet(p) nmsg, err := outgoing.ToNet(p)
if err != nil { if err != nil {
return err return err
} }
return wrapper.serviceDelegate.SendMessage(ctx, nmsg) return adapter.networkService.SendMessage(ctx, nmsg)
} }
func (wrapper *senderWrapper) SendRequest(ctx context.Context, func (adapter *networkAdapter) SendRequest(
p *peer.Peer, outgoing bsmsg.Exportable) (bsmsg.BitSwapMessage, error) { ctx context.Context,
p *peer.Peer,
outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) {
outgoingMsg, err := outgoing.ToNet(p) outgoingMsg, err := outgoing.ToNet(p)
if err != nil { if err != nil {
return nil, err return nil, err
} }
incomingMsg, err := wrapper.serviceDelegate.SendRequest(ctx, outgoingMsg) incomingMsg, err := adapter.networkService.SendRequest(ctx, outgoingMsg)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return bsmsg.FromNet(incomingMsg) return bsmsg.FromNet(incomingMsg)
} }
func (adapter *networkAdapter) SetDelegate(r Receiver) {
adapter.receiver = r
}