1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-07-02 03:28:25 +08:00

Merge pull request #1010 from ipfs/fix/pin-bug

fix pinning
This commit is contained in:
Juan Batiz-Benet
2015-04-20 00:21:05 -07:00
33 changed files with 556 additions and 99 deletions

View File

@ -143,7 +143,7 @@ func addDefaultAssets(out io.Writer, repoRoot string) error {
return err
}
if err := nd.Pinning.Pin(dir, true); err != nil {
if err := nd.Pinning.Pin(ctx, dir, true); err != nil {
return err
}

View File

@ -1,12 +1,14 @@
package commands
import (
"errors"
"fmt"
"io"
"path"
"strings"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
cmds "github.com/ipfs/go-ipfs/commands"
files "github.com/ipfs/go-ipfs/commands/files"
core "github.com/ipfs/go-ipfs/core"
@ -14,12 +16,9 @@ import (
importer "github.com/ipfs/go-ipfs/importer"
"github.com/ipfs/go-ipfs/importer/chunk"
dag "github.com/ipfs/go-ipfs/merkledag"
pinning "github.com/ipfs/go-ipfs/pin"
ft "github.com/ipfs/go-ipfs/unixfs"
u "github.com/ipfs/go-ipfs/util"
"github.com/ipfs/go-ipfs/util/debugerror"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb"
)
// Error indicating the max depth has been exceded.
@ -105,7 +104,19 @@ remains to be implemented.
return
}
_, err = addFile(n, file, outChan, progress, wrap)
rootnd, err := addFile(n, file, outChan, progress, wrap)
if err != nil {
res.SetError(debugerror.Wrap(err), cmds.ErrNormal)
return
}
err = n.Pinning.Pin(context.Background(), rootnd, true)
if err != nil {
res.SetError(debugerror.Wrap(err), cmds.ErrNormal)
return
}
err = n.Pinning.Flush()
if err != nil {
res.SetError(debugerror.Wrap(err), cmds.ErrNormal)
return
@ -200,15 +211,10 @@ remains to be implemented.
}
func add(n *core.IpfsNode, readers []io.Reader) ([]*dag.Node, error) {
mp, ok := n.Pinning.(pinning.ManualPinner)
if !ok {
return nil, errors.New("invalid pinner type! expected manual pinner")
}
dagnodes := make([]*dag.Node, 0)
for _, reader := range readers {
node, err := importer.BuildDagFromReader(reader, n.DAG, mp, chunk.DefaultSplitter)
node, err := importer.BuildDagFromReader(reader, n.DAG, nil, chunk.DefaultSplitter)
if err != nil {
return nil, err
}
@ -229,11 +235,6 @@ func addNode(n *core.IpfsNode, node *dag.Node) error {
return err
}
err = n.Pinning.Pin(node, true) // ensure we keep it
if err != nil {
return err
}
return nil
}

View File

@ -5,6 +5,9 @@ import (
"fmt"
"io"
"text/tabwriter"
"time"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
cmds "github.com/ipfs/go-ipfs/commands"
merkledag "github.com/ipfs/go-ipfs/merkledag"
@ -77,7 +80,9 @@ it contains, with the following format:
Links: make([]LsLink, len(dagnode.Links)),
}
for j, link := range dagnode.Links {
link.Node, err = link.GetNode(node.DAG)
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
defer cancel()
link.Node, err = link.GetNode(ctx, node.DAG)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return

View File

@ -165,12 +165,14 @@ Use --type=<type> to specify the type of pinned keys to list. Valid values are:
* "indirect": pinned indirectly by an ancestor (like a refcount)
* "all"
To see the ref count on indirect pins, pass the -count option flag.
Defaults to "direct".
`,
},
Options: []cmds.Option{
cmds.StringOption("type", "t", "The type of pinned keys to list. Can be \"direct\", \"indirect\", \"recursive\", or \"all\". Defaults to \"direct\""),
cmds.BoolOption("count", "n", "Show refcount when listing indirect pins"),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.Context().GetNode()
@ -195,21 +197,57 @@ Defaults to "direct".
res.SetError(err, cmds.ErrClient)
}
keys := make([]u.Key, 0)
keys := make(map[string]int)
if typeStr == "direct" || typeStr == "all" {
keys = append(keys, n.Pinning.DirectKeys()...)
for _, k := range n.Pinning.DirectKeys() {
keys[k.B58String()] = 1
}
}
if typeStr == "indirect" || typeStr == "all" {
keys = append(keys, n.Pinning.IndirectKeys()...)
for k, v := range n.Pinning.IndirectKeys() {
keys[k.B58String()] = v
}
}
if typeStr == "recursive" || typeStr == "all" {
keys = append(keys, n.Pinning.RecursiveKeys()...)
for _, k := range n.Pinning.RecursiveKeys() {
keys[k.B58String()] = 1
}
}
res.SetOutput(&KeyList{Keys: keys})
res.SetOutput(&RefKeyList{Keys: keys})
},
Type: KeyList{},
Type: RefKeyList{},
Marshalers: cmds.MarshalerMap{
cmds.Text: KeyListTextMarshaler,
cmds.Text: func(res cmds.Response) (io.Reader, error) {
typeStr, _, err := res.Request().Option("type").String()
if err != nil {
return nil, err
}
count, _, err := res.Request().Option("count").Bool()
if err != nil {
return nil, err
}
keys, ok := res.Output().(*RefKeyList)
if !ok {
return nil, u.ErrCast()
}
out := new(bytes.Buffer)
if typeStr == "indirect" && count {
for k, v := range keys.Keys {
fmt.Fprintf(out, "%s %d\n", k, v)
}
} else {
for k, _ := range keys.Keys {
fmt.Fprintf(out, "%s\n", k)
}
}
return out, nil
},
},
}
type RefKeyList struct {
Keys map[string]int
}

View File

@ -352,7 +352,9 @@ func (i *gatewayHandler) putHandler(w http.ResponseWriter, r *http.Request) {
return
}
rootnd, err := i.node.Resolver.DAG.Get(u.Key(h))
tctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
rootnd, err := i.node.Resolver.DAG.Get(tctx, u.Key(h))
if err != nil {
webError(w, "Could not resolve root object", err, http.StatusBadRequest)
return
@ -414,7 +416,9 @@ func (i *gatewayHandler) deleteHandler(w http.ResponseWriter, r *http.Request) {
return
}
rootnd, err := i.node.Resolver.DAG.Get(u.Key(h))
tctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
rootnd, err := i.node.Resolver.DAG.Get(tctx, u.Key(h))
if err != nil {
webError(w, "Could not resolve root object", err, http.StatusBadRequest)
return

View File

@ -2,6 +2,9 @@ package corerepo
import (
"fmt"
"time"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/merkledag"
@ -27,7 +30,9 @@ func Pin(n *core.IpfsNode, paths []string, recursive bool) ([]u.Key, error) {
return nil, err
}
err = n.Pinning.Pin(dagnode, recursive)
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
defer cancel()
err = n.Pinning.Pin(ctx, dagnode, recursive)
if err != nil {
return nil, fmt.Errorf("pin: %s", err)
}
@ -56,7 +61,10 @@ func Unpin(n *core.IpfsNode, paths []string, recursive bool) ([]u.Key, error) {
var unpinned []u.Key
for _, dagnode := range dagnodes {
k, _ := dagnode.Key()
err := n.Pinning.Unpin(k, recursive)
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
defer cancel()
err := n.Pinning.Unpin(ctx, k, recursive)
if err != nil {
return nil, err
}

View File

@ -6,6 +6,9 @@ import (
"io/ioutil"
"os"
gopath "path"
"time"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/ipfs/go-ipfs/commands/files"
core "github.com/ipfs/go-ipfs/core"
@ -108,7 +111,9 @@ func addNode(n *core.IpfsNode, node *merkledag.Node) error {
if err != nil {
return err
}
err = n.Pinning.Pin(node, true) // ensure we keep it
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
defer cancel()
err = n.Pinning.Pin(ctx, node, true) // ensure we keep it
if err != nil {
return err
}

View File

@ -1,6 +1,10 @@
package coreunix
import (
"time"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
core "github.com/ipfs/go-ipfs/core"
dag "github.com/ipfs/go-ipfs/merkledag"
ft "github.com/ipfs/go-ipfs/unixfs"
@ -9,7 +13,10 @@ import (
func AddMetadataTo(n *core.IpfsNode, key string, m *ft.Metadata) (string, error) {
ukey := u.B58KeyDecode(key)
nd, err := n.DAG.Get(ukey)
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
defer cancel()
nd, err := n.DAG.Get(ctx, ukey)
if err != nil {
return "", err
}
@ -36,7 +43,10 @@ func AddMetadataTo(n *core.IpfsNode, key string, m *ft.Metadata) (string, error)
func Metadata(n *core.IpfsNode, key string) (*ft.Metadata, error) {
ukey := u.B58KeyDecode(key)
nd, err := n.DAG.Get(ukey)
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
defer cancel()
nd, err := n.DAG.Get(ctx, ukey)
if err != nil {
return nil, err
}

View File

@ -8,6 +8,7 @@ import (
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dssync "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
bserv "github.com/ipfs/go-ipfs/blockservice"
core "github.com/ipfs/go-ipfs/core"
@ -65,7 +66,7 @@ func TestMetadata(t *testing.T) {
t.Fatalf("something went wrong in conversion: '%s' != '%s'", rec.MimeType, m.MimeType)
}
retnode, err := ds.Get(u.B58KeyDecode(mdk))
retnode, err := ds.Get(context.Background(), u.B58KeyDecode(mdk))
if err != nil {
t.Fatal(err)
}

View File

@ -1,6 +1,10 @@
package ipns
import (
"time"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/ipfs/go-ipfs/core"
mdag "github.com/ipfs/go-ipfs/merkledag"
nsys "github.com/ipfs/go-ipfs/namesys"
@ -17,7 +21,10 @@ func InitializeKeyspace(n *core.IpfsNode, key ci.PrivKey) error {
return err
}
err = n.Pinning.Pin(emptyDir, false)
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
defer cancel()
err = n.Pinning.Pin(ctx, emptyDir, false)
if err != nil {
return err
}

View File

@ -93,7 +93,7 @@ func getPaths(t *testing.T, ipfs *core.IpfsNode, name string, n *dag.Node) []str
}
var out []string
for _, lnk := range n.Links {
child, err := lnk.GetNode(ipfs.DAG)
child, err := lnk.GetNode(ipfs.Context(), ipfs.DAG)
if err != nil {
t.Fatal(err)
}

View File

@ -2,7 +2,9 @@ package helpers
import (
"fmt"
"time"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
chunk "github.com/ipfs/go-ipfs/importer/chunk"
dag "github.com/ipfs/go-ipfs/merkledag"
"github.com/ipfs/go-ipfs/pin"
@ -76,7 +78,10 @@ func (n *UnixfsNode) NumChildren() int {
}
func (n *UnixfsNode) GetChild(i int, ds dag.DAGService) (*UnixfsNode, error) {
nd, err := n.node.Links[i].GetNode(ds)
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
defer cancel()
nd, err := n.node.Links[i].GetNode(ctx, ds)
if err != nil {
return nil, err
}

View File

@ -607,7 +607,7 @@ func printDag(nd *merkledag.Node, ds merkledag.DAGService, indent int) {
fmt.Println()
}
for _, lnk := range nd.Links {
child, err := lnk.GetNode(ds)
child, err := lnk.GetNode(context.Background(), ds)
if err != nil {
panic(err)
}

View File

@ -2,6 +2,10 @@ package trickle
import (
"errors"
"time"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
h "github.com/ipfs/go-ipfs/importer/helpers"
dag "github.com/ipfs/go-ipfs/merkledag"
ft "github.com/ipfs/go-ipfs/unixfs"
@ -259,7 +263,9 @@ func verifyTDagRec(nd *dag.Node, depth, direct, layerRepeat int, ds dag.DAGServi
}
for i := 0; i < len(nd.Links); i++ {
child, err := nd.Links[i].GetNode(ds)
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
defer cancel()
child, err := nd.Links[i].GetNode(ctx, ds)
if err != nil {
return err
}

View File

@ -5,6 +5,9 @@ import (
"fmt"
"os"
"sync"
"time"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
dag "github.com/ipfs/go-ipfs/merkledag"
ft "github.com/ipfs/go-ipfs/unixfs"
@ -135,7 +138,10 @@ func (d *Directory) childDir(name string) (*Directory, error) {
func (d *Directory) childFromDag(name string) (*dag.Node, error) {
for _, lnk := range d.node.Links {
if lnk.Name == name {
return lnk.GetNode(d.fs.dserv)
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
defer cancel()
return lnk.GetNode(ctx, d.fs.dserv)
}
}

View File

@ -159,7 +159,7 @@ func (fs *Filesystem) newKeyRoot(parent context.Context, k ci.PrivKey) (*KeyRoot
}
}
mnode, err := fs.dserv.Get(pointsTo)
mnode, err := fs.dserv.Get(ctx, pointsTo)
if err != nil {
return nil, err
}

View File

@ -4,7 +4,6 @@ package merkledag
import (
"fmt"
"sync"
"time"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
blocks "github.com/ipfs/go-ipfs/blocks"
@ -19,7 +18,7 @@ var ErrNotFound = fmt.Errorf("merkledag: not found")
type DAGService interface {
Add(*Node) (u.Key, error)
AddRecursive(*Node) error
Get(u.Key) (*Node, error)
Get(context.Context, u.Key) (*Node, error)
Remove(*Node) error
// GetDAG returns, in order, all the single leve child
@ -83,17 +82,11 @@ func (n *dagService) AddRecursive(nd *Node) error {
}
// Get retrieves a node from the dagService, fetching the block in the BlockService
func (n *dagService) Get(k u.Key) (*Node, error) {
func (n *dagService) Get(ctx context.Context, k u.Key) (*Node, error) {
if n == nil {
return nil, fmt.Errorf("dagService is nil")
}
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
defer cancel()
// we shouldn't use an arbitrary timeout here.
// since Get doesnt take in a context yet, we give a large upper bound.
// think of an http request. we want it to go on as long as the client requests it.
b, err := n.Blocks.GetBlock(ctx, k)
if err != nil {
return nil, err
@ -134,7 +127,7 @@ func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{}
return
}
nd, err := lnk.GetNode(serv)
nd, err := lnk.GetNode(ctx, serv)
if err != nil {
log.Debug(err)
return

View File

@ -190,7 +190,7 @@ func runBatchFetchTest(t *testing.T, read io.Reader) {
wg.Add(1)
go func(i int) {
defer wg.Done()
first, err := dagservs[i].Get(k)
first, err := dagservs[i].Get(context.Background(), k)
if err != nil {
t.Fatal(err)
}

View File

@ -3,6 +3,8 @@ package merkledag
import (
"fmt"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
mh "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
u "github.com/ipfs/go-ipfs/util"
)
@ -77,12 +79,12 @@ func MakeLink(n *Node) (*Link, error) {
}
// GetNode returns the MDAG Node that this link points to
func (l *Link) GetNode(serv DAGService) (*Node, error) {
func (l *Link) GetNode(ctx context.Context, serv DAGService) (*Node, error) {
if l.Node != nil {
return l.Node, nil
}
return serv.Get(u.Key(l.Hash))
return serv.Get(ctx, u.Key(l.Hash))
}
// AddNodeLink adds a link to another node.

View File

@ -3,6 +3,9 @@ package traverse
import (
"errors"
"time"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
mdag "github.com/ipfs/go-ipfs/merkledag"
)
@ -64,7 +67,10 @@ func (t *traversal) callFunc(next State) error {
func (t *traversal) getNode(link *mdag.Link) (*mdag.Node, error) {
getNode := func(l *mdag.Link) (*mdag.Node, error) {
next, err := l.GetNode(t.opts.DAG)
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
defer cancel()
next, err := l.GetNode(ctx, t.opts.DAG)
if err != nil {
return nil, err
}

View File

@ -150,7 +150,7 @@ func InitializeKeyspace(ctx context.Context, ds dag.DAGService, pub Publisher, p
// pin recursively because this might already be pinned
// and doing a direct pin would throw an error in that case
err = pins.Pin(emptyDir, true)
err = pins.Pin(ctx, emptyDir, true)
if err != nil {
return err
}

View File

@ -3,8 +3,10 @@ package path
import (
"fmt"
"time"
mh "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
merkledag "github.com/ipfs/go-ipfs/merkledag"
u "github.com/ipfs/go-ipfs/util"
)
@ -74,7 +76,9 @@ func (s *Resolver) ResolvePathComponents(fpath Path) ([]*merkledag.Node, error)
}
log.Debug("Resolve dag get.\n")
nd, err := s.DAG.Get(u.Key(h))
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
defer cancel()
nd, err := s.DAG.Get(ctx, u.Key(h))
if err != nil {
return nil, err
}
@ -117,7 +121,9 @@ func (s *Resolver) ResolveLinks(ndd *merkledag.Node, names []string) (
if nlink.Node == nil {
// fetch object for link and assign to nd
nd, err = s.DAG.Get(next)
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
defer cancel()
nd, err = s.DAG.Get(ctx, next)
if err != nil {
return append(result, nd), err
}

View File

@ -28,9 +28,11 @@ func loadIndirPin(d ds.Datastore, k ds.Key) (*indirectPin, error) {
refcnt := make(map[util.Key]int)
var keys []util.Key
for encK, v := range rcStore {
k := util.B58KeyDecode(encK)
keys = append(keys, k)
refcnt[k] = v
if v > 0 {
k := util.B58KeyDecode(encK)
keys = append(keys, k)
refcnt[k] = v
}
}
// log.Debugf("indirPin keys: %#v", keys)
@ -59,6 +61,7 @@ func (i *indirectPin) Decrement(k util.Key) {
i.refCounts[k] = c
if c <= 0 {
i.blockset.RemoveBlock(k)
delete(i.refCounts, k)
}
}
@ -69,3 +72,7 @@ func (i *indirectPin) HasKey(k util.Key) bool {
func (i *indirectPin) Set() set.BlockSet {
return i.blockset
}
func (i *indirectPin) GetRefs() map[util.Key]int {
return i.refCounts
}

View File

@ -7,7 +7,6 @@ import (
"errors"
"fmt"
"sync"
"time"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
nsds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace"
@ -33,12 +32,12 @@ const (
type Pinner interface {
IsPinned(util.Key) bool
Pin(*mdag.Node, bool) error
Unpin(util.Key, bool) error
Pin(context.Context, *mdag.Node, bool) error
Unpin(context.Context, util.Key, bool) error
Flush() error
GetManual() ManualPinner
DirectKeys() []util.Key
IndirectKeys() []util.Key
IndirectKeys() map[util.Key]int
RecursiveKeys() []util.Key
}
@ -82,7 +81,7 @@ func NewPinner(dstore ds.ThreadSafeDatastore, serv mdag.DAGService) Pinner {
}
// Pin the given node, optionally recursive
func (p *pinner) Pin(node *mdag.Node, recurse bool) error {
func (p *pinner) Pin(ctx context.Context, node *mdag.Node, recurse bool) error {
p.lock.Lock()
defer p.lock.Unlock()
k, err := node.Key()
@ -99,34 +98,40 @@ func (p *pinner) Pin(node *mdag.Node, recurse bool) error {
p.directPin.RemoveBlock(k)
}
p.recursePin.AddBlock(k)
err := p.pinLinks(node)
err := p.pinLinks(ctx, node)
if err != nil {
return err
}
p.recursePin.AddBlock(k)
} else {
_, err := p.dserv.Get(ctx, k)
if err != nil {
return err
}
if p.recursePin.HasKey(k) {
return fmt.Errorf("%s already pinned recursively", k.B58String())
}
p.directPin.AddBlock(k)
}
return nil
}
// Unpin a given key
func (p *pinner) Unpin(k util.Key, recursive bool) error {
func (p *pinner) Unpin(ctx context.Context, k util.Key, recursive bool) error {
p.lock.Lock()
defer p.lock.Unlock()
if p.recursePin.HasKey(k) {
if recursive {
p.recursePin.RemoveBlock(k)
node, err := p.dserv.Get(k)
node, err := p.dserv.Get(ctx, k)
if err != nil {
return err
}
return p.unpinLinks(node)
return p.unpinLinks(ctx, node)
} else {
return fmt.Errorf("%s is pinned recursively", k)
}
@ -140,9 +145,9 @@ func (p *pinner) Unpin(k util.Key, recursive bool) error {
}
}
func (p *pinner) unpinLinks(node *mdag.Node) error {
func (p *pinner) unpinLinks(ctx context.Context, node *mdag.Node) error {
for _, l := range node.Links {
node, err := l.GetNode(p.dserv)
node, err := l.GetNode(ctx, p.dserv)
if err != nil {
return err
}
@ -152,9 +157,9 @@ func (p *pinner) unpinLinks(node *mdag.Node) error {
return err
}
p.recursePin.RemoveBlock(k)
p.indirPin.Decrement(k)
err = p.unpinLinks(node)
err = p.unpinLinks(ctx, node)
if err != nil {
return err
}
@ -162,27 +167,24 @@ func (p *pinner) unpinLinks(node *mdag.Node) error {
return nil
}
func (p *pinner) pinIndirectRecurse(node *mdag.Node) error {
func (p *pinner) pinIndirectRecurse(ctx context.Context, node *mdag.Node) error {
k, err := node.Key()
if err != nil {
return err
}
p.indirPin.Increment(k)
return p.pinLinks(node)
return p.pinLinks(ctx, node)
}
func (p *pinner) pinLinks(node *mdag.Node) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()
func (p *pinner) pinLinks(ctx context.Context, node *mdag.Node) error {
for _, ng := range p.dserv.GetDAG(ctx, node) {
subnode, err := ng.Get(ctx)
if err != nil {
// TODO: Maybe just log and continue?
return err
}
err = p.pinIndirectRecurse(subnode)
err = p.pinIndirectRecurse(ctx, subnode)
if err != nil {
return err
}
@ -256,8 +258,8 @@ func (p *pinner) DirectKeys() []util.Key {
}
// IndirectKeys returns a slice containing the indirectly pinned keys
func (p *pinner) IndirectKeys() []util.Key {
return p.indirPin.Set().GetKeys()
func (p *pinner) IndirectKeys() map[util.Key]int {
return p.indirPin.GetRefs()
}
// RecursiveKeys returns a slice containing the recursively pinned keys

View File

@ -2,6 +2,9 @@ package pin
import (
"testing"
"time"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dssync "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
@ -21,6 +24,8 @@ func randNode() (*mdag.Node, util.Key) {
}
func TestPinnerBasic(t *testing.T) {
ctx := context.Background()
dstore := dssync.MutexWrap(ds.NewMapDatastore())
bstore := blockstore.NewBlockstore(dstore)
bserv, err := bs.New(bstore, offline.Exchange(bstore))
@ -40,7 +45,7 @@ func TestPinnerBasic(t *testing.T) {
}
// Pin A{}
err = p.Pin(a, false)
err = p.Pin(ctx, a, false)
if err != nil {
t.Fatal(err)
}
@ -74,7 +79,7 @@ func TestPinnerBasic(t *testing.T) {
}
// recursively pin B{A,C}
err = p.Pin(b, true)
err = p.Pin(ctx, b, true)
if err != nil {
t.Fatal(err)
}
@ -102,7 +107,7 @@ func TestPinnerBasic(t *testing.T) {
}
// Add D{A,C,E}
err = p.Pin(d, true)
err = p.Pin(ctx, d, true)
if err != nil {
t.Fatal(err)
}
@ -117,7 +122,7 @@ func TestPinnerBasic(t *testing.T) {
}
// Test recursive unpin
err = p.Unpin(dk, true)
err = p.Unpin(ctx, dk, true)
if err != nil {
t.Fatal(err)
}
@ -154,6 +159,7 @@ func TestPinnerBasic(t *testing.T) {
}
func TestDuplicateSemantics(t *testing.T) {
ctx := context.Background()
dstore := dssync.MutexWrap(ds.NewMapDatastore())
bstore := blockstore.NewBlockstore(dstore)
bserv, err := bs.New(bstore, offline.Exchange(bstore))
@ -173,19 +179,59 @@ func TestDuplicateSemantics(t *testing.T) {
}
// pin is recursively
err = p.Pin(a, true)
err = p.Pin(ctx, a, true)
if err != nil {
t.Fatal(err)
}
// pinning directly should fail
err = p.Pin(a, false)
err = p.Pin(ctx, a, false)
if err == nil {
t.Fatal("expected direct pin to fail")
}
// pinning recursively again should succeed
err = p.Pin(a, true)
err = p.Pin(ctx, a, true)
if err != nil {
t.Fatal(err)
}
}
func TestPinRecursiveFail(t *testing.T) {
ctx := context.Background()
dstore := dssync.MutexWrap(ds.NewMapDatastore())
bstore := blockstore.NewBlockstore(dstore)
bserv, err := bs.New(bstore, offline.Exchange(bstore))
if err != nil {
t.Fatal(err)
}
dserv := mdag.NewDAGService(bserv)
p := NewPinner(dstore, dserv)
a, _ := randNode()
b, _ := randNode()
err = a.AddNodeLinkClean("child", b)
if err != nil {
t.Fatal(err)
}
// Note: this isnt a time based test, we expect the pin to fail
mctx, _ := context.WithTimeout(ctx, time.Millisecond)
err = p.Pin(mctx, a, true)
if err == nil {
t.Fatal("should have failed to pin here")
}
_, err = dserv.Add(b)
if err != nil {
t.Fatal(err)
}
// this one is time based... but shouldnt cause any issues
mctx, _ = context.WithTimeout(ctx, time.Second)
err = p.Pin(mctx, a, true)
if err != nil {
t.Fatal(err)
}

30
test/bin/ipfs-pin-stat Executable file
View File

@ -0,0 +1,30 @@
#!/bin/sh
die() {
echo "$@"
exit 1
}
if [ "$#" -eq 0 ]; then
echo "usage: $0 <object>"
echo "show ipfs pin information for object"
exit 1
fi
path=$1
echo "$path" | grep "/" >/dev/null
if [ "$?" -eq 0 ]; then
die "error: paths not supported. please resolve to hash first."
fi
ipfs pin ls --type=recursive | grep "$path" >/dev/null
[ "$?" -eq 0 ] && echo "$path pinned recursively"
ipfs pin ls --type=indirect | grep "$path" >/dev/null
[ "$?" -eq 0 ] && echo "$path pinned indirectly"
ipfs pin ls --type=direct | grep "$path" >/dev/null
[ "$?" -eq 0 ] && echo "$path pinned directly"
exit 0

View File

@ -295,3 +295,9 @@ test_should_contain() {
return 1
fi
}
test_str_contains() {
find=$1
shift
echo "$@" | grep "$find" >/dev/null
}

View File

@ -140,8 +140,9 @@ test_expect_success "'ipfs pin ls -type=all' is correct" '
cat directpinout >allpins &&
cat rp_actual >>allpins &&
cat indirectpins >>allpins &&
cat allpins | sort | uniq >> allpins_uniq &&
ipfs pin ls -type=all >actual_allpins &&
test_sort_cmp allpins actual_allpins
test_sort_cmp allpins_uniq actual_allpins
'
test_kill_ipfs_daemon

View File

@ -0,0 +1,251 @@
#!/bin/sh
#
# Copyright (c) 2014 Jeromy Johnson
# MIT Licensed; see the LICENSE file in this repository.
#
test_description="Test ipfs repo pinning"
. lib/test-lib.sh
test_pin_flag() {
object=$1
ptype=$2
expect=$3
echo "test_pin_flag" $@
ipfs-pin-stat "$object" | grep "$ptype"
actual=$?
if [ "$expect" = "true" ]; then
if [ "$actual" != "0" ]; then
echo "$object should be pinned $ptype ($actual)"
return 1
fi
else
if [ "$actual" != "1" ]; then
echo "$object should NOT be pinned $ptype ($actual)"
return 1
fi
fi
return 0
}
test_pin() {
object=$1
shift
test_str_contains "recursive" $@
[ "$?" = "0" ] && r="true" || r="false"
test_str_contains "indirect" $@
[ "$?" = "0" ] && i="true" || i="false"
test_str_contains "direct" $@
[ "$?" = "0" ] && d="true" || d="false"
test_pin_flag "$object" "recursive" $r || return 1
test_pin_flag "$object" "indirect" $i || return 1
test_pin_flag "$object" "direct" $d || return 1
return 0
}
test_init_ipfs
# test runs much faster without daemon.
# TODO: turn this back on after:
# https://github.com/ipfs/go-ipfs/issues/1075
# test_launch_ipfs_daemon
HASH_FILE6="QmRsBC3Y2G6VRPYGAVpZczx1W7Xw54MtM1NcLKTkn6rx3U"
HASH_FILE5="QmaN3PtyP8DcVGHi3Q2Fcp7CfAFVcVXKddWbHoNvaA41zf"
HASH_FILE4="QmV1aiVgpDknKQugrK59uBUbMrPnsQM1F9FXbFcfgEvUvH"
HASH_FILE3="QmZrr4Pzqp3NnMzMfbMhNe7LghfoUFHVx7c9Po9GZrhKZ7"
HASH_FILE2="QmSkjTornLY72QhmK9NvAz26815pTaoAL42rF8Qi3w2WBP"
HASH_FILE1="QmbgX4aXhSSY88GHmPQ4roizD8wFwPX8jzTLjc8VAp89x4"
HASH_DIR4="QmW98gV71Ns4bX7QbgWAqLiGF3SDC1JpveZSgBh4ExaSAd"
HASH_DIR3="QmRsCaNBMkweZ9vHT5PJRd2TT9rtNKEKyuognCEVxZxF1H"
HASH_DIR2="QmTUTQAgeVfughDSFukMZLbfGvetDJY7Ef5cDXkKK4abKC"
HASH_DIR1="QmNyZVFbgvmzguS2jVMRb8PQMNcCMJrn9E3doDhBbcPNTY"
DIR1="dir1"
DIR2="dir1/dir2"
DIR4="dir1/dir2/dir4"
DIR3="dir1/dir3"
FILE1="dir1/file1"
FILE2="dir1/file2"
FILE3="dir1/file3"
FILE4="dir1/dir2/file4"
FILE6="dir1/dir2/dir4/file6"
FILE5="dir1/dir3/file5"
test_expect_success "'ipfs add dir' succeeds" '
mkdir dir1 &&
mkdir dir1/dir2 &&
mkdir dir1/dir2/dir4 &&
mkdir dir1/dir3 &&
echo "some text 1" >dir1/file1 &&
echo "some text 2" >dir1/file2 &&
echo "some text 3" >dir1/file3 &&
echo "some text 1" >dir1/dir2/file1 &&
echo "some text 4" >dir1/dir2/file4 &&
echo "some text 1" >dir1/dir2/dir4/file1 &&
echo "some text 2" >dir1/dir2/dir4/file2 &&
echo "some text 6" >dir1/dir2/dir4/file6 &&
echo "some text 2" >dir1/dir3/file2 &&
echo "some text 5" >dir1/dir3/file5 &&
ipfs add -q -r dir1 | tail -n1 >actual1 &&
echo "$HASH_DIR1" >expected1 &&
test_cmp actual1 expected1
'
test_expect_success "objects are there" '
ipfs cat "$HASH_FILE6" >FILE6_a &&
ipfs cat "$HASH_FILE5" >FILE5_a &&
ipfs cat "$HASH_FILE4" >FILE4_a &&
ipfs cat "$HASH_FILE3" >FILE3_a &&
ipfs cat "$HASH_FILE2" >FILE2_a &&
ipfs cat "$HASH_FILE1" >FILE1_a &&
ipfs ls "$HASH_DIR3" >DIR3_a &&
ipfs ls "$HASH_DIR4" >DIR4_a &&
ipfs ls "$HASH_DIR2" >DIR2_a &&
ipfs ls "$HASH_DIR1" >DIR1_a
'
# saving this output for later
test_expect_success "ipfs object links $HASH_DIR1 works" '
ipfs object links $HASH_DIR1 > DIR1_objlink
'
test_expect_success "added dir was pinned recursively" '
test_pin_flag $HASH_DIR1 recursive true
'
test_expect_success "rest were pinned indirectly" '
test_pin_flag "$HASH_FILE6" indirect true
test_pin_flag "$HASH_FILE5" indirect true
test_pin_flag "$HASH_FILE4" indirect true
test_pin_flag "$HASH_FILE3" indirect true
test_pin_flag "$HASH_FILE2" indirect true
test_pin_flag "$HASH_FILE1" indirect true
test_pin_flag "$HASH_DIR3" indirect true
test_pin_flag "$HASH_DIR4" indirect true
test_pin_flag "$HASH_DIR2" indirect true
'
test_expect_success "added dir was NOT pinned indirectly" '
test_pin_flag "$HASH_DIR1" indirect false
'
test_expect_success "nothing is pinned directly" '
ipfs pin ls -type=direct >actual4 &&
test_must_be_empty actual4
'
test_expect_success "'ipfs repo gc' succeeds" '
ipfs repo gc >gc_out_actual &&
test_must_be_empty gc_out_actual
'
test_expect_success "objects are still there" '
cat FILE6_a FILE5_a FILE4_a FILE3_a FILE2_a FILE1_a >expected45 &&
cat DIR3_a DIR4_a DIR2_a DIR1_a >>expected45 &&
ipfs cat "$HASH_FILE6" >actual45 &&
ipfs cat "$HASH_FILE5" >>actual45 &&
ipfs cat "$HASH_FILE4" >>actual45 &&
ipfs cat "$HASH_FILE3" >>actual45 &&
ipfs cat "$HASH_FILE2" >>actual45 &&
ipfs cat "$HASH_FILE1" >>actual45 &&
ipfs ls "$HASH_DIR3" >>actual45 &&
ipfs ls "$HASH_DIR4" >>actual45 &&
ipfs ls "$HASH_DIR2" >>actual45 &&
ipfs ls "$HASH_DIR1" >>actual45 &&
test_cmp expected45 actual45
'
test_expect_success "remove dir recursive pin succeeds" '
echo "unpinned $HASH_DIR1" >expected5 &&
ipfs pin rm -r "$HASH_DIR1" >actual5 &&
test_cmp expected5 actual5
'
test_expect_success "none are pinned any more" '
test_pin "$HASH_FILE6" &&
test_pin "$HASH_FILE5" &&
test_pin "$HASH_FILE4" &&
test_pin "$HASH_FILE3" &&
test_pin "$HASH_FILE2" &&
test_pin "$HASH_FILE1" &&
test_pin "$HASH_DIR3" &&
test_pin "$HASH_DIR4" &&
test_pin "$HASH_DIR2" &&
test_pin "$HASH_DIR1"
'
test_expect_success "pin some directly and indirectly" '
ipfs pin add "$HASH_DIR1" >actual7 &&
ipfs pin add -r "$HASH_DIR2" >>actual7 &&
ipfs pin add "$HASH_FILE1" >>actual7 &&
echo "pinned $HASH_DIR1 directly" >expected7 &&
echo "pinned $HASH_DIR2 recursively" >>expected7 &&
echo "pinned $HASH_FILE1 directly" >>expected7 &&
test_cmp expected7 actual7
'
test_expect_success "pin lists look good" '
test_pin $HASH_DIR1 direct &&
test_pin $HASH_DIR2 recursive &&
test_pin $HASH_DIR3 &&
test_pin $HASH_DIR4 indirect &&
test_pin $HASH_FILE1 indirect direct &&
test_pin $HASH_FILE2 indirect &&
test_pin $HASH_FILE3 &&
test_pin $HASH_FILE4 indirect &&
test_pin $HASH_FILE5 &&
test_pin $HASH_FILE6 indirect
'
test_expect_success "'ipfs repo gc' succeeds" '
ipfs repo gc >gc_out_actual2 &&
echo "removed $HASH_FILE3" > gc_out_exp2 &&
echo "removed $HASH_FILE5" >> gc_out_exp2 &&
echo "removed $HASH_DIR3" >> gc_out_exp2 &&
test_sort_cmp gc_out_actual2 gc_out_exp2
'
# use object links for HASH_DIR1 here because its children
# no longer exist
test_expect_success "some objects are still there" '
cat FILE6_a FILE4_a FILE2_a FILE1_a >expected8 &&
cat DIR4_a DIR2_a DIR1_objlink >>expected8 &&
ipfs cat "$HASH_FILE6" >actual8 &&
ipfs cat "$HASH_FILE4" >>actual8 &&
ipfs cat "$HASH_FILE2" >>actual8 &&
ipfs cat "$HASH_FILE1" >>actual8 &&
ipfs ls "$HASH_DIR4" >>actual8 &&
ipfs ls "$HASH_DIR2" >>actual8 &&
ipfs object links "$HASH_DIR1" >>actual8 &&
test_cmp actual8 expected8
'
# todo: make this faster somehow.
test_expect_success "some are no longer there" '
test_must_fail ipfs cat "$HASH_FILE5" &&
test_must_fail ipfs cat "$HASH_FILE3" &&
test_must_fail ipfs ls "$HASH_DIR3"
'
test_expect_success "recursive pin fails without objects" '
ipfs pin rm "$HASH_DIR1" &&
test_must_fail ipfs pin add -r "$HASH_DIR1" 2>err_expected8 &&
grep "context deadline exceeded" err_expected8
'
# test_kill_ipfs_daemon
test_done

View File

@ -74,7 +74,7 @@ func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*Dag
if len(n.Links) == 0 {
return nil, errors.New("incorrectly formatted metadata object")
}
child, err := n.Links[0].GetNode(serv)
child, err := n.Links[0].GetNode(ctx, serv)
if err != nil {
return nil, err
}

View File

@ -1,6 +1,10 @@
package io
import (
"time"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
mdag "github.com/ipfs/go-ipfs/merkledag"
format "github.com/ipfs/go-ipfs/unixfs"
u "github.com/ipfs/go-ipfs/util"
@ -20,7 +24,10 @@ func NewDirectory(dserv mdag.DAGService) *directoryBuilder {
}
func (d *directoryBuilder) AddChild(name string, k u.Key) error {
cnode, err := d.dserv.Get(k)
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
defer cancel()
cnode, err := d.dserv.Get(ctx, k)
if err != nil {
return err
}

View File

@ -5,6 +5,7 @@ import (
"errors"
"io"
"os"
"time"
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
mh "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
@ -184,7 +185,7 @@ func (dm *DagModifier) Sync() error {
return err
}
nd, err := dm.dagserv.Get(thisk)
nd, err := dm.dagserv.Get(dm.ctx, thisk)
if err != nil {
return err
}
@ -267,7 +268,7 @@ func (dm *DagModifier) modifyDag(node *mdag.Node, offset uint64, data io.Reader)
ckey := u.Key(node.Links[i].Hash)
dm.mp.RemovePinWithMode(ckey, pin.Indirect)
child, err := node.Links[i].GetNode(dm.dagserv)
child, err := node.Links[i].GetNode(dm.ctx, dm.dagserv)
if err != nil {
return "", false, err
}
@ -457,7 +458,10 @@ func dagTruncate(nd *mdag.Node, size uint64, ds mdag.DAGService) (*mdag.Node, er
var modified *mdag.Node
ndata := new(ft.FSNode)
for i, lnk := range nd.Links {
child, err := lnk.GetNode(ds)
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
defer cancel()
child, err := lnk.GetNode(ctx, ds)
if err != nil {
return nil, err
}

View File

@ -578,7 +578,7 @@ func enumerateChildren(t *testing.T, nd *mdag.Node, ds mdag.DAGService) []u.Key
var out []u.Key
for _, lnk := range nd.Links {
out = append(out, u.Key(lnk.Hash))
child, err := lnk.GetNode(ds)
child, err := lnk.GetNode(context.Background(), ds)
if err != nil {
t.Fatal(err)
}
@ -643,7 +643,7 @@ func printDag(nd *mdag.Node, ds mdag.DAGService, indent int) {
fmt.Println()
}
for _, lnk := range nd.Links {
child, err := lnk.GetNode(ds)
child, err := lnk.GetNode(context.Background(), ds)
if err != nil {
panic(err)
}