mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-29 09:34:03 +08:00
add some more tests in
This commit is contained in:
@ -1,6 +1,7 @@
|
|||||||
package dht
|
package dht
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"code.google.com/p/goprotobuf/proto"
|
||||||
peer "github.com/jbenet/go-ipfs/peer"
|
peer "github.com/jbenet/go-ipfs/peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -17,12 +18,16 @@ type Message struct {
|
|||||||
|
|
||||||
func peerInfo(p *peer.Peer) *PBDHTMessage_PBPeer {
|
func peerInfo(p *peer.Peer) *PBDHTMessage_PBPeer {
|
||||||
pbp := new(PBDHTMessage_PBPeer)
|
pbp := new(PBDHTMessage_PBPeer)
|
||||||
addr, err := p.Addresses[0].String()
|
if len(p.Addresses) == 0 || p.Addresses[0] == nil {
|
||||||
if err != nil {
|
pbp.Addr = proto.String("")
|
||||||
//Temp: what situations could cause this?
|
} else {
|
||||||
panic(err)
|
addr, err := p.Addresses[0].String()
|
||||||
|
if err != nil {
|
||||||
|
//Temp: what situations could cause this?
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
pbp.Addr = &addr
|
||||||
}
|
}
|
||||||
pbp.Addr = &addr
|
|
||||||
pid := string(p.ID)
|
pid := string(p.ID)
|
||||||
pbp.Id = &pid
|
pbp.Id = &pid
|
||||||
return pbp
|
return pbp
|
||||||
|
@ -87,7 +87,7 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
|
|||||||
// NOTE: this should be done better...
|
// NOTE: this should be done better...
|
||||||
err = dht.Ping(npeer, time.Second*2)
|
err = dht.Ping(npeer, time.Second*2)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.New("failed to ping newly connected peer")
|
return nil, errors.New("failed to ping newly connected peer\n")
|
||||||
}
|
}
|
||||||
|
|
||||||
dht.Update(npeer)
|
dht.Update(npeer)
|
||||||
@ -98,14 +98,14 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
|
|||||||
// Read in all messages from swarm and handle them appropriately
|
// Read in all messages from swarm and handle them appropriately
|
||||||
// NOTE: this function is just a quick sketch
|
// NOTE: this function is just a quick sketch
|
||||||
func (dht *IpfsDHT) handleMessages() {
|
func (dht *IpfsDHT) handleMessages() {
|
||||||
u.DOut("Begin message handling routine")
|
u.DOut("Begin message handling routine\n")
|
||||||
|
|
||||||
ch := dht.network.GetChan()
|
ch := dht.network.GetChan()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case mes, ok := <-ch.Incoming:
|
case mes, ok := <-ch.Incoming:
|
||||||
if !ok {
|
if !ok {
|
||||||
u.DOut("handleMessages closing, bad recv on incoming")
|
u.DOut("handleMessages closing, bad recv on incoming\n")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
pmes := new(PBDHTMessage)
|
pmes := new(PBDHTMessage)
|
||||||
@ -178,7 +178,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
|
|||||||
}
|
}
|
||||||
iVal, err := dht.datastore.Get(dskey)
|
iVal, err := dht.datastore.Get(dskey)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
u.DOut("handleGetValue success!")
|
u.DOut("handleGetValue success!\n")
|
||||||
resp.Success = true
|
resp.Success = true
|
||||||
resp.Value = iVal.([]byte)
|
resp.Value = iVal.([]byte)
|
||||||
} else if err == ds.ErrNotFound {
|
} else if err == ds.ErrNotFound {
|
||||||
@ -195,7 +195,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
|
|||||||
level := 0
|
level := 0
|
||||||
if len(pmes.GetValue()) < 1 {
|
if len(pmes.GetValue()) < 1 {
|
||||||
// TODO: maybe return an error? Defaulting isnt a good idea IMO
|
// TODO: maybe return an error? Defaulting isnt a good idea IMO
|
||||||
u.PErr("handleGetValue: no routing level specified, assuming 0")
|
u.PErr("handleGetValue: no routing level specified, assuming 0\n")
|
||||||
} else {
|
} else {
|
||||||
level = int(pmes.GetValue()[0]) // Using value field to specify cluster level
|
level = int(pmes.GetValue()[0]) // Using value field to specify cluster level
|
||||||
}
|
}
|
||||||
@ -204,14 +204,14 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
|
|||||||
closer := dht.routingTables[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
|
closer := dht.routingTables[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
|
||||||
|
|
||||||
if closer.ID.Equal(dht.self.ID) {
|
if closer.ID.Equal(dht.self.ID) {
|
||||||
u.DOut("Attempted to return self! this shouldnt happen...")
|
u.DOut("Attempted to return self! this shouldnt happen...\n")
|
||||||
resp.Peers = nil
|
resp.Peers = nil
|
||||||
goto out
|
goto out
|
||||||
}
|
}
|
||||||
// If this peer is closer than the one from the table, return nil
|
// If this peer is closer than the one from the table, return nil
|
||||||
if kb.Closer(dht.self.ID, closer.ID, u.Key(pmes.GetKey())) {
|
if kb.Closer(dht.self.ID, closer.ID, u.Key(pmes.GetKey())) {
|
||||||
resp.Peers = nil
|
resp.Peers = nil
|
||||||
u.DOut("handleGetValue could not find a closer node than myself.")
|
u.DOut("handleGetValue could not find a closer node than myself.\n")
|
||||||
} else {
|
} else {
|
||||||
u.DOut("handleGetValue returning a closer peer: '%s'\n", closer.ID.Pretty())
|
u.DOut("handleGetValue returning a closer peer: '%s'\n", closer.ID.Pretty())
|
||||||
resp.Peers = []*peer.Peer{closer}
|
resp.Peers = []*peer.Peer{closer}
|
||||||
@ -263,12 +263,12 @@ func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) {
|
|||||||
u.DOut("handleFindPeer: searching for '%s'\n", peer.ID(pmes.GetKey()).Pretty())
|
u.DOut("handleFindPeer: searching for '%s'\n", peer.ID(pmes.GetKey()).Pretty())
|
||||||
closest := dht.routingTables[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
|
closest := dht.routingTables[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
|
||||||
if closest == nil {
|
if closest == nil {
|
||||||
u.PErr("handleFindPeer: could not find anything.")
|
u.PErr("handleFindPeer: could not find anything.\n")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(closest.Addresses) == 0 {
|
if len(closest.Addresses) == 0 {
|
||||||
u.PErr("handleFindPeer: no addresses for connected peer...")
|
u.PErr("handleFindPeer: no addresses for connected peer...\n")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -438,7 +438,7 @@ func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duratio
|
|||||||
return nil, u.ErrTimeout
|
return nil, u.ErrTimeout
|
||||||
case resp, ok := <-responseChan:
|
case resp, ok := <-responseChan:
|
||||||
if !ok {
|
if !ok {
|
||||||
u.PErr("response channel closed before timeout, please investigate.")
|
u.PErr("response channel closed before timeout, please investigate.\n")
|
||||||
return nil, u.ErrTimeout
|
return nil, u.ErrTimeout
|
||||||
}
|
}
|
||||||
roundtrip := time.Since(t)
|
roundtrip := time.Since(t)
|
||||||
@ -587,7 +587,7 @@ func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, time
|
|||||||
dht.listener.Unlisten(pmes.ID)
|
dht.listener.Unlisten(pmes.ID)
|
||||||
return nil, u.ErrTimeout
|
return nil, u.ErrTimeout
|
||||||
case resp := <-listenChan:
|
case resp := <-listenChan:
|
||||||
u.DOut("FindProviders: got response.")
|
u.DOut("FindProviders: got response.\n")
|
||||||
pmesOut := new(PBDHTMessage)
|
pmesOut := new(PBDHTMessage)
|
||||||
err := proto.Unmarshal(resp.Data, pmesOut)
|
err := proto.Unmarshal(resp.Data, pmesOut)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -3,6 +3,8 @@ package dht
|
|||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
crand "crypto/rand"
|
||||||
|
|
||||||
"code.google.com/p/goprotobuf/proto"
|
"code.google.com/p/goprotobuf/proto"
|
||||||
|
|
||||||
peer "github.com/jbenet/go-ipfs/peer"
|
peer "github.com/jbenet/go-ipfs/peer"
|
||||||
@ -72,6 +74,10 @@ func (f *fauxNet) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *fauxNet) GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error) {
|
||||||
|
return &peer.Peer{ID: id, Addresses: []*ma.Multiaddr{addr}}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func TestGetFailures(t *testing.T) {
|
func TestGetFailures(t *testing.T) {
|
||||||
fn := newFauxNet()
|
fn := newFauxNet()
|
||||||
fn.Listen()
|
fn.Listen()
|
||||||
@ -150,3 +156,73 @@ func TestGetFailures(t *testing.T) {
|
|||||||
|
|
||||||
<-success
|
<-success
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Maybe put these in some sort of "ipfs_testutil" package
|
||||||
|
func _randPeer() *peer.Peer {
|
||||||
|
p := new(peer.Peer)
|
||||||
|
p.ID = make(peer.ID, 16)
|
||||||
|
p.Addresses = []*ma.Multiaddr{nil}
|
||||||
|
crand.Read(p.ID)
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNotFound(t *testing.T) {
|
||||||
|
u.Debug = true
|
||||||
|
fn := newFauxNet()
|
||||||
|
fn.Listen()
|
||||||
|
|
||||||
|
local := new(peer.Peer)
|
||||||
|
local.ID = peer.ID("test_peer")
|
||||||
|
|
||||||
|
d := NewDHT(local, fn)
|
||||||
|
d.Start()
|
||||||
|
|
||||||
|
var ps []*peer.Peer
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
ps = append(ps, _randPeer())
|
||||||
|
d.Update(ps[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reply with random peers to every message
|
||||||
|
fn.AddHandler(func(mes *swarm.Message) *swarm.Message {
|
||||||
|
t.Log("Handling message...")
|
||||||
|
pmes := new(PBDHTMessage)
|
||||||
|
err := proto.Unmarshal(mes.Data, pmes)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch pmes.GetType() {
|
||||||
|
case PBDHTMessage_GET_VALUE:
|
||||||
|
resp := Message{
|
||||||
|
Type: pmes.GetType(),
|
||||||
|
ID: pmes.GetId(),
|
||||||
|
Response: true,
|
||||||
|
Success: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 7; i++ {
|
||||||
|
resp.Peers = append(resp.Peers, _randPeer())
|
||||||
|
}
|
||||||
|
return swarm.NewMessage(mes.Peer, resp.ToProtobuf())
|
||||||
|
default:
|
||||||
|
panic("Shouldnt recieve this.")
|
||||||
|
}
|
||||||
|
|
||||||
|
})
|
||||||
|
|
||||||
|
_, err := d.GetValue(u.Key("hello"), time.Second*30)
|
||||||
|
if err != nil {
|
||||||
|
switch err {
|
||||||
|
case u.ErrNotFound:
|
||||||
|
t.Fail()
|
||||||
|
//Success!
|
||||||
|
return
|
||||||
|
case u.ErrTimeout:
|
||||||
|
t.Fatal("Should not have gotten timeout!")
|
||||||
|
default:
|
||||||
|
t.Fatalf("Got unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
t.Fatal("Expected to recieve an error.")
|
||||||
|
}
|
||||||
|
33
routing/dht/mes_listener_test.go
Normal file
33
routing/dht/mes_listener_test.go
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
package dht
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jbenet/go-ipfs/peer"
|
||||||
|
"github.com/jbenet/go-ipfs/swarm"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Ensure that the Message Listeners basic functionality works
|
||||||
|
func TestMesListenerBasic(t *testing.T) {
|
||||||
|
ml := newMesListener()
|
||||||
|
a := GenerateMessageID()
|
||||||
|
resp := ml.Listen(a, 1, time.Minute)
|
||||||
|
|
||||||
|
pmes := new(swarm.PBWrapper)
|
||||||
|
pmes.Message = []byte("Hello")
|
||||||
|
pmes.Type = new(swarm.PBWrapper_MessageType)
|
||||||
|
mes := swarm.NewMessage(new(peer.Peer), pmes)
|
||||||
|
|
||||||
|
go ml.Respond(a, mes)
|
||||||
|
|
||||||
|
del := time.After(time.Millisecond * 10)
|
||||||
|
select {
|
||||||
|
case get := <-resp:
|
||||||
|
if string(get.Data) != string(mes.Data) {
|
||||||
|
t.Fatal("Something got really messed up")
|
||||||
|
}
|
||||||
|
case <-del:
|
||||||
|
t.Fatal("Waiting on message response timed out.")
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user