mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-28 17:03:58 +08:00
@ -118,8 +118,8 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
|
||||
|
||||
// acquire the repo lock _before_ constructing a node. we need to make
|
||||
// sure we are permitted to access the resources (datastore, etc.)
|
||||
repo := fsrepo.At(req.Context().ConfigRoot)
|
||||
if err := repo.Open(); err != nil {
|
||||
repo, err := fsrepo.Open(req.Context().ConfigRoot)
|
||||
if err != nil {
|
||||
res.SetError(debugerror.Errorf("Couldn't obtain lock. Is another daemon already running?"), cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
|
@ -110,8 +110,8 @@ func addDefaultAssets(out io.Writer, repoRoot string) error {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
r := fsrepo.At(repoRoot)
|
||||
if err := r.Open(); err != nil { // NB: repo is owned by the node
|
||||
r, err := fsrepo.Open(repoRoot)
|
||||
if err != nil { // NB: repo is owned by the node
|
||||
return err
|
||||
}
|
||||
|
||||
@ -163,8 +163,8 @@ func initializeIpnsKeyspace(repoRoot string) error {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
r := fsrepo.At(repoRoot)
|
||||
if err := r.Open(); err != nil { // NB: repo is owned by the node
|
||||
r, err := fsrepo.Open(repoRoot)
|
||||
if err != nil { // NB: repo is owned by the node
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -193,8 +193,8 @@ func (i *cmdInvocation) constructNodeFunc(ctx context.Context) func() (*core.Ipf
|
||||
return nil, errors.New("constructing node without a request context")
|
||||
}
|
||||
|
||||
r := fsrepo.At(i.req.Context().ConfigRoot)
|
||||
if err := r.Open(); err != nil { // repo is owned by the node
|
||||
r, err := fsrepo.Open(i.req.Context().ConfigRoot)
|
||||
if err != nil { // repo is owned by the node
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -193,8 +193,8 @@ func tourGet(id tour.ID) (*tour.Topic, error) {
|
||||
// TODO share func
|
||||
func writeConfig(path string, cfg *config.Config) error {
|
||||
// NB: This needs to run on the daemon.
|
||||
r := fsrepo.At(path)
|
||||
if err := r.Open(); err != nil {
|
||||
r, err := fsrepo.Open(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer r.Close()
|
||||
|
@ -57,8 +57,8 @@ func run() error {
|
||||
}
|
||||
}
|
||||
|
||||
repo := fsrepo.At(repoPath)
|
||||
if err := repo.Open(); err != nil { // owned by node
|
||||
repo, err := fsrepo.Open(repoPath)
|
||||
if err != nil { // owned by node
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -57,8 +57,8 @@ func run() error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
repo := fsrepo.At(repoPath)
|
||||
if err := repo.Open(); err != nil { // owned by node
|
||||
repo, err := fsrepo.Open(repoPath)
|
||||
if err != nil { // owned by node
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -65,8 +65,8 @@ func run(ipfsPath, watchPath string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
r := fsrepo.At(ipfsPath)
|
||||
if err := r.Open(); err != nil {
|
||||
r, err := fsrepo.Open(ipfsPath)
|
||||
if err != nil {
|
||||
// TODO handle case: daemon running
|
||||
// TODO handle case: repo doesn't exist or isn't initialized
|
||||
return err
|
||||
|
@ -66,8 +66,8 @@ in the bootstrap list).
|
||||
return
|
||||
}
|
||||
|
||||
r := fsrepo.At(req.Context().ConfigRoot)
|
||||
if err := r.Open(); err != nil {
|
||||
r, err := fsrepo.Open(req.Context().ConfigRoot)
|
||||
if err != nil {
|
||||
res.SetError(err, cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
@ -143,8 +143,8 @@ var bootstrapRemoveCmd = &cmds.Command{
|
||||
return
|
||||
}
|
||||
|
||||
r := fsrepo.At(req.Context().ConfigRoot)
|
||||
if err := r.Open(); err != nil {
|
||||
r, err := fsrepo.Open(req.Context().ConfigRoot)
|
||||
if err != nil {
|
||||
res.SetError(err, cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
@ -192,8 +192,8 @@ var bootstrapListCmd = &cmds.Command{
|
||||
},
|
||||
|
||||
Run: func(req cmds.Request, res cmds.Response) {
|
||||
r := fsrepo.At(req.Context().ConfigRoot)
|
||||
if err := r.Open(); err != nil {
|
||||
r, err := fsrepo.Open(req.Context().ConfigRoot)
|
||||
if err != nil {
|
||||
res.SetError(err, cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
|
@ -64,14 +64,13 @@ Set the value of the 'datastore.path' key:
|
||||
args := req.Arguments()
|
||||
key := args[0]
|
||||
|
||||
r := fsrepo.At(req.Context().ConfigRoot)
|
||||
if err := r.Open(); err != nil {
|
||||
r, err := fsrepo.Open(req.Context().ConfigRoot)
|
||||
if err != nil {
|
||||
res.SetError(err, cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
var err error
|
||||
var output *ConfigField
|
||||
if len(args) == 2 {
|
||||
value := args[1]
|
||||
@ -182,8 +181,8 @@ can't be undone.
|
||||
cmds.FileArg("file", true, false, "The file to use as the new config"),
|
||||
},
|
||||
Run: func(req cmds.Request, res cmds.Response) {
|
||||
r := fsrepo.At(req.Context().ConfigRoot)
|
||||
if err := r.Open(); err != nil {
|
||||
r, err := fsrepo.Open(req.Context().ConfigRoot)
|
||||
if err != nil {
|
||||
res.SetError(err, cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
|
@ -1,43 +0,0 @@
|
||||
package counter
|
||||
|
||||
import "path"
|
||||
|
||||
// TODO this could be made into something more generic.
|
||||
|
||||
type Openers struct {
|
||||
// repos maps repo paths to the number of openers holding an FSRepo handle
|
||||
// to it
|
||||
repos map[string]int
|
||||
}
|
||||
|
||||
func NewOpenersCounter() *Openers {
|
||||
return &Openers{
|
||||
repos: make(map[string]int),
|
||||
}
|
||||
}
|
||||
|
||||
// NumOpeners returns the number of FSRepos holding a handle to the repo at
|
||||
// this path. This method is not thread-safe. The caller must have this object
|
||||
// locked.
|
||||
func (l *Openers) NumOpeners(repoPath string) int {
|
||||
return l.repos[key(repoPath)]
|
||||
}
|
||||
|
||||
// AddOpener messages that an FSRepo holds a handle to the repo at this path.
|
||||
// This method is not thread-safe. The caller must have this object locked.
|
||||
func (l *Openers) AddOpener(repoPath string) error {
|
||||
l.repos[key(repoPath)]++
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoveOpener messgaes that an FSRepo no longer holds a handle to the repo at
|
||||
// this path. This method is not thread-safe. The caller must have this object
|
||||
// locked.
|
||||
func (l *Openers) RemoveOpener(repoPath string) error {
|
||||
l.repos[key(repoPath)]--
|
||||
return nil
|
||||
}
|
||||
|
||||
func key(repoPath string) string {
|
||||
return path.Clean(repoPath)
|
||||
}
|
@ -1,8 +1,6 @@
|
||||
package fsrepo
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
@ -15,7 +13,6 @@ import (
|
||||
repo "github.com/jbenet/go-ipfs/repo"
|
||||
"github.com/jbenet/go-ipfs/repo/common"
|
||||
config "github.com/jbenet/go-ipfs/repo/config"
|
||||
counter "github.com/jbenet/go-ipfs/repo/fsrepo/counter"
|
||||
lockfile "github.com/jbenet/go-ipfs/repo/fsrepo/lock"
|
||||
serialize "github.com/jbenet/go-ipfs/repo/fsrepo/serialize"
|
||||
dir "github.com/jbenet/go-ipfs/thirdparty/dir"
|
||||
@ -34,41 +31,33 @@ var (
|
||||
|
||||
// packageLock must be held to while performing any operation that modifies an
|
||||
// FSRepo's state field. This includes Init, Open, Close, and Remove.
|
||||
packageLock sync.Mutex // protects openersCounter and lockfiles
|
||||
// lockfiles holds references to the Closers that ensure that repos are
|
||||
// only accessed by one process at a time.
|
||||
lockfiles map[string]io.Closer
|
||||
// openersCounter prevents the fsrepo from being removed while there exist open
|
||||
// FSRepo handles. It also ensures that the Init is atomic.
|
||||
//
|
||||
// packageLock also protects numOpenedRepos
|
||||
//
|
||||
// If an operation is used when repo is Open and the operation does not
|
||||
// change the repo's state, the package lock does not need to be acquired.
|
||||
openersCounter *counter.Openers
|
||||
packageLock sync.Mutex
|
||||
|
||||
// protects dsOpenersCounter and datastores
|
||||
dsLock sync.Mutex
|
||||
dsOpenersCounter *counter.Openers
|
||||
datastores map[string]ds2.ThreadSafeDatastoreCloser
|
||||
// onlyOne keeps track of open FSRepo instances.
|
||||
//
|
||||
// TODO: once command Context / Repo integration is cleaned up,
|
||||
// this can be removed. Right now, this makes ConfigCmd.Run
|
||||
// function try to open the repo twice:
|
||||
//
|
||||
// $ ipfs daemon &
|
||||
// $ ipfs config foo
|
||||
//
|
||||
// The reason for the above is that in standalone mode without the
|
||||
// daemon, `ipfs config` tries to save work by not building the
|
||||
// full IpfsNode, but accessing the Repo directly.
|
||||
onlyOne repo.OnlyOne
|
||||
)
|
||||
|
||||
func init() {
|
||||
openersCounter = counter.NewOpenersCounter()
|
||||
lockfiles = make(map[string]io.Closer)
|
||||
|
||||
dsOpenersCounter = counter.NewOpenersCounter()
|
||||
datastores = make(map[string]ds2.ThreadSafeDatastoreCloser)
|
||||
}
|
||||
|
||||
// FSRepo represents an IPFS FileSystem Repo. It is safe for use by multiple
|
||||
// callers.
|
||||
type FSRepo struct {
|
||||
// state is the FSRepo's state (unopened, opened, closed)
|
||||
state state
|
||||
// has Close been called already
|
||||
closed bool
|
||||
// path is the file-system path
|
||||
path string
|
||||
// config is set on Open, guarded by packageLock
|
||||
// lockfile is the file system lock to prevent others from opening
|
||||
// the same fsrepo path concurrently
|
||||
lockfile io.Closer
|
||||
config *config.Config
|
||||
// ds is set on Open
|
||||
ds ds2.ThreadSafeDatastoreCloser
|
||||
@ -76,13 +65,63 @@ type FSRepo struct {
|
||||
|
||||
var _ repo.Repo = (*FSRepo)(nil)
|
||||
|
||||
// At returns a handle to an FSRepo at the provided |path|.
|
||||
func At(repoPath string) *FSRepo {
|
||||
// This method must not have side-effects.
|
||||
return &FSRepo{
|
||||
path: path.Clean(repoPath),
|
||||
state: unopened, // explicitly set for clarity
|
||||
// Open the FSRepo at path. Returns an error if the repo is not
|
||||
// initialized.
|
||||
func Open(repoPath string) (repo.Repo, error) {
|
||||
fn := func() (repo.Repo, error) {
|
||||
return open(repoPath)
|
||||
}
|
||||
return onlyOne.Open(repoPath, fn)
|
||||
}
|
||||
|
||||
func open(repoPath string) (repo.Repo, error) {
|
||||
packageLock.Lock()
|
||||
defer packageLock.Unlock()
|
||||
|
||||
expPath, err := u.TildeExpansion(path.Clean(repoPath))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r := &FSRepo{
|
||||
path: expPath,
|
||||
}
|
||||
|
||||
r.lockfile, err = lockfile.Lock(r.path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
keepLocked := false
|
||||
defer func() {
|
||||
// unlock on error, leave it locked on success
|
||||
if !keepLocked {
|
||||
r.lockfile.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
if !isInitializedUnsynced(r.path) {
|
||||
return nil, debugerror.New("ipfs not initialized, please run 'ipfs init'")
|
||||
}
|
||||
// check repo path, then check all constituent parts.
|
||||
// TODO acquire repo lock
|
||||
// TODO if err := initCheckDir(logpath); err != nil { // }
|
||||
if err := dir.Writable(r.path); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := r.openConfig(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := r.openDatastore(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// log.Debugf("writing eventlogs to ...", c.path)
|
||||
configureEventLoggerAtRepoPath(r.config, r.path)
|
||||
|
||||
keepLocked = true
|
||||
return r, nil
|
||||
}
|
||||
|
||||
// ConfigAt returns an error if the FSRepo at the given path is not
|
||||
@ -165,15 +204,6 @@ func Init(repoPath string, conf *config.Config) error {
|
||||
// Remove recursively removes the FSRepo at |path|.
|
||||
func Remove(repoPath string) error {
|
||||
repoPath = path.Clean(repoPath)
|
||||
|
||||
// packageLock must be held to ensure that the repo is not removed while
|
||||
// being accessed by others.
|
||||
packageLock.Lock()
|
||||
defer packageLock.Unlock()
|
||||
|
||||
if openersCounter.NumOpeners(repoPath) != 0 {
|
||||
return errors.New("repo in use")
|
||||
}
|
||||
return os.RemoveAll(repoPath)
|
||||
}
|
||||
|
||||
@ -182,12 +212,11 @@ func Remove(repoPath string) error {
|
||||
func LockedByOtherProcess(repoPath string) bool {
|
||||
repoPath = path.Clean(repoPath)
|
||||
|
||||
// packageLock must be held to check the number of openers.
|
||||
packageLock.Lock()
|
||||
defer packageLock.Unlock()
|
||||
// TODO replace this with the "api" file
|
||||
// https://github.com/ipfs/specs/tree/master/repo/fs-repo
|
||||
|
||||
// NB: the lock is only held when repos are Open
|
||||
return lockfile.Locked(repoPath) && openersCounter.NumOpeners(repoPath) == 0
|
||||
return lockfile.Locked(repoPath)
|
||||
}
|
||||
|
||||
// openConfig returns an error if the config file is not present.
|
||||
@ -206,32 +235,14 @@ func (r *FSRepo) openConfig() error {
|
||||
|
||||
// openDatastore returns an error if the config file is not present.
|
||||
func (r *FSRepo) openDatastore() error {
|
||||
dsLock.Lock()
|
||||
defer dsLock.Unlock()
|
||||
|
||||
dsPath := path.Join(r.path, defaultDataStoreDirectory)
|
||||
|
||||
// if no other goroutines have the datastore Open, initialize it and assign
|
||||
// it to the package-scoped map for the goroutines that follow.
|
||||
if dsOpenersCounter.NumOpeners(dsPath) == 0 {
|
||||
ds, err := levelds.NewDatastore(dsPath, &levelds.Options{
|
||||
Compression: ldbopts.NoCompression,
|
||||
})
|
||||
if err != nil {
|
||||
return debugerror.New("unable to open leveldb datastore")
|
||||
}
|
||||
datastores[dsPath] = ds
|
||||
}
|
||||
|
||||
// get the datastore from the package-scoped map and record self as an
|
||||
// opener.
|
||||
ds, dsIsPresent := datastores[dsPath]
|
||||
if !dsIsPresent {
|
||||
// This indicates a programmer error has occurred.
|
||||
return errors.New("datastore should be available, but it isn't")
|
||||
}
|
||||
r.ds = ds
|
||||
dsOpenersCounter.AddOpener(dsPath) // only after success
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -247,76 +258,16 @@ func configureEventLoggerAtRepoPath(c *config.Config, repoPath string) {
|
||||
eventlog.Configure(eventlog.OutputRotatingLogFile(rotateConf))
|
||||
}
|
||||
|
||||
// Open returns an error if the repo is not initialized.
|
||||
func (r *FSRepo) Open() error {
|
||||
|
||||
// packageLock must be held to make sure that the repo is not destroyed by
|
||||
// another caller. It must not be released until initialization is complete
|
||||
// and the number of openers is incremeneted.
|
||||
packageLock.Lock()
|
||||
defer packageLock.Unlock()
|
||||
|
||||
expPath, err := u.TildeExpansion(r.path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.path = expPath
|
||||
|
||||
if r.state != unopened {
|
||||
return debugerror.Errorf("repo is %s", r.state)
|
||||
}
|
||||
if !isInitializedUnsynced(r.path) {
|
||||
return debugerror.New("ipfs not initialized, please run 'ipfs init'")
|
||||
}
|
||||
// check repo path, then check all constituent parts.
|
||||
// TODO acquire repo lock
|
||||
// TODO if err := initCheckDir(logpath); err != nil { // }
|
||||
if err := dir.Writable(r.path); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := r.openConfig(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := r.openDatastore(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// log.Debugf("writing eventlogs to ...", c.path)
|
||||
configureEventLoggerAtRepoPath(r.config, r.path)
|
||||
|
||||
return r.transitionToOpened()
|
||||
}
|
||||
|
||||
func (r *FSRepo) closeDatastore() error {
|
||||
dsLock.Lock()
|
||||
defer dsLock.Unlock()
|
||||
|
||||
dsPath := path.Join(r.path, defaultDataStoreDirectory)
|
||||
|
||||
// decrement the Opener count. if this goroutine is the last, also close
|
||||
// the underlying datastore (and remove its reference from the map)
|
||||
|
||||
dsOpenersCounter.RemoveOpener(dsPath)
|
||||
|
||||
if dsOpenersCounter.NumOpeners(dsPath) == 0 {
|
||||
delete(datastores, dsPath) // remove the reference
|
||||
return r.ds.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the FSRepo, releasing held resources.
|
||||
func (r *FSRepo) Close() error {
|
||||
packageLock.Lock()
|
||||
defer packageLock.Unlock()
|
||||
|
||||
if r.state != opened {
|
||||
return debugerror.Errorf("repo is %s", r.state)
|
||||
if r.closed {
|
||||
return debugerror.New("repo is closed")
|
||||
}
|
||||
|
||||
if err := r.closeDatastore(); err != nil {
|
||||
if err := r.ds.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -328,7 +279,11 @@ func (r *FSRepo) Close() error {
|
||||
// to disable logging once the component is closed.
|
||||
// eventlog.Configure(eventlog.Output(os.Stderr))
|
||||
|
||||
return r.transitionToClosed()
|
||||
r.closed = true
|
||||
if err := r.lockfile.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Config returns the FSRepo's config. This method must not be called if the
|
||||
@ -345,8 +300,8 @@ func (r *FSRepo) Config() *config.Config {
|
||||
packageLock.Lock()
|
||||
defer packageLock.Unlock()
|
||||
|
||||
if r.state != opened {
|
||||
panic(fmt.Sprintln("repo is", r.state))
|
||||
if r.closed {
|
||||
panic("repo is closed")
|
||||
}
|
||||
return r.config
|
||||
}
|
||||
@ -393,8 +348,8 @@ func (r *FSRepo) GetConfigKey(key string) (interface{}, error) {
|
||||
packageLock.Lock()
|
||||
defer packageLock.Unlock()
|
||||
|
||||
if r.state != opened {
|
||||
return nil, debugerror.Errorf("repo is %s", r.state)
|
||||
if r.closed {
|
||||
return nil, debugerror.New("repo is closed")
|
||||
}
|
||||
|
||||
filename, err := config.Filename(r.path)
|
||||
@ -413,8 +368,8 @@ func (r *FSRepo) SetConfigKey(key string, value interface{}) error {
|
||||
packageLock.Lock()
|
||||
defer packageLock.Unlock()
|
||||
|
||||
if r.state != opened {
|
||||
return debugerror.Errorf("repo is %s", r.state)
|
||||
if r.closed {
|
||||
return debugerror.New("repo is closed")
|
||||
}
|
||||
|
||||
filename, err := config.Filename(r.path)
|
||||
@ -479,37 +434,3 @@ func isInitializedUnsynced(repoPath string) bool {
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// transitionToOpened manages the state transition to |opened|. Caller must hold
|
||||
// the package mutex.
|
||||
func (r *FSRepo) transitionToOpened() error {
|
||||
r.state = opened
|
||||
if countBefore := openersCounter.NumOpeners(r.path); countBefore == 0 { // #first
|
||||
closer, err := lockfile.Lock(r.path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
lockfiles[r.path] = closer
|
||||
}
|
||||
return openersCounter.AddOpener(r.path)
|
||||
}
|
||||
|
||||
// transitionToClosed manages the state transition to |closed|. Caller must
|
||||
// hold the package mutex.
|
||||
func (r *FSRepo) transitionToClosed() error {
|
||||
r.state = closed
|
||||
if err := openersCounter.RemoveOpener(r.path); err != nil {
|
||||
return err
|
||||
}
|
||||
if countAfter := openersCounter.NumOpeners(r.path); countAfter == 0 {
|
||||
closer, ok := lockfiles[r.path]
|
||||
if !ok {
|
||||
return errors.New("package error: lockfile is not held")
|
||||
}
|
||||
if err := closer.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
delete(lockfiles, r.path)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -30,31 +30,7 @@ func TestInitIdempotence(t *testing.T) {
|
||||
func TestRemove(t *testing.T) {
|
||||
t.Parallel()
|
||||
path := testRepoPath("foo", t)
|
||||
assert.Nil(Remove(path), t, "should be able to remove after closed")
|
||||
}
|
||||
|
||||
func TestCannotRemoveIfOpen(t *testing.T) {
|
||||
t.Parallel()
|
||||
path := testRepoPath("TestCannotRemoveIfOpen", t)
|
||||
assert.Nil(Init(path, &config.Config{}), t, "should initialize successfully")
|
||||
r := At(path)
|
||||
assert.Nil(r.Open(), t)
|
||||
assert.Err(Remove(path), t, "should not be able to remove while open")
|
||||
assert.Nil(r.Close(), t)
|
||||
assert.Nil(Remove(path), t, "should be able to remove after closed")
|
||||
}
|
||||
|
||||
func TestCannotBeReopened(t *testing.T) {
|
||||
t.Parallel()
|
||||
path := testRepoPath("", t)
|
||||
assert.Nil(Init(path, &config.Config{}), t)
|
||||
r := At(path)
|
||||
assert.Nil(r.Open(), t)
|
||||
assert.Nil(r.Close(), t)
|
||||
assert.Err(r.Open(), t, "shouldn't be possible to re-open the repo")
|
||||
|
||||
// mutable state is the enemy. Take Close() as an opportunity to reduce
|
||||
// entropy. Callers ought to start fresh with a new handle by calling `At`.
|
||||
assert.Nil(Remove(path), t, "can remove a repository")
|
||||
}
|
||||
|
||||
func TestCanManageReposIndependently(t *testing.T) {
|
||||
@ -71,10 +47,10 @@ func TestCanManageReposIndependently(t *testing.T) {
|
||||
assert.True(IsInitialized(pathB), t, "b should be initialized")
|
||||
|
||||
t.Log("open the two repos")
|
||||
repoA := At(pathA)
|
||||
repoB := At(pathB)
|
||||
assert.Nil(repoA.Open(), t, "a")
|
||||
assert.Nil(repoB.Open(), t, "b")
|
||||
repoA, err := Open(pathA)
|
||||
assert.Nil(err, t, "a")
|
||||
repoB, err := Open(pathB)
|
||||
assert.Nil(err, t, "b")
|
||||
|
||||
t.Log("close and remove b while a is open")
|
||||
assert.Nil(repoB.Close(), t, "close b")
|
||||
@ -91,15 +67,15 @@ func TestDatastoreGetNotAllowedAfterClose(t *testing.T) {
|
||||
|
||||
assert.True(!IsInitialized(path), t, "should NOT be initialized")
|
||||
assert.Nil(Init(path, &config.Config{}), t, "should initialize successfully")
|
||||
r := At(path)
|
||||
assert.Nil(r.Open(), t, "should open successfully")
|
||||
r, err := Open(path)
|
||||
assert.Nil(err, t, "should open successfully")
|
||||
|
||||
k := "key"
|
||||
data := []byte(k)
|
||||
assert.Nil(r.Datastore().Put(datastore.NewKey(k), data), t, "Put should be successful")
|
||||
|
||||
assert.Nil(r.Close(), t)
|
||||
_, err := r.Datastore().Get(datastore.NewKey(k))
|
||||
_, err = r.Datastore().Get(datastore.NewKey(k))
|
||||
assert.Err(err, t, "after closer, Get should be fail")
|
||||
}
|
||||
|
||||
@ -108,16 +84,16 @@ func TestDatastorePersistsFromRepoToRepo(t *testing.T) {
|
||||
path := testRepoPath("test", t)
|
||||
|
||||
assert.Nil(Init(path, &config.Config{}), t)
|
||||
r1 := At(path)
|
||||
assert.Nil(r1.Open(), t)
|
||||
r1, err := Open(path)
|
||||
assert.Nil(err, t)
|
||||
|
||||
k := "key"
|
||||
expected := []byte(k)
|
||||
assert.Nil(r1.Datastore().Put(datastore.NewKey(k), expected), t, "using first repo, Put should be successful")
|
||||
assert.Nil(r1.Close(), t)
|
||||
|
||||
r2 := At(path)
|
||||
assert.Nil(r2.Open(), t)
|
||||
r2, err := Open(path)
|
||||
assert.Nil(err, t)
|
||||
v, err := r2.Datastore().Get(datastore.NewKey(k))
|
||||
assert.Nil(err, t, "using second repo, Get should be successful")
|
||||
actual, ok := v.([]byte)
|
||||
@ -131,11 +107,11 @@ func TestOpenMoreThanOnceInSameProcess(t *testing.T) {
|
||||
path := testRepoPath("", t)
|
||||
assert.Nil(Init(path, &config.Config{}), t)
|
||||
|
||||
r1 := At(path)
|
||||
r2 := At(path)
|
||||
assert.Nil(r1.Open(), t, "first repo should open successfully")
|
||||
assert.Nil(r2.Open(), t, "second repo should open successfully")
|
||||
assert.True(r1.ds == r2.ds, t, "repos should share the datastore")
|
||||
r1, err := Open(path)
|
||||
assert.Nil(err, t, "first repo should open successfully")
|
||||
r2, err := Open(path)
|
||||
assert.Nil(err, t, "second repo should open successfully")
|
||||
assert.True(r1 == r2, t, "second open returns same value")
|
||||
|
||||
assert.Nil(r1.Close(), t)
|
||||
assert.Nil(r2.Close(), t)
|
||||
|
@ -1,22 +0,0 @@
|
||||
package fsrepo
|
||||
|
||||
type state int
|
||||
|
||||
const (
|
||||
unopened = iota
|
||||
opened
|
||||
closed
|
||||
)
|
||||
|
||||
func (s state) String() string {
|
||||
switch s {
|
||||
case unopened:
|
||||
return "unopened"
|
||||
case opened:
|
||||
return "opened"
|
||||
case closed:
|
||||
return "closed"
|
||||
default:
|
||||
return "invalid"
|
||||
}
|
||||
}
|
72
repo/onlyone.go
Normal file
72
repo/onlyone.go
Normal file
@ -0,0 +1,72 @@
|
||||
package repo
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// OnlyOne tracks open Repos by arbitrary key and returns the already
|
||||
// open one.
|
||||
type OnlyOne struct {
|
||||
mu sync.Mutex
|
||||
active map[interface{}]*ref
|
||||
}
|
||||
|
||||
// Open a Repo identified by key. If Repo is not already open, the
|
||||
// open function is called, and the result is remember for further
|
||||
// use.
|
||||
//
|
||||
// Key must be comparable, or Open will panic. Make sure to pick keys
|
||||
// that are unique across different concrete Repo implementations,
|
||||
// e.g. by creating a local type:
|
||||
//
|
||||
// type repoKey string
|
||||
// r, err := o.Open(repoKey(path), open)
|
||||
//
|
||||
// Call Repo.Close when done.
|
||||
func (o *OnlyOne) Open(key interface{}, open func() (Repo, error)) (Repo, error) {
|
||||
o.mu.Lock()
|
||||
defer o.mu.Unlock()
|
||||
if o.active == nil {
|
||||
o.active = make(map[interface{}]*ref)
|
||||
}
|
||||
|
||||
item, found := o.active[key]
|
||||
if !found {
|
||||
repo, err := open()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
item = &ref{
|
||||
parent: o,
|
||||
key: key,
|
||||
Repo: repo,
|
||||
}
|
||||
o.active[key] = item
|
||||
}
|
||||
item.refs++
|
||||
return item, nil
|
||||
}
|
||||
|
||||
type ref struct {
|
||||
parent *OnlyOne
|
||||
key interface{}
|
||||
refs uint32
|
||||
Repo
|
||||
}
|
||||
|
||||
var _ Repo = (*ref)(nil)
|
||||
|
||||
func (r *ref) Close() error {
|
||||
r.parent.mu.Lock()
|
||||
defer r.parent.mu.Unlock()
|
||||
|
||||
r.refs--
|
||||
if r.refs > 0 {
|
||||
// others are holding it open
|
||||
return nil
|
||||
}
|
||||
|
||||
// last one
|
||||
delete(r.parent.active, r.key)
|
||||
return r.Repo.Close()
|
||||
}
|
@ -66,8 +66,8 @@ func run() error {
|
||||
repoPath := gopath.Join(cwd, ".go-ipfs")
|
||||
if err := ensureRepoInitialized(repoPath); err != nil {
|
||||
}
|
||||
repo := fsrepo.At(repoPath)
|
||||
if err := repo.Open(); err != nil { // owned by node
|
||||
repo, err := fsrepo.Open(repoPath)
|
||||
if err != nil { // owned by node
|
||||
return err
|
||||
}
|
||||
cfg := repo.Config()
|
||||
|
@ -212,8 +212,8 @@ func CliCheckForUpdates(cfg *config.Config, repoPath string) error {
|
||||
// if we checked successfully.
|
||||
if err == ErrNoUpdateAvailable {
|
||||
log.Noticef("No update available, checked on %s", time.Now())
|
||||
r := fsrepo.At(repoPath)
|
||||
if err := r.Open(); err != nil {
|
||||
r, err := fsrepo.Open(repoPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := recordUpdateCheck(cfg); err != nil {
|
||||
|
Reference in New Issue
Block a user