diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 04935ea82..ea8e1d861 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -244,12 +244,14 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) { } iVal, err := dht.datastore.Get(dskey) if err == nil { + u.DOut("handleGetValue success!") resp.Success = true resp.Value = iVal.([]byte) } else if err == ds.ErrNotFound { // Check if we know any providers for the requested value provs, ok := dht.providers[u.Key(pmes.GetKey())] if ok && len(provs) > 0 { + u.DOut("handleGetValue returning %d provider[s]", len(provs)) for _, prov := range provs { resp.Peers = append(resp.Peers, prov.Value) } @@ -265,13 +267,21 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) { } else { level = int(pmes.GetValue()[0]) // Using value field to specify cluster level } + u.DOut("handleGetValue searching level %d clusters", level) closer := dht.routes[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey()))) + if closer.ID.Equal(dht.self.ID) { + u.DOut("Attempted to return self! this shouldnt happen...") + resp.Peers = nil + goto out + } // If this peer is closer than the one from the table, return nil if kb.Closer(dht.self.ID, closer.ID, u.Key(pmes.GetKey())) { resp.Peers = nil + u.DOut("handleGetValue could not find a closer node than myself.") } else { + u.DOut("handleGetValue returning a closer peer: '%s'", closer.ID.Pretty()) resp.Peers = []*peer.Peer{closer} } } @@ -280,6 +290,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) { panic(err) } +out: mes := swarm.NewMessage(p, resp.ToProtobuf()) dht.network.Send(mes) } @@ -349,9 +360,17 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) { providers := dht.providers[u.Key(pmes.GetKey())] dht.providerLock.RUnlock() if providers == nil || len(providers) == 0 { - // TODO: work on tiering this - closer := dht.routes[0].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey()))) - resp.Peers = []*peer.Peer{closer} + level := 0 + if len(pmes.GetValue()) > 0 { + level = int(pmes.GetValue()[0]) + } + + closer := dht.routes[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey()))) + if kb.Closer(dht.self.ID, closer.ID, u.Key(pmes.GetKey())) { + resp.Peers = nil + } else { + resp.Peers = []*peer.Peer{closer} + } } else { for _, prov := range providers { resp.Peers = append(resp.Peers, prov.Value) @@ -626,3 +645,60 @@ func (dht *IpfsDHT) PrintTables() { route.Print() } } + +func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, timeout time.Duration) (*PBDHTMessage, error) { + pmes := DHTMessage{ + Type: PBDHTMessage_GET_PROVIDERS, + Key: string(key), + Id: GenerateMessageID(), + Value: []byte{byte(level)}, + } + + mes := swarm.NewMessage(p, pmes.ToProtobuf()) + + listenChan := dht.ListenFor(pmes.Id, 1, time.Minute) + dht.network.Send(mes) + after := time.After(timeout) + select { + case <-after: + dht.Unlisten(pmes.Id) + return nil, u.ErrTimeout + case resp := <-listenChan: + u.DOut("FindProviders: got response.") + pmes_out := new(PBDHTMessage) + err := proto.Unmarshal(resp.Data, pmes_out) + if err != nil { + return nil, err + } + + return pmes_out, nil + } +} + +func (dht *IpfsDHT) addPeerList(key u.Key, peers []*PBDHTMessage_PBPeer) []*peer.Peer { + var prov_arr []*peer.Peer + for _, prov := range peers { + // Dont add outselves to the list + if peer.ID(prov.GetId()).Equal(dht.self.ID) { + continue + } + // Dont add someone who is already on the list + p := dht.network.Find(u.Key(prov.GetId())) + if p == nil { + u.DOut("given provider %s was not in our network already.", peer.ID(prov.GetId()).Pretty()) + maddr, err := ma.NewMultiaddr(prov.GetAddr()) + if err != nil { + u.PErr("error connecting to new peer: %s", err) + continue + } + p, err = dht.network.GetConnection(peer.ID(prov.GetId()), maddr) + if err != nil { + u.PErr("error connecting to new peer: %s", err) + continue + } + } + dht.addProviderEntry(key, p) + prov_arr = append(prov_arr, p) + } + return prov_arr +} diff --git a/routing/dht/dht_logger.go b/routing/dht/dht_logger.go new file mode 100644 index 000000000..c363add7b --- /dev/null +++ b/routing/dht/dht_logger.go @@ -0,0 +1,38 @@ +package dht + +import ( + "encoding/json" + "time" + + u "github.com/jbenet/go-ipfs/util" +) + +type logDhtRpc struct { + Type string + Start time.Time + End time.Time + Duration time.Duration + RpcCount int + Success bool +} + +func startNewRpc(name string) *logDhtRpc { + r := new(logDhtRpc) + r.Type = name + r.Start = time.Now() + return r +} + +func (l *logDhtRpc) EndLog() { + l.End = time.Now() + l.Duration = l.End.Sub(l.Start) +} + +func (l *logDhtRpc) Print() { + b, err := json.Marshal(l) + if err != nil { + u.POut(err.Error()) + } else { + u.POut(string(b)) + } +} diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index 2b0fb4a6b..a7e14d703 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -156,7 +156,7 @@ func TestValueGetSet(t *testing.T) { } if string(val) != "world" { - t.Fatalf("Expected 'world' got %s", string(val)) + t.Fatalf("Expected 'world' got '%s'", string(val)) } } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 6dc4aa060..9923961d1 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -60,12 +60,19 @@ func (s *IpfsDHT) PutValue(key u.Key, value []byte) { // If the search does not succeed, a multiaddr string of a closer peer is // returned along with util.ErrSearchIncomplete func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { + ll := startNewRpc("GET") + defer func() { + ll.EndLog() + ll.Print() + }() route_level := 0 // If we have it local, dont bother doing an RPC! // NOTE: this might not be what we want to do... - val,err := s.GetLocal(key) - if err != nil { + val, err := s.GetLocal(key) + if err == nil { + ll.Success = true + u.DOut("Found local, returning.") return val, nil } @@ -74,11 +81,8 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { return nil, kb.ErrLookupFailure } - if kb.Closer(s.self.ID, p.ID, key) { - return nil, u.ErrNotFound - } - for route_level < len(s.routes) && p != nil { + ll.RpcCount++ pmes, err := s.getValueSingle(p, key, timeout, route_level) if err != nil { return nil, u.WrapError(err, "getValue Error") @@ -86,16 +90,19 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { if pmes.GetSuccess() { if pmes.Value == nil { // We were given provider[s] + ll.RpcCount++ return s.getFromPeerList(key, timeout, pmes.GetPeers(), route_level) } // Success! We were given the value + ll.Success = true return pmes.GetValue(), nil } else { // We were given a closer node closers := pmes.GetPeers() if len(closers) > 0 { if peer.ID(closers[0].GetId()).Equal(s.self.ID) { + u.DOut("Got myself back as a closer peer.") return nil, u.ErrNotFound } maddr, err := ma.NewMultiaddr(closers[0].GetAddr()) @@ -108,6 +115,7 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { if err != nil { u.PErr("[%s] Failed to connect to: %s", s.self.ID.Pretty(), closers[0].GetAddr()) route_level++ + continue } p = np } else { @@ -143,60 +151,52 @@ func (s *IpfsDHT) Provide(key u.Key) error { // FindProviders searches for peers who can provide the value for given key. func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) { + ll := startNewRpc("FindProviders") + defer func() { + ll.EndLog() + ll.Print() + }() + u.DOut("Find providers for: '%s'", key) p := s.routes[0].NearestPeer(kb.ConvertKey(key)) if p == nil { return nil, kb.ErrLookupFailure } - pmes := DHTMessage{ - Type: PBDHTMessage_GET_PROVIDERS, - Key: string(key), - Id: GenerateMessageID(), - } - - mes := swarm.NewMessage(p, pmes.ToProtobuf()) - - listenChan := s.ListenFor(pmes.Id, 1, time.Minute) - u.DOut("Find providers for: '%s'", key) - s.network.Send(mes) - after := time.After(timeout) - select { - case <-after: - s.Unlisten(pmes.Id) - return nil, u.ErrTimeout - case resp := <-listenChan: - u.DOut("FindProviders: got response.") - pmes_out := new(PBDHTMessage) - err := proto.Unmarshal(resp.Data, pmes_out) + for level := 0; level < len(s.routes); { + pmes, err := s.findProvidersSingle(p, key, level, timeout) if err != nil { return nil, err } - - var prov_arr []*peer.Peer - for _, prov := range pmes_out.GetPeers() { - if peer.ID(prov.GetId()).Equal(s.self.ID) { + if pmes.GetSuccess() { + provs := s.addPeerList(key, pmes.GetPeers()) + ll.Success = true + return provs, nil + } else { + closer := pmes.GetPeers() + if len(closer) == 0 { + level++ continue } - p := s.network.Find(u.Key(prov.GetId())) - if p == nil { - u.DOut("given provider %s was not in our network already.", peer.ID(prov.GetId()).Pretty()) - maddr, err := ma.NewMultiaddr(prov.GetAddr()) - if err != nil { - u.PErr("error connecting to new peer: %s", err) - continue - } - p, err = s.network.GetConnection(peer.ID(prov.GetId()), maddr) - if err != nil { - u.PErr("error connecting to new peer: %s", err) - continue - } + if peer.ID(closer[0].GetId()).Equal(s.self.ID) { + u.DOut("Got myself back as a closer peer.") + return nil, u.ErrNotFound + } + maddr, err := ma.NewMultiaddr(closer[0].GetAddr()) + if err != nil { + // ??? Move up route level??? + panic("not yet implemented") } - s.addProviderEntry(key, p) - prov_arr = append(prov_arr, p) - } - return prov_arr, nil + np, err := s.network.GetConnection(peer.ID(closer[0].GetId()), maddr) + if err != nil { + u.PErr("[%s] Failed to connect to: %s", s.self.ID.Pretty(), closer[0].GetAddr()) + level++ + continue + } + p = np + } } + return nil, u.ErrNotFound } // Find specific Peer