mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-25 06:58:18 +08:00
use an option type to simplify concurrency
License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
@ -3,6 +3,7 @@ package merkledag
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
blocks "github.com/ipfs/go-ipfs/blocks"
|
blocks "github.com/ipfs/go-ipfs/blocks"
|
||||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||||
@ -24,7 +25,7 @@ type DAGService interface {
|
|||||||
|
|
||||||
// GetDAG returns, in order, all the single leve child
|
// GetDAG returns, in order, all the single leve child
|
||||||
// nodes of the passed in node.
|
// nodes of the passed in node.
|
||||||
GetMany(context.Context, []key.Key) (<-chan *Node, <-chan error)
|
GetMany(context.Context, []key.Key) <-chan *NodeOption
|
||||||
|
|
||||||
Batch() *Batch
|
Batch() *Batch
|
||||||
}
|
}
|
||||||
@ -145,9 +146,13 @@ func FindLinks(links []key.Key, k key.Key, start int) []int {
|
|||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) (<-chan *Node, <-chan error) {
|
type NodeOption struct {
|
||||||
out := make(chan *Node, len(keys))
|
Node *Node
|
||||||
errs := make(chan error, 1)
|
Err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) <-chan *NodeOption {
|
||||||
|
out := make(chan *NodeOption, len(keys))
|
||||||
blocks := ds.Blocks.GetBlocks(ctx, keys)
|
blocks := ds.Blocks.GetBlocks(ctx, keys)
|
||||||
var count int
|
var count int
|
||||||
|
|
||||||
@ -158,27 +163,27 @@ func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) (<-chan *Node
|
|||||||
case b, ok := <-blocks:
|
case b, ok := <-blocks:
|
||||||
if !ok {
|
if !ok {
|
||||||
if count != len(keys) {
|
if count != len(keys) {
|
||||||
errs <- fmt.Errorf("failed to fetch all nodes")
|
out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
nd, err := Decoded(b.Data)
|
nd, err := Decoded(b.Data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errs <- err
|
out <- &NodeOption{Err: err}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// buffered, no need to select
|
// buffered, no need to select
|
||||||
out <- nd
|
out <- &NodeOption{Node: nd}
|
||||||
count++
|
count++
|
||||||
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
errs <- ctx.Err()
|
out <- &NodeOption{Err: ctx.Err()}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return out, errs
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetDAG will fill out all of the links of the given Node.
|
// GetDAG will fill out all of the links of the given Node.
|
||||||
@ -213,15 +218,22 @@ func GetNodes(ctx context.Context, ds DAGService, keys []key.Key) []NodeGetter {
|
|||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
nodechan, errchan := ds.GetMany(ctx, dedupedKeys)
|
nodechan := ds.GetMany(ctx, dedupedKeys)
|
||||||
|
|
||||||
for count := 0; count < len(keys); {
|
for count := 0; count < len(keys); {
|
||||||
select {
|
select {
|
||||||
case nd, ok := <-nodechan:
|
case opt, ok := <-nodechan:
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if opt.Err != nil {
|
||||||
|
log.Error("error fetching: ", opt.Err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
nd := opt.Node
|
||||||
|
|
||||||
k, err := nd.Key()
|
k, err := nd.Key()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Failed to get node key: ", err)
|
log.Error("Failed to get node key: ", err)
|
||||||
@ -233,9 +245,6 @@ func GetNodes(ctx context.Context, ds DAGService, keys []key.Key) []NodeGetter {
|
|||||||
count++
|
count++
|
||||||
sendChans[i] <- nd
|
sendChans[i] <- nd
|
||||||
}
|
}
|
||||||
case err := <-errchan:
|
|
||||||
log.Error("error fetching: ", err)
|
|
||||||
return
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -356,24 +365,30 @@ func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.K
|
|||||||
|
|
||||||
func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
|
func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
|
||||||
toprocess := make(chan []key.Key, 8)
|
toprocess := make(chan []key.Key, 8)
|
||||||
nodes := make(chan *Node, 8)
|
nodes := make(chan *NodeOption, 8)
|
||||||
errs := make(chan error, 1)
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
defer close(toprocess)
|
defer close(toprocess)
|
||||||
|
|
||||||
go fetchNodes(ctx, ds, toprocess, nodes, errs)
|
go fetchNodes(ctx, ds, toprocess, nodes)
|
||||||
|
|
||||||
nodes <- root
|
nodes <- &NodeOption{Node: root}
|
||||||
live := 1
|
live := 1
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case nd, ok := <-nodes:
|
case opt, ok := <-nodes:
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if opt.Err != nil {
|
||||||
|
return opt.Err
|
||||||
|
}
|
||||||
|
|
||||||
|
nd := opt.Node
|
||||||
|
|
||||||
// a node has been fetched
|
// a node has been fetched
|
||||||
live--
|
live--
|
||||||
|
|
||||||
@ -398,38 +413,35 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set
|
|||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case err := <-errs:
|
|
||||||
return err
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) {
|
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *NodeOption) {
|
||||||
defer close(out)
|
var wg sync.WaitGroup
|
||||||
|
defer func() {
|
||||||
|
// wait for all 'get' calls to complete so we don't accidentally send
|
||||||
|
// on a closed channel
|
||||||
|
wg.Wait()
|
||||||
|
close(out)
|
||||||
|
}()
|
||||||
|
|
||||||
get := func(ks []key.Key) {
|
get := func(ks []key.Key) {
|
||||||
nodes, errch := ds.GetMany(ctx, ks)
|
defer wg.Done()
|
||||||
for {
|
nodes := ds.GetMany(ctx, ks)
|
||||||
|
for opt := range nodes {
|
||||||
select {
|
select {
|
||||||
case nd, ok := <-nodes:
|
case out <- opt:
|
||||||
if !ok {
|
case <-ctx.Done():
|
||||||
return
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case out <- nd:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case err := <-errch:
|
|
||||||
errs <- err
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for ks := range in {
|
for ks := range in {
|
||||||
|
wg.Add(1)
|
||||||
go get(ks)
|
go get(ks)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user