mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-01 19:24:14 +08:00
refactor(blockstore, blockservice) use Blockstore and offline.Exchange
License: MIT Signed-off-by: Brian Tiger Chow <brian@perfmode.com>
This commit is contained in:
@ -15,6 +15,8 @@ import (
|
|||||||
var ValueTypeMismatch = errors.New("The retrieved value is not a Block")
|
var ValueTypeMismatch = errors.New("The retrieved value is not a Block")
|
||||||
|
|
||||||
type Blockstore interface {
|
type Blockstore interface {
|
||||||
|
DeleteBlock(u.Key) error
|
||||||
|
Has(u.Key) (bool, error)
|
||||||
Get(u.Key) (*blocks.Block, error)
|
Get(u.Key) (*blocks.Block, error)
|
||||||
Put(*blocks.Block) error
|
Put(*blocks.Block) error
|
||||||
}
|
}
|
||||||
@ -45,3 +47,11 @@ func (bs *blockstore) Get(k u.Key) (*blocks.Block, error) {
|
|||||||
func (bs *blockstore) Put(block *blocks.Block) error {
|
func (bs *blockstore) Put(block *blocks.Block) error {
|
||||||
return bs.datastore.Put(block.Key().DsKey(), block.Data)
|
return bs.datastore.Put(block.Key().DsKey(), block.Data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (bs *blockstore) Has(k u.Key) (bool, error) {
|
||||||
|
return bs.datastore.Has(k.DsKey())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *blockstore) DeleteBlock(k u.Key) error {
|
||||||
|
return s.datastore.Delete(k.DsKey())
|
||||||
|
}
|
||||||
|
@ -6,15 +6,18 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||||
|
|
||||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||||
|
dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
|
||||||
blocks "github.com/jbenet/go-ipfs/blocks"
|
blocks "github.com/jbenet/go-ipfs/blocks"
|
||||||
|
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
|
||||||
|
offline "github.com/jbenet/go-ipfs/exchange/offline"
|
||||||
u "github.com/jbenet/go-ipfs/util"
|
u "github.com/jbenet/go-ipfs/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestBlocks(t *testing.T) {
|
func TestBlocks(t *testing.T) {
|
||||||
d := ds.NewMapDatastore()
|
d := ds.NewMapDatastore()
|
||||||
bs, err := NewBlockService(d, nil)
|
tsds := dssync.MutexWrap(d)
|
||||||
|
bs, err := New(blockstore.NewBlockstore(tsds), offline.Exchange())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error("failed to construct block service", err)
|
t.Error("failed to construct block service", err)
|
||||||
return
|
return
|
||||||
|
@ -9,9 +9,9 @@ import (
|
|||||||
|
|
||||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||||
mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
|
|
||||||
|
|
||||||
blocks "github.com/jbenet/go-ipfs/blocks"
|
blocks "github.com/jbenet/go-ipfs/blocks"
|
||||||
|
"github.com/jbenet/go-ipfs/blocks/blockstore"
|
||||||
exchange "github.com/jbenet/go-ipfs/exchange"
|
exchange "github.com/jbenet/go-ipfs/exchange"
|
||||||
u "github.com/jbenet/go-ipfs/util"
|
u "github.com/jbenet/go-ipfs/util"
|
||||||
)
|
)
|
||||||
@ -19,25 +19,28 @@ import (
|
|||||||
var log = u.Logger("blockservice")
|
var log = u.Logger("blockservice")
|
||||||
var ErrNotFound = errors.New("blockservice: key not found")
|
var ErrNotFound = errors.New("blockservice: key not found")
|
||||||
|
|
||||||
// BlockService is a block datastore.
|
// BlockService is a hybrid block datastore. It stores data in a local
|
||||||
|
// datastore and may retrieve data from a remote Exchange.
|
||||||
// It uses an internal `datastore.Datastore` instance to store values.
|
// It uses an internal `datastore.Datastore` instance to store values.
|
||||||
type BlockService struct {
|
type BlockService struct {
|
||||||
Datastore ds.Datastore
|
// TODO don't expose underlying impl details
|
||||||
Remote exchange.Interface
|
Blockstore blockstore.Blockstore
|
||||||
|
Remote exchange.Interface
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBlockService creates a BlockService with given datastore instance.
|
// NewBlockService creates a BlockService with given datastore instance.
|
||||||
func NewBlockService(d ds.Datastore, rem exchange.Interface) (*BlockService, error) {
|
func New(bs blockstore.Blockstore, rem exchange.Interface) (*BlockService, error) {
|
||||||
if d == nil {
|
if bs == nil {
|
||||||
return nil, fmt.Errorf("BlockService requires valid datastore")
|
return nil, fmt.Errorf("BlockService requires valid blockstore")
|
||||||
}
|
}
|
||||||
if rem == nil {
|
if rem == nil {
|
||||||
log.Warning("blockservice running in local (offline) mode.")
|
log.Warning("blockservice running in local (offline) mode.")
|
||||||
}
|
}
|
||||||
return &BlockService{Datastore: d, Remote: rem}, nil
|
return &BlockService{Blockstore: bs, Remote: rem}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddBlock adds a particular block to the service, Putting it into the datastore.
|
// AddBlock adds a particular block to the service, Putting it into the datastore.
|
||||||
|
// TODO pass a context into this if the remote.HasBlock is going to remain here.
|
||||||
func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
|
func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
|
||||||
k := b.Key()
|
k := b.Key()
|
||||||
log.Debugf("blockservice: storing [%s] in datastore", k)
|
log.Debugf("blockservice: storing [%s] in datastore", k)
|
||||||
@ -47,7 +50,7 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
|
|||||||
// check if we have it before adding. this is an extra read, but large writes
|
// check if we have it before adding. this is an extra read, but large writes
|
||||||
// are more expensive.
|
// are more expensive.
|
||||||
// TODO(jbenet) cheaper has. https://github.com/jbenet/go-datastore/issues/6
|
// TODO(jbenet) cheaper has. https://github.com/jbenet/go-datastore/issues/6
|
||||||
has, err := s.Datastore.Has(k.DsKey())
|
has, err := s.Blockstore.Has(k)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return k, err
|
return k, err
|
||||||
}
|
}
|
||||||
@ -55,12 +58,14 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
|
|||||||
log.Debugf("blockservice: storing [%s] in datastore (already stored)", k)
|
log.Debugf("blockservice: storing [%s] in datastore (already stored)", k)
|
||||||
} else {
|
} else {
|
||||||
log.Debugf("blockservice: storing [%s] in datastore", k)
|
log.Debugf("blockservice: storing [%s] in datastore", k)
|
||||||
err := s.Datastore.Put(k.DsKey(), b.Data)
|
err := s.Blockstore.Put(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return k, err
|
return k, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO this operation rate-limits blockservice operations, we should
|
||||||
|
// consider moving this to an sync process.
|
||||||
if s.Remote != nil {
|
if s.Remote != nil {
|
||||||
ctx := context.TODO()
|
ctx := context.TODO()
|
||||||
err = s.Remote.HasBlock(ctx, *b)
|
err = s.Remote.HasBlock(ctx, *b)
|
||||||
@ -72,17 +77,11 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
|
|||||||
// Getting it from the datastore using the key (hash).
|
// Getting it from the datastore using the key (hash).
|
||||||
func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, error) {
|
func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, error) {
|
||||||
log.Debugf("BlockService GetBlock: '%s'", k)
|
log.Debugf("BlockService GetBlock: '%s'", k)
|
||||||
datai, err := s.Datastore.Get(k.DsKey())
|
block, err := s.Blockstore.Get(k)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
log.Debug("Blockservice: Got data in datastore.")
|
return block, nil
|
||||||
bdata, ok := datai.([]byte)
|
// TODO be careful checking ErrNotFound. If the underlying
|
||||||
if !ok {
|
// implementation changes, this will break.
|
||||||
return nil, fmt.Errorf("data associated with %s is not a []byte", k)
|
|
||||||
}
|
|
||||||
return &blocks.Block{
|
|
||||||
Multihash: mh.Multihash(k),
|
|
||||||
Data: bdata,
|
|
||||||
}, nil
|
|
||||||
} else if err == ds.ErrNotFound && s.Remote != nil {
|
} else if err == ds.ErrNotFound && s.Remote != nil {
|
||||||
log.Debug("Blockservice: Searching bitswap.")
|
log.Debug("Blockservice: Searching bitswap.")
|
||||||
blk, err := s.Remote.GetBlock(ctx, k)
|
blk, err := s.Remote.GetBlock(ctx, k)
|
||||||
@ -101,21 +100,13 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks
|
|||||||
go func() {
|
go func() {
|
||||||
var toFetch []u.Key
|
var toFetch []u.Key
|
||||||
for _, k := range ks {
|
for _, k := range ks {
|
||||||
datai, err := s.Datastore.Get(k.DsKey())
|
block, err := s.Blockstore.Get(k)
|
||||||
if err == nil {
|
if err != nil {
|
||||||
log.Debug("Blockservice: Got data in datastore.")
|
|
||||||
bdata, ok := datai.([]byte)
|
|
||||||
if !ok {
|
|
||||||
log.Criticalf("data associated with %s is not a []byte", k)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
out <- &blocks.Block{
|
|
||||||
Multihash: mh.Multihash(k),
|
|
||||||
Data: bdata,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
toFetch = append(toFetch, k)
|
toFetch = append(toFetch, k)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
log.Debug("Blockservice: Got data in datastore.")
|
||||||
|
out <- block
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return out
|
return out
|
||||||
@ -123,5 +114,5 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks
|
|||||||
|
|
||||||
// DeleteBlock deletes a block in the blockservice from the datastore
|
// DeleteBlock deletes a block in the blockservice from the datastore
|
||||||
func (s *BlockService) DeleteBlock(k u.Key) error {
|
func (s *BlockService) DeleteBlock(k u.Key) error {
|
||||||
return s.Datastore.Delete(k.DsKey())
|
return s.Blockstore.DeleteBlock(k)
|
||||||
}
|
}
|
||||||
|
@ -15,6 +15,7 @@ import (
|
|||||||
exchange "github.com/jbenet/go-ipfs/exchange"
|
exchange "github.com/jbenet/go-ipfs/exchange"
|
||||||
bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
|
bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
|
||||||
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
|
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
|
||||||
|
"github.com/jbenet/go-ipfs/exchange/offline"
|
||||||
mount "github.com/jbenet/go-ipfs/fuse/mount"
|
mount "github.com/jbenet/go-ipfs/fuse/mount"
|
||||||
merkledag "github.com/jbenet/go-ipfs/merkledag"
|
merkledag "github.com/jbenet/go-ipfs/merkledag"
|
||||||
namesys "github.com/jbenet/go-ipfs/namesys"
|
namesys "github.com/jbenet/go-ipfs/namesys"
|
||||||
@ -127,6 +128,8 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) {
|
|||||||
return nil, debugerror.Wrap(err)
|
return nil, debugerror.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
n.Exchange = offline.Exchange()
|
||||||
|
|
||||||
// setup online services
|
// setup online services
|
||||||
if online {
|
if online {
|
||||||
|
|
||||||
@ -178,7 +181,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) {
|
|||||||
|
|
||||||
// TODO(brian): when offline instantiate the BlockService with a bitswap
|
// TODO(brian): when offline instantiate the BlockService with a bitswap
|
||||||
// session that simply doesn't return blocks
|
// session that simply doesn't return blocks
|
||||||
n.Blocks, err = bserv.NewBlockService(n.Datastore, n.Exchange)
|
n.Blocks, err = bserv.New(blockstore.NewBlockstore(n.Datastore), n.Exchange)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, debugerror.Wrap(err)
|
return nil, debugerror.Wrap(err)
|
||||||
}
|
}
|
||||||
|
@ -3,8 +3,10 @@ package core
|
|||||||
import (
|
import (
|
||||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||||
syncds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
|
syncds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
|
||||||
bs "github.com/jbenet/go-ipfs/blockservice"
|
"github.com/jbenet/go-ipfs/blocks/blockstore"
|
||||||
|
blockservice "github.com/jbenet/go-ipfs/blockservice"
|
||||||
ci "github.com/jbenet/go-ipfs/crypto"
|
ci "github.com/jbenet/go-ipfs/crypto"
|
||||||
|
"github.com/jbenet/go-ipfs/exchange/offline"
|
||||||
mdag "github.com/jbenet/go-ipfs/merkledag"
|
mdag "github.com/jbenet/go-ipfs/merkledag"
|
||||||
nsys "github.com/jbenet/go-ipfs/namesys"
|
nsys "github.com/jbenet/go-ipfs/namesys"
|
||||||
path "github.com/jbenet/go-ipfs/path"
|
path "github.com/jbenet/go-ipfs/path"
|
||||||
@ -43,9 +45,7 @@ func NewMockNode() (*IpfsNode, error) {
|
|||||||
nd.Routing = dht
|
nd.Routing = dht
|
||||||
|
|
||||||
// Bitswap
|
// Bitswap
|
||||||
//??
|
bserv, err := blockservice.New(blockstore.NewBlockstore(nd.Datastore), offline.Exchange())
|
||||||
|
|
||||||
bserv, err := bs.NewBlockService(nd.Datastore, nil)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -4,7 +4,10 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||||
|
dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
|
||||||
|
"github.com/jbenet/go-ipfs/blocks/blockstore"
|
||||||
bs "github.com/jbenet/go-ipfs/blockservice"
|
bs "github.com/jbenet/go-ipfs/blockservice"
|
||||||
|
"github.com/jbenet/go-ipfs/exchange/offline"
|
||||||
mdag "github.com/jbenet/go-ipfs/merkledag"
|
mdag "github.com/jbenet/go-ipfs/merkledag"
|
||||||
"github.com/jbenet/go-ipfs/util"
|
"github.com/jbenet/go-ipfs/util"
|
||||||
)
|
)
|
||||||
@ -19,13 +22,15 @@ func randNode() (*mdag.Node, util.Key) {
|
|||||||
|
|
||||||
func TestPinnerBasic(t *testing.T) {
|
func TestPinnerBasic(t *testing.T) {
|
||||||
dstore := ds.NewMapDatastore()
|
dstore := ds.NewMapDatastore()
|
||||||
bserv, err := bs.NewBlockService(dstore, nil)
|
bstore := blockstore.NewBlockstore(dssync.MutexWrap(dstore))
|
||||||
|
bserv, err := bs.New(bstore, offline.Exchange())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
dserv := mdag.NewDAGService(bserv)
|
dserv := mdag.NewDAGService(bserv)
|
||||||
|
|
||||||
|
// TODO does pinner need to share datastore with blockservice?
|
||||||
p := NewPinner(dstore, dserv)
|
p := NewPinner(dstore, dserv)
|
||||||
|
|
||||||
a, ak := randNode()
|
a, ak := randNode()
|
||||||
|
@ -6,7 +6,10 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
|
||||||
|
"github.com/jbenet/go-ipfs/blocks/blockstore"
|
||||||
bs "github.com/jbenet/go-ipfs/blockservice"
|
bs "github.com/jbenet/go-ipfs/blockservice"
|
||||||
|
"github.com/jbenet/go-ipfs/exchange/offline"
|
||||||
imp "github.com/jbenet/go-ipfs/importer"
|
imp "github.com/jbenet/go-ipfs/importer"
|
||||||
"github.com/jbenet/go-ipfs/importer/chunk"
|
"github.com/jbenet/go-ipfs/importer/chunk"
|
||||||
mdag "github.com/jbenet/go-ipfs/merkledag"
|
mdag "github.com/jbenet/go-ipfs/merkledag"
|
||||||
@ -19,7 +22,9 @@ import (
|
|||||||
|
|
||||||
func getMockDagServ(t *testing.T) mdag.DAGService {
|
func getMockDagServ(t *testing.T) mdag.DAGService {
|
||||||
dstore := ds.NewMapDatastore()
|
dstore := ds.NewMapDatastore()
|
||||||
bserv, err := bs.NewBlockService(dstore, nil)
|
tsds := sync.MutexWrap(dstore)
|
||||||
|
bstore := blockstore.NewBlockstore(tsds)
|
||||||
|
bserv, err := bs.New(bstore, offline.Exchange())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -4,9 +4,12 @@ import (
|
|||||||
crand "crypto/rand"
|
crand "crypto/rand"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
|
||||||
|
"github.com/jbenet/go-ipfs/exchange/offline"
|
||||||
"github.com/jbenet/go-ipfs/peer"
|
"github.com/jbenet/go-ipfs/peer"
|
||||||
|
|
||||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||||
|
"github.com/jbenet/go-ipfs/blocks/blockstore"
|
||||||
bsrv "github.com/jbenet/go-ipfs/blockservice"
|
bsrv "github.com/jbenet/go-ipfs/blockservice"
|
||||||
dag "github.com/jbenet/go-ipfs/merkledag"
|
dag "github.com/jbenet/go-ipfs/merkledag"
|
||||||
u "github.com/jbenet/go-ipfs/util"
|
u "github.com/jbenet/go-ipfs/util"
|
||||||
@ -14,7 +17,8 @@ import (
|
|||||||
|
|
||||||
func GetDAGServ(t testing.TB) dag.DAGService {
|
func GetDAGServ(t testing.TB) dag.DAGService {
|
||||||
dstore := ds.NewMapDatastore()
|
dstore := ds.NewMapDatastore()
|
||||||
bserv, err := bsrv.NewBlockService(dstore, nil)
|
tsds := dssync.MutexWrap(dstore)
|
||||||
|
bserv, err := bsrv.New(blockstore.NewBlockstore(tsds), offline.Exchange())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user