From ac62d13e426b25d611be3a60e474163a00830bd6 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Mon, 20 Oct 2014 06:37:09 -0700 Subject: [PATCH] peerstore Put -> Add Changed lots of peer use, and changed the peerstore to ensure there is only ever one peer in use. Fixed #174 --- core/core.go | 7 +------ core/mock.go | 8 ++++++-- net/conn/dial.go | 9 +++++---- net/conn/dial_test.go | 18 ++++++++++++++---- net/conn/multiconn.go | 8 ++++---- net/conn/multiconn_test.go | 2 ++ net/swarm/swarm.go | 2 +- peer/peer.go | 36 +++++++++++++++++++++++++++++++++++- peer/peerstore.go | 31 ++++++++++++++++++++++++++++--- peer/peerstore_test.go | 11 +++++++++-- routing/dht/ext_test.go | 4 ++-- 11 files changed, 107 insertions(+), 29 deletions(-) diff --git a/core/core.go b/core/core.go index 22cf77e8c..bb30d59d0 100644 --- a/core/core.go +++ b/core/core.go @@ -233,15 +233,10 @@ func initConnections(ctx context.Context, cfg *config.Config, pstore peer.Peerst // setup peer npeer, err := pstore.Get(peer.DecodePrettyID(p.PeerID)) if err != nil { - log.Error("%s", err) - continue - } - npeer.AddAddress(maddr) - - if err = pstore.Put(npeer); err != nil { log.Error("Bootstrapping error: %v", err) continue } + npeer.AddAddress(maddr) if _, err = route.Connect(ctx, npeer); err != nil { log.Error("Bootstrapping error: %v", err) diff --git a/core/mock.go b/core/mock.go index 114acd7db..adc30203f 100644 --- a/core/mock.go +++ b/core/mock.go @@ -22,12 +22,16 @@ func NewMockNode() (*IpfsNode, error) { return nil, err } - nd.Identity, err = peer.WithKeyPair(sk, pk) + p, err := peer.WithKeyPair(sk, pk) if err != nil { return nil, err } + nd.Peerstore = peer.NewPeerstore() - nd.Peerstore.Put(nd.Identity) + nd.Identity, err = nd.Peerstore.Add(p) + if err != nil { + return nil, err + } // Temp Datastore dstore := ds.NewMapDatastore() diff --git a/net/conn/dial.go b/net/conn/dial.go index 050475e78..3eb4573e2 100644 --- a/net/conn/dial.go +++ b/net/conn/dial.go @@ -23,6 +23,11 @@ func (d *Dialer) Dial(ctx context.Context, network string, remote peer.Peer) (Co return nil, fmt.Errorf("No remote address for network %s", network) } + remote, err := d.Peerstore.Add(remote) + if err != nil { + log.Error("Error putting peer into peerstore: %s", remote) + } + // TODO: try to get reusing addr/ports to work. // madialer := manet.Dialer{LocalAddr: laddr} madialer := manet.Dialer{} @@ -33,10 +38,6 @@ func (d *Dialer) Dial(ctx context.Context, network string, remote peer.Peer) (Co return nil, err } - if err := d.Peerstore.Put(remote); err != nil { - log.Error("Error putting peer into peerstore: %s", remote) - } - c, err := newSingleConn(ctx, d.LocalPeer, remote, maconn) if err != nil { return nil, err diff --git a/net/conn/dial_test.go b/net/conn/dial_test.go index f0fa6a3c1..50ef51e77 100644 --- a/net/conn/dial_test.go +++ b/net/conn/dial_test.go @@ -68,13 +68,18 @@ func setupConn(t *testing.T, ctx context.Context, a1, a2 string) (a, b Conn) { t.Fatal("Listen address is nil.") } - l1, err := Listen(ctx, laddr, p1, peer.NewPeerstore()) + ps1 := peer.NewPeerstore() + ps2 := peer.NewPeerstore() + ps1.Add(p1) + ps2.Add(p2) + + l1, err := Listen(ctx, laddr, p1, ps1) if err != nil { t.Fatal(err) } d2 := &Dialer{ - Peerstore: peer.NewPeerstore(), + Peerstore: ps2, LocalPeer: p2, } @@ -108,7 +113,12 @@ func TestDialer(t *testing.T) { t.Fatal("Listen address is nil.") } - l, err := Listen(ctx, laddr, p1, peer.NewPeerstore()) + ps1 := peer.NewPeerstore() + ps2 := peer.NewPeerstore() + ps1.Add(p1) + ps2.Add(p2) + + l, err := Listen(ctx, laddr, p1, ps1) if err != nil { t.Fatal(err) } @@ -116,7 +126,7 @@ func TestDialer(t *testing.T) { go echoListen(ctx, l) d := &Dialer{ - Peerstore: peer.NewPeerstore(), + Peerstore: ps2, LocalPeer: p2, } diff --git a/net/conn/multiconn.go b/net/conn/multiconn.go index dce3e3acc..5d68a7e96 100644 --- a/net/conn/multiconn.go +++ b/net/conn/multiconn.go @@ -72,10 +72,10 @@ func (c *MultiConn) Add(conns ...Conn) { log.Error("%s", c2) c.Unlock() // ok to unlock (to log). panicing. log.Error("%s", c) - log.Error("c.LocalPeer: %s %#v", c.LocalPeer(), c.LocalPeer()) - log.Error("c2.LocalPeer: %s %#v", c2.LocalPeer(), c2.LocalPeer()) - log.Error("c.RemotePeer: %s %#v", c.RemotePeer(), c.RemotePeer()) - log.Error("c2.RemotePeer: %s %#v", c2.RemotePeer(), c2.RemotePeer()) + log.Error("c.LocalPeer: %s %p", c.LocalPeer(), c.LocalPeer()) + log.Error("c2.LocalPeer: %s %p", c2.LocalPeer(), c2.LocalPeer()) + log.Error("c.RemotePeer: %s %p", c.RemotePeer(), c.RemotePeer()) + log.Error("c2.RemotePeer: %s %p", c2.RemotePeer(), c2.RemotePeer()) c.Lock() // gotta relock to avoid lock panic from deferring. panic("connection addresses mismatch") } diff --git a/net/conn/multiconn_test.go b/net/conn/multiconn_test.go index e45a348d8..aa80cd499 100644 --- a/net/conn/multiconn_test.go +++ b/net/conn/multiconn_test.go @@ -95,6 +95,8 @@ func setupMultiConns(t *testing.T, ctx context.Context) (a, b *MultiConn) { // peerstores p1ps := peer.NewPeerstore() p2ps := peer.NewPeerstore() + p1ps.Add(p1) + p2ps.Add(p2) // listeners listen := func(addr ma.Multiaddr, p peer.Peer, ps peer.Peerstore) Listener { diff --git a/net/swarm/swarm.go b/net/swarm/swarm.go index 1085abf61..2d73a86f6 100644 --- a/net/swarm/swarm.go +++ b/net/swarm/swarm.go @@ -116,7 +116,7 @@ func (s *Swarm) Dial(peer peer.Peer) (conn.Conn, error) { } // check if we don't have the peer in Peerstore - err := s.peers.Put(peer) + peer, err := s.peers.Add(peer) if err != nil { return nil, err } diff --git a/peer/peer.go b/peer/peer.go index 93ddc6498..736536b5e 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -1,6 +1,7 @@ package peer import ( + "errors" "fmt" "sync" "time" @@ -83,6 +84,9 @@ type Peer interface { // Get/SetLatency manipulate the current latency measurement. GetLatency() (out time.Duration) SetLatency(laten time.Duration) + + // Update with the data of another peer instance + Update(Peer) error } type peer struct { @@ -125,7 +129,9 @@ func (p *peer) PubKey() ic.PubKey { // Addresses returns the peer's multiaddrs func (p *peer) Addresses() []ma.Multiaddr { cp := make([]ma.Multiaddr, len(p.addresses)) + p.RLock() copy(cp, p.addresses) + defer p.RUnlock() return cp } @@ -182,7 +188,6 @@ func (p *peer) SetLatency(laten time.Duration) { // LoadAndVerifyKeyPair unmarshalls, loads a private/public key pair. // Error if (a) unmarshalling fails, or (b) pubkey does not match id. func (p *peer) LoadAndVerifyKeyPair(marshalled []byte) error { - sk, err := ic.UnmarshalPrivateKey(marshalled) if err != nil { return fmt.Errorf("Failed to unmarshal private key: %v", err) @@ -199,6 +204,9 @@ func (p *peer) VerifyAndSetPrivKey(sk ic.PrivKey) error { return err } + p.Lock() + defer p.Unlock() + // if we didn't have the priavte key, assign it if p.privKey == nil { p.privKey = sk @@ -224,6 +232,9 @@ func (p *peer) VerifyAndSetPubKey(pk ic.PubKey) error { return fmt.Errorf("Failed to hash public key: %v", err) } + p.Lock() + defer p.Unlock() + if !p.id.Equal(pkid) { return fmt.Errorf("Public key does not match peer.ID.") } @@ -246,6 +257,29 @@ func (p *peer) VerifyAndSetPubKey(pk ic.PubKey) error { panic("invariant violated: unexpected key mismatch") } +func (p *peer) Update(other Peer) error { + if !p.ID().Equal(other.ID()) { + return errors.New("peer ids do not match") + } + + for _, a := range other.Addresses() { + p.AddAddress(a) + } + + sk := other.PrivKey() + pk := other.PubKey() + p.Lock() + if p.privKey == nil { + p.privKey = sk + } + + if p.pubKey == nil { + p.pubKey = pk + } + defer p.Unlock() + return nil +} + // WithKeyPair returns a Peer object with given keys. func WithKeyPair(sk ic.PrivKey, pk ic.PubKey) (Peer, error) { if sk == nil && pk == nil { diff --git a/peer/peerstore.go b/peer/peerstore.go index 4004371fd..7bbb3e125 100644 --- a/peer/peerstore.go +++ b/peer/peerstore.go @@ -12,7 +12,7 @@ import ( // Peerstore provides a threadsafe collection for peers. type Peerstore interface { Get(ID) (Peer, error) - Put(Peer) error + Add(Peer) (Peer, error) Delete(ID) error All() (*Map, error) } @@ -63,12 +63,37 @@ func (p *peerstore) Get(i ID) (Peer, error) { } } -func (p *peerstore) Put(peer Peer) error { +func (p *peerstore) Add(peer Peer) (Peer, error) { p.Lock() defer p.Unlock() k := peer.Key().DsKey() - return p.peers.Put(k, peer) + val, err := p.peers.Get(k) + switch err { + // some other datastore error + default: + return nil, err + + // not found? just add and return. + case ds.ErrNotFound: + err := p.peers.Put(k, peer) + return peer, err + + // no error, already here. + case nil: + peer2, ok := val.(Peer) + if !ok { + return nil, errors.New("stored value was not a Peer") + } + + if peer == peer2 { + return peer, nil + } + + // must do some merging. + peer2.Update(peer) + return peer2, nil + } } func (p *peerstore) Delete(i ID) error { diff --git a/peer/peerstore_test.go b/peer/peerstore_test.go index 4dce4c46b..5068ddc55 100644 --- a/peer/peerstore_test.go +++ b/peer/peerstore_test.go @@ -27,11 +27,15 @@ func TestPeerstore(t *testing.T) { // p31, _ := setupPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33", "/ip4/127.0.0.1/tcp/3456") // p41, _ := setupPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a34", "/ip4/127.0.0.1/tcp/4567") - err := ps.Put(p11) + p13, err := ps.Add(p11) if err != nil { t.Error(err) } + if p13 != p11 { + t.Error("these should be the same") + } + p12, err := ps.Get(ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31")) if err != nil { t.Error(err) @@ -41,10 +45,13 @@ func TestPeerstore(t *testing.T) { t.Error(errors.New("peers should be the same")) } - err = ps.Put(p21) + p23, err := ps.Add(p21) if err != nil { t.Error(err) } + if p23 != p21 { + t.Error("These should be the same") + } p22, err := ps.Get(ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32")) if err != nil { diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go index c4ba09414..b38b12d6c 100644 --- a/routing/dht/ext_test.go +++ b/routing/dht/ext_test.go @@ -203,7 +203,7 @@ func TestNotFound(t *testing.T) { local := peer.WithIDString("test_peer") peerstore := peer.NewPeerstore() - peerstore.Put(local) + peerstore.Add(local) d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore()) @@ -269,7 +269,7 @@ func TestLessThanKResponses(t *testing.T) { fs := &fauxSender{} local := peer.WithIDString("test_peer") peerstore := peer.NewPeerstore() - peerstore.Put(local) + peerstore.Add(local) d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore())