diff --git a/identify/identify.go b/identify/identify.go new file mode 100644 index 000000000..20ad21c9a --- /dev/null +++ b/identify/identify.go @@ -0,0 +1,20 @@ +// The identify package handles how peers identify with eachother upon +// connection to the network +package identify + +import ( + peer "github.com/jbenet/go-ipfs/peer" + u "github.com/jbenet/go-ipfs/util" +) + +// Perform initial communication with this peer to share node ID's and +// initiate communication +func Handshake(self, remote *peer.Peer, in, out chan []byte) error { + // TODO: make this more... secure. + out <- self.ID + resp := <-in + remote.ID = peer.ID(resp) + u.DOut("[%s] identify: Got node id: %s", self.ID.Pretty(), remote.ID.Pretty()) + + return nil +} diff --git a/identify/message.proto b/identify/message.proto new file mode 100644 index 000000000..71804c883 --- /dev/null +++ b/identify/message.proto @@ -0,0 +1,3 @@ +message Identify { + required bytes id = 1; +} diff --git a/peer/peer.go b/peer/peer.go index e7c3af2b4..144189f58 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -1,6 +1,10 @@ package peer import ( + "sync" + "time" + + b58 "github.com/jbenet/go-base58" u "github.com/jbenet/go-ipfs/util" ma "github.com/jbenet/go-multiaddr" mh "github.com/jbenet/go-multihash" @@ -12,8 +16,12 @@ import ( type ID mh.Multihash // Utililty function for comparing two peer ID's -func (id *ID) Equal(other *ID) bool { - return bytes.Equal(*id, *other) +func (id ID) Equal(other ID) bool { + return bytes.Equal(id, other) +} + +func (id ID) Pretty() string { + return b58.Encode(id) } // Map maps Key (string) : *Peer (slices are not comparable). @@ -24,6 +32,9 @@ type Map map[u.Key]*Peer type Peer struct { ID ID Addresses []*ma.Multiaddr + + latency time.Duration + latenLock sync.RWMutex } // Key returns the ID as a Key (string) for maps. @@ -52,3 +63,18 @@ func (p *Peer) NetAddress(n string) *ma.Multiaddr { } return nil } + +func (p *Peer) GetLatency() (out time.Duration) { + p.latenLock.RLock() + out = p.latency + p.latenLock.RUnlock() + return +} + +// TODO: Instead of just keeping a single number, +// keep a running average over the last hour or so +func (p *Peer) SetLatency(laten time.Duration) { + p.latenLock.Lock() + p.latency = laten + p.latenLock.Unlock() +} diff --git a/routing/dht/DHTMessage.go b/routing/dht/DHTMessage.go new file mode 100644 index 000000000..e2034d7e0 --- /dev/null +++ b/routing/dht/DHTMessage.go @@ -0,0 +1,49 @@ +package dht + +import ( + peer "github.com/jbenet/go-ipfs/peer" +) + +// A helper struct to make working with protbuf types easier +type DHTMessage struct { + Type PBDHTMessage_MessageType + Key string + Value []byte + Response bool + Id uint64 + Success bool + Peers []*peer.Peer +} + +func peerInfo(p *peer.Peer) *PBDHTMessage_PBPeer { + pbp := new(PBDHTMessage_PBPeer) + addr, err := p.Addresses[0].String() + if err != nil { + //Temp: what situations could cause this? 
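Two situations seem plausible here: the peer may have an empty Addresses slice (in which case the p.Addresses[0] index above already panics), or Addresses[0].String() itself may return an error. A defensive variant might look like the sketch below; the error-returning signature and the name peerInfoSafe are illustrative only and not part of this patch, so ToProtobuf would need adjusting to use it.

func peerInfoSafe(p *peer.Peer) (*PBDHTMessage_PBPeer, error) {
	if len(p.Addresses) == 0 {
		return nil, errors.New("peer has no known addresses")
	}
	addr, err := p.Addresses[0].String()
	if err != nil {
		return nil, err
	}
	pid := string(p.ID)
	return &PBDHTMessage_PBPeer{Id: &pid, Addr: &addr}, nil
}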
+ panic(err) + } + pbp.Addr = &addr + pid := string(p.ID) + pbp.Id = &pid + return pbp +} + +// TODO: building the protobuf message this way is a little wasteful +// Unused fields wont be omitted, find a better way to do this +func (m *DHTMessage) ToProtobuf() *PBDHTMessage { + pmes := new(PBDHTMessage) + if m.Value != nil { + pmes.Value = m.Value + } + + pmes.Type = &m.Type + pmes.Key = &m.Key + pmes.Response = &m.Response + pmes.Id = &m.Id + pmes.Success = &m.Success + for _, p := range m.Peers { + pmes.Peers = append(pmes.Peers, peerInfo(p)) + } + + return pmes +} diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 0f55ba45b..c28ca0a0f 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -1,11 +1,17 @@ package dht import ( + "bytes" + "errors" "sync" + "time" - peer "github.com/jbenet/go-ipfs/peer" - swarm "github.com/jbenet/go-ipfs/swarm" - u "github.com/jbenet/go-ipfs/util" + peer "github.com/jbenet/go-ipfs/peer" + kb "github.com/jbenet/go-ipfs/routing/kbucket" + swarm "github.com/jbenet/go-ipfs/swarm" + u "github.com/jbenet/go-ipfs/util" + + ma "github.com/jbenet/go-multiaddr" ds "github.com/jbenet/datastore.go" @@ -17,9 +23,11 @@ import ( // IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications. // It is used to implement the base IpfsRouting module. type IpfsDHT struct { - routes RoutingTable + // Array of routing tables for differently distanced nodes + // NOTE: (currently, only a single table is used) + routes []*kb.RoutingTable - network *swarm.Swarm + network swarm.Network // Local peer (yourself) self *peer.Peer @@ -27,96 +35,221 @@ type IpfsDHT struct { // Local data datastore ds.Datastore - // map of channels waiting for reply messages - listeners map[uint64]chan *swarm.Message - listenLock sync.RWMutex + // Map keys to peers that can provide their value + providers map[u.Key][]*providerInfo + providerLock sync.RWMutex // Signal to shutdown dht shutdown chan struct{} + + // When this peer started up + birth time.Time + + //lock to make diagnostics work better + diaglock sync.Mutex + + // listener is a server to register to listen for responses to messages + listener *MesListener } -func NewDHT(p *peer.Peer) *IpfsDHT { +// NewDHT creates a new DHT object with the given peer as the 'local' host +func NewDHT(p *peer.Peer, net swarm.Network) *IpfsDHT { dht := new(IpfsDHT) + dht.network = net + dht.datastore = ds.NewMapDatastore() dht.self = p - dht.network = swarm.NewSwarm(p) - dht.listeners = make(map[uint64]chan *swarm.Message) + dht.providers = make(map[u.Key][]*providerInfo) dht.shutdown = make(chan struct{}) + + dht.routes = make([]*kb.RoutingTable, 3) + dht.routes[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*30) + dht.routes[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*100) + dht.routes[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour) + + dht.listener = NewMesListener() + dht.birth = time.Now() return dht } +// Start up background goroutines needed by the DHT +func (dht *IpfsDHT) Start() { + go dht.handleMessages() +} + +// Connect to a new peer at the given address, ping and add to the routing table +func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) { + maddrstr, _ := addr.String() + u.DOut("Connect to new peer: %s", maddrstr) + npeer, err := dht.network.ConnectNew(addr) + if err != nil { + return nil, err + } + + // Ping new peer to register in their routing table + // NOTE: this should be done better... 
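For context, the constructor-and-connect flow introduced above is the one exercised by dht_test.go later in this diff; a condensed sketch of it, with placeholder address and ID values, looks like this:

func exampleStartAndConnect() error {
	laddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001")
	if err != nil {
		return err
	}

	local := new(peer.Peer)
	local.AddAddress(laddr)
	local.ID = peer.ID("local_peer")

	net := swarm.NewSwarm(local)
	if err := net.Listen(); err != nil {
		return err
	}

	dht := NewDHT(local, net)
	dht.Start() // spins up handleMessages

	// Dial a bootstrap peer; Connect pings it and seeds the routing tables.
	remote, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4002")
	if err != nil {
		return err
	}
	_, err = dht.Connect(remote)
	return err
}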
+ err = dht.Ping(npeer, time.Second*2) + if err != nil { + return nil, errors.New("failed to ping newly connected peer") + } + + dht.Update(npeer) + + return npeer, nil +} + // Read in all messages from swarm and handle them appropriately // NOTE: this function is just a quick sketch func (dht *IpfsDHT) handleMessages() { + u.DOut("Begin message handling routine") + + checkTimeouts := time.NewTicker(time.Minute * 5) + ch := dht.network.GetChan() for { select { - case mes := <-dht.network.Chan.Incoming: - pmes := new(DHTMessage) + case mes, ok := <-ch.Incoming: + if !ok { + u.DOut("handleMessages closing, bad recv on incoming") + return + } + pmes := new(PBDHTMessage) err := proto.Unmarshal(mes.Data, pmes) if err != nil { u.PErr("Failed to decode protobuf message: %s", err) continue } + dht.Update(mes.Peer) + // Note: not sure if this is the correct place for this if pmes.GetResponse() { - dht.listenLock.RLock() - ch, ok := dht.listeners[pmes.GetId()] - dht.listenLock.RUnlock() - if ok { - ch <- mes - } - - // this is expected behaviour during a timeout - u.DOut("Received response with nobody listening...") + dht.listener.Respond(pmes.GetId(), mes) continue } // + u.DOut("[peer: %s]\nGot message type: '%s' [id = %x, from = %s]", + dht.self.ID.Pretty(), + PBDHTMessage_MessageType_name[int32(pmes.GetType())], + pmes.GetId(), mes.Peer.ID.Pretty()) switch pmes.GetType() { - case DHTMessage_GET_VALUE: + case PBDHTMessage_GET_VALUE: dht.handleGetValue(mes.Peer, pmes) - case DHTMessage_PUT_VALUE: + case PBDHTMessage_PUT_VALUE: dht.handlePutValue(mes.Peer, pmes) - case DHTMessage_FIND_NODE: - dht.handleFindNode(mes.Peer, pmes) - case DHTMessage_ADD_PROVIDER: - case DHTMessage_GET_PROVIDERS: - case DHTMessage_PING: - dht.handleFindNode(mes.Peer, pmes) + case PBDHTMessage_FIND_NODE: + dht.handleFindPeer(mes.Peer, pmes) + case PBDHTMessage_ADD_PROVIDER: + dht.handleAddProvider(mes.Peer, pmes) + case PBDHTMessage_GET_PROVIDERS: + dht.handleGetProviders(mes.Peer, pmes) + case PBDHTMessage_PING: + dht.handlePing(mes.Peer, pmes) + case PBDHTMessage_DIAGNOSTIC: + dht.handleDiagnostic(mes.Peer, pmes) } + case err := <-ch.Errors: + u.PErr("dht err: %s", err) case <-dht.shutdown: + checkTimeouts.Stop() return + case <-checkTimeouts.C: + // Time to collect some garbage! 
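// Provider records are considered stale after one hour (see cleanExpiredProviders just
// below), so this five-minute ticker only bounds how promptly stale entries are dropped.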
+ dht.cleanExpiredProviders() } } } -func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) { - dskey := ds.NewKey(pmes.GetKey()) - i_val, err := dht.datastore.Get(dskey) - if err == nil { - isResponse := true - resp := new(DHTMessage) - resp.Response = &isResponse - resp.Id = pmes.Id - resp.Key = pmes.Key - - val := i_val.([]byte) - resp.Value = val - - mes := new(swarm.Message) - mes.Peer = p - mes.Data = []byte(resp.String()) - } else if err == ds.ErrNotFound { - // Find closest node(s) to desired key and reply with that info - // TODO: this will need some other metadata in the protobuf message - // to signal to the querying node that the data its receiving - // is actually a list of other nodes +func (dht *IpfsDHT) cleanExpiredProviders() { + dht.providerLock.Lock() + for k, parr := range dht.providers { + var cleaned []*providerInfo + for _, v := range parr { + if time.Since(v.Creation) < time.Hour { + cleaned = append(cleaned, v) + } + } + dht.providers[k] = cleaned } + dht.providerLock.Unlock() } -// Store a value in this nodes local storage -func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) { +func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) error { + pmes := DHTMessage{ + Type: PBDHTMessage_PUT_VALUE, + Key: key, + Value: value, + Id: GenerateMessageID(), + } + + mes := swarm.NewMessage(p, pmes.ToProtobuf()) + dht.network.Send(mes) + return nil +} + +func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) { + u.DOut("handleGetValue for key: %s", pmes.GetKey()) + dskey := ds.NewKey(pmes.GetKey()) + resp := &DHTMessage{ + Response: true, + Id: pmes.GetId(), + Key: pmes.GetKey(), + } + iVal, err := dht.datastore.Get(dskey) + if err == nil { + u.DOut("handleGetValue success!") + resp.Success = true + resp.Value = iVal.([]byte) + } else if err == ds.ErrNotFound { + // Check if we know any providers for the requested value + provs, ok := dht.providers[u.Key(pmes.GetKey())] + if ok && len(provs) > 0 { + u.DOut("handleGetValue returning %d provider[s]", len(provs)) + for _, prov := range provs { + resp.Peers = append(resp.Peers, prov.Value) + } + resp.Success = true + } else { + // No providers? + // Find closest peer on given cluster to desired key and reply with that info + + level := 0 + if len(pmes.GetValue()) < 1 { + // TODO: maybe return an error? Defaulting isnt a good idea IMO + u.PErr("handleGetValue: no routing level specified, assuming 0") + } else { + level = int(pmes.GetValue()[0]) // Using value field to specify cluster level + } + u.DOut("handleGetValue searching level %d clusters", level) + + closer := dht.routes[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey()))) + + if closer.ID.Equal(dht.self.ID) { + u.DOut("Attempted to return self! this shouldnt happen...") + resp.Peers = nil + goto out + } + // If this peer is closer than the one from the table, return nil + if kb.Closer(dht.self.ID, closer.ID, u.Key(pmes.GetKey())) { + resp.Peers = nil + u.DOut("handleGetValue could not find a closer node than myself.") + } else { + u.DOut("handleGetValue returning a closer peer: '%s'", closer.ID.Pretty()) + resp.Peers = []*peer.Peer{closer} + } + } + } else { + //temp: what other errors can a datastore return? 
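// With the in-memory MapDatastore wired up in NewDHT, Get should only fail with
// ds.ErrNotFound, so this panic is effectively unreachable; a disk- or network-backed
// datastore could surface I/O errors here and would need a real error reply instead.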
+ panic(err) + } + +out: + mes := swarm.NewMessage(p, resp.ToProtobuf()) + dht.network.Send(mes) +} + +// Store a value in this peer local storage +func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *PBDHTMessage) { dskey := ds.NewKey(pmes.GetKey()) err := dht.datastore.Put(dskey, pmes.GetValue()) if err != nil { @@ -125,46 +258,399 @@ func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) { } } -func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) { - panic("Not implemented.") -} - -func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) { - isResponse := true - resp := new(DHTMessage) - resp.Id = pmes.Id - resp.Response = &isResponse - - mes := new(swarm.Message) - mes.Peer = p - mes.Data = []byte(resp.String()) - dht.network.Chan.Outgoing <- mes -} - - -// Register a handler for a specific message ID, used for getting replies -// to certain messages (i.e. response to a GET_VALUE message) -func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message { - lchan := make(chan *swarm.Message) - dht.listenLock.Lock() - dht.listeners[mesid] = lchan - dht.listenLock.Unlock() - return lchan -} - -func (dht *IpfsDHT) Unlisten(mesid uint64) { - dht.listenLock.Lock() - ch, ok := dht.listeners[mesid] - if ok { - delete(dht.listeners, mesid) +func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *PBDHTMessage) { + resp := DHTMessage{ + Type: pmes.GetType(), + Response: true, + Id: pmes.GetId(), } - dht.listenLock.Unlock() - close(ch) + + dht.network.Send(swarm.NewMessage(p, resp.ToProtobuf())) } +func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) { + resp := DHTMessage{ + Type: pmes.GetType(), + Id: pmes.GetId(), + Response: true, + } + defer func() { + mes := swarm.NewMessage(p, resp.ToProtobuf()) + dht.network.Send(mes) + }() + level := pmes.GetValue()[0] + u.DOut("handleFindPeer: searching for '%s'", peer.ID(pmes.GetKey()).Pretty()) + closest := dht.routes[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey()))) + if closest == nil { + u.PErr("handleFindPeer: could not find anything.") + return + } -// Stop all communications from this node and shut down + if len(closest.Addresses) == 0 { + u.PErr("handleFindPeer: no addresses for connected peer...") + return + } + + // If the found peer further away than this peer... 
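// As its use in handleGetValue above suggests, kb.Closer(a, b, key) reports whether a
// is nearer to key than b; returning here therefore means we know of no peer closer
// than ourselves, and the deferred send replies without a peer suggestion.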
+ if kb.Closer(dht.self.ID, closest.ID, u.Key(pmes.GetKey())) { + return + } + + u.DOut("handleFindPeer: sending back '%s'", closest.ID.Pretty()) + resp.Peers = []*peer.Peer{closest} + resp.Success = true +} + +func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) { + resp := DHTMessage{ + Type: PBDHTMessage_GET_PROVIDERS, + Key: pmes.GetKey(), + Id: pmes.GetId(), + Response: true, + } + + dht.providerLock.RLock() + providers := dht.providers[u.Key(pmes.GetKey())] + dht.providerLock.RUnlock() + if providers == nil || len(providers) == 0 { + level := 0 + if len(pmes.GetValue()) > 0 { + level = int(pmes.GetValue()[0]) + } + + closer := dht.routes[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey()))) + if kb.Closer(dht.self.ID, closer.ID, u.Key(pmes.GetKey())) { + resp.Peers = nil + } else { + resp.Peers = []*peer.Peer{closer} + } + } else { + for _, prov := range providers { + resp.Peers = append(resp.Peers, prov.Value) + } + resp.Success = true + } + + mes := swarm.NewMessage(p, resp.ToProtobuf()) + dht.network.Send(mes) +} + +type providerInfo struct { + Creation time.Time + Value *peer.Peer +} + +func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *PBDHTMessage) { + //TODO: need to implement TTLs on providers + key := u.Key(pmes.GetKey()) + dht.addProviderEntry(key, p) +} + +// Stop all communications from this peer and shut down func (dht *IpfsDHT) Halt() { dht.shutdown <- struct{}{} dht.network.Close() } + +func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) { + u.DOut("Adding %s as provider for '%s'", p.Key().Pretty(), key) + dht.providerLock.Lock() + provs := dht.providers[key] + dht.providers[key] = append(provs, &providerInfo{time.Now(), p}) + dht.providerLock.Unlock() +} + +// NOTE: not yet finished, low priority +func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) { + seq := dht.routes[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10) + listenChan := dht.listener.Listen(pmes.GetId(), len(seq), time.Second*30) + + for _, ps := range seq { + mes := swarm.NewMessage(ps, pmes) + dht.network.Send(mes) + } + + buf := new(bytes.Buffer) + di := dht.getDiagInfo() + buf.Write(di.Marshal()) + + // NOTE: this shouldnt be a hardcoded value + after := time.After(time.Second * 20) + count := len(seq) + for count > 0 { + select { + case <-after: + //Timeout, return what we have + goto out + case req_resp := <-listenChan: + pmes_out := new(PBDHTMessage) + err := proto.Unmarshal(req_resp.Data, pmes_out) + if err != nil { + // It broke? eh, whatever, keep going + continue + } + buf.Write(req_resp.Data) + count-- + } + } + +out: + resp := DHTMessage{ + Type: PBDHTMessage_DIAGNOSTIC, + Id: pmes.GetId(), + Value: buf.Bytes(), + Response: true, + } + + mes := swarm.NewMessage(p, resp.ToProtobuf()) + dht.network.Send(mes) +} + +func (dht *IpfsDHT) getValueOrPeers(p *peer.Peer, key u.Key, timeout time.Duration, level int) ([]byte, []*peer.Peer, error) { + pmes, err := dht.getValueSingle(p, key, timeout, level) + if err != nil { + return nil, nil, err + } + + if pmes.GetSuccess() { + if pmes.Value == nil { // We were given provider[s] + val, err := dht.getFromPeerList(key, timeout, pmes.GetPeers(), level) + if err != nil { + return nil, nil, err + } + return val, nil, nil + } + + // Success! 
We were given the value + return pmes.GetValue(), nil, nil + } else { + // We were given a closer node + var peers []*peer.Peer + for _, pb := range pmes.GetPeers() { + if peer.ID(pb.GetId()).Equal(dht.self.ID) { + continue + } + addr, err := ma.NewMultiaddr(pb.GetAddr()) + if err != nil { + u.PErr(err.Error()) + continue + } + + np, err := dht.network.GetConnection(peer.ID(pb.GetId()), addr) + if err != nil { + u.PErr(err.Error()) + continue + } + + peers = append(peers, np) + } + return nil, peers, nil + } +} + +// getValueSingle simply performs the get value RPC with the given parameters +func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duration, level int) (*PBDHTMessage, error) { + pmes := DHTMessage{ + Type: PBDHTMessage_GET_VALUE, + Key: string(key), + Value: []byte{byte(level)}, + Id: GenerateMessageID(), + } + response_chan := dht.listener.Listen(pmes.Id, 1, time.Minute) + + mes := swarm.NewMessage(p, pmes.ToProtobuf()) + t := time.Now() + dht.network.Send(mes) + + // Wait for either the response or a timeout + timeup := time.After(timeout) + select { + case <-timeup: + dht.listener.Unlisten(pmes.Id) + return nil, u.ErrTimeout + case resp, ok := <-response_chan: + if !ok { + u.PErr("response channel closed before timeout, please investigate.") + return nil, u.ErrTimeout + } + roundtrip := time.Since(t) + resp.Peer.SetLatency(roundtrip) + pmes_out := new(PBDHTMessage) + err := proto.Unmarshal(resp.Data, pmes_out) + if err != nil { + return nil, err + } + return pmes_out, nil + } +} + +// TODO: Im not certain on this implementation, we get a list of peers/providers +// from someone what do we do with it? Connect to each of them? randomly pick +// one to get the value from? Or just connect to one at a time until we get a +// successful connection and request the value from it? 
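The implementation below takes the last of the options listed above: it walks the list in order and returns the first successful fetch, recording each contacted peer as a provider along the way. For comparison, the "ask them all, take the first success" alternative could be sketched roughly as follows; the helper name and the fan-out strategy are illustrative only and not part of this patch.

func (dht *IpfsDHT) getFromAnyPeer(key u.Key, timeout time.Duration,
	peerlist []*PBDHTMessage_PBPeer, level int) ([]byte, error) {
	results := make(chan []byte, len(peerlist))
	for _, pinfo := range peerlist {
		go func(pinfo *PBDHTMessage_PBPeer) {
			p, _ := dht.Find(peer.ID(pinfo.GetId()))
			if p == nil {
				maddr, err := ma.NewMultiaddr(pinfo.GetAddr())
				if err != nil {
					return
				}
				p, err = dht.network.GetConnection(peer.ID(pinfo.GetId()), maddr)
				if err != nil {
					return
				}
			}
			pmes, err := dht.getValueSingle(p, key, timeout, level)
			if err == nil && pmes.GetSuccess() && pmes.Value != nil {
				results <- pmes.GetValue()
			}
		}(pinfo)
	}
	select {
	case val := <-results:
		return val, nil
	case <-time.After(timeout):
		return nil, u.ErrTimeout
	}
}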
+func (dht *IpfsDHT) getFromPeerList(key u.Key, timeout time.Duration, + peerlist []*PBDHTMessage_PBPeer, level int) ([]byte, error) { + for _, pinfo := range peerlist { + p, _ := dht.Find(peer.ID(pinfo.GetId())) + if p == nil { + maddr, err := ma.NewMultiaddr(pinfo.GetAddr()) + if err != nil { + u.PErr("getValue error: %s", err) + continue + } + + p, err = dht.network.GetConnection(peer.ID(pinfo.GetId()), maddr) + if err != nil { + u.PErr("getValue error: %s", err) + continue + } + } + pmes, err := dht.getValueSingle(p, key, timeout, level) + if err != nil { + u.DErr("getFromPeers error: %s", err) + continue + } + dht.addProviderEntry(key, p) + + // Make sure it was a successful get + if pmes.GetSuccess() && pmes.Value != nil { + return pmes.GetValue(), nil + } + } + return nil, u.ErrNotFound +} + +func (dht *IpfsDHT) GetLocal(key u.Key) ([]byte, error) { + v, err := dht.datastore.Get(ds.NewKey(string(key))) + if err != nil { + return nil, err + } + return v.([]byte), nil +} + +func (dht *IpfsDHT) PutLocal(key u.Key, value []byte) error { + return dht.datastore.Put(ds.NewKey(string(key)), value) +} + +func (dht *IpfsDHT) Update(p *peer.Peer) { + for _, route := range dht.routes { + removed := route.Update(p) + // Only drop the connection if no tables refer to this peer + if removed != nil { + found := false + for _, r := range dht.routes { + if r.Find(removed.ID) != nil { + found = true + break + } + } + if !found { + dht.network.Drop(removed) + } + } + } +} + +// Look for a peer with a given ID connected to this dht +func (dht *IpfsDHT) Find(id peer.ID) (*peer.Peer, *kb.RoutingTable) { + for _, table := range dht.routes { + p := table.Find(id) + if p != nil { + return p, table + } + } + return nil, nil +} + +func (dht *IpfsDHT) findPeerSingle(p *peer.Peer, id peer.ID, timeout time.Duration, level int) (*PBDHTMessage, error) { + pmes := DHTMessage{ + Type: PBDHTMessage_FIND_NODE, + Key: string(id), + Id: GenerateMessageID(), + Value: []byte{byte(level)}, + } + + mes := swarm.NewMessage(p, pmes.ToProtobuf()) + listenChan := dht.listener.Listen(pmes.Id, 1, time.Minute) + t := time.Now() + dht.network.Send(mes) + after := time.After(timeout) + select { + case <-after: + dht.listener.Unlisten(pmes.Id) + return nil, u.ErrTimeout + case resp := <-listenChan: + roundtrip := time.Since(t) + resp.Peer.SetLatency(roundtrip) + pmes_out := new(PBDHTMessage) + err := proto.Unmarshal(resp.Data, pmes_out) + if err != nil { + return nil, err + } + + return pmes_out, nil + } +} + +func (dht *IpfsDHT) PrintTables() { + for _, route := range dht.routes { + route.Print() + } +} + +func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, timeout time.Duration) (*PBDHTMessage, error) { + pmes := DHTMessage{ + Type: PBDHTMessage_GET_PROVIDERS, + Key: string(key), + Id: GenerateMessageID(), + Value: []byte{byte(level)}, + } + + mes := swarm.NewMessage(p, pmes.ToProtobuf()) + + listenChan := dht.listener.Listen(pmes.Id, 1, time.Minute) + dht.network.Send(mes) + after := time.After(timeout) + select { + case <-after: + dht.listener.Unlisten(pmes.Id) + return nil, u.ErrTimeout + case resp := <-listenChan: + u.DOut("FindProviders: got response.") + pmes_out := new(PBDHTMessage) + err := proto.Unmarshal(resp.Data, pmes_out) + if err != nil { + return nil, err + } + + return pmes_out, nil + } +} + +func (dht *IpfsDHT) addPeerList(key u.Key, peers []*PBDHTMessage_PBPeer) []*peer.Peer { + var prov_arr []*peer.Peer + for _, prov := range peers { + // Dont add outselves to the list + if 
peer.ID(prov.GetId()).Equal(dht.self.ID) { + continue + } + // Dont add someone who is already on the list + p := dht.network.Find(u.Key(prov.GetId())) + if p == nil { + u.DOut("given provider %s was not in our network already.", peer.ID(prov.GetId()).Pretty()) + maddr, err := ma.NewMultiaddr(prov.GetAddr()) + if err != nil { + u.PErr("error connecting to new peer: %s", err) + continue + } + p, err = dht.network.GetConnection(peer.ID(prov.GetId()), maddr) + if err != nil { + u.PErr("error connecting to new peer: %s", err) + continue + } + } + dht.addProviderEntry(key, p) + prov_arr = append(prov_arr, p) + } + return prov_arr +} diff --git a/routing/dht/dht_logger.go b/routing/dht/dht_logger.go new file mode 100644 index 000000000..c892959f0 --- /dev/null +++ b/routing/dht/dht_logger.go @@ -0,0 +1,38 @@ +package dht + +import ( + "encoding/json" + "time" + + u "github.com/jbenet/go-ipfs/util" +) + +type logDhtRpc struct { + Type string + Start time.Time + End time.Time + Duration time.Duration + RpcCount int + Success bool +} + +func startNewRpc(name string) *logDhtRpc { + r := new(logDhtRpc) + r.Type = name + r.Start = time.Now() + return r +} + +func (l *logDhtRpc) EndLog() { + l.End = time.Now() + l.Duration = l.End.Sub(l.Start) +} + +func (l *logDhtRpc) Print() { + b, err := json.Marshal(l) + if err != nil { + u.DOut(err.Error()) + } else { + u.DOut(string(b)) + } +} diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go new file mode 100644 index 000000000..a7e14d703 --- /dev/null +++ b/routing/dht/dht_test.go @@ -0,0 +1,290 @@ +package dht + +import ( + "testing" + + peer "github.com/jbenet/go-ipfs/peer" + swarm "github.com/jbenet/go-ipfs/swarm" + u "github.com/jbenet/go-ipfs/util" + ma "github.com/jbenet/go-multiaddr" + + "fmt" + "time" +) + +func setupDHTS(n int, t *testing.T) ([]*ma.Multiaddr, []*peer.Peer, []*IpfsDHT) { + var addrs []*ma.Multiaddr + for i := 0; i < 4; i++ { + a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 5000+i)) + if err != nil { + t.Fatal(err) + } + addrs = append(addrs, a) + } + + var peers []*peer.Peer + for i := 0; i < 4; i++ { + p := new(peer.Peer) + p.AddAddress(addrs[i]) + p.ID = peer.ID([]byte(fmt.Sprintf("peer_%d", i))) + peers = append(peers, p) + } + + var dhts []*IpfsDHT + for i := 0; i < 4; i++ { + net := swarm.NewSwarm(peers[i]) + err := net.Listen() + if err != nil { + t.Fatal(err) + } + d := NewDHT(peers[i], net) + dhts = append(dhts, d) + d.Start() + } + + return addrs, peers, dhts +} + +func TestPing(t *testing.T) { + u.Debug = false + addr_a, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/2222") + if err != nil { + t.Fatal(err) + } + addr_b, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5678") + if err != nil { + t.Fatal(err) + } + + peer_a := new(peer.Peer) + peer_a.AddAddress(addr_a) + peer_a.ID = peer.ID([]byte("peer_a")) + + peer_b := new(peer.Peer) + peer_b.AddAddress(addr_b) + peer_b.ID = peer.ID([]byte("peer_b")) + + neta := swarm.NewSwarm(peer_a) + err = neta.Listen() + if err != nil { + t.Fatal(err) + } + dht_a := NewDHT(peer_a, neta) + + netb := swarm.NewSwarm(peer_b) + err = netb.Listen() + if err != nil { + t.Fatal(err) + } + dht_b := NewDHT(peer_b, netb) + + dht_a.Start() + dht_b.Start() + + _, err = dht_a.Connect(addr_b) + if err != nil { + t.Fatal(err) + } + + //Test that we can ping the node + err = dht_a.Ping(peer_b, time.Second*2) + if err != nil { + t.Fatal(err) + } + + dht_a.Halt() + dht_b.Halt() +} + +func TestValueGetSet(t *testing.T) { + u.Debug = false + addr_a, err := 
ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1235") + if err != nil { + t.Fatal(err) + } + addr_b, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5679") + if err != nil { + t.Fatal(err) + } + + peer_a := new(peer.Peer) + peer_a.AddAddress(addr_a) + peer_a.ID = peer.ID([]byte("peer_a")) + + peer_b := new(peer.Peer) + peer_b.AddAddress(addr_b) + peer_b.ID = peer.ID([]byte("peer_b")) + + neta := swarm.NewSwarm(peer_a) + err = neta.Listen() + if err != nil { + t.Fatal(err) + } + dht_a := NewDHT(peer_a, neta) + + netb := swarm.NewSwarm(peer_b) + err = netb.Listen() + if err != nil { + t.Fatal(err) + } + dht_b := NewDHT(peer_b, netb) + + dht_a.Start() + dht_b.Start() + + errsa := dht_a.network.GetChan().Errors + errsb := dht_b.network.GetChan().Errors + go func() { + select { + case err := <-errsa: + t.Fatal(err) + case err := <-errsb: + t.Fatal(err) + } + }() + + _, err = dht_a.Connect(addr_b) + if err != nil { + t.Fatal(err) + } + + dht_a.PutValue("hello", []byte("world")) + + val, err := dht_a.GetValue("hello", time.Second*2) + if err != nil { + t.Fatal(err) + } + + if string(val) != "world" { + t.Fatalf("Expected 'world' got '%s'", string(val)) + } +} + +func TestProvides(t *testing.T) { + u.Debug = false + + addrs, _, dhts := setupDHTS(4, t) + + _, err := dhts[0].Connect(addrs[1]) + if err != nil { + t.Fatal(err) + } + + _, err = dhts[1].Connect(addrs[2]) + if err != nil { + t.Fatal(err) + } + + _, err = dhts[1].Connect(addrs[3]) + if err != nil { + t.Fatal(err) + } + + err = dhts[3].PutLocal(u.Key("hello"), []byte("world")) + if err != nil { + t.Fatal(err) + } + + err = dhts[3].Provide(u.Key("hello")) + if err != nil { + t.Fatal(err) + } + + time.Sleep(time.Millisecond * 60) + + provs, err := dhts[0].FindProviders(u.Key("hello"), time.Second) + if err != nil { + t.Fatal(err) + } + + if len(provs) != 1 { + t.Fatal("Didnt get back providers") + } + + for i := 0; i < 4; i++ { + dhts[i].Halt() + } +} + +func TestLayeredGet(t *testing.T) { + u.Debug = false + addrs, _, dhts := setupDHTS(4, t) + + _, err := dhts[0].Connect(addrs[1]) + if err != nil { + t.Fatalf("Failed to connect: %s", err) + } + + _, err = dhts[1].Connect(addrs[2]) + if err != nil { + t.Fatal(err) + } + + _, err = dhts[1].Connect(addrs[3]) + if err != nil { + t.Fatal(err) + } + + err = dhts[3].PutLocal(u.Key("hello"), []byte("world")) + if err != nil { + t.Fatal(err) + } + + err = dhts[3].Provide(u.Key("hello")) + if err != nil { + t.Fatal(err) + } + + time.Sleep(time.Millisecond * 60) + + val, err := dhts[0].GetValue(u.Key("hello"), time.Second) + if err != nil { + t.Fatal(err) + } + + if string(val) != "world" { + t.Fatal("Got incorrect value.") + } + + for i := 0; i < 4; i++ { + dhts[i].Halt() + } +} + +func TestFindPeer(t *testing.T) { + u.Debug = false + + addrs, peers, dhts := setupDHTS(4, t) + + _, err := dhts[0].Connect(addrs[1]) + if err != nil { + t.Fatal(err) + } + + _, err = dhts[1].Connect(addrs[2]) + if err != nil { + t.Fatal(err) + } + + _, err = dhts[1].Connect(addrs[3]) + if err != nil { + t.Fatal(err) + } + + p, err := dhts[0].FindPeer(peers[2].ID, time.Second) + if err != nil { + t.Fatal(err) + } + + if p == nil { + t.Fatal("Failed to find peer.") + } + + if !p.ID.Equal(peers[2].ID) { + t.Fatal("Didnt find expected peer.") + } + + for i := 0; i < 4; i++ { + dhts[i].Halt() + } +} diff --git a/routing/dht/diag.go b/routing/dht/diag.go new file mode 100644 index 000000000..03997c5e7 --- /dev/null +++ b/routing/dht/diag.go @@ -0,0 +1,43 @@ +package dht + +import ( + "encoding/json" + "time" + + peer 
"github.com/jbenet/go-ipfs/peer" +) + +type connDiagInfo struct { + Latency time.Duration + Id peer.ID +} + +type diagInfo struct { + Id peer.ID + Connections []connDiagInfo + Keys []string + LifeSpan time.Duration + CodeVersion string +} + +func (di *diagInfo) Marshal() []byte { + b, err := json.Marshal(di) + if err != nil { + panic(err) + } + //TODO: also consider compressing this. There will be a lot of these + return b +} + +func (dht *IpfsDHT) getDiagInfo() *diagInfo { + di := new(diagInfo) + di.CodeVersion = "github.com/jbenet/go-ipfs" + di.Id = dht.self.ID + di.LifeSpan = time.Since(dht.birth) + di.Keys = nil // Currently no way to query datastore + + for _, p := range dht.routes[0].Listpeers() { + di.Connections = append(di.Connections, connDiagInfo{p.GetLatency(), p.ID}) + } + return di +} diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go new file mode 100644 index 000000000..490c9f493 --- /dev/null +++ b/routing/dht/ext_test.go @@ -0,0 +1,152 @@ +package dht + +import ( + "testing" + + "code.google.com/p/goprotobuf/proto" + + peer "github.com/jbenet/go-ipfs/peer" + swarm "github.com/jbenet/go-ipfs/swarm" + u "github.com/jbenet/go-ipfs/util" + ma "github.com/jbenet/go-multiaddr" + + "time" +) + +// fauxNet is a standin for a swarm.Network in order to more easily recreate +// different testing scenarios +type fauxNet struct { + Chan *swarm.Chan + handlers []mesHandleFunc + + swarm.Network +} + +// mesHandleFunc is a function that takes in outgoing messages +// and can respond to them, simulating other peers on the network. +// returning nil will chose not to respond and pass the message onto the +// next registered handler +type mesHandleFunc func(*swarm.Message) *swarm.Message + +func newFauxNet() *fauxNet { + fn := new(fauxNet) + fn.Chan = swarm.NewChan(8) + + return fn +} + +// Instead of 'Listening' Start up a goroutine that will check +// all outgoing messages against registered message handlers, +// and reply if needed +func (f *fauxNet) Listen() error { + go func() { + for { + select { + case in := <-f.Chan.Outgoing: + for _, h := range f.handlers { + reply := h(in) + if reply != nil { + f.Chan.Incoming <- reply + break + } + } + } + } + }() + return nil +} + +func (f *fauxNet) AddHandler(fn func(*swarm.Message) *swarm.Message) { + f.handlers = append(f.handlers, fn) +} + +func (f *fauxNet) Send(mes *swarm.Message) { + f.Chan.Outgoing <- mes +} + +func (f *fauxNet) GetChan() *swarm.Chan { + return f.Chan +} + +func (f *fauxNet) Connect(addr *ma.Multiaddr) (*peer.Peer, error) { + return nil, nil +} + +func TestGetFailures(t *testing.T) { + fn := newFauxNet() + fn.Listen() + + local := new(peer.Peer) + local.ID = peer.ID("test_peer") + + d := NewDHT(local, fn) + + other := &peer.Peer{ID: peer.ID("other_peer")} + + d.Start() + + d.Update(other) + + // This one should time out + _, err := d.GetValue(u.Key("test"), time.Millisecond*10) + if err != nil { + if err != u.ErrTimeout { + t.Fatal("Got different error than we expected.") + } + } else { + t.Fatal("Did not get expected error!") + } + + // Reply with failures to every message + fn.AddHandler(func(mes *swarm.Message) *swarm.Message { + pmes := new(PBDHTMessage) + err := proto.Unmarshal(mes.Data, pmes) + if err != nil { + t.Fatal(err) + } + + resp := DHTMessage{ + Type: pmes.GetType(), + Id: pmes.GetId(), + Response: true, + Success: false, + } + return swarm.NewMessage(mes.Peer, resp.ToProtobuf()) + }) + + // This one should fail with NotFound + _, err = d.GetValue(u.Key("test"), time.Millisecond*1000) + if err 
!= nil { + if err != u.ErrNotFound { + t.Fatalf("Expected ErrNotFound, got: %s", err) + } + } else { + t.Fatal("expected error, got none.") + } + + success := make(chan struct{}) + fn.handlers = nil + fn.AddHandler(func(mes *swarm.Message) *swarm.Message { + resp := new(PBDHTMessage) + err := proto.Unmarshal(mes.Data, resp) + if err != nil { + t.Fatal(err) + } + if resp.GetSuccess() { + t.Fatal("Get returned success when it shouldnt have.") + } + success <- struct{}{} + return nil + }) + + // Now we test this DHT's handleGetValue failure + req := DHTMessage{ + Type: PBDHTMessage_GET_VALUE, + Key: "hello", + Id: GenerateMessageID(), + Value: []byte{0}, + } + fn.Chan.Incoming <- swarm.NewMessage(other, req.ToProtobuf()) + + <-success +} diff --git a/routing/dht/messages.pb.go b/routing/dht/messages.pb.go index 3283ef4e2..a852c5e1f 100644 --- a/routing/dht/messages.pb.go +++ b/routing/dht/messages.pb.go @@ -9,7 +9,7 @@ It is generated from these files: messages.proto It has these top-level messages: - DHTMessage + PBDHTMessage */ package dht @@ -20,101 +20,142 @@ import math "math" var _ = proto.Marshal var _ = math.Inf -type DHTMessage_MessageType int32 +type PBDHTMessage_MessageType int32 const ( - DHTMessage_PUT_VALUE DHTMessage_MessageType = 0 - DHTMessage_GET_VALUE DHTMessage_MessageType = 1 - DHTMessage_ADD_PROVIDER DHTMessage_MessageType = 2 - DHTMessage_GET_PROVIDERS DHTMessage_MessageType = 3 - DHTMessage_FIND_NODE DHTMessage_MessageType = 4 - DHTMessage_PING DHTMessage_MessageType = 5 + PBDHTMessage_PUT_VALUE PBDHTMessage_MessageType = 0 + PBDHTMessage_GET_VALUE PBDHTMessage_MessageType = 1 + PBDHTMessage_ADD_PROVIDER PBDHTMessage_MessageType = 2 + PBDHTMessage_GET_PROVIDERS PBDHTMessage_MessageType = 3 + PBDHTMessage_FIND_NODE PBDHTMessage_MessageType = 4 + PBDHTMessage_PING PBDHTMessage_MessageType = 5 + PBDHTMessage_DIAGNOSTIC PBDHTMessage_MessageType = 6 ) -var DHTMessage_MessageType_name = map[int32]string{ +var PBDHTMessage_MessageType_name = map[int32]string{ 0: "PUT_VALUE", 1: "GET_VALUE", 2: "ADD_PROVIDER", 3: "GET_PROVIDERS", 4: "FIND_NODE", 5: "PING", + 6: "DIAGNOSTIC", } -var DHTMessage_MessageType_value = map[string]int32{ +var PBDHTMessage_MessageType_value = map[string]int32{ "PUT_VALUE": 0, "GET_VALUE": 1, "ADD_PROVIDER": 2, "GET_PROVIDERS": 3, "FIND_NODE": 4, "PING": 5, + "DIAGNOSTIC": 6, } -func (x DHTMessage_MessageType) Enum() *DHTMessage_MessageType { - p := new(DHTMessage_MessageType) +func (x PBDHTMessage_MessageType) Enum() *PBDHTMessage_MessageType { + p := new(PBDHTMessage_MessageType) *p = x return p } -func (x DHTMessage_MessageType) String() string { - return proto.EnumName(DHTMessage_MessageType_name, int32(x)) +func (x PBDHTMessage_MessageType) String() string { + return proto.EnumName(PBDHTMessage_MessageType_name, int32(x)) } -func (x *DHTMessage_MessageType) UnmarshalJSON(data []byte) error { - value, err := proto.UnmarshalJSONEnum(DHTMessage_MessageType_value, data, "DHTMessage_MessageType") +func (x *PBDHTMessage_MessageType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(PBDHTMessage_MessageType_value, data, "PBDHTMessage_MessageType") if err != nil { return err } - *x = DHTMessage_MessageType(value) + *x = PBDHTMessage_MessageType(value) return nil } -type DHTMessage struct { - Type *DHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.DHTMessage_MessageType" json:"type,omitempty"` - Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"` - Value []byte `protobuf:"bytes,3,opt,name=value" 
json:"value,omitempty"` - // Unique ID of this message, used to match queries with responses - Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"` - // Signals whether or not this message is a response to another message - Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"` - XXX_unrecognized []byte `json:"-"` +type PBDHTMessage struct { + Type *PBDHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.PBDHTMessage_MessageType" json:"type,omitempty"` + Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"` + Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"` + Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"` + Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"` + Success *bool `protobuf:"varint,6,opt,name=success" json:"success,omitempty"` + Peers []*PBDHTMessage_PBPeer `protobuf:"bytes,7,rep,name=peers" json:"peers,omitempty"` + XXX_unrecognized []byte `json:"-"` } -func (m *DHTMessage) Reset() { *m = DHTMessage{} } -func (m *DHTMessage) String() string { return proto.CompactTextString(m) } -func (*DHTMessage) ProtoMessage() {} +func (m *PBDHTMessage) Reset() { *m = PBDHTMessage{} } +func (m *PBDHTMessage) String() string { return proto.CompactTextString(m) } +func (*PBDHTMessage) ProtoMessage() {} -func (m *DHTMessage) GetType() DHTMessage_MessageType { +func (m *PBDHTMessage) GetType() PBDHTMessage_MessageType { if m != nil && m.Type != nil { return *m.Type } - return DHTMessage_PUT_VALUE + return PBDHTMessage_PUT_VALUE } -func (m *DHTMessage) GetKey() string { +func (m *PBDHTMessage) GetKey() string { if m != nil && m.Key != nil { return *m.Key } return "" } -func (m *DHTMessage) GetValue() []byte { +func (m *PBDHTMessage) GetValue() []byte { if m != nil { return m.Value } return nil } -func (m *DHTMessage) GetId() uint64 { +func (m *PBDHTMessage) GetId() uint64 { if m != nil && m.Id != nil { return *m.Id } return 0 } -func (m *DHTMessage) GetResponse() bool { +func (m *PBDHTMessage) GetResponse() bool { if m != nil && m.Response != nil { return *m.Response } return false } -func init() { - proto.RegisterEnum("dht.DHTMessage_MessageType", DHTMessage_MessageType_name, DHTMessage_MessageType_value) +func (m *PBDHTMessage) GetSuccess() bool { + if m != nil && m.Success != nil { + return *m.Success + } + return false +} + +func (m *PBDHTMessage) GetPeers() []*PBDHTMessage_PBPeer { + if m != nil { + return m.Peers + } + return nil +} + +type PBDHTMessage_PBPeer struct { + Id *string `protobuf:"bytes,1,req,name=id" json:"id,omitempty"` + Addr *string `protobuf:"bytes,2,req,name=addr" json:"addr,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *PBDHTMessage_PBPeer) Reset() { *m = PBDHTMessage_PBPeer{} } +func (m *PBDHTMessage_PBPeer) String() string { return proto.CompactTextString(m) } +func (*PBDHTMessage_PBPeer) ProtoMessage() {} + +func (m *PBDHTMessage_PBPeer) GetId() string { + if m != nil && m.Id != nil { + return *m.Id + } + return "" +} + +func (m *PBDHTMessage_PBPeer) GetAddr() string { + if m != nil && m.Addr != nil { + return *m.Addr + } + return "" +} + +func init() { + proto.RegisterEnum("dht.PBDHTMessage_MessageType", PBDHTMessage_MessageType_name, PBDHTMessage_MessageType_value) } diff --git a/routing/dht/messages.proto b/routing/dht/messages.proto index d873c7559..4d4e8c61f 100644 --- a/routing/dht/messages.proto +++ b/routing/dht/messages.proto @@ -2,7 +2,7 @@ package dht; //run `protoc --go_out=. 
*.proto` to generate -message DHTMessage { +message PBDHTMessage { enum MessageType { PUT_VALUE = 0; GET_VALUE = 1; @@ -10,6 +10,12 @@ message DHTMessage { GET_PROVIDERS = 3; FIND_NODE = 4; PING = 5; + DIAGNOSTIC = 6; + } + + message PBPeer { + required string id = 1; + required string addr = 2; } required MessageType type = 1; @@ -21,4 +27,8 @@ message DHTMessage { // Signals whether or not this message is a response to another message optional bool response = 5; + optional bool success = 6; + + // Used for returning peers from queries (normally, peers closer to X) + repeated PBPeer peers = 7; } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index e9ed64d98..e56a54e0c 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -1,17 +1,30 @@ package dht import ( + "bytes" + "encoding/json" + "errors" "math/rand" + "sync" "time" + proto "code.google.com/p/goprotobuf/proto" + + ma "github.com/jbenet/go-multiaddr" + peer "github.com/jbenet/go-ipfs/peer" + kb "github.com/jbenet/go-ipfs/routing/kbucket" swarm "github.com/jbenet/go-ipfs/swarm" u "github.com/jbenet/go-ipfs/util" ) +// Pool size is the number of nodes used for group find/set RPC calls +var PoolSize = 6 + // TODO: determine a way of creating and managing message IDs func GenerateMessageID() uint64 { - return uint64(rand.Uint32()) << 32 & uint64(rand.Uint32()) + //return (uint64(rand.Uint32()) << 32) & uint64(rand.Uint32()) + return uint64(rand.Uint32()) } // This file implements the Routing interface for the IpfsDHT struct. @@ -19,56 +32,190 @@ func GenerateMessageID() uint64 { // Basic Put/Get // PutValue adds value corresponding to given Key. -func (s *IpfsDHT) PutValue(key u.Key, value []byte) error { - var p *peer.Peer - p = s.routes.NearestNode(key) +// This is the top level "Store" operation of the DHT +func (s *IpfsDHT) PutValue(key u.Key, value []byte) { + complete := make(chan struct{}) + for _, route := range s.routes { + p := route.NearestPeer(kb.ConvertKey(key)) + if p == nil { + s.network.Error(kb.ErrLookupFailure) + go func() { + complete <- struct{}{} + }() + continue + } + go func() { + err := s.putValueToNetwork(p, string(key), value) + if err != nil { + s.network.Error(err) + } + complete <- struct{}{} + }() + } + for _, _ = range s.routes { + <-complete + } +} - pmes_type := DHTMessage_PUT_VALUE - str_key := string(key) - mes_id := GenerateMessageID() +// A counter for incrementing a variable across multiple threads +type counter struct { + n int + mut sync.RWMutex +} - pmes := new(DHTMessage) - pmes.Type = &pmes_type - pmes.Key = &str_key - pmes.Value = value - pmes.Id = &mes_id +func (c *counter) Increment() { + c.mut.Lock() + c.n++ + c.mut.Unlock() +} - mes := new(swarm.Message) - mes.Data = []byte(pmes.String()) - mes.Peer = p +func (c *counter) Decrement() { + c.mut.Lock() + c.n-- + c.mut.Unlock() +} - s.network.Chan.Outgoing <- mes - return nil +func (c *counter) Size() int { + c.mut.RLock() + defer c.mut.RUnlock() + return c.n +} + +type peerSet struct { + ps map[string]bool + lk sync.RWMutex +} + +func newPeerSet() *peerSet { + ps := new(peerSet) + ps.ps = make(map[string]bool) + return ps +} + +func (ps *peerSet) Add(p *peer.Peer) { + ps.lk.Lock() + ps.ps[string(p.ID)] = true + ps.lk.Unlock() +} + +func (ps *peerSet) Contains(p *peer.Peer) bool { + ps.lk.RLock() + _, ok := ps.ps[string(p.ID)] + ps.lk.RUnlock() + return ok +} + +func (ps *peerSet) Size() int { + ps.lk.RLock() + defer ps.lk.RUnlock() + return len(ps.ps) } // GetValue searches for the value corresponding to given Key. 
+// If the search does not succeed, a multiaddr string of a closer peer is +// returned along with util.ErrSearchIncomplete func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { - var p *peer.Peer - p = s.routes.NearestNode(key) + ll := startNewRpc("GET") + defer func() { + ll.EndLog() + ll.Print() + }() - str_key := string(key) - mes_type := DHTMessage_GET_VALUE - mes_id := GenerateMessageID() - // protobuf structure - pmes := new(DHTMessage) - pmes.Type = &mes_type - pmes.Key = &str_key - pmes.Id = &mes_id + // If we have it local, dont bother doing an RPC! + // NOTE: this might not be what we want to do... + val, err := s.GetLocal(key) + if err == nil { + ll.Success = true + u.DOut("Found local, returning.") + return val, nil + } - mes := new(swarm.Message) - mes.Data = []byte(pmes.String()) - mes.Peer = p + route_level := 0 + closest := s.routes[route_level].NearestPeers(kb.ConvertKey(key), PoolSize) + if closest == nil || len(closest) == 0 { + return nil, kb.ErrLookupFailure + } - response_chan := s.ListenFor(*pmes.Id) + val_chan := make(chan []byte) + npeer_chan := make(chan *peer.Peer, 30) + proc_peer := make(chan *peer.Peer, 30) + err_chan := make(chan error) + after := time.After(timeout) + pset := newPeerSet() + + for _, p := range closest { + pset.Add(p) + npeer_chan <- p + } + + c := counter{} + + // This limit value is referred to as k in the kademlia paper + limit := 20 + count := 0 + go func() { + for { + select { + case p := <-npeer_chan: + count++ + if count >= limit { + break + } + c.Increment() + + proc_peer <- p + default: + if c.Size() == 0 { + err_chan <- u.ErrNotFound + } + } + } + }() + + process := func() { + for { + select { + case p, ok := <-proc_peer: + if !ok || p == nil { + c.Decrement() + return + } + val, peers, err := s.getValueOrPeers(p, key, timeout/4, route_level) + if err != nil { + u.DErr(err.Error()) + c.Decrement() + continue + } + if val != nil { + val_chan <- val + c.Decrement() + return + } + + for _, np := range peers { + // TODO: filter out peers that arent closer + if !pset.Contains(np) && pset.Size() < limit { + pset.Add(np) //This is racey... make a single function to do operation + npeer_chan <- np + } + } + c.Decrement() + } + } + } + + concurFactor := 3 + for i := 0; i < concurFactor; i++ { + go process() + } - // Wait for either the response or a timeout - timeup := time.After(timeout) select { - case <-timeup: - // TODO: unregister listener + case val := <-val_chan: + return val, nil + case err := <-err_chan: + return nil, err + case <-after: return nil, u.ErrTimeout - case resp := <-response_chan: - return resp.Data, nil } } @@ -77,17 +224,196 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { // Announce that this node can provide value for given key func (s *IpfsDHT) Provide(key u.Key) error { - return u.ErrNotImplemented + peers := s.routes[0].NearestPeers(kb.ConvertKey(key), PoolSize) + if len(peers) == 0 { + return kb.ErrLookupFailure + } + + pmes := DHTMessage{ + Type: PBDHTMessage_ADD_PROVIDER, + Key: string(key), + } + pbmes := pmes.ToProtobuf() + + for _, p := range peers { + mes := swarm.NewMessage(p, pbmes) + s.network.Send(mes) + } + return nil } // FindProviders searches for peers who can provide the value for given key. 
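Before the rewritten FindProviders below, here is how the provide/lookup pair is meant to be driven, condensed from TestProvides later in this diff; d1 and d2 stand for two already-connected *IpfsDHT nodes, and as in the test a short pause between Provide and FindProviders may be needed for the announcement to propagate.

func exampleProvideAndFind(d1, d2 *IpfsDHT) ([]*peer.Peer, error) {
	// d1 stores a block locally and announces itself as a provider.
	if err := d1.PutLocal(u.Key("hello"), []byte("world")); err != nil {
		return nil, err
	}
	if err := d1.Provide(u.Key("hello")); err != nil {
		return nil, err
	}
	// d2 then asks the network who can provide that key.
	return d2.FindProviders(u.Key("hello"), time.Second)
}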
-func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) (*peer.Peer, error) { - return nil, u.ErrNotImplemented +func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) { + ll := startNewRpc("FindProviders") + defer func() { + ll.EndLog() + ll.Print() + }() + u.DOut("Find providers for: '%s'", key) + p := s.routes[0].NearestPeer(kb.ConvertKey(key)) + if p == nil { + return nil, kb.ErrLookupFailure + } + + for level := 0; level < len(s.routes); { + pmes, err := s.findProvidersSingle(p, key, level, timeout) + if err != nil { + return nil, err + } + if pmes.GetSuccess() { + provs := s.addPeerList(key, pmes.GetPeers()) + ll.Success = true + return provs, nil + } else { + closer := pmes.GetPeers() + if len(closer) == 0 { + level++ + continue + } + if peer.ID(closer[0].GetId()).Equal(s.self.ID) { + u.DOut("Got myself back as a closer peer.") + return nil, u.ErrNotFound + } + maddr, err := ma.NewMultiaddr(closer[0].GetAddr()) + if err != nil { + // ??? Move up route level??? + panic("not yet implemented") + } + + np, err := s.network.GetConnection(peer.ID(closer[0].GetId()), maddr) + if err != nil { + u.PErr("[%s] Failed to connect to: %s", s.self.ID.Pretty(), closer[0].GetAddr()) + level++ + continue + } + p = np + } + } + return nil, u.ErrNotFound } // Find specific Peer // FindPeer searches for a peer with given ID. func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) { - return nil, u.ErrNotImplemented + // Check if were already connected to them + p, _ := s.Find(id) + if p != nil { + return p, nil + } + + route_level := 0 + p = s.routes[route_level].NearestPeer(kb.ConvertPeerID(id)) + if p == nil { + return nil, kb.ErrLookupFailure + } + if p.ID.Equal(id) { + return p, nil + } + + for route_level < len(s.routes) { + pmes, err := s.findPeerSingle(p, id, timeout, route_level) + plist := pmes.GetPeers() + if len(plist) == 0 { + route_level++ + } + found := plist[0] + + addr, err := ma.NewMultiaddr(found.GetAddr()) + if err != nil { + return nil, err + } + + nxtPeer, err := s.network.GetConnection(peer.ID(found.GetId()), addr) + if err != nil { + return nil, err + } + if pmes.GetSuccess() { + if !id.Equal(nxtPeer.ID) { + return nil, errors.New("got back invalid peer from 'successful' response") + } + return nxtPeer, nil + } else { + p = nxtPeer + } + } + return nil, u.ErrNotFound +} + +// Ping a peer, log the time it took +func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error { + // Thoughts: maybe this should accept an ID and do a peer lookup? 
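One way to act on that thought without changing Ping itself would be a thin wrapper that resolves the ID first. The helper below is purely illustrative; the name PingID and the lookup-then-ping behaviour are not part of this patch.

func (dht *IpfsDHT) PingID(id peer.ID, timeout time.Duration) error {
	p, _ := dht.Find(id) // already-connected peers first
	if p == nil {
		var err error
		if p, err = dht.FindPeer(id, timeout); err != nil {
			return err
		}
	}
	return dht.Ping(p, timeout)
}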
+ u.DOut("Enter Ping.") + + pmes := DHTMessage{Id: GenerateMessageID(), Type: PBDHTMessage_PING} + mes := swarm.NewMessage(p, pmes.ToProtobuf()) + + before := time.Now() + response_chan := dht.listener.Listen(pmes.Id, 1, time.Minute) + dht.network.Send(mes) + + tout := time.After(timeout) + select { + case <-response_chan: + roundtrip := time.Since(before) + p.SetLatency(roundtrip) + u.DOut("Ping took %s.", roundtrip.String()) + return nil + case <-tout: + // Timed out, think about removing peer from network + u.DOut("Ping peer timed out.") + dht.listener.Unlisten(pmes.Id) + return u.ErrTimeout + } +} + +func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) { + u.DOut("Begin Diagnostic") + //Send to N closest peers + targets := dht.routes[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10) + + // TODO: Add timeout to this struct so nodes know when to return + pmes := DHTMessage{ + Type: PBDHTMessage_DIAGNOSTIC, + Id: GenerateMessageID(), + } + + listenChan := dht.listener.Listen(pmes.Id, len(targets), time.Minute*2) + + pbmes := pmes.ToProtobuf() + for _, p := range targets { + mes := swarm.NewMessage(p, pbmes) + dht.network.Send(mes) + } + + var out []*diagInfo + after := time.After(timeout) + for count := len(targets); count > 0; { + select { + case <-after: + u.DOut("Diagnostic request timed out.") + return out, u.ErrTimeout + case resp := <-listenChan: + pmes_out := new(PBDHTMessage) + err := proto.Unmarshal(resp.Data, pmes_out) + if err != nil { + // NOTE: here and elsewhere, need to audit error handling, + // some errors should be continued on from + return out, err + } + + dec := json.NewDecoder(bytes.NewBuffer(pmes_out.GetValue())) + for { + di := new(diagInfo) + err := dec.Decode(di) + if err != nil { + break + } + + out = append(out, di) + } + } + } + + return nil, nil } diff --git a/routing/kbucket/bucket.go b/routing/kbucket/bucket.go new file mode 100644 index 000000000..1a55a0f69 --- /dev/null +++ b/routing/kbucket/bucket.go @@ -0,0 +1,87 @@ +package dht + +import ( + "container/list" + "sync" + + peer "github.com/jbenet/go-ipfs/peer" +) + +// Bucket holds a list of peers. 
+type Bucket struct { + lk sync.RWMutex + list *list.List +} + +func NewBucket() *Bucket { + b := new(Bucket) + b.list = list.New() + return b +} + +func (b *Bucket) Find(id peer.ID) *list.Element { + b.lk.RLock() + defer b.lk.RUnlock() + for e := b.list.Front(); e != nil; e = e.Next() { + if e.Value.(*peer.Peer).ID.Equal(id) { + return e + } + } + return nil +} + +func (b *Bucket) MoveToFront(e *list.Element) { + b.lk.Lock() + b.list.MoveToFront(e) + b.lk.Unlock() +} + +func (b *Bucket) PushFront(p *peer.Peer) { + b.lk.Lock() + b.list.PushFront(p) + b.lk.Unlock() +} + +func (b *Bucket) PopBack() *peer.Peer { + b.lk.Lock() + defer b.lk.Unlock() + last := b.list.Back() + b.list.Remove(last) + return last.Value.(*peer.Peer) +} + +func (b *Bucket) Len() int { + b.lk.RLock() + defer b.lk.RUnlock() + return b.list.Len() +} + +// Splits a buckets peers into two buckets, the methods receiver will have +// peers with CPL equal to cpl, the returned bucket will have peers with CPL +// greater than cpl (returned bucket has closer peers) +func (b *Bucket) Split(cpl int, target ID) *Bucket { + b.lk.Lock() + defer b.lk.Unlock() + + out := list.New() + newbuck := NewBucket() + newbuck.list = out + e := b.list.Front() + for e != nil { + peer_id := ConvertPeerID(e.Value.(*peer.Peer).ID) + peer_cpl := prefLen(peer_id, target) + if peer_cpl > cpl { + cur := e + out.PushBack(e.Value) + e = e.Next() + b.list.Remove(cur) + continue + } + e = e.Next() + } + return newbuck +} + +func (b *Bucket) getIter() *list.Element { + return b.list.Front() +} diff --git a/routing/kbucket/table.go b/routing/kbucket/table.go new file mode 100644 index 000000000..86a7031ce --- /dev/null +++ b/routing/kbucket/table.go @@ -0,0 +1,213 @@ +package dht + +import ( + "container/list" + "fmt" + "sort" + "sync" + "time" + + peer "github.com/jbenet/go-ipfs/peer" + u "github.com/jbenet/go-ipfs/util" +) + +// RoutingTable defines the routing table. +type RoutingTable struct { + + // ID of the local peer + local ID + + // Blanket lock, refine later for better performance + tabLock sync.RWMutex + + // Maximum acceptable latency for peers in this cluster + maxLatency time.Duration + + // kBuckets define all the fingers to other nodes. + Buckets []*Bucket + bucketsize int +} + +func NewRoutingTable(bucketsize int, local_id ID, latency time.Duration) *RoutingTable { + rt := new(RoutingTable) + rt.Buckets = []*Bucket{NewBucket()} + rt.bucketsize = bucketsize + rt.local = local_id + rt.maxLatency = latency + return rt +} + +// Update adds or moves the given peer to the front of its respective bucket +// If a peer gets removed from a bucket, it is returned +func (rt *RoutingTable) Update(p *peer.Peer) *peer.Peer { + rt.tabLock.Lock() + defer rt.tabLock.Unlock() + peer_id := ConvertPeerID(p.ID) + cpl := xor(peer_id, rt.local).commonPrefixLen() + + b_id := cpl + if b_id >= len(rt.Buckets) { + b_id = len(rt.Buckets) - 1 + } + + bucket := rt.Buckets[b_id] + e := bucket.Find(p.ID) + if e == nil { + // New peer, add to bucket + if p.GetLatency() > rt.maxLatency { + // Connection doesnt meet requirements, skip! + return nil + } + bucket.PushFront(p) + + // Are we past the max bucket size? + if bucket.Len() > rt.bucketsize { + if b_id == len(rt.Buckets)-1 { + new_bucket := bucket.Split(b_id, rt.local) + rt.Buckets = append(rt.Buckets, new_bucket) + if new_bucket.Len() > rt.bucketsize { + // TODO: This is a very rare and annoying case + panic("Case not handled.") + } + + // If all elements were on left side of split... 
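// ...meaning nothing moved into the new bucket and the original is still over
// capacity, so the least recently seen peer is dropped from the back.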
+ if bucket.Len() > rt.bucketsize { + return bucket.PopBack() + } + } else { + // If the bucket cant split kick out least active node + return bucket.PopBack() + } + } + return nil + } else { + // If the peer is already in the table, move it to the front. + // This signifies that it it "more active" and the less active nodes + // Will as a result tend towards the back of the list + bucket.MoveToFront(e) + return nil + } +} + +// A helper struct to sort peers by their distance to the local node +type peerDistance struct { + p *peer.Peer + distance ID +} + +// peerSorterArr implements sort.Interface to sort peers by xor distance +type peerSorterArr []*peerDistance + +func (p peerSorterArr) Len() int { return len(p) } +func (p peerSorterArr) Swap(a, b int) { p[a], p[b] = p[b], p[a] } +func (p peerSorterArr) Less(a, b int) bool { + return p[a].distance.Less(p[b].distance) +} + +// + +func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr { + for e := peerList.Front(); e != nil; e = e.Next() { + p := e.Value.(*peer.Peer) + p_id := ConvertPeerID(p.ID) + pd := peerDistance{ + p: p, + distance: xor(target, p_id), + } + peerArr = append(peerArr, &pd) + if e == nil { + u.POut("list element was nil.") + return peerArr + } + } + return peerArr +} + +// Find a specific peer by ID or return nil +func (rt *RoutingTable) Find(id peer.ID) *peer.Peer { + srch := rt.NearestPeers(ConvertPeerID(id), 1) + if len(srch) == 0 || !srch[0].ID.Equal(id) { + return nil + } + return srch[0] +} + +// Returns a single peer that is nearest to the given ID +func (rt *RoutingTable) NearestPeer(id ID) *peer.Peer { + peers := rt.NearestPeers(id, 1) + if len(peers) > 0 { + return peers[0] + } else { + return nil + } +} + +// Returns a list of the 'count' closest peers to the given ID +func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer { + rt.tabLock.RLock() + defer rt.tabLock.RUnlock() + cpl := prefLen(id, rt.local) + + // Get bucket at cpl index or last bucket + var bucket *Bucket + if cpl >= len(rt.Buckets) { + cpl = len(rt.Buckets) - 1 + } + bucket = rt.Buckets[cpl] + + var peerArr peerSorterArr + if bucket.Len() == 0 { + // In the case of an unusual split, one bucket may be empty. + // if this happens, search both surrounding buckets for nearest peer + if cpl > 0 { + plist := rt.Buckets[cpl-1].list + peerArr = copyPeersFromList(id, peerArr, plist) + } + + if cpl < len(rt.Buckets)-1 { + plist := rt.Buckets[cpl+1].list + peerArr = copyPeersFromList(id, peerArr, plist) + } + } else { + peerArr = copyPeersFromList(id, peerArr, bucket.list) + } + + // Sort by distance to local peer + sort.Sort(peerArr) + + var out []*peer.Peer + for i := 0; i < count && i < peerArr.Len(); i++ { + out = append(out, peerArr[i].p) + } + + return out +} + +// Returns the total number of peers in the routing table +func (rt *RoutingTable) Size() int { + var tot int + for _, buck := range rt.Buckets { + tot += buck.Len() + } + return tot +} + +// NOTE: This is potentially unsafe... 
use at your own risk +func (rt *RoutingTable) Listpeers() []*peer.Peer { + var peers []*peer.Peer + for _, buck := range rt.Buckets { + for e := buck.getIter(); e != nil; e = e.Next() { + peers = append(peers, e.Value.(*peer.Peer)) + } + } + return peers +} + +func (rt *RoutingTable) Print() { + fmt.Printf("Routing Table, bs = %d, Max latency = %d\n", rt.bucketsize, rt.maxLatency) + rt.tabLock.RLock() + peers := rt.Listpeers() + for i, p := range peers { + fmt.Printf("%d) %s %s\n", i, p.ID.Pretty(), p.GetLatency().String()) + } +} diff --git a/routing/kbucket/table_test.go b/routing/kbucket/table_test.go new file mode 100644 index 000000000..02d8f5e0e --- /dev/null +++ b/routing/kbucket/table_test.go @@ -0,0 +1,199 @@ +package dht + +import ( + crand "crypto/rand" + "crypto/sha256" + "math/rand" + "testing" + "time" + + peer "github.com/jbenet/go-ipfs/peer" +) + +func _randPeer() *peer.Peer { + p := new(peer.Peer) + p.ID = make(peer.ID, 16) + crand.Read(p.ID) + return p +} + +func _randID() ID { + buf := make([]byte, 16) + crand.Read(buf) + + hash := sha256.Sum256(buf) + return ID(hash[:]) +} + +// Test basic features of the bucket struct +func TestBucket(t *testing.T) { + b := NewBucket() + + peers := make([]*peer.Peer, 100) + for i := 0; i < 100; i++ { + peers[i] = _randPeer() + b.PushFront(peers[i]) + } + + local := _randPeer() + local_id := ConvertPeerID(local.ID) + + i := rand.Intn(len(peers)) + e := b.Find(peers[i].ID) + if e == nil { + t.Errorf("Failed to find peer: %v", peers[i]) + } + + spl := b.Split(0, ConvertPeerID(local.ID)) + llist := b.list + for e := llist.Front(); e != nil; e = e.Next() { + p := ConvertPeerID(e.Value.(*peer.Peer).ID) + cpl := xor(p, local_id).commonPrefixLen() + if cpl > 0 { + t.Fatalf("Split failed. found id with cpl > 0 in 0 bucket") + } + } + + rlist := spl.list + for e := rlist.Front(); e != nil; e = e.Next() { + p := ConvertPeerID(e.Value.(*peer.Peer).ID) + cpl := xor(p, local_id).commonPrefixLen() + if cpl == 0 { + t.Fatalf("Split failed. 
found id with cpl == 0 in non 0 bucket") + } + } +} + +// Right now, this just makes sure that it doesnt hang or crash +func TestTableUpdate(t *testing.T) { + local := _randPeer() + rt := NewRoutingTable(10, ConvertPeerID(local.ID), time.Hour) + + peers := make([]*peer.Peer, 100) + for i := 0; i < 100; i++ { + peers[i] = _randPeer() + } + + // Testing Update + for i := 0; i < 10000; i++ { + p := rt.Update(peers[rand.Intn(len(peers))]) + if p != nil { + t.Log("evicted peer.") + } + } + + for i := 0; i < 100; i++ { + id := _randID() + ret := rt.NearestPeers(id, 5) + if len(ret) == 0 { + t.Fatal("Failed to find node near ID.") + } + } +} + +func TestTableFind(t *testing.T) { + local := _randPeer() + rt := NewRoutingTable(10, ConvertPeerID(local.ID), time.Hour) + + peers := make([]*peer.Peer, 100) + for i := 0; i < 5; i++ { + peers[i] = _randPeer() + rt.Update(peers[i]) + } + + t.Logf("Searching for peer: '%s'", peers[2].ID.Pretty()) + found := rt.NearestPeer(ConvertPeerID(peers[2].ID)) + if !found.ID.Equal(peers[2].ID) { + t.Fatalf("Failed to lookup known node...") + } +} + +func TestTableFindMultiple(t *testing.T) { + local := _randPeer() + rt := NewRoutingTable(20, ConvertPeerID(local.ID), time.Hour) + + peers := make([]*peer.Peer, 100) + for i := 0; i < 18; i++ { + peers[i] = _randPeer() + rt.Update(peers[i]) + } + + t.Logf("Searching for peer: '%s'", peers[2].ID.Pretty()) + found := rt.NearestPeers(ConvertPeerID(peers[2].ID), 15) + if len(found) != 15 { + t.Fatalf("Got back different number of peers than we expected.") + } +} + +// Looks for race conditions in table operations. For a more 'certain' +// test, increase the loop counter from 1000 to a much higher number +// and set GOMAXPROCS above 1 +func TestTableMultithreaded(t *testing.T) { + local := peer.ID("localPeer") + tab := NewRoutingTable(20, ConvertPeerID(local), time.Hour) + var peers []*peer.Peer + for i := 0; i < 500; i++ { + peers = append(peers, _randPeer()) + } + + done := make(chan struct{}) + go func() { + for i := 0; i < 1000; i++ { + n := rand.Intn(len(peers)) + tab.Update(peers[n]) + } + done <- struct{}{} + }() + + go func() { + for i := 0; i < 1000; i++ { + n := rand.Intn(len(peers)) + tab.Update(peers[n]) + } + done <- struct{}{} + }() + + go func() { + for i := 0; i < 1000; i++ { + n := rand.Intn(len(peers)) + tab.Find(peers[n].ID) + } + done <- struct{}{} + }() + <-done + <-done + <-done +} + +func BenchmarkUpdates(b *testing.B) { + b.StopTimer() + local := ConvertKey("localKey") + tab := NewRoutingTable(20, local, time.Hour) + + var peers []*peer.Peer + for i := 0; i < b.N; i++ { + peers = append(peers, _randPeer()) + } + + b.StartTimer() + for i := 0; i < b.N; i++ { + tab.Update(peers[i]) + } +} + +func BenchmarkFinds(b *testing.B) { + b.StopTimer() + local := ConvertKey("localKey") + tab := NewRoutingTable(20, local, time.Hour) + + var peers []*peer.Peer + for i := 0; i < b.N; i++ { + peers = append(peers, _randPeer()) + tab.Update(peers[i]) + } + + b.StartTimer() + for i := 0; i < b.N; i++ { + tab.Find(peers[i].ID) + } +} diff --git a/routing/dht/table.go b/routing/kbucket/util.go similarity index 53% rename from routing/dht/table.go rename to routing/kbucket/util.go index d7625e462..32ff2c269 100644 --- a/routing/dht/table.go +++ b/routing/kbucket/util.go @@ -2,40 +2,30 @@ package dht import ( "bytes" - "container/list" + "crypto/sha256" + "errors" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" ) +// Returned if a routing table query returns no results. 
This is NOT expected +// behaviour +var ErrLookupFailure = errors.New("failed to find any peer in table") + // ID for IpfsDHT should be a byte slice, to allow for simpler operations // (xor). DHT ids are based on the peer.IDs. // -// NOTE: peer.IDs are biased because they are multihashes (first bytes -// biased). Thus, may need to re-hash keys (uniform dist). TODO(jbenet) +// The type dht.ID signifies that its contents have been hashed from either a +// peer.ID or a util.Key. This unifies the keyspace type ID []byte -// Bucket holds a list of peers. -type Bucket []*list.List - -// RoutingTable defines the routing table. -type RoutingTable struct { - - // kBuckets define all the fingers to other nodes. - Buckets []Bucket -} - -//TODO: make this accept an ID, requires method of converting keys to IDs -func (rt *RoutingTable) NearestNode(key u.Key) *peer.Peer { - panic("Function not implemented.") -} - func (id ID) Equal(other ID) bool { return bytes.Equal(id, other) } -func (id ID) Less(other interface{}) bool { - a, b := equalizeSizes(id, other.(ID)) +func (id ID) Less(other ID) bool { + a, b := equalizeSizes(id, other) for i := 0; i < len(a); i++ { if a[i] != b[i] { return a[i] < b[i] @@ -55,6 +45,10 @@ func (id ID) commonPrefixLen() int { return len(id)*8 - 1 } +func prefLen(a, b ID) int { + return xor(a, b).commonPrefixLen() +} + func xor(a, b ID) ID { a, b = equalizeSizes(a, b) @@ -81,3 +75,24 @@ func equalizeSizes(a, b ID) (ID, ID) { return a, b } + +func ConvertPeerID(id peer.ID) ID { + hash := sha256.Sum256(id) + return hash[:] +} + +func ConvertKey(id u.Key) ID { + hash := sha256.Sum256([]byte(id)) + return hash[:] +} + +// Returns true if a is closer to key than b is +func Closer(a, b peer.ID, key u.Key) bool { + aid := ConvertPeerID(a) + bid := ConvertPeerID(b) + tgt := ConvertKey(key) + adist := xor(aid, tgt) + bdist := xor(bid, tgt) + + return adist.Less(bdist) +} diff --git a/routing/routing.go b/routing/routing.go index 933032f46..3826f13cb 100644 --- a/routing/routing.go +++ b/routing/routing.go @@ -25,7 +25,7 @@ type IpfsRouting interface { Provide(key u.Key) error // FindProviders searches for peers who can provide the value for given key. 
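FindProviders (changed just below to return a slice of peers) is the main consumer of this keyspace: content keys and peer IDs are both hashed via ConvertKey and ConvertPeerID, so the routing table can rank candidates for any key. A rough sketch of that call pattern, assuming the kbucket API introduced in this change; nearestToKey itself is a hypothetical helper.

package dht

import (
	peer "github.com/jbenet/go-ipfs/peer"
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
	u "github.com/jbenet/go-ipfs/util"
)

// nearestToKey asks a routing table for the count closest peers to a content
// key, returning ErrLookupFailure when the table has nothing to offer.
func nearestToKey(rt *kb.RoutingTable, key u.Key, count int) ([]*peer.Peer, error) {
	closest := rt.NearestPeers(kb.ConvertKey(key), count)
	if len(closest) == 0 {
		return nil, kb.ErrLookupFailure
	}
	return closest, nil
}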
- FindProviders(key u.Key, timeout time.Duration) (*peer.Peer, error) + FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) // Find specific Peer diff --git a/swarm/conn.go b/swarm/conn.go index 56e8eea17..072b53437 100644 --- a/swarm/conn.go +++ b/swarm/conn.go @@ -2,11 +2,12 @@ package swarm import ( "fmt" + "net" + peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" msgio "github.com/jbenet/go-msgio" ma "github.com/jbenet/go-multiaddr" - "net" ) // ChanBuffer is the size of the buffer in the Conn Chan diff --git a/swarm/interface.go b/swarm/interface.go new file mode 100644 index 000000000..9a70890e6 --- /dev/null +++ b/swarm/interface.go @@ -0,0 +1,20 @@ +package swarm + +import ( + peer "github.com/jbenet/go-ipfs/peer" + u "github.com/jbenet/go-ipfs/util" + + ma "github.com/jbenet/go-multiaddr" +) + +type Network interface { + Send(*Message) + Error(error) + Find(u.Key) *peer.Peer + Listen() error + ConnectNew(*ma.Multiaddr) (*peer.Peer, error) + GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error) + GetChan() *Chan + Close() + Drop(*peer.Peer) error +} diff --git a/swarm/swarm.go b/swarm/swarm.go index fccc74777..cf968984c 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -1,12 +1,16 @@ package swarm import ( + "errors" "fmt" + "net" + "sync" + + proto "code.google.com/p/goprotobuf/proto" + ident "github.com/jbenet/go-ipfs/identify" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" ma "github.com/jbenet/go-multiaddr" - "net" - "sync" ) // Message represents a packet of information sent to or received from a @@ -19,6 +23,19 @@ type Message struct { Data []byte } +// Cleaner looking helper function to make a new message struct +func NewMessage(p *peer.Peer, data proto.Message) *Message { + bytes, err := proto.Marshal(data) + if err != nil { + u.PErr(err.Error()) + return nil + } + return &Message{ + Peer: p, + Data: bytes, + } +} + // Chan is a swam channel, which provides duplex communication and errors. type Chan struct { Outgoing chan *Message @@ -32,11 +49,30 @@ func NewChan(bufsize int) *Chan { return &Chan{ Outgoing: make(chan *Message, bufsize), Incoming: make(chan *Message, bufsize), - Errors: make(chan error), + Errors: make(chan error, bufsize), Close: make(chan bool, bufsize), } } +// Contains a set of errors mapping to each of the swarms addresses +// that were listened on +type SwarmListenErr struct { + Errors []error +} + +func (se *SwarmListenErr) Error() string { + if se == nil { + return "<nil error>" + } + var out string + for i, v := range se.Errors { + if v != nil { + out += fmt.Sprintf("%d: %s\n", i, v) + } + } + return out +} + // Swarm is a connection muxer, allowing connections to other peers to // be opened and closed, while still using the same Chan for all // communication. The Chan sends/receives Messages, which note the @@ -46,7 +82,8 @@ type Swarm struct { conns ConnMap connsLock sync.RWMutex - local *peer.Peer + local *peer.Peer + listeners []net.Listener } // NewSwarm constructs a Swarm, with a Chan. 
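The Network interface above is what decouples consumers such as the DHT from the concrete *Swarm; Swarm itself asserts compliance at the bottom of swarm.go (var _ Network = &Swarm{}). As a quick check of the surface area, here is a sketch of a do-nothing fake that satisfies it; fakeNet is purely illustrative and not part of this change.

package swarm

import (
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"

	ma "github.com/jbenet/go-multiaddr"
)

// fakeNet satisfies Network without opening any real connections, which is
// handy for exercising code that only needs the interface.
type fakeNet struct {
	ch *Chan
}

func newFakeNet() *fakeNet                 { return &fakeNet{ch: NewChan(10)} }
func (f *fakeNet) Send(m *Message)         { f.ch.Outgoing <- m }
func (f *fakeNet) Error(e error)           { f.ch.Errors <- e }
func (f *fakeNet) Find(k u.Key) *peer.Peer { return nil }
func (f *fakeNet) Listen() error           { return nil }
func (f *fakeNet) ConnectNew(a *ma.Multiaddr) (*peer.Peer, error) {
	return nil, u.ErrNotImplemented
}
func (f *fakeNet) GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error) {
	return nil, u.ErrNotImplemented
}
func (f *fakeNet) GetChan() *Chan          { return f.ch }
func (f *fakeNet) Close()                  {}
func (f *fakeNet) Drop(p *peer.Peer) error { return nil }

var _ Network = (*fakeNet)(nil)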
@@ -61,13 +98,23 @@ func NewSwarm(local *peer.Peer) *Swarm { } // Open listeners for each network the swarm should listen on -func (s *Swarm) Listen() { - for _, addr := range s.local.Addresses { +func (s *Swarm) Listen() error { + var ret_err *SwarmListenErr + for i, addr := range s.local.Addresses { err := s.connListen(addr) if err != nil { + if ret_err == nil { + ret_err = new(SwarmListenErr) + ret_err.Errors = make([]error, len(s.local.Addresses)) + } + ret_err.Errors[i] = err u.PErr("Failed to listen on: %s [%s]", addr, err) } } + if ret_err == nil { + return nil + } + return ret_err } // Listen for new connections on the given multiaddr @@ -82,12 +129,17 @@ func (s *Swarm) connListen(maddr *ma.Multiaddr) error { return err } + // NOTE: this may require a lock around it later. currently, only run on setup + s.listeners = append(s.listeners, list) + // Accept and handle new connections on this listener until it errors go func() { for { nconn, err := list.Accept() if err != nil { - u.PErr("Failed to accept connection: %s - %s", netstr, addr) + e := fmt.Errorf("Failed to accept connection: %s - %s [%s]", + netstr, addr, err) + go func() { s.Chan.Errors <- e }() return } go s.handleNewConn(nconn) @@ -99,7 +151,31 @@ func (s *Swarm) connListen(maddr *ma.Multiaddr) error { // Handle getting ID from this peer and adding it into the map func (s *Swarm) handleNewConn(nconn net.Conn) { - panic("Not yet implemented!") + p := new(peer.Peer) + + conn := &Conn{ + Peer: p, + Addr: nil, + Conn: nconn, + } + newConnChans(conn) + + err := ident.Handshake(s.local, p, conn.Incoming.MsgChan, conn.Outgoing.MsgChan) + if err != nil { + u.PErr(err.Error()) + conn.Close() + return + } + + // Get address to contact remote peer from + addr := <-conn.Incoming.MsgChan + maddr, err := ma.NewMultiaddr(string(addr)) + if err != nil { + u.PErr("Got invalid address from peer.") + } + p.AddAddress(maddr) + + s.StartConn(conn) } // Close closes a swarm. @@ -113,6 +189,10 @@ func (s *Swarm) Close() { } s.Chan.Close <- true // fan out s.Chan.Close <- true // listener + + for _, list := range s.listeners { + list.Close() + } } // Dial connects to a peer. @@ -123,7 +203,7 @@ func (s *Swarm) Close() { // etc. to achive connection. // // For now, Dial uses only TCP. This will be extended. -func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error) { +func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error, bool) { k := peer.Key() // check if we already have an open connection first @@ -131,23 +211,32 @@ func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error) { conn, found := s.conns[k] s.connsLock.RUnlock() if found { - return conn, nil + return conn, nil, true } // open connection to peer conn, err := Dial("tcp", peer) if err != nil { - return nil, err + return nil, err, false } + return conn, nil, false +} + +func (s *Swarm) StartConn(conn *Conn) error { + if conn == nil { + return errors.New("Tried to start nil connection.") + } + + u.DOut("Starting connection: %s", conn.Peer.Key().Pretty()) // add to conns s.connsLock.Lock() - s.conns[k] = conn + s.conns[conn.Peer.Key()] = conn s.connsLock.Unlock() // kick off reader goroutine go s.fanIn(conn) - return conn, nil + return nil } // Handles the unwrapping + sending of messages to the right connection. 
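Dial now reports whether an existing connection was reused, and registering the connection plus starting its reader goroutine is split out into StartConn. The caller-side pattern looks roughly like the sketch below; dialAndStart is hypothetical, and it deliberately skips the identify handshake that handleDialedCon performs for brand-new connections, which is the same shortcut the updated TestSwarm takes.

package swarm

import (
	peer "github.com/jbenet/go-ipfs/peer"
)

// dialAndStart shows the two-step flow: Dial either hands back an already
// registered connection (reused == true) or a fresh one that still needs to
// be passed to StartConn before messages will flow.
func dialAndStart(s *Swarm, p *peer.Peer) (*Conn, error) {
	conn, err, reused := s.Dial(p)
	if err != nil {
		return nil, err
	}
	if !reused {
		if err := s.StartConn(conn); err != nil {
			return nil, err
		}
	}
	return conn, nil
}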
@@ -160,13 +249,17 @@ func (s *Swarm) fanOut() { if !ok { return } + //u.DOut("fanOut: outgoing message for: '%s'", msg.Peer.Key().Pretty()) s.connsLock.RLock() conn, found := s.conns[msg.Peer.Key()] s.connsLock.RUnlock() + if !found { - e := fmt.Errorf("Sent msg to peer without open conn: %v", msg.Peer) + e := fmt.Errorf("Sent msg to peer without open conn: %v", + msg.Peer) s.Chan.Errors <- e + continue } // queue it in the connection's buffer @@ -178,22 +271,21 @@ func (s *Swarm) fanOut() { // Handles the receiving + wrapping of messages, per conn. // Consider using reflect.Select with one goroutine instead of n. func (s *Swarm) fanIn(conn *Conn) { -Loop: for { select { case <-s.Chan.Close: // close Conn. conn.Close() - break Loop + goto out case <-conn.Closed: - break Loop + goto out case data, ok := <-conn.Incoming.MsgChan: if !ok { - e := fmt.Errorf("Error retrieving from conn: %v", conn) + e := fmt.Errorf("Error retrieving from conn: %v", conn.Peer.Key().Pretty()) s.Chan.Errors <- e - break Loop + goto out } // wrap it for consumers. @@ -201,8 +293,110 @@ Loop: s.Chan.Incoming <- msg } } +out: s.connsLock.Lock() delete(s.conns, conn.Peer.Key()) s.connsLock.Unlock() } + +func (s *Swarm) Find(key u.Key) *peer.Peer { + conn, found := s.conns[key] + if !found { + return nil + } + return conn.Peer +} + +// GetConnection will check if we are already connected to the peer in question +// and only open a new connection if we arent already +func (s *Swarm) GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error) { + p := &peer.Peer{ + ID: id, + Addresses: []*ma.Multiaddr{addr}, + } + + if id.Equal(s.local.ID) { + panic("Attempted connection to self!") + } + + conn, err, reused := s.Dial(p) + if err != nil { + return nil, err + } + + if reused { + return p, nil + } + + err = s.handleDialedCon(conn) + return conn.Peer, err +} + +func (s *Swarm) handleDialedCon(conn *Conn) error { + err := ident.Handshake(s.local, conn.Peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan) + if err != nil { + return err + } + + // Send node an address that you can be reached on + myaddr := s.local.NetAddress("tcp") + mastr, err := myaddr.String() + if err != nil { + errors.New("No local address to send to peer.") + } + + conn.Outgoing.MsgChan <- []byte(mastr) + + s.StartConn(conn) + + return nil +} + +// ConnectNew is for connecting to a peer when you dont know their ID, +// Should only be used when you are sure that you arent already connected to peer in question +func (s *Swarm) ConnectNew(addr *ma.Multiaddr) (*peer.Peer, error) { + if addr == nil { + return nil, errors.New("nil Multiaddr passed to swarm.Connect()") + } + npeer := new(peer.Peer) + npeer.AddAddress(addr) + + conn, err := Dial("tcp", npeer) + if err != nil { + return nil, err + } + + err = s.handleDialedCon(conn) + return npeer, err +} + +// Removes a given peer from the swarm and closes connections to it +func (s *Swarm) Drop(p *peer.Peer) error { + s.connsLock.RLock() + conn, found := s.conns[u.Key(p.ID)] + s.connsLock.RUnlock() + if !found { + return u.ErrNotFound + } + + s.connsLock.Lock() + delete(s.conns, u.Key(p.ID)) + s.connsLock.Unlock() + + return conn.Close() +} + +func (s *Swarm) Send(mes *Message) { + s.Chan.Outgoing <- mes +} + +func (s *Swarm) Error(e error) { + s.Chan.Errors <- e +} + +func (s *Swarm) GetChan() *Chan { + return s.Chan +} + +var _ Network = &Swarm{} diff --git a/swarm/swarm_test.go b/swarm/swarm_test.go index e289a5e77..609288c38 100644 --- a/swarm/swarm_test.go +++ b/swarm/swarm_test.go @@ -2,11 +2,12 @@ 
package swarm import ( "fmt" + "net" + "testing" + peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" msgio "github.com/jbenet/go-msgio" - "net" - "testing" ) func pingListen(listener *net.TCPListener, peer *peer.Peer) { @@ -43,8 +44,8 @@ func pong(c net.Conn, peer *peer.Peer) { func TestSwarm(t *testing.T) { swarm := NewSwarm(nil) - peers := []*peer.Peer{} - listeners := []*net.Listener{} + var peers []*peer.Peer + var listeners []net.Listener peerNames := map[string]string{ "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a30": "/ip4/127.0.0.1/tcp/1234", "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31": "/ip4/127.0.0.1/tcp/2345", @@ -71,14 +72,15 @@ func TestSwarm(t *testing.T) { } go pingListen(listener.(*net.TCPListener), peer) - _, err = swarm.Dial(peer) + conn, err, _ := swarm.Dial(peer) if err != nil { t.Fatal("error swarm dialing to peer", err) } + swarm.StartConn(conn) // ok done, add it. peers = append(peers, peer) - listeners = append(listeners, &listener) + listeners = append(listeners, listener) } MsgNum := 1000 @@ -112,6 +114,6 @@ func TestSwarm(t *testing.T) { fmt.Println("closing") swarm.Close() for _, listener := range listeners { - (*listener).(*net.TCPListener).Close() + listener.(*net.TCPListener).Close() } } diff --git a/util/util.go b/util/util.go index 69831ff8d..ac9ca8100 100644 --- a/util/util.go +++ b/util/util.go @@ -1,25 +1,39 @@ package util import ( + "errors" "fmt" - mh "github.com/jbenet/go-multihash" "os" "os/user" "strings" + + b58 "github.com/jbenet/go-base58" + mh "github.com/jbenet/go-multihash" ) // Debug is a global flag for debugging. var Debug bool // ErrNotImplemented signifies a function has not been implemented yet. -var ErrNotImplemented = fmt.Errorf("Error: not implemented yet.") +var ErrNotImplemented = errors.New("Error: not implemented yet.") // ErrTimeout implies that a timeout has been triggered -var ErrTimeout = fmt.Errorf("Error: Call timed out.") +var ErrTimeout = errors.New("Error: Call timed out.") + +// ErrSeErrSearchIncomplete implies that a search type operation didnt +// find the expected node, but did find 'a' node. +var ErrSearchIncomplete = errors.New("Error: Search Incomplete.") + +// ErrNotFound is returned when a search fails to find anything +var ErrNotFound = errors.New("Error: Not Found.") // Key is a string representation of multihash for use with maps. type Key string +func (k Key) Pretty() string { + return b58.Encode([]byte(k)) +} + // Hash is the global IPFS hash function. uses multihash SHA2_256, 256 bits func Hash(data []byte) (mh.Multihash, error) { return mh.Sum(data, mh.SHA2_256, -1) @@ -41,12 +55,12 @@ func TildeExpansion(filename string) (string, error) { // PErr is a shorthand printing function to output to Stderr. func PErr(format string, a ...interface{}) { - fmt.Fprintf(os.Stderr, format, a...) + fmt.Fprintf(os.Stderr, format+"\n", a...) } // POut is a shorthand printing function to output to Stdout. func POut(format string, a ...interface{}) { - fmt.Fprintf(os.Stdout, format, a...) + fmt.Fprintf(os.Stdout, format+"\n", a...) } // DErr is a shorthand debug printing function to output to Stderr.
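Tying the swarm changes together: a consumer of Network drives everything through the Chan returned by GetChan, reading wrapped messages from Incoming and transport failures from Errors (both fed by the fanIn/fanOut goroutines above). A rough consumer sketch; readLoop is illustrative only and not part of this change.

package swarm

import (
	u "github.com/jbenet/go-ipfs/util"
)

// readLoop drains a Network's incoming messages and surfaces its errors until
// the Incoming channel is closed or the caller signals done.
func readLoop(net Network, done <-chan struct{}) {
	ch := net.GetChan()
	for {
		select {
		case msg, ok := <-ch.Incoming:
			if !ok {
				return
			}
			u.DOut("got %d bytes from %s", len(msg.Data), msg.Peer.Key().Pretty())
		case err := <-ch.Errors:
			u.PErr("network error: %s", err)
		case <-done:
			return
		}
	}
}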