diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index 37876fc95..c89b62136 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -3,6 +3,7 @@ package bitswap import ( "time" + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" @@ -38,7 +39,7 @@ type BitSwap struct { // routing interface for communication routing *dht.IpfsDHT - listener *swarm.MessageListener + notifications *notifications // partners is a map of currently active bitswap relationships. // The Ledger has the peer.ID, and the peer connection works through net. @@ -60,15 +61,15 @@ type BitSwap struct { // NewBitSwap creates a new BitSwap instance. It does not check its parameters. func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsRouting) *BitSwap { bs := &BitSwap{ - peer: p, - net: net, - datastore: d, - partners: LedgerMap{}, - wantList: KeySet{}, - routing: r.(*dht.IpfsDHT), - meschan: net.GetChannel(swarm.PBWrapper_BITSWAP), - haltChan: make(chan struct{}), - listener: swarm.NewMessageListener(), + peer: p, + net: net, + datastore: d, + partners: LedgerMap{}, + wantList: KeySet{}, + routing: r.(*dht.IpfsDHT), + meschan: net.GetChannel(swarm.PBWrapper_BITSWAP), + haltChan: make(chan struct{}), + notifications: newNotifications(), } go bs.handleMessages() @@ -83,7 +84,7 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) ( tleft := timeout - time.Now().Sub(begin) provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout) - valchan := make(chan []byte) + blockChannel := make(chan *blocks.Block) after := time.After(tleft) // TODO: when the data is received, shut down this for loop ASAP @@ -96,7 +97,7 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) ( return } select { - case valchan <- blk: + case blockChannel <- blk: default: } }(p) @@ -104,31 +105,30 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) ( }() select { - case blkdata := <-valchan: - close(valchan) - return blocks.NewBlock(blkdata) + case block := <-blockChannel: + close(blockChannel) + return block, nil case <-after: return nil, u.ErrTimeout } } -func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byte, error) { +func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*blocks.Block, error) { u.DOut("[%s] getBlock '%s' from [%s]\n", bs.peer.ID.Pretty(), k.Pretty(), p.ID.Pretty()) + ctx, _ := context.WithTimeout(context.Background(), timeout) + blockChannel := bs.notifications.Subscribe(ctx, k) + message := newMessage() message.AppendWanted(k) - - after := time.After(timeout) - resp := bs.listener.Listen(string(k), 1, timeout) bs.meschan.Outgoing <- message.ToSwarm(p) - select { - case resp_mes := <-resp: - return resp_mes.Data, nil - case <-after: + block, ok := <-blockChannel + if !ok { u.PErr("getBlock for '%s' timed out.\n", k.Pretty()) return nil, u.ErrTimeout } + return block, nil } // HaveBlock announces the existance of a block to BitSwap, potentially sending @@ -229,11 +229,7 @@ func (bs *BitSwap) blockReceive(p *peer.Peer, blk *blocks.Block) { return } - mes := &swarm.Message{ - Peer: p, - Data: blk.Data, - } - bs.listener.Respond(string(blk.Key()), mes) + bs.notifications.Publish(blk) ledger := bs.getLedger(p) ledger.ReceivedBytes(len(blk.Data))