mirror of
https://github.com/ipfs/kubo.git
synced 2025-09-10 09:52:20 +08:00
feat(PQ)
refactor: peerRequestQueue it's a mistake to make one queue to fit all. Go's lack of algebraic types turns a generalized queue into a monstrosity of type checking/casting. Better to have individual queues for individual purposes. Conflicts: exchange/bitswap/decision/bench_test.go exchange/bitswap/decision/tasks/task_queue.go fix(bitswap.decision.PRQ): if peers match, always return result of pri comparison fix(bitswap.decision.Engine): push to the queue before notifying TOCTOU bug 1. client notifies 2. worker checks (finds nil) 3. worker sleeps 3. client pushes (worker missed the update) test(PQ): improve documentation and add test test(bitswap.decision.Engine): handling received messages License: MIT Signed-off-by: Brian Tiger Chow <brian@perfmode.com>
This commit is contained in:
@ -13,12 +13,13 @@ import (
|
||||
// FWIW: At the time of this commit, including a timestamp in task increases
|
||||
// time cost of Push by 3%.
|
||||
func BenchmarkTaskQueuePush(b *testing.B) {
|
||||
q := newTaskQueue()
|
||||
q := newPRQ()
|
||||
peers := []peer.ID{
|
||||
testutil.RandPeerIDFatal(b),
|
||||
testutil.RandPeerIDFatal(b),
|
||||
testutil.RandPeerIDFatal(b),
|
||||
}
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
q.Push(wantlist.Entry{Key: util.Key(i), Priority: math.MaxInt32}, peers[i%len(peers)])
|
||||
}
|
||||
|
@ -59,7 +59,7 @@ type Engine struct {
|
||||
// peerRequestQueue is a priority queue of requests received from peers.
|
||||
// Requests are popped from the queue, packaged up, and placed in the
|
||||
// outbox.
|
||||
peerRequestQueue *taskQueue
|
||||
peerRequestQueue peerRequestQueue
|
||||
|
||||
// FIXME it's a bit odd for the client and the worker to both share memory
|
||||
// (both modify the peerRequestQueue) and also to communicate over the
|
||||
@ -82,7 +82,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
|
||||
e := &Engine{
|
||||
ledgerMap: make(map[peer.ID]*ledger),
|
||||
bs: bs,
|
||||
peerRequestQueue: newTaskQueue(),
|
||||
peerRequestQueue: newPRQ(),
|
||||
outbox: make(chan Envelope, sizeOutboxChan),
|
||||
workSignal: make(chan struct{}),
|
||||
}
|
||||
@ -180,8 +180,8 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
|
||||
log.Debug("wants", entry.Key, entry.Priority)
|
||||
l.Wants(entry.Key, entry.Priority)
|
||||
if exists, err := e.bs.Has(entry.Key); err == nil && exists {
|
||||
newWorkExists = true
|
||||
e.peerRequestQueue.Push(entry.Entry, p)
|
||||
newWorkExists = true
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -191,8 +191,8 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
|
||||
l.ReceivedBytes(len(block.Data))
|
||||
for _, l := range e.ledgerMap {
|
||||
if entry, ok := l.WantListContains(block.Key()); ok {
|
||||
newWorkExists = true
|
||||
e.peerRequestQueue.Push(entry, l.Partner)
|
||||
newWorkExists = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,17 +1,19 @@
|
||||
package decision
|
||||
|
||||
import (
|
||||
"math"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||
sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
|
||||
|
||||
dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
|
||||
blocks "github.com/jbenet/go-ipfs/blocks"
|
||||
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
|
||||
message "github.com/jbenet/go-ipfs/exchange/bitswap/message"
|
||||
peer "github.com/jbenet/go-ipfs/p2p/peer"
|
||||
testutil "github.com/jbenet/go-ipfs/util/testutil"
|
||||
)
|
||||
|
||||
type peerAndEngine struct {
|
||||
@ -19,18 +21,20 @@ type peerAndEngine struct {
|
||||
Engine *Engine
|
||||
}
|
||||
|
||||
func newPeerAndLedgermanager(idStr string) peerAndEngine {
|
||||
func newEngine(ctx context.Context, idStr string) peerAndEngine {
|
||||
return peerAndEngine{
|
||||
Peer: peer.ID(idStr),
|
||||
//Strategy: New(true),
|
||||
Engine: NewEngine(context.TODO(),
|
||||
blockstore.NewBlockstore(sync.MutexWrap(ds.NewMapDatastore()))),
|
||||
Engine: NewEngine(ctx,
|
||||
blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))),
|
||||
}
|
||||
}
|
||||
|
||||
func TestConsistentAccounting(t *testing.T) {
|
||||
sender := newPeerAndLedgermanager("Ernie")
|
||||
receiver := newPeerAndLedgermanager("Bert")
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
sender := newEngine(ctx, "Ernie")
|
||||
receiver := newEngine(ctx, "Bert")
|
||||
|
||||
// Send messages from Ernie to Bert
|
||||
for i := 0; i < 1000; i++ {
|
||||
@ -62,8 +66,10 @@ func TestConsistentAccounting(t *testing.T) {
|
||||
|
||||
func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {
|
||||
|
||||
sanfrancisco := newPeerAndLedgermanager("sf")
|
||||
seattle := newPeerAndLedgermanager("sea")
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
sanfrancisco := newEngine(ctx, "sf")
|
||||
seattle := newEngine(ctx, "sea")
|
||||
|
||||
m := message.New()
|
||||
|
||||
@ -91,3 +97,96 @@ func peerIsPartner(p peer.ID, e *Engine) bool {
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func TestOutboxClosedWhenEngineClosed(t *testing.T) {
|
||||
t.SkipNow() // TODO implement *Engine.Close
|
||||
e := NewEngine(context.Background(), blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())))
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
for _ = range e.Outbox() {
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
// e.Close()
|
||||
wg.Wait()
|
||||
if _, ok := <-e.Outbox(); ok {
|
||||
t.Fatal("channel should be closed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPartnerWantsThenCancels(t *testing.T) {
|
||||
alphabet := strings.Split("abcdefghijklmnopqrstuvwxyz", "")
|
||||
vowels := strings.Split("aeiou", "")
|
||||
|
||||
type testCase [][]string
|
||||
testcases := []testCase{
|
||||
testCase{
|
||||
alphabet, vowels,
|
||||
},
|
||||
testCase{
|
||||
alphabet, stringsComplement(alphabet, vowels),
|
||||
},
|
||||
}
|
||||
|
||||
for _, testcase := range testcases {
|
||||
set := testcase[0]
|
||||
cancels := testcase[1]
|
||||
keeps := stringsComplement(set, cancels)
|
||||
|
||||
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
|
||||
e := NewEngine(context.Background(), bs)
|
||||
partner := testutil.RandPeerIDFatal(t)
|
||||
for _, letter := range set {
|
||||
block := blocks.NewBlock([]byte(letter))
|
||||
bs.Put(block)
|
||||
}
|
||||
partnerWants(e, set, partner)
|
||||
partnerCancels(e, cancels, partner)
|
||||
assertPoppedInOrder(t, e, keeps)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func partnerWants(e *Engine, keys []string, partner peer.ID) {
|
||||
add := message.New()
|
||||
for i, letter := range keys {
|
||||
block := blocks.NewBlock([]byte(letter))
|
||||
add.AddEntry(block.Key(), math.MaxInt32-i)
|
||||
}
|
||||
e.MessageReceived(partner, add)
|
||||
}
|
||||
|
||||
func partnerCancels(e *Engine, keys []string, partner peer.ID) {
|
||||
cancels := message.New()
|
||||
for _, k := range keys {
|
||||
block := blocks.NewBlock([]byte(k))
|
||||
cancels.Cancel(block.Key())
|
||||
}
|
||||
e.MessageReceived(partner, cancels)
|
||||
}
|
||||
|
||||
func assertPoppedInOrder(t *testing.T, e *Engine, keys []string) {
|
||||
for _, k := range keys {
|
||||
envelope := <-e.Outbox()
|
||||
received := envelope.Message.Blocks()[0]
|
||||
expected := blocks.NewBlock([]byte(k))
|
||||
if received.Key() != expected.Key() {
|
||||
t.Fatal("received", string(received.Data), "expected", string(expected.Data))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func stringsComplement(set, subset []string) []string {
|
||||
m := make(map[string]struct{})
|
||||
for _, letter := range subset {
|
||||
m[letter] = struct{}{}
|
||||
}
|
||||
var complement []string
|
||||
for _, letter := range set {
|
||||
if _, exists := m[letter]; !exists {
|
||||
complement = append(complement, letter)
|
||||
}
|
||||
}
|
||||
return complement
|
||||
}
|
||||
|
134
exchange/bitswap/decision/peer_request_queue.go
Normal file
134
exchange/bitswap/decision/peer_request_queue.go
Normal file
@ -0,0 +1,134 @@
|
||||
package decision
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
pq "github.com/jbenet/go-ipfs/exchange/bitswap/decision/pq"
|
||||
wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
|
||||
peer "github.com/jbenet/go-ipfs/p2p/peer"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
|
||||
type peerRequestQueue interface {
|
||||
// Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty.
|
||||
Pop() *peerRequestTask
|
||||
Push(entry wantlist.Entry, to peer.ID)
|
||||
Remove(k u.Key, p peer.ID)
|
||||
// NB: cannot expose simply expose taskQueue.Len because trashed elements
|
||||
// may exist. These trashed elements should not contribute to the count.
|
||||
}
|
||||
|
||||
func newPRQ() peerRequestQueue {
|
||||
return &prq{
|
||||
taskMap: make(map[string]*peerRequestTask),
|
||||
taskQueue: pq.New(wrapCmp(V1)),
|
||||
}
|
||||
}
|
||||
|
||||
var _ peerRequestQueue = &prq{}
|
||||
|
||||
// TODO: at some point, the strategy needs to plug in here
|
||||
// to help decide how to sort tasks (on add) and how to select
|
||||
// tasks (on getnext). For now, we are assuming a dumb/nice strategy.
|
||||
type prq struct {
|
||||
lock sync.Mutex
|
||||
taskQueue pq.PQ
|
||||
taskMap map[string]*peerRequestTask
|
||||
}
|
||||
|
||||
// Push currently adds a new peerRequestTask to the end of the list
|
||||
func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
|
||||
tl.lock.Lock()
|
||||
defer tl.lock.Unlock()
|
||||
if task, ok := tl.taskMap[taskKey(to, entry.Key)]; ok {
|
||||
task.Entry.Priority = entry.Priority
|
||||
tl.taskQueue.Update(task.index)
|
||||
return
|
||||
}
|
||||
task := &peerRequestTask{
|
||||
Entry: entry,
|
||||
Target: to,
|
||||
created: time.Now(),
|
||||
}
|
||||
tl.taskQueue.Push(task)
|
||||
tl.taskMap[task.Key()] = task
|
||||
}
|
||||
|
||||
// Pop 'pops' the next task to be performed. Returns nil if no task exists.
|
||||
func (tl *prq) Pop() *peerRequestTask {
|
||||
tl.lock.Lock()
|
||||
defer tl.lock.Unlock()
|
||||
var out *peerRequestTask
|
||||
for tl.taskQueue.Len() > 0 {
|
||||
out = tl.taskQueue.Pop().(*peerRequestTask)
|
||||
delete(tl.taskMap, out.Key())
|
||||
if out.trash {
|
||||
continue // discarding tasks that have been removed
|
||||
}
|
||||
break // and return |out|
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// Remove removes a task from the queue
|
||||
func (tl *prq) Remove(k u.Key, p peer.ID) {
|
||||
tl.lock.Lock()
|
||||
t, ok := tl.taskMap[taskKey(p, k)]
|
||||
if ok {
|
||||
// remove the task "lazily"
|
||||
// simply mark it as trash, so it'll be dropped when popped off the
|
||||
// queue.
|
||||
t.trash = true
|
||||
}
|
||||
tl.lock.Unlock()
|
||||
}
|
||||
|
||||
type peerRequestTask struct {
|
||||
Entry wantlist.Entry
|
||||
Target peer.ID // required
|
||||
|
||||
// trash in a book-keeping field
|
||||
trash bool
|
||||
// created marks the time that the task was added to the queue
|
||||
created time.Time
|
||||
index int // book-keeping field used by the pq container
|
||||
}
|
||||
|
||||
// Key uniquely identifies a task.
|
||||
func (t *peerRequestTask) Key() string {
|
||||
return taskKey(t.Target, t.Entry.Key)
|
||||
}
|
||||
|
||||
func (t *peerRequestTask) Index() int {
|
||||
return t.index
|
||||
}
|
||||
|
||||
func (t *peerRequestTask) SetIndex(i int) {
|
||||
t.index = i
|
||||
}
|
||||
|
||||
// taskKey returns a key that uniquely identifies a task.
|
||||
func taskKey(p peer.ID, k u.Key) string {
|
||||
return string(p.String() + k.String())
|
||||
}
|
||||
|
||||
// FIFO is a basic task comparator that returns tasks in the order created.
|
||||
var FIFO = func(a, b *peerRequestTask) bool {
|
||||
return a.created.Before(b.created)
|
||||
}
|
||||
|
||||
// V1 respects the target peer's wantlist priority. For tasks involving
|
||||
// different peers, the oldest task is prioritized.
|
||||
var V1 = func(a, b *peerRequestTask) bool {
|
||||
if a.Target == b.Target {
|
||||
return a.Entry.Priority > b.Entry.Priority
|
||||
}
|
||||
return FIFO(a, b)
|
||||
}
|
||||
|
||||
func wrapCmp(f func(a, b *peerRequestTask) bool) func(a, b pq.Elem) bool {
|
||||
return func(a, b pq.Elem) bool {
|
||||
return f(a.(*peerRequestTask), b.(*peerRequestTask))
|
||||
}
|
||||
}
|
56
exchange/bitswap/decision/peer_request_queue_test.go
Normal file
56
exchange/bitswap/decision/peer_request_queue_test.go
Normal file
@ -0,0 +1,56 @@
|
||||
package decision
|
||||
|
||||
import (
|
||||
"math"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
|
||||
"github.com/jbenet/go-ipfs/util"
|
||||
"github.com/jbenet/go-ipfs/util/testutil"
|
||||
)
|
||||
|
||||
func TestPushPop(t *testing.T) {
|
||||
prq := newPRQ()
|
||||
partner := testutil.RandPeerIDFatal(t)
|
||||
alphabet := strings.Split("abcdefghijklmnopqrstuvwxyz", "")
|
||||
vowels := strings.Split("aeiou", "")
|
||||
consonants := func() []string {
|
||||
var out []string
|
||||
for _, letter := range alphabet {
|
||||
skip := false
|
||||
for _, vowel := range vowels {
|
||||
if letter == vowel {
|
||||
skip = true
|
||||
}
|
||||
}
|
||||
if !skip {
|
||||
out = append(out, letter)
|
||||
}
|
||||
}
|
||||
return out
|
||||
}()
|
||||
sort.Strings(alphabet)
|
||||
sort.Strings(vowels)
|
||||
sort.Strings(consonants)
|
||||
|
||||
// add a bunch of blocks. cancel some. drain the queue. the queue should only have the kept entries
|
||||
|
||||
for _, index := range rand.Perm(len(alphabet)) { // add blocks for all letters
|
||||
letter := alphabet[index]
|
||||
t.Log(partner.String())
|
||||
prq.Push(wantlist.Entry{Key: util.Key(letter), Priority: math.MaxInt32 - index}, partner)
|
||||
}
|
||||
for _, consonant := range consonants {
|
||||
prq.Remove(util.Key(consonant), partner)
|
||||
}
|
||||
|
||||
for _, expected := range vowels {
|
||||
received := prq.Pop().Entry.Key
|
||||
if received != util.Key(expected) {
|
||||
t.Fatal("received", string(received), "expected", string(expected))
|
||||
}
|
||||
}
|
||||
}
|
105
exchange/bitswap/decision/pq/container.go
Normal file
105
exchange/bitswap/decision/pq/container.go
Normal file
@ -0,0 +1,105 @@
|
||||
package pq
|
||||
|
||||
import "container/heap"
|
||||
|
||||
// PQ is a basic priority queue.
|
||||
type PQ interface {
|
||||
// Push adds the ele
|
||||
Push(Elem)
|
||||
// Pop returns the highest priority Elem in PQ.
|
||||
Pop() Elem
|
||||
// Len returns the number of elements in the PQ.
|
||||
Len() int
|
||||
// Update `fixes` the PQ.
|
||||
Update(index int)
|
||||
|
||||
// TODO explain why this interface should not be extended
|
||||
// It does not support Remove. This is because...
|
||||
}
|
||||
|
||||
// Elem describes elements that can be added to the PQ. Clients must implement
|
||||
// this interface.
|
||||
type Elem interface {
|
||||
// SetIndex stores the int index.
|
||||
SetIndex(int)
|
||||
// Index returns the last given by SetIndex(int).
|
||||
Index() int
|
||||
}
|
||||
|
||||
// ElemComparator returns true if pri(a) > pri(b)
|
||||
type ElemComparator func(a, b Elem) bool
|
||||
|
||||
// New creates a PQ with a client-supplied comparator.
|
||||
func New(cmp ElemComparator) PQ {
|
||||
q := &wrapper{heapinterface{
|
||||
elems: make([]Elem, 0),
|
||||
cmp: cmp,
|
||||
}}
|
||||
heap.Init(&q.heapinterface)
|
||||
return q
|
||||
}
|
||||
|
||||
// wrapper exists because we cannot re-define Push. We want to expose
|
||||
// Push(Elem) but heap.Interface requires Push(interface{})
|
||||
type wrapper struct {
|
||||
heapinterface
|
||||
}
|
||||
|
||||
var _ PQ = &wrapper{}
|
||||
|
||||
func (w *wrapper) Push(e Elem) {
|
||||
heap.Push(&w.heapinterface, e)
|
||||
}
|
||||
|
||||
func (w *wrapper) Pop() Elem {
|
||||
return heap.Pop(&w.heapinterface).(Elem)
|
||||
}
|
||||
|
||||
func (w *wrapper) Update(index int) {
|
||||
heap.Fix(&w.heapinterface, index)
|
||||
}
|
||||
|
||||
// heapinterface handles dirty low-level details of managing the priority queue.
|
||||
type heapinterface struct {
|
||||
elems []Elem
|
||||
cmp ElemComparator
|
||||
}
|
||||
|
||||
var _ heap.Interface = &heapinterface{}
|
||||
|
||||
// public interface
|
||||
|
||||
func (q *heapinterface) Len() int {
|
||||
return len(q.elems)
|
||||
}
|
||||
|
||||
// Less delegates the decision to the comparator
|
||||
func (q *heapinterface) Less(i, j int) bool {
|
||||
return q.cmp(q.elems[i], q.elems[j])
|
||||
}
|
||||
|
||||
// Swap swaps the elements with indexes i and j.
|
||||
func (q *heapinterface) Swap(i, j int) {
|
||||
q.elems[i], q.elems[j] = q.elems[j], q.elems[i]
|
||||
q.elems[i].SetIndex(i)
|
||||
q.elems[j].SetIndex(j)
|
||||
}
|
||||
|
||||
// Note that Push and Pop in this interface are for package heap's
|
||||
// implementation to call. To add and remove things from the heap, wrap with
|
||||
// the pq struct to call heap.Push and heap.Pop.
|
||||
|
||||
func (q *heapinterface) Push(x interface{}) { // where to put the elem?
|
||||
t := x.(Elem)
|
||||
t.SetIndex(len(q.elems))
|
||||
q.elems = append(q.elems, t)
|
||||
}
|
||||
|
||||
func (q *heapinterface) Pop() interface{} {
|
||||
old := q.elems
|
||||
n := len(old)
|
||||
elem := old[n-1] // remove the last
|
||||
elem.SetIndex(-1) // for safety // FIXME why?
|
||||
q.elems = old[0 : n-1] // shrink
|
||||
return elem
|
||||
}
|
85
exchange/bitswap/decision/pq/container_test.go
Normal file
85
exchange/bitswap/decision/pq/container_test.go
Normal file
@ -0,0 +1,85 @@
|
||||
package pq
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"testing"
|
||||
)
|
||||
|
||||
type TestElem struct {
|
||||
Key string
|
||||
Priority int
|
||||
index int
|
||||
}
|
||||
|
||||
func (e *TestElem) Index() int {
|
||||
return e.index
|
||||
}
|
||||
|
||||
func (e *TestElem) SetIndex(i int) {
|
||||
e.index = i
|
||||
}
|
||||
|
||||
var PriorityComparator = func(i, j Elem) bool {
|
||||
return i.(*TestElem).Priority > j.(*TestElem).Priority
|
||||
}
|
||||
|
||||
func TestQueuesReturnTypeIsSameAsParameterToPush(t *testing.T) {
|
||||
q := New(PriorityComparator)
|
||||
expectedKey := "foo"
|
||||
elem := &TestElem{Key: expectedKey}
|
||||
q.Push(elem)
|
||||
switch v := q.Pop().(type) {
|
||||
case *TestElem:
|
||||
if v.Key != expectedKey {
|
||||
t.Fatal("the key doesn't match the pushed value")
|
||||
}
|
||||
default:
|
||||
t.Fatal("the queue is not casting values appropriately")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCorrectnessOfPop(t *testing.T) {
|
||||
q := New(PriorityComparator)
|
||||
tasks := []TestElem{
|
||||
TestElem{Key: "a", Priority: 9},
|
||||
TestElem{Key: "b", Priority: 4},
|
||||
TestElem{Key: "c", Priority: 3},
|
||||
TestElem{Key: "d", Priority: 0},
|
||||
TestElem{Key: "e", Priority: 6},
|
||||
}
|
||||
for _, e := range tasks {
|
||||
q.Push(&e)
|
||||
}
|
||||
var priorities []int
|
||||
for q.Len() > 0 {
|
||||
i := q.Pop().(*TestElem).Priority
|
||||
t.Log("popped %v", i)
|
||||
priorities = append(priorities, i)
|
||||
}
|
||||
if !sort.IntsAreSorted(priorities) {
|
||||
t.Fatal("the values were not returned in sorted order")
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdate(t *testing.T) {
|
||||
t.Log(`
|
||||
Add 3 elements.
|
||||
Update the highest priority element to have the lowest priority and fix the queue.
|
||||
It should come out last.`)
|
||||
q := New(PriorityComparator)
|
||||
lowest := &TestElem{Key: "originallyLowest", Priority: 1}
|
||||
middle := &TestElem{Key: "originallyMiddle", Priority: 2}
|
||||
highest := &TestElem{Key: "toBeUpdated", Priority: 3}
|
||||
q.Push(middle)
|
||||
q.Push(highest)
|
||||
q.Push(lowest)
|
||||
if q.Pop().(*TestElem).Key != highest.Key {
|
||||
t.Fatal("popped element doesn't have the highest priority")
|
||||
}
|
||||
q.Push(highest) // re-add the popped element
|
||||
highest.Priority = 0 // update the PQ
|
||||
q.Update(highest.Index()) // fix the PQ
|
||||
if q.Pop().(*TestElem).Key != middle.Key {
|
||||
t.Fatal("middle element should now have the highest priority")
|
||||
}
|
||||
}
|
@ -1,93 +0,0 @@
|
||||
package decision
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
|
||||
peer "github.com/jbenet/go-ipfs/p2p/peer"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
|
||||
// TODO: at some point, the strategy needs to plug in here
|
||||
// to help decide how to sort tasks (on add) and how to select
|
||||
// tasks (on getnext). For now, we are assuming a dumb/nice strategy.
|
||||
type taskQueue struct {
|
||||
// TODO: make this into a priority queue
|
||||
lock sync.Mutex
|
||||
tasks []*task
|
||||
taskmap map[string]*task
|
||||
}
|
||||
|
||||
func newTaskQueue() *taskQueue {
|
||||
return &taskQueue{
|
||||
taskmap: make(map[string]*task),
|
||||
}
|
||||
}
|
||||
|
||||
type task struct {
|
||||
Entry wantlist.Entry
|
||||
Target peer.ID
|
||||
Trash bool // TODO make private
|
||||
|
||||
created time.Time
|
||||
}
|
||||
|
||||
func (t *task) String() string {
|
||||
return fmt.Sprintf("<Task %s, %s, %v>", t.Target, t.Entry.Key, t.Trash)
|
||||
}
|
||||
|
||||
// Push currently adds a new task to the end of the list
|
||||
func (tl *taskQueue) Push(entry wantlist.Entry, to peer.ID) {
|
||||
tl.lock.Lock()
|
||||
defer tl.lock.Unlock()
|
||||
if task, ok := tl.taskmap[taskKey(to, entry.Key)]; ok {
|
||||
// TODO: when priority queue is implemented,
|
||||
// rearrange this task
|
||||
task.Entry.Priority = entry.Priority
|
||||
return
|
||||
}
|
||||
task := &task{
|
||||
Entry: entry,
|
||||
Target: to,
|
||||
created: time.Now(),
|
||||
}
|
||||
tl.tasks = append(tl.tasks, task)
|
||||
tl.taskmap[taskKey(to, entry.Key)] = task
|
||||
}
|
||||
|
||||
// Pop 'pops' the next task to be performed. Returns nil no task exists.
|
||||
func (tl *taskQueue) Pop() *task {
|
||||
tl.lock.Lock()
|
||||
defer tl.lock.Unlock()
|
||||
var out *task
|
||||
for len(tl.tasks) > 0 {
|
||||
// TODO: instead of zero, use exponential distribution
|
||||
// it will help reduce the chance of receiving
|
||||
// the same block from multiple peers
|
||||
out = tl.tasks[0]
|
||||
tl.tasks = tl.tasks[1:]
|
||||
delete(tl.taskmap, taskKey(out.Target, out.Entry.Key))
|
||||
if out.Trash {
|
||||
continue // discarding tasks that have been removed
|
||||
}
|
||||
break // and return |out|
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// Remove lazily removes a task from the queue
|
||||
func (tl *taskQueue) Remove(k u.Key, p peer.ID) {
|
||||
tl.lock.Lock()
|
||||
t, ok := tl.taskmap[taskKey(p, k)]
|
||||
if ok {
|
||||
t.Trash = true
|
||||
}
|
||||
tl.lock.Unlock()
|
||||
}
|
||||
|
||||
// taskKey returns a key that uniquely identifies a task.
|
||||
func taskKey(p peer.ID, k u.Key) string {
|
||||
return string(p) + string(k)
|
||||
}
|
Reference in New Issue
Block a user