From bc76d2e526014590dc5f77d5a2e9ebfa77315ea9 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Wed, 14 Jan 2015 13:18:40 -0800 Subject: [PATCH] fix(fsrepo/datastore) allow goroutines to share the datastore. doh! I forgot to make sure leveldb is only opened once. thanks for catching this @mappum * You may be wondering why we don't just share pointers to FSRepos. We want to manage the lifecycle of the FSRepo by tracking its `state`. Thus each FSRepo/goroutine requires private instance variables. For this reason, each `fsrepo.At(p)` caller must get its own goroutine. * There's a test in `fsrepo` because callers desire the ability to Open from multiple goroutines. There's a test in `component` because this is where the actual work needs to go in order to provide the desired contract. If the `component` package moves, the assurances need to move along with it. cc @whyrusleeping @jbenet side note: there are a couple packages in FSRepo that it might be worthwhile to extract once the dust settles on this feature-set. --- repo/fsrepo/component/datastore.go | 76 ++++++++++++++++++++----- repo/fsrepo/component/datastore_test.go | 30 ++++++++++ repo/fsrepo/fsrepo_test.go | 14 +++++ 3 files changed, 106 insertions(+), 14 deletions(-) create mode 100644 repo/fsrepo/component/datastore_test.go diff --git a/repo/fsrepo/component/datastore.go b/repo/fsrepo/component/datastore.go index 67e6c375b..c5536bfaf 100644 --- a/repo/fsrepo/component/datastore.go +++ b/repo/fsrepo/component/datastore.go @@ -1,19 +1,34 @@ package component import ( + "errors" + "sync" + datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" levelds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb" ldbopts "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt" config "github.com/jbenet/go-ipfs/repo/config" + counter "github.com/jbenet/go-ipfs/repo/fsrepo/counter" dir "github.com/jbenet/go-ipfs/repo/fsrepo/dir" util "github.com/jbenet/go-ipfs/util" ds2 "github.com/jbenet/go-ipfs/util/datastore2" debugerror "github.com/jbenet/go-ipfs/util/debugerror" ) -var _ Component = &DatastoreComponent{} -var _ Initializer = InitDatastoreComponent -var _ InitializationChecker = DatastoreComponentIsInitialized +var ( + _ Component = &DatastoreComponent{} + _ Initializer = InitDatastoreComponent + _ InitializationChecker = DatastoreComponentIsInitialized + + dsLock sync.Mutex // protects openersCounter and datastores + openersCounter *counter.Openers + datastores map[string]ds2.ThreadSafeDatastoreCloser +) + +func init() { + openersCounter = counter.NewOpenersCounter() + datastores = make(map[string]ds2.ThreadSafeDatastoreCloser) +} func InitDatastoreComponent(path string, conf *config.Config) error { // The actual datastore contents are initialized lazily when Opened. @@ -41,24 +56,57 @@ func DatastoreComponentIsInitialized(path string) bool { } // DatastoreComponent abstracts the datastore component of the FSRepo. -// NB: create with makeDatastoreComponent function. type DatastoreComponent struct { - path string - ds ds2.ThreadSafeDatastoreCloser + path string // required + ds ds2.ThreadSafeDatastoreCloser // assigned when repo is opened } +func (dsc *DatastoreComponent) SetPath(p string) { dsc.path = p } +func (dsc *DatastoreComponent) Datastore() datastore.ThreadSafeDatastore { return dsc.ds } + // Open returns an error if the config file is not present. func (dsc *DatastoreComponent) Open() error { - ds, err := levelds.NewDatastore(dsc.path, &levelds.Options{ - Compression: ldbopts.NoCompression, - }) - if err != nil { - return err + + dsLock.Lock() + defer dsLock.Unlock() + + // if no other goroutines have the datastore Open, initialize it and assign + // it to the package-scoped map for the goroutines that follow. + if openersCounter.NumOpeners(dsc.path) == 0 { + ds, err := levelds.NewDatastore(dsc.path, &levelds.Options{ + Compression: ldbopts.NoCompression, + }) + if err != nil { + return errors.New("unable to open leveldb datastore") + } + datastores[dsc.path] = ds + } + + // get the datastore from the package-scoped map and record self as an + // opener. + ds, dsIsPresent := datastores[dsc.path] + if !dsIsPresent { + // This indicates a programmer error has occurred. + return errors.New("datastore should be available, but it isn't") } dsc.ds = ds + openersCounter.AddOpener(dsc.path) // only after success return nil } -func (dsc *DatastoreComponent) Close() error { return dsc.ds.Close() } -func (dsc *DatastoreComponent) SetPath(p string) { dsc.path = p } -func (dsc *DatastoreComponent) Datastore() datastore.ThreadSafeDatastore { return dsc.ds } +func (dsc *DatastoreComponent) Close() error { + + dsLock.Lock() + defer dsLock.Unlock() + + // decrement the Opener count. if this goroutine is the last, also close + // the underlying datastore (and remove its reference from the map) + + openersCounter.RemoveOpener(dsc.path) + + if openersCounter.NumOpeners(dsc.path) == 0 { + delete(datastores, dsc.path) // remove the reference + return dsc.ds.Close() + } + return nil +} diff --git a/repo/fsrepo/component/datastore_test.go b/repo/fsrepo/component/datastore_test.go new file mode 100644 index 000000000..5338d1001 --- /dev/null +++ b/repo/fsrepo/component/datastore_test.go @@ -0,0 +1,30 @@ +package component + +import ( + "io/ioutil" + "path/filepath" + "testing" + + "github.com/jbenet/go-ipfs/repo/fsrepo/assert" +) + +// swap arg order +func testRepoPath(t *testing.T, path ...string) string { + name, err := ioutil.TempDir("", filepath.Join(path...)) + if err != nil { + t.Fatal(err) + } + return name +} + +func TestOpenMoreThanOnceInSameProcess(t *testing.T) { + t.Parallel() + path := testRepoPath(t) + dsc1 := DatastoreComponent{path: path} + dsc2 := DatastoreComponent{path: path} + assert.Nil(dsc1.Open(), t, "first repo should open successfully") + assert.Nil(dsc2.Open(), t, "second repo should open successfully") + + assert.Nil(dsc1.Close(), t) + assert.Nil(dsc2.Close(), t) +} diff --git a/repo/fsrepo/fsrepo_test.go b/repo/fsrepo/fsrepo_test.go index 3530cda79..9b181d41a 100644 --- a/repo/fsrepo/fsrepo_test.go +++ b/repo/fsrepo/fsrepo_test.go @@ -125,3 +125,17 @@ func TestDatastorePersistsFromRepoToRepo(t *testing.T) { assert.Nil(r2.Close(), t) assert.True(bytes.Compare(expected, actual) == 0, t, "data should match") } + +func TestOpenMoreThanOnceInSameProcess(t *testing.T) { + t.Parallel() + path := testRepoPath("", t) + assert.Nil(Init(path, &config.Config{}), t) + + r1 := At(path) + r2 := At(path) + assert.Nil(r1.Open(), t, "first repo should open successfully") + assert.Nil(r2.Open(), t, "second repo should open successfully") + + assert.Nil(r1.Close(), t) + assert.Nil(r2.Close(), t) +}