From a21c1b6b6229e8fc01defca2f57141194d38aa1a Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Wed, 17 Sep 2014 02:58:10 -0700 Subject: [PATCH] PeerQueue (based on XOR distance metric) --- peer/queue/distance.go | 84 ++++++++++++++++++++++++++++++++++++++++ peer/queue/interface.go | 15 +++++++ peer/queue/queue_test.go | 62 +++++++++++++++++++++++++++++ 3 files changed, 161 insertions(+) create mode 100644 peer/queue/distance.go create mode 100644 peer/queue/interface.go create mode 100644 peer/queue/queue_test.go diff --git a/peer/queue/distance.go b/peer/queue/distance.go new file mode 100644 index 000000000..0a53adb04 --- /dev/null +++ b/peer/queue/distance.go @@ -0,0 +1,84 @@ +package queue + +import ( + "container/heap" + "math/big" + + peer "github.com/jbenet/go-ipfs/peer" + ks "github.com/jbenet/go-ipfs/routing/keyspace" + u "github.com/jbenet/go-ipfs/util" +) + +// peerDistance tracks a peer and its distance to something else. +type peerDistance struct { + // the peer + peer *peer.Peer + + // big.Int for XOR metric + distance *big.Int +} + +// distancePQ implements heap.Interface and PeerQueue +type distancePQ struct { + // from is the Key this PQ measures against + from ks.Key + + // peers is a heap of peerDistance items + peers []*peerDistance +} + +func (pq distancePQ) Len() int { + return len(pq.peers) +} + +func (pq distancePQ) Less(i, j int) bool { + return -1 == pq.peers[i].distance.Cmp(pq.peers[j].distance) +} + +func (pq distancePQ) Swap(i, j int) { + p := pq.peers + p[i], p[j] = p[j], p[i] +} + +func (pq *distancePQ) Push(x interface{}) { + item := x.(*peerDistance) + pq.peers = append(pq.peers, item) +} + +func (pq *distancePQ) Pop() interface{} { + old := pq.peers + n := len(old) + item := old[n-1] + pq.peers = old[0 : n-1] + return item +} + +func (pq *distancePQ) Enqueue(p *peer.Peer) { + distance := ks.XORKeySpace.Key(p.ID).Distance(pq.from) + + heap.Push(pq, &peerDistance{ + peer: p, + distance: distance, + }) +} + +func (pq *distancePQ) Dequeue() *peer.Peer { + if len(pq.peers) < 1 { + panic("called Dequeue on an empty PeerQueue") + // will panic internally anyway, but we can help debug here + } + + o := heap.Pop(pq) + p := o.(*peerDistance) + return p.peer +} + +// NewXORDistancePQ returns a PeerQueue which maintains its peers sorted +// in terms of their distances to each other in an XORKeySpace (i.e. using +// XOR as a metric of distance). +func NewXORDistancePQ(fromKey u.Key) PeerQueue { + return &distancePQ{ + from: ks.XORKeySpace.Key([]byte(fromKey)), + peers: []*peerDistance{}, + } +} diff --git a/peer/queue/interface.go b/peer/queue/interface.go new file mode 100644 index 000000000..180cc9ce9 --- /dev/null +++ b/peer/queue/interface.go @@ -0,0 +1,15 @@ +package queue + +import peer "github.com/jbenet/go-ipfs/peer" + +// PeerQueue maintains a set of peers ordered according to a metric. +// Implementations of PeerQueue could order peers based on distances along +// a KeySpace, latency measurements, trustworthiness, reputation, etc. +type PeerQueue interface { + + // Enqueue adds this node to the queue. + Enqueue(*peer.Peer) + + // Dequeue retrieves the highest (smallest int) priority node + Dequeue() *peer.Peer +} diff --git a/peer/queue/queue_test.go b/peer/queue/queue_test.go new file mode 100644 index 000000000..f9a843396 --- /dev/null +++ b/peer/queue/queue_test.go @@ -0,0 +1,62 @@ +package queue + +import ( + "testing" + + peer "github.com/jbenet/go-ipfs/peer" + u "github.com/jbenet/go-ipfs/util" +) + +func newPeer(id string) *peer.Peer { + return &peer.Peer{ID: peer.ID(id)} +} + +func TestPeerstore(t *testing.T) { + + p1 := newPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31") + p2 := newPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32") + p3 := newPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33") + p4 := newPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a34") + p5 := newPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31") + + // these are the peer.IDs' XORKeySpace Key values: + // [228 47 151 130 156 102 222 232 218 31 132 94 170 208 80 253 120 103 55 35 91 237 48 157 81 245 57 247 66 150 9 40] + // [26 249 85 75 54 49 25 30 21 86 117 62 85 145 48 175 155 194 210 216 58 14 241 143 28 209 129 144 122 28 163 6] + // [78 135 26 216 178 181 224 181 234 117 2 248 152 115 255 103 244 34 4 152 193 88 9 225 8 127 216 158 226 8 236 246] + // [125 135 124 6 226 160 101 94 192 57 39 12 18 79 121 140 190 154 147 55 44 83 101 151 63 255 94 179 51 203 241 51] + + pq := NewXORDistancePQ(u.Key("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31")) + pq.Enqueue(p3) + pq.Enqueue(p1) + pq.Enqueue(p2) + pq.Enqueue(p4) + pq.Enqueue(p5) + pq.Enqueue(p1) + + // should come out as: p1, p4, p3, p2 + + if d := pq.Dequeue(); d != p1 && d != p5 { + t.Error("ordering failed") + } + + if d := pq.Dequeue(); d != p1 && d != p5 { + t.Error("ordering failed") + } + + if d := pq.Dequeue(); d != p1 && d != p5 { + t.Error("ordering failed") + } + + if pq.Dequeue() != p4 { + t.Error("ordering failed") + } + + if pq.Dequeue() != p3 { + t.Error("ordering failed") + } + + if pq.Dequeue() != p2 { + t.Error("ordering failed") + } + +}