mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-24 02:39:25 +08:00
blocks/blockstore: introduce context passing to blockstore
License: MIT Signed-off-by: Jakub Sztandera <kubuxu@protonmail.ch>
This commit is contained in:
@ -11,10 +11,10 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
// BloomCached returns Blockstore that caches Has requests using Bloom filter
|
// bloomCached returns Blockstore that caches Has requests using Bloom filter
|
||||||
// Size is size of bloom filter in bytes
|
// Size is size of bloom filter in bytes
|
||||||
func BloomCached(bs Blockstore, bloomSize, lruSize int) (*bloomcache, error) {
|
func bloomCached(bs Blockstore, ctx context.Context, bloomSize, hashCount, lruSize int) (*bloomcache, error) {
|
||||||
bl, err := bloom.New(float64(bloomSize), float64(7))
|
bl, err := bloom.New(float64(bloomSize), float64(hashCount))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -24,7 +24,7 @@ func BloomCached(bs Blockstore, bloomSize, lruSize int) (*bloomcache, error) {
|
|||||||
}
|
}
|
||||||
bc := &bloomcache{blockstore: bs, bloom: bl, arc: arc}
|
bc := &bloomcache{blockstore: bs, bloom: bl, arc: arc}
|
||||||
bc.Invalidate()
|
bc.Invalidate()
|
||||||
go bc.Rebuild()
|
go bc.Rebuild(ctx)
|
||||||
|
|
||||||
return bc, nil
|
return bc, nil
|
||||||
}
|
}
|
||||||
@ -52,8 +52,7 @@ func (b *bloomcache) BloomActive() bool {
|
|||||||
return atomic.LoadInt32(&b.active) != 0
|
return atomic.LoadInt32(&b.active) != 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *bloomcache) Rebuild() {
|
func (b *bloomcache) Rebuild(ctx context.Context) {
|
||||||
ctx := context.TODO()
|
|
||||||
evt := log.EventBegin(ctx, "bloomcache.Rebuild")
|
evt := log.EventBegin(ctx, "bloomcache.Rebuild")
|
||||||
defer evt.Done()
|
defer evt.Done()
|
||||||
|
|
||||||
@ -62,8 +61,19 @@ func (b *bloomcache) Rebuild() {
|
|||||||
log.Errorf("AllKeysChan failed in bloomcache rebuild with: %v", err)
|
log.Errorf("AllKeysChan failed in bloomcache rebuild with: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for key := range ch {
|
finish := false
|
||||||
b.bloom.AddTS([]byte(key)) // Use binary key, the more compact the better
|
for !finish {
|
||||||
|
select {
|
||||||
|
case key, ok := <-ch:
|
||||||
|
if ok {
|
||||||
|
b.bloom.AddTS([]byte(key)) // Use binary key, the more compact the better
|
||||||
|
} else {
|
||||||
|
finish = true
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.Warning("Cache rebuild closed by context finishing.")
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
close(b.rebuildChan)
|
close(b.rebuildChan)
|
||||||
atomic.StoreInt32(&b.active, 1)
|
atomic.StoreInt32(&b.active, 1)
|
||||||
|
@ -8,18 +8,29 @@ import (
|
|||||||
|
|
||||||
"github.com/ipfs/go-ipfs/blocks"
|
"github.com/ipfs/go-ipfs/blocks"
|
||||||
|
|
||||||
|
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
|
||||||
ds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore"
|
ds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore"
|
||||||
dsq "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/query"
|
dsq "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/query"
|
||||||
syncds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/sync"
|
syncds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func testBloomCached(bs GCBlockstore, ctx context.Context) (*bloomcache, error) {
|
||||||
|
opts := DefaultCacheOpts()
|
||||||
|
bbs, err := CachedBlockstore(bs, ctx, opts)
|
||||||
|
if err == nil {
|
||||||
|
return bbs.(*bloomcache), nil
|
||||||
|
} else {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestReturnsErrorWhenSizeNegative(t *testing.T) {
|
func TestReturnsErrorWhenSizeNegative(t *testing.T) {
|
||||||
bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
|
bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
|
||||||
_, err := BloomCached(bs, 100, -1)
|
_, err := bloomCached(bs, nil, 100, 1, -1)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fail()
|
t.Fail()
|
||||||
}
|
}
|
||||||
_, err = BloomCached(bs, -1, 100)
|
_, err = bloomCached(bs, nil, -1, 1, 100)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fail()
|
t.Fail()
|
||||||
}
|
}
|
||||||
@ -29,7 +40,7 @@ func TestRemoveCacheEntryOnDelete(t *testing.T) {
|
|||||||
b := blocks.NewBlock([]byte("foo"))
|
b := blocks.NewBlock([]byte("foo"))
|
||||||
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
|
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
|
||||||
bs := NewBlockstore(syncds.MutexWrap(cd))
|
bs := NewBlockstore(syncds.MutexWrap(cd))
|
||||||
cachedbs, err := BloomCached(bs, 1, 1)
|
cachedbs, err := testBloomCached(bs, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -53,7 +64,7 @@ func TestRemoveCacheEntryOnDelete(t *testing.T) {
|
|||||||
func TestElideDuplicateWrite(t *testing.T) {
|
func TestElideDuplicateWrite(t *testing.T) {
|
||||||
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
|
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
|
||||||
bs := NewBlockstore(syncds.MutexWrap(cd))
|
bs := NewBlockstore(syncds.MutexWrap(cd))
|
||||||
cachedbs, err := BloomCached(bs, 1, 1)
|
cachedbs, err := testBloomCached(bs, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -73,14 +84,15 @@ func TestHasIsBloomCached(t *testing.T) {
|
|||||||
for i := 0; i < 1000; i++ {
|
for i := 0; i < 1000; i++ {
|
||||||
bs.Put(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i))))
|
bs.Put(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i))))
|
||||||
}
|
}
|
||||||
cachedbs, err := BloomCached(bs, 256*1024, 128)
|
ctx, _ := context.WithTimeout(context.Background(), 1*time.Second)
|
||||||
|
cachedbs, err := testBloomCached(bs, ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-cachedbs.rebuildChan:
|
case <-cachedbs.rebuildChan:
|
||||||
case <-time.After(1 * time.Second):
|
case <-ctx.Done():
|
||||||
t.Fatalf("Timeout wating for rebuild: %d", cachedbs.bloom.ElementsAdded())
|
t.Fatalf("Timeout wating for rebuild: %d", cachedbs.bloom.ElementsAdded())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,5 +1,11 @@
|
|||||||
package blockstore
|
package blockstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
|
||||||
|
)
|
||||||
|
|
||||||
// Next to each option is it aproximate memory usage per unit
|
// Next to each option is it aproximate memory usage per unit
|
||||||
type CacheOpts struct {
|
type CacheOpts struct {
|
||||||
HasBloomFilterSize int // 1 bit
|
HasBloomFilterSize int // 1 bit
|
||||||
@ -14,3 +20,23 @@ func DefaultCacheOpts() CacheOpts {
|
|||||||
HasARCCacheSize: 64 * 1024,
|
HasARCCacheSize: 64 * 1024,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func CachedBlockstore(bs GCBlockstore,
|
||||||
|
ctx context.Context, opts CacheOpts) (cbs GCBlockstore, err error) {
|
||||||
|
if ctx == nil {
|
||||||
|
ctx = context.TODO() // For tests
|
||||||
|
}
|
||||||
|
|
||||||
|
if opts.HasBloomFilterSize < 0 || opts.HasBloomFilterHashes < 0 ||
|
||||||
|
opts.HasARCCacheSize < 0 {
|
||||||
|
return nil, errors.New("all options for cache need to be greater than zero")
|
||||||
|
}
|
||||||
|
|
||||||
|
if opts.HasBloomFilterSize != 0 && opts.HasBloomFilterHashes == 0 {
|
||||||
|
return nil, errors.New("bloom filter hash count can't be 0 when there is size set")
|
||||||
|
}
|
||||||
|
cbs, err = bloomCached(bs, ctx, opts.HasBloomFilterSize, opts.HasBloomFilterHashes,
|
||||||
|
opts.HasARCCacheSize)
|
||||||
|
|
||||||
|
return cbs, err
|
||||||
|
}
|
||||||
|
@ -131,7 +131,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
|
|||||||
|
|
||||||
var err error
|
var err error
|
||||||
bs := bstore.NewBlockstore(n.Repo.Datastore())
|
bs := bstore.NewBlockstore(n.Repo.Datastore())
|
||||||
n.Blockstore, err = bstore.BloomCached(bs, 256*1024, kSizeBlockstoreWriteCache)
|
n.Blockstore, err = bstore.CachedBlockstore(bs, ctx, bstore.DefaultCacheOpts())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -93,7 +93,8 @@ func Session(ctx context.Context, net tn.Network, p testutil.Identity) Instance
|
|||||||
adapter := net.Adapter(p)
|
adapter := net.Adapter(p)
|
||||||
dstore := ds_sync.MutexWrap(datastore2.WithDelay(ds.NewMapDatastore(), bsdelay))
|
dstore := ds_sync.MutexWrap(datastore2.WithDelay(ds.NewMapDatastore(), bsdelay))
|
||||||
|
|
||||||
bstore, err := blockstore.BloomCached(blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)), bloomSize, writeCacheElems)
|
bstore, err := blockstore.CachedBlockstore(blockstore.NewBlockstore(
|
||||||
|
ds_sync.MutexWrap(dstore)), ctx, blockstore.DefaultCacheOpts())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err.Error()) // FIXME perhaps change signature and return error.
|
panic(err.Error()) // FIXME perhaps change signature and return error.
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user