1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-26 07:28:20 +08:00

Merge pull request #25 from jbenet/dht

DHT PR
This commit is contained in:
Juan Batiz-Benet
2014-08-16 14:11:05 -07:00
22 changed files with 2451 additions and 222 deletions

20
identify/identify.go Normal file
View File

@ -0,0 +1,20 @@
// The identify package handles how peers identify with eachother upon
// connection to the network
package identify
import (
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
// Perform initial communication with this peer to share node ID's and
// initiate communication
func Handshake(self, remote *peer.Peer, in, out chan []byte) error {
// TODO: make this more... secure.
out <- self.ID
resp := <-in
remote.ID = peer.ID(resp)
u.DOut("[%s] identify: Got node id: %s", self.ID.Pretty(), remote.ID.Pretty())
return nil
}

3
identify/message.proto Normal file
View File

@ -0,0 +1,3 @@
message Identify {
required bytes id = 1;
}

View File

@ -1,6 +1,10 @@
package peer
import (
"sync"
"time"
b58 "github.com/jbenet/go-base58"
u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-multiaddr"
mh "github.com/jbenet/go-multihash"
@ -12,8 +16,12 @@ import (
type ID mh.Multihash
// Utililty function for comparing two peer ID's
func (id *ID) Equal(other *ID) bool {
return bytes.Equal(*id, *other)
func (id ID) Equal(other ID) bool {
return bytes.Equal(id, other)
}
func (id ID) Pretty() string {
return b58.Encode(id)
}
// Map maps Key (string) : *Peer (slices are not comparable).
@ -24,6 +32,9 @@ type Map map[u.Key]*Peer
type Peer struct {
ID ID
Addresses []*ma.Multiaddr
latency time.Duration
latenLock sync.RWMutex
}
// Key returns the ID as a Key (string) for maps.
@ -52,3 +63,18 @@ func (p *Peer) NetAddress(n string) *ma.Multiaddr {
}
return nil
}
func (p *Peer) GetLatency() (out time.Duration) {
p.latenLock.RLock()
out = p.latency
p.latenLock.RUnlock()
return
}
// TODO: Instead of just keeping a single number,
// keep a running average over the last hour or so
func (p *Peer) SetLatency(laten time.Duration) {
p.latenLock.Lock()
p.latency = laten
p.latenLock.Unlock()
}

49
routing/dht/DHTMessage.go Normal file
View File

@ -0,0 +1,49 @@
package dht
import (
peer "github.com/jbenet/go-ipfs/peer"
)
// A helper struct to make working with protbuf types easier
type DHTMessage struct {
Type PBDHTMessage_MessageType
Key string
Value []byte
Response bool
Id uint64
Success bool
Peers []*peer.Peer
}
func peerInfo(p *peer.Peer) *PBDHTMessage_PBPeer {
pbp := new(PBDHTMessage_PBPeer)
addr, err := p.Addresses[0].String()
if err != nil {
//Temp: what situations could cause this?
panic(err)
}
pbp.Addr = &addr
pid := string(p.ID)
pbp.Id = &pid
return pbp
}
// TODO: building the protobuf message this way is a little wasteful
// Unused fields wont be omitted, find a better way to do this
func (m *DHTMessage) ToProtobuf() *PBDHTMessage {
pmes := new(PBDHTMessage)
if m.Value != nil {
pmes.Value = m.Value
}
pmes.Type = &m.Type
pmes.Key = &m.Key
pmes.Response = &m.Response
pmes.Id = &m.Id
pmes.Success = &m.Success
for _, p := range m.Peers {
pmes.Peers = append(pmes.Peers, peerInfo(p))
}
return pmes
}

View File

@ -1,11 +1,17 @@
package dht
import (
"bytes"
"errors"
"sync"
"time"
peer "github.com/jbenet/go-ipfs/peer"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"
peer "github.com/jbenet/go-ipfs/peer"
kb "github.com/jbenet/go-ipfs/routing/kbucket"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-multiaddr"
ds "github.com/jbenet/datastore.go"
@ -17,9 +23,11 @@ import (
// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
routes RoutingTable
// Array of routing tables for differently distanced nodes
// NOTE: (currently, only a single table is used)
routes []*kb.RoutingTable
network *swarm.Swarm
network swarm.Network
// Local peer (yourself)
self *peer.Peer
@ -27,96 +35,221 @@ type IpfsDHT struct {
// Local data
datastore ds.Datastore
// map of channels waiting for reply messages
listeners map[uint64]chan *swarm.Message
listenLock sync.RWMutex
// Map keys to peers that can provide their value
providers map[u.Key][]*providerInfo
providerLock sync.RWMutex
// Signal to shutdown dht
shutdown chan struct{}
// When this peer started up
birth time.Time
//lock to make diagnostics work better
diaglock sync.Mutex
// listener is a server to register to listen for responses to messages
listener *MesListener
}
func NewDHT(p *peer.Peer) *IpfsDHT {
// NewDHT creates a new DHT object with the given peer as the 'local' host
func NewDHT(p *peer.Peer, net swarm.Network) *IpfsDHT {
dht := new(IpfsDHT)
dht.network = net
dht.datastore = ds.NewMapDatastore()
dht.self = p
dht.network = swarm.NewSwarm(p)
dht.listeners = make(map[uint64]chan *swarm.Message)
dht.providers = make(map[u.Key][]*providerInfo)
dht.shutdown = make(chan struct{})
dht.routes = make([]*kb.RoutingTable, 3)
dht.routes[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*30)
dht.routes[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*100)
dht.routes[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour)
dht.listener = NewMesListener()
dht.birth = time.Now()
return dht
}
// Start up background goroutines needed by the DHT
func (dht *IpfsDHT) Start() {
go dht.handleMessages()
}
// Connect to a new peer at the given address, ping and add to the routing table
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
maddrstr, _ := addr.String()
u.DOut("Connect to new peer: %s", maddrstr)
npeer, err := dht.network.ConnectNew(addr)
if err != nil {
return nil, err
}
// Ping new peer to register in their routing table
// NOTE: this should be done better...
err = dht.Ping(npeer, time.Second*2)
if err != nil {
return nil, errors.New("failed to ping newly connected peer")
}
dht.Update(npeer)
return npeer, nil
}
// Read in all messages from swarm and handle them appropriately
// NOTE: this function is just a quick sketch
func (dht *IpfsDHT) handleMessages() {
u.DOut("Begin message handling routine")
checkTimeouts := time.NewTicker(time.Minute * 5)
ch := dht.network.GetChan()
for {
select {
case mes := <-dht.network.Chan.Incoming:
pmes := new(DHTMessage)
case mes, ok := <-ch.Incoming:
if !ok {
u.DOut("handleMessages closing, bad recv on incoming")
return
}
pmes := new(PBDHTMessage)
err := proto.Unmarshal(mes.Data, pmes)
if err != nil {
u.PErr("Failed to decode protobuf message: %s", err)
continue
}
dht.Update(mes.Peer)
// Note: not sure if this is the correct place for this
if pmes.GetResponse() {
dht.listenLock.RLock()
ch, ok := dht.listeners[pmes.GetId()]
dht.listenLock.RUnlock()
if ok {
ch <- mes
}
// this is expected behaviour during a timeout
u.DOut("Received response with nobody listening...")
dht.listener.Respond(pmes.GetId(), mes)
continue
}
//
u.DOut("[peer: %s]\nGot message type: '%s' [id = %x, from = %s]",
dht.self.ID.Pretty(),
PBDHTMessage_MessageType_name[int32(pmes.GetType())],
pmes.GetId(), mes.Peer.ID.Pretty())
switch pmes.GetType() {
case DHTMessage_GET_VALUE:
case PBDHTMessage_GET_VALUE:
dht.handleGetValue(mes.Peer, pmes)
case DHTMessage_PUT_VALUE:
case PBDHTMessage_PUT_VALUE:
dht.handlePutValue(mes.Peer, pmes)
case DHTMessage_FIND_NODE:
dht.handleFindNode(mes.Peer, pmes)
case DHTMessage_ADD_PROVIDER:
case DHTMessage_GET_PROVIDERS:
case DHTMessage_PING:
dht.handleFindNode(mes.Peer, pmes)
case PBDHTMessage_FIND_NODE:
dht.handleFindPeer(mes.Peer, pmes)
case PBDHTMessage_ADD_PROVIDER:
dht.handleAddProvider(mes.Peer, pmes)
case PBDHTMessage_GET_PROVIDERS:
dht.handleGetProviders(mes.Peer, pmes)
case PBDHTMessage_PING:
dht.handlePing(mes.Peer, pmes)
case PBDHTMessage_DIAGNOSTIC:
dht.handleDiagnostic(mes.Peer, pmes)
}
case err := <-ch.Errors:
u.PErr("dht err: %s", err)
case <-dht.shutdown:
checkTimeouts.Stop()
return
case <-checkTimeouts.C:
// Time to collect some garbage!
dht.cleanExpiredProviders()
}
}
}
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
dskey := ds.NewKey(pmes.GetKey())
i_val, err := dht.datastore.Get(dskey)
if err == nil {
isResponse := true
resp := new(DHTMessage)
resp.Response = &isResponse
resp.Id = pmes.Id
resp.Key = pmes.Key
val := i_val.([]byte)
resp.Value = val
mes := new(swarm.Message)
mes.Peer = p
mes.Data = []byte(resp.String())
} else if err == ds.ErrNotFound {
// Find closest node(s) to desired key and reply with that info
// TODO: this will need some other metadata in the protobuf message
// to signal to the querying node that the data its receiving
// is actually a list of other nodes
func (dht *IpfsDHT) cleanExpiredProviders() {
dht.providerLock.Lock()
for k, parr := range dht.providers {
var cleaned []*providerInfo
for _, v := range parr {
if time.Since(v.Creation) < time.Hour {
cleaned = append(cleaned, v)
}
}
dht.providers[k] = cleaned
}
dht.providerLock.Unlock()
}
// Store a value in this nodes local storage
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) error {
pmes := DHTMessage{
Type: PBDHTMessage_PUT_VALUE,
Key: key,
Value: value,
Id: GenerateMessageID(),
}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
dht.network.Send(mes)
return nil
}
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
u.DOut("handleGetValue for key: %s", pmes.GetKey())
dskey := ds.NewKey(pmes.GetKey())
resp := &DHTMessage{
Response: true,
Id: pmes.GetId(),
Key: pmes.GetKey(),
}
iVal, err := dht.datastore.Get(dskey)
if err == nil {
u.DOut("handleGetValue success!")
resp.Success = true
resp.Value = iVal.([]byte)
} else if err == ds.ErrNotFound {
// Check if we know any providers for the requested value
provs, ok := dht.providers[u.Key(pmes.GetKey())]
if ok && len(provs) > 0 {
u.DOut("handleGetValue returning %d provider[s]", len(provs))
for _, prov := range provs {
resp.Peers = append(resp.Peers, prov.Value)
}
resp.Success = true
} else {
// No providers?
// Find closest peer on given cluster to desired key and reply with that info
level := 0
if len(pmes.GetValue()) < 1 {
// TODO: maybe return an error? Defaulting isnt a good idea IMO
u.PErr("handleGetValue: no routing level specified, assuming 0")
} else {
level = int(pmes.GetValue()[0]) // Using value field to specify cluster level
}
u.DOut("handleGetValue searching level %d clusters", level)
closer := dht.routes[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
if closer.ID.Equal(dht.self.ID) {
u.DOut("Attempted to return self! this shouldnt happen...")
resp.Peers = nil
goto out
}
// If this peer is closer than the one from the table, return nil
if kb.Closer(dht.self.ID, closer.ID, u.Key(pmes.GetKey())) {
resp.Peers = nil
u.DOut("handleGetValue could not find a closer node than myself.")
} else {
u.DOut("handleGetValue returning a closer peer: '%s'", closer.ID.Pretty())
resp.Peers = []*peer.Peer{closer}
}
}
} else {
//temp: what other errors can a datastore return?
panic(err)
}
out:
mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Send(mes)
}
// Store a value in this peer local storage
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *PBDHTMessage) {
dskey := ds.NewKey(pmes.GetKey())
err := dht.datastore.Put(dskey, pmes.GetValue())
if err != nil {
@ -125,46 +258,399 @@ func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
}
}
func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) {
panic("Not implemented.")
}
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {
isResponse := true
resp := new(DHTMessage)
resp.Id = pmes.Id
resp.Response = &isResponse
mes := new(swarm.Message)
mes.Peer = p
mes.Data = []byte(resp.String())
dht.network.Chan.Outgoing <- mes
}
// Register a handler for a specific message ID, used for getting replies
// to certain messages (i.e. response to a GET_VALUE message)
func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message {
lchan := make(chan *swarm.Message)
dht.listenLock.Lock()
dht.listeners[mesid] = lchan
dht.listenLock.Unlock()
return lchan
}
func (dht *IpfsDHT) Unlisten(mesid uint64) {
dht.listenLock.Lock()
ch, ok := dht.listeners[mesid]
if ok {
delete(dht.listeners, mesid)
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *PBDHTMessage) {
resp := DHTMessage{
Type: pmes.GetType(),
Response: true,
Id: pmes.GetId(),
}
dht.listenLock.Unlock()
close(ch)
dht.network.Send(swarm.NewMessage(p, resp.ToProtobuf()))
}
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) {
resp := DHTMessage{
Type: pmes.GetType(),
Id: pmes.GetId(),
Response: true,
}
defer func() {
mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Send(mes)
}()
level := pmes.GetValue()[0]
u.DOut("handleFindPeer: searching for '%s'", peer.ID(pmes.GetKey()).Pretty())
closest := dht.routes[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
if closest == nil {
u.PErr("handleFindPeer: could not find anything.")
return
}
// Stop all communications from this node and shut down
if len(closest.Addresses) == 0 {
u.PErr("handleFindPeer: no addresses for connected peer...")
return
}
// If the found peer further away than this peer...
if kb.Closer(dht.self.ID, closest.ID, u.Key(pmes.GetKey())) {
return
}
u.DOut("handleFindPeer: sending back '%s'", closest.ID.Pretty())
resp.Peers = []*peer.Peer{closest}
resp.Success = true
}
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) {
resp := DHTMessage{
Type: PBDHTMessage_GET_PROVIDERS,
Key: pmes.GetKey(),
Id: pmes.GetId(),
Response: true,
}
dht.providerLock.RLock()
providers := dht.providers[u.Key(pmes.GetKey())]
dht.providerLock.RUnlock()
if providers == nil || len(providers) == 0 {
level := 0
if len(pmes.GetValue()) > 0 {
level = int(pmes.GetValue()[0])
}
closer := dht.routes[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
if kb.Closer(dht.self.ID, closer.ID, u.Key(pmes.GetKey())) {
resp.Peers = nil
} else {
resp.Peers = []*peer.Peer{closer}
}
} else {
for _, prov := range providers {
resp.Peers = append(resp.Peers, prov.Value)
}
resp.Success = true
}
mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Send(mes)
}
type providerInfo struct {
Creation time.Time
Value *peer.Peer
}
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *PBDHTMessage) {
//TODO: need to implement TTLs on providers
key := u.Key(pmes.GetKey())
dht.addProviderEntry(key, p)
}
// Stop all communications from this peer and shut down
func (dht *IpfsDHT) Halt() {
dht.shutdown <- struct{}{}
dht.network.Close()
}
func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) {
u.DOut("Adding %s as provider for '%s'", p.Key().Pretty(), key)
dht.providerLock.Lock()
provs := dht.providers[key]
dht.providers[key] = append(provs, &providerInfo{time.Now(), p})
dht.providerLock.Unlock()
}
// NOTE: not yet finished, low priority
func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) {
seq := dht.routes[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
listenChan := dht.listener.Listen(pmes.GetId(), len(seq), time.Second*30)
for _, ps := range seq {
mes := swarm.NewMessage(ps, pmes)
dht.network.Send(mes)
}
buf := new(bytes.Buffer)
di := dht.getDiagInfo()
buf.Write(di.Marshal())
// NOTE: this shouldnt be a hardcoded value
after := time.After(time.Second * 20)
count := len(seq)
for count > 0 {
select {
case <-after:
//Timeout, return what we have
goto out
case req_resp := <-listenChan:
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(req_resp.Data, pmes_out)
if err != nil {
// It broke? eh, whatever, keep going
continue
}
buf.Write(req_resp.Data)
count--
}
}
out:
resp := DHTMessage{
Type: PBDHTMessage_DIAGNOSTIC,
Id: pmes.GetId(),
Value: buf.Bytes(),
Response: true,
}
mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Send(mes)
}
func (dht *IpfsDHT) getValueOrPeers(p *peer.Peer, key u.Key, timeout time.Duration, level int) ([]byte, []*peer.Peer, error) {
pmes, err := dht.getValueSingle(p, key, timeout, level)
if err != nil {
return nil, nil, err
}
if pmes.GetSuccess() {
if pmes.Value == nil { // We were given provider[s]
val, err := dht.getFromPeerList(key, timeout, pmes.GetPeers(), level)
if err != nil {
return nil, nil, err
}
return val, nil, nil
}
// Success! We were given the value
return pmes.GetValue(), nil, nil
} else {
// We were given a closer node
var peers []*peer.Peer
for _, pb := range pmes.GetPeers() {
if peer.ID(pb.GetId()).Equal(dht.self.ID) {
continue
}
addr, err := ma.NewMultiaddr(pb.GetAddr())
if err != nil {
u.PErr(err.Error())
continue
}
np, err := dht.network.GetConnection(peer.ID(pb.GetId()), addr)
if err != nil {
u.PErr(err.Error())
continue
}
peers = append(peers, np)
}
return nil, peers, nil
}
}
// getValueSingle simply performs the get value RPC with the given parameters
func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duration, level int) (*PBDHTMessage, error) {
pmes := DHTMessage{
Type: PBDHTMessage_GET_VALUE,
Key: string(key),
Value: []byte{byte(level)},
Id: GenerateMessageID(),
}
response_chan := dht.listener.Listen(pmes.Id, 1, time.Minute)
mes := swarm.NewMessage(p, pmes.ToProtobuf())
t := time.Now()
dht.network.Send(mes)
// Wait for either the response or a timeout
timeup := time.After(timeout)
select {
case <-timeup:
dht.listener.Unlisten(pmes.Id)
return nil, u.ErrTimeout
case resp, ok := <-response_chan:
if !ok {
u.PErr("response channel closed before timeout, please investigate.")
return nil, u.ErrTimeout
}
roundtrip := time.Since(t)
resp.Peer.SetLatency(roundtrip)
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
return nil, err
}
return pmes_out, nil
}
}
// TODO: Im not certain on this implementation, we get a list of peers/providers
// from someone what do we do with it? Connect to each of them? randomly pick
// one to get the value from? Or just connect to one at a time until we get a
// successful connection and request the value from it?
func (dht *IpfsDHT) getFromPeerList(key u.Key, timeout time.Duration,
peerlist []*PBDHTMessage_PBPeer, level int) ([]byte, error) {
for _, pinfo := range peerlist {
p, _ := dht.Find(peer.ID(pinfo.GetId()))
if p == nil {
maddr, err := ma.NewMultiaddr(pinfo.GetAddr())
if err != nil {
u.PErr("getValue error: %s", err)
continue
}
p, err = dht.network.GetConnection(peer.ID(pinfo.GetId()), maddr)
if err != nil {
u.PErr("getValue error: %s", err)
continue
}
}
pmes, err := dht.getValueSingle(p, key, timeout, level)
if err != nil {
u.DErr("getFromPeers error: %s", err)
continue
}
dht.addProviderEntry(key, p)
// Make sure it was a successful get
if pmes.GetSuccess() && pmes.Value != nil {
return pmes.GetValue(), nil
}
}
return nil, u.ErrNotFound
}
func (dht *IpfsDHT) GetLocal(key u.Key) ([]byte, error) {
v, err := dht.datastore.Get(ds.NewKey(string(key)))
if err != nil {
return nil, err
}
return v.([]byte), nil
}
func (dht *IpfsDHT) PutLocal(key u.Key, value []byte) error {
return dht.datastore.Put(ds.NewKey(string(key)), value)
}
func (dht *IpfsDHT) Update(p *peer.Peer) {
for _, route := range dht.routes {
removed := route.Update(p)
// Only drop the connection if no tables refer to this peer
if removed != nil {
found := false
for _, r := range dht.routes {
if r.Find(removed.ID) != nil {
found = true
break
}
}
if !found {
dht.network.Drop(removed)
}
}
}
}
// Look for a peer with a given ID connected to this dht
func (dht *IpfsDHT) Find(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
for _, table := range dht.routes {
p := table.Find(id)
if p != nil {
return p, table
}
}
return nil, nil
}
func (dht *IpfsDHT) findPeerSingle(p *peer.Peer, id peer.ID, timeout time.Duration, level int) (*PBDHTMessage, error) {
pmes := DHTMessage{
Type: PBDHTMessage_FIND_NODE,
Key: string(id),
Id: GenerateMessageID(),
Value: []byte{byte(level)},
}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listenChan := dht.listener.Listen(pmes.Id, 1, time.Minute)
t := time.Now()
dht.network.Send(mes)
after := time.After(timeout)
select {
case <-after:
dht.listener.Unlisten(pmes.Id)
return nil, u.ErrTimeout
case resp := <-listenChan:
roundtrip := time.Since(t)
resp.Peer.SetLatency(roundtrip)
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
return nil, err
}
return pmes_out, nil
}
}
func (dht *IpfsDHT) PrintTables() {
for _, route := range dht.routes {
route.Print()
}
}
func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, timeout time.Duration) (*PBDHTMessage, error) {
pmes := DHTMessage{
Type: PBDHTMessage_GET_PROVIDERS,
Key: string(key),
Id: GenerateMessageID(),
Value: []byte{byte(level)},
}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listenChan := dht.listener.Listen(pmes.Id, 1, time.Minute)
dht.network.Send(mes)
after := time.After(timeout)
select {
case <-after:
dht.listener.Unlisten(pmes.Id)
return nil, u.ErrTimeout
case resp := <-listenChan:
u.DOut("FindProviders: got response.")
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
return nil, err
}
return pmes_out, nil
}
}
func (dht *IpfsDHT) addPeerList(key u.Key, peers []*PBDHTMessage_PBPeer) []*peer.Peer {
var prov_arr []*peer.Peer
for _, prov := range peers {
// Dont add outselves to the list
if peer.ID(prov.GetId()).Equal(dht.self.ID) {
continue
}
// Dont add someone who is already on the list
p := dht.network.Find(u.Key(prov.GetId()))
if p == nil {
u.DOut("given provider %s was not in our network already.", peer.ID(prov.GetId()).Pretty())
maddr, err := ma.NewMultiaddr(prov.GetAddr())
if err != nil {
u.PErr("error connecting to new peer: %s", err)
continue
}
p, err = dht.network.GetConnection(peer.ID(prov.GetId()), maddr)
if err != nil {
u.PErr("error connecting to new peer: %s", err)
continue
}
}
dht.addProviderEntry(key, p)
prov_arr = append(prov_arr, p)
}
return prov_arr
}

