1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-29 17:36:38 +08:00

wire GetBlocks into blockservice

This commit is contained in:
Jeromy
2014-11-21 06:40:34 +00:00
parent ab201c15bb
commit d53deebada
15 changed files with 88 additions and 55 deletions

View File

@ -68,7 +68,7 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
// consider moving this to an sync process.
if s.Remote != nil {
ctx := context.TODO()
err = s.Remote.HasBlock(ctx, *b)
err = s.Remote.HasBlock(ctx, b)
}
return k, err
}
@ -98,6 +98,7 @@ func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, er
func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks.Block {
out := make(chan *blocks.Block, 32)
go func() {
defer close(out)
var toFetch []u.Key
for _, k := range ks {
block, err := s.Blockstore.Get(k)
@ -108,6 +109,15 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks
log.Debug("Blockservice: Got data in datastore.")
out <- block
}
nblocks, err := s.Remote.GetBlocks(ctx, toFetch)
if err != nil {
log.Errorf("Error with GetBlocks: %s", err)
return
}
for blk := range nblocks {
out <- blk
}
}()
return out
}

View File

@ -108,7 +108,7 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
select {
case block := <-promise:
return &block, nil
return block, nil
case <-parent.Done():
return nil, parent.Err()
}
@ -122,7 +122,7 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan blocks.Block, error) {
func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) {
// TODO log the request
promise := bs.notifications.Subscribe(ctx, keys...)
@ -213,7 +213,7 @@ func (bs *bitswap) loop(parent context.Context) {
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
log.Debugf("Has Block %v", blk.Key())
bs.wantlist.Remove(blk.Key())
bs.sendToPeersThatWant(ctx, blk)
@ -244,7 +244,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
for _, block := range incoming.Blocks() {
// TODO verify blocks?
if err := bs.blockstore.Put(&block); err != nil {
if err := bs.blockstore.Put(block); err != nil {
log.Criticalf("error putting block: %s", err)
continue // FIXME(brian): err ignored
}
@ -267,7 +267,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
continue
} else {
message.AddBlock(*block)
message.AddBlock(block)
}
}
}
@ -290,7 +290,7 @@ func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage
bs.strategy.MessageSent(p, m)
}
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block *blocks.Block) {
log.Debugf("Sending %v to peers that want it", block.Key())
for _, p := range bs.strategy.Peers() {

View File

@ -83,7 +83,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
if err := hasBlock.blockstore.Put(block); err != nil {
t.Fatal(err)
}
if err := hasBlock.exchange.HasBlock(context.Background(), *block); err != nil {
if err := hasBlock.exchange.HasBlock(context.Background(), block); err != nil {
t.Fatal(err)
}
@ -140,7 +140,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
first := instances[0]
for _, b := range blocks {
first.blockstore.Put(b)
first.exchange.HasBlock(context.Background(), *b)
first.exchange.HasBlock(context.Background(), b)
rs.Announce(first.peer, b.Key())
}
@ -212,7 +212,7 @@ func TestSendToWantingPeer(t *testing.T) {
beta := bg.Next()
t.Logf("Peer %v announes availability of %v\n", w.peer, beta.Key())
ctx, _ = context.WithTimeout(context.Background(), timeout)
if err := w.blockstore.Put(&beta); err != nil {
if err := w.blockstore.Put(beta); err != nil {
t.Fatal(err)
}
w.exchange.HasBlock(ctx, beta)
@ -225,7 +225,7 @@ func TestSendToWantingPeer(t *testing.T) {
t.Logf("%v announces availability of %v\n", o.peer, alpha.Key())
ctx, _ = context.WithTimeout(context.Background(), timeout)
if err := o.blockstore.Put(&alpha); err != nil {
if err := o.blockstore.Put(alpha); err != nil {
t.Fatal(err)
}
o.exchange.HasBlock(ctx, alpha)
@ -254,16 +254,16 @@ type BlockGenerator struct {
seq int
}
func (bg *BlockGenerator) Next() blocks.Block {
func (bg *BlockGenerator) Next() *blocks.Block {
bg.seq++
return *blocks.NewBlock([]byte(string(bg.seq)))
return blocks.NewBlock([]byte(string(bg.seq)))
}
func (bg *BlockGenerator) Blocks(n int) []*blocks.Block {
blocks := make([]*blocks.Block, 0)
for i := 0; i < n; i++ {
b := bg.Next()
blocks = append(blocks, &b)
blocks = append(blocks, b)
}
return blocks
}

View File

@ -19,7 +19,7 @@ type BitSwapMessage interface {
Wantlist() []u.Key
// Blocks returns a slice of unique blocks
Blocks() []blocks.Block
Blocks() []*blocks.Block
// AddWanted adds the key to the Wantlist.
//
@ -32,7 +32,7 @@ type BitSwapMessage interface {
// implies Priority(A) > Priority(B)
AddWanted(u.Key)
AddBlock(blocks.Block)
AddBlock(*blocks.Block)
Exportable
}
@ -42,14 +42,14 @@ type Exportable interface {
}
type impl struct {
existsInWantlist map[u.Key]struct{} // map to detect duplicates
wantlist []u.Key // slice to preserve ordering
blocks map[u.Key]blocks.Block // map to detect duplicates
existsInWantlist map[u.Key]struct{} // map to detect duplicates
wantlist []u.Key // slice to preserve ordering
blocks map[u.Key]*blocks.Block // map to detect duplicates
}
func New() BitSwapMessage {
return &impl{
blocks: make(map[u.Key]blocks.Block),
blocks: make(map[u.Key]*blocks.Block),
existsInWantlist: make(map[u.Key]struct{}),
wantlist: make([]u.Key, 0),
}
@ -62,7 +62,7 @@ func newMessageFromProto(pbm pb.Message) BitSwapMessage {
}
for _, d := range pbm.GetBlocks() {
b := blocks.NewBlock(d)
m.AddBlock(*b)
m.AddBlock(b)
}
return m
}
@ -71,8 +71,8 @@ func (m *impl) Wantlist() []u.Key {
return m.wantlist
}
func (m *impl) Blocks() []blocks.Block {
bs := make([]blocks.Block, 0)
func (m *impl) Blocks() []*blocks.Block {
bs := make([]*blocks.Block, 0)
for _, block := range m.blocks {
bs = append(bs, block)
}
@ -88,7 +88,7 @@ func (m *impl) AddWanted(k u.Key) {
m.wantlist = append(m.wantlist, k)
}
func (m *impl) AddBlock(b blocks.Block) {
func (m *impl) AddBlock(b *blocks.Block) {
m.blocks[b.Key()] = b
}

View File

@ -42,7 +42,7 @@ func TestAppendBlock(t *testing.T) {
m := New()
for _, str := range strs {
block := blocks.NewBlock([]byte(str))
m.AddBlock(*block)
m.AddBlock(block)
}
// assert strings are in proto message
@ -133,10 +133,10 @@ func TestToNetFromNetPreservesWantList(t *testing.T) {
func TestToAndFromNetMessage(t *testing.T) {
original := New()
original.AddBlock(*blocks.NewBlock([]byte("W")))
original.AddBlock(*blocks.NewBlock([]byte("E")))
original.AddBlock(*blocks.NewBlock([]byte("F")))
original.AddBlock(*blocks.NewBlock([]byte("M")))
original.AddBlock(blocks.NewBlock([]byte("W")))
original.AddBlock(blocks.NewBlock([]byte("E")))
original.AddBlock(blocks.NewBlock([]byte("F")))
original.AddBlock(blocks.NewBlock([]byte("M")))
p := peer.WithIDString("X")
netmsg, err := original.ToNet(p)
@ -180,8 +180,8 @@ func TestDuplicates(t *testing.T) {
t.Fatal("Duplicate in BitSwapMessage")
}
msg.AddBlock(*b)
msg.AddBlock(*b)
msg.AddBlock(b)
msg.AddBlock(b)
if len(msg.Blocks()) != 1 {
t.Fatal("Duplicate in BitSwapMessage")
}

View File

@ -11,8 +11,8 @@ import (
const bufferSize = 16
type PubSub interface {
Publish(block blocks.Block)
Subscribe(ctx context.Context, keys ...u.Key) <-chan blocks.Block
Publish(block *blocks.Block)
Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block
Shutdown()
}
@ -24,7 +24,7 @@ type impl struct {
wrapped pubsub.PubSub
}
func (ps *impl) Publish(block blocks.Block) {
func (ps *impl) Publish(block *blocks.Block) {
topic := string(block.Key())
ps.wrapped.Pub(block, topic)
}
@ -32,18 +32,18 @@ func (ps *impl) Publish(block blocks.Block) {
// Subscribe returns a one-time use |blockChannel|. |blockChannel| returns nil
// if the |ctx| times out or is cancelled. Then channel is closed after the
// blocks given by |keys| are sent.
func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan blocks.Block {
func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block {
topics := make([]string, 0)
for _, key := range keys {
topics = append(topics, string(key))
}
subChan := ps.wrapped.SubOnce(topics...)
blockChannel := make(chan blocks.Block, 1) // buffered so the sender doesn't wait on receiver
blockChannel := make(chan *blocks.Block, 1) // buffered so the sender doesn't wait on receiver
go func() {
defer close(blockChannel)
select {
case val := <-subChan:
block, ok := val.(blocks.Block)
block, ok := val.(*blocks.Block)
if ok {
blockChannel <- block
}

View File

@ -16,13 +16,13 @@ func TestPublishSubscribe(t *testing.T) {
defer n.Shutdown()
ch := n.Subscribe(context.Background(), blockSent.Key())
n.Publish(*blockSent)
n.Publish(blockSent)
blockRecvd, ok := <-ch
if !ok {
t.Fail()
}
assertBlocksEqual(t, blockRecvd, *blockSent)
assertBlocksEqual(t, blockRecvd, blockSent)
}
@ -39,14 +39,14 @@ func TestCarryOnWhenDeadlineExpires(t *testing.T) {
assertBlockChannelNil(t, blockChannel)
}
func assertBlockChannelNil(t *testing.T, blockChannel <-chan blocks.Block) {
func assertBlockChannelNil(t *testing.T, blockChannel <-chan *blocks.Block) {
_, ok := <-blockChannel
if ok {
t.Fail()
}
}
func assertBlocksEqual(t *testing.T, a, b blocks.Block) {
func assertBlocksEqual(t *testing.T, a, b *blocks.Block) {
if !bytes.Equal(a.Data, b.Data) {
t.Fail()
}

View File

@ -30,7 +30,7 @@ func TestConsistentAccounting(t *testing.T) {
m := message.New()
content := []string{"this", "is", "message", "i"}
m.AddBlock(*blocks.NewBlock([]byte(strings.Join(content, " "))))
m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " "))))
sender.MessageSent(receiver.Peer, m)
receiver.MessageReceived(sender.Peer, m)

View File

@ -33,7 +33,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
// TODO test contents of incoming message
m := bsmsg.New()
m.AddBlock(*blocks.NewBlock([]byte(expectedStr)))
m.AddBlock(blocks.NewBlock([]byte(expectedStr)))
return from, m
}))
@ -41,7 +41,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
t.Log("Build a message and send a synchronous request to recipient")
message := bsmsg.New()
message.AddBlock(*blocks.NewBlock([]byte("data")))
message.AddBlock(blocks.NewBlock([]byte("data")))
response, err := initiator.SendRequest(
context.Background(), peer.WithID(idOfRecipient), message)
if err != nil {
@ -77,7 +77,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
peer.Peer, bsmsg.BitSwapMessage) {
msgToWaiter := bsmsg.New()
msgToWaiter.AddBlock(*blocks.NewBlock([]byte(expectedStr)))
msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr)))
return fromWaiter, msgToWaiter
}))
@ -105,7 +105,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
}))
messageSentAsync := bsmsg.New()
messageSentAsync.AddBlock(*blocks.NewBlock([]byte("data")))
messageSentAsync.AddBlock(blocks.NewBlock([]byte("data")))
errSending := waiter.SendMessage(
context.Background(), peer.WithID(idOfResponder), messageSentAsync)
if errSending != nil {

View File

@ -16,9 +16,11 @@ type Interface interface {
// GetBlock returns the block associated with a given key.
GetBlock(context.Context, u.Key) (*blocks.Block, error)
GetBlocks(context.Context, []u.Key) (<-chan *blocks.Block, error)
// TODO Should callers be concerned with whether the block was made
// available on the network?
HasBlock(context.Context, blocks.Block) error
HasBlock(context.Context, *blocks.Block) error
io.Closer
}

View File

@ -30,7 +30,7 @@ func (_ *offlineExchange) GetBlock(context.Context, u.Key) (*blocks.Block, error
}
// HasBlock always returns nil.
func (_ *offlineExchange) HasBlock(context.Context, blocks.Block) error {
func (_ *offlineExchange) HasBlock(context.Context, *blocks.Block) error {
return nil
}
@ -38,3 +38,7 @@ func (_ *offlineExchange) HasBlock(context.Context, blocks.Block) error {
func (_ *offlineExchange) Close() error {
return nil
}
func (_ *offlineExchange) GetBlocks(context.Context, []u.Key) (<-chan *blocks.Block, error) {
return nil, OfflineMode
}

View File

@ -21,7 +21,7 @@ func TestBlockReturnsErr(t *testing.T) {
func TestHasBlockReturnsNil(t *testing.T) {
off := Exchange()
block := blocks.NewBlock([]byte("data"))
err := off.HasBlock(context.Background(), *block)
err := off.HasBlock(context.Background(), block)
if err != nil {
t.Fatal("")
}

View File

@ -69,6 +69,7 @@ func testFileConsistency(t *testing.T, bs chunk.BlockSplitter, nbytes int) {
if err != nil {
t.Fatal(err)
}
r, err := uio.NewDagReader(nd, nil)
if err != nil {
t.Fatal(err)

View File

@ -328,7 +328,7 @@ func (ds *dagService) BatchFetch(ctx context.Context, root *Node) <-chan *Node {
if next == i {
sig <- nd
next++
for ; nodes[next] != nil; next++ {
for ; next < len(nodes) && nodes[next] != nil; next++ {
sig <- nodes[next]
}
}

View File

@ -17,10 +17,11 @@ var ErrIsDir = errors.New("this dag node is a directory")
// DagReader provides a way to easily read the data contained in a dag.
type DagReader struct {
serv mdag.DAGService
node *mdag.Node
buf io.Reader
fetchChan <-chan *mdag.Node
serv mdag.DAGService
node *mdag.Node
buf io.Reader
fetchChan <-chan *mdag.Node
linkPosition int
}
// NewDagReader creates a new reader object that reads the data represented by the given
@ -37,11 +38,15 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
// Dont allow reading directories
return nil, ErrIsDir
case ftpb.Data_File:
var fetchChan <-chan *mdag.Node
if serv != nil {
fetchChan = serv.BatchFetch(context.TODO(), n)
}
return &DagReader{
node: n,
serv: serv,
buf: bytes.NewBuffer(pb.GetData()),
fetchChan: serv.BatchFetch(context.TODO(), n),
fetchChan: fetchChan,
}, nil
case ftpb.Data_Raw:
// Raw block will just be a single level, return a byte buffer
@ -61,6 +66,17 @@ func (dr *DagReader) precalcNextBuf() error {
if !ok {
return io.EOF
}
default:
// Only used when fetchChan is nil,
// which only happens when passed in a nil dagservice
// TODO: this logic is hard to follow, do it better.
// NOTE: the only time this code is used, is during the
// importer tests, consider just changing those tests
if dr.linkPosition >= len(dr.node.Links) {
return io.EOF
}
nxt = dr.node.Links[dr.linkPosition].Node
dr.linkPosition++
}
pb := new(ftpb.Data)