diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 58cdb54a5..58c7a3584 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -164,7 +164,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { return bs.network.Provide(ctx, blk.Key()) } -func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.PeerInfo) error { +func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.ID) error { if peers == nil { panic("Cant send wantlist to nil peerchan") } @@ -174,16 +174,15 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.PeerInf } wg := sync.WaitGroup{} for peerToQuery := range peers { - log.Event(ctx, "PeerToQuery", peerToQuery.ID) + log.Event(ctx, "PeerToQuery", peerToQuery) wg.Add(1) - bs.network.Peerstore().AddAddresses(peerToQuery.ID, peerToQuery.Addrs) go func(p peer.ID) { defer wg.Done() if err := bs.send(ctx, p, message); err != nil { log.Error(err) return } - }(peerToQuery.ID) + }(peerToQuery) } wg.Wait() return nil @@ -210,9 +209,8 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wantli child, _ := context.WithTimeout(ctx, providerRequestTimeout) providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest) for prov := range providers { - bs.network.Peerstore().AddAddresses(prov.ID, prov.Addrs) - if set.TryAdd(prov.ID) { //Do once per peer - bs.send(ctx, prov.ID, message) + if set.TryAdd(prov) { //Do once per peer + bs.send(ctx, prov, message) } } }(e.Key) diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go index 3bf5eb0f6..08e65cf10 100644 --- a/exchange/bitswap/network/interface.go +++ b/exchange/bitswap/network/interface.go @@ -46,7 +46,7 @@ type Receiver interface { type Routing interface { // FindProvidersAsync returns a channel of providers for the given key - FindProvidersAsync(context.Context, u.Key, int) <-chan peer.PeerInfo + FindProvidersAsync(context.Context, u.Key, int) <-chan peer.ID // Provide provides the key to the network Provide(context.Context, u.Key) error diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index 4258579eb..6205e9c29 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -2,10 +2,10 @@ package network import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" inet "github.com/jbenet/go-ipfs/net" peer "github.com/jbenet/go-ipfs/peer" + routing "github.com/jbenet/go-ipfs/routing" util "github.com/jbenet/go-ipfs/util" ) @@ -13,7 +13,7 @@ var log = util.Logger("bitswap_network") // NewFromIpfsNetwork returns a BitSwapNetwork supported by underlying IPFS // Dialer & Service -func NewFromIpfsNetwork(n inet.Network, r Routing) BitSwapNetwork { +func NewFromIpfsNetwork(n inet.Network, r routing.IpfsRouting) BitSwapNetwork { bitswapNetwork := impl{ network: n, routing: r, @@ -26,7 +26,7 @@ func NewFromIpfsNetwork(n inet.Network, r Routing) BitSwapNetwork { // NetMessage objects, into the bitswap network interface. type impl struct { network inet.Network - routing Routing + routing routing.IpfsRouting // inbound messages from the network are forwarded to the receiver receiver Receiver @@ -77,8 +77,20 @@ func (bsnet *impl) Peerstore() peer.Peerstore { } // FindProvidersAsync returns a channel of providers for the given key -func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.PeerInfo { // TODO change to return ID - return bsnet.routing.FindProvidersAsync(ctx, k, max) +func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID { + out := make(chan peer.ID) + go func() { + defer close(out) + providers := bsnet.routing.FindProvidersAsync(ctx, k, max) + for info := range providers { + bsnet.network.Peerstore().AddAddresses(info.ID, info.Addrs) + select { + case <-ctx.Done(): + case out <- info.ID: + } + } + }() + return out } // Provide provides the key to the network diff --git a/exchange/bitswap/testnet/network.go b/exchange/bitswap/testnet/network.go index 08c30a7d4..0461508ea 100644 --- a/exchange/bitswap/testnet/network.go +++ b/exchange/bitswap/testnet/network.go @@ -5,6 +5,7 @@ import ( "fmt" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + "github.com/jbenet/go-ipfs/routing" "github.com/jbenet/go-ipfs/routing/mock" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" @@ -37,8 +38,8 @@ type Network interface { func VirtualNetwork(rs mockrouting.Server, d delay.D) Network { return &network{ - clients: make(map[peer.ID]bsnet.Receiver), - delay: d, + clients: make(map[peer.ID]bsnet.Receiver), + delay: d, routingserver: rs, } } @@ -156,7 +157,7 @@ type networkClient struct { bsnet.Receiver network Network peerstore peer.Peerstore - routing bsnet.Routing + routing routing.IpfsRouting } func (nc *networkClient) SendMessage( @@ -174,8 +175,26 @@ func (nc *networkClient) SendRequest( } // FindProvidersAsync returns a channel of providers for the given key -func (nc *networkClient) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.PeerInfo { // TODO change to return ID - return nc.routing.FindProvidersAsync(ctx, k, max) +func (nc *networkClient) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID { + + // NB: this function duplicates the PeerInfo -> ID transformation in the + // bitswap network adapter. Not to worry. This network client will be + // deprecated once the ipfsnet.Mock is added. The code below is only + // temporary. + + out := make(chan peer.ID) + go func() { + defer close(out) + providers := nc.routing.FindProvidersAsync(ctx, k, max) + for info := range providers { + nc.peerstore.AddAddresses(info.ID, info.Addrs) + select { + case <-ctx.Done(): + case out <- info.ID: + } + } + }() + return out } // Provide provides the key to the network