mirror of
https://github.com/ipfs/kubo.git
synced 2025-08-06 19:44:01 +08:00
feat: expose BlockKeyCacheSize and enable WriteThrough datastore options (#10614)
* feat: expose BlockKeyCacheSize and enable WriteThrough when bloom filter disabled * import/config: add BatchMaxSize and BatchMaxNodes * config: make BlockKeyCacheSize an OptionalInteger * config: add and wire datastore.WriteThrough option * config: omitempty on BlockKeyCacheSize * changelog: rewrite entry about new options for the datastore * config: add docs for BatchMaxNodes and BatchMaxSize * config: make WriteThrough an optional Flag * changelog: improve description of new datastore/import options * refactor: DefaultWriteThrough as bool * chore: boxo v0.26.0 * docs: config and changelog fixes
This commit is contained in:
@ -11,6 +11,7 @@ import (
|
||||
cmds "github.com/ipfs/go-ipfs-cmds"
|
||||
ipld "github.com/ipfs/go-ipld-format"
|
||||
ipldlegacy "github.com/ipfs/go-ipld-legacy"
|
||||
"github.com/ipfs/kubo/config"
|
||||
"github.com/ipfs/kubo/core/coreiface/options"
|
||||
gocarv2 "github.com/ipld/go-car/v2"
|
||||
|
||||
@ -24,6 +25,11 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment
|
||||
return err
|
||||
}
|
||||
|
||||
cfg, err := node.Repo.Config()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
api, err := cmdenv.GetApi(env, req)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -55,7 +61,14 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment
|
||||
// this is *not* a transaction
|
||||
// it is simply a way to relieve pressure on the blockstore
|
||||
// similar to pinner.Pin/pinner.Flush
|
||||
batch := ipld.NewBatch(req.Context, api.Dag())
|
||||
batch := ipld.NewBatch(req.Context, api.Dag(),
|
||||
// Default: 128. Means 128 file descriptors needed in flatfs
|
||||
ipld.MaxNodesBatchOption(int(cfg.Import.BatchMaxNodes.WithDefault(config.DefaultBatchMaxNodes))),
|
||||
// Default 100MiB. When setting block size to 1MiB, we can add
|
||||
// ~100 nodes maximum. With default 256KiB block-size, we will
|
||||
// hit the max nodes limit at 32MiB.p
|
||||
ipld.MaxSizeBatchOption(int(cfg.Import.BatchMaxSize.WithDefault(config.DefaultBatchMaxSize))),
|
||||
)
|
||||
|
||||
roots := cid.NewSet()
|
||||
var blockCount, blockBytesCount uint64
|
||||
|
@ -207,12 +207,12 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e
|
||||
return nil
|
||||
}
|
||||
|
||||
if settings.Offline {
|
||||
cfg, err := n.Repo.Config()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cfg, err := n.Repo.Config()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if settings.Offline {
|
||||
cs := cfg.Ipns.ResolveCacheSize
|
||||
if cs == 0 {
|
||||
cs = node.DefaultIpnsCacheSize
|
||||
@ -244,7 +244,9 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e
|
||||
|
||||
if settings.Offline || !settings.FetchBlocks {
|
||||
subAPI.exchange = offlinexch.Exchange(subAPI.blockstore)
|
||||
subAPI.blocks = bserv.New(subAPI.blockstore, subAPI.exchange)
|
||||
subAPI.blocks = bserv.New(subAPI.blockstore, subAPI.exchange,
|
||||
bserv.WriteThrough(cfg.Datastore.WriteThrough.WithDefault(config.DefaultWriteThrough)),
|
||||
)
|
||||
subAPI.dag = dag.NewDAGService(subAPI.blocks)
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
ds "github.com/ipfs/go-datastore"
|
||||
dssync "github.com/ipfs/go-datastore/sync"
|
||||
ipld "github.com/ipfs/go-ipld-format"
|
||||
"github.com/ipfs/kubo/config"
|
||||
coreiface "github.com/ipfs/kubo/core/coreiface"
|
||||
options "github.com/ipfs/kubo/core/coreiface/options"
|
||||
"github.com/ipfs/kubo/core/coreunix"
|
||||
@ -85,13 +86,15 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
|
||||
if settings.OnlyHash {
|
||||
// setup a /dev/null pipeline to simulate adding the data
|
||||
dstore := dssync.MutexWrap(ds.NewNullDatastore())
|
||||
bs := bstore.NewBlockstore(dstore, bstore.WriteThrough(true))
|
||||
addblockstore = bstore.NewGCBlockstore(bs, nil) // gclocker will never be used
|
||||
exch = nil // exchange will never be used
|
||||
pinning = nil // pinner will never be used
|
||||
bs := bstore.NewBlockstore(dstore, bstore.WriteThrough(true)) // we use NewNullDatastore, so ok to always WriteThrough when OnlyHash
|
||||
addblockstore = bstore.NewGCBlockstore(bs, nil) // gclocker will never be used
|
||||
exch = nil // exchange will never be used
|
||||
pinning = nil // pinner will never be used
|
||||
}
|
||||
|
||||
bserv := blockservice.New(addblockstore, exch) // hash security 001
|
||||
bserv := blockservice.New(addblockstore, exch,
|
||||
blockservice.WriteThrough(cfg.Datastore.WriteThrough.WithDefault(config.DefaultWriteThrough)),
|
||||
) // hash security 001
|
||||
dserv := merkledag.NewDAGService(bserv)
|
||||
|
||||
// add a sync call to the DagService
|
||||
|
@ -24,21 +24,26 @@ import (
|
||||
dagpb "github.com/ipld/go-codec-dagpb"
|
||||
"go.uber.org/fx"
|
||||
|
||||
"github.com/ipfs/kubo/config"
|
||||
"github.com/ipfs/kubo/core/node/helpers"
|
||||
"github.com/ipfs/kubo/repo"
|
||||
)
|
||||
|
||||
// BlockService creates new blockservice which provides an interface to fetch content-addressable blocks
|
||||
func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
|
||||
bsvc := blockservice.New(bs, rem)
|
||||
func BlockService(cfg *config.Config) func(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
|
||||
return func(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
|
||||
bsvc := blockservice.New(bs, rem,
|
||||
blockservice.WriteThrough(cfg.Datastore.WriteThrough.WithDefault(config.DefaultWriteThrough)),
|
||||
)
|
||||
|
||||
lc.Append(fx.Hook{
|
||||
OnStop: func(ctx context.Context) error {
|
||||
return bsvc.Close()
|
||||
},
|
||||
})
|
||||
lc.Append(fx.Hook{
|
||||
OnStop: func(ctx context.Context) error {
|
||||
return bsvc.Close()
|
||||
},
|
||||
})
|
||||
|
||||
return bsvc
|
||||
return bsvc
|
||||
}
|
||||
}
|
||||
|
||||
// Pinning creates new pinner which tells GC which blocks should be kept
|
||||
|
@ -189,6 +189,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part
|
||||
func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option {
|
||||
cacheOpts := blockstore.DefaultCacheOpts()
|
||||
cacheOpts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize
|
||||
cacheOpts.HasTwoQueueCacheSize = int(cfg.Datastore.BlockKeyCacheSize.WithDefault(config.DefaultBlockKeyCacheSize))
|
||||
if !bcfg.Permanent {
|
||||
cacheOpts.HasBloomFilterSize = 0
|
||||
}
|
||||
@ -201,7 +202,7 @@ func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option {
|
||||
return fx.Options(
|
||||
fx.Provide(RepoConfig),
|
||||
fx.Provide(Datastore),
|
||||
fx.Provide(BaseBlockstoreCtor(cacheOpts, cfg.Datastore.HashOnRead)),
|
||||
fx.Provide(BaseBlockstoreCtor(cacheOpts, cfg.Datastore.HashOnRead, cfg.Datastore.WriteThrough.WithDefault(config.DefaultWriteThrough))),
|
||||
finalBstore,
|
||||
)
|
||||
}
|
||||
@ -332,7 +333,6 @@ func Offline(cfg *config.Config) fx.Option {
|
||||
|
||||
// Core groups basic IPFS services
|
||||
var Core = fx.Options(
|
||||
fx.Provide(BlockService),
|
||||
fx.Provide(Dag),
|
||||
fx.Provide(FetcherConfig),
|
||||
fx.Provide(PathResolverConfig),
|
||||
@ -387,7 +387,7 @@ func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option {
|
||||
Identity(cfg),
|
||||
IPNS,
|
||||
Networked(bcfg, cfg, userResourceOverrides),
|
||||
|
||||
fx.Provide(BlockService(cfg)),
|
||||
Core,
|
||||
)
|
||||
}
|
||||
|
@ -27,10 +27,12 @@ func Datastore(repo repo.Repo) datastore.Datastore {
|
||||
type BaseBlocks blockstore.Blockstore
|
||||
|
||||
// BaseBlockstoreCtor creates cached blockstore backed by the provided datastore
|
||||
func BaseBlockstoreCtor(cacheOpts blockstore.CacheOpts, hashOnRead bool) func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) {
|
||||
func BaseBlockstoreCtor(cacheOpts blockstore.CacheOpts, hashOnRead bool, writeThrough bool) func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) {
|
||||
return func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) {
|
||||
// hash security
|
||||
bs = blockstore.NewBlockstore(repo.Datastore())
|
||||
bs = blockstore.NewBlockstore(repo.Datastore(),
|
||||
blockstore.WriteThrough(writeThrough),
|
||||
)
|
||||
bs = &verifbs.VerifBS{Blockstore: bs}
|
||||
bs, err = blockstore.CachedBlockstore(helpers.LifecycleCtx(mctx, lc), bs, cacheOpts)
|
||||
if err != nil {
|
||||
|
Reference in New Issue
Block a user