1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-05-17 06:57:40 +08:00
Files
kubo/core/coreapi/unixfs.go
Hector Sanjuan 6b55e64918 feat(config): ipfs add and Import options for controling UnixFS DAG Width (#10774)
Co-authored-by: Marcin Rataj <lidel@lidel.org>
2025-04-15 22:56:38 +02:00

366 lines
11 KiB
Go

package coreapi
import (
"context"
"errors"
"fmt"
blockservice "github.com/ipfs/boxo/blockservice"
bstore "github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/files"
filestore "github.com/ipfs/boxo/filestore"
merkledag "github.com/ipfs/boxo/ipld/merkledag"
dagtest "github.com/ipfs/boxo/ipld/merkledag/test"
ft "github.com/ipfs/boxo/ipld/unixfs"
unixfile "github.com/ipfs/boxo/ipld/unixfs/file"
uio "github.com/ipfs/boxo/ipld/unixfs/io"
"github.com/ipfs/boxo/mfs"
"github.com/ipfs/boxo/path"
cid "github.com/ipfs/go-cid"
cidutil "github.com/ipfs/go-cidutil"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/kubo/config"
coreiface "github.com/ipfs/kubo/core/coreiface"
options "github.com/ipfs/kubo/core/coreiface/options"
"github.com/ipfs/kubo/core/coreunix"
"github.com/ipfs/kubo/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
type UnixfsAPI CoreAPI
// Add builds a merkledag node from a reader, adds it to the blockstore,
// and returns the key representing that node.
func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options.UnixfsAddOption) (path.ImmutablePath, error) {
ctx, span := tracing.Span(ctx, "CoreAPI.UnixfsAPI", "Add")
defer span.End()
settings, prefix, err := options.UnixfsAddOptions(opts...)
if err != nil {
return path.ImmutablePath{}, err
}
span.SetAttributes(
attribute.String("chunker", settings.Chunker),
attribute.Int("cidversion", settings.CidVersion),
attribute.Bool("inline", settings.Inline),
attribute.Int("inlinelimit", settings.InlineLimit),
attribute.Bool("rawleaves", settings.RawLeaves),
attribute.Bool("rawleavesset", settings.RawLeavesSet),
attribute.Int("maxfilelinks", settings.MaxFileLinks),
attribute.Bool("maxfilelinksset", settings.MaxFileLinksSet),
attribute.Int("maxdirectorylinks", settings.MaxDirectoryLinks),
attribute.Bool("maxdirectorylinksset", settings.MaxDirectoryLinksSet),
attribute.Int("maxhamtfanout", settings.MaxHAMTFanout),
attribute.Bool("maxhamtfanoutset", settings.MaxHAMTFanoutSet),
attribute.Int("layout", int(settings.Layout)),
attribute.Bool("pin", settings.Pin),
attribute.Bool("onlyhash", settings.OnlyHash),
attribute.Bool("fscache", settings.FsCache),
attribute.Bool("nocopy", settings.NoCopy),
attribute.Bool("silent", settings.Silent),
attribute.Bool("progress", settings.Progress),
)
cfg, err := api.repo.Config()
if err != nil {
return path.ImmutablePath{}, err
}
// check if repo will exceed storage limit if added
// TODO: this doesn't handle the case if the hashed file is already in blocks (deduplicated)
// TODO: conditional GC is disabled due to it is somehow not possible to pass the size to the daemon
//if err := corerepo.ConditionalGC(req.Context(), n, uint64(size)); err != nil {
// res.SetError(err, cmds.ErrNormal)
// return
//}
if settings.NoCopy && !(cfg.Experimental.FilestoreEnabled || cfg.Experimental.UrlstoreEnabled) {
return path.ImmutablePath{}, errors.New("either the filestore or the urlstore must be enabled to use nocopy, see: https://github.com/ipfs/kubo/blob/master/docs/experimental-features.md#ipfs-filestore")
}
addblockstore := api.blockstore
if !(settings.FsCache || settings.NoCopy) {
addblockstore = bstore.NewGCBlockstore(api.baseBlocks, api.blockstore)
}
exch := api.exchange
pinning := api.pinning
if settings.OnlyHash {
// setup a /dev/null pipeline to simulate adding the data
dstore := dssync.MutexWrap(ds.NewNullDatastore())
bs := bstore.NewBlockstore(dstore, bstore.WriteThrough(true)) // we use NewNullDatastore, so ok to always WriteThrough when OnlyHash
addblockstore = bstore.NewGCBlockstore(bs, nil) // gclocker will never be used
exch = nil // exchange will never be used
pinning = nil // pinner will never be used
}
bserv := blockservice.New(addblockstore, exch,
blockservice.WriteThrough(cfg.Datastore.WriteThrough.WithDefault(config.DefaultWriteThrough)),
) // hash security 001
dserv := merkledag.NewDAGService(bserv)
// add a sync call to the DagService
// this ensures that data written to the DagService is persisted to the underlying datastore
// TODO: propagate the Sync function from the datastore through the blockstore, blockservice and dagservice
var syncDserv *syncDagService
if settings.OnlyHash {
syncDserv = &syncDagService{
DAGService: dserv,
syncFn: func() error { return nil },
}
} else {
syncDserv = &syncDagService{
DAGService: dserv,
syncFn: func() error {
rds := api.repo.Datastore()
if err := rds.Sync(ctx, bstore.BlockPrefix); err != nil {
return err
}
return rds.Sync(ctx, filestore.FilestorePrefix)
},
}
}
fileAdder, err := coreunix.NewAdder(ctx, pinning, addblockstore, syncDserv)
if err != nil {
return path.ImmutablePath{}, err
}
fileAdder.Chunker = settings.Chunker
if settings.Events != nil {
fileAdder.Out = settings.Events
fileAdder.Progress = settings.Progress
}
fileAdder.Pin = settings.Pin && !settings.OnlyHash
fileAdder.Silent = settings.Silent
fileAdder.RawLeaves = settings.RawLeaves
if settings.MaxFileLinksSet {
fileAdder.MaxLinks = settings.MaxFileLinks
}
if settings.MaxDirectoryLinksSet {
fileAdder.MaxDirectoryLinks = settings.MaxDirectoryLinks
}
if settings.MaxHAMTFanoutSet {
fileAdder.MaxHAMTFanout = settings.MaxHAMTFanout
}
fileAdder.NoCopy = settings.NoCopy
fileAdder.CidBuilder = prefix
fileAdder.PreserveMode = settings.PreserveMode
fileAdder.PreserveMtime = settings.PreserveMtime
fileAdder.FileMode = settings.Mode
fileAdder.FileMtime = settings.Mtime
switch settings.Layout {
case options.BalancedLayout:
// Default
case options.TrickleLayout:
fileAdder.Trickle = true
default:
return path.ImmutablePath{}, fmt.Errorf("unknown layout: %d", settings.Layout)
}
if settings.Inline {
fileAdder.CidBuilder = cidutil.InlineBuilder{
Builder: fileAdder.CidBuilder,
Limit: settings.InlineLimit,
}
}
if settings.OnlyHash {
md := dagtest.Mock()
emptyDirNode := ft.EmptyDirNode()
// Use the same prefix for the "empty" MFS root as for the file adder.
err := emptyDirNode.SetCidBuilder(fileAdder.CidBuilder)
if err != nil {
return path.ImmutablePath{}, err
}
mr, err := mfs.NewRoot(ctx, md, emptyDirNode, nil)
if err != nil {
return path.ImmutablePath{}, err
}
fileAdder.SetMfsRoot(mr)
}
nd, err := fileAdder.AddAllAndPin(ctx, files)
if err != nil {
return path.ImmutablePath{}, err
}
if !settings.OnlyHash {
if err := api.provider.Provide(ctx, nd.Cid(), true); err != nil {
return path.ImmutablePath{}, err
}
}
return path.FromCid(nd.Cid()), nil
}
func (api *UnixfsAPI) Get(ctx context.Context, p path.Path) (files.Node, error) {
ctx, span := tracing.Span(ctx, "CoreAPI.UnixfsAPI", "Get", trace.WithAttributes(attribute.String("path", p.String())))
defer span.End()
ses := api.core().getSession(ctx)
nd, err := ses.ResolveNode(ctx, p)
if err != nil {
return nil, err
}
return unixfile.NewUnixfsFile(ctx, ses.dag, nd)
}
// Ls returns the contents of an IPFS or IPNS object(s) at path p, with the format:
// `<link base58 hash> <link size in bytes> <link name>`
func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, out chan<- coreiface.DirEntry, opts ...options.UnixfsLsOption) error {
ctx, span := tracing.Span(ctx, "CoreAPI.UnixfsAPI", "Ls", trace.WithAttributes(attribute.String("path", p.String())))
defer span.End()
defer close(out)
settings, err := options.UnixfsLsOptions(opts...)
if err != nil {
return err
}
span.SetAttributes(attribute.Bool("resolvechildren", settings.ResolveChildren))
ses := api.core().getSession(ctx)
uses := (*UnixfsAPI)(ses)
dagnode, err := ses.ResolveNode(ctx, p)
if err != nil {
return err
}
dir, err := uio.NewDirectoryFromNode(ses.dag, dagnode)
if err != nil {
if errors.Is(err, uio.ErrNotADir) {
return uses.lsFromLinks(ctx, dagnode.Links(), settings, out)
}
return err
}
return uses.lsFromDirLinks(ctx, dir, settings, out)
}
func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, settings *options.UnixfsLsSettings) (coreiface.DirEntry, error) {
ctx, span := tracing.Span(ctx, "CoreAPI.UnixfsAPI", "ProcessLink")
defer span.End()
if linkres.Link != nil {
span.SetAttributes(attribute.String("linkname", linkres.Link.Name), attribute.String("cid", linkres.Link.Cid.String()))
}
if linkres.Err != nil {
return coreiface.DirEntry{}, linkres.Err
}
lnk := coreiface.DirEntry{
Name: linkres.Link.Name,
Cid: linkres.Link.Cid,
}
switch lnk.Cid.Type() {
case cid.Raw:
// No need to check with raw leaves
lnk.Type = coreiface.TFile
lnk.Size = linkres.Link.Size
case cid.DagProtobuf:
if settings.ResolveChildren {
linkNode, err := linkres.Link.GetNode(ctx, api.dag)
if err != nil {
return coreiface.DirEntry{}, err
}
if pn, ok := linkNode.(*merkledag.ProtoNode); ok {
d, err := ft.FSNodeFromBytes(pn.Data())
if err != nil {
return coreiface.DirEntry{}, err
}
switch d.Type() {
case ft.TFile, ft.TRaw:
lnk.Type = coreiface.TFile
case ft.THAMTShard, ft.TDirectory, ft.TMetadata:
lnk.Type = coreiface.TDirectory
case ft.TSymlink:
lnk.Type = coreiface.TSymlink
lnk.Target = string(d.Data())
}
if !settings.UseCumulativeSize {
lnk.Size = d.FileSize()
}
lnk.Mode = d.Mode()
lnk.ModTime = d.ModTime()
}
}
if settings.UseCumulativeSize {
lnk.Size = linkres.Link.Size
}
}
return lnk, nil
}
func (api *UnixfsAPI) lsFromDirLinks(ctx context.Context, dir uio.Directory, settings *options.UnixfsLsSettings, out chan<- coreiface.DirEntry) error {
for l := range dir.EnumLinksAsync(ctx) {
dirEnt, err := api.processLink(ctx, l, settings) // TODO: perf: processing can be done in background and in parallel
if err != nil {
return err
}
select {
case out <- dirEnt:
case <-ctx.Done():
return nil
}
}
return nil
}
func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, settings *options.UnixfsLsSettings, out chan<- coreiface.DirEntry) error {
// Create links channel large enough to not block when writing to out is slower.
links := make(chan coreiface.DirEntry, len(ndlinks))
errs := make(chan error, 1)
go func() {
defer close(links)
defer close(errs)
for _, l := range ndlinks {
lr := ft.LinkResult{Link: &ipld.Link{Name: l.Name, Size: l.Size, Cid: l.Cid}}
lnk, err := api.processLink(ctx, lr, settings) // TODO: can be parallel if settings.Async
if err != nil {
errs <- err
return
}
select {
case links <- lnk:
case <-ctx.Done():
return
}
}
}()
for lnk := range links {
out <- lnk
}
return <-errs
}
func (api *UnixfsAPI) core() *CoreAPI {
return (*CoreAPI)(api)
}
// syncDagService is used by the Adder to ensure blocks get persisted to the underlying datastore
type syncDagService struct {
ipld.DAGService
syncFn func() error
}
func (s *syncDagService) Sync() error {
return s.syncFn()
}