diff --git a/core/bootstrap.go b/core/bootstrap.go index b1d1f2587..116226028 100644 --- a/core/bootstrap.go +++ b/core/bootstrap.go @@ -190,7 +190,7 @@ func bootstrapConnect(ctx context.Context, defer log.EventBegin(ctx, "bootstrapDial", route.LocalPeer(), p.ID).Done() log.Debugf("%s bootstrapping to %s", route.LocalPeer(), p.ID) - ps.AddAddresses(p.ID, p.Addrs) + ps.AddAddrs(p.ID, p.Addrs, peer.PermanentAddrTTL) err := route.Connect(ctx, p.ID) if err != nil { log.Event(ctx, "bootstrapDialFailed", p.ID) diff --git a/core/commands/id.go b/core/commands/id.go index 11ef40575..911d50397 100644 --- a/core/commands/id.go +++ b/core/commands/id.go @@ -151,18 +151,18 @@ func printPeer(ps peer.Peerstore, p peer.ID) (interface{}, error) { info.PublicKey = base64.StdEncoding.EncodeToString(pkb) } - for _, a := range ps.Addresses(p) { + for _, a := range ps.Addrs(p) { info.Addresses = append(info.Addresses, a.String()) } if v, err := ps.Get(p, "ProtocolVersion"); err == nil { if vs, ok := v.(string); ok { - info.AgentVersion = vs + info.ProtocolVersion = vs } } if v, err := ps.Get(p, "AgentVersion"); err == nil { if vs, ok := v.(string); ok { - info.ProtocolVersion = vs + info.AgentVersion = vs } } diff --git a/core/commands/ping.go b/core/commands/ping.go index 73636db92..3242f6ea8 100644 --- a/core/commands/ping.go +++ b/core/commands/ping.go @@ -95,7 +95,7 @@ trip latency information. } if addr != nil { - n.Peerstore.AddAddress(peerID, addr) + n.Peerstore.AddAddr(peerID, addr, peer.TempAddrTTL) // temporary } // Set up number of pings @@ -120,7 +120,7 @@ func pingPeer(ctx context.Context, n *core.IpfsNode, pid peer.ID, numPings int) go func() { defer close(outChan) - if len(n.Peerstore.Addresses(pid)) == 0 { + if len(n.Peerstore.Addrs(pid)) == 0 { // Make sure we can find the node in question outChan <- &PingResult{ Text: fmt.Sprintf("Looking up peer %s", pid.Pretty()), @@ -132,7 +132,7 @@ func pingPeer(ctx context.Context, n *core.IpfsNode, pid peer.ID, numPings int) outChan <- &PingResult{Text: fmt.Sprintf("Peer lookup error: %s", err)} return } - n.Peerstore.AddPeerInfo(p) + n.Peerstore.AddAddrs(p.ID, p.Addrs, peer.TempAddrTTL) } outChan <- &PingResult{Text: fmt.Sprintf("PING %s.", pid.Pretty())} diff --git a/core/commands/swarm.go b/core/commands/swarm.go index 1db08aa02..5026a0153 100644 --- a/core/commands/swarm.go +++ b/core/commands/swarm.go @@ -18,11 +18,16 @@ type stringList struct { Strings []string } +type addrMap struct { + Addrs map[string][]string +} + var SwarmCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "swarm inspection tool", Synopsis: ` ipfs swarm peers - List peers with open connections +ipfs swarm addrs - List known addresses. Useful to debug. ipfs swarm connect
- Open connection to a given address ipfs swarm disconnect
- Close connection to a given address `, @@ -34,6 +39,7 @@ ipfs peers in the internet. }, Subcommands: map[string]*cmds.Command{ "peers": swarmPeersCmd, + "addrs": swarmAddrsCmd, "connect": swarmConnectCmd, "disconnect": swarmDisconnectCmd, }, @@ -77,6 +83,66 @@ ipfs swarm peers lists the set of peers this node is connected to. Type: stringList{}, } +var swarmAddrsCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "List known addresses. Useful to debug.", + ShortDescription: ` +ipfs swarm addrs lists all addresses this node is aware of. +`, + }, + Run: func(req cmds.Request, res cmds.Response) { + + n, err := req.Context().GetNode() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + if n.PeerHost == nil { + res.SetError(errNotOnline, cmds.ErrClient) + return + } + + addrs := make(map[string][]string) + ps := n.PeerHost.Network().Peerstore() + for _, p := range ps.Peers() { + s := p.Pretty() + for _, a := range ps.Addrs(p) { + addrs[s] = append(addrs[s], a.String()) + } + sort.Sort(sort.StringSlice(addrs[s])) + } + + res.SetOutput(&addrMap{Addrs: addrs}) + }, + Marshalers: cmds.MarshalerMap{ + cmds.Text: func(res cmds.Response) (io.Reader, error) { + m, ok := res.Output().(*addrMap) + if !ok { + return nil, errors.New("failed to cast map[string]string") + } + + // sort the ids first + ids := make([]string, 0, len(m.Addrs)) + for p := range m.Addrs { + ids = append(ids, p) + } + sort.Sort(sort.StringSlice(ids)) + + var buf bytes.Buffer + for _, p := range ids { + paddrs := m.Addrs[p] + buf.WriteString(fmt.Sprintf("%s (%d)\n", p, len(paddrs))) + for _, addr := range paddrs { + buf.WriteString("\t" + addr + "\n") + } + } + return &buf, nil + }, + }, + Type: addrMap{}, +} + var swarmConnectCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Open connection to a given address", @@ -236,7 +302,7 @@ func peersWithAddresses(ps peer.Peerstore, addrs []string) (pids []peer.ID, err for _, iaddr := range iaddrs { pids = append(pids, iaddr.ID()) - ps.AddAddress(iaddr.ID(), iaddr.Multiaddr()) + ps.AddAddr(iaddr.ID(), iaddr.Multiaddr(), peer.TempAddrTTL) } return pids, nil } diff --git a/core/core.go b/core/core.go index a065ac905..f8c42a223 100644 --- a/core/core.go +++ b/core/core.go @@ -476,16 +476,12 @@ func startListening(ctx context.Context, host p2phost.Host, cfg *config.Config) return err } - // explicitly set these as our listen addrs. - // (why not do it inside inet.NewNetwork? because this way we can - // listen on addresses without necessarily advertising those publicly.) + // list out our addresses addrs, err := host.Network().InterfaceListenAddresses() if err != nil { return debugerror.Wrap(err) } log.Infof("Swarm listening at: %s", addrs) - - host.Peerstore().AddAddresses(host.ID(), addrs) return nil } diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index 2ea6705d0..22ead701c 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -38,18 +38,24 @@ type impl struct { receiver Receiver } +func (bsnet *impl) newStreamToPeer(ctx context.Context, p peer.ID) (inet.Stream, error) { + + // first, make sure we're connected. + // if this fails, we cannot connect to given peer. + //TODO(jbenet) move this into host.NewStream? + if err := bsnet.host.Connect(ctx, peer.PeerInfo{ID: p}); err != nil { + return nil, err + } + + return bsnet.host.NewStream(ProtocolBitswap, p) +} + func (bsnet *impl) SendMessage( ctx context.Context, p peer.ID, outgoing bsmsg.BitSwapMessage) error { - // ensure we're connected - //TODO(jbenet) move this into host.NewStream? - if err := bsnet.host.Connect(ctx, peer.PeerInfo{ID: p}); err != nil { - return err - } - - s, err := bsnet.host.NewStream(ProtocolBitswap, p) + s, err := bsnet.newStreamToPeer(ctx, p) if err != nil { return err } @@ -68,13 +74,7 @@ func (bsnet *impl) SendRequest( p peer.ID, outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) { - // ensure we're connected - //TODO(jbenet) move this into host.NewStream? - if err := bsnet.host.Connect(ctx, peer.PeerInfo{ID: p}); err != nil { - return nil, err - } - - s, err := bsnet.host.NewStream(ProtocolBitswap, p) + s, err := bsnet.newStreamToPeer(ctx, p) if err != nil { return nil, err } @@ -123,7 +123,7 @@ func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) if info.ID == bsnet.host.ID() { continue // ignore self as provider } - bsnet.host.Peerstore().AddAddresses(info.ID, info.Addrs) + bsnet.host.Peerstore().AddAddrs(info.ID, info.Addrs, peer.TempAddrTTL) select { case <-ctx.Done(): return diff --git a/p2p/crypto/secio/rw.go b/p2p/crypto/secio/rw.go index b8635c7aa..fe221c30c 100644 --- a/p2p/crypto/secio/rw.go +++ b/p2p/crypto/secio/rw.go @@ -192,7 +192,7 @@ func (r *etmReader) macCheckThenDecrypt(m []byte) (int, error) { // check mac. if failed, return error. if !hmac.Equal(macd, expected) { - log.Error("MAC Invalid:", expected, "!=", macd) + log.Debug("MAC Invalid:", expected, "!=", macd) return 0, ErrMACInvalid } diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index b9314c399..f6a05ee0c 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -144,7 +144,7 @@ func (h *BasicHost) NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error) { func (h *BasicHost) Connect(ctx context.Context, pi peer.PeerInfo) error { // absorb addresses into peerstore - h.Peerstore().AddPeerInfo(pi) + h.Peerstore().AddAddrs(pi.ID, pi.Addrs, peer.TempAddrTTL) cs := h.Network().ConnsToPeer(pi.ID) if len(cs) > 0 { @@ -189,6 +189,10 @@ func (h *BasicHost) Addrs() []ma.Multiaddr { log.Debug("error retrieving network interface addrs") } + if h.ids != nil { // add external observed addresses + addrs = append(addrs, h.ids.OwnObservedAddrs()...) + } + if h.natmgr != nil { // natmgr is nil if we do not use nat option. nat := h.natmgr.NAT() if nat != nil { // nat is nil if not ready, or no nat is available. diff --git a/p2p/net/mock/mock_link.go b/p2p/net/mock/mock_link.go index cfd4be142..3881e62c2 100644 --- a/p2p/net/mock/mock_link.go +++ b/p2p/net/mock/mock_link.go @@ -33,8 +33,8 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) { c.local = ln.peer c.remote = rn.peer - c.localAddr = ln.ps.Addresses(ln.peer)[0] - c.remoteAddr = rn.ps.Addresses(rn.peer)[0] + c.localAddr = ln.ps.Addrs(ln.peer)[0] + c.remoteAddr = rn.ps.Addrs(rn.peer)[0] c.localPrivKey = ln.ps.PrivKey(ln.peer) c.remotePubKey = rn.ps.PubKey(rn.peer) diff --git a/p2p/net/mock/mock_peernet.go b/p2p/net/mock/mock_peernet.go index 87e51faf6..79c4a6aa0 100644 --- a/p2p/net/mock/mock_peernet.go +++ b/p2p/net/mock/mock_peernet.go @@ -49,7 +49,7 @@ func newPeernet(ctx context.Context, m *mocknet, k ic.PrivKey, // create our own entirely, so that peers knowledge doesn't get shared ps := peer.NewPeerstore() - ps.AddAddress(p, a) + ps.AddAddr(p, a, peer.PermanentAddrTTL) ps.AddPrivKey(p, k) ps.AddPubKey(p, k.GetPublic()) @@ -307,13 +307,13 @@ func (pn *peernet) BandwidthTotals() (in uint64, out uint64) { // Listen tells the network to start listening on given multiaddrs. func (pn *peernet) Listen(addrs ...ma.Multiaddr) error { - pn.Peerstore().AddAddresses(pn.LocalPeer(), addrs) + pn.Peerstore().AddAddrs(pn.LocalPeer(), addrs, peer.PermanentAddrTTL) return nil } // ListenAddresses returns a list of addresses at which this network listens. func (pn *peernet) ListenAddresses() []ma.Multiaddr { - return pn.Peerstore().Addresses(pn.LocalPeer()) + return pn.Peerstore().Addrs(pn.LocalPeer()) } // InterfaceListenAddresses returns a list of addresses at which this network diff --git a/p2p/net/swarm/dial_test.go b/p2p/net/swarm/dial_test.go index 92e54c719..252859df0 100644 --- a/p2p/net/swarm/dial_test.go +++ b/p2p/net/swarm/dial_test.go @@ -48,7 +48,7 @@ func TestSimultDials(t *testing.T) { connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) { // copy for other peer log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.local, dst, addr) - s.peers.AddAddress(dst, addr) + s.peers.AddAddr(dst, addr, peer.TempAddrTTL) if _, err := s.Dial(ctx, dst); err != nil { t.Fatal("error swarm dialing to peer", err) } @@ -125,7 +125,7 @@ func TestDialWait(t *testing.T) { s2p, s2addr, s2l := newSilentPeer(t) go acceptAndHang(s2l) defer s2l.Close() - s1.peers.AddAddress(s2p, s2addr) + s1.peers.AddAddr(s2p, s2addr, peer.PermanentAddrTTL) before := time.Now() if c, err := s1.Dial(ctx, s2p); err == nil { @@ -171,13 +171,13 @@ func TestDialBackoff(t *testing.T) { if err != nil { t.Fatal(err) } - s1.peers.AddAddresses(s2.local, s2addrs) + s1.peers.AddAddrs(s2.local, s2addrs, peer.PermanentAddrTTL) // dial to a non-existent peer. s3p, s3addr, s3l := newSilentPeer(t) go acceptAndHang(s3l) defer s3l.Close() - s1.peers.AddAddress(s3p, s3addr) + s1.peers.AddAddr(s3p, s3addr, peer.PermanentAddrTTL) // in this test we will: // 1) dial 10x to each node. @@ -389,7 +389,7 @@ func TestDialBackoffClears(t *testing.T) { defer s2l.Close() // phase 1 -- dial to non-operational addresses - s1.peers.AddAddress(s2.local, s2bad) + s1.peers.AddAddr(s2.local, s2bad, peer.PermanentAddrTTL) before := time.Now() if c, err := s1.Dial(ctx, s2.local); err == nil { @@ -419,7 +419,7 @@ func TestDialBackoffClears(t *testing.T) { if err != nil { t.Fatal(err) } - s1.peers.AddAddresses(s2.local, ifaceAddrs1) + s1.peers.AddAddrs(s2.local, ifaceAddrs1, peer.PermanentAddrTTL) before = time.Now() if c, err := s1.Dial(ctx, s2.local); err != nil { diff --git a/p2p/net/swarm/peers_test.go b/p2p/net/swarm/peers_test.go index 7a35c70d4..583e218d4 100644 --- a/p2p/net/swarm/peers_test.go +++ b/p2p/net/swarm/peers_test.go @@ -19,7 +19,7 @@ func TestPeers(t *testing.T) { connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) { // TODO: make a DialAddr func. - s.peers.AddAddress(dst, addr) + s.peers.AddAddr(dst, addr, peer.PermanentAddrTTL) // t.Logf("connections from %s", s.LocalPeer()) // for _, c := range s.ConnectionsToPeer(dst) { // t.Logf("connection from %s to %s: %v", s.LocalPeer(), dst, c) diff --git a/p2p/net/swarm/simul_test.go b/p2p/net/swarm/simul_test.go index 8a3df0d2e..446a011d7 100644 --- a/p2p/net/swarm/simul_test.go +++ b/p2p/net/swarm/simul_test.go @@ -25,7 +25,7 @@ func TestSimultOpen(t *testing.T) { connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) { // copy for other peer log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.local, dst, addr) - s.peers.AddAddress(dst, addr) + s.peers.AddAddr(dst, addr, peer.PermanentAddrTTL) if _, err := s.Dial(ctx, dst); err != nil { t.Fatal("error swarm dialing to peer", err) } diff --git a/p2p/net/swarm/swarm_addr_test.go b/p2p/net/swarm/swarm_addr_test.go index 850ba6eb4..e822a4b01 100644 --- a/p2p/net/swarm/swarm_addr_test.go +++ b/p2p/net/swarm/swarm_addr_test.go @@ -110,7 +110,7 @@ func TestDialBadAddrs(t *testing.T) { test := func(a ma.Multiaddr) { p := testutil.RandPeerIDFatal(t) - s.peers.AddAddress(p, a) + s.peers.AddAddr(p, a, peer.PermanentAddrTTL) if _, err := s.Dial(ctx, p); err == nil { t.Error("swarm should not dial: %s", m) } diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index bad1422cc..d79066e59 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -3,6 +3,7 @@ package swarm import ( "errors" "fmt" + "math/rand" "net" "sync" "time" @@ -15,6 +16,9 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" + process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + procctx "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" + ratelimit "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit" ) // Diagram of dial sync: @@ -289,14 +293,14 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { } // get remote peer addrs - remoteAddrs := s.peers.Addresses(p) + remoteAddrs := s.peers.Addrs(p) // make sure we can use the addresses. remoteAddrs = addrutil.FilterUsableAddrs(remoteAddrs) // drop out any addrs that would just dial ourselves. use ListenAddresses // as that is a more authoritative view than localAddrs. ila, _ := s.InterfaceListenAddresses() remoteAddrs = addrutil.Subtract(remoteAddrs, ila) - remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addresses(s.local)) + remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addrs(s.local)) log.Debugf("%s swarm dialing %s -- remote:%s local:%s", s.local, p, remoteAddrs, s.ListenAddresses()) if len(remoteAddrs) == 0 { err := errors.New("peer has no addresses") @@ -353,43 +357,63 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote conns := make(chan conn.Conn, len(remoteAddrs)) errs := make(chan error, len(remoteAddrs)) - //TODO: rate limiting just in case? - for _, addr := range remoteAddrs { - go func(addr ma.Multiaddr) { - connC, err := s.dialAddr(ctx, d, p, addr) + // dialSingleAddr is used in the rate-limited async thing below. + dialSingleAddr := func(addr ma.Multiaddr) { + connC, err := s.dialAddr(ctx, d, p, addr) - // check parent still wants our results + // check parent still wants our results + select { + case <-foundConn: + if connC != nil { + connC.Close() + } + return + default: + } + + if err != nil { + errs <- err + } else if connC == nil { + errs <- fmt.Errorf("failed to dial %s %s", p, addr) + } else { + conns <- connC + } + } + + // this whole thing is in a goroutine so we can use foundConn + // to end early. + go func() { + // rate limiting just in case. at most 10 addrs at once. + limiter := ratelimit.NewRateLimiter(procctx.WithContext(ctx), 10) + + // permute addrs so we try different sets first each time. + for _, i := range rand.Perm(len(remoteAddrs)) { select { - case <-foundConn: - if connC != nil { - connC.Close() - } - return + case <-foundConn: // if one of them succeeded already + break default: } - if err != nil { - errs <- err - } else if connC == nil { - errs <- fmt.Errorf("failed to dial %s %s", p, addr) - } else { - conns <- connC - } - }(addr) - } + workerAddr := remoteAddrs[i] // shadow variable to avoid race + limiter.Go(func(worker process.Process) { + dialSingleAddr(workerAddr) + }) + } + }() - err := fmt.Errorf("failed to dial %s", p) + // wair fot the results. + exitErr := fmt.Errorf("failed to dial %s", p) for i := 0; i < len(remoteAddrs); i++ { select { - case err = <-errs: - log.Debug(err) + case exitErr = <-errs: // + log.Debug(exitErr) case connC := <-conns: // take the first + return asap close(foundConn) return connC, nil } } - return nil, err + return nil, exitErr } func (s *Swarm) dialAddr(ctx context.Context, d *conn.Dialer, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) { diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go index 734726fac..8c7a7b5a7 100644 --- a/p2p/net/swarm/swarm_listen.go +++ b/p2p/net/swarm/swarm_listen.go @@ -53,7 +53,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { // return err // } // for _, a := range resolved { - // s.peers.AddAddress(s.local, a) + // s.peers.AddAddr(s.local, a) // } sk := s.peers.PrivKey(s.local) diff --git a/p2p/net/swarm/swarm_test.go b/p2p/net/swarm/swarm_test.go index 07c583eab..1c2ae1d03 100644 --- a/p2p/net/swarm/swarm_test.go +++ b/p2p/net/swarm/swarm_test.go @@ -75,7 +75,7 @@ func connectSwarms(t *testing.T, ctx context.Context, swarms []*Swarm) { var wg sync.WaitGroup connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) { // TODO: make a DialAddr func. - s.peers.AddAddress(dst, addr) + s.peers.AddAddr(dst, addr, peer.PermanentAddrTTL) if _, err := s.Dial(ctx, dst); err != nil { t.Fatal("error swarm dialing to peer", err) } diff --git a/p2p/peer/addr_manager.go b/p2p/peer/addr_manager.go new file mode 100644 index 000000000..14a8010b0 --- /dev/null +++ b/p2p/peer/addr_manager.go @@ -0,0 +1,188 @@ +package peer + +import ( + "sync" + "time" + + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" +) + +const ( + + // TempAddrTTL is the ttl used for a short lived address + TempAddrTTL = time.Second * 10 + + // ProviderAddrTTL is the TTL of an address we've received from a provider. + // This is also a temporary address, but lasts longer. After this expires, + // the records we return will require an extra lookup. + ProviderAddrTTL = time.Minute * 10 + + // RecentlyConnectedAddrTTL is used when we recently connected to a peer. + // It means that we are reasonably certain of the peer's address. + RecentlyConnectedAddrTTL = time.Minute * 10 + + // OwnObservedAddrTTL is used for our own external addresses observed by peers. + OwnObservedAddrTTL = time.Minute * 20 + + // PermanentAddrTTL is the ttl for a "permanent address" (e.g. bootstrap nodes) + // if we haven't shipped you an update to ipfs in 356 days + // we probably arent running the same bootstrap nodes... + PermanentAddrTTL = time.Hour * 24 * 356 + + // ConnectedAddrTTL is the ttl used for the addresses of a peer to whom + // we're connected directly. This is basically permanent, as we will + // clear them + re-add under a TempAddrTTL after disconnecting. + ConnectedAddrTTL = PermanentAddrTTL +) + +type expiringAddr struct { + Addr ma.Multiaddr + TTL time.Time +} + +func (e *expiringAddr) ExpiredBy(t time.Time) bool { + return t.After(e.TTL) +} + +type addrSet map[string]expiringAddr + +// AddrManager manages addresses. +// The zero-value is ready to be used. +type AddrManager struct { + addrmu sync.Mutex // guards addrs + addrs map[ID]addrSet +} + +// ensures the AddrManager is initialized. +// So we can use the zero value. +func (mgr *AddrManager) init() { + if mgr.addrs == nil { + mgr.addrs = make(map[ID]addrSet) + } +} + +func (mgr *AddrManager) Peers() []ID { + mgr.addrmu.Lock() + defer mgr.addrmu.Unlock() + if mgr.addrs == nil { + return nil + } + + pids := make([]ID, 0, len(mgr.addrs)) + for pid := range mgr.addrs { + pids = append(pids, pid) + } + return pids +} + +// AddAddr calls AddAddrs(p, []ma.Multiaddr{addr}, ttl) +func (mgr *AddrManager) AddAddr(p ID, addr ma.Multiaddr, ttl time.Duration) { + mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl) +} + +// AddAddrs gives AddrManager addresses to use, with a given ttl +// (time-to-live), after which the address is no longer valid. +// If the manager has a longer TTL, the operation is a no-op for that address +func (mgr *AddrManager) AddAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration) { + mgr.addrmu.Lock() + defer mgr.addrmu.Unlock() + + // if ttl is zero, exit. nothing to do. + if ttl <= 0 { + return + } + + // so zero value can be used + mgr.init() + + amap, found := mgr.addrs[p] + if !found { + amap = make(addrSet) + mgr.addrs[p] = amap + } + + // only expand ttls + exp := time.Now().Add(ttl) + for _, addr := range addrs { + addrstr := addr.String() + a, found := amap[addrstr] + if !found || exp.After(a.TTL) { + amap[addrstr] = expiringAddr{Addr: addr, TTL: exp} + } + } +} + +// SetAddr calls mgr.SetAddrs(p, addr, ttl) +func (mgr *AddrManager) SetAddr(p ID, addr ma.Multiaddr, ttl time.Duration) { + mgr.SetAddrs(p, []ma.Multiaddr{addr}, ttl) +} + +// SetAddrs sets the ttl on addresses. This clears any TTL there previously. +// This is used when we receive the best estimate of the validity of an address. +func (mgr *AddrManager) SetAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration) { + mgr.addrmu.Lock() + defer mgr.addrmu.Unlock() + + // so zero value can be used + mgr.init() + + amap, found := mgr.addrs[p] + if !found { + amap = make(addrSet) + mgr.addrs[p] = amap + } + + exp := time.Now().Add(ttl) + for _, addr := range addrs { + // re-set all of them for new ttl. + addrs := addr.String() + + if ttl > 0 { + amap[addrs] = expiringAddr{Addr: addr, TTL: exp} + } else { + delete(amap, addrs) + } + } +} + +// Addresses returns all known (and valid) addresses for a given +func (mgr *AddrManager) Addrs(p ID) []ma.Multiaddr { + mgr.addrmu.Lock() + defer mgr.addrmu.Unlock() + + // not initialized? nothing to give. + if mgr.addrs == nil { + return nil + } + + maddrs, found := mgr.addrs[p] + if !found { + return nil + } + + now := time.Now() + good := make([]ma.Multiaddr, 0, len(maddrs)) + var expired []string + for s, m := range maddrs { + if m.ExpiredBy(now) { + expired = append(expired, s) + } else { + good = append(good, m.Addr) + } + } + + // clean up the expired ones. + for _, s := range expired { + delete(maddrs, s) + } + return good +} + +// ClearAddresses removes all previously stored addresses +func (mgr *AddrManager) ClearAddrs(p ID) { + mgr.addrmu.Lock() + defer mgr.addrmu.Unlock() + mgr.init() + + mgr.addrs[p] = make(addrSet) // clear what was there before +} diff --git a/p2p/peer/addr_manager_test.go b/p2p/peer/addr_manager_test.go new file mode 100644 index 000000000..1c488abda --- /dev/null +++ b/p2p/peer/addr_manager_test.go @@ -0,0 +1,180 @@ +package peer + +import ( + "testing" + "time" + + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" +) + +func IDS(t *testing.T, ids string) ID { + id, err := IDB58Decode(ids) + if err != nil { + t.Fatal(err) + } + return id +} + +func MA(t *testing.T, m string) ma.Multiaddr { + maddr, err := ma.NewMultiaddr(m) + if err != nil { + t.Fatal(err) + } + return maddr +} + +func testHas(t *testing.T, exp, act []ma.Multiaddr) { + if len(exp) != len(act) { + t.Fatal("lengths not the same") + } + + for _, a := range exp { + found := false + + for _, b := range act { + if a.Equal(b) { + found = true + break + } + } + + if !found { + t.Fatal("expected address %s not found", a) + } + } +} + +func TestAddresses(t *testing.T) { + + id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN") + id2 := IDS(t, "QmRmPL3FDZKE3Qiwv1RosLdwdvbvg17b2hB39QPScgWKKZ") + id3 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ6Kn") + id4 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ5Kn") + id5 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ5Km") + + ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111") + ma21 := MA(t, "/ip4/2.2.3.2/tcp/1111") + ma22 := MA(t, "/ip4/2.2.3.2/tcp/2222") + ma31 := MA(t, "/ip4/3.2.3.3/tcp/1111") + ma32 := MA(t, "/ip4/3.2.3.3/tcp/2222") + ma33 := MA(t, "/ip4/3.2.3.3/tcp/3333") + ma41 := MA(t, "/ip4/4.2.3.3/tcp/1111") + ma42 := MA(t, "/ip4/4.2.3.3/tcp/2222") + ma43 := MA(t, "/ip4/4.2.3.3/tcp/3333") + ma44 := MA(t, "/ip4/4.2.3.3/tcp/4444") + ma51 := MA(t, "/ip4/5.2.3.3/tcp/1111") + ma52 := MA(t, "/ip4/5.2.3.3/tcp/2222") + ma53 := MA(t, "/ip4/5.2.3.3/tcp/3333") + ma54 := MA(t, "/ip4/5.2.3.3/tcp/4444") + ma55 := MA(t, "/ip4/5.2.3.3/tcp/5555") + + ttl := time.Hour + m := AddrManager{} + m.AddAddr(id1, ma11, ttl) + + m.AddAddrs(id2, []ma.Multiaddr{ma21, ma22}, ttl) + m.AddAddrs(id2, []ma.Multiaddr{ma21, ma22}, ttl) // idempotency + + m.AddAddr(id3, ma31, ttl) + m.AddAddr(id3, ma32, ttl) + m.AddAddr(id3, ma33, ttl) + m.AddAddr(id3, ma33, ttl) // idempotency + m.AddAddr(id3, ma33, ttl) + + m.AddAddrs(id4, []ma.Multiaddr{ma41, ma42, ma43, ma44}, ttl) // multiple + + m.AddAddrs(id5, []ma.Multiaddr{ma21, ma22}, ttl) // clearing + m.AddAddrs(id5, []ma.Multiaddr{ma41, ma42, ma43, ma44}, ttl) // clearing + m.ClearAddrs(id5) + m.AddAddrs(id5, []ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, ttl) // clearing + + // test the Addresses return value + testHas(t, []ma.Multiaddr{ma11}, m.Addrs(id1)) + testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2)) + testHas(t, []ma.Multiaddr{ma31, ma32, ma33}, m.Addrs(id3)) + testHas(t, []ma.Multiaddr{ma41, ma42, ma43, ma44}, m.Addrs(id4)) + testHas(t, []ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, m.Addrs(id5)) + +} + +func TestAddressesExpire(t *testing.T) { + + id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN") + id2 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQM") + ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111") + ma12 := MA(t, "/ip4/2.2.3.2/tcp/2222") + ma13 := MA(t, "/ip4/3.2.3.3/tcp/3333") + ma24 := MA(t, "/ip4/4.2.3.3/tcp/4444") + ma25 := MA(t, "/ip4/5.2.3.3/tcp/5555") + + m := AddrManager{} + m.AddAddr(id1, ma11, time.Hour) + m.AddAddr(id1, ma12, time.Hour) + m.AddAddr(id1, ma13, time.Hour) + m.AddAddr(id2, ma24, time.Hour) + m.AddAddr(id2, ma25, time.Hour) + + testHas(t, []ma.Multiaddr{ma11, ma12, ma13}, m.Addrs(id1)) + testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2)) + + m.SetAddr(id1, ma11, 2*time.Hour) + m.SetAddr(id1, ma12, 2*time.Hour) + m.SetAddr(id1, ma13, 2*time.Hour) + m.SetAddr(id2, ma24, 2*time.Hour) + m.SetAddr(id2, ma25, 2*time.Hour) + + testHas(t, []ma.Multiaddr{ma11, ma12, ma13}, m.Addrs(id1)) + testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2)) + + m.SetAddr(id1, ma11, time.Millisecond) + <-time.After(time.Millisecond) + testHas(t, []ma.Multiaddr{ma12, ma13}, m.Addrs(id1)) + testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2)) + + m.SetAddr(id1, ma13, time.Millisecond) + <-time.After(time.Millisecond) + testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1)) + testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2)) + + m.SetAddr(id2, ma24, time.Millisecond) + <-time.After(time.Millisecond) + testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1)) + testHas(t, []ma.Multiaddr{ma25}, m.Addrs(id2)) + + m.SetAddr(id2, ma25, time.Millisecond) + <-time.After(time.Millisecond) + testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1)) + testHas(t, nil, m.Addrs(id2)) + + m.SetAddr(id1, ma12, time.Millisecond) + <-time.After(time.Millisecond) + testHas(t, nil, m.Addrs(id1)) + testHas(t, nil, m.Addrs(id2)) +} + +func TestClearWorks(t *testing.T) { + + id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN") + id2 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQM") + ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111") + ma12 := MA(t, "/ip4/2.2.3.2/tcp/2222") + ma13 := MA(t, "/ip4/3.2.3.3/tcp/3333") + ma24 := MA(t, "/ip4/4.2.3.3/tcp/4444") + ma25 := MA(t, "/ip4/5.2.3.3/tcp/5555") + + m := AddrManager{} + m.AddAddr(id1, ma11, time.Hour) + m.AddAddr(id1, ma12, time.Hour) + m.AddAddr(id1, ma13, time.Hour) + m.AddAddr(id2, ma24, time.Hour) + m.AddAddr(id2, ma25, time.Hour) + + testHas(t, []ma.Multiaddr{ma11, ma12, ma13}, m.Addrs(id1)) + testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2)) + + m.ClearAddrs(id1) + m.ClearAddrs(id2) + + testHas(t, nil, m.Addrs(id1)) + testHas(t, nil, m.Addrs(id2)) +} diff --git a/p2p/peer/peerstore.go b/p2p/peer/peerstore.go index 8be502ca4..f1281e86e 100644 --- a/p2p/peer/peerstore.go +++ b/p2p/peer/peerstore.go @@ -20,8 +20,8 @@ const ( // Peerstore provides a threadsafe store of Peer related // information. type Peerstore interface { + AddrBook KeyBook - AddressBook Metrics // Peers returns a list of all peer.IDs in this Peerstore @@ -32,9 +32,6 @@ type Peerstore interface { // that peer, useful to other services. PeerInfo(ID) PeerInfo - // AddPeerInfo absorbs the information listed in given PeerInfo. - AddPeerInfo(PeerInfo) - // Get/Put is a simple registry for other peer-related key/value pairs. // if we find something we use often, it should become its own set of // methods. this is a last resort. @@ -42,109 +39,30 @@ type Peerstore interface { Put(id ID, key string, val interface{}) error } -// AddressBook tracks the addresses of Peers -type AddressBook interface { - Addresses(ID) []ma.Multiaddr // returns addresses for ID - AddAddress(ID, ma.Multiaddr) // Adds given addr for ID - AddAddresses(ID, []ma.Multiaddr) // Adds given addrs for ID - SetAddresses(ID, []ma.Multiaddr) // Sets given addrs for ID (clears previously stored) -} +// AddrBook is an interface that fits the new AddrManager. I'm patching +// it up in here to avoid changing a ton of the codebase. +type AddrBook interface { -type expiringAddr struct { - Addr ma.Multiaddr - TTL time.Time -} + // AddAddr calls AddAddrs(p, []ma.Multiaddr{addr}, ttl) + AddAddr(p ID, addr ma.Multiaddr, ttl time.Duration) -func (e *expiringAddr) Expired() bool { - return time.Now().After(e.TTL) -} + // AddAddrs gives AddrManager addresses to use, with a given ttl + // (time-to-live), after which the address is no longer valid. + // If the manager has a longer TTL, the operation is a no-op for that address + AddAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration) -type addressMap map[string]expiringAddr + // SetAddr calls mgr.SetAddrs(p, addr, ttl) + SetAddr(p ID, addr ma.Multiaddr, ttl time.Duration) -type addressbook struct { - sync.RWMutex // guards all fields + // SetAddrs sets the ttl on addresses. This clears any TTL there previously. + // This is used when we receive the best estimate of the validity of an address. + SetAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration) - addrs map[ID]addressMap - ttl time.Duration // initial ttl -} + // Addresses returns all known (and valid) addresses for a given + Addrs(p ID) []ma.Multiaddr -func newAddressbook() *addressbook { - return &addressbook{ - addrs: map[ID]addressMap{}, - ttl: AddressTTL, - } -} - -func (ab *addressbook) Peers() []ID { - ab.RLock() - ps := make([]ID, 0, len(ab.addrs)) - for p := range ab.addrs { - ps = append(ps, p) - } - ab.RUnlock() - return ps -} - -func (ab *addressbook) Addresses(p ID) []ma.Multiaddr { - ab.Lock() - defer ab.Unlock() - - maddrs, found := ab.addrs[p] - if !found { - return nil - } - - good := make([]ma.Multiaddr, 0, len(maddrs)) - var expired []string - for s, m := range maddrs { - if m.Expired() { - expired = append(expired, s) - } else { - good = append(good, m.Addr) - } - } - - // clean up the expired ones. - for _, s := range expired { - delete(ab.addrs[p], s) - } - return good -} - -func (ab *addressbook) AddAddress(p ID, m ma.Multiaddr) { - ab.AddAddresses(p, []ma.Multiaddr{m}) -} - -func (ab *addressbook) AddAddresses(p ID, ms []ma.Multiaddr) { - ab.Lock() - defer ab.Unlock() - - amap, found := ab.addrs[p] - if !found { - amap = addressMap{} - ab.addrs[p] = amap - } - - ttl := time.Now().Add(ab.ttl) - for _, m := range ms { - // re-set all of them for new ttl. - amap[m.String()] = expiringAddr{ - Addr: m, - TTL: ttl, - } - } -} - -func (ab *addressbook) SetAddresses(p ID, ms []ma.Multiaddr) { - ab.Lock() - defer ab.Unlock() - - amap := addressMap{} - ttl := time.Now().Add(ab.ttl) - for _, m := range ms { - amap[m.String()] = expiringAddr{Addr: m, TTL: ttl} - } - ab.addrs[p] = amap // clear what was there before + // ClearAddresses removes all previously stored addresses + ClearAddrs(p ID) } // KeyBook tracks the Public keys of Peers. @@ -231,8 +149,8 @@ func (kb *keybook) AddPrivKey(p ID, sk ic.PrivKey) error { type peerstore struct { keybook - addressbook metrics + AddrManager // store other data, like versions ds ds.ThreadSafeDatastore @@ -242,8 +160,8 @@ type peerstore struct { func NewPeerstore() Peerstore { return &peerstore{ keybook: *newKeybook(), - addressbook: *newAddressbook(), metrics: *(NewMetrics()).(*metrics), + AddrManager: AddrManager{}, ds: dssync.MutexWrap(ds.NewMapDatastore()), } } @@ -263,7 +181,7 @@ func (ps *peerstore) Peers() []ID { for _, p := range ps.keybook.Peers() { set[p] = struct{}{} } - for _, p := range ps.addressbook.Peers() { + for _, p := range ps.AddrManager.Peers() { set[p] = struct{}{} } @@ -277,14 +195,10 @@ func (ps *peerstore) Peers() []ID { func (ps *peerstore) PeerInfo(p ID) PeerInfo { return PeerInfo{ ID: p, - Addrs: ps.addressbook.Addresses(p), + Addrs: ps.AddrManager.Addrs(p), } } -func (ps *peerstore) AddPeerInfo(pi PeerInfo) { - ps.AddAddresses(pi.ID, pi.Addrs) -} - func PeerInfos(ps Peerstore, peers []ID) []PeerInfo { pi := make([]PeerInfo, len(peers)) for i, p := range peers { diff --git a/p2p/peer/peerstore_test.go b/p2p/peer/peerstore_test.go deleted file mode 100644 index 1edf6ae70..000000000 --- a/p2p/peer/peerstore_test.go +++ /dev/null @@ -1,185 +0,0 @@ -package peer - -import ( - "testing" - "time" - - ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" -) - -func IDS(t *testing.T, ids string) ID { - id, err := IDB58Decode(ids) - if err != nil { - t.Fatal(err) - } - return id -} - -func MA(t *testing.T, m string) ma.Multiaddr { - maddr, err := ma.NewMultiaddr(m) - if err != nil { - t.Fatal(err) - } - return maddr -} - -func TestAddresses(t *testing.T) { - - ps := NewPeerstore() - - id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN") - id2 := IDS(t, "QmRmPL3FDZKE3Qiwv1RosLdwdvbvg17b2hB39QPScgWKKZ") - id3 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ6Kn") - id4 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ5Kn") - id5 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ5Km") - - ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111") - ma21 := MA(t, "/ip4/2.2.3.2/tcp/1111") - ma22 := MA(t, "/ip4/2.2.3.2/tcp/2222") - ma31 := MA(t, "/ip4/3.2.3.3/tcp/1111") - ma32 := MA(t, "/ip4/3.2.3.3/tcp/2222") - ma33 := MA(t, "/ip4/3.2.3.3/tcp/3333") - ma41 := MA(t, "/ip4/4.2.3.3/tcp/1111") - ma42 := MA(t, "/ip4/4.2.3.3/tcp/2222") - ma43 := MA(t, "/ip4/4.2.3.3/tcp/3333") - ma44 := MA(t, "/ip4/4.2.3.3/tcp/4444") - ma51 := MA(t, "/ip4/5.2.3.3/tcp/1111") - ma52 := MA(t, "/ip4/5.2.3.3/tcp/2222") - ma53 := MA(t, "/ip4/5.2.3.3/tcp/3333") - ma54 := MA(t, "/ip4/5.2.3.3/tcp/4444") - ma55 := MA(t, "/ip4/5.2.3.3/tcp/5555") - - ps.AddAddress(id1, ma11) - ps.AddAddresses(id2, []ma.Multiaddr{ma21, ma22}) - ps.AddAddresses(id2, []ma.Multiaddr{ma21, ma22}) // idempotency - ps.AddAddress(id3, ma31) - ps.AddAddress(id3, ma32) - ps.AddAddress(id3, ma33) - ps.AddAddress(id3, ma33) // idempotency - ps.AddAddress(id3, ma33) - ps.AddAddresses(id4, []ma.Multiaddr{ma41, ma42, ma43, ma44}) // multiple - ps.AddAddresses(id5, []ma.Multiaddr{ma21, ma22}) // clearing - ps.AddAddresses(id5, []ma.Multiaddr{ma41, ma42, ma43, ma44}) // clearing - ps.SetAddresses(id5, []ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}) // clearing - - test := func(exp, act []ma.Multiaddr) { - if len(exp) != len(act) { - t.Fatal("lengths not the same") - } - - for _, a := range exp { - found := false - - for _, b := range act { - if a.Equal(b) { - found = true - break - } - } - - if !found { - t.Fatal("expected address %s not found", a) - } - } - } - - // test the Addresses return value - test([]ma.Multiaddr{ma11}, ps.Addresses(id1)) - test([]ma.Multiaddr{ma21, ma22}, ps.Addresses(id2)) - test([]ma.Multiaddr{ma31, ma32, ma33}, ps.Addresses(id3)) - test([]ma.Multiaddr{ma41, ma42, ma43, ma44}, ps.Addresses(id4)) - test([]ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, ps.Addresses(id5)) - - // test also the PeerInfo return - test([]ma.Multiaddr{ma11}, ps.PeerInfo(id1).Addrs) - test([]ma.Multiaddr{ma21, ma22}, ps.PeerInfo(id2).Addrs) - test([]ma.Multiaddr{ma31, ma32, ma33}, ps.PeerInfo(id3).Addrs) - test([]ma.Multiaddr{ma41, ma42, ma43, ma44}, ps.PeerInfo(id4).Addrs) - test([]ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, ps.PeerInfo(id5).Addrs) -} - -func TestAddressTTL(t *testing.T) { - - ps := NewPeerstore() - id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN") - ma1 := MA(t, "/ip4/1.2.3.1/tcp/1111") - ma2 := MA(t, "/ip4/2.2.3.2/tcp/2222") - ma3 := MA(t, "/ip4/3.2.3.3/tcp/3333") - ma4 := MA(t, "/ip4/4.2.3.3/tcp/4444") - ma5 := MA(t, "/ip4/5.2.3.3/tcp/5555") - - ps.AddAddress(id1, ma1) - ps.AddAddress(id1, ma2) - ps.AddAddress(id1, ma3) - ps.AddAddress(id1, ma4) - ps.AddAddress(id1, ma5) - - test := func(exp, act []ma.Multiaddr) { - if len(exp) != len(act) { - t.Fatal("lengths not the same") - } - - for _, a := range exp { - found := false - - for _, b := range act { - if a.Equal(b) { - found = true - break - } - } - - if !found { - t.Fatal("expected address %s not found", a) - } - } - } - - testTTL := func(ttle time.Duration, id ID, addr ma.Multiaddr) { - ab := ps.(*peerstore).addressbook - ttlat := ab.addrs[id][addr.String()].TTL - ttla := ttlat.Sub(time.Now()) - if ttla > ttle { - t.Error("ttl is greater than expected", ttle, ttla) - } - if ttla < (ttle / 2) { - t.Error("ttl is smaller than expected", ttle/2, ttla) - } - } - - // should they are there - ab := ps.(*peerstore).addressbook - if len(ab.addrs[id1]) != 5 { - t.Error("incorrect addr count", len(ab.addrs[id1]), ab.addrs[id1]) - } - - // test the Addresses return value - test([]ma.Multiaddr{ma1, ma2, ma3, ma4, ma5}, ps.Addresses(id1)) - test([]ma.Multiaddr{ma1, ma2, ma3, ma4, ma5}, ps.PeerInfo(id1).Addrs) - - // check the addr TTL is a bit smaller than the init TTL - testTTL(AddressTTL, id1, ma1) - testTTL(AddressTTL, id1, ma2) - testTTL(AddressTTL, id1, ma3) - testTTL(AddressTTL, id1, ma4) - testTTL(AddressTTL, id1, ma5) - - // change the TTL - setTTL := func(id ID, addr ma.Multiaddr, ttl time.Time) { - a := ab.addrs[id][addr.String()] - a.TTL = ttl - ab.addrs[id][addr.String()] = a - } - setTTL(id1, ma1, time.Now().Add(-1*time.Second)) - setTTL(id1, ma2, time.Now().Add(-1*time.Hour)) - setTTL(id1, ma3, time.Now().Add(-1*AddressTTL)) - - // should no longer list those - test([]ma.Multiaddr{ma4, ma5}, ps.Addresses(id1)) - test([]ma.Multiaddr{ma4, ma5}, ps.PeerInfo(id1).Addrs) - - // should no longer be there - if len(ab.addrs[id1]) != 2 { - t.Error("incorrect addr count", len(ab.addrs[id1]), ab.addrs[id1]) - } -} diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 107c2538d..3447e607b 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -1,7 +1,7 @@ package identify import ( - "fmt" + "strings" "sync" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" @@ -11,10 +11,12 @@ import ( host "github.com/jbenet/go-ipfs/p2p/host" inet "github.com/jbenet/go-ipfs/p2p/net" + peer "github.com/jbenet/go-ipfs/p2p/peer" protocol "github.com/jbenet/go-ipfs/p2p/protocol" pb "github.com/jbenet/go-ipfs/p2p/protocol/identify/pb" config "github.com/jbenet/go-ipfs/repo/config" eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog" + lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables" ) var log = eventlog.Logger("net/identify") @@ -23,16 +25,9 @@ var log = eventlog.Logger("net/identify") const ID protocol.ID = "/ipfs/identify" // IpfsVersion holds the current protocol version for a client running this code -var IpfsVersion *semver.Version -var ClientVersion = "go-ipfs/" + config.CurrentVersionNumber - -func init() { - var err error - IpfsVersion, err = semver.NewVersion("0.0.1") - if err != nil { - panic(fmt.Errorf("invalid protocol version: %v", err)) - } -} +// TODO(jbenet): fix the versioning mess. +const IpfsVersion = "ipfs/0.1.0" +const ClientVersion = "go-ipfs/" + config.CurrentVersionNumber // IDService is a structure that implements ProtocolIdentify. // It is a trivial service that gives the other peer some @@ -49,6 +44,10 @@ type IDService struct { // for wait purposes currid map[inet.Conn]chan struct{} currmu sync.RWMutex + + // our own observed addresses. + // TODO: instead of expiring, remove these when we disconnect + addrs peer.AddrManager } func NewIDService(h host.Host) *IDService { @@ -60,6 +59,11 @@ func NewIDService(h host.Host) *IDService { return s } +// OwnObservedAddrs returns the addresses peers have reported we've dialed from +func (ids *IDService) OwnObservedAddrs() []ma.Multiaddr { + return ids.addrs.Addrs(ids.Host.ID()) +} + func (ids *IDService) IdentifyConn(c inet.Conn) { ids.currmu.Lock() if wait, found := ids.currid[c]; found { @@ -148,9 +152,10 @@ func (ids *IDService) populateMessage(mes *pb.Identify, c inet.Conn) { log.Debugf("%s sent listen addrs to %s: %s", c.LocalPeer(), c.RemotePeer(), laddrs) // set protocol versions - s := IpfsVersion.String() - mes.ProtocolVersion = &s - mes.AgentVersion = &ClientVersion + pv := IpfsVersion + av := ClientVersion + mes.ProtocolVersion = &pv + mes.AgentVersion = &av } func (ids *IDService) consumeMessage(mes *pb.Identify, c inet.Conn) { @@ -176,12 +181,22 @@ func (ids *IDService) consumeMessage(mes *pb.Identify, c inet.Conn) { // update our peerstore with the addresses. here, we SET the addresses, clearing old ones. // We are receiving from the peer itself. this is current address ground truth. - ids.Host.Peerstore().SetAddresses(p, lmaddrs) + ids.Host.Peerstore().SetAddrs(p, lmaddrs, peer.ConnectedAddrTTL) log.Debugf("%s received listen addrs for %s: %s", c.LocalPeer(), c.RemotePeer(), lmaddrs) // get protocol versions pv := mes.GetProtocolVersion() av := mes.GetAgentVersion() + + // version check. if we shouldn't talk, bail. + // TODO: at this point, we've already exchanged information. + // move this into a first handshake before the connection can open streams. + if !protocolVersionsAreCompatible(pv, IpfsVersion) { + logProtocolMismatchDisconnect(c, pv, av) + c.Close() + return + } + ids.Host.Peerstore().Put(p, "ProtocolVersion", pv) ids.Host.Peerstore().Put(p, "AgentVersion", av) } @@ -235,7 +250,7 @@ func (ids *IDService) consumeObservedAddress(observed []byte, c inet.Conn) { // ok! we have the observed version of one of our ListenAddresses! log.Debugf("added own observed listen addr: %s --> %s", c.LocalMultiaddr(), maddr) - ids.Host.Peerstore().AddAddress(ids.Host.ID(), maddr) + ids.addrs.AddAddr(ids.Host.ID(), maddr, peer.OwnObservedAddrTTL) } func addrInAddrs(a ma.Multiaddr, as []ma.Multiaddr) bool { @@ -246,3 +261,63 @@ func addrInAddrs(a ma.Multiaddr, as []ma.Multiaddr) bool { } return false } + +// protocolVersionsAreCompatible checks that the two implementations +// can talk to each other. It will use semver, but for now while +// we're in tight development, we will return false for minor version +// changes too. +func protocolVersionsAreCompatible(v1, v2 string) bool { + if strings.HasPrefix(v1, "ipfs/") { + v1 = v1[5:] + } + if strings.HasPrefix(v2, "ipfs/") { + v2 = v2[5:] + } + + v1s, err := semver.NewVersion(v1) + if err != nil { + return false + } + + v2s, err := semver.NewVersion(v2) + if err != nil { + return false + } + + return v1s.Major == v2s.Major && v1s.Minor == v2s.Minor +} + +// netNotifiee defines methods to be used with the IpfsDHT +type netNotifiee IDService + +func (nn *netNotifiee) IDService() *IDService { + return (*IDService)(nn) +} + +func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) { + // TODO: deprecate the setConnHandler hook, and kick off + // identification here. +} + +func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) { + // undo the setting of addresses to peer.ConnectedAddrTTL we did + ids := nn.IDService() + ps := ids.Host.Peerstore() + addrs := ps.Addrs(v.RemotePeer()) + ps.SetAddrs(v.RemotePeer(), addrs, peer.RecentlyConnectedAddrTTL) +} + +func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {} +func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) {} +func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) {} +func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) {} + +func logProtocolMismatchDisconnect(c inet.Conn, protocol, agent string) { + lm := make(lgbl.DeferredMap) + lm["remotePeer"] = func() interface{} { return c.RemotePeer().Pretty() } + lm["remoteAddr"] = func() interface{} { return c.RemoteMultiaddr().String() } + lm["protocolVersion"] = protocol + lm["agentVersion"] = agent + log.Event(context.TODO(), "IdentifyProtocolMismatch", lm) + log.Debug("IdentifyProtocolMismatch %s %s %s (disconnected)", c.RemotePeer(), protocol, agent) +} diff --git a/p2p/protocol/identify/id_test.go b/p2p/protocol/identify/id_test.go index c6707b0ac..6af01283d 100644 --- a/p2p/protocol/identify/id_test.go +++ b/p2p/protocol/identify/id_test.go @@ -38,7 +38,7 @@ func subtestIDService(t *testing.T, postDialWait time.Duration) { // the IDService should be opened automatically, by the network. // what we should see now is that both peers know about each others listen addresses. - testKnowsAddrs(t, h1, h2p, h2.Peerstore().Addresses(h2p)) // has them + testKnowsAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p)) // has them testHasProtocolVersions(t, h1, h2p) // now, this wait we do have to do. it's the wait for the Listening side @@ -50,12 +50,12 @@ func subtestIDService(t *testing.T, postDialWait time.Duration) { <-h2.IDService().IdentifyWait(c[0]) // and the protocol versions. - testKnowsAddrs(t, h2, h1p, h1.Peerstore().Addresses(h1p)) // has them + testKnowsAddrs(t, h2, h1p, h1.Peerstore().Addrs(h1p)) // has them testHasProtocolVersions(t, h2, h1p) } func testKnowsAddrs(t *testing.T, h host.Host, p peer.ID, expected []ma.Multiaddr) { - actual := h.Peerstore().Addresses(p) + actual := h.Peerstore().Addrs(p) if len(actual) != len(expected) { t.Error("dont have the same addresses") @@ -79,7 +79,7 @@ func testHasProtocolVersions(t *testing.T, h host.Host, p peer.ID) { t.Error("no protocol version") return } - if v.(string) != identify.IpfsVersion.String() { + if v.(string) != identify.IpfsVersion { t.Error("protocol mismatch", err) } v, err = h.Peerstore().Get(p, "AgentVersion") diff --git a/p2p/test/util/util.go b/p2p/test/util/util.go index 70b286429..a680d4f3d 100644 --- a/p2p/test/util/util.go +++ b/p2p/test/util/util.go @@ -22,14 +22,14 @@ func GenSwarmNetwork(t *testing.T, ctx context.Context) *swarm.Network { if err != nil { t.Fatal(err) } - ps.AddAddresses(p.ID, n.ListenAddresses()) + ps.AddAddrs(p.ID, n.ListenAddresses(), peer.PermanentAddrTTL) return n } func DivulgeAddresses(a, b inet.Network) { id := a.LocalPeer() - addrs := a.Peerstore().Addresses(id) - b.Peerstore().AddAddresses(id, addrs) + addrs := a.Peerstore().Addrs(id) + b.Peerstore().AddAddrs(id, addrs, peer.PermanentAddrTTL) } func GenHostSwarm(t *testing.T, ctx context.Context) *bhost.BasicHost { diff --git a/routing/dht/dht.go b/routing/dht/dht.go index d34ac720b..edd18ff11 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -142,16 +142,20 @@ func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.ID, key string) error { // add self as the provider - pi := dht.peerstore.PeerInfo(dht.self) + pi := peer.PeerInfo{ + ID: dht.self, + Addrs: dht.host.Addrs(), + } + // // only share WAN-friendly addresses ?? // pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs) if len(pi.Addrs) < 1 { - log.Infof("%s putProvider: %s for %s error: no wan-friendly addresses", dht.self, p, u.Key(key), pi.Addrs) + // log.Infof("%s putProvider: %s for %s error: no wan-friendly addresses", dht.self, p, u.Key(key), pi.Addrs) return fmt.Errorf("no known addresses for self. cannot put provider.") } pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(key), 0) - pmes.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), []peer.PeerInfo{pi}) + pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]peer.PeerInfo{pi}) err := dht.sendMessage(ctx, p, pmes) if err != nil { return err diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index 00597b016..9d65a6fac 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -55,7 +55,7 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer for i := 0; i < n; i++ { dhts[i] = setupDHT(ctx, t) peers[i] = dhts[i].self - addrs[i] = dhts[i].peerstore.Addresses(dhts[i].self)[0] + addrs[i] = dhts[i].peerstore.Addrs(dhts[i].self)[0] } return addrs, peers, dhts @@ -64,12 +64,12 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) { idB := b.self - addrB := b.peerstore.Addresses(idB) + addrB := b.peerstore.Addrs(idB) if len(addrB) == 0 { t.Fatal("peers setup incorrectly: no local address") } - a.peerstore.AddAddresses(idB, addrB) + a.peerstore.AddAddrs(idB, addrB, peer.TempAddrTTL) if err := a.Connect(ctx, idB); err != nil { t.Fatal(err) } @@ -754,20 +754,20 @@ func TestConnectCollision(t *testing.T) { dhtA := setupDHT(ctx, t) dhtB := setupDHT(ctx, t) - addrA := dhtA.peerstore.Addresses(dhtA.self)[0] - addrB := dhtB.peerstore.Addresses(dhtB.self)[0] + addrA := dhtA.peerstore.Addrs(dhtA.self)[0] + addrB := dhtB.peerstore.Addrs(dhtB.self)[0] peerA := dhtA.self peerB := dhtB.self errs := make(chan error) go func() { - dhtA.peerstore.AddAddress(peerB, addrB) + dhtA.peerstore.AddAddr(peerB, addrB, peer.TempAddrTTL) err := dhtA.Connect(ctx, peerB) errs <- err }() go func() { - dhtB.peerstore.AddAddress(peerA, addrA) + dhtB.peerstore.AddAddr(peerA, addrA, peer.TempAddrTTL) err := dhtB.Connect(ctx, peerA) errs <- err }() diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index 6376dbcba..6974ad08d 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -238,7 +238,7 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M log.Infof("received provider %s for %s (addrs: %s)", p, key, pi.Addrs) if pi.ID != dht.self { // dont add own addrs. // add the received addresses to our peerstore. - dht.peerstore.AddPeerInfo(pi) + dht.peerstore.AddAddrs(pi.ID, pi.Addrs, peer.ProviderAddrTTL) } dht.providers.AddProvider(key, p) } diff --git a/routing/dht/lookup.go b/routing/dht/lookup.go index 6e0acd9fa..a713e553d 100644 --- a/routing/dht/lookup.go +++ b/routing/dht/lookup.go @@ -100,7 +100,7 @@ func (dht *IpfsDHT) closerPeersSingle(ctx context.Context, key u.Key, p peer.ID) for _, pbp := range pmes.GetCloserPeers() { pid := peer.ID(pbp.GetId()) if pid != dht.self { // dont add self - dht.peerstore.AddAddresses(pid, pbp.Addresses()) + dht.peerstore.AddAddrs(pid, pbp.Addresses(), peer.TempAddrTTL) out = append(out, pid) } } diff --git a/routing/dht/query.go b/routing/dht/query.go index 8d6505b88..293c0ddd9 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -253,7 +253,7 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) { } // add their addresses to the dialer's peerstore - r.query.dht.peerstore.AddPeerInfo(next) + r.query.dht.peerstore.AddAddrs(next.ID, next.Addrs, peer.TempAddrTTL) r.addPeerToQuery(cg.Context(), next.ID) log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs) } diff --git a/routing/grandcentral/server.go b/routing/grandcentral/server.go index f51b71917..179b15b3e 100644 --- a/routing/grandcentral/server.go +++ b/routing/grandcentral/server.go @@ -96,7 +96,7 @@ func (s *Server) handleMessage( } for _, maddr := range provider.Addresses() { // FIXME do we actually want to store to peerstore - s.peerstore.AddAddress(p, maddr) + s.peerstore.AddAddr(p, maddr, peer.TempAddrTTL) } } var providers []dhtpb.Message_Peer diff --git a/test/sharness/lib/test-lib.sh b/test/sharness/lib/test-lib.sh index 1bc60f5e6..cdcf45c1e 100644 --- a/test/sharness/lib/test-lib.sh +++ b/test/sharness/lib/test-lib.sh @@ -64,11 +64,14 @@ test_cmp_repeat_10_sec() { test_cmp "$1" "$2" } -test_run_repeat_10_sec() { - for i in 1 2 3 4 5 6 7 8 9 10 +test_run_repeat_60_sec() { + for i in 1 2 3 4 5 6 do - (test_eval_ "$1") && return - sleep 1 + for i in 1 2 3 4 5 6 7 8 9 10 + do + (test_eval_ "$1") && return + sleep 1 + done done return 1 # failed } @@ -177,13 +180,13 @@ test_launch_ipfs_daemon() { test_expect_success "'ipfs daemon' is ready" ' IPFS_PID=$! && test_wait_output_n_lines_60_sec actual_daemon 2 && - test_run_repeat_10_sec "grep \"API server listening on $ADDR_API\" actual_daemon" || + test_run_repeat_60_sec "grep \"API server listening on $ADDR_API\" actual_daemon" || fsh cat actual_daemon || fsh cat daemon_err ' if test "$ADDR_GWAY" != ""; then test_expect_success "'ipfs daemon' output includes Gateway address" ' - test_run_repeat_10_sec "grep \"Gateway server listening on $ADDR_GWAY\" actual_daemon" || + test_run_repeat_60_sec "grep \"Gateway server listening on $ADDR_GWAY\" actual_daemon" || fsh cat daemon_err ' fi