From ca32a83394a19311eae8661cd6400d00ed1bd578 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Wed, 17 Dec 2014 10:02:19 -0800 Subject: [PATCH] wip with DHT @whyrusleeping @jbenet this is a WIP with the DHT. wip License: MIT Signed-off-by: Brian Tiger Chow Conflicts: epictest/addcat_test.go exchange/bitswap/testnet/peernet.go exchange/bitswap/testutils.go routing/mock/centralized_server.go routing/mock/centralized_test.go routing/mock/interface.go fix(routing/mock) fill in function definition --- core/mock.go | 9 ++++--- epictest/addcat_test.go | 36 +++++++++++++++++++++++---- epictest/bench_test.go | 1 + exchange/bitswap/bitswap_test.go | 4 +-- exchange/bitswap/testnet/peernet.go | 17 +++---------- exchange/bitswap/testnet/virtual.go | 2 +- exchange/bitswap/testutils.go | 12 ++++----- namesys/resolve_test.go | 7 +----- net/mux.go | 1 + routing/dht/routing.go | 10 ++++++++ routing/mock/centralized_client.go | 10 ++++++-- routing/mock/centralized_server.go | 8 +++--- routing/mock/centralized_test.go | 34 +++++++++----------------- routing/mock/interface.go | 6 ++--- routing/mock/server2.go | 38 +++++++++++++++++++++++++++++ 15 files changed, 127 insertions(+), 68 deletions(-) create mode 100644 routing/mock/server2.go diff --git a/core/mock.go b/core/mock.go index 83f1a5a81..cd47875c9 100644 --- a/core/mock.go +++ b/core/mock.go @@ -1,6 +1,7 @@ package core import ( + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" syncds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" "github.com/jbenet/go-ipfs/blocks/blockstore" @@ -11,12 +12,13 @@ import ( nsys "github.com/jbenet/go-ipfs/namesys" path "github.com/jbenet/go-ipfs/path" peer "github.com/jbenet/go-ipfs/peer" - mdht "github.com/jbenet/go-ipfs/routing/mock" + dht "github.com/jbenet/go-ipfs/routing/dht" ds2 "github.com/jbenet/go-ipfs/util/datastore2" ) // NewMockNode constructs an IpfsNode for use in tests. func NewMockNode() (*IpfsNode, error) { + ctx := context.TODO() nd := new(IpfsNode) // Generate Identity @@ -41,8 +43,7 @@ func NewMockNode() (*IpfsNode, error) { nd.Datastore = ds2.CloserWrap(syncds.MutexWrap(dstore)) // Routing - dht := mdht.NewServer().ClientWithDatastore(peer.PeerInfo{ID: p}, nd.Datastore) - nd.Routing = dht + nd.Routing = dht.NewDHT(ctx, nd.Identity, nd.Network, nd.Datastore) // Bitswap bstore := blockstore.NewBlockstore(nd.Datastore) @@ -54,7 +55,7 @@ func NewMockNode() (*IpfsNode, error) { nd.DAG = mdag.NewDAGService(bserv) // Namespace resolver - nd.Namesys = nsys.NewNameSystem(dht) + nd.Namesys = nsys.NewNameSystem(nd.Routing) // Path resolver nd.Resolver = &path.Resolver{DAG: nd.DAG} diff --git a/epictest/addcat_test.go b/epictest/addcat_test.go index 7c2d915ae..9b742d281 100644 --- a/epictest/addcat_test.go +++ b/epictest/addcat_test.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "io" + "math" "os" "testing" "time" @@ -16,12 +17,12 @@ import ( importer "github.com/jbenet/go-ipfs/importer" chunk "github.com/jbenet/go-ipfs/importer/chunk" merkledag "github.com/jbenet/go-ipfs/merkledag" + mocknet "github.com/jbenet/go-ipfs/net/mock" path "github.com/jbenet/go-ipfs/path" mockrouting "github.com/jbenet/go-ipfs/routing/mock" uio "github.com/jbenet/go-ipfs/unixfs/io" util "github.com/jbenet/go-ipfs/util" errors "github.com/jbenet/go-ipfs/util/debugerror" - delay "github.com/jbenet/go-ipfs/util/delay" ) const kSeed = 1 @@ -87,11 +88,14 @@ func RandomBytes(n int64) []byte { func AddCatBytes(data []byte, conf Config) error { ctx := context.Background() - rs := mockrouting.NewServerWithDelay(mockrouting.DelayConfig{ - Query: delay.Fixed(conf.RoutingLatency), - ValueVisibility: delay.Fixed(conf.RoutingLatency), + mn := mocknet.New(ctx) + // defer mn.Close() FIXME does mocknet require clean-up + mn.SetLinkDefaults(mocknet.LinkOptions{ + Latency: conf.NetworkLatency, + Bandwidth: math.MaxInt32, // TODO add to conf }) - net, err := tn.StreamNetWithDelay(ctx, rs, delay.Fixed(conf.NetworkLatency)) + dhtNetwork := mockrouting.NewDHTNetwork(mn) + net, err := tn.StreamNet(ctx, mn, dhtNetwork) if err != nil { return errors.Wrap(err) } @@ -100,6 +104,28 @@ func AddCatBytes(data []byte, conf Config) error { adder := sessionGenerator.Next() catter := sessionGenerator.Next() + // catter.Routing.Update(context.TODO(), adder.Peer) + + peers := mn.Peers() + if len(peers) != 2 { + return errors.New("peers not in network") + } + + for _, i := range peers { + for _, j := range peers { + if i == j { + continue + } + fmt.Println(i, " and ", j) + if _, err := mn.LinkPeers(i, j); err != nil { + return err + } + if err := mn.ConnectPeers(i, j); err != nil { + return err + } + } + } + catter.SetBlockstoreLatency(conf.BlockstoreLatency) adder.SetBlockstoreLatency(0) // disable blockstore latency during add operation diff --git a/epictest/bench_test.go b/epictest/bench_test.go index aa53147f4..a680f04fe 100644 --- a/epictest/bench_test.go +++ b/epictest/bench_test.go @@ -18,6 +18,7 @@ func benchmarkAddCat(numBytes int64, conf Config, b *testing.B) { var instant = Config{}.All_Instantaneous() +func BenchmarkInstantaneousAddCat1KB(b *testing.B) { benchmarkAddCat(1*KB, instant, b) } func BenchmarkInstantaneousAddCat1MB(b *testing.B) { benchmarkAddCat(1*MB, instant, b) } func BenchmarkInstantaneousAddCat2MB(b *testing.B) { benchmarkAddCat(2*MB, instant, b) } func BenchmarkInstantaneousAddCat4MB(b *testing.B) { benchmarkAddCat(4*MB, instant, b) } diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 6da4aaeff..4ef0838a5 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -11,10 +11,10 @@ import ( blocks "github.com/jbenet/go-ipfs/blocks" blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil" tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" - peer "github.com/jbenet/go-ipfs/peer" mockrouting "github.com/jbenet/go-ipfs/routing/mock" u "github.com/jbenet/go-ipfs/util" delay "github.com/jbenet/go-ipfs/util/delay" + "github.com/jbenet/go-ipfs/util/testutil" ) // FIXME the tests are really sensitive to the network delay. fix them to work @@ -61,7 +61,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this defer g.Close() block := blocks.NewBlock([]byte("block")) - pinfo := peer.PeerInfo{ID: peer.ID("testing")} + pinfo := testutil.RandPeerOrFatal(t) rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network solo := g.Next() diff --git a/exchange/bitswap/testnet/peernet.go b/exchange/bitswap/testnet/peernet.go index af2e57258..ef4f3d503 100644 --- a/exchange/bitswap/testnet/peernet.go +++ b/exchange/bitswap/testnet/peernet.go @@ -1,14 +1,12 @@ package bitswap import ( - "math" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network" mockpeernet "github.com/jbenet/go-ipfs/net/mock" peer "github.com/jbenet/go-ipfs/peer" mockrouting "github.com/jbenet/go-ipfs/routing/mock" - delay "github.com/jbenet/go-ipfs/util/delay" testutil "github.com/jbenet/go-ipfs/util/testutil" ) @@ -17,16 +15,7 @@ type peernet struct { routingserver mockrouting.Server } -func StreamNetWithDelay( - ctx context.Context, - rs mockrouting.Server, - d delay.D) (Network, error) { - - net := mockpeernet.New(ctx) - net.SetLinkDefaults(mockpeernet.LinkOptions{ - Latency: d.Get(), - Bandwidth: math.MaxInt32, // TODO inject - }) +func StreamNet(ctx context.Context, net mockpeernet.Mocknet, rs mockrouting.Server) (Network, error) { return &peernet{net, rs}, nil } @@ -39,7 +28,7 @@ func (pn *peernet) Adapter(p testutil.Peer) bsnet.BitSwapNetwork { for _, other := range peers { pn.Mocknet.LinkPeers(p.ID(), other) } - routing := pn.routingserver.Client(peer.PeerInfo{ID: p.ID()}) + routing := pn.routingserver.ClientWithDatastore(context.TODO(), p, ds.NewMapDatastore()) return bsnet.NewFromIpfsNetwork(client, routing) } diff --git a/exchange/bitswap/testnet/virtual.go b/exchange/bitswap/testnet/virtual.go index 0bcffbe51..5811db3bb 100644 --- a/exchange/bitswap/testnet/virtual.go +++ b/exchange/bitswap/testnet/virtual.go @@ -33,7 +33,7 @@ func (n *network) Adapter(p testutil.Peer) bsnet.BitSwapNetwork { client := &networkClient{ local: p.ID(), network: n, - routing: n.routingserver.Client(peer.PeerInfo{ID: p.ID()}), + routing: n.routingserver.Client(p), } n.clients[p.ID()] = client return client diff --git a/exchange/bitswap/testutils.go b/exchange/bitswap/testutils.go index 728c2ba3b..9ad3cf312 100644 --- a/exchange/bitswap/testutils.go +++ b/exchange/bitswap/testutils.go @@ -79,15 +79,15 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration { // sessions. To safeguard, use the SessionGenerator to generate sessions. It's // just a much better idea. func session(ctx context.Context, net tn.Network, p testutil.Peer) Instance { - - adapter := net.Adapter(p) - bsdelay := delay.Fixed(0) const kWriteCacheElems = 100 - bstore, err := blockstore.WriteCached(blockstore.NewBlockstore(ds_sync.MutexWrap(datastore2.WithDelay(ds.NewMapDatastore(), bsdelay))), kWriteCacheElems) + + adapter := net.Adapter(p) + dstore := ds_sync.MutexWrap(datastore2.WithDelay(ds.NewMapDatastore(), bsdelay)) + + bstore, err := blockstore.WriteCached(blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)), kWriteCacheElems) if err != nil { - // FIXME perhaps change signature and return error. - panic(err.Error()) + panic(err.Error()) // FIXME perhaps change signature and return error. } const alwaysSendToPeer = true diff --git a/namesys/resolve_test.go b/namesys/resolve_test.go index fb29490f3..74fb08982 100644 --- a/namesys/resolve_test.go +++ b/namesys/resolve_test.go @@ -4,18 +4,13 @@ import ( "testing" ci "github.com/jbenet/go-ipfs/crypto" - peer "github.com/jbenet/go-ipfs/peer" mockrouting "github.com/jbenet/go-ipfs/routing/mock" u "github.com/jbenet/go-ipfs/util" testutil "github.com/jbenet/go-ipfs/util/testutil" ) func TestRoutingResolve(t *testing.T) { - local, err := testutil.RandPeerID() - if err != nil { - t.Fatal(err) - } - d := mockrouting.NewServer().Client(peer.PeerInfo{ID: local}) + d := mockrouting.NewServer().Client(testutil.RandPeerOrFatal(t)) resolver := NewRoutingResolver(d) publisher := NewRoutingPublisher(d) diff --git a/net/mux.go b/net/mux.go index a1513325a..b8cebd55a 100644 --- a/net/mux.go +++ b/net/mux.go @@ -76,6 +76,7 @@ func (m *Mux) ReadProtocolHeader(s io.Reader) (string, StreamHandler, error) { // This operation is threadsafe. func (m *Mux) SetHandler(p ProtocolID, h StreamHandler) { m.Lock() + log.Debug("setting protocol ", p) m.Handlers[p] = h m.Unlock() } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index c515324c5..34108f076 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -1,6 +1,7 @@ package dht import ( + "math" "sync" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" @@ -127,6 +128,15 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { return nil } +// FindProviders searches until the context expires. +func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerInfo, error) { + var providers []peer.PeerInfo + for p := range dht.FindProvidersAsync(ctx, key, math.MaxInt32) { + providers = append(providers, p) + } + return providers, nil +} + // FindProvidersAsync is the same thing as FindProviders, but returns a channel. // Peers will be returned on the channel as soon as they are found, even before // the search query completes. diff --git a/routing/mock/centralized_client.go b/routing/mock/centralized_client.go index 9be43b653..0ba4be538 100644 --- a/routing/mock/centralized_client.go +++ b/routing/mock/centralized_client.go @@ -5,9 +5,11 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" peer "github.com/jbenet/go-ipfs/peer" routing "github.com/jbenet/go-ipfs/routing" u "github.com/jbenet/go-ipfs/util" + "github.com/jbenet/go-ipfs/util/testutil" ) var log = u.Logger("mockrouter") @@ -15,7 +17,7 @@ var log = u.Logger("mockrouter") type client struct { datastore ds.Datastore server server - peer peer.PeerInfo + peer testutil.Peer } // FIXME(brian): is this method meant to simulate putting a value into the network? @@ -70,7 +72,11 @@ func (c *client) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-cha // Provide returns once the message is on the network. Value is not necessarily // visible yet. func (c *client) Provide(_ context.Context, key u.Key) error { - return c.server.Announce(c.peer, key) + info := peer.PeerInfo{ + ID: c.peer.ID(), + Addrs: []ma.Multiaddr{c.peer.Address()}, + } + return c.server.Announce(info, key) } var _ routing.IpfsRouting = &client{} diff --git a/routing/mock/centralized_server.go b/routing/mock/centralized_server.go index 31ae4b730..10f81eb2c 100644 --- a/routing/mock/centralized_server.go +++ b/routing/mock/centralized_server.go @@ -5,9 +5,11 @@ import ( "sync" "time" + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" + "github.com/jbenet/go-ipfs/util/testutil" ) // server is the mockrouting.Client's private interface to the routing server @@ -71,11 +73,11 @@ func (rs *s) Providers(k u.Key) []peer.PeerInfo { return ret } -func (rs *s) Client(p peer.PeerInfo) Client { - return rs.ClientWithDatastore(p, ds.NewMapDatastore()) +func (rs *s) Client(p testutil.Peer) Client { + return rs.ClientWithDatastore(context.Background(), p, ds.NewMapDatastore()) } -func (rs *s) ClientWithDatastore(p peer.PeerInfo, datastore ds.Datastore) Client { +func (rs *s) ClientWithDatastore(_ context.Context, p testutil.Peer, datastore ds.Datastore) Client { return &client{ peer: p, datastore: ds.NewMapDatastore(), diff --git a/routing/mock/centralized_test.go b/routing/mock/centralized_test.go index 739edbc63..bda7ac004 100644 --- a/routing/mock/centralized_test.go +++ b/routing/mock/centralized_test.go @@ -8,11 +8,12 @@ import ( peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" delay "github.com/jbenet/go-ipfs/util/delay" + "github.com/jbenet/go-ipfs/util/testutil" ) func TestKeyNotFound(t *testing.T) { - var pi = peer.PeerInfo{ID: peer.ID("the peer id")} + var pi = testutil.RandPeerOrFatal(t) var key = u.Key("mock key") var ctx = context.Background() @@ -25,7 +26,7 @@ func TestKeyNotFound(t *testing.T) { } func TestClientFindProviders(t *testing.T) { - pi := peer.PeerInfo{ID: peer.ID("42")} + pi := testutil.RandPeerOrFatal(t) rs := NewServer() client := rs.Client(pi) @@ -39,20 +40,6 @@ func TestClientFindProviders(t *testing.T) { time.Sleep(time.Millisecond * 300) max := 100 - providersFromHashTable, err := rs.Client(pi).FindProviders(context.Background(), k) - if err != nil { - t.Fatal(err) - } - - isInHT := false - for _, pi := range providersFromHashTable { - if pi.ID == pi.ID { - isInHT = true - } - } - if !isInHT { - t.Fatal("Despite client providing key, peer wasn't in hash table as a provider") - } providersFromClient := client.FindProvidersAsync(context.Background(), u.Key("hello"), max) isInClient := false for pi := range providersFromClient { @@ -70,7 +57,7 @@ func TestClientOverMax(t *testing.T) { k := u.Key("hello") numProvidersForHelloKey := 100 for i := 0; i < numProvidersForHelloKey; i++ { - pi := peer.PeerInfo{ID: peer.ID(i)} + pi := testutil.RandPeerOrFatal(t) err := rs.Client(pi).Provide(context.Background(), k) if err != nil { t.Fatal(err) @@ -78,7 +65,7 @@ func TestClientOverMax(t *testing.T) { } max := 10 - pi := peer.PeerInfo{ID: peer.ID("TODO")} + pi := testutil.RandPeerOrFatal(t) client := rs.Client(pi) providersFromClient := client.FindProvidersAsync(context.Background(), k, max) @@ -113,8 +100,11 @@ func TestCanceledContext(t *testing.T) { default: } - pi := peer.PeerInfo{ID: peer.ID(i)} - err := rs.Client(pi).Provide(context.Background(), k) + pi, err := testutil.RandPeer() + if err != nil { + t.Error(err) + } + err = rs.Client(pi).Provide(context.Background(), k) if err != nil { t.Error(err) } @@ -122,7 +112,7 @@ func TestCanceledContext(t *testing.T) { } }() - local := peer.PeerInfo{ID: peer.ID("peer id doesn't matter")} + local := testutil.RandPeerOrFatal(t) client := rs.Client(local) t.Log("warning: max is finite so this test is non-deterministic") @@ -148,7 +138,7 @@ func TestCanceledContext(t *testing.T) { func TestValidAfter(t *testing.T) { - var pi = peer.PeerInfo{ID: peer.ID("the peer id")} + pi := testutil.RandPeerOrFatal(t) var key = u.Key("mock key") var ctx = context.Background() conf := DelayConfig{ diff --git a/routing/mock/interface.go b/routing/mock/interface.go index abb869eb4..3ff1ca059 100644 --- a/routing/mock/interface.go +++ b/routing/mock/interface.go @@ -11,18 +11,18 @@ import ( routing "github.com/jbenet/go-ipfs/routing" u "github.com/jbenet/go-ipfs/util" delay "github.com/jbenet/go-ipfs/util/delay" + "github.com/jbenet/go-ipfs/util/testutil" ) // Server provides mockrouting Clients type Server interface { - Client(p peer.PeerInfo) Client - ClientWithDatastore(peer.PeerInfo, ds.Datastore) Client + Client(p testutil.Peer) Client + ClientWithDatastore(context.Context, testutil.Peer, ds.Datastore) Client } // Client implements IpfsRouting type Client interface { FindProviders(context.Context, u.Key) ([]peer.PeerInfo, error) - routing.IpfsRouting } diff --git a/routing/mock/server2.go b/routing/mock/server2.go new file mode 100644 index 000000000..dc3dccdfa --- /dev/null +++ b/routing/mock/server2.go @@ -0,0 +1,38 @@ +package mockrouting + +import ( + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" + mocknet "github.com/jbenet/go-ipfs/net/mock" + dht "github.com/jbenet/go-ipfs/routing/dht" + "github.com/jbenet/go-ipfs/util/testutil" +) + +type mocknetserver struct { + mn mocknet.Mocknet +} + +func NewDHTNetwork(mn mocknet.Mocknet) Server { + return &mocknetserver{ + mn: mn, + } +} + +func (rs *mocknetserver) Client(p testutil.Peer) Client { + return rs.ClientWithDatastore(context.TODO(), p, ds.NewMapDatastore()) +} + +func (rs *mocknetserver) ClientWithDatastore(ctx context.Context, p testutil.Peer, ds ds.Datastore) Client { + + // FIXME AddPeer doesn't appear to be idempotent + + net, err := rs.mn.AddPeer(p.PrivateKey(), p.Address()) + if err != nil { + panic("FIXME") + // return nil, debugerror.Wrap(err) + } + return dht.NewDHT(ctx, p.ID(), net, sync.MutexWrap(ds)) +} + +var _ Server = &mocknetserver{}