mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-02 03:28:25 +08:00
added bootstrap logging
This commit is contained in:
@ -1,17 +1,21 @@
|
|||||||
package core
|
package core
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
|
||||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
|
||||||
config "github.com/jbenet/go-ipfs/config"
|
config "github.com/jbenet/go-ipfs/config"
|
||||||
inet "github.com/jbenet/go-ipfs/net"
|
inet "github.com/jbenet/go-ipfs/net"
|
||||||
peer "github.com/jbenet/go-ipfs/peer"
|
peer "github.com/jbenet/go-ipfs/peer"
|
||||||
dht "github.com/jbenet/go-ipfs/routing/dht"
|
dht "github.com/jbenet/go-ipfs/routing/dht"
|
||||||
|
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
|
||||||
math2 "github.com/jbenet/go-ipfs/util/math2"
|
math2 "github.com/jbenet/go-ipfs/util/math2"
|
||||||
|
|
||||||
|
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||||
|
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -50,14 +54,23 @@ func bootstrap(ctx context.Context,
|
|||||||
|
|
||||||
connectedPeers := n.Peers()
|
connectedPeers := n.Peers()
|
||||||
if len(connectedPeers) >= recoveryThreshold {
|
if len(connectedPeers) >= recoveryThreshold {
|
||||||
|
log.Event(ctx, "bootstrapSkip", n.LocalPeer())
|
||||||
|
log.Debugf("%s bootstrap skipped -- connected to %d (> %d) nodes",
|
||||||
|
n.LocalPeer(), len(connectedPeers), recoveryThreshold)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
numCxnsToCreate := recoveryThreshold - len(connectedPeers)
|
numCxnsToCreate := recoveryThreshold - len(connectedPeers)
|
||||||
|
|
||||||
|
log.Event(ctx, "bootstrapStart", n.LocalPeer())
|
||||||
|
log.Debugf("%s bootstrapping to %d more nodes", n.LocalPeer(), numCxnsToCreate)
|
||||||
|
|
||||||
var bootstrapPeers []peer.PeerInfo
|
var bootstrapPeers []peer.PeerInfo
|
||||||
for _, bootstrap := range boots {
|
for _, bootstrap := range boots {
|
||||||
p, err := toPeer(bootstrap)
|
p, err := toPeer(bootstrap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Event(ctx, "bootstrapError", n.LocalPeer(), lgbl.Error(err))
|
||||||
|
log.Errorf("%s bootstrap error: %s", n.LocalPeer(), err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
bootstrapPeers = append(bootstrapPeers, p)
|
bootstrapPeers = append(bootstrapPeers, p)
|
||||||
@ -70,14 +83,30 @@ func bootstrap(ctx context.Context,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(notConnected) < 1 {
|
||||||
|
s := "must bootstrap to %d more nodes, but already connected to all candidates"
|
||||||
|
err := fmt.Errorf(s, numCxnsToCreate)
|
||||||
|
log.Event(ctx, "bootstrapError", n.LocalPeer(), lgbl.Error(err))
|
||||||
|
log.Errorf("%s bootstrap error: %s", n.LocalPeer(), err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
var randomSubset = randomSubsetOfPeers(notConnected, numCxnsToCreate)
|
var randomSubset = randomSubsetOfPeers(notConnected, numCxnsToCreate)
|
||||||
|
|
||||||
|
log.Debugf("%s bootstrapping to %d nodes: %s", n.LocalPeer(), numCxnsToCreate, randomSubset)
|
||||||
if err := connect(ctx, ps, r, randomSubset); err != nil {
|
if err := connect(ctx, ps, r, randomSubset); err != nil {
|
||||||
|
log.Event(ctx, "bootstrapError", n.LocalPeer(), lgbl.Error(err))
|
||||||
|
log.Errorf("%s bootstrap error: %s", n.LocalPeer(), err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func connect(ctx context.Context, ps peer.Peerstore, r *dht.IpfsDHT, peers []peer.PeerInfo) error {
|
func connect(ctx context.Context, ps peer.Peerstore, r *dht.IpfsDHT, peers []peer.PeerInfo) error {
|
||||||
|
if len(peers) < 1 {
|
||||||
|
return errors.New("bootstrap set empty")
|
||||||
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for _, p := range peers {
|
for _, p := range peers {
|
||||||
|
|
||||||
@ -88,6 +117,9 @@ func connect(ctx context.Context, ps peer.Peerstore, r *dht.IpfsDHT, peers []pee
|
|||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(p peer.PeerInfo) {
|
go func(p peer.PeerInfo) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
log.Event(ctx, "bootstrapDial", r.LocalPeer(), p.ID)
|
||||||
|
log.Debugf("%s bootstrapping to %s", r.LocalPeer(), p.ID)
|
||||||
|
|
||||||
ps.AddAddresses(p.ID, p.Addrs)
|
ps.AddAddresses(p.ID, p.Addrs)
|
||||||
err := r.Connect(ctx, p.ID)
|
err := r.Connect(ctx, p.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -135,6 +135,7 @@ func (n *network) newConnHandler(c *swarm.Conn) {
|
|||||||
// DialPeer attempts to establish a connection to a given peer.
|
// DialPeer attempts to establish a connection to a given peer.
|
||||||
// Respects the context.
|
// Respects the context.
|
||||||
func (n *network) DialPeer(ctx context.Context, p peer.ID) error {
|
func (n *network) DialPeer(ctx context.Context, p peer.ID) error {
|
||||||
|
log.Debugf("[%s] network dialing peer [%s]", n.local, p)
|
||||||
sc, err := n.swarm.Dial(ctx, p)
|
sc, err := n.swarm.Dial(ctx, p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -242,8 +243,9 @@ func (n *network) Connectedness(p peer.ID) Connectedness {
|
|||||||
// NewStream returns a new stream to given peer p.
|
// NewStream returns a new stream to given peer p.
|
||||||
// If there is no connection to p, attempts to create one.
|
// If there is no connection to p, attempts to create one.
|
||||||
// If ProtocolID is "", writes no header.
|
// If ProtocolID is "", writes no header.
|
||||||
func (c *network) NewStream(pr ProtocolID, p peer.ID) (Stream, error) {
|
func (n *network) NewStream(pr ProtocolID, p peer.ID) (Stream, error) {
|
||||||
s, err := c.swarm.NewStreamWithPeer(p)
|
log.Debugf("[%s] network opening stream to peer [%s]: %s", n.local, p, pr)
|
||||||
|
s, err := n.swarm.NewStreamWithPeer(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -77,6 +77,11 @@ func NewDHT(ctx context.Context, p peer.ID, n inet.Network, dstore ds.ThreadSafe
|
|||||||
return dht
|
return dht
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LocalPeer returns the peer.Peer of the dht.
|
||||||
|
func (dht *IpfsDHT) LocalPeer() peer.ID {
|
||||||
|
return dht.self
|
||||||
|
}
|
||||||
|
|
||||||
// Connect to a new peer at the given address, ping and add to the routing table
|
// Connect to a new peer at the given address, ping and add to the routing table
|
||||||
func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error {
|
func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error {
|
||||||
if err := dht.network.DialPeer(ctx, npeer); err != nil {
|
if err := dht.network.DialPeer(ctx, npeer); err != nil {
|
||||||
|
Reference in New Issue
Block a user