mirror of
https://github.com/ipfs/kubo.git
synced 2025-09-12 16:11:24 +08:00
feat(fsrepo): document lock usage and make the fsrepo thread-safe
fix(fsrepo): extract private, unsynced method to prevent deadlock
This commit is contained in:
@ -41,7 +41,8 @@ func init() {
|
|||||||
lockfiles = make(map[string]io.Closer)
|
lockfiles = make(map[string]io.Closer)
|
||||||
}
|
}
|
||||||
|
|
||||||
// FSRepo represents an IPFS FileSystem Repo. It is not thread-safe.
|
// FSRepo represents an IPFS FileSystem Repo. It is safe for use by multiple
|
||||||
|
// callers.
|
||||||
type FSRepo struct {
|
type FSRepo struct {
|
||||||
state state
|
state state
|
||||||
path string
|
path string
|
||||||
@ -58,6 +59,11 @@ func At(repoPath string) *FSRepo {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func ConfigAt(repoPath string) (*config.Config, error) {
|
func ConfigAt(repoPath string) (*config.Config, error) {
|
||||||
|
|
||||||
|
// packageLock must be held to ensure that the Read is atomic.
|
||||||
|
packageLock.Lock()
|
||||||
|
defer packageLock.Unlock()
|
||||||
|
|
||||||
configFilename, err := config.Filename(repoPath)
|
configFilename, err := config.Filename(repoPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -67,7 +73,10 @@ func ConfigAt(repoPath string) (*config.Config, error) {
|
|||||||
|
|
||||||
// Init initializes a new FSRepo at the given path with the provided config.
|
// Init initializes a new FSRepo at the given path with the provided config.
|
||||||
func Init(path string, conf *config.Config) error {
|
func Init(path string, conf *config.Config) error {
|
||||||
packageLock.Lock() // lock must be held to ensure atomicity (prevent Removal)
|
|
||||||
|
// packageLock must be held to ensure that the repo is not initialized more
|
||||||
|
// than once.
|
||||||
|
packageLock.Lock()
|
||||||
defer packageLock.Unlock()
|
defer packageLock.Unlock()
|
||||||
|
|
||||||
if isInitializedUnsynced(path) {
|
if isInitializedUnsynced(path) {
|
||||||
@ -84,28 +93,42 @@ func Init(path string, conf *config.Config) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Remove recursively removes the FSRepo at |path|.
|
// Remove recursively removes the FSRepo at |path|.
|
||||||
func Remove(path string) error {
|
func Remove(repoPath string) error {
|
||||||
|
repoPath = path.Clean(repoPath)
|
||||||
|
|
||||||
|
// packageLock must be held to ensure that the repo is not removed while
|
||||||
|
// being accessed by others.
|
||||||
packageLock.Lock()
|
packageLock.Lock()
|
||||||
defer packageLock.Unlock()
|
defer packageLock.Unlock()
|
||||||
if openerCounter.NumOpeners(path) != 0 {
|
|
||||||
|
if openerCounter.NumOpeners(repoPath) != 0 {
|
||||||
return errors.New("repo in use")
|
return errors.New("repo in use")
|
||||||
}
|
}
|
||||||
return os.RemoveAll(path)
|
return os.RemoveAll(repoPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
// LockedByOtherProcess returns true if the FSRepo is locked by another
|
// LockedByOtherProcess returns true if the FSRepo is locked by another
|
||||||
// process. If true, then the repo cannot be opened by this process.
|
// process. If true, then the repo cannot be opened by this process.
|
||||||
func LockedByOtherProcess(repoPath string) bool {
|
func LockedByOtherProcess(repoPath string) bool {
|
||||||
|
repoPath = path.Clean(repoPath)
|
||||||
|
|
||||||
|
// packageLock must be held to check the number of openers.
|
||||||
packageLock.Lock()
|
packageLock.Lock()
|
||||||
defer packageLock.Unlock()
|
defer packageLock.Unlock()
|
||||||
|
|
||||||
// NB: the lock is only held when repos are Open
|
// NB: the lock is only held when repos are Open
|
||||||
return lockfile.Locked(repoPath) && openerCounter.NumOpeners(repoPath) == 0
|
return lockfile.Locked(repoPath) && openerCounter.NumOpeners(repoPath) == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open returns an error if the repo is not initialized.
|
// Open returns an error if the repo is not initialized.
|
||||||
func (r *FSRepo) Open() error {
|
func (r *FSRepo) Open() error {
|
||||||
|
|
||||||
|
// packageLock must be held to make sure that the repo is not destroyed by
|
||||||
|
// another caller. It must not be released until initialization is complete
|
||||||
|
// and the number of openers is incremeneted.
|
||||||
packageLock.Lock()
|
packageLock.Lock()
|
||||||
defer packageLock.Unlock()
|
defer packageLock.Unlock()
|
||||||
|
|
||||||
if r.state != unopened {
|
if r.state != unopened {
|
||||||
return debugerror.Errorf("repo is %s", r.state)
|
return debugerror.Errorf("repo is %s", r.state)
|
||||||
}
|
}
|
||||||
@ -154,8 +177,15 @@ func (r *FSRepo) Open() error {
|
|||||||
//
|
//
|
||||||
// Result when not Open is undefined. The method may panic if it pleases.
|
// Result when not Open is undefined. The method may panic if it pleases.
|
||||||
func (r *FSRepo) Config() *config.Config {
|
func (r *FSRepo) Config() *config.Config {
|
||||||
// no lock necessary because repo is either Open (and thus protected from
|
|
||||||
// Removal) or has no side-effect
|
// It is not necessary to hold the package lock since the repo is in an
|
||||||
|
// opened state. The package lock is _not_ meant to ensure that the repo is
|
||||||
|
// thread-safe. The package lock is only meant to guard againt removal and
|
||||||
|
// coordinate the lockfile. However, we provide thread-safety to keep
|
||||||
|
// things simple.
|
||||||
|
packageLock.Lock()
|
||||||
|
defer packageLock.Unlock()
|
||||||
|
|
||||||
if r.state != opened {
|
if r.state != opened {
|
||||||
panic(fmt.Sprintln("repo is", r.state))
|
panic(fmt.Sprintln("repo is", r.state))
|
||||||
}
|
}
|
||||||
@ -164,37 +194,19 @@ func (r *FSRepo) Config() *config.Config {
|
|||||||
|
|
||||||
// SetConfig updates the FSRepo's config.
|
// SetConfig updates the FSRepo's config.
|
||||||
func (r *FSRepo) SetConfig(updated *config.Config) error {
|
func (r *FSRepo) SetConfig(updated *config.Config) error {
|
||||||
// no lock required because repo should be Open
|
|
||||||
if r.state != opened {
|
// packageLock is held to provide thread-safety.
|
||||||
panic(fmt.Sprintln("repo is", r.state))
|
packageLock.Lock()
|
||||||
}
|
defer packageLock.Unlock()
|
||||||
configFilename, err := config.Filename(r.path)
|
|
||||||
if err != nil {
|
return r.setConfigUnsynced(updated)
|
||||||
return err
|
|
||||||
}
|
|
||||||
// to avoid clobbering user-provided keys, must read the config from disk
|
|
||||||
// as a map, write the updated struct values to the map and write the map
|
|
||||||
// to disk.
|
|
||||||
var mapconf map[string]interface{}
|
|
||||||
if err := readConfigFile(configFilename, &mapconf); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
m, err := config.ToMap(updated)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for k, v := range m {
|
|
||||||
mapconf[k] = v
|
|
||||||
}
|
|
||||||
if err := writeConfigFile(configFilename, mapconf); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
*r.config = *updated // copy so caller cannot modify this private config
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetConfigKey retrieves only the value of a particular key.
|
// GetConfigKey retrieves only the value of a particular key.
|
||||||
func (r *FSRepo) GetConfigKey(key string) (interface{}, error) {
|
func (r *FSRepo) GetConfigKey(key string) (interface{}, error) {
|
||||||
|
packageLock.Lock()
|
||||||
|
defer packageLock.Unlock()
|
||||||
|
|
||||||
if r.state != opened {
|
if r.state != opened {
|
||||||
return nil, debugerror.Errorf("repo is %s", r.state)
|
return nil, debugerror.Errorf("repo is %s", r.state)
|
||||||
}
|
}
|
||||||
@ -211,7 +223,9 @@ func (r *FSRepo) GetConfigKey(key string) (interface{}, error) {
|
|||||||
|
|
||||||
// SetConfigKey writes the value of a particular key.
|
// SetConfigKey writes the value of a particular key.
|
||||||
func (r *FSRepo) SetConfigKey(key string, value interface{}) error {
|
func (r *FSRepo) SetConfigKey(key string, value interface{}) error {
|
||||||
// no lock required because repo should be Open
|
packageLock.Lock()
|
||||||
|
defer packageLock.Unlock()
|
||||||
|
|
||||||
if r.state != opened {
|
if r.state != opened {
|
||||||
return debugerror.Errorf("repo is %s", r.state)
|
return debugerror.Errorf("repo is %s", r.state)
|
||||||
}
|
}
|
||||||
@ -233,13 +247,14 @@ func (r *FSRepo) SetConfigKey(key string, value interface{}) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return r.SetConfig(conf)
|
return r.setConfigUnsynced(conf)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes the FSRepo, releasing held resources.
|
// Close closes the FSRepo, releasing held resources.
|
||||||
func (r *FSRepo) Close() error {
|
func (r *FSRepo) Close() error {
|
||||||
packageLock.Lock()
|
packageLock.Lock()
|
||||||
defer packageLock.Unlock()
|
defer packageLock.Unlock()
|
||||||
|
|
||||||
if r.state != opened {
|
if r.state != opened {
|
||||||
return debugerror.Errorf("repo is %s", r.state)
|
return debugerror.Errorf("repo is %s", r.state)
|
||||||
}
|
}
|
||||||
@ -251,11 +266,15 @@ var _ repo.Repo = &FSRepo{}
|
|||||||
|
|
||||||
// IsInitialized returns true if the repo is initialized at provided |path|.
|
// IsInitialized returns true if the repo is initialized at provided |path|.
|
||||||
func IsInitialized(path string) bool {
|
func IsInitialized(path string) bool {
|
||||||
|
// packageLock is held to ensure that another caller doesn't attempt to
|
||||||
|
// Init or Remove the repo while this call is in progress.
|
||||||
packageLock.Lock()
|
packageLock.Lock()
|
||||||
defer packageLock.Unlock()
|
defer packageLock.Unlock()
|
||||||
return isInitializedUnsynced(path)
|
return isInitializedUnsynced(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// private methods below this point. NB: packageLock must held by caller.
|
||||||
|
|
||||||
// isInitializedUnsynced reports whether the repo is initialized. Caller must
|
// isInitializedUnsynced reports whether the repo is initialized. Caller must
|
||||||
// hold openerCounter lock.
|
// hold openerCounter lock.
|
||||||
func isInitializedUnsynced(path string) bool {
|
func isInitializedUnsynced(path string) bool {
|
||||||
@ -317,3 +336,33 @@ func transitionToClosed(r *FSRepo) error {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// setConfigUnsynced is for private use. Callers must hold the packageLock.
|
||||||
|
func (r *FSRepo) setConfigUnsynced(updated *config.Config) error {
|
||||||
|
if r.state != opened {
|
||||||
|
return fmt.Errorf("repo is", r.state)
|
||||||
|
}
|
||||||
|
configFilename, err := config.Filename(r.path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// to avoid clobbering user-provided keys, must read the config from disk
|
||||||
|
// as a map, write the updated struct values to the map and write the map
|
||||||
|
// to disk.
|
||||||
|
var mapconf map[string]interface{}
|
||||||
|
if err := readConfigFile(configFilename, &mapconf); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
m, err := config.ToMap(updated)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for k, v := range m {
|
||||||
|
mapconf[k] = v
|
||||||
|
}
|
||||||
|
if err := writeConfigFile(configFilename, mapconf); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
*r.config = *updated // copy so caller cannot modify this private config
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user