diff --git a/epictest/addcat_test.go b/epictest/addcat_test.go index e34b37295..ab40552df 100644 --- a/epictest/addcat_test.go +++ b/epictest/addcat_test.go @@ -10,19 +10,27 @@ import ( "time" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-random" + datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" + random "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-random" + blockstore "github.com/jbenet/go-ipfs/blocks/blockstore" blockservice "github.com/jbenet/go-ipfs/blockservice" + exchange "github.com/jbenet/go-ipfs/exchange" bitswap "github.com/jbenet/go-ipfs/exchange/bitswap" - tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" + bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network" importer "github.com/jbenet/go-ipfs/importer" chunk "github.com/jbenet/go-ipfs/importer/chunk" merkledag "github.com/jbenet/go-ipfs/merkledag" + net "github.com/jbenet/go-ipfs/net" mocknet "github.com/jbenet/go-ipfs/net/mock" path "github.com/jbenet/go-ipfs/path" - mockrouting "github.com/jbenet/go-ipfs/routing/mock" + peer "github.com/jbenet/go-ipfs/peer" + dht "github.com/jbenet/go-ipfs/routing/dht" uio "github.com/jbenet/go-ipfs/unixfs/io" util "github.com/jbenet/go-ipfs/util" + "github.com/jbenet/go-ipfs/util/datastore2" errors "github.com/jbenet/go-ipfs/util/debugerror" + delay "github.com/jbenet/go-ipfs/util/delay" ) const kSeed = 1 @@ -34,7 +42,7 @@ func Test1KBInstantaneous(t *testing.T) { BlockstoreLatency: 0, } - if err := AddCatBytes(RandomBytes(1*KB), conf); err != nil { + if err := AddCatBytes(RandomBytes(100*MB), conf); err != nil { t.Fatal(err) } } @@ -88,55 +96,83 @@ func RandomBytes(n int64) []byte { return data.Bytes() } +type instance struct { + ID peer.ID + Network net.Network + Blockstore blockstore.Blockstore + Datastore datastore.ThreadSafeDatastore + DHT *dht.IpfsDHT + Exchange exchange.Interface + BitSwapNetwork bsnet.BitSwapNetwork + + datastoreDelay delay.D +} + func AddCatBytes(data []byte, conf Config) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - mn := mocknet.New(ctx) - // defer mn.Close() FIXME does mocknet require clean-up - mn.SetLinkDefaults(mocknet.LinkOptions{ - Latency: conf.NetworkLatency, - // TODO add to conf. This is tricky because we want 0 values to be functional. - Bandwidth: math.MaxInt32, - }) - dhtNetwork := mockrouting.NewDHTNetwork(mn) - net, err := tn.StreamNet(ctx, mn, dhtNetwork) + const numPeers = 2 + instances := make(map[peer.ID]*instance, numPeers) + + // create network + mn, err := mocknet.FullMeshLinked(ctx, numPeers) if err != nil { return errors.Wrap(err) } - sessionGenerator := bitswap.NewSessionGenerator(net) - defer sessionGenerator.Close() - - adder := sessionGenerator.Next() - catter := sessionGenerator.Next() - // catter.Routing.Update(context.TODO(), adder.Peer) - - peers := mn.Peers() - if len(peers) != 2 { - return errors.New("peers not in network") - } - - for _, i := range peers { - for _, j := range peers { - if i == j { - continue - } - if _, err := mn.LinkPeers(i, j); err != nil { - return err - } - if err := mn.ConnectPeers(i, j); err != nil { - return err - } + mn.SetLinkDefaults(mocknet.LinkOptions{ + Latency: conf.NetworkLatency, + // TODO add to conf. This is tricky because we want 0 values to be functional. + Bandwidth: math.MaxInt32, + }) + for _, p := range mn.Peers() { + instances[p] = &instance{ + ID: p, + Network: mn.Net(p), } } - catter.SetBlockstoreLatency(conf.BlockstoreLatency) + // create dht network + for _, p := range mn.Peers() { + dsDelay := delay.Fixed(conf.BlockstoreLatency) + instances[p].Datastore = sync.MutexWrap(datastore2.WithDelay(datastore.NewMapDatastore(), dsDelay)) + instances[p].datastoreDelay = dsDelay + } + for _, p := range mn.Peers() { + instances[p].DHT = dht.NewDHT(ctx, p, instances[p].Network, instances[p].Datastore) + } + // create two bitswap network clients + for _, p := range mn.Peers() { + instances[p].BitSwapNetwork = bsnet.NewFromIpfsNetwork(instances[p].Network, instances[p].DHT) + } + for _, p := range mn.Peers() { + const kWriteCacheElems = 100 + const alwaysSendToPeer = true + adapter := instances[p].BitSwapNetwork + dstore := instances[p].Datastore + instances[p].Blockstore, err = blockstore.WriteCached(blockstore.NewBlockstore(dstore), kWriteCacheElems) + if err != nil { + return err + } + instances[p].Exchange = bitswap.New(ctx, p, adapter, instances[p].Blockstore, alwaysSendToPeer) + } + var peers []peer.ID + for _, p := range mn.Peers() { + peers = append(peers, p) + } - adder.SetBlockstoreLatency(0) // disable blockstore latency during add operation + adder := instances[peers[0]] + catter := instances[peers[1]] + + // bootstrap the DHTs + adder.DHT.Connect(ctx, catter.ID) + catter.DHT.Connect(ctx, adder.ID) + + adder.datastoreDelay.Set(0) // disable blockstore latency during add operation keyAdded, err := add(adder, bytes.NewReader(data)) if err != nil { return err } - adder.SetBlockstoreLatency(conf.BlockstoreLatency) // add some blockstore delay to make the catter wait + adder.datastoreDelay.Set(conf.BlockstoreLatency) // add some blockstore delay to make the catter wait readerCatted, err := cat(catter, keyAdded) if err != nil { @@ -152,8 +188,8 @@ func AddCatBytes(data []byte, conf Config) error { return nil } -func cat(catter bitswap.Instance, k util.Key) (io.Reader, error) { - catterdag := merkledag.NewDAGService(&blockservice.BlockService{catter.Blockstore(), catter.Exchange}) +func cat(catter *instance, k util.Key) (io.Reader, error) { + catterdag := merkledag.NewDAGService(&blockservice.BlockService{catter.Blockstore, catter.Exchange}) nodeCatted, err := (&path.Resolver{catterdag}).ResolvePath(k.String()) if err != nil { return nil, err @@ -161,10 +197,10 @@ func cat(catter bitswap.Instance, k util.Key) (io.Reader, error) { return uio.NewDagReader(nodeCatted, catterdag) } -func add(adder bitswap.Instance, r io.Reader) (util.Key, error) { +func add(adder *instance, r io.Reader) (util.Key, error) { nodeAdded, err := importer.BuildDagFromReader( r, - merkledag.NewDAGService(&blockservice.BlockService{adder.Blockstore(), adder.Exchange}), + merkledag.NewDAGService(&blockservice.BlockService{adder.Blockstore, adder.Exchange}), nil, chunk.DefaultSplitter, )