From 670d0244b44de0e89eb71e42ef350f9ae666ad97 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Wed, 26 Nov 2014 16:55:28 -0800 Subject: [PATCH] feat(bitswap) make offline exchange query datastore License: MIT Signed-off-by: Brian Tiger Chow --- blockservice/blocks_test.go | 5 ++- core/core.go | 2 +- core/mock.go | 3 +- exchange/offline/offline.go | 52 ++++++++++++++++++++-------- exchange/offline/offline_test.go | 59 +++++++++++++++++++++++++++++--- pin/pin_test.go | 2 +- unixfs/io/dagmodifier_test.go | 2 +- util/testutil/gen.go | 5 ++- 8 files changed, 101 insertions(+), 29 deletions(-) diff --git a/blockservice/blocks_test.go b/blockservice/blocks_test.go index 1a75723e2..2645b2024 100644 --- a/blockservice/blocks_test.go +++ b/blockservice/blocks_test.go @@ -19,9 +19,8 @@ import ( ) func TestBlocks(t *testing.T) { - d := ds.NewMapDatastore() - tsds := dssync.MutexWrap(d) - bs, err := New(blockstore.NewBlockstore(tsds), offline.Exchange()) + bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + bs, err := New(bstore, offline.Exchange(bstore)) if err != nil { t.Error("failed to construct block service", err) return diff --git a/core/core.go b/core/core.go index 148853bfc..93382d20c 100644 --- a/core/core.go +++ b/core/core.go @@ -130,7 +130,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) { return nil, debugerror.Wrap(err) } - n.Exchange = offline.Exchange() + n.Exchange = offline.Exchange(blockstore.NewBlockstore(n.Datastore)) // setup online services if online { diff --git a/core/mock.go b/core/mock.go index 2d2359277..ab1149787 100644 --- a/core/mock.go +++ b/core/mock.go @@ -45,7 +45,8 @@ func NewMockNode() (*IpfsNode, error) { nd.Routing = dht // Bitswap - bserv, err := blockservice.New(blockstore.NewBlockstore(nd.Datastore), offline.Exchange()) + bstore := blockstore.NewBlockstore(nd.Datastore) + bserv, err := blockservice.New(bstore, offline.Exchange(bstore)) if err != nil { return nil, err } diff --git a/exchange/offline/offline.go b/exchange/offline/offline.go index 24a89e038..f1a6aaa61 100644 --- a/exchange/offline/offline.go +++ b/exchange/offline/offline.go @@ -3,42 +3,66 @@ package offline import ( - "errors" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - blocks "github.com/jbenet/go-ipfs/blocks" + "github.com/jbenet/go-ipfs/blocks/blockstore" exchange "github.com/jbenet/go-ipfs/exchange" u "github.com/jbenet/go-ipfs/util" ) -var OfflineMode = errors.New("Block unavailable. Operating in offline mode") - -func Exchange() exchange.Interface { - return &offlineExchange{} +func Exchange(bs blockstore.Blockstore) exchange.Interface { + return &offlineExchange{bs: bs} } // offlineExchange implements the Exchange interface but doesn't return blocks. // For use in offline mode. -type offlineExchange struct{} +type offlineExchange struct { + bs blockstore.Blockstore +} // GetBlock returns nil to signal that a block could not be retrieved for the // given key. // NB: This function may return before the timeout expires. -func (_ *offlineExchange) GetBlock(context.Context, u.Key) (*blocks.Block, error) { - return nil, OfflineMode +func (e *offlineExchange) GetBlock(_ context.Context, k u.Key) (*blocks.Block, error) { + return e.bs.Get(k) } // HasBlock always returns nil. -func (_ *offlineExchange) HasBlock(context.Context, *blocks.Block) error { - return nil +func (e *offlineExchange) HasBlock(_ context.Context, b *blocks.Block) error { + return e.bs.Put(b) } // Close always returns nil. func (_ *offlineExchange) Close() error { + // NB: exchange doesn't own the blockstore's underlying datastore, so it is + // not responsible for closing it. return nil } -func (_ *offlineExchange) GetBlocks(context.Context, []u.Key) (<-chan *blocks.Block, error) { - return nil, OfflineMode +func (e *offlineExchange) GetBlocks(ctx context.Context, ks []u.Key) (<-chan *blocks.Block, error) { + out := make(chan *blocks.Block, 0) + go func() { + defer close(out) + var misses []u.Key + for _, k := range ks { + hit, err := e.bs.Get(k) + if err != nil { + misses = append(misses, k) + // a long line of misses should abort when context is cancelled. + select { + // TODO case send misses down channel + case <-ctx.Done(): + return + default: + continue + } + } + select { + case out <- hit: + case <-ctx.Done(): + return + } + } + }() + return out, nil } diff --git a/exchange/offline/offline_test.go b/exchange/offline/offline_test.go index ac02d2101..d32f336d0 100644 --- a/exchange/offline/offline_test.go +++ b/exchange/offline/offline_test.go @@ -4,13 +4,16 @@ import ( "testing" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" blocks "github.com/jbenet/go-ipfs/blocks" + "github.com/jbenet/go-ipfs/blocks/blockstore" + "github.com/jbenet/go-ipfs/blocks/blocksutil" u "github.com/jbenet/go-ipfs/util" ) func TestBlockReturnsErr(t *testing.T) { - off := Exchange() + off := Exchange(bstore()) _, err := off.GetBlock(context.Background(), u.Key("foo")) if err != nil { return // as desired @@ -19,10 +22,56 @@ func TestBlockReturnsErr(t *testing.T) { } func TestHasBlockReturnsNil(t *testing.T) { - off := Exchange() + store := bstore() + ex := Exchange(store) block := blocks.NewBlock([]byte("data")) - err := off.HasBlock(context.Background(), block) + + err := ex.HasBlock(context.Background(), block) if err != nil { - t.Fatal("") + t.Fail() + } + + if _, err := store.Get(block.Key()); err != nil { + t.Fatal(err) } } + +func TestGetBlocks(t *testing.T) { + store := bstore() + ex := Exchange(store) + g := blocksutil.NewBlockGenerator() + + expected := g.Blocks(2) + + for _, b := range expected { + if err := ex.HasBlock(context.Background(), b); err != nil { + t.Fail() + } + } + + request := func() []u.Key { + var ks []u.Key + + for _, b := range expected { + ks = append(ks, b.Key()) + } + return ks + }() + + received, err := ex.GetBlocks(context.Background(), request) + if err != nil { + t.Fatal(err) + } + + var count int + for _ = range received { + count++ + } + if len(expected) != count { + t.Fail() + } +} + +func bstore() blockstore.Blockstore { + return blockstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) +} diff --git a/pin/pin_test.go b/pin/pin_test.go index fc9dc215d..dada99803 100644 --- a/pin/pin_test.go +++ b/pin/pin_test.go @@ -23,7 +23,7 @@ func randNode() (*mdag.Node, util.Key) { func TestPinnerBasic(t *testing.T) { dstore := ds.NewMapDatastore() bstore := blockstore.NewBlockstore(dssync.MutexWrap(dstore)) - bserv, err := bs.New(bstore, offline.Exchange()) + bserv, err := bs.New(bstore, offline.Exchange(bstore)) if err != nil { t.Fatal(err) } diff --git a/unixfs/io/dagmodifier_test.go b/unixfs/io/dagmodifier_test.go index d0aa83795..ed5b10d69 100644 --- a/unixfs/io/dagmodifier_test.go +++ b/unixfs/io/dagmodifier_test.go @@ -24,7 +24,7 @@ func getMockDagServ(t *testing.T) mdag.DAGService { dstore := ds.NewMapDatastore() tsds := sync.MutexWrap(dstore) bstore := blockstore.NewBlockstore(tsds) - bserv, err := bs.New(bstore, offline.Exchange()) + bserv, err := bs.New(bstore, offline.Exchange(bstore)) if err != nil { t.Fatal(err) } diff --git a/util/testutil/gen.go b/util/testutil/gen.go index c7f310c2a..186740ea9 100644 --- a/util/testutil/gen.go +++ b/util/testutil/gen.go @@ -16,9 +16,8 @@ import ( ) func GetDAGServ(t testing.TB) dag.DAGService { - dstore := ds.NewMapDatastore() - tsds := dssync.MutexWrap(dstore) - bserv, err := bsrv.New(blockstore.NewBlockstore(tsds), offline.Exchange()) + bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + bserv, err := bsrv.New(bstore, offline.Exchange(bstore)) if err != nil { t.Fatal(err) }