mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-01 10:49:24 +08:00
test(bitswap) test with swarm of ~500 instances
test(bitswap) run synchronously to aid the scheduler
This commit is contained in:
@ -2,12 +2,14 @@ package bitswap
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||||
|
|
||||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
|
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
|
||||||
|
"github.com/jbenet/go-ipfs/blocks"
|
||||||
bstore "github.com/jbenet/go-ipfs/blockstore"
|
bstore "github.com/jbenet/go-ipfs/blockstore"
|
||||||
exchange "github.com/jbenet/go-ipfs/exchange"
|
exchange "github.com/jbenet/go-ipfs/exchange"
|
||||||
notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
|
notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
|
||||||
@ -85,6 +87,64 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSwarm(t *testing.T) {
|
||||||
|
net := tn.VirtualNetwork()
|
||||||
|
rs := tn.VirtualRoutingServer()
|
||||||
|
sg := NewSessionGenerator(net, rs)
|
||||||
|
bg := NewBlockGenerator(t)
|
||||||
|
|
||||||
|
t.Log("Create a ton of instances, and just a few blocks")
|
||||||
|
|
||||||
|
numInstances := 500
|
||||||
|
numBlocks := 2
|
||||||
|
|
||||||
|
instances := sg.Instances(numInstances)
|
||||||
|
blocks := bg.Blocks(numBlocks)
|
||||||
|
|
||||||
|
t.Log("Give the blocks to the first instance")
|
||||||
|
|
||||||
|
first := instances[0]
|
||||||
|
for _, b := range blocks {
|
||||||
|
first.blockstore.Put(*b)
|
||||||
|
first.exchange.HasBlock(context.Background(), *b)
|
||||||
|
rs.Announce(first.peer, b.Key())
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Log("Distribute!")
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
for _, inst := range instances {
|
||||||
|
for _, b := range blocks {
|
||||||
|
wg.Add(1)
|
||||||
|
// NB: executing getOrFail concurrently puts tremendous pressure on
|
||||||
|
// the goroutine scheduler
|
||||||
|
getOrFail(inst, b, t, &wg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
t.Log("Verify!")
|
||||||
|
|
||||||
|
for _, inst := range instances {
|
||||||
|
for _, b := range blocks {
|
||||||
|
if _, err := inst.blockstore.Get(b.Key()); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
|
||||||
|
if _, err := bitswap.blockstore.Get(b.Key()); err != nil {
|
||||||
|
_, err := bitswap.exchange.Block(context.Background(), b.Key())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}
|
||||||
|
|
||||||
func TestSendToWantingPeer(t *testing.T) {
|
func TestSendToWantingPeer(t *testing.T) {
|
||||||
t.Log("I get a file from peer |w|. In this message, I receive |w|'s wants")
|
t.Log("I get a file from peer |w|. In this message, I receive |w|'s wants")
|
||||||
t.Log("Peer |w| tells me it wants file |f|, but I don't have it")
|
t.Log("Peer |w| tells me it wants file |f|, but I don't have it")
|
||||||
@ -92,6 +152,31 @@ func TestSendToWantingPeer(t *testing.T) {
|
|||||||
t.Log("After receiving |f| from |o|, I send it to the wanting peer |w|")
|
t.Log("After receiving |f| from |o|, I send it to the wanting peer |w|")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewBlockGenerator(t *testing.T) BlockGenerator {
|
||||||
|
return BlockGenerator{
|
||||||
|
T: t,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type BlockGenerator struct {
|
||||||
|
*testing.T // b/c block generation can fail
|
||||||
|
seq int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bg *BlockGenerator) Next() blocks.Block {
|
||||||
|
bg.seq++
|
||||||
|
return testutil.NewBlockOrFail(bg.T, string(bg.seq))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bg *BlockGenerator) Blocks(n int) []*blocks.Block {
|
||||||
|
blocks := make([]*blocks.Block, 0)
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
b := bg.Next()
|
||||||
|
blocks = append(blocks, &b)
|
||||||
|
}
|
||||||
|
return blocks
|
||||||
|
}
|
||||||
|
|
||||||
func NewSessionGenerator(
|
func NewSessionGenerator(
|
||||||
net tn.Network, rs tn.RoutingServer) SessionGenerator {
|
net tn.Network, rs tn.RoutingServer) SessionGenerator {
|
||||||
return SessionGenerator{
|
return SessionGenerator{
|
||||||
@ -107,12 +192,21 @@ type SessionGenerator struct {
|
|||||||
rs tn.RoutingServer
|
rs tn.RoutingServer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *SessionGenerator) Next() testnetBitSwap {
|
func (g *SessionGenerator) Next() instance {
|
||||||
g.seq++
|
g.seq++
|
||||||
return session(g.net, g.rs, []byte(string(g.seq)))
|
return session(g.net, g.rs, []byte(string(g.seq)))
|
||||||
}
|
}
|
||||||
|
|
||||||
type testnetBitSwap struct {
|
func (g *SessionGenerator) Instances(n int) []instance {
|
||||||
|
instances := make([]instance, 0)
|
||||||
|
for j := 0; j < n; j++ {
|
||||||
|
inst := g.Next()
|
||||||
|
instances = append(instances, inst)
|
||||||
|
}
|
||||||
|
return instances
|
||||||
|
}
|
||||||
|
|
||||||
|
type instance struct {
|
||||||
peer *peer.Peer
|
peer *peer.Peer
|
||||||
exchange exchange.Interface
|
exchange exchange.Interface
|
||||||
blockstore bstore.Blockstore
|
blockstore bstore.Blockstore
|
||||||
@ -123,7 +217,7 @@ type testnetBitSwap struct {
|
|||||||
// NB: It's easy make mistakes by providing the same peer ID to two different
|
// NB: It's easy make mistakes by providing the same peer ID to two different
|
||||||
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
|
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
|
||||||
// just a much better idea.
|
// just a much better idea.
|
||||||
func session(net tn.Network, rs tn.RoutingServer, id peer.ID) testnetBitSwap {
|
func session(net tn.Network, rs tn.RoutingServer, id peer.ID) instance {
|
||||||
p := &peer.Peer{ID: id}
|
p := &peer.Peer{ID: id}
|
||||||
|
|
||||||
adapter := net.Adapter(p)
|
adapter := net.Adapter(p)
|
||||||
@ -138,7 +232,7 @@ func session(net tn.Network, rs tn.RoutingServer, id peer.ID) testnetBitSwap {
|
|||||||
sender: adapter,
|
sender: adapter,
|
||||||
}
|
}
|
||||||
adapter.SetDelegate(bs)
|
adapter.SetDelegate(bs)
|
||||||
return testnetBitSwap{
|
return instance{
|
||||||
peer: p,
|
peer: p,
|
||||||
exchange: bs,
|
exchange: bs,
|
||||||
blockstore: blockstore,
|
blockstore: blockstore,
|
||||||
|
Reference in New Issue
Block a user