1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-26 07:28:20 +08:00

fix a few infinitely looping RPCs

This commit is contained in:
Jeromy
2014-08-14 08:32:17 -07:00
parent 8542380e8d
commit 60d061cb49
4 changed files with 165 additions and 51 deletions

View File

@ -244,12 +244,14 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
}
iVal, err := dht.datastore.Get(dskey)
if err == nil {
u.DOut("handleGetValue success!")
resp.Success = true
resp.Value = iVal.([]byte)
} else if err == ds.ErrNotFound {
// Check if we know any providers for the requested value
provs, ok := dht.providers[u.Key(pmes.GetKey())]
if ok && len(provs) > 0 {
u.DOut("handleGetValue returning %d provider[s]", len(provs))
for _, prov := range provs {
resp.Peers = append(resp.Peers, prov.Value)
}
@ -265,13 +267,21 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
} else {
level = int(pmes.GetValue()[0]) // Using value field to specify cluster level
}
u.DOut("handleGetValue searching level %d clusters", level)
closer := dht.routes[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
if closer.ID.Equal(dht.self.ID) {
u.DOut("Attempted to return self! this shouldnt happen...")
resp.Peers = nil
goto out
}
// If this peer is closer than the one from the table, return nil
if kb.Closer(dht.self.ID, closer.ID, u.Key(pmes.GetKey())) {
resp.Peers = nil
u.DOut("handleGetValue could not find a closer node than myself.")
} else {
u.DOut("handleGetValue returning a closer peer: '%s'", closer.ID.Pretty())
resp.Peers = []*peer.Peer{closer}
}
}
@ -280,6 +290,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
panic(err)
}
out:
mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Send(mes)
}
@ -349,9 +360,17 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) {
providers := dht.providers[u.Key(pmes.GetKey())]
dht.providerLock.RUnlock()
if providers == nil || len(providers) == 0 {
// TODO: work on tiering this
closer := dht.routes[0].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
resp.Peers = []*peer.Peer{closer}
level := 0
if len(pmes.GetValue()) > 0 {
level = int(pmes.GetValue()[0])
}
closer := dht.routes[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
if kb.Closer(dht.self.ID, closer.ID, u.Key(pmes.GetKey())) {
resp.Peers = nil
} else {
resp.Peers = []*peer.Peer{closer}
}
} else {
for _, prov := range providers {
resp.Peers = append(resp.Peers, prov.Value)
@ -626,3 +645,60 @@ func (dht *IpfsDHT) PrintTables() {
route.Print()
}
}
func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, timeout time.Duration) (*PBDHTMessage, error) {
pmes := DHTMessage{
Type: PBDHTMessage_GET_PROVIDERS,
Key: string(key),
Id: GenerateMessageID(),
Value: []byte{byte(level)},
}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listenChan := dht.ListenFor(pmes.Id, 1, time.Minute)
dht.network.Send(mes)
after := time.After(timeout)
select {
case <-after:
dht.Unlisten(pmes.Id)
return nil, u.ErrTimeout
case resp := <-listenChan:
u.DOut("FindProviders: got response.")
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
return nil, err
}
return pmes_out, nil
}
}
func (dht *IpfsDHT) addPeerList(key u.Key, peers []*PBDHTMessage_PBPeer) []*peer.Peer {
var prov_arr []*peer.Peer
for _, prov := range peers {
// Dont add outselves to the list
if peer.ID(prov.GetId()).Equal(dht.self.ID) {
continue
}
// Dont add someone who is already on the list
p := dht.network.Find(u.Key(prov.GetId()))
if p == nil {
u.DOut("given provider %s was not in our network already.", peer.ID(prov.GetId()).Pretty())
maddr, err := ma.NewMultiaddr(prov.GetAddr())
if err != nil {
u.PErr("error connecting to new peer: %s", err)
continue
}
p, err = dht.network.GetConnection(peer.ID(prov.GetId()), maddr)
if err != nil {
u.PErr("error connecting to new peer: %s", err)
continue
}
}
dht.addProviderEntry(key, p)
prov_arr = append(prov_arr, p)
}
return prov_arr
}

38
routing/dht/dht_logger.go Normal file
View File

@ -0,0 +1,38 @@
package dht
import (
"encoding/json"
"time"
u "github.com/jbenet/go-ipfs/util"
)
type logDhtRpc struct {
Type string
Start time.Time
End time.Time
Duration time.Duration
RpcCount int
Success bool
}
func startNewRpc(name string) *logDhtRpc {
r := new(logDhtRpc)
r.Type = name
r.Start = time.Now()
return r
}
func (l *logDhtRpc) EndLog() {
l.End = time.Now()
l.Duration = l.End.Sub(l.Start)
}
func (l *logDhtRpc) Print() {
b, err := json.Marshal(l)
if err != nil {
u.POut(err.Error())
} else {
u.POut(string(b))
}
}

View File

@ -156,7 +156,7 @@ func TestValueGetSet(t *testing.T) {
}
if string(val) != "world" {
t.Fatalf("Expected 'world' got %s", string(val))
t.Fatalf("Expected 'world' got '%s'", string(val))
}
}

