mirror of
https://github.com/ipfs/kubo.git
synced 2025-12-13 21:10:40 +08:00
improved keystore gc process
This commit is contained in:
@@ -296,6 +296,7 @@ type addrsFilter interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func SweepingProvider(cfg *config.Config) fx.Option {
|
func SweepingProvider(cfg *config.Config) fx.Option {
|
||||||
|
reprovideInterval := cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval)
|
||||||
type providerInput struct {
|
type providerInput struct {
|
||||||
fx.In
|
fx.In
|
||||||
DHT routing.Routing `name:"dhtc"`
|
DHT routing.Routing `name:"dhtc"`
|
||||||
@@ -305,7 +306,6 @@ func SweepingProvider(cfg *config.Config) fx.Option {
|
|||||||
keyStore, err := rds.NewKeyStore(in.Repo.Datastore(),
|
keyStore, err := rds.NewKeyStore(in.Repo.Datastore(),
|
||||||
rds.WithPrefixBits(10),
|
rds.WithPrefixBits(10),
|
||||||
rds.WithDatastorePrefix("/reprovider/keystore"),
|
rds.WithDatastorePrefix("/reprovider/keystore"),
|
||||||
rds.WithGCInterval(cfg.Reprovider.Sweep.KeyStoreGCInterval.WithDefault(cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval))),
|
|
||||||
rds.WithGCBatchSize(int(cfg.Reprovider.Sweep.KeyStoreBatchSize.WithDefault(cfg.Reprovider.Sweep.KeyStoreBatchSize.WithDefault(config.DefaultReproviderSweepKeyStoreBatchSize)))),
|
rds.WithGCBatchSize(int(cfg.Reprovider.Sweep.KeyStoreBatchSize.WithDefault(cfg.Reprovider.Sweep.KeyStoreBatchSize.WithDefault(config.DefaultReproviderSweepKeyStoreBatchSize)))),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -322,7 +322,7 @@ func SweepingProvider(cfg *config.Config) fx.Option {
|
|||||||
prov, err := ddhtprovider.New(inDht,
|
prov, err := ddhtprovider.New(inDht,
|
||||||
ddhtprovider.WithKeyStore(keyStore),
|
ddhtprovider.WithKeyStore(keyStore),
|
||||||
|
|
||||||
ddhtprovider.WithReprovideInterval(cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval)),
|
ddhtprovider.WithReprovideInterval(reprovideInterval),
|
||||||
ddhtprovider.WithMaxReprovideDelay(time.Hour),
|
ddhtprovider.WithMaxReprovideDelay(time.Hour),
|
||||||
ddhtprovider.WithOfflineDelay(cfg.Reprovider.Sweep.OfflineDelay.WithDefault(config.DefaultReproviderSweepOfflineDelay)),
|
ddhtprovider.WithOfflineDelay(cfg.Reprovider.Sweep.OfflineDelay.WithDefault(config.DefaultReproviderSweepOfflineDelay)),
|
||||||
ddhtprovider.WithConnectivityCheckOnlineInterval(1*time.Minute),
|
ddhtprovider.WithConnectivityCheckOnlineInterval(1*time.Minute),
|
||||||
@@ -364,7 +364,7 @@ func SweepingProvider(cfg *config.Config) fx.Option {
|
|||||||
}),
|
}),
|
||||||
|
|
||||||
dhtprovider.WithReplicationFactor(amino.DefaultBucketSize),
|
dhtprovider.WithReplicationFactor(amino.DefaultBucketSize),
|
||||||
dhtprovider.WithReprovideInterval(cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval)),
|
dhtprovider.WithReprovideInterval(reprovideInterval),
|
||||||
dhtprovider.WithMaxReprovideDelay(time.Hour),
|
dhtprovider.WithMaxReprovideDelay(time.Hour),
|
||||||
dhtprovider.WithOfflineDelay(cfg.Reprovider.Sweep.OfflineDelay.WithDefault(config.DefaultReproviderSweepOfflineDelay)),
|
dhtprovider.WithOfflineDelay(cfg.Reprovider.Sweep.OfflineDelay.WithDefault(config.DefaultReproviderSweepOfflineDelay)),
|
||||||
dhtprovider.WithConnectivityCheckOnlineInterval(1 * time.Minute),
|
dhtprovider.WithConnectivityCheckOnlineInterval(1 * time.Minute),
|
||||||
@@ -386,28 +386,66 @@ func SweepingProvider(cfg *config.Config) fx.Option {
|
|||||||
KeyProvider provider.KeyChanFunc
|
KeyProvider provider.KeyChanFunc
|
||||||
}
|
}
|
||||||
initKeyStore := fx.Invoke(func(lc fx.Lifecycle, in keystoreInput) {
|
initKeyStore := fx.Invoke(func(lc fx.Lifecycle, in keystoreInput) {
|
||||||
|
var (
|
||||||
|
cancel context.CancelFunc
|
||||||
|
done = make(chan struct{})
|
||||||
|
)
|
||||||
|
|
||||||
|
syncKeyStore := func(ctx context.Context) error {
|
||||||
|
kcf, err := in.KeyProvider(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := in.KeyStore.ResetCids(ctx, kcf); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := in.Provider.RefreshSchedule(); err != nil {
|
||||||
|
logger.Infow("refreshing provider schedule", "err", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
lc.Append(fx.Hook{
|
lc.Append(fx.Hook{
|
||||||
OnStart: func(ctx context.Context) error {
|
OnStart: func(ctx context.Context) error {
|
||||||
// Set the KeyProvider as a garbage collection function for the
|
// Set the KeyProvider as a garbage collection function for the
|
||||||
// keystore. The KeyStore will periodically purge its keys and replace
|
// keystore. Periodically purge the KeyStore from all its keys and
|
||||||
// them with the ones coming from the KeyChanFunc, to remove CIDs that
|
// replace them with the keys that needs to be reprovided, coming from
|
||||||
// should stop being reprovided from its state.
|
// the KeyChanFunc. So far, this is the less worse way to remove CIDs
|
||||||
in.KeyStore.SetGCFunc(in.KeyProvider)
|
// that shouldn't be reprovided from the provider's state.
|
||||||
|
if err := syncKeyStore(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
ch, err := in.KeyProvider(ctx)
|
gcCtx, c := context.WithCancel(context.Background())
|
||||||
if err != nil {
|
cancel = c
|
||||||
return err
|
|
||||||
}
|
go func() { // garbage collection loop for cids to reprovide
|
||||||
// Initialize the KeyStore with the current keys from the KeyProvider.
|
defer close(done)
|
||||||
err = in.KeyStore.ResetCids(ctx, ch)
|
ticker := time.NewTicker(reprovideInterval)
|
||||||
if err != nil {
|
defer ticker.Stop()
|
||||||
return err
|
|
||||||
}
|
for {
|
||||||
// Add keys from the KeyStore to the schedule.
|
select {
|
||||||
_ = in.Provider.RefreshSchedule()
|
case <-gcCtx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
if err := syncKeyStore(gcCtx); err != nil {
|
||||||
|
logger.Errorw("provider keystore sync", "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
OnStop: func(_ context.Context) error {
|
OnStop: func(ctx context.Context) error {
|
||||||
|
if cancel != nil {
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
return in.KeyStore.Close()
|
return in.KeyStore.Close()
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user