Mirror of https://github.com/ipfs/kubo.git
added a new test for a dhthell scenario that was failing
@@ -128,7 +128,7 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
             log.Event(ctx, "DialPeer", p)
             err := bs.sender.DialPeer(ctx, p)
             if err != nil {
-                log.Errorf("Error sender.DialPeer(%s)", p)
+                log.Errorf("Error sender.DialPeer(%s): %s", p, err)
                 return
             }
 
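The change above only extends the log call so that the dial error itself is printed. A minimal standalone sketch of the same pattern (stdlib log and a made-up dialPeer stand-in, not kubo's logger or sender):

    package main

    import (
        "errors"
        "log"
    )

    // dialPeer stands in for bs.sender.DialPeer; it always fails so the
    // log output can be observed.
    func dialPeer(p string) error {
        return errors.New("connection refused")
    }

    func main() {
        p := "QmExamplePeer"
        if err := dialPeer(p); err != nil {
            // Including err in the format string (%s prints err.Error())
            // is what the hunk adds; without it the log line only says
            // that the dial failed, not why.
            log.Printf("Error dialPeer(%s): %s", p, err)
        }
    }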
@@ -153,10 +153,8 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
 
 func (bs *bitswap) run(ctx context.Context) {
 
-    const batchDelay = time.Millisecond * 3 // Time to wait before sending out wantlists to better batch up requests
-    const numKeysPerBatch = 10
-    const maxProvidersPerRequest = 6
-    const rebroadcastPeriod = time.Second * 5 // Every so often, we should resend out our current want list
+    // Every so often, we should resend out our current want list
+    rebroadcastTime := time.Second * 5
 
     var providers <-chan peer.Peer // NB: must be initialized to zero value
     broadcastSignal := time.After(rebroadcastPeriod)
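This hunk turns the rebroadcastPeriod constant into a rebroadcastTime variable that feeds time.After. A small self-contained sketch of how a time.After-driven rebroadcast loop of this kind typically works (placeholder print instead of the real sendWantListTo; not the actual run loop):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
        defer cancel()

        // Every so often, resend the current want list.
        rebroadcastTime := 100 * time.Millisecond
        broadcastSignal := time.After(rebroadcastTime)

        for {
            select {
            case <-broadcastSignal:
                fmt.Println("rebroadcast want list") // stand-in for sendWantListTo
                // time.After fires only once, so the signal must be re-armed.
                broadcastSignal = time.After(rebroadcastTime)
            case <-ctx.Done():
                return
            }
        }
    }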
@@ -100,7 +100,7 @@ func TestSwarm(t *testing.T) {
 
     t.Log("Create a ton of instances, and just a few blocks")
 
-    numInstances := 5
+    numInstances := 500
     numBlocks := 2
 
     instances := sg.Instances(numInstances)
@@ -140,6 +140,57 @@ func TestSwarm(t *testing.T) {
     }
 }
 
+func TestLargeFile(t *testing.T) {
+    if testing.Short() {
+        t.SkipNow()
+    }
+    net := tn.VirtualNetwork()
+    rs := mock.VirtualRoutingServer()
+    sg := NewSessionGenerator(net, rs)
+    bg := NewBlockGenerator()
+
+    t.Log("Test a few nodes trying to get one file with a lot of blocks")
+
+    numInstances := 10
+    numBlocks := 100
+
+    instances := sg.Instances(numInstances)
+    blocks := bg.Blocks(numBlocks)
+
+    t.Log("Give the blocks to the first instance")
+
+    first := instances[0]
+    for _, b := range blocks {
+        first.blockstore.Put(b)
+        first.exchange.HasBlock(context.Background(), *b)
+        rs.Announce(first.peer, b.Key())
+    }
+
+    t.Log("Distribute!")
+
+    var wg sync.WaitGroup
+
+    for _, inst := range instances {
+        for _, b := range blocks {
+            wg.Add(1)
+            // NB: executing getOrFail concurrently puts tremendous pressure on
+            // the goroutine scheduler
+            getOrFail(inst, b, t, &wg)
+        }
+    }
+    wg.Wait()
+
+    t.Log("Verify!")
+
+    for _, inst := range instances {
+        for _, b := range blocks {
+            if _, err := inst.blockstore.Get(b.Key()); err != nil {
+                t.Fatal(err)
+            }
+        }
+    }
+}
 
 func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
     if _, err := bitswap.blockstore.Get(b.Key()); err != nil {
         _, err := bitswap.exchange.GetBlock(context.Background(), b.Key())
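The capture cuts off inside getOrFail. A plausible remainder, assuming it mirrors the error handling of the verify loop above and releases the WaitGroup before returning (reconstruction, not the literal commit contents):

            if err != nil {
                t.Fatal(err)
            }
        }
        wg.Done()
    }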
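TestLargeFile checks testing.Short() and skips itself, so it only runs in full test mode. Assuming the test lives in the bitswap exchange package (the path below is a guess), it can be run on its own with:

    go test -v -run TestLargeFile ./exchange/bitswap

and excluded from quick runs with go test -short ./...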