1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-20 02:21:48 +08:00

Merge pull request #5574 from ipfs/feat/coreapi/remove-cat

coreapi unixfs: remove Cat, use sessions
This commit is contained in:
Steven Allen
2018-10-23 00:56:39 +01:00
committed by GitHub
15 changed files with 106 additions and 80 deletions

View File

@ -6,7 +6,7 @@ import (
"io"
"os"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
"github.com/ipfs/go-ipfs/core/commands/cmdenv"
"github.com/ipfs/go-ipfs/core/coreapi/interface"
cmds "gx/ipfs/QmRRovo1DE6i5cMjCbf19mQCSuszF6SKwdZNUMS7MtBnH1/go-ipfs-cmds"
@ -135,32 +135,48 @@ func cat(ctx context.Context, api iface.CoreAPI, paths []string, offset int64, m
return nil, 0, err
}
read, err := api.Unixfs().Cat(ctx, fpath)
file, err := api.Unixfs().Get(ctx, fpath)
if err != nil {
return nil, 0, err
}
if offset > int64(read.Size()) {
offset = offset - int64(read.Size())
if file.IsDirectory() {
return nil, 0, iface.ErrIsDir
}
fsize, err := file.Size()
if err != nil {
return nil, 0, err
}
if offset > fsize {
offset = offset - fsize
continue
}
count, err := read.Seek(offset, io.SeekStart)
count, err := file.Seek(offset, io.SeekStart)
if err != nil {
return nil, 0, err
}
offset = 0
size := uint64(read.Size() - uint64(count))
fsize, err = file.Size()
if err != nil {
return nil, 0, err
}
size := uint64(fsize - count)
length += size
if max > 0 && length >= uint64(max) {
var r io.Reader = read
var r io.Reader = file
if overshoot := int64(length - uint64(max)); overshoot != 0 {
r = io.LimitReader(read, int64(size)-overshoot)
r = io.LimitReader(file, int64(size)-overshoot)
length = uint64(max)
}
readers = append(readers, r)
break
}
readers = append(readers, read)
readers = append(readers, file)
}
return readers, length, nil
}

View File

