1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-08-06 19:44:01 +08:00
Files
kubo/exchange/bitswap/wantmanager.go
Jeromy 53d47669da bubble up go-datastore deps
License: MIT
Signed-off-by: Jeromy <why@ipfs.io>
2016-11-28 22:29:38 -08:00

365 lines
7.7 KiB
Go

package bitswap
import (
"context"
"sync"
"time"
engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
type WantManager struct {
// sync channels for Run loop
incoming chan []*bsmsg.Entry
connect chan peer.ID // notification channel for new peers connecting
disconnect chan peer.ID // notification channel for peers disconnecting
peerReqs chan chan []peer.ID // channel to request connected peers on
// synchronized by Run loop, only touch inside there
peers map[peer.ID]*msgQueue
wl *wantlist.ThreadSafe
network bsnet.BitSwapNetwork
ctx context.Context
cancel func()
}
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
ctx, cancel := context.WithCancel(ctx)
return &WantManager{
incoming: make(chan []*bsmsg.Entry, 10),
connect: make(chan peer.ID, 10),
disconnect: make(chan peer.ID, 10),
peerReqs: make(chan chan []peer.ID),
peers: make(map[peer.ID]*msgQueue),
wl: wantlist.NewThreadSafe(),
network: network,
ctx: ctx,
cancel: cancel,
}
}
type msgPair struct {
to peer.ID
msg bsmsg.BitSwapMessage
}
type cancellation struct {
who peer.ID
blk *cid.Cid
}
type msgQueue struct {
p peer.ID
outlk sync.Mutex
out bsmsg.BitSwapMessage
network bsnet.BitSwapNetwork
sender bsnet.MessageSender
refcnt int
work chan struct{}
done chan struct{}
}
func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid) {
log.Infof("want blocks: %s", ks)
pm.addEntries(ctx, ks, false)
}
func (pm *WantManager) CancelWants(ks []*cid.Cid) {
log.Infof("cancel wants: %s", ks)
pm.addEntries(context.TODO(), ks, true)
}
func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, cancel bool) {
var entries []*bsmsg.Entry
for i, k := range ks {
entries = append(entries, &bsmsg.Entry{
Cancel: cancel,
Entry: &wantlist.Entry{
Cid: k,
Priority: kMaxPriority - i,
RefCnt: 1,
},
})
}
select {
case pm.incoming <- entries:
case <-pm.ctx.Done():
case <-ctx.Done():
}
}
func (pm *WantManager) ConnectedPeers() []peer.ID {
resp := make(chan []peer.ID)
pm.peerReqs <- resp
return <-resp
}
func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
// Blocks need to be sent synchronously to maintain proper backpressure
// throughout the network stack
defer env.Sent()
msg := bsmsg.New(false)
msg.AddBlock(env.Block)
log.Infof("Sending block %s to %s", env.Block, env.Peer)
err := pm.network.SendMessage(ctx, env.Peer, msg)
if err != nil {
log.Infof("sendblock error: %s", err)
}
}
func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
mq, ok := pm.peers[p]
if ok {
mq.refcnt++
return nil
}
mq = pm.newMsgQueue(p)
// new peer, we will want to give them our full wantlist
fullwantlist := bsmsg.New(true)
for _, e := range pm.wl.Entries() {
fullwantlist.AddEntry(e.Cid, e.Priority)
}
mq.out = fullwantlist
mq.work <- struct{}{}
pm.peers[p] = mq
go mq.runQueue(pm.ctx)
return mq
}
func (pm *WantManager) stopPeerHandler(p peer.ID) {
pq, ok := pm.peers[p]
if !ok {
// TODO: log error?
return
}
pq.refcnt--
if pq.refcnt > 0 {
return
}
close(pq.done)
delete(pm.peers, p)
}
func (mq *msgQueue) runQueue(ctx context.Context) {
defer func() {
if mq.sender != nil {
mq.sender.Close()
}
}()
for {
select {
case <-mq.work: // there is work to be done
mq.doWork(ctx)
case <-mq.done:
return
case <-ctx.Done():
return
}
}
}
func (mq *msgQueue) doWork(ctx context.Context) {
if mq.sender == nil {
err := mq.openSender(ctx)
if err != nil {
log.Infof("cant open message sender to peer %s: %s", mq.p, err)
// TODO: cant connect, what now?
return
}
}
// grab outgoing message
mq.outlk.Lock()
wlm := mq.out
if wlm == nil || wlm.Empty() {
mq.outlk.Unlock()
return
}
mq.out = nil
mq.outlk.Unlock()
// send wantlist updates
for { // try to send this message until we fail.
err := mq.sender.SendMsg(wlm)
if err == nil {
return
}
log.Infof("bitswap send error: %s", err)
mq.sender.Close()
mq.sender = nil
select {
case <-mq.done:
return
case <-ctx.Done():
return
case <-time.After(time.Millisecond * 100):
// wait 100ms in case disconnect notifications are still propogating
log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
}
err = mq.openSender(ctx)
if err != nil {
log.Error("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
// TODO(why): what do we do now?
// I think the *right* answer is to probably put the message we're
// trying to send back, and then return to waiting for new work or
// a disconnect.
return
}
// TODO: Is this the same instance for the remote peer?
// If its not, we should resend our entire wantlist to them
/*
if mq.sender.InstanceID() != mq.lastSeenInstanceID {
wlm = mq.getFullWantlistMessage()
}
*/
}
}
func (mq *msgQueue) openSender(ctx context.Context) error {
// allow ten minutes for connections this includes looking them up in the
// dht dialing them, and handshaking
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
defer cancel()
err := mq.network.ConnectTo(conctx, mq.p)
if err != nil {
return err
}
nsender, err := mq.network.NewMessageSender(ctx, mq.p)
if err != nil {
return err
}
mq.sender = nsender
return nil
}
func (pm *WantManager) Connected(p peer.ID) {
select {
case pm.connect <- p:
case <-pm.ctx.Done():
}
}
func (pm *WantManager) Disconnected(p peer.ID) {
select {
case pm.disconnect <- p:
case <-pm.ctx.Done():
}
}
// TODO: use goprocess here once i trust it
func (pm *WantManager) Run() {
tock := time.NewTicker(rebroadcastDelay.Get())
defer tock.Stop()
for {
select {
case entries := <-pm.incoming:
// add changes to our wantlist
var filtered []*bsmsg.Entry
for _, e := range entries {
if e.Cancel {
if pm.wl.Remove(e.Cid) {
filtered = append(filtered, e)
}
} else {
if pm.wl.AddEntry(e.Entry) {
filtered = append(filtered, e)
}
}
}
// broadcast those wantlist changes
for _, p := range pm.peers {
p.addMessage(filtered)
}
case <-tock.C:
// resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY)
var es []*bsmsg.Entry
for _, e := range pm.wl.Entries() {
es = append(es, &bsmsg.Entry{Entry: e})
}
for _, p := range pm.peers {
p.outlk.Lock()
p.out = bsmsg.New(true)
p.outlk.Unlock()
p.addMessage(es)
}
case p := <-pm.connect:
pm.startPeerHandler(p)
case p := <-pm.disconnect:
pm.stopPeerHandler(p)
case req := <-pm.peerReqs:
var peers []peer.ID
for p := range pm.peers {
peers = append(peers, p)
}
req <- peers
case <-pm.ctx.Done():
return
}
}
}
func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
return &msgQueue{
done: make(chan struct{}),
work: make(chan struct{}, 1),
network: wm.network,
p: p,
refcnt: 1,
}
}
func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
mq.outlk.Lock()
defer func() {
mq.outlk.Unlock()
select {
case mq.work <- struct{}{}:
default:
}
}()
// if we have no message held allocate a new one
if mq.out == nil {
mq.out = bsmsg.New(false)
}
// TODO: add a msg.Combine(...) method
// otherwise, combine the one we are holding with the
// one passed in
for _, e := range entries {
if e.Cancel {
mq.out.Cancel(e.Cid)
} else {
mq.out.AddEntry(e.Cid, e.Priority)
}
}
}