peerstore constructs peers

Now, all peers should be retrieved from the Peerstore, which will construct the peers accordingly. This ensures there is only one Peer object per peer; the opposite would be bad: separate objects for the same peer drift out of sync. cc @whyrusleeping
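To illustrate the pattern this commit adopts, here is a minimal, self-contained sketch (simplified types, not the actual go-ipfs API, whose Get returns (*Peer, error) and is datastore-backed): the store's Get constructs a Peer on a miss and caches it, so every caller holds the same object and per-peer state cannot diverge.

package main

import (
	"fmt"
	"sync"
)

type ID string

// Peer is a stand-in for go-ipfs's peer.Peer.
type Peer struct {
	ID ID
	// Addresses, PubKey, PrivKey, etc. elided.
}

// Peerstore hands out exactly one *Peer per ID.
type Peerstore struct {
	mu    sync.Mutex
	peers map[ID]*Peer
}

func NewPeerstore() *Peerstore {
	return &Peerstore{peers: make(map[ID]*Peer)}
}

// Get returns the known *Peer for id, constructing and
// caching a fresh one if the id has not been seen before.
func (ps *Peerstore) Get(id ID) *Peer {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	if p, ok := ps.peers[id]; ok {
		return p
	}
	p := &Peer{ID: id}
	ps.peers[id] = p
	return p
}

func main() {
	ps := NewPeerstore()
	a := ps.Get("QmAlice")
	b := ps.Get("QmAlice")
	fmt.Println(a == b) // true: one object per peer
}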
core/core.go (39 changed lines)

@@ -12,7 +12,6 @@ import (
 	bserv "github.com/jbenet/go-ipfs/blockservice"
 	config "github.com/jbenet/go-ipfs/config"
-	ci "github.com/jbenet/go-ipfs/crypto"
 	diag "github.com/jbenet/go-ipfs/diagnostics"
 	exchange "github.com/jbenet/go-ipfs/exchange"
 	bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
@@ -92,8 +91,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
 	}

-	local, err := initIdentity(cfg, online)
+	peerstore := peer.NewPeerstore()
+
+	local, err := initIdentity(cfg, peerstore, online)
 	if err != nil {
 		return nil, err
 	}
@@ -179,7 +177,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
 	}, nil
 }

-func initIdentity(cfg *config.Config, online bool) (*peer.Peer, error) {
+func initIdentity(cfg *config.Config, peers peer.Peerstore, online bool) (*peer.Peer, error) {
 	if cfg.Identity.PeerID == "" {
 		return nil, errors.New("Identity was not set in config (was ipfs init run?)")
 	}
@@ -188,22 +186,23 @@ func initIdentity(cfg *config.Config, peers peer.Peerstore, online bool) (*peer.Peer, error) {
 		return nil, errors.New("No peer ID in config! (was ipfs init run?)")
 	}

+	// get peer from peerstore (so it is constructed there)
+	id := peer.ID(b58.Decode(cfg.Identity.PeerID))
+	peer, err := peers.Get(id)
+	if err != nil {
+		return nil, err
+	}
+
 	// address is optional
-	var addresses []ma.Multiaddr
 	if len(cfg.Addresses.Swarm) > 0 {
 		maddr, err := ma.NewMultiaddr(cfg.Addresses.Swarm)
 		if err != nil {
 			return nil, err
 		}

-		addresses = []ma.Multiaddr{maddr}
+		peer.AddAddress(maddr)
 	}

-	var (
-		sk ci.PrivKey
-		pk ci.PubKey
-	)
-
 	// when not online, don't need to parse private keys (yet)
 	if online {
 		skb, err := base64.StdEncoding.DecodeString(cfg.Identity.PrivKey)
@@ -211,20 +210,12 @@ func initIdentity(cfg *config.Config, peers peer.Peerstore, online bool) (*peer.Peer, error) {
 			return nil, err
 		}

-		sk, err = ci.UnmarshalPrivateKey(skb)
-		if err != nil {
+		if err := peer.LoadAndVerifyKeyPair(skb); err != nil {
 			return nil, err
 		}
-
-		pk = sk.GetPublic()
 	}

-	return &peer.Peer{
-		ID:        peer.ID(b58.Decode(cfg.Identity.PeerID)),
-		Addresses: addresses,
-		PrivKey:   sk,
-		PubKey:    pk,
-	}, nil
+	return peer, nil
 }

 func initConnections(ctx context.Context, cfg *config.Config, pstore peer.Peerstore, route *dht.IpfsDHT) {
@@ -240,7 +231,11 @@ func initConnections(ctx context.Context, cfg *config.Config, pstore peer.Peerstore, route *dht.IpfsDHT) {
 	}

 	// setup peer
-	npeer := &peer.Peer{ID: peer.DecodePrettyID(p.PeerID)}
+	npeer, err := pstore.Get(peer.DecodePrettyID(p.PeerID))
+	if err != nil {
+		log.Error("%s", err)
+		continue
+	}
 	npeer.AddAddress(maddr)

 	if err = pstore.Put(npeer); err != nil {
core/mock.go (13 changed lines)

@@ -12,11 +12,18 @@ import (
 	mdht "github.com/jbenet/go-ipfs/routing/mock"
 )

 // NewMockNode constructs an IpfsNode for use in tests.
 func NewMockNode() (*IpfsNode, error) {
 	nd := new(IpfsNode)

 	//Generate Identity
-	nd.Identity = &peer.Peer{ID: []byte("TESTING")}
+	nd.Peerstore = peer.NewPeerstore()
+	var err error
+	nd.Identity, err = nd.Peerstore.Get(peer.ID("TESTING"))
+	if err != nil {
+		return nil, err
+	}

 	pk, sk, err := ci.GenerateKeyPair(ci.RSA, 1024)
 	if err != nil {
 		return nil, err
@@ -40,13 +47,13 @@ func NewMockNode() (*IpfsNode, error) {
 		return nil, err
 	}

-	nd.DAG = &mdag.DAGService{bserv}
+	nd.DAG = &mdag.DAGService{Blocks: bserv}

 	// Namespace resolver
 	nd.Namesys = nsys.NewNameSystem(dht)

 	// Path resolver
-	nd.Resolver = &path.Resolver{nd.DAG}
+	nd.Resolver = &path.Resolver{DAG: nd.DAG}

 	return nd, nil
 }
@@ -348,41 +348,13 @@ func getOrConstructPeer(peers peer.Peerstore, rpk ci.PubKey) (*peer.Peer, error) {
 	}

 	npeer, err := peers.Get(rid)
-	if err != nil || npeer == nil {
-		if err != peer.ErrNotFound {
-			return nil, err // unexpected error happened.
-		}
-
-		// dont have peer, so construct it + add it to peerstore.
-		npeer = &peer.Peer{ID: rid, PubKey: rpk}
-		if err := peers.Put(npeer); err != nil {
-			return nil, err
-		}
-
-		// done, return the newly constructed peer.
-		return npeer, nil
+	if err != nil {
+		return nil, err // unexpected error happened.
 	}

-	// did have it locally.
-
-	// let's verify ID
-	if !npeer.ID.Equal(rid) {
-		e := "Expected peer.ID does not match sent pubkey's hash: %v - %v"
-		return nil, fmt.Errorf(e, npeer, rid)
-	}
-
-	if npeer.PubKey == nil {
-		// didn't have a pubkey, just set it.
-		npeer.PubKey = rpk
-		return npeer, nil
-	}
-
-	// did have pubkey, let's verify it's really the same.
-	// this shouldn't ever happen, given we hashed, etc, but it could mean
-	// expected code (or protocol) invariants violated.
-	if !npeer.PubKey.Equals(rpk) {
-		log.Error("WARNING: PubKey mismatch: %v", npeer)
-		panic("secure channel pubkey mismatch")
-	}
+	// public key verification happens in Peer.VerifyAndSetPubKey
+	if err := npeer.VerifyAndSetPubKey(rpk); err != nil {
+		return nil, err // pubkey mismatch or other problem
+	}
 	return npeer, nil
 }
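Because the Peerstore now constructs peers on a miss, callers like getOrConstructPeer no longer special-case peer.ErrNotFound: any error from Get is genuinely unexpected and simply propagates, and the key checks live on the Peer itself. A hedged sketch of the resulting caller shape, with hypothetical names (handshake, Getter, fetch) rather than the real go-ipfs code:

package handshake

import "errors"

type ID string
type Peer struct{ ID ID }

// Getter mirrors peer.Peerstore.Get after this commit: construction
// happens inside the store, so "not found" is no longer an error case.
type Getter interface {
	Get(ID) (*Peer, error)
}

var errBadID = errors.New("empty peer id")

// fetch shows the simplified caller shape: no ErrNotFound branch,
// no local &Peer{} construction, no explicit Put back into the store.
func fetch(peers Getter, id ID) (*Peer, error) {
	if id == "" {
		return nil, errBadID
	}
	p, err := peers.Get(id) // any error here is genuinely unexpected
	if err != nil {
		return nil, err
	}
	return p, nil
}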
peer/peer.go (64 changed lines)

@@ -1,6 +1,7 @@
 package peer

 import (
+	"fmt"
 	"sync"
 	"time"

@@ -13,6 +14,8 @@ import (
 	"bytes"
 )

+var log = u.Logger("peer")
+
 // ID is a byte slice representing the identity of a peer.
 type ID mh.Multihash

@@ -122,3 +125,64 @@ func (p *Peer) SetLatency(laten time.Duration) {
 	}
 	p.Unlock()
 }
+
+// LoadAndVerifyKeyPair unmarshals and loads a private/public key pair.
+// It errors if (a) unmarshalling fails, or (b) the pubkey does not match the peer.ID.
+func (p *Peer) LoadAndVerifyKeyPair(marshalled []byte) error {
+
+	sk, err := ic.UnmarshalPrivateKey(marshalled)
+	if err != nil {
+		return fmt.Errorf("Failed to unmarshal private key: %v", err)
+	}
+
+	// construct and assign pubkey. ensure it matches this peer
+	if err := p.VerifyAndSetPubKey(sk.GetPublic()); err != nil {
+		return err
+	}
+
+	// if we didn't have the private key, assign it
+	if p.PrivKey == nil {
+		p.PrivKey = sk
+		return nil
+	}
+
+	// if we already had the keys, check they're equal.
+	if p.PrivKey.Equals(sk) {
+		return nil // as expected. keep the old objects.
+	}
+
+	// keys not equal. invariant violated. this warrants a panic.
+	// these keys should be _the same_ because peer.ID = H(pk).
+	// this mismatch should never happen.
+	log.Error("%s had PrivKey: %v -- got %v", p, p.PrivKey, sk)
+	panic("invariant violated: unexpected key mismatch")
+}
+
+// VerifyAndSetPubKey sets the public key, given that it matches the peer.ID.
+func (p *Peer) VerifyAndSetPubKey(pk ic.PubKey) error {
+	pkid, err := IDFromPubKey(pk)
+	if err != nil {
+		return fmt.Errorf("Failed to hash public key: %v", err)
+	}
+
+	if !p.ID.Equal(pkid) {
+		return fmt.Errorf("Public key does not match peer.ID.")
+	}
+
+	// if we didn't have the keys, assign them.
+	if p.PubKey == nil {
+		p.PubKey = pk
+		return nil
+	}
+
+	// if we already had the pubkey, check they're equal.
+	if p.PubKey.Equals(pk) {
+		return nil // as expected. keep the old objects.
+	}
+
+	// keys not equal. invariant violated. this warrants a panic.
+	// these keys should be _the same_ because peer.ID = H(pk).
+	// this mismatch should never happen.
+	log.Error("%s had PubKey: %v -- got %v", p, p.PubKey, pk)
+	panic("invariant violated: unexpected key mismatch")
+}
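Both new methods lean on the invariant that a peer's ID is the hash of its public key, so a key offered over the network can be checked against an ID we already hold. A small sketch of that check, using raw SHA-256 as a stand-in for the real multihash-of-marshalled-key derivation (IDFromPubKey in the source):

package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
)

type ID []byte

// idFromPubKey stands in for peer.IDFromPubKey: hash the
// (marshalled) public key to derive the peer's identity.
func idFromPubKey(pubKeyBytes []byte) ID {
	h := sha256.Sum256(pubKeyBytes)
	return ID(h[:])
}

// verifyPubKey is the heart of VerifyAndSetPubKey: accept a
// public key only if its hash matches the ID we already hold.
func verifyPubKey(id ID, pubKeyBytes []byte) error {
	if !bytes.Equal(id, idFromPubKey(pubKeyBytes)) {
		return fmt.Errorf("public key does not match peer.ID")
	}
	return nil
}

func main() {
	pk := []byte("alice-public-key-bytes")
	id := idFromPubKey(pk)
	fmt.Println(verifyPubKey(id, pk))                    // <nil>
	fmt.Println(verifyPubKey(id, []byte("mallory-key"))) // error
}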
@@ -9,10 +9,6 @@ import (
 	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
 )

-// ErrNotFound signals a peer wasn't found. this is here to avoid having to
-// leak the ds abstraction to clients of Peerstore, just for the error.
-var ErrNotFound = ds.ErrNotFound
-
 // Peerstore provides a threadsafe collection for peers.
 type Peerstore interface {
 	Get(ID) (*Peer, error)
@@ -39,15 +35,28 @@ func (p *peerstore) Get(i ID) (*Peer, error) {

 	k := u.Key(i).DsKey()
 	val, err := p.peers.Get(k)
-	if err != nil {
-		return nil, err
-	}
-
-	peer, ok := val.(*Peer)
-	if !ok {
-		return nil, errors.New("stored value was not a Peer")
+	switch err {
+
+	// some other datastore error
+	default:
+		return nil, err
+
+	// not found, construct it ourselves, add it to datastore, and return.
+	case ds.ErrNotFound:
+		peer := &Peer{ID: i}
+		if err := p.peers.Put(k, peer); err != nil {
+			return nil, err
+		}
+		return peer, nil
+
+	// no error, got it back fine
+	case nil:
+		peer, ok := val.(*Peer)
+		if !ok {
+			return nil, errors.New("stored value was not a Peer")
+		}
+		return peer, nil
 	}
-	return peer, nil
 }

 func (p *peerstore) Put(peer *Peer) error {
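The rewritten Get switches on the datastore's sentinel error instead of exporting it, which is why the exported ErrNotFound could be deleted above. The three-way branch in isolation, with an in-memory map standing in for the datastore and errNotFound standing in for ds.ErrNotFound:

package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("not found") // stand-in for ds.ErrNotFound

type store map[string]interface{}

func (s store) get(k string) (interface{}, error) {
	v, ok := s[k]
	if !ok {
		return nil, errNotFound
	}
	return v, nil
}

type Peer struct{ ID string }

// get mirrors the three-way switch in the new peerstore.Get:
// unexpected error, not-found (construct + put), and found.
func get(s store, id string) (*Peer, error) {
	val, err := s.get(id)
	switch err {
	default:
		return nil, err // some other datastore error
	case errNotFound:
		p := &Peer{ID: id} // construct it ourselves
		s[id] = p          // add it to the datastore
		return p, nil
	case nil:
		p, ok := val.(*Peer)
		if !ok {
			return nil, errors.New("stored value was not a Peer")
		}
		return p, nil
	}
}

func main() {
	s := store{}
	a, _ := get(s, "QmAlice")
	b, _ := get(s, "QmAlice")
	fmt.Println(a == b) // true: constructed once, then shared
}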
@@ -56,8 +56,8 @@ func TestPeerstore(t *testing.T) {
 	}

 	_, err = ps.Get(ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"))
-	if err == nil {
-		t.Error(errors.New("should've been an error here"))
+	if err != nil {
+		t.Error(errors.New("should not have an error here"))
 	}

 	err = ps.Delete(ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31"))
@@ -65,9 +65,10 @@ func TestPeerstore(t *testing.T) {
 		t.Error(err)
 	}

+	// reconstruct!
 	_, err = ps.Get(ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31"))
-	if err == nil {
-		t.Error(errors.New("should've been an error here"))
+	if err != nil {
+		t.Error(errors.New("should not have an error anyway. reconstruct!"))
 	}

 	p22, err = ps.Get(ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32"))
@@ -300,10 +300,9 @@ func (dht *IpfsDHT) addPeer(pb *Message_Peer) (*peer.Peer, error) {
 	}

 	// check if we already have this peer.
-	pr, _ := dht.peerstore.Get(peer.ID(pb.GetId()))
-	if pr == nil {
-		pr = &peer.Peer{ID: peer.ID(pb.GetId())}
-		dht.peerstore.Put(pr)
+	pr, err := dht.getPeer(peer.ID(pb.GetId()))
+	if err != nil {
+		return nil, err
 	}
 	pr.AddAddress(addr) // idempotent

@@ -481,6 +480,16 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []*peer.Peer {
 	return filtered
 }

+func (dht *IpfsDHT) getPeer(id peer.ID) (*peer.Peer, error) {
+	p, err := dht.peerstore.Get(id)
+	if err != nil {
+		err = fmt.Errorf("Failed to get peer from peerstore: %s", err)
+		log.Error("%s", err)
+		return nil, err
+	}
+	return p, nil
+}
+
 func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) {

 	id := peer.ID(pbp.GetId())

@@ -490,26 +499,16 @@ func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) {
 		return nil, errors.New("found self")
 	}

-	p, _ := dht.peerstore.Get(id)
-	if p == nil {
-		p, _ = dht.FindLocal(id)
-		if p != nil {
-			panic("somehow peer not getting into peerstore")
-		}
+	p, err := dht.getPeer(id)
+	if err != nil {
+		return nil, err
 	}

-	if p == nil {
-		maddr, err := pbp.Address()
-		if err != nil {
-			return nil, err
-		}
-
-		// create new Peer
-		p = &peer.Peer{ID: id}
-		p.AddAddress(maddr)
-		dht.peerstore.Put(p)
-		log.Info("dht found new peer: %s %s", p, maddr)
+	maddr, err := pbp.Address()
+	if err != nil {
+		return nil, err
 	}
+
 	p.AddAddress(maddr)
 	return p, nil
 }

@@ -541,6 +540,7 @@ func (dht *IpfsDHT) loadProvidableKeys() error {
 	return nil
 }

+// PingRoutine periodically pings nearest neighbors.
 func (dht *IpfsDHT) PingRoutine(t time.Duration) {
 	tick := time.Tick(t)
 	for {