
Merge pull request #3629 from ipfs/feat/filestore0

Implement basic filestore 'no-copy' functionality
Jeromy Johnson authored on 2017-03-06 00:53:47 -08:00; committed by GitHub
26 changed files with 881 additions and 37 deletions
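In short, the feature is exercised in two steps; a sketch distilled from the sharness test at the end of this diff (the file name is illustrative):

    # opt in: the filestore is off unless the experiment flag is set
    ipfs config --json Experimental.FilestoreEnabled true

    # add by reference instead of copying; --nocopy implies --raw-leaves
    ipfs add --nocopy --raw-leaves somefile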

View File

@ -163,7 +163,11 @@ func (bs *blockstore) Has(k *cid.Cid) (bool, error) {
}
func (s *blockstore) DeleteBlock(k *cid.Cid) error {
return s.datastore.Delete(dshelp.CidToDsKey(k))
err := s.datastore.Delete(dshelp.CidToDsKey(k))
if err == ds.ErrNotFound {
return ErrNotFound
}
return err
}
// AllKeysChan runs a query for keys from the blockstore.

View File

@ -19,10 +19,10 @@ type File interface {
// they are not directories
io.ReadCloser
// FileName returns a filename path associated with this file
// FileName returns a filename associated with this file
FileName() string
// FullPath returns the full path in the os associated with this file
// FullPath returns the full path used when adding this file
FullPath() string
// IsDirectory returns true if the File is a directory (and therefore
@ -57,6 +57,6 @@ type SizeFile interface {
}
type FileInfo interface {
FullPath() string
AbsPath() string
Stat() os.FileInfo
}

View File

@ -14,6 +14,7 @@ const (
applicationDirectory = "application/x-directory"
applicationSymlink = "application/symlink"
applicationFile = "application/octet-stream"
contentTypeHeader = "Content-Type"
)
@ -34,7 +35,8 @@ func NewFileFromPart(part *multipart.Part) (File, error) {
}
contentType := part.Header.Get(contentTypeHeader)
if contentType == applicationSymlink {
switch contentType {
case applicationSymlink:
out, err := ioutil.ReadAll(part)
if err != nil {
return nil, err
@ -44,6 +46,13 @@ func NewFileFromPart(part *multipart.Part) (File, error) {
Target: string(out),
name: f.FileName(),
}, nil
case applicationFile:
return &ReaderFile{
reader: part,
filename: f.FileName(),
abspath: part.Header.Get("abspath"),
fullpath: f.FullPath(),
}, nil
}
var err error

View File

@ -4,6 +4,7 @@ import (
"errors"
"io"
"os"
"path/filepath"
)
// ReaderFile is an implementation of File created from an `io.Reader`.
@ -11,12 +12,22 @@ import (
type ReaderFile struct {
filename string
fullpath string
abspath string
reader io.ReadCloser
stat os.FileInfo
}
func NewReaderFile(filename, path string, reader io.ReadCloser, stat os.FileInfo) *ReaderFile {
return &ReaderFile{filename, path, reader, stat}
return &ReaderFile{filename, path, path, reader, stat}
}
func NewReaderPathFile(filename, path string, reader io.ReadCloser, stat os.FileInfo) (*ReaderFile, error) {
abspath, err := filepath.Abs(path)
if err != nil {
return nil, err
}
return &ReaderFile{filename, path, abspath, reader, stat}, nil
}
func (f *ReaderFile) IsDirectory() bool {
@ -35,6 +46,10 @@ func (f *ReaderFile) FullPath() string {
return f.fullpath
}
func (f *ReaderFile) AbsPath() string {
return f.abspath
}
func (f *ReaderFile) Read(p []byte) (int, error) {
return f.reader.Read(p)
}

View File

@ -23,13 +23,14 @@ type serialFile struct {
}
func NewSerialFile(name, path string, hidden bool, stat os.FileInfo) (File, error) {
switch mode := stat.Mode(); {
case mode.IsRegular():
file, err := os.Open(path)
if err != nil {
return nil, err
}
return NewReaderFile(name, path, file, stat), nil
return NewReaderPathFile(name, path, file, stat)
case mode.IsDir():
// for directories, stat all of the contents first, so we know what files to
// open when NextFile() is called

View File

@ -95,6 +95,9 @@ func (mfr *MultiFileReader) Read(buf []byte) (written int, err error) {
header.Set("Content-Disposition", fmt.Sprintf("file; filename=\"%s\"", filename))
header.Set("Content-Type", contentType)
if rf, ok := file.(*files.ReaderFile); ok {
header.Set("abspath", rf.AbsPath())
}
_, err := mfr.mpWriter.CreatePart(header)
if err != nil {

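Together with the multipartfile.go change above, this completes a small client/daemon handshake over the multipart request; condensed, both halves of it (lines taken from this diff):

    // client side (multifilereader.go): advertise the file's absolute path
    header.Set("abspath", rf.AbsPath())

    // daemon side (multipartfile.go): rebuild a ReaderFile carrying that path
    abspath: part.Header.Get("abspath"),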
View File

@ -12,6 +12,7 @@ import (
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
bserv "github.com/ipfs/go-ipfs/blockservice"
offline "github.com/ipfs/go-ipfs/exchange/offline"
filestore "github.com/ipfs/go-ipfs/filestore"
dag "github.com/ipfs/go-ipfs/merkledag"
path "github.com/ipfs/go-ipfs/path"
pin "github.com/ipfs/go-ipfs/pin"
@ -166,8 +167,8 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
TempErrFunc: isTooManyFDError,
}
var err error
bs := bstore.NewBlockstore(rds)
opts := bstore.DefaultCacheOpts()
conf, err := n.Repo.Config()
if err != nil {
@ -184,7 +185,14 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
return err
}
n.Blockstore = bstore.NewGCBlockstore(cbs, bstore.NewGCLocker())
n.BaseBlocks = cbs
n.GCLocker = bstore.NewGCLocker()
n.Blockstore = bstore.NewGCBlockstore(cbs, n.GCLocker)
if conf.Experimental.FilestoreEnabled {
n.Filestore = filestore.NewFilestore(bs, n.Repo.FileManager())
n.Blockstore = bstore.NewGCBlockstore(n.Filestore, n.GCLocker)
}
rcfg, err := n.Repo.Config()
if err != nil {

View File

@ -7,6 +7,7 @@ import (
"github.com/ipfs/go-ipfs/core/coreunix"
"gx/ipfs/QmeWjRodbcZFKe5tMN7poEx3izym6osrLSnTLf9UjJZBbs/pb"
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
blockservice "github.com/ipfs/go-ipfs/blockservice"
cmds "github.com/ipfs/go-ipfs/commands"
files "github.com/ipfs/go-ipfs/commands/files"
@ -23,16 +24,18 @@ import (
var ErrDepthLimitExceeded = fmt.Errorf("depth limit exceeded")
const (
quietOptionName = "quiet"
silentOptionName = "silent"
progressOptionName = "progress"
trickleOptionName = "trickle"
wrapOptionName = "wrap-with-directory"
hiddenOptionName = "hidden"
onlyHashOptionName = "only-hash"
chunkerOptionName = "chunker"
pinOptionName = "pin"
rawLeavesOptionName = "raw-leaves"
quietOptionName = "quiet"
silentOptionName = "silent"
progressOptionName = "progress"
trickleOptionName = "trickle"
wrapOptionName = "wrap-with-directory"
hiddenOptionName = "hidden"
onlyHashOptionName = "only-hash"
chunkerOptionName = "chunker"
pinOptionName = "pin"
rawLeavesOptionName = "raw-leaves"
noCopyOptionName = "nocopy"
fstoreCacheOptionName = "fscache"
)
var AddCmd = &cmds.Command{
@ -78,6 +81,8 @@ You can now refer to the added file in a gateway, like so:
cmds.StringOption(chunkerOptionName, "s", "Chunking algorithm to use."),
cmds.BoolOption(pinOptionName, "Pin this object when adding.").Default(true),
cmds.BoolOption(rawLeavesOptionName, "Use raw blocks for leaf nodes. (experimental)"),
cmds.BoolOption(noCopyOptionName, "Add the file using filestore. (experimental)"),
cmds.BoolOption(fstoreCacheOptionName, "Check the filestore for pre-existing blocks. (experimental)"),
},
PreRun: func(req cmds.Request) error {
quiet, _, _ := req.Option(quietOptionName).Bool()
@ -139,7 +144,18 @@ You can now refer to the added file in a gateway, like so:
silent, _, _ := req.Option(silentOptionName).Bool()
chunker, _, _ := req.Option(chunkerOptionName).String()
dopin, _, _ := req.Option(pinOptionName).Bool()
rawblks, _, _ := req.Option(rawLeavesOptionName).Bool()
rawblks, rbset, _ := req.Option(rawLeavesOptionName).Bool()
nocopy, _, _ := req.Option(noCopyOptionName).Bool()
fscache, _, _ := req.Option(fstoreCacheOptionName).Bool()
if nocopy && !rbset {
rawblks = true
}
if nocopy && !rawblks {
res.SetError(fmt.Errorf("nocopy option requires '--raw-leaves' to be enabled as well"), cmds.ErrNormal)
return
}
if hash {
nilnode, err := core.NewNode(n.Context(), &core.BuildCfg{
@ -154,14 +170,20 @@ You can now refer to the added file in a gateway, like so:
n = nilnode
}
dserv := n.DAG
addblockstore := n.Blockstore
if !(fscache || nocopy) {
addblockstore = bstore.NewGCBlockstore(n.BaseBlocks, n.GCLocker)
}
exch := n.Exchange
local, _, _ := req.Option("local").Bool()
if local {
offlineexch := offline.Exchange(n.Blockstore)
bserv := blockservice.New(n.Blockstore, offlineexch)
dserv = dag.NewDAGService(bserv)
exch = offline.Exchange(addblockstore)
}
bserv := blockservice.New(addblockstore, exch)
dserv := dag.NewDAGService(bserv)
outChan := make(chan interface{}, 8)
res.SetOutput((<-chan interface{})(outChan))
@ -180,6 +202,7 @@ You can now refer to the added file in a gateway, like so:
fileAdder.Pin = dopin
fileAdder.Silent = silent
fileAdder.RawLeaves = rawblks
fileAdder.NoCopy = nocopy
if hash {
md := dagtest.Mock()

View File

@ -28,6 +28,7 @@ import (
bitswap "github.com/ipfs/go-ipfs/exchange/bitswap"
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
rp "github.com/ipfs/go-ipfs/exchange/reprovide"
filestore "github.com/ipfs/go-ipfs/filestore"
mount "github.com/ipfs/go-ipfs/fuse/mount"
merkledag "github.com/ipfs/go-ipfs/merkledag"
mfs "github.com/ipfs/go-ipfs/mfs"
@ -110,6 +111,9 @@ type IpfsNode struct {
// Services
Peerstore pstore.Peerstore // storage for other Peer instances
Blockstore bstore.GCBlockstore // the block store (lower level)
Filestore *filestore.Filestore // the filestore blockstore
BaseBlocks bstore.Blockstore // the raw blockstore, no filestore wrapping
GCLocker bstore.GCLocker // the locker used to protect the blockstore during gc
Blocks bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects.
Resolver *path.Resolver // the path resolution system

View File

@ -103,6 +103,7 @@ type Adder struct {
RawLeaves bool
Silent bool
Wrap bool
NoCopy bool
Chunker string
root node.Node
mr *mfs.Root
@ -124,6 +125,7 @@ func (adder Adder) add(reader io.Reader) (node.Node, error) {
Dagserv: adder.dagService,
RawLeaves: adder.RawLeaves,
Maxlinks: ihelper.DefaultLinksPerBlock,
NoCopy: adder.NoCopy,
}
if adder.Trickle {

View File

@ -193,6 +193,7 @@ func testAddWPosInfo(t *testing.T, rawLeaves bool) {
adder.Out = make(chan interface{})
adder.Progress = true
adder.RawLeaves = rawLeaves
adder.NoCopy = true
data := make([]byte, 5*1024*1024)
rand.New(rand.NewSource(2)).Read(data) // Rand.Read never returns an error
@ -210,12 +211,18 @@ func testAddWPosInfo(t *testing.T, rawLeaves bool) {
for range adder.Out {
}
if bs.countAtOffsetZero != 2 {
t.Fatal("expected 2 blocks with an offset at zero (one root and one leafh), got", bs.countAtOffsetZero)
exp := 0
nonOffZero := 0
if rawLeaves {
exp = 1
nonOffZero = 19
}
if bs.countAtOffsetNonZero != 19 {
if bs.countAtOffsetZero != exp {
t.Fatalf("expected %d blocks with an offset at zero (one root and one leafh), got %d", exp, bs.countAtOffsetZero)
}
if bs.countAtOffsetNonZero != nonOffZero {
// note: the exact number will depend on the size and the sharding algorithm used
t.Fatal("expected 19 blocks with an offset > 0, got", bs.countAtOffsetNonZero)
t.Fatalf("expected %d blocks with an offset > 0, got %d", nonOffZero, bs.countAtOffsetNonZero)
}
}

filestore/filestore.go Normal file (193 lines)
View File

@ -0,0 +1,193 @@
package filestore
import (
"context"
"github.com/ipfs/go-ipfs/blocks"
"github.com/ipfs/go-ipfs/blocks/blockstore"
posinfo "github.com/ipfs/go-ipfs/thirdparty/posinfo"
dsq "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/query"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
)
var log = logging.Logger("filestore")
type Filestore struct {
fm *FileManager
bs blockstore.Blockstore
}
func NewFilestore(bs blockstore.Blockstore, fm *FileManager) *Filestore {
return &Filestore{fm, bs}
}
func (f *Filestore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
ctx, cancel := context.WithCancel(ctx)
a, err := f.bs.AllKeysChan(ctx)
if err != nil {
return nil, err
}
out := make(chan *cid.Cid, dsq.KeysOnlyBufSize)
go func() {
defer cancel()
defer close(out)
var done bool
for !done {
select {
case c, ok := <-a:
if !ok {
done = true
continue
}
select {
case out <- c:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
// Can't do these at the same time because the abstractions around
// leveldb make us query leveldb for both operations. We apparently
// can't query leveldb concurrently.
b, err := f.fm.AllKeysChan(ctx)
if err != nil {
log.Error("error querying filestore: ", err)
return
}
done = false
for !done {
select {
case c, ok := <-b:
if !ok {
done = true
continue
}
select {
case out <- c:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return out, nil
}
func (f *Filestore) DeleteBlock(c *cid.Cid) error {
err1 := f.bs.DeleteBlock(c)
if err1 != nil && err1 != blockstore.ErrNotFound {
return err1
}
err2 := f.fm.DeleteBlock(c)
// if we successfully removed something from the blockstore, but the
// filestore didn't have it, return success
switch err2 {
case nil:
return nil
case blockstore.ErrNotFound:
if err1 == blockstore.ErrNotFound {
return blockstore.ErrNotFound
}
return nil
default:
return err2
}
}
func (f *Filestore) Get(c *cid.Cid) (blocks.Block, error) {
blk, err := f.bs.Get(c)
switch err {
default:
return nil, err
case nil:
return blk, nil
case blockstore.ErrNotFound:
// try filestore
}
return f.fm.Get(c)
}
func (f *Filestore) Has(c *cid.Cid) (bool, error) {
has, err := f.bs.Has(c)
if err != nil {
return false, err
}
if has {
return true, nil
}
return f.fm.Has(c)
}
func (f *Filestore) Put(b blocks.Block) error {
has, err := f.Has(b.Cid())
if err != nil {
return err
}
if has {
return nil
}
switch b := b.(type) {
case *posinfo.FilestoreNode:
return f.fm.Put(b)
default:
return f.bs.Put(b)
}
}
func (f *Filestore) PutMany(bs []blocks.Block) error {
var normals []blocks.Block
var fstores []*posinfo.FilestoreNode
for _, b := range bs {
has, err := f.Has(b.Cid())
if err != nil {
return err
}
if has {
continue
}
switch b := b.(type) {
case *posinfo.FilestoreNode:
fstores = append(fstores, b)
default:
normals = append(normals, b)
}
}
if len(normals) > 0 {
err := f.bs.PutMany(normals)
if err != nil {
return err
}
}
if len(fstores) > 0 {
err := f.fm.PutMany(fstores)
if err != nil {
return err
}
}
return nil
}
var _ blockstore.Blockstore = (*Filestore)(nil)
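The closing interface assertion is the point of the type: a Filestore can stand in anywhere a blockstore.Blockstore is expected. A minimal usage sketch (variable names are hypothetical):

    // file-backed blocks are resolved transparently on a miss in the
    // wrapped blockstore, per the Get fallthrough above
    var bs blockstore.Blockstore = NewFilestore(baseBlockstore, fileManager)
    blk, err := bs.Get(c) // checks baseBlockstore first, then filestore refs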

filestore/filestore_test.go Normal file (164 lines)
View File

@ -0,0 +1,164 @@
package filestore
import (
"bytes"
"context"
"io/ioutil"
"math/rand"
"testing"
"github.com/ipfs/go-ipfs/blocks/blockstore"
dag "github.com/ipfs/go-ipfs/merkledag"
posinfo "github.com/ipfs/go-ipfs/thirdparty/posinfo"
ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
)
func newTestFilestore(t *testing.T) (string, *Filestore) {
mds := ds.NewMapDatastore()
testdir, err := ioutil.TempDir("", "filestore-test")
if err != nil {
t.Fatal(err)
}
fm := NewFileManager(mds, testdir)
bs := blockstore.NewBlockstore(mds)
fstore := NewFilestore(bs, fm)
return testdir, fstore
}
func makeFile(dir string, data []byte) (string, error) {
f, err := ioutil.TempFile(dir, "file")
if err != nil {
return "", err
}
_, err = f.Write(data)
if err != nil {
return "", err
}
return f.Name(), nil
}
func TestBasicFilestore(t *testing.T) {
dir, fs := newTestFilestore(t)
buf := make([]byte, 1000)
rand.Read(buf)
fname, err := makeFile(dir, buf)
if err != nil {
t.Fatal(err)
}
var cids []*cid.Cid
for i := 0; i < 100; i++ {
n := &posinfo.FilestoreNode{
PosInfo: &posinfo.PosInfo{
FullPath: fname,
Offset: uint64(i * 10),
},
Node: dag.NewRawNode(buf[i*10 : (i+1)*10]),
}
err := fs.Put(n)
if err != nil {
t.Fatal(err)
}
cids = append(cids, n.Node.Cid())
}
for i, c := range cids {
blk, err := fs.Get(c)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(blk.RawData(), buf[i*10:(i+1)*10]) {
t.Fatal("data didnt match on the way out")
}
}
kch, err := fs.AllKeysChan(context.Background())
if err != nil {
t.Fatal(err)
}
out := make(map[string]struct{})
for c := range kch {
out[c.KeyString()] = struct{}{}
}
if len(out) != len(cids) {
t.Fatal("mismatch in number of entries")
}
for _, c := range cids {
if _, ok := out[c.KeyString()]; !ok {
t.Fatal("missing cid: ", c)
}
}
}
func randomFileAdd(t *testing.T, fs *Filestore, dir string, size int) (string, []*cid.Cid) {
buf := make([]byte, size)
rand.Read(buf)
fname, err := makeFile(dir, buf)
if err != nil {
t.Fatal(err)
}
var out []*cid.Cid
for i := 0; i < size/10; i++ {
n := &posinfo.FilestoreNode{
PosInfo: &posinfo.PosInfo{
FullPath: fname,
Offset: uint64(i * 10),
},
Node: dag.NewRawNode(buf[i*10 : (i+1)*10]),
}
err := fs.Put(n)
if err != nil {
t.Fatal(err)
}
out = append(out, n.Cid())
}
return fname, out
}
func TestDeletes(t *testing.T) {
dir, fs := newTestFilestore(t)
_, cids := randomFileAdd(t, fs, dir, 100)
todelete := cids[:4]
for _, c := range todelete {
err := fs.DeleteBlock(c)
if err != nil {
t.Fatal(err)
}
}
deleted := make(map[string]bool)
for _, c := range todelete {
_, err := fs.Get(c)
if err != blockstore.ErrNotFound {
t.Fatal("expected blockstore not found error")
}
deleted[c.KeyString()] = true
}
keys, err := fs.AllKeysChan(context.Background())
if err != nil {
t.Fatal(err)
}
for c := range keys {
if deleted[c.KeyString()] {
t.Fatal("shouldnt have reference to this key anymore")
}
}
}

filestore/fsrefstore.go Normal file (210 lines)
View File

@ -0,0 +1,210 @@
package filestore
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"github.com/ipfs/go-ipfs/blocks"
"github.com/ipfs/go-ipfs/blocks/blockstore"
pb "github.com/ipfs/go-ipfs/filestore/pb"
dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help"
posinfo "github.com/ipfs/go-ipfs/thirdparty/posinfo"
ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
dsns "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/namespace"
dsq "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/query"
proto "gx/ipfs/QmT6n4mspWYEya864BhCUJEgyxiRfmiSY9ruQwTUNpRKaM/protobuf/proto"
cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
)
var FilestorePrefix = ds.NewKey("filestore")
type FileManager struct {
ds ds.Batching
root string
}
type CorruptReferenceError struct {
Err error
}
func (c CorruptReferenceError) Error() string {
return c.Err.Error()
}
func NewFileManager(ds ds.Batching, root string) *FileManager {
return &FileManager{dsns.Wrap(ds, FilestorePrefix), root}
}
func (f *FileManager) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
q := dsq.Query{KeysOnly: true}
q.Prefix = FilestorePrefix.String()
res, err := f.ds.Query(q)
if err != nil {
return nil, err
}
out := make(chan *cid.Cid, dsq.KeysOnlyBufSize)
go func() {
defer close(out)
for {
v, ok := res.NextSync()
if !ok {
return
}
k := ds.RawKey(v.Key)
c, err := dshelp.DsKeyToCid(k)
if err != nil {
log.Error("decoding cid from filestore: %s", err)
continue
}
select {
case out <- c:
case <-ctx.Done():
return
}
}
}()
return out, nil
}
func (f *FileManager) DeleteBlock(c *cid.Cid) error {
err := f.ds.Delete(dshelp.CidToDsKey(c))
if err == ds.ErrNotFound {
return blockstore.ErrNotFound
}
return err
}
func (f *FileManager) Get(c *cid.Cid) (blocks.Block, error) {
dobj, err := f.getDataObj(c)
if err != nil {
return nil, err
}
out, err := f.readDataObj(c, dobj)
if err != nil {
return nil, err
}
return blocks.NewBlockWithCid(out, c)
}
func (f *FileManager) getDataObj(c *cid.Cid) (*pb.DataObj, error) {
o, err := f.ds.Get(dshelp.CidToDsKey(c))
switch err {
case ds.ErrNotFound:
return nil, blockstore.ErrNotFound
default:
return nil, err
case nil:
//
}
data, ok := o.([]byte)
if !ok {
return nil, fmt.Errorf("stored filestore dataobj was not a []byte")
}
var dobj pb.DataObj
if err := proto.Unmarshal(data, &dobj); err != nil {
return nil, err
}
return &dobj, nil
}
// reads and verifies the block
func (f *FileManager) readDataObj(c *cid.Cid, d *pb.DataObj) ([]byte, error) {
p := filepath.FromSlash(d.GetFilePath())
abspath := filepath.Join(f.root, p)
fi, err := os.Open(abspath)
if err != nil {
return nil, &CorruptReferenceError{err}
}
defer fi.Close()
_, err = fi.Seek(int64(d.GetOffset()), os.SEEK_SET)
if err != nil {
return nil, &CorruptReferenceError{err}
}
outbuf := make([]byte, d.GetSize_())
_, err = io.ReadFull(fi, outbuf)
if err != nil {
return nil, &CorruptReferenceError{err}
}
outcid, err := c.Prefix().Sum(outbuf)
if err != nil {
return nil, err
}
if !c.Equals(outcid) {
return nil, &CorruptReferenceError{fmt.Errorf("data in file did not match. %s offset %d", d.GetFilePath(), d.GetOffset())}
}
return outbuf, nil
}
func (f *FileManager) Has(c *cid.Cid) (bool, error) {
// NOTE: interesting thing to consider. Has doesn't validate the data.
// So the data on disk could be invalid, and we could think we have it.
dsk := dshelp.CidToDsKey(c)
return f.ds.Has(dsk)
}
type putter interface {
Put(ds.Key, interface{}) error
}
func (f *FileManager) Put(b *posinfo.FilestoreNode) error {
return f.putTo(b, f.ds)
}
func (f *FileManager) putTo(b *posinfo.FilestoreNode, to putter) error {
var dobj pb.DataObj
if !filepath.HasPrefix(b.PosInfo.FullPath, f.root) {
return fmt.Errorf("cannot add filestore references outside ipfs root")
}
p, err := filepath.Rel(f.root, b.PosInfo.FullPath)
if err != nil {
return err
}
dobj.FilePath = proto.String(filepath.ToSlash(p))
dobj.Offset = proto.Uint64(b.PosInfo.Offset)
dobj.Size_ = proto.Uint64(uint64(len(b.RawData())))
data, err := proto.Marshal(&dobj)
if err != nil {
return err
}
return to.Put(dshelp.CidToDsKey(b.Cid()), data)
}
func (f *FileManager) PutMany(bs []*posinfo.FilestoreNode) error {
batch, err := f.ds.Batch()
if err != nil {
return err
}
for _, b := range bs {
if err := f.putTo(b, batch); err != nil {
return err
}
}
return batch.Commit()
}
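Because readDataObj re-hashes what it reads using the CID's own prefix, any mutation of the referenced file is caught at read time. A sketch of how a caller might detect that case (the handling shown is an assumption, not part of this diff):

    _, err := fm.Get(c)
    if _, ok := err.(*CorruptReferenceError); ok {
        // the referenced file changed or moved since it was added with --nocopy
    }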

filestore/pb/Makefile Normal file (10 lines)
View File

@ -0,0 +1,10 @@
PB = $(wildcard *.proto)
GO = $(PB:.proto=.pb.go)
all: $(GO)
%.pb.go: %.proto
protoc --gogo_out=. $<
clean:
rm *.pb.go

View File

@ -0,0 +1,59 @@
// Code generated by protoc-gen-gogo.
// source: dataobj.proto
// DO NOT EDIT!
/*
Package datastore_pb is a generated protocol buffer package.
It is generated from these files:
dataobj.proto
It has these top-level messages:
DataObj
*/
package datastore_pb
import proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
type DataObj struct {
FilePath *string `protobuf:"bytes,1,opt,name=FilePath" json:"FilePath,omitempty"`
Offset *uint64 `protobuf:"varint,2,opt,name=Offset" json:"Offset,omitempty"`
Size_ *uint64 `protobuf:"varint,3,opt,name=Size" json:"Size,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *DataObj) Reset() { *m = DataObj{} }
func (m *DataObj) String() string { return proto.CompactTextString(m) }
func (*DataObj) ProtoMessage() {}
func (m *DataObj) GetFilePath() string {
if m != nil && m.FilePath != nil {
return *m.FilePath
}
return ""
}
func (m *DataObj) GetOffset() uint64 {
if m != nil && m.Offset != nil {
return *m.Offset
}
return 0
}
func (m *DataObj) GetSize_() uint64 {
if m != nil && m.Size_ != nil {
return *m.Size_
}
return 0
}
func init() {
proto.RegisterType((*DataObj)(nil), "datastore.pb.DataObj")
}

View File

@ -0,0 +1,7 @@
package datastore.pb;
message DataObj {
optional string FilePath = 1;
optional uint64 Offset = 2;
optional uint64 Size = 3;
}
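putTo in fsrefstore.go fills these three fields: FilePath is stored relative to the FileManager root (via filepath.Rel) so a repo can be relocated without breaking references, while Offset and Size locate the block's bytes within the file. A hypothetical entry for the third 10-byte chunk of a file:

    dobj := pb.DataObj{
        FilePath: proto.String("somedir/file123"), // relative to the repo root
        Offset:   proto.Uint64(20),
        Size_:    proto.Uint64(10),
    }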

View File

@ -35,6 +35,10 @@ type DagBuilderParams struct {
// DAGService to write blocks to (required)
Dagserv dag.DAGService
// NoCopy signals to the chunker that it should track fileinfo for
// filestore adds
NoCopy bool
}
// Generate a new DagBuilderHelper from the given params, whose data source comes from the given chunk.Splitter.
@ -47,8 +51,8 @@ func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper {
maxlinks: dbp.Maxlinks,
batch: dbp.Dagserv.Batch(),
}
if fi, ok := spl.Reader().(files.FileInfo); ok {
db.fullPath = fi.FullPath()
if fi, ok := spl.Reader().(files.FileInfo); dbp.NoCopy && ok {
db.fullPath = fi.AbsPath()
db.stat = fi.Stat()
}
return db
@ -146,7 +150,7 @@ func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) {
}
func (db *DagBuilderHelper) SetPosInfo(node *UnixfsNode, offset uint64) {
if db.stat != nil {
if db.fullPath != "" {
node.SetPosInfo(offset, db.fullPath, db.stat)
}
}

View File

@ -160,10 +160,12 @@ func (n *UnixfsNode) GetDagNode() (node.Node, error) {
}
if n.posInfo != nil {
return &pi.FilestoreNode{
Node: nd,
PosInfo: n.posInfo,
}, nil
if rn, ok := nd.(*dag.RawNode); ok {
return &pi.FilestoreNode{
Node: rn,
PosInfo: n.posInfo,
}, nil
}
}
return nd, nil
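This guard is the other half of the --nocopy / --raw-leaves coupling enforced in add.go above: only a dag.RawNode is a verbatim slice of the source file, so only raw leaves can safely become filestore references; any other node type falls through and is stored normally. The dispatch in miniature:

    // a filestore reference is created only when both conditions hold:
    // the node carries position info AND it is a verbatim raw leaf
    if rn, ok := nd.(*dag.RawNode); ok && n.posInfo != nil {
        return &pi.FilestoreNode{Node: rn, PosInfo: n.posInfo}, nil
    }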

View File

@ -51,7 +51,7 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.
if !gcs.Has(k) {
err := bs.DeleteBlock(k)
if err != nil {
log.Debugf("Error removing key from blockstore: %s", err)
log.Errorf("Error removing key from blockstore: %s", err)
return
}
select {

View File

@ -30,7 +30,8 @@ type Config struct {
API API // local node's API settings
Swarm SwarmConfig
Reprovider Reprovider
Reprovider Reprovider
Experimental Experiments
}
const (

View File

@ -0,0 +1,5 @@
package config
type Experiments struct {
FilestoreEnabled bool
}

View File

@ -11,6 +11,7 @@ import (
"strings"
"sync"
filestore "github.com/ipfs/go-ipfs/filestore"
keystore "github.com/ipfs/go-ipfs/keystore"
repo "github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs/repo/common"
@ -100,6 +101,7 @@ type FSRepo struct {
config *config.Config
ds repo.Datastore
keystore keystore.Keystore
filemgr *filestore.FileManager
}
var _ repo.Repo = (*FSRepo)(nil)
@ -172,6 +174,10 @@ func open(repoPath string) (repo.Repo, error) {
return nil, err
}
if r.config.Experimental.FilestoreEnabled {
r.filemgr = filestore.NewFileManager(r.ds, filepath.Dir(r.path))
}
keepLocked = true
return r, nil
}
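Note the root passed here: filepath.Dir(r.path), the parent of the repo directory. Combined with the prefix check in fsrefstore.go's putTo, this is what produces the "cannot add filestore references outside ipfs root" error; the effect, with illustrative paths:

    // a repo at /home/user/.ipfs gives the FileManager a root of /home/user,
    // so /home/user/data/x can be added with --nocopy while /tmp/x is rejected
    fm := filestore.NewFileManager(r.ds, filepath.Dir(r.path))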
@ -316,6 +322,10 @@ func (r *FSRepo) Keystore() keystore.Keystore {
return r.keystore
}
func (r *FSRepo) Path() string {
return r.path
}
// SetAPIAddr writes the API Addr to the /api file.
func (r *FSRepo) SetAPIAddr(addr ma.Multiaddr) error {
f, err := os.Create(filepath.Join(r.path, apiFile))
@ -424,6 +434,10 @@ func (r *FSRepo) Config() (*config.Config, error) {
return r.config, nil
}
func (r *FSRepo) FileManager() *filestore.FileManager {
return r.filemgr
}
// setConfigUnsynced is for private use.
func (r *FSRepo) setConfigUnsynced(updated *config.Config) error {
configFilename, err := config.Filename(r.path)

View File

@ -3,6 +3,7 @@ package repo
import (
"errors"
filestore "github.com/ipfs/go-ipfs/filestore"
keystore "github.com/ipfs/go-ipfs/keystore"
"github.com/ipfs/go-ipfs/repo/config"
@ -48,3 +49,5 @@ func (m *Mock) Keystore() keystore.Keystore { return nil }
func (m *Mock) SwarmKey() ([]byte, error) {
return nil, nil
}
func (m *Mock) FileManager() *filestore.FileManager { return nil }

View File

@ -4,6 +4,7 @@ import (
"errors"
"io"
filestore "github.com/ipfs/go-ipfs/filestore"
keystore "github.com/ipfs/go-ipfs/keystore"
config "github.com/ipfs/go-ipfs/repo/config"
@ -27,6 +28,8 @@ type Repo interface {
Keystore() keystore.Keystore
FileManager() *filestore.FileManager
// SetAPIAddr sets the API address in the repo.
SetAPIAddr(addr ma.Multiaddr) error

View File

@ -0,0 +1,93 @@
#!/bin/sh
#
# Copyright (c) 2017 Jeromy Johnson
# MIT Licensed; see the LICENSE file in this repository.
#
test_description="Test out the filestore nocopy functionality"
. lib/test-lib.sh
test_expect_success "create a dataset" '
random-files -seed=483 -depth=3 -dirs=4 -files=6 -filesize=1000000 somedir
'
EXPHASH="QmW4JLyeTxEWGwa4mkE9mHzdtAkyhMX2ToGFEKZNjCiJud"
get_repo_size() {
disk_usage "$IPFS_PATH"
}
assert_repo_size_less_than() {
expval="$1"
test_expect_success "check repo size" '
test "$(get_repo_size)" -lt "$expval" ||
(get_repo_size && false)
'
}
assert_repo_size_greater_than() {
expval="$1"
test_expect_success "check repo size" '
test "$(get_repo_size)" -gt "$expval" ||
(get_repo_size && false)
'
}
test_filestore_adds() {
test_expect_success "nocopy add succeeds" '
HASH=$(ipfs add --raw-leaves --nocopy -r -q somedir | tail -n1)
'
test_expect_success "nocopy add has right hash" '
test "$HASH" = "$EXPHASH"
'
assert_repo_size_less_than 1000000
test_expect_success "normal add with fscache doesnt duplicate data" '
HASH2=$(ipfs add --raw-leaves --fscache -r -q somedir | tail -n1)
'
assert_repo_size_less_than 1000000
test_expect_success "normal add without fscache duplicates data" '
HASH2=$(ipfs add --raw-leaves -r -q somedir | tail -n1)
'
assert_repo_size_greater_than 1000000
}
init_ipfs_filestore() {
test_expect_success "clean up old node" '
rm -rf "$IPFS_PATH" mountdir ipfs ipns
'
test_init_ipfs
test_expect_success "enable filestore config setting" '
ipfs config --json Experimental.FilestoreEnabled true
'
}
init_ipfs_filestore
test_filestore_adds
echo "WORKING DIR"
echo "IPFS PATH = " $IPFS_PATH
pwd
init_ipfs_filestore
test_launch_ipfs_daemon
test_filestore_adds
test_kill_ipfs_daemon
test_done