From c73da8486a88c28312e9e4148e9a92c6b44dd34b Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 8 Feb 2016 15:59:22 -0800 Subject: [PATCH] wait for peers in wantmanager to all appear License: MIT Signed-off-by: Jeromy --- exchange/bitswap/bitswap_test.go | 13 +++++++++++++ exchange/bitswap/wantmanager.go | 18 ++++++++++++++++-- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 04a1fb709..435779fd8 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -158,6 +158,19 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { t.Log("Give the blocks to the first instance") + nump := len(instances) - 1 + // assert we're properly connected + for _, inst := range instances { + peers := inst.Exchange.wm.ConnectedPeers() + for i := 0; i < 10 && len(peers) != nump; i++ { + time.Sleep(time.Millisecond * 50) + peers = inst.Exchange.wm.ConnectedPeers() + } + if len(peers) != nump { + t.Fatal("not enough peers connected to instance") + } + } + var blkeys []key.Key first := instances[0] for _, b := range blocks { diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index 243edac37..73bd4b4c8 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -16,8 +16,9 @@ import ( type WantManager struct { // sync channels for Run loop incoming chan []*bsmsg.Entry - connect chan peer.ID // notification channel for new peers connecting - disconnect chan peer.ID // notification channel for peers disconnecting + connect chan peer.ID // notification channel for new peers connecting + disconnect chan peer.ID // notification channel for peers disconnecting + peerReqs chan chan []peer.ID // channel to request connected peers on // synchronized by Run loop, only touch inside there peers map[peer.ID]*msgQueue @@ -32,6 +33,7 @@ func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantMana incoming: make(chan []*bsmsg.Entry, 10), connect: make(chan peer.ID, 10), disconnect: make(chan peer.ID, 10), + peerReqs: make(chan chan []peer.ID), peers: make(map[peer.ID]*msgQueue), wl: wantlist.NewThreadSafe(), network: network, @@ -88,6 +90,12 @@ func (pm *WantManager) addEntries(ks []key.Key, cancel bool) { } } +func (pm *WantManager) ConnectedPeers() []peer.ID { + resp := make(chan []peer.ID) + pm.peerReqs <- resp + return <-resp +} + func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) { // Blocks need to be sent synchronously to maintain proper backpressure // throughout the network stack @@ -242,6 +250,12 @@ func (pm *WantManager) Run() { pm.startPeerHandler(p) case p := <-pm.disconnect: pm.stopPeerHandler(p) + case req := <-pm.peerReqs: + var peers []peer.ID + for p := range pm.peers { + peers = append(peers, p) + } + req <- peers case <-pm.ctx.Done(): return }