mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-02 03:28:25 +08:00
implement a mock dht for use in testing
This commit is contained in:
@ -16,6 +16,7 @@ import (
|
|||||||
strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
|
strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
|
||||||
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
|
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
|
||||||
peer "github.com/jbenet/go-ipfs/peer"
|
peer "github.com/jbenet/go-ipfs/peer"
|
||||||
|
mock "github.com/jbenet/go-ipfs/routing/mock"
|
||||||
util "github.com/jbenet/go-ipfs/util"
|
util "github.com/jbenet/go-ipfs/util"
|
||||||
testutil "github.com/jbenet/go-ipfs/util/testutil"
|
testutil "github.com/jbenet/go-ipfs/util/testutil"
|
||||||
)
|
)
|
||||||
@ -23,7 +24,7 @@ import (
|
|||||||
func TestGetBlockTimeout(t *testing.T) {
|
func TestGetBlockTimeout(t *testing.T) {
|
||||||
|
|
||||||
net := tn.VirtualNetwork()
|
net := tn.VirtualNetwork()
|
||||||
rs := tn.VirtualRoutingServer()
|
rs := mock.VirtualRoutingServer()
|
||||||
g := NewSessionGenerator(net, rs)
|
g := NewSessionGenerator(net, rs)
|
||||||
|
|
||||||
self := g.Next()
|
self := g.Next()
|
||||||
@ -40,7 +41,7 @@ func TestGetBlockTimeout(t *testing.T) {
|
|||||||
func TestProviderForKeyButNetworkCannotFind(t *testing.T) {
|
func TestProviderForKeyButNetworkCannotFind(t *testing.T) {
|
||||||
|
|
||||||
net := tn.VirtualNetwork()
|
net := tn.VirtualNetwork()
|
||||||
rs := tn.VirtualRoutingServer()
|
rs := mock.VirtualRoutingServer()
|
||||||
g := NewSessionGenerator(net, rs)
|
g := NewSessionGenerator(net, rs)
|
||||||
|
|
||||||
block := testutil.NewBlockOrFail(t, "block")
|
block := testutil.NewBlockOrFail(t, "block")
|
||||||
@ -61,7 +62,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) {
|
|||||||
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
|
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
|
||||||
|
|
||||||
net := tn.VirtualNetwork()
|
net := tn.VirtualNetwork()
|
||||||
rs := tn.VirtualRoutingServer()
|
rs := mock.VirtualRoutingServer()
|
||||||
block := testutil.NewBlockOrFail(t, "block")
|
block := testutil.NewBlockOrFail(t, "block")
|
||||||
g := NewSessionGenerator(net, rs)
|
g := NewSessionGenerator(net, rs)
|
||||||
|
|
||||||
@ -90,7 +91,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
|
|||||||
|
|
||||||
func TestSwarm(t *testing.T) {
|
func TestSwarm(t *testing.T) {
|
||||||
net := tn.VirtualNetwork()
|
net := tn.VirtualNetwork()
|
||||||
rs := tn.VirtualRoutingServer()
|
rs := mock.VirtualRoutingServer()
|
||||||
sg := NewSessionGenerator(net, rs)
|
sg := NewSessionGenerator(net, rs)
|
||||||
bg := NewBlockGenerator(t)
|
bg := NewBlockGenerator(t)
|
||||||
|
|
||||||
@ -151,7 +152,7 @@ func TestSendToWantingPeer(t *testing.T) {
|
|||||||
util.Debug = true
|
util.Debug = true
|
||||||
|
|
||||||
net := tn.VirtualNetwork()
|
net := tn.VirtualNetwork()
|
||||||
rs := tn.VirtualRoutingServer()
|
rs := mock.VirtualRoutingServer()
|
||||||
sg := NewSessionGenerator(net, rs)
|
sg := NewSessionGenerator(net, rs)
|
||||||
bg := NewBlockGenerator(t)
|
bg := NewBlockGenerator(t)
|
||||||
|
|
||||||
@ -237,7 +238,7 @@ func (bg *BlockGenerator) Blocks(n int) []*blocks.Block {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewSessionGenerator(
|
func NewSessionGenerator(
|
||||||
net tn.Network, rs tn.RoutingServer) SessionGenerator {
|
net tn.Network, rs mock.RoutingServer) SessionGenerator {
|
||||||
return SessionGenerator{
|
return SessionGenerator{
|
||||||
net: net,
|
net: net,
|
||||||
rs: rs,
|
rs: rs,
|
||||||
@ -248,7 +249,7 @@ func NewSessionGenerator(
|
|||||||
type SessionGenerator struct {
|
type SessionGenerator struct {
|
||||||
seq int
|
seq int
|
||||||
net tn.Network
|
net tn.Network
|
||||||
rs tn.RoutingServer
|
rs mock.RoutingServer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *SessionGenerator) Next() instance {
|
func (g *SessionGenerator) Next() instance {
|
||||||
@ -276,11 +277,12 @@ type instance struct {
|
|||||||
// NB: It's easy make mistakes by providing the same peer ID to two different
|
// NB: It's easy make mistakes by providing the same peer ID to two different
|
||||||
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
|
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
|
||||||
// just a much better idea.
|
// just a much better idea.
|
||||||
func session(net tn.Network, rs tn.RoutingServer, id peer.ID) instance {
|
func session(net tn.Network, rs mock.RoutingServer, id peer.ID) instance {
|
||||||
p := &peer.Peer{ID: id}
|
p := &peer.Peer{ID: id}
|
||||||
|
|
||||||
adapter := net.Adapter(p)
|
adapter := net.Adapter(p)
|
||||||
htc := rs.Client(p)
|
htc := mock.NewMockRouter(p, nil)
|
||||||
|
htc.SetRoutingServer(rs)
|
||||||
|
|
||||||
blockstore := bstore.NewBlockstore(ds.NewMapDatastore())
|
blockstore := bstore.NewBlockstore(ds.NewMapDatastore())
|
||||||
const alwaysSendToPeer = true
|
const alwaysSendToPeer = true
|
||||||
|
@ -1,97 +1 @@
|
|||||||
package bitswap
|
package bitswap
|
||||||
|
|
||||||
import (
|
|
||||||
"math/rand"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
|
||||||
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
|
|
||||||
peer "github.com/jbenet/go-ipfs/peer"
|
|
||||||
u "github.com/jbenet/go-ipfs/util"
|
|
||||||
)
|
|
||||||
|
|
||||||
type RoutingServer interface {
|
|
||||||
Announce(*peer.Peer, u.Key) error
|
|
||||||
|
|
||||||
Providers(u.Key) []*peer.Peer
|
|
||||||
|
|
||||||
// Returns a Routing instance configured to query this hash table
|
|
||||||
Client(*peer.Peer) bsnet.Routing
|
|
||||||
}
|
|
||||||
|
|
||||||
func VirtualRoutingServer() RoutingServer {
|
|
||||||
return &hashTable{
|
|
||||||
providers: make(map[u.Key]peer.Map),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type hashTable struct {
|
|
||||||
lock sync.RWMutex
|
|
||||||
providers map[u.Key]peer.Map
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rs *hashTable) Announce(p *peer.Peer, k u.Key) error {
|
|
||||||
rs.lock.Lock()
|
|
||||||
defer rs.lock.Unlock()
|
|
||||||
|
|
||||||
_, ok := rs.providers[k]
|
|
||||||
if !ok {
|
|
||||||
rs.providers[k] = make(peer.Map)
|
|
||||||
}
|
|
||||||
rs.providers[k][p.Key()] = p
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rs *hashTable) Providers(k u.Key) []*peer.Peer {
|
|
||||||
rs.lock.RLock()
|
|
||||||
defer rs.lock.RUnlock()
|
|
||||||
ret := make([]*peer.Peer, 0)
|
|
||||||
peerset, ok := rs.providers[k]
|
|
||||||
if !ok {
|
|
||||||
return ret
|
|
||||||
}
|
|
||||||
for _, peer := range peerset {
|
|
||||||
ret = append(ret, peer)
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := range ret {
|
|
||||||
j := rand.Intn(i + 1)
|
|
||||||
ret[i], ret[j] = ret[j], ret[i]
|
|
||||||
}
|
|
||||||
|
|
||||||
return ret
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rs *hashTable) Client(p *peer.Peer) bsnet.Routing {
|
|
||||||
return &routingClient{
|
|
||||||
peer: p,
|
|
||||||
hashTable: rs,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type routingClient struct {
|
|
||||||
peer *peer.Peer
|
|
||||||
hashTable RoutingServer
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *routingClient) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-chan *peer.Peer {
|
|
||||||
out := make(chan *peer.Peer)
|
|
||||||
go func() {
|
|
||||||
defer close(out)
|
|
||||||
for i, p := range a.hashTable.Providers(k) {
|
|
||||||
if max <= i {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case out <- p:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *routingClient) Provide(_ context.Context, key u.Key) error {
|
|
||||||
return a.hashTable.Announce(a.peer, key)
|
|
||||||
}
|
|
||||||
|
@ -5,19 +5,15 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||||
)
|
|
||||||
import (
|
|
||||||
"github.com/jbenet/go-ipfs/peer"
|
"github.com/jbenet/go-ipfs/peer"
|
||||||
|
mock "github.com/jbenet/go-ipfs/routing/mock"
|
||||||
u "github.com/jbenet/go-ipfs/util"
|
u "github.com/jbenet/go-ipfs/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestKeyNotFound(t *testing.T) {
|
func TestKeyNotFound(t *testing.T) {
|
||||||
|
|
||||||
rs := func() RoutingServer {
|
vrs := mock.VirtualRoutingServer()
|
||||||
// TODO fields
|
empty := vrs.Providers(u.Key("not there"))
|
||||||
return &hashTable{}
|
|
||||||
}()
|
|
||||||
empty := rs.Providers(u.Key("not there"))
|
|
||||||
if len(empty) != 0 {
|
if len(empty) != 0 {
|
||||||
t.Fatal("should be empty")
|
t.Fatal("should be empty")
|
||||||
}
|
}
|
||||||
@ -29,7 +25,7 @@ func TestSetAndGet(t *testing.T) {
|
|||||||
ID: pid,
|
ID: pid,
|
||||||
}
|
}
|
||||||
k := u.Key("42")
|
k := u.Key("42")
|
||||||
rs := VirtualRoutingServer()
|
rs := mock.VirtualRoutingServer()
|
||||||
err := rs.Announce(p, k)
|
err := rs.Announce(p, k)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -50,8 +46,9 @@ func TestClientFindProviders(t *testing.T) {
|
|||||||
peer := &peer.Peer{
|
peer := &peer.Peer{
|
||||||
ID: []byte("42"),
|
ID: []byte("42"),
|
||||||
}
|
}
|
||||||
rs := VirtualRoutingServer()
|
rs := mock.VirtualRoutingServer()
|
||||||
client := rs.Client(peer)
|
client := mock.NewMockRouter(peer, nil)
|
||||||
|
client.SetRoutingServer(rs)
|
||||||
k := u.Key("hello")
|
k := u.Key("hello")
|
||||||
err := client.Provide(context.Background(), k)
|
err := client.Provide(context.Background(), k)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -83,7 +80,7 @@ func TestClientFindProviders(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestClientOverMax(t *testing.T) {
|
func TestClientOverMax(t *testing.T) {
|
||||||
rs := VirtualRoutingServer()
|
rs := mock.VirtualRoutingServer()
|
||||||
k := u.Key("hello")
|
k := u.Key("hello")
|
||||||
numProvidersForHelloKey := 100
|
numProvidersForHelloKey := 100
|
||||||
for i := 0; i < numProvidersForHelloKey; i++ {
|
for i := 0; i < numProvidersForHelloKey; i++ {
|
||||||
@ -102,7 +99,8 @@ func TestClientOverMax(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
max := 10
|
max := 10
|
||||||
client := rs.Client(&peer.Peer{ID: []byte("TODO")})
|
client := mock.NewMockRouter(&peer.Peer{ID: []byte("TODO")}, nil)
|
||||||
|
client.SetRoutingServer(rs)
|
||||||
providersFromClient := client.FindProvidersAsync(context.Background(), k, max)
|
providersFromClient := client.FindProvidersAsync(context.Background(), k, max)
|
||||||
i := 0
|
i := 0
|
||||||
for _ = range providersFromClient {
|
for _ = range providersFromClient {
|
||||||
@ -115,7 +113,7 @@ func TestClientOverMax(t *testing.T) {
|
|||||||
|
|
||||||
// TODO does dht ensure won't receive self as a provider? probably not.
|
// TODO does dht ensure won't receive self as a provider? probably not.
|
||||||
func TestCanceledContext(t *testing.T) {
|
func TestCanceledContext(t *testing.T) {
|
||||||
rs := VirtualRoutingServer()
|
rs := mock.VirtualRoutingServer()
|
||||||
k := u.Key("hello")
|
k := u.Key("hello")
|
||||||
|
|
||||||
t.Log("async'ly announce infinite stream of providers for key")
|
t.Log("async'ly announce infinite stream of providers for key")
|
||||||
@ -133,7 +131,9 @@ func TestCanceledContext(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
client := rs.Client(&peer.Peer{ID: []byte("peer id doesn't matter")})
|
local := &peer.Peer{ID: []byte("peer id doesn't matter")}
|
||||||
|
client := mock.NewMockRouter(local, nil)
|
||||||
|
client.SetRoutingServer(rs)
|
||||||
|
|
||||||
t.Log("warning: max is finite so this test is non-deterministic")
|
t.Log("warning: max is finite so this test is non-deterministic")
|
||||||
t.Log("context cancellation could simply take lower priority")
|
t.Log("context cancellation could simply take lower priority")
|
||||||
|
130
routing/mock/routing.go
Normal file
130
routing/mock/routing.go
Normal file
@ -0,0 +1,130 @@
|
|||||||
|
package mockrouter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"math/rand"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||||
|
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
|
||||||
|
peer "github.com/jbenet/go-ipfs/peer"
|
||||||
|
routing "github.com/jbenet/go-ipfs/routing"
|
||||||
|
u "github.com/jbenet/go-ipfs/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ routing.IpfsRouting = &MockRouter{}
|
||||||
|
|
||||||
|
type MockRouter struct {
|
||||||
|
datastore ds.Datastore
|
||||||
|
hashTable RoutingServer
|
||||||
|
peer *peer.Peer
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMockRouter(local *peer.Peer, dstore ds.Datastore) *MockRouter {
|
||||||
|
return &MockRouter{
|
||||||
|
datastore: dstore,
|
||||||
|
peer: local,
|
||||||
|
hashTable: VirtualRoutingServer(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mr *MockRouter) SetRoutingServer(rs RoutingServer) {
|
||||||
|
mr.hashTable = rs
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mr *MockRouter) PutValue(ctx context.Context, key u.Key, val []byte) error {
|
||||||
|
return mr.datastore.Put(ds.NewKey(string(key)), val)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mr *MockRouter) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
|
||||||
|
v, err := mr.datastore.Get(ds.NewKey(string(key)))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
data, ok := v.([]byte)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("could not cast value from datastore")
|
||||||
|
}
|
||||||
|
|
||||||
|
return data, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mr *MockRouter) FindProviders(ctx context.Context, key u.Key) ([]*peer.Peer, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mr *MockRouter) FindPeer(ctx context.Context, pid peer.ID) (*peer.Peer, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mr *MockRouter) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-chan *peer.Peer {
|
||||||
|
out := make(chan *peer.Peer)
|
||||||
|
go func() {
|
||||||
|
defer close(out)
|
||||||
|
for i, p := range mr.hashTable.Providers(k) {
|
||||||
|
if max <= i {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case out <- p:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mr *MockRouter) Provide(_ context.Context, key u.Key) error {
|
||||||
|
return mr.hashTable.Announce(mr.peer, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
type RoutingServer interface {
|
||||||
|
Announce(*peer.Peer, u.Key) error
|
||||||
|
|
||||||
|
Providers(u.Key) []*peer.Peer
|
||||||
|
}
|
||||||
|
|
||||||
|
func VirtualRoutingServer() RoutingServer {
|
||||||
|
return &hashTable{
|
||||||
|
providers: make(map[u.Key]peer.Map),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type hashTable struct {
|
||||||
|
lock sync.RWMutex
|
||||||
|
providers map[u.Key]peer.Map
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rs *hashTable) Announce(p *peer.Peer, k u.Key) error {
|
||||||
|
rs.lock.Lock()
|
||||||
|
defer rs.lock.Unlock()
|
||||||
|
|
||||||
|
_, ok := rs.providers[k]
|
||||||
|
if !ok {
|
||||||
|
rs.providers[k] = make(peer.Map)
|
||||||
|
}
|
||||||
|
rs.providers[k][p.Key()] = p
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rs *hashTable) Providers(k u.Key) []*peer.Peer {
|
||||||
|
rs.lock.RLock()
|
||||||
|
defer rs.lock.RUnlock()
|
||||||
|
ret := make([]*peer.Peer, 0)
|
||||||
|
peerset, ok := rs.providers[k]
|
||||||
|
if !ok {
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
for _, peer := range peerset {
|
||||||
|
ret = append(ret, peer)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range ret {
|
||||||
|
j := rand.Intn(i + 1)
|
||||||
|
ret[i], ret[j] = ret[j], ret[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret
|
||||||
|
}
|
Reference in New Issue
Block a user