mirror of
https://github.com/containers/podman.git
synced 2025-12-02 02:58:03 +08:00
Also, do a general cleanup of all the timeout code. Changes include: - Convert from int to *uint where possible. Timeouts cannot be negative, hence the uint change; and a timeout of 0 is valid, so we need a new way to detect that the user set a timeout (hence, pointer). - Change name in the database to avoid conflicts between new data type and old one. This will cause timeouts set with 4.2.0 to be lost, but considering nobody is using the feature at present (and the lack of validation means we could have invalid, negative timeouts in the DB) this feels safe. - Ensure volume plugin timeouts can only be used with volumes created using a plugin. Timeouts on the local driver are nonsensical. - Remove the existing test, as it did not use a volume plugin. Write a new test that does. The actual plumbing of the containers.conf timeout in is one line in volume_api.go; the remainder are the above-described cleanups. Signed-off-by: Matthew Heon <mheon@redhat.com>
93 lines
2.1 KiB
Go
93 lines
2.1 KiB
Go
package queue
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
)
|
|
|
|
// ErrQueueClosed is returned by Enqueue and Dequeue once Close has been
// called on the queue.
var ErrQueueClosed = errors.New("the queue is closed for reading and writing")
|
|
|
|
// MessageQueue represents a threadsafe message queue to be used to retrieve or
// write messages to.
type MessageQueue struct {
	// m guards messages and closed, and is also the Locker backing c.
	m *sync.RWMutex
	// c wakes blocked Dequeue callers when a message arrives or the
	// queue is closed.
	c *sync.Cond
	// messages holds queued values in FIFO order.
	messages []interface{}
	// closed records whether Close has been called.
	closed bool
}
|
|
|
|
// NewMessageQueue returns a new MessageQueue.
|
|
func NewMessageQueue() *MessageQueue {
|
|
m := &sync.RWMutex{}
|
|
return &MessageQueue{
|
|
m: m,
|
|
c: sync.NewCond(m),
|
|
messages: []interface{}{},
|
|
}
|
|
}
|
|
|
|
// Enqueue writes `msg` to the queue.
|
|
func (mq *MessageQueue) Enqueue(msg interface{}) error {
|
|
mq.m.Lock()
|
|
defer mq.m.Unlock()
|
|
|
|
if mq.closed {
|
|
return ErrQueueClosed
|
|
}
|
|
mq.messages = append(mq.messages, msg)
|
|
// Signal a waiter that there is now a value available in the queue.
|
|
mq.c.Signal()
|
|
return nil
|
|
}
|
|
|
|
// Dequeue will read a value from the queue and remove it. If the queue
|
|
// is empty, this will block until the queue is closed or a value gets enqueued.
|
|
func (mq *MessageQueue) Dequeue() (interface{}, error) {
|
|
mq.m.Lock()
|
|
defer mq.m.Unlock()
|
|
|
|
for !mq.closed && mq.size() == 0 {
|
|
mq.c.Wait()
|
|
}
|
|
|
|
// We got woken up, check if it's because the queue got closed.
|
|
if mq.closed {
|
|
return nil, ErrQueueClosed
|
|
}
|
|
|
|
val := mq.messages[0]
|
|
mq.messages[0] = nil
|
|
mq.messages = mq.messages[1:]
|
|
return val, nil
|
|
}
|
|
|
|
// Size returns the size of the queue.
|
|
func (mq *MessageQueue) Size() int {
|
|
mq.m.RLock()
|
|
defer mq.m.RUnlock()
|
|
return mq.size()
|
|
}
|
|
|
|
// Nonexported size check to check if the queue is empty inside already locked functions.
|
|
func (mq *MessageQueue) size() int {
|
|
return len(mq.messages)
|
|
}
|
|
|
|
// Close closes the queue for future writes or reads. Any attempts to read or write from the
|
|
// queue after close will return ErrQueueClosed. This is safe to call multiple times.
|
|
func (mq *MessageQueue) Close() {
|
|
mq.m.Lock()
|
|
defer mq.m.Unlock()
|
|
|
|
// Already closed, noop
|
|
if mq.closed {
|
|
return
|
|
}
|
|
|
|
mq.messages = nil
|
|
mq.closed = true
|
|
// If there's anybody currently waiting on a value from Dequeue, we need to
|
|
// broadcast so the read(s) can return ErrQueueClosed.
|
|
mq.c.Broadcast()
|
|
}
|