diff --git a/blocks/blockstore/blockstore.go b/blocks/blockstore/blockstore.go index b4c0fd7fb..68ccc7c74 100644 --- a/blocks/blockstore/blockstore.go +++ b/blocks/blockstore/blockstore.go @@ -15,6 +15,8 @@ import ( var ValueTypeMismatch = errors.New("The retrieved value is not a Block") type Blockstore interface { + DeleteBlock(u.Key) error + Has(u.Key) (bool, error) Get(u.Key) (*blocks.Block, error) Put(*blocks.Block) error } @@ -45,3 +47,11 @@ func (bs *blockstore) Get(k u.Key) (*blocks.Block, error) { func (bs *blockstore) Put(block *blocks.Block) error { return bs.datastore.Put(block.Key().DsKey(), block.Data) } + +func (bs *blockstore) Has(k u.Key) (bool, error) { + return bs.datastore.Has(k.DsKey()) +} + +func (s *blockstore) DeleteBlock(k u.Key) error { + return s.datastore.Delete(k.DsKey()) +} diff --git a/blockservice/blocks_test.go b/blockservice/blocks_test.go index 1e837eb5d..9f579c530 100644 --- a/blockservice/blocks_test.go +++ b/blockservice/blocks_test.go @@ -6,15 +6,18 @@ import ( "time" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" blocks "github.com/jbenet/go-ipfs/blocks" + blockstore "github.com/jbenet/go-ipfs/blocks/blockstore" + offline "github.com/jbenet/go-ipfs/exchange/offline" u "github.com/jbenet/go-ipfs/util" ) func TestBlocks(t *testing.T) { d := ds.NewMapDatastore() - bs, err := NewBlockService(d, nil) + tsds := dssync.MutexWrap(d) + bs, err := New(blockstore.NewBlockstore(tsds), offline.Exchange()) if err != nil { t.Error("failed to construct block service", err) return diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 279e3ffd9..86e0c776b 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -9,9 +9,9 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" - mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" blocks "github.com/jbenet/go-ipfs/blocks" + "github.com/jbenet/go-ipfs/blocks/blockstore" exchange "github.com/jbenet/go-ipfs/exchange" u "github.com/jbenet/go-ipfs/util" ) @@ -19,25 +19,28 @@ import ( var log = u.Logger("blockservice") var ErrNotFound = errors.New("blockservice: key not found") -// BlockService is a block datastore. +// BlockService is a hybrid block datastore. It stores data in a local +// datastore and may retrieve data from a remote Exchange. // It uses an internal `datastore.Datastore` instance to store values. type BlockService struct { - Datastore ds.Datastore - Remote exchange.Interface + // TODO don't expose underlying impl details + Blockstore blockstore.Blockstore + Remote exchange.Interface } // NewBlockService creates a BlockService with given datastore instance. -func NewBlockService(d ds.Datastore, rem exchange.Interface) (*BlockService, error) { - if d == nil { - return nil, fmt.Errorf("BlockService requires valid datastore") +func New(bs blockstore.Blockstore, rem exchange.Interface) (*BlockService, error) { + if bs == nil { + return nil, fmt.Errorf("BlockService requires valid blockstore") } if rem == nil { log.Warning("blockservice running in local (offline) mode.") } - return &BlockService{Datastore: d, Remote: rem}, nil + return &BlockService{Blockstore: bs, Remote: rem}, nil } // AddBlock adds a particular block to the service, Putting it into the datastore. +// TODO pass a context into this if the remote.HasBlock is going to remain here. func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) { k := b.Key() log.Debugf("blockservice: storing [%s] in datastore", k) @@ -47,7 +50,7 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) { // check if we have it before adding. this is an extra read, but large writes // are more expensive. // TODO(jbenet) cheaper has. https://github.com/jbenet/go-datastore/issues/6 - has, err := s.Datastore.Has(k.DsKey()) + has, err := s.Blockstore.Has(k) if err != nil { return k, err } @@ -55,12 +58,14 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) { log.Debugf("blockservice: storing [%s] in datastore (already stored)", k) } else { log.Debugf("blockservice: storing [%s] in datastore", k) - err := s.Datastore.Put(k.DsKey(), b.Data) + err := s.Blockstore.Put(b) if err != nil { return k, err } } + // TODO this operation rate-limits blockservice operations, we should + // consider moving this to an sync process. if s.Remote != nil { ctx := context.TODO() err = s.Remote.HasBlock(ctx, *b) @@ -72,17 +77,11 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) { // Getting it from the datastore using the key (hash). func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, error) { log.Debugf("BlockService GetBlock: '%s'", k) - datai, err := s.Datastore.Get(k.DsKey()) + block, err := s.Blockstore.Get(k) if err == nil { - log.Debug("Blockservice: Got data in datastore.") - bdata, ok := datai.([]byte) - if !ok { - return nil, fmt.Errorf("data associated with %s is not a []byte", k) - } - return &blocks.Block{ - Multihash: mh.Multihash(k), - Data: bdata, - }, nil + return block, nil + // TODO be careful checking ErrNotFound. If the underlying + // implementation changes, this will break. } else if err == ds.ErrNotFound && s.Remote != nil { log.Debug("Blockservice: Searching bitswap.") blk, err := s.Remote.GetBlock(ctx, k) @@ -101,21 +100,13 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks go func() { var toFetch []u.Key for _, k := range ks { - datai, err := s.Datastore.Get(k.DsKey()) - if err == nil { - log.Debug("Blockservice: Got data in datastore.") - bdata, ok := datai.([]byte) - if !ok { - log.Criticalf("data associated with %s is not a []byte", k) - continue - } - out <- &blocks.Block{ - Multihash: mh.Multihash(k), - Data: bdata, - } - } else { + block, err := s.Blockstore.Get(k) + if err != nil { toFetch = append(toFetch, k) + continue } + log.Debug("Blockservice: Got data in datastore.") + out <- block } }() return out @@ -123,5 +114,5 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks // DeleteBlock deletes a block in the blockservice from the datastore func (s *BlockService) DeleteBlock(k u.Key) error { - return s.Datastore.Delete(k.DsKey()) + return s.Blockstore.DeleteBlock(k) } diff --git a/core/core.go b/core/core.go index a0d07d9af..fcd421fc6 100644 --- a/core/core.go +++ b/core/core.go @@ -15,6 +15,7 @@ import ( exchange "github.com/jbenet/go-ipfs/exchange" bitswap "github.com/jbenet/go-ipfs/exchange/bitswap" bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network" + "github.com/jbenet/go-ipfs/exchange/offline" mount "github.com/jbenet/go-ipfs/fuse/mount" merkledag "github.com/jbenet/go-ipfs/merkledag" namesys "github.com/jbenet/go-ipfs/namesys" @@ -127,6 +128,8 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) { return nil, debugerror.Wrap(err) } + n.Exchange = offline.Exchange() + // setup online services if online { @@ -178,7 +181,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) { // TODO(brian): when offline instantiate the BlockService with a bitswap // session that simply doesn't return blocks - n.Blocks, err = bserv.NewBlockService(n.Datastore, n.Exchange) + n.Blocks, err = bserv.New(blockstore.NewBlockstore(n.Datastore), n.Exchange) if err != nil { return nil, debugerror.Wrap(err) } diff --git a/core/mock.go b/core/mock.go index 92ffcb57d..2d2359277 100644 --- a/core/mock.go +++ b/core/mock.go @@ -3,8 +3,10 @@ package core import ( ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" syncds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" - bs "github.com/jbenet/go-ipfs/blockservice" + "github.com/jbenet/go-ipfs/blocks/blockstore" + blockservice "github.com/jbenet/go-ipfs/blockservice" ci "github.com/jbenet/go-ipfs/crypto" + "github.com/jbenet/go-ipfs/exchange/offline" mdag "github.com/jbenet/go-ipfs/merkledag" nsys "github.com/jbenet/go-ipfs/namesys" path "github.com/jbenet/go-ipfs/path" @@ -43,9 +45,7 @@ func NewMockNode() (*IpfsNode, error) { nd.Routing = dht // Bitswap - //?? - - bserv, err := bs.NewBlockService(nd.Datastore, nil) + bserv, err := blockservice.New(blockstore.NewBlockstore(nd.Datastore), offline.Exchange()) if err != nil { return nil, err } diff --git a/pin/pin_test.go b/pin/pin_test.go index 1ea302823..fc9dc215d 100644 --- a/pin/pin_test.go +++ b/pin/pin_test.go @@ -4,7 +4,10 @@ import ( "testing" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" + "github.com/jbenet/go-ipfs/blocks/blockstore" bs "github.com/jbenet/go-ipfs/blockservice" + "github.com/jbenet/go-ipfs/exchange/offline" mdag "github.com/jbenet/go-ipfs/merkledag" "github.com/jbenet/go-ipfs/util" ) @@ -19,13 +22,15 @@ func randNode() (*mdag.Node, util.Key) { func TestPinnerBasic(t *testing.T) { dstore := ds.NewMapDatastore() - bserv, err := bs.NewBlockService(dstore, nil) + bstore := blockstore.NewBlockstore(dssync.MutexWrap(dstore)) + bserv, err := bs.New(bstore, offline.Exchange()) if err != nil { t.Fatal(err) } dserv := mdag.NewDAGService(bserv) + // TODO does pinner need to share datastore with blockservice? p := NewPinner(dstore, dserv) a, ak := randNode() diff --git a/unixfs/io/dagmodifier_test.go b/unixfs/io/dagmodifier_test.go index 822c87471..d0aa83795 100644 --- a/unixfs/io/dagmodifier_test.go +++ b/unixfs/io/dagmodifier_test.go @@ -6,7 +6,10 @@ import ( "io/ioutil" "testing" + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" + "github.com/jbenet/go-ipfs/blocks/blockstore" bs "github.com/jbenet/go-ipfs/blockservice" + "github.com/jbenet/go-ipfs/exchange/offline" imp "github.com/jbenet/go-ipfs/importer" "github.com/jbenet/go-ipfs/importer/chunk" mdag "github.com/jbenet/go-ipfs/merkledag" @@ -19,7 +22,9 @@ import ( func getMockDagServ(t *testing.T) mdag.DAGService { dstore := ds.NewMapDatastore() - bserv, err := bs.NewBlockService(dstore, nil) + tsds := sync.MutexWrap(dstore) + bstore := blockstore.NewBlockstore(tsds) + bserv, err := bs.New(bstore, offline.Exchange()) if err != nil { t.Fatal(err) } diff --git a/util/testutil/gen.go b/util/testutil/gen.go index be2fe5988..c7f310c2a 100644 --- a/util/testutil/gen.go +++ b/util/testutil/gen.go @@ -4,9 +4,12 @@ import ( crand "crypto/rand" "testing" + dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" + "github.com/jbenet/go-ipfs/exchange/offline" "github.com/jbenet/go-ipfs/peer" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + "github.com/jbenet/go-ipfs/blocks/blockstore" bsrv "github.com/jbenet/go-ipfs/blockservice" dag "github.com/jbenet/go-ipfs/merkledag" u "github.com/jbenet/go-ipfs/util" @@ -14,7 +17,8 @@ import ( func GetDAGServ(t testing.TB) dag.DAGService { dstore := ds.NewMapDatastore() - bserv, err := bsrv.NewBlockService(dstore, nil) + tsds := dssync.MutexWrap(dstore) + bserv, err := bsrv.New(blockstore.NewBlockstore(tsds), offline.Exchange()) if err != nil { t.Fatal(err) }