From 064c194b4f30a9834c1a3fca49495a514b68c58b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 9 Jan 2018 01:10:03 +0100 Subject: [PATCH 1/4] coreapi: pin draft MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/interface/interface.go | 44 ++++++++++++++++++++ core/coreapi/interface/options/pin.go | 58 +++++++++++++++++++++++++++ core/coreapi/pin.go | 34 ++++++++++++++++ 3 files changed, 136 insertions(+) create mode 100644 core/coreapi/interface/options/pin.go create mode 100644 core/coreapi/pin.go diff --git a/core/coreapi/interface/interface.go b/core/coreapi/interface/interface.go index 95351e7d0..40fa4131e 100644 --- a/core/coreapi/interface/interface.go +++ b/core/coreapi/interface/interface.go @@ -58,6 +58,15 @@ type BlockStat interface { Path() Path } +// Pin holds information about pinned resource +type Pin interface { + // Path to the pinned object + Path() Path + + // Type of the pin + Type() string +} + // CoreAPI defines an unified interface to IPFS for Go programs. type CoreAPI interface { // Unixfs returns an implementation of Unixfs API. @@ -322,5 +331,40 @@ type ObjectStat struct { CumulativeSize int } +// PinAPI specifies the interface to pining +type PinAPI interface { + // Add creates new pin, be default recursive - pinning the whole referenced + // tree + Add(context.Context, Path, ...options.PinAddOption) error + + // WithRecursive is an option for Add which specifies whether to pin an entire + // object tree or just one object. Default: true + WithRecursive(bool) options.PinAddOption + + // Ls returns list of pinned objects on this node + Ls(context.Context) ([]Pin, error) + + // WithType is an option for Ls which allows to specify which pin types should + // be returned + // + // Supported values: + // * "direct" - directly pinned objects + // * "recursive" - roots of recursive pins + // * "indirect" - indirectly pinned objects (referenced by recursively pinned + // objects) + // * "all" - all pinned objects (default) + WithType(string) options.PinLsOption + + // Rm removes pin for object specified by the path + Rm(context.Context, Path) error + + // Update changes one pin to another, skipping checks for matching paths in + // the old tree + Update(ctx context.Context, from Path, to Path) error + + // Verify verifies the integrity of pinned objects + Verify(context.Context) error +} + var ErrIsDir = errors.New("object is a directory") var ErrOffline = errors.New("can't resolve, ipfs node is offline") diff --git a/core/coreapi/interface/options/pin.go b/core/coreapi/interface/options/pin.go new file mode 100644 index 000000000..4ad16d555 --- /dev/null +++ b/core/coreapi/interface/options/pin.go @@ -0,0 +1,58 @@ +package options + +type PinAddSettings struct { + Recursive bool +} + +type PinLsSettings struct { + Type string +} + +type PinAddOption func(*PinAddSettings) error +type PinLsOption func(settings *PinLsSettings) error + +func PinAddOptions(opts ...PinAddOption) (*PinAddSettings, error) { + options := &PinAddSettings{ + Recursive: true, + } + + for _, opt := range opts { + err := opt(options) + if err != nil { + return nil, err + } + } + + return options, nil +} + +func PinLsOptions(opts ...PinLsOption) (*PinLsSettings, error) { + options := &PinLsSettings{ + Type: "all", + } + + for _, opt := range opts { + err := opt(options) + if err != nil { + return nil, err + } + } + + return options, nil +} + +type PinOptions struct{} + +func (api *PinOptions) WithRecursive(recucsive bool) PinAddOption { + return func(settings *PinAddSettings) error { + settings.Recursive = recucsive + return nil + } +} + +func (api *PinOptions) WithType(t string) PinLsOption { + return func(settings *PinLsSettings) error { + settings.Type = t + return nil + } +} diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go new file mode 100644 index 000000000..ef7f98c24 --- /dev/null +++ b/core/coreapi/pin.go @@ -0,0 +1,34 @@ +package coreapi + +import ( + "context" + + coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" + caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options" + "github.com/pkg/errors" +) + +type PinAPI struct { + *CoreAPI + *caopts.PinOptions +} + +func (api *PinAPI) Add(context.Context, coreiface.Path, ...caopts.PinAddOption) error { + return errors.New("TODO") +} + +func (api *PinAPI) Ls(context.Context) ([]coreiface.Pin, error) { + return nil, errors.New("TODO") +} + +func (api *PinAPI) Rm(context.Context, coreiface.Path) error { + return errors.New("TODO") +} + +func (api *PinAPI) Update(ctx context.Context, from coreiface.Path, to coreiface.Path) error { + return errors.New("TODO") +} + +func (api *PinAPI) Verify(context.Context) error { + return errors.New("TODO") +} From c8cfed5c84f1d4c991434184cc9e287d9bdebeff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 10 Jan 2018 18:41:06 +0100 Subject: [PATCH 2/4] coreapi: implement pin api MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/coreapi.go | 4 + core/coreapi/interface/interface.go | 25 +++- core/coreapi/interface/options/pin.go | 27 ++++ core/coreapi/pin.go | 179 ++++++++++++++++++++++++-- 4 files changed, 221 insertions(+), 14 deletions(-) diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index cc0ab39ce..bccb330cd 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -52,6 +52,10 @@ func (api *CoreAPI) Object() coreiface.ObjectAPI { return &ObjectAPI{api, nil} } +func (api *CoreAPI) Pin() coreiface.PinAPI { + return &PinAPI{api, nil} +} + // ResolveNode resolves the path `p` using Unixfx resolver, gets and returns the // resolved Node. func (api *CoreAPI) ResolveNode(ctx context.Context, p coreiface.Path) (coreiface.Node, error) { diff --git a/core/coreapi/interface/interface.go b/core/coreapi/interface/interface.go index 40fa4131e..75a168bf3 100644 --- a/core/coreapi/interface/interface.go +++ b/core/coreapi/interface/interface.go @@ -67,6 +67,24 @@ type Pin interface { Type() string } +// PinStatus holds information about pin health +type PinStatus interface { + // Ok indicates whether the pin has been verified to be correct + Ok() bool + + // BadNodes returns any bad (usually missing) nodes from the pin + BadNodes() []BadPinNode +} + +// BadPinNode is a node that has been marked as bad by Pin.Verify +type BadPinNode interface { + // Path is the path of the node + Path() Path + + // Err is the reason why the node has been marked as bad + Err() error +} + // CoreAPI defines an unified interface to IPFS for Go programs. type CoreAPI interface { // Unixfs returns an implementation of Unixfs API. @@ -83,6 +101,7 @@ type CoreAPI interface { // Key returns an implementation of Key API. Key() KeyAPI + Pin() PinAPI // ObjectAPI returns an implementation of Object API Object() ObjectAPI @@ -342,7 +361,7 @@ type PinAPI interface { WithRecursive(bool) options.PinAddOption // Ls returns list of pinned objects on this node - Ls(context.Context) ([]Pin, error) + Ls(context.Context, ...options.PinLsOption) ([]Pin, error) // WithType is an option for Ls which allows to specify which pin types should // be returned @@ -360,10 +379,10 @@ type PinAPI interface { // Update changes one pin to another, skipping checks for matching paths in // the old tree - Update(ctx context.Context, from Path, to Path) error + Update(ctx context.Context, from Path, to Path, opts ...options.PinUpdateOption) error // Verify verifies the integrity of pinned objects - Verify(context.Context) error + Verify(context.Context) (<-chan PinStatus, error) } var ErrIsDir = errors.New("object is a directory") diff --git a/core/coreapi/interface/options/pin.go b/core/coreapi/interface/options/pin.go index 4ad16d555..f97f7b16e 100644 --- a/core/coreapi/interface/options/pin.go +++ b/core/coreapi/interface/options/pin.go @@ -8,8 +8,13 @@ type PinLsSettings struct { Type string } +type PinUpdateSettings struct { + Unpin bool +} + type PinAddOption func(*PinAddSettings) error type PinLsOption func(settings *PinLsSettings) error +type PinUpdateOption func(*PinUpdateSettings) error func PinAddOptions(opts ...PinAddOption) (*PinAddSettings, error) { options := &PinAddSettings{ @@ -41,6 +46,21 @@ func PinLsOptions(opts ...PinLsOption) (*PinLsSettings, error) { return options, nil } +func PinUpdateOptions(opts ...PinUpdateOption) (*PinUpdateSettings, error) { + options := &PinUpdateSettings{ + Unpin: true, + } + + for _, opt := range opts { + err := opt(options) + if err != nil { + return nil, err + } + } + + return options, nil +} + type PinOptions struct{} func (api *PinOptions) WithRecursive(recucsive bool) PinAddOption { @@ -56,3 +76,10 @@ func (api *PinOptions) WithType(t string) PinLsOption { return nil } } + +func (api *PinOptions) WithUnpin(unpin bool) PinUpdateOption { + return func(settings *PinUpdateSettings) error { + settings.Unpin = unpin + return nil + } +} diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index ef7f98c24..bcfba1ff8 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -2,10 +2,15 @@ package coreapi import ( "context" + "fmt" coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options" - "github.com/pkg/errors" + corerepo "github.com/ipfs/go-ipfs/core/corerepo" + merkledag "github.com/ipfs/go-ipfs/merkledag" + pin "github.com/ipfs/go-ipfs/pin" + + cid "gx/ipfs/QmeSrf6pzut73u6zLQkRFQ3ygt3k6XFT2kjdYP8Tnkwwyg/go-cid" ) type PinAPI struct { @@ -13,22 +18,174 @@ type PinAPI struct { *caopts.PinOptions } -func (api *PinAPI) Add(context.Context, coreiface.Path, ...caopts.PinAddOption) error { - return errors.New("TODO") +func (api *PinAPI) Add(ctx context.Context, p coreiface.Path, opts ...caopts.PinAddOption) error { + settings, err := caopts.PinAddOptions(opts...) + if err != nil { + return err + } + + defer api.node.Blockstore.PinLock().Unlock() + + _, err = corerepo.Pin(api.node, ctx, []string{p.String()}, settings.Recursive) + if err != nil { + return err + } + + return nil } -func (api *PinAPI) Ls(context.Context) ([]coreiface.Pin, error) { - return nil, errors.New("TODO") +func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) ([]coreiface.Pin, error) { + settings, err := caopts.PinLsOptions(opts...) + if err != nil { + return nil, err + } + + switch settings.Type { + case "all", "direct", "indirect", "recursive": + default: + return nil, fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.Type) + } + + return pinLsAll(settings.Type, ctx, api.node.Pinning, api.node.DAG) } -func (api *PinAPI) Rm(context.Context, coreiface.Path) error { - return errors.New("TODO") +func (api *PinAPI) Rm(ctx context.Context, p coreiface.Path) error { + _, err := corerepo.Unpin(api.node, ctx, []string{p.String()}, true) + if err != nil { + return err + } + + return nil } -func (api *PinAPI) Update(ctx context.Context, from coreiface.Path, to coreiface.Path) error { - return errors.New("TODO") +func (api *PinAPI) Update(ctx context.Context, from coreiface.Path, to coreiface.Path, opts ...caopts.PinUpdateOption) error { + settings, err := caopts.PinUpdateOptions(opts...) + if err != nil { + return err + } + + return api.node.Pinning.Update(ctx, from.Cid(), to.Cid(), settings.Unpin) } -func (api *PinAPI) Verify(context.Context) error { - return errors.New("TODO") +type pinStatus struct { + cid *cid.Cid + ok bool + badNodes []coreiface.BadPinNode +} + +// BadNode is used in PinVerifyRes +type badNode struct { + cid *cid.Cid + err error +} + +func (s *pinStatus) Ok() bool { + return s.Ok() +} + +func (s *pinStatus) BadNodes() []coreiface.BadPinNode { + return s.badNodes +} + +func (n *badNode) Path() coreiface.Path { + return ParseCid(n.cid) +} + +func (n *badNode) Err() error { + return n.err +} + +func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, error) { + visited := make(map[string]*pinStatus) + getLinks := api.node.DAG.GetOfflineLinkService().GetLinks + recPins := api.node.Pinning.RecursiveKeys() + + var checkPin func(root *cid.Cid) *pinStatus + checkPin = func(root *cid.Cid) *pinStatus { + key := root.String() + if status, ok := visited[key]; ok { + return status + } + + links, err := getLinks(ctx, root) + if err != nil { + status := &pinStatus{ok: false, cid: root} + status.badNodes = []coreiface.BadPinNode{&badNode{cid: root, err: err}} + visited[key] = status + return status + } + + status := &pinStatus{ok: true, cid: root} + for _, lnk := range links { + res := checkPin(lnk.Cid) + if !res.ok { + status.ok = false + status.badNodes = append(status.badNodes, res.badNodes...) + } + } + + visited[key] = status + return status + } + + out := make(chan coreiface.PinStatus) + go func() { + defer close(out) + for _, c := range recPins { + out <- checkPin(c) + } + }() + + return out, nil +} + +type pinInfo struct { + pinType string + object *cid.Cid +} + +func (p *pinInfo) Path() coreiface.Path { + return ParseCid(p.object) +} + +func (p *pinInfo) Type() string { + return p.pinType +} + +func pinLsAll(typeStr string, ctx context.Context, pinning pin.Pinner, dag merkledag.DAGService) ([]coreiface.Pin, error) { + + keys := make(map[string]*pinInfo) + + AddToResultKeys := func(keyList []*cid.Cid, typeStr string) { + for _, c := range keyList { + keys[c.String()] = &pinInfo{ + pinType: typeStr, + object: c, + } + } + } + + if typeStr == "direct" || typeStr == "all" { + AddToResultKeys(pinning.DirectKeys(), "direct") + } + if typeStr == "indirect" || typeStr == "all" { + set := cid.NewSet() + for _, k := range pinning.RecursiveKeys() { + err := merkledag.EnumerateChildren(ctx, dag.GetLinks, k, set.Visit) + if err != nil { + return nil, err + } + } + AddToResultKeys(set.Keys(), "indirect") + } + if typeStr == "recursive" || typeStr == "all" { + AddToResultKeys(pinning.RecursiveKeys(), "recursive") + } + + out := make([]coreiface.Pin, 0, len(keys)) + for _, v := range keys { + out = append(out, v) + } + + return out, nil } From 5ea9a3cde2cc7fe54bb6fa13d385ef6ed973c32c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 11 Jan 2018 23:34:30 +0100 Subject: [PATCH 3/4] coreapi: pin tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/pin.go | 2 +- core/coreapi/pin_test.go | 209 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 210 insertions(+), 1 deletion(-) create mode 100644 core/coreapi/pin_test.go diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index bcfba1ff8..753743711 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -80,7 +80,7 @@ type badNode struct { } func (s *pinStatus) Ok() bool { - return s.Ok() + return s.ok } func (s *pinStatus) BadNodes() []coreiface.BadPinNode { diff --git a/core/coreapi/pin_test.go b/core/coreapi/pin_test.go new file mode 100644 index 000000000..6ab0e5350 --- /dev/null +++ b/core/coreapi/pin_test.go @@ -0,0 +1,209 @@ +package coreapi_test + +import ( + "context" + "strings" + "testing" +) + +func TestPinAdd(t *testing.T) { + ctx := context.Background() + _, api, err := makeAPI(ctx) + if err != nil { + t.Error(err) + } + + p, err := api.Unixfs().Add(ctx, strings.NewReader("foo")) + if err != nil { + t.Error(err) + } + + err = api.Pin().Add(ctx, p) + if err != nil { + t.Error(err) + } +} + +func TestPinSimple(t *testing.T) { + ctx := context.Background() + _, api, err := makeAPI(ctx) + if err != nil { + t.Error(err) + } + + p, err := api.Unixfs().Add(ctx, strings.NewReader("foo")) + if err != nil { + t.Error(err) + } + + err = api.Pin().Add(ctx, p) + if err != nil { + t.Error(err) + } + + list, err := api.Pin().Ls(ctx) + if err != nil { + t.Fatal(err) + } + + if len(list) != 1 { + t.Errorf("unexpected pin list len: %d", len(list)) + } + + if list[0].Path().String() != p.String() { + t.Error("paths don't match") + } + + if list[0].Type() != "recursive" { + t.Error("unexpected pin type") + } + + err = api.Pin().Rm(ctx, p) + if err != nil { + t.Fatal(err) + } + + list, err = api.Pin().Ls(ctx) + if err != nil { + t.Fatal(err) + } + + if len(list) != 0 { + t.Errorf("unexpected pin list len: %d", len(list)) + } +} + +func TestPinRecursive(t *testing.T) { + ctx := context.Background() + nd, api, err := makeAPI(ctx) + if err != nil { + t.Error(err) + } + + p0, err := api.Unixfs().Add(ctx, strings.NewReader("foo")) + if err != nil { + t.Error(err) + } + + p1, err := api.Unixfs().Add(ctx, strings.NewReader("bar")) + if err != nil { + t.Error(err) + } + + p2, err := api.Dag().Put(ctx, strings.NewReader(`{"lnk": {"/": "`+p0.Cid().String()+`"}}`)) + if err != nil { + t.Error(err) + } + + p3, err := api.Dag().Put(ctx, strings.NewReader(`{"lnk": {"/": "`+p1.Cid().String()+`"}}`)) + if err != nil { + t.Error(err) + } + + err = api.Pin().Add(ctx, p2) + if err != nil { + t.Error(err) + } + + err = api.Pin().Add(ctx, p3, api.Pin().WithRecursive(false)) + if err != nil { + t.Error(err) + } + + list, err := api.Pin().Ls(ctx) + if err != nil { + t.Fatal(err) + } + + if len(list) != 3 { + t.Errorf("unexpected pin list len: %d", len(list)) + } + + list, err = api.Pin().Ls(ctx, api.Pin().WithType("direct")) + if err != nil { + t.Fatal(err) + } + + if len(list) != 1 { + t.Errorf("unexpected pin list len: %d", len(list)) + } + + if list[0].Path().String() != p3.String() { + t.Error("unexpected path") + } + + list, err = api.Pin().Ls(ctx, api.Pin().WithType("recursive")) + if err != nil { + t.Fatal(err) + } + + if len(list) != 1 { + t.Errorf("unexpected pin list len: %d", len(list)) + } + + if list[0].Path().String() != p2.String() { + t.Error("unexpected path") + } + + list, err = api.Pin().Ls(ctx, api.Pin().WithType("indirect")) + if err != nil { + t.Fatal(err) + } + + if len(list) != 1 { + t.Errorf("unexpected pin list len: %d", len(list)) + } + + if list[0].Path().String() != p0.String() { + t.Error("unexpected path") + } + + res, err := api.Pin().Verify(ctx) + if err != nil { + t.Fatal(err) + } + n := 0 + for r := range res { + if !r.Ok() { + t.Error("expected pin to be ok") + } + n++ + } + + if n != 1 { + t.Errorf("unexpected verify result count: %d", n) + } + + err = nd.Blockstore.DeleteBlock(p0.Cid()) + if err != nil { + t.Fatal(err) + } + + res, err = api.Pin().Verify(ctx) + if err != nil { + t.Fatal(err) + } + n = 0 + for r := range res { + if r.Ok() { + t.Error("expected pin to not be ok") + } + + if len(r.BadNodes()) != 1 { + t.Fatalf("unexpected badNodes len") + } + + if r.BadNodes()[0].Path().String() != p0.String() { + t.Error("unexpected badNode path") + } + + if r.BadNodes()[0].Err().Error() != "merkledag: not found" { + t.Errorf("unexpected badNode error: %s", r.BadNodes()[0].Err().Error()) + } + n++ + } + + if n != 1 { + t.Errorf("unexpected verify result count: %d", n) + } +} From 242c98f44f4f8fe80cae1be6597648c478a46ffc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 31 Jan 2018 00:02:52 +0100 Subject: [PATCH 4/4] coreapi: update after rebase MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/pin.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index 753743711..61768999e 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -4,13 +4,16 @@ import ( "context" "fmt" + bserv "github.com/ipfs/go-ipfs/blockservice" coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options" corerepo "github.com/ipfs/go-ipfs/core/corerepo" + offline "github.com/ipfs/go-ipfs/exchange/offline" merkledag "github.com/ipfs/go-ipfs/merkledag" pin "github.com/ipfs/go-ipfs/pin" - cid "gx/ipfs/QmeSrf6pzut73u6zLQkRFQ3ygt3k6XFT2kjdYP8Tnkwwyg/go-cid" + cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid" + ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format" ) type PinAPI struct { @@ -97,7 +100,9 @@ func (n *badNode) Err() error { func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, error) { visited := make(map[string]*pinStatus) - getLinks := api.node.DAG.GetOfflineLinkService().GetLinks + bs := api.node.Blocks.Blockstore() + DAG := merkledag.NewDAGService(bserv.New(bs, offline.Exchange(bs))) + getLinks := merkledag.GetLinksWithDAG(DAG) recPins := api.node.Pinning.RecursiveKeys() var checkPin func(root *cid.Cid) *pinStatus @@ -152,7 +157,7 @@ func (p *pinInfo) Type() string { return p.pinType } -func pinLsAll(typeStr string, ctx context.Context, pinning pin.Pinner, dag merkledag.DAGService) ([]coreiface.Pin, error) { +func pinLsAll(typeStr string, ctx context.Context, pinning pin.Pinner, dag ipld.DAGService) ([]coreiface.Pin, error) { keys := make(map[string]*pinInfo) @@ -171,7 +176,7 @@ func pinLsAll(typeStr string, ctx context.Context, pinning pin.Pinner, dag merkl if typeStr == "indirect" || typeStr == "all" { set := cid.NewSet() for _, k := range pinning.RecursiveKeys() { - err := merkledag.EnumerateChildren(ctx, dag.GetLinks, k, set.Visit) + err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), k, set.Visit) if err != nil { return nil, err }