mirror of
https://github.com/ipfs/kubo.git
synced 2025-09-10 09:52:20 +08:00
implement mark and sweep GC
License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com> dont GC blocks used by pinner License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com> comment GC algo License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com> add lock to blockstore to prevent GC from eating wanted blocks License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com> improve FetchGraph License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com> separate interfaces for blockstore and GCBlockstore License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com> reintroduce indirect pinning, add enumerateChildren dag method License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
99
pin/gc/gc.go
Normal file
99
pin/gc/gc.go
Normal file
@ -0,0 +1,99 @@
|
||||
package gc
|
||||
|
||||
import (
|
||||
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
bserv "github.com/ipfs/go-ipfs/blockservice"
|
||||
offline "github.com/ipfs/go-ipfs/exchange/offline"
|
||||
dag "github.com/ipfs/go-ipfs/merkledag"
|
||||
pin "github.com/ipfs/go-ipfs/pin"
|
||||
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
|
||||
)
|
||||
|
||||
var log = logging.Logger("gc")
|
||||
|
||||
// GC performs a mark and sweep garbage collection of the blocks in the blockstore
|
||||
// first, it creates a 'marked' set and adds to it the following:
|
||||
// - all recursively pinned blocks, plus all of their descendants (recursively)
|
||||
// - all directly pinned blocks
|
||||
// - all blocks utilized internally by the pinner
|
||||
//
|
||||
// The routine then iterates over every block in the blockstore and
|
||||
// deletes any block that is not found in the marked set.
|
||||
func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner) (<-chan key.Key, error) {
|
||||
unlock := bs.GCLock()
|
||||
defer unlock()
|
||||
|
||||
bsrv := bserv.New(bs, offline.Exchange(bs))
|
||||
ds := dag.NewDAGService(bsrv)
|
||||
|
||||
// KeySet currently implemented in memory, in the future, may be bloom filter or
|
||||
// disk backed to conserve memory.
|
||||
gcs := key.NewKeySet()
|
||||
for _, k := range pn.RecursiveKeys() {
|
||||
gcs.Add(k)
|
||||
nd, err := ds.Get(ctx, k)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// EnumerateChildren recursively walks the dag and adds the keys to the given set
|
||||
err = dag.EnumerateChildren(ctx, ds, nd, gcs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
for _, k := range pn.DirectKeys() {
|
||||
gcs.Add(k)
|
||||
}
|
||||
for _, k := range pn.InternalPins() {
|
||||
gcs.Add(k)
|
||||
|
||||
nd, err := ds.Get(ctx, k)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// EnumerateChildren recursively walks the dag and adds the keys to the given set
|
||||
err = dag.EnumerateChildren(ctx, ds, nd, gcs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
keychan, err := bs.AllKeysChan(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
output := make(chan key.Key)
|
||||
go func() {
|
||||
defer close(output)
|
||||
for {
|
||||
select {
|
||||
case k, ok := <-keychan:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if !gcs.Has(k) {
|
||||
err := bs.DeleteBlock(k)
|
||||
if err != nil {
|
||||
log.Debugf("Error removing key from blockstore: %s", err)
|
||||
return
|
||||
}
|
||||
select {
|
||||
case output <- k:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return output, nil
|
||||
}
|
107
pin/pin.go
107
pin/pin.go
@ -24,7 +24,6 @@ var emptyKey = key.B58KeyDecode("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n"
|
||||
const (
|
||||
linkDirect = "direct"
|
||||
linkRecursive = "recursive"
|
||||
linkIndirect = "indirect"
|
||||
)
|
||||
|
||||
type PinMode int
|
||||
@ -32,7 +31,6 @@ type PinMode int
|
||||
const (
|
||||
Recursive PinMode = iota
|
||||
Direct
|
||||
Indirect
|
||||
NotPinned
|
||||
)
|
||||
|
||||
@ -52,8 +50,8 @@ type Pinner interface {
|
||||
|
||||
Flush() error
|
||||
DirectKeys() []key.Key
|
||||
IndirectKeys() map[key.Key]uint64
|
||||
RecursiveKeys() []key.Key
|
||||
InternalPins() []key.Key
|
||||
}
|
||||
|
||||
// pinner implements the Pinner interface
|
||||
@ -61,7 +59,7 @@ type pinner struct {
|
||||
lock sync.RWMutex
|
||||
recursePin set.BlockSet
|
||||
directPin set.BlockSet
|
||||
indirPin *indirectPin
|
||||
|
||||
// Track the keys used for storing the pinning state, so gc does
|
||||
// not delete them.
|
||||
internalPin map[key.Key]struct{}
|
||||
@ -80,7 +78,6 @@ func NewPinner(dstore ds.ThreadSafeDatastore, serv mdag.DAGService) Pinner {
|
||||
return &pinner{
|
||||
recursePin: rcset,
|
||||
directPin: dirset,
|
||||
indirPin: newIndirectPin(),
|
||||
dserv: serv,
|
||||
dstore: dstore,
|
||||
}
|
||||
@ -104,7 +101,8 @@ func (p *pinner) Pin(ctx context.Context, node *mdag.Node, recurse bool) error {
|
||||
p.directPin.RemoveBlock(k)
|
||||
}
|
||||
|
||||
err := p.pinLinks(ctx, node)
|
||||
// fetch entire graph
|
||||
err := mdag.FetchGraph(ctx, node, p.dserv)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -131,72 +129,18 @@ func (p *pinner) Unpin(ctx context.Context, k key.Key, recursive bool) error {
|
||||
if p.recursePin.HasKey(k) {
|
||||
if recursive {
|
||||
p.recursePin.RemoveBlock(k)
|
||||
node, err := p.dserv.Get(ctx, k)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return p.unpinLinks(ctx, node)
|
||||
return nil
|
||||
} else {
|
||||
return fmt.Errorf("%s is pinned recursively", k)
|
||||
}
|
||||
} else if p.directPin.HasKey(k) {
|
||||
p.directPin.RemoveBlock(k)
|
||||
return nil
|
||||
} else if p.indirPin.HasKey(k) {
|
||||
return fmt.Errorf("%s is pinned indirectly. indirect pins cannot be removed directly", k)
|
||||
} else {
|
||||
return fmt.Errorf("%s is not pinned", k)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *pinner) unpinLinks(ctx context.Context, node *mdag.Node) error {
|
||||
for _, l := range node.Links {
|
||||
node, err := l.GetNode(ctx, p.dserv)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
k, err := node.Key()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
p.indirPin.Decrement(k)
|
||||
|
||||
err = p.unpinLinks(ctx, node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *pinner) pinIndirectRecurse(ctx context.Context, node *mdag.Node) error {
|
||||
k, err := node.Key()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
p.indirPin.Increment(k)
|
||||
return p.pinLinks(ctx, node)
|
||||
}
|
||||
|
||||
func (p *pinner) pinLinks(ctx context.Context, node *mdag.Node) error {
|
||||
for _, ng := range p.dserv.GetDAG(ctx, node) {
|
||||
subnode, err := ng.Get(ctx)
|
||||
if err != nil {
|
||||
// TODO: Maybe just log and continue?
|
||||
return err
|
||||
}
|
||||
err = p.pinIndirectRecurse(ctx, subnode)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *pinner) isInternalPin(key key.Key) bool {
|
||||
_, ok := p.internalPin[key]
|
||||
return ok
|
||||
@ -208,7 +152,6 @@ func (p *pinner) IsPinned(key key.Key) bool {
|
||||
defer p.lock.RUnlock()
|
||||
return p.recursePin.HasKey(key) ||
|
||||
p.directPin.HasKey(key) ||
|
||||
p.indirPin.HasKey(key) ||
|
||||
p.isInternalPin(key)
|
||||
}
|
||||
|
||||
@ -218,8 +161,6 @@ func (p *pinner) RemovePinWithMode(key key.Key, mode PinMode) {
|
||||
switch mode {
|
||||
case Direct:
|
||||
p.directPin.RemoveBlock(key)
|
||||
case Indirect:
|
||||
p.indirPin.Decrement(key)
|
||||
case Recursive:
|
||||
p.recursePin.RemoveBlock(key)
|
||||
default:
|
||||
@ -274,14 +215,6 @@ func LoadPinner(d ds.ThreadSafeDatastore, dserv mdag.DAGService) (Pinner, error)
|
||||
p.directPin = set.SimpleSetFromKeys(directKeys)
|
||||
}
|
||||
|
||||
{ // load indirect set
|
||||
refcnt, err := loadMultiset(ctx, dserv, root, linkIndirect, recordInternal)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot load indirect pins: %v", err)
|
||||
}
|
||||
p.indirPin = &indirectPin{refCounts: refcnt}
|
||||
}
|
||||
|
||||
p.internalPin = internalPin
|
||||
|
||||
// assign services
|
||||
@ -296,11 +229,6 @@ func (p *pinner) DirectKeys() []key.Key {
|
||||
return p.directPin.GetKeys()
|
||||
}
|
||||
|
||||
// IndirectKeys returns a slice containing the indirectly pinned keys
|
||||
func (p *pinner) IndirectKeys() map[key.Key]uint64 {
|
||||
return p.indirPin.GetRefs()
|
||||
}
|
||||
|
||||
// RecursiveKeys returns a slice containing the recursively pinned keys
|
||||
func (p *pinner) RecursiveKeys() []key.Key {
|
||||
return p.recursePin.GetKeys()
|
||||
@ -339,20 +267,17 @@ func (p *pinner) Flush() error {
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
n, err := storeMultiset(ctx, p.dserv, p.indirPin.GetRefs(), recordInternal)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := root.AddNodeLink(linkIndirect, n); err != nil {
|
||||
return err
|
||||
}
|
||||
// add the empty node, its referenced by the pin sets but never created
|
||||
_, err := p.dserv.Add(new(mdag.Node))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
k, err := p.dserv.Add(root)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
internalPin[k] = struct{}{}
|
||||
if err := p.dstore.Put(pinDatastoreKey, []byte(k)); err != nil {
|
||||
return fmt.Errorf("cannot store pin state: %v", err)
|
||||
@ -361,6 +286,16 @@ func (p *pinner) Flush() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *pinner) InternalPins() []key.Key {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
var out []key.Key
|
||||
for k, _ := range p.internalPin {
|
||||
out = append(out, k)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// PinWithMode allows the user to have fine grained control over pin
|
||||
// counts
|
||||
func (p *pinner) PinWithMode(k key.Key, mode PinMode) {
|
||||
@ -371,7 +306,5 @@ func (p *pinner) PinWithMode(k key.Key, mode PinMode) {
|
||||
p.recursePin.AddBlock(k)
|
||||
case Direct:
|
||||
p.directPin.AddBlock(k)
|
||||
case Indirect:
|
||||
p.indirPin.Increment(k)
|
||||
}
|
||||
}
|
||||
|
@ -53,7 +53,7 @@ func TestPinnerBasic(t *testing.T) {
|
||||
}
|
||||
|
||||
// create new node c, to be indirectly pinned through b
|
||||
c, ck := randNode()
|
||||
c, _ := randNode()
|
||||
_, err = dserv.Add(c)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -82,10 +82,6 @@ func TestPinnerBasic(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !p.IsPinned(ck) {
|
||||
t.Fatal("Child of recursively pinned node not found")
|
||||
}
|
||||
|
||||
bk, _ := b.Key()
|
||||
if !p.IsPinned(bk) {
|
||||
t.Fatal("Recursively pinned node not found..")
|
||||
@ -95,7 +91,7 @@ func TestPinnerBasic(t *testing.T) {
|
||||
d.AddNodeLink("a", a)
|
||||
d.AddNodeLink("c", c)
|
||||
|
||||
e, ek := randNode()
|
||||
e, _ := randNode()
|
||||
d.AddNodeLink("e", e)
|
||||
|
||||
// Must be in dagserv for unpin to work
|
||||
@ -110,10 +106,6 @@ func TestPinnerBasic(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !p.IsPinned(ek) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
dk, _ := d.Key()
|
||||
if !p.IsPinned(dk) {
|
||||
t.Fatal("pinned node not found.")
|
||||
@ -125,11 +117,6 @@ func TestPinnerBasic(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// c should still be pinned under b
|
||||
if !p.IsPinned(ck) {
|
||||
t.Fatal("Recursive / indirect unpin fail.")
|
||||
}
|
||||
|
||||
err = p.Flush()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -145,11 +132,6 @@ func TestPinnerBasic(t *testing.T) {
|
||||
t.Fatal("Could not find pinned node!")
|
||||
}
|
||||
|
||||
// Test indirectly pinned
|
||||
if !np.IsPinned(ck) {
|
||||
t.Fatal("could not find indirectly pinned node")
|
||||
}
|
||||
|
||||
// Test recursively pinned
|
||||
if !np.IsPinned(bk) {
|
||||
t.Fatal("could not find recursively pinned node")
|
||||
@ -201,7 +183,7 @@ func TestFlush(t *testing.T) {
|
||||
p := NewPinner(dstore, dserv)
|
||||
_, k := randNode()
|
||||
|
||||
p.PinWithMode(k, Indirect)
|
||||
p.PinWithMode(k, Recursive)
|
||||
if err := p.Flush(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
Reference in New Issue
Block a user