mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-30 18:13:54 +08:00
Make Golint happy in the blocks submodule.
This has required changing the order of some parameters and adding HashOnRead to the Blockstore interface (which I have in turn added to all the wrapper implementations). License: MIT Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
@ -1,5 +1,6 @@
|
|||||||
// package blocks contains the lowest level of IPFS data structures,
|
// Package blocks contains the lowest level of IPFS data structures.
|
||||||
// the raw block with a checksum.
|
// A block is raw data accompanied by a CID. The CID contains the multihash
|
||||||
|
// corresponding to the block.
|
||||||
package blocks
|
package blocks
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@ -11,8 +12,11 @@ import (
|
|||||||
mh "gx/ipfs/QmbZ6Cee2uHjG7hf19qLHppgKDRtaG4CVtMzdmK9VCVqLu/go-multihash"
|
mh "gx/ipfs/QmbZ6Cee2uHjG7hf19qLHppgKDRtaG4CVtMzdmK9VCVqLu/go-multihash"
|
||||||
)
|
)
|
||||||
|
|
||||||
var ErrWrongHash = errors.New("data did not match given hash!")
|
// ErrWrongHash is returned when the Cid of a block is not the expected
|
||||||
|
// according to the contents. It is currently used only when debugging.
|
||||||
|
var ErrWrongHash = errors.New("data did not match given hash")
|
||||||
|
|
||||||
|
// Block provides abstraction for blocks implementations.
|
||||||
type Block interface {
|
type Block interface {
|
||||||
RawData() []byte
|
RawData() []byte
|
||||||
Cid() *cid.Cid
|
Cid() *cid.Cid
|
||||||
@ -20,7 +24,8 @@ type Block interface {
|
|||||||
Loggable() map[string]interface{}
|
Loggable() map[string]interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Block is a singular block of data in ipfs
|
// A BasicBlock is a singular block of data in ipfs. It implements the Block
|
||||||
|
// interface.
|
||||||
type BasicBlock struct {
|
type BasicBlock struct {
|
||||||
cid *cid.Cid
|
cid *cid.Cid
|
||||||
data []byte
|
data []byte
|
||||||
@ -32,9 +37,9 @@ func NewBlock(data []byte) *BasicBlock {
|
|||||||
return &BasicBlock{data: data, cid: cid.NewCidV0(u.Hash(data))}
|
return &BasicBlock{data: data, cid: cid.NewCidV0(u.Hash(data))}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBlockWithHash creates a new block when the hash of the data
|
// NewBlockWithCid creates a new block when the hash of the data
|
||||||
// is already known, this is used to save time in situations where
|
// is already known, this is used to save time in situations where
|
||||||
// we are able to be confident that the data is correct
|
// we are able to be confident that the data is correct.
|
||||||
func NewBlockWithCid(data []byte, c *cid.Cid) (*BasicBlock, error) {
|
func NewBlockWithCid(data []byte, c *cid.Cid) (*BasicBlock, error) {
|
||||||
if u.Debug {
|
if u.Debug {
|
||||||
chkc, err := c.Prefix().Sum(data)
|
chkc, err := c.Prefix().Sum(data)
|
||||||
@ -49,22 +54,27 @@ func NewBlockWithCid(data []byte, c *cid.Cid) (*BasicBlock, error) {
|
|||||||
return &BasicBlock{data: data, cid: c}, nil
|
return &BasicBlock{data: data, cid: c}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Multihash returns the hash contained in the block CID.
|
||||||
func (b *BasicBlock) Multihash() mh.Multihash {
|
func (b *BasicBlock) Multihash() mh.Multihash {
|
||||||
return b.cid.Hash()
|
return b.cid.Hash()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RawData returns the block raw contents as a byte slice.
|
||||||
func (b *BasicBlock) RawData() []byte {
|
func (b *BasicBlock) RawData() []byte {
|
||||||
return b.data
|
return b.data
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Cid returns the content identifier of the block.
|
||||||
func (b *BasicBlock) Cid() *cid.Cid {
|
func (b *BasicBlock) Cid() *cid.Cid {
|
||||||
return b.cid
|
return b.cid
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// String provides a human-readable representation of the block CID.
|
||||||
func (b *BasicBlock) String() string {
|
func (b *BasicBlock) String() string {
|
||||||
return fmt.Sprintf("[Block %s]", b.Cid())
|
return fmt.Sprintf("[Block %s]", b.Cid())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Loggable returns a go-log loggable item.
|
||||||
func (b *BasicBlock) Loggable() map[string]interface{} {
|
func (b *BasicBlock) Loggable() map[string]interface{} {
|
||||||
return map[string]interface{}{
|
return map[string]interface{}{
|
||||||
"block": b.Cid().String(),
|
"block": b.Cid().String(),
|
||||||
|
@ -11,6 +11,9 @@ import (
|
|||||||
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
|
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// arccache wraps a BlockStore with an Adaptive Replacement Cache (ARC) for
|
||||||
|
// block Cids. This provides block access-time improvements, allowing
|
||||||
|
// to short-cut many searches without query-ing the underlying datastore.
|
||||||
type arccache struct {
|
type arccache struct {
|
||||||
arc *lru.ARCCache
|
arc *lru.ARCCache
|
||||||
blockstore Blockstore
|
blockstore Blockstore
|
||||||
@ -128,6 +131,10 @@ func (b *arccache) PutMany(bs []blocks.Block) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *arccache) HashOnRead(enabled bool) {
|
||||||
|
b.blockstore.HashOnRead(enabled)
|
||||||
|
}
|
||||||
|
|
||||||
func (b *arccache) addCache(c *cid.Cid, has bool) {
|
func (b *arccache) addCache(c *cid.Cid, has bool) {
|
||||||
b.arc.Add(c.KeyString(), has)
|
b.arc.Add(c.KeyString(), has)
|
||||||
}
|
}
|
||||||
|
@ -13,25 +13,24 @@ import (
|
|||||||
|
|
||||||
var exampleBlock = blocks.NewBlock([]byte("foo"))
|
var exampleBlock = blocks.NewBlock([]byte("foo"))
|
||||||
|
|
||||||
func testArcCached(bs Blockstore, ctx context.Context) (*arccache, error) {
|
func testArcCached(ctx context.Context, bs Blockstore) (*arccache, error) {
|
||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
ctx = context.TODO()
|
ctx = context.TODO()
|
||||||
}
|
}
|
||||||
opts := DefaultCacheOpts()
|
opts := DefaultCacheOpts()
|
||||||
opts.HasBloomFilterSize = 0
|
opts.HasBloomFilterSize = 0
|
||||||
opts.HasBloomFilterHashes = 0
|
opts.HasBloomFilterHashes = 0
|
||||||
bbs, err := CachedBlockstore(bs, ctx, opts)
|
bbs, err := CachedBlockstore(ctx, bs, opts)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return bbs.(*arccache), nil
|
return bbs.(*arccache), nil
|
||||||
} else {
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
func createStores(t *testing.T) (*arccache, *blockstore, *callbackDatastore) {
|
func createStores(t *testing.T) (*arccache, Blockstore, *callbackDatastore) {
|
||||||
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
|
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
|
||||||
bs := NewBlockstore(syncds.MutexWrap(cd))
|
bs := NewBlockstore(syncds.MutexWrap(cd))
|
||||||
arc, err := testArcCached(bs, nil)
|
arc, err := testArcCached(nil, bs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
// package blockstore implements a thin wrapper over a datastore, giving a
|
// Package blockstore implements a thin wrapper over a datastore, giving a
|
||||||
// clean interface for Getting and Putting block objects.
|
// clean interface for Getting and Putting block objects.
|
||||||
package blockstore
|
package blockstore
|
||||||
|
|
||||||
@ -23,22 +23,36 @@ var log = logging.Logger("blockstore")
|
|||||||
// BlockPrefix namespaces blockstore datastores
|
// BlockPrefix namespaces blockstore datastores
|
||||||
var BlockPrefix = ds.NewKey("blocks")
|
var BlockPrefix = ds.NewKey("blocks")
|
||||||
|
|
||||||
var ValueTypeMismatch = errors.New("the retrieved value is not a Block")
|
// ErrValueTypeMismatch is an error returned when the item retrieved from
|
||||||
|
// the datatstore is not a block.
|
||||||
|
var ErrValueTypeMismatch = errors.New("the retrieved value is not a Block")
|
||||||
|
|
||||||
|
// ErrHashMismatch is an error returned when the hash of a block
|
||||||
|
// is different than expected.
|
||||||
var ErrHashMismatch = errors.New("block in storage has different hash than requested")
|
var ErrHashMismatch = errors.New("block in storage has different hash than requested")
|
||||||
|
|
||||||
|
// ErrNotFound is an error returned when a block is not found.
|
||||||
var ErrNotFound = errors.New("blockstore: block not found")
|
var ErrNotFound = errors.New("blockstore: block not found")
|
||||||
|
|
||||||
// Blockstore wraps a Datastore
|
// Blockstore wraps a Datastore block-centered methods and provides a layer
|
||||||
|
// of abstraction which allows to add different caching strategies.
|
||||||
type Blockstore interface {
|
type Blockstore interface {
|
||||||
DeleteBlock(*cid.Cid) error
|
DeleteBlock(*cid.Cid) error
|
||||||
Has(*cid.Cid) (bool, error)
|
Has(*cid.Cid) (bool, error)
|
||||||
Get(*cid.Cid) (blocks.Block, error)
|
Get(*cid.Cid) (blocks.Block, error)
|
||||||
Put(blocks.Block) error
|
Put(blocks.Block) error
|
||||||
PutMany([]blocks.Block) error
|
PutMany([]blocks.Block) error
|
||||||
|
// AllKeysChan returns a channel from which
|
||||||
|
// the CIDs in the Blockstore can be read. It should respect
|
||||||
|
// the given context, closing the channel if it becomes Done.
|
||||||
AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error)
|
AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error)
|
||||||
|
// HashOnRead specifies if every read block should be
|
||||||
|
// rehashed to make sure it matches its CID.
|
||||||
|
HashOnRead(enabled bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GCLocker abstract functionality to lock a blockstore when performing
|
||||||
|
// garbage-collection operations.
|
||||||
type GCLocker interface {
|
type GCLocker interface {
|
||||||
// GCLock locks the blockstore for garbage collection. No operations
|
// GCLock locks the blockstore for garbage collection. No operations
|
||||||
// that expect to finish with a pin should ocurr simultaneously.
|
// that expect to finish with a pin should ocurr simultaneously.
|
||||||
@ -56,11 +70,15 @@ type GCLocker interface {
|
|||||||
GCRequested() bool
|
GCRequested() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GCBlockstore is a blockstore that can safely run garbage-collection
|
||||||
|
// operations.
|
||||||
type GCBlockstore interface {
|
type GCBlockstore interface {
|
||||||
Blockstore
|
Blockstore
|
||||||
GCLocker
|
GCLocker
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewGCBlockstore returns a default implementation of GCBlockstore
|
||||||
|
// using the given Blockstore and GCLocker.
|
||||||
func NewGCBlockstore(bs Blockstore, gcl GCLocker) GCBlockstore {
|
func NewGCBlockstore(bs Blockstore, gcl GCLocker) GCBlockstore {
|
||||||
return gcBlockstore{bs, gcl}
|
return gcBlockstore{bs, gcl}
|
||||||
}
|
}
|
||||||
@ -70,7 +88,9 @@ type gcBlockstore struct {
|
|||||||
GCLocker
|
GCLocker
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBlockstore(d ds.Batching) *blockstore {
|
// NewBlockstore returns a default Blockstore implementation
|
||||||
|
// using the provided datastore.Batching backend.
|
||||||
|
func NewBlockstore(d ds.Batching) Blockstore {
|
||||||
var dsb ds.Batching
|
var dsb ds.Batching
|
||||||
dd := dsns.Wrap(d, BlockPrefix)
|
dd := dsns.Wrap(d, BlockPrefix)
|
||||||
dsb = dd
|
dsb = dd
|
||||||
@ -108,7 +128,7 @@ func (bs *blockstore) Get(k *cid.Cid) (blocks.Block, error) {
|
|||||||
}
|
}
|
||||||
bdata, ok := maybeData.([]byte)
|
bdata, ok := maybeData.([]byte)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, ValueTypeMismatch
|
return nil, ErrValueTypeMismatch
|
||||||
}
|
}
|
||||||
|
|
||||||
if bs.rehash {
|
if bs.rehash {
|
||||||
@ -122,9 +142,8 @@ func (bs *blockstore) Get(k *cid.Cid) (blocks.Block, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return blocks.NewBlockWithCid(bdata, rbcid)
|
return blocks.NewBlockWithCid(bdata, rbcid)
|
||||||
} else {
|
|
||||||
return blocks.NewBlockWithCid(bdata, k)
|
|
||||||
}
|
}
|
||||||
|
return blocks.NewBlockWithCid(bdata, k)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *blockstore) Put(block blocks.Block) error {
|
func (bs *blockstore) Put(block blocks.Block) error {
|
||||||
@ -162,8 +181,8 @@ func (bs *blockstore) Has(k *cid.Cid) (bool, error) {
|
|||||||
return bs.datastore.Has(dshelp.CidToDsKey(k))
|
return bs.datastore.Has(dshelp.CidToDsKey(k))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *blockstore) DeleteBlock(k *cid.Cid) error {
|
func (bs *blockstore) DeleteBlock(k *cid.Cid) error {
|
||||||
err := s.datastore.Delete(dshelp.CidToDsKey(k))
|
err := bs.datastore.Delete(dshelp.CidToDsKey(k))
|
||||||
if err == ds.ErrNotFound {
|
if err == ds.ErrNotFound {
|
||||||
return ErrNotFound
|
return ErrNotFound
|
||||||
}
|
}
|
||||||
@ -173,7 +192,7 @@ func (s *blockstore) DeleteBlock(k *cid.Cid) error {
|
|||||||
// AllKeysChan runs a query for keys from the blockstore.
|
// AllKeysChan runs a query for keys from the blockstore.
|
||||||
// this is very simplistic, in the future, take dsq.Query as a param?
|
// this is very simplistic, in the future, take dsq.Query as a param?
|
||||||
//
|
//
|
||||||
// AllKeysChan respects context
|
// AllKeysChan respects context.
|
||||||
func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
|
func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
|
||||||
|
|
||||||
// KeysOnly, because that would be _a lot_ of data.
|
// KeysOnly, because that would be _a lot_ of data.
|
||||||
@ -220,7 +239,9 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error)
|
|||||||
return output, nil
|
return output, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGCLocker() *gclocker {
|
// NewGCLocker returns a default implementation of
|
||||||
|
// GCLocker using standard [RW] mutexes.
|
||||||
|
func NewGCLocker() GCLocker {
|
||||||
return &gclocker{}
|
return &gclocker{}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -230,6 +251,8 @@ type gclocker struct {
|
|||||||
gcreqlk sync.Mutex
|
gcreqlk sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Unlocker represents an object which can Unlock
|
||||||
|
// something.
|
||||||
type Unlocker interface {
|
type Unlocker interface {
|
||||||
Unlock()
|
Unlock()
|
||||||
}
|
}
|
||||||
|
@ -186,7 +186,7 @@ func TestAllKeysRespectsContext(t *testing.T) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestValueTypeMismatch(t *testing.T) {
|
func TestErrValueTypeMismatch(t *testing.T) {
|
||||||
block := blocks.NewBlock([]byte("some data"))
|
block := blocks.NewBlock([]byte("some data"))
|
||||||
|
|
||||||
datastore := ds.NewMapDatastore()
|
datastore := ds.NewMapDatastore()
|
||||||
@ -196,7 +196,7 @@ func TestValueTypeMismatch(t *testing.T) {
|
|||||||
blockstore := NewBlockstore(ds_sync.MutexWrap(datastore))
|
blockstore := NewBlockstore(ds_sync.MutexWrap(datastore))
|
||||||
|
|
||||||
_, err := blockstore.Get(block.Cid())
|
_, err := blockstore.Get(block.Cid())
|
||||||
if err != ValueTypeMismatch {
|
if err != ErrValueTypeMismatch {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -12,9 +12,10 @@ import (
|
|||||||
bloom "gx/ipfs/QmeiMCBkYHxkDkDfnDadzz4YxY5ruL5Pj499essE4vRsGM/bbloom"
|
bloom "gx/ipfs/QmeiMCBkYHxkDkDfnDadzz4YxY5ruL5Pj499essE4vRsGM/bbloom"
|
||||||
)
|
)
|
||||||
|
|
||||||
// bloomCached returns Blockstore that caches Has requests using Bloom filter
|
// bloomCached returns a Blockstore that caches Has requests using a Bloom
|
||||||
// Size is size of bloom filter in bytes
|
// filter. bloomSize is size of bloom filter in bytes. hashCount specifies the
|
||||||
func bloomCached(bs Blockstore, ctx context.Context, bloomSize, hashCount int) (*bloomcache, error) {
|
// number of hashing functions in the bloom filter (usually known as k).
|
||||||
|
func bloomCached(ctx context.Context, bs Blockstore, bloomSize, hashCount int) (*bloomcache, error) {
|
||||||
bl, err := bloom.New(float64(bloomSize), float64(hashCount))
|
bl, err := bloom.New(float64(bloomSize), float64(hashCount))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -165,6 +166,10 @@ func (b *bloomcache) PutMany(bs []blocks.Block) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *bloomcache) HashOnRead(enabled bool) {
|
||||||
|
b.blockstore.HashOnRead(enabled)
|
||||||
|
}
|
||||||
|
|
||||||
func (b *bloomcache) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
|
func (b *bloomcache) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
|
||||||
return b.blockstore.AllKeysChan(ctx)
|
return b.blockstore.AllKeysChan(ctx)
|
||||||
}
|
}
|
||||||
|
@ -14,18 +14,17 @@ import (
|
|||||||
syncds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/sync"
|
syncds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
func testBloomCached(bs Blockstore, ctx context.Context) (*bloomcache, error) {
|
func testBloomCached(ctx context.Context, bs Blockstore) (*bloomcache, error) {
|
||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
ctx = context.TODO()
|
ctx = context.Background()
|
||||||
}
|
}
|
||||||
opts := DefaultCacheOpts()
|
opts := DefaultCacheOpts()
|
||||||
opts.HasARCCacheSize = 0
|
opts.HasARCCacheSize = 0
|
||||||
bbs, err := CachedBlockstore(bs, ctx, opts)
|
bbs, err := CachedBlockstore(ctx, bs, opts)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return bbs.(*bloomcache), nil
|
return bbs.(*bloomcache), nil
|
||||||
} else {
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPutManyAddsToBloom(t *testing.T) {
|
func TestPutManyAddsToBloom(t *testing.T) {
|
||||||
@ -34,7 +33,7 @@ func TestPutManyAddsToBloom(t *testing.T) {
|
|||||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
cachedbs, err := testBloomCached(bs, ctx)
|
cachedbs, err := testBloomCached(ctx, bs)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-cachedbs.rebuildChan:
|
case <-cachedbs.rebuildChan:
|
||||||
@ -65,7 +64,7 @@ func TestPutManyAddsToBloom(t *testing.T) {
|
|||||||
|
|
||||||
func TestReturnsErrorWhenSizeNegative(t *testing.T) {
|
func TestReturnsErrorWhenSizeNegative(t *testing.T) {
|
||||||
bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
|
bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
|
||||||
_, err := bloomCached(bs, context.TODO(), -1, 1)
|
_, err := bloomCached(context.Background(), bs, -1, 1)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fail()
|
t.Fail()
|
||||||
}
|
}
|
||||||
@ -80,7 +79,7 @@ func TestHasIsBloomCached(t *testing.T) {
|
|||||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
cachedbs, err := testBloomCached(bs, ctx)
|
cachedbs, err := testBloomCached(ctx, bs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -7,6 +7,7 @@ import (
|
|||||||
"gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
|
"gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// CacheOpts wraps options for CachedBlockStore().
|
||||||
// Next to each option is it aproximate memory usage per unit
|
// Next to each option is it aproximate memory usage per unit
|
||||||
type CacheOpts struct {
|
type CacheOpts struct {
|
||||||
HasBloomFilterSize int // 1 byte
|
HasBloomFilterSize int // 1 byte
|
||||||
@ -14,6 +15,7 @@ type CacheOpts struct {
|
|||||||
HasARCCacheSize int // 32 bytes
|
HasARCCacheSize int // 32 bytes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DefaultCacheOpts returns a CacheOpts initialized with default values.
|
||||||
func DefaultCacheOpts() CacheOpts {
|
func DefaultCacheOpts() CacheOpts {
|
||||||
return CacheOpts{
|
return CacheOpts{
|
||||||
HasBloomFilterSize: 512 << 10,
|
HasBloomFilterSize: 512 << 10,
|
||||||
@ -22,8 +24,12 @@ func DefaultCacheOpts() CacheOpts {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func CachedBlockstore(bs Blockstore,
|
// CachedBlockstore returns a blockstore wrapped in an ARCCache and
|
||||||
ctx context.Context, opts CacheOpts) (cbs Blockstore, err error) {
|
// then in a bloom filter cache, if the options indicate it.
|
||||||
|
func CachedBlockstore(
|
||||||
|
ctx context.Context,
|
||||||
|
bs Blockstore,
|
||||||
|
opts CacheOpts) (cbs Blockstore, err error) {
|
||||||
cbs = bs
|
cbs = bs
|
||||||
|
|
||||||
if opts.HasBloomFilterSize < 0 || opts.HasBloomFilterHashes < 0 ||
|
if opts.HasBloomFilterSize < 0 || opts.HasBloomFilterHashes < 0 ||
|
||||||
@ -42,7 +48,7 @@ func CachedBlockstore(bs Blockstore,
|
|||||||
}
|
}
|
||||||
if opts.HasBloomFilterSize != 0 {
|
if opts.HasBloomFilterSize != 0 {
|
||||||
// *8 because of bytes to bits conversion
|
// *8 because of bytes to bits conversion
|
||||||
cbs, err = bloomCached(cbs, ctx, opts.HasBloomFilterSize*8, opts.HasBloomFilterHashes)
|
cbs, err = bloomCached(ctx, cbs, opts.HasBloomFilterSize*8, opts.HasBloomFilterHashes)
|
||||||
}
|
}
|
||||||
|
|
||||||
return cbs, err
|
return cbs, err
|
||||||
|
@ -1,13 +1,15 @@
|
|||||||
package blockstore_util
|
// Package blockstoreutil provides utility functions for Blockstores.
|
||||||
|
package blockstoreutil
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
bs "github.com/ipfs/go-ipfs/blocks/blockstore"
|
|
||||||
"github.com/ipfs/go-ipfs/pin"
|
|
||||||
ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
|
ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
|
||||||
cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
|
cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
|
||||||
|
|
||||||
|
bs "github.com/ipfs/go-ipfs/blocks/blockstore"
|
||||||
|
"github.com/ipfs/go-ipfs/pin"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RemovedBlock is used to respresent the result of removing a block.
|
// RemovedBlock is used to respresent the result of removing a block.
|
||||||
@ -21,12 +23,17 @@ type RemovedBlock struct {
|
|||||||
Error string `json:",omitempty"`
|
Error string `json:",omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RmBlocksOpts is used to wrap options for RmBlocks().
|
||||||
type RmBlocksOpts struct {
|
type RmBlocksOpts struct {
|
||||||
Prefix string
|
Prefix string
|
||||||
Quiet bool
|
Quiet bool
|
||||||
Force bool
|
Force bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RmBlocks removes the blocks provided in the cids slice.
|
||||||
|
// It returns a channel where objects of type RemovedBlock are placed, when
|
||||||
|
// not using the Quiet option. Block removal is asynchronous and will
|
||||||
|
// skip any pinned blocks.
|
||||||
func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, cids []*cid.Cid, opts RmBlocksOpts) (<-chan interface{}, error) {
|
func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, cids []*cid.Cid, opts RmBlocksOpts) (<-chan interface{}, error) {
|
||||||
// make the channel large enough to hold any result to avoid
|
// make the channel large enough to hold any result to avoid
|
||||||
// blocking while holding the GCLock
|
// blocking while holding the GCLock
|
||||||
@ -53,6 +60,11 @@ func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, cids []*cid.Cid, opts RmB
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FilterPinned takes a slice of Cids and returns it with the pinned Cids
|
||||||
|
// removed. If a Cid is pinned, it will place RemovedBlock objects in the given
|
||||||
|
// out channel, with an error which indicates that the Cid is pinned.
|
||||||
|
// This function is used in RmBlocks to filter out any blocks which are not
|
||||||
|
// to be removed (because they are pinned).
|
||||||
func FilterPinned(pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid) []*cid.Cid {
|
func FilterPinned(pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid) []*cid.Cid {
|
||||||
stillOkay := make([]*cid.Cid, 0, len(cids))
|
stillOkay := make([]*cid.Cid, 0, len(cids))
|
||||||
res, err := pins.CheckIfPinned(cids...)
|
res, err := pins.CheckIfPinned(cids...)
|
||||||
@ -73,6 +85,9 @@ func FilterPinned(pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid) []*c
|
|||||||
return stillOkay
|
return stillOkay
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ProcRmOutput takes the channel returned by RmBlocks and writes
|
||||||
|
// to stdout/stderr according to the RemovedBlock objects received in
|
||||||
|
// that channel.
|
||||||
func ProcRmOutput(in <-chan interface{}, sout io.Writer, serr io.Writer) error {
|
func ProcRmOutput(in <-chan interface{}, sout io.Writer, serr io.Writer) error {
|
||||||
someFailed := false
|
someFailed := false
|
||||||
for res := range in {
|
for res := range in {
|
||||||
|
@ -1,20 +1,30 @@
|
|||||||
|
// Package blocksutil provides utility functions for working
|
||||||
|
// with Blocks.
|
||||||
package blocksutil
|
package blocksutil
|
||||||
|
|
||||||
import "github.com/ipfs/go-ipfs/blocks"
|
import "github.com/ipfs/go-ipfs/blocks"
|
||||||
|
|
||||||
|
// NewBlockGenerator returns an object capable of
|
||||||
|
// producing blocks.
|
||||||
func NewBlockGenerator() BlockGenerator {
|
func NewBlockGenerator() BlockGenerator {
|
||||||
return BlockGenerator{}
|
return BlockGenerator{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BlockGenerator generates BasicBlocks on demand.
|
||||||
|
// For each instace of BlockGenerator,
|
||||||
|
// each new block is different from the previous,
|
||||||
|
// although two different instances will produce the same.
|
||||||
type BlockGenerator struct {
|
type BlockGenerator struct {
|
||||||
seq int
|
seq int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Next generates a new BasicBlock.
|
||||||
func (bg *BlockGenerator) Next() *blocks.BasicBlock {
|
func (bg *BlockGenerator) Next() *blocks.BasicBlock {
|
||||||
bg.seq++
|
bg.seq++
|
||||||
return blocks.NewBlock([]byte(string(bg.seq)))
|
return blocks.NewBlock([]byte(string(bg.seq)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Blocks generates as many BasicBlocks as specified by n.
|
||||||
func (bg *BlockGenerator) Blocks(n int) []*blocks.BasicBlock {
|
func (bg *BlockGenerator) Blocks(n int) []*blocks.BasicBlock {
|
||||||
blocks := make([]*blocks.BasicBlock, 0)
|
blocks := make([]*blocks.BasicBlock, 0)
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
|
@ -1,15 +1,17 @@
|
|||||||
// package bloom implements a simple bloom filter.
|
// Package bloom implements a simple bloom filter.
|
||||||
package bloom
|
package bloom
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
// Non crypto hash, because speed
|
// Non crypto hash, because speed
|
||||||
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/mtchavez/jenkins"
|
|
||||||
"gx/ipfs/QmeWQMDa5dSdP4n8WDeoY5z8L2EKVqF4ZvK4VEHsLqXsGu/hamming"
|
"gx/ipfs/QmeWQMDa5dSdP4n8WDeoY5z8L2EKVqF4ZvK4VEHsLqXsGu/hamming"
|
||||||
"hash"
|
"hash"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/mtchavez/jenkins"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// A Filter represents a bloom filter.
|
||||||
type Filter interface {
|
type Filter interface {
|
||||||
Add([]byte)
|
Add([]byte)
|
||||||
Find([]byte) bool
|
Find([]byte) bool
|
||||||
@ -17,6 +19,8 @@ type Filter interface {
|
|||||||
HammingDistance(Filter) (int, error)
|
HammingDistance(Filter) (int, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewFilter creates a new bloom Filter with the given
|
||||||
|
// size. k (the number of hash functions), is hardcoded to 3.
|
||||||
func NewFilter(size int) Filter {
|
func NewFilter(size int) Filter {
|
||||||
return &filter{
|
return &filter{
|
||||||
hash: jenkins.New(),
|
hash: jenkins.New(),
|
||||||
@ -31,6 +35,8 @@ type filter struct {
|
|||||||
k int
|
k int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BasicFilter calls NewFilter with a bloom filter size of
|
||||||
|
// 2048 bytes.
|
||||||
func BasicFilter() Filter {
|
func BasicFilter() Filter {
|
||||||
return NewFilter(2048)
|
return NewFilter(2048)
|
||||||
}
|
}
|
||||||
@ -84,11 +90,11 @@ func (f *filter) Merge(o Filter) (Filter, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if len(casfil.filter) != len(f.filter) {
|
if len(casfil.filter) != len(f.filter) {
|
||||||
return nil, errors.New("filter lengths must match!")
|
return nil, errors.New("filter lengths must match")
|
||||||
}
|
}
|
||||||
|
|
||||||
if casfil.k != f.k {
|
if casfil.k != f.k {
|
||||||
return nil, errors.New("filter k-values must match!")
|
return nil, errors.New("filter k-values must match")
|
||||||
}
|
}
|
||||||
|
|
||||||
nfilt := new(filter)
|
nfilt := new(filter)
|
||||||
@ -110,7 +116,7 @@ func (f *filter) HammingDistance(o Filter) (int, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if len(f.filter) != len(casfil.filter) {
|
if len(f.filter) != len(casfil.filter) {
|
||||||
return 0, errors.New("filter lengths must match!")
|
return 0, errors.New("filter lengths must match")
|
||||||
}
|
}
|
||||||
|
|
||||||
acc := 0
|
acc := 0
|
||||||
|
@ -1,24 +1,30 @@
|
|||||||
// package set contains various different types of 'BlockSet's
|
// Package set defines the BlockSet interface which provides
|
||||||
|
// abstraction for sets of Cids.
|
||||||
|
// It provides a default implementation using cid.Set.
|
||||||
package set
|
package set
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/ipfs/go-ipfs/blocks/bloom"
|
|
||||||
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
|
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
|
||||||
cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
|
cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-ipfs/blocks/bloom"
|
||||||
)
|
)
|
||||||
|
|
||||||
var log = logging.Logger("blockset")
|
var log = logging.Logger("blockset")
|
||||||
|
|
||||||
// BlockSet represents a mutable set of keyed blocks
|
// BlockSet represents a mutable set of blocks CIDs.
|
||||||
type BlockSet interface {
|
type BlockSet interface {
|
||||||
AddBlock(*cid.Cid)
|
AddBlock(*cid.Cid)
|
||||||
RemoveBlock(*cid.Cid)
|
RemoveBlock(*cid.Cid)
|
||||||
HasKey(*cid.Cid) bool
|
HasKey(*cid.Cid) bool
|
||||||
|
// GetBloomFilter creates and returns a bloom filter to which
|
||||||
|
// all the CIDs in the set have been added.
|
||||||
GetBloomFilter() bloom.Filter
|
GetBloomFilter() bloom.Filter
|
||||||
|
|
||||||
GetKeys() []*cid.Cid
|
GetKeys() []*cid.Cid
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SimpleSetFromKeys returns a default implementation of BlockSet
|
||||||
|
// using cid.Set. The given keys are added to the set.
|
||||||
func SimpleSetFromKeys(keys []*cid.Cid) BlockSet {
|
func SimpleSetFromKeys(keys []*cid.Cid) BlockSet {
|
||||||
sbs := &simpleBlockSet{blocks: cid.NewSet()}
|
sbs := &simpleBlockSet{blocks: cid.NewSet()}
|
||||||
for _, k := range keys {
|
for _, k := range keys {
|
||||||
@ -27,6 +33,8 @@ func SimpleSetFromKeys(keys []*cid.Cid) BlockSet {
|
|||||||
return sbs
|
return sbs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewSimpleBlockSet returns a new empty default implementation
|
||||||
|
// of BlockSet using cid.Set.
|
||||||
func NewSimpleBlockSet() BlockSet {
|
func NewSimpleBlockSet() BlockSet {
|
||||||
return &simpleBlockSet{blocks: cid.NewSet()}
|
return &simpleBlockSet{blocks: cid.NewSet()}
|
||||||
}
|
}
|
||||||
|
@ -184,7 +184,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
|
|||||||
opts.HasBloomFilterSize = 0
|
opts.HasBloomFilterSize = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
cbs, err := bstore.CachedBlockstore(bs, ctx, opts)
|
cbs, err := bstore.CachedBlockstore(ctx, bs, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -94,8 +94,9 @@ func Session(ctx context.Context, net tn.Network, p testutil.Identity) Instance
|
|||||||
adapter := net.Adapter(p)
|
adapter := net.Adapter(p)
|
||||||
dstore := ds_sync.MutexWrap(datastore2.WithDelay(ds.NewMapDatastore(), bsdelay))
|
dstore := ds_sync.MutexWrap(datastore2.WithDelay(ds.NewMapDatastore(), bsdelay))
|
||||||
|
|
||||||
bstore, err := blockstore.CachedBlockstore(blockstore.NewBlockstore(
|
bstore, err := blockstore.CachedBlockstore(ctx,
|
||||||
ds_sync.MutexWrap(dstore)), ctx, blockstore.DefaultCacheOpts())
|
blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)),
|
||||||
|
blockstore.DefaultCacheOpts())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err.Error()) // FIXME perhaps change signature and return error.
|
panic(err.Error()) // FIXME perhaps change signature and return error.
|
||||||
}
|
}
|
||||||
|
@ -199,4 +199,9 @@ func (f *Filestore) PutMany(bs []blocks.Block) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HashOnRead calls blockstore.HashOnRead.
|
||||||
|
func (f *Filestore) HashOnRead(enabled bool) {
|
||||||
|
f.bs.HashOnRead(enabled)
|
||||||
|
}
|
||||||
|
|
||||||
var _ blockstore.Blockstore = (*Filestore)(nil)
|
var _ blockstore.Blockstore = (*Filestore)(nil)
|
||||||
|
Reference in New Issue
Block a user