mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-29 17:36:38 +08:00
fix(bs) remove concrete refs to swarm and dht
This commit is contained in:
@ -11,13 +11,13 @@ import (
|
||||
notifications "github.com/jbenet/go-ipfs/bitswap/notifications"
|
||||
tx "github.com/jbenet/go-ipfs/bitswap/transmission"
|
||||
blocks "github.com/jbenet/go-ipfs/blocks"
|
||||
swarm "github.com/jbenet/go-ipfs/net/swarm"
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
routing "github.com/jbenet/go-ipfs/routing"
|
||||
dht "github.com/jbenet/go-ipfs/routing/dht"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
|
||||
// TODO(brian): ensure messages are being received
|
||||
|
||||
// PartnerWantListMax is the bound for the number of keys we'll store per
|
||||
// partner. These are usually taken from the top of the Partner's WantList
|
||||
// advertisements. WantLists are sorted in terms of priority.
|
||||
@ -32,16 +32,14 @@ type BitSwap struct {
|
||||
// peer is the identity of this (local) node.
|
||||
peer *peer.Peer
|
||||
|
||||
// net holds the connections to all peers.
|
||||
sender tx.Sender
|
||||
net swarm.Network
|
||||
meschan *swarm.Chan
|
||||
// sender delivers messages on behalf of the session
|
||||
sender tx.Sender
|
||||
|
||||
// datastore is the local database // Ledgers of known
|
||||
datastore ds.Datastore
|
||||
|
||||
// routing interface for communication
|
||||
routing *dht.IpfsDHT
|
||||
routing routing.IpfsRouting
|
||||
|
||||
notifications notifications.PubSub
|
||||
|
||||
@ -63,25 +61,21 @@ type BitSwap struct {
|
||||
}
|
||||
|
||||
// NewBitSwap creates a new BitSwap instance. It does not check its parameters.
|
||||
func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsRouting) *BitSwap {
|
||||
func NewBitSwap(p *peer.Peer, d ds.Datastore, r routing.IpfsRouting) *BitSwap {
|
||||
receiver := tx.Forwarder{}
|
||||
sender := tx.NewBSNetService(context.Background(), &receiver)
|
||||
bs := &BitSwap{
|
||||
peer: p,
|
||||
net: net,
|
||||
datastore: d,
|
||||
partners: LedgerMap{},
|
||||
wantList: KeySet{},
|
||||
routing: r.(*dht.IpfsDHT),
|
||||
// TODO(brian): replace |meschan| with |sender| in BitSwap impl
|
||||
meschan: net.GetChannel(swarm.PBWrapper_BITSWAP),
|
||||
peer: p,
|
||||
datastore: d,
|
||||
partners: LedgerMap{},
|
||||
wantList: KeySet{},
|
||||
routing: r,
|
||||
sender: sender,
|
||||
haltChan: make(chan struct{}),
|
||||
notifications: notifications.New(),
|
||||
}
|
||||
receiver.Delegate(bs)
|
||||
|
||||
go bs.handleMessages()
|
||||
return bs
|
||||
}
|
||||
|
||||
@ -130,7 +124,7 @@ func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*bloc
|
||||
|
||||
message := bsmsg.New()
|
||||
message.AppendWanted(k)
|
||||
bs.meschan.Outgoing <- message.ToSwarm(p)
|
||||
bs.sender.SendMessage(ctx, p, message)
|
||||
|
||||
block, ok := <-blockChannel
|
||||
if !ok {
|
||||
@ -159,35 +153,7 @@ func (bs *BitSwap) HaveBlock(blk *blocks.Block) error {
|
||||
func (bs *BitSwap) SendBlock(p *peer.Peer, b *blocks.Block) {
|
||||
message := bsmsg.New()
|
||||
message.AppendBlock(b)
|
||||
bs.meschan.Outgoing <- message.ToSwarm(p)
|
||||
}
|
||||
|
||||
func (bs *BitSwap) handleMessages() {
|
||||
for {
|
||||
select {
|
||||
case mes := <-bs.meschan.Incoming:
|
||||
bsmsg, err := bsmsg.FromSwarm(*mes)
|
||||
if err != nil {
|
||||
u.PErr("%v\n", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if bsmsg.Blocks() != nil {
|
||||
for _, blk := range bsmsg.Blocks() {
|
||||
go bs.blockReceive(mes.Peer, blk)
|
||||
}
|
||||
}
|
||||
|
||||
if bsmsg.Wantlist() != nil {
|
||||
for _, want := range bsmsg.Wantlist() {
|
||||
go bs.peerWantsBlock(mes.Peer, want)
|
||||
}
|
||||
}
|
||||
case <-bs.haltChan:
|
||||
bs.notifications.Shutdown()
|
||||
return
|
||||
}
|
||||
}
|
||||
bs.sender.SendMessage(context.Background(), p, message)
|
||||
}
|
||||
|
||||
// peerWantsBlock will check if we have the block in question,
|
||||
@ -260,7 +226,7 @@ func (bs *BitSwap) SendWantList(wl KeySet) error {
|
||||
|
||||
// Lets just ping everybody all at once
|
||||
for _, ledger := range bs.partners {
|
||||
bs.meschan.Outgoing <- message.ToSwarm(ledger.Partner)
|
||||
bs.sender.SendMessage(context.TODO(), ledger.Partner, message)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -99,7 +99,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
|
||||
route.Start()
|
||||
|
||||
// TODO(brian): pass a context to bs for its async operations
|
||||
swap = bitswap.NewBitSwap(local, net, d, route)
|
||||
swap = bitswap.NewBitSwap(local, d, route)
|
||||
swap.SetStrategy(bitswap.YesManStrategy)
|
||||
|
||||
// TODO(brian): pass a context to initConnections
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
// IpfsRouting is the routing module interface
|
||||
// It is implemented by things like DHTs, etc.
|
||||
type IpfsRouting interface {
|
||||
FindProvidersAsync(u.Key, int, time.Duration) <-chan *peer.Peer
|
||||
|
||||
// Basic Put/Get
|
||||
|
||||
|
Reference in New Issue
Block a user