mirror of https://github.com/ipfs/kubo.git synced 2025-06-27 07:57:30 +08:00

Merge pull request #1495 from rht/gc

Add fixed-period and conditional repo GC
Juan Benet committed 2015-11-10 07:27:14 +00:00
12 changed files with 214 additions and 15 deletions

View File

@@ -19,6 +19,7 @@ import (
"github.com/ipfs/go-ipfs/core"
commands "github.com/ipfs/go-ipfs/core/commands"
corehttp "github.com/ipfs/go-ipfs/core/corehttp"
corerepo "github.com/ipfs/go-ipfs/core/corerepo"
"github.com/ipfs/go-ipfs/core/corerouting"
conn "github.com/ipfs/go-ipfs/p2p/net/conn"
peer "github.com/ipfs/go-ipfs/p2p/peer"
@@ -36,6 +37,7 @@ const (
ipnsMountKwd = "mount-ipns"
unrestrictedApiAccessKwd = "unrestricted-api"
unencryptTransportKwd = "disable-transport-encryption"
enableGCKwd = "enable-gc"
// apiAddrKwd = "address-api"
// swarmAddrKwd = "address-swarm"
)
@@ -114,6 +116,7 @@ future version, along with this notice. Please move to setting the HTTP Headers.
cmds.StringOption(ipnsMountKwd, "Path to the mountpoint for IPNS (if using --mount)"),
cmds.BoolOption(unrestrictedApiAccessKwd, "Allow API access to unlisted hashes"),
cmds.BoolOption(unencryptTransportKwd, "Disable transport encryption (for debugging protocols)"),
cmds.BoolOption(enableGCKwd, "Enable automatic periodic repo garbage collection"),
// TODO: add way to override addresses. tricky part: updating the config if also --init.
// cmds.StringOption(apiAddrKwd, "Address for the daemon rpc API (overrides config)"),
@@ -277,15 +280,23 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
}
}
fmt.Printf("Daemon is ready\n")
// collect long-running errors and block for shutdown
// TODO(cryptix): our fuse currently doesnt follow this pattern for graceful shutdown
for err := range merge(apiErrc, gwErrc) {
+// repo blockstore GC - if --enable-gc flag is present
+err, gcErrc := maybeRunGC(req, node)
+if err != nil {
+res.SetError(err, cmds.ErrNormal)
+return
+}
+fmt.Printf("Daemon is ready\n")
+// collect long-running errors and block for shutdown
+// TODO(cryptix): our fuse currently doesnt follow this pattern for graceful shutdown
+for err := range merge(apiErrc, gwErrc, gcErrc) {
if err != nil {
log.Error(err)
res.SetError(err, cmds.ErrNormal)
}
}
return
}
// serveHTTPApi collects options, creates listener, prints status message and starts serving requests
@@ -478,6 +489,23 @@ func mountFuse(req cmds.Request) error {
return nil
}
func maybeRunGC(req cmds.Request, node *core.IpfsNode) (error, <-chan error) {
enableGC, _, err := req.Option(enableGCKwd).Bool()
if err != nil {
return err, nil
}
if !enableGC {
return nil, nil
}
errc := make(chan error)
go func() {
errc <- corerepo.PeriodicGC(req.Context(), node)
close(errc)
}()
return nil, errc
}
// merge does fan-in of multiple read-only error channels
// taken from http://blog.golang.org/pipelines
func merge(cs ...<-chan error) <-chan error {
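The diff cuts merge off at its signature. For reference, a minimal fan-in in the style of the pipelines post cited above might look like the sketch below. The nil-channel guard is my assumption, motivated by maybeRunGC returning a nil gcErrc when --enable-gc is absent: receiving from a nil channel blocks forever, so a fan-in used this way has to skip nil inputs.

    package main

    import (
        "errors"
        "fmt"
        "sync"
    )

    // merge fans several error channels into one (sketch only).
    func merge(cs ...<-chan error) <-chan error {
        var wg sync.WaitGroup
        out := make(chan error)
        for _, c := range cs {
            if c == nil {
                continue // disabled source, e.g. gcErrc without --enable-gc
            }
            wg.Add(1)
            go func(c <-chan error) {
                defer wg.Done()
                for err := range c { // forward until this input closes
                    out <- err
                }
            }(c)
        }
        go func() {
            wg.Wait() // every live input has been drained
            close(out)
        }()
        return out
    }

    func main() {
        a := make(chan error, 1)
        a <- errors.New("api: listener closed")
        close(a)
        for err := range merge(a, nil) { // nil stands in for a disabled gcErrc
            fmt.Println(err)
        }
    }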

View File

@@ -43,7 +43,7 @@ func (c *Context) GetConfig() (*config.Config, error) {
}
// GetNode returns the node of the current Command exection
-// context. It may construct it with the providied function.
+// context. It may construct it with the provided function.
func (c *Context) GetNode() (*core.IpfsNode, error) {
var err error
if c.node == nil {

View File

@@ -89,6 +89,7 @@ remains to be implemented.
// see comment above
return nil
}
log.Debugf("Total size of file being added: %v\n", size)
req.Values()["size"] = size
@@ -100,6 +101,13 @@ remains to be implemented.
res.SetError(err, cmds.ErrNormal)
return
}
// check if repo will exceed storage limit if added
// TODO: this doesn't handle the case if the hashed file is already in blocks (deduplicated)
// TODO: conditional GC is disabled because it is currently not possible to pass the size to the daemon
//if err := corerepo.ConditionalGC(req.Context(), n, uint64(size)); err != nil {
// res.SetError(err, cmds.ErrNormal)
// return
//}
progress, _, _ := req.Option(progressOptionName).Bool()
trickle, _, _ := req.Option(trickleOptionName).Bool()

View File

@@ -5,6 +5,7 @@ import (
cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core/corerepo"
coreunix "github.com/ipfs/go-ipfs/core/coreunix"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
@@ -44,6 +45,10 @@ it contains.
return
}
if err := corerepo.ConditionalGC(req.Context(), node, length); err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
res.SetLength(length)
reader := io.MultiReader(readers...)

View File

@@ -1,21 +1,77 @@
package corerepo
import (
"errors"
"time"
humanize "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/dustin/go-humanize"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
key "github.com/ipfs/go-ipfs/blocks/key"
"github.com/ipfs/go-ipfs/core"
repo "github.com/ipfs/go-ipfs/repo"
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
)
var log = logging.Logger("corerepo")
var ErrMaxStorageExceeded = errors.New("Maximum storage limit exceeded. Maybe unpin some files?")
type KeyRemoved struct {
Key key.Key
}
type GC struct {
Node *core.IpfsNode
Repo repo.Repo
StorageMax uint64
StorageGC uint64
SlackGB uint64
Storage uint64
}
func NewGC(n *core.IpfsNode) (*GC, error) {
r := n.Repo
cfg, err := r.Config()
if err != nil {
return nil, err
}
// check if cfg has these fields initialized
// TODO: there should be a general check for all of the cfg fields
// maybe distinguish between user config file and default struct?
if cfg.Datastore.StorageMax == "" {
r.SetConfigKey("Datastore.StorageMax", "10GB")
cfg.Datastore.StorageMax = "10GB"
}
if cfg.Datastore.StorageGCWatermark == 0 {
r.SetConfigKey("Datastore.StorageGCWatermark", 90)
cfg.Datastore.StorageGCWatermark = 90
}
storageMax, err := humanize.ParseBytes(cfg.Datastore.StorageMax)
if err != nil {
return nil, err
}
storageGC := storageMax * uint64(cfg.Datastore.StorageGCWatermark) / 100
// calculate the slack space between StorageMax and StorageGCWatermark
// used to limit GC duration
slackGB := (storageMax - storageGC) / 10e9
if slackGB < 1 {
slackGB = 1
}
return &GC{
Node: n,
Repo: r,
StorageMax: storageMax,
StorageGC: storageGC,
SlackGB: slackGB,
}, nil
}
func GarbageCollect(n *core.IpfsNode, ctx context.Context) error {
-ctx, cancel := context.WithCancel(context.Background())
+ctx, cancel := context.WithCancel(ctx)
defer cancel() // in case error occurs during operation
keychan, err := n.Blockstore.AllKeysChan(ctx)
if err != nil {
@@ -23,8 +79,7 @@ func GarbageCollect(n *core.IpfsNode, ctx context.Context) error {
}
for k := range keychan { // rely on AllKeysChan to close chan
if !n.Pinning.IsPinned(k) {
-err := n.Blockstore.DeleteBlock(k)
-if err != nil {
+if err := n.Blockstore.DeleteBlock(k); err != nil {
return err
}
}
@@ -66,3 +121,82 @@ func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) (<-chan *KeyRemo
}()
return output, nil
}
func PeriodicGC(ctx context.Context, node *core.IpfsNode) error {
cfg, err := node.Repo.Config()
if err != nil {
return err
}
if cfg.Datastore.GCPeriod == "" {
node.Repo.SetConfigKey("Datastore.GCPeriod", "1h")
cfg.Datastore.GCPeriod = "1h"
}
period, err := time.ParseDuration(cfg.Datastore.GCPeriod)
if err != nil {
return err
}
if int64(period) == 0 {
// if duration is 0, it means GC is disabled.
return nil
}
gc, err := NewGC(node)
if err != nil {
return err
}
for {
select {
case <-ctx.Done():
return nil
case <-time.After(period):
// maybeGC is private and does not recompute storageMax, storageGC, or slackGB, so they are derived once in NewGC rather than on every cycle
if err := gc.maybeGC(ctx, 0); err != nil {
return err
}
}
}
}
func ConditionalGC(ctx context.Context, node *core.IpfsNode, offset uint64) error {
gc, err := NewGC(node)
if err != nil {
return err
}
return gc.maybeGC(ctx, offset)
}
func (gc *GC) maybeGC(ctx context.Context, offset uint64) error {
storage, err := gc.Repo.GetStorageUsage()
if err != nil {
return err
}
if storage+offset > gc.StorageMax {
err := ErrMaxStorageExceeded
log.Error(err)
return err
}
if storage+offset > gc.StorageGC {
// Do GC here
log.Info("Starting repo GC...")
defer log.EventBegin(ctx, "repoGC").Done()
// 1 minute is sufficient to unlink() ~1GB of blocks of 100kB each on an SSD
_ctx, cancel := context.WithTimeout(ctx, time.Duration(gc.SlackGB)*time.Minute)
defer cancel()
if err := GarbageCollect(gc.Node, _ctx); err != nil {
return err
}
newStorage, err := gc.Repo.GetStorageUsage()
if err != nil {
return err
}
log.Infof("Repo GC done. Released %s\n", humanize.Bytes(uint64(storage-newStorage)))
return nil
}
return nil
}
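To make the thresholds concrete, the arithmetic NewGC performs with the defaults it writes ("10GB", watermark 90) can be replayed directly; the snippet below only restates the expressions above and is not part of the commit.

    package main

    import "fmt"

    func main() {
        // Defaults written by NewGC: StorageMax = "10GB" (decimal GB),
        // StorageGCWatermark = 90.
        storageMax := uint64(10e9)                 // 10 GB
        storageGC := storageMax * 90 / 100         // 9 GB: maybeGC triggers above this
        slackGB := (storageMax - storageGC) / 10e9 // integer division: 1e9/10e9 == 0
        if slackGB < 1 {
            slackGB = 1 // NewGC's floor applies, so SlackGB == 1
        }
        // maybeGC bounds each run at SlackGB minutes, so a default periodic GC
        // gets a one-minute timeout; usage above StorageMax (plus offset)
        // yields ErrMaxStorageExceeded instead of a GC.
        fmt.Println(storageGC, slackGB) // 9000000000 1
    }

Note also that the time.After in PeriodicGC allocates a fresh timer every cycle; with an hour-long default period that is harmless, though a time.Ticker would serve equally well.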

View File

@@ -7,6 +7,9 @@ const DefaultDataStoreDirectory = "datastore"
type Datastore struct {
Type string
Path string
StorageMax string // in B, kB, kiB, MB, ...
StorageGCWatermark int64 // in percentage to multiply on StorageMax
GCPeriod string // in ns, us, ms, s, m, h
}
// DataStorePath returns the default data store path given a configuration root
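Since StorageMax is parsed with go-humanize (see gc.go above), the unit spellings in the field comment matter: decimal and binary suffixes denote different sizes. A quick check, importing go-humanize by its upstream path rather than the vendored Godeps one:

    package main

    import (
        "fmt"

        humanize "github.com/dustin/go-humanize"
    )

    func main() {
        // ParseBytes accepts decimal (kB, MB, GB) and binary (KiB, MiB, GiB)
        // suffixes; the "10GB" default is decimal.
        gb, _ := humanize.ParseBytes("10GB")
        gib, _ := humanize.ParseBytes("10GiB")
        fmt.Println(gb, gib) // 10000000000 10737418240
    }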

View File

@@ -89,6 +89,9 @@ func datastoreConfig() (*Datastore, error) {
return &Datastore{
Path: dspath,
Type: "leveldb",
StorageMax: "10GB",
StorageGCWatermark: 90, // 90%
GCPeriod: "1h",
}, nil
}

View File

@@ -23,7 +23,6 @@ import (
mfsr "github.com/ipfs/go-ipfs/repo/fsrepo/migrations"
serialize "github.com/ipfs/go-ipfs/repo/fsrepo/serialize"
dir "github.com/ipfs/go-ipfs/thirdparty/dir"
u "github.com/ipfs/go-ipfs/util"
util "github.com/ipfs/go-ipfs/util"
ds2 "github.com/ipfs/go-ipfs/util/datastore2"
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
@@ -166,7 +165,7 @@
}
func newFSRepo(rpath string) (*FSRepo, error) {
-expPath, err := u.TildeExpansion(filepath.Clean(rpath))
+expPath, err := util.TildeExpansion(filepath.Clean(rpath))
if err != nil {
return nil, err
}
@@ -587,6 +586,21 @@ func (r *FSRepo) Datastore() ds.ThreadSafeDatastore {
return d
}
// GetStorageUsage computes the storage space taken by the repo in bytes
func (r *FSRepo) GetStorageUsage() (uint64, error) {
pth, err := config.PathRoot()
if err != nil {
return 0, err
}
var du uint64
err = filepath.Walk(pth, func(p string, f os.FileInfo, err error) error {
du += uint64(f.Size())
return nil
})
return du, err
}
var _ io.Closer = &FSRepo{}
var _ repo.Repo = &FSRepo{}
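One caveat worth flagging about GetStorageUsage: the Walk callback ignores its error argument, and f can be nil when that error is non-nil (a permission failure on the root, for instance), which would panic at f.Size(). A defensive variant, as a sketch only with a hypothetical name:

    package main

    import (
        "fmt"
        "os"
        "path/filepath"
    )

    // storageUsage propagates walk errors instead of dereferencing a
    // possibly-nil FileInfo, and counts only non-directory entries.
    func storageUsage(root string) (uint64, error) {
        var du uint64
        err := filepath.Walk(root, func(p string, f os.FileInfo, err error) error {
            if err != nil {
                return err // f may be nil when err != nil
            }
            if !f.IsDir() {
                du += uint64(f.Size())
            }
            return nil
        })
        return du, err
    }

    func main() {
        du, err := storageUsage(".")
        fmt.Println(du, err)
    }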

View File

@@ -34,6 +34,8 @@ func (m *Mock) GetConfigKey(key string) (interface{}, error) {
func (m *Mock) Datastore() ds.ThreadSafeDatastore { return m.D }
func (m *Mock) GetStorageUsage() (uint64, error) { return 0, nil }
func (m *Mock) Close() error { return errTODO }
func (m *Mock) SetAPIAddr(addr string) error { return errTODO }

View File

@@ -21,6 +21,7 @@ type Repo interface {
GetConfigKey(key string) (interface{}, error)
Datastore() datastore.ThreadSafeDatastore
GetStorageUsage() (uint64, error)
// SetAPIAddr sets the API address in the repo.
SetAPIAddr(addr string) error

View File

@@ -324,8 +324,8 @@ disk_usage() {
FreeBSD)
DU="du -s -A -B 1"
;;
-Darwin | DragonFly)
-DU="du"
+Darwin | DragonFly | *)
+DU="du -s"
;;
esac
$DU "$1" | awk "{print \$1}"

View File

@@ -55,6 +55,7 @@ test_expect_success "'ipfs pin rm' output looks good" '
'
test_expect_failure "ipfs repo gc fully reverse ipfs add" '
ipfs repo gc &&
random 100000 41 >gcfile &&
disk_usage "$IPFS_PATH/blocks" >expected &&
hash=`ipfs add -q gcfile` &&