mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-03 21:08:17 +08:00
refactor peerSet
This commit is contained in:
@ -222,7 +222,7 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wl.Wan
|
|||||||
providers := bs.routing.FindProvidersAsync(child, k, maxProvidersPerRequest)
|
providers := bs.routing.FindProvidersAsync(child, k, maxProvidersPerRequest)
|
||||||
|
|
||||||
for prov := range providers {
|
for prov := range providers {
|
||||||
if ps.AddIfSmallerThan(prov, -1) { //Do once per peer
|
if ps.TryAdd(prov) { //Do once per peer
|
||||||
bs.send(ctx, prov, message)
|
bs.send(ctx, prov, message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -141,11 +141,11 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
|
|||||||
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.Peer) {
|
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.Peer) {
|
||||||
defer close(peerOut)
|
defer close(peerOut)
|
||||||
|
|
||||||
ps := pset.NewPeerSet()
|
ps := pset.NewLimitedPeerSet(count)
|
||||||
provs := dht.providers.GetProviders(ctx, key)
|
provs := dht.providers.GetProviders(ctx, key)
|
||||||
for _, p := range provs {
|
for _, p := range provs {
|
||||||
// NOTE: assuming that this list of peers is unique
|
// NOTE: assuming that this list of peers is unique
|
||||||
if ps.AddIfSmallerThan(p, count) {
|
if ps.TryAdd(p) {
|
||||||
select {
|
select {
|
||||||
case peerOut <- p:
|
case peerOut <- p:
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
@ -176,7 +176,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
|
|||||||
|
|
||||||
// Add unique providers from request, up to 'count'
|
// Add unique providers from request, up to 'count'
|
||||||
for _, prov := range provs {
|
for _, prov := range provs {
|
||||||
if ps.AddIfSmallerThan(prov, count) {
|
if ps.TryAdd(prov) {
|
||||||
select {
|
select {
|
||||||
case peerOut <- prov:
|
case peerOut <- prov:
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
@ -226,7 +226,7 @@ func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.M
|
|||||||
}
|
}
|
||||||
|
|
||||||
dht.providers.AddProvider(k, p)
|
dht.providers.AddProvider(k, p)
|
||||||
if ps.AddIfSmallerThan(p, count) {
|
if ps.TryAdd(p) {
|
||||||
select {
|
select {
|
||||||
case out <- p:
|
case out <- p:
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
@ -9,11 +9,20 @@ import (
|
|||||||
type PeerSet struct {
|
type PeerSet struct {
|
||||||
ps map[string]bool
|
ps map[string]bool
|
||||||
lk sync.RWMutex
|
lk sync.RWMutex
|
||||||
|
size int
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPeerSet() *PeerSet {
|
func NewPeerSet() *PeerSet {
|
||||||
ps := new(PeerSet)
|
ps := new(PeerSet)
|
||||||
ps.ps = make(map[string]bool)
|
ps.ps = make(map[string]bool)
|
||||||
|
ps.size = -1
|
||||||
|
return ps
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewLimitedPeerSet(size int) *PeerSet {
|
||||||
|
ps := new(PeerSet)
|
||||||
|
ps.ps = make(map[string]bool)
|
||||||
|
ps.size = -1
|
||||||
return ps
|
return ps
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -36,10 +45,14 @@ func (ps *PeerSet) Size() int {
|
|||||||
return len(ps.ps)
|
return len(ps.ps)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ps *PeerSet) AddIfSmallerThan(p peer.Peer, maxsize int) bool {
|
// TryAdd Attempts to add the given peer into the set.
|
||||||
|
// This operation can fail for one of two reasons:
|
||||||
|
// 1) The given peer is already in the set
|
||||||
|
// 2) The number of peers in the set is equal to size
|
||||||
|
func (ps *PeerSet) TryAdd(p peer.Peer) bool {
|
||||||
var success bool
|
var success bool
|
||||||
ps.lk.Lock()
|
ps.lk.Lock()
|
||||||
if _, ok := ps.ps[string(p.ID())]; !ok && len(ps.ps) < maxsize {
|
if _, ok := ps.ps[string(p.ID())]; !ok && (len(ps.ps) < ps.size || ps.size == -1) {
|
||||||
success = true
|
success = true
|
||||||
ps.ps[string(p.ID())] = true
|
ps.ps[string(p.ID())] = true
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user