1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-09-10 11:52:21 +08:00

peer.Peer is now an interface

![](http://m.memegen.com/77n7dk.jpg)
This commit is contained in:
Juan Batiz-Benet
2014-10-20 03:26:44 -07:00
parent b1d11ccfcb
commit 9ca87fbb93
65 changed files with 538 additions and 459 deletions

View File

@ -35,7 +35,7 @@ func Publish(n *core.IpfsNode, args []string, opts map[string]interface{}, out i
}
// later, n.Keychain.Get(name).PrivKey
k := n.Identity.PrivKey
k := n.Identity.PrivKey()
pub := nsys.NewRoutingPublisher(n.Routing)
err := pub.Publish(k, ref)

View File

@ -19,7 +19,7 @@ func Resolve(n *core.IpfsNode, args []string, opts map[string]interface{}, out i
if n.Identity == nil {
return errors.New("Identity not loaded!")
}
name = n.Identity.ID.String()
name = n.Identity.ID().String()
default:
return fmt.Errorf("Publish expects 1 or 2 args; got %d.", len(args))

View File

@ -36,7 +36,7 @@ type IpfsNode struct {
Config *config.Config
// the local node's identity
Identity *peer.Peer
Identity peer.Peer
// storage for other Peer instances
Peerstore peer.Peerstore
@ -177,7 +177,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
}, nil
}
func initIdentity(cfg *config.Config, peers peer.Peerstore, online bool) (*peer.Peer, error) {
func initIdentity(cfg *config.Config, peers peer.Peerstore, online bool) (peer.Peer, error) {
if cfg.Identity.PeerID == "" {
return nil, errors.New("Identity was not set in config (was ipfs init run?)")
}

View File

@ -8,7 +8,7 @@ import (
mdag "github.com/jbenet/go-ipfs/merkledag"
nsys "github.com/jbenet/go-ipfs/namesys"
path "github.com/jbenet/go-ipfs/path"
"github.com/jbenet/go-ipfs/peer"
peer "github.com/jbenet/go-ipfs/peer"
mdht "github.com/jbenet/go-ipfs/routing/mock"
)
@ -16,20 +16,18 @@ import (
func NewMockNode() (*IpfsNode, error) {
nd := new(IpfsNode)
//Generate Identity
nd.Peerstore = peer.NewPeerstore()
var err error
nd.Identity, err = nd.Peerstore.Get(peer.ID("TESTING"))
// Generate Identity
sk, pk, err := ci.GenerateKeyPair(ci.RSA, 1024)
if err != nil {
return nil, err
}
pk, sk, err := ci.GenerateKeyPair(ci.RSA, 1024)
nd.Identity, err = peer.WithKeyPair(sk, pk)
if err != nil {
return nil, err
}
nd.Identity.PrivKey = pk
nd.Identity.PubKey = sk
nd.Peerstore = peer.NewPeerstore()
nd.Peerstore.Put(nd.Identity)
// Temp Datastore
dstore := ds.NewMapDatastore()

View File

@ -53,7 +53,7 @@ func (s *SecurePipe) handshake() error {
}
log.Debug("handshake: %s <--> %s", s.local, s.remote)
myPubKey, err := s.local.PubKey.Bytes()
myPubKey, err := s.local.PubKey().Bytes()
if err != nil {
return err
}
@ -132,7 +132,7 @@ func (s *SecurePipe) handshake() error {
exPacket := new(Exchange)
exPacket.Epubkey = epubkey
exPacket.Signature, err = s.local.PrivKey.Sign(handshake.Bytes())
exPacket.Signature, err = s.local.PrivKey().Sign(handshake.Bytes())
if err != nil {
return err
}
@ -167,7 +167,7 @@ func (s *SecurePipe) handshake() error {
theirHandshake.Write(exchangeResp.GetEpubkey())
// u.POut("Remote Peer Identified as %s\n", s.remote)
ok, err := s.remote.PubKey.Verify(theirHandshake.Bytes(), exchangeResp.GetSignature())
ok, err := s.remote.PubKey().Verify(theirHandshake.Bytes(), exchangeResp.GetSignature())
if err != nil {
return err
}
@ -340,7 +340,7 @@ func selectBest(myPrefs, theirPrefs string) (string, error) {
// getOrConstructPeer attempts to fetch a peer from a peerstore.
// if succeeds, verify ID and PubKey match.
// else, construct it.
func getOrConstructPeer(peers peer.Peerstore, rpk ci.PubKey) (*peer.Peer, error) {
func getOrConstructPeer(peers peer.Peerstore, rpk ci.PubKey) (peer.Peer, error) {
rid, err := peer.IDFromPubKey(rpk)
if err != nil {

View File

@ -18,8 +18,8 @@ type SecurePipe struct {
Duplex
insecure Duplex
local *peer.Peer
remote *peer.Peer
local peer.Peer
remote peer.Peer
peers peer.Peerstore
params params
@ -33,7 +33,7 @@ type params struct {
}
// NewSecurePipe constructs a pipe with channels of a given buffer size.
func NewSecurePipe(ctx context.Context, bufsize int, local *peer.Peer,
func NewSecurePipe(ctx context.Context, bufsize int, local peer.Peer,
peers peer.Peerstore, insecure Duplex) (*SecurePipe, error) {
ctx, cancel := context.WithCancel(ctx)
@ -60,12 +60,12 @@ func NewSecurePipe(ctx context.Context, bufsize int, local *peer.Peer,
}
// LocalPeer retrieves the local peer.
func (s *SecurePipe) LocalPeer() *peer.Peer {
func (s *SecurePipe) LocalPeer() peer.Peer {
return s.local
}
// RemotePeer retrieves the local peer.
func (s *SecurePipe) RemotePeer() *peer.Peer {
func (s *SecurePipe) RemotePeer() peer.Peer {
return s.remote
}

View File

@ -26,14 +26,14 @@ const ResponseTimeout = time.Second * 10
type Diagnostics struct {
network net.Network
sender net.Sender
self *peer.Peer
self peer.Peer
diagLock sync.Mutex
diagMap map[string]time.Time
birth time.Time
}
func NewDiagnostics(self *peer.Peer, inet net.Network, sender net.Sender) *Diagnostics {
func NewDiagnostics(self peer.Peer, inet net.Network, sender net.Sender) *Diagnostics {
return &Diagnostics{
network: inet,
sender: sender,
@ -67,20 +67,21 @@ func (di *DiagInfo) Marshal() []byte {
return b
}
func (d *Diagnostics) getPeers() []*peer.Peer {
func (d *Diagnostics) getPeers() []peer.Peer {
return d.network.GetPeerList()
}
func (d *Diagnostics) getDiagInfo() *DiagInfo {
di := new(DiagInfo)
di.CodeVersion = "github.com/jbenet/go-ipfs"
di.ID = d.self.ID.Pretty()
di.ID = d.self.ID().Pretty()
di.LifeSpan = time.Since(d.birth)
di.Keys = nil // Currently no way to query datastore
di.BwIn, di.BwOut = d.network.GetBandwidthTotals()
for _, p := range d.getPeers() {
di.Connections = append(di.Connections, connDiagInfo{p.GetLatency(), p.ID.Pretty()})
d := connDiagInfo{p.GetLatency(), p.ID().Pretty()}
di.Connections = append(di.Connections, d)
}
return di
}
@ -116,7 +117,7 @@ func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*DiagInfo, error)
for _, p := range peers {
log.Debug("Sending getDiagnostic to: %s", p)
sends++
go func(p *peer.Peer) {
go func(p peer.Peer) {
data, err := d.getDiagnosticFromPeer(ctx, p, pmes)
if err != nil {
log.Error("GetDiagnostic error: %v", err)
@ -155,7 +156,7 @@ func AppendDiagnostics(data []byte, cur []*DiagInfo) []*DiagInfo {
}
// TODO: this method no longer needed.
func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p *peer.Peer, mes *Message) ([]byte, error) {
func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p peer.Peer, mes *Message) ([]byte, error) {
rpmes, err := d.sendRequest(ctx, p, mes)
if err != nil {
return nil, err
@ -169,7 +170,7 @@ func newMessage(diagID string) *Message {
return pmes
}
func (d *Diagnostics) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message) (*Message, error) {
func (d *Diagnostics) sendRequest(ctx context.Context, p peer.Peer, pmes *Message) (*Message, error) {
mes, err := msg.FromObject(p, pmes)
if err != nil {
@ -197,7 +198,7 @@ func (d *Diagnostics) sendRequest(ctx context.Context, p *peer.Peer, pmes *Messa
return rpmes, nil
}
func (d *Diagnostics) handleDiagnostic(p *peer.Peer, pmes *Message) (*Message, error) {
func (d *Diagnostics) handleDiagnostic(p peer.Peer, pmes *Message) (*Message, error) {
log.Debug("HandleDiagnostic from %s for id = %s", p, pmes.GetDiagID())
resp := newMessage(pmes.GetDiagID())
d.diagLock.Lock()
@ -220,7 +221,7 @@ func (d *Diagnostics) handleDiagnostic(p *peer.Peer, pmes *Message) (*Message, e
for _, p := range d.getPeers() {
log.Debug("Sending diagnostic request to peer: %s", p)
sendcount++
go func(p *peer.Peer) {
go func(p peer.Peer) {
out, err := d.getDiagnosticFromPeer(ctx, p, pmes)
if err != nil {
log.Error("getDiagnostic error: %v", err)
@ -267,7 +268,7 @@ func (d *Diagnostics) HandleMessage(ctx context.Context, mes msg.NetMessage) msg
// Print out diagnostic
log.Info("[peer: %s] Got message from [%s]\n",
d.self.ID.Pretty(), mPeer.ID.Pretty())
d.self.ID().Pretty(), mPeer.ID().Pretty())
// dispatch handler.
rpmes, err := d.handleDiagnostic(mPeer, pmes)

View File

@ -20,7 +20,7 @@ var log = u.Logger("bitswap")
// NetMessageSession initializes a BitSwap session that communicates over the
// provided NetMessage service
func NetMessageSession(parent context.Context, p *peer.Peer,
func NetMessageSession(parent context.Context, p peer.Peer,
net inet.Network, srv inet.Service, directory bsnet.Routing,
d ds.Datastore, nice bool) exchange.Interface {
@ -83,7 +83,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
message.AppendWanted(k)
for peerToQuery := range peersToQuery {
log.Debug("bitswap got peersToQuery: %s", peerToQuery)
go func(p *peer.Peer) {
go func(p peer.Peer) {
log.Debug("bitswap dialing peer: %s", p)
err := bs.sender.DialPeer(p)
@ -131,8 +131,8 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
}
// TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage) {
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) (
peer.Peer, bsmsg.BitSwapMessage) {
log.Debug("ReceiveMessage from %v", p.Key())
if p == nil {
@ -181,7 +181,7 @@ func (bs *bitswap) ReceiveError(err error) {
// send strives to ensure that accounting is always performed when a message is
// sent
func (bs *bitswap) send(ctx context.Context, p *peer.Peer, m bsmsg.BitSwapMessage) {
func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage) {
bs.sender.SendMessage(ctx, p, m)
go bs.strategy.MessageSent(p, m)
}

View File

@ -44,7 +44,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) {
g := NewSessionGenerator(net, rs)
block := blocks.NewBlock([]byte("block"))
rs.Announce(&peer.Peer{}, block.Key()) // but not on network
rs.Announce(peer.WithIDString("testing"), block.Key()) // but not on network
solo := g.Next()
@ -263,7 +263,7 @@ func (g *SessionGenerator) Instances(n int) []instance {
}
type instance struct {
peer *peer.Peer
peer peer.Peer
exchange exchange.Interface
blockstore bstore.Blockstore
}
@ -274,7 +274,7 @@ type instance struct {
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
// just a much better idea.
func session(net tn.Network, rs mock.RoutingServer, id peer.ID) instance {
p := &peer.Peer{ID: id}
p := peer.WithID(id)
adapter := net.Adapter(p)
htc := rs.Client(p)

View File

@ -19,7 +19,7 @@ type BitSwapMessage interface {
type Exportable interface {
ToProto() *PBMessage
ToNet(p *peer.Peer) (nm.NetMessage, error)
ToNet(p peer.Peer) (nm.NetMessage, error)
}
// message wraps a proto message for convenience
@ -82,6 +82,6 @@ func (m *message) ToProto() *PBMessage {
return pb
}
func (m *message) ToNet(p *peer.Peer) (nm.NetMessage, error) {
func (m *message) ToNet(p peer.Peer) (nm.NetMessage, error) {
return nm.FromObject(p, m.ToProto())
}

View File

@ -88,7 +88,7 @@ func TestCopyProtoByValue(t *testing.T) {
func TestToNetMethodSetsPeer(t *testing.T) {
m := New()
p := &peer.Peer{ID: []byte("X")}
p := peer.WithIDString("X")
netmsg, err := m.ToNet(p)
if err != nil {
t.Fatal(err)
@ -106,7 +106,8 @@ func TestToNetFromNetPreservesWantList(t *testing.T) {
original.AppendWanted(u.Key("T"))
original.AppendWanted(u.Key("F"))
netmsg, err := original.ToNet(&peer.Peer{ID: []byte("X")})
p := peer.WithIDString("X")
netmsg, err := original.ToNet(p)
if err != nil {
t.Fatal(err)
}
@ -136,7 +137,7 @@ func TestToAndFromNetMessage(t *testing.T) {
original.AppendBlock(*blocks.NewBlock([]byte("F")))
original.AppendBlock(*blocks.NewBlock([]byte("M")))
p := &peer.Peer{ID: []byte("X")}
p := peer.WithIDString("X")
netmsg, err := original.ToNet(p)
if err != nil {
t.Fatal(err)

View File

@ -12,18 +12,18 @@ import (
type Adapter interface {
// DialPeer ensures there is a connection to peer.
DialPeer(*peer.Peer) error
DialPeer(peer.Peer) error
// SendMessage sends a BitSwap message to a peer.
SendMessage(
context.Context,
*peer.Peer,
peer.Peer,
bsmsg.BitSwapMessage) error
// SendRequest sends a BitSwap message to a peer and waits for a response.
SendRequest(
context.Context,
*peer.Peer,
peer.Peer,
bsmsg.BitSwapMessage) (incoming bsmsg.BitSwapMessage, err error)
// SetDelegate registers the Reciver to handle messages received from the
@ -33,8 +33,8 @@ type Adapter interface {
type Receiver interface {
ReceiveMessage(
ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) (
destination *peer.Peer, outgoing bsmsg.BitSwapMessage)
ctx context.Context, sender peer.Peer, incoming bsmsg.BitSwapMessage) (
destination peer.Peer, outgoing bsmsg.BitSwapMessage)
ReceiveError(error)
}
@ -42,7 +42,7 @@ type Receiver interface {
// TODO rename -> Router?
type Routing interface {
// FindProvidersAsync returns a channel of providers for the given key
FindProvidersAsync(context.Context, u.Key, int) <-chan *peer.Peer
FindProvidersAsync(context.Context, u.Key, int) <-chan peer.Peer
// Provide provides the key to the network
Provide(context.Context, u.Key) error

View File

@ -60,13 +60,13 @@ func (adapter *impl) HandleMessage(
return outgoing
}
func (adapter *impl) DialPeer(p *peer.Peer) error {
func (adapter *impl) DialPeer(p peer.Peer) error {
return adapter.net.DialPeer(p)
}
func (adapter *impl) SendMessage(
ctx context.Context,
p *peer.Peer,
p peer.Peer,
outgoing bsmsg.BitSwapMessage) error {
nmsg, err := outgoing.ToNet(p)
@ -78,7 +78,7 @@ func (adapter *impl) SendMessage(
func (adapter *impl) SendRequest(
ctx context.Context,
p *peer.Peer,
p peer.Peer,
outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) {
outgoingMsg, err := outgoing.ToNet(p)

View File

@ -8,25 +8,25 @@ import (
type Strategy interface {
// Returns a slice of Peers with whom the local node has active sessions
Peers() []*peer.Peer
Peers() []peer.Peer
// BlockIsWantedByPeer returns true if peer wants the block given by this
// key
BlockIsWantedByPeer(u.Key, *peer.Peer) bool
BlockIsWantedByPeer(u.Key, peer.Peer) bool
// ShouldSendTo(Peer) decides whether to send data to this Peer
ShouldSendBlockToPeer(u.Key, *peer.Peer) bool
ShouldSendBlockToPeer(u.Key, peer.Peer) bool
// Seed initializes the decider to a deterministic state
Seed(int64)
// MessageReceived records receipt of message for accounting purposes
MessageReceived(*peer.Peer, bsmsg.BitSwapMessage) error
MessageReceived(peer.Peer, bsmsg.BitSwapMessage) error
// MessageSent records sending of message for accounting purposes
MessageSent(*peer.Peer, bsmsg.BitSwapMessage) error
MessageSent(peer.Peer, bsmsg.BitSwapMessage) error
NumBytesSentTo(*peer.Peer) uint64
NumBytesSentTo(peer.Peer) uint64
NumBytesReceivedFrom(*peer.Peer) uint64
NumBytesReceivedFrom(peer.Peer) uint64
}

View File

@ -12,7 +12,7 @@ import (
// access/lookups.
type keySet map[u.Key]struct{}
func newLedger(p *peer.Peer, strategy strategyFunc) *ledger {
func newLedger(p peer.Peer, strategy strategyFunc) *ledger {
return &ledger{
wantList: keySet{},
Strategy: strategy,
@ -25,7 +25,7 @@ type ledger struct {
lock sync.RWMutex
// Partner is the remote Peer.
Partner *peer.Peer
Partner peer.Peer
// Accounting tracks bytes sent and recieved.
Accounting debtRatio

View File

@ -37,20 +37,20 @@ type ledgerMap map[peerKey]*ledger
type peerKey u.Key
// Peers returns a list of peers
func (s *strategist) Peers() []*peer.Peer {
response := make([]*peer.Peer, 0)
func (s *strategist) Peers() []peer.Peer {
response := make([]peer.Peer, 0)
for _, ledger := range s.ledgerMap {
response = append(response, ledger.Partner)
}
return response
}
func (s *strategist) BlockIsWantedByPeer(k u.Key, p *peer.Peer) bool {
func (s *strategist) BlockIsWantedByPeer(k u.Key, p peer.Peer) bool {
ledger := s.ledger(p)
return ledger.WantListContains(k)
}
func (s *strategist) ShouldSendBlockToPeer(k u.Key, p *peer.Peer) bool {
func (s *strategist) ShouldSendBlockToPeer(k u.Key, p peer.Peer) bool {
ledger := s.ledger(p)
return ledger.ShouldSend()
}
@ -59,7 +59,7 @@ func (s *strategist) Seed(int64) {
// TODO
}
func (s *strategist) MessageReceived(p *peer.Peer, m bsmsg.BitSwapMessage) error {
func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
// TODO find a more elegant way to handle this check
if p == nil {
return errors.New("Strategy received nil peer")
@ -84,7 +84,7 @@ func (s *strategist) MessageReceived(p *peer.Peer, m bsmsg.BitSwapMessage) error
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically
func (s *strategist) MessageSent(p *peer.Peer, m bsmsg.BitSwapMessage) error {
func (s *strategist) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
l := s.ledger(p)
for _, block := range m.Blocks() {
l.SentBytes(len(block.Data))
@ -95,16 +95,16 @@ func (s *strategist) MessageSent(p *peer.Peer, m bsmsg.BitSwapMessage) error {
return nil
}
func (s *strategist) NumBytesSentTo(p *peer.Peer) uint64 {
func (s *strategist) NumBytesSentTo(p peer.Peer) uint64 {
return s.ledger(p).Accounting.BytesSent
}
func (s *strategist) NumBytesReceivedFrom(p *peer.Peer) uint64 {
func (s *strategist) NumBytesReceivedFrom(p peer.Peer) uint64 {
return s.ledger(p).Accounting.BytesRecv
}
// ledger lazily instantiates a ledger
func (s *strategist) ledger(p *peer.Peer) *ledger {
func (s *strategist) ledger(p peer.Peer) *ledger {
l, ok := s.ledgerMap[peerKey(p.Key())]
if !ok {
l = newLedger(p, s.strategyFunc)

View File

@ -10,13 +10,13 @@ import (
)
type peerAndStrategist struct {
*peer.Peer
peer.Peer
Strategy
}
func newPeerAndStrategist(idStr string) peerAndStrategist {
return peerAndStrategist{
Peer: &peer.Peer{ID: peer.ID(idStr)},
Peer: peer.WithIDString(idStr),
Strategy: New(true),
}
}
@ -93,7 +93,7 @@ func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {
}
}
func peerIsPartner(p *peer.Peer, s Strategy) bool {
func peerIsPartner(p peer.Peer, s Strategy) bool {
for _, partner := range s.Peers() {
if partner.Key() == p.Key() {
return true

View File

@ -13,20 +13,20 @@ import (
)
type Network interface {
Adapter(*peer.Peer) bsnet.Adapter
Adapter(peer.Peer) bsnet.Adapter
HasPeer(*peer.Peer) bool
HasPeer(peer.Peer) bool
SendMessage(
ctx context.Context,
from *peer.Peer,
to *peer.Peer,
from peer.Peer,
to peer.Peer,
message bsmsg.BitSwapMessage) error
SendRequest(
ctx context.Context,
from *peer.Peer,
to *peer.Peer,
from peer.Peer,
to peer.Peer,
message bsmsg.BitSwapMessage) (
incoming bsmsg.BitSwapMessage, err error)
}
@ -43,7 +43,7 @@ type network struct {
clients map[util.Key]bsnet.Receiver
}
func (n *network) Adapter(p *peer.Peer) bsnet.Adapter {
func (n *network) Adapter(p peer.Peer) bsnet.Adapter {
client := &networkClient{
local: p,
network: n,
@ -52,7 +52,7 @@ func (n *network) Adapter(p *peer.Peer) bsnet.Adapter {
return client
}
func (n *network) HasPeer(p *peer.Peer) bool {
func (n *network) HasPeer(p peer.Peer) bool {
_, found := n.clients[p.Key()]
return found
}
@ -61,8 +61,8 @@ func (n *network) HasPeer(p *peer.Peer) bool {
// TODO what does the network layer do with errors received from services?
func (n *network) SendMessage(
ctx context.Context,
from *peer.Peer,
to *peer.Peer,
from peer.Peer,
to peer.Peer,
message bsmsg.BitSwapMessage) error {
receiver, ok := n.clients[to.Key()]
@ -79,7 +79,7 @@ func (n *network) SendMessage(
}
func (n *network) deliver(
r bsnet.Receiver, from *peer.Peer, message bsmsg.BitSwapMessage) error {
r bsnet.Receiver, from peer.Peer, message bsmsg.BitSwapMessage) error {
if message == nil || from == nil {
return errors.New("Invalid input")
}
@ -107,8 +107,8 @@ var NoResponse = errors.New("No response received from the receiver")
// TODO
func (n *network) SendRequest(
ctx context.Context,
from *peer.Peer,
to *peer.Peer,
from peer.Peer,
to peer.Peer,
message bsmsg.BitSwapMessage) (
incoming bsmsg.BitSwapMessage, err error) {
@ -130,7 +130,7 @@ func (n *network) SendRequest(
}
// TODO test when receiver doesn't immediately respond to the initiator of the request
if !bytes.Equal(nextPeer.ID, from.ID) {
if !bytes.Equal(nextPeer.ID(), from.ID()) {
go func() {
nextReceiver, ok := n.clients[nextPeer.Key()]
if !ok {
@ -144,26 +144,26 @@ func (n *network) SendRequest(
}
type networkClient struct {
local *peer.Peer
local peer.Peer
bsnet.Receiver
network Network
}
func (nc *networkClient) SendMessage(
ctx context.Context,
to *peer.Peer,
to peer.Peer,
message bsmsg.BitSwapMessage) error {
return nc.network.SendMessage(ctx, nc.local, to, message)
}
func (nc *networkClient) SendRequest(
ctx context.Context,
to *peer.Peer,
to peer.Peer,
message bsmsg.BitSwapMessage) (incoming bsmsg.BitSwapMessage, err error) {
return nc.network.SendRequest(ctx, nc.local, to, message)
}
func (nc *networkClient) DialPeer(p *peer.Peer) error {
func (nc *networkClient) DialPeer(p peer.Peer) error {
// no need to do anything because dialing isn't a thing in this test net.
if !nc.network.HasPeer(p) {
return fmt.Errorf("Peer not in network: %s", p)

View File

@ -18,15 +18,15 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
t.Log("Get two network adapters")
initiator := net.Adapter(&peer.Peer{ID: []byte("initiator")})
recipient := net.Adapter(&peer.Peer{ID: idOfRecipient})
initiator := net.Adapter(peer.WithIDString("initiator"))
recipient := net.Adapter(peer.WithID(idOfRecipient))
expectedStr := "response from recipient"
recipient.SetDelegate(lambda(func(
ctx context.Context,
from *peer.Peer,
from peer.Peer,
incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage) {
peer.Peer, bsmsg.BitSwapMessage) {
t.Log("Recipient received a message from the network")
@ -43,7 +43,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
message := bsmsg.New()
message.AppendBlock(*blocks.NewBlock([]byte("data")))
response, err := initiator.SendRequest(
context.Background(), &peer.Peer{ID: idOfRecipient}, message)
context.Background(), peer.WithID(idOfRecipient), message)
if err != nil {
t.Fatal(err)
}
@ -61,8 +61,8 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
net := VirtualNetwork()
idOfResponder := []byte("responder")
waiter := net.Adapter(&peer.Peer{ID: []byte("waiter")})
responder := net.Adapter(&peer.Peer{ID: idOfResponder})
waiter := net.Adapter(peer.WithIDString("waiter"))
responder := net.Adapter(peer.WithID(idOfResponder))
var wg sync.WaitGroup
@ -72,9 +72,9 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
responder.SetDelegate(lambda(func(
ctx context.Context,
fromWaiter *peer.Peer,
fromWaiter peer.Peer,
msgFromWaiter bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage) {
peer.Peer, bsmsg.BitSwapMessage) {
msgToWaiter := bsmsg.New()
msgToWaiter.AppendBlock(*blocks.NewBlock([]byte(expectedStr)))
@ -84,9 +84,9 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
waiter.SetDelegate(lambda(func(
ctx context.Context,
fromResponder *peer.Peer,
fromResponder peer.Peer,
msgFromResponder bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage) {
peer.Peer, bsmsg.BitSwapMessage) {
// TODO assert that this came from the correct peer and that the message contents are as expected
ok := false
@ -107,7 +107,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
messageSentAsync := bsmsg.New()
messageSentAsync.AppendBlock(*blocks.NewBlock([]byte("data")))
errSending := waiter.SendMessage(
context.Background(), &peer.Peer{ID: idOfResponder}, messageSentAsync)
context.Background(), peer.WithID(idOfResponder), messageSentAsync)
if errSending != nil {
t.Fatal(errSending)
}
@ -115,8 +115,8 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
wg.Wait() // until waiter delegate function is executed
}
type receiverFunc func(ctx context.Context, p *peer.Peer,
incoming bsmsg.BitSwapMessage) (*peer.Peer, bsmsg.BitSwapMessage)
type receiverFunc func(ctx context.Context, p peer.Peer,
incoming bsmsg.BitSwapMessage) (peer.Peer, bsmsg.BitSwapMessage)
// lambda returns a Receiver instance given a receiver function
func lambda(f receiverFunc) bsnet.Receiver {
@ -126,13 +126,13 @@ func lambda(f receiverFunc) bsnet.Receiver {
}
type lambdaImpl struct {
f func(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage)
f func(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) (
peer.Peer, bsmsg.BitSwapMessage)
}
func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
p *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage) {
p peer.Peer, incoming bsmsg.BitSwapMessage) (
peer.Peer, bsmsg.BitSwapMessage) {
return lam.f(ctx, p, incoming)
}

View File

@ -209,7 +209,7 @@ func TestFastRepublish(t *testing.T) {
node, mnt := setupIpnsTest(t, nil)
h, err := node.Identity.PrivKey.GetPublic().Hash()
h, err := node.Identity.PrivKey().GetPublic().Hash()
if err != nil {
t.Fatal(err)
}

View File

@ -35,7 +35,7 @@ type FileSystem struct {
// NewFileSystem constructs new fs using given core.IpfsNode instance.
func NewIpns(ipfs *core.IpfsNode, ipfspath string) (*FileSystem, error) {
root, err := CreateRoot(ipfs, []ci.PrivKey{ipfs.Identity.PrivKey}, ipfspath)
root, err := CreateRoot(ipfs, []ci.PrivKey{ipfs.Identity.PrivKey()}, ipfspath)
if err != nil {
return nil, err
}

View File

@ -11,9 +11,7 @@ import (
)
func TestRoutingResolve(t *testing.T) {
local := &peer.Peer{
ID: []byte("testID"),
}
local := peer.WithIDString("testID")
lds := ds.NewMapDatastore()
d := mock.NewMockRouter(local, lds)

View File

@ -42,8 +42,8 @@ func newMsgioPipe(size int) *msgioPipe {
// singleConn represents a single connection to another Peer (IPFS Node).
type singleConn struct {
local *peer.Peer
remote *peer.Peer
local peer.Peer
remote peer.Peer
maconn manet.Conn
msgio *msgioPipe
@ -51,7 +51,7 @@ type singleConn struct {
}
// newConn constructs a new connection
func newSingleConn(ctx context.Context, local, remote *peer.Peer,
func newSingleConn(ctx context.Context, local, remote peer.Peer,
maconn manet.Conn) (Conn, error) {
conn := &singleConn{
@ -117,12 +117,12 @@ func (c *singleConn) RemoteMultiaddr() ma.Multiaddr {
}
// LocalPeer is the Peer on this side
func (c *singleConn) LocalPeer() *peer.Peer {
func (c *singleConn) LocalPeer() peer.Peer {
return c.local
}
// RemotePeer is the Peer on the remote side
func (c *singleConn) RemotePeer() *peer.Peer {
func (c *singleConn) RemotePeer() peer.Peer {
return c.remote
}

View File

@ -12,7 +12,7 @@ import (
// Dial connects to a particular peer, over a given network
// Example: d.Dial(ctx, "udp", peer)
func (d *Dialer) Dial(ctx context.Context, network string, remote *peer.Peer) (Conn, error) {
func (d *Dialer) Dial(ctx context.Context, network string, remote peer.Peer) (Conn, error) {
laddr := d.LocalPeer.NetAddress(network)
if laddr == nil {
return nil, fmt.Errorf("No local address for network %s", network)

View File

@ -10,7 +10,7 @@ import (
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
func setupPeer(addr string) (*peer.Peer, error) {
func setupPeer(addr string) (peer.Peer, error) {
tcp, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
@ -21,14 +21,10 @@ func setupPeer(addr string) (*peer.Peer, error) {
return nil, err
}
id, err := peer.IDFromPubKey(pk)
p, err := peer.WithKeyPair(sk, pk)
if err != nil {
return nil, err
}
p := &peer.Peer{ID: id}
p.PrivKey = sk
p.PubKey = pk
p.AddAddress(tcp)
return p, nil
}

View File

@ -23,13 +23,13 @@ type Conn interface {
LocalMultiaddr() ma.Multiaddr
// LocalPeer is the Peer on this side
LocalPeer() *peer.Peer
LocalPeer() peer.Peer
// RemoteMultiaddr is the Multiaddr on the remote side
RemoteMultiaddr() ma.Multiaddr
// RemotePeer is the Peer on the remote side
RemotePeer() *peer.Peer
RemotePeer() peer.Peer
// In returns a readable message channel
In() <-chan []byte
@ -47,7 +47,7 @@ type Conn interface {
type Dialer struct {
// LocalPeer is the identity of the local Peer.
LocalPeer *peer.Peer
LocalPeer peer.Peer
// Peerstore is the set of peers we know about locally. The Dialer needs it
// because when an incoming connection is identified, we should reuse the
@ -65,7 +65,7 @@ type Listener interface {
Multiaddr() ma.Multiaddr
// LocalPeer is the identity of the local Peer.
LocalPeer() *peer.Peer
LocalPeer() peer.Peer
// Peerstore is the set of peers we know about locally. The Listener needs it
// because when an incoming connection is identified, we should reuse the

View File

@ -23,7 +23,7 @@ type listener struct {
maddr ma.Multiaddr
// LocalPeer is the identity of the local Peer.
local *peer.Peer
local peer.Peer
// Peerstore is the set of peers we know about locally
peers peer.Peerstore
@ -105,7 +105,7 @@ func (l *listener) Multiaddr() ma.Multiaddr {
}
// LocalPeer is the identity of the local Peer.
func (l *listener) LocalPeer() *peer.Peer {
func (l *listener) LocalPeer() peer.Peer {
return l.local
}
@ -117,7 +117,7 @@ func (l *listener) Peerstore() peer.Peerstore {
}
// Listen listens on the particular multiaddr, with given peer and peerstore.
func Listen(ctx context.Context, addr ma.Multiaddr, local *peer.Peer, peers peer.Peerstore) (Listener, error) {
func Listen(ctx context.Context, addr ma.Multiaddr, local peer.Peer, peers peer.Peerstore) (Listener, error) {
ml, err := manet.Listen(addr)
if err != nil {

View File

@ -27,8 +27,8 @@ type MultiConn struct {
// this string is: /addr1/peer1/addr2/peer2 (peers ordered lexicographically)
conns map[string]Conn
local *peer.Peer
remote *peer.Peer
local peer.Peer
remote peer.Peer
// fan-in/fan-out
duplex Duplex
@ -39,7 +39,7 @@ type MultiConn struct {
}
// NewMultiConn constructs a new connection
func NewMultiConn(ctx context.Context, local, remote *peer.Peer, conns []Conn) (*MultiConn, error) {
func NewMultiConn(ctx context.Context, local, remote peer.Peer, conns []Conn) (*MultiConn, error) {
c := &MultiConn{
local: local,
@ -72,6 +72,10 @@ func (c *MultiConn) Add(conns ...Conn) {
log.Error("%s", c2)
c.Unlock() // ok to unlock (to log). panicing.
log.Error("%s", c)
log.Error("c.LocalPeer: %s %#v", c.LocalPeer(), c.LocalPeer())
log.Error("c2.LocalPeer: %s %#v", c2.LocalPeer(), c2.LocalPeer())
log.Error("c.RemotePeer: %s %#v", c.RemotePeer(), c.RemotePeer())
log.Error("c2.RemotePeer: %s %#v", c2.RemotePeer(), c2.RemotePeer())
c.Lock() // gotta relock to avoid lock panic from deferring.
panic("connection addresses mismatch")
}
@ -269,12 +273,12 @@ func (c *MultiConn) RemoteMultiaddr() ma.Multiaddr {
}
// LocalPeer is the Peer on this side
func (c *MultiConn) LocalPeer() *peer.Peer {
func (c *MultiConn) LocalPeer() peer.Peer {
return c.local
}
// RemotePeer is the Peer on the remote side
func (c *MultiConn) RemotePeer() *peer.Peer {
func (c *MultiConn) RemotePeer() peer.Peer {
return c.remote
}

View File

@ -97,7 +97,7 @@ func setupMultiConns(t *testing.T, ctx context.Context) (a, b *MultiConn) {
p2ps := peer.NewPeerstore()
// listeners
listen := func(addr ma.Multiaddr, p *peer.Peer, ps peer.Peerstore) Listener {
listen := func(addr ma.Multiaddr, p peer.Peer, ps peer.Peerstore) Listener {
l, err := Listen(ctx, addr, p, ps)
if err != nil {
t.Fatal(err)
@ -106,14 +106,14 @@ func setupMultiConns(t *testing.T, ctx context.Context) (a, b *MultiConn) {
}
log.Info("Setting up listeners")
p1l := listen(p1.Addresses[0], p1, p1ps)
p2l := listen(p2.Addresses[0], p2, p2ps)
p1l := listen(p1.Addresses()[0], p1, p1ps)
p2l := listen(p2.Addresses()[0], p2, p2ps)
// dialers
p1d := &Dialer{Peerstore: p1ps, LocalPeer: p1}
p2d := &Dialer{Peerstore: p2ps, LocalPeer: p2}
dial := func(d *Dialer, dst *peer.Peer) <-chan Conn {
dial := func(d *Dialer, dst peer.Peer) <-chan Conn {
cc := make(chan Conn)
go func() {
c, err := d.Dial(ctx, "tcp", dst)

View File

@ -79,6 +79,8 @@ func (c *secureConn) secureHandshake(peers peer.Peerstore) error {
// perhaps return an error. TBD.
log.Error("secureConn peer mismatch. %v != %v", insecureSC.remote, c.secure.RemotePeer())
log.Error("insecureSC.remote: %s %#v", insecureSC.remote, insecureSC.remote)
log.Error("c.secure.LocalPeer: %s %#v", c.secure.RemotePeer(), c.secure.RemotePeer())
panic("secureConn peer mismatch. consructed incorrectly?")
}
@ -114,12 +116,12 @@ func (c *secureConn) RemoteMultiaddr() ma.Multiaddr {
}
// LocalPeer is the Peer on this side
func (c *secureConn) LocalPeer() *peer.Peer {
func (c *secureConn) LocalPeer() peer.Peer {
return c.insecure.LocalPeer()
}
// RemotePeer is the Peer on the remote side
func (c *secureConn) RemotePeer() *peer.Peer {
func (c *secureConn) RemotePeer() peer.Peer {
return c.insecure.RemotePeer()
}

View File

@ -15,19 +15,19 @@ type Network interface {
// TODO: for now, only listen on addrs in local peer when initializing.
// DialPeer attempts to establish a connection to a given peer
DialPeer(*peer.Peer) error
DialPeer(peer.Peer) error
// ClosePeer connection to peer
ClosePeer(*peer.Peer) error
ClosePeer(peer.Peer) error
// IsConnected returns whether a connection to given peer exists.
IsConnected(*peer.Peer) (bool, error)
IsConnected(peer.Peer) (bool, error)
// GetProtocols returns the protocols registered in the network.
GetProtocols() *mux.ProtocolMap
// GetPeerList returns the list of peers currently connected in this network.
GetPeerList() []*peer.Peer
GetPeerList() []peer.Peer
// GetBandwidthTotals returns the total number of bytes passed through
// the network since it was instantiated

View File

@ -8,12 +8,12 @@ import (
// NetMessage is the interface for the message
type NetMessage interface {
Peer() *peer.Peer
Peer() peer.Peer
Data() []byte
}
// New is the interface for constructing a new message.
func New(p *peer.Peer, data []byte) NetMessage {
func New(p peer.Peer, data []byte) NetMessage {
return &message{peer: p, data: data}
}
@ -21,13 +21,13 @@ func New(p *peer.Peer, data []byte) NetMessage {
// particular Peer.
type message struct {
// To or from, depending on direction.
peer *peer.Peer
peer peer.Peer
// Opaque data
data []byte
}
func (m *message) Peer() *peer.Peer {
func (m *message) Peer() peer.Peer {
return m.peer
}
@ -36,7 +36,7 @@ func (m *message) Data() []byte {
}
// FromObject creates a message from a protobuf-marshallable message.
func FromObject(p *peer.Peer, data proto.Message) (NetMessage, error) {
func FromObject(p peer.Peer, data proto.Message) (NetMessage, error) {
bytes, err := proto.Marshal(data)
if err != nil {
return nil, err

View File

@ -21,14 +21,14 @@ func (t *TestProtocol) GetPipe() *msg.Pipe {
return t.Pipe
}
func newPeer(t *testing.T, id string) *peer.Peer {
func newPeer(t *testing.T, id string) peer.Peer {
mh, err := mh.FromHexString(id)
if err != nil {
t.Error(err)
return nil
}
return &peer.Peer{ID: peer.ID(mh)}
return peer.WithID(peer.ID(mh))
}
func testMsg(t *testing.T, m msg.NetMessage, data []byte) {

View File

@ -15,7 +15,7 @@ import (
type IpfsNetwork struct {
// local peer
local *peer.Peer
local peer.Peer
// protocol multiplexing
muxer *mux.Muxer
@ -29,7 +29,7 @@ type IpfsNetwork struct {
}
// NewIpfsNetwork is the structure that implements the network interface
func NewIpfsNetwork(ctx context.Context, local *peer.Peer,
func NewIpfsNetwork(ctx context.Context, local peer.Peer,
peers peer.Peerstore, pmap *mux.ProtocolMap) (*IpfsNetwork, error) {
ctx, cancel := context.WithCancel(ctx)
@ -63,19 +63,19 @@ func NewIpfsNetwork(ctx context.Context, local *peer.Peer,
// func (n *IpfsNetwork) Listen(*ma.Muliaddr) error {}
// DialPeer attempts to establish a connection to a given peer
func (n *IpfsNetwork) DialPeer(p *peer.Peer) error {
func (n *IpfsNetwork) DialPeer(p peer.Peer) error {
_, err := n.swarm.Dial(p)
return err
}
// ClosePeer connection to peer
func (n *IpfsNetwork) ClosePeer(p *peer.Peer) error {
func (n *IpfsNetwork) ClosePeer(p peer.Peer) error {
return n.swarm.CloseConnection(p)
}
// IsConnected returns whether a connection to given peer exists.
func (n *IpfsNetwork) IsConnected(p *peer.Peer) (bool, error) {
return n.swarm.GetConnection(p.ID) != nil, nil
func (n *IpfsNetwork) IsConnected(p peer.Peer) (bool, error) {
return n.swarm.GetConnection(p.ID()) != nil, nil
}
// GetProtocols returns the protocols registered in the network.
@ -108,10 +108,12 @@ func (n *IpfsNetwork) Close() error {
return nil
}
func (n *IpfsNetwork) GetPeerList() []*peer.Peer {
// GetPeerList returns the networks list of connected peers
func (n *IpfsNetwork) GetPeerList() []peer.Peer {
return n.swarm.GetPeerList()
}
// GetBandwidthTotals returns the total amount of bandwidth transferred
func (n *IpfsNetwork) GetBandwidthTotals() (in uint64, out uint64) {
return n.muxer.GetBandwidthTotals()
}

View File

@ -133,7 +133,7 @@ func (s *service) SendMessage(ctx context.Context, m msg.NetMessage) error {
func (s *service) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) {
// create a request
r, err := NewRequest(m.Peer().ID)
r, err := NewRequest(m.Peer().ID())
if err != nil {
return nil, err
}
@ -227,7 +227,7 @@ func (s *service) handleIncomingMessage(ctx context.Context, m msg.NetMessage) {
log.Error("RequestID should identify a response here.")
}
key := RequestKey(m.Peer().ID, RequestID(rid))
key := RequestKey(m.Peer().ID(), RequestID(rid))
s.RequestsLock.RLock()
r, found := s.Requests[key]
s.RequestsLock.RUnlock()

View File

@ -25,14 +25,14 @@ func (t *ReverseHandler) HandleMessage(ctx context.Context, m msg.NetMessage) ms
return msg.New(m.Peer(), d)
}
func newPeer(t *testing.T, id string) *peer.Peer {
func newPeer(t *testing.T, id string) peer.Peer {
mh, err := mh.FromHexString(id)
if err != nil {
t.Error(err)
return nil
}
return &peer.Peer{ID: peer.ID(mh)}
return peer.WithID(peer.ID(mh))
}
func TestServiceHandler(t *testing.T) {

View File

@ -14,11 +14,11 @@ import (
func (s *Swarm) listen() error {
hasErr := false
retErr := &ListenErr{
Errors: make([]error, len(s.local.Addresses)),
Errors: make([]error, len(s.local.Addresses())),
}
// listen on every address
for i, addr := range s.local.Addresses {
for i, addr := range s.local.Addresses() {
err := s.connListen(addr)
if err != nil {
hasErr = true

View File

@ -24,10 +24,10 @@ func TestSimultOpen(t *testing.T) {
// connect everyone
{
var wg sync.WaitGroup
connect := func(s *Swarm, dst *peer.Peer) {
connect := func(s *Swarm, dst peer.Peer) {
// copy for other peer
cp := &peer.Peer{ID: dst.ID}
cp.AddAddress(dst.Addresses[0])
cp := peer.WithID(dst.ID())
cp.AddAddress(dst.Addresses()[0])
if _, err := s.Dial(cp); err != nil {
t.Fatal("error swarm dialing to peer", err)

View File

@ -45,7 +45,7 @@ func (e *ListenErr) Error() string {
type Swarm struct {
// local is the peer this swarm represents
local *peer.Peer
local peer.Peer
// peers is a collection of peers for swarm to use
peers peer.Peerstore
@ -69,7 +69,7 @@ type Swarm struct {
}
// NewSwarm constructs a Swarm, with a Chan.
func NewSwarm(ctx context.Context, local *peer.Peer, ps peer.Peerstore) (*Swarm, error) {
func NewSwarm(ctx context.Context, local peer.Peer, ps peer.Peerstore) (*Swarm, error) {
s := &Swarm{
Pipe: msg.NewPipe(10),
conns: conn.MultiConnMap{},
@ -104,13 +104,13 @@ func (s *Swarm) close() error {
// etc. to achive connection.
//
// For now, Dial uses only TCP. This will be extended.
func (s *Swarm) Dial(peer *peer.Peer) (conn.Conn, error) {
if peer.ID.Equal(s.local.ID) {
func (s *Swarm) Dial(peer peer.Peer) (conn.Conn, error) {
if peer.ID().Equal(s.local.ID()) {
return nil, errors.New("Attempted connection to self!")
}
// check if we already have an open connection first
c := s.GetConnection(peer.ID)
c := s.GetConnection(peer.ID())
if c != nil {
return c, nil
}
@ -167,14 +167,14 @@ func (s *Swarm) Connections() []conn.Conn {
}
// CloseConnection removes a given peer from swarm + closes the connection
func (s *Swarm) CloseConnection(p *peer.Peer) error {
c := s.GetConnection(p.ID)
func (s *Swarm) CloseConnection(p peer.Peer) error {
c := s.GetConnection(p.ID())
if c == nil {
return u.ErrNotFound
}
s.connsLock.Lock()
delete(s.conns, u.Key(p.ID))
delete(s.conns, u.Key(p.ID()))
s.connsLock.Unlock()
return c.Close()
@ -190,8 +190,8 @@ func (s *Swarm) GetErrChan() chan error {
}
// GetPeerList returns a copy of the set of peers swarm is connected to.
func (s *Swarm) GetPeerList() []*peer.Peer {
var out []*peer.Peer
func (s *Swarm) GetPeerList() []peer.Peer {
var out []peer.Peer
s.connsLock.RLock()
for _, p := range s.conns {
out = append(out, p.RemotePeer())

View File

@ -32,7 +32,7 @@ func pong(ctx context.Context, swarm *Swarm) {
}
}
func setupPeer(t *testing.T, addr string) *peer.Peer {
func setupPeer(t *testing.T, addr string) peer.Peer {
tcp, err := ma.NewMultiaddr(addr)
if err != nil {
t.Fatal(err)
@ -43,19 +43,15 @@ func setupPeer(t *testing.T, addr string) *peer.Peer {
t.Fatal(err)
}
id, err := peer.IDFromPubKey(pk)
p, err := peer.WithKeyPair(sk, pk)
if err != nil {
t.Fatal(err)
}
p := &peer.Peer{ID: id}
p.PrivKey = sk
p.PubKey = pk
p.AddAddress(tcp)
return p
}
func makeSwarms(ctx context.Context, t *testing.T, addrs []string) ([]*Swarm, []*peer.Peer) {
func makeSwarms(ctx context.Context, t *testing.T, addrs []string) ([]*Swarm, []peer.Peer) {
swarms := []*Swarm{}
for _, addr := range addrs {
@ -68,7 +64,7 @@ func makeSwarms(ctx context.Context, t *testing.T, addrs []string) ([]*Swarm, []
swarms = append(swarms, swarm)
}
peers := make([]*peer.Peer, len(swarms))
peers := make([]peer.Peer, len(swarms))
for i, s := range swarms {
peers[i] = s.local
}
@ -85,14 +81,14 @@ func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) {
// connect everyone
{
var wg sync.WaitGroup
connect := func(s *Swarm, dst *peer.Peer) {
connect := func(s *Swarm, dst peer.Peer) {
// copy for other peer
cp, err := s.peers.Get(dst.ID)
cp, err := s.peers.Get(dst.ID())
if err != nil {
cp = &peer.Peer{ID: dst.ID}
t.Fatal(err)
}
cp.AddAddress(dst.Addresses[0])
cp.AddAddress(dst.Addresses()[0])
log.Info("SWARM TEST: %s dialing %s", s.local, dst)
if _, err := s.Dial(cp); err != nil {

View File

@ -49,17 +49,48 @@ func IDFromPubKey(pk ic.PubKey) (ID, error) {
return ID(hash), nil
}
// Map maps Key (string) : *Peer (slices are not comparable).
type Map map[u.Key]*Peer
// Map maps Key (string) : *peer (slices are not comparable).
type Map map[u.Key]Peer
// Peer represents the identity information of an IPFS Node, including
// ID, and relevant Addresses.
type Peer struct {
ID ID
Addresses []ma.Multiaddr
type Peer interface {
// ID returns the peer's ID
ID() ID
PrivKey ic.PrivKey
PubKey ic.PubKey
// Key returns the ID as a Key (string) for maps.
Key() u.Key
// Addresses returns the peer's multiaddrs
Addresses() []ma.Multiaddr
// AddAddress adds the given Multiaddr address to Peer's addresses.
AddAddress(a ma.Multiaddr)
// NetAddress returns the first Multiaddr found for a given network.
NetAddress(n string) ma.Multiaddr
// Priv/PubKey returns the peer's Private Key
PrivKey() ic.PrivKey
PubKey() ic.PubKey
// LoadAndVerifyKeyPair unmarshalls, loads a private/public key pair.
// Error if (a) unmarshalling fails, or (b) pubkey does not match id.
LoadAndVerifyKeyPair(marshalled []byte) error
VerifyAndSetPrivKey(sk ic.PrivKey) error
VerifyAndSetPubKey(pk ic.PubKey) error
// Get/SetLatency manipulate the current latency measurement.
GetLatency() (out time.Duration)
SetLatency(laten time.Duration)
}
type peer struct {
id ID
addresses []ma.Multiaddr
privKey ic.PrivKey
pubKey ic.PubKey
latency time.Duration
@ -67,34 +98,56 @@ type Peer struct {
}
// String prints out the peer.
func (p *Peer) String() string {
return "[Peer " + p.ID.String()[:12] + "]"
func (p *peer) String() string {
return "[Peer " + p.id.String()[:12] + "]"
}
// Key returns the ID as a Key (string) for maps.
func (p *Peer) Key() u.Key {
return u.Key(p.ID)
func (p *peer) Key() u.Key {
return u.Key(p.id)
}
// ID returns the peer's ID
func (p *peer) ID() ID {
return p.id
}
// PrivKey returns the peer's Private Key
func (p *peer) PrivKey() ic.PrivKey {
return p.privKey
}
// PubKey returns the peer's Private Key
func (p *peer) PubKey() ic.PubKey {
return p.pubKey
}
// Addresses returns the peer's multiaddrs
func (p *peer) Addresses() []ma.Multiaddr {
cp := make([]ma.Multiaddr, len(p.addresses))
copy(cp, p.addresses)
return cp
}
// AddAddress adds the given Multiaddr address to Peer's addresses.
func (p *Peer) AddAddress(a ma.Multiaddr) {
func (p *peer) AddAddress(a ma.Multiaddr) {
p.Lock()
defer p.Unlock()
for _, addr := range p.Addresses {
for _, addr := range p.addresses {
if addr.Equal(a) {
return
}
}
p.Addresses = append(p.Addresses, a)
p.addresses = append(p.addresses, a)
}
// NetAddress returns the first Multiaddr found for a given network.
func (p *Peer) NetAddress(n string) ma.Multiaddr {
func (p *peer) NetAddress(n string) ma.Multiaddr {
p.RLock()
defer p.RUnlock()
for _, a := range p.Addresses {
for _, a := range p.addresses {
for _, p := range a.Protocols() {
if p.Name == n {
return a
@ -105,7 +158,7 @@ func (p *Peer) NetAddress(n string) ma.Multiaddr {
}
// GetLatency retrieves the current latency measurement.
func (p *Peer) GetLatency() (out time.Duration) {
func (p *peer) GetLatency() (out time.Duration) {
p.RLock()
out = p.latency
p.RUnlock()
@ -116,7 +169,7 @@ func (p *Peer) GetLatency() (out time.Duration) {
// TODO: Instead of just keeping a single number,
// keep a running average over the last hour or so
// Yep, should be EWMA or something. (-jbenet)
func (p *Peer) SetLatency(laten time.Duration) {
func (p *peer) SetLatency(laten time.Duration) {
p.Lock()
if p.latency == 0 {
p.latency = laten
@ -128,61 +181,98 @@ func (p *Peer) SetLatency(laten time.Duration) {
// LoadAndVerifyKeyPair unmarshalls, loads a private/public key pair.
// Error if (a) unmarshalling fails, or (b) pubkey does not match id.
func (p *Peer) LoadAndVerifyKeyPair(marshalled []byte) error {
func (p *peer) LoadAndVerifyKeyPair(marshalled []byte) error {
sk, err := ic.UnmarshalPrivateKey(marshalled)
if err != nil {
return fmt.Errorf("Failed to unmarshal private key: %v", err)
}
return p.VerifyAndSetPrivKey(sk)
}
// VerifyAndSetPrivKey sets private key, given its pubkey matches the peer.ID
func (p *peer) VerifyAndSetPrivKey(sk ic.PrivKey) error {
// construct and assign pubkey. ensure it matches this peer
if err := p.VerifyAndSetPubKey(sk.GetPublic()); err != nil {
return err
}
// if we didn't have the priavte key, assign it
if p.PrivKey == nil {
p.PrivKey = sk
if p.privKey == nil {
p.privKey = sk
return nil
}
// if we already had the keys, check they're equal.
if p.PrivKey.Equals(sk) {
if p.privKey.Equals(sk) {
return nil // as expected. keep the old objects.
}
// keys not equal. invariant violated. this warrants a panic.
// these keys should be _the same_ because peer.ID = H(pk)
// this mismatch should never happen.
log.Error("%s had PrivKey: %v -- got %v", p, p.PrivKey, sk)
log.Error("%s had PrivKey: %v -- got %v", p, p.privKey, sk)
panic("invariant violated: unexpected key mismatch")
}
// VerifyAndSetPubKey sets public key, given it matches the peer.ID
func (p *Peer) VerifyAndSetPubKey(pk ic.PubKey) error {
func (p *peer) VerifyAndSetPubKey(pk ic.PubKey) error {
pkid, err := IDFromPubKey(pk)
if err != nil {
return fmt.Errorf("Failed to hash public key: %v", err)
}
if !p.ID.Equal(pkid) {
if !p.id.Equal(pkid) {
return fmt.Errorf("Public key does not match peer.ID.")
}
// if we didn't have the keys, assign them.
if p.PubKey == nil {
p.PubKey = pk
if p.pubKey == nil {
p.pubKey = pk
return nil
}
// if we already had the pubkey, check they're equal.
if p.PubKey.Equals(pk) {
if p.pubKey.Equals(pk) {
return nil // as expected. keep the old objects.
}
// keys not equal. invariant violated. this warrants a panic.
// these keys should be _the same_ because peer.ID = H(pk)
// this mismatch should never happen.
log.Error("%s had PubKey: %v -- got %v", p, p.PubKey, pk)
log.Error("%s had PubKey: %v -- got %v", p, p.pubKey, pk)
panic("invariant violated: unexpected key mismatch")
}
// WithKeyPair returns a Peer object with given keys.
func WithKeyPair(sk ic.PrivKey, pk ic.PubKey) (Peer, error) {
if sk == nil && pk == nil {
return nil, fmt.Errorf("PeerWithKeyPair nil keys")
}
pk2 := sk.GetPublic()
if pk == nil {
pk = pk2
} else if !pk.Equals(pk2) {
return nil, fmt.Errorf("key mismatch. pubkey is not privkey's pubkey")
}
pkid, err := IDFromPubKey(pk)
if err != nil {
return nil, fmt.Errorf("Failed to hash public key: %v", err)
}
return &peer{id: pkid, pubKey: pk, privKey: sk}, nil
}
// WithID constructs a peer with given ID.
func WithID(id ID) Peer {
return &peer{id: id}
}
// WithIDString constructs a peer with given ID (string).
func WithIDString(id string) Peer {
return WithID(ID(id))
}

View File

@ -27,12 +27,12 @@ func TestNetAddress(t *testing.T) {
return
}
p := Peer{ID: ID(mh)}
p := WithID(ID(mh))
p.AddAddress(tcp)
p.AddAddress(udp)
p.AddAddress(tcp)
if len(p.Addresses) == 3 {
if len(p.Addresses()) == 3 {
t.Error("added same address twice")
}

View File

@ -11,8 +11,8 @@ import (
// Peerstore provides a threadsafe collection for peers.
type Peerstore interface {
Get(ID) (*Peer, error)
Put(*Peer) error
Get(ID) (Peer, error)
Put(Peer) error
Delete(ID) error
All() (*Map, error)
}
@ -29,9 +29,13 @@ func NewPeerstore() Peerstore {
}
}
func (p *peerstore) Get(i ID) (*Peer, error) {
p.RLock()
defer p.RUnlock()
func (p *peerstore) Get(i ID) (Peer, error) {
p.Lock()
defer p.Unlock()
if i == nil {
panic("wat")
}
k := u.Key(i).DsKey()
val, err := p.peers.Get(k)
@ -43,7 +47,7 @@ func (p *peerstore) Get(i ID) (*Peer, error) {
// not found, construct it ourselves, add it to datastore, and return.
case ds.ErrNotFound:
peer := &Peer{ID: i}
peer := &peer{id: i}
if err := p.peers.Put(k, peer); err != nil {
return nil, err
}
@ -51,7 +55,7 @@ func (p *peerstore) Get(i ID) (*Peer, error) {
// no error, got it back fine
case nil:
peer, ok := val.(*Peer)
peer, ok := val.(*peer)
if !ok {
return nil, errors.New("stored value was not a Peer")
}
@ -59,11 +63,11 @@ func (p *peerstore) Get(i ID) (*Peer, error) {
}
}
func (p *peerstore) Put(peer *Peer) error {
func (p *peerstore) Put(peer Peer) error {
p.Lock()
defer p.Unlock()
k := u.Key(peer.ID).DsKey()
k := peer.Key().DsKey()
return p.peers.Put(k, peer)
}
@ -91,9 +95,9 @@ func (p *peerstore) All() (*Map, error) {
continue
}
pval, ok := val.(*Peer)
pval, ok := val.(*peer)
if ok {
(*ps)[u.Key(pval.ID)] = pval
(*ps)[pval.Key()] = pval
}
}
return ps, nil

View File

@ -7,13 +7,13 @@ import (
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
func setupPeer(id string, addr string) (*Peer, error) {
func setupPeer(id string, addr string) (Peer, error) {
tcp, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
}
p := &Peer{ID: ID(id)}
p := WithIDString(id)
p.AddAddress(tcp)
return p, nil
}

View File

@ -13,7 +13,7 @@ import (
// peerMetric tracks a peer and its distance to something else.
type peerMetric struct {
// the peer
peer *peer.Peer
peer peer.Peer
// big.Int for XOR metric
metric *big.Int
@ -64,11 +64,11 @@ func (pq *distancePQ) Len() int {
return len(pq.heap)
}
func (pq *distancePQ) Enqueue(p *peer.Peer) {
func (pq *distancePQ) Enqueue(p peer.Peer) {
pq.Lock()
defer pq.Unlock()
distance := ks.XORKeySpace.Key(p.ID).Distance(pq.from)
distance := ks.XORKeySpace.Key(p.ID()).Distance(pq.from)
heap.Push(&pq.heap, &peerMetric{
peer: p,
@ -76,7 +76,7 @@ func (pq *distancePQ) Enqueue(p *peer.Peer) {
})
}
func (pq *distancePQ) Dequeue() *peer.Peer {
func (pq *distancePQ) Dequeue() peer.Peer {
pq.Lock()
defer pq.Unlock()

View File

@ -11,8 +11,8 @@ type PeerQueue interface {
Len() int
// Enqueue adds this node to the queue.
Enqueue(*peer.Peer)
Enqueue(peer.Peer)
// Dequeue retrieves the highest (smallest int) priority node
Dequeue() *peer.Peer
Dequeue() peer.Peer
}

View File

@ -12,8 +12,8 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
func newPeer(id string) *peer.Peer {
return &peer.Peer{ID: peer.ID(id)}
func newPeer(id string) peer.Peer {
return peer.WithIDString(id)
}
func TestQueue(t *testing.T) {
@ -66,10 +66,10 @@ func TestQueue(t *testing.T) {
}
func newPeerTime(t time.Time) *peer.Peer {
func newPeerTime(t time.Time) peer.Peer {
s := fmt.Sprintf("hmmm time: %v", t)
h := u.Hash([]byte(s))
return &peer.Peer{ID: peer.ID(h)}
return peer.WithID(peer.ID(h))
}
func TestSyncQueue(t *testing.T) {

View File

@ -9,8 +9,8 @@ import (
// ChanQueue makes any PeerQueue synchronizable through channels.
type ChanQueue struct {
Queue PeerQueue
EnqChan chan<- *peer.Peer
DeqChan <-chan *peer.Peer
EnqChan chan<- peer.Peer
DeqChan <-chan peer.Peer
}
// NewChanQueue creates a ChanQueue by wrapping pq.
@ -23,8 +23,8 @@ func NewChanQueue(ctx context.Context, pq PeerQueue) *ChanQueue {
func (cq *ChanQueue) process(ctx context.Context) {
// construct the channels here to be able to use them bidirectionally
enqChan := make(chan *peer.Peer, 10)
deqChan := make(chan *peer.Peer, 10)
enqChan := make(chan peer.Peer, 10)
deqChan := make(chan peer.Peer, 10)
cq.EnqChan = enqChan
cq.DeqChan = deqChan
@ -32,8 +32,8 @@ func (cq *ChanQueue) process(ctx context.Context) {
go func() {
defer close(deqChan)
var next *peer.Peer
var item *peer.Peer
var next peer.Peer
var item peer.Peer
var more bool
for {

View File

@ -17,20 +17,21 @@ func newMessage(typ Message_MessageType, key string, level int) *Message {
return m
}
func peerToPBPeer(p *peer.Peer) *Message_Peer {
func peerToPBPeer(p peer.Peer) *Message_Peer {
pbp := new(Message_Peer)
if len(p.Addresses) == 0 || p.Addresses[0] == nil {
addrs := p.Addresses()
if len(addrs) == 0 || addrs[0] == nil {
pbp.Addr = proto.String("")
} else {
addr := p.Addresses[0].String()
addr := addrs[0].String()
pbp.Addr = &addr
}
pid := string(p.ID)
pid := string(p.ID())
pbp.Id = &pid
return pbp
}
func peersToPBPeers(peers []*peer.Peer) []*Message_Peer {
func peersToPBPeers(peers []peer.Peer) []*Message_Peer {
pbpeers := make([]*Message_Peer, len(peers))
for i, p := range peers {
pbpeers[i] = peerToPBPeer(p)

View File

@ -39,7 +39,7 @@ type IpfsDHT struct {
sender inet.Sender
// Local peer (yourself)
self *peer.Peer
self peer.Peer
// Other peers
peerstore peer.Peerstore
@ -60,7 +60,7 @@ type IpfsDHT struct {
}
// NewDHT creates a new DHT object with the given peer as the 'local' host
func NewDHT(ctx context.Context, p *peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
dht := new(IpfsDHT)
dht.network = net
dht.sender = sender
@ -69,12 +69,12 @@ func NewDHT(ctx context.Context, p *peer.Peer, ps peer.Peerstore, net inet.Netwo
dht.peerstore = ps
dht.ctx = ctx
dht.providers = NewProviderManager(p.ID)
dht.providers = NewProviderManager(p.ID())
dht.routingTables = make([]*kb.RoutingTable, 3)
dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*1000)
dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*1000)
dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour)
dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000)
dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000)
dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Hour)
dht.birth = time.Now()
if doPinging {
@ -84,7 +84,7 @@ func NewDHT(ctx context.Context, p *peer.Peer, ps peer.Peerstore, net inet.Netwo
}
// Connect to a new peer at the given address, ping and add to the routing table
func (dht *IpfsDHT) Connect(ctx context.Context, npeer *peer.Peer) (*peer.Peer, error) {
func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) (peer.Peer, error) {
log.Debug("Connect to new peer: %s", npeer)
// TODO(jbenet,whyrusleeping)
@ -175,7 +175,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
// sendRequest sends out a request using dht.sender, but also makes sure to
// measure the RTT for latency measurements.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message) (*Message, error) {
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message) (*Message, error) {
mes, err := msg.FromObject(p, pmes)
if err != nil {
@ -208,7 +208,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message
}
// putValueToNetwork stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p *peer.Peer,
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
key string, value []byte) error {
pmes := newMessage(Message_PUT_VALUE, string(key), 0)
@ -224,12 +224,12 @@ func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p *peer.Peer,
return nil
}
func (dht *IpfsDHT) putProvider(ctx context.Context, p *peer.Peer, key string) error {
func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) error {
pmes := newMessage(Message_ADD_PROVIDER, string(key), 0)
// add self as the provider
pmes.ProviderPeers = peersToPBPeers([]*peer.Peer{dht.self})
pmes.ProviderPeers = peersToPBPeers([]peer.Peer{dht.self})
rpmes, err := dht.sendRequest(ctx, p, pmes)
if err != nil {
@ -244,8 +244,8 @@ func (dht *IpfsDHT) putProvider(ctx context.Context, p *peer.Peer, key string) e
return nil
}
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
key u.Key, level int) ([]byte, []*peer.Peer, error) {
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,
key u.Key, level int) ([]byte, []peer.Peer, error) {
pmes, err := dht.getValueSingle(ctx, p, key, level)
if err != nil {
@ -270,7 +270,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
}
// Perhaps we were given closer peers
var peers []*peer.Peer
var peers []peer.Peer
for _, pb := range pmes.GetCloserPeers() {
pr, err := dht.addPeer(pb)
if err != nil {
@ -289,8 +289,8 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
return nil, nil, u.ErrNotFound
}
func (dht *IpfsDHT) addPeer(pb *Message_Peer) (*peer.Peer, error) {
if peer.ID(pb.GetId()).Equal(dht.self.ID) {
func (dht *IpfsDHT) addPeer(pb *Message_Peer) (peer.Peer, error) {
if peer.ID(pb.GetId()).Equal(dht.self.ID()) {
return nil, errors.New("cannot add self as peer")
}
@ -310,7 +310,7 @@ func (dht *IpfsDHT) addPeer(pb *Message_Peer) (*peer.Peer, error) {
}
// getValueSingle simply performs the get value RPC with the given parameters
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p *peer.Peer,
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
key u.Key, level int) (*Message, error) {
pmes := newMessage(Message_GET_VALUE, string(key), level)
@ -369,7 +369,7 @@ func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
// Update signals to all routingTables to Update their last-seen status
// on the given peer.
func (dht *IpfsDHT) Update(p *peer.Peer) {
func (dht *IpfsDHT) Update(p peer.Peer) {
log.Debug("updating peer: %s latency = %f\n", p, p.GetLatency().Seconds())
removedCount := 0
for _, route := range dht.routingTables {
@ -390,7 +390,7 @@ func (dht *IpfsDHT) Update(p *peer.Peer) {
}
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
func (dht *IpfsDHT) FindLocal(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.Peer, *kb.RoutingTable) {
for _, table := range dht.routingTables {
p := table.Find(id)
if p != nil {
@ -400,7 +400,7 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
return nil, nil
}
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p *peer.Peer, id peer.ID, level int) (*Message, error) {
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*Message, error) {
pmes := newMessage(Message_FIND_NODE, string(id), level)
return dht.sendRequest(ctx, p, pmes)
}
@ -411,14 +411,14 @@ func (dht *IpfsDHT) printTables() {
}
}
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p *peer.Peer, key u.Key, level int) (*Message, error) {
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*Message, error) {
pmes := newMessage(Message_GET_PROVIDERS, string(key), level)
return dht.sendRequest(ctx, p, pmes)
}
// TODO: Could be done async
func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []*peer.Peer {
var provArr []*peer.Peer
func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []peer.Peer {
var provArr []peer.Peer
for _, prov := range peers {
p, err := dht.peerFromInfo(prov)
if err != nil {
@ -429,7 +429,7 @@ func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []*peer.Peer
log.Debug("%s adding provider: %s for %s", dht.self, p, key)
// Dont add outselves to the list
if p.ID.Equal(dht.self.ID) {
if p.ID().Equal(dht.self.ID()) {
continue
}
@ -441,7 +441,7 @@ func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []*peer.Peer
}
// nearestPeersToQuery returns the routing tables closest peers.
func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []*peer.Peer {
func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []peer.Peer {
level := pmes.GetClusterLevel()
cluster := dht.routingTables[level]
@ -451,7 +451,7 @@ func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []*peer.Peer {
}
// betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []*peer.Peer {
func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []peer.Peer {
closer := dht.nearestPeersToQuery(pmes, count)
// no node? nil
@ -461,17 +461,17 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []*peer.Peer {
// == to self? thats bad
for _, p := range closer {
if p.ID.Equal(dht.self.ID) {
if p.ID().Equal(dht.self.ID()) {
log.Error("Attempted to return self! this shouldnt happen...")
return nil
}
}
var filtered []*peer.Peer
var filtered []peer.Peer
for _, p := range closer {
// must all be closer than self
key := u.Key(pmes.GetKey())
if !kb.Closer(dht.self.ID, p.ID, key) {
if !kb.Closer(dht.self.ID(), p.ID(), key) {
filtered = append(filtered, p)
}
}
@ -480,7 +480,7 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []*peer.Peer {
return filtered
}
func (dht *IpfsDHT) getPeer(id peer.ID) (*peer.Peer, error) {
func (dht *IpfsDHT) getPeer(id peer.ID) (peer.Peer, error) {
p, err := dht.peerstore.Get(id)
if err != nil {
err = fmt.Errorf("Failed to get peer from peerstore: %s", err)
@ -490,12 +490,12 @@ func (dht *IpfsDHT) getPeer(id peer.ID) (*peer.Peer, error) {
return p, nil
}
func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) {
func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (peer.Peer, error) {
id := peer.ID(pbp.GetId())
// continue if it's ourselves
if id.Equal(dht.self.ID) {
if id.Equal(dht.self.ID()) {
return nil, errors.New("found self")
}
@ -512,7 +512,7 @@ func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) {
return p, nil
}
func (dht *IpfsDHT) ensureConnectedToPeer(pbp *Message_Peer) (*peer.Peer, error) {
func (dht *IpfsDHT) ensureConnectedToPeer(pbp *Message_Peer) (peer.Peer, error) {
p, err := dht.peerFromInfo(pbp)
if err != nil {
return nil, err

View File

@ -20,7 +20,7 @@ import (
"time"
)
func setupDHT(ctx context.Context, t *testing.T, p *peer.Peer) *IpfsDHT {
func setupDHT(ctx context.Context, t *testing.T, p peer.Peer) *IpfsDHT {
peerstore := peer.NewPeerstore()
dhts := netservice.NewService(nil) // nil handler for now, need to patch it
@ -40,7 +40,7 @@ func setupDHT(ctx context.Context, t *testing.T, p *peer.Peer) *IpfsDHT {
return d
}
func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []*peer.Peer, []*IpfsDHT) {
func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer.Peer, []*IpfsDHT) {
var addrs []ma.Multiaddr
for i := 0; i < n; i++ {
a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 5000+i))
@ -50,7 +50,7 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []*pee
addrs = append(addrs, a)
}
var peers []*peer.Peer
var peers []peer.Peer
for i := 0; i < n; i++ {
p := makePeer(addrs[i])
peers = append(peers, p)
@ -64,21 +64,16 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []*pee
return addrs, peers, dhts
}
func makePeer(addr ma.Multiaddr) *peer.Peer {
p := new(peer.Peer)
p.AddAddress(addr)
func makePeer(addr ma.Multiaddr) peer.Peer {
sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512)
if err != nil {
panic(err)
}
p.PrivKey = sk
p.PubKey = pk
id, err := peer.IDFromPubKey(pk)
p, err := peer.WithKeyPair(sk, pk)
if err != nil {
panic(err)
}
p.ID = id
p.AddAddress(addr)
return p
}
@ -289,7 +284,7 @@ func TestProvidesAsync(t *testing.T) {
provs := dhts[0].FindProvidersAsync(ctxT, u.Key("hello"), 5)
select {
case p := <-provs:
if !p.ID.Equal(dhts[3].self.ID) {
if !p.ID().Equal(dhts[3].self.ID()) {
t.Fatalf("got a provider, but not the right one. %s", p)
}
case <-ctxT.Done():
@ -379,7 +374,7 @@ func TestFindPeer(t *testing.T) {
}
ctxT, _ := context.WithTimeout(ctx, time.Second)
p, err := dhts[0].FindPeer(ctxT, peers[2].ID)
p, err := dhts[0].FindPeer(ctxT, peers[2].ID())
if err != nil {
t.Fatal(err)
}
@ -388,7 +383,7 @@ func TestFindPeer(t *testing.T) {
t.Fatal("Failed to find peer.")
}
if !p.ID.Equal(peers[2].ID) {
if !p.ID().Equal(peers[2].ID()) {
t.Fatal("Didnt find expected peer.")
}
}

View File

@ -32,12 +32,13 @@ func (di *diagInfo) Marshal() []byte {
func (dht *IpfsDHT) getDiagInfo() *diagInfo {
di := new(diagInfo)
di.CodeVersion = "github.com/jbenet/go-ipfs"
di.ID = dht.self.ID
di.ID = dht.self.ID()
di.LifeSpan = time.Since(dht.birth)
di.Keys = nil // Currently no way to query datastore
for _, p := range dht.routingTables[0].ListPeers() {
di.Connections = append(di.Connections, connDiagInfo{p.GetLatency(), p.ID})
d := connDiagInfo{p.GetLatency(), p.ID()}
di.Connections = append(di.Connections, d)
}
return di
}

View File

@ -9,7 +9,6 @@ import (
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
msg "github.com/jbenet/go-ipfs/net/message"
mux "github.com/jbenet/go-ipfs/net/mux"
peer "github.com/jbenet/go-ipfs/peer"
@ -66,17 +65,17 @@ type fauxNet struct {
}
// DialPeer attempts to establish a connection to a given peer
func (f *fauxNet) DialPeer(*peer.Peer) error {
func (f *fauxNet) DialPeer(peer.Peer) error {
return nil
}
// ClosePeer connection to peer
func (f *fauxNet) ClosePeer(*peer.Peer) error {
func (f *fauxNet) ClosePeer(peer.Peer) error {
return nil
}
// IsConnected returns whether a connection to given peer exists.
func (f *fauxNet) IsConnected(*peer.Peer) (bool, error) {
func (f *fauxNet) IsConnected(peer.Peer) (bool, error) {
return true, nil
}
@ -88,7 +87,7 @@ func (f *fauxNet) SendMessage(msg.NetMessage) error {
return nil
}
func (f *fauxNet) GetPeerList() []*peer.Peer {
func (f *fauxNet) GetPeerList() []peer.Peer {
return nil
}
@ -107,11 +106,10 @@ func TestGetFailures(t *testing.T) {
fs := &fauxSender{}
peerstore := peer.NewPeerstore()
local := new(peer.Peer)
local.ID = peer.ID("test_peer")
local := peer.WithIDString("test_peer")
d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore())
other := &peer.Peer{ID: peer.ID("other_peer")}
other := peer.WithIDString("other_peer")
d.Update(other)
// This one should time out
@ -189,11 +187,10 @@ func TestGetFailures(t *testing.T) {
}
// TODO: Maybe put these in some sort of "ipfs_testutil" package
func _randPeer() *peer.Peer {
p := new(peer.Peer)
p.ID = make(peer.ID, 16)
p.Addresses = []ma.Multiaddr{nil}
crand.Read(p.ID)
func _randPeer() peer.Peer {
id := make(peer.ID, 16)
crand.Read(id)
p := peer.WithID(id)
return p
}
@ -204,13 +201,13 @@ func TestNotFound(t *testing.T) {
fn := &fauxNet{}
fs := &fauxSender{}
local := new(peer.Peer)
local.ID = peer.ID("test_peer")
local := peer.WithIDString("test_peer")
peerstore := peer.NewPeerstore()
peerstore.Put(local)
d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore())
var ps []*peer.Peer
var ps []peer.Peer
for i := 0; i < 5; i++ {
ps = append(ps, _randPeer())
d.Update(ps[i])
@ -228,7 +225,7 @@ func TestNotFound(t *testing.T) {
case Message_GET_VALUE:
resp := &Message{Type: pmes.Type}
peers := []*peer.Peer{}
peers := []peer.Peer{}
for i := 0; i < 7; i++ {
peers = append(peers, _randPeer())
}
@ -270,13 +267,13 @@ func TestLessThanKResponses(t *testing.T) {
u.Debug = false
fn := &fauxNet{}
fs := &fauxSender{}
local := peer.WithIDString("test_peer")
peerstore := peer.NewPeerstore()
local := new(peer.Peer)
local.ID = peer.ID("test_peer")
peerstore.Put(local)
d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore())
var ps []*peer.Peer
var ps []peer.Peer
for i := 0; i < 5; i++ {
ps = append(ps, _randPeer())
d.Update(ps[i])
@ -295,7 +292,7 @@ func TestLessThanKResponses(t *testing.T) {
case Message_GET_VALUE:
resp := &Message{
Type: pmes.Type,
CloserPeers: peersToPBPeers([]*peer.Peer{other}),
CloserPeers: peersToPBPeers([]peer.Peer{other}),
}
mes, err := msg.FromObject(mes.Peer(), resp)

View File

@ -14,7 +14,7 @@ import (
var CloserPeerCount = 4
// dhthandler specifies the signature of functions that handle DHT messages.
type dhtHandler func(*peer.Peer, *Message) (*Message, error)
type dhtHandler func(peer.Peer, *Message) (*Message, error)
func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler {
switch t {
@ -35,7 +35,7 @@ func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler {
}
}
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error) {
func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *Message) (*Message, error) {
log.Debug("%s handleGetValue for key: %s\n", dht.self, pmes.GetKey())
// setup response
@ -93,7 +93,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error
}
// Store a value in this peer local storage
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *Message) (*Message, error) {
func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *Message) (*Message, error) {
dht.dslock.Lock()
defer dht.dslock.Unlock()
dskey := u.Key(pmes.GetKey()).DsKey()
@ -102,18 +102,18 @@ func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *Message) (*Message, error
return pmes, err
}
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *Message) (*Message, error) {
func (dht *IpfsDHT) handlePing(p peer.Peer, pmes *Message) (*Message, error) {
log.Debug("%s Responding to ping from %s!\n", dht.self, p)
return pmes, nil
}
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error) {
func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *Message) (*Message, error) {
resp := newMessage(pmes.GetType(), "", pmes.GetClusterLevel())
var closest []*peer.Peer
var closest []peer.Peer
// if looking for self... special case where we send it on CloserPeers.
if peer.ID(pmes.GetKey()).Equal(dht.self.ID) {
closest = []*peer.Peer{dht.self}
if peer.ID(pmes.GetKey()).Equal(dht.self.ID()) {
closest = []peer.Peer{dht.self}
} else {
closest = dht.betterPeersToQuery(pmes, CloserPeerCount)
}
@ -123,9 +123,9 @@ func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error
return resp, nil
}
var withAddresses []*peer.Peer
var withAddresses []peer.Peer
for _, p := range closest {
if len(p.Addresses) > 0 {
if len(p.Addresses()) > 0 {
withAddresses = append(withAddresses, p)
}
}
@ -137,7 +137,7 @@ func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error
return resp, nil
}
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *Message) (*Message, error) {
func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *Message) (*Message, error) {
resp := newMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
// check if we have this value, to add ourselves as provider.
@ -171,10 +171,10 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *Message) (*Message, e
type providerInfo struct {
Creation time.Time
Value *peer.Peer
Value peer.Peer
}
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *Message) (*Message, error) {
func (dht *IpfsDHT) handleAddProvider(p peer.Peer, pmes *Message) (*Message, error) {
key := u.Key(pmes.GetKey())
log.Debug("%s adding %s as a provider for '%s'\n", dht.self, p, peer.ID(key))
@ -182,7 +182,7 @@ func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *Message) (*Message, er
// add provider should use the address given in the message
for _, pb := range pmes.GetProviderPeers() {
pid := peer.ID(pb.GetId())
if pid.Equal(p.ID) {
if pid.Equal(p.ID()) {
addr, err := pb.Address()
if err != nil {

View File

@ -20,12 +20,12 @@ type ProviderManager struct {
type addProv struct {
k u.Key
val *peer.Peer
val peer.Peer
}
type getProv struct {
k u.Key
resp chan []*peer.Peer
resp chan []peer.Peer
}
func NewProviderManager(local peer.ID) *ProviderManager {
@ -45,7 +45,7 @@ func (pm *ProviderManager) run() {
for {
select {
case np := <-pm.newprovs:
if np.val.ID.Equal(pm.lpeer) {
if np.val.ID().Equal(pm.lpeer) {
pm.local[np.k] = struct{}{}
}
pi := new(providerInfo)
@ -54,7 +54,7 @@ func (pm *ProviderManager) run() {
arr := pm.providers[np.k]
pm.providers[np.k] = append(arr, pi)
case gp := <-pm.getprovs:
var parr []*peer.Peer
var parr []peer.Peer
provs := pm.providers[gp.k]
for _, p := range provs {
parr = append(parr, p.Value)
@ -82,17 +82,17 @@ func (pm *ProviderManager) run() {
}
}
func (pm *ProviderManager) AddProvider(k u.Key, val *peer.Peer) {
func (pm *ProviderManager) AddProvider(k u.Key, val peer.Peer) {
pm.newprovs <- &addProv{
k: k,
val: val,
}
}
func (pm *ProviderManager) GetProviders(k u.Key) []*peer.Peer {
func (pm *ProviderManager) GetProviders(k u.Key) []peer.Peer {
gp := new(getProv)
gp.k = k
gp.resp = make(chan []*peer.Peer)
gp.resp = make(chan []peer.Peer)
pm.getprovs <- gp
return <-gp.resp
}

View File

@ -11,7 +11,7 @@ func TestProviderManager(t *testing.T) {
mid := peer.ID("testing")
p := NewProviderManager(mid)
a := u.Key("test")
p.AddProvider(a, &peer.Peer{})
p.AddProvider(a, peer.WithIDString("testingprovider"))
resp := p.GetProviders(a)
if len(resp) != 1 {
t.Fatal("Could not retrieve provider.")

View File

@ -27,9 +27,9 @@ type dhtQuery struct {
type dhtQueryResult struct {
value []byte // GetValue
peer *peer.Peer // FindPeer
providerPeers []*peer.Peer // GetProviders
closerPeers []*peer.Peer // *
peer peer.Peer // FindPeer
providerPeers []peer.Peer // GetProviders
closerPeers []peer.Peer // *
success bool
}
@ -47,10 +47,10 @@ func newQuery(k u.Key, f queryFunc) *dhtQuery {
// - the value
// - a list of peers potentially better able to serve the query
// - an error
type queryFunc func(context.Context, *peer.Peer) (*dhtQueryResult, error)
type queryFunc func(context.Context, peer.Peer) (*dhtQueryResult, error)
// Run runs the query at hand. pass in a list of peers to use first.
func (q *dhtQuery) Run(ctx context.Context, peers []*peer.Peer) (*dhtQueryResult, error) {
func (q *dhtQuery) Run(ctx context.Context, peers []peer.Peer) (*dhtQueryResult, error) {
runner := newQueryRunner(ctx, q)
return runner.Run(peers)
}
@ -100,7 +100,7 @@ func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
}
}
func (r *dhtQueryRunner) Run(peers []*peer.Peer) (*dhtQueryResult, error) {
func (r *dhtQueryRunner) Run(peers []peer.Peer) (*dhtQueryResult, error) {
log.Debug("Run query with %d peers.", len(peers))
if len(peers) == 0 {
log.Warning("Running query with no peers!")
@ -148,7 +148,7 @@ func (r *dhtQueryRunner) Run(peers []*peer.Peer) (*dhtQueryResult, error) {
return nil, err
}
func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) {
func (r *dhtQueryRunner) addPeerToQuery(next peer.Peer, benchmark peer.Peer) {
if next == nil {
// wtf why are peers nil?!?
log.Error("Query getting nil peers!!!\n")
@ -156,7 +156,7 @@ func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) {
}
// if new peer further away than whom we got it from, bother (loops)
if benchmark != nil && kb.Closer(benchmark.ID, next.ID, r.query.key) {
if benchmark != nil && kb.Closer(benchmark.ID(), next.ID(), r.query.key) {
return
}
@ -200,7 +200,7 @@ func (r *dhtQueryRunner) spawnWorkers() {
}
}
func (r *dhtQueryRunner) queryPeer(p *peer.Peer) {
func (r *dhtQueryRunner) queryPeer(p peer.Peer) {
log.Debug("spawned worker for: %v\n", p)
// make sure we rate limit concurrency.

View File

@ -23,13 +23,13 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error
return err
}
var peers []*peer.Peer
var peers []peer.Peer
for _, route := range dht.routingTables {
npeers := route.NearestPeers(kb.ConvertKey(key), KValue)
peers = append(peers, npeers...)
}
query := newQuery(key, func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) {
query := newQuery(key, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
log.Debug("%s PutValue qry part %v", dht.self, p)
err := dht.putValueToNetwork(ctx, p, string(key), value)
if err != nil {
@ -65,7 +65,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
}
// setup the Query
query := newQuery(key, func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) {
query := newQuery(key, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
val, peers, err := dht.getValueOrPeers(ctx, p, key, routeLevel)
if err != nil {
@ -117,8 +117,8 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
return nil
}
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan *peer.Peer {
peerOut := make(chan *peer.Peer, count)
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan peer.Peer {
peerOut := make(chan peer.Peer, count)
go func() {
ps := newPeerSet()
provs := dht.providers.GetProviders(key)
@ -136,7 +136,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue)
for _, pp := range peers {
wg.Add(1)
go func(p *peer.Peer) {
go func(p peer.Peer) {
defer wg.Done()
pmes, err := dht.findProvidersSingle(ctx, p, key, 0)
if err != nil {
@ -153,7 +153,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
}
//TODO: this function could also be done asynchronously
func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*Message_Peer, ps *peerSet, count int, out chan *peer.Peer) {
func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*Message_Peer, ps *peerSet, count int, out chan peer.Peer) {
for _, pbp := range peers {
// construct new peer
@ -173,7 +173,7 @@ func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*Message_Peer, ps *peerSet
// Find specific Peer
// FindPeer searches for a peer with given ID.
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error) {
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error) {
// Check if were already connected to them
p, _ := dht.FindLocal(id)
@ -186,7 +186,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error
if p == nil {
return nil, nil
}
if p.ID.Equal(id) {
if p.ID().Equal(id) {
return p, nil
}
@ -205,7 +205,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error
continue
}
if nxtPeer.ID.Equal(id) {
if nxtPeer.ID().Equal(id) {
return nxtPeer, nil
}
@ -214,7 +214,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error
return nil, u.ErrNotFound
}
func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Peer, error) {
func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (peer.Peer, error) {
// Check if were already connected to them
p, _ := dht.FindLocal(id)
@ -230,7 +230,7 @@ func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Pee
}
// setup query function
query := newQuery(u.Key(id), func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) {
query := newQuery(u.Key(id), func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel)
if err != nil {
log.Error("%s getPeer error: %v", dht.self, err)
@ -242,7 +242,7 @@ func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Pee
routeLevel++
}
nxtprs := make([]*peer.Peer, len(plist))
nxtprs := make([]peer.Peer, len(plist))
for i, fp := range plist {
nxtp, err := dht.peerFromInfo(fp)
if err != nil {
@ -250,7 +250,7 @@ func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Pee
continue
}
if nxtp.ID.Equal(id) {
if nxtp.ID().Equal(id) {
return &dhtQueryResult{peer: nxtp, success: true}, nil
}
@ -272,7 +272,7 @@ func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Pee
}
// Ping a peer, log the time it took
func (dht *IpfsDHT) Ping(ctx context.Context, p *peer.Peer) error {
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.Peer) error {
// Thoughts: maybe this should accept an ID and do a peer lookup?
log.Info("ping %s start", p)

View File

@ -52,15 +52,15 @@ func newPeerSet() *peerSet {
return ps
}
func (ps *peerSet) Add(p *peer.Peer) {
func (ps *peerSet) Add(p peer.Peer) {
ps.lk.Lock()
ps.ps[string(p.ID)] = true
ps.ps[string(p.ID())] = true
ps.lk.Unlock()
}
func (ps *peerSet) Contains(p *peer.Peer) bool {
func (ps *peerSet) Contains(p peer.Peer) bool {
ps.lk.RLock()
_, ok := ps.ps[string(p.ID)]
_, ok := ps.ps[string(p.ID())]
ps.lk.RUnlock()
return ok
}
@ -71,12 +71,12 @@ func (ps *peerSet) Size() int {
return len(ps.ps)
}
func (ps *peerSet) AddIfSmallerThan(p *peer.Peer, maxsize int) bool {
func (ps *peerSet) AddIfSmallerThan(p peer.Peer, maxsize int) bool {
var success bool
ps.lk.Lock()
if _, ok := ps.ps[string(p.ID)]; !ok && len(ps.ps) < maxsize {
if _, ok := ps.ps[string(p.ID())]; !ok && len(ps.ps) < maxsize {
success = true
ps.ps[string(p.ID)] = true
ps.ps[string(p.ID())] = true
}
ps.lk.Unlock()
return success

View File

@ -23,7 +23,7 @@ func (b *Bucket) find(id peer.ID) *list.Element {
b.lk.RLock()
defer b.lk.RUnlock()
for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(*peer.Peer).ID.Equal(id) {
if e.Value.(peer.Peer).ID().Equal(id) {
return e
}
}
@ -36,18 +36,18 @@ func (b *Bucket) moveToFront(e *list.Element) {
b.lk.Unlock()
}
func (b *Bucket) pushFront(p *peer.Peer) {
func (b *Bucket) pushFront(p peer.Peer) {
b.lk.Lock()
b.list.PushFront(p)
b.lk.Unlock()
}
func (b *Bucket) popBack() *peer.Peer {
func (b *Bucket) popBack() peer.Peer {
b.lk.Lock()
defer b.lk.Unlock()
last := b.list.Back()
b.list.Remove(last)
return last.Value.(*peer.Peer)
return last.Value.(peer.Peer)
}
func (b *Bucket) len() int {
@ -68,7 +68,7 @@ func (b *Bucket) Split(cpl int, target ID) *Bucket {
newbuck.list = out
e := b.list.Front()
for e != nil {
peerID := ConvertPeerID(e.Value.(*peer.Peer).ID)
peerID := ConvertPeerID(e.Value.(peer.Peer).ID())
peerCPL := commonPrefixLen(peerID, target)
if peerCPL > cpl {
cur := e

View File

@ -42,10 +42,10 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration) *Routing
// Update adds or moves the given peer to the front of its respective bucket
// If a peer gets removed from a bucket, it is returned
func (rt *RoutingTable) Update(p *peer.Peer) *peer.Peer {
func (rt *RoutingTable) Update(p peer.Peer) peer.Peer {
rt.tabLock.Lock()
defer rt.tabLock.Unlock()
peerID := ConvertPeerID(p.ID)
peerID := ConvertPeerID(p.ID())
cpl := commonPrefixLen(peerID, rt.local)
bucketID := cpl
@ -54,7 +54,7 @@ func (rt *RoutingTable) Update(p *peer.Peer) *peer.Peer {
}
bucket := rt.Buckets[bucketID]
e := bucket.find(p.ID)
e := bucket.find(p.ID())
if e == nil {
// New peer, add to bucket
if p.GetLatency() > rt.maxLatency {
@ -93,7 +93,7 @@ func (rt *RoutingTable) Update(p *peer.Peer) *peer.Peer {
// A helper struct to sort peers by their distance to the local node
type peerDistance struct {
p *peer.Peer
p peer.Peer
distance ID
}
@ -110,8 +110,8 @@ func (p peerSorterArr) Less(a, b int) bool {
func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr {
for e := peerList.Front(); e != nil; e = e.Next() {
p := e.Value.(*peer.Peer)
pID := ConvertPeerID(p.ID)
p := e.Value.(peer.Peer)
pID := ConvertPeerID(p.ID())
pd := peerDistance{
p: p,
distance: xor(target, pID),
@ -126,16 +126,16 @@ func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) pe
}
// Find a specific peer by ID or return nil
func (rt *RoutingTable) Find(id peer.ID) *peer.Peer {
func (rt *RoutingTable) Find(id peer.ID) peer.Peer {
srch := rt.NearestPeers(ConvertPeerID(id), 1)
if len(srch) == 0 || !srch[0].ID.Equal(id) {
if len(srch) == 0 || !srch[0].ID().Equal(id) {
return nil
}
return srch[0]
}
// NearestPeer returns a single peer that is nearest to the given ID
func (rt *RoutingTable) NearestPeer(id ID) *peer.Peer {
func (rt *RoutingTable) NearestPeer(id ID) peer.Peer {
peers := rt.NearestPeers(id, 1)
if len(peers) > 0 {
return peers[0]
@ -146,7 +146,7 @@ func (rt *RoutingTable) NearestPeer(id ID) *peer.Peer {
}
// NearestPeers returns a list of the 'count' closest peers to the given ID
func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer {
func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.Peer {
rt.tabLock.RLock()
defer rt.tabLock.RUnlock()
cpl := commonPrefixLen(id, rt.local)
@ -178,7 +178,7 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer {
// Sort by distance to local peer
sort.Sort(peerArr)
var out []*peer.Peer
var out []peer.Peer
for i := 0; i < count && i < peerArr.Len(); i++ {
out = append(out, peerArr[i].p)
}
@ -197,11 +197,11 @@ func (rt *RoutingTable) Size() int {
// ListPeers takes a RoutingTable and returns a list of all peers from all buckets in the table.
// NOTE: This is potentially unsafe... use at your own risk
func (rt *RoutingTable) ListPeers() []*peer.Peer {
var peers []*peer.Peer
func (rt *RoutingTable) ListPeers() []peer.Peer {
var peers []peer.Peer
for _, buck := range rt.Buckets {
for e := buck.getIter(); e != nil; e = e.Next() {
peers = append(peers, e.Value.(*peer.Peer))
peers = append(peers, e.Value.(peer.Peer))
}
}
return peers
@ -213,6 +213,6 @@ func (rt *RoutingTable) Print() {
rt.tabLock.RLock()
peers := rt.ListPeers()
for i, p := range peers {
fmt.Printf("%d) %s %s\n", i, p.ID.Pretty(), p.GetLatency().String())
fmt.Printf("%d) %s %s\n", i, p.ID().Pretty(), p.GetLatency().String())
}
}

View File

@ -10,11 +10,10 @@ import (
peer "github.com/jbenet/go-ipfs/peer"
)
func _randPeer() *peer.Peer {
p := new(peer.Peer)
p.ID = make(peer.ID, 16)
crand.Read(p.ID)
return p
func _randPeer() peer.Peer {
id := make(peer.ID, 16)
crand.Read(id)
return peer.WithID(id)
}
func _randID() ID {
@ -29,25 +28,25 @@ func _randID() ID {
func TestBucket(t *testing.T) {
b := newBucket()
peers := make([]*peer.Peer, 100)
peers := make([]peer.Peer, 100)
for i := 0; i < 100; i++ {
peers[i] = _randPeer()
b.pushFront(peers[i])
}
local := _randPeer()
localID := ConvertPeerID(local.ID)
localID := ConvertPeerID(local.ID())
i := rand.Intn(len(peers))
e := b.find(peers[i].ID)
e := b.find(peers[i].ID())
if e == nil {
t.Errorf("Failed to find peer: %v", peers[i])
}
spl := b.Split(0, ConvertPeerID(local.ID))
spl := b.Split(0, ConvertPeerID(local.ID()))
llist := b.list
for e := llist.Front(); e != nil; e = e.Next() {
p := ConvertPeerID(e.Value.(*peer.Peer).ID)
p := ConvertPeerID(e.Value.(peer.Peer).ID())
cpl := commonPrefixLen(p, localID)
if cpl > 0 {
t.Fatalf("Split failed. found id with cpl > 0 in 0 bucket")
@ -56,7 +55,7 @@ func TestBucket(t *testing.T) {
rlist := spl.list
for e := rlist.Front(); e != nil; e = e.Next() {
p := ConvertPeerID(e.Value.(*peer.Peer).ID)
p := ConvertPeerID(e.Value.(peer.Peer).ID())
cpl := commonPrefixLen(p, localID)
if cpl == 0 {
t.Fatalf("Split failed. found id with cpl == 0 in non 0 bucket")
@ -67,9 +66,9 @@ func TestBucket(t *testing.T) {
// Right now, this just makes sure that it doesnt hang or crash
func TestTableUpdate(t *testing.T) {
local := _randPeer()
rt := NewRoutingTable(10, ConvertPeerID(local.ID), time.Hour)
rt := NewRoutingTable(10, ConvertPeerID(local.ID()), time.Hour)
peers := make([]*peer.Peer, 100)
peers := make([]peer.Peer, 100)
for i := 0; i < 100; i++ {
peers[i] = _randPeer()
}
@ -93,33 +92,33 @@ func TestTableUpdate(t *testing.T) {
func TestTableFind(t *testing.T) {
local := _randPeer()
rt := NewRoutingTable(10, ConvertPeerID(local.ID), time.Hour)
rt := NewRoutingTable(10, ConvertPeerID(local.ID()), time.Hour)
peers := make([]*peer.Peer, 100)
peers := make([]peer.Peer, 100)
for i := 0; i < 5; i++ {
peers[i] = _randPeer()
rt.Update(peers[i])
}
t.Logf("Searching for peer: '%s'", peers[2])
found := rt.NearestPeer(ConvertPeerID(peers[2].ID))
if !found.ID.Equal(peers[2].ID) {
found := rt.NearestPeer(ConvertPeerID(peers[2].ID()))
if !found.ID().Equal(peers[2].ID()) {
t.Fatalf("Failed to lookup known node...")
}
}
func TestTableFindMultiple(t *testing.T) {
local := _randPeer()
rt := NewRoutingTable(20, ConvertPeerID(local.ID), time.Hour)
rt := NewRoutingTable(20, ConvertPeerID(local.ID()), time.Hour)
peers := make([]*peer.Peer, 100)
peers := make([]peer.Peer, 100)
for i := 0; i < 18; i++ {
peers[i] = _randPeer()
rt.Update(peers[i])
}
t.Logf("Searching for peer: '%s'", peers[2])
found := rt.NearestPeers(ConvertPeerID(peers[2].ID), 15)
found := rt.NearestPeers(ConvertPeerID(peers[2].ID()), 15)
if len(found) != 15 {
t.Fatalf("Got back different number of peers than we expected.")
}
@ -131,7 +130,7 @@ func TestTableFindMultiple(t *testing.T) {
func TestTableMultithreaded(t *testing.T) {
local := peer.ID("localPeer")
tab := NewRoutingTable(20, ConvertPeerID(local), time.Hour)
var peers []*peer.Peer
var peers []peer.Peer
for i := 0; i < 500; i++ {
peers = append(peers, _randPeer())
}
@ -156,7 +155,7 @@ func TestTableMultithreaded(t *testing.T) {
go func() {
for i := 0; i < 1000; i++ {
n := rand.Intn(len(peers))
tab.Find(peers[n].ID)
tab.Find(peers[n].ID())
}
done <- struct{}{}
}()
@ -170,7 +169,7 @@ func BenchmarkUpdates(b *testing.B) {
local := ConvertKey("localKey")
tab := NewRoutingTable(20, local, time.Hour)
var peers []*peer.Peer
var peers []peer.Peer
for i := 0; i < b.N; i++ {
peers = append(peers, _randPeer())
}
@ -186,7 +185,7 @@ func BenchmarkFinds(b *testing.B) {
local := ConvertKey("localKey")
tab := NewRoutingTable(20, local, time.Hour)
var peers []*peer.Peer
var peers []peer.Peer
for i := 0; i < b.N; i++ {
peers = append(peers, _randPeer())
tab.Update(peers[i])
@ -194,6 +193,6 @@ func BenchmarkFinds(b *testing.B) {
b.StartTimer()
for i := 0; i < b.N; i++ {
tab.Find(peers[i].ID)
tab.Find(peers[i].ID())
}
}

View File

@ -17,10 +17,10 @@ var _ routing.IpfsRouting = &MockRouter{}
type MockRouter struct {
datastore ds.Datastore
hashTable RoutingServer
peer *peer.Peer
peer peer.Peer
}
func NewMockRouter(local *peer.Peer, dstore ds.Datastore) routing.IpfsRouting {
func NewMockRouter(local peer.Peer, dstore ds.Datastore) routing.IpfsRouting {
return &MockRouter{
datastore: dstore,
peer: local,
@ -50,16 +50,16 @@ func (mr *MockRouter) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
return data, nil
}
func (mr *MockRouter) FindProviders(ctx context.Context, key u.Key) ([]*peer.Peer, error) {
func (mr *MockRouter) FindProviders(ctx context.Context, key u.Key) ([]peer.Peer, error) {
return nil, nil
}
func (mr *MockRouter) FindPeer(ctx context.Context, pid peer.ID) (*peer.Peer, error) {
func (mr *MockRouter) FindPeer(ctx context.Context, pid peer.ID) (peer.Peer, error) {
return nil, nil
}
func (mr *MockRouter) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-chan *peer.Peer {
out := make(chan *peer.Peer)
func (mr *MockRouter) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-chan peer.Peer {
out := make(chan peer.Peer)
go func() {
defer close(out)
for i, p := range mr.hashTable.Providers(k) {
@ -81,11 +81,11 @@ func (mr *MockRouter) Provide(_ context.Context, key u.Key) error {
}
type RoutingServer interface {
Announce(*peer.Peer, u.Key) error
Announce(peer.Peer, u.Key) error
Providers(u.Key) []*peer.Peer
Providers(u.Key) []peer.Peer
Client(p *peer.Peer) routing.IpfsRouting
Client(p peer.Peer) routing.IpfsRouting
}
func VirtualRoutingServer() RoutingServer {
@ -99,7 +99,7 @@ type hashTable struct {
providers map[u.Key]peer.Map
}
func (rs *hashTable) Announce(p *peer.Peer, k u.Key) error {
func (rs *hashTable) Announce(p peer.Peer, k u.Key) error {
rs.lock.Lock()
defer rs.lock.Unlock()
@ -111,10 +111,10 @@ func (rs *hashTable) Announce(p *peer.Peer, k u.Key) error {
return nil
}
func (rs *hashTable) Providers(k u.Key) []*peer.Peer {
func (rs *hashTable) Providers(k u.Key) []peer.Peer {
rs.lock.RLock()
defer rs.lock.RUnlock()
ret := make([]*peer.Peer, 0)
ret := make([]peer.Peer, 0)
peerset, ok := rs.providers[k]
if !ok {
return ret
@ -131,7 +131,7 @@ func (rs *hashTable) Providers(k u.Key) []*peer.Peer {
return ret
}
func (rs *hashTable) Client(p *peer.Peer) routing.IpfsRouting {
func (rs *hashTable) Client(p peer.Peer) routing.IpfsRouting {
return &MockRouter{
peer: p,
hashTable: rs,

View File

@ -20,9 +20,7 @@ func TestKeyNotFound(t *testing.T) {
func TestSetAndGet(t *testing.T) {
pid := peer.ID([]byte("the peer id"))
p := &peer.Peer{
ID: pid,
}
p := peer.WithID(pid)
k := u.Key("42")
rs := VirtualRoutingServer()
err := rs.Announce(p, k)
@ -34,7 +32,7 @@ func TestSetAndGet(t *testing.T) {
t.Fatal("should be one")
}
for _, elem := range providers {
if bytes.Equal(elem.ID, pid) {
if bytes.Equal(elem.ID(), pid) {
return
}
}
@ -42,7 +40,7 @@ func TestSetAndGet(t *testing.T) {
}
func TestClientFindProviders(t *testing.T) {
peer := &peer.Peer{ID: []byte("42")}
peer := peer.WithIDString("42")
rs := VirtualRoutingServer()
client := rs.Client(peer)
@ -57,7 +55,7 @@ func TestClientFindProviders(t *testing.T) {
isInHT := false
for _, p := range providersFromHashTable {
if bytes.Equal(p.ID, peer.ID) {
if bytes.Equal(p.ID(), peer.ID()) {
isInHT = true
}
}
@ -67,7 +65,7 @@ func TestClientFindProviders(t *testing.T) {
providersFromClient := client.FindProvidersAsync(context.Background(), u.Key("hello"), max)
isInClient := false
for p := range providersFromClient {
if bytes.Equal(p.ID, peer.ID) {
if bytes.Equal(p.ID(), peer.ID()) {
isInClient = true
}
}
@ -81,9 +79,7 @@ func TestClientOverMax(t *testing.T) {
k := u.Key("hello")
numProvidersForHelloKey := 100
for i := 0; i < numProvidersForHelloKey; i++ {
peer := &peer.Peer{
ID: []byte(string(i)),
}
peer := peer.WithIDString(string(i))
err := rs.Announce(peer, k)
if err != nil {
t.Fatal(err)
@ -96,7 +92,7 @@ func TestClientOverMax(t *testing.T) {
}
max := 10
peer := &peer.Peer{ID: []byte("TODO")}
peer := peer.WithIDString("TODO")
client := rs.Client(peer)
providersFromClient := client.FindProvidersAsync(context.Background(), k, max)
@ -118,9 +114,7 @@ func TestCanceledContext(t *testing.T) {
i := 0
go func() { // infinite stream
for {
peer := &peer.Peer{
ID: []byte(string(i)),
}
peer := peer.WithIDString(string(i))
err := rs.Announce(peer, k)
if err != nil {
t.Fatal(err)
@ -129,7 +123,7 @@ func TestCanceledContext(t *testing.T) {
}
}()
local := &peer.Peer{ID: []byte("peer id doesn't matter")}
local := peer.WithIDString("peer id doesn't matter")
client := rs.Client(local)
t.Log("warning: max is finite so this test is non-deterministic")

View File

@ -10,7 +10,7 @@ import (
// IpfsRouting is the routing module interface
// It is implemented by things like DHTs, etc.
type IpfsRouting interface {
FindProvidersAsync(context.Context, u.Key, int) <-chan *peer.Peer
FindProvidersAsync(context.Context, u.Key, int) <-chan peer.Peer
// Basic Put/Get
@ -28,5 +28,5 @@ type IpfsRouting interface {
// Find specific Peer
// FindPeer searches for a peer with given ID.
FindPeer(context.Context, peer.ID) (*peer.Peer, error)
FindPeer(context.Context, peer.ID) (peer.Peer, error)
}