1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-05-17 15:06:47 +08:00

blockstore locks return unlocker object now

License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
Jeromy
2016-01-15 15:14:29 -08:00
parent 829606a9ed
commit 5474e15e93
16 changed files with 228 additions and 126 deletions

View File

@ -43,13 +43,13 @@ type GCBlockstore interface {
// GCLock locks the blockstore for garbage collection. No operations
// that expect to finish with a pin should ocurr simultaneously.
// Reading during GC is safe, and requires no lock.
GCLock() func()
GCLock() Unlocker
// PinLock locks the blockstore for sequences of puts expected to finish
// with a pin (before GC). Multiple put->pin sequences can write through
// at the same time, but no GC should not happen simulatenously.
// Reading during Pinning is safe, and requires no lock.
PinLock() func()
PinLock() Unlocker
// GcRequested returns true if GCLock has been called and is waiting to
// take the lock
@ -198,16 +198,29 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
return output, nil
}
func (bs *blockstore) GCLock() func() {
type Unlocker interface {
Unlock()
}
type unlocker struct {
unlock func()
}
func (u *unlocker) Unlock() {
u.unlock()
u.unlock = nil // ensure its not called twice
}
func (bs *blockstore) GCLock() Unlocker {
atomic.AddInt32(&bs.gcreq, 1)
bs.lk.Lock()
atomic.AddInt32(&bs.gcreq, -1)
return bs.lk.Unlock
return &unlocker{bs.lk.Unlock}
}
func (bs *blockstore) PinLock() func() {
func (bs *blockstore) PinLock() Unlocker {
bs.lk.RLock()
return bs.lk.RUnlock
return &unlocker{bs.lk.RUnlock}
}
func (bs *blockstore) GCRequested() bool {

View File

@ -59,11 +59,11 @@ func (w *writecache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
return w.blockstore.AllKeysChan(ctx)
}
func (w *writecache) GCLock() func() {
func (w *writecache) GCLock() Unlocker {
return w.blockstore.(GCBlockstore).GCLock()
}
func (w *writecache) PinLock() func() {
func (w *writecache) PinLock() Unlocker {
return w.blockstore.(GCBlockstore).PinLock()
}

View File

@ -109,8 +109,7 @@ func statNode(ds dag.DAGService, fsn mfs.FSNode) (*Object, error) {
return nil, err
}
// add to dagserv to ensure its available
k, err := ds.Add(nd)
k, err := nd.Key()
if err != nil {
return nil, err
}
@ -159,6 +158,11 @@ var FilesCpCmd = &cmds.Command{
return
}
flush, found, _ := req.Option("flush").Bool()
if !found {
flush = true
}
src, err := checkPath(req.Arguments()[0])
if err != nil {
res.SetError(err, cmds.ErrNormal)
@ -181,6 +185,14 @@ var FilesCpCmd = &cmds.Command{
res.SetError(err, cmds.ErrNormal)
return
}
if flush {
err := mfs.FlushPath(node.FilesRoot, dst)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
}
},
}
@ -501,8 +513,8 @@ Warning:
create, _, _ := req.Option("create").Bool()
trunc, _, _ := req.Option("truncate").Bool()
flush, set, _ := req.Option("flush").Bool()
if !set {
flush, fset, _ := req.Option("flush").Bool()
if !fset {
flush = true
}
@ -529,14 +541,7 @@ Warning:
}
if flush {
defer func() {
fi.Close()
err := mfs.FlushPath(nd.FilesRoot, path)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
}()
defer fi.Close()
} else {
defer fi.Sync()
}
@ -653,9 +658,6 @@ are run with the '--flush=false'.
return
}
// take the lock and defer the unlock
defer nd.Blockstore.PinLock()()
path := "/"
if len(req.Arguments()) > 0 {
path = req.Arguments()[0]

View File

@ -54,8 +54,7 @@ on disk.
return
}
unlock := n.Blockstore.PinLock()
defer unlock()
defer n.Blockstore.PinLock().Unlock()
// set recursive flag
recursive, found, err := req.Option("recursive").Bool()

View File

@ -474,23 +474,6 @@ func (n *IpfsNode) loadBootstrapPeers() ([]peer.PeerInfo, error) {
func (n *IpfsNode) loadFilesRoot() error {
dsk := ds.NewKey("/local/filesroot")
pf := func(ctx context.Context, k key.Key) error {
ds := n.Repo.Datastore()
if old, err := ds.Get(dsk); err == nil && old != nil {
_ = n.Pinning.Unpin(ctx, key.Key(old.([]byte)), true)
}
nnd, err := n.DAG.Get(ctx, k)
if err != nil {
return err
}
if err := n.Pinning.Pin(ctx, nnd, true); err != nil {
return err
}
if err := n.Pinning.Flush(); err != nil {
return err
}
return n.Repo.Datastore().Put(dsk, []byte(k))
}

View File

@ -20,6 +20,7 @@ import (
"github.com/ipfs/go-ipfs/pin"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
bs "github.com/ipfs/go-ipfs/blocks/blockstore"
"github.com/ipfs/go-ipfs/commands/files"
core "github.com/ipfs/go-ipfs/core"
dag "github.com/ipfs/go-ipfs/merkledag"
@ -100,7 +101,7 @@ type Adder struct {
Chunker string
root *dag.Node
mr *mfs.Root
unlock func()
unlocker bs.Unlocker
tempRoot key.Key
}
@ -225,8 +226,7 @@ func (adder *Adder) outputDirs(path string, nd *dag.Node) error {
// Add builds a merkledag from the a reader, pinning all objects to the local
// datastore. Returns a key representing the root node.
func Add(n *core.IpfsNode, r io.Reader) (string, error) {
unlock := n.Blockstore.PinLock()
defer unlock()
defer n.Blockstore.PinLock().Unlock()
fileAdder, err := NewAdder(n.Context(), n, nil)
if err != nil {
@ -247,8 +247,7 @@ func Add(n *core.IpfsNode, r io.Reader) (string, error) {
// AddR recursively adds files in |path|.
func AddR(n *core.IpfsNode, root string) (key string, err error) {
unlock := n.Blockstore.PinLock()
defer unlock()
n.Blockstore.PinLock().Unlock()
stat, err := os.Lstat(root)
if err != nil {
@ -296,8 +295,7 @@ func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *dag.No
}
fileAdder.Wrap = true
unlock := n.Blockstore.PinLock()
defer unlock()
defer n.Blockstore.PinLock().Unlock()
err = fileAdder.addFile(file)
if err != nil {
@ -347,8 +345,10 @@ func (adder *Adder) addNode(node *dag.Node, path string) error {
// Add the given file while respecting the adder.
func (adder *Adder) AddFile(file files.File) error {
adder.unlock = adder.node.Blockstore.PinLock()
defer adder.unlock()
adder.unlocker = adder.node.Blockstore.PinLock()
defer func() {
adder.unlocker.Unlock()
}()
return adder.addFile(file)
}
@ -434,8 +434,8 @@ func (adder *Adder) maybePauseForGC() error {
return err
}
adder.unlock()
adder.unlock = adder.node.Blockstore.PinLock()
adder.unlocker.Unlock()
adder.unlocker = adder.node.Blockstore.PinLock()
}
return nil
}

View File

@ -51,17 +51,20 @@ func NewDirectory(ctx context.Context, name string, node *dag.Node, parent child
// closeChild updates the child by the given name to the dag node 'nd'
// and changes its own dag node
func (d *Directory) closeChild(name string, nd *dag.Node) error {
mynd, err := d.closeChildUpdate(name, nd)
func (d *Directory) closeChild(name string, nd *dag.Node, sync bool) error {
mynd, err := d.closeChildUpdate(name, nd, sync)
if err != nil {
return err
}
return d.parent.closeChild(d.name, mynd)
if sync {
return d.parent.closeChild(d.name, mynd, true)
}
return nil
}
// closeChildUpdate is the portion of closeChild that needs to be locked around
func (d *Directory) closeChildUpdate(name string, nd *dag.Node) (*dag.Node, error) {
func (d *Directory) closeChildUpdate(name string, nd *dag.Node, sync bool) (*dag.Node, error) {
d.lock.Lock()
defer d.lock.Unlock()
@ -70,7 +73,10 @@ func (d *Directory) closeChildUpdate(name string, nd *dag.Node) (*dag.Node, erro
return nil, err
}
return d.flushCurrentNode()
if sync {
return d.flushCurrentNode()
}
return nil, nil
}
func (d *Directory) flushCurrentNode() (*dag.Node, error) {
@ -295,12 +301,15 @@ func (d *Directory) Unlink(name string) error {
}
func (d *Directory) Flush() error {
d.lock.Lock()
nd, err := d.flushCurrentNode()
if err != nil {
d.lock.Unlock()
return err
}
d.lock.Unlock()
return d.parent.closeChild(d.name, nd)
return d.parent.closeChild(d.name, nd, true)
}
// AddChild adds the node 'nd' under this directory giving it the name 'name'
@ -335,11 +344,6 @@ func (d *Directory) sync() error {
return err
}
_, err = d.dserv.Add(nd)
if err != nil {
return err
}
err = d.updateChild(name, nd)
if err != nil {
return err
@ -352,11 +356,6 @@ func (d *Directory) sync() error {
return err
}
_, err = d.dserv.Add(nd)
if err != nil {
return err
}
err = d.updateChild(name, nd)
if err != nil {
return err
@ -385,6 +384,11 @@ func (d *Directory) GetNode() (*dag.Node, error) {
return nil, err
}
_, err = d.dserv.Add(d.node)
if err != nil {
return nil, err
}
return d.node.Copy(), nil
}

View File

@ -73,7 +73,7 @@ func (fi *File) Close() error {
// explicitly stay locked for flushUp call,
// it will manage the lock for us
return fi.flushUp()
return fi.flushUp(true)
}
fi.Unlock()
@ -82,7 +82,7 @@ func (fi *File) Close() error {
// flushUp syncs the file and adds it to the dagservice
// it *must* be called with the File's lock taken
func (fi *File) flushUp() error {
func (fi *File) flushUp(fullsync bool) error {
nd, err := fi.mod.GetNode()
if err != nil {
fi.Unlock()
@ -95,20 +95,18 @@ func (fi *File) flushUp() error {
return err
}
//name := fi.name
//parent := fi.parent
name := fi.name
parent := fi.parent
// explicit unlock *only* before closeChild call
fi.Unlock()
return nil
//return parent.closeChild(name, nd)
return parent.closeChild(name, nd, fullsync)
}
// Sync flushes the changes in the file to disk
func (fi *File) Sync() error {
fi.Lock()
defer fi.Unlock()
return fi.mod.Sync()
return fi.flushUp(false)
}
// Seek implements io.Seeker

View File

@ -576,15 +576,15 @@ func actorRemoveFile(d *Directory) error {
return d.Unlink(re.Name)
}
func actorReadFile(d *Directory) error {
func randomFile(d *Directory) (*File, error) {
d, err := randomWalk(d, rand.Intn(6))
if err != nil {
return err
return nil, err
}
ents, err := d.List()
if err != nil {
return err
return nil, err
}
var files []string
@ -595,18 +595,61 @@ func actorReadFile(d *Directory) error {
}
if len(files) == 0 {
return nil
return nil, nil
}
fname := files[rand.Intn(len(files))]
fsn, err := d.Child(fname)
if err != nil {
return err
return nil, err
}
fi, ok := fsn.(*File)
if !ok {
return errors.New("file wasnt a file, race?")
return nil, errors.New("file wasnt a file, race?")
}
return fi, nil
}
func actorWriteFile(d *Directory) error {
fi, err := randomFile(d)
if err != nil {
return err
}
if fi == nil {
return nil
}
size := rand.Intn(1024)
buf := make([]byte, size)
randbo.New().Read(buf)
s, err := fi.Size()
if err != nil {
return err
}
offset := rand.Int63n(s)
n, err := fi.WriteAt(buf, offset)
if err != nil {
return err
}
if n != size {
return fmt.Errorf("didnt write enough")
}
return fi.Close()
}
func actorReadFile(d *Directory) error {
fi, err := randomFile(d)
if err != nil {
return err
}
if fi == nil {
return nil
}
_, err = fi.Size()
@ -637,12 +680,7 @@ func testActor(rt *Root, iterations int, errs chan error) {
return
}
case 3:
continue
// randomly deleting things
// doesnt really give us any sort of useful test results.
// you will never have this in a real environment where
// you expect anything productive to happen...
if err := actorRemoveFile(d); err != nil {
if err := actorWriteFile(d); err != nil {
errs <- err
return
}
@ -698,6 +736,9 @@ func TestFlushing(t *testing.T) {
if err := e.AddChild("TEST", nd1); err != nil {
t.Fatal(err)
}
if err := dir.AddChild("FILE", nd1); err != nil {
t.Fatal(err)
}
if err := FlushPath(rt, "/a/b/c/TEST"); err != nil {
t.Fatal(err)
@ -711,17 +752,31 @@ func TestFlushing(t *testing.T) {
t.Fatal(err)
}
if err := FlushPath(rt, "/FILE"); err != nil {
t.Fatal(err)
}
rnd, err := dir.GetNode()
if err != nil {
t.Fatal(err)
}
fsnode, err := ft.FSNodeFromBytes(rnd.Data)
if err != nil {
t.Fatal(err)
}
if fsnode.Type != ft.TDirectory {
t.Fatal("root wasnt a directory")
}
rnk, err := rnd.Key()
if err != nil {
t.Fatal(err)
}
if rnk.B58String() != "QmWcvrHUFk7LQRrA4WqKjqy7ZyRGFLVagtgNxbEodTEzQ4" {
t.Fatal("dag looks wrong")
exp := "QmWMVyhTuyxUrXX3ynz171jq76yY3PktfY9Bxiph7b9ikr"
if rnk.B58String() != exp {
t.Fatalf("dag looks wrong, expected %s, but got %s", exp, rnk.B58String())
}
}

View File

@ -230,11 +230,6 @@ func flushPathRec(d *Directory, parts []string) (*dag.Node, error) {
return nil, err
}
_, err = d.dserv.Add(nd)
if err != nil {
return nil, err
}
return nd, nil
}
@ -247,6 +242,7 @@ func flushPathRec(d *Directory, parts []string) (*dag.Node, error) {
return nil, err
}
var ndagnode *dag.Node
switch next := next.(type) {
case *Directory:
nd, err := flushPathRec(next, parts[1:])
@ -254,25 +250,33 @@ func flushPathRec(d *Directory, parts []string) (*dag.Node, error) {
return nil, err
}
newnode, err := d.node.UpdateNodeLink(parts[0], nd)
if err != nil {
return nil, err
}
ndagnode = nd
_, err = d.dserv.Add(newnode)
if err != nil {
return nil, err
}
d.node = newnode
return newnode, nil
case *File:
if len(parts) > 1 {
return nil, fmt.Errorf("%s is a file, not a directory", parts[0])
}
return next.GetNode()
child, err := next.GetNode()
if err != nil {
return nil, err
}
ndagnode = child
default:
return nil, fmt.Errorf("unrecognized FSNode type: %#v", next)
}
newnode, err := d.node.UpdateNodeLink(parts[0], ndagnode)
if err != nil {
return nil, err
}
_, err = d.dserv.Add(newnode)
if err != nil {
return nil, err
}
d.node = newnode
return newnode, nil
}

View File

@ -29,7 +29,7 @@ var log = logging.Logger("mfs")
var ErrIsDirectory = errors.New("error: is a directory")
type childCloser interface {
closeChild(string, *dag.Node) error
closeChild(string, *dag.Node, bool) error
}
type NodeType int
@ -115,7 +115,7 @@ func (kr *Root) Flush() error {
return err
}
k, err := kr.dserv.Add(nd)
k, err := nd.Key()
if err != nil {
return err
}
@ -128,7 +128,7 @@ func (kr *Root) Flush() error {
// closeChild implements the childCloser interface, and signals to the publisher that
// there are changes ready to be published
func (kr *Root) closeChild(name string, nd *dag.Node) error {
func (kr *Root) closeChild(name string, nd *dag.Node, sync bool) error {
k, err := kr.dserv.Add(nd)
if err != nil {
return err
@ -146,7 +146,7 @@ func (kr *Root) Close() error {
return err
}
k, err := kr.dserv.Add(nd)
k, err := nd.Key()
if err != nil {
return err
}

View File

@ -23,7 +23,7 @@ var log = logging.Logger("gc")
// The routine then iterates over every block in the blockstore and
// deletes any block that is not found in the marked set.
func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner) (<-chan key.Key, error) {
unlock := bs.GCLock()
unlocker := bs.GCLock()
bsrv := bserv.New(bs, offline.Exchange(bs))
ds := dag.NewDAGService(bsrv)
@ -41,7 +41,7 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner) (<-chan key.
output := make(chan key.Key)
go func() {
defer close(output)
defer unlock()
defer unlocker.Unlock()
for {
select {
case k, ok := <-keychan:

View File

@ -123,6 +123,8 @@ func (p *pinner) Pin(ctx context.Context, node *mdag.Node, recurse bool) error {
return nil
}
var ErrNotPinned = fmt.Errorf("not pinned")
// Unpin a given key
func (p *pinner) Unpin(ctx context.Context, k key.Key, recursive bool) error {
p.lock.Lock()
@ -132,7 +134,7 @@ func (p *pinner) Unpin(ctx context.Context, k key.Key, recursive bool) error {
return err
}
if !pinned {
return fmt.Errorf("%s is not pinned", k)
return ErrNotPinned
}
switch reason {
case "recursive":

View File

@ -340,11 +340,6 @@ test_files_api() {
ipfs files flush /
'
test_expect_success "root hash is pinned after flush" '
ipfs pin ls > pins &&
grep $EXP_ROOT_HASH pins || (cat pins && exit 1)
'
# test mv
test_expect_success "can mv dir" '
ipfs files mv /cats/this/is /cats/

View File

@ -0,0 +1,53 @@
#!/bin/sh
#
# Copyright (c) 2016 Jeromy Johnson
# MIT Licensed; see the LICENSE file in this repository.
#
test_description="test the unix files api flushing"
. lib/test-lib.sh
test_init_ipfs
verify_path_exists() {
# simply running ls on a file should be a good 'check'
ipfs files ls $1
}
verify_dir_contents() {
dir=$1
shift
rm -f expected
touch expected
for e in $@
do
echo $e >> expected
done
test_expect_success "can list dir" '
ipfs files ls $dir > output
'
test_expect_success "dir entries look good" '
test_sort_cmp output expected
'
}
test_launch_ipfs_daemon
test_expect_success "can copy a file in" '
HASH=$(echo "foo" | ipfs add -q) &&
ipfs files cp /ipfs/$HASH /file
'
test_kill_ipfs_daemon
test_launch_ipfs_daemon
test_expect_success "file is still there" '
verify_path_exists /file
'
test_kill_ipfs_daemon
test_done

View File

@ -169,12 +169,6 @@ func (dm *DagModifier) Sync() error {
// Number of bytes we're going to write
buflen := dm.wrBuf.Len()
// Grab key for unpinning after mod operation
_, err := dm.curNode.Key()
if err != nil {
return err
}
// overwrite existing dag nodes
thisk, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
if err != nil {