1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-09-11 07:03:32 +08:00

rewrite sendWantlistToProviders

This commit is contained in:
Jeromy
2014-12-15 01:33:04 +00:00
committed by Juan Batiz-Benet
parent 946d2a96f2
commit cfbe92bc8b
4 changed files with 70 additions and 87 deletions

View File

@ -19,6 +19,7 @@ import (
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
eventlog "github.com/jbenet/go-ipfs/util/eventlog" eventlog "github.com/jbenet/go-ipfs/util/eventlog"
pset "github.com/jbenet/go-ipfs/util/peerset"
) )
var log = eventlog.Logger("bitswap") var log = eventlog.Logger("bitswap")
@ -204,58 +205,35 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
} }
func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wl.Wantlist) { func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wl.Wantlist) {
provset := make(map[u.Key]peer.Peer)
provcollect := make(chan peer.Peer)
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
wg := sync.WaitGroup{}
// Get providers for all entries in wantlist (could take a while)
for _, e := range wantlist.Entries() {
wg.Add(1)
go func(k u.Key) {
child, _ := context.WithTimeout(ctx, providerRequestTimeout)
providers := bs.routing.FindProvidersAsync(child, k, maxProvidersPerRequest)
for prov := range providers {
provcollect <- prov
}
wg.Done()
}(e.Value)
}
// When all workers finish, close the providers channel
go func() {
wg.Wait()
close(provcollect)
}()
// Filter out duplicates,
// no need to send our wantlists out twice in a given time period
for {
select {
case p, ok := <-provcollect:
if !ok {
break
}
provset[p.Key()] = p
case <-ctx.Done():
log.Error("Context cancelled before we got all the providers!")
return
}
}
message := bsmsg.New() message := bsmsg.New()
message.SetFull(true) message.SetFull(true)
for _, e := range bs.wantlist.Entries() { for _, e := range bs.wantlist.Entries() {
message.AddEntry(e.Value, e.Priority, false) message.AddEntry(e.Value, e.Priority, false)
} }
for _, prov := range provset { ps := pset.NewPeerSet()
// Get providers for all entries in wantlist (could take a while)
wg := sync.WaitGroup{}
for _, e := range wantlist.Entries() {
wg.Add(1)
go func(k u.Key) {
defer wg.Done()
child, _ := context.WithTimeout(ctx, providerRequestTimeout)
providers := bs.routing.FindProvidersAsync(child, k, maxProvidersPerRequest)
for prov := range providers {
if ps.AddIfSmallerThan(prov, -1) { //Do once per peer
bs.send(ctx, prov, message) bs.send(ctx, prov, message)
} }
} }
}(e.Value)
}
wg.Wait()
}
func (bs *bitswap) roundWorker(ctx context.Context) { func (bs *bitswap) roundWorker(ctx context.Context) {
roundTicker := time.NewTicker(roundTime) roundTicker := time.NewTicker(roundTime)

View File

@ -11,6 +11,7 @@ import (
pb "github.com/jbenet/go-ipfs/routing/dht/pb" pb "github.com/jbenet/go-ipfs/routing/dht/pb"
kb "github.com/jbenet/go-ipfs/routing/kbucket" kb "github.com/jbenet/go-ipfs/routing/kbucket"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
pset "github.com/jbenet/go-ipfs/util/peerset"
) )
// asyncQueryBuffer is the size of buffered channels in async queries. This // asyncQueryBuffer is the size of buffered channels in async queries. This
@ -140,7 +141,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.Peer) { func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.Peer) {
defer close(peerOut) defer close(peerOut)
ps := newPeerSet() ps := pset.NewPeerSet()
provs := dht.providers.GetProviders(ctx, key) provs := dht.providers.GetProviders(ctx, key)
for _, p := range provs { for _, p := range provs {
// NOTE: assuming that this list of peers is unique // NOTE: assuming that this list of peers is unique
@ -207,7 +208,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
} }
} }
func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *peerSet, count int, out chan peer.Peer) { func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *pset.PeerSet, count int, out chan peer.Peer) {
var wg sync.WaitGroup var wg sync.WaitGroup
for _, pbp := range peers { for _, pbp := range peers {
wg.Add(1) wg.Add(1)

View File

@ -2,8 +2,6 @@ package dht
import ( import (
"sync" "sync"
peer "github.com/jbenet/go-ipfs/peer"
) )
// Pool size is the number of nodes used for group find/set RPC calls // Pool size is the number of nodes used for group find/set RPC calls
@ -39,45 +37,3 @@ func (c *counter) Size() (s int) {
c.mut.Unlock() c.mut.Unlock()
return return
} }
// peerSet is a threadsafe set of peers
type peerSet struct {
ps map[string]bool
lk sync.RWMutex
}
func newPeerSet() *peerSet {
ps := new(peerSet)
ps.ps = make(map[string]bool)
return ps
}
func (ps *peerSet) Add(p peer.Peer) {
ps.lk.Lock()
ps.ps[string(p.ID())] = true
ps.lk.Unlock()
}
func (ps *peerSet) Contains(p peer.Peer) bool {
ps.lk.RLock()
_, ok := ps.ps[string(p.ID())]
ps.lk.RUnlock()
return ok
}
func (ps *peerSet) Size() int {
ps.lk.RLock()
defer ps.lk.RUnlock()
return len(ps.ps)
}
func (ps *peerSet) AddIfSmallerThan(p peer.Peer, maxsize int) bool {
var success bool
ps.lk.Lock()
if _, ok := ps.ps[string(p.ID())]; !ok && len(ps.ps) < maxsize {
success = true
ps.ps[string(p.ID())] = true
}
ps.lk.Unlock()
return success
}

48
util/peerset/peerset.go Normal file
View File

@ -0,0 +1,48 @@
package peerset
import (
peer "github.com/jbenet/go-ipfs/peer"
"sync"
)
// PeerSet is a threadsafe set of peers
type PeerSet struct {
ps map[string]bool
lk sync.RWMutex
}
func NewPeerSet() *PeerSet {
ps := new(PeerSet)
ps.ps = make(map[string]bool)
return ps
}
func (ps *PeerSet) Add(p peer.Peer) {
ps.lk.Lock()
ps.ps[string(p.ID())] = true
ps.lk.Unlock()
}
func (ps *PeerSet) Contains(p peer.Peer) bool {
ps.lk.RLock()
_, ok := ps.ps[string(p.ID())]
ps.lk.RUnlock()
return ok
}
func (ps *PeerSet) Size() int {
ps.lk.RLock()
defer ps.lk.RUnlock()
return len(ps.ps)
}
func (ps *PeerSet) AddIfSmallerThan(p peer.Peer, maxsize int) bool {
var success bool
ps.lk.Lock()
if _, ok := ps.ps[string(p.ID())]; !ok && len(ps.ps) < maxsize {
success = true
ps.ps[string(p.ID())] = true
}
ps.lk.Unlock()
return success
}