diff --git a/blockservice/mock.go b/blockservice/mock.go index 277519746..57432178e 100644 --- a/blockservice/mock.go +++ b/blockservice/mock.go @@ -11,9 +11,8 @@ import ( // Mocks returns |n| connected mock Blockservices func Mocks(t *testing.T, n int) []*BlockService { - net := tn.VirtualNetwork(delay.Fixed(0)) - rs := mockrouting.NewServer() - sg := bitswap.NewSessionGenerator(net, rs) + net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) + sg := bitswap.NewSessionGenerator(net) instances := sg.Instances(n) diff --git a/core/core.go b/core/core.go index e19f05ea5..7cabef281 100644 --- a/core/core.go +++ b/core/core.go @@ -150,9 +150,9 @@ func NewIpfsNode(ctx context.Context, cfg *config.Config, online bool) (n *IpfsN // setup exchange service const alwaysSendToPeer = true // use YesManStrategy - bitswapNetwork := bsnet.NewFromIpfsNetwork(n.Network) + bitswapNetwork := bsnet.NewFromIpfsNetwork(n.Network, n.Routing) - n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Routing, blockstore, alwaysSendToPeer) + n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, blockstore, alwaysSendToPeer) // TODO consider moving connection supervision into the Network. We've // discussed improvements to this Node constructor. One improvement diff --git a/epictest/addcat_test.go b/epictest/addcat_test.go index 09ba58f79..7cf4d04e8 100644 --- a/epictest/addcat_test.go +++ b/epictest/addcat_test.go @@ -87,11 +87,12 @@ func RandomBytes(n int64) []byte { func AddCatBytes(data []byte, conf Config) error { sessionGenerator := bitswap.NewSessionGenerator( - tn.VirtualNetwork(delay.Fixed(conf.NetworkLatency)), // TODO rename VirtualNetwork - mockrouting.NewServerWithDelay(mockrouting.DelayConfig{ - Query: delay.Fixed(conf.RoutingLatency), - ValueVisibility: delay.Fixed(conf.RoutingLatency), - }), + tn.VirtualNetwork( + mockrouting.NewServerWithDelay(mockrouting.DelayConfig{ + Query: delay.Fixed(conf.RoutingLatency), + ValueVisibility: delay.Fixed(conf.RoutingLatency), + }), + delay.Fixed(conf.NetworkLatency)), // TODO rename VirtualNetwork ) defer sessionGenerator.Close() diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 20db60a00..58cdb54a5 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -46,7 +46,7 @@ var ( // BitSwapNetwork. This function registers the returned instance as the network // delegate. // Runs until context is cancelled. -func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, routing bsnet.Routing, +func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, nice bool) exchange.Interface { ctx, cancelFunc := context.WithCancel(parent) @@ -63,7 +63,6 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, routin cancelFunc: cancelFunc, notifications: notif, engine: decision.NewEngine(ctx, bstore), - routing: routing, network: network, wantlist: wantlist.NewThreadSafe(), batchRequests: make(chan []u.Key, sizeBatchRequestChan), @@ -85,9 +84,6 @@ type bitswap struct { // NB: ensure threadsafety blockstore blockstore.Blockstore - // routing interface for communication - routing bsnet.Routing - notifications notifications.PubSub // Requests for a set of related blocks @@ -165,7 +161,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { } bs.wantlist.Remove(blk.Key()) bs.notifications.Publish(blk) - return bs.routing.Provide(ctx, blk.Key()) + return bs.network.Provide(ctx, blk.Key()) } func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.PeerInfo) error { @@ -212,7 +208,7 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wantli go func(k u.Key) { defer wg.Done() child, _ := context.WithTimeout(ctx, providerRequestTimeout) - providers := bs.routing.FindProvidersAsync(child, k, maxProvidersPerRequest) + providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest) for prov := range providers { bs.network.Peerstore().AddAddresses(prov.ID, prov.Addrs) if set.TryAdd(prov.ID) { //Do once per peer @@ -265,7 +261,7 @@ func (bs *bitswap) clientWorker(parent context.Context) { // it. Later, this assumption may not hold as true if we implement // newer bitswap strategies. child, _ := context.WithTimeout(ctx, providerRequestTimeout) - providers := bs.routing.FindProvidersAsync(child, ks[0], maxProvidersPerRequest) + providers := bs.network.FindProvidersAsync(child, ks[0], maxProvidersPerRequest) err := bs.sendWantListTo(ctx, providers) if err != nil { log.Errorf("error sending wantlist: %s", err) diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index e0f2740e0..6da4aaeff 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -24,9 +24,8 @@ const kNetworkDelay = 0 * time.Millisecond func TestClose(t *testing.T) { // TODO t.Skip("TODO Bitswap's Close implementation is a WIP") - vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) - rout := mockrouting.NewServer() - sesgen := NewSessionGenerator(vnet, rout) + vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + sesgen := NewSessionGenerator(vnet) defer sesgen.Close() bgen := blocksutil.NewBlockGenerator() @@ -39,9 +38,8 @@ func TestClose(t *testing.T) { func TestGetBlockTimeout(t *testing.T) { - net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) - rs := mockrouting.NewServer() - g := NewSessionGenerator(net, rs) + net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + g := NewSessionGenerator(net) defer g.Close() self := g.Next() @@ -55,11 +53,11 @@ func TestGetBlockTimeout(t *testing.T) { } } -func TestProviderForKeyButNetworkCannotFind(t *testing.T) { +func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this - net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) rs := mockrouting.NewServer() - g := NewSessionGenerator(net, rs) + net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay)) + g := NewSessionGenerator(net) defer g.Close() block := blocks.NewBlock([]byte("block")) @@ -81,10 +79,9 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { - net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) - rs := mockrouting.NewServer() + net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) - g := NewSessionGenerator(net, rs) + g := NewSessionGenerator(net) defer g.Close() hasBlock := g.Next() @@ -136,9 +133,8 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { if testing.Short() { t.SkipNow() } - net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) - rs := mockrouting.NewServer() - sg := NewSessionGenerator(net, rs) + net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + sg := NewSessionGenerator(net) defer sg.Close() bg := blocksutil.NewBlockGenerator() @@ -152,10 +148,9 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { var blkeys []u.Key first := instances[0] for _, b := range blocks { - first.Blockstore().Put(b) + first.Blockstore().Put(b) // TODO remove. don't need to do this. bitswap owns block blkeys = append(blkeys, b.Key()) first.Exchange.HasBlock(context.Background(), b) - rs.Client(peer.PeerInfo{ID: first.Peer}).Provide(context.Background(), b.Key()) } t.Log("Distribute!") @@ -202,9 +197,8 @@ func TestSendToWantingPeer(t *testing.T) { t.SkipNow() } - net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) - rs := mockrouting.NewServer() - sg := NewSessionGenerator(net, rs) + net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + sg := NewSessionGenerator(net) defer sg.Close() bg := blocksutil.NewBlockGenerator() @@ -248,9 +242,8 @@ func TestSendToWantingPeer(t *testing.T) { } func TestBasicBitswap(t *testing.T) { - net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) - rs := mockrouting.NewServer() - sg := NewSessionGenerator(net, rs) + net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + sg := NewSessionGenerator(net) bg := blocksutil.NewBlockGenerator() t.Log("Test a few nodes trying to get one file with a lot of blocks") diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go index fc9a7ddaa..3bf5eb0f6 100644 --- a/exchange/bitswap/network/interface.go +++ b/exchange/bitswap/network/interface.go @@ -31,6 +31,8 @@ type BitSwapNetwork interface { // SetDelegate registers the Reciver to handle messages received from the // network. SetDelegate(Receiver) + + Routing } // Implement Receiver to receive messages from the BitSwapNetwork diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index ea52ad8d7..4258579eb 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -13,9 +13,10 @@ var log = util.Logger("bitswap_network") // NewFromIpfsNetwork returns a BitSwapNetwork supported by underlying IPFS // Dialer & Service -func NewFromIpfsNetwork(n inet.Network) BitSwapNetwork { +func NewFromIpfsNetwork(n inet.Network, r Routing) BitSwapNetwork { bitswapNetwork := impl{ network: n, + routing: r, } n.SetHandler(inet.ProtocolBitswap, bitswapNetwork.handleNewStream) return &bitswapNetwork @@ -25,6 +26,7 @@ func NewFromIpfsNetwork(n inet.Network) BitSwapNetwork { // NetMessage objects, into the bitswap network interface. type impl struct { network inet.Network + routing Routing // inbound messages from the network are forwarded to the receiver receiver Receiver @@ -74,6 +76,16 @@ func (bsnet *impl) Peerstore() peer.Peerstore { return bsnet.Peerstore() } +// FindProvidersAsync returns a channel of providers for the given key +func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.PeerInfo { // TODO change to return ID + return bsnet.routing.FindProvidersAsync(ctx, k, max) +} + +// Provide provides the key to the network +func (bsnet *impl) Provide(ctx context.Context, k util.Key) error { + return bsnet.routing.Provide(ctx, k) +} + // handleNewStream receives a new stream from the network. func (bsnet *impl) handleNewStream(s inet.Stream) { diff --git a/exchange/bitswap/testnet/network.go b/exchange/bitswap/testnet/network.go index aa9b879fc..08c30a7d4 100644 --- a/exchange/bitswap/testnet/network.go +++ b/exchange/bitswap/testnet/network.go @@ -5,10 +5,12 @@ import ( "fmt" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + "github.com/jbenet/go-ipfs/routing/mock" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network" peer "github.com/jbenet/go-ipfs/peer" + "github.com/jbenet/go-ipfs/util" delay "github.com/jbenet/go-ipfs/util/delay" ) @@ -33,16 +35,18 @@ type Network interface { // network impl -func VirtualNetwork(d delay.D) Network { +func VirtualNetwork(rs mockrouting.Server, d delay.D) Network { return &network{ clients: make(map[peer.ID]bsnet.Receiver), delay: d, + routingserver: rs, } } type network struct { - clients map[peer.ID]bsnet.Receiver - delay delay.D + clients map[peer.ID]bsnet.Receiver + routingserver mockrouting.Server + delay delay.D } func (n *network) Adapter(p peer.ID) bsnet.BitSwapNetwork { @@ -50,6 +54,7 @@ func (n *network) Adapter(p peer.ID) bsnet.BitSwapNetwork { local: p, network: n, peerstore: peer.NewPeerstore(), + routing: n.routingserver.Client(peer.PeerInfo{ID: p}), } n.clients[p] = client return client @@ -151,6 +156,7 @@ type networkClient struct { bsnet.Receiver network Network peerstore peer.Peerstore + routing bsnet.Routing } func (nc *networkClient) SendMessage( @@ -167,6 +173,16 @@ func (nc *networkClient) SendRequest( return nc.network.SendRequest(ctx, nc.local, to, message) } +// FindProvidersAsync returns a channel of providers for the given key +func (nc *networkClient) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.PeerInfo { // TODO change to return ID + return nc.routing.FindProvidersAsync(ctx, k, max) +} + +// Provide provides the key to the network +func (nc *networkClient) Provide(ctx context.Context, k util.Key) error { + return nc.routing.Provide(ctx, k) +} + func (nc *networkClient) DialPeer(ctx context.Context, p peer.ID) error { // no need to do anything because dialing isn't a thing in this test net. if !nc.network.HasPeer(p) { diff --git a/exchange/bitswap/testnet/network_test.go b/exchange/bitswap/testnet/network_test.go index d47cb71e7..0728f63d6 100644 --- a/exchange/bitswap/testnet/network_test.go +++ b/exchange/bitswap/testnet/network_test.go @@ -11,10 +11,11 @@ import ( bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network" peer "github.com/jbenet/go-ipfs/peer" delay "github.com/jbenet/go-ipfs/util/delay" + mockrouting "github.com/jbenet/go-ipfs/routing/mock" ) func TestSendRequestToCooperativePeer(t *testing.T) { - net := VirtualNetwork(delay.Fixed(0)) + net := VirtualNetwork(mockrouting.NewServer(),delay.Fixed(0)) idOfRecipient := peer.ID("recipient") @@ -65,7 +66,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) { } func TestSendMessageAsyncButWaitForResponse(t *testing.T) { - net := VirtualNetwork(delay.Fixed(0)) + net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) idOfResponder := peer.ID("responder") waiter := net.Adapter(peer.ID("waiter")) responder := net.Adapter(idOfResponder) diff --git a/exchange/bitswap/testutils.go b/exchange/bitswap/testutils.go index 09ac1c363..70c1bd7a5 100644 --- a/exchange/bitswap/testutils.go +++ b/exchange/bitswap/testutils.go @@ -10,18 +10,16 @@ import ( exchange "github.com/jbenet/go-ipfs/exchange" tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" peer "github.com/jbenet/go-ipfs/peer" - mockrouting "github.com/jbenet/go-ipfs/routing/mock" datastore2 "github.com/jbenet/go-ipfs/util/datastore2" delay "github.com/jbenet/go-ipfs/util/delay" ) func NewSessionGenerator( - net tn.Network, rs mockrouting.Server) SessionGenerator { + net tn.Network) SessionGenerator { ctx, cancel := context.WithCancel(context.TODO()) return SessionGenerator{ ps: peer.NewPeerstore(), net: net, - rs: rs, seq: 0, ctx: ctx, // TODO take ctx as param to Next, Instances cancel: cancel, @@ -31,7 +29,6 @@ func NewSessionGenerator( type SessionGenerator struct { seq int net tn.Network - rs mockrouting.Server ps peer.Peerstore ctx context.Context cancel context.CancelFunc @@ -44,7 +41,7 @@ func (g *SessionGenerator) Close() error { func (g *SessionGenerator) Next() Instance { g.seq++ - return session(g.ctx, g.net, g.rs, g.ps, peer.ID(g.seq)) + return session(g.ctx, g.net, g.ps, peer.ID(g.seq)) } func (g *SessionGenerator) Instances(n int) []Instance { @@ -77,10 +74,9 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration { // NB: It's easy make mistakes by providing the same peer ID to two different // sessions. To safeguard, use the SessionGenerator to generate sessions. It's // just a much better idea. -func session(ctx context.Context, net tn.Network, rs mockrouting.Server, ps peer.Peerstore, p peer.ID) Instance { +func session(ctx context.Context, net tn.Network, ps peer.Peerstore, p peer.ID) Instance { adapter := net.Adapter(p) - htc := rs.Client(peer.PeerInfo{ID: p}) bsdelay := delay.Fixed(0) const kWriteCacheElems = 100 @@ -92,7 +88,7 @@ func session(ctx context.Context, net tn.Network, rs mockrouting.Server, ps peer const alwaysSendToPeer = true - bs := New(ctx, p, adapter, htc, bstore, alwaysSendToPeer) + bs := New(ctx, p, adapter, bstore, alwaysSendToPeer) return Instance{ Peer: p,