1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-09-10 05:52:20 +08:00

some code cleanup and commenting

This commit is contained in:
Jeromy
2015-04-03 11:40:26 -07:00
parent d765c14e8f
commit 738db201d9
2 changed files with 19 additions and 11 deletions

View File

@ -55,9 +55,6 @@ type Envelope struct {
Peer peer.ID
// Message is the payload
Message bsmsg.BitSwapMessage
// A callback to notify the decision queue that the task is complete
Sent func()
}
type Engine struct {
@ -143,7 +140,6 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
return &Envelope{
Peer: nextTask.Target,
Message: m,
Sent: nextTask.Done,
}, nil
}
}

View File

@ -27,6 +27,7 @@ func newPRQ() peerRequestQueue {
}
}
// verify interface implementation
var _ peerRequestQueue = &prq{}
// TODO: at some point, the strategy needs to plug in here
@ -81,12 +82,7 @@ func (tl *prq) Pop() *peerRequestTask {
if tl.pQueue.Len() == 0 {
return nil
}
pElem := tl.pQueue.Pop()
if pElem == nil {
return nil
}
partner := pElem.(*activePartner)
partner := tl.pQueue.Pop().(*activePartner)
var out *peerRequestTask
for partner.taskQueue.Len() > 0 {
@ -97,6 +93,8 @@ func (tl *prq) Pop() *peerRequestTask {
}
break // and return |out|
}
// start the new task, and push the partner back onto the queue
partner.StartTask()
partner.requests--
tl.pQueue.Push(partner)
@ -112,6 +110,8 @@ func (tl *prq) Remove(k u.Key, p peer.ID) {
// simply mark it as trash, so it'll be dropped when popped off the
// queue.
t.trash = true
// having canceled a block, we now account for that in the given partner
tl.partners[p].requests--
}
tl.lock.Unlock()
@ -121,6 +121,7 @@ type peerRequestTask struct {
Entry wantlist.Entry
Target peer.ID
// A callback to signal that this task has been completed
Done func()
// trash in a book-keeping field
@ -135,10 +136,12 @@ func (t *peerRequestTask) Key() string {
return taskKey(t.Target, t.Entry.Key)
}
// Index implements pq.Elem
func (t *peerRequestTask) Index() int {
return t.index
}
// SetIndex implements pq.Elem
func (t *peerRequestTask) SetIndex(i int) {
t.index = i
}
@ -172,17 +175,22 @@ type activePartner struct {
lk sync.Mutex
// Active is the number of blocks this peer is currently being sent
// active must be locked around as it will be updated externally
active int
// requests is the number of blocks this peer is currently requesting
// request need not be locked around as it will only be modified under
// the peerRequestQueue's locks
requests int
// for the PQ interface
index int
// priority queue of
// priority queue of tasks belonging to this peer
taskQueue pq.PQ
}
// partnerCompare implements pq.ElemComparator
func partnerCompare(a, b pq.Elem) bool {
pa := a.(*activePartner)
pb := b.(*activePartner)
@ -197,12 +205,14 @@ func partnerCompare(a, b pq.Elem) bool {
return pa.active < pb.active
}
// StartTask signals that a task was started for this partner
func (p *activePartner) StartTask() {
p.lk.Lock()
p.active++
p.lk.Unlock()
}
// TaskDone signals that a task was completed for this partner
func (p *activePartner) TaskDone() {
p.lk.Lock()
p.active--
@ -212,10 +222,12 @@ func (p *activePartner) TaskDone() {
p.lk.Unlock()
}
// Index implements pq.Elem
func (p *activePartner) Index() int {
return p.index
}
// SetIndex implements pq.Elem
func (p *activePartner) SetIndex(i int) {
p.index = i
}