mirror of https://github.com/ipfs/kubo.git

get implementation according to kademlia spec.

Jeromy
2014-08-15 09:39:38 -07:00
parent 60d061cb49
commit b7a882be89
4 changed files with 141 additions and 46 deletions
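
For orientation before the per-file diffs: the reworked GetValue fans the Kademlia GET out to a few concurrent workers (the commit uses a concurrency factor of 3 and caps the walk at 20 peers, the k of the Kademlia paper), feeding the closer peers returned by each RPC back into a shared queue until a value arrives or the overall timeout fires. Below is a minimal, runnable sketch of that pattern, not the repository's code; query, lookup, and the toy routing maps are hypothetical stand-ins.

package main

import (
	"errors"
	"fmt"
	"time"
)

// Toy routing data: who knows whom, and who holds the value.
var closerPeers = map[string][]string{"A": {"B", "C"}, "B": {"D"}}
var stored = map[string][]byte{"D": []byte("hello")}

// query stands in for one "get value" RPC: it returns the value if the
// peer has it, otherwise a list of closer peers to try next.
func query(p string) ([]byte, []string) { return stored[p], closerPeers[p] }

// lookup drives the iterative GET: a bounded queue of candidate peers,
// alpha concurrent workers, and a global timeout.
func lookup(start []string, k, alpha int, timeout time.Duration) ([]byte, error) {
	peers := make(chan string, k) // at most k candidates queued, as in the paper
	found := make(chan []byte, 1)

	for _, p := range start {
		peers <- p
	}

	for i := 0; i < alpha; i++ {
		go func() {
			for p := range peers {
				val, closer := query(p)
				if val != nil {
					select {
					case found <- val:
					default: // another worker already delivered a value
					}
					return
				}
				for _, c := range closer {
					select {
					case peers <- c: // feed closer peers back into the queue
					default: // queue full; drop the candidate
					}
				}
			}
		}()
	}

	select {
	case v := <-found:
		return v, nil
	case <-time.After(timeout):
		return nil, errors.New("lookup timed out")
	}
}

func main() {
	v, err := lookup([]string{"A"}, 20, 3, time.Second)
	fmt.Println(string(v), err)
}

The sketch leaves its worker goroutines running when lookup returns, which is fine for a demo; the real code in the last file below closes its peer channel on every exit path.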

View File

@@ -496,6 +496,45 @@ out:
 	dht.network.Send(mes)
 }
 
+func (dht *IpfsDHT) getValueOrPeers(p *peer.Peer, key u.Key, timeout time.Duration, level int) ([]byte, []*peer.Peer, error) {
+	pmes, err := dht.getValueSingle(p, key, timeout, level)
+	if err != nil {
+		return nil, nil, u.WrapError(err, "getValue Error")
+	}
+
+	if pmes.GetSuccess() {
+		if pmes.Value == nil { // We were given provider[s]
+			val, err := dht.getFromPeerList(key, timeout, pmes.GetPeers(), level)
+			if err != nil {
+				return nil, nil, err
+			}
+			return val, nil, nil
+		}
+
+		// Success! We were given the value
+		return pmes.GetValue(), nil, nil
+	} else {
+		// We were given a closer node
+		var peers []*peer.Peer
+		for _, pb := range pmes.GetPeers() {
+			addr, err := ma.NewMultiaddr(pb.GetAddr())
+			if err != nil {
+				u.PErr(err.Error())
+				continue
+			}
+			np, err := dht.network.GetConnection(peer.ID(pb.GetId()), addr)
+			if err != nil {
+				u.PErr(err.Error())
+				continue
+			}
+			peers = append(peers, np)
+		}
+		return nil, peers, nil
+	}
+}
+
 // getValueSingle simply performs the get value RPC with the given parameters
 func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duration, level int) (*PBDHTMessage, error) {
 	pmes := DHTMessage{
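
getValueOrPeers hands the caller a three-way result: the value, a set of closer peers to try next, or an error. A hypothetical single-threaded driver loop shows how that triple is consumed (an illustrative in-package sketch, not code from the commit; walk, start, and perRPC are made up, and the commit's real caller is the concurrent GetValue in the last file of this diff):

// Illustrative only: walk peers one at a time until a value turns up.
// Assumes the surrounding routing package's imports; error handling is
// deliberately minimal.
func walk(dht *IpfsDHT, start []*peer.Peer, key u.Key, perRPC time.Duration) ([]byte, error) {
	queue := append([]*peer.Peer{}, start...)
	for len(queue) > 0 {
		p := queue[0]
		queue = queue[1:]

		val, closer, err := dht.getValueOrPeers(p, key, perRPC, 0)
		if err != nil {
			continue // this peer failed; move on to the next candidate
		}
		if val != nil {
			return val, nil
		}
		queue = append(queue, closer...) // keep walking toward the key
	}
	return nil, u.ErrNotFound
}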

View File

@@ -31,8 +31,8 @@ func (l *logDhtRpc) EndLog() {
 func (l *logDhtRpc) Print() {
 	b, err := json.Marshal(l)
 	if err != nil {
-		u.POut(err.Error())
+		u.DOut(err.Error())
 	} else {
-		u.POut(string(b))
+		u.DOut(string(b))
 	}
 }

View File

@@ -88,13 +88,9 @@ func TestGetFailures(t *testing.T) {
 	d.Update(other)
 
 	// This one should time out
-	_, err := d.GetValue(u.Key("test"), time.Millisecond*5)
+	_, err := d.GetValue(u.Key("test"), time.Millisecond*10)
 	if err != nil {
-		nerr, ok := err.(*u.IpfsError)
-		if !ok {
-			t.Fatal("Got different error than we expected.")
-		}
-		if nerr.Inner != u.ErrTimeout {
+		if err != u.ErrTimeout {
 			t.Fatal("Got different error than we expected.")
 		}
 	} else {
@@ -119,10 +115,10 @@ func TestGetFailures(t *testing.T) {
 	})
 
 	// This one should fail with NotFound
-	_, err = d.GetValue(u.Key("test"), time.Millisecond*5)
+	_, err = d.GetValue(u.Key("test"), time.Millisecond*1000)
 	if err != nil {
 		if err != u.ErrNotFound {
-			t.Fatal("Expected ErrNotFound, got: %s", err)
+			t.Fatalf("Expected ErrNotFound, got: %s", err)
 		}
 	} else {
 		t.Fatal("expected error, got none.")

View File

@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"errors"
 	"math/rand"
+	"sync"
 	"time"
 
 	proto "code.google.com/p/goprotobuf/proto"
@@ -56,6 +57,30 @@ func (s *IpfsDHT) PutValue(key u.Key, value []byte) {
 	}
 }
 
+// A counter for incrementing a variable across multiple threads
+type counter struct {
+	n   int
+	mut sync.RWMutex
+}
+
+func (c *counter) Increment() {
+	c.mut.Lock()
+	c.n++
+	c.mut.Unlock()
+}
+
+func (c *counter) Decrement() {
+	c.mut.Lock()
+	c.n--
+	c.mut.Unlock()
+}
+
+func (c *counter) Size() int {
+	c.mut.RLock()
+	defer c.mut.RUnlock()
+	return c.n
+}
+
 // GetValue searches for the value corresponding to given Key.
 // If the search does not succeed, a multiaddr string of a closer peer is
 // returned along with util.ErrSearchIncomplete
@@ -65,7 +90,6 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
 		ll.EndLog()
 		ll.Print()
 	}()
-	route_level := 0
 
 	// If we have it local, dont bother doing an RPC!
 	// NOTE: this might not be what we want to do...
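
The counter added above is a mutex-guarded int used to track how many workers are still busy. A small standalone exercise of it (a hypothetical snippet: the type is copied from the hunk above, while main and the WaitGroup exist only to make it runnable):

package main

import (
	"fmt"
	"sync"
)

// counter, as introduced in the diff above.
type counter struct {
	n   int
	mut sync.RWMutex
}

func (c *counter) Increment() {
	c.mut.Lock()
	c.n++
	c.mut.Unlock()
}

func (c *counter) Decrement() {
	c.mut.Lock()
	c.n--
	c.mut.Unlock()
}

func (c *counter) Size() int {
	c.mut.RLock()
	defer c.mut.RUnlock()
	return c.n
}

func main() {
	var c counter
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Increment() // each worker bumps the in-flight count
		}()
	}
	wg.Wait()
	fmt.Println(c.Size()) // prints 3
}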
@@ -76,54 +100,90 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
 		return val, nil
 	}
 
-	p := s.routes[route_level].NearestPeer(kb.ConvertKey(key))
-	if p == nil {
+	route_level := 0
+	closest := s.routes[route_level].NearestPeers(kb.ConvertKey(key), PoolSize)
+	if closest == nil || len(closest) == 0 {
 		return nil, kb.ErrLookupFailure
 	}
 
-	for route_level < len(s.routes) && p != nil {
-		ll.RpcCount++
-		pmes, err := s.getValueSingle(p, key, timeout, route_level)
-		if err != nil {
-			return nil, u.WrapError(err, "getValue Error")
-		}
-
-		if pmes.GetSuccess() {
-			if pmes.Value == nil { // We were given provider[s]
-				ll.RpcCount++
-				return s.getFromPeerList(key, timeout, pmes.GetPeers(), route_level)
-			}
-
-			// Success! We were given the value
-			ll.Success = true
-			return pmes.GetValue(), nil
-		} else {
-			// We were given a closer node
-			closers := pmes.GetPeers()
-			if len(closers) > 0 {
-				if peer.ID(closers[0].GetId()).Equal(s.self.ID) {
-					u.DOut("Got myself back as a closer peer.")
-					return nil, u.ErrNotFound
-				}
-				maddr, err := ma.NewMultiaddr(closers[0].GetAddr())
-				if err != nil {
-					// ??? Move up route level???
-					panic("not yet implemented")
-				}
-				np, err := s.network.GetConnection(peer.ID(closers[0].GetId()), maddr)
-				if err != nil {
-					u.PErr("[%s] Failed to connect to: %s", s.self.ID.Pretty(), closers[0].GetAddr())
-					route_level++
-					continue
-				}
-				p = np
-			} else {
-				route_level++
-			}
-		}
-	}
-	return nil, u.ErrNotFound
+	val_chan := make(chan []byte)
+	npeer_chan := make(chan *peer.Peer, 30)
+	proc_peer := make(chan *peer.Peer, 30)
+	err_chan := make(chan error)
+	after := time.After(timeout)
+
+	for _, p := range closest {
+		npeer_chan <- p
+	}
+
+	c := counter{}
+
+	// This limit value is referred to as k in the kademlia paper
+	limit := 20
+	count := 0
+	go func() {
+		for {
+			select {
+			case p := <-npeer_chan:
+				count++
+				if count >= limit {
+					break
+				}
+				c.Increment()
+				proc_peer <- p
+			default:
+				if c.Size() == 0 {
+					err_chan <- u.ErrNotFound
+				}
+			}
+		}
+	}()
+
+	process := func() {
+		for {
+			select {
+			case p, ok := <-proc_peer:
+				if !ok || p == nil {
+					c.Decrement()
+					return
+				}
+				val, peers, err := s.getValueOrPeers(p, key, timeout/4, route_level)
+				if err != nil {
+					u.DErr(err.Error())
+					c.Decrement()
+					continue
+				}
+				if val != nil {
+					val_chan <- val
+					c.Decrement()
+					return
+				}
+				for _, np := range peers {
+					// TODO: filter out peers that arent closer
+					npeer_chan <- np
+				}
+				c.Decrement()
+			}
+		}
+	}
+
+	concurFactor := 3
+	for i := 0; i < concurFactor; i++ {
+		go process()
+	}
+
+	select {
+	case val := <-val_chan:
+		close(npeer_chan)
+		return val, nil
+	case err := <-err_chan:
+		close(npeer_chan)
+		return nil, err
+	case <-after:
+		close(npeer_chan)
+		return nil, u.ErrTimeout
+	}
 }
 
 // Value provider layer of indirection.
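
The new GetValue ends by racing its result channels against time.After. Stripped of the DHT specifics, that pattern looks like this (a runnable illustration; result, errs, and the fake worker are placeholders):

package main

import (
	"errors"
	"fmt"
	"time"
)

func main() {
	result := make(chan string, 1)
	errs := make(chan error, 1)

	go func() {
		time.Sleep(10 * time.Millisecond) // pretend RPC work
		result <- "value"
	}()

	// Whichever arrives first wins: a value, an error, or the deadline.
	select {
	case v := <-result:
		fmt.Println("got:", v)
	case err := <-errs:
		fmt.Println("failed:", err)
	case <-time.After(50 * time.Millisecond):
		fmt.Println("failed:", errors.New("timeout"))
	}
}

In the diff, each branch of that final select also closes npeer_chan before returning.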