mirror of
https://github.com/ipfs/kubo.git
synced 2025-12-12 19:59:03 +08:00
improved keystore gc process
This commit is contained in:
@@ -296,6 +296,7 @@ type addrsFilter interface {
|
||||
}
|
||||
|
||||
func SweepingProvider(cfg *config.Config) fx.Option {
|
||||
reprovideInterval := cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval)
|
||||
type providerInput struct {
|
||||
fx.In
|
||||
DHT routing.Routing `name:"dhtc"`
|
||||
@@ -305,7 +306,6 @@ func SweepingProvider(cfg *config.Config) fx.Option {
|
||||
keyStore, err := rds.NewKeyStore(in.Repo.Datastore(),
|
||||
rds.WithPrefixBits(10),
|
||||
rds.WithDatastorePrefix("/reprovider/keystore"),
|
||||
rds.WithGCInterval(cfg.Reprovider.Sweep.KeyStoreGCInterval.WithDefault(cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval))),
|
||||
rds.WithGCBatchSize(int(cfg.Reprovider.Sweep.KeyStoreBatchSize.WithDefault(cfg.Reprovider.Sweep.KeyStoreBatchSize.WithDefault(config.DefaultReproviderSweepKeyStoreBatchSize)))),
|
||||
)
|
||||
if err != nil {
|
||||
@@ -322,7 +322,7 @@ func SweepingProvider(cfg *config.Config) fx.Option {
|
||||
prov, err := ddhtprovider.New(inDht,
|
||||
ddhtprovider.WithKeyStore(keyStore),
|
||||
|
||||
ddhtprovider.WithReprovideInterval(cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval)),
|
||||
ddhtprovider.WithReprovideInterval(reprovideInterval),
|
||||
ddhtprovider.WithMaxReprovideDelay(time.Hour),
|
||||
ddhtprovider.WithOfflineDelay(cfg.Reprovider.Sweep.OfflineDelay.WithDefault(config.DefaultReproviderSweepOfflineDelay)),
|
||||
ddhtprovider.WithConnectivityCheckOnlineInterval(1*time.Minute),
|
||||
@@ -364,7 +364,7 @@ func SweepingProvider(cfg *config.Config) fx.Option {
|
||||
}),
|
||||
|
||||
dhtprovider.WithReplicationFactor(amino.DefaultBucketSize),
|
||||
dhtprovider.WithReprovideInterval(cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval)),
|
||||
dhtprovider.WithReprovideInterval(reprovideInterval),
|
||||
dhtprovider.WithMaxReprovideDelay(time.Hour),
|
||||
dhtprovider.WithOfflineDelay(cfg.Reprovider.Sweep.OfflineDelay.WithDefault(config.DefaultReproviderSweepOfflineDelay)),
|
||||
dhtprovider.WithConnectivityCheckOnlineInterval(1 * time.Minute),
|
||||
@@ -386,28 +386,66 @@ func SweepingProvider(cfg *config.Config) fx.Option {
|
||||
KeyProvider provider.KeyChanFunc
|
||||
}
|
||||
initKeyStore := fx.Invoke(func(lc fx.Lifecycle, in keystoreInput) {
|
||||
var (
|
||||
cancel context.CancelFunc
|
||||
done = make(chan struct{})
|
||||
)
|
||||
|
||||
syncKeyStore := func(ctx context.Context) error {
|
||||
kcf, err := in.KeyProvider(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := in.KeyStore.ResetCids(ctx, kcf); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := in.Provider.RefreshSchedule(); err != nil {
|
||||
logger.Infow("refreshing provider schedule", "err", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
// Set the KeyProvider as a garbage collection function for the
|
||||
// keystore. The KeyStore will periodically purge its keys and replace
|
||||
// them with the ones coming from the KeyChanFunc, to remove CIDs that
|
||||
// should stop being reprovided from its state.
|
||||
in.KeyStore.SetGCFunc(in.KeyProvider)
|
||||
// keystore. Periodically purge the KeyStore from all its keys and
|
||||
// replace them with the keys that needs to be reprovided, coming from
|
||||
// the KeyChanFunc. So far, this is the less worse way to remove CIDs
|
||||
// that shouldn't be reprovided from the provider's state.
|
||||
if err := syncKeyStore(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ch, err := in.KeyProvider(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Initialize the KeyStore with the current keys from the KeyProvider.
|
||||
err = in.KeyStore.ResetCids(ctx, ch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Add keys from the KeyStore to the schedule.
|
||||
_ = in.Provider.RefreshSchedule()
|
||||
gcCtx, c := context.WithCancel(context.Background())
|
||||
cancel = c
|
||||
|
||||
go func() { // garbage collection loop for cids to reprovide
|
||||
defer close(done)
|
||||
ticker := time.NewTicker(reprovideInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-gcCtx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := syncKeyStore(gcCtx); err != nil {
|
||||
logger.Errorw("provider keystore sync", "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
},
|
||||
OnStop: func(_ context.Context) error {
|
||||
OnStop: func(ctx context.Context) error {
|
||||
if cancel != nil {
|
||||
cancel()
|
||||
}
|
||||
select {
|
||||
case <-done:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
return in.KeyStore.Close()
|
||||
},
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user