mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-01 10:49:24 +08:00
wait for peers in wantmanager to all appear
License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
@ -158,6 +158,19 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
|
|||||||
|
|
||||||
t.Log("Give the blocks to the first instance")
|
t.Log("Give the blocks to the first instance")
|
||||||
|
|
||||||
|
nump := len(instances) - 1
|
||||||
|
// assert we're properly connected
|
||||||
|
for _, inst := range instances {
|
||||||
|
peers := inst.Exchange.wm.ConnectedPeers()
|
||||||
|
for i := 0; i < 10 && len(peers) != nump; i++ {
|
||||||
|
time.Sleep(time.Millisecond * 50)
|
||||||
|
peers = inst.Exchange.wm.ConnectedPeers()
|
||||||
|
}
|
||||||
|
if len(peers) != nump {
|
||||||
|
t.Fatal("not enough peers connected to instance")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var blkeys []key.Key
|
var blkeys []key.Key
|
||||||
first := instances[0]
|
first := instances[0]
|
||||||
for _, b := range blocks {
|
for _, b := range blocks {
|
||||||
|
@ -16,8 +16,9 @@ import (
|
|||||||
type WantManager struct {
|
type WantManager struct {
|
||||||
// sync channels for Run loop
|
// sync channels for Run loop
|
||||||
incoming chan []*bsmsg.Entry
|
incoming chan []*bsmsg.Entry
|
||||||
connect chan peer.ID // notification channel for new peers connecting
|
connect chan peer.ID // notification channel for new peers connecting
|
||||||
disconnect chan peer.ID // notification channel for peers disconnecting
|
disconnect chan peer.ID // notification channel for peers disconnecting
|
||||||
|
peerReqs chan chan []peer.ID // channel to request connected peers on
|
||||||
|
|
||||||
// synchronized by Run loop, only touch inside there
|
// synchronized by Run loop, only touch inside there
|
||||||
peers map[peer.ID]*msgQueue
|
peers map[peer.ID]*msgQueue
|
||||||
@ -32,6 +33,7 @@ func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantMana
|
|||||||
incoming: make(chan []*bsmsg.Entry, 10),
|
incoming: make(chan []*bsmsg.Entry, 10),
|
||||||
connect: make(chan peer.ID, 10),
|
connect: make(chan peer.ID, 10),
|
||||||
disconnect: make(chan peer.ID, 10),
|
disconnect: make(chan peer.ID, 10),
|
||||||
|
peerReqs: make(chan chan []peer.ID),
|
||||||
peers: make(map[peer.ID]*msgQueue),
|
peers: make(map[peer.ID]*msgQueue),
|
||||||
wl: wantlist.NewThreadSafe(),
|
wl: wantlist.NewThreadSafe(),
|
||||||
network: network,
|
network: network,
|
||||||
@ -88,6 +90,12 @@ func (pm *WantManager) addEntries(ks []key.Key, cancel bool) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (pm *WantManager) ConnectedPeers() []peer.ID {
|
||||||
|
resp := make(chan []peer.ID)
|
||||||
|
pm.peerReqs <- resp
|
||||||
|
return <-resp
|
||||||
|
}
|
||||||
|
|
||||||
func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
|
func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
|
||||||
// Blocks need to be sent synchronously to maintain proper backpressure
|
// Blocks need to be sent synchronously to maintain proper backpressure
|
||||||
// throughout the network stack
|
// throughout the network stack
|
||||||
@ -242,6 +250,12 @@ func (pm *WantManager) Run() {
|
|||||||
pm.startPeerHandler(p)
|
pm.startPeerHandler(p)
|
||||||
case p := <-pm.disconnect:
|
case p := <-pm.disconnect:
|
||||||
pm.stopPeerHandler(p)
|
pm.stopPeerHandler(p)
|
||||||
|
case req := <-pm.peerReqs:
|
||||||
|
var peers []peer.ID
|
||||||
|
for p := range pm.peers {
|
||||||
|
peers = append(peers, p)
|
||||||
|
}
|
||||||
|
req <- peers
|
||||||
case <-pm.ctx.Done():
|
case <-pm.ctx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user