diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index ed30b0d2c..f42330795 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -60,10 +60,15 @@ type BitSwap struct { haltChan chan struct{} } -// NewBitSwap creates a new BitSwap instance. It does not check its parameters. -func NewBitSwap(p *peer.Peer, d ds.Datastore, r routing.IpfsRouting) *BitSwap { +// NewSession initializes a bitswap session. +func NewSession(parent context.Context, p *peer.Peer, d ds.Datastore, r routing.IpfsRouting) *BitSwap { + + // TODO(brian): define a contract for management of async operations that + // fall under bitswap's purview + ctx, _ := context.WithCancel(parent) + receiver := tx.Forwarder{} - sender := tx.NewBSNetService(context.Background(), &receiver) + sender := tx.NewServiceWrapper(ctx, &receiver) bs := &BitSwap{ peer: p, datastore: d, diff --git a/bitswap/transmission/service_wrapper.go b/bitswap/transmission/service_wrapper.go index 04d3f21f8..4a85d1c04 100644 --- a/bitswap/transmission/service_wrapper.go +++ b/bitswap/transmission/service_wrapper.go @@ -9,10 +9,10 @@ import ( peer "github.com/jbenet/go-ipfs/peer" ) -func NewBSNetService(ctx context.Context, r Receiver) Sender { +func NewServiceWrapper(ctx context.Context, r Receiver) Sender { h := &handlerWrapper{r} s := netservice.NewService(ctx, h) - return &serviceWrapper{*s} + return &senderWrapper{s} } // handlerWrapper is responsible for marshaling/unmarshaling NetMessages. It @@ -47,11 +47,11 @@ func (wrapper *handlerWrapper) HandleMessage( return outgoing, nil } -type serviceWrapper struct { - serviceDelegate netservice.Service +type senderWrapper struct { + serviceDelegate netservice.Sender } -func (wrapper *serviceWrapper) SendMessage( +func (wrapper *senderWrapper) SendMessage( ctx context.Context, p *peer.Peer, outgoing bsmsg.Exportable) error { nmsg, err := outgoing.ToNet(p) if err != nil { @@ -61,7 +61,7 @@ func (wrapper *serviceWrapper) SendMessage( return wrapper.serviceDelegate.SendMessage(ctx, nmsg, req.ID) } -func (wrapper *serviceWrapper) SendRequest(ctx context.Context, +func (wrapper *senderWrapper) SendRequest(ctx context.Context, p *peer.Peer, outgoing bsmsg.Exportable) (bsmsg.BitSwapMessage, error) { outgoingMsg, err := outgoing.ToNet(p) diff --git a/core/core.go b/core/core.go index 97eec86f0..fe91ff8e8 100644 --- a/core/core.go +++ b/core/core.go @@ -46,7 +46,7 @@ type IpfsNode struct { Routing routing.IpfsRouting // the block exchange + strategy (bitswap) - BitSwap *bitswap.BitSwap + BitSwap bitswap.BitSwap // the block service, get/add blocks. Blocks *bserv.BlockService @@ -81,7 +81,6 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) { net *inet.Network // TODO: refactor so we can use IpfsRouting interface instead of being DHT-specific route *dht.IpfsDHT - swap *bitswap.BitSwap ) if online { @@ -99,14 +98,14 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) { route.Start() // TODO(brian): pass a context to bs for its async operations - swap = bitswap.NewBitSwap(local, d, route) - swap.SetStrategy(bitswap.YesManStrategy) + bitswapSession := bitswap.NewSession(context.TODO(), local, d, route) + bitswapSession.SetStrategy(bitswap.YesManStrategy) // TODO(brian): pass a context to initConnections go initConnections(cfg, route) } - bs, err := bserv.NewBlockService(d, swap) + bs, err := bserv.NewBlockService(d, bitswapSession) if err != nil { return nil, err } @@ -120,7 +119,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) { Blocks: bs, DAG: dag, Resolver: &path.Resolver{DAG: dag}, - BitSwap: swap, + BitSwap: bitswapSession, Identity: local, Routing: route, }, nil