mirror of
https://github.com/ipfs/kubo.git
synced 2025-12-14 13:27:45 +08:00
pin: implement pin/ls with only CoreApi
This commit is contained in:
committed by
Steven Allen
parent
01e27f9065
commit
6ef190f5a6
@@ -3,11 +3,11 @@ package coreapi
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
bserv "github.com/ipfs/go-blockservice"
|
||||
"github.com/ipfs/go-cid"
|
||||
offline "github.com/ipfs/go-ipfs-exchange-offline"
|
||||
pin "github.com/ipfs/go-ipfs-pinner"
|
||||
ipld "github.com/ipfs/go-ipld-format"
|
||||
"github.com/ipfs/go-merkledag"
|
||||
coreiface "github.com/ipfs/interface-go-ipfs-core"
|
||||
caopts "github.com/ipfs/interface-go-ipfs-core/options"
|
||||
@@ -41,7 +41,7 @@ func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOp
|
||||
return api.pinning.Flush(ctx)
|
||||
}
|
||||
|
||||
func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) ([]coreiface.Pin, error) {
|
||||
func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan coreiface.Pin, error) {
|
||||
settings, err := caopts.PinLsOptions(opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -53,7 +53,26 @@ func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) ([]coreif
|
||||
return nil, fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.Type)
|
||||
}
|
||||
|
||||
return api.pinLsAll(settings.Type, ctx)
|
||||
return api.pinLsAll(settings.Type, ctx), nil
|
||||
}
|
||||
|
||||
func (api *PinAPI) IsPinned(ctx context.Context, p path.Path, opts ...caopts.PinIsPinnedOption) (string, bool, error) {
|
||||
dagNode, err := api.core().ResolveNode(ctx, p)
|
||||
if err != nil {
|
||||
return "", false, fmt.Errorf("pin: %s", err)
|
||||
}
|
||||
|
||||
settings, err := caopts.PinIsPinnedOptions(opts...)
|
||||
if err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
|
||||
mode, ok := pin.StringToMode(settings.WithType)
|
||||
if !ok {
|
||||
return "", false, fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.WithType)
|
||||
}
|
||||
|
||||
return api.pinning.IsPinnedWithType(ctx, dagNode.Cid(), mode)
|
||||
}
|
||||
|
||||
// Rm pin rm api
|
||||
@@ -184,6 +203,7 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro
|
||||
type pinInfo struct {
|
||||
pinType string
|
||||
path path.Resolved
|
||||
err error
|
||||
}
|
||||
|
||||
func (p *pinInfo) Path() path.Resolved {
|
||||
@@ -194,123 +214,106 @@ func (p *pinInfo) Type() string {
|
||||
return p.pinType
|
||||
}
|
||||
|
||||
func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) ([]coreiface.Pin, error) {
|
||||
pinCh, errCh := PinLsAll(ctx, typeStr, api.pinning, api.dag)
|
||||
|
||||
var pins []coreiface.Pin
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case p, ok := <-pinCh:
|
||||
if !ok {
|
||||
break loop
|
||||
}
|
||||
pins = append(pins, p)
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
err := <-errCh
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return pins, nil
|
||||
func (p *pinInfo) Err() error {
|
||||
return p.err
|
||||
}
|
||||
|
||||
// PinLsAll is an internal function for returning a list of pins
|
||||
func PinLsAll(ctx context.Context, typeStr string, pin pin.Pinner, dag ipld.DAGService) (chan coreiface.Pin, chan error) {
|
||||
ch := make(chan coreiface.Pin, 32)
|
||||
errCh := make(chan error, 1)
|
||||
func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) <-chan coreiface.Pin {
|
||||
out := make(chan coreiface.Pin)
|
||||
|
||||
keys := cid.NewSet()
|
||||
AddToResultKeys := func(keyList []cid.Cid, typeStr string) error {
|
||||
|
||||
AddToResultKeys := func(keyList []cid.Cid, typeStr string) {
|
||||
for _, c := range keyList {
|
||||
if keys.Visit(c) {
|
||||
select {
|
||||
case ch <- &pinInfo{
|
||||
out <- &pinInfo{
|
||||
pinType: typeStr,
|
||||
path: path.IpldPath(c),
|
||||
}:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
VisitKeys := func(keyList []cid.Cid) {
|
||||
for _, c := range keyList {
|
||||
keys.Visit(c)
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer close(ch)
|
||||
defer close(errCh)
|
||||
if typeStr == "direct" || typeStr == "all" {
|
||||
dkeys, err := pin.DirectKeys(ctx)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
if err := AddToResultKeys(dkeys, "direct"); err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
defer close(out)
|
||||
|
||||
if typeStr == "recursive" || typeStr == "all" {
|
||||
rkeys, err := pin.RecursiveKeys(ctx)
|
||||
rkeys, err := api.pinning.RecursiveKeys(ctx)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
if err := AddToResultKeys(rkeys, "recursive"); err != nil {
|
||||
errCh <- err
|
||||
out <- &pinInfo{err: err}
|
||||
return
|
||||
}
|
||||
AddToResultKeys(rkeys, "recursive")
|
||||
}
|
||||
if typeStr == "indirect" || typeStr == "all" {
|
||||
rkeys, err := pin.RecursiveKeys(ctx)
|
||||
if typeStr == "direct" || typeStr == "all" {
|
||||
dkeys, err := api.pinning.DirectKeys(ctx)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
out <- &pinInfo{err: err}
|
||||
return
|
||||
}
|
||||
|
||||
// If we're only listing indirect pins, we need to
|
||||
// explicitly mark direct/recursive pins so we don't
|
||||
// send them.
|
||||
if typeStr == "indirect" {
|
||||
dkeys, err := pin.DirectKeys(ctx)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
for _, k := range dkeys {
|
||||
keys.Add(k)
|
||||
}
|
||||
for _, k := range rkeys {
|
||||
keys.Add(k)
|
||||
}
|
||||
AddToResultKeys(dkeys, "direct")
|
||||
}
|
||||
if typeStr == "all" {
|
||||
set := cid.NewSet()
|
||||
rkeys, err := api.pinning.RecursiveKeys(ctx)
|
||||
if err != nil {
|
||||
out <- &pinInfo{err: err}
|
||||
return
|
||||
}
|
||||
|
||||
indirectKeys := cid.NewSet()
|
||||
for _, k := range rkeys {
|
||||
err := merkledag.Walk(ctx, merkledag.GetLinksWithDAG(dag), k, func(c cid.Cid) bool {
|
||||
r := indirectKeys.Visit(c)
|
||||
if r {
|
||||
if err := AddToResultKeys([]cid.Cid{c}, "indirect"); err != nil {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return r
|
||||
}, merkledag.SkipRoot(), merkledag.Concurrent())
|
||||
|
||||
err := merkledag.Walk(
|
||||
ctx, merkledag.GetLinksWithDAG(api.dag), k,
|
||||
set.Visit,
|
||||
merkledag.SkipRoot(), merkledag.Concurrent(),
|
||||
)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
out <- &pinInfo{err: err}
|
||||
return
|
||||
}
|
||||
}
|
||||
AddToResultKeys(set.Keys(), "indirect")
|
||||
}
|
||||
if typeStr == "indirect" {
|
||||
// We need to first visit the direct pins that have priority
|
||||
// without emitting them
|
||||
|
||||
dkeys, err := api.pinning.DirectKeys(ctx)
|
||||
if err != nil {
|
||||
out <- &pinInfo{err: err}
|
||||
return
|
||||
}
|
||||
VisitKeys(dkeys)
|
||||
|
||||
rkeys, err := api.pinning.RecursiveKeys(ctx)
|
||||
if err != nil {
|
||||
out <- &pinInfo{err: err}
|
||||
return
|
||||
}
|
||||
VisitKeys(rkeys)
|
||||
|
||||
set := cid.NewSet()
|
||||
for _, k := range rkeys {
|
||||
err := merkledag.Walk(
|
||||
ctx, merkledag.GetLinksWithDAG(api.dag), k,
|
||||
set.Visit,
|
||||
merkledag.SkipRoot(), merkledag.Concurrent(),
|
||||
)
|
||||
if err != nil {
|
||||
out <- &pinInfo{err: err}
|
||||
return
|
||||
}
|
||||
}
|
||||
AddToResultKeys(set.Keys(), "indirect")
|
||||
}
|
||||
}()
|
||||
|
||||
return ch, errCh
|
||||
return out
|
||||
}
|
||||
|
||||
func (api *PinAPI) core() coreiface.CoreAPI {
|
||||
|
||||
Reference in New Issue
Block a user