mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-30 01:52:26 +08:00
Merge pull request #2633 from ipfs-filestore/blocks-interface
Make blocks.Block an interface.
This commit is contained in:
@ -11,40 +11,56 @@ import (
|
|||||||
u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
|
u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type Block interface {
|
||||||
|
Multihash() mh.Multihash
|
||||||
|
Data() []byte
|
||||||
|
Key() key.Key
|
||||||
|
String() string
|
||||||
|
Loggable() map[string]interface{}
|
||||||
|
}
|
||||||
|
|
||||||
// Block is a singular block of data in ipfs
|
// Block is a singular block of data in ipfs
|
||||||
type Block struct {
|
type RawBlock struct {
|
||||||
Multihash mh.Multihash
|
multihash mh.Multihash
|
||||||
Data []byte
|
data []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBlock creates a Block object from opaque data. It will hash the data.
|
// NewBlock creates a Block object from opaque data. It will hash the data.
|
||||||
func NewBlock(data []byte) *Block {
|
func NewBlock(data []byte) *RawBlock {
|
||||||
return &Block{Data: data, Multihash: u.Hash(data)}
|
return &RawBlock{data: data, multihash: u.Hash(data)}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBlockWithHash creates a new block when the hash of the data
|
// NewBlockWithHash creates a new block when the hash of the data
|
||||||
// is already known, this is used to save time in situations where
|
// is already known, this is used to save time in situations where
|
||||||
// we are able to be confident that the data is correct
|
// we are able to be confident that the data is correct
|
||||||
func NewBlockWithHash(data []byte, h mh.Multihash) (*Block, error) {
|
func NewBlockWithHash(data []byte, h mh.Multihash) (*RawBlock, error) {
|
||||||
if u.Debug {
|
if u.Debug {
|
||||||
chk := u.Hash(data)
|
chk := u.Hash(data)
|
||||||
if string(chk) != string(h) {
|
if string(chk) != string(h) {
|
||||||
return nil, errors.New("Data did not match given hash!")
|
return nil, errors.New("Data did not match given hash!")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return &Block{Data: data, Multihash: h}, nil
|
return &RawBlock{data: data, multihash: h}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *RawBlock) Multihash() mh.Multihash {
|
||||||
|
return b.multihash
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *RawBlock) Data() []byte {
|
||||||
|
return b.data
|
||||||
}
|
}
|
||||||
|
|
||||||
// Key returns the block's Multihash as a Key value.
|
// Key returns the block's Multihash as a Key value.
|
||||||
func (b *Block) Key() key.Key {
|
func (b *RawBlock) Key() key.Key {
|
||||||
return key.Key(b.Multihash)
|
return key.Key(b.multihash)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Block) String() string {
|
func (b *RawBlock) String() string {
|
||||||
return fmt.Sprintf("[Block %s]", b.Key())
|
return fmt.Sprintf("[Block %s]", b.Key())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Block) Loggable() map[string]interface{} {
|
func (b *RawBlock) Loggable() map[string]interface{} {
|
||||||
return map[string]interface{}{
|
return map[string]interface{}{
|
||||||
"block": b.Key().String(),
|
"block": b.Key().String(),
|
||||||
}
|
}
|
||||||
|
@ -30,9 +30,9 @@ var ErrNotFound = errors.New("blockstore: block not found")
|
|||||||
type Blockstore interface {
|
type Blockstore interface {
|
||||||
DeleteBlock(key.Key) error
|
DeleteBlock(key.Key) error
|
||||||
Has(key.Key) (bool, error)
|
Has(key.Key) (bool, error)
|
||||||
Get(key.Key) (*blocks.Block, error)
|
Get(key.Key) (blocks.Block, error)
|
||||||
Put(*blocks.Block) error
|
Put(blocks.Block) error
|
||||||
PutMany([]*blocks.Block) error
|
PutMany([]blocks.Block) error
|
||||||
|
|
||||||
AllKeysChan(ctx context.Context) (<-chan key.Key, error)
|
AllKeysChan(ctx context.Context) (<-chan key.Key, error)
|
||||||
}
|
}
|
||||||
@ -73,7 +73,7 @@ type blockstore struct {
|
|||||||
gcreqlk sync.Mutex
|
gcreqlk sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *blockstore) Get(k key.Key) (*blocks.Block, error) {
|
func (bs *blockstore) Get(k key.Key) (blocks.Block, error) {
|
||||||
maybeData, err := bs.datastore.Get(k.DsKey())
|
maybeData, err := bs.datastore.Get(k.DsKey())
|
||||||
if err == ds.ErrNotFound {
|
if err == ds.ErrNotFound {
|
||||||
return nil, ErrNotFound
|
return nil, ErrNotFound
|
||||||
@ -89,7 +89,7 @@ func (bs *blockstore) Get(k key.Key) (*blocks.Block, error) {
|
|||||||
return blocks.NewBlockWithHash(bdata, mh.Multihash(k))
|
return blocks.NewBlockWithHash(bdata, mh.Multihash(k))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *blockstore) Put(block *blocks.Block) error {
|
func (bs *blockstore) Put(block blocks.Block) error {
|
||||||
k := block.Key().DsKey()
|
k := block.Key().DsKey()
|
||||||
|
|
||||||
// Has is cheaper than Put, so see if we already have it
|
// Has is cheaper than Put, so see if we already have it
|
||||||
@ -97,10 +97,10 @@ func (bs *blockstore) Put(block *blocks.Block) error {
|
|||||||
if err == nil && exists {
|
if err == nil && exists {
|
||||||
return nil // already stored.
|
return nil // already stored.
|
||||||
}
|
}
|
||||||
return bs.datastore.Put(k, block.Data)
|
return bs.datastore.Put(k, block.Data())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *blockstore) PutMany(blocks []*blocks.Block) error {
|
func (bs *blockstore) PutMany(blocks []blocks.Block) error {
|
||||||
t, err := bs.datastore.Batch()
|
t, err := bs.datastore.Batch()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -112,7 +112,7 @@ func (bs *blockstore) PutMany(blocks []*blocks.Block) error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
err = t.Put(k, b.Data)
|
err = t.Put(k, b.Data())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -40,7 +40,7 @@ func TestPutThenGetBlock(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if !bytes.Equal(block.Data, blockFromBlockstore.Data) {
|
if !bytes.Equal(block.Data(), blockFromBlockstore.Data()) {
|
||||||
t.Fail()
|
t.Fail()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -34,11 +34,11 @@ func (w *writecache) Has(k key.Key) (bool, error) {
|
|||||||
return w.blockstore.Has(k)
|
return w.blockstore.Has(k)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *writecache) Get(k key.Key) (*blocks.Block, error) {
|
func (w *writecache) Get(k key.Key) (blocks.Block, error) {
|
||||||
return w.blockstore.Get(k)
|
return w.blockstore.Get(k)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *writecache) Put(b *blocks.Block) error {
|
func (w *writecache) Put(b blocks.Block) error {
|
||||||
k := b.Key()
|
k := b.Key()
|
||||||
if _, ok := w.cache.Get(k); ok {
|
if _, ok := w.cache.Get(k); ok {
|
||||||
return nil
|
return nil
|
||||||
@ -49,8 +49,8 @@ func (w *writecache) Put(b *blocks.Block) error {
|
|||||||
return w.blockstore.Put(b)
|
return w.blockstore.Put(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *writecache) PutMany(bs []*blocks.Block) error {
|
func (w *writecache) PutMany(bs []blocks.Block) error {
|
||||||
var good []*blocks.Block
|
var good []blocks.Block
|
||||||
for _, b := range bs {
|
for _, b := range bs {
|
||||||
if _, ok := w.cache.Get(b.Key()); !ok {
|
if _, ok := w.cache.Get(b.Key()); !ok {
|
||||||
good = append(good, b)
|
good = append(good, b)
|
||||||
|
@ -10,13 +10,13 @@ type BlockGenerator struct {
|
|||||||
seq int
|
seq int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bg *BlockGenerator) Next() *blocks.Block {
|
func (bg *BlockGenerator) Next() blocks.Block {
|
||||||
bg.seq++
|
bg.seq++
|
||||||
return blocks.NewBlock([]byte(string(bg.seq)))
|
return blocks.NewBlock([]byte(string(bg.seq)))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bg *BlockGenerator) Blocks(n int) []*blocks.Block {
|
func (bg *BlockGenerator) Blocks(n int) []blocks.Block {
|
||||||
blocks := make([]*blocks.Block, 0)
|
blocks := make([]blocks.Block, 0)
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
b := bg.Next()
|
b := bg.Next()
|
||||||
blocks = append(blocks, b)
|
blocks = append(blocks, b)
|
||||||
|
@ -41,7 +41,7 @@ func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService {
|
|||||||
|
|
||||||
// AddBlock adds a particular block to the service, Putting it into the datastore.
|
// AddBlock adds a particular block to the service, Putting it into the datastore.
|
||||||
// TODO pass a context into this if the remote.HasBlock is going to remain here.
|
// TODO pass a context into this if the remote.HasBlock is going to remain here.
|
||||||
func (s *BlockService) AddBlock(b *blocks.Block) (key.Key, error) {
|
func (s *BlockService) AddBlock(b blocks.Block) (key.Key, error) {
|
||||||
k := b.Key()
|
k := b.Key()
|
||||||
err := s.Blockstore.Put(b)
|
err := s.Blockstore.Put(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -53,7 +53,7 @@ func (s *BlockService) AddBlock(b *blocks.Block) (key.Key, error) {
|
|||||||
return k, nil
|
return k, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BlockService) AddBlocks(bs []*blocks.Block) ([]key.Key, error) {
|
func (s *BlockService) AddBlocks(bs []blocks.Block) ([]key.Key, error) {
|
||||||
err := s.Blockstore.PutMany(bs)
|
err := s.Blockstore.PutMany(bs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -71,7 +71,7 @@ func (s *BlockService) AddBlocks(bs []*blocks.Block) ([]key.Key, error) {
|
|||||||
|
|
||||||
// GetBlock retrieves a particular block from the service,
|
// GetBlock retrieves a particular block from the service,
|
||||||
// Getting it from the datastore using the key (hash).
|
// Getting it from the datastore using the key (hash).
|
||||||
func (s *BlockService) GetBlock(ctx context.Context, k key.Key) (*blocks.Block, error) {
|
func (s *BlockService) GetBlock(ctx context.Context, k key.Key) (blocks.Block, error) {
|
||||||
log.Debugf("BlockService GetBlock: '%s'", k)
|
log.Debugf("BlockService GetBlock: '%s'", k)
|
||||||
block, err := s.Blockstore.Get(k)
|
block, err := s.Blockstore.Get(k)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
@ -103,8 +103,8 @@ func (s *BlockService) GetBlock(ctx context.Context, k key.Key) (*blocks.Block,
|
|||||||
// GetBlocks gets a list of blocks asynchronously and returns through
|
// GetBlocks gets a list of blocks asynchronously and returns through
|
||||||
// the returned channel.
|
// the returned channel.
|
||||||
// NB: No guarantees are made about order.
|
// NB: No guarantees are made about order.
|
||||||
func (s *BlockService) GetBlocks(ctx context.Context, ks []key.Key) <-chan *blocks.Block {
|
func (s *BlockService) GetBlocks(ctx context.Context, ks []key.Key) <-chan blocks.Block {
|
||||||
out := make(chan *blocks.Block, 0)
|
out := make(chan blocks.Block, 0)
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
var misses []key.Key
|
var misses []key.Key
|
||||||
|
@ -24,7 +24,7 @@ func TestBlocks(t *testing.T) {
|
|||||||
|
|
||||||
b := blocks.NewBlock([]byte("beep boop"))
|
b := blocks.NewBlock([]byte("beep boop"))
|
||||||
h := u.Hash([]byte("beep boop"))
|
h := u.Hash([]byte("beep boop"))
|
||||||
if !bytes.Equal(b.Multihash, h) {
|
if !bytes.Equal(b.Multihash(), h) {
|
||||||
t.Error("Block Multihash and data multihash not equal")
|
t.Error("Block Multihash and data multihash not equal")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -54,7 +54,7 @@ func TestBlocks(t *testing.T) {
|
|||||||
t.Error("Block keys not equal.")
|
t.Error("Block keys not equal.")
|
||||||
}
|
}
|
||||||
|
|
||||||
if !bytes.Equal(b.Data, b2.Data) {
|
if !bytes.Equal(b.Data(), b2.Data()) {
|
||||||
t.Error("Block data is not equal.")
|
t.Error("Block data is not equal.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -79,7 +79,7 @@ func TestGetBlocksSequential(t *testing.T) {
|
|||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*50)
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*50)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
out := servs[i].GetBlocks(ctx, keys)
|
out := servs[i].GetBlocks(ctx, keys)
|
||||||
gotten := make(map[key.Key]*blocks.Block)
|
gotten := make(map[key.Key]blocks.Block)
|
||||||
for blk := range out {
|
for blk := range out {
|
||||||
if _, ok := gotten[blk.Key()]; ok {
|
if _, ok := gotten[blk.Key()]; ok {
|
||||||
t.Fatal("Got duplicate block!")
|
t.Fatal("Got duplicate block!")
|
||||||
|
@ -66,7 +66,7 @@ on raw ipfs blocks. It outputs the following to stdout:
|
|||||||
|
|
||||||
res.SetOutput(&BlockStat{
|
res.SetOutput(&BlockStat{
|
||||||
Key: b.Key().B58String(),
|
Key: b.Key().B58String(),
|
||||||
Size: len(b.Data),
|
Size: len(b.Data()),
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
Type: BlockStat{},
|
Type: BlockStat{},
|
||||||
@ -97,7 +97,7 @@ It outputs to stdout, and <key> is a base58 encoded multihash.
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
res.SetOutput(bytes.NewReader(b.Data))
|
res.SetOutput(bytes.NewReader(b.Data()))
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -161,7 +161,7 @@ It reads from stdin, and <key> is a base58 encoded multihash.
|
|||||||
Type: BlockStat{},
|
Type: BlockStat{},
|
||||||
}
|
}
|
||||||
|
|
||||||
func getBlockForKey(req cmds.Request, skey string) (*blocks.Block, error) {
|
func getBlockForKey(req cmds.Request, skey string) (blocks.Block, error) {
|
||||||
n, err := req.InvocContext().GetNode()
|
n, err := req.InvocContext().GetNode()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -90,7 +90,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
|
|||||||
network: network,
|
network: network,
|
||||||
findKeys: make(chan *wantlist.Entry, sizeBatchRequestChan),
|
findKeys: make(chan *wantlist.Entry, sizeBatchRequestChan),
|
||||||
process: px,
|
process: px,
|
||||||
newBlocks: make(chan *blocks.Block, HasBlockBufferSize),
|
newBlocks: make(chan blocks.Block, HasBlockBufferSize),
|
||||||
provideKeys: make(chan key.Key, provideKeysBufferSize),
|
provideKeys: make(chan key.Key, provideKeysBufferSize),
|
||||||
wm: NewWantManager(ctx, network),
|
wm: NewWantManager(ctx, network),
|
||||||
}
|
}
|
||||||
@ -137,7 +137,7 @@ type Bitswap struct {
|
|||||||
|
|
||||||
process process.Process
|
process process.Process
|
||||||
|
|
||||||
newBlocks chan *blocks.Block
|
newBlocks chan blocks.Block
|
||||||
|
|
||||||
provideKeys chan key.Key
|
provideKeys chan key.Key
|
||||||
|
|
||||||
@ -154,7 +154,7 @@ type blockRequest struct {
|
|||||||
|
|
||||||
// GetBlock attempts to retrieve a particular block from peers within the
|
// GetBlock attempts to retrieve a particular block from peers within the
|
||||||
// deadline enforced by the context.
|
// deadline enforced by the context.
|
||||||
func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (*blocks.Block, error) {
|
func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (blocks.Block, error) {
|
||||||
|
|
||||||
// Any async work initiated by this function must end when this function
|
// Any async work initiated by this function must end when this function
|
||||||
// returns. To ensure this, derive a new context. Note that it is okay to
|
// returns. To ensure this, derive a new context. Note that it is okay to
|
||||||
@ -209,9 +209,9 @@ func (bs *Bitswap) WantlistForPeer(p peer.ID) []key.Key {
|
|||||||
// NB: Your request remains open until the context expires. To conserve
|
// NB: Your request remains open until the context expires. To conserve
|
||||||
// resources, provide a context with a reasonably short deadline (ie. not one
|
// resources, provide a context with a reasonably short deadline (ie. not one
|
||||||
// that lasts throughout the lifetime of the server)
|
// that lasts throughout the lifetime of the server)
|
||||||
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *blocks.Block, error) {
|
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks.Block, error) {
|
||||||
if len(keys) == 0 {
|
if len(keys) == 0 {
|
||||||
out := make(chan *blocks.Block)
|
out := make(chan blocks.Block)
|
||||||
close(out)
|
close(out)
|
||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
@ -251,7 +251,7 @@ func (bs *Bitswap) CancelWants(ks []key.Key) {
|
|||||||
|
|
||||||
// HasBlock announces the existance of a block to this bitswap service. The
|
// HasBlock announces the existance of a block to this bitswap service. The
|
||||||
// service will potentially notify its peers.
|
// service will potentially notify its peers.
|
||||||
func (bs *Bitswap) HasBlock(blk *blocks.Block) error {
|
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
|
||||||
select {
|
select {
|
||||||
case <-bs.process.Closing():
|
case <-bs.process.Closing():
|
||||||
return errors.New("bitswap is closed")
|
return errors.New("bitswap is closed")
|
||||||
@ -277,7 +277,7 @@ func (bs *Bitswap) HasBlock(blk *blocks.Block) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *Bitswap) tryPutBlock(blk *blocks.Block, attempts int) error {
|
func (bs *Bitswap) tryPutBlock(blk blocks.Block, attempts int) error {
|
||||||
var err error
|
var err error
|
||||||
for i := 0; i < attempts; i++ {
|
for i := 0; i < attempts; i++ {
|
||||||
if err = bs.blockstore.Put(blk); err == nil {
|
if err = bs.blockstore.Put(blk); err == nil {
|
||||||
@ -316,7 +316,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
|
|||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
for _, block := range iblocks {
|
for _, block := range iblocks {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(b *blocks.Block) {
|
go func(b blocks.Block) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
if err := bs.updateReceiveCounters(b); err != nil {
|
if err := bs.updateReceiveCounters(b); err != nil {
|
||||||
@ -337,7 +337,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
|
|||||||
|
|
||||||
var ErrAlreadyHaveBlock = errors.New("already have block")
|
var ErrAlreadyHaveBlock = errors.New("already have block")
|
||||||
|
|
||||||
func (bs *Bitswap) updateReceiveCounters(b *blocks.Block) error {
|
func (bs *Bitswap) updateReceiveCounters(b blocks.Block) error {
|
||||||
bs.counterLk.Lock()
|
bs.counterLk.Lock()
|
||||||
defer bs.counterLk.Unlock()
|
defer bs.counterLk.Unlock()
|
||||||
bs.blocksRecvd++
|
bs.blocksRecvd++
|
||||||
@ -348,7 +348,7 @@ func (bs *Bitswap) updateReceiveCounters(b *blocks.Block) error {
|
|||||||
}
|
}
|
||||||
if err == nil && has {
|
if err == nil && has {
|
||||||
bs.dupBlocksRecvd++
|
bs.dupBlocksRecvd++
|
||||||
bs.dupDataRecvd += uint64(len(b.Data))
|
bs.dupDataRecvd += uint64(len(b.Data()))
|
||||||
}
|
}
|
||||||
|
|
||||||
if has {
|
if has {
|
||||||
|
@ -85,7 +85,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
|
|||||||
t.Fatal("Expected to succeed")
|
t.Fatal("Expected to succeed")
|
||||||
}
|
}
|
||||||
|
|
||||||
if !bytes.Equal(block.Data, received.Data) {
|
if !bytes.Equal(block.Data(), received.Data()) {
|
||||||
t.Fatal("Data doesn't match")
|
t.Fatal("Data doesn't match")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -218,7 +218,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
|
func getOrFail(bitswap Instance, b blocks.Block, t *testing.T, wg *sync.WaitGroup) {
|
||||||
if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
|
if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
|
||||||
_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
|
_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -58,7 +58,7 @@ type Envelope struct {
|
|||||||
Peer peer.ID
|
Peer peer.ID
|
||||||
|
|
||||||
// Block is the payload
|
// Block is the payload
|
||||||
Block *blocks.Block
|
Block blocks.Block
|
||||||
|
|
||||||
// A callback to notify the decision queue that the task is complete
|
// A callback to notify the decision queue that the task is complete
|
||||||
Sent func()
|
Sent func()
|
||||||
@ -226,13 +226,13 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, block := range m.Blocks() {
|
for _, block := range m.Blocks() {
|
||||||
log.Debugf("got block %s %d bytes", block.Key(), len(block.Data))
|
log.Debugf("got block %s %d bytes", block.Key(), len(block.Data()))
|
||||||
l.ReceivedBytes(len(block.Data))
|
l.ReceivedBytes(len(block.Data()))
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Engine) addBlock(block *blocks.Block) {
|
func (e *Engine) addBlock(block blocks.Block) {
|
||||||
work := false
|
work := false
|
||||||
|
|
||||||
for _, l := range e.ledgerMap {
|
for _, l := range e.ledgerMap {
|
||||||
@ -247,7 +247,7 @@ func (e *Engine) addBlock(block *blocks.Block) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Engine) AddBlock(block *blocks.Block) {
|
func (e *Engine) AddBlock(block blocks.Block) {
|
||||||
e.lock.Lock()
|
e.lock.Lock()
|
||||||
defer e.lock.Unlock()
|
defer e.lock.Unlock()
|
||||||
|
|
||||||
@ -266,7 +266,7 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
|
|||||||
|
|
||||||
l := e.findOrCreate(p)
|
l := e.findOrCreate(p)
|
||||||
for _, block := range m.Blocks() {
|
for _, block := range m.Blocks() {
|
||||||
l.SentBytes(len(block.Data))
|
l.SentBytes(len(block.Data()))
|
||||||
l.wantList.Remove(block.Key())
|
l.wantList.Remove(block.Key())
|
||||||
e.peerRequestQueue.Remove(block.Key(), p)
|
e.peerRequestQueue.Remove(block.Key(), p)
|
||||||
}
|
}
|
||||||
|
@ -188,7 +188,7 @@ func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error {
|
|||||||
received := envelope.Block
|
received := envelope.Block
|
||||||
expected := blocks.NewBlock([]byte(k))
|
expected := blocks.NewBlock([]byte(k))
|
||||||
if received.Key() != expected.Key() {
|
if received.Key() != expected.Key() {
|
||||||
return errors.New(fmt.Sprintln("received", string(received.Data), "expected", string(expected.Data)))
|
return errors.New(fmt.Sprintln("received", string(received.Data()), "expected", string(expected.Data())))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -22,7 +22,7 @@ type BitSwapMessage interface {
|
|||||||
Wantlist() []Entry
|
Wantlist() []Entry
|
||||||
|
|
||||||
// Blocks returns a slice of unique blocks
|
// Blocks returns a slice of unique blocks
|
||||||
Blocks() []*blocks.Block
|
Blocks() []blocks.Block
|
||||||
|
|
||||||
// AddEntry adds an entry to the Wantlist.
|
// AddEntry adds an entry to the Wantlist.
|
||||||
AddEntry(key key.Key, priority int)
|
AddEntry(key key.Key, priority int)
|
||||||
@ -34,7 +34,7 @@ type BitSwapMessage interface {
|
|||||||
// A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set
|
// A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set
|
||||||
Full() bool
|
Full() bool
|
||||||
|
|
||||||
AddBlock(*blocks.Block)
|
AddBlock(blocks.Block)
|
||||||
Exportable
|
Exportable
|
||||||
|
|
||||||
Loggable() map[string]interface{}
|
Loggable() map[string]interface{}
|
||||||
@ -48,7 +48,7 @@ type Exportable interface {
|
|||||||
type impl struct {
|
type impl struct {
|
||||||
full bool
|
full bool
|
||||||
wantlist map[key.Key]Entry
|
wantlist map[key.Key]Entry
|
||||||
blocks map[key.Key]*blocks.Block
|
blocks map[key.Key]blocks.Block
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(full bool) BitSwapMessage {
|
func New(full bool) BitSwapMessage {
|
||||||
@ -57,7 +57,7 @@ func New(full bool) BitSwapMessage {
|
|||||||
|
|
||||||
func newMsg(full bool) *impl {
|
func newMsg(full bool) *impl {
|
||||||
return &impl{
|
return &impl{
|
||||||
blocks: make(map[key.Key]*blocks.Block),
|
blocks: make(map[key.Key]blocks.Block),
|
||||||
wantlist: make(map[key.Key]Entry),
|
wantlist: make(map[key.Key]Entry),
|
||||||
full: full,
|
full: full,
|
||||||
}
|
}
|
||||||
@ -96,8 +96,8 @@ func (m *impl) Wantlist() []Entry {
|
|||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *impl) Blocks() []*blocks.Block {
|
func (m *impl) Blocks() []blocks.Block {
|
||||||
bs := make([]*blocks.Block, 0, len(m.blocks))
|
bs := make([]blocks.Block, 0, len(m.blocks))
|
||||||
for _, block := range m.blocks {
|
for _, block := range m.blocks {
|
||||||
bs = append(bs, block)
|
bs = append(bs, block)
|
||||||
}
|
}
|
||||||
@ -129,7 +129,7 @@ func (m *impl) addEntry(k key.Key, priority int, cancel bool) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *impl) AddBlock(b *blocks.Block) {
|
func (m *impl) AddBlock(b blocks.Block) {
|
||||||
m.blocks[b.Key()] = b
|
m.blocks[b.Key()] = b
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -156,7 +156,7 @@ func (m *impl) ToProto() *pb.Message {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
for _, b := range m.Blocks() {
|
for _, b := range m.Blocks() {
|
||||||
pbm.Blocks = append(pbm.Blocks, b.Data)
|
pbm.Blocks = append(pbm.Blocks, b.Data())
|
||||||
}
|
}
|
||||||
return pbm
|
return pbm
|
||||||
}
|
}
|
||||||
|
@ -10,8 +10,8 @@ import (
|
|||||||
const bufferSize = 16
|
const bufferSize = 16
|
||||||
|
|
||||||
type PubSub interface {
|
type PubSub interface {
|
||||||
Publish(block *blocks.Block)
|
Publish(block blocks.Block)
|
||||||
Subscribe(ctx context.Context, keys ...key.Key) <-chan *blocks.Block
|
Subscribe(ctx context.Context, keys ...key.Key) <-chan blocks.Block
|
||||||
Shutdown()
|
Shutdown()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -23,7 +23,7 @@ type impl struct {
|
|||||||
wrapped pubsub.PubSub
|
wrapped pubsub.PubSub
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ps *impl) Publish(block *blocks.Block) {
|
func (ps *impl) Publish(block blocks.Block) {
|
||||||
topic := string(block.Key())
|
topic := string(block.Key())
|
||||||
ps.wrapped.Pub(block, topic)
|
ps.wrapped.Pub(block, topic)
|
||||||
}
|
}
|
||||||
@ -35,9 +35,9 @@ func (ps *impl) Shutdown() {
|
|||||||
// Subscribe returns a channel of blocks for the given |keys|. |blockChannel|
|
// Subscribe returns a channel of blocks for the given |keys|. |blockChannel|
|
||||||
// is closed if the |ctx| times out or is cancelled, or after sending len(keys)
|
// is closed if the |ctx| times out or is cancelled, or after sending len(keys)
|
||||||
// blocks.
|
// blocks.
|
||||||
func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan *blocks.Block {
|
func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan blocks.Block {
|
||||||
|
|
||||||
blocksCh := make(chan *blocks.Block, len(keys))
|
blocksCh := make(chan blocks.Block, len(keys))
|
||||||
valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
|
valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
|
||||||
if len(keys) == 0 {
|
if len(keys) == 0 {
|
||||||
close(blocksCh)
|
close(blocksCh)
|
||||||
@ -55,7 +55,7 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan *blocks.B
|
|||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
block, ok := val.(*blocks.Block)
|
block, ok := val.(blocks.Block)
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -151,15 +151,15 @@ func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) {
|
|||||||
t.Log("publishing the large number of blocks to the ignored channel must not deadlock")
|
t.Log("publishing the large number of blocks to the ignored channel must not deadlock")
|
||||||
}
|
}
|
||||||
|
|
||||||
func assertBlockChannelNil(t *testing.T, blockChannel <-chan *blocks.Block) {
|
func assertBlockChannelNil(t *testing.T, blockChannel <-chan blocks.Block) {
|
||||||
_, ok := <-blockChannel
|
_, ok := <-blockChannel
|
||||||
if ok {
|
if ok {
|
||||||
t.Fail()
|
t.Fail()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func assertBlocksEqual(t *testing.T, a, b *blocks.Block) {
|
func assertBlocksEqual(t *testing.T, a, b blocks.Block) {
|
||||||
if !bytes.Equal(a.Data, b.Data) {
|
if !bytes.Equal(a.Data(), b.Data()) {
|
||||||
t.Fatal("blocks aren't equal")
|
t.Fatal("blocks aren't equal")
|
||||||
}
|
}
|
||||||
if a.Key() != b.Key() {
|
if a.Key() != b.Key() {
|
||||||
|
@ -44,7 +44,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
|
|||||||
// TODO assert that this came from the correct peer and that the message contents are as expected
|
// TODO assert that this came from the correct peer and that the message contents are as expected
|
||||||
ok := false
|
ok := false
|
||||||
for _, b := range msgFromResponder.Blocks() {
|
for _, b := range msgFromResponder.Blocks() {
|
||||||
if string(b.Data) == expectedStr {
|
if string(b.Data()) == expectedStr {
|
||||||
wg.Done()
|
wg.Done()
|
||||||
ok = true
|
ok = true
|
||||||
}
|
}
|
||||||
|
@ -61,7 +61,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
|
|||||||
log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableMap{
|
log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableMap{
|
||||||
"ID": id,
|
"ID": id,
|
||||||
"Target": envelope.Peer.Pretty(),
|
"Target": envelope.Peer.Pretty(),
|
||||||
"Block": envelope.Block.Multihash.B58String(),
|
"Block": envelope.Block.Multihash().B58String(),
|
||||||
})
|
})
|
||||||
|
|
||||||
bs.wm.SendBlock(ctx, envelope)
|
bs.wm.SendBlock(ctx, envelope)
|
||||||
|
@ -13,13 +13,13 @@ import (
|
|||||||
// exchange protocol.
|
// exchange protocol.
|
||||||
type Interface interface { // type Exchanger interface
|
type Interface interface { // type Exchanger interface
|
||||||
// GetBlock returns the block associated with a given key.
|
// GetBlock returns the block associated with a given key.
|
||||||
GetBlock(context.Context, key.Key) (*blocks.Block, error)
|
GetBlock(context.Context, key.Key) (blocks.Block, error)
|
||||||
|
|
||||||
GetBlocks(context.Context, []key.Key) (<-chan *blocks.Block, error)
|
GetBlocks(context.Context, []key.Key) (<-chan blocks.Block, error)
|
||||||
|
|
||||||
// TODO Should callers be concerned with whether the block was made
|
// TODO Should callers be concerned with whether the block was made
|
||||||
// available on the network?
|
// available on the network?
|
||||||
HasBlock(*blocks.Block) error
|
HasBlock(blocks.Block) error
|
||||||
|
|
||||||
io.Closer
|
io.Closer
|
||||||
}
|
}
|
||||||
|
@ -23,12 +23,12 @@ type offlineExchange struct {
|
|||||||
// GetBlock returns nil to signal that a block could not be retrieved for the
|
// GetBlock returns nil to signal that a block could not be retrieved for the
|
||||||
// given key.
|
// given key.
|
||||||
// NB: This function may return before the timeout expires.
|
// NB: This function may return before the timeout expires.
|
||||||
func (e *offlineExchange) GetBlock(_ context.Context, k key.Key) (*blocks.Block, error) {
|
func (e *offlineExchange) GetBlock(_ context.Context, k key.Key) (blocks.Block, error) {
|
||||||
return e.bs.Get(k)
|
return e.bs.Get(k)
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasBlock always returns nil.
|
// HasBlock always returns nil.
|
||||||
func (e *offlineExchange) HasBlock(b *blocks.Block) error {
|
func (e *offlineExchange) HasBlock(b blocks.Block) error {
|
||||||
return e.bs.Put(b)
|
return e.bs.Put(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -39,8 +39,8 @@ func (_ *offlineExchange) Close() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *offlineExchange) GetBlocks(ctx context.Context, ks []key.Key) (<-chan *blocks.Block, error) {
|
func (e *offlineExchange) GetBlocks(ctx context.Context, ks []key.Key) (<-chan blocks.Block, error) {
|
||||||
out := make(chan *blocks.Block, 0)
|
out := make(chan blocks.Block, 0)
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
var misses []key.Key
|
var misses []key.Key
|
||||||
|
@ -39,10 +39,10 @@ func TestRabinChunking(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func chunkData(t *testing.T, data []byte) map[key.Key]*blocks.Block {
|
func chunkData(t *testing.T, data []byte) map[key.Key]blocks.Block {
|
||||||
r := NewRabin(bytes.NewReader(data), 1024*256)
|
r := NewRabin(bytes.NewReader(data), 1024*256)
|
||||||
|
|
||||||
blkmap := make(map[key.Key]*blocks.Block)
|
blkmap := make(map[key.Key]blocks.Block)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
blk, err := r.NextBytes()
|
blk, err := r.NextBytes()
|
||||||
|
@ -52,13 +52,13 @@ func (n *dagService) Add(nd *Node) (key.Key, error) {
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
b := new(blocks.Block)
|
mh, err := nd.Multihash()
|
||||||
b.Data = d
|
|
||||||
b.Multihash, err = nd.Multihash()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
b, _ := blocks.NewBlockWithHash(d, mh)
|
||||||
|
|
||||||
return n.Blocks.AddBlock(b)
|
return n.Blocks.AddBlock(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -82,7 +82,7 @@ func (n *dagService) Get(ctx context.Context, k key.Key) (*Node, error) {
|
|||||||
return nil, fmt.Errorf("Failed to get block for %s: %v", k.B58String(), err)
|
return nil, fmt.Errorf("Failed to get block for %s: %v", k.B58String(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := DecodeProtobuf(b.Data)
|
res, err := DecodeProtobuf(b.Data())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err)
|
return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err)
|
||||||
}
|
}
|
||||||
@ -135,7 +135,7 @@ func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) <-chan *NodeO
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
nd, err := DecodeProtobuf(b.Data)
|
nd, err := DecodeProtobuf(b.Data())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
out <- &NodeOption{Err: err}
|
out <- &NodeOption{Err: err}
|
||||||
return
|
return
|
||||||
@ -316,7 +316,7 @@ func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
|
|||||||
type Batch struct {
|
type Batch struct {
|
||||||
ds *dagService
|
ds *dagService
|
||||||
|
|
||||||
blocks []*blocks.Block
|
blocks []blocks.Block
|
||||||
size int
|
size int
|
||||||
MaxSize int
|
MaxSize int
|
||||||
}
|
}
|
||||||
@ -327,17 +327,17 @@ func (t *Batch) Add(nd *Node) (key.Key, error) {
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
b := new(blocks.Block)
|
mh, err := nd.Multihash()
|
||||||
b.Data = d
|
|
||||||
b.Multihash, err = nd.Multihash()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
k := key.Key(b.Multihash)
|
b, _ := blocks.NewBlockWithHash(d, mh)
|
||||||
|
|
||||||
|
k := key.Key(mh)
|
||||||
|
|
||||||
t.blocks = append(t.blocks, b)
|
t.blocks = append(t.blocks, b)
|
||||||
t.size += len(b.Data)
|
t.size += len(b.Data())
|
||||||
if t.size > t.MaxSize {
|
if t.size > t.MaxSize {
|
||||||
return k, t.Commit()
|
return k, t.Commit()
|
||||||
}
|
}
|
||||||
|
@ -71,7 +71,7 @@ func TestBitswapWithoutRouting(t *testing.T) {
|
|||||||
b, err := n.Blocks.GetBlock(ctx, block0.Key())
|
b, err := n.Blocks.GetBlock(ctx, block0.Key())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
} else if !bytes.Equal(b.Data, block0.Data) {
|
} else if !bytes.Equal(b.Data(), block0.Data()) {
|
||||||
t.Error("byte comparison fail")
|
t.Error("byte comparison fail")
|
||||||
} else {
|
} else {
|
||||||
log.Debug("got block: %s", b.Key())
|
log.Debug("got block: %s", b.Key())
|
||||||
@ -88,7 +88,7 @@ func TestBitswapWithoutRouting(t *testing.T) {
|
|||||||
b, err := n.Blocks.GetBlock(ctx, block1.Key())
|
b, err := n.Blocks.GetBlock(ctx, block1.Key())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
} else if !bytes.Equal(b.Data, block1.Data) {
|
} else if !bytes.Equal(b.Data(), block1.Data()) {
|
||||||
t.Error("byte comparison fail")
|
t.Error("byte comparison fail")
|
||||||
} else {
|
} else {
|
||||||
log.Debug("got block: %s", b.Key())
|
log.Debug("got block: %s", b.Key())
|
||||||
|
Reference in New Issue
Block a user