mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-28 17:03:58 +08:00
Merge pull request #900 from jbenet/fix/bitswap-ctx-respect
respect contexts in a more timely manner
This commit is contained in:
@ -227,21 +227,43 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
|
|||||||
func (bs *Bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error {
|
func (bs *Bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error {
|
||||||
set := pset.New()
|
set := pset.New()
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
for peerToQuery := range peers {
|
|
||||||
|
|
||||||
if !set.TryAdd(peerToQuery) { //Do once per peer
|
loop:
|
||||||
continue
|
for {
|
||||||
}
|
select {
|
||||||
|
case peerToQuery, ok := <-peers:
|
||||||
wg.Add(1)
|
if !ok {
|
||||||
go func(p peer.ID) {
|
break loop
|
||||||
defer wg.Done()
|
|
||||||
if err := bs.send(ctx, p, m); err != nil {
|
|
||||||
log.Debug(err) // TODO remove if too verbose
|
|
||||||
}
|
}
|
||||||
}(peerToQuery)
|
|
||||||
|
if !set.TryAdd(peerToQuery) { //Do once per peer
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func(p peer.ID) {
|
||||||
|
defer wg.Done()
|
||||||
|
if err := bs.send(ctx, p, m); err != nil {
|
||||||
|
log.Debug(err) // TODO remove if too verbose
|
||||||
|
}
|
||||||
|
}(peerToQuery)
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
wg.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-ctx.Done():
|
||||||
|
// NB: we may be abandoning goroutines here before they complete
|
||||||
|
// this shouldnt be an issue because they will complete soon anyways
|
||||||
|
// we just don't want their being slow to impact bitswap transfer speeds
|
||||||
}
|
}
|
||||||
wg.Wait()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -385,7 +407,18 @@ func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) {
|
|||||||
}
|
}
|
||||||
}(p)
|
}(p)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
wg.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-ctx.Done():
|
||||||
|
// NB: we may be abandoning goroutines here before they complete
|
||||||
|
// this shouldnt be an issue because they will complete soon anyways
|
||||||
|
// we just don't want their being slow to impact bitswap transfer speeds
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *Bitswap) ReceiveError(err error) {
|
func (bs *Bitswap) ReceiveError(err error) {
|
||||||
|
Reference in New Issue
Block a user