kubo/merkledag/merkledag.go
Jeromy 8f0623255d replace imports with absolute path instead of using symlink
License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
2015-10-03 14:30:50 -07:00

// Package merkledag implements the IPFS Merkle DAG data structures.
package merkledag

import (
	"fmt"
	"sync"

	"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
	blocks "github.com/ipfs/go-ipfs/blocks"
	key "github.com/ipfs/go-ipfs/blocks/key"
	bserv "github.com/ipfs/go-ipfs/blockservice"
	logging "github.com/ipfs/go-ipfs/vendor/QmXJkcEXB6C9h6Ytb6rrUTFU56Ro62zxgrbxTT3dgjQGA8/go-log"
)

var log = logging.Logger("merkledag")

var ErrNotFound = fmt.Errorf("merkledag: not found")

// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
	Add(*Node) (key.Key, error)
	AddRecursive(*Node) error
	Get(context.Context, key.Key) (*Node, error)
	Remove(*Node) error

	// GetDAG returns, in order, all the single level child
	// nodes of the passed in node.
	GetDAG(context.Context, *Node) []NodeGetter
	GetNodes(context.Context, []key.Key) []NodeGetter

	Batch() *Batch
}

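// NewDAGService constructs a DAGService backed by the given BlockService.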
func NewDAGService(bs *bserv.BlockService) DAGService {
	return &dagService{bs}
}

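// Usage sketch (illustrative only; 'bs' and 'nd' below are assumed to be an
// already-configured *bserv.BlockService and a populated *Node):
//
//	ds := NewDAGService(bs)
//	b := ds.Batch()
//	k, err := b.Add(nd) // k is the key the node will be stored under
//	if err == nil {
//		err = b.Commit()
//	}
//
// Batch (defined below) buffers writes and flushes them to the BlockService
// in bulk; a single node can also be stored directly with ds.Add(nd).
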
// dagService is an IPFS Merkle DAG service.
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
// TODO: should cache Nodes that are in memory, and be
// able to free some of them when vm pressure is high
type dagService struct {
	Blocks *bserv.BlockService
}

// Add adds a node to the dagService, storing the block in the BlockService
func (n *dagService) Add(nd *Node) (key.Key, error) {
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
		return "", fmt.Errorf("dagService is nil")
	}

	d, err := nd.Encoded(false)
	if err != nil {
		return "", err
	}

	b := new(blocks.Block)
	b.Data = d
	b.Multihash, err = nd.Multihash()
	if err != nil {
		return "", err
	}

	return n.Blocks.AddBlock(b)
}

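// Batch returns a Batch that buffers added nodes in memory and writes them to
// the BlockService in bulk; buffered blocks are flushed automatically once
// their total size exceeds MaxSize (8 MiB as constructed here).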
func (n *dagService) Batch() *Batch {
	return &Batch{ds: n, MaxSize: 8 * 1024 * 1024}
}

// AddRecursive adds the given node and all child nodes to the BlockService
func (n *dagService) AddRecursive(nd *Node) error {
	_, err := n.Add(nd)
	if err != nil {
		log.Infof("AddRecursive Error: %s", err)
		return err
	}

	for _, link := range nd.Links {
		if link.Node != nil {
			err := n.AddRecursive(link.Node)
			if err != nil {
				return err
			}
		}
	}

	return nil
}

// Get retrieves a node from the dagService, fetching the block in the BlockService
func (n *dagService) Get(ctx context.Context, k key.Key) (*Node, error) {
	if n == nil {
		return nil, fmt.Errorf("dagService is nil")
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	b, err := n.Blocks.GetBlock(ctx, k)
	if err != nil {
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
		return nil, err
	}

	return Decoded(b.Data)
}

// Remove deletes the given node and all of its children from the BlockService
func (n *dagService) Remove(nd *Node) error {
	for _, l := range nd.Links {
		if l.Node != nil {
			n.Remove(l.Node)
		}
	}

	k, err := nd.Key()
	if err != nil {
		return err
	}

	return n.Blocks.DeleteBlock(k)
}

// FetchGraph asynchronously fetches all nodes that are children of the given
// node, and returns a channel that may be waited upon for the fetch to complete
func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} {
	log.Warning("Untested.")

	var wg sync.WaitGroup
	done := make(chan struct{})

	for _, l := range root.Links {
		wg.Add(1)
		go func(lnk *Link) {
			// Signal child is done on way out
			defer wg.Done()

			// Bail out early if the context has already been cancelled
			select {
			case <-ctx.Done():
				return
			default:
			}

			nd, err := lnk.GetNode(ctx, serv)
			if err != nil {
				log.Debug(err)
				return
			}

			// Wait for children to finish
			<-FetchGraph(ctx, nd, serv)
		}(l)
	}

	go func() {
		wg.Wait()
		done <- struct{}{}
	}()

	return done
}

// FindLinks searches the given list of link keys for k, starting at index
// 'start', and returns the indexes of all links pointing to it.
func FindLinks(links []key.Key, k key.Key, start int) []int {
	var out []int
	for i, lnkK := range links[start:] {
		if k == lnkK {
			out = append(out, i+start)
		}
	}
	return out
}

// GetDAG will fill out all of the links of the given Node.
// It returns a slice of NodeGetter promises, one per child link of 'root' and
// in the same order, from which the caller can receive the child nodes.
func (ds *dagService) GetDAG(ctx context.Context, root *Node) []NodeGetter {
	var keys []key.Key
	for _, lnk := range root.Links {
		keys = append(keys, key.Key(lnk.Hash))
	}

	return ds.GetNodes(ctx, keys)
}

// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
func (ds *dagService) GetNodes(ctx context.Context, keys []key.Key) []NodeGetter {
	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

	promises := make([]NodeGetter, len(keys))
	sendChans := make([]chan<- *Node, len(keys))
	for i := range keys {
		promises[i], sendChans[i] = newNodePromise(ctx)
	}
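
	// Deduplicate the keys so each block is requested from the BlockService
	// only once; FindLinks below fans each received block out to every
	// promise whose key matches.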
	dedupedKeys := dedupeKeys(keys)
	go func() {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		blkchan := ds.Blocks.GetBlocks(ctx, dedupedKeys)

		for count := 0; count < len(keys); {
			select {
			case blk, ok := <-blkchan:
				if !ok {
					return
				}

				nd, err := Decoded(blk.Data)
				if err != nil {
					// NB: can happen with improperly formatted input data
					log.Debug("Got back bad block!")
					return
				}

				is := FindLinks(keys, blk.Key(), 0)
				for _, i := range is {
					count++
					sendChans[i] <- nd
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	return promises
}

// dedupeKeys removes duplicates from a list of keys
func dedupeKeys(ks []key.Key) []key.Key {
	kmap := make(map[key.Key]struct{})
	var out []key.Key
	for _, k := range ks {
		if _, ok := kmap[k]; !ok {
			kmap[k] = struct{}{}
			out = append(out, k)
		}
	}
	return out
}

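// newNodePromise returns a NodeGetter promise together with the send side of
// the channel used to fulfill it. The channel is buffered so the fulfilling
// goroutine does not block on delivery.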
func newNodePromise(ctx context.Context) (NodeGetter, chan<- *Node) {
	ch := make(chan *Node, 1)
	return &nodePromise{
		recv: ch,
		ctx:  ctx,
	}, ch
}

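// nodePromise is the NodeGetter implementation returned by newNodePromise.
// It caches the first node received on recv so that later Get calls return
// immediately.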
type nodePromise struct {
	cache *Node
	recv  <-chan *Node
	ctx   context.Context
}

// NodeGetter provides a promise-like interface for a dag Node.
// The first call to Get will block until the Node is received
// from its internal channel; subsequent calls will return the
// cached node.
type NodeGetter interface {
	Get(context.Context) (*Node, error)
}

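// Get returns the cached node if one has already been received; otherwise it
// blocks until the node arrives, or until either the promise's context or the
// caller's context is cancelled.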
func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
	if np.cache != nil {
		return np.cache, nil
	}

	select {
	case nd := <-np.recv:
		np.cache = nd
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	return np.cache, nil
}

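// Batch accumulates encoded nodes and writes them to the underlying
// BlockService in a single AddBlocks call, either when Commit is called or
// when the buffered block data exceeds MaxSize.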
type Batch struct {
	ds      *dagService
	blocks  []*blocks.Block
	size    int
	MaxSize int
}

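// Add encodes the node, queues its block in the batch, and commits the batch
// once the accumulated block data exceeds MaxSize. It returns the key the
// node will be stored under.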
func (t *Batch) Add(nd *Node) (key.Key, error) {
	d, err := nd.Encoded(false)
	if err != nil {
		return "", err
	}

	b := new(blocks.Block)
	b.Data = d
	b.Multihash, err = nd.Multihash()
	if err != nil {
		return "", err
	}

	k := key.Key(b.Multihash)

	t.blocks = append(t.blocks, b)
	t.size += len(b.Data)
	if t.size > t.MaxSize {
		return k, t.Commit()
	}
	return k, nil
}

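// Commit writes all queued blocks to the BlockService in one AddBlocks call
// and resets the batch.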
func (t *Batch) Commit() error {
	_, err := t.ds.Blocks.AddBlocks(t.blocks)
	t.blocks = nil
	t.size = 0
	return err
}