mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-01 02:30:39 +08:00
refactor(bitswap) bitswap.Network now abstracts ipfs.Network + ipfs.Routing
@jbenet @whyrusleeping the next commit will change bitswap.Network.FindProviders to only deal with IDs
This commit is contained in:

committed by
Juan Batiz-Benet

parent
c6294646e9
commit
27dc9594ba
@ -11,9 +11,8 @@ import (
|
|||||||
|
|
||||||
// Mocks returns |n| connected mock Blockservices
|
// Mocks returns |n| connected mock Blockservices
|
||||||
func Mocks(t *testing.T, n int) []*BlockService {
|
func Mocks(t *testing.T, n int) []*BlockService {
|
||||||
net := tn.VirtualNetwork(delay.Fixed(0))
|
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
|
||||||
rs := mockrouting.NewServer()
|
sg := bitswap.NewSessionGenerator(net)
|
||||||
sg := bitswap.NewSessionGenerator(net, rs)
|
|
||||||
|
|
||||||
instances := sg.Instances(n)
|
instances := sg.Instances(n)
|
||||||
|
|
||||||
|
@ -150,9 +150,9 @@ func NewIpfsNode(ctx context.Context, cfg *config.Config, online bool) (n *IpfsN
|
|||||||
|
|
||||||
// setup exchange service
|
// setup exchange service
|
||||||
const alwaysSendToPeer = true // use YesManStrategy
|
const alwaysSendToPeer = true // use YesManStrategy
|
||||||
bitswapNetwork := bsnet.NewFromIpfsNetwork(n.Network)
|
bitswapNetwork := bsnet.NewFromIpfsNetwork(n.Network, n.Routing)
|
||||||
|
|
||||||
n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Routing, blockstore, alwaysSendToPeer)
|
n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, blockstore, alwaysSendToPeer)
|
||||||
|
|
||||||
// TODO consider moving connection supervision into the Network. We've
|
// TODO consider moving connection supervision into the Network. We've
|
||||||
// discussed improvements to this Node constructor. One improvement
|
// discussed improvements to this Node constructor. One improvement
|
||||||
|
@ -87,11 +87,12 @@ func RandomBytes(n int64) []byte {
|
|||||||
func AddCatBytes(data []byte, conf Config) error {
|
func AddCatBytes(data []byte, conf Config) error {
|
||||||
|
|
||||||
sessionGenerator := bitswap.NewSessionGenerator(
|
sessionGenerator := bitswap.NewSessionGenerator(
|
||||||
tn.VirtualNetwork(delay.Fixed(conf.NetworkLatency)), // TODO rename VirtualNetwork
|
tn.VirtualNetwork(
|
||||||
mockrouting.NewServerWithDelay(mockrouting.DelayConfig{
|
mockrouting.NewServerWithDelay(mockrouting.DelayConfig{
|
||||||
Query: delay.Fixed(conf.RoutingLatency),
|
Query: delay.Fixed(conf.RoutingLatency),
|
||||||
ValueVisibility: delay.Fixed(conf.RoutingLatency),
|
ValueVisibility: delay.Fixed(conf.RoutingLatency),
|
||||||
}),
|
}),
|
||||||
|
delay.Fixed(conf.NetworkLatency)), // TODO rename VirtualNetwork
|
||||||
)
|
)
|
||||||
defer sessionGenerator.Close()
|
defer sessionGenerator.Close()
|
||||||
|
|
||||||
|
@ -46,7 +46,7 @@ var (
|
|||||||
// BitSwapNetwork. This function registers the returned instance as the network
|
// BitSwapNetwork. This function registers the returned instance as the network
|
||||||
// delegate.
|
// delegate.
|
||||||
// Runs until context is cancelled.
|
// Runs until context is cancelled.
|
||||||
func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, routing bsnet.Routing,
|
func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
|
||||||
bstore blockstore.Blockstore, nice bool) exchange.Interface {
|
bstore blockstore.Blockstore, nice bool) exchange.Interface {
|
||||||
|
|
||||||
ctx, cancelFunc := context.WithCancel(parent)
|
ctx, cancelFunc := context.WithCancel(parent)
|
||||||
@ -63,7 +63,6 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, routin
|
|||||||
cancelFunc: cancelFunc,
|
cancelFunc: cancelFunc,
|
||||||
notifications: notif,
|
notifications: notif,
|
||||||
engine: decision.NewEngine(ctx, bstore),
|
engine: decision.NewEngine(ctx, bstore),
|
||||||
routing: routing,
|
|
||||||
network: network,
|
network: network,
|
||||||
wantlist: wantlist.NewThreadSafe(),
|
wantlist: wantlist.NewThreadSafe(),
|
||||||
batchRequests: make(chan []u.Key, sizeBatchRequestChan),
|
batchRequests: make(chan []u.Key, sizeBatchRequestChan),
|
||||||
@ -85,9 +84,6 @@ type bitswap struct {
|
|||||||
// NB: ensure threadsafety
|
// NB: ensure threadsafety
|
||||||
blockstore blockstore.Blockstore
|
blockstore blockstore.Blockstore
|
||||||
|
|
||||||
// routing interface for communication
|
|
||||||
routing bsnet.Routing
|
|
||||||
|
|
||||||
notifications notifications.PubSub
|
notifications notifications.PubSub
|
||||||
|
|
||||||
// Requests for a set of related blocks
|
// Requests for a set of related blocks
|
||||||
@ -165,7 +161,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
|
|||||||
}
|
}
|
||||||
bs.wantlist.Remove(blk.Key())
|
bs.wantlist.Remove(blk.Key())
|
||||||
bs.notifications.Publish(blk)
|
bs.notifications.Publish(blk)
|
||||||
return bs.routing.Provide(ctx, blk.Key())
|
return bs.network.Provide(ctx, blk.Key())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.PeerInfo) error {
|
func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.PeerInfo) error {
|
||||||
@ -212,7 +208,7 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wantli
|
|||||||
go func(k u.Key) {
|
go func(k u.Key) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
child, _ := context.WithTimeout(ctx, providerRequestTimeout)
|
child, _ := context.WithTimeout(ctx, providerRequestTimeout)
|
||||||
providers := bs.routing.FindProvidersAsync(child, k, maxProvidersPerRequest)
|
providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest)
|
||||||
for prov := range providers {
|
for prov := range providers {
|
||||||
bs.network.Peerstore().AddAddresses(prov.ID, prov.Addrs)
|
bs.network.Peerstore().AddAddresses(prov.ID, prov.Addrs)
|
||||||
if set.TryAdd(prov.ID) { //Do once per peer
|
if set.TryAdd(prov.ID) { //Do once per peer
|
||||||
@ -265,7 +261,7 @@ func (bs *bitswap) clientWorker(parent context.Context) {
|
|||||||
// it. Later, this assumption may not hold as true if we implement
|
// it. Later, this assumption may not hold as true if we implement
|
||||||
// newer bitswap strategies.
|
// newer bitswap strategies.
|
||||||
child, _ := context.WithTimeout(ctx, providerRequestTimeout)
|
child, _ := context.WithTimeout(ctx, providerRequestTimeout)
|
||||||
providers := bs.routing.FindProvidersAsync(child, ks[0], maxProvidersPerRequest)
|
providers := bs.network.FindProvidersAsync(child, ks[0], maxProvidersPerRequest)
|
||||||
err := bs.sendWantListTo(ctx, providers)
|
err := bs.sendWantListTo(ctx, providers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("error sending wantlist: %s", err)
|
log.Errorf("error sending wantlist: %s", err)
|
||||||
|
@ -24,9 +24,8 @@ const kNetworkDelay = 0 * time.Millisecond
|
|||||||
func TestClose(t *testing.T) {
|
func TestClose(t *testing.T) {
|
||||||
// TODO
|
// TODO
|
||||||
t.Skip("TODO Bitswap's Close implementation is a WIP")
|
t.Skip("TODO Bitswap's Close implementation is a WIP")
|
||||||
vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
|
vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
|
||||||
rout := mockrouting.NewServer()
|
sesgen := NewSessionGenerator(vnet)
|
||||||
sesgen := NewSessionGenerator(vnet, rout)
|
|
||||||
defer sesgen.Close()
|
defer sesgen.Close()
|
||||||
bgen := blocksutil.NewBlockGenerator()
|
bgen := blocksutil.NewBlockGenerator()
|
||||||
|
|
||||||
@ -39,9 +38,8 @@ func TestClose(t *testing.T) {
|
|||||||
|
|
||||||
func TestGetBlockTimeout(t *testing.T) {
|
func TestGetBlockTimeout(t *testing.T) {
|
||||||
|
|
||||||
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
|
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
|
||||||
rs := mockrouting.NewServer()
|
g := NewSessionGenerator(net)
|
||||||
g := NewSessionGenerator(net, rs)
|
|
||||||
defer g.Close()
|
defer g.Close()
|
||||||
|
|
||||||
self := g.Next()
|
self := g.Next()
|
||||||
@ -55,11 +53,11 @@ func TestGetBlockTimeout(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestProviderForKeyButNetworkCannotFind(t *testing.T) {
|
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
|
||||||
|
|
||||||
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
|
|
||||||
rs := mockrouting.NewServer()
|
rs := mockrouting.NewServer()
|
||||||
g := NewSessionGenerator(net, rs)
|
net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
|
||||||
|
g := NewSessionGenerator(net)
|
||||||
defer g.Close()
|
defer g.Close()
|
||||||
|
|
||||||
block := blocks.NewBlock([]byte("block"))
|
block := blocks.NewBlock([]byte("block"))
|
||||||
@ -81,10 +79,9 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) {
|
|||||||
|
|
||||||
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
|
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
|
||||||
|
|
||||||
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
|
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
|
||||||
rs := mockrouting.NewServer()
|
|
||||||
block := blocks.NewBlock([]byte("block"))
|
block := blocks.NewBlock([]byte("block"))
|
||||||
g := NewSessionGenerator(net, rs)
|
g := NewSessionGenerator(net)
|
||||||
defer g.Close()
|
defer g.Close()
|
||||||
|
|
||||||
hasBlock := g.Next()
|
hasBlock := g.Next()
|
||||||
@ -136,9 +133,8 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
|
|||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.SkipNow()
|
t.SkipNow()
|
||||||
}
|
}
|
||||||
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
|
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
|
||||||
rs := mockrouting.NewServer()
|
sg := NewSessionGenerator(net)
|
||||||
sg := NewSessionGenerator(net, rs)
|
|
||||||
defer sg.Close()
|
defer sg.Close()
|
||||||
bg := blocksutil.NewBlockGenerator()
|
bg := blocksutil.NewBlockGenerator()
|
||||||
|
|
||||||
@ -152,10 +148,9 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
|
|||||||
var blkeys []u.Key
|
var blkeys []u.Key
|
||||||
first := instances[0]
|
first := instances[0]
|
||||||
for _, b := range blocks {
|
for _, b := range blocks {
|
||||||
first.Blockstore().Put(b)
|
first.Blockstore().Put(b) // TODO remove. don't need to do this. bitswap owns block
|
||||||
blkeys = append(blkeys, b.Key())
|
blkeys = append(blkeys, b.Key())
|
||||||
first.Exchange.HasBlock(context.Background(), b)
|
first.Exchange.HasBlock(context.Background(), b)
|
||||||
rs.Client(peer.PeerInfo{ID: first.Peer}).Provide(context.Background(), b.Key())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Log("Distribute!")
|
t.Log("Distribute!")
|
||||||
@ -202,9 +197,8 @@ func TestSendToWantingPeer(t *testing.T) {
|
|||||||
t.SkipNow()
|
t.SkipNow()
|
||||||
}
|
}
|
||||||
|
|
||||||
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
|
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
|
||||||
rs := mockrouting.NewServer()
|
sg := NewSessionGenerator(net)
|
||||||
sg := NewSessionGenerator(net, rs)
|
|
||||||
defer sg.Close()
|
defer sg.Close()
|
||||||
bg := blocksutil.NewBlockGenerator()
|
bg := blocksutil.NewBlockGenerator()
|
||||||
|
|
||||||
@ -248,9 +242,8 @@ func TestSendToWantingPeer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestBasicBitswap(t *testing.T) {
|
func TestBasicBitswap(t *testing.T) {
|
||||||
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
|
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
|
||||||
rs := mockrouting.NewServer()
|
sg := NewSessionGenerator(net)
|
||||||
sg := NewSessionGenerator(net, rs)
|
|
||||||
bg := blocksutil.NewBlockGenerator()
|
bg := blocksutil.NewBlockGenerator()
|
||||||
|
|
||||||
t.Log("Test a few nodes trying to get one file with a lot of blocks")
|
t.Log("Test a few nodes trying to get one file with a lot of blocks")
|
||||||
|
@ -31,6 +31,8 @@ type BitSwapNetwork interface {
|
|||||||
// SetDelegate registers the Reciver to handle messages received from the
|
// SetDelegate registers the Reciver to handle messages received from the
|
||||||
// network.
|
// network.
|
||||||
SetDelegate(Receiver)
|
SetDelegate(Receiver)
|
||||||
|
|
||||||
|
Routing
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implement Receiver to receive messages from the BitSwapNetwork
|
// Implement Receiver to receive messages from the BitSwapNetwork
|
||||||
|
@ -13,9 +13,10 @@ var log = util.Logger("bitswap_network")
|
|||||||
|
|
||||||
// NewFromIpfsNetwork returns a BitSwapNetwork supported by underlying IPFS
|
// NewFromIpfsNetwork returns a BitSwapNetwork supported by underlying IPFS
|
||||||
// Dialer & Service
|
// Dialer & Service
|
||||||
func NewFromIpfsNetwork(n inet.Network) BitSwapNetwork {
|
func NewFromIpfsNetwork(n inet.Network, r Routing) BitSwapNetwork {
|
||||||
bitswapNetwork := impl{
|
bitswapNetwork := impl{
|
||||||
network: n,
|
network: n,
|
||||||
|
routing: r,
|
||||||
}
|
}
|
||||||
n.SetHandler(inet.ProtocolBitswap, bitswapNetwork.handleNewStream)
|
n.SetHandler(inet.ProtocolBitswap, bitswapNetwork.handleNewStream)
|
||||||
return &bitswapNetwork
|
return &bitswapNetwork
|
||||||
@ -25,6 +26,7 @@ func NewFromIpfsNetwork(n inet.Network) BitSwapNetwork {
|
|||||||
// NetMessage objects, into the bitswap network interface.
|
// NetMessage objects, into the bitswap network interface.
|
||||||
type impl struct {
|
type impl struct {
|
||||||
network inet.Network
|
network inet.Network
|
||||||
|
routing Routing
|
||||||
|
|
||||||
// inbound messages from the network are forwarded to the receiver
|
// inbound messages from the network are forwarded to the receiver
|
||||||
receiver Receiver
|
receiver Receiver
|
||||||
@ -74,6 +76,16 @@ func (bsnet *impl) Peerstore() peer.Peerstore {
|
|||||||
return bsnet.Peerstore()
|
return bsnet.Peerstore()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FindProvidersAsync returns a channel of providers for the given key
|
||||||
|
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.PeerInfo { // TODO change to return ID
|
||||||
|
return bsnet.routing.FindProvidersAsync(ctx, k, max)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Provide provides the key to the network
|
||||||
|
func (bsnet *impl) Provide(ctx context.Context, k util.Key) error {
|
||||||
|
return bsnet.routing.Provide(ctx, k)
|
||||||
|
}
|
||||||
|
|
||||||
// handleNewStream receives a new stream from the network.
|
// handleNewStream receives a new stream from the network.
|
||||||
func (bsnet *impl) handleNewStream(s inet.Stream) {
|
func (bsnet *impl) handleNewStream(s inet.Stream) {
|
||||||
|
|
||||||
|
@ -5,10 +5,12 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||||
|
"github.com/jbenet/go-ipfs/routing/mock"
|
||||||
|
|
||||||
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
|
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
|
||||||
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
|
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
|
||||||
peer "github.com/jbenet/go-ipfs/peer"
|
peer "github.com/jbenet/go-ipfs/peer"
|
||||||
|
"github.com/jbenet/go-ipfs/util"
|
||||||
delay "github.com/jbenet/go-ipfs/util/delay"
|
delay "github.com/jbenet/go-ipfs/util/delay"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -33,16 +35,18 @@ type Network interface {
|
|||||||
|
|
||||||
// network impl
|
// network impl
|
||||||
|
|
||||||
func VirtualNetwork(d delay.D) Network {
|
func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
|
||||||
return &network{
|
return &network{
|
||||||
clients: make(map[peer.ID]bsnet.Receiver),
|
clients: make(map[peer.ID]bsnet.Receiver),
|
||||||
delay: d,
|
delay: d,
|
||||||
|
routingserver: rs,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type network struct {
|
type network struct {
|
||||||
clients map[peer.ID]bsnet.Receiver
|
clients map[peer.ID]bsnet.Receiver
|
||||||
delay delay.D
|
routingserver mockrouting.Server
|
||||||
|
delay delay.D
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *network) Adapter(p peer.ID) bsnet.BitSwapNetwork {
|
func (n *network) Adapter(p peer.ID) bsnet.BitSwapNetwork {
|
||||||
@ -50,6 +54,7 @@ func (n *network) Adapter(p peer.ID) bsnet.BitSwapNetwork {
|
|||||||
local: p,
|
local: p,
|
||||||
network: n,
|
network: n,
|
||||||
peerstore: peer.NewPeerstore(),
|
peerstore: peer.NewPeerstore(),
|
||||||
|
routing: n.routingserver.Client(peer.PeerInfo{ID: p}),
|
||||||
}
|
}
|
||||||
n.clients[p] = client
|
n.clients[p] = client
|
||||||
return client
|
return client
|
||||||
@ -151,6 +156,7 @@ type networkClient struct {
|
|||||||
bsnet.Receiver
|
bsnet.Receiver
|
||||||
network Network
|
network Network
|
||||||
peerstore peer.Peerstore
|
peerstore peer.Peerstore
|
||||||
|
routing bsnet.Routing
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nc *networkClient) SendMessage(
|
func (nc *networkClient) SendMessage(
|
||||||
@ -167,6 +173,16 @@ func (nc *networkClient) SendRequest(
|
|||||||
return nc.network.SendRequest(ctx, nc.local, to, message)
|
return nc.network.SendRequest(ctx, nc.local, to, message)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FindProvidersAsync returns a channel of providers for the given key
|
||||||
|
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.PeerInfo { // TODO change to return ID
|
||||||
|
return nc.routing.FindProvidersAsync(ctx, k, max)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Provide provides the key to the network
|
||||||
|
func (nc *networkClient) Provide(ctx context.Context, k util.Key) error {
|
||||||
|
return nc.routing.Provide(ctx, k)
|
||||||
|
}
|
||||||
|
|
||||||
func (nc *networkClient) DialPeer(ctx context.Context, p peer.ID) error {
|
func (nc *networkClient) DialPeer(ctx context.Context, p peer.ID) error {
|
||||||
// no need to do anything because dialing isn't a thing in this test net.
|
// no need to do anything because dialing isn't a thing in this test net.
|
||||||
if !nc.network.HasPeer(p) {
|
if !nc.network.HasPeer(p) {
|
||||||
|
@ -11,10 +11,11 @@ import (
|
|||||||
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
|
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
|
||||||
peer "github.com/jbenet/go-ipfs/peer"
|
peer "github.com/jbenet/go-ipfs/peer"
|
||||||
delay "github.com/jbenet/go-ipfs/util/delay"
|
delay "github.com/jbenet/go-ipfs/util/delay"
|
||||||
|
mockrouting "github.com/jbenet/go-ipfs/routing/mock"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestSendRequestToCooperativePeer(t *testing.T) {
|
func TestSendRequestToCooperativePeer(t *testing.T) {
|
||||||
net := VirtualNetwork(delay.Fixed(0))
|
net := VirtualNetwork(mockrouting.NewServer(),delay.Fixed(0))
|
||||||
|
|
||||||
idOfRecipient := peer.ID("recipient")
|
idOfRecipient := peer.ID("recipient")
|
||||||
|
|
||||||
@ -65,7 +66,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
|
func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
|
||||||
net := VirtualNetwork(delay.Fixed(0))
|
net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
|
||||||
idOfResponder := peer.ID("responder")
|
idOfResponder := peer.ID("responder")
|
||||||
waiter := net.Adapter(peer.ID("waiter"))
|
waiter := net.Adapter(peer.ID("waiter"))
|
||||||
responder := net.Adapter(idOfResponder)
|
responder := net.Adapter(idOfResponder)
|
||||||
|
@ -10,18 +10,16 @@ import (
|
|||||||
exchange "github.com/jbenet/go-ipfs/exchange"
|
exchange "github.com/jbenet/go-ipfs/exchange"
|
||||||
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
|
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
|
||||||
peer "github.com/jbenet/go-ipfs/peer"
|
peer "github.com/jbenet/go-ipfs/peer"
|
||||||
mockrouting "github.com/jbenet/go-ipfs/routing/mock"
|
|
||||||
datastore2 "github.com/jbenet/go-ipfs/util/datastore2"
|
datastore2 "github.com/jbenet/go-ipfs/util/datastore2"
|
||||||
delay "github.com/jbenet/go-ipfs/util/delay"
|
delay "github.com/jbenet/go-ipfs/util/delay"
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewSessionGenerator(
|
func NewSessionGenerator(
|
||||||
net tn.Network, rs mockrouting.Server) SessionGenerator {
|
net tn.Network) SessionGenerator {
|
||||||
ctx, cancel := context.WithCancel(context.TODO())
|
ctx, cancel := context.WithCancel(context.TODO())
|
||||||
return SessionGenerator{
|
return SessionGenerator{
|
||||||
ps: peer.NewPeerstore(),
|
ps: peer.NewPeerstore(),
|
||||||
net: net,
|
net: net,
|
||||||
rs: rs,
|
|
||||||
seq: 0,
|
seq: 0,
|
||||||
ctx: ctx, // TODO take ctx as param to Next, Instances
|
ctx: ctx, // TODO take ctx as param to Next, Instances
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
@ -31,7 +29,6 @@ func NewSessionGenerator(
|
|||||||
type SessionGenerator struct {
|
type SessionGenerator struct {
|
||||||
seq int
|
seq int
|
||||||
net tn.Network
|
net tn.Network
|
||||||
rs mockrouting.Server
|
|
||||||
ps peer.Peerstore
|
ps peer.Peerstore
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
@ -44,7 +41,7 @@ func (g *SessionGenerator) Close() error {
|
|||||||
|
|
||||||
func (g *SessionGenerator) Next() Instance {
|
func (g *SessionGenerator) Next() Instance {
|
||||||
g.seq++
|
g.seq++
|
||||||
return session(g.ctx, g.net, g.rs, g.ps, peer.ID(g.seq))
|
return session(g.ctx, g.net, g.ps, peer.ID(g.seq))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *SessionGenerator) Instances(n int) []Instance {
|
func (g *SessionGenerator) Instances(n int) []Instance {
|
||||||
@ -77,10 +74,9 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
|
|||||||
// NB: It's easy make mistakes by providing the same peer ID to two different
|
// NB: It's easy make mistakes by providing the same peer ID to two different
|
||||||
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
|
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
|
||||||
// just a much better idea.
|
// just a much better idea.
|
||||||
func session(ctx context.Context, net tn.Network, rs mockrouting.Server, ps peer.Peerstore, p peer.ID) Instance {
|
func session(ctx context.Context, net tn.Network, ps peer.Peerstore, p peer.ID) Instance {
|
||||||
|
|
||||||
adapter := net.Adapter(p)
|
adapter := net.Adapter(p)
|
||||||
htc := rs.Client(peer.PeerInfo{ID: p})
|
|
||||||
|
|
||||||
bsdelay := delay.Fixed(0)
|
bsdelay := delay.Fixed(0)
|
||||||
const kWriteCacheElems = 100
|
const kWriteCacheElems = 100
|
||||||
@ -92,7 +88,7 @@ func session(ctx context.Context, net tn.Network, rs mockrouting.Server, ps peer
|
|||||||
|
|
||||||
const alwaysSendToPeer = true
|
const alwaysSendToPeer = true
|
||||||
|
|
||||||
bs := New(ctx, p, adapter, htc, bstore, alwaysSendToPeer)
|
bs := New(ctx, p, adapter, bstore, alwaysSendToPeer)
|
||||||
|
|
||||||
return Instance{
|
return Instance{
|
||||||
Peer: p,
|
Peer: p,
|
||||||
|
Reference in New Issue
Block a user