mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-30 18:13:54 +08:00
contextify peermanager
This commit is contained in:
@ -316,8 +316,6 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli
|
||||
|
||||
// TODO(brian): handle errors
|
||||
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error {
|
||||
//defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done()
|
||||
|
||||
// This call records changes to wantlists, blocks received,
|
||||
// and number of bytes transfered.
|
||||
bs.engine.MessageReceived(p, incoming)
|
||||
|
@ -206,7 +206,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
|
||||
l.CancelWant(entry.Key)
|
||||
e.peerRequestQueue.Remove(entry.Key, p)
|
||||
} else {
|
||||
log.Debugf("wants %s", entry.Key, entry.Priority)
|
||||
log.Debugf("wants %s - %d", entry.Key, entry.Priority)
|
||||
l.Wants(entry.Key, entry.Priority)
|
||||
if exists, err := e.bs.Has(entry.Key); err == nil && exists {
|
||||
e.peerRequestQueue.Push(entry.Entry, p)
|
||||
|
@ -53,24 +53,24 @@ type msgQueue struct {
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func (pm *PeerManager) SendBlock(env *engine.Envelope) {
|
||||
func (pm *PeerManager) SendBlock(ctx context.Context, env *engine.Envelope) {
|
||||
// Blocks need to be sent synchronously to maintain proper backpressure
|
||||
// throughout the network stack
|
||||
defer env.Sent()
|
||||
|
||||
msg := bsmsg.New()
|
||||
msg.AddBlock(env.Block)
|
||||
err := pm.network.SendMessage(context.TODO(), env.Peer, msg)
|
||||
err := pm.network.SendMessage(ctx, env.Peer, msg)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (pm *PeerManager) startPeerHandler(p peer.ID) {
|
||||
func (pm *PeerManager) startPeerHandler(ctx context.Context, p peer.ID) *msgQueue {
|
||||
_, ok := pm.peers[p]
|
||||
if ok {
|
||||
// TODO: log an error?
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
mq := new(msgQueue)
|
||||
@ -79,7 +79,8 @@ func (pm *PeerManager) startPeerHandler(p peer.ID) {
|
||||
mq.p = p
|
||||
|
||||
pm.peers[p] = mq
|
||||
go pm.runQueue(mq)
|
||||
go pm.runQueue(ctx, mq)
|
||||
return mq
|
||||
}
|
||||
|
||||
func (pm *PeerManager) stopPeerHandler(p peer.ID) {
|
||||
@ -93,14 +94,14 @@ func (pm *PeerManager) stopPeerHandler(p peer.ID) {
|
||||
delete(pm.peers, p)
|
||||
}
|
||||
|
||||
func (pm *PeerManager) runQueue(mq *msgQueue) {
|
||||
func (pm *PeerManager) runQueue(ctx context.Context, mq *msgQueue) {
|
||||
for {
|
||||
select {
|
||||
case <-mq.work: // there is work to be done
|
||||
|
||||
// TODO: this might not need to be done every time, figure out
|
||||
// a good heuristic
|
||||
err := pm.network.ConnectTo(context.TODO(), mq.p)
|
||||
err := pm.network.ConnectTo(ctx, mq.p)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
// TODO: cant connect, what now?
|
||||
@ -114,7 +115,7 @@ func (pm *PeerManager) runQueue(mq *msgQueue) {
|
||||
|
||||
if wlm != nil && !wlm.Empty() {
|
||||
// send wantlist updates
|
||||
err = pm.network.SendMessage(context.TODO(), mq.p, wlm)
|
||||
err = pm.network.SendMessage(ctx, mq.p, wlm)
|
||||
if err != nil {
|
||||
log.Error("bitswap send error: ", err)
|
||||
// TODO: what do we do if this fails?
|
||||
@ -162,13 +163,12 @@ func (pm *PeerManager) Run(ctx context.Context) {
|
||||
p, ok := pm.peers[msgp.to]
|
||||
if !ok {
|
||||
//TODO: decide, drop message? or dial?
|
||||
pm.startPeerHandler(msgp.to)
|
||||
p = pm.peers[msgp.to]
|
||||
p = pm.startPeerHandler(ctx, msgp.to)
|
||||
}
|
||||
|
||||
p.addMessage(msgp.msg)
|
||||
case p := <-pm.connect:
|
||||
pm.startPeerHandler(p)
|
||||
pm.startPeerHandler(ctx, p)
|
||||
case p := <-pm.disconnect:
|
||||
pm.stopPeerHandler(p)
|
||||
case <-ctx.Done():
|
||||
|
@ -46,6 +46,7 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
|
||||
bs.rebroadcastWorker(ctx)
|
||||
})
|
||||
|
||||
// Start up a worker to manage sending out provides messages
|
||||
px.Go(func(px process.Process) {
|
||||
bs.provideCollector(ctx)
|
||||
})
|
||||
@ -71,8 +72,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context) {
|
||||
continue
|
||||
}
|
||||
|
||||
//log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
|
||||
bs.pm.SendBlock(envelope)
|
||||
bs.pm.SendBlock(ctx, envelope)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
Reference in New Issue
Block a user