From 48a33ffb673fb3eaf6d20cd7ca6bed04426a2abb Mon Sep 17 00:00:00 2001
From: rht
Date: Tue, 20 Oct 2015 10:56:41 +0700
Subject: [PATCH] Add fixed period repo GC + test

License: MIT
Signed-off-by: rht
---
 cmd/ipfs/daemon.go            |  32 +++++++-
 commands/request.go           |   2 +-
 core/commands/add.go          |   8 ++
 core/commands/cat.go          |   5 ++
 core/corerepo/gc.go           | 142 +++++++++++++++++++++++++++++++++-
 repo/config/datastore.go      |   7 +-
 repo/config/init.go           |   7 +-
 repo/fsrepo/fsrepo.go         |  18 ++++-
 repo/mock.go                  |   2 +
 repo/repo.go                  |   1 +
 test/sharness/lib/test-lib.sh |   4 +-
 test/sharness/t0080-repo.sh   |   1 +
 12 files changed, 214 insertions(+), 15 deletions(-)

diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go
index b916391a4..ce0d45ecf 100644
--- a/cmd/ipfs/daemon.go
+++ b/cmd/ipfs/daemon.go
@@ -19,6 +19,7 @@ import (
 	"github.com/ipfs/go-ipfs/core"
 	commands "github.com/ipfs/go-ipfs/core/commands"
 	corehttp "github.com/ipfs/go-ipfs/core/corehttp"
+	corerepo "github.com/ipfs/go-ipfs/core/corerepo"
 	"github.com/ipfs/go-ipfs/core/corerouting"
 	conn "github.com/ipfs/go-ipfs/p2p/net/conn"
 	peer "github.com/ipfs/go-ipfs/p2p/peer"
@@ -36,6 +37,7 @@ const (
 	ipnsMountKwd             = "mount-ipns"
 	unrestrictedApiAccessKwd = "unrestricted-api"
 	unencryptTransportKwd    = "disable-transport-encryption"
+	enableGCKwd              = "enable-gc"
 	// apiAddrKwd    = "address-api"
 	// swarmAddrKwd  = "address-swarm"
 )
@@ -114,6 +116,7 @@ future version, along with this notice. Please move to setting the HTTP Headers.
 		cmds.StringOption(ipnsMountKwd, "Path to the mountpoint for IPNS (if using --mount)"),
 		cmds.BoolOption(unrestrictedApiAccessKwd, "Allow API access to unlisted hashes"),
 		cmds.BoolOption(unencryptTransportKwd, "Disable transport encryption (for debugging protocols)"),
+		cmds.BoolOption(enableGCKwd, "Enable automatic periodic repo garbage collection"),

 		// TODO: add way to override addresses. tricky part: updating the config if also --init.
 		// cmds.StringOption(apiAddrKwd, "Address for the daemon rpc API (overrides config)"),
@@ -277,15 +280,23 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
 		}
 	}

+	// repo blockstore GC - if --enable-gc flag is present
+	err, gcErrc := maybeRunGC(req, node)
+	if err != nil {
+		res.SetError(err, cmds.ErrNormal)
+		return
+	}
+
 	fmt.Printf("Daemon is ready\n")

 	// collect long-running errors and block for shutdown
 	// TODO(cryptix): our fuse currently doesnt follow this pattern for graceful shutdown
-	for err := range merge(apiErrc, gwErrc) {
+	for err := range merge(apiErrc, gwErrc, gcErrc) {
 		if err != nil {
+			log.Error(err)
 			res.SetError(err, cmds.ErrNormal)
-			return
 		}
 	}
+	return
 }

 // serveHTTPApi collects options, creates listener, prints status message and starts serving requests
@@ -478,6 +489,23 @@ func mountFuse(req cmds.Request) error {
 	return nil
 }

+func maybeRunGC(req cmds.Request, node *core.IpfsNode) (error, <-chan error) {
+	enableGC, _, err := req.Option(enableGCKwd).Bool()
+	if err != nil {
+		return err, nil
+	}
+	if !enableGC {
+		return nil, nil
+	}
+
+	errc := make(chan error)
+	go func() {
+		errc <- corerepo.PeriodicGC(req.Context(), node)
+		close(errc)
+	}()
+	return nil, errc
+}
+
 // merge does fan-in of multiple read-only error channels
 // taken from http://blog.golang.org/pipelines
 func merge(cs ...<-chan error) <-chan error {
diff --git a/commands/request.go b/commands/request.go
index 11e556754..b597c21c9 100644
--- a/commands/request.go
+++ b/commands/request.go
@@ -43,7 +43,7 @@ func (c *Context) GetConfig() (*config.Config, error) {
 }

 // GetNode returns the node of the current Command exection
-// context. It may construct it with the providied function.
+// context. It may construct it with the provided function.
 func (c *Context) GetNode() (*core.IpfsNode, error) {
 	var err error
 	if c.node == nil {
diff --git a/core/commands/add.go b/core/commands/add.go
index cdaa1ef30..a594f90f0 100644
--- a/core/commands/add.go
+++ b/core/commands/add.go
@@ -89,6 +89,7 @@ remains to be implemented.
 			// see comment above
 			return nil
 		}
+
 		log.Debugf("Total size of file being added: %v\n", size)
 		req.Values()["size"] = size

@@ -100,6 +101,13 @@ remains to be implemented.
 			res.SetError(err, cmds.ErrNormal)
 			return
 		}
+		// check if adding the file would push the repo over its storage limit
+		// TODO: this doesn't handle the case where the hashed file is already in the blockstore (deduplicated)
+		// TODO: conditional GC is disabled because there is currently no way to pass the size to the daemon
+		//if err := corerepo.ConditionalGC(req.Context(), n, uint64(size)); err != nil {
+		//	res.SetError(err, cmds.ErrNormal)
+		//	return
+		//}

 		progress, _, _ := req.Option(progressOptionName).Bool()
 		trickle, _, _ := req.Option(trickleOptionName).Bool()
diff --git a/core/commands/cat.go b/core/commands/cat.go
index 2860fcc6d..532404bfc 100644
--- a/core/commands/cat.go
+++ b/core/commands/cat.go
@@ -5,6 +5,7 @@ import (

 	cmds "github.com/ipfs/go-ipfs/commands"
 	core "github.com/ipfs/go-ipfs/core"
+	"github.com/ipfs/go-ipfs/core/corerepo"
 	coreunix "github.com/ipfs/go-ipfs/core/coreunix"

 	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
@@ -44,6 +45,10 @@ it contains.
 			return
 		}

+		if err := corerepo.ConditionalGC(req.Context(), node, length); err != nil {
+			res.SetError(err, cmds.ErrNormal)
+			return
+		}
 		res.SetLength(length)

 		reader := io.MultiReader(readers...)
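
The daemon change above folds the new `gcErrc` into the existing error fan-in loop (`merge`, which the file credits to blog.golang.org/pipelines). Since the body of `merge` falls outside the hunks shown here, the following is a minimal, self-contained sketch of that fan-in pattern — illustrative only, not part of the patch:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// merge fans several read-only error channels into one; the merged channel
// closes once every input channel has closed (blog.golang.org/pipelines).
func merge(cs ...<-chan error) <-chan error {
	var wg sync.WaitGroup
	out := make(chan error)

	output := func(c <-chan error) {
		defer wg.Done()
		for err := range c {
			out <- err
		}
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	apiErrc := make(chan error, 1)
	gcErrc := make(chan error, 1)
	apiErrc <- errors.New("api shut down")
	close(apiErrc)
	gcErrc <- errors.New("gc stopped")
	close(gcErrc)

	// Like daemonFunc, keep draining until all long-running goroutines exit.
	for err := range merge(apiErrc, gcErrc) {
		fmt.Println(err)
	}
}
```

This also shows why the patch can drop the early `return` inside the daemon loop: the loop now simply drains until every long-running error channel, including the GC one, has closed.
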
diff --git a/core/corerepo/gc.go b/core/corerepo/gc.go
index 8269964e0..5175a0410 100644
--- a/core/corerepo/gc.go
+++ b/core/corerepo/gc.go
@@ -1,21 +1,77 @@
 package corerepo

 import (
+	"errors"
+	"time"
+
+	humanize "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/dustin/go-humanize"
 	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
 	key "github.com/ipfs/go-ipfs/blocks/key"
 	"github.com/ipfs/go-ipfs/core"
-
+	repo "github.com/ipfs/go-ipfs/repo"
 	logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
 )

 var log = logging.Logger("corerepo")

+var ErrMaxStorageExceeded = errors.New("Maximum storage limit exceeded. Maybe unpin some files?")
+
 type KeyRemoved struct {
 	Key key.Key
 }

+type GC struct {
+	Node       *core.IpfsNode
+	Repo       repo.Repo
+	StorageMax uint64
+	StorageGC  uint64
+	SlackGB    uint64
+	Storage    uint64
+}
+
+func NewGC(n *core.IpfsNode) (*GC, error) {
+	r := n.Repo
+	cfg, err := r.Config()
+	if err != nil {
+		return nil, err
+	}
+
+	// check if cfg has these fields initialized
+	// TODO: there should be a general check for all of the cfg fields
+	// maybe distinguish between user config file and default struct?
+	if cfg.Datastore.StorageMax == "" {
+		r.SetConfigKey("Datastore.StorageMax", "10GB")
+		cfg.Datastore.StorageMax = "10GB"
+	}
+	if cfg.Datastore.StorageGCWatermark == 0 {
+		r.SetConfigKey("Datastore.StorageGCWatermark", 90)
+		cfg.Datastore.StorageGCWatermark = 90
+	}
+
+	storageMax, err := humanize.ParseBytes(cfg.Datastore.StorageMax)
+	if err != nil {
+		return nil, err
+	}
+	storageGC := storageMax * uint64(cfg.Datastore.StorageGCWatermark) / 100
+
+	// calculate the slack space between StorageMax and StorageGCWatermark
+	// used to limit GC duration
+	slackGB := (storageMax - storageGC) / 10e9
+	if slackGB < 1 {
+		slackGB = 1
+	}
+
+	return &GC{
+		Node:       n,
+		Repo:       r,
+		StorageMax: storageMax,
+		StorageGC:  storageGC,
+		SlackGB:    slackGB,
+	}, nil
+}
+
 func GarbageCollect(n *core.IpfsNode, ctx context.Context) error {
-	ctx, cancel := context.WithCancel(context.Background())
+	ctx, cancel := context.WithCancel(ctx)
 	defer cancel() // in case error occurs during operation
 	keychan, err := n.Blockstore.AllKeysChan(ctx)
 	if err != nil {
@@ -23,8 +79,7 @@ func GarbageCollect(n *core.IpfsNode, ctx context.Context) error {
 	}
 	for k := range keychan { // rely on AllKeysChan to close chan
 		if !n.Pinning.IsPinned(k) {
-			err := n.Blockstore.DeleteBlock(k)
-			if err != nil {
+			if err := n.Blockstore.DeleteBlock(k); err != nil {
 				return err
 			}
 		}
@@ -66,3 +121,82 @@ func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) (<-chan *KeyRemoved, error) {
 	}()
 	return output, nil
 }
+
+func PeriodicGC(ctx context.Context, node *core.IpfsNode) error {
+	cfg, err := node.Repo.Config()
+	if err != nil {
+		return err
+	}
+
+	if cfg.Datastore.GCPeriod == "" {
+		node.Repo.SetConfigKey("Datastore.GCPeriod", "1h")
+		cfg.Datastore.GCPeriod = "1h"
+	}
+
+	period, err := time.ParseDuration(cfg.Datastore.GCPeriod)
+	if err != nil {
+		return err
+	}
+	if int64(period) == 0 {
+		// if duration is 0, it means GC is disabled.
+		return nil
+	}
+
+	gc, err := NewGC(node)
+	if err != nil {
+		return err
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case <-time.After(period):
+			// maybeGC deliberately does not recompute storageMax, storageGC, and slackGB,
+			// so they are not re-derived on every cycle
+			if err := gc.maybeGC(ctx, 0); err != nil {
+				return err
+			}
+		}
+	}
+}
+
+func ConditionalGC(ctx context.Context, node *core.IpfsNode, offset uint64) error {
+	gc, err := NewGC(node)
+	if err != nil {
+		return err
+	}
+	return gc.maybeGC(ctx, offset)
+}
+
+func (gc *GC) maybeGC(ctx context.Context, offset uint64) error {
+	storage, err := gc.Repo.GetStorageUsage()
+	if err != nil {
+		return err
+	}
+
+	if storage+offset > gc.StorageMax {
+		err := ErrMaxStorageExceeded
+		log.Error(err)
+		return err
+	}
+
+	if storage+offset > gc.StorageGC {
+		// Do GC here
+		log.Info("Starting repo GC...")
+		defer log.EventBegin(ctx, "repoGC").Done()
+		// 1 minute is sufficient to unlink() ~1GB worth of 100kB blocks on an SSD
+		_ctx, cancel := context.WithTimeout(ctx, time.Duration(gc.SlackGB)*time.Minute)
+		defer cancel()
+
+		if err := GarbageCollect(gc.Node, _ctx); err != nil {
+			return err
+		}
+		newStorage, err := gc.Repo.GetStorageUsage()
+		if err != nil {
+			return err
+		}
+		log.Infof("Repo GC done. Released %s\n", humanize.Bytes(uint64(storage-newStorage)))
+		return nil
+	}
+	return nil
+}
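
To make the thresholds computed in `NewGC` concrete, here is a small, self-contained sketch of the same arithmetic with the default config values; it imports `github.com/dustin/go-humanize` directly rather than the vendored Godeps path, and is illustrative only, not part of the patch:

```go
package main

import (
	"fmt"

	humanize "github.com/dustin/go-humanize"
)

func main() {
	// Defaults introduced by this patch: StorageMax = "10GB", StorageGCWatermark = 90.
	storageMax, err := humanize.ParseBytes("10GB") // 10,000,000,000 bytes
	if err != nil {
		panic(err)
	}
	watermark := uint64(90)

	// maybeGC starts a GC run once usage (plus any pending write) crosses this mark.
	storageGC := storageMax * watermark / 100

	// Same slack computation as NewGC; with the defaults it clamps to 1,
	// which maybeGC turns into a 1-minute timeout for the GC run.
	slackGB := (storageMax - storageGC) / 10e9
	if slackGB < 1 {
		slackGB = 1
	}

	fmt.Printf("max=%s gc-threshold=%s gc-timeout=%dm\n",
		humanize.Bytes(storageMax), humanize.Bytes(storageGC), slackGB)
}
```

Usage above StorageMax is a hard error (`ErrMaxStorageExceeded`), usage between the watermark and the limit triggers a time-bounded GC run, and anything below the watermark is left alone.
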
diff --git a/repo/config/datastore.go b/repo/config/datastore.go
index b615c650f..6749a4c39 100644
--- a/repo/config/datastore.go
+++ b/repo/config/datastore.go
@@ -5,8 +5,11 @@ const DefaultDataStoreDirectory = "datastore"

 // Datastore tracks the configuration of the datastore.
 type Datastore struct {
-	Type string
-	Path string
+	Type               string
+	Path               string
+	StorageMax         string // in B, kB, kiB, MB, ...
+	StorageGCWatermark int64  // percentage of StorageMax at which GC is triggered
+	GCPeriod           string // in ns, us, ms, s, m, h
 }

 // DataStorePath returns the default data store path given a configuration root
diff --git a/repo/config/init.go b/repo/config/init.go
index 970597a65..4d50ac661 100644
--- a/repo/config/init.go
+++ b/repo/config/init.go
@@ -87,8 +87,11 @@ func datastoreConfig() (*Datastore, error) {
 		return nil, err
 	}
 	return &Datastore{
-		Path: dspath,
-		Type: "leveldb",
+		Path:               dspath,
+		Type:               "leveldb",
+		StorageMax:         "10GB",
+		StorageGCWatermark: 90, // 90%
+		GCPeriod:           "1h",
 	}, nil
 }
diff --git a/repo/fsrepo/fsrepo.go b/repo/fsrepo/fsrepo.go
index 984bda2c7..c62e515ba 100644
--- a/repo/fsrepo/fsrepo.go
+++ b/repo/fsrepo/fsrepo.go
@@ -23,7 +23,6 @@ import (
 	mfsr "github.com/ipfs/go-ipfs/repo/fsrepo/migrations"
 	serialize "github.com/ipfs/go-ipfs/repo/fsrepo/serialize"
 	dir "github.com/ipfs/go-ipfs/thirdparty/dir"
-	u "github.com/ipfs/go-ipfs/util"
 	util "github.com/ipfs/go-ipfs/util"
 	ds2 "github.com/ipfs/go-ipfs/util/datastore2"
 	logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
@@ -166,7 +165,7 @@ func open(repoPath string) (repo.Repo, error) {
 }

 func newFSRepo(rpath string) (*FSRepo, error) {
-	expPath, err := u.TildeExpansion(filepath.Clean(rpath))
+	expPath, err := util.TildeExpansion(filepath.Clean(rpath))
 	if err != nil {
 		return nil, err
 	}
@@ -587,6 +586,21 @@ func (r *FSRepo) Datastore() ds.ThreadSafeDatastore {
 	return d
 }

+// GetStorageUsage computes the storage space taken by the repo in bytes
+func (r *FSRepo) GetStorageUsage() (uint64, error) {
+	pth, err := config.PathRoot()
+	if err != nil {
+		return 0, err
+	}
+
+	var du uint64
+	err = filepath.Walk(pth, func(p string, f os.FileInfo, err error) error {
+		du += uint64(f.Size())
+		return nil
+	})
+	return du, err
+}
+
 var _ io.Closer = &FSRepo{}
 var _ repo.Repo = &FSRepo{}
diff --git a/repo/mock.go b/repo/mock.go
index 116ccefdc..e79a1faef 100644
--- a/repo/mock.go
+++ b/repo/mock.go
@@ -34,6 +34,8 @@ func (m *Mock) GetConfigKey(key string) (interface{}, error) {

 func (m *Mock) Datastore() ds.ThreadSafeDatastore { return m.D }

+func (m *Mock) GetStorageUsage() (uint64, error) { return 0, nil }
+
 func (m *Mock) Close() error { return errTODO }

 func (m *Mock) SetAPIAddr(addr string) error { return errTODO }
diff --git a/repo/repo.go b/repo/repo.go
index 4efcb2eb6..ed3b03112 100644
--- a/repo/repo.go
+++ b/repo/repo.go
@@ -21,6 +21,7 @@ type Repo interface {
 	GetConfigKey(key string) (interface{}, error)

 	Datastore() datastore.ThreadSafeDatastore
+	GetStorageUsage() (uint64, error)

 	// SetAPIAddr sets the API address in the repo.
 	SetAPIAddr(addr string) error
diff --git a/test/sharness/lib/test-lib.sh b/test/sharness/lib/test-lib.sh
index b9a25ae87..21f566ee2 100644
--- a/test/sharness/lib/test-lib.sh
+++ b/test/sharness/lib/test-lib.sh
@@ -324,8 +324,8 @@ disk_usage() {
 		FreeBSD)
 			DU="du -s -A -B 1"
 			;;
-		Darwin | DragonFly)
-			DU="du"
+		Darwin | DragonFly | *)
+			DU="du -s"
 			;;
 	esac
 	$DU "$1" | awk "{print \$1}"
diff --git a/test/sharness/t0080-repo.sh b/test/sharness/t0080-repo.sh
index ea24ec02c..fe0cf55b5 100755
--- a/test/sharness/t0080-repo.sh
+++ b/test/sharness/t0080-repo.sh
@@ -55,6 +55,7 @@ test_expect_success "'ipfs pin rm' output looks good" '
 '

 test_expect_failure "ipfs repo gc fully reverse ipfs add" '
+	ipfs repo gc &&
 	random 100000 41 >gcfile &&
 	disk_usage "$IPFS_PATH/blocks" >expected &&
 	hash=`ipfs add -q gcfile` &&
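
For reference, the new knobs all live under the Datastore section of the config. The sketch below mirrors the defaults written by `datastoreConfig()` in repo/config/init.go; the struct is redeclared locally only to keep the example self-contained, and the `Path` value is illustrative (the real one comes from `DataStorePath`):

```go
package main

import "fmt"

// Local mirror of repo/config.Datastore as extended by this patch.
type Datastore struct {
	Type               string
	Path               string
	StorageMax         string // humanize-style size: B, kB, MB, GB, ...
	StorageGCWatermark int64  // percent of StorageMax at which GC kicks in
	GCPeriod           string // time.ParseDuration format; a zero duration disables periodic GC
}

func main() {
	d := Datastore{
		Type:               "leveldb",
		Path:               "datastore",
		StorageMax:         "10GB",
		StorageGCWatermark: 90,
		GCPeriod:           "1h",
	}
	fmt.Printf("%+v\n", d)
}
```

Periodic GC itself stays opt-in: it only runs when the daemon is started with the new --enable-gc flag, while `ipfs cat` additionally calls `ConditionalGC` before streaming and the equivalent call in `ipfs add` is left commented out for now.
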