1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-05-17 15:06:47 +08:00

pin: add context and error return to most of the Pinner functions

This commit is contained in:
Michael Muré
2019-09-15 14:14:42 +02:00
committed by Michael Muré
parent 8c358629fb
commit 097b684b42
14 changed files with 105 additions and 66 deletions

View File

@ -2,6 +2,7 @@
package blockstoreutil
import (
"context"
"fmt"
"io"
@ -33,7 +34,7 @@ type RmBlocksOpts struct {
// It returns a channel where objects of type RemovedBlock are placed, when
// not using the Quiet option. Block removal is asynchronous and will
// skip any pinned blocks.
func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, cids []cid.Cid, opts RmBlocksOpts) (<-chan interface{}, error) {
func RmBlocks(ctx context.Context, blocks bs.GCBlockstore, pins pin.Pinner, cids []cid.Cid, opts RmBlocksOpts) (<-chan interface{}, error) {
// make the channel large enough to hold any result to avoid
// blocking while holding the GCLock
out := make(chan interface{}, len(cids))
@ -43,7 +44,7 @@ func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, cids []cid.Cid, opts RmBl
unlocker := blocks.GCLock()
defer unlocker.Unlock()
stillOkay := FilterPinned(pins, out, cids)
stillOkay := FilterPinned(ctx, pins, out, cids)
for _, c := range stillOkay {
// Kept for backwards compatibility. We may want to
@ -74,9 +75,9 @@ func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, cids []cid.Cid, opts RmBl
// out channel, with an error which indicates that the Cid is pinned.
// This function is used in RmBlocks to filter out any blocks which are not
// to be removed (because they are pinned).
func FilterPinned(pins pin.Pinner, out chan<- interface{}, cids []cid.Cid) []cid.Cid {
func FilterPinned(ctx context.Context, pins pin.Pinner, out chan<- interface{}, cids []cid.Cid) []cid.Cid {
stillOkay := make([]cid.Cid, 0, len(cids))
res, err := pins.CheckIfPinned(cids...)
res, err := pins.CheckIfPinned(ctx, cids...)
if err != nil {
out <- &RemovedBlock{Error: fmt.Sprintf("pin check failed: %s", err)}
return nil

View File

@ -446,7 +446,7 @@ func pinLsKeys(req *cmds.Request, typeStr string, n *core.IpfsNode, api coreifac
return err
}
pinType, pinned, err := n.Pinning.IsPinnedWithType(c.Cid(), mode)
pinType, pinned, err := n.Pinning.IsPinnedWithType(req.Context, c.Cid(), mode)
if err != nil {
return err
}
@ -501,19 +501,31 @@ func pinLsAll(req *cmds.Request, typeStr string, n *core.IpfsNode, emit func(val
}
if typeStr == "direct" || typeStr == "all" {
err := AddToResultKeys(n.Pinning.DirectKeys(), "direct")
dkeys, err := n.Pinning.DirectKeys(req.Context)
if err != nil {
return err
}
err = AddToResultKeys(dkeys, "direct")
if err != nil {
return err
}
}
if typeStr == "recursive" || typeStr == "all" {
err := AddToResultKeys(n.Pinning.RecursiveKeys(), "recursive")
rkeys, err := n.Pinning.RecursiveKeys(req.Context)
if err != nil {
return err
}
err = AddToResultKeys(rkeys, "recursive")
if err != nil {
return err
}
}
if typeStr == "indirect" || typeStr == "all" {
for _, k := range n.Pinning.RecursiveKeys() {
rkeys, err := n.Pinning.RecursiveKeys(req.Context)
if err != nil {
return err
}
for _, k := range rkeys {
var visitErr error
err := dag.Walk(req.Context, dag.GetLinksWithDAG(n.DAG), k, func(c cid.Cid) bool {
r := keys.Visit(c)
@ -642,8 +654,10 @@ var verifyPinCmd = &cmds.Command{
explain: !quiet,
includeOk: verbose,
}
out := pinVerify(req.Context, n, opts, enc)
out, err := pinVerify(req.Context, n, opts, enc)
if err != nil {
return err
}
return res.Emit(out)
},
Type: PinVerifyRes{},
@ -685,13 +699,16 @@ type pinVerifyOpts struct {
includeOk bool
}
func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc cidenc.Encoder) <-chan interface{} {
func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc cidenc.Encoder) (<-chan interface{}, error) {
visited := make(map[cid.Cid]PinStatus)
bs := n.Blocks.Blockstore()
DAG := dag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
getLinks := dag.GetLinksWithDAG(DAG)
recPins := n.Pinning.RecursiveKeys()
recPins, err := n.Pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
}
var checkPin func(root cid.Cid) PinStatus
checkPin = func(root cid.Cid) PinStatus {
@ -747,7 +764,7 @@ func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc ci
}
}()
return out
return out, nil
}
// Format formats PinVerifyRes

View File

@ -56,7 +56,7 @@ func (api *BlockAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Bloc
if settings.Pin {
api.pinning.PinWithMode(b.Cid(), pin.Recursive)
if err := api.pinning.Flush(); err != nil {
if err := api.pinning.Flush(ctx); err != nil {
return nil, err
}
}
@ -91,7 +91,7 @@ func (api *BlockAPI) Rm(ctx context.Context, p path.Path, opts ...caopts.BlockRm
cids := []cid.Cid{rp.Cid()}
o := util.RmBlocksOpts{Force: settings.Force}
out, err := util.RmBlocks(api.blockstore, api.pinning, cids, o)
out, err := util.RmBlocks(ctx, api.blockstore, api.pinning, cids, o)
if err != nil {
return err
}

View File

@ -26,7 +26,7 @@ func (adder *pinningAdder) Add(ctx context.Context, nd ipld.Node) error {
adder.pinning.PinWithMode(nd.Cid(), pin.Recursive)
return adder.pinning.Flush()
return adder.pinning.Flush(ctx)
}
func (adder *pinningAdder) AddMany(ctx context.Context, nds []ipld.Node) error {
@ -45,7 +45,7 @@ func (adder *pinningAdder) AddMany(ctx context.Context, nds []ipld.Node) error {
}
}
return adder.pinning.Flush()
return adder.pinning.Flush(ctx)
}
func (api *dagAPI) Pinning() ipld.NodeAdder {

View File

@ -119,7 +119,7 @@ func (api *ObjectAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Obj
if options.Pin {
api.pinning.PinWithMode(dagnode.Cid(), pin.Recursive)
err = api.pinning.Flush()
err = api.pinning.Flush(ctx)
if err != nil {
return nil, err
}

View File

@ -37,7 +37,7 @@ func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOp
return err
}
return api.pinning.Flush()
return api.pinning.Flush(ctx)
}
func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) ([]coreiface.Pin, error) {
@ -75,7 +75,7 @@ func (api *PinAPI) Rm(ctx context.Context, p path.Path, opts ...caopts.PinRmOpti
return err
}
return api.pinning.Flush()
return api.pinning.Flush(ctx)
}
func (api *PinAPI) Update(ctx context.Context, from path.Path, to path.Path, opts ...caopts.PinUpdateOption) error {
@ -101,7 +101,7 @@ func (api *PinAPI) Update(ctx context.Context, from path.Path, to path.Path, opt
return err
}
return api.pinning.Flush()
return api.pinning.Flush(ctx)
}
type pinStatus struct {
@ -137,7 +137,10 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro
bs := api.blockstore
DAG := merkledag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
getLinks := merkledag.GetLinksWithDAG(DAG)
recPins := api.pinning.RecursiveKeys()
recPins, err := api.pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
}
var checkPin func(root cid.Cid) *pinStatus
checkPin = func(root cid.Cid) *pinStatus {
@ -204,11 +207,19 @@ func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) ([]coreiface.Pi
}
if typeStr == "direct" || typeStr == "all" {
AddToResultKeys(api.pinning.DirectKeys(), "direct")
dkeys, err := api.pinning.DirectKeys(ctx)
if err != nil {
return nil, err
}
AddToResultKeys(dkeys, "direct")
}
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range api.pinning.RecursiveKeys() {
rkeys, err := api.pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
}
for _, k := range rkeys {
err := merkledag.Walk(
ctx, merkledag.GetLinksWithDAG(api.dag), k,
set.Visit,
@ -221,7 +232,11 @@ func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) ([]coreiface.Pi
AddToResultKeys(set.Keys(), "indirect")
}
if typeStr == "recursive" || typeStr == "all" {
AddToResultKeys(api.pinning.RecursiveKeys(), "recursive")
rkeys, err := api.pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
}
AddToResultKeys(rkeys, "recursive")
}
out := make([]coreiface.Pin, 0, len(keys))

View File

@ -176,7 +176,7 @@ func (adder *Adder) PinRoot(root ipld.Node) error {
}
adder.pinning.PinWithMode(rnk, pin.Recursive)
return adder.pinning.Flush()
return adder.pinning.Flush(adder.ctx)
}
func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error {

View File

@ -23,7 +23,7 @@ func InitializeKeyspace(n *core.IpfsNode, key ci.PrivKey) error {
return err
}
err = n.Pinning.Flush()
err = n.Pinning.Flush(ctx)
if err != nil {
return err
}

2
go.mod
View File

@ -108,3 +108,5 @@ require (
)
go 1.13
replace github.com/ipfs/go-ipfs-provider => github.com/MichaelMure/go-ipfs-provider v0.2.2-0.20191017161655-f2597dc7065a

2
go.sum
View File

@ -8,6 +8,8 @@ github.com/AndreasBriese/bbloom v0.0.0-20190823232136-616930265c33/go.mod h1:bOv
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Kubuxu/go-os-helper v0.0.1 h1:EJiD2VUQyh5A9hWJLmc6iWg6yIcJ7jpBcwC8GMGXfDk=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/MichaelMure/go-ipfs-provider v0.2.2-0.20191017161655-f2597dc7065a h1:U/EUpAtQboxuN8yfU5ww0/5LWticjuR32KnFwRHGtqU=
github.com/MichaelMure/go-ipfs-provider v0.2.2-0.20191017161655-f2597dc7065a/go.mod h1:rcQBVqfblDQRk5LaCtf2uxuKxMJxvKmF5pLS0pO4au4=
github.com/Stebalien/go-bitfield v0.0.0-20180330043415-076a62f9ce6e/go.mod h1:3oM7gXIttpYDAJXpVNnSCiUMYBLIZ6cb1t+Ip982MRo=
github.com/Stebalien/go-bitfield v0.0.1 h1:X3kbSSPUaJK60wV2hjOPZwmpljr6VGCqdq4cBLhbQBo=
github.com/Stebalien/go-bitfield v0.0.1/go.mod h1:GNjFpasyUVkHMsfEOk8EFLJ9syQ6SI+XWrX9Wf2XH0s=

View File

@ -298,7 +298,7 @@ func InitializeKeyspace(ctx context.Context, pub Publisher, pins pin.Pinner, key
return err
}
err = pins.Flush()
err = pins.Flush(ctx)
if err != nil {
return err
}

View File

@ -201,7 +201,11 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo
}
return links, nil
}
err := Descendants(ctx, getLinks, gcs, pn.RecursiveKeys())
rkeys, err := pn.RecursiveKeys(ctx)
if err != nil {
return nil, err
}
err = Descendants(ctx, getLinks, gcs, rkeys)
if err != nil {
errors = true
select {
@ -233,11 +237,19 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo
}
}
for _, k := range pn.DirectKeys() {
dkeys, err := pn.DirectKeys(ctx)
if err != nil {
return nil, err
}
for _, k := range dkeys {
gcs.Add(k)
}
err = Descendants(ctx, getLinks, gcs, pn.InternalPins())
ikeys, err := pn.InternalPins(ctx)
if err != nil {
return nil, err
}
err = Descendants(ctx, getLinks, gcs, ikeys)
if err != nil {
errors = true
select {

View File

@ -105,11 +105,11 @@ func StringToMode(s string) (Mode, bool) {
type Pinner interface {
// IsPinned returns whether or not the given cid is pinned
// and an explanation of why its pinned
IsPinned(cid.Cid) (string, bool, error)
IsPinned(ctx context.Context, c cid.Cid) (string, bool, error)
// IsPinnedWithType returns whether or not the given cid is pinned with the
// given pin type, as well as returning the type of pin its pinned with.
IsPinnedWithType(cid.Cid, Mode) (string, bool, error)
IsPinnedWithType(ctx context.Context, c cid.Cid, mode Mode) (string, bool, error)
// Pin the given node, optionally recursively.
Pin(ctx context.Context, node ipld.Node, recursive bool) error
@ -125,7 +125,7 @@ type Pinner interface {
// Check if a set of keys are pinned, more efficient than
// calling IsPinned for each key
CheckIfPinned(cids ...cid.Cid) ([]Pinned, error)
CheckIfPinned(ctx context.Context, cids ...cid.Cid) ([]Pinned, error)
// PinWithMode is for manually editing the pin structure. Use with
// care! If used improperly, garbage collection may not be
@ -138,17 +138,17 @@ type Pinner interface {
RemovePinWithMode(cid.Cid, Mode)
// Flush writes the pin state to the backing datastore
Flush() error
Flush(ctx context.Context) error
// DirectKeys returns all directly pinned cids
DirectKeys() []cid.Cid
DirectKeys(ctx context.Context) ([]cid.Cid, error)
// DirectKeys returns all recursively pinned cids
RecursiveKeys() []cid.Cid
RecursiveKeys(ctx context.Context) ([]cid.Cid, error)
// InternalPins returns all cids kept pinned for the internal state of the
// pinner
InternalPins() []cid.Cid
InternalPins(ctx context.Context) ([]cid.Cid, error)
}
// Pinned represents CID which has been pinned with a pinning strategy.
@ -211,8 +211,6 @@ func NewPinner(dstore ds.Datastore, serv, internal ipld.DAGService) Pinner {
// Pin the given node, optionally recursive
func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error {
p.lock.Lock()
defer p.lock.Unlock()
err := p.dserv.Add(ctx, node)
if err != nil {
return err
@ -220,13 +218,16 @@ func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error {
c := node.Cid()
p.lock.Lock()
defer p.lock.Unlock()
if recurse {
if p.recursePin.Has(c) {
return nil
}
p.lock.Unlock()
// fetch entire graph
// temporary unlock to fetch the entire graph
err := mdag.FetchGraph(ctx, c, p.dserv)
p.lock.Lock()
if err != nil {
@ -243,13 +244,6 @@ func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error {
p.recursePin.Add(c)
} else {
p.lock.Unlock()
_, err := p.dserv.Get(ctx, c)
p.lock.Lock()
if err != nil {
return err
}
if p.recursePin.Has(c) {
return fmt.Errorf("%s already pinned recursively", c.String())
}
@ -286,15 +280,13 @@ func (p *pinner) isInternalPin(c cid.Cid) bool {
// IsPinned returns whether or not the given key is pinned
// and an explanation of why its pinned
func (p *pinner) IsPinned(c cid.Cid) (string, bool, error) {
p.lock.RLock()
defer p.lock.RUnlock()
func (p *pinner) IsPinned(ctx context.Context, c cid.Cid) (string, bool, error) {
return p.isPinnedWithType(c, Any)
}
// IsPinnedWithType returns whether or not the given cid is pinned with the
// given pin type, as well as returning the type of pin its pinned with.
func (p *pinner) IsPinnedWithType(c cid.Cid, mode Mode) (string, bool, error) {
func (p *pinner) IsPinnedWithType(ctx context.Context, c cid.Cid, mode Mode) (string, bool, error) {
p.lock.RLock()
defer p.lock.RUnlock()
return p.isPinnedWithType(c, mode)
@ -347,7 +339,7 @@ func (p *pinner) isPinnedWithType(c cid.Cid, mode Mode) (string, bool, error) {
// CheckIfPinned Checks if a set of keys are pinned, more efficient than
// calling IsPinned for each key, returns the pinned status of cid(s)
func (p *pinner) CheckIfPinned(cids ...cid.Cid) ([]Pinned, error) {
func (p *pinner) CheckIfPinned(ctx context.Context, cids ...cid.Cid) ([]Pinned, error) {
p.lock.RLock()
defer p.lock.RUnlock()
pinned := make([]Pinned, 0, len(cids))
@ -494,19 +486,19 @@ func LoadPinner(d ds.Datastore, dserv, internal ipld.DAGService) (Pinner, error)
}
// DirectKeys returns a slice containing the directly pinned keys
func (p *pinner) DirectKeys() []cid.Cid {
func (p *pinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) {
p.lock.RLock()
defer p.lock.RUnlock()
return p.directPin.Keys()
return p.directPin.Keys(), nil
}
// RecursiveKeys returns a slice containing the recursively pinned keys
func (p *pinner) RecursiveKeys() []cid.Cid {
func (p *pinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) {
p.lock.RLock()
defer p.lock.RUnlock()
return p.recursePin.Keys()
return p.recursePin.Keys(), nil
}
// Update updates a recursive pin from one cid to another
@ -541,12 +533,10 @@ func (p *pinner) Update(ctx context.Context, from, to cid.Cid, unpin bool) error
}
// Flush encodes and writes pinner keysets to the datastore
func (p *pinner) Flush() error {
func (p *pinner) Flush(ctx context.Context) error {
p.lock.Lock()
defer p.lock.Unlock()
ctx := context.TODO()
internalset := cid.NewSet()
recordInternal := internalset.Add
@ -594,12 +584,12 @@ func (p *pinner) Flush() error {
// InternalPins returns all cids kept pinned for the internal state of the
// pinner
func (p *pinner) InternalPins() []cid.Cid {
func (p *pinner) InternalPins(ctx context.Context) ([]cid.Cid, error) {
p.lock.Lock()
defer p.lock.Unlock()
var out []cid.Cid
out = append(out, p.internalPin.Keys()...)
return out
return out, nil
}
// PinWithMode allows the user to have fine grained control over pin

View File

@ -31,7 +31,7 @@ func randNode() (*mdag.ProtoNode, cid.Cid) {
}
func assertPinned(t *testing.T, p Pinner, c cid.Cid, failmsg string) {
_, pinned, err := p.IsPinned(c)
_, pinned, err := p.IsPinned(context.Background(), c)
if err != nil {
t.Fatal(err)
}
@ -42,7 +42,7 @@ func assertPinned(t *testing.T, p Pinner, c cid.Cid, failmsg string) {
}
func assertUnpinned(t *testing.T, p Pinner, c cid.Cid, failmsg string) {
_, pinned, err := p.IsPinned(c)
_, pinned, err := p.IsPinned(context.Background(), c)
if err != nil {
t.Fatal(err)
}
@ -146,7 +146,7 @@ func TestPinnerBasic(t *testing.T) {
t.Fatal(err)
}
err = p.Flush()
err = p.Flush(ctx)
if err != nil {
t.Fatal(err)
}
@ -327,7 +327,7 @@ func TestFlush(t *testing.T) {
_, k := randNode()
p.PinWithMode(k, Recursive)
if err := p.Flush(); err != nil {
if err := p.Flush(context.Background()); err != nil {
t.Fatal(err)
}
assertPinned(t, p, k, "expected key to still be pinned")