
Merge pull request #386 from jbenet/bithack

complete Bithack changes
Jeromy Johnson, 2014-12-06 00:20:59 +00:00
46 changed files with 1142 additions and 385 deletions

Godeps/Godeps.json (generated, 8 changes)

@@ -136,6 +136,10 @@
 			"Comment": "v0.6.0-5-gf92b795",
 			"Rev": "f92b7950b372b1db80bd3527e4d40e42555fe6c2"
 		},
+		{
+			"ImportPath": "github.com/maybebtc/pubsub",
+			"Rev": "39ce5f556423a4c7223b370fa17a3bbd75b2d197"
+		},
 		{
 			"ImportPath": "github.com/mitchellh/go-homedir",
 			"Rev": "7d2d8c8a4e078ce3c58736ab521a40b37a504c52"
@@ -144,10 +148,6 @@
 			"ImportPath": "github.com/syndtr/goleveldb/leveldb",
 			"Rev": "99056d50e56252fbe0021d5c893defca5a76baf8"
 		},
-		{
-			"ImportPath": "github.com/tuxychandru/pubsub",
-			"Rev": "02de8aa2db3d570c5ab1be5ba67b456fd0fb7c4e"
-		},
 		{
 			"ImportPath": "gopkg.in/natefinch/lumberjack.v2",
 			"Comment": "v1.0-12-gd28785c",


@@ -0,0 +1,13 @@
+{
+	"ImportPath": "github.com/maybebtc/pubsub",
+	"GoVersion": "go1.3.3",
+	"Packages": [
+		"./..."
+	],
+	"Deps": [
+		{
+			"ImportPath": "gopkg.in/check.v1",
+			"Rev": "64131543e7896d5bcc6bd5a76287eb75ea96c673"
+		}
+	]
+}


@@ -0,0 +1,5 @@
+This directory tree is generated automatically by godep.
+
+Please do not edit.
+
+See https://github.com/tools/godep for more information.


@@ -0,0 +1,2 @@
+vendor:
+	godep save -r ./...


@@ -15,6 +15,7 @@ type operation int
 const (
 	sub operation = iota
 	subOnce
+	subOnceEach
 	pub
 	unsub
 	unsubAll
@@ -55,6 +56,12 @@ func (ps *PubSub) SubOnce(topics ...string) chan interface{} {
 	return ps.sub(subOnce, topics...)
 }
 
+// SubOnceEach returns a channel on which callers receive, at most, one message
+// for each topic.
+func (ps *PubSub) SubOnceEach(topics ...string) chan interface{} {
+	return ps.sub(subOnceEach, topics...)
+}
+
 func (ps *PubSub) sub(op operation, topics ...string) chan interface{} {
 	ch := make(chan interface{}, ps.capacity)
 	ps.cmdChan <- cmd{op: op, topics: topics, ch: ch}
@@ -66,6 +73,12 @@ func (ps *PubSub) AddSub(ch chan interface{}, topics ...string) {
 	ps.cmdChan <- cmd{op: sub, topics: topics, ch: ch}
 }
 
+// AddSubOnceEach adds subscriptions to an existing channel with SubOnceEach
+// behavior.
+func (ps *PubSub) AddSubOnceEach(ch chan interface{}, topics ...string) {
+	ps.cmdChan <- cmd{op: subOnceEach, topics: topics, ch: ch}
+}
+
 // Pub publishes the given message to all subscribers of
 // the specified topics.
 func (ps *PubSub) Pub(msg interface{}, topics ...string) {
@@ -98,7 +111,7 @@ func (ps *PubSub) Shutdown() {
 func (ps *PubSub) start() {
 	reg := registry{
-		topics:    make(map[string]map[chan interface{}]bool),
+		topics:    make(map[string]map[chan interface{}]subtype),
 		revTopics: make(map[chan interface{}]map[string]bool),
 	}
@@ -119,10 +132,13 @@ loop:
 		for _, topic := range cmd.topics {
 			switch cmd.op {
 			case sub:
-				reg.add(topic, cmd.ch, false)
+				reg.add(topic, cmd.ch, stNorm)
 			case subOnce:
-				reg.add(topic, cmd.ch, true)
+				reg.add(topic, cmd.ch, stOnceAny)
+			case subOnceEach:
+				reg.add(topic, cmd.ch, stOnceEach)
 			case pub:
 				reg.send(topic, cmd.msg)
@@ -146,15 +162,23 @@ loop:
 // registry maintains the current subscription state. It's not
 // safe to access a registry from multiple goroutines simultaneously.
 type registry struct {
-	topics    map[string]map[chan interface{}]bool
+	topics    map[string]map[chan interface{}]subtype
 	revTopics map[chan interface{}]map[string]bool
 }
 
-func (reg *registry) add(topic string, ch chan interface{}, once bool) {
+type subtype int
+
+const (
+	stOnceAny = iota
+	stOnceEach
+	stNorm
+)
+
+func (reg *registry) add(topic string, ch chan interface{}, st subtype) {
 	if reg.topics[topic] == nil {
-		reg.topics[topic] = make(map[chan interface{}]bool)
+		reg.topics[topic] = make(map[chan interface{}]subtype)
 	}
-	reg.topics[topic][ch] = once
+	reg.topics[topic][ch] = st
 
 	if reg.revTopics[ch] == nil {
 		reg.revTopics[ch] = make(map[string]bool)
@@ -163,12 +187,15 @@ func (reg *registry) add(topic string, ch chan interface{}, once bool) {
 }
 
 func (reg *registry) send(topic string, msg interface{}) {
-	for ch, once := range reg.topics[topic] {
+	for ch, st := range reg.topics[topic] {
 		ch <- msg
-		if once {
+		switch st {
+		case stOnceAny:
 			for topic := range reg.revTopics[ch] {
 				reg.remove(topic, ch)
 			}
+		case stOnceEach:
+			reg.remove(topic, ch)
 		}
 	}
 }
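To make the new `SubOnceEach` semantics concrete, here is a small, hypothetical usage sketch against the vendored package (the import path is the upstream one recorded in Godeps above; the behavior shown matches this diff and its tests):

```go
package main

import (
	"fmt"

	pubsub "github.com/maybebtc/pubsub"
)

func main() {
	ps := pubsub.New(1)
	defer ps.Shutdown()

	// At most one value per topic: the channel is closed once both
	// "t1" and "t2" have each delivered a single message.
	ch := ps.SubOnceEach("t1", "t2")

	ps.Pub("a", "t1")
	ps.Pub("b", "t1") // dropped: "t1" was already fulfilled
	ps.Pub("c", "t2")

	fmt.Println(<-ch) // a
	fmt.Println(<-ch) // c
	_, open := <-ch
	fmt.Println(open) // false: unsubscribed from every topic
}
```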


@@ -5,7 +5,7 @@
 package pubsub
 
 import (
-	check "launchpad.net/gocheck"
+	check "gopkg.in/check.v1"
 	"runtime"
 	"testing"
 	"time"
@@ -181,6 +181,23 @@ func (s *Suite) TestMultiSubOnce(c *check.C) {
 	ps.Shutdown()
 }
 
+func (s *Suite) TestMultiSubOnceEach(c *check.C) {
+	ps := New(1)
+	ch := ps.SubOnceEach("t1", "t2")
+
+	ps.Pub("hi", "t1")
+	c.Check(<-ch, check.Equals, "hi")
+
+	ps.Pub("hi!", "t1") // ignored
+
+	ps.Pub("hello", "t2")
+	c.Check(<-ch, check.Equals, "hello")
+
+	_, ok := <-ch
+	c.Check(ok, check.Equals, false)
+
+	ps.Shutdown()
+}
+
 func (s *Suite) TestMultiPub(c *check.C) {
 	ps := New(1)
 	ch1 := ps.Sub("t1")


@@ -15,6 +15,8 @@ import (
 var ValueTypeMismatch = errors.New("The retrieved value is not a Block")
 
 type Blockstore interface {
+	DeleteBlock(u.Key) error
+	Has(u.Key) (bool, error)
 	Get(u.Key) (*blocks.Block, error)
 	Put(*blocks.Block) error
 }
@@ -45,3 +47,11 @@ func (bs *blockstore) Get(k u.Key) (*blocks.Block, error) {
 func (bs *blockstore) Put(block *blocks.Block) error {
 	return bs.datastore.Put(block.Key().DsKey(), block.Data)
 }
+
+func (bs *blockstore) Has(k u.Key) (bool, error) {
+	return bs.datastore.Has(k.DsKey())
+}
+
+func (s *blockstore) DeleteBlock(k u.Key) error {
+	return s.datastore.Delete(k.DsKey())
+}
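A quick illustrative sketch of the widened interface in use. The constructor and datastore wrappers are the ones used elsewhere in this diff; the `main` scaffolding is hypothetical:

```go
package main

import (
	"fmt"

	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
	dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
	blocks "github.com/jbenet/go-ipfs/blocks"
	blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
)

func main() {
	// Blockstore wraps a thread-safe datastore.
	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))

	b := blocks.NewBlock([]byte("hello"))
	if err := bs.Put(b); err != nil {
		panic(err)
	}

	// Has is the cheap existence check AddBlock now relies on.
	ok, _ := bs.Has(b.Key())
	fmt.Println(ok) // true

	// DeleteBlock removes the block by key.
	_ = bs.DeleteBlock(b.Key())
	ok, _ = bs.Has(b.Key())
	fmt.Println(ok) // false
}
```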


@@ -0,0 +1,25 @@
+package blocksutil
+
+import "github.com/jbenet/go-ipfs/blocks"
+
+func NewBlockGenerator() BlockGenerator {
+	return BlockGenerator{}
+}
+
+type BlockGenerator struct {
+	seq int
+}
+
+func (bg *BlockGenerator) Next() *blocks.Block {
+	bg.seq++
+	return blocks.NewBlock([]byte(string(bg.seq)))
+}
+
+func (bg *BlockGenerator) Blocks(n int) []*blocks.Block {
+	blocks := make([]*blocks.Block, 0)
+	for i := 0; i < n; i++ {
+		b := bg.Next()
+		blocks = append(blocks, b)
+	}
+	return blocks
+}


@@ -6,15 +6,22 @@ import (
 	"time"
 
 	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
+	dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
 	blocks "github.com/jbenet/go-ipfs/blocks"
+	blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
+	blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil"
+	bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
+	tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
+	offline "github.com/jbenet/go-ipfs/exchange/offline"
+	"github.com/jbenet/go-ipfs/routing/mock"
 	u "github.com/jbenet/go-ipfs/util"
 )
 
 func TestBlocks(t *testing.T) {
 	d := ds.NewMapDatastore()
-	bs, err := NewBlockService(d, nil)
+	tsds := dssync.MutexWrap(d)
+	bs, err := New(blockstore.NewBlockstore(tsds), offline.Exchange())
 	if err != nil {
 		t.Error("failed to construct block service", err)
 		return
@@ -55,3 +62,46 @@ func TestBlocks(t *testing.T) {
 		t.Error("Block data is not equal.")
 	}
 }
+
+func TestGetBlocksSequential(t *testing.T) {
+	net := tn.VirtualNetwork()
+	rs := mock.VirtualRoutingServer()
+	sg := bitswap.NewSessionGenerator(net, rs)
+	bg := blocksutil.NewBlockGenerator()
+
+	instances := sg.Instances(4)
+	blks := bg.Blocks(50)
+	// TODO: verify no duplicates
+
+	var servs []*BlockService
+	for _, i := range instances {
+		bserv, err := New(i.Blockstore, i.Exchange)
+		if err != nil {
+			t.Fatal(err)
+		}
+		servs = append(servs, bserv)
+	}
+
+	var keys []u.Key
+	for _, blk := range blks {
+		keys = append(keys, blk.Key())
+		servs[0].AddBlock(blk)
+	}
+
+	t.Log("one instance at a time, get blocks concurrently")
+
+	for i := 1; i < len(instances); i++ {
+		ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
+		out := servs[i].GetBlocks(ctx, keys)
+		gotten := make(map[u.Key]*blocks.Block)
+		for blk := range out {
+			if _, ok := gotten[blk.Key()]; ok {
+				t.Fatal("Got duplicate block!")
+			}
+			gotten[blk.Key()] = blk
+		}
+		if len(gotten) != len(blks) {
+			t.Fatalf("Didnt get enough blocks back: %d/%d", len(gotten), len(blks))
+		}
+	}
+}


@@ -9,9 +9,9 @@ import (
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
-	mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
 	blocks "github.com/jbenet/go-ipfs/blocks"
+	"github.com/jbenet/go-ipfs/blocks/blockstore"
 	exchange "github.com/jbenet/go-ipfs/exchange"
 	u "github.com/jbenet/go-ipfs/util"
 )
@@ -19,25 +19,28 @@ import (
 var log = u.Logger("blockservice")
 var ErrNotFound = errors.New("blockservice: key not found")
 
-// BlockService is a block datastore.
+// BlockService is a hybrid block datastore. It stores data in a local
+// datastore and may retrieve data from a remote Exchange.
 // It uses an internal `datastore.Datastore` instance to store values.
 type BlockService struct {
-	Datastore ds.Datastore
-	Remote    exchange.Interface
+	// TODO don't expose underlying impl details
+	Blockstore blockstore.Blockstore
+	Remote     exchange.Interface
 }
 
 // NewBlockService creates a BlockService with given datastore instance.
-func NewBlockService(d ds.Datastore, rem exchange.Interface) (*BlockService, error) {
-	if d == nil {
-		return nil, fmt.Errorf("BlockService requires valid datastore")
+func New(bs blockstore.Blockstore, rem exchange.Interface) (*BlockService, error) {
+	if bs == nil {
+		return nil, fmt.Errorf("BlockService requires valid blockstore")
 	}
 	if rem == nil {
 		log.Warning("blockservice running in local (offline) mode.")
 	}
-	return &BlockService{Datastore: d, Remote: rem}, nil
+	return &BlockService{Blockstore: bs, Remote: rem}, nil
 }
 
 // AddBlock adds a particular block to the service, Putting it into the datastore.
+// TODO pass a context into this if the remote.HasBlock is going to remain here.
 func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
 	k := b.Key()
 	log.Debugf("blockservice: storing [%s] in datastore", k)
@@ -47,7 +50,7 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
 	// check if we have it before adding. this is an extra read, but large writes
 	// are more expensive.
 	// TODO(jbenet) cheaper has. https://github.com/jbenet/go-datastore/issues/6
-	has, err := s.Datastore.Has(k.DsKey())
+	has, err := s.Blockstore.Has(k)
 	if err != nil {
 		return k, err
 	}
@@ -55,15 +58,17 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
 		log.Debugf("blockservice: storing [%s] in datastore (already stored)", k)
 	} else {
 		log.Debugf("blockservice: storing [%s] in datastore", k)
-		err := s.Datastore.Put(k.DsKey(), b.Data)
+		err := s.Blockstore.Put(b)
 		if err != nil {
 			return k, err
 		}
 	}
 
+	// TODO this operation rate-limits blockservice operations, we should
+	// consider moving this to an sync process.
 	if s.Remote != nil {
 		ctx := context.TODO()
-		err = s.Remote.HasBlock(ctx, *b)
+		err = s.Remote.HasBlock(ctx, b)
 	}
 	return k, err
 }
@@ -72,20 +77,14 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
 // Getting it from the datastore using the key (hash).
 func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, error) {
 	log.Debugf("BlockService GetBlock: '%s'", k)
-	datai, err := s.Datastore.Get(k.DsKey())
+	block, err := s.Blockstore.Get(k)
 	if err == nil {
-		log.Debug("Blockservice: Got data in datastore.")
-		bdata, ok := datai.([]byte)
-		if !ok {
-			return nil, fmt.Errorf("data associated with %s is not a []byte", k)
-		}
-		return &blocks.Block{
-			Multihash: mh.Multihash(k),
-			Data:      bdata,
-		}, nil
+		return block, nil
+		// TODO be careful checking ErrNotFound. If the underlying
+		// implementation changes, this will break.
 	} else if err == ds.ErrNotFound && s.Remote != nil {
 		log.Debug("Blockservice: Searching bitswap.")
-		blk, err := s.Remote.Block(ctx, k)
+		blk, err := s.Remote.GetBlock(ctx, k)
 		if err != nil {
 			return nil, err
 		}
@@ -96,7 +95,46 @@ func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, error) {
 	}
 }
 
+// GetBlocks gets a list of blocks asynchronously and returns through
+// the returned channel.
+// NB: No guarantees are made about order.
+func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks.Block {
+	out := make(chan *blocks.Block, 0)
+	go func() {
+		defer close(out)
+		var misses []u.Key
+		for _, k := range ks {
+			hit, err := s.Blockstore.Get(k)
+			if err != nil {
+				misses = append(misses, k)
+				continue
+			}
+			log.Debug("Blockservice: Got data in datastore.")
+			select {
+			case out <- hit:
+			case <-ctx.Done():
+				return
+			}
+		}
+		rblocks, err := s.Remote.GetBlocks(ctx, misses)
+		if err != nil {
+			log.Errorf("Error with GetBlocks: %s", err)
+			return
+		}
+		for b := range rblocks {
+			select {
+			case out <- b:
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+	return out
+}
+
 // DeleteBlock deletes a block in the blockservice from the datastore
 func (s *BlockService) DeleteBlock(k u.Key) error {
-	return s.Datastore.Delete(k.DsKey())
+	return s.Blockstore.DeleteBlock(k)
 }
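The calling pattern for the new asynchronous API, as a hedged sketch: `bserv` and `keys` are assumed to be an already-constructed `*BlockService` and a `[]u.Key`, mirroring the test above.

```go
// Local hits arrive first; misses are fetched from the Remote exchange.
// The channel closes when all work finishes or the context expires.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

for blk := range bserv.GetBlocks(ctx, keys) {
	fmt.Printf("got %s (%d bytes)\n", blk.Key(), len(blk.Data))
}
```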


@@ -6,6 +6,7 @@ import (
 	"io"
 	"os"
 	"os/signal"
+	"runtime"
 	"runtime/pprof"
 	"syscall"
@@ -54,6 +55,7 @@ type cmdInvocation struct {
 // - output the response
 // - if anything fails, print error, maybe with help
 func main() {
+	runtime.GOMAXPROCS(3)
 	ctx := context.Background()
 	var err error
 	var invoc cmdInvocation
@@ -510,6 +512,6 @@ func (i *cmdInvocation) setupInterruptHandler() {
 func allInterruptSignals() chan os.Signal {
 	sigc := make(chan os.Signal, 1)
 	signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT,
-		syscall.SIGTERM, syscall.SIGQUIT)
+		syscall.SIGTERM)
 	return sigc
 }


@@ -8,12 +8,14 @@ import (
 	b58 "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58"
 	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
 
+	blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
 	bserv "github.com/jbenet/go-ipfs/blockservice"
 	config "github.com/jbenet/go-ipfs/config"
 	diag "github.com/jbenet/go-ipfs/diagnostics"
 	exchange "github.com/jbenet/go-ipfs/exchange"
 	bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
 	bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
+	"github.com/jbenet/go-ipfs/exchange/offline"
 	mount "github.com/jbenet/go-ipfs/fuse/mount"
 	merkledag "github.com/jbenet/go-ipfs/merkledag"
 	namesys "github.com/jbenet/go-ipfs/namesys"
@@ -28,7 +30,7 @@ import (
 	dht "github.com/jbenet/go-ipfs/routing/dht"
 	u "github.com/jbenet/go-ipfs/util"
 	ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
-	"github.com/jbenet/go-ipfs/util/debugerror"
+	debugerror "github.com/jbenet/go-ipfs/util/debugerror"
 	"github.com/jbenet/go-ipfs/util/eventlog"
 )
@@ -114,6 +116,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) {
 		Config:    cfg,
 	}
 	n.ContextCloser = ctxc.NewContextCloser(ctx, n.teardown)
+	ctx = n.Context()
 
 	// setup datastore.
 	if n.Datastore, err = makeDatastore(cfg.Datastore); err != nil {
@@ -127,6 +130,8 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) {
 		return nil, debugerror.Wrap(err)
 	}
 
+	n.Exchange = offline.Exchange()
+
 	// setup online services
 	if online {
@@ -169,14 +174,16 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) {
 		// setup exchange service
 		const alwaysSendToPeer = true // use YesManStrategy
 		bitswapNetwork := bsnet.NewFromIpfsNetwork(exchangeService, n.Network)
-		n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Routing, n.Datastore, alwaysSendToPeer)
+		bstore := blockstore.NewBlockstore(n.Datastore)
+		n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Routing, bstore, alwaysSendToPeer)
 
 		go initConnections(ctx, n.Config, n.Peerstore, dhtRouting)
 	}
 
 	// TODO(brian): when offline instantiate the BlockService with a bitswap
 	// session that simply doesn't return blocks
-	n.Blocks, err = bserv.NewBlockService(n.Datastore, n.Exchange)
+	n.Blocks, err = bserv.New(blockstore.NewBlockstore(n.Datastore), n.Exchange)
 	if err != nil {
 		return nil, debugerror.Wrap(err)
 	}


@@ -3,8 +3,10 @@ package core
 import (
 	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
 	syncds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
-	bs "github.com/jbenet/go-ipfs/blockservice"
+	"github.com/jbenet/go-ipfs/blocks/blockstore"
+	blockservice "github.com/jbenet/go-ipfs/blockservice"
 	ci "github.com/jbenet/go-ipfs/crypto"
+	"github.com/jbenet/go-ipfs/exchange/offline"
 	mdag "github.com/jbenet/go-ipfs/merkledag"
 	nsys "github.com/jbenet/go-ipfs/namesys"
 	path "github.com/jbenet/go-ipfs/path"
@@ -43,9 +45,7 @@ func NewMockNode() (*IpfsNode, error) {
 	nd.Routing = dht
 
 	// Bitswap
-	//??
-	bserv, err := bs.NewBlockService(nd.Datastore, nil)
+	bserv, err := blockservice.New(blockstore.NewBlockstore(nd.Datastore), offline.Exchange())
 	if err != nil {
 		return nil, err
 	}


@ -0,0 +1,47 @@
#Welcome to Bitswap
###(The data trading engine)
Bitswap is the module that is responsible for requesting and providing data
blocks over the network to and from other ipfs peers. The role of bitswap is
to be a merchant in the large global marketplace of data.
##Main Operations
Bitswap has three high level operations:
- **GetBlocks**
- `GetBlocks` is a bitswap method used to request multiple blocks that are likely
to all be provided by the same set of peers (part of a single file, for example).
- **GetBlock**
- `GetBlock` is a special case of `GetBlocks` that just requests a single block.
- **HasBlock**
- `HasBlock` registers a local block with bitswap. Bitswap will then send that
block to any connected peers who want it (with the strategies approval), record
that transaction in the ledger and announce to the DHT that the block is being
provided.
##Internal Details
All `GetBlock` requests are relayed into a single for-select loop via channels.
Calls to `GetBlocks` will have `FindProviders` called for only the first key in
the set initially, This is an optimization attempting to cut down on the number
of RPCs required. After a timeout (specified by the strategies
`GetRebroadcastDelay`) Bitswap will iterate through all keys still in the local
wantlist, perform a find providers call for each, and sent the wantlist out to
those providers. This is the fallback behaviour for cases where our initial
assumption about one peer potentially having multiple blocks in a set does not
hold true.
When receiving messages, Bitswaps `ReceiveMessage` method is called. A bitswap
message may contain the wantlist of the peer who sent the message, and an array
of blocks that were on our local wantlist. Any blocks we receive in a bitswap
message will be passed to `HasBlock`, and the other peers wantlist gets updated
in the strategy by `bs.strategy.MessageReceived`.
If another peers wantlist is received, Bitswap will call its strategies
`ShouldSendBlockToPeer` method to determine whether or not the other peer will
be sent the block they are requesting (if we even have it).
##Outstanding TODOs:
- [ ] Ensure only one request active per key
- [ ] More involved strategies
- [ ] Ensure only wanted blocks are counted in ledgers
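A sketch of driving the three operations described in the README through bitswap's `exchange.Interface` (instance setup elided; `bs`, `keys`, and `blk` are assumed to exist):

```go
// Assumes: bs exchange.Interface (a running bitswap instance),
//          keys []u.Key, blk *blocks.Block.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// GetBlocks: one request for a batch of related keys.
blockCh, err := bs.GetBlocks(ctx, keys)
if err != nil {
	return err // could not even enqueue the request in time
}
for b := range blockCh {
	fmt.Println("received", b.Key())
}

// GetBlock: the single-key special case.
b, err := bs.GetBlock(ctx, keys[0])
if err != nil {
	return err
}
fmt.Println("received", b.Key())

// HasBlock: register a local block; wanting peers are served and the
// DHT is told that we provide it.
return bs.HasBlock(ctx, blk)
```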


@@ -3,13 +3,13 @@
 package bitswap
 
 import (
+	"sync"
 	"time"
 
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
-	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
 	blocks "github.com/jbenet/go-ipfs/blocks"
-	blockstore "github.com/jbenet/go-ipfs/blockstore"
+	blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
 	exchange "github.com/jbenet/go-ipfs/exchange"
 	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
 	bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
@@ -17,17 +17,26 @@ import (
 	strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
 	peer "github.com/jbenet/go-ipfs/peer"
 	u "github.com/jbenet/go-ipfs/util"
+	eventlog "github.com/jbenet/go-ipfs/util/eventlog"
 )
 
-var log = u.Logger("bitswap")
+var log = eventlog.Logger("bitswap")
+
+// Number of providers to request for sending a wantlist to
+// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
+const maxProvidersPerRequest = 3
+
+const providerRequestTimeout = time.Second * 10
+const hasBlockTimeout = time.Second * 15
 
 // New initializes a BitSwap instance that communicates over the
 // provided BitSwapNetwork. This function registers the returned instance as
 // the network delegate.
 // Runs until context is cancelled
-func New(ctx context.Context, p peer.Peer,
-	network bsnet.BitSwapNetwork, routing bsnet.Routing,
-	d ds.ThreadSafeDatastore, nice bool) exchange.Interface {
+func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, routing bsnet.Routing,
+	bstore blockstore.Blockstore, nice bool) exchange.Interface {
+
+	ctx, cancelFunc := context.WithCancel(parent)
 
 	notif := notifications.New()
 	go func() {
@@ -36,14 +45,17 @@ func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, routing bsnet.Routing,
 	}()
 
 	bs := &bitswap{
-		blockstore:    blockstore.NewBlockstore(d),
+		blockstore:    bstore,
+		cancelFunc:    cancelFunc,
 		notifications: notif,
 		strategy:      strategy.New(nice),
 		routing:       routing,
 		sender:        network,
 		wantlist:      u.NewKeySet(),
+		batchRequests: make(chan []u.Key, 32),
 	}
 	network.SetDelegate(bs)
+	go bs.loop(ctx)
 
 	return bs
 }
@@ -63,91 +75,207 @@ type bitswap struct {
 	notifications notifications.PubSub
 
+	// Requests for a set of related blocks
+	// the assumption is made that the same peer is likely to
+	// have more than a single block in the set
+	batchRequests chan []u.Key
+
 	// strategy listens to network traffic and makes decisions about how to
 	// interact with partners.
 	// TODO(brian): save the strategy's state to the datastore
 	strategy strategy.Strategy
 
 	wantlist u.KeySet
+
+	// cancelFunc signals cancellation to the bitswap event loop
+	cancelFunc func()
 }
 
 // GetBlock attempts to retrieve a particular block from peers within the
-// deadline enforced by the context
-//
-// TODO ensure only one active request per key
-func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) {
-	log.Debugf("Get Block %v", k)
-	now := time.Now()
-	defer func() {
-		log.Debugf("GetBlock took %f secs", time.Now().Sub(now).Seconds())
-	}()
+// deadline enforced by the context.
+func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) {
+
+	// Any async work initiated by this function must end when this function
+	// returns. To ensure this, derive a new context. Note that it is okay to
+	// listen on parent in this scope, but NOT okay to pass |parent| to
+	// functions called by this one. Otherwise those functions won't return
+	// when this context's cancel func is executed. This is difficult to
+	// enforce. May this comment keep you safe.
 
 	ctx, cancelFunc := context.WithCancel(parent)
-	defer cancelFunc()
 
-	bs.wantlist.Add(k)
-	promise := bs.notifications.Subscribe(ctx, k)
+	ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("GetBlockRequest"))
+	log.Event(ctx, "GetBlockRequestBegin", &k)
 
-	const maxProviders = 20
-	peersToQuery := bs.routing.FindProvidersAsync(ctx, k, maxProviders)
-
-	go func() {
-		message := bsmsg.New()
-		for _, wanted := range bs.wantlist.Keys() {
-			message.AddWanted(wanted)
-		}
-		for peerToQuery := range peersToQuery {
-			log.Debugf("bitswap got peersToQuery: %s", peerToQuery)
-			go func(p peer.Peer) {
-				log.Debugf("bitswap dialing peer: %s", p)
-				err := bs.sender.DialPeer(ctx, p)
-				if err != nil {
-					log.Errorf("Error sender.DialPeer(%s)", p)
-					return
-				}
-				response, err := bs.sender.SendRequest(ctx, p, message)
-				if err != nil {
-					log.Errorf("Error sender.SendRequest(%s) = %s", p, err)
-					return
-				}
-				// FIXME ensure accounting is handled correctly when
-				// communication fails. May require slightly different API to
-				// get better guarantees. May need shared sequence numbers.
-				bs.strategy.MessageSent(p, message)
-				if response == nil {
-					return
-				}
-				bs.ReceiveMessage(ctx, p, response)
-			}(peerToQuery)
-		}
+	defer func() {
+		cancelFunc()
+		log.Event(ctx, "GetBlockRequestEnd", &k)
 	}()
 
+	promise, err := bs.GetBlocks(ctx, []u.Key{k})
+	if err != nil {
+		return nil, err
+	}
+
 	select {
 	case block := <-promise:
-		bs.wantlist.Remove(k)
-		return &block, nil
+		return block, nil
 	case <-parent.Done():
 		return nil, parent.Err()
 	}
 }
 
+// GetBlocks returns a channel where the caller may receive blocks that
+// correspond to the provided |keys|. Returns an error if BitSwap is unable to
+// begin this request within the deadline enforced by the context.
+//
+// NB: Your request remains open until the context expires. To conserve
+// resources, provide a context with a reasonably short deadline (ie. not one
+// that lasts throughout the lifetime of the server)
+func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) {
+	// TODO log the request
+	promise := bs.notifications.Subscribe(ctx, keys...)
+	select {
+	case bs.batchRequests <- keys:
+		return promise, nil
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	}
+}
+
+func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) error {
+	if peers == nil {
+		panic("Cant send wantlist to nil peerchan")
+	}
+	message := bsmsg.New()
+	for _, wanted := range bs.wantlist.Keys() {
+		message.AddWanted(wanted)
+	}
+	for peerToQuery := range peers {
+		log.Debug("sending query to: %s", peerToQuery)
+		log.Event(ctx, "PeerToQuery", peerToQuery)
+		go func(p peer.Peer) {
+
+			log.Event(ctx, "DialPeer", p)
+			err := bs.sender.DialPeer(ctx, p)
+			if err != nil {
+				log.Errorf("Error sender.DialPeer(%s): %s", p, err)
+				return
+			}
+
+			err = bs.sender.SendMessage(ctx, p, message)
+			if err != nil {
+				log.Errorf("Error sender.SendMessage(%s) = %s", p, err)
+				return
+			}
+
+			// FIXME ensure accounting is handled correctly when
+			// communication fails. May require slightly different API to
+			// get better guarantees. May need shared sequence numbers.
+			bs.strategy.MessageSent(p, message)
+		}(peerToQuery)
+	}
+	return nil
+}
+
+func (bs *bitswap) sendWantlistToProviders(ctx context.Context, ks []u.Key) {
+	wg := sync.WaitGroup{}
+	for _, k := range ks {
+		wg.Add(1)
+		go func(k u.Key) {
+			child, _ := context.WithTimeout(ctx, providerRequestTimeout)
+			providers := bs.routing.FindProvidersAsync(child, k, maxProvidersPerRequest)
+
+			err := bs.sendWantListTo(ctx, providers)
+			if err != nil {
+				log.Errorf("error sending wantlist: %s", err)
+			}
+			wg.Done()
+		}(k)
+	}
+	wg.Wait()
+}
+
+// TODO ensure only one active request per key
+func (bs *bitswap) loop(parent context.Context) {
+
+	ctx, cancel := context.WithCancel(parent)
+
+	broadcastSignal := time.NewTicker(bs.strategy.GetRebroadcastDelay())
+	defer func() {
+		cancel() // signal to derived async functions
+		broadcastSignal.Stop()
+	}()
+
+	for {
+		select {
+		case <-broadcastSignal.C:
+			// Resend unfulfilled wantlist keys
+			bs.sendWantlistToProviders(ctx, bs.wantlist.Keys())
+		case ks := <-bs.batchRequests:
+			// TODO: implement batching on len(ks) > X for some X
+			// i.e. if given 20 keys, fetch first five, then next
+			// five, and so on, so we are more likely to be able to
+			// effectively stream the data
+			if len(ks) == 0 {
+				log.Warning("Received batch request for zero blocks")
+				continue
+			}
+			for _, k := range ks {
+				bs.wantlist.Add(k)
+			}
+			// NB: send want list to providers for the first peer in this list.
+			// the assumption is made that the providers of the first key in
+			// the set are likely to have others as well.
+			// This currently holds true in most every situation, since when
+			// pinning a file, you store and provide all blocks associated with
+			// it. Later, this assumption may not hold as true if we implement
+			// newer bitswap strategies.
+			child, _ := context.WithTimeout(ctx, providerRequestTimeout)
+			providers := bs.routing.FindProvidersAsync(child, ks[0], maxProvidersPerRequest)
+
+			err := bs.sendWantListTo(ctx, providers)
+			if err != nil {
+				log.Errorf("error sending wantlist: %s", err)
+			}
+		case <-parent.Done():
+			return
+		}
+	}
+}
+
 // HasBlock announces the existance of a block to this bitswap service. The
 // service will potentially notify its peers.
-func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
-	log.Debugf("Has Block %v", blk.Key())
+func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
+	// TODO check all errors
+	log.Debugf("Has Block %s", blk.Key())
 
 	bs.wantlist.Remove(blk.Key())
-	bs.sendToPeersThatWant(ctx, blk)
-	return bs.routing.Provide(ctx, blk.Key())
+	bs.notifications.Publish(blk)
+
+	child, _ := context.WithTimeout(ctx, hasBlockTimeout)
+	bs.sendToPeersThatWant(child, blk)
+	child, _ = context.WithTimeout(ctx, hasBlockTimeout)
+	return bs.routing.Provide(child, blk.Key())
+}
+
+// receiveBlock handles storing the block in the blockstore and calling HasBlock
+func (bs *bitswap) receiveBlock(ctx context.Context, block *blocks.Block) {
+	// TODO verify blocks?
+	if err := bs.blockstore.Put(block); err != nil {
+		log.Criticalf("error putting block: %s", err)
+		return
+	}
+	err := bs.HasBlock(ctx, block)
+	if err != nil {
+		log.Warningf("HasBlock errored: %s", err)
+	}
 }
 
 // TODO(brian): handle errors
 func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) (
 	peer.Peer, bsmsg.BitSwapMessage) {
-	log.Debugf("ReceiveMessage from %v", p.Key())
-	log.Debugf("Message wantlist: %v", incoming.Wantlist())
+	log.Debugf("ReceiveMessage from %s", p)
 
 	if p == nil {
 		log.Error("Received message from nil peer!")
@@ -163,39 +291,39 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) (
 	// Record message bytes in ledger
 	// TODO: this is bad, and could be easily abused.
 	// Should only track *useful* messages in ledger
-	bs.strategy.MessageReceived(p, incoming) // FIRST
+	// This call records changes to wantlists, blocks received,
+	// and number of bytes transfered.
+	bs.strategy.MessageReceived(p, incoming)
 
-	for _, block := range incoming.Blocks() {
-		// TODO verify blocks?
-		if err := bs.blockstore.Put(&block); err != nil {
-			continue
-		}
-		bs.notifications.Publish(block)
-		err := bs.HasBlock(ctx, block)
-		if err != nil {
-			log.Warningf("HasBlock errored: %s", err)
-		}
-	}
-
-	message := bsmsg.New()
-	for _, wanted := range bs.wantlist.Keys() {
-		message.AddWanted(wanted)
-	}
+	go func() {
+		for _, block := range incoming.Blocks() {
+			bs.receiveBlock(ctx, block)
+			// FIXME(brian): err ignored
+		}
+	}()
 
 	for _, key := range incoming.Wantlist() {
+		// TODO: might be better to check if we have the block before checking
+		// if we should send it to someone
 		if bs.strategy.ShouldSendBlockToPeer(key, p) {
 			if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
 				continue
 			} else {
-				message.AddBlock(*block)
+				// Create a separate message to send this block in
+				blkmsg := bsmsg.New()
+
+				// TODO: only send this the first time
+				// no sense in sending our wantlist to the
+				// same peer multiple times
+				for _, k := range bs.wantlist.Keys() {
+					blkmsg.AddWanted(k)
+				}
+
+				blkmsg.AddBlock(block)
+				bs.send(ctx, p, blkmsg)
 			}
 		}
 	}
 
-	defer bs.strategy.MessageSent(p, message)
-	log.Debug("Returning message.")
-	return p, message
+	// TODO: consider changing this function to not return anything
+	return nil, nil
 }
 
 func (bs *bitswap) ReceiveError(err error) {
@@ -211,8 +339,8 @@ func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage) {
 	bs.strategy.MessageSent(p, m)
 }
 
-func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
-	log.Debugf("Sending %v to peers that want it", block.Key())
+func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block *blocks.Block) {
+	log.Debugf("Sending %s to peers that want it", block)
 
 	for _, p := range bs.strategy.Peers() {
 		if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
@@ -228,3 +356,8 @@ func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block)
 		}
 	}
 }
+
+func (bs *bitswap) Close() error {
+	bs.cancelFunc()
+	return nil // to conform to Closer interface
+}
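The long comment in `GetBlock` about deriving a context is a general pattern, not specific to bitswap; a minimal, self-contained sketch of it (hypothetical names, plain standard-library style):

```go
// Sketch of the derive-and-cancel pattern the GetBlock comment describes:
// every goroutine started here listens on the derived ctx, so cancelling
// it on return guarantees none of them outlive the call.
func fetchOne(parent context.Context) error {
	ctx, cancel := context.WithCancel(parent)
	defer cancel() // stops all async work started below

	results := make(chan int)
	go func() {
		select {
		case results <- 42: // stand-in for real work
		case <-ctx.Done(): // unblocks when fetchOne returns
		}
	}()

	select {
	case <-results:
		return nil
	case <-parent.Done():
		return parent.Err()
	}
}
```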


@@ -7,20 +7,28 @@ import (
 	"time"
 
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
-	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
-	ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
 	blocks "github.com/jbenet/go-ipfs/blocks"
-	bstore "github.com/jbenet/go-ipfs/blockstore"
-	exchange "github.com/jbenet/go-ipfs/exchange"
-	notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
-	strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
+	blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil"
 	tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
 	peer "github.com/jbenet/go-ipfs/peer"
 	mock "github.com/jbenet/go-ipfs/routing/mock"
-	util "github.com/jbenet/go-ipfs/util"
 )
 
+func TestClose(t *testing.T) {
+	// TODO
+	t.Skip("TODO Bitswap's Close implementation is a WIP")
+	vnet := tn.VirtualNetwork()
+	rout := mock.VirtualRoutingServer()
+	sesgen := NewSessionGenerator(vnet, rout)
+	bgen := blocksutil.NewBlockGenerator()
+
+	block := bgen.Next()
+	bitswap := sesgen.Next()
+
+	bitswap.Exchange.Close()
+	bitswap.Exchange.GetBlock(context.Background(), block.Key())
+}
+
 func TestGetBlockTimeout(t *testing.T) {
 
 	net := tn.VirtualNetwork()
@@ -31,7 +39,7 @@ func TestGetBlockTimeout(t *testing.T) {
 	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
 	block := blocks.NewBlock([]byte("block"))
-	_, err := self.exchange.Block(ctx, block.Key())
+	_, err := self.Exchange.GetBlock(ctx, block.Key())
 
 	if err != context.DeadlineExceeded {
 		t.Fatal("Expected DeadlineExceeded error")
@@ -50,7 +58,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) {
 	solo := g.Next()
 
 	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
-	_, err := solo.exchange.Block(ctx, block.Key())
+	_, err := solo.Exchange.GetBlock(ctx, block.Key())
 
 	if err != context.DeadlineExceeded {
 		t.Fatal("Expected DeadlineExceeded error")
@@ -68,17 +76,17 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
 	hasBlock := g.Next()
 
-	if err := hasBlock.blockstore.Put(block); err != nil {
+	if err := hasBlock.Blockstore.Put(block); err != nil {
 		t.Fatal(err)
 	}
-	if err := hasBlock.exchange.HasBlock(context.Background(), *block); err != nil {
+	if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
 		t.Fatal(err)
 	}
 
 	wantsBlock := g.Next()
 
 	ctx, _ := context.WithTimeout(context.Background(), time.Second)
-	received, err := wantsBlock.exchange.Block(ctx, block.Key())
+	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
 	if err != nil {
 		t.Log(err)
 		t.Fatal("Expected to succeed")
@@ -89,19 +97,36 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
 	}
 }
 
-func TestSwarm(t *testing.T) {
+func TestLargeSwarm(t *testing.T) {
+	if testing.Short() {
+		t.SkipNow()
+	}
+	t.Parallel()
+	numInstances := 5
+	numBlocks := 2
+	PerformDistributionTest(t, numInstances, numBlocks)
+}
+
+func TestLargeFile(t *testing.T) {
+	if testing.Short() {
+		t.SkipNow()
+	}
+	t.Parallel()
+	numInstances := 10
+	numBlocks := 100
+	PerformDistributionTest(t, numInstances, numBlocks)
+}
+
+func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
 	if testing.Short() {
 		t.SkipNow()
 	}
 	net := tn.VirtualNetwork()
 	rs := mock.VirtualRoutingServer()
 	sg := NewSessionGenerator(net, rs)
-	bg := NewBlockGenerator()
+	bg := blocksutil.NewBlockGenerator()
 
-	t.Log("Create a ton of instances, and just a few blocks")
+	t.Log("Test a few nodes trying to get one file with a lot of blocks")
 
-	numInstances := 500
-	numBlocks := 2
-
 	instances := sg.Instances(numInstances)
 	blocks := bg.Blocks(numBlocks)
@@ -110,9 +135,9 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
 	first := instances[0]
 	for _, b := range blocks {
-		first.blockstore.Put(b)
-		first.exchange.HasBlock(context.Background(), *b)
-		rs.Announce(first.peer, b.Key())
+		first.Blockstore.Put(b)
+		first.Exchange.HasBlock(context.Background(), b)
+		rs.Announce(first.Peer, b.Key())
 	}
 
 	t.Log("Distribute!")
@@ -133,16 +158,16 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
 	for _, inst := range instances {
 		for _, b := range blocks {
-			if _, err := inst.blockstore.Get(b.Key()); err != nil {
+			if _, err := inst.Blockstore.Get(b.Key()); err != nil {
 				t.Fatal(err)
 			}
 		}
 	}
 }
 
-func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
-	if _, err := bitswap.blockstore.Get(b.Key()); err != nil {
-		_, err := bitswap.exchange.Block(context.Background(), b.Key())
+func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
+	if _, err := bitswap.Blockstore.Get(b.Key()); err != nil {
+		_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -152,149 +177,67 @@ func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
 // TODO simplify this test. get to the _essence_!
 func TestSendToWantingPeer(t *testing.T) {
+	if testing.Short() {
+		t.SkipNow()
+	}
+
 	net := tn.VirtualNetwork()
 	rs := mock.VirtualRoutingServer()
 	sg := NewSessionGenerator(net, rs)
-	bg := NewBlockGenerator()
+	bg := blocksutil.NewBlockGenerator()
 
 	me := sg.Next()
 	w := sg.Next()
 	o := sg.Next()
 
-	t.Logf("Session %v\n", me.peer)
-	t.Logf("Session %v\n", w.peer)
-	t.Logf("Session %v\n", o.peer)
+	t.Logf("Session %v\n", me.Peer)
+	t.Logf("Session %v\n", w.Peer)
+	t.Logf("Session %v\n", o.Peer)
 
 	alpha := bg.Next()
 
-	const timeout = 1 * time.Millisecond // FIXME don't depend on time
+	const timeout = 100 * time.Millisecond // FIXME don't depend on time
 
-	t.Logf("Peer %v attempts to get %v. NB: not available\n", w.peer, alpha.Key())
+	t.Logf("Peer %v attempts to get %v. NB: not available\n", w.Peer, alpha.Key())
 	ctx, _ := context.WithTimeout(context.Background(), timeout)
-	_, err := w.exchange.Block(ctx, alpha.Key())
+	_, err := w.Exchange.GetBlock(ctx, alpha.Key())
 	if err == nil {
 		t.Fatalf("Expected %v to NOT be available", alpha.Key())
 	}
 
 	beta := bg.Next()
-	t.Logf("Peer %v announes availability of %v\n", w.peer, beta.Key())
+	t.Logf("Peer %v announes availability of %v\n", w.Peer, beta.Key())
 	ctx, _ = context.WithTimeout(context.Background(), timeout)
-	if err := w.blockstore.Put(&beta); err != nil {
+	if err := w.Blockstore.Put(beta); err != nil {
 		t.Fatal(err)
 	}
-	w.exchange.HasBlock(ctx, beta)
+	w.Exchange.HasBlock(ctx, beta)
 
-	t.Logf("%v gets %v from %v and discovers it wants %v\n", me.peer, beta.Key(), w.peer, alpha.Key())
+	t.Logf("%v gets %v from %v and discovers it wants %v\n", me.Peer, beta.Key(), w.Peer, alpha.Key())
 	ctx, _ = context.WithTimeout(context.Background(), timeout)
-	if _, err := me.exchange.Block(ctx, beta.Key()); err != nil {
+	if _, err := me.Exchange.GetBlock(ctx, beta.Key()); err != nil {
 		t.Fatal(err)
 	}
 
-	t.Logf("%v announces availability of %v\n", o.peer, alpha.Key())
+	t.Logf("%v announces availability of %v\n", o.Peer, alpha.Key())
 	ctx, _ = context.WithTimeout(context.Background(), timeout)
-	if err := o.blockstore.Put(&alpha); err != nil {
+	if err := o.Blockstore.Put(alpha); err != nil {
 		t.Fatal(err)
 	}
-	o.exchange.HasBlock(ctx, alpha)
+	o.Exchange.HasBlock(ctx, alpha)
 
-	t.Logf("%v requests %v\n", me.peer, alpha.Key())
+	t.Logf("%v requests %v\n", me.Peer, alpha.Key())
 	ctx, _ = context.WithTimeout(context.Background(), timeout)
-	if _, err := me.exchange.Block(ctx, alpha.Key()); err != nil {
+	if _, err := me.Exchange.GetBlock(ctx, alpha.Key()); err != nil {
 		t.Fatal(err)
 	}
 
-	t.Logf("%v should now have %v\n", w.peer, alpha.Key())
-	block, err := w.blockstore.Get(alpha.Key())
+	t.Logf("%v should now have %v\n", w.Peer, alpha.Key())
+	block, err := w.Blockstore.Get(alpha.Key())
 	if err != nil {
-		t.Fatal("Should not have received an error")
+		t.Fatalf("Should not have received an error: %s", err)
 	}
 	if block.Key() != alpha.Key() {
 		t.Fatal("Expected to receive alpha from me")
 	}
 }
-
-func NewBlockGenerator() BlockGenerator {
-	return BlockGenerator{}
-}
-
-type BlockGenerator struct {
-	seq int
-}
-
-func (bg *BlockGenerator) Next() blocks.Block {
-	bg.seq++
-	return *blocks.NewBlock([]byte(string(bg.seq)))
-}
-
-func (bg *BlockGenerator) Blocks(n int) []*blocks.Block {
-	blocks := make([]*blocks.Block, 0)
-	for i := 0; i < n; i++ {
-		b := bg.Next()
-		blocks = append(blocks, &b)
-	}
-	return blocks
-}
-
-func NewSessionGenerator(
-	net tn.Network, rs mock.RoutingServer) SessionGenerator {
-	return SessionGenerator{
-		net: net,
-		rs:  rs,
-		seq: 0,
-	}
-}
-
-type SessionGenerator struct {
-	seq int
-	net tn.Network
-	rs  mock.RoutingServer
-}
-
-func (g *SessionGenerator) Next() instance {
-	g.seq++
-	return session(g.net, g.rs, []byte(string(g.seq)))
-}
-
-func (g *SessionGenerator) Instances(n int) []instance {
-	instances := make([]instance, 0)
-	for j := 0; j < n; j++ {
-		inst := g.Next()
-		instances = append(instances, inst)
-	}
-	return instances
-}
-
-type instance struct {
-	peer       peer.Peer
-	exchange   exchange.Interface
-	blockstore bstore.Blockstore
-}
-
-// session creates a test bitswap session.
-//
-// NB: It's easy make mistakes by providing the same peer ID to two different
-// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
-// just a much better idea.
-func session(net tn.Network, rs mock.RoutingServer, id peer.ID) instance {
-	p := peer.WithID(id)
-	adapter := net.Adapter(p)
-	htc := rs.Client(p)
-	blockstore := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
-	const alwaysSendToPeer = true
-	bs := &bitswap{
-		blockstore:    blockstore,
-		notifications: notifications.New(),
-		strategy:      strategy.New(alwaysSendToPeer),
-		routing:       htc,
-		sender:        adapter,
-		wantlist:      util.NewKeySet(),
-	}
-	adapter.SetDelegate(bs)
-	return instance{
-		peer:       p,
-		exchange:   bs,
-		blockstore: blockstore,
-	}
-}


@@ -19,7 +19,7 @@ type BitSwapMessage interface {
 	Wantlist() []u.Key
 
 	// Blocks returns a slice of unique blocks
-	Blocks() []blocks.Block
+	Blocks() []*blocks.Block
 
 	// AddWanted adds the key to the Wantlist.
 	//
@@ -32,7 +32,7 @@ type BitSwapMessage interface {
 	// implies Priority(A) > Priority(B)
 	AddWanted(u.Key)
 
-	AddBlock(blocks.Block)
+	AddBlock(*blocks.Block)
 	Exportable
 }
 
@@ -42,14 +42,14 @@ type Exportable interface {
 }
 
 type impl struct {
 	existsInWantlist map[u.Key]struct{}      // map to detect duplicates
 	wantlist         []u.Key                 // slice to preserve ordering
-	blocks           map[u.Key]blocks.Block  // map to detect duplicates
+	blocks           map[u.Key]*blocks.Block // map to detect duplicates
 }
 
 func New() BitSwapMessage {
 	return &impl{
-		blocks:           make(map[u.Key]blocks.Block),
+		blocks:           make(map[u.Key]*blocks.Block),
 		existsInWantlist: make(map[u.Key]struct{}),
 		wantlist:         make([]u.Key, 0),
 	}
@@ -62,7 +62,7 @@ func newMessageFromProto(pbm pb.Message) BitSwapMessage {
 	}
 	for _, d := range pbm.GetBlocks() {
 		b := blocks.NewBlock(d)
-		m.AddBlock(*b)
+		m.AddBlock(b)
 	}
 	return m
 }
@@ -71,8 +71,8 @@ func (m *impl) Wantlist() []u.Key {
 	return m.wantlist
 }
 
-func (m *impl) Blocks() []blocks.Block {
-	bs := make([]blocks.Block, 0)
+func (m *impl) Blocks() []*blocks.Block {
+	bs := make([]*blocks.Block, 0)
 	for _, block := range m.blocks {
 		bs = append(bs, block)
 	}
@@ -88,7 +88,7 @@ func (m *impl) AddWanted(k u.Key) {
 	m.wantlist = append(m.wantlist, k)
 }
 
-func (m *impl) AddBlock(b blocks.Block) {
+func (m *impl) AddBlock(b *blocks.Block) {
 	m.blocks[b.Key()] = b
 }


@@ -42,7 +42,7 @@ func TestAppendBlock(t *testing.T) {
 	m := New()
 	for _, str := range strs {
 		block := blocks.NewBlock([]byte(str))
-		m.AddBlock(*block)
+		m.AddBlock(block)
 	}
 
 	// assert strings are in proto message
@@ -133,10 +133,10 @@ func TestToNetFromNetPreservesWantList(t *testing.T) {
 func TestToAndFromNetMessage(t *testing.T) {
 
 	original := New()
-	original.AddBlock(*blocks.NewBlock([]byte("W")))
-	original.AddBlock(*blocks.NewBlock([]byte("E")))
-	original.AddBlock(*blocks.NewBlock([]byte("F")))
-	original.AddBlock(*blocks.NewBlock([]byte("M")))
+	original.AddBlock(blocks.NewBlock([]byte("W")))
+	original.AddBlock(blocks.NewBlock([]byte("E")))
+	original.AddBlock(blocks.NewBlock([]byte("F")))
+	original.AddBlock(blocks.NewBlock([]byte("M")))
 
 	p := peer.WithIDString("X")
 	netmsg, err := original.ToNet(p)
@@ -180,8 +180,8 @@ func TestDuplicates(t *testing.T) {
 		t.Fatal("Duplicate in BitSwapMessage")
 	}
 
-	msg.AddBlock(*b)
-	msg.AddBlock(*b)
+	msg.AddBlock(b)
+	msg.AddBlock(b)
 	if len(msg.Blocks()) != 1 {
 		t.Fatal("Duplicate in BitSwapMessage")
 	}


@@ -1,8 +1,6 @@
 package network
 
 import (
-	"errors"
-
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
 
 	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
@@ -50,22 +48,8 @@ func (bsnet *impl) HandleMessage(
 		return nil
 	}
 
-	p, bsmsg := bsnet.receiver.ReceiveMessage(ctx, incoming.Peer(), received)
-
-	// TODO(brian): put this in a helper function
-	if bsmsg == nil || p == nil {
-		bsnet.receiver.ReceiveError(errors.New("ReceiveMessage returned nil peer or message"))
-		return nil
-	}
-
-	outgoing, err := bsmsg.ToNet(p)
-	if err != nil {
-		go bsnet.receiver.ReceiveError(err)
-		return nil
-	}
-
-	log.Debugf("Message size: %d", len(outgoing.Data()))
-	return outgoing
+	bsnet.receiver.ReceiveMessage(ctx, incoming.Peer(), received)
+	return nil
 }
 
 func (bsnet *impl) DialPeer(ctx context.Context, p peer.Peer) error {


@ -2,20 +2,21 @@ package notifications
import ( import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
pubsub "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/tuxychandru/pubsub" pubsub "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/maybebtc/pubsub"
blocks "github.com/jbenet/go-ipfs/blocks" blocks "github.com/jbenet/go-ipfs/blocks"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
) )
const bufferSize = 16
type PubSub interface { type PubSub interface {
Publish(block blocks.Block) Publish(block *blocks.Block)
Subscribe(ctx context.Context, k u.Key) <-chan blocks.Block Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block
Shutdown() Shutdown()
} }
func New() PubSub { func New() PubSub {
const bufferSize = 16
return &impl{*pubsub.New(bufferSize)} return &impl{*pubsub.New(bufferSize)}
} }
@ -23,33 +24,58 @@ type impl struct {
wrapped pubsub.PubSub wrapped pubsub.PubSub
} }
func (ps *impl) Publish(block blocks.Block) { func (ps *impl) Publish(block *blocks.Block) {
topic := string(block.Key()) topic := string(block.Key())
ps.wrapped.Pub(block, topic) ps.wrapped.Pub(block, topic)
} }
// Subscribe returns a one-time use |blockChannel|. |blockChannel| returns nil
// if the |ctx| times out or is cancelled. Then channel is closed after the
// block given by |k| is sent.
func (ps *impl) Subscribe(ctx context.Context, k u.Key) <-chan blocks.Block {
topic := string(k)
subChan := ps.wrapped.SubOnce(topic)
blockChannel := make(chan blocks.Block, 1) // buffered so the sender doesn't wait on receiver
go func() {
defer close(blockChannel)
select {
case val := <-subChan:
block, ok := val.(blocks.Block)
if ok {
blockChannel <- block
}
case <-ctx.Done():
ps.wrapped.Unsub(subChan, topic)
}
}()
return blockChannel
}
func (ps *impl) Shutdown() { func (ps *impl) Shutdown() {
ps.wrapped.Shutdown() ps.wrapped.Shutdown()
} }
// Subscribe returns a channel of blocks for the given |keys|. The returned
// channel is closed if the |ctx| times out or is cancelled, or after
// len(keys) blocks have been sent.
func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block {
blocksCh := make(chan *blocks.Block, len(keys))
valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
if len(keys) == 0 {
close(blocksCh)
return blocksCh
}
ps.wrapped.AddSubOnceEach(valuesCh, toStrings(keys)...)
go func() {
defer close(blocksCh)
defer ps.wrapped.Unsub(valuesCh) // not needed for correctness: the len(keys) buffer already keeps publishers from blocking; this just frees the subscription early
for {
select {
case <-ctx.Done():
return
case val, ok := <-valuesCh:
if !ok {
return
}
block, ok := val.(*blocks.Block)
if !ok {
return
}
select {
case <-ctx.Done():
return
case blocksCh <- block: // continue
}
}
}
}()
return blocksCh
}
func toStrings(keys []u.Key) []string {
strs := make([]string, 0, len(keys))
for _, key := range keys {
strs = append(strs, string(key))
}
return strs
}
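
To make the new contract concrete, here is a minimal usage sketch of the API added above. The main wrapper and block payloads are illustrative only; import paths follow the rest of this tree. Each key is delivered at most once, and the returned channel closes after all keys are satisfied or the context ends.

package main

import (
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	blocks "github.com/jbenet/go-ipfs/blocks"
	notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
)

func main() {
	n := notifications.New()
	defer n.Shutdown()

	b1 := blocks.NewBlock([]byte("one"))
	b2 := blocks.NewBlock([]byte("two"))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// One subscription covering both keys.
	ch := n.Subscribe(ctx, b1.Key(), b2.Key())

	n.Publish(b1)
	n.Publish(b1) // the duplicate is not delivered again (SubOnceEach semantics)
	n.Publish(b2)

	for blk := range ch { // channel closes after both keys are seen
		_ = blk
	}
}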

View File

@ -7,8 +7,35 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
blocks "github.com/jbenet/go-ipfs/blocks" blocks "github.com/jbenet/go-ipfs/blocks"
blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil"
"github.com/jbenet/go-ipfs/util"
) )
func TestDuplicates(t *testing.T) {
b1 := blocks.NewBlock([]byte("1"))
b2 := blocks.NewBlock([]byte("2"))
n := New()
defer n.Shutdown()
ch := n.Subscribe(context.Background(), b1.Key(), b2.Key())
n.Publish(b1)
blockRecvd, ok := <-ch
if !ok {
t.Fail()
}
assertBlocksEqual(t, b1, blockRecvd)
n.Publish(b1) // ignored duplicate
n.Publish(b2)
blockRecvd, ok = <-ch
if !ok {
t.Fail()
}
assertBlocksEqual(t, b2, blockRecvd)
}
func TestPublishSubscribe(t *testing.T) { func TestPublishSubscribe(t *testing.T) {
blockSent := blocks.NewBlock([]byte("Greetings from The Interval")) blockSent := blocks.NewBlock([]byte("Greetings from The Interval"))
@ -16,16 +43,48 @@ func TestPublishSubscribe(t *testing.T) {
defer n.Shutdown() defer n.Shutdown()
ch := n.Subscribe(context.Background(), blockSent.Key()) ch := n.Subscribe(context.Background(), blockSent.Key())
n.Publish(*blockSent) n.Publish(blockSent)
blockRecvd, ok := <-ch blockRecvd, ok := <-ch
if !ok { if !ok {
t.Fail() t.Fail()
} }
assertBlocksEqual(t, blockRecvd, *blockSent) assertBlocksEqual(t, blockRecvd, blockSent)
} }
func TestSubscribeMany(t *testing.T) {
e1 := blocks.NewBlock([]byte("1"))
e2 := blocks.NewBlock([]byte("2"))
n := New()
defer n.Shutdown()
ch := n.Subscribe(context.Background(), e1.Key(), e2.Key())
n.Publish(e1)
r1, ok := <-ch
if !ok {
t.Fatal("didn't receive first expected block")
}
assertBlocksEqual(t, e1, r1)
n.Publish(e2)
r2, ok := <-ch
if !ok {
t.Fatal("didn't receive second expected block")
}
assertBlocksEqual(t, e2, r2)
}
func TestSubscribeIsANoopWhenCalledWithNoKeys(t *testing.T) {
n := New()
defer n.Shutdown()
ch := n.Subscribe(context.TODO()) // no keys provided
if _, ok := <-ch; ok {
t.Fatal("should be closed if no keys provided")
}
}
func TestCarryOnWhenDeadlineExpires(t *testing.T) { func TestCarryOnWhenDeadlineExpires(t *testing.T) {
impossibleDeadline := time.Nanosecond impossibleDeadline := time.Nanosecond
@ -39,18 +98,46 @@ func TestCarryOnWhenDeadlineExpires(t *testing.T) {
assertBlockChannelNil(t, blockChannel) assertBlockChannelNil(t, blockChannel)
} }
func assertBlockChannelNil(t *testing.T, blockChannel <-chan blocks.Block) { func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) {
g := blocksutil.NewBlockGenerator()
ctx, cancel := context.WithCancel(context.Background())
n := New()
defer n.Shutdown()
t.Log("generate a large number of blocks. exceed default buffer")
bs := g.Blocks(1000)
ks := func() []util.Key {
var keys []util.Key
for _, b := range bs {
keys = append(keys, b.Key())
}
return keys
}()
_ = n.Subscribe(ctx, ks...) // ignore received channel
t.Log("cancel context before any blocks published")
cancel()
for _, b := range bs {
n.Publish(b)
}
t.Log("publishing the large number of blocks to the ignored channel must not deadlock")
}
func assertBlockChannelNil(t *testing.T, blockChannel <-chan *blocks.Block) {
_, ok := <-blockChannel _, ok := <-blockChannel
if ok { if ok {
t.Fail() t.Fail()
} }
} }
func assertBlocksEqual(t *testing.T, a, b blocks.Block) { func assertBlocksEqual(t *testing.T, a, b *blocks.Block) {
if !bytes.Equal(a.Data, b.Data) { if !bytes.Equal(a.Data, b.Data) {
t.Fail() t.Fatal("blocks aren't equal")
} }
if a.Key() != b.Key() { if a.Key() != b.Key() {
t.Fail() t.Fatal("block keys aren't equal")
} }
} }

View File

@ -1,6 +1,8 @@
package strategy package strategy
import ( import (
"time"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
@ -29,4 +31,8 @@ type Strategy interface {
NumBytesSentTo(peer.Peer) uint64 NumBytesSentTo(peer.Peer) uint64
NumBytesReceivedFrom(peer.Peer) uint64 NumBytesReceivedFrom(peer.Peer) uint64
// Values determining bitswap behavioural patterns
GetBatchSize() int
GetRebroadcastDelay() time.Duration
} }
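
For context, a hedged sketch of how a caller might consume these two new knobs; rebroadcastWorker and its callback are hypothetical names for illustration, not part of this change:

package bitswap

import (
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
)

// rebroadcastWorker is a hypothetical consumer of the two knobs: every
// GetRebroadcastDelay() it re-announces up to GetBatchSize() wants.
func rebroadcastWorker(ctx context.Context, s strategy.Strategy, rebroadcast func(batchSize int)) {
	ticker := time.NewTicker(s.GetRebroadcastDelay())
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			rebroadcast(s.GetBatchSize())
		}
	}
}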

View File

@ -61,6 +61,7 @@ func (l *ledger) ReceivedBytes(n int) {
// TODO: this needs to be different. We need timeouts. // TODO: this needs to be different. We need timeouts.
func (l *ledger) Wants(k u.Key) { func (l *ledger) Wants(k u.Key) {
log.Debugf("peer %s wants %s", l.Partner, k)
l.wantList[k] = struct{}{} l.wantList[k] = struct{}{}
} }

View File

@ -3,12 +3,15 @@ package strategy
import ( import (
"errors" "errors"
"sync" "sync"
"time"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
) )
var log = u.Logger("strategy")
// TODO niceness should be on a per-peer basis. Use-case: Certain peers are // TODO niceness should be on a per-peer basis. Use-case: Certain peers are
// "trusted" and/or controlled by a single human user. The user may want for // "trusted" and/or controlled by a single human user. The user may want for
// these peers to exchange data freely // these peers to exchange data freely
@ -72,6 +75,8 @@ func (s *strategist) Seed(int64) {
// TODO // TODO
} }
// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error { func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
@ -91,7 +96,7 @@ func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error
// FIXME extract blocks.NumBytes(block) or block.NumBytes() method // FIXME extract blocks.NumBytes(block) or block.NumBytes() method
l.ReceivedBytes(len(block.Data)) l.ReceivedBytes(len(block.Data))
} }
return errors.New("TODO") return nil
} }
// TODO add contents of m.WantList() to my local wantlist? NB: could introduce // TODO add contents of m.WantList() to my local wantlist? NB: could introduce
@ -137,3 +142,11 @@ func (s *strategist) ledger(p peer.Peer) *ledger {
} }
return l return l
} }
func (s *strategist) GetBatchSize() int {
return 10
}
func (s *strategist) GetRebroadcastDelay() time.Duration {
return time.Second * 5
}

View File

@ -30,7 +30,7 @@ func TestConsistentAccounting(t *testing.T) {
m := message.New() m := message.New()
content := []string{"this", "is", "message", "i"} content := []string{"this", "is", "message", "i"}
m.AddBlock(*blocks.NewBlock([]byte(strings.Join(content, " ")))) m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " "))))
sender.MessageSent(receiver.Peer, m) sender.MessageSent(receiver.Peer, m)
receiver.MessageReceived(sender.Peer, m) receiver.MessageReceived(sender.Peer, m)

View File

@ -33,7 +33,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
// TODO test contents of incoming message // TODO test contents of incoming message
m := bsmsg.New() m := bsmsg.New()
m.AddBlock(*blocks.NewBlock([]byte(expectedStr))) m.AddBlock(blocks.NewBlock([]byte(expectedStr)))
return from, m return from, m
})) }))
@ -41,7 +41,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
t.Log("Build a message and send a synchronous request to recipient") t.Log("Build a message and send a synchronous request to recipient")
message := bsmsg.New() message := bsmsg.New()
message.AddBlock(*blocks.NewBlock([]byte("data"))) message.AddBlock(blocks.NewBlock([]byte("data")))
response, err := initiator.SendRequest( response, err := initiator.SendRequest(
context.Background(), peer.WithID(idOfRecipient), message) context.Background(), peer.WithID(idOfRecipient), message)
if err != nil { if err != nil {
@ -77,7 +77,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
peer.Peer, bsmsg.BitSwapMessage) { peer.Peer, bsmsg.BitSwapMessage) {
msgToWaiter := bsmsg.New() msgToWaiter := bsmsg.New()
msgToWaiter.AddBlock(*blocks.NewBlock([]byte(expectedStr))) msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr)))
return fromWaiter, msgToWaiter return fromWaiter, msgToWaiter
})) }))
@ -105,7 +105,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
})) }))
messageSentAsync := bsmsg.New() messageSentAsync := bsmsg.New()
messageSentAsync.AddBlock(*blocks.NewBlock([]byte("data"))) messageSentAsync.AddBlock(blocks.NewBlock([]byte("data")))
errSending := waiter.SendMessage( errSending := waiter.SendMessage(
context.Background(), peer.WithID(idOfResponder), messageSentAsync) context.Background(), peer.WithID(idOfResponder), messageSentAsync)
if errSending != nil { if errSending != nil {

View File

@ -0,0 +1,71 @@
package bitswap
import (
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
"github.com/jbenet/go-ipfs/blocks/blockstore"
"github.com/jbenet/go-ipfs/exchange"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
"github.com/jbenet/go-ipfs/peer"
"github.com/jbenet/go-ipfs/routing/mock"
)
func NewSessionGenerator(
net tn.Network, rs mock.RoutingServer) SessionGenerator {
return SessionGenerator{
net: net,
rs: rs,
seq: 0,
}
}
type SessionGenerator struct {
seq int
net tn.Network
rs mock.RoutingServer
}
func (g *SessionGenerator) Next() Instance {
g.seq++
return session(g.net, g.rs, []byte(string(g.seq)))
}
func (g *SessionGenerator) Instances(n int) []Instance {
instances := make([]Instance, 0)
for j := 0; j < n; j++ {
inst := g.Next()
instances = append(instances, inst)
}
return instances
}
type Instance struct {
Peer peer.Peer
Exchange exchange.Interface
Blockstore blockstore.Blockstore
}
// session creates a test bitswap session.
//
// NB: It's easy to make mistakes by providing the same peer ID to two
// different sessions. To safeguard, use the SessionGenerator, which hands
// out a fresh ID for each session.
func session(net tn.Network, rs mock.RoutingServer, id peer.ID) Instance {
p := peer.WithID(id)
adapter := net.Adapter(p)
htc := rs.Client(p)
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
const alwaysSendToPeer = true
ctx := context.TODO()
bs := New(ctx, p, adapter, htc, bstore, alwaysSendToPeer)
return Instance{
Peer: p,
Exchange: bs,
Blockstore: bstore,
}
}

View File

@ -2,6 +2,8 @@
package exchange package exchange
import ( import (
"io"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
blocks "github.com/jbenet/go-ipfs/blocks" blocks "github.com/jbenet/go-ipfs/blocks"
@ -11,11 +13,14 @@ import (
// Any type that implements exchange.Interface may be used as an IPFS block // Any type that implements exchange.Interface may be used as an IPFS block
// exchange protocol. // exchange protocol.
type Interface interface { type Interface interface {
// GetBlock returns the block associated with a given key.
GetBlock(context.Context, u.Key) (*blocks.Block, error)
// Block returns the block associated with a given key. GetBlocks(context.Context, []u.Key) (<-chan *blocks.Block, error)
Block(context.Context, u.Key) (*blocks.Block, error)
// TODO Should callers be concerned with whether the block was made // TODO Should callers be concerned with whether the block was made
// available on the network? // available on the network?
HasBlock(context.Context, blocks.Block) error HasBlock(context.Context, *blocks.Block) error
io.Closer
} }
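
A short, hedged sketch of consuming the reshaped interface; both helper names and the five-second deadline are assumptions for illustration, and the package name is arbitrary:

package exutil

import (
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	blocks "github.com/jbenet/go-ipfs/blocks"
	exchange "github.com/jbenet/go-ipfs/exchange"
	u "github.com/jbenet/go-ipfs/util"
)

// getBlockWithTimeout wraps the one-shot half of the interface.
func getBlockWithTimeout(ex exchange.Interface, k u.Key) (*blocks.Block, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	return ex.GetBlock(ctx, k)
}

// drainBlocks shows the streaming half: GetBlocks delivers blocks as they
// arrive until the channel closes or the context ends.
func drainBlocks(ctx context.Context, ex exchange.Interface, ks []u.Key) ([]*blocks.Block, error) {
	ch, err := ex.GetBlocks(ctx, ks)
	if err != nil {
		return nil, err
	}
	var out []*blocks.Block
	for b := range ch {
		out = append(out, b)
	}
	return out, nil
}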

View File

@ -14,23 +14,31 @@ import (
var OfflineMode = errors.New("Block unavailable. Operating in offline mode") var OfflineMode = errors.New("Block unavailable. Operating in offline mode")
func NewOfflineExchange() exchange.Interface { func Exchange() exchange.Interface {
return &offlineExchange{} return &offlineExchange{}
} }
// offlineExchange implements the Exchange interface but doesn't return blocks. // offlineExchange implements the Exchange interface but doesn't return blocks.
// For use in offline mode. // For use in offline mode.
type offlineExchange struct { type offlineExchange struct{}
}
// Block returns nil to signal that a block could not be retrieved for the // GetBlock returns nil to signal that a block could not be retrieved for the
// given key. // given key.
// NB: This function may return before the timeout expires. // NB: This function may return before the timeout expires.
func (_ *offlineExchange) Block(context.Context, u.Key) (*blocks.Block, error) { func (_ *offlineExchange) GetBlock(context.Context, u.Key) (*blocks.Block, error) {
return nil, OfflineMode return nil, OfflineMode
} }
// HasBlock always returns nil. // HasBlock always returns nil.
func (_ *offlineExchange) HasBlock(context.Context, blocks.Block) error { func (_ *offlineExchange) HasBlock(context.Context, *blocks.Block) error {
return nil return nil
} }
// Close always returns nil.
func (_ *offlineExchange) Close() error {
return nil
}
func (_ *offlineExchange) GetBlocks(context.Context, []u.Key) (<-chan *blocks.Block, error) {
return nil, OfflineMode
}

View File

@ -10,8 +10,8 @@ import (
) )
func TestBlockReturnsErr(t *testing.T) { func TestBlockReturnsErr(t *testing.T) {
off := NewOfflineExchange() off := Exchange()
_, err := off.Block(context.Background(), u.Key("foo")) _, err := off.GetBlock(context.Background(), u.Key("foo"))
if err != nil { if err != nil {
return // as desired return // as desired
} }
@ -19,9 +19,9 @@ func TestBlockReturnsErr(t *testing.T) {
} }
func TestHasBlockReturnsNil(t *testing.T) { func TestHasBlockReturnsNil(t *testing.T) {
off := NewOfflineExchange() off := Exchange()
block := blocks.NewBlock([]byte("data")) block := blocks.NewBlock([]byte("data"))
err := off.HasBlock(context.Background(), *block) err := off.HasBlock(context.Background(), block)
if err != nil { if err != nil {
t.Fatal("") t.Fatal("")
} }

View File

@ -69,6 +69,7 @@ func testFileConsistency(t *testing.T, bs chunk.BlockSplitter, nbytes int) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
r, err := uio.NewDagReader(nd, nil) r, err := uio.NewDagReader(nd, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -22,6 +22,22 @@ var ErrNotFound = fmt.Errorf("merkledag: not found")
// so have to convert Multihash bytes to string (u.Key) // so have to convert Multihash bytes to string (u.Key)
type NodeMap map[u.Key]*Node type NodeMap map[u.Key]*Node
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Add(*Node) (u.Key, error)
AddRecursive(*Node) error
Get(u.Key) (*Node, error)
Remove(*Node) error
// GetDAG returns, in order, all the single-level child
// nodes of the passed-in node.
GetDAG(context.Context, *Node) <-chan *Node
}
func NewDAGService(bs *bserv.BlockService) DAGService {
return &dagService{bs}
}
// Node represents a node in the IPFS Merkle DAG. // Node represents a node in the IPFS Merkle DAG.
// nodes have opaque data and a set of navigable links. // nodes have opaque data and a set of navigable links.
type Node struct { type Node struct {
@ -156,18 +172,6 @@ func (n *Node) Key() (u.Key, error) {
return u.Key(h), err return u.Key(h), err
} }
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Add(*Node) (u.Key, error)
AddRecursive(*Node) error
Get(u.Key) (*Node, error)
Remove(*Node) error
}
func NewDAGService(bs *bserv.BlockService) DAGService {
return &dagService{bs}
}
// dagService is an IPFS Merkle DAG service. // dagService is an IPFS Merkle DAG service.
// - the root is virtual (like a forest) // - the root is virtual (like a forest)
// - stores nodes' data in a BlockService // - stores nodes' data in a BlockService
@ -252,6 +256,7 @@ func (n *dagService) Remove(nd *Node) error {
// FetchGraph asynchronously fetches all nodes that are children of the given // FetchGraph asynchronously fetches all nodes that are children of the given
// node, and returns a channel that may be waited upon for the fetch to complete // node, and returns a channel that may be waited upon for the fetch to complete
func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} { func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} {
log.Warning("Untested.")
var wg sync.WaitGroup var wg sync.WaitGroup
done := make(chan struct{}) done := make(chan struct{})
@ -284,3 +289,64 @@ func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{}
return done return done
} }
// FindLink searches this node's links for one matching the given key and
// returns the index of that link.
func FindLink(n *Node, k u.Key, found []*Node) (int, error) {
for i, lnk := range n.Links {
if u.Key(lnk.Hash) == k && found[i] == nil {
return i, nil
}
}
return -1, u.ErrNotFound
}
// GetDAG fetches all of the child nodes linked from the given Node.
// It returns a channel on which the caller receives each child of
// 'root', in link order.
func (ds *dagService) GetDAG(ctx context.Context, root *Node) <-chan *Node {
sig := make(chan *Node)
go func() {
var keys []u.Key
nodes := make([]*Node, len(root.Links))
for _, lnk := range root.Links {
keys = append(keys, u.Key(lnk.Hash))
}
blkchan := ds.Blocks.GetBlocks(ctx, keys)
next := 0
for blk := range blkchan {
i, err := FindLink(root, blk.Key(), nodes)
if err != nil {
// NB: can only occur as a result of programmer error
panic("Received block that wasnt in this nodes links!")
}
nd, err := Decoded(blk.Data)
if err != nil {
// NB: can occur in normal situations, with improperly formatted
// input data
log.Error("Got back bad block!")
break
}
nodes[i] = nd
if next == i {
sig <- nd
next++
for ; next < len(nodes) && nodes[next] != nil; next++ {
sig <- nodes[next]
}
}
}
if next < len(nodes) {
// TODO: bubble errors back up.
log.Errorf("Did not receive correct number of nodes!")
}
close(sig)
}()
return sig
}
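
The subtle part above is ordering: GetBlocks may deliver blocks in any order, while GetDAG must emit children in link order, so early arrivals are parked in the nodes slice until their turn. Below is a standalone sketch of that buffering pattern, using simplified stand-in types rather than the merkledag ones:

package main

import "fmt"

type arrival struct {
	index int
	value string
}

// inOrder re-emits arrivals in index order, buffering anything that
// shows up early; this is the same technique GetDAG uses above.
func inOrder(in <-chan arrival, n int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		buf := make([]*string, n)
		next := 0
		for a := range in {
			v := a.value
			buf[a.index] = &v
			for next < n && buf[next] != nil {
				out <- *buf[next]
				next++
			}
		}
	}()
	return out
}

func main() {
	in := make(chan arrival, 3)
	in <- arrival{2, "c"}
	in <- arrival{0, "a"}
	in <- arrival{1, "b"}
	close(in)
	for v := range inOrder(in, 3) {
		fmt.Println(v) // prints a, b, c despite out-of-order arrival
	}
}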

View File

@ -1,9 +1,20 @@
package merkledag package merkledag_test
import ( import (
"bytes"
"fmt" "fmt"
"io"
"io/ioutil"
"testing" "testing"
bserv "github.com/jbenet/go-ipfs/blockservice"
bs "github.com/jbenet/go-ipfs/exchange/bitswap"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
imp "github.com/jbenet/go-ipfs/importer"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
. "github.com/jbenet/go-ipfs/merkledag"
"github.com/jbenet/go-ipfs/routing/mock"
uio "github.com/jbenet/go-ipfs/unixfs/io"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
) )
@ -56,3 +67,84 @@ func TestNode(t *testing.T) {
printn("boop", n2) printn("boop", n2)
printn("beep boop", n3) printn("beep boop", n3)
} }
func makeTestDag(t *testing.T) *Node {
read := io.LimitReader(u.NewTimeSeededRand(), 1024*32)
spl := &chunk.SizeSplitter{512}
root, err := imp.NewDagFromReaderWithSplitter(read, spl)
if err != nil {
t.Fatal(err)
}
return root
}
func TestBatchFetch(t *testing.T) {
net := tn.VirtualNetwork()
rs := mock.VirtualRoutingServer()
sg := bs.NewSessionGenerator(net, rs)
instances := sg.Instances(5)
var servs []*bserv.BlockService
var dagservs []DAGService
for _, i := range instances {
bsi, err := bserv.New(i.Blockstore, i.Exchange)
if err != nil {
t.Fatal(err)
}
servs = append(servs, bsi)
dagservs = append(dagservs, NewDAGService(bsi))
}
t.Log("finished setup.")
root := makeTestDag(t)
read, err := uio.NewDagReader(root, nil)
if err != nil {
t.Fatal(err)
}
expected, err := ioutil.ReadAll(read)
if err != nil {
t.Fatal(err)
}
err = dagservs[0].AddRecursive(root)
if err != nil {
t.Fatal(err)
}
t.Log("Added file to first node.")
k, err := root.Key()
if err != nil {
t.Fatal(err)
}
done := make(chan struct{})
for i := 1; i < len(dagservs); i++ {
go func(i int) {
first, err := dagservs[i].Get(k)
if err != nil {
t.Fatal(err)
}
fmt.Println("Got first node back.")
read, err := uio.NewDagReader(first, dagservs[i])
if err != nil {
t.Fatal(err)
}
datagot, err := ioutil.ReadAll(read)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(datagot, expected) {
t.Fatal("Got bad data back!")
}
done <- struct{}{}
}(i)
}
for i := 1; i < len(dagservs); i++ {
<-done
}
}

View File

@ -42,6 +42,10 @@ type Network interface {
// the network since it was instantiated // the network since it was instantiated
GetBandwidthTotals() (uint64, uint64) GetBandwidthTotals() (uint64, uint64)
// GetMessageCounts returns the total number of messages passed through
// the network since it was instantiated
GetMessageCounts() (uint64, uint64)
// SendMessage sends given Message out // SendMessage sends given Message out
SendMessage(msg.NetMessage) error SendMessage(msg.NetMessage) error

View File

@ -45,9 +45,11 @@ type Muxer struct {
bwiLock sync.Mutex bwiLock sync.Mutex
bwIn uint64 bwIn uint64
msgIn uint64
bwoLock sync.Mutex bwoLock sync.Mutex
bwOut uint64 bwOut uint64
msgOut uint64
*msg.Pipe *msg.Pipe
ctxc.ContextCloser ctxc.ContextCloser
@ -76,6 +78,18 @@ func (m *Muxer) GetPipe() *msg.Pipe {
return m.Pipe return m.Pipe
} }
// GetMessageCounts returns the in/out message counts measured over this muxer.
func (m *Muxer) GetMessageCounts() (in uint64, out uint64) {
m.bwiLock.Lock()
in = m.msgIn
m.bwiLock.Unlock()
m.bwoLock.Lock()
out = m.msgOut
m.bwoLock.Unlock()
return
}
// GetBandwidthTotals return the in/out bandwidth measured over this muxer. // GetBandwidthTotals return the in/out bandwidth measured over this muxer.
func (m *Muxer) GetBandwidthTotals() (in uint64, out uint64) { func (m *Muxer) GetBandwidthTotals() (in uint64, out uint64) {
m.bwiLock.Lock() m.bwiLock.Lock()
@ -125,6 +139,7 @@ func (m *Muxer) handleIncomingMessage(m1 msg.NetMessage) {
m.bwiLock.Lock() m.bwiLock.Lock()
// TODO: compensate for overhead // TODO: compensate for overhead
m.bwIn += uint64(len(m1.Data())) m.bwIn += uint64(len(m1.Data()))
m.msgIn++
m.bwiLock.Unlock() m.bwiLock.Unlock()
data, pid, err := unwrapData(m1.Data()) data, pid, err := unwrapData(m1.Data())
@ -182,6 +197,7 @@ func (m *Muxer) handleOutgoingMessage(pid pb.ProtocolID, m1 msg.NetMessage) {
// TODO: compensate for overhead // TODO: compensate for overhead
// TODO(jbenet): switch this to a goroutine to prevent sync waiting. // TODO(jbenet): switch this to a goroutine to prevent sync waiting.
m.bwOut += uint64(len(data)) m.bwOut += uint64(len(data))
m.msgOut++
m.bwoLock.Unlock() m.bwoLock.Unlock()
m2 := msg.New(m1.Peer(), data) m2 := msg.New(m1.Peer(), data)

View File

@ -110,6 +110,11 @@ func (n *IpfsNetwork) GetBandwidthTotals() (in uint64, out uint64) {
return n.muxer.GetBandwidthTotals() return n.muxer.GetBandwidthTotals()
} }
// GetMessageCounts returns the total number of messages transferred
func (n *IpfsNetwork) GetMessageCounts() (in uint64, out uint64) {
return n.muxer.GetMessageCounts()
}
// ListenAddresses returns a list of addresses at which this network listens. // ListenAddresses returns a list of addresses at which this network listens.
func (n *IpfsNetwork) ListenAddresses() []ma.Multiaddr { func (n *IpfsNetwork) ListenAddresses() []ma.Multiaddr {
return n.swarm.ListenAddresses() return n.swarm.ListenAddresses()

View File

@ -182,7 +182,7 @@ func (s *Swarm) fanOut() {
return return
} }
if len(msg.Data()) >= conn.MaxMessageSize { if len(msg.Data()) >= conn.MaxMessageSize {
log.Critical("Attempted to send message bigger than max size.") log.Criticalf("Attempted to send message bigger than max size. (%d)", len(msg.Data()))
} }
s.connsLock.RLock() s.connsLock.RLock()

View File

@ -4,7 +4,10 @@ import (
"testing" "testing"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
"github.com/jbenet/go-ipfs/blocks/blockstore"
bs "github.com/jbenet/go-ipfs/blockservice" bs "github.com/jbenet/go-ipfs/blockservice"
"github.com/jbenet/go-ipfs/exchange/offline"
mdag "github.com/jbenet/go-ipfs/merkledag" mdag "github.com/jbenet/go-ipfs/merkledag"
"github.com/jbenet/go-ipfs/util" "github.com/jbenet/go-ipfs/util"
) )
@ -19,13 +22,15 @@ func randNode() (*mdag.Node, util.Key) {
func TestPinnerBasic(t *testing.T) { func TestPinnerBasic(t *testing.T) {
dstore := ds.NewMapDatastore() dstore := ds.NewMapDatastore()
bserv, err := bs.NewBlockService(dstore, nil) bstore := blockstore.NewBlockstore(dssync.MutexWrap(dstore))
bserv, err := bs.New(bstore, offline.Exchange())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
dserv := mdag.NewDAGService(bserv) dserv := mdag.NewDAGService(bserv)
// TODO does pinner need to share datastore with blockservice?
p := NewPinner(dstore, dserv) p := NewPinner(dstore, dserv)
a, ak := randNode() a, ak := randNode()

View File

@ -126,6 +126,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
} }
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan peer.Peer { func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan peer.Peer {
log.Event(ctx, "findProviders", &key)
peerOut := make(chan peer.Peer, count) peerOut := make(chan peer.Peer, count)
go func() { go func() {
ps := newPeerSet() ps := newPeerSet()

View File

@ -59,7 +59,7 @@ func (mr *MockRouter) FindProviders(ctx context.Context, key u.Key) ([]peer.Peer
} }
func (mr *MockRouter) FindPeer(ctx context.Context, pid peer.ID) (peer.Peer, error) { func (mr *MockRouter) FindPeer(ctx context.Context, pid peer.ID) (peer.Peer, error) {
log.Debug("FindPeer: %s", pid) log.Debugf("FindPeer: %s", pid)
return nil, nil return nil, nil
} }

View File

@ -6,7 +6,10 @@ import (
"io/ioutil" "io/ioutil"
"testing" "testing"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
"github.com/jbenet/go-ipfs/blocks/blockstore"
bs "github.com/jbenet/go-ipfs/blockservice" bs "github.com/jbenet/go-ipfs/blockservice"
"github.com/jbenet/go-ipfs/exchange/offline"
imp "github.com/jbenet/go-ipfs/importer" imp "github.com/jbenet/go-ipfs/importer"
"github.com/jbenet/go-ipfs/importer/chunk" "github.com/jbenet/go-ipfs/importer/chunk"
mdag "github.com/jbenet/go-ipfs/merkledag" mdag "github.com/jbenet/go-ipfs/merkledag"
@ -19,7 +22,9 @@ import (
func getMockDagServ(t *testing.T) mdag.DAGService { func getMockDagServ(t *testing.T) mdag.DAGService {
dstore := ds.NewMapDatastore() dstore := ds.NewMapDatastore()
bserv, err := bs.NewBlockService(dstore, nil) tsds := sync.MutexWrap(dstore)
bstore := blockstore.NewBlockstore(tsds)
bserv, err := bs.New(bstore, offline.Exchange())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -5,6 +5,8 @@ import (
"errors" "errors"
"io" "io"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
mdag "github.com/jbenet/go-ipfs/merkledag" mdag "github.com/jbenet/go-ipfs/merkledag"
ft "github.com/jbenet/go-ipfs/unixfs" ft "github.com/jbenet/go-ipfs/unixfs"
@ -15,10 +17,11 @@ var ErrIsDir = errors.New("this dag node is a directory")
// DagReader provides a way to easily read the data contained in a dag. // DagReader provides a way to easily read the data contained in a dag.
type DagReader struct { type DagReader struct {
serv mdag.DAGService serv mdag.DAGService
node *mdag.Node node *mdag.Node
position int buf io.Reader
buf io.Reader fetchChan <-chan *mdag.Node
linkPosition int
} }
// NewDagReader creates a new reader object that reads the data represented by the given // NewDagReader creates a new reader object that reads the data represented by the given
@ -35,10 +38,15 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
// Dont allow reading directories // Dont allow reading directories
return nil, ErrIsDir return nil, ErrIsDir
case ftpb.Data_File: case ftpb.Data_File:
var fetchChan <-chan *mdag.Node
if serv != nil {
fetchChan = serv.GetDAG(context.TODO(), n)
}
return &DagReader{ return &DagReader{
node: n, node: n,
serv: serv, serv: serv,
buf: bytes.NewBuffer(pb.GetData()), buf: bytes.NewBuffer(pb.GetData()),
fetchChan: fetchChan,
}, nil }, nil
case ftpb.Data_Raw: case ftpb.Data_Raw:
// Raw block will just be a single level, return a byte buffer // Raw block will just be a single level, return a byte buffer
@ -51,19 +59,43 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
// precalcNextBuf follows the next link in line and loads it from the DAGService, // precalcNextBuf follows the next link in line and loads it from the DAGService,
// setting the next buffer to read from // setting the next buffer to read from
func (dr *DagReader) precalcNextBuf() error { func (dr *DagReader) precalcNextBuf() error {
if dr.position >= len(dr.node.Links) { var nxt *mdag.Node
return io.EOF var ok bool
}
nxt, err := dr.node.Links[dr.position].GetNode(dr.serv) // TODO: require non-nil dagservice, use offline bitswap exchange
if err != nil { if dr.serv == nil {
return err // Only used when fetchChan is nil,
// which only happens when passed in a nil dagservice
// TODO: this logic is hard to follow, do it better.
// NOTE: the only time this code path is used is during the
// importer tests; consider just changing those tests
log.Warning("Running DAGReader with nil DAGService!")
if dr.linkPosition >= len(dr.node.Links) {
return io.EOF
}
nxt = dr.node.Links[dr.linkPosition].Node
if nxt == nil {
return errors.New("Got nil node back from link! and no DAGService!")
}
dr.linkPosition++
} else {
if dr.fetchChan == nil {
panic("this is wrong.")
}
select {
case nxt, ok = <-dr.fetchChan:
if !ok {
return io.EOF
}
}
} }
pb := new(ftpb.Data) pb := new(ftpb.Data)
err = proto.Unmarshal(nxt.Data, pb) err := proto.Unmarshal(nxt.Data, pb)
if err != nil { if err != nil {
return err return err
} }
dr.position++
switch pb.GetType() { switch pb.GetType() {
case ftpb.Data_Directory: case ftpb.Data_Directory:

View File

@ -63,6 +63,12 @@ func (k *Key) MarshalJSON() ([]byte, error) {
return json.Marshal(b58.Encode([]byte(*k))) return json.Marshal(b58.Encode([]byte(*k)))
} }
func (k *Key) Loggable() map[string]interface{} {
return map[string]interface{}{
"key": k.String(),
}
}
// KeyFromDsKey returns a Datastore key // KeyFromDsKey returns a Datastore key
func KeyFromDsKey(dsk ds.Key) Key { func KeyFromDsKey(dsk ds.Key) Key {
return Key(dsk.BaseNamespace()) return Key(dsk.BaseNamespace())
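
The Loggable addition above pairs with the log.Event call introduced in the DHT's FindProvidersAsync. As a hedged sketch of the idea (this logEvent sink is hypothetical; the real eventlog machinery may differ), anything exposing Loggable() can be flattened into a structured event:

package main

import (
	"fmt"

	u "github.com/jbenet/go-ipfs/util"
)

// logEvent is a hypothetical event sink: it merges each metadata
// provider's Loggable() map into one structured entry.
func logEvent(name string, metadata ...interface {
	Loggable() map[string]interface{}
}) {
	entry := map[string]interface{}{"event": name}
	for _, m := range metadata {
		for k, v := range m.Loggable() {
			entry[k] = v
		}
	}
	fmt.Println(entry) // e.g. map[event:findProviders key:<b58 key>]
}

func main() {
	k := u.Key("hello")
	logEvent("findProviders", &k)
}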

View File

@ -4,9 +4,12 @@ import (
crand "crypto/rand" crand "crypto/rand"
"testing" "testing"
dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
"github.com/jbenet/go-ipfs/exchange/offline"
"github.com/jbenet/go-ipfs/peer" "github.com/jbenet/go-ipfs/peer"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
"github.com/jbenet/go-ipfs/blocks/blockstore"
bsrv "github.com/jbenet/go-ipfs/blockservice" bsrv "github.com/jbenet/go-ipfs/blockservice"
dag "github.com/jbenet/go-ipfs/merkledag" dag "github.com/jbenet/go-ipfs/merkledag"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
@ -14,7 +17,8 @@ import (
func GetDAGServ(t testing.TB) dag.DAGService { func GetDAGServ(t testing.TB) dag.DAGService {
dstore := ds.NewMapDatastore() dstore := ds.NewMapDatastore()
bserv, err := bsrv.NewBlockService(dstore, nil) tsds := dssync.MutexWrap(dstore)
bserv, err := bsrv.New(blockstore.NewBlockstore(tsds), offline.Exchange())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }