mirror of
https://github.com/ipfs/kubo.git
synced 2025-05-17 15:06:47 +08:00
refactor(cmds): do not return errors embedded in result type (#10527)
incl. https://github.com/ipfs/boxo/pull/738
This commit is contained in:
@ -62,10 +62,12 @@ type pinLsObject struct {
|
||||
Type string
|
||||
}
|
||||
|
||||
func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan iface.Pin, error) {
|
||||
func (api *PinAPI) Ls(ctx context.Context, pins chan<- iface.Pin, opts ...caopts.PinLsOption) error {
|
||||
defer close(pins)
|
||||
|
||||
options, err := caopts.PinLsOptions(opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
res, err := api.core().Request("pin/ls").
|
||||
@ -73,48 +75,32 @@ func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan i
|
||||
Option("stream", true).
|
||||
Send(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
defer res.Output.Close()
|
||||
|
||||
pins := make(chan iface.Pin)
|
||||
go func(ch chan<- iface.Pin) {
|
||||
defer res.Output.Close()
|
||||
defer close(ch)
|
||||
|
||||
dec := json.NewDecoder(res.Output)
|
||||
var out pinLsObject
|
||||
for {
|
||||
switch err := dec.Decode(&out); err {
|
||||
case nil:
|
||||
case io.EOF:
|
||||
return
|
||||
default:
|
||||
select {
|
||||
case ch <- pin{err: err}:
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
c, err := cid.Parse(out.Cid)
|
||||
if err != nil {
|
||||
select {
|
||||
case ch <- pin{err: err}:
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case ch <- pin{typ: out.Type, name: out.Name, path: path.FromCid(c)}:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
dec := json.NewDecoder(res.Output)
|
||||
var out pinLsObject
|
||||
for {
|
||||
err := dec.Decode(&out)
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}(pins)
|
||||
return pins, nil
|
||||
|
||||
c, err := cid.Parse(out.Cid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
select {
|
||||
case pins <- pin{typ: out.Type, name: out.Name, path: path.FromCid(c)}:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// IsPinned returns whether or not the given cid is pinned
|
||||
|
@ -144,10 +144,12 @@ type lsOutput struct {
|
||||
Objects []lsObject
|
||||
}
|
||||
|
||||
func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.UnixfsLsOption) (<-chan iface.DirEntry, error) {
|
||||
func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, out chan<- iface.DirEntry, opts ...caopts.UnixfsLsOption) error {
|
||||
defer close(out)
|
||||
|
||||
options, err := caopts.UnixfsLsOptions(opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
resp, err := api.core().Request("ls", p.String()).
|
||||
@ -156,86 +158,64 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.Unixfs
|
||||
Option("stream", true).
|
||||
Send(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
if resp.Error != nil {
|
||||
return nil, resp.Error
|
||||
return err
|
||||
}
|
||||
defer resp.Close()
|
||||
|
||||
dec := json.NewDecoder(resp.Output)
|
||||
out := make(chan iface.DirEntry)
|
||||
|
||||
go func() {
|
||||
defer resp.Close()
|
||||
defer close(out)
|
||||
|
||||
for {
|
||||
var link lsOutput
|
||||
if err := dec.Decode(&link); err != nil {
|
||||
if err == io.EOF {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case out <- iface.DirEntry{Err: err}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if len(link.Objects) != 1 {
|
||||
select {
|
||||
case out <- iface.DirEntry{Err: errors.New("unexpected Objects len")}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if len(link.Objects[0].Links) != 1 {
|
||||
select {
|
||||
case out <- iface.DirEntry{Err: errors.New("unexpected Links len")}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
l0 := link.Objects[0].Links[0]
|
||||
|
||||
c, err := cid.Decode(l0.Hash)
|
||||
if err != nil {
|
||||
select {
|
||||
case out <- iface.DirEntry{Err: err}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
var ftype iface.FileType
|
||||
switch l0.Type {
|
||||
case unixfs.TRaw, unixfs.TFile:
|
||||
ftype = iface.TFile
|
||||
case unixfs.THAMTShard, unixfs.TDirectory, unixfs.TMetadata:
|
||||
ftype = iface.TDirectory
|
||||
case unixfs.TSymlink:
|
||||
ftype = iface.TSymlink
|
||||
}
|
||||
|
||||
select {
|
||||
case out <- iface.DirEntry{
|
||||
Name: l0.Name,
|
||||
Cid: c,
|
||||
Size: l0.Size,
|
||||
Type: ftype,
|
||||
Target: l0.Target,
|
||||
|
||||
Mode: l0.Mode,
|
||||
ModTime: l0.ModTime,
|
||||
}:
|
||||
case <-ctx.Done():
|
||||
for {
|
||||
var link lsOutput
|
||||
if err = dec.Decode(&link); err != nil {
|
||||
if err != io.EOF {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}()
|
||||
|
||||
return out, nil
|
||||
if len(link.Objects) != 1 {
|
||||
return errors.New("unexpected Objects len")
|
||||
}
|
||||
|
||||
if len(link.Objects[0].Links) != 1 {
|
||||
return errors.New("unexpected Links len")
|
||||
}
|
||||
|
||||
l0 := link.Objects[0].Links[0]
|
||||
|
||||
c, err := cid.Decode(l0.Hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var ftype iface.FileType
|
||||
switch l0.Type {
|
||||
case unixfs.TRaw, unixfs.TFile:
|
||||
ftype = iface.TFile
|
||||
case unixfs.THAMTShard, unixfs.TDirectory, unixfs.TMetadata:
|
||||
ftype = iface.TDirectory
|
||||
case unixfs.TSymlink:
|
||||
ftype = iface.TSymlink
|
||||
}
|
||||
|
||||
select {
|
||||
case out <- iface.DirEntry{
|
||||
Name: l0.Name,
|
||||
Cid: c,
|
||||
Size: l0.Size,
|
||||
Type: ftype,
|
||||
Target: l0.Target,
|
||||
|
||||
Mode: l0.Mode,
|
||||
ModTime: l0.ModTime,
|
||||
}:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (api *UnixfsAPI) core() *HttpApi {
|
||||
|
@ -183,7 +183,7 @@ func pinMFS(ctx context.Context, node pinMFSNode, cid cid.Cid, svcName string, s
|
||||
|
||||
// check if MFS pin exists (across all possible states) and inspect its CID
|
||||
pinStatuses := []pinclient.Status{pinclient.StatusQueued, pinclient.StatusPinning, pinclient.StatusPinned, pinclient.StatusFailed}
|
||||
lsPinCh, lsErrCh := c.Ls(ctx, pinclient.PinOpts.FilterName(pinName), pinclient.PinOpts.FilterStatus(pinStatuses...))
|
||||
lsPinCh, lsErrCh := c.GoLs(ctx, pinclient.PinOpts.FilterName(pinName), pinclient.PinOpts.FilterStatus(pinStatuses...))
|
||||
existingRequestID := "" // is there any pre-existing MFS pin with pinName (for any CID)?
|
||||
pinning := false // is CID for current MFS already being pinned?
|
||||
pinTime := time.Now().UTC()
|
||||
|
@ -1,6 +1,7 @@
|
||||
package commands
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
@ -133,23 +134,24 @@ The JSON output contains type information.
|
||||
}
|
||||
}
|
||||
|
||||
lsCtx, cancel := context.WithCancel(req.Context)
|
||||
defer cancel()
|
||||
|
||||
for i, fpath := range paths {
|
||||
pth, err := cmdutils.PathOrCidPath(fpath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
results, err := api.Unixfs().Ls(req.Context, pth,
|
||||
options.Unixfs.ResolveChildren(resolveSize || resolveType))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
results := make(chan iface.DirEntry)
|
||||
lsErr := make(chan error, 1)
|
||||
go func() {
|
||||
lsErr <- api.Unixfs().Ls(lsCtx, pth, results,
|
||||
options.Unixfs.ResolveChildren(resolveSize || resolveType))
|
||||
}()
|
||||
|
||||
processLink, dirDone = processDir()
|
||||
for link := range results {
|
||||
if link.Err != nil {
|
||||
return link.Err
|
||||
}
|
||||
var ftype unixfs_pb.Data_DataType
|
||||
switch link.Type {
|
||||
case iface.TFile:
|
||||
@ -170,10 +172,13 @@ The JSON output contains type information.
|
||||
Mode: link.Mode,
|
||||
ModTime: link.ModTime,
|
||||
}
|
||||
if err := processLink(paths[i], lsLink); err != nil {
|
||||
if err = processLink(paths[i], lsLink); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err = <-lsErr; err != nil {
|
||||
return err
|
||||
}
|
||||
dirDone(i)
|
||||
}
|
||||
return done()
|
||||
|
@ -557,15 +557,16 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api
|
||||
panic("unhandled pin type")
|
||||
}
|
||||
|
||||
pins, err := api.Pin().Ls(req.Context, opt, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pins := make(chan coreiface.Pin)
|
||||
lsErr := make(chan error, 1)
|
||||
lsCtx, cancel := context.WithCancel(req.Context)
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
lsErr <- api.Pin().Ls(lsCtx, pins, opt, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name))
|
||||
}()
|
||||
|
||||
for p := range pins {
|
||||
if err := p.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
err = emit(PinLsOutputWrapper{
|
||||
PinLsObject: PinLsObject{
|
||||
Type: p.Type(),
|
||||
@ -577,8 +578,7 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return <-lsErr
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -285,26 +285,26 @@ Pass '--status=queued,pinning,pinned,failed' to list pins in all states.
|
||||
cmds.DelimitedStringsOption(",", pinStatusOptionName, "Return pins with the specified statuses (queued,pinning,pinned,failed).").WithDefault([]string{"pinned"}),
|
||||
},
|
||||
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
|
||||
ctx, cancel := context.WithCancel(req.Context)
|
||||
defer cancel()
|
||||
|
||||
c, err := getRemotePinServiceFromRequest(req, env)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
psCh, errCh, err := lsRemote(ctx, req, c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ctx, cancel := context.WithCancel(req.Context)
|
||||
defer cancel()
|
||||
|
||||
psCh := make(chan pinclient.PinStatusGetter)
|
||||
lsErr := make(chan error, 1)
|
||||
go func() {
|
||||
lsErr <- lsRemote(ctx, req, c, psCh)
|
||||
}()
|
||||
for ps := range psCh {
|
||||
if err := res.Emit(toRemotePinOutput(ps)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return <-errCh
|
||||
return <-lsErr
|
||||
},
|
||||
Type: RemotePinOutput{},
|
||||
Encoders: cmds.EncoderMap{
|
||||
@ -317,7 +317,7 @@ Pass '--status=queued,pinning,pinned,failed' to list pins in all states.
|
||||
}
|
||||
|
||||
// Executes GET /pins/?query-with-filters
|
||||
func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (chan pinclient.PinStatusGetter, chan error, error) {
|
||||
func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client, out chan<- pinclient.PinStatusGetter) error {
|
||||
opts := []pinclient.LsOption{}
|
||||
if name, nameFound := req.Options[pinNameOptionName]; nameFound {
|
||||
nameStr := name.(string)
|
||||
@ -330,7 +330,8 @@ func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (chan
|
||||
for _, rawCID := range cidsRawArr {
|
||||
parsedCID, err := cid.Decode(rawCID)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("CID %q cannot be parsed: %v", rawCID, err)
|
||||
close(out)
|
||||
return fmt.Errorf("CID %q cannot be parsed: %v", rawCID, err)
|
||||
}
|
||||
parsedCIDs = append(parsedCIDs, parsedCID)
|
||||
}
|
||||
@ -342,16 +343,15 @@ func lsRemote(ctx context.Context, req *cmds.Request, c *pinclient.Client) (chan
|
||||
for _, rawStatus := range statusRawArr {
|
||||
s := pinclient.Status(rawStatus)
|
||||
if s.String() == string(pinclient.StatusUnknown) {
|
||||
return nil, nil, fmt.Errorf("status %q is not valid", rawStatus)
|
||||
close(out)
|
||||
return fmt.Errorf("status %q is not valid", rawStatus)
|
||||
}
|
||||
parsedStatuses = append(parsedStatuses, s)
|
||||
}
|
||||
opts = append(opts, pinclient.PinOpts.FilterStatus(parsedStatuses...))
|
||||
}
|
||||
|
||||
psCh, errCh := c.Ls(ctx, opts...)
|
||||
|
||||
return psCh, errCh, nil
|
||||
return c.Ls(ctx, out, opts...)
|
||||
}
|
||||
|
||||
var rmRemotePinCmd = &cmds.Command{
|
||||
@ -393,36 +393,37 @@ To list and then remove all pending pin requests, pass an explicit status list:
|
||||
cmds.BoolOption(pinForceOptionName, "Allow removal of multiple pins matching the query without additional confirmation.").WithDefault(false),
|
||||
},
|
||||
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
|
||||
ctx, cancel := context.WithCancel(req.Context)
|
||||
defer cancel()
|
||||
|
||||
c, err := getRemotePinServiceFromRequest(req, env)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rmIDs := []string{}
|
||||
if len(req.Arguments) == 0 {
|
||||
psCh, errCh, err := lsRemote(ctx, req, c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for ps := range psCh {
|
||||
rmIDs = append(rmIDs, ps.GetRequestId())
|
||||
}
|
||||
if err = <-errCh; err != nil {
|
||||
return fmt.Errorf("error while listing remote pins: %v", err)
|
||||
}
|
||||
|
||||
if len(rmIDs) > 1 && !req.Options[pinForceOptionName].(bool) {
|
||||
return fmt.Errorf("multiple remote pins are matching this query, add --force to confirm the bulk removal")
|
||||
}
|
||||
} else {
|
||||
if len(req.Arguments) != 0 {
|
||||
return fmt.Errorf("unexpected argument %q", req.Arguments[0])
|
||||
}
|
||||
|
||||
psCh := make(chan pinclient.PinStatusGetter)
|
||||
errCh := make(chan error, 1)
|
||||
ctx, cancel := context.WithCancel(req.Context)
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
errCh <- lsRemote(ctx, req, c, psCh)
|
||||
}()
|
||||
for ps := range psCh {
|
||||
rmIDs = append(rmIDs, ps.GetRequestId())
|
||||
}
|
||||
if err = <-errCh; err != nil {
|
||||
return fmt.Errorf("error while listing remote pins: %v", err)
|
||||
}
|
||||
|
||||
if len(rmIDs) > 1 && !req.Options[pinForceOptionName].(bool) {
|
||||
return fmt.Errorf("multiple remote pins are matching this query, add --force to confirm the bulk removal")
|
||||
}
|
||||
|
||||
for _, rmID := range rmIDs {
|
||||
if err := c.DeleteByID(ctx, rmID); err != nil {
|
||||
if err = c.DeleteByID(ctx, rmID); err != nil {
|
||||
return fmt.Errorf("removing pin identified by requestid=%q failed: %v", rmID, err)
|
||||
}
|
||||
}
|
||||
|
@ -51,13 +51,14 @@ func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOp
|
||||
return api.pinning.Flush(ctx)
|
||||
}
|
||||
|
||||
func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan coreiface.Pin, error) {
|
||||
func (api *PinAPI) Ls(ctx context.Context, pins chan<- coreiface.Pin, opts ...caopts.PinLsOption) error {
|
||||
ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Ls")
|
||||
defer span.End()
|
||||
|
||||
settings, err := caopts.PinLsOptions(opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
close(pins)
|
||||
return err
|
||||
}
|
||||
|
||||
span.SetAttributes(attribute.String("type", settings.Type))
|
||||
@ -65,10 +66,11 @@ func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan c
|
||||
switch settings.Type {
|
||||
case "all", "direct", "indirect", "recursive":
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.Type)
|
||||
close(pins)
|
||||
return fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.Type)
|
||||
}
|
||||
|
||||
return api.pinLsAll(ctx, settings.Type, settings.Detailed, settings.Name), nil
|
||||
return api.pinLsAll(ctx, settings.Type, settings.Detailed, settings.Name, pins)
|
||||
}
|
||||
|
||||
func (api *PinAPI) IsPinned(ctx context.Context, p path.Path, opts ...caopts.PinIsPinnedOption) (string, bool, error) {
|
||||
@ -230,6 +232,7 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro
|
||||
}
|
||||
|
||||
out := make(chan coreiface.PinStatus)
|
||||
|
||||
go func() {
|
||||
defer close(out)
|
||||
for p := range api.pinning.RecursiveKeys(ctx, false) {
|
||||
@ -254,7 +257,6 @@ type pinInfo struct {
|
||||
pinType string
|
||||
path path.ImmutablePath
|
||||
name string
|
||||
err error
|
||||
}
|
||||
|
||||
func (p *pinInfo) Path() path.ImmutablePath {
|
||||
@ -269,17 +271,12 @@ func (p *pinInfo) Name() string {
|
||||
return p.name
|
||||
}
|
||||
|
||||
func (p *pinInfo) Err() error {
|
||||
return p.err
|
||||
}
|
||||
|
||||
// pinLsAll is an internal function for returning a list of pins
|
||||
//
|
||||
// The caller must keep reading results until the channel is closed to prevent
|
||||
// leaking the goroutine that is fetching pins.
|
||||
func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool, name string) <-chan coreiface.Pin {
|
||||
out := make(chan coreiface.Pin, 1)
|
||||
|
||||
func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool, name string, out chan<- coreiface.Pin) error {
|
||||
defer close(out)
|
||||
emittedSet := cid.NewSet()
|
||||
|
||||
AddToResultKeys := func(c cid.Cid, pinName, typeStr string) error {
|
||||
@ -297,87 +294,79 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool,
|
||||
return nil
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer close(out)
|
||||
|
||||
var rkeys []cid.Cid
|
||||
var err error
|
||||
if typeStr == "recursive" || typeStr == "all" {
|
||||
for streamedCid := range api.pinning.RecursiveKeys(ctx, detailed) {
|
||||
if streamedCid.Err != nil {
|
||||
out <- &pinInfo{err: streamedCid.Err}
|
||||
return
|
||||
}
|
||||
if err = AddToResultKeys(streamedCid.Pin.Key, streamedCid.Pin.Name, "recursive"); err != nil {
|
||||
out <- &pinInfo{err: err}
|
||||
return
|
||||
}
|
||||
rkeys = append(rkeys, streamedCid.Pin.Key)
|
||||
var rkeys []cid.Cid
|
||||
var err error
|
||||
if typeStr == "recursive" || typeStr == "all" {
|
||||
for streamedCid := range api.pinning.RecursiveKeys(ctx, detailed) {
|
||||
if streamedCid.Err != nil {
|
||||
return streamedCid.Err
|
||||
}
|
||||
if err = AddToResultKeys(streamedCid.Pin.Key, streamedCid.Pin.Name, "recursive"); err != nil {
|
||||
return err
|
||||
}
|
||||
rkeys = append(rkeys, streamedCid.Pin.Key)
|
||||
}
|
||||
}
|
||||
if typeStr == "direct" || typeStr == "all" {
|
||||
for streamedCid := range api.pinning.DirectKeys(ctx, detailed) {
|
||||
if streamedCid.Err != nil {
|
||||
return streamedCid.Err
|
||||
}
|
||||
if err = AddToResultKeys(streamedCid.Pin.Key, streamedCid.Pin.Name, "direct"); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if typeStr == "direct" || typeStr == "all" {
|
||||
for streamedCid := range api.pinning.DirectKeys(ctx, detailed) {
|
||||
if streamedCid.Err != nil {
|
||||
out <- &pinInfo{err: streamedCid.Err}
|
||||
return
|
||||
}
|
||||
if err = AddToResultKeys(streamedCid.Pin.Key, streamedCid.Pin.Name, "direct"); err != nil {
|
||||
out <- &pinInfo{err: err}
|
||||
return
|
||||
}
|
||||
}
|
||||
if typeStr == "indirect" {
|
||||
// We need to first visit the direct pins that have priority
|
||||
// without emitting them
|
||||
|
||||
for streamedCid := range api.pinning.DirectKeys(ctx, detailed) {
|
||||
if streamedCid.Err != nil {
|
||||
return streamedCid.Err
|
||||
}
|
||||
emittedSet.Add(streamedCid.Pin.Key)
|
||||
}
|
||||
|
||||
for streamedCid := range api.pinning.RecursiveKeys(ctx, detailed) {
|
||||
if streamedCid.Err != nil {
|
||||
return streamedCid.Err
|
||||
}
|
||||
emittedSet.Add(streamedCid.Pin.Key)
|
||||
rkeys = append(rkeys, streamedCid.Pin.Key)
|
||||
}
|
||||
}
|
||||
if typeStr == "indirect" || typeStr == "all" {
|
||||
if len(rkeys) == 0 {
|
||||
return nil
|
||||
}
|
||||
var addErr error
|
||||
walkingSet := cid.NewSet()
|
||||
for _, k := range rkeys {
|
||||
err = merkledag.Walk(
|
||||
ctx, merkledag.GetLinksWithDAG(api.dag), k,
|
||||
func(c cid.Cid) bool {
|
||||
if !walkingSet.Visit(c) {
|
||||
return false
|
||||
}
|
||||
if emittedSet.Has(c) {
|
||||
return true // skipped
|
||||
}
|
||||
addErr = AddToResultKeys(c, "", "indirect")
|
||||
return addErr == nil
|
||||
},
|
||||
merkledag.SkipRoot(), merkledag.Concurrent(),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if addErr != nil {
|
||||
return addErr
|
||||
}
|
||||
}
|
||||
if typeStr == "indirect" {
|
||||
// We need to first visit the direct pins that have priority
|
||||
// without emitting them
|
||||
}
|
||||
|
||||
for streamedCid := range api.pinning.DirectKeys(ctx, detailed) {
|
||||
if streamedCid.Err != nil {
|
||||
out <- &pinInfo{err: streamedCid.Err}
|
||||
return
|
||||
}
|
||||
emittedSet.Add(streamedCid.Pin.Key)
|
||||
}
|
||||
|
||||
for streamedCid := range api.pinning.RecursiveKeys(ctx, detailed) {
|
||||
if streamedCid.Err != nil {
|
||||
out <- &pinInfo{err: streamedCid.Err}
|
||||
return
|
||||
}
|
||||
emittedSet.Add(streamedCid.Pin.Key)
|
||||
rkeys = append(rkeys, streamedCid.Pin.Key)
|
||||
}
|
||||
}
|
||||
if typeStr == "indirect" || typeStr == "all" {
|
||||
walkingSet := cid.NewSet()
|
||||
for _, k := range rkeys {
|
||||
err = merkledag.Walk(
|
||||
ctx, merkledag.GetLinksWithDAG(api.dag), k,
|
||||
func(c cid.Cid) bool {
|
||||
if !walkingSet.Visit(c) {
|
||||
return false
|
||||
}
|
||||
if emittedSet.Has(c) {
|
||||
return true // skipped
|
||||
}
|
||||
err := AddToResultKeys(c, "", "indirect")
|
||||
if err != nil {
|
||||
out <- &pinInfo{err: err}
|
||||
return false
|
||||
}
|
||||
return true
|
||||
},
|
||||
merkledag.SkipRoot(), merkledag.Concurrent(),
|
||||
)
|
||||
if err != nil {
|
||||
out <- &pinInfo{err: err}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return out
|
||||
return nil
|
||||
}
|
||||
|
||||
func (api *PinAPI) core() coreiface.CoreAPI {
|
||||
|
@ -2,6 +2,7 @@ package coreapi
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
blockservice "github.com/ipfs/boxo/blockservice"
|
||||
@ -197,13 +198,15 @@ func (api *UnixfsAPI) Get(ctx context.Context, p path.Path) (files.Node, error)
|
||||
|
||||
// Ls returns the contents of an IPFS or IPNS object(s) at path p, with the format:
|
||||
// `<link base58 hash> <link size in bytes> <link name>`
|
||||
func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...options.UnixfsLsOption) (<-chan coreiface.DirEntry, error) {
|
||||
func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, out chan<- coreiface.DirEntry, opts ...options.UnixfsLsOption) error {
|
||||
ctx, span := tracing.Span(ctx, "CoreAPI.UnixfsAPI", "Ls", trace.WithAttributes(attribute.String("path", p.String())))
|
||||
defer span.End()
|
||||
|
||||
defer close(out)
|
||||
|
||||
settings, err := options.UnixfsLsOptions(opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
span.SetAttributes(attribute.Bool("resolvechildren", settings.ResolveChildren))
|
||||
@ -213,21 +216,21 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...options.Unixf
|
||||
|
||||
dagnode, err := ses.ResolveNode(ctx, p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
dir, err := uio.NewDirectoryFromNode(ses.dag, dagnode)
|
||||
if err == uio.ErrNotADir {
|
||||
return uses.lsFromLinks(ctx, dagnode.Links(), settings)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if errors.Is(err, uio.ErrNotADir) {
|
||||
return uses.lsFromLinks(ctx, dagnode.Links(), settings, out)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
return uses.lsFromLinksAsync(ctx, dir, settings)
|
||||
return uses.lsFromDirLinks(ctx, dir, settings, out)
|
||||
}
|
||||
|
||||
func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, settings *options.UnixfsLsSettings) coreiface.DirEntry {
|
||||
func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, settings *options.UnixfsLsSettings) (coreiface.DirEntry, error) {
|
||||
ctx, span := tracing.Span(ctx, "CoreAPI.UnixfsAPI", "ProcessLink")
|
||||
defer span.End()
|
||||
if linkres.Link != nil {
|
||||
@ -235,7 +238,7 @@ func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, se
|
||||
}
|
||||
|
||||
if linkres.Err != nil {
|
||||
return coreiface.DirEntry{Err: linkres.Err}
|
||||
return coreiface.DirEntry{}, linkres.Err
|
||||
}
|
||||
|
||||
lnk := coreiface.DirEntry{
|
||||
@ -252,15 +255,13 @@ func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, se
|
||||
if settings.ResolveChildren {
|
||||
linkNode, err := linkres.Link.GetNode(ctx, api.dag)
|
||||
if err != nil {
|
||||
lnk.Err = err
|
||||
break
|
||||
return coreiface.DirEntry{}, err
|
||||
}
|
||||
|
||||
if pn, ok := linkNode.(*merkledag.ProtoNode); ok {
|
||||
d, err := ft.FSNodeFromBytes(pn.Data())
|
||||
if err != nil {
|
||||
lnk.Err = err
|
||||
break
|
||||
return coreiface.DirEntry{}, err
|
||||
}
|
||||
switch d.Type() {
|
||||
case ft.TFile, ft.TRaw:
|
||||
@ -284,35 +285,50 @@ func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, se
|
||||
}
|
||||
}
|
||||
|
||||
return lnk
|
||||
return lnk, nil
|
||||
}
|
||||
|
||||
func (api *UnixfsAPI) lsFromLinksAsync(ctx context.Context, dir uio.Directory, settings *options.UnixfsLsSettings) (<-chan coreiface.DirEntry, error) {
|
||||
out := make(chan coreiface.DirEntry, uio.DefaultShardWidth)
|
||||
func (api *UnixfsAPI) lsFromDirLinks(ctx context.Context, dir uio.Directory, settings *options.UnixfsLsSettings, out chan<- coreiface.DirEntry) error {
|
||||
for l := range dir.EnumLinksAsync(ctx) {
|
||||
dirEnt, err := api.processLink(ctx, l, settings) // TODO: perf: processing can be done in background and in parallel
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
select {
|
||||
case out <- dirEnt:
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, settings *options.UnixfsLsSettings, out chan<- coreiface.DirEntry) error {
|
||||
// Create links channel large enough to not block when writing to out is slower.
|
||||
links := make(chan coreiface.DirEntry, len(ndlinks))
|
||||
errs := make(chan error, 1)
|
||||
go func() {
|
||||
defer close(out)
|
||||
for l := range dir.EnumLinksAsync(ctx) {
|
||||
defer close(links)
|
||||
defer close(errs)
|
||||
for _, l := range ndlinks {
|
||||
lr := ft.LinkResult{Link: &ipld.Link{Name: l.Name, Size: l.Size, Cid: l.Cid}}
|
||||
lnk, err := api.processLink(ctx, lr, settings) // TODO: can be parallel if settings.Async
|
||||
if err != nil {
|
||||
errs <- err
|
||||
return
|
||||
}
|
||||
select {
|
||||
case out <- api.processLink(ctx, l, settings): // TODO: perf: processing can be done in background and in parallel
|
||||
case links <- lnk:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, settings *options.UnixfsLsSettings) (<-chan coreiface.DirEntry, error) {
|
||||
links := make(chan coreiface.DirEntry, len(ndlinks))
|
||||
for _, l := range ndlinks {
|
||||
lr := ft.LinkResult{Link: &ipld.Link{Name: l.Name, Size: l.Size, Cid: l.Cid}}
|
||||
|
||||
links <- api.processLink(ctx, lr, settings) // TODO: can be parallel if settings.Async
|
||||
for lnk := range links {
|
||||
out <- lnk
|
||||
}
|
||||
close(links)
|
||||
return links, nil
|
||||
return <-errs
|
||||
}
|
||||
|
||||
func (api *UnixfsAPI) core() *CoreAPI {
|
||||
|
@ -18,9 +18,6 @@ type Pin interface {
|
||||
|
||||
// Type of the pin
|
||||
Type() string
|
||||
|
||||
// if not nil, an error happened. Everything else should be ignored.
|
||||
Err() error
|
||||
}
|
||||
|
||||
// PinStatus holds information about pin health
|
||||
@ -50,8 +47,9 @@ type PinAPI interface {
|
||||
// tree
|
||||
Add(context.Context, path.Path, ...options.PinAddOption) error
|
||||
|
||||
// Ls returns list of pinned objects on this node
|
||||
Ls(context.Context, ...options.PinLsOption) (<-chan Pin, error)
|
||||
// Ls returns this node's pinned objects on the provided channel. The
|
||||
// channel is closed when there are no more pins and an error is returned.
|
||||
Ls(context.Context, chan<- Pin, ...options.PinLsOption) error
|
||||
|
||||
// IsPinned returns whether or not the given cid is pinned
|
||||
// and an explanation of why its pinned
|
||||
|
@ -323,9 +323,17 @@ func (tp *TestSuite) TestBlockPin(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if pins, err := api.Pin().Ls(ctx); err != nil || len(pins) != 0 {
|
||||
pinCh := make(chan coreiface.Pin)
|
||||
go func() {
|
||||
err = api.Pin().Ls(ctx, pinCh)
|
||||
}()
|
||||
|
||||
for range pinCh {
|
||||
t.Fatal("expected 0 pins")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
res, err := api.Block().Put(
|
||||
ctx,
|
||||
@ -337,7 +345,7 @@ func (tp *TestSuite) TestBlockPin(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
pins, err := accPins(api.Pin().Ls(ctx))
|
||||
pins, err := accPins(ctx, api)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -67,7 +67,7 @@ func (tp *TestSuite) TestPinSimple(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
list, err := accPins(api.Pin().Ls(ctx))
|
||||
list, err := accPins(ctx, api)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -91,7 +91,7 @@ func (tp *TestSuite) TestPinSimple(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
list, err = accPins(api.Pin().Ls(ctx))
|
||||
list, err = accPins(ctx, api)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -143,7 +143,7 @@ func (tp *TestSuite) TestPinRecursive(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
list, err := accPins(api.Pin().Ls(ctx))
|
||||
list, err := accPins(ctx, api)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -152,7 +152,7 @@ func (tp *TestSuite) TestPinRecursive(t *testing.T) {
|
||||
t.Errorf("unexpected pin list len: %d", len(list))
|
||||
}
|
||||
|
||||
list, err = accPins(api.Pin().Ls(ctx, opt.Pin.Ls.Direct()))
|
||||
list, err = accPins(ctx, api, opt.Pin.Ls.Direct())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -165,7 +165,7 @@ func (tp *TestSuite) TestPinRecursive(t *testing.T) {
|
||||
t.Errorf("unexpected path, %s != %s", list[0].Path().String(), path.FromCid(nd3.Cid()).String())
|
||||
}
|
||||
|
||||
list, err = accPins(api.Pin().Ls(ctx, opt.Pin.Ls.Recursive()))
|
||||
list, err = accPins(ctx, api, opt.Pin.Ls.Recursive())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -178,7 +178,7 @@ func (tp *TestSuite) TestPinRecursive(t *testing.T) {
|
||||
t.Errorf("unexpected path, %s != %s", list[0].Path().String(), path.FromCid(nd2.Cid()).String())
|
||||
}
|
||||
|
||||
list, err = accPins(api.Pin().Ls(ctx, opt.Pin.Ls.Indirect()))
|
||||
list, err = accPins(ctx, api, opt.Pin.Ls.Indirect())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -436,21 +436,21 @@ func getThreeChainedNodes(t *testing.T, ctx context.Context, api iface.CoreAPI,
|
||||
func assertPinTypes(t *testing.T, ctx context.Context, api iface.CoreAPI, recusive, direct, indirect []cidContainer) {
|
||||
assertPinLsAllConsistency(t, ctx, api)
|
||||
|
||||
list, err := accPins(api.Pin().Ls(ctx, opt.Pin.Ls.Recursive()))
|
||||
list, err := accPins(ctx, api, opt.Pin.Ls.Recursive())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
assertPinCids(t, list, recusive...)
|
||||
|
||||
list, err = accPins(api.Pin().Ls(ctx, opt.Pin.Ls.Direct()))
|
||||
list, err = accPins(ctx, api, opt.Pin.Ls.Direct())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
assertPinCids(t, list, direct...)
|
||||
|
||||
list, err = accPins(api.Pin().Ls(ctx, opt.Pin.Ls.Indirect()))
|
||||
list, err = accPins(ctx, api, opt.Pin.Ls.Indirect())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -500,7 +500,7 @@ func assertPinCids(t *testing.T, pins []iface.Pin, cids ...cidContainer) {
|
||||
// assertPinLsAllConsistency verifies that listing all pins gives the same result as listing the pin types individually
|
||||
func assertPinLsAllConsistency(t *testing.T, ctx context.Context, api iface.CoreAPI) {
|
||||
t.Helper()
|
||||
allPins, err := accPins(api.Pin().Ls(ctx))
|
||||
allPins, err := accPins(ctx, api)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -531,7 +531,7 @@ func assertPinLsAllConsistency(t *testing.T, ctx context.Context, api iface.Core
|
||||
}
|
||||
|
||||
for typeStr, pinProps := range typeMap {
|
||||
pins, err := accPins(api.Pin().Ls(ctx, pinProps.PinLsOption))
|
||||
pins, err := accPins(ctx, api, pinProps.PinLsOption)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -593,19 +593,19 @@ func assertNotPinned(t *testing.T, ctx context.Context, api iface.CoreAPI, p pat
|
||||
}
|
||||
}
|
||||
|
||||
func accPins(pins <-chan iface.Pin, err error) ([]iface.Pin, error) {
|
||||
func accPins(ctx context.Context, api iface.CoreAPI, opts ...opt.PinLsOption) ([]iface.Pin, error) {
|
||||
var err error
|
||||
pins := make(chan iface.Pin)
|
||||
go func() {
|
||||
err = api.Pin().Ls(ctx, pins, opts...)
|
||||
}()
|
||||
|
||||
var results []iface.Pin
|
||||
for pin := range pins {
|
||||
results = append(results, pin)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result []iface.Pin
|
||||
|
||||
for pin := range pins {
|
||||
if pin.Err() != nil {
|
||||
return nil, pin.Err()
|
||||
}
|
||||
result = append(result, pin)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
return results, nil
|
||||
}
|
||||
|
@ -544,7 +544,7 @@ func (tp *TestSuite) TestAddPinned(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
pins, err := accPins(api.Pin().Ls(ctx))
|
||||
pins, err := accPins(ctx, api)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -681,14 +681,15 @@ func (tp *TestSuite) TestLs(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
entries, err := api.Unixfs().Ls(ctx, p)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
errCh := make(chan error, 1)
|
||||
entries := make(chan coreiface.DirEntry)
|
||||
go func() {
|
||||
errCh <- api.Unixfs().Ls(ctx, p, entries)
|
||||
}()
|
||||
|
||||
entry := <-entries
|
||||
if entry.Err != nil {
|
||||
t.Fatal(entry.Err)
|
||||
entry, ok := <-entries
|
||||
if !ok {
|
||||
t.Fatal("expected another entry")
|
||||
}
|
||||
if entry.Size != 15 {
|
||||
t.Errorf("expected size = 15, got %d", entry.Size)
|
||||
@ -702,9 +703,9 @@ func (tp *TestSuite) TestLs(t *testing.T) {
|
||||
if entry.Cid.String() != "QmX3qQVKxDGz3URVC3861Z3CKtQKGBn6ffXRBBWGMFz9Lr" {
|
||||
t.Errorf("expected cid = QmX3qQVKxDGz3URVC3861Z3CKtQKGBn6ffXRBBWGMFz9Lr, got %s", entry.Cid)
|
||||
}
|
||||
entry = <-entries
|
||||
if entry.Err != nil {
|
||||
t.Fatal(entry.Err)
|
||||
entry, ok = <-entries
|
||||
if !ok {
|
||||
t.Fatal("expected another entry")
|
||||
}
|
||||
if entry.Type != coreiface.TSymlink {
|
||||
t.Errorf("wrong type %s", entry.Type)
|
||||
@ -716,11 +717,12 @@ func (tp *TestSuite) TestLs(t *testing.T) {
|
||||
t.Errorf("expected symlink target to be /foo/bar, got %s", entry.Target)
|
||||
}
|
||||
|
||||
if l, ok := <-entries; ok {
|
||||
t.Errorf("didn't expect a second link")
|
||||
if l.Err != nil {
|
||||
t.Error(l.Err)
|
||||
}
|
||||
_, ok = <-entries
|
||||
if ok {
|
||||
t.Errorf("didn't expect a another link")
|
||||
}
|
||||
if err = <-errCh; err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -779,13 +781,22 @@ func (tp *TestSuite) TestLsEmptyDir(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
links, err := api.Unixfs().Ls(ctx, p)
|
||||
if err != nil {
|
||||
errCh := make(chan error, 1)
|
||||
links := make(chan coreiface.DirEntry)
|
||||
go func() {
|
||||
errCh <- api.Unixfs().Ls(ctx, p, links)
|
||||
}()
|
||||
|
||||
var count int
|
||||
for range links {
|
||||
count++
|
||||
}
|
||||
if err = <-errCh; err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(links) != 0 {
|
||||
t.Fatalf("expected 0 links, got %d", len(links))
|
||||
if count != 0 {
|
||||
t.Fatalf("expected 0 links, got %d", count)
|
||||
}
|
||||
}
|
||||
|
||||
@ -808,13 +819,22 @@ func (tp *TestSuite) TestLsNonUnixfs(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
links, err := api.Unixfs().Ls(ctx, path.FromCid(nd.Cid()))
|
||||
if err != nil {
|
||||
errCh := make(chan error, 1)
|
||||
links := make(chan coreiface.DirEntry)
|
||||
go func() {
|
||||
errCh <- api.Unixfs().Ls(ctx, path.FromCid(nd.Cid()), links)
|
||||
}()
|
||||
|
||||
var count int
|
||||
for range links {
|
||||
count++
|
||||
}
|
||||
if err = <-errCh; err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(links) != 0 {
|
||||
t.Fatalf("expected 0 links, got %d", len(links))
|
||||
if count != 0 {
|
||||
t.Fatalf("expected 0 links, got %d", count)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2,6 +2,7 @@ package iface
|
||||
|
||||
import (
|
||||
"context"
|
||||
"iter"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
@ -63,8 +64,6 @@ type DirEntry struct {
|
||||
|
||||
Mode os.FileMode
|
||||
ModTime time.Time
|
||||
|
||||
Err error
|
||||
}
|
||||
|
||||
// UnixfsAPI is the basic interface to immutable files in IPFS
|
||||
@ -81,7 +80,56 @@ type UnixfsAPI interface {
|
||||
// to operations performed on the returned file
|
||||
Get(context.Context, path.Path) (files.Node, error)
|
||||
|
||||
// Ls returns the list of links in a directory. Links aren't guaranteed to be
|
||||
// returned in order
|
||||
Ls(context.Context, path.Path, ...options.UnixfsLsOption) (<-chan DirEntry, error)
|
||||
// Ls writes the links in a directory to the DirEntry channel. Links aren't
|
||||
// guaranteed to be returned in order. If an error occurs or the context is
|
||||
// canceled, the DirEntry channel is closed and an error is returned.
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
// dirs := make(chan DirEntry)
|
||||
// lsErr := make(chan error, 1)
|
||||
// go func() {
|
||||
// lsErr <- Ls(ctx, p, dirs)
|
||||
// }()
|
||||
// for dirEnt := range dirs {
|
||||
// fmt.Println("Dir name:", dirEnt.Name)
|
||||
// }
|
||||
// err := <-lsErr
|
||||
// if err != nil {
|
||||
// return fmt.Errorf("error listing directory: %w", err)
|
||||
// }
|
||||
Ls(context.Context, path.Path, chan<- DirEntry, ...options.UnixfsLsOption) error
|
||||
}
|
||||
|
||||
// LsIter returns a go iterator that allows ranging over DirEntry results.
|
||||
// Iteration stops if the context is canceled or if the iterator yields an
|
||||
// error.
|
||||
//
|
||||
// Exmaple:
|
||||
//
|
||||
// for dirEnt, err := LsIter(ctx, ufsAPI, p) {
|
||||
// if err != nil {
|
||||
// return fmt.Errorf("error listing directory: %w", err)
|
||||
// }
|
||||
// fmt.Println("Dir name:", dirEnt.Name)
|
||||
// }
|
||||
func LsIter(ctx context.Context, api UnixfsAPI, p path.Path, opts ...options.UnixfsLsOption) iter.Seq2[DirEntry, error] {
|
||||
return func(yield func(DirEntry, error) bool) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel() // cancel Ls if done iterating early
|
||||
|
||||
dirs := make(chan DirEntry)
|
||||
lsErr := make(chan error, 1)
|
||||
go func() {
|
||||
lsErr <- api.Ls(ctx, p, dirs, opts...)
|
||||
}()
|
||||
for dirEnt := range dirs {
|
||||
if !yield(dirEnt, nil) {
|
||||
return
|
||||
}
|
||||
}
|
||||
if err := <-lsErr; err != nil {
|
||||
yield(DirEntry{}, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -7,7 +7,7 @@ go 1.23
|
||||
replace github.com/ipfs/kubo => ./../../..
|
||||
|
||||
require (
|
||||
github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1
|
||||
github.com/ipfs/boxo v0.24.4-0.20241203185533-3a3e8afa3492
|
||||
github.com/ipfs/kubo v0.0.0-00010101000000-000000000000
|
||||
github.com/libp2p/go-libp2p v0.37.2
|
||||
github.com/multiformats/go-multiaddr v0.13.0
|
||||
@ -52,6 +52,8 @@ require (
|
||||
github.com/francoispqt/gojay v1.2.13 // indirect
|
||||
github.com/fsnotify/fsnotify v1.7.0 // indirect
|
||||
github.com/gabriel-vasile/mimetype v1.4.6 // indirect
|
||||
github.com/gammazero/chanqueue v1.0.0 // indirect
|
||||
github.com/gammazero/deque v1.0.0 // indirect
|
||||
github.com/getsentry/sentry-go v0.27.0 // indirect
|
||||
github.com/go-jose/go-jose/v4 v4.0.4 // indirect
|
||||
github.com/go-logr/logr v1.4.2 // indirect
|
||||
|
@ -164,6 +164,10 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos
|
||||
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
|
||||
github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc=
|
||||
github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc=
|
||||
github.com/gammazero/chanqueue v1.0.0 h1:FER/sMailGFA3DDvFooEkipAMU+3c9Bg3bheloPSz6o=
|
||||
github.com/gammazero/chanqueue v1.0.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc=
|
||||
github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34=
|
||||
github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo=
|
||||
github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps=
|
||||
github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY=
|
||||
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
||||
@ -298,8 +302,8 @@ github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231027223058-cde3b5ba964c h1:7Uy
|
||||
github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231027223058-cde3b5ba964c/go.mod h1:6EekK/jo+TynwSE/ZOiOJd4eEvRXoavEC3vquKtv4yI=
|
||||
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
|
||||
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
|
||||
github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1 h1:Ox1qTlON8qG46rUL7dDEwnIt7W9MhaidtvR/97RywWw=
|
||||
github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1/go.mod h1:Kxk43F+avGAsJSwhJW4isNYrpGwXHRJCvJ19Pt+MQc4=
|
||||
github.com/ipfs/boxo v0.24.4-0.20241203185533-3a3e8afa3492 h1:kiS5+H+6aJeNWWDynuYu/ijgzkBTrInl++VFcNDgq+g=
|
||||
github.com/ipfs/boxo v0.24.4-0.20241203185533-3a3e8afa3492/go.mod h1:lAoydO+oJhB1e7pUn4ju1Z1fuUIwy+zb0hQXRb/bu2g=
|
||||
github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA=
|
||||
github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU=
|
||||
github.com/ipfs/go-bitswap v0.11.0 h1:j1WVvhDX1yhG32NTC9xfxnqycqYIlhzEzLXG/cU1HyQ=
|
||||
|
4
go.mod
4
go.mod
@ -22,7 +22,7 @@ require (
|
||||
github.com/hashicorp/go-version v1.7.0
|
||||
github.com/ipfs-shipyard/nopfs v0.0.12
|
||||
github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231027223058-cde3b5ba964c
|
||||
github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1
|
||||
github.com/ipfs/boxo v0.24.4-0.20241203185533-3a3e8afa3492
|
||||
github.com/ipfs/go-block-format v0.2.0
|
||||
github.com/ipfs/go-cid v0.4.1
|
||||
github.com/ipfs/go-cidutil v0.1.0
|
||||
@ -125,6 +125,8 @@ require (
|
||||
github.com/flynn/noise v1.1.0 // indirect
|
||||
github.com/francoispqt/gojay v1.2.13 // indirect
|
||||
github.com/gabriel-vasile/mimetype v1.4.6 // indirect
|
||||
github.com/gammazero/chanqueue v1.0.0 // indirect
|
||||
github.com/gammazero/deque v1.0.0 // indirect
|
||||
github.com/getsentry/sentry-go v0.27.0 // indirect
|
||||
github.com/go-jose/go-jose/v4 v4.0.4 // indirect
|
||||
github.com/go-kit/log v0.2.1 // indirect
|
||||
|
8
go.sum
8
go.sum
@ -198,6 +198,10 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos
|
||||
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
|
||||
github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc=
|
||||
github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc=
|
||||
github.com/gammazero/chanqueue v1.0.0 h1:FER/sMailGFA3DDvFooEkipAMU+3c9Bg3bheloPSz6o=
|
||||
github.com/gammazero/chanqueue v1.0.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc=
|
||||
github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34=
|
||||
github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo=
|
||||
github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps=
|
||||
github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY=
|
||||
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
||||
@ -362,8 +366,8 @@ github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231027223058-cde3b5ba964c h1:7Uy
|
||||
github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231027223058-cde3b5ba964c/go.mod h1:6EekK/jo+TynwSE/ZOiOJd4eEvRXoavEC3vquKtv4yI=
|
||||
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
|
||||
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
|
||||
github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1 h1:Ox1qTlON8qG46rUL7dDEwnIt7W9MhaidtvR/97RywWw=
|
||||
github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1/go.mod h1:Kxk43F+avGAsJSwhJW4isNYrpGwXHRJCvJ19Pt+MQc4=
|
||||
github.com/ipfs/boxo v0.24.4-0.20241203185533-3a3e8afa3492 h1:kiS5+H+6aJeNWWDynuYu/ijgzkBTrInl++VFcNDgq+g=
|
||||
github.com/ipfs/boxo v0.24.4-0.20241203185533-3a3e8afa3492/go.mod h1:lAoydO+oJhB1e7pUn4ju1Z1fuUIwy+zb0hQXRb/bu2g=
|
||||
github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA=
|
||||
github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU=
|
||||
github.com/ipfs/go-bitswap v0.11.0 h1:j1WVvhDX1yhG32NTC9xfxnqycqYIlhzEzLXG/cU1HyQ=
|
||||
|
@ -119,7 +119,7 @@ require (
|
||||
github.com/huin/goupnp v1.3.0 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
||||
github.com/ipfs/bbloom v0.0.4 // indirect
|
||||
github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1 // indirect
|
||||
github.com/ipfs/boxo v0.24.4-0.20241203185533-3a3e8afa3492 // indirect
|
||||
github.com/ipfs/go-block-format v0.2.0 // indirect
|
||||
github.com/ipfs/go-cid v0.4.1 // indirect
|
||||
github.com/ipfs/go-datastore v0.6.0 // indirect
|
||||
|
@ -162,6 +162,10 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos
|
||||
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
|
||||
github.com/fzipp/gocyclo v0.6.0 h1:lsblElZG7d3ALtGMx9fmxeTKZaLLpU8mET09yN4BBLo=
|
||||
github.com/fzipp/gocyclo v0.6.0/go.mod h1:rXPyn8fnlpa0R2csP/31uerbiVBugk5whMdlyaLkLoA=
|
||||
github.com/gammazero/chanqueue v1.0.0 h1:FER/sMailGFA3DDvFooEkipAMU+3c9Bg3bheloPSz6o=
|
||||
github.com/gammazero/chanqueue v1.0.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc=
|
||||
github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34=
|
||||
github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo=
|
||||
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
||||
github.com/ghostiam/protogetter v0.3.6 h1:R7qEWaSgFCsy20yYHNIJsU9ZOb8TziSRRxuAOTVKeOk=
|
||||
github.com/ghostiam/protogetter v0.3.6/go.mod h1:7lpeDnEJ1ZjL/YtyoN99ljO4z0pd3H0d18/t2dPBxHw=
|
||||
@ -318,8 +322,8 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2
|
||||
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
|
||||
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
|
||||
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
|
||||
github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1 h1:Ox1qTlON8qG46rUL7dDEwnIt7W9MhaidtvR/97RywWw=
|
||||
github.com/ipfs/boxo v0.24.4-0.20241125210908-37756ce2eeb1/go.mod h1:Kxk43F+avGAsJSwhJW4isNYrpGwXHRJCvJ19Pt+MQc4=
|
||||
github.com/ipfs/boxo v0.24.4-0.20241203185533-3a3e8afa3492 h1:kiS5+H+6aJeNWWDynuYu/ijgzkBTrInl++VFcNDgq+g=
|
||||
github.com/ipfs/boxo v0.24.4-0.20241203185533-3a3e8afa3492/go.mod h1:lAoydO+oJhB1e7pUn4ju1Z1fuUIwy+zb0hQXRb/bu2g=
|
||||
github.com/ipfs/go-block-format v0.2.0 h1:ZqrkxBA2ICbDRbK8KJs/u0O3dlp6gmAuuXUJNiW1Ycs=
|
||||
github.com/ipfs/go-block-format v0.2.0/go.mod h1:+jpL11nFx5A/SPpsoBn6Bzkra/zaArfSmsknbPMYgzM=
|
||||
github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s=
|
||||
|
Reference in New Issue
Block a user