From 0075a47df02d6c14a121dd434c7b672e34349d19 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Sun, 14 Sep 2014 14:30:52 -0700 Subject: [PATCH] fix(bs) remove concrete refs to swarm and dht --- bitswap/bitswap.go | 62 +++++++++++----------------------------------- core/core.go | 2 +- routing/routing.go | 1 + 3 files changed, 16 insertions(+), 49 deletions(-) diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index dadf306c9..ed30b0d2c 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -11,13 +11,13 @@ import ( notifications "github.com/jbenet/go-ipfs/bitswap/notifications" tx "github.com/jbenet/go-ipfs/bitswap/transmission" blocks "github.com/jbenet/go-ipfs/blocks" - swarm "github.com/jbenet/go-ipfs/net/swarm" peer "github.com/jbenet/go-ipfs/peer" routing "github.com/jbenet/go-ipfs/routing" - dht "github.com/jbenet/go-ipfs/routing/dht" u "github.com/jbenet/go-ipfs/util" ) +// TODO(brian): ensure messages are being received + // PartnerWantListMax is the bound for the number of keys we'll store per // partner. These are usually taken from the top of the Partner's WantList // advertisements. WantLists are sorted in terms of priority. @@ -32,16 +32,14 @@ type BitSwap struct { // peer is the identity of this (local) node. peer *peer.Peer - // net holds the connections to all peers. - sender tx.Sender - net swarm.Network - meschan *swarm.Chan + // sender delivers messages on behalf of the session + sender tx.Sender // datastore is the local database // Ledgers of known datastore ds.Datastore // routing interface for communication - routing *dht.IpfsDHT + routing routing.IpfsRouting notifications notifications.PubSub @@ -63,25 +61,21 @@ type BitSwap struct { } // NewBitSwap creates a new BitSwap instance. It does not check its parameters. -func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsRouting) *BitSwap { +func NewBitSwap(p *peer.Peer, d ds.Datastore, r routing.IpfsRouting) *BitSwap { receiver := tx.Forwarder{} sender := tx.NewBSNetService(context.Background(), &receiver) bs := &BitSwap{ - peer: p, - net: net, - datastore: d, - partners: LedgerMap{}, - wantList: KeySet{}, - routing: r.(*dht.IpfsDHT), - // TODO(brian): replace |meschan| with |sender| in BitSwap impl - meschan: net.GetChannel(swarm.PBWrapper_BITSWAP), + peer: p, + datastore: d, + partners: LedgerMap{}, + wantList: KeySet{}, + routing: r, sender: sender, haltChan: make(chan struct{}), notifications: notifications.New(), } receiver.Delegate(bs) - go bs.handleMessages() return bs } @@ -130,7 +124,7 @@ func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*bloc message := bsmsg.New() message.AppendWanted(k) - bs.meschan.Outgoing <- message.ToSwarm(p) + bs.sender.SendMessage(ctx, p, message) block, ok := <-blockChannel if !ok { @@ -159,35 +153,7 @@ func (bs *BitSwap) HaveBlock(blk *blocks.Block) error { func (bs *BitSwap) SendBlock(p *peer.Peer, b *blocks.Block) { message := bsmsg.New() message.AppendBlock(b) - bs.meschan.Outgoing <- message.ToSwarm(p) -} - -func (bs *BitSwap) handleMessages() { - for { - select { - case mes := <-bs.meschan.Incoming: - bsmsg, err := bsmsg.FromSwarm(*mes) - if err != nil { - u.PErr("%v\n", err) - continue - } - - if bsmsg.Blocks() != nil { - for _, blk := range bsmsg.Blocks() { - go bs.blockReceive(mes.Peer, blk) - } - } - - if bsmsg.Wantlist() != nil { - for _, want := range bsmsg.Wantlist() { - go bs.peerWantsBlock(mes.Peer, want) - } - } - case <-bs.haltChan: - bs.notifications.Shutdown() - return - } - } + bs.sender.SendMessage(context.Background(), p, message) } // peerWantsBlock will check if we have the block in question, @@ -260,7 +226,7 @@ func (bs *BitSwap) SendWantList(wl KeySet) error { // Lets just ping everybody all at once for _, ledger := range bs.partners { - bs.meschan.Outgoing <- message.ToSwarm(ledger.Partner) + bs.sender.SendMessage(context.TODO(), ledger.Partner, message) } return nil diff --git a/core/core.go b/core/core.go index 9c17f2a45..97eec86f0 100644 --- a/core/core.go +++ b/core/core.go @@ -99,7 +99,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) { route.Start() // TODO(brian): pass a context to bs for its async operations - swap = bitswap.NewBitSwap(local, net, d, route) + swap = bitswap.NewBitSwap(local, d, route) swap.SetStrategy(bitswap.YesManStrategy) // TODO(brian): pass a context to initConnections diff --git a/routing/routing.go b/routing/routing.go index fdf350749..c8dc2772b 100644 --- a/routing/routing.go +++ b/routing/routing.go @@ -10,6 +10,7 @@ import ( // IpfsRouting is the routing module interface // It is implemented by things like DHTs, etc. type IpfsRouting interface { + FindProvidersAsync(u.Key, int, time.Duration) <-chan *peer.Peer // Basic Put/Get