From 3ecdec985febc4985ef551d80893af87bc7e8ef7 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Fri, 12 Dec 2014 22:56:36 -0800 Subject: [PATCH] refactor(mockrouting) misc License: MIT Signed-off-by: Brian Tiger Chow --- blockservice/mock.go | 4 +- core/mock.go | 2 +- exchange/bitswap/bitswap_test.go | 20 +-- exchange/bitswap/testutils.go | 8 +- namesys/resolve_test.go | 6 +- routing/mock/client.go | 74 +++++++++ routing/mock/interface.go | 40 +++++ .../{routing_test.go => mockrouting_test.go} | 54 +++---- routing/mock/routing.go | 141 ------------------ routing/mock/server.go | 76 ++++++++++ 10 files changed, 228 insertions(+), 197 deletions(-) create mode 100644 routing/mock/client.go create mode 100644 routing/mock/interface.go rename routing/mock/{routing_test.go => mockrouting_test.go} (74%) delete mode 100644 routing/mock/routing.go create mode 100644 routing/mock/server.go diff --git a/blockservice/mock.go b/blockservice/mock.go index 2b646386c..277519746 100644 --- a/blockservice/mock.go +++ b/blockservice/mock.go @@ -5,14 +5,14 @@ import ( bitswap "github.com/jbenet/go-ipfs/exchange/bitswap" tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" - mock "github.com/jbenet/go-ipfs/routing/mock" + mockrouting "github.com/jbenet/go-ipfs/routing/mock" delay "github.com/jbenet/go-ipfs/util/delay" ) // Mocks returns |n| connected mock Blockservices func Mocks(t *testing.T, n int) []*BlockService { net := tn.VirtualNetwork(delay.Fixed(0)) - rs := mock.VirtualRoutingServer() + rs := mockrouting.NewServer() sg := bitswap.NewSessionGenerator(net, rs) instances := sg.Instances(n) diff --git a/core/mock.go b/core/mock.go index 835ba272b..9c893a6dc 100644 --- a/core/mock.go +++ b/core/mock.go @@ -42,7 +42,7 @@ func NewMockNode() (*IpfsNode, error) { nd.Datastore = ds2.CloserWrap(syncds.MutexWrap(dstore)) // Routing - dht := mdht.NewMockRouter(nd.Identity, nd.Datastore) + dht := mdht.NewServer().ClientWithDatastore(nd.Identity, nd.Datastore) nd.Routing = dht // Bitswap diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 09018b870..d58ff596a 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -10,18 +10,20 @@ import ( blocks "github.com/jbenet/go-ipfs/blocks" blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil" tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" - mock "github.com/jbenet/go-ipfs/routing/mock" + mockrouting "github.com/jbenet/go-ipfs/routing/mock" delay "github.com/jbenet/go-ipfs/util/delay" testutil "github.com/jbenet/go-ipfs/util/testutil" ) +// FIXME the tests are really sensitive to the network delay. fix them to work +// well under varying conditions const kNetworkDelay = 0 * time.Millisecond func TestClose(t *testing.T) { // TODO t.Skip("TODO Bitswap's Close implementation is a WIP") vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) - rout := mock.VirtualRoutingServer() + rout := mockrouting.NewServer() sesgen := NewSessionGenerator(vnet, rout) bgen := blocksutil.NewBlockGenerator() @@ -35,7 +37,7 @@ func TestClose(t *testing.T) { func TestGetBlockTimeout(t *testing.T) { net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) - rs := mock.VirtualRoutingServer() + rs := mockrouting.NewServer() g := NewSessionGenerator(net, rs) self := g.Next() @@ -52,11 +54,11 @@ func TestGetBlockTimeout(t *testing.T) { func TestProviderForKeyButNetworkCannotFind(t *testing.T) { net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) - rs := mock.VirtualRoutingServer() + rs := mockrouting.NewServer() g := NewSessionGenerator(net, rs) block := blocks.NewBlock([]byte("block")) - rs.Announce(testutil.NewPeerWithIDString("testing"), block.Key()) // but not on network + rs.Client(testutil.NewPeerWithIDString("testing")).Provide(context.Background(), block.Key()) // but not on network solo := g.Next() @@ -73,7 +75,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) - rs := mock.VirtualRoutingServer() + rs := mockrouting.NewServer() block := blocks.NewBlock([]byte("block")) g := NewSessionGenerator(net, rs) @@ -125,7 +127,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { t.SkipNow() } net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) - rs := mock.VirtualRoutingServer() + rs := mockrouting.NewServer() sg := NewSessionGenerator(net, rs) bg := blocksutil.NewBlockGenerator() @@ -140,7 +142,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { for _, b := range blocks { first.Blockstore().Put(b) first.Exchange.HasBlock(context.Background(), b) - rs.Announce(first.Peer, b.Key()) + rs.Client(first.Peer).Provide(context.Background(), b.Key()) } t.Log("Distribute!") @@ -185,7 +187,7 @@ func TestSendToWantingPeer(t *testing.T) { } net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) - rs := mock.VirtualRoutingServer() + rs := mockrouting.NewServer() sg := NewSessionGenerator(net, rs) bg := blocksutil.NewBlockGenerator() diff --git a/exchange/bitswap/testutils.go b/exchange/bitswap/testutils.go index 10a02606b..8ea4e7af8 100644 --- a/exchange/bitswap/testutils.go +++ b/exchange/bitswap/testutils.go @@ -10,13 +10,13 @@ import ( exchange "github.com/jbenet/go-ipfs/exchange" tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" peer "github.com/jbenet/go-ipfs/peer" - mock "github.com/jbenet/go-ipfs/routing/mock" + mockrouting "github.com/jbenet/go-ipfs/routing/mock" datastore2 "github.com/jbenet/go-ipfs/util/datastore2" delay "github.com/jbenet/go-ipfs/util/delay" ) func NewSessionGenerator( - net tn.Network, rs mock.RoutingServer) SessionGenerator { + net tn.Network, rs mockrouting.Server) SessionGenerator { return SessionGenerator{ net: net, rs: rs, @@ -28,7 +28,7 @@ func NewSessionGenerator( type SessionGenerator struct { seq int net tn.Network - rs mock.RoutingServer + rs mockrouting.Server ps peer.Peerstore } @@ -67,7 +67,7 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration { // NB: It's easy make mistakes by providing the same peer ID to two different // sessions. To safeguard, use the SessionGenerator to generate sessions. It's // just a much better idea. -func session(net tn.Network, rs mock.RoutingServer, ps peer.Peerstore, id peer.ID) Instance { +func session(net tn.Network, rs mockrouting.Server, ps peer.Peerstore, id peer.ID) Instance { p := ps.WithID(id) adapter := net.Adapter(p) diff --git a/namesys/resolve_test.go b/namesys/resolve_test.go index eef5e6825..1d487f9a7 100644 --- a/namesys/resolve_test.go +++ b/namesys/resolve_test.go @@ -3,17 +3,15 @@ package namesys import ( "testing" - ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ci "github.com/jbenet/go-ipfs/crypto" - mock "github.com/jbenet/go-ipfs/routing/mock" + mockrouting "github.com/jbenet/go-ipfs/routing/mock" u "github.com/jbenet/go-ipfs/util" testutil "github.com/jbenet/go-ipfs/util/testutil" ) func TestRoutingResolve(t *testing.T) { local := testutil.NewPeerWithIDString("testID") - lds := ds.NewMapDatastore() - d := mock.NewMockRouter(local, lds) + d := mockrouting.NewServer().Client(local) resolver := NewRoutingResolver(d) publisher := NewRoutingPublisher(d) diff --git a/routing/mock/client.go b/routing/mock/client.go new file mode 100644 index 000000000..f4702aae6 --- /dev/null +++ b/routing/mock/client.go @@ -0,0 +1,74 @@ +package mockrouting + +import ( + "errors" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + peer "github.com/jbenet/go-ipfs/peer" + routing "github.com/jbenet/go-ipfs/routing" + u "github.com/jbenet/go-ipfs/util" +) + +var log = u.Logger("mockrouter") + +type client struct { + datastore ds.Datastore + server server + peer peer.Peer +} + +// FIXME(brian): is this method meant to simulate putting a value into the network? +func (c *client) PutValue(ctx context.Context, key u.Key, val []byte) error { + log.Debugf("PutValue: %s", key) + return c.datastore.Put(key.DsKey(), val) +} + +// FIXME(brian): is this method meant to simulate getting a value from the network? +func (c *client) GetValue(ctx context.Context, key u.Key) ([]byte, error) { + log.Debugf("GetValue: %s", key) + v, err := c.datastore.Get(key.DsKey()) + if err != nil { + return nil, err + } + + data, ok := v.([]byte) + if !ok { + return nil, errors.New("could not cast value from datastore") + } + + return data, nil +} + +func (c *client) FindProviders(ctx context.Context, key u.Key) ([]peer.Peer, error) { + return c.server.Providers(key), nil +} + +func (c *client) FindPeer(ctx context.Context, pid peer.ID) (peer.Peer, error) { + log.Debugf("FindPeer: %s", pid) + return nil, nil +} + +func (c *client) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-chan peer.Peer { + out := make(chan peer.Peer) + go func() { + defer close(out) + for i, p := range c.server.Providers(k) { + if max <= i { + return + } + select { + case out <- p: + case <-ctx.Done(): + return + } + } + }() + return out +} + +func (c *client) Provide(_ context.Context, key u.Key) error { + return c.server.Announce(c.peer, key) +} + +var _ routing.IpfsRouting = &client{} diff --git a/routing/mock/interface.go b/routing/mock/interface.go new file mode 100644 index 000000000..e84a9ba5a --- /dev/null +++ b/routing/mock/interface.go @@ -0,0 +1,40 @@ +// Package mock provides a virtual routing server. To use it, create a virtual +// routing server and use the Client() method to get a routing client +// (IpfsRouting). The server quacks like a DHT but is really a local in-memory +// hash table. +package mockrouting + +import ( + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + peer "github.com/jbenet/go-ipfs/peer" + routing "github.com/jbenet/go-ipfs/routing" + u "github.com/jbenet/go-ipfs/util" + delay "github.com/jbenet/go-ipfs/util/delay" +) + +// Server provides mockrouting Clients +type Server interface { + Client(p peer.Peer) Client + ClientWithDatastore(peer.Peer, ds.Datastore) Client +} + +// Client implements IpfsRouting +type Client interface { + FindProviders(context.Context, u.Key) ([]peer.Peer, error) + + routing.IpfsRouting +} + +// NewServer returns a mockrouting Server +func NewServer() Server { + return NewServerWithDelay(delay.Fixed(0)) +} + +// NewServerWithDelay returns a mockrouting Server with a delay! +func NewServerWithDelay(d delay.D) Server { + return &s{ + providers: make(map[u.Key]peer.Map), + delay: d, + } +} diff --git a/routing/mock/routing_test.go b/routing/mock/mockrouting_test.go similarity index 74% rename from routing/mock/routing_test.go rename to routing/mock/mockrouting_test.go index 536d7b018..3f9bfab6c 100644 --- a/routing/mock/routing_test.go +++ b/routing/mock/mockrouting_test.go @@ -1,4 +1,4 @@ -package mock +package mockrouting import ( "bytes" @@ -12,37 +12,21 @@ import ( func TestKeyNotFound(t *testing.T) { - vrs := VirtualRoutingServer() - empty := vrs.Providers(u.Key("not there")) - if len(empty) != 0 { - t.Fatal("should be empty") - } -} + var peer = testutil.NewPeerWithID(peer.ID([]byte("the peer id"))) + var key = u.Key("mock key") + var ctx = context.Background() -func TestSetAndGet(t *testing.T) { - pid := peer.ID([]byte("the peer id")) - p := testutil.NewPeerWithID(pid) - k := u.Key("42") - rs := VirtualRoutingServer() - err := rs.Announce(p, k) - if err != nil { - t.Fatal(err) + rs := NewServer() + providers := rs.Client(peer).FindProvidersAsync(ctx, key, 10) + _, ok := <-providers + if ok { + t.Fatal("should be closed") } - providers := rs.Providers(k) - if len(providers) != 1 { - t.Fatal("should be one") - } - for _, elem := range providers { - if bytes.Equal(elem.ID(), pid) { - return - } - } - t.Fatal("ID should have matched") } func TestClientFindProviders(t *testing.T) { peer := testutil.NewPeerWithIDString("42") - rs := VirtualRoutingServer() + rs := NewServer() client := rs.Client(peer) k := u.Key("hello") @@ -52,7 +36,10 @@ func TestClientFindProviders(t *testing.T) { } max := 100 - providersFromHashTable := rs.Providers(k) + providersFromHashTable, err := rs.Client(peer).FindProviders(context.Background(), k) + if err != nil { + t.Fatal(err) + } isInHT := false for _, p := range providersFromHashTable { @@ -76,21 +63,16 @@ func TestClientFindProviders(t *testing.T) { } func TestClientOverMax(t *testing.T) { - rs := VirtualRoutingServer() + rs := NewServer() k := u.Key("hello") numProvidersForHelloKey := 100 for i := 0; i < numProvidersForHelloKey; i++ { peer := testutil.NewPeerWithIDString(string(i)) - err := rs.Announce(peer, k) + err := rs.Client(peer).Provide(context.Background(), k) if err != nil { t.Fatal(err) } } - providersFromHashTable := rs.Providers(k) - if len(providersFromHashTable) != numProvidersForHelloKey { - t.Log(1 == len(providersFromHashTable)) - t.Fatal("not all providers were returned") - } max := 10 peer := testutil.NewPeerWithIDString("TODO") @@ -108,7 +90,7 @@ func TestClientOverMax(t *testing.T) { // TODO does dht ensure won't receive self as a provider? probably not. func TestCanceledContext(t *testing.T) { - rs := VirtualRoutingServer() + rs := NewServer() k := u.Key("hello") t.Log("async'ly announce infinite stream of providers for key") @@ -116,7 +98,7 @@ func TestCanceledContext(t *testing.T) { go func() { // infinite stream for { peer := testutil.NewPeerWithIDString(string(i)) - err := rs.Announce(peer, k) + err := rs.Client(peer).Provide(context.Background(), k) if err != nil { t.Fatal(err) } diff --git a/routing/mock/routing.go b/routing/mock/routing.go deleted file mode 100644 index 23fe36644..000000000 --- a/routing/mock/routing.go +++ /dev/null @@ -1,141 +0,0 @@ -package mock - -import ( - "errors" - "math/rand" - "sync" - - "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" - peer "github.com/jbenet/go-ipfs/peer" - routing "github.com/jbenet/go-ipfs/routing" - u "github.com/jbenet/go-ipfs/util" -) - -var log = u.Logger("mockrouter") - -var _ routing.IpfsRouting = &MockRouter{} - -type MockRouter struct { - datastore ds.Datastore - hashTable RoutingServer - peer peer.Peer -} - -func NewMockRouter(local peer.Peer, dstore ds.Datastore) routing.IpfsRouting { - return &MockRouter{ - datastore: dstore, - peer: local, - hashTable: VirtualRoutingServer(), - } -} - -func (mr *MockRouter) PutValue(ctx context.Context, key u.Key, val []byte) error { - log.Debugf("PutValue: %s", key) - return mr.datastore.Put(key.DsKey(), val) -} - -func (mr *MockRouter) GetValue(ctx context.Context, key u.Key) ([]byte, error) { - log.Debugf("GetValue: %s", key) - v, err := mr.datastore.Get(key.DsKey()) - if err != nil { - return nil, err - } - - data, ok := v.([]byte) - if !ok { - return nil, errors.New("could not cast value from datastore") - } - - return data, nil -} - -func (mr *MockRouter) FindProviders(ctx context.Context, key u.Key) ([]peer.Peer, error) { - return nil, nil -} - -func (mr *MockRouter) FindPeer(ctx context.Context, pid peer.ID) (peer.Peer, error) { - log.Debugf("FindPeer: %s", pid) - return nil, nil -} - -func (mr *MockRouter) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-chan peer.Peer { - out := make(chan peer.Peer) - go func() { - defer close(out) - for i, p := range mr.hashTable.Providers(k) { - if max <= i { - return - } - select { - case out <- p: - case <-ctx.Done(): - return - } - } - }() - return out -} - -func (mr *MockRouter) Provide(_ context.Context, key u.Key) error { - return mr.hashTable.Announce(mr.peer, key) -} - -type RoutingServer interface { - Announce(peer.Peer, u.Key) error - - Providers(u.Key) []peer.Peer - - Client(p peer.Peer) routing.IpfsRouting -} - -func VirtualRoutingServer() RoutingServer { - return &hashTable{ - providers: make(map[u.Key]peer.Map), - } -} - -type hashTable struct { - lock sync.RWMutex - providers map[u.Key]peer.Map -} - -func (rs *hashTable) Announce(p peer.Peer, k u.Key) error { - rs.lock.Lock() - defer rs.lock.Unlock() - - _, ok := rs.providers[k] - if !ok { - rs.providers[k] = make(peer.Map) - } - rs.providers[k][p.Key()] = p - return nil -} - -func (rs *hashTable) Providers(k u.Key) []peer.Peer { - rs.lock.RLock() - defer rs.lock.RUnlock() - - var ret []peer.Peer - peerset, ok := rs.providers[k] - if !ok { - return ret - } - for _, peer := range peerset { - ret = append(ret, peer) - } - - for i := range ret { - j := rand.Intn(i + 1) - ret[i], ret[j] = ret[j], ret[i] - } - - return ret -} - -func (rs *hashTable) Client(p peer.Peer) routing.IpfsRouting { - return &MockRouter{ - peer: p, - hashTable: rs, - } -} diff --git a/routing/mock/server.go b/routing/mock/server.go new file mode 100644 index 000000000..3e189d954 --- /dev/null +++ b/routing/mock/server.go @@ -0,0 +1,76 @@ +package mockrouting + +import ( + "math/rand" + "sync" + + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + peer "github.com/jbenet/go-ipfs/peer" + u "github.com/jbenet/go-ipfs/util" + delay "github.com/jbenet/go-ipfs/util/delay" +) + +// server is the mockrouting.Client's private interface to the routing server +type server interface { + Announce(peer.Peer, u.Key) error + Providers(u.Key) []peer.Peer + + Server +} + +// s is an implementation of the private server interface +type s struct { + delay delay.D + + lock sync.RWMutex + providers map[u.Key]peer.Map +} + +func (rs *s) Announce(p peer.Peer, k u.Key) error { + rs.delay.Wait() // before locking + + rs.lock.Lock() + defer rs.lock.Unlock() + + _, ok := rs.providers[k] + if !ok { + rs.providers[k] = make(peer.Map) + } + rs.providers[k][p.Key()] = p + return nil +} + +func (rs *s) Providers(k u.Key) []peer.Peer { + rs.delay.Wait() // before locking + + rs.lock.RLock() + defer rs.lock.RUnlock() + + var ret []peer.Peer + peerset, ok := rs.providers[k] + if !ok { + return ret + } + for _, peer := range peerset { + ret = append(ret, peer) + } + + for i := range ret { + j := rand.Intn(i + 1) + ret[i], ret[j] = ret[j], ret[i] + } + + return ret +} + +func (rs *s) Client(p peer.Peer) Client { + return rs.ClientWithDatastore(p, ds.NewMapDatastore()) +} + +func (rs *s) ClientWithDatastore(p peer.Peer, datastore ds.Datastore) Client { + return &client{ + peer: p, + datastore: ds.NewMapDatastore(), + server: rs, + } +}