mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-03 04:37:30 +08:00
fix(fsrepo/datastore) allow goroutines to share the datastore.
doh! I forgot to make sure leveldb is only opened once. thanks for catching this @mappum * You may be wondering why we don't just share pointers to FSRepos. We want to manage the lifecycle of the FSRepo by tracking its `state`. Thus each FSRepo/goroutine requires private instance variables. For this reason, each `fsrepo.At(p)` caller must get its own goroutine. * There's a test in `fsrepo` because callers desire the ability to Open from multiple goroutines. There's a test in `component` because this is where the actual work needs to go in order to provide the desired contract. If the `component` package moves, the assurances need to move along with it. cc @whyrusleeping @jbenet side note: there are a couple packages in FSRepo that it might be worthwhile to extract once the dust settles on this feature-set.
This commit is contained in:
@ -1,19 +1,34 @@
|
|||||||
package component
|
package component
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
|
||||||
datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||||
levelds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb"
|
levelds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb"
|
||||||
ldbopts "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt"
|
ldbopts "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt"
|
||||||
config "github.com/jbenet/go-ipfs/repo/config"
|
config "github.com/jbenet/go-ipfs/repo/config"
|
||||||
|
counter "github.com/jbenet/go-ipfs/repo/fsrepo/counter"
|
||||||
dir "github.com/jbenet/go-ipfs/repo/fsrepo/dir"
|
dir "github.com/jbenet/go-ipfs/repo/fsrepo/dir"
|
||||||
util "github.com/jbenet/go-ipfs/util"
|
util "github.com/jbenet/go-ipfs/util"
|
||||||
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
|
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
|
||||||
debugerror "github.com/jbenet/go-ipfs/util/debugerror"
|
debugerror "github.com/jbenet/go-ipfs/util/debugerror"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ Component = &DatastoreComponent{}
|
var (
|
||||||
var _ Initializer = InitDatastoreComponent
|
_ Component = &DatastoreComponent{}
|
||||||
var _ InitializationChecker = DatastoreComponentIsInitialized
|
_ Initializer = InitDatastoreComponent
|
||||||
|
_ InitializationChecker = DatastoreComponentIsInitialized
|
||||||
|
|
||||||
|
dsLock sync.Mutex // protects openersCounter and datastores
|
||||||
|
openersCounter *counter.Openers
|
||||||
|
datastores map[string]ds2.ThreadSafeDatastoreCloser
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
openersCounter = counter.NewOpenersCounter()
|
||||||
|
datastores = make(map[string]ds2.ThreadSafeDatastoreCloser)
|
||||||
|
}
|
||||||
|
|
||||||
func InitDatastoreComponent(path string, conf *config.Config) error {
|
func InitDatastoreComponent(path string, conf *config.Config) error {
|
||||||
// The actual datastore contents are initialized lazily when Opened.
|
// The actual datastore contents are initialized lazily when Opened.
|
||||||
@ -41,24 +56,57 @@ func DatastoreComponentIsInitialized(path string) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DatastoreComponent abstracts the datastore component of the FSRepo.
|
// DatastoreComponent abstracts the datastore component of the FSRepo.
|
||||||
// NB: create with makeDatastoreComponent function.
|
|
||||||
type DatastoreComponent struct {
|
type DatastoreComponent struct {
|
||||||
path string
|
path string // required
|
||||||
ds ds2.ThreadSafeDatastoreCloser
|
ds ds2.ThreadSafeDatastoreCloser // assigned when repo is opened
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (dsc *DatastoreComponent) SetPath(p string) { dsc.path = p }
|
||||||
|
func (dsc *DatastoreComponent) Datastore() datastore.ThreadSafeDatastore { return dsc.ds }
|
||||||
|
|
||||||
// Open returns an error if the config file is not present.
|
// Open returns an error if the config file is not present.
|
||||||
func (dsc *DatastoreComponent) Open() error {
|
func (dsc *DatastoreComponent) Open() error {
|
||||||
|
|
||||||
|
dsLock.Lock()
|
||||||
|
defer dsLock.Unlock()
|
||||||
|
|
||||||
|
// if no other goroutines have the datastore Open, initialize it and assign
|
||||||
|
// it to the package-scoped map for the goroutines that follow.
|
||||||
|
if openersCounter.NumOpeners(dsc.path) == 0 {
|
||||||
ds, err := levelds.NewDatastore(dsc.path, &levelds.Options{
|
ds, err := levelds.NewDatastore(dsc.path, &levelds.Options{
|
||||||
Compression: ldbopts.NoCompression,
|
Compression: ldbopts.NoCompression,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.New("unable to open leveldb datastore")
|
||||||
|
}
|
||||||
|
datastores[dsc.path] = ds
|
||||||
|
}
|
||||||
|
|
||||||
|
// get the datastore from the package-scoped map and record self as an
|
||||||
|
// opener.
|
||||||
|
ds, dsIsPresent := datastores[dsc.path]
|
||||||
|
if !dsIsPresent {
|
||||||
|
// This indicates a programmer error has occurred.
|
||||||
|
return errors.New("datastore should be available, but it isn't")
|
||||||
}
|
}
|
||||||
dsc.ds = ds
|
dsc.ds = ds
|
||||||
|
openersCounter.AddOpener(dsc.path) // only after success
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dsc *DatastoreComponent) Close() error { return dsc.ds.Close() }
|
func (dsc *DatastoreComponent) Close() error {
|
||||||
func (dsc *DatastoreComponent) SetPath(p string) { dsc.path = p }
|
|
||||||
func (dsc *DatastoreComponent) Datastore() datastore.ThreadSafeDatastore { return dsc.ds }
|
dsLock.Lock()
|
||||||
|
defer dsLock.Unlock()
|
||||||
|
|
||||||
|
// decrement the Opener count. if this goroutine is the last, also close
|
||||||
|
// the underlying datastore (and remove its reference from the map)
|
||||||
|
|
||||||
|
openersCounter.RemoveOpener(dsc.path)
|
||||||
|
|
||||||
|
if openersCounter.NumOpeners(dsc.path) == 0 {
|
||||||
|
delete(datastores, dsc.path) // remove the reference
|
||||||
|
return dsc.ds.Close()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
30
repo/fsrepo/component/datastore_test.go
Normal file
30
repo/fsrepo/component/datastore_test.go
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
package component
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io/ioutil"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/jbenet/go-ipfs/repo/fsrepo/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
// swap arg order
|
||||||
|
func testRepoPath(t *testing.T, path ...string) string {
|
||||||
|
name, err := ioutil.TempDir("", filepath.Join(path...))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
return name
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOpenMoreThanOnceInSameProcess(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
path := testRepoPath(t)
|
||||||
|
dsc1 := DatastoreComponent{path: path}
|
||||||
|
dsc2 := DatastoreComponent{path: path}
|
||||||
|
assert.Nil(dsc1.Open(), t, "first repo should open successfully")
|
||||||
|
assert.Nil(dsc2.Open(), t, "second repo should open successfully")
|
||||||
|
|
||||||
|
assert.Nil(dsc1.Close(), t)
|
||||||
|
assert.Nil(dsc2.Close(), t)
|
||||||
|
}
|
@ -125,3 +125,17 @@ func TestDatastorePersistsFromRepoToRepo(t *testing.T) {
|
|||||||
assert.Nil(r2.Close(), t)
|
assert.Nil(r2.Close(), t)
|
||||||
assert.True(bytes.Compare(expected, actual) == 0, t, "data should match")
|
assert.True(bytes.Compare(expected, actual) == 0, t, "data should match")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestOpenMoreThanOnceInSameProcess(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
path := testRepoPath("", t)
|
||||||
|
assert.Nil(Init(path, &config.Config{}), t)
|
||||||
|
|
||||||
|
r1 := At(path)
|
||||||
|
r2 := At(path)
|
||||||
|
assert.Nil(r1.Open(), t, "first repo should open successfully")
|
||||||
|
assert.Nil(r2.Open(), t, "second repo should open successfully")
|
||||||
|
|
||||||
|
assert.Nil(r1.Close(), t)
|
||||||
|
assert.Nil(r2.Close(), t)
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user