View File

@ -60,12 +60,19 @@ func (s *IpfsDHT) PutValue(key u.Key, value []byte) {
// If the search does not succeed, a multiaddr string of a closer peer is
// returned along with util.ErrSearchIncomplete
func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
ll := startNewRpc("GET")
defer func() {
ll.EndLog()
ll.Print()
}()
route_level := 0
// If we have it local, dont bother doing an RPC!
// NOTE: this might not be what we want to do...
val,err := s.GetLocal(key)
if err != nil {
val, err := s.GetLocal(key)
if err == nil {
ll.Success = true
u.DOut("Found local, returning.")
return val, nil
}
@ -74,11 +81,8 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
return nil, kb.ErrLookupFailure
}
if kb.Closer(s.self.ID, p.ID, key) {
return nil, u.ErrNotFound
}
for route_level < len(s.routes) && p != nil {
ll.RpcCount++
pmes, err := s.getValueSingle(p, key, timeout, route_level)
if err != nil {
return nil, u.WrapError(err, "getValue Error")
@ -86,16 +90,19 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
if pmes.GetSuccess() {
if pmes.Value == nil { // We were given provider[s]
ll.RpcCount++
return s.getFromPeerList(key, timeout, pmes.GetPeers(), route_level)
}
// Success! We were given the value
ll.Success = true
return pmes.GetValue(), nil
} else {
// We were given a closer node
closers := pmes.GetPeers()
if len(closers) > 0 {
if peer.ID(closers[0].GetId()).Equal(s.self.ID) {
u.DOut("Got myself back as a closer peer.")
return nil, u.ErrNotFound
}
maddr, err := ma.NewMultiaddr(closers[0].GetAddr())
@ -108,6 +115,7 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
if err != nil {
u.PErr("[%s] Failed to connect to: %s", s.self.ID.Pretty(), closers[0].GetAddr())
route_level++
continue
}
p = np
} else {
@ -143,60 +151,52 @@ func (s *IpfsDHT) Provide(key u.Key) error {
// FindProviders searches for peers who can provide the value for given key.
func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
ll := startNewRpc("FindProviders")
defer func() {
ll.EndLog()
ll.Print()
}()
u.DOut("Find providers for: '%s'", key)
p := s.routes[0].NearestPeer(kb.ConvertKey(key))
if p == nil {
return nil, kb.ErrLookupFailure
}
pmes := DHTMessage{
Type: PBDHTMessage_GET_PROVIDERS,
Key: string(key),
Id: GenerateMessageID(),
}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listenChan := s.ListenFor(pmes.Id, 1, time.Minute)
u.DOut("Find providers for: '%s'", key)
s.network.Send(mes)
after := time.After(timeout)
select {
case <-after:
s.Unlisten(pmes.Id)
return nil, u.ErrTimeout
case resp := <-listenChan:
u.DOut("FindProviders: got response.")
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
for level := 0; level < len(s.routes); {
pmes, err := s.findProvidersSingle(p, key, level, timeout)
if err != nil {
return nil, err
}
var prov_arr []*peer.Peer
for _, prov := range pmes_out.GetPeers() {
if peer.ID(prov.GetId()).Equal(s.self.ID) {
if pmes.GetSuccess() {
provs := s.addPeerList(key, pmes.GetPeers())
ll.Success = true
return provs, nil
} else {
closer := pmes.GetPeers()
if len(closer) == 0 {
level++
continue
}
p := s.network.Find(u.Key(prov.GetId()))
if p == nil {
u.DOut("given provider %s was not in our network already.", peer.ID(prov.GetId()).Pretty())
maddr, err := ma.NewMultiaddr(prov.GetAddr())
if err != nil {
u.PErr("error connecting to new peer: %s", err)
continue
}
p, err = s.network.GetConnection(peer.ID(prov.GetId()), maddr)
if err != nil {
u.PErr("error connecting to new peer: %s", err)
continue
}
if peer.ID(closer[0].GetId()).Equal(s.self.ID) {
u.DOut("Got myself back as a closer peer.")
return nil, u.ErrNotFound
}
maddr, err := ma.NewMultiaddr(closer[0].GetAddr())
if err != nil {
// ??? Move up route level???
panic("not yet implemented")
}
s.addProviderEntry(key, p)
prov_arr = append(prov_arr, p)
}
return prov_arr, nil
np, err := s.network.GetConnection(peer.ID(closer[0].GetId()), maddr)
if err != nil {
u.PErr("[%s] Failed to connect to: %s", s.self.ID.Pretty(), closer[0].GetAddr())
level++
continue
}
p = np
}
}
return nil, u.ErrNotFound
}
// Find specific Peer