1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-29 01:12:24 +08:00

peerstore Put -> Add

Changed lots of peer use, and changed the peerstore to ensure
there is only ever one peer in use.

Fixed #174
This commit is contained in:
Juan Batiz-Benet
2014-10-20 06:37:09 -07:00
parent 9ca87fbb93
commit ac62d13e42
11 changed files with 107 additions and 29 deletions

View File

@ -233,15 +233,10 @@ func initConnections(ctx context.Context, cfg *config.Config, pstore peer.Peerst
// setup peer // setup peer
npeer, err := pstore.Get(peer.DecodePrettyID(p.PeerID)) npeer, err := pstore.Get(peer.DecodePrettyID(p.PeerID))
if err != nil { if err != nil {
log.Error("%s", err)
continue
}
npeer.AddAddress(maddr)
if err = pstore.Put(npeer); err != nil {
log.Error("Bootstrapping error: %v", err) log.Error("Bootstrapping error: %v", err)
continue continue
} }
npeer.AddAddress(maddr)
if _, err = route.Connect(ctx, npeer); err != nil { if _, err = route.Connect(ctx, npeer); err != nil {
log.Error("Bootstrapping error: %v", err) log.Error("Bootstrapping error: %v", err)

View File

@ -22,12 +22,16 @@ func NewMockNode() (*IpfsNode, error) {
return nil, err return nil, err
} }
nd.Identity, err = peer.WithKeyPair(sk, pk) p, err := peer.WithKeyPair(sk, pk)
if err != nil { if err != nil {
return nil, err return nil, err
} }
nd.Peerstore = peer.NewPeerstore() nd.Peerstore = peer.NewPeerstore()
nd.Peerstore.Put(nd.Identity) nd.Identity, err = nd.Peerstore.Add(p)
if err != nil {
return nil, err
}
// Temp Datastore // Temp Datastore
dstore := ds.NewMapDatastore() dstore := ds.NewMapDatastore()

View File

@ -23,6 +23,11 @@ func (d *Dialer) Dial(ctx context.Context, network string, remote peer.Peer) (Co
return nil, fmt.Errorf("No remote address for network %s", network) return nil, fmt.Errorf("No remote address for network %s", network)
} }
remote, err := d.Peerstore.Add(remote)
if err != nil {
log.Error("Error putting peer into peerstore: %s", remote)
}
// TODO: try to get reusing addr/ports to work. // TODO: try to get reusing addr/ports to work.
// madialer := manet.Dialer{LocalAddr: laddr} // madialer := manet.Dialer{LocalAddr: laddr}
madialer := manet.Dialer{} madialer := manet.Dialer{}
@ -33,10 +38,6 @@ func (d *Dialer) Dial(ctx context.Context, network string, remote peer.Peer) (Co
return nil, err return nil, err
} }
if err := d.Peerstore.Put(remote); err != nil {
log.Error("Error putting peer into peerstore: %s", remote)
}
c, err := newSingleConn(ctx, d.LocalPeer, remote, maconn) c, err := newSingleConn(ctx, d.LocalPeer, remote, maconn)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -68,13 +68,18 @@ func setupConn(t *testing.T, ctx context.Context, a1, a2 string) (a, b Conn) {
t.Fatal("Listen address is nil.") t.Fatal("Listen address is nil.")
} }
l1, err := Listen(ctx, laddr, p1, peer.NewPeerstore()) ps1 := peer.NewPeerstore()
ps2 := peer.NewPeerstore()
ps1.Add(p1)
ps2.Add(p2)
l1, err := Listen(ctx, laddr, p1, ps1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
d2 := &Dialer{ d2 := &Dialer{
Peerstore: peer.NewPeerstore(), Peerstore: ps2,
LocalPeer: p2, LocalPeer: p2,
} }
@ -108,7 +113,12 @@ func TestDialer(t *testing.T) {
t.Fatal("Listen address is nil.") t.Fatal("Listen address is nil.")
} }
l, err := Listen(ctx, laddr, p1, peer.NewPeerstore()) ps1 := peer.NewPeerstore()
ps2 := peer.NewPeerstore()
ps1.Add(p1)
ps2.Add(p2)
l, err := Listen(ctx, laddr, p1, ps1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -116,7 +126,7 @@ func TestDialer(t *testing.T) {
go echoListen(ctx, l) go echoListen(ctx, l)
d := &Dialer{ d := &Dialer{
Peerstore: peer.NewPeerstore(), Peerstore: ps2,
LocalPeer: p2, LocalPeer: p2,
} }

View File

@ -72,10 +72,10 @@ func (c *MultiConn) Add(conns ...Conn) {
log.Error("%s", c2) log.Error("%s", c2)
c.Unlock() // ok to unlock (to log). panicing. c.Unlock() // ok to unlock (to log). panicing.
log.Error("%s", c) log.Error("%s", c)
log.Error("c.LocalPeer: %s %#v", c.LocalPeer(), c.LocalPeer()) log.Error("c.LocalPeer: %s %p", c.LocalPeer(), c.LocalPeer())
log.Error("c2.LocalPeer: %s %#v", c2.LocalPeer(), c2.LocalPeer()) log.Error("c2.LocalPeer: %s %p", c2.LocalPeer(), c2.LocalPeer())
log.Error("c.RemotePeer: %s %#v", c.RemotePeer(), c.RemotePeer()) log.Error("c.RemotePeer: %s %p", c.RemotePeer(), c.RemotePeer())
log.Error("c2.RemotePeer: %s %#v", c2.RemotePeer(), c2.RemotePeer()) log.Error("c2.RemotePeer: %s %p", c2.RemotePeer(), c2.RemotePeer())
c.Lock() // gotta relock to avoid lock panic from deferring. c.Lock() // gotta relock to avoid lock panic from deferring.
panic("connection addresses mismatch") panic("connection addresses mismatch")
} }

View File

@ -95,6 +95,8 @@ func setupMultiConns(t *testing.T, ctx context.Context) (a, b *MultiConn) {
// peerstores // peerstores
p1ps := peer.NewPeerstore() p1ps := peer.NewPeerstore()
p2ps := peer.NewPeerstore() p2ps := peer.NewPeerstore()
p1ps.Add(p1)
p2ps.Add(p2)
// listeners // listeners
listen := func(addr ma.Multiaddr, p peer.Peer, ps peer.Peerstore) Listener { listen := func(addr ma.Multiaddr, p peer.Peer, ps peer.Peerstore) Listener {

View File

@ -116,7 +116,7 @@ func (s *Swarm) Dial(peer peer.Peer) (conn.Conn, error) {
} }
// check if we don't have the peer in Peerstore // check if we don't have the peer in Peerstore
err := s.peers.Put(peer) peer, err := s.peers.Add(peer)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -1,6 +1,7 @@
package peer package peer
import ( import (
"errors"
"fmt" "fmt"
"sync" "sync"
"time" "time"
@ -83,6 +84,9 @@ type Peer interface {
// Get/SetLatency manipulate the current latency measurement. // Get/SetLatency manipulate the current latency measurement.
GetLatency() (out time.Duration) GetLatency() (out time.Duration)
SetLatency(laten time.Duration) SetLatency(laten time.Duration)
// Update with the data of another peer instance
Update(Peer) error
} }
type peer struct { type peer struct {
@ -125,7 +129,9 @@ func (p *peer) PubKey() ic.PubKey {
// Addresses returns the peer's multiaddrs // Addresses returns the peer's multiaddrs
func (p *peer) Addresses() []ma.Multiaddr { func (p *peer) Addresses() []ma.Multiaddr {
cp := make([]ma.Multiaddr, len(p.addresses)) cp := make([]ma.Multiaddr, len(p.addresses))
p.RLock()
copy(cp, p.addresses) copy(cp, p.addresses)
defer p.RUnlock()
return cp return cp
} }
@ -182,7 +188,6 @@ func (p *peer) SetLatency(laten time.Duration) {
// LoadAndVerifyKeyPair unmarshalls, loads a private/public key pair. // LoadAndVerifyKeyPair unmarshalls, loads a private/public key pair.
// Error if (a) unmarshalling fails, or (b) pubkey does not match id. // Error if (a) unmarshalling fails, or (b) pubkey does not match id.
func (p *peer) LoadAndVerifyKeyPair(marshalled []byte) error { func (p *peer) LoadAndVerifyKeyPair(marshalled []byte) error {
sk, err := ic.UnmarshalPrivateKey(marshalled) sk, err := ic.UnmarshalPrivateKey(marshalled)
if err != nil { if err != nil {
return fmt.Errorf("Failed to unmarshal private key: %v", err) return fmt.Errorf("Failed to unmarshal private key: %v", err)
@ -199,6 +204,9 @@ func (p *peer) VerifyAndSetPrivKey(sk ic.PrivKey) error {
return err return err
} }
p.Lock()
defer p.Unlock()
// if we didn't have the priavte key, assign it // if we didn't have the priavte key, assign it
if p.privKey == nil { if p.privKey == nil {
p.privKey = sk p.privKey = sk
@ -224,6 +232,9 @@ func (p *peer) VerifyAndSetPubKey(pk ic.PubKey) error {
return fmt.Errorf("Failed to hash public key: %v", err) return fmt.Errorf("Failed to hash public key: %v", err)
} }
p.Lock()
defer p.Unlock()
if !p.id.Equal(pkid) { if !p.id.Equal(pkid) {
return fmt.Errorf("Public key does not match peer.ID.") return fmt.Errorf("Public key does not match peer.ID.")
} }
@ -246,6 +257,29 @@ func (p *peer) VerifyAndSetPubKey(pk ic.PubKey) error {
panic("invariant violated: unexpected key mismatch") panic("invariant violated: unexpected key mismatch")
} }
func (p *peer) Update(other Peer) error {
if !p.ID().Equal(other.ID()) {
return errors.New("peer ids do not match")
}
for _, a := range other.Addresses() {
p.AddAddress(a)
}
sk := other.PrivKey()
pk := other.PubKey()
p.Lock()
if p.privKey == nil {
p.privKey = sk
}
if p.pubKey == nil {
p.pubKey = pk
}
defer p.Unlock()
return nil
}
// WithKeyPair returns a Peer object with given keys. // WithKeyPair returns a Peer object with given keys.
func WithKeyPair(sk ic.PrivKey, pk ic.PubKey) (Peer, error) { func WithKeyPair(sk ic.PrivKey, pk ic.PubKey) (Peer, error) {
if sk == nil && pk == nil { if sk == nil && pk == nil {

View File

@ -12,7 +12,7 @@ import (
// Peerstore provides a threadsafe collection for peers. // Peerstore provides a threadsafe collection for peers.
type Peerstore interface { type Peerstore interface {
Get(ID) (Peer, error) Get(ID) (Peer, error)
Put(Peer) error Add(Peer) (Peer, error)
Delete(ID) error Delete(ID) error
All() (*Map, error) All() (*Map, error)
} }
@ -63,12 +63,37 @@ func (p *peerstore) Get(i ID) (Peer, error) {
} }
} }
func (p *peerstore) Put(peer Peer) error { func (p *peerstore) Add(peer Peer) (Peer, error) {
p.Lock() p.Lock()
defer p.Unlock() defer p.Unlock()
k := peer.Key().DsKey() k := peer.Key().DsKey()
return p.peers.Put(k, peer) val, err := p.peers.Get(k)
switch err {
// some other datastore error
default:
return nil, err
// not found? just add and return.
case ds.ErrNotFound:
err := p.peers.Put(k, peer)
return peer, err
// no error, already here.
case nil:
peer2, ok := val.(Peer)
if !ok {
return nil, errors.New("stored value was not a Peer")
}
if peer == peer2 {
return peer, nil
}
// must do some merging.
peer2.Update(peer)
return peer2, nil
}
} }
func (p *peerstore) Delete(i ID) error { func (p *peerstore) Delete(i ID) error {

View File

@ -27,11 +27,15 @@ func TestPeerstore(t *testing.T) {
// p31, _ := setupPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33", "/ip4/127.0.0.1/tcp/3456") // p31, _ := setupPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33", "/ip4/127.0.0.1/tcp/3456")
// p41, _ := setupPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a34", "/ip4/127.0.0.1/tcp/4567") // p41, _ := setupPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a34", "/ip4/127.0.0.1/tcp/4567")
err := ps.Put(p11) p13, err := ps.Add(p11)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
if p13 != p11 {
t.Error("these should be the same")
}
p12, err := ps.Get(ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31")) p12, err := ps.Get(ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31"))
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -41,10 +45,13 @@ func TestPeerstore(t *testing.T) {
t.Error(errors.New("peers should be the same")) t.Error(errors.New("peers should be the same"))
} }
err = ps.Put(p21) p23, err := ps.Add(p21)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
if p23 != p21 {
t.Error("These should be the same")
}
p22, err := ps.Get(ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32")) p22, err := ps.Get(ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32"))
if err != nil { if err != nil {

View File

@ -203,7 +203,7 @@ func TestNotFound(t *testing.T) {
local := peer.WithIDString("test_peer") local := peer.WithIDString("test_peer")
peerstore := peer.NewPeerstore() peerstore := peer.NewPeerstore()
peerstore.Put(local) peerstore.Add(local)
d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore()) d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore())
@ -269,7 +269,7 @@ func TestLessThanKResponses(t *testing.T) {
fs := &fauxSender{} fs := &fauxSender{}
local := peer.WithIDString("test_peer") local := peer.WithIDString("test_peer")
peerstore := peer.NewPeerstore() peerstore := peer.NewPeerstore()
peerstore.Put(local) peerstore.Add(local)
d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore()) d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore())