diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index 734b26ed1..46fc727ac 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -93,7 +93,7 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) ( tleft := timeout - time.Now().Sub(begin) provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout) - blockChannel := make(chan *blocks.Block) + blockChannel := make(chan blocks.Block) after := time.After(tleft) // TODO: when the data is received, shut down this for loop ASAP @@ -106,7 +106,7 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) ( return } select { - case blockChannel <- blk: + case blockChannel <- *blk: default: } }(p) @@ -116,7 +116,7 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) ( select { case block := <-blockChannel: close(blockChannel) - return block, nil + return &block, nil case <-after: return nil, u.ErrTimeout } @@ -137,7 +137,7 @@ func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*bloc u.PErr("getBlock for '%s' timed out.\n", k.Pretty()) return nil, u.ErrTimeout } - return block, nil + return &block, nil } // HaveBlock announces the existance of a block to BitSwap, potentially sending @@ -173,12 +173,7 @@ func (bs *BitSwap) handleMessages() { } if bsmsg.Blocks() != nil { - for _, blkData := range bsmsg.Blocks() { - blk, err := blocks.NewBlock(blkData) - if err != nil { - u.PErr("%v\n", err) - continue - } + for _, blk := range bsmsg.Blocks() { go bs.blockReceive(mes.Peer, blk) } } @@ -231,7 +226,7 @@ func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) { } } -func (bs *BitSwap) blockReceive(p *peer.Peer, blk *blocks.Block) { +func (bs *BitSwap) blockReceive(p *peer.Peer, blk blocks.Block) { u.DOut("blockReceive: %s\n", blk.Key().Pretty()) err := bs.datastore.Put(ds.NewKey(string(blk.Key())), blk.Data) if err != nil { @@ -286,5 +281,16 @@ func (bs *BitSwap) SetStrategy(sf StrategyFunc) { func (bs *BitSwap) ReceiveMessage( ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) ( bsmsg.BitSwapMessage, *peer.Peer, error) { + if incoming.Blocks() != nil { + for _, block := range incoming.Blocks() { + go bs.blockReceive(sender, block) + } + } + + if incoming.Wantlist() != nil { + for _, want := range incoming.Wantlist() { + go bs.peerWantsBlock(sender, want) + } + } return nil, nil, errors.New("TODO implement") } diff --git a/bitswap/message/message.go b/bitswap/message/message.go index 91bf957af..ff07986f9 100644 --- a/bitswap/message/message.go +++ b/bitswap/message/message.go @@ -15,7 +15,7 @@ import ( type BitSwapMessage interface { Wantlist() []string - Blocks() [][]byte + Blocks() []blocks.Block AppendWanted(k u.Key) AppendBlock(b *blocks.Block) Exportable @@ -46,8 +46,16 @@ func (m *message) Wantlist() []string { } // TODO(brian): convert these into blocks -func (m *message) Blocks() [][]byte { - return m.pb.Blocks +func (m *message) Blocks() []blocks.Block { + bs := make([]blocks.Block, len(m.pb.Blocks)) + for _, data := range m.pb.Blocks { + b, err := blocks.NewBlock(data) + if err != nil { + continue + } + bs = append(bs, *b) + } + return bs } func (m *message) AppendWanted(k u.Key) { diff --git a/bitswap/notifications/notifications.go b/bitswap/notifications/notifications.go index 56e5d88d5..2da2b7fad 100644 --- a/bitswap/notifications/notifications.go +++ b/bitswap/notifications/notifications.go @@ -9,8 +9,8 @@ import ( ) type PubSub interface { - Publish(block *blocks.Block) - Subscribe(ctx context.Context, k u.Key) <-chan *blocks.Block + Publish(block blocks.Block) + Subscribe(ctx context.Context, k u.Key) <-chan blocks.Block Shutdown() } @@ -23,7 +23,7 @@ type impl struct { wrapped pubsub.PubSub } -func (ps *impl) Publish(block *blocks.Block) { +func (ps *impl) Publish(block blocks.Block) { topic := string(block.Key()) ps.wrapped.Pub(block, topic) } @@ -31,15 +31,15 @@ func (ps *impl) Publish(block *blocks.Block) { // Subscribe returns a one-time use |blockChannel|. |blockChannel| returns nil // if the |ctx| times out or is cancelled. Then channel is closed after the // block given by |k| is sent. -func (ps *impl) Subscribe(ctx context.Context, k u.Key) <-chan *blocks.Block { +func (ps *impl) Subscribe(ctx context.Context, k u.Key) <-chan blocks.Block { topic := string(k) subChan := ps.wrapped.SubOnce(topic) - blockChannel := make(chan *blocks.Block) + blockChannel := make(chan blocks.Block) go func() { defer close(blockChannel) select { case val := <-subChan: - block, ok := val.(*blocks.Block) + block, ok := val.(blocks.Block) if ok { blockChannel <- block } diff --git a/bitswap/notifications/notifications_test.go b/bitswap/notifications/notifications_test.go index 6679c481c..487474e2d 100644 --- a/bitswap/notifications/notifications_test.go +++ b/bitswap/notifications/notifications_test.go @@ -34,19 +34,20 @@ func TestCarryOnWhenDeadlineExpires(t *testing.T) { n := New() defer n.Shutdown() - blockChannel := n.Subscribe(fastExpiringCtx, getBlockOrFail(t, "A Missed Connection").Key()) + block := getBlockOrFail(t, "A Missed Connection") + blockChannel := n.Subscribe(fastExpiringCtx, block.Key()) assertBlockChannelNil(t, blockChannel) } -func assertBlockChannelNil(t *testing.T, blockChannel <-chan *blocks.Block) { - blockReceived := <-blockChannel - if blockReceived != nil { +func assertBlockChannelNil(t *testing.T, blockChannel <-chan blocks.Block) { + _, ok := <-blockChannel + if ok { t.Fail() } } -func assertBlocksEqual(t *testing.T, a, b *blocks.Block) { +func assertBlocksEqual(t *testing.T, a, b blocks.Block) { if !bytes.Equal(a.Data, b.Data) { t.Fail() } @@ -55,10 +56,10 @@ func assertBlocksEqual(t *testing.T, a, b *blocks.Block) { } } -func getBlockOrFail(t *testing.T, msg string) *blocks.Block { +func getBlockOrFail(t *testing.T, msg string) blocks.Block { block, blockCreationErr := blocks.NewBlock([]byte(msg)) if blockCreationErr != nil { t.Fail() } - return block + return *block }