@ -14,21 +14,26 @@ Interfaces here aren't yet completely stable.
package coreapi
import (
"context"
core "github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
dag "gx/ipfs/QmVvNkTCx8V9Zei8xuTYTBdUXmbnDRS4iNuw1SztYyhQwQ/go-merkledag"
logging "gx/ipfs/QmZChCsSt8DctjceaL56Eibc29CVQq4dGKRXC5JRZ6Ppae/go-log"
ipld "gx/ipfs/QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL/go-ipld-format"
)
var log = logging.Logger("core/coreapi")
type CoreAPI struct {
node *core.IpfsNode
dag ipld.DAGService
}
// NewCoreAPI creates new instance of IPFS CoreAPI backed by go-ipfs Node.
func NewCoreAPI(n *core.IpfsNode) coreiface.CoreAPI {
api := &CoreAPI{n}
api := &CoreAPI{n, n.DAG}
return api
}
@ -81,3 +86,9 @@ func (api *CoreAPI) Swarm() coreiface.SwarmAPI {
func (api *CoreAPI) PubSub() coreiface.PubSubAPI {
return (*PubSubAPI)(api)
}
// getSession returns new api backed by the same node with a read-only session DAG
func (api *CoreAPI) getSession(ctx context.Context) *CoreAPI {
ng := dag.NewReadOnlyDagService(dag.NewSession(ctx, api.dag))
return &CoreAPI{api.node, ng}
}

View File

@ -31,7 +31,7 @@ type dagBatch struct {
func (api *DagAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.DagPutOption) (coreiface.ResolvedPath, error) {
nd, err := getNode(src, opts...)
err = api.node.DAG.Add(ctx, nd)
err = api.dag.Add(ctx, nd)
if err != nil {
return nil, err
}
@ -96,7 +96,7 @@ func (b *dagBatch) Commit(ctx context.Context) error {
b.toPut = nil
}()
return b.api.node.DAG.AddMany(ctx, b.toPut)
return b.api.dag.AddMany(ctx, b.toPut)
}
func getNode(src io.Reader, opts ...caopts.DagPutOption) (ipld.Node, error) {

View File

@ -2,6 +2,7 @@ package iface
import (
"context"
"io"
options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
@ -17,6 +18,11 @@ type AddEvent struct {
Size string `json:",omitempty"`
}
type UnixfsFile interface {
files.SizeFile
io.Seeker
}
// UnixfsAPI is the basic interface to immutable files in IPFS
// NOTE: This API is heavily WIP, things are guaranteed to break frequently
type UnixfsAPI interface {
@ -29,11 +35,7 @@ type UnixfsAPI interface {
//
// Note that some implementations of this API may apply the specified context
// to operations performed on the returned file
Get(context.Context, Path) (files.File, error)
// Cat returns a reader for the file
// TODO: Remove in favour of Get (if we use Get on a file we still have reader directly, so..)
Cat(context.Context, Path) (Reader, error)
Get(context.Context, Path) (UnixfsFile, error)
// Ls returns the list of links in a directory
Ls(context.Context, Path) ([]*ipld.Link, error)

View File

@ -50,7 +50,7 @@ func (api *ObjectAPI) New(ctx context.Context, opts ...caopts.ObjectNewOption) (
n = ft.EmptyDirNode()
}
err = api.node.DAG.Add(ctx, n)
err = api.dag.Add(ctx, n)
if err != nil {
return nil, err
}
@ -121,7 +121,7 @@ func (api *ObjectAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Obj
defer api.node.Blockstore.PinLock().Unlock()
}
err = api.node.DAG.Add(ctx, dagnode)
err = api.dag.Add(ctx, dagnode)
if err != nil {
return nil, err
}
@ -219,14 +219,14 @@ func (api *ObjectAPI) AddLink(ctx context.Context, base coreiface.Path, name str
createfunc = ft.EmptyDirNode
}
e := dagutils.NewDagEditor(basePb, api.node.DAG)
e := dagutils.NewDagEditor(basePb, api.dag)
err = e.InsertNodeAtPath(ctx, name, childNd, createfunc)
if err != nil {
return nil, err
}
nnode, err := e.Finalize(ctx, api.node.DAG)
nnode, err := e.Finalize(ctx, api.dag)
if err != nil {
return nil, err
}
@ -245,14 +245,14 @@ func (api *ObjectAPI) RmLink(ctx context.Context, base coreiface.Path, link stri
return nil, dag.ErrNotProtobuf
}
e := dagutils.NewDagEditor(basePb, api.node.DAG)
e := dagutils.NewDagEditor(basePb, api.dag)
err = e.RmLink(ctx, link)
if err != nil {
return nil, err
}
nnode, err := e.Finalize(ctx, api.node.DAG)
nnode, err := e.Finalize(ctx, api.dag)
if err != nil {
return nil, err
}
@ -289,7 +289,7 @@ func (api *ObjectAPI) patchData(ctx context.Context, path coreiface.Path, r io.R
}
pbnd.SetData(data)
err = api.node.DAG.Add(ctx, pbnd)
err = api.dag.Add(ctx, pbnd)
if err != nil {
return nil, err
}
@ -308,7 +308,7 @@ func (api *ObjectAPI) Diff(ctx context.Context, before coreiface.Path, after cor
return nil, err
}
changes, err := dagutils.Diff(ctx, api.node.DAG, beforeNd, afterNd)
changes, err := dagutils.Diff(ctx, api.dag, beforeNd, afterNd)
if err != nil {
return nil, err
}

View File

@ -7,12 +7,12 @@ import (
"github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
uio "gx/ipfs/QmWE6Ftsk98cG2MTVgH4wJT8VP2nL9TuBkYTrz9GSqcsh5/go-unixfs/io"
ipfspath "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path"
"gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path/resolver"
"gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
uio "gx/ipfs/QmWE6Ftsk98cG2MTVgH4wJT8VP2nL9TuBkYTrz9GSqcsh5/go-unixfs/io"
ipld "gx/ipfs/QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL/go-ipld-format"
ipfspath "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path"
"gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path/resolver"
)
// ResolveNode resolves the path `p` using Unixfs resolver, gets and returns the
@ -23,7 +23,7 @@ func (api *CoreAPI) ResolveNode(ctx context.Context, p coreiface.Path) (ipld.Nod
return nil, err
}
node, err := api.node.DAG.Get(ctx, rp.Cid())
node, err := api.dag.Get(ctx, rp.Cid())
if err != nil {
return nil, err
}
@ -57,7 +57,7 @@ func (api *CoreAPI) ResolvePath(ctx context.Context, p coreiface.Path) (coreifac
}
r := &resolver.Resolver{
DAG: api.node.DAG,
DAG: api.dag,
ResolveOnce: resolveOnce,
}

View File

@ -193,7 +193,7 @@ func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) ([]coreiface.Pi
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range api.node.Pinning.RecursiveKeys() {
err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(api.node.DAG), k, set.Visit)
err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(api.dag), k, set.Visit)
if err != nil {
return nil, err
}

View File

@ -8,6 +8,8 @@ import (
gopath "path"
"time"
"github.com/ipfs/go-ipfs/core/coreapi/interface"
dag "gx/ipfs/QmVvNkTCx8V9Zei8xuTYTBdUXmbnDRS4iNuw1SztYyhQwQ/go-merkledag"
ft "gx/ipfs/QmWE6Ftsk98cG2MTVgH4wJT8VP2nL9TuBkYTrz9GSqcsh5/go-unixfs"
uio "gx/ipfs/QmWE6Ftsk98cG2MTVgH4wJT8VP2nL9TuBkYTrz9GSqcsh5/go-unixfs/io"
@ -95,6 +97,14 @@ func (d *ufsDirectory) NextFile() (files.File, error) {
return newUnixfsFile(d.ctx, d.dserv, nd, l.Name, d)
}
func (d *ufsDirectory) Size() (int64, error) {
return 0, files.ErrNotReader
}
func (d *ufsDirectory) Seek(offset int64, whence int) (int64, error) {
return 0, files.ErrNotReader
}
type ufsFile struct {
uio.DagReader
@ -122,7 +132,7 @@ func (f *ufsFile) Size() (int64, error) {
return int64(f.DagReader.Size()), nil
}
func newUnixfsDir(ctx context.Context, dserv ipld.DAGService, nd ipld.Node, name string, path string) (files.File, error) {
func newUnixfsDir(ctx context.Context, dserv ipld.DAGService, nd ipld.Node, name string, path string) (iface.UnixfsFile, error) {
dir, err := uio.NewDirectoryFromNode(dserv, nd)
if err != nil {
return nil, err
@ -153,7 +163,7 @@ func newUnixfsDir(ctx context.Context, dserv ipld.DAGService, nd ipld.Node, name
}, nil
}
func newUnixfsFile(ctx context.Context, dserv ipld.DAGService, nd ipld.Node, name string, parent files.File) (files.File, error) {
func newUnixfsFile(ctx context.Context, dserv ipld.DAGService, nd ipld.Node, name string, parent files.File) (iface.UnixfsFile, error) {
path := name
if parent != nil {
path = gopath.Join(parent.FullPath(), name)

View File

@ -133,31 +133,15 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.File, opts ...options
return coreiface.IpfsPath(nd.Cid()), nil
}
func (api *UnixfsAPI) Get(ctx context.Context, p coreiface.Path) (files.File, error) {
nd, err := api.core().ResolveNode(ctx, p)
func (api *UnixfsAPI) Get(ctx context.Context, p coreiface.Path) (coreiface.UnixfsFile, error) {
ses := api.core().getSession(ctx)
nd, err := ses.ResolveNode(ctx, p)
if err != nil {
return nil, err
}
return newUnixfsFile(ctx, api.node.DAG, nd, "", nil)
}
// Cat returns the data contained by an IPFS or IPNS object(s) at path `p`.
func (api *UnixfsAPI) Cat(ctx context.Context, p coreiface.Path) (coreiface.Reader, error) {
dget := api.node.DAG // TODO: use a session here once routing perf issues are resolved
dagnode, err := api.core().ResolveNode(ctx, p)
if err != nil {
return nil, err
}
r, err := uio.NewDagReader(ctx, dagnode, dget)
if err == uio.ErrIsDir {
return nil, coreiface.ErrIsDir
} else if err != nil {
return nil, err
}
return r, nil
return newUnixfsFile(ctx, ses.dag, nd, "", nil)
}
// Ls returns the contents of an IPFS or IPNS object(s) at path p, with the format:
@ -169,7 +153,7 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p coreiface.Path) ([]*ipld.Link, e
}
var ndlinks []*ipld.Link
dir, err := uio.NewDirectoryFromNode(api.node.DAG, dagnode)
dir, err := uio.NewDirectoryFromNode(api.dag, dagnode)
switch err {
case nil:
l, err := dir.Links(ctx)
@ -190,6 +174,6 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p coreiface.Path) ([]*ipld.Link, e
return links, nil
}
func (api *UnixfsAPI) core() coreiface.CoreAPI {
func (api *UnixfsAPI) core() *CoreAPI {
return (*CoreAPI)(api)
}

View File

@ -623,7 +623,7 @@ func TestAddHashOnly(t *testing.T) {
}
}
func TestCatEmptyFile(t *testing.T) {
func TestGetEmptyFile(t *testing.T) {
ctx := context.Background()
node, api, err := makeAPI(ctx)
if err != nil {
@ -640,7 +640,7 @@ func TestCatEmptyFile(t *testing.T) {
t.Fatal(err)
}
r, err := api.Unixfs().Cat(ctx, emptyFilePath)
r, err := api.Unixfs().Get(ctx, emptyFilePath)
if err != nil {
t.Fatal(err)
}
@ -655,7 +655,7 @@ func TestCatEmptyFile(t *testing.T) {
}
}
func TestCatDir(t *testing.T) {
func TestGetDir(t *testing.T) {
ctx := context.Background()
node, api, err := makeAPI(ctx)
if err != nil {
@ -677,13 +677,18 @@ func TestCatDir(t *testing.T) {
t.Fatalf("expected path %s, got: %s", emptyDir.Cid(), p.String())
}
_, err = api.Unixfs().Cat(ctx, coreiface.IpfsPath(emptyDir.Cid()))
if err != coreiface.ErrIsDir {
r, err := api.Unixfs().Get(ctx, coreiface.IpfsPath(emptyDir.Cid()))
if err != nil {
t.Error(err)
}
_, err = r.Read(make([]byte, 2))
if err != files.ErrNotReader {
t.Fatalf("expected ErrIsDir, got: %s", err)
}
}
func TestCatNonUnixfs(t *testing.T) {
func TestGetNonUnixfs(t *testing.T) {
ctx := context.Background()
node, api, err := makeAPI(ctx)
if err != nil {
@ -696,7 +701,7 @@ func TestCatNonUnixfs(t *testing.T) {
t.Error(err)
}
_, err = api.Unixfs().Cat(ctx, coreiface.IpfsPath(nd.Cid()))
_, err = api.Unixfs().Get(ctx, coreiface.IpfsPath(nd.Cid()))
if !strings.Contains(err.Error(), "proto: required field") {
t.Fatalf("expected protobuf error, got: %s", err)
}
@ -713,7 +718,7 @@ func TestCatOffline(t *testing.T) {
if err != nil {
t.Error(err)
}
_, err = api.Unixfs().Cat(ctx, p)
_, err = api.Unixfs().Get(ctx, p)
if err != coreiface.ErrOffline {
t.Fatalf("expected ErrOffline, got: %s", err)
}

View File

@ -178,19 +178,17 @@ func (i *gatewayHandler) getOrHeadHandler(ctx context.Context, w http.ResponseWr
return
}
dr, err := i.api.Unixfs().Cat(ctx, resolvedPath)
dir := false
switch err {
case nil:
// Cat() worked
defer dr.Close()
case coreiface.ErrIsDir:
dir = true
default:
dr, err := i.api.Unixfs().Get(ctx, resolvedPath)
if err != nil {
webError(w, "ipfs cat "+escapedURLPath, err, http.StatusNotFound)
return
}
dir := dr.IsDirectory()
if !dir {
defer dr.Close()
}
// Check etag send back to us
etag := "\"" + resolvedPath.Cid().String() + "\""
if r.Header.Get("If-None-Match") == etag || r.Header.Get("If-None-Match") == "W/"+etag {
@ -297,7 +295,7 @@ func (i *gatewayHandler) getOrHeadHandler(ctx context.Context, w http.ResponseWr
return
}
dr, err := i.api.Unixfs().Cat(ctx, coreiface.IpfsPath(ixnd.Cid()))
dr, err := i.api.Unixfs().Get(ctx, coreiface.IpfsPath(ixnd.Cid()))
if err != nil {
internalWebError(w, err)
return
@ -372,7 +370,7 @@ func (i *gatewayHandler) getOrHeadHandler(ctx context.Context, w http.ResponseWr
}
type sizeReadSeeker interface {
Size() uint64
Size() (int64, error)
io.ReadSeeker
}
@ -383,7 +381,7 @@ type sizeSeeker struct {
func (s *sizeSeeker) Seek(offset int64, whence int) (int64, error) {
if whence == io.SeekEnd && offset == 0 {
return int64(s.Size()), nil
return s.Size()
}
return s.sizeReadSeeker.Seek(offset, whence)

View File

@ -175,7 +175,7 @@ func TestIpfsStressRead(t *testing.T) {
errs <- err
}
read, err := api.Unixfs().Cat(nd.Context(), item)
read, err := api.Unixfs().Get(nd.Context(), item)
if err != nil {
errs <- err
}

View File

@ -147,7 +147,7 @@ func DirectAddCat(data []byte, conf testutil.LatencyConfig) error {
return err
}
readerCatted, err := catterApi.Unixfs().Cat(ctx, ap)
readerCatted, err := catterApi.Unixfs().Get(ctx, ap)
if err != nil {
return err
}

View File

@ -94,7 +94,7 @@ func benchCat(b *testing.B, data []byte, conf testutil.LatencyConfig) error {
}
b.StartTimer()
readerCatted, err := catterApi.Unixfs().Cat(ctx, ap)
readerCatted, err := catterApi.Unixfs().Get(ctx, ap)
if err != nil {
return err
}

View File

@ -126,7 +126,7 @@ func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error {
return err
}
readerCatted, err := catterApi.Unixfs().Cat(ctx, ap)
readerCatted, err := catterApi.Unixfs().Get(ctx, ap)
if err != nil {
return err
}