From e0bee137e9d6693e4d34a559bcb5946a7810e78d Mon Sep 17 00:00:00 2001 From: Tommi Virtanen Date: Fri, 13 Mar 2015 14:30:05 -0700 Subject: [PATCH 01/10] fsrepo.Remove no longer checks for concurrently open instances The only caller is `ipfs init`, which at that time does not hold a repository open, and refuses to run on existing repos anyway. --- repo/fsrepo/fsrepo.go | 15 +-------------- repo/fsrepo/fsrepo_test.go | 13 +------------ 2 files changed, 2 insertions(+), 26 deletions(-) diff --git a/repo/fsrepo/fsrepo.go b/repo/fsrepo/fsrepo.go index 125986883..77b7eb92c 100644 --- a/repo/fsrepo/fsrepo.go +++ b/repo/fsrepo/fsrepo.go @@ -38,8 +38,7 @@ var ( // lockfiles holds references to the Closers that ensure that repos are // only accessed by one process at a time. lockfiles map[string]io.Closer - // openersCounter prevents the fsrepo from being removed while there exist open - // FSRepo handles. It also ensures that the Init is atomic. + // openersCounter ensures that the Init is atomic. // // packageLock also protects numOpenedRepos // @@ -165,15 +164,6 @@ func Init(repoPath string, conf *config.Config) error { // Remove recursively removes the FSRepo at |path|. func Remove(repoPath string) error { repoPath = path.Clean(repoPath) - - // packageLock must be held to ensure that the repo is not removed while - // being accessed by others. - packageLock.Lock() - defer packageLock.Unlock() - - if openersCounter.NumOpeners(repoPath) != 0 { - return errors.New("repo in use") - } return os.RemoveAll(repoPath) } @@ -250,9 +240,6 @@ func configureEventLoggerAtRepoPath(c *config.Config, repoPath string) { // Open returns an error if the repo is not initialized. func (r *FSRepo) Open() error { - // packageLock must be held to make sure that the repo is not destroyed by - // another caller. It must not be released until initialization is complete - // and the number of openers is incremeneted. packageLock.Lock() defer packageLock.Unlock() diff --git a/repo/fsrepo/fsrepo_test.go b/repo/fsrepo/fsrepo_test.go index 73871566a..06e3ba623 100644 --- a/repo/fsrepo/fsrepo_test.go +++ b/repo/fsrepo/fsrepo_test.go @@ -30,18 +30,7 @@ func TestInitIdempotence(t *testing.T) { func TestRemove(t *testing.T) { t.Parallel() path := testRepoPath("foo", t) - assert.Nil(Remove(path), t, "should be able to remove after closed") -} - -func TestCannotRemoveIfOpen(t *testing.T) { - t.Parallel() - path := testRepoPath("TestCannotRemoveIfOpen", t) - assert.Nil(Init(path, &config.Config{}), t, "should initialize successfully") - r := At(path) - assert.Nil(r.Open(), t) - assert.Err(Remove(path), t, "should not be able to remove while open") - assert.Nil(r.Close(), t) - assert.Nil(Remove(path), t, "should be able to remove after closed") + assert.Nil(Remove(path), t, "can remove a repository") } func TestCannotBeReopened(t *testing.T) { From 215fed1051d1e5527c9b544cf175910a47e2cff0 Mon Sep 17 00:00:00 2001 From: Tommi Virtanen Date: Fri, 13 Mar 2015 14:38:33 -0700 Subject: [PATCH 02/10] fsrepo.LockedByOtherProcess no longer checks for local opens This is only called from `ipfs` command line tool well before it opens the repo. The behavior change here causes a false positive if the current process has already opened the repo. That's a bit late to ask this question, anyway, and is not expected to have ever triggered. --- repo/fsrepo/fsrepo.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/repo/fsrepo/fsrepo.go b/repo/fsrepo/fsrepo.go index 77b7eb92c..ab56fa7f1 100644 --- a/repo/fsrepo/fsrepo.go +++ b/repo/fsrepo/fsrepo.go @@ -172,12 +172,11 @@ func Remove(repoPath string) error { func LockedByOtherProcess(repoPath string) bool { repoPath = path.Clean(repoPath) - // packageLock must be held to check the number of openers. - packageLock.Lock() - defer packageLock.Unlock() + // TODO replace this with the "api" file + // https://github.com/ipfs/specs/tree/master/repo/fs-repo // NB: the lock is only held when repos are Open - return lockfile.Locked(repoPath) && openersCounter.NumOpeners(repoPath) == 0 + return lockfile.Locked(repoPath) } // openConfig returns an error if the config file is not present. From fdd1cd8dc045db90b06497a15df7f6f232f76bda Mon Sep 17 00:00:00 2001 From: Tommi Virtanen Date: Fri, 13 Mar 2015 16:01:14 -0700 Subject: [PATCH 03/10] Remove fsrepo.At, make Open a constructor function Nobody calls At without immediately calling Open. First step, a mechanical transformation. Cleanups will follow. --- cmd/ipfs/daemon.go | 4 +- cmd/ipfs/init.go | 8 ++-- cmd/ipfs/main.go | 4 +- cmd/ipfs/tour.go | 4 +- cmd/ipfs_bootstrapd/main.go | 4 +- cmd/ipfs_routingd/main.go | 4 +- cmd/ipfswatch/main.go | 4 +- core/commands/bootstrap.go | 12 ++--- core/commands/config.go | 9 ++-- repo/fsrepo/fsrepo.go | 85 +++++++++++++++++------------------ repo/fsrepo/fsrepo_test.go | 43 +++++++----------- test/supernode_client/main.go | 4 +- updates/updates.go | 4 +- 13 files changed, 87 insertions(+), 102 deletions(-) diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go index e8794f68a..8589542e4 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/daemon.go @@ -118,8 +118,8 @@ func daemonFunc(req cmds.Request, res cmds.Response) { // acquire the repo lock _before_ constructing a node. we need to make // sure we are permitted to access the resources (datastore, etc.) - repo := fsrepo.At(req.Context().ConfigRoot) - if err := repo.Open(); err != nil { + repo, err := fsrepo.Open(req.Context().ConfigRoot) + if err != nil { res.SetError(debugerror.Errorf("Couldn't obtain lock. Is another daemon already running?"), cmds.ErrNormal) return } diff --git a/cmd/ipfs/init.go b/cmd/ipfs/init.go index 72446cf3a..f962792da 100644 --- a/cmd/ipfs/init.go +++ b/cmd/ipfs/init.go @@ -110,8 +110,8 @@ func addDefaultAssets(out io.Writer, repoRoot string) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - r := fsrepo.At(repoRoot) - if err := r.Open(); err != nil { // NB: repo is owned by the node + r, err := fsrepo.Open(repoRoot) + if err != nil { // NB: repo is owned by the node return err } @@ -163,8 +163,8 @@ func initializeIpnsKeyspace(repoRoot string) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - r := fsrepo.At(repoRoot) - if err := r.Open(); err != nil { // NB: repo is owned by the node + r, err := fsrepo.Open(repoRoot) + if err != nil { // NB: repo is owned by the node return err } diff --git a/cmd/ipfs/main.go b/cmd/ipfs/main.go index 2bca9dd45..abe2a6594 100644 --- a/cmd/ipfs/main.go +++ b/cmd/ipfs/main.go @@ -193,8 +193,8 @@ func (i *cmdInvocation) constructNodeFunc(ctx context.Context) func() (*core.Ipf return nil, errors.New("constructing node without a request context") } - r := fsrepo.At(i.req.Context().ConfigRoot) - if err := r.Open(); err != nil { // repo is owned by the node + r, err := fsrepo.Open(i.req.Context().ConfigRoot) + if err != nil { // repo is owned by the node return nil, err } diff --git a/cmd/ipfs/tour.go b/cmd/ipfs/tour.go index 9a5277301..f2aae417d 100644 --- a/cmd/ipfs/tour.go +++ b/cmd/ipfs/tour.go @@ -193,8 +193,8 @@ func tourGet(id tour.ID) (*tour.Topic, error) { // TODO share func func writeConfig(path string, cfg *config.Config) error { // NB: This needs to run on the daemon. - r := fsrepo.At(path) - if err := r.Open(); err != nil { + r, err := fsrepo.Open(path) + if err != nil { return err } defer r.Close() diff --git a/cmd/ipfs_bootstrapd/main.go b/cmd/ipfs_bootstrapd/main.go index a215d65b0..699d569b0 100644 --- a/cmd/ipfs_bootstrapd/main.go +++ b/cmd/ipfs_bootstrapd/main.go @@ -57,8 +57,8 @@ func run() error { } } - repo := fsrepo.At(repoPath) - if err := repo.Open(); err != nil { // owned by node + repo, err := fsrepo.Open(repoPath) + if err != nil { // owned by node return err } diff --git a/cmd/ipfs_routingd/main.go b/cmd/ipfs_routingd/main.go index d02a9744a..7cd35a330 100644 --- a/cmd/ipfs_routingd/main.go +++ b/cmd/ipfs_routingd/main.go @@ -57,8 +57,8 @@ func run() error { return err } } - repo := fsrepo.At(repoPath) - if err := repo.Open(); err != nil { // owned by node + repo, err := fsrepo.Open(repoPath) + if err != nil { // owned by node return err } diff --git a/cmd/ipfswatch/main.go b/cmd/ipfswatch/main.go index 7aff99a3f..e9e397ed2 100644 --- a/cmd/ipfswatch/main.go +++ b/cmd/ipfswatch/main.go @@ -65,8 +65,8 @@ func run(ipfsPath, watchPath string) error { return err } - r := fsrepo.At(ipfsPath) - if err := r.Open(); err != nil { + r, err := fsrepo.Open(ipfsPath) + if err != nil { // TODO handle case: daemon running // TODO handle case: repo doesn't exist or isn't initialized return err diff --git a/core/commands/bootstrap.go b/core/commands/bootstrap.go index 4c1d0d42d..c2116a555 100644 --- a/core/commands/bootstrap.go +++ b/core/commands/bootstrap.go @@ -66,8 +66,8 @@ in the bootstrap list). return } - r := fsrepo.At(req.Context().ConfigRoot) - if err := r.Open(); err != nil { + r, err := fsrepo.Open(req.Context().ConfigRoot) + if err != nil { res.SetError(err, cmds.ErrNormal) return } @@ -143,8 +143,8 @@ var bootstrapRemoveCmd = &cmds.Command{ return } - r := fsrepo.At(req.Context().ConfigRoot) - if err := r.Open(); err != nil { + r, err := fsrepo.Open(req.Context().ConfigRoot) + if err != nil { res.SetError(err, cmds.ErrNormal) return } @@ -192,8 +192,8 @@ var bootstrapListCmd = &cmds.Command{ }, Run: func(req cmds.Request, res cmds.Response) { - r := fsrepo.At(req.Context().ConfigRoot) - if err := r.Open(); err != nil { + r, err := fsrepo.Open(req.Context().ConfigRoot) + if err != nil { res.SetError(err, cmds.ErrNormal) return } diff --git a/core/commands/config.go b/core/commands/config.go index e0d8d586e..e57b6626e 100644 --- a/core/commands/config.go +++ b/core/commands/config.go @@ -64,14 +64,13 @@ Set the value of the 'datastore.path' key: args := req.Arguments() key := args[0] - r := fsrepo.At(req.Context().ConfigRoot) - if err := r.Open(); err != nil { + r, err := fsrepo.Open(req.Context().ConfigRoot) + if err != nil { res.SetError(err, cmds.ErrNormal) return } defer r.Close() - var err error var output *ConfigField if len(args) == 2 { value := args[1] @@ -182,8 +181,8 @@ can't be undone. cmds.FileArg("file", true, false, "The file to use as the new config"), }, Run: func(req cmds.Request, res cmds.Response) { - r := fsrepo.At(req.Context().ConfigRoot) - if err := r.Open(); err != nil { + r, err := fsrepo.Open(req.Context().ConfigRoot) + if err != nil { res.SetError(err, cmds.ErrNormal) return } diff --git a/repo/fsrepo/fsrepo.go b/repo/fsrepo/fsrepo.go index ab56fa7f1..626ecb18d 100644 --- a/repo/fsrepo/fsrepo.go +++ b/repo/fsrepo/fsrepo.go @@ -75,13 +75,51 @@ type FSRepo struct { var _ repo.Repo = (*FSRepo)(nil) -// At returns a handle to an FSRepo at the provided |path|. -func At(repoPath string) *FSRepo { - // This method must not have side-effects. - return &FSRepo{ +// Open the FSRepo at path. Returns an error if the repo is not +// initialized. +func Open(repoPath string) (*FSRepo, error) { + packageLock.Lock() + defer packageLock.Unlock() + + r := &FSRepo{ path: path.Clean(repoPath), state: unopened, // explicitly set for clarity } + + expPath, err := u.TildeExpansion(r.path) + if err != nil { + return nil, err + } + r.path = expPath + + if r.state != unopened { + return nil, debugerror.Errorf("repo is %s", r.state) + } + if !isInitializedUnsynced(r.path) { + return nil, debugerror.New("ipfs not initialized, please run 'ipfs init'") + } + // check repo path, then check all constituent parts. + // TODO acquire repo lock + // TODO if err := initCheckDir(logpath); err != nil { // } + if err := dir.Writable(r.path); err != nil { + return nil, err + } + + if err := r.openConfig(); err != nil { + return nil, err + } + + if err := r.openDatastore(); err != nil { + return nil, err + } + + // log.Debugf("writing eventlogs to ...", c.path) + configureEventLoggerAtRepoPath(r.config, r.path) + + if err := r.transitionToOpened(); err != nil { + return nil, err + } + return r, nil } // ConfigAt returns an error if the FSRepo at the given path is not @@ -236,45 +274,6 @@ func configureEventLoggerAtRepoPath(c *config.Config, repoPath string) { eventlog.Configure(eventlog.OutputRotatingLogFile(rotateConf)) } -// Open returns an error if the repo is not initialized. -func (r *FSRepo) Open() error { - - packageLock.Lock() - defer packageLock.Unlock() - - expPath, err := u.TildeExpansion(r.path) - if err != nil { - return err - } - r.path = expPath - - if r.state != unopened { - return debugerror.Errorf("repo is %s", r.state) - } - if !isInitializedUnsynced(r.path) { - return debugerror.New("ipfs not initialized, please run 'ipfs init'") - } - // check repo path, then check all constituent parts. - // TODO acquire repo lock - // TODO if err := initCheckDir(logpath); err != nil { // } - if err := dir.Writable(r.path); err != nil { - return err - } - - if err := r.openConfig(); err != nil { - return err - } - - if err := r.openDatastore(); err != nil { - return err - } - - // log.Debugf("writing eventlogs to ...", c.path) - configureEventLoggerAtRepoPath(r.config, r.path) - - return r.transitionToOpened() -} - func (r *FSRepo) closeDatastore() error { dsLock.Lock() defer dsLock.Unlock() diff --git a/repo/fsrepo/fsrepo_test.go b/repo/fsrepo/fsrepo_test.go index 06e3ba623..a526a31c9 100644 --- a/repo/fsrepo/fsrepo_test.go +++ b/repo/fsrepo/fsrepo_test.go @@ -33,19 +33,6 @@ func TestRemove(t *testing.T) { assert.Nil(Remove(path), t, "can remove a repository") } -func TestCannotBeReopened(t *testing.T) { - t.Parallel() - path := testRepoPath("", t) - assert.Nil(Init(path, &config.Config{}), t) - r := At(path) - assert.Nil(r.Open(), t) - assert.Nil(r.Close(), t) - assert.Err(r.Open(), t, "shouldn't be possible to re-open the repo") - - // mutable state is the enemy. Take Close() as an opportunity to reduce - // entropy. Callers ought to start fresh with a new handle by calling `At`. -} - func TestCanManageReposIndependently(t *testing.T) { t.Parallel() pathA := testRepoPath("a", t) @@ -60,10 +47,10 @@ func TestCanManageReposIndependently(t *testing.T) { assert.True(IsInitialized(pathB), t, "b should be initialized") t.Log("open the two repos") - repoA := At(pathA) - repoB := At(pathB) - assert.Nil(repoA.Open(), t, "a") - assert.Nil(repoB.Open(), t, "b") + repoA, err := Open(pathA) + assert.Nil(err, t, "a") + repoB, err := Open(pathB) + assert.Nil(err, t, "b") t.Log("close and remove b while a is open") assert.Nil(repoB.Close(), t, "close b") @@ -80,15 +67,15 @@ func TestDatastoreGetNotAllowedAfterClose(t *testing.T) { assert.True(!IsInitialized(path), t, "should NOT be initialized") assert.Nil(Init(path, &config.Config{}), t, "should initialize successfully") - r := At(path) - assert.Nil(r.Open(), t, "should open successfully") + r, err := Open(path) + assert.Nil(err, t, "should open successfully") k := "key" data := []byte(k) assert.Nil(r.Datastore().Put(datastore.NewKey(k), data), t, "Put should be successful") assert.Nil(r.Close(), t) - _, err := r.Datastore().Get(datastore.NewKey(k)) + _, err = r.Datastore().Get(datastore.NewKey(k)) assert.Err(err, t, "after closer, Get should be fail") } @@ -97,16 +84,16 @@ func TestDatastorePersistsFromRepoToRepo(t *testing.T) { path := testRepoPath("test", t) assert.Nil(Init(path, &config.Config{}), t) - r1 := At(path) - assert.Nil(r1.Open(), t) + r1, err := Open(path) + assert.Nil(err, t) k := "key" expected := []byte(k) assert.Nil(r1.Datastore().Put(datastore.NewKey(k), expected), t, "using first repo, Put should be successful") assert.Nil(r1.Close(), t) - r2 := At(path) - assert.Nil(r2.Open(), t) + r2, err := Open(path) + assert.Nil(err, t) v, err := r2.Datastore().Get(datastore.NewKey(k)) assert.Nil(err, t, "using second repo, Get should be successful") actual, ok := v.([]byte) @@ -120,10 +107,10 @@ func TestOpenMoreThanOnceInSameProcess(t *testing.T) { path := testRepoPath("", t) assert.Nil(Init(path, &config.Config{}), t) - r1 := At(path) - r2 := At(path) - assert.Nil(r1.Open(), t, "first repo should open successfully") - assert.Nil(r2.Open(), t, "second repo should open successfully") + r1, err := Open(path) + assert.Nil(err, t, "first repo should open successfully") + r2, err := Open(path) + assert.Nil(err, t, "second repo should open successfully") assert.True(r1.ds == r2.ds, t, "repos should share the datastore") assert.Nil(r1.Close(), t) diff --git a/test/supernode_client/main.go b/test/supernode_client/main.go index 13f39ac8b..844355516 100644 --- a/test/supernode_client/main.go +++ b/test/supernode_client/main.go @@ -66,8 +66,8 @@ func run() error { repoPath := gopath.Join(cwd, ".go-ipfs") if err := ensureRepoInitialized(repoPath); err != nil { } - repo := fsrepo.At(repoPath) - if err := repo.Open(); err != nil { // owned by node + repo, err := fsrepo.Open(repoPath) + if err != nil { // owned by node return err } cfg := repo.Config() diff --git a/updates/updates.go b/updates/updates.go index 229ba95d5..81f1eb444 100644 --- a/updates/updates.go +++ b/updates/updates.go @@ -212,8 +212,8 @@ func CliCheckForUpdates(cfg *config.Config, repoPath string) error { // if we checked successfully. if err == ErrNoUpdateAvailable { log.Noticef("No update available, checked on %s", time.Now()) - r := fsrepo.At(repoPath) - if err := r.Open(); err != nil { + r, err := fsrepo.Open(repoPath) + if err != nil { return err } if err := recordUpdateCheck(cfg); err != nil { From c8992f2c1b792b96eba12de9648d194e02938a75 Mon Sep 17 00:00:00 2001 From: Tommi Virtanen Date: Fri, 13 Mar 2015 19:05:06 -0700 Subject: [PATCH 04/10] repo.OnlyOne tracks open Repos and reuses them This will replace the elaborate refcounting in fsrepo, to make it easier to maintain. --- repo/fsrepo/fsrepo.go | 23 +++++++++++- repo/fsrepo/fsrepo_test.go | 2 +- repo/onlyone.go | 72 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 95 insertions(+), 2 deletions(-) create mode 100644 repo/onlyone.go diff --git a/repo/fsrepo/fsrepo.go b/repo/fsrepo/fsrepo.go index 626ecb18d..d4ab8c240 100644 --- a/repo/fsrepo/fsrepo.go +++ b/repo/fsrepo/fsrepo.go @@ -50,6 +50,20 @@ var ( dsLock sync.Mutex dsOpenersCounter *counter.Openers datastores map[string]ds2.ThreadSafeDatastoreCloser + + // onlyOne keeps track of open FSRepo instances. + // + // TODO: once command Context / Repo integration is cleaned up, + // this can be removed. Right now, this makes ConfigCmd.Run + // function try to open the repo twice: + // + // $ ipfs daemon & + // $ ipfs config foo + // + // The reason for the above is that in standalone mode without the + // daemon, `ipfs config` tries to save work by not building the + // full IpfsNode, but accessing the Repo directly. + onlyOne repo.OnlyOne ) func init() { @@ -77,7 +91,14 @@ var _ repo.Repo = (*FSRepo)(nil) // Open the FSRepo at path. Returns an error if the repo is not // initialized. -func Open(repoPath string) (*FSRepo, error) { +func Open(repoPath string) (repo.Repo, error) { + fn := func() (repo.Repo, error) { + return open(repoPath) + } + return onlyOne.Open(repoPath, fn) +} + +func open(repoPath string) (repo.Repo, error) { packageLock.Lock() defer packageLock.Unlock() diff --git a/repo/fsrepo/fsrepo_test.go b/repo/fsrepo/fsrepo_test.go index a526a31c9..bcd3af600 100644 --- a/repo/fsrepo/fsrepo_test.go +++ b/repo/fsrepo/fsrepo_test.go @@ -111,7 +111,7 @@ func TestOpenMoreThanOnceInSameProcess(t *testing.T) { assert.Nil(err, t, "first repo should open successfully") r2, err := Open(path) assert.Nil(err, t, "second repo should open successfully") - assert.True(r1.ds == r2.ds, t, "repos should share the datastore") + assert.True(r1 == r2, t, "second open returns same value") assert.Nil(r1.Close(), t) assert.Nil(r2.Close(), t) diff --git a/repo/onlyone.go b/repo/onlyone.go new file mode 100644 index 000000000..860c166bf --- /dev/null +++ b/repo/onlyone.go @@ -0,0 +1,72 @@ +package repo + +import ( + "sync" +) + +// OnlyOne tracks open Repos by arbitrary key and returns the already +// open one. +type OnlyOne struct { + mu sync.Mutex + active map[interface{}]*ref +} + +// Open a Repo identified by key. If Repo is not already open, the +// open function is called, and the result is remember for further +// use. +// +// Key must be comparable, or Open will panic. Make sure to pick keys +// that are unique across different concrete Repo implementations, +// e.g. by creating a local type: +// +// type repoKey string +// r, err := o.Open(repoKey(path), open) +// +// Call Repo.Close when done. +func (o *OnlyOne) Open(key interface{}, open func() (Repo, error)) (Repo, error) { + o.mu.Lock() + defer o.mu.Unlock() + if o.active == nil { + o.active = make(map[interface{}]*ref) + } + + item, found := o.active[key] + if !found { + repo, err := open() + if err != nil { + return nil, err + } + item = &ref{ + parent: o, + key: key, + Repo: repo, + } + o.active[key] = item + } + item.refs++ + return item, nil +} + +type ref struct { + parent *OnlyOne + key interface{} + refs uint32 + Repo +} + +var _ Repo = (*ref)(nil) + +func (r *ref) Close() error { + r.parent.mu.Lock() + defer r.parent.mu.Unlock() + + r.refs-- + if r.refs > 0 { + // others are holding it open + return nil + } + + // last one + delete(r.parent.active, r.key) + return r.Repo.Close() +} From d5ce5da5febd3cd2269f1203ed7783c3b3f83441 Mon Sep 17 00:00:00 2001 From: Tommi Virtanen Date: Fri, 13 Mar 2015 14:59:45 -0700 Subject: [PATCH 05/10] Remove fsrepo Datastore refcounting repo.Once now does refcounts on a whole Repo level, returning the same pointer to multiple Openers. This removes the need for the weird model of separate FSRepo instances pointing to the same underlying storage, and the races caused by that. --- repo/fsrepo/fsrepo.go | 56 +++++-------------------------------------- 1 file changed, 6 insertions(+), 50 deletions(-) diff --git a/repo/fsrepo/fsrepo.go b/repo/fsrepo/fsrepo.go index d4ab8c240..806cc8e41 100644 --- a/repo/fsrepo/fsrepo.go +++ b/repo/fsrepo/fsrepo.go @@ -46,11 +46,6 @@ var ( // change the repo's state, the package lock does not need to be acquired. openersCounter *counter.Openers - // protects dsOpenersCounter and datastores - dsLock sync.Mutex - dsOpenersCounter *counter.Openers - datastores map[string]ds2.ThreadSafeDatastoreCloser - // onlyOne keeps track of open FSRepo instances. // // TODO: once command Context / Repo integration is cleaned up, @@ -69,9 +64,6 @@ var ( func init() { openersCounter = counter.NewOpenersCounter() lockfiles = make(map[string]io.Closer) - - dsOpenersCounter = counter.NewOpenersCounter() - datastores = make(map[string]ds2.ThreadSafeDatastoreCloser) } // FSRepo represents an IPFS FileSystem Repo. It is safe for use by multiple @@ -254,32 +246,14 @@ func (r *FSRepo) openConfig() error { // openDatastore returns an error if the config file is not present. func (r *FSRepo) openDatastore() error { - dsLock.Lock() - defer dsLock.Unlock() - dsPath := path.Join(r.path, defaultDataStoreDirectory) - - // if no other goroutines have the datastore Open, initialize it and assign - // it to the package-scoped map for the goroutines that follow. - if dsOpenersCounter.NumOpeners(dsPath) == 0 { - ds, err := levelds.NewDatastore(dsPath, &levelds.Options{ - Compression: ldbopts.NoCompression, - }) - if err != nil { - return debugerror.New("unable to open leveldb datastore") - } - datastores[dsPath] = ds - } - - // get the datastore from the package-scoped map and record self as an - // opener. - ds, dsIsPresent := datastores[dsPath] - if !dsIsPresent { - // This indicates a programmer error has occurred. - return errors.New("datastore should be available, but it isn't") + ds, err := levelds.NewDatastore(dsPath, &levelds.Options{ + Compression: ldbopts.NoCompression, + }) + if err != nil { + return debugerror.New("unable to open leveldb datastore") } r.ds = ds - dsOpenersCounter.AddOpener(dsPath) // only after success return nil } @@ -295,24 +269,6 @@ func configureEventLoggerAtRepoPath(c *config.Config, repoPath string) { eventlog.Configure(eventlog.OutputRotatingLogFile(rotateConf)) } -func (r *FSRepo) closeDatastore() error { - dsLock.Lock() - defer dsLock.Unlock() - - dsPath := path.Join(r.path, defaultDataStoreDirectory) - - // decrement the Opener count. if this goroutine is the last, also close - // the underlying datastore (and remove its reference from the map) - - dsOpenersCounter.RemoveOpener(dsPath) - - if dsOpenersCounter.NumOpeners(dsPath) == 0 { - delete(datastores, dsPath) // remove the reference - return r.ds.Close() - } - return nil -} - // Close closes the FSRepo, releasing held resources. func (r *FSRepo) Close() error { packageLock.Lock() @@ -322,7 +278,7 @@ func (r *FSRepo) Close() error { return debugerror.Errorf("repo is %s", r.state) } - if err := r.closeDatastore(); err != nil { + if err := r.ds.Close(); err != nil { return err } From 543adf91f1ea92515132bdc871ae7d32d9d3de7a Mon Sep 17 00:00:00 2001 From: Tommi Virtanen Date: Fri, 13 Mar 2015 15:04:20 -0700 Subject: [PATCH 06/10] Move fsrepo lockfile from global map to struct field --- repo/fsrepo/fsrepo.go | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/repo/fsrepo/fsrepo.go b/repo/fsrepo/fsrepo.go index 806cc8e41..ac3327919 100644 --- a/repo/fsrepo/fsrepo.go +++ b/repo/fsrepo/fsrepo.go @@ -1,7 +1,6 @@ package fsrepo import ( - "errors" "fmt" "io" "os" @@ -34,10 +33,7 @@ var ( // packageLock must be held to while performing any operation that modifies an // FSRepo's state field. This includes Init, Open, Close, and Remove. - packageLock sync.Mutex // protects openersCounter and lockfiles - // lockfiles holds references to the Closers that ensure that repos are - // only accessed by one process at a time. - lockfiles map[string]io.Closer + packageLock sync.Mutex // protects openersCounter // openersCounter ensures that the Init is atomic. // // packageLock also protects numOpenedRepos @@ -63,7 +59,6 @@ var ( func init() { openersCounter = counter.NewOpenersCounter() - lockfiles = make(map[string]io.Closer) } // FSRepo represents an IPFS FileSystem Repo. It is safe for use by multiple @@ -73,6 +68,9 @@ type FSRepo struct { state state // path is the file-system path path string + // lockfile is the file system lock to prevent others from opening + // the same fsrepo path concurrently + lockfile io.Closer // config is set on Open, guarded by packageLock config *config.Config // ds is set on Open @@ -451,7 +449,7 @@ func (r *FSRepo) transitionToOpened() error { if err != nil { return err } - lockfiles[r.path] = closer + r.lockfile = closer } return openersCounter.AddOpener(r.path) } @@ -464,14 +462,9 @@ func (r *FSRepo) transitionToClosed() error { return err } if countAfter := openersCounter.NumOpeners(r.path); countAfter == 0 { - closer, ok := lockfiles[r.path] - if !ok { - return errors.New("package error: lockfile is not held") - } - if err := closer.Close(); err != nil { + if err := r.lockfile.Close(); err != nil { return err } - delete(lockfiles, r.path) } return nil } From 431cf1c902ce7f25036e6c50236b09898aa39d8f Mon Sep 17 00:00:00 2001 From: Tommi Virtanen Date: Fri, 13 Mar 2015 15:24:50 -0700 Subject: [PATCH 07/10] Remove fsrepo refcounting No code attempts to open the same repo multiple times. Error handling is still buggy, but it's starting to get clearer. --- repo/fsrepo/counter/openers.go | 43 ---------------------------------- repo/fsrepo/fsrepo.go | 33 ++++++-------------------- 2 files changed, 7 insertions(+), 69 deletions(-) delete mode 100644 repo/fsrepo/counter/openers.go diff --git a/repo/fsrepo/counter/openers.go b/repo/fsrepo/counter/openers.go deleted file mode 100644 index 25e58107f..000000000 --- a/repo/fsrepo/counter/openers.go +++ /dev/null @@ -1,43 +0,0 @@ -package counter - -import "path" - -// TODO this could be made into something more generic. - -type Openers struct { - // repos maps repo paths to the number of openers holding an FSRepo handle - // to it - repos map[string]int -} - -func NewOpenersCounter() *Openers { - return &Openers{ - repos: make(map[string]int), - } -} - -// NumOpeners returns the number of FSRepos holding a handle to the repo at -// this path. This method is not thread-safe. The caller must have this object -// locked. -func (l *Openers) NumOpeners(repoPath string) int { - return l.repos[key(repoPath)] -} - -// AddOpener messages that an FSRepo holds a handle to the repo at this path. -// This method is not thread-safe. The caller must have this object locked. -func (l *Openers) AddOpener(repoPath string) error { - l.repos[key(repoPath)]++ - return nil -} - -// RemoveOpener messgaes that an FSRepo no longer holds a handle to the repo at -// this path. This method is not thread-safe. The caller must have this object -// locked. -func (l *Openers) RemoveOpener(repoPath string) error { - l.repos[key(repoPath)]-- - return nil -} - -func key(repoPath string) string { - return path.Clean(repoPath) -} diff --git a/repo/fsrepo/fsrepo.go b/repo/fsrepo/fsrepo.go index ac3327919..0a9ea02a7 100644 --- a/repo/fsrepo/fsrepo.go +++ b/repo/fsrepo/fsrepo.go @@ -14,7 +14,6 @@ import ( repo "github.com/jbenet/go-ipfs/repo" "github.com/jbenet/go-ipfs/repo/common" config "github.com/jbenet/go-ipfs/repo/config" - counter "github.com/jbenet/go-ipfs/repo/fsrepo/counter" lockfile "github.com/jbenet/go-ipfs/repo/fsrepo/lock" serialize "github.com/jbenet/go-ipfs/repo/fsrepo/serialize" dir "github.com/jbenet/go-ipfs/thirdparty/dir" @@ -33,14 +32,7 @@ var ( // packageLock must be held to while performing any operation that modifies an // FSRepo's state field. This includes Init, Open, Close, and Remove. - packageLock sync.Mutex // protects openersCounter - // openersCounter ensures that the Init is atomic. - // - // packageLock also protects numOpenedRepos - // - // If an operation is used when repo is Open and the operation does not - // change the repo's state, the package lock does not need to be acquired. - openersCounter *counter.Openers + packageLock sync.Mutex // onlyOne keeps track of open FSRepo instances. // @@ -57,10 +49,6 @@ var ( onlyOne repo.OnlyOne ) -func init() { - openersCounter = counter.NewOpenersCounter() -} - // FSRepo represents an IPFS FileSystem Repo. It is safe for use by multiple // callers. type FSRepo struct { @@ -444,27 +432,20 @@ func isInitializedUnsynced(repoPath string) bool { // the package mutex. func (r *FSRepo) transitionToOpened() error { r.state = opened - if countBefore := openersCounter.NumOpeners(r.path); countBefore == 0 { // #first - closer, err := lockfile.Lock(r.path) - if err != nil { - return err - } - r.lockfile = closer + closer, err := lockfile.Lock(r.path) + if err != nil { + return err } - return openersCounter.AddOpener(r.path) + r.lockfile = closer + return nil } // transitionToClosed manages the state transition to |closed|. Caller must // hold the package mutex. func (r *FSRepo) transitionToClosed() error { r.state = closed - if err := openersCounter.RemoveOpener(r.path); err != nil { + if err := r.lockfile.Close(); err != nil { return err } - if countAfter := openersCounter.NumOpeners(r.path); countAfter == 0 { - if err := r.lockfile.Close(); err != nil { - return err - } - } return nil } From 384ca525b4178a3eb7447a7f8c9dfcd3dbbbf5b1 Mon Sep 17 00:00:00 2001 From: Tommi Virtanen Date: Fri, 13 Mar 2015 16:02:14 -0700 Subject: [PATCH 08/10] Clean up fsrepo path handling --- repo/fsrepo/fsrepo.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/repo/fsrepo/fsrepo.go b/repo/fsrepo/fsrepo.go index 0a9ea02a7..76c58214b 100644 --- a/repo/fsrepo/fsrepo.go +++ b/repo/fsrepo/fsrepo.go @@ -80,20 +80,15 @@ func open(repoPath string) (repo.Repo, error) { packageLock.Lock() defer packageLock.Unlock() - r := &FSRepo{ - path: path.Clean(repoPath), - state: unopened, // explicitly set for clarity - } - - expPath, err := u.TildeExpansion(r.path) + expPath, err := u.TildeExpansion(path.Clean(repoPath)) if err != nil { return nil, err } - r.path = expPath - if r.state != unopened { - return nil, debugerror.Errorf("repo is %s", r.state) + r := &FSRepo{ + path: expPath, } + if !isInitializedUnsynced(r.path) { return nil, debugerror.New("ipfs not initialized, please run 'ipfs init'") } From 55228dae11a11ac72c7270f02a79f0044f835bde Mon Sep 17 00:00:00 2001 From: Tommi Virtanen Date: Fri, 13 Mar 2015 16:16:12 -0700 Subject: [PATCH 09/10] Simplify FSRepo life cycle, it's either open or closed --- repo/fsrepo/fsrepo.go | 56 +++++++++++++++---------------------------- repo/fsrepo/state.go | 22 ----------------- 2 files changed, 19 insertions(+), 59 deletions(-) delete mode 100644 repo/fsrepo/state.go diff --git a/repo/fsrepo/fsrepo.go b/repo/fsrepo/fsrepo.go index 76c58214b..69ce06c70 100644 --- a/repo/fsrepo/fsrepo.go +++ b/repo/fsrepo/fsrepo.go @@ -1,7 +1,6 @@ package fsrepo import ( - "fmt" "io" "os" "path" @@ -52,15 +51,14 @@ var ( // FSRepo represents an IPFS FileSystem Repo. It is safe for use by multiple // callers. type FSRepo struct { - // state is the FSRepo's state (unopened, opened, closed) - state state + // has Close been called already + closed bool // path is the file-system path path string // lockfile is the file system lock to prevent others from opening // the same fsrepo path concurrently lockfile io.Closer - // config is set on Open, guarded by packageLock - config *config.Config + config *config.Config // ds is set on Open ds ds2.ThreadSafeDatastoreCloser } @@ -110,9 +108,11 @@ func open(repoPath string) (repo.Repo, error) { // log.Debugf("writing eventlogs to ...", c.path) configureEventLoggerAtRepoPath(r.config, r.path) - if err := r.transitionToOpened(); err != nil { + closer, err := lockfile.Lock(r.path) + if err != nil { return nil, err } + r.lockfile = closer return r, nil } @@ -255,8 +255,8 @@ func (r *FSRepo) Close() error { packageLock.Lock() defer packageLock.Unlock() - if r.state != opened { - return debugerror.Errorf("repo is %s", r.state) + if r.closed { + return debugerror.New("repo is closed") } if err := r.ds.Close(); err != nil { @@ -271,7 +271,11 @@ func (r *FSRepo) Close() error { // to disable logging once the component is closed. // eventlog.Configure(eventlog.Output(os.Stderr)) - return r.transitionToClosed() + r.closed = true + if err := r.lockfile.Close(); err != nil { + return err + } + return nil } // Config returns the FSRepo's config. This method must not be called if the @@ -288,8 +292,8 @@ func (r *FSRepo) Config() *config.Config { packageLock.Lock() defer packageLock.Unlock() - if r.state != opened { - panic(fmt.Sprintln("repo is", r.state)) + if r.closed { + panic("repo is closed") } return r.config } @@ -336,8 +340,8 @@ func (r *FSRepo) GetConfigKey(key string) (interface{}, error) { packageLock.Lock() defer packageLock.Unlock() - if r.state != opened { - return nil, debugerror.Errorf("repo is %s", r.state) + if r.closed { + return nil, debugerror.New("repo is closed") } filename, err := config.Filename(r.path) @@ -356,8 +360,8 @@ func (r *FSRepo) SetConfigKey(key string, value interface{}) error { packageLock.Lock() defer packageLock.Unlock() - if r.state != opened { - return debugerror.Errorf("repo is %s", r.state) + if r.closed { + return debugerror.New("repo is closed") } filename, err := config.Filename(r.path) @@ -422,25 +426,3 @@ func isInitializedUnsynced(repoPath string) bool { } return true } - -// transitionToOpened manages the state transition to |opened|. Caller must hold -// the package mutex. -func (r *FSRepo) transitionToOpened() error { - r.state = opened - closer, err := lockfile.Lock(r.path) - if err != nil { - return err - } - r.lockfile = closer - return nil -} - -// transitionToClosed manages the state transition to |closed|. Caller must -// hold the package mutex. -func (r *FSRepo) transitionToClosed() error { - r.state = closed - if err := r.lockfile.Close(); err != nil { - return err - } - return nil -} diff --git a/repo/fsrepo/state.go b/repo/fsrepo/state.go deleted file mode 100644 index e6ccf35c1..000000000 --- a/repo/fsrepo/state.go +++ /dev/null @@ -1,22 +0,0 @@ -package fsrepo - -type state int - -const ( - unopened = iota - opened - closed -) - -func (s state) String() string { - switch s { - case unopened: - return "unopened" - case opened: - return "opened" - case closed: - return "closed" - default: - return "invalid" - } -} From 03bc2ccb9dfe4af966d9e8ca0f2e1801b36c489b Mon Sep 17 00:00:00 2001 From: Tommi Virtanen Date: Fri, 13 Mar 2015 16:18:38 -0700 Subject: [PATCH 10/10] Take FSRepo lock way earlier There is no way this was safe before. Be careful to unlock on the error paths. --- repo/fsrepo/fsrepo.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/repo/fsrepo/fsrepo.go b/repo/fsrepo/fsrepo.go index 69ce06c70..8730df9a1 100644 --- a/repo/fsrepo/fsrepo.go +++ b/repo/fsrepo/fsrepo.go @@ -87,6 +87,18 @@ func open(repoPath string) (repo.Repo, error) { path: expPath, } + r.lockfile, err = lockfile.Lock(r.path) + if err != nil { + return nil, err + } + keepLocked := false + defer func() { + // unlock on error, leave it locked on success + if !keepLocked { + r.lockfile.Close() + } + }() + if !isInitializedUnsynced(r.path) { return nil, debugerror.New("ipfs not initialized, please run 'ipfs init'") } @@ -108,11 +120,7 @@ func open(repoPath string) (repo.Repo, error) { // log.Debugf("writing eventlogs to ...", c.path) configureEventLoggerAtRepoPath(r.config, r.path) - closer, err := lockfile.Lock(r.path) - if err != nil { - return nil, err - } - r.lockfile = closer + keepLocked = true return r, nil }