diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index e904d28a6..1539b5fc8 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -128,7 +128,7 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e log.Event(ctx, "DialPeer", p) err := bs.sender.DialPeer(ctx, p) if err != nil { - log.Errorf("Error sender.DialPeer(%s)", p) + log.Errorf("Error sender.DialPeer(%s): %s", p, err) return } @@ -153,10 +153,8 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e func (bs *bitswap) run(ctx context.Context) { - const batchDelay = time.Millisecond * 3 // Time to wait before sending out wantlists to better batch up requests - const numKeysPerBatch = 10 - const maxProvidersPerRequest = 6 - const rebroadcastPeriod = time.Second * 5 // Every so often, we should resend out our current want list + // Every so often, we should resend out our current want list + rebroadcastTime := time.Second * 5 var providers <-chan peer.Peer // NB: must be initialized to zero value broadcastSignal := time.After(rebroadcastPeriod) diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index e06eabefa..e3b4d913a 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -100,7 +100,7 @@ func TestSwarm(t *testing.T) { t.Log("Create a ton of instances, and just a few blocks") - numInstances := 5 + numInstances := 500 numBlocks := 2 instances := sg.Instances(numInstances) @@ -140,6 +140,57 @@ func TestSwarm(t *testing.T) { } } +func TestLargeFile(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + net := tn.VirtualNetwork() + rs := mock.VirtualRoutingServer() + sg := NewSessionGenerator(net, rs) + bg := NewBlockGenerator() + + t.Log("Test a few nodes trying to get one file with a lot of blocks") + + numInstances := 10 + numBlocks := 100 + + instances := sg.Instances(numInstances) + blocks := bg.Blocks(numBlocks) + + t.Log("Give the blocks to the first instance") + + first := instances[0] + for _, b := range blocks { + first.blockstore.Put(b) + first.exchange.HasBlock(context.Background(), *b) + rs.Announce(first.peer, b.Key()) + } + + t.Log("Distribute!") + + var wg sync.WaitGroup + + for _, inst := range instances { + for _, b := range blocks { + wg.Add(1) + // NB: executing getOrFail concurrently puts tremendous pressure on + // the goroutine scheduler + getOrFail(inst, b, t, &wg) + } + } + wg.Wait() + + t.Log("Verify!") + + for _, inst := range instances { + for _, b := range blocks { + if _, err := inst.blockstore.Get(b.Key()); err != nil { + t.Fatal(err) + } + } + } +} + func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) { if _, err := bitswap.blockstore.Get(b.Key()); err != nil { _, err := bitswap.exchange.GetBlock(context.Background(), b.Key())