mirror of https://github.com/ipfs/kubo.git synced 2025-09-09 23:42:20 +08:00

dht: even more logging.

Author: Juan Batiz-Benet
Date:   2015-01-05 04:35:54 -08:00
Parent: f25dfb68b9
Commit: 172801712e
3 changed files with 72 additions and 52 deletions
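
The diff below threads a query-scoped logger through dhtQueryRunner: Run builds a prefixed logger with log.Prefix(...), stores it on the runner as r.log, and each worker derives its own child prefix (spawnWorkers, queryPeer(%s)), which is why per-peer messages shrink from "worker for: %v ..." to plain "spawned"/"running"/"completed". As a rough illustration of that pattern, here is a minimal sketch built on the standard library; prefixLogger is a hypothetical stand-in for eventlog.EventLogger, and only the Prefix/Debug/Debugf usage visible in the diff is assumed.

    package main

    import (
        "fmt"
        "log"
    )

    // prefixLogger is a hypothetical stand-in for eventlog.EventLogger: each
    // Prefix call returns a child logger that stamps its context on every line.
    type prefixLogger struct{ prefix string }

    func (l prefixLogger) Prefix(format string, args ...interface{}) prefixLogger {
        return prefixLogger{prefix: l.prefix + fmt.Sprintf(format, args...) + ": "}
    }

    func (l prefixLogger) Debug(args ...interface{}) {
        log.Print("DEBUG " + l.prefix + fmt.Sprint(args...))
    }

    func (l prefixLogger) Debugf(format string, args ...interface{}) {
        log.Printf("DEBUG "+l.prefix+format, args...)
    }

    func main() {
        var root prefixLogger

        // Mirrors Run: one logger for the whole query...
        qlog := root.Prefix("dht(%s).Query(%s).Run(%d)", "self", "somekey", 3)
        qlog.Debug("enter")
        defer qlog.Debug("end")

        // ...and a narrower child per worker, as in queryPeer.
        wlog := qlog.Prefix("queryPeer(%s)", "QmPeerA")
        // -> DEBUG dht(self).Query(somekey).Run(3): queryPeer(QmPeerA): spawned
        wlog.Debugf("spawned")
    }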


@@ -7,6 +7,7 @@ import (
 	queue "github.com/jbenet/go-ipfs/p2p/peer/queue"
 	"github.com/jbenet/go-ipfs/routing"
 	u "github.com/jbenet/go-ipfs/util"
+	eventlog "github.com/jbenet/go-ipfs/util/eventlog"
 	pset "github.com/jbenet/go-ipfs/util/peerset"
 	todoctr "github.com/jbenet/go-ipfs/util/todocounter"
@@ -55,32 +56,18 @@ func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
 }
 type dhtQueryRunner struct {
+	query          *dhtQuery        // query to run
+	peersSeen      *pset.PeerSet    // all peers queried. prevent querying same peer 2x
+	peersToQuery   *queue.ChanQueue // peers remaining to be queried
+	peersRemaining todoctr.Counter  // peersToQuery + currently processing
-	// the query to run
-	query *dhtQuery
+	result *dhtQueryResult // query result
+	errs   []error         // result errors. maybe should be a map[peer.ID]error
-	// peersToQuery is a list of peers remaining to query
-	peersToQuery *queue.ChanQueue
+	rateLimit chan struct{} // processing semaphore
+	log       eventlog.EventLogger
-	// peersSeen are all the peers queried. used to prevent querying same peer 2x
-	peersSeen *pset.PeerSet
-	// rateLimit is a channel used to rate limit our processing (semaphore)
-	rateLimit chan struct{}
-	// peersRemaining is a counter of peers remaining (toQuery + processing)
-	peersRemaining todoctr.Counter
-	// context group
 	cg ctxgroup.ContextGroup
-	// result
-	result *dhtQueryResult
-	// result errors
-	errs []error
-	// lock for concurrent access to fields
 	sync.RWMutex
 }
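
A note on the rateLimit field kept above: a buffered channel of empty structs is Go's idiomatic counting semaphore, and queryPeer below acquires a token by receiving from the channel, then returns it in its deferred cleanup (r.rateLimit <- struct{}{}). Here is a minimal sketch of that pattern with illustrative names; the pre-filled channel is an assumption, since the diff does not show newQueryRunner's initialization.

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        const maxConcurrency = 3 // illustrative; the DHT uses its own constant

        // Fill the semaphore with tokens up front, as the runner presumably does.
        rateLimit := make(chan struct{}, maxConcurrency)
        for i := 0; i < maxConcurrency; i++ {
            rateLimit <- struct{}{}
        }

        var wg sync.WaitGroup
        for i := 0; i < 10; i++ {
            wg.Add(1)
            go func(n int) {
                defer wg.Done()
                <-rateLimit // acquire: blocks while 3 workers are already running
                defer func() { rateLimit <- struct{}{} }() // release on exit
                fmt.Printf("worker %d running\n", n)
            }(i)
        }
        wg.Wait()
    }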
@@ -96,6 +83,11 @@ func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
 }
 func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
+	log := log.Prefix("dht(%s).Query(%s).Run(%d)", r.query.dht.self, r.query.key, len(peers))
+	r.log = log
+	log.Debug("enter")
+	defer log.Debug("end")
 	log.Debugf("Run query with %d peers.", len(peers))
 	if len(peers) == 0 {
 		log.Warning("Running query with no peers!")
@@ -115,6 +107,7 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
 	// go do this thing.
 	// do it as a child func to make sure Run exits
 	// ONLY AFTER spawn workers has exited.
+	log.Debugf("go spawn workers")
 	r.cg.AddChildFunc(r.spawnWorkers)
 	// so workers are working.
@@ -124,41 +117,45 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
 	select {
 	case <-r.peersRemaining.Done():
+		log.Debug("all peers ended")
 		r.cg.Close()
 		r.RLock()
 		defer r.RUnlock()
 		if len(r.errs) > 0 {
-			err = r.errs[0]
+			err = r.errs[0] // take the first?
 		}
 	case <-r.cg.Closed():
+		log.Debug("r.cg.Closed()")
 		r.RLock()
 		defer r.RUnlock()
 		err = r.cg.Context().Err() // collect the error.
 	}
 	if r.result != nil && r.result.success {
+		log.Debug("success: %s", r.result)
 		return r.result, nil
 	}
+	log.Debug("failure: %s", err)
 	return nil, err
 }
 func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) {
 	// if new peer is ourselves...
 	if next == r.query.dht.self {
+		r.log.Debug("addPeerToQuery skip self")
 		return
 	}
 	if !r.peersSeen.TryAdd(next) {
-		log.Debug("query peer was already seen")
+		r.log.Debugf("addPeerToQuery skip seen %s", next)
 		return
 	}
-	log.Debugf("adding peer to query: %v", next)
-	// do this after unlocking to prevent possible deadlocks.
+	r.log.Debugf("addPeerToQuery adding %s", next)
 	r.peersRemaining.Increment(1)
 	select {
 	case r.peersToQuery.EnqChan <- next:
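
The addPeerToQuery/queryPeer pair relies on todoctr.Counter for completion tracking: every enqueued peer does Increment(1), every finished worker does Decrement(1), and Run's select fires on peersRemaining.Done() once the count drains to zero. The following sketch shows a counter with that contract; it is an illustrative reimplementation under those assumptions, not the go-ipfs todocounter package.

    package main

    import (
        "fmt"
        "sync"
    )

    // todoCounter sketches the todoctr.Counter contract used by the query
    // runner: Done() yields a channel that is closed once every Increment
    // has been matched by a Decrement.
    type todoCounter struct {
        mu     sync.Mutex
        n      int
        closed bool
        done   chan struct{}
    }

    func newTodoCounter() *todoCounter {
        return &todoCounter{done: make(chan struct{})}
    }

    func (c *todoCounter) Increment(i int) {
        c.mu.Lock()
        defer c.mu.Unlock()
        c.n += i
    }

    func (c *todoCounter) Decrement(i int) {
        c.mu.Lock()
        defer c.mu.Unlock()
        c.n -= i
        if c.n == 0 && !c.closed {
            c.closed = true
            close(c.done)
        }
    }

    func (c *todoCounter) Done() <-chan struct{} { return c.done }

    func main() {
        peersRemaining := newTodoCounter()
        peersRemaining.Increment(2) // two peers enqueued, as in addPeerToQuery

        go func() {
            peersRemaining.Decrement(1) // worker 1 finished
            peersRemaining.Decrement(1) // worker 2 finished
        }()

        <-peersRemaining.Done() // what Run's select waits on
        fmt.Println("all peers ended")
    }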
@@ -167,6 +164,10 @@ func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) {
 }
 func (r *dhtQueryRunner) spawnWorkers(parent ctxgroup.ContextGroup) {
+	log := r.log.Prefix("spawnWorkers")
+	log.Debugf("begin")
+	defer log.Debugf("end")
 	for {
 		select {
@@ -192,7 +193,9 @@ func (r *dhtQueryRunner) spawnWorkers(parent ctxgroup.ContextGroup) {
 }
 func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
-	log.Debugf("spawned worker for: %v", p)
+	log := r.log.Prefix("queryPeer(%s)", p)
+	log.Debugf("spawned")
+	defer log.Debugf("finished")
 	// make sure we rate limit concurrency.
 	select {
@@ -203,34 +206,36 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
 	}
 	// ok let's do this!
-	log.Debugf("running worker for: %v", p)
+	log.Debugf("running")
 	// make sure we do this when we exit
 	defer func() {
 		// signal we're done proccessing peer p
-		log.Debugf("completing worker for: %v", p)
+		log.Debugf("completed")
 		r.peersRemaining.Decrement(1)
 		r.rateLimit <- struct{}{}
 	}()
 	// make sure we're connected to the peer.
 	if conns := r.query.dht.host.Network().ConnsToPeer(p); len(conns) == 0 {
-		log.Infof("worker for: %v -- not connected. dial start", p)
+		log.Infof("not connected. dialing.")
 		pi := peer.PeerInfo{ID: p}
 		if err := r.query.dht.host.Connect(cg.Context(), pi); err != nil {
-			log.Debugf("ERROR worker for: %v -- err connecting: %v", p, err)
+			log.Debugf("Error connecting: %s", err)
 			r.Lock()
 			r.errs = append(r.errs, err)
 			r.Unlock()
 			return
 		}
-		log.Infof("worker for: %v -- not connected. dial success!", p)
+		log.Debugf("connected. dial success.")
 	}
 	// finally, run the query against this peer
+	log.Debugf("query running")
 	res, err := r.query.qfunc(cg.Context(), p)
+	log.Debugf("query finished")
 	if err != nil {
 		log.Debugf("ERROR worker for: %v %v", p, err)
@@ -239,7 +244,7 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
 		r.Unlock()
 	} else if res.success {
-		log.Debugf("SUCCESS worker for: %v", p, res)
+		log.Debugf("SUCCESS worker for: %v %s", p, res)
 		r.Lock()
 		r.result = res
 		r.Unlock()
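
This last hunk shows the other half of the coordination: workers publish into shared state under the runner's embedded RWMutex, appending to errs on failure or setting result on success, and Run reads both under RLock once the counter drains. A compact sketch of that pattern with illustrative types (runner and record are hypothetical names, not from the commit):

    package main

    import (
        "fmt"
        "sync"
    )

    // runner sketches the shared fields guarded by dhtQueryRunner's RWMutex.
    type runner struct {
        sync.RWMutex
        result string // stands in for *dhtQueryResult
        ok     bool
        errs   []error
    }

    func (r *runner) record(res string, err error) {
        r.Lock()
        defer r.Unlock()
        if err != nil {
            r.errs = append(r.errs, err) // like the queryPeer error path
            return
        }
        r.result, r.ok = res, true // like the res.success path
    }

    func main() {
        r := &runner{}
        var wg sync.WaitGroup
        for i := 0; i < 4; i++ {
            wg.Add(1)
            go func(n int) {
                defer wg.Done()
                if n == 2 {
                    r.record(fmt.Sprintf("value from worker %d", n), nil)
                    return
                }
                r.record("", fmt.Errorf("worker %d failed", n))
            }(i)
        }
        wg.Wait()

        // Mirrors the end of Run: read shared state under RLock when done.
        r.RLock()
        defer r.RUnlock()
        if r.ok {
            fmt.Println("success:", r.result)
        } else if len(r.errs) > 0 {
            fmt.Println("failure:", r.errs[0]) // "take the first?", as the diff notes
        }
    }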