mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-25 23:21:54 +08:00
work on framework for dht message handling
This commit is contained in:
@ -2,6 +2,8 @@ package dht
|
||||
|
||||
import (
|
||||
swarm "github.com/jbenet/go-ipfs/swarm"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
"code.google.com/p/goprotobuf/proto"
|
||||
"sync"
|
||||
)
|
||||
|
||||
@ -14,36 +16,56 @@ type IpfsDHT struct {
|
||||
|
||||
network *swarm.Swarm
|
||||
|
||||
listeners map[uint64]chan swarm.Message
|
||||
// map of channels waiting for reply messages
|
||||
listeners map[uint64]chan *swarm.Message
|
||||
listenLock sync.RWMutex
|
||||
|
||||
// Signal to shutdown dht
|
||||
shutdown chan struct{}
|
||||
}
|
||||
|
||||
// Read in all messages from swarm and handle them appropriately
|
||||
// NOTE: this function is just a quick sketch
|
||||
func (dht *IpfsDHT) handleMessages() {
|
||||
for mes := range dht.network.Chan.Incoming {
|
||||
for {
|
||||
select {
|
||||
case mes := <-dht.network.Chan.Incoming:
|
||||
// Unmarshal message
|
||||
dht.listenLock.RLock()
|
||||
ch, ok := dht.listeners[id]
|
||||
dht.listenLock.RUnlock()
|
||||
if ok {
|
||||
// Send message to waiting goroutine
|
||||
ch <- mes
|
||||
}
|
||||
|
||||
//case closeChan: or something
|
||||
for {
|
||||
select {
|
||||
case mes := <-dht.network.Chan.Incoming:
|
||||
pmes := new(DHTMessage)
|
||||
err := proto.Unmarshal(mes.Data, pmes)
|
||||
if err != nil {
|
||||
u.PErr("Failed to decode protobuf message: %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Note: not sure if this is the correct place for this
|
||||
dht.listenLock.RLock()
|
||||
ch, ok := dht.listeners[pmes.GetId()]
|
||||
dht.listenLock.RUnlock()
|
||||
if ok {
|
||||
ch <- mes
|
||||
}
|
||||
//
|
||||
|
||||
// Do something else with the messages?
|
||||
switch pmes.GetType() {
|
||||
case DHTMessage_ADD_PROVIDER:
|
||||
case DHTMessage_FIND_NODE:
|
||||
case DHTMessage_GET_PROVIDERS:
|
||||
case DHTMessage_GET_VALUE:
|
||||
case DHTMessage_PING:
|
||||
case DHTMessage_PUT_VALUE:
|
||||
}
|
||||
|
||||
case <-dht.shutdown:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Register a handler for a specific message ID, used for getting replies
|
||||
// to certain messages (i.e. response to a GET_VALUE message)
|
||||
func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan swarm.Message {
|
||||
lchan := make(chan swarm.Message)
|
||||
func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message {
|
||||
lchan := make(chan *swarm.Message)
|
||||
dht.listenLock.Lock()
|
||||
dht.listeners[mesid] = lchan
|
||||
dht.listenLock.Unlock()
|
||||
|
@ -23,23 +23,29 @@ var _ = math.Inf
|
||||
type DHTMessage_MessageType int32
|
||||
|
||||
const (
|
||||
DHTMessage_PUT_VALUE DHTMessage_MessageType = 0
|
||||
DHTMessage_GET_VALUE DHTMessage_MessageType = 1
|
||||
DHTMessage_PING DHTMessage_MessageType = 2
|
||||
DHTMessage_FIND_NODE DHTMessage_MessageType = 3
|
||||
DHTMessage_PUT_VALUE DHTMessage_MessageType = 0
|
||||
DHTMessage_GET_VALUE DHTMessage_MessageType = 1
|
||||
DHTMessage_ADD_PROVIDER DHTMessage_MessageType = 2
|
||||
DHTMessage_GET_PROVIDERS DHTMessage_MessageType = 3
|
||||
DHTMessage_FIND_NODE DHTMessage_MessageType = 4
|
||||
DHTMessage_PING DHTMessage_MessageType = 5
|
||||
)
|
||||
|
||||
var DHTMessage_MessageType_name = map[int32]string{
|
||||
0: "PUT_VALUE",
|
||||
1: "GET_VALUE",
|
||||
2: "PING",
|
||||
3: "FIND_NODE",
|
||||
2: "ADD_PROVIDER",
|
||||
3: "GET_PROVIDERS",
|
||||
4: "FIND_NODE",
|
||||
5: "PING",
|
||||
}
|
||||
var DHTMessage_MessageType_value = map[string]int32{
|
||||
"PUT_VALUE": 0,
|
||||
"GET_VALUE": 1,
|
||||
"PING": 2,
|
||||
"FIND_NODE": 3,
|
||||
"PUT_VALUE": 0,
|
||||
"GET_VALUE": 1,
|
||||
"ADD_PROVIDER": 2,
|
||||
"GET_PROVIDERS": 3,
|
||||
"FIND_NODE": 4,
|
||||
"PING": 5,
|
||||
}
|
||||
|
||||
func (x DHTMessage_MessageType) Enum() *DHTMessage_MessageType {
|
||||
@ -63,6 +69,7 @@ type DHTMessage struct {
|
||||
Type *DHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.DHTMessage_MessageType" json:"type,omitempty"`
|
||||
Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"`
|
||||
Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
|
||||
Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
@ -91,6 +98,13 @@ func (m *DHTMessage) GetValue() []byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *DHTMessage) GetId() uint64 {
|
||||
if m != nil && m.Id != nil {
|
||||
return *m.Id
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterEnum("dht.DHTMessage_MessageType", DHTMessage_MessageType_name, DHTMessage_MessageType_value)
|
||||
}
|
||||
|
@ -15,5 +15,5 @@ message DHTMessage {
|
||||
required MessageType type = 1;
|
||||
optional string key = 2;
|
||||
optional bytes value = 3;
|
||||
required int64 id = 4;
|
||||
required uint64 id = 4;
|
||||
}
|
||||
|
@ -7,6 +7,11 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// TODO: determine a way of creating and managing message IDs
|
||||
func GenerateMessageID() uint64 {
|
||||
return 4
|
||||
}
|
||||
|
||||
// This file implements the Routing interface for the IpfsDHT struct.
|
||||
|
||||
// Basic Put/Get
|
||||
@ -16,9 +21,15 @@ func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
|
||||
var p *peer.Peer
|
||||
p = s.routes.NearestNode(key)
|
||||
|
||||
pmes := new(PutValue)
|
||||
pmes.Key = &key
|
||||
pmes_type := DHTMessage_PUT_VALUE
|
||||
str_key := string(key)
|
||||
mes_id := GenerateMessageID()
|
||||
|
||||
pmes := new(DHTMessage)
|
||||
pmes.Type = &pmes_type
|
||||
pmes.Key = &str_key
|
||||
pmes.Value = value
|
||||
pmes.Id = &mes_id
|
||||
|
||||
mes := new(swarm.Message)
|
||||
mes.Data = []byte(pmes.String())
|
||||
@ -33,23 +44,27 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
|
||||
var p *peer.Peer
|
||||
p = s.routes.NearestNode(key)
|
||||
|
||||
str_key := string(key)
|
||||
mes_type := DHTMessage_GET_VALUE
|
||||
mes_id := GenerateMessageID()
|
||||
// protobuf structure
|
||||
pmes := new(GetValue)
|
||||
pmes.Key = &key
|
||||
pmes.Id = GenerateMessageID()
|
||||
pmes := new(DHTMessage)
|
||||
pmes.Type = &mes_type
|
||||
pmes.Key = &str_key
|
||||
pmes.Id = &mes_id
|
||||
|
||||
mes := new(swarm.Message)
|
||||
mes.Data = []byte(pmes.String())
|
||||
mes.Peer = p
|
||||
|
||||
response_chan := s.ListenFor(pmes.Id)
|
||||
response_chan := s.ListenFor(*pmes.Id)
|
||||
|
||||
// Wait for either the response or a timeout
|
||||
timeup := time.After(timeout)
|
||||
select {
|
||||
case <-timeup:
|
||||
// TODO: unregister listener
|
||||
return nil, timeoutError
|
||||
return nil, u.ErrTimeout
|
||||
case resp := <-response_chan:
|
||||
return resp.Data, nil
|
||||
}
|
||||
|
@ -3,6 +3,9 @@ package dht
|
||||
import (
|
||||
"bytes"
|
||||
"container/list"
|
||||
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
|
||||
// ID for IpfsDHT should be a byte slice, to allow for simpler operations
|
||||
@ -22,6 +25,11 @@ type RoutingTable struct {
|
||||
Buckets []Bucket
|
||||
}
|
||||
|
||||
//TODO: make this accept an ID, requires method of converting keys to IDs
|
||||
func (rt *RoutingTable) NearestNode(key u.Key) *peer.Peer {
|
||||
panic("Function not implemented.")
|
||||
}
|
||||
|
||||
func (id ID) Equal(other ID) bool {
|
||||
return bytes.Equal(id, other)
|
||||
}
|
||||
|
@ -21,8 +21,8 @@ type Message struct {
|
||||
|
||||
// Chan is a swam channel, which provides duplex communication and errors.
|
||||
type Chan struct {
|
||||
Outgoing chan Message
|
||||
Incoming chan Message
|
||||
Outgoing chan *Message
|
||||
Incoming chan *Message
|
||||
Errors chan error
|
||||
Close chan bool
|
||||
}
|
||||
@ -30,8 +30,8 @@ type Chan struct {
|
||||
// NewChan constructs a Chan instance, with given buffer size bufsize.
|
||||
func NewChan(bufsize int) *Chan {
|
||||
return &Chan{
|
||||
Outgoing: make(chan Message, bufsize),
|
||||
Incoming: make(chan Message, bufsize),
|
||||
Outgoing: make(chan *Message, bufsize),
|
||||
Incoming: make(chan *Message, bufsize),
|
||||
Errors: make(chan error),
|
||||
Close: make(chan bool, bufsize),
|
||||
}
|
||||
@ -197,7 +197,7 @@ Loop:
|
||||
}
|
||||
|
||||
// wrap it for consumers.
|
||||
msg := Message{Peer: conn.Peer, Data: data}
|
||||
msg := &Message{Peer: conn.Peer, Data: data}
|
||||
s.Chan.Incoming <- msg
|
||||
}
|
||||
}
|
||||
|
@ -14,6 +14,9 @@ var Debug bool
|
||||
// ErrNotImplemented signifies a function has not been implemented yet.
|
||||
var ErrNotImplemented = fmt.Errorf("Error: not implemented yet.")
|
||||
|
||||
// ErrTimeout implies that a timeout has been triggered
|
||||
var ErrTimeout = fmt.Errorf("Error: Call timed out.")
|
||||
|
||||
// Key is a string representation of multihash for use with maps.
|
||||
type Key string
|
||||
|
||||
|
Reference in New Issue
Block a user