1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-09-11 15:15:58 +08:00

add in message type routing to the swarm object. tired, needs cleanup.

This commit is contained in:
Jeromy
2014-08-19 22:05:49 -07:00
committed by Juan Batiz-Benet
parent 060930c486
commit afdac2ca3a
8 changed files with 167 additions and 60 deletions

View File

@ -100,10 +100,11 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
func (dht *IpfsDHT) handleMessages() { func (dht *IpfsDHT) handleMessages() {
u.DOut("Begin message handling routine\n") u.DOut("Begin message handling routine\n")
ch := dht.network.GetChan() errs := dht.network.GetErrChan()
dhtmes := dht.network.GetChannel(swarm.PBWrapper_DHT_MESSAGE)
for { for {
select { select {
case mes, ok := <-ch.Incoming: case mes, ok := <-dhtmes:
if !ok { if !ok {
u.DOut("handleMessages closing, bad recv on incoming\n") u.DOut("handleMessages closing, bad recv on incoming\n")
return return
@ -147,7 +148,7 @@ func (dht *IpfsDHT) handleMessages() {
u.PErr("Recieved invalid message type") u.PErr("Recieved invalid message type")
} }
case err := <-ch.Errors: case err := <-errs:
u.PErr("dht err: %s\n", err) u.PErr("dht err: %s\n", err)
case <-dht.shutdown: case <-dht.shutdown:
return return

View File

@ -132,8 +132,8 @@ func TestValueGetSet(t *testing.T) {
dhtA.Start() dhtA.Start()
dhtB.Start() dhtB.Start()
errsa := dhtA.network.GetChan().Errors errsa := dhtA.network.GetErrChan()
errsb := dhtB.network.GetChan().Errors errsb := dhtB.network.GetErrChan()
go func() { go func() {
select { select {
case err := <-errsa: case err := <-errsa:

View File

@ -66,8 +66,12 @@ func (f *fauxNet) Send(mes *swarm.Message) {
f.Chan.Outgoing <- mes f.Chan.Outgoing <- mes
} }
func (f *fauxNet) GetChan() *swarm.Chan { func (f *fauxNet) GetErrChan() chan error {
return f.Chan return f.Chan.Errors
}
func (f *fauxNet) GetChannel(t swarm.PBWrapper_MessageType) chan *swarm.Message {
return f.Chan.Incoming
} }
func (f *fauxNet) Connect(addr *ma.Multiaddr) (*peer.Peer, error) { func (f *fauxNet) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
@ -167,7 +171,6 @@ func _randPeer() *peer.Peer {
} }
func TestNotFound(t *testing.T) { func TestNotFound(t *testing.T) {
u.Debug = true
fn := newFauxNet() fn := newFauxNet()
fn.Listen() fn.Listen()
@ -225,3 +228,64 @@ func TestNotFound(t *testing.T) {
} }
t.Fatal("Expected to recieve an error.") t.Fatal("Expected to recieve an error.")
} }
// If less than K nodes are in the entire network, it should fail when we make
// a GET rpc and nobody has the value
func TestLessThanKResponses(t *testing.T) {
u.Debug = false
fn := newFauxNet()
fn.Listen()
local := new(peer.Peer)
local.ID = peer.ID("test_peer")
d := NewDHT(local, fn)
d.Start()
var ps []*peer.Peer
for i := 0; i < 5; i++ {
ps = append(ps, _randPeer())
d.Update(ps[i])
}
other := _randPeer()
// Reply with random peers to every message
fn.AddHandler(func(mes *swarm.Message) *swarm.Message {
t.Log("Handling message...")
pmes := new(PBDHTMessage)
err := proto.Unmarshal(mes.Data, pmes)
if err != nil {
t.Fatal(err)
}
switch pmes.GetType() {
case PBDHTMessage_GET_VALUE:
resp := Message{
Type: pmes.GetType(),
ID: pmes.GetId(),
Response: true,
Success: false,
Peers: []*peer.Peer{other},
}
return swarm.NewMessage(mes.Peer, resp.ToProtobuf())
default:
panic("Shouldnt recieve this.")
}
})
_, err := d.GetValue(u.Key("hello"), time.Second*30)
if err != nil {
switch err {
case u.ErrNotFound:
//Success!
return
case u.ErrTimeout:
t.Fatal("Should not have gotten timeout!")
default:
t.Fatalf("Got unexpected error: %s", err)
}
}
t.Fatal("Expected to recieve an error.")
}

View File

@ -3,8 +3,8 @@ package dht
import ( import (
"time" "time"
u "github.com/jbenet/go-ipfs/util"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
) )
type ProviderManager struct { type ProviderManager struct {

View File

@ -164,7 +164,8 @@ func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
case p := <-npeerChan: case p := <-npeerChan:
count++ count++
if count >= KValue { if count >= KValue {
break errChan <- u.ErrNotFound
return
} }
c.Increment() c.Increment()
@ -172,16 +173,15 @@ func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
default: default:
if c.Size() == 0 { if c.Size() == 0 {
errChan <- u.ErrNotFound errChan <- u.ErrNotFound
return
} }
} }
} }
}() }()
process := func() { process := func() {
for { for p := range procPeer {
select { if p == nil {
case p, ok := <-procPeer:
if !ok || p == nil {
c.Decrement() c.Decrement()
return return
} }
@ -207,7 +207,6 @@ func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
c.Decrement() c.Decrement()
} }
} }
}
for i := 0; i < AlphaValue; i++ { for i := 0; i < AlphaValue; i++ {
go process() go process()

View File

@ -40,8 +40,6 @@ func Dial(network string, peer *peer.Peer) (*Conn, error) {
return nil, err return nil, err
} }
fmt.Printf("Making connection to: %s\n", host)
nconn, err := net.Dial(network, host) nconn, err := net.Dial(network, host)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -14,7 +14,8 @@ type Network interface {
Listen() error Listen() error
ConnectNew(*ma.Multiaddr) (*peer.Peer, error) ConnectNew(*ma.Multiaddr) (*peer.Peer, error)
GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error) GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error)
GetChan() *Chan GetErrChan() chan error
GetChannel(PBWrapper_MessageType) chan *Message
Close() Close()
Drop(*peer.Peer) error Drop(*peer.Peer) error
} }

View File

@ -84,6 +84,10 @@ type Swarm struct {
conns ConnMap conns ConnMap
connsLock sync.RWMutex connsLock sync.RWMutex
filterChans map[PBWrapper_MessageType]chan *Message
toFilter chan *Message
newFilters chan *newFilterInfo
local *peer.Peer local *peer.Peer
listeners []net.Listener listeners []net.Listener
} }
@ -94,7 +98,11 @@ func NewSwarm(local *peer.Peer) *Swarm {
Chan: NewChan(10), Chan: NewChan(10),
conns: ConnMap{}, conns: ConnMap{},
local: local, local: local,
filterChans: make(map[PBWrapper_MessageType]chan *Message),
toFilter: make(chan *Message, 32),
newFilters: make(chan *newFilterInfo),
} }
go s.routeMessages()
go s.fanOut() go s.fanOut()
return s return s
} }
@ -299,15 +307,8 @@ func (s *Swarm) fanIn(conn *Conn) {
goto out goto out
} }
wrapper, err := Unwrap(data) msg := &Message{Peer: conn.Peer, Data: data}
if err != nil { s.toFilter <- msg
s.Error(err)
continue
}
// wrap it for consumers.
msg := &Message{Peer: conn.Peer, Data: wrapper.GetMessage()}
s.Chan.Incoming <- msg
} }
} }
out: out:
@ -317,6 +318,39 @@ out:
s.connsLock.Unlock() s.connsLock.Unlock()
} }
type newFilterInfo struct {
Type PBWrapper_MessageType
resp chan chan *Message
}
func (s *Swarm) routeMessages() {
for {
select {
case mes, ok := <-s.toFilter:
if !ok {
return
}
wrapper, err := Unwrap(mes.Data)
if err != nil {
u.PErr("error in route messages: %s\n", err)
}
ch, ok := s.filterChans[PBWrapper_MessageType(wrapper.GetType())]
if !ok {
u.PErr("Received message with invalid type: %d\n", wrapper.GetType())
continue
}
mes.Data = wrapper.GetMessage()
ch <- mes
case gchan := <-s.newFilters:
nch := make(chan *Message)
s.filterChans[gchan.Type] = nch
gchan.resp <- nch
}
}
}
func (s *Swarm) Find(key u.Key) *peer.Peer { func (s *Swarm) Find(key u.Key) *peer.Peer {
s.connsLock.RLock() s.connsLock.RLock()
defer s.connsLock.RUnlock() defer s.connsLock.RUnlock()
@ -414,8 +448,8 @@ func (s *Swarm) Error(e error) {
s.Chan.Errors <- e s.Chan.Errors <- e
} }
func (s *Swarm) GetChan() *Chan { func (s *Swarm) GetErrChan() chan error {
return s.Chan return s.Chan.Errors
} }
func Wrap(data []byte, typ PBWrapper_MessageType) ([]byte, error) { func Wrap(data []byte, typ PBWrapper_MessageType) ([]byte, error) {
@ -439,5 +473,15 @@ func Unwrap(data []byte) (*PBWrapper, error) {
return mes, nil return mes, nil
} }
func (s *Swarm) GetChannel(typ PBWrapper_MessageType) chan *Message {
nfi := &newFilterInfo{
Type: typ,
resp: make(chan chan *Message),
}
s.newFilters <- nfi
return <-nfi.resp
}
// Temporary to ensure that the Swarm always matches the Network interface as we are changing it // Temporary to ensure that the Swarm always matches the Network interface as we are changing it
var _ Network = &Swarm{} var _ Network = &Swarm{}