add files i forgot to last night
routing/dht/mes_listener.go (new file, 116 lines)
@@ -0,0 +1,116 @@
package dht

import (
	"sync"
	"time"

	swarm "github.com/jbenet/go-ipfs/swarm"
	u "github.com/jbenet/go-ipfs/util"
)

type MesListener struct {
	listeners map[uint64]*listenInfo
	haltchan  chan struct{}
	unlist    chan uint64
	nlist     chan *listenInfo
	send      chan *respMes
}

// The listen info struct holds information about a message that is being waited for
type listenInfo struct {
	// Responses matching the listen ID will be sent through resp
	resp chan *swarm.Message

	// count is the number of responses to listen for
	count int

	// eol is the time at which this listener will expire
	eol time.Time

	// sendLock is used to prevent conditions where we try to send on the resp
	// channel as it is being closed by a timeout in another thread
	sendLock sync.Mutex

	closed bool

	id uint64
}

func NewMesListener() *MesListener {
	ml := new(MesListener)
	ml.haltchan = make(chan struct{})
	ml.listeners = make(map[uint64]*listenInfo)
	ml.nlist = make(chan *listenInfo, 16)
	ml.send = make(chan *respMes, 16)
	ml.unlist = make(chan uint64, 16)
	go ml.run()
	return ml
}

func (ml *MesListener) Listen(id uint64, count int, timeout time.Duration) <-chan *swarm.Message {
	li := new(listenInfo)
	li.count = count
	li.eol = time.Now().Add(timeout)
	li.resp = make(chan *swarm.Message, count)
	li.id = id
	ml.nlist <- li
	return li.resp
}

func (ml *MesListener) Unlisten(id uint64) {
	ml.unlist <- id
}

type respMes struct {
	id  uint64
	mes *swarm.Message
}

func (ml *MesListener) Respond(id uint64, mes *swarm.Message) {
	ml.send <- &respMes{
		id:  id,
		mes: mes,
	}
}

func (ml *MesListener) Halt() {
	ml.haltchan <- struct{}{}
}

func (ml *MesListener) run() {
	for {
		select {
		case <-ml.haltchan:
			return
		case id := <-ml.unlist:
			trg, ok := ml.listeners[id]
			if !ok {
				continue
			}
			close(trg.resp)
			delete(ml.listeners, id)
		case li := <-ml.nlist:
			ml.listeners[li.id] = li
		case s := <-ml.send:
			trg, ok := ml.listeners[s.id]
			if !ok {
				u.DOut("Send with no listener.")
				continue
			}

			if time.Now().After(trg.eol) {
				close(trg.resp)
				delete(ml.listeners, s.id)
				continue
			}

			trg.resp <- s.mes
			trg.count--

			if trg.count == 0 {
				close(trg.resp)
				delete(ml.listeners, s.id)
			}
		}
	}
}
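
Aside (not part of the commit): a minimal sketch of how a caller might drive this listener. A requester registers interest in a message ID before its request goes out, and the network layer later hands any matching reply to Respond. The function name exerciseMesListener and the empty swarm.Message passed to Respond are hypothetical; this assumes the go-ipfs packages of this era are importable.

	package dht

	import (
		"fmt"
		"time"

		swarm "github.com/jbenet/go-ipfs/swarm"
	)

	// exerciseMesListener waits for one response to message ID 42,
	// with a one-second deadline, then shuts the listener down.
	func exerciseMesListener() {
		ml := NewMesListener()
		defer ml.Halt()

		id := uint64(42)
		resp := ml.Listen(id, 1, time.Second)

		// Elsewhere, the network layer would call Respond when a
		// reply with a matching ID arrives.
		go ml.Respond(id, new(swarm.Message))

		if mes, ok := <-resp; ok {
			fmt.Println("got response:", mes)
		} else {
			fmt.Println("listener closed without a response")
		}
	}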

routing/dht/routing.go (modified; filename inferred from the hunk context)

@@ -21,6 +21,12 @@ import (
 // Pool size is the number of nodes used for group find/set RPC calls
 var PoolSize = 6
 
+// We put the 'K' in kademlia!
+var KValue = 10
+
+// Its in the paper, i swear
+var AlphaValue = 3
+
 // TODO: determine a way of creating and managing message IDs
 func GenerateMessageID() uint64 {
 	//return (uint64(rand.Uint32()) << 32) & uint64(rand.Uint32())
@@ -35,24 +41,25 @@ func GenerateMessageID() uint64 {
 // This is the top level "Store" operation of the DHT
 func (s *IpfsDHT) PutValue(key u.Key, value []byte) {
 	complete := make(chan struct{})
+	count := 0
 	for _, route := range s.routes {
-		p := route.NearestPeer(kb.ConvertKey(key))
-		if p == nil {
-			s.network.Error(kb.ErrLookupFailure)
-			go func() {
-				complete <- struct{}{}
-			}()
-			continue
-		}
-		go func() {
-			err := s.putValueToNetwork(p, string(key), value)
-			if err != nil {
-				s.network.Error(err)
+		peers := route.NearestPeers(kb.ConvertKey(key), KValue)
+		for _, p := range peers {
+			if p == nil {
+				s.network.Error(kb.ErrLookupFailure)
+				continue
 			}
-			complete <- struct{}{}
-		}()
+			count++
+			go func(sp *peer.Peer) {
+				err := s.putValueToNetwork(sp, string(key), value)
+				if err != nil {
+					s.network.Error(err)
+				}
+				complete <- struct{}{}
+			}(p)
+		}
 	}
-	for _, _ = range s.routes {
+	for i := 0; i < count; i++ {
 		<-complete
 	}
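
Aside (not part of the commit): the rewritten PutValue fans one store request out to up to KValue peers per routing table and counts each goroutine it launches, so the final loop can drain exactly that many completion signals. A self-contained sketch of the same counted-completion pattern, with a stub storeOnPeer standing in for putValueToNetwork:

	package main

	import "fmt"

	// storeOnPeer stands in for the real network write (putValueToNetwork).
	func storeOnPeer(peer int) error {
		fmt.Println("stored on peer", peer)
		return nil
	}

	func main() {
		peers := []int{1, 2, 3}
		complete := make(chan struct{})

		// count records how many goroutines were launched, so the
		// drain loop below waits for exactly that many completions.
		count := 0
		for _, p := range peers {
			count++
			go func(sp int) {
				if err := storeOnPeer(sp); err != nil {
					fmt.Println("error:", err)
				}
				complete <- struct{}{}
			}(p) // pass the loop variable so each goroutine gets its own copy
		}

		for i := 0; i < count; i++ {
			<-complete
		}
	}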
@@ -150,15 +157,13 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
 
 	c := counter{}
 
-	// This limit value is referred to as k in the kademlia paper
-	limit := 20
 	count := 0
 	go func() {
 		for {
 			select {
 			case p := <-npeer_chan:
 				count++
-				if count >= limit {
+				if count >= KValue {
 					break
 				}
 				c.Increment()
@@ -194,7 +199,7 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
 
 			for _, np := range peers {
 				// TODO: filter out peers that arent closer
-				if !pset.Contains(np) && pset.Size() < limit {
+				if !pset.Contains(np) && pset.Size() < KValue {
 					pset.Add(np) //This is racey... make a single function to do operation
 					npeer_chan <- np
 				}
@@ -204,8 +209,7 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
 		}
 	}
 
-	concurFactor := 3
-	for i := 0; i < concurFactor; i++ {
+	for i := 0; i < AlphaValue; i++ {
 		go process()
 	}
 
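
Aside (not part of the commit): swapping the local concurFactor for the package-level AlphaValue ties the number of concurrent lookup workers to Kademlia's α parameter. The shape of the code is a fixed pool of identical workers draining a shared queue; a self-contained sketch:

	package main

	import (
		"fmt"
		"sync"
	)

	var AlphaValue = 3 // α: concurrent requests in flight per lookup

	func main() {
		work := make(chan string, 3)
		work <- "peerA"
		work <- "peerB"
		work <- "peerC"
		close(work)

		var wg sync.WaitGroup

		// process drains the work queue; the DHT runs AlphaValue copies of it.
		process := func() {
			defer wg.Done()
			for p := range work {
				fmt.Println("querying", p)
			}
		}

		for i := 0; i < AlphaValue; i++ {
			wg.Add(1)
			go process()
		}
		wg.Wait()
	}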