mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-26 23:53:19 +08:00
coreapi: dag: Batching interface
License: MIT Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"sync"
|
||||||
|
|
||||||
gopath "path"
|
gopath "path"
|
||||||
|
|
||||||
@ -17,34 +18,25 @@ import (
|
|||||||
|
|
||||||
type DagAPI CoreAPI
|
type DagAPI CoreAPI
|
||||||
|
|
||||||
|
type dagBatch struct {
|
||||||
|
api *DagAPI
|
||||||
|
toPut []ipld.Node
|
||||||
|
|
||||||
|
lk sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
// Put inserts data using specified format and input encoding. Unless used with
|
// Put inserts data using specified format and input encoding. Unless used with
|
||||||
// `WithCodes` or `WithHash`, the defaults "dag-cbor" and "sha256" are used.
|
// `WithCodes` or `WithHash`, the defaults "dag-cbor" and "sha256" are used.
|
||||||
// Returns the path of the inserted data.
|
// Returns the path of the inserted data.
|
||||||
func (api *DagAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.DagPutOption) (coreiface.ResolvedPath, error) {
|
func (api *DagAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.DagPutOption) (coreiface.ResolvedPath, error) {
|
||||||
settings, err := caopts.DagPutOptions(opts...)
|
nd, err := getNode(src, opts...)
|
||||||
|
|
||||||
|
err = api.node.DAG.Add(ctx, nd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
codec, ok := cid.CodecToStr[settings.Codec]
|
return coreiface.IpldPath(nd.Cid()), nil
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("invalid codec %d", settings.Codec)
|
|
||||||
}
|
|
||||||
|
|
||||||
nds, err := coredag.ParseInputs(settings.InputEnc, codec, src, settings.MhType, settings.MhLength)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if len(nds) == 0 {
|
|
||||||
return nil, fmt.Errorf("no node returned from ParseInputs")
|
|
||||||
}
|
|
||||||
|
|
||||||
err = api.node.DAG.Add(ctx, nds[0])
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return coreiface.IpldPath(nds[0].Cid()), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get resolves `path` using Unixfs resolver, returns the resolved Node.
|
// Get resolves `path` using Unixfs resolver, returns the resolved Node.
|
||||||
@ -75,6 +67,58 @@ func (api *DagAPI) Tree(ctx context.Context, p coreiface.Path, opts ...caopts.Da
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (api *DagAPI) Batch(ctx context.Context) coreiface.DagBatch {
|
||||||
|
return &dagBatch{api: api}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *dagBatch) Put(ctx context.Context, src io.Reader, opts ...caopts.DagPutOption) (coreiface.ResolvedPath, error) {
|
||||||
|
nd, err := getNode(src, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
b.lk.Lock()
|
||||||
|
b.toPut = append(b.toPut, nd)
|
||||||
|
b.lk.Unlock()
|
||||||
|
|
||||||
|
return coreiface.IpldPath(nd.Cid()), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *dagBatch) Commit(ctx context.Context) error {
|
||||||
|
b.lk.Lock()
|
||||||
|
defer b.lk.Unlock()
|
||||||
|
defer func() {
|
||||||
|
b.toPut = nil
|
||||||
|
}()
|
||||||
|
|
||||||
|
return b.api.node.DAG.AddMany(ctx, b.toPut)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getNode(src io.Reader, opts ...caopts.DagPutOption) (ipld.Node, error) {
|
||||||
|
settings, err := caopts.DagPutOptions(opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
codec, ok := cid.CodecToStr[settings.Codec]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("invalid codec %d", settings.Codec)
|
||||||
|
}
|
||||||
|
|
||||||
|
nds, err := coredag.ParseInputs(settings.InputEnc, codec, src, settings.MhType, settings.MhLength)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(nds) == 0 {
|
||||||
|
return nil, fmt.Errorf("no node returned from ParseInputs")
|
||||||
|
}
|
||||||
|
if len(nds) != 1 {
|
||||||
|
return nil, fmt.Errorf("got more that one node from ParseInputs")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nds[0], nil
|
||||||
|
}
|
||||||
|
|
||||||
func (api *DagAPI) core() coreiface.CoreAPI {
|
func (api *DagAPI) core() coreiface.CoreAPI {
|
||||||
return (*CoreAPI)(api)
|
return (*CoreAPI)(api)
|
||||||
}
|
}
|
||||||
|
@ -4,21 +4,36 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
|
"github.com/ipfs/go-ipfs/core/coreapi/interface/options"
|
||||||
|
|
||||||
ipld "gx/ipfs/QmZtNq8dArGfnpCZfx2pUNY7UcjGhVp5qqwQ4hH6mpTMRQ/go-ipld-format"
|
ipld "gx/ipfs/QmZtNq8dArGfnpCZfx2pUNY7UcjGhVp5qqwQ4hH6mpTMRQ/go-ipld-format"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DagAPI specifies the interface to IPLD
|
// DagOps groups operations that can be batched together
|
||||||
type DagAPI interface {
|
type DagOps interface {
|
||||||
// Put inserts data using specified format and input encoding.
|
// Put inserts data using specified format and input encoding.
|
||||||
// Unless used with WithCodec or WithHash, the defaults "dag-cbor" and
|
// Unless used with WithCodec or WithHash, the defaults "dag-cbor" and
|
||||||
// "sha256" are used.
|
// "sha256" are used.
|
||||||
Put(ctx context.Context, src io.Reader, opts ...options.DagPutOption) (ResolvedPath, error)
|
Put(ctx context.Context, src io.Reader, opts ...options.DagPutOption) (ResolvedPath, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DagBatch is the batching version of DagAPI. All implementations of DagBatch
|
||||||
|
// should be threadsafe
|
||||||
|
type DagBatch interface {
|
||||||
|
DagOps
|
||||||
|
|
||||||
|
Commit(ctx context.Context) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// DagAPI specifies the interface to IPLD
|
||||||
|
type DagAPI interface {
|
||||||
|
DagOps
|
||||||
|
|
||||||
// Get attempts to resolve and get the node specified by the path
|
// Get attempts to resolve and get the node specified by the path
|
||||||
Get(ctx context.Context, path Path) (ipld.Node, error)
|
Get(ctx context.Context, path Path) (ipld.Node, error)
|
||||||
|
|
||||||
// Tree returns list of paths within a node specified by the path.
|
// Tree returns list of paths within a node specified by the path.
|
||||||
Tree(ctx context.Context, path Path, opts ...options.DagTreeOption) ([]Path, error)
|
Tree(ctx context.Context, path Path, opts ...options.DagTreeOption) ([]Path, error)
|
||||||
|
|
||||||
|
Batch(ctx context.Context) DagBatch
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user