package coreapi
import (
"context"
"fmt"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/filestore"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
"github.com/ipfs/go-ipfs/core/coreapi/interface/options"
"github.com/ipfs/go-ipfs/core/coreunix"
ft "gx/ipfs/QmQ1JnYpnzkaurjW1yxkQxC2w3K1PorNE1nv1vaP5Le7sq/go-unixfs"
unixfile "gx/ipfs/QmQ1JnYpnzkaurjW1yxkQxC2w3K1PorNE1nv1vaP5Le7sq/go-unixfs/file"
uio "gx/ipfs/QmQ1JnYpnzkaurjW1yxkQxC2w3K1PorNE1nv1vaP5Le7sq/go-unixfs/io"
mfs "gx/ipfs/QmR66iEqVtNMbbZxTHPY3F6W5QLFqZEDbFD7gzbE9HpYXU/go-mfs"
cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
ipld "gx/ipfs/QmRL22E4paat7ky7vx9MLpR97JHHbFPrg3ytFQw6qp1y1s/go-ipld-format"
bstore "gx/ipfs/QmS2aqUZLJp8kF1ihE5rvDGE5LvmKDPnx32w9Z1BW9xLV5/go-ipfs-blockstore"
blockservice "gx/ipfs/QmVKQHuzni68SWByzJgBUCwHvvr4TWiXfutNWWwpZpp4rE/go-blockservice"
files "gx/ipfs/QmaXvvAVAQ5ABqM5xtjYmV85xmN5MkWAZsX9H9Fwo4FVXp/go-ipfs-files"
dag "gx/ipfs/Qmb2UEG2TAeVrEJSjqsZF7Y2he7wRDkrdt6c3bECxwZf4k/go-merkledag"
merkledag "gx/ipfs/Qmb2UEG2TAeVrEJSjqsZF7Y2he7wRDkrdt6c3bECxwZf4k/go-merkledag"
dagtest "gx/ipfs/Qmb2UEG2TAeVrEJSjqsZF7Y2he7wRDkrdt6c3bECxwZf4k/go-merkledag/test"
cidutil "gx/ipfs/QmdPQx9fvN5ExVwMhRmh7YpCQJzJrFhd1AjVBwJmRMFJeX/go-cidutil"
)
type UnixfsAPI CoreAPI
// Add builds a merkledag node from a reader, adds it to the blockstore,
// and returns the key representing that node.
func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options.UnixfsAddOption) (coreiface.ResolvedPath, error) {
settings, prefix, err := options.UnixfsAddOptions(opts...)
if err != nil {
return nil, err
}
cfg, err := api.repo.Config()
if err != nil {
return nil, err
}
// check if repo will exceed storage limit if added
// TODO: this doesn't handle the case if the hashed file is already in blocks (deduplicated)
// TODO: conditional GC is disabled due to it is somehow not possible to pass the size to the daemon
//if err := corerepo.ConditionalGC(req.Context(), n, uint64(size)); err != nil {
// res.SetError(err, cmdkit.ErrNormal)
// return
//}
if settings.NoCopy && !cfg.Experimental.FilestoreEnabled {
return nil, filestore.ErrFilestoreNotEnabled
}
addblockstore := api.blockstore
if !(settings.FsCache || settings.NoCopy) {
addblockstore = bstore.NewGCBlockstore(api.baseBlocks, api.blockstore)
}
exch := api.exchange
pinning := api.pinning
if settings.OnlyHash {
nilnode, err := core.NewNode(ctx, &core.BuildCfg{
//TODO: need this to be true or all files
// hashed will be stored in memory!
NilRepo: true,
})
if err != nil {
return nil, err
}
addblockstore = nilnode.Blockstore
exch = nilnode.Exchange
pinning = nilnode.Pinning
}
bserv := blockservice.New(addblockstore, exch) // hash security 001
dserv := dag.NewDAGService(bserv)
fileAdder, err := coreunix.NewAdder(ctx, pinning, addblockstore, dserv)
if err != nil {
return nil, err
}
fileAdder.Chunker = settings.Chunker
if settings.Events != nil {
fileAdder.Out = settings.Events
fileAdder.Progress = settings.Progress
}
fileAdder.Hidden = settings.Hidden
fileAdder.Wrap = settings.Wrap
fileAdder.Pin = settings.Pin && !settings.OnlyHash
fileAdder.Silent = settings.Silent
fileAdder.RawLeaves = settings.RawLeaves
fileAdder.NoCopy = settings.NoCopy
fileAdder.Name = settings.StdinName
fileAdder.CidBuilder = prefix
switch settings.Layout {
case options.BalancedLayout:
// Default
case options.TrickleLayout:
fileAdder.Trickle = true
default:
return nil, fmt.Errorf("unknown layout: %d", settings.Layout)
}
if settings.Inline {
fileAdder.CidBuilder = cidutil.InlineBuilder{
Builder: fileAdder.CidBuilder,
Limit: settings.InlineLimit,
}
}
if settings.OnlyHash {
md := dagtest.Mock()
emptyDirNode := ft.EmptyDirNode()
// Use the same prefix for the "empty" MFS root as for the file adder.
emptyDirNode.SetCidBuilder(fileAdder.CidBuilder)
mr, err := mfs.NewRoot(ctx, md, emptyDirNode, nil)
if err != nil {
return nil, err
}
fileAdder.SetMfsRoot(mr)
}
nd, err := fileAdder.AddAllAndPin(files)
if err != nil {
return nil, err
}
return coreiface.IpfsPath(nd.Cid()), nil
}
func (api *UnixfsAPI) Get(ctx context.Context, p coreiface.Path) (files.Node, error) {
ses := api.core().getSession(ctx)
nd, err := ses.ResolveNode(ctx, p)
if err != nil {
return nil, err
}
return unixfile.NewUnixfsFile(ctx, ses.dag, nd)
}
// Ls returns the contents of an IPFS or IPNS object(s) at path p, with the format:
// ` `
func (api *UnixfsAPI) Ls(ctx context.Context, p coreiface.Path, opts ...options.UnixfsLsOption) (<-chan coreiface.LsLink, error) {
settings, err := options.UnixfsLsOptions(opts...)
if err != nil {
return nil, err
}
ses := api.core().getSession(ctx)
uses := (*UnixfsAPI)(ses)
dagnode, err := ses.ResolveNode(ctx, p)
if err != nil {
return nil, err
}
dir, err := uio.NewDirectoryFromNode(ses.dag, dagnode)
if err == uio.ErrNotADir {
return uses.lsFromLinks(ctx, dagnode.Links(), settings)
}
if err != nil {
return nil, err
}
return uses.lsFromLinksAsync(ctx, dir, settings)
}
func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, settings *options.UnixfsLsSettings) coreiface.LsLink {
lnk := coreiface.LsLink{
Link: linkres.Link,
Err: linkres.Err,
}
if lnk.Err != nil {
return lnk
}
switch lnk.Link.Cid.Type() {
case cid.Raw:
// No need to check with raw leaves
lnk.Type = coreiface.TFile
lnk.Size = lnk.Link.Size
case cid.DagProtobuf:
if !settings.ResolveChildren {
break
}
linkNode, err := lnk.Link.GetNode(ctx, api.dag)
if err != nil {
lnk.Err = err
break
}
if pn, ok := linkNode.(*merkledag.ProtoNode); ok {
d, err := ft.FSNodeFromBytes(pn.Data())
if err != nil {
lnk.Err = err
break
}
lnk.Type = coreiface.FileType(d.Type())
lnk.Size = d.FileSize()
}
}
return lnk
}
func (api *UnixfsAPI) lsFromLinksAsync(ctx context.Context, dir uio.Directory, settings *options.UnixfsLsSettings) (<-chan coreiface.LsLink, error) {
out := make(chan coreiface.LsLink)
go func() {
defer close(out)
for l := range dir.EnumLinksAsync(ctx) {
select {
case out <- api.processLink(ctx, l, settings): //TODO: perf: processing can be done in background and in parallel
case <-ctx.Done():
return
}
}
}()
return out, nil
}
func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, settings *options.UnixfsLsSettings) (<-chan coreiface.LsLink, error) {
links := make(chan coreiface.LsLink, len(ndlinks))
for _, l := range ndlinks {
lr := ft.LinkResult{Link: &ipld.Link{Name: l.Name, Size: l.Size, Cid: l.Cid}}
links <- api.processLink(ctx, lr, settings) //TODO: can be parallel if settings.Async
}
close(links)
return links, nil
}
func (api *UnixfsAPI) core() *CoreAPI {
return (*CoreAPI)(api)
}