mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-29 17:36:38 +08:00
refactor(bitswap) change PeerInfo to ID in bitswap package
@jbenet @whyrusleeping This commit replaces peer.PeerInfo with peer.ID in the bitswap package
This commit is contained in:

committed by
Juan Batiz-Benet

parent
27dc9594ba
commit
c34132e080
@ -164,7 +164,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
|
|||||||
return bs.network.Provide(ctx, blk.Key())
|
return bs.network.Provide(ctx, blk.Key())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.PeerInfo) error {
|
func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.ID) error {
|
||||||
if peers == nil {
|
if peers == nil {
|
||||||
panic("Cant send wantlist to nil peerchan")
|
panic("Cant send wantlist to nil peerchan")
|
||||||
}
|
}
|
||||||
@ -174,16 +174,15 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.PeerInf
|
|||||||
}
|
}
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
for peerToQuery := range peers {
|
for peerToQuery := range peers {
|
||||||
log.Event(ctx, "PeerToQuery", peerToQuery.ID)
|
log.Event(ctx, "PeerToQuery", peerToQuery)
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
bs.network.Peerstore().AddAddresses(peerToQuery.ID, peerToQuery.Addrs)
|
|
||||||
go func(p peer.ID) {
|
go func(p peer.ID) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
if err := bs.send(ctx, p, message); err != nil {
|
if err := bs.send(ctx, p, message); err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}(peerToQuery.ID)
|
}(peerToQuery)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
return nil
|
return nil
|
||||||
@ -210,9 +209,8 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wantli
|
|||||||
child, _ := context.WithTimeout(ctx, providerRequestTimeout)
|
child, _ := context.WithTimeout(ctx, providerRequestTimeout)
|
||||||
providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest)
|
providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest)
|
||||||
for prov := range providers {
|
for prov := range providers {
|
||||||
bs.network.Peerstore().AddAddresses(prov.ID, prov.Addrs)
|
if set.TryAdd(prov) { //Do once per peer
|
||||||
if set.TryAdd(prov.ID) { //Do once per peer
|
bs.send(ctx, prov, message)
|
||||||
bs.send(ctx, prov.ID, message)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}(e.Key)
|
}(e.Key)
|
||||||
|
@ -46,7 +46,7 @@ type Receiver interface {
|
|||||||
|
|
||||||
type Routing interface {
|
type Routing interface {
|
||||||
// FindProvidersAsync returns a channel of providers for the given key
|
// FindProvidersAsync returns a channel of providers for the given key
|
||||||
FindProvidersAsync(context.Context, u.Key, int) <-chan peer.PeerInfo
|
FindProvidersAsync(context.Context, u.Key, int) <-chan peer.ID
|
||||||
|
|
||||||
// Provide provides the key to the network
|
// Provide provides the key to the network
|
||||||
Provide(context.Context, u.Key) error
|
Provide(context.Context, u.Key) error
|
||||||
|
@ -2,10 +2,10 @@ package network
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||||
|
|
||||||
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
|
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
|
||||||
inet "github.com/jbenet/go-ipfs/net"
|
inet "github.com/jbenet/go-ipfs/net"
|
||||||
peer "github.com/jbenet/go-ipfs/peer"
|
peer "github.com/jbenet/go-ipfs/peer"
|
||||||
|
routing "github.com/jbenet/go-ipfs/routing"
|
||||||
util "github.com/jbenet/go-ipfs/util"
|
util "github.com/jbenet/go-ipfs/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -13,7 +13,7 @@ var log = util.Logger("bitswap_network")
|
|||||||
|
|
||||||
// NewFromIpfsNetwork returns a BitSwapNetwork supported by underlying IPFS
|
// NewFromIpfsNetwork returns a BitSwapNetwork supported by underlying IPFS
|
||||||
// Dialer & Service
|
// Dialer & Service
|
||||||
func NewFromIpfsNetwork(n inet.Network, r Routing) BitSwapNetwork {
|
func NewFromIpfsNetwork(n inet.Network, r routing.IpfsRouting) BitSwapNetwork {
|
||||||
bitswapNetwork := impl{
|
bitswapNetwork := impl{
|
||||||
network: n,
|
network: n,
|
||||||
routing: r,
|
routing: r,
|
||||||
@ -26,7 +26,7 @@ func NewFromIpfsNetwork(n inet.Network, r Routing) BitSwapNetwork {
|
|||||||
// NetMessage objects, into the bitswap network interface.
|
// NetMessage objects, into the bitswap network interface.
|
||||||
type impl struct {
|
type impl struct {
|
||||||
network inet.Network
|
network inet.Network
|
||||||
routing Routing
|
routing routing.IpfsRouting
|
||||||
|
|
||||||
// inbound messages from the network are forwarded to the receiver
|
// inbound messages from the network are forwarded to the receiver
|
||||||
receiver Receiver
|
receiver Receiver
|
||||||
@ -77,8 +77,20 @@ func (bsnet *impl) Peerstore() peer.Peerstore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// FindProvidersAsync returns a channel of providers for the given key
|
// FindProvidersAsync returns a channel of providers for the given key
|
||||||
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.PeerInfo { // TODO change to return ID
|
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID {
|
||||||
return bsnet.routing.FindProvidersAsync(ctx, k, max)
|
out := make(chan peer.ID)
|
||||||
|
go func() {
|
||||||
|
defer close(out)
|
||||||
|
providers := bsnet.routing.FindProvidersAsync(ctx, k, max)
|
||||||
|
for info := range providers {
|
||||||
|
bsnet.network.Peerstore().AddAddresses(info.ID, info.Addrs)
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
case out <- info.ID:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
// Provide provides the key to the network
|
// Provide provides the key to the network
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||||
|
"github.com/jbenet/go-ipfs/routing"
|
||||||
"github.com/jbenet/go-ipfs/routing/mock"
|
"github.com/jbenet/go-ipfs/routing/mock"
|
||||||
|
|
||||||
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
|
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
|
||||||
@ -156,7 +157,7 @@ type networkClient struct {
|
|||||||
bsnet.Receiver
|
bsnet.Receiver
|
||||||
network Network
|
network Network
|
||||||
peerstore peer.Peerstore
|
peerstore peer.Peerstore
|
||||||
routing bsnet.Routing
|
routing routing.IpfsRouting
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nc *networkClient) SendMessage(
|
func (nc *networkClient) SendMessage(
|
||||||
@ -174,8 +175,26 @@ func (nc *networkClient) SendRequest(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// FindProvidersAsync returns a channel of providers for the given key
|
// FindProvidersAsync returns a channel of providers for the given key
|
||||||
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.PeerInfo { // TODO change to return ID
|
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID {
|
||||||
return nc.routing.FindProvidersAsync(ctx, k, max)
|
|
||||||
|
// NB: this function duplicates the PeerInfo -> ID transformation in the
|
||||||
|
// bitswap network adapter. Not to worry. This network client will be
|
||||||
|
// deprecated once the ipfsnet.Mock is added. The code below is only
|
||||||
|
// temporary.
|
||||||
|
|
||||||
|
out := make(chan peer.ID)
|
||||||
|
go func() {
|
||||||
|
defer close(out)
|
||||||
|
providers := nc.routing.FindProvidersAsync(ctx, k, max)
|
||||||
|
for info := range providers {
|
||||||
|
nc.peerstore.AddAddresses(info.ID, info.Addrs)
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
case out <- info.ID:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
// Provide provides the key to the network
|
// Provide provides the key to the network
|
||||||
|
Reference in New Issue
Block a user