diff --git a/routing/dht/mes_listener.go b/routing/dht/mes_listener.go new file mode 100644 index 000000000..2fcd99fc1 --- /dev/null +++ b/routing/dht/mes_listener.go @@ -0,0 +1,116 @@ +package dht + +import ( + "sync" + "time" + + swarm "github.com/jbenet/go-ipfs/swarm" + u "github.com/jbenet/go-ipfs/util" +) + +type MesListener struct { + listeners map[uint64]*listenInfo + haltchan chan struct{} + unlist chan uint64 + nlist chan *listenInfo + send chan *respMes +} + +// The listen info struct holds information about a message that is being waited for +type listenInfo struct { + // Responses matching the listen ID will be sent through resp + resp chan *swarm.Message + + // count is the number of responses to listen for + count int + + // eol is the time at which this listener will expire + eol time.Time + + // sendlock is used to prevent conditions where we try to send on the resp + // channel as its being closed by a timeout in another thread + sendLock sync.Mutex + + closed bool + + id uint64 +} + +func NewMesListener() *MesListener { + ml := new(MesListener) + ml.haltchan = make(chan struct{}) + ml.listeners = make(map[uint64]*listenInfo) + ml.nlist = make(chan *listenInfo, 16) + ml.send = make(chan *respMes, 16) + ml.unlist = make(chan uint64, 16) + go ml.run() + return ml +} + +func (ml *MesListener) Listen(id uint64, count int, timeout time.Duration) <-chan *swarm.Message { + li := new(listenInfo) + li.count = count + li.eol = time.Now().Add(timeout) + li.resp = make(chan *swarm.Message, count) + li.id = id + ml.nlist <- li + return li.resp +} + +func (ml *MesListener) Unlisten(id uint64) { + ml.unlist <- id +} + +type respMes struct { + id uint64 + mes *swarm.Message +} + +func (ml *MesListener) Respond(id uint64, mes *swarm.Message) { + ml.send <- &respMes{ + id: id, + mes: mes, + } +} + +func (ml *MesListener) Halt() { + ml.haltchan <- struct{}{} +} + +func (ml *MesListener) run() { + for { + select { + case <-ml.haltchan: + return + case id := <-ml.unlist: + trg, ok := ml.listeners[id] + if !ok { + continue + } + close(trg.resp) + delete(ml.listeners, id) + case li := <-ml.nlist: + ml.listeners[li.id] = li + case s := <-ml.send: + trg, ok := ml.listeners[s.id] + if !ok { + u.DOut("Send with no listener.") + continue + } + + if time.Now().After(trg.eol) { + close(trg.resp) + delete(ml.listeners, s.id) + continue + } + + trg.resp <- s.mes + trg.count-- + + if trg.count == 0 { + close(trg.resp) + delete(ml.listeners, s.id) + } + } + } +} diff --git a/routing/dht/routing.go b/routing/dht/routing.go index e56a54e0c..309962a93 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -21,6 +21,12 @@ import ( // Pool size is the number of nodes used for group find/set RPC calls var PoolSize = 6 +// We put the 'K' in kademlia! +var KValue = 10 + +// Its in the paper, i swear +var AlphaValue = 3 + // TODO: determine a way of creating and managing message IDs func GenerateMessageID() uint64 { //return (uint64(rand.Uint32()) << 32) & uint64(rand.Uint32()) @@ -35,24 +41,25 @@ func GenerateMessageID() uint64 { // This is the top level "Store" operation of the DHT func (s *IpfsDHT) PutValue(key u.Key, value []byte) { complete := make(chan struct{}) + count := 0 for _, route := range s.routes { - p := route.NearestPeer(kb.ConvertKey(key)) - if p == nil { - s.network.Error(kb.ErrLookupFailure) - go func() { - complete <- struct{}{} - }() - continue - } - go func() { - err := s.putValueToNetwork(p, string(key), value) - if err != nil { - s.network.Error(err) + peers := route.NearestPeers(kb.ConvertKey(key), KValue) + for _, p := range peers { + if p == nil { + s.network.Error(kb.ErrLookupFailure) + continue } - complete <- struct{}{} - }() + count++ + go func(sp *peer.Peer) { + err := s.putValueToNetwork(sp, string(key), value) + if err != nil { + s.network.Error(err) + } + complete <- struct{}{} + }(p) + } } - for _, _ = range s.routes { + for i := 0; i < count; i++ { <-complete } } @@ -150,15 +157,13 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { c := counter{} - // This limit value is referred to as k in the kademlia paper - limit := 20 count := 0 go func() { for { select { case p := <-npeer_chan: count++ - if count >= limit { + if count >= KValue { break } c.Increment() @@ -194,7 +199,7 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { for _, np := range peers { // TODO: filter out peers that arent closer - if !pset.Contains(np) && pset.Size() < limit { + if !pset.Contains(np) && pset.Size() < KValue { pset.Add(np) //This is racey... make a single function to do operation npeer_chan <- np } @@ -204,8 +209,7 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { } } - concurFactor := 3 - for i := 0; i < concurFactor; i++ { + for i := 0; i < AlphaValue; i++ { go process() }