diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 86e0c776b..96234c12a 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -68,7 +68,7 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) { // consider moving this to an sync process. if s.Remote != nil { ctx := context.TODO() - err = s.Remote.HasBlock(ctx, *b) + err = s.Remote.HasBlock(ctx, b) } return k, err } @@ -98,6 +98,7 @@ func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, er func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks.Block { out := make(chan *blocks.Block, 32) go func() { + defer close(out) var toFetch []u.Key for _, k := range ks { block, err := s.Blockstore.Get(k) @@ -108,6 +109,15 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks log.Debug("Blockservice: Got data in datastore.") out <- block } + + nblocks, err := s.Remote.GetBlocks(ctx, toFetch) + if err != nil { + log.Errorf("Error with GetBlocks: %s", err) + return + } + for blk := range nblocks { + out <- blk + } }() return out } diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 05ed27eb3..604cfa21a 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -108,7 +108,7 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err select { case block := <-promise: - return &block, nil + return block, nil case <-parent.Done(): return nil, parent.Err() } @@ -122,7 +122,7 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err // NB: Your request remains open until the context expires. To conserve // resources, provide a context with a reasonably short deadline (ie. not one // that lasts throughout the lifetime of the server) -func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan blocks.Block, error) { +func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) { // TODO log the request promise := bs.notifications.Subscribe(ctx, keys...) @@ -213,7 +213,7 @@ func (bs *bitswap) loop(parent context.Context) { // HasBlock announces the existance of a block to this bitswap service. The // service will potentially notify its peers. -func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error { +func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { log.Debugf("Has Block %v", blk.Key()) bs.wantlist.Remove(blk.Key()) bs.sendToPeersThatWant(ctx, blk) @@ -244,7 +244,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm for _, block := range incoming.Blocks() { // TODO verify blocks? - if err := bs.blockstore.Put(&block); err != nil { + if err := bs.blockstore.Put(block); err != nil { log.Criticalf("error putting block: %s", err) continue // FIXME(brian): err ignored } @@ -267,7 +267,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil { continue } else { - message.AddBlock(*block) + message.AddBlock(block) } } } @@ -290,7 +290,7 @@ func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage bs.strategy.MessageSent(p, m) } -func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) { +func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block *blocks.Block) { log.Debugf("Sending %v to peers that want it", block.Key()) for _, p := range bs.strategy.Peers() { diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index a8483c3bd..4f5755ae0 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -83,7 +83,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { if err := hasBlock.blockstore.Put(block); err != nil { t.Fatal(err) } - if err := hasBlock.exchange.HasBlock(context.Background(), *block); err != nil { + if err := hasBlock.exchange.HasBlock(context.Background(), block); err != nil { t.Fatal(err) } @@ -140,7 +140,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { first := instances[0] for _, b := range blocks { first.blockstore.Put(b) - first.exchange.HasBlock(context.Background(), *b) + first.exchange.HasBlock(context.Background(), b) rs.Announce(first.peer, b.Key()) } @@ -212,7 +212,7 @@ func TestSendToWantingPeer(t *testing.T) { beta := bg.Next() t.Logf("Peer %v announes availability of %v\n", w.peer, beta.Key()) ctx, _ = context.WithTimeout(context.Background(), timeout) - if err := w.blockstore.Put(&beta); err != nil { + if err := w.blockstore.Put(beta); err != nil { t.Fatal(err) } w.exchange.HasBlock(ctx, beta) @@ -225,7 +225,7 @@ func TestSendToWantingPeer(t *testing.T) { t.Logf("%v announces availability of %v\n", o.peer, alpha.Key()) ctx, _ = context.WithTimeout(context.Background(), timeout) - if err := o.blockstore.Put(&alpha); err != nil { + if err := o.blockstore.Put(alpha); err != nil { t.Fatal(err) } o.exchange.HasBlock(ctx, alpha) @@ -254,16 +254,16 @@ type BlockGenerator struct { seq int } -func (bg *BlockGenerator) Next() blocks.Block { +func (bg *BlockGenerator) Next() *blocks.Block { bg.seq++ - return *blocks.NewBlock([]byte(string(bg.seq))) + return blocks.NewBlock([]byte(string(bg.seq))) } func (bg *BlockGenerator) Blocks(n int) []*blocks.Block { blocks := make([]*blocks.Block, 0) for i := 0; i < n; i++ { b := bg.Next() - blocks = append(blocks, &b) + blocks = append(blocks, b) } return blocks } diff --git a/exchange/bitswap/message/message.go b/exchange/bitswap/message/message.go index e0aea227d..b69450a6f 100644 --- a/exchange/bitswap/message/message.go +++ b/exchange/bitswap/message/message.go @@ -19,7 +19,7 @@ type BitSwapMessage interface { Wantlist() []u.Key // Blocks returns a slice of unique blocks - Blocks() []blocks.Block + Blocks() []*blocks.Block // AddWanted adds the key to the Wantlist. // @@ -32,7 +32,7 @@ type BitSwapMessage interface { // implies Priority(A) > Priority(B) AddWanted(u.Key) - AddBlock(blocks.Block) + AddBlock(*blocks.Block) Exportable } @@ -42,14 +42,14 @@ type Exportable interface { } type impl struct { - existsInWantlist map[u.Key]struct{} // map to detect duplicates - wantlist []u.Key // slice to preserve ordering - blocks map[u.Key]blocks.Block // map to detect duplicates + existsInWantlist map[u.Key]struct{} // map to detect duplicates + wantlist []u.Key // slice to preserve ordering + blocks map[u.Key]*blocks.Block // map to detect duplicates } func New() BitSwapMessage { return &impl{ - blocks: make(map[u.Key]blocks.Block), + blocks: make(map[u.Key]*blocks.Block), existsInWantlist: make(map[u.Key]struct{}), wantlist: make([]u.Key, 0), } @@ -62,7 +62,7 @@ func newMessageFromProto(pbm pb.Message) BitSwapMessage { } for _, d := range pbm.GetBlocks() { b := blocks.NewBlock(d) - m.AddBlock(*b) + m.AddBlock(b) } return m } @@ -71,8 +71,8 @@ func (m *impl) Wantlist() []u.Key { return m.wantlist } -func (m *impl) Blocks() []blocks.Block { - bs := make([]blocks.Block, 0) +func (m *impl) Blocks() []*blocks.Block { + bs := make([]*blocks.Block, 0) for _, block := range m.blocks { bs = append(bs, block) } @@ -88,7 +88,7 @@ func (m *impl) AddWanted(k u.Key) { m.wantlist = append(m.wantlist, k) } -func (m *impl) AddBlock(b blocks.Block) { +func (m *impl) AddBlock(b *blocks.Block) { m.blocks[b.Key()] = b } diff --git a/exchange/bitswap/message/message_test.go b/exchange/bitswap/message/message_test.go index 9c69136cd..de64b7925 100644 --- a/exchange/bitswap/message/message_test.go +++ b/exchange/bitswap/message/message_test.go @@ -42,7 +42,7 @@ func TestAppendBlock(t *testing.T) { m := New() for _, str := range strs { block := blocks.NewBlock([]byte(str)) - m.AddBlock(*block) + m.AddBlock(block) } // assert strings are in proto message @@ -133,10 +133,10 @@ func TestToNetFromNetPreservesWantList(t *testing.T) { func TestToAndFromNetMessage(t *testing.T) { original := New() - original.AddBlock(*blocks.NewBlock([]byte("W"))) - original.AddBlock(*blocks.NewBlock([]byte("E"))) - original.AddBlock(*blocks.NewBlock([]byte("F"))) - original.AddBlock(*blocks.NewBlock([]byte("M"))) + original.AddBlock(blocks.NewBlock([]byte("W"))) + original.AddBlock(blocks.NewBlock([]byte("E"))) + original.AddBlock(blocks.NewBlock([]byte("F"))) + original.AddBlock(blocks.NewBlock([]byte("M"))) p := peer.WithIDString("X") netmsg, err := original.ToNet(p) @@ -180,8 +180,8 @@ func TestDuplicates(t *testing.T) { t.Fatal("Duplicate in BitSwapMessage") } - msg.AddBlock(*b) - msg.AddBlock(*b) + msg.AddBlock(b) + msg.AddBlock(b) if len(msg.Blocks()) != 1 { t.Fatal("Duplicate in BitSwapMessage") } diff --git a/exchange/bitswap/notifications/notifications.go b/exchange/bitswap/notifications/notifications.go index a2646c814..2497f6316 100644 --- a/exchange/bitswap/notifications/notifications.go +++ b/exchange/bitswap/notifications/notifications.go @@ -11,8 +11,8 @@ import ( const bufferSize = 16 type PubSub interface { - Publish(block blocks.Block) - Subscribe(ctx context.Context, keys ...u.Key) <-chan blocks.Block + Publish(block *blocks.Block) + Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block Shutdown() } @@ -24,7 +24,7 @@ type impl struct { wrapped pubsub.PubSub } -func (ps *impl) Publish(block blocks.Block) { +func (ps *impl) Publish(block *blocks.Block) { topic := string(block.Key()) ps.wrapped.Pub(block, topic) } @@ -32,18 +32,18 @@ func (ps *impl) Publish(block blocks.Block) { // Subscribe returns a one-time use |blockChannel|. |blockChannel| returns nil // if the |ctx| times out or is cancelled. Then channel is closed after the // blocks given by |keys| are sent. -func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan blocks.Block { +func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block { topics := make([]string, 0) for _, key := range keys { topics = append(topics, string(key)) } subChan := ps.wrapped.SubOnce(topics...) - blockChannel := make(chan blocks.Block, 1) // buffered so the sender doesn't wait on receiver + blockChannel := make(chan *blocks.Block, 1) // buffered so the sender doesn't wait on receiver go func() { defer close(blockChannel) select { case val := <-subChan: - block, ok := val.(blocks.Block) + block, ok := val.(*blocks.Block) if ok { blockChannel <- block } diff --git a/exchange/bitswap/notifications/notifications_test.go b/exchange/bitswap/notifications/notifications_test.go index 063634f61..ebbae2a51 100644 --- a/exchange/bitswap/notifications/notifications_test.go +++ b/exchange/bitswap/notifications/notifications_test.go @@ -16,13 +16,13 @@ func TestPublishSubscribe(t *testing.T) { defer n.Shutdown() ch := n.Subscribe(context.Background(), blockSent.Key()) - n.Publish(*blockSent) + n.Publish(blockSent) blockRecvd, ok := <-ch if !ok { t.Fail() } - assertBlocksEqual(t, blockRecvd, *blockSent) + assertBlocksEqual(t, blockRecvd, blockSent) } @@ -39,14 +39,14 @@ func TestCarryOnWhenDeadlineExpires(t *testing.T) { assertBlockChannelNil(t, blockChannel) } -func assertBlockChannelNil(t *testing.T, blockChannel <-chan blocks.Block) { +func assertBlockChannelNil(t *testing.T, blockChannel <-chan *blocks.Block) { _, ok := <-blockChannel if ok { t.Fail() } } -func assertBlocksEqual(t *testing.T, a, b blocks.Block) { +func assertBlocksEqual(t *testing.T, a, b *blocks.Block) { if !bytes.Equal(a.Data, b.Data) { t.Fail() } diff --git a/exchange/bitswap/strategy/strategy_test.go b/exchange/bitswap/strategy/strategy_test.go index ef93d9827..d07af601b 100644 --- a/exchange/bitswap/strategy/strategy_test.go +++ b/exchange/bitswap/strategy/strategy_test.go @@ -30,7 +30,7 @@ func TestConsistentAccounting(t *testing.T) { m := message.New() content := []string{"this", "is", "message", "i"} - m.AddBlock(*blocks.NewBlock([]byte(strings.Join(content, " ")))) + m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " ")))) sender.MessageSent(receiver.Peer, m) receiver.MessageReceived(sender.Peer, m) diff --git a/exchange/bitswap/testnet/network_test.go b/exchange/bitswap/testnet/network_test.go index 3930c2a8c..6f57aad50 100644 --- a/exchange/bitswap/testnet/network_test.go +++ b/exchange/bitswap/testnet/network_test.go @@ -33,7 +33,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) { // TODO test contents of incoming message m := bsmsg.New() - m.AddBlock(*blocks.NewBlock([]byte(expectedStr))) + m.AddBlock(blocks.NewBlock([]byte(expectedStr))) return from, m })) @@ -41,7 +41,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) { t.Log("Build a message and send a synchronous request to recipient") message := bsmsg.New() - message.AddBlock(*blocks.NewBlock([]byte("data"))) + message.AddBlock(blocks.NewBlock([]byte("data"))) response, err := initiator.SendRequest( context.Background(), peer.WithID(idOfRecipient), message) if err != nil { @@ -77,7 +77,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { peer.Peer, bsmsg.BitSwapMessage) { msgToWaiter := bsmsg.New() - msgToWaiter.AddBlock(*blocks.NewBlock([]byte(expectedStr))) + msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr))) return fromWaiter, msgToWaiter })) @@ -105,7 +105,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { })) messageSentAsync := bsmsg.New() - messageSentAsync.AddBlock(*blocks.NewBlock([]byte("data"))) + messageSentAsync.AddBlock(blocks.NewBlock([]byte("data"))) errSending := waiter.SendMessage( context.Background(), peer.WithID(idOfResponder), messageSentAsync) if errSending != nil { diff --git a/exchange/interface.go b/exchange/interface.go index 1f126eed3..aa2e2431c 100644 --- a/exchange/interface.go +++ b/exchange/interface.go @@ -16,9 +16,11 @@ type Interface interface { // GetBlock returns the block associated with a given key. GetBlock(context.Context, u.Key) (*blocks.Block, error) + GetBlocks(context.Context, []u.Key) (<-chan *blocks.Block, error) + // TODO Should callers be concerned with whether the block was made // available on the network? - HasBlock(context.Context, blocks.Block) error + HasBlock(context.Context, *blocks.Block) error io.Closer } diff --git a/exchange/offline/offline.go b/exchange/offline/offline.go index 893f546a9..24a89e038 100644 --- a/exchange/offline/offline.go +++ b/exchange/offline/offline.go @@ -30,7 +30,7 @@ func (_ *offlineExchange) GetBlock(context.Context, u.Key) (*blocks.Block, error } // HasBlock always returns nil. -func (_ *offlineExchange) HasBlock(context.Context, blocks.Block) error { +func (_ *offlineExchange) HasBlock(context.Context, *blocks.Block) error { return nil } @@ -38,3 +38,7 @@ func (_ *offlineExchange) HasBlock(context.Context, blocks.Block) error { func (_ *offlineExchange) Close() error { return nil } + +func (_ *offlineExchange) GetBlocks(context.Context, []u.Key) (<-chan *blocks.Block, error) { + return nil, OfflineMode +} diff --git a/exchange/offline/offline_test.go b/exchange/offline/offline_test.go index 98b6e1a8c..ac02d2101 100644 --- a/exchange/offline/offline_test.go +++ b/exchange/offline/offline_test.go @@ -21,7 +21,7 @@ func TestBlockReturnsErr(t *testing.T) { func TestHasBlockReturnsNil(t *testing.T) { off := Exchange() block := blocks.NewBlock([]byte("data")) - err := off.HasBlock(context.Background(), *block) + err := off.HasBlock(context.Background(), block) if err != nil { t.Fatal("") } diff --git a/importer/importer_test.go b/importer/importer_test.go index e8177aa91..21af40670 100644 --- a/importer/importer_test.go +++ b/importer/importer_test.go @@ -69,6 +69,7 @@ func testFileConsistency(t *testing.T, bs chunk.BlockSplitter, nbytes int) { if err != nil { t.Fatal(err) } + r, err := uio.NewDagReader(nd, nil) if err != nil { t.Fatal(err) diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index bdb258444..2fba2f5fa 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -328,7 +328,7 @@ func (ds *dagService) BatchFetch(ctx context.Context, root *Node) <-chan *Node { if next == i { sig <- nd next++ - for ; nodes[next] != nil; next++ { + for ; next < len(nodes) && nodes[next] != nil; next++ { sig <- nodes[next] } } diff --git a/unixfs/io/dagreader.go b/unixfs/io/dagreader.go index ec1b21bfe..7f8720cf1 100644 --- a/unixfs/io/dagreader.go +++ b/unixfs/io/dagreader.go @@ -17,10 +17,11 @@ var ErrIsDir = errors.New("this dag node is a directory") // DagReader provides a way to easily read the data contained in a dag. type DagReader struct { - serv mdag.DAGService - node *mdag.Node - buf io.Reader - fetchChan <-chan *mdag.Node + serv mdag.DAGService + node *mdag.Node + buf io.Reader + fetchChan <-chan *mdag.Node + linkPosition int } // NewDagReader creates a new reader object that reads the data represented by the given @@ -37,11 +38,15 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) { // Dont allow reading directories return nil, ErrIsDir case ftpb.Data_File: + var fetchChan <-chan *mdag.Node + if serv != nil { + fetchChan = serv.BatchFetch(context.TODO(), n) + } return &DagReader{ node: n, serv: serv, buf: bytes.NewBuffer(pb.GetData()), - fetchChan: serv.BatchFetch(context.TODO(), n), + fetchChan: fetchChan, }, nil case ftpb.Data_Raw: // Raw block will just be a single level, return a byte buffer @@ -61,6 +66,17 @@ func (dr *DagReader) precalcNextBuf() error { if !ok { return io.EOF } + default: + // Only used when fetchChan is nil, + // which only happens when passed in a nil dagservice + // TODO: this logic is hard to follow, do it better. + // NOTE: the only time this code is used, is during the + // importer tests, consider just changing those tests + if dr.linkPosition >= len(dr.node.Links) { + return io.EOF + } + nxt = dr.node.Links[dr.linkPosition].Node + dr.linkPosition++ } pb := new(ftpb.Data)