38
routing/dht/dht_logger.go Normal file
View File

@ -0,0 +1,38 @@
package dht
import (
"encoding/json"
"time"
u "github.com/jbenet/go-ipfs/util"
)
type logDhtRpc struct {
Type string
Start time.Time
End time.Time
Duration time.Duration
RpcCount int
Success bool
}
func startNewRpc(name string) *logDhtRpc {
r := new(logDhtRpc)
r.Type = name
r.Start = time.Now()
return r
}
func (l *logDhtRpc) EndLog() {
l.End = time.Now()
l.Duration = l.End.Sub(l.Start)
}
func (l *logDhtRpc) Print() {
b, err := json.Marshal(l)
if err != nil {
u.DOut(err.Error())
} else {
u.DOut(string(b))
}
}

290
routing/dht/dht_test.go Normal file
View File

@ -0,0 +1,290 @@
package dht
import (
"testing"
peer "github.com/jbenet/go-ipfs/peer"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-multiaddr"
"fmt"
"time"
)
func setupDHTS(n int, t *testing.T) ([]*ma.Multiaddr, []*peer.Peer, []*IpfsDHT) {
var addrs []*ma.Multiaddr
for i := 0; i < 4; i++ {
a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 5000+i))
if err != nil {
t.Fatal(err)
}
addrs = append(addrs, a)
}
var peers []*peer.Peer
for i := 0; i < 4; i++ {
p := new(peer.Peer)
p.AddAddress(addrs[i])
p.ID = peer.ID([]byte(fmt.Sprintf("peer_%d", i)))
peers = append(peers, p)
}
var dhts []*IpfsDHT
for i := 0; i < 4; i++ {
net := swarm.NewSwarm(peers[i])
err := net.Listen()
if err != nil {
t.Fatal(err)
}
d := NewDHT(peers[i], net)
dhts = append(dhts, d)
d.Start()
}
return addrs, peers, dhts
}
func TestPing(t *testing.T) {
u.Debug = false
addr_a, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/2222")
if err != nil {
t.Fatal(err)
}
addr_b, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5678")
if err != nil {
t.Fatal(err)
}
peer_a := new(peer.Peer)
peer_a.AddAddress(addr_a)
peer_a.ID = peer.ID([]byte("peer_a"))
peer_b := new(peer.Peer)
peer_b.AddAddress(addr_b)
peer_b.ID = peer.ID([]byte("peer_b"))
neta := swarm.NewSwarm(peer_a)
err = neta.Listen()
if err != nil {
t.Fatal(err)
}
dht_a := NewDHT(peer_a, neta)
netb := swarm.NewSwarm(peer_b)
err = netb.Listen()
if err != nil {
t.Fatal(err)
}
dht_b := NewDHT(peer_b, netb)
dht_a.Start()
dht_b.Start()
_, err = dht_a.Connect(addr_b)
if err != nil {
t.Fatal(err)
}
//Test that we can ping the node
err = dht_a.Ping(peer_b, time.Second*2)
if err != nil {
t.Fatal(err)
}
dht_a.Halt()
dht_b.Halt()
}
func TestValueGetSet(t *testing.T) {
u.Debug = false
addr_a, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1235")
if err != nil {
t.Fatal(err)
}
addr_b, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5679")
if err != nil {
t.Fatal(err)
}
peer_a := new(peer.Peer)
peer_a.AddAddress(addr_a)
peer_a.ID = peer.ID([]byte("peer_a"))
peer_b := new(peer.Peer)
peer_b.AddAddress(addr_b)
peer_b.ID = peer.ID([]byte("peer_b"))
neta := swarm.NewSwarm(peer_a)
err = neta.Listen()
if err != nil {
t.Fatal(err)
}
dht_a := NewDHT(peer_a, neta)
netb := swarm.NewSwarm(peer_b)
err = netb.Listen()
if err != nil {
t.Fatal(err)
}
dht_b := NewDHT(peer_b, netb)
dht_a.Start()
dht_b.Start()
errsa := dht_a.network.GetChan().Errors
errsb := dht_b.network.GetChan().Errors
go func() {
select {
case err := <-errsa:
t.Fatal(err)
case err := <-errsb:
t.Fatal(err)
}
}()
_, err = dht_a.Connect(addr_b)
if err != nil {
t.Fatal(err)
}
dht_a.PutValue("hello", []byte("world"))
val, err := dht_a.GetValue("hello", time.Second*2)
if err != nil {
t.Fatal(err)
}
if string(val) != "world" {
t.Fatalf("Expected 'world' got '%s'", string(val))
}
}
func TestProvides(t *testing.T) {
u.Debug = false
addrs, _, dhts := setupDHTS(4, t)
_, err := dhts[0].Connect(addrs[1])
if err != nil {
t.Fatal(err)
}
_, err = dhts[1].Connect(addrs[2])
if err != nil {
t.Fatal(err)
}
_, err = dhts[1].Connect(addrs[3])
if err != nil {
t.Fatal(err)
}
err = dhts[3].PutLocal(u.Key("hello"), []byte("world"))
if err != nil {
t.Fatal(err)
}
err = dhts[3].Provide(u.Key("hello"))
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 60)
provs, err := dhts[0].FindProviders(u.Key("hello"), time.Second)
if err != nil {
t.Fatal(err)
}
if len(provs) != 1 {
t.Fatal("Didnt get back providers")
}
for i := 0; i < 4; i++ {
dhts[i].Halt()
}
}
func TestLayeredGet(t *testing.T) {
u.Debug = false
addrs, _, dhts := setupDHTS(4, t)
_, err := dhts[0].Connect(addrs[1])
if err != nil {
t.Fatalf("Failed to connect: %s", err)
}
_, err = dhts[1].Connect(addrs[2])
if err != nil {
t.Fatal(err)
}
_, err = dhts[1].Connect(addrs[3])
if err != nil {
t.Fatal(err)
}
err = dhts[3].PutLocal(u.Key("hello"), []byte("world"))
if err != nil {
t.Fatal(err)
}
err = dhts[3].Provide(u.Key("hello"))
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 60)
val, err := dhts[0].GetValue(u.Key("hello"), time.Second)
if err != nil {
t.Fatal(err)
}
if string(val) != "world" {
t.Fatal("Got incorrect value.")
}
for i := 0; i < 4; i++ {
dhts[i].Halt()
}
}
func TestFindPeer(t *testing.T) {
u.Debug = false
addrs, peers, dhts := setupDHTS(4, t)
_, err := dhts[0].Connect(addrs[1])
if err != nil {
t.Fatal(err)
}
_, err = dhts[1].Connect(addrs[2])
if err != nil {
t.Fatal(err)
}
_, err = dhts[1].Connect(addrs[3])
if err != nil {
t.Fatal(err)
}
p, err := dhts[0].FindPeer(peers[2].ID, time.Second)
if err != nil {
t.Fatal(err)
}
if p == nil {
t.Fatal("Failed to find peer.")
}
if !p.ID.Equal(peers[2].ID) {
t.Fatal("Didnt find expected peer.")
}
for i := 0; i < 4; i++ {
dhts[i].Halt()
}
}

