1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-08-06 19:44:01 +08:00

[skip changelog] pinmfs: mitigate slow mfs writes when it triggers (#10623)

* pinmfs: mitigate slow mfs writes when it triggers

This mitigates slow mfs writes when the pinmfs daemon calls mfs.RootNode()

When writing lots of files to MFS, this call triggers a mfs directory cache
sync operations. The cache grows forever and is unbounded. The more files
added to a directory, the longer it takes. In the meantime, writing to mfs is
locked.

The pinmfs, even when no remote pinning services are configured, will trigger
this issue. When RootNode() takes more than 30 seconds, the issue will be triggered
continuously causing a write-deadlock onto MFS.

This commit does not fix the fact that if you write 1M items into an MFS
directory, the first time that the directory is traversed it will still have
to sync those 1M items. It does at least prevent writes from stalling after
about ~6000 items.

* pinmfs: fix test
This commit is contained in:
Hector Sanjuan
2024-12-17 21:59:12 +01:00
committed by GitHub
parent 07b8742177
commit 898f024f3c
2 changed files with 47 additions and 22 deletions

View File

@ -90,34 +90,46 @@ func pinMFSOnChange(cctx pinMFSContext, configPollInterval time.Duration, node p
case <-cctx.Context().Done():
return
case <-tmo.C:
tmo.Reset(configPollInterval)
}
// reread the config, which may have changed in the meantime
cfg, err := cctx.GetConfig()
if err != nil {
mfslog.Errorf("pinning reading config (%v)", err)
continue
}
mfslog.Debugf("pinning loop is awake, %d remote services", len(cfg.Pinning.RemoteServices))
// reread the config, which may have changed in the meantime
cfg, err := cctx.GetConfig()
if err != nil {
mfslog.Errorf("pinning reading config (%v)", err)
continue
// pin to all remote services in parallel
pinAllMFS(cctx.Context(), node, cfg, lastPins)
}
mfslog.Debugf("pinning loop is awake, %d remote services", len(cfg.Pinning.RemoteServices))
// get the most recent MFS root cid
rootNode, err := node.RootNode()
if err != nil {
mfslog.Errorf("pinning reading MFS root (%v)", err)
continue
}
// pin to all remote services in parallel
pinAllMFS(cctx.Context(), node, cfg, rootNode.Cid(), lastPins)
// pinAllMFS may take long. Reset interval only when we are done doing it
// so that we are not pinning constantly.
tmo.Reset(configPollInterval)
}
}
// pinAllMFS pins on all remote services in parallel to overcome DoS attacks.
func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid cid.Cid, lastPins map[string]lastPin) {
func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, lastPins map[string]lastPin) {
ch := make(chan lastPin)
var started int
// Bail out to mitigate issue below when not needing to do anything.
if len(cfg.Pinning.RemoteServices) == 0 {
return
}
// get the most recent MFS root cid.
// Warning! This can be super expensive.
// See https://github.com/ipfs/boxo/pull/751
// and https://github.com/ipfs/kubo/issues/8694
// Reading an MFS-directory nodes can take minutes due to
// ever growing cache being synced to unixfs.
rootNode, err := node.RootNode()
if err != nil {
mfslog.Errorf("pinning reading MFS root (%v)", err)
return
}
rootCid := rootNode.Cid()
for svcName, svcConfig := range cfg.Pinning.RemoteServices {
if ctx.Err() != nil {
break

View File

@ -94,11 +94,24 @@ func TestPinMFSRootNodeError(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*testConfigPollInterval)
defer cancel()
// need at least one config to trigger
cfg := &config.Config{
Pinning: config.Pinning{
RemoteServices: map[string]config.RemotePinningService{
"A": {
Policies: config.RemotePinningServicePolicies{
MFS: config.RemotePinningServiceMFSPolicy{
Enable: false,
},
},
},
},
},
}
cctx := &testPinMFSContext{
ctx: ctx,
cfg: &config.Config{
Pinning: config.Pinning{},
},
cfg: cfg,
err: nil,
}
node := &testPinMFSNode{