mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-29 01:12:24 +08:00
refactor(bs, core) rename bitswap objects
This commit is contained in:
@ -60,10 +60,15 @@ type BitSwap struct {
|
|||||||
haltChan chan struct{}
|
haltChan chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBitSwap creates a new BitSwap instance. It does not check its parameters.
|
// NewSession initializes a bitswap session.
|
||||||
func NewBitSwap(p *peer.Peer, d ds.Datastore, r routing.IpfsRouting) *BitSwap {
|
func NewSession(parent context.Context, p *peer.Peer, d ds.Datastore, r routing.IpfsRouting) *BitSwap {
|
||||||
|
|
||||||
|
// TODO(brian): define a contract for management of async operations that
|
||||||
|
// fall under bitswap's purview
|
||||||
|
ctx, _ := context.WithCancel(parent)
|
||||||
|
|
||||||
receiver := tx.Forwarder{}
|
receiver := tx.Forwarder{}
|
||||||
sender := tx.NewBSNetService(context.Background(), &receiver)
|
sender := tx.NewServiceWrapper(ctx, &receiver)
|
||||||
bs := &BitSwap{
|
bs := &BitSwap{
|
||||||
peer: p,
|
peer: p,
|
||||||
datastore: d,
|
datastore: d,
|
||||||
|
@ -9,10 +9,10 @@ import (
|
|||||||
peer "github.com/jbenet/go-ipfs/peer"
|
peer "github.com/jbenet/go-ipfs/peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewBSNetService(ctx context.Context, r Receiver) Sender {
|
func NewServiceWrapper(ctx context.Context, r Receiver) Sender {
|
||||||
h := &handlerWrapper{r}
|
h := &handlerWrapper{r}
|
||||||
s := netservice.NewService(ctx, h)
|
s := netservice.NewService(ctx, h)
|
||||||
return &serviceWrapper{*s}
|
return &senderWrapper{s}
|
||||||
}
|
}
|
||||||
|
|
||||||
// handlerWrapper is responsible for marshaling/unmarshaling NetMessages. It
|
// handlerWrapper is responsible for marshaling/unmarshaling NetMessages. It
|
||||||
@ -47,11 +47,11 @@ func (wrapper *handlerWrapper) HandleMessage(
|
|||||||
return outgoing, nil
|
return outgoing, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type serviceWrapper struct {
|
type senderWrapper struct {
|
||||||
serviceDelegate netservice.Service
|
serviceDelegate netservice.Sender
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wrapper *serviceWrapper) SendMessage(
|
func (wrapper *senderWrapper) SendMessage(
|
||||||
ctx context.Context, p *peer.Peer, outgoing bsmsg.Exportable) error {
|
ctx context.Context, p *peer.Peer, outgoing bsmsg.Exportable) error {
|
||||||
nmsg, err := outgoing.ToNet(p)
|
nmsg, err := outgoing.ToNet(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -61,7 +61,7 @@ func (wrapper *serviceWrapper) SendMessage(
|
|||||||
return wrapper.serviceDelegate.SendMessage(ctx, nmsg, req.ID)
|
return wrapper.serviceDelegate.SendMessage(ctx, nmsg, req.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wrapper *serviceWrapper) SendRequest(ctx context.Context,
|
func (wrapper *senderWrapper) SendRequest(ctx context.Context,
|
||||||
p *peer.Peer, outgoing bsmsg.Exportable) (bsmsg.BitSwapMessage, error) {
|
p *peer.Peer, outgoing bsmsg.Exportable) (bsmsg.BitSwapMessage, error) {
|
||||||
|
|
||||||
outgoingMsg, err := outgoing.ToNet(p)
|
outgoingMsg, err := outgoing.ToNet(p)
|
||||||
|
11
core/core.go
11
core/core.go
@ -46,7 +46,7 @@ type IpfsNode struct {
|
|||||||
Routing routing.IpfsRouting
|
Routing routing.IpfsRouting
|
||||||
|
|
||||||
// the block exchange + strategy (bitswap)
|
// the block exchange + strategy (bitswap)
|
||||||
BitSwap *bitswap.BitSwap
|
BitSwap bitswap.BitSwap
|
||||||
|
|
||||||
// the block service, get/add blocks.
|
// the block service, get/add blocks.
|
||||||
Blocks *bserv.BlockService
|
Blocks *bserv.BlockService
|
||||||
@ -81,7 +81,6 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
|
|||||||
net *inet.Network
|
net *inet.Network
|
||||||
// TODO: refactor so we can use IpfsRouting interface instead of being DHT-specific
|
// TODO: refactor so we can use IpfsRouting interface instead of being DHT-specific
|
||||||
route *dht.IpfsDHT
|
route *dht.IpfsDHT
|
||||||
swap *bitswap.BitSwap
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if online {
|
if online {
|
||||||
@ -99,14 +98,14 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
|
|||||||
route.Start()
|
route.Start()
|
||||||
|
|
||||||
// TODO(brian): pass a context to bs for its async operations
|
// TODO(brian): pass a context to bs for its async operations
|
||||||
swap = bitswap.NewBitSwap(local, d, route)
|
bitswapSession := bitswap.NewSession(context.TODO(), local, d, route)
|
||||||
swap.SetStrategy(bitswap.YesManStrategy)
|
bitswapSession.SetStrategy(bitswap.YesManStrategy)
|
||||||
|
|
||||||
// TODO(brian): pass a context to initConnections
|
// TODO(brian): pass a context to initConnections
|
||||||
go initConnections(cfg, route)
|
go initConnections(cfg, route)
|
||||||
}
|
}
|
||||||
|
|
||||||
bs, err := bserv.NewBlockService(d, swap)
|
bs, err := bserv.NewBlockService(d, bitswapSession)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -120,7 +119,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
|
|||||||
Blocks: bs,
|
Blocks: bs,
|
||||||
DAG: dag,
|
DAG: dag,
|
||||||
Resolver: &path.Resolver{DAG: dag},
|
Resolver: &path.Resolver{DAG: dag},
|
||||||
BitSwap: swap,
|
BitSwap: bitswapSession,
|
||||||
Identity: local,
|
Identity: local,
|
||||||
Routing: route,
|
Routing: route,
|
||||||
}, nil
|
}, nil
|
||||||
|
Reference in New Issue
Block a user