43
routing/dht/diag.go Normal file
View File

@ -0,0 +1,43 @@
package dht
import (
"encoding/json"
"time"
peer "github.com/jbenet/go-ipfs/peer"
)
type connDiagInfo struct {
Latency time.Duration
Id peer.ID
}
type diagInfo struct {
Id peer.ID
Connections []connDiagInfo
Keys []string
LifeSpan time.Duration
CodeVersion string
}
func (di *diagInfo) Marshal() []byte {
b, err := json.Marshal(di)
if err != nil {
panic(err)
}
//TODO: also consider compressing this. There will be a lot of these
return b
}
func (dht *IpfsDHT) getDiagInfo() *diagInfo {
di := new(diagInfo)
di.CodeVersion = "github.com/jbenet/go-ipfs"
di.Id = dht.self.ID
di.LifeSpan = time.Since(dht.birth)
di.Keys = nil // Currently no way to query datastore
for _, p := range dht.routes[0].Listpeers() {
di.Connections = append(di.Connections, connDiagInfo{p.GetLatency(), p.ID})
}
return di
}

152
routing/dht/ext_test.go Normal file
View File

@ -0,0 +1,152 @@
package dht
import (
"testing"
"code.google.com/p/goprotobuf/proto"
peer "github.com/jbenet/go-ipfs/peer"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-multiaddr"
"time"
)
// fauxNet is a standin for a swarm.Network in order to more easily recreate
// different testing scenarios
type fauxNet struct {
Chan *swarm.Chan
handlers []mesHandleFunc
swarm.Network
}
// mesHandleFunc is a function that takes in outgoing messages
// and can respond to them, simulating other peers on the network.
// returning nil will chose not to respond and pass the message onto the
// next registered handler
type mesHandleFunc func(*swarm.Message) *swarm.Message
func newFauxNet() *fauxNet {
fn := new(fauxNet)
fn.Chan = swarm.NewChan(8)
return fn
}
// Instead of 'Listening' Start up a goroutine that will check
// all outgoing messages against registered message handlers,
// and reply if needed
func (f *fauxNet) Listen() error {
go func() {
for {
select {
case in := <-f.Chan.Outgoing:
for _, h := range f.handlers {
reply := h(in)
if reply != nil {
f.Chan.Incoming <- reply
break
}
}
}
}
}()
return nil
}
func (f *fauxNet) AddHandler(fn func(*swarm.Message) *swarm.Message) {
f.handlers = append(f.handlers, fn)
}
func (f *fauxNet) Send(mes *swarm.Message) {
f.Chan.Outgoing <- mes
}
func (f *fauxNet) GetChan() *swarm.Chan {
return f.Chan
}
func (f *fauxNet) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
return nil, nil
}
func TestGetFailures(t *testing.T) {
fn := newFauxNet()
fn.Listen()
local := new(peer.Peer)
local.ID = peer.ID("test_peer")
d := NewDHT(local, fn)
other := &peer.Peer{ID: peer.ID("other_peer")}
d.Start()
d.Update(other)
// This one should time out
_, err := d.GetValue(u.Key("test"), time.Millisecond*10)
if err != nil {
if err != u.ErrTimeout {
t.Fatal("Got different error than we expected.")
}
} else {
t.Fatal("Did not get expected error!")
}
// Reply with failures to every message
fn.AddHandler(func(mes *swarm.Message) *swarm.Message {
pmes := new(PBDHTMessage)
err := proto.Unmarshal(mes.Data, pmes)
if err != nil {
t.Fatal(err)
}
resp := DHTMessage{
Type: pmes.GetType(),
Id: pmes.GetId(),
Response: true,
Success: false,
}
return swarm.NewMessage(mes.Peer, resp.ToProtobuf())
})
// This one should fail with NotFound
_, err = d.GetValue(u.Key("test"), time.Millisecond*1000)
if err != nil {
if err != u.ErrNotFound {
t.Fatalf("Expected ErrNotFound, got: %s", err)
}
} else {
t.Fatal("expected error, got none.")
}
success := make(chan struct{})
fn.handlers = nil
fn.AddHandler(func(mes *swarm.Message) *swarm.Message {
resp := new(PBDHTMessage)
err := proto.Unmarshal(mes.Data, resp)
if err != nil {
t.Fatal(err)
}
if resp.GetSuccess() {
t.Fatal("Get returned success when it shouldnt have.")
}
success <- struct{}{}
return nil
})
// Now we test this DHT's handleGetValue failure
req := DHTMessage{
Type: PBDHTMessage_GET_VALUE,
Key: "hello",
Id: GenerateMessageID(),
Value: []byte{0},
}
fn.Chan.Incoming <- swarm.NewMessage(other, req.ToProtobuf())
<-success
}

