1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-12-13 21:10:40 +08:00

improved keystore gc process

This commit is contained in:
guillaumemichel
2025-09-01 11:58:37 +02:00
parent 138a86636a
commit b540fba1af

View File

@@ -296,6 +296,7 @@ type addrsFilter interface {
} }
func SweepingProvider(cfg *config.Config) fx.Option { func SweepingProvider(cfg *config.Config) fx.Option {
reprovideInterval := cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval)
type providerInput struct { type providerInput struct {
fx.In fx.In
DHT routing.Routing `name:"dhtc"` DHT routing.Routing `name:"dhtc"`
@@ -305,7 +306,6 @@ func SweepingProvider(cfg *config.Config) fx.Option {
keyStore, err := rds.NewKeyStore(in.Repo.Datastore(), keyStore, err := rds.NewKeyStore(in.Repo.Datastore(),
rds.WithPrefixBits(10), rds.WithPrefixBits(10),
rds.WithDatastorePrefix("/reprovider/keystore"), rds.WithDatastorePrefix("/reprovider/keystore"),
rds.WithGCInterval(cfg.Reprovider.Sweep.KeyStoreGCInterval.WithDefault(cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval))),
rds.WithGCBatchSize(int(cfg.Reprovider.Sweep.KeyStoreBatchSize.WithDefault(cfg.Reprovider.Sweep.KeyStoreBatchSize.WithDefault(config.DefaultReproviderSweepKeyStoreBatchSize)))), rds.WithGCBatchSize(int(cfg.Reprovider.Sweep.KeyStoreBatchSize.WithDefault(cfg.Reprovider.Sweep.KeyStoreBatchSize.WithDefault(config.DefaultReproviderSweepKeyStoreBatchSize)))),
) )
if err != nil { if err != nil {
@@ -322,7 +322,7 @@ func SweepingProvider(cfg *config.Config) fx.Option {
prov, err := ddhtprovider.New(inDht, prov, err := ddhtprovider.New(inDht,
ddhtprovider.WithKeyStore(keyStore), ddhtprovider.WithKeyStore(keyStore),
ddhtprovider.WithReprovideInterval(cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval)), ddhtprovider.WithReprovideInterval(reprovideInterval),
ddhtprovider.WithMaxReprovideDelay(time.Hour), ddhtprovider.WithMaxReprovideDelay(time.Hour),
ddhtprovider.WithOfflineDelay(cfg.Reprovider.Sweep.OfflineDelay.WithDefault(config.DefaultReproviderSweepOfflineDelay)), ddhtprovider.WithOfflineDelay(cfg.Reprovider.Sweep.OfflineDelay.WithDefault(config.DefaultReproviderSweepOfflineDelay)),
ddhtprovider.WithConnectivityCheckOnlineInterval(1*time.Minute), ddhtprovider.WithConnectivityCheckOnlineInterval(1*time.Minute),
@@ -364,7 +364,7 @@ func SweepingProvider(cfg *config.Config) fx.Option {
}), }),
dhtprovider.WithReplicationFactor(amino.DefaultBucketSize), dhtprovider.WithReplicationFactor(amino.DefaultBucketSize),
dhtprovider.WithReprovideInterval(cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval)), dhtprovider.WithReprovideInterval(reprovideInterval),
dhtprovider.WithMaxReprovideDelay(time.Hour), dhtprovider.WithMaxReprovideDelay(time.Hour),
dhtprovider.WithOfflineDelay(cfg.Reprovider.Sweep.OfflineDelay.WithDefault(config.DefaultReproviderSweepOfflineDelay)), dhtprovider.WithOfflineDelay(cfg.Reprovider.Sweep.OfflineDelay.WithDefault(config.DefaultReproviderSweepOfflineDelay)),
dhtprovider.WithConnectivityCheckOnlineInterval(1 * time.Minute), dhtprovider.WithConnectivityCheckOnlineInterval(1 * time.Minute),
@@ -386,28 +386,66 @@ func SweepingProvider(cfg *config.Config) fx.Option {
KeyProvider provider.KeyChanFunc KeyProvider provider.KeyChanFunc
} }
initKeyStore := fx.Invoke(func(lc fx.Lifecycle, in keystoreInput) { initKeyStore := fx.Invoke(func(lc fx.Lifecycle, in keystoreInput) {
var (
cancel context.CancelFunc
done = make(chan struct{})
)
syncKeyStore := func(ctx context.Context) error {
kcf, err := in.KeyProvider(ctx)
if err != nil {
return err
}
if err := in.KeyStore.ResetCids(ctx, kcf); err != nil {
return err
}
if err := in.Provider.RefreshSchedule(); err != nil {
logger.Infow("refreshing provider schedule", "err", err)
}
return nil
}
lc.Append(fx.Hook{ lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error { OnStart: func(ctx context.Context) error {
// Set the KeyProvider as a garbage collection function for the // Set the KeyProvider as a garbage collection function for the
// keystore. The KeyStore will periodically purge its keys and replace // keystore. Periodically purge the KeyStore from all its keys and
// them with the ones coming from the KeyChanFunc, to remove CIDs that // replace them with the keys that needs to be reprovided, coming from
// should stop being reprovided from its state. // the KeyChanFunc. So far, this is the less worse way to remove CIDs
in.KeyStore.SetGCFunc(in.KeyProvider) // that shouldn't be reprovided from the provider's state.
if err := syncKeyStore(ctx); err != nil {
return err
}
ch, err := in.KeyProvider(ctx) gcCtx, c := context.WithCancel(context.Background())
if err != nil { cancel = c
return err
} go func() { // garbage collection loop for cids to reprovide
// Initialize the KeyStore with the current keys from the KeyProvider. defer close(done)
err = in.KeyStore.ResetCids(ctx, ch) ticker := time.NewTicker(reprovideInterval)
if err != nil { defer ticker.Stop()
return err
} for {
// Add keys from the KeyStore to the schedule. select {
_ = in.Provider.RefreshSchedule() case <-gcCtx.Done():
return
case <-ticker.C:
if err := syncKeyStore(gcCtx); err != nil {
logger.Errorw("provider keystore sync", "err", err)
}
}
}
}()
return nil return nil
}, },
OnStop: func(_ context.Context) error { OnStop: func(ctx context.Context) error {
if cancel != nil {
cancel()
}
select {
case <-done:
case <-ctx.Done():
return ctx.Err()
}
return in.KeyStore.Close() return in.KeyStore.Close()
}, },
}) })