mirror of
https://github.com/ipfs/kubo.git
synced 2025-09-10 14:34:24 +08:00
dht/pb: changed PeersToPBPeers to set ConnectionType
Uses an inet.Dialer
This commit is contained in:
@ -88,12 +88,12 @@ const (
|
||||
NotConnected Connectedness = 0
|
||||
|
||||
// Connected means has an open, live connection to peer
|
||||
Connected
|
||||
Connected = 1
|
||||
|
||||
// CanConnect means recently connected to peer, terminated gracefully
|
||||
CanConnect
|
||||
CanConnect = 2
|
||||
|
||||
// CannotConnect means recently attempted connecting but failed to connect.
|
||||
// (should signal "made effort, failed")
|
||||
CannotConnect
|
||||
CannotConnect = 3
|
||||
)
|
||||
|
@ -227,7 +227,7 @@ func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) er
|
||||
pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(key), 0)
|
||||
|
||||
// add self as the provider
|
||||
pmes.ProviderPeers = pb.PeersToPBPeers([]peer.Peer{dht.self})
|
||||
pmes.ProviderPeers = pb.PeersToPBPeers(dht.dialer, []peer.Peer{dht.self})
|
||||
|
||||
rpmes, err := dht.sendRequest(ctx, p, pmes)
|
||||
if err != nil {
|
||||
|
@ -262,7 +262,7 @@ func TestNotFound(t *testing.T) {
|
||||
for i := 0; i < 7; i++ {
|
||||
peers = append(peers, _randPeer())
|
||||
}
|
||||
resp.CloserPeers = pb.PeersToPBPeers(peers)
|
||||
resp.CloserPeers = pb.PeersToPBPeers(d.dialer, peers)
|
||||
mes, err := msg.FromObject(mes.Peer(), resp)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
@ -326,7 +326,7 @@ func TestLessThanKResponses(t *testing.T) {
|
||||
case pb.Message_GET_VALUE:
|
||||
resp := &pb.Message{
|
||||
Type: pmes.Type,
|
||||
CloserPeers: pb.PeersToPBPeers([]peer.Peer{other}),
|
||||
CloserPeers: pb.PeersToPBPeers(d.dialer, []peer.Peer{other}),
|
||||
}
|
||||
|
||||
mes, err := msg.FromObject(mes.Peer(), resp)
|
||||
|
@ -87,7 +87,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.Peer, pmes *pb.Me
|
||||
provs := dht.providers.GetProviders(ctx, u.Key(pmes.GetKey()))
|
||||
if len(provs) > 0 {
|
||||
log.Debugf("handleGetValue returning %d provider[s]", len(provs))
|
||||
resp.ProviderPeers = pb.PeersToPBPeers(provs)
|
||||
resp.ProviderPeers = pb.PeersToPBPeers(dht.dialer, provs)
|
||||
}
|
||||
|
||||
// Find closest peer on given cluster to desired key and reply with that info
|
||||
@ -99,7 +99,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.Peer, pmes *pb.Me
|
||||
log.Critical("no addresses on peer being sent!")
|
||||
}
|
||||
}
|
||||
resp.CloserPeers = pb.PeersToPBPeers(closer)
|
||||
resp.CloserPeers = pb.PeersToPBPeers(dht.dialer, closer)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
@ -159,7 +159,7 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.Peer, pmes *pb.Me
|
||||
for _, p := range withAddresses {
|
||||
log.Debugf("handleFindPeer: sending back '%s'", p)
|
||||
}
|
||||
resp.CloserPeers = pb.PeersToPBPeers(withAddresses)
|
||||
resp.CloserPeers = pb.PeersToPBPeers(dht.dialer, withAddresses)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
@ -183,13 +183,13 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.Peer, pmes *p
|
||||
|
||||
// if we've got providers, send thos those.
|
||||
if providers != nil && len(providers) > 0 {
|
||||
resp.ProviderPeers = pb.PeersToPBPeers(providers)
|
||||
resp.ProviderPeers = pb.PeersToPBPeers(dht.dialer, providers)
|
||||
}
|
||||
|
||||
// Also send closer peers.
|
||||
closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
|
||||
if closer != nil {
|
||||
resp.CloserPeers = pb.PeersToPBPeers(closer)
|
||||
resp.CloserPeers = pb.PeersToPBPeers(dht.dialer, closer)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
|
@ -4,9 +4,12 @@ import (
|
||||
"errors"
|
||||
|
||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
|
||||
inet "github.com/jbenet/go-ipfs/net"
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
)
|
||||
|
||||
// NewMessage constructs a new dht message with given type, key, and level
|
||||
func NewMessage(typ Message_MessageType, key string, level int) *Message {
|
||||
m := &Message{
|
||||
Type: &typ,
|
||||
@ -29,9 +32,9 @@ func peerToPBPeer(p peer.Peer) *Message_Peer {
|
||||
return pbp
|
||||
}
|
||||
|
||||
// PeersToPBPeers converts a slice of Peers into a slice of *Message_Peers,
|
||||
// RawPeersToPBPeers converts a slice of Peers into a slice of *Message_Peers,
|
||||
// ready to go out on the wire.
|
||||
func PeersToPBPeers(peers []peer.Peer) []*Message_Peer {
|
||||
func RawPeersToPBPeers(peers []peer.Peer) []*Message_Peer {
|
||||
pbpeers := make([]*Message_Peer, len(peers))
|
||||
for i, p := range peers {
|
||||
pbpeers[i] = peerToPBPeer(p)
|
||||
@ -39,6 +42,19 @@ func PeersToPBPeers(peers []peer.Peer) []*Message_Peer {
|
||||
return pbpeers
|
||||
}
|
||||
|
||||
// PeersToPBPeers converts given []peer.Peer into a set of []*Message_Peer,
|
||||
// which can be written to a message and sent out. the key thing this function
|
||||
// does (in addition to PeersToPBPeers) is set the ConnectionType with
|
||||
// information from the given inet.Dialer.
|
||||
func PeersToPBPeers(d inet.Dialer, peers []peer.Peer) []*Message_Peer {
|
||||
pbps := RawPeersToPBPeers(peers)
|
||||
for i, pbp := range pbps {
|
||||
c := ConnectionType(d.Connectedness(peers[i]))
|
||||
pbp.Connection = &c
|
||||
}
|
||||
return pbps
|
||||
}
|
||||
|
||||
// Addresses returns a multiaddr associated with the Message_Peer entry
|
||||
func (m *Message_Peer) Addresses() ([]ma.Multiaddr, error) {
|
||||
if m == nil {
|
||||
@ -75,6 +91,7 @@ func (m *Message) SetClusterLevel(level int) {
|
||||
m.ClusterLevelRaw = &lvl
|
||||
}
|
||||
|
||||
// Loggable turns a Message into machine-readable log output
|
||||
func (m *Message) Loggable() map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"message": map[string]string{
|
||||
@ -82,3 +99,37 @@ func (m *Message) Loggable() map[string]interface{} {
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// ConnectionType returns a Message_ConnectionType associated with the
|
||||
// inet.Connectedness.
|
||||
func ConnectionType(c inet.Connectedness) Message_ConnectionType {
|
||||
switch c {
|
||||
default:
|
||||
return Message_NOT_CONNECTED
|
||||
case inet.NotConnected:
|
||||
return Message_NOT_CONNECTED
|
||||
case inet.Connected:
|
||||
return Message_CONNECTED
|
||||
case inet.CanConnect:
|
||||
return Message_CAN_CONNECT
|
||||
case inet.CannotConnect:
|
||||
return Message_CANNOT_CONNECT
|
||||
}
|
||||
}
|
||||
|
||||
// Connectedness returns an inet.Connectedness associated with the
|
||||
// Message_ConnectionType.
|
||||
func Connectedness(c Message_ConnectionType) inet.Connectedness {
|
||||
switch c {
|
||||
default:
|
||||
return inet.NotConnected
|
||||
case Message_NOT_CONNECTED:
|
||||
return inet.NotConnected
|
||||
case Message_CONNECTED:
|
||||
return inet.Connected
|
||||
case Message_CAN_CONNECT:
|
||||
return inet.CanConnect
|
||||
case Message_CANNOT_CONNECT:
|
||||
return inet.CannotConnect
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user