mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-27 16:07:42 +08:00
feat(bitswap) make offline exchange query datastore
License: MIT Signed-off-by: Brian Tiger Chow <brian@perfmode.com>
This commit is contained in:
@ -19,9 +19,8 @@ import (
|
||||
)
|
||||
|
||||
func TestBlocks(t *testing.T) {
|
||||
d := ds.NewMapDatastore()
|
||||
tsds := dssync.MutexWrap(d)
|
||||
bs, err := New(blockstore.NewBlockstore(tsds), offline.Exchange())
|
||||
bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
|
||||
bs, err := New(bstore, offline.Exchange(bstore))
|
||||
if err != nil {
|
||||
t.Error("failed to construct block service", err)
|
||||
return
|
||||
|
@ -130,7 +130,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) {
|
||||
return nil, debugerror.Wrap(err)
|
||||
}
|
||||
|
||||
n.Exchange = offline.Exchange()
|
||||
n.Exchange = offline.Exchange(blockstore.NewBlockstore(n.Datastore))
|
||||
|
||||
// setup online services
|
||||
if online {
|
||||
|
@ -45,7 +45,8 @@ func NewMockNode() (*IpfsNode, error) {
|
||||
nd.Routing = dht
|
||||
|
||||
// Bitswap
|
||||
bserv, err := blockservice.New(blockstore.NewBlockstore(nd.Datastore), offline.Exchange())
|
||||
bstore := blockstore.NewBlockstore(nd.Datastore)
|
||||
bserv, err := blockservice.New(bstore, offline.Exchange(bstore))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -3,42 +3,66 @@
|
||||
package offline
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
|
||||
blocks "github.com/jbenet/go-ipfs/blocks"
|
||||
"github.com/jbenet/go-ipfs/blocks/blockstore"
|
||||
exchange "github.com/jbenet/go-ipfs/exchange"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
|
||||
var OfflineMode = errors.New("Block unavailable. Operating in offline mode")
|
||||
|
||||
func Exchange() exchange.Interface {
|
||||
return &offlineExchange{}
|
||||
func Exchange(bs blockstore.Blockstore) exchange.Interface {
|
||||
return &offlineExchange{bs: bs}
|
||||
}
|
||||
|
||||
// offlineExchange implements the Exchange interface but doesn't return blocks.
|
||||
// For use in offline mode.
|
||||
type offlineExchange struct{}
|
||||
type offlineExchange struct {
|
||||
bs blockstore.Blockstore
|
||||
}
|
||||
|
||||
// GetBlock returns nil to signal that a block could not be retrieved for the
|
||||
// given key.
|
||||
// NB: This function may return before the timeout expires.
|
||||
func (_ *offlineExchange) GetBlock(context.Context, u.Key) (*blocks.Block, error) {
|
||||
return nil, OfflineMode
|
||||
func (e *offlineExchange) GetBlock(_ context.Context, k u.Key) (*blocks.Block, error) {
|
||||
return e.bs.Get(k)
|
||||
}
|
||||
|
||||
// HasBlock always returns nil.
|
||||
func (_ *offlineExchange) HasBlock(context.Context, *blocks.Block) error {
|
||||
return nil
|
||||
func (e *offlineExchange) HasBlock(_ context.Context, b *blocks.Block) error {
|
||||
return e.bs.Put(b)
|
||||
}
|
||||
|
||||
// Close always returns nil.
|
||||
func (_ *offlineExchange) Close() error {
|
||||
// NB: exchange doesn't own the blockstore's underlying datastore, so it is
|
||||
// not responsible for closing it.
|
||||
return nil
|
||||
}
|
||||
|
||||
func (_ *offlineExchange) GetBlocks(context.Context, []u.Key) (<-chan *blocks.Block, error) {
|
||||
return nil, OfflineMode
|
||||
func (e *offlineExchange) GetBlocks(ctx context.Context, ks []u.Key) (<-chan *blocks.Block, error) {
|
||||
out := make(chan *blocks.Block, 0)
|
||||
go func() {
|
||||
defer close(out)
|
||||
var misses []u.Key
|
||||
for _, k := range ks {
|
||||
hit, err := e.bs.Get(k)
|
||||
if err != nil {
|
||||
misses = append(misses, k)
|
||||
// a long line of misses should abort when context is cancelled.
|
||||
select {
|
||||
// TODO case send misses down channel
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
select {
|
||||
case out <- hit:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
return out, nil
|
||||
}
|
||||
|
@ -4,13 +4,16 @@ import (
|
||||
"testing"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
|
||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||
ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
|
||||
blocks "github.com/jbenet/go-ipfs/blocks"
|
||||
"github.com/jbenet/go-ipfs/blocks/blockstore"
|
||||
"github.com/jbenet/go-ipfs/blocks/blocksutil"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
|
||||
func TestBlockReturnsErr(t *testing.T) {
|
||||
off := Exchange()
|
||||
off := Exchange(bstore())
|
||||
_, err := off.GetBlock(context.Background(), u.Key("foo"))
|
||||
if err != nil {
|
||||
return // as desired
|
||||
@ -19,10 +22,56 @@ func TestBlockReturnsErr(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestHasBlockReturnsNil(t *testing.T) {
|
||||
off := Exchange()
|
||||
store := bstore()
|
||||
ex := Exchange(store)
|
||||
block := blocks.NewBlock([]byte("data"))
|
||||
err := off.HasBlock(context.Background(), block)
|
||||
|
||||
err := ex.HasBlock(context.Background(), block)
|
||||
if err != nil {
|
||||
t.Fatal("")
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
if _, err := store.Get(block.Key()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetBlocks(t *testing.T) {
|
||||
store := bstore()
|
||||
ex := Exchange(store)
|
||||
g := blocksutil.NewBlockGenerator()
|
||||
|
||||
expected := g.Blocks(2)
|
||||
|
||||
for _, b := range expected {
|
||||
if err := ex.HasBlock(context.Background(), b); err != nil {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
request := func() []u.Key {
|
||||
var ks []u.Key
|
||||
|
||||
for _, b := range expected {
|
||||
ks = append(ks, b.Key())
|
||||
}
|
||||
return ks
|
||||
}()
|
||||
|
||||
received, err := ex.GetBlocks(context.Background(), request)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var count int
|
||||
for _ = range received {
|
||||
count++
|
||||
}
|
||||
if len(expected) != count {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func bstore() blockstore.Blockstore {
|
||||
return blockstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ func randNode() (*mdag.Node, util.Key) {
|
||||
func TestPinnerBasic(t *testing.T) {
|
||||
dstore := ds.NewMapDatastore()
|
||||
bstore := blockstore.NewBlockstore(dssync.MutexWrap(dstore))
|
||||
bserv, err := bs.New(bstore, offline.Exchange())
|
||||
bserv, err := bs.New(bstore, offline.Exchange(bstore))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -24,7 +24,7 @@ func getMockDagServ(t *testing.T) mdag.DAGService {
|
||||
dstore := ds.NewMapDatastore()
|
||||
tsds := sync.MutexWrap(dstore)
|
||||
bstore := blockstore.NewBlockstore(tsds)
|
||||
bserv, err := bs.New(bstore, offline.Exchange())
|
||||
bserv, err := bs.New(bstore, offline.Exchange(bstore))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -16,9 +16,8 @@ import (
|
||||
)
|
||||
|
||||
func GetDAGServ(t testing.TB) dag.DAGService {
|
||||
dstore := ds.NewMapDatastore()
|
||||
tsds := dssync.MutexWrap(dstore)
|
||||
bserv, err := bsrv.New(blockstore.NewBlockstore(tsds), offline.Exchange())
|
||||
bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
|
||||
bserv, err := bsrv.New(bstore, offline.Exchange(bstore))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
Reference in New Issue
Block a user