diff --git a/core/commands/pin.go b/core/commands/pin.go index b23e89e8d..d96aace7e 100644 --- a/core/commands/pin.go +++ b/core/commands/pin.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "io" + "time" cmds "github.com/ipfs/go-ipfs/commands" core "github.com/ipfs/go-ipfs/core" @@ -33,6 +34,11 @@ type PinOutput struct { Pins []*cid.Cid } +type AddPinOutput struct { + Pins []*cid.Cid + Progress int `json:",omitempty"` +} + var addPinCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Pin objects to local storage.", @@ -44,8 +50,9 @@ var addPinCmd = &cmds.Command{ }, Options: []cmds.Option{ cmds.BoolOption("recursive", "r", "Recursively pin the object linked to by the specified object(s).").Default(true), + cmds.BoolOption("progress", "Show progress"), }, - Type: PinOutput{}, + Type: AddPinOutput{}, Run: func(req cmds.Request, res cmds.Response) { n, err := req.InvocContext().GetNode() if err != nil { @@ -61,22 +68,88 @@ var addPinCmd = &cmds.Command{ res.SetError(err, cmds.ErrNormal) return } + showProgress, _, _ := req.Option("progress").Bool() - added, err := corerepo.Pin(n, req.Context(), req.Arguments(), recursive) - if err != nil { - res.SetError(err, cmds.ErrNormal) + if !showProgress { + added, err := corerepo.Pin(n, req.Context(), req.Arguments(), recursive) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + res.SetOutput(&AddPinOutput{Pins: added}) return } - res.SetOutput(&PinOutput{added}) + v := new(dag.ProgressTracker) + ctx := v.DeriveContext(req.Context()) + + ch := make(chan []*cid.Cid) + go func() { + defer close(ch) + added, err := corerepo.Pin(n, ctx, req.Arguments(), recursive) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + ch <- added + }() + out := make(chan interface{}) + res.SetOutput((<-chan interface{})(out)) + go func() { + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + defer close(out) + for { + select { + case val, ok := <-ch: + if !ok { + // error already set just return + return + } + if pv := v.Value(); pv != 0 { + out <- &AddPinOutput{Progress: v.Value()} + } + out <- &AddPinOutput{Pins: val} + return + case <-ticker.C: + out <- &AddPinOutput{Progress: v.Value()} + case <-ctx.Done(): + res.SetError(ctx.Err(), cmds.ErrNormal) + return + } + } + }() }, Marshalers: cmds.MarshalerMap{ cmds.Text: func(res cmds.Response) (io.Reader, error) { - added, ok := res.Output().(*PinOutput) - if !ok { + var added []*cid.Cid + + switch out := res.Output().(type) { + case *AddPinOutput: + added = out.Pins + case <-chan interface{}: + progressLine := false + for r0 := range out { + r := r0.(*AddPinOutput) + if r.Pins != nil { + added = r.Pins + } else { + if progressLine { + fmt.Fprintf(res.Stderr(), "\r") + } + fmt.Fprintf(res.Stderr(), "Fetched/Processed %d nodes", r.Progress) + progressLine = true + } + } + if progressLine { + fmt.Fprintf(res.Stderr(), "\n") + } + if res.Error() != nil { + return nil, res.Error() + } + default: return nil, u.ErrCast() } - var pintype string rec, found, _ := res.Request().Option("recursive").Bool() if rec || !found { @@ -86,7 +159,7 @@ var addPinCmd = &cmds.Command{ } buf := new(bytes.Buffer) - for _, k := range added.Pins { + for _, k := range added { fmt.Fprintf(buf, "pinned %s %s\n", k, pintype) } return buf, nil diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index f752ff50f..b81d2b60b 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -139,8 +139,21 @@ func (n *dagService) Remove(nd node.Node) error { } // FetchGraph fetches all nodes that are children of the given node -func FetchGraph(ctx context.Context, c *cid.Cid, serv DAGService) error { - return EnumerateChildrenAsync(ctx, serv, c, cid.NewSet().Visit) +func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error { + v, _ := ctx.Value("progress").(*ProgressTracker) + if v == nil { + return EnumerateChildrenAsync(ctx, serv, root, cid.NewSet().Visit) + } + set := cid.NewSet() + visit := func(c *cid.Cid) bool { + if set.Visit(c) { + v.Increment() + return true + } else { + return false + } + } + return EnumerateChildrenAsync(ctx, serv, root, visit) } // FindLinks searches this nodes links for the given key, @@ -389,6 +402,27 @@ func EnumerateChildren(ctx context.Context, ds LinkService, root *cid.Cid, visit return nil } +type ProgressTracker struct { + Total int + lk sync.Mutex +} + +func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context { + return context.WithValue(ctx, "progress", p) +} + +func (p *ProgressTracker) Increment() { + p.lk.Lock() + defer p.lk.Unlock() + p.Total++ +} + +func (p *ProgressTracker) Value() int { + p.lk.Lock() + defer p.lk.Unlock() + return p.Total +} + // FetchGraphConcurrency is total number of concurrent fetches that // 'fetchNodes' will start at a time var FetchGraphConcurrency = 8 diff --git a/merkledag/merkledag_test.go b/merkledag/merkledag_test.go index bd310469f..e7cfc8891 100644 --- a/merkledag/merkledag_test.go +++ b/merkledag/merkledag_test.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "io/ioutil" + "math/rand" "strings" "sync" "testing" @@ -547,3 +548,80 @@ func TestEnumerateAsyncFailsNotFound(t *testing.T) { t.Fatal("this should have failed") } } + +func TestProgressIndicator(t *testing.T) { + testProgressIndicator(t, 5) +} + +func TestProgressIndicatorNoChildren(t *testing.T) { + testProgressIndicator(t, 0) +} + +func testProgressIndicator(t *testing.T, depth int) { + ds := dstest.Mock() + + top, numChildren := mkDag(ds, depth) + + v := new(ProgressTracker) + ctx := v.DeriveContext(context.Background()) + + err := FetchGraph(ctx, top, ds) + if err != nil { + t.Fatal(err) + } + + if v.Value() != numChildren+1 { + t.Errorf("wrong number of children reported in progress indicator, expected %d, got %d", + numChildren+1, v.Value()) + } +} + +func mkDag(ds DAGService, depth int) (*cid.Cid, int) { + totalChildren := 0 + f := func() *ProtoNode { + p := new(ProtoNode) + buf := make([]byte, 16) + rand.Read(buf) + + p.SetData(buf) + _, err := ds.Add(p) + if err != nil { + panic(err) + } + return p + } + + for i := 0; i < depth; i++ { + thisf := f + f = func() *ProtoNode { + pn := mkNodeWithChildren(thisf, 10) + _, err := ds.Add(pn) + if err != nil { + panic(err) + } + totalChildren += 10 + return pn + } + } + + nd := f() + c, err := ds.Add(nd) + if err != nil { + panic(err) + } + + return c, totalChildren +} + +func mkNodeWithChildren(getChild func() *ProtoNode, width int) *ProtoNode { + cur := new(ProtoNode) + + for i := 0; i < width; i++ { + c := getChild() + if err := cur.AddNodeLinkClean(fmt.Sprint(i), c); err != nil { + panic(err) + } + } + + return cur +} diff --git a/test/sharness/t0085-pins.sh b/test/sharness/t0085-pins.sh index b07b9eef4..94889dc8a 100755 --- a/test/sharness/t0085-pins.sh +++ b/test/sharness/t0085-pins.sh @@ -10,6 +10,8 @@ test_description="Test ipfs pinning operations" test_pins() { + EXTRA_ARGS=$1 + test_expect_success "create some hashes" ' HASH_A=$(echo "A" | ipfs add -q --pin=false) && HASH_B=$(echo "B" | ipfs add -q --pin=false) && @@ -30,8 +32,8 @@ test_pins() { echo $HASH_G >> hashes ' - test_expect_success "pin those hashes via stdin" ' - cat hashes | ipfs pin add + test_expect_success "'ipfs pin add $EXTRA_ARGS' via stdin" ' + cat hashes | ipfs pin add $EXTRA_ARGS ' test_expect_success "unpin those hashes" ' @@ -39,15 +41,30 @@ test_pins() { ' } -test_pin_dag() { +RANDOM_HASH=Qme8uX5n9hn15pw9p6WcVKoziyyC9LXv4LEgvsmKMULjnV + +test_pins_error_reporting() { + EXTRA_ARGS=$1 + + test_expect_success "'ipfs pin add $EXTRA_ARGS' on non-existent hash should fail" ' + test_must_fail ipfs pin add $EXTRA_ARGS $RANDOM_HASH 2> err && + grep -q "not found" err + ' +} + +test_pin_dag_init() { EXTRA_ARGS=$1 test_expect_success "'ipfs add $EXTRA_ARGS --pin=false' 1MB file" ' random 1048576 56 > afile && HASH=`ipfs add $EXTRA_ARGS --pin=false -q afile` ' +} - test_expect_success "'ipfs pin add' file" ' +test_pin_dag() { + test_pin_dag_init $1 + + test_expect_success "'ipfs pin add --progress' file" ' ipfs pin add --recursive=true $HASH ' @@ -67,20 +84,45 @@ test_pin_dag() { ' } +test_pin_progress() { + test_pin_dag_init + + test_expect_success "'ipfs pin add --progress' file" ' + ipfs pin add --progress $HASH 2> err + ' + + test_expect_success "pin progress reported correctly" ' + cat err + grep -q " 5 nodes" err + ' +} + test_init_ipfs test_pins +test_pins --progress + +test_pins_error_reporting +test_pins_error_reporting --progress test_pin_dag test_pin_dag --raw-leaves +test_pin_progress + test_launch_ipfs_daemon --offline test_pins +test_pins --progress + +test_pins_error_reporting +test_pins_error_reporting --progress test_pin_dag test_pin_dag --raw-leaves +test_pin_progress + test_kill_ipfs_daemon test_done