mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-02 03:28:25 +08:00
convert DAGService to an interface
This commit is contained in:
@ -58,7 +58,7 @@ type IpfsNode struct {
|
|||||||
Blocks *bserv.BlockService
|
Blocks *bserv.BlockService
|
||||||
|
|
||||||
// the merkle dag service, get/add objects.
|
// the merkle dag service, get/add objects.
|
||||||
DAG *merkledag.DAGService
|
DAG merkledag.DAGService
|
||||||
|
|
||||||
// the path resolution system
|
// the path resolution system
|
||||||
Resolver *path.Resolver
|
Resolver *path.Resolver
|
||||||
@ -161,7 +161,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
dag := &merkledag.DAGService{Blocks: bs}
|
dag := merkledag.NewDAGService(bs)
|
||||||
ns := namesys.NewNameSystem(route)
|
ns := namesys.NewNameSystem(route)
|
||||||
p, err := pin.LoadPinner(d, dag)
|
p, err := pin.LoadPinner(d, dag)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -49,7 +49,7 @@ func NewMockNode() (*IpfsNode, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
nd.DAG = &mdag.DAGService{Blocks: bserv}
|
nd.DAG = mdag.NewDAGService(bserv)
|
||||||
|
|
||||||
// Namespace resolver
|
// Namespace resolver
|
||||||
nd.Namesys = nsys.NewNameSystem(dht)
|
nd.Namesys = nsys.NewNameSystem(dht)
|
||||||
|
@ -62,7 +62,7 @@ func MakeLink(n *Node) (*Link, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Link) GetNode(serv *DAGService) (*Node, error) {
|
func (l *Link) GetNode(serv DAGService) (*Node, error) {
|
||||||
if l.Node != nil {
|
if l.Node != nil {
|
||||||
return l.Node, nil
|
return l.Node, nil
|
||||||
}
|
}
|
||||||
@ -151,20 +151,32 @@ func (n *Node) Key() (u.Key, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DAGService is an IPFS Merkle DAG service.
|
// DAGService is an IPFS Merkle DAG service.
|
||||||
|
type DAGService interface {
|
||||||
|
Add(*Node) (u.Key, error)
|
||||||
|
AddRecursive(*Node) error
|
||||||
|
Get(u.Key) (*Node, error)
|
||||||
|
Remove(*Node) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDAGService(bs *bserv.BlockService) DAGService {
|
||||||
|
return &dagService{bs}
|
||||||
|
}
|
||||||
|
|
||||||
|
// dagService is an IPFS Merkle DAG service.
|
||||||
// - the root is virtual (like a forest)
|
// - the root is virtual (like a forest)
|
||||||
// - stores nodes' data in a BlockService
|
// - stores nodes' data in a BlockService
|
||||||
// TODO: should cache Nodes that are in memory, and be
|
// TODO: should cache Nodes that are in memory, and be
|
||||||
// able to free some of them when vm pressure is high
|
// able to free some of them when vm pressure is high
|
||||||
type DAGService struct {
|
type dagService struct {
|
||||||
Blocks *bserv.BlockService
|
Blocks *bserv.BlockService
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add adds a node to the DAGService, storing the block in the BlockService
|
// Add adds a node to the dagService, storing the block in the BlockService
|
||||||
func (n *DAGService) Add(nd *Node) (u.Key, error) {
|
func (n *dagService) Add(nd *Node) (u.Key, error) {
|
||||||
k, _ := nd.Key()
|
k, _ := nd.Key()
|
||||||
log.Debug("DagService Add [%s]", k)
|
log.Debug("DagService Add [%s]", k)
|
||||||
if n == nil {
|
if n == nil {
|
||||||
return "", fmt.Errorf("DAGService is nil")
|
return "", fmt.Errorf("dagService is nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
d, err := nd.Encoded(false)
|
d, err := nd.Encoded(false)
|
||||||
@ -182,7 +194,7 @@ func (n *DAGService) Add(nd *Node) (u.Key, error) {
|
|||||||
return n.Blocks.AddBlock(b)
|
return n.Blocks.AddBlock(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *DAGService) AddRecursive(nd *Node) error {
|
func (n *dagService) AddRecursive(nd *Node) error {
|
||||||
_, err := n.Add(nd)
|
_, err := n.Add(nd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Info("AddRecursive Error: %s\n", err)
|
log.Info("AddRecursive Error: %s\n", err)
|
||||||
@ -201,10 +213,10 @@ func (n *DAGService) AddRecursive(nd *Node) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get retrieves a node from the DAGService, fetching the block in the BlockService
|
// Get retrieves a node from the dagService, fetching the block in the BlockService
|
||||||
func (n *DAGService) Get(k u.Key) (*Node, error) {
|
func (n *dagService) Get(k u.Key) (*Node, error) {
|
||||||
if n == nil {
|
if n == nil {
|
||||||
return nil, fmt.Errorf("DAGService is nil")
|
return nil, fmt.Errorf("dagService is nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
|
ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
|
||||||
@ -216,7 +228,7 @@ func (n *DAGService) Get(k u.Key) (*Node, error) {
|
|||||||
return Decoded(b.Data)
|
return Decoded(b.Data)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *DAGService) Remove(nd *Node) error {
|
func (n *dagService) Remove(nd *Node) error {
|
||||||
for _, l := range nd.Links {
|
for _, l := range nd.Links {
|
||||||
if l.Node != nil {
|
if l.Node != nil {
|
||||||
n.Remove(l.Node)
|
n.Remove(l.Node)
|
||||||
|
@ -15,7 +15,7 @@ var log = u.Logger("path")
|
|||||||
// Resolver provides path resolution to IPFS
|
// Resolver provides path resolution to IPFS
|
||||||
// It has a pointer to a DAGService, which is uses to resolve nodes.
|
// It has a pointer to a DAGService, which is uses to resolve nodes.
|
||||||
type Resolver struct {
|
type Resolver struct {
|
||||||
DAG *merkledag.DAGService
|
DAG merkledag.DAGService
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResolvePath fetches the node for given path. It uses the first
|
// ResolvePath fetches the node for given path. It uses the first
|
||||||
|
@ -32,11 +32,11 @@ type pinner struct {
|
|||||||
recursePin set.BlockSet
|
recursePin set.BlockSet
|
||||||
directPin set.BlockSet
|
directPin set.BlockSet
|
||||||
indirPin *indirectPin
|
indirPin *indirectPin
|
||||||
dserv *mdag.DAGService
|
dserv mdag.DAGService
|
||||||
dstore ds.Datastore
|
dstore ds.Datastore
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPinner(dstore ds.Datastore, serv *mdag.DAGService) Pinner {
|
func NewPinner(dstore ds.Datastore, serv mdag.DAGService) Pinner {
|
||||||
|
|
||||||
// Load set from given datastore...
|
// Load set from given datastore...
|
||||||
rcds := nsds.Wrap(dstore, recursePinDatastoreKey)
|
rcds := nsds.Wrap(dstore, recursePinDatastoreKey)
|
||||||
@ -151,7 +151,7 @@ func (p *pinner) IsPinned(key util.Key) bool {
|
|||||||
p.indirPin.HasKey(key)
|
p.indirPin.HasKey(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
func LoadPinner(d ds.Datastore, dserv *mdag.DAGService) (Pinner, error) {
|
func LoadPinner(d ds.Datastore, dserv mdag.DAGService) (Pinner, error) {
|
||||||
p := new(pinner)
|
p := new(pinner)
|
||||||
|
|
||||||
{ // load recursive set
|
{ // load recursive set
|
||||||
|
@ -24,7 +24,7 @@ func TestPinnerBasic(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
dserv := &mdag.DAGService{Blocks: bserv}
|
dserv := mdag.NewDAGService(bserv)
|
||||||
|
|
||||||
p := NewPinner(dstore, dserv)
|
p := NewPinner(dstore, dserv)
|
||||||
|
|
||||||
|
@ -17,14 +17,14 @@ import (
|
|||||||
// perform surgery on a DAG 'file'
|
// perform surgery on a DAG 'file'
|
||||||
// Dear god, please rename this to something more pleasant
|
// Dear god, please rename this to something more pleasant
|
||||||
type DagModifier struct {
|
type DagModifier struct {
|
||||||
dagserv *mdag.DAGService
|
dagserv mdag.DAGService
|
||||||
curNode *mdag.Node
|
curNode *mdag.Node
|
||||||
|
|
||||||
pbdata *ftpb.Data
|
pbdata *ftpb.Data
|
||||||
splitter chunk.BlockSplitter
|
splitter chunk.BlockSplitter
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDagModifier(from *mdag.Node, serv *mdag.DAGService, spl chunk.BlockSplitter) (*DagModifier, error) {
|
func NewDagModifier(from *mdag.Node, serv mdag.DAGService, spl chunk.BlockSplitter) (*DagModifier, error) {
|
||||||
pbd, err := ft.FromBytes(from.Data)
|
pbd, err := ft.FromBytes(from.Data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -16,17 +16,17 @@ import (
|
|||||||
logging "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-logging"
|
logging "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-logging"
|
||||||
)
|
)
|
||||||
|
|
||||||
func getMockDagServ(t *testing.T) *mdag.DAGService {
|
func getMockDagServ(t *testing.T) mdag.DAGService {
|
||||||
dstore := ds.NewMapDatastore()
|
dstore := ds.NewMapDatastore()
|
||||||
bserv, err := bs.NewBlockService(dstore, nil)
|
bserv, err := bs.NewBlockService(dstore, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
return &mdag.DAGService{Blocks: bserv}
|
return mdag.NewDAGService(bserv)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getNode(t *testing.T, dserv *mdag.DAGService, size int64) ([]byte, *mdag.Node) {
|
func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Node) {
|
||||||
dw := NewDagWriter(dserv, &chunk.SizeSplitter{Size: 500})
|
dw := NewDagWriter(dserv, &chunk.SizeSplitter{500})
|
||||||
|
|
||||||
n, err := io.CopyN(dw, u.NewFastRand(), size)
|
n, err := io.CopyN(dw, u.NewFastRand(), size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -36,7 +36,11 @@ func getNode(t *testing.T, dserv *mdag.DAGService, size int64) ([]byte, *mdag.No
|
|||||||
t.Fatal("Incorrect copy amount!")
|
t.Fatal("Incorrect copy amount!")
|
||||||
}
|
}
|
||||||
|
|
||||||
dw.Close()
|
err = dw.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("DagWriter failed to close,", err)
|
||||||
|
}
|
||||||
|
|
||||||
node := dw.GetNode()
|
node := dw.GetNode()
|
||||||
|
|
||||||
dr, err := NewDagReader(node, dserv)
|
dr, err := NewDagReader(node, dserv)
|
||||||
|
@ -16,7 +16,7 @@ var ErrIsDir = errors.New("this dag node is a directory")
|
|||||||
|
|
||||||
// DagReader provides a way to easily read the data contained in a dag.
|
// DagReader provides a way to easily read the data contained in a dag.
|
||||||
type DagReader struct {
|
type DagReader struct {
|
||||||
serv *mdag.DAGService
|
serv mdag.DAGService
|
||||||
node *mdag.Node
|
node *mdag.Node
|
||||||
position int
|
position int
|
||||||
buf *bytes.Buffer
|
buf *bytes.Buffer
|
||||||
@ -24,7 +24,7 @@ type DagReader struct {
|
|||||||
|
|
||||||
// NewDagReader creates a new reader object that reads the data represented by the given
|
// NewDagReader creates a new reader object that reads the data represented by the given
|
||||||
// node, using the passed in DAGService for data retreival
|
// node, using the passed in DAGService for data retreival
|
||||||
func NewDagReader(n *mdag.Node, serv *mdag.DAGService) (io.Reader, error) {
|
func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
|
||||||
pb := new(ftpb.Data)
|
pb := new(ftpb.Data)
|
||||||
err := proto.Unmarshal(n.Data, pb)
|
err := proto.Unmarshal(n.Data, pb)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -10,7 +10,7 @@ import (
|
|||||||
var log = util.Logger("dagwriter")
|
var log = util.Logger("dagwriter")
|
||||||
|
|
||||||
type DagWriter struct {
|
type DagWriter struct {
|
||||||
dagserv *dag.DAGService
|
dagserv dag.DAGService
|
||||||
node *dag.Node
|
node *dag.Node
|
||||||
totalSize int64
|
totalSize int64
|
||||||
splChan chan []byte
|
splChan chan []byte
|
||||||
@ -19,7 +19,7 @@ type DagWriter struct {
|
|||||||
seterr error
|
seterr error
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDagWriter(ds *dag.DAGService, splitter chunk.BlockSplitter) *DagWriter {
|
func NewDagWriter(ds dag.DAGService, splitter chunk.BlockSplitter) *DagWriter {
|
||||||
dw := new(DagWriter)
|
dw := new(DagWriter)
|
||||||
dw.dagserv = ds
|
dw.dagserv = ds
|
||||||
dw.splChan = make(chan []byte, 8)
|
dw.splChan = make(chan []byte, 8)
|
||||||
|
@ -7,6 +7,7 @@ import (
|
|||||||
|
|
||||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||||
bs "github.com/jbenet/go-ipfs/blockservice"
|
bs "github.com/jbenet/go-ipfs/blockservice"
|
||||||
|
"github.com/jbenet/go-ipfs/importer"
|
||||||
chunk "github.com/jbenet/go-ipfs/importer/chunk"
|
chunk "github.com/jbenet/go-ipfs/importer/chunk"
|
||||||
mdag "github.com/jbenet/go-ipfs/merkledag"
|
mdag "github.com/jbenet/go-ipfs/merkledag"
|
||||||
)
|
)
|
||||||
@ -53,7 +54,7 @@ func TestDagWriter(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
dag := &mdag.DAGService{Blocks: bserv}
|
dag := mdag.NewDAGService(bserv)
|
||||||
dw := NewDagWriter(dag, &chunk.SizeSplitter{Size: 4096})
|
dw := NewDagWriter(dag, &chunk.SizeSplitter{Size: 4096})
|
||||||
|
|
||||||
nbytes := int64(1024 * 1024 * 2)
|
nbytes := int64(1024 * 1024 * 2)
|
||||||
@ -87,7 +88,7 @@ func TestMassiveWrite(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
dag := &mdag.DAGService{Blocks: bserv}
|
dag := mdag.NewDAGService(bserv)
|
||||||
dw := NewDagWriter(dag, &chunk.SizeSplitter{Size: 4096})
|
dw := NewDagWriter(dag, &chunk.SizeSplitter{Size: 4096})
|
||||||
|
|
||||||
nbytes := int64(1024 * 1024 * 1024 * 16)
|
nbytes := int64(1024 * 1024 * 1024 * 16)
|
||||||
@ -107,7 +108,7 @@ func BenchmarkDagWriter(b *testing.B) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
dag := &mdag.DAGService{Blocks: bserv}
|
dag := mdag.NewDAGService(bserv)
|
||||||
|
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
nbytes := int64(100000)
|
nbytes := int64(100000)
|
||||||
@ -125,3 +126,45 @@ func BenchmarkDagWriter(b *testing.B) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAgainstImporter(t *testing.T) {
|
||||||
|
dstore := ds.NewMapDatastore()
|
||||||
|
bserv, err := bs.NewBlockService(dstore, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
dag := mdag.NewDAGService(bserv)
|
||||||
|
|
||||||
|
nbytes := int64(1024 * 1024 * 2)
|
||||||
|
|
||||||
|
// DagWriter
|
||||||
|
dw := NewDagWriter(dag, &chunk.SizeSplitter{4096})
|
||||||
|
n, err := io.CopyN(dw, &datasource{}, nbytes)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if n != nbytes {
|
||||||
|
t.Fatal("Copied incorrect amount of bytes!")
|
||||||
|
}
|
||||||
|
|
||||||
|
dw.Close()
|
||||||
|
dwNode := dw.GetNode()
|
||||||
|
dwKey, err := dwNode.Key()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DagFromFile
|
||||||
|
rl := &io.LimitedReader{&datasource{}, nbytes}
|
||||||
|
|
||||||
|
dffNode, err := importer.NewDagFromReaderWithSplitter(rl, &chunk.SizeSplitter{4096})
|
||||||
|
dffKey, err := dffNode.Key()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if dwKey.String() != dffKey.String() {
|
||||||
|
t.Errorf("\nDagWriter produced %s\n"+
|
||||||
|
"DagFromReader produced %s",
|
||||||
|
dwKey, dffKey)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user