1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-09-18 13:54:02 +08:00
Files
kubo/core/coreapi/pin.go
Hector Sanjuan a673c2ec95 fix: Provide according to Reprovider.Strategy (#10886)
* Provide according to strategy

Updates boxo to a version with the changes from https://github.com/ipfs/boxo/pull/976, which decentralize the providing responsibilities (from a central providing.Exchange to blockstore, pinner, mfs).

The changes consist in initializing the Pinner, MFS and the blockstore with the provider.System, which is created first.

Since the provider.System is created first, the reproviding KeyChanFunc is set
later when we can create it once we have the Pinner, MFS and the blockstore.

Some additional work applies to the Add() workflow. Normally, blocks would get provided at the Blockstore or the Pinner, but when adding blocks AND a "pinned" strategy is used, the blockstore does not provide, and the
pinner does not traverse the DAG (and thus doesn't provide either), so we need to provide directly from the Adder. This is resolved by wrapping the DAGService in a "providingDAGService" which provides every added block, when using the "pinned" strategy.

`ipfs --offline add` when the ONLINE daemon is running will now announce blocks per the chosen strategy, where before it did not announce them. This is documented in the changelog. A couple of releases ago, adding with `ipfs --offline add` was faster, but this is no longer the case so we are not incurring in any penalties by sticking to the fact that the daemon is online and has a providing strategy that we follow.

Co-authored-by: gammazero <11790789+gammazero@users.noreply.github.com>
Co-authored-by: Marcin Rataj <lidel@lidel.org>
2025-08-08 10:56:44 +02:00

371 lines
8.9 KiB
Go

package coreapi
import (
"context"
"fmt"
"strings"
bserv "github.com/ipfs/boxo/blockservice"
offline "github.com/ipfs/boxo/exchange/offline"
"github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/path"
pin "github.com/ipfs/boxo/pinning/pinner"
"github.com/ipfs/go-cid"
coreiface "github.com/ipfs/kubo/core/coreiface"
caopts "github.com/ipfs/kubo/core/coreiface/options"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"github.com/ipfs/kubo/tracing"
)
type PinAPI CoreAPI
func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOption) error {
ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Add", trace.WithAttributes(attribute.String("path", p.String())))
defer span.End()
dagNode, err := api.core().ResolveNode(ctx, p)
if err != nil {
return fmt.Errorf("pin: %s", err)
}
settings, err := caopts.PinAddOptions(opts...)
if err != nil {
return err
}
span.SetAttributes(attribute.Bool("recursive", settings.Recursive))
defer api.blockstore.PinLock(ctx).Unlock(ctx)
err = api.pinning.Pin(ctx, dagNode, settings.Recursive, settings.Name)
if err != nil {
return fmt.Errorf("pin: %s", err)
}
return api.pinning.Flush(ctx)
}
func (api *PinAPI) Ls(ctx context.Context, pins chan<- coreiface.Pin, opts ...caopts.PinLsOption) error {
ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Ls")
defer span.End()
settings, err := caopts.PinLsOptions(opts...)
if err != nil {
close(pins)
return err
}
span.SetAttributes(attribute.String("type", settings.Type))
switch settings.Type {
case "all", "direct", "indirect", "recursive":
default:
close(pins)
return fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.Type)
}
return api.pinLsAll(ctx, settings.Type, settings.Detailed, settings.Name, pins)
}
func (api *PinAPI) IsPinned(ctx context.Context, p path.Path, opts ...caopts.PinIsPinnedOption) (string, bool, error) {
ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "IsPinned", trace.WithAttributes(attribute.String("path", p.String())))
defer span.End()
resolved, _, err := api.core().ResolvePath(ctx, p)
if err != nil {
return "", false, fmt.Errorf("error resolving path: %s", err)
}
settings, err := caopts.PinIsPinnedOptions(opts...)
if err != nil {
return "", false, err
}
span.SetAttributes(attribute.String("withtype", settings.WithType))
mode, ok := pin.StringToMode(settings.WithType)
if !ok {
return "", false, fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.WithType)
}
return api.pinning.IsPinnedWithType(ctx, resolved.RootCid(), mode)
}
// Rm pin rm api
func (api *PinAPI) Rm(ctx context.Context, p path.Path, opts ...caopts.PinRmOption) error {
ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Rm", trace.WithAttributes(attribute.String("path", p.String())))
defer span.End()
rp, _, err := api.core().ResolvePath(ctx, p)
if err != nil {
return err
}
settings, err := caopts.PinRmOptions(opts...)
if err != nil {
return err
}
span.SetAttributes(attribute.Bool("recursive", settings.Recursive))
// Note: after unpin the pin sets are flushed to the blockstore, so we need
// to take a lock to prevent a concurrent garbage collection
defer api.blockstore.PinLock(ctx).Unlock(ctx)
if err = api.pinning.Unpin(ctx, rp.RootCid(), settings.Recursive); err != nil {
return err
}
return api.pinning.Flush(ctx)
}
func (api *PinAPI) Update(ctx context.Context, from path.Path, to path.Path, opts ...caopts.PinUpdateOption) error {
ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Update", trace.WithAttributes(
attribute.String("from", from.String()),
attribute.String("to", to.String()),
))
defer span.End()
settings, err := caopts.PinUpdateOptions(opts...)
if err != nil {
return err
}
span.SetAttributes(attribute.Bool("unpin", settings.Unpin))
fp, _, err := api.core().ResolvePath(ctx, from)
if err != nil {
return err
}
tp, _, err := api.core().ResolvePath(ctx, to)
if err != nil {
return err
}
defer api.blockstore.PinLock(ctx).Unlock(ctx)
err = api.pinning.Update(ctx, fp.RootCid(), tp.RootCid(), settings.Unpin)
if err != nil {
return err
}
return api.pinning.Flush(ctx)
}
type pinStatus struct {
err error
cid cid.Cid
ok bool
badNodes []coreiface.BadPinNode
}
// BadNode is used in PinVerifyRes
type badNode struct {
path path.ImmutablePath
err error
}
func (s *pinStatus) Ok() bool {
return s.ok
}
func (s *pinStatus) BadNodes() []coreiface.BadPinNode {
return s.badNodes
}
func (s *pinStatus) Err() error {
return s.err
}
func (n *badNode) Path() path.ImmutablePath {
return n.path
}
func (n *badNode) Err() error {
return n.err
}
func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, error) {
ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Verify")
defer span.End()
visited := make(map[cid.Cid]*pinStatus)
bs := api.blockstore
DAG := merkledag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
getLinks := merkledag.GetLinksWithDAG(DAG)
var checkPin func(root cid.Cid) *pinStatus
checkPin = func(root cid.Cid) *pinStatus {
ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Verify.CheckPin", trace.WithAttributes(attribute.String("cid", root.String())))
defer span.End()
if status, ok := visited[root]; ok {
return status
}
links, err := getLinks(ctx, root)
if err != nil {
status := &pinStatus{ok: false, cid: root}
status.badNodes = []coreiface.BadPinNode{&badNode{path: path.FromCid(root), err: err}}
visited[root] = status
return status
}
status := &pinStatus{ok: true, cid: root}
for _, lnk := range links {
res := checkPin(lnk.Cid)
if !res.ok {
status.ok = false
status.badNodes = append(status.badNodes, res.badNodes...)
}
}
visited[root] = status
return status
}
out := make(chan coreiface.PinStatus)
go func() {
defer close(out)
for p := range api.pinning.RecursiveKeys(ctx, false) {
var res *pinStatus
if p.Err != nil {
res = &pinStatus{err: p.Err}
} else {
res = checkPin(p.Pin.Key)
}
select {
case <-ctx.Done():
return
case out <- res:
}
}
}()
return out, nil
}
type pinInfo struct {
pinType string
path path.ImmutablePath
name string
}
func (p *pinInfo) Path() path.ImmutablePath {
return p.path
}
func (p *pinInfo) Type() string {
return p.pinType
}
func (p *pinInfo) Name() string {
return p.name
}
// pinLsAll is an internal function for returning a list of pins
//
// The caller must keep reading results until the channel is closed to prevent
// leaking the goroutine that is fetching pins.
func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool, name string, out chan<- coreiface.Pin) error {
defer close(out)
emittedSet := cid.NewSet()
AddToResultKeys := func(c cid.Cid, pinName, typeStr string) error {
if emittedSet.Visit(c) && (name == "" || strings.Contains(pinName, name)) {
select {
case out <- &pinInfo{
pinType: typeStr,
name: pinName,
path: path.FromCid(c),
}:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
var rkeys []cid.Cid
var err error
if typeStr == "recursive" || typeStr == "all" {
for streamedCid := range api.pinning.RecursiveKeys(ctx, detailed) {
if streamedCid.Err != nil {
return streamedCid.Err
}
if err = AddToResultKeys(streamedCid.Pin.Key, streamedCid.Pin.Name, "recursive"); err != nil {
return err
}
rkeys = append(rkeys, streamedCid.Pin.Key)
}
}
if typeStr == "direct" || typeStr == "all" {
for streamedCid := range api.pinning.DirectKeys(ctx, detailed) {
if streamedCid.Err != nil {
return streamedCid.Err
}
if err = AddToResultKeys(streamedCid.Pin.Key, streamedCid.Pin.Name, "direct"); err != nil {
return err
}
}
}
if typeStr == "indirect" {
// We need to first visit the direct pins that have priority
// without emitting them
for streamedCid := range api.pinning.DirectKeys(ctx, detailed) {
if streamedCid.Err != nil {
return streamedCid.Err
}
emittedSet.Add(streamedCid.Pin.Key)
}
for streamedCid := range api.pinning.RecursiveKeys(ctx, detailed) {
if streamedCid.Err != nil {
return streamedCid.Err
}
emittedSet.Add(streamedCid.Pin.Key)
rkeys = append(rkeys, streamedCid.Pin.Key)
}
}
if typeStr == "indirect" || typeStr == "all" {
if len(rkeys) == 0 {
return nil
}
var addErr error
walkingSet := cid.NewSet()
for _, k := range rkeys {
err = merkledag.Walk(
ctx, merkledag.GetLinksWithDAG(api.dag), k,
func(c cid.Cid) bool {
if !walkingSet.Visit(c) {
return false
}
if emittedSet.Has(c) {
return true // skipped
}
addErr = AddToResultKeys(c, "", "indirect")
return addErr == nil
},
merkledag.SkipRoot(), merkledag.Concurrent(),
)
if err != nil {
return err
}
if addErr != nil {
return addErr
}
}
}
return nil
}
func (api *PinAPI) core() coreiface.CoreAPI {
return (*CoreAPI)(api)
}