diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index b37ae027b..f487713e7 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -136,6 +136,10 @@ "Comment": "v0.6.0-5-gf92b795", "Rev": "f92b7950b372b1db80bd3527e4d40e42555fe6c2" }, + { + "ImportPath": "github.com/maybebtc/pubsub", + "Rev": "39ce5f556423a4c7223b370fa17a3bbd75b2d197" + }, { "ImportPath": "github.com/mitchellh/go-homedir", "Rev": "7d2d8c8a4e078ce3c58736ab521a40b37a504c52" @@ -144,10 +148,6 @@ "ImportPath": "github.com/syndtr/goleveldb/leveldb", "Rev": "99056d50e56252fbe0021d5c893defca5a76baf8" }, - { - "ImportPath": "github.com/tuxychandru/pubsub", - "Rev": "02de8aa2db3d570c5ab1be5ba67b456fd0fb7c4e" - }, { "ImportPath": "gopkg.in/natefinch/lumberjack.v2", "Comment": "v1.0-12-gd28785c", diff --git a/Godeps/_workspace/src/github.com/maybebtc/pubsub/Godeps/Godeps.json b/Godeps/_workspace/src/github.com/maybebtc/pubsub/Godeps/Godeps.json new file mode 100644 index 000000000..3a0925c00 --- /dev/null +++ b/Godeps/_workspace/src/github.com/maybebtc/pubsub/Godeps/Godeps.json @@ -0,0 +1,13 @@ +{ + "ImportPath": "github.com/maybebtc/pubsub", + "GoVersion": "go1.3.3", + "Packages": [ + "./..." + ], + "Deps": [ + { + "ImportPath": "gopkg.in/check.v1", + "Rev": "64131543e7896d5bcc6bd5a76287eb75ea96c673" + } + ] +} diff --git a/Godeps/_workspace/src/github.com/maybebtc/pubsub/Godeps/Readme b/Godeps/_workspace/src/github.com/maybebtc/pubsub/Godeps/Readme new file mode 100644 index 000000000..4cdaa53d5 --- /dev/null +++ b/Godeps/_workspace/src/github.com/maybebtc/pubsub/Godeps/Readme @@ -0,0 +1,5 @@ +This directory tree is generated automatically by godep. + +Please do not edit. + +See https://github.com/tools/godep for more information. 
diff --git a/Godeps/_workspace/src/github.com/maybebtc/pubsub/Makefile b/Godeps/_workspace/src/github.com/maybebtc/pubsub/Makefile new file mode 100644 index 000000000..ed3992384 --- /dev/null +++ b/Godeps/_workspace/src/github.com/maybebtc/pubsub/Makefile @@ -0,0 +1,2 @@ +vendor: + godep save -r ./... diff --git a/Godeps/_workspace/src/github.com/tuxychandru/pubsub/README.md b/Godeps/_workspace/src/github.com/maybebtc/pubsub/README.md similarity index 97% rename from Godeps/_workspace/src/github.com/tuxychandru/pubsub/README.md rename to Godeps/_workspace/src/github.com/maybebtc/pubsub/README.md index c1aab80b5..e176dab68 100644 --- a/Godeps/_workspace/src/github.com/tuxychandru/pubsub/README.md +++ b/Godeps/_workspace/src/github.com/maybebtc/pubsub/README.md @@ -6,7 +6,7 @@ View the [API Documentation](http://godoc.org/github.com/tuxychandru/pubsub). ## License -Copyright (c) 2013, Chandra Sekar S +Copyright (c) 2013, Chandra Sekar S All rights reserved. Redistribution and use in source and binary forms, with or without diff --git a/Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub.go b/Godeps/_workspace/src/github.com/maybebtc/pubsub/pubsub.go similarity index 81% rename from Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub.go rename to Godeps/_workspace/src/github.com/maybebtc/pubsub/pubsub.go index 9cbf9cffa..f42587d07 100644 --- a/Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub.go +++ b/Godeps/_workspace/src/github.com/maybebtc/pubsub/pubsub.go @@ -15,6 +15,7 @@ type operation int const ( sub operation = iota subOnce + subOnceEach pub unsub unsubAll @@ -55,6 +56,12 @@ func (ps *PubSub) SubOnce(topics ...string) chan interface{} { return ps.sub(subOnce, topics...) } +// SubOnceEach returns a channel on which callers receive, at most, one message +// for each topic. +func (ps *PubSub) SubOnceEach(topics ...string) chan interface{} { + return ps.sub(subOnceEach, topics...) 
+} + func (ps *PubSub) sub(op operation, topics ...string) chan interface{} { ch := make(chan interface{}, ps.capacity) ps.cmdChan <- cmd{op: op, topics: topics, ch: ch} @@ -66,6 +73,12 @@ func (ps *PubSub) AddSub(ch chan interface{}, topics ...string) { ps.cmdChan <- cmd{op: sub, topics: topics, ch: ch} } +// AddSubOnceEach adds subscriptions to an existing channel with SubOnceEach +// behavior. +func (ps *PubSub) AddSubOnceEach(ch chan interface{}, topics ...string) { + ps.cmdChan <- cmd{op: subOnceEach, topics: topics, ch: ch} +} + // Pub publishes the given message to all subscribers of // the specified topics. func (ps *PubSub) Pub(msg interface{}, topics ...string) { @@ -98,7 +111,7 @@ func (ps *PubSub) Shutdown() { func (ps *PubSub) start() { reg := registry{ - topics: make(map[string]map[chan interface{}]bool), + topics: make(map[string]map[chan interface{}]subtype), revTopics: make(map[chan interface{}]map[string]bool), } @@ -119,10 +132,13 @@ loop: for _, topic := range cmd.topics { switch cmd.op { case sub: - reg.add(topic, cmd.ch, false) + reg.add(topic, cmd.ch, stNorm) case subOnce: - reg.add(topic, cmd.ch, true) + reg.add(topic, cmd.ch, stOnceAny) + + case subOnceEach: + reg.add(topic, cmd.ch, stOnceEach) case pub: reg.send(topic, cmd.msg) @@ -146,15 +162,23 @@ loop: // registry maintains the current subscription state. It's not // safe to access a registry from multiple goroutines simultaneously. 
type registry struct { - topics map[string]map[chan interface{}]bool + topics map[string]map[chan interface{}]subtype revTopics map[chan interface{}]map[string]bool } -func (reg *registry) add(topic string, ch chan interface{}, once bool) { +type subtype int + +const ( + stOnceAny = iota + stOnceEach + stNorm +) + +func (reg *registry) add(topic string, ch chan interface{}, st subtype) { if reg.topics[topic] == nil { - reg.topics[topic] = make(map[chan interface{}]bool) + reg.topics[topic] = make(map[chan interface{}]subtype) } - reg.topics[topic][ch] = once + reg.topics[topic][ch] = st if reg.revTopics[ch] == nil { reg.revTopics[ch] = make(map[string]bool) @@ -163,12 +187,15 @@ func (reg *registry) add(topic string, ch chan interface{}, once bool) { } func (reg *registry) send(topic string, msg interface{}) { - for ch, once := range reg.topics[topic] { + for ch, st := range reg.topics[topic] { ch <- msg - if once { + switch st { + case stOnceAny: for topic := range reg.revTopics[ch] { reg.remove(topic, ch) } + case stOnceEach: + reg.remove(topic, ch) } } } diff --git a/Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub_test.go b/Godeps/_workspace/src/github.com/maybebtc/pubsub/pubsub_test.go similarity index 92% rename from Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub_test.go rename to Godeps/_workspace/src/github.com/maybebtc/pubsub/pubsub_test.go index 16392d33b..f4a80ab62 100644 --- a/Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub_test.go +++ b/Godeps/_workspace/src/github.com/maybebtc/pubsub/pubsub_test.go @@ -5,7 +5,7 @@ package pubsub import ( - check "launchpad.net/gocheck" + check "gopkg.in/check.v1" "runtime" "testing" "time" @@ -181,6 +181,23 @@ func (s *Suite) TestMultiSubOnce(c *check.C) { ps.Shutdown() } +func (s *Suite) TestMultiSubOnceEach(c *check.C) { + ps := New(1) + ch := ps.SubOnceEach("t1", "t2") + + ps.Pub("hi", "t1") + c.Check(<-ch, check.Equals, "hi") + + ps.Pub("hi!", "t1") // ignored + + ps.Pub("hello", 
"t2") + c.Check(<-ch, check.Equals, "hello") + + _, ok := <-ch + c.Check(ok, check.Equals, false) + ps.Shutdown() +} + func (s *Suite) TestMultiPub(c *check.C) { ps := New(1) ch1 := ps.Sub("t1") diff --git a/blockstore/blockstore.go b/blocks/blockstore/blockstore.go similarity index 83% rename from blockstore/blockstore.go rename to blocks/blockstore/blockstore.go index b4c0fd7fb..68ccc7c74 100644 --- a/blockstore/blockstore.go +++ b/blocks/blockstore/blockstore.go @@ -15,6 +15,8 @@ import ( var ValueTypeMismatch = errors.New("The retrieved value is not a Block") type Blockstore interface { + DeleteBlock(u.Key) error + Has(u.Key) (bool, error) Get(u.Key) (*blocks.Block, error) Put(*blocks.Block) error } @@ -45,3 +47,11 @@ func (bs *blockstore) Get(k u.Key) (*blocks.Block, error) { func (bs *blockstore) Put(block *blocks.Block) error { return bs.datastore.Put(block.Key().DsKey(), block.Data) } + +func (bs *blockstore) Has(k u.Key) (bool, error) { + return bs.datastore.Has(k.DsKey()) +} + +func (bs *blockstore) DeleteBlock(k u.Key) error { + return bs.datastore.Delete(k.DsKey()) +} diff --git a/blockstore/blockstore_test.go b/blocks/blockstore/blockstore_test.go similarity index 100% rename from blockstore/blockstore_test.go rename to blocks/blockstore/blockstore_test.go diff --git a/blocks/blocksutil/block_generator.go b/blocks/blocksutil/block_generator.go new file mode 100644 index 000000000..20f6a3edc --- /dev/null +++ b/blocks/blocksutil/block_generator.go @@ -0,0 +1,36 @@ +package blocksutil + +import ( + "strconv" + + "github.com/jbenet/go-ipfs/blocks" +) + +// NewBlockGenerator returns a zero-valued BlockGenerator. +func NewBlockGenerator() BlockGenerator { + return BlockGenerator{} +} + +// BlockGenerator produces a sequence of distinct blocks for use in tests. +type BlockGenerator struct { + seq int +} + +// Next advances the sequence and returns a block containing the decimal +// representation of the new sequence number. strconv.Itoa is used rather +// than string(int), which yields a rune and collapses invalid code points +// to U+FFFD, producing duplicate blocks. +func (bg *BlockGenerator) Next() *blocks.Block { + bg.seq++ + return blocks.NewBlock([]byte(strconv.Itoa(bg.seq))) +} + +// Blocks returns the next n blocks in the sequence. +func (bg *BlockGenerator) Blocks(n int) []*blocks.Block { + blocks := make([]*blocks.Block, 0) + for i := 0; i < n; i++ { + b := bg.Next() + blocks = append(blocks, b) + } + return blocks +} diff 
--git a/blockservice/blocks_test.go b/blockservice/blocks_test.go index 1e837eb5d..1a75723e2 100644 --- a/blockservice/blocks_test.go +++ b/blockservice/blocks_test.go @@ -6,15 +6,22 @@ import ( "time" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" blocks "github.com/jbenet/go-ipfs/blocks" + blockstore "github.com/jbenet/go-ipfs/blocks/blockstore" + blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil" + bitswap "github.com/jbenet/go-ipfs/exchange/bitswap" + tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" + offline "github.com/jbenet/go-ipfs/exchange/offline" + "github.com/jbenet/go-ipfs/routing/mock" u "github.com/jbenet/go-ipfs/util" ) func TestBlocks(t *testing.T) { d := ds.NewMapDatastore() - bs, err := NewBlockService(d, nil) + tsds := dssync.MutexWrap(d) + bs, err := New(blockstore.NewBlockstore(tsds), offline.Exchange()) if err != nil { t.Error("failed to construct block service", err) return @@ -55,3 +62,46 @@ func TestBlocks(t *testing.T) { t.Error("Block data is not equal.") } } + +func TestGetBlocksSequential(t *testing.T) { + net := tn.VirtualNetwork() + rs := mock.VirtualRoutingServer() + sg := bitswap.NewSessionGenerator(net, rs) + bg := blocksutil.NewBlockGenerator() + + instances := sg.Instances(4) + blks := bg.Blocks(50) + // TODO: verify no duplicates + + var servs []*BlockService + for _, i := range instances { + bserv, err := New(i.Blockstore, i.Exchange) + if err != nil { + t.Fatal(err) + } + servs = append(servs, bserv) + } + + var keys []u.Key + for _, blk := range blks { + keys = append(keys, blk.Key()) + servs[0].AddBlock(blk) + } + + t.Log("one instance at a time, get blocks concurrently") + + for i := 1; i < len(instances); i++ { + ctx, _ := context.WithTimeout(context.TODO(), time.Second*5) + out := 
servs[i].GetBlocks(ctx, keys) + gotten := make(map[u.Key]*blocks.Block) + for blk := range out { + if _, ok := gotten[blk.Key()]; ok { + t.Fatal("Got duplicate block!") + } + gotten[blk.Key()] = blk + } + if len(gotten) != len(blks) { + t.Fatalf("Didnt get enough blocks back: %d/%d", len(gotten), len(blks)) + } + } +} diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 82a1b03c1..0ddec7955 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -9,9 +9,9 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" - mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" blocks "github.com/jbenet/go-ipfs/blocks" + "github.com/jbenet/go-ipfs/blocks/blockstore" exchange "github.com/jbenet/go-ipfs/exchange" u "github.com/jbenet/go-ipfs/util" ) @@ -19,25 +19,28 @@ import ( var log = u.Logger("blockservice") var ErrNotFound = errors.New("blockservice: key not found") -// BlockService is a block datastore. +// BlockService is a hybrid block datastore. It stores data in a local +// datastore and may retrieve data from a remote Exchange. // It uses an internal `datastore.Datastore` instance to store values. type BlockService struct { - Datastore ds.Datastore - Remote exchange.Interface + // TODO don't expose underlying impl details + Blockstore blockstore.Blockstore + Remote exchange.Interface } // NewBlockService creates a BlockService with given datastore instance. 
-func NewBlockService(d ds.Datastore, rem exchange.Interface) (*BlockService, error) { - if d == nil { - return nil, fmt.Errorf("BlockService requires valid datastore") +func New(bs blockstore.Blockstore, rem exchange.Interface) (*BlockService, error) { + if bs == nil { + return nil, fmt.Errorf("BlockService requires valid blockstore") } if rem == nil { log.Warning("blockservice running in local (offline) mode.") } - return &BlockService{Datastore: d, Remote: rem}, nil + return &BlockService{Blockstore: bs, Remote: rem}, nil } // AddBlock adds a particular block to the service, Putting it into the datastore. +// TODO pass a context into this if the remote.HasBlock is going to remain here. func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) { k := b.Key() log.Debugf("blockservice: storing [%s] in datastore", k) @@ -47,7 +50,7 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) { // check if we have it before adding. this is an extra read, but large writes // are more expensive. // TODO(jbenet) cheaper has. https://github.com/jbenet/go-datastore/issues/6 - has, err := s.Datastore.Has(k.DsKey()) + has, err := s.Blockstore.Has(k) if err != nil { return k, err } @@ -55,15 +58,17 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) { log.Debugf("blockservice: storing [%s] in datastore (already stored)", k) } else { log.Debugf("blockservice: storing [%s] in datastore", k) - err := s.Datastore.Put(k.DsKey(), b.Data) + err := s.Blockstore.Put(b) if err != nil { return k, err } } + // TODO this operation rate-limits blockservice operations, we should + // consider moving this to an sync process. if s.Remote != nil { ctx := context.TODO() - err = s.Remote.HasBlock(ctx, *b) + err = s.Remote.HasBlock(ctx, b) } return k, err } @@ -72,20 +77,14 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) { // Getting it from the datastore using the key (hash). 
func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, error) { log.Debugf("BlockService GetBlock: '%s'", k) - datai, err := s.Datastore.Get(k.DsKey()) + block, err := s.Blockstore.Get(k) if err == nil { - log.Debug("Blockservice: Got data in datastore.") - bdata, ok := datai.([]byte) - if !ok { - return nil, fmt.Errorf("data associated with %s is not a []byte", k) - } - return &blocks.Block{ - Multihash: mh.Multihash(k), - Data: bdata, - }, nil + return block, nil + // TODO be careful checking ErrNotFound. If the underlying + // implementation changes, this will break. } else if err == ds.ErrNotFound && s.Remote != nil { log.Debug("Blockservice: Searching bitswap.") - blk, err := s.Remote.Block(ctx, k) + blk, err := s.Remote.GetBlock(ctx, k) if err != nil { return nil, err } @@ -96,7 +95,52 @@ func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, er } } +// GetBlocks fetches the given blocks asynchronously and delivers them on +// the returned channel, which is closed once no more blocks will be sent. +// NB: No guarantees are made about order. 
+func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks.Block { + out := make(chan *blocks.Block, 0) + go func() { + defer close(out) + var misses []u.Key + for _, k := range ks { + hit, err := s.Blockstore.Get(k) + if err != nil { + misses = append(misses, k) + continue + } + log.Debug("Blockservice: Got data in datastore.") + select { + case out <- hit: + case <-ctx.Done(): + return + } + } + + // Nothing left to fetch, or no exchange configured (New permits a + // nil Remote for offline mode); calling through it would panic. + if len(misses) == 0 || s.Remote == nil { + return + } + + rblocks, err := s.Remote.GetBlocks(ctx, misses) + if err != nil { + log.Errorf("Error with GetBlocks: %s", err) + return + } + + for b := range rblocks { + select { + case out <- b: + case <-ctx.Done(): + return + } + } + }() + return out +} + // DeleteBlock deletes a block in the blockservice from the datastore func (s *BlockService) DeleteBlock(k u.Key) error { - return s.Datastore.Delete(k.DsKey()) + return s.Blockstore.DeleteBlock(k) } diff --git a/cmd/ipfs/main.go b/cmd/ipfs/main.go index 20278ab31..18923a64a 100644 --- a/cmd/ipfs/main.go +++ b/cmd/ipfs/main.go @@ -6,6 +6,7 @@ import ( "io" "os" "os/signal" + "runtime" "runtime/pprof" "syscall" @@ -54,6 +55,7 @@ type cmdInvocation struct { // - output the response // - if anything fails, print error, maybe with help func main() { + runtime.GOMAXPROCS(3) ctx := context.Background() var err error var invoc cmdInvocation @@ -510,6 +512,6 @@ func (i *cmdInvocation) setupInterruptHandler() { func allInterruptSignals() chan os.Signal { sigc := make(chan os.Signal, 1) signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT, - syscall.SIGTERM, syscall.SIGQUIT) + syscall.SIGTERM) return sigc } diff --git a/core/core.go b/core/core.go index 58d99b6f4..148853bfc 100644 --- a/core/core.go +++ b/core/core.go @@ -8,12 +8,14 @@ import ( b58 "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + blockstore "github.com/jbenet/go-ipfs/blocks/blockstore" bserv "github.com/jbenet/go-ipfs/blockservice" config 
"github.com/jbenet/go-ipfs/config" diag "github.com/jbenet/go-ipfs/diagnostics" exchange "github.com/jbenet/go-ipfs/exchange" bitswap "github.com/jbenet/go-ipfs/exchange/bitswap" bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network" + "github.com/jbenet/go-ipfs/exchange/offline" mount "github.com/jbenet/go-ipfs/fuse/mount" merkledag "github.com/jbenet/go-ipfs/merkledag" namesys "github.com/jbenet/go-ipfs/namesys" @@ -28,7 +30,7 @@ import ( dht "github.com/jbenet/go-ipfs/routing/dht" u "github.com/jbenet/go-ipfs/util" ctxc "github.com/jbenet/go-ipfs/util/ctxcloser" - "github.com/jbenet/go-ipfs/util/debugerror" + debugerror "github.com/jbenet/go-ipfs/util/debugerror" "github.com/jbenet/go-ipfs/util/eventlog" ) @@ -114,6 +116,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) { Config: cfg, } n.ContextCloser = ctxc.NewContextCloser(ctx, n.teardown) + ctx = n.Context() // setup datastore. if n.Datastore, err = makeDatastore(cfg.Datastore); err != nil { @@ -127,6 +130,8 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) { return nil, debugerror.Wrap(err) } + n.Exchange = offline.Exchange() + // setup online services if online { @@ -169,14 +174,16 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) { // setup exchange service const alwaysSendToPeer = true // use YesManStrategy bitswapNetwork := bsnet.NewFromIpfsNetwork(exchangeService, n.Network) - n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Routing, n.Datastore, alwaysSendToPeer) + bstore := blockstore.NewBlockstore(n.Datastore) + + n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Routing, bstore, alwaysSendToPeer) go initConnections(ctx, n.Config, n.Peerstore, dhtRouting) } // TODO(brian): when offline instantiate the BlockService with a bitswap // session that simply doesn't return blocks - n.Blocks, err = bserv.NewBlockService(n.Datastore, n.Exchange) + n.Blocks, err = 
bserv.New(blockstore.NewBlockstore(n.Datastore), n.Exchange) if err != nil { return nil, debugerror.Wrap(err) } diff --git a/core/mock.go b/core/mock.go index 92ffcb57d..2d2359277 100644 --- a/core/mock.go +++ b/core/mock.go @@ -3,8 +3,10 @@ package core import ( ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" syncds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" - bs "github.com/jbenet/go-ipfs/blockservice" + "github.com/jbenet/go-ipfs/blocks/blockstore" + blockservice "github.com/jbenet/go-ipfs/blockservice" ci "github.com/jbenet/go-ipfs/crypto" + "github.com/jbenet/go-ipfs/exchange/offline" mdag "github.com/jbenet/go-ipfs/merkledag" nsys "github.com/jbenet/go-ipfs/namesys" path "github.com/jbenet/go-ipfs/path" @@ -43,9 +45,7 @@ func NewMockNode() (*IpfsNode, error) { nd.Routing = dht // Bitswap - //?? - - bserv, err := bs.NewBlockService(nd.Datastore, nil) + bserv, err := blockservice.New(blockstore.NewBlockstore(nd.Datastore), offline.Exchange()) if err != nil { return nil, err } diff --git a/exchange/bitswap/README.md b/exchange/bitswap/README.md new file mode 100644 index 000000000..bfa0aaa86 --- /dev/null +++ b/exchange/bitswap/README.md @@ -0,0 +1,47 @@ +#Welcome to Bitswap +###(The data trading engine) + +Bitswap is the module that is responsible for requesting and providing data +blocks over the network to and from other ipfs peers. The role of bitswap is +to be a merchant in the large global marketplace of data. + +##Main Operations +Bitswap has three high level operations: + +- **GetBlocks** + - `GetBlocks` is a bitswap method used to request multiple blocks that are likely +to all be provided by the same set of peers (part of a single file, for example). + +- **GetBlock** + - `GetBlock` is a special case of `GetBlocks` that just requests a single block. + +- **HasBlock** + - `HasBlock` registers a local block with bitswap. 
Bitswap will then send that +block to any connected peers who want it (with the strategy's approval), record +that transaction in the ledger and announce to the DHT that the block is being +provided. + +##Internal Details +All `GetBlock` requests are relayed into a single for-select loop via channels. +Calls to `GetBlocks` will have `FindProviders` called for only the first key in +the set initially. This is an optimization attempting to cut down on the number +of RPCs required. After a timeout (specified by the strategy's +`GetRebroadcastDelay`) Bitswap will iterate through all keys still in the local +wantlist, perform a find providers call for each, and send the wantlist out to +those providers. This is the fallback behaviour for cases where our initial +assumption about one peer potentially having multiple blocks in a set does not +hold true. + +When receiving messages, Bitswap's `ReceiveMessage` method is called. A bitswap +message may contain the wantlist of the peer who sent the message, and an array +of blocks that were on our local wantlist. Any blocks we receive in a bitswap +message will be passed to `HasBlock`, and the other peer's wantlist gets updated +in the strategy by `bs.strategy.MessageReceived`. +If another peer's wantlist is received, Bitswap will call its strategy's +`ShouldSendBlockToPeer` method to determine whether or not the other peer will +be sent the block they are requesting (if we even have it). 
+ +##Outstanding TODOs: +- [ ] Ensure only one request active per key +- [ ] More involved strategies +- [ ] Ensure only wanted blocks are counted in ledgers diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 8af8426d3..e00b23f91 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -3,13 +3,13 @@ package bitswap import ( + "sync" "time" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" blocks "github.com/jbenet/go-ipfs/blocks" - blockstore "github.com/jbenet/go-ipfs/blockstore" + blockstore "github.com/jbenet/go-ipfs/blocks/blockstore" exchange "github.com/jbenet/go-ipfs/exchange" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network" @@ -17,17 +17,26 @@ import ( strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" ) -var log = u.Logger("bitswap") +var log = eventlog.Logger("bitswap") + +// Number of providers to request for sending a wantlist to +// TODO: if a 'non-nice' strategy is implemented, consider increasing this value +const maxProvidersPerRequest = 3 + +const providerRequestTimeout = time.Second * 10 +const hasBlockTimeout = time.Second * 15 // New initializes a BitSwap instance that communicates over the // provided BitSwapNetwork. This function registers the returned instance as // the network delegate. 
// Runs until context is cancelled -func New(ctx context.Context, p peer.Peer, - network bsnet.BitSwapNetwork, routing bsnet.Routing, - d ds.ThreadSafeDatastore, nice bool) exchange.Interface { +func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, routing bsnet.Routing, + bstore blockstore.Blockstore, nice bool) exchange.Interface { + + ctx, cancelFunc := context.WithCancel(parent) notif := notifications.New() go func() { @@ -36,14 +45,17 @@ func New(ctx context.Context, p peer.Peer, }() bs := &bitswap{ - blockstore: blockstore.NewBlockstore(d), + blockstore: bstore, + cancelFunc: cancelFunc, notifications: notif, strategy: strategy.New(nice), routing: routing, sender: network, wantlist: u.NewKeySet(), + batchRequests: make(chan []u.Key, 32), } network.SetDelegate(bs) + go bs.loop(ctx) return bs } @@ -63,91 +75,207 @@ type bitswap struct { notifications notifications.PubSub + // Requests for a set of related blocks + // the assumption is made that the same peer is likely to + // have more than a single block in the set + batchRequests chan []u.Key + // strategy listens to network traffic and makes decisions about how to // interact with partners. // TODO(brian): save the strategy's state to the datastore strategy strategy.Strategy wantlist u.KeySet + + // cancelFunc signals cancellation to the bitswap event loop + cancelFunc func() } // GetBlock attempts to retrieve a particular block from peers within the -// deadline enforced by the context -// -// TODO ensure only one active request per key -func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) { - log.Debugf("Get Block %v", k) - now := time.Now() - defer func() { - log.Debugf("GetBlock took %f secs", time.Now().Sub(now).Seconds()) - }() +// deadline enforced by the context. +func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) { + + // Any async work initiated by this function must end when this function + // returns. 
To ensure this, derive a new context. Note that it is okay to + // listen on parent in this scope, but NOT okay to pass |parent| to + // functions called by this one. Otherwise those functions won't return + // when this context's cancel func is executed. This is difficult to + // enforce. May this comment keep you safe. ctx, cancelFunc := context.WithCancel(parent) - defer cancelFunc() - bs.wantlist.Add(k) - promise := bs.notifications.Subscribe(ctx, k) + ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("GetBlockRequest")) + log.Event(ctx, "GetBlockRequestBegin", &k) - const maxProviders = 20 - peersToQuery := bs.routing.FindProvidersAsync(ctx, k, maxProviders) - - go func() { - message := bsmsg.New() - for _, wanted := range bs.wantlist.Keys() { - message.AddWanted(wanted) - } - for peerToQuery := range peersToQuery { - log.Debugf("bitswap got peersToQuery: %s", peerToQuery) - go func(p peer.Peer) { - - log.Debugf("bitswap dialing peer: %s", p) - err := bs.sender.DialPeer(ctx, p) - if err != nil { - log.Errorf("Error sender.DialPeer(%s)", p) - return - } - - response, err := bs.sender.SendRequest(ctx, p, message) - if err != nil { - log.Errorf("Error sender.SendRequest(%s) = %s", p, err) - return - } - // FIXME ensure accounting is handled correctly when - // communication fails. May require slightly different API to - // get better guarantees. May need shared sequence numbers. - bs.strategy.MessageSent(p, message) - - if response == nil { - return - } - bs.ReceiveMessage(ctx, p, response) - }(peerToQuery) - } + defer func() { + cancelFunc() + log.Event(ctx, "GetBlockRequestEnd", &k) }() + promise, err := bs.GetBlocks(ctx, []u.Key{k}) + if err != nil { + return nil, err + } + select { case block := <-promise: - bs.wantlist.Remove(k) - return &block, nil + return block, nil case <-parent.Done(): return nil, parent.Err() } + +} + +// GetBlocks returns a channel where the caller may receive blocks that +// correspond to the provided |keys|. 
Returns an error if BitSwap is unable to +// begin this request within the deadline enforced by the context. +// +// NB: Your request remains open until the context expires. To conserve +// resources, provide a context with a reasonably short deadline (i.e. not one +// that lasts throughout the lifetime of the server) +func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) { + // TODO log the request + + promise := bs.notifications.Subscribe(ctx, keys...) + select { + case bs.batchRequests <- keys: + return promise, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) error { + if peers == nil { + panic("Cant send wantlist to nil peerchan") + } + message := bsmsg.New() + for _, wanted := range bs.wantlist.Keys() { + message.AddWanted(wanted) + } + for peerToQuery := range peers { + log.Debugf("sending query to: %s", peerToQuery) + log.Event(ctx, "PeerToQuery", peerToQuery) + go func(p peer.Peer) { + + log.Event(ctx, "DialPeer", p) + err := bs.sender.DialPeer(ctx, p) + if err != nil { + log.Errorf("Error sender.DialPeer(%s): %s", p, err) + return + } + + err = bs.sender.SendMessage(ctx, p, message) + if err != nil { + log.Errorf("Error sender.SendMessage(%s) = %s", p, err) + return + } + // FIXME ensure accounting is handled correctly when + // communication fails. May require slightly different API to + // get better guarantees. May need shared sequence numbers. 
+ bs.strategy.MessageSent(p, message) + }(peerToQuery) + } + return nil +} + +func (bs *bitswap) sendWantlistToProviders(ctx context.Context, ks []u.Key) { + wg := sync.WaitGroup{} + for _, k := range ks { + wg.Add(1) + go func(k u.Key) { + child, _ := context.WithTimeout(ctx, providerRequestTimeout) + providers := bs.routing.FindProvidersAsync(child, k, maxProvidersPerRequest) + + err := bs.sendWantListTo(ctx, providers) + if err != nil { + log.Errorf("error sending wantlist: %s", err) + } + wg.Done() + }(k) + } + wg.Wait() +} + +// TODO ensure only one active request per key +func (bs *bitswap) loop(parent context.Context) { + + ctx, cancel := context.WithCancel(parent) + + broadcastSignal := time.NewTicker(bs.strategy.GetRebroadcastDelay()) + defer func() { + cancel() // signal to derived async functions + broadcastSignal.Stop() + }() + + for { + select { + case <-broadcastSignal.C: + // Resend unfulfilled wantlist keys + bs.sendWantlistToProviders(ctx, bs.wantlist.Keys()) + case ks := <-bs.batchRequests: + // TODO: implement batching on len(ks) > X for some X + // i.e. if given 20 keys, fetch first five, then next + // five, and so on, so we are more likely to be able to + // effectively stream the data + if len(ks) == 0 { + log.Warning("Received batch request for zero blocks") + continue + } + for _, k := range ks { + bs.wantlist.Add(k) + } + // NB: send want list to providers for the first peer in this list. + // the assumption is made that the providers of the first key in + // the set are likely to have others as well. + // This currently holds true in most every situation, since when + // pinning a file, you store and provide all blocks associated with + // it. Later, this assumption may not hold as true if we implement + // newer bitswap strategies. 
+ child, _ := context.WithTimeout(ctx, providerRequestTimeout) + providers := bs.routing.FindProvidersAsync(child, ks[0], maxProvidersPerRequest) + + err := bs.sendWantListTo(ctx, providers) + if err != nil { + log.Errorf("error sending wantlist: %s", err) + } + case <-parent.Done(): + return + } + } } // HasBlock announces the existance of a block to this bitswap service. The // service will potentially notify its peers. -func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error { - log.Debugf("Has Block %v", blk.Key()) +func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { + // TODO check all errors + log.Debugf("Has Block %s", blk.Key()) bs.wantlist.Remove(blk.Key()) - bs.sendToPeersThatWant(ctx, blk) - return bs.routing.Provide(ctx, blk.Key()) + bs.notifications.Publish(blk) + + child, _ := context.WithTimeout(ctx, hasBlockTimeout) + bs.sendToPeersThatWant(child, blk) + child, _ = context.WithTimeout(ctx, hasBlockTimeout) + return bs.routing.Provide(child, blk.Key()) +} + +// receiveBlock handles storing the block in the blockstore and calling HasBlock +func (bs *bitswap) receiveBlock(ctx context.Context, block *blocks.Block) { + // TODO verify blocks? + if err := bs.blockstore.Put(block); err != nil { + log.Criticalf("error putting block: %s", err) + return + } + err := bs.HasBlock(ctx, block) + if err != nil { + log.Warningf("HasBlock errored: %s", err) + } } // TODO(brian): handle errors func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) ( peer.Peer, bsmsg.BitSwapMessage) { - log.Debugf("ReceiveMessage from %v", p.Key()) - log.Debugf("Message wantlist: %v", incoming.Wantlist()) + log.Debugf("ReceiveMessage from %s", p) if p == nil { log.Error("Received message from nil peer!") @@ -163,39 +291,39 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm // Record message bytes in ledger // TODO: this is bad, and could be easily abused. 
// Should only track *useful* messages in ledger - bs.strategy.MessageReceived(p, incoming) // FIRST + // This call records changes to wantlists, blocks received, + // and number of bytes transfered. + bs.strategy.MessageReceived(p, incoming) - for _, block := range incoming.Blocks() { - // TODO verify blocks? - if err := bs.blockstore.Put(&block); err != nil { - continue // FIXME(brian): err ignored + go func() { + for _, block := range incoming.Blocks() { + bs.receiveBlock(ctx, block) } - bs.notifications.Publish(block) - err := bs.HasBlock(ctx, block) - if err != nil { - log.Warningf("HasBlock errored: %s", err) - } - } + }() - message := bsmsg.New() - for _, wanted := range bs.wantlist.Keys() { - message.AddWanted(wanted) - } for _, key := range incoming.Wantlist() { - // TODO: might be better to check if we have the block before checking - // if we should send it to someone if bs.strategy.ShouldSendBlockToPeer(key, p) { if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil { continue } else { - message.AddBlock(*block) + // Create a separate message to send this block in + blkmsg := bsmsg.New() + + // TODO: only send this the first time + // no sense in sending our wantlist to the + // same peer multiple times + for _, k := range bs.wantlist.Keys() { + blkmsg.AddWanted(k) + } + + blkmsg.AddBlock(block) + bs.send(ctx, p, blkmsg) } } } - defer bs.strategy.MessageSent(p, message) - log.Debug("Returning message.") - return p, message + // TODO: consider changing this function to not return anything + return nil, nil } func (bs *bitswap) ReceiveError(err error) { @@ -211,8 +339,8 @@ func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage bs.strategy.MessageSent(p, m) } -func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) { - log.Debugf("Sending %v to peers that want it", block.Key()) +func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block *blocks.Block) { + log.Debugf("Sending 
%s to peers that want it", block) for _, p := range bs.strategy.Peers() { if bs.strategy.BlockIsWantedByPeer(block.Key(), p) { @@ -228,3 +356,8 @@ func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) } } } + +func (bs *bitswap) Close() error { + bs.cancelFunc() + return nil // to conform to Closer interface +} diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index a851f0f56..d26a8ffc9 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -7,20 +7,28 @@ import ( "time" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - - ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" - ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" blocks "github.com/jbenet/go-ipfs/blocks" - bstore "github.com/jbenet/go-ipfs/blockstore" - exchange "github.com/jbenet/go-ipfs/exchange" - notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications" - strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy" + blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil" tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" peer "github.com/jbenet/go-ipfs/peer" mock "github.com/jbenet/go-ipfs/routing/mock" - util "github.com/jbenet/go-ipfs/util" ) +func TestClose(t *testing.T) { + // TODO + t.Skip("TODO Bitswap's Close implementation is a WIP") + vnet := tn.VirtualNetwork() + rout := mock.VirtualRoutingServer() + sesgen := NewSessionGenerator(vnet, rout) + bgen := blocksutil.NewBlockGenerator() + + block := bgen.Next() + bitswap := sesgen.Next() + + bitswap.Exchange.Close() + bitswap.Exchange.GetBlock(context.Background(), block.Key()) +} + func TestGetBlockTimeout(t *testing.T) { net := tn.VirtualNetwork() @@ -31,7 +39,7 @@ func TestGetBlockTimeout(t *testing.T) { ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond) block := blocks.NewBlock([]byte("block")) 
- _, err := self.exchange.Block(ctx, block.Key()) + _, err := self.Exchange.GetBlock(ctx, block.Key()) if err != context.DeadlineExceeded { t.Fatal("Expected DeadlineExceeded error") @@ -50,7 +58,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { solo := g.Next() ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond) - _, err := solo.exchange.Block(ctx, block.Key()) + _, err := solo.Exchange.GetBlock(ctx, block.Key()) if err != context.DeadlineExceeded { t.Fatal("Expected DeadlineExceeded error") @@ -68,17 +76,17 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { hasBlock := g.Next() - if err := hasBlock.blockstore.Put(block); err != nil { + if err := hasBlock.Blockstore.Put(block); err != nil { t.Fatal(err) } - if err := hasBlock.exchange.HasBlock(context.Background(), *block); err != nil { + if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil { t.Fatal(err) } wantsBlock := g.Next() ctx, _ := context.WithTimeout(context.Background(), time.Second) - received, err := wantsBlock.exchange.Block(ctx, block.Key()) + received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key()) if err != nil { t.Log(err) t.Fatal("Expected to succeed") @@ -89,19 +97,36 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { } } -func TestSwarm(t *testing.T) { +func TestLargeSwarm(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + t.Parallel() + numInstances := 5 + numBlocks := 2 + PerformDistributionTest(t, numInstances, numBlocks) +} + +func TestLargeFile(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + t.Parallel() + numInstances := 10 + numBlocks := 100 + PerformDistributionTest(t, numInstances, numBlocks) +} + +func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { if testing.Short() { t.SkipNow() } net := tn.VirtualNetwork() rs := mock.VirtualRoutingServer() sg := NewSessionGenerator(net, rs) - bg := NewBlockGenerator() + bg := blocksutil.NewBlockGenerator() - t.Log("Create 
a ton of instances, and just a few blocks") - - numInstances := 500 - numBlocks := 2 + t.Log("Test a few nodes trying to get one file with a lot of blocks") instances := sg.Instances(numInstances) blocks := bg.Blocks(numBlocks) @@ -110,9 +135,9 @@ func TestSwarm(t *testing.T) { first := instances[0] for _, b := range blocks { - first.blockstore.Put(b) - first.exchange.HasBlock(context.Background(), *b) - rs.Announce(first.peer, b.Key()) + first.Blockstore.Put(b) + first.Exchange.HasBlock(context.Background(), b) + rs.Announce(first.Peer, b.Key()) } t.Log("Distribute!") @@ -133,16 +158,16 @@ func TestSwarm(t *testing.T) { for _, inst := range instances { for _, b := range blocks { - if _, err := inst.blockstore.Get(b.Key()); err != nil { + if _, err := inst.Blockstore.Get(b.Key()); err != nil { t.Fatal(err) } } } } -func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) { - if _, err := bitswap.blockstore.Get(b.Key()); err != nil { - _, err := bitswap.exchange.Block(context.Background(), b.Key()) +func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) { + if _, err := bitswap.Blockstore.Get(b.Key()); err != nil { + _, err := bitswap.Exchange.GetBlock(context.Background(), b.Key()) if err != nil { t.Fatal(err) } @@ -152,149 +177,67 @@ func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGro // TODO simplify this test. get to the _essence_! 
func TestSendToWantingPeer(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + net := tn.VirtualNetwork() rs := mock.VirtualRoutingServer() sg := NewSessionGenerator(net, rs) - bg := NewBlockGenerator() + bg := blocksutil.NewBlockGenerator() me := sg.Next() w := sg.Next() o := sg.Next() - t.Logf("Session %v\n", me.peer) - t.Logf("Session %v\n", w.peer) - t.Logf("Session %v\n", o.peer) + t.Logf("Session %v\n", me.Peer) + t.Logf("Session %v\n", w.Peer) + t.Logf("Session %v\n", o.Peer) alpha := bg.Next() - const timeout = 1 * time.Millisecond // FIXME don't depend on time + const timeout = 100 * time.Millisecond // FIXME don't depend on time - t.Logf("Peer %v attempts to get %v. NB: not available\n", w.peer, alpha.Key()) + t.Logf("Peer %v attempts to get %v. NB: not available\n", w.Peer, alpha.Key()) ctx, _ := context.WithTimeout(context.Background(), timeout) - _, err := w.exchange.Block(ctx, alpha.Key()) + _, err := w.Exchange.GetBlock(ctx, alpha.Key()) if err == nil { t.Fatalf("Expected %v to NOT be available", alpha.Key()) } beta := bg.Next() - t.Logf("Peer %v announes availability of %v\n", w.peer, beta.Key()) + t.Logf("Peer %v announes availability of %v\n", w.Peer, beta.Key()) ctx, _ = context.WithTimeout(context.Background(), timeout) - if err := w.blockstore.Put(&beta); err != nil { + if err := w.Blockstore.Put(beta); err != nil { t.Fatal(err) } - w.exchange.HasBlock(ctx, beta) + w.Exchange.HasBlock(ctx, beta) - t.Logf("%v gets %v from %v and discovers it wants %v\n", me.peer, beta.Key(), w.peer, alpha.Key()) + t.Logf("%v gets %v from %v and discovers it wants %v\n", me.Peer, beta.Key(), w.Peer, alpha.Key()) ctx, _ = context.WithTimeout(context.Background(), timeout) - if _, err := me.exchange.Block(ctx, beta.Key()); err != nil { + if _, err := me.Exchange.GetBlock(ctx, beta.Key()); err != nil { t.Fatal(err) } - t.Logf("%v announces availability of %v\n", o.peer, alpha.Key()) + t.Logf("%v announces availability of %v\n", o.Peer, alpha.Key()) ctx, _ = 
context.WithTimeout(context.Background(), timeout) - if err := o.blockstore.Put(&alpha); err != nil { + if err := o.Blockstore.Put(alpha); err != nil { t.Fatal(err) } - o.exchange.HasBlock(ctx, alpha) + o.Exchange.HasBlock(ctx, alpha) - t.Logf("%v requests %v\n", me.peer, alpha.Key()) + t.Logf("%v requests %v\n", me.Peer, alpha.Key()) ctx, _ = context.WithTimeout(context.Background(), timeout) - if _, err := me.exchange.Block(ctx, alpha.Key()); err != nil { + if _, err := me.Exchange.GetBlock(ctx, alpha.Key()); err != nil { t.Fatal(err) } - t.Logf("%v should now have %v\n", w.peer, alpha.Key()) - block, err := w.blockstore.Get(alpha.Key()) + t.Logf("%v should now have %v\n", w.Peer, alpha.Key()) + block, err := w.Blockstore.Get(alpha.Key()) if err != nil { - t.Fatal("Should not have received an error") + t.Fatalf("Should not have received an error: %s", err) } if block.Key() != alpha.Key() { t.Fatal("Expected to receive alpha from me") } } - -func NewBlockGenerator() BlockGenerator { - return BlockGenerator{} -} - -type BlockGenerator struct { - seq int -} - -func (bg *BlockGenerator) Next() blocks.Block { - bg.seq++ - return *blocks.NewBlock([]byte(string(bg.seq))) -} - -func (bg *BlockGenerator) Blocks(n int) []*blocks.Block { - blocks := make([]*blocks.Block, 0) - for i := 0; i < n; i++ { - b := bg.Next() - blocks = append(blocks, &b) - } - return blocks -} - -func NewSessionGenerator( - net tn.Network, rs mock.RoutingServer) SessionGenerator { - return SessionGenerator{ - net: net, - rs: rs, - seq: 0, - } -} - -type SessionGenerator struct { - seq int - net tn.Network - rs mock.RoutingServer -} - -func (g *SessionGenerator) Next() instance { - g.seq++ - return session(g.net, g.rs, []byte(string(g.seq))) -} - -func (g *SessionGenerator) Instances(n int) []instance { - instances := make([]instance, 0) - for j := 0; j < n; j++ { - inst := g.Next() - instances = append(instances, inst) - } - return instances -} - -type instance struct { - peer peer.Peer - exchange 
exchange.Interface - blockstore bstore.Blockstore -} - -// session creates a test bitswap session. -// -// NB: It's easy make mistakes by providing the same peer ID to two different -// sessions. To safeguard, use the SessionGenerator to generate sessions. It's -// just a much better idea. -func session(net tn.Network, rs mock.RoutingServer, id peer.ID) instance { - p := peer.WithID(id) - - adapter := net.Adapter(p) - htc := rs.Client(p) - - blockstore := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) - const alwaysSendToPeer = true - bs := &bitswap{ - blockstore: blockstore, - notifications: notifications.New(), - strategy: strategy.New(alwaysSendToPeer), - routing: htc, - sender: adapter, - wantlist: util.NewKeySet(), - } - adapter.SetDelegate(bs) - return instance{ - peer: p, - exchange: bs, - blockstore: blockstore, - } -} diff --git a/exchange/bitswap/message/message.go b/exchange/bitswap/message/message.go index e0aea227d..b69450a6f 100644 --- a/exchange/bitswap/message/message.go +++ b/exchange/bitswap/message/message.go @@ -19,7 +19,7 @@ type BitSwapMessage interface { Wantlist() []u.Key // Blocks returns a slice of unique blocks - Blocks() []blocks.Block + Blocks() []*blocks.Block // AddWanted adds the key to the Wantlist. 
// @@ -32,7 +32,7 @@ type BitSwapMessage interface { // implies Priority(A) > Priority(B) AddWanted(u.Key) - AddBlock(blocks.Block) + AddBlock(*blocks.Block) Exportable } @@ -42,14 +42,14 @@ type Exportable interface { } type impl struct { - existsInWantlist map[u.Key]struct{} // map to detect duplicates - wantlist []u.Key // slice to preserve ordering - blocks map[u.Key]blocks.Block // map to detect duplicates + existsInWantlist map[u.Key]struct{} // map to detect duplicates + wantlist []u.Key // slice to preserve ordering + blocks map[u.Key]*blocks.Block // map to detect duplicates } func New() BitSwapMessage { return &impl{ - blocks: make(map[u.Key]blocks.Block), + blocks: make(map[u.Key]*blocks.Block), existsInWantlist: make(map[u.Key]struct{}), wantlist: make([]u.Key, 0), } @@ -62,7 +62,7 @@ func newMessageFromProto(pbm pb.Message) BitSwapMessage { } for _, d := range pbm.GetBlocks() { b := blocks.NewBlock(d) - m.AddBlock(*b) + m.AddBlock(b) } return m } @@ -71,8 +71,8 @@ func (m *impl) Wantlist() []u.Key { return m.wantlist } -func (m *impl) Blocks() []blocks.Block { - bs := make([]blocks.Block, 0) +func (m *impl) Blocks() []*blocks.Block { + bs := make([]*blocks.Block, 0) for _, block := range m.blocks { bs = append(bs, block) } @@ -88,7 +88,7 @@ func (m *impl) AddWanted(k u.Key) { m.wantlist = append(m.wantlist, k) } -func (m *impl) AddBlock(b blocks.Block) { +func (m *impl) AddBlock(b *blocks.Block) { m.blocks[b.Key()] = b } diff --git a/exchange/bitswap/message/message_test.go b/exchange/bitswap/message/message_test.go index 9c69136cd..de64b7925 100644 --- a/exchange/bitswap/message/message_test.go +++ b/exchange/bitswap/message/message_test.go @@ -42,7 +42,7 @@ func TestAppendBlock(t *testing.T) { m := New() for _, str := range strs { block := blocks.NewBlock([]byte(str)) - m.AddBlock(*block) + m.AddBlock(block) } // assert strings are in proto message @@ -133,10 +133,10 @@ func TestToNetFromNetPreservesWantList(t *testing.T) { func 
TestToAndFromNetMessage(t *testing.T) { original := New() - original.AddBlock(*blocks.NewBlock([]byte("W"))) - original.AddBlock(*blocks.NewBlock([]byte("E"))) - original.AddBlock(*blocks.NewBlock([]byte("F"))) - original.AddBlock(*blocks.NewBlock([]byte("M"))) + original.AddBlock(blocks.NewBlock([]byte("W"))) + original.AddBlock(blocks.NewBlock([]byte("E"))) + original.AddBlock(blocks.NewBlock([]byte("F"))) + original.AddBlock(blocks.NewBlock([]byte("M"))) p := peer.WithIDString("X") netmsg, err := original.ToNet(p) @@ -180,8 +180,8 @@ func TestDuplicates(t *testing.T) { t.Fatal("Duplicate in BitSwapMessage") } - msg.AddBlock(*b) - msg.AddBlock(*b) + msg.AddBlock(b) + msg.AddBlock(b) if len(msg.Blocks()) != 1 { t.Fatal("Duplicate in BitSwapMessage") } diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index c94a4859f..f356285ef 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -1,8 +1,6 @@ package network import ( - "errors" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" @@ -50,22 +48,8 @@ func (bsnet *impl) HandleMessage( return nil } - p, bsmsg := bsnet.receiver.ReceiveMessage(ctx, incoming.Peer(), received) - - // TODO(brian): put this in a helper function - if bsmsg == nil || p == nil { - bsnet.receiver.ReceiveError(errors.New("ReceiveMessage returned nil peer or message")) - return nil - } - - outgoing, err := bsmsg.ToNet(p) - if err != nil { - go bsnet.receiver.ReceiveError(err) - return nil - } - - log.Debugf("Message size: %d", len(outgoing.Data())) - return outgoing + bsnet.receiver.ReceiveMessage(ctx, incoming.Peer(), received) + return nil } func (bsnet *impl) DialPeer(ctx context.Context, p peer.Peer) error { diff --git a/exchange/bitswap/notifications/notifications.go b/exchange/bitswap/notifications/notifications.go index 34888d510..4616ac735 100644 --- 
a/exchange/bitswap/notifications/notifications.go +++ b/exchange/bitswap/notifications/notifications.go @@ -2,20 +2,21 @@ package notifications import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - pubsub "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/tuxychandru/pubsub" + pubsub "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/maybebtc/pubsub" blocks "github.com/jbenet/go-ipfs/blocks" u "github.com/jbenet/go-ipfs/util" ) +const bufferSize = 16 + type PubSub interface { - Publish(block blocks.Block) - Subscribe(ctx context.Context, k u.Key) <-chan blocks.Block + Publish(block *blocks.Block) + Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block Shutdown() } func New() PubSub { - const bufferSize = 16 return &impl{*pubsub.New(bufferSize)} } @@ -23,33 +24,58 @@ type impl struct { wrapped pubsub.PubSub } -func (ps *impl) Publish(block blocks.Block) { +func (ps *impl) Publish(block *blocks.Block) { topic := string(block.Key()) ps.wrapped.Pub(block, topic) } -// Subscribe returns a one-time use |blockChannel|. |blockChannel| returns nil -// if the |ctx| times out or is cancelled. Then channel is closed after the -// block given by |k| is sent. -func (ps *impl) Subscribe(ctx context.Context, k u.Key) <-chan blocks.Block { - topic := string(k) - subChan := ps.wrapped.SubOnce(topic) - blockChannel := make(chan blocks.Block, 1) // buffered so the sender doesn't wait on receiver - go func() { - defer close(blockChannel) - select { - case val := <-subChan: - block, ok := val.(blocks.Block) - if ok { - blockChannel <- block - } - case <-ctx.Done(): - ps.wrapped.Unsub(subChan, topic) - } - }() - return blockChannel -} - func (ps *impl) Shutdown() { ps.wrapped.Shutdown() } + +// Subscribe returns a channel of blocks for the given |keys|. |blockChannel| +// is closed if the |ctx| times out or is cancelled, or after sending len(keys) +// blocks. 
+func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block { + + blocksCh := make(chan *blocks.Block, len(keys)) + valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking + if len(keys) == 0 { + close(blocksCh) + return blocksCh + } + ps.wrapped.AddSubOnceEach(valuesCh, toStrings(keys)...) + go func() { + defer close(blocksCh) + defer ps.wrapped.Unsub(valuesCh) // with a len(keys) buffer, this is an optimization + for { + select { + case <-ctx.Done(): + return + case val, ok := <-valuesCh: + if !ok { + return + } + block, ok := val.(*blocks.Block) + if !ok { + return + } + select { + case <-ctx.Done(): + return + case blocksCh <- block: // continue + } + } + } + }() + + return blocksCh +} + +func toStrings(keys []u.Key) []string { + strs := make([]string, 0) + for _, key := range keys { + strs = append(strs, string(key)) + } + return strs +} diff --git a/exchange/bitswap/notifications/notifications_test.go b/exchange/bitswap/notifications/notifications_test.go index 063634f61..3a6ada1ea 100644 --- a/exchange/bitswap/notifications/notifications_test.go +++ b/exchange/bitswap/notifications/notifications_test.go @@ -7,8 +7,35 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" blocks "github.com/jbenet/go-ipfs/blocks" + blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil" + "github.com/jbenet/go-ipfs/util" ) +func TestDuplicates(t *testing.T) { + b1 := blocks.NewBlock([]byte("1")) + b2 := blocks.NewBlock([]byte("2")) + + n := New() + defer n.Shutdown() + ch := n.Subscribe(context.Background(), b1.Key(), b2.Key()) + + n.Publish(b1) + blockRecvd, ok := <-ch + if !ok { + t.Fail() + } + assertBlocksEqual(t, b1, blockRecvd) + + n.Publish(b1) // ignored duplicate + + n.Publish(b2) + blockRecvd, ok = <-ch + if !ok { + t.Fail() + } + assertBlocksEqual(t, b2, blockRecvd) +} + func TestPublishSubscribe(t *testing.T) { blockSent := 
blocks.NewBlock([]byte("Greetings from The Interval")) @@ -16,16 +43,48 @@ func TestPublishSubscribe(t *testing.T) { defer n.Shutdown() ch := n.Subscribe(context.Background(), blockSent.Key()) - n.Publish(*blockSent) + n.Publish(blockSent) blockRecvd, ok := <-ch if !ok { t.Fail() } - assertBlocksEqual(t, blockRecvd, *blockSent) + assertBlocksEqual(t, blockRecvd, blockSent) } +func TestSubscribeMany(t *testing.T) { + e1 := blocks.NewBlock([]byte("1")) + e2 := blocks.NewBlock([]byte("2")) + + n := New() + defer n.Shutdown() + ch := n.Subscribe(context.Background(), e1.Key(), e2.Key()) + + n.Publish(e1) + r1, ok := <-ch + if !ok { + t.Fatal("didn't receive first expected block") + } + assertBlocksEqual(t, e1, r1) + + n.Publish(e2) + r2, ok := <-ch + if !ok { + t.Fatal("didn't receive second expected block") + } + assertBlocksEqual(t, e2, r2) +} + +func TestSubscribeIsANoopWhenCalledWithNoKeys(t *testing.T) { + n := New() + defer n.Shutdown() + ch := n.Subscribe(context.TODO()) // no keys provided + if _, ok := <-ch; ok { + t.Fatal("should be closed if no keys provided") + } +} + func TestCarryOnWhenDeadlineExpires(t *testing.T) { impossibleDeadline := time.Nanosecond @@ -39,18 +98,46 @@ func TestCarryOnWhenDeadlineExpires(t *testing.T) { assertBlockChannelNil(t, blockChannel) } -func assertBlockChannelNil(t *testing.T, blockChannel <-chan blocks.Block) { +func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) { + + g := blocksutil.NewBlockGenerator() + ctx, cancel := context.WithCancel(context.Background()) + n := New() + defer n.Shutdown() + + t.Log("generate a large number of blocks. exceed default buffer") + bs := g.Blocks(1000) + ks := func() []util.Key { + var keys []util.Key + for _, b := range bs { + keys = append(keys, b.Key()) + } + return keys + }() + + _ = n.Subscribe(ctx, ks...) 
// ignore received channel + + t.Log("cancel context before any blocks published") + cancel() + for _, b := range bs { + n.Publish(b) + } + + t.Log("publishing the large number of blocks to the ignored channel must not deadlock") +} + +func assertBlockChannelNil(t *testing.T, blockChannel <-chan *blocks.Block) { _, ok := <-blockChannel if ok { t.Fail() } } -func assertBlocksEqual(t *testing.T, a, b blocks.Block) { +func assertBlocksEqual(t *testing.T, a, b *blocks.Block) { if !bytes.Equal(a.Data, b.Data) { - t.Fail() + t.Fatal("blocks aren't equal") } if a.Key() != b.Key() { - t.Fail() + t.Fatal("block keys aren't equal") } } diff --git a/exchange/bitswap/strategy/interface.go b/exchange/bitswap/strategy/interface.go index ac1f09a1f..503a50d41 100644 --- a/exchange/bitswap/strategy/interface.go +++ b/exchange/bitswap/strategy/interface.go @@ -1,6 +1,8 @@ package strategy import ( + "time" + bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" @@ -29,4 +31,8 @@ type Strategy interface { NumBytesSentTo(peer.Peer) uint64 NumBytesReceivedFrom(peer.Peer) uint64 + + // Values determining bitswap behavioural patterns + GetBatchSize() int + GetRebroadcastDelay() time.Duration } diff --git a/exchange/bitswap/strategy/ledger.go b/exchange/bitswap/strategy/ledger.go index 9f33b1aba..74feb3407 100644 --- a/exchange/bitswap/strategy/ledger.go +++ b/exchange/bitswap/strategy/ledger.go @@ -61,6 +61,7 @@ func (l *ledger) ReceivedBytes(n int) { // TODO: this needs to be different. We need timeouts. 
func (l *ledger) Wants(k u.Key) { + log.Debugf("peer %s wants %s", l.Partner, k) l.wantList[k] = struct{}{} } diff --git a/exchange/bitswap/strategy/strategy.go b/exchange/bitswap/strategy/strategy.go index b778c7a34..fb353d84a 100644 --- a/exchange/bitswap/strategy/strategy.go +++ b/exchange/bitswap/strategy/strategy.go @@ -3,12 +3,15 @@ package strategy import ( "errors" "sync" + "time" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" ) +var log = u.Logger("strategy") + // TODO niceness should be on a per-peer basis. Use-case: Certain peers are // "trusted" and/or controlled by a single human user. The user may want for // these peers to exchange data freely @@ -72,6 +75,8 @@ func (s *strategist) Seed(int64) { // TODO } +// MessageReceived performs book-keeping. Returns error if passed invalid +// arguments. func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error { s.lock.Lock() defer s.lock.Unlock() @@ -91,7 +96,7 @@ func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error // FIXME extract blocks.NumBytes(block) or block.NumBytes() method l.ReceivedBytes(len(block.Data)) } - return errors.New("TODO") + return nil } // TODO add contents of m.WantList() to my local wantlist? 
NB: could introduce @@ -137,3 +142,11 @@ func (s *strategist) ledger(p peer.Peer) *ledger { } return l } + +func (s *strategist) GetBatchSize() int { + return 10 +} + +func (s *strategist) GetRebroadcastDelay() time.Duration { + return time.Second * 5 +} diff --git a/exchange/bitswap/strategy/strategy_test.go b/exchange/bitswap/strategy/strategy_test.go index ef93d9827..d07af601b 100644 --- a/exchange/bitswap/strategy/strategy_test.go +++ b/exchange/bitswap/strategy/strategy_test.go @@ -30,7 +30,7 @@ func TestConsistentAccounting(t *testing.T) { m := message.New() content := []string{"this", "is", "message", "i"} - m.AddBlock(*blocks.NewBlock([]byte(strings.Join(content, " ")))) + m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " ")))) sender.MessageSent(receiver.Peer, m) receiver.MessageReceived(sender.Peer, m) diff --git a/exchange/bitswap/testnet/network_test.go b/exchange/bitswap/testnet/network_test.go index 3930c2a8c..6f57aad50 100644 --- a/exchange/bitswap/testnet/network_test.go +++ b/exchange/bitswap/testnet/network_test.go @@ -33,7 +33,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) { // TODO test contents of incoming message m := bsmsg.New() - m.AddBlock(*blocks.NewBlock([]byte(expectedStr))) + m.AddBlock(blocks.NewBlock([]byte(expectedStr))) return from, m })) @@ -41,7 +41,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) { t.Log("Build a message and send a synchronous request to recipient") message := bsmsg.New() - message.AddBlock(*blocks.NewBlock([]byte("data"))) + message.AddBlock(blocks.NewBlock([]byte("data"))) response, err := initiator.SendRequest( context.Background(), peer.WithID(idOfRecipient), message) if err != nil { @@ -77,7 +77,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { peer.Peer, bsmsg.BitSwapMessage) { msgToWaiter := bsmsg.New() - msgToWaiter.AddBlock(*blocks.NewBlock([]byte(expectedStr))) + msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr))) return fromWaiter, msgToWaiter })) @@ 
-105,7 +105,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { })) messageSentAsync := bsmsg.New() - messageSentAsync.AddBlock(*blocks.NewBlock([]byte("data"))) + messageSentAsync.AddBlock(blocks.NewBlock([]byte("data"))) errSending := waiter.SendMessage( context.Background(), peer.WithID(idOfResponder), messageSentAsync) if errSending != nil { diff --git a/exchange/bitswap/testutils.go b/exchange/bitswap/testutils.go new file mode 100644 index 000000000..402a5b1d2 --- /dev/null +++ b/exchange/bitswap/testutils.go @@ -0,0 +1,71 @@ +package bitswap + +import ( + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" + "github.com/jbenet/go-ipfs/blocks/blockstore" + "github.com/jbenet/go-ipfs/exchange" + tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" + "github.com/jbenet/go-ipfs/peer" + "github.com/jbenet/go-ipfs/routing/mock" +) + +func NewSessionGenerator( + net tn.Network, rs mock.RoutingServer) SessionGenerator { + return SessionGenerator{ + net: net, + rs: rs, + seq: 0, + } +} + +type SessionGenerator struct { + seq int + net tn.Network + rs mock.RoutingServer +} + +func (g *SessionGenerator) Next() Instance { + g.seq++ + return session(g.net, g.rs, []byte(string(g.seq))) +} + +func (g *SessionGenerator) Instances(n int) []Instance { + instances := make([]Instance, 0) + for j := 0; j < n; j++ { + inst := g.Next() + instances = append(instances, inst) + } + return instances +} + +type Instance struct { + Peer peer.Peer + Exchange exchange.Interface + Blockstore blockstore.Blockstore +} + +// session creates a test bitswap session. +// +// NB: It's easy make mistakes by providing the same peer ID to two different +// sessions. To safeguard, use the SessionGenerator to generate sessions. It's +// just a much better idea. 
+func session(net tn.Network, rs mock.RoutingServer, id peer.ID) Instance { + p := peer.WithID(id) + + adapter := net.Adapter(p) + htc := rs.Client(p) + bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) + + const alwaysSendToPeer = true + ctx := context.TODO() + + bs := New(ctx, p, adapter, htc, bstore, alwaysSendToPeer) + + return Instance{ + Peer: p, + Exchange: bs, + Blockstore: bstore, + } +} diff --git a/exchange/interface.go b/exchange/interface.go index 82782a046..aa2e2431c 100644 --- a/exchange/interface.go +++ b/exchange/interface.go @@ -2,6 +2,8 @@ package exchange import ( + "io" + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" blocks "github.com/jbenet/go-ipfs/blocks" @@ -11,11 +13,14 @@ import ( // Any type that implements exchange.Interface may be used as an IPFS block // exchange protocol. type Interface interface { + // GetBlock returns the block associated with a given key. + GetBlock(context.Context, u.Key) (*blocks.Block, error) - // Block returns the block associated with a given key. - Block(context.Context, u.Key) (*blocks.Block, error) + GetBlocks(context.Context, []u.Key) (<-chan *blocks.Block, error) // TODO Should callers be concerned with whether the block was made // available on the network? - HasBlock(context.Context, blocks.Block) error + HasBlock(context.Context, *blocks.Block) error + + io.Closer } diff --git a/exchange/offline/offline.go b/exchange/offline/offline.go index 5f7ef8835..24a89e038 100644 --- a/exchange/offline/offline.go +++ b/exchange/offline/offline.go @@ -14,23 +14,31 @@ import ( var OfflineMode = errors.New("Block unavailable. Operating in offline mode") -func NewOfflineExchange() exchange.Interface { +func Exchange() exchange.Interface { return &offlineExchange{} } // offlineExchange implements the Exchange interface but doesn't return blocks. // For use in offline mode. 
-type offlineExchange struct { -} +type offlineExchange struct{} -// Block returns nil to signal that a block could not be retrieved for the +// GetBlock returns nil to signal that a block could not be retrieved for the // given key. // NB: This function may return before the timeout expires. -func (_ *offlineExchange) Block(context.Context, u.Key) (*blocks.Block, error) { +func (_ *offlineExchange) GetBlock(context.Context, u.Key) (*blocks.Block, error) { return nil, OfflineMode } // HasBlock always returns nil. -func (_ *offlineExchange) HasBlock(context.Context, blocks.Block) error { +func (_ *offlineExchange) HasBlock(context.Context, *blocks.Block) error { return nil } + +// Close always returns nil. +func (_ *offlineExchange) Close() error { + return nil +} + +func (_ *offlineExchange) GetBlocks(context.Context, []u.Key) (<-chan *blocks.Block, error) { + return nil, OfflineMode +} diff --git a/exchange/offline/offline_test.go b/exchange/offline/offline_test.go index cc3f3ec82..ac02d2101 100644 --- a/exchange/offline/offline_test.go +++ b/exchange/offline/offline_test.go @@ -10,8 +10,8 @@ import ( ) func TestBlockReturnsErr(t *testing.T) { - off := NewOfflineExchange() - _, err := off.Block(context.Background(), u.Key("foo")) + off := Exchange() + _, err := off.GetBlock(context.Background(), u.Key("foo")) if err != nil { return // as desired } @@ -19,9 +19,9 @@ func TestBlockReturnsErr(t *testing.T) { } func TestHasBlockReturnsNil(t *testing.T) { - off := NewOfflineExchange() + off := Exchange() block := blocks.NewBlock([]byte("data")) - err := off.HasBlock(context.Background(), *block) + err := off.HasBlock(context.Background(), block) if err != nil { t.Fatal("") } diff --git a/importer/importer_test.go b/importer/importer_test.go index e8177aa91..21af40670 100644 --- a/importer/importer_test.go +++ b/importer/importer_test.go @@ -69,6 +69,7 @@ func testFileConsistency(t *testing.T, bs chunk.BlockSplitter, nbytes int) { if err != nil { t.Fatal(err) } + r, err 
:= uio.NewDagReader(nd, nil) if err != nil { t.Fatal(err) diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index 1874e5304..fbb07c9ee 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -22,6 +22,22 @@ var ErrNotFound = fmt.Errorf("merkledag: not found") // so have to convert Multihash bytes to string (u.Key) type NodeMap map[u.Key]*Node +// DAGService is an IPFS Merkle DAG service. +type DAGService interface { + Add(*Node) (u.Key, error) + AddRecursive(*Node) error + Get(u.Key) (*Node, error) + Remove(*Node) error + + // GetDAG returns, in order, all the single leve child + // nodes of the passed in node. + GetDAG(context.Context, *Node) <-chan *Node +} + +func NewDAGService(bs *bserv.BlockService) DAGService { + return &dagService{bs} +} + // Node represents a node in the IPFS Merkle DAG. // nodes have opaque data and a set of navigable links. type Node struct { @@ -156,18 +172,6 @@ func (n *Node) Key() (u.Key, error) { return u.Key(h), err } -// DAGService is an IPFS Merkle DAG service. -type DAGService interface { - Add(*Node) (u.Key, error) - AddRecursive(*Node) error - Get(u.Key) (*Node, error) - Remove(*Node) error -} - -func NewDAGService(bs *bserv.BlockService) DAGService { - return &dagService{bs} -} - // dagService is an IPFS Merkle DAG service. 
// - the root is virtual (like a forest) // - stores nodes' data in a BlockService @@ -252,6 +256,7 @@ func (n *dagService) Remove(nd *Node) error { // FetchGraph asynchronously fetches all nodes that are children of the given // node, and returns a channel that may be waited upon for the fetch to complete func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} { + log.Warning("Untested.") var wg sync.WaitGroup done := make(chan struct{}) @@ -284,3 +289,64 @@ func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} return done } + +// FindLink searches this node's links for one to the given key and +// returns the index of said link +func FindLink(n *Node, k u.Key, found []*Node) (int, error) { + for i, lnk := range n.Links { + if u.Key(lnk.Hash) == k && found[i] == nil { + return i, nil + } + } + return -1, u.ErrNotFound +} + +// GetDAG will fill out all of the links of the given Node. +// It returns a channel of nodes, which the caller can receive +// all the child nodes of 'root' on, in proper order. +func (ds *dagService) GetDAG(ctx context.Context, root *Node) <-chan *Node { + sig := make(chan *Node) + go func() { + var keys []u.Key + nodes := make([]*Node, len(root.Links)) + + for _, lnk := range root.Links { + keys = append(keys, u.Key(lnk.Hash)) + } + + blkchan := ds.Blocks.GetBlocks(ctx, keys) + + next := 0 + for blk := range blkchan { + i, err := FindLink(root, blk.Key(), nodes) + if err != nil { + // NB: can only occur as a result of programmer error + panic("Received block that wasnt in this nodes links!") + } + + nd, err := Decoded(blk.Data) + if err != nil { + // NB: can occur in normal situations, with improperly formatted + // input data + log.Error("Got back bad block!") + break + } + nodes[i] = nd + + if next == i { + sig <- nd + next++ + for ; next < len(nodes) && nodes[next] != nil; next++ { + sig <- nodes[next] + } + } + } + if next < len(nodes) { + // TODO: bubble errors back up. 
+ log.Errorf("Did not receive correct number of nodes!") + } + close(sig) + }() + + return sig +} diff --git a/merkledag/merkledag_test.go b/merkledag/merkledag_test.go index 2db166beb..0f628e6c1 100644 --- a/merkledag/merkledag_test.go +++ b/merkledag/merkledag_test.go @@ -1,9 +1,20 @@ -package merkledag +package merkledag_test import ( + "bytes" "fmt" + "io" + "io/ioutil" "testing" + bserv "github.com/jbenet/go-ipfs/blockservice" + bs "github.com/jbenet/go-ipfs/exchange/bitswap" + tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" + imp "github.com/jbenet/go-ipfs/importer" + chunk "github.com/jbenet/go-ipfs/importer/chunk" + . "github.com/jbenet/go-ipfs/merkledag" + "github.com/jbenet/go-ipfs/routing/mock" + uio "github.com/jbenet/go-ipfs/unixfs/io" u "github.com/jbenet/go-ipfs/util" ) @@ -56,3 +67,84 @@ func TestNode(t *testing.T) { printn("boop", n2) printn("beep boop", n3) } + +func makeTestDag(t *testing.T) *Node { + read := io.LimitReader(u.NewTimeSeededRand(), 1024*32) + spl := &chunk.SizeSplitter{512} + root, err := imp.NewDagFromReaderWithSplitter(read, spl) + if err != nil { + t.Fatal(err) + } + return root +} + +func TestBatchFetch(t *testing.T) { + net := tn.VirtualNetwork() + rs := mock.VirtualRoutingServer() + sg := bs.NewSessionGenerator(net, rs) + + instances := sg.Instances(5) + + var servs []*bserv.BlockService + var dagservs []DAGService + for _, i := range instances { + bsi, err := bserv.New(i.Blockstore, i.Exchange) + if err != nil { + t.Fatal(err) + } + servs = append(servs, bsi) + dagservs = append(dagservs, NewDAGService(bsi)) + } + t.Log("finished setup.") + + root := makeTestDag(t) + read, err := uio.NewDagReader(root, nil) + if err != nil { + t.Fatal(err) + } + expected, err := ioutil.ReadAll(read) + if err != nil { + t.Fatal(err) + } + + err = dagservs[0].AddRecursive(root) + if err != nil { + t.Fatal(err) + } + + t.Log("Added file to first node.") + + k, err := root.Key() + if err != nil { + t.Fatal(err) + } + + done := make(chan 
struct{}) + for i := 1; i < len(dagservs); i++ { + go func(i int) { + first, err := dagservs[i].Get(k) + if err != nil { + t.Fatal(err) + } + fmt.Println("Got first node back.") + + read, err := uio.NewDagReader(first, dagservs[i]) + if err != nil { + t.Fatal(err) + } + datagot, err := ioutil.ReadAll(read) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(datagot, expected) { + t.Fatal("Got bad data back!") + } + done <- struct{}{} + }(i) + } + + for i := 1; i < len(dagservs); i++ { + <-done + } +} diff --git a/net/interface.go b/net/interface.go index 3c0e8d204..60e650c2f 100644 --- a/net/interface.go +++ b/net/interface.go @@ -42,6 +42,10 @@ type Network interface { // the network since it was instantiated GetBandwidthTotals() (uint64, uint64) + // GetMessageCounts returns the total number of messages passed through + // the network since it was instantiated + GetMessageCounts() (uint64, uint64) + // SendMessage sends given Message out SendMessage(msg.NetMessage) error diff --git a/net/mux/mux.go b/net/mux/mux.go index 1b4c06344..d971e9054 100644 --- a/net/mux/mux.go +++ b/net/mux/mux.go @@ -45,9 +45,11 @@ type Muxer struct { bwiLock sync.Mutex bwIn uint64 + msgIn uint64 bwoLock sync.Mutex bwOut uint64 + msgOut uint64 *msg.Pipe ctxc.ContextCloser @@ -76,6 +78,18 @@ func (m *Muxer) GetPipe() *msg.Pipe { return m.Pipe } +// GetMessageCounts return the in/out message count measured over this muxer. +func (m *Muxer) GetMessageCounts() (in uint64, out uint64) { + m.bwiLock.Lock() + in = m.msgIn + m.bwiLock.Unlock() + + m.bwoLock.Lock() + out = m.msgOut + m.bwoLock.Unlock() + return +} + // GetBandwidthTotals return the in/out bandwidth measured over this muxer. 
func (m *Muxer) GetBandwidthTotals() (in uint64, out uint64) { m.bwiLock.Lock() @@ -125,6 +139,7 @@ func (m *Muxer) handleIncomingMessage(m1 msg.NetMessage) { m.bwiLock.Lock() // TODO: compensate for overhead m.bwIn += uint64(len(m1.Data())) + m.msgIn++ m.bwiLock.Unlock() data, pid, err := unwrapData(m1.Data()) @@ -182,6 +197,7 @@ func (m *Muxer) handleOutgoingMessage(pid pb.ProtocolID, m1 msg.NetMessage) { // TODO: compensate for overhead // TODO(jbenet): switch this to a goroutine to prevent sync waiting. m.bwOut += uint64(len(data)) + m.msgOut++ m.bwoLock.Unlock() m2 := msg.New(m1.Peer(), data) diff --git a/net/net.go b/net/net.go index 0ee1e76b5..3778d839a 100644 --- a/net/net.go +++ b/net/net.go @@ -110,6 +110,11 @@ func (n *IpfsNetwork) GetBandwidthTotals() (in uint64, out uint64) { return n.muxer.GetBandwidthTotals() } +// GetMessageCounts returns the total number of messages transferred +func (n *IpfsNetwork) GetMessageCounts() (in uint64, out uint64) { + return n.muxer.GetMessageCounts() +} + // ListenAddresses returns a list of addresses at which this network listens. func (n *IpfsNetwork) ListenAddresses() []ma.Multiaddr { return n.swarm.ListenAddresses() diff --git a/net/swarm/conn.go b/net/swarm/conn.go index f95334d0d..ab6035917 100644 --- a/net/swarm/conn.go +++ b/net/swarm/conn.go @@ -182,7 +182,7 @@ func (s *Swarm) fanOut() { return } if len(msg.Data()) >= conn.MaxMessageSize { - log.Critical("Attempted to send message bigger than max size.") + log.Criticalf("Attempted to send message bigger than max size. 
(%d)", len(msg.Data())) } s.connsLock.RLock() diff --git a/pin/pin_test.go b/pin/pin_test.go index 1ea302823..fc9dc215d 100644 --- a/pin/pin_test.go +++ b/pin/pin_test.go @@ -4,7 +4,10 @@ import ( "testing" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" + "github.com/jbenet/go-ipfs/blocks/blockstore" bs "github.com/jbenet/go-ipfs/blockservice" + "github.com/jbenet/go-ipfs/exchange/offline" mdag "github.com/jbenet/go-ipfs/merkledag" "github.com/jbenet/go-ipfs/util" ) @@ -19,13 +22,15 @@ func randNode() (*mdag.Node, util.Key) { func TestPinnerBasic(t *testing.T) { dstore := ds.NewMapDatastore() - bserv, err := bs.NewBlockService(dstore, nil) + bstore := blockstore.NewBlockstore(dssync.MutexWrap(dstore)) + bserv, err := bs.New(bstore, offline.Exchange()) if err != nil { t.Fatal(err) } dserv := mdag.NewDAGService(bserv) + // TODO does pinner need to share datastore with blockservice? 
p := NewPinner(dstore, dserv) a, ak := randNode() diff --git a/routing/dht/routing.go b/routing/dht/routing.go index fedf281d3..f0bfbe485 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -126,6 +126,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { } func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan peer.Peer { + log.Event(ctx, "findProviders", &key) peerOut := make(chan peer.Peer, count) go func() { ps := newPeerSet() diff --git a/routing/mock/routing.go b/routing/mock/routing.go index 358d57901..ff83ddca3 100644 --- a/routing/mock/routing.go +++ b/routing/mock/routing.go @@ -59,7 +59,7 @@ func (mr *MockRouter) FindProviders(ctx context.Context, key u.Key) ([]peer.Peer } func (mr *MockRouter) FindPeer(ctx context.Context, pid peer.ID) (peer.Peer, error) { - log.Debug("FindPeer: %s", pid) + log.Debugf("FindPeer: %s", pid) return nil, nil } diff --git a/unixfs/io/dagmodifier_test.go b/unixfs/io/dagmodifier_test.go index 822c87471..d0aa83795 100644 --- a/unixfs/io/dagmodifier_test.go +++ b/unixfs/io/dagmodifier_test.go @@ -6,7 +6,10 @@ import ( "io/ioutil" "testing" + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" + "github.com/jbenet/go-ipfs/blocks/blockstore" bs "github.com/jbenet/go-ipfs/blockservice" + "github.com/jbenet/go-ipfs/exchange/offline" imp "github.com/jbenet/go-ipfs/importer" "github.com/jbenet/go-ipfs/importer/chunk" mdag "github.com/jbenet/go-ipfs/merkledag" @@ -19,7 +22,9 @@ import ( func getMockDagServ(t *testing.T) mdag.DAGService { dstore := ds.NewMapDatastore() - bserv, err := bs.NewBlockService(dstore, nil) + tsds := sync.MutexWrap(dstore) + bstore := blockstore.NewBlockstore(tsds) + bserv, err := bs.New(bstore, offline.Exchange()) if err != nil { t.Fatal(err) } diff --git a/unixfs/io/dagreader.go b/unixfs/io/dagreader.go index ea33c3540..f4290dd4b 100644 --- a/unixfs/io/dagreader.go +++ b/unixfs/io/dagreader.go @@ -5,6 +5,8 
@@ import ( "errors" "io" + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" mdag "github.com/jbenet/go-ipfs/merkledag" ft "github.com/jbenet/go-ipfs/unixfs" @@ -15,10 +17,11 @@ var ErrIsDir = errors.New("this dag node is a directory") // DagReader provides a way to easily read the data contained in a dag. type DagReader struct { - serv mdag.DAGService - node *mdag.Node - position int - buf io.Reader + serv mdag.DAGService + node *mdag.Node + buf io.Reader + fetchChan <-chan *mdag.Node + linkPosition int } // NewDagReader creates a new reader object that reads the data represented by the given @@ -35,10 +38,15 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) { // Dont allow reading directories return nil, ErrIsDir case ftpb.Data_File: + var fetchChan <-chan *mdag.Node + if serv != nil { + fetchChan = serv.GetDAG(context.TODO(), n) + } return &DagReader{ - node: n, - serv: serv, - buf: bytes.NewBuffer(pb.GetData()), + node: n, + serv: serv, + buf: bytes.NewBuffer(pb.GetData()), + fetchChan: fetchChan, }, nil case ftpb.Data_Raw: // Raw block will just be a single level, return a byte buffer @@ -51,19 +59,43 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) { // precalcNextBuf follows the next link in line and loads it from the DAGService, // setting the next buffer to read from func (dr *DagReader) precalcNextBuf() error { - if dr.position >= len(dr.node.Links) { - return io.EOF - } - nxt, err := dr.node.Links[dr.position].GetNode(dr.serv) - if err != nil { - return err + var nxt *mdag.Node + var ok bool + + // TODO: require non-nil dagservice, use offline bitswap exchange + if dr.serv == nil { + // Only used when fetchChan is nil, + // which only happens when passed in a nil dagservice + // TODO: this logic is hard to follow, do it better. 
+ // NOTE: the only time this code is used, is during the + // importer tests, consider just changing those tests + log.Warning("Running DAGReader with nil DAGService!") + if dr.linkPosition >= len(dr.node.Links) { + return io.EOF + } + nxt = dr.node.Links[dr.linkPosition].Node + if nxt == nil { + return errors.New("Got nil node back from link! and no DAGService!") + } + dr.linkPosition++ + + } else { + if dr.fetchChan == nil { + panic("this is wrong.") + } + select { + case nxt, ok = <-dr.fetchChan: + if !ok { + return io.EOF + } + } } + pb := new(ftpb.Data) - err = proto.Unmarshal(nxt.Data, pb) + err := proto.Unmarshal(nxt.Data, pb) if err != nil { return err } - dr.position++ switch pb.GetType() { case ftpb.Data_Directory: diff --git a/util/key.go b/util/key.go index 7cbf09518..eca1255b5 100644 --- a/util/key.go +++ b/util/key.go @@ -63,6 +63,12 @@ func (k *Key) MarshalJSON() ([]byte, error) { return json.Marshal(b58.Encode([]byte(*k))) } +func (k *Key) Loggable() map[string]interface{} { + return map[string]interface{}{ + "key": k.String(), + } +} + // KeyFromDsKey returns a Datastore key func KeyFromDsKey(dsk ds.Key) Key { return Key(dsk.BaseNamespace()) diff --git a/util/testutil/gen.go b/util/testutil/gen.go index be2fe5988..c7f310c2a 100644 --- a/util/testutil/gen.go +++ b/util/testutil/gen.go @@ -4,9 +4,12 @@ import ( crand "crypto/rand" "testing" + dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" + "github.com/jbenet/go-ipfs/exchange/offline" "github.com/jbenet/go-ipfs/peer" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + "github.com/jbenet/go-ipfs/blocks/blockstore" bsrv "github.com/jbenet/go-ipfs/blockservice" dag "github.com/jbenet/go-ipfs/merkledag" u "github.com/jbenet/go-ipfs/util" @@ -14,7 +17,8 @@ import ( func GetDAGServ(t testing.TB) dag.DAGService { dstore := ds.NewMapDatastore() - bserv, err := bsrv.NewBlockService(dstore, nil) + tsds := dssync.MutexWrap(dstore) 
+ bserv, err := bsrv.New(blockstore.NewBlockstore(tsds), offline.Exchange()) if err != nil { t.Fatal(err) }