mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-28 17:03:58 +08:00
parallelize batch flushing
1. Modern storage devices (i.e., SSDs) tend to be highly parallel. 2. Allows us to read and write at the same time (avoids pausing while flushing). fixes https://github.com/ipfs/go-ipfs/issues/898#issuecomment-331849064 License: MIT Signed-off-by: Steven Allen <steven@stebalien.com>
This commit is contained in:
98
merkledag/batch.go
Normal file
98
merkledag/batch.go
Normal file
@ -0,0 +1,98 @@
|
||||
package merkledag
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
|
||||
cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
|
||||
node "gx/ipfs/QmPN7cwmpcc4DWXb4KTB9dNAJgjuPY69h3npsMfhRrQL9c/go-ipld-format"
|
||||
blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"
|
||||
)
|
||||
|
||||
// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking.
|
||||
// TODO: Experiment with multiple datastores, storage devices, and CPUs to find
|
||||
// the right value/formula.
|
||||
var ParallelBatchCommits = runtime.NumCPU() * 2
|
||||
|
||||
// Batch is a buffer for batching adds to a dag.
|
||||
type Batch struct {
|
||||
ds *dagService
|
||||
|
||||
activeCommits int
|
||||
commitError error
|
||||
commitResults chan error
|
||||
|
||||
blocks []blocks.Block
|
||||
size int
|
||||
|
||||
MaxSize int
|
||||
MaxBlocks int
|
||||
}
|
||||
|
||||
func (t *Batch) processResults() {
|
||||
for t.activeCommits > 0 && t.commitError == nil {
|
||||
select {
|
||||
case err := <-t.commitResults:
|
||||
t.activeCommits--
|
||||
if err != nil {
|
||||
t.commitError = err
|
||||
}
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Batch) asyncCommit() {
|
||||
if len(t.blocks) == 0 || t.commitError != nil {
|
||||
return
|
||||
}
|
||||
if t.activeCommits >= ParallelBatchCommits {
|
||||
err := <-t.commitResults
|
||||
t.activeCommits--
|
||||
|
||||
if err != nil {
|
||||
t.commitError = err
|
||||
return
|
||||
}
|
||||
}
|
||||
go func(b []blocks.Block) {
|
||||
_, err := t.ds.Blocks.AddBlocks(b)
|
||||
t.commitResults <- err
|
||||
}(t.blocks)
|
||||
|
||||
t.activeCommits++
|
||||
t.blocks = nil
|
||||
t.size = 0
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Add adds a node to the batch and commits the batch if necessary.
|
||||
func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
|
||||
// Not strictly necessary but allows us to catch errors early.
|
||||
t.processResults()
|
||||
if t.commitError != nil {
|
||||
return nil, t.commitError
|
||||
}
|
||||
|
||||
t.blocks = append(t.blocks, nd)
|
||||
t.size += len(nd.RawData())
|
||||
if t.size > t.MaxSize || len(t.blocks) > t.MaxBlocks {
|
||||
t.asyncCommit()
|
||||
}
|
||||
return nd.Cid(), t.commitError
|
||||
}
|
||||
|
||||
// Commit commits batched nodes.
|
||||
func (t *Batch) Commit() error {
|
||||
t.asyncCommit()
|
||||
for t.activeCommits > 0 && t.commitError == nil {
|
||||
err := <-t.commitResults
|
||||
t.activeCommits--
|
||||
if err != nil {
|
||||
t.commitError = err
|
||||
}
|
||||
}
|
||||
|
||||
return t.commitError
|
||||
}
|
@ -11,7 +11,6 @@ import (
|
||||
|
||||
cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
|
||||
node "gx/ipfs/QmPN7cwmpcc4DWXb4KTB9dNAJgjuPY69h3npsMfhRrQL9c/go-ipld-format"
|
||||
blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"
|
||||
ipldcbor "gx/ipfs/QmWCs8kMecJwCPK8JThue8TjgM2ieJ2HjTLDu7Cv2NEmZi/go-ipld-cbor"
|
||||
)
|
||||
|
||||
@ -76,6 +75,7 @@ func (n *dagService) Add(nd node.Node) (*cid.Cid, error) {
|
||||
func (n *dagService) Batch() *Batch {
|
||||
return &Batch{
|
||||
ds: n,
|
||||
commitResults: make(chan error, ParallelBatchCommits),
|
||||
MaxSize: 8 << 20,
|
||||
|
||||
// By default, only batch up to 128 nodes at a time.
|
||||
@ -389,31 +389,6 @@ func (np *nodePromise) Get(ctx context.Context) (node.Node, error) {
|
||||
}
|
||||
}
|
||||
|
||||
type Batch struct {
|
||||
ds *dagService
|
||||
|
||||
blocks []blocks.Block
|
||||
size int
|
||||
MaxSize int
|
||||
MaxBlocks int
|
||||
}
|
||||
|
||||
func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
|
||||
t.blocks = append(t.blocks, nd)
|
||||
t.size += len(nd.RawData())
|
||||
if t.size > t.MaxSize || len(t.blocks) > t.MaxBlocks {
|
||||
return nd.Cid(), t.Commit()
|
||||
}
|
||||
return nd.Cid(), nil
|
||||
}
|
||||
|
||||
func (t *Batch) Commit() error {
|
||||
_, err := t.ds.Blocks.AddBlocks(t.blocks)
|
||||
t.blocks = nil
|
||||
t.size = 0
|
||||
return err
|
||||
}
|
||||
|
||||
type GetLinks func(context.Context, *cid.Cid) ([]*node.Link, error)
|
||||
|
||||
// EnumerateChildren will walk the dag below the given root node and add all
|
||||
|
Reference in New Issue
Block a user