View File

@ -9,7 +9,7 @@ It is generated from these files:
messages.proto
It has these top-level messages:
DHTMessage
PBDHTMessage
*/
package dht
@ -20,101 +20,142 @@ import math "math"
var _ = proto.Marshal
var _ = math.Inf
type DHTMessage_MessageType int32
type PBDHTMessage_MessageType int32
const (
DHTMessage_PUT_VALUE DHTMessage_MessageType = 0
DHTMessage_GET_VALUE DHTMessage_MessageType = 1
DHTMessage_ADD_PROVIDER DHTMessage_MessageType = 2
DHTMessage_GET_PROVIDERS DHTMessage_MessageType = 3
DHTMessage_FIND_NODE DHTMessage_MessageType = 4
DHTMessage_PING DHTMessage_MessageType = 5
PBDHTMessage_PUT_VALUE PBDHTMessage_MessageType = 0
PBDHTMessage_GET_VALUE PBDHTMessage_MessageType = 1
PBDHTMessage_ADD_PROVIDER PBDHTMessage_MessageType = 2
PBDHTMessage_GET_PROVIDERS PBDHTMessage_MessageType = 3
PBDHTMessage_FIND_NODE PBDHTMessage_MessageType = 4
PBDHTMessage_PING PBDHTMessage_MessageType = 5
PBDHTMessage_DIAGNOSTIC PBDHTMessage_MessageType = 6
)
var DHTMessage_MessageType_name = map[int32]string{
var PBDHTMessage_MessageType_name = map[int32]string{
0: "PUT_VALUE",
1: "GET_VALUE",
2: "ADD_PROVIDER",
3: "GET_PROVIDERS",
4: "FIND_NODE",
5: "PING",
6: "DIAGNOSTIC",
}
var DHTMessage_MessageType_value = map[string]int32{
var PBDHTMessage_MessageType_value = map[string]int32{
"PUT_VALUE": 0,
"GET_VALUE": 1,
"ADD_PROVIDER": 2,
"GET_PROVIDERS": 3,
"FIND_NODE": 4,
"PING": 5,
"DIAGNOSTIC": 6,
}
func (x DHTMessage_MessageType) Enum() *DHTMessage_MessageType {
p := new(DHTMessage_MessageType)
func (x PBDHTMessage_MessageType) Enum() *PBDHTMessage_MessageType {
p := new(PBDHTMessage_MessageType)
*p = x
return p
}
func (x DHTMessage_MessageType) String() string {
return proto.EnumName(DHTMessage_MessageType_name, int32(x))
func (x PBDHTMessage_MessageType) String() string {
return proto.EnumName(PBDHTMessage_MessageType_name, int32(x))
}
func (x *DHTMessage_MessageType) UnmarshalJSON(data []byte) error {
value, err := proto.UnmarshalJSONEnum(DHTMessage_MessageType_value, data, "DHTMessage_MessageType")
func (x *PBDHTMessage_MessageType) UnmarshalJSON(data []byte) error {
value, err := proto.UnmarshalJSONEnum(PBDHTMessage_MessageType_value, data, "PBDHTMessage_MessageType")
if err != nil {
return err
}
*x = DHTMessage_MessageType(value)
*x = PBDHTMessage_MessageType(value)
return nil
}
type DHTMessage struct {
Type *DHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.DHTMessage_MessageType" json:"type,omitempty"`
Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"`
Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
// Unique ID of this message, used to match queries with responses
Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"`
// Signals whether or not this message is a response to another message
Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"`
XXX_unrecognized []byte `json:"-"`
type PBDHTMessage struct {
Type *PBDHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.PBDHTMessage_MessageType" json:"type,omitempty"`
Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"`
Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"`
Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"`
Success *bool `protobuf:"varint,6,opt,name=success" json:"success,omitempty"`
Peers []*PBDHTMessage_PBPeer `protobuf:"bytes,7,rep,name=peers" json:"peers,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *DHTMessage) Reset() { *m = DHTMessage{} }
func (m *DHTMessage) String() string { return proto.CompactTextString(m) }
func (*DHTMessage) ProtoMessage() {}
func (m *PBDHTMessage) Reset() { *m = PBDHTMessage{} }
func (m *PBDHTMessage) String() string { return proto.CompactTextString(m) }
func (*PBDHTMessage) ProtoMessage() {}
func (m *DHTMessage) GetType() DHTMessage_MessageType {
func (m *PBDHTMessage) GetType() PBDHTMessage_MessageType {
if m != nil && m.Type != nil {
return *m.Type
}
return DHTMessage_PUT_VALUE
return PBDHTMessage_PUT_VALUE
}
func (m *DHTMessage) GetKey() string {
func (m *PBDHTMessage) GetKey() string {
if m != nil && m.Key != nil {
return *m.Key
}
return ""
}
func (m *DHTMessage) GetValue() []byte {
func (m *PBDHTMessage) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
func (m *DHTMessage) GetId() uint64 {
func (m *PBDHTMessage) GetId() uint64 {
if m != nil && m.Id != nil {
return *m.Id
}
return 0
}
func (m *DHTMessage) GetResponse() bool {
func (m *PBDHTMessage) GetResponse() bool {
if m != nil && m.Response != nil {
return *m.Response
}
return false
}
func init() {
proto.RegisterEnum("dht.DHTMessage_MessageType", DHTMessage_MessageType_name, DHTMessage_MessageType_value)
func (m *PBDHTMessage) GetSuccess() bool {
if m != nil && m.Success != nil {
return *m.Success
}
return false
}
func (m *PBDHTMessage) GetPeers() []*PBDHTMessage_PBPeer {
if m != nil {
return m.Peers
}
return nil
}
type PBDHTMessage_PBPeer struct {
Id *string `protobuf:"bytes,1,req,name=id" json:"id,omitempty"`
Addr *string `protobuf:"bytes,2,req,name=addr" json:"addr,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *PBDHTMessage_PBPeer) Reset() { *m = PBDHTMessage_PBPeer{} }
func (m *PBDHTMessage_PBPeer) String() string { return proto.CompactTextString(m) }
func (*PBDHTMessage_PBPeer) ProtoMessage() {}
func (m *PBDHTMessage_PBPeer) GetId() string {
if m != nil && m.Id != nil {
return *m.Id
}
return ""
}
func (m *PBDHTMessage_PBPeer) GetAddr() string {
if m != nil && m.Addr != nil {
return *m.Addr
}
return ""
}
func init() {
proto.RegisterEnum("dht.PBDHTMessage_MessageType", PBDHTMessage_MessageType_name, PBDHTMessage_MessageType_value)
}

View File

@ -2,7 +2,7 @@ package dht;
//run `protoc --go_out=. *.proto` to generate
message DHTMessage {
message PBDHTMessage {
enum MessageType {
PUT_VALUE = 0;
GET_VALUE = 1;
@ -10,6 +10,12 @@ message DHTMessage {
GET_PROVIDERS = 3;
FIND_NODE = 4;
PING = 5;
DIAGNOSTIC = 6;
}
message PBPeer {
required string id = 1;
required string addr = 2;
}
required MessageType type = 1;
@ -21,4 +27,8 @@ message DHTMessage {
// Signals whether or not this message is a response to another message
optional bool response = 5;
optional bool success = 6;
// Used for returning peers from queries (normally, peers closer to X)
repeated PBPeer peers = 7;
}

View File

@ -1,17 +1,30 @@
package dht
import (
"bytes"
"encoding/json"
"errors"
"math/rand"
"sync"
"time"
proto "code.google.com/p/goprotobuf/proto"
ma "github.com/jbenet/go-multiaddr"
peer "github.com/jbenet/go-ipfs/peer"
kb "github.com/jbenet/go-ipfs/routing/kbucket"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"
)
// Pool size is the number of nodes used for group find/set RPC calls
var PoolSize = 6
// TODO: determine a way of creating and managing message IDs
func GenerateMessageID() uint64 {
return uint64(rand.Uint32()) << 32 & uint64(rand.Uint32())
//return (uint64(rand.Uint32()) << 32) & uint64(rand.Uint32())
return uint64(rand.Uint32())
}
// This file implements the Routing interface for the IpfsDHT struct.
@ -19,56 +32,190 @@ func GenerateMessageID() uint64 {
// Basic Put/Get
// PutValue adds value corresponding to given Key.
func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
var p *peer.Peer
p = s.routes.NearestNode(key)
// This is the top level "Store" operation of the DHT
func (s *IpfsDHT) PutValue(key u.Key, value []byte) {
complete := make(chan struct{})
for _, route := range s.routes {
p := route.NearestPeer(kb.ConvertKey(key))
if p == nil {
s.network.Error(kb.ErrLookupFailure)
go func() {
complete <- struct{}{}
}()
continue
}
go func() {
err := s.putValueToNetwork(p, string(key), value)
if err != nil {
s.network.Error(err)
}
complete <- struct{}{}
}()
}
for _, _ = range s.routes {
<-complete
}
}
pmes_type := DHTMessage_PUT_VALUE
str_key := string(key)
mes_id := GenerateMessageID()
// A counter for incrementing a variable across multiple threads
type counter struct {
n int
mut sync.RWMutex
}
pmes := new(DHTMessage)
pmes.Type = &pmes_type
pmes.Key = &str_key
pmes.Value = value
pmes.Id = &mes_id
func (c *counter) Increment() {
c.mut.Lock()
c.n++
c.mut.Unlock()
}
mes := new(swarm.Message)
mes.Data = []byte(pmes.String())
mes.Peer = p
func (c *counter) Decrement() {
c.mut.Lock()
c.n--
c.mut.Unlock()
}
s.network.Chan.Outgoing <- mes
return nil
func (c *counter) Size() int {
c.mut.RLock()
defer c.mut.RUnlock()
return c.n
}
type peerSet struct {
ps map[string]bool
lk sync.RWMutex
}
func newPeerSet() *peerSet {
ps := new(peerSet)
ps.ps = make(map[string]bool)
return ps
}
func (ps *peerSet) Add(p *peer.Peer) {
ps.lk.Lock()
ps.ps[string(p.ID)] = true
ps.lk.Unlock()
}
func (ps *peerSet) Contains(p *peer.Peer) bool {
ps.lk.RLock()
_, ok := ps.ps[string(p.ID)]
ps.lk.RUnlock()
return ok
}
func (ps *peerSet) Size() int {
ps.lk.RLock()
defer ps.lk.RUnlock()
return len(ps.ps)
}
// GetValue searches for the value corresponding to given Key.
// If the search does not succeed, a multiaddr string of a closer peer is
// returned along with util.ErrSearchIncomplete
func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
var p *peer.Peer
p = s.routes.NearestNode(key)
ll := startNewRpc("GET")
defer func() {
ll.EndLog()
ll.Print()
}()
str_key := string(key)
mes_type := DHTMessage_GET_VALUE
mes_id := GenerateMessageID()
// protobuf structure
pmes := new(DHTMessage)
pmes.Type = &mes_type
pmes.Key = &str_key
pmes.Id = &mes_id
// If we have it local, dont bother doing an RPC!
// NOTE: this might not be what we want to do...
val, err := s.GetLocal(key)
if err == nil {
ll.Success = true
u.DOut("Found local, returning.")
return val, nil
}
mes := new(swarm.Message)
mes.Data = []byte(pmes.String())
mes.Peer = p
route_level := 0
closest := s.routes[route_level].NearestPeers(kb.ConvertKey(key), PoolSize)
if closest == nil || len(closest) == 0 {
return nil, kb.ErrLookupFailure
}
response_chan := s.ListenFor(*pmes.Id)
val_chan := make(chan []byte)
npeer_chan := make(chan *peer.Peer, 30)
proc_peer := make(chan *peer.Peer, 30)
err_chan := make(chan error)
after := time.After(timeout)
pset := newPeerSet()
for _, p := range closest {
pset.Add(p)
npeer_chan <- p
}
c := counter{}
// This limit value is referred to as k in the kademlia paper
limit := 20
count := 0
go func() {
for {
select {
case p := <-npeer_chan:
count++
if count >= limit {
break
}
c.Increment()
proc_peer <- p
default:
if c.Size() == 0 {
err_chan <- u.ErrNotFound
}
}
}
}()
process := func() {
for {
select {
case p, ok := <-proc_peer:
if !ok || p == nil {
c.Decrement()
return
}
val, peers, err := s.getValueOrPeers(p, key, timeout/4, route_level)
if err != nil {
u.DErr(err.Error())
c.Decrement()
continue
}
if val != nil {
val_chan <- val
c.Decrement()
return
}
for _, np := range peers {
// TODO: filter out peers that arent closer
if !pset.Contains(np) && pset.Size() < limit {
pset.Add(np) //This is racey... make a single function to do operation
npeer_chan <- np
}
}
c.Decrement()
}
}
}
concurFactor := 3
for i := 0; i < concurFactor; i++ {
go process()
}
// Wait for either the response or a timeout
timeup := time.After(timeout)
select {
case <-timeup:
// TODO: unregister listener
case val := <-val_chan:
return val, nil
case err := <-err_chan:
return nil, err
case <-after:
return nil, u.ErrTimeout
case resp := <-response_chan:
return resp.Data, nil
}
}
@ -77,17 +224,196 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
// Announce that this node can provide value for given key
func (s *IpfsDHT) Provide(key u.Key) error {
return u.ErrNotImplemented
peers := s.routes[0].NearestPeers(kb.ConvertKey(key), PoolSize)
if len(peers) == 0 {
return kb.ErrLookupFailure
}
pmes := DHTMessage{
Type: PBDHTMessage_ADD_PROVIDER,
Key: string(key),
}
pbmes := pmes.ToProtobuf()
for _, p := range peers {
mes := swarm.NewMessage(p, pbmes)
s.network.Send(mes)
}
return nil
}
// FindProviders searches for peers who can provide the value for given key.
func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) (*peer.Peer, error) {
return nil, u.ErrNotImplemented
func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
ll := startNewRpc("FindProviders")
defer func() {
ll.EndLog()
ll.Print()
}()
u.DOut("Find providers for: '%s'", key)
p := s.routes[0].NearestPeer(kb.ConvertKey(key))
if p == nil {
return nil, kb.ErrLookupFailure
}
for level := 0; level < len(s.routes); {
pmes, err := s.findProvidersSingle(p, key, level, timeout)
if err != nil {
return nil, err
}
if pmes.GetSuccess() {
provs := s.addPeerList(key, pmes.GetPeers())
ll.Success = true
return provs, nil
} else {
closer := pmes.GetPeers()
if len(closer) == 0 {
level++
continue
}
if peer.ID(closer[0].GetId()).Equal(s.self.ID) {
u.DOut("Got myself back as a closer peer.")
return nil, u.ErrNotFound
}
maddr, err := ma.NewMultiaddr(closer[0].GetAddr())
if err != nil {
// ??? Move up route level???
panic("not yet implemented")
}
np, err := s.network.GetConnection(peer.ID(closer[0].GetId()), maddr)
if err != nil {
u.PErr("[%s] Failed to connect to: %s", s.self.ID.Pretty(), closer[0].GetAddr())
level++
continue
}
p = np
}
}
return nil, u.ErrNotFound
}
// Find specific Peer
// FindPeer searches for a peer with given ID.
func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
return nil, u.ErrNotImplemented
// Check if were already connected to them
p, _ := s.Find(id)
if p != nil {
return p, nil
}
route_level := 0
p = s.routes[route_level].NearestPeer(kb.ConvertPeerID(id))
if p == nil {
return nil, kb.ErrLookupFailure
}
if p.ID.Equal(id) {
return p, nil
}
for route_level < len(s.routes) {
pmes, err := s.findPeerSingle(p, id, timeout, route_level)
plist := pmes.GetPeers()
if len(plist) == 0 {
route_level++
}
found := plist[0]
addr, err := ma.NewMultiaddr(found.GetAddr())
if err != nil {
return nil, err
}
nxtPeer, err := s.network.GetConnection(peer.ID(found.GetId()), addr)
if err != nil {
return nil, err
}
if pmes.GetSuccess() {
if !id.Equal(nxtPeer.ID) {
return nil, errors.New("got back invalid peer from 'successful' response")
}
return nxtPeer, nil
} else {
p = nxtPeer
}
}
return nil, u.ErrNotFound
}
// Ping a peer, log the time it took
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
// Thoughts: maybe this should accept an ID and do a peer lookup?
u.DOut("Enter Ping.")
pmes := DHTMessage{Id: GenerateMessageID(), Type: PBDHTMessage_PING}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
before := time.Now()
response_chan := dht.listener.Listen(pmes.Id, 1, time.Minute)
dht.network.Send(mes)
tout := time.After(timeout)
select {
case <-response_chan:
roundtrip := time.Since(before)
p.SetLatency(roundtrip)
u.DOut("Ping took %s.", roundtrip.String())
return nil
case <-tout:
// Timed out, think about removing peer from network
u.DOut("Ping peer timed out.")
dht.listener.Unlisten(pmes.Id)
return u.ErrTimeout
}
}
func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
u.DOut("Begin Diagnostic")
//Send to N closest peers
targets := dht.routes[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
// TODO: Add timeout to this struct so nodes know when to return
pmes := DHTMessage{
Type: PBDHTMessage_DIAGNOSTIC,
Id: GenerateMessageID(),
}
listenChan := dht.listener.Listen(pmes.Id, len(targets), time.Minute*2)
pbmes := pmes.ToProtobuf()
for _, p := range targets {
mes := swarm.NewMessage(p, pbmes)
dht.network.Send(mes)
}
var out []*diagInfo
after := time.After(timeout)
for count := len(targets); count > 0; {
select {
case <-after:
u.DOut("Diagnostic request timed out.")
return out, u.ErrTimeout
case resp := <-listenChan:
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
// NOTE: here and elsewhere, need to audit error handling,
// some errors should be continued on from
return out, err
}
dec := json.NewDecoder(bytes.NewBuffer(pmes_out.GetValue()))
for {
di := new(diagInfo)
err := dec.Decode(di)
if err != nil {
break
}
out = append(out, di)
}
}
}
return nil, nil
}

87
routing/kbucket/bucket.go Normal file
View File

@ -0,0 +1,87 @@
package dht
import (
"container/list"
"sync"
peer "github.com/jbenet/go-ipfs/peer"
)
// Bucket holds a list of peers.
type Bucket struct {
lk sync.RWMutex
list *list.List
}
func NewBucket() *Bucket {
b := new(Bucket)
b.list = list.New()
return b
}
func (b *Bucket) Find(id peer.ID) *list.Element {
b.lk.RLock()
defer b.lk.RUnlock()
for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(*peer.Peer).ID.Equal(id) {
return e
}
}
return nil
}
func (b *Bucket) MoveToFront(e *list.Element) {
b.lk.Lock()
b.list.MoveToFront(e)
b.lk.Unlock()
}
func (b *Bucket) PushFront(p *peer.Peer) {
b.lk.Lock()
b.list.PushFront(p)
b.lk.Unlock()
}
func (b *Bucket) PopBack() *peer.Peer {
b.lk.Lock()
defer b.lk.Unlock()
last := b.list.Back()
b.list.Remove(last)
return last.Value.(*peer.Peer)
}
func (b *Bucket) Len() int {
b.lk.RLock()
defer b.lk.RUnlock()
return b.list.Len()
}
// Splits a buckets peers into two buckets, the methods receiver will have
// peers with CPL equal to cpl, the returned bucket will have peers with CPL
// greater than cpl (returned bucket has closer peers)
func (b *Bucket) Split(cpl int, target ID) *Bucket {
b.lk.Lock()
defer b.lk.Unlock()
out := list.New()
newbuck := NewBucket()
newbuck.list = out
e := b.list.Front()
for e != nil {
peer_id := ConvertPeerID(e.Value.(*peer.Peer).ID)
peer_cpl := prefLen(peer_id, target)
if peer_cpl > cpl {
cur := e
out.PushBack(e.Value)
e = e.Next()
b.list.Remove(cur)
continue
}
e = e.Next()
}
return newbuck
}
func (b *Bucket) getIter() *list.Element {
return b.list.Front()
}

213
routing/kbucket/table.go Normal file
View File

@ -0,0 +1,213 @@
package dht
import (
"container/list"
"fmt"
"sort"
"sync"
"time"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
// RoutingTable defines the routing table.
type RoutingTable struct {
// ID of the local peer
local ID
// Blanket lock, refine later for better performance
tabLock sync.RWMutex
// Maximum acceptable latency for peers in this cluster
maxLatency time.Duration
// kBuckets define all the fingers to other nodes.
Buckets []*Bucket
bucketsize int
}
func NewRoutingTable(bucketsize int, local_id ID, latency time.Duration) *RoutingTable {
rt := new(RoutingTable)
rt.Buckets = []*Bucket{NewBucket()}
rt.bucketsize = bucketsize
rt.local = local_id
rt.maxLatency = latency
return rt
}
// Update adds or moves the given peer to the front of its respective bucket
// If a peer gets removed from a bucket, it is returned
func (rt *RoutingTable) Update(p *peer.Peer) *peer.Peer {
rt.tabLock.Lock()
defer rt.tabLock.Unlock()
peer_id := ConvertPeerID(p.ID)
cpl := xor(peer_id, rt.local).commonPrefixLen()
b_id := cpl
if b_id >= len(rt.Buckets) {
b_id = len(rt.Buckets) - 1
}
bucket := rt.Buckets[b_id]
e := bucket.Find(p.ID)
if e == nil {
// New peer, add to bucket
if p.GetLatency() > rt.maxLatency {
// Connection doesnt meet requirements, skip!
return nil
}
bucket.PushFront(p)
// Are we past the max bucket size?
if bucket.Len() > rt.bucketsize {
if b_id == len(rt.Buckets)-1 {
new_bucket := bucket.Split(b_id, rt.local)
rt.Buckets = append(rt.Buckets, new_bucket)
if new_bucket.Len() > rt.bucketsize {
// TODO: This is a very rare and annoying case
panic("Case not handled.")
}
// If all elements were on left side of split...
if bucket.Len() > rt.bucketsize {
return bucket.PopBack()
}
} else {
// If the bucket cant split kick out least active node
return bucket.PopBack()
}
}
return nil
} else {
// If the peer is already in the table, move it to the front.
// This signifies that it it "more active" and the less active nodes
// Will as a result tend towards the back of the list
bucket.MoveToFront(e)
return nil
}
}
// A helper struct to sort peers by their distance to the local node
type peerDistance struct {
p *peer.Peer
distance ID
}
// peerSorterArr implements sort.Interface to sort peers by xor distance
type peerSorterArr []*peerDistance
func (p peerSorterArr) Len() int { return len(p) }
func (p peerSorterArr) Swap(a, b int) { p[a], p[b] = p[b], p[a] }
func (p peerSorterArr) Less(a, b int) bool {
return p[a].distance.Less(p[b].distance)
}
//
func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr {
for e := peerList.Front(); e != nil; e = e.Next() {
p := e.Value.(*peer.Peer)
p_id := ConvertPeerID(p.ID)
pd := peerDistance{
p: p,
distance: xor(target, p_id),
}
peerArr = append(peerArr, &pd)
if e == nil {
u.POut("list element was nil.")
return peerArr
}
}
return peerArr
}
// Find a specific peer by ID or return nil
func (rt *RoutingTable) Find(id peer.ID) *peer.Peer {
srch := rt.NearestPeers(ConvertPeerID(id), 1)
if len(srch) == 0 || !srch[0].ID.Equal(id) {
return nil
}
return srch[0]
}
// Returns a single peer that is nearest to the given ID
func (rt *RoutingTable) NearestPeer(id ID) *peer.Peer {
peers := rt.NearestPeers(id, 1)
if len(peers) > 0 {
return peers[0]
} else {
return nil
}
}
// Returns a list of the 'count' closest peers to the given ID
func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer {
rt.tabLock.RLock()
defer rt.tabLock.RUnlock()
cpl := prefLen(id, rt.local)
// Get bucket at cpl index or last bucket
var bucket *Bucket
if cpl >= len(rt.Buckets) {
cpl = len(rt.Buckets) - 1
}
bucket = rt.Buckets[cpl]
var peerArr peerSorterArr
if bucket.Len() == 0 {
// In the case of an unusual split, one bucket may be empty.
// if this happens, search both surrounding buckets for nearest peer
if cpl > 0 {
plist := rt.Buckets[cpl-1].list
peerArr = copyPeersFromList(id, peerArr, plist)
}
if cpl < len(rt.Buckets)-1 {
plist := rt.Buckets[cpl+1].list
peerArr = copyPeersFromList(id, peerArr, plist)
}
} else {
peerArr = copyPeersFromList(id, peerArr, bucket.list)
}
// Sort by distance to local peer
sort.Sort(peerArr)
var out []*peer.Peer
for i := 0; i < count && i < peerArr.Len(); i++ {
out = append(out, peerArr[i].p)
}
return out
}
// Returns the total number of peers in the routing table
func (rt *RoutingTable) Size() int {
var tot int
for _, buck := range rt.Buckets {
tot += buck.Len()
}
return tot
}
// NOTE: This is potentially unsafe... use at your own risk
func (rt *RoutingTable) Listpeers() []*peer.Peer {
var peers []*peer.Peer
for _, buck := range rt.Buckets {
for e := buck.getIter(); e != nil; e = e.Next() {
peers = append(peers, e.Value.(*peer.Peer))
}
}
return peers
}
func (rt *RoutingTable) Print() {
fmt.Printf("Routing Table, bs = %d, Max latency = %d\n", rt.bucketsize, rt.maxLatency)
rt.tabLock.RLock()
peers := rt.Listpeers()
for i, p := range peers {
fmt.Printf("%d) %s %s\n", i, p.ID.Pretty(), p.GetLatency().String())
}
}

View File

@ -0,0 +1,199 @@
package dht
import (
crand "crypto/rand"
"crypto/sha256"
"math/rand"
"testing"
"time"
peer "github.com/jbenet/go-ipfs/peer"
)
func _randPeer() *peer.Peer {
p := new(peer.Peer)
p.ID = make(peer.ID, 16)
crand.Read(p.ID)
return p
}
func _randID() ID {
buf := make([]byte, 16)
crand.Read(buf)
hash := sha256.Sum256(buf)
return ID(hash[:])
}
// Test basic features of the bucket struct
func TestBucket(t *testing.T) {
b := NewBucket()
peers := make([]*peer.Peer, 100)
for i := 0; i < 100; i++ {
peers[i] = _randPeer()
b.PushFront(peers[i])
}
local := _randPeer()
local_id := ConvertPeerID(local.ID)
i := rand.Intn(len(peers))
e := b.Find(peers[i].ID)
if e == nil {
t.Errorf("Failed to find peer: %v", peers[i])
}
spl := b.Split(0, ConvertPeerID(local.ID))
llist := b.list
for e := llist.Front(); e != nil; e = e.Next() {
p := ConvertPeerID(e.Value.(*peer.Peer).ID)
cpl := xor(p, local_id).commonPrefixLen()
if cpl > 0 {
t.Fatalf("Split failed. found id with cpl > 0 in 0 bucket")
}
}
rlist := spl.list
for e := rlist.Front(); e != nil; e = e.Next() {
p := ConvertPeerID(e.Value.(*peer.Peer).ID)
cpl := xor(p, local_id).commonPrefixLen()
if cpl == 0 {
t.Fatalf("Split failed. found id with cpl == 0 in non 0 bucket")
}
}
}
// Right now, this just makes sure that it doesnt hang or crash
func TestTableUpdate(t *testing.T) {
local := _randPeer()
rt := NewRoutingTable(10, ConvertPeerID(local.ID), time.Hour)
peers := make([]*peer.Peer, 100)
for i := 0; i < 100; i++ {
peers[i] = _randPeer()
}
// Testing Update
for i := 0; i < 10000; i++ {
p := rt.Update(peers[rand.Intn(len(peers))])
if p != nil {
t.Log("evicted peer.")
}
}
for i := 0; i < 100; i++ {
id := _randID()
ret := rt.NearestPeers(id, 5)
if len(ret) == 0 {
t.Fatal("Failed to find node near ID.")
}
}
}
func TestTableFind(t *testing.T) {
local := _randPeer()
rt := NewRoutingTable(10, ConvertPeerID(local.ID), time.Hour)
peers := make([]*peer.Peer, 100)
for i := 0; i < 5; i++ {
peers[i] = _randPeer()
rt.Update(peers[i])
}
t.Logf("Searching for peer: '%s'", peers[2].ID.Pretty())
found := rt.NearestPeer(ConvertPeerID(peers[2].ID))
if !found.ID.Equal(peers[2].ID) {
t.Fatalf("Failed to lookup known node...")
}
}
func TestTableFindMultiple(t *testing.T) {
local := _randPeer()
rt := NewRoutingTable(20, ConvertPeerID(local.ID), time.Hour)
peers := make([]*peer.Peer, 100)
for i := 0; i < 18; i++ {
peers[i] = _randPeer()
rt.Update(peers[i])
}
t.Logf("Searching for peer: '%s'", peers[2].ID.Pretty())
found := rt.NearestPeers(ConvertPeerID(peers[2].ID), 15)
if len(found) != 15 {
t.Fatalf("Got back different number of peers than we expected.")
}
}
// Looks for race conditions in table operations. For a more 'certain'
// test, increase the loop counter from 1000 to a much higher number
// and set GOMAXPROCS above 1
func TestTableMultithreaded(t *testing.T) {
local := peer.ID("localPeer")
tab := NewRoutingTable(20, ConvertPeerID(local), time.Hour)
var peers []*peer.Peer
for i := 0; i < 500; i++ {
peers = append(peers, _randPeer())
}
done := make(chan struct{})
go func() {
for i := 0; i < 1000; i++ {
n := rand.Intn(len(peers))
tab.Update(peers[n])
}
done <- struct{}{}
}()
go func() {
for i := 0; i < 1000; i++ {
n := rand.Intn(len(peers))
tab.Update(peers[n])
}
done <- struct{}{}
}()
go func() {
for i := 0; i < 1000; i++ {
n := rand.Intn(len(peers))
tab.Find(peers[n].ID)
}
done <- struct{}{}
}()
<-done
<-done
<-done
}
func BenchmarkUpdates(b *testing.B) {
b.StopTimer()
local := ConvertKey("localKey")
tab := NewRoutingTable(20, local, time.Hour)
var peers []*peer.Peer
for i := 0; i < b.N; i++ {
peers = append(peers, _randPeer())
}
b.StartTimer()
for i := 0; i < b.N; i++ {
tab.Update(peers[i])
}
}
func BenchmarkFinds(b *testing.B) {
b.StopTimer()
local := ConvertKey("localKey")
tab := NewRoutingTable(20, local, time.Hour)
var peers []*peer.Peer
for i := 0; i < b.N; i++ {
peers = append(peers, _randPeer())
tab.Update(peers[i])
}
b.StartTimer()
for i := 0; i < b.N; i++ {
tab.Find(peers[i].ID)
}
}

View File

@ -2,40 +2,30 @@ package dht
import (
"bytes"
"container/list"
"crypto/sha256"
"errors"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
// Returned if a routing table query returns no results. This is NOT expected
// behaviour
var ErrLookupFailure = errors.New("failed to find any peer in table")
// ID for IpfsDHT should be a byte slice, to allow for simpler operations
// (xor). DHT ids are based on the peer.IDs.
//
// NOTE: peer.IDs are biased because they are multihashes (first bytes
// biased). Thus, may need to re-hash keys (uniform dist). TODO(jbenet)
// The type dht.ID signifies that its contents have been hashed from either a
// peer.ID or a util.Key. This unifies the keyspace
type ID []byte
// Bucket holds a list of peers.
type Bucket []*list.List
// RoutingTable defines the routing table.
type RoutingTable struct {
// kBuckets define all the fingers to other nodes.
Buckets []Bucket
}
//TODO: make this accept an ID, requires method of converting keys to IDs
func (rt *RoutingTable) NearestNode(key u.Key) *peer.Peer {
panic("Function not implemented.")
}
func (id ID) Equal(other ID) bool {
return bytes.Equal(id, other)
}
func (id ID) Less(other interface{}) bool {
a, b := equalizeSizes(id, other.(ID))
func (id ID) Less(other ID) bool {
a, b := equalizeSizes(id, other)
for i := 0; i < len(a); i++ {
if a[i] != b[i] {
return a[i] < b[i]
@ -55,6 +45,10 @@ func (id ID) commonPrefixLen() int {
return len(id)*8 - 1
}
func prefLen(a, b ID) int {
return xor(a, b).commonPrefixLen()
}
func xor(a, b ID) ID {
a, b = equalizeSizes(a, b)
@ -81,3 +75,24 @@ func equalizeSizes(a, b ID) (ID, ID) {
return a, b
}
func ConvertPeerID(id peer.ID) ID {
hash := sha256.Sum256(id)
return hash[:]
}
func ConvertKey(id u.Key) ID {
hash := sha256.Sum256([]byte(id))
return hash[:]
}
// Returns true if a is closer to key than b is
func Closer(a, b peer.ID, key u.Key) bool {
aid := ConvertPeerID(a)
bid := ConvertPeerID(b)
tgt := ConvertKey(key)
adist := xor(aid, tgt)
bdist := xor(bid, tgt)
return adist.Less(bdist)
}

View File

@ -25,7 +25,7 @@ type IpfsRouting interface {
Provide(key u.Key) error
// FindProviders searches for peers who can provide the value for given key.
FindProviders(key u.Key, timeout time.Duration) (*peer.Peer, error)
FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error)
// Find specific Peer

View File

@ -2,11 +2,12 @@ package swarm
import (
"fmt"
"net"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
msgio "github.com/jbenet/go-msgio"
ma "github.com/jbenet/go-multiaddr"
"net"
)
// ChanBuffer is the size of the buffer in the Conn Chan

20
swarm/interface.go Normal file
View File

@ -0,0 +1,20 @@
package swarm
import (
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-multiaddr"
)
type Network interface {
Send(*Message)
Error(error)
Find(u.Key) *peer.Peer
Listen() error
ConnectNew(*ma.Multiaddr) (*peer.Peer, error)
GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error)
GetChan() *Chan
Close()
Drop(*peer.Peer) error
}

View File

@ -1,12 +1,16 @@
package swarm
import (
"errors"
"fmt"
"net"
"sync"
proto "code.google.com/p/goprotobuf/proto"
ident "github.com/jbenet/go-ipfs/identify"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-multiaddr"
"net"
"sync"
)
// Message represents a packet of information sent to or received from a
@ -19,6 +23,19 @@ type Message struct {
Data []byte
}
// Cleaner looking helper function to make a new message struct
func NewMessage(p *peer.Peer, data proto.Message) *Message {
bytes, err := proto.Marshal(data)
if err != nil {
u.PErr(err.Error())
return nil
}
return &Message{
Peer: p,
Data: bytes,
}
}
// Chan is a swam channel, which provides duplex communication and errors.
type Chan struct {
Outgoing chan *Message
@ -32,11 +49,30 @@ func NewChan(bufsize int) *Chan {
return &Chan{
Outgoing: make(chan *Message, bufsize),
Incoming: make(chan *Message, bufsize),
Errors: make(chan error),
Errors: make(chan error, bufsize),
Close: make(chan bool, bufsize),
}
}
// Contains a set of errors mapping to each of the swarms addresses
// that were listened on
type SwarmListenErr struct {
Errors []error
}
func (se *SwarmListenErr) Error() string {
if se == nil {
return "<nil error>"
}
var out string
for i, v := range se.Errors {
if v != nil {
out += fmt.Sprintf("%d: %s\n", i, v)
}
}
return out
}
// Swarm is a connection muxer, allowing connections to other peers to
// be opened and closed, while still using the same Chan for all
// communication. The Chan sends/receives Messages, which note the
@ -46,7 +82,8 @@ type Swarm struct {
conns ConnMap
connsLock sync.RWMutex
local *peer.Peer
local *peer.Peer
listeners []net.Listener
}
// NewSwarm constructs a Swarm, with a Chan.
@ -61,13 +98,23 @@ func NewSwarm(local *peer.Peer) *Swarm {
}
// Open listeners for each network the swarm should listen on
func (s *Swarm) Listen() {
for _, addr := range s.local.Addresses {
func (s *Swarm) Listen() error {
var ret_err *SwarmListenErr
for i, addr := range s.local.Addresses {
err := s.connListen(addr)
if err != nil {
if ret_err == nil {
ret_err = new(SwarmListenErr)
ret_err.Errors = make([]error, len(s.local.Addresses))
}
ret_err.Errors[i] = err
u.PErr("Failed to listen on: %s [%s]", addr, err)
}
}
if ret_err == nil {
return nil
}
return ret_err
}
// Listen for new connections on the given multiaddr
@ -82,12 +129,17 @@ func (s *Swarm) connListen(maddr *ma.Multiaddr) error {
return err
}
// NOTE: this may require a lock around it later. currently, only run on setup
s.listeners = append(s.listeners, list)
// Accept and handle new connections on this listener until it errors
go func() {
for {
nconn, err := list.Accept()
if err != nil {
u.PErr("Failed to accept connection: %s - %s", netstr, addr)
e := fmt.Errorf("Failed to accept connection: %s - %s [%s]",
netstr, addr, err)
go func() { s.Chan.Errors <- e }()
return
}
go s.handleNewConn(nconn)
@ -99,7 +151,31 @@ func (s *Swarm) connListen(maddr *ma.Multiaddr) error {
// Handle getting ID from this peer and adding it into the map
func (s *Swarm) handleNewConn(nconn net.Conn) {
panic("Not yet implemented!")
p := new(peer.Peer)
conn := &Conn{
Peer: p,
Addr: nil,
Conn: nconn,
}
newConnChans(conn)
err := ident.Handshake(s.local, p, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
if err != nil {
u.PErr(err.Error())
conn.Close()
return
}
// Get address to contact remote peer from
addr := <-conn.Incoming.MsgChan
maddr, err := ma.NewMultiaddr(string(addr))
if err != nil {
u.PErr("Got invalid address from peer.")
}
p.AddAddress(maddr)
s.StartConn(conn)
}
// Close closes a swarm.
@ -113,6 +189,10 @@ func (s *Swarm) Close() {
}
s.Chan.Close <- true // fan out
s.Chan.Close <- true // listener
for _, list := range s.listeners {
list.Close()
}
}
// Dial connects to a peer.
@ -123,7 +203,7 @@ func (s *Swarm) Close() {
// etc. to achive connection.
//
// For now, Dial uses only TCP. This will be extended.
func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error) {
func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error, bool) {
k := peer.Key()
// check if we already have an open connection first
@ -131,23 +211,32 @@ func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error) {
conn, found := s.conns[k]
s.connsLock.RUnlock()
if found {
return conn, nil
return conn, nil, true
}
// open connection to peer
conn, err := Dial("tcp", peer)
if err != nil {
return nil, err
return nil, err, false
}
return conn, nil, false
}
func (s *Swarm) StartConn(conn *Conn) error {
if conn == nil {
return errors.New("Tried to start nil connection.")
}
u.DOut("Starting connection: %s", conn.Peer.Key().Pretty())
// add to conns
s.connsLock.Lock()
s.conns[k] = conn
s.conns[conn.Peer.Key()] = conn
s.connsLock.Unlock()
// kick off reader goroutine
go s.fanIn(conn)
return conn, nil
return nil
}
// Handles the unwrapping + sending of messages to the right connection.
@ -160,13 +249,17 @@ func (s *Swarm) fanOut() {
if !ok {
return
}
//u.DOut("fanOut: outgoing message for: '%s'", msg.Peer.Key().Pretty())
s.connsLock.RLock()
conn, found := s.conns[msg.Peer.Key()]
s.connsLock.RUnlock()
if !found {
e := fmt.Errorf("Sent msg to peer without open conn: %v", msg.Peer)
e := fmt.Errorf("Sent msg to peer without open conn: %v",
msg.Peer)
s.Chan.Errors <- e
continue
}
// queue it in the connection's buffer
@ -178,22 +271,21 @@ func (s *Swarm) fanOut() {
// Handles the receiving + wrapping of messages, per conn.
// Consider using reflect.Select with one goroutine instead of n.
func (s *Swarm) fanIn(conn *Conn) {
Loop:
for {
select {
case <-s.Chan.Close:
// close Conn.
conn.Close()
break Loop
goto out
case <-conn.Closed:
break Loop
goto out
case data, ok := <-conn.Incoming.MsgChan:
if !ok {
e := fmt.Errorf("Error retrieving from conn: %v", conn)
e := fmt.Errorf("Error retrieving from conn: %v", conn.Peer.Key().Pretty())
s.Chan.Errors <- e
break Loop
goto out
}
// wrap it for consumers.
@ -201,8 +293,110 @@ Loop:
s.Chan.Incoming <- msg
}
}
out:
s.connsLock.Lock()
delete(s.conns, conn.Peer.Key())
s.connsLock.Unlock()
}
func (s *Swarm) Find(key u.Key) *peer.Peer {
conn, found := s.conns[key]
if !found {
return nil
}
return conn.Peer
}
// GetConnection will check if we are already connected to the peer in question
// and only open a new connection if we arent already
func (s *Swarm) GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error) {
p := &peer.Peer{
ID: id,
Addresses: []*ma.Multiaddr{addr},
}
if id.Equal(s.local.ID) {
panic("Attempted connection to self!")
}
conn, err, reused := s.Dial(p)
if err != nil {
return nil, err
}
if reused {
return p, nil
}
err = s.handleDialedCon(conn)
return conn.Peer, err
}
func (s *Swarm) handleDialedCon(conn *Conn) error {
err := ident.Handshake(s.local, conn.Peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
if err != nil {
return err
}
// Send node an address that you can be reached on
myaddr := s.local.NetAddress("tcp")
mastr, err := myaddr.String()
if err != nil {
errors.New("No local address to send to peer.")
}
conn.Outgoing.MsgChan <- []byte(mastr)
s.StartConn(conn)
return nil
}
// ConnectNew is for connecting to a peer when you dont know their ID,
// Should only be used when you are sure that you arent already connected to peer in question
func (s *Swarm) ConnectNew(addr *ma.Multiaddr) (*peer.Peer, error) {
if addr == nil {
return nil, errors.New("nil Multiaddr passed to swarm.Connect()")
}
npeer := new(peer.Peer)
npeer.AddAddress(addr)
conn, err := Dial("tcp", npeer)
if err != nil {
return nil, err
}
err = s.handleDialedCon(conn)
return npeer, err
}
// Removes a given peer from the swarm and closes connections to it
func (s *Swarm) Drop(p *peer.Peer) error {
s.connsLock.RLock()
conn, found := s.conns[u.Key(p.ID)]
s.connsLock.RUnlock()
if !found {
return u.ErrNotFound
}
s.connsLock.Lock()
delete(s.conns, u.Key(p.ID))
s.connsLock.Unlock()
return conn.Close()
}
func (s *Swarm) Send(mes *Message) {
s.Chan.Outgoing <- mes
}
func (s *Swarm) Error(e error) {
s.Chan.Errors <- e
}
func (s *Swarm) GetChan() *Chan {
return s.Chan
}
var _ Network = &Swarm{}

View File

@ -2,11 +2,12 @@ package swarm
import (
"fmt"
"net"
"testing"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
msgio "github.com/jbenet/go-msgio"
"net"
"testing"
)
func pingListen(listener *net.TCPListener, peer *peer.Peer) {
@ -43,8 +44,8 @@ func pong(c net.Conn, peer *peer.Peer) {
func TestSwarm(t *testing.T) {
swarm := NewSwarm(nil)
peers := []*peer.Peer{}
listeners := []*net.Listener{}
var peers []*peer.Peer
var listeners []net.Listener
peerNames := map[string]string{
"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a30": "/ip4/127.0.0.1/tcp/1234",
"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31": "/ip4/127.0.0.1/tcp/2345",
@ -71,14 +72,15 @@ func TestSwarm(t *testing.T) {
}
go pingListen(listener.(*net.TCPListener), peer)
_, err = swarm.Dial(peer)
conn, err, _ := swarm.Dial(peer)
if err != nil {
t.Fatal("error swarm dialing to peer", err)
}
swarm.StartConn(conn)
// ok done, add it.
peers = append(peers, peer)
listeners = append(listeners, &listener)
listeners = append(listeners, listener)
}
MsgNum := 1000
@ -112,6 +114,6 @@ func TestSwarm(t *testing.T) {
fmt.Println("closing")
swarm.Close()
for _, listener := range listeners {
(*listener).(*net.TCPListener).Close()
listener.(*net.TCPListener).Close()
}
}

View File

@ -1,25 +1,39 @@
package util
import (
"errors"
"fmt"
mh "github.com/jbenet/go-multihash"
"os"
"os/user"
"strings"
b58 "github.com/jbenet/go-base58"
mh "github.com/jbenet/go-multihash"
)
// Debug is a global flag for debugging.
var Debug bool
// ErrNotImplemented signifies a function has not been implemented yet.
var ErrNotImplemented = fmt.Errorf("Error: not implemented yet.")
var ErrNotImplemented = errors.New("Error: not implemented yet.")
// ErrTimeout implies that a timeout has been triggered
var ErrTimeout = fmt.Errorf("Error: Call timed out.")
var ErrTimeout = errors.New("Error: Call timed out.")
// ErrSeErrSearchIncomplete implies that a search type operation didnt
// find the expected node, but did find 'a' node.
var ErrSearchIncomplete = errors.New("Error: Search Incomplete.")
// ErrNotFound is returned when a search fails to find anything
var ErrNotFound = errors.New("Error: Not Found.")
// Key is a string representation of multihash for use with maps.
type Key string
func (k Key) Pretty() string {
return b58.Encode([]byte(k))
}
// Hash is the global IPFS hash function. uses multihash SHA2_256, 256 bits
func Hash(data []byte) (mh.Multihash, error) {
return mh.Sum(data, mh.SHA2_256, -1)
@ -41,12 +55,12 @@ func TildeExpansion(filename string) (string, error) {
// PErr is a shorthand printing function to output to Stderr.
func PErr(format string, a ...interface{}) {
fmt.Fprintf(os.Stderr, format, a...)
fmt.Fprintf(os.Stderr, format+"\n", a...)
}
// POut is a shorthand printing function to output to Stdout.
func POut(format string, a ...interface{}) {
fmt.Fprintf(os.Stdout, format, a...)
fmt.Fprintf(os.Stdout, format+"\n", a...)
}
// DErr is a shorthand debug printing function to output to Stderr.