mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-26 23:53:19 +08:00
Merge pull request #2792 from ipfs-filestore/kevina/filestore-bits
Stable bits from Pull Request #2634
This commit is contained in:
@ -20,47 +20,47 @@ type Block interface {
|
||||
}
|
||||
|
||||
// Block is a singular block of data in ipfs
|
||||
type RawBlock struct {
|
||||
type BasicBlock struct {
|
||||
multihash mh.Multihash
|
||||
data []byte
|
||||
}
|
||||
|
||||
// NewBlock creates a Block object from opaque data. It will hash the data.
|
||||
func NewBlock(data []byte) *RawBlock {
|
||||
return &RawBlock{data: data, multihash: u.Hash(data)}
|
||||
func NewBlock(data []byte) *BasicBlock {
|
||||
return &BasicBlock{data: data, multihash: u.Hash(data)}
|
||||
}
|
||||
|
||||
// NewBlockWithHash creates a new block when the hash of the data
|
||||
// is already known, this is used to save time in situations where
|
||||
// we are able to be confident that the data is correct
|
||||
func NewBlockWithHash(data []byte, h mh.Multihash) (*RawBlock, error) {
|
||||
func NewBlockWithHash(data []byte, h mh.Multihash) (*BasicBlock, error) {
|
||||
if u.Debug {
|
||||
chk := u.Hash(data)
|
||||
if string(chk) != string(h) {
|
||||
return nil, errors.New("Data did not match given hash!")
|
||||
}
|
||||
}
|
||||
return &RawBlock{data: data, multihash: h}, nil
|
||||
return &BasicBlock{data: data, multihash: h}, nil
|
||||
}
|
||||
|
||||
func (b *RawBlock) Multihash() mh.Multihash {
|
||||
func (b *BasicBlock) Multihash() mh.Multihash {
|
||||
return b.multihash
|
||||
}
|
||||
|
||||
func (b *RawBlock) Data() []byte {
|
||||
func (b *BasicBlock) Data() []byte {
|
||||
return b.data
|
||||
}
|
||||
|
||||
// Key returns the block's Multihash as a Key value.
|
||||
func (b *RawBlock) Key() key.Key {
|
||||
func (b *BasicBlock) Key() key.Key {
|
||||
return key.Key(b.multihash)
|
||||
}
|
||||
|
||||
func (b *RawBlock) String() string {
|
||||
func (b *BasicBlock) String() string {
|
||||
return fmt.Sprintf("[Block %s]", b.Key())
|
||||
}
|
||||
|
||||
func (b *RawBlock) Loggable() map[string]interface{} {
|
||||
func (b *BasicBlock) Loggable() map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"block": b.Key().String(),
|
||||
}
|
||||
|
@ -141,11 +141,13 @@ You can now refer to the added file in a gateway, like so:
|
||||
outChan := make(chan interface{}, 8)
|
||||
res.SetOutput((<-chan interface{})(outChan))
|
||||
|
||||
fileAdder, err := coreunix.NewAdder(req.Context(), n, outChan)
|
||||
fileAdder, err := coreunix.NewAdder(req.Context(), n.Pinning, n.Blockstore, n.DAG)
|
||||
if err != nil {
|
||||
res.SetError(err, cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
|
||||
fileAdder.Out = outChan
|
||||
fileAdder.Chunker = chunker
|
||||
fileAdder.Progress = progress
|
||||
fileAdder.Hidden = hidden
|
||||
|
@ -67,42 +67,46 @@ type AddedObject struct {
|
||||
Bytes int64 `json:",omitempty"`
|
||||
}
|
||||
|
||||
func NewAdder(ctx context.Context, n *core.IpfsNode, out chan interface{}) (*Adder, error) {
|
||||
mr, err := mfs.NewRoot(ctx, n.DAG, newDirNode(), nil)
|
||||
func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCBlockstore, ds dag.DAGService) (*Adder, error) {
|
||||
mr, err := mfs.NewRoot(ctx, ds, newDirNode(), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Adder{
|
||||
mr: mr,
|
||||
ctx: ctx,
|
||||
node: n,
|
||||
out: out,
|
||||
Progress: false,
|
||||
Hidden: true,
|
||||
Pin: true,
|
||||
Trickle: false,
|
||||
Wrap: false,
|
||||
Chunker: "",
|
||||
mr: mr,
|
||||
ctx: ctx,
|
||||
pinning: p,
|
||||
blockstore: bs,
|
||||
dagService: ds,
|
||||
Progress: false,
|
||||
Hidden: true,
|
||||
Pin: true,
|
||||
Trickle: false,
|
||||
Wrap: false,
|
||||
Chunker: "",
|
||||
}, nil
|
||||
|
||||
}
|
||||
|
||||
// Internal structure for holding the switches passed to the `add` call
|
||||
type Adder struct {
|
||||
ctx context.Context
|
||||
node *core.IpfsNode
|
||||
out chan interface{}
|
||||
Progress bool
|
||||
Hidden bool
|
||||
Pin bool
|
||||
Trickle bool
|
||||
Silent bool
|
||||
Wrap bool
|
||||
Chunker string
|
||||
root *dag.Node
|
||||
mr *mfs.Root
|
||||
unlocker bs.Unlocker
|
||||
tempRoot key.Key
|
||||
ctx context.Context
|
||||
pinning pin.Pinner
|
||||
blockstore bstore.GCBlockstore
|
||||
dagService dag.DAGService
|
||||
Out chan interface{}
|
||||
Progress bool
|
||||
Hidden bool
|
||||
Pin bool
|
||||
Trickle bool
|
||||
Silent bool
|
||||
Wrap bool
|
||||
Chunker string
|
||||
root *dag.Node
|
||||
mr *mfs.Root
|
||||
unlocker bs.Unlocker
|
||||
tempRoot key.Key
|
||||
}
|
||||
|
||||
// Perform the actual add & pin locally, outputting results to reader
|
||||
@ -114,12 +118,12 @@ func (adder Adder) add(reader io.Reader) (*dag.Node, error) {
|
||||
|
||||
if adder.Trickle {
|
||||
return importer.BuildTrickleDagFromReader(
|
||||
adder.node.DAG,
|
||||
adder.dagService,
|
||||
chnk,
|
||||
)
|
||||
}
|
||||
return importer.BuildDagFromReader(
|
||||
adder.node.DAG,
|
||||
adder.dagService,
|
||||
chnk,
|
||||
)
|
||||
}
|
||||
@ -137,7 +141,7 @@ func (adder *Adder) RootNode() (*dag.Node, error) {
|
||||
|
||||
// if not wrapping, AND one root file, use that hash as root.
|
||||
if !adder.Wrap && len(root.Links) == 1 {
|
||||
root, err = root.Links[0].GetNode(adder.ctx, adder.node.DAG)
|
||||
root, err = root.Links[0].GetNode(adder.ctx, adder.dagService)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -156,21 +160,21 @@ func (adder *Adder) PinRoot() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
rnk, err := adder.node.DAG.Add(root)
|
||||
rnk, err := adder.dagService.Add(root)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if adder.tempRoot != "" {
|
||||
err := adder.node.Pinning.Unpin(adder.ctx, adder.tempRoot, true)
|
||||
err := adder.pinning.Unpin(adder.ctx, adder.tempRoot, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
adder.tempRoot = rnk
|
||||
}
|
||||
|
||||
adder.node.Pinning.PinWithMode(rnk, pin.Recursive)
|
||||
return adder.node.Pinning.Flush()
|
||||
adder.pinning.PinWithMode(rnk, pin.Recursive)
|
||||
return adder.pinning.Flush()
|
||||
}
|
||||
|
||||
func (adder *Adder) Finalize() (*dag.Node, error) {
|
||||
@ -237,7 +241,7 @@ func (adder *Adder) outputDirs(path string, fs mfs.FSNode) error {
|
||||
}
|
||||
}
|
||||
|
||||
return outputDagnode(adder.out, path, nd)
|
||||
return outputDagnode(adder.Out, path, nd)
|
||||
}
|
||||
|
||||
// Add builds a merkledag from the a reader, pinning all objects to the local
|
||||
@ -245,7 +249,7 @@ func (adder *Adder) outputDirs(path string, fs mfs.FSNode) error {
|
||||
func Add(n *core.IpfsNode, r io.Reader) (string, error) {
|
||||
defer n.Blockstore.PinLock().Unlock()
|
||||
|
||||
fileAdder, err := NewAdder(n.Context(), n, nil)
|
||||
fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@ -277,7 +281,7 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) {
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
fileAdder, err := NewAdder(n.Context(), n, nil)
|
||||
fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@ -306,7 +310,7 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) {
|
||||
// the directory, and and error if any.
|
||||
func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *dag.Node, error) {
|
||||
file := files.NewReaderFile(filename, filename, ioutil.NopCloser(r), nil)
|
||||
fileAdder, err := NewAdder(n.Context(), n, nil)
|
||||
fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
@ -355,14 +359,14 @@ func (adder *Adder) addNode(node *dag.Node, path string) error {
|
||||
}
|
||||
|
||||
if !adder.Silent {
|
||||
return outputDagnode(adder.out, path, node)
|
||||
return outputDagnode(adder.Out, path, node)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Add the given file while respecting the adder.
|
||||
func (adder *Adder) AddFile(file files.File) error {
|
||||
adder.unlocker = adder.node.Blockstore.PinLock()
|
||||
adder.unlocker = adder.blockstore.PinLock()
|
||||
defer func() {
|
||||
adder.unlocker.Unlock()
|
||||
}()
|
||||
@ -388,7 +392,7 @@ func (adder *Adder) addFile(file files.File) error {
|
||||
}
|
||||
|
||||
dagnode := &dag.Node{Data: sdata}
|
||||
_, err = adder.node.DAG.Add(dagnode)
|
||||
_, err = adder.dagService.Add(dagnode)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -401,7 +405,7 @@ func (adder *Adder) addFile(file files.File) error {
|
||||
// progress updates to the client (over the output channel)
|
||||
var reader io.Reader = file
|
||||
if adder.Progress {
|
||||
reader = &progressReader{file: file, out: adder.out}
|
||||
reader = &progressReader{file: file, out: adder.Out}
|
||||
}
|
||||
|
||||
dagnode, err := adder.add(reader)
|
||||
@ -445,14 +449,14 @@ func (adder *Adder) addDir(dir files.File) error {
|
||||
}
|
||||
|
||||
func (adder *Adder) maybePauseForGC() error {
|
||||
if adder.node.Blockstore.GCRequested() {
|
||||
if adder.blockstore.GCRequested() {
|
||||
err := adder.PinRoot()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
adder.unlocker.Unlock()
|
||||
adder.unlocker = adder.node.Blockstore.PinLock()
|
||||
adder.unlocker = adder.blockstore.PinLock()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -54,10 +54,11 @@ func TestAddGCLive(t *testing.T) {
|
||||
|
||||
errs := make(chan error)
|
||||
out := make(chan interface{})
|
||||
adder, err := NewAdder(context.Background(), node, out)
|
||||
adder, err := NewAdder(context.Background(), node.Pinning, node.Blockstore, node.DAG)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
adder.Out = out
|
||||
|
||||
dataa := ioutil.NopCloser(bytes.NewBufferString("testfileA"))
|
||||
rfa := files.NewReaderFile("a", "a", dataa, nil)
|
||||
|
Reference in New Issue
Block a user