diff --git a/blocks/blockstore/blockstore.go b/blocks/blockstore/blockstore.go index bc000df93..59f0f2c72 100644 --- a/blocks/blockstore/blockstore.go +++ b/blocks/blockstore/blockstore.go @@ -5,6 +5,7 @@ package blockstore import ( "errors" "sync" + "sync/atomic" ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" dsns "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace" @@ -49,6 +50,10 @@ type GCBlockstore interface { // at the same time, but no GC should not happen simulatenously. // Reading during Pinning is safe, and requires no lock. PinLock() func() + + // GcRequested returns true if GCLock has been called and is waiting to + // take the lock + GCRequested() bool } func NewBlockstore(d ds.Batching) *blockstore { @@ -63,7 +68,9 @@ func NewBlockstore(d ds.Batching) *blockstore { type blockstore struct { datastore ds.Batching - lk sync.RWMutex + lk sync.RWMutex + gcreq int32 + gcreqlk sync.Mutex } func (bs *blockstore) Get(k key.Key) (*blocks.Block, error) { @@ -192,7 +199,9 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) { } func (bs *blockstore) GCLock() func() { + atomic.AddInt32(&bs.gcreq, 1) bs.lk.Lock() + atomic.AddInt32(&bs.gcreq, -1) return bs.lk.Unlock } @@ -200,3 +209,7 @@ func (bs *blockstore) PinLock() func() { bs.lk.RLock() return bs.lk.RUnlock } + +func (bs *blockstore) GCRequested() bool { + return atomic.LoadInt32(&bs.gcreq) > 0 +} diff --git a/blocks/blockstore/write_cache.go b/blocks/blockstore/write_cache.go index 52af696e4..73a7813f5 100644 --- a/blocks/blockstore/write_cache.go +++ b/blocks/blockstore/write_cache.go @@ -66,3 +66,7 @@ func (w *writecache) GCLock() func() { func (w *writecache) PinLock() func() { return w.blockstore.(GCBlockstore).PinLock() } + +func (w *writecache) GCRequested() bool { + return w.blockstore.(GCBlockstore).GCRequested() +} diff --git a/core/coreunix/add.go b/core/coreunix/add.go index 6ca989b26..80a4bb6ad 100644 --- a/core/coreunix/add.go +++ b/core/coreunix/add.go @@ -12,6 +12,7 @@ import ( syncds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" bstore "github.com/ipfs/go-ipfs/blocks/blockstore" + key "github.com/ipfs/go-ipfs/blocks/key" bserv "github.com/ipfs/go-ipfs/blockservice" "github.com/ipfs/go-ipfs/exchange/offline" importer "github.com/ipfs/go-ipfs/importer" @@ -99,6 +100,8 @@ type Adder struct { Chunker string root *dag.Node mr *mfs.Root + unlock func() + tempRoot key.Key } // Perform the actual add & pin locally, outputting results to reader @@ -157,6 +160,14 @@ func (params *Adder) PinRoot() error { return err } + if params.tempRoot != "" { + err := params.node.Pinning.Unpin(params.ctx, params.tempRoot, true) + if err != nil { + return err + } + params.tempRoot = rnk + } + params.node.Pinning.PinWithMode(rnk, pin.Recursive) return params.node.Pinning.Flush() } @@ -256,7 +267,7 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) { return "", err } - err = fileAdder.AddFile(f) + err = fileAdder.addFile(f) if err != nil { return "", err } @@ -289,7 +300,7 @@ func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *dag.No unlock := n.Blockstore.PinLock() defer unlock() - err = fileAdder.AddFile(file) + err = fileAdder.addFile(file) if err != nil { return "", nil, err } @@ -330,12 +341,24 @@ func (params *Adder) addNode(node *dag.Node, path string) error { // Add the given file while respecting the params. func (params *Adder) AddFile(file files.File) error { + params.unlock = params.node.Blockstore.PinLock() + defer params.unlock() + + return params.addFile(file) +} + +func (adder *Adder) addFile(file files.File) error { + err := adder.maybePauseForGC() + if err != nil { + return err + } + switch { - case files.IsHidden(file) && !params.Hidden: + case files.IsHidden(file) && !adder.Hidden: log.Debugf("%s is hidden, skipping", file.FileName()) return &hiddenFileError{file.FileName()} case file.IsDirectory(): - return params.addDir(file) + return adder.addDir(file) } // case for symlink @@ -346,29 +369,29 @@ func (params *Adder) AddFile(file files.File) error { } dagnode := &dag.Node{Data: sdata} - _, err = params.node.DAG.Add(dagnode) + _, err = adder.node.DAG.Add(dagnode) if err != nil { return err } - return params.addNode(dagnode, s.FileName()) + return adder.addNode(dagnode, s.FileName()) } // case for regular file // if the progress flag was specified, wrap the file so that we can send // progress updates to the client (over the output channel) var reader io.Reader = file - if params.Progress { - reader = &progressReader{file: file, out: params.out} + if adder.Progress { + reader = &progressReader{file: file, out: adder.out} } - dagnode, err := params.add(reader) + dagnode, err := adder.add(reader) if err != nil { return err } // patch it into the root - return params.addNode(dagnode, file.FileName()) + return adder.addNode(dagnode, file.FileName()) } func (params *Adder) addDir(dir files.File) error { @@ -388,7 +411,7 @@ func (params *Adder) addDir(dir files.File) error { break } - err = params.AddFile(file) + err = params.addFile(file) if _, ok := err.(*hiddenFileError); ok { // hidden file error, skip file continue @@ -400,6 +423,19 @@ func (params *Adder) addDir(dir files.File) error { return nil } +func (adder *Adder) maybePauseForGC() error { + if adder.node.Blockstore.GCRequested() { + err := adder.PinRoot() + if err != nil { + return err + } + + adder.unlock() + adder.unlock = adder.node.Blockstore.PinLock() + } + return nil +} + // outputDagnode sends dagnode info over the output channel func outputDagnode(out chan interface{}, name string, dn *dag.Node) error { if out == nil { diff --git a/pin/gc/gc.go b/pin/gc/gc.go index ec61f816a..df9ddedc6 100644 --- a/pin/gc/gc.go +++ b/pin/gc/gc.go @@ -24,7 +24,6 @@ var log = logging.Logger("gc") // deletes any block that is not found in the marked set. func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner) (<-chan key.Key, error) { unlock := bs.GCLock() - defer unlock() bsrv := bserv.New(bs, offline.Exchange(bs)) ds := dag.NewDAGService(bsrv) @@ -42,6 +41,7 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner) (<-chan key. output := make(chan key.Key) go func() { defer close(output) + defer unlock() for { select { case k, ok := <-keychan: