1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-30 18:13:54 +08:00

rename to peerRequestQueue

this opens up the possibility of having multiple queues. And for all
outgoing messages to be managed by the decision engine

License: MIT
Signed-off-by: Brian Tiger Chow <brian@perfmode.com>
This commit is contained in:
Brian Tiger Chow
2014-12-16 23:07:58 -08:00
committed by Juan Batiz-Benet
parent 1d23e94f16
commit acc714823b

View File

@ -22,9 +22,10 @@ type Envelope struct {
}
type Engine struct {
// FIXME taskqueue isn't threadsafe nor is it protected by a mutex. consider
// a way to avoid sharing the taskqueue between the worker and the receiver
taskqueue *taskQueue
// FIXME peerRequestQueue isn't threadsafe nor is it protected by a mutex.
// consider a way to avoid sharing the peerRequestQueue between the worker
// and the receiver
peerRequestQueue *taskQueue
workSignal chan struct{}
@ -39,11 +40,11 @@ type Engine struct {
func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
e := &Engine{
ledgerMap: make(map[u.Key]*ledger),
bs: bs,
taskqueue: newTaskQueue(),
outbox: make(chan Envelope, 4), // TODO extract constant
workSignal: make(chan struct{}),
ledgerMap: make(map[u.Key]*ledger),
bs: bs,
peerRequestQueue: newTaskQueue(),
outbox: make(chan Envelope, 4), // TODO extract constant
workSignal: make(chan struct{}),
}
go e.taskWorker(ctx)
return e
@ -51,7 +52,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
func (e *Engine) taskWorker(ctx context.Context) {
for {
nextTask := e.taskqueue.Pop()
nextTask := e.peerRequestQueue.Pop()
if nextTask == nil {
// No tasks in the list?
// Wait until there are!
@ -128,11 +129,11 @@ func (e *Engine) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
for _, entry := range m.Wantlist() {
if entry.Cancel {
l.CancelWant(entry.Key)
e.taskqueue.Remove(entry.Key, p)
e.peerRequestQueue.Remove(entry.Key, p)
} else {
l.Wants(entry.Key, entry.Priority)
newWorkExists = true
e.taskqueue.Push(entry.Key, entry.Priority, p)
e.peerRequestQueue.Push(entry.Key, entry.Priority, p)
}
}
@ -142,7 +143,7 @@ func (e *Engine) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
for _, l := range e.ledgerMap {
if l.WantListContains(block.Key()) {
newWorkExists = true
e.taskqueue.Push(block.Key(), 1, l.Partner)
e.peerRequestQueue.Push(block.Key(), 1, l.Partner)
}
}
}
@ -163,7 +164,7 @@ func (e *Engine) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
for _, block := range m.Blocks() {
l.SentBytes(len(block.Data))
l.wantList.Remove(block.Key())
e.taskqueue.Remove(block.Key(), p)
e.peerRequestQueue.Remove(block.Key(), p)
}
return nil