From 8ed0f4b854272b5369175ae2e3d9979cf11ddf54 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 8 Mar 2015 14:10:02 -0700 Subject: [PATCH 1/2] respect contexts in a more timely manner --- exchange/bitswap/bitswap.go | 53 ++++++++++++++++++++++++++++--------- 1 file changed, 40 insertions(+), 13 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 5271e23f1..91105b20a 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -227,21 +227,40 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { func (bs *Bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error { set := pset.New() wg := sync.WaitGroup{} - for peerToQuery := range peers { - if !set.TryAdd(peerToQuery) { //Do once per peer - continue - } - - wg.Add(1) - go func(p peer.ID) { - defer wg.Done() - if err := bs.send(ctx, p, m); err != nil { - log.Debug(err) // TODO remove if too verbose +loop: + for { + select { + case peerToQuery, ok := <-peers: + if !ok { + break loop } - }(peerToQuery) + + if !set.TryAdd(peerToQuery) { //Do once per peer + continue + } + + wg.Add(1) + go func(p peer.ID) { + defer wg.Done() + if err := bs.send(ctx, p, m); err != nil { + log.Debug(err) // TODO remove if too verbose + } + }(peerToQuery) + case <-ctx.Done(): + return nil + } + } + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case <-ctx.Done(): } - wg.Wait() return nil } @@ -385,7 +404,15 @@ func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) { } }(p) } - wg.Wait() + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + select { + case <-done: + case <-ctx.Done(): + } } func (bs *Bitswap) ReceiveError(err error) { From 5eb08c44736b920cfffc8560a501071098007fd3 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 9 Mar 2015 00:03:59 -0700 Subject: [PATCH 2/2] add warning comment about possibly leaked goroutines --- exchange/bitswap/bitswap.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 91105b20a..649b3cc48 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -260,6 +260,9 @@ loop: select { case <-done: case <-ctx.Done(): + // NB: we may be abandoning goroutines here before they complete + // this shouldnt be an issue because they will complete soon anyways + // we just don't want their being slow to impact bitswap transfer speeds } return nil } @@ -412,6 +415,9 @@ func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) { select { case <-done: case <-ctx.Done(): + // NB: we may be abandoning goroutines here before they complete + // this shouldnt be an issue because they will complete soon anyways + // we just don't want their being slow to impact bitswap transfer speeds } }