mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-02 03:28:25 +08:00
Merge pull request #191 from ehmry/dagservice-interface
convert DAGService to an interface
This commit is contained in:
@ -58,7 +58,7 @@ type IpfsNode struct {
|
||||
Blocks *bserv.BlockService
|
||||
|
||||
// the merkle dag service, get/add objects.
|
||||
DAG *merkledag.DAGService
|
||||
DAG merkledag.DAGService
|
||||
|
||||
// the path resolution system
|
||||
Resolver *path.Resolver
|
||||
@ -161,7 +161,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dag := &merkledag.DAGService{Blocks: bs}
|
||||
dag := merkledag.NewDAGService(bs)
|
||||
ns := namesys.NewNameSystem(route)
|
||||
p, err := pin.LoadPinner(d, dag)
|
||||
if err != nil {
|
||||
|
@ -49,7 +49,7 @@ func NewMockNode() (*IpfsNode, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nd.DAG = &mdag.DAGService{Blocks: bserv}
|
||||
nd.DAG = mdag.NewDAGService(bserv)
|
||||
|
||||
// Namespace resolver
|
||||
nd.Namesys = nsys.NewNameSystem(dht)
|
||||
|
@ -62,7 +62,7 @@ func MakeLink(n *Node) (*Link, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (l *Link) GetNode(serv *DAGService) (*Node, error) {
|
||||
func (l *Link) GetNode(serv DAGService) (*Node, error) {
|
||||
if l.Node != nil {
|
||||
return l.Node, nil
|
||||
}
|
||||
@ -151,20 +151,32 @@ func (n *Node) Key() (u.Key, error) {
|
||||
}
|
||||
|
||||
// DAGService is an IPFS Merkle DAG service.
|
||||
type DAGService interface {
|
||||
Add(*Node) (u.Key, error)
|
||||
AddRecursive(*Node) error
|
||||
Get(u.Key) (*Node, error)
|
||||
Remove(*Node) error
|
||||
}
|
||||
|
||||
func NewDAGService(bs *bserv.BlockService) DAGService {
|
||||
return &dagService{bs}
|
||||
}
|
||||
|
||||
// dagService is an IPFS Merkle DAG service.
|
||||
// - the root is virtual (like a forest)
|
||||
// - stores nodes' data in a BlockService
|
||||
// TODO: should cache Nodes that are in memory, and be
|
||||
// able to free some of them when vm pressure is high
|
||||
type DAGService struct {
|
||||
type dagService struct {
|
||||
Blocks *bserv.BlockService
|
||||
}
|
||||
|
||||
// Add adds a node to the DAGService, storing the block in the BlockService
|
||||
func (n *DAGService) Add(nd *Node) (u.Key, error) {
|
||||
// Add adds a node to the dagService, storing the block in the BlockService
|
||||
func (n *dagService) Add(nd *Node) (u.Key, error) {
|
||||
k, _ := nd.Key()
|
||||
log.Debug("DagService Add [%s]", k)
|
||||
if n == nil {
|
||||
return "", fmt.Errorf("DAGService is nil")
|
||||
return "", fmt.Errorf("dagService is nil")
|
||||
}
|
||||
|
||||
d, err := nd.Encoded(false)
|
||||
@ -182,7 +194,7 @@ func (n *DAGService) Add(nd *Node) (u.Key, error) {
|
||||
return n.Blocks.AddBlock(b)
|
||||
}
|
||||
|
||||
func (n *DAGService) AddRecursive(nd *Node) error {
|
||||
func (n *dagService) AddRecursive(nd *Node) error {
|
||||
_, err := n.Add(nd)
|
||||
if err != nil {
|
||||
log.Info("AddRecursive Error: %s\n", err)
|
||||
@ -201,10 +213,10 @@ func (n *DAGService) AddRecursive(nd *Node) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get retrieves a node from the DAGService, fetching the block in the BlockService
|
||||
func (n *DAGService) Get(k u.Key) (*Node, error) {
|
||||
// Get retrieves a node from the dagService, fetching the block in the BlockService
|
||||
func (n *dagService) Get(k u.Key) (*Node, error) {
|
||||
if n == nil {
|
||||
return nil, fmt.Errorf("DAGService is nil")
|
||||
return nil, fmt.Errorf("dagService is nil")
|
||||
}
|
||||
|
||||
ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
|
||||
@ -216,7 +228,7 @@ func (n *DAGService) Get(k u.Key) (*Node, error) {
|
||||
return Decoded(b.Data)
|
||||
}
|
||||
|
||||
func (n *DAGService) Remove(nd *Node) error {
|
||||
func (n *dagService) Remove(nd *Node) error {
|
||||
for _, l := range nd.Links {
|
||||
if l.Node != nil {
|
||||
n.Remove(l.Node)
|
||||
|
@ -15,7 +15,7 @@ var log = u.Logger("path")
|
||||
// Resolver provides path resolution to IPFS
|
||||
// It has a pointer to a DAGService, which is uses to resolve nodes.
|
||||
type Resolver struct {
|
||||
DAG *merkledag.DAGService
|
||||
DAG merkledag.DAGService
|
||||
}
|
||||
|
||||
// ResolvePath fetches the node for given path. It uses the first
|
||||
|
@ -32,11 +32,11 @@ type pinner struct {
|
||||
recursePin set.BlockSet
|
||||
directPin set.BlockSet
|
||||
indirPin *indirectPin
|
||||
dserv *mdag.DAGService
|
||||
dserv mdag.DAGService
|
||||
dstore ds.Datastore
|
||||
}
|
||||
|
||||
func NewPinner(dstore ds.Datastore, serv *mdag.DAGService) Pinner {
|
||||
func NewPinner(dstore ds.Datastore, serv mdag.DAGService) Pinner {
|
||||
|
||||
// Load set from given datastore...
|
||||
rcds := nsds.Wrap(dstore, recursePinDatastoreKey)
|
||||
@ -151,7 +151,7 @@ func (p *pinner) IsPinned(key util.Key) bool {
|
||||
p.indirPin.HasKey(key)
|
||||
}
|
||||
|
||||
func LoadPinner(d ds.Datastore, dserv *mdag.DAGService) (Pinner, error) {
|
||||
func LoadPinner(d ds.Datastore, dserv mdag.DAGService) (Pinner, error) {
|
||||
p := new(pinner)
|
||||
|
||||
{ // load recursive set
|
||||
|
@ -24,7 +24,7 @@ func TestPinnerBasic(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
dserv := &mdag.DAGService{Blocks: bserv}
|
||||
dserv := mdag.NewDAGService(bserv)
|
||||
|
||||
p := NewPinner(dstore, dserv)
|
||||
|
||||
|
@ -17,14 +17,14 @@ import (
|
||||
// perform surgery on a DAG 'file'
|
||||
// Dear god, please rename this to something more pleasant
|
||||
type DagModifier struct {
|
||||
dagserv *mdag.DAGService
|
||||
dagserv mdag.DAGService
|
||||
curNode *mdag.Node
|
||||
|
||||
pbdata *ftpb.Data
|
||||
splitter chunk.BlockSplitter
|
||||
}
|
||||
|
||||
func NewDagModifier(from *mdag.Node, serv *mdag.DAGService, spl chunk.BlockSplitter) (*DagModifier, error) {
|
||||
func NewDagModifier(from *mdag.Node, serv mdag.DAGService, spl chunk.BlockSplitter) (*DagModifier, error) {
|
||||
pbd, err := ft.FromBytes(from.Data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -16,17 +16,17 @@ import (
|
||||
logging "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-logging"
|
||||
)
|
||||
|
||||
func getMockDagServ(t *testing.T) *mdag.DAGService {
|
||||
func getMockDagServ(t *testing.T) mdag.DAGService {
|
||||
dstore := ds.NewMapDatastore()
|
||||
bserv, err := bs.NewBlockService(dstore, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return &mdag.DAGService{Blocks: bserv}
|
||||
return mdag.NewDAGService(bserv)
|
||||
}
|
||||
|
||||
func getNode(t *testing.T, dserv *mdag.DAGService, size int64) ([]byte, *mdag.Node) {
|
||||
dw := NewDagWriter(dserv, &chunk.SizeSplitter{Size: 500})
|
||||
func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Node) {
|
||||
dw := NewDagWriter(dserv, &chunk.SizeSplitter{500})
|
||||
|
||||
n, err := io.CopyN(dw, u.NewFastRand(), size)
|
||||
if err != nil {
|
||||
@ -36,7 +36,11 @@ func getNode(t *testing.T, dserv *mdag.DAGService, size int64) ([]byte, *mdag.No
|
||||
t.Fatal("Incorrect copy amount!")
|
||||
}
|
||||
|
||||
dw.Close()
|
||||
err = dw.Close()
|
||||
if err != nil {
|
||||
t.Fatal("DagWriter failed to close,", err)
|
||||
}
|
||||
|
||||
node := dw.GetNode()
|
||||
|
||||
dr, err := NewDagReader(node, dserv)
|
||||
|
@ -16,7 +16,7 @@ var ErrIsDir = errors.New("this dag node is a directory")
|
||||
|
||||
// DagReader provides a way to easily read the data contained in a dag.
|
||||
type DagReader struct {
|
||||
serv *mdag.DAGService
|
||||
serv mdag.DAGService
|
||||
node *mdag.Node
|
||||
position int
|
||||
buf *bytes.Buffer
|
||||
@ -24,7 +24,7 @@ type DagReader struct {
|
||||
|
||||
// NewDagReader creates a new reader object that reads the data represented by the given
|
||||
// node, using the passed in DAGService for data retreival
|
||||
func NewDagReader(n *mdag.Node, serv *mdag.DAGService) (io.Reader, error) {
|
||||
func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
|
||||
pb := new(ftpb.Data)
|
||||
err := proto.Unmarshal(n.Data, pb)
|
||||
if err != nil {
|
||||
|
@ -10,7 +10,7 @@ import (
|
||||
var log = util.Logger("dagwriter")
|
||||
|
||||
type DagWriter struct {
|
||||
dagserv *dag.DAGService
|
||||
dagserv dag.DAGService
|
||||
node *dag.Node
|
||||
totalSize int64
|
||||
splChan chan []byte
|
||||
@ -19,7 +19,7 @@ type DagWriter struct {
|
||||
seterr error
|
||||
}
|
||||
|
||||
func NewDagWriter(ds *dag.DAGService, splitter chunk.BlockSplitter) *DagWriter {
|
||||
func NewDagWriter(ds dag.DAGService, splitter chunk.BlockSplitter) *DagWriter {
|
||||
dw := new(DagWriter)
|
||||
dw.dagserv = ds
|
||||
dw.splChan = make(chan []byte, 8)
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
|
||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||
bs "github.com/jbenet/go-ipfs/blockservice"
|
||||
"github.com/jbenet/go-ipfs/importer"
|
||||
chunk "github.com/jbenet/go-ipfs/importer/chunk"
|
||||
mdag "github.com/jbenet/go-ipfs/merkledag"
|
||||
)
|
||||
@ -53,7 +54,7 @@ func TestDagWriter(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
dag := &mdag.DAGService{Blocks: bserv}
|
||||
dag := mdag.NewDAGService(bserv)
|
||||
dw := NewDagWriter(dag, &chunk.SizeSplitter{Size: 4096})
|
||||
|
||||
nbytes := int64(1024 * 1024 * 2)
|
||||
@ -87,7 +88,7 @@ func TestMassiveWrite(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
dag := &mdag.DAGService{Blocks: bserv}
|
||||
dag := mdag.NewDAGService(bserv)
|
||||
dw := NewDagWriter(dag, &chunk.SizeSplitter{Size: 4096})
|
||||
|
||||
nbytes := int64(1024 * 1024 * 1024 * 16)
|
||||
@ -107,7 +108,7 @@ func BenchmarkDagWriter(b *testing.B) {
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
dag := &mdag.DAGService{Blocks: bserv}
|
||||
dag := mdag.NewDAGService(bserv)
|
||||
|
||||
b.ResetTimer()
|
||||
nbytes := int64(100000)
|
||||
@ -125,3 +126,45 @@ func BenchmarkDagWriter(b *testing.B) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestAgainstImporter(t *testing.T) {
|
||||
dstore := ds.NewMapDatastore()
|
||||
bserv, err := bs.NewBlockService(dstore, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
dag := mdag.NewDAGService(bserv)
|
||||
|
||||
nbytes := int64(1024 * 1024 * 2)
|
||||
|
||||
// DagWriter
|
||||
dw := NewDagWriter(dag, &chunk.SizeSplitter{4096})
|
||||
n, err := io.CopyN(dw, &datasource{}, nbytes)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if n != nbytes {
|
||||
t.Fatal("Copied incorrect amount of bytes!")
|
||||
}
|
||||
|
||||
dw.Close()
|
||||
dwNode := dw.GetNode()
|
||||
dwKey, err := dwNode.Key()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// DagFromFile
|
||||
rl := &io.LimitedReader{&datasource{}, nbytes}
|
||||
|
||||
dffNode, err := importer.NewDagFromReaderWithSplitter(rl, &chunk.SizeSplitter{4096})
|
||||
dffKey, err := dffNode.Key()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if dwKey.String() != dffKey.String() {
|
||||
t.Errorf("\nDagWriter produced %s\n"+
|
||||
"DagFromReader produced %s",
|
||||
dwKey, dffKey)
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user