diff --git a/routing/dht/dht.go b/routing/dht/dht.go index c7e6f3c2c..e5d7e9422 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -2,6 +2,8 @@ package dht import ( swarm "github.com/jbenet/go-ipfs/swarm" + u "github.com/jbenet/go-ipfs/util" + "code.google.com/p/goprotobuf/proto" "sync" ) @@ -14,36 +16,56 @@ type IpfsDHT struct { network *swarm.Swarm - listeners map[uint64]chan swarm.Message + // map of channels waiting for reply messages + listeners map[uint64]chan *swarm.Message listenLock sync.RWMutex + + // Signal to shutdown dht + shutdown chan struct{} } // Read in all messages from swarm and handle them appropriately // NOTE: this function is just a quick sketch func (dht *IpfsDHT) handleMessages() { - for mes := range dht.network.Chan.Incoming { - for { - select { - case mes := <-dht.network.Chan.Incoming: - // Unmarshal message - dht.listenLock.RLock() - ch, ok := dht.listeners[id] - dht.listenLock.RUnlock() - if ok { - // Send message to waiting goroutine - ch <- mes - } - - //case closeChan: or something + for { + select { + case mes := <-dht.network.Chan.Incoming: + pmes := new(DHTMessage) + err := proto.Unmarshal(mes.Data, pmes) + if err != nil { + u.PErr("Failed to decode protobuf message: %s", err) + continue } + + // Note: not sure if this is the correct place for this + dht.listenLock.RLock() + ch, ok := dht.listeners[pmes.GetId()] + dht.listenLock.RUnlock() + if ok { + ch <- mes + } + // + + // Do something else with the messages? + switch pmes.GetType() { + case DHTMessage_ADD_PROVIDER: + case DHTMessage_FIND_NODE: + case DHTMessage_GET_PROVIDERS: + case DHTMessage_GET_VALUE: + case DHTMessage_PING: + case DHTMessage_PUT_VALUE: + } + + case <-dht.shutdown: + return } } } // Register a handler for a specific message ID, used for getting replies // to certain messages (i.e. response to a GET_VALUE message) -func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan swarm.Message { - lchan := make(chan swarm.Message) +func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message { + lchan := make(chan *swarm.Message) dht.listenLock.Lock() dht.listeners[mesid] = lchan dht.listenLock.Unlock() diff --git a/routing/dht/messages.pb.go b/routing/dht/messages.pb.go index 718f86b92..f4fcb8dfd 100644 --- a/routing/dht/messages.pb.go +++ b/routing/dht/messages.pb.go @@ -23,23 +23,29 @@ var _ = math.Inf type DHTMessage_MessageType int32 const ( - DHTMessage_PUT_VALUE DHTMessage_MessageType = 0 - DHTMessage_GET_VALUE DHTMessage_MessageType = 1 - DHTMessage_PING DHTMessage_MessageType = 2 - DHTMessage_FIND_NODE DHTMessage_MessageType = 3 + DHTMessage_PUT_VALUE DHTMessage_MessageType = 0 + DHTMessage_GET_VALUE DHTMessage_MessageType = 1 + DHTMessage_ADD_PROVIDER DHTMessage_MessageType = 2 + DHTMessage_GET_PROVIDERS DHTMessage_MessageType = 3 + DHTMessage_FIND_NODE DHTMessage_MessageType = 4 + DHTMessage_PING DHTMessage_MessageType = 5 ) var DHTMessage_MessageType_name = map[int32]string{ 0: "PUT_VALUE", 1: "GET_VALUE", - 2: "PING", - 3: "FIND_NODE", + 2: "ADD_PROVIDER", + 3: "GET_PROVIDERS", + 4: "FIND_NODE", + 5: "PING", } var DHTMessage_MessageType_value = map[string]int32{ - "PUT_VALUE": 0, - "GET_VALUE": 1, - "PING": 2, - "FIND_NODE": 3, + "PUT_VALUE": 0, + "GET_VALUE": 1, + "ADD_PROVIDER": 2, + "GET_PROVIDERS": 3, + "FIND_NODE": 4, + "PING": 5, } func (x DHTMessage_MessageType) Enum() *DHTMessage_MessageType { @@ -63,6 +69,7 @@ type DHTMessage struct { Type *DHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.DHTMessage_MessageType" json:"type,omitempty"` Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"` Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"` + Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -91,6 +98,13 @@ func (m *DHTMessage) GetValue() []byte { return nil } +func (m *DHTMessage) GetId() uint64 { + if m != nil && m.Id != nil { + return *m.Id + } + return 0 +} + func init() { proto.RegisterEnum("dht.DHTMessage_MessageType", DHTMessage_MessageType_name, DHTMessage_MessageType_value) } diff --git a/routing/dht/messages.proto b/routing/dht/messages.proto index 37024037b..67ffad447 100644 --- a/routing/dht/messages.proto +++ b/routing/dht/messages.proto @@ -15,5 +15,5 @@ message DHTMessage { required MessageType type = 1; optional string key = 2; optional bytes value = 3; - required int64 id = 4; + required uint64 id = 4; } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 2bbe93e32..dfdde4dd1 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -7,6 +7,11 @@ import ( "time" ) +// TODO: determine a way of creating and managing message IDs +func GenerateMessageID() uint64 { + return 4 +} + // This file implements the Routing interface for the IpfsDHT struct. // Basic Put/Get @@ -16,9 +21,15 @@ func (s *IpfsDHT) PutValue(key u.Key, value []byte) error { var p *peer.Peer p = s.routes.NearestNode(key) - pmes := new(PutValue) - pmes.Key = &key + pmes_type := DHTMessage_PUT_VALUE + str_key := string(key) + mes_id := GenerateMessageID() + + pmes := new(DHTMessage) + pmes.Type = &pmes_type + pmes.Key = &str_key pmes.Value = value + pmes.Id = &mes_id mes := new(swarm.Message) mes.Data = []byte(pmes.String()) @@ -33,23 +44,27 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { var p *peer.Peer p = s.routes.NearestNode(key) + str_key := string(key) + mes_type := DHTMessage_GET_VALUE + mes_id := GenerateMessageID() // protobuf structure - pmes := new(GetValue) - pmes.Key = &key - pmes.Id = GenerateMessageID() + pmes := new(DHTMessage) + pmes.Type = &mes_type + pmes.Key = &str_key + pmes.Id = &mes_id mes := new(swarm.Message) mes.Data = []byte(pmes.String()) mes.Peer = p - response_chan := s.ListenFor(pmes.Id) + response_chan := s.ListenFor(*pmes.Id) // Wait for either the response or a timeout timeup := time.After(timeout) select { case <-timeup: // TODO: unregister listener - return nil, timeoutError + return nil, u.ErrTimeout case resp := <-response_chan: return resp.Data, nil } diff --git a/routing/dht/table.go b/routing/dht/table.go index 53fd0f0d1..d7625e462 100644 --- a/routing/dht/table.go +++ b/routing/dht/table.go @@ -3,6 +3,9 @@ package dht import ( "bytes" "container/list" + + peer "github.com/jbenet/go-ipfs/peer" + u "github.com/jbenet/go-ipfs/util" ) // ID for IpfsDHT should be a byte slice, to allow for simpler operations @@ -22,6 +25,11 @@ type RoutingTable struct { Buckets []Bucket } +//TODO: make this accept an ID, requires method of converting keys to IDs +func (rt *RoutingTable) NearestNode(key u.Key) *peer.Peer { + panic("Function not implemented.") +} + func (id ID) Equal(other ID) bool { return bytes.Equal(id, other) } diff --git a/swarm/swarm.go b/swarm/swarm.go index 1e23a8e74..fccc74777 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -21,8 +21,8 @@ type Message struct { // Chan is a swam channel, which provides duplex communication and errors. type Chan struct { - Outgoing chan Message - Incoming chan Message + Outgoing chan *Message + Incoming chan *Message Errors chan error Close chan bool } @@ -30,8 +30,8 @@ type Chan struct { // NewChan constructs a Chan instance, with given buffer size bufsize. func NewChan(bufsize int) *Chan { return &Chan{ - Outgoing: make(chan Message, bufsize), - Incoming: make(chan Message, bufsize), + Outgoing: make(chan *Message, bufsize), + Incoming: make(chan *Message, bufsize), Errors: make(chan error), Close: make(chan bool, bufsize), } @@ -197,7 +197,7 @@ Loop: } // wrap it for consumers. - msg := Message{Peer: conn.Peer, Data: data} + msg := &Message{Peer: conn.Peer, Data: data} s.Chan.Incoming <- msg } } diff --git a/util/util.go b/util/util.go index 639b913c9..69831ff8d 100644 --- a/util/util.go +++ b/util/util.go @@ -14,6 +14,9 @@ var Debug bool // ErrNotImplemented signifies a function has not been implemented yet. var ErrNotImplemented = fmt.Errorf("Error: not implemented yet.") +// ErrTimeout implies that a timeout has been triggered +var ErrTimeout = fmt.Errorf("Error: Call timed out.") + // Key is a string representation of multihash for use with maps. type Key string