1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-12-12 19:59:03 +08:00

improved keystore gc process

This commit is contained in:
guillaumemichel
2025-09-01 11:58:37 +02:00
parent 138a86636a
commit b540fba1af

View File

@@ -296,6 +296,7 @@ type addrsFilter interface {
}
func SweepingProvider(cfg *config.Config) fx.Option {
reprovideInterval := cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval)
type providerInput struct {
fx.In
DHT routing.Routing `name:"dhtc"`
@@ -305,7 +306,6 @@ func SweepingProvider(cfg *config.Config) fx.Option {
keyStore, err := rds.NewKeyStore(in.Repo.Datastore(),
rds.WithPrefixBits(10),
rds.WithDatastorePrefix("/reprovider/keystore"),
rds.WithGCInterval(cfg.Reprovider.Sweep.KeyStoreGCInterval.WithDefault(cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval))),
rds.WithGCBatchSize(int(cfg.Reprovider.Sweep.KeyStoreBatchSize.WithDefault(cfg.Reprovider.Sweep.KeyStoreBatchSize.WithDefault(config.DefaultReproviderSweepKeyStoreBatchSize)))),
)
if err != nil {
@@ -322,7 +322,7 @@ func SweepingProvider(cfg *config.Config) fx.Option {
prov, err := ddhtprovider.New(inDht,
ddhtprovider.WithKeyStore(keyStore),
ddhtprovider.WithReprovideInterval(cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval)),
ddhtprovider.WithReprovideInterval(reprovideInterval),
ddhtprovider.WithMaxReprovideDelay(time.Hour),
ddhtprovider.WithOfflineDelay(cfg.Reprovider.Sweep.OfflineDelay.WithDefault(config.DefaultReproviderSweepOfflineDelay)),
ddhtprovider.WithConnectivityCheckOnlineInterval(1*time.Minute),
@@ -364,7 +364,7 @@ func SweepingProvider(cfg *config.Config) fx.Option {
}),
dhtprovider.WithReplicationFactor(amino.DefaultBucketSize),
dhtprovider.WithReprovideInterval(cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval)),
dhtprovider.WithReprovideInterval(reprovideInterval),
dhtprovider.WithMaxReprovideDelay(time.Hour),
dhtprovider.WithOfflineDelay(cfg.Reprovider.Sweep.OfflineDelay.WithDefault(config.DefaultReproviderSweepOfflineDelay)),
dhtprovider.WithConnectivityCheckOnlineInterval(1 * time.Minute),
@@ -386,28 +386,66 @@ func SweepingProvider(cfg *config.Config) fx.Option {
KeyProvider provider.KeyChanFunc
}
initKeyStore := fx.Invoke(func(lc fx.Lifecycle, in keystoreInput) {
var (
cancel context.CancelFunc
done = make(chan struct{})
)
syncKeyStore := func(ctx context.Context) error {
kcf, err := in.KeyProvider(ctx)
if err != nil {
return err
}
if err := in.KeyStore.ResetCids(ctx, kcf); err != nil {
return err
}
if err := in.Provider.RefreshSchedule(); err != nil {
logger.Infow("refreshing provider schedule", "err", err)
}
return nil
}
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
// Set the KeyProvider as a garbage collection function for the
// keystore. The KeyStore will periodically purge its keys and replace
// them with the ones coming from the KeyChanFunc, to remove CIDs that
// should stop being reprovided from its state.
in.KeyStore.SetGCFunc(in.KeyProvider)
// keystore. Periodically purge the KeyStore from all its keys and
// replace them with the keys that needs to be reprovided, coming from
// the KeyChanFunc. So far, this is the less worse way to remove CIDs
// that shouldn't be reprovided from the provider's state.
if err := syncKeyStore(ctx); err != nil {
return err
}
ch, err := in.KeyProvider(ctx)
if err != nil {
return err
}
// Initialize the KeyStore with the current keys from the KeyProvider.
err = in.KeyStore.ResetCids(ctx, ch)
if err != nil {
return err
}
// Add keys from the KeyStore to the schedule.
_ = in.Provider.RefreshSchedule()
gcCtx, c := context.WithCancel(context.Background())
cancel = c
go func() { // garbage collection loop for cids to reprovide
defer close(done)
ticker := time.NewTicker(reprovideInterval)
defer ticker.Stop()
for {
select {
case <-gcCtx.Done():
return
case <-ticker.C:
if err := syncKeyStore(gcCtx); err != nil {
logger.Errorw("provider keystore sync", "err", err)
}
}
}
}()
return nil
},
OnStop: func(_ context.Context) error {
OnStop: func(ctx context.Context) error {
if cancel != nil {
cancel()
}
select {
case <-done:
case <-ctx.Done():
return ctx.Err()
}
return in.KeyStore.Close()
},
})