mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-25 03:32:50 +08:00
Merge pull request #3162 from ipfs/fix/bs/many-caching-fix
blockstore: fix PutMany with cache logic
This commit is contained in:
blocks/blockstore
@ -3,6 +3,7 @@ package blockstore
|
|||||||
import (
|
import (
|
||||||
"github.com/ipfs/go-ipfs/blocks"
|
"github.com/ipfs/go-ipfs/blocks"
|
||||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||||
|
|
||||||
ds "gx/ipfs/QmNgqJarToRiq2GBaPJhkmW4B5BxS5B74E1rkGvv2JoaTp/go-datastore"
|
ds "gx/ipfs/QmNgqJarToRiq2GBaPJhkmW4B5BxS5B74E1rkGvv2JoaTp/go-datastore"
|
||||||
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
|
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
|
||||||
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
|
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
|
||||||
@ -95,15 +96,17 @@ func (b *arccache) Put(bl blocks.Block) error {
|
|||||||
func (b *arccache) PutMany(bs []blocks.Block) error {
|
func (b *arccache) PutMany(bs []blocks.Block) error {
|
||||||
var good []blocks.Block
|
var good []blocks.Block
|
||||||
for _, block := range bs {
|
for _, block := range bs {
|
||||||
|
// call put on block if result is inconclusive or we are sure that
|
||||||
|
// the block isn't in storage
|
||||||
if has, ok := b.hasCached(block.Key()); !ok || (ok && !has) {
|
if has, ok := b.hasCached(block.Key()); !ok || (ok && !has) {
|
||||||
good = append(good, block)
|
good = append(good, block)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err := b.blockstore.PutMany(bs)
|
err := b.blockstore.PutMany(good)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, block := range bs {
|
for _, block := range good {
|
||||||
b.arc.Add(block.Key(), true)
|
b.arc.Add(block.Key(), true)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -175,4 +175,10 @@ func TestPutManyCaches(t *testing.T) {
|
|||||||
|
|
||||||
trap("has hit datastore", cd, t)
|
trap("has hit datastore", cd, t)
|
||||||
arc.Has(exampleBlock.Key())
|
arc.Has(exampleBlock.Key())
|
||||||
|
untrap(cd)
|
||||||
|
arc.DeleteBlock(exampleBlock.Key())
|
||||||
|
|
||||||
|
arc.Put(exampleBlock)
|
||||||
|
trap("PunMany has hit datastore", cd, t)
|
||||||
|
arc.PutMany([]blocks.Block{exampleBlock})
|
||||||
}
|
}
|
||||||
|
@ -1,12 +1,13 @@
|
|||||||
package blockstore
|
package blockstore
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/ipfs/go-ipfs/blocks"
|
"github.com/ipfs/go-ipfs/blocks"
|
||||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||||
|
|
||||||
bloom "gx/ipfs/QmWQ2SJisXwcCLsUXLwYCKSfyExXjFRW2WbBH5sqCUnwX5/bbloom"
|
bloom "gx/ipfs/QmWQ2SJisXwcCLsUXLwYCKSfyExXjFRW2WbBH5sqCUnwX5/bbloom"
|
||||||
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
|
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
|
||||||
|
|
||||||
"sync/atomic"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// bloomCached returns Blockstore that caches Has requests using Bloom filter
|
// bloomCached returns Blockstore that caches Has requests using Bloom filter
|
||||||
@ -126,20 +127,19 @@ func (b *bloomcache) Put(bl blocks.Block) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (b *bloomcache) PutMany(bs []blocks.Block) error {
|
func (b *bloomcache) PutMany(bs []blocks.Block) error {
|
||||||
var good []blocks.Block
|
// bloom cache gives only conclusive resulty if key is not contained
|
||||||
for _, block := range bs {
|
// to reduce number of puts we need conclusive infomration if block is contained
|
||||||
if has, ok := b.hasCached(block.Key()); !ok || (ok && !has) {
|
// this means that PutMany can't be improved with bloom cache so we just
|
||||||
good = append(good, block)
|
// just do a passthrough.
|
||||||
}
|
|
||||||
}
|
|
||||||
err := b.blockstore.PutMany(bs)
|
err := b.blockstore.PutMany(bs)
|
||||||
if err == nil {
|
if err != nil {
|
||||||
for _, block := range bs {
|
|
||||||
b.bloom.AddTS([]byte(block.Key()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
for _, bl := range bs {
|
||||||
|
b.bloom.AddTS([]byte(bl.Key()))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (b *bloomcache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
|
func (b *bloomcache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
|
||||||
return b.blockstore.AllKeysChan(ctx)
|
return b.blockstore.AllKeysChan(ctx)
|
||||||
|
@ -28,6 +28,39 @@ func testBloomCached(bs GCBlockstore, ctx context.Context) (*bloomcache, error)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPutManyAddsToBloom(t *testing.T) {
|
||||||
|
bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
|
||||||
|
|
||||||
|
ctx, _ := context.WithTimeout(context.Background(), 1*time.Second)
|
||||||
|
cachedbs, err := testBloomCached(bs, ctx)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-cachedbs.rebuildChan:
|
||||||
|
case <-ctx.Done():
|
||||||
|
t.Fatalf("Timeout wating for rebuild: %d", cachedbs.bloom.ElementsAdded())
|
||||||
|
}
|
||||||
|
|
||||||
|
block1 := blocks.NewBlock([]byte("foo"))
|
||||||
|
block2 := blocks.NewBlock([]byte("bar"))
|
||||||
|
|
||||||
|
cachedbs.PutMany([]blocks.Block{block1})
|
||||||
|
has, err := cachedbs.Has(block1.Key())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if has == false {
|
||||||
|
t.Fatal("added block is reported missing")
|
||||||
|
}
|
||||||
|
|
||||||
|
has, err = cachedbs.Has(block2.Key())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if has == true {
|
||||||
|
t.Fatal("not added block is reported to be in blockstore")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestReturnsErrorWhenSizeNegative(t *testing.T) {
|
func TestReturnsErrorWhenSizeNegative(t *testing.T) {
|
||||||
bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
|
bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
|
||||||
_, err := bloomCached(bs, context.TODO(), -1, 1)
|
_, err := bloomCached(bs, context.TODO(), -1, 1)
|
||||||
|
Reference in New Issue
Block a user