mirror of
https://github.com/ipfs/kubo.git
synced 2025-09-10 05:52:20 +08:00
prevent wantmanager from leaking goroutines (and memory)
License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
@ -137,36 +137,49 @@ func (mq *msgQueue) runQueue(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-mq.work: // there is work to be done
|
||||
|
||||
err := mq.network.ConnectTo(ctx, mq.p)
|
||||
if err != nil {
|
||||
log.Noticef("cant connect to peer %s: %s", mq.p, err)
|
||||
// TODO: cant connect, what now?
|
||||
continue
|
||||
}
|
||||
|
||||
// grab outgoing message
|
||||
mq.outlk.Lock()
|
||||
wlm := mq.out
|
||||
if wlm == nil || wlm.Empty() {
|
||||
mq.outlk.Unlock()
|
||||
continue
|
||||
}
|
||||
mq.out = nil
|
||||
mq.outlk.Unlock()
|
||||
|
||||
// send wantlist updates
|
||||
err = mq.network.SendMessage(ctx, mq.p, wlm)
|
||||
if err != nil {
|
||||
log.Noticef("bitswap send error: %s", err)
|
||||
// TODO: what do we do if this fails?
|
||||
}
|
||||
mq.doWork(ctx)
|
||||
case <-mq.done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (mq *msgQueue) doWork(ctx context.Context) {
|
||||
// allow a minute for connections
|
||||
// this includes looking them up in the dht
|
||||
// dialing them, and handshaking
|
||||
conctx, cancel := context.WithTimeout(ctx, time.Minute)
|
||||
defer cancel()
|
||||
|
||||
err := mq.network.ConnectTo(conctx, mq.p)
|
||||
if err != nil {
|
||||
log.Noticef("cant connect to peer %s: %s", mq.p, err)
|
||||
// TODO: cant connect, what now?
|
||||
return
|
||||
}
|
||||
|
||||
// grab outgoing message
|
||||
mq.outlk.Lock()
|
||||
wlm := mq.out
|
||||
mq.out = nil
|
||||
mq.outlk.Unlock()
|
||||
|
||||
if wlm == nil || wlm.Empty() {
|
||||
return
|
||||
}
|
||||
|
||||
sendctx, cancel := context.WithTimeout(ctx, time.Second*30)
|
||||
defer cancel()
|
||||
|
||||
// send wantlist updates
|
||||
err = mq.network.SendMessage(sendctx, mq.p, wlm)
|
||||
if err != nil {
|
||||
log.Noticef("bitswap send error: %s", err)
|
||||
// TODO: what do we do if this fails?
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (pm *WantManager) Connected(p peer.ID) {
|
||||
pm.connect <- p
|
||||
}
|
||||
|
Reference in New Issue
Block a user