diff --git a/p2p/peer/queue/sync.go b/p2p/peer/queue/sync.go index 3f75cd0cf..3d7aa68ad 100644 --- a/p2p/peer/queue/sync.go +++ b/p2p/peer/queue/sync.go @@ -4,8 +4,11 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" peer "github.com/jbenet/go-ipfs/p2p/peer" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" ) +var log = eventlog.Logger("peerqueue") + // ChanQueue makes any PeerQueue synchronizable through channels. type ChanQueue struct { Queue PeerQueue @@ -21,6 +24,7 @@ func NewChanQueue(ctx context.Context, pq PeerQueue) *ChanQueue { } func (cq *ChanQueue) process(ctx context.Context) { + log := log.Prefix("", cq) // construct the channels here to be able to use them bidirectionally enqChan := make(chan peer.ID) @@ -30,6 +34,8 @@ func (cq *ChanQueue) process(ctx context.Context) { cq.DeqChan = deqChan go func() { + log.Debug("processing") + defer log.Debug("closed") defer close(deqChan) var next peer.ID @@ -38,11 +44,13 @@ func (cq *ChanQueue) process(ctx context.Context) { for { if cq.Queue.Len() == 0 { + // log.Debug("wait for enqueue") select { case next, more = <-enqChan: if !more { return } + // log.Debug("got", next) case <-ctx.Done(): return @@ -50,19 +58,24 @@ func (cq *ChanQueue) process(ctx context.Context) { } else { next = cq.Queue.Dequeue() + // log.Debug("peek", next) } select { case item, more = <-enqChan: if !more { - return + if cq.Queue.Len() > 0 { + return // we're done done. + } + enqChan = nil // closed, so no use. } - + // log.Debug("got", item) cq.Queue.Enqueue(item) - cq.Queue.Enqueue(next) + cq.Queue.Enqueue(next) // order may have changed. next = "" case deqChan <- next: + // log.Debug("dequeued", next) next = "" case <-ctx.Done():