From b84cbec2b64fb9f50f973dd862cfde5c57f4a002 Mon Sep 17 00:00:00 2001 From: Kevin Atkinson Date: Thu, 5 May 2016 18:00:43 -0400 Subject: [PATCH] Make blocks.Block an interface. License: MIT Signed-off-by: Kevin Atkinson --- blocks/blocks.go | 38 +++++++++++++------ blocks/blockstore/blockstore.go | 16 ++++---- blocks/blockstore/blockstore_test.go | 2 +- blocks/blockstore/write_cache.go | 8 ++-- blocks/blocksutil/block_generator.go | 6 +-- blockservice/blockservice.go | 10 ++--- blockservice/test/blocks_test.go | 6 +-- core/commands/block.go | 6 +-- exchange/bitswap/bitswap.go | 20 +++++----- exchange/bitswap/bitswap_test.go | 4 +- exchange/bitswap/decision/engine.go | 12 +++--- exchange/bitswap/decision/engine_test.go | 2 +- exchange/bitswap/message/message.go | 16 ++++---- .../bitswap/notifications/notifications.go | 12 +++--- .../notifications/notifications_test.go | 6 +-- exchange/bitswap/testnet/network_test.go | 2 +- exchange/bitswap/workers.go | 2 +- exchange/interface.go | 6 +-- exchange/offline/offline.go | 8 ++-- importer/chunk/rabin_test.go | 4 +- merkledag/merkledag.go | 22 +++++------ test/integration/bitswap_wo_routing_test.go | 4 +- 22 files changed, 114 insertions(+), 98 deletions(-) diff --git a/blocks/blocks.go b/blocks/blocks.go index bcf58f747..777ab4c90 100644 --- a/blocks/blocks.go +++ b/blocks/blocks.go @@ -11,40 +11,56 @@ import ( u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util" ) +type Block interface { + Multihash() mh.Multihash + Data() []byte + Key() key.Key + String() string + Loggable() map[string]interface{} +} + // Block is a singular block of data in ipfs -type Block struct { - Multihash mh.Multihash - Data []byte +type RawBlock struct { + multihash mh.Multihash + data []byte } // NewBlock creates a Block object from opaque data. It will hash the data. -func NewBlock(data []byte) *Block { - return &Block{Data: data, Multihash: u.Hash(data)} +func NewBlock(data []byte) *RawBlock { + return &RawBlock{data: data, multihash: u.Hash(data)} } // NewBlockWithHash creates a new block when the hash of the data // is already known, this is used to save time in situations where // we are able to be confident that the data is correct -func NewBlockWithHash(data []byte, h mh.Multihash) (*Block, error) { +func NewBlockWithHash(data []byte, h mh.Multihash) (*RawBlock, error) { if u.Debug { chk := u.Hash(data) if string(chk) != string(h) { return nil, errors.New("Data did not match given hash!") } } - return &Block{Data: data, Multihash: h}, nil + return &RawBlock{data: data, multihash: h}, nil +} + +func (b *RawBlock) Multihash() mh.Multihash { + return b.multihash +} + +func (b *RawBlock) Data() []byte { + return b.data } // Key returns the block's Multihash as a Key value. -func (b *Block) Key() key.Key { - return key.Key(b.Multihash) +func (b *RawBlock) Key() key.Key { + return key.Key(b.multihash) } -func (b *Block) String() string { +func (b *RawBlock) String() string { return fmt.Sprintf("[Block %s]", b.Key()) } -func (b *Block) Loggable() map[string]interface{} { +func (b *RawBlock) Loggable() map[string]interface{} { return map[string]interface{}{ "block": b.Key().String(), } diff --git a/blocks/blockstore/blockstore.go b/blocks/blockstore/blockstore.go index f8c086cc2..671ae2c25 100644 --- a/blocks/blockstore/blockstore.go +++ b/blocks/blockstore/blockstore.go @@ -30,9 +30,9 @@ var ErrNotFound = errors.New("blockstore: block not found") type Blockstore interface { DeleteBlock(key.Key) error Has(key.Key) (bool, error) - Get(key.Key) (*blocks.Block, error) - Put(*blocks.Block) error - PutMany([]*blocks.Block) error + Get(key.Key) (blocks.Block, error) + Put(blocks.Block) error + PutMany([]blocks.Block) error AllKeysChan(ctx context.Context) (<-chan key.Key, error) } @@ -73,7 +73,7 @@ type blockstore struct { gcreqlk sync.Mutex } -func (bs *blockstore) Get(k key.Key) (*blocks.Block, error) { +func (bs *blockstore) Get(k key.Key) (blocks.Block, error) { maybeData, err := bs.datastore.Get(k.DsKey()) if err == ds.ErrNotFound { return nil, ErrNotFound @@ -89,7 +89,7 @@ func (bs *blockstore) Get(k key.Key) (*blocks.Block, error) { return blocks.NewBlockWithHash(bdata, mh.Multihash(k)) } -func (bs *blockstore) Put(block *blocks.Block) error { +func (bs *blockstore) Put(block blocks.Block) error { k := block.Key().DsKey() // Has is cheaper than Put, so see if we already have it @@ -97,10 +97,10 @@ func (bs *blockstore) Put(block *blocks.Block) error { if err == nil && exists { return nil // already stored. } - return bs.datastore.Put(k, block.Data) + return bs.datastore.Put(k, block.Data()) } -func (bs *blockstore) PutMany(blocks []*blocks.Block) error { +func (bs *blockstore) PutMany(blocks []blocks.Block) error { t, err := bs.datastore.Batch() if err != nil { return err @@ -112,7 +112,7 @@ func (bs *blockstore) PutMany(blocks []*blocks.Block) error { continue } - err = t.Put(k, b.Data) + err = t.Put(k, b.Data()) if err != nil { return err } diff --git a/blocks/blockstore/blockstore_test.go b/blocks/blockstore/blockstore_test.go index 4987f9670..446d4b776 100644 --- a/blocks/blockstore/blockstore_test.go +++ b/blocks/blockstore/blockstore_test.go @@ -40,7 +40,7 @@ func TestPutThenGetBlock(t *testing.T) { if err != nil { t.Fatal(err) } - if !bytes.Equal(block.Data, blockFromBlockstore.Data) { + if !bytes.Equal(block.Data(), blockFromBlockstore.Data()) { t.Fail() } } diff --git a/blocks/blockstore/write_cache.go b/blocks/blockstore/write_cache.go index 9084b1a67..f7c2caf45 100644 --- a/blocks/blockstore/write_cache.go +++ b/blocks/blockstore/write_cache.go @@ -34,11 +34,11 @@ func (w *writecache) Has(k key.Key) (bool, error) { return w.blockstore.Has(k) } -func (w *writecache) Get(k key.Key) (*blocks.Block, error) { +func (w *writecache) Get(k key.Key) (blocks.Block, error) { return w.blockstore.Get(k) } -func (w *writecache) Put(b *blocks.Block) error { +func (w *writecache) Put(b blocks.Block) error { k := b.Key() if _, ok := w.cache.Get(k); ok { return nil @@ -49,8 +49,8 @@ func (w *writecache) Put(b *blocks.Block) error { return w.blockstore.Put(b) } -func (w *writecache) PutMany(bs []*blocks.Block) error { - var good []*blocks.Block +func (w *writecache) PutMany(bs []blocks.Block) error { + var good []blocks.Block for _, b := range bs { if _, ok := w.cache.Get(b.Key()); !ok { good = append(good, b) diff --git a/blocks/blocksutil/block_generator.go b/blocks/blocksutil/block_generator.go index 2d37fa056..d70f79470 100644 --- a/blocks/blocksutil/block_generator.go +++ b/blocks/blocksutil/block_generator.go @@ -10,13 +10,13 @@ type BlockGenerator struct { seq int } -func (bg *BlockGenerator) Next() *blocks.Block { +func (bg *BlockGenerator) Next() blocks.Block { bg.seq++ return blocks.NewBlock([]byte(string(bg.seq))) } -func (bg *BlockGenerator) Blocks(n int) []*blocks.Block { - blocks := make([]*blocks.Block, 0) +func (bg *BlockGenerator) Blocks(n int) []blocks.Block { + blocks := make([]blocks.Block, 0) for i := 0; i < n; i++ { b := bg.Next() blocks = append(blocks, b) diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 802f493d1..78838757a 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -41,7 +41,7 @@ func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService { // AddBlock adds a particular block to the service, Putting it into the datastore. // TODO pass a context into this if the remote.HasBlock is going to remain here. -func (s *BlockService) AddBlock(b *blocks.Block) (key.Key, error) { +func (s *BlockService) AddBlock(b blocks.Block) (key.Key, error) { k := b.Key() err := s.Blockstore.Put(b) if err != nil { @@ -53,7 +53,7 @@ func (s *BlockService) AddBlock(b *blocks.Block) (key.Key, error) { return k, nil } -func (s *BlockService) AddBlocks(bs []*blocks.Block) ([]key.Key, error) { +func (s *BlockService) AddBlocks(bs []blocks.Block) ([]key.Key, error) { err := s.Blockstore.PutMany(bs) if err != nil { return nil, err @@ -71,7 +71,7 @@ func (s *BlockService) AddBlocks(bs []*blocks.Block) ([]key.Key, error) { // GetBlock retrieves a particular block from the service, // Getting it from the datastore using the key (hash). -func (s *BlockService) GetBlock(ctx context.Context, k key.Key) (*blocks.Block, error) { +func (s *BlockService) GetBlock(ctx context.Context, k key.Key) (blocks.Block, error) { log.Debugf("BlockService GetBlock: '%s'", k) block, err := s.Blockstore.Get(k) if err == nil { @@ -103,8 +103,8 @@ func (s *BlockService) GetBlock(ctx context.Context, k key.Key) (*blocks.Block, // GetBlocks gets a list of blocks asynchronously and returns through // the returned channel. // NB: No guarantees are made about order. -func (s *BlockService) GetBlocks(ctx context.Context, ks []key.Key) <-chan *blocks.Block { - out := make(chan *blocks.Block, 0) +func (s *BlockService) GetBlocks(ctx context.Context, ks []key.Key) <-chan blocks.Block { + out := make(chan blocks.Block, 0) go func() { defer close(out) var misses []key.Key diff --git a/blockservice/test/blocks_test.go b/blockservice/test/blocks_test.go index ab6a476aa..584505b21 100644 --- a/blockservice/test/blocks_test.go +++ b/blockservice/test/blocks_test.go @@ -24,7 +24,7 @@ func TestBlocks(t *testing.T) { b := blocks.NewBlock([]byte("beep boop")) h := u.Hash([]byte("beep boop")) - if !bytes.Equal(b.Multihash, h) { + if !bytes.Equal(b.Multihash(), h) { t.Error("Block Multihash and data multihash not equal") } @@ -54,7 +54,7 @@ func TestBlocks(t *testing.T) { t.Error("Block keys not equal.") } - if !bytes.Equal(b.Data, b2.Data) { + if !bytes.Equal(b.Data(), b2.Data()) { t.Error("Block data is not equal.") } } @@ -79,7 +79,7 @@ func TestGetBlocksSequential(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*50) defer cancel() out := servs[i].GetBlocks(ctx, keys) - gotten := make(map[key.Key]*blocks.Block) + gotten := make(map[key.Key]blocks.Block) for blk := range out { if _, ok := gotten[blk.Key()]; ok { t.Fatal("Got duplicate block!") diff --git a/core/commands/block.go b/core/commands/block.go index 5f9ed2d4c..8655833ea 100644 --- a/core/commands/block.go +++ b/core/commands/block.go @@ -66,7 +66,7 @@ on raw ipfs blocks. It outputs the following to stdout: res.SetOutput(&BlockStat{ Key: b.Key().B58String(), - Size: len(b.Data), + Size: len(b.Data()), }) }, Type: BlockStat{}, @@ -97,7 +97,7 @@ It outputs to stdout, and is a base58 encoded multihash. return } - res.SetOutput(bytes.NewReader(b.Data)) + res.SetOutput(bytes.NewReader(b.Data())) }, } @@ -161,7 +161,7 @@ It reads from stdin, and is a base58 encoded multihash. Type: BlockStat{}, } -func getBlockForKey(req cmds.Request, skey string) (*blocks.Block, error) { +func getBlockForKey(req cmds.Request, skey string) (blocks.Block, error) { n, err := req.InvocContext().GetNode() if err != nil { return nil, err diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 59e84d4b0..68f7f3e8d 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -90,7 +90,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, network: network, findKeys: make(chan *wantlist.Entry, sizeBatchRequestChan), process: px, - newBlocks: make(chan *blocks.Block, HasBlockBufferSize), + newBlocks: make(chan blocks.Block, HasBlockBufferSize), provideKeys: make(chan key.Key, provideKeysBufferSize), wm: NewWantManager(ctx, network), } @@ -137,7 +137,7 @@ type Bitswap struct { process process.Process - newBlocks chan *blocks.Block + newBlocks chan blocks.Block provideKeys chan key.Key @@ -154,7 +154,7 @@ type blockRequest struct { // GetBlock attempts to retrieve a particular block from peers within the // deadline enforced by the context. -func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (*blocks.Block, error) { +func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (blocks.Block, error) { // Any async work initiated by this function must end when this function // returns. To ensure this, derive a new context. Note that it is okay to @@ -209,9 +209,9 @@ func (bs *Bitswap) WantlistForPeer(p peer.ID) []key.Key { // NB: Your request remains open until the context expires. To conserve // resources, provide a context with a reasonably short deadline (ie. not one // that lasts throughout the lifetime of the server) -func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *blocks.Block, error) { +func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks.Block, error) { if len(keys) == 0 { - out := make(chan *blocks.Block) + out := make(chan blocks.Block) close(out) return out, nil } @@ -251,7 +251,7 @@ func (bs *Bitswap) CancelWants(ks []key.Key) { // HasBlock announces the existance of a block to this bitswap service. The // service will potentially notify its peers. -func (bs *Bitswap) HasBlock(blk *blocks.Block) error { +func (bs *Bitswap) HasBlock(blk blocks.Block) error { select { case <-bs.process.Closing(): return errors.New("bitswap is closed") @@ -277,7 +277,7 @@ func (bs *Bitswap) HasBlock(blk *blocks.Block) error { return nil } -func (bs *Bitswap) tryPutBlock(blk *blocks.Block, attempts int) error { +func (bs *Bitswap) tryPutBlock(blk blocks.Block, attempts int) error { var err error for i := 0; i < attempts; i++ { if err = bs.blockstore.Put(blk); err == nil { @@ -316,7 +316,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg wg := sync.WaitGroup{} for _, block := range iblocks { wg.Add(1) - go func(b *blocks.Block) { + go func(b blocks.Block) { defer wg.Done() if err := bs.updateReceiveCounters(b); err != nil { @@ -337,7 +337,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg var ErrAlreadyHaveBlock = errors.New("already have block") -func (bs *Bitswap) updateReceiveCounters(b *blocks.Block) error { +func (bs *Bitswap) updateReceiveCounters(b blocks.Block) error { bs.counterLk.Lock() defer bs.counterLk.Unlock() bs.blocksRecvd++ @@ -348,7 +348,7 @@ func (bs *Bitswap) updateReceiveCounters(b *blocks.Block) error { } if err == nil && has { bs.dupBlocksRecvd++ - bs.dupDataRecvd += uint64(len(b.Data)) + bs.dupDataRecvd += uint64(len(b.Data())) } if has { diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index d7fde792b..baab322e2 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -85,7 +85,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { t.Fatal("Expected to succeed") } - if !bytes.Equal(block.Data, received.Data) { + if !bytes.Equal(block.Data(), received.Data()) { t.Fatal("Data doesn't match") } } @@ -218,7 +218,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { } } -func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) { +func getOrFail(bitswap Instance, b blocks.Block, t *testing.T, wg *sync.WaitGroup) { if _, err := bitswap.Blockstore().Get(b.Key()); err != nil { _, err := bitswap.Exchange.GetBlock(context.Background(), b.Key()) if err != nil { diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index 366e8ab23..87a77b086 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -58,7 +58,7 @@ type Envelope struct { Peer peer.ID // Block is the payload - Block *blocks.Block + Block blocks.Block // A callback to notify the decision queue that the task is complete Sent func() @@ -226,13 +226,13 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { } for _, block := range m.Blocks() { - log.Debugf("got block %s %d bytes", block.Key(), len(block.Data)) - l.ReceivedBytes(len(block.Data)) + log.Debugf("got block %s %d bytes", block.Key(), len(block.Data())) + l.ReceivedBytes(len(block.Data())) } return nil } -func (e *Engine) addBlock(block *blocks.Block) { +func (e *Engine) addBlock(block blocks.Block) { work := false for _, l := range e.ledgerMap { @@ -247,7 +247,7 @@ func (e *Engine) addBlock(block *blocks.Block) { } } -func (e *Engine) AddBlock(block *blocks.Block) { +func (e *Engine) AddBlock(block blocks.Block) { e.lock.Lock() defer e.lock.Unlock() @@ -266,7 +266,7 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error { l := e.findOrCreate(p) for _, block := range m.Blocks() { - l.SentBytes(len(block.Data)) + l.SentBytes(len(block.Data())) l.wantList.Remove(block.Key()) e.peerRequestQueue.Remove(block.Key(), p) } diff --git a/exchange/bitswap/decision/engine_test.go b/exchange/bitswap/decision/engine_test.go index d496096bb..4d906276b 100644 --- a/exchange/bitswap/decision/engine_test.go +++ b/exchange/bitswap/decision/engine_test.go @@ -188,7 +188,7 @@ func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error { received := envelope.Block expected := blocks.NewBlock([]byte(k)) if received.Key() != expected.Key() { - return errors.New(fmt.Sprintln("received", string(received.Data), "expected", string(expected.Data))) + return errors.New(fmt.Sprintln("received", string(received.Data()), "expected", string(expected.Data()))) } } return nil diff --git a/exchange/bitswap/message/message.go b/exchange/bitswap/message/message.go index 6cff5e554..76afd0cbf 100644 --- a/exchange/bitswap/message/message.go +++ b/exchange/bitswap/message/message.go @@ -22,7 +22,7 @@ type BitSwapMessage interface { Wantlist() []Entry // Blocks returns a slice of unique blocks - Blocks() []*blocks.Block + Blocks() []blocks.Block // AddEntry adds an entry to the Wantlist. AddEntry(key key.Key, priority int) @@ -34,7 +34,7 @@ type BitSwapMessage interface { // A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set Full() bool - AddBlock(*blocks.Block) + AddBlock(blocks.Block) Exportable Loggable() map[string]interface{} @@ -48,7 +48,7 @@ type Exportable interface { type impl struct { full bool wantlist map[key.Key]Entry - blocks map[key.Key]*blocks.Block + blocks map[key.Key]blocks.Block } func New(full bool) BitSwapMessage { @@ -57,7 +57,7 @@ func New(full bool) BitSwapMessage { func newMsg(full bool) *impl { return &impl{ - blocks: make(map[key.Key]*blocks.Block), + blocks: make(map[key.Key]blocks.Block), wantlist: make(map[key.Key]Entry), full: full, } @@ -96,8 +96,8 @@ func (m *impl) Wantlist() []Entry { return out } -func (m *impl) Blocks() []*blocks.Block { - bs := make([]*blocks.Block, 0, len(m.blocks)) +func (m *impl) Blocks() []blocks.Block { + bs := make([]blocks.Block, 0, len(m.blocks)) for _, block := range m.blocks { bs = append(bs, block) } @@ -129,7 +129,7 @@ func (m *impl) addEntry(k key.Key, priority int, cancel bool) { } } -func (m *impl) AddBlock(b *blocks.Block) { +func (m *impl) AddBlock(b blocks.Block) { m.blocks[b.Key()] = b } @@ -156,7 +156,7 @@ func (m *impl) ToProto() *pb.Message { }) } for _, b := range m.Blocks() { - pbm.Blocks = append(pbm.Blocks, b.Data) + pbm.Blocks = append(pbm.Blocks, b.Data()) } return pbm } diff --git a/exchange/bitswap/notifications/notifications.go b/exchange/bitswap/notifications/notifications.go index 8a83bba9b..0b7f4f33a 100644 --- a/exchange/bitswap/notifications/notifications.go +++ b/exchange/bitswap/notifications/notifications.go @@ -10,8 +10,8 @@ import ( const bufferSize = 16 type PubSub interface { - Publish(block *blocks.Block) - Subscribe(ctx context.Context, keys ...key.Key) <-chan *blocks.Block + Publish(block blocks.Block) + Subscribe(ctx context.Context, keys ...key.Key) <-chan blocks.Block Shutdown() } @@ -23,7 +23,7 @@ type impl struct { wrapped pubsub.PubSub } -func (ps *impl) Publish(block *blocks.Block) { +func (ps *impl) Publish(block blocks.Block) { topic := string(block.Key()) ps.wrapped.Pub(block, topic) } @@ -35,9 +35,9 @@ func (ps *impl) Shutdown() { // Subscribe returns a channel of blocks for the given |keys|. |blockChannel| // is closed if the |ctx| times out or is cancelled, or after sending len(keys) // blocks. -func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan *blocks.Block { +func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan blocks.Block { - blocksCh := make(chan *blocks.Block, len(keys)) + blocksCh := make(chan blocks.Block, len(keys)) valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking if len(keys) == 0 { close(blocksCh) @@ -55,7 +55,7 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan *blocks.B if !ok { return } - block, ok := val.(*blocks.Block) + block, ok := val.(blocks.Block) if !ok { return } diff --git a/exchange/bitswap/notifications/notifications_test.go b/exchange/bitswap/notifications/notifications_test.go index 02acbd13f..3e923b84e 100644 --- a/exchange/bitswap/notifications/notifications_test.go +++ b/exchange/bitswap/notifications/notifications_test.go @@ -151,15 +151,15 @@ func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) { t.Log("publishing the large number of blocks to the ignored channel must not deadlock") } -func assertBlockChannelNil(t *testing.T, blockChannel <-chan *blocks.Block) { +func assertBlockChannelNil(t *testing.T, blockChannel <-chan blocks.Block) { _, ok := <-blockChannel if ok { t.Fail() } } -func assertBlocksEqual(t *testing.T, a, b *blocks.Block) { - if !bytes.Equal(a.Data, b.Data) { +func assertBlocksEqual(t *testing.T, a, b blocks.Block) { + if !bytes.Equal(a.Data(), b.Data()) { t.Fatal("blocks aren't equal") } if a.Key() != b.Key() { diff --git a/exchange/bitswap/testnet/network_test.go b/exchange/bitswap/testnet/network_test.go index 609e51f7e..4db57ac8e 100644 --- a/exchange/bitswap/testnet/network_test.go +++ b/exchange/bitswap/testnet/network_test.go @@ -44,7 +44,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { // TODO assert that this came from the correct peer and that the message contents are as expected ok := false for _, b := range msgFromResponder.Blocks() { - if string(b.Data) == expectedStr { + if string(b.Data()) == expectedStr { wg.Done() ok = true } diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index a9dbaa6f2..2c190d000 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -61,7 +61,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableMap{ "ID": id, "Target": envelope.Peer.Pretty(), - "Block": envelope.Block.Multihash.B58String(), + "Block": envelope.Block.Multihash().B58String(), }) bs.wm.SendBlock(ctx, envelope) diff --git a/exchange/interface.go b/exchange/interface.go index dbc66e3b6..6db476d9e 100644 --- a/exchange/interface.go +++ b/exchange/interface.go @@ -13,13 +13,13 @@ import ( // exchange protocol. type Interface interface { // type Exchanger interface // GetBlock returns the block associated with a given key. - GetBlock(context.Context, key.Key) (*blocks.Block, error) + GetBlock(context.Context, key.Key) (blocks.Block, error) - GetBlocks(context.Context, []key.Key) (<-chan *blocks.Block, error) + GetBlocks(context.Context, []key.Key) (<-chan blocks.Block, error) // TODO Should callers be concerned with whether the block was made // available on the network? - HasBlock(*blocks.Block) error + HasBlock(blocks.Block) error io.Closer } diff --git a/exchange/offline/offline.go b/exchange/offline/offline.go index 8f857d933..d2ee4fbaa 100644 --- a/exchange/offline/offline.go +++ b/exchange/offline/offline.go @@ -23,12 +23,12 @@ type offlineExchange struct { // GetBlock returns nil to signal that a block could not be retrieved for the // given key. // NB: This function may return before the timeout expires. -func (e *offlineExchange) GetBlock(_ context.Context, k key.Key) (*blocks.Block, error) { +func (e *offlineExchange) GetBlock(_ context.Context, k key.Key) (blocks.Block, error) { return e.bs.Get(k) } // HasBlock always returns nil. -func (e *offlineExchange) HasBlock(b *blocks.Block) error { +func (e *offlineExchange) HasBlock(b blocks.Block) error { return e.bs.Put(b) } @@ -39,8 +39,8 @@ func (_ *offlineExchange) Close() error { return nil } -func (e *offlineExchange) GetBlocks(ctx context.Context, ks []key.Key) (<-chan *blocks.Block, error) { - out := make(chan *blocks.Block, 0) +func (e *offlineExchange) GetBlocks(ctx context.Context, ks []key.Key) (<-chan blocks.Block, error) { + out := make(chan blocks.Block, 0) go func() { defer close(out) var misses []key.Key diff --git a/importer/chunk/rabin_test.go b/importer/chunk/rabin_test.go index 7702d3e76..9b9cfce8f 100644 --- a/importer/chunk/rabin_test.go +++ b/importer/chunk/rabin_test.go @@ -39,10 +39,10 @@ func TestRabinChunking(t *testing.T) { } } -func chunkData(t *testing.T, data []byte) map[key.Key]*blocks.Block { +func chunkData(t *testing.T, data []byte) map[key.Key]blocks.Block { r := NewRabin(bytes.NewReader(data), 1024*256) - blkmap := make(map[key.Key]*blocks.Block) + blkmap := make(map[key.Key]blocks.Block) for { blk, err := r.NextBytes() diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index b67723d58..938b71308 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -52,13 +52,13 @@ func (n *dagService) Add(nd *Node) (key.Key, error) { return "", err } - b := new(blocks.Block) - b.Data = d - b.Multihash, err = nd.Multihash() + mh, err := nd.Multihash() if err != nil { return "", err } + b, _ := blocks.NewBlockWithHash(d, mh) + return n.Blocks.AddBlock(b) } @@ -82,7 +82,7 @@ func (n *dagService) Get(ctx context.Context, k key.Key) (*Node, error) { return nil, fmt.Errorf("Failed to get block for %s: %v", k.B58String(), err) } - res, err := DecodeProtobuf(b.Data) + res, err := DecodeProtobuf(b.Data()) if err != nil { return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err) } @@ -135,7 +135,7 @@ func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) <-chan *NodeO } return } - nd, err := DecodeProtobuf(b.Data) + nd, err := DecodeProtobuf(b.Data()) if err != nil { out <- &NodeOption{Err: err} return @@ -316,7 +316,7 @@ func (np *nodePromise) Get(ctx context.Context) (*Node, error) { type Batch struct { ds *dagService - blocks []*blocks.Block + blocks []blocks.Block size int MaxSize int } @@ -327,17 +327,17 @@ func (t *Batch) Add(nd *Node) (key.Key, error) { return "", err } - b := new(blocks.Block) - b.Data = d - b.Multihash, err = nd.Multihash() + mh, err := nd.Multihash() if err != nil { return "", err } - k := key.Key(b.Multihash) + b, _ := blocks.NewBlockWithHash(d, mh) + + k := key.Key(mh) t.blocks = append(t.blocks, b) - t.size += len(b.Data) + t.size += len(b.Data()) if t.size > t.MaxSize { return k, t.Commit() } diff --git a/test/integration/bitswap_wo_routing_test.go b/test/integration/bitswap_wo_routing_test.go index f785bb1cf..2d0bbc7d4 100644 --- a/test/integration/bitswap_wo_routing_test.go +++ b/test/integration/bitswap_wo_routing_test.go @@ -71,7 +71,7 @@ func TestBitswapWithoutRouting(t *testing.T) { b, err := n.Blocks.GetBlock(ctx, block0.Key()) if err != nil { t.Error(err) - } else if !bytes.Equal(b.Data, block0.Data) { + } else if !bytes.Equal(b.Data(), block0.Data()) { t.Error("byte comparison fail") } else { log.Debug("got block: %s", b.Key()) @@ -88,7 +88,7 @@ func TestBitswapWithoutRouting(t *testing.T) { b, err := n.Blocks.GetBlock(ctx, block1.Key()) if err != nil { t.Error(err) - } else if !bytes.Equal(b.Data, block1.Data) { + } else if !bytes.Equal(b.Data(), block1.Data()) { t.Error("byte comparison fail") } else { log.Debug("got block: %s", b.Key())