1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-27 16:07:42 +08:00

refactor(mockrouting) misc

License: MIT
Signed-off-by: Brian Tiger Chow <brian@perfmode.com>
This commit is contained in:
Brian Tiger Chow
2014-12-12 22:56:36 -08:00
parent 193004a061
commit 3ecdec985f
10 changed files with 228 additions and 197 deletions

View File

@ -5,14 +5,14 @@ import (
bitswap "github.com/jbenet/go-ipfs/exchange/bitswap" bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
mock "github.com/jbenet/go-ipfs/routing/mock" mockrouting "github.com/jbenet/go-ipfs/routing/mock"
delay "github.com/jbenet/go-ipfs/util/delay" delay "github.com/jbenet/go-ipfs/util/delay"
) )
// Mocks returns |n| connected mock Blockservices // Mocks returns |n| connected mock Blockservices
func Mocks(t *testing.T, n int) []*BlockService { func Mocks(t *testing.T, n int) []*BlockService {
net := tn.VirtualNetwork(delay.Fixed(0)) net := tn.VirtualNetwork(delay.Fixed(0))
rs := mock.VirtualRoutingServer() rs := mockrouting.NewServer()
sg := bitswap.NewSessionGenerator(net, rs) sg := bitswap.NewSessionGenerator(net, rs)
instances := sg.Instances(n) instances := sg.Instances(n)

View File

@ -42,7 +42,7 @@ func NewMockNode() (*IpfsNode, error) {
nd.Datastore = ds2.CloserWrap(syncds.MutexWrap(dstore)) nd.Datastore = ds2.CloserWrap(syncds.MutexWrap(dstore))
// Routing // Routing
dht := mdht.NewMockRouter(nd.Identity, nd.Datastore) dht := mdht.NewServer().ClientWithDatastore(nd.Identity, nd.Datastore)
nd.Routing = dht nd.Routing = dht
// Bitswap // Bitswap

View File

@ -10,18 +10,20 @@ import (
blocks "github.com/jbenet/go-ipfs/blocks" blocks "github.com/jbenet/go-ipfs/blocks"
blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil" blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
mock "github.com/jbenet/go-ipfs/routing/mock" mockrouting "github.com/jbenet/go-ipfs/routing/mock"
delay "github.com/jbenet/go-ipfs/util/delay" delay "github.com/jbenet/go-ipfs/util/delay"
testutil "github.com/jbenet/go-ipfs/util/testutil" testutil "github.com/jbenet/go-ipfs/util/testutil"
) )
// FIXME the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
const kNetworkDelay = 0 * time.Millisecond const kNetworkDelay = 0 * time.Millisecond
func TestClose(t *testing.T) { func TestClose(t *testing.T) {
// TODO // TODO
t.Skip("TODO Bitswap's Close implementation is a WIP") t.Skip("TODO Bitswap's Close implementation is a WIP")
vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
rout := mock.VirtualRoutingServer() rout := mockrouting.NewServer()
sesgen := NewSessionGenerator(vnet, rout) sesgen := NewSessionGenerator(vnet, rout)
bgen := blocksutil.NewBlockGenerator() bgen := blocksutil.NewBlockGenerator()
@ -35,7 +37,7 @@ func TestClose(t *testing.T) {
func TestGetBlockTimeout(t *testing.T) { func TestGetBlockTimeout(t *testing.T) {
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
rs := mock.VirtualRoutingServer() rs := mockrouting.NewServer()
g := NewSessionGenerator(net, rs) g := NewSessionGenerator(net, rs)
self := g.Next() self := g.Next()
@ -52,11 +54,11 @@ func TestGetBlockTimeout(t *testing.T) {
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { func TestProviderForKeyButNetworkCannotFind(t *testing.T) {
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
rs := mock.VirtualRoutingServer() rs := mockrouting.NewServer()
g := NewSessionGenerator(net, rs) g := NewSessionGenerator(net, rs)
block := blocks.NewBlock([]byte("block")) block := blocks.NewBlock([]byte("block"))
rs.Announce(testutil.NewPeerWithIDString("testing"), block.Key()) // but not on network rs.Client(testutil.NewPeerWithIDString("testing")).Provide(context.Background(), block.Key()) // but not on network
solo := g.Next() solo := g.Next()
@ -73,7 +75,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) {
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
rs := mock.VirtualRoutingServer() rs := mockrouting.NewServer()
block := blocks.NewBlock([]byte("block")) block := blocks.NewBlock([]byte("block"))
g := NewSessionGenerator(net, rs) g := NewSessionGenerator(net, rs)
@ -125,7 +127,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
t.SkipNow() t.SkipNow()
} }
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
rs := mock.VirtualRoutingServer() rs := mockrouting.NewServer()
sg := NewSessionGenerator(net, rs) sg := NewSessionGenerator(net, rs)
bg := blocksutil.NewBlockGenerator() bg := blocksutil.NewBlockGenerator()
@ -140,7 +142,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
for _, b := range blocks { for _, b := range blocks {
first.Blockstore().Put(b) first.Blockstore().Put(b)
first.Exchange.HasBlock(context.Background(), b) first.Exchange.HasBlock(context.Background(), b)
rs.Announce(first.Peer, b.Key()) rs.Client(first.Peer).Provide(context.Background(), b.Key())
} }
t.Log("Distribute!") t.Log("Distribute!")
@ -185,7 +187,7 @@ func TestSendToWantingPeer(t *testing.T) {
} }
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
rs := mock.VirtualRoutingServer() rs := mockrouting.NewServer()
sg := NewSessionGenerator(net, rs) sg := NewSessionGenerator(net, rs)
bg := blocksutil.NewBlockGenerator() bg := blocksutil.NewBlockGenerator()

View File

@ -10,13 +10,13 @@ import (
exchange "github.com/jbenet/go-ipfs/exchange" exchange "github.com/jbenet/go-ipfs/exchange"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
mock "github.com/jbenet/go-ipfs/routing/mock" mockrouting "github.com/jbenet/go-ipfs/routing/mock"
datastore2 "github.com/jbenet/go-ipfs/util/datastore2" datastore2 "github.com/jbenet/go-ipfs/util/datastore2"
delay "github.com/jbenet/go-ipfs/util/delay" delay "github.com/jbenet/go-ipfs/util/delay"
) )
func NewSessionGenerator( func NewSessionGenerator(
net tn.Network, rs mock.RoutingServer) SessionGenerator { net tn.Network, rs mockrouting.Server) SessionGenerator {
return SessionGenerator{ return SessionGenerator{
net: net, net: net,
rs: rs, rs: rs,
@ -28,7 +28,7 @@ func NewSessionGenerator(
type SessionGenerator struct { type SessionGenerator struct {
seq int seq int
net tn.Network net tn.Network
rs mock.RoutingServer rs mockrouting.Server
ps peer.Peerstore ps peer.Peerstore
} }
@ -67,7 +67,7 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
// NB: It's easy make mistakes by providing the same peer ID to two different // NB: It's easy make mistakes by providing the same peer ID to two different
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's // sessions. To safeguard, use the SessionGenerator to generate sessions. It's
// just a much better idea. // just a much better idea.
func session(net tn.Network, rs mock.RoutingServer, ps peer.Peerstore, id peer.ID) Instance { func session(net tn.Network, rs mockrouting.Server, ps peer.Peerstore, id peer.ID) Instance {
p := ps.WithID(id) p := ps.WithID(id)
adapter := net.Adapter(p) adapter := net.Adapter(p)

View File

@ -3,17 +3,15 @@ package namesys
import ( import (
"testing" "testing"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ci "github.com/jbenet/go-ipfs/crypto" ci "github.com/jbenet/go-ipfs/crypto"
mock "github.com/jbenet/go-ipfs/routing/mock" mockrouting "github.com/jbenet/go-ipfs/routing/mock"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
testutil "github.com/jbenet/go-ipfs/util/testutil" testutil "github.com/jbenet/go-ipfs/util/testutil"
) )
func TestRoutingResolve(t *testing.T) { func TestRoutingResolve(t *testing.T) {
local := testutil.NewPeerWithIDString("testID") local := testutil.NewPeerWithIDString("testID")
lds := ds.NewMapDatastore() d := mockrouting.NewServer().Client(local)
d := mock.NewMockRouter(local, lds)
resolver := NewRoutingResolver(d) resolver := NewRoutingResolver(d)
publisher := NewRoutingPublisher(d) publisher := NewRoutingPublisher(d)

74
routing/mock/client.go Normal file
View File

@ -0,0 +1,74 @@
package mockrouting
import (
"errors"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
peer "github.com/jbenet/go-ipfs/peer"
routing "github.com/jbenet/go-ipfs/routing"
u "github.com/jbenet/go-ipfs/util"
)
var log = u.Logger("mockrouter")
type client struct {
datastore ds.Datastore
server server
peer peer.Peer
}
// FIXME(brian): is this method meant to simulate putting a value into the network?
func (c *client) PutValue(ctx context.Context, key u.Key, val []byte) error {
log.Debugf("PutValue: %s", key)
return c.datastore.Put(key.DsKey(), val)
}
// FIXME(brian): is this method meant to simulate getting a value from the network?
func (c *client) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
log.Debugf("GetValue: %s", key)
v, err := c.datastore.Get(key.DsKey())
if err != nil {
return nil, err
}
data, ok := v.([]byte)
if !ok {
return nil, errors.New("could not cast value from datastore")
}
return data, nil
}
func (c *client) FindProviders(ctx context.Context, key u.Key) ([]peer.Peer, error) {
return c.server.Providers(key), nil
}
func (c *client) FindPeer(ctx context.Context, pid peer.ID) (peer.Peer, error) {
log.Debugf("FindPeer: %s", pid)
return nil, nil
}
func (c *client) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-chan peer.Peer {
out := make(chan peer.Peer)
go func() {
defer close(out)
for i, p := range c.server.Providers(k) {
if max <= i {
return
}
select {
case out <- p:
case <-ctx.Done():
return
}
}
}()
return out
}
func (c *client) Provide(_ context.Context, key u.Key) error {
return c.server.Announce(c.peer, key)
}
var _ routing.IpfsRouting = &client{}

40
routing/mock/interface.go Normal file
View File

@ -0,0 +1,40 @@
// Package mock provides a virtual routing server. To use it, create a virtual
// routing server and use the Client() method to get a routing client
// (IpfsRouting). The server quacks like a DHT but is really a local in-memory
// hash table.
package mockrouting
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
peer "github.com/jbenet/go-ipfs/peer"
routing "github.com/jbenet/go-ipfs/routing"
u "github.com/jbenet/go-ipfs/util"
delay "github.com/jbenet/go-ipfs/util/delay"
)
// Server provides mockrouting Clients
type Server interface {
Client(p peer.Peer) Client
ClientWithDatastore(peer.Peer, ds.Datastore) Client
}
// Client implements IpfsRouting
type Client interface {
FindProviders(context.Context, u.Key) ([]peer.Peer, error)
routing.IpfsRouting
}
// NewServer returns a mockrouting Server
func NewServer() Server {
return NewServerWithDelay(delay.Fixed(0))
}
// NewServerWithDelay returns a mockrouting Server with a delay!
func NewServerWithDelay(d delay.D) Server {
return &s{
providers: make(map[u.Key]peer.Map),
delay: d,
}
}

View File

@ -1,4 +1,4 @@
package mock package mockrouting
import ( import (
"bytes" "bytes"
@ -12,37 +12,21 @@ import (
func TestKeyNotFound(t *testing.T) { func TestKeyNotFound(t *testing.T) {
vrs := VirtualRoutingServer() var peer = testutil.NewPeerWithID(peer.ID([]byte("the peer id")))
empty := vrs.Providers(u.Key("not there")) var key = u.Key("mock key")
if len(empty) != 0 { var ctx = context.Background()
t.Fatal("should be empty")
}
}
func TestSetAndGet(t *testing.T) { rs := NewServer()
pid := peer.ID([]byte("the peer id")) providers := rs.Client(peer).FindProvidersAsync(ctx, key, 10)
p := testutil.NewPeerWithID(pid) _, ok := <-providers
k := u.Key("42") if ok {
rs := VirtualRoutingServer() t.Fatal("should be closed")
err := rs.Announce(p, k)
if err != nil {
t.Fatal(err)
} }
providers := rs.Providers(k)
if len(providers) != 1 {
t.Fatal("should be one")
}
for _, elem := range providers {
if bytes.Equal(elem.ID(), pid) {
return
}
}
t.Fatal("ID should have matched")
} }
func TestClientFindProviders(t *testing.T) { func TestClientFindProviders(t *testing.T) {
peer := testutil.NewPeerWithIDString("42") peer := testutil.NewPeerWithIDString("42")
rs := VirtualRoutingServer() rs := NewServer()
client := rs.Client(peer) client := rs.Client(peer)
k := u.Key("hello") k := u.Key("hello")
@ -52,7 +36,10 @@ func TestClientFindProviders(t *testing.T) {
} }
max := 100 max := 100
providersFromHashTable := rs.Providers(k) providersFromHashTable, err := rs.Client(peer).FindProviders(context.Background(), k)
if err != nil {
t.Fatal(err)
}
isInHT := false isInHT := false
for _, p := range providersFromHashTable { for _, p := range providersFromHashTable {
@ -76,21 +63,16 @@ func TestClientFindProviders(t *testing.T) {
} }
func TestClientOverMax(t *testing.T) { func TestClientOverMax(t *testing.T) {
rs := VirtualRoutingServer() rs := NewServer()
k := u.Key("hello") k := u.Key("hello")
numProvidersForHelloKey := 100 numProvidersForHelloKey := 100
for i := 0; i < numProvidersForHelloKey; i++ { for i := 0; i < numProvidersForHelloKey; i++ {
peer := testutil.NewPeerWithIDString(string(i)) peer := testutil.NewPeerWithIDString(string(i))
err := rs.Announce(peer, k) err := rs.Client(peer).Provide(context.Background(), k)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
providersFromHashTable := rs.Providers(k)
if len(providersFromHashTable) != numProvidersForHelloKey {
t.Log(1 == len(providersFromHashTable))
t.Fatal("not all providers were returned")
}
max := 10 max := 10
peer := testutil.NewPeerWithIDString("TODO") peer := testutil.NewPeerWithIDString("TODO")
@ -108,7 +90,7 @@ func TestClientOverMax(t *testing.T) {
// TODO does dht ensure won't receive self as a provider? probably not. // TODO does dht ensure won't receive self as a provider? probably not.
func TestCanceledContext(t *testing.T) { func TestCanceledContext(t *testing.T) {
rs := VirtualRoutingServer() rs := NewServer()
k := u.Key("hello") k := u.Key("hello")
t.Log("async'ly announce infinite stream of providers for key") t.Log("async'ly announce infinite stream of providers for key")
@ -116,7 +98,7 @@ func TestCanceledContext(t *testing.T) {
go func() { // infinite stream go func() { // infinite stream
for { for {
peer := testutil.NewPeerWithIDString(string(i)) peer := testutil.NewPeerWithIDString(string(i))
err := rs.Announce(peer, k) err := rs.Client(peer).Provide(context.Background(), k)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -1,141 +0,0 @@
package mock
import (
"errors"
"math/rand"
"sync"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
peer "github.com/jbenet/go-ipfs/peer"
routing "github.com/jbenet/go-ipfs/routing"
u "github.com/jbenet/go-ipfs/util"
)
var log = u.Logger("mockrouter")
var _ routing.IpfsRouting = &MockRouter{}
type MockRouter struct {
datastore ds.Datastore
hashTable RoutingServer
peer peer.Peer
}
func NewMockRouter(local peer.Peer, dstore ds.Datastore) routing.IpfsRouting {
return &MockRouter{
datastore: dstore,
peer: local,
hashTable: VirtualRoutingServer(),
}
}
func (mr *MockRouter) PutValue(ctx context.Context, key u.Key, val []byte) error {
log.Debugf("PutValue: %s", key)
return mr.datastore.Put(key.DsKey(), val)
}
func (mr *MockRouter) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
log.Debugf("GetValue: %s", key)
v, err := mr.datastore.Get(key.DsKey())
if err != nil {
return nil, err
}
data, ok := v.([]byte)
if !ok {
return nil, errors.New("could not cast value from datastore")
}
return data, nil
}
func (mr *MockRouter) FindProviders(ctx context.Context, key u.Key) ([]peer.Peer, error) {
return nil, nil
}
func (mr *MockRouter) FindPeer(ctx context.Context, pid peer.ID) (peer.Peer, error) {
log.Debugf("FindPeer: %s", pid)
return nil, nil
}
func (mr *MockRouter) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-chan peer.Peer {
out := make(chan peer.Peer)
go func() {
defer close(out)
for i, p := range mr.hashTable.Providers(k) {
if max <= i {
return
}
select {
case out <- p:
case <-ctx.Done():
return
}
}
}()
return out
}
func (mr *MockRouter) Provide(_ context.Context, key u.Key) error {
return mr.hashTable.Announce(mr.peer, key)
}
type RoutingServer interface {
Announce(peer.Peer, u.Key) error
Providers(u.Key) []peer.Peer
Client(p peer.Peer) routing.IpfsRouting
}
func VirtualRoutingServer() RoutingServer {
return &hashTable{
providers: make(map[u.Key]peer.Map),
}
}
type hashTable struct {
lock sync.RWMutex
providers map[u.Key]peer.Map
}
func (rs *hashTable) Announce(p peer.Peer, k u.Key) error {
rs.lock.Lock()
defer rs.lock.Unlock()
_, ok := rs.providers[k]
if !ok {
rs.providers[k] = make(peer.Map)
}
rs.providers[k][p.Key()] = p
return nil
}
func (rs *hashTable) Providers(k u.Key) []peer.Peer {
rs.lock.RLock()
defer rs.lock.RUnlock()
var ret []peer.Peer
peerset, ok := rs.providers[k]
if !ok {
return ret
}
for _, peer := range peerset {
ret = append(ret, peer)
}
for i := range ret {
j := rand.Intn(i + 1)
ret[i], ret[j] = ret[j], ret[i]
}
return ret
}
func (rs *hashTable) Client(p peer.Peer) routing.IpfsRouting {
return &MockRouter{
peer: p,
hashTable: rs,
}
}

76
routing/mock/server.go Normal file
View File

@ -0,0 +1,76 @@
package mockrouting
import (
"math/rand"
"sync"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
delay "github.com/jbenet/go-ipfs/util/delay"
)
// server is the mockrouting.Client's private interface to the routing server
type server interface {
Announce(peer.Peer, u.Key) error
Providers(u.Key) []peer.Peer
Server
}
// s is an implementation of the private server interface
type s struct {
delay delay.D
lock sync.RWMutex
providers map[u.Key]peer.Map
}
func (rs *s) Announce(p peer.Peer, k u.Key) error {
rs.delay.Wait() // before locking
rs.lock.Lock()
defer rs.lock.Unlock()
_, ok := rs.providers[k]
if !ok {
rs.providers[k] = make(peer.Map)
}
rs.providers[k][p.Key()] = p
return nil
}
func (rs *s) Providers(k u.Key) []peer.Peer {
rs.delay.Wait() // before locking
rs.lock.RLock()
defer rs.lock.RUnlock()
var ret []peer.Peer
peerset, ok := rs.providers[k]
if !ok {
return ret
}
for _, peer := range peerset {
ret = append(ret, peer)
}
for i := range ret {
j := rand.Intn(i + 1)
ret[i], ret[j] = ret[j], ret[i]
}
return ret
}
func (rs *s) Client(p peer.Peer) Client {
return rs.ClientWithDatastore(p, ds.NewMapDatastore())
}
func (rs *s) ClientWithDatastore(p peer.Peer, datastore ds.Datastore) Client {
return &client{
peer: p,
datastore: ds.NewMapDatastore(),
server: rs,
}
}