From bc2618df3fc3fa52f198e12270b8db1a340f43a6 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Wed, 23 Jul 2014 04:48:30 -0700 Subject: [PATCH 01/12] dht interface beginnings --- dht/dht.go | 111 ----------------------------------------- routing/dht/dht.go | 10 ++++ routing/dht/routing.go | 44 ++++++++++++++++ routing/dht/table.go | 61 ++++++++++++++++++++++ 4 files changed, 115 insertions(+), 111 deletions(-) delete mode 100644 dht/dht.go create mode 100644 routing/dht/dht.go create mode 100644 routing/dht/routing.go create mode 100644 routing/dht/table.go diff --git a/dht/dht.go b/dht/dht.go deleted file mode 100644 index 89a41a646..000000000 --- a/dht/dht.go +++ /dev/null @@ -1,111 +0,0 @@ -package dht - -import ( - "time" - mh "github.com/jbenet/go-multihash" - peer "github.com/jbenet/go-ipfs/peer" - "errors" - "net" -) - -var NotFound = errors.New("Not Found") -var NotAvailable = errors.New("Not Available") -var TimeoutExceeded = errors.New("Timeout Exceeded") - -// The IPFS DHT is an implementation of Kademlia with -// Coral and S/Kademlia modifications. It is used to -// implement the base IPFS Routing module. - -// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js -type DHT struct { - //Network - Network net.Conn - - // DHT Configuration Settings - Config DHTConfig - - //Republish - Republish *DHTRepublish -} - -// TODO: not call this republish -type DHTRepublish struct { - Strict []*DHTObject - Sloppy []*DHTObject -} - -type DHTObject struct { - Key string - Value *DHTValue - LastPublished *time.Time -} - -func (o *DHTObject) ShouldRepublish(interval time.Duration) bool { - return (time.Now().Second() - o.LastPublished.Second()) > int(interval.Seconds()) -} - -// A struct representing a value in the DHT -type DHTValue struct {} - -type DHTConfig struct { - // Time to wait between republishing intervals - RepublishInterval time.Duration - - // Multihash hash function - HashType int -} - -// Looks for a particular node -func (dht *DHT) FindNode(id *peer.ID /* and a callback? */) error { - panic("Not implemented.") -} - -func (dht *DHT) PingNode(id *peer.ID, timeout time.Duration) error { - panic("Not implemented.") -} - -// Retrieves a value for a given key -func (dht *DHT) GetValue(key string) *DHTValue { - panic("Not implemented.") -} - -// Stores a value for a given key -func (dht *DHT) SetValue(key string, value *DHTValue) error { - panic("Not implemented.") -} - -// GetSloppyValues finds (at least) a number of values for given key -func (dht *DHT) GetSloppyValues(key string, count int) ([]*DHTValue, error) { - panic("Not implemented.") -} - -func (dht *DHT) SetSloppyValue(key string, value *DHTValue) error { - panic("Not implemented.") -} - -func (dht *DHT) periodicRepublish() { - tick := time.NewTicker(time.Second * 5) - for { - select { - case <-tick.C: - for _,v := range dht.Republish.Strict { - if v.ShouldRepublish(dht.Config.RepublishInterval) { - dht.SetValue(v.Key, v.Value) - } - } - - for _,v := range dht.Republish.Sloppy { - if v.ShouldRepublish(dht.Config.RepublishInterval) { - dht.SetSloppyValue(v.Key, v.Value) - } - } - } - } -} - -func (dht *DHT) handleMessage(message []byte) { - -} - -func (dht *DHT) coerceMultihash(hash mh.Multihash) { -} diff --git a/routing/dht/dht.go b/routing/dht/dht.go new file mode 100644 index 000000000..e8f475a9e --- /dev/null +++ b/routing/dht/dht.go @@ -0,0 +1,10 @@ +package dht + +// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js + + +// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications. +// It is used to implement the base IpfsRouting module. +type IpfsDHT struct { + routes RoutingTable +} diff --git a/routing/dht/routing.go b/routing/dht/routing.go new file mode 100644 index 000000000..575f0a1bf --- /dev/null +++ b/routing/dht/routing.go @@ -0,0 +1,44 @@ +package dht + +import ( + "time" + peer "github.com/jbenet/go-ipfs/peer" + u "github.com/jbenet/go-ipfs/util" +) + + +// This file implements the Routing interface for the IpfsDHT struct. + +// Basic Put/Get + +// PutValue adds value corresponding to given Key. +func (s *IpfsDHT) PutValue(key u.Key, value []byte) (error) { + return u.ErrNotImplemented +} + +// GetValue searches for the value corresponding to given Key. +func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { + return nil, u.ErrNotImplemented +} + + +// Value provider layer of indirection. +// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT. + +// Announce that this node can provide value for given key +func (s *IpfsDHT) Provide(key u.Key) (error) { + return u.ErrNotImplemented +} + +// FindProviders searches for peers who can provide the value for given key. +func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) (*peer.Peer, error) { + return nil, u.ErrNotImplemented +} + + +// Find specific Peer + +// FindPeer searches for a peer with given ID. +func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) { + return nil, u.ErrNotImplemented +} diff --git a/routing/dht/table.go b/routing/dht/table.go new file mode 100644 index 000000000..5ac514e04 --- /dev/null +++ b/routing/dht/table.go @@ -0,0 +1,61 @@ +package dht + +import ( + "container/list" +) + + +// ID for IpfsDHT should be a byte slice, to allow for simpler operations +// (xor). DHT ids are based on the peer.IDs. +// +// NOTE: peer.IDs are biased because they are (a) multihashes (first bytes +// biased), and (b) first bits are zeroes when using the S/Kademlia PoW. +// Thus, may need to re-hash keys (uniform dist). TODO(jbenet) +type ID []byte + +// Bucket holds a list of peers. +type Bucket []*list.List + + +// RoutingTable defines the routing table. +type RoutingTable struct { + + // kBuckets define all the fingers to other nodes. + Buckets []Bucket +} + + +func (id ID) commonPrefixLen() int { + for i := 0; i < len(id); i++ { + for j := 0; j < 8; j++ { + if (id[i] >> uint8(7 - j)) & 0x1 != 0 { + return i * 8 + j; + } + } + } + return len(id) * 8 - 1; +} + +func xor(a, b ID) ID { + + // ids may actually be of different sizes. + var ba ID + var bb ID + if len(a) >= len(b) { + ba = a + bb = b + } else { + ba = b + bb = a + } + + c := make(ID, len(ba)) + for i := 0; i < len(ba); i++ { + if len(bb) > i { + c[i] = ba[i] ^ bb[i] + } else { + c[i] = ba[i] ^ 0 + } + } + return c +} From 73bd91a224c415deca62e3bfff70d6aca49de4ba Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 28 Jul 2014 20:46:52 -0700 Subject: [PATCH 02/12] implement listening on swarm object --- swarm/swarm.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 50 insertions(+), 2 deletions(-) diff --git a/swarm/swarm.go b/swarm/swarm.go index 9f4759895..441779b3d 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -3,7 +3,10 @@ package swarm import ( "fmt" peer "github.com/jbenet/go-ipfs/peer" + ma "github.com/jbenet/go-multiaddr" + u "github.com/jbenet/go-ipfs/util" "sync" + "net" ) // Message represents a packet of information sent to or received from a @@ -42,19 +45,64 @@ type Swarm struct { Chan *Chan conns ConnMap connsLock sync.RWMutex + + local *peer.Peer } // NewSwarm constructs a Swarm, with a Chan. -func NewSwarm() *Swarm { +func NewSwarm(local *peer.Peer) *Swarm { s := &Swarm{ Chan: NewChan(10), conns: ConnMap{}, + local: local, } go s.fanOut() return s } -// Close closes a swam. +// Open listeners for each network the swarm should listen on +func (s *Swarm) Listen() { + for _,addr := range s.local.Addresses { + err := s.connListen(addr) + if err != nil { + u.PErr("Failed to listen on: %s [%s]", addr, err) + } + } +} + +// Listen for new connections on the given multiaddr +func (s *Swarm) connListen(maddr *ma.Multiaddr) error { + netstr, addr, err := maddr.DialArgs() + if err != nil { + return err + } + + list, err := net.Listen(netstr, addr) + if err != nil { + return err + } + + // Accept and handle new connections on this listener until it errors + go func() { + for { + nconn,err := list.Accept() + if err != nil { + u.PErr("Failed to accept connection: %s - %s", netstr, addr) + return + } + go s.handleNewConn(nconn) + } + }() + + return nil +} + +// Handle getting ID from this peer and adding it into the map +func (s *Swarm) handleNewConn(nconn net.Conn) { + panic("Not yet implemented!") +} + +// Close closes a swarm. func (s *Swarm) Close() { s.connsLock.RLock() l := len(s.conns) From 8bc80124a47849d3ad818782c939e27a71e27340 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 28 Jul 2014 22:14:27 -0700 Subject: [PATCH 03/12] working on upper level dht implementations, protbuf, etc --- routing/dht/dht.go | 12 +++++ routing/dht/messages.pb.go | 96 ++++++++++++++++++++++++++++++++++++++ routing/dht/messages.proto | 16 +++++++ routing/dht/routing.go | 36 +++++++++++++- 4 files changed, 158 insertions(+), 2 deletions(-) create mode 100644 routing/dht/messages.pb.go create mode 100644 routing/dht/messages.proto diff --git a/routing/dht/dht.go b/routing/dht/dht.go index e8f475a9e..16a10c9e1 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -1,5 +1,9 @@ package dht +import ( + swarm "github.com/jbenet/go-ipfs/swarm" +) + // TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js @@ -7,4 +11,12 @@ package dht // It is used to implement the base IpfsRouting module. type IpfsDHT struct { routes RoutingTable + + network *swarm.Swarm +} + +func (dht *IpfsDHT) handleMessages() { + for mes := range dht.network.Chan.Incoming { + + } } diff --git a/routing/dht/messages.pb.go b/routing/dht/messages.pb.go new file mode 100644 index 000000000..718f86b92 --- /dev/null +++ b/routing/dht/messages.pb.go @@ -0,0 +1,96 @@ +// Code generated by protoc-gen-go. +// source: messages.proto +// DO NOT EDIT! + +/* +Package dht is a generated protocol buffer package. + +It is generated from these files: + messages.proto + +It has these top-level messages: + DHTMessage +*/ +package dht + +import proto "code.google.com/p/goprotobuf/proto" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = math.Inf + +type DHTMessage_MessageType int32 + +const ( + DHTMessage_PUT_VALUE DHTMessage_MessageType = 0 + DHTMessage_GET_VALUE DHTMessage_MessageType = 1 + DHTMessage_PING DHTMessage_MessageType = 2 + DHTMessage_FIND_NODE DHTMessage_MessageType = 3 +) + +var DHTMessage_MessageType_name = map[int32]string{ + 0: "PUT_VALUE", + 1: "GET_VALUE", + 2: "PING", + 3: "FIND_NODE", +} +var DHTMessage_MessageType_value = map[string]int32{ + "PUT_VALUE": 0, + "GET_VALUE": 1, + "PING": 2, + "FIND_NODE": 3, +} + +func (x DHTMessage_MessageType) Enum() *DHTMessage_MessageType { + p := new(DHTMessage_MessageType) + *p = x + return p +} +func (x DHTMessage_MessageType) String() string { + return proto.EnumName(DHTMessage_MessageType_name, int32(x)) +} +func (x *DHTMessage_MessageType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(DHTMessage_MessageType_value, data, "DHTMessage_MessageType") + if err != nil { + return err + } + *x = DHTMessage_MessageType(value) + return nil +} + +type DHTMessage struct { + Type *DHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.DHTMessage_MessageType" json:"type,omitempty"` + Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"` + Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *DHTMessage) Reset() { *m = DHTMessage{} } +func (m *DHTMessage) String() string { return proto.CompactTextString(m) } +func (*DHTMessage) ProtoMessage() {} + +func (m *DHTMessage) GetType() DHTMessage_MessageType { + if m != nil && m.Type != nil { + return *m.Type + } + return DHTMessage_PUT_VALUE +} + +func (m *DHTMessage) GetKey() string { + if m != nil && m.Key != nil { + return *m.Key + } + return "" +} + +func (m *DHTMessage) GetValue() []byte { + if m != nil { + return m.Value + } + return nil +} + +func init() { + proto.RegisterEnum("dht.DHTMessage_MessageType", DHTMessage_MessageType_name, DHTMessage_MessageType_value) +} diff --git a/routing/dht/messages.proto b/routing/dht/messages.proto new file mode 100644 index 000000000..458b80fb6 --- /dev/null +++ b/routing/dht/messages.proto @@ -0,0 +1,16 @@ +package dht; + +//run `protoc --go_out=. *.proto` to generate + +message DHTMessage { + enum MessageType { + PUT_VALUE = 0; + GET_VALUE = 1; + PING = 2; + FIND_NODE = 3; + } + + required MessageType type = 1; + optional string key = 2; + optional bytes value = 3; +} diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 575f0a1bf..0242399dd 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -4,6 +4,7 @@ import ( "time" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" + swarm "github.com/jbenet/go-ipfs/swarm" ) @@ -13,12 +14,43 @@ import ( // PutValue adds value corresponding to given Key. func (s *IpfsDHT) PutValue(key u.Key, value []byte) (error) { - return u.ErrNotImplemented + var p *peer.Peer + p = s.routes.NearestNode(key) + + pmes := new(PutValue) + pmes.Key = &key + pmes.Value = value + + mes := new(swarm.Message) + mes.Data = []byte(pmes.String()) + mes.Peer = p + + s.network.Chan.Outgoing <- mes + return nil } // GetValue searches for the value corresponding to given Key. func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { - return nil, u.ErrNotImplemented + var p *peer.Peer + p = s.routes.NearestNode(key) + + // protobuf structure + pmes := new(GetValue) + pmes.Key = &key + pmes.Id = GenerateMessageID() + + mes := new(swarm.Message) + mes.Data = []byte(pmes.String()) + mes.Peer = p + + response_chan := s.network.ListenFor(pmes.Id) + + timeup := time.After(timeout) + select { + case <-timeup: + return nil, timeoutError + case resp := <-response_chan: + } } From 171f96b794aae13cafe72d81705ba246574fb0aa Mon Sep 17 00:00:00 2001 From: Jeromy Johnson Date: Tue, 29 Jul 2014 14:50:33 -0700 Subject: [PATCH 04/12] update messages and add some new code around handling/creating messages --- routing/dht/dht.go | 34 ++++++++++++++++++++++++++++++++-- routing/dht/messages.proto | 7 +++++-- routing/dht/routing.go | 8 +++++++- 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 16a10c9e1..21f054e69 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -2,6 +2,7 @@ package dht import ( swarm "github.com/jbenet/go-ipfs/swarm" + "sync" ) // TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js @@ -10,13 +11,42 @@ import ( // IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications. // It is used to implement the base IpfsRouting module. type IpfsDHT struct { - routes RoutingTable + routes RoutingTable - network *swarm.Swarm + network *swarm.Swarm + + listeners map[uint64]chan swarm.Message + listenLock sync.RWMutex } +// Read in all messages from swarm and handle them appropriately +// NOTE: this function is just a quick sketch func (dht *IpfsDHT) handleMessages() { for mes := range dht.network.Chan.Incoming { + for { + select { + case mes := <-dht.network.Chan.Incoming: + // Unmarshal message + dht.listenLock.RLock() + ch, ok := dht.listeners[id] + dht.listenLock.RUnlock() + if ok { + // Send message to waiting goroutine + ch <- mes + } + //case closeChan: or something + } + } } } + +// Register a handler for a specific message ID, used for getting replies +// to certain messages (i.e. response to a GET_VALUE message) +func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan swarm.Message { + lchan := make(chan swarm.Message) + dht.listenLock.Lock() + dht.listeners[mesid] = lchan + dht.listenLock.Unlock() + return lchan +} diff --git a/routing/dht/messages.proto b/routing/dht/messages.proto index 458b80fb6..37024037b 100644 --- a/routing/dht/messages.proto +++ b/routing/dht/messages.proto @@ -6,11 +6,14 @@ message DHTMessage { enum MessageType { PUT_VALUE = 0; GET_VALUE = 1; - PING = 2; - FIND_NODE = 3; + ADD_PROVIDER = 2; + GET_PROVIDERS = 3; + FIND_NODE = 4; + PING = 5; } required MessageType type = 1; optional string key = 2; optional bytes value = 3; + required int64 id = 4; } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 0242399dd..bcbdec1da 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -43,14 +43,20 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { mes.Data = []byte(pmes.String()) mes.Peer = p - response_chan := s.network.ListenFor(pmes.Id) + response_chan := s.ListenFor(pmes.Id) + // Wait for either the response or a timeout timeup := time.After(timeout) select { case <-timeup: + // TODO: unregister listener return nil, timeoutError case resp := <-response_chan: + return resp.Data, nil } + + // Should never be hit + return nil, nil } From 550971fba6d734d8035e7b1ba5d0171df0ebe0d8 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Tue, 29 Jul 2014 17:53:31 -0700 Subject: [PATCH 05/12] equalize sizes --- routing/dht/table.go | 86 +++++++++++++++++++++++++------------------- 1 file changed, 50 insertions(+), 36 deletions(-) diff --git a/routing/dht/table.go b/routing/dht/table.go index 5ac514e04..53fd0f0d1 100644 --- a/routing/dht/table.go +++ b/routing/dht/table.go @@ -1,61 +1,75 @@ package dht import ( - "container/list" + "bytes" + "container/list" ) - // ID for IpfsDHT should be a byte slice, to allow for simpler operations // (xor). DHT ids are based on the peer.IDs. // -// NOTE: peer.IDs are biased because they are (a) multihashes (first bytes -// biased), and (b) first bits are zeroes when using the S/Kademlia PoW. -// Thus, may need to re-hash keys (uniform dist). TODO(jbenet) +// NOTE: peer.IDs are biased because they are multihashes (first bytes +// biased). Thus, may need to re-hash keys (uniform dist). TODO(jbenet) type ID []byte // Bucket holds a list of peers. type Bucket []*list.List - // RoutingTable defines the routing table. type RoutingTable struct { - // kBuckets define all the fingers to other nodes. - Buckets []Bucket + // kBuckets define all the fingers to other nodes. + Buckets []Bucket } +func (id ID) Equal(other ID) bool { + return bytes.Equal(id, other) +} + +func (id ID) Less(other interface{}) bool { + a, b := equalizeSizes(id, other.(ID)) + for i := 0; i < len(a); i++ { + if a[i] != b[i] { + return a[i] < b[i] + } + } + return len(a) < len(b) +} func (id ID) commonPrefixLen() int { - for i := 0; i < len(id); i++ { - for j := 0; j < 8; j++ { - if (id[i] >> uint8(7 - j)) & 0x1 != 0 { - return i * 8 + j; - } - } - } - return len(id) * 8 - 1; + for i := 0; i < len(id); i++ { + for j := 0; j < 8; j++ { + if (id[i]>>uint8(7-j))&0x1 != 0 { + return i*8 + j + } + } + } + return len(id)*8 - 1 } func xor(a, b ID) ID { + a, b = equalizeSizes(a, b) - // ids may actually be of different sizes. - var ba ID - var bb ID - if len(a) >= len(b) { - ba = a - bb = b - } else { - ba = b - bb = a - } - - c := make(ID, len(ba)) - for i := 0; i < len(ba); i++ { - if len(bb) > i { - c[i] = ba[i] ^ bb[i] - } else { - c[i] = ba[i] ^ 0 - } - } - return c + c := make(ID, len(a)) + for i := 0; i < len(a); i++ { + c[i] = a[i] ^ b[i] + } + return c +} + +func equalizeSizes(a, b ID) (ID, ID) { + la := len(a) + lb := len(b) + + if la < lb { + na := make([]byte, lb) + copy(na, a) + a = na + } else if lb < la { + nb := make([]byte, la) + copy(nb, b) + b = nb + } + + return a, b } From 74d26449c98267cb3c06f56ad2752d7d89c5202b Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Tue, 29 Jul 2014 17:55:19 -0700 Subject: [PATCH 06/12] whole project go fmt --- bitswap/bitswap.go | 20 +++++++++++--------- routing/dht/dht.go | 5 ++--- routing/dht/routing.go | 31 ++++++++++++++----------------- routing/routing.go | 36 +++++++++++++++++------------------- swarm/swarm.go | 10 +++++----- 5 files changed, 49 insertions(+), 53 deletions(-) diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index 737f43e63..f70b45209 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -1,24 +1,26 @@ package bitswap import ( - "time" - mh "github.com/jbenet/go-multihash" - blocks "github.com/jbenet/go-ipfs/blocks" - u "github.com/jbenet/go-ipfs/util" + blocks "github.com/jbenet/go-ipfs/blocks" + peer "github.com/jbenet/go-ipfs/peer" + u "github.com/jbenet/go-ipfs/util" + mh "github.com/jbenet/go-multihash" + "time" ) // aliases type Ledger struct { - Owner mh.Multihash - Partner mh.Multihash + Owner mh.Multihash + Partner mh.Multihash BytesSent uint64 BytesRecv uint64 Timestamp *time.Time } type BitSwap struct { - Ledgers map[u.Key]*Ledger // key is peer.ID - HaveList map[u.Key]*blocks.Block // key is multihash - WantList []*mh.Multihash + Ledgers map[u.Key]*Ledger // key is peer.ID + HaveList map[u.Key]*blocks.Block // key is multihash + WantList []*mh.Multihash + // todo } diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 21f054e69..c7e6f3c2c 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -7,7 +7,6 @@ import ( // TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js - // IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications. // It is used to implement the base IpfsRouting module. type IpfsDHT struct { @@ -15,7 +14,7 @@ type IpfsDHT struct { network *swarm.Swarm - listeners map[uint64]chan swarm.Message + listeners map[uint64]chan swarm.Message listenLock sync.RWMutex } @@ -35,7 +34,7 @@ func (dht *IpfsDHT) handleMessages() { ch <- mes } - //case closeChan: or something + //case closeChan: or something } } } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index bcbdec1da..2bbe93e32 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -1,19 +1,18 @@ package dht import ( - "time" - peer "github.com/jbenet/go-ipfs/peer" - u "github.com/jbenet/go-ipfs/util" - swarm "github.com/jbenet/go-ipfs/swarm" + peer "github.com/jbenet/go-ipfs/peer" + swarm "github.com/jbenet/go-ipfs/swarm" + u "github.com/jbenet/go-ipfs/util" + "time" ) - // This file implements the Routing interface for the IpfsDHT struct. // Basic Put/Get // PutValue adds value corresponding to given Key. -func (s *IpfsDHT) PutValue(key u.Key, value []byte) (error) { +func (s *IpfsDHT) PutValue(key u.Key, value []byte) error { var p *peer.Peer p = s.routes.NearestNode(key) @@ -48,35 +47,33 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { // Wait for either the response or a timeout timeup := time.After(timeout) select { - case <-timeup: - // TODO: unregister listener - return nil, timeoutError - case resp := <-response_chan: - return resp.Data, nil + case <-timeup: + // TODO: unregister listener + return nil, timeoutError + case resp := <-response_chan: + return resp.Data, nil } // Should never be hit return nil, nil } - // Value provider layer of indirection. // This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT. // Announce that this node can provide value for given key -func (s *IpfsDHT) Provide(key u.Key) (error) { - return u.ErrNotImplemented +func (s *IpfsDHT) Provide(key u.Key) error { + return u.ErrNotImplemented } // FindProviders searches for peers who can provide the value for given key. func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) (*peer.Peer, error) { - return nil, u.ErrNotImplemented + return nil, u.ErrNotImplemented } - // Find specific Peer // FindPeer searches for a peer with given ID. func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) { - return nil, u.ErrNotImplemented + return nil, u.ErrNotImplemented } diff --git a/routing/routing.go b/routing/routing.go index 4a9240181..933032f46 100644 --- a/routing/routing.go +++ b/routing/routing.go @@ -1,36 +1,34 @@ package routing import ( - "time" - peer "github.com/jbenet/go-ipfs/peer" - u "github.com/jbenet/go-ipfs/util" + peer "github.com/jbenet/go-ipfs/peer" + u "github.com/jbenet/go-ipfs/util" + "time" ) // IpfsRouting is the routing module interface // It is implemented by things like DHTs, etc. type IpfsRouting interface { - // Basic Put/Get + // Basic Put/Get - // PutValue adds value corresponding to given Key. - PutValue(key u.Key, value []byte) (error) + // PutValue adds value corresponding to given Key. + PutValue(key u.Key, value []byte) error - // GetValue searches for the value corresponding to given Key. - GetValue(key u.Key, timeout time.Duration) ([]byte, error) + // GetValue searches for the value corresponding to given Key. + GetValue(key u.Key, timeout time.Duration) ([]byte, error) + // Value provider layer of indirection. + // This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT. - // Value provider layer of indirection. - // This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT. + // Announce that this node can provide value for given key + Provide(key u.Key) error - // Announce that this node can provide value for given key - Provide(key u.Key) (error) + // FindProviders searches for peers who can provide the value for given key. + FindProviders(key u.Key, timeout time.Duration) (*peer.Peer, error) - // FindProviders searches for peers who can provide the value for given key. - FindProviders(key u.Key, timeout time.Duration) (*peer.Peer, error) + // Find specific Peer - - // Find specific Peer - - // FindPeer searches for a peer with given ID. - FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) + // FindPeer searches for a peer with given ID. + FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) } diff --git a/swarm/swarm.go b/swarm/swarm.go index 441779b3d..1e23a8e74 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -3,10 +3,10 @@ package swarm import ( "fmt" peer "github.com/jbenet/go-ipfs/peer" - ma "github.com/jbenet/go-multiaddr" u "github.com/jbenet/go-ipfs/util" - "sync" + ma "github.com/jbenet/go-multiaddr" "net" + "sync" ) // Message represents a packet of information sent to or received from a @@ -46,7 +46,7 @@ type Swarm struct { conns ConnMap connsLock sync.RWMutex - local *peer.Peer + local *peer.Peer } // NewSwarm constructs a Swarm, with a Chan. @@ -62,7 +62,7 @@ func NewSwarm(local *peer.Peer) *Swarm { // Open listeners for each network the swarm should listen on func (s *Swarm) Listen() { - for _,addr := range s.local.Addresses { + for _, addr := range s.local.Addresses { err := s.connListen(addr) if err != nil { u.PErr("Failed to listen on: %s [%s]", addr, err) @@ -85,7 +85,7 @@ func (s *Swarm) connListen(maddr *ma.Multiaddr) error { // Accept and handle new connections on this listener until it errors go func() { for { - nconn,err := list.Accept() + nconn, err := list.Accept() if err != nil { u.PErr("Failed to accept connection: %s - %s", netstr, addr) return From 061331875c442d0e7e8a7951855c6ad919d4f504 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Tue, 29 Jul 2014 18:01:05 -0700 Subject: [PATCH 07/12] decomp chan creation for listener + bugfix test failed to compile, as NewSwarm now takes a parm. --- swarm/conn.go | 29 ++++++++++++++++++----------- swarm/swarm_test.go | 2 +- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/swarm/conn.go b/swarm/conn.go index a24a5e7ad..56e8eea17 100644 --- a/swarm/conn.go +++ b/swarm/conn.go @@ -44,29 +44,36 @@ func Dial(network string, peer *peer.Peer) (*Conn, error) { return nil, err } - out := msgio.NewChan(10) - inc := msgio.NewChan(10) - conn := &Conn{ Peer: peer, Addr: addr, Conn: nconn, - - Outgoing: out, - Incoming: inc, - Closed: make(chan bool, 1), } - go out.WriteTo(nconn) - go inc.ReadFrom(nconn, 1<<12) - + newConnChans(conn) return conn, nil } +// Construct new channels for given Conn. +func newConnChans(c *Conn) error { + if c.Outgoing != nil || c.Incoming != nil { + return fmt.Errorf("Conn already initialized") + } + + c.Outgoing = msgio.NewChan(10) + c.Incoming = msgio.NewChan(10) + c.Closed = make(chan bool, 1) + + go c.Outgoing.WriteTo(c.Conn) + go c.Incoming.ReadFrom(c.Conn, 1<<12) + + return nil +} + // Close closes the connection, and associated channels. func (s *Conn) Close() error { if s.Conn == nil { - return fmt.Errorf("Already closed.") // already closed + return fmt.Errorf("Already closed") // already closed } // closing net connection diff --git a/swarm/swarm_test.go b/swarm/swarm_test.go index 46299aab3..23d34facc 100644 --- a/swarm/swarm_test.go +++ b/swarm/swarm_test.go @@ -42,7 +42,7 @@ func pong(c net.Conn, peer *peer.Peer) { func TestSwarm(t *testing.T) { - swarm := NewSwarm() + swarm := NewSwarm(nil) peers := []*peer.Peer{} listeners := []*net.Listener{} peerNames := map[string]string{ From 3444d41dce89a9c1b94785509d93b44be8c837dc Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 29 Jul 2014 19:33:51 -0700 Subject: [PATCH 08/12] work on framework for dht message handling --- routing/dht/dht.go | 56 ++++++++++++++++++++++++++------------ routing/dht/messages.pb.go | 34 ++++++++++++++++------- routing/dht/messages.proto | 2 +- routing/dht/routing.go | 29 +++++++++++++++----- routing/dht/table.go | 8 ++++++ swarm/swarm.go | 10 +++---- util/util.go | 3 ++ 7 files changed, 102 insertions(+), 40 deletions(-) diff --git a/routing/dht/dht.go b/routing/dht/dht.go index c7e6f3c2c..e5d7e9422 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -2,6 +2,8 @@ package dht import ( swarm "github.com/jbenet/go-ipfs/swarm" + u "github.com/jbenet/go-ipfs/util" + "code.google.com/p/goprotobuf/proto" "sync" ) @@ -14,36 +16,56 @@ type IpfsDHT struct { network *swarm.Swarm - listeners map[uint64]chan swarm.Message + // map of channels waiting for reply messages + listeners map[uint64]chan *swarm.Message listenLock sync.RWMutex + + // Signal to shutdown dht + shutdown chan struct{} } // Read in all messages from swarm and handle them appropriately // NOTE: this function is just a quick sketch func (dht *IpfsDHT) handleMessages() { - for mes := range dht.network.Chan.Incoming { - for { - select { - case mes := <-dht.network.Chan.Incoming: - // Unmarshal message - dht.listenLock.RLock() - ch, ok := dht.listeners[id] - dht.listenLock.RUnlock() - if ok { - // Send message to waiting goroutine - ch <- mes - } - - //case closeChan: or something + for { + select { + case mes := <-dht.network.Chan.Incoming: + pmes := new(DHTMessage) + err := proto.Unmarshal(mes.Data, pmes) + if err != nil { + u.PErr("Failed to decode protobuf message: %s", err) + continue } + + // Note: not sure if this is the correct place for this + dht.listenLock.RLock() + ch, ok := dht.listeners[pmes.GetId()] + dht.listenLock.RUnlock() + if ok { + ch <- mes + } + // + + // Do something else with the messages? + switch pmes.GetType() { + case DHTMessage_ADD_PROVIDER: + case DHTMessage_FIND_NODE: + case DHTMessage_GET_PROVIDERS: + case DHTMessage_GET_VALUE: + case DHTMessage_PING: + case DHTMessage_PUT_VALUE: + } + + case <-dht.shutdown: + return } } } // Register a handler for a specific message ID, used for getting replies // to certain messages (i.e. response to a GET_VALUE message) -func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan swarm.Message { - lchan := make(chan swarm.Message) +func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message { + lchan := make(chan *swarm.Message) dht.listenLock.Lock() dht.listeners[mesid] = lchan dht.listenLock.Unlock() diff --git a/routing/dht/messages.pb.go b/routing/dht/messages.pb.go index 718f86b92..f4fcb8dfd 100644 --- a/routing/dht/messages.pb.go +++ b/routing/dht/messages.pb.go @@ -23,23 +23,29 @@ var _ = math.Inf type DHTMessage_MessageType int32 const ( - DHTMessage_PUT_VALUE DHTMessage_MessageType = 0 - DHTMessage_GET_VALUE DHTMessage_MessageType = 1 - DHTMessage_PING DHTMessage_MessageType = 2 - DHTMessage_FIND_NODE DHTMessage_MessageType = 3 + DHTMessage_PUT_VALUE DHTMessage_MessageType = 0 + DHTMessage_GET_VALUE DHTMessage_MessageType = 1 + DHTMessage_ADD_PROVIDER DHTMessage_MessageType = 2 + DHTMessage_GET_PROVIDERS DHTMessage_MessageType = 3 + DHTMessage_FIND_NODE DHTMessage_MessageType = 4 + DHTMessage_PING DHTMessage_MessageType = 5 ) var DHTMessage_MessageType_name = map[int32]string{ 0: "PUT_VALUE", 1: "GET_VALUE", - 2: "PING", - 3: "FIND_NODE", + 2: "ADD_PROVIDER", + 3: "GET_PROVIDERS", + 4: "FIND_NODE", + 5: "PING", } var DHTMessage_MessageType_value = map[string]int32{ - "PUT_VALUE": 0, - "GET_VALUE": 1, - "PING": 2, - "FIND_NODE": 3, + "PUT_VALUE": 0, + "GET_VALUE": 1, + "ADD_PROVIDER": 2, + "GET_PROVIDERS": 3, + "FIND_NODE": 4, + "PING": 5, } func (x DHTMessage_MessageType) Enum() *DHTMessage_MessageType { @@ -63,6 +69,7 @@ type DHTMessage struct { Type *DHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.DHTMessage_MessageType" json:"type,omitempty"` Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"` Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"` + Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -91,6 +98,13 @@ func (m *DHTMessage) GetValue() []byte { return nil } +func (m *DHTMessage) GetId() uint64 { + if m != nil && m.Id != nil { + return *m.Id + } + return 0 +} + func init() { proto.RegisterEnum("dht.DHTMessage_MessageType", DHTMessage_MessageType_name, DHTMessage_MessageType_value) } diff --git a/routing/dht/messages.proto b/routing/dht/messages.proto index 37024037b..67ffad447 100644 --- a/routing/dht/messages.proto +++ b/routing/dht/messages.proto @@ -15,5 +15,5 @@ message DHTMessage { required MessageType type = 1; optional string key = 2; optional bytes value = 3; - required int64 id = 4; + required uint64 id = 4; } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 2bbe93e32..dfdde4dd1 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -7,6 +7,11 @@ import ( "time" ) +// TODO: determine a way of creating and managing message IDs +func GenerateMessageID() uint64 { + return 4 +} + // This file implements the Routing interface for the IpfsDHT struct. // Basic Put/Get @@ -16,9 +21,15 @@ func (s *IpfsDHT) PutValue(key u.Key, value []byte) error { var p *peer.Peer p = s.routes.NearestNode(key) - pmes := new(PutValue) - pmes.Key = &key + pmes_type := DHTMessage_PUT_VALUE + str_key := string(key) + mes_id := GenerateMessageID() + + pmes := new(DHTMessage) + pmes.Type = &pmes_type + pmes.Key = &str_key pmes.Value = value + pmes.Id = &mes_id mes := new(swarm.Message) mes.Data = []byte(pmes.String()) @@ -33,23 +44,27 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { var p *peer.Peer p = s.routes.NearestNode(key) + str_key := string(key) + mes_type := DHTMessage_GET_VALUE + mes_id := GenerateMessageID() // protobuf structure - pmes := new(GetValue) - pmes.Key = &key - pmes.Id = GenerateMessageID() + pmes := new(DHTMessage) + pmes.Type = &mes_type + pmes.Key = &str_key + pmes.Id = &mes_id mes := new(swarm.Message) mes.Data = []byte(pmes.String()) mes.Peer = p - response_chan := s.ListenFor(pmes.Id) + response_chan := s.ListenFor(*pmes.Id) // Wait for either the response or a timeout timeup := time.After(timeout) select { case <-timeup: // TODO: unregister listener - return nil, timeoutError + return nil, u.ErrTimeout case resp := <-response_chan: return resp.Data, nil } diff --git a/routing/dht/table.go b/routing/dht/table.go index 53fd0f0d1..d7625e462 100644 --- a/routing/dht/table.go +++ b/routing/dht/table.go @@ -3,6 +3,9 @@ package dht import ( "bytes" "container/list" + + peer "github.com/jbenet/go-ipfs/peer" + u "github.com/jbenet/go-ipfs/util" ) // ID for IpfsDHT should be a byte slice, to allow for simpler operations @@ -22,6 +25,11 @@ type RoutingTable struct { Buckets []Bucket } +//TODO: make this accept an ID, requires method of converting keys to IDs +func (rt *RoutingTable) NearestNode(key u.Key) *peer.Peer { + panic("Function not implemented.") +} + func (id ID) Equal(other ID) bool { return bytes.Equal(id, other) } diff --git a/swarm/swarm.go b/swarm/swarm.go index 1e23a8e74..fccc74777 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -21,8 +21,8 @@ type Message struct { // Chan is a swam channel, which provides duplex communication and errors. type Chan struct { - Outgoing chan Message - Incoming chan Message + Outgoing chan *Message + Incoming chan *Message Errors chan error Close chan bool } @@ -30,8 +30,8 @@ type Chan struct { // NewChan constructs a Chan instance, with given buffer size bufsize. func NewChan(bufsize int) *Chan { return &Chan{ - Outgoing: make(chan Message, bufsize), - Incoming: make(chan Message, bufsize), + Outgoing: make(chan *Message, bufsize), + Incoming: make(chan *Message, bufsize), Errors: make(chan error), Close: make(chan bool, bufsize), } @@ -197,7 +197,7 @@ Loop: } // wrap it for consumers. - msg := Message{Peer: conn.Peer, Data: data} + msg := &Message{Peer: conn.Peer, Data: data} s.Chan.Incoming <- msg } } diff --git a/util/util.go b/util/util.go index 639b913c9..69831ff8d 100644 --- a/util/util.go +++ b/util/util.go @@ -14,6 +14,9 @@ var Debug bool // ErrNotImplemented signifies a function has not been implemented yet. var ErrNotImplemented = fmt.Errorf("Error: not implemented yet.") +// ErrTimeout implies that a timeout has been triggered +var ErrTimeout = fmt.Errorf("Error: Call timed out.") + // Key is a string representation of multihash for use with maps. type Key string From 87739b3af5bb9a59db757fcca44324afe1841b3b Mon Sep 17 00:00:00 2001 From: Jeromy Johnson Date: Wed, 30 Jul 2014 17:46:56 -0700 Subject: [PATCH 09/12] a little more work on message handling stuff --- routing/dht/dht.go | 77 ++++++++++++++++++++++++++++++++------ routing/dht/messages.pb.go | 20 +++++++--- routing/dht/messages.proto | 5 +++ routing/dht/routing.go | 9 ++--- 4 files changed, 90 insertions(+), 21 deletions(-) diff --git a/routing/dht/dht.go b/routing/dht/dht.go index e5d7e9422..a1f63f106 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -1,10 +1,12 @@ package dht import ( + "sync" + + peer "github.com/jbenet/go-ipfs/peer" swarm "github.com/jbenet/go-ipfs/swarm" u "github.com/jbenet/go-ipfs/util" "code.google.com/p/goprotobuf/proto" - "sync" ) // TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js @@ -16,6 +18,9 @@ type IpfsDHT struct { network *swarm.Swarm + // local data (TEMPORARY: until we formalize data storage with datastore) + data map[string][]byte + // map of channels waiting for reply messages listeners map[uint64]chan *swarm.Message listenLock sync.RWMutex @@ -38,22 +43,28 @@ func (dht *IpfsDHT) handleMessages() { } // Note: not sure if this is the correct place for this - dht.listenLock.RLock() - ch, ok := dht.listeners[pmes.GetId()] - dht.listenLock.RUnlock() - if ok { - ch <- mes + if pmes.GetResponse() { + dht.listenLock.RLock() + ch, ok := dht.listeners[pmes.GetId()] + dht.listenLock.RUnlock() + if ok { + ch <- mes + } + + // this is expected behaviour during a timeout + u.DOut("Received response with nobody listening...") + continue } // - // Do something else with the messages? switch pmes.GetType() { - case DHTMessage_ADD_PROVIDER: - case DHTMessage_FIND_NODE: - case DHTMessage_GET_PROVIDERS: case DHTMessage_GET_VALUE: - case DHTMessage_PING: + dht.handleGetValue(mes.Peer, pmes) case DHTMessage_PUT_VALUE: + case DHTMessage_FIND_NODE: + case DHTMessage_ADD_PROVIDER: + case DHTMessage_GET_PROVIDERS: + case DHTMessage_PING: } case <-dht.shutdown: @@ -62,6 +73,44 @@ func (dht *IpfsDHT) handleMessages() { } } +func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) { + val, found := dht.data[pmes.GetKey()] + if found { + isResponse := true + resp := new(DHTMessage) + resp.Response = &isResponse + resp.Id = pmes.Id + resp.Key = pmes.Key + resp.Value = val + } else { + // Find closest node(s) to desired key and reply with that info + // TODO: this will need some other metadata in the protobuf message + // to signal to the querying node that the data its receiving + // is actually a list of other nodes + } +} + +func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) { + panic("Not implemented.") +} + +func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) { + panic("Not implemented.") +} + +func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) { + isResponse := true + resp := new(DHTMessage) + resp.Id = pmes.Id + resp.Response = &isResponse + + mes := new(swarm.Message) + mes.Peer = p + mes.Data = []byte(resp.String()) + dht.network.Chan.Outgoing <- mes +} + + // Register a handler for a specific message ID, used for getting replies // to certain messages (i.e. response to a GET_VALUE message) func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message { @@ -71,3 +120,9 @@ func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message { dht.listenLock.Unlock() return lchan } + +// Stop all communications from this node and shut down +func (dht *IpfsDHT) Halt() { + dht.shutdown <- struct{}{} + dht.network.Close() +} diff --git a/routing/dht/messages.pb.go b/routing/dht/messages.pb.go index f4fcb8dfd..3283ef4e2 100644 --- a/routing/dht/messages.pb.go +++ b/routing/dht/messages.pb.go @@ -66,11 +66,14 @@ func (x *DHTMessage_MessageType) UnmarshalJSON(data []byte) error { } type DHTMessage struct { - Type *DHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.DHTMessage_MessageType" json:"type,omitempty"` - Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"` - Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"` - Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"` - XXX_unrecognized []byte `json:"-"` + Type *DHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.DHTMessage_MessageType" json:"type,omitempty"` + Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"` + Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"` + // Unique ID of this message, used to match queries with responses + Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"` + // Signals whether or not this message is a response to another message + Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *DHTMessage) Reset() { *m = DHTMessage{} } @@ -105,6 +108,13 @@ func (m *DHTMessage) GetId() uint64 { return 0 } +func (m *DHTMessage) GetResponse() bool { + if m != nil && m.Response != nil { + return *m.Response + } + return false +} + func init() { proto.RegisterEnum("dht.DHTMessage_MessageType", DHTMessage_MessageType_name, DHTMessage_MessageType_value) } diff --git a/routing/dht/messages.proto b/routing/dht/messages.proto index 67ffad447..d873c7559 100644 --- a/routing/dht/messages.proto +++ b/routing/dht/messages.proto @@ -15,5 +15,10 @@ message DHTMessage { required MessageType type = 1; optional string key = 2; optional bytes value = 3; + + // Unique ID of this message, used to match queries with responses required uint64 id = 4; + + // Signals whether or not this message is a response to another message + optional bool response = 5; } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index dfdde4dd1..e9ed64d98 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -1,15 +1,17 @@ package dht import ( + "math/rand" + "time" + peer "github.com/jbenet/go-ipfs/peer" swarm "github.com/jbenet/go-ipfs/swarm" u "github.com/jbenet/go-ipfs/util" - "time" ) // TODO: determine a way of creating and managing message IDs func GenerateMessageID() uint64 { - return 4 + return uint64(rand.Uint32()) << 32 & uint64(rand.Uint32()) } // This file implements the Routing interface for the IpfsDHT struct. @@ -68,9 +70,6 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { case resp := <-response_chan: return resp.Data, nil } - - // Should never be hit - return nil, nil } // Value provider layer of indirection. From 2a1ee3ae3a8bfa8d5b9aa45eb86bcad9bae33c77 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 30 Jul 2014 20:16:34 -0700 Subject: [PATCH 10/12] use datastore for local data --- routing/dht/dht.go | 60 +++++++++++++++++++++++++++++++++++++++------- 1 file changed, 51 insertions(+), 9 deletions(-) diff --git a/routing/dht/dht.go b/routing/dht/dht.go index a1f63f106..0f55ba45b 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -3,9 +3,12 @@ package dht import ( "sync" - peer "github.com/jbenet/go-ipfs/peer" - swarm "github.com/jbenet/go-ipfs/swarm" - u "github.com/jbenet/go-ipfs/util" + peer "github.com/jbenet/go-ipfs/peer" + swarm "github.com/jbenet/go-ipfs/swarm" + u "github.com/jbenet/go-ipfs/util" + + ds "github.com/jbenet/datastore.go" + "code.google.com/p/goprotobuf/proto" ) @@ -18,8 +21,11 @@ type IpfsDHT struct { network *swarm.Swarm - // local data (TEMPORARY: until we formalize data storage with datastore) - data map[string][]byte + // Local peer (yourself) + self *peer.Peer + + // Local data + datastore ds.Datastore // map of channels waiting for reply messages listeners map[uint64]chan *swarm.Message @@ -29,6 +35,15 @@ type IpfsDHT struct { shutdown chan struct{} } +func NewDHT(p *peer.Peer) *IpfsDHT { + dht := new(IpfsDHT) + dht.self = p + dht.network = swarm.NewSwarm(p) + dht.listeners = make(map[uint64]chan *swarm.Message) + dht.shutdown = make(chan struct{}) + return dht +} + // Read in all messages from swarm and handle them appropriately // NOTE: this function is just a quick sketch func (dht *IpfsDHT) handleMessages() { @@ -61,10 +76,13 @@ func (dht *IpfsDHT) handleMessages() { case DHTMessage_GET_VALUE: dht.handleGetValue(mes.Peer, pmes) case DHTMessage_PUT_VALUE: + dht.handlePutValue(mes.Peer, pmes) case DHTMessage_FIND_NODE: + dht.handleFindNode(mes.Peer, pmes) case DHTMessage_ADD_PROVIDER: case DHTMessage_GET_PROVIDERS: case DHTMessage_PING: + dht.handleFindNode(mes.Peer, pmes) } case <-dht.shutdown: @@ -74,15 +92,22 @@ func (dht *IpfsDHT) handleMessages() { } func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) { - val, found := dht.data[pmes.GetKey()] - if found { + dskey := ds.NewKey(pmes.GetKey()) + i_val, err := dht.datastore.Get(dskey) + if err == nil { isResponse := true resp := new(DHTMessage) resp.Response = &isResponse resp.Id = pmes.Id resp.Key = pmes.Key + + val := i_val.([]byte) resp.Value = val - } else { + + mes := new(swarm.Message) + mes.Peer = p + mes.Data = []byte(resp.String()) + } else if err == ds.ErrNotFound { // Find closest node(s) to desired key and reply with that info // TODO: this will need some other metadata in the protobuf message // to signal to the querying node that the data its receiving @@ -90,8 +115,14 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) { } } +// Store a value in this nodes local storage func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) { - panic("Not implemented.") + dskey := ds.NewKey(pmes.GetKey()) + err := dht.datastore.Put(dskey, pmes.GetValue()) + if err != nil { + // For now, just panic, handle this better later maybe + panic(err) + } } func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) { @@ -121,6 +152,17 @@ func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message { return lchan } +func (dht *IpfsDHT) Unlisten(mesid uint64) { + dht.listenLock.Lock() + ch, ok := dht.listeners[mesid] + if ok { + delete(dht.listeners, mesid) + } + dht.listenLock.Unlock() + close(ch) +} + + // Stop all communications from this node and shut down func (dht *IpfsDHT) Halt() { dht.shutdown <- struct{}{} From b12134cd4c8ed8db3627d3f9a5788e58f24b2702 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Thu, 31 Jul 2014 02:55:48 -0700 Subject: [PATCH 11/12] fixed tests --- bitswap/bitswap.go | 1 - swarm/swarm_test.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index f70b45209..c8681e883 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -2,7 +2,6 @@ package bitswap import ( blocks "github.com/jbenet/go-ipfs/blocks" - peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" mh "github.com/jbenet/go-multihash" "time" diff --git a/swarm/swarm_test.go b/swarm/swarm_test.go index 23d34facc..e289a5e77 100644 --- a/swarm/swarm_test.go +++ b/swarm/swarm_test.go @@ -84,7 +84,7 @@ func TestSwarm(t *testing.T) { MsgNum := 1000 for k := 0; k < MsgNum; k++ { for _, p := range peers { - swarm.Chan.Outgoing <- Message{Peer: p, Data: []byte("ping")} + swarm.Chan.Outgoing <- &Message{Peer: p, Data: []byte("ping")} } } From dd08e0ed621dadad0221d0d8beaf64d363455004 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Thu, 31 Jul 2014 03:22:03 -0700 Subject: [PATCH 12/12] config test: write local file --- config/config.go | 4 +++- config/config_test.go | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/config/config.go b/config/config.go index f1907edf1..a69166c31 100644 --- a/config/config.go +++ b/config/config.go @@ -51,7 +51,9 @@ func Load(filename string) (*Config, error) { // if nothing is there, write first config file. if _, err := os.Stat(filename); os.IsNotExist(err) { - WriteFile(filename, []byte(defaultConfigFile)) + if err := WriteFile(filename, []byte(defaultConfigFile)); err != nil { + return nil, err + } } var cfg Config diff --git a/config/config_test.go b/config/config_test.go index 0b79673b5..ffc7ef7af 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -7,7 +7,7 @@ import ( func TestConfig(t *testing.T) { - cfg, err := Load("") + cfg, err := Load(".ipfsconfig") if err != nil { t.Error(err) return