mirror of
https://github.com/ipfs/kubo.git
synced 2025-10-12 09:15:03 +08:00
clean up channel use
This commit is contained in:

committed by
Brian Tiger Chow

parent
b77a785cd8
commit
98c3afeecf
@ -9,50 +9,66 @@ import (
|
||||
// ChanQueue makes any PeerQueue synchronizable through channels.
|
||||
type ChanQueue struct {
|
||||
Queue PeerQueue
|
||||
EnqChan chan *peer.Peer
|
||||
DeqChan chan *peer.Peer
|
||||
EnqChan chan<- *peer.Peer
|
||||
DeqChan <-chan *peer.Peer
|
||||
}
|
||||
|
||||
// NewChanQueue creates a ChanQueue by wrapping pq.
|
||||
func NewChanQueue(ctx context.Context, pq PeerQueue) *ChanQueue {
|
||||
cq := &ChanQueue{
|
||||
Queue: pq,
|
||||
EnqChan: make(chan *peer.Peer, 10),
|
||||
DeqChan: make(chan *peer.Peer, 10),
|
||||
}
|
||||
go cq.process(ctx)
|
||||
cq := &ChanQueue{Queue: pq}
|
||||
cq.process(ctx)
|
||||
return cq
|
||||
}
|
||||
|
||||
func (cq *ChanQueue) process(ctx context.Context) {
|
||||
var next *peer.Peer
|
||||
|
||||
for {
|
||||
// construct the channels here to be able to use them bidirectionally
|
||||
enqChan := make(chan *peer.Peer, 10)
|
||||
deqChan := make(chan *peer.Peer, 10)
|
||||
|
||||
if cq.Queue.Len() == 0 {
|
||||
select {
|
||||
case next = <-cq.EnqChan:
|
||||
case <-ctx.Done():
|
||||
close(cq.DeqChan)
|
||||
return
|
||||
cq.EnqChan = enqChan
|
||||
cq.DeqChan = deqChan
|
||||
|
||||
go func() {
|
||||
defer close(deqChan)
|
||||
|
||||
var next *peer.Peer
|
||||
var item *peer.Peer
|
||||
var more bool
|
||||
|
||||
for {
|
||||
if cq.Queue.Len() == 0 {
|
||||
select {
|
||||
case next, more = <-enqChan:
|
||||
if !more {
|
||||
return
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
} else {
|
||||
next = cq.Queue.Dequeue()
|
||||
}
|
||||
|
||||
} else {
|
||||
next = cq.Queue.Dequeue()
|
||||
select {
|
||||
case item, more = <-enqChan:
|
||||
if !more {
|
||||
return
|
||||
}
|
||||
|
||||
cq.Queue.Enqueue(item)
|
||||
cq.Queue.Enqueue(next)
|
||||
next = nil
|
||||
|
||||
case deqChan <- next:
|
||||
next = nil
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case item := <-cq.EnqChan:
|
||||
cq.Queue.Enqueue(item)
|
||||
cq.Queue.Enqueue(next)
|
||||
next = nil
|
||||
|
||||
case cq.DeqChan <- next:
|
||||
next = nil
|
||||
|
||||
case <-ctx.Done():
|
||||
close(cq.DeqChan)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
Reference in New Issue
Block a user